From 50100a5e39461b2a61d6040e73c384766c29975d Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Fri, 22 Aug 2014 18:09:07 -0400 Subject: tipc: use pseudo message to wake up sockets after link congestion The current link implementation keeps a linked list of blocked ports/ sockets that is populated when there is link congestion. The purpose of this is to let the link know which users to wake up when the congestion abates. This adds unnecessary complexity to the data structure and the code, since it forces us to involve the link each time we want to delete a socket. It also forces us to grab the spinlock port_lock within the scope of node_lock. We want to get rid of this direct dependence, as well as the deadlock hazard resulting from the usage of port_lock. In this commit, we instead let the link keep list of a "wakeup" pseudo messages for use in such situations. Those messages are sent to the pending sockets via the ordinary message reception path, and wake up the socket's owner when they are received. This enables us to get rid of the 'waiting_ports' linked lists in struct tipc_port that manifest this direct reference. As a consequence, we can eliminate another BH entry into the socket, and hence the need to grab port_lock. This is a further step in our effort to remove port_lock altogether. Signed-off-by: Jon Maloy Reviewed-by: Erik Hugne Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 119 +++++++++++++++++++++++++------------------------------- 1 file changed, 54 insertions(+), 65 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index fb1485dc673..6c775a107a0 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -275,7 +275,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, link_init_max_pkt(l_ptr); l_ptr->next_out_no = 1; - INIT_LIST_HEAD(&l_ptr->waiting_ports); + __skb_queue_head_init(&l_ptr->waiting_sks); link_reset_statistics(l_ptr); @@ -322,66 +322,47 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down) } /** - * link_schedule_port - schedule port for deferred sending - * @l_ptr: pointer to link - * @origport: reference to sending port - * @sz: amount of data to be sent - * - * Schedules port for renewed sending of messages after link congestion - * has abated. + * link_schedule_user - schedule user for wakeup after congestion + * @link: congested link + * @oport: sending port + * @chain_sz: size of buffer chain that was attempted sent + * @imp: importance of message attempted sent + * Create pseudo msg to send back to user when congestion abates */ -static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) +static bool link_schedule_user(struct tipc_link *link, u32 oport, + uint chain_sz, uint imp) { - struct tipc_port *p_ptr; - struct tipc_sock *tsk; + struct sk_buff *buf; - spin_lock_bh(&tipc_port_list_lock); - p_ptr = tipc_port_lock(origport); - if (p_ptr) { - if (!list_empty(&p_ptr->wait_list)) - goto exit; - tsk = tipc_port_to_sock(p_ptr); - tsk->link_cong = 1; - p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt); - list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); - l_ptr->stats.link_congs++; -exit: - tipc_port_unlock(p_ptr); - } - spin_unlock_bh(&tipc_port_list_lock); - return -ELINKCONG; + buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, tipc_own_addr, + tipc_own_addr, oport, 0, 0); + if (!buf) + return false; + TIPC_SKB_CB(buf)->chain_sz = chain_sz; + TIPC_SKB_CB(buf)->chain_imp = imp; + __skb_queue_tail(&link->waiting_sks, buf); + link->stats.link_congs++; + return true; } -void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) +/** + * link_prepare_wakeup - prepare users for wakeup after congestion + * @link: congested link + * Move a number of waiting users, as permitted by available space in + * the send queue, from link wait queue to node wait queue for wakeup + */ +static void link_prepare_wakeup(struct tipc_link *link) { - struct tipc_port *p_ptr; - struct tipc_sock *tsk; - struct tipc_port *temp_p_ptr; - int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; - - if (all) - win = 100000; - if (win <= 0) - return; - if (!spin_trylock_bh(&tipc_port_list_lock)) - return; - if (link_congested(l_ptr)) - goto exit; - list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports, - wait_list) { - if (win <= 0) + struct sk_buff_head *wq = &link->waiting_sks; + struct sk_buff *buf; + uint pend_qsz = link->out_queue_size; + + for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) { + if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp]) break; - tsk = tipc_port_to_sock(p_ptr); - list_del_init(&p_ptr->wait_list); - spin_lock_bh(p_ptr->lock); - tsk->link_cong = 0; - tipc_sock_wakeup(tsk); - win -= p_ptr->waiting_pkts; - spin_unlock_bh(p_ptr->lock); + pend_qsz += TIPC_SKB_CB(buf)->chain_sz; + __skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq)); } - -exit: - spin_unlock_bh(&tipc_port_list_lock); } /** @@ -423,6 +404,7 @@ void tipc_link_reset(struct tipc_link *l_ptr) u32 prev_state = l_ptr->state; u32 checkpoint = l_ptr->next_in_no; int was_active_link = tipc_link_is_active(l_ptr); + struct tipc_node *owner = l_ptr->owner; msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff)); @@ -450,9 +432,10 @@ void tipc_link_reset(struct tipc_link *l_ptr) kfree_skb(l_ptr->proto_msg_queue); l_ptr->proto_msg_queue = NULL; kfree_skb_list(l_ptr->oldest_deferred_in); - if (!list_empty(&l_ptr->waiting_ports)) - tipc_link_wakeup_ports(l_ptr, 1); - + if (!skb_queue_empty(&l_ptr->waiting_sks)) { + skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks); + owner->action_flags |= TIPC_WAKEUP_USERS; + } l_ptr->retransm_queue_head = 0; l_ptr->retransm_queue_size = 0; l_ptr->last_out = NULL; @@ -688,19 +671,23 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) { struct tipc_msg *msg = buf_msg(buf); - uint psz = msg_size(msg); uint imp = tipc_msg_tot_importance(msg); u32 oport = msg_tot_origport(msg); - if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) { - if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) { - link_schedule_port(link, oport, psz); - return -ELINKCONG; - } - } else { + if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) { pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); tipc_link_reset(link); + goto drop; } + if (unlikely(msg_errcode(msg))) + goto drop; + if (unlikely(msg_reroute_cnt(msg))) + goto drop; + if (TIPC_SKB_CB(buf)->wakeup_pending) + return -ELINKCONG; + if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp)) + return -ELINKCONG; +drop: kfree_skb_list(buf); return -EHOSTUNREACH; } @@ -1202,8 +1189,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) if (unlikely(l_ptr->next_out)) tipc_link_push_queue(l_ptr); - if (unlikely(!list_empty(&l_ptr->waiting_ports))) - tipc_link_wakeup_ports(l_ptr, 0); + if (released && !skb_queue_empty(&l_ptr->waiting_sks)) { + link_prepare_wakeup(l_ptr); + l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS; + } /* Process the incoming packet */ if (unlikely(!link_working_working(l_ptr))) { -- cgit v1.2.3-70-g09d2 From 2e84c60b77e4dd96068f568a5971e681bb7e6b68 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Fri, 22 Aug 2014 18:09:18 -0400 Subject: tipc: remove include file port.h We move the inline functions in the file port.h to socket.c, and modify their names accordingly. We move struct tipc_port and some macros to socket.h. Finally, we remove the file port.h. Signed-off-by: Jon Maloy Reviewed-by: Erik Hugne Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/bcast.c | 1 - net/tipc/core.c | 2 +- net/tipc/link.c | 1 - net/tipc/name_table.c | 1 - net/tipc/net.c | 1 - net/tipc/port.h | 145 -------------------------------------------------- net/tipc/socket.c | 123 ++++++++++++++++++++++++++++-------------- net/tipc/socket.h | 39 +++++++++++++- net/tipc/subscr.c | 1 - 9 files changed, 121 insertions(+), 193 deletions(-) delete mode 100644 net/tipc/port.h (limited to 'net/tipc/link.c') diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 9510fb2df56..b2bbe69b255 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -37,7 +37,6 @@ #include "core.h" #include "link.h" -#include "port.h" #include "socket.h" #include "msg.h" #include "bcast.h" diff --git a/net/tipc/core.c b/net/tipc/core.c index 676d18015dd..b3b03ef30df 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -39,7 +39,7 @@ #include "name_table.h" #include "subscr.h" #include "config.h" -#include "port.h" +#include "socket.h" #include diff --git a/net/tipc/link.c b/net/tipc/link.c index 6c775a107a0..65410e18b8a 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -36,7 +36,6 @@ #include "core.h" #include "link.h" -#include "port.h" #include "socket.h" #include "name_distr.h" #include "discover.h" diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 9d7d37d9518..c058e30f84a 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -39,7 +39,6 @@ #include "name_table.h" #include "name_distr.h" #include "subscr.h" -#include "port.h" #define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ diff --git a/net/tipc/net.c b/net/tipc/net.c index 421dd89152a..93b9944a6a8 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -38,7 +38,6 @@ #include "net.h" #include "name_distr.h" #include "subscr.h" -#include "port.h" #include "socket.h" #include "node.h" #include "config.h" diff --git a/net/tipc/port.h b/net/tipc/port.h deleted file mode 100644 index 38bf8cb3df1..00000000000 --- a/net/tipc/port.h +++ /dev/null @@ -1,145 +0,0 @@ -/* - * net/tipc/port.h: Include file for TIPC port code - * - * Copyright (c) 1994-2007, 2014, Ericsson AB - * Copyright (c) 2004-2007, 2010-2013, Wind River Systems - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef _TIPC_PORT_H -#define _TIPC_PORT_H - -#include "net.h" -#include "msg.h" -#include "node_subscr.h" - -#define TIPC_CONNACK_INTV 256 -#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) -#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \ - SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) - -/** - * struct tipc_port - TIPC port structure - * @lock: pointer to spinlock for controlling access to port - * @connected: non-zero if port is currently connected to a peer port - * @conn_type: TIPC type used when connection was established - * @conn_instance: TIPC instance used when connection was established - * @published: non-zero if port has one or more associated names - * @max_pkt: maximum packet size "hint" used when building messages sent by port - * @ref: unique reference to port in TIPC object registry - * @phdr: preformatted message header used when sending messages - * @port_list: adjacent ports in TIPC's global list of ports - * @publications: list of publications for port - * @pub_count: total # of publications port has made during its lifetime - * @probing_state: - * @probing_interval: - * @timer_ref: - */ -struct tipc_port { - int connected; - u32 conn_type; - u32 conn_instance; - int published; - u32 max_pkt; - u32 ref; - struct tipc_msg phdr; - struct list_head publications; - u32 pub_count; - u32 probing_state; - u32 probing_interval; - struct timer_list timer; -}; - -/* - * TIPC port manipulation routines - */ -u32 tipc_port_init(struct tipc_port *p_ptr, - const unsigned int importance); - -void tipc_port_destroy(struct tipc_port *p_ptr); - -int tipc_publish(struct tipc_port *p_ptr, unsigned int scope, - struct tipc_name_seq const *name_seq); - -int tipc_withdraw(struct tipc_port *p_ptr, unsigned int scope, - struct tipc_name_seq const *name_seq); - -int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg); - -void tipc_port_reinit(void); - -static inline u32 tipc_port_peernode(struct tipc_port *p_ptr) -{ - return msg_destnode(&p_ptr->phdr); -} - -static inline u32 tipc_port_peerport(struct tipc_port *p_ptr) -{ - return msg_destport(&p_ptr->phdr); -} - -static inline bool tipc_port_unreliable(struct tipc_port *port) -{ - return msg_src_droppable(&port->phdr) != 0; -} - -static inline void tipc_port_set_unreliable(struct tipc_port *port, - bool unreliable) -{ - msg_set_src_droppable(&port->phdr, unreliable ? 1 : 0); -} - -static inline bool tipc_port_unreturnable(struct tipc_port *port) -{ - return msg_dest_droppable(&port->phdr) != 0; -} - -static inline void tipc_port_set_unreturnable(struct tipc_port *port, - bool unreturnable) -{ - msg_set_dest_droppable(&port->phdr, unreturnable ? 1 : 0); -} - - -static inline int tipc_port_importance(struct tipc_port *port) -{ - return msg_importance(&port->phdr); -} - -static inline int tipc_port_set_importance(struct tipc_port *port, int imp) -{ - if (imp > TIPC_CRITICAL_IMPORTANCE) - return -EINVAL; - msg_set_importance(&port->phdr, (u32)imp); - return 0; -} - -#endif diff --git a/net/tipc/socket.c b/net/tipc/socket.c index ea33eab4fb9..70eaceae1f8 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -36,12 +36,12 @@ #include "core.h" #include "ref.h" -#include "port.h" #include "name_table.h" #include "node.h" #include "link.h" #include #include "config.h" +#include "socket.h" #define SS_LISTENING -1 /* socket is listening */ #define SS_READY -2 /* socket is connectionless */ @@ -114,24 +114,65 @@ static struct proto tipc_proto_kern; * - port reference */ -#include "socket.h" +static u32 tsk_peer_node(struct tipc_port *p_ptr) +{ + return msg_destnode(&p_ptr->phdr); +} + +static u32 tsk_peer_port(struct tipc_port *p_ptr) +{ + return msg_destport(&p_ptr->phdr); +} + +static bool tsk_unreliable(struct tipc_port *port) +{ + return msg_src_droppable(&port->phdr) != 0; +} + +static void tsk_set_unreliable(struct tipc_port *port, bool unreliable) +{ + msg_set_src_droppable(&port->phdr, unreliable ? 1 : 0); +} + +static bool tsk_unreturnable(struct tipc_port *port) +{ + return msg_dest_droppable(&port->phdr) != 0; +} + +static void tsk_set_unreturnable(struct tipc_port *port, bool unreturnable) +{ + msg_set_dest_droppable(&port->phdr, unreturnable ? 1 : 0); +} + +static int tsk_importance(struct tipc_port *port) +{ + return msg_importance(&port->phdr); +} + +static int tsk_set_importance(struct tipc_port *port, int imp) +{ + if (imp > TIPC_CRITICAL_IMPORTANCE) + return -EINVAL; + msg_set_importance(&port->phdr, (u32)imp); + return 0; +} /** - * advance_rx_queue - discard first buffer in socket receive queue + * tsk_advance_rx_queue - discard first buffer in socket receive queue * * Caller must hold socket lock */ -static void advance_rx_queue(struct sock *sk) +static void tsk_advance_rx_queue(struct sock *sk) { kfree_skb(__skb_dequeue(&sk->sk_receive_queue)); } /** - * reject_rx_queue - reject all buffers in socket receive queue + * tsk_rej_rx_queue - reject all buffers in socket receive queue * * Caller must hold socket lock */ -static void reject_rx_queue(struct sock *sk) +static void tsk_rej_rx_queue(struct sock *sk) { struct sk_buff *buf; u32 dnode; @@ -142,14 +183,14 @@ static void reject_rx_queue(struct sock *sk) } } -/* tipc_sk_peer_msg - verify if message was sent by connected port's peer +/* tsk_peer_msg - verify if message was sent by connected port's peer * * Handles cases where the node's network address has changed from * the default of <0.0.0> to its configured setting. */ -static bool tipc_sk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg) +static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg) { - u32 peer_port = tipc_port_peerport(&tsk->port); + u32 peer_port = tsk_peer_port(&tsk->port); u32 orig_node; u32 peer_node; @@ -160,7 +201,7 @@ static bool tipc_sk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg) return false; orig_node = msg_orignode(msg); - peer_node = tipc_port_peernode(&tsk->port); + peer_node = tsk_peer_node(&tsk->port); if (likely(orig_node == peer_node)) return true; @@ -258,9 +299,9 @@ static int tipc_sk_create(struct net *net, struct socket *sock, atomic_set(&tsk->dupl_rcvcnt, 0); if (sock->state == SS_READY) { - tipc_port_set_unreturnable(port, true); + tsk_set_unreturnable(port, true); if (sock->type == SOCK_DGRAM) - tipc_port_set_unreliable(port, true); + tsk_set_unreliable(port, true); } return 0; } @@ -373,7 +414,7 @@ static int tipc_release(struct socket *sock) * Reject all unreceived messages, except on an active connection * (which disconnects locally & sends a 'FIN+' to peer) */ - dnode = tipc_port_peernode(port); + dnode = tsk_peer_node(port); while (sock->state != SS_DISCONNECTING) { buf = __skb_dequeue(&sk->sk_receive_queue); if (buf == NULL) @@ -398,7 +439,7 @@ static int tipc_release(struct socket *sock) if (port->connected) { buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode, tipc_own_addr, - tipc_port_peerport(port), + tsk_peer_port(port), port->ref, TIPC_ERR_NO_PORT); if (buf) tipc_link_xmit(buf, dnode, port->ref); @@ -502,8 +543,8 @@ static int tipc_getname(struct socket *sock, struct sockaddr *uaddr, if ((sock->state != SS_CONNECTED) && ((peer != 2) || (sock->state != SS_DISCONNECTING))) return -ENOTCONN; - addr->addr.id.ref = tipc_port_peerport(&tsk->port); - addr->addr.id.node = tipc_port_peernode(&tsk->port); + addr->addr.id.ref = tsk_peer_port(&tsk->port); + addr->addr.id.node = tsk_peer_node(&tsk->port); } else { addr->addr.id.ref = tsk->port.ref; addr->addr.id.node = tipc_own_addr; @@ -699,7 +740,7 @@ static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, int conn_cong; /* Ignore if connection cannot be validated: */ - if (!tipc_sk_peer_msg(tsk, msg)) + if (!tsk_peer_msg(tsk, msg)) goto exit; port->probing_state = TIPC_CONN_OK; @@ -986,7 +1027,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, } timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); - dnode = tipc_port_peernode(port); + dnode = tsk_peer_node(port); next: mtu = port->max_pkt; @@ -1161,8 +1202,8 @@ static void tipc_sk_send_ack(struct tipc_port *port, uint ack) { struct sk_buff *buf = NULL; struct tipc_msg *msg; - u32 peer_port = tipc_port_peerport(port); - u32 dnode = tipc_port_peernode(port); + u32 peer_port = tsk_peer_port(port); + u32 dnode = tsk_peer_node(port); if (!port->connected) return; @@ -1260,7 +1301,7 @@ restart: /* Discard an empty non-errored message & try again */ if ((!sz) && (!err)) { - advance_rx_queue(sk); + tsk_advance_rx_queue(sk); goto restart; } @@ -1298,7 +1339,7 @@ restart: tipc_sk_send_ack(port, tsk->rcv_unacked); tsk->rcv_unacked = 0; } - advance_rx_queue(sk); + tsk_advance_rx_queue(sk); } exit: release_sock(sk); @@ -1360,7 +1401,7 @@ restart: /* Discard an empty non-errored message & try again */ if ((!sz) && (!err)) { - advance_rx_queue(sk); + tsk_advance_rx_queue(sk); goto restart; } @@ -1409,7 +1450,7 @@ restart: tipc_sk_send_ack(port, tsk->rcv_unacked); tsk->rcv_unacked = 0; } - advance_rx_queue(sk); + tsk_advance_rx_queue(sk); } /* Loop around if more data is required */ @@ -1480,12 +1521,12 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) switch ((int)sock->state) { case SS_CONNECTED: /* Accept only connection-based messages sent by peer */ - if (tipc_sk_peer_msg(tsk, msg)) { + if (tsk_peer_msg(tsk, msg)) { if (unlikely(msg_errcode(msg))) { sock->state = SS_DISCONNECTING; port->connected = 0; /* let timer expire on it's own */ - tipc_node_remove_conn(tipc_port_peernode(port), + tipc_node_remove_conn(tsk_peer_node(port), port->ref); } retval = TIPC_OK; @@ -1919,13 +1960,13 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags) * Reject any stray messages received by new socket * before the socket lock was taken (very, very unlikely) */ - reject_rx_queue(new_sk); + tsk_rej_rx_queue(new_sk); /* Connect new socket to it's peer */ tipc_sk_finish_conn(new_port, msg_origport(msg), msg_orignode(msg)); new_sock->state = SS_CONNECTED; - tipc_port_set_importance(new_port, msg_importance(msg)); + tsk_set_importance(new_port, msg_importance(msg)); if (msg_named(msg)) { new_port->conn_type = msg_nametype(msg); new_port->conn_instance = msg_nameinst(msg); @@ -1938,7 +1979,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags) if (!msg_data_sz(msg)) { struct msghdr m = {NULL,}; - advance_rx_queue(sk); + tsk_advance_rx_queue(sk); tipc_send_packet(NULL, new_sock, &m, 0); } else { __skb_dequeue(&sk->sk_receive_queue); @@ -1990,11 +2031,11 @@ restart: tipc_link_xmit(buf, dnode, port->ref); tipc_node_remove_conn(dnode, port->ref); } else { - dnode = tipc_port_peernode(port); + dnode = tsk_peer_node(port); buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode, tipc_own_addr, - tipc_port_peerport(port), + tsk_peer_port(port), port->ref, TIPC_CONN_SHUTDOWN); tipc_link_xmit(buf, dnode, port->ref); } @@ -2040,8 +2081,8 @@ static void tipc_sk_timeout(unsigned long ref) bh_unlock_sock(sk); goto exit; } - peer_port = tipc_port_peerport(port); - peer_node = tipc_port_peernode(port); + peer_port = tsk_peer_port(port); + peer_node = tsk_peer_node(port); if (port->probing_state == TIPC_CONN_PROBING) { /* Previous probe not answered -> self abort */ @@ -2132,8 +2173,8 @@ static int tipc_sk_show(struct tipc_port *port, char *buf, ret = tipc_snprintf(buf, len, "%-10u:", port->ref); if (port->connected) { - u32 dport = tipc_port_peerport(port); - u32 destnode = tipc_port_peernode(port); + u32 dport = tsk_peer_port(port); + u32 destnode = tsk_peer_node(port); ret += tipc_snprintf(buf + ret, len - ret, " connected to <%u.%u.%u:%u>", @@ -2248,16 +2289,16 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, switch (opt) { case TIPC_IMPORTANCE: - res = tipc_port_set_importance(port, value); + res = tsk_set_importance(port, value); break; case TIPC_SRC_DROPPABLE: if (sock->type != SOCK_STREAM) - tipc_port_set_unreliable(port, value); + tsk_set_unreliable(port, value); else res = -ENOPROTOOPT; break; case TIPC_DEST_DROPPABLE: - tipc_port_set_unreturnable(port, value); + tsk_set_unreturnable(port, value); break; case TIPC_CONN_TIMEOUT: tipc_sk(sk)->conn_timeout = value; @@ -2307,13 +2348,13 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, switch (opt) { case TIPC_IMPORTANCE: - value = tipc_port_importance(port); + value = tsk_importance(port); break; case TIPC_SRC_DROPPABLE: - value = tipc_port_unreliable(port); + value = tsk_unreliable(port); break; case TIPC_DEST_DROPPABLE: - value = tipc_port_unreturnable(port); + value = tsk_unreturnable(port); break; case TIPC_CONN_TIMEOUT: value = tipc_sk(sk)->conn_timeout; diff --git a/net/tipc/socket.h b/net/tipc/socket.h index 5d515be604a..b98725e27b9 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -35,11 +35,48 @@ #ifndef _TIPC_SOCK_H #define _TIPC_SOCK_H -#include "port.h" #include +#include "msg.h" #define TIPC_CONN_OK 0 #define TIPC_CONN_PROBING 1 +#define TIPC_CONNACK_INTV 256 +#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) +#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \ + SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) + +/** + * struct tipc_port - TIPC port structure + * @lock: pointer to spinlock for controlling access to port + * @connected: non-zero if port is currently connected to a peer port + * @conn_type: TIPC type used when connection was established + * @conn_instance: TIPC instance used when connection was established + * @published: non-zero if port has one or more associated names + * @max_pkt: maximum packet size "hint" used when building messages sent by port + * @ref: unique reference to port in TIPC object registry + * @phdr: preformatted message header used when sending messages + * @port_list: adjacent ports in TIPC's global list of ports + * @publications: list of publications for port + * @pub_count: total # of publications port has made during its lifetime + * @probing_state: + * @probing_interval: + * @timer_ref: + */ +struct tipc_port { + int connected; + u32 conn_type; + u32 conn_instance; + int published; + u32 max_pkt; + u32 ref; + struct tipc_msg phdr; + struct list_head port_list; + struct list_head publications; + u32 pub_count; + u32 probing_state; + u32 probing_interval; + struct timer_list timer; +}; /** * struct tipc_sock - TIPC socket structure diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 642437231ad..31b5cb232a4 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -36,7 +36,6 @@ #include "core.h" #include "name_table.h" -#include "port.h" #include "subscr.h" /** -- cgit v1.2.3-70-g09d2 From 643566d4b47e2956110e79c0e6f65db9b9ea42c6 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Fri, 17 Oct 2014 15:25:28 -0400 Subject: tipc: fix bug in bundled buffer reception In commit ec8a2e5621db2da24badb3969eda7fd359e1869f ("tipc: same receive code path for connection protocol and data messages") we omitted the the possiblilty that an arriving message extracted from a bundle buffer may be a multicast message. Such messages need to be to be delivered to the socket via a separate function, tipc_sk_mcast_rcv(). As a result, small multicast messages arriving as members of a bundle buffer will be silently dropped. This commit corrects the error by considering this case in the function tipc_link_bundle_rcv(). Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/link.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 65410e18b8a..1db162aa64a 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1924,7 +1924,12 @@ void tipc_link_bundle_rcv(struct sk_buff *buf) } omsg = buf_msg(obuf); pos += align(msg_size(omsg)); - if (msg_isdata(omsg) || (msg_user(omsg) == CONN_MANAGER)) { + if (msg_isdata(omsg)) { + if (unlikely(msg_type(omsg) == TIPC_MCAST_MSG)) + tipc_sk_mcast_rcv(obuf); + else + tipc_sk_rcv(obuf); + } else if (msg_user(omsg) == CONN_MANAGER) { tipc_sk_rcv(obuf); } else if (msg_user(omsg) == NAME_DISTRIBUTOR) { tipc_named_rcv(obuf); -- cgit v1.2.3-70-g09d2