From 8809b255a9fca8c3179491d3bc9268c42e23ba97 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Tue, 25 Oct 2011 10:44:35 -0400 Subject: tipc: improve the link deferred queue insertion algorithm Re-code the algorithm for inserting an out-of-sequence message into a unicast or broadcast link's deferred message queue. It remains functionally equivalent but should be easier to understand/maintain. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 48 +++++++++++++++++++++++------------------------- 1 file changed, 23 insertions(+), 25 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index ac1832a66f8..c317abf74a7 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1853,17 +1853,16 @@ cont: } /* - * link_defer_buf(): Sort a received out-of-sequence packet - * into the deferred reception queue. - * Returns the increase of the queue length,i.e. 0 or 1 + * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue + * + * Returns increase in queue length (i.e. 0 or 1) */ -u32 tipc_link_defer_pkt(struct sk_buff **head, - struct sk_buff **tail, +u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, struct sk_buff *buf) { - struct sk_buff *prev = NULL; - struct sk_buff *crs = *head; + struct sk_buff *queue_buf; + struct sk_buff **prev; u32 seq_no = buf_seqno(buf); buf->next = NULL; @@ -1881,31 +1880,30 @@ u32 tipc_link_defer_pkt(struct sk_buff **head, return 1; } - /* Scan through queue and sort it in */ - do { - struct tipc_msg *msg = buf_msg(crs); + /* Locate insertion point in queue, then insert; discard if duplicate */ + prev = head; + queue_buf = *head; + for (;;) { + u32 curr_seqno = buf_seqno(queue_buf); - if (less(seq_no, msg_seqno(msg))) { - buf->next = crs; - if (prev) - prev->next = buf; - else - *head = buf; - return 1; + if (seq_no == curr_seqno) { + buf_discard(buf); + return 0; } - if (seq_no == msg_seqno(msg)) + + if (less(seq_no, curr_seqno)) break; - prev = crs; - crs = crs->next; - } while (crs); - /* Message is a duplicate of an existing message */ + prev = &queue_buf->next; + queue_buf = queue_buf->next; + } - buf_discard(buf); - return 0; + buf->next = queue_buf; + *prev = buf; + return 1; } -/** +/* * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet */ -- cgit v1.2.3-70-g09d2 From 92d2c905b404d8d056ce35a0ce645e23529742c2 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Tue, 25 Oct 2011 11:20:26 -0400 Subject: tipc: Prevent transmission of outdated link protocol messages Ensures that a link endpoint discards any previously deferred link protocol message whenever it attempts to send a new one. Previously, it was possible for a link protocol message that was unsent due to congestion to be transmitted after newer protocol messages had been sent. The stale link protocol message might then cause the receiving link endpoint to malfunction because of its outdated conent. Thanks to Osamu Kaminuma [okaminum@avaya.com] for diagnosing the problem and contributing a prototype patch. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 53 +++++++++++++++++++++++++++-------------------------- 1 file changed, 27 insertions(+), 26 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index c317abf74a7..bee316ce387 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1954,6 +1954,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, u32 msg_size = sizeof(l_ptr->proto_msg); int r_flag; + /* Discard any previous message that was deferred due to congestion */ + + if (l_ptr->proto_msg_queue) { + buf_discard(l_ptr->proto_msg_queue); + l_ptr->proto_msg_queue = NULL; + } + if (link_blocked(l_ptr)) return; @@ -1962,6 +1969,8 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG)) return; + /* Create protocol message with "out-of-sequence" sequence number */ + msg_set_type(msg, msg_typ); msg_set_net_plane(msg, l_ptr->b_ptr->net_plane); msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in)); @@ -2018,44 +2027,36 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr)); msg_set_redundant_link(msg, r_flag); msg_set_linkprio(msg, l_ptr->priority); - - /* Ensure sequence number will not fit : */ + msg_set_size(msg, msg_size); msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2))); - /* Congestion? */ - - if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { - if (!l_ptr->proto_msg_queue) { - l_ptr->proto_msg_queue = - tipc_buf_acquire(sizeof(l_ptr->proto_msg)); - } - buf = l_ptr->proto_msg_queue; - if (!buf) - return; - skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); - return; - } - - /* Message can be sent */ - buf = tipc_buf_acquire(msg_size); if (!buf) return; skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); - msg_set_size(buf_msg(buf), msg_size); - if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { - l_ptr->unacked_window = 0; - buf_discard(buf); + /* Defer message if bearer is already congested */ + + if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { + l_ptr->proto_msg_queue = buf; + return; + } + + /* Defer message if attempting to send results in bearer congestion */ + + if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { + tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); + l_ptr->proto_msg_queue = buf; + l_ptr->stats.bearer_congs++; return; } - /* New congestion */ - tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); - l_ptr->proto_msg_queue = buf; - l_ptr->stats.bearer_congs++; + /* Discard message if it was sent successfully */ + + l_ptr->unacked_window = 0; + buf_discard(buf); } /* -- cgit v1.2.3-70-g09d2 From 4d75313ce9b832efc4efb487f080b5ed72beae2c Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Tue, 25 Oct 2011 12:19:05 -0400 Subject: tipc: Prevent broadcast link stalling in dual LAN environments Ensure that sequence number information about incoming broadcast link messages is initialized only by the activation of the first link to a given cluster node. Previously, a race condition allowed reset and/or activation messages for a second link to re-initialize this sequence number information with obsolete values. This could trigger TIPC to request the retransmission of previously acknowledged broadcast link messages from that node, resulting in broadcast link processing becoming stalled if the node had already released one or more of those messages and was unable to perform the required retransmission. Thanks to Laser for identifying this problem and assisting in the development of this fix. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index bee316ce387..4ea6cad1174 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -2128,14 +2128,15 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) } l_ptr->owner->bclink.supported = (max_pkt_info != 0); + /* Synchronize broadcast link info, if not done previously */ + + if (!tipc_node_is_up(l_ptr->owner)) + l_ptr->owner->bclink.last_in = msg_last_bcast(msg); + link_state_event(l_ptr, msg_type(msg)); l_ptr->peer_session = msg_session(msg); l_ptr->peer_bearer_id = msg_bearer_id(msg); - - /* Synchronize broadcast sequence numbers */ - if (!tipc_node_redundant_links(l_ptr->owner)) - l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg)); break; case STATE_MSG: -- cgit v1.2.3-70-g09d2 From 934993137199ffb56fef50664f87e71cdb3471b0 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Tue, 25 Oct 2011 15:14:46 -0400 Subject: tipc: Ensure broadcast link re-acquires node after link failure Fix a bug that can prevent TIPC from sending broadcast messages to a node if contact with the node is lost and then regained. The problem occurs if the broadcast link first clears the flag indicating the node is part of the link's distribution set (when it loses contact with the node), and later fails to restore the flag (when contact is regained); restoration fails if contact with the node is regained by implicit unicast link activation triggered by the arrival of a data message, rather than explicitly by the arrival of a link activation message. The broadcast link now uses separate fields to track whether a node is theoretically capable of receiving broadcast messages versus whether it is actually part of the link's distribution set. The former member is updated by the receipt of link protocol messages, which can occur at any time; the latter member is updated only when contact with the node is gained or lost. This change also permits the simplification of several conditional expressions since the broadcast link's "supported" field can now only be set if there are working links to the associated node. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 5 +++-- net/tipc/node.c | 3 ++- net/tipc/node.h | 4 +++- 3 files changed, 8 insertions(+), 4 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 4ea6cad1174..3405f560a84 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1502,6 +1502,7 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, tipc_addr_string_fill(addr_string, n_ptr->addr); info("Multicast link info for %s\n", addr_string); + info("Supportable: %d, ", n_ptr->bclink.supportable); info("Supported: %d, ", n_ptr->bclink.supported); info("Acked: %u\n", n_ptr->bclink.acked); info("Last in: %u, ", n_ptr->bclink.last_in); @@ -1736,7 +1737,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) /* Release acked messages */ - if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported) + if (n_ptr->bclink.supported) tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); crs = l_ptr->first_out; @@ -2126,7 +2127,7 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) } else { l_ptr->max_pkt = l_ptr->max_pkt_target; } - l_ptr->owner->bclink.supported = (max_pkt_info != 0); + l_ptr->owner->bclink.supportable = (max_pkt_info != 0); /* Synchronize broadcast link info, if not done previously */ diff --git a/net/tipc/node.c b/net/tipc/node.c index 6b226faad89..9196f943b83 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -306,8 +306,9 @@ static void node_established_contact(struct tipc_node *n_ptr) /* Syncronize broadcast acks */ n_ptr->bclink.acked = tipc_bclink_get_last_sent(); - if (n_ptr->bclink.supported) { + if (n_ptr->bclink.supportable) { tipc_bclink_add_node(n_ptr->addr); + n_ptr->bclink.supported = 1; if (n_ptr->addr < tipc_own_addr) tipc_own_tag++; } diff --git a/net/tipc/node.h b/net/tipc/node.h index 0b1c5f8b699..90689f48761 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -62,6 +62,7 @@ * @link_cnt: number of links to node * @permit_changeover: non-zero if node has redundant links to this system * @bclink: broadcast-related info + * @supportable: non-zero if node supports TIPC b'cast link capability * @supported: non-zero if node supports TIPC b'cast capability * @acked: sequence # of last outbound b'cast message acknowledged by node * @last_in: sequence # of last in-sequence b'cast message received from node @@ -86,7 +87,8 @@ struct tipc_node { int block_setup; int permit_changeover; struct { - int supported; + u8 supportable; + u8 supported; u32 acked; u32 last_in; u32 gap_after; -- cgit v1.2.3-70-g09d2 From 47361c87c504d89f1ba50b4230d56ef67792c258 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Wed, 26 Oct 2011 10:55:16 -0400 Subject: tipc: Fix problem with broadcast link synchronization between nodes Corrects a problem in which a link endpoint that activates as the result of receiving a RESET/STATE sequence of link protocol messages fails to properly record the broadcast link status information about the node to which it is now communicating with. (The problem does not occur with the more common RESET/ACTIVATE sequence of messages.) The fix ensures that the broadcast link status info is updated after the RESET message resets the link endpoint, rather than before, thereby preventing new information from being overwritten by the reset operation. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index 3405f560a84..1150ba5a648 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -2105,6 +2105,8 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) l_ptr->owner->block_setup = WAIT_NODE_DOWN; } + link_state_event(l_ptr, RESET_MSG); + /* fall thru' */ case ACTIVATE_MSG: /* Update link settings according other endpoint's values */ @@ -2134,10 +2136,11 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) if (!tipc_node_is_up(l_ptr->owner)) l_ptr->owner->bclink.last_in = msg_last_bcast(msg); - link_state_event(l_ptr, msg_type(msg)); - l_ptr->peer_session = msg_session(msg); l_ptr->peer_bearer_id = msg_bearer_id(msg); + + if (msg_type(msg) == ACTIVATE_MSG) + link_state_event(l_ptr, ACTIVATE_MSG); break; case STATE_MSG: -- cgit v1.2.3-70-g09d2 From 7a54d4a99dcbbfdf1d4550faa19b615091137953 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Thu, 27 Oct 2011 14:17:53 -0400 Subject: tipc: Major redesign of broadcast link ACK/NACK algorithms Completely redesigns broadcast link ACK and NACK mechanisms to prevent spurious retransmit requests in dual LAN networks, and to prevent the broadcast link from stalling due to the failure of a receiving node to acknowledge receiving a broadcast message or request its retransmission. Note: These changes only impact the timing of when ACK and NACK messages are sent, and not the basic broadcast link protocol itself, so inter- operability with nodes using the "classic" algorithms is maintained. The revised algorithms are as follows: 1) An explicit ACK message is still sent after receiving 16 in-sequence messages, and implicit ACK information continues to be carried in other unicast link message headers (including link state messages). However, the timing of explicit ACKs is now based on the receiving node's absolute network address rather than its relative network address to ensure that the failure of another node does not delay the ACK beyond its 16 message target. 2) A NACK message is now typically sent only when a message gap persists for two consecutive incoming link state messages; this ensures that a suspected gap is not confirmed until both LANs in a dual LAN network have had an opportunity to deliver the message, thereby preventing spurious NACKs. A NACK message can also be generated by the arrival of a single link state message, if the deferred queue is so big that the current message gap cannot be the result of "normal" mis-ordering due to the use of dual LANs (or one LAN using a bonded interface). Since link state messages typically arrive at different nodes at different times the problem of multiple nodes issuing identical NACKs simultaneously is inherently avoided. 3) Nodes continue to "peek" at NACK messages sent by other nodes. If another node requests retransmission of a message gap suspected (but not yet confirmed) by the peeking node, the peeking node forgets about the gap and does not generate a duplicate retransmit request. (If the peeking node subsequently fails to receive the lost message, later link state messages will cause it to rediscover and confirm the gap and send another NACK.) 4) Message gap "equality" is now determined by the start of the gap only. This is sufficient to deal with the most common cases of message loss, and eliminates the need for complex end of gap computations. 5) A peeking node no longer tries to determine whether it should send a complementary NACK, since the most common cases of message loss don't require it to be sent. Consequently, the node no longer examines the "broadcast tag" field of a NACK message when peeking. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/bcast.c | 226 +++++++++++++++++++------------------------------------ net/tipc/bcast.h | 2 +- net/tipc/link.c | 21 ++++-- net/tipc/node.c | 4 +- net/tipc/node.h | 12 +-- 5 files changed, 100 insertions(+), 165 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index facc216c6a9..1f3b1607d9d 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -157,39 +157,14 @@ u32 tipc_bclink_get_last_sent(void) return bcl->fsm_msg_cnt; } -/** - * bclink_set_gap - set gap according to contents of current deferred pkt queue - * - * Called with 'node' locked, bc_lock unlocked - */ - -static void bclink_set_gap(struct tipc_node *n_ptr) -{ - struct sk_buff *buf = n_ptr->bclink.deferred_head; - - n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = - mod(n_ptr->bclink.last_in); - if (unlikely(buf != NULL)) - n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1); -} - -/** - * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment - * - * This mechanism endeavours to prevent all nodes in network from trying - * to ACK or NACK at the same time. - * - * Note: TIPC uses a different trigger to distribute ACKs than it does to - * distribute NACKs, but tries to use the same spacing (divide by 16). - */ - -static int bclink_ack_allowed(u32 n) +static void bclink_update_last_sent(struct tipc_node *node, u32 seqno) { - return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag; + node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ? + seqno : node->bclink.last_sent; } -/** +/* * tipc_bclink_retransmit_to - get most recent node to request retransmission * * Called with bc_lock locked @@ -300,44 +275,56 @@ exit: spin_unlock_bh(&bc_lock); } -/** - * bclink_send_ack - unicast an ACK msg +/* + * tipc_bclink_update_link_state - update broadcast link state * * tipc_net_lock and node lock set */ -static void bclink_send_ack(struct tipc_node *n_ptr) +void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) { - struct tipc_link *l_ptr = n_ptr->active_links[n_ptr->addr & 1]; + struct sk_buff *buf; - if (l_ptr != NULL) - tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); -} + /* Ignore "stale" link state info */ -/** - * bclink_send_nack- broadcast a NACK msg - * - * tipc_net_lock and node lock set - */ + if (less_eq(last_sent, n_ptr->bclink.last_in)) + return; -static void bclink_send_nack(struct tipc_node *n_ptr) -{ - struct sk_buff *buf; - struct tipc_msg *msg; + /* Update link synchronization state; quit if in sync */ + + bclink_update_last_sent(n_ptr, last_sent); + + if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) + return; + + /* Update out-of-sync state; quit if loss is still unconfirmed */ - if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to)) + if ((++n_ptr->bclink.oos_state) == 1) { + if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2)) + return; + n_ptr->bclink.oos_state++; + } + + /* Don't NACK if one has been recently sent (or seen) */ + + if (n_ptr->bclink.oos_state & 0x1) return; + /* Send NACK */ + buf = tipc_buf_acquire(INT_H_SIZE); if (buf) { - msg = buf_msg(buf); + struct tipc_msg *msg = buf_msg(buf); + tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, - INT_H_SIZE, n_ptr->addr); + INT_H_SIZE, n_ptr->addr); msg_set_non_seq(msg, 1); msg_set_mc_netid(msg, tipc_net_id); - msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in)); - msg_set_bcgap_after(msg, n_ptr->bclink.gap_after); - msg_set_bcgap_to(msg, n_ptr->bclink.gap_to); + msg_set_bcast_ack(msg, n_ptr->bclink.last_in); + msg_set_bcgap_after(msg, n_ptr->bclink.last_in); + msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head + ? buf_seqno(n_ptr->bclink.deferred_head) - 1 + : n_ptr->bclink.last_sent); msg_set_bcast_tag(msg, tipc_own_tag); spin_lock_bh(&bc_lock); @@ -346,96 +333,37 @@ static void bclink_send_nack(struct tipc_node *n_ptr) spin_unlock_bh(&bc_lock); buf_discard(buf); - /* - * Ensure we doesn't send another NACK msg to the node - * until 16 more deferred messages arrive from it - * (i.e. helps prevent all nodes from NACK'ing at same time) - */ - - n_ptr->bclink.nack_sync = tipc_own_tag; + n_ptr->bclink.oos_state++; } } -/** - * tipc_bclink_check_gap - send a NACK if a sequence gap exists +/* + * bclink_peek_nack - monitor retransmission requests sent by other nodes * - * tipc_net_lock and node lock set - */ - -void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent) -{ - if (!n_ptr->bclink.supported || - less_eq(last_sent, mod(n_ptr->bclink.last_in))) - return; - - bclink_set_gap(n_ptr); - if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to) - n_ptr->bclink.gap_to = last_sent; - bclink_send_nack(n_ptr); -} - -/** - * tipc_bclink_peek_nack - process a NACK msg meant for another node + * Delay any upcoming NACK by this node if another node has already + * requested the first message this node is going to ask for. * * Only tipc_net_lock set. */ -static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to) +static void bclink_peek_nack(struct tipc_msg *msg) { - struct tipc_node *n_ptr = tipc_node_find(dest); - u32 my_after, my_to; + struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg)); - if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr))) + if (unlikely(!n_ptr)) return; + tipc_node_lock(n_ptr); - /* - * Modify gap to suppress unnecessary NACKs from this node - */ - my_after = n_ptr->bclink.gap_after; - my_to = n_ptr->bclink.gap_to; - - if (less_eq(gap_after, my_after)) { - if (less(my_after, gap_to) && less(gap_to, my_to)) - n_ptr->bclink.gap_after = gap_to; - else if (less_eq(my_to, gap_to)) - n_ptr->bclink.gap_to = n_ptr->bclink.gap_after; - } else if (less_eq(gap_after, my_to)) { - if (less_eq(my_to, gap_to)) - n_ptr->bclink.gap_to = gap_after; - } else { - /* - * Expand gap if missing bufs not in deferred queue: - */ - struct sk_buff *buf = n_ptr->bclink.deferred_head; - u32 prev = n_ptr->bclink.gap_to; - for (; buf; buf = buf->next) { - u32 seqno = buf_seqno(buf); + if (n_ptr->bclink.supported && + (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) && + (n_ptr->bclink.last_in == msg_bcgap_after(msg))) + n_ptr->bclink.oos_state = 2; - if (mod(seqno - prev) != 1) { - buf = NULL; - break; - } - if (seqno == gap_after) - break; - prev = seqno; - } - if (buf == NULL) - n_ptr->bclink.gap_to = gap_after; - } - /* - * Some nodes may send a complementary NACK now: - */ - if (bclink_ack_allowed(sender_tag + 1)) { - if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) { - bclink_send_nack(n_ptr); - bclink_set_gap(n_ptr); - } - } tipc_node_unlock(n_ptr); } -/** +/* * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster */ @@ -505,10 +433,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) spin_unlock_bh(&bc_lock); } else { tipc_node_unlock(node); - tipc_bclink_peek_nack(msg_destnode(msg), - msg_bcast_tag(msg), - msg_bcgap_after(msg), - msg_bcgap_to(msg)); + bclink_peek_nack(msg); } goto exit; } @@ -519,16 +444,28 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) next_in = mod(node->bclink.last_in + 1); if (likely(seqno == next_in)) { + bclink_update_last_sent(node, seqno); receive: + node->bclink.last_in = seqno; + node->bclink.oos_state = 0; + spin_lock_bh(&bc_lock); bcl->stats.recv_info++; - node->bclink.last_in++; - bclink_set_gap(node); - if (unlikely(bclink_ack_allowed(seqno))) { - bclink_send_ack(node); + + /* + * Unicast an ACK periodically, ensuring that + * all nodes in the cluster don't ACK at the same time + */ + + if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) { + tipc_link_send_proto_msg( + node->active_links[node->addr & 1], + STATE_MSG, 0, 0, 0, 0, 0); bcl->stats.sent_acks++; } + /* Deliver message to destination */ + if (likely(msg_isdata(msg))) { spin_unlock_bh(&bc_lock); tipc_node_unlock(node); @@ -567,9 +504,14 @@ receive: if (unlikely(!tipc_node_is_up(node))) goto unlock; - if (!node->bclink.deferred_head) + if (node->bclink.last_in == node->bclink.last_sent) goto unlock; + if (!node->bclink.deferred_head) { + node->bclink.oos_state = 1; + goto unlock; + } + msg = buf_msg(node->bclink.deferred_head); seqno = msg_seqno(msg); next_in = mod(next_in + 1); @@ -580,31 +522,19 @@ receive: buf = node->bclink.deferred_head; node->bclink.deferred_head = buf->next; + node->bclink.deferred_size--; goto receive; } /* Handle out-of-sequence broadcast message */ if (less(next_in, seqno)) { - u32 gap_after = node->bclink.gap_after; - u32 gap_to = node->bclink.gap_to; - deferred = tipc_link_defer_pkt(&node->bclink.deferred_head, &node->bclink.deferred_tail, buf); - if (deferred) { - node->bclink.nack_sync++; - if (seqno == mod(gap_after + 1)) - node->bclink.gap_after = seqno; - else if (less(gap_after, seqno) && less(seqno, gap_to)) - node->bclink.gap_to = seqno; - } + node->bclink.deferred_size += deferred; + bclink_update_last_sent(node, seqno); buf = NULL; - if (bclink_ack_allowed(node->bclink.nack_sync)) { - if (gap_to != gap_after) - bclink_send_nack(node); - bclink_set_gap(node); - } } else deferred = 0; diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index b009666c60b..5571394098f 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -96,7 +96,7 @@ int tipc_bclink_send_msg(struct sk_buff *buf); void tipc_bclink_recv_pkt(struct sk_buff *buf); u32 tipc_bclink_get_last_sent(void); u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr); -void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 seqno); +void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent); int tipc_bclink_stats(char *stats_buf, const u32 buf_size); int tipc_bclink_reset_stats(void); int tipc_bclink_set_queue_limits(u32 limit); diff --git a/net/tipc/link.c b/net/tipc/link.c index 1150ba5a648..cce953723dd 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1501,14 +1501,13 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, tipc_node_lock(n_ptr); tipc_addr_string_fill(addr_string, n_ptr->addr); - info("Multicast link info for %s\n", addr_string); + info("Broadcast link info for %s\n", addr_string); info("Supportable: %d, ", n_ptr->bclink.supportable); info("Supported: %d, ", n_ptr->bclink.supported); info("Acked: %u\n", n_ptr->bclink.acked); info("Last in: %u, ", n_ptr->bclink.last_in); - info("Gap after: %u, ", n_ptr->bclink.gap_after); - info("Gap to: %u\n", n_ptr->bclink.gap_to); - info("Nack sync: %u\n\n", n_ptr->bclink.nack_sync); + info("Oos state: %u, ", n_ptr->bclink.oos_state); + info("Last sent: %u\n", n_ptr->bclink.last_sent); tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr); @@ -1974,7 +1973,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, msg_set_type(msg, msg_typ); msg_set_net_plane(msg, l_ptr->b_ptr->net_plane); - msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in)); + msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); msg_set_last_bcast(msg, tipc_bclink_get_last_sent()); if (msg_typ == STATE_MSG) { @@ -2133,8 +2132,12 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) /* Synchronize broadcast link info, if not done previously */ - if (!tipc_node_is_up(l_ptr->owner)) - l_ptr->owner->bclink.last_in = msg_last_bcast(msg); + if (!tipc_node_is_up(l_ptr->owner)) { + l_ptr->owner->bclink.last_sent = + l_ptr->owner->bclink.last_in = + msg_last_bcast(msg); + l_ptr->owner->bclink.oos_state = 0; + } l_ptr->peer_session = msg_session(msg); l_ptr->peer_bearer_id = msg_bearer_id(msg); @@ -2181,7 +2184,9 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) /* Protocol message before retransmits, reduce loss risk */ - tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg)); + if (l_ptr->owner->bclink.supported) + tipc_bclink_update_link_state(l_ptr->owner, + msg_last_bcast(msg)); if (rec_gap || (msg_probe(msg))) { tipc_link_send_proto_msg(l_ptr, STATE_MSG, diff --git a/net/tipc/node.c b/net/tipc/node.c index 9196f943b83..6d8bdfd95cd 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -339,12 +339,12 @@ static void node_lost_contact(struct tipc_node *n_ptr) /* Flush broadcast link info associated with lost node */ if (n_ptr->bclink.supported) { - n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 0; while (n_ptr->bclink.deferred_head) { struct sk_buff *buf = n_ptr->bclink.deferred_head; n_ptr->bclink.deferred_head = buf->next; buf_discard(buf); } + n_ptr->bclink.deferred_size = 0; if (n_ptr->bclink.defragm) { buf_discard(n_ptr->bclink.defragm); @@ -450,7 +450,7 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space) read_lock_bh(&tipc_net_lock); - /* Get space for all unicast links + multicast link */ + /* Get space for all unicast links + broadcast link */ payload_size = TLV_SPACE(sizeof(link_info)) * (atomic_read(&tipc_num_links) + 1); diff --git a/net/tipc/node.h b/net/tipc/node.h index 90689f48761..c88ce64f8a3 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -66,9 +66,9 @@ * @supported: non-zero if node supports TIPC b'cast capability * @acked: sequence # of last outbound b'cast message acknowledged by node * @last_in: sequence # of last in-sequence b'cast message received from node - * @gap_after: sequence # of last message not requiring a NAK request - * @gap_to: sequence # of last message requiring a NAK request - * @nack_sync: counter that determines when NAK requests should be sent + * @last_sent: sequence # of last b'cast message sent by node + * @oos_state: state tracker for handling OOS b'cast messages + * @deferred_size: number of OOS b'cast messages in deferred queue * @deferred_head: oldest OOS b'cast message received from node * @deferred_tail: newest OOS b'cast message received from node * @defragm: list of partially reassembled b'cast message fragments from node @@ -91,9 +91,9 @@ struct tipc_node { u8 supported; u32 acked; u32 last_in; - u32 gap_after; - u32 gap_to; - u32 nack_sync; + u32 last_sent; + u32 oos_state; + u32 deferred_size; struct sk_buff *deferred_head; struct sk_buff *deferred_tail; struct sk_buff *defragm; -- cgit v1.2.3-70-g09d2 From b76b27cad5ade1d483d4b94df6b35976bccf1055 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Thu, 27 Oct 2011 16:31:26 -0400 Subject: tipc: Prevent loss of fragmented messages over unicast links Modifies unicast link endpoint logic so an incoming fragmented message is not lost if reassembly cannot begin because there is no buffer big enough to hold the entire reassembled message. The link endpoint now ignores the first fragment completely, which causes the sending node to retransmit the first fragment so that reassembly can be re-attempted. Previously, the sender would have had no reason to retransmit the 1st fragment, so we would never have a chance to re-try the allocation. Signed-off-by: Allan Stephens --- net/tipc/link.c | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index cce953723dd..d8b0a22367b 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1774,6 +1774,7 @@ protocol_check: head = link_insert_deferred_queue(l_ptr, head); if (likely(msg_is_dest(msg, tipc_own_addr))) { + int ret; deliver: if (likely(msg_isdata(msg))) { tipc_node_unlock(n_ptr); @@ -1798,11 +1799,15 @@ deliver: continue; case MSG_FRAGMENTER: l_ptr->stats.recv_fragments++; - if (tipc_link_recv_fragment(&l_ptr->defragm_buf, - &buf, &msg)) { + ret = tipc_link_recv_fragment( + &l_ptr->defragm_buf, + &buf, &msg); + if (ret == 1) { l_ptr->stats.recv_fragmented++; goto deliver; } + if (ret == -1) + l_ptr->next_in_no--; break; case CHANGEOVER_PROTOCOL: type = msg_type(msg); @@ -2632,7 +2637,9 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, set_fragm_size(pbuf, fragm_sz); set_expected_frags(pbuf, exp_fragm_cnt - 1); } else { - warn("Link unable to reassemble fragmented message\n"); + dbg("Link unable to reassemble fragmented message\n"); + buf_discard(fbuf); + return -1; } buf_discard(fbuf); return 0; -- cgit v1.2.3-70-g09d2 From c74a46110fd5f97bf9299e68e9ed0453bdacb181 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Wed, 2 Nov 2011 15:08:44 -0400 Subject: tipc: Remove duplicate check of message destination node Eliminates a check in the processing of TIPC messages arriving from off node that ensures the message is destined for this node, since this check duplicates an earlier check. (The check would be necessary if TIPC needed to be able to route incoming messages to another node, but the elimination of multi-cluster support means that this never happens and all incoming messages are consumed by the receiving node.) Note: This change involves the elimination of a single "if" statement with a large "then" clause; consequently, a significant number of lines end up getting re-indented. In addition, a simple message header access routine that is no longer referenced is eliminated. However, the only functional change is the elimination of the single check described above. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 93 ++++++++++++++++++++++++++++----------------------------- net/tipc/msg.h | 5 ---- 2 files changed, 46 insertions(+), 52 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index d8b0a22367b..6cc78a97012 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1773,57 +1773,56 @@ protocol_check: if (unlikely(l_ptr->oldest_deferred_in)) head = link_insert_deferred_queue(l_ptr, head); - if (likely(msg_is_dest(msg, tipc_own_addr))) { - int ret; deliver: - if (likely(msg_isdata(msg))) { - tipc_node_unlock(n_ptr); - tipc_port_recv_msg(buf); - continue; + if (likely(msg_isdata(msg))) { + tipc_node_unlock(n_ptr); + tipc_port_recv_msg(buf); + continue; + } + switch (msg_user(msg)) { + int ret; + case MSG_BUNDLER: + l_ptr->stats.recv_bundles++; + l_ptr->stats.recv_bundled += + msg_msgcnt(msg); + tipc_node_unlock(n_ptr); + tipc_link_recv_bundle(buf); + continue; + case NAME_DISTRIBUTOR: + tipc_node_unlock(n_ptr); + tipc_named_recv(buf); + continue; + case CONN_MANAGER: + tipc_node_unlock(n_ptr); + tipc_port_recv_proto_msg(buf); + continue; + case MSG_FRAGMENTER: + l_ptr->stats.recv_fragments++; + ret = tipc_link_recv_fragment( + &l_ptr->defragm_buf, + &buf, &msg); + if (ret == 1) { + l_ptr->stats.recv_fragmented++; + goto deliver; } - switch (msg_user(msg)) { - case MSG_BUNDLER: - l_ptr->stats.recv_bundles++; - l_ptr->stats.recv_bundled += - msg_msgcnt(msg); - tipc_node_unlock(n_ptr); - tipc_link_recv_bundle(buf); - continue; - case NAME_DISTRIBUTOR: - tipc_node_unlock(n_ptr); - tipc_named_recv(buf); - continue; - case CONN_MANAGER: - tipc_node_unlock(n_ptr); - tipc_port_recv_proto_msg(buf); - continue; - case MSG_FRAGMENTER: - l_ptr->stats.recv_fragments++; - ret = tipc_link_recv_fragment( - &l_ptr->defragm_buf, - &buf, &msg); - if (ret == 1) { - l_ptr->stats.recv_fragmented++; + if (ret == -1) + l_ptr->next_in_no--; + break; + case CHANGEOVER_PROTOCOL: + type = msg_type(msg); + if (link_recv_changeover_msg(&l_ptr, + &buf)) { + msg = buf_msg(buf); + seq_no = msg_seqno(msg); + if (type == ORIGINAL_MSG) goto deliver; - } - if (ret == -1) - l_ptr->next_in_no--; - break; - case CHANGEOVER_PROTOCOL: - type = msg_type(msg); - if (link_recv_changeover_msg(&l_ptr, &buf)) { - msg = buf_msg(buf); - seq_no = msg_seqno(msg); - if (type == ORIGINAL_MSG) - goto deliver; - goto protocol_check; - } - break; - default: - buf_discard(buf); - buf = NULL; - break; + goto protocol_check; } + break; + default: + buf_discard(buf); + buf = NULL; + break; } tipc_node_unlock(n_ptr); tipc_net_route_msg(buf); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 2ec13b73181..eba524e34a6 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -384,11 +384,6 @@ static inline void msg_set_destnode(struct tipc_msg *m, u32 a) msg_set_word(m, 7, a); } -static inline int msg_is_dest(struct tipc_msg *m, u32 d) -{ - return msg_short(m) || (msg_destnode(m) == d); -} - static inline u32 msg_nametype(struct tipc_msg *m) { return msg_word(m, 8); -- cgit v1.2.3-70-g09d2 From 5f6d9123f1c7ef7297b0da1620988fe16c738e75 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Fri, 4 Nov 2011 13:24:29 -0400 Subject: tipc: Eliminate trivial buffer manipulation helper routines Gets rid of two inlined routines that simply call existing sk_buff manipulation routines, since there is no longer any extra processing done by the helper routines. Note that these changes are essentially cosmetic in nature, and have no impact on the actual operation of TIPC. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/bcast.c | 12 ++++---- net/tipc/core.h | 24 ---------------- net/tipc/discover.c | 6 ++-- net/tipc/link.c | 80 +++++++++++++++++++++++++-------------------------- net/tipc/msg.c | 2 +- net/tipc/name_distr.c | 4 +-- net/tipc/net.c | 4 +-- net/tipc/node.c | 4 +-- net/tipc/port.c | 14 ++++----- net/tipc/socket.c | 8 +++--- 10 files changed, 67 insertions(+), 91 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 41ecf313073..e00441a2092 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -256,7 +256,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) if (bcbuf_acks(crs) == 0) { bcl->first_out = next; bcl->out_queue_size--; - buf_discard(crs); + kfree_skb(crs); released = 1; } crs = next; @@ -330,7 +330,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) tipc_bearer_send(&bcbearer->bearer, buf, NULL); bcl->stats.sent_nacks++; spin_unlock_bh(&bc_lock); - buf_discard(buf); + kfree_skb(buf); n_ptr->bclink.oos_state++; } @@ -374,7 +374,7 @@ int tipc_bclink_send_msg(struct sk_buff *buf) if (!bclink->bcast_nodes.count) { res = msg_data_sz(buf_msg(buf)); - buf_discard(buf); + kfree_skb(buf); goto exit; } @@ -480,7 +480,7 @@ receive: if (likely(msg_mcast(msg))) tipc_port_recv_mcast(buf, NULL); else - buf_discard(buf); + kfree_skb(buf); } else if (msg_user(msg) == MSG_BUNDLER) { spin_lock_bh(&bc_lock); bclink_accept_pkt(node, seqno); @@ -513,7 +513,7 @@ receive: bclink_accept_pkt(node, seqno); spin_unlock_bh(&bc_lock); tipc_node_unlock(node); - buf_discard(buf); + kfree_skb(buf); } buf = NULL; @@ -569,7 +569,7 @@ receive: unlock: tipc_node_unlock(node); exit: - buf_discard(buf); + kfree_skb(buf); } u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) diff --git a/net/tipc/core.h b/net/tipc/core.h index 1260b053bf2..aefe1869572 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -269,28 +269,4 @@ static inline struct tipc_msg *buf_msg(struct sk_buff *skb) extern struct sk_buff *tipc_buf_acquire(u32 size); -/** - * buf_discard - frees a TIPC message buffer - * @skb: message buffer - * - * Frees a message buffer. If passed NULL, just returns. - */ - -static inline void buf_discard(struct sk_buff *skb) -{ - kfree_skb(skb); -} - -/** - * buf_linearize - convert a TIPC message buffer into a single contiguous piece - * @skb: message buffer - * - * Returns 0 on success. - */ - -static inline int buf_linearize(struct sk_buff *skb) -{ - return skb_linearize(skb); -} - #endif diff --git a/net/tipc/discover.c b/net/tipc/discover.c index f5f9bf7a043..c630a21b2be 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -135,7 +135,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr) media_addr.broadcast = 1; b_ptr->media->msg2addr(&media_addr, msg_media_addr(msg)); - buf_discard(buf); + kfree_skb(buf); /* Ensure message from node is valid and communication is permitted */ if (net_id != tipc_net_id) @@ -250,7 +250,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr) rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr); if (rbuf) { b_ptr->media->send_msg(rbuf, b_ptr, &media_addr); - buf_discard(rbuf); + kfree_skb(rbuf); } } @@ -396,7 +396,7 @@ void tipc_disc_delete(struct tipc_link_req *req) { k_cancel_timer(&req->timer); k_term_timer(&req->timer); - buf_discard(req->buf); + kfree_skb(req->buf); kfree(req); } diff --git a/net/tipc/link.c b/net/tipc/link.c index 6cc78a97012..f16e65dd50c 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -484,7 +484,7 @@ static void link_release_outqueue(struct tipc_link *l_ptr) while (buf) { next = buf->next; - buf_discard(buf); + kfree_skb(buf); buf = next; } l_ptr->first_out = NULL; @@ -503,7 +503,7 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr) while (buf) { next = buf->next; - buf_discard(buf); + kfree_skb(buf); buf = next; } l_ptr->defragm_buf = NULL; @@ -522,20 +522,20 @@ void tipc_link_stop(struct tipc_link *l_ptr) buf = l_ptr->oldest_deferred_in; while (buf) { next = buf->next; - buf_discard(buf); + kfree_skb(buf); buf = next; } buf = l_ptr->first_out; while (buf) { next = buf->next; - buf_discard(buf); + kfree_skb(buf); buf = next; } tipc_link_reset_fragments(l_ptr); - buf_discard(l_ptr->proto_msg_queue); + kfree_skb(l_ptr->proto_msg_queue); l_ptr->proto_msg_queue = NULL; } @@ -571,12 +571,12 @@ void tipc_link_reset(struct tipc_link *l_ptr) /* Clean up all queues: */ link_release_outqueue(l_ptr); - buf_discard(l_ptr->proto_msg_queue); + kfree_skb(l_ptr->proto_msg_queue); l_ptr->proto_msg_queue = NULL; buf = l_ptr->oldest_deferred_in; while (buf) { struct sk_buff *next = buf->next; - buf_discard(buf); + kfree_skb(buf); buf = next; } if (!list_empty(&l_ptr->waiting_ports)) @@ -810,7 +810,7 @@ static int link_bundle_buf(struct tipc_link *l_ptr, skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size); msg_set_size(bundler_msg, to_pos + size); msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1); - buf_discard(buf); + kfree_skb(buf); l_ptr->stats.sent_bundled++; return 1; } @@ -878,10 +878,10 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf) if (unlikely(queue_size >= queue_limit)) { if (imp <= TIPC_CRITICAL_IMPORTANCE) { link_schedule_port(l_ptr, msg_origport(msg), size); - buf_discard(buf); + kfree_skb(buf); return -ELINKCONG; } - buf_discard(buf); + kfree_skb(buf); if (imp > CONN_MANAGER) { warn("Resetting link <%s>, send queue full", l_ptr->name); tipc_link_reset(l_ptr); @@ -968,10 +968,10 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector) if (l_ptr) res = tipc_link_send_buf(l_ptr, buf); else - buf_discard(buf); + kfree_skb(buf); tipc_node_unlock(n_ptr); } else { - buf_discard(buf); + kfree_skb(buf); } read_unlock_bh(&tipc_net_lock); return res; @@ -1018,7 +1018,7 @@ void tipc_link_send_names(struct list_head *message_list, u32 dest) list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) { list_del((struct list_head *)buf); - buf_discard(buf); + kfree_skb(buf); } } @@ -1262,7 +1262,7 @@ again: error: for (; buf_chain; buf_chain = buf) { buf = buf_chain->next; - buf_discard(buf_chain); + kfree_skb(buf_chain); } return -EFAULT; } @@ -1316,7 +1316,7 @@ error: tipc_node_unlock(node); for (; buf_chain; buf_chain = buf) { buf = buf_chain->next; - buf_discard(buf_chain); + kfree_skb(buf_chain); } goto again; } @@ -1324,7 +1324,7 @@ error: reject: for (; buf_chain; buf_chain = buf) { buf = buf_chain->next; - buf_discard(buf_chain); + kfree_skb(buf_chain); } return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, total_len, TIPC_ERR_NO_NODE); @@ -1390,7 +1390,7 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr) msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { l_ptr->unacked_window = 0; - buf_discard(buf); + kfree_skb(buf); l_ptr->proto_msg_queue = NULL; return 0; } else { @@ -1679,7 +1679,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) /* Ensure message data is a single contiguous unit */ - if (unlikely(buf_linearize(buf))) + if (unlikely(skb_linearize(buf))) goto cont; /* Handle arrival of a non-unicast link message */ @@ -1744,7 +1744,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) less_eq(buf_seqno(crs), ackd)) { struct sk_buff *next = crs->next; - buf_discard(crs); + kfree_skb(crs); crs = next; released++; } @@ -1820,7 +1820,7 @@ deliver: } break; default: - buf_discard(buf); + kfree_skb(buf); buf = NULL; break; } @@ -1851,7 +1851,7 @@ deliver: } tipc_node_unlock(n_ptr); cont: - buf_discard(buf); + kfree_skb(buf); } read_unlock_bh(&tipc_net_lock); } @@ -1891,7 +1891,7 @@ u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, u32 curr_seqno = buf_seqno(queue_buf); if (seq_no == curr_seqno) { - buf_discard(buf); + kfree_skb(buf); return 0; } @@ -1932,7 +1932,7 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, if (less(seq_no, mod(l_ptr->next_in_no))) { l_ptr->stats.duplicates++; - buf_discard(buf); + kfree_skb(buf); return; } @@ -1961,7 +1961,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, /* Discard any previous message that was deferred due to congestion */ if (l_ptr->proto_msg_queue) { - buf_discard(l_ptr->proto_msg_queue); + kfree_skb(l_ptr->proto_msg_queue); l_ptr->proto_msg_queue = NULL; } @@ -2060,7 +2060,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, /* Discard message if it was sent successfully */ l_ptr->unacked_window = 0; - buf_discard(buf); + kfree_skb(buf); } /* @@ -2204,7 +2204,7 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) break; } exit: - buf_discard(buf); + kfree_skb(buf); } @@ -2402,7 +2402,7 @@ static int link_recv_changeover_msg(struct tipc_link **l_ptr, warn("Link changeover error, duplicate msg dropped\n"); goto exit; } - buf_discard(tunnel_buf); + kfree_skb(tunnel_buf); return 1; } @@ -2434,7 +2434,7 @@ static int link_recv_changeover_msg(struct tipc_link **l_ptr, } else { *buf = buf_extract(tunnel_buf, INT_H_SIZE); if (*buf != NULL) { - buf_discard(tunnel_buf); + kfree_skb(tunnel_buf); return 1; } else { warn("Link changeover error, original msg dropped\n"); @@ -2442,7 +2442,7 @@ static int link_recv_changeover_msg(struct tipc_link **l_ptr, } exit: *buf = NULL; - buf_discard(tunnel_buf); + kfree_skb(tunnel_buf); return 0; } @@ -2464,7 +2464,7 @@ void tipc_link_recv_bundle(struct sk_buff *buf) pos += align(msg_size(buf_msg(obuf))); tipc_net_route_msg(obuf); } - buf_discard(buf); + kfree_skb(buf); } /* @@ -2513,11 +2513,11 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf) } fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE); if (fragm == NULL) { - buf_discard(buf); + kfree_skb(buf); while (buf_chain) { buf = buf_chain; buf_chain = buf_chain->next; - buf_discard(buf); + kfree_skb(buf); } return -ENOMEM; } @@ -2534,7 +2534,7 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf) crs += fragm_sz; msg_set_type(&fragm_hdr, FRAGMENT); } - buf_discard(buf); + kfree_skb(buf); /* Append chain of fragments to send queue & send them */ @@ -2621,7 +2621,7 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, if (msg_type(imsg) == TIPC_MCAST_MSG) max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE; if (msg_size(imsg) > max) { - buf_discard(fbuf); + kfree_skb(fbuf); return 0; } pbuf = tipc_buf_acquire(msg_size(imsg)); @@ -2637,10 +2637,10 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, set_expected_frags(pbuf, exp_fragm_cnt - 1); } else { dbg("Link unable to reassemble fragmented message\n"); - buf_discard(fbuf); + kfree_skb(fbuf); return -1; } - buf_discard(fbuf); + kfree_skb(fbuf); return 0; } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) { u32 dsz = msg_data_sz(fragm); @@ -2649,7 +2649,7 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, u32 exp_frags = get_expected_frags(pbuf) - 1; skb_copy_to_linear_data_offset(pbuf, crs, msg_data(fragm), dsz); - buf_discard(fbuf); + kfree_skb(fbuf); /* Is message complete? */ @@ -2666,7 +2666,7 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, set_expected_frags(pbuf, exp_frags); return 0; } - buf_discard(fbuf); + kfree_skb(fbuf); return 0; } @@ -2697,7 +2697,7 @@ static void link_check_defragm_bufs(struct tipc_link *l_ptr) prev->next = buf->next; else l_ptr->defragm_buf = buf->next; - buf_discard(buf); + kfree_skb(buf); } buf = next; } @@ -3072,7 +3072,7 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_s str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area), (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO); if (!str_len) { - buf_discard(buf); + kfree_skb(buf); return tipc_cfg_reply_error_string("link not found"); } diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 3e4d3e29be6..e3afe162c0a 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -106,7 +106,7 @@ int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, if (likely(res)) return dsz; - buf_discard(*buf); + kfree_skb(*buf); *buf = NULL; return -EFAULT; } diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index acecfda82f3..d57da615961 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -120,7 +120,7 @@ static void named_cluster_distribute(struct sk_buff *buf) } } - buf_discard(buf); + kfree_skb(buf); } /** @@ -312,7 +312,7 @@ void tipc_named_recv(struct sk_buff *buf) item++; } write_unlock_bh(&tipc_nametbl_lock); - buf_discard(buf); + kfree_skb(buf); } /** diff --git a/net/tipc/net.c b/net/tipc/net.c index 61afee7e829..2abd4be4933 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -117,7 +117,7 @@ static void net_route_named_msg(struct sk_buff *buf) u32 dport; if (!msg_named(msg)) { - buf_discard(buf); + kfree_skb(buf); return; } @@ -161,7 +161,7 @@ void tipc_net_route_msg(struct sk_buff *buf) tipc_port_recv_proto_msg(buf); break; default: - buf_discard(buf); + kfree_skb(buf); } return; } diff --git a/net/tipc/node.c b/net/tipc/node.c index 1790f503f57..24c42f7568a 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -353,12 +353,12 @@ static void node_lost_contact(struct tipc_node *n_ptr) while (n_ptr->bclink.deferred_head) { struct sk_buff *buf = n_ptr->bclink.deferred_head; n_ptr->bclink.deferred_head = buf->next; - buf_discard(buf); + kfree_skb(buf); } n_ptr->bclink.deferred_size = 0; if (n_ptr->bclink.defragm) { - buf_discard(n_ptr->bclink.defragm); + kfree_skb(n_ptr->bclink.defragm); n_ptr->bclink.defragm = NULL; } diff --git a/net/tipc/port.c b/net/tipc/port.c index ba3268b8da4..c4b5a347037 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -116,13 +116,13 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, ibuf = skb_copy(buf, GFP_ATOMIC); if (ibuf == NULL) { tipc_port_list_free(&dports); - buf_discard(buf); + kfree_skb(buf); return -ENOMEM; } } res = tipc_bclink_send_msg(buf); if ((res < 0) && (dports.count != 0)) - buf_discard(ibuf); + kfree_skb(ibuf); } else { ibuf = buf; } @@ -187,7 +187,7 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp) } } exit: - buf_discard(buf); + kfree_skb(buf); tipc_port_list_free(dp); } @@ -420,7 +420,7 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err) else tipc_link_send(rbuf, src_node, msg_link_selector(rmsg)); exit: - buf_discard(buf); + kfree_skb(buf); return data_sz; } @@ -568,7 +568,7 @@ void tipc_port_recv_proto_msg(struct sk_buff *buf) tipc_port_unlock(p_ptr); exit: tipc_net_route_msg(r_buf); - buf_discard(buf); + kfree_skb(buf); } static void port_print(struct tipc_port *p_ptr, struct print_buf *buf, int full_id) @@ -759,7 +759,7 @@ static void port_dispatcher_sigh(void *dummy) } } if (buf) - buf_discard(buf); + kfree_skb(buf); buf = next; continue; err: @@ -813,7 +813,7 @@ err: } } if (buf) - buf_discard(buf); + kfree_skb(buf); buf = next; continue; reject: diff --git a/net/tipc/socket.c b/net/tipc/socket.c index d3227f2a216..29e957f6445 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -126,7 +126,7 @@ static atomic_t tipc_queue_size = ATOMIC_INIT(0); static void advance_rx_queue(struct sock *sk) { - buf_discard(__skb_dequeue(&sk->sk_receive_queue)); + kfree_skb(__skb_dequeue(&sk->sk_receive_queue)); atomic_dec(&tipc_queue_size); } @@ -142,7 +142,7 @@ static void discard_rx_queue(struct sock *sk) while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { atomic_dec(&tipc_queue_size); - buf_discard(buf); + kfree_skb(buf); } } @@ -288,7 +288,7 @@ static int release(struct socket *sock) break; atomic_dec(&tipc_queue_size); if (TIPC_SKB_CB(buf)->handle != 0) - buf_discard(buf); + kfree_skb(buf); else { if ((sock->state == SS_CONNECTING) || (sock->state == SS_CONNECTED)) { @@ -1615,7 +1615,7 @@ restart: if (buf) { atomic_dec(&tipc_queue_size); if (TIPC_SKB_CB(buf)->handle != 0) { - buf_discard(buf); + kfree_skb(buf); goto restart; } tipc_disconnect(tport->ref); -- cgit v1.2.3-70-g09d2 From e8ec1ae756de320644c69194898c53d247925586 Mon Sep 17 00:00:00 2001 From: Allan Stephens Date: Wed, 9 Nov 2011 17:38:20 -0500 Subject: tipc: Eliminate obsolete code for re-sending a message Removes code that updated the "previous node" field of an out-going message over TIPC's links. Such updating is unnecessary since the removal of the prototype multi-cluster capability means that all outgoing messages are generated locally and already have this field populated correctly. Signed-off-by: Allan Stephens Signed-off-by: Paul Gortmaker --- net/tipc/link.c | 2 -- 1 file changed, 2 deletions(-) (limited to 'net/tipc/link.c') diff --git a/net/tipc/link.c b/net/tipc/link.c index f16e65dd50c..b4b9b30167a 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -871,8 +871,6 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf) u32 queue_limit = l_ptr->queue_limit[imp]; u32 max_packet = l_ptr->max_pkt; - msg_set_prevnode(msg, tipc_own_addr); /* If routed message */ - /* Match msg importance against queue limits: */ if (unlikely(queue_size >= queue_limit)) { -- cgit v1.2.3-70-g09d2