summaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/Makefile2
-rw-r--r--net/tipc/bcast.c280
-rw-r--r--net/tipc/bcast.h14
-rw-r--r--net/tipc/bearer.c153
-rw-r--r--net/tipc/bearer.h47
-rw-r--r--net/tipc/config.c12
-rw-r--r--net/tipc/core.c14
-rw-r--r--net/tipc/core.h10
-rw-r--r--net/tipc/discover.c281
-rw-r--r--net/tipc/discover.h1
-rw-r--r--net/tipc/eth_media.c51
-rw-r--r--net/tipc/handler.c134
-rw-r--r--net/tipc/ib_media.c34
-rw-r--r--net/tipc/link.c1002
-rw-r--r--net/tipc/link.h28
-rw-r--r--net/tipc/msg.c407
-rw-r--r--net/tipc/msg.h40
-rw-r--r--net/tipc/name_distr.c144
-rw-r--r--net/tipc/name_distr.h35
-rw-r--r--net/tipc/name_table.c14
-rw-r--r--net/tipc/net.c132
-rw-r--r--net/tipc/net.h6
-rw-r--r--net/tipc/netlink.c2
-rw-r--r--net/tipc/node.c128
-rw-r--r--net/tipc/node.h105
-rw-r--r--net/tipc/node_subscr.c13
-rw-r--r--net/tipc/node_subscr.h2
-rw-r--r--net/tipc/port.c471
-rw-r--r--net/tipc/port.h58
-rw-r--r--net/tipc/socket.c628
-rw-r--r--net/tipc/socket.h20
31 files changed, 1960 insertions, 2308 deletions
diff --git a/net/tipc/Makefile b/net/tipc/Makefile
index b282f7130d2..a080c66d819 100644
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -5,7 +5,7 @@
obj-$(CONFIG_TIPC) := tipc.o
tipc-y += addr.o bcast.o bearer.o config.o \
- core.o handler.o link.o discover.o msg.o \
+ core.o link.o discover.o msg.o \
name_distr.o subscr.o name_table.o net.o \
netlink.o node.o node_subscr.o port.o ref.o \
socket.o log.o eth_media.o server.o
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 95ab5ef9292..dd13bfa0933 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -1,7 +1,7 @@
/*
* net/tipc/bcast.c: TIPC broadcast code
*
- * Copyright (c) 2004-2006, Ericsson AB
+ * Copyright (c) 2004-2006, 2014, Ericsson AB
* Copyright (c) 2004, Intel Corporation.
* Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
@@ -38,6 +38,8 @@
#include "core.h"
#include "link.h"
#include "port.h"
+#include "socket.h"
+#include "msg.h"
#include "bcast.h"
#include "name_distr.h"
@@ -71,7 +73,7 @@ struct tipc_bcbearer_pair {
* Note: The fields labelled "temporary" are incorporated into the bearer
* to avoid consuming potentially limited stack space through the use of
* large local variables within multicast routines. Concurrent access is
- * prevented through use of the spinlock "bc_lock".
+ * prevented through use of the spinlock "bclink_lock".
*/
struct tipc_bcbearer {
struct tipc_bearer bearer;
@@ -84,34 +86,69 @@ struct tipc_bcbearer {
/**
* struct tipc_bclink - link used for broadcast messages
+ * @lock: spinlock governing access to structure
* @link: (non-standard) broadcast link structure
* @node: (non-standard) node structure representing b'cast link's peer node
+ * @flags: represent bclink states
* @bcast_nodes: map of broadcast-capable nodes
* @retransmit_to: node that most recently requested a retransmit
*
* Handles sequence numbering, fragmentation, bundling, etc.
*/
struct tipc_bclink {
+ spinlock_t lock;
struct tipc_link link;
struct tipc_node node;
+ unsigned int flags;
struct tipc_node_map bcast_nodes;
struct tipc_node *retransmit_to;
};
-static struct tipc_bcbearer bcast_bearer;
-static struct tipc_bclink bcast_link;
-
-static struct tipc_bcbearer *bcbearer = &bcast_bearer;
-static struct tipc_bclink *bclink = &bcast_link;
-static struct tipc_link *bcl = &bcast_link.link;
-
-static DEFINE_SPINLOCK(bc_lock);
+static struct tipc_bcbearer *bcbearer;
+static struct tipc_bclink *bclink;
+static struct tipc_link *bcl;
const char tipc_bclink_name[] = "broadcast-link";
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
struct tipc_node_map *nm_b,
struct tipc_node_map *nm_diff);
+static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
+static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
+
+static void tipc_bclink_lock(void)
+{
+ spin_lock_bh(&bclink->lock);
+}
+
+static void tipc_bclink_unlock(void)
+{
+ struct tipc_node *node = NULL;
+
+ if (likely(!bclink->flags)) {
+ spin_unlock_bh(&bclink->lock);
+ return;
+ }
+
+ if (bclink->flags & TIPC_BCLINK_RESET) {
+ bclink->flags &= ~TIPC_BCLINK_RESET;
+ node = tipc_bclink_retransmit_to();
+ }
+ spin_unlock_bh(&bclink->lock);
+
+ if (node)
+ tipc_link_reset_all(node);
+}
+
+uint tipc_bclink_get_mtu(void)
+{
+ return MAX_PKT_DEFAULT_MCAST;
+}
+
+void tipc_bclink_set_flags(unsigned int flags)
+{
+ bclink->flags |= flags;
+}
static u32 bcbuf_acks(struct sk_buff *buf)
{
@@ -130,16 +167,16 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
void tipc_bclink_add_node(u32 addr)
{
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
tipc_nmap_add(&bclink->bcast_nodes, addr);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
}
void tipc_bclink_remove_node(u32 addr)
{
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
tipc_nmap_remove(&bclink->bcast_nodes, addr);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
}
static void bclink_set_last_sent(void)
@@ -165,7 +202,7 @@ static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
/**
* tipc_bclink_retransmit_to - get most recent node to request retransmission
*
- * Called with bc_lock locked
+ * Called with bclink_lock locked
*/
struct tipc_node *tipc_bclink_retransmit_to(void)
{
@@ -177,7 +214,7 @@ struct tipc_node *tipc_bclink_retransmit_to(void)
* @after: sequence number of last packet to *not* retransmit
* @to: sequence number of last packet to retransmit
*
- * Called with bc_lock locked
+ * Called with bclink_lock locked
*/
static void bclink_retransmit_pkt(u32 after, u32 to)
{
@@ -194,7 +231,7 @@ static void bclink_retransmit_pkt(u32 after, u32 to)
* @n_ptr: node that sent acknowledgement info
* @acked: broadcast sequence # that has been acknowledged
*
- * Node is locked, bc_lock unlocked.
+ * Node is locked, bclink_lock unlocked.
*/
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
@@ -202,8 +239,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
struct sk_buff *next;
unsigned int released = 0;
- spin_lock_bh(&bc_lock);
-
+ tipc_bclink_lock();
/* Bail out if tx queue is empty (no clean up is required) */
crs = bcl->first_out;
if (!crs)
@@ -267,13 +303,13 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
if (unlikely(released && !list_empty(&bcl->waiting_ports)))
tipc_link_wakeup_ports(bcl, 0);
exit:
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
}
/**
* tipc_bclink_update_link_state - update broadcast link state
*
- * tipc_net_lock and node lock set
+ * RCU and node lock set
*/
void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
{
@@ -320,10 +356,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
? buf_seqno(n_ptr->bclink.deferred_head) - 1
: n_ptr->bclink.last_sent);
- spin_lock_bh(&bc_lock);
- tipc_bearer_send(&bcbearer->bearer, buf, NULL);
+ tipc_bclink_lock();
+ tipc_bearer_send(MAX_BEARERS, buf, NULL);
bcl->stats.sent_nacks++;
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
kfree_skb(buf);
n_ptr->bclink.oos_state++;
@@ -335,8 +371,6 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
*
* Delay any upcoming NACK by this node if another node has already
* requested the first message this node is going to ask for.
- *
- * Only tipc_net_lock set.
*/
static void bclink_peek_nack(struct tipc_msg *msg)
{
@@ -355,36 +389,56 @@ static void bclink_peek_nack(struct tipc_msg *msg)
tipc_node_unlock(n_ptr);
}
-/*
- * tipc_bclink_xmit - broadcast a packet to all nodes in cluster
+/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
+ * and to identified node local sockets
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
int tipc_bclink_xmit(struct sk_buff *buf)
{
- int res;
-
- spin_lock_bh(&bc_lock);
-
- if (!bclink->bcast_nodes.count) {
- res = msg_data_sz(buf_msg(buf));
- kfree_skb(buf);
- goto exit;
+ int rc = 0;
+ int bc = 0;
+ struct sk_buff *clbuf;
+
+ /* Prepare clone of message for local node */
+ clbuf = tipc_msg_reassemble(buf);
+ if (unlikely(!clbuf)) {
+ kfree_skb_list(buf);
+ return -EHOSTUNREACH;
}
- res = __tipc_link_xmit(bcl, buf);
- if (likely(res >= 0)) {
- bclink_set_last_sent();
- bcl->stats.queue_sz_counts++;
- bcl->stats.accu_queue_sz += bcl->out_queue_size;
+ /* Broadcast to all other nodes */
+ if (likely(bclink)) {
+ tipc_bclink_lock();
+ if (likely(bclink->bcast_nodes.count)) {
+ rc = __tipc_link_xmit(bcl, buf);
+ if (likely(!rc)) {
+ bclink_set_last_sent();
+ bcl->stats.queue_sz_counts++;
+ bcl->stats.accu_queue_sz += bcl->out_queue_size;
+ }
+ bc = 1;
+ }
+ tipc_bclink_unlock();
}
-exit:
- spin_unlock_bh(&bc_lock);
- return res;
+
+ if (unlikely(!bc))
+ kfree_skb_list(buf);
+
+ /* Deliver message clone */
+ if (likely(!rc))
+ tipc_sk_mcast_rcv(clbuf);
+ else
+ kfree_skb(clbuf);
+
+ return rc;
}
/**
* bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
*
- * Called with both sending node's lock and bc_lock taken.
+ * Called with both sending node's lock and bclink_lock taken.
*/
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
@@ -408,7 +462,7 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
/**
* tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
*
- * tipc_net_lock is read_locked, no other locks set
+ * RCU is locked, no other locks set
*/
void tipc_bclink_rcv(struct sk_buff *buf)
{
@@ -416,7 +470,7 @@ void tipc_bclink_rcv(struct sk_buff *buf)
struct tipc_node *node;
u32 next_in;
u32 seqno;
- int deferred;
+ int deferred = 0;
/* Screen out unwanted broadcast messages */
@@ -439,12 +493,12 @@ void tipc_bclink_rcv(struct sk_buff *buf)
if (msg_destnode(msg) == tipc_own_addr) {
tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
tipc_node_unlock(node);
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
bcl->stats.recv_nacks++;
bclink->retransmit_to = node;
bclink_retransmit_pkt(msg_bcgap_after(msg),
msg_bcgap_to(msg));
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
} else {
tipc_node_unlock(node);
bclink_peek_nack(msg);
@@ -462,51 +516,47 @@ receive:
/* Deliver message to destination */
if (likely(msg_isdata(msg))) {
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
tipc_node_unlock(node);
if (likely(msg_mcast(msg)))
- tipc_port_mcast_rcv(buf, NULL);
+ tipc_sk_mcast_rcv(buf);
else
kfree_skb(buf);
} else if (msg_user(msg) == MSG_BUNDLER) {
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
bcl->stats.recv_bundles++;
bcl->stats.recv_bundled += msg_msgcnt(msg);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
tipc_node_unlock(node);
tipc_link_bundle_rcv(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
- int ret;
- ret = tipc_link_frag_rcv(&node->bclink.reasm_head,
- &node->bclink.reasm_tail,
- &buf);
- if (ret == LINK_REASM_ERROR)
+ tipc_buf_append(&node->bclink.reasm_buf, &buf);
+ if (unlikely(!buf && !node->bclink.reasm_buf))
goto unlock;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
bcl->stats.recv_fragments++;
- if (ret == LINK_REASM_COMPLETE) {
+ if (buf) {
bcl->stats.recv_fragmented++;
- /* Point msg to inner header */
msg = buf_msg(buf);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
goto receive;
}
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
tipc_node_unlock(node);
} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
tipc_node_unlock(node);
tipc_named_rcv(buf);
} else {
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
tipc_node_unlock(node);
kfree_skb(buf);
}
@@ -536,6 +586,7 @@ receive:
buf = node->bclink.deferred_head;
node->bclink.deferred_head = buf->next;
+ buf->next = NULL;
node->bclink.deferred_size--;
goto receive;
}
@@ -549,17 +600,16 @@ receive:
node->bclink.deferred_size += deferred;
bclink_update_last_sent(node, seqno);
buf = NULL;
- } else
- deferred = 0;
+ }
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
if (deferred)
bcl->stats.deferred_recv++;
else
bcl->stats.duplicates++;
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
unlock:
tipc_node_unlock(node);
@@ -587,6 +637,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
struct tipc_media_addr *unused2)
{
int bp_index;
+ struct tipc_msg *msg = buf_msg(buf);
/* Prepare broadcast link message for reliable transmission,
* if first time trying to send it;
@@ -594,10 +645,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
* since they are sent in an unreliable manner and don't need it
*/
if (likely(!msg_non_seq(buf_msg(buf)))) {
- struct tipc_msg *msg;
-
bcbuf_set_acks(buf, bclink->bcast_nodes.count);
- msg = buf_msg(buf);
msg_set_non_seq(msg, 1);
msg_set_mc_netid(msg, tipc_net_id);
bcl->stats.sent_info++;
@@ -614,12 +662,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
- struct tipc_bearer *b = p;
+ struct tipc_bearer *bp[2] = {p, s};
+ struct tipc_bearer *b = bp[msg_link_selector(msg)];
struct sk_buff *tbuf;
if (!p)
break; /* No more bearers to try */
-
+ if (!b)
+ b = p;
tipc_nmap_diff(&bcbearer->remains, &b->nodes,
&bcbearer->remains_new);
if (bcbearer->remains_new.count == bcbearer->remains.count)
@@ -627,22 +677,15 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
if (bp_index == 0) {
/* Use original buffer for first bearer */
- tipc_bearer_send(b, buf, &b->bcast_addr);
+ tipc_bearer_send(b->identity, buf, &b->bcast_addr);
} else {
/* Avoid concurrent buffer access */
- tbuf = pskb_copy(buf, GFP_ATOMIC);
+ tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
if (!tbuf)
break;
- tipc_bearer_send(b, tbuf, &b->bcast_addr);
+ tipc_bearer_send(b->identity, tbuf, &b->bcast_addr);
kfree_skb(tbuf); /* Bearer keeps a clone */
}
-
- /* Swap bearers for next packet */
- if (s) {
- bcbearer->bpairs[bp_index].primary = s;
- bcbearer->bpairs[bp_index].secondary = p;
- }
-
if (bcbearer->remains_new.count == 0)
break; /* All targets reached */
@@ -655,20 +698,27 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
/**
* tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
*/
-void tipc_bcbearer_sort(void)
+void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
{
struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
struct tipc_bcbearer_pair *bp_curr;
+ struct tipc_bearer *b;
int b_index;
int pri;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
+
+ if (action)
+ tipc_nmap_add(nm_ptr, node);
+ else
+ tipc_nmap_remove(nm_ptr, node);
/* Group bearers by priority (can assume max of two per priority) */
memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));
+ rcu_read_lock();
for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
- struct tipc_bearer *b = bearer_list[b_index];
+ b = rcu_dereference_rtnl(bearer_list[b_index]);
if (!b || !b->nodes.count)
continue;
@@ -677,6 +727,7 @@ void tipc_bcbearer_sort(void)
else
bp_temp[b->priority].secondary = b;
}
+ rcu_read_unlock();
/* Create array of bearer pairs for broadcasting */
bp_curr = bcbearer->bpairs;
@@ -702,7 +753,7 @@ void tipc_bcbearer_sort(void)
bp_curr++;
}
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
}
@@ -714,7 +765,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
if (!bcl)
return 0;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
s = &bcl->stats;
@@ -743,7 +794,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
s->queue_sz_counts ?
(s->accu_queue_sz / s->queue_sz_counts) : 0);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
return ret;
}
@@ -752,9 +803,9 @@ int tipc_bclink_reset_stats(void)
if (!bcl)
return -ENOPROTOOPT;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
memset(&bcl->stats, 0, sizeof(bcl->stats));
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
return 0;
}
@@ -765,46 +816,59 @@ int tipc_bclink_set_queue_limits(u32 limit)
if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
return -EINVAL;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
tipc_link_set_queue_limits(bcl, limit);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
return 0;
}
-void tipc_bclink_init(void)
+int tipc_bclink_init(void)
{
+ bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
+ if (!bcbearer)
+ return -ENOMEM;
+
+ bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
+ if (!bclink) {
+ kfree(bcbearer);
+ return -ENOMEM;
+ }
+
+ bcl = &bclink->link;
bcbearer->bearer.media = &bcbearer->media;
bcbearer->media.send_msg = tipc_bcbearer_send;
sprintf(bcbearer->media.name, "tipc-broadcast");
+ spin_lock_init(&bclink->lock);
INIT_LIST_HEAD(&bcl->waiting_ports);
bcl->next_out_no = 1;
spin_lock_init(&bclink->node.lock);
bcl->owner = &bclink->node;
bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
- bcl->b_ptr = &bcbearer->bearer;
- bearer_list[BCBEARER] = &bcbearer->bearer;
+ bcl->bearer_id = MAX_BEARERS;
+ rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer);
bcl->state = WORKING_WORKING;
strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
+ return 0;
}
void tipc_bclink_stop(void)
{
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
tipc_link_purge_queues(bcl);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
- bearer_list[BCBEARER] = NULL;
- memset(bclink, 0, sizeof(*bclink));
- memset(bcbearer, 0, sizeof(*bcbearer));
+ RCU_INIT_POINTER(bearer_list[BCBEARER], NULL);
+ synchronize_net();
+ kfree(bcbearer);
+ kfree(bclink);
}
-
/**
* tipc_nmap_add - add a node to a node map
*/
-void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
+static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
{
int n = tipc_node(node);
int w = n / WSIZE;
@@ -819,7 +883,7 @@ void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
/**
* tipc_nmap_remove - remove a node from a node map
*/
-void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
+static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
{
int n = tipc_node(node);
int w = n / WSIZE;
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index a80ef54b818..4875d9536ae 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -1,7 +1,7 @@
/*
* net/tipc/bcast.h: Include file for TIPC broadcast code
*
- * Copyright (c) 2003-2006, Ericsson AB
+ * Copyright (c) 2003-2006, 2014, Ericsson AB
* Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
@@ -39,6 +39,7 @@
#define MAX_NODES 4096
#define WSIZE 32
+#define TIPC_BCLINK_RESET 1
/**
* struct tipc_node_map - set of node identifiers
@@ -69,9 +70,6 @@ struct tipc_node;
extern const char tipc_bclink_name[];
-void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
-void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
-
/**
* tipc_nmap_equal - test for equality of node maps
*/
@@ -84,13 +82,13 @@ static inline int tipc_nmap_equal(struct tipc_node_map *nm_a,
void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port);
void tipc_port_list_free(struct tipc_port_list *pl_ptr);
-void tipc_bclink_init(void);
+int tipc_bclink_init(void);
void tipc_bclink_stop(void);
+void tipc_bclink_set_flags(unsigned int flags);
void tipc_bclink_add_node(u32 addr);
void tipc_bclink_remove_node(u32 addr);
struct tipc_node *tipc_bclink_retransmit_to(void);
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
-int tipc_bclink_xmit(struct sk_buff *buf);
void tipc_bclink_rcv(struct sk_buff *buf);
u32 tipc_bclink_get_last_sent(void);
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr);
@@ -98,6 +96,8 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent);
int tipc_bclink_stats(char *stats_buf, const u32 buf_size);
int tipc_bclink_reset_stats(void);
int tipc_bclink_set_queue_limits(u32 limit);
-void tipc_bcbearer_sort(void);
+void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
+uint tipc_bclink_get_mtu(void);
+int tipc_bclink_xmit(struct sk_buff *buf);
#endif
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 3fef7eb776d..264474394f9 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -49,7 +49,7 @@ static struct tipc_media * const media_info_array[] = {
NULL
};
-struct tipc_bearer *bearer_list[MAX_BEARERS + 1];
+struct tipc_bearer __rcu *bearer_list[MAX_BEARERS + 1];
static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down);
@@ -178,7 +178,7 @@ struct tipc_bearer *tipc_bearer_find(const char *name)
u32 i;
for (i = 0; i < MAX_BEARERS; i++) {
- b_ptr = bearer_list[i];
+ b_ptr = rtnl_dereference(bearer_list[i]);
if (b_ptr && (!strcmp(b_ptr->name, name)))
return b_ptr;
}
@@ -198,10 +198,9 @@ struct sk_buff *tipc_bearer_get_names(void)
if (!buf)
return NULL;
- read_lock_bh(&tipc_net_lock);
for (i = 0; media_info_array[i] != NULL; i++) {
for (j = 0; j < MAX_BEARERS; j++) {
- b = bearer_list[j];
+ b = rtnl_dereference(bearer_list[j]);
if (!b)
continue;
if (b->media == media_info_array[i]) {
@@ -211,22 +210,33 @@ struct sk_buff *tipc_bearer_get_names(void)
}
}
}
- read_unlock_bh(&tipc_net_lock);
return buf;
}
-void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest)
+void tipc_bearer_add_dest(u32 bearer_id, u32 dest)
{
- tipc_nmap_add(&b_ptr->nodes, dest);
- tipc_bcbearer_sort();
- tipc_disc_add_dest(b_ptr->link_req);
+ struct tipc_bearer *b_ptr;
+
+ rcu_read_lock();
+ b_ptr = rcu_dereference_rtnl(bearer_list[bearer_id]);
+ if (b_ptr) {
+ tipc_bcbearer_sort(&b_ptr->nodes, dest, true);
+ tipc_disc_add_dest(b_ptr->link_req);
+ }
+ rcu_read_unlock();
}
-void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest)
+void tipc_bearer_remove_dest(u32 bearer_id, u32 dest)
{
- tipc_nmap_remove(&b_ptr->nodes, dest);
- tipc_bcbearer_sort();
- tipc_disc_remove_dest(b_ptr->link_req);
+ struct tipc_bearer *b_ptr;
+
+ rcu_read_lock();
+ b_ptr = rcu_dereference_rtnl(bearer_list[bearer_id]);
+ if (b_ptr) {
+ tipc_bcbearer_sort(&b_ptr->nodes, dest, false);
+ tipc_disc_remove_dest(b_ptr->link_req);
+ }
+ rcu_read_unlock();
}
/**
@@ -271,13 +281,11 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
return -EINVAL;
}
- write_lock_bh(&tipc_net_lock);
-
m_ptr = tipc_media_find(b_names.media_name);
if (!m_ptr) {
pr_warn("Bearer <%s> rejected, media <%s> not registered\n",
name, b_names.media_name);
- goto exit;
+ return -EINVAL;
}
if (priority == TIPC_MEDIA_LINK_PRI)
@@ -287,7 +295,7 @@ restart:
bearer_id = MAX_BEARERS;
with_this_prio = 1;
for (i = MAX_BEARERS; i-- != 0; ) {
- b_ptr = bearer_list[i];
+ b_ptr = rtnl_dereference(bearer_list[i]);
if (!b_ptr) {
bearer_id = i;
continue;
@@ -295,14 +303,14 @@ restart:
if (!strcmp(name, b_ptr->name)) {
pr_warn("Bearer <%s> rejected, already enabled\n",
name);
- goto exit;
+ return -EINVAL;
}
if ((b_ptr->priority == priority) &&
(++with_this_prio > 2)) {
if (priority-- == 0) {
pr_warn("Bearer <%s> rejected, duplicate priority\n",
name);
- goto exit;
+ return -EINVAL;
}
pr_warn("Bearer <%s> priority adjustment required %u->%u\n",
name, priority + 1, priority);
@@ -312,21 +320,20 @@ restart:
if (bearer_id >= MAX_BEARERS) {
pr_warn("Bearer <%s> rejected, bearer limit reached (%u)\n",
name, MAX_BEARERS);
- goto exit;
+ return -EINVAL;
}
b_ptr = kzalloc(sizeof(*b_ptr), GFP_ATOMIC);
- if (!b_ptr) {
- res = -ENOMEM;
- goto exit;
- }
+ if (!b_ptr)
+ return -ENOMEM;
+
strcpy(b_ptr->name, name);
b_ptr->media = m_ptr;
res = m_ptr->enable_media(b_ptr);
if (res) {
pr_warn("Bearer <%s> rejected, enable failure (%d)\n",
name, -res);
- goto exit;
+ return -EINVAL;
}
b_ptr->identity = bearer_id;
@@ -341,16 +348,14 @@ restart:
bearer_disable(b_ptr, false);
pr_warn("Bearer <%s> rejected, discovery object creation failed\n",
name);
- goto exit;
+ return -EINVAL;
}
- bearer_list[bearer_id] = b_ptr;
+ rcu_assign_pointer(bearer_list[bearer_id], b_ptr);
pr_info("Enabled bearer <%s>, discovery domain %s, priority %u\n",
name,
tipc_addr_string_fill(addr_string, disc_domain), priority);
-exit:
- write_unlock_bh(&tipc_net_lock);
return res;
}
@@ -359,19 +364,16 @@ exit:
*/
static int tipc_reset_bearer(struct tipc_bearer *b_ptr)
{
- read_lock_bh(&tipc_net_lock);
pr_info("Resetting bearer <%s>\n", b_ptr->name);
- tipc_disc_delete(b_ptr->link_req);
tipc_link_reset_list(b_ptr->identity);
- tipc_disc_create(b_ptr, &b_ptr->bcast_addr);
- read_unlock_bh(&tipc_net_lock);
+ tipc_disc_reset(b_ptr);
return 0;
}
/**
* bearer_disable
*
- * Note: This routine assumes caller holds tipc_net_lock.
+ * Note: This routine assumes caller holds RTNL lock.
*/
static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down)
{
@@ -385,12 +387,12 @@ static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down)
tipc_disc_delete(b_ptr->link_req);
for (i = 0; i < MAX_BEARERS; i++) {
- if (b_ptr == bearer_list[i]) {
- bearer_list[i] = NULL;
+ if (b_ptr == rtnl_dereference(bearer_list[i])) {
+ RCU_INIT_POINTER(bearer_list[i], NULL);
break;
}
}
- kfree(b_ptr);
+ kfree_rcu(b_ptr, rcu);
}
int tipc_disable_bearer(const char *name)
@@ -398,7 +400,6 @@ int tipc_disable_bearer(const char *name)
struct tipc_bearer *b_ptr;
int res;
- write_lock_bh(&tipc_net_lock);
b_ptr = tipc_bearer_find(name);
if (b_ptr == NULL) {
pr_warn("Attempt to disable unknown bearer <%s>\n", name);
@@ -407,32 +408,9 @@ int tipc_disable_bearer(const char *name)
bearer_disable(b_ptr, false);
res = 0;
}
- write_unlock_bh(&tipc_net_lock);
return res;
}
-
-/* tipc_l2_media_addr_set - initialize Ethernet media address structure
- *
- * Media-dependent "value" field stores MAC address in first 6 bytes
- * and zeroes out the remaining bytes.
- */
-void tipc_l2_media_addr_set(const struct tipc_bearer *b,
- struct tipc_media_addr *a, char *mac)
-{
- int len = b->media->hwaddr_len;
-
- if (unlikely(sizeof(a->value) < len)) {
- WARN_ONCE(1, "Media length invalid\n");
- return;
- }
-
- memcpy(a->value, mac, len);
- memset(a->value + len, 0, sizeof(a->value) - len);
- a->media_id = b->media->type_id;
- a->broadcast = !memcmp(mac, b->bcast_addr.value, len);
-}
-
int tipc_enable_l2_media(struct tipc_bearer *b)
{
struct net_device *dev;
@@ -443,33 +421,37 @@ int tipc_enable_l2_media(struct tipc_bearer *b)
if (!dev)
return -ENODEV;
- /* Associate TIPC bearer with Ethernet bearer */
- b->media_ptr = dev;
- memset(b->bcast_addr.value, 0, sizeof(b->bcast_addr.value));
+ /* Associate TIPC bearer with L2 bearer */
+ rcu_assign_pointer(b->media_ptr, dev);
+ memset(&b->bcast_addr, 0, sizeof(b->bcast_addr));
memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len);
b->bcast_addr.media_id = b->media->type_id;
b->bcast_addr.broadcast = 1;
b->mtu = dev->mtu;
- tipc_l2_media_addr_set(b, &b->addr, (char *)dev->dev_addr);
+ b->media->raw2addr(b, &b->addr, (char *)dev->dev_addr);
rcu_assign_pointer(dev->tipc_ptr, b);
return 0;
}
-/* tipc_disable_l2_media - detach TIPC bearer from an Ethernet interface
+/* tipc_disable_l2_media - detach TIPC bearer from an L2 interface
*
- * Mark Ethernet bearer as inactive so that incoming buffers are thrown away,
+ * Mark L2 bearer as inactive so that incoming buffers are thrown away,
* then get worker thread to complete bearer cleanup. (Can't do cleanup
* here because cleanup code needs to sleep and caller holds spinlocks.)
*/
void tipc_disable_l2_media(struct tipc_bearer *b)
{
- struct net_device *dev = (struct net_device *)b->media_ptr;
+ struct net_device *dev;
+
+ dev = (struct net_device *)rtnl_dereference(b->media_ptr);
+ RCU_INIT_POINTER(b->media_ptr, NULL);
RCU_INIT_POINTER(dev->tipc_ptr, NULL);
+ synchronize_net();
dev_put(dev);
}
/**
- * tipc_l2_send_msg - send a TIPC packet out over an Ethernet interface
+ * tipc_l2_send_msg - send a TIPC packet out over an L2 interface
* @buf: the packet to be sent
* @b_ptr: the bearer through which the packet is to be sent
* @dest: peer destination address
@@ -478,8 +460,12 @@ int tipc_l2_send_msg(struct sk_buff *buf, struct tipc_bearer *b,
struct tipc_media_addr *dest)
{
struct sk_buff *clone;
+ struct net_device *dev;
int delta;
- struct net_device *dev = (struct net_device *)b->media_ptr;
+
+ dev = (struct net_device *)rcu_dereference_rtnl(b->media_ptr);
+ if (!dev)
+ return 0;
clone = skb_clone(buf, GFP_ATOMIC);
if (!clone)
@@ -507,10 +493,16 @@ int tipc_l2_send_msg(struct sk_buff *buf, struct tipc_bearer *b,
* The media send routine must not alter the buffer being passed in
* as it may be needed for later retransmission!
*/
-void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf,
+void tipc_bearer_send(u32 bearer_id, struct sk_buff *buf,
struct tipc_media_addr *dest)
{
- b->media->send_msg(buf, b, dest);
+ struct tipc_bearer *b_ptr;
+
+ rcu_read_lock();
+ b_ptr = rcu_dereference_rtnl(bearer_list[bearer_id]);
+ if (likely(b_ptr))
+ b_ptr->media->send_msg(buf, b_ptr, dest);
+ rcu_read_unlock();
}
/**
@@ -535,7 +527,7 @@ static int tipc_l2_rcv_msg(struct sk_buff *buf, struct net_device *dev,
}
rcu_read_lock();
- b_ptr = rcu_dereference(dev->tipc_ptr);
+ b_ptr = rcu_dereference_rtnl(dev->tipc_ptr);
if (likely(b_ptr)) {
if (likely(buf->pkt_type <= PACKET_BROADCAST)) {
buf->next = NULL;
@@ -568,12 +560,9 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt,
if (!net_eq(dev_net(dev), &init_net))
return NOTIFY_DONE;
- rcu_read_lock();
- b_ptr = rcu_dereference(dev->tipc_ptr);
- if (!b_ptr) {
- rcu_read_unlock();
+ b_ptr = rtnl_dereference(dev->tipc_ptr);
+ if (!b_ptr)
return NOTIFY_DONE;
- }
b_ptr->mtu = dev->mtu;
@@ -586,17 +575,15 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt,
tipc_reset_bearer(b_ptr);
break;
case NETDEV_CHANGEADDR:
- tipc_l2_media_addr_set(b_ptr, &b_ptr->addr,
+ b_ptr->media->raw2addr(b_ptr, &b_ptr->addr,
(char *)dev->dev_addr);
tipc_reset_bearer(b_ptr);
break;
case NETDEV_UNREGISTER:
case NETDEV_CHANGENAME:
- tipc_disable_bearer(b_ptr->name);
+ bearer_disable(b_ptr, false);
break;
}
- rcu_read_unlock();
-
return NOTIFY_OK;
}
@@ -633,7 +620,7 @@ void tipc_bearer_stop(void)
u32 i;
for (i = 0; i < MAX_BEARERS; i++) {
- b_ptr = bearer_list[i];
+ b_ptr = rtnl_dereference(bearer_list[i]);
if (b_ptr) {
bearer_disable(b_ptr, true);
bearer_list[i] = NULL;
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index ba48145e871..78fccc49de2 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -42,14 +42,12 @@
#define MAX_BEARERS 2
#define MAX_MEDIA 2
-/*
- * Identifiers associated with TIPC message header media address info
- *
- * - address info field is 20 bytes long
- * - media type identifier located at offset 3
- * - remaining bytes vary according to media type
+/* Identifiers associated with TIPC message header media address info
+ * - address info field is 32 bytes long
+ * - the field's actual content and length is defined per media
+ * - remaining unused bytes in the field are set to zero
*/
-#define TIPC_MEDIA_ADDR_SIZE 20
+#define TIPC_MEDIA_ADDR_SIZE 32
#define TIPC_MEDIA_TYPE_OFFSET 3
/*
@@ -77,9 +75,10 @@ struct tipc_bearer;
* @send_msg: routine which handles buffer transmission
* @enable_media: routine which enables a media
* @disable_media: routine which disables a media
- * @addr2str: routine which converts media address to string
- * @addr2msg: routine which converts media address to protocol message area
- * @msg2addr: routine which converts media address from protocol message area
+ * @addr2str: convert media address format to string
+ * @addr2msg: convert from media addr format to discovery msg addr format
+ * @msg2addr: convert from discovery msg addr format to media addr format
+ * @raw2addr: convert from raw addr format to media addr format
* @priority: default link (and bearer) priority
* @tolerance: default time (in ms) before declaring link failure
* @window: default window (in packets) before declaring link congestion
@@ -93,10 +92,16 @@ struct tipc_media {
struct tipc_media_addr *dest);
int (*enable_media)(struct tipc_bearer *b_ptr);
void (*disable_media)(struct tipc_bearer *b_ptr);
- int (*addr2str)(struct tipc_media_addr *a, char *str_buf, int str_size);
- int (*addr2msg)(struct tipc_media_addr *a, char *msg_area);
- int (*msg2addr)(const struct tipc_bearer *b_ptr,
- struct tipc_media_addr *a, char *msg_area);
+ int (*addr2str)(struct tipc_media_addr *addr,
+ char *strbuf,
+ int bufsz);
+ int (*addr2msg)(char *msg, struct tipc_media_addr *addr);
+ int (*msg2addr)(struct tipc_bearer *b,
+ struct tipc_media_addr *addr,
+ char *msg);
+ int (*raw2addr)(struct tipc_bearer *b,
+ struct tipc_media_addr *addr,
+ char *raw);
u32 priority;
u32 tolerance;
u32 window;
@@ -113,6 +118,7 @@ struct tipc_media {
* @name: bearer name (format = media:interface)
* @media: ptr to media structure associated with bearer
* @bcast_addr: media address used in broadcasting
+ * @rcu: rcu struct for tipc_bearer
* @priority: default link priority for bearer
* @window: default window size for bearer
* @tolerance: default link tolerance for bearer
@@ -127,12 +133,13 @@ struct tipc_media {
* care of initializing all other fields.
*/
struct tipc_bearer {
- void *media_ptr; /* initalized by media */
+ void __rcu *media_ptr; /* initalized by media */
u32 mtu; /* initalized by media */
struct tipc_media_addr addr; /* initalized by media */
char name[TIPC_MAX_BEARER_NAME];
struct tipc_media *media;
struct tipc_media_addr bcast_addr;
+ struct rcu_head rcu;
u32 priority;
u32 window;
u32 tolerance;
@@ -150,7 +157,7 @@ struct tipc_bearer_names {
struct tipc_link;
-extern struct tipc_bearer *bearer_list[];
+extern struct tipc_bearer __rcu *bearer_list[];
/*
* TIPC routines available to supported media types
@@ -173,22 +180,20 @@ int tipc_media_set_priority(const char *name, u32 new_value);
int tipc_media_set_window(const char *name, u32 new_value);
void tipc_media_addr_printf(char *buf, int len, struct tipc_media_addr *a);
struct sk_buff *tipc_media_get_names(void);
-void tipc_l2_media_addr_set(const struct tipc_bearer *b,
- struct tipc_media_addr *a, char *mac);
int tipc_enable_l2_media(struct tipc_bearer *b);
void tipc_disable_l2_media(struct tipc_bearer *b);
int tipc_l2_send_msg(struct sk_buff *buf, struct tipc_bearer *b,
struct tipc_media_addr *dest);
struct sk_buff *tipc_bearer_get_names(void);
-void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest);
-void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest);
+void tipc_bearer_add_dest(u32 bearer_id, u32 dest);
+void tipc_bearer_remove_dest(u32 bearer_id, u32 dest);
struct tipc_bearer *tipc_bearer_find(const char *name);
struct tipc_media *tipc_media_find(const char *name);
int tipc_bearer_setup(void);
void tipc_bearer_cleanup(void);
void tipc_bearer_stop(void);
-void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf,
+void tipc_bearer_send(u32 bearer_id, struct sk_buff *buf,
struct tipc_media_addr *dest);
#endif /* _TIPC_BEARER_H */
diff --git a/net/tipc/config.c b/net/tipc/config.c
index 4b981c05382..2b42403ad33 100644
--- a/net/tipc/config.c
+++ b/net/tipc/config.c
@@ -42,8 +42,6 @@
#define REPLY_TRUNCATED "<truncated>\n"
-static DEFINE_MUTEX(config_mutex);
-
static const void *req_tlv_area; /* request message TLV area */
static int req_tlv_space; /* request message TLV area size */
static int rep_headroom; /* reply message headroom to use */
@@ -179,8 +177,10 @@ static struct sk_buff *cfg_set_own_addr(void)
if (tipc_own_addr)
return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
" (cannot change node address once assigned)");
- tipc_net_start(addr);
- return tipc_cfg_reply_none();
+ if (!tipc_net_start(addr))
+ return tipc_cfg_reply_none();
+
+ return tipc_cfg_reply_error_string("cannot change to network mode");
}
static struct sk_buff *cfg_set_max_ports(void)
@@ -223,7 +223,7 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
{
struct sk_buff *rep_tlv_buf;
- mutex_lock(&config_mutex);
+ rtnl_lock();
/* Save request and reply details in a well-known location */
req_tlv_area = request_area;
@@ -337,6 +337,6 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
/* Return reply buffer */
exit:
- mutex_unlock(&config_mutex);
+ rtnl_unlock();
return rep_tlv_buf;
}
diff --git a/net/tipc/core.c b/net/tipc/core.c
index 50d57429ebc..676d18015dd 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -80,7 +80,6 @@ struct sk_buff *tipc_buf_acquire(u32 size)
*/
static void tipc_core_stop(void)
{
- tipc_handler_stop();
tipc_net_stop();
tipc_bearer_cleanup();
tipc_netlink_stop();
@@ -100,10 +99,6 @@ static int tipc_core_start(void)
get_random_bytes(&tipc_random, sizeof(tipc_random));
- err = tipc_handler_start();
- if (err)
- goto out_handler;
-
err = tipc_ref_table_init(tipc_max_ports, tipc_random);
if (err)
goto out_reftbl;
@@ -146,8 +141,6 @@ out_netlink:
out_nametbl:
tipc_ref_table_stop();
out_reftbl:
- tipc_handler_stop();
-out_handler:
return err;
}
@@ -161,10 +154,11 @@ static int __init tipc_init(void)
tipc_max_ports = CONFIG_TIPC_PORTS;
tipc_net_id = 4711;
- sysctl_tipc_rmem[0] = CONN_OVERLOAD_LIMIT >> 4 << TIPC_LOW_IMPORTANCE;
- sysctl_tipc_rmem[1] = CONN_OVERLOAD_LIMIT >> 4 <<
+ sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
+ TIPC_LOW_IMPORTANCE;
+ sysctl_tipc_rmem[1] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
TIPC_CRITICAL_IMPORTANCE;
- sysctl_tipc_rmem[2] = CONN_OVERLOAD_LIMIT;
+ sysctl_tipc_rmem[2] = TIPC_CONN_OVERLOAD_LIMIT;
res = tipc_core_start();
if (res)
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 8985bbcb942..bb26ed1ee96 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -56,7 +56,8 @@
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
-
+#include <linux/rtnetlink.h>
+#include <linux/etherdevice.h>
#define TIPC_MOD_VER "2.0.0"
@@ -89,8 +90,6 @@ extern int tipc_random __read_mostly;
/*
* Routines available to privileged subsystems
*/
-int tipc_handler_start(void);
-void tipc_handler_stop(void);
int tipc_netlink_start(void);
void tipc_netlink_stop(void);
int tipc_socket_init(void);
@@ -109,12 +108,10 @@ void tipc_unregister_sysctl(void);
#endif
/*
- * TIPC timer and signal code
+ * TIPC timer code
*/
typedef void (*Handler) (unsigned long);
-u32 tipc_k_signal(Handler routine, unsigned long argument);
-
/**
* k_init_timer - initialize a timer
* @timer: pointer to timer structure
@@ -191,6 +188,7 @@ static inline void k_term_timer(struct timer_list *timer)
struct tipc_skb_cb {
void *handle;
bool deferred;
+ struct sk_buff *tail;
};
#define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0]))
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index 542fe3413dc..aa722a42ef8 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -1,7 +1,7 @@
/*
* net/tipc/discover.c
*
- * Copyright (c) 2003-2006, Ericsson AB
+ * Copyright (c) 2003-2006, 2014, Ericsson AB
* Copyright (c) 2005-2006, 2010-2011, Wind River Systems
* All rights reserved.
*
@@ -46,8 +46,9 @@
/**
* struct tipc_link_req - information about an ongoing link setup request
- * @bearer: bearer issuing requests
+ * @bearer_id: identity of bearer issuing requests
* @dest: destination address for request messages
+ * @domain: network domain to which links can be established
* @num_nodes: number of nodes currently discovered (i.e. with an active link)
* @lock: spinlock for controlling access to requests
* @buf: request message to be (repeatedly) sent
@@ -55,8 +56,9 @@
* @timer_intv: current interval between requests (in ms)
*/
struct tipc_link_req {
- struct tipc_bearer *bearer;
+ u32 bearer_id;
struct tipc_media_addr dest;
+ u32 domain;
int num_nodes;
spinlock_t lock;
struct sk_buff *buf;
@@ -69,22 +71,19 @@ struct tipc_link_req {
* @type: message type (request or response)
* @b_ptr: ptr to bearer issuing message
*/
-static struct sk_buff *tipc_disc_init_msg(u32 type, struct tipc_bearer *b_ptr)
+static void tipc_disc_init_msg(struct sk_buff *buf, u32 type,
+ struct tipc_bearer *b_ptr)
{
- struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE);
struct tipc_msg *msg;
u32 dest_domain = b_ptr->domain;
- if (buf) {
- msg = buf_msg(buf);
- tipc_msg_init(msg, LINK_CONFIG, type, INT_H_SIZE, dest_domain);
- msg_set_non_seq(msg, 1);
- msg_set_node_sig(msg, tipc_random);
- msg_set_dest_domain(msg, dest_domain);
- msg_set_bc_netid(msg, tipc_net_id);
- b_ptr->media->addr2msg(&b_ptr->addr, msg_media_addr(msg));
- }
- return buf;
+ msg = buf_msg(buf);
+ tipc_msg_init(msg, LINK_CONFIG, type, INT_H_SIZE, dest_domain);
+ msg_set_non_seq(msg, 1);
+ msg_set_node_sig(msg, tipc_random);
+ msg_set_dest_domain(msg, dest_domain);
+ msg_set_bc_netid(msg, tipc_net_id);
+ b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr);
}
/**
@@ -107,146 +106,150 @@ static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr,
}
/**
- * tipc_disc_rcv - handle incoming link setup message (request or response)
+ * tipc_disc_rcv - handle incoming discovery message (request or response)
* @buf: buffer containing message
- * @b_ptr: bearer that message arrived on
+ * @bearer: bearer that message arrived on
*/
-void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *b_ptr)
+void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *bearer)
{
- struct tipc_node *n_ptr;
+ struct tipc_node *node;
struct tipc_link *link;
- struct tipc_media_addr media_addr;
+ struct tipc_media_addr maddr;
struct sk_buff *rbuf;
struct tipc_msg *msg = buf_msg(buf);
- u32 dest = msg_dest_domain(msg);
- u32 orig = msg_prevnode(msg);
+ u32 ddom = msg_dest_domain(msg);
+ u32 onode = msg_prevnode(msg);
u32 net_id = msg_bc_netid(msg);
- u32 type = msg_type(msg);
+ u32 mtyp = msg_type(msg);
u32 signature = msg_node_sig(msg);
- int addr_mismatch;
- int link_fully_up;
-
- media_addr.broadcast = 1;
- b_ptr->media->msg2addr(b_ptr, &media_addr, msg_media_addr(msg));
+ bool addr_match = false;
+ bool sign_match = false;
+ bool link_up = false;
+ bool accept_addr = false;
+ bool accept_sign = false;
+ bool respond = false;
+
+ bearer->media->msg2addr(bearer, &maddr, msg_media_addr(msg));
kfree_skb(buf);
/* Ensure message from node is valid and communication is permitted */
if (net_id != tipc_net_id)
return;
- if (media_addr.broadcast)
+ if (maddr.broadcast)
return;
- if (!tipc_addr_domain_valid(dest))
+ if (!tipc_addr_domain_valid(ddom))
return;
- if (!tipc_addr_node_valid(orig))
+ if (!tipc_addr_node_valid(onode))
return;
- if (orig == tipc_own_addr) {
- if (memcmp(&media_addr, &b_ptr->addr, sizeof(media_addr)))
- disc_dupl_alert(b_ptr, tipc_own_addr, &media_addr);
+
+ if (in_own_node(onode)) {
+ if (memcmp(&maddr, &bearer->addr, sizeof(maddr)))
+ disc_dupl_alert(bearer, tipc_own_addr, &maddr);
return;
}
- if (!tipc_in_scope(dest, tipc_own_addr))
+ if (!tipc_in_scope(ddom, tipc_own_addr))
return;
- if (!tipc_in_scope(b_ptr->domain, orig))
+ if (!tipc_in_scope(bearer->domain, onode))
return;
- /* Locate structure corresponding to requesting node */
- n_ptr = tipc_node_find(orig);
- if (!n_ptr) {
- n_ptr = tipc_node_create(orig);
- if (!n_ptr)
- return;
- }
- tipc_node_lock(n_ptr);
+ /* Locate, or if necessary, create, node: */
+ node = tipc_node_find(onode);
+ if (!node)
+ node = tipc_node_create(onode);
+ if (!node)
+ return;
- /* Prepare to validate requesting node's signature and media address */
- link = n_ptr->links[b_ptr->identity];
- addr_mismatch = (link != NULL) &&
- memcmp(&link->media_addr, &media_addr, sizeof(media_addr));
+ tipc_node_lock(node);
+ link = node->links[bearer->identity];
- /*
- * Ensure discovery message's signature is correct
- *
- * If signature is incorrect and there is no working link to the node,
- * accept the new signature but invalidate all existing links to the
- * node so they won't re-activate without a new discovery message.
- *
- * If signature is incorrect and the requested link to the node is
- * working, accept the new signature. (This is an instance of delayed
- * rediscovery, where a link endpoint was able to re-establish contact
- * with its peer endpoint on a node that rebooted before receiving a
- * discovery message from that node.)
- *
- * If signature is incorrect and there is a working link to the node
- * that is not the requested link, reject the request (must be from
- * a duplicate node).
- */
- if (signature != n_ptr->signature) {
- if (n_ptr->working_links == 0) {
- struct tipc_link *curr_link;
- int i;
-
- for (i = 0; i < MAX_BEARERS; i++) {
- curr_link = n_ptr->links[i];
- if (curr_link) {
- memset(&curr_link->media_addr, 0,
- sizeof(media_addr));
- tipc_link_reset(curr_link);
- }
- }
- addr_mismatch = (link != NULL);
- } else if (tipc_link_is_up(link) && !addr_mismatch) {
- /* delayed rediscovery */
- } else {
- disc_dupl_alert(b_ptr, orig, &media_addr);
- tipc_node_unlock(n_ptr);
- return;
- }
- n_ptr->signature = signature;
+ /* Prepare to validate requesting node's signature and media address */
+ sign_match = (signature == node->signature);
+ addr_match = link && !memcmp(&link->media_addr, &maddr, sizeof(maddr));
+ link_up = link && tipc_link_is_up(link);
+
+
+ /* These three flags give us eight permutations: */
+
+ if (sign_match && addr_match && link_up) {
+ /* All is fine. Do nothing. */
+ } else if (sign_match && addr_match && !link_up) {
+ /* Respond. The link will come up in due time */
+ respond = true;
+ } else if (sign_match && !addr_match && link_up) {
+ /* Peer has changed i/f address without rebooting.
+ * If so, the link will reset soon, and the next
+ * discovery will be accepted. So we can ignore it.
+ * It may also be an cloned or malicious peer having
+ * chosen the same node address and signature as an
+ * existing one.
+ * Ignore requests until the link goes down, if ever.
+ */
+ disc_dupl_alert(bearer, onode, &maddr);
+ } else if (sign_match && !addr_match && !link_up) {
+ /* Peer link has changed i/f address without rebooting.
+ * It may also be a cloned or malicious peer; we can't
+ * distinguish between the two.
+ * The signature is correct, so we must accept.
+ */
+ accept_addr = true;
+ respond = true;
+ } else if (!sign_match && addr_match && link_up) {
+ /* Peer node rebooted. Two possibilities:
+ * - Delayed re-discovery; this link endpoint has already
+ * reset and re-established contact with the peer, before
+ * receiving a discovery message from that node.
+ * (The peer happened to receive one from this node first).
+ * - The peer came back so fast that our side has not
+ * discovered it yet. Probing from this side will soon
+ * reset the link, since there can be no working link
+ * endpoint at the peer end, and the link will re-establish.
+ * Accept the signature, since it comes from a known peer.
+ */
+ accept_sign = true;
+ } else if (!sign_match && addr_match && !link_up) {
+ /* The peer node has rebooted.
+ * Accept signature, since it is a known peer.
+ */
+ accept_sign = true;
+ respond = true;
+ } else if (!sign_match && !addr_match && link_up) {
+ /* Peer rebooted with new address, or a new/duplicate peer.
+ * Ignore until the link goes down, if ever.
+ */
+ disc_dupl_alert(bearer, onode, &maddr);
+ } else if (!sign_match && !addr_match && !link_up) {
+ /* Peer rebooted with new address, or it is a new peer.
+ * Accept signature and address.
+ */
+ accept_sign = true;
+ accept_addr = true;
+ respond = true;
}
- /*
- * Ensure requesting node's media address is correct
- *
- * If media address doesn't match and the link is working, reject the
- * request (must be from a duplicate node).
- *
- * If media address doesn't match and the link is not working, accept
- * the new media address and reset the link to ensure it starts up
- * cleanly.
- */
- if (addr_mismatch) {
- if (tipc_link_is_up(link)) {
- disc_dupl_alert(b_ptr, orig, &media_addr);
- tipc_node_unlock(n_ptr);
- return;
- } else {
- memcpy(&link->media_addr, &media_addr,
- sizeof(media_addr));
- tipc_link_reset(link);
- }
- }
+ if (accept_sign)
+ node->signature = signature;
- /* Create a link endpoint for this bearer, if necessary */
- if (!link) {
- link = tipc_link_create(n_ptr, b_ptr, &media_addr);
- if (!link) {
- tipc_node_unlock(n_ptr);
- return;
+ if (accept_addr) {
+ if (!link)
+ link = tipc_link_create(node, bearer, &maddr);
+ if (link) {
+ memcpy(&link->media_addr, &maddr, sizeof(maddr));
+ tipc_link_reset(link);
+ } else {
+ respond = false;
}
}
- /* Accept discovery message & send response, if necessary */
- link_fully_up = link_working_working(link);
-
- if ((type == DSC_REQ_MSG) && !link_fully_up) {
- rbuf = tipc_disc_init_msg(DSC_RESP_MSG, b_ptr);
+ /* Send response, if necessary */
+ if (respond && (mtyp == DSC_REQ_MSG)) {
+ rbuf = tipc_buf_acquire(INT_H_SIZE);
if (rbuf) {
- tipc_bearer_send(b_ptr, rbuf, &media_addr);
+ tipc_disc_init_msg(rbuf, DSC_RESP_MSG, bearer);
+ tipc_bearer_send(bearer->identity, rbuf, &maddr);
kfree_skb(rbuf);
}
}
-
- tipc_node_unlock(n_ptr);
+ tipc_node_unlock(node);
}
/**
@@ -303,7 +306,7 @@ static void disc_timeout(struct tipc_link_req *req)
spin_lock_bh(&req->lock);
/* Stop searching if only desired node has been found */
- if (tipc_node(req->bearer->domain) && req->num_nodes) {
+ if (tipc_node(req->domain) && req->num_nodes) {
req->timer_intv = TIPC_LINK_REQ_INACTIVE;
goto exit;
}
@@ -315,7 +318,7 @@ static void disc_timeout(struct tipc_link_req *req)
* hold at fast polling rate if don't have any associated nodes,
* otherwise hold at slow polling rate
*/
- tipc_bearer_send(req->bearer, req->buf, &req->dest);
+ tipc_bearer_send(req->bearer_id, req->buf, &req->dest);
req->timer_intv *= 2;
@@ -347,21 +350,23 @@ int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest)
if (!req)
return -ENOMEM;
- req->buf = tipc_disc_init_msg(DSC_REQ_MSG, b_ptr);
+ req->buf = tipc_buf_acquire(INT_H_SIZE);
if (!req->buf) {
kfree(req);
- return -ENOMSG;
+ return -ENOMEM;
}
+ tipc_disc_init_msg(req->buf, DSC_REQ_MSG, b_ptr);
memcpy(&req->dest, dest, sizeof(*dest));
- req->bearer = b_ptr;
+ req->bearer_id = b_ptr->identity;
+ req->domain = b_ptr->domain;
req->num_nodes = 0;
req->timer_intv = TIPC_LINK_REQ_INIT;
spin_lock_init(&req->lock);
k_init_timer(&req->timer, (Handler)disc_timeout, (unsigned long)req);
k_start_timer(&req->timer, req->timer_intv);
b_ptr->link_req = req;
- tipc_bearer_send(req->bearer, req->buf, &req->dest);
+ tipc_bearer_send(req->bearer_id, req->buf, &req->dest);
return 0;
}
@@ -376,3 +381,23 @@ void tipc_disc_delete(struct tipc_link_req *req)
kfree_skb(req->buf);
kfree(req);
}
+
+/**
+ * tipc_disc_reset - reset object to send periodic link setup requests
+ * @b_ptr: ptr to bearer issuing requests
+ * @dest_domain: network domain to which links can be established
+ */
+void tipc_disc_reset(struct tipc_bearer *b_ptr)
+{
+ struct tipc_link_req *req = b_ptr->link_req;
+
+ spin_lock_bh(&req->lock);
+ tipc_disc_init_msg(req->buf, DSC_REQ_MSG, b_ptr);
+ req->bearer_id = b_ptr->identity;
+ req->domain = b_ptr->domain;
+ req->num_nodes = 0;
+ req->timer_intv = TIPC_LINK_REQ_INIT;
+ k_start_timer(&req->timer, req->timer_intv);
+ tipc_bearer_send(req->bearer_id, req->buf, &req->dest);
+ spin_unlock_bh(&req->lock);
+}
diff --git a/net/tipc/discover.h b/net/tipc/discover.h
index 07f34729459..515b57392f4 100644
--- a/net/tipc/discover.h
+++ b/net/tipc/discover.h
@@ -41,6 +41,7 @@ struct tipc_link_req;
int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest);
void tipc_disc_delete(struct tipc_link_req *req);
+void tipc_disc_reset(struct tipc_bearer *b_ptr);
void tipc_disc_add_dest(struct tipc_link_req *req);
void tipc_disc_remove_dest(struct tipc_link_req *req);
void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *b_ptr);
diff --git a/net/tipc/eth_media.c b/net/tipc/eth_media.c
index 67cf3f935db..5e1426f1751 100644
--- a/net/tipc/eth_media.c
+++ b/net/tipc/eth_media.c
@@ -1,7 +1,7 @@
/*
* net/tipc/eth_media.c: Ethernet bearer support for TIPC
*
- * Copyright (c) 2001-2007, 2013, Ericsson AB
+ * Copyright (c) 2001-2007, 2013-2014, Ericsson AB
* Copyright (c) 2005-2008, 2011-2013, Wind River Systems
* All rights reserved.
*
@@ -37,39 +37,52 @@
#include "core.h"
#include "bearer.h"
-#define ETH_ADDR_OFFSET 4 /* message header offset of MAC address */
+#define ETH_ADDR_OFFSET 4 /* MAC addr position inside address field */
-/* convert Ethernet address to string */
-static int tipc_eth_addr2str(struct tipc_media_addr *a, char *str_buf,
- int str_size)
+/* Convert Ethernet address (media address format) to string */
+static int tipc_eth_addr2str(struct tipc_media_addr *addr,
+ char *strbuf, int bufsz)
{
- if (str_size < 18) /* 18 = strlen("aa:bb:cc:dd:ee:ff\0") */
+ if (bufsz < 18) /* 18 = strlen("aa:bb:cc:dd:ee:ff\0") */
return 1;
- sprintf(str_buf, "%pM", a->value);
+ sprintf(strbuf, "%pM", addr->value);
return 0;
}
-/* convert Ethernet address format to message header format */
-static int tipc_eth_addr2msg(struct tipc_media_addr *a, char *msg_area)
+/* Convert from media address format to discovery message addr format */
+static int tipc_eth_addr2msg(char *msg, struct tipc_media_addr *addr)
{
- memset(msg_area, 0, TIPC_MEDIA_ADDR_SIZE);
- msg_area[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_ETH;
- memcpy(msg_area + ETH_ADDR_OFFSET, a->value, ETH_ALEN);
+ memset(msg, 0, TIPC_MEDIA_ADDR_SIZE);
+ msg[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_ETH;
+ memcpy(msg + ETH_ADDR_OFFSET, addr->value, ETH_ALEN);
return 0;
}
-/* convert message header address format to Ethernet format */
-static int tipc_eth_msg2addr(const struct tipc_bearer *tb_ptr,
- struct tipc_media_addr *a, char *msg_area)
+/* Convert raw mac address format to media addr format */
+static int tipc_eth_raw2addr(struct tipc_bearer *b,
+ struct tipc_media_addr *addr,
+ char *msg)
{
- if (msg_area[TIPC_MEDIA_TYPE_OFFSET] != TIPC_MEDIA_TYPE_ETH)
- return 1;
+ char bcast_mac[ETH_ALEN] = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
- tipc_l2_media_addr_set(tb_ptr, a, msg_area + ETH_ADDR_OFFSET);
+ memset(addr, 0, sizeof(*addr));
+ ether_addr_copy(addr->value, msg);
+ addr->media_id = TIPC_MEDIA_TYPE_ETH;
+ addr->broadcast = !memcmp(addr->value, bcast_mac, ETH_ALEN);
return 0;
}
+/* Convert discovery msg addr format to Ethernet media addr format */
+static int tipc_eth_msg2addr(struct tipc_bearer *b,
+ struct tipc_media_addr *addr,
+ char *msg)
+{
+ /* Skip past preamble: */
+ msg += ETH_ADDR_OFFSET;
+ return tipc_eth_raw2addr(b, addr, msg);
+}
+
/* Ethernet media registration info */
struct tipc_media eth_media_info = {
.send_msg = tipc_l2_send_msg,
@@ -78,6 +91,7 @@ struct tipc_media eth_media_info = {
.addr2str = tipc_eth_addr2str,
.addr2msg = tipc_eth_addr2msg,
.msg2addr = tipc_eth_msg2addr,
+ .raw2addr = tipc_eth_raw2addr,
.priority = TIPC_DEF_LINK_PRI,
.tolerance = TIPC_DEF_LINK_TOL,
.window = TIPC_DEF_LINK_WIN,
@@ -85,4 +99,3 @@ struct tipc_media eth_media_info = {
.hwaddr_len = ETH_ALEN,
.name = "eth"
};
-
diff --git a/net/tipc/handler.c b/net/tipc/handler.c
deleted file mode 100644
index 1fabf160501..00000000000
--- a/net/tipc/handler.c
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * net/tipc/handler.c: TIPC signal handling
- *
- * Copyright (c) 2000-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * 3. Neither the names of the copyright holders nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * Alternatively, this software may be distributed under the terms of the
- * GNU General Public License ("GPL") version 2 as published by the Free
- * Software Foundation.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "core.h"
-
-struct queue_item {
- struct list_head next_signal;
- void (*handler) (unsigned long);
- unsigned long data;
-};
-
-static struct kmem_cache *tipc_queue_item_cache;
-static struct list_head signal_queue_head;
-static DEFINE_SPINLOCK(qitem_lock);
-static int handler_enabled __read_mostly;
-
-static void process_signal_queue(unsigned long dummy);
-
-static DECLARE_TASKLET_DISABLED(tipc_tasklet, process_signal_queue, 0);
-
-
-unsigned int tipc_k_signal(Handler routine, unsigned long argument)
-{
- struct queue_item *item;
-
- spin_lock_bh(&qitem_lock);
- if (!handler_enabled) {
- spin_unlock_bh(&qitem_lock);
- return -ENOPROTOOPT;
- }
-
- item = kmem_cache_alloc(tipc_queue_item_cache, GFP_ATOMIC);
- if (!item) {
- pr_err("Signal queue out of memory\n");
- spin_unlock_bh(&qitem_lock);
- return -ENOMEM;
- }
- item->handler = routine;
- item->data = argument;
- list_add_tail(&item->next_signal, &signal_queue_head);
- spin_unlock_bh(&qitem_lock);
- tasklet_schedule(&tipc_tasklet);
- return 0;
-}
-
-static void process_signal_queue(unsigned long dummy)
-{
- struct queue_item *__volatile__ item;
- struct list_head *l, *n;
-
- spin_lock_bh(&qitem_lock);
- list_for_each_safe(l, n, &signal_queue_head) {
- item = list_entry(l, struct queue_item, next_signal);
- list_del(&item->next_signal);
- spin_unlock_bh(&qitem_lock);
- item->handler(item->data);
- spin_lock_bh(&qitem_lock);
- kmem_cache_free(tipc_queue_item_cache, item);
- }
- spin_unlock_bh(&qitem_lock);
-}
-
-int tipc_handler_start(void)
-{
- tipc_queue_item_cache =
- kmem_cache_create("tipc_queue_items", sizeof(struct queue_item),
- 0, SLAB_HWCACHE_ALIGN, NULL);
- if (!tipc_queue_item_cache)
- return -ENOMEM;
-
- INIT_LIST_HEAD(&signal_queue_head);
- tasklet_enable(&tipc_tasklet);
- handler_enabled = 1;
- return 0;
-}
-
-void tipc_handler_stop(void)
-{
- struct list_head *l, *n;
- struct queue_item *item;
-
- spin_lock_bh(&qitem_lock);
- if (!handler_enabled) {
- spin_unlock_bh(&qitem_lock);
- return;
- }
- handler_enabled = 0;
- spin_unlock_bh(&qitem_lock);
-
- tasklet_kill(&tipc_tasklet);
-
- spin_lock_bh(&qitem_lock);
- list_for_each_safe(l, n, &signal_queue_head) {
- item = list_entry(l, struct queue_item, next_signal);
- list_del(&item->next_signal);
- kmem_cache_free(tipc_queue_item_cache, item);
- }
- spin_unlock_bh(&qitem_lock);
-
- kmem_cache_destroy(tipc_queue_item_cache);
-}
diff --git a/net/tipc/ib_media.c b/net/tipc/ib_media.c
index 844a77e2582..8522eef9c13 100644
--- a/net/tipc/ib_media.c
+++ b/net/tipc/ib_media.c
@@ -42,7 +42,7 @@
#include "core.h"
#include "bearer.h"
-/* convert InfiniBand address to string */
+/* convert InfiniBand address (media address format) media address to string */
static int tipc_ib_addr2str(struct tipc_media_addr *a, char *str_buf,
int str_size)
{
@@ -54,23 +54,35 @@ static int tipc_ib_addr2str(struct tipc_media_addr *a, char *str_buf,
return 0;
}
-/* convert InfiniBand address format to message header format */
-static int tipc_ib_addr2msg(struct tipc_media_addr *a, char *msg_area)
+/* Convert from media address format to discovery message addr format */
+static int tipc_ib_addr2msg(char *msg, struct tipc_media_addr *addr)
{
- memset(msg_area, 0, TIPC_MEDIA_ADDR_SIZE);
- msg_area[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_IB;
- memcpy(msg_area, a->value, INFINIBAND_ALEN);
+ memset(msg, 0, TIPC_MEDIA_ADDR_SIZE);
+ memcpy(msg, addr->value, INFINIBAND_ALEN);
return 0;
}
-/* convert message header address format to InfiniBand format */
-static int tipc_ib_msg2addr(const struct tipc_bearer *tb_ptr,
- struct tipc_media_addr *a, char *msg_area)
+/* Convert raw InfiniBand address format to media addr format */
+static int tipc_ib_raw2addr(struct tipc_bearer *b,
+ struct tipc_media_addr *addr,
+ char *msg)
{
- tipc_l2_media_addr_set(tb_ptr, a, msg_area);
+ memset(addr, 0, sizeof(*addr));
+ memcpy(addr->value, msg, INFINIBAND_ALEN);
+ addr->media_id = TIPC_MEDIA_TYPE_IB;
+ addr->broadcast = !memcmp(msg, b->bcast_addr.value,
+ INFINIBAND_ALEN);
return 0;
}
+/* Convert discovery msg addr format to InfiniBand media addr format */
+static int tipc_ib_msg2addr(struct tipc_bearer *b,
+ struct tipc_media_addr *addr,
+ char *msg)
+{
+ return tipc_ib_raw2addr(b, addr, msg);
+}
+
/* InfiniBand media registration info */
struct tipc_media ib_media_info = {
.send_msg = tipc_l2_send_msg,
@@ -79,6 +91,7 @@ struct tipc_media ib_media_info = {
.addr2str = tipc_ib_addr2str,
.addr2msg = tipc_ib_addr2msg,
.msg2addr = tipc_ib_msg2addr,
+ .raw2addr = tipc_ib_raw2addr,
.priority = TIPC_DEF_LINK_PRI,
.tolerance = TIPC_DEF_LINK_TOL,
.window = TIPC_DEF_LINK_WIN,
@@ -86,4 +99,3 @@ struct tipc_media ib_media_info = {
.hwaddr_len = INFINIBAND_ALEN,
.name = "ib"
};
-
diff --git a/net/tipc/link.c b/net/tipc/link.c
index c5190ab7529..fb1485dc673 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -37,6 +37,7 @@
#include "core.h"
#include "link.h"
#include "port.h"
+#include "socket.h"
#include "name_distr.h"
#include "discover.h"
#include "config.h"
@@ -81,15 +82,13 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf);
static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
struct sk_buff **buf);
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
-static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
- struct iovec const *msg_sect,
- unsigned int len, u32 destnode);
static void link_state_event(struct tipc_link *l_ptr, u32 event);
static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
-static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
static void tipc_link_sync_xmit(struct tipc_link *l);
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
+static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf);
+static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf);
/*
* Simple link routines
@@ -101,9 +100,18 @@ static unsigned int align(unsigned int i)
static void link_init_max_pkt(struct tipc_link *l_ptr)
{
+ struct tipc_bearer *b_ptr;
u32 max_pkt;
- max_pkt = (l_ptr->b_ptr->mtu & ~3);
+ rcu_read_lock();
+ b_ptr = rcu_dereference_rtnl(bearer_list[l_ptr->bearer_id]);
+ if (!b_ptr) {
+ rcu_read_unlock();
+ return;
+ }
+ max_pkt = (b_ptr->mtu & ~3);
+ rcu_read_unlock();
+
if (max_pkt > MAX_MSG_SIZE)
max_pkt = MAX_MSG_SIZE;
@@ -248,7 +256,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
l_ptr->owner = n_ptr;
l_ptr->checkpoint = 1;
l_ptr->peer_session = INVALID_SESSION;
- l_ptr->b_ptr = b_ptr;
+ l_ptr->bearer_id = b_ptr->identity;
link_set_supervision_props(l_ptr, b_ptr->tolerance);
l_ptr->state = RESET_UNKNOWN;
@@ -263,6 +271,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
l_ptr->priority = b_ptr->priority;
tipc_link_set_queue_limits(l_ptr, b_ptr->window);
+ l_ptr->net_plane = b_ptr->net_plane;
link_init_max_pkt(l_ptr);
l_ptr->next_out_no = 1;
@@ -287,14 +296,14 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
rcu_read_lock();
list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
- spin_lock_bh(&n_ptr->lock);
+ tipc_node_lock(n_ptr);
l_ptr = n_ptr->links[bearer_id];
if (l_ptr) {
tipc_link_reset(l_ptr);
if (shutting_down || !tipc_node_is_up(n_ptr)) {
tipc_node_detach_link(l_ptr->owner, l_ptr);
tipc_link_reset_fragments(l_ptr);
- spin_unlock_bh(&n_ptr->lock);
+ tipc_node_unlock(n_ptr);
/* Nobody else can access this link now: */
del_timer_sync(&l_ptr->timer);
@@ -302,12 +311,12 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
} else {
/* Detach/delete when failover is finished: */
l_ptr->flags |= LINK_STOPPED;
- spin_unlock_bh(&n_ptr->lock);
+ tipc_node_unlock(n_ptr);
del_timer_sync(&l_ptr->timer);
}
continue;
}
- spin_unlock_bh(&n_ptr->lock);
+ tipc_node_unlock(n_ptr);
}
rcu_read_unlock();
}
@@ -324,13 +333,15 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
{
struct tipc_port *p_ptr;
+ struct tipc_sock *tsk;
spin_lock_bh(&tipc_port_list_lock);
p_ptr = tipc_port_lock(origport);
if (p_ptr) {
if (!list_empty(&p_ptr->wait_list))
goto exit;
- p_ptr->congested = 1;
+ tsk = tipc_port_to_sock(p_ptr);
+ tsk->link_cong = 1;
p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
l_ptr->stats.link_congs++;
@@ -344,6 +355,7 @@ exit:
void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
{
struct tipc_port *p_ptr;
+ struct tipc_sock *tsk;
struct tipc_port *temp_p_ptr;
int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
@@ -359,10 +371,11 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
wait_list) {
if (win <= 0)
break;
+ tsk = tipc_port_to_sock(p_ptr);
list_del_init(&p_ptr->wait_list);
spin_lock_bh(p_ptr->lock);
- p_ptr->congested = 0;
- tipc_port_wakeup(p_ptr);
+ tsk->link_cong = 0;
+ tipc_sock_wakeup(tsk);
win -= p_ptr->waiting_pkts;
spin_unlock_bh(p_ptr->lock);
}
@@ -388,9 +401,8 @@ static void link_release_outqueue(struct tipc_link *l_ptr)
*/
void tipc_link_reset_fragments(struct tipc_link *l_ptr)
{
- kfree_skb(l_ptr->reasm_head);
- l_ptr->reasm_head = NULL;
- l_ptr->reasm_tail = NULL;
+ kfree_skb(l_ptr->reasm_buf);
+ l_ptr->reasm_buf = NULL;
}
/**
@@ -426,7 +438,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
return;
tipc_node_link_down(l_ptr->owner, l_ptr);
- tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
+ tipc_bearer_remove_dest(l_ptr->bearer_id, l_ptr->addr);
if (was_active_link && tipc_node_active_links(l_ptr->owner)) {
l_ptr->reset_checkpoint = checkpoint;
@@ -464,11 +476,11 @@ void tipc_link_reset_list(unsigned int bearer_id)
rcu_read_lock();
list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
- spin_lock_bh(&n_ptr->lock);
+ tipc_node_lock(n_ptr);
l_ptr = n_ptr->links[bearer_id];
if (l_ptr)
tipc_link_reset(l_ptr);
- spin_unlock_bh(&n_ptr->lock);
+ tipc_node_unlock(n_ptr);
}
rcu_read_unlock();
}
@@ -477,7 +489,7 @@ static void link_activate(struct tipc_link *l_ptr)
{
l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
tipc_node_link_up(l_ptr->owner, l_ptr);
- tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
+ tipc_bearer_add_dest(l_ptr->bearer_id, l_ptr->addr);
}
/**
@@ -666,180 +678,142 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
}
}
-/*
- * link_bundle_buf(): Append contents of a buffer to
- * the tail of an existing one.
+/* tipc_link_cong: determine return value and how to treat the
+ * sent buffer during link congestion.
+ * - For plain, errorless user data messages we keep the buffer and
+ * return -ELINKONG.
+ * - For all other messages we discard the buffer and return -EHOSTUNREACH
+ * - For TIPC internal messages we also reset the link
*/
-static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler,
- struct sk_buff *buf)
+static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
{
- struct tipc_msg *bundler_msg = buf_msg(bundler);
struct tipc_msg *msg = buf_msg(buf);
- u32 size = msg_size(msg);
- u32 bundle_size = msg_size(bundler_msg);
- u32 to_pos = align(bundle_size);
- u32 pad = to_pos - bundle_size;
-
- if (msg_user(bundler_msg) != MSG_BUNDLER)
- return 0;
- if (msg_type(bundler_msg) != OPEN_MSG)
- return 0;
- if (skb_tailroom(bundler) < (pad + size))
- return 0;
- if (l_ptr->max_pkt < (to_pos + size))
- return 0;
-
- skb_put(bundler, pad + size);
- skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
- msg_set_size(bundler_msg, to_pos + size);
- msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
- kfree_skb(buf);
- l_ptr->stats.sent_bundled++;
- return 1;
-}
-
-static void link_add_to_outqueue(struct tipc_link *l_ptr,
- struct sk_buff *buf,
- struct tipc_msg *msg)
-{
- u32 ack = mod(l_ptr->next_in_no - 1);
- u32 seqno = mod(l_ptr->next_out_no++);
-
- msg_set_word(msg, 2, ((ack << 16) | seqno));
- msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
- buf->next = NULL;
- if (l_ptr->first_out) {
- l_ptr->last_out->next = buf;
- l_ptr->last_out = buf;
- } else
- l_ptr->first_out = l_ptr->last_out = buf;
-
- l_ptr->out_queue_size++;
- if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
- l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
-}
-
-static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
- struct sk_buff *buf_chain,
- u32 long_msgno)
-{
- struct sk_buff *buf;
- struct tipc_msg *msg;
+ uint psz = msg_size(msg);
+ uint imp = tipc_msg_tot_importance(msg);
+ u32 oport = msg_tot_origport(msg);
- if (!l_ptr->next_out)
- l_ptr->next_out = buf_chain;
- while (buf_chain) {
- buf = buf_chain;
- buf_chain = buf_chain->next;
-
- msg = buf_msg(buf);
- msg_set_long_msgno(msg, long_msgno);
- link_add_to_outqueue(l_ptr, buf, msg);
+ if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
+ if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
+ link_schedule_port(link, oport, psz);
+ return -ELINKCONG;
+ }
+ } else {
+ pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
+ tipc_link_reset(link);
}
+ kfree_skb_list(buf);
+ return -EHOSTUNREACH;
}
-/*
- * tipc_link_xmit() is the 'full path' for messages, called from
- * inside TIPC when the 'fast path' in tipc_send_xmit
- * has failed, and from link_send()
+/**
+ * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
+ * @link: link to use
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
+ * user data messages) or -EHOSTUNREACH (all other messages/senders)
+ * Only the socket functions tipc_send_stream() and tipc_send_packet() need
+ * to act on the return value, since they may need to do more send attempts.
*/
-int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
- u32 size = msg_size(msg);
- u32 dsz = msg_data_sz(msg);
- u32 queue_size = l_ptr->out_queue_size;
- u32 imp = tipc_msg_tot_importance(msg);
- u32 queue_limit = l_ptr->queue_limit[imp];
- u32 max_packet = l_ptr->max_pkt;
-
- /* Match msg importance against queue limits: */
- if (unlikely(queue_size >= queue_limit)) {
- if (imp <= TIPC_CRITICAL_IMPORTANCE) {
- link_schedule_port(l_ptr, msg_origport(msg), size);
- kfree_skb(buf);
- return -ELINKCONG;
- }
- kfree_skb(buf);
- if (imp > CONN_MANAGER) {
- pr_warn("%s<%s>, send queue full", link_rst_msg,
- l_ptr->name);
- tipc_link_reset(l_ptr);
+ uint psz = msg_size(msg);
+ uint qsz = link->out_queue_size;
+ uint sndlim = link->queue_limit[0];
+ uint imp = tipc_msg_tot_importance(msg);
+ uint mtu = link->max_pkt;
+ uint ack = mod(link->next_in_no - 1);
+ uint seqno = link->next_out_no;
+ uint bc_last_in = link->owner->bclink.last_in;
+ struct tipc_media_addr *addr = &link->media_addr;
+ struct sk_buff *next = buf->next;
+
+ /* Match queue limits against msg importance: */
+ if (unlikely(qsz >= link->queue_limit[imp]))
+ return tipc_link_cong(link, buf);
+
+ /* Has valid packet limit been used ? */
+ if (unlikely(psz > mtu)) {
+ kfree_skb_list(buf);
+ return -EMSGSIZE;
+ }
+
+ /* Prepare each packet for sending, and add to outqueue: */
+ while (buf) {
+ next = buf->next;
+ msg = buf_msg(buf);
+ msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
+ msg_set_bcast_ack(msg, bc_last_in);
+
+ if (!link->first_out) {
+ link->first_out = buf;
+ } else if (qsz < sndlim) {
+ link->last_out->next = buf;
+ } else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
+ link->stats.sent_bundled++;
+ buf = next;
+ next = buf->next;
+ continue;
+ } else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
+ link->stats.sent_bundled++;
+ link->stats.sent_bundles++;
+ link->last_out->next = buf;
+ if (!link->next_out)
+ link->next_out = buf;
+ } else {
+ link->last_out->next = buf;
+ if (!link->next_out)
+ link->next_out = buf;
}
- return dsz;
- }
- /* Fragmentation needed ? */
- if (size > max_packet)
- return tipc_link_frag_xmit(l_ptr, buf);
-
- /* Packet can be queued or sent. */
- if (likely(!link_congested(l_ptr))) {
- link_add_to_outqueue(l_ptr, buf, msg);
-
- tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
- l_ptr->unacked_window = 0;
- return dsz;
- }
- /* Congestion: can message be bundled ? */
- if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
- (msg_user(msg) != MSG_FRAGMENTER)) {
-
- /* Try adding message to an existing bundle */
- if (l_ptr->next_out &&
- link_bundle_buf(l_ptr, l_ptr->last_out, buf))
- return dsz;
-
- /* Try creating a new bundle */
- if (size <= max_packet * 2 / 3) {
- struct sk_buff *bundler = tipc_buf_acquire(max_packet);
- struct tipc_msg bundler_hdr;
-
- if (bundler) {
- tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
- INT_H_SIZE, l_ptr->addr);
- skb_copy_to_linear_data(bundler, &bundler_hdr,
- INT_H_SIZE);
- skb_trim(bundler, INT_H_SIZE);
- link_bundle_buf(l_ptr, bundler, buf);
- buf = bundler;
- msg = buf_msg(buf);
- l_ptr->stats.sent_bundles++;
- }
+ /* Send packet if possible: */
+ if (likely(++qsz <= sndlim)) {
+ tipc_bearer_send(link->bearer_id, buf, addr);
+ link->next_out = next;
+ link->unacked_window = 0;
}
+ seqno++;
+ link->last_out = buf;
+ buf = next;
}
- if (!l_ptr->next_out)
- l_ptr->next_out = buf;
- link_add_to_outqueue(l_ptr, buf, msg);
- return dsz;
+ link->next_out_no = seqno;
+ link->out_queue_size = qsz;
+ return 0;
}
-/*
- * tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use
- * has not been selected yet, and the the owner node is not locked
- * Called by TIPC internal users, e.g. the name distributor
+/**
+ * tipc_link_xmit() is the general link level function for message sending
+ * @buf: chain of buffers containing message
+ * @dsz: amount of user data to be sent
+ * @dnode: address of destination node
+ * @selector: a number used for deterministic link selection
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
-int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
+int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
{
- struct tipc_link *l_ptr;
- struct tipc_node *n_ptr;
- int res = -ELINKCONG;
+ struct tipc_link *link = NULL;
+ struct tipc_node *node;
+ int rc = -EHOSTUNREACH;
- read_lock_bh(&tipc_net_lock);
- n_ptr = tipc_node_find(dest);
- if (n_ptr) {
- tipc_node_lock(n_ptr);
- l_ptr = n_ptr->active_links[selector & 1];
- if (l_ptr)
- res = __tipc_link_xmit(l_ptr, buf);
- else
- kfree_skb(buf);
- tipc_node_unlock(n_ptr);
- } else {
- kfree_skb(buf);
+ node = tipc_node_find(dnode);
+ if (node) {
+ tipc_node_lock(node);
+ link = node->active_links[selector & 1];
+ if (link)
+ rc = __tipc_link_xmit(link, buf);
+ tipc_node_unlock(node);
}
- read_unlock_bh(&tipc_net_lock);
- return res;
+
+ if (link)
+ return rc;
+
+ if (likely(in_own_node(dnode)))
+ return tipc_sk_rcv(buf);
+
+ kfree_skb_list(buf);
+ return rc;
}
/*
@@ -850,7 +824,7 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
*
* Called with node locked
*/
-static void tipc_link_sync_xmit(struct tipc_link *l)
+static void tipc_link_sync_xmit(struct tipc_link *link)
{
struct sk_buff *buf;
struct tipc_msg *msg;
@@ -860,10 +834,9 @@ static void tipc_link_sync_xmit(struct tipc_link *l)
return;
msg = buf_msg(buf);
- tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
- msg_set_last_bcast(msg, l->owner->bclink.acked);
- link_add_chain_to_outqueue(l, buf, 0);
- tipc_link_push_queue(l);
+ tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
+ msg_set_last_bcast(msg, link->owner->bclink.acked);
+ __tipc_link_xmit(link, buf);
}
/*
@@ -884,299 +857,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
}
/*
- * tipc_link_names_xmit - send name table entries to new neighbor
- *
- * Send routine for bulk delivery of name table messages when contact
- * with a new neighbor occurs. No link congestion checking is performed
- * because name table messages *must* be delivered. The messages must be
- * small enough not to require fragmentation.
- * Called without any locks held.
- */
-void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
-{
- struct tipc_node *n_ptr;
- struct tipc_link *l_ptr;
- struct sk_buff *buf;
- struct sk_buff *temp_buf;
-
- if (list_empty(message_list))
- return;
-
- read_lock_bh(&tipc_net_lock);
- n_ptr = tipc_node_find(dest);
- if (n_ptr) {
- tipc_node_lock(n_ptr);
- l_ptr = n_ptr->active_links[0];
- if (l_ptr) {
- /* convert circular list to linear list */
- ((struct sk_buff *)message_list->prev)->next = NULL;
- link_add_chain_to_outqueue(l_ptr,
- (struct sk_buff *)message_list->next, 0);
- tipc_link_push_queue(l_ptr);
- INIT_LIST_HEAD(message_list);
- }
- tipc_node_unlock(n_ptr);
- }
- read_unlock_bh(&tipc_net_lock);
-
- /* discard the messages if they couldn't be sent */
- list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
- list_del((struct list_head *)buf);
- kfree_skb(buf);
- }
-}
-
-/*
- * tipc_link_xmit_fast: Entry for data messages where the
- * destination link is known and the header is complete,
- * inclusive total message length. Very time critical.
- * Link is locked. Returns user data length.
- */
-static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
- u32 *used_max_pkt)
-{
- struct tipc_msg *msg = buf_msg(buf);
- int res = msg_data_sz(msg);
-
- if (likely(!link_congested(l_ptr))) {
- if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
- link_add_to_outqueue(l_ptr, buf, msg);
- tipc_bearer_send(l_ptr->b_ptr, buf,
- &l_ptr->media_addr);
- l_ptr->unacked_window = 0;
- return res;
- }
- else
- *used_max_pkt = l_ptr->max_pkt;
- }
- return __tipc_link_xmit(l_ptr, buf); /* All other cases */
-}
-
-/*
- * tipc_link_iovec_xmit_fast: Entry for messages where the
- * destination processor is known and the header is complete,
- * except for total message length.
- * Returns user data length or errno.
- */
-int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
- struct iovec const *msg_sect,
- unsigned int len, u32 destaddr)
-{
- struct tipc_msg *hdr = &sender->phdr;
- struct tipc_link *l_ptr;
- struct sk_buff *buf;
- struct tipc_node *node;
- int res;
- u32 selector = msg_origport(hdr) & 1;
-
-again:
- /*
- * Try building message using port's max_pkt hint.
- * (Must not hold any locks while building message.)
- */
- res = tipc_msg_build(hdr, msg_sect, len, sender->max_pkt, &buf);
- /* Exit if build request was invalid */
- if (unlikely(res < 0))
- return res;
-
- read_lock_bh(&tipc_net_lock);
- node = tipc_node_find(destaddr);
- if (likely(node)) {
- tipc_node_lock(node);
- l_ptr = node->active_links[selector];
- if (likely(l_ptr)) {
- if (likely(buf)) {
- res = tipc_link_xmit_fast(l_ptr, buf,
- &sender->max_pkt);
-exit:
- tipc_node_unlock(node);
- read_unlock_bh(&tipc_net_lock);
- return res;
- }
-
- /* Exit if link (or bearer) is congested */
- if (link_congested(l_ptr)) {
- res = link_schedule_port(l_ptr,
- sender->ref, res);
- goto exit;
- }
-
- /*
- * Message size exceeds max_pkt hint; update hint,
- * then re-try fast path or fragment the message
- */
- sender->max_pkt = l_ptr->max_pkt;
- tipc_node_unlock(node);
- read_unlock_bh(&tipc_net_lock);
-
-
- if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
- goto again;
-
- return tipc_link_iovec_long_xmit(sender, msg_sect,
- len, destaddr);
- }
- tipc_node_unlock(node);
- }
- read_unlock_bh(&tipc_net_lock);
-
- /* Couldn't find a link to the destination node */
- kfree_skb(buf);
- tipc_port_iovec_reject(sender, hdr, msg_sect, len, TIPC_ERR_NO_NODE);
- return -ENETUNREACH;
-}
-
-/*
- * tipc_link_iovec_long_xmit(): Entry for long messages where the
- * destination node is known and the header is complete,
- * inclusive total message length.
- * Link and bearer congestion status have been checked to be ok,
- * and are ignored if they change.
- *
- * Note that fragments do not use the full link MTU so that they won't have
- * to undergo refragmentation if link changeover causes them to be sent
- * over another link with an additional tunnel header added as prefix.
- * (Refragmentation will still occur if the other link has a smaller MTU.)
- *
- * Returns user data length or errno.
- */
-static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
- struct iovec const *msg_sect,
- unsigned int len, u32 destaddr)
-{
- struct tipc_link *l_ptr;
- struct tipc_node *node;
- struct tipc_msg *hdr = &sender->phdr;
- u32 dsz = len;
- u32 max_pkt, fragm_sz, rest;
- struct tipc_msg fragm_hdr;
- struct sk_buff *buf, *buf_chain, *prev;
- u32 fragm_crs, fragm_rest, hsz, sect_rest;
- const unchar __user *sect_crs;
- int curr_sect;
- u32 fragm_no;
- int res = 0;
-
-again:
- fragm_no = 1;
- max_pkt = sender->max_pkt - INT_H_SIZE;
- /* leave room for tunnel header in case of link changeover */
- fragm_sz = max_pkt - INT_H_SIZE;
- /* leave room for fragmentation header in each fragment */
- rest = dsz;
- fragm_crs = 0;
- fragm_rest = 0;
- sect_rest = 0;
- sect_crs = NULL;
- curr_sect = -1;
-
- /* Prepare reusable fragment header */
- tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
- INT_H_SIZE, msg_destnode(hdr));
- msg_set_size(&fragm_hdr, max_pkt);
- msg_set_fragm_no(&fragm_hdr, 1);
-
- /* Prepare header of first fragment */
- buf_chain = buf = tipc_buf_acquire(max_pkt);
- if (!buf)
- return -ENOMEM;
- buf->next = NULL;
- skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
- hsz = msg_hdr_sz(hdr);
- skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
-
- /* Chop up message */
- fragm_crs = INT_H_SIZE + hsz;
- fragm_rest = fragm_sz - hsz;
-
- do { /* For all sections */
- u32 sz;
-
- if (!sect_rest) {
- sect_rest = msg_sect[++curr_sect].iov_len;
- sect_crs = msg_sect[curr_sect].iov_base;
- }
-
- if (sect_rest < fragm_rest)
- sz = sect_rest;
- else
- sz = fragm_rest;
-
- if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
- res = -EFAULT;
-error:
- kfree_skb_list(buf_chain);
- return res;
- }
- sect_crs += sz;
- sect_rest -= sz;
- fragm_crs += sz;
- fragm_rest -= sz;
- rest -= sz;
-
- if (!fragm_rest && rest) {
-
- /* Initiate new fragment: */
- if (rest <= fragm_sz) {
- fragm_sz = rest;
- msg_set_type(&fragm_hdr, LAST_FRAGMENT);
- } else {
- msg_set_type(&fragm_hdr, FRAGMENT);
- }
- msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
- msg_set_fragm_no(&fragm_hdr, ++fragm_no);
- prev = buf;
- buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
- if (!buf) {
- res = -ENOMEM;
- goto error;
- }
-
- buf->next = NULL;
- prev->next = buf;
- skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
- fragm_crs = INT_H_SIZE;
- fragm_rest = fragm_sz;
- }
- } while (rest > 0);
-
- /*
- * Now we have a buffer chain. Select a link and check
- * that packet size is still OK
- */
- node = tipc_node_find(destaddr);
- if (likely(node)) {
- tipc_node_lock(node);
- l_ptr = node->active_links[sender->ref & 1];
- if (!l_ptr) {
- tipc_node_unlock(node);
- goto reject;
- }
- if (l_ptr->max_pkt < max_pkt) {
- sender->max_pkt = l_ptr->max_pkt;
- tipc_node_unlock(node);
- kfree_skb_list(buf_chain);
- goto again;
- }
- } else {
-reject:
- kfree_skb_list(buf_chain);
- tipc_port_iovec_reject(sender, hdr, msg_sect, len,
- TIPC_ERR_NO_NODE);
- return -ENETUNREACH;
- }
-
- /* Append chain of fragments to send queue & send them */
- l_ptr->long_msg_seq_no++;
- link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
- l_ptr->stats.sent_fragments += fragm_no;
- l_ptr->stats.sent_fragmented++;
- tipc_link_push_queue(l_ptr);
- tipc_node_unlock(node);
- return dsz;
-}
-
-/*
* tipc_link_push_packet: Push one unsent packet to the media
*/
static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
@@ -1204,7 +884,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
if (r_q_size && buf) {
msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
- tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
l_ptr->retransm_queue_head = mod(++r_q_head);
l_ptr->retransm_queue_size = --r_q_size;
l_ptr->stats.retransmitted++;
@@ -1216,7 +896,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
if (buf) {
msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
- tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
l_ptr->unacked_window = 0;
kfree_skb(buf);
l_ptr->proto_msg_queue = NULL;
@@ -1233,9 +913,10 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
if (mod(next - first) < l_ptr->queue_limit[0]) {
msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
- tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ tipc_bearer_send(l_ptr->bearer_id, buf,
+ &l_ptr->media_addr);
if (msg_user(msg) == MSG_BUNDLER)
- msg_set_type(msg, CLOSED_MSG);
+ msg_set_type(msg, BUNDLE_CLOSED);
l_ptr->next_out = buf->next;
return 0;
}
@@ -1256,33 +937,24 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)
} while (!res);
}
-static void link_reset_all(unsigned long addr)
+void tipc_link_reset_all(struct tipc_node *node)
{
- struct tipc_node *n_ptr;
char addr_string[16];
u32 i;
- read_lock_bh(&tipc_net_lock);
- n_ptr = tipc_node_find((u32)addr);
- if (!n_ptr) {
- read_unlock_bh(&tipc_net_lock);
- return; /* node no longer exists */
- }
-
- tipc_node_lock(n_ptr);
+ tipc_node_lock(node);
pr_warn("Resetting all links to %s\n",
- tipc_addr_string_fill(addr_string, n_ptr->addr));
+ tipc_addr_string_fill(addr_string, node->addr));
for (i = 0; i < MAX_BEARERS; i++) {
- if (n_ptr->links[i]) {
- link_print(n_ptr->links[i], "Resetting link\n");
- tipc_link_reset(n_ptr->links[i]);
+ if (node->links[i]) {
+ link_print(node->links[i], "Resetting link\n");
+ tipc_link_reset(node->links[i]);
}
}
- tipc_node_unlock(n_ptr);
- read_unlock_bh(&tipc_net_lock);
+ tipc_node_unlock(node);
}
static void link_retransmit_failure(struct tipc_link *l_ptr,
@@ -1319,10 +991,9 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
n_ptr->bclink.oos_state,
n_ptr->bclink.last_sent);
- tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
-
tipc_node_unlock(n_ptr);
+ tipc_bclink_set_flags(TIPC_BCLINK_RESET);
l_ptr->stale_count = 0;
}
}
@@ -1352,7 +1023,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
msg = buf_msg(buf);
msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
- tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
buf = buf->next;
retransmits--;
l_ptr->stats.retransmitted++;
@@ -1440,14 +1111,13 @@ static int link_recv_buf_validate(struct sk_buff *buf)
/**
* tipc_rcv - process TIPC packets/messages arriving from off-node
* @head: pointer to message buffer chain
- * @tb_ptr: pointer to bearer message arrived on
+ * @b_ptr: pointer to bearer message arrived on
*
* Invoked with no locks held. Bearer pointer must point to a valid bearer
* structure (i.e. cannot be NULL), but bearer can be inactive.
*/
void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
{
- read_lock_bh(&tipc_net_lock);
while (head) {
struct tipc_node *n_ptr;
struct tipc_link *l_ptr;
@@ -1497,14 +1167,14 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
goto unlock_discard;
/* Verify that communication with node is currently allowed */
- if ((n_ptr->block_setup & WAIT_PEER_DOWN) &&
- msg_user(msg) == LINK_PROTOCOL &&
- (msg_type(msg) == RESET_MSG ||
- msg_type(msg) == ACTIVATE_MSG) &&
- !msg_redundant_link(msg))
- n_ptr->block_setup &= ~WAIT_PEER_DOWN;
-
- if (n_ptr->block_setup)
+ if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
+ msg_user(msg) == LINK_PROTOCOL &&
+ (msg_type(msg) == RESET_MSG ||
+ msg_type(msg) == ACTIVATE_MSG) &&
+ !msg_redundant_link(msg))
+ n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
+
+ if (tipc_node_blocked(n_ptr))
goto unlock_discard;
/* Validate message sequence number info */
@@ -1535,11 +1205,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
if (unlikely(!list_empty(&l_ptr->waiting_ports)))
tipc_link_wakeup_ports(l_ptr, 0);
- if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
- l_ptr->stats.sent_acks++;
- tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
- }
-
/* Process the incoming packet */
if (unlikely(!link_working_working(l_ptr))) {
if (msg_user(msg) == LINK_PROTOCOL) {
@@ -1573,69 +1238,99 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
if (unlikely(l_ptr->oldest_deferred_in))
head = link_insert_deferred_queue(l_ptr, head);
- /* Deliver packet/message to correct user: */
- if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) {
- if (!tipc_link_tunnel_rcv(n_ptr, &buf)) {
- tipc_node_unlock(n_ptr);
- continue;
- }
- msg = buf_msg(buf);
- } else if (msg_user(msg) == MSG_FRAGMENTER) {
- int rc;
-
- l_ptr->stats.recv_fragments++;
- rc = tipc_link_frag_rcv(&l_ptr->reasm_head,
- &l_ptr->reasm_tail,
- &buf);
- if (rc == LINK_REASM_COMPLETE) {
- l_ptr->stats.recv_fragmented++;
- msg = buf_msg(buf);
- } else {
- if (rc == LINK_REASM_ERROR)
- tipc_link_reset(l_ptr);
- tipc_node_unlock(n_ptr);
- continue;
- }
+ if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
+ l_ptr->stats.sent_acks++;
+ tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
}
- switch (msg_user(msg)) {
- case TIPC_LOW_IMPORTANCE:
- case TIPC_MEDIUM_IMPORTANCE:
- case TIPC_HIGH_IMPORTANCE:
- case TIPC_CRITICAL_IMPORTANCE:
- tipc_node_unlock(n_ptr);
- tipc_port_rcv(buf);
- continue;
- case MSG_BUNDLER:
- l_ptr->stats.recv_bundles++;
- l_ptr->stats.recv_bundled += msg_msgcnt(msg);
- tipc_node_unlock(n_ptr);
- tipc_link_bundle_rcv(buf);
- continue;
- case NAME_DISTRIBUTOR:
- n_ptr->bclink.recv_permitted = true;
- tipc_node_unlock(n_ptr);
- tipc_named_rcv(buf);
- continue;
- case CONN_MANAGER:
+ if (tipc_link_prepare_input(l_ptr, &buf)) {
tipc_node_unlock(n_ptr);
- tipc_port_proto_rcv(buf);
continue;
- case BCAST_PROTOCOL:
- tipc_link_sync_rcv(n_ptr, buf);
- break;
- default:
- kfree_skb(buf);
- break;
}
tipc_node_unlock(n_ptr);
+ msg = buf_msg(buf);
+ if (tipc_link_input(l_ptr, buf) != 0)
+ goto discard;
continue;
unlock_discard:
tipc_node_unlock(n_ptr);
discard:
kfree_skb(buf);
}
- read_unlock_bh(&tipc_net_lock);
+}
+
+/**
+ * tipc_link_prepare_input - process TIPC link messages
+ *
+ * returns nonzero if the message was consumed
+ *
+ * Node lock must be held
+ */
+static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf)
+{
+ struct tipc_node *n;
+ struct tipc_msg *msg;
+ int res = -EINVAL;
+
+ n = l->owner;
+ msg = buf_msg(*buf);
+ switch (msg_user(msg)) {
+ case CHANGEOVER_PROTOCOL:
+ if (tipc_link_tunnel_rcv(n, buf))
+ res = 0;
+ break;
+ case MSG_FRAGMENTER:
+ l->stats.recv_fragments++;
+ if (tipc_buf_append(&l->reasm_buf, buf)) {
+ l->stats.recv_fragmented++;
+ res = 0;
+ } else if (!l->reasm_buf) {
+ tipc_link_reset(l);
+ }
+ break;
+ case MSG_BUNDLER:
+ l->stats.recv_bundles++;
+ l->stats.recv_bundled += msg_msgcnt(msg);
+ res = 0;
+ break;
+ case NAME_DISTRIBUTOR:
+ n->bclink.recv_permitted = true;
+ res = 0;
+ break;
+ case BCAST_PROTOCOL:
+ tipc_link_sync_rcv(n, *buf);
+ break;
+ default:
+ res = 0;
+ }
+ return res;
+}
+/**
+ * tipc_link_input - Deliver message too higher layers
+ */
+static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
+{
+ struct tipc_msg *msg = buf_msg(buf);
+ int res = 0;
+
+ switch (msg_user(msg)) {
+ case TIPC_LOW_IMPORTANCE:
+ case TIPC_MEDIUM_IMPORTANCE:
+ case TIPC_HIGH_IMPORTANCE:
+ case TIPC_CRITICAL_IMPORTANCE:
+ case CONN_MANAGER:
+ tipc_sk_rcv(buf);
+ break;
+ case NAME_DISTRIBUTOR:
+ tipc_named_rcv(buf);
+ break;
+ case MSG_BUNDLER:
+ tipc_link_bundle_rcv(buf);
+ break;
+ default:
+ res = -EINVAL;
+ }
+ return res;
}
/**
@@ -1747,12 +1442,12 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
return;
/* Abort non-RESET send if communication with node is prohibited */
- if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG))
+ if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG))
return;
/* Create protocol message with "out-of-sequence" sequence number */
msg_set_type(msg, msg_typ);
- msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
+ msg_set_net_plane(msg, l_ptr->net_plane);
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
@@ -1818,7 +1513,7 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
buf->priority = TC_PRIO_CONTROL;
- tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
l_ptr->unacked_window = 0;
kfree_skb(buf);
}
@@ -1840,12 +1535,9 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
if (l_ptr->exp_msg_count)
goto exit;
- /* record unnumbered packet arrival (force mismatch on next timeout) */
- l_ptr->checkpoint--;
-
- if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
+ if (l_ptr->net_plane != msg_net_plane(msg))
if (tipc_own_addr > msg_prevnode(msg))
- l_ptr->b_ptr->net_plane = msg_net_plane(msg);
+ l_ptr->net_plane = msg_net_plane(msg);
switch (msg_type(msg)) {
@@ -1862,7 +1554,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
* peer has lost contact -- don't allow peer's links
* to reactivate before we recognize loss & clean up
*/
- l_ptr->owner->block_setup = WAIT_NODE_DOWN;
+ l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN;
}
link_state_event(l_ptr, RESET_MSG);
@@ -1918,6 +1610,10 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
tipc_link_reset(l_ptr); /* Enforce change to take effect */
break;
}
+
+ /* Record reception; force mismatch at next timeout: */
+ l_ptr->checkpoint--;
+
link_state_event(l_ptr, TRAFFIC_MSG_EVT);
l_ptr->stats.recv_states++;
if (link_reset_unknown(l_ptr))
@@ -2177,9 +1873,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
}
if (msg_user(msg) == MSG_FRAGMENTER) {
l_ptr->stats.recv_fragments++;
- tipc_link_frag_rcv(&l_ptr->reasm_head,
- &l_ptr->reasm_tail,
- &buf);
+ tipc_buf_append(&l_ptr->reasm_buf, &buf);
}
}
exit:
@@ -2232,6 +1926,7 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
u32 msgcount = msg_msgcnt(buf_msg(buf));
u32 pos = INT_H_SIZE;
struct sk_buff *obuf;
+ struct tipc_msg *omsg;
while (msgcount--) {
obuf = buf_extract(buf, pos);
@@ -2239,129 +1934,18 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
pr_warn("Link unable to unbundle message(s)\n");
break;
}
- pos += align(msg_size(buf_msg(obuf)));
- tipc_net_route_msg(obuf);
- }
- kfree_skb(buf);
-}
-
-/*
- * Fragmentation/defragmentation:
- */
-
-/*
- * tipc_link_frag_xmit: Entry for buffers needing fragmentation.
- * The buffer is complete, inclusive total message length.
- * Returns user data length.
- */
-static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
-{
- struct sk_buff *buf_chain = NULL;
- struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
- struct tipc_msg *inmsg = buf_msg(buf);
- struct tipc_msg fragm_hdr;
- u32 insize = msg_size(inmsg);
- u32 dsz = msg_data_sz(inmsg);
- unchar *crs = buf->data;
- u32 rest = insize;
- u32 pack_sz = l_ptr->max_pkt;
- u32 fragm_sz = pack_sz - INT_H_SIZE;
- u32 fragm_no = 0;
- u32 destaddr;
-
- if (msg_short(inmsg))
- destaddr = l_ptr->addr;
- else
- destaddr = msg_destnode(inmsg);
-
- /* Prepare reusable fragment header: */
- tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
- INT_H_SIZE, destaddr);
-
- /* Chop up message: */
- while (rest > 0) {
- struct sk_buff *fragm;
-
- if (rest <= fragm_sz) {
- fragm_sz = rest;
- msg_set_type(&fragm_hdr, LAST_FRAGMENT);
- }
- fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
- if (fragm == NULL) {
- kfree_skb(buf);
- kfree_skb_list(buf_chain);
- return -ENOMEM;
+ omsg = buf_msg(obuf);
+ pos += align(msg_size(omsg));
+ if (msg_isdata(omsg) || (msg_user(omsg) == CONN_MANAGER)) {
+ tipc_sk_rcv(obuf);
+ } else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
+ tipc_named_rcv(obuf);
+ } else {
+ pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
+ kfree_skb(obuf);
}
- msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
- fragm_no++;
- msg_set_fragm_no(&fragm_hdr, fragm_no);
- skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
- skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
- fragm_sz);
- buf_chain_tail->next = fragm;
- buf_chain_tail = fragm;
-
- rest -= fragm_sz;
- crs += fragm_sz;
- msg_set_type(&fragm_hdr, FRAGMENT);
}
kfree_skb(buf);
-
- /* Append chain of fragments to send queue & send them */
- l_ptr->long_msg_seq_no++;
- link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
- l_ptr->stats.sent_fragments += fragm_no;
- l_ptr->stats.sent_fragmented++;
- tipc_link_push_queue(l_ptr);
-
- return dsz;
-}
-
-/* tipc_link_frag_rcv(): Called with node lock on. Returns
- * the reassembled buffer if message is complete.
- */
-int tipc_link_frag_rcv(struct sk_buff **head, struct sk_buff **tail,
- struct sk_buff **fbuf)
-{
- struct sk_buff *frag = *fbuf;
- struct tipc_msg *msg = buf_msg(frag);
- u32 fragid = msg_type(msg);
- bool headstolen;
- int delta;
-
- skb_pull(frag, msg_hdr_sz(msg));
- if (fragid == FIRST_FRAGMENT) {
- if (*head || skb_unclone(frag, GFP_ATOMIC))
- goto out_free;
- *head = frag;
- skb_frag_list_init(*head);
- *fbuf = NULL;
- return 0;
- } else if (*head &&
- skb_try_coalesce(*head, frag, &headstolen, &delta)) {
- kfree_skb_partial(frag, headstolen);
- } else {
- if (!*head)
- goto out_free;
- if (!skb_has_frag_list(*head))
- skb_shinfo(*head)->frag_list = frag;
- else
- (*tail)->next = frag;
- *tail = frag;
- (*head)->truesize += frag->truesize;
- }
- if (fragid == LAST_FRAGMENT) {
- *fbuf = *head;
- *tail = *head = NULL;
- return LINK_REASM_COMPLETE;
- }
- *fbuf = NULL;
- return 0;
-out_free:
- pr_warn_ratelimited("Link unable to reassemble fragmented message\n");
- kfree_skb(*fbuf);
- *fbuf = NULL;
- return LINK_REASM_ERROR;
}
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
@@ -2397,8 +1981,6 @@ void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
/* tipc_link_find_owner - locate owner node of link by link's name
* @name: pointer to link name string
* @bearer_id: pointer to index in 'node->links' array where the link was found.
- * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
- * this also prevents link deletion.
*
* Returns pointer to node owning the link, or 0 if no matching link is found.
*/
@@ -2460,7 +2042,7 @@ static int link_value_is_valid(u16 cmd, u32 new_value)
* @new_value: new value of link, bearer, or media setting
* @cmd: which link, bearer, or media attribute to set (TIPC_CMD_SET_LINK_*)
*
- * Caller must hold 'tipc_net_lock' to ensure link/bearer/media is not deleted.
+ * Caller must hold RTNL lock to ensure link/bearer/media is not deleted.
*
* Returns 0 if value updated and negative value on error.
*/
@@ -2566,9 +2148,7 @@ struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space
" (cannot change setting on broadcast link)");
}
- read_lock_bh(&tipc_net_lock);
res = link_cmd_set_value(args->name, new_value, cmd);
- read_unlock_bh(&tipc_net_lock);
if (res)
return tipc_cfg_reply_error_string("cannot change link setting");
@@ -2602,22 +2182,18 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_
return tipc_cfg_reply_error_string("link not found");
return tipc_cfg_reply_none();
}
- read_lock_bh(&tipc_net_lock);
node = tipc_link_find_owner(link_name, &bearer_id);
- if (!node) {
- read_unlock_bh(&tipc_net_lock);
+ if (!node)
return tipc_cfg_reply_error_string("link not found");
- }
+
tipc_node_lock(node);
l_ptr = node->links[bearer_id];
if (!l_ptr) {
tipc_node_unlock(node);
- read_unlock_bh(&tipc_net_lock);
return tipc_cfg_reply_error_string("link not found");
}
link_reset_statistics(l_ptr);
tipc_node_unlock(node);
- read_unlock_bh(&tipc_net_lock);
return tipc_cfg_reply_none();
}
@@ -2650,18 +2226,15 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
if (!strcmp(name, tipc_bclink_name))
return tipc_bclink_stats(buf, buf_size);
- read_lock_bh(&tipc_net_lock);
node = tipc_link_find_owner(name, &bearer_id);
- if (!node) {
- read_unlock_bh(&tipc_net_lock);
+ if (!node)
return 0;
- }
+
tipc_node_lock(node);
l = node->links[bearer_id];
if (!l) {
tipc_node_unlock(node);
- read_unlock_bh(&tipc_net_lock);
return 0;
}
@@ -2727,7 +2300,6 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
(s->accu_queue_sz / s->queue_sz_counts) : 0);
tipc_node_unlock(node);
- read_unlock_bh(&tipc_net_lock);
return ret;
}
@@ -2778,7 +2350,6 @@ u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
if (dest == tipc_own_addr)
return MAX_MSG_SIZE;
- read_lock_bh(&tipc_net_lock);
n_ptr = tipc_node_find(dest);
if (n_ptr) {
tipc_node_lock(n_ptr);
@@ -2787,13 +2358,18 @@ u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
res = l_ptr->max_pkt;
tipc_node_unlock(n_ptr);
}
- read_unlock_bh(&tipc_net_lock);
return res;
}
static void link_print(struct tipc_link *l_ptr, const char *str)
{
- pr_info("%s Link %x<%s>:", str, l_ptr->addr, l_ptr->b_ptr->name);
+ struct tipc_bearer *b_ptr;
+
+ rcu_read_lock();
+ b_ptr = rcu_dereference_rtnl(bearer_list[l_ptr->bearer_id]);
+ if (b_ptr)
+ pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
+ rcu_read_unlock();
if (link_working_unknown(l_ptr))
pr_cont(":WU\n");
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 8c0b49b5b2e..782983ccd32 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -40,11 +40,6 @@
#include "msg.h"
#include "node.h"
-/* Link reassembly status codes
- */
-#define LINK_REASM_ERROR -1
-#define LINK_REASM_COMPLETE 1
-
/* Out-of-range value for link sequence numbers
*/
#define INVALID_LINK_SEQ 0x10000
@@ -107,7 +102,7 @@ struct tipc_stats {
* @checkpoint: reference point for triggering link continuity checking
* @peer_session: link session # being used by peer end of link
* @peer_bearer_id: bearer id used by link's peer endpoint
- * @b_ptr: pointer to bearer used by link
+ * @bearer_id: local bearer id used by link
* @tolerance: minimum link continuity loss needed to reset link [in ms]
* @continuity_interval: link continuity testing interval [in ms]
* @abort_limit: # of unacknowledged continuity probes needed to reset link
@@ -116,6 +111,7 @@ struct tipc_stats {
* @proto_msg: template for control messages generated by link
* @pmsg: convenience pointer to "proto_msg" field
* @priority: current link priority
+ * @net_plane: current link network plane ('A' through 'H')
* @queue_limit: outbound message queue congestion thresholds (indexed by user)
* @exp_msg_count: # of tunnelled messages expected during link changeover
* @reset_checkpoint: seq # of last acknowledged message at time of link reset
@@ -139,8 +135,7 @@ struct tipc_stats {
* @next_out: ptr to first unsent outbound message in queue
* @waiting_ports: linked list of ports waiting for link congestion to abate
* @long_msg_seq_no: next identifier to use for outbound fragmented messages
- * @reasm_head: list head of partially reassembled inbound message fragments
- * @reasm_tail: last fragment received
+ * @reasm_buf: head of partially reassembled inbound message fragments
* @stats: collects statistics regarding link activity
*/
struct tipc_link {
@@ -155,7 +150,7 @@ struct tipc_link {
u32 checkpoint;
u32 peer_session;
u32 peer_bearer_id;
- struct tipc_bearer *b_ptr;
+ u32 bearer_id;
u32 tolerance;
u32 continuity_interval;
u32 abort_limit;
@@ -167,6 +162,7 @@ struct tipc_link {
} proto_msg;
struct tipc_msg *pmsg;
u32 priority;
+ char net_plane;
u32 queue_limit[15]; /* queue_limit[0]==window limit */
/* Changeover */
@@ -202,8 +198,7 @@ struct tipc_link {
/* Fragmentation/reassembly */
u32 long_msg_seq_no;
- struct sk_buff *reasm_head;
- struct sk_buff *reasm_tail;
+ struct sk_buff *reasm_buf;
/* Statistics */
struct tipc_stats stats;
@@ -228,20 +223,13 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area,
int req_tlv_space);
struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area,
int req_tlv_space);
+void tipc_link_reset_all(struct tipc_node *node);
void tipc_link_reset(struct tipc_link *l_ptr);
void tipc_link_reset_list(unsigned int bearer_id);
int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
-void tipc_link_names_xmit(struct list_head *message_list, u32 dest);
-int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
-int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf);
u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
-int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
- struct iovec const *msg_sect,
- unsigned int len, u32 destnode);
void tipc_link_bundle_rcv(struct sk_buff *buf);
-int tipc_link_frag_rcv(struct sk_buff **reasm_head,
- struct sk_buff **reasm_tail,
- struct sk_buff **fbuf);
void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
u32 gap, u32 tolerance, u32 priority, u32 acked_mtu);
void tipc_link_push_queue(struct tipc_link *l_ptr);
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index e525f8ce1de..9680be6d388 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -1,7 +1,7 @@
/*
* net/tipc/msg.c: TIPC message header routines
*
- * Copyright (c) 2000-2006, Ericsson AB
+ * Copyright (c) 2000-2006, 2014, Ericsson AB
* Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
@@ -36,21 +36,16 @@
#include "core.h"
#include "msg.h"
+#include "addr.h"
+#include "name_table.h"
-u32 tipc_msg_tot_importance(struct tipc_msg *m)
+#define MAX_FORWARD_SIZE 1024
+
+static unsigned int align(unsigned int i)
{
- if (likely(msg_isdata(m))) {
- if (likely(msg_orignode(m) == tipc_own_addr))
- return msg_importance(m);
- return msg_importance(m) + 4;
- }
- if ((msg_user(m) == MSG_FRAGMENTER) &&
- (msg_type(m) == FIRST_FRAGMENT))
- return msg_importance(msg_get_wrapped(m));
- return msg_importance(m);
+ return (i + 3) & ~3u;
}
-
void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
u32 destnode)
{
@@ -65,37 +60,373 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
msg_set_destnode(m, destnode);
}
-/**
- * tipc_msg_build - create message using specified header and data
- *
- * Note: Caller must not hold any locks in case copy_from_user() is interrupted!
- *
- * Returns message data size or errno
+/* tipc_buf_append(): Append a buffer to the fragment list of another buffer
+ * @*headbuf: in: NULL for first frag, otherwise value returned from prev call
+ * out: set when successful non-complete reassembly, otherwise NULL
+ * @*buf: in: the buffer to append. Always defined
+ * out: head buf after sucessful complete reassembly, otherwise NULL
+ * Returns 1 when reassembly complete, otherwise 0
*/
-int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
- unsigned int len, int max_size, struct sk_buff **buf)
+int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
{
- int dsz, sz, hsz;
- unsigned char *to;
-
- dsz = len;
- hsz = msg_hdr_sz(hdr);
- sz = hsz + dsz;
- msg_set_size(hdr, sz);
- if (unlikely(sz > max_size)) {
+ struct sk_buff *head = *headbuf;
+ struct sk_buff *frag = *buf;
+ struct sk_buff *tail;
+ struct tipc_msg *msg;
+ u32 fragid;
+ int delta;
+ bool headstolen;
+
+ if (!frag)
+ goto err;
+
+ msg = buf_msg(frag);
+ fragid = msg_type(msg);
+ frag->next = NULL;
+ skb_pull(frag, msg_hdr_sz(msg));
+
+ if (fragid == FIRST_FRAGMENT) {
+ if (unlikely(head))
+ goto err;
+ if (unlikely(skb_unclone(frag, GFP_ATOMIC)))
+ goto err;
+ head = *headbuf = frag;
+ skb_frag_list_init(head);
+ TIPC_SKB_CB(head)->tail = NULL;
*buf = NULL;
- return dsz;
+ return 0;
}
- *buf = tipc_buf_acquire(sz);
- if (!(*buf))
- return -ENOMEM;
- skb_copy_to_linear_data(*buf, hdr, hsz);
- to = (*buf)->data + hsz;
- if (len && memcpy_fromiovecend(to, msg_sect, 0, dsz)) {
- kfree_skb(*buf);
- *buf = NULL;
- return -EFAULT;
+ if (!head)
+ goto err;
+
+ if (skb_try_coalesce(head, frag, &headstolen, &delta)) {
+ kfree_skb_partial(frag, headstolen);
+ } else {
+ tail = TIPC_SKB_CB(head)->tail;
+ if (!skb_has_frag_list(head))
+ skb_shinfo(head)->frag_list = frag;
+ else
+ tail->next = frag;
+ head->truesize += frag->truesize;
+ head->data_len += frag->len;
+ head->len += frag->len;
+ TIPC_SKB_CB(head)->tail = frag;
+ }
+
+ if (fragid == LAST_FRAGMENT) {
+ *buf = head;
+ TIPC_SKB_CB(head)->tail = NULL;
+ *headbuf = NULL;
+ return 1;
+ }
+ *buf = NULL;
+ return 0;
+
+err:
+ pr_warn_ratelimited("Unable to build fragment list\n");
+ kfree_skb(*buf);
+ kfree_skb(*headbuf);
+ *buf = *headbuf = NULL;
+ return 0;
+}
+
+
+/**
+ * tipc_msg_build - create buffer chain containing specified header and data
+ * @mhdr: Message header, to be prepended to data
+ * @iov: User data
+ * @offset: Posision in iov to start copying from
+ * @dsz: Total length of user data
+ * @pktmax: Max packet size that can be used
+ * @chain: Buffer or chain of buffers to be returned to caller
+ * Returns message data size or errno: -ENOMEM, -EFAULT
+ */
+int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
+ int offset, int dsz, int pktmax , struct sk_buff **chain)
+{
+ int mhsz = msg_hdr_sz(mhdr);
+ int msz = mhsz + dsz;
+ int pktno = 1;
+ int pktsz;
+ int pktrem = pktmax;
+ int drem = dsz;
+ struct tipc_msg pkthdr;
+ struct sk_buff *buf, *prev;
+ char *pktpos;
+ int rc;
+
+ msg_set_size(mhdr, msz);
+
+ /* No fragmentation needed? */
+ if (likely(msz <= pktmax)) {
+ buf = tipc_buf_acquire(msz);
+ *chain = buf;
+ if (unlikely(!buf))
+ return -ENOMEM;
+ skb_copy_to_linear_data(buf, mhdr, mhsz);
+ pktpos = buf->data + mhsz;
+ if (!dsz || !memcpy_fromiovecend(pktpos, iov, offset, dsz))
+ return dsz;
+ rc = -EFAULT;
+ goto error;
}
+
+ /* Prepare reusable fragment header */
+ tipc_msg_init(&pkthdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
+ INT_H_SIZE, msg_destnode(mhdr));
+ msg_set_size(&pkthdr, pktmax);
+ msg_set_fragm_no(&pkthdr, pktno);
+
+ /* Prepare first fragment */
+ *chain = buf = tipc_buf_acquire(pktmax);
+ if (!buf)
+ return -ENOMEM;
+ pktpos = buf->data;
+ skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE);
+ pktpos += INT_H_SIZE;
+ pktrem -= INT_H_SIZE;
+ skb_copy_to_linear_data_offset(buf, INT_H_SIZE, mhdr, mhsz);
+ pktpos += mhsz;
+ pktrem -= mhsz;
+
+ do {
+ if (drem < pktrem)
+ pktrem = drem;
+
+ if (memcpy_fromiovecend(pktpos, iov, offset, pktrem)) {
+ rc = -EFAULT;
+ goto error;
+ }
+ drem -= pktrem;
+ offset += pktrem;
+
+ if (!drem)
+ break;
+
+ /* Prepare new fragment: */
+ if (drem < (pktmax - INT_H_SIZE))
+ pktsz = drem + INT_H_SIZE;
+ else
+ pktsz = pktmax;
+ prev = buf;
+ buf = tipc_buf_acquire(pktsz);
+ if (!buf) {
+ rc = -ENOMEM;
+ goto error;
+ }
+ prev->next = buf;
+ msg_set_type(&pkthdr, FRAGMENT);
+ msg_set_size(&pkthdr, pktsz);
+ msg_set_fragm_no(&pkthdr, ++pktno);
+ skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE);
+ pktpos = buf->data + INT_H_SIZE;
+ pktrem = pktsz - INT_H_SIZE;
+
+ } while (1);
+
+ msg_set_type(buf_msg(buf), LAST_FRAGMENT);
return dsz;
+error:
+ kfree_skb_list(*chain);
+ *chain = NULL;
+ return rc;
+}
+
+/**
+ * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
+ * @bbuf: the existing buffer ("bundle")
+ * @buf: buffer to be appended
+ * @mtu: max allowable size for the bundle buffer
+ * Consumes buffer if successful
+ * Returns true if bundling could be performed, otherwise false
+ */
+bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu)
+{
+ struct tipc_msg *bmsg = buf_msg(bbuf);
+ struct tipc_msg *msg = buf_msg(buf);
+ unsigned int bsz = msg_size(bmsg);
+ unsigned int msz = msg_size(msg);
+ u32 start = align(bsz);
+ u32 max = mtu - INT_H_SIZE;
+ u32 pad = start - bsz;
+
+ if (likely(msg_user(msg) == MSG_FRAGMENTER))
+ return false;
+ if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL))
+ return false;
+ if (unlikely(msg_user(msg) == BCAST_PROTOCOL))
+ return false;
+ if (likely(msg_user(bmsg) != MSG_BUNDLER))
+ return false;
+ if (likely(msg_type(bmsg) != BUNDLE_OPEN))
+ return false;
+ if (unlikely(skb_tailroom(bbuf) < (pad + msz)))
+ return false;
+ if (unlikely(max < (start + msz)))
+ return false;
+
+ skb_put(bbuf, pad + msz);
+ skb_copy_to_linear_data_offset(bbuf, start, buf->data, msz);
+ msg_set_size(bmsg, start + msz);
+ msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
+ bbuf->next = buf->next;
+ kfree_skb(buf);
+ return true;
+}
+
+/**
+ * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
+ * @buf: buffer to be appended and replaced
+ * @mtu: max allowable size for the bundle buffer, inclusive header
+ * @dnode: destination node for message. (Not always present in header)
+ * Replaces buffer if successful
+ * Returns true if sucess, otherwise false
+ */
+bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
+{
+ struct sk_buff *bbuf;
+ struct tipc_msg *bmsg;
+ struct tipc_msg *msg = buf_msg(*buf);
+ u32 msz = msg_size(msg);
+ u32 max = mtu - INT_H_SIZE;
+
+ if (msg_user(msg) == MSG_FRAGMENTER)
+ return false;
+ if (msg_user(msg) == CHANGEOVER_PROTOCOL)
+ return false;
+ if (msg_user(msg) == BCAST_PROTOCOL)
+ return false;
+ if (msz > (max / 2))
+ return false;
+
+ bbuf = tipc_buf_acquire(max);
+ if (!bbuf)
+ return false;
+
+ skb_trim(bbuf, INT_H_SIZE);
+ bmsg = buf_msg(bbuf);
+ tipc_msg_init(bmsg, MSG_BUNDLER, BUNDLE_OPEN, INT_H_SIZE, dnode);
+ msg_set_seqno(bmsg, msg_seqno(msg));
+ msg_set_ack(bmsg, msg_ack(msg));
+ msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
+ bbuf->next = (*buf)->next;
+ tipc_msg_bundle(bbuf, *buf, mtu);
+ *buf = bbuf;
+ return true;
+}
+
+/**
+ * tipc_msg_reverse(): swap source and destination addresses and add error code
+ * @buf: buffer containing message to be reversed
+ * @dnode: return value: node where to send message after reversal
+ * @err: error code to be set in message
+ * Consumes buffer if failure
+ * Returns true if success, otherwise false
+ */
+bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err)
+{
+ struct tipc_msg *msg = buf_msg(buf);
+ uint imp = msg_importance(msg);
+ struct tipc_msg ohdr;
+ uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE);
+
+ if (skb_linearize(buf))
+ goto exit;
+ if (msg_dest_droppable(msg))
+ goto exit;
+ if (msg_errcode(msg))
+ goto exit;
+
+ memcpy(&ohdr, msg, msg_hdr_sz(msg));
+ imp = min_t(uint, imp + 1, TIPC_CRITICAL_IMPORTANCE);
+ if (msg_isdata(msg))
+ msg_set_importance(msg, imp);
+ msg_set_errcode(msg, err);
+ msg_set_origport(msg, msg_destport(&ohdr));
+ msg_set_destport(msg, msg_origport(&ohdr));
+ msg_set_prevnode(msg, tipc_own_addr);
+ if (!msg_short(msg)) {
+ msg_set_orignode(msg, msg_destnode(&ohdr));
+ msg_set_destnode(msg, msg_orignode(&ohdr));
+ }
+ msg_set_size(msg, msg_hdr_sz(msg) + rdsz);
+ skb_trim(buf, msg_size(msg));
+ skb_orphan(buf);
+ *dnode = msg_orignode(&ohdr);
+ return true;
+exit:
+ kfree_skb(buf);
+ return false;
+}
+
+/**
+ * tipc_msg_eval: determine fate of message that found no destination
+ * @buf: the buffer containing the message.
+ * @dnode: return value: next-hop node, if message to be forwarded
+ * @err: error code to use, if message to be rejected
+ *
+ * Does not consume buffer
+ * Returns 0 (TIPC_OK) if message ok and we can try again, -TIPC error
+ * code if message to be rejected
+ */
+int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
+{
+ struct tipc_msg *msg = buf_msg(buf);
+ u32 dport;
+
+ if (msg_type(msg) != TIPC_NAMED_MSG)
+ return -TIPC_ERR_NO_PORT;
+ if (skb_linearize(buf))
+ return -TIPC_ERR_NO_NAME;
+ if (msg_data_sz(msg) > MAX_FORWARD_SIZE)
+ return -TIPC_ERR_NO_NAME;
+ if (msg_reroute_cnt(msg) > 0)
+ return -TIPC_ERR_NO_NAME;
+
+ *dnode = addr_domain(msg_lookup_scope(msg));
+ dport = tipc_nametbl_translate(msg_nametype(msg),
+ msg_nameinst(msg),
+ dnode);
+ if (!dport)
+ return -TIPC_ERR_NO_NAME;
+ msg_incr_reroute_cnt(msg);
+ msg_set_destnode(msg, *dnode);
+ msg_set_destport(msg, dport);
+ return TIPC_OK;
+}
+
+/* tipc_msg_reassemble() - clone a buffer chain of fragments and
+ * reassemble the clones into one message
+ */
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
+{
+ struct sk_buff *buf = chain;
+ struct sk_buff *frag = buf;
+ struct sk_buff *head = NULL;
+ int hdr_sz;
+
+ /* Copy header if single buffer */
+ if (!buf->next) {
+ hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf));
+ return __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
+ }
+
+ /* Clone all fragments and reassemble */
+ while (buf) {
+ frag = skb_clone(buf, GFP_ATOMIC);
+ if (!frag)
+ goto error;
+ frag->next = NULL;
+ if (tipc_buf_append(&head, &frag))
+ break;
+ if (!head)
+ goto error;
+ buf = buf->next;
+ }
+ return frag;
+error:
+ pr_warn("Failed do clone local mcast rcv buffer\n");
+ kfree_skb(head);
+ return NULL;
}
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 76d1269b944..462fa194a6a 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -1,7 +1,7 @@
/*
* net/tipc/msg.h: Include file for TIPC message header routines
*
- * Copyright (c) 2000-2007, Ericsson AB
+ * Copyright (c) 2000-2007, 2014, Ericsson AB
* Copyright (c) 2005-2008, 2010-2011, Wind River Systems
* All rights reserved.
*
@@ -463,6 +463,11 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
#define FRAGMENT 1
#define LAST_FRAGMENT 2
+/* Bundling protocol message types
+ */
+#define BUNDLE_OPEN 0
+#define BUNDLE_CLOSED 1
+
/*
* Link management protocol message types
*/
@@ -706,9 +711,36 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
msg_set_bits(m, 9, 0, 0xffff, n);
}
-u32 tipc_msg_tot_importance(struct tipc_msg *m);
+static inline u32 tipc_msg_tot_importance(struct tipc_msg *m)
+{
+ if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
+ return msg_importance(msg_get_wrapped(m));
+ return msg_importance(m);
+}
+
+static inline u32 msg_tot_origport(struct tipc_msg *m)
+{
+ if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
+ return msg_origport(msg_get_wrapped(m));
+ return msg_origport(m);
+}
+
+bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err);
+
+int tipc_msg_eval(struct sk_buff *buf, u32 *dnode);
+
void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
u32 destnode);
-int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
- unsigned int len, int max_size, struct sk_buff **buf);
+
+int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
+
+bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu);
+
+bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
+
+int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
+ int offset, int dsz, int mtu , struct sk_buff **chain);
+
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain);
+
#endif
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index aff8041dc15..dcc15bcd569 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -38,34 +38,6 @@
#include "link.h"
#include "name_distr.h"
-#define ITEM_SIZE sizeof(struct distr_item)
-
-/**
- * struct distr_item - publication info distributed to other nodes
- * @type: name sequence type
- * @lower: name sequence lower bound
- * @upper: name sequence upper bound
- * @ref: publishing port reference
- * @key: publication key
- *
- * ===> All fields are stored in network byte order. <===
- *
- * First 3 fields identify (name or) name sequence being published.
- * Reference field uniquely identifies port that published name sequence.
- * Key field uniquely identifies publication, in the event a port has
- * multiple publications of the same name sequence.
- *
- * Note: There is no field that identifies the publishing node because it is
- * the same for all items contained within a publication message.
- */
-struct distr_item {
- __be32 type;
- __be32 lower;
- __be32 upper;
- __be32 ref;
- __be32 key;
-};
-
/**
* struct publ_list - list of publications made by this node
* @list: circular list of publications
@@ -127,26 +99,24 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
return buf;
}
-static void named_cluster_distribute(struct sk_buff *buf)
+void named_cluster_distribute(struct sk_buff *buf)
{
- struct sk_buff *buf_copy;
- struct tipc_node *n_ptr;
- struct tipc_link *l_ptr;
+ struct sk_buff *obuf;
+ struct tipc_node *node;
+ u32 dnode;
rcu_read_lock();
- list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
- spin_lock_bh(&n_ptr->lock);
- l_ptr = n_ptr->active_links[n_ptr->addr & 1];
- if (l_ptr) {
- buf_copy = skb_copy(buf, GFP_ATOMIC);
- if (!buf_copy) {
- spin_unlock_bh(&n_ptr->lock);
- break;
- }
- msg_set_destnode(buf_msg(buf_copy), n_ptr->addr);
- __tipc_link_xmit(l_ptr, buf_copy);
- }
- spin_unlock_bh(&n_ptr->lock);
+ list_for_each_entry_rcu(node, &tipc_node_list, list) {
+ dnode = node->addr;
+ if (in_own_node(dnode))
+ continue;
+ if (!tipc_node_active_links(node))
+ continue;
+ obuf = skb_copy(buf, GFP_ATOMIC);
+ if (!obuf)
+ break;
+ msg_set_destnode(buf_msg(obuf), dnode);
+ tipc_link_xmit(obuf, dnode, dnode);
}
rcu_read_unlock();
@@ -156,7 +126,7 @@ static void named_cluster_distribute(struct sk_buff *buf)
/**
* tipc_named_publish - tell other nodes about a new publication by this node
*/
-void tipc_named_publish(struct publication *publ)
+struct sk_buff *tipc_named_publish(struct publication *publ)
{
struct sk_buff *buf;
struct distr_item *item;
@@ -165,23 +135,23 @@ void tipc_named_publish(struct publication *publ)
publ_lists[publ->scope]->size++;
if (publ->scope == TIPC_NODE_SCOPE)
- return;
+ return NULL;
buf = named_prepare_buf(PUBLICATION, ITEM_SIZE, 0);
if (!buf) {
pr_warn("Publication distribution failure\n");
- return;
+ return NULL;
}
item = (struct distr_item *)msg_data(buf_msg(buf));
publ_to_item(item, publ);
- named_cluster_distribute(buf);
+ return buf;
}
/**
* tipc_named_withdraw - tell other nodes about a withdrawn publication by this node
*/
-void tipc_named_withdraw(struct publication *publ)
+struct sk_buff *tipc_named_withdraw(struct publication *publ)
{
struct sk_buff *buf;
struct distr_item *item;
@@ -190,47 +160,57 @@ void tipc_named_withdraw(struct publication *publ)
publ_lists[publ->scope]->size--;
if (publ->scope == TIPC_NODE_SCOPE)
- return;
+ return NULL;
buf = named_prepare_buf(WITHDRAWAL, ITEM_SIZE, 0);
if (!buf) {
pr_warn("Withdrawal distribution failure\n");
- return;
+ return NULL;
}
item = (struct distr_item *)msg_data(buf_msg(buf));
publ_to_item(item, publ);
- named_cluster_distribute(buf);
+ return buf;
}
-/*
+/**
* named_distribute - prepare name info for bulk distribution to another node
+ * @msg_list: list of messages (buffers) to be returned from this function
+ * @dnode: node to be updated
+ * @pls: linked list of publication items to be packed into buffer chain
*/
-static void named_distribute(struct list_head *message_list, u32 node,
- struct publ_list *pls, u32 max_item_buf)
+static void named_distribute(struct list_head *msg_list, u32 dnode,
+ struct publ_list *pls)
{
struct publication *publ;
struct sk_buff *buf = NULL;
struct distr_item *item = NULL;
- u32 left = 0;
- u32 rest = pls->size * ITEM_SIZE;
+ uint dsz = pls->size * ITEM_SIZE;
+ uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
+ uint rem = dsz;
+ uint msg_rem = 0;
list_for_each_entry(publ, &pls->list, local_list) {
+ /* Prepare next buffer: */
if (!buf) {
- left = (rest <= max_item_buf) ? rest : max_item_buf;
- rest -= left;
- buf = named_prepare_buf(PUBLICATION, left, node);
+ msg_rem = min_t(uint, rem, msg_dsz);
+ rem -= msg_rem;
+ buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
if (!buf) {
pr_warn("Bulk publication failure\n");
return;
}
item = (struct distr_item *)msg_data(buf_msg(buf));
}
+
+ /* Pack publication into message: */
publ_to_item(item, publ);
item++;
- left -= ITEM_SIZE;
- if (!left) {
- list_add_tail((struct list_head *)buf, message_list);
+ msg_rem -= ITEM_SIZE;
+
+ /* Append full buffer to list: */
+ if (!msg_rem) {
+ list_add_tail((struct list_head *)buf, msg_list);
buf = NULL;
}
}
@@ -239,38 +219,20 @@ static void named_distribute(struct list_head *message_list, u32 node,
/**
* tipc_named_node_up - tell specified node about all publications by this node
*/
-void tipc_named_node_up(unsigned long nodearg)
+void tipc_named_node_up(u32 dnode)
{
- struct tipc_node *n_ptr;
- struct tipc_link *l_ptr;
- struct list_head message_list;
- u32 node = (u32)nodearg;
- u32 max_item_buf = 0;
-
- /* compute maximum amount of publication data to send per message */
- read_lock_bh(&tipc_net_lock);
- n_ptr = tipc_node_find(node);
- if (n_ptr) {
- tipc_node_lock(n_ptr);
- l_ptr = n_ptr->active_links[0];
- if (l_ptr)
- max_item_buf = ((l_ptr->max_pkt - INT_H_SIZE) /
- ITEM_SIZE) * ITEM_SIZE;
- tipc_node_unlock(n_ptr);
- }
- read_unlock_bh(&tipc_net_lock);
- if (!max_item_buf)
- return;
-
- /* create list of publication messages, then send them as a unit */
- INIT_LIST_HEAD(&message_list);
+ LIST_HEAD(msg_list);
+ struct sk_buff *buf_chain;
read_lock_bh(&tipc_nametbl_lock);
- named_distribute(&message_list, node, &publ_cluster, max_item_buf);
- named_distribute(&message_list, node, &publ_zone, max_item_buf);
+ named_distribute(&msg_list, dnode, &publ_cluster);
+ named_distribute(&msg_list, dnode, &publ_zone);
read_unlock_bh(&tipc_nametbl_lock);
- tipc_link_names_xmit(&message_list, node);
+ /* Convert circular list to linear list and send: */
+ buf_chain = (struct sk_buff *)msg_list.next;
+ ((struct sk_buff *)msg_list.prev)->next = NULL;
+ tipc_link_xmit(buf_chain, dnode, dnode);
}
/**
diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
index 9b312ccfd43..8afe32b7fc9 100644
--- a/net/tipc/name_distr.h
+++ b/net/tipc/name_distr.h
@@ -39,9 +39,38 @@
#include "name_table.h"
-void tipc_named_publish(struct publication *publ);
-void tipc_named_withdraw(struct publication *publ);
-void tipc_named_node_up(unsigned long node);
+#define ITEM_SIZE sizeof(struct distr_item)
+
+/**
+ * struct distr_item - publication info distributed to other nodes
+ * @type: name sequence type
+ * @lower: name sequence lower bound
+ * @upper: name sequence upper bound
+ * @ref: publishing port reference
+ * @key: publication key
+ *
+ * ===> All fields are stored in network byte order. <===
+ *
+ * First 3 fields identify (name or) name sequence being published.
+ * Reference field uniquely identifies port that published name sequence.
+ * Key field uniquely identifies publication, in the event a port has
+ * multiple publications of the same name sequence.
+ *
+ * Note: There is no field that identifies the publishing node because it is
+ * the same for all items contained within a publication message.
+ */
+struct distr_item {
+ __be32 type;
+ __be32 lower;
+ __be32 upper;
+ __be32 ref;
+ __be32 key;
+};
+
+struct sk_buff *tipc_named_publish(struct publication *publ);
+struct sk_buff *tipc_named_withdraw(struct publication *publ);
+void named_cluster_distribute(struct sk_buff *buf);
+void tipc_named_node_up(u32 dnode);
void tipc_named_rcv(struct sk_buff *buf);
void tipc_named_reinit(void);
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 042e8e3cabc..9d7d37d9518 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -664,6 +664,7 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
u32 scope, u32 port_ref, u32 key)
{
struct publication *publ;
+ struct sk_buff *buf = NULL;
if (table.local_publ_count >= TIPC_MAX_PUBLICATIONS) {
pr_warn("Publication failed, local publication limit reached (%u)\n",
@@ -676,9 +677,12 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
tipc_own_addr, port_ref, key);
if (likely(publ)) {
table.local_publ_count++;
- tipc_named_publish(publ);
+ buf = tipc_named_publish(publ);
}
write_unlock_bh(&tipc_nametbl_lock);
+
+ if (buf)
+ named_cluster_distribute(buf);
return publ;
}
@@ -688,15 +692,19 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
{
struct publication *publ;
+ struct sk_buff *buf;
write_lock_bh(&tipc_nametbl_lock);
publ = tipc_nametbl_remove_publ(type, lower, tipc_own_addr, ref, key);
if (likely(publ)) {
table.local_publ_count--;
- tipc_named_withdraw(publ);
+ buf = tipc_named_withdraw(publ);
write_unlock_bh(&tipc_nametbl_lock);
list_del_init(&publ->pport_list);
kfree(publ);
+
+ if (buf)
+ named_cluster_distribute(buf);
return 1;
}
write_unlock_bh(&tipc_nametbl_lock);
@@ -961,6 +969,7 @@ static void tipc_purge_publications(struct name_seq *seq)
list_for_each_entry_safe(publ, safe, &info->zone_list, zone_list) {
tipc_nametbl_remove_publ(publ->type, publ->lower, publ->node,
publ->ref, publ->key);
+ kfree(publ);
}
}
@@ -982,7 +991,6 @@ void tipc_nametbl_stop(void)
hlist_for_each_entry_safe(seq, safe, seq_head, ns_list) {
tipc_purge_publications(seq);
}
- continue;
}
kfree(table.types);
table.types = NULL;
diff --git a/net/tipc/net.c b/net/tipc/net.c
index 4c564eb69e1..7fcc94998fe 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -1,7 +1,7 @@
/*
* net/tipc/net.c: TIPC network routing code
*
- * Copyright (c) 1995-2006, Ericsson AB
+ * Copyright (c) 1995-2006, 2014, Ericsson AB
* Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
@@ -39,45 +39,41 @@
#include "name_distr.h"
#include "subscr.h"
#include "port.h"
+#include "socket.h"
#include "node.h"
#include "config.h"
/*
* The TIPC locking policy is designed to ensure a very fine locking
* granularity, permitting complete parallel access to individual
- * port and node/link instances. The code consists of three major
+ * port and node/link instances. The code consists of four major
* locking domains, each protected with their own disjunct set of locks.
*
- * 1: The routing hierarchy.
- * Comprises the structures 'zone', 'cluster', 'node', 'link'
- * and 'bearer'. The whole hierarchy is protected by a big
- * read/write lock, tipc_net_lock, to enssure that nothing is added
- * or removed while code is accessing any of these structures.
- * This layer must not be called from the two others while they
- * hold any of their own locks.
- * Neither must it itself do any upcalls to the other two before
- * it has released tipc_net_lock and other protective locks.
+ * 1: The bearer level.
+ * RTNL lock is used to serialize the process of configuring bearer
+ * on update side, and RCU lock is applied on read side to make
+ * bearer instance valid on both paths of message transmission and
+ * reception.
*
- * Within the tipc_net_lock domain there are two sub-domains;'node' and
- * 'bearer', where local write operations are permitted,
- * provided that those are protected by individual spin_locks
- * per instance. Code holding tipc_net_lock(read) and a node spin_lock
- * is permitted to poke around in both the node itself and its
- * subordinate links. I.e, it can update link counters and queues,
- * change link state, send protocol messages, and alter the
- * "active_links" array in the node; but it can _not_ remove a link
- * or a node from the overall structure.
- * Correspondingly, individual bearers may change status within a
- * tipc_net_lock(read), protected by an individual spin_lock ber bearer
- * instance, but it needs tipc_net_lock(write) to remove/add any bearers.
+ * 2: The node and link level.
+ * All node instances are saved into two tipc_node_list and node_htable
+ * lists. The two lists are protected by node_list_lock on write side,
+ * and they are guarded with RCU lock on read side. Especially node
+ * instance is destroyed only when TIPC module is removed, and we can
+ * confirm that there has no any user who is accessing the node at the
+ * moment. Therefore, Except for iterating the two lists within RCU
+ * protection, it's no needed to hold RCU that we access node instance
+ * in other places.
*
+ * In addition, all members in node structure including link instances
+ * are protected by node spin lock.
*
- * 2: The transport level of the protocol.
- * This consists of the structures port, (and its user level
- * representations, such as user_port and tipc_sock), reference and
- * tipc_user (port.c, reg.c, socket.c).
+ * 3: The transport level of the protocol.
+ * This consists of the structures port, (and its user level
+ * representations, such as user_port and tipc_sock), reference and
+ * tipc_user (port.c, reg.c, socket.c).
*
- * This layer has four different locks:
+ * This layer has four different locks:
* - The tipc_port spin_lock. This is protecting each port instance
* from parallel data access and removal. Since we can not place
* this lock in the port itself, it has been placed in the
@@ -96,7 +92,7 @@
* There are two such lists; 'port_list', which is used for management,
* and 'wait_list', which is used to queue ports during congestion.
*
- * 3: The name table (name_table.c, name_distr.c, subscription.c)
+ * 4: The name table (name_table.c, name_distr.c, subscription.c)
* - There is one big read/write-lock (tipc_nametbl_lock) protecting the
* overall name table structure. Nothing must be added/removed to
* this structure without holding write access to it.
@@ -108,85 +104,25 @@
* - A local spin_lock protecting the queue of subscriber events.
*/
-DEFINE_RWLOCK(tipc_net_lock);
-
-static void net_route_named_msg(struct sk_buff *buf)
-{
- struct tipc_msg *msg = buf_msg(buf);
- u32 dnode;
- u32 dport;
-
- if (!msg_named(msg)) {
- kfree_skb(buf);
- return;
- }
-
- dnode = addr_domain(msg_lookup_scope(msg));
- dport = tipc_nametbl_translate(msg_nametype(msg), msg_nameinst(msg), &dnode);
- if (dport) {
- msg_set_destnode(msg, dnode);
- msg_set_destport(msg, dport);
- tipc_net_route_msg(buf);
- return;
- }
- tipc_reject_msg(buf, TIPC_ERR_NO_NAME);
-}
-
-void tipc_net_route_msg(struct sk_buff *buf)
-{
- struct tipc_msg *msg;
- u32 dnode;
-
- if (!buf)
- return;
- msg = buf_msg(buf);
-
- /* Handle message for this node */
- dnode = msg_short(msg) ? tipc_own_addr : msg_destnode(msg);
- if (tipc_in_scope(dnode, tipc_own_addr)) {
- if (msg_isdata(msg)) {
- if (msg_mcast(msg))
- tipc_port_mcast_rcv(buf, NULL);
- else if (msg_destport(msg))
- tipc_port_rcv(buf);
- else
- net_route_named_msg(buf);
- return;
- }
- switch (msg_user(msg)) {
- case NAME_DISTRIBUTOR:
- tipc_named_rcv(buf);
- break;
- case CONN_MANAGER:
- tipc_port_proto_rcv(buf);
- break;
- default:
- kfree_skb(buf);
- }
- return;
- }
-
- /* Handle message for another node */
- skb_trim(buf, msg_size(msg));
- tipc_link_xmit(buf, dnode, msg_link_selector(msg));
-}
-
-void tipc_net_start(u32 addr)
+int tipc_net_start(u32 addr)
{
char addr_string[16];
+ int res;
- write_lock_bh(&tipc_net_lock);
tipc_own_addr = addr;
tipc_named_reinit();
tipc_port_reinit();
- tipc_bclink_init();
- write_unlock_bh(&tipc_net_lock);
+ res = tipc_bclink_init();
+ if (res)
+ return res;
tipc_nametbl_publish(TIPC_CFG_SRV, tipc_own_addr, tipc_own_addr,
TIPC_ZONE_SCOPE, 0, tipc_own_addr);
+
pr_info("Started in network mode\n");
pr_info("Own node address %s, network identity %u\n",
tipc_addr_string_fill(addr_string, tipc_own_addr), tipc_net_id);
+ return 0;
}
void tipc_net_stop(void)
@@ -195,11 +131,11 @@ void tipc_net_stop(void)
return;
tipc_nametbl_withdraw(TIPC_CFG_SRV, tipc_own_addr, 0, tipc_own_addr);
- write_lock_bh(&tipc_net_lock);
+ rtnl_lock();
tipc_bearer_stop();
tipc_bclink_stop();
tipc_node_stop();
- write_unlock_bh(&tipc_net_lock);
+ rtnl_unlock();
pr_info("Left network mode\n");
}
diff --git a/net/tipc/net.h b/net/tipc/net.h
index 079daadb3f7..59ef3388be2 100644
--- a/net/tipc/net.h
+++ b/net/tipc/net.h
@@ -37,11 +37,7 @@
#ifndef _TIPC_NET_H
#define _TIPC_NET_H
-extern rwlock_t tipc_net_lock;
-
-void tipc_net_route_msg(struct sk_buff *buf);
-
-void tipc_net_start(u32 addr);
+int tipc_net_start(u32 addr);
void tipc_net_stop(void);
#endif
diff --git a/net/tipc/netlink.c b/net/tipc/netlink.c
index 3aaf73de9e2..ad844d36534 100644
--- a/net/tipc/netlink.c
+++ b/net/tipc/netlink.c
@@ -47,7 +47,7 @@ static int handle_cmd(struct sk_buff *skb, struct genl_info *info)
int hdr_space = nlmsg_total_size(GENL_HDRLEN + TIPC_GENL_HDRLEN);
u16 cmd;
- if ((req_userhdr->cmd & 0xC000) && (!capable(CAP_NET_ADMIN)))
+ if ((req_userhdr->cmd & 0xC000) && (!netlink_capable(skb, CAP_NET_ADMIN)))
cmd = TIPC_CMD_NOT_NET_ADMIN;
else
cmd = req_userhdr->cmd;
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 1d3a4999a70..f7069299943 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1,7 +1,7 @@
/*
* net/tipc/node.c: TIPC node management routines
*
- * Copyright (c) 2000-2006, 2012 Ericsson AB
+ * Copyright (c) 2000-2006, 2012-2014, Ericsson AB
* Copyright (c) 2005-2006, 2010-2014, Wind River Systems
* All rights reserved.
*
@@ -108,7 +108,7 @@ struct tipc_node *tipc_node_create(u32 addr)
break;
}
list_add_tail_rcu(&n_ptr->list, &temp_node->list);
- n_ptr->block_setup = WAIT_PEER_DOWN;
+ n_ptr->action_flags = TIPC_WAIT_PEER_LINKS_DOWN;
n_ptr->signature = INVALID_NODE_SIG;
tipc_num_nodes++;
@@ -144,30 +144,36 @@ void tipc_node_stop(void)
void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
{
struct tipc_link **active = &n_ptr->active_links[0];
+ u32 addr = n_ptr->addr;
n_ptr->working_links++;
-
+ tipc_nametbl_publish(TIPC_LINK_STATE, addr, addr, TIPC_NODE_SCOPE,
+ l_ptr->bearer_id, addr);
pr_info("Established link <%s> on network plane %c\n",
- l_ptr->name, l_ptr->b_ptr->net_plane);
+ l_ptr->name, l_ptr->net_plane);
if (!active[0]) {
active[0] = active[1] = l_ptr;
node_established_contact(n_ptr);
- return;
+ goto exit;
}
if (l_ptr->priority < active[0]->priority) {
pr_info("New link <%s> becomes standby\n", l_ptr->name);
- return;
+ goto exit;
}
tipc_link_dup_queue_xmit(active[0], l_ptr);
if (l_ptr->priority == active[0]->priority) {
active[0] = l_ptr;
- return;
+ goto exit;
}
pr_info("Old link <%s> becomes standby\n", active[0]->name);
if (active[1] != active[0])
pr_info("Old link <%s> becomes standby\n", active[1]->name);
active[0] = active[1] = l_ptr;
+exit:
+ /* Leave room for changeover header when returning 'mtu' to users: */
+ n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE;
+ n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE;
}
/**
@@ -203,16 +209,18 @@ static void node_select_active_links(struct tipc_node *n_ptr)
void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
{
struct tipc_link **active;
+ u32 addr = n_ptr->addr;
n_ptr->working_links--;
+ tipc_nametbl_withdraw(TIPC_LINK_STATE, addr, l_ptr->bearer_id, addr);
if (!tipc_link_is_active(l_ptr)) {
pr_info("Lost standby link <%s> on network plane %c\n",
- l_ptr->name, l_ptr->b_ptr->net_plane);
+ l_ptr->name, l_ptr->net_plane);
return;
}
pr_info("Lost link <%s> on network plane %c\n",
- l_ptr->name, l_ptr->b_ptr->net_plane);
+ l_ptr->name, l_ptr->net_plane);
active = &n_ptr->active_links[0];
if (active[0] == l_ptr)
@@ -225,6 +233,19 @@ void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
tipc_link_failover_send_queue(l_ptr);
else
node_lost_contact(n_ptr);
+
+ /* Leave room for changeover header when returning 'mtu' to users: */
+ if (active[0]) {
+ n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE;
+ n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE;
+ return;
+ }
+
+ /* Loopback link went down? No fragmentation needed from now on. */
+ if (n_ptr->addr == tipc_own_addr) {
+ n_ptr->act_mtus[0] = MAX_MSG_SIZE;
+ n_ptr->act_mtus[1] = MAX_MSG_SIZE;
+ }
}
int tipc_node_active_links(struct tipc_node *n_ptr)
@@ -239,7 +260,7 @@ int tipc_node_is_up(struct tipc_node *n_ptr)
void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
{
- n_ptr->links[l_ptr->b_ptr->identity] = l_ptr;
+ n_ptr->links[l_ptr->bearer_id] = l_ptr;
spin_lock_bh(&node_list_lock);
tipc_num_links++;
spin_unlock_bh(&node_list_lock);
@@ -263,26 +284,12 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
static void node_established_contact(struct tipc_node *n_ptr)
{
- tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr);
+ n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP;
n_ptr->bclink.oos_state = 0;
n_ptr->bclink.acked = tipc_bclink_get_last_sent();
tipc_bclink_add_node(n_ptr->addr);
}
-static void node_name_purge_complete(unsigned long node_addr)
-{
- struct tipc_node *n_ptr;
-
- read_lock_bh(&tipc_net_lock);
- n_ptr = tipc_node_find(node_addr);
- if (n_ptr) {
- tipc_node_lock(n_ptr);
- n_ptr->block_setup &= ~WAIT_NAMES_GONE;
- tipc_node_unlock(n_ptr);
- }
- read_unlock_bh(&tipc_net_lock);
-}
-
static void node_lost_contact(struct tipc_node *n_ptr)
{
char addr_string[16];
@@ -296,10 +303,9 @@ static void node_lost_contact(struct tipc_node *n_ptr)
kfree_skb_list(n_ptr->bclink.deferred_head);
n_ptr->bclink.deferred_size = 0;
- if (n_ptr->bclink.reasm_head) {
- kfree_skb(n_ptr->bclink.reasm_head);
- n_ptr->bclink.reasm_head = NULL;
- n_ptr->bclink.reasm_tail = NULL;
+ if (n_ptr->bclink.reasm_buf) {
+ kfree_skb(n_ptr->bclink.reasm_buf);
+ n_ptr->bclink.reasm_buf = NULL;
}
tipc_bclink_remove_node(n_ptr->addr);
@@ -318,12 +324,13 @@ static void node_lost_contact(struct tipc_node *n_ptr)
tipc_link_reset_fragments(l_ptr);
}
- /* Notify subscribers */
- tipc_nodesub_notify(n_ptr);
+ n_ptr->action_flags &= ~TIPC_WAIT_OWN_LINKS_DOWN;
- /* Prevent re-contact with node until cleanup is done */
- n_ptr->block_setup = WAIT_PEER_DOWN | WAIT_NAMES_GONE;
- tipc_k_signal((Handler)node_name_purge_complete, n_ptr->addr);
+ /* Notify subscribers and prevent re-contact with node until
+ * cleanup is done.
+ */
+ n_ptr->action_flags |= TIPC_WAIT_PEER_LINKS_DOWN |
+ TIPC_NOTIFY_NODE_DOWN;
}
struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space)
@@ -436,3 +443,56 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)
rcu_read_unlock();
return buf;
}
+
+/**
+ * tipc_node_get_linkname - get the name of a link
+ *
+ * @bearer_id: id of the bearer
+ * @node: peer node address
+ * @linkname: link name output buffer
+ *
+ * Returns 0 on success
+ */
+int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
+{
+ struct tipc_link *link;
+ struct tipc_node *node = tipc_node_find(addr);
+
+ if ((bearer_id >= MAX_BEARERS) || !node)
+ return -EINVAL;
+ tipc_node_lock(node);
+ link = node->links[bearer_id];
+ if (link) {
+ strncpy(linkname, link->name, len);
+ tipc_node_unlock(node);
+ return 0;
+ }
+ tipc_node_unlock(node);
+ return -EINVAL;
+}
+
+void tipc_node_unlock(struct tipc_node *node)
+{
+ LIST_HEAD(nsub_list);
+ u32 addr = 0;
+
+ if (likely(!node->action_flags)) {
+ spin_unlock_bh(&node->lock);
+ return;
+ }
+
+ if (node->action_flags & TIPC_NOTIFY_NODE_DOWN) {
+ list_replace_init(&node->nsub, &nsub_list);
+ node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN;
+ }
+ if (node->action_flags & TIPC_NOTIFY_NODE_UP) {
+ node->action_flags &= ~TIPC_NOTIFY_NODE_UP;
+ addr = node->addr;
+ }
+ spin_unlock_bh(&node->lock);
+
+ if (!list_empty(&nsub_list))
+ tipc_nodesub_notify(&nsub_list);
+ if (addr)
+ tipc_named_node_up(addr);
+}
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 7cbb8cec1a9..b61716a8218 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -41,68 +41,81 @@
#include "addr.h"
#include "net.h"
#include "bearer.h"
+#include "msg.h"
/*
* Out-of-range value for node signature
*/
#define INVALID_NODE_SIG 0x10000
-/* Flags used to block (re)establishment of contact with a neighboring node */
-#define WAIT_PEER_DOWN 0x0001 /* wait to see that peer's links are down */
-#define WAIT_NAMES_GONE 0x0002 /* wait for peer's publications to be purged */
-#define WAIT_NODE_DOWN 0x0004 /* wait until peer node is declared down */
+/* Flags used to take different actions according to flag type
+ * TIPC_WAIT_PEER_LINKS_DOWN: wait to see that peer's links are down
+ * TIPC_WAIT_OWN_LINKS_DOWN: wait until peer node is declared down
+ * TIPC_NOTIFY_NODE_DOWN: notify node is down
+ * TIPC_NOTIFY_NODE_UP: notify node is up
+ */
+enum {
+ TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1),
+ TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2),
+ TIPC_NOTIFY_NODE_DOWN = (1 << 3),
+ TIPC_NOTIFY_NODE_UP = (1 << 4)
+};
+
+/**
+ * struct tipc_node_bclink - TIPC node bclink structure
+ * @acked: sequence # of last outbound b'cast message acknowledged by node
+ * @last_in: sequence # of last in-sequence b'cast message received from node
+ * @last_sent: sequence # of last b'cast message sent by node
+ * @oos_state: state tracker for handling OOS b'cast messages
+ * @deferred_size: number of OOS b'cast messages in deferred queue
+ * @deferred_head: oldest OOS b'cast message received from node
+ * @deferred_tail: newest OOS b'cast message received from node
+ * @reasm_buf: broadcast reassembly queue head from node
+ * @recv_permitted: true if node is allowed to receive b'cast messages
+ */
+struct tipc_node_bclink {
+ u32 acked;
+ u32 last_in;
+ u32 last_sent;
+ u32 oos_state;
+ u32 deferred_size;
+ struct sk_buff *deferred_head;
+ struct sk_buff *deferred_tail;
+ struct sk_buff *reasm_buf;
+ bool recv_permitted;
+};
/**
* struct tipc_node - TIPC node structure
* @addr: network address of node
* @lock: spinlock governing access to structure
* @hash: links to adjacent nodes in unsorted hash chain
- * @list: links to adjacent nodes in sorted list of cluster's nodes
- * @nsub: list of "node down" subscriptions monitoring node
* @active_links: pointers to active links to node
* @links: pointers to all links to node
+ * @action_flags: bit mask of different types of node actions
+ * @bclink: broadcast-related info
+ * @list: links to adjacent nodes in sorted list of cluster's nodes
* @working_links: number of working links to node (both active and standby)
- * @block_setup: bit mask of conditions preventing link establishment to node
* @link_cnt: number of links to node
* @signature: node instance identifier
- * @bclink: broadcast-related info
+ * @nsub: list of "node down" subscriptions monitoring node
* @rcu: rcu struct for tipc_node
- * @acked: sequence # of last outbound b'cast message acknowledged by node
- * @last_in: sequence # of last in-sequence b'cast message received from node
- * @last_sent: sequence # of last b'cast message sent by node
- * @oos_state: state tracker for handling OOS b'cast messages
- * @deferred_size: number of OOS b'cast messages in deferred queue
- * @deferred_head: oldest OOS b'cast message received from node
- * @deferred_tail: newest OOS b'cast message received from node
- * @reasm_head: broadcast reassembly queue head from node
- * @reasm_tail: last broadcast fragment received from node
- * @recv_permitted: true if node is allowed to receive b'cast messages
*/
struct tipc_node {
u32 addr;
spinlock_t lock;
struct hlist_node hash;
- struct list_head list;
- struct list_head nsub;
struct tipc_link *active_links[2];
+ u32 act_mtus[2];
struct tipc_link *links[MAX_BEARERS];
+ unsigned int action_flags;
+ struct tipc_node_bclink bclink;
+ struct list_head list;
int link_cnt;
int working_links;
- int block_setup;
u32 signature;
+ struct list_head nsub;
struct rcu_head rcu;
- struct {
- u32 acked;
- u32 last_in;
- u32 last_sent;
- u32 oos_state;
- u32 deferred_size;
- struct sk_buff *deferred_head;
- struct sk_buff *deferred_tail;
- struct sk_buff *reasm_head;
- struct sk_buff *reasm_tail;
- bool recv_permitted;
- } bclink;
};
extern struct list_head tipc_node_list;
@@ -118,15 +131,33 @@ int tipc_node_active_links(struct tipc_node *n_ptr);
int tipc_node_is_up(struct tipc_node *n_ptr);
struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space);
struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space);
+int tipc_node_get_linkname(u32 bearer_id, u32 node, char *linkname, size_t len);
+void tipc_node_unlock(struct tipc_node *node);
-static inline void tipc_node_lock(struct tipc_node *n_ptr)
+static inline void tipc_node_lock(struct tipc_node *node)
{
- spin_lock_bh(&n_ptr->lock);
+ spin_lock_bh(&node->lock);
}
-static inline void tipc_node_unlock(struct tipc_node *n_ptr)
+static inline bool tipc_node_blocked(struct tipc_node *node)
{
- spin_unlock_bh(&n_ptr->lock);
+ return (node->action_flags & (TIPC_WAIT_PEER_LINKS_DOWN |
+ TIPC_NOTIFY_NODE_DOWN | TIPC_WAIT_OWN_LINKS_DOWN));
+}
+
+static inline uint tipc_node_get_mtu(u32 addr, u32 selector)
+{
+ struct tipc_node *node;
+ u32 mtu;
+
+ node = tipc_node_find(addr);
+
+ if (likely(node))
+ mtu = node->act_mtus[selector & 1];
+ else
+ mtu = MAX_MSG_SIZE;
+
+ return mtu;
}
#endif
diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c
index 8a7384c04ad..2d13eea8574 100644
--- a/net/tipc/node_subscr.c
+++ b/net/tipc/node_subscr.c
@@ -81,15 +81,16 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub)
*
* Note: node is locked by caller
*/
-void tipc_nodesub_notify(struct tipc_node *node)
+void tipc_nodesub_notify(struct list_head *nsub_list)
{
- struct tipc_node_subscr *ns;
+ struct tipc_node_subscr *ns, *safe;
+ net_ev_handler handle_node_down;
- list_for_each_entry(ns, &node->nsub, nodesub_list) {
- if (ns->handle_node_down) {
- tipc_k_signal((Handler)ns->handle_node_down,
- (unsigned long)ns->usr_handle);
+ list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) {
+ handle_node_down = ns->handle_node_down;
+ if (handle_node_down) {
ns->handle_node_down = NULL;
+ handle_node_down(ns->usr_handle);
}
}
}
diff --git a/net/tipc/node_subscr.h b/net/tipc/node_subscr.h
index c95d20727de..d91b8cc81e3 100644
--- a/net/tipc/node_subscr.h
+++ b/net/tipc/node_subscr.h
@@ -58,6 +58,6 @@ struct tipc_node_subscr {
void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr,
void *usr_handle, net_ev_handler handle_down);
void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub);
-void tipc_nodesub_notify(struct tipc_node *node);
+void tipc_nodesub_notify(struct list_head *nsub_list);
#endif
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 5c14c7801ee..7e096a5e770 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -42,8 +42,6 @@
/* Connection management: */
#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */
-#define CONFIRMED 0
-#define PROBING 1
#define MAX_REJECT_SIZE 1024
@@ -76,124 +74,6 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg)
(!peernode && (orignode == tipc_own_addr));
}
-/**
- * tipc_port_mcast_xmit - send a multicast message to local and remote
- * destinations
- */
-int tipc_port_mcast_xmit(struct tipc_port *oport,
- struct tipc_name_seq const *seq,
- struct iovec const *msg_sect,
- unsigned int len)
-{
- struct tipc_msg *hdr;
- struct sk_buff *buf;
- struct sk_buff *ibuf = NULL;
- struct tipc_port_list dports = {0, NULL, };
- int ext_targets;
- int res;
-
- /* Create multicast message */
- hdr = &oport->phdr;
- msg_set_type(hdr, TIPC_MCAST_MSG);
- msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
- msg_set_destport(hdr, 0);
- msg_set_destnode(hdr, 0);
- msg_set_nametype(hdr, seq->type);
- msg_set_namelower(hdr, seq->lower);
- msg_set_nameupper(hdr, seq->upper);
- msg_set_hdr_sz(hdr, MCAST_H_SIZE);
- res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf);
- if (unlikely(!buf))
- return res;
-
- /* Figure out where to send multicast message */
- ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
- TIPC_NODE_SCOPE, &dports);
-
- /* Send message to destinations (duplicate it only if necessary) */
- if (ext_targets) {
- if (dports.count != 0) {
- ibuf = skb_copy(buf, GFP_ATOMIC);
- if (ibuf == NULL) {
- tipc_port_list_free(&dports);
- kfree_skb(buf);
- return -ENOMEM;
- }
- }
- res = tipc_bclink_xmit(buf);
- if ((res < 0) && (dports.count != 0))
- kfree_skb(ibuf);
- } else {
- ibuf = buf;
- }
-
- if (res >= 0) {
- if (ibuf)
- tipc_port_mcast_rcv(ibuf, &dports);
- } else {
- tipc_port_list_free(&dports);
- }
- return res;
-}
-
-/**
- * tipc_port_mcast_rcv - deliver multicast message to all destination ports
- *
- * If there is no port list, perform a lookup to create one
- */
-void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp)
-{
- struct tipc_msg *msg;
- struct tipc_port_list dports = {0, NULL, };
- struct tipc_port_list *item = dp;
- int cnt = 0;
-
- msg = buf_msg(buf);
-
- /* Create destination port list, if one wasn't supplied */
- if (dp == NULL) {
- tipc_nametbl_mc_translate(msg_nametype(msg),
- msg_namelower(msg),
- msg_nameupper(msg),
- TIPC_CLUSTER_SCOPE,
- &dports);
- item = dp = &dports;
- }
-
- /* Deliver a copy of message to each destination port */
- if (dp->count != 0) {
- msg_set_destnode(msg, tipc_own_addr);
- if (dp->count == 1) {
- msg_set_destport(msg, dp->ports[0]);
- tipc_port_rcv(buf);
- tipc_port_list_free(dp);
- return;
- }
- for (; cnt < dp->count; cnt++) {
- int index = cnt % PLSIZE;
- struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
-
- if (b == NULL) {
- pr_warn("Unable to deliver multicast message(s)\n");
- goto exit;
- }
- if ((index == 0) && (cnt != 0))
- item = item->next;
- msg_set_destport(buf_msg(b), item->ports[index]);
- tipc_port_rcv(b);
- }
- }
-exit:
- kfree_skb(buf);
- tipc_port_list_free(dp);
-}
-
-
-void tipc_port_wakeup(struct tipc_port *port)
-{
- tipc_sock_wakeup(tipc_port_to_sock(port));
-}
-
/* tipc_port_init - intiate TIPC port and lock it
*
* Returns obtained reference if initialization is successful, zero otherwise
@@ -235,6 +115,8 @@ u32 tipc_port_init(struct tipc_port *p_ptr,
void tipc_port_destroy(struct tipc_port *p_ptr)
{
struct sk_buff *buf = NULL;
+ struct tipc_msg *msg = NULL;
+ u32 peer;
tipc_withdraw(p_ptr, 0, NULL);
@@ -246,14 +128,15 @@ void tipc_port_destroy(struct tipc_port *p_ptr)
if (p_ptr->connected) {
buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
tipc_nodesub_unsubscribe(&p_ptr->subscription);
+ msg = buf_msg(buf);
+ peer = msg_destnode(msg);
+ tipc_link_xmit(buf, peer, msg_link_selector(msg));
}
-
spin_lock_bh(&tipc_port_list_lock);
list_del(&p_ptr->port_list);
list_del(&p_ptr->wait_list);
spin_unlock_bh(&tipc_port_list_lock);
k_term_timer(&p_ptr->timer);
- tipc_net_route_msg(buf);
}
/*
@@ -275,100 +158,16 @@ static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
msg_set_destport(msg, tipc_port_peerport(p_ptr));
msg_set_origport(msg, p_ptr->ref);
msg_set_msgcnt(msg, ack);
+ buf->next = NULL;
}
return buf;
}
-int tipc_reject_msg(struct sk_buff *buf, u32 err)
-{
- struct tipc_msg *msg = buf_msg(buf);
- struct sk_buff *rbuf;
- struct tipc_msg *rmsg;
- int hdr_sz;
- u32 imp;
- u32 data_sz = msg_data_sz(msg);
- u32 src_node;
- u32 rmsg_sz;
-
- /* discard rejected message if it shouldn't be returned to sender */
- if (WARN(!msg_isdata(msg),
- "attempt to reject message with user=%u", msg_user(msg))) {
- dump_stack();
- goto exit;
- }
- if (msg_errcode(msg) || msg_dest_droppable(msg))
- goto exit;
-
- /*
- * construct returned message by copying rejected message header and
- * data (or subset), then updating header fields that need adjusting
- */
- hdr_sz = msg_hdr_sz(msg);
- rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
-
- rbuf = tipc_buf_acquire(rmsg_sz);
- if (rbuf == NULL)
- goto exit;
-
- rmsg = buf_msg(rbuf);
- skb_copy_to_linear_data(rbuf, msg, rmsg_sz);
-
- if (msg_connected(rmsg)) {
- imp = msg_importance(rmsg);
- if (imp < TIPC_CRITICAL_IMPORTANCE)
- msg_set_importance(rmsg, ++imp);
- }
- msg_set_non_seq(rmsg, 0);
- msg_set_size(rmsg, rmsg_sz);
- msg_set_errcode(rmsg, err);
- msg_set_prevnode(rmsg, tipc_own_addr);
- msg_swap_words(rmsg, 4, 5);
- if (!msg_short(rmsg))
- msg_swap_words(rmsg, 6, 7);
-
- /* send self-abort message when rejecting on a connected port */
- if (msg_connected(msg)) {
- struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
-
- if (p_ptr) {
- struct sk_buff *abuf = NULL;
-
- if (p_ptr->connected)
- abuf = port_build_self_abort_msg(p_ptr, err);
- tipc_port_unlock(p_ptr);
- tipc_net_route_msg(abuf);
- }
- }
-
- /* send returned message & dispose of rejected message */
- src_node = msg_prevnode(msg);
- if (in_own_node(src_node))
- tipc_port_rcv(rbuf);
- else
- tipc_link_xmit(rbuf, src_node, msg_link_selector(rmsg));
-exit:
- kfree_skb(buf);
- return data_sz;
-}
-
-int tipc_port_iovec_reject(struct tipc_port *p_ptr, struct tipc_msg *hdr,
- struct iovec const *msg_sect, unsigned int len,
- int err)
-{
- struct sk_buff *buf;
- int res;
-
- res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf);
- if (!buf)
- return res;
-
- return tipc_reject_msg(buf, err);
-}
-
static void port_timeout(unsigned long ref)
{
struct tipc_port *p_ptr = tipc_port_lock(ref);
struct sk_buff *buf = NULL;
+ struct tipc_msg *msg = NULL;
if (!p_ptr)
return;
@@ -379,15 +178,16 @@ static void port_timeout(unsigned long ref)
}
/* Last probe answered ? */
- if (p_ptr->probing_state == PROBING) {
+ if (p_ptr->probing_state == TIPC_CONN_PROBING) {
buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
} else {
buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0);
- p_ptr->probing_state = PROBING;
+ p_ptr->probing_state = TIPC_CONN_PROBING;
k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
}
tipc_port_unlock(p_ptr);
- tipc_net_route_msg(buf);
+ msg = buf_msg(buf);
+ tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
}
@@ -395,12 +195,14 @@ static void port_handle_node_down(unsigned long ref)
{
struct tipc_port *p_ptr = tipc_port_lock(ref);
struct sk_buff *buf = NULL;
+ struct tipc_msg *msg = NULL;
if (!p_ptr)
return;
buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
tipc_port_unlock(p_ptr);
- tipc_net_route_msg(buf);
+ msg = buf_msg(buf);
+ tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
}
@@ -412,6 +214,7 @@ static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 er
struct tipc_msg *msg = buf_msg(buf);
msg_swap_words(msg, 4, 5);
msg_swap_words(msg, 6, 7);
+ buf->next = NULL;
}
return buf;
}
@@ -436,60 +239,11 @@ static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 er
if (imp < TIPC_CRITICAL_IMPORTANCE)
msg_set_importance(msg, ++imp);
msg_set_errcode(msg, err);
+ buf->next = NULL;
}
return buf;
}
-void tipc_port_proto_rcv(struct sk_buff *buf)
-{
- struct tipc_msg *msg = buf_msg(buf);
- struct tipc_port *p_ptr;
- struct sk_buff *r_buf = NULL;
- u32 destport = msg_destport(msg);
- int wakeable;
-
- /* Validate connection */
- p_ptr = tipc_port_lock(destport);
- if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) {
- r_buf = tipc_buf_acquire(BASIC_H_SIZE);
- if (r_buf) {
- msg = buf_msg(r_buf);
- tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG,
- BASIC_H_SIZE, msg_orignode(msg));
- msg_set_errcode(msg, TIPC_ERR_NO_PORT);
- msg_set_origport(msg, destport);
- msg_set_destport(msg, msg_origport(msg));
- }
- if (p_ptr)
- tipc_port_unlock(p_ptr);
- goto exit;
- }
-
- /* Process protocol message sent by peer */
- switch (msg_type(msg)) {
- case CONN_ACK:
- wakeable = tipc_port_congested(p_ptr) && p_ptr->congested;
- p_ptr->acked += msg_msgcnt(msg);
- if (!tipc_port_congested(p_ptr)) {
- p_ptr->congested = 0;
- if (wakeable)
- tipc_port_wakeup(p_ptr);
- }
- break;
- case CONN_PROBE:
- r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0);
- break;
- default:
- /* CONN_PROBE_REPLY or unrecognized - no action required */
- break;
- }
- p_ptr->probing_state = CONFIRMED;
- tipc_port_unlock(p_ptr);
-exit:
- tipc_net_route_msg(r_buf);
- kfree_skb(buf);
-}
-
static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id)
{
struct publication *publ;
@@ -581,16 +335,19 @@ void tipc_acknowledge(u32 ref, u32 ack)
{
struct tipc_port *p_ptr;
struct sk_buff *buf = NULL;
+ struct tipc_msg *msg;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return;
- if (p_ptr->connected) {
- p_ptr->conn_unacked -= ack;
+ if (p_ptr->connected)
buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
- }
+
tipc_port_unlock(p_ptr);
- tipc_net_route_msg(buf);
+ if (!buf)
+ return;
+ msg = buf_msg(buf);
+ tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
}
int tipc_publish(struct tipc_port *p_ptr, unsigned int scope,
@@ -689,7 +446,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
msg_set_hdr_sz(msg, SHORT_H_SIZE);
p_ptr->probing_interval = PROBING_INTERVAL;
- p_ptr->probing_state = CONFIRMED;
+ p_ptr->probing_state = TIPC_CONN_OK;
p_ptr->connected = 1;
k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
@@ -698,7 +455,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
(net_ev_handler)port_handle_node_down);
res = 0;
exit:
- p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
+ p_ptr->max_pkt = tipc_node_get_mtu(peer->node, ref);
return res;
}
@@ -741,6 +498,7 @@ int tipc_port_disconnect(u32 ref)
*/
int tipc_port_shutdown(u32 ref)
{
+ struct tipc_msg *msg;
struct tipc_port *p_ptr;
struct sk_buff *buf = NULL;
@@ -750,180 +508,7 @@ int tipc_port_shutdown(u32 ref)
buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
tipc_port_unlock(p_ptr);
- tipc_net_route_msg(buf);
+ msg = buf_msg(buf);
+ tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
return tipc_port_disconnect(ref);
}
-
-/**
- * tipc_port_rcv - receive message from lower layer and deliver to port user
- */
-int tipc_port_rcv(struct sk_buff *buf)
-{
- struct tipc_port *p_ptr;
- struct tipc_msg *msg = buf_msg(buf);
- u32 destport = msg_destport(msg);
- u32 dsz = msg_data_sz(msg);
- u32 err;
-
- /* forward unresolved named message */
- if (unlikely(!destport)) {
- tipc_net_route_msg(buf);
- return dsz;
- }
-
- /* validate destination & pass to port, otherwise reject message */
- p_ptr = tipc_port_lock(destport);
- if (likely(p_ptr)) {
- err = tipc_sk_rcv(&tipc_port_to_sock(p_ptr)->sk, buf);
- tipc_port_unlock(p_ptr);
- if (likely(!err))
- return dsz;
- } else {
- err = TIPC_ERR_NO_PORT;
- }
-
- return tipc_reject_msg(buf, err);
-}
-
-/*
- * tipc_port_iovec_rcv: Concatenate and deliver sectioned
- * message for this node.
- */
-static int tipc_port_iovec_rcv(struct tipc_port *sender,
- struct iovec const *msg_sect,
- unsigned int len)
-{
- struct sk_buff *buf;
- int res;
-
- res = tipc_msg_build(&sender->phdr, msg_sect, len, MAX_MSG_SIZE, &buf);
- if (likely(buf))
- tipc_port_rcv(buf);
- return res;
-}
-
-/**
- * tipc_send - send message sections on connection
- */
-int tipc_send(struct tipc_port *p_ptr,
- struct iovec const *msg_sect,
- unsigned int len)
-{
- u32 destnode;
- int res;
-
- if (!p_ptr->connected)
- return -EINVAL;
-
- p_ptr->congested = 1;
- if (!tipc_port_congested(p_ptr)) {
- destnode = tipc_port_peernode(p_ptr);
- if (likely(!in_own_node(destnode)))
- res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
- destnode);
- else
- res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);
-
- if (likely(res != -ELINKCONG)) {
- p_ptr->congested = 0;
- if (res > 0)
- p_ptr->sent++;
- return res;
- }
- }
- if (tipc_port_unreliable(p_ptr)) {
- p_ptr->congested = 0;
- return len;
- }
- return -ELINKCONG;
-}
-
-/**
- * tipc_send2name - send message sections to port name
- */
-int tipc_send2name(struct tipc_port *p_ptr,
- struct tipc_name const *name,
- unsigned int domain,
- struct iovec const *msg_sect,
- unsigned int len)
-{
- struct tipc_msg *msg;
- u32 destnode = domain;
- u32 destport;
- int res;
-
- if (p_ptr->connected)
- return -EINVAL;
-
- msg = &p_ptr->phdr;
- msg_set_type(msg, TIPC_NAMED_MSG);
- msg_set_hdr_sz(msg, NAMED_H_SIZE);
- msg_set_nametype(msg, name->type);
- msg_set_nameinst(msg, name->instance);
- msg_set_lookup_scope(msg, tipc_addr_scope(domain));
- destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
- msg_set_destnode(msg, destnode);
- msg_set_destport(msg, destport);
-
- if (likely(destport || destnode)) {
- if (likely(in_own_node(destnode)))
- res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);
- else if (tipc_own_addr)
- res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
- destnode);
- else
- res = tipc_port_iovec_reject(p_ptr, msg, msg_sect,
- len, TIPC_ERR_NO_NODE);
- if (likely(res != -ELINKCONG)) {
- if (res > 0)
- p_ptr->sent++;
- return res;
- }
- if (tipc_port_unreliable(p_ptr))
- return len;
-
- return -ELINKCONG;
- }
- return tipc_port_iovec_reject(p_ptr, msg, msg_sect, len,
- TIPC_ERR_NO_NAME);
-}
-
-/**
- * tipc_send2port - send message sections to port identity
- */
-int tipc_send2port(struct tipc_port *p_ptr,
- struct tipc_portid const *dest,
- struct iovec const *msg_sect,
- unsigned int len)
-{
- struct tipc_msg *msg;
- int res;
-
- if (p_ptr->connected)
- return -EINVAL;
-
- msg = &p_ptr->phdr;
- msg_set_type(msg, TIPC_DIRECT_MSG);
- msg_set_lookup_scope(msg, 0);
- msg_set_destnode(msg, dest->node);
- msg_set_destport(msg, dest->ref);
- msg_set_hdr_sz(msg, BASIC_H_SIZE);
-
- if (in_own_node(dest->node))
- res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);
- else if (tipc_own_addr)
- res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
- dest->node);
- else
- res = tipc_port_iovec_reject(p_ptr, msg, msg_sect, len,
- TIPC_ERR_NO_NODE);
- if (likely(res != -ELINKCONG)) {
- if (res > 0)
- p_ptr->sent++;
- return res;
- }
- if (tipc_port_unreliable(p_ptr))
- return len;
-
- return -ELINKCONG;
-}
diff --git a/net/tipc/port.h b/net/tipc/port.h
index a00397393bd..3f93454592b 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -42,9 +42,10 @@
#include "msg.h"
#include "node_subscr.h"
-#define TIPC_FLOW_CONTROL_WIN 512
-#define CONN_OVERLOAD_LIMIT ((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \
- SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
+#define TIPC_CONNACK_INTV 256
+#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
+#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
+ SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
/**
* struct tipc_port - TIPC port structure
@@ -52,17 +53,13 @@
* @connected: non-zero if port is currently connected to a peer port
* @conn_type: TIPC type used when connection was established
* @conn_instance: TIPC instance used when connection was established
- * @conn_unacked: number of unacknowledged messages received from peer port
* @published: non-zero if port has one or more associated names
- * @congested: non-zero if cannot send because of link or port congestion
* @max_pkt: maximum packet size "hint" used when building messages sent by port
* @ref: unique reference to port in TIPC object registry
* @phdr: preformatted message header used when sending messages
* @port_list: adjacent ports in TIPC's global list of ports
* @wait_list: adjacent ports in list of ports waiting on link congestion
* @waiting_pkts:
- * @sent: # of non-empty messages sent by port
- * @acked: # of non-empty message acknowledgements from connected port's peer
* @publications: list of publications for port
* @pub_count: total # of publications port has made during its lifetime
* @probing_state:
@@ -75,17 +72,13 @@ struct tipc_port {
int connected;
u32 conn_type;
u32 conn_instance;
- u32 conn_unacked;
int published;
- u32 congested;
u32 max_pkt;
u32 ref;
struct tipc_msg phdr;
struct list_head port_list;
struct list_head wait_list;
u32 waiting_pkts;
- u32 sent;
- u32 acked;
struct list_head publications;
u32 pub_count;
u32 probing_state;
@@ -103,8 +96,6 @@ struct tipc_port_list;
u32 tipc_port_init(struct tipc_port *p_ptr,
const unsigned int importance);
-int tipc_reject_msg(struct sk_buff *buf, u32 err);
-
void tipc_acknowledge(u32 port_ref, u32 ack);
void tipc_port_destroy(struct tipc_port *p_ptr);
@@ -121,8 +112,6 @@ int tipc_port_disconnect(u32 portref);
int tipc_port_shutdown(u32 ref);
-void tipc_port_wakeup(struct tipc_port *port);
-
/*
* The following routines require that the port be locked on entry
*/
@@ -131,40 +120,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
struct tipc_portid const *peer);
int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
-/*
- * TIPC messaging routines
- */
-int tipc_port_rcv(struct sk_buff *buf);
-
-int tipc_send(struct tipc_port *port,
- struct iovec const *msg_sect,
- unsigned int len);
-
-int tipc_send2name(struct tipc_port *port,
- struct tipc_name const *name,
- u32 domain,
- struct iovec const *msg_sect,
- unsigned int len);
-
-int tipc_send2port(struct tipc_port *port,
- struct tipc_portid const *dest,
- struct iovec const *msg_sect,
- unsigned int len);
-
-int tipc_port_mcast_xmit(struct tipc_port *port,
- struct tipc_name_seq const *seq,
- struct iovec const *msg,
- unsigned int len);
-
-int tipc_port_iovec_reject(struct tipc_port *p_ptr,
- struct tipc_msg *hdr,
- struct iovec const *msg_sect,
- unsigned int len,
- int err);
-
struct sk_buff *tipc_port_get_ports(void);
-void tipc_port_proto_rcv(struct sk_buff *buf);
-void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp);
void tipc_port_reinit(void);
/**
@@ -185,12 +141,6 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr)
spin_unlock_bh(p_ptr->lock);
}
-static inline int tipc_port_congested(struct tipc_port *p_ptr)
-{
- return (p_ptr->sent - p_ptr->acked) >= (TIPC_FLOW_CONTROL_WIN * 2);
-}
-
-
static inline u32 tipc_port_peernode(struct tipc_port *p_ptr)
{
return msg_destnode(&p_ptr->phdr);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 3c0256962f7..7d423ee1089 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -36,19 +36,23 @@
#include "core.h"
#include "port.h"
-
+#include "name_table.h"
+#include "node.h"
+#include "link.h"
#include <linux/export.h>
#define SS_LISTENING -1 /* socket is listening */
#define SS_READY -2 /* socket is connectionless */
#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
+#define TIPC_FWD_MSG 1
-static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
+static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
static void tipc_data_ready(struct sock *sk);
static void tipc_write_space(struct sock *sk);
static int tipc_release(struct socket *sock);
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
+static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
@@ -122,9 +126,12 @@ static void advance_rx_queue(struct sock *sk)
static void reject_rx_queue(struct sock *sk)
{
struct sk_buff *buf;
+ u32 dnode;
- while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
- tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
+ while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
+ if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
+ tipc_link_xmit(buf, dnode, 0);
+ }
}
/**
@@ -195,11 +202,13 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
sock->state = state;
sock_init_data(sock, sk);
- sk->sk_backlog_rcv = backlog_rcv;
+ sk->sk_backlog_rcv = tipc_backlog_rcv;
sk->sk_rcvbuf = sysctl_tipc_rmem[1];
sk->sk_data_ready = tipc_data_ready;
sk->sk_write_space = tipc_write_space;
- tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
+ tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
+ tsk->sent_unacked = 0;
+ atomic_set(&tsk->dupl_rcvcnt, 0);
tipc_port_unlock(port);
if (sock->state == SS_READY) {
@@ -301,6 +310,7 @@ static int tipc_release(struct socket *sock)
struct tipc_sock *tsk;
struct tipc_port *port;
struct sk_buff *buf;
+ u32 dnode;
/*
* Exit if socket isn't fully initialized (occurs when a failed accept()
@@ -329,7 +339,8 @@ static int tipc_release(struct socket *sock)
sock->state = SS_DISCONNECTING;
tipc_port_disconnect(port->ref);
}
- tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
+ if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
+ tipc_link_xmit(buf, dnode, 0);
}
}
@@ -502,12 +513,12 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
switch ((int)sock->state) {
case SS_UNCONNECTED:
- if (!tsk->port.congested)
+ if (!tsk->link_cong)
mask |= POLLOUT;
break;
case SS_READY:
case SS_CONNECTED:
- if (!tsk->port.congested)
+ if (!tsk->link_cong && !tipc_sk_conn_cong(tsk))
mask |= POLLOUT;
/* fall thru' */
case SS_CONNECTING:
@@ -524,6 +535,136 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
}
/**
+ * tipc_sendmcast - send multicast message
+ * @sock: socket structure
+ * @seq: destination address
+ * @iov: message data to send
+ * @dsz: total length of message data
+ * @timeo: timeout to wait for wakeup
+ *
+ * Called from function tipc_sendmsg(), which has done all sanity checks
+ * Returns the number of bytes sent on success, or errno
+ */
+static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
+ struct iovec *iov, size_t dsz, long timeo)
+{
+ struct sock *sk = sock->sk;
+ struct tipc_msg *mhdr = &tipc_sk(sk)->port.phdr;
+ struct sk_buff *buf;
+ uint mtu;
+ int rc;
+
+ msg_set_type(mhdr, TIPC_MCAST_MSG);
+ msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
+ msg_set_destport(mhdr, 0);
+ msg_set_destnode(mhdr, 0);
+ msg_set_nametype(mhdr, seq->type);
+ msg_set_namelower(mhdr, seq->lower);
+ msg_set_nameupper(mhdr, seq->upper);
+ msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
+
+new_mtu:
+ mtu = tipc_bclink_get_mtu();
+ rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf);
+ if (unlikely(rc < 0))
+ return rc;
+
+ do {
+ rc = tipc_bclink_xmit(buf);
+ if (likely(rc >= 0)) {
+ rc = dsz;
+ break;
+ }
+ if (rc == -EMSGSIZE)
+ goto new_mtu;
+ if (rc != -ELINKCONG)
+ break;
+ rc = tipc_wait_for_sndmsg(sock, &timeo);
+ if (rc)
+ kfree_skb_list(buf);
+ } while (!rc);
+ return rc;
+}
+
+/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
+ */
+void tipc_sk_mcast_rcv(struct sk_buff *buf)
+{
+ struct tipc_msg *msg = buf_msg(buf);
+ struct tipc_port_list dports = {0, NULL, };
+ struct tipc_port_list *item;
+ struct sk_buff *b;
+ uint i, last, dst = 0;
+ u32 scope = TIPC_CLUSTER_SCOPE;
+
+ if (in_own_node(msg_orignode(msg)))
+ scope = TIPC_NODE_SCOPE;
+
+ /* Create destination port list: */
+ tipc_nametbl_mc_translate(msg_nametype(msg),
+ msg_namelower(msg),
+ msg_nameupper(msg),
+ scope,
+ &dports);
+ last = dports.count;
+ if (!last) {
+ kfree_skb(buf);
+ return;
+ }
+
+ for (item = &dports; item; item = item->next) {
+ for (i = 0; i < PLSIZE && ++dst <= last; i++) {
+ b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
+ if (!b) {
+ pr_warn("Failed do clone mcast rcv buffer\n");
+ continue;
+ }
+ msg_set_destport(msg, item->ports[i]);
+ tipc_sk_rcv(b);
+ }
+ }
+ tipc_port_list_free(&dports);
+}
+
+/**
+ * tipc_sk_proto_rcv - receive a connection mng protocol message
+ * @tsk: receiving socket
+ * @dnode: node to send response message to, if any
+ * @buf: buffer containing protocol message
+ * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if
+ * (CONN_PROBE_REPLY) message should be forwarded.
+ */
+static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
+ struct sk_buff *buf)
+{
+ struct tipc_msg *msg = buf_msg(buf);
+ struct tipc_port *port = &tsk->port;
+ int conn_cong;
+
+ /* Ignore if connection cannot be validated: */
+ if (!port->connected || !tipc_port_peer_msg(port, msg))
+ goto exit;
+
+ port->probing_state = TIPC_CONN_OK;
+
+ if (msg_type(msg) == CONN_ACK) {
+ conn_cong = tipc_sk_conn_cong(tsk);
+ tsk->sent_unacked -= msg_msgcnt(msg);
+ if (conn_cong)
+ tipc_sock_wakeup(tsk);
+ } else if (msg_type(msg) == CONN_PROBE) {
+ if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
+ return TIPC_OK;
+ msg_set_type(msg, CONN_PROBE_REPLY);
+ return TIPC_FWD_MSG;
+ }
+ /* Do nothing if msg_type() == CONN_PROBE_REPLY */
+exit:
+ kfree_skb(buf);
+ return TIPC_OK;
+}
+
+/**
* dest_name_check - verify user is permitted to send to specified port name
* @dest: destination address
* @m: descriptor for message to be sent
@@ -537,6 +678,8 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
{
struct tipc_cfg_msg_hdr hdr;
+ if (unlikely(dest->addrtype == TIPC_ADDR_ID))
+ return 0;
if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
return 0;
if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
@@ -573,19 +716,18 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
return sock_intr_errno(*timeo_p);
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
- done = sk_wait_event(sk, timeo_p, !tsk->port.congested);
+ done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
finish_wait(sk_sleep(sk), &wait);
} while (!done);
return 0;
}
-
/**
* tipc_sendmsg - send message in connectionless manner
* @iocb: if NULL, indicates that socket lock is already held
* @sock: socket structure
* @m: message to send
- * @total_len: length of message
+ * @dsz: amount of user data to be sent
*
* Message must have an destination specified explicitly.
* Used for SOCK_RDM and SOCK_DGRAM messages,
@@ -595,100 +737,123 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
* Returns the number of bytes sent on success, or errno otherwise
*/
static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
- struct msghdr *m, size_t total_len)
+ struct msghdr *m, size_t dsz)
{
+ DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_port *port = &tsk->port;
- DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
- int needs_conn;
+ struct tipc_msg *mhdr = &port->phdr;
+ struct iovec *iov = m->msg_iov;
+ u32 dnode, dport;
+ struct sk_buff *buf;
+ struct tipc_name_seq *seq = &dest->addr.nameseq;
+ u32 mtu;
long timeo;
- int res = -EINVAL;
+ int rc = -EINVAL;
if (unlikely(!dest))
return -EDESTADDRREQ;
+
if (unlikely((m->msg_namelen < sizeof(*dest)) ||
(dest->family != AF_TIPC)))
return -EINVAL;
- if (total_len > TIPC_MAX_USER_MSG_SIZE)
+
+ if (dsz > TIPC_MAX_USER_MSG_SIZE)
return -EMSGSIZE;
if (iocb)
lock_sock(sk);
- needs_conn = (sock->state != SS_READY);
- if (unlikely(needs_conn)) {
+ if (unlikely(sock->state != SS_READY)) {
if (sock->state == SS_LISTENING) {
- res = -EPIPE;
+ rc = -EPIPE;
goto exit;
}
if (sock->state != SS_UNCONNECTED) {
- res = -EISCONN;
+ rc = -EISCONN;
goto exit;
}
if (tsk->port.published) {
- res = -EOPNOTSUPP;
+ rc = -EOPNOTSUPP;
goto exit;
}
if (dest->addrtype == TIPC_ADDR_NAME) {
tsk->port.conn_type = dest->addr.name.name.type;
tsk->port.conn_instance = dest->addr.name.name.instance;
}
-
- /* Abort any pending connection attempts (very unlikely) */
- reject_rx_queue(sk);
}
+ rc = dest_name_check(dest, m);
+ if (rc)
+ goto exit;
timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
- do {
- if (dest->addrtype == TIPC_ADDR_NAME) {
- res = dest_name_check(dest, m);
- if (res)
- break;
- res = tipc_send2name(port,
- &dest->addr.name.name,
- dest->addr.name.domain,
- m->msg_iov,
- total_len);
- } else if (dest->addrtype == TIPC_ADDR_ID) {
- res = tipc_send2port(port,
- &dest->addr.id,
- m->msg_iov,
- total_len);
- } else if (dest->addrtype == TIPC_ADDR_MCAST) {
- if (needs_conn) {
- res = -EOPNOTSUPP;
- break;
- }
- res = dest_name_check(dest, m);
- if (res)
- break;
- res = tipc_port_mcast_xmit(port,
- &dest->addr.nameseq,
- m->msg_iov,
- total_len);
+
+ if (dest->addrtype == TIPC_ADDR_MCAST) {
+ rc = tipc_sendmcast(sock, seq, iov, dsz, timeo);
+ goto exit;
+ } else if (dest->addrtype == TIPC_ADDR_NAME) {
+ u32 type = dest->addr.name.name.type;
+ u32 inst = dest->addr.name.name.instance;
+ u32 domain = dest->addr.name.domain;
+
+ dnode = domain;
+ msg_set_type(mhdr, TIPC_NAMED_MSG);
+ msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
+ msg_set_nametype(mhdr, type);
+ msg_set_nameinst(mhdr, inst);
+ msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
+ dport = tipc_nametbl_translate(type, inst, &dnode);
+ msg_set_destnode(mhdr, dnode);
+ msg_set_destport(mhdr, dport);
+ if (unlikely(!dport && !dnode)) {
+ rc = -EHOSTUNREACH;
+ goto exit;
}
- if (likely(res != -ELINKCONG)) {
- if (needs_conn && (res >= 0))
+ } else if (dest->addrtype == TIPC_ADDR_ID) {
+ dnode = dest->addr.id.node;
+ msg_set_type(mhdr, TIPC_DIRECT_MSG);
+ msg_set_lookup_scope(mhdr, 0);
+ msg_set_destnode(mhdr, dnode);
+ msg_set_destport(mhdr, dest->addr.id.ref);
+ msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
+ }
+
+new_mtu:
+ mtu = tipc_node_get_mtu(dnode, tsk->port.ref);
+ rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf);
+ if (rc < 0)
+ goto exit;
+
+ do {
+ rc = tipc_link_xmit(buf, dnode, tsk->port.ref);
+ if (likely(rc >= 0)) {
+ if (sock->state != SS_READY)
sock->state = SS_CONNECTING;
+ rc = dsz;
break;
}
- res = tipc_wait_for_sndmsg(sock, &timeo);
- if (res)
+ if (rc == -EMSGSIZE)
+ goto new_mtu;
+
+ if (rc != -ELINKCONG)
break;
- } while (1);
+ rc = tipc_wait_for_sndmsg(sock, &timeo);
+ if (rc)
+ kfree_skb_list(buf);
+ } while (!rc);
exit:
if (iocb)
release_sock(sk);
- return res;
+
+ return rc;
}
static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
{
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
- struct tipc_port *port = &tsk->port;
DEFINE_WAIT(wait);
int done;
@@ -707,37 +872,49 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
done = sk_wait_event(sk, timeo_p,
- (!port->congested || !port->connected));
+ (!tsk->link_cong &&
+ !tipc_sk_conn_cong(tsk)) ||
+ !tsk->port.connected);
finish_wait(sk_sleep(sk), &wait);
} while (!done);
return 0;
}
/**
- * tipc_send_packet - send a connection-oriented message
- * @iocb: if NULL, indicates that socket lock is already held
+ * tipc_send_stream - send stream-oriented data
+ * @iocb: (unused)
* @sock: socket structure
- * @m: message to send
- * @total_len: length of message
+ * @m: data to send
+ * @dsz: total length of data to be transmitted
*
- * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
+ * Used for SOCK_STREAM data.
*
- * Returns the number of bytes sent on success, or errno otherwise
+ * Returns the number of bytes sent on success (or partial success),
+ * or errno if no data sent
*/
-static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
- struct msghdr *m, size_t total_len)
+static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
+ struct msghdr *m, size_t dsz)
{
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
+ struct tipc_port *port = &tsk->port;
+ struct tipc_msg *mhdr = &port->phdr;
+ struct sk_buff *buf;
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
- int res = -EINVAL;
+ u32 ref = port->ref;
+ int rc = -EINVAL;
long timeo;
+ u32 dnode;
+ uint mtu, send, sent = 0;
/* Handle implied connection establishment */
- if (unlikely(dest))
- return tipc_sendmsg(iocb, sock, m, total_len);
-
- if (total_len > TIPC_MAX_USER_MSG_SIZE)
+ if (unlikely(dest)) {
+ rc = tipc_sendmsg(iocb, sock, m, dsz);
+ if (dsz && (dsz == rc))
+ tsk->sent_unacked = 1;
+ return rc;
+ }
+ if (dsz > (uint)INT_MAX)
return -EMSGSIZE;
if (iocb)
@@ -745,123 +922,66 @@ static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
if (unlikely(sock->state != SS_CONNECTED)) {
if (sock->state == SS_DISCONNECTING)
- res = -EPIPE;
+ rc = -EPIPE;
else
- res = -ENOTCONN;
+ rc = -ENOTCONN;
goto exit;
}
timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+ dnode = tipc_port_peernode(port);
+
+next:
+ mtu = port->max_pkt;
+ send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
+ rc = tipc_msg_build(mhdr, m->msg_iov, sent, send, mtu, &buf);
+ if (unlikely(rc < 0))
+ goto exit;
do {
- res = tipc_send(&tsk->port, m->msg_iov, total_len);
- if (likely(res != -ELINKCONG))
- break;
- res = tipc_wait_for_sndpkt(sock, &timeo);
- if (res)
- break;
- } while (1);
+ if (likely(!tipc_sk_conn_cong(tsk))) {
+ rc = tipc_link_xmit(buf, dnode, ref);
+ if (likely(!rc)) {
+ tsk->sent_unacked++;
+ sent += send;
+ if (sent == dsz)
+ break;
+ goto next;
+ }
+ if (rc == -EMSGSIZE) {
+ port->max_pkt = tipc_node_get_mtu(dnode, ref);
+ goto next;
+ }
+ if (rc != -ELINKCONG)
+ break;
+ }
+ rc = tipc_wait_for_sndpkt(sock, &timeo);
+ if (rc)
+ kfree_skb_list(buf);
+ } while (!rc);
exit:
if (iocb)
release_sock(sk);
- return res;
+ return sent ? sent : rc;
}
/**
- * tipc_send_stream - send stream-oriented data
- * @iocb: (unused)
+ * tipc_send_packet - send a connection-oriented message
+ * @iocb: if NULL, indicates that socket lock is already held
* @sock: socket structure
- * @m: data to send
- * @total_len: total length of data to be sent
+ * @m: message to send
+ * @dsz: length of data to be transmitted
*
- * Used for SOCK_STREAM data.
+ * Used for SOCK_SEQPACKET messages.
*
- * Returns the number of bytes sent on success (or partial success),
- * or errno if no data sent
+ * Returns the number of bytes sent on success, or errno otherwise
*/
-static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
- struct msghdr *m, size_t total_len)
+static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
+ struct msghdr *m, size_t dsz)
{
- struct sock *sk = sock->sk;
- struct tipc_sock *tsk = tipc_sk(sk);
- struct msghdr my_msg;
- struct iovec my_iov;
- struct iovec *curr_iov;
- int curr_iovlen;
- char __user *curr_start;
- u32 hdr_size;
- int curr_left;
- int bytes_to_send;
- int bytes_sent;
- int res;
-
- lock_sock(sk);
-
- /* Handle special cases where there is no connection */
- if (unlikely(sock->state != SS_CONNECTED)) {
- if (sock->state == SS_UNCONNECTED)
- res = tipc_send_packet(NULL, sock, m, total_len);
- else
- res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN;
- goto exit;
- }
-
- if (unlikely(m->msg_name)) {
- res = -EISCONN;
- goto exit;
- }
-
- if (total_len > (unsigned int)INT_MAX) {
- res = -EMSGSIZE;
- goto exit;
- }
-
- /*
- * Send each iovec entry using one or more messages
- *
- * Note: This algorithm is good for the most likely case
- * (i.e. one large iovec entry), but could be improved to pass sets
- * of small iovec entries into send_packet().
- */
- curr_iov = m->msg_iov;
- curr_iovlen = m->msg_iovlen;
- my_msg.msg_iov = &my_iov;
- my_msg.msg_iovlen = 1;
- my_msg.msg_flags = m->msg_flags;
- my_msg.msg_name = NULL;
- bytes_sent = 0;
-
- hdr_size = msg_hdr_sz(&tsk->port.phdr);
-
- while (curr_iovlen--) {
- curr_start = curr_iov->iov_base;
- curr_left = curr_iov->iov_len;
-
- while (curr_left) {
- bytes_to_send = tsk->port.max_pkt - hdr_size;
- if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
- bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
- if (curr_left < bytes_to_send)
- bytes_to_send = curr_left;
- my_iov.iov_base = curr_start;
- my_iov.iov_len = bytes_to_send;
- res = tipc_send_packet(NULL, sock, &my_msg,
- bytes_to_send);
- if (res < 0) {
- if (bytes_sent)
- res = bytes_sent;
- goto exit;
- }
- curr_left -= bytes_to_send;
- curr_start += bytes_to_send;
- bytes_sent += bytes_to_send;
- }
+ if (dsz > TIPC_MAX_USER_MSG_SIZE)
+ return -EMSGSIZE;
- curr_iov++;
- }
- res = bytes_sent;
-exit:
- release_sock(sk);
- return res;
+ return tipc_send_stream(iocb, sock, m, dsz);
}
/**
@@ -983,10 +1103,11 @@ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
return 0;
}
-static int tipc_wait_for_rcvmsg(struct socket *sock, long timeo)
+static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
{
struct sock *sk = sock->sk;
DEFINE_WAIT(wait);
+ long timeo = *timeop;
int err;
for (;;) {
@@ -1011,6 +1132,7 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long timeo)
break;
}
finish_wait(sk_sleep(sk), &wait);
+ *timeop = timeo;
return err;
}
@@ -1054,7 +1176,7 @@ static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
restart:
/* Look for a message in receive queue; wait if necessary */
- res = tipc_wait_for_rcvmsg(sock, timeo);
+ res = tipc_wait_for_rcvmsg(sock, &timeo);
if (res)
goto exit;
@@ -1100,8 +1222,10 @@ restart:
/* Consume received message (optional) */
if (likely(!(flags & MSG_PEEK))) {
if ((sock->state != SS_READY) &&
- (++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
- tipc_acknowledge(port->ref, port->conn_unacked);
+ (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
+ tipc_acknowledge(port->ref, tsk->rcv_unacked);
+ tsk->rcv_unacked = 0;
+ }
advance_rx_queue(sk);
}
exit:
@@ -1152,7 +1276,7 @@ static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
restart:
/* Look for a message in receive queue; wait if necessary */
- res = tipc_wait_for_rcvmsg(sock, timeo);
+ res = tipc_wait_for_rcvmsg(sock, &timeo);
if (res)
goto exit;
@@ -1209,8 +1333,10 @@ restart:
/* Consume received message (optional) */
if (likely(!(flags & MSG_PEEK))) {
- if (unlikely(++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
- tipc_acknowledge(port->ref, port->conn_unacked);
+ if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
+ tipc_acknowledge(port->ref, tsk->rcv_unacked);
+ tsk->rcv_unacked = 0;
+ }
advance_rx_queue(sk);
}
@@ -1265,17 +1391,16 @@ static void tipc_data_ready(struct sock *sk)
* @tsk: TIPC socket
* @msg: message
*
- * Returns TIPC error status code and socket error status code
- * once it encounters some errors
+ * Returns 0 (TIPC_OK) if everyting ok, -TIPC_ERR_NO_PORT otherwise
*/
-static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
+static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
{
struct sock *sk = &tsk->sk;
struct tipc_port *port = &tsk->port;
struct socket *sock = sk->sk_socket;
struct tipc_msg *msg = buf_msg(*buf);
- u32 retval = TIPC_ERR_NO_PORT;
+ int retval = -TIPC_ERR_NO_PORT;
int res;
if (msg_mcast(msg))
@@ -1378,32 +1503,37 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
*
* Called with socket lock already taken; port lock may also be taken.
*
- * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
+ * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message
+ * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded
*/
-static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
+static int filter_rcv(struct sock *sk, struct sk_buff *buf)
{
struct socket *sock = sk->sk_socket;
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *msg = buf_msg(buf);
unsigned int limit = rcvbuf_limit(sk, buf);
- u32 res = TIPC_OK;
+ u32 onode;
+ int rc = TIPC_OK;
+
+ if (unlikely(msg_user(msg) == CONN_MANAGER))
+ return tipc_sk_proto_rcv(tsk, &onode, buf);
/* Reject message if it is wrong sort of message for socket */
if (msg_type(msg) > TIPC_DIRECT_MSG)
- return TIPC_ERR_NO_PORT;
+ return -TIPC_ERR_NO_PORT;
if (sock->state == SS_READY) {
if (msg_connected(msg))
- return TIPC_ERR_NO_PORT;
+ return -TIPC_ERR_NO_PORT;
} else {
- res = filter_connect(tsk, &buf);
- if (res != TIPC_OK || buf == NULL)
- return res;
+ rc = filter_connect(tsk, &buf);
+ if (rc != TIPC_OK || buf == NULL)
+ return rc;
}
/* Reject message if there isn't room to queue it */
if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
- return TIPC_ERR_OVERLOAD;
+ return -TIPC_ERR_OVERLOAD;
/* Enqueue message */
TIPC_SKB_CB(buf)->handle = NULL;
@@ -1415,7 +1545,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
}
/**
- * backlog_rcv - handle incoming message from backlog queue
+ * tipc_backlog_rcv - handle incoming message from backlog queue
* @sk: socket
* @buf: message
*
@@ -1423,47 +1553,78 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
*
* Returns 0
*/
-static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
+static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
{
- u32 res;
+ int rc;
+ u32 onode;
+ struct tipc_sock *tsk = tipc_sk(sk);
+ uint truesize = buf->truesize;
+
+ rc = filter_rcv(sk, buf);
+
+ if (likely(!rc)) {
+ if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
+ atomic_add(truesize, &tsk->dupl_rcvcnt);
+ return 0;
+ }
+
+ if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc))
+ return 0;
+
+ tipc_link_xmit(buf, onode, 0);
- res = filter_rcv(sk, buf);
- if (res)
- tipc_reject_msg(buf, res);
return 0;
}
/**
* tipc_sk_rcv - handle incoming message
- * @sk: socket receiving message
- * @buf: message
- *
- * Called with port lock already taken.
- *
- * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
+ * @buf: buffer containing arriving message
+ * Consumes buffer
+ * Returns 0 if success, or errno: -EHOSTUNREACH
*/
-u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf)
+int tipc_sk_rcv(struct sk_buff *buf)
{
- u32 res;
+ struct tipc_sock *tsk;
+ struct tipc_port *port;
+ struct sock *sk;
+ u32 dport = msg_destport(buf_msg(buf));
+ int rc = TIPC_OK;
+ uint limit;
+ u32 dnode;
+
+ /* Validate destination and message */
+ port = tipc_port_lock(dport);
+ if (unlikely(!port)) {
+ rc = tipc_msg_eval(buf, &dnode);
+ goto exit;
+ }
- /*
- * Process message if socket is unlocked; otherwise add to backlog queue
- *
- * This code is based on sk_receive_skb(), but must be distinct from it
- * since a TIPC-specific filter/reject mechanism is utilized
- */
+ tsk = tipc_port_to_sock(port);
+ sk = &tsk->sk;
+
+ /* Queue message */
bh_lock_sock(sk);
+
if (!sock_owned_by_user(sk)) {
- res = filter_rcv(sk, buf);
+ rc = filter_rcv(sk, buf);
} else {
- if (sk_add_backlog(sk, buf, rcvbuf_limit(sk, buf)))
- res = TIPC_ERR_OVERLOAD;
- else
- res = TIPC_OK;
+ if (sk->sk_backlog.len == 0)
+ atomic_set(&tsk->dupl_rcvcnt, 0);
+ limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt);
+ if (sk_add_backlog(sk, buf, limit))
+ rc = -TIPC_ERR_OVERLOAD;
}
bh_unlock_sock(sk);
+ tipc_port_unlock(port);
- return res;
+ if (likely(!rc))
+ return 0;
+exit:
+ if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc))
+ return -EHOSTUNREACH;
+
+ tipc_link_xmit(buf, dnode, 0);
+ return (rc < 0) ? -EHOSTUNREACH : 0;
}
static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
@@ -1727,6 +1888,7 @@ static int tipc_shutdown(struct socket *sock, int how)
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_port *port = &tsk->port;
struct sk_buff *buf;
+ u32 peer;
int res;
if (how != SHUT_RDWR)
@@ -1747,7 +1909,8 @@ restart:
goto restart;
}
tipc_port_disconnect(port->ref);
- tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
+ if (tipc_msg_reverse(buf, &peer, TIPC_CONN_SHUTDOWN))
+ tipc_link_xmit(buf, peer, 0);
} else {
tipc_port_shutdown(port->ref);
}
@@ -1905,6 +2068,27 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
return put_user(sizeof(value), ol);
}
+static int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)
+{
+ struct tipc_sioc_ln_req lnr;
+ void __user *argp = (void __user *)arg;
+
+ switch (cmd) {
+ case SIOCGETLINKNAME:
+ if (copy_from_user(&lnr, argp, sizeof(lnr)))
+ return -EFAULT;
+ if (!tipc_node_get_linkname(lnr.bearer_id, lnr.peer,
+ lnr.linkname, TIPC_MAX_LINK_NAME)) {
+ if (copy_to_user(argp, &lnr, sizeof(lnr)))
+ return -EFAULT;
+ return 0;
+ }
+ return -EADDRNOTAVAIL;
+ default:
+ return -ENOIOCTLCMD;
+ }
+}
+
/* Protocol switches for the various types of TIPC sockets */
static const struct proto_ops msg_ops = {
@@ -1917,7 +2101,7 @@ static const struct proto_ops msg_ops = {
.accept = sock_no_accept,
.getname = tipc_getname,
.poll = tipc_poll,
- .ioctl = sock_no_ioctl,
+ .ioctl = tipc_ioctl,
.listen = sock_no_listen,
.shutdown = tipc_shutdown,
.setsockopt = tipc_setsockopt,
@@ -1938,7 +2122,7 @@ static const struct proto_ops packet_ops = {
.accept = tipc_accept,
.getname = tipc_getname,
.poll = tipc_poll,
- .ioctl = sock_no_ioctl,
+ .ioctl = tipc_ioctl,
.listen = tipc_listen,
.shutdown = tipc_shutdown,
.setsockopt = tipc_setsockopt,
@@ -1959,7 +2143,7 @@ static const struct proto_ops stream_ops = {
.accept = tipc_accept,
.getname = tipc_getname,
.poll = tipc_poll,
- .ioctl = sock_no_ioctl,
+ .ioctl = tipc_ioctl,
.listen = tipc_listen,
.shutdown = tipc_shutdown,
.setsockopt = tipc_setsockopt,
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 74e5c7f195a..43b75b3cece 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -38,18 +38,29 @@
#include "port.h"
#include <net/sock.h>
+#define TIPC_CONN_OK 0
+#define TIPC_CONN_PROBING 1
+
/**
* struct tipc_sock - TIPC socket structure
* @sk: socket - interacts with 'port' and with user via the socket API
* @port: port - interacts with 'sk' and with the rest of the TIPC stack
* @peer_name: the peer of the connection, if any
* @conn_timeout: the time we can wait for an unresponded setup request
+ * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
+ * @link_cong: non-zero if owner must sleep because of link congestion
+ * @sent_unacked: # messages sent by socket, and not yet acked by peer
+ * @rcv_unacked: # messages read by user, but not yet acked back to peer
*/
struct tipc_sock {
struct sock sk;
struct tipc_port port;
unsigned int conn_timeout;
+ atomic_t dupl_rcvcnt;
+ int link_cong;
+ uint sent_unacked;
+ uint rcv_unacked;
};
static inline struct tipc_sock *tipc_sk(const struct sock *sk)
@@ -67,6 +78,13 @@ static inline void tipc_sock_wakeup(struct tipc_sock *tsk)
tsk->sk.sk_write_space(&tsk->sk);
}
-u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf);
+static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
+{
+ return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
+}
+
+int tipc_sk_rcv(struct sk_buff *buf);
+
+void tipc_sk_mcast_rcv(struct sk_buff *buf);
#endif