summaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/Makefile3
-rw-r--r--net/tipc/bcast.c3
-rw-r--r--net/tipc/bcast.h3
-rw-r--r--net/tipc/config.c119
-rw-r--r--net/tipc/core.c22
-rw-r--r--net/tipc/core.h17
-rw-r--r--net/tipc/discover.c7
-rw-r--r--net/tipc/eth_media.c19
-rw-r--r--net/tipc/ib_media.c25
-rw-r--r--net/tipc/link.c88
-rw-r--r--net/tipc/msg.c19
-rw-r--r--net/tipc/msg.h8
-rw-r--r--net/tipc/name_table.c10
-rw-r--r--net/tipc/name_table.h11
-rw-r--r--net/tipc/node_subscr.c2
-rw-r--r--net/tipc/port.c320
-rw-r--r--net/tipc/port.h85
-rw-r--r--net/tipc/server.c605
-rw-r--r--net/tipc/server.h94
-rw-r--r--net/tipc/socket.c146
-rw-r--r--net/tipc/subscr.c348
-rw-r--r--net/tipc/subscr.h21
-rw-r--r--net/tipc/sysctl.c64
23 files changed, 1154 insertions, 885 deletions
diff --git a/net/tipc/Makefile b/net/tipc/Makefile
index 4df8e02d900..b282f7130d2 100644
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -8,6 +8,7 @@ tipc-y += addr.o bcast.o bearer.o config.o \
core.o handler.o link.o discover.o msg.o \
name_distr.o subscr.o name_table.o net.o \
netlink.o node.o node_subscr.o port.o ref.o \
- socket.o log.o eth_media.o
+ socket.o log.o eth_media.o server.o
tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o
+tipc-$(CONFIG_SYSCTL) += sysctl.o
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index e5f3da50782..716de1ac6cb 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -578,8 +578,7 @@ u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
* Returns 0 (packet sent successfully) under all circumstances,
* since the broadcast link's pseudo-bearer never blocks
*/
-static int tipc_bcbearer_send(struct sk_buff *buf,
- struct tipc_bearer *unused1,
+static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
struct tipc_media_addr *unused2)
{
int bp_index;
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index a93306557e0..6ee587b469f 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -75,7 +75,8 @@ void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
/**
* tipc_nmap_equal - test for equality of node maps
*/
-static inline int tipc_nmap_equal(struct tipc_node_map *nm_a, struct tipc_node_map *nm_b)
+static inline int tipc_nmap_equal(struct tipc_node_map *nm_a,
+ struct tipc_node_map *nm_b)
{
return !memcmp(nm_a, nm_b, sizeof(*nm_a));
}
diff --git a/net/tipc/config.c b/net/tipc/config.c
index f67866c765d..c301a9a592d 100644
--- a/net/tipc/config.c
+++ b/net/tipc/config.c
@@ -2,7 +2,7 @@
* net/tipc/config.c: TIPC configuration management code
*
* Copyright (c) 2002-2006, Ericsson AB
- * Copyright (c) 2004-2007, 2010-2012, Wind River Systems
+ * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -38,12 +38,12 @@
#include "port.h"
#include "name_table.h"
#include "config.h"
+#include "server.h"
#define REPLY_TRUNCATED "<truncated>\n"
-static u32 config_port_ref;
-
-static DEFINE_SPINLOCK(config_lock);
+static DEFINE_MUTEX(config_mutex);
+static struct tipc_server cfgsrv;
static const void *req_tlv_area; /* request message TLV area */
static int req_tlv_space; /* request message TLV area size */
@@ -181,18 +181,7 @@ static struct sk_buff *cfg_set_own_addr(void)
if (tipc_own_addr)
return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
" (cannot change node address once assigned)");
-
- /*
- * Must temporarily release configuration spinlock while switching into
- * networking mode as it calls tipc_eth_media_start(), which may sleep.
- * Releasing the lock is harmless as other locally-issued configuration
- * commands won't occur until this one completes, and remotely-issued
- * configuration commands can't be received until a local configuration
- * command to enable the first bearer is received and processed.
- */
- spin_unlock_bh(&config_lock);
tipc_core_start_net(addr);
- spin_lock_bh(&config_lock);
return tipc_cfg_reply_none();
}
@@ -248,7 +237,7 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
{
struct sk_buff *rep_tlv_buf;
- spin_lock_bh(&config_lock);
+ mutex_lock(&config_mutex);
/* Save request and reply details in a well-known location */
req_tlv_area = request_area;
@@ -377,37 +366,31 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
/* Return reply buffer */
exit:
- spin_unlock_bh(&config_lock);
+ mutex_unlock(&config_mutex);
return rep_tlv_buf;
}
-static void cfg_named_msg_event(void *userdata,
- u32 port_ref,
- struct sk_buff **buf,
- const unchar *msg,
- u32 size,
- u32 importance,
- struct tipc_portid const *orig,
- struct tipc_name_seq const *dest)
+static void cfg_conn_msg_event(int conid, struct sockaddr_tipc *addr,
+ void *usr_data, void *buf, size_t len)
{
struct tipc_cfg_msg_hdr *req_hdr;
struct tipc_cfg_msg_hdr *rep_hdr;
struct sk_buff *rep_buf;
+ int ret;
/* Validate configuration message header (ignore invalid message) */
- req_hdr = (struct tipc_cfg_msg_hdr *)msg;
- if ((size < sizeof(*req_hdr)) ||
- (size != TCM_ALIGN(ntohl(req_hdr->tcm_len))) ||
+ req_hdr = (struct tipc_cfg_msg_hdr *)buf;
+ if ((len < sizeof(*req_hdr)) ||
+ (len != TCM_ALIGN(ntohl(req_hdr->tcm_len))) ||
(ntohs(req_hdr->tcm_flags) != TCM_F_REQUEST)) {
pr_warn("Invalid configuration message discarded\n");
return;
}
/* Generate reply for request (if can't, return request) */
- rep_buf = tipc_cfg_do_cmd(orig->node,
- ntohs(req_hdr->tcm_type),
- msg + sizeof(*req_hdr),
- size - sizeof(*req_hdr),
+ rep_buf = tipc_cfg_do_cmd(addr->addr.id.node, ntohs(req_hdr->tcm_type),
+ buf + sizeof(*req_hdr),
+ len - sizeof(*req_hdr),
BUF_HEADROOM + MAX_H_SIZE + sizeof(*rep_hdr));
if (rep_buf) {
skb_push(rep_buf, sizeof(*rep_hdr));
@@ -415,57 +398,51 @@ static void cfg_named_msg_event(void *userdata,
memcpy(rep_hdr, req_hdr, sizeof(*rep_hdr));
rep_hdr->tcm_len = htonl(rep_buf->len);
rep_hdr->tcm_flags &= htons(~TCM_F_REQUEST);
- } else {
- rep_buf = *buf;
- *buf = NULL;
- }
- /* NEED TO ADD CODE TO HANDLE FAILED SEND (SUCH AS CONGESTION) */
- tipc_send_buf2port(port_ref, orig, rep_buf, rep_buf->len);
+ ret = tipc_conn_sendmsg(&cfgsrv, conid, addr, rep_buf->data,
+ rep_buf->len);
+ if (ret < 0)
+ pr_err("Sending cfg reply message failed, no memory\n");
+
+ kfree_skb(rep_buf);
+ }
}
+static struct sockaddr_tipc cfgsrv_addr __read_mostly = {
+ .family = AF_TIPC,
+ .addrtype = TIPC_ADDR_NAMESEQ,
+ .addr.nameseq.type = TIPC_CFG_SRV,
+ .addr.nameseq.lower = 0,
+ .addr.nameseq.upper = 0,
+ .scope = TIPC_ZONE_SCOPE
+};
+
+static struct tipc_server cfgsrv __read_mostly = {
+ .saddr = &cfgsrv_addr,
+ .imp = TIPC_CRITICAL_IMPORTANCE,
+ .type = SOCK_RDM,
+ .max_rcvbuf_size = 64 * 1024,
+ .name = "cfg_server",
+ .tipc_conn_recvmsg = cfg_conn_msg_event,
+ .tipc_conn_new = NULL,
+ .tipc_conn_shutdown = NULL
+};
+
int tipc_cfg_init(void)
{
- struct tipc_name_seq seq;
- int res;
-
- res = tipc_createport(NULL, TIPC_CRITICAL_IMPORTANCE,
- NULL, NULL, NULL,
- NULL, cfg_named_msg_event, NULL,
- NULL, &config_port_ref);
- if (res)
- goto failed;
-
- seq.type = TIPC_CFG_SRV;
- seq.lower = seq.upper = tipc_own_addr;
- res = tipc_publish(config_port_ref, TIPC_ZONE_SCOPE, &seq);
- if (res)
- goto failed;
-
- return 0;
-
-failed:
- pr_err("Unable to create configuration service\n");
- return res;
+ return tipc_server_start(&cfgsrv);
}
void tipc_cfg_reinit(void)
{
- struct tipc_name_seq seq;
- int res;
-
- seq.type = TIPC_CFG_SRV;
- seq.lower = seq.upper = 0;
- tipc_withdraw(config_port_ref, TIPC_ZONE_SCOPE, &seq);
+ tipc_server_stop(&cfgsrv);
- seq.lower = seq.upper = tipc_own_addr;
- res = tipc_publish(config_port_ref, TIPC_ZONE_SCOPE, &seq);
- if (res)
- pr_err("Unable to reinitialize configuration service\n");
+ cfgsrv_addr.addr.nameseq.lower = tipc_own_addr;
+ cfgsrv_addr.addr.nameseq.upper = tipc_own_addr;
+ tipc_server_start(&cfgsrv);
}
void tipc_cfg_stop(void)
{
- tipc_deleteport(config_port_ref);
- config_port_ref = 0;
+ tipc_server_stop(&cfgsrv);
}
diff --git a/net/tipc/core.c b/net/tipc/core.c
index 7ec2c1eb94f..fd4eeeaa972 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -2,7 +2,7 @@
* net/tipc/core.c: TIPC module code
*
* Copyright (c) 2003-2006, Ericsson AB
- * Copyright (c) 2005-2006, 2010-2011, Wind River Systems
+ * Copyright (c) 2005-2006, 2010-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -39,6 +39,7 @@
#include "name_table.h"
#include "subscr.h"
#include "config.h"
+#include "port.h"
#include <linux/module.h>
@@ -50,7 +51,7 @@ u32 tipc_own_addr __read_mostly;
int tipc_max_ports __read_mostly;
int tipc_net_id __read_mostly;
int tipc_remote_management __read_mostly;
-
+int sysctl_tipc_rmem[3] __read_mostly; /* min/default/max */
/**
* tipc_buf_acquire - creates a TIPC message buffer
@@ -118,6 +119,7 @@ static void tipc_core_stop(void)
tipc_nametbl_stop();
tipc_ref_table_stop();
tipc_socket_stop();
+ tipc_unregister_sysctl();
}
/**
@@ -135,20 +137,21 @@ static int tipc_core_start(void)
if (!res)
res = tipc_nametbl_init();
if (!res)
- res = tipc_subscr_start();
- if (!res)
- res = tipc_cfg_init();
- if (!res)
res = tipc_netlink_start();
if (!res)
res = tipc_socket_init();
+ if (!res)
+ res = tipc_register_sysctl();
+ if (!res)
+ res = tipc_subscr_start();
+ if (!res)
+ res = tipc_cfg_init();
if (res)
tipc_core_stop();
return res;
}
-
static int __init tipc_init(void)
{
int res;
@@ -160,6 +163,11 @@ static int __init tipc_init(void)
tipc_max_ports = CONFIG_TIPC_PORTS;
tipc_net_id = 4711;
+ sysctl_tipc_rmem[0] = CONN_OVERLOAD_LIMIT >> 4 << TIPC_LOW_IMPORTANCE;
+ sysctl_tipc_rmem[1] = CONN_OVERLOAD_LIMIT >> 4 <<
+ TIPC_CRITICAL_IMPORTANCE;
+ sysctl_tipc_rmem[2] = CONN_OVERLOAD_LIMIT;
+
res = tipc_core_start();
if (res)
pr_err("Unable to start in single node mode\n");
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 0207db04179..be72f8cebc5 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -1,8 +1,8 @@
/*
* net/tipc/core.h: Include file for TIPC global declarations
*
- * Copyright (c) 2005-2006, Ericsson AB
- * Copyright (c) 2005-2007, 2010-2011, Wind River Systems
+ * Copyright (c) 2005-2006, 2013 Ericsson AB
+ * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -80,6 +80,7 @@ extern u32 tipc_own_addr __read_mostly;
extern int tipc_max_ports __read_mostly;
extern int tipc_net_id __read_mostly;
extern int tipc_remote_management __read_mostly;
+extern int sysctl_tipc_rmem[3] __read_mostly;
/*
* Other global variables
@@ -96,6 +97,18 @@ extern int tipc_netlink_start(void);
extern void tipc_netlink_stop(void);
extern int tipc_socket_init(void);
extern void tipc_socket_stop(void);
+extern int tipc_sock_create_local(int type, struct socket **res);
+extern void tipc_sock_release_local(struct socket *sock);
+extern int tipc_sock_accept_local(struct socket *sock,
+ struct socket **newsock, int flags);
+
+#ifdef CONFIG_SYSCTL
+extern int tipc_register_sysctl(void);
+extern void tipc_unregister_sysctl(void);
+#else
+#define tipc_register_sysctl() 0
+#define tipc_unregister_sysctl()
+#endif
/*
* TIPC timer and signal code
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index eedff58d038..ecc758c6eac 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -70,8 +70,7 @@ struct tipc_link_req {
* @dest_domain: network domain of node(s) which should respond to message
* @b_ptr: ptr to bearer issuing message
*/
-static struct sk_buff *tipc_disc_init_msg(u32 type,
- u32 dest_domain,
+static struct sk_buff *tipc_disc_init_msg(u32 type, u32 dest_domain,
struct tipc_bearer *b_ptr)
{
struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE);
@@ -346,8 +345,8 @@ exit:
*
* Returns 0 if successful, otherwise -errno.
*/
-int tipc_disc_create(struct tipc_bearer *b_ptr,
- struct tipc_media_addr *dest, u32 dest_domain)
+int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest,
+ u32 dest_domain)
{
struct tipc_link_req *req;
diff --git a/net/tipc/eth_media.c b/net/tipc/eth_media.c
index 120a676a336..40ea40cf620 100644
--- a/net/tipc/eth_media.c
+++ b/net/tipc/eth_media.c
@@ -62,7 +62,7 @@ static struct eth_bearer eth_bearers[MAX_ETH_BEARERS];
static int eth_started;
static int recv_notification(struct notifier_block *nb, unsigned long evt,
- void *dv);
+ void *dv);
/*
* Network device notifier info
*/
@@ -162,8 +162,7 @@ static void setup_bearer(struct work_struct *work)
*/
static int enable_bearer(struct tipc_bearer *tb_ptr)
{
- struct net_device *dev = NULL;
- struct net_device *pdev = NULL;
+ struct net_device *dev;
struct eth_bearer *eb_ptr = &eth_bearers[0];
struct eth_bearer *stop = &eth_bearers[MAX_ETH_BEARERS];
char *driver_name = strchr((const char *)tb_ptr->name, ':') + 1;
@@ -178,15 +177,7 @@ static int enable_bearer(struct tipc_bearer *tb_ptr)
}
/* Find device with specified name */
- read_lock(&dev_base_lock);
- for_each_netdev(&init_net, pdev) {
- if (!strncmp(pdev->name, driver_name, IFNAMSIZ)) {
- dev = pdev;
- dev_hold(dev);
- break;
- }
- }
- read_unlock(&dev_base_lock);
+ dev = dev_get_by_name(&init_net, driver_name);
if (!dev)
return -ENODEV;
@@ -251,9 +242,9 @@ static void disable_bearer(struct tipc_bearer *tb_ptr)
* specified device.
*/
static int recv_notification(struct notifier_block *nb, unsigned long evt,
- void *dv)
+ void *ptr)
{
- struct net_device *dev = (struct net_device *)dv;
+ struct net_device *dev = netdev_notifier_info_to_dev(ptr);
struct eth_bearer *eb_ptr = &eth_bearers[0];
struct eth_bearer *stop = &eth_bearers[MAX_ETH_BEARERS];
diff --git a/net/tipc/ib_media.c b/net/tipc/ib_media.c
index 2a2864c25e1..9934a32bfa8 100644
--- a/net/tipc/ib_media.c
+++ b/net/tipc/ib_media.c
@@ -155,8 +155,7 @@ static void setup_bearer(struct work_struct *work)
*/
static int enable_bearer(struct tipc_bearer *tb_ptr)
{
- struct net_device *dev = NULL;
- struct net_device *pdev = NULL;
+ struct net_device *dev;
struct ib_bearer *ib_ptr = &ib_bearers[0];
struct ib_bearer *stop = &ib_bearers[MAX_IB_BEARERS];
char *driver_name = strchr((const char *)tb_ptr->name, ':') + 1;
@@ -171,15 +170,7 @@ static int enable_bearer(struct tipc_bearer *tb_ptr)
}
/* Find device with specified name */
- read_lock(&dev_base_lock);
- for_each_netdev(&init_net, pdev) {
- if (!strncmp(pdev->name, driver_name, IFNAMSIZ)) {
- dev = pdev;
- dev_hold(dev);
- break;
- }
- }
- read_unlock(&dev_base_lock);
+ dev = dev_get_by_name(&init_net, driver_name);
if (!dev)
return -ENODEV;
@@ -244,9 +235,9 @@ static void disable_bearer(struct tipc_bearer *tb_ptr)
* specified device.
*/
static int recv_notification(struct notifier_block *nb, unsigned long evt,
- void *dv)
+ void *ptr)
{
- struct net_device *dev = (struct net_device *)dv;
+ struct net_device *dev = netdev_notifier_info_to_dev(ptr);
struct ib_bearer *ib_ptr = &ib_bearers[0];
struct ib_bearer *stop = &ib_bearers[MAX_IB_BEARERS];
@@ -301,13 +292,7 @@ static int ib_addr2str(struct tipc_media_addr *a, char *str_buf, int str_size)
if (str_size < 60) /* 60 = 19 * strlen("xx:") + strlen("xx\0") */
return 1;
- sprintf(str_buf, "%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:"
- "%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x",
- a->value[0], a->value[1], a->value[2], a->value[3],
- a->value[4], a->value[5], a->value[6], a->value[7],
- a->value[8], a->value[9], a->value[10], a->value[11],
- a->value[12], a->value[13], a->value[14], a->value[15],
- a->value[16], a->value[17], a->value[18], a->value[19]);
+ sprintf(str_buf, "%20phC", a->value);
return 0;
}
diff --git a/net/tipc/link.c b/net/tipc/link.c
index a80feee5197..0cc3d9015c5 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -2,7 +2,7 @@
* net/tipc/link.c: TIPC link code
*
* Copyright (c) 1996-2007, 2012, Ericsson AB
- * Copyright (c) 2004-2007, 2010-2011, Wind River Systems
+ * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -41,6 +41,8 @@
#include "discover.h"
#include "config.h"
+#include <linux/pkt_sched.h>
+
/*
* Error message prefixes
*/
@@ -771,8 +773,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
* link_bundle_buf(): Append contents of a buffer to
* the tail of an existing one.
*/
-static int link_bundle_buf(struct tipc_link *l_ptr,
- struct sk_buff *bundler,
+static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler,
struct sk_buff *buf)
{
struct tipc_msg *bundler_msg = buf_msg(bundler);
@@ -1057,40 +1058,6 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
}
/*
- * tipc_send_buf_fast: Entry for data messages where the
- * destination node is known and the header is complete,
- * inclusive total message length.
- * Returns user data length.
- */
-int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
-{
- struct tipc_link *l_ptr;
- struct tipc_node *n_ptr;
- int res;
- u32 selector = msg_origport(buf_msg(buf)) & 1;
- u32 dummy;
-
- read_lock_bh(&tipc_net_lock);
- n_ptr = tipc_node_find(destnode);
- if (likely(n_ptr)) {
- tipc_node_lock(n_ptr);
- l_ptr = n_ptr->active_links[selector];
- if (likely(l_ptr)) {
- res = link_send_buf_fast(l_ptr, buf, &dummy);
- tipc_node_unlock(n_ptr);
- read_unlock_bh(&tipc_net_lock);
- return res;
- }
- tipc_node_unlock(n_ptr);
- }
- read_unlock_bh(&tipc_net_lock);
- res = msg_data_sz(buf_msg(buf));
- tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
- return res;
-}
-
-
-/*
* tipc_link_send_sections_fast: Entry for messages where the
* destination processor is known and the header is complete,
* except for total message length.
@@ -1098,8 +1065,7 @@ int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
*/
int tipc_link_send_sections_fast(struct tipc_port *sender,
struct iovec const *msg_sect,
- const u32 num_sect,
- unsigned int total_len,
+ const u32 num_sect, unsigned int total_len,
u32 destaddr)
{
struct tipc_msg *hdr = &sender->phdr;
@@ -1115,7 +1081,10 @@ again:
* (Must not hold any locks while building message.)
*/
res = tipc_msg_build(hdr, msg_sect, num_sect, total_len,
- sender->max_pkt, !sender->user_port, &buf);
+ sender->max_pkt, &buf);
+ /* Exit if build request was invalid */
+ if (unlikely(res < 0))
+ return res;
read_lock_bh(&tipc_net_lock);
node = tipc_node_find(destaddr);
@@ -1132,10 +1101,6 @@ exit:
return res;
}
- /* Exit if build request was invalid */
- if (unlikely(res < 0))
- goto exit;
-
/* Exit if link (or bearer) is congested */
if (link_congested(l_ptr) ||
tipc_bearer_blocked(l_ptr->b_ptr)) {
@@ -1189,8 +1154,7 @@ exit:
*/
static int link_send_sections_long(struct tipc_port *sender,
struct iovec const *msg_sect,
- u32 num_sect,
- unsigned int total_len,
+ u32 num_sect, unsigned int total_len,
u32 destaddr)
{
struct tipc_link *l_ptr;
@@ -1204,6 +1168,7 @@ static int link_send_sections_long(struct tipc_port *sender,
const unchar *sect_crs;
int curr_sect;
u32 fragm_no;
+ int res = 0;
again:
fragm_no = 1;
@@ -1250,18 +1215,15 @@ again:
else
sz = fragm_rest;
- if (likely(!sender->user_port)) {
- if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
+ if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
+ res = -EFAULT;
error:
- for (; buf_chain; buf_chain = buf) {
- buf = buf_chain->next;
- kfree_skb(buf_chain);
- }
- return -EFAULT;
+ for (; buf_chain; buf_chain = buf) {
+ buf = buf_chain->next;
+ kfree_skb(buf_chain);
}
- } else
- skb_copy_to_linear_data_offset(buf, fragm_crs,
- sect_crs, sz);
+ return res;
+ }
sect_crs += sz;
sect_rest -= sz;
fragm_crs += sz;
@@ -1281,8 +1243,10 @@ error:
msg_set_fragm_no(&fragm_hdr, ++fragm_no);
prev = buf;
buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
- if (!buf)
+ if (!buf) {
+ res = -ENOMEM;
goto error;
+ }
buf->next = NULL;
prev->next = buf;
@@ -1446,7 +1410,7 @@ static void link_reset_all(unsigned long addr)
}
static void link_retransmit_failure(struct tipc_link *l_ptr,
- struct sk_buff *buf)
+ struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
@@ -1901,8 +1865,8 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
* Send protocol message to the other endpoint.
*/
void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
- int probe_msg, u32 gap, u32 tolerance,
- u32 priority, u32 ack_mtu)
+ int probe_msg, u32 gap, u32 tolerance,
+ u32 priority, u32 ack_mtu)
{
struct sk_buff *buf = NULL;
struct tipc_msg *msg = l_ptr->pmsg;
@@ -1988,6 +1952,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
return;
skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
+ buf->priority = TC_PRIO_CONTROL;
/* Defer message if bearer is already blocked */
if (tipc_bearer_blocked(l_ptr->b_ptr)) {
@@ -2145,8 +2110,7 @@ exit:
* another bearer. Owner node is locked.
*/
static void tipc_link_tunnel(struct tipc_link *l_ptr,
- struct tipc_msg *tunnel_hdr,
- struct tipc_msg *msg,
+ struct tipc_msg *tunnel_hdr, struct tipc_msg *msg,
u32 selector)
{
struct tipc_link *tunnel;
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index f2db8a87d9c..ced60e2fc4f 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -51,8 +51,8 @@ u32 tipc_msg_tot_importance(struct tipc_msg *m)
}
-void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type,
- u32 hsize, u32 destnode)
+void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
+ u32 destnode)
{
memset(m, 0, hsize);
msg_set_version(m);
@@ -73,8 +73,8 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type,
* Returns message data size or errno
*/
int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
- u32 num_sect, unsigned int total_len,
- int max_size, int usrmem, struct sk_buff **buf)
+ u32 num_sect, unsigned int total_len, int max_size,
+ struct sk_buff **buf)
{
int dsz, sz, hsz, pos, res, cnt;
@@ -92,14 +92,9 @@ int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
return -ENOMEM;
skb_copy_to_linear_data(*buf, hdr, hsz);
for (res = 1, cnt = 0; res && (cnt < num_sect); cnt++) {
- if (likely(usrmem))
- res = !copy_from_user((*buf)->data + pos,
- msg_sect[cnt].iov_base,
- msg_sect[cnt].iov_len);
- else
- skb_copy_to_linear_data_offset(*buf, pos,
- msg_sect[cnt].iov_base,
- msg_sect[cnt].iov_len);
+ skb_copy_to_linear_data_offset(*buf, pos,
+ msg_sect[cnt].iov_base,
+ msg_sect[cnt].iov_len);
pos += msg_sect[cnt].iov_len;
}
if (likely(res))
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index ba2a72beea6..5e4ccf5c27d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -719,9 +719,9 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
}
u32 tipc_msg_tot_importance(struct tipc_msg *m);
-void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type,
- u32 hsize, u32 destnode);
+void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
+ u32 destnode);
int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
- u32 num_sect, unsigned int total_len,
- int max_size, int usrmem, struct sk_buff **buf);
+ u32 num_sect, unsigned int total_len, int max_size,
+ struct sk_buff **buf);
#endif
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 24b16791431..09dcd54b04e 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -440,7 +440,7 @@ found:
* sequence overlapping with the requested sequence
*/
static void tipc_nameseq_subscribe(struct name_seq *nseq,
- struct tipc_subscription *s)
+ struct tipc_subscription *s)
{
struct sub_seq *sseq = nseq->sseqs;
@@ -662,7 +662,7 @@ exit:
* tipc_nametbl_publish - add name publication to network name tables
*/
struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
- u32 scope, u32 port_ref, u32 key)
+ u32 scope, u32 port_ref, u32 key)
{
struct publication *publ;
@@ -753,7 +753,7 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s)
* subseq_list - print specified sub-sequence contents into the given buffer
*/
static int subseq_list(struct sub_seq *sseq, char *buf, int len, u32 depth,
- u32 index)
+ u32 index)
{
char portIdStr[27];
const char *scope_str[] = {"", " zone", " cluster", " node"};
@@ -792,7 +792,7 @@ static int subseq_list(struct sub_seq *sseq, char *buf, int len, u32 depth,
* nameseq_list - print specified name sequence contents into the given buffer
*/
static int nameseq_list(struct name_seq *seq, char *buf, int len, u32 depth,
- u32 type, u32 lowbound, u32 upbound, u32 index)
+ u32 type, u32 lowbound, u32 upbound, u32 index)
{
struct sub_seq *sseq;
char typearea[11];
@@ -849,7 +849,7 @@ static int nametbl_header(char *buf, int len, u32 depth)
* nametbl_list - print specified name table contents into the given buffer
*/
static int nametbl_list(char *buf, int len, u32 depth_info,
- u32 type, u32 lowbound, u32 upbound)
+ u32 type, u32 lowbound, u32 upbound)
{
struct hlist_head *seq_head;
struct name_seq *seq;
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 71cb4dc712d..f02f48b9a21 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -87,14 +87,15 @@ extern rwlock_t tipc_nametbl_lock;
struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space);
u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *node);
int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit,
- struct tipc_port_list *dports);
+ struct tipc_port_list *dports);
struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
- u32 scope, u32 port_ref, u32 key);
+ u32 scope, u32 port_ref, u32 key);
int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key);
struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper,
- u32 scope, u32 node, u32 ref, u32 key);
-struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower,
- u32 node, u32 ref, u32 key);
+ u32 scope, u32 node, u32 ref,
+ u32 key);
+struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower, u32 node,
+ u32 ref, u32 key);
void tipc_nametbl_subscribe(struct tipc_subscription *s);
void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
int tipc_nametbl_init(void);
diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c
index 5e34b015da4..8a7384c04ad 100644
--- a/net/tipc/node_subscr.c
+++ b/net/tipc/node_subscr.c
@@ -42,7 +42,7 @@
* tipc_nodesub_subscribe - create "node down" subscription for specified node
*/
void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr,
- void *usr_handle, net_ev_handler handle_down)
+ void *usr_handle, net_ev_handler handle_down)
{
if (in_own_node(addr)) {
node_sub->node = NULL;
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 18098cac62f..b3ed2fcab4f 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -2,7 +2,7 @@
* net/tipc/port.c: TIPC port code
*
* Copyright (c) 1992-2007, Ericsson AB
- * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
+ * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -46,11 +46,7 @@
#define MAX_REJECT_SIZE 1024
-static struct sk_buff *msg_queue_head;
-static struct sk_buff *msg_queue_tail;
-
DEFINE_SPINLOCK(tipc_port_list_lock);
-static DEFINE_SPINLOCK(queue_lock);
static LIST_HEAD(ports);
static void port_handle_node_down(unsigned long ref);
@@ -119,7 +115,7 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
msg_set_nameupper(hdr, seq->upper);
msg_set_hdr_sz(hdr, MCAST_H_SIZE);
res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
- !oport->user_port, &buf);
+ &buf);
if (unlikely(!buf))
return res;
@@ -206,14 +202,15 @@ exit:
}
/**
- * tipc_createport_raw - create a generic TIPC port
+ * tipc_createport - create a generic TIPC port
*
* Returns pointer to (locked) TIPC port, or NULL if unable to create it
*/
-struct tipc_port *tipc_createport_raw(void *usr_handle,
- u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
- void (*wakeup)(struct tipc_port *),
- const u32 importance)
+struct tipc_port *tipc_createport(struct sock *sk,
+ u32 (*dispatcher)(struct tipc_port *,
+ struct sk_buff *),
+ void (*wakeup)(struct tipc_port *),
+ const u32 importance)
{
struct tipc_port *p_ptr;
struct tipc_msg *msg;
@@ -231,14 +228,13 @@ struct tipc_port *tipc_createport_raw(void *usr_handle,
return NULL;
}
- p_ptr->usr_handle = usr_handle;
+ p_ptr->sk = sk;
p_ptr->max_pkt = MAX_PKT_DEFAULT;
p_ptr->ref = ref;
INIT_LIST_HEAD(&p_ptr->wait_list);
INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
p_ptr->dispatcher = dispatcher;
p_ptr->wakeup = wakeup;
- p_ptr->user_port = NULL;
k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
INIT_LIST_HEAD(&p_ptr->publications);
INIT_LIST_HEAD(&p_ptr->port_list);
@@ -275,7 +271,6 @@ int tipc_deleteport(u32 ref)
buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
tipc_nodesub_unsubscribe(&p_ptr->subscription);
}
- kfree(p_ptr->user_port);
spin_lock_bh(&tipc_port_list_lock);
list_del(&p_ptr->port_list);
@@ -448,7 +443,7 @@ int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
int res;
res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
- !p_ptr->user_port, &buf);
+ &buf);
if (!buf)
return res;
@@ -668,215 +663,6 @@ void tipc_port_reinit(void)
spin_unlock_bh(&tipc_port_list_lock);
}
-
-/*
- * port_dispatcher_sigh(): Signal handler for messages destinated
- * to the tipc_port interface.
- */
-static void port_dispatcher_sigh(void *dummy)
-{
- struct sk_buff *buf;
-
- spin_lock_bh(&queue_lock);
- buf = msg_queue_head;
- msg_queue_head = NULL;
- spin_unlock_bh(&queue_lock);
-
- while (buf) {
- struct tipc_port *p_ptr;
- struct user_port *up_ptr;
- struct tipc_portid orig;
- struct tipc_name_seq dseq;
- void *usr_handle;
- int connected;
- int peer_invalid;
- int published;
- u32 message_type;
-
- struct sk_buff *next = buf->next;
- struct tipc_msg *msg = buf_msg(buf);
- u32 dref = msg_destport(msg);
-
- message_type = msg_type(msg);
- if (message_type > TIPC_DIRECT_MSG)
- goto reject; /* Unsupported message type */
-
- p_ptr = tipc_port_lock(dref);
- if (!p_ptr)
- goto reject; /* Port deleted while msg in queue */
-
- orig.ref = msg_origport(msg);
- orig.node = msg_orignode(msg);
- up_ptr = p_ptr->user_port;
- usr_handle = up_ptr->usr_handle;
- connected = p_ptr->connected;
- peer_invalid = connected && !tipc_port_peer_msg(p_ptr, msg);
- published = p_ptr->published;
-
- if (unlikely(msg_errcode(msg)))
- goto err;
-
- switch (message_type) {
-
- case TIPC_CONN_MSG:{
- tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
- u32 dsz;
-
- tipc_port_unlock(p_ptr);
- if (unlikely(!cb))
- goto reject;
- if (unlikely(!connected)) {
- if (tipc_connect(dref, &orig))
- goto reject;
- } else if (peer_invalid)
- goto reject;
- dsz = msg_data_sz(msg);
- if (unlikely(dsz &&
- (++p_ptr->conn_unacked >=
- TIPC_FLOW_CONTROL_WIN)))
- tipc_acknowledge(dref,
- p_ptr->conn_unacked);
- skb_pull(buf, msg_hdr_sz(msg));
- cb(usr_handle, dref, &buf, msg_data(msg), dsz);
- break;
- }
- case TIPC_DIRECT_MSG:{
- tipc_msg_event cb = up_ptr->msg_cb;
-
- tipc_port_unlock(p_ptr);
- if (unlikely(!cb || connected))
- goto reject;
- skb_pull(buf, msg_hdr_sz(msg));
- cb(usr_handle, dref, &buf, msg_data(msg),
- msg_data_sz(msg), msg_importance(msg),
- &orig);
- break;
- }
- case TIPC_MCAST_MSG:
- case TIPC_NAMED_MSG:{
- tipc_named_msg_event cb = up_ptr->named_msg_cb;
-
- tipc_port_unlock(p_ptr);
- if (unlikely(!cb || connected || !published))
- goto reject;
- dseq.type = msg_nametype(msg);
- dseq.lower = msg_nameinst(msg);
- dseq.upper = (message_type == TIPC_NAMED_MSG)
- ? dseq.lower : msg_nameupper(msg);
- skb_pull(buf, msg_hdr_sz(msg));
- cb(usr_handle, dref, &buf, msg_data(msg),
- msg_data_sz(msg), msg_importance(msg),
- &orig, &dseq);
- break;
- }
- }
- if (buf)
- kfree_skb(buf);
- buf = next;
- continue;
-err:
- switch (message_type) {
-
- case TIPC_CONN_MSG:{
- tipc_conn_shutdown_event cb =
- up_ptr->conn_err_cb;
-
- tipc_port_unlock(p_ptr);
- if (!cb || !connected || peer_invalid)
- break;
- tipc_disconnect(dref);
- skb_pull(buf, msg_hdr_sz(msg));
- cb(usr_handle, dref, &buf, msg_data(msg),
- msg_data_sz(msg), msg_errcode(msg));
- break;
- }
- case TIPC_DIRECT_MSG:{
- tipc_msg_err_event cb = up_ptr->err_cb;
-
- tipc_port_unlock(p_ptr);
- if (!cb || connected)
- break;
- skb_pull(buf, msg_hdr_sz(msg));
- cb(usr_handle, dref, &buf, msg_data(msg),
- msg_data_sz(msg), msg_errcode(msg), &orig);
- break;
- }
- case TIPC_MCAST_MSG:
- case TIPC_NAMED_MSG:{
- tipc_named_msg_err_event cb =
- up_ptr->named_err_cb;
-
- tipc_port_unlock(p_ptr);
- if (!cb || connected)
- break;
- dseq.type = msg_nametype(msg);
- dseq.lower = msg_nameinst(msg);
- dseq.upper = (message_type == TIPC_NAMED_MSG)
- ? dseq.lower : msg_nameupper(msg);
- skb_pull(buf, msg_hdr_sz(msg));
- cb(usr_handle, dref, &buf, msg_data(msg),
- msg_data_sz(msg), msg_errcode(msg), &dseq);
- break;
- }
- }
- if (buf)
- kfree_skb(buf);
- buf = next;
- continue;
-reject:
- tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
- buf = next;
- }
-}
-
-/*
- * port_dispatcher(): Dispatcher for messages destinated
- * to the tipc_port interface. Called with port locked.
- */
-static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
-{
- buf->next = NULL;
- spin_lock_bh(&queue_lock);
- if (msg_queue_head) {
- msg_queue_tail->next = buf;
- msg_queue_tail = buf;
- } else {
- msg_queue_tail = msg_queue_head = buf;
- tipc_k_signal((Handler)port_dispatcher_sigh, 0);
- }
- spin_unlock_bh(&queue_lock);
- return 0;
-}
-
-/*
- * Wake up port after congestion: Called with port locked
- */
-static void port_wakeup_sh(unsigned long ref)
-{
- struct tipc_port *p_ptr;
- struct user_port *up_ptr;
- tipc_continue_event cb = NULL;
- void *uh = NULL;
-
- p_ptr = tipc_port_lock(ref);
- if (p_ptr) {
- up_ptr = p_ptr->user_port;
- if (up_ptr) {
- cb = up_ptr->continue_event_cb;
- uh = up_ptr->usr_handle;
- }
- tipc_port_unlock(p_ptr);
- }
- if (cb)
- cb(uh, ref);
-}
-
-
-static void port_wakeup(struct tipc_port *p_ptr)
-{
- tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref);
-}
-
void tipc_acknowledge(u32 ref, u32 ack)
{
struct tipc_port *p_ptr;
@@ -893,50 +679,6 @@ void tipc_acknowledge(u32 ref, u32 ack)
tipc_net_route_msg(buf);
}
-/*
- * tipc_createport(): user level call.
- */
-int tipc_createport(void *usr_handle,
- unsigned int importance,
- tipc_msg_err_event error_cb,
- tipc_named_msg_err_event named_error_cb,
- tipc_conn_shutdown_event conn_error_cb,
- tipc_msg_event msg_cb,
- tipc_named_msg_event named_msg_cb,
- tipc_conn_msg_event conn_msg_cb,
- tipc_continue_event continue_event_cb, /* May be zero */
- u32 *portref)
-{
- struct user_port *up_ptr;
- struct tipc_port *p_ptr;
-
- up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
- if (!up_ptr) {
- pr_warn("Port creation failed, no memory\n");
- return -ENOMEM;
- }
- p_ptr = tipc_createport_raw(NULL, port_dispatcher, port_wakeup,
- importance);
- if (!p_ptr) {
- kfree(up_ptr);
- return -ENOMEM;
- }
-
- p_ptr->user_port = up_ptr;
- up_ptr->usr_handle = usr_handle;
- up_ptr->ref = p_ptr->ref;
- up_ptr->err_cb = error_cb;
- up_ptr->named_err_cb = named_error_cb;
- up_ptr->conn_err_cb = conn_error_cb;
- up_ptr->msg_cb = msg_cb;
- up_ptr->named_msg_cb = named_msg_cb;
- up_ptr->conn_msg_cb = conn_msg_cb;
- up_ptr->continue_event_cb = continue_event_cb;
- *portref = p_ptr->ref;
- tipc_port_unlock(p_ptr);
- return 0;
-}
-
int tipc_portimportance(u32 ref, unsigned int *importance)
{
struct tipc_port *p_ptr;
@@ -1184,7 +926,7 @@ static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_se
int res;
res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len,
- MAX_MSG_SIZE, !sender->user_port, &buf);
+ MAX_MSG_SIZE, &buf);
if (likely(buf))
tipc_port_recv_msg(buf);
return res;
@@ -1322,43 +1064,3 @@ int tipc_send2port(u32 ref, struct tipc_portid const *dest,
}
return -ELINKCONG;
}
-
-/**
- * tipc_send_buf2port - send message buffer to port identity
- */
-int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
- struct sk_buff *buf, unsigned int dsz)
-{
- struct tipc_port *p_ptr;
- struct tipc_msg *msg;
- int res;
-
- p_ptr = (struct tipc_port *)tipc_ref_deref(ref);
- if (!p_ptr || p_ptr->connected)
- return -EINVAL;
-
- msg = &p_ptr->phdr;
- msg_set_type(msg, TIPC_DIRECT_MSG);
- msg_set_destnode(msg, dest->node);
- msg_set_destport(msg, dest->ref);
- msg_set_hdr_sz(msg, BASIC_H_SIZE);
- msg_set_size(msg, BASIC_H_SIZE + dsz);
- if (skb_cow(buf, BASIC_H_SIZE))
- return -ENOMEM;
-
- skb_push(buf, BASIC_H_SIZE);
- skb_copy_to_linear_data(buf, msg, BASIC_H_SIZE);
-
- if (in_own_node(dest->node))
- res = tipc_port_recv_msg(buf);
- else
- res = tipc_send_buf_fast(buf, dest->node);
- if (likely(res != -ELINKCONG)) {
- if (res > 0)
- p_ptr->sent++;
- return res;
- }
- if (port_unreliable(p_ptr))
- return dsz;
- return -ELINKCONG;
-}
diff --git a/net/tipc/port.h b/net/tipc/port.h
index fb66e2e5f4d..5a7026b9c34 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -2,7 +2,7 @@
* net/tipc/port.h: Include file for TIPC port code
*
* Copyright (c) 1994-2007, Ericsson AB
- * Copyright (c) 2004-2007, 2010-2011, Wind River Systems
+ * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -43,60 +43,12 @@
#include "node_subscr.h"
#define TIPC_FLOW_CONTROL_WIN 512
-
-typedef void (*tipc_msg_err_event) (void *usr_handle, u32 portref,
- struct sk_buff **buf, unsigned char const *data,
- unsigned int size, int reason,
- struct tipc_portid const *attmpt_destid);
-
-typedef void (*tipc_named_msg_err_event) (void *usr_handle, u32 portref,
- struct sk_buff **buf, unsigned char const *data,
- unsigned int size, int reason,
- struct tipc_name_seq const *attmpt_dest);
-
-typedef void (*tipc_conn_shutdown_event) (void *usr_handle, u32 portref,
- struct sk_buff **buf, unsigned char const *data,
- unsigned int size, int reason);
-
-typedef void (*tipc_msg_event) (void *usr_handle, u32 portref,
- struct sk_buff **buf, unsigned char const *data,
- unsigned int size, unsigned int importance,
- struct tipc_portid const *origin);
-
-typedef void (*tipc_named_msg_event) (void *usr_handle, u32 portref,
- struct sk_buff **buf, unsigned char const *data,
- unsigned int size, unsigned int importance,
- struct tipc_portid const *orig,
- struct tipc_name_seq const *dest);
-
-typedef void (*tipc_conn_msg_event) (void *usr_handle, u32 portref,
- struct sk_buff **buf, unsigned char const *data,
- unsigned int size);
-
-typedef void (*tipc_continue_event) (void *usr_handle, u32 portref);
-
-/**
- * struct user_port - TIPC user port (used with native API)
- * @usr_handle: user-specified field
- * @ref: object reference to associated TIPC port
- *
- * <various callback routines>
- */
-struct user_port {
- void *usr_handle;
- u32 ref;
- tipc_msg_err_event err_cb;
- tipc_named_msg_err_event named_err_cb;
- tipc_conn_shutdown_event conn_err_cb;
- tipc_msg_event msg_cb;
- tipc_named_msg_event named_msg_cb;
- tipc_conn_msg_event conn_msg_cb;
- tipc_continue_event continue_event_cb;
-};
+#define CONN_OVERLOAD_LIMIT ((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \
+ SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
/**
* struct tipc_port - TIPC port structure
- * @usr_handle: pointer to additional user-defined information about port
+ * @sk: pointer to socket handle
* @lock: pointer to spinlock for controlling access to port
* @connected: non-zero if port is currently connected to a peer port
* @conn_type: TIPC type used when connection was established
@@ -110,7 +62,6 @@ struct user_port {
* @port_list: adjacent ports in TIPC's global list of ports
* @dispatcher: ptr to routine which handles received messages
* @wakeup: ptr to routine to call when port is no longer congested
- * @user_port: ptr to user port associated with port (if any)
* @wait_list: adjacent ports in list of ports waiting on link congestion
* @waiting_pkts:
* @sent: # of non-empty messages sent by port
@@ -123,7 +74,7 @@ struct user_port {
* @subscription: "node down" subscription used to terminate failed connections
*/
struct tipc_port {
- void *usr_handle;
+ struct sock *sk;
spinlock_t *lock;
int connected;
u32 conn_type;
@@ -137,7 +88,6 @@ struct tipc_port {
struct list_head port_list;
u32 (*dispatcher)(struct tipc_port *, struct sk_buff *);
void (*wakeup)(struct tipc_port *);
- struct user_port *user_port;
struct list_head wait_list;
u32 waiting_pkts;
u32 sent;
@@ -156,24 +106,16 @@ struct tipc_port_list;
/*
* TIPC port manipulation routines
*/
-struct tipc_port *tipc_createport_raw(void *usr_handle,
- u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
- void (*wakeup)(struct tipc_port *), const u32 importance);
+struct tipc_port *tipc_createport(struct sock *sk,
+ u32 (*dispatcher)(struct tipc_port *,
+ struct sk_buff *),
+ void (*wakeup)(struct tipc_port *),
+ const u32 importance);
int tipc_reject_msg(struct sk_buff *buf, u32 err);
-int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode);
-
void tipc_acknowledge(u32 port_ref, u32 ack);
-int tipc_createport(void *usr_handle,
- unsigned int importance, tipc_msg_err_event error_cb,
- tipc_named_msg_err_event named_error_cb,
- tipc_conn_shutdown_event conn_error_cb, tipc_msg_event msg_cb,
- tipc_named_msg_event named_msg_cb,
- tipc_conn_msg_event conn_msg_cb,
- tipc_continue_event continue_event_cb, u32 *portref);
-
int tipc_deleteport(u32 portref);
int tipc_portimportance(u32 portref, unsigned int *importance);
@@ -186,9 +128,9 @@ int tipc_portunreturnable(u32 portref, unsigned int *isunreturnable);
int tipc_set_portunreturnable(u32 portref, unsigned int isunreturnable);
int tipc_publish(u32 portref, unsigned int scope,
- struct tipc_name_seq const *name_seq);
+ struct tipc_name_seq const *name_seq);
int tipc_withdraw(u32 portref, unsigned int scope,
- struct tipc_name_seq const *name_seq);
+ struct tipc_name_seq const *name_seq);
int tipc_connect(u32 portref, struct tipc_portid const *port);
@@ -220,9 +162,6 @@ int tipc_send2port(u32 portref, struct tipc_portid const *dest,
unsigned int num_sect, struct iovec const *msg_sect,
unsigned int total_len);
-int tipc_send_buf2port(u32 portref, struct tipc_portid const *dest,
- struct sk_buff *buf, unsigned int dsz);
-
int tipc_multicast(u32 portref, struct tipc_name_seq const *seq,
unsigned int section_count, struct iovec const *msg,
unsigned int total_len);
diff --git a/net/tipc/server.c b/net/tipc/server.c
new file mode 100644
index 00000000000..fd3fa57a410
--- /dev/null
+++ b/net/tipc/server.c
@@ -0,0 +1,605 @@
+/*
+ * net/tipc/server.c: TIPC server infrastructure
+ *
+ * Copyright (c) 2012-2013, Wind River Systems
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. Neither the names of the copyright holders nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * Alternatively, this software may be distributed under the terms of the
+ * GNU General Public License ("GPL") version 2 as published by the Free
+ * Software Foundation.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+#include "core.h"
+#include <net/sock.h>
+
+/* Number of messages to send before rescheduling */
+#define MAX_SEND_MSG_COUNT 25
+#define MAX_RECV_MSG_COUNT 25
+#define CF_CONNECTED 1
+
+#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data)
+
+/**
+ * struct tipc_conn - TIPC connection structure
+ * @kref: reference counter to connection object
+ * @conid: connection identifier
+ * @sock: socket handler associated with connection
+ * @flags: indicates connection state
+ * @server: pointer to connected server
+ * @rwork: receive work item
+ * @usr_data: user-specified field
+ * @rx_action: what to do when connection socket is active
+ * @outqueue: pointer to first outbound message in queue
+ * @outqueue_lock: controll access to the outqueue
+ * @outqueue: list of connection objects for its server
+ * @swork: send work item
+ */
+struct tipc_conn {
+ struct kref kref;
+ int conid;
+ struct socket *sock;
+ unsigned long flags;
+ struct tipc_server *server;
+ struct work_struct rwork;
+ int (*rx_action) (struct tipc_conn *con);
+ void *usr_data;
+ struct list_head outqueue;
+ spinlock_t outqueue_lock;
+ struct work_struct swork;
+};
+
+/* An entry waiting to be sent */
+struct outqueue_entry {
+ struct list_head list;
+ struct kvec iov;
+ struct sockaddr_tipc dest;
+};
+
+static void tipc_recv_work(struct work_struct *work);
+static void tipc_send_work(struct work_struct *work);
+static void tipc_clean_outqueues(struct tipc_conn *con);
+
+static void tipc_conn_kref_release(struct kref *kref)
+{
+ struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
+ struct tipc_server *s = con->server;
+
+ if (con->sock) {
+ tipc_sock_release_local(con->sock);
+ con->sock = NULL;
+ }
+
+ tipc_clean_outqueues(con);
+
+ if (con->conid)
+ s->tipc_conn_shutdown(con->conid, con->usr_data);
+
+ kfree(con);
+}
+
+static void conn_put(struct tipc_conn *con)
+{
+ kref_put(&con->kref, tipc_conn_kref_release);
+}
+
+static void conn_get(struct tipc_conn *con)
+{
+ kref_get(&con->kref);
+}
+
+static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
+{
+ struct tipc_conn *con;
+
+ spin_lock_bh(&s->idr_lock);
+ con = idr_find(&s->conn_idr, conid);
+ if (con)
+ conn_get(con);
+ spin_unlock_bh(&s->idr_lock);
+ return con;
+}
+
+static void sock_data_ready(struct sock *sk, int unused)
+{
+ struct tipc_conn *con;
+
+ read_lock(&sk->sk_callback_lock);
+ con = sock2con(sk);
+ if (con && test_bit(CF_CONNECTED, &con->flags)) {
+ conn_get(con);
+ if (!queue_work(con->server->rcv_wq, &con->rwork))
+ conn_put(con);
+ }
+ read_unlock(&sk->sk_callback_lock);
+}
+
+static void sock_write_space(struct sock *sk)
+{
+ struct tipc_conn *con;
+
+ read_lock(&sk->sk_callback_lock);
+ con = sock2con(sk);
+ if (con && test_bit(CF_CONNECTED, &con->flags)) {
+ conn_get(con);
+ if (!queue_work(con->server->send_wq, &con->swork))
+ conn_put(con);
+ }
+ read_unlock(&sk->sk_callback_lock);
+}
+
+static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
+{
+ struct sock *sk = sock->sk;
+
+ write_lock_bh(&sk->sk_callback_lock);
+
+ sk->sk_data_ready = sock_data_ready;
+ sk->sk_write_space = sock_write_space;
+ sk->sk_user_data = con;
+
+ con->sock = sock;
+
+ write_unlock_bh(&sk->sk_callback_lock);
+}
+
+static void tipc_unregister_callbacks(struct tipc_conn *con)
+{
+ struct sock *sk = con->sock->sk;
+
+ write_lock_bh(&sk->sk_callback_lock);
+ sk->sk_user_data = NULL;
+ write_unlock_bh(&sk->sk_callback_lock);
+}
+
+static void tipc_close_conn(struct tipc_conn *con)
+{
+ struct tipc_server *s = con->server;
+
+ if (test_and_clear_bit(CF_CONNECTED, &con->flags)) {
+ spin_lock_bh(&s->idr_lock);
+ idr_remove(&s->conn_idr, con->conid);
+ s->idr_in_use--;
+ spin_unlock_bh(&s->idr_lock);
+
+ tipc_unregister_callbacks(con);
+
+ /* We shouldn't flush pending works as we may be in the
+ * thread. In fact the races with pending rx/tx work structs
+ * are harmless for us here as we have already deleted this
+ * connection from server connection list and set
+ * sk->sk_user_data to 0 before releasing connection object.
+ */
+ kernel_sock_shutdown(con->sock, SHUT_RDWR);
+
+ conn_put(con);
+ }
+}
+
+static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
+{
+ struct tipc_conn *con;
+ int ret;
+
+ con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC);
+ if (!con)
+ return ERR_PTR(-ENOMEM);
+
+ kref_init(&con->kref);
+ INIT_LIST_HEAD(&con->outqueue);
+ spin_lock_init(&con->outqueue_lock);
+ INIT_WORK(&con->swork, tipc_send_work);
+ INIT_WORK(&con->rwork, tipc_recv_work);
+
+ spin_lock_bh(&s->idr_lock);
+ ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC);
+ if (ret < 0) {
+ kfree(con);
+ spin_unlock_bh(&s->idr_lock);
+ return ERR_PTR(-ENOMEM);
+ }
+ con->conid = ret;
+ s->idr_in_use++;
+ spin_unlock_bh(&s->idr_lock);
+
+ set_bit(CF_CONNECTED, &con->flags);
+ con->server = s;
+
+ return con;
+}
+
+static int tipc_receive_from_sock(struct tipc_conn *con)
+{
+ struct msghdr msg = {};
+ struct tipc_server *s = con->server;
+ struct sockaddr_tipc addr;
+ struct kvec iov;
+ void *buf;
+ int ret;
+
+ buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC);
+ if (!buf) {
+ ret = -ENOMEM;
+ goto out_close;
+ }
+
+ iov.iov_base = buf;
+ iov.iov_len = s->max_rcvbuf_size;
+ msg.msg_name = &addr;
+ ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
+ MSG_DONTWAIT);
+ if (ret <= 0) {
+ kmem_cache_free(s->rcvbuf_cache, buf);
+ goto out_close;
+ }
+
+ s->tipc_conn_recvmsg(con->conid, &addr, con->usr_data, buf, ret);
+
+ kmem_cache_free(s->rcvbuf_cache, buf);
+
+ return 0;
+
+out_close:
+ if (ret != -EWOULDBLOCK)
+ tipc_close_conn(con);
+ else if (ret == 0)
+ /* Don't return success if we really got EOF */
+ ret = -EAGAIN;
+
+ return ret;
+}
+
+static int tipc_accept_from_sock(struct tipc_conn *con)
+{
+ struct tipc_server *s = con->server;
+ struct socket *sock = con->sock;
+ struct socket *newsock;
+ struct tipc_conn *newcon;
+ int ret;
+
+ ret = tipc_sock_accept_local(sock, &newsock, O_NONBLOCK);
+ if (ret < 0)
+ return ret;
+
+ newcon = tipc_alloc_conn(con->server);
+ if (IS_ERR(newcon)) {
+ ret = PTR_ERR(newcon);
+ sock_release(newsock);
+ return ret;
+ }
+
+ newcon->rx_action = tipc_receive_from_sock;
+ tipc_register_callbacks(newsock, newcon);
+
+ /* Notify that new connection is incoming */
+ newcon->usr_data = s->tipc_conn_new(newcon->conid);
+
+ /* Wake up receive process in case of 'SYN+' message */
+ newsock->sk->sk_data_ready(newsock->sk, 0);
+ return ret;
+}
+
+static struct socket *tipc_create_listen_sock(struct tipc_conn *con)
+{
+ struct tipc_server *s = con->server;
+ struct socket *sock = NULL;
+ int ret;
+
+ ret = tipc_sock_create_local(s->type, &sock);
+ if (ret < 0)
+ return NULL;
+ ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE,
+ (char *)&s->imp, sizeof(s->imp));
+ if (ret < 0)
+ goto create_err;
+ ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr));
+ if (ret < 0)
+ goto create_err;
+
+ switch (s->type) {
+ case SOCK_STREAM:
+ case SOCK_SEQPACKET:
+ con->rx_action = tipc_accept_from_sock;
+
+ ret = kernel_listen(sock, 0);
+ if (ret < 0)
+ goto create_err;
+ break;
+ case SOCK_DGRAM:
+ case SOCK_RDM:
+ con->rx_action = tipc_receive_from_sock;
+ break;
+ default:
+ pr_err("Unknown socket type %d\n", s->type);
+ goto create_err;
+ }
+ return sock;
+
+create_err:
+ sock_release(sock);
+ con->sock = NULL;
+ return NULL;
+}
+
+static int tipc_open_listening_sock(struct tipc_server *s)
+{
+ struct socket *sock;
+ struct tipc_conn *con;
+
+ con = tipc_alloc_conn(s);
+ if (IS_ERR(con))
+ return PTR_ERR(con);
+
+ sock = tipc_create_listen_sock(con);
+ if (!sock) {
+ idr_remove(&s->conn_idr, con->conid);
+ s->idr_in_use--;
+ kfree(con);
+ return -EINVAL;
+ }
+
+ tipc_register_callbacks(sock, con);
+ return 0;
+}
+
+static struct outqueue_entry *tipc_alloc_entry(void *data, int len)
+{
+ struct outqueue_entry *entry;
+ void *buf;
+
+ entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC);
+ if (!entry)
+ return NULL;
+
+ buf = kmalloc(len, GFP_ATOMIC);
+ if (!buf) {
+ kfree(entry);
+ return NULL;
+ }
+
+ memcpy(buf, data, len);
+ entry->iov.iov_base = buf;
+ entry->iov.iov_len = len;
+
+ return entry;
+}
+
+static void tipc_free_entry(struct outqueue_entry *e)
+{
+ kfree(e->iov.iov_base);
+ kfree(e);
+}
+
+static void tipc_clean_outqueues(struct tipc_conn *con)
+{
+ struct outqueue_entry *e, *safe;
+
+ spin_lock_bh(&con->outqueue_lock);
+ list_for_each_entry_safe(e, safe, &con->outqueue, list) {
+ list_del(&e->list);
+ tipc_free_entry(e);
+ }
+ spin_unlock_bh(&con->outqueue_lock);
+}
+
+int tipc_conn_sendmsg(struct tipc_server *s, int conid,
+ struct sockaddr_tipc *addr, void *data, size_t len)
+{
+ struct outqueue_entry *e;
+ struct tipc_conn *con;
+
+ con = tipc_conn_lookup(s, conid);
+ if (!con)
+ return -EINVAL;
+
+ e = tipc_alloc_entry(data, len);
+ if (!e) {
+ conn_put(con);
+ return -ENOMEM;
+ }
+
+ if (addr)
+ memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc));
+
+ spin_lock_bh(&con->outqueue_lock);
+ list_add_tail(&e->list, &con->outqueue);
+ spin_unlock_bh(&con->outqueue_lock);
+
+ if (test_bit(CF_CONNECTED, &con->flags))
+ if (!queue_work(s->send_wq, &con->swork))
+ conn_put(con);
+
+ return 0;
+}
+
+void tipc_conn_terminate(struct tipc_server *s, int conid)
+{
+ struct tipc_conn *con;
+
+ con = tipc_conn_lookup(s, conid);
+ if (con) {
+ tipc_close_conn(con);
+ conn_put(con);
+ }
+}
+
+static void tipc_send_to_sock(struct tipc_conn *con)
+{
+ int count = 0;
+ struct tipc_server *s = con->server;
+ struct outqueue_entry *e;
+ struct msghdr msg;
+ int ret;
+
+ spin_lock_bh(&con->outqueue_lock);
+ while (1) {
+ e = list_entry(con->outqueue.next, struct outqueue_entry,
+ list);
+ if ((struct list_head *) e == &con->outqueue)
+ break;
+ spin_unlock_bh(&con->outqueue_lock);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_flags = MSG_DONTWAIT;
+
+ if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
+ msg.msg_name = &e->dest;
+ msg.msg_namelen = sizeof(struct sockaddr_tipc);
+ }
+ ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
+ e->iov.iov_len);
+ if (ret == -EWOULDBLOCK || ret == 0) {
+ cond_resched();
+ goto out;
+ } else if (ret < 0) {
+ goto send_err;
+ }
+
+ /* Don't starve users filling buffers */
+ if (++count >= MAX_SEND_MSG_COUNT) {
+ cond_resched();
+ count = 0;
+ }
+
+ spin_lock_bh(&con->outqueue_lock);
+ list_del(&e->list);
+ tipc_free_entry(e);
+ }
+ spin_unlock_bh(&con->outqueue_lock);
+out:
+ return;
+
+send_err:
+ tipc_close_conn(con);
+}
+
+static void tipc_recv_work(struct work_struct *work)
+{
+ struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
+ int count = 0;
+
+ while (test_bit(CF_CONNECTED, &con->flags)) {
+ if (con->rx_action(con))
+ break;
+
+ /* Don't flood Rx machine */
+ if (++count >= MAX_RECV_MSG_COUNT) {
+ cond_resched();
+ count = 0;
+ }
+ }
+ conn_put(con);
+}
+
+static void tipc_send_work(struct work_struct *work)
+{
+ struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
+
+ if (test_bit(CF_CONNECTED, &con->flags))
+ tipc_send_to_sock(con);
+
+ conn_put(con);
+}
+
+static void tipc_work_stop(struct tipc_server *s)
+{
+ destroy_workqueue(s->rcv_wq);
+ destroy_workqueue(s->send_wq);
+}
+
+static int tipc_work_start(struct tipc_server *s)
+{
+ s->rcv_wq = alloc_workqueue("tipc_rcv", WQ_UNBOUND, 1);
+ if (!s->rcv_wq) {
+ pr_err("can't start tipc receive workqueue\n");
+ return -ENOMEM;
+ }
+
+ s->send_wq = alloc_workqueue("tipc_send", WQ_UNBOUND, 1);
+ if (!s->send_wq) {
+ pr_err("can't start tipc send workqueue\n");
+ destroy_workqueue(s->rcv_wq);
+ return -ENOMEM;
+ }
+
+ return 0;
+}
+
+int tipc_server_start(struct tipc_server *s)
+{
+ int ret;
+
+ spin_lock_init(&s->idr_lock);
+ idr_init(&s->conn_idr);
+ s->idr_in_use = 0;
+
+ s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size,
+ 0, SLAB_HWCACHE_ALIGN, NULL);
+ if (!s->rcvbuf_cache)
+ return -ENOMEM;
+
+ ret = tipc_work_start(s);
+ if (ret < 0) {
+ kmem_cache_destroy(s->rcvbuf_cache);
+ return ret;
+ }
+ ret = tipc_open_listening_sock(s);
+ if (ret < 0) {
+ tipc_work_stop(s);
+ kmem_cache_destroy(s->rcvbuf_cache);
+ return ret;
+ }
+ s->enabled = 1;
+ return ret;
+}
+
+void tipc_server_stop(struct tipc_server *s)
+{
+ struct tipc_conn *con;
+ int total = 0;
+ int id;
+
+ if (!s->enabled)
+ return;
+
+ s->enabled = 0;
+ spin_lock_bh(&s->idr_lock);
+ for (id = 0; total < s->idr_in_use; id++) {
+ con = idr_find(&s->conn_idr, id);
+ if (con) {
+ total++;
+ spin_unlock_bh(&s->idr_lock);
+ tipc_close_conn(con);
+ spin_lock_bh(&s->idr_lock);
+ }
+ }
+ spin_unlock_bh(&s->idr_lock);
+
+ tipc_work_stop(s);
+ kmem_cache_destroy(s->rcvbuf_cache);
+ idr_destroy(&s->conn_idr);
+}
diff --git a/net/tipc/server.h b/net/tipc/server.h
new file mode 100644
index 00000000000..98b23f20bc0
--- /dev/null
+++ b/net/tipc/server.h
@@ -0,0 +1,94 @@
+/*
+ * net/tipc/server.h: Include file for TIPC server code
+ *
+ * Copyright (c) 2012-2013, Wind River Systems
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. Neither the names of the copyright holders nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * Alternatively, this software may be distributed under the terms of the
+ * GNU General Public License ("GPL") version 2 as published by the Free
+ * Software Foundation.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _TIPC_SERVER_H
+#define _TIPC_SERVER_H
+
+#include "core.h"
+
+#define TIPC_SERVER_NAME_LEN 32
+
+/**
+ * struct tipc_server - TIPC server structure
+ * @conn_idr: identifier set of connection
+ * @idr_lock: protect the connection identifier set
+ * @idr_in_use: amount of allocated identifier entry
+ * @rcvbuf_cache: memory cache of server receive buffer
+ * @rcv_wq: receive workqueue
+ * @send_wq: send workqueue
+ * @max_rcvbuf_size: maximum permitted receive message length
+ * @tipc_conn_new: callback will be called when new connection is incoming
+ * @tipc_conn_shutdown: callback will be called when connection is shut down
+ * @tipc_conn_recvmsg: callback will be called when message arrives
+ * @saddr: TIPC server address
+ * @name: server name
+ * @imp: message importance
+ * @type: socket type
+ * @enabled: identify whether server is launched or not
+ */
+struct tipc_server {
+ struct idr conn_idr;
+ spinlock_t idr_lock;
+ int idr_in_use;
+ struct kmem_cache *rcvbuf_cache;
+ struct workqueue_struct *rcv_wq;
+ struct workqueue_struct *send_wq;
+ int max_rcvbuf_size;
+ void *(*tipc_conn_new) (int conid);
+ void (*tipc_conn_shutdown) (int conid, void *usr_data);
+ void (*tipc_conn_recvmsg) (int conid, struct sockaddr_tipc *addr,
+ void *usr_data, void *buf, size_t len);
+ struct sockaddr_tipc *saddr;
+ const char name[TIPC_SERVER_NAME_LEN];
+ int imp;
+ int type;
+ int enabled;
+};
+
+int tipc_conn_sendmsg(struct tipc_server *s, int conid,
+ struct sockaddr_tipc *addr, void *data, size_t len);
+
+/**
+ * tipc_conn_terminate - terminate connection with server
+ *
+ * Note: Must call it in process context since it might sleep
+ */
+void tipc_conn_terminate(struct tipc_server *s, int conid);
+
+int tipc_server_start(struct tipc_server *s);
+
+void tipc_server_stop(struct tipc_server *s);
+
+#endif
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 515ce38e4f4..ce8249c7682 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -2,7 +2,7 @@
* net/tipc/socket.c: TIPC socket API
*
* Copyright (c) 2001-2007, 2012 Ericsson AB
- * Copyright (c) 2004-2008, 2010-2012, Wind River Systems
+ * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -43,8 +43,6 @@
#define SS_LISTENING -1 /* socket is listening */
#define SS_READY -2 /* socket is connectionless */
-#define CONN_OVERLOAD_LIMIT ((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \
- SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
struct tipc_sock {
@@ -65,12 +63,15 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
static void wakeupdispatch(struct tipc_port *tport);
static void tipc_data_ready(struct sock *sk, int len);
static void tipc_write_space(struct sock *sk);
+static int release(struct socket *sock);
+static int accept(struct socket *sock, struct socket *new_sock, int flags);
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;
static struct proto tipc_proto;
+static struct proto tipc_proto_kern;
static int sockets_enabled;
@@ -143,7 +144,7 @@ static void reject_rx_queue(struct sock *sk)
}
/**
- * tipc_create - create a TIPC socket
+ * tipc_sk_create - create a TIPC socket
* @net: network namespace (must be default network)
* @sock: pre-allocated socket structure
* @protocol: protocol indicator (must be 0)
@@ -154,8 +155,8 @@ static void reject_rx_queue(struct sock *sk)
*
* Returns 0 on success, errno otherwise
*/
-static int tipc_create(struct net *net, struct socket *sock, int protocol,
- int kern)
+static int tipc_sk_create(struct net *net, struct socket *sock, int protocol,
+ int kern)
{
const struct proto_ops *ops;
socket_state state;
@@ -185,13 +186,17 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
}
/* Allocate socket's protocol area */
- sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
+ if (!kern)
+ sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
+ else
+ sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
+
if (sk == NULL)
return -ENOMEM;
/* Allocate TIPC port for socket to use */
- tp_ptr = tipc_createport_raw(sk, &dispatch, &wakeupdispatch,
- TIPC_LOW_IMPORTANCE);
+ tp_ptr = tipc_createport(sk, &dispatch, &wakeupdispatch,
+ TIPC_LOW_IMPORTANCE);
if (unlikely(!tp_ptr)) {
sk_free(sk);
return -ENOMEM;
@@ -203,6 +208,7 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
sock_init_data(sock, sk);
sk->sk_backlog_rcv = backlog_rcv;
+ sk->sk_rcvbuf = sysctl_tipc_rmem[1];
sk->sk_data_ready = tipc_data_ready;
sk->sk_write_space = tipc_write_space;
tipc_sk(sk)->p = tp_ptr;
@@ -220,6 +226,78 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
}
/**
+ * tipc_sock_create_local - create TIPC socket from inside TIPC module
+ * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
+ *
+ * We cannot use sock_creat_kern here because it bumps module user count.
+ * Since socket owner and creator is the same module we must make sure
+ * that module count remains zero for module local sockets, otherwise
+ * we cannot do rmmod.
+ *
+ * Returns 0 on success, errno otherwise
+ */
+int tipc_sock_create_local(int type, struct socket **res)
+{
+ int rc;
+ struct sock *sk;
+
+ rc = sock_create_lite(AF_TIPC, type, 0, res);
+ if (rc < 0) {
+ pr_err("Failed to create kernel socket\n");
+ return rc;
+ }
+ tipc_sk_create(&init_net, *res, 0, 1);
+
+ sk = (*res)->sk;
+
+ return 0;
+}
+
+/**
+ * tipc_sock_release_local - release socket created by tipc_sock_create_local
+ * @sock: the socket to be released.
+ *
+ * Module reference count is not incremented when such sockets are created,
+ * so we must keep it from being decremented when they are released.
+ */
+void tipc_sock_release_local(struct socket *sock)
+{
+ release(sock);
+ sock->ops = NULL;
+ sock_release(sock);
+}
+
+/**
+ * tipc_sock_accept_local - accept a connection on a socket created
+ * with tipc_sock_create_local. Use this function to avoid that
+ * module reference count is inadvertently incremented.
+ *
+ * @sock: the accepting socket
+ * @newsock: reference to the new socket to be created
+ * @flags: socket flags
+ */
+
+int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
+ int flags)
+{
+ struct sock *sk = sock->sk;
+ int ret;
+
+ ret = sock_create_lite(sk->sk_family, sk->sk_type,
+ sk->sk_protocol, newsock);
+ if (ret < 0)
+ return ret;
+
+ ret = accept(sock, *newsock, flags);
+ if (ret < 0) {
+ sock_release(*newsock);
+ return ret;
+ }
+ (*newsock)->ops = sock->ops;
+ return ret;
+}
+
+/**
* release - destroy a TIPC socket
* @sock: socket to destroy
*
@@ -324,7 +402,9 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
return -EAFNOSUPPORT;
- if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES)
+ if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
+ (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
+ (addr->addr.nameseq.type != TIPC_CFG_SRV))
return -EACCES;
return (addr->scope > 0) ?
@@ -519,8 +599,7 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
res = -EISCONN;
goto exit;
}
- if ((tport->published) ||
- ((sock->type == SOCK_STREAM) && (total_len != 0))) {
+ if (tport->published) {
res = -EOPNOTSUPP;
goto exit;
}
@@ -810,7 +889,7 @@ static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
* Returns 0 if successful, otherwise errno
*/
static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
- struct tipc_port *tport)
+ struct tipc_port *tport)
{
u32 anc_data[3];
u32 err;
@@ -1011,8 +1090,7 @@ static int recv_stream(struct kiocb *iocb, struct socket *sock,
lock_sock(sk);
- if (unlikely((sock->state == SS_UNCONNECTED) ||
- (sock->state == SS_CONNECTING))) {
+ if (unlikely((sock->state == SS_UNCONNECTED))) {
res = -ENOTCONN;
goto exit;
}
@@ -1233,10 +1311,10 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf)
* For all connectionless messages, by default new queue limits are
* as belows:
*
- * TIPC_LOW_IMPORTANCE (5MB)
- * TIPC_MEDIUM_IMPORTANCE (10MB)
- * TIPC_HIGH_IMPORTANCE (20MB)
- * TIPC_CRITICAL_IMPORTANCE (40MB)
+ * TIPC_LOW_IMPORTANCE (4 MB)
+ * TIPC_MEDIUM_IMPORTANCE (8 MB)
+ * TIPC_HIGH_IMPORTANCE (16 MB)
+ * TIPC_CRITICAL_IMPORTANCE (32 MB)
*
* Returns overload limit according to corresponding message importance
*/
@@ -1246,9 +1324,10 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
unsigned int limit;
if (msg_connected(msg))
- limit = CONN_OVERLOAD_LIMIT;
+ limit = sysctl_tipc_rmem[2];
else
- limit = sk->sk_rcvbuf << (msg_importance(msg) + 5);
+ limit = sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
+ msg_importance(msg);
return limit;
}
@@ -1327,7 +1406,7 @@ static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
*/
static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
{
- struct sock *sk = (struct sock *)tport->usr_handle;
+ struct sock *sk = tport->sk;
u32 res;
/*
@@ -1358,7 +1437,7 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
*/
static void wakeupdispatch(struct tipc_port *tport)
{
- struct sock *sk = (struct sock *)tport->usr_handle;
+ struct sock *sk = tport->sk;
sk->sk_write_space(sk);
}
@@ -1531,7 +1610,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
buf = skb_peek(&sk->sk_receive_queue);
- res = tipc_create(sock_net(sock->sk), new_sock, 0, 0);
+ res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
if (res)
goto exit;
@@ -1657,8 +1736,8 @@ restart:
*
* Returns 0 on success, errno otherwise
*/
-static int setsockopt(struct socket *sock,
- int lvl, int opt, char __user *ov, unsigned int ol)
+static int setsockopt(struct socket *sock, int lvl, int opt, char __user *ov,
+ unsigned int ol)
{
struct sock *sk = sock->sk;
struct tipc_port *tport = tipc_sk_port(sk);
@@ -1716,8 +1795,8 @@ static int setsockopt(struct socket *sock,
*
* Returns 0 on success, errno otherwise
*/
-static int getsockopt(struct socket *sock,
- int lvl, int opt, char __user *ov, int __user *ol)
+static int getsockopt(struct socket *sock, int lvl, int opt, char __user *ov,
+ int __user *ol)
{
struct sock *sk = sock->sk;
struct tipc_port *tport = tipc_sk_port(sk);
@@ -1841,13 +1920,20 @@ static const struct proto_ops stream_ops = {
static const struct net_proto_family tipc_family_ops = {
.owner = THIS_MODULE,
.family = AF_TIPC,
- .create = tipc_create
+ .create = tipc_sk_create
};
static struct proto tipc_proto = {
.name = "TIPC",
.owner = THIS_MODULE,
- .obj_size = sizeof(struct tipc_sock)
+ .obj_size = sizeof(struct tipc_sock),
+ .sysctl_rmem = sysctl_tipc_rmem
+};
+
+static struct proto tipc_proto_kern = {
+ .name = "TIPC",
+ .obj_size = sizeof(struct tipc_sock),
+ .sysctl_rmem = sysctl_tipc_rmem
};
/**
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 6b42d47029a..d38bb45d82e 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -2,7 +2,7 @@
* net/tipc/subscr.c: TIPC network topology service
*
* Copyright (c) 2000-2006, Ericsson AB
- * Copyright (c) 2005-2007, 2010-2011, Wind River Systems
+ * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -41,33 +41,42 @@
/**
* struct tipc_subscriber - TIPC network topology subscriber
- * @port_ref: object reference to server port connecting to subscriber
- * @lock: pointer to spinlock controlling access to subscriber's server port
- * @subscriber_list: adjacent subscribers in top. server's list of subscribers
+ * @conid: connection identifier to server connecting to subscriber
+ * @lock: controll access to subscriber
* @subscription_list: list of subscription objects for this subscriber
*/
struct tipc_subscriber {
- u32 port_ref;
- spinlock_t *lock;
- struct list_head subscriber_list;
+ int conid;
+ spinlock_t lock;
struct list_head subscription_list;
};
-/**
- * struct top_srv - TIPC network topology subscription service
- * @setup_port: reference to TIPC port that handles subscription requests
- * @subscription_count: number of active subscriptions (not subscribers!)
- * @subscriber_list: list of ports subscribing to service
- * @lock: spinlock govering access to subscriber list
- */
-struct top_srv {
- u32 setup_port;
- atomic_t subscription_count;
- struct list_head subscriber_list;
- spinlock_t lock;
+static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
+ void *usr_data, void *buf, size_t len);
+static void *subscr_named_msg_event(int conid);
+static void subscr_conn_shutdown_event(int conid, void *usr_data);
+
+static atomic_t subscription_count = ATOMIC_INIT(0);
+
+static struct sockaddr_tipc topsrv_addr __read_mostly = {
+ .family = AF_TIPC,
+ .addrtype = TIPC_ADDR_NAMESEQ,
+ .addr.nameseq.type = TIPC_TOP_SRV,
+ .addr.nameseq.lower = TIPC_TOP_SRV,
+ .addr.nameseq.upper = TIPC_TOP_SRV,
+ .scope = TIPC_NODE_SCOPE
};
-static struct top_srv topsrv;
+static struct tipc_server topsrv __read_mostly = {
+ .saddr = &topsrv_addr,
+ .imp = TIPC_CRITICAL_IMPORTANCE,
+ .type = SOCK_SEQPACKET,
+ .max_rcvbuf_size = sizeof(struct tipc_subscr),
+ .name = "topology_server",
+ .tipc_conn_recvmsg = subscr_conn_msg_event,
+ .tipc_conn_new = subscr_named_msg_event,
+ .tipc_conn_shutdown = subscr_conn_shutdown_event,
+};
/**
* htohl - convert value to endianness used by destination
@@ -81,20 +90,13 @@ static u32 htohl(u32 in, int swap)
return swap ? swab32(in) : in;
}
-/**
- * subscr_send_event - send a message containing a tipc_event to the subscriber
- *
- * Note: Must not hold subscriber's server port lock, since tipc_send() will
- * try to take the lock if the message is rejected and returned!
- */
-static void subscr_send_event(struct tipc_subscription *sub,
- u32 found_lower,
- u32 found_upper,
- u32 event,
- u32 port_ref,
+static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
+ u32 found_upper, u32 event, u32 port_ref,
u32 node)
{
- struct iovec msg_sect;
+ struct tipc_subscriber *subscriber = sub->subscriber;
+ struct kvec msg_sect;
+ int ret;
msg_sect.iov_base = (void *)&sub->evt;
msg_sect.iov_len = sizeof(struct tipc_event);
@@ -104,7 +106,10 @@ static void subscr_send_event(struct tipc_subscription *sub,
sub->evt.found_upper = htohl(found_upper, sub->swap);
sub->evt.port.ref = htohl(port_ref, sub->swap);
sub->evt.port.node = htohl(node, sub->swap);
- tipc_send(sub->server_ref, 1, &msg_sect, msg_sect.iov_len);
+ ret = tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL,
+ msg_sect.iov_base, msg_sect.iov_len);
+ if (ret < 0)
+ pr_err("Sending subscription event failed, no memory\n");
}
/**
@@ -112,10 +117,8 @@ static void subscr_send_event(struct tipc_subscription *sub,
*
* Returns 1 if there is overlap, otherwise 0.
*/
-int tipc_subscr_overlap(struct tipc_subscription *sub,
- u32 found_lower,
+int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower,
u32 found_upper)
-
{
if (found_lower < sub->seq.lower)
found_lower = sub->seq.lower;
@@ -131,13 +134,9 @@ int tipc_subscr_overlap(struct tipc_subscription *sub,
*
* Protected by nameseq.lock in name_table.c
*/
-void tipc_subscr_report_overlap(struct tipc_subscription *sub,
- u32 found_lower,
- u32 found_upper,
- u32 event,
- u32 port_ref,
- u32 node,
- int must)
+void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower,
+ u32 found_upper, u32 event, u32 port_ref,
+ u32 node, int must)
{
if (!tipc_subscr_overlap(sub, found_lower, found_upper))
return;
@@ -147,21 +146,24 @@ void tipc_subscr_report_overlap(struct tipc_subscription *sub,
subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
}
-/**
- * subscr_timeout - subscription timeout has occurred
- */
static void subscr_timeout(struct tipc_subscription *sub)
{
- struct tipc_port *server_port;
+ struct tipc_subscriber *subscriber = sub->subscriber;
+
+ /* The spin lock per subscriber is used to protect its members */
+ spin_lock_bh(&subscriber->lock);
- /* Validate server port reference (in case subscriber is terminating) */
- server_port = tipc_port_lock(sub->server_ref);
- if (server_port == NULL)
+ /* Validate if the connection related to the subscriber is
+ * closed (in case subscriber is terminating)
+ */
+ if (subscriber->conid == 0) {
+ spin_unlock_bh(&subscriber->lock);
return;
+ }
/* Validate timeout (in case subscription is being cancelled) */
if (sub->timeout == TIPC_WAIT_FOREVER) {
- tipc_port_unlock(server_port);
+ spin_unlock_bh(&subscriber->lock);
return;
}
@@ -171,8 +173,7 @@ static void subscr_timeout(struct tipc_subscription *sub)
/* Unlink subscription from subscriber */
list_del(&sub->subscription_list);
- /* Release subscriber's server port */
- tipc_port_unlock(server_port);
+ spin_unlock_bh(&subscriber->lock);
/* Notify subscriber of timeout */
subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
@@ -181,64 +182,54 @@ static void subscr_timeout(struct tipc_subscription *sub)
/* Now destroy subscription */
k_term_timer(&sub->timer);
kfree(sub);
- atomic_dec(&topsrv.subscription_count);
+ atomic_dec(&subscription_count);
}
/**
* subscr_del - delete a subscription within a subscription list
*
- * Called with subscriber port locked.
+ * Called with subscriber lock held.
*/
static void subscr_del(struct tipc_subscription *sub)
{
tipc_nametbl_unsubscribe(sub);
list_del(&sub->subscription_list);
kfree(sub);
- atomic_dec(&topsrv.subscription_count);
+ atomic_dec(&subscription_count);
}
/**
* subscr_terminate - terminate communication with a subscriber
*
- * Called with subscriber port locked. Routine must temporarily release lock
- * to enable subscription timeout routine(s) to finish without deadlocking;
- * the lock is then reclaimed to allow caller to release it upon return.
- * (This should work even in the unlikely event some other thread creates
- * a new object reference in the interim that uses this lock; this routine will
- * simply wait for it to be released, then claim it.)
+ * Note: Must call it in process context since it might sleep.
*/
static void subscr_terminate(struct tipc_subscriber *subscriber)
{
- u32 port_ref;
+ tipc_conn_terminate(&topsrv, subscriber->conid);
+}
+
+static void subscr_release(struct tipc_subscriber *subscriber)
+{
struct tipc_subscription *sub;
struct tipc_subscription *sub_temp;
- /* Invalidate subscriber reference */
- port_ref = subscriber->port_ref;
- subscriber->port_ref = 0;
- spin_unlock_bh(subscriber->lock);
+ spin_lock_bh(&subscriber->lock);
- /* Sever connection to subscriber */
- tipc_shutdown(port_ref);
- tipc_deleteport(port_ref);
+ /* Invalidate subscriber reference */
+ subscriber->conid = 0;
/* Destroy any existing subscriptions for subscriber */
list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
subscription_list) {
if (sub->timeout != TIPC_WAIT_FOREVER) {
+ spin_unlock_bh(&subscriber->lock);
k_cancel_timer(&sub->timer);
k_term_timer(&sub->timer);
+ spin_lock_bh(&subscriber->lock);
}
subscr_del(sub);
}
-
- /* Remove subscriber from topology server's subscriber list */
- spin_lock_bh(&topsrv.lock);
- list_del(&subscriber->subscriber_list);
- spin_unlock_bh(&topsrv.lock);
-
- /* Reclaim subscriber lock */
- spin_lock_bh(subscriber->lock);
+ spin_unlock_bh(&subscriber->lock);
/* Now destroy subscriber */
kfree(subscriber);
@@ -247,7 +238,7 @@ static void subscr_terminate(struct tipc_subscriber *subscriber)
/**
* subscr_cancel - handle subscription cancellation request
*
- * Called with subscriber port locked. Routine must temporarily release lock
+ * Called with subscriber lock held. Routine must temporarily release lock
* to enable the subscription timeout routine to finish without deadlocking;
* the lock is then reclaimed to allow caller to release it upon return.
*
@@ -274,10 +265,10 @@ static void subscr_cancel(struct tipc_subscr *s,
/* Cancel subscription timer (if used), then delete subscription */
if (sub->timeout != TIPC_WAIT_FOREVER) {
sub->timeout = TIPC_WAIT_FOREVER;
- spin_unlock_bh(subscriber->lock);
+ spin_unlock_bh(&subscriber->lock);
k_cancel_timer(&sub->timer);
k_term_timer(&sub->timer);
- spin_lock_bh(subscriber->lock);
+ spin_lock_bh(&subscriber->lock);
}
subscr_del(sub);
}
@@ -285,7 +276,7 @@ static void subscr_cancel(struct tipc_subscr *s,
/**
* subscr_subscribe - create subscription for subscriber
*
- * Called with subscriber port locked.
+ * Called with subscriber lock held.
*/
static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
struct tipc_subscriber *subscriber)
@@ -304,7 +295,7 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
}
/* Refuse subscription if global limit exceeded */
- if (atomic_read(&topsrv.subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
+ if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
pr_warn("Subscription rejected, limit reached (%u)\n",
TIPC_MAX_SUBSCRIPTIONS);
subscr_terminate(subscriber);
@@ -335,10 +326,10 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
}
INIT_LIST_HEAD(&sub->nameseq_list);
list_add(&sub->subscription_list, &subscriber->subscription_list);
- sub->server_ref = subscriber->port_ref;
+ sub->subscriber = subscriber;
sub->swap = swap;
memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
- atomic_inc(&topsrv.subscription_count);
+ atomic_inc(&subscription_count);
if (sub->timeout != TIPC_WAIT_FOREVER) {
k_init_timer(&sub->timer,
(Handler)subscr_timeout, (unsigned long)sub);
@@ -348,196 +339,51 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
return sub;
}
-/**
- * subscr_conn_shutdown_event - handle termination request from subscriber
- *
- * Called with subscriber's server port unlocked.
- */
-static void subscr_conn_shutdown_event(void *usr_handle,
- u32 port_ref,
- struct sk_buff **buf,
- unsigned char const *data,
- unsigned int size,
- int reason)
+/* Handle one termination request for the subscriber */
+static void subscr_conn_shutdown_event(int conid, void *usr_data)
{
- struct tipc_subscriber *subscriber = usr_handle;
- spinlock_t *subscriber_lock;
-
- if (tipc_port_lock(port_ref) == NULL)
- return;
-
- subscriber_lock = subscriber->lock;
- subscr_terminate(subscriber);
- spin_unlock_bh(subscriber_lock);
+ subscr_release((struct tipc_subscriber *)usr_data);
}
-/**
- * subscr_conn_msg_event - handle new subscription request from subscriber
- *
- * Called with subscriber's server port unlocked.
- */
-static void subscr_conn_msg_event(void *usr_handle,
- u32 port_ref,
- struct sk_buff **buf,
- const unchar *data,
- u32 size)
+/* Handle one request to create a new subscription for the subscriber */
+static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
+ void *usr_data, void *buf, size_t len)
{
- struct tipc_subscriber *subscriber = usr_handle;
- spinlock_t *subscriber_lock;
+ struct tipc_subscriber *subscriber = usr_data;
struct tipc_subscription *sub;
- /*
- * Lock subscriber's server port (& make a local copy of lock pointer,
- * in case subscriber is deleted while processing subscription request)
- */
- if (tipc_port_lock(port_ref) == NULL)
- return;
-
- subscriber_lock = subscriber->lock;
-
- if (size != sizeof(struct tipc_subscr)) {
- subscr_terminate(subscriber);
- spin_unlock_bh(subscriber_lock);
- } else {
- sub = subscr_subscribe((struct tipc_subscr *)data, subscriber);
- spin_unlock_bh(subscriber_lock);
- if (sub != NULL) {
-
- /*
- * We must release the server port lock before adding a
- * subscription to the name table since TIPC needs to be
- * able to (re)acquire the port lock if an event message
- * issued by the subscription process is rejected and
- * returned. The subscription cannot be deleted while
- * it is being added to the name table because:
- * a) the single-threading of the native API port code
- * ensures the subscription cannot be cancelled and
- * the subscriber connection cannot be broken, and
- * b) the name table lock ensures the subscription
- * timeout code cannot delete the subscription,
- * so the subscription object is still protected.
- */
- tipc_nametbl_subscribe(sub);
- }
- }
+ spin_lock_bh(&subscriber->lock);
+ sub = subscr_subscribe((struct tipc_subscr *)buf, subscriber);
+ if (sub)
+ tipc_nametbl_subscribe(sub);
+ spin_unlock_bh(&subscriber->lock);
}
-/**
- * subscr_named_msg_event - handle request to establish a new subscriber
- */
-static void subscr_named_msg_event(void *usr_handle,
- u32 port_ref,
- struct sk_buff **buf,
- const unchar *data,
- u32 size,
- u32 importance,
- struct tipc_portid const *orig,
- struct tipc_name_seq const *dest)
+
+/* Handle one request to establish a new subscriber */
+static void *subscr_named_msg_event(int conid)
{
struct tipc_subscriber *subscriber;
- u32 server_port_ref;
/* Create subscriber object */
subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC);
if (subscriber == NULL) {
pr_warn("Subscriber rejected, no memory\n");
- return;
+ return NULL;
}
INIT_LIST_HEAD(&subscriber->subscription_list);
- INIT_LIST_HEAD(&subscriber->subscriber_list);
-
- /* Create server port & establish connection to subscriber */
- tipc_createport(subscriber,
- importance,
- NULL,
- NULL,
- subscr_conn_shutdown_event,
- NULL,
- NULL,
- subscr_conn_msg_event,
- NULL,
- &subscriber->port_ref);
- if (subscriber->port_ref == 0) {
- pr_warn("Subscriber rejected, unable to create port\n");
- kfree(subscriber);
- return;
- }
- tipc_connect(subscriber->port_ref, orig);
-
- /* Lock server port (& save lock address for future use) */
- subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock;
-
- /* Add subscriber to topology server's subscriber list */
- spin_lock_bh(&topsrv.lock);
- list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);
- spin_unlock_bh(&topsrv.lock);
-
- /* Unlock server port */
- server_port_ref = subscriber->port_ref;
- spin_unlock_bh(subscriber->lock);
-
- /* Send an ACK- to complete connection handshaking */
- tipc_send(server_port_ref, 0, NULL, 0);
+ subscriber->conid = conid;
+ spin_lock_init(&subscriber->lock);
- /* Handle optional subscription request */
- if (size != 0) {
- subscr_conn_msg_event(subscriber, server_port_ref,
- buf, data, size);
- }
+ return (void *)subscriber;
}
int tipc_subscr_start(void)
{
- struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV};
- int res;
-
- spin_lock_init(&topsrv.lock);
- INIT_LIST_HEAD(&topsrv.subscriber_list);
-
- res = tipc_createport(NULL,
- TIPC_CRITICAL_IMPORTANCE,
- NULL,
- NULL,
- NULL,
- NULL,
- subscr_named_msg_event,
- NULL,
- NULL,
- &topsrv.setup_port);
- if (res)
- goto failed;
-
- res = tipc_publish(topsrv.setup_port, TIPC_NODE_SCOPE, &seq);
- if (res) {
- tipc_deleteport(topsrv.setup_port);
- topsrv.setup_port = 0;
- goto failed;
- }
-
- return 0;
-
-failed:
- pr_err("Failed to create subscription service\n");
- return res;
+ return tipc_server_start(&topsrv);
}
void tipc_subscr_stop(void)
{
- struct tipc_subscriber *subscriber;
- struct tipc_subscriber *subscriber_temp;
- spinlock_t *subscriber_lock;
-
- if (topsrv.setup_port) {
- tipc_deleteport(topsrv.setup_port);
- topsrv.setup_port = 0;
-
- list_for_each_entry_safe(subscriber, subscriber_temp,
- &topsrv.subscriber_list,
- subscriber_list) {
- subscriber_lock = subscriber->lock;
- spin_lock_bh(subscriber_lock);
- subscr_terminate(subscriber);
- spin_unlock_bh(subscriber_lock);
- }
- }
+ tipc_server_stop(&topsrv);
}
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index 218d2e07f0c..393e417bee3 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -2,7 +2,7 @@
* net/tipc/subscr.h: Include file for TIPC network topology service
*
* Copyright (c) 2003-2006, Ericsson AB
- * Copyright (c) 2005-2007, Wind River Systems
+ * Copyright (c) 2005-2007, 2012-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -37,10 +37,14 @@
#ifndef _TIPC_SUBSCR_H
#define _TIPC_SUBSCR_H
+#include "server.h"
+
struct tipc_subscription;
+struct tipc_subscriber;
/**
* struct tipc_subscription - TIPC network topology subscription object
+ * @subscriber: pointer to its subscriber
* @seq: name sequence associated with subscription
* @timeout: duration of subscription (in ms)
* @filter: event filtering to be done for subscription
@@ -52,28 +56,23 @@ struct tipc_subscription;
* @evt: template for events generated by subscription
*/
struct tipc_subscription {
+ struct tipc_subscriber *subscriber;
struct tipc_name_seq seq;
u32 timeout;
u32 filter;
struct timer_list timer;
struct list_head nameseq_list;
struct list_head subscription_list;
- u32 server_ref;
int swap;
struct tipc_event evt;
};
-int tipc_subscr_overlap(struct tipc_subscription *sub,
- u32 found_lower,
+int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower,
u32 found_upper);
-void tipc_subscr_report_overlap(struct tipc_subscription *sub,
- u32 found_lower,
- u32 found_upper,
- u32 event,
- u32 port_ref,
- u32 node,
- int must_report);
+void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower,
+ u32 found_upper, u32 event, u32 port_ref,
+ u32 node, int must);
int tipc_subscr_start(void);
diff --git a/net/tipc/sysctl.c b/net/tipc/sysctl.c
new file mode 100644
index 00000000000..f3fef93325a
--- /dev/null
+++ b/net/tipc/sysctl.c
@@ -0,0 +1,64 @@
+/*
+ * net/tipc/sysctl.c: sysctl interface to TIPC subsystem
+ *
+ * Copyright (c) 2013, Wind River Systems
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. Neither the names of the copyright holders nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * Alternatively, this software may be distributed under the terms of the
+ * GNU General Public License ("GPL") version 2 as published by the Free
+ * Software Foundation.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "core.h"
+
+#include <linux/sysctl.h>
+
+static struct ctl_table_header *tipc_ctl_hdr;
+
+static struct ctl_table tipc_table[] = {
+ {
+ .procname = "tipc_rmem",
+ .data = &sysctl_tipc_rmem,
+ .maxlen = sizeof(sysctl_tipc_rmem),
+ .mode = 0644,
+ .proc_handler = proc_dointvec,
+ },
+ {}
+};
+
+int tipc_register_sysctl(void)
+{
+ tipc_ctl_hdr = register_net_sysctl(&init_net, "net/tipc", tipc_table);
+ if (tipc_ctl_hdr == NULL)
+ return -ENOMEM;
+ return 0;
+}
+
+void tipc_unregister_sysctl(void)
+{
+ unregister_net_sysctl_table(tipc_ctl_hdr);
+}