diff options
Diffstat (limited to 'net/tipc')
-rw-r--r-- | net/tipc/Makefile | 2 | ||||
-rw-r--r-- | net/tipc/bcast.c | 194 | ||||
-rw-r--r-- | net/tipc/bcast.h | 9 | ||||
-rw-r--r-- | net/tipc/bearer.c | 153 | ||||
-rw-r--r-- | net/tipc/bearer.h | 47 | ||||
-rw-r--r-- | net/tipc/config.c | 12 | ||||
-rw-r--r-- | net/tipc/core.c | 14 | ||||
-rw-r--r-- | net/tipc/core.h | 10 | ||||
-rw-r--r-- | net/tipc/discover.c | 281 | ||||
-rw-r--r-- | net/tipc/discover.h | 1 | ||||
-rw-r--r-- | net/tipc/eth_media.c | 51 | ||||
-rw-r--r-- | net/tipc/handler.c | 134 | ||||
-rw-r--r-- | net/tipc/ib_media.c | 34 | ||||
-rw-r--r-- | net/tipc/link.c | 216 | ||||
-rw-r--r-- | net/tipc/link.h | 21 | ||||
-rw-r--r-- | net/tipc/msg.c | 55 | ||||
-rw-r--r-- | net/tipc/msg.h | 5 | ||||
-rw-r--r-- | net/tipc/name_distr.c | 78 | ||||
-rw-r--r-- | net/tipc/name_distr.h | 35 | ||||
-rw-r--r-- | net/tipc/name_table.c | 14 | ||||
-rw-r--r-- | net/tipc/net.c | 71 | ||||
-rw-r--r-- | net/tipc/net.h | 4 | ||||
-rw-r--r-- | net/tipc/node.c | 110 | ||||
-rw-r--r-- | net/tipc/node.h | 88 | ||||
-rw-r--r-- | net/tipc/node_subscr.c | 9 | ||||
-rw-r--r-- | net/tipc/node_subscr.h | 2 | ||||
-rw-r--r-- | net/tipc/port.c | 39 | ||||
-rw-r--r-- | net/tipc/port.h | 10 | ||||
-rw-r--r-- | net/tipc/socket.c | 121 | ||||
-rw-r--r-- | net/tipc/socket.h | 4 |
30 files changed, 908 insertions, 916 deletions
diff --git a/net/tipc/Makefile b/net/tipc/Makefile index b282f7130d2..a080c66d819 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -5,7 +5,7 @@ obj-$(CONFIG_TIPC) := tipc.o tipc-y += addr.o bcast.o bearer.o config.o \ - core.o handler.o link.o discover.o msg.o \ + core.o link.o discover.o msg.o \ name_distr.o subscr.o name_table.o net.o \ netlink.o node.o node_subscr.o port.o ref.o \ socket.o log.o eth_media.o server.o diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 95ab5ef9292..26631679a1f 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -71,7 +71,7 @@ struct tipc_bcbearer_pair { * Note: The fields labelled "temporary" are incorporated into the bearer * to avoid consuming potentially limited stack space through the use of * large local variables within multicast routines. Concurrent access is - * prevented through use of the spinlock "bc_lock". + * prevented through use of the spinlock "bclink_lock". */ struct tipc_bcbearer { struct tipc_bearer bearer; @@ -84,34 +84,64 @@ struct tipc_bcbearer { /** * struct tipc_bclink - link used for broadcast messages + * @lock: spinlock governing access to structure * @link: (non-standard) broadcast link structure * @node: (non-standard) node structure representing b'cast link's peer node + * @flags: represent bclink states * @bcast_nodes: map of broadcast-capable nodes * @retransmit_to: node that most recently requested a retransmit * * Handles sequence numbering, fragmentation, bundling, etc. */ struct tipc_bclink { + spinlock_t lock; struct tipc_link link; struct tipc_node node; + unsigned int flags; struct tipc_node_map bcast_nodes; struct tipc_node *retransmit_to; }; -static struct tipc_bcbearer bcast_bearer; -static struct tipc_bclink bcast_link; - -static struct tipc_bcbearer *bcbearer = &bcast_bearer; -static struct tipc_bclink *bclink = &bcast_link; -static struct tipc_link *bcl = &bcast_link.link; - -static DEFINE_SPINLOCK(bc_lock); +static struct tipc_bcbearer *bcbearer; +static struct tipc_bclink *bclink; +static struct tipc_link *bcl; const char tipc_bclink_name[] = "broadcast-link"; static void tipc_nmap_diff(struct tipc_node_map *nm_a, struct tipc_node_map *nm_b, struct tipc_node_map *nm_diff); +static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node); +static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node); + +static void tipc_bclink_lock(void) +{ + spin_lock_bh(&bclink->lock); +} + +static void tipc_bclink_unlock(void) +{ + struct tipc_node *node = NULL; + + if (likely(!bclink->flags)) { + spin_unlock_bh(&bclink->lock); + return; + } + + if (bclink->flags & TIPC_BCLINK_RESET) { + bclink->flags &= ~TIPC_BCLINK_RESET; + node = tipc_bclink_retransmit_to(); + } + spin_unlock_bh(&bclink->lock); + + if (node) + tipc_link_reset_all(node); +} + +void tipc_bclink_set_flags(unsigned int flags) +{ + bclink->flags |= flags; +} static u32 bcbuf_acks(struct sk_buff *buf) { @@ -130,16 +160,16 @@ static void bcbuf_decr_acks(struct sk_buff *buf) void tipc_bclink_add_node(u32 addr) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); tipc_nmap_add(&bclink->bcast_nodes, addr); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } void tipc_bclink_remove_node(u32 addr) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); tipc_nmap_remove(&bclink->bcast_nodes, addr); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } static void bclink_set_last_sent(void) @@ -165,7 +195,7 @@ static void bclink_update_last_sent(struct tipc_node *node, u32 seqno) /** * tipc_bclink_retransmit_to - get most recent node to request retransmission * - * Called with bc_lock locked + * Called with bclink_lock locked */ struct tipc_node *tipc_bclink_retransmit_to(void) { @@ -177,7 +207,7 @@ struct tipc_node *tipc_bclink_retransmit_to(void) * @after: sequence number of last packet to *not* retransmit * @to: sequence number of last packet to retransmit * - * Called with bc_lock locked + * Called with bclink_lock locked */ static void bclink_retransmit_pkt(u32 after, u32 to) { @@ -194,7 +224,7 @@ static void bclink_retransmit_pkt(u32 after, u32 to) * @n_ptr: node that sent acknowledgement info * @acked: broadcast sequence # that has been acknowledged * - * Node is locked, bc_lock unlocked. + * Node is locked, bclink_lock unlocked. */ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) { @@ -202,8 +232,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) struct sk_buff *next; unsigned int released = 0; - spin_lock_bh(&bc_lock); - + tipc_bclink_lock(); /* Bail out if tx queue is empty (no clean up is required) */ crs = bcl->first_out; if (!crs) @@ -267,13 +296,13 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) if (unlikely(released && !list_empty(&bcl->waiting_ports))) tipc_link_wakeup_ports(bcl, 0); exit: - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } /** * tipc_bclink_update_link_state - update broadcast link state * - * tipc_net_lock and node lock set + * RCU and node lock set */ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) { @@ -320,10 +349,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) ? buf_seqno(n_ptr->bclink.deferred_head) - 1 : n_ptr->bclink.last_sent); - spin_lock_bh(&bc_lock); - tipc_bearer_send(&bcbearer->bearer, buf, NULL); + tipc_bclink_lock(); + tipc_bearer_send(MAX_BEARERS, buf, NULL); bcl->stats.sent_nacks++; - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); kfree_skb(buf); n_ptr->bclink.oos_state++; @@ -335,8 +364,6 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) * * Delay any upcoming NACK by this node if another node has already * requested the first message this node is going to ask for. - * - * Only tipc_net_lock set. */ static void bclink_peek_nack(struct tipc_msg *msg) { @@ -362,7 +389,7 @@ int tipc_bclink_xmit(struct sk_buff *buf) { int res; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); if (!bclink->bcast_nodes.count) { res = msg_data_sz(buf_msg(buf)); @@ -377,14 +404,14 @@ int tipc_bclink_xmit(struct sk_buff *buf) bcl->stats.accu_queue_sz += bcl->out_queue_size; } exit: - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); return res; } /** * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet * - * Called with both sending node's lock and bc_lock taken. + * Called with both sending node's lock and bclink_lock taken. */ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) { @@ -408,7 +435,7 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) /** * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards * - * tipc_net_lock is read_locked, no other locks set + * RCU is locked, no other locks set */ void tipc_bclink_rcv(struct sk_buff *buf) { @@ -439,12 +466,12 @@ void tipc_bclink_rcv(struct sk_buff *buf) if (msg_destnode(msg) == tipc_own_addr) { tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); tipc_node_unlock(node); - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bcl->stats.recv_nacks++; bclink->retransmit_to = node; bclink_retransmit_pkt(msg_bcgap_after(msg), msg_bcgap_to(msg)); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } else { tipc_node_unlock(node); bclink_peek_nack(msg); @@ -462,51 +489,47 @@ receive: /* Deliver message to destination */ if (likely(msg_isdata(msg))) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); if (likely(msg_mcast(msg))) tipc_port_mcast_rcv(buf, NULL); else kfree_skb(buf); } else if (msg_user(msg) == MSG_BUNDLER) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); bcl->stats.recv_bundles++; bcl->stats.recv_bundled += msg_msgcnt(msg); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); tipc_link_bundle_rcv(buf); } else if (msg_user(msg) == MSG_FRAGMENTER) { - int ret; - ret = tipc_link_frag_rcv(&node->bclink.reasm_head, - &node->bclink.reasm_tail, - &buf); - if (ret == LINK_REASM_ERROR) + tipc_buf_append(&node->bclink.reasm_buf, &buf); + if (unlikely(!buf && !node->bclink.reasm_buf)) goto unlock; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); bcl->stats.recv_fragments++; - if (ret == LINK_REASM_COMPLETE) { + if (buf) { bcl->stats.recv_fragmented++; - /* Point msg to inner header */ msg = buf_msg(buf); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); goto receive; } - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); } else if (msg_user(msg) == NAME_DISTRIBUTOR) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); tipc_named_rcv(buf); } else { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); kfree_skb(buf); } @@ -552,14 +575,14 @@ receive: } else deferred = 0; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); if (deferred) bcl->stats.deferred_recv++; else bcl->stats.duplicates++; - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); unlock: tipc_node_unlock(node); @@ -627,13 +650,13 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, if (bp_index == 0) { /* Use original buffer for first bearer */ - tipc_bearer_send(b, buf, &b->bcast_addr); + tipc_bearer_send(b->identity, buf, &b->bcast_addr); } else { /* Avoid concurrent buffer access */ - tbuf = pskb_copy(buf, GFP_ATOMIC); + tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC); if (!tbuf) break; - tipc_bearer_send(b, tbuf, &b->bcast_addr); + tipc_bearer_send(b->identity, tbuf, &b->bcast_addr); kfree_skb(tbuf); /* Bearer keeps a clone */ } @@ -655,20 +678,27 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, /** * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer */ -void tipc_bcbearer_sort(void) +void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action) { struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp; struct tipc_bcbearer_pair *bp_curr; + struct tipc_bearer *b; int b_index; int pri; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); + + if (action) + tipc_nmap_add(nm_ptr, node); + else + tipc_nmap_remove(nm_ptr, node); /* Group bearers by priority (can assume max of two per priority) */ memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp)); + rcu_read_lock(); for (b_index = 0; b_index < MAX_BEARERS; b_index++) { - struct tipc_bearer *b = bearer_list[b_index]; + b = rcu_dereference_rtnl(bearer_list[b_index]); if (!b || !b->nodes.count) continue; @@ -677,6 +707,7 @@ void tipc_bcbearer_sort(void) else bp_temp[b->priority].secondary = b; } + rcu_read_unlock(); /* Create array of bearer pairs for broadcasting */ bp_curr = bcbearer->bpairs; @@ -702,7 +733,7 @@ void tipc_bcbearer_sort(void) bp_curr++; } - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } @@ -714,7 +745,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size) if (!bcl) return 0; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); s = &bcl->stats; @@ -743,7 +774,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size) s->queue_sz_counts ? (s->accu_queue_sz / s->queue_sz_counts) : 0); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); return ret; } @@ -752,9 +783,9 @@ int tipc_bclink_reset_stats(void) if (!bcl) return -ENOPROTOOPT; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); memset(&bcl->stats, 0, sizeof(bcl->stats)); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); return 0; } @@ -765,46 +796,59 @@ int tipc_bclink_set_queue_limits(u32 limit) if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN)) return -EINVAL; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); tipc_link_set_queue_limits(bcl, limit); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); return 0; } -void tipc_bclink_init(void) +int tipc_bclink_init(void) { + bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC); + if (!bcbearer) + return -ENOMEM; + + bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC); + if (!bclink) { + kfree(bcbearer); + return -ENOMEM; + } + + bcl = &bclink->link; bcbearer->bearer.media = &bcbearer->media; bcbearer->media.send_msg = tipc_bcbearer_send; sprintf(bcbearer->media.name, "tipc-broadcast"); + spin_lock_init(&bclink->lock); INIT_LIST_HEAD(&bcl->waiting_ports); bcl->next_out_no = 1; spin_lock_init(&bclink->node.lock); bcl->owner = &bclink->node; bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); - bcl->b_ptr = &bcbearer->bearer; - bearer_list[BCBEARER] = &bcbearer->bearer; + bcl->bearer_id = MAX_BEARERS; + rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer); bcl->state = WORKING_WORKING; strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); + return 0; } void tipc_bclink_stop(void) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); tipc_link_purge_queues(bcl); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); - bearer_list[BCBEARER] = NULL; - memset(bclink, 0, sizeof(*bclink)); - memset(bcbearer, 0, sizeof(*bcbearer)); + RCU_INIT_POINTER(bearer_list[BCBEARER], NULL); + synchronize_net(); + kfree(bcbearer); + kfree(bclink); } - /** * tipc_nmap_add - add a node to a node map */ -void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) +static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) { int n = tipc_node(node); int w = n / WSIZE; @@ -819,7 +863,7 @@ void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) /** * tipc_nmap_remove - remove a node from a node map */ -void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node) +static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node) { int n = tipc_node(node); int w = n / WSIZE; diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index a80ef54b818..00330c45df3 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -39,6 +39,7 @@ #define MAX_NODES 4096 #define WSIZE 32 +#define TIPC_BCLINK_RESET 1 /** * struct tipc_node_map - set of node identifiers @@ -69,9 +70,6 @@ struct tipc_node; extern const char tipc_bclink_name[]; -void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node); -void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node); - /** * tipc_nmap_equal - test for equality of node maps */ @@ -84,8 +82,9 @@ static inline int tipc_nmap_equal(struct tipc_node_map *nm_a, void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port); void tipc_port_list_free(struct tipc_port_list *pl_ptr); -void tipc_bclink_init(void); +int tipc_bclink_init(void); void tipc_bclink_stop(void); +void tipc_bclink_set_flags(unsigned int flags); void tipc_bclink_add_node(u32 addr); void tipc_bclink_remove_node(u32 addr); struct tipc_node *tipc_bclink_retransmit_to(void); @@ -98,6 +97,6 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent); int tipc_bclink_stats(char *stats_buf, const u32 buf_size); int tipc_bclink_reset_stats(void); int tipc_bclink_set_queue_limits(u32 limit); -void tipc_bcbearer_sort(void); +void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action); #endif diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 3fef7eb776d..264474394f9 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -49,7 +49,7 @@ static struct tipc_media * const media_info_array[] = { NULL }; -struct tipc_bearer *bearer_list[MAX_BEARERS + 1]; +struct tipc_bearer __rcu *bearer_list[MAX_BEARERS + 1]; static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down); @@ -178,7 +178,7 @@ struct tipc_bearer *tipc_bearer_find(const char *name) u32 i; for (i = 0; i < MAX_BEARERS; i++) { - b_ptr = bearer_list[i]; + b_ptr = rtnl_dereference(bearer_list[i]); if (b_ptr && (!strcmp(b_ptr->name, name))) return b_ptr; } @@ -198,10 +198,9 @@ struct sk_buff *tipc_bearer_get_names(void) if (!buf) return NULL; - read_lock_bh(&tipc_net_lock); for (i = 0; media_info_array[i] != NULL; i++) { for (j = 0; j < MAX_BEARERS; j++) { - b = bearer_list[j]; + b = rtnl_dereference(bearer_list[j]); if (!b) continue; if (b->media == media_info_array[i]) { @@ -211,22 +210,33 @@ struct sk_buff *tipc_bearer_get_names(void) } } } - read_unlock_bh(&tipc_net_lock); return buf; } -void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest) +void tipc_bearer_add_dest(u32 bearer_id, u32 dest) { - tipc_nmap_add(&b_ptr->nodes, dest); - tipc_bcbearer_sort(); - tipc_disc_add_dest(b_ptr->link_req); + struct tipc_bearer *b_ptr; + + rcu_read_lock(); + b_ptr = rcu_dereference_rtnl(bearer_list[bearer_id]); + if (b_ptr) { + tipc_bcbearer_sort(&b_ptr->nodes, dest, true); + tipc_disc_add_dest(b_ptr->link_req); + } + rcu_read_unlock(); } -void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest) +void tipc_bearer_remove_dest(u32 bearer_id, u32 dest) { - tipc_nmap_remove(&b_ptr->nodes, dest); - tipc_bcbearer_sort(); - tipc_disc_remove_dest(b_ptr->link_req); + struct tipc_bearer *b_ptr; + + rcu_read_lock(); + b_ptr = rcu_dereference_rtnl(bearer_list[bearer_id]); + if (b_ptr) { + tipc_bcbearer_sort(&b_ptr->nodes, dest, false); + tipc_disc_remove_dest(b_ptr->link_req); + } + rcu_read_unlock(); } /** @@ -271,13 +281,11 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority) return -EINVAL; } - write_lock_bh(&tipc_net_lock); - m_ptr = tipc_media_find(b_names.media_name); if (!m_ptr) { pr_warn("Bearer <%s> rejected, media <%s> not registered\n", name, b_names.media_name); - goto exit; + return -EINVAL; } if (priority == TIPC_MEDIA_LINK_PRI) @@ -287,7 +295,7 @@ restart: bearer_id = MAX_BEARERS; with_this_prio = 1; for (i = MAX_BEARERS; i-- != 0; ) { - b_ptr = bearer_list[i]; + b_ptr = rtnl_dereference(bearer_list[i]); if (!b_ptr) { bearer_id = i; continue; @@ -295,14 +303,14 @@ restart: if (!strcmp(name, b_ptr->name)) { pr_warn("Bearer <%s> rejected, already enabled\n", name); - goto exit; + return -EINVAL; } if ((b_ptr->priority == priority) && (++with_this_prio > 2)) { if (priority-- == 0) { pr_warn("Bearer <%s> rejected, duplicate priority\n", name); - goto exit; + return -EINVAL; } pr_warn("Bearer <%s> priority adjustment required %u->%u\n", name, priority + 1, priority); @@ -312,21 +320,20 @@ restart: if (bearer_id >= MAX_BEARERS) { pr_warn("Bearer <%s> rejected, bearer limit reached (%u)\n", name, MAX_BEARERS); - goto exit; + return -EINVAL; } b_ptr = kzalloc(sizeof(*b_ptr), GFP_ATOMIC); - if (!b_ptr) { - res = -ENOMEM; - goto exit; - } + if (!b_ptr) + return -ENOMEM; + strcpy(b_ptr->name, name); b_ptr->media = m_ptr; res = m_ptr->enable_media(b_ptr); if (res) { pr_warn("Bearer <%s> rejected, enable failure (%d)\n", name, -res); - goto exit; + return -EINVAL; } b_ptr->identity = bearer_id; @@ -341,16 +348,14 @@ restart: bearer_disable(b_ptr, false); pr_warn("Bearer <%s> rejected, discovery object creation failed\n", name); - goto exit; + return -EINVAL; } - bearer_list[bearer_id] = b_ptr; + rcu_assign_pointer(bearer_list[bearer_id], b_ptr); pr_info("Enabled bearer <%s>, discovery domain %s, priority %u\n", name, tipc_addr_string_fill(addr_string, disc_domain), priority); -exit: - write_unlock_bh(&tipc_net_lock); return res; } @@ -359,19 +364,16 @@ exit: */ static int tipc_reset_bearer(struct tipc_bearer *b_ptr) { - read_lock_bh(&tipc_net_lock); pr_info("Resetting bearer <%s>\n", b_ptr->name); - tipc_disc_delete(b_ptr->link_req); tipc_link_reset_list(b_ptr->identity); - tipc_disc_create(b_ptr, &b_ptr->bcast_addr); - read_unlock_bh(&tipc_net_lock); + tipc_disc_reset(b_ptr); return 0; } /** * bearer_disable * - * Note: This routine assumes caller holds tipc_net_lock. + * Note: This routine assumes caller holds RTNL lock. */ static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down) { @@ -385,12 +387,12 @@ static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down) tipc_disc_delete(b_ptr->link_req); for (i = 0; i < MAX_BEARERS; i++) { - if (b_ptr == bearer_list[i]) { - bearer_list[i] = NULL; + if (b_ptr == rtnl_dereference(bearer_list[i])) { + RCU_INIT_POINTER(bearer_list[i], NULL); break; } } - kfree(b_ptr); + kfree_rcu(b_ptr, rcu); } int tipc_disable_bearer(const char *name) @@ -398,7 +400,6 @@ int tipc_disable_bearer(const char *name) struct tipc_bearer *b_ptr; int res; - write_lock_bh(&tipc_net_lock); b_ptr = tipc_bearer_find(name); if (b_ptr == NULL) { pr_warn("Attempt to disable unknown bearer <%s>\n", name); @@ -407,32 +408,9 @@ int tipc_disable_bearer(const char *name) bearer_disable(b_ptr, false); res = 0; } - write_unlock_bh(&tipc_net_lock); return res; } - -/* tipc_l2_media_addr_set - initialize Ethernet media address structure - * - * Media-dependent "value" field stores MAC address in first 6 bytes - * and zeroes out the remaining bytes. - */ -void tipc_l2_media_addr_set(const struct tipc_bearer *b, - struct tipc_media_addr *a, char *mac) -{ - int len = b->media->hwaddr_len; - - if (unlikely(sizeof(a->value) < len)) { - WARN_ONCE(1, "Media length invalid\n"); - return; - } - - memcpy(a->value, mac, len); - memset(a->value + len, 0, sizeof(a->value) - len); - a->media_id = b->media->type_id; - a->broadcast = !memcmp(mac, b->bcast_addr.value, len); -} - int tipc_enable_l2_media(struct tipc_bearer *b) { struct net_device *dev; @@ -443,33 +421,37 @@ int tipc_enable_l2_media(struct tipc_bearer *b) if (!dev) return -ENODEV; - /* Associate TIPC bearer with Ethernet bearer */ - b->media_ptr = dev; - memset(b->bcast_addr.value, 0, sizeof(b->bcast_addr.value)); + /* Associate TIPC bearer with L2 bearer */ + rcu_assign_pointer(b->media_ptr, dev); + memset(&b->bcast_addr, 0, sizeof(b->bcast_addr)); memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len); b->bcast_addr.media_id = b->media->type_id; b->bcast_addr.broadcast = 1; b->mtu = dev->mtu; - tipc_l2_media_addr_set(b, &b->addr, (char *)dev->dev_addr); + b->media->raw2addr(b, &b->addr, (char *)dev->dev_addr); rcu_assign_pointer(dev->tipc_ptr, b); return 0; } -/* tipc_disable_l2_media - detach TIPC bearer from an Ethernet interface +/* tipc_disable_l2_media - detach TIPC bearer from an L2 interface * - * Mark Ethernet bearer as inactive so that incoming buffers are thrown away, + * Mark L2 bearer as inactive so that incoming buffers are thrown away, * then get worker thread to complete bearer cleanup. (Can't do cleanup * here because cleanup code needs to sleep and caller holds spinlocks.) */ void tipc_disable_l2_media(struct tipc_bearer *b) { - struct net_device *dev = (struct net_device *)b->media_ptr; + struct net_device *dev; + + dev = (struct net_device *)rtnl_dereference(b->media_ptr); + RCU_INIT_POINTER(b->media_ptr, NULL); RCU_INIT_POINTER(dev->tipc_ptr, NULL); + synchronize_net(); dev_put(dev); } /** - * tipc_l2_send_msg - send a TIPC packet out over an Ethernet interface + * tipc_l2_send_msg - send a TIPC packet out over an L2 interface * @buf: the packet to be sent * @b_ptr: the bearer through which the packet is to be sent * @dest: peer destination address @@ -478,8 +460,12 @@ int tipc_l2_send_msg(struct sk_buff *buf, struct tipc_bearer *b, struct tipc_media_addr *dest) { struct sk_buff *clone; + struct net_device *dev; int delta; - struct net_device *dev = (struct net_device *)b->media_ptr; + + dev = (struct net_device *)rcu_dereference_rtnl(b->media_ptr); + if (!dev) + return 0; clone = skb_clone(buf, GFP_ATOMIC); if (!clone) @@ -507,10 +493,16 @@ int tipc_l2_send_msg(struct sk_buff *buf, struct tipc_bearer *b, * The media send routine must not alter the buffer being passed in * as it may be needed for later retransmission! */ -void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf, +void tipc_bearer_send(u32 bearer_id, struct sk_buff *buf, struct tipc_media_addr *dest) { - b->media->send_msg(buf, b, dest); + struct tipc_bearer *b_ptr; + + rcu_read_lock(); + b_ptr = rcu_dereference_rtnl(bearer_list[bearer_id]); + if (likely(b_ptr)) + b_ptr->media->send_msg(buf, b_ptr, dest); + rcu_read_unlock(); } /** @@ -535,7 +527,7 @@ static int tipc_l2_rcv_msg(struct sk_buff *buf, struct net_device *dev, } rcu_read_lock(); - b_ptr = rcu_dereference(dev->tipc_ptr); + b_ptr = rcu_dereference_rtnl(dev->tipc_ptr); if (likely(b_ptr)) { if (likely(buf->pkt_type <= PACKET_BROADCAST)) { buf->next = NULL; @@ -568,12 +560,9 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt, if (!net_eq(dev_net(dev), &init_net)) return NOTIFY_DONE; - rcu_read_lock(); - b_ptr = rcu_dereference(dev->tipc_ptr); - if (!b_ptr) { - rcu_read_unlock(); + b_ptr = rtnl_dereference(dev->tipc_ptr); + if (!b_ptr) return NOTIFY_DONE; - } b_ptr->mtu = dev->mtu; @@ -586,17 +575,15 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt, tipc_reset_bearer(b_ptr); break; case NETDEV_CHANGEADDR: - tipc_l2_media_addr_set(b_ptr, &b_ptr->addr, + b_ptr->media->raw2addr(b_ptr, &b_ptr->addr, (char *)dev->dev_addr); tipc_reset_bearer(b_ptr); break; case NETDEV_UNREGISTER: case NETDEV_CHANGENAME: - tipc_disable_bearer(b_ptr->name); + bearer_disable(b_ptr, false); break; } - rcu_read_unlock(); - return NOTIFY_OK; } @@ -633,7 +620,7 @@ void tipc_bearer_stop(void) u32 i; for (i = 0; i < MAX_BEARERS; i++) { - b_ptr = bearer_list[i]; + b_ptr = rtnl_dereference(bearer_list[i]); if (b_ptr) { bearer_disable(b_ptr, true); bearer_list[i] = NULL; diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index ba48145e871..78fccc49de2 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -42,14 +42,12 @@ #define MAX_BEARERS 2 #define MAX_MEDIA 2 -/* - * Identifiers associated with TIPC message header media address info - * - * - address info field is 20 bytes long - * - media type identifier located at offset 3 - * - remaining bytes vary according to media type +/* Identifiers associated with TIPC message header media address info + * - address info field is 32 bytes long + * - the field's actual content and length is defined per media + * - remaining unused bytes in the field are set to zero */ -#define TIPC_MEDIA_ADDR_SIZE 20 +#define TIPC_MEDIA_ADDR_SIZE 32 #define TIPC_MEDIA_TYPE_OFFSET 3 /* @@ -77,9 +75,10 @@ struct tipc_bearer; * @send_msg: routine which handles buffer transmission * @enable_media: routine which enables a media * @disable_media: routine which disables a media - * @addr2str: routine which converts media address to string - * @addr2msg: routine which converts media address to protocol message area - * @msg2addr: routine which converts media address from protocol message area + * @addr2str: convert media address format to string + * @addr2msg: convert from media addr format to discovery msg addr format + * @msg2addr: convert from discovery msg addr format to media addr format + * @raw2addr: convert from raw addr format to media addr format * @priority: default link (and bearer) priority * @tolerance: default time (in ms) before declaring link failure * @window: default window (in packets) before declaring link congestion @@ -93,10 +92,16 @@ struct tipc_media { struct tipc_media_addr *dest); int (*enable_media)(struct tipc_bearer *b_ptr); void (*disable_media)(struct tipc_bearer *b_ptr); - int (*addr2str)(struct tipc_media_addr *a, char *str_buf, int str_size); - int (*addr2msg)(struct tipc_media_addr *a, char *msg_area); - int (*msg2addr)(const struct tipc_bearer *b_ptr, - struct tipc_media_addr *a, char *msg_area); + int (*addr2str)(struct tipc_media_addr *addr, + char *strbuf, + int bufsz); + int (*addr2msg)(char *msg, struct tipc_media_addr *addr); + int (*msg2addr)(struct tipc_bearer *b, + struct tipc_media_addr *addr, + char *msg); + int (*raw2addr)(struct tipc_bearer *b, + struct tipc_media_addr *addr, + char *raw); u32 priority; u32 tolerance; u32 window; @@ -113,6 +118,7 @@ struct tipc_media { * @name: bearer name (format = media:interface) * @media: ptr to media structure associated with bearer * @bcast_addr: media address used in broadcasting + * @rcu: rcu struct for tipc_bearer * @priority: default link priority for bearer * @window: default window size for bearer * @tolerance: default link tolerance for bearer @@ -127,12 +133,13 @@ struct tipc_media { * care of initializing all other fields. */ struct tipc_bearer { - void *media_ptr; /* initalized by media */ + void __rcu *media_ptr; /* initalized by media */ u32 mtu; /* initalized by media */ struct tipc_media_addr addr; /* initalized by media */ char name[TIPC_MAX_BEARER_NAME]; struct tipc_media *media; struct tipc_media_addr bcast_addr; + struct rcu_head rcu; u32 priority; u32 window; u32 tolerance; @@ -150,7 +157,7 @@ struct tipc_bearer_names { struct tipc_link; -extern struct tipc_bearer *bearer_list[]; +extern struct tipc_bearer __rcu *bearer_list[]; /* * TIPC routines available to supported media types @@ -173,22 +180,20 @@ int tipc_media_set_priority(const char *name, u32 new_value); int tipc_media_set_window(const char *name, u32 new_value); void tipc_media_addr_printf(char *buf, int len, struct tipc_media_addr *a); struct sk_buff *tipc_media_get_names(void); -void tipc_l2_media_addr_set(const struct tipc_bearer *b, - struct tipc_media_addr *a, char *mac); int tipc_enable_l2_media(struct tipc_bearer *b); void tipc_disable_l2_media(struct tipc_bearer *b); int tipc_l2_send_msg(struct sk_buff *buf, struct tipc_bearer *b, struct tipc_media_addr *dest); struct sk_buff *tipc_bearer_get_names(void); -void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest); -void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest); +void tipc_bearer_add_dest(u32 bearer_id, u32 dest); +void tipc_bearer_remove_dest(u32 bearer_id, u32 dest); struct tipc_bearer *tipc_bearer_find(const char *name); struct tipc_media *tipc_media_find(const char *name); int tipc_bearer_setup(void); void tipc_bearer_cleanup(void); void tipc_bearer_stop(void); -void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf, +void tipc_bearer_send(u32 bearer_id, struct sk_buff *buf, struct tipc_media_addr *dest); #endif /* _TIPC_BEARER_H */ diff --git a/net/tipc/config.c b/net/tipc/config.c index 4b981c05382..2b42403ad33 100644 --- a/net/tipc/config.c +++ b/net/tipc/config.c @@ -42,8 +42,6 @@ #define REPLY_TRUNCATED "<truncated>\n" -static DEFINE_MUTEX(config_mutex); - static const void *req_tlv_area; /* request message TLV area */ static int req_tlv_space; /* request message TLV area size */ static int rep_headroom; /* reply message headroom to use */ @@ -179,8 +177,10 @@ static struct sk_buff *cfg_set_own_addr(void) if (tipc_own_addr) return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED " (cannot change node address once assigned)"); - tipc_net_start(addr); - return tipc_cfg_reply_none(); + if (!tipc_net_start(addr)) + return tipc_cfg_reply_none(); + + return tipc_cfg_reply_error_string("cannot change to network mode"); } static struct sk_buff *cfg_set_max_ports(void) @@ -223,7 +223,7 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area { struct sk_buff *rep_tlv_buf; - mutex_lock(&config_mutex); + rtnl_lock(); /* Save request and reply details in a well-known location */ req_tlv_area = request_area; @@ -337,6 +337,6 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area /* Return reply buffer */ exit: - mutex_unlock(&config_mutex); + rtnl_unlock(); return rep_tlv_buf; } diff --git a/net/tipc/core.c b/net/tipc/core.c index 50d57429ebc..676d18015dd 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -80,7 +80,6 @@ struct sk_buff *tipc_buf_acquire(u32 size) */ static void tipc_core_stop(void) { - tipc_handler_stop(); tipc_net_stop(); tipc_bearer_cleanup(); tipc_netlink_stop(); @@ -100,10 +99,6 @@ static int tipc_core_start(void) get_random_bytes(&tipc_random, sizeof(tipc_random)); - err = tipc_handler_start(); - if (err) - goto out_handler; - err = tipc_ref_table_init(tipc_max_ports, tipc_random); if (err) goto out_reftbl; @@ -146,8 +141,6 @@ out_netlink: out_nametbl: tipc_ref_table_stop(); out_reftbl: - tipc_handler_stop(); -out_handler: return err; } @@ -161,10 +154,11 @@ static int __init tipc_init(void) tipc_max_ports = CONFIG_TIPC_PORTS; tipc_net_id = 4711; - sysctl_tipc_rmem[0] = CONN_OVERLOAD_LIMIT >> 4 << TIPC_LOW_IMPORTANCE; - sysctl_tipc_rmem[1] = CONN_OVERLOAD_LIMIT >> 4 << + sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 << + TIPC_LOW_IMPORTANCE; + sysctl_tipc_rmem[1] = TIPC_CONN_OVERLOAD_LIMIT >> 4 << TIPC_CRITICAL_IMPORTANCE; - sysctl_tipc_rmem[2] = CONN_OVERLOAD_LIMIT; + sysctl_tipc_rmem[2] = TIPC_CONN_OVERLOAD_LIMIT; res = tipc_core_start(); if (res) diff --git a/net/tipc/core.h b/net/tipc/core.h index 8985bbcb942..bb26ed1ee96 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -56,7 +56,8 @@ #include <linux/list.h> #include <linux/slab.h> #include <linux/vmalloc.h> - +#include <linux/rtnetlink.h> +#include <linux/etherdevice.h> #define TIPC_MOD_VER "2.0.0" @@ -89,8 +90,6 @@ extern int tipc_random __read_mostly; /* * Routines available to privileged subsystems */ -int tipc_handler_start(void); -void tipc_handler_stop(void); int tipc_netlink_start(void); void tipc_netlink_stop(void); int tipc_socket_init(void); @@ -109,12 +108,10 @@ void tipc_unregister_sysctl(void); #endif /* - * TIPC timer and signal code + * TIPC timer code */ typedef void (*Handler) (unsigned long); -u32 tipc_k_signal(Handler routine, unsigned long argument); - /** * k_init_timer - initialize a timer * @timer: pointer to timer structure @@ -191,6 +188,7 @@ static inline void k_term_timer(struct timer_list *timer) struct tipc_skb_cb { void *handle; bool deferred; + struct sk_buff *tail; }; #define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0])) diff --git a/net/tipc/discover.c b/net/tipc/discover.c index 542fe3413dc..aa722a42ef8 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -1,7 +1,7 @@ /* * net/tipc/discover.c * - * Copyright (c) 2003-2006, Ericsson AB + * Copyright (c) 2003-2006, 2014, Ericsson AB * Copyright (c) 2005-2006, 2010-2011, Wind River Systems * All rights reserved. * @@ -46,8 +46,9 @@ /** * struct tipc_link_req - information about an ongoing link setup request - * @bearer: bearer issuing requests + * @bearer_id: identity of bearer issuing requests * @dest: destination address for request messages + * @domain: network domain to which links can be established * @num_nodes: number of nodes currently discovered (i.e. with an active link) * @lock: spinlock for controlling access to requests * @buf: request message to be (repeatedly) sent @@ -55,8 +56,9 @@ * @timer_intv: current interval between requests (in ms) */ struct tipc_link_req { - struct tipc_bearer *bearer; + u32 bearer_id; struct tipc_media_addr dest; + u32 domain; int num_nodes; spinlock_t lock; struct sk_buff *buf; @@ -69,22 +71,19 @@ struct tipc_link_req { * @type: message type (request or response) * @b_ptr: ptr to bearer issuing message */ -static struct sk_buff *tipc_disc_init_msg(u32 type, struct tipc_bearer *b_ptr) +static void tipc_disc_init_msg(struct sk_buff *buf, u32 type, + struct tipc_bearer *b_ptr) { - struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE); struct tipc_msg *msg; u32 dest_domain = b_ptr->domain; - if (buf) { - msg = buf_msg(buf); - tipc_msg_init(msg, LINK_CONFIG, type, INT_H_SIZE, dest_domain); - msg_set_non_seq(msg, 1); - msg_set_node_sig(msg, tipc_random); - msg_set_dest_domain(msg, dest_domain); - msg_set_bc_netid(msg, tipc_net_id); - b_ptr->media->addr2msg(&b_ptr->addr, msg_media_addr(msg)); - } - return buf; + msg = buf_msg(buf); + tipc_msg_init(msg, LINK_CONFIG, type, INT_H_SIZE, dest_domain); + msg_set_non_seq(msg, 1); + msg_set_node_sig(msg, tipc_random); + msg_set_dest_domain(msg, dest_domain); + msg_set_bc_netid(msg, tipc_net_id); + b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr); } /** @@ -107,146 +106,150 @@ static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr, } /** - * tipc_disc_rcv - handle incoming link setup message (request or response) + * tipc_disc_rcv - handle incoming discovery message (request or response) * @buf: buffer containing message - * @b_ptr: bearer that message arrived on + * @bearer: bearer that message arrived on */ -void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *b_ptr) +void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *bearer) { - struct tipc_node *n_ptr; + struct tipc_node *node; struct tipc_link *link; - struct tipc_media_addr media_addr; + struct tipc_media_addr maddr; struct sk_buff *rbuf; struct tipc_msg *msg = buf_msg(buf); - u32 dest = msg_dest_domain(msg); - u32 orig = msg_prevnode(msg); + u32 ddom = msg_dest_domain(msg); + u32 onode = msg_prevnode(msg); u32 net_id = msg_bc_netid(msg); - u32 type = msg_type(msg); + u32 mtyp = msg_type(msg); u32 signature = msg_node_sig(msg); - int addr_mismatch; - int link_fully_up; - - media_addr.broadcast = 1; - b_ptr->media->msg2addr(b_ptr, &media_addr, msg_media_addr(msg)); + bool addr_match = false; + bool sign_match = false; + bool link_up = false; + bool accept_addr = false; + bool accept_sign = false; + bool respond = false; + + bearer->media->msg2addr(bearer, &maddr, msg_media_addr(msg)); kfree_skb(buf); /* Ensure message from node is valid and communication is permitted */ if (net_id != tipc_net_id) return; - if (media_addr.broadcast) + if (maddr.broadcast) return; - if (!tipc_addr_domain_valid(dest)) + if (!tipc_addr_domain_valid(ddom)) return; - if (!tipc_addr_node_valid(orig)) + if (!tipc_addr_node_valid(onode)) return; - if (orig == tipc_own_addr) { - if (memcmp(&media_addr, &b_ptr->addr, sizeof(media_addr))) - disc_dupl_alert(b_ptr, tipc_own_addr, &media_addr); + + if (in_own_node(onode)) { + if (memcmp(&maddr, &bearer->addr, sizeof(maddr))) + disc_dupl_alert(bearer, tipc_own_addr, &maddr); return; } - if (!tipc_in_scope(dest, tipc_own_addr)) + if (!tipc_in_scope(ddom, tipc_own_addr)) return; - if (!tipc_in_scope(b_ptr->domain, orig)) + if (!tipc_in_scope(bearer->domain, onode)) return; - /* Locate structure corresponding to requesting node */ - n_ptr = tipc_node_find(orig); - if (!n_ptr) { - n_ptr = tipc_node_create(orig); - if (!n_ptr) - return; - } - tipc_node_lock(n_ptr); + /* Locate, or if necessary, create, node: */ + node = tipc_node_find(onode); + if (!node) + node = tipc_node_create(onode); + if (!node) + return; - /* Prepare to validate requesting node's signature and media address */ - link = n_ptr->links[b_ptr->identity]; - addr_mismatch = (link != NULL) && - memcmp(&link->media_addr, &media_addr, sizeof(media_addr)); + tipc_node_lock(node); + link = node->links[bearer->identity]; - /* - * Ensure discovery message's signature is correct - * - * If signature is incorrect and there is no working link to the node, - * accept the new signature but invalidate all existing links to the - * node so they won't re-activate without a new discovery message. - * - * If signature is incorrect and the requested link to the node is - * working, accept the new signature. (This is an instance of delayed - * rediscovery, where a link endpoint was able to re-establish contact - * with its peer endpoint on a node that rebooted before receiving a - * discovery message from that node.) - * - * If signature is incorrect and there is a working link to the node - * that is not the requested link, reject the request (must be from - * a duplicate node). - */ - if (signature != n_ptr->signature) { - if (n_ptr->working_links == 0) { - struct tipc_link *curr_link; - int i; - - for (i = 0; i < MAX_BEARERS; i++) { - curr_link = n_ptr->links[i]; - if (curr_link) { - memset(&curr_link->media_addr, 0, - sizeof(media_addr)); - tipc_link_reset(curr_link); - } - } - addr_mismatch = (link != NULL); - } else if (tipc_link_is_up(link) && !addr_mismatch) { - /* delayed rediscovery */ - } else { - disc_dupl_alert(b_ptr, orig, &media_addr); - tipc_node_unlock(n_ptr); - return; - } - n_ptr->signature = signature; + /* Prepare to validate requesting node's signature and media address */ + sign_match = (signature == node->signature); + addr_match = link && !memcmp(&link->media_addr, &maddr, sizeof(maddr)); + link_up = link && tipc_link_is_up(link); + + + /* These three flags give us eight permutations: */ + + if (sign_match && addr_match && link_up) { + /* All is fine. Do nothing. */ + } else if (sign_match && addr_match && !link_up) { + /* Respond. The link will come up in due time */ + respond = true; + } else if (sign_match && !addr_match && link_up) { + /* Peer has changed i/f address without rebooting. + * If so, the link will reset soon, and the next + * discovery will be accepted. So we can ignore it. + * It may also be an cloned or malicious peer having + * chosen the same node address and signature as an + * existing one. + * Ignore requests until the link goes down, if ever. + */ + disc_dupl_alert(bearer, onode, &maddr); + } else if (sign_match && !addr_match && !link_up) { + /* Peer link has changed i/f address without rebooting. + * It may also be a cloned or malicious peer; we can't + * distinguish between the two. + * The signature is correct, so we must accept. + */ + accept_addr = true; + respond = true; + } else if (!sign_match && addr_match && link_up) { + /* Peer node rebooted. Two possibilities: + * - Delayed re-discovery; this link endpoint has already + * reset and re-established contact with the peer, before + * receiving a discovery message from that node. + * (The peer happened to receive one from this node first). + * - The peer came back so fast that our side has not + * discovered it yet. Probing from this side will soon + * reset the link, since there can be no working link + * endpoint at the peer end, and the link will re-establish. + * Accept the signature, since it comes from a known peer. + */ + accept_sign = true; + } else if (!sign_match && addr_match && !link_up) { + /* The peer node has rebooted. + * Accept signature, since it is a known peer. + */ + accept_sign = true; + respond = true; + } else if (!sign_match && !addr_match && link_up) { + /* Peer rebooted with new address, or a new/duplicate peer. + * Ignore until the link goes down, if ever. + */ + disc_dupl_alert(bearer, onode, &maddr); + } else if (!sign_match && !addr_match && !link_up) { + /* Peer rebooted with new address, or it is a new peer. + * Accept signature and address. + */ + accept_sign = true; + accept_addr = true; + respond = true; } - /* - * Ensure requesting node's media address is correct - * - * If media address doesn't match and the link is working, reject the - * request (must be from a duplicate node). - * - * If media address doesn't match and the link is not working, accept - * the new media address and reset the link to ensure it starts up - * cleanly. - */ - if (addr_mismatch) { - if (tipc_link_is_up(link)) { - disc_dupl_alert(b_ptr, orig, &media_addr); - tipc_node_unlock(n_ptr); - return; - } else { - memcpy(&link->media_addr, &media_addr, - sizeof(media_addr)); - tipc_link_reset(link); - } - } + if (accept_sign) + node->signature = signature; - /* Create a link endpoint for this bearer, if necessary */ - if (!link) { - link = tipc_link_create(n_ptr, b_ptr, &media_addr); - if (!link) { - tipc_node_unlock(n_ptr); - return; + if (accept_addr) { + if (!link) + link = tipc_link_create(node, bearer, &maddr); + if (link) { + memcpy(&link->media_addr, &maddr, sizeof(maddr)); + tipc_link_reset(link); + } else { + respond = false; } } - /* Accept discovery message & send response, if necessary */ - link_fully_up = link_working_working(link); - - if ((type == DSC_REQ_MSG) && !link_fully_up) { - rbuf = tipc_disc_init_msg(DSC_RESP_MSG, b_ptr); + /* Send response, if necessary */ + if (respond && (mtyp == DSC_REQ_MSG)) { + rbuf = tipc_buf_acquire(INT_H_SIZE); if (rbuf) { - tipc_bearer_send(b_ptr, rbuf, &media_addr); + tipc_disc_init_msg(rbuf, DSC_RESP_MSG, bearer); + tipc_bearer_send(bearer->identity, rbuf, &maddr); kfree_skb(rbuf); } } - - tipc_node_unlock(n_ptr); + tipc_node_unlock(node); } /** @@ -303,7 +306,7 @@ static void disc_timeout(struct tipc_link_req *req) spin_lock_bh(&req->lock); /* Stop searching if only desired node has been found */ - if (tipc_node(req->bearer->domain) && req->num_nodes) { + if (tipc_node(req->domain) && req->num_nodes) { req->timer_intv = TIPC_LINK_REQ_INACTIVE; goto exit; } @@ -315,7 +318,7 @@ static void disc_timeout(struct tipc_link_req *req) * hold at fast polling rate if don't have any associated nodes, * otherwise hold at slow polling rate */ - tipc_bearer_send(req->bearer, req->buf, &req->dest); + tipc_bearer_send(req->bearer_id, req->buf, &req->dest); req->timer_intv *= 2; @@ -347,21 +350,23 @@ int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest) if (!req) return -ENOMEM; - req->buf = tipc_disc_init_msg(DSC_REQ_MSG, b_ptr); + req->buf = tipc_buf_acquire(INT_H_SIZE); if (!req->buf) { kfree(req); - return -ENOMSG; + return -ENOMEM; } + tipc_disc_init_msg(req->buf, DSC_REQ_MSG, b_ptr); memcpy(&req->dest, dest, sizeof(*dest)); - req->bearer = b_ptr; + req->bearer_id = b_ptr->identity; + req->domain = b_ptr->domain; req->num_nodes = 0; req->timer_intv = TIPC_LINK_REQ_INIT; spin_lock_init(&req->lock); k_init_timer(&req->timer, (Handler)disc_timeout, (unsigned long)req); k_start_timer(&req->timer, req->timer_intv); b_ptr->link_req = req; - tipc_bearer_send(req->bearer, req->buf, &req->dest); + tipc_bearer_send(req->bearer_id, req->buf, &req->dest); return 0; } @@ -376,3 +381,23 @@ void tipc_disc_delete(struct tipc_link_req *req) kfree_skb(req->buf); kfree(req); } + +/** + * tipc_disc_reset - reset object to send periodic link setup requests + * @b_ptr: ptr to bearer issuing requests + * @dest_domain: network domain to which links can be established + */ +void tipc_disc_reset(struct tipc_bearer *b_ptr) +{ + struct tipc_link_req *req = b_ptr->link_req; + + spin_lock_bh(&req->lock); + tipc_disc_init_msg(req->buf, DSC_REQ_MSG, b_ptr); + req->bearer_id = b_ptr->identity; + req->domain = b_ptr->domain; + req->num_nodes = 0; + req->timer_intv = TIPC_LINK_REQ_INIT; + k_start_timer(&req->timer, req->timer_intv); + tipc_bearer_send(req->bearer_id, req->buf, &req->dest); + spin_unlock_bh(&req->lock); +} diff --git a/net/tipc/discover.h b/net/tipc/discover.h index 07f34729459..515b57392f4 100644 --- a/net/tipc/discover.h +++ b/net/tipc/discover.h @@ -41,6 +41,7 @@ struct tipc_link_req; int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest); void tipc_disc_delete(struct tipc_link_req *req); +void tipc_disc_reset(struct tipc_bearer *b_ptr); void tipc_disc_add_dest(struct tipc_link_req *req); void tipc_disc_remove_dest(struct tipc_link_req *req); void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *b_ptr); diff --git a/net/tipc/eth_media.c b/net/tipc/eth_media.c index 67cf3f935db..5e1426f1751 100644 --- a/net/tipc/eth_media.c +++ b/net/tipc/eth_media.c @@ -1,7 +1,7 @@ /* * net/tipc/eth_media.c: Ethernet bearer support for TIPC * - * Copyright (c) 2001-2007, 2013, Ericsson AB + * Copyright (c) 2001-2007, 2013-2014, Ericsson AB * Copyright (c) 2005-2008, 2011-2013, Wind River Systems * All rights reserved. * @@ -37,39 +37,52 @@ #include "core.h" #include "bearer.h" -#define ETH_ADDR_OFFSET 4 /* message header offset of MAC address */ +#define ETH_ADDR_OFFSET 4 /* MAC addr position inside address field */ -/* convert Ethernet address to string */ -static int tipc_eth_addr2str(struct tipc_media_addr *a, char *str_buf, - int str_size) +/* Convert Ethernet address (media address format) to string */ +static int tipc_eth_addr2str(struct tipc_media_addr *addr, + char *strbuf, int bufsz) { - if (str_size < 18) /* 18 = strlen("aa:bb:cc:dd:ee:ff\0") */ + if (bufsz < 18) /* 18 = strlen("aa:bb:cc:dd:ee:ff\0") */ return 1; - sprintf(str_buf, "%pM", a->value); + sprintf(strbuf, "%pM", addr->value); return 0; } -/* convert Ethernet address format to message header format */ -static int tipc_eth_addr2msg(struct tipc_media_addr *a, char *msg_area) +/* Convert from media address format to discovery message addr format */ +static int tipc_eth_addr2msg(char *msg, struct tipc_media_addr *addr) { - memset(msg_area, 0, TIPC_MEDIA_ADDR_SIZE); - msg_area[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_ETH; - memcpy(msg_area + ETH_ADDR_OFFSET, a->value, ETH_ALEN); + memset(msg, 0, TIPC_MEDIA_ADDR_SIZE); + msg[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_ETH; + memcpy(msg + ETH_ADDR_OFFSET, addr->value, ETH_ALEN); return 0; } -/* convert message header address format to Ethernet format */ -static int tipc_eth_msg2addr(const struct tipc_bearer *tb_ptr, - struct tipc_media_addr *a, char *msg_area) +/* Convert raw mac address format to media addr format */ +static int tipc_eth_raw2addr(struct tipc_bearer *b, + struct tipc_media_addr *addr, + char *msg) { - if (msg_area[TIPC_MEDIA_TYPE_OFFSET] != TIPC_MEDIA_TYPE_ETH) - return 1; + char bcast_mac[ETH_ALEN] = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; - tipc_l2_media_addr_set(tb_ptr, a, msg_area + ETH_ADDR_OFFSET); + memset(addr, 0, sizeof(*addr)); + ether_addr_copy(addr->value, msg); + addr->media_id = TIPC_MEDIA_TYPE_ETH; + addr->broadcast = !memcmp(addr->value, bcast_mac, ETH_ALEN); return 0; } +/* Convert discovery msg addr format to Ethernet media addr format */ +static int tipc_eth_msg2addr(struct tipc_bearer *b, + struct tipc_media_addr *addr, + char *msg) +{ + /* Skip past preamble: */ + msg += ETH_ADDR_OFFSET; + return tipc_eth_raw2addr(b, addr, msg); +} + /* Ethernet media registration info */ struct tipc_media eth_media_info = { .send_msg = tipc_l2_send_msg, @@ -78,6 +91,7 @@ struct tipc_media eth_media_info = { .addr2str = tipc_eth_addr2str, .addr2msg = tipc_eth_addr2msg, .msg2addr = tipc_eth_msg2addr, + .raw2addr = tipc_eth_raw2addr, .priority = TIPC_DEF_LINK_PRI, .tolerance = TIPC_DEF_LINK_TOL, .window = TIPC_DEF_LINK_WIN, @@ -85,4 +99,3 @@ struct tipc_media eth_media_info = { .hwaddr_len = ETH_ALEN, .name = "eth" }; - diff --git a/net/tipc/handler.c b/net/tipc/handler.c deleted file mode 100644 index 1fabf160501..00000000000 --- a/net/tipc/handler.c +++ /dev/null @@ -1,134 +0,0 @@ -/* - * net/tipc/handler.c: TIPC signal handling - * - * Copyright (c) 2000-2006, Ericsson AB - * Copyright (c) 2005, Wind River Systems - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "core.h" - -struct queue_item { - struct list_head next_signal; - void (*handler) (unsigned long); - unsigned long data; -}; - -static struct kmem_cache *tipc_queue_item_cache; -static struct list_head signal_queue_head; -static DEFINE_SPINLOCK(qitem_lock); -static int handler_enabled __read_mostly; - -static void process_signal_queue(unsigned long dummy); - -static DECLARE_TASKLET_DISABLED(tipc_tasklet, process_signal_queue, 0); - - -unsigned int tipc_k_signal(Handler routine, unsigned long argument) -{ - struct queue_item *item; - - spin_lock_bh(&qitem_lock); - if (!handler_enabled) { - spin_unlock_bh(&qitem_lock); - return -ENOPROTOOPT; - } - - item = kmem_cache_alloc(tipc_queue_item_cache, GFP_ATOMIC); - if (!item) { - pr_err("Signal queue out of memory\n"); - spin_unlock_bh(&qitem_lock); - return -ENOMEM; - } - item->handler = routine; - item->data = argument; - list_add_tail(&item->next_signal, &signal_queue_head); - spin_unlock_bh(&qitem_lock); - tasklet_schedule(&tipc_tasklet); - return 0; -} - -static void process_signal_queue(unsigned long dummy) -{ - struct queue_item *__volatile__ item; - struct list_head *l, *n; - - spin_lock_bh(&qitem_lock); - list_for_each_safe(l, n, &signal_queue_head) { - item = list_entry(l, struct queue_item, next_signal); - list_del(&item->next_signal); - spin_unlock_bh(&qitem_lock); - item->handler(item->data); - spin_lock_bh(&qitem_lock); - kmem_cache_free(tipc_queue_item_cache, item); - } - spin_unlock_bh(&qitem_lock); -} - -int tipc_handler_start(void) -{ - tipc_queue_item_cache = - kmem_cache_create("tipc_queue_items", sizeof(struct queue_item), - 0, SLAB_HWCACHE_ALIGN, NULL); - if (!tipc_queue_item_cache) - return -ENOMEM; - - INIT_LIST_HEAD(&signal_queue_head); - tasklet_enable(&tipc_tasklet); - handler_enabled = 1; - return 0; -} - -void tipc_handler_stop(void) -{ - struct list_head *l, *n; - struct queue_item *item; - - spin_lock_bh(&qitem_lock); - if (!handler_enabled) { - spin_unlock_bh(&qitem_lock); - return; - } - handler_enabled = 0; - spin_unlock_bh(&qitem_lock); - - tasklet_kill(&tipc_tasklet); - - spin_lock_bh(&qitem_lock); - list_for_each_safe(l, n, &signal_queue_head) { - item = list_entry(l, struct queue_item, next_signal); - list_del(&item->next_signal); - kmem_cache_free(tipc_queue_item_cache, item); - } - spin_unlock_bh(&qitem_lock); - - kmem_cache_destroy(tipc_queue_item_cache); -} diff --git a/net/tipc/ib_media.c b/net/tipc/ib_media.c index 844a77e2582..8522eef9c13 100644 --- a/net/tipc/ib_media.c +++ b/net/tipc/ib_media.c @@ -42,7 +42,7 @@ #include "core.h" #include "bearer.h" -/* convert InfiniBand address to string */ +/* convert InfiniBand address (media address format) media address to string */ static int tipc_ib_addr2str(struct tipc_media_addr *a, char *str_buf, int str_size) { @@ -54,23 +54,35 @@ static int tipc_ib_addr2str(struct tipc_media_addr *a, char *str_buf, return 0; } -/* convert InfiniBand address format to message header format */ -static int tipc_ib_addr2msg(struct tipc_media_addr *a, char *msg_area) +/* Convert from media address format to discovery message addr format */ +static int tipc_ib_addr2msg(char *msg, struct tipc_media_addr *addr) { - memset(msg_area, 0, TIPC_MEDIA_ADDR_SIZE); - msg_area[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_IB; - memcpy(msg_area, a->value, INFINIBAND_ALEN); + memset(msg, 0, TIPC_MEDIA_ADDR_SIZE); + memcpy(msg, addr->value, INFINIBAND_ALEN); return 0; } -/* convert message header address format to InfiniBand format */ -static int tipc_ib_msg2addr(const struct tipc_bearer *tb_ptr, - struct tipc_media_addr *a, char *msg_area) +/* Convert raw InfiniBand address format to media addr format */ +static int tipc_ib_raw2addr(struct tipc_bearer *b, + struct tipc_media_addr *addr, + char *msg) { - tipc_l2_media_addr_set(tb_ptr, a, msg_area); + memset(addr, 0, sizeof(*addr)); + memcpy(addr->value, msg, INFINIBAND_ALEN); + addr->media_id = TIPC_MEDIA_TYPE_IB; + addr->broadcast = !memcmp(msg, b->bcast_addr.value, + INFINIBAND_ALEN); return 0; } +/* Convert discovery msg addr format to InfiniBand media addr format */ +static int tipc_ib_msg2addr(struct tipc_bearer *b, + struct tipc_media_addr *addr, + char *msg) +{ + return tipc_ib_raw2addr(b, addr, msg); +} + /* InfiniBand media registration info */ struct tipc_media ib_media_info = { .send_msg = tipc_l2_send_msg, @@ -79,6 +91,7 @@ struct tipc_media ib_media_info = { .addr2str = tipc_ib_addr2str, .addr2msg = tipc_ib_addr2msg, .msg2addr = tipc_ib_msg2addr, + .raw2addr = tipc_ib_raw2addr, .priority = TIPC_DEF_LINK_PRI, .tolerance = TIPC_DEF_LINK_TOL, .window = TIPC_DEF_LINK_WIN, @@ -86,4 +99,3 @@ struct tipc_media ib_media_info = { .hwaddr_len = INFINIBAND_ALEN, .name = "ib" }; - diff --git a/net/tipc/link.c b/net/tipc/link.c index c5190ab7529..ad2c57f5868 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -37,6 +37,7 @@ #include "core.h" #include "link.h" #include "port.h" +#include "socket.h" #include "name_distr.h" #include "discover.h" #include "config.h" @@ -101,9 +102,18 @@ static unsigned int align(unsigned int i) static void link_init_max_pkt(struct tipc_link *l_ptr) { + struct tipc_bearer *b_ptr; u32 max_pkt; - max_pkt = (l_ptr->b_ptr->mtu & ~3); + rcu_read_lock(); + b_ptr = rcu_dereference_rtnl(bearer_list[l_ptr->bearer_id]); + if (!b_ptr) { + rcu_read_unlock(); + return; + } + max_pkt = (b_ptr->mtu & ~3); + rcu_read_unlock(); + if (max_pkt > MAX_MSG_SIZE) max_pkt = MAX_MSG_SIZE; @@ -248,7 +258,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, l_ptr->owner = n_ptr; l_ptr->checkpoint = 1; l_ptr->peer_session = INVALID_SESSION; - l_ptr->b_ptr = b_ptr; + l_ptr->bearer_id = b_ptr->identity; link_set_supervision_props(l_ptr, b_ptr->tolerance); l_ptr->state = RESET_UNKNOWN; @@ -263,6 +273,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, l_ptr->priority = b_ptr->priority; tipc_link_set_queue_limits(l_ptr, b_ptr->window); + l_ptr->net_plane = b_ptr->net_plane; link_init_max_pkt(l_ptr); l_ptr->next_out_no = 1; @@ -287,14 +298,14 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down) rcu_read_lock(); list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { - spin_lock_bh(&n_ptr->lock); + tipc_node_lock(n_ptr); l_ptr = n_ptr->links[bearer_id]; if (l_ptr) { tipc_link_reset(l_ptr); if (shutting_down || !tipc_node_is_up(n_ptr)) { tipc_node_detach_link(l_ptr->owner, l_ptr); tipc_link_reset_fragments(l_ptr); - spin_unlock_bh(&n_ptr->lock); + tipc_node_unlock(n_ptr); /* Nobody else can access this link now: */ del_timer_sync(&l_ptr->timer); @@ -302,12 +313,12 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down) } else { /* Detach/delete when failover is finished: */ l_ptr->flags |= LINK_STOPPED; - spin_unlock_bh(&n_ptr->lock); + tipc_node_unlock(n_ptr); del_timer_sync(&l_ptr->timer); } continue; } - spin_unlock_bh(&n_ptr->lock); + tipc_node_unlock(n_ptr); } rcu_read_unlock(); } @@ -388,9 +399,8 @@ static void link_release_outqueue(struct tipc_link *l_ptr) */ void tipc_link_reset_fragments(struct tipc_link *l_ptr) { - kfree_skb(l_ptr->reasm_head); - l_ptr->reasm_head = NULL; - l_ptr->reasm_tail = NULL; + kfree_skb(l_ptr->reasm_buf); + l_ptr->reasm_buf = NULL; } /** @@ -426,7 +436,7 @@ void tipc_link_reset(struct tipc_link *l_ptr) return; tipc_node_link_down(l_ptr->owner, l_ptr); - tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr); + tipc_bearer_remove_dest(l_ptr->bearer_id, l_ptr->addr); if (was_active_link && tipc_node_active_links(l_ptr->owner)) { l_ptr->reset_checkpoint = checkpoint; @@ -464,11 +474,11 @@ void tipc_link_reset_list(unsigned int bearer_id) rcu_read_lock(); list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { - spin_lock_bh(&n_ptr->lock); + tipc_node_lock(n_ptr); l_ptr = n_ptr->links[bearer_id]; if (l_ptr) tipc_link_reset(l_ptr); - spin_unlock_bh(&n_ptr->lock); + tipc_node_unlock(n_ptr); } rcu_read_unlock(); } @@ -477,7 +487,7 @@ static void link_activate(struct tipc_link *l_ptr) { l_ptr->next_in_no = l_ptr->stats.recv_info = 1; tipc_node_link_up(l_ptr->owner, l_ptr); - tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr); + tipc_bearer_add_dest(l_ptr->bearer_id, l_ptr->addr); } /** @@ -777,7 +787,7 @@ int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf) if (likely(!link_congested(l_ptr))) { link_add_to_outqueue(l_ptr, buf, msg); - tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); l_ptr->unacked_window = 0; return dsz; } @@ -825,7 +835,6 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector) struct tipc_node *n_ptr; int res = -ELINKCONG; - read_lock_bh(&tipc_net_lock); n_ptr = tipc_node_find(dest); if (n_ptr) { tipc_node_lock(n_ptr); @@ -838,7 +847,6 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector) } else { kfree_skb(buf); } - read_unlock_bh(&tipc_net_lock); return res; } @@ -902,7 +910,6 @@ void tipc_link_names_xmit(struct list_head *message_list, u32 dest) if (list_empty(message_list)) return; - read_lock_bh(&tipc_net_lock); n_ptr = tipc_node_find(dest); if (n_ptr) { tipc_node_lock(n_ptr); @@ -917,7 +924,6 @@ void tipc_link_names_xmit(struct list_head *message_list, u32 dest) } tipc_node_unlock(n_ptr); } - read_unlock_bh(&tipc_net_lock); /* discard the messages if they couldn't be sent */ list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) { @@ -941,7 +947,7 @@ static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf, if (likely(!link_congested(l_ptr))) { if (likely(msg_size(msg) <= l_ptr->max_pkt)) { link_add_to_outqueue(l_ptr, buf, msg); - tipc_bearer_send(l_ptr->b_ptr, buf, + tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); l_ptr->unacked_window = 0; return res; @@ -979,7 +985,6 @@ again: if (unlikely(res < 0)) return res; - read_lock_bh(&tipc_net_lock); node = tipc_node_find(destaddr); if (likely(node)) { tipc_node_lock(node); @@ -990,7 +995,6 @@ again: &sender->max_pkt); exit: tipc_node_unlock(node); - read_unlock_bh(&tipc_net_lock); return res; } @@ -1007,7 +1011,6 @@ exit: */ sender->max_pkt = l_ptr->max_pkt; tipc_node_unlock(node); - read_unlock_bh(&tipc_net_lock); if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt) @@ -1018,7 +1021,6 @@ exit: } tipc_node_unlock(node); } - read_unlock_bh(&tipc_net_lock); /* Couldn't find a link to the destination node */ kfree_skb(buf); @@ -1204,7 +1206,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr) if (r_q_size && buf) { msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); - tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); l_ptr->retransm_queue_head = mod(++r_q_head); l_ptr->retransm_queue_size = --r_q_size; l_ptr->stats.retransmitted++; @@ -1216,7 +1218,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr) if (buf) { msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); - tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); l_ptr->unacked_window = 0; kfree_skb(buf); l_ptr->proto_msg_queue = NULL; @@ -1233,7 +1235,8 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr) if (mod(next - first) < l_ptr->queue_limit[0]) { msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + tipc_bearer_send(l_ptr->bearer_id, buf, + &l_ptr->media_addr); if (msg_user(msg) == MSG_BUNDLER) msg_set_type(msg, CLOSED_MSG); l_ptr->next_out = buf->next; @@ -1256,33 +1259,24 @@ void tipc_link_push_queue(struct tipc_link *l_ptr) } while (!res); } -static void link_reset_all(unsigned long addr) +void tipc_link_reset_all(struct tipc_node *node) { - struct tipc_node *n_ptr; char addr_string[16]; u32 i; - read_lock_bh(&tipc_net_lock); - n_ptr = tipc_node_find((u32)addr); - if (!n_ptr) { - read_unlock_bh(&tipc_net_lock); - return; /* node no longer exists */ - } - - tipc_node_lock(n_ptr); + tipc_node_lock(node); pr_warn("Resetting all links to %s\n", - tipc_addr_string_fill(addr_string, n_ptr->addr)); + tipc_addr_string_fill(addr_string, node->addr)); for (i = 0; i < MAX_BEARERS; i++) { - if (n_ptr->links[i]) { - link_print(n_ptr->links[i], "Resetting link\n"); - tipc_link_reset(n_ptr->links[i]); + if (node->links[i]) { + link_print(node->links[i], "Resetting link\n"); + tipc_link_reset(node->links[i]); } } - tipc_node_unlock(n_ptr); - read_unlock_bh(&tipc_net_lock); + tipc_node_unlock(node); } static void link_retransmit_failure(struct tipc_link *l_ptr, @@ -1319,10 +1313,9 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, n_ptr->bclink.oos_state, n_ptr->bclink.last_sent); - tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr); - tipc_node_unlock(n_ptr); + tipc_bclink_set_flags(TIPC_BCLINK_RESET); l_ptr->stale_count = 0; } } @@ -1352,7 +1345,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf, msg = buf_msg(buf); msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); buf = buf->next; retransmits--; l_ptr->stats.retransmitted++; @@ -1440,14 +1433,13 @@ static int link_recv_buf_validate(struct sk_buff *buf) /** * tipc_rcv - process TIPC packets/messages arriving from off-node * @head: pointer to message buffer chain - * @tb_ptr: pointer to bearer message arrived on + * @b_ptr: pointer to bearer message arrived on * * Invoked with no locks held. Bearer pointer must point to a valid bearer * structure (i.e. cannot be NULL), but bearer can be inactive. */ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) { - read_lock_bh(&tipc_net_lock); while (head) { struct tipc_node *n_ptr; struct tipc_link *l_ptr; @@ -1497,14 +1489,14 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) goto unlock_discard; /* Verify that communication with node is currently allowed */ - if ((n_ptr->block_setup & WAIT_PEER_DOWN) && - msg_user(msg) == LINK_PROTOCOL && - (msg_type(msg) == RESET_MSG || - msg_type(msg) == ACTIVATE_MSG) && - !msg_redundant_link(msg)) - n_ptr->block_setup &= ~WAIT_PEER_DOWN; - - if (n_ptr->block_setup) + if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) && + msg_user(msg) == LINK_PROTOCOL && + (msg_type(msg) == RESET_MSG || + msg_type(msg) == ACTIVATE_MSG) && + !msg_redundant_link(msg)) + n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN; + + if (tipc_node_blocked(n_ptr)) goto unlock_discard; /* Validate message sequence number info */ @@ -1581,17 +1573,12 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) } msg = buf_msg(buf); } else if (msg_user(msg) == MSG_FRAGMENTER) { - int rc; - l_ptr->stats.recv_fragments++; - rc = tipc_link_frag_rcv(&l_ptr->reasm_head, - &l_ptr->reasm_tail, - &buf); - if (rc == LINK_REASM_COMPLETE) { + if (tipc_buf_append(&l_ptr->reasm_buf, &buf)) { l_ptr->stats.recv_fragmented++; msg = buf_msg(buf); } else { - if (rc == LINK_REASM_ERROR) + if (!l_ptr->reasm_buf) tipc_link_reset(l_ptr); tipc_node_unlock(n_ptr); continue; @@ -1604,7 +1591,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: tipc_node_unlock(n_ptr); - tipc_port_rcv(buf); + tipc_sk_rcv(buf); continue; case MSG_BUNDLER: l_ptr->stats.recv_bundles++; @@ -1635,7 +1622,6 @@ unlock_discard: discard: kfree_skb(buf); } - read_unlock_bh(&tipc_net_lock); } /** @@ -1747,12 +1733,12 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, return; /* Abort non-RESET send if communication with node is prohibited */ - if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG)) + if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG)) return; /* Create protocol message with "out-of-sequence" sequence number */ msg_set_type(msg, msg_typ); - msg_set_net_plane(msg, l_ptr->b_ptr->net_plane); + msg_set_net_plane(msg, l_ptr->net_plane); msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); msg_set_last_bcast(msg, tipc_bclink_get_last_sent()); @@ -1818,7 +1804,7 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); buf->priority = TC_PRIO_CONTROL; - tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); l_ptr->unacked_window = 0; kfree_skb(buf); } @@ -1840,12 +1826,9 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf) if (l_ptr->exp_msg_count) goto exit; - /* record unnumbered packet arrival (force mismatch on next timeout) */ - l_ptr->checkpoint--; - - if (l_ptr->b_ptr->net_plane != msg_net_plane(msg)) + if (l_ptr->net_plane != msg_net_plane(msg)) if (tipc_own_addr > msg_prevnode(msg)) - l_ptr->b_ptr->net_plane = msg_net_plane(msg); + l_ptr->net_plane = msg_net_plane(msg); switch (msg_type(msg)) { @@ -1862,7 +1845,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf) * peer has lost contact -- don't allow peer's links * to reactivate before we recognize loss & clean up */ - l_ptr->owner->block_setup = WAIT_NODE_DOWN; + l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN; } link_state_event(l_ptr, RESET_MSG); @@ -1918,6 +1901,10 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf) tipc_link_reset(l_ptr); /* Enforce change to take effect */ break; } + + /* Record reception; force mismatch at next timeout: */ + l_ptr->checkpoint--; + link_state_event(l_ptr, TRAFFIC_MSG_EVT); l_ptr->stats.recv_states++; if (link_reset_unknown(l_ptr)) @@ -2177,9 +2164,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr, } if (msg_user(msg) == MSG_FRAGMENTER) { l_ptr->stats.recv_fragments++; - tipc_link_frag_rcv(&l_ptr->reasm_head, - &l_ptr->reasm_tail, - &buf); + tipc_buf_append(&l_ptr->reasm_buf, &buf); } } exit: @@ -2317,53 +2302,6 @@ static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf) return dsz; } -/* tipc_link_frag_rcv(): Called with node lock on. Returns - * the reassembled buffer if message is complete. - */ -int tipc_link_frag_rcv(struct sk_buff **head, struct sk_buff **tail, - struct sk_buff **fbuf) -{ - struct sk_buff *frag = *fbuf; - struct tipc_msg *msg = buf_msg(frag); - u32 fragid = msg_type(msg); - bool headstolen; - int delta; - - skb_pull(frag, msg_hdr_sz(msg)); - if (fragid == FIRST_FRAGMENT) { - if (*head || skb_unclone(frag, GFP_ATOMIC)) - goto out_free; - *head = frag; - skb_frag_list_init(*head); - *fbuf = NULL; - return 0; - } else if (*head && - skb_try_coalesce(*head, frag, &headstolen, &delta)) { - kfree_skb_partial(frag, headstolen); - } else { - if (!*head) - goto out_free; - if (!skb_has_frag_list(*head)) - skb_shinfo(*head)->frag_list = frag; - else - (*tail)->next = frag; - *tail = frag; - (*head)->truesize += frag->truesize; - } - if (fragid == LAST_FRAGMENT) { - *fbuf = *head; - *tail = *head = NULL; - return LINK_REASM_COMPLETE; - } - *fbuf = NULL; - return 0; -out_free: - pr_warn_ratelimited("Link unable to reassemble fragmented message\n"); - kfree_skb(*fbuf); - *fbuf = NULL; - return LINK_REASM_ERROR; -} - static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance) { if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL)) @@ -2397,8 +2335,6 @@ void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window) /* tipc_link_find_owner - locate owner node of link by link's name * @name: pointer to link name string * @bearer_id: pointer to index in 'node->links' array where the link was found. - * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted; - * this also prevents link deletion. * * Returns pointer to node owning the link, or 0 if no matching link is found. */ @@ -2460,7 +2396,7 @@ static int link_value_is_valid(u16 cmd, u32 new_value) * @new_value: new value of link, bearer, or media setting * @cmd: which link, bearer, or media attribute to set (TIPC_CMD_SET_LINK_*) * - * Caller must hold 'tipc_net_lock' to ensure link/bearer/media is not deleted. + * Caller must hold RTNL lock to ensure link/bearer/media is not deleted. * * Returns 0 if value updated and negative value on error. */ @@ -2566,9 +2502,7 @@ struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space " (cannot change setting on broadcast link)"); } - read_lock_bh(&tipc_net_lock); res = link_cmd_set_value(args->name, new_value, cmd); - read_unlock_bh(&tipc_net_lock); if (res) return tipc_cfg_reply_error_string("cannot change link setting"); @@ -2602,22 +2536,18 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_ return tipc_cfg_reply_error_string("link not found"); return tipc_cfg_reply_none(); } - read_lock_bh(&tipc_net_lock); node = tipc_link_find_owner(link_name, &bearer_id); - if (!node) { - read_unlock_bh(&tipc_net_lock); + if (!node) return tipc_cfg_reply_error_string("link not found"); - } + tipc_node_lock(node); l_ptr = node->links[bearer_id]; if (!l_ptr) { tipc_node_unlock(node); - read_unlock_bh(&tipc_net_lock); return tipc_cfg_reply_error_string("link not found"); } link_reset_statistics(l_ptr); tipc_node_unlock(node); - read_unlock_bh(&tipc_net_lock); return tipc_cfg_reply_none(); } @@ -2650,18 +2580,15 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size) if (!strcmp(name, tipc_bclink_name)) return tipc_bclink_stats(buf, buf_size); - read_lock_bh(&tipc_net_lock); node = tipc_link_find_owner(name, &bearer_id); - if (!node) { - read_unlock_bh(&tipc_net_lock); + if (!node) return 0; - } + tipc_node_lock(node); l = node->links[bearer_id]; if (!l) { tipc_node_unlock(node); - read_unlock_bh(&tipc_net_lock); return 0; } @@ -2727,7 +2654,6 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size) (s->accu_queue_sz / s->queue_sz_counts) : 0); tipc_node_unlock(node); - read_unlock_bh(&tipc_net_lock); return ret; } @@ -2778,7 +2704,6 @@ u32 tipc_link_get_max_pkt(u32 dest, u32 selector) if (dest == tipc_own_addr) return MAX_MSG_SIZE; - read_lock_bh(&tipc_net_lock); n_ptr = tipc_node_find(dest); if (n_ptr) { tipc_node_lock(n_ptr); @@ -2787,13 +2712,18 @@ u32 tipc_link_get_max_pkt(u32 dest, u32 selector) res = l_ptr->max_pkt; tipc_node_unlock(n_ptr); } - read_unlock_bh(&tipc_net_lock); return res; } static void link_print(struct tipc_link *l_ptr, const char *str) { - pr_info("%s Link %x<%s>:", str, l_ptr->addr, l_ptr->b_ptr->name); + struct tipc_bearer *b_ptr; + + rcu_read_lock(); + b_ptr = rcu_dereference_rtnl(bearer_list[l_ptr->bearer_id]); + if (b_ptr) + pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name); + rcu_read_unlock(); if (link_working_unknown(l_ptr)) pr_cont(":WU\n"); diff --git a/net/tipc/link.h b/net/tipc/link.h index 8c0b49b5b2e..200d518b218 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -40,11 +40,6 @@ #include "msg.h" #include "node.h" -/* Link reassembly status codes - */ -#define LINK_REASM_ERROR -1 -#define LINK_REASM_COMPLETE 1 - /* Out-of-range value for link sequence numbers */ #define INVALID_LINK_SEQ 0x10000 @@ -107,7 +102,7 @@ struct tipc_stats { * @checkpoint: reference point for triggering link continuity checking * @peer_session: link session # being used by peer end of link * @peer_bearer_id: bearer id used by link's peer endpoint - * @b_ptr: pointer to bearer used by link + * @bearer_id: local bearer id used by link * @tolerance: minimum link continuity loss needed to reset link [in ms] * @continuity_interval: link continuity testing interval [in ms] * @abort_limit: # of unacknowledged continuity probes needed to reset link @@ -116,6 +111,7 @@ struct tipc_stats { * @proto_msg: template for control messages generated by link * @pmsg: convenience pointer to "proto_msg" field * @priority: current link priority + * @net_plane: current link network plane ('A' through 'H') * @queue_limit: outbound message queue congestion thresholds (indexed by user) * @exp_msg_count: # of tunnelled messages expected during link changeover * @reset_checkpoint: seq # of last acknowledged message at time of link reset @@ -139,8 +135,7 @@ struct tipc_stats { * @next_out: ptr to first unsent outbound message in queue * @waiting_ports: linked list of ports waiting for link congestion to abate * @long_msg_seq_no: next identifier to use for outbound fragmented messages - * @reasm_head: list head of partially reassembled inbound message fragments - * @reasm_tail: last fragment received + * @reasm_buf: head of partially reassembled inbound message fragments * @stats: collects statistics regarding link activity */ struct tipc_link { @@ -155,7 +150,7 @@ struct tipc_link { u32 checkpoint; u32 peer_session; u32 peer_bearer_id; - struct tipc_bearer *b_ptr; + u32 bearer_id; u32 tolerance; u32 continuity_interval; u32 abort_limit; @@ -167,6 +162,7 @@ struct tipc_link { } proto_msg; struct tipc_msg *pmsg; u32 priority; + char net_plane; u32 queue_limit[15]; /* queue_limit[0]==window limit */ /* Changeover */ @@ -202,8 +198,7 @@ struct tipc_link { /* Fragmentation/reassembly */ u32 long_msg_seq_no; - struct sk_buff *reasm_head; - struct sk_buff *reasm_tail; + struct sk_buff *reasm_buf; /* Statistics */ struct tipc_stats stats; @@ -228,6 +223,7 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space); struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space); +void tipc_link_reset_all(struct tipc_node *node); void tipc_link_reset(struct tipc_link *l_ptr); void tipc_link_reset_list(unsigned int bearer_id); int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); @@ -239,9 +235,6 @@ int tipc_link_iovec_xmit_fast(struct tipc_port *sender, struct iovec const *msg_sect, unsigned int len, u32 destnode); void tipc_link_bundle_rcv(struct sk_buff *buf); -int tipc_link_frag_rcv(struct sk_buff **reasm_head, - struct sk_buff **reasm_tail, - struct sk_buff **fbuf); void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); void tipc_link_push_queue(struct tipc_link *l_ptr); diff --git a/net/tipc/msg.c b/net/tipc/msg.c index e525f8ce1de..8be6e94a1ca 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -1,7 +1,7 @@ /* * net/tipc/msg.c: TIPC message header routines * - * Copyright (c) 2000-2006, Ericsson AB + * Copyright (c) 2000-2006, 2014, Ericsson AB * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. * @@ -99,3 +99,56 @@ int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, } return dsz; } + +/* tipc_buf_append(): Append a buffer to the fragment list of another buffer + * Let first buffer become head buffer + * Returns 1 and sets *buf to headbuf if chain is complete, otherwise 0 + * Leaves headbuf pointer at NULL if failure + */ +int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) +{ + struct sk_buff *head = *headbuf; + struct sk_buff *frag = *buf; + struct sk_buff *tail; + struct tipc_msg *msg = buf_msg(frag); + u32 fragid = msg_type(msg); + bool headstolen; + int delta; + + skb_pull(frag, msg_hdr_sz(msg)); + + if (fragid == FIRST_FRAGMENT) { + if (head || skb_unclone(frag, GFP_ATOMIC)) + goto out_free; + head = *headbuf = frag; + skb_frag_list_init(head); + return 0; + } + if (!head) + goto out_free; + tail = TIPC_SKB_CB(head)->tail; + if (skb_try_coalesce(head, frag, &headstolen, &delta)) { + kfree_skb_partial(frag, headstolen); + } else { + if (!skb_has_frag_list(head)) + skb_shinfo(head)->frag_list = frag; + else + tail->next = frag; + head->truesize += frag->truesize; + head->data_len += frag->len; + head->len += frag->len; + TIPC_SKB_CB(head)->tail = frag; + } + if (fragid == LAST_FRAGMENT) { + *buf = head; + TIPC_SKB_CB(head)->tail = NULL; + *headbuf = NULL; + return 1; + } + *buf = NULL; + return 0; +out_free: + pr_warn_ratelimited("Unable to build fragment list\n"); + kfree_skb(*buf); + return 0; +} diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 76d1269b944..503511903d1 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -1,7 +1,7 @@ /* * net/tipc/msg.h: Include file for TIPC message header routines * - * Copyright (c) 2000-2007, Ericsson AB + * Copyright (c) 2000-2007, 2014, Ericsson AB * Copyright (c) 2005-2008, 2010-2011, Wind River Systems * All rights reserved. * @@ -711,4 +711,7 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, u32 destnode); int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, unsigned int len, int max_size, struct sk_buff **buf); + +int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); + #endif diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index aff8041dc15..8ce730984aa 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -38,34 +38,6 @@ #include "link.h" #include "name_distr.h" -#define ITEM_SIZE sizeof(struct distr_item) - -/** - * struct distr_item - publication info distributed to other nodes - * @type: name sequence type - * @lower: name sequence lower bound - * @upper: name sequence upper bound - * @ref: publishing port reference - * @key: publication key - * - * ===> All fields are stored in network byte order. <=== - * - * First 3 fields identify (name or) name sequence being published. - * Reference field uniquely identifies port that published name sequence. - * Key field uniquely identifies publication, in the event a port has - * multiple publications of the same name sequence. - * - * Note: There is no field that identifies the publishing node because it is - * the same for all items contained within a publication message. - */ -struct distr_item { - __be32 type; - __be32 lower; - __be32 upper; - __be32 ref; - __be32 key; -}; - /** * struct publ_list - list of publications made by this node * @list: circular list of publications @@ -127,7 +99,7 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest) return buf; } -static void named_cluster_distribute(struct sk_buff *buf) +void named_cluster_distribute(struct sk_buff *buf) { struct sk_buff *buf_copy; struct tipc_node *n_ptr; @@ -135,18 +107,18 @@ static void named_cluster_distribute(struct sk_buff *buf) rcu_read_lock(); list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { - spin_lock_bh(&n_ptr->lock); + tipc_node_lock(n_ptr); l_ptr = n_ptr->active_links[n_ptr->addr & 1]; if (l_ptr) { buf_copy = skb_copy(buf, GFP_ATOMIC); if (!buf_copy) { - spin_unlock_bh(&n_ptr->lock); + tipc_node_unlock(n_ptr); break; } msg_set_destnode(buf_msg(buf_copy), n_ptr->addr); __tipc_link_xmit(l_ptr, buf_copy); } - spin_unlock_bh(&n_ptr->lock); + tipc_node_unlock(n_ptr); } rcu_read_unlock(); @@ -156,7 +128,7 @@ static void named_cluster_distribute(struct sk_buff *buf) /** * tipc_named_publish - tell other nodes about a new publication by this node */ -void tipc_named_publish(struct publication *publ) +struct sk_buff *tipc_named_publish(struct publication *publ) { struct sk_buff *buf; struct distr_item *item; @@ -165,23 +137,23 @@ void tipc_named_publish(struct publication *publ) publ_lists[publ->scope]->size++; if (publ->scope == TIPC_NODE_SCOPE) - return; + return NULL; buf = named_prepare_buf(PUBLICATION, ITEM_SIZE, 0); if (!buf) { pr_warn("Publication distribution failure\n"); - return; + return NULL; } item = (struct distr_item *)msg_data(buf_msg(buf)); publ_to_item(item, publ); - named_cluster_distribute(buf); + return buf; } /** * tipc_named_withdraw - tell other nodes about a withdrawn publication by this node */ -void tipc_named_withdraw(struct publication *publ) +struct sk_buff *tipc_named_withdraw(struct publication *publ) { struct sk_buff *buf; struct distr_item *item; @@ -190,17 +162,17 @@ void tipc_named_withdraw(struct publication *publ) publ_lists[publ->scope]->size--; if (publ->scope == TIPC_NODE_SCOPE) - return; + return NULL; buf = named_prepare_buf(WITHDRAWAL, ITEM_SIZE, 0); if (!buf) { pr_warn("Withdrawal distribution failure\n"); - return; + return NULL; } item = (struct distr_item *)msg_data(buf_msg(buf)); publ_to_item(item, publ); - named_cluster_distribute(buf); + return buf; } /* @@ -239,31 +211,9 @@ static void named_distribute(struct list_head *message_list, u32 node, /** * tipc_named_node_up - tell specified node about all publications by this node */ -void tipc_named_node_up(unsigned long nodearg) +void tipc_named_node_up(u32 max_item_buf, u32 node) { - struct tipc_node *n_ptr; - struct tipc_link *l_ptr; - struct list_head message_list; - u32 node = (u32)nodearg; - u32 max_item_buf = 0; - - /* compute maximum amount of publication data to send per message */ - read_lock_bh(&tipc_net_lock); - n_ptr = tipc_node_find(node); - if (n_ptr) { - tipc_node_lock(n_ptr); - l_ptr = n_ptr->active_links[0]; - if (l_ptr) - max_item_buf = ((l_ptr->max_pkt - INT_H_SIZE) / - ITEM_SIZE) * ITEM_SIZE; - tipc_node_unlock(n_ptr); - } - read_unlock_bh(&tipc_net_lock); - if (!max_item_buf) - return; - - /* create list of publication messages, then send them as a unit */ - INIT_LIST_HEAD(&message_list); + LIST_HEAD(message_list); read_lock_bh(&tipc_nametbl_lock); named_distribute(&message_list, node, &publ_cluster, max_item_buf); diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h index 9b312ccfd43..b2eed4ec152 100644 --- a/net/tipc/name_distr.h +++ b/net/tipc/name_distr.h @@ -39,9 +39,38 @@ #include "name_table.h" -void tipc_named_publish(struct publication *publ); -void tipc_named_withdraw(struct publication *publ); -void tipc_named_node_up(unsigned long node); +#define ITEM_SIZE sizeof(struct distr_item) + +/** + * struct distr_item - publication info distributed to other nodes + * @type: name sequence type + * @lower: name sequence lower bound + * @upper: name sequence upper bound + * @ref: publishing port reference + * @key: publication key + * + * ===> All fields are stored in network byte order. <=== + * + * First 3 fields identify (name or) name sequence being published. + * Reference field uniquely identifies port that published name sequence. + * Key field uniquely identifies publication, in the event a port has + * multiple publications of the same name sequence. + * + * Note: There is no field that identifies the publishing node because it is + * the same for all items contained within a publication message. + */ +struct distr_item { + __be32 type; + __be32 lower; + __be32 upper; + __be32 ref; + __be32 key; +}; + +struct sk_buff *tipc_named_publish(struct publication *publ); +struct sk_buff *tipc_named_withdraw(struct publication *publ); +void named_cluster_distribute(struct sk_buff *buf); +void tipc_named_node_up(u32 max_item_buf, u32 node); void tipc_named_rcv(struct sk_buff *buf); void tipc_named_reinit(void); diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 042e8e3cabc..9d7d37d9518 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -664,6 +664,7 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper, u32 scope, u32 port_ref, u32 key) { struct publication *publ; + struct sk_buff *buf = NULL; if (table.local_publ_count >= TIPC_MAX_PUBLICATIONS) { pr_warn("Publication failed, local publication limit reached (%u)\n", @@ -676,9 +677,12 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper, tipc_own_addr, port_ref, key); if (likely(publ)) { table.local_publ_count++; - tipc_named_publish(publ); + buf = tipc_named_publish(publ); } write_unlock_bh(&tipc_nametbl_lock); + + if (buf) + named_cluster_distribute(buf); return publ; } @@ -688,15 +692,19 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper, int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key) { struct publication *publ; + struct sk_buff *buf; write_lock_bh(&tipc_nametbl_lock); publ = tipc_nametbl_remove_publ(type, lower, tipc_own_addr, ref, key); if (likely(publ)) { table.local_publ_count--; - tipc_named_withdraw(publ); + buf = tipc_named_withdraw(publ); write_unlock_bh(&tipc_nametbl_lock); list_del_init(&publ->pport_list); kfree(publ); + + if (buf) + named_cluster_distribute(buf); return 1; } write_unlock_bh(&tipc_nametbl_lock); @@ -961,6 +969,7 @@ static void tipc_purge_publications(struct name_seq *seq) list_for_each_entry_safe(publ, safe, &info->zone_list, zone_list) { tipc_nametbl_remove_publ(publ->type, publ->lower, publ->node, publ->ref, publ->key); + kfree(publ); } } @@ -982,7 +991,6 @@ void tipc_nametbl_stop(void) hlist_for_each_entry_safe(seq, safe, seq_head, ns_list) { tipc_purge_publications(seq); } - continue; } kfree(table.types); table.types = NULL; diff --git a/net/tipc/net.c b/net/tipc/net.c index 4c564eb69e1..f64375e7f99 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -39,45 +39,41 @@ #include "name_distr.h" #include "subscr.h" #include "port.h" +#include "socket.h" #include "node.h" #include "config.h" /* * The TIPC locking policy is designed to ensure a very fine locking * granularity, permitting complete parallel access to individual - * port and node/link instances. The code consists of three major + * port and node/link instances. The code consists of four major * locking domains, each protected with their own disjunct set of locks. * - * 1: The routing hierarchy. - * Comprises the structures 'zone', 'cluster', 'node', 'link' - * and 'bearer'. The whole hierarchy is protected by a big - * read/write lock, tipc_net_lock, to enssure that nothing is added - * or removed while code is accessing any of these structures. - * This layer must not be called from the two others while they - * hold any of their own locks. - * Neither must it itself do any upcalls to the other two before - * it has released tipc_net_lock and other protective locks. + * 1: The bearer level. + * RTNL lock is used to serialize the process of configuring bearer + * on update side, and RCU lock is applied on read side to make + * bearer instance valid on both paths of message transmission and + * reception. * - * Within the tipc_net_lock domain there are two sub-domains;'node' and - * 'bearer', where local write operations are permitted, - * provided that those are protected by individual spin_locks - * per instance. Code holding tipc_net_lock(read) and a node spin_lock - * is permitted to poke around in both the node itself and its - * subordinate links. I.e, it can update link counters and queues, - * change link state, send protocol messages, and alter the - * "active_links" array in the node; but it can _not_ remove a link - * or a node from the overall structure. - * Correspondingly, individual bearers may change status within a - * tipc_net_lock(read), protected by an individual spin_lock ber bearer - * instance, but it needs tipc_net_lock(write) to remove/add any bearers. + * 2: The node and link level. + * All node instances are saved into two tipc_node_list and node_htable + * lists. The two lists are protected by node_list_lock on write side, + * and they are guarded with RCU lock on read side. Especially node + * instance is destroyed only when TIPC module is removed, and we can + * confirm that there has no any user who is accessing the node at the + * moment. Therefore, Except for iterating the two lists within RCU + * protection, it's no needed to hold RCU that we access node instance + * in other places. * + * In addition, all members in node structure including link instances + * are protected by node spin lock. * - * 2: The transport level of the protocol. - * This consists of the structures port, (and its user level - * representations, such as user_port and tipc_sock), reference and - * tipc_user (port.c, reg.c, socket.c). + * 3: The transport level of the protocol. + * This consists of the structures port, (and its user level + * representations, such as user_port and tipc_sock), reference and + * tipc_user (port.c, reg.c, socket.c). * - * This layer has four different locks: + * This layer has four different locks: * - The tipc_port spin_lock. This is protecting each port instance * from parallel data access and removal. Since we can not place * this lock in the port itself, it has been placed in the @@ -96,7 +92,7 @@ * There are two such lists; 'port_list', which is used for management, * and 'wait_list', which is used to queue ports during congestion. * - * 3: The name table (name_table.c, name_distr.c, subscription.c) + * 4: The name table (name_table.c, name_distr.c, subscription.c) * - There is one big read/write-lock (tipc_nametbl_lock) protecting the * overall name table structure. Nothing must be added/removed to * this structure without holding write access to it. @@ -108,8 +104,6 @@ * - A local spin_lock protecting the queue of subscriber events. */ -DEFINE_RWLOCK(tipc_net_lock); - static void net_route_named_msg(struct sk_buff *buf) { struct tipc_msg *msg = buf_msg(buf); @@ -148,7 +142,7 @@ void tipc_net_route_msg(struct sk_buff *buf) if (msg_mcast(msg)) tipc_port_mcast_rcv(buf, NULL); else if (msg_destport(msg)) - tipc_port_rcv(buf); + tipc_sk_rcv(buf); else net_route_named_msg(buf); return; @@ -171,22 +165,25 @@ void tipc_net_route_msg(struct sk_buff *buf) tipc_link_xmit(buf, dnode, msg_link_selector(msg)); } -void tipc_net_start(u32 addr) +int tipc_net_start(u32 addr) { char addr_string[16]; + int res; - write_lock_bh(&tipc_net_lock); tipc_own_addr = addr; tipc_named_reinit(); tipc_port_reinit(); - tipc_bclink_init(); - write_unlock_bh(&tipc_net_lock); + res = tipc_bclink_init(); + if (res) + return res; tipc_nametbl_publish(TIPC_CFG_SRV, tipc_own_addr, tipc_own_addr, TIPC_ZONE_SCOPE, 0, tipc_own_addr); + pr_info("Started in network mode\n"); pr_info("Own node address %s, network identity %u\n", tipc_addr_string_fill(addr_string, tipc_own_addr), tipc_net_id); + return 0; } void tipc_net_stop(void) @@ -195,11 +192,11 @@ void tipc_net_stop(void) return; tipc_nametbl_withdraw(TIPC_CFG_SRV, tipc_own_addr, 0, tipc_own_addr); - write_lock_bh(&tipc_net_lock); + rtnl_lock(); tipc_bearer_stop(); tipc_bclink_stop(); tipc_node_stop(); - write_unlock_bh(&tipc_net_lock); + rtnl_unlock(); pr_info("Left network mode\n"); } diff --git a/net/tipc/net.h b/net/tipc/net.h index 079daadb3f7..c6c2b46f7c2 100644 --- a/net/tipc/net.h +++ b/net/tipc/net.h @@ -37,11 +37,9 @@ #ifndef _TIPC_NET_H #define _TIPC_NET_H -extern rwlock_t tipc_net_lock; - void tipc_net_route_msg(struct sk_buff *buf); -void tipc_net_start(u32 addr); +int tipc_net_start(u32 addr); void tipc_net_stop(void); #endif diff --git a/net/tipc/node.c b/net/tipc/node.c index 1d3a4999a70..5b44c3041be 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -108,7 +108,7 @@ struct tipc_node *tipc_node_create(u32 addr) break; } list_add_tail_rcu(&n_ptr->list, &temp_node->list); - n_ptr->block_setup = WAIT_PEER_DOWN; + n_ptr->action_flags = TIPC_WAIT_PEER_LINKS_DOWN; n_ptr->signature = INVALID_NODE_SIG; tipc_num_nodes++; @@ -144,11 +144,13 @@ void tipc_node_stop(void) void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr) { struct tipc_link **active = &n_ptr->active_links[0]; + u32 addr = n_ptr->addr; n_ptr->working_links++; - + tipc_nametbl_publish(TIPC_LINK_STATE, addr, addr, TIPC_NODE_SCOPE, + l_ptr->bearer_id, addr); pr_info("Established link <%s> on network plane %c\n", - l_ptr->name, l_ptr->b_ptr->net_plane); + l_ptr->name, l_ptr->net_plane); if (!active[0]) { active[0] = active[1] = l_ptr; @@ -203,16 +205,18 @@ static void node_select_active_links(struct tipc_node *n_ptr) void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr) { struct tipc_link **active; + u32 addr = n_ptr->addr; n_ptr->working_links--; + tipc_nametbl_withdraw(TIPC_LINK_STATE, addr, l_ptr->bearer_id, addr); if (!tipc_link_is_active(l_ptr)) { pr_info("Lost standby link <%s> on network plane %c\n", - l_ptr->name, l_ptr->b_ptr->net_plane); + l_ptr->name, l_ptr->net_plane); return; } pr_info("Lost link <%s> on network plane %c\n", - l_ptr->name, l_ptr->b_ptr->net_plane); + l_ptr->name, l_ptr->net_plane); active = &n_ptr->active_links[0]; if (active[0] == l_ptr) @@ -239,7 +243,7 @@ int tipc_node_is_up(struct tipc_node *n_ptr) void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) { - n_ptr->links[l_ptr->b_ptr->identity] = l_ptr; + n_ptr->links[l_ptr->bearer_id] = l_ptr; spin_lock_bh(&node_list_lock); tipc_num_links++; spin_unlock_bh(&node_list_lock); @@ -263,26 +267,12 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) static void node_established_contact(struct tipc_node *n_ptr) { - tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr); + n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP; n_ptr->bclink.oos_state = 0; n_ptr->bclink.acked = tipc_bclink_get_last_sent(); tipc_bclink_add_node(n_ptr->addr); } -static void node_name_purge_complete(unsigned long node_addr) -{ - struct tipc_node *n_ptr; - - read_lock_bh(&tipc_net_lock); - n_ptr = tipc_node_find(node_addr); - if (n_ptr) { - tipc_node_lock(n_ptr); - n_ptr->block_setup &= ~WAIT_NAMES_GONE; - tipc_node_unlock(n_ptr); - } - read_unlock_bh(&tipc_net_lock); -} - static void node_lost_contact(struct tipc_node *n_ptr) { char addr_string[16]; @@ -296,10 +286,9 @@ static void node_lost_contact(struct tipc_node *n_ptr) kfree_skb_list(n_ptr->bclink.deferred_head); n_ptr->bclink.deferred_size = 0; - if (n_ptr->bclink.reasm_head) { - kfree_skb(n_ptr->bclink.reasm_head); - n_ptr->bclink.reasm_head = NULL; - n_ptr->bclink.reasm_tail = NULL; + if (n_ptr->bclink.reasm_buf) { + kfree_skb(n_ptr->bclink.reasm_buf); + n_ptr->bclink.reasm_buf = NULL; } tipc_bclink_remove_node(n_ptr->addr); @@ -318,12 +307,13 @@ static void node_lost_contact(struct tipc_node *n_ptr) tipc_link_reset_fragments(l_ptr); } - /* Notify subscribers */ - tipc_nodesub_notify(n_ptr); + n_ptr->action_flags &= ~TIPC_WAIT_OWN_LINKS_DOWN; - /* Prevent re-contact with node until cleanup is done */ - n_ptr->block_setup = WAIT_PEER_DOWN | WAIT_NAMES_GONE; - tipc_k_signal((Handler)node_name_purge_complete, n_ptr->addr); + /* Notify subscribers and prevent re-contact with node until + * cleanup is done. + */ + n_ptr->action_flags |= TIPC_WAIT_PEER_LINKS_DOWN | + TIPC_NOTIFY_NODE_DOWN; } struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space) @@ -436,3 +426,63 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space) rcu_read_unlock(); return buf; } + +/** + * tipc_node_get_linkname - get the name of a link + * + * @bearer_id: id of the bearer + * @node: peer node address + * @linkname: link name output buffer + * + * Returns 0 on success + */ +int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len) +{ + struct tipc_link *link; + struct tipc_node *node = tipc_node_find(addr); + + if ((bearer_id >= MAX_BEARERS) || !node) + return -EINVAL; + tipc_node_lock(node); + link = node->links[bearer_id]; + if (link) { + strncpy(linkname, link->name, len); + tipc_node_unlock(node); + return 0; + } + tipc_node_unlock(node); + return -EINVAL; +} + +void tipc_node_unlock(struct tipc_node *node) +{ + LIST_HEAD(nsub_list); + struct tipc_link *link; + int pkt_sz = 0; + u32 addr = 0; + + if (likely(!node->action_flags)) { + spin_unlock_bh(&node->lock); + return; + } + + if (node->action_flags & TIPC_NOTIFY_NODE_DOWN) { + list_replace_init(&node->nsub, &nsub_list); + node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN; + } + if (node->action_flags & TIPC_NOTIFY_NODE_UP) { + link = node->active_links[0]; + node->action_flags &= ~TIPC_NOTIFY_NODE_UP; + if (link) { + pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) * + ITEM_SIZE; + addr = node->addr; + } + } + spin_unlock_bh(&node->lock); + + if (!list_empty(&nsub_list)) + tipc_nodesub_notify(&nsub_list); + if (pkt_sz) + tipc_named_node_up(pkt_sz, addr); +} diff --git a/net/tipc/node.h b/net/tipc/node.h index 7cbb8cec1a9..9087063793f 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -47,62 +47,73 @@ */ #define INVALID_NODE_SIG 0x10000 -/* Flags used to block (re)establishment of contact with a neighboring node */ -#define WAIT_PEER_DOWN 0x0001 /* wait to see that peer's links are down */ -#define WAIT_NAMES_GONE 0x0002 /* wait for peer's publications to be purged */ -#define WAIT_NODE_DOWN 0x0004 /* wait until peer node is declared down */ +/* Flags used to take different actions according to flag type + * TIPC_WAIT_PEER_LINKS_DOWN: wait to see that peer's links are down + * TIPC_WAIT_OWN_LINKS_DOWN: wait until peer node is declared down + * TIPC_NOTIFY_NODE_DOWN: notify node is down + * TIPC_NOTIFY_NODE_UP: notify node is up + */ +enum { + TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1), + TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2), + TIPC_NOTIFY_NODE_DOWN = (1 << 3), + TIPC_NOTIFY_NODE_UP = (1 << 4) +}; + +/** + * struct tipc_node_bclink - TIPC node bclink structure + * @acked: sequence # of last outbound b'cast message acknowledged by node + * @last_in: sequence # of last in-sequence b'cast message received from node + * @last_sent: sequence # of last b'cast message sent by node + * @oos_state: state tracker for handling OOS b'cast messages + * @deferred_size: number of OOS b'cast messages in deferred queue + * @deferred_head: oldest OOS b'cast message received from node + * @deferred_tail: newest OOS b'cast message received from node + * @reasm_buf: broadcast reassembly queue head from node + * @recv_permitted: true if node is allowed to receive b'cast messages + */ +struct tipc_node_bclink { + u32 acked; + u32 last_in; + u32 last_sent; + u32 oos_state; + u32 deferred_size; + struct sk_buff *deferred_head; + struct sk_buff *deferred_tail; + struct sk_buff *reasm_buf; + bool recv_permitted; +}; /** * struct tipc_node - TIPC node structure * @addr: network address of node * @lock: spinlock governing access to structure * @hash: links to adjacent nodes in unsorted hash chain - * @list: links to adjacent nodes in sorted list of cluster's nodes - * @nsub: list of "node down" subscriptions monitoring node * @active_links: pointers to active links to node * @links: pointers to all links to node + * @action_flags: bit mask of different types of node actions + * @bclink: broadcast-related info + * @list: links to adjacent nodes in sorted list of cluster's nodes * @working_links: number of working links to node (both active and standby) - * @block_setup: bit mask of conditions preventing link establishment to node * @link_cnt: number of links to node * @signature: node instance identifier - * @bclink: broadcast-related info + * @nsub: list of "node down" subscriptions monitoring node * @rcu: rcu struct for tipc_node - * @acked: sequence # of last outbound b'cast message acknowledged by node - * @last_in: sequence # of last in-sequence b'cast message received from node - * @last_sent: sequence # of last b'cast message sent by node - * @oos_state: state tracker for handling OOS b'cast messages - * @deferred_size: number of OOS b'cast messages in deferred queue - * @deferred_head: oldest OOS b'cast message received from node - * @deferred_tail: newest OOS b'cast message received from node - * @reasm_head: broadcast reassembly queue head from node - * @reasm_tail: last broadcast fragment received from node - * @recv_permitted: true if node is allowed to receive b'cast messages */ struct tipc_node { u32 addr; spinlock_t lock; struct hlist_node hash; - struct list_head list; - struct list_head nsub; struct tipc_link *active_links[2]; struct tipc_link *links[MAX_BEARERS]; + unsigned int action_flags; + struct tipc_node_bclink bclink; + struct list_head list; int link_cnt; int working_links; - int block_setup; u32 signature; + struct list_head nsub; struct rcu_head rcu; - struct { - u32 acked; - u32 last_in; - u32 last_sent; - u32 oos_state; - u32 deferred_size; - struct sk_buff *deferred_head; - struct sk_buff *deferred_tail; - struct sk_buff *reasm_head; - struct sk_buff *reasm_tail; - bool recv_permitted; - } bclink; }; extern struct list_head tipc_node_list; @@ -118,15 +129,18 @@ int tipc_node_active_links(struct tipc_node *n_ptr); int tipc_node_is_up(struct tipc_node *n_ptr); struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space); struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space); +int tipc_node_get_linkname(u32 bearer_id, u32 node, char *linkname, size_t len); +void tipc_node_unlock(struct tipc_node *node); -static inline void tipc_node_lock(struct tipc_node *n_ptr) +static inline void tipc_node_lock(struct tipc_node *node) { - spin_lock_bh(&n_ptr->lock); + spin_lock_bh(&node->lock); } -static inline void tipc_node_unlock(struct tipc_node *n_ptr) +static inline bool tipc_node_blocked(struct tipc_node *node) { - spin_unlock_bh(&n_ptr->lock); + return (node->action_flags & (TIPC_WAIT_PEER_LINKS_DOWN | + TIPC_NOTIFY_NODE_DOWN | TIPC_WAIT_OWN_LINKS_DOWN)); } #endif diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c index 8a7384c04ad..7c59ab1d6ec 100644 --- a/net/tipc/node_subscr.c +++ b/net/tipc/node_subscr.c @@ -81,14 +81,13 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub) * * Note: node is locked by caller */ -void tipc_nodesub_notify(struct tipc_node *node) +void tipc_nodesub_notify(struct list_head *nsub_list) { - struct tipc_node_subscr *ns; + struct tipc_node_subscr *ns, *safe; - list_for_each_entry(ns, &node->nsub, nodesub_list) { + list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) { if (ns->handle_node_down) { - tipc_k_signal((Handler)ns->handle_node_down, - (unsigned long)ns->usr_handle); + ns->handle_node_down(ns->usr_handle); ns->handle_node_down = NULL; } } diff --git a/net/tipc/node_subscr.h b/net/tipc/node_subscr.h index c95d20727de..d91b8cc81e3 100644 --- a/net/tipc/node_subscr.h +++ b/net/tipc/node_subscr.h @@ -58,6 +58,6 @@ struct tipc_node_subscr { void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr, void *usr_handle, net_ev_handler handle_down); void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub); -void tipc_nodesub_notify(struct tipc_node *node); +void tipc_nodesub_notify(struct list_head *nsub_list); #endif diff --git a/net/tipc/port.c b/net/tipc/port.c index 5c14c7801ee..5fd7acce01e 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -165,7 +165,7 @@ void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp) msg_set_destnode(msg, tipc_own_addr); if (dp->count == 1) { msg_set_destport(msg, dp->ports[0]); - tipc_port_rcv(buf); + tipc_sk_rcv(buf); tipc_port_list_free(dp); return; } @@ -180,7 +180,7 @@ void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp) if ((index == 0) && (cnt != 0)) item = item->next; msg_set_destport(buf_msg(b), item->ports[index]); - tipc_port_rcv(b); + tipc_sk_rcv(b); } } exit: @@ -343,7 +343,7 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err) /* send returned message & dispose of rejected message */ src_node = msg_prevnode(msg); if (in_own_node(src_node)) - tipc_port_rcv(rbuf); + tipc_sk_rcv(rbuf); else tipc_link_xmit(rbuf, src_node, msg_link_selector(rmsg)); exit: @@ -754,37 +754,6 @@ int tipc_port_shutdown(u32 ref) return tipc_port_disconnect(ref); } -/** - * tipc_port_rcv - receive message from lower layer and deliver to port user - */ -int tipc_port_rcv(struct sk_buff *buf) -{ - struct tipc_port *p_ptr; - struct tipc_msg *msg = buf_msg(buf); - u32 destport = msg_destport(msg); - u32 dsz = msg_data_sz(msg); - u32 err; - - /* forward unresolved named message */ - if (unlikely(!destport)) { - tipc_net_route_msg(buf); - return dsz; - } - - /* validate destination & pass to port, otherwise reject message */ - p_ptr = tipc_port_lock(destport); - if (likely(p_ptr)) { - err = tipc_sk_rcv(&tipc_port_to_sock(p_ptr)->sk, buf); - tipc_port_unlock(p_ptr); - if (likely(!err)) - return dsz; - } else { - err = TIPC_ERR_NO_PORT; - } - - return tipc_reject_msg(buf, err); -} - /* * tipc_port_iovec_rcv: Concatenate and deliver sectioned * message for this node. @@ -798,7 +767,7 @@ static int tipc_port_iovec_rcv(struct tipc_port *sender, res = tipc_msg_build(&sender->phdr, msg_sect, len, MAX_MSG_SIZE, &buf); if (likely(buf)) - tipc_port_rcv(buf); + tipc_sk_rcv(buf); return res; } diff --git a/net/tipc/port.h b/net/tipc/port.h index a00397393bd..cf4ca5b1d9a 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -42,9 +42,10 @@ #include "msg.h" #include "node_subscr.h" -#define TIPC_FLOW_CONTROL_WIN 512 -#define CONN_OVERLOAD_LIMIT ((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \ - SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) +#define TIPC_CONNACK_INTV 256 +#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) +#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \ + SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) /** * struct tipc_port - TIPC port structure @@ -134,7 +135,6 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg); /* * TIPC messaging routines */ -int tipc_port_rcv(struct sk_buff *buf); int tipc_send(struct tipc_port *port, struct iovec const *msg_sect, @@ -187,7 +187,7 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr) static inline int tipc_port_congested(struct tipc_port *p_ptr) { - return (p_ptr->sent - p_ptr->acked) >= (TIPC_FLOW_CONTROL_WIN * 2); + return ((p_ptr->sent - p_ptr->acked) >= TIPC_FLOWCTRL_WIN); } diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 3c0256962f7..ef0475568f9 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -36,6 +36,7 @@ #include "core.h" #include "port.h" +#include "node.h" #include <linux/export.h> @@ -44,7 +45,7 @@ #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ -static int backlog_rcv(struct sock *sk, struct sk_buff *skb); +static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); static void tipc_data_ready(struct sock *sk); static void tipc_write_space(struct sock *sk); static int tipc_release(struct socket *sock); @@ -195,11 +196,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock, sock->state = state; sock_init_data(sock, sk); - sk->sk_backlog_rcv = backlog_rcv; + sk->sk_backlog_rcv = tipc_backlog_rcv; sk->sk_rcvbuf = sysctl_tipc_rmem[1]; sk->sk_data_ready = tipc_data_ready; sk->sk_write_space = tipc_write_space; - tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT; + tsk->conn_timeout = CONN_TIMEOUT_DEFAULT; + atomic_set(&tsk->dupl_rcvcnt, 0); tipc_port_unlock(port); if (sock->state == SS_READY) { @@ -983,10 +985,11 @@ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg, return 0; } -static int tipc_wait_for_rcvmsg(struct socket *sock, long timeo) +static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) { struct sock *sk = sock->sk; DEFINE_WAIT(wait); + long timeo = *timeop; int err; for (;;) { @@ -1011,6 +1014,7 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long timeo) break; } finish_wait(sk_sleep(sk), &wait); + *timeop = timeo; return err; } @@ -1054,7 +1058,7 @@ static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock, restart: /* Look for a message in receive queue; wait if necessary */ - res = tipc_wait_for_rcvmsg(sock, timeo); + res = tipc_wait_for_rcvmsg(sock, &timeo); if (res) goto exit; @@ -1100,7 +1104,7 @@ restart: /* Consume received message (optional) */ if (likely(!(flags & MSG_PEEK))) { if ((sock->state != SS_READY) && - (++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) + (++port->conn_unacked >= TIPC_CONNACK_INTV)) tipc_acknowledge(port->ref, port->conn_unacked); advance_rx_queue(sk); } @@ -1152,7 +1156,7 @@ static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock, restart: /* Look for a message in receive queue; wait if necessary */ - res = tipc_wait_for_rcvmsg(sock, timeo); + res = tipc_wait_for_rcvmsg(sock, &timeo); if (res) goto exit; @@ -1209,7 +1213,7 @@ restart: /* Consume received message (optional) */ if (likely(!(flags & MSG_PEEK))) { - if (unlikely(++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) + if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV)) tipc_acknowledge(port->ref, port->conn_unacked); advance_rx_queue(sk); } @@ -1415,7 +1419,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) } /** - * backlog_rcv - handle incoming message from backlog queue + * tipc_backlog_rcv - handle incoming message from backlog queue * @sk: socket * @buf: message * @@ -1423,47 +1427,74 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) * * Returns 0 */ -static int backlog_rcv(struct sock *sk, struct sk_buff *buf) +static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) { u32 res; + struct tipc_sock *tsk = tipc_sk(sk); + uint truesize = buf->truesize; res = filter_rcv(sk, buf); - if (res) + if (unlikely(res)) tipc_reject_msg(buf, res); + + if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) + atomic_add(truesize, &tsk->dupl_rcvcnt); + return 0; } /** * tipc_sk_rcv - handle incoming message - * @sk: socket receiving message - * @buf: message - * - * Called with port lock already taken. - * - * Returns TIPC error status code (TIPC_OK if message is not to be rejected) + * @buf: buffer containing arriving message + * Consumes buffer + * Returns 0 if success, or errno: -EHOSTUNREACH */ -u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf) +int tipc_sk_rcv(struct sk_buff *buf) { - u32 res; + struct tipc_sock *tsk; + struct tipc_port *port; + struct sock *sk; + u32 dport = msg_destport(buf_msg(buf)); + int err = TIPC_OK; + uint limit; - /* - * Process message if socket is unlocked; otherwise add to backlog queue - * - * This code is based on sk_receive_skb(), but must be distinct from it - * since a TIPC-specific filter/reject mechanism is utilized - */ + /* Forward unresolved named message */ + if (unlikely(!dport)) { + tipc_net_route_msg(buf); + return 0; + } + + /* Validate destination */ + port = tipc_port_lock(dport); + if (unlikely(!port)) { + err = TIPC_ERR_NO_PORT; + goto exit; + } + + tsk = tipc_port_to_sock(port); + sk = &tsk->sk; + + /* Queue message */ bh_lock_sock(sk); + if (!sock_owned_by_user(sk)) { - res = filter_rcv(sk, buf); + err = filter_rcv(sk, buf); } else { - if (sk_add_backlog(sk, buf, rcvbuf_limit(sk, buf))) - res = TIPC_ERR_OVERLOAD; - else - res = TIPC_OK; + if (sk->sk_backlog.len == 0) + atomic_set(&tsk->dupl_rcvcnt, 0); + limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt); + if (sk_add_backlog(sk, buf, limit)) + err = TIPC_ERR_OVERLOAD; } + bh_unlock_sock(sk); + tipc_port_unlock(port); - return res; + if (likely(!err)) + return 0; +exit: + tipc_reject_msg(buf, err); + return -EHOSTUNREACH; } static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) @@ -1905,6 +1936,28 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, return put_user(sizeof(value), ol); } +int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg) +{ + struct tipc_sioc_ln_req lnr; + void __user *argp = (void __user *)arg; + + switch (cmd) { + case SIOCGETLINKNAME: + if (copy_from_user(&lnr, argp, sizeof(lnr))) + return -EFAULT; + if (!tipc_node_get_linkname(lnr.bearer_id, lnr.peer, + lnr.linkname, TIPC_MAX_LINK_NAME)) { + if (copy_to_user(argp, &lnr, sizeof(lnr))) + return -EFAULT; + return 0; + } + return -EADDRNOTAVAIL; + break; + default: + return -ENOIOCTLCMD; + } +} + /* Protocol switches for the various types of TIPC sockets */ static const struct proto_ops msg_ops = { @@ -1917,7 +1970,7 @@ static const struct proto_ops msg_ops = { .accept = sock_no_accept, .getname = tipc_getname, .poll = tipc_poll, - .ioctl = sock_no_ioctl, + .ioctl = tipc_ioctl, .listen = sock_no_listen, .shutdown = tipc_shutdown, .setsockopt = tipc_setsockopt, @@ -1938,7 +1991,7 @@ static const struct proto_ops packet_ops = { .accept = tipc_accept, .getname = tipc_getname, .poll = tipc_poll, - .ioctl = sock_no_ioctl, + .ioctl = tipc_ioctl, .listen = tipc_listen, .shutdown = tipc_shutdown, .setsockopt = tipc_setsockopt, @@ -1959,7 +2012,7 @@ static const struct proto_ops stream_ops = { .accept = tipc_accept, .getname = tipc_getname, .poll = tipc_poll, - .ioctl = sock_no_ioctl, + .ioctl = tipc_ioctl, .listen = tipc_listen, .shutdown = tipc_shutdown, .setsockopt = tipc_setsockopt, diff --git a/net/tipc/socket.h b/net/tipc/socket.h index 74e5c7f195a..3afcd2a70b3 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -44,12 +44,14 @@ * @port: port - interacts with 'sk' and with the rest of the TIPC stack * @peer_name: the peer of the connection, if any * @conn_timeout: the time we can wait for an unresponded setup request + * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue */ struct tipc_sock { struct sock sk; struct tipc_port port; unsigned int conn_timeout; + atomic_t dupl_rcvcnt; }; static inline struct tipc_sock *tipc_sk(const struct sock *sk) @@ -67,6 +69,6 @@ static inline void tipc_sock_wakeup(struct tipc_sock *tsk) tsk->sk.sk_write_space(&tsk->sk); } -u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf); +int tipc_sk_rcv(struct sk_buff *buf); #endif |