From 3e4b6ab58d614934e7ca99bdf448089695d34ffa Mon Sep 17 00:00:00 2001 From: Richard Alpe Date: Thu, 20 Nov 2014 10:29:17 +0100 Subject: tipc: add node get/dump to new netlink api Add TIPC_NL_NODE_GET to the new tipc netlink API. This command can dump the address and node status of all nodes in the tipc cluster. Netlink logical layout of returned node/address data: -> node -> address -> up flag Signed-off-by: Richard Alpe Reviewed-by: Erik Hugne Reviewed-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/node.c | 96 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) (limited to 'net/tipc/node.c') diff --git a/net/tipc/node.c b/net/tipc/node.c index 5781634e957..72a75d4838b 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -58,6 +58,12 @@ struct tipc_sock_conn { struct list_head list; }; +static const struct nla_policy tipc_nl_node_policy[TIPC_NLA_NODE_MAX + 1] = { + [TIPC_NLA_NODE_UNSPEC] = { .type = NLA_UNSPEC }, + [TIPC_NLA_NODE_ADDR] = { .type = NLA_U32 }, + [TIPC_NLA_NODE_UP] = { .type = NLA_FLAG } +}; + /* * A trivial power-of-two bitmask technique is used for speed, since this * operation is done for every incoming TIPC packet. The number of hash table @@ -601,3 +607,93 @@ void tipc_node_unlock(struct tipc_node *node) tipc_nametbl_withdraw(TIPC_LINK_STATE, addr, link_id, addr); } + +/* Caller should hold node lock for the passed node */ +int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node) +{ + void *hdr; + struct nlattr *attrs; + + hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family, + NLM_F_MULTI, TIPC_NL_NODE_GET); + if (!hdr) + return -EMSGSIZE; + + attrs = nla_nest_start(msg->skb, TIPC_NLA_NODE); + if (!attrs) + goto msg_full; + + if (nla_put_u32(msg->skb, TIPC_NLA_NODE_ADDR, node->addr)) + goto attr_msg_full; + if (tipc_node_is_up(node)) + if (nla_put_flag(msg->skb, TIPC_NLA_NODE_UP)) + goto attr_msg_full; + + nla_nest_end(msg->skb, attrs); + genlmsg_end(msg->skb, hdr); + + return 0; + +attr_msg_full: + nla_nest_cancel(msg->skb, attrs); +msg_full: + genlmsg_cancel(msg->skb, hdr); + + return -EMSGSIZE; +} + +int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb) +{ + int err; + int done = cb->args[0]; + int last_addr = cb->args[1]; + struct tipc_node *node; + struct tipc_nl_msg msg; + + if (done) + return 0; + + msg.skb = skb; + msg.portid = NETLINK_CB(cb->skb).portid; + msg.seq = cb->nlh->nlmsg_seq; + + rcu_read_lock(); + + if (last_addr && !tipc_node_find(last_addr)) { + rcu_read_unlock(); + /* We never set seq or call nl_dump_check_consistent() this + * means that setting prev_seq here will cause the consistence + * check to fail in the netlink callback handler. Resulting in + * the NLMSG_DONE message having the NLM_F_DUMP_INTR flag set if + * the node state changed while we released the lock. + */ + cb->prev_seq = 1; + return -EPIPE; + } + + list_for_each_entry_rcu(node, &tipc_node_list, list) { + if (last_addr) { + if (node->addr == last_addr) + last_addr = 0; + else + continue; + } + + tipc_node_lock(node); + err = __tipc_nl_add_node(&msg, node); + if (err) { + last_addr = node->addr; + tipc_node_unlock(node); + goto out; + } + + tipc_node_unlock(node); + } + done = 1; +out: + cb->args[0] = done; + cb->args[1] = last_addr; + rcu_read_unlock(); + + return skb->len; +} -- cgit v1.2.3-70-g09d2 From d8182804cfd6503e73dc1c0a409903412a389541 Mon Sep 17 00:00:00 2001 From: Richard Alpe Date: Mon, 24 Nov 2014 11:10:29 +0100 Subject: tipc: fix sparse warnings in new nl api Fix sparse warnings about non-static declaration of static functions in the new tipc netlink API. Signed-off-by: Richard Alpe Signed-off-by: David S. Miller --- net/tipc/bcast.c | 3 ++- net/tipc/bearer.c | 6 ++++-- net/tipc/link.c | 10 ++++++---- net/tipc/name_table.c | 13 +++++++------ net/tipc/node.c | 2 +- net/tipc/socket.c | 16 +++++++++------- 6 files changed, 29 insertions(+), 21 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index dcf3589e3cc..556b26ad4b1 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -767,7 +767,8 @@ void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action) tipc_bclink_unlock(); } -int __tipc_nl_add_bc_link_stat(struct sk_buff *skb, struct tipc_stats *stats) +static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb, + struct tipc_stats *stats) { int i; struct nlattr *nest; diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 5f6f32369c1..463db5b15b8 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -647,7 +647,8 @@ void tipc_bearer_stop(void) } /* Caller should hold rtnl_lock to protect the bearer */ -int __tipc_nl_add_bearer(struct tipc_nl_msg *msg, struct tipc_bearer *bearer) +static int __tipc_nl_add_bearer(struct tipc_nl_msg *msg, + struct tipc_bearer *bearer) { void *hdr; struct nlattr *attrs; @@ -905,7 +906,8 @@ int tipc_nl_bearer_set(struct sk_buff *skb, struct genl_info *info) return 0; } -int __tipc_nl_add_media(struct tipc_nl_msg *msg, struct tipc_media *media) +static int __tipc_nl_add_media(struct tipc_nl_msg *msg, + struct tipc_media *media) { void *hdr; struct nlattr *attrs; diff --git a/net/tipc/link.c b/net/tipc/link.c index 629e8cf393b..4738cb1bf7c 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -2514,7 +2514,8 @@ out: return res; } -int __tipc_nl_add_stats(struct sk_buff *skb, struct tipc_stats *s) + +static int __tipc_nl_add_stats(struct sk_buff *skb, struct tipc_stats *s) { int i; struct nlattr *stats; @@ -2580,7 +2581,7 @@ msg_full: } /* Caller should hold appropriate locks to protect the link */ -int __tipc_nl_add_link(struct tipc_nl_msg *msg, struct tipc_link *link) +static int __tipc_nl_add_link(struct tipc_nl_msg *msg, struct tipc_link *link) { int err; void *hdr; @@ -2649,8 +2650,9 @@ msg_full: } /* Caller should hold node lock */ -int __tipc_nl_add_node_links(struct tipc_nl_msg *msg, struct tipc_node *node, - u32 *prev_link) +static int __tipc_nl_add_node_links(struct tipc_nl_msg *msg, + struct tipc_node *node, + u32 *prev_link) { u32 i; int err; diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 30ca8e0e0f8..7cfb7a4aa58 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -1002,8 +1002,9 @@ void tipc_nametbl_stop(void) write_unlock_bh(&tipc_nametbl_lock); } -int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg, struct name_seq *seq, - struct sub_seq *sseq, u32 *last_publ) +static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg, + struct name_seq *seq, + struct sub_seq *sseq, u32 *last_publ) { void *hdr; struct nlattr *attrs; @@ -1071,8 +1072,8 @@ msg_full: return -EMSGSIZE; } -int __tipc_nl_subseq_list(struct tipc_nl_msg *msg, struct name_seq *seq, - u32 *last_lower, u32 *last_publ) +static int __tipc_nl_subseq_list(struct tipc_nl_msg *msg, struct name_seq *seq, + u32 *last_lower, u32 *last_publ) { struct sub_seq *sseq; struct sub_seq *sseq_start; @@ -1098,8 +1099,8 @@ int __tipc_nl_subseq_list(struct tipc_nl_msg *msg, struct name_seq *seq, return 0; } -int __tipc_nl_seq_list(struct tipc_nl_msg *msg, u32 *last_type, u32 *last_lower, - u32 *last_publ) +static int __tipc_nl_seq_list(struct tipc_nl_msg *msg, u32 *last_type, + u32 *last_lower, u32 *last_publ) { struct hlist_head *seq_head; struct name_seq *seq; diff --git a/net/tipc/node.c b/net/tipc/node.c index 72a75d4838b..82e5edddc37 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -609,7 +609,7 @@ void tipc_node_unlock(struct tipc_node *node) } /* Caller should hold node lock for the passed node */ -int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node) +static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node) { void *hdr; struct nlattr *attrs; diff --git a/net/tipc/socket.c b/net/tipc/socket.c index e91809182c2..6aa8c6a1ab1 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -2811,7 +2811,7 @@ void tipc_socket_stop(void) } /* Caller should hold socket lock for the passed tipc socket. */ -int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk) +static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk) { u32 peer_node; u32 peer_port; @@ -2846,8 +2846,8 @@ msg_full: } /* Caller should hold socket lock for the passed tipc socket. */ -int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb, - struct tipc_sock *tsk) +static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb, + struct tipc_sock *tsk) { int err; void *hdr; @@ -2912,8 +2912,9 @@ int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb) } /* Caller should hold socket lock for the passed tipc socket. */ -int __tipc_nl_add_sk_publ(struct sk_buff *skb, struct netlink_callback *cb, - struct publication *publ) +static int __tipc_nl_add_sk_publ(struct sk_buff *skb, + struct netlink_callback *cb, + struct publication *publ) { void *hdr; struct nlattr *attrs; @@ -2950,8 +2951,9 @@ msg_cancel: } /* Caller should hold socket lock for the passed tipc socket. */ -int __tipc_nl_list_sk_publ(struct sk_buff *skb, struct netlink_callback *cb, - struct tipc_sock *tsk, u32 *last_publ) +static int __tipc_nl_list_sk_publ(struct sk_buff *skb, + struct netlink_callback *cb, + struct tipc_sock *tsk, u32 *last_publ) { int err; struct publication *p; -- cgit v1.2.3-70-g09d2 From a8f48af587b0f257c49dce5b49a62554a4b8627e Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Wed, 26 Nov 2014 11:41:45 +0800 Subject: tipc: remove node subscription infrastructure The node subscribe infrastructure represents a virtual base class, so its users, such as struct tipc_port and struct publication, can derive its implemented functionalities. However, after the removal of struct tipc_port, struct publication is left as its only single user now. So defining an abstract infrastructure for one user becomes no longer reasonable. If corresponding new functions associated with the infrastructure are moved to name_table.c file, the node subscription infrastructure can be removed as well. Signed-off-by: Ying Xue Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/Makefile | 4 +-- net/tipc/name_distr.c | 52 +++++++++++++++++++++++---- net/tipc/name_distr.h | 1 + net/tipc/name_table.c | 2 +- net/tipc/name_table.h | 6 ++-- net/tipc/node.c | 6 ++-- net/tipc/node.h | 5 ++- net/tipc/node_subscr.c | 96 -------------------------------------------------- net/tipc/node_subscr.h | 63 --------------------------------- 9 files changed, 56 insertions(+), 179 deletions(-) delete mode 100644 net/tipc/node_subscr.c delete mode 100644 net/tipc/node_subscr.h (limited to 'net/tipc/node.c') diff --git a/net/tipc/Makefile b/net/tipc/Makefile index b8a13caad59..333e4592772 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -7,8 +7,8 @@ obj-$(CONFIG_TIPC) := tipc.o tipc-y += addr.o bcast.o bearer.o config.o \ core.o link.o discover.o msg.o \ name_distr.o subscr.o name_table.o net.o \ - netlink.o node.o node_subscr.o \ - socket.o log.o eth_media.o server.o + netlink.o node.o socket.o log.o eth_media.o \ + server.o tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o tipc-$(CONFIG_SYSCTL) += sysctl.o diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 376d2bb51d8..6c2638d3c65 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -250,13 +250,45 @@ void tipc_named_node_up(u32 dnode) tipc_link_xmit(buf_chain, dnode, dnode); } +static void tipc_publ_subscribe(struct publication *publ, u32 addr) +{ + struct tipc_node *node; + + if (in_own_node(addr)) + return; + + node = tipc_node_find(addr); + if (!node) { + pr_warn("Node subscription rejected, unknown node 0x%x\n", + addr); + return; + } + + tipc_node_lock(node); + list_add_tail(&publ->nodesub_list, &node->publ_list); + tipc_node_unlock(node); +} + +static void tipc_publ_unsubscribe(struct publication *publ, u32 addr) +{ + struct tipc_node *node; + + node = tipc_node_find(addr); + if (!node) + return; + + tipc_node_lock(node); + list_del_init(&publ->nodesub_list); + tipc_node_unlock(node); +} + /** - * named_purge_publ - remove publication associated with a failed node + * tipc_publ_purge - remove publication associated with a failed node * * Invoked for each publication issued by a newly failed node. * Removes publication structure from name table & deletes it. */ -static void named_purge_publ(struct publication *publ) +static void tipc_publ_purge(struct publication *publ, u32 addr) { struct publication *p; @@ -264,7 +296,7 @@ static void named_purge_publ(struct publication *publ) p = tipc_nametbl_remove_publ(publ->type, publ->lower, publ->node, publ->ref, publ->key); if (p) - tipc_nodesub_unsubscribe(&p->subscr); + tipc_publ_unsubscribe(p, addr); write_unlock_bh(&tipc_nametbl_lock); if (p != publ) { @@ -277,6 +309,14 @@ static void named_purge_publ(struct publication *publ) kfree(p); } +void tipc_publ_notify(struct list_head *nsub_list, u32 addr) +{ + struct publication *publ, *tmp; + + list_for_each_entry_safe(publ, tmp, nsub_list, nodesub_list) + tipc_publ_purge(publ, addr); +} + /** * tipc_update_nametbl - try to process a nametable update and notify * subscribers @@ -294,9 +334,7 @@ static bool tipc_update_nametbl(struct distr_item *i, u32 node, u32 dtype) TIPC_CLUSTER_SCOPE, node, ntohl(i->ref), ntohl(i->key)); if (publ) { - tipc_nodesub_subscribe(&publ->subscr, node, publ, - (net_ev_handler) - named_purge_publ); + tipc_publ_subscribe(publ, node); return true; } } else if (dtype == WITHDRAWAL) { @@ -304,7 +342,7 @@ static bool tipc_update_nametbl(struct distr_item *i, u32 node, u32 dtype) node, ntohl(i->ref), ntohl(i->key)); if (publ) { - tipc_nodesub_unsubscribe(&publ->subscr); + tipc_publ_unsubscribe(publ, node); kfree(publ); return true; } diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h index b9e75feb343..cef55cedcfb 100644 --- a/net/tipc/name_distr.h +++ b/net/tipc/name_distr.h @@ -74,5 +74,6 @@ void tipc_named_node_up(u32 dnode); void tipc_named_rcv(struct sk_buff *buf); void tipc_named_reinit(void); void tipc_named_process_backlog(void); +void tipc_publ_notify(struct list_head *nsub_list, u32 addr); #endif diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 7cfb7a4aa58..772be1cd8bf 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -144,7 +144,7 @@ static struct publication *publ_create(u32 type, u32 lower, u32 upper, publ->key = key; INIT_LIST_HEAD(&publ->local_list); INIT_LIST_HEAD(&publ->pport_list); - INIT_LIST_HEAD(&publ->subscr.nodesub_list); + INIT_LIST_HEAD(&publ->nodesub_list); return publ; } diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index b38ebecac76..c6287782665 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -37,8 +37,6 @@ #ifndef _TIPC_NAME_TABLE_H #define _TIPC_NAME_TABLE_H -#include "node_subscr.h" - struct tipc_subscription; struct tipc_port_list; @@ -56,7 +54,7 @@ struct tipc_port_list; * @node: network address of publishing port's node * @ref: publishing port * @key: publication key - * @subscr: subscription to "node down" event (for off-node publications only) + * @nodesub_list: subscription to "node down" event (off-node publication only) * @local_list: adjacent entries in list of publications made by this node * @pport_list: adjacent entries in list of publications made by this port * @node_list: adjacent matching name seq publications with >= node scope @@ -73,7 +71,7 @@ struct publication { u32 node; u32 ref; u32 key; - struct tipc_node_subscr subscr; + struct list_head nodesub_list; struct list_head local_list; struct list_head pport_list; struct list_head node_list; diff --git a/net/tipc/node.c b/net/tipc/node.c index 82e5edddc37..17b8092f9c4 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -113,7 +113,7 @@ struct tipc_node *tipc_node_create(u32 addr) spin_lock_init(&n_ptr->lock); INIT_HLIST_NODE(&n_ptr->hash); INIT_LIST_HEAD(&n_ptr->list); - INIT_LIST_HEAD(&n_ptr->nsub); + INIT_LIST_HEAD(&n_ptr->publ_list); INIT_LIST_HEAD(&n_ptr->conn_sks); __skb_queue_head_init(&n_ptr->waiting_sks); @@ -574,7 +574,7 @@ void tipc_node_unlock(struct tipc_node *node) skb_queue_splice_init(&node->waiting_sks, &waiting_sks); if (flags & TIPC_NOTIFY_NODE_DOWN) { - list_replace_init(&node->nsub, &nsub_list); + list_replace_init(&node->publ_list, &nsub_list); list_replace_init(&node->conn_sks, &conn_sks); } node->action_flags &= ~(TIPC_WAKEUP_USERS | TIPC_NOTIFY_NODE_DOWN | @@ -591,7 +591,7 @@ void tipc_node_unlock(struct tipc_node *node) tipc_node_abort_sock_conns(&conn_sks); if (!list_empty(&nsub_list)) - tipc_nodesub_notify(&nsub_list); + tipc_publ_notify(&nsub_list, addr); if (flags & TIPC_WAKEUP_BCAST_USERS) tipc_bclink_wakeup_users(); diff --git a/net/tipc/node.h b/net/tipc/node.h index 005fbcef321..f1994511f03 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -37,7 +37,6 @@ #ifndef _TIPC_NODE_H #define _TIPC_NODE_H -#include "node_subscr.h" #include "addr.h" #include "net.h" #include "bearer.h" @@ -104,7 +103,7 @@ struct tipc_node_bclink { * @link_cnt: number of links to node * @signature: node instance identifier * @link_id: local and remote bearer ids of changing link, if any - * @nsub: list of "node down" subscriptions monitoring node + * @publ_list: list of publications * @rcu: rcu struct for tipc_node */ struct tipc_node { @@ -121,7 +120,7 @@ struct tipc_node { int working_links; u32 signature; u32 link_id; - struct list_head nsub; + struct list_head publ_list; struct sk_buff_head waiting_sks; struct list_head conn_sks; struct rcu_head rcu; diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c deleted file mode 100644 index 2d13eea8574..00000000000 --- a/net/tipc/node_subscr.c +++ /dev/null @@ -1,96 +0,0 @@ -/* - * net/tipc/node_subscr.c: TIPC "node down" subscription handling - * - * Copyright (c) 1995-2006, Ericsson AB - * Copyright (c) 2005, 2010-2011, Wind River Systems - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "core.h" -#include "node_subscr.h" -#include "node.h" - -/** - * tipc_nodesub_subscribe - create "node down" subscription for specified node - */ -void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr, - void *usr_handle, net_ev_handler handle_down) -{ - if (in_own_node(addr)) { - node_sub->node = NULL; - return; - } - - node_sub->node = tipc_node_find(addr); - if (!node_sub->node) { - pr_warn("Node subscription rejected, unknown node 0x%x\n", - addr); - return; - } - node_sub->handle_node_down = handle_down; - node_sub->usr_handle = usr_handle; - - tipc_node_lock(node_sub->node); - list_add_tail(&node_sub->nodesub_list, &node_sub->node->nsub); - tipc_node_unlock(node_sub->node); -} - -/** - * tipc_nodesub_unsubscribe - cancel "node down" subscription (if any) - */ -void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub) -{ - if (!node_sub->node) - return; - - tipc_node_lock(node_sub->node); - list_del_init(&node_sub->nodesub_list); - tipc_node_unlock(node_sub->node); -} - -/** - * tipc_nodesub_notify - notify subscribers that a node is unreachable - * - * Note: node is locked by caller - */ -void tipc_nodesub_notify(struct list_head *nsub_list) -{ - struct tipc_node_subscr *ns, *safe; - net_ev_handler handle_node_down; - - list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) { - handle_node_down = ns->handle_node_down; - if (handle_node_down) { - ns->handle_node_down = NULL; - handle_node_down(ns->usr_handle); - } - } -} diff --git a/net/tipc/node_subscr.h b/net/tipc/node_subscr.h deleted file mode 100644 index d91b8cc81e3..00000000000 --- a/net/tipc/node_subscr.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * net/tipc/node_subscr.h: Include file for TIPC "node down" subscription handling - * - * Copyright (c) 1995-2006, Ericsson AB - * Copyright (c) 2005, 2010-2011, Wind River Systems - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef _TIPC_NODE_SUBSCR_H -#define _TIPC_NODE_SUBSCR_H - -#include "addr.h" - -typedef void (*net_ev_handler) (void *usr_handle); - -/** - * struct tipc_node_subscr - "node down" subscription entry - * @node: ptr to node structure of interest (or NULL, if none) - * @handle_node_down: routine to invoke when node fails - * @usr_handle: argument to pass to routine when node fails - * @nodesub_list: adjacent entries in list of subscriptions for the node - */ -struct tipc_node_subscr { - struct tipc_node *node; - net_ev_handler handle_node_down; - void *usr_handle; - struct list_head nodesub_list; -}; - -void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr, - void *usr_handle, net_ev_handler handle_down); -void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub); -void tipc_nodesub_notify(struct list_head *nsub_list); - -#endif -- cgit v1.2.3-70-g09d2 From bc6fecd4098df2d21b056486e5b418c84be95032 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Wed, 26 Nov 2014 11:41:53 +0800 Subject: tipc: use generic SKB list APIs to manage deferred queue of link Use standard SKB list APIs associated with struct sk_buff_head to manage link's deferred queue, simplifying relevant code. Signed-off-by: Ying Xue Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/bcast.c | 20 ++++++--------- net/tipc/link.c | 74 ++++++++++++++++++++++++-------------------------------- net/tipc/link.h | 11 +++------ net/tipc/node.c | 4 +-- net/tipc/node.h | 7 ++---- 5 files changed, 47 insertions(+), 69 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 4a1a3c8627d..7b238b1f339 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -352,6 +352,8 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) buf = tipc_buf_acquire(INT_H_SIZE); if (buf) { struct tipc_msg *msg = buf_msg(buf); + struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue); + u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent; tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, n_ptr->addr); @@ -359,9 +361,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) msg_set_mc_netid(msg, tipc_net_id); msg_set_bcast_ack(msg, n_ptr->bclink.last_in); msg_set_bcgap_after(msg, n_ptr->bclink.last_in); - msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head - ? buf_seqno(n_ptr->bclink.deferred_head) - 1 - : n_ptr->bclink.last_sent); + msg_set_bcgap_to(msg, to); tipc_bclink_lock(); tipc_bearer_send(MAX_BEARERS, buf, NULL); @@ -574,31 +574,26 @@ receive: if (node->bclink.last_in == node->bclink.last_sent) goto unlock; - if (!node->bclink.deferred_head) { + if (skb_queue_empty(&node->bclink.deferred_queue)) { node->bclink.oos_state = 1; goto unlock; } - msg = buf_msg(node->bclink.deferred_head); + msg = buf_msg(skb_peek(&node->bclink.deferred_queue)); seqno = msg_seqno(msg); next_in = mod(next_in + 1); if (seqno != next_in) goto unlock; /* Take in-sequence message from deferred queue & deliver it */ - buf = node->bclink.deferred_head; - node->bclink.deferred_head = buf->next; - buf->next = NULL; - node->bclink.deferred_size--; + buf = __skb_dequeue(&node->bclink.deferred_queue); goto receive; } /* Handle out-of-sequence broadcast message */ if (less(next_in, seqno)) { - deferred = tipc_link_defer_pkt(&node->bclink.deferred_head, - &node->bclink.deferred_tail, + deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue, buf); - node->bclink.deferred_size += deferred; bclink_update_last_sent(node, seqno); buf = NULL; } @@ -954,6 +949,7 @@ int tipc_bclink_init(void) spin_lock_init(&bclink->lock); __skb_queue_head_init(&bcl->outqueue); + __skb_queue_head_init(&bcl->deferred_queue); __skb_queue_head_init(&bcl->waiting_sks); bcl->next_out_no = 1; spin_lock_init(&bclink->node.lock); diff --git a/net/tipc/link.c b/net/tipc/link.c index 9e94bf935e4..d9c2310e417 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -292,6 +292,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, l_ptr->next_out_no = 1; __skb_queue_head_init(&l_ptr->outqueue); + __skb_queue_head_init(&l_ptr->deferred_queue); __skb_queue_head_init(&l_ptr->waiting_sks); link_reset_statistics(l_ptr); @@ -398,7 +399,7 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr) */ void tipc_link_purge_queues(struct tipc_link *l_ptr) { - kfree_skb_list(l_ptr->oldest_deferred_in); + __skb_queue_purge(&l_ptr->deferred_queue); __skb_queue_purge(&l_ptr->outqueue); tipc_link_reset_fragments(l_ptr); } @@ -433,7 +434,7 @@ void tipc_link_reset(struct tipc_link *l_ptr) /* Clean up all queues: */ __skb_queue_purge(&l_ptr->outqueue); - kfree_skb_list(l_ptr->oldest_deferred_in); + __skb_queue_purge(&l_ptr->deferred_queue); if (!skb_queue_empty(&l_ptr->waiting_sks)) { skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks); owner->action_flags |= TIPC_WAKEUP_USERS; @@ -442,9 +443,6 @@ void tipc_link_reset(struct tipc_link *l_ptr) l_ptr->unacked_window = 0; l_ptr->checkpoint = 1; l_ptr->next_out_no = 1; - l_ptr->deferred_inqueue_sz = 0; - l_ptr->oldest_deferred_in = NULL; - l_ptr->newest_deferred_in = NULL; l_ptr->fsm_msg_cnt = 0; l_ptr->stale_count = 0; link_reset_statistics(l_ptr); @@ -974,19 +972,23 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr, struct sk_buff *buf) { + struct sk_buff_head head; + struct sk_buff *skb = NULL; u32 seq_no; - if (l_ptr->oldest_deferred_in == NULL) + if (skb_queue_empty(&l_ptr->deferred_queue)) return buf; - seq_no = buf_seqno(l_ptr->oldest_deferred_in); + seq_no = buf_seqno(skb_peek(&l_ptr->deferred_queue)); if (seq_no == mod(l_ptr->next_in_no)) { - l_ptr->newest_deferred_in->next = buf; - buf = l_ptr->oldest_deferred_in; - l_ptr->oldest_deferred_in = NULL; - l_ptr->deferred_inqueue_sz = 0; + __skb_queue_head_init(&head); + skb_queue_splice_tail_init(&l_ptr->deferred_queue, &head); + skb = head.next; + skb->prev = NULL; + head.prev->next = buf; + head.prev->prev = NULL; } - return buf; + return skb; } /** @@ -1170,7 +1172,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) continue; } l_ptr->next_in_no++; - if (unlikely(l_ptr->oldest_deferred_in)) + if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue))) head = link_insert_deferred_queue(l_ptr, head); if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { @@ -1273,48 +1275,37 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf) * * Returns increase in queue length (i.e. 0 or 1) */ -u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, - struct sk_buff *buf) +u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb) { - struct sk_buff *queue_buf; - struct sk_buff **prev; - u32 seq_no = buf_seqno(buf); - - buf->next = NULL; + struct sk_buff *skb1; + u32 seq_no = buf_seqno(skb); /* Empty queue ? */ - if (*head == NULL) { - *head = *tail = buf; + if (skb_queue_empty(list)) { + __skb_queue_tail(list, skb); return 1; } /* Last ? */ - if (less(buf_seqno(*tail), seq_no)) { - (*tail)->next = buf; - *tail = buf; + if (less(buf_seqno(skb_peek_tail(list)), seq_no)) { + __skb_queue_tail(list, skb); return 1; } /* Locate insertion point in queue, then insert; discard if duplicate */ - prev = head; - queue_buf = *head; - for (;;) { - u32 curr_seqno = buf_seqno(queue_buf); + skb_queue_walk(list, skb1) { + u32 curr_seqno = buf_seqno(skb1); if (seq_no == curr_seqno) { - kfree_skb(buf); + kfree_skb(skb); return 0; } if (less(seq_no, curr_seqno)) break; - - prev = &queue_buf->next; - queue_buf = queue_buf->next; } - buf->next = queue_buf; - *prev = buf; + __skb_queue_before(list, skb1, skb); return 1; } @@ -1344,15 +1335,14 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, return; } - if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in, - &l_ptr->newest_deferred_in, buf)) { - l_ptr->deferred_inqueue_sz++; + if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) { l_ptr->stats.deferred_recv++; TIPC_SKB_CB(buf)->deferred = true; - if ((l_ptr->deferred_inqueue_sz % 16) == 1) + if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1) tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); - } else + } else { l_ptr->stats.duplicates++; + } } /* @@ -1388,8 +1378,8 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, if (l_ptr->next_out) next_sent = buf_seqno(l_ptr->next_out); msg_set_next_sent(msg, next_sent); - if (l_ptr->oldest_deferred_in) { - u32 rec = buf_seqno(l_ptr->oldest_deferred_in); + if (!skb_queue_empty(&l_ptr->deferred_queue)) { + u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue)); gap = mod(rec - mod(l_ptr->next_in_no)); } msg_set_seq_gap(msg, gap); diff --git a/net/tipc/link.h b/net/tipc/link.h index 96f1e1bf079..de7b8833641 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -124,9 +124,7 @@ struct tipc_stats { * @last_retransmitted: sequence number of most recently retransmitted message * @stale_count: # of identical retransmit requests made by peer * @next_in_no: next sequence number to expect for inbound messages - * @deferred_inqueue_sz: # of messages in inbound message queue - * @oldest_deferred_in: ptr to first inbound message in queue - * @newest_deferred_in: ptr to last inbound message in queue + * @deferred_queue: deferred queue saved OOS b'cast message received from node * @unacked_window: # of inbound messages rx'd without ack'ing back to peer * @next_out: ptr to first unsent outbound message in queue * @waiting_sks: linked list of sockets waiting for link congestion to abate @@ -178,9 +176,7 @@ struct tipc_link { /* Reception */ u32 next_in_no; - u32 deferred_inqueue_sz; - struct sk_buff *oldest_deferred_in; - struct sk_buff *newest_deferred_in; + struct sk_buff_head deferred_queue; u32 unacked_window; /* Congestion handling */ @@ -224,8 +220,7 @@ void tipc_link_bundle_rcv(struct sk_buff *buf); void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); void tipc_link_push_packets(struct tipc_link *l_ptr); -u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, - struct sk_buff *buf); +u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *buf); void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window); void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *start, u32 retransmits); diff --git a/net/tipc/node.c b/net/tipc/node.c index 17b8092f9c4..69b96be09a8 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -116,6 +116,7 @@ struct tipc_node *tipc_node_create(u32 addr) INIT_LIST_HEAD(&n_ptr->publ_list); INIT_LIST_HEAD(&n_ptr->conn_sks); __skb_queue_head_init(&n_ptr->waiting_sks); + __skb_queue_head_init(&n_ptr->bclink.deferred_queue); hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]); @@ -381,8 +382,7 @@ static void node_lost_contact(struct tipc_node *n_ptr) /* Flush broadcast link info associated with lost node */ if (n_ptr->bclink.recv_permitted) { - kfree_skb_list(n_ptr->bclink.deferred_head); - n_ptr->bclink.deferred_size = 0; + __skb_queue_purge(&n_ptr->bclink.deferred_queue); if (n_ptr->bclink.reasm_buf) { kfree_skb(n_ptr->bclink.reasm_buf); diff --git a/net/tipc/node.h b/net/tipc/node.h index f1994511f03..cbe0e950f1c 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -71,9 +71,7 @@ enum { * @last_in: sequence # of last in-sequence b'cast message received from node * @last_sent: sequence # of last b'cast message sent by node * @oos_state: state tracker for handling OOS b'cast messages - * @deferred_size: number of OOS b'cast messages in deferred queue - * @deferred_head: oldest OOS b'cast message received from node - * @deferred_tail: newest OOS b'cast message received from node + * @deferred_queue: deferred queue saved OOS b'cast message received from node * @reasm_buf: broadcast reassembly queue head from node * @recv_permitted: true if node is allowed to receive b'cast messages */ @@ -83,8 +81,7 @@ struct tipc_node_bclink { u32 last_sent; u32 oos_state; u32 deferred_size; - struct sk_buff *deferred_head; - struct sk_buff *deferred_tail; + struct sk_buff_head deferred_queue; struct sk_buff *reasm_buf; bool recv_permitted; }; -- cgit v1.2.3-70-g09d2 From 340b6e59fbc6ac97469253315c96e952908c9c0d Mon Sep 17 00:00:00 2001 From: Richard Alpe Date: Wed, 10 Dec 2014 09:46:54 +0100 Subject: tipc: fix broadcast wakeup contention after congestion commit 908344cdda80 ("tipc: fix bug in multicast congestion handling") introduced a race in the broadcast link wakeup functionality. This patch eliminates this broadcast link wakeup race caused by operation on the wakeup list without proper locking. If this race hit and corrupted the list all subsequent wakeup messages would be lost, resulting in a considerable memory leak. Signed-off-by: Richard Alpe Signed-off-by: Erik Hugne Signed-off-by: David S. Miller --- net/tipc/link.c | 8 ++++---- net/tipc/node.c | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) (limited to 'net/tipc/node.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 34bf15c90c7..23bcc113236 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -293,7 +293,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, l_ptr->next_out_no = 1; __skb_queue_head_init(&l_ptr->outqueue); __skb_queue_head_init(&l_ptr->deferred_queue); - __skb_queue_head_init(&l_ptr->waiting_sks); + skb_queue_head_init(&l_ptr->waiting_sks); link_reset_statistics(l_ptr); @@ -358,7 +358,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport, return false; TIPC_SKB_CB(buf)->chain_sz = chain_sz; TIPC_SKB_CB(buf)->chain_imp = imp; - __skb_queue_tail(&link->waiting_sks, buf); + skb_queue_tail(&link->waiting_sks, buf); link->stats.link_congs++; return true; } @@ -378,8 +378,8 @@ static void link_prepare_wakeup(struct tipc_link *link) if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp]) break; pend_qsz += TIPC_SKB_CB(skb)->chain_sz; - __skb_unlink(skb, &link->waiting_sks); - __skb_queue_tail(&link->owner->waiting_sks, skb); + skb_unlink(skb, &link->waiting_sks); + skb_queue_tail(&link->owner->waiting_sks, skb); } } diff --git a/net/tipc/node.c b/net/tipc/node.c index 69b96be09a8..8d353ec77a6 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -115,7 +115,7 @@ struct tipc_node *tipc_node_create(u32 addr) INIT_LIST_HEAD(&n_ptr->list); INIT_LIST_HEAD(&n_ptr->publ_list); INIT_LIST_HEAD(&n_ptr->conn_sks); - __skb_queue_head_init(&n_ptr->waiting_sks); + skb_queue_head_init(&n_ptr->waiting_sks); __skb_queue_head_init(&n_ptr->bclink.deferred_queue); hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]); -- cgit v1.2.3-70-g09d2