Diffstat (limited to 'net/tipc')
-rw-r--r--  net/tipc/Makefile      |    2
-rw-r--r--  net/tipc/bcast.c       |  107
-rw-r--r--  net/tipc/bcast.h       |    7
-rw-r--r--  net/tipc/config.c      |    4
-rw-r--r--  net/tipc/core.c        |    9
-rw-r--r--  net/tipc/core.h        |    6
-rw-r--r--  net/tipc/link.c        |  887
-rw-r--r--  net/tipc/link.h        |   14
-rw-r--r--  net/tipc/msg.c         |  405
-rw-r--r--  net/tipc/msg.h         |   40
-rw-r--r--  net/tipc/name_distr.c  |  216
-rw-r--r--  net/tipc/name_distr.h  |    3
-rw-r--r--  net/tipc/name_table.c  |    9
-rw-r--r--  net/tipc/net.c         |   66
-rw-r--r--  net/tipc/net.h         |    2
-rw-r--r--  net/tipc/node.c        |  133
-rw-r--r--  net/tipc/node.h        |   25
-rw-r--r--  net/tipc/node_subscr.c |    6
-rw-r--r--  net/tipc/port.c        |  898
-rw-r--r--  net/tipc/port.h        |  237
-rw-r--r--  net/tipc/ref.c         |  266
-rw-r--r--  net/tipc/ref.h         |   48
-rw-r--r--  net/tipc/socket.c      | 1383
-rw-r--r--  net/tipc/socket.h      |   41
-rw-r--r--  net/tipc/subscr.c      |    1
-rw-r--r--  net/tipc/sysctl.c      |    7
26 files changed, 2116 insertions(+), 2706 deletions(-)
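The dominant change in this diff is the removal of the generic port layer (port.c, ref.c) and of the per-port congestion lists that went with it. A sender that hits a link's send-queue limit is now represented by a small SOCK_WAKEUP pseudo-message queued on the link itself, carrying the size and importance of the failed send attempt in the skb control block (see link_schedule_user() and link_prepare_wakeup() in net/tipc/link.c below). The following stand-alone sketch illustrates that bookkeeping; mini_link and wakeup_token are simplified stand-ins invented for illustration, not the kernel's own structures:

#include <stddef.h>

#define IMP_LEVELS 5	/* message importance classes */

struct wakeup_token {			/* stand-in for a SOCK_WAKEUP skb */
	unsigned int oport;		/* socket to wake up */
	unsigned int chain_sz;		/* packets in the rejected chain */
	unsigned int chain_imp;		/* importance of rejected message */
	struct wakeup_token *next;
};

struct mini_link {
	unsigned int out_queue_size;		/* packets currently queued */
	unsigned int queue_limit[IMP_LEVELS];	/* per-importance limits */
	struct wakeup_token *waiting;		/* FIFO of blocked senders */
	struct wakeup_token **waiting_tail;	/* initialize to &waiting */
};

/* Mirrors link_schedule_user(): park the blocked sender on the link,
 * so the send call can return -ELINKCONG instead of dropping data.
 */
static void schedule_user(struct mini_link *l, struct wakeup_token *tok)
{
	tok->next = NULL;
	*l->waiting_tail = tok;
	l->waiting_tail = &tok->next;
}

/* Mirrors link_prepare_wakeup(): after acks shrink the queue, release
 * blocked senders in arrival order while the queue size they would
 * recreate (*pend_qsz, seeded with out_queue_size) stays below the
 * limit for their importance level. Returns a sender to wake, or NULL.
 */
static struct wakeup_token *next_wakeup(struct mini_link *l,
					unsigned int *pend_qsz)
{
	struct wakeup_token *tok = l->waiting;

	if (!tok || *pend_qsz >= l->queue_limit[tok->chain_imp])
		return NULL;
	*pend_qsz += tok->chain_sz;
	l->waiting = tok->next;
	if (!l->waiting)
		l->waiting_tail = &l->waiting;
	return tok;
}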
diff --git a/net/tipc/Makefile b/net/tipc/Makefile index a080c66d819..b8a13caad59 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -7,7 +7,7 @@ obj-$(CONFIG_TIPC) := tipc.o tipc-y += addr.o bcast.o bearer.o config.o \ core.o link.o discover.o msg.o \ name_distr.o subscr.o name_table.o net.o \ - netlink.o node.o node_subscr.o port.o ref.o \ + netlink.o node.o node_subscr.o \ socket.o log.o eth_media.o server.o tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 55c6c9d3e1c..b8670bf262e 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -1,7 +1,7 @@ /* * net/tipc/bcast.c: TIPC broadcast code * - * Copyright (c) 2004-2006, Ericsson AB + * Copyright (c) 2004-2006, 2014, Ericsson AB * Copyright (c) 2004, Intel Corporation. * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. @@ -37,7 +37,8 @@ #include "core.h" #include "link.h" -#include "port.h" +#include "socket.h" +#include "msg.h" #include "bcast.h" #include "name_distr.h" @@ -138,6 +139,11 @@ static void tipc_bclink_unlock(void) tipc_link_reset_all(node); } +uint tipc_bclink_get_mtu(void) +{ + return MAX_PKT_DEFAULT_MCAST; +} + void tipc_bclink_set_flags(unsigned int flags) { bclink->flags |= flags; @@ -220,6 +226,17 @@ static void bclink_retransmit_pkt(u32 after, u32 to) } /** + * tipc_bclink_wakeup_users - wake up pending users + * + * Called with no locks taken + */ +void tipc_bclink_wakeup_users(void) +{ + while (skb_queue_len(&bclink->link.waiting_sks)) + tipc_sk_rcv(skb_dequeue(&bclink->link.waiting_sks)); +} + +/** * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets * @n_ptr: node that sent acknowledgement info * @acked: broadcast sequence # that has been acknowledged @@ -293,8 +310,9 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) tipc_link_push_queue(bcl); bclink_set_last_sent(); } - if (unlikely(released && !list_empty(&bcl->waiting_ports))) - tipc_link_wakeup_ports(bcl, 0); + if (unlikely(released && !skb_queue_empty(&bcl->waiting_sks))) + n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS; + exit: tipc_bclink_unlock(); } @@ -382,30 +400,50 @@ static void bclink_peek_nack(struct tipc_msg *msg) tipc_node_unlock(n_ptr); } -/* - * tipc_bclink_xmit - broadcast a packet to all nodes in cluster +/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster + * and to identified node local sockets + * @buf: chain of buffers containing message + * Consumes the buffer chain, except when returning -ELINKCONG + * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE */ int tipc_bclink_xmit(struct sk_buff *buf) { - int res; - - tipc_bclink_lock(); + int rc = 0; + int bc = 0; + struct sk_buff *clbuf; - if (!bclink->bcast_nodes.count) { - res = msg_data_sz(buf_msg(buf)); - kfree_skb(buf); - goto exit; + /* Prepare clone of message for local node */ + clbuf = tipc_msg_reassemble(buf); + if (unlikely(!clbuf)) { + kfree_skb_list(buf); + return -EHOSTUNREACH; } - res = __tipc_link_xmit(bcl, buf); - if (likely(res >= 0)) { - bclink_set_last_sent(); - bcl->stats.queue_sz_counts++; - bcl->stats.accu_queue_sz += bcl->out_queue_size; + /* Broadcast to all other nodes */ + if (likely(bclink)) { + tipc_bclink_lock(); + if (likely(bclink->bcast_nodes.count)) { + rc = __tipc_link_xmit(bcl, buf); + if (likely(!rc)) { + bclink_set_last_sent(); + bcl->stats.queue_sz_counts++; + bcl->stats.accu_queue_sz += bcl->out_queue_size; + } + bc = 1; + } + tipc_bclink_unlock(); } -exit: - tipc_bclink_unlock(); - return res; + 
+ if (unlikely(!bc)) + kfree_skb_list(buf); + + /* Deliver message clone */ + if (likely(!rc)) + tipc_sk_mcast_rcv(clbuf); + else + kfree_skb(clbuf); + + return rc; } /** @@ -443,7 +481,7 @@ void tipc_bclink_rcv(struct sk_buff *buf) struct tipc_node *node; u32 next_in; u32 seqno; - int deferred; + int deferred = 0; /* Screen out unwanted broadcast messages */ @@ -494,7 +532,7 @@ receive: tipc_bclink_unlock(); tipc_node_unlock(node); if (likely(msg_mcast(msg))) - tipc_port_mcast_rcv(buf, NULL); + tipc_sk_mcast_rcv(buf); else kfree_skb(buf); } else if (msg_user(msg) == MSG_BUNDLER) { @@ -573,8 +611,7 @@ receive: node->bclink.deferred_size += deferred; bclink_update_last_sent(node, seqno); buf = NULL; - } else - deferred = 0; + } tipc_bclink_lock(); @@ -611,6 +648,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, struct tipc_media_addr *unused2) { int bp_index; + struct tipc_msg *msg = buf_msg(buf); /* Prepare broadcast link message for reliable transmission, * if first time trying to send it; @@ -618,10 +656,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, * since they are sent in an unreliable manner and don't need it */ if (likely(!msg_non_seq(buf_msg(buf)))) { - struct tipc_msg *msg; - bcbuf_set_acks(buf, bclink->bcast_nodes.count); - msg = buf_msg(buf); msg_set_non_seq(msg, 1); msg_set_mc_netid(msg, tipc_net_id); bcl->stats.sent_info++; @@ -638,12 +673,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) { struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary; struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary; - struct tipc_bearer *b = p; + struct tipc_bearer *bp[2] = {p, s}; + struct tipc_bearer *b = bp[msg_link_selector(msg)]; struct sk_buff *tbuf; if (!p) break; /* No more bearers to try */ - + if (!b) + b = p; tipc_nmap_diff(&bcbearer->remains, &b->nodes, &bcbearer->remains_new); if (bcbearer->remains_new.count == bcbearer->remains.count) @@ -660,13 +697,6 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, tipc_bearer_send(b->identity, tbuf, &b->bcast_addr); kfree_skb(tbuf); /* Bearer keeps a clone */ } - - /* Swap bearers for next packet */ - if (s) { - bcbearer->bpairs[bp_index].primary = s; - bcbearer->bpairs[bp_index].secondary = p; - } - if (bcbearer->remains_new.count == 0) break; /* All targets reached */ @@ -821,9 +851,10 @@ int tipc_bclink_init(void) sprintf(bcbearer->media.name, "tipc-broadcast"); spin_lock_init(&bclink->lock); - INIT_LIST_HEAD(&bcl->waiting_ports); + __skb_queue_head_init(&bcl->waiting_sks); bcl->next_out_no = 1; spin_lock_init(&bclink->node.lock); + __skb_queue_head_init(&bclink->node.waiting_sks); bcl->owner = &bclink->node; bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 00330c45df3..e7b0f85a82b 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -1,7 +1,7 @@ /* * net/tipc/bcast.h: Include file for TIPC broadcast code * - * Copyright (c) 2003-2006, Ericsson AB + * Copyright (c) 2003-2006, 2014, Ericsson AB * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. 
* @@ -89,7 +89,6 @@ void tipc_bclink_add_node(u32 addr); void tipc_bclink_remove_node(u32 addr); struct tipc_node *tipc_bclink_retransmit_to(void); void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked); -int tipc_bclink_xmit(struct sk_buff *buf); void tipc_bclink_rcv(struct sk_buff *buf); u32 tipc_bclink_get_last_sent(void); u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr); @@ -98,5 +97,7 @@ int tipc_bclink_stats(char *stats_buf, const u32 buf_size); int tipc_bclink_reset_stats(void); int tipc_bclink_set_queue_limits(u32 limit); void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action); - +uint tipc_bclink_get_mtu(void); +int tipc_bclink_xmit(struct sk_buff *buf); +void tipc_bclink_wakeup_users(void); #endif diff --git a/net/tipc/config.c b/net/tipc/config.c index 2b42403ad33..876f4c6a263 100644 --- a/net/tipc/config.c +++ b/net/tipc/config.c @@ -35,7 +35,7 @@ */ #include "core.h" -#include "port.h" +#include "socket.h" #include "name_table.h" #include "config.h" #include "server.h" @@ -266,7 +266,7 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area rep_tlv_buf = tipc_media_get_names(); break; case TIPC_CMD_SHOW_PORTS: - rep_tlv_buf = tipc_port_get_ports(); + rep_tlv_buf = tipc_sk_socks_show(); break; case TIPC_CMD_SHOW_STATS: rep_tlv_buf = tipc_show_stats(); diff --git a/net/tipc/core.c b/net/tipc/core.c index 676d18015dd..a5737b8407d 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -35,11 +35,10 @@ */ #include "core.h" -#include "ref.h" #include "name_table.h" #include "subscr.h" #include "config.h" -#include "port.h" +#include "socket.h" #include <linux/module.h> @@ -85,7 +84,7 @@ static void tipc_core_stop(void) tipc_netlink_stop(); tipc_subscr_stop(); tipc_nametbl_stop(); - tipc_ref_table_stop(); + tipc_sk_ref_table_stop(); tipc_socket_stop(); tipc_unregister_sysctl(); } @@ -99,7 +98,7 @@ static int tipc_core_start(void) get_random_bytes(&tipc_random, sizeof(tipc_random)); - err = tipc_ref_table_init(tipc_max_ports, tipc_random); + err = tipc_sk_ref_table_init(tipc_max_ports, tipc_random); if (err) goto out_reftbl; @@ -139,7 +138,7 @@ out_socket: out_netlink: tipc_nametbl_stop(); out_nametbl: - tipc_ref_table_stop(); + tipc_sk_ref_table_stop(); out_reftbl: return err; } diff --git a/net/tipc/core.h b/net/tipc/core.h index bb26ed1ee96..f773b148722 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -81,6 +81,7 @@ extern u32 tipc_own_addr __read_mostly; extern int tipc_max_ports __read_mostly; extern int tipc_net_id __read_mostly; extern int sysctl_tipc_rmem[3] __read_mostly; +extern int sysctl_tipc_named_timeout __read_mostly; /* * Other global variables @@ -187,8 +188,11 @@ static inline void k_term_timer(struct timer_list *timer) struct tipc_skb_cb { void *handle; - bool deferred; struct sk_buff *tail; + bool deferred; + bool wakeup_pending; + u16 chain_sz; + u16 chain_imp; }; #define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0])) diff --git a/net/tipc/link.c b/net/tipc/link.c index ad2c57f5868..1db162aa64a 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -36,7 +36,6 @@ #include "core.h" #include "link.h" -#include "port.h" #include "socket.h" #include "name_distr.h" #include "discover.h" @@ -82,15 +81,13 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf); static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr, struct sk_buff **buf); static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance); -static int tipc_link_iovec_long_xmit(struct 
tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len, u32 destnode); static void link_state_event(struct tipc_link *l_ptr, u32 event); static void link_reset_statistics(struct tipc_link *l_ptr); static void link_print(struct tipc_link *l_ptr, const char *str); -static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); static void tipc_link_sync_xmit(struct tipc_link *l); static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); +static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf); +static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf); /* * Simple link routines @@ -277,7 +274,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, link_init_max_pkt(l_ptr); l_ptr->next_out_no = 1; - INIT_LIST_HEAD(&l_ptr->waiting_ports); + __skb_queue_head_init(&l_ptr->waiting_sks); link_reset_statistics(l_ptr); @@ -324,62 +321,47 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down) } /** - * link_schedule_port - schedule port for deferred sending - * @l_ptr: pointer to link - * @origport: reference to sending port - * @sz: amount of data to be sent - * - * Schedules port for renewed sending of messages after link congestion - * has abated. + * link_schedule_user - schedule user for wakeup after congestion + * @link: congested link + * @oport: sending port + * @chain_sz: size of buffer chain that was attempted sent + * @imp: importance of message attempted sent + * Create pseudo msg to send back to user when congestion abates */ -static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) +static bool link_schedule_user(struct tipc_link *link, u32 oport, + uint chain_sz, uint imp) { - struct tipc_port *p_ptr; + struct sk_buff *buf; - spin_lock_bh(&tipc_port_list_lock); - p_ptr = tipc_port_lock(origport); - if (p_ptr) { - if (!list_empty(&p_ptr->wait_list)) - goto exit; - p_ptr->congested = 1; - p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt); - list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); - l_ptr->stats.link_congs++; -exit: - tipc_port_unlock(p_ptr); - } - spin_unlock_bh(&tipc_port_list_lock); - return -ELINKCONG; + buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, tipc_own_addr, + tipc_own_addr, oport, 0, 0); + if (!buf) + return false; + TIPC_SKB_CB(buf)->chain_sz = chain_sz; + TIPC_SKB_CB(buf)->chain_imp = imp; + __skb_queue_tail(&link->waiting_sks, buf); + link->stats.link_congs++; + return true; } -void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) +/** + * link_prepare_wakeup - prepare users for wakeup after congestion + * @link: congested link + * Move a number of waiting users, as permitted by available space in + * the send queue, from link wait queue to node wait queue for wakeup + */ +static void link_prepare_wakeup(struct tipc_link *link) { - struct tipc_port *p_ptr; - struct tipc_port *temp_p_ptr; - int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; + struct sk_buff_head *wq = &link->waiting_sks; + struct sk_buff *buf; + uint pend_qsz = link->out_queue_size; - if (all) - win = 100000; - if (win <= 0) - return; - if (!spin_trylock_bh(&tipc_port_list_lock)) - return; - if (link_congested(l_ptr)) - goto exit; - list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports, - wait_list) { - if (win <= 0) + for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) { + if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp]) break; - list_del_init(&p_ptr->wait_list); - spin_lock_bh(p_ptr->lock); - p_ptr->congested = 0; - 
tipc_port_wakeup(p_ptr);
-			win -= p_ptr->waiting_pkts;
-		spin_unlock_bh(p_ptr->lock);
+		pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
+		__skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
 	}
-
-exit:
-	spin_unlock_bh(&tipc_port_list_lock);
 }
 
 /**
@@ -421,6 +403,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	u32 prev_state = l_ptr->state;
 	u32 checkpoint = l_ptr->next_in_no;
 	int was_active_link = tipc_link_is_active(l_ptr);
+	struct tipc_node *owner = l_ptr->owner;
 
 	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
@@ -448,9 +431,10 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	kfree_skb(l_ptr->proto_msg_queue);
 	l_ptr->proto_msg_queue = NULL;
 	kfree_skb_list(l_ptr->oldest_deferred_in);
-	if (!list_empty(&l_ptr->waiting_ports))
-		tipc_link_wakeup_ports(l_ptr, 1);
-
+	if (!skb_queue_empty(&l_ptr->waiting_sks)) {
+		skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
+		owner->action_flags |= TIPC_WAKEUP_USERS;
+	}
 	l_ptr->retransm_queue_head = 0;
 	l_ptr->retransm_queue_size = 0;
 	l_ptr->last_out = NULL;
@@ -676,178 +660,146 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 	}
 }
 
-/*
- * link_bundle_buf(): Append contents of a buffer to
- * the tail of an existing one.
+/* tipc_link_cong: determine return value and how to treat the
+ * sent buffer during link congestion.
+ * - For plain, errorless user data messages we keep the buffer and
+ *   return -ELINKCONG.
+ * - For all other messages we discard the buffer and return -EHOSTUNREACH
+ * - For TIPC internal messages we also reset the link
  */
-static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler,
-			   struct sk_buff *buf)
+static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
 {
-	struct tipc_msg *bundler_msg = buf_msg(bundler);
 	struct tipc_msg *msg = buf_msg(buf);
-	u32 size = msg_size(msg);
-	u32 bundle_size = msg_size(bundler_msg);
-	u32 to_pos = align(bundle_size);
-	u32 pad = to_pos - bundle_size;
-
-	if (msg_user(bundler_msg) != MSG_BUNDLER)
-		return 0;
-	if (msg_type(bundler_msg) != OPEN_MSG)
-		return 0;
-	if (skb_tailroom(bundler) < (pad + size))
-		return 0;
-	if (l_ptr->max_pkt < (to_pos + size))
-		return 0;
-
-	skb_put(bundler, pad + size);
-	skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
-	msg_set_size(bundler_msg, to_pos + size);
-	msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
-	kfree_skb(buf);
-	l_ptr->stats.sent_bundled++;
-	return 1;
-}
-
-static void link_add_to_outqueue(struct tipc_link *l_ptr,
-				 struct sk_buff *buf,
-				 struct tipc_msg *msg)
-{
-	u32 ack = mod(l_ptr->next_in_no - 1);
-	u32 seqno = mod(l_ptr->next_out_no++);
+	uint imp = tipc_msg_tot_importance(msg);
+	u32 oport = msg_tot_origport(msg);
 
-	msg_set_word(msg, 2, ((ack << 16) | seqno));
-	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-	buf->next = NULL;
-	if (l_ptr->first_out) {
-		l_ptr->last_out->next = buf;
-		l_ptr->last_out = buf;
-	} else
-		l_ptr->first_out = l_ptr->last_out = buf;
-
-	l_ptr->out_queue_size++;
-	if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
-		l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
-}
-
-static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
-				       struct sk_buff *buf_chain,
-				       u32 long_msgno)
-{
-	struct sk_buff *buf;
-	struct tipc_msg *msg;
-
-	if (!l_ptr->next_out)
-		l_ptr->next_out = buf_chain;
-	while (buf_chain) {
-		buf = buf_chain;
-		buf_chain = buf_chain->next;
-
-		msg = buf_msg(buf);
-		msg_set_long_msgno(msg, long_msgno);
-		link_add_to_outqueue(l_ptr, buf, msg);
+	if
(unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) { + pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); + tipc_link_reset(link); + goto drop; } + if (unlikely(msg_errcode(msg))) + goto drop; + if (unlikely(msg_reroute_cnt(msg))) + goto drop; + if (TIPC_SKB_CB(buf)->wakeup_pending) + return -ELINKCONG; + if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp)) + return -ELINKCONG; +drop: + kfree_skb_list(buf); + return -EHOSTUNREACH; } -/* - * tipc_link_xmit() is the 'full path' for messages, called from - * inside TIPC when the 'fast path' in tipc_send_xmit - * has failed, and from link_send() +/** + * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked + * @link: link to use + * @buf: chain of buffers containing message + * Consumes the buffer chain, except when returning -ELINKCONG + * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket + * user data messages) or -EHOSTUNREACH (all other messages/senders) + * Only the socket functions tipc_send_stream() and tipc_send_packet() need + * to act on the return value, since they may need to do more send attempts. */ -int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf) +int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf) { struct tipc_msg *msg = buf_msg(buf); - u32 size = msg_size(msg); - u32 dsz = msg_data_sz(msg); - u32 queue_size = l_ptr->out_queue_size; - u32 imp = tipc_msg_tot_importance(msg); - u32 queue_limit = l_ptr->queue_limit[imp]; - u32 max_packet = l_ptr->max_pkt; - - /* Match msg importance against queue limits: */ - if (unlikely(queue_size >= queue_limit)) { - if (imp <= TIPC_CRITICAL_IMPORTANCE) { - link_schedule_port(l_ptr, msg_origport(msg), size); - kfree_skb(buf); - return -ELINKCONG; - } - kfree_skb(buf); - if (imp > CONN_MANAGER) { - pr_warn("%s<%s>, send queue full", link_rst_msg, - l_ptr->name); - tipc_link_reset(l_ptr); - } - return dsz; + uint psz = msg_size(msg); + uint qsz = link->out_queue_size; + uint sndlim = link->queue_limit[0]; + uint imp = tipc_msg_tot_importance(msg); + uint mtu = link->max_pkt; + uint ack = mod(link->next_in_no - 1); + uint seqno = link->next_out_no; + uint bc_last_in = link->owner->bclink.last_in; + struct tipc_media_addr *addr = &link->media_addr; + struct sk_buff *next = buf->next; + + /* Match queue limits against msg importance: */ + if (unlikely(qsz >= link->queue_limit[imp])) + return tipc_link_cong(link, buf); + + /* Has valid packet limit been used ? */ + if (unlikely(psz > mtu)) { + kfree_skb_list(buf); + return -EMSGSIZE; } - /* Fragmentation needed ? */ - if (size > max_packet) - return tipc_link_frag_xmit(l_ptr, buf); - - /* Packet can be queued or sent. 
*/ - if (likely(!link_congested(l_ptr))) { - link_add_to_outqueue(l_ptr, buf, msg); + /* Prepare each packet for sending, and add to outqueue: */ + while (buf) { + next = buf->next; + msg = buf_msg(buf); + msg_set_word(msg, 2, ((ack << 16) | mod(seqno))); + msg_set_bcast_ack(msg, bc_last_in); + + if (!link->first_out) { + link->first_out = buf; + } else if (qsz < sndlim) { + link->last_out->next = buf; + } else if (tipc_msg_bundle(link->last_out, buf, mtu)) { + link->stats.sent_bundled++; + buf = next; + next = buf->next; + continue; + } else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) { + link->stats.sent_bundled++; + link->stats.sent_bundles++; + link->last_out->next = buf; + if (!link->next_out) + link->next_out = buf; + } else { + link->last_out->next = buf; + if (!link->next_out) + link->next_out = buf; + } - tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); - l_ptr->unacked_window = 0; - return dsz; - } - /* Congestion: can message be bundled ? */ - if ((msg_user(msg) != CHANGEOVER_PROTOCOL) && - (msg_user(msg) != MSG_FRAGMENTER)) { - - /* Try adding message to an existing bundle */ - if (l_ptr->next_out && - link_bundle_buf(l_ptr, l_ptr->last_out, buf)) - return dsz; - - /* Try creating a new bundle */ - if (size <= max_packet * 2 / 3) { - struct sk_buff *bundler = tipc_buf_acquire(max_packet); - struct tipc_msg bundler_hdr; - - if (bundler) { - tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG, - INT_H_SIZE, l_ptr->addr); - skb_copy_to_linear_data(bundler, &bundler_hdr, - INT_H_SIZE); - skb_trim(bundler, INT_H_SIZE); - link_bundle_buf(l_ptr, bundler, buf); - buf = bundler; - msg = buf_msg(buf); - l_ptr->stats.sent_bundles++; - } + /* Send packet if possible: */ + if (likely(++qsz <= sndlim)) { + tipc_bearer_send(link->bearer_id, buf, addr); + link->next_out = next; + link->unacked_window = 0; } + seqno++; + link->last_out = buf; + buf = next; } - if (!l_ptr->next_out) - l_ptr->next_out = buf; - link_add_to_outqueue(l_ptr, buf, msg); - return dsz; + link->next_out_no = seqno; + link->out_queue_size = qsz; + return 0; } -/* - * tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use - * has not been selected yet, and the the owner node is not locked - * Called by TIPC internal users, e.g. 
the name distributor +/** + * tipc_link_xmit() is the general link level function for message sending + * @buf: chain of buffers containing message + * @dsz: amount of user data to be sent + * @dnode: address of destination node + * @selector: a number used for deterministic link selection + * Consumes the buffer chain, except when returning -ELINKCONG + * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE */ -int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector) +int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector) { - struct tipc_link *l_ptr; - struct tipc_node *n_ptr; - int res = -ELINKCONG; + struct tipc_link *link = NULL; + struct tipc_node *node; + int rc = -EHOSTUNREACH; - n_ptr = tipc_node_find(dest); - if (n_ptr) { - tipc_node_lock(n_ptr); - l_ptr = n_ptr->active_links[selector & 1]; - if (l_ptr) - res = __tipc_link_xmit(l_ptr, buf); - else - kfree_skb(buf); - tipc_node_unlock(n_ptr); - } else { - kfree_skb(buf); + node = tipc_node_find(dnode); + if (node) { + tipc_node_lock(node); + link = node->active_links[selector & 1]; + if (link) + rc = __tipc_link_xmit(link, buf); + tipc_node_unlock(node); } - return res; + + if (link) + return rc; + + if (likely(in_own_node(dnode))) + return tipc_sk_rcv(buf); + + kfree_skb_list(buf); + return rc; } /* @@ -858,7 +810,7 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector) * * Called with node locked */ -static void tipc_link_sync_xmit(struct tipc_link *l) +static void tipc_link_sync_xmit(struct tipc_link *link) { struct sk_buff *buf; struct tipc_msg *msg; @@ -868,10 +820,9 @@ static void tipc_link_sync_xmit(struct tipc_link *l) return; msg = buf_msg(buf); - tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr); - msg_set_last_bcast(msg, l->owner->bclink.acked); - link_add_chain_to_outqueue(l, buf, 0); - tipc_link_push_queue(l); + tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr); + msg_set_last_bcast(msg, link->owner->bclink.acked); + __tipc_link_xmit(link, buf); } /* @@ -892,293 +843,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf) } /* - * tipc_link_names_xmit - send name table entries to new neighbor - * - * Send routine for bulk delivery of name table messages when contact - * with a new neighbor occurs. No link congestion checking is performed - * because name table messages *must* be delivered. The messages must be - * small enough not to require fragmentation. - * Called without any locks held. - */ -void tipc_link_names_xmit(struct list_head *message_list, u32 dest) -{ - struct tipc_node *n_ptr; - struct tipc_link *l_ptr; - struct sk_buff *buf; - struct sk_buff *temp_buf; - - if (list_empty(message_list)) - return; - - n_ptr = tipc_node_find(dest); - if (n_ptr) { - tipc_node_lock(n_ptr); - l_ptr = n_ptr->active_links[0]; - if (l_ptr) { - /* convert circular list to linear list */ - ((struct sk_buff *)message_list->prev)->next = NULL; - link_add_chain_to_outqueue(l_ptr, - (struct sk_buff *)message_list->next, 0); - tipc_link_push_queue(l_ptr); - INIT_LIST_HEAD(message_list); - } - tipc_node_unlock(n_ptr); - } - - /* discard the messages if they couldn't be sent */ - list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) { - list_del((struct list_head *)buf); - kfree_skb(buf); - } -} - -/* - * tipc_link_xmit_fast: Entry for data messages where the - * destination link is known and the header is complete, - * inclusive total message length. Very time critical. - * Link is locked. Returns user data length. 
- */ -static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf, - u32 *used_max_pkt) -{ - struct tipc_msg *msg = buf_msg(buf); - int res = msg_data_sz(msg); - - if (likely(!link_congested(l_ptr))) { - if (likely(msg_size(msg) <= l_ptr->max_pkt)) { - link_add_to_outqueue(l_ptr, buf, msg); - tipc_bearer_send(l_ptr->bearer_id, buf, - &l_ptr->media_addr); - l_ptr->unacked_window = 0; - return res; - } - else - *used_max_pkt = l_ptr->max_pkt; - } - return __tipc_link_xmit(l_ptr, buf); /* All other cases */ -} - -/* - * tipc_link_iovec_xmit_fast: Entry for messages where the - * destination processor is known and the header is complete, - * except for total message length. - * Returns user data length or errno. - */ -int tipc_link_iovec_xmit_fast(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len, u32 destaddr) -{ - struct tipc_msg *hdr = &sender->phdr; - struct tipc_link *l_ptr; - struct sk_buff *buf; - struct tipc_node *node; - int res; - u32 selector = msg_origport(hdr) & 1; - -again: - /* - * Try building message using port's max_pkt hint. - * (Must not hold any locks while building message.) - */ - res = tipc_msg_build(hdr, msg_sect, len, sender->max_pkt, &buf); - /* Exit if build request was invalid */ - if (unlikely(res < 0)) - return res; - - node = tipc_node_find(destaddr); - if (likely(node)) { - tipc_node_lock(node); - l_ptr = node->active_links[selector]; - if (likely(l_ptr)) { - if (likely(buf)) { - res = tipc_link_xmit_fast(l_ptr, buf, - &sender->max_pkt); -exit: - tipc_node_unlock(node); - return res; - } - - /* Exit if link (or bearer) is congested */ - if (link_congested(l_ptr)) { - res = link_schedule_port(l_ptr, - sender->ref, res); - goto exit; - } - - /* - * Message size exceeds max_pkt hint; update hint, - * then re-try fast path or fragment the message - */ - sender->max_pkt = l_ptr->max_pkt; - tipc_node_unlock(node); - - - if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt) - goto again; - - return tipc_link_iovec_long_xmit(sender, msg_sect, - len, destaddr); - } - tipc_node_unlock(node); - } - - /* Couldn't find a link to the destination node */ - kfree_skb(buf); - tipc_port_iovec_reject(sender, hdr, msg_sect, len, TIPC_ERR_NO_NODE); - return -ENETUNREACH; -} - -/* - * tipc_link_iovec_long_xmit(): Entry for long messages where the - * destination node is known and the header is complete, - * inclusive total message length. - * Link and bearer congestion status have been checked to be ok, - * and are ignored if they change. - * - * Note that fragments do not use the full link MTU so that they won't have - * to undergo refragmentation if link changeover causes them to be sent - * over another link with an additional tunnel header added as prefix. - * (Refragmentation will still occur if the other link has a smaller MTU.) - * - * Returns user data length or errno. 
- */ -static int tipc_link_iovec_long_xmit(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len, u32 destaddr) -{ - struct tipc_link *l_ptr; - struct tipc_node *node; - struct tipc_msg *hdr = &sender->phdr; - u32 dsz = len; - u32 max_pkt, fragm_sz, rest; - struct tipc_msg fragm_hdr; - struct sk_buff *buf, *buf_chain, *prev; - u32 fragm_crs, fragm_rest, hsz, sect_rest; - const unchar __user *sect_crs; - int curr_sect; - u32 fragm_no; - int res = 0; - -again: - fragm_no = 1; - max_pkt = sender->max_pkt - INT_H_SIZE; - /* leave room for tunnel header in case of link changeover */ - fragm_sz = max_pkt - INT_H_SIZE; - /* leave room for fragmentation header in each fragment */ - rest = dsz; - fragm_crs = 0; - fragm_rest = 0; - sect_rest = 0; - sect_crs = NULL; - curr_sect = -1; - - /* Prepare reusable fragment header */ - tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, - INT_H_SIZE, msg_destnode(hdr)); - msg_set_size(&fragm_hdr, max_pkt); - msg_set_fragm_no(&fragm_hdr, 1); - - /* Prepare header of first fragment */ - buf_chain = buf = tipc_buf_acquire(max_pkt); - if (!buf) - return -ENOMEM; - buf->next = NULL; - skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE); - hsz = msg_hdr_sz(hdr); - skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz); - - /* Chop up message */ - fragm_crs = INT_H_SIZE + hsz; - fragm_rest = fragm_sz - hsz; - - do { /* For all sections */ - u32 sz; - - if (!sect_rest) { - sect_rest = msg_sect[++curr_sect].iov_len; - sect_crs = msg_sect[curr_sect].iov_base; - } - - if (sect_rest < fragm_rest) - sz = sect_rest; - else - sz = fragm_rest; - - if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) { - res = -EFAULT; -error: - kfree_skb_list(buf_chain); - return res; - } - sect_crs += sz; - sect_rest -= sz; - fragm_crs += sz; - fragm_rest -= sz; - rest -= sz; - - if (!fragm_rest && rest) { - - /* Initiate new fragment: */ - if (rest <= fragm_sz) { - fragm_sz = rest; - msg_set_type(&fragm_hdr, LAST_FRAGMENT); - } else { - msg_set_type(&fragm_hdr, FRAGMENT); - } - msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); - msg_set_fragm_no(&fragm_hdr, ++fragm_no); - prev = buf; - buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE); - if (!buf) { - res = -ENOMEM; - goto error; - } - - buf->next = NULL; - prev->next = buf; - skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE); - fragm_crs = INT_H_SIZE; - fragm_rest = fragm_sz; - } - } while (rest > 0); - - /* - * Now we have a buffer chain. 
Select a link and check - * that packet size is still OK - */ - node = tipc_node_find(destaddr); - if (likely(node)) { - tipc_node_lock(node); - l_ptr = node->active_links[sender->ref & 1]; - if (!l_ptr) { - tipc_node_unlock(node); - goto reject; - } - if (l_ptr->max_pkt < max_pkt) { - sender->max_pkt = l_ptr->max_pkt; - tipc_node_unlock(node); - kfree_skb_list(buf_chain); - goto again; - } - } else { -reject: - kfree_skb_list(buf_chain); - tipc_port_iovec_reject(sender, hdr, msg_sect, len, - TIPC_ERR_NO_NODE); - return -ENETUNREACH; - } - - /* Append chain of fragments to send queue & send them */ - l_ptr->long_msg_seq_no++; - link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no); - l_ptr->stats.sent_fragments += fragm_no; - l_ptr->stats.sent_fragmented++; - tipc_link_push_queue(l_ptr); - tipc_node_unlock(node); - return dsz; -} - -/* * tipc_link_push_packet: Push one unsent packet to the media */ static u32 tipc_link_push_packet(struct tipc_link *l_ptr) @@ -1238,7 +902,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr) tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr); if (msg_user(msg) == MSG_BUNDLER) - msg_set_type(msg, CLOSED_MSG); + msg_set_type(msg, BUNDLE_CLOSED); l_ptr->next_out = buf->next; return 0; } @@ -1524,12 +1188,9 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) if (unlikely(l_ptr->next_out)) tipc_link_push_queue(l_ptr); - if (unlikely(!list_empty(&l_ptr->waiting_ports))) - tipc_link_wakeup_ports(l_ptr, 0); - - if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { - l_ptr->stats.sent_acks++; - tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); + if (released && !skb_queue_empty(&l_ptr->waiting_sks)) { + link_prepare_wakeup(l_ptr); + l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS; } /* Process the incoming packet */ @@ -1565,57 +1226,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) if (unlikely(l_ptr->oldest_deferred_in)) head = link_insert_deferred_queue(l_ptr, head); - /* Deliver packet/message to correct user: */ - if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) { - if (!tipc_link_tunnel_rcv(n_ptr, &buf)) { - tipc_node_unlock(n_ptr); - continue; - } - msg = buf_msg(buf); - } else if (msg_user(msg) == MSG_FRAGMENTER) { - l_ptr->stats.recv_fragments++; - if (tipc_buf_append(&l_ptr->reasm_buf, &buf)) { - l_ptr->stats.recv_fragmented++; - msg = buf_msg(buf); - } else { - if (!l_ptr->reasm_buf) - tipc_link_reset(l_ptr); - tipc_node_unlock(n_ptr); - continue; - } + if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { + l_ptr->stats.sent_acks++; + tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); } - switch (msg_user(msg)) { - case TIPC_LOW_IMPORTANCE: - case TIPC_MEDIUM_IMPORTANCE: - case TIPC_HIGH_IMPORTANCE: - case TIPC_CRITICAL_IMPORTANCE: + if (tipc_link_prepare_input(l_ptr, &buf)) { tipc_node_unlock(n_ptr); - tipc_sk_rcv(buf); continue; - case MSG_BUNDLER: - l_ptr->stats.recv_bundles++; - l_ptr->stats.recv_bundled += msg_msgcnt(msg); - tipc_node_unlock(n_ptr); - tipc_link_bundle_rcv(buf); - continue; - case NAME_DISTRIBUTOR: - n_ptr->bclink.recv_permitted = true; - tipc_node_unlock(n_ptr); - tipc_named_rcv(buf); - continue; - case CONN_MANAGER: - tipc_node_unlock(n_ptr); - tipc_port_proto_rcv(buf); - continue; - case BCAST_PROTOCOL: - tipc_link_sync_rcv(n_ptr, buf); - break; - default: - kfree_skb(buf); - break; } tipc_node_unlock(n_ptr); + msg = buf_msg(buf); + if (tipc_link_input(l_ptr, buf) != 0) + goto discard; continue; unlock_discard: tipc_node_unlock(n_ptr); @@ 
-1625,6 +1248,80 @@ discard:
 }
 
 /**
+ * tipc_link_prepare_input - process TIPC link messages
+ *
+ * returns nonzero if the message was consumed
+ *
+ * Node lock must be held
+ */
+static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf)
+{
+	struct tipc_node *n;
+	struct tipc_msg *msg;
+	int res = -EINVAL;
+
+	n = l->owner;
+	msg = buf_msg(*buf);
+	switch (msg_user(msg)) {
+	case CHANGEOVER_PROTOCOL:
+		if (tipc_link_tunnel_rcv(n, buf))
+			res = 0;
+		break;
+	case MSG_FRAGMENTER:
+		l->stats.recv_fragments++;
+		if (tipc_buf_append(&l->reasm_buf, buf)) {
+			l->stats.recv_fragmented++;
+			res = 0;
+		} else if (!l->reasm_buf) {
+			tipc_link_reset(l);
+		}
+		break;
+	case MSG_BUNDLER:
+		l->stats.recv_bundles++;
+		l->stats.recv_bundled += msg_msgcnt(msg);
+		res = 0;
+		break;
+	case NAME_DISTRIBUTOR:
+		n->bclink.recv_permitted = true;
+		res = 0;
+		break;
+	case BCAST_PROTOCOL:
+		tipc_link_sync_rcv(n, *buf);
+		break;
+	default:
+		res = 0;
+	}
+	return res;
+}
+/**
+ * tipc_link_input - Deliver message to higher layers
+ */
+static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	int res = 0;
+
+	switch (msg_user(msg)) {
+	case TIPC_LOW_IMPORTANCE:
+	case TIPC_MEDIUM_IMPORTANCE:
+	case TIPC_HIGH_IMPORTANCE:
+	case TIPC_CRITICAL_IMPORTANCE:
+	case CONN_MANAGER:
+		tipc_sk_rcv(buf);
+		break;
+	case NAME_DISTRIBUTOR:
+		tipc_named_rcv(buf);
+		break;
+	case MSG_BUNDLER:
+		tipc_link_bundle_rcv(buf);
+		break;
+	default:
+		res = -EINVAL;
+	}
+	return res;
+}
+
+/**
  * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
  *
  * Returns increase in queue length (i.e. 0 or 1)
@@ -2217,6 +1914,7 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
 	u32 msgcount = msg_msgcnt(buf_msg(buf));
 	u32 pos = INT_H_SIZE;
 	struct sk_buff *obuf;
+	struct tipc_msg *omsg;
 
 	while (msgcount--) {
 		obuf = buf_extract(buf, pos);
@@ -2224,82 +1922,23 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
 		if (!obuf) {
 			pr_warn("Link unable to unbundle message(s)\n");
 			break;
 		}
-		pos += align(msg_size(buf_msg(obuf)));
-		tipc_net_route_msg(obuf);
-	}
-	kfree_skb(buf);
-}
-
-/*
- * Fragmentation/defragmentation:
- */
-
-/*
- * tipc_link_frag_xmit: Entry for buffers needing fragmentation.
- * The buffer is complete, inclusive total message length.
- * Returns user data length.
- */ -static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf) -{ - struct sk_buff *buf_chain = NULL; - struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain; - struct tipc_msg *inmsg = buf_msg(buf); - struct tipc_msg fragm_hdr; - u32 insize = msg_size(inmsg); - u32 dsz = msg_data_sz(inmsg); - unchar *crs = buf->data; - u32 rest = insize; - u32 pack_sz = l_ptr->max_pkt; - u32 fragm_sz = pack_sz - INT_H_SIZE; - u32 fragm_no = 0; - u32 destaddr; - - if (msg_short(inmsg)) - destaddr = l_ptr->addr; - else - destaddr = msg_destnode(inmsg); - - /* Prepare reusable fragment header: */ - tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, - INT_H_SIZE, destaddr); - - /* Chop up message: */ - while (rest > 0) { - struct sk_buff *fragm; - - if (rest <= fragm_sz) { - fragm_sz = rest; - msg_set_type(&fragm_hdr, LAST_FRAGMENT); - } - fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE); - if (fragm == NULL) { - kfree_skb(buf); - kfree_skb_list(buf_chain); - return -ENOMEM; + omsg = buf_msg(obuf); + pos += align(msg_size(omsg)); + if (msg_isdata(omsg)) { + if (unlikely(msg_type(omsg) == TIPC_MCAST_MSG)) + tipc_sk_mcast_rcv(obuf); + else + tipc_sk_rcv(obuf); + } else if (msg_user(omsg) == CONN_MANAGER) { + tipc_sk_rcv(obuf); + } else if (msg_user(omsg) == NAME_DISTRIBUTOR) { + tipc_named_rcv(obuf); + } else { + pr_warn("Illegal bundled msg: %u\n", msg_user(omsg)); + kfree_skb(obuf); } - msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); - fragm_no++; - msg_set_fragm_no(&fragm_hdr, fragm_no); - skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs, - fragm_sz); - buf_chain_tail->next = fragm; - buf_chain_tail = fragm; - - rest -= fragm_sz; - crs += fragm_sz; - msg_set_type(&fragm_hdr, FRAGMENT); } kfree_skb(buf); - - /* Append chain of fragments to send queue & send them */ - l_ptr->long_msg_seq_no++; - link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no); - l_ptr->stats.sent_fragments += fragm_no; - l_ptr->stats.sent_fragmented++; - tipc_link_push_queue(l_ptr); - - return dsz; } static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance) diff --git a/net/tipc/link.h b/net/tipc/link.h index 200d518b218..b567a3427fd 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -1,7 +1,7 @@ /* * net/tipc/link.h: Include file for TIPC link code * - * Copyright (c) 1995-2006, 2013, Ericsson AB + * Copyright (c) 1995-2006, 2013-2014, Ericsson AB * Copyright (c) 2004-2005, 2010-2011, Wind River Systems * All rights reserved. 
* @@ -133,7 +133,7 @@ struct tipc_stats { * @retransm_queue_size: number of messages to retransmit * @retransm_queue_head: sequence number of first message to retransmit * @next_out: ptr to first unsent outbound message in queue - * @waiting_ports: linked list of ports waiting for link congestion to abate + * @waiting_sks: linked list of sockets waiting for link congestion to abate * @long_msg_seq_no: next identifier to use for outbound fragmented messages * @reasm_buf: head of partially reassembled inbound message fragments * @stats: collects statistics regarding link activity @@ -194,7 +194,7 @@ struct tipc_link { u32 retransm_queue_size; u32 retransm_queue_head; struct sk_buff *next_out; - struct list_head waiting_ports; + struct sk_buff_head waiting_sks; /* Fragmentation/reassembly */ u32 long_msg_seq_no; @@ -227,20 +227,14 @@ void tipc_link_reset_all(struct tipc_node *node); void tipc_link_reset(struct tipc_link *l_ptr); void tipc_link_reset_list(unsigned int bearer_id); int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); -void tipc_link_names_xmit(struct list_head *message_list, u32 dest); -int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); -int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf); +int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf); u32 tipc_link_get_max_pkt(u32 dest, u32 selector); -int tipc_link_iovec_xmit_fast(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len, u32 destnode); void tipc_link_bundle_rcv(struct sk_buff *buf); void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); void tipc_link_push_queue(struct tipc_link *l_ptr); u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, struct sk_buff *buf); -void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all); void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window); void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *start, u32 retransmits); diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 0a37a472c29..74745a47d72 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -36,21 +36,16 @@ #include "core.h" #include "msg.h" +#include "addr.h" +#include "name_table.h" -u32 tipc_msg_tot_importance(struct tipc_msg *m) +#define MAX_FORWARD_SIZE 1024 + +static unsigned int align(unsigned int i) { - if (likely(msg_isdata(m))) { - if (likely(msg_orignode(m) == tipc_own_addr)) - return msg_importance(m); - return msg_importance(m) + 4; - } - if ((msg_user(m) == MSG_FRAGMENTER) && - (msg_type(m) == FIRST_FRAGMENT)) - return msg_importance(msg_get_wrapped(m)); - return msg_importance(m); + return (i + 3) & ~3u; } - void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, u32 destnode) { @@ -61,43 +56,35 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, msg_set_size(m, hsize); msg_set_prevnode(m, tipc_own_addr); msg_set_type(m, type); - msg_set_orignode(m, tipc_own_addr); - msg_set_destnode(m, destnode); + if (hsize > SHORT_H_SIZE) { + msg_set_orignode(m, tipc_own_addr); + msg_set_destnode(m, destnode); + } } -/** - * tipc_msg_build - create message using specified header and data - * - * Note: Caller must not hold any locks in case copy_from_user() is interrupted! 
- *
- * Returns message data size or errno
- */
-int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
-		   unsigned int len, int max_size, struct sk_buff **buf)
+struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
+				uint data_sz, u32 dnode, u32 onode,
+				u32 dport, u32 oport, int errcode)
 {
-	int dsz, sz, hsz;
-	unsigned char *to;
-
-	dsz = len;
-	hsz = msg_hdr_sz(hdr);
-	sz = hsz + dsz;
-	msg_set_size(hdr, sz);
-	if (unlikely(sz > max_size)) {
-		*buf = NULL;
-		return dsz;
-	}
+	struct tipc_msg *msg;
+	struct sk_buff *buf;
 
-	*buf = tipc_buf_acquire(sz);
-	if (!(*buf))
-		return -ENOMEM;
-	skb_copy_to_linear_data(*buf, hdr, hsz);
-	to = (*buf)->data + hsz;
-	if (len && memcpy_fromiovecend(to, msg_sect, 0, dsz)) {
-		kfree_skb(*buf);
-		*buf = NULL;
-		return -EFAULT;
+	buf = tipc_buf_acquire(hdr_sz + data_sz);
+	if (unlikely(!buf))
+		return NULL;
+
+	msg = buf_msg(buf);
+	tipc_msg_init(msg, user, type, hdr_sz, dnode);
+	msg_set_size(msg, hdr_sz + data_sz);
+	msg_set_prevnode(msg, onode);
+	msg_set_origport(msg, oport);
+	msg_set_destport(msg, dport);
+	msg_set_errcode(msg, errcode);
+	if (hdr_sz > SHORT_H_SIZE) {
+		msg_set_orignode(msg, onode);
+		msg_set_destnode(msg, dnode);
 	}
-	return dsz;
+	return buf;
 }
 
 /* tipc_buf_append(): Append a buffer to the fragment list of another buffer
@@ -112,27 +99,38 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
 	struct sk_buff *head = *headbuf;
 	struct sk_buff *frag = *buf;
 	struct sk_buff *tail;
-	struct tipc_msg *msg = buf_msg(frag);
-	u32 fragid = msg_type(msg);
-	bool headstolen;
+	struct tipc_msg *msg;
+	u32 fragid;
 	int delta;
+	bool headstolen;
+
+	if (!frag)
+		goto err;
+	msg = buf_msg(frag);
+	fragid = msg_type(msg);
+	frag->next = NULL;
 
 	skb_pull(frag, msg_hdr_sz(msg));
 	if (fragid == FIRST_FRAGMENT) {
-		if (head || skb_unclone(frag, GFP_ATOMIC))
-			goto out_free;
+		if (unlikely(head))
+			goto err;
+		if (unlikely(skb_unclone(frag, GFP_ATOMIC)))
+			goto err;
 		head = *headbuf = frag;
 		skb_frag_list_init(head);
+		TIPC_SKB_CB(head)->tail = NULL;
 		*buf = NULL;
 		return 0;
 	}
+
 	if (!head)
-		goto out_free;
-	tail = TIPC_SKB_CB(head)->tail;
+		goto err;
+
 	if (skb_try_coalesce(head, frag, &headstolen, &delta)) {
 		kfree_skb_partial(frag, headstolen);
 	} else {
+		tail = TIPC_SKB_CB(head)->tail;
 		if (!skb_has_frag_list(head))
 			skb_shinfo(head)->frag_list = frag;
 		else
@@ -142,6 +140,7 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
 		head->len += frag->len;
 		TIPC_SKB_CB(head)->tail = frag;
 	}
+
 	if (fragid == LAST_FRAGMENT) {
 		*buf = head;
 		TIPC_SKB_CB(head)->tail = NULL;
@@ -150,10 +149,314 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
 	*buf = NULL;
 	return 0;
-out_free:
+
+err:
 	pr_warn_ratelimited("Unable to build fragment list\n");
 	kfree_skb(*buf);
 	kfree_skb(*headbuf);
 	*buf = *headbuf = NULL;
 	return 0;
 }
+
+
+/**
+ * tipc_msg_build - create buffer chain containing specified header and data
+ * @mhdr: Message header, to be prepended to data
+ * @iov: User data
+ * @offset: Position in iov to start copying from
+ * @dsz: Total length of user data
+ * @pktmax: Max packet size that can be used
+ * @chain: Buffer or chain of buffers to be returned to caller
+ * Returns message data size or errno: -ENOMEM, -EFAULT
+ */
+int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
+		   int offset, int dsz, int pktmax, struct sk_buff **chain)
+{
+	int mhsz = msg_hdr_sz(mhdr);
+	int msz = mhsz + dsz;
+	int pktno = 1;
+	int pktsz;
+	int pktrem = pktmax;
+	int drem = dsz;
+	struct tipc_msg
pkthdr; + struct sk_buff *buf, *prev; + char *pktpos; + int rc; + uint chain_sz = 0; + msg_set_size(mhdr, msz); + + /* No fragmentation needed? */ + if (likely(msz <= pktmax)) { + buf = tipc_buf_acquire(msz); + *chain = buf; + if (unlikely(!buf)) + return -ENOMEM; + skb_copy_to_linear_data(buf, mhdr, mhsz); + pktpos = buf->data + mhsz; + TIPC_SKB_CB(buf)->chain_sz = 1; + if (!dsz || !memcpy_fromiovecend(pktpos, iov, offset, dsz)) + return dsz; + rc = -EFAULT; + goto error; + } + + /* Prepare reusable fragment header */ + tipc_msg_init(&pkthdr, MSG_FRAGMENTER, FIRST_FRAGMENT, + INT_H_SIZE, msg_destnode(mhdr)); + msg_set_size(&pkthdr, pktmax); + msg_set_fragm_no(&pkthdr, pktno); + + /* Prepare first fragment */ + *chain = buf = tipc_buf_acquire(pktmax); + if (!buf) + return -ENOMEM; + chain_sz = 1; + pktpos = buf->data; + skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); + pktpos += INT_H_SIZE; + pktrem -= INT_H_SIZE; + skb_copy_to_linear_data_offset(buf, INT_H_SIZE, mhdr, mhsz); + pktpos += mhsz; + pktrem -= mhsz; + + do { + if (drem < pktrem) + pktrem = drem; + + if (memcpy_fromiovecend(pktpos, iov, offset, pktrem)) { + rc = -EFAULT; + goto error; + } + drem -= pktrem; + offset += pktrem; + + if (!drem) + break; + + /* Prepare new fragment: */ + if (drem < (pktmax - INT_H_SIZE)) + pktsz = drem + INT_H_SIZE; + else + pktsz = pktmax; + prev = buf; + buf = tipc_buf_acquire(pktsz); + if (!buf) { + rc = -ENOMEM; + goto error; + } + chain_sz++; + prev->next = buf; + msg_set_type(&pkthdr, FRAGMENT); + msg_set_size(&pkthdr, pktsz); + msg_set_fragm_no(&pkthdr, ++pktno); + skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); + pktpos = buf->data + INT_H_SIZE; + pktrem = pktsz - INT_H_SIZE; + + } while (1); + TIPC_SKB_CB(*chain)->chain_sz = chain_sz; + msg_set_type(buf_msg(buf), LAST_FRAGMENT); + return dsz; +error: + kfree_skb_list(*chain); + *chain = NULL; + return rc; +} + +/** + * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one + * @bbuf: the existing buffer ("bundle") + * @buf: buffer to be appended + * @mtu: max allowable size for the bundle buffer + * Consumes buffer if successful + * Returns true if bundling could be performed, otherwise false + */ +bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu) +{ + struct tipc_msg *bmsg = buf_msg(bbuf); + struct tipc_msg *msg = buf_msg(buf); + unsigned int bsz = msg_size(bmsg); + unsigned int msz = msg_size(msg); + u32 start = align(bsz); + u32 max = mtu - INT_H_SIZE; + u32 pad = start - bsz; + + if (likely(msg_user(msg) == MSG_FRAGMENTER)) + return false; + if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) + return false; + if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) + return false; + if (likely(msg_user(bmsg) != MSG_BUNDLER)) + return false; + if (likely(msg_type(bmsg) != BUNDLE_OPEN)) + return false; + if (unlikely(skb_tailroom(bbuf) < (pad + msz))) + return false; + if (unlikely(max < (start + msz))) + return false; + + skb_put(bbuf, pad + msz); + skb_copy_to_linear_data_offset(bbuf, start, buf->data, msz); + msg_set_size(bmsg, start + msz); + msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1); + bbuf->next = buf->next; + kfree_skb(buf); + return true; +} + +/** + * tipc_msg_make_bundle(): Create bundle buf and append message to its tail + * @buf: buffer to be appended and replaced + * @mtu: max allowable size for the bundle buffer, inclusive header + * @dnode: destination node for message. 
(Not always present in header)
+ * Replaces buffer if successful
+ * Returns true on success, otherwise false
+ */
+bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
+{
+	struct sk_buff *bbuf;
+	struct tipc_msg *bmsg;
+	struct tipc_msg *msg = buf_msg(*buf);
+	u32 msz = msg_size(msg);
+	u32 max = mtu - INT_H_SIZE;
+
+	if (msg_user(msg) == MSG_FRAGMENTER)
+		return false;
+	if (msg_user(msg) == CHANGEOVER_PROTOCOL)
+		return false;
+	if (msg_user(msg) == BCAST_PROTOCOL)
+		return false;
+	if (msz > (max / 2))
+		return false;
+
+	bbuf = tipc_buf_acquire(max);
+	if (!bbuf)
+		return false;
+
+	skb_trim(bbuf, INT_H_SIZE);
+	bmsg = buf_msg(bbuf);
+	tipc_msg_init(bmsg, MSG_BUNDLER, BUNDLE_OPEN, INT_H_SIZE, dnode);
+	msg_set_seqno(bmsg, msg_seqno(msg));
+	msg_set_ack(bmsg, msg_ack(msg));
+	msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
+	bbuf->next = (*buf)->next;
+	tipc_msg_bundle(bbuf, *buf, mtu);
+	*buf = bbuf;
+	return true;
+}
+
+/**
+ * tipc_msg_reverse(): swap source and destination addresses and add error code
+ * @buf: buffer containing message to be reversed
+ * @dnode: return value: node where to send message after reversal
+ * @err: error code to be set in message
+ * Consumes buffer on failure
+ * Returns true on success, otherwise false
+ */
+bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	uint imp = msg_importance(msg);
+	struct tipc_msg ohdr;
+	uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE);
+
+	if (skb_linearize(buf))
+		goto exit;
+	if (msg_dest_droppable(msg))
+		goto exit;
+	if (msg_errcode(msg))
+		goto exit;
+
+	memcpy(&ohdr, msg, msg_hdr_sz(msg));
+	imp = min_t(uint, imp + 1, TIPC_CRITICAL_IMPORTANCE);
+	if (msg_isdata(msg))
+		msg_set_importance(msg, imp);
+	msg_set_errcode(msg, err);
+	msg_set_origport(msg, msg_destport(&ohdr));
+	msg_set_destport(msg, msg_origport(&ohdr));
+	msg_set_prevnode(msg, tipc_own_addr);
+	if (!msg_short(msg)) {
+		msg_set_orignode(msg, msg_destnode(&ohdr));
+		msg_set_destnode(msg, msg_orignode(&ohdr));
+	}
+	msg_set_size(msg, msg_hdr_sz(msg) + rdsz);
+	skb_trim(buf, msg_size(msg));
+	skb_orphan(buf);
+	*dnode = msg_orignode(&ohdr);
+	return true;
+exit:
+	kfree_skb(buf);
+	return false;
+}
+
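/*
 * Illustrative sketch (not part of the patch): tipc_msg_reverse() above
 * is the building block for bouncing undeliverable messages. A caller on
 * the receive path can reject a buffer by swapping its addresses, tagging
 * an error code, and re-transmitting it toward the original sender.
 * reject_rx() is a hypothetical wrapper; tipc_link_xmit() and
 * msg_link_selector() are real functions from this series.
 */
static void reject_rx(struct sk_buff *buf, int err)
{
	u32 dnode;

	/* On failure the buffer has already been freed for us */
	if (tipc_msg_reverse(buf, &dnode, err))
		tipc_link_xmit(buf, dnode, msg_link_selector(buf_msg(buf)));
}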
+/**
+ * tipc_msg_eval: determine fate of message that found no destination
+ * @buf: the buffer containing the message.
+ * @dnode: return value: next-hop node, if message to be forwarded
+ * @err: error code to use, if message to be rejected
+ *
+ * Does not consume buffer
+ * Returns 0 (TIPC_OK) if message ok and we can try again, -TIPC error
+ * code if message to be rejected
+ */
+int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	u32 dport;
+
+	if (msg_type(msg) != TIPC_NAMED_MSG)
+		return -TIPC_ERR_NO_PORT;
+	if (skb_linearize(buf))
+		return -TIPC_ERR_NO_NAME;
+	if (msg_data_sz(msg) > MAX_FORWARD_SIZE)
+		return -TIPC_ERR_NO_NAME;
+	if (msg_reroute_cnt(msg) > 0)
+		return -TIPC_ERR_NO_NAME;
+
+	*dnode = addr_domain(msg_lookup_scope(msg));
+	dport = tipc_nametbl_translate(msg_nametype(msg),
+				       msg_nameinst(msg),
+				       dnode);
+	if (!dport)
+		return -TIPC_ERR_NO_NAME;
+	msg_incr_reroute_cnt(msg);
+	msg_set_destnode(msg, *dnode);
+	msg_set_destport(msg, dport);
+	return TIPC_OK;
+}
+
+/* tipc_msg_reassemble() - clone a buffer chain of fragments and
+ * reassemble the clones into one message
+ */
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
+{
+	struct sk_buff *buf = chain;
+	struct sk_buff *frag = buf;
+	struct sk_buff *head = NULL;
+	int hdr_sz;
+
+	/* Copy header if single buffer */
+	if (!buf->next) {
+		hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf));
+		return __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
+	}
+
+	/* Clone all fragments and reassemble */
+	while (buf) {
+		frag = skb_clone(buf, GFP_ATOMIC);
+		if (!frag)
+			goto error;
+		frag->next = NULL;
+		if (tipc_buf_append(&head, &frag))
+			break;
+		if (!head)
+			goto error;
+		buf = buf->next;
+	}
+	return frag;
error:
+	pr_warn("Failed to clone local mcast rcv buffer\n");
+	kfree_skb(head);
+	return NULL;
+}
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 503511903d1..0ea7b695ac4 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -442,6 +442,7 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
 #define  NAME_DISTRIBUTOR     11
 #define  MSG_FRAGMENTER       12
 #define  LINK_CONFIG          13
+#define  SOCK_WAKEUP          14       /* pseudo user */
 
 /*
  * Connection management protocol message types
@@ -463,6 +464,11 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
 #define FRAGMENT      1
 #define LAST_FRAGMENT 2
 
+/* Bundling protocol message types
+ */
+#define BUNDLE_OPEN   0
+#define BUNDLE_CLOSED 1
+
 /*
  * Link management protocol message types
 */
@@ -706,12 +712,40 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
 
-u32 tipc_msg_tot_importance(struct tipc_msg *m);
+static inline u32 tipc_msg_tot_importance(struct tipc_msg *m)
+{
+	if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
+		return msg_importance(msg_get_wrapped(m));
+	return msg_importance(m);
+}
+
+static inline u32 msg_tot_origport(struct tipc_msg *m)
+{
+	if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
+		return msg_origport(msg_get_wrapped(m));
+	return msg_origport(m);
+}
+
+bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err);
+
+int tipc_msg_eval(struct sk_buff *buf, u32 *dnode);
+
 void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
 		   u32 destnode);
-int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
-		   unsigned int len, int max_size, struct sk_buff **buf);
+
+struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
+				uint data_sz, u32 dnode, u32 onode,
+				u32 dport, u32 oport, int errcode);
 
 int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
+bool
+
+bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
+
+int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
+ int offset, int dsz, int mtu, struct sk_buff **chain);
+
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain);
+
 #endif
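
For clarity, a hedged sketch of the new tipc_msg_build() contract declared above. The wrapper and its error handling are hypothetical, and the exact success return value follows the msg.c implementation in this series; only a negative return is assumed to signal failure.

/* Sketch: build a header-plus-iovec payload into a chain of packets no
 * larger than 'mtu'; a payload that fits yields a single buffer, while
 * anything larger comes back as a FIRST_FRAGMENT..LAST_FRAGMENT chain
 * ready for tipc_link_xmit().
 */
static int sample_build_and_send(struct tipc_msg *mhdr, struct iovec const *iov,
                                 int dsz, u32 dnode)
{
        struct sk_buff *chain;
        uint mtu = tipc_node_get_mtu(dnode, 0);  /* cached per-node MTU */
        int rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &chain);

        if (rc < 0)
                return rc;
        return tipc_link_xmit(chain, dnode, dnode);
}

diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 8ce730984aa..376d2bb51d8 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c
@@ -1,7 +1,7 @@
/*
 * net/tipc/name_distr.c: TIPC name distribution code
 *
- * Copyright (c) 2000-2006, Ericsson AB
+ * Copyright (c) 2000-2006, 2014, Ericsson AB
 * Copyright (c) 2005, 2010-2011, Wind River Systems
 * All rights reserved.
 *
@@ -71,6 +71,21 @@ static struct publ_list *publ_lists[] = {
};

+int sysctl_tipc_named_timeout __read_mostly = 2000;
+
+/**
+ * struct tipc_dist_queue - queue holding deferred name table updates
+ */
+static struct list_head tipc_dist_queue = LIST_HEAD_INIT(tipc_dist_queue);
+
+struct distr_queue_item {
+ struct distr_item i;
+ u32 dtype;
+ u32 node;
+ unsigned long expires;
+ struct list_head next;
+};
+
/**
 * publ_to_item - add publication info to a publication message
 */
@@ -101,24 +116,22 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)

void named_cluster_distribute(struct sk_buff *buf)
{
- struct sk_buff *buf_copy;
- struct tipc_node *n_ptr;
- struct tipc_link *l_ptr;
+ struct sk_buff *obuf;
+ struct tipc_node *node;
+ u32 dnode;

 rcu_read_lock();
- list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
- tipc_node_lock(n_ptr);
- l_ptr = n_ptr->active_links[n_ptr->addr & 1];
- if (l_ptr) {
- buf_copy = skb_copy(buf, GFP_ATOMIC);
- if (!buf_copy) {
- tipc_node_unlock(n_ptr);
- break;
- }
- msg_set_destnode(buf_msg(buf_copy), n_ptr->addr);
- __tipc_link_xmit(l_ptr, buf_copy);
- }
- tipc_node_unlock(n_ptr);
+ list_for_each_entry_rcu(node, &tipc_node_list, list) {
+ dnode = node->addr;
+ if (in_own_node(dnode))
+ continue;
+ if (!tipc_node_active_links(node))
+ continue;
+ obuf = skb_copy(buf, GFP_ATOMIC);
+ if (!obuf)
+ break;
+ msg_set_destnode(buf_msg(obuf), dnode);
+ tipc_link_xmit(obuf, dnode, dnode);
 }
 rcu_read_unlock();
@@ -175,34 +188,44 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
 return buf;
}

-/*
+/**
 * named_distribute - prepare name info for bulk distribution to another node
+ * @msg_list: list of messages (buffers) to be returned from this function
+ * @dnode: node to be updated
+ * @pls: linked list of publication items to be packed into buffer chain
 */
-static void named_distribute(struct list_head *message_list, u32 node,
- struct publ_list *pls, u32 max_item_buf)
+static void named_distribute(struct list_head *msg_list, u32 dnode,
+ struct publ_list *pls)
{
 struct publication *publ;
 struct sk_buff *buf = NULL;
 struct distr_item *item = NULL;
- u32 left = 0;
- u32 rest = pls->size * ITEM_SIZE;
+ uint dsz = pls->size * ITEM_SIZE;
+ uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
+ uint rem = dsz;
+ uint msg_rem = 0;

 list_for_each_entry(publ, &pls->list, local_list) {
+ /* Prepare next buffer: */
 if (!buf) {
- left = (rest <= max_item_buf) ? rest : max_item_buf;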
- rest -= left;
- buf = named_prepare_buf(PUBLICATION, left, node);
+ msg_rem = min_t(uint, rem, msg_dsz);
+ rem -= msg_rem;
+ buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
 if (!buf) {
 pr_warn("Bulk publication failure\n");
 return;
 }
 item = (struct distr_item *)msg_data(buf_msg(buf));
 }
+
+ /* Pack publication into message: */
 publ_to_item(item, publ);
 item++;
- left -= ITEM_SIZE;
- if (!left) {
- list_add_tail((struct list_head *)buf, message_list);
+ msg_rem -= ITEM_SIZE;
+
+ /* Append full buffer to list: */
+ if (!msg_rem) {
+ list_add_tail((struct list_head *)buf, msg_list);
 buf = NULL;
 }
 }
@@ -211,16 +234,20 @@ static void named_distribute(struct list_head *message_list, u32 node,
/**
 * tipc_named_node_up - tell specified node about all publications by this node
 */
-void tipc_named_node_up(u32 max_item_buf, u32 node)
+void tipc_named_node_up(u32 dnode)
{
- LIST_HEAD(message_list);
+ LIST_HEAD(msg_list);
+ struct sk_buff *buf_chain;

 read_lock_bh(&tipc_nametbl_lock);
- named_distribute(&message_list, node, &publ_cluster, max_item_buf);
- named_distribute(&message_list, node, &publ_zone, max_item_buf);
+ named_distribute(&msg_list, dnode, &publ_cluster);
+ named_distribute(&msg_list, dnode, &publ_zone);
 read_unlock_bh(&tipc_nametbl_lock);

+ /* Convert circular list to linear list and send: */
+ buf_chain = (struct sk_buff *)msg_list.next;
+ ((struct sk_buff *)msg_list.prev)->next = NULL;
+ tipc_link_xmit(buf_chain, dnode, dnode);
}

/**
@@ -251,54 +278,105 @@ static void named_purge_publ(struct publication *publ)
}

/**
+ * tipc_update_nametbl - try to process a nametable update and notify
+ * subscribers
+ *
+ * tipc_nametbl_lock must be held.
+ * Returns true if the update was applied, otherwise false.
+ */
+static bool tipc_update_nametbl(struct distr_item *i, u32 node, u32 dtype)
+{
+ struct publication *publ = NULL;
+
+ if (dtype == PUBLICATION) {
+ publ = tipc_nametbl_insert_publ(ntohl(i->type), ntohl(i->lower),
+ ntohl(i->upper),
+ TIPC_CLUSTER_SCOPE, node,
+ ntohl(i->ref), ntohl(i->key));
+ if (publ) {
+ tipc_nodesub_subscribe(&publ->subscr, node, publ,
+ (net_ev_handler)
+ named_purge_publ);
+ return true;
+ }
+ } else if (dtype == WITHDRAWAL) {
+ publ = tipc_nametbl_remove_publ(ntohl(i->type), ntohl(i->lower),
+ node, ntohl(i->ref),
+ ntohl(i->key));
+ if (publ) {
+ tipc_nodesub_unsubscribe(&publ->subscr);
+ kfree(publ);
+ return true;
+ }
+ } else {
+ pr_warn("Unrecognized name table message received\n");
+ }
+ return false;
+}
+
+/**
+ * tipc_named_add_backlog - add a failed name table update to the backlog
+ *
+ */
+static void tipc_named_add_backlog(struct distr_item *i, u32 type, u32 node)
+{
+ struct distr_queue_item *e;
+ unsigned long now = get_jiffies_64();
+
+ e = kzalloc(sizeof(*e), GFP_ATOMIC);
+ if (!e)
+ return;
+ e->dtype = type;
+ e->node = node;
+ e->expires = now + msecs_to_jiffies(sysctl_tipc_named_timeout);
+ memcpy(&e->i, i, sizeof(*i));
+ list_add_tail(&e->next, &tipc_dist_queue);
+}
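
To illustrate the backlog flow just defined, a hedged sketch follows. The caller below is hypothetical (it stands in for the loop in tipc_named_rcv() further down), would live inside name_distr.c since it calls static helpers, and assumes the default sysctl_tipc_named_timeout of 2000 ms.

/* Sketch: an update that cannot be applied yet (e.g. a WITHDRAWAL that
 * overtook its PUBLICATION on another link) is parked rather than
 * dropped. Each later name-table event re-runs the queue; an entry
 * that still fails 2000 ms after arrival is logged and discarded.
 */
static void sample_apply_or_defer(struct distr_item *i, u32 node, u32 dtype)
{
        if (!tipc_update_nametbl(i, node, dtype))
                tipc_named_add_backlog(i, dtype, node);
        tipc_named_process_backlog();   /* retry anything already parked */
}

+
+/**
+ * tipc_named_process_backlog - try to process any pending name table updates
+ * from the network.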
+ */ +void tipc_named_process_backlog(void) +{ + struct distr_queue_item *e, *tmp; + char addr[16]; + unsigned long now = get_jiffies_64(); + + list_for_each_entry_safe(e, tmp, &tipc_dist_queue, next) { + if (time_after(e->expires, now)) { + if (!tipc_update_nametbl(&e->i, e->node, e->dtype)) + continue; + } else { + tipc_addr_string_fill(addr, e->node); + pr_warn_ratelimited("Dropping name table update (%d) of {%u, %u, %u} from %s key=%u\n", + e->dtype, ntohl(e->i.type), + ntohl(e->i.lower), + ntohl(e->i.upper), + addr, ntohl(e->i.key)); + } + list_del(&e->next); + kfree(e); + } +} + +/** * tipc_named_rcv - process name table update message sent by another node */ void tipc_named_rcv(struct sk_buff *buf) { - struct publication *publ; struct tipc_msg *msg = buf_msg(buf); struct distr_item *item = (struct distr_item *)msg_data(msg); u32 count = msg_data_sz(msg) / ITEM_SIZE; + u32 node = msg_orignode(msg); write_lock_bh(&tipc_nametbl_lock); while (count--) { - if (msg_type(msg) == PUBLICATION) { - publ = tipc_nametbl_insert_publ(ntohl(item->type), - ntohl(item->lower), - ntohl(item->upper), - TIPC_CLUSTER_SCOPE, - msg_orignode(msg), - ntohl(item->ref), - ntohl(item->key)); - if (publ) { - tipc_nodesub_subscribe(&publ->subscr, - msg_orignode(msg), - publ, - (net_ev_handler) - named_purge_publ); - } - } else if (msg_type(msg) == WITHDRAWAL) { - publ = tipc_nametbl_remove_publ(ntohl(item->type), - ntohl(item->lower), - msg_orignode(msg), - ntohl(item->ref), - ntohl(item->key)); - - if (publ) { - tipc_nodesub_unsubscribe(&publ->subscr); - kfree(publ); - } else { - pr_err("Unable to remove publication by node 0x%x\n" - " (type=%u, lower=%u, ref=%u, key=%u)\n", - msg_orignode(msg), ntohl(item->type), - ntohl(item->lower), ntohl(item->ref), - ntohl(item->key)); - } - } else { - pr_warn("Unrecognized name table message received\n"); - } + if (!tipc_update_nametbl(item, node, msg_type(msg))) + tipc_named_add_backlog(item, msg_type(msg), node); item++; } + tipc_named_process_backlog(); write_unlock_bh(&tipc_nametbl_lock); kfree_skb(buf); } diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h index b2eed4ec152..b9e75feb343 100644 --- a/net/tipc/name_distr.h +++ b/net/tipc/name_distr.h @@ -70,8 +70,9 @@ struct distr_item { struct sk_buff *tipc_named_publish(struct publication *publ); struct sk_buff *tipc_named_withdraw(struct publication *publ); void named_cluster_distribute(struct sk_buff *buf); -void tipc_named_node_up(u32 max_item_buf, u32 node); +void tipc_named_node_up(u32 dnode); void tipc_named_rcv(struct sk_buff *buf); void tipc_named_reinit(void); +void tipc_named_process_backlog(void); #endif diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 9d7d37d9518..3a6a0a7c075 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -39,7 +39,6 @@ #include "name_table.h" #include "name_distr.h" #include "subscr.h" -#include "port.h" #define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ @@ -262,8 +261,6 @@ static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq, /* Lower end overlaps existing entry => need an exact match */ if ((sseq->lower != lower) || (sseq->upper != upper)) { - pr_warn("Cannot publish {%u,%u,%u}, overlap error\n", - type, lower, upper); return NULL; } @@ -285,8 +282,6 @@ static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq, /* Fail if upper end overlaps into an existing entry */ if ((inspos < nseq->first_free) && (upper >= nseq->sseqs[inspos].lower)) { - pr_warn("Cannot publish {%u,%u,%u}, overlap error\n", - 
type, lower, upper); return NULL; } @@ -678,6 +673,8 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper, if (likely(publ)) { table.local_publ_count++; buf = tipc_named_publish(publ); + /* Any pending external events? */ + tipc_named_process_backlog(); } write_unlock_bh(&tipc_nametbl_lock); @@ -699,6 +696,8 @@ int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key) if (likely(publ)) { table.local_publ_count--; buf = tipc_named_withdraw(publ); + /* Any pending external events? */ + tipc_named_process_backlog(); write_unlock_bh(&tipc_nametbl_lock); list_del_init(&publ->pport_list); kfree(publ); diff --git a/net/tipc/net.c b/net/tipc/net.c index f64375e7f99..93b9944a6a8 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -1,7 +1,7 @@ /* * net/tipc/net.c: TIPC network routing code * - * Copyright (c) 1995-2006, Ericsson AB + * Copyright (c) 1995-2006, 2014, Ericsson AB * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. * @@ -38,7 +38,6 @@ #include "net.h" #include "name_distr.h" #include "subscr.h" -#include "port.h" #include "socket.h" #include "node.h" #include "config.h" @@ -104,67 +103,6 @@ * - A local spin_lock protecting the queue of subscriber events. */ -static void net_route_named_msg(struct sk_buff *buf) -{ - struct tipc_msg *msg = buf_msg(buf); - u32 dnode; - u32 dport; - - if (!msg_named(msg)) { - kfree_skb(buf); - return; - } - - dnode = addr_domain(msg_lookup_scope(msg)); - dport = tipc_nametbl_translate(msg_nametype(msg), msg_nameinst(msg), &dnode); - if (dport) { - msg_set_destnode(msg, dnode); - msg_set_destport(msg, dport); - tipc_net_route_msg(buf); - return; - } - tipc_reject_msg(buf, TIPC_ERR_NO_NAME); -} - -void tipc_net_route_msg(struct sk_buff *buf) -{ - struct tipc_msg *msg; - u32 dnode; - - if (!buf) - return; - msg = buf_msg(buf); - - /* Handle message for this node */ - dnode = msg_short(msg) ? tipc_own_addr : msg_destnode(msg); - if (tipc_in_scope(dnode, tipc_own_addr)) { - if (msg_isdata(msg)) { - if (msg_mcast(msg)) - tipc_port_mcast_rcv(buf, NULL); - else if (msg_destport(msg)) - tipc_sk_rcv(buf); - else - net_route_named_msg(buf); - return; - } - switch (msg_user(msg)) { - case NAME_DISTRIBUTOR: - tipc_named_rcv(buf); - break; - case CONN_MANAGER: - tipc_port_proto_rcv(buf); - break; - default: - kfree_skb(buf); - } - return; - } - - /* Handle message for another node */ - skb_trim(buf, msg_size(msg)); - tipc_link_xmit(buf, dnode, msg_link_selector(msg)); -} - int tipc_net_start(u32 addr) { char addr_string[16]; @@ -172,7 +110,7 @@ int tipc_net_start(u32 addr) tipc_own_addr = addr; tipc_named_reinit(); - tipc_port_reinit(); + tipc_sk_reinit(); res = tipc_bclink_init(); if (res) return res; diff --git a/net/tipc/net.h b/net/tipc/net.h index c6c2b46f7c2..59ef3388be2 100644 --- a/net/tipc/net.h +++ b/net/tipc/net.h @@ -37,8 +37,6 @@ #ifndef _TIPC_NET_H #define _TIPC_NET_H -void tipc_net_route_msg(struct sk_buff *buf); - int tipc_net_start(u32 addr); void tipc_net_stop(void); diff --git a/net/tipc/node.c b/net/tipc/node.c index 5b44c3041be..90cee4a6fce 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1,7 +1,7 @@ /* * net/tipc/node.c: TIPC node management routines * - * Copyright (c) 2000-2006, 2012 Ericsson AB + * Copyright (c) 2000-2006, 2012-2014, Ericsson AB * Copyright (c) 2005-2006, 2010-2014, Wind River Systems * All rights reserved. 
 *
@@ -38,6 +38,7 @@
 #include "config.h"
 #include "node.h"
 #include "name_distr.h"
+#include "socket.h"

 #define NODE_HTABLE_SIZE 512
@@ -50,6 +51,13 @@ static u32 tipc_num_nodes;
static u32 tipc_num_links;
static DEFINE_SPINLOCK(node_list_lock);

+struct tipc_sock_conn {
+ u32 port;
+ u32 peer_port;
+ u32 peer_node;
+ struct list_head list;
+};
+
/*
 * A trivial power-of-two bitmask technique is used for speed, since this
 * operation is done for every incoming TIPC packet. The number of hash table
@@ -100,6 +108,8 @@ struct tipc_node *tipc_node_create(u32 addr)
 INIT_HLIST_NODE(&n_ptr->hash);
 INIT_LIST_HEAD(&n_ptr->list);
 INIT_LIST_HEAD(&n_ptr->nsub);
+ INIT_LIST_HEAD(&n_ptr->conn_sks);
+ __skb_queue_head_init(&n_ptr->waiting_sks);

 hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]);
@@ -136,6 +146,71 @@ void tipc_node_stop(void)
 spin_unlock_bh(&node_list_lock);
}

+int tipc_node_add_conn(u32 dnode, u32 port, u32 peer_port)
+{
+ struct tipc_node *node;
+ struct tipc_sock_conn *conn;
+
+ if (in_own_node(dnode))
+ return 0;
+
+ node = tipc_node_find(dnode);
+ if (!node) {
+ pr_warn("Connecting sock to node 0x%x failed\n", dnode);
+ return -EHOSTUNREACH;
+ }
+ conn = kmalloc(sizeof(*conn), GFP_ATOMIC);
+ if (!conn)
+ return -ENOMEM;
+ conn->peer_node = dnode;
+ conn->port = port;
+ conn->peer_port = peer_port;
+
+ tipc_node_lock(node);
+ list_add_tail(&conn->list, &node->conn_sks);
+ tipc_node_unlock(node);
+ return 0;
+}
+
+void tipc_node_remove_conn(u32 dnode, u32 port)
+{
+ struct tipc_node *node;
+ struct tipc_sock_conn *conn, *safe;
+
+ if (in_own_node(dnode))
+ return;
+
+ node = tipc_node_find(dnode);
+ if (!node)
+ return;
+
+ tipc_node_lock(node);
+ list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
+ if (port != conn->port)
+ continue;
+ list_del(&conn->list);
+ kfree(conn);
+ }
+ tipc_node_unlock(node);
+}
+
+void tipc_node_abort_sock_conns(struct list_head *conns)
+{
+ struct tipc_sock_conn *conn, *safe;
+ struct sk_buff *buf;
+
+ list_for_each_entry_safe(conn, safe, conns, list) {
+ buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
+ SHORT_H_SIZE, 0, tipc_own_addr,
+ conn->peer_node, conn->port,
+ conn->peer_port, TIPC_ERR_NO_NODE);
+ if (likely(buf))
+ tipc_sk_rcv(buf);
+ list_del(&conn->list);
+ kfree(conn);
+ }
+}
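
A hedged sketch of how the socket layer is expected to use this new per-node connection registry; the two wrappers below are hypothetical, while tipc_node_add_conn() and tipc_node_remove_conn() are the functions added above.

/* Register the connection with the peer's node. From then on, if the
 * node fails, tipc_node_abort_sock_conns() above synthesizes a
 * TIPC_CONN_MSG carrying TIPC_ERR_NO_NODE for this port and injects it
 * via tipc_sk_rcv(), so the socket observes the failure as ordinary
 * incoming traffic.
 */
static int sample_sock_connect(u32 peer_node, u32 own_port, u32 peer_port)
{
        return tipc_node_add_conn(peer_node, own_port, peer_port);
}

/* ...and the matching teardown when the socket disconnects: */
static void sample_sock_disconnect(u32 peer_node, u32 own_port)
{
        tipc_node_remove_conn(peer_node, own_port);
}

+
 /**
 * tipc_node_link_up - handle addition of link
 *
@@ -155,21 +230,25 @@ void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
 if (!active[0]) {
 active[0] = active[1] = l_ptr;
 node_established_contact(n_ptr);
- return;
+ goto exit;
 }
 if (l_ptr->priority < active[0]->priority) {
 pr_info("New link <%s> becomes standby\n", l_ptr->name);
- return;
+ goto exit;
 }
 tipc_link_dup_queue_xmit(active[0], l_ptr);
 if (l_ptr->priority == active[0]->priority) {
 active[0] = l_ptr;
- return;
+ goto exit;
 }
 pr_info("Old link <%s> becomes standby\n", active[0]->name);
 if (active[1] != active[0])
 pr_info("Old link <%s> becomes standby\n", active[1]->name);
 active[0] = active[1] = l_ptr;
+exit:
+ /* Leave room for changeover header when returning 'mtu' to users: */
+ n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE;
+ n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE;
}

/**
@@ -229,6 +308,19 @@ void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
 tipc_link_failover_send_queue(l_ptr);
 else
 node_lost_contact(n_ptr);
+
+ /* Leave room for changeover header when returning 'mtu' to users: */
+ if (active[0]) {
+ n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE;
+ n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE;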
+ return;
+ }
+
+ /* Loopback link went down? No fragmentation needed from now on. */
+ if (n_ptr->addr == tipc_own_addr) {
+ n_ptr->act_mtus[0] = MAX_MSG_SIZE;
+ n_ptr->act_mtus[1] = MAX_MSG_SIZE;
+ }
}

int tipc_node_active_links(struct tipc_node *n_ptr)
@@ -457,32 +549,45 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
void tipc_node_unlock(struct tipc_node *node)
{
 LIST_HEAD(nsub_list);
- struct tipc_link *link;
- int pkt_sz = 0;
+ LIST_HEAD(conn_sks);
+ struct sk_buff_head waiting_sks;
 u32 addr = 0;
+ unsigned int flags = node->action_flags;

 if (likely(!node->action_flags)) {
 spin_unlock_bh(&node->lock);
 return;
 }

+ __skb_queue_head_init(&waiting_sks);
+ if (node->action_flags & TIPC_WAKEUP_USERS) {
+ skb_queue_splice_init(&node->waiting_sks, &waiting_sks);
+ node->action_flags &= ~TIPC_WAKEUP_USERS;
+ }
 if (node->action_flags & TIPC_NOTIFY_NODE_DOWN) {
 list_replace_init(&node->nsub, &nsub_list);
+ list_replace_init(&node->conn_sks, &conn_sks);
 node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN;
 }
 if (node->action_flags & TIPC_NOTIFY_NODE_UP) {
- link = node->active_links[0];
 node->action_flags &= ~TIPC_NOTIFY_NODE_UP;
- if (link) {
- pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) *
- ITEM_SIZE;
- addr = node->addr;
- }
+ addr = node->addr;
 }
+ node->action_flags &= ~TIPC_WAKEUP_BCAST_USERS;
 spin_unlock_bh(&node->lock);

+ while (!skb_queue_empty(&waiting_sks))
+ tipc_sk_rcv(__skb_dequeue(&waiting_sks));
+
+ if (!list_empty(&conn_sks))
+ tipc_node_abort_sock_conns(&conn_sks);
+
 if (!list_empty(&nsub_list))
 tipc_nodesub_notify(&nsub_list);
- if (pkt_sz)
- tipc_named_node_up(pkt_sz, addr);
+
+ if (flags & TIPC_WAKEUP_BCAST_USERS)
+ tipc_bclink_wakeup_users();
+
+ if (addr)
+ tipc_named_node_up(addr);
}
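
To make the locking pattern above concrete, here is a hedged, stand-alone sketch of the technique tipc_node_unlock() now applies; the function below is illustrative, not part of the patch. Deferred work is collected under the node spinlock and executed only after the lock is dropped, so tipc_sk_rcv() is never entered with the node lock held and lock-ordering problems against socket locks are avoided.

static void sample_deferred_unlock(struct tipc_node *node)
{
        struct sk_buff_head wakeups;

        __skb_queue_head_init(&wakeups);
        spin_lock_bh(&node->lock);
        /* snapshot the pending wakeups while the lock is held */
        skb_queue_splice_init(&node->waiting_sks, &wakeups);
        spin_unlock_bh(&node->lock);
        /* safe now: no node lock held while sockets are woken */
        while (!skb_queue_empty(&wakeups))
                tipc_sk_rcv(__skb_dequeue(&wakeups));
}

diff --git a/net/tipc/node.h b/net/tipc/node.h index 9087063793f..67513c3c852 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h
@@ -41,6 +41,7 @@
 #include "addr.h"
 #include "net.h"
 #include "bearer.h"
+#include "msg.h"

/*
 * Out-of-range value for node signature
 */
@@ -57,7 +58,9 @@ enum {
 TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1),
 TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2),
 TIPC_NOTIFY_NODE_DOWN = (1 << 3),
- TIPC_NOTIFY_NODE_UP = (1 << 4)
+ TIPC_NOTIFY_NODE_UP = (1 << 4),
+ TIPC_WAKEUP_USERS = (1 << 5),
+ TIPC_WAKEUP_BCAST_USERS = (1 << 6)
};

/**
@@ -105,6 +108,7 @@ struct tipc_node {
 spinlock_t lock;
 struct hlist_node hash;
 struct tipc_link *active_links[2];
+ u32 act_mtus[2];
 struct tipc_link *links[MAX_BEARERS];
 unsigned int action_flags;
 struct tipc_node_bclink bclink;
@@ -113,6 +117,8 @@ struct tipc_node {
 int working_links;
 u32 signature;
 struct list_head nsub;
+ struct sk_buff_head waiting_sks;
+ struct list_head conn_sks;
 struct rcu_head rcu;
};

@@ -131,6 +137,8 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)
struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space);
int tipc_node_get_linkname(u32 bearer_id, u32 node, char *linkname, size_t len);
void tipc_node_unlock(struct tipc_node *node);
+int tipc_node_add_conn(u32 dnode, u32 port, u32 peer_port);
+void tipc_node_remove_conn(u32 dnode, u32 port);

static inline void tipc_node_lock(struct tipc_node *node)
{
@@ -143,4 +151,19 @@ static inline bool tipc_node_blocked(struct tipc_node *node)
 TIPC_NOTIFY_NODE_DOWN | TIPC_WAIT_OWN_LINKS_DOWN));
}

+static inline uint tipc_node_get_mtu(u32 addr, u32 selector)
+{
+ struct tipc_node *node;
+ u32 mtu;
+
+ node = tipc_node_find(addr);
+
+ if (likely(node))
+ mtu =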
node->act_mtus[selector & 1]; + else + mtu = MAX_MSG_SIZE; + + return mtu; +} + #endif diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c index 7c59ab1d6ec..2d13eea8574 100644 --- a/net/tipc/node_subscr.c +++ b/net/tipc/node_subscr.c @@ -84,11 +84,13 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub) void tipc_nodesub_notify(struct list_head *nsub_list) { struct tipc_node_subscr *ns, *safe; + net_ev_handler handle_node_down; list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) { - if (ns->handle_node_down) { - ns->handle_node_down(ns->usr_handle); + handle_node_down = ns->handle_node_down; + if (handle_node_down) { ns->handle_node_down = NULL; + handle_node_down(ns->usr_handle); } } } diff --git a/net/tipc/port.c b/net/tipc/port.c deleted file mode 100644 index 5fd7acce01e..00000000000 --- a/net/tipc/port.c +++ /dev/null @@ -1,898 +0,0 @@ -/* - * net/tipc/port.c: TIPC port code - * - * Copyright (c) 1992-2007, 2014, Ericsson AB - * Copyright (c) 2004-2008, 2010-2013, Wind River Systems - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. 
- */ - -#include "core.h" -#include "config.h" -#include "port.h" -#include "name_table.h" -#include "socket.h" - -/* Connection management: */ -#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ -#define CONFIRMED 0 -#define PROBING 1 - -#define MAX_REJECT_SIZE 1024 - -DEFINE_SPINLOCK(tipc_port_list_lock); - -static LIST_HEAD(ports); -static void port_handle_node_down(unsigned long ref); -static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err); -static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err); -static void port_timeout(unsigned long ref); - -/** - * tipc_port_peer_msg - verify message was sent by connected port's peer - * - * Handles cases where the node's network address has changed from - * the default of <0.0.0> to its configured setting. - */ -int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg) -{ - u32 peernode; - u32 orignode; - - if (msg_origport(msg) != tipc_port_peerport(p_ptr)) - return 0; - - orignode = msg_orignode(msg); - peernode = tipc_port_peernode(p_ptr); - return (orignode == peernode) || - (!orignode && (peernode == tipc_own_addr)) || - (!peernode && (orignode == tipc_own_addr)); -} - -/** - * tipc_port_mcast_xmit - send a multicast message to local and remote - * destinations - */ -int tipc_port_mcast_xmit(struct tipc_port *oport, - struct tipc_name_seq const *seq, - struct iovec const *msg_sect, - unsigned int len) -{ - struct tipc_msg *hdr; - struct sk_buff *buf; - struct sk_buff *ibuf = NULL; - struct tipc_port_list dports = {0, NULL, }; - int ext_targets; - int res; - - /* Create multicast message */ - hdr = &oport->phdr; - msg_set_type(hdr, TIPC_MCAST_MSG); - msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE); - msg_set_destport(hdr, 0); - msg_set_destnode(hdr, 0); - msg_set_nametype(hdr, seq->type); - msg_set_namelower(hdr, seq->lower); - msg_set_nameupper(hdr, seq->upper); - msg_set_hdr_sz(hdr, MCAST_H_SIZE); - res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf); - if (unlikely(!buf)) - return res; - - /* Figure out where to send multicast message */ - ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper, - TIPC_NODE_SCOPE, &dports); - - /* Send message to destinations (duplicate it only if necessary) */ - if (ext_targets) { - if (dports.count != 0) { - ibuf = skb_copy(buf, GFP_ATOMIC); - if (ibuf == NULL) { - tipc_port_list_free(&dports); - kfree_skb(buf); - return -ENOMEM; - } - } - res = tipc_bclink_xmit(buf); - if ((res < 0) && (dports.count != 0)) - kfree_skb(ibuf); - } else { - ibuf = buf; - } - - if (res >= 0) { - if (ibuf) - tipc_port_mcast_rcv(ibuf, &dports); - } else { - tipc_port_list_free(&dports); - } - return res; -} - -/** - * tipc_port_mcast_rcv - deliver multicast message to all destination ports - * - * If there is no port list, perform a lookup to create one - */ -void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp) -{ - struct tipc_msg *msg; - struct tipc_port_list dports = {0, NULL, }; - struct tipc_port_list *item = dp; - int cnt = 0; - - msg = buf_msg(buf); - - /* Create destination port list, if one wasn't supplied */ - if (dp == NULL) { - tipc_nametbl_mc_translate(msg_nametype(msg), - msg_namelower(msg), - msg_nameupper(msg), - TIPC_CLUSTER_SCOPE, - &dports); - item = dp = &dports; - } - - /* Deliver a copy of message to each destination port */ - if (dp->count != 0) { - msg_set_destnode(msg, tipc_own_addr); - if (dp->count == 1) { - msg_set_destport(msg, dp->ports[0]); - tipc_sk_rcv(buf); - tipc_port_list_free(dp); - 
return; - } - for (; cnt < dp->count; cnt++) { - int index = cnt % PLSIZE; - struct sk_buff *b = skb_clone(buf, GFP_ATOMIC); - - if (b == NULL) { - pr_warn("Unable to deliver multicast message(s)\n"); - goto exit; - } - if ((index == 0) && (cnt != 0)) - item = item->next; - msg_set_destport(buf_msg(b), item->ports[index]); - tipc_sk_rcv(b); - } - } -exit: - kfree_skb(buf); - tipc_port_list_free(dp); -} - - -void tipc_port_wakeup(struct tipc_port *port) -{ - tipc_sock_wakeup(tipc_port_to_sock(port)); -} - -/* tipc_port_init - intiate TIPC port and lock it - * - * Returns obtained reference if initialization is successful, zero otherwise - */ -u32 tipc_port_init(struct tipc_port *p_ptr, - const unsigned int importance) -{ - struct tipc_msg *msg; - u32 ref; - - ref = tipc_ref_acquire(p_ptr, &p_ptr->lock); - if (!ref) { - pr_warn("Port registration failed, ref. table exhausted\n"); - return 0; - } - - p_ptr->max_pkt = MAX_PKT_DEFAULT; - p_ptr->ref = ref; - INIT_LIST_HEAD(&p_ptr->wait_list); - INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); - k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); - INIT_LIST_HEAD(&p_ptr->publications); - INIT_LIST_HEAD(&p_ptr->port_list); - - /* - * Must hold port list lock while initializing message header template - * to ensure a change to node's own network address doesn't result - * in template containing out-dated network address information - */ - spin_lock_bh(&tipc_port_list_lock); - msg = &p_ptr->phdr; - tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0); - msg_set_origport(msg, ref); - list_add_tail(&p_ptr->port_list, &ports); - spin_unlock_bh(&tipc_port_list_lock); - return ref; -} - -void tipc_port_destroy(struct tipc_port *p_ptr) -{ - struct sk_buff *buf = NULL; - - tipc_withdraw(p_ptr, 0, NULL); - - spin_lock_bh(p_ptr->lock); - tipc_ref_discard(p_ptr->ref); - spin_unlock_bh(p_ptr->lock); - - k_cancel_timer(&p_ptr->timer); - if (p_ptr->connected) { - buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); - tipc_nodesub_unsubscribe(&p_ptr->subscription); - } - - spin_lock_bh(&tipc_port_list_lock); - list_del(&p_ptr->port_list); - list_del(&p_ptr->wait_list); - spin_unlock_bh(&tipc_port_list_lock); - k_term_timer(&p_ptr->timer); - tipc_net_route_msg(buf); -} - -/* - * port_build_proto_msg(): create connection protocol message for port - * - * On entry the port must be locked and connected. 
- */ -static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr, - u32 type, u32 ack) -{ - struct sk_buff *buf; - struct tipc_msg *msg; - - buf = tipc_buf_acquire(INT_H_SIZE); - if (buf) { - msg = buf_msg(buf); - tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE, - tipc_port_peernode(p_ptr)); - msg_set_destport(msg, tipc_port_peerport(p_ptr)); - msg_set_origport(msg, p_ptr->ref); - msg_set_msgcnt(msg, ack); - } - return buf; -} - -int tipc_reject_msg(struct sk_buff *buf, u32 err) -{ - struct tipc_msg *msg = buf_msg(buf); - struct sk_buff *rbuf; - struct tipc_msg *rmsg; - int hdr_sz; - u32 imp; - u32 data_sz = msg_data_sz(msg); - u32 src_node; - u32 rmsg_sz; - - /* discard rejected message if it shouldn't be returned to sender */ - if (WARN(!msg_isdata(msg), - "attempt to reject message with user=%u", msg_user(msg))) { - dump_stack(); - goto exit; - } - if (msg_errcode(msg) || msg_dest_droppable(msg)) - goto exit; - - /* - * construct returned message by copying rejected message header and - * data (or subset), then updating header fields that need adjusting - */ - hdr_sz = msg_hdr_sz(msg); - rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE); - - rbuf = tipc_buf_acquire(rmsg_sz); - if (rbuf == NULL) - goto exit; - - rmsg = buf_msg(rbuf); - skb_copy_to_linear_data(rbuf, msg, rmsg_sz); - - if (msg_connected(rmsg)) { - imp = msg_importance(rmsg); - if (imp < TIPC_CRITICAL_IMPORTANCE) - msg_set_importance(rmsg, ++imp); - } - msg_set_non_seq(rmsg, 0); - msg_set_size(rmsg, rmsg_sz); - msg_set_errcode(rmsg, err); - msg_set_prevnode(rmsg, tipc_own_addr); - msg_swap_words(rmsg, 4, 5); - if (!msg_short(rmsg)) - msg_swap_words(rmsg, 6, 7); - - /* send self-abort message when rejecting on a connected port */ - if (msg_connected(msg)) { - struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg)); - - if (p_ptr) { - struct sk_buff *abuf = NULL; - - if (p_ptr->connected) - abuf = port_build_self_abort_msg(p_ptr, err); - tipc_port_unlock(p_ptr); - tipc_net_route_msg(abuf); - } - } - - /* send returned message & dispose of rejected message */ - src_node = msg_prevnode(msg); - if (in_own_node(src_node)) - tipc_sk_rcv(rbuf); - else - tipc_link_xmit(rbuf, src_node, msg_link_selector(rmsg)); -exit: - kfree_skb(buf); - return data_sz; -} - -int tipc_port_iovec_reject(struct tipc_port *p_ptr, struct tipc_msg *hdr, - struct iovec const *msg_sect, unsigned int len, - int err) -{ - struct sk_buff *buf; - int res; - - res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf); - if (!buf) - return res; - - return tipc_reject_msg(buf, err); -} - -static void port_timeout(unsigned long ref) -{ - struct tipc_port *p_ptr = tipc_port_lock(ref); - struct sk_buff *buf = NULL; - - if (!p_ptr) - return; - - if (!p_ptr->connected) { - tipc_port_unlock(p_ptr); - return; - } - - /* Last probe answered ? 
*/ - if (p_ptr->probing_state == PROBING) { - buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT); - } else { - buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0); - p_ptr->probing_state = PROBING; - k_start_timer(&p_ptr->timer, p_ptr->probing_interval); - } - tipc_port_unlock(p_ptr); - tipc_net_route_msg(buf); -} - - -static void port_handle_node_down(unsigned long ref) -{ - struct tipc_port *p_ptr = tipc_port_lock(ref); - struct sk_buff *buf = NULL; - - if (!p_ptr) - return; - buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); - tipc_port_unlock(p_ptr); - tipc_net_route_msg(buf); -} - - -static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err) -{ - struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err); - - if (buf) { - struct tipc_msg *msg = buf_msg(buf); - msg_swap_words(msg, 4, 5); - msg_swap_words(msg, 6, 7); - } - return buf; -} - - -static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err) -{ - struct sk_buff *buf; - struct tipc_msg *msg; - u32 imp; - - if (!p_ptr->connected) - return NULL; - - buf = tipc_buf_acquire(BASIC_H_SIZE); - if (buf) { - msg = buf_msg(buf); - memcpy(msg, &p_ptr->phdr, BASIC_H_SIZE); - msg_set_hdr_sz(msg, BASIC_H_SIZE); - msg_set_size(msg, BASIC_H_SIZE); - imp = msg_importance(msg); - if (imp < TIPC_CRITICAL_IMPORTANCE) - msg_set_importance(msg, ++imp); - msg_set_errcode(msg, err); - } - return buf; -} - -void tipc_port_proto_rcv(struct sk_buff *buf) -{ - struct tipc_msg *msg = buf_msg(buf); - struct tipc_port *p_ptr; - struct sk_buff *r_buf = NULL; - u32 destport = msg_destport(msg); - int wakeable; - - /* Validate connection */ - p_ptr = tipc_port_lock(destport); - if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) { - r_buf = tipc_buf_acquire(BASIC_H_SIZE); - if (r_buf) { - msg = buf_msg(r_buf); - tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG, - BASIC_H_SIZE, msg_orignode(msg)); - msg_set_errcode(msg, TIPC_ERR_NO_PORT); - msg_set_origport(msg, destport); - msg_set_destport(msg, msg_origport(msg)); - } - if (p_ptr) - tipc_port_unlock(p_ptr); - goto exit; - } - - /* Process protocol message sent by peer */ - switch (msg_type(msg)) { - case CONN_ACK: - wakeable = tipc_port_congested(p_ptr) && p_ptr->congested; - p_ptr->acked += msg_msgcnt(msg); - if (!tipc_port_congested(p_ptr)) { - p_ptr->congested = 0; - if (wakeable) - tipc_port_wakeup(p_ptr); - } - break; - case CONN_PROBE: - r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0); - break; - default: - /* CONN_PROBE_REPLY or unrecognized - no action required */ - break; - } - p_ptr->probing_state = CONFIRMED; - tipc_port_unlock(p_ptr); -exit: - tipc_net_route_msg(r_buf); - kfree_skb(buf); -} - -static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id) -{ - struct publication *publ; - int ret; - - if (full_id) - ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:", - tipc_zone(tipc_own_addr), - tipc_cluster(tipc_own_addr), - tipc_node(tipc_own_addr), p_ptr->ref); - else - ret = tipc_snprintf(buf, len, "%-10u:", p_ptr->ref); - - if (p_ptr->connected) { - u32 dport = tipc_port_peerport(p_ptr); - u32 destnode = tipc_port_peernode(p_ptr); - - ret += tipc_snprintf(buf + ret, len - ret, - " connected to <%u.%u.%u:%u>", - tipc_zone(destnode), - tipc_cluster(destnode), - tipc_node(destnode), dport); - if (p_ptr->conn_type != 0) - ret += tipc_snprintf(buf + ret, len - ret, - " via {%u,%u}", p_ptr->conn_type, - p_ptr->conn_instance); - } else if (p_ptr->published) { - ret += tipc_snprintf(buf + ret, len - 
ret, " bound to"); - list_for_each_entry(publ, &p_ptr->publications, pport_list) { - if (publ->lower == publ->upper) - ret += tipc_snprintf(buf + ret, len - ret, - " {%u,%u}", publ->type, - publ->lower); - else - ret += tipc_snprintf(buf + ret, len - ret, - " {%u,%u,%u}", publ->type, - publ->lower, publ->upper); - } - } - ret += tipc_snprintf(buf + ret, len - ret, "\n"); - return ret; -} - -struct sk_buff *tipc_port_get_ports(void) -{ - struct sk_buff *buf; - struct tlv_desc *rep_tlv; - char *pb; - int pb_len; - struct tipc_port *p_ptr; - int str_len = 0; - - buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN)); - if (!buf) - return NULL; - rep_tlv = (struct tlv_desc *)buf->data; - pb = TLV_DATA(rep_tlv); - pb_len = ULTRA_STRING_MAX_LEN; - - spin_lock_bh(&tipc_port_list_lock); - list_for_each_entry(p_ptr, &ports, port_list) { - spin_lock_bh(p_ptr->lock); - str_len += port_print(p_ptr, pb, pb_len, 0); - spin_unlock_bh(p_ptr->lock); - } - spin_unlock_bh(&tipc_port_list_lock); - str_len += 1; /* for "\0" */ - skb_put(buf, TLV_SPACE(str_len)); - TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); - - return buf; -} - -void tipc_port_reinit(void) -{ - struct tipc_port *p_ptr; - struct tipc_msg *msg; - - spin_lock_bh(&tipc_port_list_lock); - list_for_each_entry(p_ptr, &ports, port_list) { - msg = &p_ptr->phdr; - msg_set_prevnode(msg, tipc_own_addr); - msg_set_orignode(msg, tipc_own_addr); - } - spin_unlock_bh(&tipc_port_list_lock); -} - -void tipc_acknowledge(u32 ref, u32 ack) -{ - struct tipc_port *p_ptr; - struct sk_buff *buf = NULL; - - p_ptr = tipc_port_lock(ref); - if (!p_ptr) - return; - if (p_ptr->connected) { - p_ptr->conn_unacked -= ack; - buf = port_build_proto_msg(p_ptr, CONN_ACK, ack); - } - tipc_port_unlock(p_ptr); - tipc_net_route_msg(buf); -} - -int tipc_publish(struct tipc_port *p_ptr, unsigned int scope, - struct tipc_name_seq const *seq) -{ - struct publication *publ; - u32 key; - - if (p_ptr->connected) - return -EINVAL; - key = p_ptr->ref + p_ptr->pub_count + 1; - if (key == p_ptr->ref) - return -EADDRINUSE; - - publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, - scope, p_ptr->ref, key); - if (publ) { - list_add(&publ->pport_list, &p_ptr->publications); - p_ptr->pub_count++; - p_ptr->published = 1; - return 0; - } - return -EINVAL; -} - -int tipc_withdraw(struct tipc_port *p_ptr, unsigned int scope, - struct tipc_name_seq const *seq) -{ - struct publication *publ; - struct publication *tpubl; - int res = -EINVAL; - - if (!seq) { - list_for_each_entry_safe(publ, tpubl, - &p_ptr->publications, pport_list) { - tipc_nametbl_withdraw(publ->type, publ->lower, - publ->ref, publ->key); - } - res = 0; - } else { - list_for_each_entry_safe(publ, tpubl, - &p_ptr->publications, pport_list) { - if (publ->scope != scope) - continue; - if (publ->type != seq->type) - continue; - if (publ->lower != seq->lower) - continue; - if (publ->upper != seq->upper) - break; - tipc_nametbl_withdraw(publ->type, publ->lower, - publ->ref, publ->key); - res = 0; - break; - } - } - if (list_empty(&p_ptr->publications)) - p_ptr->published = 0; - return res; -} - -int tipc_port_connect(u32 ref, struct tipc_portid const *peer) -{ - struct tipc_port *p_ptr; - int res; - - p_ptr = tipc_port_lock(ref); - if (!p_ptr) - return -EINVAL; - res = __tipc_port_connect(ref, p_ptr, peer); - tipc_port_unlock(p_ptr); - return res; -} - -/* - * __tipc_port_connect - connect to a remote peer - * - * Port must be locked. 
- */ -int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr, - struct tipc_portid const *peer) -{ - struct tipc_msg *msg; - int res = -EINVAL; - - if (p_ptr->published || p_ptr->connected) - goto exit; - if (!peer->ref) - goto exit; - - msg = &p_ptr->phdr; - msg_set_destnode(msg, peer->node); - msg_set_destport(msg, peer->ref); - msg_set_type(msg, TIPC_CONN_MSG); - msg_set_lookup_scope(msg, 0); - msg_set_hdr_sz(msg, SHORT_H_SIZE); - - p_ptr->probing_interval = PROBING_INTERVAL; - p_ptr->probing_state = CONFIRMED; - p_ptr->connected = 1; - k_start_timer(&p_ptr->timer, p_ptr->probing_interval); - - tipc_nodesub_subscribe(&p_ptr->subscription, peer->node, - (void *)(unsigned long)ref, - (net_ev_handler)port_handle_node_down); - res = 0; -exit: - p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref); - return res; -} - -/* - * __tipc_disconnect - disconnect port from peer - * - * Port must be locked. - */ -int __tipc_port_disconnect(struct tipc_port *tp_ptr) -{ - if (tp_ptr->connected) { - tp_ptr->connected = 0; - /* let timer expire on it's own to avoid deadlock! */ - tipc_nodesub_unsubscribe(&tp_ptr->subscription); - return 0; - } - - return -ENOTCONN; -} - -/* - * tipc_port_disconnect(): Disconnect port form peer. - * This is a node local operation. - */ -int tipc_port_disconnect(u32 ref) -{ - struct tipc_port *p_ptr; - int res; - - p_ptr = tipc_port_lock(ref); - if (!p_ptr) - return -EINVAL; - res = __tipc_port_disconnect(p_ptr); - tipc_port_unlock(p_ptr); - return res; -} - -/* - * tipc_port_shutdown(): Send a SHUTDOWN msg to peer and disconnect - */ -int tipc_port_shutdown(u32 ref) -{ - struct tipc_port *p_ptr; - struct sk_buff *buf = NULL; - - p_ptr = tipc_port_lock(ref); - if (!p_ptr) - return -EINVAL; - - buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN); - tipc_port_unlock(p_ptr); - tipc_net_route_msg(buf); - return tipc_port_disconnect(ref); -} - -/* - * tipc_port_iovec_rcv: Concatenate and deliver sectioned - * message for this node. 
- */ -static int tipc_port_iovec_rcv(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len) -{ - struct sk_buff *buf; - int res; - - res = tipc_msg_build(&sender->phdr, msg_sect, len, MAX_MSG_SIZE, &buf); - if (likely(buf)) - tipc_sk_rcv(buf); - return res; -} - -/** - * tipc_send - send message sections on connection - */ -int tipc_send(struct tipc_port *p_ptr, - struct iovec const *msg_sect, - unsigned int len) -{ - u32 destnode; - int res; - - if (!p_ptr->connected) - return -EINVAL; - - p_ptr->congested = 1; - if (!tipc_port_congested(p_ptr)) { - destnode = tipc_port_peernode(p_ptr); - if (likely(!in_own_node(destnode))) - res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len, - destnode); - else - res = tipc_port_iovec_rcv(p_ptr, msg_sect, len); - - if (likely(res != -ELINKCONG)) { - p_ptr->congested = 0; - if (res > 0) - p_ptr->sent++; - return res; - } - } - if (tipc_port_unreliable(p_ptr)) { - p_ptr->congested = 0; - return len; - } - return -ELINKCONG; -} - -/** - * tipc_send2name - send message sections to port name - */ -int tipc_send2name(struct tipc_port *p_ptr, - struct tipc_name const *name, - unsigned int domain, - struct iovec const *msg_sect, - unsigned int len) -{ - struct tipc_msg *msg; - u32 destnode = domain; - u32 destport; - int res; - - if (p_ptr->connected) - return -EINVAL; - - msg = &p_ptr->phdr; - msg_set_type(msg, TIPC_NAMED_MSG); - msg_set_hdr_sz(msg, NAMED_H_SIZE); - msg_set_nametype(msg, name->type); - msg_set_nameinst(msg, name->instance); - msg_set_lookup_scope(msg, tipc_addr_scope(domain)); - destport = tipc_nametbl_translate(name->type, name->instance, &destnode); - msg_set_destnode(msg, destnode); - msg_set_destport(msg, destport); - - if (likely(destport || destnode)) { - if (likely(in_own_node(destnode))) - res = tipc_port_iovec_rcv(p_ptr, msg_sect, len); - else if (tipc_own_addr) - res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len, - destnode); - else - res = tipc_port_iovec_reject(p_ptr, msg, msg_sect, - len, TIPC_ERR_NO_NODE); - if (likely(res != -ELINKCONG)) { - if (res > 0) - p_ptr->sent++; - return res; - } - if (tipc_port_unreliable(p_ptr)) - return len; - - return -ELINKCONG; - } - return tipc_port_iovec_reject(p_ptr, msg, msg_sect, len, - TIPC_ERR_NO_NAME); -} - -/** - * tipc_send2port - send message sections to port identity - */ -int tipc_send2port(struct tipc_port *p_ptr, - struct tipc_portid const *dest, - struct iovec const *msg_sect, - unsigned int len) -{ - struct tipc_msg *msg; - int res; - - if (p_ptr->connected) - return -EINVAL; - - msg = &p_ptr->phdr; - msg_set_type(msg, TIPC_DIRECT_MSG); - msg_set_lookup_scope(msg, 0); - msg_set_destnode(msg, dest->node); - msg_set_destport(msg, dest->ref); - msg_set_hdr_sz(msg, BASIC_H_SIZE); - - if (in_own_node(dest->node)) - res = tipc_port_iovec_rcv(p_ptr, msg_sect, len); - else if (tipc_own_addr) - res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len, - dest->node); - else - res = tipc_port_iovec_reject(p_ptr, msg, msg_sect, len, - TIPC_ERR_NO_NODE); - if (likely(res != -ELINKCONG)) { - if (res > 0) - p_ptr->sent++; - return res; - } - if (tipc_port_unreliable(p_ptr)) - return len; - - return -ELINKCONG; -} diff --git a/net/tipc/port.h b/net/tipc/port.h deleted file mode 100644 index cf4ca5b1d9a..00000000000 --- a/net/tipc/port.h +++ /dev/null @@ -1,237 +0,0 @@ -/* - * net/tipc/port.h: Include file for TIPC port code - * - * Copyright (c) 1994-2007, 2014, Ericsson AB - * Copyright (c) 2004-2007, 2010-2013, Wind River Systems - * All rights reserved. 
- * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef _TIPC_PORT_H -#define _TIPC_PORT_H - -#include "ref.h" -#include "net.h" -#include "msg.h" -#include "node_subscr.h" - -#define TIPC_CONNACK_INTV 256 -#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) -#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \ - SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) - -/** - * struct tipc_port - TIPC port structure - * @lock: pointer to spinlock for controlling access to port - * @connected: non-zero if port is currently connected to a peer port - * @conn_type: TIPC type used when connection was established - * @conn_instance: TIPC instance used when connection was established - * @conn_unacked: number of unacknowledged messages received from peer port - * @published: non-zero if port has one or more associated names - * @congested: non-zero if cannot send because of link or port congestion - * @max_pkt: maximum packet size "hint" used when building messages sent by port - * @ref: unique reference to port in TIPC object registry - * @phdr: preformatted message header used when sending messages - * @port_list: adjacent ports in TIPC's global list of ports - * @wait_list: adjacent ports in list of ports waiting on link congestion - * @waiting_pkts: - * @sent: # of non-empty messages sent by port - * @acked: # of non-empty message acknowledgements from connected port's peer - * @publications: list of publications for port - * @pub_count: total # of publications port has made during its lifetime - * @probing_state: - * @probing_interval: - * @timer_ref: - * @subscription: "node down" subscription used to terminate failed connections - */ -struct tipc_port { - spinlock_t *lock; - int connected; - u32 conn_type; - u32 conn_instance; - u32 conn_unacked; - int published; - u32 congested; - u32 max_pkt; - u32 ref; - struct tipc_msg phdr; - struct list_head port_list; - struct 
list_head wait_list; - u32 waiting_pkts; - u32 sent; - u32 acked; - struct list_head publications; - u32 pub_count; - u32 probing_state; - u32 probing_interval; - struct timer_list timer; - struct tipc_node_subscr subscription; -}; - -extern spinlock_t tipc_port_list_lock; -struct tipc_port_list; - -/* - * TIPC port manipulation routines - */ -u32 tipc_port_init(struct tipc_port *p_ptr, - const unsigned int importance); - -int tipc_reject_msg(struct sk_buff *buf, u32 err); - -void tipc_acknowledge(u32 port_ref, u32 ack); - -void tipc_port_destroy(struct tipc_port *p_ptr); - -int tipc_publish(struct tipc_port *p_ptr, unsigned int scope, - struct tipc_name_seq const *name_seq); - -int tipc_withdraw(struct tipc_port *p_ptr, unsigned int scope, - struct tipc_name_seq const *name_seq); - -int tipc_port_connect(u32 portref, struct tipc_portid const *port); - -int tipc_port_disconnect(u32 portref); - -int tipc_port_shutdown(u32 ref); - -void tipc_port_wakeup(struct tipc_port *port); - -/* - * The following routines require that the port be locked on entry - */ -int __tipc_port_disconnect(struct tipc_port *tp_ptr); -int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr, - struct tipc_portid const *peer); -int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg); - -/* - * TIPC messaging routines - */ - -int tipc_send(struct tipc_port *port, - struct iovec const *msg_sect, - unsigned int len); - -int tipc_send2name(struct tipc_port *port, - struct tipc_name const *name, - u32 domain, - struct iovec const *msg_sect, - unsigned int len); - -int tipc_send2port(struct tipc_port *port, - struct tipc_portid const *dest, - struct iovec const *msg_sect, - unsigned int len); - -int tipc_port_mcast_xmit(struct tipc_port *port, - struct tipc_name_seq const *seq, - struct iovec const *msg, - unsigned int len); - -int tipc_port_iovec_reject(struct tipc_port *p_ptr, - struct tipc_msg *hdr, - struct iovec const *msg_sect, - unsigned int len, - int err); - -struct sk_buff *tipc_port_get_ports(void); -void tipc_port_proto_rcv(struct sk_buff *buf); -void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp); -void tipc_port_reinit(void); - -/** - * tipc_port_lock - lock port instance referred to and return its pointer - */ -static inline struct tipc_port *tipc_port_lock(u32 ref) -{ - return (struct tipc_port *)tipc_ref_lock(ref); -} - -/** - * tipc_port_unlock - unlock a port instance - * - * Can use pointer instead of tipc_ref_unlock() since port is already locked. - */ -static inline void tipc_port_unlock(struct tipc_port *p_ptr) -{ - spin_unlock_bh(p_ptr->lock); -} - -static inline int tipc_port_congested(struct tipc_port *p_ptr) -{ - return ((p_ptr->sent - p_ptr->acked) >= TIPC_FLOWCTRL_WIN); -} - - -static inline u32 tipc_port_peernode(struct tipc_port *p_ptr) -{ - return msg_destnode(&p_ptr->phdr); -} - -static inline u32 tipc_port_peerport(struct tipc_port *p_ptr) -{ - return msg_destport(&p_ptr->phdr); -} - -static inline bool tipc_port_unreliable(struct tipc_port *port) -{ - return msg_src_droppable(&port->phdr) != 0; -} - -static inline void tipc_port_set_unreliable(struct tipc_port *port, - bool unreliable) -{ - msg_set_src_droppable(&port->phdr, unreliable ? 1 : 0); -} - -static inline bool tipc_port_unreturnable(struct tipc_port *port) -{ - return msg_dest_droppable(&port->phdr) != 0; -} - -static inline void tipc_port_set_unreturnable(struct tipc_port *port, - bool unreturnable) -{ - msg_set_dest_droppable(&port->phdr, unreturnable ? 
1 : 0); -} - - -static inline int tipc_port_importance(struct tipc_port *port) -{ - return msg_importance(&port->phdr); -} - -static inline void tipc_port_set_importance(struct tipc_port *port, int imp) -{ - msg_set_importance(&port->phdr, (u32)imp); -} - -#endif diff --git a/net/tipc/ref.c b/net/tipc/ref.c deleted file mode 100644 index 3d4ecd754ee..00000000000 --- a/net/tipc/ref.c +++ /dev/null @@ -1,266 +0,0 @@ -/* - * net/tipc/ref.c: TIPC object registry code - * - * Copyright (c) 1991-2006, Ericsson AB - * Copyright (c) 2004-2007, Wind River Systems - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "core.h" -#include "ref.h" - -/** - * struct reference - TIPC object reference entry - * @object: pointer to object associated with reference entry - * @lock: spinlock controlling access to object - * @ref: reference value for object (combines instance & array index info) - */ -struct reference { - void *object; - spinlock_t lock; - u32 ref; -}; - -/** - * struct tipc_ref_table - table of TIPC object reference entries - * @entries: pointer to array of reference entries - * @capacity: array index of first unusable entry - * @init_point: array index of first uninitialized entry - * @first_free: array index of first unused object reference entry - * @last_free: array index of last unused object reference entry - * @index_mask: bitmask for array index portion of reference values - * @start_mask: initial value for instance value portion of reference values - */ -struct ref_table { - struct reference *entries; - u32 capacity; - u32 init_point; - u32 first_free; - u32 last_free; - u32 index_mask; - u32 start_mask; -}; - -/* - * Object reference table consists of 2**N entries. 
- * - * State Object ptr Reference - * ----- ---------- --------- - * In use non-NULL XXXX|own index - * (XXXX changes each time entry is acquired) - * Free NULL YYYY|next free index - * (YYYY is one more than last used XXXX) - * Uninitialized NULL 0 - * - * Entry 0 is not used; this allows index 0 to denote the end of the free list. - * - * Note that a reference value of 0 does not necessarily indicate that an - * entry is uninitialized, since the last entry in the free list could also - * have a reference value of 0 (although this is unlikely). - */ - -static struct ref_table tipc_ref_table; - -static DEFINE_SPINLOCK(ref_table_lock); - -/** - * tipc_ref_table_init - create reference table for objects - */ -int tipc_ref_table_init(u32 requested_size, u32 start) -{ - struct reference *table; - u32 actual_size; - - /* account for unused entry, then round up size to a power of 2 */ - - requested_size++; - for (actual_size = 16; actual_size < requested_size; actual_size <<= 1) - /* do nothing */ ; - - /* allocate table & mark all entries as uninitialized */ - table = vzalloc(actual_size * sizeof(struct reference)); - if (table == NULL) - return -ENOMEM; - - tipc_ref_table.entries = table; - tipc_ref_table.capacity = requested_size; - tipc_ref_table.init_point = 1; - tipc_ref_table.first_free = 0; - tipc_ref_table.last_free = 0; - tipc_ref_table.index_mask = actual_size - 1; - tipc_ref_table.start_mask = start & ~tipc_ref_table.index_mask; - - return 0; -} - -/** - * tipc_ref_table_stop - destroy reference table for objects - */ -void tipc_ref_table_stop(void) -{ - vfree(tipc_ref_table.entries); - tipc_ref_table.entries = NULL; -} - -/** - * tipc_ref_acquire - create reference to an object - * - * Register an object pointer in reference table and lock the object. - * Returns a unique reference value that is used from then on to retrieve the - * object pointer, or to determine that the object has been deregistered. - * - * Note: The object is returned in the locked state so that the caller can - * register a partially initialized object, without running the risk that - * the object will be accessed before initialization is complete. - */ -u32 tipc_ref_acquire(void *object, spinlock_t **lock) -{ - u32 index; - u32 index_mask; - u32 next_plus_upper; - u32 ref; - struct reference *entry = NULL; - - if (!object) { - pr_err("Attempt to acquire ref. to non-existent obj\n"); - return 0; - } - if (!tipc_ref_table.entries) { - pr_err("Ref. 
table not found in acquisition attempt\n"); - return 0; - } - - /* take a free entry, if available; otherwise initialize a new entry */ - spin_lock_bh(&ref_table_lock); - if (tipc_ref_table.first_free) { - index = tipc_ref_table.first_free; - entry = &(tipc_ref_table.entries[index]); - index_mask = tipc_ref_table.index_mask; - next_plus_upper = entry->ref; - tipc_ref_table.first_free = next_plus_upper & index_mask; - ref = (next_plus_upper & ~index_mask) + index; - } else if (tipc_ref_table.init_point < tipc_ref_table.capacity) { - index = tipc_ref_table.init_point++; - entry = &(tipc_ref_table.entries[index]); - spin_lock_init(&entry->lock); - ref = tipc_ref_table.start_mask + index; - } else { - ref = 0; - } - spin_unlock_bh(&ref_table_lock); - - /* - * Grab the lock so no one else can modify this entry - * While we assign its ref value & object pointer - */ - if (entry) { - spin_lock_bh(&entry->lock); - entry->ref = ref; - entry->object = object; - *lock = &entry->lock; - /* - * keep it locked, the caller is responsible - * for unlocking this when they're done with it - */ - } - - return ref; -} - -/** - * tipc_ref_discard - invalidate references to an object - * - * Disallow future references to an object and free up the entry for re-use. - * Note: The entry's spin_lock may still be busy after discard - */ -void tipc_ref_discard(u32 ref) -{ - struct reference *entry; - u32 index; - u32 index_mask; - - if (!tipc_ref_table.entries) { - pr_err("Ref. table not found during discard attempt\n"); - return; - } - - index_mask = tipc_ref_table.index_mask; - index = ref & index_mask; - entry = &(tipc_ref_table.entries[index]); - - spin_lock_bh(&ref_table_lock); - - if (!entry->object) { - pr_err("Attempt to discard ref. to non-existent obj\n"); - goto exit; - } - if (entry->ref != ref) { - pr_err("Attempt to discard non-existent reference\n"); - goto exit; - } - - /* - * mark entry as unused; increment instance part of entry's reference - * to invalidate any subsequent references - */ - entry->object = NULL; - entry->ref = (ref & ~index_mask) + (index_mask + 1); - - /* append entry to free entry list */ - if (tipc_ref_table.first_free == 0) - tipc_ref_table.first_free = index; - else - tipc_ref_table.entries[tipc_ref_table.last_free].ref |= index; - tipc_ref_table.last_free = index; - -exit: - spin_unlock_bh(&ref_table_lock); -} - -/** - * tipc_ref_lock - lock referenced object and return pointer to it - */ -void *tipc_ref_lock(u32 ref) -{ - if (likely(tipc_ref_table.entries)) { - struct reference *entry; - - entry = &tipc_ref_table.entries[ref & - tipc_ref_table.index_mask]; - if (likely(entry->ref != 0)) { - spin_lock_bh(&entry->lock); - if (likely((entry->ref == ref) && (entry->object))) - return entry->object; - spin_unlock_bh(&entry->lock); - } - } - return NULL; -} diff --git a/net/tipc/ref.h b/net/tipc/ref.h deleted file mode 100644 index d01aa1df63b..00000000000 --- a/net/tipc/ref.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * net/tipc/ref.h: Include file for TIPC object registry code - * - * Copyright (c) 1991-2006, Ericsson AB - * Copyright (c) 2005-2006, Wind River Systems - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. 
Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef _TIPC_REF_H -#define _TIPC_REF_H - -int tipc_ref_table_init(u32 requested_size, u32 start); -void tipc_ref_table_stop(void); - -u32 tipc_ref_acquire(void *object, spinlock_t **lock); -void tipc_ref_discard(u32 ref); - -void *tipc_ref_lock(u32 ref); - -#endif diff --git a/net/tipc/socket.c b/net/tipc/socket.c index ef0475568f9..75275c5cf92 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -35,21 +35,84 @@ */ #include "core.h" -#include "port.h" +#include "name_table.h" #include "node.h" - +#include "link.h" #include <linux/export.h> +#include "config.h" +#include "socket.h" #define SS_LISTENING -1 /* socket is listening */ #define SS_READY -2 /* socket is connectionless */ -#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ +#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ +#define CONN_PROBING_INTERVAL 3600000 /* [ms] => 1 h */ +#define TIPC_FWD_MSG 1 +#define TIPC_CONN_OK 0 +#define TIPC_CONN_PROBING 1 + +/** + * struct tipc_sock - TIPC socket structure + * @sk: socket - interacts with 'port' and with user via the socket API + * @connected: non-zero if port is currently connected to a peer port + * @conn_type: TIPC type used when connection was established + * @conn_instance: TIPC instance used when connection was established + * @published: non-zero if port has one or more associated names + * @max_pkt: maximum packet size "hint" used when building messages sent by port + * @ref: unique reference to port in TIPC object registry + * @phdr: preformatted message header used when sending messages + * @port_list: adjacent ports in TIPC's global list of ports + * @publications: list of publications for port + * @pub_count: total # of publications port has made during its lifetime + * @probing_state: + * @probing_interval: + * @timer: + * @port: port - interacts with 'sk' and with the rest of the TIPC stack + * @peer_name: the peer of the connection, if any + * @conn_timeout: the time we can wait for an unresponded setup request + * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue + * @link_cong: non-zero if owner must sleep because of 
link congestion + * @sent_unacked: # messages sent by socket, and not yet acked by peer + * @rcv_unacked: # messages read by user, but not yet acked back to peer + */ +struct tipc_sock { + struct sock sk; + int connected; + u32 conn_type; + u32 conn_instance; + int published; + u32 max_pkt; + u32 ref; + struct tipc_msg phdr; + struct list_head sock_list; + struct list_head publications; + u32 pub_count; + u32 probing_state; + u32 probing_interval; + struct timer_list timer; + uint conn_timeout; + atomic_t dupl_rcvcnt; + bool link_cong; + uint sent_unacked; + uint rcv_unacked; +}; static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); static void tipc_data_ready(struct sock *sk); static void tipc_write_space(struct sock *sk); static int tipc_release(struct socket *sock); static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); +static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p); +static void tipc_sk_timeout(unsigned long ref); +static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, + struct tipc_name_seq const *seq); +static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, + struct tipc_name_seq const *seq); +static u32 tipc_sk_ref_acquire(struct tipc_sock *tsk); +static void tipc_sk_ref_discard(u32 ref); +static struct tipc_sock *tipc_sk_get(u32 ref); +static struct tipc_sock *tipc_sk_get_next(u32 *ref); +static void tipc_sk_put(struct tipc_sock *tsk); static const struct proto_ops packet_ops; static const struct proto_ops stream_ops; @@ -103,29 +166,115 @@ static struct proto tipc_proto_kern; * - port reference */ -#include "socket.h" +static u32 tsk_peer_node(struct tipc_sock *tsk) +{ + return msg_destnode(&tsk->phdr); +} + +static u32 tsk_peer_port(struct tipc_sock *tsk) +{ + return msg_destport(&tsk->phdr); +} + +static bool tsk_unreliable(struct tipc_sock *tsk) +{ + return msg_src_droppable(&tsk->phdr) != 0; +} + +static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable) +{ + msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0); +} + +static bool tsk_unreturnable(struct tipc_sock *tsk) +{ + return msg_dest_droppable(&tsk->phdr) != 0; +} + +static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable) +{ + msg_set_dest_droppable(&tsk->phdr, unreturnable ? 
1 : 0); +} + +static int tsk_importance(struct tipc_sock *tsk) +{ + return msg_importance(&tsk->phdr); +} + +static int tsk_set_importance(struct tipc_sock *tsk, int imp) +{ + if (imp > TIPC_CRITICAL_IMPORTANCE) + return -EINVAL; + msg_set_importance(&tsk->phdr, (u32)imp); + return 0; +} + +static struct tipc_sock *tipc_sk(const struct sock *sk) +{ + return container_of(sk, struct tipc_sock, sk); +} + +static int tsk_conn_cong(struct tipc_sock *tsk) +{ + return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN; +} /** - * advance_rx_queue - discard first buffer in socket receive queue + * tsk_advance_rx_queue - discard first buffer in socket receive queue * * Caller must hold socket lock */ -static void advance_rx_queue(struct sock *sk) +static void tsk_advance_rx_queue(struct sock *sk) { kfree_skb(__skb_dequeue(&sk->sk_receive_queue)); } /** - * reject_rx_queue - reject all buffers in socket receive queue + * tsk_rej_rx_queue - reject all buffers in socket receive queue * * Caller must hold socket lock */ -static void reject_rx_queue(struct sock *sk) +static void tsk_rej_rx_queue(struct sock *sk) { struct sk_buff *buf; + u32 dnode; + + while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { + if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) + tipc_link_xmit(buf, dnode, 0); + } +} + +/* tsk_peer_msg - verify if message was sent by connected port's peer + * + * Handles cases where the node's network address has changed from + * the default of <0.0.0> to its configured setting. + */ +static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg) +{ + u32 peer_port = tsk_peer_port(tsk); + u32 orig_node; + u32 peer_node; + + if (unlikely(!tsk->connected)) + return false; - while ((buf = __skb_dequeue(&sk->sk_receive_queue))) - tipc_reject_msg(buf, TIPC_ERR_NO_PORT); + if (unlikely(msg_origport(msg) != peer_port)) + return false; + + orig_node = msg_orignode(msg); + peer_node = tsk_peer_node(tsk); + + if (likely(orig_node == peer_node)) + return true; + + if (!orig_node && (peer_node == tipc_own_addr)) + return true; + + if (!peer_node && (orig_node == tipc_own_addr)) + return true; + + return false; } /** @@ -147,7 +296,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, socket_state state; struct sock *sk; struct tipc_sock *tsk; - struct tipc_port *port; + struct tipc_msg *msg; u32 ref; /* Validate arguments */ @@ -182,32 +331,36 @@ static int tipc_sk_create(struct net *net, struct socket *sock, return -ENOMEM; tsk = tipc_sk(sk); - port = &tsk->port; - - ref = tipc_port_init(port, TIPC_LOW_IMPORTANCE); + ref = tipc_sk_ref_acquire(tsk); if (!ref) { - pr_warn("Socket registration failed, ref. 
table exhausted\n"); - sk_free(sk); + pr_warn("Socket create failed; reference table exhausted\n"); return -ENOMEM; } + tsk->max_pkt = MAX_PKT_DEFAULT; + tsk->ref = ref; + INIT_LIST_HEAD(&tsk->publications); + msg = &tsk->phdr; + tipc_msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, + NAMED_H_SIZE, 0); + msg_set_origport(msg, ref); /* Finish initializing socket data structures */ sock->ops = ops; sock->state = state; - sock_init_data(sock, sk); + k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, ref); sk->sk_backlog_rcv = tipc_backlog_rcv; sk->sk_rcvbuf = sysctl_tipc_rmem[1]; sk->sk_data_ready = tipc_data_ready; sk->sk_write_space = tipc_write_space; tsk->conn_timeout = CONN_TIMEOUT_DEFAULT; + tsk->sent_unacked = 0; atomic_set(&tsk->dupl_rcvcnt, 0); - tipc_port_unlock(port); if (sock->state == SS_READY) { - tipc_port_set_unreturnable(port, true); + tsk_set_unreturnable(tsk, true); if (sock->type == SOCK_DGRAM) - tipc_port_set_unreliable(port, true); + tsk_set_unreliable(tsk, true); } return 0; } @@ -301,8 +454,8 @@ static int tipc_release(struct socket *sock) { struct sock *sk = sock->sk; struct tipc_sock *tsk; - struct tipc_port *port; struct sk_buff *buf; + u32 dnode; /* * Exit if socket isn't fully initialized (occurs when a failed accept() @@ -312,13 +465,13 @@ static int tipc_release(struct socket *sock) return 0; tsk = tipc_sk(sk); - port = &tsk->port; lock_sock(sk); /* * Reject all unreceived messages, except on an active connection * (which disconnects locally & sends a 'FIN+' to peer) */ + dnode = tsk_peer_node(tsk); while (sock->state != SS_DISCONNECTING) { buf = __skb_dequeue(&sk->sk_receive_queue); if (buf == NULL) @@ -329,16 +482,27 @@ static int tipc_release(struct socket *sock) if ((sock->state == SS_CONNECTING) || (sock->state == SS_CONNECTED)) { sock->state = SS_DISCONNECTING; - tipc_port_disconnect(port->ref); + tsk->connected = 0; + tipc_node_remove_conn(dnode, tsk->ref); } - tipc_reject_msg(buf, TIPC_ERR_NO_PORT); + if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) + tipc_link_xmit(buf, dnode, 0); } } - /* Destroy TIPC port; also disconnects an active connection and - * sends a 'FIN-' to peer. - */ - tipc_port_destroy(port); + tipc_sk_withdraw(tsk, 0, NULL); + tipc_sk_ref_discard(tsk->ref); + k_cancel_timer(&tsk->timer); + if (tsk->connected) { + buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, + SHORT_H_SIZE, 0, dnode, tipc_own_addr, + tsk_peer_port(tsk), + tsk->ref, TIPC_ERR_NO_PORT); + if (buf) + tipc_link_xmit(buf, dnode, tsk->ref); + tipc_node_remove_conn(dnode, tsk->ref); + } + k_term_timer(&tsk->timer); /* Discard any remaining (connection-based) messages in receive queue */ __skb_queue_purge(&sk->sk_receive_queue); @@ -346,7 +510,6 @@ static int tipc_release(struct socket *sock) /* Reject any messages that accumulated in backlog queue */ sock->state = SS_DISCONNECTING; release_sock(sk); - sock_put(sk); sock->sk = NULL; @@ -378,7 +541,7 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr, lock_sock(sk); if (unlikely(!uaddr_len)) { - res = tipc_withdraw(&tsk->port, 0, NULL); + res = tipc_sk_withdraw(tsk, 0, NULL); goto exit; } @@ -406,8 +569,8 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr, } res = (addr->scope > 0) ? 
- tipc_publish(&tsk->port, addr->scope, &addr->addr.nameseq) : - tipc_withdraw(&tsk->port, -addr->scope, &addr->addr.nameseq); + tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) : + tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq); exit: release_sock(sk); return res; @@ -437,10 +600,10 @@ static int tipc_getname(struct socket *sock, struct sockaddr *uaddr, if ((sock->state != SS_CONNECTED) && ((peer != 2) || (sock->state != SS_DISCONNECTING))) return -ENOTCONN; - addr->addr.id.ref = tipc_port_peerport(&tsk->port); - addr->addr.id.node = tipc_port_peernode(&tsk->port); + addr->addr.id.ref = tsk_peer_port(tsk); + addr->addr.id.node = tsk_peer_node(tsk); } else { - addr->addr.id.ref = tsk->port.ref; + addr->addr.id.ref = tsk->ref; addr->addr.id.node = tipc_own_addr; } @@ -504,12 +667,12 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, switch ((int)sock->state) { case SS_UNCONNECTED: - if (!tsk->port.congested) + if (!tsk->link_cong) mask |= POLLOUT; break; case SS_READY: case SS_CONNECTED: - if (!tsk->port.congested) + if (!tsk->link_cong && !tsk_conn_cong(tsk)) mask |= POLLOUT; /* fall thru' */ case SS_CONNECTING: @@ -526,6 +689,136 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, } /** + * tipc_sendmcast - send multicast message + * @sock: socket structure + * @seq: destination address + * @iov: message data to send + * @dsz: total length of message data + * @timeo: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, + struct iovec *iov, size_t dsz, long timeo) +{ + struct sock *sk = sock->sk; + struct tipc_msg *mhdr = &tipc_sk(sk)->phdr; + struct sk_buff *buf; + uint mtu; + int rc; + + msg_set_type(mhdr, TIPC_MCAST_MSG); + msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE); + msg_set_destport(mhdr, 0); + msg_set_destnode(mhdr, 0); + msg_set_nametype(mhdr, seq->type); + msg_set_namelower(mhdr, seq->lower); + msg_set_nameupper(mhdr, seq->upper); + msg_set_hdr_sz(mhdr, MCAST_H_SIZE); + +new_mtu: + mtu = tipc_bclink_get_mtu(); + rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf); + if (unlikely(rc < 0)) + return rc; + + do { + rc = tipc_bclink_xmit(buf); + if (likely(rc >= 0)) { + rc = dsz; + break; + } + if (rc == -EMSGSIZE) + goto new_mtu; + if (rc != -ELINKCONG) + break; + tipc_sk(sk)->link_cong = 1; + rc = tipc_wait_for_sndmsg(sock, &timeo); + if (rc) + kfree_skb_list(buf); + } while (!rc); + return rc; +} + +/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets + */ +void tipc_sk_mcast_rcv(struct sk_buff *buf) +{ + struct tipc_msg *msg = buf_msg(buf); + struct tipc_port_list dports = {0, NULL, }; + struct tipc_port_list *item; + struct sk_buff *b; + uint i, last, dst = 0; + u32 scope = TIPC_CLUSTER_SCOPE; + + if (in_own_node(msg_orignode(msg))) + scope = TIPC_NODE_SCOPE; + + /* Create destination port list: */ + tipc_nametbl_mc_translate(msg_nametype(msg), + msg_namelower(msg), + msg_nameupper(msg), + scope, + &dports); + last = dports.count; + if (!last) { + kfree_skb(buf); + return; + } + + for (item = &dports; item; item = item->next) { + for (i = 0; i < PLSIZE && ++dst <= last; i++) { + b = (dst != last) ? 
skb_clone(buf, GFP_ATOMIC) : buf; + if (!b) { + pr_warn("Failed to clone mcast rcv buffer\n"); + continue; + } + msg_set_destport(msg, item->ports[i]); + tipc_sk_rcv(b); + } + } + tipc_port_list_free(&dports); +} + +/** + * tipc_sk_proto_rcv - receive a connection mng protocol message + * @tsk: receiving socket + * @dnode: node to send response message to, if any + * @buf: buffer containing protocol message + * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if + * (CONN_PROBE_REPLY) message should be forwarded. + */ +static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, + struct sk_buff *buf) +{ + struct tipc_msg *msg = buf_msg(buf); + int conn_cong; + + /* Ignore if connection cannot be validated: */ + if (!tsk_peer_msg(tsk, msg)) + goto exit; + + tsk->probing_state = TIPC_CONN_OK; + + if (msg_type(msg) == CONN_ACK) { + conn_cong = tsk_conn_cong(tsk); + tsk->sent_unacked -= msg_msgcnt(msg); + if (conn_cong) + tsk->sk.sk_write_space(&tsk->sk); + } else if (msg_type(msg) == CONN_PROBE) { + if (!tipc_msg_reverse(buf, dnode, TIPC_OK)) + return TIPC_OK; + msg_set_type(msg, CONN_PROBE_REPLY); + return TIPC_FWD_MSG; + } + /* Do nothing if msg_type() == CONN_PROBE_REPLY */ +exit: + kfree_skb(buf); + return TIPC_OK; +} + +/** + * dest_name_check - verify user is permitted to send to specified port name + * @dest: destination address + * @m: descriptor for message to be sent @@ -539,6 +832,8 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m) { struct tipc_cfg_msg_hdr hdr; + if (unlikely(dest->addrtype == TIPC_ADDR_ID)) + return 0; if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES)) return 0; if (likely(dest->addr.name.name.type == TIPC_TOP_SRV)) @@ -575,19 +870,18 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) return sock_intr_errno(*timeo_p); prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); - done = sk_wait_event(sk, timeo_p, !tsk->port.congested); + done = sk_wait_event(sk, timeo_p, !tsk->link_cong); finish_wait(sk_sleep(sk), &wait); } while (!done); return 0; } - /** * tipc_sendmsg - send message in connectionless manner * @iocb: if NULL, indicates that socket lock is already held * @sock: socket structure * @m: message to send - * @total_len: length of message + * @dsz: amount of user data to be sent * * Message must have a destination specified explicitly. 
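The loop in tipc_sk_mcast_rcv() above delivers one multicast buffer to many local sockets by cloning it for every destination except the last, which consumes the original, so no copy is wasted and the buffer is freed exactly once however many receivers there are. A hedged userspace sketch of that pattern (deliver() stands in for tipc_sk_rcv(); all names and values are illustrative):

#include <stdio.h>

static void deliver(unsigned int port, const char *buf, int is_clone)
{
        printf("port %u gets the %s\n", port, is_clone ? "clone" : "original");
        (void)buf;
}

int main(void)
{
        unsigned int ports[] = { 100, 101, 102 };
        unsigned int last = 3, dst;

        for (dst = 1; dst <= last; dst++)
                deliver(ports[dst - 1], "payload", dst != last);
        return 0;
}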
* Used for SOCK_RDM and SOCK_DGRAM messages, @@ -597,100 +891,122 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) * Returns the number of bytes sent on success, or errno otherwise */ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, - struct msghdr *m, size_t total_len) + struct msghdr *m, size_t dsz) { + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; - DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); - int needs_conn; + struct tipc_msg *mhdr = &tsk->phdr; + struct iovec *iov = m->msg_iov; + u32 dnode, dport; + struct sk_buff *buf; + struct tipc_name_seq *seq = &dest->addr.nameseq; + u32 mtu; long timeo; - int res = -EINVAL; + int rc = -EINVAL; if (unlikely(!dest)) return -EDESTADDRREQ; + if (unlikely((m->msg_namelen < sizeof(*dest)) || (dest->family != AF_TIPC))) return -EINVAL; - if (total_len > TIPC_MAX_USER_MSG_SIZE) + + if (dsz > TIPC_MAX_USER_MSG_SIZE) return -EMSGSIZE; if (iocb) lock_sock(sk); - needs_conn = (sock->state != SS_READY); - if (unlikely(needs_conn)) { + if (unlikely(sock->state != SS_READY)) { if (sock->state == SS_LISTENING) { - res = -EPIPE; + rc = -EPIPE; goto exit; } if (sock->state != SS_UNCONNECTED) { - res = -EISCONN; + rc = -EISCONN; goto exit; } - if (tsk->port.published) { - res = -EOPNOTSUPP; + if (tsk->published) { + rc = -EOPNOTSUPP; goto exit; } if (dest->addrtype == TIPC_ADDR_NAME) { - tsk->port.conn_type = dest->addr.name.name.type; - tsk->port.conn_instance = dest->addr.name.name.instance; + tsk->conn_type = dest->addr.name.name.type; + tsk->conn_instance = dest->addr.name.name.instance; } - - /* Abort any pending connection attempts (very unlikely) */ - reject_rx_queue(sk); } + rc = dest_name_check(dest, m); + if (rc) + goto exit; timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); - do { - if (dest->addrtype == TIPC_ADDR_NAME) { - res = dest_name_check(dest, m); - if (res) - break; - res = tipc_send2name(port, - &dest->addr.name.name, - dest->addr.name.domain, - m->msg_iov, - total_len); - } else if (dest->addrtype == TIPC_ADDR_ID) { - res = tipc_send2port(port, - &dest->addr.id, - m->msg_iov, - total_len); - } else if (dest->addrtype == TIPC_ADDR_MCAST) { - if (needs_conn) { - res = -EOPNOTSUPP; - break; - } - res = dest_name_check(dest, m); - if (res) - break; - res = tipc_port_mcast_xmit(port, - &dest->addr.nameseq, - m->msg_iov, - total_len); + + if (dest->addrtype == TIPC_ADDR_MCAST) { + rc = tipc_sendmcast(sock, seq, iov, dsz, timeo); + goto exit; + } else if (dest->addrtype == TIPC_ADDR_NAME) { + u32 type = dest->addr.name.name.type; + u32 inst = dest->addr.name.name.instance; + u32 domain = dest->addr.name.domain; + + dnode = domain; + msg_set_type(mhdr, TIPC_NAMED_MSG); + msg_set_hdr_sz(mhdr, NAMED_H_SIZE); + msg_set_nametype(mhdr, type); + msg_set_nameinst(mhdr, inst); + msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); + dport = tipc_nametbl_translate(type, inst, &dnode); + msg_set_destnode(mhdr, dnode); + msg_set_destport(mhdr, dport); + if (unlikely(!dport && !dnode)) { + rc = -EHOSTUNREACH; + goto exit; } - if (likely(res != -ELINKCONG)) { - if (needs_conn && (res >= 0)) + } else if (dest->addrtype == TIPC_ADDR_ID) { + dnode = dest->addr.id.node; + msg_set_type(mhdr, TIPC_DIRECT_MSG); + msg_set_lookup_scope(mhdr, 0); + msg_set_destnode(mhdr, dnode); + msg_set_destport(mhdr, dest->addr.id.ref); + msg_set_hdr_sz(mhdr, BASIC_H_SIZE); + } + +new_mtu: + mtu = 
tipc_node_get_mtu(dnode, tsk->ref); + rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf); + if (rc < 0) + goto exit; + + do { + TIPC_SKB_CB(buf)->wakeup_pending = tsk->link_cong; + rc = tipc_link_xmit(buf, dnode, tsk->ref); + if (likely(rc >= 0)) { + if (sock->state != SS_READY) sock->state = SS_CONNECTING; + rc = dsz; break; } - res = tipc_wait_for_sndmsg(sock, &timeo); - if (res) + if (rc == -EMSGSIZE) + goto new_mtu; + if (rc != -ELINKCONG) break; - } while (1); - + tsk->link_cong = 1; + rc = tipc_wait_for_sndmsg(sock, &timeo); + if (rc) + kfree_skb_list(buf); + } while (!rc); exit: if (iocb) release_sock(sk); - return res; + + return rc; } static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; DEFINE_WAIT(wait); int done; @@ -709,37 +1025,48 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); done = sk_wait_event(sk, timeo_p, - (!port->congested || !port->connected)); + (!tsk->link_cong && + !tsk_conn_cong(tsk)) || + !tsk->connected); finish_wait(sk_sleep(sk), &wait); } while (!done); return 0; } /** - * tipc_send_packet - send a connection-oriented message - * @iocb: if NULL, indicates that socket lock is already held + * tipc_send_stream - send stream-oriented data + * @iocb: (unused) * @sock: socket structure - * @m: message to send - * @total_len: length of message + * @m: data to send + * @dsz: total length of data to be transmitted * - * Used for SOCK_SEQPACKET messages and SOCK_STREAM data. + * Used for SOCK_STREAM data. * - * Returns the number of bytes sent on success, or errno otherwise + * Returns the number of bytes sent on success (or partial success), + * or errno if no data sent */ -static int tipc_send_packet(struct kiocb *iocb, struct socket *sock, - struct msghdr *m, size_t total_len) +static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, + struct msghdr *m, size_t dsz) { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_msg *mhdr = &tsk->phdr; + struct sk_buff *buf; DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); - int res = -EINVAL; + u32 ref = tsk->ref; + int rc = -EINVAL; long timeo; + u32 dnode; + uint mtu, send, sent = 0; /* Handle implied connection establishment */ - if (unlikely(dest)) - return tipc_sendmsg(iocb, sock, m, total_len); - - if (total_len > TIPC_MAX_USER_MSG_SIZE) + if (unlikely(dest)) { + rc = tipc_sendmsg(iocb, sock, m, dsz); + if (dsz && (dsz == rc)) + tsk->sent_unacked = 1; + return rc; + } + if (dsz > (uint)INT_MAX) return -EMSGSIZE; if (iocb) @@ -747,148 +1074,88 @@ static int tipc_send_packet(struct kiocb *iocb, struct socket *sock, if (unlikely(sock->state != SS_CONNECTED)) { if (sock->state == SS_DISCONNECTING) - res = -EPIPE; + rc = -EPIPE; else - res = -ENOTCONN; + rc = -ENOTCONN; goto exit; } timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); + dnode = tsk_peer_node(tsk); + +next: + mtu = tsk->max_pkt; + send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); + rc = tipc_msg_build(mhdr, m->msg_iov, sent, send, mtu, &buf); + if (unlikely(rc < 0)) + goto exit; do { - res = tipc_send(&tsk->port, m->msg_iov, total_len); - if (likely(res != -ELINKCONG)) - break; - res = tipc_wait_for_sndpkt(sock, &timeo); - if (res) - break; - } while (1); + if (likely(!tsk_conn_cong(tsk))) { + rc = tipc_link_xmit(buf, dnode, ref); + if (likely(!rc)) { + tsk->sent_unacked++; + sent += send; + if (sent == 
dsz) + break; + goto next; + } + if (rc == -EMSGSIZE) { + tsk->max_pkt = tipc_node_get_mtu(dnode, ref); + goto next; + } + if (rc != -ELINKCONG) + break; + tsk->link_cong = 1; + } + rc = tipc_wait_for_sndpkt(sock, &timeo); + if (rc) + kfree_skb_list(buf); + } while (!rc); exit: if (iocb) release_sock(sk); - return res; + return sent ? sent : rc; } /** - * tipc_send_stream - send stream-oriented data - * @iocb: (unused) + * tipc_send_packet - send a connection-oriented message + * @iocb: if NULL, indicates that socket lock is already held * @sock: socket structure - * @m: data to send - * @total_len: total length of data to be sent + * @m: message to send + * @dsz: length of data to be transmitted * - * Used for SOCK_STREAM data. + * Used for SOCK_SEQPACKET messages. * - * Returns the number of bytes sent on success (or partial success), - * or errno if no data sent + * Returns the number of bytes sent on success, or errno otherwise */ -static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, - struct msghdr *m, size_t total_len) +static int tipc_send_packet(struct kiocb *iocb, struct socket *sock, + struct msghdr *m, size_t dsz) { - struct sock *sk = sock->sk; - struct tipc_sock *tsk = tipc_sk(sk); - struct msghdr my_msg; - struct iovec my_iov; - struct iovec *curr_iov; - int curr_iovlen; - char __user *curr_start; - u32 hdr_size; - int curr_left; - int bytes_to_send; - int bytes_sent; - int res; - - lock_sock(sk); - - /* Handle special cases where there is no connection */ - if (unlikely(sock->state != SS_CONNECTED)) { - if (sock->state == SS_UNCONNECTED) - res = tipc_send_packet(NULL, sock, m, total_len); - else - res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN; - goto exit; - } - - if (unlikely(m->msg_name)) { - res = -EISCONN; - goto exit; - } - - if (total_len > (unsigned int)INT_MAX) { - res = -EMSGSIZE; - goto exit; - } - - /* - * Send each iovec entry using one or more messages - * - * Note: This algorithm is good for the most likely case - * (i.e. one large iovec entry), but could be improved to pass sets - * of small iovec entries into send_packet(). 
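The deleted loop that follows shows the old strategy: walk the iovec and push at most (max_pkt - hdr_size) user bytes per call, reporting partial success if a later chunk fails; the new code delegates this to tipc_msg_build() instead. A compact userspace sketch of that chunking logic (send_chunk() and all sizes are illustrative assumptions):

#include <stddef.h>
#include <stdio.h>

static int send_chunk(const char *p, size_t n)
{
        (void)p;
        return (int)n;  /* pretend the chunk was sent */
}

static int send_all(const char *data, size_t len, size_t max_pkt,
                    size_t hdr_size)
{
        size_t sent = 0;

        while (sent < len) {
                size_t chunk = max_pkt - hdr_size;

                if (chunk > len - sent)
                        chunk = len - sent;
                if (send_chunk(data + sent, chunk) < 0)
                        return sent ? (int)sent : -1;   /* partial success */
                sent += chunk;
        }
        return (int)sent;
}

int main(void)
{
        static char buf[10000];

        printf("sent %d bytes\n", send_all(buf, sizeof(buf), 1500, 40));
        return 0;
}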
- */ - curr_iov = m->msg_iov; - curr_iovlen = m->msg_iovlen; - my_msg.msg_iov = &my_iov; - my_msg.msg_iovlen = 1; - my_msg.msg_flags = m->msg_flags; - my_msg.msg_name = NULL; - bytes_sent = 0; - - hdr_size = msg_hdr_sz(&tsk->port.phdr); - - while (curr_iovlen--) { - curr_start = curr_iov->iov_base; - curr_left = curr_iov->iov_len; - - while (curr_left) { - bytes_to_send = tsk->port.max_pkt - hdr_size; - if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE) - bytes_to_send = TIPC_MAX_USER_MSG_SIZE; - if (curr_left < bytes_to_send) - bytes_to_send = curr_left; - my_iov.iov_base = curr_start; - my_iov.iov_len = bytes_to_send; - res = tipc_send_packet(NULL, sock, &my_msg, - bytes_to_send); - if (res < 0) { - if (bytes_sent) - res = bytes_sent; - goto exit; - } - curr_left -= bytes_to_send; - curr_start += bytes_to_send; - bytes_sent += bytes_to_send; - } + if (dsz > TIPC_MAX_USER_MSG_SIZE) + return -EMSGSIZE; - curr_iov++; - } - res = bytes_sent; -exit: - release_sock(sk); - return res; + return tipc_send_stream(iocb, sock, m, dsz); } -/** - * auto_connect - complete connection setup to a remote port - * @tsk: tipc socket structure - * @msg: peer's response message - * - * Returns 0 on success, errno otherwise +/* tipc_sk_finish_conn - complete the setup of a connection */ -static int auto_connect(struct tipc_sock *tsk, struct tipc_msg *msg) +static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port, + u32 peer_node) { - struct tipc_port *port = &tsk->port; - struct socket *sock = tsk->sk.sk_socket; - struct tipc_portid peer; - - peer.ref = msg_origport(msg); - peer.node = msg_orignode(msg); - - __tipc_port_connect(port->ref, port, &peer); - - if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE) - return -EINVAL; - msg_set_importance(&port->phdr, (u32)msg_importance(msg)); - sock->state = SS_CONNECTED; - return 0; + struct tipc_msg *msg = &tsk->phdr; + + msg_set_destnode(msg, peer_node); + msg_set_destport(msg, peer_port); + msg_set_type(msg, TIPC_CONN_MSG); + msg_set_lookup_scope(msg, 0); + msg_set_hdr_sz(msg, SHORT_H_SIZE); + + tsk->probing_interval = CONN_PROBING_INTERVAL; + tsk->probing_state = TIPC_CONN_OK; + tsk->connected = 1; + k_start_timer(&tsk->timer, tsk->probing_interval); + tipc_node_add_conn(peer_node, tsk->ref, peer_port); + tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->ref); } /** @@ -915,17 +1182,17 @@ static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg) } /** - * anc_data_recv - optionally capture ancillary data for received message + * tipc_sk_anc_data_recv - optionally capture ancillary data for received message * @m: descriptor for message info * @msg: received message header - * @tport: TIPC port associated with message + * @tsk: TIPC port associated with message * * Note: Ancillary data is not captured if not requested by receiver. 
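For context, the ancillary data assembled here is what a userspace receiver sees in its control buffer. A hedged sketch of the receive side (a fragment only; SOL_TIPC and TIPC_DESTNAME are the cmsg level/type from linux/tipc.h, and error handling is trimmed for brevity):

#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <linux/tipc.h>

static void recv_with_anc(int sd)
{
        char data[1024], cbuf[CMSG_SPACE(12)];  /* 3 x u32 for TIPC_DESTNAME */
        struct iovec iov = { data, sizeof(data) };
        struct msghdr m;
        struct cmsghdr *c;

        memset(&m, 0, sizeof(m));
        m.msg_iov = &iov;
        m.msg_iovlen = 1;
        m.msg_control = cbuf;
        m.msg_controllen = sizeof(cbuf);

        if (recvmsg(sd, &m, 0) <= 0)
                return;
        for (c = CMSG_FIRSTHDR(&m); c; c = CMSG_NXTHDR(&m, c)) {
                if (c->cmsg_level == SOL_TIPC && c->cmsg_type == TIPC_DESTNAME) {
                        unsigned int *w = (unsigned int *)CMSG_DATA(c);

                        printf("sent to {%u,%u,%u}\n", w[0], w[1], w[2]);
                }
        }
}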
* * Returns 0 if successful, otherwise errno */ -static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg, - struct tipc_port *tport) +static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg, + struct tipc_sock *tsk) { u32 anc_data[3]; u32 err; @@ -968,10 +1235,10 @@ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg, anc_data[2] = msg_nameupper(msg); break; case TIPC_CONN_MSG: - has_name = (tport->conn_type != 0); - anc_data[0] = tport->conn_type; - anc_data[1] = tport->conn_instance; - anc_data[2] = tport->conn_instance; + has_name = (tsk->conn_type != 0); + anc_data[0] = tsk->conn_type; + anc_data[1] = tsk->conn_instance; + anc_data[2] = tsk->conn_instance; break; default: has_name = 0; @@ -985,6 +1252,24 @@ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg, return 0; } +static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) +{ + struct sk_buff *buf = NULL; + struct tipc_msg *msg; + u32 peer_port = tsk_peer_port(tsk); + u32 dnode = tsk_peer_node(tsk); + + if (!tsk->connected) + return; + buf = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode, + tipc_own_addr, peer_port, tsk->ref, TIPC_OK); + if (!buf) + return; + msg = buf_msg(buf); + msg_set_msgcnt(msg, ack); + tipc_link_xmit(buf, dnode, msg_link_selector(msg)); +} + static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) { struct sock *sk = sock->sk; @@ -1035,7 +1320,6 @@ static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; struct sk_buff *buf; struct tipc_msg *msg; long timeo; @@ -1070,7 +1354,7 @@ restart: /* Discard an empty non-errored message & try again */ if ((!sz) && (!err)) { - advance_rx_queue(sk); + tsk_advance_rx_queue(sk); goto restart; } @@ -1078,7 +1362,7 @@ restart: set_orig_addr(m, msg); /* Capture ancillary data (optional) */ - res = anc_data_recv(m, msg, port); + res = tipc_sk_anc_data_recv(m, msg, tsk); if (res) goto exit; @@ -1104,9 +1388,11 @@ restart: /* Consume received message (optional) */ if (likely(!(flags & MSG_PEEK))) { if ((sock->state != SS_READY) && - (++port->conn_unacked >= TIPC_CONNACK_INTV)) - tipc_acknowledge(port->ref, port->conn_unacked); - advance_rx_queue(sk); + (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) { + tipc_sk_send_ack(tsk, tsk->rcv_unacked); + tsk->rcv_unacked = 0; + } + tsk_advance_rx_queue(sk); } exit: release_sock(sk); @@ -1130,7 +1416,6 @@ static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; struct sk_buff *buf; struct tipc_msg *msg; long timeo; @@ -1168,14 +1453,14 @@ restart: /* Discard an empty non-errored message & try again */ if ((!sz) && (!err)) { - advance_rx_queue(sk); + tsk_advance_rx_queue(sk); goto restart; } /* Optionally capture sender's address & ancillary data of first msg */ if (sz_copied == 0) { set_orig_addr(m, msg); - res = anc_data_recv(m, msg, port); + res = tipc_sk_anc_data_recv(m, msg, tsk); if (res) goto exit; } @@ -1213,9 +1498,11 @@ restart: /* Consume received message (optional) */ if (likely(!(flags & MSG_PEEK))) { - if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV)) - tipc_acknowledge(port->ref, port->conn_unacked); - advance_rx_queue(sk); + if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) { + tipc_sk_send_ack(tsk, tsk->rcv_unacked); + tsk->rcv_unacked = 0; + } + tsk_advance_rx_queue(sk); } /* Loop around if more data is 
required */ @@ -1269,18 +1556,14 @@ static void tipc_data_ready(struct sock *sk) * @tsk: TIPC socket * @msg: message * - * Returns TIPC error status code and socket error status code - * once it encounters some errors + * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise */ -static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) +static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) { struct sock *sk = &tsk->sk; - struct tipc_port *port = &tsk->port; struct socket *sock = sk->sk_socket; struct tipc_msg *msg = buf_msg(*buf); - - u32 retval = TIPC_ERR_NO_PORT; - int res; + int retval = -TIPC_ERR_NO_PORT; if (msg_mcast(msg)) return retval; @@ -1288,16 +1571,23 @@ switch ((int)sock->state) { case SS_CONNECTED: /* Accept only connection-based messages sent by peer */ - if (msg_connected(msg) && tipc_port_peer_msg(port, msg)) { + if (tsk_peer_msg(tsk, msg)) { if (unlikely(msg_errcode(msg))) { sock->state = SS_DISCONNECTING; - __tipc_port_disconnect(port); + tsk->connected = 0; + /* let timer expire on its own */ + tipc_node_remove_conn(tsk_peer_node(tsk), + tsk->ref); } retval = TIPC_OK; } break; case SS_CONNECTING: /* Accept only ACK or NACK message */ + + if (unlikely(!msg_connected(msg))) + break; + if (unlikely(msg_errcode(msg))) { sock->state = SS_DISCONNECTING; sk->sk_err = ECONNREFUSED; @@ -1305,17 +1595,17 @@ break; } - if (unlikely(!msg_connected(msg))) - break; - - res = auto_connect(tsk, msg); - if (res) { + if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) { sock->state = SS_DISCONNECTING; - sk->sk_err = -res; + sk->sk_err = EINVAL; retval = TIPC_OK; break; } + tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg)); + msg_set_importance(&tsk->phdr, msg_importance(msg)); + sock->state = SS_CONNECTED; + /* If an incoming message is an 'ACK-', it should be * discarded here because it doesn't contain useful * data. In addition, we should try to wake up @@ -1382,32 +1672,44 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) * * Called with socket lock already taken; port lock may also be taken. 
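The reworked receive path replaces the old unsigned TIPC error codes with a small signed convention: 0 means the buffer was consumed, a negative value is the negated TIPC error to bounce back via tipc_msg_reverse(), and TIPC_FWD_MSG (1) asks the caller to transmit the possibly rewritten buffer onward. A caller-side sketch of that contract, with stub helpers standing in for the real reject/forward paths (all names and values illustrative):

#include <stdio.h>

static void forward(void *buf, unsigned int dnode)
{
        (void)buf;
        printf("forward buffer to node %u\n", dnode); /* e.g. CONN_PROBE_REPLY */
}

static void reject(void *buf, int err)
{
        (void)buf;
        printf("reverse buffer with TIPC error %d\n", err);
}

static void dispatch(int rc, void *buf, unsigned int dnode)
{
        if (rc == 0)            /* TIPC_OK: consumed or queued */
                return;
        if (rc == 1)            /* TIPC_FWD_MSG */
                forward(buf, dnode);
        else                    /* -rc is a TIPC error code */
                reject(buf, -rc);
}

int main(void)
{
        dispatch(0, NULL, 0);
        dispatch(1, NULL, 4711);
        dispatch(-6, NULL, 0);  /* error value assumed for illustration */
        return 0;
}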
* - * Returns TIPC error status code (TIPC_OK if message is not to be rejected) + * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message + * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded */ -static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) +static int filter_rcv(struct sock *sk, struct sk_buff *buf) { struct socket *sock = sk->sk_socket; struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *msg = buf_msg(buf); unsigned int limit = rcvbuf_limit(sk, buf); - u32 res = TIPC_OK; + u32 onode; + int rc = TIPC_OK; + + if (unlikely(msg_user(msg) == CONN_MANAGER)) + return tipc_sk_proto_rcv(tsk, &onode, buf); + + if (unlikely(msg_user(msg) == SOCK_WAKEUP)) { + kfree_skb(buf); + tsk->link_cong = 0; + sk->sk_write_space(sk); + return TIPC_OK; + } /* Reject message if it is wrong sort of message for socket */ if (msg_type(msg) > TIPC_DIRECT_MSG) - return TIPC_ERR_NO_PORT; + return -TIPC_ERR_NO_PORT; if (sock->state == SS_READY) { if (msg_connected(msg)) - return TIPC_ERR_NO_PORT; + return -TIPC_ERR_NO_PORT; } else { - res = filter_connect(tsk, &buf); - if (res != TIPC_OK || buf == NULL) - return res; + rc = filter_connect(tsk, &buf); + if (rc != TIPC_OK || buf == NULL) + return rc; } /* Reject message if there isn't room to queue it */ if (sk_rmem_alloc_get(sk) + buf->truesize >= limit) - return TIPC_ERR_OVERLOAD; + return -TIPC_ERR_OVERLOAD; /* Enqueue message */ TIPC_SKB_CB(buf)->handle = NULL; @@ -1429,16 +1731,23 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) */ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) { - u32 res; + int rc; + u32 onode; struct tipc_sock *tsk = tipc_sk(sk); uint truesize = buf->truesize; - res = filter_rcv(sk, buf); - if (unlikely(res)) - tipc_reject_msg(buf, res); + rc = filter_rcv(sk, buf); + + if (likely(!rc)) { + if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) + atomic_add(truesize, &tsk->dupl_rcvcnt); + return 0; + } + + if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc)) + return 0; - if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) - atomic_add(truesize, &tsk->dupl_rcvcnt); + tipc_link_xmit(buf, onode, 0); return 0; } @@ -1452,49 +1761,42 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) int tipc_sk_rcv(struct sk_buff *buf) { struct tipc_sock *tsk; - struct tipc_port *port; struct sock *sk; u32 dport = msg_destport(buf_msg(buf)); - int err = TIPC_OK; + int rc = TIPC_OK; uint limit; + u32 dnode; - /* Forward unresolved named message */ - if (unlikely(!dport)) { - tipc_net_route_msg(buf); - return 0; - } - - /* Validate destination */ - port = tipc_port_lock(dport); - if (unlikely(!port)) { - err = TIPC_ERR_NO_PORT; + /* Validate destination and message */ + tsk = tipc_sk_get(dport); + if (unlikely(!tsk)) { + rc = tipc_msg_eval(buf, &dnode); goto exit; } - - tsk = tipc_port_to_sock(port); sk = &tsk->sk; /* Queue message */ bh_lock_sock(sk); if (!sock_owned_by_user(sk)) { - err = filter_rcv(sk, buf); + rc = filter_rcv(sk, buf); } else { if (sk->sk_backlog.len == 0) atomic_set(&tsk->dupl_rcvcnt, 0); limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt); if (sk_add_backlog(sk, buf, limit)) - err = TIPC_ERR_OVERLOAD; + rc = -TIPC_ERR_OVERLOAD; } - bh_unlock_sock(sk); - tipc_port_unlock(port); - - if (likely(!err)) + tipc_sk_put(tsk); + if (likely(!rc)) return 0; exit: - tipc_reject_msg(buf, err); - return -EHOSTUNREACH; + if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc)) + return -EHOSTUNREACH; + + tipc_link_xmit(buf, 
dnode, 0); + return (rc < 0) ? -EHOSTUNREACH : 0; } static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) @@ -1673,10 +1975,8 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags) { struct sock *new_sk, *sk = sock->sk; struct sk_buff *buf; - struct tipc_port *new_port; + struct tipc_sock *new_tsock; struct tipc_msg *msg; - struct tipc_portid peer; - u32 new_ref; long timeo; int res; @@ -1698,8 +1998,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags) goto exit; new_sk = new_sock->sk; - new_port = &tipc_sk(new_sk)->port; - new_ref = new_port->ref; + new_tsock = tipc_sk(new_sk); msg = buf_msg(buf); /* we lock on new_sk; but lockdep sees the lock on sk */ @@ -1709,18 +2008,16 @@ * Reject any stray messages received by new socket * before the socket lock was taken (very, very unlikely) */ - reject_rx_queue(new_sk); + tsk_rej_rx_queue(new_sk); /* Connect new socket to its peer */ - peer.ref = msg_origport(msg); - peer.node = msg_orignode(msg); - tipc_port_connect(new_ref, &peer); + tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg)); new_sock->state = SS_CONNECTED; - tipc_port_set_importance(new_port, msg_importance(msg)); + tsk_set_importance(new_tsock, msg_importance(msg)); if (msg_named(msg)) { - new_port->conn_type = msg_nametype(msg); - new_port->conn_instance = msg_nameinst(msg); + new_tsock->conn_type = msg_nametype(msg); + new_tsock->conn_instance = msg_nameinst(msg); } /* @@ -1730,7 +2027,7 @@ if (!msg_data_sz(msg)) { struct msghdr m = {NULL,}; - advance_rx_queue(sk); + tsk_advance_rx_queue(sk); tipc_send_packet(NULL, new_sock, &m, 0); } else { __skb_dequeue(&sk->sk_receive_queue); @@ -1756,8 +2053,8 @@ static int tipc_shutdown(struct socket *sock, int how) { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; struct sk_buff *buf; + u32 dnode; int res; if (how != SHUT_RDWR) @@ -1777,14 +2074,21 @@ restart: kfree_skb(buf); goto restart; } - tipc_port_disconnect(port->ref); - tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN); + if (tipc_msg_reverse(buf, &dnode, TIPC_CONN_SHUTDOWN)) + tipc_link_xmit(buf, dnode, tsk->ref); + tipc_node_remove_conn(dnode, tsk->ref); } else { - tipc_port_shutdown(port->ref); + dnode = tsk_peer_node(tsk); + buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, + TIPC_CONN_MSG, SHORT_H_SIZE, + 0, dnode, tipc_own_addr, + tsk_peer_port(tsk), + tsk->ref, TIPC_CONN_SHUTDOWN); + tipc_link_xmit(buf, dnode, tsk->ref); } - + tsk->connected = 0; sock->state = SS_DISCONNECTING; - + tipc_node_remove_conn(dnode, tsk->ref); /* fall through */ case SS_DISCONNECTING: @@ -1805,6 +2109,432 @@ restart: return res; } +static void tipc_sk_timeout(unsigned long ref) +{ + struct tipc_sock *tsk; + struct sock *sk; + struct sk_buff *buf = NULL; + u32 peer_port, peer_node; + + tsk = tipc_sk_get(ref); + if (!tsk) + return; + + sk = &tsk->sk; + bh_lock_sock(sk); + if (!tsk->connected) { + bh_unlock_sock(sk); + goto exit; + } + peer_port = tsk_peer_port(tsk); + peer_node = tsk_peer_node(tsk); + + if (tsk->probing_state == TIPC_CONN_PROBING) { + /* Previous probe not answered -> self abort */ + buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, + SHORT_H_SIZE, 0, tipc_own_addr, + peer_node, ref, peer_port, + TIPC_ERR_NO_PORT); + } else { + buf = tipc_msg_create(CONN_MANAGER, CONN_PROBE, 
INT_H_SIZE, + 0, peer_node, tipc_own_addr, + peer_port, ref, TIPC_OK); + tsk->probing_state = TIPC_CONN_PROBING; + k_start_timer(&tsk->timer, tsk->probing_interval); + } + bh_unlock_sock(sk); + if (buf) + tipc_link_xmit(buf, peer_node, ref); +exit: + tipc_sk_put(tsk); +} + +static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, + struct tipc_name_seq const *seq) +{ + struct publication *publ; + u32 key; + + if (tsk->connected) + return -EINVAL; + key = tsk->ref + tsk->pub_count + 1; + if (key == tsk->ref) + return -EADDRINUSE; + + publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, + scope, tsk->ref, key); + if (unlikely(!publ)) + return -EINVAL; + + list_add(&publ->pport_list, &tsk->publications); + tsk->pub_count++; + tsk->published = 1; + return 0; +} + +static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, + struct tipc_name_seq const *seq) +{ + struct publication *publ; + struct publication *safe; + int rc = -EINVAL; + + list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) { + if (seq) { + if (publ->scope != scope) + continue; + if (publ->type != seq->type) + continue; + if (publ->lower != seq->lower) + continue; + if (publ->upper != seq->upper) + break; + tipc_nametbl_withdraw(publ->type, publ->lower, + publ->ref, publ->key); + rc = 0; + break; + } + tipc_nametbl_withdraw(publ->type, publ->lower, + publ->ref, publ->key); + rc = 0; + } + if (list_empty(&tsk->publications)) + tsk->published = 0; + return rc; +} + +static int tipc_sk_show(struct tipc_sock *tsk, char *buf, + int len, int full_id) +{ + struct publication *publ; + int ret; + + if (full_id) + ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:", + tipc_zone(tipc_own_addr), + tipc_cluster(tipc_own_addr), + tipc_node(tipc_own_addr), tsk->ref); + else + ret = tipc_snprintf(buf, len, "%-10u:", tsk->ref); + + if (tsk->connected) { + u32 dport = tsk_peer_port(tsk); + u32 destnode = tsk_peer_node(tsk); + + ret += tipc_snprintf(buf + ret, len - ret, + " connected to <%u.%u.%u:%u>", + tipc_zone(destnode), + tipc_cluster(destnode), + tipc_node(destnode), dport); + if (tsk->conn_type != 0) + ret += tipc_snprintf(buf + ret, len - ret, + " via {%u,%u}", tsk->conn_type, + tsk->conn_instance); + } else if (tsk->published) { + ret += tipc_snprintf(buf + ret, len - ret, " bound to"); + list_for_each_entry(publ, &tsk->publications, pport_list) { + if (publ->lower == publ->upper) + ret += tipc_snprintf(buf + ret, len - ret, + " {%u,%u}", publ->type, + publ->lower); + else + ret += tipc_snprintf(buf + ret, len - ret, + " {%u,%u,%u}", publ->type, + publ->lower, publ->upper); + } + } + ret += tipc_snprintf(buf + ret, len - ret, "\n"); + return ret; +} + +struct sk_buff *tipc_sk_socks_show(void) +{ + struct sk_buff *buf; + struct tlv_desc *rep_tlv; + char *pb; + int pb_len; + struct tipc_sock *tsk; + int str_len = 0; + u32 ref = 0; + + buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN)); + if (!buf) + return NULL; + rep_tlv = (struct tlv_desc *)buf->data; + pb = TLV_DATA(rep_tlv); + pb_len = ULTRA_STRING_MAX_LEN; + + tsk = tipc_sk_get_next(&ref); + for (; tsk; tsk = tipc_sk_get_next(&ref)) { + lock_sock(&tsk->sk); + str_len += tipc_sk_show(tsk, pb + str_len, + pb_len - str_len, 0); + release_sock(&tsk->sk); + tipc_sk_put(tsk); + } + str_len += 1; /* for "\0" */ + skb_put(buf, TLV_SPACE(str_len)); + TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); + + return buf; +} + +/* tipc_sk_reinit: set non-zero address in all existing sockets + * when we go from standalone to network mode. 
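tipc_sk_timeout() above implements a two-state probing scheme: on each timer tick the socket either sends a CONN_PROBE and enters TIPC_CONN_PROBING, or, if it is still in TIPC_CONN_PROBING because no CONN_PROBE_REPLY arrived within the interval, it aborts the connection. A hedged standalone sketch of that state machine (names mirror the patch, but the code is illustrative, not the kernel implementation):

#include <stdio.h>

enum probe_state { CONN_OK, CONN_PROBING };

static enum probe_state on_timer(enum probe_state s)
{
        if (s == CONN_PROBING) {
                printf("probe unanswered for a full interval -> abort connection\n");
                return CONN_OK; /* connection torn down */
        }
        printf("send CONN_PROBE; a reply resets state to CONN_OK\n");
        return CONN_PROBING;
}

int main(void)
{
        enum probe_state s = CONN_OK;

        s = on_timer(s);        /* first tick: probe goes out */
        s = on_timer(s);        /* no reply arrived: abort */
        (void)s;
        return 0;
}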
+ */ +void tipc_sk_reinit(void) +{ + struct tipc_msg *msg; + u32 ref = 0; + struct tipc_sock *tsk = tipc_sk_get_next(&ref); + + for (; tsk; tsk = tipc_sk_get_next(&ref)) { + lock_sock(&tsk->sk); + msg = &tsk->phdr; + msg_set_prevnode(msg, tipc_own_addr); + msg_set_orignode(msg, tipc_own_addr); + release_sock(&tsk->sk); + tipc_sk_put(tsk); + } +} + +/** + * struct reference - TIPC socket reference entry + * @tsk: pointer to socket associated with reference entry + * @ref: reference value for socket (combines instance & array index info) + */ +struct reference { + struct tipc_sock *tsk; + u32 ref; +}; + +/** + * struct tipc_ref_table - table of TIPC socket reference entries + * @entries: pointer to array of reference entries + * @capacity: array index of first unusable entry + * @init_point: array index of first uninitialized entry + * @first_free: array index of first unused socket reference entry + * @last_free: array index of last unused socket reference entry + * @index_mask: bitmask for array index portion of reference values + * @start_mask: initial value for instance value portion of reference values + */ +struct ref_table { + struct reference *entries; + u32 capacity; + u32 init_point; + u32 first_free; + u32 last_free; + u32 index_mask; + u32 start_mask; +}; + +/* Socket reference table consists of 2**N entries. + * + * State Socket ptr Reference + * ----- ---------- --------- + * In use non-NULL XXXX|own index + * (XXXX changes each time entry is acquired) + * Free NULL YYYY|next free index + * (YYYY is one more than last used XXXX) + * Uninitialized NULL 0 + * + * Entry 0 is not used; this allows index 0 to denote the end of the free list. + * + * Note that a reference value of 0 does not necessarily indicate that an + * entry is uninitialized, since the last entry in the free list could also + * have a reference value of 0 (although this is unlikely). + */ + +static struct ref_table tipc_ref_table; + +static DEFINE_RWLOCK(ref_table_lock); + +/** + * tipc_sk_ref_table_init - create reference table for sockets + */ +int tipc_sk_ref_table_init(u32 req_sz, u32 start) +{ + struct reference *table; + u32 actual_sz; + + /* account for unused entry, then round up size to a power of 2 */ + + req_sz++; + for (actual_sz = 16; actual_sz < req_sz; actual_sz <<= 1) { + /* do nothing */ + } + + /* allocate table & mark all entries as uninitialized */ + table = vzalloc(actual_sz * sizeof(struct reference)); + if (table == NULL) + return -ENOMEM; + + tipc_ref_table.entries = table; + tipc_ref_table.capacity = req_sz; + tipc_ref_table.init_point = 1; + tipc_ref_table.first_free = 0; + tipc_ref_table.last_free = 0; + tipc_ref_table.index_mask = actual_sz - 1; + tipc_ref_table.start_mask = start & ~tipc_ref_table.index_mask; + + return 0; +} + +/** + * tipc_sk_ref_table_stop - destroy reference table for sockets + */ +void tipc_sk_ref_table_stop(void) +{ + if (!tipc_ref_table.entries) + return; + vfree(tipc_ref_table.entries); + tipc_ref_table.entries = NULL; +} + +/* tipc_sk_ref_acquire - create reference to a socket + * + * Register a socket pointer in the reference table. + * Returns a unique reference value that is used from then on to retrieve the + * socket pointer, or to determine if the socket has been deregistered. + */ +u32 tipc_sk_ref_acquire(struct tipc_sock *tsk) +{ + u32 index; + u32 index_mask; + u32 next_plus_upper; + u32 ref = 0; + struct reference *entry; + + if (unlikely(!tsk)) { + pr_err("Attempt to acquire ref. 
to non-existent obj\n"); + return 0; + } + if (unlikely(!tipc_ref_table.entries)) { + pr_err("Ref. table not found in acquisition attempt\n"); + return 0; + } + + /* Take a free entry, if available; otherwise initialize a new one */ + write_lock_bh(&ref_table_lock); + index = tipc_ref_table.first_free; + entry = &tipc_ref_table.entries[index]; + + if (likely(index)) { + index = tipc_ref_table.first_free; + entry = &tipc_ref_table.entries[index]; + index_mask = tipc_ref_table.index_mask; + next_plus_upper = entry->ref; + tipc_ref_table.first_free = next_plus_upper & index_mask; + ref = (next_plus_upper & ~index_mask) + index; + entry->tsk = tsk; + } else if (tipc_ref_table.init_point < tipc_ref_table.capacity) { + index = tipc_ref_table.init_point++; + entry = &tipc_ref_table.entries[index]; + ref = tipc_ref_table.start_mask + index; + } + + if (ref) { + entry->ref = ref; + entry->tsk = tsk; + } + write_unlock_bh(&ref_table_lock); + return ref; +} + +/* tipc_sk_ref_discard - invalidate reference to a socket + * + * Disallow future references to a socket and free up the entry for re-use. + */ +void tipc_sk_ref_discard(u32 ref) +{ + struct reference *entry; + u32 index; + u32 index_mask; + + if (unlikely(!tipc_ref_table.entries)) { + pr_err("Ref. table not found during discard attempt\n"); + return; + } + + index_mask = tipc_ref_table.index_mask; + index = ref & index_mask; + entry = &tipc_ref_table.entries[index]; + + write_lock_bh(&ref_table_lock); + + if (unlikely(!entry->tsk)) { + pr_err("Attempt to discard ref. to non-existent socket\n"); + goto exit; + } + if (unlikely(entry->ref != ref)) { + pr_err("Attempt to discard non-existent reference\n"); + goto exit; + } + + /* Mark entry as unused; increment instance part of entry's + * reference to invalidate any subsequent references + */ + + entry->tsk = NULL; + entry->ref = (ref & ~index_mask) + (index_mask + 1); + + /* Append entry to free entry list */ + if (unlikely(tipc_ref_table.first_free == 0)) + tipc_ref_table.first_free = index; + else + tipc_ref_table.entries[tipc_ref_table.last_free].ref |= index; + tipc_ref_table.last_free = index; +exit: + write_unlock_bh(&ref_table_lock); +} + +/* tipc_sk_get - find referenced socket and return pointer to it + */ +struct tipc_sock *tipc_sk_get(u32 ref) +{ + struct reference *entry; + struct tipc_sock *tsk; + + if (unlikely(!tipc_ref_table.entries)) + return NULL; + read_lock_bh(&ref_table_lock); + entry = &tipc_ref_table.entries[ref & tipc_ref_table.index_mask]; + tsk = entry->tsk; + if (likely(tsk && (entry->ref == ref))) + sock_hold(&tsk->sk); + else + tsk = NULL; + read_unlock_bh(&ref_table_lock); + return tsk; +} + +/* tipc_sk_get_next - lock & return next socket after referenced one +*/ +struct tipc_sock *tipc_sk_get_next(u32 *ref) +{ + struct reference *entry; + struct tipc_sock *tsk = NULL; + uint index = *ref & tipc_ref_table.index_mask; + + read_lock_bh(&ref_table_lock); + while (++index < tipc_ref_table.capacity) { + entry = &tipc_ref_table.entries[index]; + if (!entry->tsk) + continue; + tsk = entry->tsk; + sock_hold(&tsk->sk); + *ref = entry->ref; + break; + } + read_unlock_bh(&ref_table_lock); + return tsk; +} + +static void tipc_sk_put(struct tipc_sock *tsk) +{ + sock_put(&tsk->sk); +} + /** * tipc_setsockopt - set socket option * @sock: socket structure @@ -1823,7 +2553,6 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; u32 value; int res; @@ -1841,16 
+2570,16 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, switch (opt) { case TIPC_IMPORTANCE: - tipc_port_set_importance(port, value); + res = tsk_set_importance(tsk, value); break; case TIPC_SRC_DROPPABLE: if (sock->type != SOCK_STREAM) - tipc_port_set_unreliable(port, value); + tsk_set_unreliable(tsk, value); else res = -ENOPROTOOPT; break; case TIPC_DEST_DROPPABLE: - tipc_port_set_unreturnable(port, value); + tsk_set_unreturnable(tsk, value); break; case TIPC_CONN_TIMEOUT: tipc_sk(sk)->conn_timeout = value; @@ -1883,7 +2612,6 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; int len; u32 value; int res; @@ -1900,16 +2628,16 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, switch (opt) { case TIPC_IMPORTANCE: - value = tipc_port_importance(port); + value = tsk_importance(tsk); break; case TIPC_SRC_DROPPABLE: - value = tipc_port_unreliable(port); + value = tsk_unreliable(tsk); break; case TIPC_DEST_DROPPABLE: - value = tipc_port_unreturnable(port); + value = tsk_unreturnable(tsk); break; case TIPC_CONN_TIMEOUT: - value = tipc_sk(sk)->conn_timeout; + value = tsk->conn_timeout; /* no need to set "res", since already 0 at this point */ break; case TIPC_NODE_RECVQ_DEPTH: @@ -1936,7 +2664,7 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, return put_user(sizeof(value), ol); } -int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg) +static int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg) { struct tipc_sioc_ln_req lnr; void __user *argp = (void __user *)arg; @@ -1952,7 +2680,6 @@ int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg) return 0; } return -EADDRNOTAVAIL; - break; default: return -ENOIOCTLCMD; } diff --git a/net/tipc/socket.h b/net/tipc/socket.h index 3afcd2a70b3..baa43d03901 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -35,40 +35,17 @@ #ifndef _TIPC_SOCK_H #define _TIPC_SOCK_H -#include "port.h" #include <net/sock.h> -/** - * struct tipc_sock - TIPC socket structure - * @sk: socket - interacts with 'port' and with user via the socket API - * @port: port - interacts with 'sk' and with the rest of the TIPC stack - * @peer_name: the peer of the connection, if any - * @conn_timeout: the time we can wait for an unresponded setup request - * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue - */ - -struct tipc_sock { - struct sock sk; - struct tipc_port port; - unsigned int conn_timeout; - atomic_t dupl_rcvcnt; -}; - -static inline struct tipc_sock *tipc_sk(const struct sock *sk) -{ - return container_of(sk, struct tipc_sock, sk); -} - -static inline struct tipc_sock *tipc_port_to_sock(const struct tipc_port *port) -{ - return container_of(port, struct tipc_sock, port); -} - -static inline void tipc_sock_wakeup(struct tipc_sock *tsk) -{ - tsk->sk.sk_write_space(&tsk->sk); -} - +#define TIPC_CONNACK_INTV 256 +#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) +#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \ + SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) int tipc_sk_rcv(struct sk_buff *buf); +struct sk_buff *tipc_sk_socks_show(void); +void tipc_sk_mcast_rcv(struct sk_buff *buf); +void tipc_sk_reinit(void); +int tipc_sk_ref_table_init(u32 requested_size, u32 start); +void tipc_sk_ref_table_stop(void); #endif diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 642437231ad..31b5cb232a4 100644 --- 
a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -36,7 +36,6 @@ #include "core.h" #include "name_table.h" -#include "port.h" #include "subscr.h" /** diff --git a/net/tipc/sysctl.c b/net/tipc/sysctl.c index f3fef93325a..1a779b1e851 100644 --- a/net/tipc/sysctl.c +++ b/net/tipc/sysctl.c @@ -47,6 +47,13 @@ static struct ctl_table tipc_table[] = { .mode = 0644, .proc_handler = proc_dointvec, }, + { + .procname = "named_timeout", + .data = &sysctl_tipc_named_timeout, + .maxlen = sizeof(sysctl_tipc_named_timeout), + .mode = 0644, + .proc_handler = proc_dointvec, + }, {} }; |
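Assuming the tipc table keeps registering under net/tipc as before, the new knob surfaces as /proc/sys/net/tipc/named_timeout next to the existing entries. A minimal userspace read of it (the path is an assumption inferred from the table layout above):

#include <stdio.h>

int main(void)
{
        FILE *f = fopen("/proc/sys/net/tipc/named_timeout", "r");
        int val;

        if (f && fscanf(f, "%d", &val) == 1)
                printf("named_timeout = %d\n", val);
        if (f)
                fclose(f);
        return 0;
}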