diff options
Diffstat (limited to 'net/tipc')
-rw-r--r-- | net/tipc/Makefile | 3 | ||||
-rw-r--r-- | net/tipc/bcast.c | 3 | ||||
-rw-r--r-- | net/tipc/bcast.h | 3 | ||||
-rw-r--r-- | net/tipc/config.c | 119 | ||||
-rw-r--r-- | net/tipc/core.c | 22 | ||||
-rw-r--r-- | net/tipc/core.h | 17 | ||||
-rw-r--r-- | net/tipc/discover.c | 7 | ||||
-rw-r--r-- | net/tipc/eth_media.c | 19 | ||||
-rw-r--r-- | net/tipc/ib_media.c | 25 | ||||
-rw-r--r-- | net/tipc/link.c | 88 | ||||
-rw-r--r-- | net/tipc/msg.c | 19 | ||||
-rw-r--r-- | net/tipc/msg.h | 8 | ||||
-rw-r--r-- | net/tipc/name_table.c | 10 | ||||
-rw-r--r-- | net/tipc/name_table.h | 11 | ||||
-rw-r--r-- | net/tipc/node_subscr.c | 2 | ||||
-rw-r--r-- | net/tipc/port.c | 320 | ||||
-rw-r--r-- | net/tipc/port.h | 85 | ||||
-rw-r--r-- | net/tipc/server.c | 605 | ||||
-rw-r--r-- | net/tipc/server.h | 94 | ||||
-rw-r--r-- | net/tipc/socket.c | 146 | ||||
-rw-r--r-- | net/tipc/subscr.c | 348 | ||||
-rw-r--r-- | net/tipc/subscr.h | 21 | ||||
-rw-r--r-- | net/tipc/sysctl.c | 64 |
23 files changed, 1154 insertions, 885 deletions
diff --git a/net/tipc/Makefile b/net/tipc/Makefile index 4df8e02d900..b282f7130d2 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -8,6 +8,7 @@ tipc-y += addr.o bcast.o bearer.o config.o \ core.o handler.o link.o discover.o msg.o \ name_distr.o subscr.o name_table.o net.o \ netlink.o node.o node_subscr.o port.o ref.o \ - socket.o log.o eth_media.o + socket.o log.o eth_media.o server.o tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o +tipc-$(CONFIG_SYSCTL) += sysctl.o diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index e5f3da50782..716de1ac6cb 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -578,8 +578,7 @@ u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) * Returns 0 (packet sent successfully) under all circumstances, * since the broadcast link's pseudo-bearer never blocks */ -static int tipc_bcbearer_send(struct sk_buff *buf, - struct tipc_bearer *unused1, +static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, struct tipc_media_addr *unused2) { int bp_index; diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index a93306557e0..6ee587b469f 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -75,7 +75,8 @@ void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node); /** * tipc_nmap_equal - test for equality of node maps */ -static inline int tipc_nmap_equal(struct tipc_node_map *nm_a, struct tipc_node_map *nm_b) +static inline int tipc_nmap_equal(struct tipc_node_map *nm_a, + struct tipc_node_map *nm_b) { return !memcmp(nm_a, nm_b, sizeof(*nm_a)); } diff --git a/net/tipc/config.c b/net/tipc/config.c index f67866c765d..c301a9a592d 100644 --- a/net/tipc/config.c +++ b/net/tipc/config.c @@ -2,7 +2,7 @@ * net/tipc/config.c: TIPC configuration management code * * Copyright (c) 2002-2006, Ericsson AB - * Copyright (c) 2004-2007, 2010-2012, Wind River Systems + * Copyright (c) 2004-2007, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -38,12 +38,12 @@ #include "port.h" #include "name_table.h" #include "config.h" +#include "server.h" #define REPLY_TRUNCATED "<truncated>\n" -static u32 config_port_ref; - -static DEFINE_SPINLOCK(config_lock); +static DEFINE_MUTEX(config_mutex); +static struct tipc_server cfgsrv; static const void *req_tlv_area; /* request message TLV area */ static int req_tlv_space; /* request message TLV area size */ @@ -181,18 +181,7 @@ static struct sk_buff *cfg_set_own_addr(void) if (tipc_own_addr) return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED " (cannot change node address once assigned)"); - - /* - * Must temporarily release configuration spinlock while switching into - * networking mode as it calls tipc_eth_media_start(), which may sleep. - * Releasing the lock is harmless as other locally-issued configuration - * commands won't occur until this one completes, and remotely-issued - * configuration commands can't be received until a local configuration - * command to enable the first bearer is received and processed. - */ - spin_unlock_bh(&config_lock); tipc_core_start_net(addr); - spin_lock_bh(&config_lock); return tipc_cfg_reply_none(); } @@ -248,7 +237,7 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area { struct sk_buff *rep_tlv_buf; - spin_lock_bh(&config_lock); + mutex_lock(&config_mutex); /* Save request and reply details in a well-known location */ req_tlv_area = request_area; @@ -377,37 +366,31 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area /* Return reply buffer */ exit: - spin_unlock_bh(&config_lock); + mutex_unlock(&config_mutex); return rep_tlv_buf; } -static void cfg_named_msg_event(void *userdata, - u32 port_ref, - struct sk_buff **buf, - const unchar *msg, - u32 size, - u32 importance, - struct tipc_portid const *orig, - struct tipc_name_seq const *dest) +static void cfg_conn_msg_event(int conid, struct sockaddr_tipc *addr, + void *usr_data, void *buf, size_t len) { struct tipc_cfg_msg_hdr *req_hdr; struct tipc_cfg_msg_hdr *rep_hdr; struct sk_buff *rep_buf; + int ret; /* Validate configuration message header (ignore invalid message) */ - req_hdr = (struct tipc_cfg_msg_hdr *)msg; - if ((size < sizeof(*req_hdr)) || - (size != TCM_ALIGN(ntohl(req_hdr->tcm_len))) || + req_hdr = (struct tipc_cfg_msg_hdr *)buf; + if ((len < sizeof(*req_hdr)) || + (len != TCM_ALIGN(ntohl(req_hdr->tcm_len))) || (ntohs(req_hdr->tcm_flags) != TCM_F_REQUEST)) { pr_warn("Invalid configuration message discarded\n"); return; } /* Generate reply for request (if can't, return request) */ - rep_buf = tipc_cfg_do_cmd(orig->node, - ntohs(req_hdr->tcm_type), - msg + sizeof(*req_hdr), - size - sizeof(*req_hdr), + rep_buf = tipc_cfg_do_cmd(addr->addr.id.node, ntohs(req_hdr->tcm_type), + buf + sizeof(*req_hdr), + len - sizeof(*req_hdr), BUF_HEADROOM + MAX_H_SIZE + sizeof(*rep_hdr)); if (rep_buf) { skb_push(rep_buf, sizeof(*rep_hdr)); @@ -415,57 +398,51 @@ static void cfg_named_msg_event(void *userdata, memcpy(rep_hdr, req_hdr, sizeof(*rep_hdr)); rep_hdr->tcm_len = htonl(rep_buf->len); rep_hdr->tcm_flags &= htons(~TCM_F_REQUEST); - } else { - rep_buf = *buf; - *buf = NULL; - } - /* NEED TO ADD CODE TO HANDLE FAILED SEND (SUCH AS CONGESTION) */ - tipc_send_buf2port(port_ref, orig, rep_buf, rep_buf->len); + ret = tipc_conn_sendmsg(&cfgsrv, conid, addr, rep_buf->data, + rep_buf->len); + if (ret < 0) + pr_err("Sending cfg reply message failed, no memory\n"); + + kfree_skb(rep_buf); + } } +static struct sockaddr_tipc cfgsrv_addr __read_mostly = { + .family = AF_TIPC, + .addrtype = TIPC_ADDR_NAMESEQ, + .addr.nameseq.type = TIPC_CFG_SRV, + .addr.nameseq.lower = 0, + .addr.nameseq.upper = 0, + .scope = TIPC_ZONE_SCOPE +}; + +static struct tipc_server cfgsrv __read_mostly = { + .saddr = &cfgsrv_addr, + .imp = TIPC_CRITICAL_IMPORTANCE, + .type = SOCK_RDM, + .max_rcvbuf_size = 64 * 1024, + .name = "cfg_server", + .tipc_conn_recvmsg = cfg_conn_msg_event, + .tipc_conn_new = NULL, + .tipc_conn_shutdown = NULL +}; + int tipc_cfg_init(void) { - struct tipc_name_seq seq; - int res; - - res = tipc_createport(NULL, TIPC_CRITICAL_IMPORTANCE, - NULL, NULL, NULL, - NULL, cfg_named_msg_event, NULL, - NULL, &config_port_ref); - if (res) - goto failed; - - seq.type = TIPC_CFG_SRV; - seq.lower = seq.upper = tipc_own_addr; - res = tipc_publish(config_port_ref, TIPC_ZONE_SCOPE, &seq); - if (res) - goto failed; - - return 0; - -failed: - pr_err("Unable to create configuration service\n"); - return res; + return tipc_server_start(&cfgsrv); } void tipc_cfg_reinit(void) { - struct tipc_name_seq seq; - int res; - - seq.type = TIPC_CFG_SRV; - seq.lower = seq.upper = 0; - tipc_withdraw(config_port_ref, TIPC_ZONE_SCOPE, &seq); + tipc_server_stop(&cfgsrv); - seq.lower = seq.upper = tipc_own_addr; - res = tipc_publish(config_port_ref, TIPC_ZONE_SCOPE, &seq); - if (res) - pr_err("Unable to reinitialize configuration service\n"); + cfgsrv_addr.addr.nameseq.lower = tipc_own_addr; + cfgsrv_addr.addr.nameseq.upper = tipc_own_addr; + tipc_server_start(&cfgsrv); } void tipc_cfg_stop(void) { - tipc_deleteport(config_port_ref); - config_port_ref = 0; + tipc_server_stop(&cfgsrv); } diff --git a/net/tipc/core.c b/net/tipc/core.c index 7ec2c1eb94f..fd4eeeaa972 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -2,7 +2,7 @@ * net/tipc/core.c: TIPC module code * * Copyright (c) 2003-2006, Ericsson AB - * Copyright (c) 2005-2006, 2010-2011, Wind River Systems + * Copyright (c) 2005-2006, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -39,6 +39,7 @@ #include "name_table.h" #include "subscr.h" #include "config.h" +#include "port.h" #include <linux/module.h> @@ -50,7 +51,7 @@ u32 tipc_own_addr __read_mostly; int tipc_max_ports __read_mostly; int tipc_net_id __read_mostly; int tipc_remote_management __read_mostly; - +int sysctl_tipc_rmem[3] __read_mostly; /* min/default/max */ /** * tipc_buf_acquire - creates a TIPC message buffer @@ -118,6 +119,7 @@ static void tipc_core_stop(void) tipc_nametbl_stop(); tipc_ref_table_stop(); tipc_socket_stop(); + tipc_unregister_sysctl(); } /** @@ -135,20 +137,21 @@ static int tipc_core_start(void) if (!res) res = tipc_nametbl_init(); if (!res) - res = tipc_subscr_start(); - if (!res) - res = tipc_cfg_init(); - if (!res) res = tipc_netlink_start(); if (!res) res = tipc_socket_init(); + if (!res) + res = tipc_register_sysctl(); + if (!res) + res = tipc_subscr_start(); + if (!res) + res = tipc_cfg_init(); if (res) tipc_core_stop(); return res; } - static int __init tipc_init(void) { int res; @@ -160,6 +163,11 @@ static int __init tipc_init(void) tipc_max_ports = CONFIG_TIPC_PORTS; tipc_net_id = 4711; + sysctl_tipc_rmem[0] = CONN_OVERLOAD_LIMIT >> 4 << TIPC_LOW_IMPORTANCE; + sysctl_tipc_rmem[1] = CONN_OVERLOAD_LIMIT >> 4 << + TIPC_CRITICAL_IMPORTANCE; + sysctl_tipc_rmem[2] = CONN_OVERLOAD_LIMIT; + res = tipc_core_start(); if (res) pr_err("Unable to start in single node mode\n"); diff --git a/net/tipc/core.h b/net/tipc/core.h index 0207db04179..be72f8cebc5 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -1,8 +1,8 @@ /* * net/tipc/core.h: Include file for TIPC global declarations * - * Copyright (c) 2005-2006, Ericsson AB - * Copyright (c) 2005-2007, 2010-2011, Wind River Systems + * Copyright (c) 2005-2006, 2013 Ericsson AB + * Copyright (c) 2005-2007, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -80,6 +80,7 @@ extern u32 tipc_own_addr __read_mostly; extern int tipc_max_ports __read_mostly; extern int tipc_net_id __read_mostly; extern int tipc_remote_management __read_mostly; +extern int sysctl_tipc_rmem[3] __read_mostly; /* * Other global variables @@ -96,6 +97,18 @@ extern int tipc_netlink_start(void); extern void tipc_netlink_stop(void); extern int tipc_socket_init(void); extern void tipc_socket_stop(void); +extern int tipc_sock_create_local(int type, struct socket **res); +extern void tipc_sock_release_local(struct socket *sock); +extern int tipc_sock_accept_local(struct socket *sock, + struct socket **newsock, int flags); + +#ifdef CONFIG_SYSCTL +extern int tipc_register_sysctl(void); +extern void tipc_unregister_sysctl(void); +#else +#define tipc_register_sysctl() 0 +#define tipc_unregister_sysctl() +#endif /* * TIPC timer and signal code diff --git a/net/tipc/discover.c b/net/tipc/discover.c index eedff58d038..ecc758c6eac 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -70,8 +70,7 @@ struct tipc_link_req { * @dest_domain: network domain of node(s) which should respond to message * @b_ptr: ptr to bearer issuing message */ -static struct sk_buff *tipc_disc_init_msg(u32 type, - u32 dest_domain, +static struct sk_buff *tipc_disc_init_msg(u32 type, u32 dest_domain, struct tipc_bearer *b_ptr) { struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE); @@ -346,8 +345,8 @@ exit: * * Returns 0 if successful, otherwise -errno. */ -int tipc_disc_create(struct tipc_bearer *b_ptr, - struct tipc_media_addr *dest, u32 dest_domain) +int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest, + u32 dest_domain) { struct tipc_link_req *req; diff --git a/net/tipc/eth_media.c b/net/tipc/eth_media.c index 120a676a336..40ea40cf620 100644 --- a/net/tipc/eth_media.c +++ b/net/tipc/eth_media.c @@ -62,7 +62,7 @@ static struct eth_bearer eth_bearers[MAX_ETH_BEARERS]; static int eth_started; static int recv_notification(struct notifier_block *nb, unsigned long evt, - void *dv); + void *dv); /* * Network device notifier info */ @@ -162,8 +162,7 @@ static void setup_bearer(struct work_struct *work) */ static int enable_bearer(struct tipc_bearer *tb_ptr) { - struct net_device *dev = NULL; - struct net_device *pdev = NULL; + struct net_device *dev; struct eth_bearer *eb_ptr = ð_bearers[0]; struct eth_bearer *stop = ð_bearers[MAX_ETH_BEARERS]; char *driver_name = strchr((const char *)tb_ptr->name, ':') + 1; @@ -178,15 +177,7 @@ static int enable_bearer(struct tipc_bearer *tb_ptr) } /* Find device with specified name */ - read_lock(&dev_base_lock); - for_each_netdev(&init_net, pdev) { - if (!strncmp(pdev->name, driver_name, IFNAMSIZ)) { - dev = pdev; - dev_hold(dev); - break; - } - } - read_unlock(&dev_base_lock); + dev = dev_get_by_name(&init_net, driver_name); if (!dev) return -ENODEV; @@ -251,9 +242,9 @@ static void disable_bearer(struct tipc_bearer *tb_ptr) * specified device. */ static int recv_notification(struct notifier_block *nb, unsigned long evt, - void *dv) + void *ptr) { - struct net_device *dev = (struct net_device *)dv; + struct net_device *dev = netdev_notifier_info_to_dev(ptr); struct eth_bearer *eb_ptr = ð_bearers[0]; struct eth_bearer *stop = ð_bearers[MAX_ETH_BEARERS]; diff --git a/net/tipc/ib_media.c b/net/tipc/ib_media.c index 2a2864c25e1..9934a32bfa8 100644 --- a/net/tipc/ib_media.c +++ b/net/tipc/ib_media.c @@ -155,8 +155,7 @@ static void setup_bearer(struct work_struct *work) */ static int enable_bearer(struct tipc_bearer *tb_ptr) { - struct net_device *dev = NULL; - struct net_device *pdev = NULL; + struct net_device *dev; struct ib_bearer *ib_ptr = &ib_bearers[0]; struct ib_bearer *stop = &ib_bearers[MAX_IB_BEARERS]; char *driver_name = strchr((const char *)tb_ptr->name, ':') + 1; @@ -171,15 +170,7 @@ static int enable_bearer(struct tipc_bearer *tb_ptr) } /* Find device with specified name */ - read_lock(&dev_base_lock); - for_each_netdev(&init_net, pdev) { - if (!strncmp(pdev->name, driver_name, IFNAMSIZ)) { - dev = pdev; - dev_hold(dev); - break; - } - } - read_unlock(&dev_base_lock); + dev = dev_get_by_name(&init_net, driver_name); if (!dev) return -ENODEV; @@ -244,9 +235,9 @@ static void disable_bearer(struct tipc_bearer *tb_ptr) * specified device. */ static int recv_notification(struct notifier_block *nb, unsigned long evt, - void *dv) + void *ptr) { - struct net_device *dev = (struct net_device *)dv; + struct net_device *dev = netdev_notifier_info_to_dev(ptr); struct ib_bearer *ib_ptr = &ib_bearers[0]; struct ib_bearer *stop = &ib_bearers[MAX_IB_BEARERS]; @@ -301,13 +292,7 @@ static int ib_addr2str(struct tipc_media_addr *a, char *str_buf, int str_size) if (str_size < 60) /* 60 = 19 * strlen("xx:") + strlen("xx\0") */ return 1; - sprintf(str_buf, "%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:" - "%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x", - a->value[0], a->value[1], a->value[2], a->value[3], - a->value[4], a->value[5], a->value[6], a->value[7], - a->value[8], a->value[9], a->value[10], a->value[11], - a->value[12], a->value[13], a->value[14], a->value[15], - a->value[16], a->value[17], a->value[18], a->value[19]); + sprintf(str_buf, "%20phC", a->value); return 0; } diff --git a/net/tipc/link.c b/net/tipc/link.c index a80feee5197..0cc3d9015c5 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -2,7 +2,7 @@ * net/tipc/link.c: TIPC link code * * Copyright (c) 1996-2007, 2012, Ericsson AB - * Copyright (c) 2004-2007, 2010-2011, Wind River Systems + * Copyright (c) 2004-2007, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,6 +41,8 @@ #include "discover.h" #include "config.h" +#include <linux/pkt_sched.h> + /* * Error message prefixes */ @@ -771,8 +773,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) * link_bundle_buf(): Append contents of a buffer to * the tail of an existing one. */ -static int link_bundle_buf(struct tipc_link *l_ptr, - struct sk_buff *bundler, +static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler, struct sk_buff *buf) { struct tipc_msg *bundler_msg = buf_msg(bundler); @@ -1057,40 +1058,6 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf, } /* - * tipc_send_buf_fast: Entry for data messages where the - * destination node is known and the header is complete, - * inclusive total message length. - * Returns user data length. - */ -int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode) -{ - struct tipc_link *l_ptr; - struct tipc_node *n_ptr; - int res; - u32 selector = msg_origport(buf_msg(buf)) & 1; - u32 dummy; - - read_lock_bh(&tipc_net_lock); - n_ptr = tipc_node_find(destnode); - if (likely(n_ptr)) { - tipc_node_lock(n_ptr); - l_ptr = n_ptr->active_links[selector]; - if (likely(l_ptr)) { - res = link_send_buf_fast(l_ptr, buf, &dummy); - tipc_node_unlock(n_ptr); - read_unlock_bh(&tipc_net_lock); - return res; - } - tipc_node_unlock(n_ptr); - } - read_unlock_bh(&tipc_net_lock); - res = msg_data_sz(buf_msg(buf)); - tipc_reject_msg(buf, TIPC_ERR_NO_NODE); - return res; -} - - -/* * tipc_link_send_sections_fast: Entry for messages where the * destination processor is known and the header is complete, * except for total message length. @@ -1098,8 +1065,7 @@ int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode) */ int tipc_link_send_sections_fast(struct tipc_port *sender, struct iovec const *msg_sect, - const u32 num_sect, - unsigned int total_len, + const u32 num_sect, unsigned int total_len, u32 destaddr) { struct tipc_msg *hdr = &sender->phdr; @@ -1115,7 +1081,10 @@ again: * (Must not hold any locks while building message.) */ res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, - sender->max_pkt, !sender->user_port, &buf); + sender->max_pkt, &buf); + /* Exit if build request was invalid */ + if (unlikely(res < 0)) + return res; read_lock_bh(&tipc_net_lock); node = tipc_node_find(destaddr); @@ -1132,10 +1101,6 @@ exit: return res; } - /* Exit if build request was invalid */ - if (unlikely(res < 0)) - goto exit; - /* Exit if link (or bearer) is congested */ if (link_congested(l_ptr) || tipc_bearer_blocked(l_ptr->b_ptr)) { @@ -1189,8 +1154,7 @@ exit: */ static int link_send_sections_long(struct tipc_port *sender, struct iovec const *msg_sect, - u32 num_sect, - unsigned int total_len, + u32 num_sect, unsigned int total_len, u32 destaddr) { struct tipc_link *l_ptr; @@ -1204,6 +1168,7 @@ static int link_send_sections_long(struct tipc_port *sender, const unchar *sect_crs; int curr_sect; u32 fragm_no; + int res = 0; again: fragm_no = 1; @@ -1250,18 +1215,15 @@ again: else sz = fragm_rest; - if (likely(!sender->user_port)) { - if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) { + if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) { + res = -EFAULT; error: - for (; buf_chain; buf_chain = buf) { - buf = buf_chain->next; - kfree_skb(buf_chain); - } - return -EFAULT; + for (; buf_chain; buf_chain = buf) { + buf = buf_chain->next; + kfree_skb(buf_chain); } - } else - skb_copy_to_linear_data_offset(buf, fragm_crs, - sect_crs, sz); + return res; + } sect_crs += sz; sect_rest -= sz; fragm_crs += sz; @@ -1281,8 +1243,10 @@ error: msg_set_fragm_no(&fragm_hdr, ++fragm_no); prev = buf; buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE); - if (!buf) + if (!buf) { + res = -ENOMEM; goto error; + } buf->next = NULL; prev->next = buf; @@ -1446,7 +1410,7 @@ static void link_reset_all(unsigned long addr) } static void link_retransmit_failure(struct tipc_link *l_ptr, - struct sk_buff *buf) + struct sk_buff *buf) { struct tipc_msg *msg = buf_msg(buf); @@ -1901,8 +1865,8 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, * Send protocol message to the other endpoint. */ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, - int probe_msg, u32 gap, u32 tolerance, - u32 priority, u32 ack_mtu) + int probe_msg, u32 gap, u32 tolerance, + u32 priority, u32 ack_mtu) { struct sk_buff *buf = NULL; struct tipc_msg *msg = l_ptr->pmsg; @@ -1988,6 +1952,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, return; skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); + buf->priority = TC_PRIO_CONTROL; /* Defer message if bearer is already blocked */ if (tipc_bearer_blocked(l_ptr->b_ptr)) { @@ -2145,8 +2110,7 @@ exit: * another bearer. Owner node is locked. */ static void tipc_link_tunnel(struct tipc_link *l_ptr, - struct tipc_msg *tunnel_hdr, - struct tipc_msg *msg, + struct tipc_msg *tunnel_hdr, struct tipc_msg *msg, u32 selector) { struct tipc_link *tunnel; diff --git a/net/tipc/msg.c b/net/tipc/msg.c index f2db8a87d9c..ced60e2fc4f 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -51,8 +51,8 @@ u32 tipc_msg_tot_importance(struct tipc_msg *m) } -void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, - u32 hsize, u32 destnode) +void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, + u32 destnode) { memset(m, 0, hsize); msg_set_version(m); @@ -73,8 +73,8 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, * Returns message data size or errno */ int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, - u32 num_sect, unsigned int total_len, - int max_size, int usrmem, struct sk_buff **buf) + u32 num_sect, unsigned int total_len, int max_size, + struct sk_buff **buf) { int dsz, sz, hsz, pos, res, cnt; @@ -92,14 +92,9 @@ int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, return -ENOMEM; skb_copy_to_linear_data(*buf, hdr, hsz); for (res = 1, cnt = 0; res && (cnt < num_sect); cnt++) { - if (likely(usrmem)) - res = !copy_from_user((*buf)->data + pos, - msg_sect[cnt].iov_base, - msg_sect[cnt].iov_len); - else - skb_copy_to_linear_data_offset(*buf, pos, - msg_sect[cnt].iov_base, - msg_sect[cnt].iov_len); + skb_copy_to_linear_data_offset(*buf, pos, + msg_sect[cnt].iov_base, + msg_sect[cnt].iov_len); pos += msg_sect[cnt].iov_len; } if (likely(res)) diff --git a/net/tipc/msg.h b/net/tipc/msg.h index ba2a72beea6..5e4ccf5c27d 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -719,9 +719,9 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n) } u32 tipc_msg_tot_importance(struct tipc_msg *m); -void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, - u32 hsize, u32 destnode); +void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, + u32 destnode); int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect, - u32 num_sect, unsigned int total_len, - int max_size, int usrmem, struct sk_buff **buf); + u32 num_sect, unsigned int total_len, int max_size, + struct sk_buff **buf); #endif diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 24b16791431..09dcd54b04e 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -440,7 +440,7 @@ found: * sequence overlapping with the requested sequence */ static void tipc_nameseq_subscribe(struct name_seq *nseq, - struct tipc_subscription *s) + struct tipc_subscription *s) { struct sub_seq *sseq = nseq->sseqs; @@ -662,7 +662,7 @@ exit: * tipc_nametbl_publish - add name publication to network name tables */ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper, - u32 scope, u32 port_ref, u32 key) + u32 scope, u32 port_ref, u32 key) { struct publication *publ; @@ -753,7 +753,7 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s) * subseq_list - print specified sub-sequence contents into the given buffer */ static int subseq_list(struct sub_seq *sseq, char *buf, int len, u32 depth, - u32 index) + u32 index) { char portIdStr[27]; const char *scope_str[] = {"", " zone", " cluster", " node"}; @@ -792,7 +792,7 @@ static int subseq_list(struct sub_seq *sseq, char *buf, int len, u32 depth, * nameseq_list - print specified name sequence contents into the given buffer */ static int nameseq_list(struct name_seq *seq, char *buf, int len, u32 depth, - u32 type, u32 lowbound, u32 upbound, u32 index) + u32 type, u32 lowbound, u32 upbound, u32 index) { struct sub_seq *sseq; char typearea[11]; @@ -849,7 +849,7 @@ static int nametbl_header(char *buf, int len, u32 depth) * nametbl_list - print specified name table contents into the given buffer */ static int nametbl_list(char *buf, int len, u32 depth_info, - u32 type, u32 lowbound, u32 upbound) + u32 type, u32 lowbound, u32 upbound) { struct hlist_head *seq_head; struct name_seq *seq; diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index 71cb4dc712d..f02f48b9a21 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -87,14 +87,15 @@ extern rwlock_t tipc_nametbl_lock; struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space); u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *node); int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit, - struct tipc_port_list *dports); + struct tipc_port_list *dports); struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper, - u32 scope, u32 port_ref, u32 key); + u32 scope, u32 port_ref, u32 key); int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key); struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper, - u32 scope, u32 node, u32 ref, u32 key); -struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower, - u32 node, u32 ref, u32 key); + u32 scope, u32 node, u32 ref, + u32 key); +struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower, u32 node, + u32 ref, u32 key); void tipc_nametbl_subscribe(struct tipc_subscription *s); void tipc_nametbl_unsubscribe(struct tipc_subscription *s); int tipc_nametbl_init(void); diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c index 5e34b015da4..8a7384c04ad 100644 --- a/net/tipc/node_subscr.c +++ b/net/tipc/node_subscr.c @@ -42,7 +42,7 @@ * tipc_nodesub_subscribe - create "node down" subscription for specified node */ void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr, - void *usr_handle, net_ev_handler handle_down) + void *usr_handle, net_ev_handler handle_down) { if (in_own_node(addr)) { node_sub->node = NULL; diff --git a/net/tipc/port.c b/net/tipc/port.c index 18098cac62f..b3ed2fcab4f 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -2,7 +2,7 @@ * net/tipc/port.c: TIPC port code * * Copyright (c) 1992-2007, Ericsson AB - * Copyright (c) 2004-2008, 2010-2011, Wind River Systems + * Copyright (c) 2004-2008, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -46,11 +46,7 @@ #define MAX_REJECT_SIZE 1024 -static struct sk_buff *msg_queue_head; -static struct sk_buff *msg_queue_tail; - DEFINE_SPINLOCK(tipc_port_list_lock); -static DEFINE_SPINLOCK(queue_lock); static LIST_HEAD(ports); static void port_handle_node_down(unsigned long ref); @@ -119,7 +115,7 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, msg_set_nameupper(hdr, seq->upper); msg_set_hdr_sz(hdr, MCAST_H_SIZE); res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE, - !oport->user_port, &buf); + &buf); if (unlikely(!buf)) return res; @@ -206,14 +202,15 @@ exit: } /** - * tipc_createport_raw - create a generic TIPC port + * tipc_createport - create a generic TIPC port * * Returns pointer to (locked) TIPC port, or NULL if unable to create it */ -struct tipc_port *tipc_createport_raw(void *usr_handle, - u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), - void (*wakeup)(struct tipc_port *), - const u32 importance) +struct tipc_port *tipc_createport(struct sock *sk, + u32 (*dispatcher)(struct tipc_port *, + struct sk_buff *), + void (*wakeup)(struct tipc_port *), + const u32 importance) { struct tipc_port *p_ptr; struct tipc_msg *msg; @@ -231,14 +228,13 @@ struct tipc_port *tipc_createport_raw(void *usr_handle, return NULL; } - p_ptr->usr_handle = usr_handle; + p_ptr->sk = sk; p_ptr->max_pkt = MAX_PKT_DEFAULT; p_ptr->ref = ref; INIT_LIST_HEAD(&p_ptr->wait_list); INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); p_ptr->dispatcher = dispatcher; p_ptr->wakeup = wakeup; - p_ptr->user_port = NULL; k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); INIT_LIST_HEAD(&p_ptr->publications); INIT_LIST_HEAD(&p_ptr->port_list); @@ -275,7 +271,6 @@ int tipc_deleteport(u32 ref) buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); tipc_nodesub_unsubscribe(&p_ptr->subscription); } - kfree(p_ptr->user_port); spin_lock_bh(&tipc_port_list_lock); list_del(&p_ptr->port_list); @@ -448,7 +443,7 @@ int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr, int res; res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE, - !p_ptr->user_port, &buf); + &buf); if (!buf) return res; @@ -668,215 +663,6 @@ void tipc_port_reinit(void) spin_unlock_bh(&tipc_port_list_lock); } - -/* - * port_dispatcher_sigh(): Signal handler for messages destinated - * to the tipc_port interface. - */ -static void port_dispatcher_sigh(void *dummy) -{ - struct sk_buff *buf; - - spin_lock_bh(&queue_lock); - buf = msg_queue_head; - msg_queue_head = NULL; - spin_unlock_bh(&queue_lock); - - while (buf) { - struct tipc_port *p_ptr; - struct user_port *up_ptr; - struct tipc_portid orig; - struct tipc_name_seq dseq; - void *usr_handle; - int connected; - int peer_invalid; - int published; - u32 message_type; - - struct sk_buff *next = buf->next; - struct tipc_msg *msg = buf_msg(buf); - u32 dref = msg_destport(msg); - - message_type = msg_type(msg); - if (message_type > TIPC_DIRECT_MSG) - goto reject; /* Unsupported message type */ - - p_ptr = tipc_port_lock(dref); - if (!p_ptr) - goto reject; /* Port deleted while msg in queue */ - - orig.ref = msg_origport(msg); - orig.node = msg_orignode(msg); - up_ptr = p_ptr->user_port; - usr_handle = up_ptr->usr_handle; - connected = p_ptr->connected; - peer_invalid = connected && !tipc_port_peer_msg(p_ptr, msg); - published = p_ptr->published; - - if (unlikely(msg_errcode(msg))) - goto err; - - switch (message_type) { - - case TIPC_CONN_MSG:{ - tipc_conn_msg_event cb = up_ptr->conn_msg_cb; - u32 dsz; - - tipc_port_unlock(p_ptr); - if (unlikely(!cb)) - goto reject; - if (unlikely(!connected)) { - if (tipc_connect(dref, &orig)) - goto reject; - } else if (peer_invalid) - goto reject; - dsz = msg_data_sz(msg); - if (unlikely(dsz && - (++p_ptr->conn_unacked >= - TIPC_FLOW_CONTROL_WIN))) - tipc_acknowledge(dref, - p_ptr->conn_unacked); - skb_pull(buf, msg_hdr_sz(msg)); - cb(usr_handle, dref, &buf, msg_data(msg), dsz); - break; - } - case TIPC_DIRECT_MSG:{ - tipc_msg_event cb = up_ptr->msg_cb; - - tipc_port_unlock(p_ptr); - if (unlikely(!cb || connected)) - goto reject; - skb_pull(buf, msg_hdr_sz(msg)); - cb(usr_handle, dref, &buf, msg_data(msg), - msg_data_sz(msg), msg_importance(msg), - &orig); - break; - } - case TIPC_MCAST_MSG: - case TIPC_NAMED_MSG:{ - tipc_named_msg_event cb = up_ptr->named_msg_cb; - - tipc_port_unlock(p_ptr); - if (unlikely(!cb || connected || !published)) - goto reject; - dseq.type = msg_nametype(msg); - dseq.lower = msg_nameinst(msg); - dseq.upper = (message_type == TIPC_NAMED_MSG) - ? dseq.lower : msg_nameupper(msg); - skb_pull(buf, msg_hdr_sz(msg)); - cb(usr_handle, dref, &buf, msg_data(msg), - msg_data_sz(msg), msg_importance(msg), - &orig, &dseq); - break; - } - } - if (buf) - kfree_skb(buf); - buf = next; - continue; -err: - switch (message_type) { - - case TIPC_CONN_MSG:{ - tipc_conn_shutdown_event cb = - up_ptr->conn_err_cb; - - tipc_port_unlock(p_ptr); - if (!cb || !connected || peer_invalid) - break; - tipc_disconnect(dref); - skb_pull(buf, msg_hdr_sz(msg)); - cb(usr_handle, dref, &buf, msg_data(msg), - msg_data_sz(msg), msg_errcode(msg)); - break; - } - case TIPC_DIRECT_MSG:{ - tipc_msg_err_event cb = up_ptr->err_cb; - - tipc_port_unlock(p_ptr); - if (!cb || connected) - break; - skb_pull(buf, msg_hdr_sz(msg)); - cb(usr_handle, dref, &buf, msg_data(msg), - msg_data_sz(msg), msg_errcode(msg), &orig); - break; - } - case TIPC_MCAST_MSG: - case TIPC_NAMED_MSG:{ - tipc_named_msg_err_event cb = - up_ptr->named_err_cb; - - tipc_port_unlock(p_ptr); - if (!cb || connected) - break; - dseq.type = msg_nametype(msg); - dseq.lower = msg_nameinst(msg); - dseq.upper = (message_type == TIPC_NAMED_MSG) - ? dseq.lower : msg_nameupper(msg); - skb_pull(buf, msg_hdr_sz(msg)); - cb(usr_handle, dref, &buf, msg_data(msg), - msg_data_sz(msg), msg_errcode(msg), &dseq); - break; - } - } - if (buf) - kfree_skb(buf); - buf = next; - continue; -reject: - tipc_reject_msg(buf, TIPC_ERR_NO_PORT); - buf = next; - } -} - -/* - * port_dispatcher(): Dispatcher for messages destinated - * to the tipc_port interface. Called with port locked. - */ -static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf) -{ - buf->next = NULL; - spin_lock_bh(&queue_lock); - if (msg_queue_head) { - msg_queue_tail->next = buf; - msg_queue_tail = buf; - } else { - msg_queue_tail = msg_queue_head = buf; - tipc_k_signal((Handler)port_dispatcher_sigh, 0); - } - spin_unlock_bh(&queue_lock); - return 0; -} - -/* - * Wake up port after congestion: Called with port locked - */ -static void port_wakeup_sh(unsigned long ref) -{ - struct tipc_port *p_ptr; - struct user_port *up_ptr; - tipc_continue_event cb = NULL; - void *uh = NULL; - - p_ptr = tipc_port_lock(ref); - if (p_ptr) { - up_ptr = p_ptr->user_port; - if (up_ptr) { - cb = up_ptr->continue_event_cb; - uh = up_ptr->usr_handle; - } - tipc_port_unlock(p_ptr); - } - if (cb) - cb(uh, ref); -} - - -static void port_wakeup(struct tipc_port *p_ptr) -{ - tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref); -} - void tipc_acknowledge(u32 ref, u32 ack) { struct tipc_port *p_ptr; @@ -893,50 +679,6 @@ void tipc_acknowledge(u32 ref, u32 ack) tipc_net_route_msg(buf); } -/* - * tipc_createport(): user level call. - */ -int tipc_createport(void *usr_handle, - unsigned int importance, - tipc_msg_err_event error_cb, - tipc_named_msg_err_event named_error_cb, - tipc_conn_shutdown_event conn_error_cb, - tipc_msg_event msg_cb, - tipc_named_msg_event named_msg_cb, - tipc_conn_msg_event conn_msg_cb, - tipc_continue_event continue_event_cb, /* May be zero */ - u32 *portref) -{ - struct user_port *up_ptr; - struct tipc_port *p_ptr; - - up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); - if (!up_ptr) { - pr_warn("Port creation failed, no memory\n"); - return -ENOMEM; - } - p_ptr = tipc_createport_raw(NULL, port_dispatcher, port_wakeup, - importance); - if (!p_ptr) { - kfree(up_ptr); - return -ENOMEM; - } - - p_ptr->user_port = up_ptr; - up_ptr->usr_handle = usr_handle; - up_ptr->ref = p_ptr->ref; - up_ptr->err_cb = error_cb; - up_ptr->named_err_cb = named_error_cb; - up_ptr->conn_err_cb = conn_error_cb; - up_ptr->msg_cb = msg_cb; - up_ptr->named_msg_cb = named_msg_cb; - up_ptr->conn_msg_cb = conn_msg_cb; - up_ptr->continue_event_cb = continue_event_cb; - *portref = p_ptr->ref; - tipc_port_unlock(p_ptr); - return 0; -} - int tipc_portimportance(u32 ref, unsigned int *importance) { struct tipc_port *p_ptr; @@ -1184,7 +926,7 @@ static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_se int res; res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len, - MAX_MSG_SIZE, !sender->user_port, &buf); + MAX_MSG_SIZE, &buf); if (likely(buf)) tipc_port_recv_msg(buf); return res; @@ -1322,43 +1064,3 @@ int tipc_send2port(u32 ref, struct tipc_portid const *dest, } return -ELINKCONG; } - -/** - * tipc_send_buf2port - send message buffer to port identity - */ -int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest, - struct sk_buff *buf, unsigned int dsz) -{ - struct tipc_port *p_ptr; - struct tipc_msg *msg; - int res; - - p_ptr = (struct tipc_port *)tipc_ref_deref(ref); - if (!p_ptr || p_ptr->connected) - return -EINVAL; - - msg = &p_ptr->phdr; - msg_set_type(msg, TIPC_DIRECT_MSG); - msg_set_destnode(msg, dest->node); - msg_set_destport(msg, dest->ref); - msg_set_hdr_sz(msg, BASIC_H_SIZE); - msg_set_size(msg, BASIC_H_SIZE + dsz); - if (skb_cow(buf, BASIC_H_SIZE)) - return -ENOMEM; - - skb_push(buf, BASIC_H_SIZE); - skb_copy_to_linear_data(buf, msg, BASIC_H_SIZE); - - if (in_own_node(dest->node)) - res = tipc_port_recv_msg(buf); - else - res = tipc_send_buf_fast(buf, dest->node); - if (likely(res != -ELINKCONG)) { - if (res > 0) - p_ptr->sent++; - return res; - } - if (port_unreliable(p_ptr)) - return dsz; - return -ELINKCONG; -} diff --git a/net/tipc/port.h b/net/tipc/port.h index fb66e2e5f4d..5a7026b9c34 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -2,7 +2,7 @@ * net/tipc/port.h: Include file for TIPC port code * * Copyright (c) 1994-2007, Ericsson AB - * Copyright (c) 2004-2007, 2010-2011, Wind River Systems + * Copyright (c) 2004-2007, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -43,60 +43,12 @@ #include "node_subscr.h" #define TIPC_FLOW_CONTROL_WIN 512 - -typedef void (*tipc_msg_err_event) (void *usr_handle, u32 portref, - struct sk_buff **buf, unsigned char const *data, - unsigned int size, int reason, - struct tipc_portid const *attmpt_destid); - -typedef void (*tipc_named_msg_err_event) (void *usr_handle, u32 portref, - struct sk_buff **buf, unsigned char const *data, - unsigned int size, int reason, - struct tipc_name_seq const *attmpt_dest); - -typedef void (*tipc_conn_shutdown_event) (void *usr_handle, u32 portref, - struct sk_buff **buf, unsigned char const *data, - unsigned int size, int reason); - -typedef void (*tipc_msg_event) (void *usr_handle, u32 portref, - struct sk_buff **buf, unsigned char const *data, - unsigned int size, unsigned int importance, - struct tipc_portid const *origin); - -typedef void (*tipc_named_msg_event) (void *usr_handle, u32 portref, - struct sk_buff **buf, unsigned char const *data, - unsigned int size, unsigned int importance, - struct tipc_portid const *orig, - struct tipc_name_seq const *dest); - -typedef void (*tipc_conn_msg_event) (void *usr_handle, u32 portref, - struct sk_buff **buf, unsigned char const *data, - unsigned int size); - -typedef void (*tipc_continue_event) (void *usr_handle, u32 portref); - -/** - * struct user_port - TIPC user port (used with native API) - * @usr_handle: user-specified field - * @ref: object reference to associated TIPC port - * - * <various callback routines> - */ -struct user_port { - void *usr_handle; - u32 ref; - tipc_msg_err_event err_cb; - tipc_named_msg_err_event named_err_cb; - tipc_conn_shutdown_event conn_err_cb; - tipc_msg_event msg_cb; - tipc_named_msg_event named_msg_cb; - tipc_conn_msg_event conn_msg_cb; - tipc_continue_event continue_event_cb; -}; +#define CONN_OVERLOAD_LIMIT ((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \ + SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) /** * struct tipc_port - TIPC port structure - * @usr_handle: pointer to additional user-defined information about port + * @sk: pointer to socket handle * @lock: pointer to spinlock for controlling access to port * @connected: non-zero if port is currently connected to a peer port * @conn_type: TIPC type used when connection was established @@ -110,7 +62,6 @@ struct user_port { * @port_list: adjacent ports in TIPC's global list of ports * @dispatcher: ptr to routine which handles received messages * @wakeup: ptr to routine to call when port is no longer congested - * @user_port: ptr to user port associated with port (if any) * @wait_list: adjacent ports in list of ports waiting on link congestion * @waiting_pkts: * @sent: # of non-empty messages sent by port @@ -123,7 +74,7 @@ struct user_port { * @subscription: "node down" subscription used to terminate failed connections */ struct tipc_port { - void *usr_handle; + struct sock *sk; spinlock_t *lock; int connected; u32 conn_type; @@ -137,7 +88,6 @@ struct tipc_port { struct list_head port_list; u32 (*dispatcher)(struct tipc_port *, struct sk_buff *); void (*wakeup)(struct tipc_port *); - struct user_port *user_port; struct list_head wait_list; u32 waiting_pkts; u32 sent; @@ -156,24 +106,16 @@ struct tipc_port_list; /* * TIPC port manipulation routines */ -struct tipc_port *tipc_createport_raw(void *usr_handle, - u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), - void (*wakeup)(struct tipc_port *), const u32 importance); +struct tipc_port *tipc_createport(struct sock *sk, + u32 (*dispatcher)(struct tipc_port *, + struct sk_buff *), + void (*wakeup)(struct tipc_port *), + const u32 importance); int tipc_reject_msg(struct sk_buff *buf, u32 err); -int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode); - void tipc_acknowledge(u32 port_ref, u32 ack); -int tipc_createport(void *usr_handle, - unsigned int importance, tipc_msg_err_event error_cb, - tipc_named_msg_err_event named_error_cb, - tipc_conn_shutdown_event conn_error_cb, tipc_msg_event msg_cb, - tipc_named_msg_event named_msg_cb, - tipc_conn_msg_event conn_msg_cb, - tipc_continue_event continue_event_cb, u32 *portref); - int tipc_deleteport(u32 portref); int tipc_portimportance(u32 portref, unsigned int *importance); @@ -186,9 +128,9 @@ int tipc_portunreturnable(u32 portref, unsigned int *isunreturnable); int tipc_set_portunreturnable(u32 portref, unsigned int isunreturnable); int tipc_publish(u32 portref, unsigned int scope, - struct tipc_name_seq const *name_seq); + struct tipc_name_seq const *name_seq); int tipc_withdraw(u32 portref, unsigned int scope, - struct tipc_name_seq const *name_seq); + struct tipc_name_seq const *name_seq); int tipc_connect(u32 portref, struct tipc_portid const *port); @@ -220,9 +162,6 @@ int tipc_send2port(u32 portref, struct tipc_portid const *dest, unsigned int num_sect, struct iovec const *msg_sect, unsigned int total_len); -int tipc_send_buf2port(u32 portref, struct tipc_portid const *dest, - struct sk_buff *buf, unsigned int dsz); - int tipc_multicast(u32 portref, struct tipc_name_seq const *seq, unsigned int section_count, struct iovec const *msg, unsigned int total_len); diff --git a/net/tipc/server.c b/net/tipc/server.c new file mode 100644 index 00000000000..fd3fa57a410 --- /dev/null +++ b/net/tipc/server.c @@ -0,0 +1,605 @@ +/* + * net/tipc/server.c: TIPC server infrastructure + * + * Copyright (c) 2012-2013, Wind River Systems + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "server.h" +#include "core.h" +#include <net/sock.h> + +/* Number of messages to send before rescheduling */ +#define MAX_SEND_MSG_COUNT 25 +#define MAX_RECV_MSG_COUNT 25 +#define CF_CONNECTED 1 + +#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data) + +/** + * struct tipc_conn - TIPC connection structure + * @kref: reference counter to connection object + * @conid: connection identifier + * @sock: socket handler associated with connection + * @flags: indicates connection state + * @server: pointer to connected server + * @rwork: receive work item + * @usr_data: user-specified field + * @rx_action: what to do when connection socket is active + * @outqueue: pointer to first outbound message in queue + * @outqueue_lock: controll access to the outqueue + * @outqueue: list of connection objects for its server + * @swork: send work item + */ +struct tipc_conn { + struct kref kref; + int conid; + struct socket *sock; + unsigned long flags; + struct tipc_server *server; + struct work_struct rwork; + int (*rx_action) (struct tipc_conn *con); + void *usr_data; + struct list_head outqueue; + spinlock_t outqueue_lock; + struct work_struct swork; +}; + +/* An entry waiting to be sent */ +struct outqueue_entry { + struct list_head list; + struct kvec iov; + struct sockaddr_tipc dest; +}; + +static void tipc_recv_work(struct work_struct *work); +static void tipc_send_work(struct work_struct *work); +static void tipc_clean_outqueues(struct tipc_conn *con); + +static void tipc_conn_kref_release(struct kref *kref) +{ + struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); + struct tipc_server *s = con->server; + + if (con->sock) { + tipc_sock_release_local(con->sock); + con->sock = NULL; + } + + tipc_clean_outqueues(con); + + if (con->conid) + s->tipc_conn_shutdown(con->conid, con->usr_data); + + kfree(con); +} + +static void conn_put(struct tipc_conn *con) +{ + kref_put(&con->kref, tipc_conn_kref_release); +} + +static void conn_get(struct tipc_conn *con) +{ + kref_get(&con->kref); +} + +static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) +{ + struct tipc_conn *con; + + spin_lock_bh(&s->idr_lock); + con = idr_find(&s->conn_idr, conid); + if (con) + conn_get(con); + spin_unlock_bh(&s->idr_lock); + return con; +} + +static void sock_data_ready(struct sock *sk, int unused) +{ + struct tipc_conn *con; + + read_lock(&sk->sk_callback_lock); + con = sock2con(sk); + if (con && test_bit(CF_CONNECTED, &con->flags)) { + conn_get(con); + if (!queue_work(con->server->rcv_wq, &con->rwork)) + conn_put(con); + } + read_unlock(&sk->sk_callback_lock); +} + +static void sock_write_space(struct sock *sk) +{ + struct tipc_conn *con; + + read_lock(&sk->sk_callback_lock); + con = sock2con(sk); + if (con && test_bit(CF_CONNECTED, &con->flags)) { + conn_get(con); + if (!queue_work(con->server->send_wq, &con->swork)) + conn_put(con); + } + read_unlock(&sk->sk_callback_lock); +} + +static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) +{ + struct sock *sk = sock->sk; + + write_lock_bh(&sk->sk_callback_lock); + + sk->sk_data_ready = sock_data_ready; + sk->sk_write_space = sock_write_space; + sk->sk_user_data = con; + + con->sock = sock; + + write_unlock_bh(&sk->sk_callback_lock); +} + +static void tipc_unregister_callbacks(struct tipc_conn *con) +{ + struct sock *sk = con->sock->sk; + + write_lock_bh(&sk->sk_callback_lock); + sk->sk_user_data = NULL; + write_unlock_bh(&sk->sk_callback_lock); +} + +static void tipc_close_conn(struct tipc_conn *con) +{ + struct tipc_server *s = con->server; + + if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { + spin_lock_bh(&s->idr_lock); + idr_remove(&s->conn_idr, con->conid); + s->idr_in_use--; + spin_unlock_bh(&s->idr_lock); + + tipc_unregister_callbacks(con); + + /* We shouldn't flush pending works as we may be in the + * thread. In fact the races with pending rx/tx work structs + * are harmless for us here as we have already deleted this + * connection from server connection list and set + * sk->sk_user_data to 0 before releasing connection object. + */ + kernel_sock_shutdown(con->sock, SHUT_RDWR); + + conn_put(con); + } +} + +static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) +{ + struct tipc_conn *con; + int ret; + + con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC); + if (!con) + return ERR_PTR(-ENOMEM); + + kref_init(&con->kref); + INIT_LIST_HEAD(&con->outqueue); + spin_lock_init(&con->outqueue_lock); + INIT_WORK(&con->swork, tipc_send_work); + INIT_WORK(&con->rwork, tipc_recv_work); + + spin_lock_bh(&s->idr_lock); + ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC); + if (ret < 0) { + kfree(con); + spin_unlock_bh(&s->idr_lock); + return ERR_PTR(-ENOMEM); + } + con->conid = ret; + s->idr_in_use++; + spin_unlock_bh(&s->idr_lock); + + set_bit(CF_CONNECTED, &con->flags); + con->server = s; + + return con; +} + +static int tipc_receive_from_sock(struct tipc_conn *con) +{ + struct msghdr msg = {}; + struct tipc_server *s = con->server; + struct sockaddr_tipc addr; + struct kvec iov; + void *buf; + int ret; + + buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC); + if (!buf) { + ret = -ENOMEM; + goto out_close; + } + + iov.iov_base = buf; + iov.iov_len = s->max_rcvbuf_size; + msg.msg_name = &addr; + ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, + MSG_DONTWAIT); + if (ret <= 0) { + kmem_cache_free(s->rcvbuf_cache, buf); + goto out_close; + } + + s->tipc_conn_recvmsg(con->conid, &addr, con->usr_data, buf, ret); + + kmem_cache_free(s->rcvbuf_cache, buf); + + return 0; + +out_close: + if (ret != -EWOULDBLOCK) + tipc_close_conn(con); + else if (ret == 0) + /* Don't return success if we really got EOF */ + ret = -EAGAIN; + + return ret; +} + +static int tipc_accept_from_sock(struct tipc_conn *con) +{ + struct tipc_server *s = con->server; + struct socket *sock = con->sock; + struct socket *newsock; + struct tipc_conn *newcon; + int ret; + + ret = tipc_sock_accept_local(sock, &newsock, O_NONBLOCK); + if (ret < 0) + return ret; + + newcon = tipc_alloc_conn(con->server); + if (IS_ERR(newcon)) { + ret = PTR_ERR(newcon); + sock_release(newsock); + return ret; + } + + newcon->rx_action = tipc_receive_from_sock; + tipc_register_callbacks(newsock, newcon); + + /* Notify that new connection is incoming */ + newcon->usr_data = s->tipc_conn_new(newcon->conid); + + /* Wake up receive process in case of 'SYN+' message */ + newsock->sk->sk_data_ready(newsock->sk, 0); + return ret; +} + +static struct socket *tipc_create_listen_sock(struct tipc_conn *con) +{ + struct tipc_server *s = con->server; + struct socket *sock = NULL; + int ret; + + ret = tipc_sock_create_local(s->type, &sock); + if (ret < 0) + return NULL; + ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE, + (char *)&s->imp, sizeof(s->imp)); + if (ret < 0) + goto create_err; + ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr)); + if (ret < 0) + goto create_err; + + switch (s->type) { + case SOCK_STREAM: + case SOCK_SEQPACKET: + con->rx_action = tipc_accept_from_sock; + + ret = kernel_listen(sock, 0); + if (ret < 0) + goto create_err; + break; + case SOCK_DGRAM: + case SOCK_RDM: + con->rx_action = tipc_receive_from_sock; + break; + default: + pr_err("Unknown socket type %d\n", s->type); + goto create_err; + } + return sock; + +create_err: + sock_release(sock); + con->sock = NULL; + return NULL; +} + +static int tipc_open_listening_sock(struct tipc_server *s) +{ + struct socket *sock; + struct tipc_conn *con; + + con = tipc_alloc_conn(s); + if (IS_ERR(con)) + return PTR_ERR(con); + + sock = tipc_create_listen_sock(con); + if (!sock) { + idr_remove(&s->conn_idr, con->conid); + s->idr_in_use--; + kfree(con); + return -EINVAL; + } + + tipc_register_callbacks(sock, con); + return 0; +} + +static struct outqueue_entry *tipc_alloc_entry(void *data, int len) +{ + struct outqueue_entry *entry; + void *buf; + + entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC); + if (!entry) + return NULL; + + buf = kmalloc(len, GFP_ATOMIC); + if (!buf) { + kfree(entry); + return NULL; + } + + memcpy(buf, data, len); + entry->iov.iov_base = buf; + entry->iov.iov_len = len; + + return entry; +} + +static void tipc_free_entry(struct outqueue_entry *e) +{ + kfree(e->iov.iov_base); + kfree(e); +} + +static void tipc_clean_outqueues(struct tipc_conn *con) +{ + struct outqueue_entry *e, *safe; + + spin_lock_bh(&con->outqueue_lock); + list_for_each_entry_safe(e, safe, &con->outqueue, list) { + list_del(&e->list); + tipc_free_entry(e); + } + spin_unlock_bh(&con->outqueue_lock); +} + +int tipc_conn_sendmsg(struct tipc_server *s, int conid, + struct sockaddr_tipc *addr, void *data, size_t len) +{ + struct outqueue_entry *e; + struct tipc_conn *con; + + con = tipc_conn_lookup(s, conid); + if (!con) + return -EINVAL; + + e = tipc_alloc_entry(data, len); + if (!e) { + conn_put(con); + return -ENOMEM; + } + + if (addr) + memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc)); + + spin_lock_bh(&con->outqueue_lock); + list_add_tail(&e->list, &con->outqueue); + spin_unlock_bh(&con->outqueue_lock); + + if (test_bit(CF_CONNECTED, &con->flags)) + if (!queue_work(s->send_wq, &con->swork)) + conn_put(con); + + return 0; +} + +void tipc_conn_terminate(struct tipc_server *s, int conid) +{ + struct tipc_conn *con; + + con = tipc_conn_lookup(s, conid); + if (con) { + tipc_close_conn(con); + conn_put(con); + } +} + +static void tipc_send_to_sock(struct tipc_conn *con) +{ + int count = 0; + struct tipc_server *s = con->server; + struct outqueue_entry *e; + struct msghdr msg; + int ret; + + spin_lock_bh(&con->outqueue_lock); + while (1) { + e = list_entry(con->outqueue.next, struct outqueue_entry, + list); + if ((struct list_head *) e == &con->outqueue) + break; + spin_unlock_bh(&con->outqueue_lock); + + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT; + + if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { + msg.msg_name = &e->dest; + msg.msg_namelen = sizeof(struct sockaddr_tipc); + } + ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, + e->iov.iov_len); + if (ret == -EWOULDBLOCK || ret == 0) { + cond_resched(); + goto out; + } else if (ret < 0) { + goto send_err; + } + + /* Don't starve users filling buffers */ + if (++count >= MAX_SEND_MSG_COUNT) { + cond_resched(); + count = 0; + } + + spin_lock_bh(&con->outqueue_lock); + list_del(&e->list); + tipc_free_entry(e); + } + spin_unlock_bh(&con->outqueue_lock); +out: + return; + +send_err: + tipc_close_conn(con); +} + +static void tipc_recv_work(struct work_struct *work) +{ + struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); + int count = 0; + + while (test_bit(CF_CONNECTED, &con->flags)) { + if (con->rx_action(con)) + break; + + /* Don't flood Rx machine */ + if (++count >= MAX_RECV_MSG_COUNT) { + cond_resched(); + count = 0; + } + } + conn_put(con); +} + +static void tipc_send_work(struct work_struct *work) +{ + struct tipc_conn *con = container_of(work, struct tipc_conn, swork); + + if (test_bit(CF_CONNECTED, &con->flags)) + tipc_send_to_sock(con); + + conn_put(con); +} + +static void tipc_work_stop(struct tipc_server *s) +{ + destroy_workqueue(s->rcv_wq); + destroy_workqueue(s->send_wq); +} + +static int tipc_work_start(struct tipc_server *s) +{ + s->rcv_wq = alloc_workqueue("tipc_rcv", WQ_UNBOUND, 1); + if (!s->rcv_wq) { + pr_err("can't start tipc receive workqueue\n"); + return -ENOMEM; + } + + s->send_wq = alloc_workqueue("tipc_send", WQ_UNBOUND, 1); + if (!s->send_wq) { + pr_err("can't start tipc send workqueue\n"); + destroy_workqueue(s->rcv_wq); + return -ENOMEM; + } + + return 0; +} + +int tipc_server_start(struct tipc_server *s) +{ + int ret; + + spin_lock_init(&s->idr_lock); + idr_init(&s->conn_idr); + s->idr_in_use = 0; + + s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size, + 0, SLAB_HWCACHE_ALIGN, NULL); + if (!s->rcvbuf_cache) + return -ENOMEM; + + ret = tipc_work_start(s); + if (ret < 0) { + kmem_cache_destroy(s->rcvbuf_cache); + return ret; + } + ret = tipc_open_listening_sock(s); + if (ret < 0) { + tipc_work_stop(s); + kmem_cache_destroy(s->rcvbuf_cache); + return ret; + } + s->enabled = 1; + return ret; +} + +void tipc_server_stop(struct tipc_server *s) +{ + struct tipc_conn *con; + int total = 0; + int id; + + if (!s->enabled) + return; + + s->enabled = 0; + spin_lock_bh(&s->idr_lock); + for (id = 0; total < s->idr_in_use; id++) { + con = idr_find(&s->conn_idr, id); + if (con) { + total++; + spin_unlock_bh(&s->idr_lock); + tipc_close_conn(con); + spin_lock_bh(&s->idr_lock); + } + } + spin_unlock_bh(&s->idr_lock); + + tipc_work_stop(s); + kmem_cache_destroy(s->rcvbuf_cache); + idr_destroy(&s->conn_idr); +} diff --git a/net/tipc/server.h b/net/tipc/server.h new file mode 100644 index 00000000000..98b23f20bc0 --- /dev/null +++ b/net/tipc/server.h @@ -0,0 +1,94 @@ +/* + * net/tipc/server.h: Include file for TIPC server code + * + * Copyright (c) 2012-2013, Wind River Systems + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _TIPC_SERVER_H +#define _TIPC_SERVER_H + +#include "core.h" + +#define TIPC_SERVER_NAME_LEN 32 + +/** + * struct tipc_server - TIPC server structure + * @conn_idr: identifier set of connection + * @idr_lock: protect the connection identifier set + * @idr_in_use: amount of allocated identifier entry + * @rcvbuf_cache: memory cache of server receive buffer + * @rcv_wq: receive workqueue + * @send_wq: send workqueue + * @max_rcvbuf_size: maximum permitted receive message length + * @tipc_conn_new: callback will be called when new connection is incoming + * @tipc_conn_shutdown: callback will be called when connection is shut down + * @tipc_conn_recvmsg: callback will be called when message arrives + * @saddr: TIPC server address + * @name: server name + * @imp: message importance + * @type: socket type + * @enabled: identify whether server is launched or not + */ +struct tipc_server { + struct idr conn_idr; + spinlock_t idr_lock; + int idr_in_use; + struct kmem_cache *rcvbuf_cache; + struct workqueue_struct *rcv_wq; + struct workqueue_struct *send_wq; + int max_rcvbuf_size; + void *(*tipc_conn_new) (int conid); + void (*tipc_conn_shutdown) (int conid, void *usr_data); + void (*tipc_conn_recvmsg) (int conid, struct sockaddr_tipc *addr, + void *usr_data, void *buf, size_t len); + struct sockaddr_tipc *saddr; + const char name[TIPC_SERVER_NAME_LEN]; + int imp; + int type; + int enabled; +}; + +int tipc_conn_sendmsg(struct tipc_server *s, int conid, + struct sockaddr_tipc *addr, void *data, size_t len); + +/** + * tipc_conn_terminate - terminate connection with server + * + * Note: Must call it in process context since it might sleep + */ +void tipc_conn_terminate(struct tipc_server *s, int conid); + +int tipc_server_start(struct tipc_server *s); + +void tipc_server_stop(struct tipc_server *s); + +#endif diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 515ce38e4f4..ce8249c7682 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -2,7 +2,7 @@ * net/tipc/socket.c: TIPC socket API * * Copyright (c) 2001-2007, 2012 Ericsson AB - * Copyright (c) 2004-2008, 2010-2012, Wind River Systems + * Copyright (c) 2004-2008, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -43,8 +43,6 @@ #define SS_LISTENING -1 /* socket is listening */ #define SS_READY -2 /* socket is connectionless */ -#define CONN_OVERLOAD_LIMIT ((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \ - SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ struct tipc_sock { @@ -65,12 +63,15 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf); static void wakeupdispatch(struct tipc_port *tport); static void tipc_data_ready(struct sock *sk, int len); static void tipc_write_space(struct sock *sk); +static int release(struct socket *sock); +static int accept(struct socket *sock, struct socket *new_sock, int flags); static const struct proto_ops packet_ops; static const struct proto_ops stream_ops; static const struct proto_ops msg_ops; static struct proto tipc_proto; +static struct proto tipc_proto_kern; static int sockets_enabled; @@ -143,7 +144,7 @@ static void reject_rx_queue(struct sock *sk) } /** - * tipc_create - create a TIPC socket + * tipc_sk_create - create a TIPC socket * @net: network namespace (must be default network) * @sock: pre-allocated socket structure * @protocol: protocol indicator (must be 0) @@ -154,8 +155,8 @@ static void reject_rx_queue(struct sock *sk) * * Returns 0 on success, errno otherwise */ -static int tipc_create(struct net *net, struct socket *sock, int protocol, - int kern) +static int tipc_sk_create(struct net *net, struct socket *sock, int protocol, + int kern) { const struct proto_ops *ops; socket_state state; @@ -185,13 +186,17 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol, } /* Allocate socket's protocol area */ - sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto); + if (!kern) + sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto); + else + sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern); + if (sk == NULL) return -ENOMEM; /* Allocate TIPC port for socket to use */ - tp_ptr = tipc_createport_raw(sk, &dispatch, &wakeupdispatch, - TIPC_LOW_IMPORTANCE); + tp_ptr = tipc_createport(sk, &dispatch, &wakeupdispatch, + TIPC_LOW_IMPORTANCE); if (unlikely(!tp_ptr)) { sk_free(sk); return -ENOMEM; @@ -203,6 +208,7 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol, sock_init_data(sock, sk); sk->sk_backlog_rcv = backlog_rcv; + sk->sk_rcvbuf = sysctl_tipc_rmem[1]; sk->sk_data_ready = tipc_data_ready; sk->sk_write_space = tipc_write_space; tipc_sk(sk)->p = tp_ptr; @@ -220,6 +226,78 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol, } /** + * tipc_sock_create_local - create TIPC socket from inside TIPC module + * @type: socket type - SOCK_RDM or SOCK_SEQPACKET + * + * We cannot use sock_creat_kern here because it bumps module user count. + * Since socket owner and creator is the same module we must make sure + * that module count remains zero for module local sockets, otherwise + * we cannot do rmmod. + * + * Returns 0 on success, errno otherwise + */ +int tipc_sock_create_local(int type, struct socket **res) +{ + int rc; + struct sock *sk; + + rc = sock_create_lite(AF_TIPC, type, 0, res); + if (rc < 0) { + pr_err("Failed to create kernel socket\n"); + return rc; + } + tipc_sk_create(&init_net, *res, 0, 1); + + sk = (*res)->sk; + + return 0; +} + +/** + * tipc_sock_release_local - release socket created by tipc_sock_create_local + * @sock: the socket to be released. + * + * Module reference count is not incremented when such sockets are created, + * so we must keep it from being decremented when they are released. + */ +void tipc_sock_release_local(struct socket *sock) +{ + release(sock); + sock->ops = NULL; + sock_release(sock); +} + +/** + * tipc_sock_accept_local - accept a connection on a socket created + * with tipc_sock_create_local. Use this function to avoid that + * module reference count is inadvertently incremented. + * + * @sock: the accepting socket + * @newsock: reference to the new socket to be created + * @flags: socket flags + */ + +int tipc_sock_accept_local(struct socket *sock, struct socket **newsock, + int flags) +{ + struct sock *sk = sock->sk; + int ret; + + ret = sock_create_lite(sk->sk_family, sk->sk_type, + sk->sk_protocol, newsock); + if (ret < 0) + return ret; + + ret = accept(sock, *newsock, flags); + if (ret < 0) { + sock_release(*newsock); + return ret; + } + (*newsock)->ops = sock->ops; + return ret; +} + +/** * release - destroy a TIPC socket * @sock: socket to destroy * @@ -324,7 +402,9 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len) else if (addr->addrtype != TIPC_ADDR_NAMESEQ) return -EAFNOSUPPORT; - if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES) + if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) && + (addr->addr.nameseq.type != TIPC_TOP_SRV) && + (addr->addr.nameseq.type != TIPC_CFG_SRV)) return -EACCES; return (addr->scope > 0) ? @@ -519,8 +599,7 @@ static int send_msg(struct kiocb *iocb, struct socket *sock, res = -EISCONN; goto exit; } - if ((tport->published) || - ((sock->type == SOCK_STREAM) && (total_len != 0))) { + if (tport->published) { res = -EOPNOTSUPP; goto exit; } @@ -810,7 +889,7 @@ static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg) * Returns 0 if successful, otherwise errno */ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg, - struct tipc_port *tport) + struct tipc_port *tport) { u32 anc_data[3]; u32 err; @@ -1011,8 +1090,7 @@ static int recv_stream(struct kiocb *iocb, struct socket *sock, lock_sock(sk); - if (unlikely((sock->state == SS_UNCONNECTED) || - (sock->state == SS_CONNECTING))) { + if (unlikely((sock->state == SS_UNCONNECTED))) { res = -ENOTCONN; goto exit; } @@ -1233,10 +1311,10 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) * For all connectionless messages, by default new queue limits are * as belows: * - * TIPC_LOW_IMPORTANCE (5MB) - * TIPC_MEDIUM_IMPORTANCE (10MB) - * TIPC_HIGH_IMPORTANCE (20MB) - * TIPC_CRITICAL_IMPORTANCE (40MB) + * TIPC_LOW_IMPORTANCE (4 MB) + * TIPC_MEDIUM_IMPORTANCE (8 MB) + * TIPC_HIGH_IMPORTANCE (16 MB) + * TIPC_CRITICAL_IMPORTANCE (32 MB) * * Returns overload limit according to corresponding message importance */ @@ -1246,9 +1324,10 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) unsigned int limit; if (msg_connected(msg)) - limit = CONN_OVERLOAD_LIMIT; + limit = sysctl_tipc_rmem[2]; else - limit = sk->sk_rcvbuf << (msg_importance(msg) + 5); + limit = sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE << + msg_importance(msg); return limit; } @@ -1327,7 +1406,7 @@ static int backlog_rcv(struct sock *sk, struct sk_buff *buf) */ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf) { - struct sock *sk = (struct sock *)tport->usr_handle; + struct sock *sk = tport->sk; u32 res; /* @@ -1358,7 +1437,7 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf) */ static void wakeupdispatch(struct tipc_port *tport) { - struct sock *sk = (struct sock *)tport->usr_handle; + struct sock *sk = tport->sk; sk->sk_write_space(sk); } @@ -1531,7 +1610,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags) buf = skb_peek(&sk->sk_receive_queue); - res = tipc_create(sock_net(sock->sk), new_sock, 0, 0); + res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1); if (res) goto exit; @@ -1657,8 +1736,8 @@ restart: * * Returns 0 on success, errno otherwise */ -static int setsockopt(struct socket *sock, - int lvl, int opt, char __user *ov, unsigned int ol) +static int setsockopt(struct socket *sock, int lvl, int opt, char __user *ov, + unsigned int ol) { struct sock *sk = sock->sk; struct tipc_port *tport = tipc_sk_port(sk); @@ -1716,8 +1795,8 @@ static int setsockopt(struct socket *sock, * * Returns 0 on success, errno otherwise */ -static int getsockopt(struct socket *sock, - int lvl, int opt, char __user *ov, int __user *ol) +static int getsockopt(struct socket *sock, int lvl, int opt, char __user *ov, + int __user *ol) { struct sock *sk = sock->sk; struct tipc_port *tport = tipc_sk_port(sk); @@ -1841,13 +1920,20 @@ static const struct proto_ops stream_ops = { static const struct net_proto_family tipc_family_ops = { .owner = THIS_MODULE, .family = AF_TIPC, - .create = tipc_create + .create = tipc_sk_create }; static struct proto tipc_proto = { .name = "TIPC", .owner = THIS_MODULE, - .obj_size = sizeof(struct tipc_sock) + .obj_size = sizeof(struct tipc_sock), + .sysctl_rmem = sysctl_tipc_rmem +}; + +static struct proto tipc_proto_kern = { + .name = "TIPC", + .obj_size = sizeof(struct tipc_sock), + .sysctl_rmem = sysctl_tipc_rmem }; /** diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 6b42d47029a..d38bb45d82e 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -2,7 +2,7 @@ * net/tipc/subscr.c: TIPC network topology service * * Copyright (c) 2000-2006, Ericsson AB - * Copyright (c) 2005-2007, 2010-2011, Wind River Systems + * Copyright (c) 2005-2007, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,33 +41,42 @@ /** * struct tipc_subscriber - TIPC network topology subscriber - * @port_ref: object reference to server port connecting to subscriber - * @lock: pointer to spinlock controlling access to subscriber's server port - * @subscriber_list: adjacent subscribers in top. server's list of subscribers + * @conid: connection identifier to server connecting to subscriber + * @lock: controll access to subscriber * @subscription_list: list of subscription objects for this subscriber */ struct tipc_subscriber { - u32 port_ref; - spinlock_t *lock; - struct list_head subscriber_list; + int conid; + spinlock_t lock; struct list_head subscription_list; }; -/** - * struct top_srv - TIPC network topology subscription service - * @setup_port: reference to TIPC port that handles subscription requests - * @subscription_count: number of active subscriptions (not subscribers!) - * @subscriber_list: list of ports subscribing to service - * @lock: spinlock govering access to subscriber list - */ -struct top_srv { - u32 setup_port; - atomic_t subscription_count; - struct list_head subscriber_list; - spinlock_t lock; +static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr, + void *usr_data, void *buf, size_t len); +static void *subscr_named_msg_event(int conid); +static void subscr_conn_shutdown_event(int conid, void *usr_data); + +static atomic_t subscription_count = ATOMIC_INIT(0); + +static struct sockaddr_tipc topsrv_addr __read_mostly = { + .family = AF_TIPC, + .addrtype = TIPC_ADDR_NAMESEQ, + .addr.nameseq.type = TIPC_TOP_SRV, + .addr.nameseq.lower = TIPC_TOP_SRV, + .addr.nameseq.upper = TIPC_TOP_SRV, + .scope = TIPC_NODE_SCOPE }; -static struct top_srv topsrv; +static struct tipc_server topsrv __read_mostly = { + .saddr = &topsrv_addr, + .imp = TIPC_CRITICAL_IMPORTANCE, + .type = SOCK_SEQPACKET, + .max_rcvbuf_size = sizeof(struct tipc_subscr), + .name = "topology_server", + .tipc_conn_recvmsg = subscr_conn_msg_event, + .tipc_conn_new = subscr_named_msg_event, + .tipc_conn_shutdown = subscr_conn_shutdown_event, +}; /** * htohl - convert value to endianness used by destination @@ -81,20 +90,13 @@ static u32 htohl(u32 in, int swap) return swap ? swab32(in) : in; } -/** - * subscr_send_event - send a message containing a tipc_event to the subscriber - * - * Note: Must not hold subscriber's server port lock, since tipc_send() will - * try to take the lock if the message is rejected and returned! - */ -static void subscr_send_event(struct tipc_subscription *sub, - u32 found_lower, - u32 found_upper, - u32 event, - u32 port_ref, +static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower, + u32 found_upper, u32 event, u32 port_ref, u32 node) { - struct iovec msg_sect; + struct tipc_subscriber *subscriber = sub->subscriber; + struct kvec msg_sect; + int ret; msg_sect.iov_base = (void *)&sub->evt; msg_sect.iov_len = sizeof(struct tipc_event); @@ -104,7 +106,10 @@ static void subscr_send_event(struct tipc_subscription *sub, sub->evt.found_upper = htohl(found_upper, sub->swap); sub->evt.port.ref = htohl(port_ref, sub->swap); sub->evt.port.node = htohl(node, sub->swap); - tipc_send(sub->server_ref, 1, &msg_sect, msg_sect.iov_len); + ret = tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL, + msg_sect.iov_base, msg_sect.iov_len); + if (ret < 0) + pr_err("Sending subscription event failed, no memory\n"); } /** @@ -112,10 +117,8 @@ static void subscr_send_event(struct tipc_subscription *sub, * * Returns 1 if there is overlap, otherwise 0. */ -int tipc_subscr_overlap(struct tipc_subscription *sub, - u32 found_lower, +int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower, u32 found_upper) - { if (found_lower < sub->seq.lower) found_lower = sub->seq.lower; @@ -131,13 +134,9 @@ int tipc_subscr_overlap(struct tipc_subscription *sub, * * Protected by nameseq.lock in name_table.c */ -void tipc_subscr_report_overlap(struct tipc_subscription *sub, - u32 found_lower, - u32 found_upper, - u32 event, - u32 port_ref, - u32 node, - int must) +void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower, + u32 found_upper, u32 event, u32 port_ref, + u32 node, int must) { if (!tipc_subscr_overlap(sub, found_lower, found_upper)) return; @@ -147,21 +146,24 @@ void tipc_subscr_report_overlap(struct tipc_subscription *sub, subscr_send_event(sub, found_lower, found_upper, event, port_ref, node); } -/** - * subscr_timeout - subscription timeout has occurred - */ static void subscr_timeout(struct tipc_subscription *sub) { - struct tipc_port *server_port; + struct tipc_subscriber *subscriber = sub->subscriber; + + /* The spin lock per subscriber is used to protect its members */ + spin_lock_bh(&subscriber->lock); - /* Validate server port reference (in case subscriber is terminating) */ - server_port = tipc_port_lock(sub->server_ref); - if (server_port == NULL) + /* Validate if the connection related to the subscriber is + * closed (in case subscriber is terminating) + */ + if (subscriber->conid == 0) { + spin_unlock_bh(&subscriber->lock); return; + } /* Validate timeout (in case subscription is being cancelled) */ if (sub->timeout == TIPC_WAIT_FOREVER) { - tipc_port_unlock(server_port); + spin_unlock_bh(&subscriber->lock); return; } @@ -171,8 +173,7 @@ static void subscr_timeout(struct tipc_subscription *sub) /* Unlink subscription from subscriber */ list_del(&sub->subscription_list); - /* Release subscriber's server port */ - tipc_port_unlock(server_port); + spin_unlock_bh(&subscriber->lock); /* Notify subscriber of timeout */ subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, @@ -181,64 +182,54 @@ static void subscr_timeout(struct tipc_subscription *sub) /* Now destroy subscription */ k_term_timer(&sub->timer); kfree(sub); - atomic_dec(&topsrv.subscription_count); + atomic_dec(&subscription_count); } /** * subscr_del - delete a subscription within a subscription list * - * Called with subscriber port locked. + * Called with subscriber lock held. */ static void subscr_del(struct tipc_subscription *sub) { tipc_nametbl_unsubscribe(sub); list_del(&sub->subscription_list); kfree(sub); - atomic_dec(&topsrv.subscription_count); + atomic_dec(&subscription_count); } /** * subscr_terminate - terminate communication with a subscriber * - * Called with subscriber port locked. Routine must temporarily release lock - * to enable subscription timeout routine(s) to finish without deadlocking; - * the lock is then reclaimed to allow caller to release it upon return. - * (This should work even in the unlikely event some other thread creates - * a new object reference in the interim that uses this lock; this routine will - * simply wait for it to be released, then claim it.) + * Note: Must call it in process context since it might sleep. */ static void subscr_terminate(struct tipc_subscriber *subscriber) { - u32 port_ref; + tipc_conn_terminate(&topsrv, subscriber->conid); +} + +static void subscr_release(struct tipc_subscriber *subscriber) +{ struct tipc_subscription *sub; struct tipc_subscription *sub_temp; - /* Invalidate subscriber reference */ - port_ref = subscriber->port_ref; - subscriber->port_ref = 0; - spin_unlock_bh(subscriber->lock); + spin_lock_bh(&subscriber->lock); - /* Sever connection to subscriber */ - tipc_shutdown(port_ref); - tipc_deleteport(port_ref); + /* Invalidate subscriber reference */ + subscriber->conid = 0; /* Destroy any existing subscriptions for subscriber */ list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, subscription_list) { if (sub->timeout != TIPC_WAIT_FOREVER) { + spin_unlock_bh(&subscriber->lock); k_cancel_timer(&sub->timer); k_term_timer(&sub->timer); + spin_lock_bh(&subscriber->lock); } subscr_del(sub); } - - /* Remove subscriber from topology server's subscriber list */ - spin_lock_bh(&topsrv.lock); - list_del(&subscriber->subscriber_list); - spin_unlock_bh(&topsrv.lock); - - /* Reclaim subscriber lock */ - spin_lock_bh(subscriber->lock); + spin_unlock_bh(&subscriber->lock); /* Now destroy subscriber */ kfree(subscriber); @@ -247,7 +238,7 @@ static void subscr_terminate(struct tipc_subscriber *subscriber) /** * subscr_cancel - handle subscription cancellation request * - * Called with subscriber port locked. Routine must temporarily release lock + * Called with subscriber lock held. Routine must temporarily release lock * to enable the subscription timeout routine to finish without deadlocking; * the lock is then reclaimed to allow caller to release it upon return. * @@ -274,10 +265,10 @@ static void subscr_cancel(struct tipc_subscr *s, /* Cancel subscription timer (if used), then delete subscription */ if (sub->timeout != TIPC_WAIT_FOREVER) { sub->timeout = TIPC_WAIT_FOREVER; - spin_unlock_bh(subscriber->lock); + spin_unlock_bh(&subscriber->lock); k_cancel_timer(&sub->timer); k_term_timer(&sub->timer); - spin_lock_bh(subscriber->lock); + spin_lock_bh(&subscriber->lock); } subscr_del(sub); } @@ -285,7 +276,7 @@ static void subscr_cancel(struct tipc_subscr *s, /** * subscr_subscribe - create subscription for subscriber * - * Called with subscriber port locked. + * Called with subscriber lock held. */ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, struct tipc_subscriber *subscriber) @@ -304,7 +295,7 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, } /* Refuse subscription if global limit exceeded */ - if (atomic_read(&topsrv.subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { + if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { pr_warn("Subscription rejected, limit reached (%u)\n", TIPC_MAX_SUBSCRIPTIONS); subscr_terminate(subscriber); @@ -335,10 +326,10 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, } INIT_LIST_HEAD(&sub->nameseq_list); list_add(&sub->subscription_list, &subscriber->subscription_list); - sub->server_ref = subscriber->port_ref; + sub->subscriber = subscriber; sub->swap = swap; memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); - atomic_inc(&topsrv.subscription_count); + atomic_inc(&subscription_count); if (sub->timeout != TIPC_WAIT_FOREVER) { k_init_timer(&sub->timer, (Handler)subscr_timeout, (unsigned long)sub); @@ -348,196 +339,51 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, return sub; } -/** - * subscr_conn_shutdown_event - handle termination request from subscriber - * - * Called with subscriber's server port unlocked. - */ -static void subscr_conn_shutdown_event(void *usr_handle, - u32 port_ref, - struct sk_buff **buf, - unsigned char const *data, - unsigned int size, - int reason) +/* Handle one termination request for the subscriber */ +static void subscr_conn_shutdown_event(int conid, void *usr_data) { - struct tipc_subscriber *subscriber = usr_handle; - spinlock_t *subscriber_lock; - - if (tipc_port_lock(port_ref) == NULL) - return; - - subscriber_lock = subscriber->lock; - subscr_terminate(subscriber); - spin_unlock_bh(subscriber_lock); + subscr_release((struct tipc_subscriber *)usr_data); } -/** - * subscr_conn_msg_event - handle new subscription request from subscriber - * - * Called with subscriber's server port unlocked. - */ -static void subscr_conn_msg_event(void *usr_handle, - u32 port_ref, - struct sk_buff **buf, - const unchar *data, - u32 size) +/* Handle one request to create a new subscription for the subscriber */ +static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr, + void *usr_data, void *buf, size_t len) { - struct tipc_subscriber *subscriber = usr_handle; - spinlock_t *subscriber_lock; + struct tipc_subscriber *subscriber = usr_data; struct tipc_subscription *sub; - /* - * Lock subscriber's server port (& make a local copy of lock pointer, - * in case subscriber is deleted while processing subscription request) - */ - if (tipc_port_lock(port_ref) == NULL) - return; - - subscriber_lock = subscriber->lock; - - if (size != sizeof(struct tipc_subscr)) { - subscr_terminate(subscriber); - spin_unlock_bh(subscriber_lock); - } else { - sub = subscr_subscribe((struct tipc_subscr *)data, subscriber); - spin_unlock_bh(subscriber_lock); - if (sub != NULL) { - - /* - * We must release the server port lock before adding a - * subscription to the name table since TIPC needs to be - * able to (re)acquire the port lock if an event message - * issued by the subscription process is rejected and - * returned. The subscription cannot be deleted while - * it is being added to the name table because: - * a) the single-threading of the native API port code - * ensures the subscription cannot be cancelled and - * the subscriber connection cannot be broken, and - * b) the name table lock ensures the subscription - * timeout code cannot delete the subscription, - * so the subscription object is still protected. - */ - tipc_nametbl_subscribe(sub); - } - } + spin_lock_bh(&subscriber->lock); + sub = subscr_subscribe((struct tipc_subscr *)buf, subscriber); + if (sub) + tipc_nametbl_subscribe(sub); + spin_unlock_bh(&subscriber->lock); } -/** - * subscr_named_msg_event - handle request to establish a new subscriber - */ -static void subscr_named_msg_event(void *usr_handle, - u32 port_ref, - struct sk_buff **buf, - const unchar *data, - u32 size, - u32 importance, - struct tipc_portid const *orig, - struct tipc_name_seq const *dest) + +/* Handle one request to establish a new subscriber */ +static void *subscr_named_msg_event(int conid) { struct tipc_subscriber *subscriber; - u32 server_port_ref; /* Create subscriber object */ subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC); if (subscriber == NULL) { pr_warn("Subscriber rejected, no memory\n"); - return; + return NULL; } INIT_LIST_HEAD(&subscriber->subscription_list); - INIT_LIST_HEAD(&subscriber->subscriber_list); - - /* Create server port & establish connection to subscriber */ - tipc_createport(subscriber, - importance, - NULL, - NULL, - subscr_conn_shutdown_event, - NULL, - NULL, - subscr_conn_msg_event, - NULL, - &subscriber->port_ref); - if (subscriber->port_ref == 0) { - pr_warn("Subscriber rejected, unable to create port\n"); - kfree(subscriber); - return; - } - tipc_connect(subscriber->port_ref, orig); - - /* Lock server port (& save lock address for future use) */ - subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock; - - /* Add subscriber to topology server's subscriber list */ - spin_lock_bh(&topsrv.lock); - list_add(&subscriber->subscriber_list, &topsrv.subscriber_list); - spin_unlock_bh(&topsrv.lock); - - /* Unlock server port */ - server_port_ref = subscriber->port_ref; - spin_unlock_bh(subscriber->lock); - - /* Send an ACK- to complete connection handshaking */ - tipc_send(server_port_ref, 0, NULL, 0); + subscriber->conid = conid; + spin_lock_init(&subscriber->lock); - /* Handle optional subscription request */ - if (size != 0) { - subscr_conn_msg_event(subscriber, server_port_ref, - buf, data, size); - } + return (void *)subscriber; } int tipc_subscr_start(void) { - struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV}; - int res; - - spin_lock_init(&topsrv.lock); - INIT_LIST_HEAD(&topsrv.subscriber_list); - - res = tipc_createport(NULL, - TIPC_CRITICAL_IMPORTANCE, - NULL, - NULL, - NULL, - NULL, - subscr_named_msg_event, - NULL, - NULL, - &topsrv.setup_port); - if (res) - goto failed; - - res = tipc_publish(topsrv.setup_port, TIPC_NODE_SCOPE, &seq); - if (res) { - tipc_deleteport(topsrv.setup_port); - topsrv.setup_port = 0; - goto failed; - } - - return 0; - -failed: - pr_err("Failed to create subscription service\n"); - return res; + return tipc_server_start(&topsrv); } void tipc_subscr_stop(void) { - struct tipc_subscriber *subscriber; - struct tipc_subscriber *subscriber_temp; - spinlock_t *subscriber_lock; - - if (topsrv.setup_port) { - tipc_deleteport(topsrv.setup_port); - topsrv.setup_port = 0; - - list_for_each_entry_safe(subscriber, subscriber_temp, - &topsrv.subscriber_list, - subscriber_list) { - subscriber_lock = subscriber->lock; - spin_lock_bh(subscriber_lock); - subscr_terminate(subscriber); - spin_unlock_bh(subscriber_lock); - } - } + tipc_server_stop(&topsrv); } diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 218d2e07f0c..393e417bee3 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -2,7 +2,7 @@ * net/tipc/subscr.h: Include file for TIPC network topology service * * Copyright (c) 2003-2006, Ericsson AB - * Copyright (c) 2005-2007, Wind River Systems + * Copyright (c) 2005-2007, 2012-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,10 +37,14 @@ #ifndef _TIPC_SUBSCR_H #define _TIPC_SUBSCR_H +#include "server.h" + struct tipc_subscription; +struct tipc_subscriber; /** * struct tipc_subscription - TIPC network topology subscription object + * @subscriber: pointer to its subscriber * @seq: name sequence associated with subscription * @timeout: duration of subscription (in ms) * @filter: event filtering to be done for subscription @@ -52,28 +56,23 @@ struct tipc_subscription; * @evt: template for events generated by subscription */ struct tipc_subscription { + struct tipc_subscriber *subscriber; struct tipc_name_seq seq; u32 timeout; u32 filter; struct timer_list timer; struct list_head nameseq_list; struct list_head subscription_list; - u32 server_ref; int swap; struct tipc_event evt; }; -int tipc_subscr_overlap(struct tipc_subscription *sub, - u32 found_lower, +int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower, u32 found_upper); -void tipc_subscr_report_overlap(struct tipc_subscription *sub, - u32 found_lower, - u32 found_upper, - u32 event, - u32 port_ref, - u32 node, - int must_report); +void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower, + u32 found_upper, u32 event, u32 port_ref, + u32 node, int must); int tipc_subscr_start(void); diff --git a/net/tipc/sysctl.c b/net/tipc/sysctl.c new file mode 100644 index 00000000000..f3fef93325a --- /dev/null +++ b/net/tipc/sysctl.c @@ -0,0 +1,64 @@ +/* + * net/tipc/sysctl.c: sysctl interface to TIPC subsystem + * + * Copyright (c) 2013, Wind River Systems + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "core.h" + +#include <linux/sysctl.h> + +static struct ctl_table_header *tipc_ctl_hdr; + +static struct ctl_table tipc_table[] = { + { + .procname = "tipc_rmem", + .data = &sysctl_tipc_rmem, + .maxlen = sizeof(sysctl_tipc_rmem), + .mode = 0644, + .proc_handler = proc_dointvec, + }, + {} +}; + +int tipc_register_sysctl(void) +{ + tipc_ctl_hdr = register_net_sysctl(&init_net, "net/tipc", tipc_table); + if (tipc_ctl_hdr == NULL) + return -ENOMEM; + return 0; +} + +void tipc_unregister_sysctl(void) +{ + unregister_net_sysctl_table(tipc_ctl_hdr); +} |