summaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2012-06-15 12:32:04 -0700
committerSage Weil <sage@inktank.com>2012-06-15 12:32:04 -0700
commit9a64e8e0ace51b309fdcff4b4754b3649250382a (patch)
tree1f0d75c196c5ab0408c55ed6cf3a152f1f921e15 /net/tipc
parentf3dea7edd3d449fe7a6d402c1ce56a294b985261 (diff)
parentf8f5701bdaf9134b1f90e5044a82c66324d2073f (diff)
Merge tag 'v3.5-rc1'
Linux 3.5-rc1 Conflicts: net/ceph/messenger.c
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/Makefile2
-rw-r--r--net/tipc/addr.c3
-rw-r--r--net/tipc/addr.h19
-rw-r--r--net/tipc/bcast.c356
-rw-r--r--net/tipc/bcast.h5
-rw-r--r--net/tipc/bearer.c29
-rw-r--r--net/tipc/bearer.h4
-rw-r--r--net/tipc/config.c51
-rw-r--r--net/tipc/config.h1
-rw-r--r--net/tipc/core.c21
-rw-r--r--net/tipc/core.h56
-rw-r--r--net/tipc/discover.c91
-rw-r--r--net/tipc/eth_media.c19
-rw-r--r--net/tipc/handler.c1
-rw-r--r--net/tipc/link.c409
-rw-r--r--net/tipc/link.h6
-rw-r--r--net/tipc/log.c16
-rw-r--r--net/tipc/log.h1
-rw-r--r--net/tipc/msg.c5
-rw-r--r--net/tipc/msg.h36
-rw-r--r--net/tipc/name_distr.c138
-rw-r--r--net/tipc/name_table.c146
-rw-r--r--net/tipc/name_table.h5
-rw-r--r--net/tipc/net.c20
-rw-r--r--net/tipc/node.c99
-rw-r--r--net/tipc/node.h39
-rw-r--r--net/tipc/node_subscr.c5
-rw-r--r--net/tipc/node_subscr.h1
-rw-r--r--net/tipc/port.c176
-rw-r--r--net/tipc/port.h56
-rw-r--r--net/tipc/ref.c13
-rw-r--r--net/tipc/socket.c114
-rw-r--r--net/tipc/subscr.c47
-rw-r--r--net/tipc/subscr.h2
34 files changed, 741 insertions, 1251 deletions
diff --git a/net/tipc/Makefile b/net/tipc/Makefile
index 521d24d04ab..6cd55d671d3 100644
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -9,5 +9,3 @@ tipc-y += addr.o bcast.o bearer.o config.o \
name_distr.o subscr.o name_table.o net.o \
netlink.o node.o node_subscr.o port.o ref.o \
socket.o log.o eth_media.o
-
-# End of file
diff --git a/net/tipc/addr.c b/net/tipc/addr.c
index a6fdab33877..357b74b26f9 100644
--- a/net/tipc/addr.c
+++ b/net/tipc/addr.c
@@ -45,7 +45,6 @@
*
* Returns 1 if domain address is valid, otherwise 0
*/
-
int tipc_addr_domain_valid(u32 addr)
{
u32 n = tipc_node(addr);
@@ -66,7 +65,6 @@ int tipc_addr_domain_valid(u32 addr)
*
* Returns 1 if address can be used, otherwise 0
*/
-
int tipc_addr_node_valid(u32 addr)
{
return tipc_addr_domain_valid(addr) && tipc_node(addr);
@@ -86,7 +84,6 @@ int tipc_in_scope(u32 domain, u32 addr)
/**
* tipc_addr_scope - convert message lookup domain to a 2-bit scope value
*/
-
int tipc_addr_scope(u32 domain)
{
if (likely(!domain))
diff --git a/net/tipc/addr.h b/net/tipc/addr.h
index e4f35afe320..60b00ab93d7 100644
--- a/net/tipc/addr.h
+++ b/net/tipc/addr.h
@@ -50,18 +50,33 @@ static inline u32 tipc_cluster_mask(u32 addr)
return addr & TIPC_CLUSTER_MASK;
}
-static inline int in_own_cluster(u32 addr)
+static inline int in_own_cluster_exact(u32 addr)
{
return !((addr ^ tipc_own_addr) >> 12);
}
/**
+ * in_own_node - test for node inclusion; <0.0.0> always matches
+ */
+static inline int in_own_node(u32 addr)
+{
+ return (addr == tipc_own_addr) || !addr;
+}
+
+/**
+ * in_own_cluster - test for cluster inclusion; <0.0.0> always matches
+ */
+static inline int in_own_cluster(u32 addr)
+{
+ return in_own_cluster_exact(addr) || !addr;
+}
+
+/**
* addr_domain - convert 2-bit scope value to equivalent message lookup domain
*
* Needed when address of a named message must be looked up a second time
* after a network hop.
*/
-
static inline u32 addr_domain(u32 sc)
{
if (likely(sc == TIPC_NODE_SCOPE))
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 8eb87b11d10..2625f5ebe3e 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -73,7 +73,6 @@ struct tipc_bcbearer_pair {
* large local variables within multicast routines. Concurrent access is
* prevented through use of the spinlock "bc_lock".
*/
-
struct tipc_bcbearer {
struct tipc_bearer bearer;
struct tipc_media media;
@@ -92,7 +91,6 @@ struct tipc_bcbearer {
*
* Handles sequence numbering, fragmentation, bundling, etc.
*/
-
struct tipc_bclink {
struct tipc_link link;
struct tipc_node node;
@@ -157,44 +155,18 @@ u32 tipc_bclink_get_last_sent(void)
return bcl->fsm_msg_cnt;
}
-/**
- * bclink_set_gap - set gap according to contents of current deferred pkt queue
- *
- * Called with 'node' locked, bc_lock unlocked
- */
-
-static void bclink_set_gap(struct tipc_node *n_ptr)
-{
- struct sk_buff *buf = n_ptr->bclink.deferred_head;
-
- n_ptr->bclink.gap_after = n_ptr->bclink.gap_to =
- mod(n_ptr->bclink.last_in);
- if (unlikely(buf != NULL))
- n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1);
-}
-
-/**
- * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment
- *
- * This mechanism endeavours to prevent all nodes in network from trying
- * to ACK or NACK at the same time.
- *
- * Note: TIPC uses a different trigger to distribute ACKs than it does to
- * distribute NACKs, but tries to use the same spacing (divide by 16).
- */
-
-static int bclink_ack_allowed(u32 n)
+static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
{
- return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag;
+ node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
+ seqno : node->bclink.last_sent;
}
-/**
+/*
* tipc_bclink_retransmit_to - get most recent node to request retransmission
*
* Called with bc_lock locked
*/
-
struct tipc_node *tipc_bclink_retransmit_to(void)
{
return bclink->retransmit_to;
@@ -207,7 +179,6 @@ struct tipc_node *tipc_bclink_retransmit_to(void)
*
* Called with bc_lock locked
*/
-
static void bclink_retransmit_pkt(u32 after, u32 to)
{
struct sk_buff *buf;
@@ -225,7 +196,6 @@ static void bclink_retransmit_pkt(u32 after, u32 to)
*
* Node is locked, bc_lock unlocked.
*/
-
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
struct sk_buff *crs;
@@ -281,7 +251,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
if (bcbuf_acks(crs) == 0) {
bcl->first_out = next;
bcl->out_queue_size--;
- buf_discard(crs);
+ kfree_skb(crs);
released = 1;
}
crs = next;
@@ -300,143 +270,94 @@ exit:
spin_unlock_bh(&bc_lock);
}
-/**
- * bclink_send_ack - unicast an ACK msg
+/*
+ * tipc_bclink_update_link_state - update broadcast link state
*
* tipc_net_lock and node lock set
*/
-
-static void bclink_send_ack(struct tipc_node *n_ptr)
+void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
{
- struct tipc_link *l_ptr = n_ptr->active_links[n_ptr->addr & 1];
+ struct sk_buff *buf;
- if (l_ptr != NULL)
- tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-}
+ /* Ignore "stale" link state info */
-/**
- * bclink_send_nack- broadcast a NACK msg
- *
- * tipc_net_lock and node lock set
- */
+ if (less_eq(last_sent, n_ptr->bclink.last_in))
+ return;
-static void bclink_send_nack(struct tipc_node *n_ptr)
-{
- struct sk_buff *buf;
- struct tipc_msg *msg;
+ /* Update link synchronization state; quit if in sync */
+
+ bclink_update_last_sent(n_ptr, last_sent);
+
+ if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
+ return;
+
+ /* Update out-of-sync state; quit if loss is still unconfirmed */
- if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to))
+ if ((++n_ptr->bclink.oos_state) == 1) {
+ if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
+ return;
+ n_ptr->bclink.oos_state++;
+ }
+
+ /* Don't NACK if one has been recently sent (or seen) */
+
+ if (n_ptr->bclink.oos_state & 0x1)
return;
+ /* Send NACK */
+
buf = tipc_buf_acquire(INT_H_SIZE);
if (buf) {
- msg = buf_msg(buf);
+ struct tipc_msg *msg = buf_msg(buf);
+
tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
- INT_H_SIZE, n_ptr->addr);
+ INT_H_SIZE, n_ptr->addr);
msg_set_non_seq(msg, 1);
msg_set_mc_netid(msg, tipc_net_id);
- msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in));
- msg_set_bcgap_after(msg, n_ptr->bclink.gap_after);
- msg_set_bcgap_to(msg, n_ptr->bclink.gap_to);
- msg_set_bcast_tag(msg, tipc_own_tag);
+ msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
+ msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
+ msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
+ ? buf_seqno(n_ptr->bclink.deferred_head) - 1
+ : n_ptr->bclink.last_sent);
+ spin_lock_bh(&bc_lock);
tipc_bearer_send(&bcbearer->bearer, buf, NULL);
bcl->stats.sent_nacks++;
- buf_discard(buf);
-
- /*
- * Ensure we doesn't send another NACK msg to the node
- * until 16 more deferred messages arrive from it
- * (i.e. helps prevent all nodes from NACK'ing at same time)
- */
+ spin_unlock_bh(&bc_lock);
+ kfree_skb(buf);
- n_ptr->bclink.nack_sync = tipc_own_tag;
+ n_ptr->bclink.oos_state++;
}
}
-/**
- * tipc_bclink_check_gap - send a NACK if a sequence gap exists
+/*
+ * bclink_peek_nack - monitor retransmission requests sent by other nodes
*
- * tipc_net_lock and node lock set
- */
-
-void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent)
-{
- if (!n_ptr->bclink.supported ||
- less_eq(last_sent, mod(n_ptr->bclink.last_in)))
- return;
-
- bclink_set_gap(n_ptr);
- if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to)
- n_ptr->bclink.gap_to = last_sent;
- bclink_send_nack(n_ptr);
-}
-
-/**
- * tipc_bclink_peek_nack - process a NACK msg meant for another node
+ * Delay any upcoming NACK by this node if another node has already
+ * requested the first message this node is going to ask for.
*
* Only tipc_net_lock set.
*/
-
-static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to)
+static void bclink_peek_nack(struct tipc_msg *msg)
{
- struct tipc_node *n_ptr = tipc_node_find(dest);
- u32 my_after, my_to;
+ struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg));
- if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr)))
+ if (unlikely(!n_ptr))
return;
+
tipc_node_lock(n_ptr);
- /*
- * Modify gap to suppress unnecessary NACKs from this node
- */
- my_after = n_ptr->bclink.gap_after;
- my_to = n_ptr->bclink.gap_to;
-
- if (less_eq(gap_after, my_after)) {
- if (less(my_after, gap_to) && less(gap_to, my_to))
- n_ptr->bclink.gap_after = gap_to;
- else if (less_eq(my_to, gap_to))
- n_ptr->bclink.gap_to = n_ptr->bclink.gap_after;
- } else if (less_eq(gap_after, my_to)) {
- if (less_eq(my_to, gap_to))
- n_ptr->bclink.gap_to = gap_after;
- } else {
- /*
- * Expand gap if missing bufs not in deferred queue:
- */
- struct sk_buff *buf = n_ptr->bclink.deferred_head;
- u32 prev = n_ptr->bclink.gap_to;
- for (; buf; buf = buf->next) {
- u32 seqno = buf_seqno(buf);
+ if (n_ptr->bclink.supported &&
+ (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
+ (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
+ n_ptr->bclink.oos_state = 2;
- if (mod(seqno - prev) != 1) {
- buf = NULL;
- break;
- }
- if (seqno == gap_after)
- break;
- prev = seqno;
- }
- if (buf == NULL)
- n_ptr->bclink.gap_to = gap_after;
- }
- /*
- * Some nodes may send a complementary NACK now:
- */
- if (bclink_ack_allowed(sender_tag + 1)) {
- if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) {
- bclink_send_nack(n_ptr);
- bclink_set_gap(n_ptr);
- }
- }
tipc_node_unlock(n_ptr);
}
-/**
+/*
* tipc_bclink_send_msg - broadcast a packet to all nodes in cluster
*/
-
int tipc_bclink_send_msg(struct sk_buff *buf)
{
int res;
@@ -445,7 +366,7 @@ int tipc_bclink_send_msg(struct sk_buff *buf)
if (!bclink->bcast_nodes.count) {
res = msg_data_sz(buf_msg(buf));
- buf_discard(buf);
+ kfree_skb(buf);
goto exit;
}
@@ -460,19 +381,43 @@ exit:
return res;
}
-/**
+/*
+ * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
+ *
+ * Called with both sending node's lock and bc_lock taken.
+ */
+static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
+{
+ bclink_update_last_sent(node, seqno);
+ node->bclink.last_in = seqno;
+ node->bclink.oos_state = 0;
+ bcl->stats.recv_info++;
+
+ /*
+ * Unicast an ACK periodically, ensuring that
+ * all nodes in the cluster don't ACK at the same time
+ */
+
+ if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
+ tipc_link_send_proto_msg(
+ node->active_links[node->addr & 1],
+ STATE_MSG, 0, 0, 0, 0, 0);
+ bcl->stats.sent_acks++;
+ }
+}
+
+/*
* tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards
*
* tipc_net_lock is read_locked, no other locks set
*/
-
void tipc_bclink_recv_pkt(struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
struct tipc_node *node;
u32 next_in;
u32 seqno;
- struct sk_buff *deferred;
+ int deferred;
/* Screen out unwanted broadcast messages */
@@ -487,6 +432,8 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
if (unlikely(!node->bclink.supported))
goto unlock;
+ /* Handle broadcast protocol message */
+
if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
if (msg_type(msg) != STATE_MSG)
goto unlock;
@@ -501,89 +448,118 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
spin_unlock_bh(&bc_lock);
} else {
tipc_node_unlock(node);
- tipc_bclink_peek_nack(msg_destnode(msg),
- msg_bcast_tag(msg),
- msg_bcgap_after(msg),
- msg_bcgap_to(msg));
+ bclink_peek_nack(msg);
}
goto exit;
}
/* Handle in-sequence broadcast message */
-receive:
- next_in = mod(node->bclink.last_in + 1);
seqno = msg_seqno(msg);
+ next_in = mod(node->bclink.last_in + 1);
if (likely(seqno == next_in)) {
- bcl->stats.recv_info++;
- node->bclink.last_in++;
- bclink_set_gap(node);
- if (unlikely(bclink_ack_allowed(seqno))) {
- bclink_send_ack(node);
- bcl->stats.sent_acks++;
- }
+receive:
+ /* Deliver message to destination */
+
if (likely(msg_isdata(msg))) {
+ spin_lock_bh(&bc_lock);
+ bclink_accept_pkt(node, seqno);
+ spin_unlock_bh(&bc_lock);
tipc_node_unlock(node);
if (likely(msg_mcast(msg)))
tipc_port_recv_mcast(buf, NULL);
else
- buf_discard(buf);
+ kfree_skb(buf);
} else if (msg_user(msg) == MSG_BUNDLER) {
+ spin_lock_bh(&bc_lock);
+ bclink_accept_pkt(node, seqno);
bcl->stats.recv_bundles++;
bcl->stats.recv_bundled += msg_msgcnt(msg);
+ spin_unlock_bh(&bc_lock);
tipc_node_unlock(node);
tipc_link_recv_bundle(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
+ int ret = tipc_link_recv_fragment(&node->bclink.defragm,
+ &buf, &msg);
+ if (ret < 0)
+ goto unlock;
+ spin_lock_bh(&bc_lock);
+ bclink_accept_pkt(node, seqno);
bcl->stats.recv_fragments++;
- if (tipc_link_recv_fragment(&node->bclink.defragm,
- &buf, &msg))
+ if (ret > 0)
bcl->stats.recv_fragmented++;
+ spin_unlock_bh(&bc_lock);
tipc_node_unlock(node);
tipc_net_route_msg(buf);
} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
+ spin_lock_bh(&bc_lock);
+ bclink_accept_pkt(node, seqno);
+ spin_unlock_bh(&bc_lock);
tipc_node_unlock(node);
tipc_named_recv(buf);
} else {
+ spin_lock_bh(&bc_lock);
+ bclink_accept_pkt(node, seqno);
+ spin_unlock_bh(&bc_lock);
tipc_node_unlock(node);
- buf_discard(buf);
+ kfree_skb(buf);
}
buf = NULL;
+
+ /* Determine new synchronization state */
+
tipc_node_lock(node);
- deferred = node->bclink.deferred_head;
- if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) {
- buf = deferred;
- msg = buf_msg(buf);
- node->bclink.deferred_head = deferred->next;
- goto receive;
- }
- } else if (less(next_in, seqno)) {
- u32 gap_after = node->bclink.gap_after;
- u32 gap_to = node->bclink.gap_to;
-
- if (tipc_link_defer_pkt(&node->bclink.deferred_head,
- &node->bclink.deferred_tail,
- buf)) {
- node->bclink.nack_sync++;
- bcl->stats.deferred_recv++;
- if (seqno == mod(gap_after + 1))
- node->bclink.gap_after = seqno;
- else if (less(gap_after, seqno) && less(seqno, gap_to))
- node->bclink.gap_to = seqno;
+ if (unlikely(!tipc_node_is_up(node)))
+ goto unlock;
+
+ if (node->bclink.last_in == node->bclink.last_sent)
+ goto unlock;
+
+ if (!node->bclink.deferred_head) {
+ node->bclink.oos_state = 1;
+ goto unlock;
}
+
+ msg = buf_msg(node->bclink.deferred_head);
+ seqno = msg_seqno(msg);
+ next_in = mod(next_in + 1);
+ if (seqno != next_in)
+ goto unlock;
+
+ /* Take in-sequence message from deferred queue & deliver it */
+
+ buf = node->bclink.deferred_head;
+ node->bclink.deferred_head = buf->next;
+ node->bclink.deferred_size--;
+ goto receive;
+ }
+
+ /* Handle out-of-sequence broadcast message */
+
+ if (less(next_in, seqno)) {
+ deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
+ &node->bclink.deferred_tail,
+ buf);
+ node->bclink.deferred_size += deferred;
+ bclink_update_last_sent(node, seqno);
buf = NULL;
- if (bclink_ack_allowed(node->bclink.nack_sync)) {
- if (gap_to != gap_after)
- bclink_send_nack(node);
- bclink_set_gap(node);
- }
- } else {
+ } else
+ deferred = 0;
+
+ spin_lock_bh(&bc_lock);
+
+ if (deferred)
+ bcl->stats.deferred_recv++;
+ else
bcl->stats.duplicates++;
- }
+
+ spin_unlock_bh(&bc_lock);
+
unlock:
tipc_node_unlock(node);
exit:
- buf_discard(buf);
+ kfree_skb(buf);
}
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
@@ -602,7 +578,6 @@ u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
* Returns 0 (packet sent successfully) under all circumstances,
* since the broadcast link's pseudo-bearer never blocks
*/
-
static int tipc_bcbearer_send(struct sk_buff *buf,
struct tipc_bearer *unused1,
struct tipc_media_addr *unused2)
@@ -615,7 +590,6 @@ static int tipc_bcbearer_send(struct sk_buff *buf,
* preparation is skipped for broadcast link protocol messages
* since they are sent in an unreliable manner and don't need it
*/
-
if (likely(!msg_non_seq(buf_msg(buf)))) {
struct tipc_msg *msg;
@@ -632,7 +606,6 @@ static int tipc_bcbearer_send(struct sk_buff *buf,
}
/* Send buffer over bearers until all targets reached */
-
bcbearer->remains = bclink->bcast_nodes;
for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
@@ -674,7 +647,6 @@ static int tipc_bcbearer_send(struct sk_buff *buf,
/**
* tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
*/
-
void tipc_bcbearer_sort(void)
{
struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
@@ -685,7 +657,6 @@ void tipc_bcbearer_sort(void)
spin_lock_bh(&bc_lock);
/* Group bearers by priority (can assume max of two per priority) */
-
memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));
for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
@@ -701,7 +672,6 @@ void tipc_bcbearer_sort(void)
}
/* Create array of bearer pairs for broadcasting */
-
bp_curr = bcbearer->bpairs;
memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));
@@ -831,7 +801,6 @@ void tipc_bclink_stop(void)
/**
* tipc_nmap_add - add a node to a node map
*/
-
void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
{
int n = tipc_node(node);
@@ -847,7 +816,6 @@ void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
/**
* tipc_nmap_remove - remove a node from a node map
*/
-
void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
{
int n = tipc_node(node);
@@ -866,7 +834,6 @@ void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
* @nm_b: input node map B
* @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
*/
-
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
struct tipc_node_map *nm_b,
struct tipc_node_map *nm_diff)
@@ -892,7 +859,6 @@ static void tipc_nmap_diff(struct tipc_node_map *nm_a,
/**
* tipc_port_list_add - add a port to a port list, ensuring no duplicates
*/
-
void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port)
{
struct tipc_port_list *item = pl_ptr;
@@ -926,7 +892,6 @@ void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port)
* tipc_port_list_free - free dynamically created entries in port_list chain
*
*/
-
void tipc_port_list_free(struct tipc_port_list *pl_ptr)
{
struct tipc_port_list *item;
@@ -937,4 +902,3 @@ void tipc_port_list_free(struct tipc_port_list *pl_ptr)
kfree(item);
}
}
-
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index b009666c60b..a93306557e0 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -45,7 +45,6 @@
* @count: # of nodes in set
* @map: bitmap of node identifiers that are in the set
*/
-
struct tipc_node_map {
u32 count;
u32 map[MAX_NODES / WSIZE];
@@ -59,7 +58,6 @@ struct tipc_node_map {
* @next: pointer to next entry in list
* @ports: array of port references
*/
-
struct tipc_port_list {
int count;
struct tipc_port_list *next;
@@ -77,7 +75,6 @@ void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
/**
* tipc_nmap_equal - test for equality of node maps
*/
-
static inline int tipc_nmap_equal(struct tipc_node_map *nm_a, struct tipc_node_map *nm_b)
{
return !memcmp(nm_a, nm_b, sizeof(*nm_a));
@@ -96,7 +93,7 @@ int tipc_bclink_send_msg(struct sk_buff *buf);
void tipc_bclink_recv_pkt(struct sk_buff *buf);
u32 tipc_bclink_get_last_sent(void);
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr);
-void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 seqno);
+void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent);
int tipc_bclink_stats(char *stats_buf, const u32 buf_size);
int tipc_bclink_reset_stats(void);
int tipc_bclink_set_queue_limits(u32 limit);
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 329fb659fae..a297e3a2e3e 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -53,7 +53,6 @@ static void bearer_disable(struct tipc_bearer *b_ptr);
*
* Returns 1 if media name is valid, otherwise 0.
*/
-
static int media_name_valid(const char *name)
{
u32 len;
@@ -67,7 +66,6 @@ static int media_name_valid(const char *name)
/**
* tipc_media_find - locates specified media object by name
*/
-
struct tipc_media *tipc_media_find(const char *name)
{
u32 i;
@@ -82,7 +80,6 @@ struct tipc_media *tipc_media_find(const char *name)
/**
* media_find_id - locates specified media object by type identifier
*/
-
static struct tipc_media *media_find_id(u8 type)
{
u32 i;
@@ -99,7 +96,6 @@ static struct tipc_media *media_find_id(u8 type)
*
* Bearers for this media type must be activated separately at a later stage.
*/
-
int tipc_register_media(struct tipc_media *m_ptr)
{
int res = -EINVAL;
@@ -134,7 +130,6 @@ exit:
/**
* tipc_media_addr_printf - record media address in print buffer
*/
-
void tipc_media_addr_printf(struct print_buf *pb, struct tipc_media_addr *a)
{
char addr_str[MAX_ADDR_STR];
@@ -156,7 +151,6 @@ void tipc_media_addr_printf(struct print_buf *pb, struct tipc_media_addr *a)
/**
* tipc_media_get_names - record names of registered media in buffer
*/
-
struct sk_buff *tipc_media_get_names(void)
{
struct sk_buff *buf;
@@ -183,7 +177,6 @@ struct sk_buff *tipc_media_get_names(void)
*
* Returns 1 if bearer name is valid, otherwise 0.
*/
-
static int bearer_name_validate(const char *name,
struct tipc_bearer_names *name_parts)
{
@@ -194,7 +187,6 @@ static int bearer_name_validate(const char *name,
u32 if_len;
/* copy bearer name & ensure length is OK */
-
name_copy[TIPC_MAX_BEARER_NAME - 1] = 0;
/* need above in case non-Posix strncpy() doesn't pad with nulls */
strncpy(name_copy, name, TIPC_MAX_BEARER_NAME);
@@ -202,7 +194,6 @@ static int bearer_name_validate(const char *name,
return 0;
/* ensure all component parts of bearer name are present */
-
media_name = name_copy;
if_name = strchr(media_name, ':');
if (if_name == NULL)
@@ -212,7 +203,6 @@ static int bearer_name_validate(const char *name,
if_len = strlen(if_name) + 1;
/* validate component parts of bearer name */
-
if ((media_len <= 1) || (media_len > TIPC_MAX_MEDIA_NAME) ||
(if_len <= 1) || (if_len > TIPC_MAX_IF_NAME) ||
(strspn(media_name, tipc_alphabet) != (media_len - 1)) ||
@@ -220,7 +210,6 @@ static int bearer_name_validate(const char *name,
return 0;
/* return bearer name components, if necessary */
-
if (name_parts) {
strcpy(name_parts->media_name, media_name);
strcpy(name_parts->if_name, if_name);
@@ -231,7 +220,6 @@ static int bearer_name_validate(const char *name,
/**
* tipc_bearer_find - locates bearer object with matching bearer name
*/
-
struct tipc_bearer *tipc_bearer_find(const char *name)
{
struct tipc_bearer *b_ptr;
@@ -247,7 +235,6 @@ struct tipc_bearer *tipc_bearer_find(const char *name)
/**
* tipc_bearer_find_interface - locates bearer object with matching interface name
*/
-
struct tipc_bearer *tipc_bearer_find_interface(const char *if_name)
{
struct tipc_bearer *b_ptr;
@@ -267,7 +254,6 @@ struct tipc_bearer *tipc_bearer_find_interface(const char *if_name)
/**
* tipc_bearer_get_names - record names of bearers in buffer
*/
-
struct sk_buff *tipc_bearer_get_names(void)
{
struct sk_buff *buf;
@@ -363,7 +349,6 @@ void tipc_continue(struct tipc_bearer *b_ptr)
* the bearer is congested. 'tipc_net_lock' is in read_lock here
* bearer.lock is busy
*/
-
static void tipc_bearer_schedule_unlocked(struct tipc_bearer *b_ptr,
struct tipc_link *l_ptr)
{
@@ -377,7 +362,6 @@ static void tipc_bearer_schedule_unlocked(struct tipc_bearer *b_ptr,
* the bearer is congested. 'tipc_net_lock' is in read_lock here,
* bearer.lock is free
*/
-
void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
{
spin_lock_bh(&b_ptr->lock);
@@ -410,7 +394,6 @@ int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr,
/**
* tipc_bearer_congested - determines if bearer is currently congested
*/
-
int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
{
if (unlikely(b_ptr->blocked))
@@ -423,7 +406,6 @@ int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
/**
* tipc_enable_bearer - enable bearer with the given name
*/
-
int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
{
struct tipc_bearer *b_ptr;
@@ -435,7 +417,7 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
u32 i;
int res = -EINVAL;
- if (tipc_mode != TIPC_NET_MODE) {
+ if (!tipc_own_addr) {
warn("Bearer <%s> rejected, not supported in standalone mode\n",
name);
return -ENOPROTOOPT;
@@ -449,15 +431,14 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
if (tipc_in_scope(disc_domain, tipc_own_addr)) {
disc_domain = tipc_own_addr & TIPC_CLUSTER_MASK;
res = 0; /* accept any node in own cluster */
- } else if (in_own_cluster(disc_domain))
+ } else if (in_own_cluster_exact(disc_domain))
res = 0; /* accept specified node in own cluster */
}
if (res) {
warn("Bearer <%s> rejected, illegal discovery domain\n", name);
return -EINVAL;
}
- if ((priority < TIPC_MIN_LINK_PRI ||
- priority > TIPC_MAX_LINK_PRI) &&
+ if ((priority > TIPC_MAX_LINK_PRI) &&
(priority != TIPC_MEDIA_LINK_PRI)) {
warn("Bearer <%s> rejected, illegal priority\n", name);
return -EINVAL;
@@ -542,7 +523,6 @@ exit:
* tipc_block_bearer(): Block the bearer with the given name,
* and reset all its links
*/
-
int tipc_block_bearer(const char *name)
{
struct tipc_bearer *b_ptr = NULL;
@@ -574,11 +554,10 @@ int tipc_block_bearer(const char *name)
}
/**
- * bearer_disable -
+ * bearer_disable
*
* Note: This routine assumes caller holds tipc_net_lock.
*/
-
static void bearer_disable(struct tipc_bearer *b_ptr)
{
struct tipc_link *l_ptr;
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index d3eac56b8c2..e3b2be37fb3 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -49,7 +49,6 @@
* - media type identifier located at offset 3
* - remaining bytes vary according to media type
*/
-
#define TIPC_MEDIA_ADDR_SIZE 20
#define TIPC_MEDIA_TYPE_OFFSET 3
@@ -64,7 +63,6 @@
* @media_id: TIPC media type identifier
* @broadcast: non-zero if address is a broadcast address
*/
-
struct tipc_media_addr {
u8 value[TIPC_MEDIA_ADDR_SIZE];
u8 media_id;
@@ -89,7 +87,6 @@ struct tipc_bearer;
* @type_id: TIPC media identifier
* @name: media name
*/
-
struct tipc_media {
int (*send_msg)(struct sk_buff *buf,
struct tipc_bearer *b_ptr,
@@ -216,7 +213,6 @@ void tipc_bearer_lock_push(struct tipc_bearer *b_ptr);
* send routine always returns success -- even if the buffer was not sent --
* and let TIPC's link code deal with the undelivered message.
*/
-
static inline int tipc_bearer_send(struct tipc_bearer *b_ptr,
struct sk_buff *buf,
struct tipc_media_addr *dest)
diff --git a/net/tipc/config.c b/net/tipc/config.c
index 4785bf26cdf..c5712a34381 100644
--- a/net/tipc/config.c
+++ b/net/tipc/config.c
@@ -131,7 +131,6 @@ static struct sk_buff *tipc_show_stats(void)
tipc_printf(&pb, "TIPC version " TIPC_MOD_VER "\n");
/* Use additional tipc_printf()'s to return more info ... */
-
str_len = tipc_printbuf_validate(&pb);
skb_put(buf, TLV_SPACE(str_len));
TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
@@ -179,7 +178,7 @@ static struct sk_buff *cfg_set_own_addr(void)
if (!tipc_addr_node_valid(addr))
return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
" (node address)");
- if (tipc_mode == TIPC_NET_MODE)
+ if (tipc_own_addr)
return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
" (cannot change node address once assigned)");
@@ -191,7 +190,6 @@ static struct sk_buff *cfg_set_own_addr(void)
* configuration commands can't be received until a local configuration
* command to enable the first bearer is received and processed.
*/
-
spin_unlock_bh(&config_lock);
tipc_core_start_net(addr);
spin_lock_bh(&config_lock);
@@ -218,7 +216,7 @@ static struct sk_buff *cfg_set_max_publications(void)
return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));
- if (value != delimit(value, 1, 65535))
+ if (value < 1 || value > 65535)
return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
" (max publications must be 1-65535)");
tipc_max_publications = value;
@@ -233,7 +231,7 @@ static struct sk_buff *cfg_set_max_subscriptions(void)
return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));
- if (value != delimit(value, 1, 65535))
+ if (value < 1 || value > 65535)
return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
" (max subscriptions must be 1-65535");
tipc_max_subscriptions = value;
@@ -249,14 +247,11 @@ static struct sk_buff *cfg_set_max_ports(void)
value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));
if (value == tipc_max_ports)
return tipc_cfg_reply_none();
- if (value != delimit(value, 127, 65535))
+ if (value < 127 || value > 65535)
return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
" (max ports must be 127-65535)");
- if (tipc_mode != TIPC_NOT_RUNNING)
- return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
- " (cannot change max ports while TIPC is active)");
- tipc_max_ports = value;
- return tipc_cfg_reply_none();
+ return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
+ " (cannot change max ports while TIPC is active)");
}
static struct sk_buff *cfg_set_netid(void)
@@ -268,10 +263,10 @@ static struct sk_buff *cfg_set_netid(void)
value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));
if (value == tipc_net_id)
return tipc_cfg_reply_none();
- if (value != delimit(value, 1, 9999))
+ if (value < 1 || value > 9999)
return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
" (network id must be 1-9999)");
- if (tipc_mode == TIPC_NET_MODE)
+ if (tipc_own_addr)
return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
" (cannot change network id once TIPC has joined a network)");
tipc_net_id = value;
@@ -286,14 +281,12 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
spin_lock_bh(&config_lock);
/* Save request and reply details in a well-known location */
-
req_tlv_area = request_area;
req_tlv_space = request_space;
rep_headroom = reply_headroom;
/* Check command authorization */
-
- if (likely(orig_node == tipc_own_addr)) {
+ if (likely(in_own_node(orig_node))) {
/* command is permitted */
} else if (cmd >= 0x8000) {
rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
@@ -313,7 +306,6 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
}
/* Call appropriate processing routine */
-
switch (cmd) {
case TIPC_CMD_NOOP:
rep_tlv_buf = tipc_cfg_reply_none();
@@ -436,7 +428,6 @@ static void cfg_named_msg_event(void *userdata,
struct sk_buff *rep_buf;
/* Validate configuration message header (ignore invalid message) */
-
req_hdr = (struct tipc_cfg_msg_hdr *)msg;
if ((size < sizeof(*req_hdr)) ||
(size != TCM_ALIGN(ntohl(req_hdr->tcm_len))) ||
@@ -446,7 +437,6 @@ static void cfg_named_msg_event(void *userdata,
}
/* Generate reply for request (if can't, return request) */
-
rep_buf = tipc_cfg_do_cmd(orig->node,
ntohs(req_hdr->tcm_type),
msg + sizeof(*req_hdr),
@@ -481,7 +471,7 @@ int tipc_cfg_init(void)
seq.type = TIPC_CFG_SRV;
seq.lower = seq.upper = tipc_own_addr;
- res = tipc_nametbl_publish_rsv(config_port_ref, TIPC_ZONE_SCOPE, &seq);
+ res = tipc_publish(config_port_ref, TIPC_ZONE_SCOPE, &seq);
if (res)
goto failed;
@@ -492,10 +482,23 @@ failed:
return res;
}
+void tipc_cfg_reinit(void)
+{
+ struct tipc_name_seq seq;
+ int res;
+
+ seq.type = TIPC_CFG_SRV;
+ seq.lower = seq.upper = 0;
+ tipc_withdraw(config_port_ref, TIPC_ZONE_SCOPE, &seq);
+
+ seq.lower = seq.upper = tipc_own_addr;
+ res = tipc_publish(config_port_ref, TIPC_ZONE_SCOPE, &seq);
+ if (res)
+ err("Unable to reinitialize configuration service\n");
+}
+
void tipc_cfg_stop(void)
{
- if (config_port_ref) {
- tipc_deleteport(config_port_ref);
- config_port_ref = 0;
- }
+ tipc_deleteport(config_port_ref);
+ config_port_ref = 0;
}
diff --git a/net/tipc/config.h b/net/tipc/config.h
index 80da6ebc278..1f252f3fa05 100644
--- a/net/tipc/config.h
+++ b/net/tipc/config.h
@@ -66,6 +66,7 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd,
int headroom);
int tipc_cfg_init(void);
+void tipc_cfg_reinit(void);
void tipc_cfg_stop(void);
#endif
diff --git a/net/tipc/core.c b/net/tipc/core.c
index 2691cd57b8a..f7b95239ebd 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -52,15 +52,12 @@
#endif
/* global variables used by multiple sub-systems within TIPC */
-
-int tipc_mode = TIPC_NOT_RUNNING;
int tipc_random;
const char tipc_alphabet[] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_.";
/* configurable TIPC parameters */
-
u32 tipc_own_addr;
int tipc_max_ports;
int tipc_max_subscriptions;
@@ -78,7 +75,6 @@ int tipc_remote_management;
* NOTE: Headroom is reserved to allow prepending of a data link header.
* There may also be unrequested tailroom present at the buffer's end.
*/
-
struct sk_buff *tipc_buf_acquire(u32 size)
{
struct sk_buff *skb;
@@ -96,7 +92,6 @@ struct sk_buff *tipc_buf_acquire(u32 size)
/**
* tipc_core_stop_net - shut down TIPC networking sub-systems
*/
-
static void tipc_core_stop_net(void)
{
tipc_net_stop();
@@ -106,7 +101,6 @@ static void tipc_core_stop_net(void)
/**
* start_net - start TIPC networking sub-systems
*/
-
int tipc_core_start_net(unsigned long addr)
{
int res;
@@ -122,14 +116,8 @@ int tipc_core_start_net(unsigned long addr)
/**
* tipc_core_stop - switch TIPC from SINGLE NODE to NOT RUNNING mode
*/
-
static void tipc_core_stop(void)
{
- if (tipc_mode != TIPC_NODE_MODE)
- return;
-
- tipc_mode = TIPC_NOT_RUNNING;
-
tipc_netlink_stop();
tipc_handler_stop();
tipc_cfg_stop();
@@ -143,16 +131,11 @@ static void tipc_core_stop(void)
/**
* tipc_core_start - switch TIPC from NOT RUNNING to SINGLE NODE mode
*/
-
static int tipc_core_start(void)
{
int res;
- if (tipc_mode != TIPC_NOT_RUNNING)
- return -ENOPROTOOPT;
-
get_random_bytes(&tipc_random, sizeof(tipc_random));
- tipc_mode = TIPC_NODE_MODE;
res = tipc_handler_start();
if (!res)
@@ -160,9 +143,9 @@ static int tipc_core_start(void)
if (!res)
res = tipc_nametbl_init();
if (!res)
- res = tipc_k_signal((Handler)tipc_subscr_start, 0);
+ res = tipc_subscr_start();
if (!res)
- res = tipc_k_signal((Handler)tipc_cfg_init, 0);
+ res = tipc_cfg_init();
if (!res)
res = tipc_netlink_start();
if (!res)
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 2761af36d14..2a9bb99537b 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -85,7 +85,6 @@ void tipc_printf(struct print_buf *, const char *fmt, ...);
/*
* TIPC_OUTPUT is the destination print buffer for system messages.
*/
-
#ifndef TIPC_OUTPUT
#define TIPC_OUTPUT TIPC_LOG
#endif
@@ -102,7 +101,6 @@ void tipc_printf(struct print_buf *, const char *fmt, ...);
/*
* DBG_OUTPUT is the destination print buffer for debug messages.
*/
-
#ifndef DBG_OUTPUT
#define DBG_OUTPUT TIPC_LOG
#endif
@@ -126,20 +124,11 @@ void tipc_msg_dbg(struct print_buf *, struct tipc_msg *, const char *);
/*
* TIPC-specific error codes
*/
-
#define ELINKCONG EAGAIN /* link congestion <=> resource unavailable */
/*
- * TIPC operating mode routines
- */
-#define TIPC_NOT_RUNNING 0
-#define TIPC_NODE_MODE 1
-#define TIPC_NET_MODE 2
-
-/*
* Global configuration variables
*/
-
extern u32 tipc_own_addr;
extern int tipc_max_ports;
extern int tipc_max_subscriptions;
@@ -150,8 +139,6 @@ extern int tipc_remote_management;
/*
* Other global variables
*/
-
-extern int tipc_mode;
extern int tipc_random;
extern const char tipc_alphabet[];
@@ -159,7 +146,6 @@ extern const char tipc_alphabet[];
/*
* Routines available to privileged subsystems
*/
-
extern int tipc_core_start_net(unsigned long);
extern int tipc_handler_start(void);
extern void tipc_handler_stop(void);
@@ -168,20 +154,9 @@ extern void tipc_netlink_stop(void);
extern int tipc_socket_init(void);
extern void tipc_socket_stop(void);
-static inline int delimit(int val, int min, int max)
-{
- if (val > max)
- return max;
- if (val < min)
- return min;
- return val;
-}
-
-
/*
* TIPC timer and signal code
*/
-
typedef void (*Handler) (unsigned long);
u32 tipc_k_signal(Handler routine, unsigned long argument);
@@ -194,7 +169,6 @@ u32 tipc_k_signal(Handler routine, unsigned long argument);
*
* Timer must be initialized before use (and terminated when no longer needed).
*/
-
static inline void k_init_timer(struct timer_list *timer, Handler routine,
unsigned long argument)
{
@@ -214,7 +188,6 @@ static inline void k_init_timer(struct timer_list *timer, Handler routine,
* then an additional jiffy is added to account for the fact that
* the starting time may be in the middle of the current jiffy.
*/
-
static inline void k_start_timer(struct timer_list *timer, unsigned long msec)
{
mod_timer(timer, jiffies + msecs_to_jiffies(msec) + 1);
@@ -230,7 +203,6 @@ static inline void k_start_timer(struct timer_list *timer, unsigned long msec)
* WARNING: Must not be called when holding locks required by the timer's
* timeout routine, otherwise deadlock can occur on SMP systems!
*/
-
static inline void k_cancel_timer(struct timer_list *timer)
{
del_timer_sync(timer);
@@ -247,12 +219,10 @@ static inline void k_cancel_timer(struct timer_list *timer)
* (Do not "enhance" this routine to automatically cancel an active timer,
* otherwise deadlock can arise when a timeout routine calls k_term_timer.)
*/
-
static inline void k_term_timer(struct timer_list *timer)
{
}
-
/*
* TIPC message buffer code
*
@@ -262,7 +232,6 @@ static inline void k_term_timer(struct timer_list *timer)
* Note: Headroom should be a multiple of 4 to ensure the TIPC header fields
* are word aligned for quicker access
*/
-
#define BUF_HEADROOM LL_MAX_HEADER
struct tipc_skb_cb {
@@ -271,7 +240,6 @@ struct tipc_skb_cb {
#define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0]))
-
static inline struct tipc_msg *buf_msg(struct sk_buff *skb)
{
return (struct tipc_msg *)skb->data;
@@ -279,28 +247,4 @@ static inline struct tipc_msg *buf_msg(struct sk_buff *skb)
extern struct sk_buff *tipc_buf_acquire(u32 size);
-/**
- * buf_discard - frees a TIPC message buffer
- * @skb: message buffer
- *
- * Frees a message buffer. If passed NULL, just returns.
- */
-
-static inline void buf_discard(struct sk_buff *skb)
-{
- kfree_skb(skb);
-}
-
-/**
- * buf_linearize - convert a TIPC message buffer into a single contiguous piece
- * @skb: message buffer
- *
- * Returns 0 on success.
- */
-
-static inline int buf_linearize(struct sk_buff *skb)
-{
- return skb_linearize(skb);
-}
-
#endif
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index a00e5f81156..ae054cfe179 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -70,7 +70,6 @@ struct tipc_link_req {
* @dest_domain: network domain of node(s) which should respond to message
* @b_ptr: ptr to bearer issuing message
*/
-
static struct sk_buff *tipc_disc_init_msg(u32 type,
u32 dest_domain,
struct tipc_bearer *b_ptr)
@@ -82,6 +81,7 @@ static struct sk_buff *tipc_disc_init_msg(u32 type,
msg = buf_msg(buf);
tipc_msg_init(msg, LINK_CONFIG, type, INT_H_SIZE, dest_domain);
msg_set_non_seq(msg, 1);
+ msg_set_node_sig(msg, tipc_random);
msg_set_dest_domain(msg, dest_domain);
msg_set_bc_netid(msg, tipc_net_id);
b_ptr->media->addr2msg(&b_ptr->addr, msg_media_addr(msg));
@@ -95,7 +95,6 @@ static struct sk_buff *tipc_disc_init_msg(u32 type,
* @node_addr: duplicated node address
* @media_addr: media address advertised by duplicated node
*/
-
static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr,
struct tipc_media_addr *media_addr)
{
@@ -116,25 +115,26 @@ static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr,
* @buf: buffer containing message
* @b_ptr: bearer that message arrived on
*/
-
void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
{
struct tipc_node *n_ptr;
struct tipc_link *link;
- struct tipc_media_addr media_addr, *addr;
+ struct tipc_media_addr media_addr;
struct sk_buff *rbuf;
struct tipc_msg *msg = buf_msg(buf);
u32 dest = msg_dest_domain(msg);
u32 orig = msg_prevnode(msg);
u32 net_id = msg_bc_netid(msg);
u32 type = msg_type(msg);
+ u32 signature = msg_node_sig(msg);
+ int addr_mismatch;
int link_fully_up;
media_addr.broadcast = 1;
b_ptr->media->msg2addr(&media_addr, msg_media_addr(msg));
- buf_discard(buf);
+ kfree_skb(buf);
- /* Validate discovery message from requesting node */
+ /* Ensure message from node is valid and communication is permitted */
if (net_id != tipc_net_id)
return;
if (media_addr.broadcast)
@@ -162,15 +162,50 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
}
tipc_node_lock(n_ptr);
+ /* Prepare to validate requesting node's signature and media address */
link = n_ptr->links[b_ptr->identity];
+ addr_mismatch = (link != NULL) &&
+ memcmp(&link->media_addr, &media_addr, sizeof(media_addr));
- /* Create a link endpoint for this bearer, if necessary */
- if (!link) {
- link = tipc_link_create(n_ptr, b_ptr, &media_addr);
- if (!link) {
+ /*
+ * Ensure discovery message's signature is correct
+ *
+ * If signature is incorrect and there is no working link to the node,
+ * accept the new signature but invalidate all existing links to the
+ * node so they won't re-activate without a new discovery message.
+ *
+ * If signature is incorrect and the requested link to the node is
+ * working, accept the new signature. (This is an instance of delayed
+ * rediscovery, where a link endpoint was able to re-establish contact
+ * with its peer endpoint on a node that rebooted before receiving a
+ * discovery message from that node.)
+ *
+ * If signature is incorrect and there is a working link to the node
+ * that is not the requested link, reject the request (must be from
+ * a duplicate node).
+ */
+ if (signature != n_ptr->signature) {
+ if (n_ptr->working_links == 0) {
+ struct tipc_link *curr_link;
+ int i;
+
+ for (i = 0; i < MAX_BEARERS; i++) {
+ curr_link = n_ptr->links[i];
+ if (curr_link) {
+ memset(&curr_link->media_addr, 0,
+ sizeof(media_addr));
+ tipc_link_reset(curr_link);
+ }
+ }
+ addr_mismatch = (link != NULL);
+ } else if (tipc_link_is_up(link) && !addr_mismatch) {
+ /* delayed rediscovery */
+ } else {
+ disc_dupl_alert(b_ptr, orig, &media_addr);
tipc_node_unlock(n_ptr);
return;
}
+ n_ptr->signature = signature;
}
/*
@@ -183,17 +218,25 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
* the new media address and reset the link to ensure it starts up
* cleanly.
*/
- addr = &link->media_addr;
- if (memcmp(addr, &media_addr, sizeof(*addr))) {
- if (tipc_link_is_up(link) || (!link->started)) {
+ if (addr_mismatch) {
+ if (tipc_link_is_up(link)) {
disc_dupl_alert(b_ptr, orig, &media_addr);
tipc_node_unlock(n_ptr);
return;
+ } else {
+ memcpy(&link->media_addr, &media_addr,
+ sizeof(media_addr));
+ tipc_link_reset(link);
+ }
+ }
+
+ /* Create a link endpoint for this bearer, if necessary */
+ if (!link) {
+ link = tipc_link_create(n_ptr, b_ptr, &media_addr);
+ if (!link) {
+ tipc_node_unlock(n_ptr);
+ return;
}
- warn("Resetting link <%s>, peer interface address changed\n",
- link->name);
- memcpy(addr, &media_addr, sizeof(*addr));
- tipc_link_reset(link);
}
/* Accept discovery message & send response, if necessary */
@@ -203,7 +246,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr);
if (rbuf) {
b_ptr->media->send_msg(rbuf, b_ptr, &media_addr);
- buf_discard(rbuf);
+ kfree_skb(rbuf);
}
}
@@ -217,7 +260,6 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
* Reinitiates discovery process if discovery object has no associated nodes
* and is either not currently searching or is searching at a slow rate
*/
-
static void disc_update(struct tipc_link_req *req)
{
if (!req->num_nodes) {
@@ -233,7 +275,6 @@ static void disc_update(struct tipc_link_req *req)
* tipc_disc_add_dest - increment set of discovered nodes
* @req: ptr to link request structure
*/
-
void tipc_disc_add_dest(struct tipc_link_req *req)
{
req->num_nodes++;
@@ -243,7 +284,6 @@ void tipc_disc_add_dest(struct tipc_link_req *req)
* tipc_disc_remove_dest - decrement set of discovered nodes
* @req: ptr to link request structure
*/
-
void tipc_disc_remove_dest(struct tipc_link_req *req)
{
req->num_nodes--;
@@ -254,7 +294,6 @@ void tipc_disc_remove_dest(struct tipc_link_req *req)
* disc_send_msg - send link setup request message
* @req: ptr to link request structure
*/
-
static void disc_send_msg(struct tipc_link_req *req)
{
if (!req->bearer->blocked)
@@ -267,7 +306,6 @@ static void disc_send_msg(struct tipc_link_req *req)
*
* Called whenever a link setup request timer associated with a bearer expires.
*/
-
static void disc_timeout(struct tipc_link_req *req)
{
int max_delay;
@@ -275,7 +313,6 @@ static void disc_timeout(struct tipc_link_req *req)
spin_lock_bh(&req->bearer->lock);
/* Stop searching if only desired node has been found */
-
if (tipc_node(req->domain) && req->num_nodes) {
req->timer_intv = TIPC_LINK_REQ_INACTIVE;
goto exit;
@@ -288,7 +325,6 @@ static void disc_timeout(struct tipc_link_req *req)
* hold at fast polling rate if don't have any associated nodes,
* otherwise hold at slow polling rate
*/
-
disc_send_msg(req);
req->timer_intv *= 2;
@@ -312,7 +348,6 @@ exit:
*
* Returns 0 if successful, otherwise -errno.
*/
-
int tipc_disc_create(struct tipc_bearer *b_ptr,
struct tipc_media_addr *dest, u32 dest_domain)
{
@@ -344,12 +379,10 @@ int tipc_disc_create(struct tipc_bearer *b_ptr,
* tipc_disc_delete - destroy object sending periodic link setup requests
* @req: ptr to link request structure
*/
-
void tipc_disc_delete(struct tipc_link_req *req)
{
k_cancel_timer(&req->timer);
k_term_timer(&req->timer);
- buf_discard(req->buf);
+ kfree_skb(req->buf);
kfree(req);
}
-
diff --git a/net/tipc/eth_media.c b/net/tipc/eth_media.c
index 527e3f0e165..90ac9bfa7ab 100644
--- a/net/tipc/eth_media.c
+++ b/net/tipc/eth_media.c
@@ -48,7 +48,6 @@
* @tipc_packet_type: used in binding TIPC to Ethernet driver
* @cleanup: work item used when disabling bearer
*/
-
struct eth_bearer {
struct tipc_bearer *bearer;
struct net_device *dev;
@@ -67,7 +66,6 @@ static struct notifier_block notifier;
* Media-dependent "value" field stores MAC address in first 6 bytes
* and zeroes out the remaining bytes.
*/
-
static void eth_media_addr_set(struct tipc_media_addr *a, char *mac)
{
memcpy(a->value, mac, ETH_ALEN);
@@ -79,7 +77,6 @@ static void eth_media_addr_set(struct tipc_media_addr *a, char *mac)
/**
* send_msg - send a TIPC message out over an Ethernet interface
*/
-
static int send_msg(struct sk_buff *buf, struct tipc_bearer *tb_ptr,
struct tipc_media_addr *dest)
{
@@ -115,7 +112,6 @@ static int send_msg(struct sk_buff *buf, struct tipc_bearer *tb_ptr,
* ignores packets sent using Ethernet multicast, and traffic sent to other
* nodes (which can happen if interface is running in promiscuous mode).
*/
-
static int recv_msg(struct sk_buff *buf, struct net_device *dev,
struct packet_type *pt, struct net_device *orig_dev)
{
@@ -140,7 +136,6 @@ static int recv_msg(struct sk_buff *buf, struct net_device *dev,
/**
* enable_bearer - attach TIPC bearer to an Ethernet interface
*/
-
static int enable_bearer(struct tipc_bearer *tb_ptr)
{
struct net_device *dev = NULL;
@@ -151,7 +146,6 @@ static int enable_bearer(struct tipc_bearer *tb_ptr)
int pending_dev = 0;
/* Find unused Ethernet bearer structure */
-
while (eb_ptr->dev) {
if (!eb_ptr->bearer)
pending_dev++;
@@ -160,7 +154,6 @@ static int enable_bearer(struct tipc_bearer *tb_ptr)
}
/* Find device with specified name */
-
read_lock(&dev_base_lock);
for_each_netdev(&init_net, pdev) {
if (!strncmp(pdev->name, driver_name, IFNAMSIZ)) {
@@ -174,7 +167,6 @@ static int enable_bearer(struct tipc_bearer *tb_ptr)
return -ENODEV;
/* Create Ethernet bearer for device */
-
eb_ptr->dev = dev;
eb_ptr->tipc_packet_type.type = htons(ETH_P_TIPC);
eb_ptr->tipc_packet_type.dev = dev;
@@ -184,7 +176,6 @@ static int enable_bearer(struct tipc_bearer *tb_ptr)
dev_add_pack(&eb_ptr->tipc_packet_type);
/* Associate TIPC bearer with Ethernet bearer */
-
eb_ptr->bearer = tb_ptr;
tb_ptr->usr_handle = (void *)eb_ptr;
tb_ptr->mtu = dev->mtu;
@@ -198,7 +189,6 @@ static int enable_bearer(struct tipc_bearer *tb_ptr)
*
* This routine must be invoked from a work queue because it can sleep.
*/
-
static void cleanup_bearer(struct work_struct *work)
{
struct eth_bearer *eb_ptr =
@@ -216,7 +206,6 @@ static void cleanup_bearer(struct work_struct *work)
* then get worker thread to complete bearer cleanup. (Can't do cleanup
* here because cleanup code needs to sleep and caller holds spinlocks.)
*/
-
static void disable_bearer(struct tipc_bearer *tb_ptr)
{
struct eth_bearer *eb_ptr = (struct eth_bearer *)tb_ptr->usr_handle;
@@ -232,7 +221,6 @@ static void disable_bearer(struct tipc_bearer *tb_ptr)
* Change the state of the Ethernet bearer (if any) associated with the
* specified device.
*/
-
static int recv_notification(struct notifier_block *nb, unsigned long evt,
void *dv)
{
@@ -281,7 +269,6 @@ static int recv_notification(struct notifier_block *nb, unsigned long evt,
/**
* eth_addr2str - convert Ethernet address to string
*/
-
static int eth_addr2str(struct tipc_media_addr *a, char *str_buf, int str_size)
{
if (str_size < 18) /* 18 = strlen("aa:bb:cc:dd:ee:ff\0") */
@@ -294,7 +281,6 @@ static int eth_addr2str(struct tipc_media_addr *a, char *str_buf, int str_size)
/**
* eth_str2addr - convert string to Ethernet address
*/
-
static int eth_str2addr(struct tipc_media_addr *a, char *str_buf)
{
char mac[ETH_ALEN];
@@ -314,7 +300,6 @@ static int eth_str2addr(struct tipc_media_addr *a, char *str_buf)
/**
* eth_str2addr - convert Ethernet address format to message header format
*/
-
static int eth_addr2msg(struct tipc_media_addr *a, char *msg_area)
{
memset(msg_area, 0, TIPC_MEDIA_ADDR_SIZE);
@@ -326,7 +311,6 @@ static int eth_addr2msg(struct tipc_media_addr *a, char *msg_area)
/**
* eth_str2addr - convert message header address format to Ethernet format
*/
-
static int eth_msg2addr(struct tipc_media_addr *a, char *msg_area)
{
if (msg_area[TIPC_MEDIA_TYPE_OFFSET] != TIPC_MEDIA_TYPE_ETH)
@@ -339,7 +323,6 @@ static int eth_msg2addr(struct tipc_media_addr *a, char *msg_area)
/*
* Ethernet media registration info
*/
-
static struct tipc_media eth_media_info = {
.send_msg = send_msg,
.enable_bearer = enable_bearer,
@@ -363,7 +346,6 @@ static struct tipc_media eth_media_info = {
* Register Ethernet media type with TIPC bearer code. Also register
* with OS for notifications about device state changes.
*/
-
int tipc_eth_media_start(void)
{
int res;
@@ -386,7 +368,6 @@ int tipc_eth_media_start(void)
/**
* tipc_eth_media_stop - deactivate Ethernet bearer support
*/
-
void tipc_eth_media_stop(void)
{
if (!eth_started)
diff --git a/net/tipc/handler.c b/net/tipc/handler.c
index 274c98e164b..9c6f22ff1c6 100644
--- a/net/tipc/handler.c
+++ b/net/tipc/handler.c
@@ -129,4 +129,3 @@ void tipc_handler_stop(void)
kmem_cache_destroy(tipc_queue_item_cache);
}
-
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ac1832a66f8..7a614f43549 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -45,13 +45,11 @@
/*
* Out-of-range value for link session numbers
*/
-
#define INVALID_SESSION 0x10000
/*
* Link state events:
*/
-
#define STARTING_EVT 856384768 /* link processing trigger */
#define TRAFFIC_MSG_EVT 560815u /* rx'd ??? */
#define TIMEOUT_EVT 560817u /* link timer expired */
@@ -67,7 +65,6 @@
/*
* State value stored in 'exp_msg_count'
*/
-
#define START_CHANGEOVER 100000u
/**
@@ -77,7 +74,6 @@
* @addr_peer: network address of node at far end
* @if_peer: name of interface at far end
*/
-
struct tipc_link_name {
u32 addr_local;
char if_local[TIPC_MAX_IF_NAME];
@@ -105,7 +101,6 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
/*
* Simple link routines
*/
-
static unsigned int align(unsigned int i)
{
return (i + 3) & ~3u;
@@ -143,7 +138,6 @@ static u32 link_last_sent(struct tipc_link *l_ptr)
/*
* Simple non-static link routines (i.e. referenced outside this file)
*/
-
int tipc_link_is_up(struct tipc_link *l_ptr)
{
if (!l_ptr)
@@ -164,7 +158,6 @@ int tipc_link_is_active(struct tipc_link *l_ptr)
*
* Returns 1 if link name is valid, otherwise 0.
*/
-
static int link_name_validate(const char *name,
struct tipc_link_name *name_parts)
{
@@ -180,7 +173,6 @@ static int link_name_validate(const char *name,
u32 if_peer_len;
/* copy link name & ensure length is OK */
-
name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
/* need above in case non-Posix strncpy() doesn't pad with nulls */
strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
@@ -188,7 +180,6 @@ static int link_name_validate(const char *name,
return 0;
/* ensure all component parts of link name are present */
-
addr_local = name_copy;
if_local = strchr(addr_local, ':');
if (if_local == NULL)
@@ -206,7 +197,6 @@ static int link_name_validate(const char *name,
if_peer_len = strlen(if_peer) + 1;
/* validate component parts of link name */
-
if ((sscanf(addr_local, "%u.%u.%u%c",
&z_local, &c_local, &n_local, &dummy) != 3) ||
(sscanf(addr_peer, "%u.%u.%u%c",
@@ -220,7 +210,6 @@ static int link_name_validate(const char *name,
return 0;
/* return link name components, if necessary */
-
if (name_parts) {
name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
strcpy(name_parts->if_local, if_local);
@@ -239,13 +228,11 @@ static int link_name_validate(const char *name,
* another thread because tipc_link_delete() always cancels the link timer before
* tipc_node_delete() is called.)
*/
-
static void link_timeout(struct tipc_link *l_ptr)
{
tipc_node_lock(l_ptr->owner);
/* update counters used in statistical profiling of send traffic */
-
l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
l_ptr->stats.queue_sz_counts++;
@@ -278,7 +265,6 @@ static void link_timeout(struct tipc_link *l_ptr)
}
/* do all other link processing performed on a periodic basis */
-
link_check_defragm_bufs(l_ptr);
link_state_event(l_ptr, TIMEOUT_EVT);
@@ -302,7 +288,6 @@ static void link_set_timer(struct tipc_link *l_ptr, u32 time)
*
* Returns pointer to link.
*/
-
struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
struct tipc_bearer *b_ptr,
const struct tipc_media_addr *media_addr)
@@ -383,7 +368,6 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
* This routine must not grab the node lock until after link timer cancellation
* to avoid a potential deadlock situation.
*/
-
void tipc_link_delete(struct tipc_link *l_ptr)
{
if (!l_ptr) {
@@ -419,7 +403,6 @@ static void link_start(struct tipc_link *l_ptr)
* Schedules port for renewed sending of messages after link congestion
* has abated.
*/
-
static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
{
struct tipc_port *p_ptr;
@@ -476,7 +459,6 @@ exit:
* link_release_outqueue - purge link's outbound message queue
* @l_ptr: pointer to link
*/
-
static void link_release_outqueue(struct tipc_link *l_ptr)
{
struct sk_buff *buf = l_ptr->first_out;
@@ -484,7 +466,7 @@ static void link_release_outqueue(struct tipc_link *l_ptr)
while (buf) {
next = buf->next;
- buf_discard(buf);
+ kfree_skb(buf);
buf = next;
}
l_ptr->first_out = NULL;
@@ -495,7 +477,6 @@ static void link_release_outqueue(struct tipc_link *l_ptr)
* tipc_link_reset_fragments - purge link's inbound message fragments queue
* @l_ptr: pointer to link
*/
-
void tipc_link_reset_fragments(struct tipc_link *l_ptr)
{
struct sk_buff *buf = l_ptr->defragm_buf;
@@ -503,7 +484,7 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
while (buf) {
next = buf->next;
- buf_discard(buf);
+ kfree_skb(buf);
buf = next;
}
l_ptr->defragm_buf = NULL;
@@ -513,7 +494,6 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
* tipc_link_stop - purge all inbound and outbound messages associated with link
* @l_ptr: pointer to link
*/
-
void tipc_link_stop(struct tipc_link *l_ptr)
{
struct sk_buff *buf;
@@ -522,20 +502,20 @@ void tipc_link_stop(struct tipc_link *l_ptr)
buf = l_ptr->oldest_deferred_in;
while (buf) {
next = buf->next;
- buf_discard(buf);
+ kfree_skb(buf);
buf = next;
}
buf = l_ptr->first_out;
while (buf) {
next = buf->next;
- buf_discard(buf);
+ kfree_skb(buf);
buf = next;
}
tipc_link_reset_fragments(l_ptr);
- buf_discard(l_ptr->proto_msg_queue);
+ kfree_skb(l_ptr->proto_msg_queue);
l_ptr->proto_msg_queue = NULL;
}
@@ -569,14 +549,13 @@ void tipc_link_reset(struct tipc_link *l_ptr)
}
/* Clean up all queues: */
-
link_release_outqueue(l_ptr);
- buf_discard(l_ptr->proto_msg_queue);
+ kfree_skb(l_ptr->proto_msg_queue);
l_ptr->proto_msg_queue = NULL;
buf = l_ptr->oldest_deferred_in;
while (buf) {
struct sk_buff *next = buf->next;
- buf_discard(buf);
+ kfree_skb(buf);
buf = next;
}
if (!list_empty(&l_ptr->waiting_ports))
@@ -611,8 +590,7 @@ static void link_activate(struct tipc_link *l_ptr)
* @l_ptr: pointer to link
* @event: state machine event to process
*/
-
-static void link_state_event(struct tipc_link *l_ptr, unsigned event)
+static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
{
struct tipc_link *other;
u32 cont_intv = l_ptr->continuity_interval;
@@ -785,7 +763,6 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned event)
* link_bundle_buf(): Append contents of a buffer to
* the tail of an existing one.
*/
-
static int link_bundle_buf(struct tipc_link *l_ptr,
struct sk_buff *bundler,
struct sk_buff *buf)
@@ -810,7 +787,7 @@ static int link_bundle_buf(struct tipc_link *l_ptr,
skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
msg_set_size(bundler_msg, to_pos + size);
msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
- buf_discard(buf);
+ kfree_skb(buf);
l_ptr->stats.sent_bundled++;
return 1;
}
@@ -860,7 +837,6 @@ static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
* inside TIPC when the 'fast path' in tipc_send_buf
* has failed, and from link_send()
*/
-
int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
@@ -871,17 +847,14 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
u32 queue_limit = l_ptr->queue_limit[imp];
u32 max_packet = l_ptr->max_pkt;
- msg_set_prevnode(msg, tipc_own_addr); /* If routed message */
-
/* Match msg importance against queue limits: */
-
if (unlikely(queue_size >= queue_limit)) {
if (imp <= TIPC_CRITICAL_IMPORTANCE) {
link_schedule_port(l_ptr, msg_origport(msg), size);
- buf_discard(buf);
+ kfree_skb(buf);
return -ELINKCONG;
}
- buf_discard(buf);
+ kfree_skb(buf);
if (imp > CONN_MANAGER) {
warn("Resetting link <%s>, send queue full", l_ptr->name);
tipc_link_reset(l_ptr);
@@ -890,12 +863,10 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
}
/* Fragmentation needed ? */
-
if (size > max_packet)
return link_send_long_buf(l_ptr, buf);
- /* Packet can be queued or sent: */
-
+ /* Packet can be queued or sent. */
if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
!link_congested(l_ptr))) {
link_add_to_outqueue(l_ptr, buf, msg);
@@ -909,13 +880,11 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
}
return dsz;
}
- /* Congestion: can message be bundled ?: */
-
+ /* Congestion: can message be bundled ? */
if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
(msg_user(msg) != MSG_FRAGMENTER)) {
/* Try adding message to an existing bundle */
-
if (l_ptr->next_out &&
link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
@@ -923,7 +892,6 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
}
/* Try creating a new bundle */
-
if (size <= max_packet * 2 / 3) {
struct sk_buff *bundler = tipc_buf_acquire(max_packet);
struct tipc_msg bundler_hdr;
@@ -953,7 +921,6 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
* not been selected yet, and the the owner node is not locked
* Called by TIPC internal users, e.g. the name distributor
*/
-
int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
{
struct tipc_link *l_ptr;
@@ -968,10 +935,10 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
if (l_ptr)
res = tipc_link_send_buf(l_ptr, buf);
else
- buf_discard(buf);
+ kfree_skb(buf);
tipc_node_unlock(n_ptr);
} else {
- buf_discard(buf);
+ kfree_skb(buf);
}
read_unlock_bh(&tipc_net_lock);
return res;
@@ -986,7 +953,6 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
* small enough not to require fragmentation.
* Called without any locks held.
*/
-
void tipc_link_send_names(struct list_head *message_list, u32 dest)
{
struct tipc_node *n_ptr;
@@ -1015,10 +981,9 @@ void tipc_link_send_names(struct list_head *message_list, u32 dest)
read_unlock_bh(&tipc_net_lock);
/* discard the messages if they couldn't be sent */
-
list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
list_del((struct list_head *)buf);
- buf_discard(buf);
+ kfree_skb(buf);
}
}
@@ -1028,7 +993,6 @@ void tipc_link_send_names(struct list_head *message_list, u32 dest)
* inclusive total message length. Very time critical.
* Link is locked. Returns user data length.
*/
-
static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
u32 *used_max_pkt)
{
@@ -1113,7 +1077,6 @@ again:
* Try building message using port's max_pkt hint.
* (Must not hold any locks while building message.)
*/
-
res = tipc_msg_build(hdr, msg_sect, num_sect, total_len,
sender->max_pkt, !sender->user_port, &buf);
@@ -1133,12 +1096,10 @@ exit:
}
/* Exit if build request was invalid */
-
if (unlikely(res < 0))
goto exit;
/* Exit if link (or bearer) is congested */
-
if (link_congested(l_ptr) ||
!list_empty(&l_ptr->b_ptr->cong_links)) {
res = link_schedule_port(l_ptr,
@@ -1150,7 +1111,6 @@ exit:
* Message size exceeds max_pkt hint; update hint,
* then re-try fast path or fragment the message
*/
-
sender->max_pkt = l_ptr->max_pkt;
tipc_node_unlock(node);
read_unlock_bh(&tipc_net_lock);
@@ -1168,7 +1128,6 @@ exit:
read_unlock_bh(&tipc_net_lock);
/* Couldn't find a link to the destination node */
-
if (buf)
return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
if (res >= 0)
@@ -1222,15 +1181,13 @@ again:
sect_crs = NULL;
curr_sect = -1;
- /* Prepare reusable fragment header: */
-
+ /* Prepare reusable fragment header */
tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
INT_H_SIZE, msg_destnode(hdr));
msg_set_size(&fragm_hdr, max_pkt);
msg_set_fragm_no(&fragm_hdr, 1);
- /* Prepare header of first fragment: */
-
+ /* Prepare header of first fragment */
buf_chain = buf = tipc_buf_acquire(max_pkt);
if (!buf)
return -ENOMEM;
@@ -1239,8 +1196,7 @@ again:
hsz = msg_hdr_sz(hdr);
skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
- /* Chop up message: */
-
+ /* Chop up message */
fragm_crs = INT_H_SIZE + hsz;
fragm_rest = fragm_sz - hsz;
@@ -1262,7 +1218,7 @@ again:
error:
for (; buf_chain; buf_chain = buf) {
buf = buf_chain->next;
- buf_discard(buf_chain);
+ kfree_skb(buf_chain);
}
return -EFAULT;
}
@@ -1316,7 +1272,7 @@ error:
tipc_node_unlock(node);
for (; buf_chain; buf_chain = buf) {
buf = buf_chain->next;
- buf_discard(buf_chain);
+ kfree_skb(buf_chain);
}
goto again;
}
@@ -1324,14 +1280,13 @@ error:
reject:
for (; buf_chain; buf_chain = buf) {
buf = buf_chain->next;
- buf_discard(buf_chain);
+ kfree_skb(buf_chain);
}
return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
total_len, TIPC_ERR_NO_NODE);
}
/* Append chain of fragments to send queue & send them */
-
l_ptr->long_msg_seq_no++;
link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
l_ptr->stats.sent_fragments += fragm_no;
@@ -1352,7 +1307,6 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
/* Step to position where retransmission failed, if any, */
/* consider that buffers may have been released in meantime */
-
if (r_q_size && buf) {
u32 last = lesser(mod(r_q_head + r_q_size),
link_last_sent(l_ptr));
@@ -1367,7 +1321,6 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
}
/* Continue retransmission now, if there is anything: */
-
if (r_q_size && buf) {
msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
@@ -1383,14 +1336,13 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
}
/* Send deferred protocol message, if any: */
-
buf = l_ptr->proto_msg_queue;
if (buf) {
msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
l_ptr->unacked_window = 0;
- buf_discard(buf);
+ kfree_skb(buf);
l_ptr->proto_msg_queue = NULL;
return 0;
} else {
@@ -1400,7 +1352,6 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
}
/* Send one deferred data message, if send window not full: */
-
buf = l_ptr->next_out;
if (buf) {
struct tipc_msg *msg = buf_msg(buf);
@@ -1480,16 +1431,12 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
warn("Retransmission failure on link <%s>\n", l_ptr->name);
if (l_ptr->addr) {
-
/* Handle failure on standard link */
-
link_print(l_ptr, "Resetting link\n");
tipc_link_reset(l_ptr);
} else {
-
/* Handle failure on broadcast link */
-
struct tipc_node *n_ptr;
char addr_string[16];
@@ -1501,13 +1448,13 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
tipc_node_lock(n_ptr);
tipc_addr_string_fill(addr_string, n_ptr->addr);
- info("Multicast link info for %s\n", addr_string);
+ info("Broadcast link info for %s\n", addr_string);
+ info("Supportable: %d, ", n_ptr->bclink.supportable);
info("Supported: %d, ", n_ptr->bclink.supported);
info("Acked: %u\n", n_ptr->bclink.acked);
info("Last in: %u, ", n_ptr->bclink.last_in);
- info("Gap after: %u, ", n_ptr->bclink.gap_after);
- info("Gap to: %u\n", n_ptr->bclink.gap_to);
- info("Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
+ info("Oos state: %u, ", n_ptr->bclink.oos_state);
+ info("Last sent: %u\n", n_ptr->bclink.last_sent);
tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
@@ -1538,7 +1485,6 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
return;
} else {
/* Detect repeated retransmit failures on uncongested bearer */
-
if (l_ptr->last_retransmitted == msg_seqno(msg)) {
if (++l_ptr->stale_count > 100) {
link_retransmit_failure(l_ptr, buf);
@@ -1573,7 +1519,6 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
/**
* link_insert_deferred_queue - insert deferred messages back into receive chain
*/
-
static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
struct sk_buff *buf)
{
@@ -1604,7 +1549,6 @@ static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
* TIPC will ignore the excess, under the assumption that it is optional info
* introduced by a later release of the protocol.
*/
-
static int link_recv_buf_validate(struct sk_buff *buf)
{
static u32 min_data_hdr_size[8] = {
@@ -1650,7 +1594,6 @@ static int link_recv_buf_validate(struct sk_buff *buf)
* Invoked with no locks held. Bearer pointer must point to a valid bearer
* structure (i.e. cannot be NULL), but bearer can be inactive.
*/
-
void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
{
read_lock_bh(&tipc_net_lock);
@@ -1668,22 +1611,18 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
head = head->next;
/* Ensure bearer is still enabled */
-
if (unlikely(!b_ptr->active))
goto cont;
/* Ensure message is well-formed */
-
if (unlikely(!link_recv_buf_validate(buf)))
goto cont;
/* Ensure message data is a single contiguous unit */
-
- if (unlikely(buf_linearize(buf)))
+ if (unlikely(skb_linearize(buf)))
goto cont;
/* Handle arrival of a non-unicast link message */
-
msg = buf_msg(buf);
if (unlikely(msg_non_seq(msg))) {
@@ -1695,20 +1634,17 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
}
/* Discard unicast link messages destined for another node */
-
if (unlikely(!msg_short(msg) &&
(msg_destnode(msg) != tipc_own_addr)))
goto cont;
/* Locate neighboring node that sent message */
-
n_ptr = tipc_node_find(msg_prevnode(msg));
if (unlikely(!n_ptr))
goto cont;
tipc_node_lock(n_ptr);
/* Locate unicast link endpoint that should handle message */
-
l_ptr = n_ptr->links[b_ptr->identity];
if (unlikely(!l_ptr)) {
tipc_node_unlock(n_ptr);
@@ -1716,7 +1652,6 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
}
/* Verify that communication with node is currently allowed */
-
if ((n_ptr->block_setup & WAIT_PEER_DOWN) &&
msg_user(msg) == LINK_PROTOCOL &&
(msg_type(msg) == RESET_MSG ||
@@ -1730,13 +1665,11 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
}
/* Validate message sequence number info */
-
seq_no = msg_seqno(msg);
ackd = msg_ack(msg);
/* Release acked messages */
-
- if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
+ if (n_ptr->bclink.supported)
tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
crs = l_ptr->first_out;
@@ -1744,7 +1677,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
less_eq(buf_seqno(crs), ackd)) {
struct sk_buff *next = crs->next;
- buf_discard(crs);
+ kfree_skb(crs);
crs = next;
released++;
}
@@ -1754,7 +1687,6 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
}
/* Try sending any messages link endpoint has pending */
-
if (unlikely(l_ptr->next_out))
tipc_link_push_queue(l_ptr);
if (unlikely(!list_empty(&l_ptr->waiting_ports)))
@@ -1765,7 +1697,6 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
}
/* Now (finally!) process the incoming message */
-
protocol_check:
if (likely(link_working_working(l_ptr))) {
if (likely(seq_no == mod(l_ptr->next_in_no))) {
@@ -1773,52 +1704,56 @@ protocol_check:
if (unlikely(l_ptr->oldest_deferred_in))
head = link_insert_deferred_queue(l_ptr,
head);
- if (likely(msg_is_dest(msg, tipc_own_addr))) {
deliver:
- if (likely(msg_isdata(msg))) {
- tipc_node_unlock(n_ptr);
- tipc_port_recv_msg(buf);
- continue;
+ if (likely(msg_isdata(msg))) {
+ tipc_node_unlock(n_ptr);
+ tipc_port_recv_msg(buf);
+ continue;
+ }
+ switch (msg_user(msg)) {
+ int ret;
+ case MSG_BUNDLER:
+ l_ptr->stats.recv_bundles++;
+ l_ptr->stats.recv_bundled +=
+ msg_msgcnt(msg);
+ tipc_node_unlock(n_ptr);
+ tipc_link_recv_bundle(buf);
+ continue;
+ case NAME_DISTRIBUTOR:
+ tipc_node_unlock(n_ptr);
+ tipc_named_recv(buf);
+ continue;
+ case CONN_MANAGER:
+ tipc_node_unlock(n_ptr);
+ tipc_port_recv_proto_msg(buf);
+ continue;
+ case MSG_FRAGMENTER:
+ l_ptr->stats.recv_fragments++;
+ ret = tipc_link_recv_fragment(
+ &l_ptr->defragm_buf,
+ &buf, &msg);
+ if (ret == 1) {
+ l_ptr->stats.recv_fragmented++;
+ goto deliver;
}
- switch (msg_user(msg)) {
- case MSG_BUNDLER:
- l_ptr->stats.recv_bundles++;
- l_ptr->stats.recv_bundled +=
- msg_msgcnt(msg);
- tipc_node_unlock(n_ptr);
- tipc_link_recv_bundle(buf);
- continue;
- case NAME_DISTRIBUTOR:
- tipc_node_unlock(n_ptr);
- tipc_named_recv(buf);
- continue;
- case CONN_MANAGER:
- tipc_node_unlock(n_ptr);
- tipc_port_recv_proto_msg(buf);
- continue;
- case MSG_FRAGMENTER:
- l_ptr->stats.recv_fragments++;
- if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
- &buf, &msg)) {
- l_ptr->stats.recv_fragmented++;
+ if (ret == -1)
+ l_ptr->next_in_no--;
+ break;
+ case CHANGEOVER_PROTOCOL:
+ type = msg_type(msg);
+ if (link_recv_changeover_msg(&l_ptr,
+ &buf)) {
+ msg = buf_msg(buf);
+ seq_no = msg_seqno(msg);
+ if (type == ORIGINAL_MSG)
goto deliver;
- }
- break;
- case CHANGEOVER_PROTOCOL:
- type = msg_type(msg);
- if (link_recv_changeover_msg(&l_ptr, &buf)) {
- msg = buf_msg(buf);
- seq_no = msg_seqno(msg);
- if (type == ORIGINAL_MSG)
- goto deliver;
- goto protocol_check;
- }
- break;
- default:
- buf_discard(buf);
- buf = NULL;
- break;
+ goto protocol_check;
}
+ break;
+ default:
+ kfree_skb(buf);
+ buf = NULL;
+ break;
}
tipc_node_unlock(n_ptr);
tipc_net_route_msg(buf);
@@ -1847,23 +1782,21 @@ deliver:
}
tipc_node_unlock(n_ptr);
cont:
- buf_discard(buf);
+ kfree_skb(buf);
}
read_unlock_bh(&tipc_net_lock);
}
/*
- * link_defer_buf(): Sort a received out-of-sequence packet
- * into the deferred reception queue.
- * Returns the increase of the queue length,i.e. 0 or 1
+ * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
+ *
+ * Returns increase in queue length (i.e. 0 or 1)
*/
-
-u32 tipc_link_defer_pkt(struct sk_buff **head,
- struct sk_buff **tail,
+u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
struct sk_buff *buf)
{
- struct sk_buff *prev = NULL;
- struct sk_buff *crs = *head;
+ struct sk_buff *queue_buf;
+ struct sk_buff **prev;
u32 seq_no = buf_seqno(buf);
buf->next = NULL;
@@ -1881,34 +1814,32 @@ u32 tipc_link_defer_pkt(struct sk_buff **head,
return 1;
}
- /* Scan through queue and sort it in */
- do {
- struct tipc_msg *msg = buf_msg(crs);
+ /* Locate insertion point in queue, then insert; discard if duplicate */
+ prev = head;
+ queue_buf = *head;
+ for (;;) {
+ u32 curr_seqno = buf_seqno(queue_buf);
- if (less(seq_no, msg_seqno(msg))) {
- buf->next = crs;
- if (prev)
- prev->next = buf;
- else
- *head = buf;
- return 1;
+ if (seq_no == curr_seqno) {
+ kfree_skb(buf);
+ return 0;
}
- if (seq_no == msg_seqno(msg))
+
+ if (less(seq_no, curr_seqno))
break;
- prev = crs;
- crs = crs->next;
- } while (crs);
- /* Message is a duplicate of an existing message */
+ prev = &queue_buf->next;
+ queue_buf = queue_buf->next;
+ }
- buf_discard(buf);
- return 0;
+ buf->next = queue_buf;
+ *prev = buf;
+ return 1;
}
-/**
+/*
* link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
*/
-
static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
struct sk_buff *buf)
{
@@ -1920,17 +1851,15 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
}
/* Record OOS packet arrival (force mismatch on next timeout) */
-
l_ptr->checkpoint--;
/*
* Discard packet if a duplicate; otherwise add it to deferred queue
* and notify peer of gap as per protocol specification
*/
-
if (less(seq_no, mod(l_ptr->next_in_no))) {
l_ptr->stats.duplicates++;
- buf_discard(buf);
+ kfree_skb(buf);
return;
}
@@ -1956,17 +1885,23 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
u32 msg_size = sizeof(l_ptr->proto_msg);
int r_flag;
+ /* Discard any previous message that was deferred due to congestion */
+ if (l_ptr->proto_msg_queue) {
+ kfree_skb(l_ptr->proto_msg_queue);
+ l_ptr->proto_msg_queue = NULL;
+ }
+
if (link_blocked(l_ptr))
return;
/* Abort non-RESET send if communication with node is prohibited */
-
if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG))
return;
+ /* Create protocol message with "out-of-sequence" sequence number */
msg_set_type(msg, msg_typ);
msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
- msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
+ msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
if (msg_typ == STATE_MSG) {
@@ -2020,44 +1955,33 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
msg_set_redundant_link(msg, r_flag);
msg_set_linkprio(msg, l_ptr->priority);
-
- /* Ensure sequence number will not fit : */
+ msg_set_size(msg, msg_size);
msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
- /* Congestion? */
-
- if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
- if (!l_ptr->proto_msg_queue) {
- l_ptr->proto_msg_queue =
- tipc_buf_acquire(sizeof(l_ptr->proto_msg));
- }
- buf = l_ptr->proto_msg_queue;
- if (!buf)
- return;
- skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
- return;
- }
-
- /* Message can be sent */
-
buf = tipc_buf_acquire(msg_size);
if (!buf)
return;
skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
- msg_set_size(buf_msg(buf), msg_size);
- if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
- l_ptr->unacked_window = 0;
- buf_discard(buf);
+ /* Defer message if bearer is already congested */
+ if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
+ l_ptr->proto_msg_queue = buf;
+ return;
+ }
+
+ /* Defer message if attempting to send results in bearer congestion */
+ if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
+ tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
+ l_ptr->proto_msg_queue = buf;
+ l_ptr->stats.bearer_congs++;
return;
}
- /* New congestion */
- tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
- l_ptr->proto_msg_queue = buf;
- l_ptr->stats.bearer_congs++;
+ /* Discard message if it was sent successfully */
+ l_ptr->unacked_window = 0;
+ kfree_skb(buf);
}
/*
@@ -2065,7 +1989,6 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
* Note that network plane id propagates through the network, and may
* change at any time. The node with lowest address rules
*/
-
static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
{
u32 rec_gap = 0;
@@ -2078,7 +2001,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
goto exit;
/* record unnumbered packet arrival (force mismatch on next timeout) */
-
l_ptr->checkpoint--;
if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
@@ -2105,10 +2027,11 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
l_ptr->owner->block_setup = WAIT_NODE_DOWN;
}
+ link_state_event(l_ptr, RESET_MSG);
+
/* fall thru' */
case ACTIVATE_MSG:
/* Update link settings according other endpoint's values */
-
strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
msg_tol = msg_link_tolerance(msg);
@@ -2127,16 +2050,21 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
} else {
l_ptr->max_pkt = l_ptr->max_pkt_target;
}
- l_ptr->owner->bclink.supported = (max_pkt_info != 0);
+ l_ptr->owner->bclink.supportable = (max_pkt_info != 0);
- link_state_event(l_ptr, msg_type(msg));
+ /* Synchronize broadcast link info, if not done previously */
+ if (!tipc_node_is_up(l_ptr->owner)) {
+ l_ptr->owner->bclink.last_sent =
+ l_ptr->owner->bclink.last_in =
+ msg_last_bcast(msg);
+ l_ptr->owner->bclink.oos_state = 0;
+ }
l_ptr->peer_session = msg_session(msg);
l_ptr->peer_bearer_id = msg_bearer_id(msg);
- /* Synchronize broadcast sequence numbers */
- if (!tipc_node_redundant_links(l_ptr->owner))
- l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
+ if (msg_type(msg) == ACTIVATE_MSG)
+ link_state_event(l_ptr, ACTIVATE_MSG);
break;
case STATE_MSG:
@@ -2176,8 +2104,9 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
}
/* Protocol message before retransmits, reduce loss risk */
-
- tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
+ if (l_ptr->owner->bclink.supported)
+ tipc_bclink_update_link_state(l_ptr->owner,
+ msg_last_bcast(msg));
if (rec_gap || (msg_probe(msg))) {
tipc_link_send_proto_msg(l_ptr, STATE_MSG,
@@ -2191,7 +2120,7 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
break;
}
exit:
- buf_discard(buf);
+ kfree_skb(buf);
}
@@ -2232,7 +2161,6 @@ static void tipc_link_tunnel(struct tipc_link *l_ptr,
* changeover(): Send whole message queue via the remaining link
* Owner node is locked.
*/
-
void tipc_link_changeover(struct tipc_link *l_ptr)
{
u32 msgcount = l_ptr->out_queue_size;
@@ -2332,8 +2260,6 @@ void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *tunnel)
}
}
-
-
/**
* buf_extract - extracts embedded TIPC message from another message
* @skb: encapsulating message buffer
@@ -2342,7 +2268,6 @@ void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *tunnel)
* Returns a new message buffer containing an embedded message. The
* encapsulating message itself is left unchanged.
*/
-
static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
{
struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
@@ -2359,7 +2284,6 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
* link_recv_changeover_msg(): Receive tunneled packet sent
* via other link. Node is locked. Return extracted buffer.
*/
-
static int link_recv_changeover_msg(struct tipc_link **l_ptr,
struct sk_buff **buf)
{
@@ -2389,12 +2313,11 @@ static int link_recv_changeover_msg(struct tipc_link **l_ptr,
warn("Link changeover error, duplicate msg dropped\n");
goto exit;
}
- buf_discard(tunnel_buf);
+ kfree_skb(tunnel_buf);
return 1;
}
/* First original message ?: */
-
if (tipc_link_is_up(dest_link)) {
info("Resetting link <%s>, changeover initiated by peer\n",
dest_link->name);
@@ -2409,7 +2332,6 @@ static int link_recv_changeover_msg(struct tipc_link **l_ptr,
}
/* Receive original message */
-
if (dest_link->exp_msg_count == 0) {
warn("Link switchover error, "
"got too many tunnelled messages\n");
@@ -2421,7 +2343,7 @@ static int link_recv_changeover_msg(struct tipc_link **l_ptr,
} else {
*buf = buf_extract(tunnel_buf, INT_H_SIZE);
if (*buf != NULL) {
- buf_discard(tunnel_buf);
+ kfree_skb(tunnel_buf);
return 1;
} else {
warn("Link changeover error, original msg dropped\n");
@@ -2429,7 +2351,7 @@ static int link_recv_changeover_msg(struct tipc_link **l_ptr,
}
exit:
*buf = NULL;
- buf_discard(tunnel_buf);
+ kfree_skb(tunnel_buf);
return 0;
}
@@ -2451,14 +2373,13 @@ void tipc_link_recv_bundle(struct sk_buff *buf)
pos += align(msg_size(buf_msg(obuf)));
tipc_net_route_msg(obuf);
}
- buf_discard(buf);
+ kfree_skb(buf);
}
/*
* Fragmentation/defragmentation:
*/
-
/*
* link_send_long_buf: Entry for buffers needing fragmentation.
* The buffer is complete, inclusive total message length.
@@ -2485,12 +2406,10 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
destaddr = msg_destnode(inmsg);
/* Prepare reusable fragment header: */
-
tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
INT_H_SIZE, destaddr);
/* Chop up message: */
-
while (rest > 0) {
struct sk_buff *fragm;
@@ -2500,11 +2419,11 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
}
fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
if (fragm == NULL) {
- buf_discard(buf);
+ kfree_skb(buf);
while (buf_chain) {
buf = buf_chain;
buf_chain = buf_chain->next;
- buf_discard(buf);
+ kfree_skb(buf);
}
return -ENOMEM;
}
@@ -2521,10 +2440,9 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
crs += fragm_sz;
msg_set_type(&fragm_hdr, FRAGMENT);
}
- buf_discard(buf);
+ kfree_skb(buf);
/* Append chain of fragments to send queue & send them */
-
l_ptr->long_msg_seq_no++;
link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
l_ptr->stats.sent_fragments += fragm_no;
@@ -2540,7 +2458,6 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
* help storing these values in unused, available fields in the
* pending message. This makes dynamic memory allocation unnecessary.
*/
-
static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
{
msg_set_seqno(buf_msg(buf), seqno);
@@ -2592,7 +2509,6 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
*fb = NULL;
/* Is there an incomplete message waiting for this fragment? */
-
while (pbuf && ((buf_seqno(pbuf) != long_msg_seq_no) ||
(msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
prev = pbuf;
@@ -2608,7 +2524,7 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
if (msg_type(imsg) == TIPC_MCAST_MSG)
max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
if (msg_size(imsg) > max) {
- buf_discard(fbuf);
+ kfree_skb(fbuf);
return 0;
}
pbuf = tipc_buf_acquire(msg_size(imsg));
@@ -2618,14 +2534,15 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
skb_copy_to_linear_data(pbuf, imsg,
msg_data_sz(fragm));
/* Prepare buffer for subsequent fragments. */
-
set_long_msg_seqno(pbuf, long_msg_seq_no);
set_fragm_size(pbuf, fragm_sz);
set_expected_frags(pbuf, exp_fragm_cnt - 1);
} else {
- warn("Link unable to reassemble fragmented message\n");
+ dbg("Link unable to reassemble fragmented message\n");
+ kfree_skb(fbuf);
+ return -1;
}
- buf_discard(fbuf);
+ kfree_skb(fbuf);
return 0;
} else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
u32 dsz = msg_data_sz(fragm);
@@ -2634,10 +2551,9 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
u32 exp_frags = get_expected_frags(pbuf) - 1;
skb_copy_to_linear_data_offset(pbuf, crs,
msg_data(fragm), dsz);
- buf_discard(fbuf);
+ kfree_skb(fbuf);
/* Is message complete? */
-
if (exp_frags == 0) {
if (prev)
prev->next = pbuf->next;
@@ -2651,7 +2567,7 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
set_expected_frags(pbuf, exp_frags);
return 0;
}
- buf_discard(fbuf);
+ kfree_skb(fbuf);
return 0;
}
@@ -2659,7 +2575,6 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
* link_check_defragm_bufs - flush stale incoming message fragments
* @l_ptr: pointer to link
*/
-
static void link_check_defragm_bufs(struct tipc_link *l_ptr)
{
struct sk_buff *prev = NULL;
@@ -2682,14 +2597,12 @@ static void link_check_defragm_bufs(struct tipc_link *l_ptr)
prev->next = buf->next;
else
l_ptr->defragm_buf = buf->next;
- buf_discard(buf);
+ kfree_skb(buf);
}
buf = next;
}
}
-
-
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
{
if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
@@ -2701,7 +2614,6 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
}
-
void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
{
/* Data messages from this node, inclusive FIRST_FRAGM */
@@ -2731,7 +2643,6 @@ void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
*
* Returns pointer to link (or 0 if invalid link name).
*/
-
static struct tipc_link *link_find_link(const char *name,
struct tipc_node **node)
{
@@ -2765,7 +2676,6 @@ static struct tipc_link *link_find_link(const char *name,
*
* Returns 1 if value is within range, 0 if not.
*/
-
static int link_value_is_valid(u16 cmd, u32 new_value)
{
switch (cmd) {
@@ -2781,7 +2691,6 @@ static int link_value_is_valid(u16 cmd, u32 new_value)
return 0;
}
-
/**
* link_cmd_set_value - change priority/tolerance/window for link/bearer/media
* @name - ptr to link, bearer, or media name
@@ -2792,7 +2701,6 @@ static int link_value_is_valid(u16 cmd, u32 new_value)
*
* Returns 0 if value updated and negative value on error.
*/
-
static int link_cmd_set_value(const char *name, u32 new_value, u16 cmd)
{
struct tipc_node *node;
@@ -2897,7 +2805,6 @@ struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space
* link_reset_statistics - reset link statistics
* @l_ptr: pointer to link
*/
-
static void link_reset_statistics(struct tipc_link *l_ptr)
{
memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
@@ -2938,7 +2845,6 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_
/**
* percent - convert count to a percentage of total (rounding up or down)
*/
-
static u32 percent(u32 count, u32 total)
{
return (count * 100 + (total / 2)) / total;
@@ -2952,7 +2858,6 @@ static u32 percent(u32 count, u32 total)
*
* Returns length of print buffer data string (or 0 if error)
*/
-
static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
{
struct print_buf pb;
@@ -3057,7 +2962,7 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_s
str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
(char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
if (!str_len) {
- buf_discard(buf);
+ kfree_skb(buf);
return tipc_cfg_reply_error_string("link not found");
}
@@ -3074,7 +2979,6 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_s
*
* If no active link can be found, uses default maximum packet size.
*/
-
u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
{
struct tipc_node *n_ptr;
@@ -3158,4 +3062,3 @@ print_state:
tipc_printbuf_validate(buf);
info("%s", print_area);
}
-
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 73c18c140e1..d6a60a963ce 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -47,13 +47,11 @@
/*
* Out-of-range value for link sequence numbers
*/
-
#define INVALID_LINK_SEQ 0x10000
/*
* Link states
*/
-
#define WORKING_WORKING 560810u
#define WORKING_UNKNOWN 560811u
#define RESET_UNKNOWN 560812u
@@ -63,7 +61,6 @@
* Starting value for maximum packet size negotiation on unicast links
* (unless bearer MTU is less)
*/
-
#define MAX_PKT_DEFAULT 1500
/**
@@ -114,7 +111,6 @@
* @defragm_buf: list of partially reassembled inbound message fragments
* @stats: collects statistics regarding link activity
*/
-
struct tipc_link {
u32 addr;
char name[TIPC_MAX_LINK_NAME];
@@ -255,7 +251,6 @@ void tipc_link_retransmit(struct tipc_link *l_ptr,
/*
* Link sequence number manipulation routines (uses modulo 2**16 arithmetic)
*/
-
static inline u32 buf_seqno(struct sk_buff *buf)
{
return msg_seqno(buf_msg(buf));
@@ -294,7 +289,6 @@ static inline u32 lesser(u32 left, u32 right)
/*
* Link status checking routines
*/
-
static inline int link_working_working(struct tipc_link *l_ptr)
{
return l_ptr->state == WORKING_WORKING;
diff --git a/net/tipc/log.c b/net/tipc/log.c
index 952c39f643e..026733f2491 100644
--- a/net/tipc/log.c
+++ b/net/tipc/log.c
@@ -47,7 +47,6 @@
*
* Additional user-defined print buffers are also permitted.
*/
-
static struct print_buf null_buf = { NULL, 0, NULL, 0 };
struct print_buf *const TIPC_NULL = &null_buf;
@@ -72,7 +71,6 @@ struct print_buf *const TIPC_LOG = &log_buf;
* on the caller to prevent simultaneous use of the print buffer(s) being
* manipulated.
*/
-
static char print_string[TIPC_PB_MAX_STR];
static DEFINE_SPINLOCK(print_lock);
@@ -97,7 +95,6 @@ static void tipc_printbuf_move(struct print_buf *pb_to,
* Note: If the character array is too small (or absent), the print buffer
* becomes a null device that discards anything written to it.
*/
-
void tipc_printbuf_init(struct print_buf *pb, char *raw, u32 size)
{
pb->buf = raw;
@@ -117,7 +114,6 @@ void tipc_printbuf_init(struct print_buf *pb, char *raw, u32 size)
* tipc_printbuf_reset - reinitialize print buffer to empty state
* @pb: pointer to print buffer structure
*/
-
static void tipc_printbuf_reset(struct print_buf *pb)
{
if (pb->buf) {
@@ -133,7 +129,6 @@ static void tipc_printbuf_reset(struct print_buf *pb)
*
* Returns non-zero if print buffer is empty.
*/
-
static int tipc_printbuf_empty(struct print_buf *pb)
{
return !pb->buf || (pb->crs == pb->buf);
@@ -148,7 +143,6 @@ static int tipc_printbuf_empty(struct print_buf *pb)
*
* Returns length of print buffer data string (including trailing NUL)
*/
-
int tipc_printbuf_validate(struct print_buf *pb)
{
char *err = "\n\n*** PRINT BUFFER OVERFLOW ***\n\n";
@@ -182,14 +176,12 @@ int tipc_printbuf_validate(struct print_buf *pb)
* Current contents of destination print buffer (if any) are discarded.
* Source print buffer becomes empty if a successful move occurs.
*/
-
static void tipc_printbuf_move(struct print_buf *pb_to,
struct print_buf *pb_from)
{
int len;
/* Handle the cases where contents can't be moved */
-
if (!pb_to->buf)
return;
@@ -206,7 +198,6 @@ static void tipc_printbuf_move(struct print_buf *pb_to,
}
/* Copy data from char after cursor to end (if used) */
-
len = pb_from->buf + pb_from->size - pb_from->crs - 2;
if ((pb_from->buf[pb_from->size - 1] == 0) && (len > 0)) {
strcpy(pb_to->buf, pb_from->crs + 1);
@@ -215,7 +206,6 @@ static void tipc_printbuf_move(struct print_buf *pb_to,
pb_to->crs = pb_to->buf;
/* Copy data from start to cursor (always) */
-
len = pb_from->crs - pb_from->buf;
strcpy(pb_to->crs, pb_from->buf);
pb_to->crs += len;
@@ -228,7 +218,6 @@ static void tipc_printbuf_move(struct print_buf *pb_to,
* @pb: pointer to print buffer
* @fmt: formatted info to be printed
*/
-
void tipc_printf(struct print_buf *pb, const char *fmt, ...)
{
int chars_to_add;
@@ -270,7 +259,6 @@ void tipc_printf(struct print_buf *pb, const char *fmt, ...)
* tipc_log_resize - change the size of the TIPC log buffer
* @log_size: print buffer size to use
*/
-
int tipc_log_resize(int log_size)
{
int res = 0;
@@ -295,7 +283,6 @@ int tipc_log_resize(int log_size)
/**
* tipc_log_resize_cmd - reconfigure size of TIPC log buffer
*/
-
struct sk_buff *tipc_log_resize_cmd(const void *req_tlv_area, int req_tlv_space)
{
u32 value;
@@ -304,7 +291,7 @@ struct sk_buff *tipc_log_resize_cmd(const void *req_tlv_area, int req_tlv_space)
return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));
- if (value != delimit(value, 0, 32768))
+ if (value > 32768)
return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
" (log size must be 0-32768)");
if (tipc_log_resize(value))
@@ -316,7 +303,6 @@ struct sk_buff *tipc_log_resize_cmd(const void *req_tlv_area, int req_tlv_space)
/**
* tipc_log_dump - capture TIPC log buffer contents in configuration message
*/
-
struct sk_buff *tipc_log_dump(void)
{
struct sk_buff *reply;
diff --git a/net/tipc/log.h b/net/tipc/log.h
index 2248d96238e..d1f5eb967fd 100644
--- a/net/tipc/log.h
+++ b/net/tipc/log.h
@@ -44,7 +44,6 @@
* @crs: pointer to first unused space in character array (i.e. final NUL)
* @echo: echo output to system console if non-zero
*/
-
struct print_buf {
char *buf;
u32 size;
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 3e4d3e29be6..deea0d232dc 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -72,7 +72,6 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type,
*
* Returns message data size or errno
*/
-
int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
u32 num_sect, unsigned int total_len,
int max_size, int usrmem, struct sk_buff **buf)
@@ -106,13 +105,12 @@ int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
if (likely(res))
return dsz;
- buf_discard(*buf);
+ kfree_skb(*buf);
*buf = NULL;
return -EFAULT;
}
#ifdef CONFIG_TIPC_DEBUG
-
void tipc_msg_dbg(struct print_buf *buf, struct tipc_msg *msg, const char *str)
{
u32 usr = msg_user(msg);
@@ -352,5 +350,4 @@ void tipc_msg_dbg(struct print_buf *buf, struct tipc_msg *msg, const char *str)
if ((usr == MSG_FRAGMENTER) && (msg_type(msg) == FIRST_FRAGMENT))
tipc_msg_dbg(buf, msg_get_wrapped(msg), " /");
}
-
#endif
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 7b0cda16710..ba2a72beea6 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -44,7 +44,6 @@
*
* Note: Some items are also used with TIPC internal message headers
*/
-
#define TIPC_VERSION 2
/*
@@ -58,7 +57,6 @@
/*
* Payload message types
*/
-
#define TIPC_CONN_MSG 0
#define TIPC_MCAST_MSG 1
#define TIPC_NAMED_MSG 2
@@ -67,7 +65,6 @@
/*
* Message header sizes
*/
-
#define SHORT_H_SIZE 24 /* In-cluster basic payload message */
#define BASIC_H_SIZE 32 /* Basic payload message */
#define NAMED_H_SIZE 40 /* Named payload message */
@@ -121,7 +118,6 @@ static inline void msg_swap_words(struct tipc_msg *msg, u32 a, u32 b)
/*
* Word 0
*/
-
static inline u32 msg_version(struct tipc_msg *m)
{
return msg_bits(m, 0, 29, 7);
@@ -216,7 +212,6 @@ static inline void msg_set_size(struct tipc_msg *m, u32 sz)
/*
* Word 1
*/
-
static inline u32 msg_type(struct tipc_msg *m)
{
return msg_bits(m, 1, 29, 0x7);
@@ -291,7 +286,6 @@ static inline void msg_set_bcast_ack(struct tipc_msg *m, u32 n)
/*
* Word 2
*/
-
static inline u32 msg_ack(struct tipc_msg *m)
{
return msg_bits(m, 2, 16, 0xffff);
@@ -315,8 +309,6 @@ static inline void msg_set_seqno(struct tipc_msg *m, u32 n)
/*
* Words 3-10
*/
-
-
static inline u32 msg_prevnode(struct tipc_msg *m)
{
return msg_word(m, 3);
@@ -384,11 +376,6 @@ static inline void msg_set_destnode(struct tipc_msg *m, u32 a)
msg_set_word(m, 7, a);
}
-static inline int msg_is_dest(struct tipc_msg *m, u32 d)
-{
- return msg_short(m) || (msg_destnode(m) == d);
-}
-
static inline u32 msg_nametype(struct tipc_msg *m)
{
return msg_word(m, 8);
@@ -439,7 +426,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
return (struct tipc_msg *)msg_data(m);
}
-
/*
* Constants and routines used to read and write TIPC internal message headers
*/
@@ -447,7 +433,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
/*
* Internal message users
*/
-
#define BCAST_PROTOCOL 5
#define MSG_BUNDLER 6
#define LINK_PROTOCOL 7
@@ -461,7 +446,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
/*
* Connection management protocol message types
*/
-
#define CONN_PROBE 0
#define CONN_PROBE_REPLY 1
#define CONN_ACK 2
@@ -469,14 +453,12 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
/*
* Name distributor message types
*/
-
#define PUBLICATION 0
#define WITHDRAWAL 1
/*
* Segmentation message types
*/
-
#define FIRST_FRAGMENT 0
#define FRAGMENT 1
#define LAST_FRAGMENT 2
@@ -484,7 +466,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
/*
* Link management protocol message types
*/
-
#define STATE_MSG 0
#define RESET_MSG 1
#define ACTIVATE_MSG 2
@@ -498,7 +479,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
/*
* Config protocol message types
*/
-
#define DSC_REQ_MSG 0
#define DSC_RESP_MSG 1
@@ -506,7 +486,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
/*
* Word 1
*/
-
static inline u32 msg_seq_gap(struct tipc_msg *m)
{
return msg_bits(m, 1, 16, 0x1fff);
@@ -517,11 +496,20 @@ static inline void msg_set_seq_gap(struct tipc_msg *m, u32 n)
msg_set_bits(m, 1, 16, 0x1fff, n);
}
+static inline u32 msg_node_sig(struct tipc_msg *m)
+{
+ return msg_bits(m, 1, 0, 0xffff);
+}
+
+static inline void msg_set_node_sig(struct tipc_msg *m, u32 n)
+{
+ msg_set_bits(m, 1, 0, 0xffff, n);
+}
+
/*
* Word 2
*/
-
static inline u32 msg_dest_domain(struct tipc_msg *m)
{
return msg_word(m, 2);
@@ -556,7 +544,6 @@ static inline void msg_set_bcgap_to(struct tipc_msg *m, u32 n)
/*
* Word 4
*/
-
static inline u32 msg_last_bcast(struct tipc_msg *m)
{
return msg_bits(m, 4, 16, 0xffff);
@@ -623,7 +610,6 @@ static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
/*
* Word 5
*/
-
static inline u32 msg_session(struct tipc_msg *m)
{
return msg_bits(m, 5, 16, 0xffff);
@@ -692,7 +678,6 @@ static inline char *msg_media_addr(struct tipc_msg *m)
/*
* Word 9
*/
-
static inline u32 msg_msgcnt(struct tipc_msg *m)
{
return msg_bits(m, 9, 16, 0xffff);
@@ -739,5 +724,4 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type,
int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
u32 num_sect, unsigned int total_len,
int max_size, int usrmem, struct sk_buff **buf);
-
#endif
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 98ebb37f180..158318e67b0 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -58,7 +58,6 @@
* Note: There is no field that identifies the publishing node because it is
* the same for all items contained within a publication message.
*/
-
struct distr_item {
__be32 type;
__be32 lower;
@@ -68,17 +67,41 @@ struct distr_item {
};
/**
- * List of externally visible publications by this node --
- * that is, all publications having scope > TIPC_NODE_SCOPE.
+ * struct publ_list - list of publications made by this node
+ * @list: circular list of publications
+ * @list_size: number of entries in list
*/
+struct publ_list {
+ struct list_head list;
+ u32 size;
+};
+
+static struct publ_list publ_zone = {
+ .list = LIST_HEAD_INIT(publ_zone.list),
+ .size = 0,
+};
+
+static struct publ_list publ_cluster = {
+ .list = LIST_HEAD_INIT(publ_cluster.list),
+ .size = 0,
+};
+
+static struct publ_list publ_node = {
+ .list = LIST_HEAD_INIT(publ_node.list),
+ .size = 0,
+};
+
+static struct publ_list *publ_lists[] = {
+ NULL,
+ &publ_zone, /* publ_lists[TIPC_ZONE_SCOPE] */
+ &publ_cluster, /* publ_lists[TIPC_CLUSTER_SCOPE] */
+ &publ_node /* publ_lists[TIPC_NODE_SCOPE] */
+};
-static LIST_HEAD(publ_root);
-static u32 publ_cnt;
/**
* publ_to_item - add publication info to a publication message
*/
-
static void publ_to_item(struct distr_item *i, struct publication *p)
{
i->type = htonl(p->type);
@@ -91,7 +114,6 @@ static void publ_to_item(struct distr_item *i, struct publication *p)
/**
* named_prepare_buf - allocate & initialize a publication message
*/
-
static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
{
struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE + size);
@@ -120,20 +142,22 @@ static void named_cluster_distribute(struct sk_buff *buf)
}
}
- buf_discard(buf);
+ kfree_skb(buf);
}
/**
* tipc_named_publish - tell other nodes about a new publication by this node
*/
-
void tipc_named_publish(struct publication *publ)
{
struct sk_buff *buf;
struct distr_item *item;
- list_add_tail(&publ->local_list, &publ_root);
- publ_cnt++;
+ list_add_tail(&publ->local_list, &publ_lists[publ->scope]->list);
+ publ_lists[publ->scope]->size++;
+
+ if (publ->scope == TIPC_NODE_SCOPE)
+ return;
buf = named_prepare_buf(PUBLICATION, ITEM_SIZE, 0);
if (!buf) {
@@ -149,14 +173,16 @@ void tipc_named_publish(struct publication *publ)
/**
* tipc_named_withdraw - tell other nodes about a withdrawn publication by this node
*/
-
void tipc_named_withdraw(struct publication *publ)
{
struct sk_buff *buf;
struct distr_item *item;
list_del(&publ->local_list);
- publ_cnt--;
+ publ_lists[publ->scope]->size--;
+
+ if (publ->scope == TIPC_NODE_SCOPE)
+ return;
buf = named_prepare_buf(WITHDRAWAL, ITEM_SIZE, 0);
if (!buf) {
@@ -169,25 +195,51 @@ void tipc_named_withdraw(struct publication *publ)
named_cluster_distribute(buf);
}
+/*
+ * named_distribute - prepare name info for bulk distribution to another node
+ */
+static void named_distribute(struct list_head *message_list, u32 node,
+ struct publ_list *pls, u32 max_item_buf)
+{
+ struct publication *publ;
+ struct sk_buff *buf = NULL;
+ struct distr_item *item = NULL;
+ u32 left = 0;
+ u32 rest = pls->size * ITEM_SIZE;
+
+ list_for_each_entry(publ, &pls->list, local_list) {
+ if (!buf) {
+ left = (rest <= max_item_buf) ? rest : max_item_buf;
+ rest -= left;
+ buf = named_prepare_buf(PUBLICATION, left, node);
+ if (!buf) {
+ warn("Bulk publication failure\n");
+ return;
+ }
+ item = (struct distr_item *)msg_data(buf_msg(buf));
+ }
+ publ_to_item(item, publ);
+ item++;
+ left -= ITEM_SIZE;
+ if (!left) {
+ list_add_tail((struct list_head *)buf, message_list);
+ buf = NULL;
+ }
+ }
+}
+
/**
* tipc_named_node_up - tell specified node about all publications by this node
*/
-
void tipc_named_node_up(unsigned long nodearg)
{
struct tipc_node *n_ptr;
struct tipc_link *l_ptr;
- struct publication *publ;
- struct distr_item *item = NULL;
- struct sk_buff *buf = NULL;
struct list_head message_list;
u32 node = (u32)nodearg;
- u32 left = 0;
- u32 rest;
u32 max_item_buf = 0;
/* compute maximum amount of publication data to send per message */
-
read_lock_bh(&tipc_net_lock);
n_ptr = tipc_node_find(node);
if (n_ptr) {
@@ -203,32 +255,11 @@ void tipc_named_node_up(unsigned long nodearg)
return;
/* create list of publication messages, then send them as a unit */
-
INIT_LIST_HEAD(&message_list);
read_lock_bh(&tipc_nametbl_lock);
- rest = publ_cnt * ITEM_SIZE;
-
- list_for_each_entry(publ, &publ_root, local_list) {
- if (!buf) {
- left = (rest <= max_item_buf) ? rest : max_item_buf;
- rest -= left;
- buf = named_prepare_buf(PUBLICATION, left, node);
- if (!buf) {
- warn("Bulk publication distribution failure\n");
- goto exit;
- }
- item = (struct distr_item *)msg_data(buf_msg(buf));
- }
- publ_to_item(item, publ);
- item++;
- left -= ITEM_SIZE;
- if (!left) {
- list_add_tail((struct list_head *)buf, &message_list);
- buf = NULL;
- }
- }
-exit:
+ named_distribute(&message_list, node, &publ_cluster, max_item_buf);
+ named_distribute(&message_list, node, &publ_zone, max_item_buf);
read_unlock_bh(&tipc_nametbl_lock);
tipc_link_send_names(&message_list, (u32)node);
@@ -239,17 +270,12 @@ exit:
*
* Invoked for each publication issued by a newly failed node.
* Removes publication structure from name table & deletes it.
- * In rare cases the link may have come back up again when this
- * function is called, and we have two items representing the same
- * publication. Nudge this item's key to distinguish it from the other.
*/
-
static void named_purge_publ(struct publication *publ)
{
struct publication *p;
write_lock_bh(&tipc_nametbl_lock);
- publ->key += 1222345;
p = tipc_nametbl_remove_publ(publ->type, publ->lower,
publ->node, publ->ref, publ->key);
if (p)
@@ -268,7 +294,6 @@ static void named_purge_publ(struct publication *publ)
/**
* tipc_named_recv - process name table update message sent by another node
*/
-
void tipc_named_recv(struct sk_buff *buf)
{
struct publication *publ;
@@ -316,25 +341,26 @@ void tipc_named_recv(struct sk_buff *buf)
item++;
}
write_unlock_bh(&tipc_nametbl_lock);
- buf_discard(buf);
+ kfree_skb(buf);
}
/**
- * tipc_named_reinit - re-initialize local publication list
+ * tipc_named_reinit - re-initialize local publications
*
* This routine is called whenever TIPC networking is enabled.
- * All existing publications by this node that have "cluster" or "zone" scope
- * are updated to reflect the node's new network address.
+ * All name table entries published by this node are updated to reflect
+ * the node's new network address.
*/
-
void tipc_named_reinit(void)
{
struct publication *publ;
+ int scope;
write_lock_bh(&tipc_nametbl_lock);
- list_for_each_entry(publ, &publ_root, local_list)
- publ->node = tipc_own_addr;
+ for (scope = TIPC_ZONE_SCOPE; scope <= TIPC_NODE_SCOPE; scope++)
+ list_for_each_entry(publ, &publ_lists[scope]->list, local_list)
+ publ->node = tipc_own_addr;
write_unlock_bh(&tipc_nametbl_lock);
}
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 89eb5621ebb..010f24a59da 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -56,7 +56,6 @@ static int tipc_nametbl_size = 1024; /* must be a power of 2 */
* publications of the associated name sequence belong to it.
* (The cluster and node lists may be empty.)
*/
-
struct name_info {
struct list_head node_list;
struct list_head cluster_list;
@@ -72,7 +71,6 @@ struct name_info {
* @upper: name sequence upper bound
* @info: pointer to name sequence publication info
*/
-
struct sub_seq {
u32 lower;
u32 upper;
@@ -90,7 +88,6 @@ struct sub_seq {
* @subscriptions: list of subscriptions for this 'type'
* @lock: spinlock controlling access to publication lists of all sub-sequences
*/
-
struct name_seq {
u32 type;
struct sub_seq *sseqs;
@@ -107,17 +104,14 @@ struct name_seq {
* accessed via hashing on 'type'; name sequence lists are *not* sorted
* @local_publ_count: number of publications issued by this node
*/
-
struct name_table {
struct hlist_head *types;
u32 local_publ_count;
};
static struct name_table table;
-static atomic_t rsv_publ_ok = ATOMIC_INIT(0);
DEFINE_RWLOCK(tipc_nametbl_lock);
-
static int hash(int x)
{
return x & (tipc_nametbl_size - 1);
@@ -126,7 +120,6 @@ static int hash(int x)
/**
* publ_create - create a publication structure
*/
-
static struct publication *publ_create(u32 type, u32 lower, u32 upper,
u32 scope, u32 node, u32 port_ref,
u32 key)
@@ -153,7 +146,6 @@ static struct publication *publ_create(u32 type, u32 lower, u32 upper,
/**
* tipc_subseq_alloc - allocate a specified number of sub-sequence structures
*/
-
static struct sub_seq *tipc_subseq_alloc(u32 cnt)
{
struct sub_seq *sseq = kcalloc(cnt, sizeof(struct sub_seq), GFP_ATOMIC);
@@ -165,7 +157,6 @@ static struct sub_seq *tipc_subseq_alloc(u32 cnt)
*
* Allocates a single sub-sequence structure and sets it to all 0's.
*/
-
static struct name_seq *tipc_nameseq_create(u32 type, struct hlist_head *seq_head)
{
struct name_seq *nseq = kzalloc(sizeof(*nseq), GFP_ATOMIC);
@@ -188,12 +179,23 @@ static struct name_seq *tipc_nameseq_create(u32 type, struct hlist_head *seq_hea
return nseq;
}
-/**
+/*
+ * nameseq_delete_empty - deletes a name sequence structure if now unused
+ */
+static void nameseq_delete_empty(struct name_seq *seq)
+{
+ if (!seq->first_free && list_empty(&seq->subscriptions)) {
+ hlist_del_init(&seq->ns_list);
+ kfree(seq->sseqs);
+ kfree(seq);
+ }
+}
+
+/*
* nameseq_find_subseq - find sub-sequence (if any) matching a name instance
*
* Very time-critical, so binary searches through sub-sequence array.
*/
-
static struct sub_seq *nameseq_find_subseq(struct name_seq *nseq,
u32 instance)
{
@@ -223,7 +225,6 @@ static struct sub_seq *nameseq_find_subseq(struct name_seq *nseq,
*
* Note: Similar to binary search code for locating a sub-sequence.
*/
-
static u32 nameseq_locate_subseq(struct name_seq *nseq, u32 instance)
{
struct sub_seq *sseqs = nseq->sseqs;
@@ -244,9 +245,8 @@ static u32 nameseq_locate_subseq(struct name_seq *nseq, u32 instance)
}
/**
- * tipc_nameseq_insert_publ -
+ * tipc_nameseq_insert_publ
*/
-
static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,
u32 type, u32 lower, u32 upper,
u32 scope, u32 node, u32 port, u32 key)
@@ -262,7 +262,6 @@ static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,
if (sseq) {
/* Lower end overlaps existing entry => need an exact match */
-
if ((sseq->lower != lower) || (sseq->upper != upper)) {
warn("Cannot publish {%u,%u,%u}, overlap error\n",
type, lower, upper);
@@ -270,16 +269,21 @@ static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,
}
info = sseq->info;
+
+ /* Check if an identical publication already exists */
+ list_for_each_entry(publ, &info->zone_list, zone_list) {
+ if ((publ->ref == port) && (publ->key == key) &&
+ (!publ->node || (publ->node == node)))
+ return NULL;
+ }
} else {
u32 inspos;
struct sub_seq *freesseq;
/* Find where lower end should be inserted */
-
inspos = nameseq_locate_subseq(nseq, lower);
/* Fail if upper end overlaps into an existing entry */
-
if ((inspos < nseq->first_free) &&
(upper >= nseq->sseqs[inspos].lower)) {
warn("Cannot publish {%u,%u,%u}, overlap error\n",
@@ -288,7 +292,6 @@ static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,
}
/* Ensure there is space for new sub-sequence */
-
if (nseq->first_free == nseq->alloc) {
struct sub_seq *sseqs = tipc_subseq_alloc(nseq->alloc * 2);
@@ -316,7 +319,6 @@ static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,
INIT_LIST_HEAD(&info->zone_list);
/* Insert new sub-sequence */
-
sseq = &nseq->sseqs[inspos];
freesseq = &nseq->sseqs[nseq->first_free];
memmove(sseq + 1, sseq, (freesseq - sseq) * sizeof(*sseq));
@@ -328,8 +330,7 @@ static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,
created_subseq = 1;
}
- /* Insert a publication: */
-
+ /* Insert a publication */
publ = publ_create(type, lower, upper, scope, node, port, key);
if (!publ)
return NULL;
@@ -342,14 +343,12 @@ static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,
info->cluster_list_size++;
}
- if (node == tipc_own_addr) {
+ if (in_own_node(node)) {
list_add(&publ->node_list, &info->node_list);
info->node_list_size++;
}
- /*
- * Any subscriptions waiting for notification?
- */
+ /* Any subscriptions waiting for notification? */
list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
tipc_subscr_report_overlap(s,
publ->lower,
@@ -363,7 +362,7 @@ static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,
}
/**
- * tipc_nameseq_remove_publ -
+ * tipc_nameseq_remove_publ
*
* NOTE: There may be cases where TIPC is asked to remove a publication
* that is not in the name table. For example, if another node issues a
@@ -373,7 +372,6 @@ static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,
* A failed withdraw request simply returns a failure indication and lets the
* caller issue any error or warning messages associated with such a problem.
*/
-
static struct publication *tipc_nameseq_remove_publ(struct name_seq *nseq, u32 inst,
u32 node, u32 ref, u32 key)
{
@@ -390,7 +388,6 @@ static struct publication *tipc_nameseq_remove_publ(struct name_seq *nseq, u32 i
info = sseq->info;
/* Locate publication, if it exists */
-
list_for_each_entry(publ, &info->zone_list, zone_list) {
if ((publ->key == key) && (publ->ref == ref) &&
(!publ->node || (publ->node == node)))
@@ -400,26 +397,22 @@ static struct publication *tipc_nameseq_remove_publ(struct name_seq *nseq, u32 i
found:
/* Remove publication from zone scope list */
-
list_del(&publ->zone_list);
info->zone_list_size--;
/* Remove publication from cluster scope list, if present */
-
if (in_own_cluster(node)) {
list_del(&publ->cluster_list);
info->cluster_list_size--;
}
/* Remove publication from node scope list, if present */
-
- if (node == tipc_own_addr) {
+ if (in_own_node(node)) {
list_del(&publ->node_list);
info->node_list_size--;
}
/* Contract subseq list if no more publications for that subseq */
-
if (list_empty(&info->zone_list)) {
kfree(info);
free = &nseq->sseqs[nseq->first_free--];
@@ -428,7 +421,6 @@ found:
}
/* Notify any waiting subscriptions */
-
list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
tipc_subscr_report_overlap(s,
publ->lower,
@@ -447,7 +439,6 @@ found:
* the prescribed number of events if there is any sub-
* sequence overlapping with the requested sequence
*/
-
static void tipc_nameseq_subscribe(struct name_seq *nseq,
struct tipc_subscription *s)
{
@@ -499,9 +490,10 @@ struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper,
{
struct name_seq *seq = nametbl_find_seq(type);
- if (lower > upper) {
- warn("Failed to publish illegal {%u,%u,%u}\n",
- type, lower, upper);
+ if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE) ||
+ (lower > upper)) {
+ dbg("Failed to publish illegal {%u,%u,%u} with scope %u\n",
+ type, lower, upper, scope);
return NULL;
}
@@ -524,22 +516,23 @@ struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower,
return NULL;
publ = tipc_nameseq_remove_publ(seq, lower, node, ref, key);
-
- if (!seq->first_free && list_empty(&seq->subscriptions)) {
- hlist_del_init(&seq->ns_list);
- kfree(seq->sseqs);
- kfree(seq);
- }
+ nameseq_delete_empty(seq);
return publ;
}
/*
- * tipc_nametbl_translate - translate name to port id
+ * tipc_nametbl_translate - perform name translation
+ *
+ * On entry, 'destnode' is the search domain used during translation.
*
- * Note: on entry 'destnode' is the search domain used during translation;
- * on exit it passes back the node address of the matching port (if any)
+ * On exit:
+ * - if name translation is deferred to another node/cluster/zone,
+ * leaves 'destnode' unchanged (will be non-zero) and returns 0
+ * - if name translation is attempted and succeeds, sets 'destnode'
+ * to publishing node and returns port reference (will be non-zero)
+ * - if name translation is attempted and fails, sets 'destnode' to 0
+ * and returns 0
*/
-
u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
{
struct sub_seq *sseq;
@@ -547,6 +540,7 @@ u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
struct publication *publ;
struct name_seq *seq;
u32 ref = 0;
+ u32 node = 0;
if (!tipc_in_scope(*destnode, tipc_own_addr))
return 0;
@@ -561,7 +555,7 @@ u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
spin_lock_bh(&seq->lock);
info = sseq->info;
- /* Closest-First Algorithm: */
+ /* Closest-First Algorithm */
if (likely(!*destnode)) {
if (!list_empty(&info->node_list)) {
publ = list_first_entry(&info->node_list,
@@ -584,14 +578,14 @@ u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
}
}
- /* Round-Robin Algorithm: */
+ /* Round-Robin Algorithm */
else if (*destnode == tipc_own_addr) {
if (list_empty(&info->node_list))
goto no_match;
publ = list_first_entry(&info->node_list, struct publication,
node_list);
list_move_tail(&publ->node_list, &info->node_list);
- } else if (in_own_cluster(*destnode)) {
+ } else if (in_own_cluster_exact(*destnode)) {
if (list_empty(&info->cluster_list))
goto no_match;
publ = list_first_entry(&info->cluster_list, struct publication,
@@ -604,11 +598,12 @@ u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
}
ref = publ->ref;
- *destnode = publ->node;
+ node = publ->node;
no_match:
spin_unlock_bh(&seq->lock);
not_found:
read_unlock_bh(&tipc_nametbl_lock);
+ *destnode = node;
return ref;
}
@@ -624,7 +619,6 @@ not_found:
*
* Returns non-zero if any off-node ports overlap
*/
-
int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit,
struct tipc_port_list *dports)
{
@@ -665,25 +659,9 @@ exit:
return res;
}
-/**
- * tipc_nametbl_publish_rsv - publish port name using a reserved name type
- */
-
-int tipc_nametbl_publish_rsv(u32 ref, unsigned int scope,
- struct tipc_name_seq const *seq)
-{
- int res;
-
- atomic_inc(&rsv_publ_ok);
- res = tipc_publish(ref, scope, seq);
- atomic_dec(&rsv_publ_ok);
- return res;
-}
-
-/**
+/*
* tipc_nametbl_publish - add name publication to network name tables
*/
-
struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
u32 scope, u32 port_ref, u32 key)
{
@@ -694,18 +672,14 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
tipc_max_publications);
return NULL;
}
- if ((type < TIPC_RESERVED_TYPES) && !atomic_read(&rsv_publ_ok)) {
- warn("Publication failed, reserved name {%u,%u,%u}\n",
- type, lower, upper);
- return NULL;
- }
write_lock_bh(&tipc_nametbl_lock);
- table.local_publ_count++;
publ = tipc_nametbl_insert_publ(type, lower, upper, scope,
tipc_own_addr, port_ref, key);
- if (publ && (scope != TIPC_NODE_SCOPE))
+ if (likely(publ)) {
+ table.local_publ_count++;
tipc_named_publish(publ);
+ }
write_unlock_bh(&tipc_nametbl_lock);
return publ;
}
@@ -713,7 +687,6 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
/**
* tipc_nametbl_withdraw - withdraw name publication from network name tables
*/
-
int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
{
struct publication *publ;
@@ -722,8 +695,7 @@ int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
publ = tipc_nametbl_remove_publ(type, lower, tipc_own_addr, ref, key);
if (likely(publ)) {
table.local_publ_count--;
- if (publ->scope != TIPC_NODE_SCOPE)
- tipc_named_withdraw(publ);
+ tipc_named_withdraw(publ);
write_unlock_bh(&tipc_nametbl_lock);
list_del_init(&publ->pport_list);
kfree(publ);
@@ -739,7 +711,6 @@ int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
/**
* tipc_nametbl_subscribe - add a subscription object to the name table
*/
-
void tipc_nametbl_subscribe(struct tipc_subscription *s)
{
u32 type = s->seq.type;
@@ -763,7 +734,6 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s)
/**
* tipc_nametbl_unsubscribe - remove a subscription object from name table
*/
-
void tipc_nametbl_unsubscribe(struct tipc_subscription *s)
{
struct name_seq *seq;
@@ -774,11 +744,7 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s)
spin_lock_bh(&seq->lock);
list_del_init(&s->nameseq_list);
spin_unlock_bh(&seq->lock);
- if ((seq->first_free == 0) && list_empty(&seq->subscriptions)) {
- hlist_del_init(&seq->ns_list);
- kfree(seq->sseqs);
- kfree(seq);
- }
+ nameseq_delete_empty(seq);
}
write_unlock_bh(&tipc_nametbl_lock);
}
@@ -787,7 +753,6 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s)
/**
* subseq_list: print specified sub-sequence contents into the given buffer
*/
-
static void subseq_list(struct sub_seq *sseq, struct print_buf *buf, u32 depth,
u32 index)
{
@@ -824,7 +789,6 @@ static void subseq_list(struct sub_seq *sseq, struct print_buf *buf, u32 depth,
/**
* nameseq_list: print specified name sequence contents into the given buffer
*/
-
static void nameseq_list(struct name_seq *seq, struct print_buf *buf, u32 depth,
u32 type, u32 lowbound, u32 upbound, u32 index)
{
@@ -855,7 +819,6 @@ static void nameseq_list(struct name_seq *seq, struct print_buf *buf, u32 depth,
/**
* nametbl_header - print name table header into the given buffer
*/
-
static void nametbl_header(struct print_buf *buf, u32 depth)
{
const char *header[] = {
@@ -877,7 +840,6 @@ static void nametbl_header(struct print_buf *buf, u32 depth)
/**
* nametbl_list - print specified name table contents into the given buffer
*/
-
static void nametbl_list(struct print_buf *buf, u32 depth_info,
u32 type, u32 lowbound, u32 upbound)
{
@@ -976,7 +938,6 @@ void tipc_nametbl_stop(void)
return;
/* Verify name table is empty, then release it */
-
write_lock_bh(&tipc_nametbl_lock);
for (i = 0; i < tipc_nametbl_size; i++) {
if (!hlist_empty(&table.types[i]))
@@ -986,4 +947,3 @@ void tipc_nametbl_stop(void)
table.types = NULL;
write_unlock_bh(&tipc_nametbl_lock);
}
-
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 8086b42f92a..71cb4dc712d 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -45,10 +45,8 @@ struct tipc_port_list;
/*
* TIPC name types reserved for internal TIPC use (both current and planned)
*/
-
#define TIPC_ZM_SRV 3 /* zone master service name type */
-
/**
* struct publication - info about a published (name or) name sequence
* @type: name sequence type
@@ -67,7 +65,6 @@ struct tipc_port_list;
*
* Note that the node list, cluster list, and zone list are circular lists.
*/
-
struct publication {
u32 type;
u32 lower;
@@ -91,8 +88,6 @@ struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space);
u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *node);
int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit,
struct tipc_port_list *dports);
-int tipc_nametbl_publish_rsv(u32 ref, unsigned int scope,
- struct tipc_name_seq const *seq);
struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
u32 scope, u32 port_ref, u32 key);
int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key);
diff --git a/net/tipc/net.c b/net/tipc/net.c
index 61afee7e829..7c236c89cf5 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -117,7 +117,7 @@ static void net_route_named_msg(struct sk_buff *buf)
u32 dport;
if (!msg_named(msg)) {
- buf_discard(buf);
+ kfree_skb(buf);
return;
}
@@ -161,7 +161,7 @@ void tipc_net_route_msg(struct sk_buff *buf)
tipc_port_recv_proto_msg(buf);
break;
default:
- buf_discard(buf);
+ kfree_skb(buf);
}
return;
}
@@ -175,21 +175,14 @@ int tipc_net_start(u32 addr)
{
char addr_string[16];
- if (tipc_mode != TIPC_NODE_MODE)
- return -ENOPROTOOPT;
-
- tipc_subscr_stop();
- tipc_cfg_stop();
-
+ write_lock_bh(&tipc_net_lock);
tipc_own_addr = addr;
- tipc_mode = TIPC_NET_MODE;
tipc_named_reinit();
tipc_port_reinit();
-
tipc_bclink_init();
+ write_unlock_bh(&tipc_net_lock);
- tipc_k_signal((Handler)tipc_subscr_start, 0);
- tipc_k_signal((Handler)tipc_cfg_init, 0);
+ tipc_cfg_reinit();
info("Started in network mode\n");
info("Own node address %s, network identity %u\n",
@@ -201,10 +194,9 @@ void tipc_net_stop(void)
{
struct tipc_node *node, *t_node;
- if (tipc_mode != TIPC_NET_MODE)
+ if (!tipc_own_addr)
return;
write_lock_bh(&tipc_net_lock);
- tipc_mode = TIPC_NODE_MODE;
tipc_bearer_stop();
tipc_bclink_stop();
list_for_each_entry_safe(node, t_node, &tipc_node_list, list)
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6b226faad89..d4fd341e6e0 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -39,6 +39,8 @@
#include "node.h"
#include "name_distr.h"
+#define NODE_HTABLE_SIZE 512
+
static void node_lost_contact(struct tipc_node *n_ptr);
static void node_established_contact(struct tipc_node *n_ptr);
@@ -49,18 +51,27 @@ LIST_HEAD(tipc_node_list);
static u32 tipc_num_nodes;
static atomic_t tipc_num_links = ATOMIC_INIT(0);
-u32 tipc_own_tag;
-/**
- * tipc_node_find - locate specified node object, if it exists
+/*
+ * A trivial power-of-two bitmask technique is used for speed, since this
+ * operation is done for every incoming TIPC packet. The number of hash table
+ * entries has been chosen so that no hash chain exceeds 8 nodes and will
+ * usually be much smaller (typically only a single node).
*/
+static unsigned int tipc_hashfn(u32 addr)
+{
+ return addr & (NODE_HTABLE_SIZE - 1);
+}
+/*
+ * tipc_node_find - locate specified node object, if it exists
+ */
struct tipc_node *tipc_node_find(u32 addr)
{
struct tipc_node *node;
struct hlist_node *pos;
- if (unlikely(!in_own_cluster(addr)))
+ if (unlikely(!in_own_cluster_exact(addr)))
return NULL;
hlist_for_each_entry(node, pos, &node_htable[tipc_hashfn(addr)], hash) {
@@ -79,7 +90,6 @@ struct tipc_node *tipc_node_find(u32 addr)
* time. (It would be preferable to switch to holding net_lock in write mode,
* but this is a non-trivial change.)
*/
-
struct tipc_node *tipc_node_create(u32 addr)
{
struct tipc_node *n_ptr, *temp_node;
@@ -113,6 +123,7 @@ struct tipc_node *tipc_node_create(u32 addr)
}
list_add_tail(&n_ptr->list, &temp_node->list);
n_ptr->block_setup = WAIT_PEER_DOWN;
+ n_ptr->signature = INVALID_NODE_SIG;
tipc_num_nodes++;
@@ -129,13 +140,11 @@ void tipc_node_delete(struct tipc_node *n_ptr)
tipc_num_nodes--;
}
-
/**
* tipc_node_link_up - handle addition of link
*
* Link becomes active (alone or shared) or standby, depending on its priority.
*/
-
void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
{
struct tipc_link **active = &n_ptr->active_links[0];
@@ -168,7 +177,6 @@ void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
/**
* node_select_active_links - select active link
*/
-
static void node_select_active_links(struct tipc_node *n_ptr)
{
struct tipc_link **active = &n_ptr->active_links[0];
@@ -196,7 +204,6 @@ static void node_select_active_links(struct tipc_node *n_ptr)
/**
* tipc_node_link_down - handle loss of link
*/
-
void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
{
struct tipc_link **active;
@@ -253,63 +260,14 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
n_ptr->link_cnt--;
}
-/*
- * Routing table management - five cases to handle:
- *
- * 1: A link towards a zone/cluster external node comes up.
- * => Send a multicast message updating routing tables of all
- * system nodes within own cluster that the new destination
- * can be reached via this node.
- * (node.establishedContact()=>cluster.multicastNewRoute())
- *
- * 2: A link towards a slave node comes up.
- * => Send a multicast message updating routing tables of all
- * system nodes within own cluster that the new destination
- * can be reached via this node.
- * (node.establishedContact()=>cluster.multicastNewRoute())
- * => Send a message to the slave node about existence
- * of all system nodes within cluster:
- * (node.establishedContact()=>cluster.sendLocalRoutes())
- *
- * 3: A new cluster local system node becomes available.
- * => Send message(s) to this particular node containing
- * information about all cluster external and slave
- * nodes which can be reached via this node.
- * (node.establishedContact()==>network.sendExternalRoutes())
- * (node.establishedContact()==>network.sendSlaveRoutes())
- * => Send messages to all directly connected slave nodes
- * containing information about the existence of the new node
- * (node.establishedContact()=>cluster.multicastNewRoute())
- *
- * 4: The link towards a zone/cluster external node or slave
- * node goes down.
- * => Send a multcast message updating routing tables of all
- * nodes within cluster that the new destination can not any
- * longer be reached via this node.
- * (node.lostAllLinks()=>cluster.bcastLostRoute())
- *
- * 5: A cluster local system node becomes unavailable.
- * => Remove all references to this node from the local
- * routing tables. Note: This is a completely node
- * local operation.
- * (node.lostAllLinks()=>network.removeAsRouter())
- * => Send messages to all directly connected slave nodes
- * containing information about loss of the node
- * (node.establishedContact()=>cluster.multicastLostRoute())
- *
- */
-
static void node_established_contact(struct tipc_node *n_ptr)
{
tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr);
- /* Syncronize broadcast acks */
- n_ptr->bclink.acked = tipc_bclink_get_last_sent();
-
- if (n_ptr->bclink.supported) {
+ if (n_ptr->bclink.supportable) {
+ n_ptr->bclink.acked = tipc_bclink_get_last_sent();
tipc_bclink_add_node(n_ptr->addr);
- if (n_ptr->addr < tipc_own_addr)
- tipc_own_tag++;
+ n_ptr->bclink.supported = 1;
}
}
@@ -336,24 +294,21 @@ static void node_lost_contact(struct tipc_node *n_ptr)
tipc_addr_string_fill(addr_string, n_ptr->addr));
/* Flush broadcast link info associated with lost node */
-
if (n_ptr->bclink.supported) {
- n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 0;
while (n_ptr->bclink.deferred_head) {
struct sk_buff *buf = n_ptr->bclink.deferred_head;
n_ptr->bclink.deferred_head = buf->next;
- buf_discard(buf);
+ kfree_skb(buf);
}
+ n_ptr->bclink.deferred_size = 0;
if (n_ptr->bclink.defragm) {
- buf_discard(n_ptr->bclink.defragm);
+ kfree_skb(n_ptr->bclink.defragm);
n_ptr->bclink.defragm = NULL;
}
tipc_bclink_remove_node(n_ptr->addr);
tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
- if (n_ptr->addr < tipc_own_addr)
- tipc_own_tag--;
n_ptr->bclink.supported = 0;
}
@@ -372,7 +327,6 @@ static void node_lost_contact(struct tipc_node *n_ptr)
tipc_nodesub_notify(n_ptr);
/* Prevent re-contact with node until cleanup is done */
-
n_ptr->block_setup = WAIT_PEER_DOWN | WAIT_NAMES_GONE;
tipc_k_signal((Handler)node_name_purge_complete, n_ptr->addr);
}
@@ -400,7 +354,6 @@ struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space)
}
/* For now, get space for all other nodes */
-
payload_size = TLV_SPACE(sizeof(node_info)) * tipc_num_nodes;
if (payload_size > 32768u) {
read_unlock_bh(&tipc_net_lock);
@@ -414,7 +367,6 @@ struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space)
}
/* Add TLVs for all nodes in scope */
-
list_for_each_entry(n_ptr, &tipc_node_list, list) {
if (!tipc_in_scope(domain, n_ptr->addr))
continue;
@@ -444,13 +396,12 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)
return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
" (network address)");
- if (tipc_mode != TIPC_NET_MODE)
+ if (!tipc_own_addr)
return tipc_cfg_reply_none();
read_lock_bh(&tipc_net_lock);
- /* Get space for all unicast links + multicast link */
-
+ /* Get space for all unicast links + broadcast link */
payload_size = TLV_SPACE(sizeof(link_info)) *
(atomic_read(&tipc_num_links) + 1);
if (payload_size > 32768u) {
@@ -465,14 +416,12 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)
}
/* Add TLV for broadcast link */
-
link_info.dest = htonl(tipc_cluster_mask(tipc_own_addr));
link_info.up = htonl(1);
strlcpy(link_info.str, tipc_bclink_name, TIPC_MAX_LINK_NAME);
tipc_cfg_append_tlv(buf, TIPC_TLV_LINK_INFO, &link_info, sizeof(link_info));
/* Add TLVs for any other links in scope */
-
list_for_each_entry(n_ptr, &tipc_node_list, list) {
u32 i;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 0b1c5f8b699..cfcaf4d6e48 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -42,8 +42,12 @@
#include "net.h"
#include "bearer.h"
-/* Flags used to block (re)establishment of contact with a neighboring node */
+/*
+ * Out-of-range value for node signature
+ */
+#define INVALID_NODE_SIG 0x10000
+/* Flags used to block (re)establishment of contact with a neighboring node */
#define WAIT_PEER_DOWN 0x0001 /* wait to see that peer's links are down */
#define WAIT_NAMES_GONE 0x0002 /* wait for peer's publications to be purged */
#define WAIT_NODE_DOWN 0x0004 /* wait until peer node is declared down */
@@ -61,18 +65,19 @@
* @block_setup: bit mask of conditions preventing link establishment to node
* @link_cnt: number of links to node
* @permit_changeover: non-zero if node has redundant links to this system
+ * @signature: node instance identifier
* @bclink: broadcast-related info
+ * @supportable: non-zero if node supports TIPC b'cast link capability
* @supported: non-zero if node supports TIPC b'cast capability
* @acked: sequence # of last outbound b'cast message acknowledged by node
* @last_in: sequence # of last in-sequence b'cast message received from node
- * @gap_after: sequence # of last message not requiring a NAK request
- * @gap_to: sequence # of last message requiring a NAK request
- * @nack_sync: counter that determines when NAK requests should be sent
+ * @last_sent: sequence # of last b'cast message sent by node
+ * @oos_state: state tracker for handling OOS b'cast messages
+ * @deferred_size: number of OOS b'cast messages in deferred queue
* @deferred_head: oldest OOS b'cast message received from node
* @deferred_tail: newest OOS b'cast message received from node
* @defragm: list of partially reassembled b'cast message fragments from node
*/
-
struct tipc_node {
u32 addr;
spinlock_t lock;
@@ -85,35 +90,23 @@ struct tipc_node {
int working_links;
int block_setup;
int permit_changeover;
+ u32 signature;
struct {
- int supported;
+ u8 supportable;
+ u8 supported;
u32 acked;
u32 last_in;
- u32 gap_after;
- u32 gap_to;
- u32 nack_sync;
+ u32 last_sent;
+ u32 oos_state;
+ u32 deferred_size;
struct sk_buff *deferred_head;
struct sk_buff *deferred_tail;
struct sk_buff *defragm;
} bclink;
};
-#define NODE_HTABLE_SIZE 512
extern struct list_head tipc_node_list;
-/*
- * A trivial power-of-two bitmask technique is used for speed, since this
- * operation is done for every incoming TIPC packet. The number of hash table
- * entries has been chosen so that no hash chain exceeds 8 nodes and will
- * usually be much smaller (typically only a single node).
- */
-static inline unsigned int tipc_hashfn(u32 addr)
-{
- return addr & (NODE_HTABLE_SIZE - 1);
-}
-
-extern u32 tipc_own_tag;
-
struct tipc_node *tipc_node_find(u32 addr);
struct tipc_node *tipc_node_create(u32 addr);
void tipc_node_delete(struct tipc_node *n_ptr);
diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c
index c3c2815ae63..7a27344108f 100644
--- a/net/tipc/node_subscr.c
+++ b/net/tipc/node_subscr.c
@@ -41,11 +41,10 @@
/**
* tipc_nodesub_subscribe - create "node down" subscription for specified node
*/
-
void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr,
void *usr_handle, net_ev_handler handle_down)
{
- if (addr == tipc_own_addr) {
+ if (in_own_node(addr)) {
node_sub->node = NULL;
return;
}
@@ -66,7 +65,6 @@ void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr,
/**
* tipc_nodesub_unsubscribe - cancel "node down" subscription (if any)
*/
-
void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub)
{
if (!node_sub->node)
@@ -82,7 +80,6 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub)
*
* Note: node is locked by caller
*/
-
void tipc_nodesub_notify(struct tipc_node *node)
{
struct tipc_node_subscr *ns;
diff --git a/net/tipc/node_subscr.h b/net/tipc/node_subscr.h
index 4bc2ca0867a..c95d20727de 100644
--- a/net/tipc/node_subscr.h
+++ b/net/tipc/node_subscr.h
@@ -48,7 +48,6 @@ typedef void (*net_ev_handler) (void *usr_handle);
* @usr_handle: argument to pass to routine when node fails
* @nodesub_list: adjacent entries in list of subscriptions for the node
*/
-
struct tipc_node_subscr {
struct tipc_node *node;
net_ev_handler handle_node_down;
diff --git a/net/tipc/port.c b/net/tipc/port.c
index d91efc69e6f..2ad37a4db37 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -69,10 +69,30 @@ static u32 port_peerport(struct tipc_port *p_ptr)
return msg_destport(&p_ptr->phdr);
}
+/*
+ * tipc_port_peer_msg - verify message was sent by connected port's peer
+ *
+ * Handles cases where the node's network address has changed from
+ * the default of <0.0.0> to its configured setting.
+ */
+int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg)
+{
+ u32 peernode;
+ u32 orignode;
+
+ if (msg_origport(msg) != port_peerport(p_ptr))
+ return 0;
+
+ orignode = msg_orignode(msg);
+ peernode = port_peernode(p_ptr);
+ return (orignode == peernode) ||
+ (!orignode && (peernode == tipc_own_addr)) ||
+ (!peernode && (orignode == tipc_own_addr));
+}
+
/**
* tipc_multicast - send a multicast message to local and remote destinations
*/
-
int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
u32 num_sect, struct iovec const *msg_sect,
unsigned int total_len)
@@ -89,7 +109,6 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
return -EINVAL;
/* Create multicast message */
-
hdr = &oport->phdr;
msg_set_type(hdr, TIPC_MCAST_MSG);
msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
@@ -105,24 +124,22 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
return res;
/* Figure out where to send multicast message */
-
ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
TIPC_NODE_SCOPE, &dports);
/* Send message to destinations (duplicate it only if necessary) */
-
if (ext_targets) {
if (dports.count != 0) {
ibuf = skb_copy(buf, GFP_ATOMIC);
if (ibuf == NULL) {
tipc_port_list_free(&dports);
- buf_discard(buf);
+ kfree_skb(buf);
return -ENOMEM;
}
}
res = tipc_bclink_send_msg(buf);
if ((res < 0) && (dports.count != 0))
- buf_discard(ibuf);
+ kfree_skb(ibuf);
} else {
ibuf = buf;
}
@@ -141,7 +158,6 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
*
* If there is no port list, perform a lookup to create one
*/
-
void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp)
{
struct tipc_msg *msg;
@@ -152,7 +168,6 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp)
msg = buf_msg(buf);
/* Create destination port list, if one wasn't supplied */
-
if (dp == NULL) {
tipc_nametbl_mc_translate(msg_nametype(msg),
msg_namelower(msg),
@@ -163,7 +178,6 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp)
}
/* Deliver a copy of message to each destination port */
-
if (dp->count != 0) {
msg_set_destnode(msg, tipc_own_addr);
if (dp->count == 1) {
@@ -187,7 +201,7 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp)
}
}
exit:
- buf_discard(buf);
+ kfree_skb(buf);
tipc_port_list_free(dp);
}
@@ -196,7 +210,6 @@ exit:
*
* Returns pointer to (locked) TIPC port, or NULL if unable to create it
*/
-
struct tipc_port *tipc_createport_raw(void *usr_handle,
u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
void (*wakeup)(struct tipc_port *),
@@ -221,18 +234,24 @@ struct tipc_port *tipc_createport_raw(void *usr_handle,
p_ptr->usr_handle = usr_handle;
p_ptr->max_pkt = MAX_PKT_DEFAULT;
p_ptr->ref = ref;
- msg = &p_ptr->phdr;
- tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0);
- msg_set_origport(msg, ref);
INIT_LIST_HEAD(&p_ptr->wait_list);
INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
p_ptr->dispatcher = dispatcher;
p_ptr->wakeup = wakeup;
p_ptr->user_port = NULL;
k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
- spin_lock_bh(&tipc_port_list_lock);
INIT_LIST_HEAD(&p_ptr->publications);
INIT_LIST_HEAD(&p_ptr->port_list);
+
+ /*
+ * Must hold port list lock while initializing message header template
+ * to ensure a change to node's own network address doesn't result
+ * in template containing out-dated network address information
+ */
+ spin_lock_bh(&tipc_port_list_lock);
+ msg = &p_ptr->phdr;
+ tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0);
+ msg_set_origport(msg, ref);
list_add_tail(&p_ptr->port_list, &ports);
spin_unlock_bh(&tipc_port_list_lock);
return p_ptr;
@@ -361,7 +380,6 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
u32 rmsg_sz;
/* discard rejected message if it shouldn't be returned to sender */
-
if (WARN(!msg_isdata(msg),
"attempt to reject message with user=%u", msg_user(msg))) {
dump_stack();
@@ -374,7 +392,6 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
* construct returned message by copying rejected message header and
* data (or subset), then updating header fields that need adjusting
*/
-
hdr_sz = msg_hdr_sz(msg);
rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
@@ -400,26 +417,26 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
/* send self-abort message when rejecting on a connected port */
if (msg_connected(msg)) {
- struct sk_buff *abuf = NULL;
struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
if (p_ptr) {
+ struct sk_buff *abuf = NULL;
+
if (p_ptr->connected)
abuf = port_build_self_abort_msg(p_ptr, err);
tipc_port_unlock(p_ptr);
+ tipc_net_route_msg(abuf);
}
- tipc_net_route_msg(abuf);
}
/* send returned message & dispose of rejected message */
-
src_node = msg_prevnode(msg);
- if (src_node == tipc_own_addr)
+ if (in_own_node(src_node))
tipc_port_recv_msg(rbuf);
else
tipc_link_send(rbuf, src_node, msg_link_selector(rmsg));
exit:
- buf_discard(buf);
+ kfree_skb(buf);
return data_sz;
}
@@ -518,25 +535,20 @@ void tipc_port_recv_proto_msg(struct sk_buff *buf)
struct tipc_msg *msg = buf_msg(buf);
struct tipc_port *p_ptr;
struct sk_buff *r_buf = NULL;
- u32 orignode = msg_orignode(msg);
- u32 origport = msg_origport(msg);
u32 destport = msg_destport(msg);
int wakeable;
/* Validate connection */
-
p_ptr = tipc_port_lock(destport);
- if (!p_ptr || !p_ptr->connected ||
- (port_peernode(p_ptr) != orignode) ||
- (port_peerport(p_ptr) != origport)) {
+ if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) {
r_buf = tipc_buf_acquire(BASIC_H_SIZE);
if (r_buf) {
msg = buf_msg(r_buf);
tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG,
- BASIC_H_SIZE, orignode);
+ BASIC_H_SIZE, msg_orignode(msg));
msg_set_errcode(msg, TIPC_ERR_NO_PORT);
msg_set_origport(msg, destport);
- msg_set_destport(msg, origport);
+ msg_set_destport(msg, msg_origport(msg));
}
if (p_ptr)
tipc_port_unlock(p_ptr);
@@ -544,7 +556,6 @@ void tipc_port_recv_proto_msg(struct sk_buff *buf)
}
/* Process protocol message sent by peer */
-
switch (msg_type(msg)) {
case CONN_ACK:
wakeable = tipc_port_congested(p_ptr) && p_ptr->congested &&
@@ -567,7 +578,7 @@ void tipc_port_recv_proto_msg(struct sk_buff *buf)
tipc_port_unlock(p_ptr);
exit:
tipc_net_route_msg(r_buf);
- buf_discard(buf);
+ kfree_skb(buf);
}
static void port_print(struct tipc_port *p_ptr, struct print_buf *buf, int full_id)
@@ -645,8 +656,6 @@ void tipc_port_reinit(void)
spin_lock_bh(&tipc_port_list_lock);
list_for_each_entry(p_ptr, &ports, port_list) {
msg = &p_ptr->phdr;
- if (msg_orignode(msg) == tipc_own_addr)
- break;
msg_set_prevnode(msg, tipc_own_addr);
msg_set_orignode(msg, tipc_own_addr);
}
@@ -658,7 +667,6 @@ void tipc_port_reinit(void)
* port_dispatcher_sigh(): Signal handler for messages destinated
* to the tipc_port interface.
*/
-
static void port_dispatcher_sigh(void *dummy)
{
struct sk_buff *buf;
@@ -675,6 +683,7 @@ static void port_dispatcher_sigh(void *dummy)
struct tipc_name_seq dseq;
void *usr_handle;
int connected;
+ int peer_invalid;
int published;
u32 message_type;
@@ -695,6 +704,7 @@ static void port_dispatcher_sigh(void *dummy)
up_ptr = p_ptr->user_port;
usr_handle = up_ptr->usr_handle;
connected = p_ptr->connected;
+ peer_invalid = connected && !tipc_port_peer_msg(p_ptr, msg);
published = p_ptr->published;
if (unlikely(msg_errcode(msg)))
@@ -704,8 +714,6 @@ static void port_dispatcher_sigh(void *dummy)
case TIPC_CONN_MSG:{
tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
- u32 peer_port = port_peerport(p_ptr);
- u32 peer_node = port_peernode(p_ptr);
u32 dsz;
tipc_port_unlock(p_ptr);
@@ -714,8 +722,7 @@ static void port_dispatcher_sigh(void *dummy)
if (unlikely(!connected)) {
if (tipc_connect2port(dref, &orig))
goto reject;
- } else if ((msg_origport(msg) != peer_port) ||
- (msg_orignode(msg) != peer_node))
+ } else if (peer_invalid)
goto reject;
dsz = msg_data_sz(msg);
if (unlikely(dsz &&
@@ -758,7 +765,7 @@ static void port_dispatcher_sigh(void *dummy)
}
}
if (buf)
- buf_discard(buf);
+ kfree_skb(buf);
buf = next;
continue;
err:
@@ -767,14 +774,9 @@ err:
case TIPC_CONN_MSG:{
tipc_conn_shutdown_event cb =
up_ptr->conn_err_cb;
- u32 peer_port = port_peerport(p_ptr);
- u32 peer_node = port_peernode(p_ptr);
tipc_port_unlock(p_ptr);
- if (!cb || !connected)
- break;
- if ((msg_origport(msg) != peer_port) ||
- (msg_orignode(msg) != peer_node))
+ if (!cb || !connected || peer_invalid)
break;
tipc_disconnect(dref);
skb_pull(buf, msg_hdr_sz(msg));
@@ -812,7 +814,7 @@ err:
}
}
if (buf)
- buf_discard(buf);
+ kfree_skb(buf);
buf = next;
continue;
reject:
@@ -825,7 +827,6 @@ reject:
* port_dispatcher(): Dispatcher for messages destinated
* to the tipc_port interface. Called with port locked.
*/
-
static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
{
buf->next = NULL;
@@ -842,10 +843,8 @@ static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
}
/*
- * Wake up port after congestion: Called with port locked,
- *
+ * Wake up port after congestion: Called with port locked
*/
-
static void port_wakeup_sh(unsigned long ref)
{
struct tipc_port *p_ptr;
@@ -891,7 +890,6 @@ void tipc_acknowledge(u32 ref, u32 ack)
/*
* tipc_createport(): user level call.
*/
-
int tipc_createport(void *usr_handle,
unsigned int importance,
tipc_msg_err_event error_cb,
@@ -900,7 +898,7 @@ int tipc_createport(void *usr_handle,
tipc_msg_event msg_cb,
tipc_named_msg_event named_msg_cb,
tipc_conn_msg_event conn_msg_cb,
- tipc_continue_event continue_event_cb,/* May be zero */
+ tipc_continue_event continue_event_cb, /* May be zero */
u32 *portref)
{
struct user_port *up_ptr;
@@ -974,10 +972,6 @@ int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
if (p_ptr->connected)
goto exit;
- if (seq->lower > seq->upper)
- goto exit;
- if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE))
- goto exit;
key = ref + p_ptr->pub_count + 1;
if (key == ref) {
res = -EADDRINUSE;
@@ -1053,8 +1047,6 @@ int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
msg = &p_ptr->phdr;
msg_set_destnode(msg, peer->node);
msg_set_destport(msg, peer->ref);
- msg_set_orignode(msg, tipc_own_addr);
- msg_set_origport(msg, p_ptr->ref);
msg_set_type(msg, TIPC_CONN_MSG);
msg_set_lookup_scope(msg, 0);
msg_set_hdr_sz(msg, SHORT_H_SIZE);
@@ -1079,7 +1071,6 @@ exit:
*
* Port must be locked.
*/
-
int tipc_disconnect_port(struct tipc_port *tp_ptr)
{
int res;
@@ -1100,7 +1091,6 @@ int tipc_disconnect_port(struct tipc_port *tp_ptr)
* tipc_disconnect(): Disconnect port form peer.
* This is a node local operation.
*/
-
int tipc_disconnect(u32 ref)
{
struct tipc_port *p_ptr;
@@ -1132,11 +1122,41 @@ int tipc_shutdown(u32 ref)
return tipc_disconnect(ref);
}
+/**
+ * tipc_port_recv_msg - receive message from lower layer and deliver to port user
+ */
+int tipc_port_recv_msg(struct sk_buff *buf)
+{
+ struct tipc_port *p_ptr;
+ struct tipc_msg *msg = buf_msg(buf);
+ u32 destport = msg_destport(msg);
+ u32 dsz = msg_data_sz(msg);
+ u32 err;
+
+ /* forward unresolved named message */
+ if (unlikely(!destport)) {
+ tipc_net_route_msg(buf);
+ return dsz;
+ }
+
+ /* validate destination & pass to port, otherwise reject message */
+ p_ptr = tipc_port_lock(destport);
+ if (likely(p_ptr)) {
+ err = p_ptr->dispatcher(p_ptr, buf);
+ tipc_port_unlock(p_ptr);
+ if (likely(!err))
+ return dsz;
+ } else {
+ err = TIPC_ERR_NO_PORT;
+ }
+
+ return tipc_reject_msg(buf, err);
+}
+
/*
* tipc_port_recv_sections(): Concatenate and deliver sectioned
* message for this node.
*/
-
static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect,
struct iovec const *msg_sect,
unsigned int total_len)
@@ -1154,7 +1174,6 @@ static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_se
/**
* tipc_send - send message sections on connection
*/
-
int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect,
unsigned int total_len)
{
@@ -1169,7 +1188,7 @@ int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect,
p_ptr->congested = 1;
if (!tipc_port_congested(p_ptr)) {
destnode = port_peernode(p_ptr);
- if (likely(destnode != tipc_own_addr))
+ if (likely(!in_own_node(destnode)))
res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
total_len, destnode);
else
@@ -1193,7 +1212,6 @@ int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect,
/**
* tipc_send2name - send message sections to port name
*/
-
int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
unsigned int num_sect, struct iovec const *msg_sect,
unsigned int total_len)
@@ -1210,8 +1228,6 @@ int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
msg = &p_ptr->phdr;
msg_set_type(msg, TIPC_NAMED_MSG);
- msg_set_orignode(msg, tipc_own_addr);
- msg_set_origport(msg, ref);
msg_set_hdr_sz(msg, NAMED_H_SIZE);
msg_set_nametype(msg, name->type);
msg_set_nameinst(msg, name->instance);
@@ -1220,14 +1236,18 @@ int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
msg_set_destnode(msg, destnode);
msg_set_destport(msg, destport);
- if (likely(destport)) {
- if (likely(destnode == tipc_own_addr))
+ if (likely(destport || destnode)) {
+ if (likely(in_own_node(destnode)))
res = tipc_port_recv_sections(p_ptr, num_sect,
msg_sect, total_len);
- else
+ else if (tipc_own_addr)
res = tipc_link_send_sections_fast(p_ptr, msg_sect,
num_sect, total_len,
destnode);
+ else
+ res = tipc_port_reject_sections(p_ptr, msg, msg_sect,
+ num_sect, total_len,
+ TIPC_ERR_NO_NODE);
if (likely(res != -ELINKCONG)) {
if (res > 0)
p_ptr->sent++;
@@ -1245,7 +1265,6 @@ int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
/**
* tipc_send2port - send message sections to port identity
*/
-
int tipc_send2port(u32 ref, struct tipc_portid const *dest,
unsigned int num_sect, struct iovec const *msg_sect,
unsigned int total_len)
@@ -1261,18 +1280,19 @@ int tipc_send2port(u32 ref, struct tipc_portid const *dest,
msg = &p_ptr->phdr;
msg_set_type(msg, TIPC_DIRECT_MSG);
msg_set_lookup_scope(msg, 0);
- msg_set_orignode(msg, tipc_own_addr);
- msg_set_origport(msg, ref);
msg_set_destnode(msg, dest->node);
msg_set_destport(msg, dest->ref);
msg_set_hdr_sz(msg, BASIC_H_SIZE);
- if (dest->node == tipc_own_addr)
+ if (in_own_node(dest->node))
res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
total_len);
- else
+ else if (tipc_own_addr)
res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
total_len, dest->node);
+ else
+ res = tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
+ total_len, TIPC_ERR_NO_NODE);
if (likely(res != -ELINKCONG)) {
if (res > 0)
p_ptr->sent++;
@@ -1287,7 +1307,6 @@ int tipc_send2port(u32 ref, struct tipc_portid const *dest,
/**
* tipc_send_buf2port - send message buffer to port identity
*/
-
int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
struct sk_buff *buf, unsigned int dsz)
{
@@ -1301,8 +1320,6 @@ int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
msg = &p_ptr->phdr;
msg_set_type(msg, TIPC_DIRECT_MSG);
- msg_set_orignode(msg, tipc_own_addr);
- msg_set_origport(msg, ref);
msg_set_destnode(msg, dest->node);
msg_set_destport(msg, dest->ref);
msg_set_hdr_sz(msg, BASIC_H_SIZE);
@@ -1313,7 +1330,7 @@ int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
skb_push(buf, BASIC_H_SIZE);
skb_copy_to_linear_data(buf, msg, BASIC_H_SIZE);
- if (dest->node == tipc_own_addr)
+ if (in_own_node(dest->node))
res = tipc_port_recv_msg(buf);
else
res = tipc_send_buf_fast(buf, dest->node);
@@ -1326,4 +1343,3 @@ int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
return dsz;
return -ELINKCONG;
}
-
diff --git a/net/tipc/port.h b/net/tipc/port.h
index f751807e2a9..98cbec9c453 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -81,7 +81,6 @@ typedef void (*tipc_continue_event) (void *usr_handle, u32 portref);
* @ref: object reference to associated TIPC port
* <various callback routines>
*/
-
struct user_port {
void *usr_handle;
u32 ref;
@@ -201,10 +200,12 @@ int tipc_shutdown(u32 ref);
* The following routines require that the port be locked on entry
*/
int tipc_disconnect_port(struct tipc_port *tp_ptr);
+int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
/*
* TIPC messaging routines
*/
+int tipc_port_recv_msg(struct sk_buff *buf);
int tipc_send(u32 portref, unsigned int num_sect, struct iovec const *msg_sect,
unsigned int total_len);
@@ -234,7 +235,6 @@ void tipc_port_reinit(void);
/**
* tipc_port_lock - lock port instance referred to and return its pointer
*/
-
static inline struct tipc_port *tipc_port_lock(u32 ref)
{
return (struct tipc_port *)tipc_ref_lock(ref);
@@ -245,7 +245,6 @@ static inline struct tipc_port *tipc_port_lock(u32 ref)
*
* Can use pointer instead of tipc_ref_unlock() since port is already locked.
*/
-
static inline void tipc_port_unlock(struct tipc_port *p_ptr)
{
spin_unlock_bh(p_ptr->lock);
@@ -256,60 +255,9 @@ static inline struct tipc_port *tipc_port_deref(u32 ref)
return (struct tipc_port *)tipc_ref_deref(ref);
}
-static inline u32 tipc_peer_port(struct tipc_port *p_ptr)
-{
- return msg_destport(&p_ptr->phdr);
-}
-
-static inline u32 tipc_peer_node(struct tipc_port *p_ptr)
-{
- return msg_destnode(&p_ptr->phdr);
-}
-
static inline int tipc_port_congested(struct tipc_port *p_ptr)
{
return (p_ptr->sent - p_ptr->acked) >= (TIPC_FLOW_CONTROL_WIN * 2);
}
-/**
- * tipc_port_recv_msg - receive message from lower layer and deliver to port user
- */
-
-static inline int tipc_port_recv_msg(struct sk_buff *buf)
-{
- struct tipc_port *p_ptr;
- struct tipc_msg *msg = buf_msg(buf);
- u32 destport = msg_destport(msg);
- u32 dsz = msg_data_sz(msg);
- u32 err;
-
- /* forward unresolved named message */
- if (unlikely(!destport)) {
- tipc_net_route_msg(buf);
- return dsz;
- }
-
- /* validate destination & pass to port, otherwise reject message */
- p_ptr = tipc_port_lock(destport);
- if (likely(p_ptr)) {
- if (likely(p_ptr->connected)) {
- if ((unlikely(msg_origport(msg) != tipc_peer_port(p_ptr))) ||
- (unlikely(msg_orignode(msg) != tipc_peer_node(p_ptr))) ||
- (unlikely(!msg_connected(msg)))) {
- err = TIPC_ERR_NO_PORT;
- tipc_port_unlock(p_ptr);
- goto reject;
- }
- }
- err = p_ptr->dispatcher(p_ptr, buf);
- tipc_port_unlock(p_ptr);
- if (likely(!err))
- return dsz;
- } else {
- err = TIPC_ERR_NO_PORT;
- }
-reject:
- return tipc_reject_msg(buf, err);
-}
-
#endif
diff --git a/net/tipc/ref.c b/net/tipc/ref.c
index 9e37b7812c3..5cada0e38e0 100644
--- a/net/tipc/ref.c
+++ b/net/tipc/ref.c
@@ -43,7 +43,6 @@
* @lock: spinlock controlling access to object
* @ref: reference value for object (combines instance & array index info)
*/
-
struct reference {
void *object;
spinlock_t lock;
@@ -60,7 +59,6 @@ struct reference {
* @index_mask: bitmask for array index portion of reference values
* @start_mask: initial value for instance value portion of reference values
*/
-
struct ref_table {
struct reference *entries;
u32 capacity;
@@ -96,7 +94,6 @@ static DEFINE_RWLOCK(ref_table_lock);
/**
* tipc_ref_table_init - create reference table for objects
*/
-
int tipc_ref_table_init(u32 requested_size, u32 start)
{
struct reference *table;
@@ -109,7 +106,6 @@ int tipc_ref_table_init(u32 requested_size, u32 start)
/* do nothing */ ;
/* allocate table & mark all entries as uninitialized */
-
table = vzalloc(actual_size * sizeof(struct reference));
if (table == NULL)
return -ENOMEM;
@@ -128,7 +124,6 @@ int tipc_ref_table_init(u32 requested_size, u32 start)
/**
* tipc_ref_table_stop - destroy reference table for objects
*/
-
void tipc_ref_table_stop(void)
{
if (!tipc_ref_table.entries)
@@ -149,7 +144,6 @@ void tipc_ref_table_stop(void)
* register a partially initialized object, without running the risk that
* the object will be accessed before initialization is complete.
*/
-
u32 tipc_ref_acquire(void *object, spinlock_t **lock)
{
u32 index;
@@ -168,7 +162,6 @@ u32 tipc_ref_acquire(void *object, spinlock_t **lock)
}
/* take a free entry, if available; otherwise initialize a new entry */
-
write_lock_bh(&ref_table_lock);
if (tipc_ref_table.first_free) {
index = tipc_ref_table.first_free;
@@ -211,7 +204,6 @@ u32 tipc_ref_acquire(void *object, spinlock_t **lock)
* Disallow future references to an object and free up the entry for re-use.
* Note: The entry's spin_lock may still be busy after discard
*/
-
void tipc_ref_discard(u32 ref)
{
struct reference *entry;
@@ -242,12 +234,10 @@ void tipc_ref_discard(u32 ref)
* mark entry as unused; increment instance part of entry's reference
* to invalidate any subsequent references
*/
-
entry->object = NULL;
entry->ref = (ref & ~index_mask) + (index_mask + 1);
/* append entry to free entry list */
-
if (tipc_ref_table.first_free == 0)
tipc_ref_table.first_free = index;
else
@@ -261,7 +251,6 @@ exit:
/**
* tipc_ref_lock - lock referenced object and return pointer to it
*/
-
void *tipc_ref_lock(u32 ref)
{
if (likely(tipc_ref_table.entries)) {
@@ -283,7 +272,6 @@ void *tipc_ref_lock(u32 ref)
/**
* tipc_ref_deref - return pointer referenced object (without locking it)
*/
-
void *tipc_ref_deref(u32 ref)
{
if (likely(tipc_ref_table.entries)) {
@@ -296,4 +284,3 @@ void *tipc_ref_deref(u32 ref)
}
return NULL;
}
-
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index e2f7c5d370b..5577a447f53 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -123,10 +123,9 @@ static atomic_t tipc_queue_size = ATOMIC_INIT(0);
*
* Caller must hold socket lock
*/
-
static void advance_rx_queue(struct sock *sk)
{
- buf_discard(__skb_dequeue(&sk->sk_receive_queue));
+ kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
atomic_dec(&tipc_queue_size);
}
@@ -135,14 +134,13 @@ static void advance_rx_queue(struct sock *sk)
*
* Caller must hold socket lock
*/
-
static void discard_rx_queue(struct sock *sk)
{
struct sk_buff *buf;
while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
atomic_dec(&tipc_queue_size);
- buf_discard(buf);
+ kfree_skb(buf);
}
}
@@ -151,7 +149,6 @@ static void discard_rx_queue(struct sock *sk)
*
* Caller must hold socket lock
*/
-
static void reject_rx_queue(struct sock *sk)
{
struct sk_buff *buf;
@@ -174,7 +171,6 @@ static void reject_rx_queue(struct sock *sk)
*
* Returns 0 on success, errno otherwise
*/
-
static int tipc_create(struct net *net, struct socket *sock, int protocol,
int kern)
{
@@ -184,7 +180,6 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
struct tipc_port *tp_ptr;
/* Validate arguments */
-
if (unlikely(protocol != 0))
return -EPROTONOSUPPORT;
@@ -207,13 +202,11 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
}
/* Allocate socket's protocol area */
-
sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
if (sk == NULL)
return -ENOMEM;
/* Allocate TIPC port for socket to use */
-
tp_ptr = tipc_createport_raw(sk, &dispatch, &wakeupdispatch,
TIPC_LOW_IMPORTANCE);
if (unlikely(!tp_ptr)) {
@@ -222,7 +215,6 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
}
/* Finish initializing socket data structures */
-
sock->ops = ops;
sock->state = state;
@@ -258,7 +250,6 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
*
* Returns 0 on success, errno otherwise
*/
-
static int release(struct socket *sock)
{
struct sock *sk = sock->sk;
@@ -270,7 +261,6 @@ static int release(struct socket *sock)
* Exit if socket isn't fully initialized (occurs when a failed accept()
* releases a pre-allocated child socket that was never used)
*/
-
if (sk == NULL)
return 0;
@@ -281,14 +271,13 @@ static int release(struct socket *sock)
* Reject all unreceived messages, except on an active connection
* (which disconnects locally & sends a 'FIN+' to peer)
*/
-
while (sock->state != SS_DISCONNECTING) {
buf = __skb_dequeue(&sk->sk_receive_queue);
if (buf == NULL)
break;
atomic_dec(&tipc_queue_size);
if (TIPC_SKB_CB(buf)->handle != 0)
- buf_discard(buf);
+ kfree_skb(buf);
else {
if ((sock->state == SS_CONNECTING) ||
(sock->state == SS_CONNECTED)) {
@@ -303,15 +292,12 @@ static int release(struct socket *sock)
* Delete TIPC port; this ensures no more messages are queued
* (also disconnects an active connection & sends a 'FIN-' to peer)
*/
-
res = tipc_deleteport(tport->ref);
/* Discard any remaining (connection-based) messages in receive queue */
-
discard_rx_queue(sk);
/* Reject any messages that accumulated in backlog queue */
-
sock->state = SS_DISCONNECTING;
release_sock(sk);
@@ -336,7 +322,6 @@ static int release(struct socket *sock)
* NOTE: This routine doesn't need to take the socket lock since it doesn't
* access any non-constant socket information.
*/
-
static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
{
struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
@@ -355,6 +340,9 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
return -EAFNOSUPPORT;
+ if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES)
+ return -EACCES;
+
return (addr->scope > 0) ?
tipc_publish(portref, addr->scope, &addr->addr.nameseq) :
tipc_withdraw(portref, -addr->scope, &addr->addr.nameseq);
@@ -373,7 +361,6 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
* accesses socket information that is unchanging (or which changes in
* a completely predictable manner).
*/
-
static int get_name(struct socket *sock, struct sockaddr *uaddr,
int *uaddr_len, int peer)
{
@@ -441,7 +428,6 @@ static int get_name(struct socket *sock, struct sockaddr *uaddr,
* imply that the operation will succeed, merely that it should be performed
* and will not block.
*/
-
static unsigned int poll(struct file *file, struct socket *sock,
poll_table *wait)
{
@@ -479,7 +465,6 @@ static unsigned int poll(struct file *file, struct socket *sock,
*
* Returns 0 if permission is granted, otherwise errno
*/
-
static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
{
struct tipc_cfg_msg_hdr hdr;
@@ -515,7 +500,6 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
*
* Returns the number of bytes sent on success, or errno otherwise
*/
-
static int send_msg(struct kiocb *iocb, struct socket *sock,
struct msghdr *m, size_t total_len)
{
@@ -532,7 +516,7 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
(dest->family != AF_TIPC)))
return -EINVAL;
if ((total_len > TIPC_MAX_USER_MSG_SIZE) ||
- (m->msg_iovlen > (unsigned)INT_MAX))
+ (m->msg_iovlen > (unsigned int)INT_MAX))
return -EMSGSIZE;
if (iocb)
@@ -559,7 +543,6 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
}
/* Abort any pending connection attempts (very unlikely) */
-
reject_rx_queue(sk);
}
@@ -628,7 +611,6 @@ exit:
*
* Returns the number of bytes sent on success, or errno otherwise
*/
-
static int send_packet(struct kiocb *iocb, struct socket *sock,
struct msghdr *m, size_t total_len)
{
@@ -639,12 +621,11 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,
int res;
/* Handle implied connection establishment */
-
if (unlikely(dest))
return send_msg(iocb, sock, m, total_len);
if ((total_len > TIPC_MAX_USER_MSG_SIZE) ||
- (m->msg_iovlen > (unsigned)INT_MAX))
+ (m->msg_iovlen > (unsigned int)INT_MAX))
return -EMSGSIZE;
if (iocb)
@@ -692,7 +673,6 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,
* Returns the number of bytes sent on success (or partial success),
* or errno if no data sent
*/
-
static int send_stream(struct kiocb *iocb, struct socket *sock,
struct msghdr *m, size_t total_len)
{
@@ -712,7 +692,6 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,
lock_sock(sk);
/* Handle special cases where there is no connection */
-
if (unlikely(sock->state != SS_CONNECTED)) {
if (sock->state == SS_UNCONNECTED) {
res = send_packet(NULL, sock, m, total_len);
@@ -731,8 +710,8 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,
goto exit;
}
- if ((total_len > (unsigned)INT_MAX) ||
- (m->msg_iovlen > (unsigned)INT_MAX)) {
+ if ((total_len > (unsigned int)INT_MAX) ||
+ (m->msg_iovlen > (unsigned int)INT_MAX)) {
res = -EMSGSIZE;
goto exit;
}
@@ -744,7 +723,6 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,
* (i.e. one large iovec entry), but could be improved to pass sets
* of small iovec entries into send_packet().
*/
-
curr_iov = m->msg_iov;
curr_iovlen = m->msg_iovlen;
my_msg.msg_iov = &my_iov;
@@ -793,7 +771,6 @@ exit:
*
* Returns 0 on success, errno otherwise
*/
-
static int auto_connect(struct socket *sock, struct tipc_msg *msg)
{
struct tipc_sock *tsock = tipc_sk(sock->sk);
@@ -818,7 +795,6 @@ static int auto_connect(struct socket *sock, struct tipc_msg *msg)
*
* Note: Address is not captured if not requested by receiver.
*/
-
static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
{
struct sockaddr_tipc *addr = (struct sockaddr_tipc *)m->msg_name;
@@ -844,7 +820,6 @@ static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
*
* Returns 0 if successful, otherwise errno
*/
-
static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
struct tipc_port *tport)
{
@@ -858,7 +833,6 @@ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
return 0;
/* Optionally capture errored message object(s) */
-
err = msg ? msg_errcode(msg) : 0;
if (unlikely(err)) {
anc_data[0] = err;
@@ -875,7 +849,6 @@ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
}
/* Optionally capture message destination object */
-
dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
switch (dest_type) {
case TIPC_NAMED_MSG:
@@ -920,7 +893,6 @@ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
*
* Returns size of returned message data, errno otherwise
*/
-
static int recv_msg(struct kiocb *iocb, struct socket *sock,
struct msghdr *m, size_t buf_len, int flags)
{
@@ -934,7 +906,6 @@ static int recv_msg(struct kiocb *iocb, struct socket *sock,
int res;
/* Catch invalid receive requests */
-
if (unlikely(!buf_len))
return -EINVAL;
@@ -949,7 +920,6 @@ static int recv_msg(struct kiocb *iocb, struct socket *sock,
restart:
/* Look for a message in receive queue; wait if necessary */
-
while (skb_queue_empty(&sk->sk_receive_queue)) {
if (sock->state == SS_DISCONNECTING) {
res = -ENOTCONN;
@@ -967,14 +937,12 @@ restart:
}
/* Look at first message in receive queue */
-
buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf);
sz = msg_data_sz(msg);
err = msg_errcode(msg);
/* Complete connection setup for an implied connect */
-
if (unlikely(sock->state == SS_CONNECTING)) {
res = auto_connect(sock, msg);
if (res)
@@ -982,24 +950,20 @@ restart:
}
/* Discard an empty non-errored message & try again */
-
if ((!sz) && (!err)) {
advance_rx_queue(sk);
goto restart;
}
/* Capture sender's address (optional) */
-
set_orig_addr(m, msg);
/* Capture ancillary data (optional) */
-
res = anc_data_recv(m, msg, tport);
if (res)
goto exit;
/* Capture message data (if valid) & compute return value (always) */
-
if (!err) {
if (unlikely(buf_len < sz)) {
sz = buf_len;
@@ -1019,7 +983,6 @@ restart:
}
/* Consume received message (optional) */
-
if (likely(!(flags & MSG_PEEK))) {
if ((sock->state != SS_READY) &&
(++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
@@ -1043,7 +1006,6 @@ exit:
*
* Returns size of returned message data, errno otherwise
*/
-
static int recv_stream(struct kiocb *iocb, struct socket *sock,
struct msghdr *m, size_t buf_len, int flags)
{
@@ -1059,7 +1021,6 @@ static int recv_stream(struct kiocb *iocb, struct socket *sock,
int res = 0;
/* Catch invalid receive attempts */
-
if (unlikely(!buf_len))
return -EINVAL;
@@ -1073,10 +1034,9 @@ static int recv_stream(struct kiocb *iocb, struct socket *sock,
target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
-restart:
+restart:
/* Look for a message in receive queue; wait if necessary */
-
while (skb_queue_empty(&sk->sk_receive_queue)) {
if (sock->state == SS_DISCONNECTING) {
res = -ENOTCONN;
@@ -1094,21 +1054,18 @@ restart:
}
/* Look at first message in receive queue */
-
buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf);
sz = msg_data_sz(msg);
err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */
-
if ((!sz) && (!err)) {
advance_rx_queue(sk);
goto restart;
}
/* Optionally capture sender's address & ancillary data of first msg */
-
if (sz_copied == 0) {
set_orig_addr(m, msg);
res = anc_data_recv(m, msg, tport);
@@ -1117,7 +1074,6 @@ restart:
}
/* Capture message data (if valid) & compute return value (always) */
-
if (!err) {
u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
@@ -1149,7 +1105,6 @@ restart:
}
/* Consume received message (optional) */
-
if (likely(!(flags & MSG_PEEK))) {
if (unlikely(++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
tipc_acknowledge(tport->ref, tport->conn_unacked);
@@ -1157,7 +1112,6 @@ restart:
}
/* Loop around if more data is required */
-
if ((sz_copied < buf_len) && /* didn't get all requested data */
(!skb_queue_empty(&sk->sk_receive_queue) ||
(sz_copied < target)) && /* and more is ready or required */
@@ -1178,7 +1132,6 @@ exit:
*
* Returns 1 if queue is unable to accept message, 0 otherwise
*/
-
static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)
{
u32 threshold;
@@ -1211,7 +1164,6 @@ static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)
*
* Returns TIPC error status code (TIPC_OK if message is not to be rejected)
*/
-
static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
{
struct socket *sock = sk->sk_socket;
@@ -1219,12 +1171,8 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
u32 recv_q_len;
/* Reject message if it is wrong sort of message for socket */
-
- /*
- * WOULD IT BE BETTER TO JUST DISCARD THESE MESSAGES INSTEAD?
- * "NO PORT" ISN'T REALLY THE RIGHT ERROR CODE, AND THERE MAY
- * BE SECURITY IMPLICATIONS INHERENT IN REJECTING INVALID TRAFFIC
- */
+ if (msg_type(msg) > TIPC_DIRECT_MSG)
+ return TIPC_ERR_NO_PORT;
if (sock->state == SS_READY) {
if (msg_connected(msg))
@@ -1233,7 +1181,8 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
if (msg_mcast(msg))
return TIPC_ERR_NO_PORT;
if (sock->state == SS_CONNECTED) {
- if (!msg_connected(msg))
+ if (!msg_connected(msg) ||
+ !tipc_port_peer_msg(tipc_sk_port(sk), msg))
return TIPC_ERR_NO_PORT;
} else if (sock->state == SS_CONNECTING) {
if (!msg_connected(msg) && (msg_errcode(msg) == 0))
@@ -1250,7 +1199,6 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
}
/* Reject message if there isn't room to queue it */
-
recv_q_len = (u32)atomic_read(&tipc_queue_size);
if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) {
if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE))
@@ -1263,13 +1211,11 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
}
/* Enqueue message (finally!) */
-
TIPC_SKB_CB(buf)->handle = 0;
atomic_inc(&tipc_queue_size);
__skb_queue_tail(&sk->sk_receive_queue, buf);
/* Initiate connection termination for an incoming 'FIN' */
-
if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) {
sock->state = SS_DISCONNECTING;
tipc_disconnect_port(tipc_sk_port(sk));
@@ -1289,7 +1235,6 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
*
* Returns 0
*/
-
static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
{
u32 res;
@@ -1309,7 +1254,6 @@ static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
*
* Returns TIPC error status code (TIPC_OK if message is not to be rejected)
*/
-
static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
{
struct sock *sk = (struct sock *)tport->usr_handle;
@@ -1321,12 +1265,11 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
* This code is based on sk_receive_skb(), but must be distinct from it
* since a TIPC-specific filter/reject mechanism is utilized
*/
-
bh_lock_sock(sk);
if (!sock_owned_by_user(sk)) {
res = filter_rcv(sk, buf);
} else {
- if (sk_add_backlog(sk, buf))
+ if (sk_add_backlog(sk, buf, sk->sk_rcvbuf))
res = TIPC_ERR_OVERLOAD;
else
res = TIPC_OK;
@@ -1342,7 +1285,6 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
*
* Called with port lock already taken.
*/
-
static void wakeupdispatch(struct tipc_port *tport)
{
struct sock *sk = (struct sock *)tport->usr_handle;
@@ -1360,7 +1302,6 @@ static void wakeupdispatch(struct tipc_port *tport)
*
* Returns 0 on success, errno otherwise
*/
-
static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
int flags)
{
@@ -1375,21 +1316,18 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
lock_sock(sk);
/* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
-
if (sock->state == SS_READY) {
res = -EOPNOTSUPP;
goto exit;
}
/* For now, TIPC does not support the non-blocking form of connect() */
-
if (flags & O_NONBLOCK) {
res = -EOPNOTSUPP;
goto exit;
}
/* Issue Posix-compliant error code if socket is in the wrong state */
-
if (sock->state == SS_LISTENING) {
res = -EOPNOTSUPP;
goto exit;
@@ -1409,18 +1347,15 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
* Note: send_msg() validates the rest of the address fields,
* so there's no need to do it here
*/
-
if (dst->addrtype == TIPC_ADDR_MCAST) {
res = -EINVAL;
goto exit;
}
/* Reject any messages already in receive queue (very unlikely) */
-
reject_rx_queue(sk);
/* Send a 'SYN-' to destination */
-
m.msg_name = dest;
m.msg_namelen = destlen;
res = send_msg(NULL, sock, &m, 0);
@@ -1428,7 +1363,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
goto exit;
/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
-
timeout = tipc_sk(sk)->conn_timeout;
release_sock(sk);
res = wait_event_interruptible_timeout(*sk_sleep(sk),
@@ -1473,7 +1407,6 @@ exit:
*
* Returns 0 on success, errno otherwise
*/
-
static int listen(struct socket *sock, int len)
{
struct sock *sk = sock->sk;
@@ -1500,7 +1433,6 @@ static int listen(struct socket *sock, int len)
*
* Returns 0 on success, errno otherwise
*/
-
static int accept(struct socket *sock, struct socket *new_sock, int flags)
{
struct sock *sk = sock->sk;
@@ -1543,11 +1475,9 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
* Reject any stray messages received by new socket
* before the socket lock was taken (very, very unlikely)
*/
-
reject_rx_queue(new_sk);
/* Connect new socket to it's peer */
-
new_tsock->peer_name.ref = msg_origport(msg);
new_tsock->peer_name.node = msg_orignode(msg);
tipc_connect2port(new_ref, &new_tsock->peer_name);
@@ -1563,7 +1493,6 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
* Respond to 'SYN-' by discarding it & returning 'ACK'-.
* Respond to 'SYN+' by queuing it on new socket.
*/
-
if (!msg_data_sz(msg)) {
struct msghdr m = {NULL,};
@@ -1589,7 +1518,6 @@ exit:
*
* Returns 0 on success, errno otherwise
*/
-
static int shutdown(struct socket *sock, int how)
{
struct sock *sk = sock->sk;
@@ -1606,13 +1534,13 @@ static int shutdown(struct socket *sock, int how)
case SS_CONNECTING:
case SS_CONNECTED:
- /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
restart:
+ /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
buf = __skb_dequeue(&sk->sk_receive_queue);
if (buf) {
atomic_dec(&tipc_queue_size);
if (TIPC_SKB_CB(buf)->handle != 0) {
- buf_discard(buf);
+ kfree_skb(buf);
goto restart;
}
tipc_disconnect(tport->ref);
@@ -1628,7 +1556,6 @@ restart:
case SS_DISCONNECTING:
/* Discard any unreceived messages; wake up sleeping tasks */
-
discard_rx_queue(sk);
if (waitqueue_active(sk_sleep(sk)))
wake_up_interruptible(sk_sleep(sk));
@@ -1656,7 +1583,6 @@ restart:
*
* Returns 0 on success, errno otherwise
*/
-
static int setsockopt(struct socket *sock,
int lvl, int opt, char __user *ov, unsigned int ol)
{
@@ -1716,7 +1642,6 @@ static int setsockopt(struct socket *sock,
*
* Returns 0 on success, errno otherwise
*/
-
static int getsockopt(struct socket *sock,
int lvl, int opt, char __user *ov, int __user *ol)
{
@@ -1777,7 +1702,6 @@ static int getsockopt(struct socket *sock,
/**
* Protocol switches for the various types of TIPC sockets
*/
-
static const struct proto_ops msg_ops = {
.owner = THIS_MODULE,
.family = AF_TIPC,
@@ -1883,7 +1807,6 @@ int tipc_socket_init(void)
/**
* tipc_socket_stop - stop TIPC socket interface
*/
-
void tipc_socket_stop(void)
{
if (!sockets_enabled)
@@ -1893,4 +1816,3 @@ void tipc_socket_stop(void)
sock_unregister(tipc_family_ops.family);
proto_unregister(&tipc_proto);
}
-
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 8c49566da8f..f976e9cd6a7 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -46,7 +46,6 @@
* @subscriber_list: adjacent subscribers in top. server's list of subscribers
* @subscription_list: list of subscription objects for this subscriber
*/
-
struct tipc_subscriber {
u32 port_ref;
spinlock_t *lock;
@@ -56,13 +55,11 @@ struct tipc_subscriber {
/**
* struct top_srv - TIPC network topology subscription service
- * @user_ref: TIPC userid of subscription service
* @setup_port: reference to TIPC port that handles subscription requests
* @subscription_count: number of active subscriptions (not subscribers!)
* @subscriber_list: list of ports subscribing to service
* @lock: spinlock govering access to subscriber list
*/
-
struct top_srv {
u32 setup_port;
atomic_t subscription_count;
@@ -79,7 +76,6 @@ static struct top_srv topsrv;
*
* Returns converted value
*/
-
static u32 htohl(u32 in, int swap)
{
return swap ? swab32(in) : in;
@@ -91,7 +87,6 @@ static u32 htohl(u32 in, int swap)
* Note: Must not hold subscriber's server port lock, since tipc_send() will
* try to take the lock if the message is rejected and returned!
*/
-
static void subscr_send_event(struct tipc_subscription *sub,
u32 found_lower,
u32 found_upper,
@@ -117,7 +112,6 @@ static void subscr_send_event(struct tipc_subscription *sub,
*
* Returns 1 if there is overlap, otherwise 0.
*/
-
int tipc_subscr_overlap(struct tipc_subscription *sub,
u32 found_lower,
u32 found_upper)
@@ -137,7 +131,6 @@ int tipc_subscr_overlap(struct tipc_subscription *sub,
*
* Protected by nameseq.lock in name_table.c
*/
-
void tipc_subscr_report_overlap(struct tipc_subscription *sub,
u32 found_lower,
u32 found_upper,
@@ -157,43 +150,35 @@ void tipc_subscr_report_overlap(struct tipc_subscription *sub,
/**
* subscr_timeout - subscription timeout has occurred
*/
-
static void subscr_timeout(struct tipc_subscription *sub)
{
struct tipc_port *server_port;
/* Validate server port reference (in case subscriber is terminating) */
-
server_port = tipc_port_lock(sub->server_ref);
if (server_port == NULL)
return;
/* Validate timeout (in case subscription is being cancelled) */
-
if (sub->timeout == TIPC_WAIT_FOREVER) {
tipc_port_unlock(server_port);
return;
}
/* Unlink subscription from name table */
-
tipc_nametbl_unsubscribe(sub);
/* Unlink subscription from subscriber */
-
list_del(&sub->subscription_list);
/* Release subscriber's server port */
-
tipc_port_unlock(server_port);
/* Notify subscriber of timeout */
-
subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
TIPC_SUBSCR_TIMEOUT, 0, 0);
/* Now destroy subscription */
-
k_term_timer(&sub->timer);
kfree(sub);
atomic_dec(&topsrv.subscription_count);
@@ -204,7 +189,6 @@ static void subscr_timeout(struct tipc_subscription *sub)
*
* Called with subscriber port locked.
*/
-
static void subscr_del(struct tipc_subscription *sub)
{
tipc_nametbl_unsubscribe(sub);
@@ -223,7 +207,6 @@ static void subscr_del(struct tipc_subscription *sub)
* a new object reference in the interim that uses this lock; this routine will
* simply wait for it to be released, then claim it.)
*/
-
static void subscr_terminate(struct tipc_subscriber *subscriber)
{
u32 port_ref;
@@ -231,18 +214,15 @@ static void subscr_terminate(struct tipc_subscriber *subscriber)
struct tipc_subscription *sub_temp;
/* Invalidate subscriber reference */
-
port_ref = subscriber->port_ref;
subscriber->port_ref = 0;
spin_unlock_bh(subscriber->lock);
/* Sever connection to subscriber */
-
tipc_shutdown(port_ref);
tipc_deleteport(port_ref);
/* Destroy any existing subscriptions for subscriber */
-
list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
subscription_list) {
if (sub->timeout != TIPC_WAIT_FOREVER) {
@@ -253,17 +233,14 @@ static void subscr_terminate(struct tipc_subscriber *subscriber)
}
/* Remove subscriber from topology server's subscriber list */
-
spin_lock_bh(&topsrv.lock);
list_del(&subscriber->subscriber_list);
spin_unlock_bh(&topsrv.lock);
/* Reclaim subscriber lock */
-
spin_lock_bh(subscriber->lock);
/* Now destroy subscriber */
-
kfree(subscriber);
}
@@ -276,7 +253,6 @@ static void subscr_terminate(struct tipc_subscriber *subscriber)
*
* Note that fields of 's' use subscriber's endianness!
*/
-
static void subscr_cancel(struct tipc_subscr *s,
struct tipc_subscriber *subscriber)
{
@@ -285,7 +261,6 @@ static void subscr_cancel(struct tipc_subscr *s,
int found = 0;
/* Find first matching subscription, exit if not found */
-
list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
subscription_list) {
if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
@@ -297,7 +272,6 @@ static void subscr_cancel(struct tipc_subscr *s,
return;
/* Cancel subscription timer (if used), then delete subscription */
-
if (sub->timeout != TIPC_WAIT_FOREVER) {
sub->timeout = TIPC_WAIT_FOREVER;
spin_unlock_bh(subscriber->lock);
@@ -313,7 +287,6 @@ static void subscr_cancel(struct tipc_subscr *s,
*
* Called with subscriber port locked.
*/
-
static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
struct tipc_subscriber *subscriber)
{
@@ -321,11 +294,9 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
int swap;
/* Determine subscriber's endianness */
-
swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE));
/* Detect & process a subscription cancellation request */
-
if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
subscr_cancel(s, subscriber);
@@ -333,7 +304,6 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
}
/* Refuse subscription if global limit exceeded */
-
if (atomic_read(&topsrv.subscription_count) >= tipc_max_subscriptions) {
warn("Subscription rejected, subscription limit reached (%u)\n",
tipc_max_subscriptions);
@@ -342,7 +312,6 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
}
/* Allocate subscription object */
-
sub = kmalloc(sizeof(*sub), GFP_ATOMIC);
if (!sub) {
warn("Subscription rejected, no memory\n");
@@ -351,7 +320,6 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
}
/* Initialize subscription object */
-
sub->seq.type = htohl(s->seq.type, swap);
sub->seq.lower = htohl(s->seq.lower, swap);
sub->seq.upper = htohl(s->seq.upper, swap);
@@ -385,7 +353,6 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
*
* Called with subscriber's server port unlocked.
*/
-
static void subscr_conn_shutdown_event(void *usr_handle,
u32 port_ref,
struct sk_buff **buf,
@@ -409,7 +376,6 @@ static void subscr_conn_shutdown_event(void *usr_handle,
*
* Called with subscriber's server port unlocked.
*/
-
static void subscr_conn_msg_event(void *usr_handle,
u32 port_ref,
struct sk_buff **buf,
@@ -424,7 +390,6 @@ static void subscr_conn_msg_event(void *usr_handle,
* Lock subscriber's server port (& make a local copy of lock pointer,
* in case subscriber is deleted while processing subscription request)
*/
-
if (tipc_port_lock(port_ref) == NULL)
return;
@@ -452,7 +417,6 @@ static void subscr_conn_msg_event(void *usr_handle,
* timeout code cannot delete the subscription,
* so the subscription object is still protected.
*/
-
tipc_nametbl_subscribe(sub);
}
}
@@ -461,7 +425,6 @@ static void subscr_conn_msg_event(void *usr_handle,
/**
* subscr_named_msg_event - handle request to establish a new subscriber
*/
-
static void subscr_named_msg_event(void *usr_handle,
u32 port_ref,
struct sk_buff **buf,
@@ -475,7 +438,6 @@ static void subscr_named_msg_event(void *usr_handle,
u32 server_port_ref;
/* Create subscriber object */
-
subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC);
if (subscriber == NULL) {
warn("Subscriber rejected, no memory\n");
@@ -485,7 +447,6 @@ static void subscr_named_msg_event(void *usr_handle,
INIT_LIST_HEAD(&subscriber->subscriber_list);
/* Create server port & establish connection to subscriber */
-
tipc_createport(subscriber,
importance,
NULL,
@@ -504,26 +465,21 @@ static void subscr_named_msg_event(void *usr_handle,
tipc_connect2port(subscriber->port_ref, orig);
/* Lock server port (& save lock address for future use) */
-
subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock;
/* Add subscriber to topology server's subscriber list */
-
spin_lock_bh(&topsrv.lock);
list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);
spin_unlock_bh(&topsrv.lock);
/* Unlock server port */
-
server_port_ref = subscriber->port_ref;
spin_unlock_bh(subscriber->lock);
/* Send an ACK- to complete connection handshaking */
-
tipc_send(server_port_ref, 0, NULL, 0);
/* Handle optional subscription request */
-
if (size != 0) {
subscr_conn_msg_event(subscriber, server_port_ref,
buf, data, size);
@@ -535,7 +491,6 @@ int tipc_subscr_start(void)
struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV};
int res;
- memset(&topsrv, 0, sizeof(topsrv));
spin_lock_init(&topsrv.lock);
INIT_LIST_HEAD(&topsrv.subscriber_list);
@@ -552,7 +507,7 @@ int tipc_subscr_start(void)
if (res)
goto failed;
- res = tipc_nametbl_publish_rsv(topsrv.setup_port, TIPC_NODE_SCOPE, &seq);
+ res = tipc_publish(topsrv.setup_port, TIPC_NODE_SCOPE, &seq);
if (res) {
tipc_deleteport(topsrv.setup_port);
topsrv.setup_port = 0;
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index ef6529c8456..218d2e07f0c 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -51,7 +51,6 @@ struct tipc_subscription;
* @swap: indicates if subscriber uses opposite endianness in its messages
* @evt: template for events generated by subscription
*/
-
struct tipc_subscription {
struct tipc_name_seq seq;
u32 timeout;
@@ -80,5 +79,4 @@ int tipc_subscr_start(void);
void tipc_subscr_stop(void);
-
#endif