From f288bef46443eb3a0b212c1c57b222c0497e06f6 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Tue, 21 Aug 2012 11:16:57 +0800 Subject: tipc: fix race/inefficiencies in poll/wait behaviour When an application blocks at poll/select on a TIPC socket while requesting a specific event mask, both the filter_rcv() and wakeupdispatch() cases will wake it up unconditionally whenever the state changes (i.e. an incoming message arrives, or congestion has subsided). No mask is used. To avoid this, we populate sk->sk_data_ready and sk->sk_write_space with tipc_data_ready and tipc_write_space respectively, which brings TIPC more into alignment with the rest of the networking code. These pass the exact set of possible events to the waker in fs/select.c, hence avoiding waking up blocked processes unnecessarily. In doing so, we uncover another issue -- that there needs to be a memory barrier in these poll/receive callbacks, otherwise we are subject to the same race as documented above wq_has_sleeper() [in commit a57de0b4 "net: adding memory barrier to the poll and receive callbacks"]. So we need to replace poll_wait() with sock_poll_wait() and use rcu protection for the sk->sk_wq pointer in these two new functions. Signed-off-by: Ying Xue Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 45 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 40 insertions(+), 5 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index fd5f042dbff..b5fc8ed1d1f 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -62,6 +62,8 @@ struct tipc_sock { static int backlog_rcv(struct sock *sk, struct sk_buff *skb); static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf); static void wakeupdispatch(struct tipc_port *tport); +static void tipc_data_ready(struct sock *sk, int len); +static void tipc_write_space(struct sock *sk); static const struct proto_ops packet_ops; static const struct proto_ops stream_ops; @@ -221,6 +223,8 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol, sock_init_data(sock, sk); sk->sk_backlog_rcv = backlog_rcv; sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2; + sk->sk_data_ready = tipc_data_ready; + sk->sk_write_space = tipc_write_space; tipc_sk(sk)->p = tp_ptr; tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT; @@ -435,7 +439,7 @@ static unsigned int poll(struct file *file, struct socket *sock, struct sock *sk = sock->sk; u32 mask = 0; - poll_wait(file, sk_sleep(sk), wait); + sock_poll_wait(file, sk_sleep(sk), wait); switch ((int)sock->state) { case SS_READY: @@ -1125,6 +1129,39 @@ exit: return sz_copied ?
sz_copied : res; } +/** + * tipc_write_space - wake up thread if port congestion is released + * @sk: socket + */ +static void tipc_write_space(struct sock *sk) +{ + struct socket_wq *wq; + + rcu_read_lock(); + wq = rcu_dereference(sk->sk_wq); + if (wq_has_sleeper(wq)) + wake_up_interruptible_sync_poll(&wq->wait, POLLOUT | + POLLWRNORM | POLLWRBAND); + rcu_read_unlock(); +} + +/** + * tipc_data_ready - wake up threads to indicate messages have been received + * @sk: socket + * @len: the length of messages + */ +static void tipc_data_ready(struct sock *sk, int len) +{ + struct socket_wq *wq; + + rcu_read_lock(); + wq = rcu_dereference(sk->sk_wq); + if (wq_has_sleeper(wq)) + wake_up_interruptible_sync_poll(&wq->wait, POLLIN | + POLLRDNORM | POLLRDBAND); + rcu_read_unlock(); +} + /** * rx_queue_full - determine if receive queue can accept another message * @msg: message to be added to queue @@ -1222,8 +1259,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) tipc_disconnect_port(tipc_sk_port(sk)); } - if (waitqueue_active(sk_sleep(sk))) - wake_up_interruptible(sk_sleep(sk)); + sk->sk_data_ready(sk, 0); return TIPC_OK; } @@ -1290,8 +1326,7 @@ static void wakeupdispatch(struct tipc_port *tport) { struct sock *sk = (struct sock *)tport->usr_handle; - if (waitqueue_active(sk_sleep(sk))) - wake_up_interruptible(sk_sleep(sk)); + sk->sk_write_space(sk); } /** -- cgit v1.2.3-70-g09d2 From c4fc298ab44011a7f7a391bf00350acf481eeaeb Mon Sep 17 00:00:00 2001 From: Erik Hugne Date: Tue, 16 Oct 2012 16:47:06 +0200 Subject: tipc: return POLLOUT for sockets in an unconnected state If an implied connect is attempted on a nonblocking STREAM/SEQPACKET socket during link congestion, the connect message will be discarded and sendmsg will return EAGAIN. This is normal behavior, and the application is expected to poll the socket until POLLOUT is set, after which the connection attempt can be retried. However, the POLLOUT flag is never set for unconnected sockets and poll() always returns a zero mask. The application is then left without a trigger for when it can make another attempt at sending the message. The solution is to check if we're polling on an unconnected socket and set the POLLOUT flag if the TIPC port owned by this socket is not congested. The TIPC ports waiting on a specific link will be marked as 'not congested' when the link congestion has abated.
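To illustrate the retry pattern described above, a minimal application-side loop might look like the sketch below. This is only a sketch: the helper name and surrounding error handling are invented for illustration, and only the implied-connect/EAGAIN/POLLOUT semantics come from the text above.

#include <errno.h>
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <linux/tipc.h>

/* Retry an implied connect on a nonblocking SOCK_STREAM TIPC socket:
 * if sendto() fails with EAGAIN (link congestion), poll() until the
 * socket becomes writable, then try again.
 */
static int send_with_retry(int sd, const struct sockaddr_tipc *dest,
			   const void *buf, size_t len)
{
	struct pollfd pfd = { .fd = sd, .events = POLLOUT };

	for (;;) {
		if (sendto(sd, buf, len, 0,
			   (const struct sockaddr *)dest, sizeof(*dest)) >= 0)
			return 0;
		if (errno != EAGAIN)
			return -1;		/* genuine send error */
		if (poll(&pfd, 1, -1) < 0)	/* wait for POLLOUT */
			return -1;
	}
}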
Signed-off-by: Erik Hugne Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'net/tipc') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index b5fc8ed1d1f..59adc76905e 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -412,7 +412,7 @@ static int get_name(struct socket *sock, struct sockaddr *uaddr, * socket state flags set * ------------ --------- * unconnected no read flags - * no write flags + * POLLOUT if port is not congested * * connecting POLLIN/POLLRDNORM if ACK/NACK in rx queue * no write flags @@ -442,6 +442,10 @@ static unsigned int poll(struct file *file, struct socket *sock, sock_poll_wait(file, sk_sleep(sk), wait); switch ((int)sock->state) { + case SS_UNCONNECTED: + if (!tipc_sk_port(sk)->congested) + mask |= POLLOUT; + break; case SS_READY: case SS_CONNECTED: if (!tipc_sk_port(sk)->congested) -- cgit v1.2.3-70-g09d2 From 7503115107e5862870eaf5133627051b2e23ac0a Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 29 Oct 2012 09:38:15 -0400 Subject: tipc: wake up all waiting threads at socket shutdown When a socket is shut down, we should wake up all threads sleeping on it, instead of just one of them. Otherwise, when several threads are polling the same socket, and one of them does shutdown(), the remaining threads may end up sleeping forever. Also, to align socket usage with common practice in other stacks, we use one of the common socket callback handlers, sk_state_change(), to wake up pending users. This is similar to the usage in e.g. inet_shutdown() [net/ipv4/af_inet.c]. Signed-off-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 59adc76905e..1a720c86e80 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1595,10 +1595,11 @@ restart: case SS_DISCONNECTING: - /* Discard any unreceived messages; wake up sleeping tasks */ + /* Discard any unreceived messages */ discard_rx_queue(sk); - if (waitqueue_active(sk_sleep(sk))) - wake_up_interruptible(sk_sleep(sk)); + + /* Wake up anyone sleeping in poll */ + sk->sk_state_change(sk); res = 0; break; -- cgit v1.2.3-70-g09d2 From 3c294cb374bf7ad6f5c2763f994d75935fb7814d Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Thu, 15 Nov 2012 11:34:45 +0800 Subject: tipc: remove the bearer congestion mechanism Currently at the TIPC bearer layer there is the following congestion mechanism: Once sending packets via that bearer has failed, the bearer will at once be flagged as being in a congested state. During bearer congestion, all packets arriving at the link will be queued in the link's outgoing buffer. When we detect that the state of bearer congestion has relaxed (e.g. some packets are received from the bearer) we will try our best to push all packets in the link's outgoing buffer until the buffer is empty, or until the bearer is congested again. However, in fact the TIPC bearer never receives any feedback from the device layer whether a send was successful or not, so it must always assume it was successful. Therefore, the bearer congestion mechanism as it exists currently is of no value. But the bearer blocking state is still useful for us. For example, when the physical media goes down/up, we need to change the state of the links bound to the bearer. So the code maintaining the state information is not removed.
Signed-off-by: Ying Xue Signed-off-by: Paul Gortmaker --- net/tipc/bcast.c | 21 ++++------ net/tipc/bearer.c | 110 +++++------------------------------------------- net/tipc/bearer.h | 24 ++--------- net/tipc/discover.c | 2 +- net/tipc/link.c | 117 +++++++++++++++++----------------------------------- net/tipc/link.h | 4 -- 6 files changed, 61 insertions(+), 217 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index e4e6d8cd47e..40da098eeb3 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -619,16 +619,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf, if (bcbearer->remains_new.count == bcbearer->remains.count) continue; /* bearer pair doesn't add anything */ - if (p->blocked || - p->media->send_msg(buf, p, &p->media->bcast_addr)) { + if (!tipc_bearer_blocked(p)) + tipc_bearer_send(p, buf, &p->media->bcast_addr); + else if (s && !tipc_bearer_blocked(s)) /* unable to send on primary bearer */ - if (!s || s->blocked || - s->media->send_msg(buf, s, - &s->media->bcast_addr)) { - /* unable to send on either bearer */ - continue; - } - } + tipc_bearer_send(s, buf, &s->media->bcast_addr); + else + /* unable to send on either bearer */ + continue; if (s) { bcbearer->bpairs[bp_index].primary = s; @@ -731,8 +729,8 @@ int tipc_bclink_stats(char *buf, const u32 buf_size) " TX naks:%u acks:%u dups:%u\n", s->sent_nacks, s->sent_acks, s->retransmitted); ret += tipc_snprintf(buf + ret, buf_size - ret, - " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n", - s->bearer_congs, s->link_congs, s->max_queue_sz, + " Congestion link:%u Send queue max:%u avg:%u\n", + s->link_congs, s->max_queue_sz, s->queue_sz_counts ? (s->accu_queue_sz / s->queue_sz_counts) : 0); @@ -766,7 +764,6 @@ int tipc_bclink_set_queue_limits(u32 limit) void tipc_bclink_init(void) { - INIT_LIST_HEAD(&bcbearer->bearer.cong_links); bcbearer->bearer.media = &bcbearer->media; bcbearer->media.send_msg = tipc_bcbearer_send; sprintf(bcbearer->media.name, "tipc-broadcast"); diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 4ec5c80e8a7..aa62f93a912 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -279,115 +279,30 @@ void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest) } /* - * bearer_push(): Resolve bearer congestion. Force the waiting - * links to push out their unsent packets, one packet per link - * per iteration, until all packets are gone or congestion reoccurs. - * 'tipc_net_lock' is read_locked when this function is called - * bearer.lock must be taken before calling - * Returns binary true(1) ore false(0) - */ -static int bearer_push(struct tipc_bearer *b_ptr) -{ - u32 res = 0; - struct tipc_link *ln, *tln; - - if (b_ptr->blocked) - return 0; - - while (!list_empty(&b_ptr->cong_links) && (res != PUSH_FAILED)) { - list_for_each_entry_safe(ln, tln, &b_ptr->cong_links, link_list) { - res = tipc_link_push_packet(ln); - if (res == PUSH_FAILED) - break; - if (res == PUSH_FINISHED) - list_move_tail(&ln->link_list, &b_ptr->links); - } - } - return list_empty(&b_ptr->cong_links); -} - -void tipc_bearer_lock_push(struct tipc_bearer *b_ptr) -{ - spin_lock_bh(&b_ptr->lock); - bearer_push(b_ptr); - spin_unlock_bh(&b_ptr->lock); -} - - -/* - * Interrupt enabling new requests after bearer congestion or blocking: + * Interrupt enabling new requests after bearer blocking: * See bearer_send(). 
*/ -void tipc_continue(struct tipc_bearer *b_ptr) +void tipc_continue(struct tipc_bearer *b) { - spin_lock_bh(&b_ptr->lock); - if (!list_empty(&b_ptr->cong_links)) - tipc_k_signal((Handler)tipc_bearer_lock_push, (unsigned long)b_ptr); - b_ptr->blocked = 0; - spin_unlock_bh(&b_ptr->lock); + spin_lock_bh(&b->lock); + b->blocked = 0; + spin_unlock_bh(&b->lock); } /* - * Schedule link for sending of messages after the bearer - * has been deblocked by 'continue()'. This method is called - * when somebody tries to send a message via this link while - * the bearer is congested. 'tipc_net_lock' is in read_lock here - * bearer.lock is busy + * tipc_bearer_blocked - determines if bearer is currently blocked */ -static void tipc_bearer_schedule_unlocked(struct tipc_bearer *b_ptr, - struct tipc_link *l_ptr) +int tipc_bearer_blocked(struct tipc_bearer *b) { - list_move_tail(&l_ptr->link_list, &b_ptr->cong_links); -} - -/* - * Schedule link for sending of messages after the bearer - * has been deblocked by 'continue()'. This method is called - * when somebody tries to send a message via this link while - * the bearer is congested. 'tipc_net_lock' is in read_lock here, - * bearer.lock is free - */ -void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr) -{ - spin_lock_bh(&b_ptr->lock); - tipc_bearer_schedule_unlocked(b_ptr, l_ptr); - spin_unlock_bh(&b_ptr->lock); -} - + int res; -/* - * tipc_bearer_resolve_congestion(): Check if there is bearer congestion, - * and if there is, try to resolve it before returning. - * 'tipc_net_lock' is read_locked when this function is called - */ -int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr, - struct tipc_link *l_ptr) -{ - int res = 1; + spin_lock_bh(&b->lock); + res = b->blocked; + spin_unlock_bh(&b->lock); - if (list_empty(&b_ptr->cong_links)) - return 1; - spin_lock_bh(&b_ptr->lock); - if (!bearer_push(b_ptr)) { - tipc_bearer_schedule_unlocked(b_ptr, l_ptr); - res = 0; - } - spin_unlock_bh(&b_ptr->lock); return res; } -/** - * tipc_bearer_congested - determines if bearer is currently congested - */ -int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr) -{ - if (unlikely(b_ptr->blocked)) - return 1; - if (likely(list_empty(&b_ptr->cong_links))) - return 0; - return !tipc_bearer_resolve_congestion(b_ptr, l_ptr); -} - /** * tipc_enable_bearer - enable bearer with the given name */ @@ -489,7 +404,6 @@ restart: b_ptr->net_plane = bearer_id + 'A'; b_ptr->active = 1; b_ptr->priority = priority; - INIT_LIST_HEAD(&b_ptr->cong_links); INIT_LIST_HEAD(&b_ptr->links); spin_lock_init(&b_ptr->lock); @@ -528,7 +442,6 @@ int tipc_block_bearer(const char *name) pr_info("Blocking bearer <%s>\n", name); spin_lock_bh(&b_ptr->lock); b_ptr->blocked = 1; - list_splice_init(&b_ptr->cong_links, &b_ptr->links); list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) { struct tipc_node *n_ptr = l_ptr->owner; @@ -555,7 +468,6 @@ static void bearer_disable(struct tipc_bearer *b_ptr) spin_lock_bh(&b_ptr->lock); b_ptr->blocked = 1; b_ptr->media->disable_bearer(b_ptr); - list_splice_init(&b_ptr->cong_links, &b_ptr->links); list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) { tipc_link_delete(l_ptr); } diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index dd4c2abf08e..39f1192d04b 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -120,7 +120,6 @@ struct tipc_media { * @identity: array index of this bearer within TIPC bearer array * @link_req: ptr to (optional) structure making periodic link setup 
requests * @links: list of non-congested links associated with bearer - * @cong_links: list of congested links associated with bearer * @active: non-zero if bearer structure is represents a bearer * @net_plane: network plane ('A' through 'H') currently associated with bearer * @nodes: indicates which nodes in cluster can be reached through bearer @@ -143,7 +142,6 @@ struct tipc_bearer { u32 identity; struct tipc_link_req *link_req; struct list_head links; - struct list_head cong_links; int active; char net_plane; struct tipc_node_map nodes; @@ -185,39 +183,23 @@ struct sk_buff *tipc_media_get_names(void); struct sk_buff *tipc_bearer_get_names(void); void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest); void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest); -void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr); struct tipc_bearer *tipc_bearer_find(const char *name); struct tipc_bearer *tipc_bearer_find_interface(const char *if_name); struct tipc_media *tipc_media_find(const char *name); -int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr, - struct tipc_link *l_ptr); -int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr); +int tipc_bearer_blocked(struct tipc_bearer *b_ptr); void tipc_bearer_stop(void); -void tipc_bearer_lock_push(struct tipc_bearer *b_ptr); - /** * tipc_bearer_send- sends buffer to destination over bearer * - * Returns true (1) if successful, or false (0) if unable to send - * * IMPORTANT: * The media send routine must not alter the buffer being passed in * as it may be needed for later retransmission! - * - * If the media send routine returns a non-zero value (indicating that - * it was unable to send the buffer), it must: - * 1) mark the bearer as blocked, - * 2) call tipc_continue() once the bearer is able to send again. - * Media types that are unable to meet these two critera must ensure their - * send routine always returns success -- even if the buffer was not sent -- - * and let TIPC's link code deal with the undelivered message. */ -static inline int tipc_bearer_send(struct tipc_bearer *b_ptr, - struct sk_buff *buf, +static inline void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf, struct tipc_media_addr *dest) { - return !b_ptr->media->send_msg(buf, b_ptr, dest); + b->media->send_msg(buf, b, dest); } #endif /* _TIPC_BEARER_H */ diff --git a/net/tipc/discover.c b/net/tipc/discover.c index 50eaa403eb6..1074b9587e8 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -243,7 +243,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr) if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) { rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr); if (rbuf) { - b_ptr->media->send_msg(rbuf, b_ptr, &media_addr); + tipc_bearer_send(b_ptr, rbuf, &media_addr); kfree_skb(rbuf); } } diff --git a/net/tipc/link.c b/net/tipc/link.c index a79c755cb41..0cc64800ab9 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -872,17 +872,12 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf) return link_send_long_buf(l_ptr, buf); /* Packet can be queued or sent. 
*/ - if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) && + if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) && !link_congested(l_ptr))) { link_add_to_outqueue(l_ptr, buf, msg); - if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) { - l_ptr->unacked_window = 0; - } else { - tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); - l_ptr->stats.bearer_congs++; - l_ptr->next_out = buf; - } + tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + l_ptr->unacked_window = 0; return dsz; } /* Congestion: can message be bundled ? */ @@ -891,10 +886,8 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf) /* Try adding message to an existing bundle */ if (l_ptr->next_out && - link_bundle_buf(l_ptr, l_ptr->last_out, buf)) { - tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr); + link_bundle_buf(l_ptr, l_ptr->last_out, buf)) return dsz; - } /* Try creating a new bundle */ if (size <= max_packet * 2 / 3) { @@ -917,7 +910,6 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf) if (!l_ptr->next_out) l_ptr->next_out = buf; link_add_to_outqueue(l_ptr, buf, msg); - tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr); return dsz; } @@ -1006,16 +998,11 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf, if (likely(!link_congested(l_ptr))) { if (likely(msg_size(msg) <= l_ptr->max_pkt)) { - if (likely(list_empty(&l_ptr->b_ptr->cong_links))) { + if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) { link_add_to_outqueue(l_ptr, buf, msg); - if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, - &l_ptr->media_addr))) { - l_ptr->unacked_window = 0; - return res; - } - tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); - l_ptr->stats.bearer_congs++; - l_ptr->next_out = buf; + tipc_bearer_send(l_ptr->b_ptr, buf, + &l_ptr->media_addr); + l_ptr->unacked_window = 0; return res; } } else @@ -1106,7 +1093,7 @@ exit: /* Exit if link (or bearer) is congested */ if (link_congested(l_ptr) || - !list_empty(&l_ptr->b_ptr->cong_links)) { + tipc_bearer_blocked(l_ptr->b_ptr)) { res = link_schedule_port(l_ptr, sender->ref, res); goto exit; @@ -1329,15 +1316,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr) if (r_q_size && buf) { msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); - if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { - l_ptr->retransm_queue_head = mod(++r_q_head); - l_ptr->retransm_queue_size = --r_q_size; - l_ptr->stats.retransmitted++; - return 0; - } else { - l_ptr->stats.bearer_congs++; - return PUSH_FAILED; - } + tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + l_ptr->retransm_queue_head = mod(++r_q_head); + l_ptr->retransm_queue_size = --r_q_size; + l_ptr->stats.retransmitted++; + return 0; } /* Send deferred protocol message, if any: */ @@ -1345,15 +1328,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr) if (buf) { msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); - if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { - l_ptr->unacked_window = 0; - kfree_skb(buf); - l_ptr->proto_msg_queue = NULL; - return 0; - } else { - l_ptr->stats.bearer_congs++; - return PUSH_FAILED; - } + tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + l_ptr->unacked_window = 0; + kfree_skb(buf); + l_ptr->proto_msg_queue = NULL; + return 0; } /* Send one deferred data message, if send window not full: */ @@ -1366,18 +1345,14 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr) if (mod(next - 
first) < l_ptr->queue_limit[0]) { msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { - if (msg_user(msg) == MSG_BUNDLER) - msg_set_type(msg, CLOSED_MSG); - l_ptr->next_out = buf->next; - return 0; - } else { - l_ptr->stats.bearer_congs++; - return PUSH_FAILED; - } + tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + if (msg_user(msg) == MSG_BUNDLER) + msg_set_type(msg, CLOSED_MSG); + l_ptr->next_out = buf->next; + return 0; } } - return PUSH_FINISHED; + return 1; } /* @@ -1388,15 +1363,12 @@ void tipc_link_push_queue(struct tipc_link *l_ptr) { u32 res; - if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) + if (tipc_bearer_blocked(l_ptr->b_ptr)) return; do { res = tipc_link_push_packet(l_ptr); } while (!res); - - if (res == PUSH_FAILED) - tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); } static void link_reset_all(unsigned long addr) @@ -1481,7 +1453,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf, msg = buf_msg(buf); - if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { + if (tipc_bearer_blocked(l_ptr->b_ptr)) { if (l_ptr->retransm_queue_size == 0) { l_ptr->retransm_queue_head = msg_seqno(msg); l_ptr->retransm_queue_size = retransmits; @@ -1491,7 +1463,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf, } return; } else { - /* Detect repeated retransmit failures on uncongested bearer */ + /* Detect repeated retransmit failures on unblocked bearer */ if (l_ptr->last_retransmitted == msg_seqno(msg)) { if (++l_ptr->stale_count > 100) { link_retransmit_failure(l_ptr, buf); @@ -1507,17 +1479,10 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf, msg = buf_msg(buf); msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { - buf = buf->next; - retransmits--; - l_ptr->stats.retransmitted++; - } else { - tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); - l_ptr->stats.bearer_congs++; - l_ptr->retransm_queue_head = buf_seqno(buf); - l_ptr->retransm_queue_size = retransmits; - return; - } + tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); + buf = buf->next; + retransmits--; + l_ptr->stats.retransmitted++; } l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0; @@ -1972,21 +1937,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); - /* Defer message if bearer is already congested */ - if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { - l_ptr->proto_msg_queue = buf; - return; - } - - /* Defer message if attempting to send results in bearer congestion */ - if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { - tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); + /* Defer message if bearer is already blocked */ + if (tipc_bearer_blocked(l_ptr->b_ptr)) { l_ptr->proto_msg_queue = buf; - l_ptr->stats.bearer_congs++; return; } - /* Discard message if it was sent successfully */ + tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); l_ptr->unacked_window = 0; kfree_skb(buf); } @@ -2937,8 +2894,8 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size) s->sent_nacks, s->sent_acks, s->retransmitted); ret += tipc_snprintf(buf + ret, buf_size - ret, - " Congestion bearer:%u link:%u Send queue" - " max:%u avg:%u\n", s->bearer_congs, s->link_congs, + " Congestion link:%u Send queue" + " max:%u avg:%u\n", 
s->link_congs, s->max_queue_sz, s->queue_sz_counts ? (s->accu_queue_sz / s->queue_sz_counts) : 0); diff --git a/net/tipc/link.h b/net/tipc/link.h index 6e921121be0..c048ed1cbd7 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -40,9 +40,6 @@ #include "msg.h" #include "node.h" -#define PUSH_FAILED 1 -#define PUSH_FINISHED 2 - /* * Out-of-range value for link sequence numbers */ @@ -82,7 +79,6 @@ struct tipc_stats { u32 recv_fragmented; u32 recv_fragments; u32 link_congs; /* # port sends blocked by congestion */ - u32 bearer_congs; u32 deferred_recv; u32 duplicates; u32 max_queue_sz; /* send queue size high water mark */ -- cgit v1.2.3-70-g09d2 From 818f4da526656a100c637b098be06316fd4624e4 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Fri, 16 Nov 2012 13:51:29 +0800 Subject: tipc: remove supportable flag from bclink structure The "supportable" flag in bclink structure is a compatibility flag indicating whether a peer node is capable of receiving TIPC broadcast messages. However, all TIPC versions since tipc-1.5, and after the inclusion in the upstream Linux kernel in 2006, support this capability. It is highly unlikely that anybody is still using such an old version of TIPC, let alone that they want to mix it with TIPC-2.0 nodes. Therefore, we now remove the "supportable" flag. Signed-off-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 4 +--- net/tipc/node.c | 8 +++----- net/tipc/node.h | 2 -- 3 files changed, 4 insertions(+), 10 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/link.c b/net/tipc/link.c index 0cc64800ab9..f3a078fe70c 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1426,8 +1426,7 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, tipc_addr_string_fill(addr_string, n_ptr->addr); pr_info("Broadcast link info for %s\n", addr_string); - pr_info("Supportable: %d, Supported: %d, Acked: %u\n", - n_ptr->bclink.supportable, + pr_info("Supported: %d, Acked: %u\n", n_ptr->bclink.supported, n_ptr->bclink.acked); pr_info("Last in: %u, Oos state: %u, Last sent: %u\n", @@ -2014,7 +2013,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) } else { l_ptr->max_pkt = l_ptr->max_pkt_target; } - l_ptr->owner->bclink.supportable = (max_pkt_info != 0); /* Synchronize broadcast link info, if not done previously */ if (!tipc_node_is_up(l_ptr->owner)) { diff --git a/net/tipc/node.c b/net/tipc/node.c index d21db204e25..6f3e9b35d27 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -264,11 +264,9 @@ static void node_established_contact(struct tipc_node *n_ptr) { tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr); - if (n_ptr->bclink.supportable) { - n_ptr->bclink.acked = tipc_bclink_get_last_sent(); - tipc_bclink_add_node(n_ptr->addr); - n_ptr->bclink.supported = 1; - } + n_ptr->bclink.acked = tipc_bclink_get_last_sent(); + tipc_bclink_add_node(n_ptr->addr); + n_ptr->bclink.supported = 1; } static void node_name_purge_complete(unsigned long node_addr) diff --git a/net/tipc/node.h b/net/tipc/node.h index cfcaf4d6e48..3ac905f36b0 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -67,7 +67,6 @@ * @permit_changeover: non-zero if node has redundant links to this system * @signature: node instance identifier * @bclink: broadcast-related info - * @supportable: non-zero if node supports TIPC b'cast link capability * @supported: non-zero if node supports TIPC b'cast capability * @acked: sequence # of last outbound b'cast message acknowledged by node * @last_in: sequence # of last in-sequence 
b'cast message received from node @@ -92,7 +91,6 @@ struct tipc_node { int permit_changeover; u32 signature; struct { - u8 supportable; u8 supported; u32 acked; u32 last_in; -- cgit v1.2.3-70-g09d2 From 389dd9bcf65e10929cedfeb79c49bd02069b8899 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Fri, 16 Nov 2012 13:51:30 +0800 Subject: tipc: rename supported flag to recv_permitted Rename the "supported" flag in bclink structure to "recv_permitted" to better reflect what it is used for. When this flag is set for a given node, we are permitted to receive and acknowledge broadcast messages from that node. Convert it to a bool at the same time, since it is not used to store any numerical values. Signed-off-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: Paul Gortmaker --- net/tipc/bcast.c | 6 +++--- net/tipc/link.c | 8 ++++---- net/tipc/node.c | 6 +++--- net/tipc/node.h | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 40da098eeb3..54f89f90ac3 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -347,7 +347,7 @@ static void bclink_peek_nack(struct tipc_msg *msg) tipc_node_lock(n_ptr); - if (n_ptr->bclink.supported && + if (n_ptr->bclink.recv_permitted && (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) && (n_ptr->bclink.last_in == msg_bcgap_after(msg))) n_ptr->bclink.oos_state = 2; @@ -429,7 +429,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) goto exit; tipc_node_lock(node); - if (unlikely(!node->bclink.supported)) + if (unlikely(!node->bclink.recv_permitted)) goto unlock; /* Handle broadcast protocol message */ @@ -564,7 +564,7 @@ exit: u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) { - return (n_ptr->bclink.supported && + return (n_ptr->bclink.recv_permitted && (tipc_bclink_get_last_sent() != n_ptr->bclink.acked)); } diff --git a/net/tipc/link.c b/net/tipc/link.c index f3a078fe70c..20f128fc2be 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1426,8 +1426,8 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, tipc_addr_string_fill(addr_string, n_ptr->addr); pr_info("Broadcast link info for %s\n", addr_string); - pr_info("Supported: %d, Acked: %u\n", - n_ptr->bclink.supported, + pr_info("Reception permitted: %d, Acked: %u\n", + n_ptr->bclink.recv_permitted, n_ptr->bclink.acked); pr_info("Last in: %u, Oos state: %u, Last sent: %u\n", n_ptr->bclink.last_in, @@ -1640,7 +1640,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) ackd = msg_ack(msg); /* Release acked messages */ - if (n_ptr->bclink.supported) + if (n_ptr->bclink.recv_permitted) tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); crs = l_ptr->first_out; @@ -2067,7 +2067,7 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) } /* Protocol message before retransmits, reduce loss risk */ - if (l_ptr->owner->bclink.supported) + if (l_ptr->owner->bclink.recv_permitted) tipc_bclink_update_link_state(l_ptr->owner, msg_last_bcast(msg)); diff --git a/net/tipc/node.c b/net/tipc/node.c index 6f3e9b35d27..283a59f0f1c 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -266,7 +266,7 @@ static void node_established_contact(struct tipc_node *n_ptr) n_ptr->bclink.acked = tipc_bclink_get_last_sent(); tipc_bclink_add_node(n_ptr->addr); - n_ptr->bclink.supported = 1; + n_ptr->bclink.recv_permitted = true; } static void node_name_purge_complete(unsigned long node_addr) @@ -292,7 +292,7 @@ static void node_lost_contact(struct tipc_node *n_ptr) tipc_addr_string_fill(addr_string, 
n_ptr->addr)); /* Flush broadcast link info associated with lost node */ - if (n_ptr->bclink.supported) { + if (n_ptr->bclink.recv_permitted) { while (n_ptr->bclink.deferred_head) { struct sk_buff *buf = n_ptr->bclink.deferred_head; n_ptr->bclink.deferred_head = buf->next; @@ -308,7 +308,7 @@ static void node_lost_contact(struct tipc_node *n_ptr) tipc_bclink_remove_node(n_ptr->addr); tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ); - n_ptr->bclink.supported = 0; + n_ptr->bclink.recv_permitted = false; } /* Abort link changeover */ diff --git a/net/tipc/node.h b/net/tipc/node.h index 3ac905f36b0..3c189b35b10 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -67,7 +67,6 @@ * @permit_changeover: non-zero if node has redundant links to this system * @signature: node instance identifier * @bclink: broadcast-related info - * @supported: non-zero if node supports TIPC b'cast capability * @acked: sequence # of last outbound b'cast message acknowledged by node * @last_in: sequence # of last in-sequence b'cast message received from node * @last_sent: sequence # of last b'cast message sent by node @@ -76,6 +75,7 @@ * @deferred_head: oldest OOS b'cast message received from node * @deferred_tail: newest OOS b'cast message received from node * @defragm: list of partially reassembled b'cast message fragments from node + * @recv_permitted: true if node is allowed to receive b'cast messages */ struct tipc_node { u32 addr; @@ -91,7 +91,6 @@ struct tipc_node { int permit_changeover; u32 signature; struct { - u8 supported; u32 acked; u32 last_in; u32 last_sent; @@ -100,6 +99,7 @@ struct tipc_node { struct sk_buff *deferred_head; struct sk_buff *deferred_tail; struct sk_buff *defragm; + bool recv_permitted; } bclink; }; -- cgit v1.2.3-70-g09d2 From c64f7a6a1fb13565687ae5415736095f82557880 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 16 Nov 2012 13:51:31 +0800 Subject: tipc: introduce message to synchronize broadcast link Upon establishing a first link between two nodes, there is currently a risk that the two endpoints will disagree on the exact sequence number at which reception and acknowledging of broadcast packets should start. The following scenarios may happen: 1: Node A sends an ACTIVATE message to B, telling it to start acking packets from sequence number N. 2: Node A sends out broadcast N, but does not expect an acknowledge from B, since B is not yet in its broadcast receiver's list. 3: Node A receives ACK for N from all nodes except B, and releases packet N. 4: Node B receives the ACTIVATE, activates its link endpoint, and stores the value N as sequence number of first expected packet. 5: Node B sends a NAME_DISTR message to A. 6: Node A receives the NAME_DISTR message, and activates its endpoint. At this moment B is added to A's broadcast receiver's set. Node A also sets sequence number 0 as the first broadcast packet to be received from B. 7: Node A sends broadcast N+1. 8: B receives N+1, determines there is a gap in the sequence, since it is expecting N, and sends a NACK for N back to A. 9: Node A has already released N, so no retransmission is possible. The broadcast link in direction A->B is stale. In addition to, or instead of, 7-9 above, the following may happen: 10: Node B sends broadcast M > 0 to A. 11: Node A receives M, falsely decides there must be a gap, since it is expecting packet 0, and asks for retransmission of packets [0,M-1]. 12: Node B has already released these packets, so the broadcast link is stale in direction B->A.
We solve this problem by introducing a new unicast message type, BCAST_PROTOCOL/STATE, to convey the sequence number of the next sent broadcast packet to the other endpoint, at exactly the moment that endpoint is added to the own node's broadcast receivers list, and before any other unicast messages are permitted to be sent. Furthermore, we don't allow any node to start receiving and processing broadcast packets until this new synchronization message has been received. To maintain backwards compatibility, we still open up for broadcast reception if we receive a NAME_DISTR message without any preceding broadcast sync message. In this case, we must assume that the other end has an older code version, and will never send out the new synchronization message. Hence, for mixed old and new nodes, the issue arising in 7-12 of the above may happen with the same probability as before. Signed-off-by: Jon Maloy Signed-off-by: Ying Xue Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- net/tipc/node.c | 5 ++--- 2 files changed, 60 insertions(+), 6 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/link.c b/net/tipc/link.c index 20f128fc2be..87bf5aad704 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1,7 +1,7 @@ /* * net/tipc/link.c: TIPC link code * - * Copyright (c) 1996-2007, Ericsson AB + * Copyright (c) 1996-2007, 2012, Ericsson AB * Copyright (c) 2004-2007, 2010-2011, Wind River Systems * All rights reserved. * @@ -103,6 +103,8 @@ static void link_reset_statistics(struct tipc_link *l_ptr); static void link_print(struct tipc_link *l_ptr, const char *str); static void link_start(struct tipc_link *l_ptr); static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf); +static void tipc_link_send_sync(struct tipc_link *l); +static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf); /* * Simple link routines @@ -712,6 +714,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) link_activate(l_ptr); tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; + if (l_ptr->owner->working_links == 1) + tipc_link_send_sync(l_ptr); link_set_timer(l_ptr, cont_intv); break; case RESET_MSG: @@ -745,6 +749,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) link_activate(l_ptr); tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; + if (l_ptr->owner->working_links == 1) + tipc_link_send_sync(l_ptr); link_set_timer(l_ptr, cont_intv); break; case RESET_MSG: @@ -941,7 +947,48 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector) return res; } -/** +/* + * tipc_link_send_sync - synchronize broadcast link endpoints. + * + * Give a newly added peer node the sequence number where it should + * start receiving and acking broadcast packets. + * + * Called with node locked + */ +static void tipc_link_send_sync(struct tipc_link *l) +{ + struct sk_buff *buf; + struct tipc_msg *msg; + + buf = tipc_buf_acquire(INT_H_SIZE); + if (!buf) + return; + + msg = buf_msg(buf); + tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr); + msg_set_last_bcast(msg, l->owner->bclink.acked); + link_add_chain_to_outqueue(l, buf, 0); + tipc_link_push_queue(l); +} + +/* + * tipc_link_recv_sync - synchronize broadcast link endpoints. + * Receive the sequence number where we should start receiving and + * acking broadcast packets from a newly added peer node, and open + * up for reception of such packets. 
+ * + * Called with node locked + */ +static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf) +{ + struct tipc_msg *msg = buf_msg(buf); + + n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg); + n->bclink.recv_permitted = true; + kfree_skb(buf); +} + +/* * tipc_link_send_names - send name table entries to new neighbor * * Send routine for bulk delivery of name table messages when contact @@ -1691,9 +1738,14 @@ deliver: tipc_link_recv_bundle(buf); continue; case NAME_DISTRIBUTOR: + n_ptr->bclink.recv_permitted = true; tipc_node_unlock(n_ptr); tipc_named_recv(buf); continue; + case BCAST_PROTOCOL: + tipc_link_recv_sync(n_ptr, buf); + tipc_node_unlock(n_ptr); + continue; case CONN_MANAGER: tipc_node_unlock(n_ptr); tipc_port_recv_proto_msg(buf); @@ -1736,16 +1788,19 @@ deliver: continue; } + /* Link is not in state WORKING_WORKING */ if (msg_user(msg) == LINK_PROTOCOL) { link_recv_proto_msg(l_ptr, buf); head = link_insert_deferred_queue(l_ptr, head); tipc_node_unlock(n_ptr); continue; } + + /* Traffic message. Conditionally activate link */ link_state_event(l_ptr, TRAFFIC_MSG_EVT); if (link_working_working(l_ptr)) { - /* Re-insert in front of queue */ + /* Re-insert buffer in front of queue */ buf->next = head; head = buf; tipc_node_unlock(n_ptr); diff --git a/net/tipc/node.c b/net/tipc/node.c index 283a59f0f1c..48f39dd3eae 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1,7 +1,7 @@ /* * net/tipc/node.c: TIPC node management routines * - * Copyright (c) 2000-2006, Ericsson AB + * Copyright (c) 2000-2006, 2012 Ericsson AB * Copyright (c) 2005-2006, 2010-2011, Wind River Systems * All rights reserved. * @@ -263,10 +263,9 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) static void node_established_contact(struct tipc_node *n_ptr) { tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr); - + n_ptr->bclink.oos_state = 0; n_ptr->bclink.acked = tipc_bclink_get_last_sent(); tipc_bclink_add_node(n_ptr->addr); - n_ptr->bclink.recv_permitted = true; } static void node_name_purge_complete(unsigned long node_addr) -- cgit v1.2.3-70-g09d2 From 4cb7d55ab4c4726c5d966d5a19e7b304345e74ca Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Fri, 16 Nov 2012 13:51:32 +0800 Subject: tipc: eliminate an unnecessary cast of node variable As the variable:node is currently defined to u32 type, it is unnecessary to cast its type to u32 again when using it. Signed-off-by: Ying Xue Signed-off-by: Paul Gortmaker --- net/tipc/name_distr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net/tipc') diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 55d3928dfd6..e0d08055754 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -262,7 +262,7 @@ void tipc_named_node_up(unsigned long nodearg) named_distribute(&message_list, node, &publ_zone, max_item_buf); read_unlock_bh(&tipc_nametbl_lock); - tipc_link_send_names(&message_list, (u32)node); + tipc_link_send_names(&message_list, node); } /** -- cgit v1.2.3-70-g09d2 From 94fc9c4719f53264c7cce62ee558781fee7b7128 Mon Sep 17 00:00:00 2001 From: Paul Gortmaker Date: Wed, 21 Nov 2012 20:11:54 -0500 Subject: tipc: delete TIPC_ADVANCED Kconfig variable There used to be a time when TIPC had lots of Kconfig knobs the end user could alter, but they have all been made automatic or obsolete, with the exception of CONFIG_TIPC_PORTS. This previously existing set of options was all hidden under the TIPC_ADVANCED setting, which does not exist in any code, but only in Kconfig scope. 
Having this now, just to hide the one remaining "advanced" option no longer makes sense. Remove it. Also get rid of the ifdeffery in the TIPC code that allowed for TIPC_PORTS to be possibly undefined. Signed-off-by: Paul Gortmaker --- net/tipc/Kconfig | 13 +------------ net/tipc/core.c | 5 ----- 2 files changed, 1 insertion(+), 17 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/Kconfig b/net/tipc/Kconfig index 585460180ff..bc41bd31ead 100644 --- a/net/tipc/Kconfig +++ b/net/tipc/Kconfig @@ -20,18 +20,9 @@ menuconfig TIPC If in doubt, say N. -if TIPC - -config TIPC_ADVANCED - bool "Advanced TIPC configuration" - default n - help - Saying Y here will open some advanced configuration for TIPC. - Most users do not need to bother; if unsure, just say N. - config TIPC_PORTS int "Maximum number of ports in a node" - depends on TIPC_ADVANCED + depends on TIPC range 127 65535 default "8191" help @@ -40,5 +31,3 @@ config TIPC_PORTS Setting this to a smaller value saves some memory, setting it to higher allows for more ports. - -endif # TIPC diff --git a/net/tipc/core.c b/net/tipc/core.c index bfe8af88469..fc05cecd748 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -42,11 +42,6 @@ #include -#ifndef CONFIG_TIPC_PORTS -#define CONFIG_TIPC_PORTS 8191 -#endif - - /* global variables used by multiple sub-systems within TIPC */ int tipc_random __read_mostly; -- cgit v1.2.3-70-g09d2 From c008413850d1d48cc02c940280bf2dcf76160f4c Mon Sep 17 00:00:00 2001 From: Erik Hugne Date: Wed, 7 Nov 2012 02:40:07 -0500 Subject: tipc: remove obsolete flush of stale reassembly buffer Each link instance has a periodic job checking if there is a stale ongoing message reassembly associated to the link. If no new fragment has been received during the last 4*[link_tolerance] period, it is assumed the missing fragment will never arrive. As a consequence, the reassembly buffer is discarded, and a gap in the message sequence occurs. This assumption is wrong. After we abandoned our ambition to develop packet routing for multi-cluster networks, only single-hop packet transfer remains as an option. For those, all packets are guaranteed to be delivered in sequence to the defragmentation layer. Any failure to achieve sequenced delivery will eventually lead to link reset, and the reassembly buffer will be flushed anyway. So we just remove this periodic check, which is now obsolete. 
Signed-off-by: Erik Hugne Acked-by: Ying Xue Signed-off-by: Jon Maloy [PG: also delete get/inc_timer count, since they are now unused] Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 44 -------------------------------------------- 1 file changed, 44 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/link.c b/net/tipc/link.c index 87bf5aad704..daa6080a2a0 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -97,7 +97,6 @@ static int link_send_sections_long(struct tipc_port *sender, struct iovec const *msg_sect, u32 num_sect, unsigned int total_len, u32 destnode); -static void link_check_defragm_bufs(struct tipc_link *l_ptr); static void link_state_event(struct tipc_link *l_ptr, u32 event); static void link_reset_statistics(struct tipc_link *l_ptr); static void link_print(struct tipc_link *l_ptr, const char *str); @@ -271,7 +270,6 @@ static void link_timeout(struct tipc_link *l_ptr) } /* do all other link processing performed on a periodic basis */ - link_check_defragm_bufs(l_ptr); link_state_event(l_ptr, TIMEOUT_EVT); @@ -2497,16 +2495,6 @@ static void set_expected_frags(struct sk_buff *buf, u32 exp) msg_set_bcast_ack(buf_msg(buf), exp); } -static u32 get_timer_cnt(struct sk_buff *buf) -{ - return msg_reroute_cnt(buf_msg(buf)); -} - -static void incr_timer_cnt(struct sk_buff *buf) -{ - msg_incr_reroute_cnt(buf_msg(buf)); -} - /* * tipc_link_recv_fragment(): Called with node lock on. Returns * the reassembled buffer if message is complete. @@ -2585,38 +2573,6 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, return 0; } -/** - * link_check_defragm_bufs - flush stale incoming message fragments - * @l_ptr: pointer to link - */ -static void link_check_defragm_bufs(struct tipc_link *l_ptr) -{ - struct sk_buff *prev = NULL; - struct sk_buff *next = NULL; - struct sk_buff *buf = l_ptr->defragm_buf; - - if (!buf) - return; - if (!link_working_working(l_ptr)) - return; - while (buf) { - u32 cnt = get_timer_cnt(buf); - - next = buf->next; - if (cnt < 4) { - incr_timer_cnt(buf); - prev = buf; - } else { - if (prev) - prev->next = buf->next; - else - l_ptr->defragm_buf = buf->next; - kfree_skb(buf); - } - buf = next; - } -} - static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance) { if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL)) -- cgit v1.2.3-70-g09d2 From 9da3d475874f4da49057767913af95ce01063ba3 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Tue, 27 Nov 2012 06:15:27 -0500 Subject: tipc: eliminate aggregate sk_receive_queue limit As a complement to the per-socket sk_recv_queue limit, TIPC keeps a global atomic counter for the sum of sk_recv_queue sizes across all tipc sockets. When incremented, the counter is compared to an upper threshold value, and if this is reached, the message is rejected with error code TIPC_OVERLOAD. This check was originally meant to protect the node against buffer exhaustion and general CPU overload. However, all experience indicates that the feature not only is redundant on Linux, but even harmful. Users run into the limit very often, causing disturbances for their applications, while removing it seems to have no negative effects at all. We have also seen that overall performance is boosted significantly when this bottleneck is removed. Furthermore, we don't see any other network protocols maintaining such a mechanism, something strengthening our conviction that this control can be eliminated. As a result, the atomic variable tipc_queue_size is now unused and so it can be deleted. 
There is a getsockopt call that used to allow reading it; we retain that but just return zero for maximum compatibility. Signed-off-by: Ying Xue Signed-off-by: Jon Maloy Cc: Neil Horman [PG: phase out tipc_queue_size as pointed out by Neil Horman] Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 23 ++++------------------- 1 file changed, 4 insertions(+), 19 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 1a720c86e80..848be692cb4 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -2,7 +2,7 @@ * net/tipc/socket.c: TIPC socket API * * Copyright (c) 2001-2007, Ericsson AB - * Copyright (c) 2004-2008, 2010-2011, Wind River Systems + * Copyright (c) 2004-2008, 2010-2012, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -73,8 +73,6 @@ static struct proto tipc_proto; static int sockets_enabled; -static atomic_t tipc_queue_size = ATOMIC_INIT(0); - /* * Revised TIPC socket locking policy: * @@ -128,7 +126,6 @@ static atomic_t tipc_queue_size = ATOMIC_INIT(0); static void advance_rx_queue(struct sock *sk) { kfree_skb(__skb_dequeue(&sk->sk_receive_queue)); - atomic_dec(&tipc_queue_size); } /** @@ -140,10 +137,8 @@ static void discard_rx_queue(struct sock *sk) { struct sk_buff *buf; - while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { - atomic_dec(&tipc_queue_size); + while ((buf = __skb_dequeue(&sk->sk_receive_queue))) kfree_skb(buf); - } } /** @@ -155,10 +150,8 @@ static void reject_rx_queue(struct sock *sk) { struct sk_buff *buf; - while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { + while ((buf = __skb_dequeue(&sk->sk_receive_queue))) tipc_reject_msg(buf, TIPC_ERR_NO_PORT); - atomic_dec(&tipc_queue_size); - } } /** @@ -280,7 +273,6 @@ static int release(struct socket *sock) buf = __skb_dequeue(&sk->sk_receive_queue); if (buf == NULL) break; - atomic_dec(&tipc_queue_size); if (TIPC_SKB_CB(buf)->handle != 0) kfree_skb(buf); else { @@ -1241,11 +1233,6 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) } /* Reject message if there isn't room to queue it */ - recv_q_len = (u32)atomic_read(&tipc_queue_size); - if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) { - if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE)) - return TIPC_ERR_OVERLOAD; - } recv_q_len = skb_queue_len(&sk->sk_receive_queue); if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) { if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2)) @@ -1254,7 +1241,6 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) /* Enqueue message (finally!) 
*/ TIPC_SKB_CB(buf)->handle = 0; - atomic_inc(&tipc_queue_size); __skb_queue_tail(&sk->sk_receive_queue, buf); /* Initiate connection termination for an incoming 'FIN' */ if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) { sock->state = SS_DISCONNECTING; tipc_disconnect_port(tipc_sk_port(sk)); } sk->sk_data_ready(sk, 0); @@ -1578,7 +1564,6 @@ restart: /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */ buf = __skb_dequeue(&sk->sk_receive_queue); if (buf) { - atomic_dec(&tipc_queue_size); if (TIPC_SKB_CB(buf)->handle != 0) { kfree_skb(buf); goto restart; @@ -1717,7 +1702,7 @@ static int getsockopt(struct socket *sock, /* no need to set "res", since already 0 at this point */ break; case TIPC_NODE_RECVQ_DEPTH: - value = (u32)atomic_read(&tipc_queue_size); + value = 0; /* was tipc_queue_size, now obsolete */ break; case TIPC_SOCK_RECVQ_DEPTH: value = skb_queue_len(&sk->sk_receive_queue); -- cgit v1.2.3-70-g09d2 From e643df156ade104b0430588562d25b8638683fc1 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Tue, 27 Nov 2012 06:15:29 -0500 Subject: tipc: change sk_receive_queue upper limit The sk_recv_queue upper limit for connectionless sockets has empirically turned out to be too low. When we double the current limit we get far fewer rejected messages and no noticeable negative side-effects. Signed-off-by: Jon Maloy Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 848be692cb4..f553fdecc1d 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1,7 +1,7 @@ /* * net/tipc/socket.c: TIPC socket API * - * Copyright (c) 2001-2007, Ericsson AB + * Copyright (c) 2001-2007, 2012 Ericsson AB * Copyright (c) 2004-2008, 2010-2012, Wind River Systems * All rights reserved. * @@ -43,7 +43,7 @@ #define SS_LISTENING -1 /* socket is listening */ #define SS_READY -2 /* socket is connectionless */ -#define OVERLOAD_LIMIT_BASE 5000 +#define OVERLOAD_LIMIT_BASE 10000 #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ struct tipc_sock { -- cgit v1.2.3-70-g09d2 From bc879117d4cf2a6fcf5c5a43f157143bbbe88e84 Mon Sep 17 00:00:00 2001 From: Paul Gortmaker Date: Thu, 29 Nov 2012 13:48:40 -0500 Subject: tipc: standardize across connect/disconnect function naming Currently we have tipc_disconnect and tipc_disconnect_port. It is not clear from the names alone what they do or how they differ. It turns out that tipc_disconnect just deals with the port locking and then calls tipc_disconnect_port which does all the work. If we rename as follows: tipc_disconnect_port --> __tipc_disconnect then we will be following typical Linux convention, where: __tipc_disconnect: "raw" function that does all the work. tipc_disconnect: wrapper that deals with locking and then calls the real core __tipc_disconnect function With this, the difference is immediately evident, and locking violations are more apt to be spotted by chance while working on, or even just while reading the code. On the connect side of things, we currently only have the single "tipc_connect2port" function. It does both the locking at enter/exit, and the core of the work. Pending changes will make it desirable to have the connect be a two-part locking wrapper + worker function, just like the disconnect is already. Here, we make the connect look just like the updated disconnect case, for the above reason, and for consistency. In the process, we also get rid of the "2port" suffix that was on the original name, since it adds no descriptive value.
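Schematically, the wrapper/worker convention described above has the following shape. This is a generic, self-contained sketch with invented names (a pthread mutex stands in for the port lock, and the -EISCONN return mirrors kernel style); the real tipc_connect/__tipc_connect pair appears in the diff below.

#include <errno.h>
#include <pthread.h>

struct obj {
	pthread_mutex_t lock;
	int connected;
};

/* __obj_connect - "raw" worker that does all the work.
 * Caller must already hold o->lock.
 */
static int __obj_connect(struct obj *o)
{
	if (o->connected)
		return -EISCONN;
	o->connected = 1;
	return 0;
}

/* obj_connect - wrapper that only deals with locking, then
 * calls the real core __obj_connect function.
 */
static int obj_connect(struct obj *o)
{
	int res;

	pthread_mutex_lock(&o->lock);
	res = __obj_connect(o);
	pthread_mutex_unlock(&o->lock);
	return res;
}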
On close examination, one might notice that the above connect changes implicitly move the call to tipc_link_get_max_pkt() to be within the scope of tipc_port_lock() protected region; when it was not previously. We don't see any issues with this, and it is in keeping with __tipc_connect doing the work and tipc_connect just handling the locking. Signed-off-by: Paul Gortmaker --- net/tipc/port.c | 32 +++++++++++++++++++++++--------- net/tipc/port.h | 6 ++++-- net/tipc/socket.c | 6 +++--- net/tipc/subscr.c | 2 +- 4 files changed, 31 insertions(+), 15 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/port.c b/net/tipc/port.c index 07c42fba672..18098cac62f 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -726,7 +726,7 @@ static void port_dispatcher_sigh(void *dummy) if (unlikely(!cb)) goto reject; if (unlikely(!connected)) { - if (tipc_connect2port(dref, &orig)) + if (tipc_connect(dref, &orig)) goto reject; } else if (peer_invalid) goto reject; @@ -1036,15 +1036,30 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) return res; } -int tipc_connect2port(u32 ref, struct tipc_portid const *peer) +int tipc_connect(u32 ref, struct tipc_portid const *peer) { struct tipc_port *p_ptr; - struct tipc_msg *msg; - int res = -EINVAL; + int res; p_ptr = tipc_port_lock(ref); if (!p_ptr) return -EINVAL; + res = __tipc_connect(ref, p_ptr, peer); + tipc_port_unlock(p_ptr); + return res; +} + +/* + * __tipc_connect - connect to a remote peer + * + * Port must be locked. + */ +int __tipc_connect(u32 ref, struct tipc_port *p_ptr, + struct tipc_portid const *peer) +{ + struct tipc_msg *msg; + int res = -EINVAL; + if (p_ptr->published || p_ptr->connected) goto exit; if (!peer->ref) @@ -1067,17 +1082,16 @@ int tipc_connect2port(u32 ref, struct tipc_portid const *peer) (net_ev_handler)port_handle_node_down); res = 0; exit: - tipc_port_unlock(p_ptr); p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref); return res; } -/** - * tipc_disconnect_port - disconnect port from peer +/* + * __tipc_disconnect - disconnect port from peer * * Port must be locked. 
*/ -int tipc_disconnect_port(struct tipc_port *tp_ptr) +int __tipc_disconnect(struct tipc_port *tp_ptr) { int res; @@ -1104,7 +1118,7 @@ int tipc_disconnect(u32 ref) p_ptr = tipc_port_lock(ref); if (!p_ptr) return -EINVAL; - res = tipc_disconnect_port(p_ptr); + res = __tipc_disconnect(p_ptr); tipc_port_unlock(p_ptr); return res; } diff --git a/net/tipc/port.h b/net/tipc/port.h index 4660e306579..fb66e2e5f4d 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -190,7 +190,7 @@ int tipc_publish(u32 portref, unsigned int scope, int tipc_withdraw(u32 portref, unsigned int scope, struct tipc_name_seq const *name_seq); -int tipc_connect2port(u32 portref, struct tipc_portid const *port); +int tipc_connect(u32 portref, struct tipc_portid const *port); int tipc_disconnect(u32 portref); @@ -200,7 +200,9 @@ int tipc_shutdown(u32 ref); /* * The following routines require that the port be locked on entry */ -int tipc_disconnect_port(struct tipc_port *tp_ptr); +int __tipc_disconnect(struct tipc_port *tp_ptr); +int __tipc_connect(u32 ref, struct tipc_port *p_ptr, + struct tipc_portid const *peer); int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg); /* diff --git a/net/tipc/socket.c b/net/tipc/socket.c index f553fdecc1d..b630f3839ca 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -783,7 +783,7 @@ static int auto_connect(struct socket *sock, struct tipc_msg *msg) tsock->peer_name.ref = msg_origport(msg); tsock->peer_name.node = msg_orignode(msg); - tipc_connect2port(tsock->p->ref, &tsock->peer_name); + tipc_connect(tsock->p->ref, &tsock->peer_name); tipc_set_portimportance(tsock->p->ref, msg_importance(msg)); sock->state = SS_CONNECTED; return 0; @@ -1246,7 +1246,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) /* Initiate connection termination for an incoming 'FIN' */ if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) { sock->state = SS_DISCONNECTING; - tipc_disconnect_port(tipc_sk_port(sk)); + __tipc_disconnect(tipc_sk_port(sk)); } sk->sk_data_ready(sk, 0); @@ -1506,7 +1506,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags) /* Connect new socket to it's peer */ new_tsock->peer_name.ref = msg_origport(msg); new_tsock->peer_name.node = msg_orignode(msg); - tipc_connect2port(new_ref, &new_tsock->peer_name); + tipc_connect(new_ref, &new_tsock->peer_name); new_sock->state = SS_CONNECTED; tipc_set_portimportance(new_ref, msg_importance(msg)); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 0f7d0d007e2..6b42d47029a 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -462,7 +462,7 @@ static void subscr_named_msg_event(void *usr_handle, kfree(subscriber); return; } - tipc_connect2port(subscriber->port_ref, orig); + tipc_connect(subscriber->port_ref, orig); /* Lock server port (& save lock address for future use) */ subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock; -- cgit v1.2.3-70-g09d2 From 7e6c131e1568dcc2033736739a9880dce1976886 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Thu, 29 Nov 2012 18:39:14 -0500 Subject: tipc: consolidate connection-oriented message reception in one function MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Handling of connection-related message reception is currently scattered around at different places in the code. This makes it harder to verify that things are handled correctly in all possible scenarios. So we consolidate the existing processing of connection-oriented message reception in a single routine. 
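As a rough sketch of the consolidated shape (simplified types and arguments for illustration only; the real routine is the filter_connect() added in the diff below), one switch over the socket state decides which message kinds are acceptable:

#include <linux/net.h>		/* socket_state: SS_CONNECTED, etc. */

#define SS_LISTENING -1		/* TIPC pseudo socket state */
#define SS_READY     -2		/* TIPC pseudo socket state */

/* Simplified acceptance check; 'state' is already an int here, which
 * sidesteps the enum cast discussed in the log message below.
 */
static int msg_acceptable(int state, int connected, int errcode)
{
	switch (state) {
	case SS_CONNECTED:	/* only connection-based peer traffic */
		return connected;
	case SS_CONNECTING:	/* only ACK or NACK */
		return connected || errcode;
	case SS_LISTENING:
	case SS_UNCONNECTED:	/* only SYN */
		return !connected && !errcode;
	case SS_DISCONNECTING:
	default:
		return 0;
	}
}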
In the process, we convert the chain of if/else into a switch/case for improved readability. A cast on the socket_state in the switch is needed to avoid compile warnings on 32 bit, like "net/tipc/socket.c:1252:2: warning: case value ‘4294967295’ not in enumerated type". This happens because existing tipc code pseudo-extends the default Linux socket state values with: #define SS_LISTENING -1 /* socket is listening */ #define SS_READY -2 /* socket is connectionless */ It may make sense to add these as _positive_ values to the existing socket state enum list someday, rather than keeping these already-existing defines. Signed-off-by: Ying Xue Signed-off-by: Jon Maloy [PG: add cast to fix warning; remove returns from middle of switch] Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 75 +++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 51 insertions(+), 24 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index b630f3839ca..d16a6de32ea 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1186,6 +1186,53 @@ static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base) return queue_size >= threshold; } +/** + * filter_connect - Handle all incoming messages for a connection-based socket + * @tsock: TIPC socket + * @buf: pointer to the received message buffer + * + * Returns TIPC error status code, and sets the socket error status + * code when it encounters an error + */ +static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) +{ + struct socket *sock = tsock->sk.sk_socket; + struct tipc_msg *msg = buf_msg(*buf); + u32 retval = TIPC_ERR_NO_PORT; + + if (msg_mcast(msg)) + return retval; + + switch ((int)sock->state) { + case SS_CONNECTED: + /* Accept only connection-based messages sent by peer */ + if (msg_connected(msg) && tipc_port_peer_msg(tsock->p, msg)) { + if (unlikely(msg_errcode(msg))) { + sock->state = SS_DISCONNECTING; + __tipc_disconnect(tsock->p); + } + retval = TIPC_OK; + } + break; + case SS_CONNECTING: + /* Accept only ACK or NACK message */ + if (msg_connected(msg) || (msg_errcode(msg))) + retval = TIPC_OK; + break; + case SS_LISTENING: + case SS_UNCONNECTED: + /* Accept only SYN message */ + if (!msg_connected(msg) && !(msg_errcode(msg))) + retval = TIPC_OK; + break; + case SS_DISCONNECTING: + break; + default: + pr_err("Unknown socket state %u\n", sock->state); + } + return retval; +} + /** * filter_rcv - validate incoming message * @sk: socket @@ -1203,6 +1250,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) struct socket *sock = sk->sk_socket; struct tipc_msg *msg = buf_msg(buf); u32 recv_q_len; + u32 res = TIPC_OK; /* Reject message if it is wrong sort of message for socket */ if (msg_type(msg) > TIPC_DIRECT_MSG) @@ -1212,24 +1260,9 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) if (msg_connected(msg)) return TIPC_ERR_NO_PORT; } else { - if (msg_mcast(msg)) - return TIPC_ERR_NO_PORT; - if (sock->state == SS_CONNECTED) { - if (!msg_connected(msg) || - !tipc_port_peer_msg(tipc_sk_port(sk), msg)) - return TIPC_ERR_NO_PORT; - } else if (sock->state == SS_CONNECTING) { - if (!msg_connected(msg) && (msg_errcode(msg) == 0)) - return TIPC_ERR_NO_PORT; - } else if (sock->state == SS_LISTENING) { - if (msg_connected(msg) || msg_errcode(msg)) - return TIPC_ERR_NO_PORT; - } else if (sock->state == SS_DISCONNECTING) { - return TIPC_ERR_NO_PORT; - } else /* (sock->state == SS_UNCONNECTED) */ { - if (msg_connected(msg) || msg_errcode(msg)) - return TIPC_ERR_NO_PORT; - } + res = filter_connect(tipc_sk(sk), &buf); + if
(res != TIPC_OK || buf == NULL) + return res; } /* Reject message if there isn't room to queue it */ @@ -1243,12 +1276,6 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) TIPC_SKB_CB(buf)->handle = 0; __skb_queue_tail(&sk->sk_receive_queue, buf); - /* Initiate connection termination for an incoming 'FIN' */ - if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) { - sock->state = SS_DISCONNECTING; - __tipc_disconnect(tipc_sk_port(sk)); - } - sk->sk_data_ready(sk, 0); return TIPC_OK; } -- cgit v1.2.3-70-g09d2 From 584d24b3960e4c877fc623214815f278708f127c Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Thu, 29 Nov 2012 18:51:19 -0500 Subject: tipc: introduce non-blocking socket connect TIPC has so far only supported blocking connect(), meaning that a call to connect() doesn't return until either the connection is fully established, or an error occurs. This has proved insufficient for many users, so we now introduce non-blocking connect(), analogous to how this is done in TCP and other protocols. With this feature, if a connection cannot be established instantly, connect() will return the error code "-EINPROGRESS". If the user later calls connect() again, they will get either the return code "-EALREADY" or "-EISCONN", depending on whether the connection has been established or not. The user must have explicitly set the socket to be non-blocking (SOCK_NONBLOCK or O_NONBLOCK, depending on method used), so unless they had for some reason already set this (the socket would remain blocking in current TIPC anyway), this change should be completely backwards compatible. It is also now possible to call select() or poll() to wait for the completion of a connection. An effect of the above is that the actual completion of a connection may now be performed asynchronously, independent of the calls from user space. Therefore, we now execute this code in BH context, in the function filter_rcv(), which is executed upon reception of messages in the socket.
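For reference, a hedged userspace sketch of the resulting flow (the service type and instance are made-up values, and error handling is minimal):

#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <linux/tipc.h>

/* Try a non-blocking connect to a hypothetical TIPC service {1000,1};
 * returns a connected socket, or -1 on failure.
 */
static int tipc_connect_nb(void)
{
	struct sockaddr_tipc peer;
	struct pollfd pfd;
	int sd = socket(AF_TIPC, SOCK_STREAM, 0);

	if (sd < 0)
		return -1;
	fcntl(sd, F_SETFL, fcntl(sd, F_GETFL, 0) | O_NONBLOCK);

	memset(&peer, 0, sizeof(peer));
	peer.family = AF_TIPC;
	peer.addrtype = TIPC_ADDR_NAME;
	peer.addr.name.name.type = 1000;	/* made-up service type */
	peer.addr.name.name.instance = 1;

	if (connect(sd, (struct sockaddr *)&peer, sizeof(peer)) == 0)
		return sd;			/* connected instantly */
	if (errno != EINPROGRESS)
		goto fail;

	pfd.fd = sd;
	pfd.events = POLLOUT;
	if (poll(&pfd, 1, 8000) <= 0)		/* wait for completion */
		goto fail;

	/* As described above: a repeated connect() now yields -EISCONN
	 * once established, or -EALREADY while still in progress.
	 */
	if (connect(sd, (struct sockaddr *)&peer, sizeof(peer)) < 0 &&
	    errno != EISCONN)
		goto fail;
	return sd;
fail:
	close(sd);
	return -1;
}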
Signed-off-by: Ying Xue Signed-off-by: Jon Maloy [PG: minor refactoring for improved connect/disconnect function names] Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 158 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 93 insertions(+), 65 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index d16a6de32ea..dbce2745f0a 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -775,16 +775,19 @@ exit: static int auto_connect(struct socket *sock, struct tipc_msg *msg) { struct tipc_sock *tsock = tipc_sk(sock->sk); - - if (msg_errcode(msg)) { - sock->state = SS_DISCONNECTING; - return -ECONNREFUSED; - } + struct tipc_port *p_ptr; tsock->peer_name.ref = msg_origport(msg); tsock->peer_name.node = msg_orignode(msg); - tipc_connect(tsock->p->ref, &tsock->peer_name); - tipc_set_portimportance(tsock->p->ref, msg_importance(msg)); + p_ptr = tipc_port_deref(tsock->p->ref); + if (!p_ptr) + return -EINVAL; + + __tipc_connect(tsock->p->ref, p_ptr, &tsock->peer_name); + + if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE) + return -EINVAL; + msg_set_importance(&p_ptr->phdr, (u32)msg_importance(msg)); sock->state = SS_CONNECTED; return 0; } @@ -1198,7 +1201,9 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) { struct socket *sock = tsock->sk.sk_socket; struct tipc_msg *msg = buf_msg(*buf); + struct sock *sk = &tsock->sk; u32 retval = TIPC_ERR_NO_PORT; + int res; if (msg_mcast(msg)) return retval; @@ -1216,8 +1221,36 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) break; case SS_CONNECTING: /* Accept only ACK or NACK message */ - if (msg_connected(msg) || (msg_errcode(msg))) + if (unlikely(msg_errcode(msg))) { + sock->state = SS_DISCONNECTING; + sk->sk_err = -ECONNREFUSED; + retval = TIPC_OK; + break; + } + + if (unlikely(!msg_connected(msg))) + break; + + res = auto_connect(sock, msg); + if (res) { + sock->state = SS_DISCONNECTING; + sk->sk_err = res; retval = TIPC_OK; + break; + } + + /* If an incoming message is an 'ACK-', it should be + * discarded here because it doesn't contain useful + * data. In addition, we should try to wake up + * connect() routine if sleeping. 
+ */ + if (msg_data_sz(msg) == 0) { + kfree_skb(*buf); + *buf = NULL; + if (waitqueue_active(sk_sleep(sk))) + wake_up_interruptible(sk_sleep(sk)); + } + retval = TIPC_OK; break; case SS_LISTENING: case SS_UNCONNECTED: @@ -1361,8 +1394,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen, struct sock *sk = sock->sk; struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest; struct msghdr m = {NULL,}; - struct sk_buff *buf; - struct tipc_msg *msg; unsigned int timeout; int res; @@ -1374,26 +1405,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen, goto exit; } - /* For now, TIPC does not support the non-blocking form of connect() */ - if (flags & O_NONBLOCK) { - res = -EOPNOTSUPP; - goto exit; - } - - /* Issue Posix-compliant error code if socket is in the wrong state */ - if (sock->state == SS_LISTENING) { - res = -EOPNOTSUPP; - goto exit; - } - if (sock->state == SS_CONNECTING) { - res = -EALREADY; - goto exit; - } - if (sock->state != SS_UNCONNECTED) { - res = -EISCONN; - goto exit; - } - /* * Reject connection attempt using multicast address * @@ -1405,49 +1416,66 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen, goto exit; } - /* Reject any messages already in receive queue (very unlikely) */ - reject_rx_queue(sk); + timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout; + + switch (sock->state) { + case SS_UNCONNECTED: + /* Send a 'SYN-' to destination */ + m.msg_name = dest; + m.msg_namelen = destlen; + + /* If connect is in non-blocking case, set MSG_DONTWAIT to + * indicate send_msg() is never blocked. + */ + if (!timeout) + m.msg_flags = MSG_DONTWAIT; + + res = send_msg(NULL, sock, &m, 0); + if ((res < 0) && (res != -EWOULDBLOCK)) + goto exit; - /* Send a 'SYN-' to destination */ - m.msg_name = dest; - m.msg_namelen = destlen; - res = send_msg(NULL, sock, &m, 0); - if (res < 0) + /* Just entered SS_CONNECTING state; the only + * difference is that return value in non-blocking + * case is EINPROGRESS, rather than EALREADY. + */ + res = -EINPROGRESS; + break; + case SS_CONNECTING: + res = -EALREADY; + break; + case SS_CONNECTED: + res = -EISCONN; + break; + default: + res = -EINVAL; goto exit; + } - /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */ - timeout = tipc_sk(sk)->conn_timeout; - release_sock(sk); - res = wait_event_interruptible_timeout(*sk_sleep(sk), - (!skb_queue_empty(&sk->sk_receive_queue) || - (sock->state != SS_CONNECTING)), - timeout ? (long)msecs_to_jiffies(timeout) - : MAX_SCHEDULE_TIMEOUT); - lock_sock(sk); + if (sock->state == SS_CONNECTING) { + if (!timeout) + goto exit; - if (res > 0) { - buf = skb_peek(&sk->sk_receive_queue); - if (buf != NULL) { - msg = buf_msg(buf); - res = auto_connect(sock, msg); - if (!res) { - if (!msg_data_sz(msg)) - advance_rx_queue(sk); - } - } else { - if (sock->state == SS_CONNECTED) - res = -EISCONN; + /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */ + release_sock(sk); + res = wait_event_interruptible_timeout(*sk_sleep(sk), + sock->state != SS_CONNECTING, + timeout ? 
(long)msecs_to_jiffies(timeout) : MAX_SCHEDULE_TIMEOUT); + lock_sock(sk); + if (res <= 0) { + if (res == 0) + res = -ETIMEDOUT; else - res = -ECONNREFUSED; + ; /* leave "res" unchanged */ + goto exit; } - } else { - if (res == 0) - res = -ETIMEDOUT; - else - ; /* leave "res" unchanged */ - sock->state = SS_DISCONNECTING; } + if (unlikely(sock->state == SS_DISCONNECTING)) + res = sock_error(sk); + else + res = 0; + exit: release_sock(sk); return res; -- cgit v1.2.3-70-g09d2 From cbab368790f23bc917d97fcf7a338c5ba5336ee0 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Thu, 29 Nov 2012 16:28:30 +0800 Subject: tipc: eliminate connection setup for implied connect in recv_msg() As connection setup is now completed asynchronously in BH context, in the function filter_connect(), the corresponding code in recv_msg() becomes redundant. Signed-off-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 7 ------- 1 file changed, 7 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index dbce2745f0a..ef75b627024 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -946,13 +946,6 @@ restart: sz = msg_data_sz(msg); err = msg_errcode(msg); - /* Complete connection setup for an implied connect */ - if (unlikely(sock->state == SS_CONNECTING)) { - res = auto_connect(sock, msg); - if (res) - goto exit; - } - /* Discard an empty non-errored message & try again */ if ((!sz) && (!err)) { advance_rx_queue(sk); -- cgit v1.2.3-70-g09d2 From 258f8667a29d72b1c220065632b39c0faeb061ca Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Mon, 3 Dec 2012 16:12:07 +0800 Subject: tipc: add lock nesting notation to quiet lockdep warning The TIPC accept() call grabs the socket lock on a newly allocated socket while holding the socket lock on an old socket. But lockdep worries that this might be a recursive lock attempt: [ INFO: possible recursive locking detected ] --------------------------------------------- kworker/u:0/6 is trying to acquire lock: (sk_lock-AF_TIPC){+.+.+.}, at: [] accept+0x15c/0x310 [tipc] but task is already holding lock: (sk_lock-AF_TIPC){+.+.+.}, at: [] accept+0x28/0x310 [tipc] other info that might help us debug this: Possible unsafe locking scenario: CPU0 ---- lock(sk_lock-AF_TIPC); lock(sk_lock-AF_TIPC); *** DEADLOCK *** May be due to missing lock nesting notation [...] Tell lockdep that this locking is safe by using lock_sock_nested(). This is similar to what was done in commit 5131a184a3458d9 for SCTP code ("SCTP: lock_sock_nested in sctp_sock_migrate"). Also note that this isn't something that is seen normally, as it was uncovered with some experimental work-in-progress code not yet ready for mainline. So no need for stable backports or similar of this commit.
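The annotation itself is a one-liner; as a generic sketch (a hypothetical helper around two sockets of the same protocol, not the TIPC code itself):

#include <net/sock.h>

/* Both locks belong to the same lockdep class (sk_lock-AF_*), so we
 * tell lockdep that the second acquisition is a deliberate one-level
 * nesting rather than recursion on the same lock.
 */
static void migrate_locked(struct sock *parent, struct sock *child)
{
	lock_sock(parent);
	lock_sock_nested(child, SINGLE_DEPTH_NESTING);

	/* ... move connection state from parent to child ... */

	release_sock(child);
	release_sock(parent);
}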
Signed-off-by: Ying Xue Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'net/tipc') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index ef75b627024..b5c9795cf15 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1543,7 +1543,8 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags) u32 new_ref = new_tport->ref; struct tipc_msg *msg = buf_msg(buf); - lock_sock(new_sk); + /* we lock on new_sk; but lockdep sees the lock on sk */ + lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING); /* * Reject any stray messages received by new socket -- cgit v1.2.3-70-g09d2 From 0fef8f205f6f4cdff1869e54e44f317a79902785 Mon Sep 17 00:00:00 2001 From: Paul Gortmaker Date: Tue, 4 Dec 2012 11:01:55 -0500 Subject: tipc: refactor accept() code for improved readability In TIPC's accept() routine, there is a large block of code relating to initialization of a new socket, all within an if condition checking if the allocation succeeded. Here, we simply flip the check of the if, so that the main execution path stays at the same indentation level, which improves readability. If the allocation fails, we jump to an already existing exit label. Signed-off-by: Paul Gortmaker --- net/tipc/socket.c | 89 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 48 insertions(+), 41 deletions(-) (limited to 'net/tipc') diff --git a/net/tipc/socket.c b/net/tipc/socket.c index b5c9795cf15..9b4e4833a48 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1509,8 +1509,13 @@ static int listen(struct socket *sock, int len) */ static int accept(struct socket *sock, struct socket *new_sock, int flags) { - struct sock *sk = sock->sk; + struct sock *new_sk, *sk = sock->sk; struct sk_buff *buf; + struct tipc_sock *new_tsock; + struct tipc_port *new_tport; + struct tipc_msg *msg; + u32 new_ref; + int res; lock_sock(sk); @@ -1536,49 +1541,51 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags) buf = skb_peek(&sk->sk_receive_queue); res = tipc_create(sock_net(sock->sk), new_sock, 0, 0); - if (!res) { - struct sock *new_sk = new_sock->sk; - struct tipc_sock *new_tsock = tipc_sk(new_sk); - struct tipc_port *new_tport = new_tsock->p; - u32 new_ref = new_tport->ref; - struct tipc_msg *msg = buf_msg(buf); - - /* we lock on new_sk; but lockdep sees the lock on sk */ - lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING); - - /* - * Reject any stray messages received by new socket - * before the socket lock was taken (very, very unlikely) - */ - reject_rx_queue(new_sk); - - /* Connect new socket to it's peer */ - new_tsock->peer_name.ref = msg_origport(msg); - new_tsock->peer_name.node = msg_orignode(msg); - tipc_connect(new_ref, &new_tsock->peer_name); - new_sock->state = SS_CONNECTED; - - tipc_set_portimportance(new_ref, msg_importance(msg)); - if (msg_named(msg)) { - new_tport->conn_type = msg_nametype(msg); - new_tport->conn_instance = msg_nameinst(msg); - } + if (res) + goto exit; - /* - * Respond to 'SYN-' by discarding it & returning 'ACK'-. - * Respond to 'SYN+' by queuing it on new socket. 
- */ - if (!msg_data_sz(msg)) { - struct msghdr m = {NULL,}; + new_sk = new_sock->sk; + new_tsock = tipc_sk(new_sk); + new_tport = new_tsock->p; + new_ref = new_tport->ref; + msg = buf_msg(buf); - advance_rx_queue(sk); - send_packet(NULL, new_sock, &m, 0); - } else { - __skb_dequeue(&sk->sk_receive_queue); - __skb_queue_head(&new_sk->sk_receive_queue, buf); - } - release_sock(new_sk); + /* we lock on new_sk; but lockdep sees the lock on sk */ + lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING); + + /* + * Reject any stray messages received by new socket + * before the socket lock was taken (very, very unlikely) + */ + reject_rx_queue(new_sk); + + /* Connect new socket to it's peer */ + new_tsock->peer_name.ref = msg_origport(msg); + new_tsock->peer_name.node = msg_orignode(msg); + tipc_connect(new_ref, &new_tsock->peer_name); + new_sock->state = SS_CONNECTED; + + tipc_set_portimportance(new_ref, msg_importance(msg)); + if (msg_named(msg)) { + new_tport->conn_type = msg_nametype(msg); + new_tport->conn_instance = msg_nameinst(msg); } + + /* + * Respond to 'SYN-' by discarding it & returning 'ACK'-. + * Respond to 'SYN+' by queuing it on new socket. + */ + if (!msg_data_sz(msg)) { + struct msghdr m = {NULL,}; + + advance_rx_queue(sk); + send_packet(NULL, new_sock, &m, 0); + } else { + __skb_dequeue(&sk->sk_receive_queue); + __skb_queue_head(&new_sk->sk_receive_queue, buf); + } + release_sock(new_sk); + exit: release_sock(sk); return res; -- cgit v1.2.3-70-g09d2
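The pattern applied in the refactor above, reduced to a self-contained sketch (alloc_thing() and init_thing() are stand-in helpers, not kernel APIs):

static int alloc_thing(void) { return 0; }	/* stand-in */
static void init_thing(void) { }		/* stand-in */

static int setup_thing(void)
{
	int res = alloc_thing();

	if (res)		/* flipped check: failure bails out early */
		goto exit;

	init_thing();		/* success path stays at one indent level */
exit:
	return res;
}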