diff options
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r-- | net/tipc/socket.c | 1383 |
1 files changed, 1055 insertions, 328 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c index ef0475568f9..75275c5cf92 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -35,21 +35,84 @@ */ #include "core.h" -#include "port.h" +#include "name_table.h" #include "node.h" - +#include "link.h" #include <linux/export.h> +#include "config.h" +#include "socket.h" #define SS_LISTENING -1 /* socket is listening */ #define SS_READY -2 /* socket is connectionless */ -#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ +#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ +#define CONN_PROBING_INTERVAL 3600000 /* [ms] => 1 h */ +#define TIPC_FWD_MSG 1 +#define TIPC_CONN_OK 0 +#define TIPC_CONN_PROBING 1 + +/** + * struct tipc_sock - TIPC socket structure + * @sk: socket - interacts with 'port' and with user via the socket API + * @connected: non-zero if port is currently connected to a peer port + * @conn_type: TIPC type used when connection was established + * @conn_instance: TIPC instance used when connection was established + * @published: non-zero if port has one or more associated names + * @max_pkt: maximum packet size "hint" used when building messages sent by port + * @ref: unique reference to port in TIPC object registry + * @phdr: preformatted message header used when sending messages + * @port_list: adjacent ports in TIPC's global list of ports + * @publications: list of publications for port + * @pub_count: total # of publications port has made during its lifetime + * @probing_state: + * @probing_interval: + * @timer: + * @port: port - interacts with 'sk' and with the rest of the TIPC stack + * @peer_name: the peer of the connection, if any + * @conn_timeout: the time we can wait for an unresponded setup request + * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue + * @link_cong: non-zero if owner must sleep because of link congestion + * @sent_unacked: # messages sent by socket, and not yet acked by peer + * @rcv_unacked: # messages read by user, but not yet acked back to peer + */ +struct tipc_sock { + struct sock sk; + int connected; + u32 conn_type; + u32 conn_instance; + int published; + u32 max_pkt; + u32 ref; + struct tipc_msg phdr; + struct list_head sock_list; + struct list_head publications; + u32 pub_count; + u32 probing_state; + u32 probing_interval; + struct timer_list timer; + uint conn_timeout; + atomic_t dupl_rcvcnt; + bool link_cong; + uint sent_unacked; + uint rcv_unacked; +}; static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); static void tipc_data_ready(struct sock *sk); static void tipc_write_space(struct sock *sk); static int tipc_release(struct socket *sock); static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); +static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p); +static void tipc_sk_timeout(unsigned long ref); +static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, + struct tipc_name_seq const *seq); +static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, + struct tipc_name_seq const *seq); +static u32 tipc_sk_ref_acquire(struct tipc_sock *tsk); +static void tipc_sk_ref_discard(u32 ref); +static struct tipc_sock *tipc_sk_get(u32 ref); +static struct tipc_sock *tipc_sk_get_next(u32 *ref); +static void tipc_sk_put(struct tipc_sock *tsk); static const struct proto_ops packet_ops; static const struct proto_ops stream_ops; @@ -103,29 +166,115 @@ static struct proto tipc_proto_kern; * - port reference */ -#include "socket.h" +static u32 tsk_peer_node(struct tipc_sock *tsk) +{ + return msg_destnode(&tsk->phdr); +} + +static u32 tsk_peer_port(struct tipc_sock *tsk) +{ + return msg_destport(&tsk->phdr); +} + +static bool tsk_unreliable(struct tipc_sock *tsk) +{ + return msg_src_droppable(&tsk->phdr) != 0; +} + +static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable) +{ + msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0); +} + +static bool tsk_unreturnable(struct tipc_sock *tsk) +{ + return msg_dest_droppable(&tsk->phdr) != 0; +} + +static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable) +{ + msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0); +} + +static int tsk_importance(struct tipc_sock *tsk) +{ + return msg_importance(&tsk->phdr); +} + +static int tsk_set_importance(struct tipc_sock *tsk, int imp) +{ + if (imp > TIPC_CRITICAL_IMPORTANCE) + return -EINVAL; + msg_set_importance(&tsk->phdr, (u32)imp); + return 0; +} + +static struct tipc_sock *tipc_sk(const struct sock *sk) +{ + return container_of(sk, struct tipc_sock, sk); +} + +static int tsk_conn_cong(struct tipc_sock *tsk) +{ + return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN; +} /** - * advance_rx_queue - discard first buffer in socket receive queue + * tsk_advance_rx_queue - discard first buffer in socket receive queue * * Caller must hold socket lock */ -static void advance_rx_queue(struct sock *sk) +static void tsk_advance_rx_queue(struct sock *sk) { kfree_skb(__skb_dequeue(&sk->sk_receive_queue)); } /** - * reject_rx_queue - reject all buffers in socket receive queue + * tsk_rej_rx_queue - reject all buffers in socket receive queue * * Caller must hold socket lock */ -static void reject_rx_queue(struct sock *sk) +static void tsk_rej_rx_queue(struct sock *sk) { struct sk_buff *buf; + u32 dnode; + + while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { + if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) + tipc_link_xmit(buf, dnode, 0); + } +} + +/* tsk_peer_msg - verify if message was sent by connected port's peer + * + * Handles cases where the node's network address has changed from + * the default of <0.0.0> to its configured setting. + */ +static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg) +{ + u32 peer_port = tsk_peer_port(tsk); + u32 orig_node; + u32 peer_node; + + if (unlikely(!tsk->connected)) + return false; - while ((buf = __skb_dequeue(&sk->sk_receive_queue))) - tipc_reject_msg(buf, TIPC_ERR_NO_PORT); + if (unlikely(msg_origport(msg) != peer_port)) + return false; + + orig_node = msg_orignode(msg); + peer_node = tsk_peer_node(tsk); + + if (likely(orig_node == peer_node)) + return true; + + if (!orig_node && (peer_node == tipc_own_addr)) + return true; + + if (!peer_node && (orig_node == tipc_own_addr)) + return true; + + return false; } /** @@ -147,7 +296,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, socket_state state; struct sock *sk; struct tipc_sock *tsk; - struct tipc_port *port; + struct tipc_msg *msg; u32 ref; /* Validate arguments */ @@ -182,32 +331,36 @@ static int tipc_sk_create(struct net *net, struct socket *sock, return -ENOMEM; tsk = tipc_sk(sk); - port = &tsk->port; - - ref = tipc_port_init(port, TIPC_LOW_IMPORTANCE); + ref = tipc_sk_ref_acquire(tsk); if (!ref) { - pr_warn("Socket registration failed, ref. table exhausted\n"); - sk_free(sk); + pr_warn("Socket create failed; reference table exhausted\n"); return -ENOMEM; } + tsk->max_pkt = MAX_PKT_DEFAULT; + tsk->ref = ref; + INIT_LIST_HEAD(&tsk->publications); + msg = &tsk->phdr; + tipc_msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, + NAMED_H_SIZE, 0); + msg_set_origport(msg, ref); /* Finish initializing socket data structures */ sock->ops = ops; sock->state = state; - sock_init_data(sock, sk); + k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, ref); sk->sk_backlog_rcv = tipc_backlog_rcv; sk->sk_rcvbuf = sysctl_tipc_rmem[1]; sk->sk_data_ready = tipc_data_ready; sk->sk_write_space = tipc_write_space; tsk->conn_timeout = CONN_TIMEOUT_DEFAULT; + tsk->sent_unacked = 0; atomic_set(&tsk->dupl_rcvcnt, 0); - tipc_port_unlock(port); if (sock->state == SS_READY) { - tipc_port_set_unreturnable(port, true); + tsk_set_unreturnable(tsk, true); if (sock->type == SOCK_DGRAM) - tipc_port_set_unreliable(port, true); + tsk_set_unreliable(tsk, true); } return 0; } @@ -301,8 +454,8 @@ static int tipc_release(struct socket *sock) { struct sock *sk = sock->sk; struct tipc_sock *tsk; - struct tipc_port *port; struct sk_buff *buf; + u32 dnode; /* * Exit if socket isn't fully initialized (occurs when a failed accept() @@ -312,13 +465,13 @@ static int tipc_release(struct socket *sock) return 0; tsk = tipc_sk(sk); - port = &tsk->port; lock_sock(sk); /* * Reject all unreceived messages, except on an active connection * (which disconnects locally & sends a 'FIN+' to peer) */ + dnode = tsk_peer_node(tsk); while (sock->state != SS_DISCONNECTING) { buf = __skb_dequeue(&sk->sk_receive_queue); if (buf == NULL) @@ -329,16 +482,27 @@ static int tipc_release(struct socket *sock) if ((sock->state == SS_CONNECTING) || (sock->state == SS_CONNECTED)) { sock->state = SS_DISCONNECTING; - tipc_port_disconnect(port->ref); + tsk->connected = 0; + tipc_node_remove_conn(dnode, tsk->ref); } - tipc_reject_msg(buf, TIPC_ERR_NO_PORT); + if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) + tipc_link_xmit(buf, dnode, 0); } } - /* Destroy TIPC port; also disconnects an active connection and - * sends a 'FIN-' to peer. - */ - tipc_port_destroy(port); + tipc_sk_withdraw(tsk, 0, NULL); + tipc_sk_ref_discard(tsk->ref); + k_cancel_timer(&tsk->timer); + if (tsk->connected) { + buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, + SHORT_H_SIZE, 0, dnode, tipc_own_addr, + tsk_peer_port(tsk), + tsk->ref, TIPC_ERR_NO_PORT); + if (buf) + tipc_link_xmit(buf, dnode, tsk->ref); + tipc_node_remove_conn(dnode, tsk->ref); + } + k_term_timer(&tsk->timer); /* Discard any remaining (connection-based) messages in receive queue */ __skb_queue_purge(&sk->sk_receive_queue); @@ -346,7 +510,6 @@ static int tipc_release(struct socket *sock) /* Reject any messages that accumulated in backlog queue */ sock->state = SS_DISCONNECTING; release_sock(sk); - sock_put(sk); sock->sk = NULL; @@ -378,7 +541,7 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr, lock_sock(sk); if (unlikely(!uaddr_len)) { - res = tipc_withdraw(&tsk->port, 0, NULL); + res = tipc_sk_withdraw(tsk, 0, NULL); goto exit; } @@ -406,8 +569,8 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr, } res = (addr->scope > 0) ? - tipc_publish(&tsk->port, addr->scope, &addr->addr.nameseq) : - tipc_withdraw(&tsk->port, -addr->scope, &addr->addr.nameseq); + tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) : + tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq); exit: release_sock(sk); return res; @@ -437,10 +600,10 @@ static int tipc_getname(struct socket *sock, struct sockaddr *uaddr, if ((sock->state != SS_CONNECTED) && ((peer != 2) || (sock->state != SS_DISCONNECTING))) return -ENOTCONN; - addr->addr.id.ref = tipc_port_peerport(&tsk->port); - addr->addr.id.node = tipc_port_peernode(&tsk->port); + addr->addr.id.ref = tsk_peer_port(tsk); + addr->addr.id.node = tsk_peer_node(tsk); } else { - addr->addr.id.ref = tsk->port.ref; + addr->addr.id.ref = tsk->ref; addr->addr.id.node = tipc_own_addr; } @@ -504,12 +667,12 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, switch ((int)sock->state) { case SS_UNCONNECTED: - if (!tsk->port.congested) + if (!tsk->link_cong) mask |= POLLOUT; break; case SS_READY: case SS_CONNECTED: - if (!tsk->port.congested) + if (!tsk->link_cong && !tsk_conn_cong(tsk)) mask |= POLLOUT; /* fall thru' */ case SS_CONNECTING: @@ -526,6 +689,136 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, } /** + * tipc_sendmcast - send multicast message + * @sock: socket structure + * @seq: destination address + * @iov: message data to send + * @dsz: total length of message data + * @timeo: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, + struct iovec *iov, size_t dsz, long timeo) +{ + struct sock *sk = sock->sk; + struct tipc_msg *mhdr = &tipc_sk(sk)->phdr; + struct sk_buff *buf; + uint mtu; + int rc; + + msg_set_type(mhdr, TIPC_MCAST_MSG); + msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE); + msg_set_destport(mhdr, 0); + msg_set_destnode(mhdr, 0); + msg_set_nametype(mhdr, seq->type); + msg_set_namelower(mhdr, seq->lower); + msg_set_nameupper(mhdr, seq->upper); + msg_set_hdr_sz(mhdr, MCAST_H_SIZE); + +new_mtu: + mtu = tipc_bclink_get_mtu(); + rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf); + if (unlikely(rc < 0)) + return rc; + + do { + rc = tipc_bclink_xmit(buf); + if (likely(rc >= 0)) { + rc = dsz; + break; + } + if (rc == -EMSGSIZE) + goto new_mtu; + if (rc != -ELINKCONG) + break; + tipc_sk(sk)->link_cong = 1; + rc = tipc_wait_for_sndmsg(sock, &timeo); + if (rc) + kfree_skb_list(buf); + } while (!rc); + return rc; +} + +/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets + */ +void tipc_sk_mcast_rcv(struct sk_buff *buf) +{ + struct tipc_msg *msg = buf_msg(buf); + struct tipc_port_list dports = {0, NULL, }; + struct tipc_port_list *item; + struct sk_buff *b; + uint i, last, dst = 0; + u32 scope = TIPC_CLUSTER_SCOPE; + + if (in_own_node(msg_orignode(msg))) + scope = TIPC_NODE_SCOPE; + + /* Create destination port list: */ + tipc_nametbl_mc_translate(msg_nametype(msg), + msg_namelower(msg), + msg_nameupper(msg), + scope, + &dports); + last = dports.count; + if (!last) { + kfree_skb(buf); + return; + } + + for (item = &dports; item; item = item->next) { + for (i = 0; i < PLSIZE && ++dst <= last; i++) { + b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf; + if (!b) { + pr_warn("Failed do clone mcast rcv buffer\n"); + continue; + } + msg_set_destport(msg, item->ports[i]); + tipc_sk_rcv(b); + } + } + tipc_port_list_free(&dports); +} + +/** + * tipc_sk_proto_rcv - receive a connection mng protocol message + * @tsk: receiving socket + * @dnode: node to send response message to, if any + * @buf: buffer containing protocol message + * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if + * (CONN_PROBE_REPLY) message should be forwarded. + */ +static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, + struct sk_buff *buf) +{ + struct tipc_msg *msg = buf_msg(buf); + int conn_cong; + + /* Ignore if connection cannot be validated: */ + if (!tsk_peer_msg(tsk, msg)) + goto exit; + + tsk->probing_state = TIPC_CONN_OK; + + if (msg_type(msg) == CONN_ACK) { + conn_cong = tsk_conn_cong(tsk); + tsk->sent_unacked -= msg_msgcnt(msg); + if (conn_cong) + tsk->sk.sk_write_space(&tsk->sk); + } else if (msg_type(msg) == CONN_PROBE) { + if (!tipc_msg_reverse(buf, dnode, TIPC_OK)) + return TIPC_OK; + msg_set_type(msg, CONN_PROBE_REPLY); + return TIPC_FWD_MSG; + } + /* Do nothing if msg_type() == CONN_PROBE_REPLY */ +exit: + kfree_skb(buf); + return TIPC_OK; +} + +/** * dest_name_check - verify user is permitted to send to specified port name * @dest: destination address * @m: descriptor for message to be sent @@ -539,6 +832,8 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m) { struct tipc_cfg_msg_hdr hdr; + if (unlikely(dest->addrtype == TIPC_ADDR_ID)) + return 0; if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES)) return 0; if (likely(dest->addr.name.name.type == TIPC_TOP_SRV)) @@ -575,19 +870,18 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) return sock_intr_errno(*timeo_p); prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); - done = sk_wait_event(sk, timeo_p, !tsk->port.congested); + done = sk_wait_event(sk, timeo_p, !tsk->link_cong); finish_wait(sk_sleep(sk), &wait); } while (!done); return 0; } - /** * tipc_sendmsg - send message in connectionless manner * @iocb: if NULL, indicates that socket lock is already held * @sock: socket structure * @m: message to send - * @total_len: length of message + * @dsz: amount of user data to be sent * * Message must have an destination specified explicitly. * Used for SOCK_RDM and SOCK_DGRAM messages, @@ -597,100 +891,122 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) * Returns the number of bytes sent on success, or errno otherwise */ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, - struct msghdr *m, size_t total_len) + struct msghdr *m, size_t dsz) { + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; - DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); - int needs_conn; + struct tipc_msg *mhdr = &tsk->phdr; + struct iovec *iov = m->msg_iov; + u32 dnode, dport; + struct sk_buff *buf; + struct tipc_name_seq *seq = &dest->addr.nameseq; + u32 mtu; long timeo; - int res = -EINVAL; + int rc = -EINVAL; if (unlikely(!dest)) return -EDESTADDRREQ; + if (unlikely((m->msg_namelen < sizeof(*dest)) || (dest->family != AF_TIPC))) return -EINVAL; - if (total_len > TIPC_MAX_USER_MSG_SIZE) + + if (dsz > TIPC_MAX_USER_MSG_SIZE) return -EMSGSIZE; if (iocb) lock_sock(sk); - needs_conn = (sock->state != SS_READY); - if (unlikely(needs_conn)) { + if (unlikely(sock->state != SS_READY)) { if (sock->state == SS_LISTENING) { - res = -EPIPE; + rc = -EPIPE; goto exit; } if (sock->state != SS_UNCONNECTED) { - res = -EISCONN; + rc = -EISCONN; goto exit; } - if (tsk->port.published) { - res = -EOPNOTSUPP; + if (tsk->published) { + rc = -EOPNOTSUPP; goto exit; } if (dest->addrtype == TIPC_ADDR_NAME) { - tsk->port.conn_type = dest->addr.name.name.type; - tsk->port.conn_instance = dest->addr.name.name.instance; + tsk->conn_type = dest->addr.name.name.type; + tsk->conn_instance = dest->addr.name.name.instance; } - - /* Abort any pending connection attempts (very unlikely) */ - reject_rx_queue(sk); } + rc = dest_name_check(dest, m); + if (rc) + goto exit; timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); - do { - if (dest->addrtype == TIPC_ADDR_NAME) { - res = dest_name_check(dest, m); - if (res) - break; - res = tipc_send2name(port, - &dest->addr.name.name, - dest->addr.name.domain, - m->msg_iov, - total_len); - } else if (dest->addrtype == TIPC_ADDR_ID) { - res = tipc_send2port(port, - &dest->addr.id, - m->msg_iov, - total_len); - } else if (dest->addrtype == TIPC_ADDR_MCAST) { - if (needs_conn) { - res = -EOPNOTSUPP; - break; - } - res = dest_name_check(dest, m); - if (res) - break; - res = tipc_port_mcast_xmit(port, - &dest->addr.nameseq, - m->msg_iov, - total_len); + + if (dest->addrtype == TIPC_ADDR_MCAST) { + rc = tipc_sendmcast(sock, seq, iov, dsz, timeo); + goto exit; + } else if (dest->addrtype == TIPC_ADDR_NAME) { + u32 type = dest->addr.name.name.type; + u32 inst = dest->addr.name.name.instance; + u32 domain = dest->addr.name.domain; + + dnode = domain; + msg_set_type(mhdr, TIPC_NAMED_MSG); + msg_set_hdr_sz(mhdr, NAMED_H_SIZE); + msg_set_nametype(mhdr, type); + msg_set_nameinst(mhdr, inst); + msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); + dport = tipc_nametbl_translate(type, inst, &dnode); + msg_set_destnode(mhdr, dnode); + msg_set_destport(mhdr, dport); + if (unlikely(!dport && !dnode)) { + rc = -EHOSTUNREACH; + goto exit; } - if (likely(res != -ELINKCONG)) { - if (needs_conn && (res >= 0)) + } else if (dest->addrtype == TIPC_ADDR_ID) { + dnode = dest->addr.id.node; + msg_set_type(mhdr, TIPC_DIRECT_MSG); + msg_set_lookup_scope(mhdr, 0); + msg_set_destnode(mhdr, dnode); + msg_set_destport(mhdr, dest->addr.id.ref); + msg_set_hdr_sz(mhdr, BASIC_H_SIZE); + } + +new_mtu: + mtu = tipc_node_get_mtu(dnode, tsk->ref); + rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf); + if (rc < 0) + goto exit; + + do { + TIPC_SKB_CB(buf)->wakeup_pending = tsk->link_cong; + rc = tipc_link_xmit(buf, dnode, tsk->ref); + if (likely(rc >= 0)) { + if (sock->state != SS_READY) sock->state = SS_CONNECTING; + rc = dsz; break; } - res = tipc_wait_for_sndmsg(sock, &timeo); - if (res) + if (rc == -EMSGSIZE) + goto new_mtu; + if (rc != -ELINKCONG) break; - } while (1); - + tsk->link_cong = 1; + rc = tipc_wait_for_sndmsg(sock, &timeo); + if (rc) + kfree_skb_list(buf); + } while (!rc); exit: if (iocb) release_sock(sk); - return res; + + return rc; } static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; DEFINE_WAIT(wait); int done; @@ -709,37 +1025,48 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); done = sk_wait_event(sk, timeo_p, - (!port->congested || !port->connected)); + (!tsk->link_cong && + !tsk_conn_cong(tsk)) || + !tsk->connected); finish_wait(sk_sleep(sk), &wait); } while (!done); return 0; } /** - * tipc_send_packet - send a connection-oriented message - * @iocb: if NULL, indicates that socket lock is already held + * tipc_send_stream - send stream-oriented data + * @iocb: (unused) * @sock: socket structure - * @m: message to send - * @total_len: length of message + * @m: data to send + * @dsz: total length of data to be transmitted * - * Used for SOCK_SEQPACKET messages and SOCK_STREAM data. + * Used for SOCK_STREAM data. * - * Returns the number of bytes sent on success, or errno otherwise + * Returns the number of bytes sent on success (or partial success), + * or errno if no data sent */ -static int tipc_send_packet(struct kiocb *iocb, struct socket *sock, - struct msghdr *m, size_t total_len) +static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, + struct msghdr *m, size_t dsz) { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_msg *mhdr = &tsk->phdr; + struct sk_buff *buf; DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); - int res = -EINVAL; + u32 ref = tsk->ref; + int rc = -EINVAL; long timeo; + u32 dnode; + uint mtu, send, sent = 0; /* Handle implied connection establishment */ - if (unlikely(dest)) - return tipc_sendmsg(iocb, sock, m, total_len); - - if (total_len > TIPC_MAX_USER_MSG_SIZE) + if (unlikely(dest)) { + rc = tipc_sendmsg(iocb, sock, m, dsz); + if (dsz && (dsz == rc)) + tsk->sent_unacked = 1; + return rc; + } + if (dsz > (uint)INT_MAX) return -EMSGSIZE; if (iocb) @@ -747,148 +1074,88 @@ static int tipc_send_packet(struct kiocb *iocb, struct socket *sock, if (unlikely(sock->state != SS_CONNECTED)) { if (sock->state == SS_DISCONNECTING) - res = -EPIPE; + rc = -EPIPE; else - res = -ENOTCONN; + rc = -ENOTCONN; goto exit; } timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); + dnode = tsk_peer_node(tsk); + +next: + mtu = tsk->max_pkt; + send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); + rc = tipc_msg_build(mhdr, m->msg_iov, sent, send, mtu, &buf); + if (unlikely(rc < 0)) + goto exit; do { - res = tipc_send(&tsk->port, m->msg_iov, total_len); - if (likely(res != -ELINKCONG)) - break; - res = tipc_wait_for_sndpkt(sock, &timeo); - if (res) - break; - } while (1); + if (likely(!tsk_conn_cong(tsk))) { + rc = tipc_link_xmit(buf, dnode, ref); + if (likely(!rc)) { + tsk->sent_unacked++; + sent += send; + if (sent == dsz) + break; + goto next; + } + if (rc == -EMSGSIZE) { + tsk->max_pkt = tipc_node_get_mtu(dnode, ref); + goto next; + } + if (rc != -ELINKCONG) + break; + tsk->link_cong = 1; + } + rc = tipc_wait_for_sndpkt(sock, &timeo); + if (rc) + kfree_skb_list(buf); + } while (!rc); exit: if (iocb) release_sock(sk); - return res; + return sent ? sent : rc; } /** - * tipc_send_stream - send stream-oriented data - * @iocb: (unused) + * tipc_send_packet - send a connection-oriented message + * @iocb: if NULL, indicates that socket lock is already held * @sock: socket structure - * @m: data to send - * @total_len: total length of data to be sent + * @m: message to send + * @dsz: length of data to be transmitted * - * Used for SOCK_STREAM data. + * Used for SOCK_SEQPACKET messages. * - * Returns the number of bytes sent on success (or partial success), - * or errno if no data sent + * Returns the number of bytes sent on success, or errno otherwise */ -static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, - struct msghdr *m, size_t total_len) +static int tipc_send_packet(struct kiocb *iocb, struct socket *sock, + struct msghdr *m, size_t dsz) { - struct sock *sk = sock->sk; - struct tipc_sock *tsk = tipc_sk(sk); - struct msghdr my_msg; - struct iovec my_iov; - struct iovec *curr_iov; - int curr_iovlen; - char __user *curr_start; - u32 hdr_size; - int curr_left; - int bytes_to_send; - int bytes_sent; - int res; - - lock_sock(sk); - - /* Handle special cases where there is no connection */ - if (unlikely(sock->state != SS_CONNECTED)) { - if (sock->state == SS_UNCONNECTED) - res = tipc_send_packet(NULL, sock, m, total_len); - else - res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN; - goto exit; - } - - if (unlikely(m->msg_name)) { - res = -EISCONN; - goto exit; - } - - if (total_len > (unsigned int)INT_MAX) { - res = -EMSGSIZE; - goto exit; - } - - /* - * Send each iovec entry using one or more messages - * - * Note: This algorithm is good for the most likely case - * (i.e. one large iovec entry), but could be improved to pass sets - * of small iovec entries into send_packet(). - */ - curr_iov = m->msg_iov; - curr_iovlen = m->msg_iovlen; - my_msg.msg_iov = &my_iov; - my_msg.msg_iovlen = 1; - my_msg.msg_flags = m->msg_flags; - my_msg.msg_name = NULL; - bytes_sent = 0; - - hdr_size = msg_hdr_sz(&tsk->port.phdr); - - while (curr_iovlen--) { - curr_start = curr_iov->iov_base; - curr_left = curr_iov->iov_len; - - while (curr_left) { - bytes_to_send = tsk->port.max_pkt - hdr_size; - if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE) - bytes_to_send = TIPC_MAX_USER_MSG_SIZE; - if (curr_left < bytes_to_send) - bytes_to_send = curr_left; - my_iov.iov_base = curr_start; - my_iov.iov_len = bytes_to_send; - res = tipc_send_packet(NULL, sock, &my_msg, - bytes_to_send); - if (res < 0) { - if (bytes_sent) - res = bytes_sent; - goto exit; - } - curr_left -= bytes_to_send; - curr_start += bytes_to_send; - bytes_sent += bytes_to_send; - } + if (dsz > TIPC_MAX_USER_MSG_SIZE) + return -EMSGSIZE; - curr_iov++; - } - res = bytes_sent; -exit: - release_sock(sk); - return res; + return tipc_send_stream(iocb, sock, m, dsz); } -/** - * auto_connect - complete connection setup to a remote port - * @tsk: tipc socket structure - * @msg: peer's response message - * - * Returns 0 on success, errno otherwise +/* tipc_sk_finish_conn - complete the setup of a connection */ -static int auto_connect(struct tipc_sock *tsk, struct tipc_msg *msg) +static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port, + u32 peer_node) { - struct tipc_port *port = &tsk->port; - struct socket *sock = tsk->sk.sk_socket; - struct tipc_portid peer; - - peer.ref = msg_origport(msg); - peer.node = msg_orignode(msg); - - __tipc_port_connect(port->ref, port, &peer); - - if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE) - return -EINVAL; - msg_set_importance(&port->phdr, (u32)msg_importance(msg)); - sock->state = SS_CONNECTED; - return 0; + struct tipc_msg *msg = &tsk->phdr; + + msg_set_destnode(msg, peer_node); + msg_set_destport(msg, peer_port); + msg_set_type(msg, TIPC_CONN_MSG); + msg_set_lookup_scope(msg, 0); + msg_set_hdr_sz(msg, SHORT_H_SIZE); + + tsk->probing_interval = CONN_PROBING_INTERVAL; + tsk->probing_state = TIPC_CONN_OK; + tsk->connected = 1; + k_start_timer(&tsk->timer, tsk->probing_interval); + tipc_node_add_conn(peer_node, tsk->ref, peer_port); + tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->ref); } /** @@ -915,17 +1182,17 @@ static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg) } /** - * anc_data_recv - optionally capture ancillary data for received message + * tipc_sk_anc_data_recv - optionally capture ancillary data for received message * @m: descriptor for message info * @msg: received message header - * @tport: TIPC port associated with message + * @tsk: TIPC port associated with message * * Note: Ancillary data is not captured if not requested by receiver. * * Returns 0 if successful, otherwise errno */ -static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg, - struct tipc_port *tport) +static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg, + struct tipc_sock *tsk) { u32 anc_data[3]; u32 err; @@ -968,10 +1235,10 @@ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg, anc_data[2] = msg_nameupper(msg); break; case TIPC_CONN_MSG: - has_name = (tport->conn_type != 0); - anc_data[0] = tport->conn_type; - anc_data[1] = tport->conn_instance; - anc_data[2] = tport->conn_instance; + has_name = (tsk->conn_type != 0); + anc_data[0] = tsk->conn_type; + anc_data[1] = tsk->conn_instance; + anc_data[2] = tsk->conn_instance; break; default: has_name = 0; @@ -985,6 +1252,24 @@ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg, return 0; } +static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) +{ + struct sk_buff *buf = NULL; + struct tipc_msg *msg; + u32 peer_port = tsk_peer_port(tsk); + u32 dnode = tsk_peer_node(tsk); + + if (!tsk->connected) + return; + buf = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode, + tipc_own_addr, peer_port, tsk->ref, TIPC_OK); + if (!buf) + return; + msg = buf_msg(buf); + msg_set_msgcnt(msg, ack); + tipc_link_xmit(buf, dnode, msg_link_selector(msg)); +} + static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) { struct sock *sk = sock->sk; @@ -1035,7 +1320,6 @@ static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; struct sk_buff *buf; struct tipc_msg *msg; long timeo; @@ -1070,7 +1354,7 @@ restart: /* Discard an empty non-errored message & try again */ if ((!sz) && (!err)) { - advance_rx_queue(sk); + tsk_advance_rx_queue(sk); goto restart; } @@ -1078,7 +1362,7 @@ restart: set_orig_addr(m, msg); /* Capture ancillary data (optional) */ - res = anc_data_recv(m, msg, port); + res = tipc_sk_anc_data_recv(m, msg, tsk); if (res) goto exit; @@ -1104,9 +1388,11 @@ restart: /* Consume received message (optional) */ if (likely(!(flags & MSG_PEEK))) { if ((sock->state != SS_READY) && - (++port->conn_unacked >= TIPC_CONNACK_INTV)) - tipc_acknowledge(port->ref, port->conn_unacked); - advance_rx_queue(sk); + (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) { + tipc_sk_send_ack(tsk, tsk->rcv_unacked); + tsk->rcv_unacked = 0; + } + tsk_advance_rx_queue(sk); } exit: release_sock(sk); @@ -1130,7 +1416,6 @@ static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; struct sk_buff *buf; struct tipc_msg *msg; long timeo; @@ -1168,14 +1453,14 @@ restart: /* Discard an empty non-errored message & try again */ if ((!sz) && (!err)) { - advance_rx_queue(sk); + tsk_advance_rx_queue(sk); goto restart; } /* Optionally capture sender's address & ancillary data of first msg */ if (sz_copied == 0) { set_orig_addr(m, msg); - res = anc_data_recv(m, msg, port); + res = tipc_sk_anc_data_recv(m, msg, tsk); if (res) goto exit; } @@ -1213,9 +1498,11 @@ restart: /* Consume received message (optional) */ if (likely(!(flags & MSG_PEEK))) { - if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV)) - tipc_acknowledge(port->ref, port->conn_unacked); - advance_rx_queue(sk); + if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) { + tipc_sk_send_ack(tsk, tsk->rcv_unacked); + tsk->rcv_unacked = 0; + } + tsk_advance_rx_queue(sk); } /* Loop around if more data is required */ @@ -1269,18 +1556,14 @@ static void tipc_data_ready(struct sock *sk) * @tsk: TIPC socket * @msg: message * - * Returns TIPC error status code and socket error status code - * once it encounters some errors + * Returns 0 (TIPC_OK) if everyting ok, -TIPC_ERR_NO_PORT otherwise */ -static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) +static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) { struct sock *sk = &tsk->sk; - struct tipc_port *port = &tsk->port; struct socket *sock = sk->sk_socket; struct tipc_msg *msg = buf_msg(*buf); - - u32 retval = TIPC_ERR_NO_PORT; - int res; + int retval = -TIPC_ERR_NO_PORT; if (msg_mcast(msg)) return retval; @@ -1288,16 +1571,23 @@ static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) switch ((int)sock->state) { case SS_CONNECTED: /* Accept only connection-based messages sent by peer */ - if (msg_connected(msg) && tipc_port_peer_msg(port, msg)) { + if (tsk_peer_msg(tsk, msg)) { if (unlikely(msg_errcode(msg))) { sock->state = SS_DISCONNECTING; - __tipc_port_disconnect(port); + tsk->connected = 0; + /* let timer expire on it's own */ + tipc_node_remove_conn(tsk_peer_node(tsk), + tsk->ref); } retval = TIPC_OK; } break; case SS_CONNECTING: /* Accept only ACK or NACK message */ + + if (unlikely(!msg_connected(msg))) + break; + if (unlikely(msg_errcode(msg))) { sock->state = SS_DISCONNECTING; sk->sk_err = ECONNREFUSED; @@ -1305,17 +1595,17 @@ static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) break; } - if (unlikely(!msg_connected(msg))) - break; - - res = auto_connect(tsk, msg); - if (res) { + if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) { sock->state = SS_DISCONNECTING; - sk->sk_err = -res; + sk->sk_err = EINVAL; retval = TIPC_OK; break; } + tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg)); + msg_set_importance(&tsk->phdr, msg_importance(msg)); + sock->state = SS_CONNECTED; + /* If an incoming message is an 'ACK-', it should be * discarded here because it doesn't contain useful * data. In addition, we should try to wake up @@ -1382,32 +1672,44 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) * * Called with socket lock already taken; port lock may also be taken. * - * Returns TIPC error status code (TIPC_OK if message is not to be rejected) + * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message + * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded */ -static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) +static int filter_rcv(struct sock *sk, struct sk_buff *buf) { struct socket *sock = sk->sk_socket; struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *msg = buf_msg(buf); unsigned int limit = rcvbuf_limit(sk, buf); - u32 res = TIPC_OK; + u32 onode; + int rc = TIPC_OK; + + if (unlikely(msg_user(msg) == CONN_MANAGER)) + return tipc_sk_proto_rcv(tsk, &onode, buf); + + if (unlikely(msg_user(msg) == SOCK_WAKEUP)) { + kfree_skb(buf); + tsk->link_cong = 0; + sk->sk_write_space(sk); + return TIPC_OK; + } /* Reject message if it is wrong sort of message for socket */ if (msg_type(msg) > TIPC_DIRECT_MSG) - return TIPC_ERR_NO_PORT; + return -TIPC_ERR_NO_PORT; if (sock->state == SS_READY) { if (msg_connected(msg)) - return TIPC_ERR_NO_PORT; + return -TIPC_ERR_NO_PORT; } else { - res = filter_connect(tsk, &buf); - if (res != TIPC_OK || buf == NULL) - return res; + rc = filter_connect(tsk, &buf); + if (rc != TIPC_OK || buf == NULL) + return rc; } /* Reject message if there isn't room to queue it */ if (sk_rmem_alloc_get(sk) + buf->truesize >= limit) - return TIPC_ERR_OVERLOAD; + return -TIPC_ERR_OVERLOAD; /* Enqueue message */ TIPC_SKB_CB(buf)->handle = NULL; @@ -1429,16 +1731,23 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) */ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) { - u32 res; + int rc; + u32 onode; struct tipc_sock *tsk = tipc_sk(sk); uint truesize = buf->truesize; - res = filter_rcv(sk, buf); - if (unlikely(res)) - tipc_reject_msg(buf, res); + rc = filter_rcv(sk, buf); + + if (likely(!rc)) { + if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) + atomic_add(truesize, &tsk->dupl_rcvcnt); + return 0; + } + + if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc)) + return 0; - if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) - atomic_add(truesize, &tsk->dupl_rcvcnt); + tipc_link_xmit(buf, onode, 0); return 0; } @@ -1452,49 +1761,42 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) int tipc_sk_rcv(struct sk_buff *buf) { struct tipc_sock *tsk; - struct tipc_port *port; struct sock *sk; u32 dport = msg_destport(buf_msg(buf)); - int err = TIPC_OK; + int rc = TIPC_OK; uint limit; + u32 dnode; - /* Forward unresolved named message */ - if (unlikely(!dport)) { - tipc_net_route_msg(buf); - return 0; - } - - /* Validate destination */ - port = tipc_port_lock(dport); - if (unlikely(!port)) { - err = TIPC_ERR_NO_PORT; + /* Validate destination and message */ + tsk = tipc_sk_get(dport); + if (unlikely(!tsk)) { + rc = tipc_msg_eval(buf, &dnode); goto exit; } - - tsk = tipc_port_to_sock(port); sk = &tsk->sk; /* Queue message */ bh_lock_sock(sk); if (!sock_owned_by_user(sk)) { - err = filter_rcv(sk, buf); + rc = filter_rcv(sk, buf); } else { if (sk->sk_backlog.len == 0) atomic_set(&tsk->dupl_rcvcnt, 0); limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt); if (sk_add_backlog(sk, buf, limit)) - err = TIPC_ERR_OVERLOAD; + rc = -TIPC_ERR_OVERLOAD; } - bh_unlock_sock(sk); - tipc_port_unlock(port); - - if (likely(!err)) + tipc_sk_put(tsk); + if (likely(!rc)) return 0; exit: - tipc_reject_msg(buf, err); - return -EHOSTUNREACH; + if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc)) + return -EHOSTUNREACH; + + tipc_link_xmit(buf, dnode, 0); + return (rc < 0) ? -EHOSTUNREACH : 0; } static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) @@ -1673,10 +1975,8 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags) { struct sock *new_sk, *sk = sock->sk; struct sk_buff *buf; - struct tipc_port *new_port; + struct tipc_sock *new_tsock; struct tipc_msg *msg; - struct tipc_portid peer; - u32 new_ref; long timeo; int res; @@ -1698,8 +1998,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags) goto exit; new_sk = new_sock->sk; - new_port = &tipc_sk(new_sk)->port; - new_ref = new_port->ref; + new_tsock = tipc_sk(new_sk); msg = buf_msg(buf); /* we lock on new_sk; but lockdep sees the lock on sk */ @@ -1709,18 +2008,16 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags) * Reject any stray messages received by new socket * before the socket lock was taken (very, very unlikely) */ - reject_rx_queue(new_sk); + tsk_rej_rx_queue(new_sk); /* Connect new socket to it's peer */ - peer.ref = msg_origport(msg); - peer.node = msg_orignode(msg); - tipc_port_connect(new_ref, &peer); + tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg)); new_sock->state = SS_CONNECTED; - tipc_port_set_importance(new_port, msg_importance(msg)); + tsk_set_importance(new_tsock, msg_importance(msg)); if (msg_named(msg)) { - new_port->conn_type = msg_nametype(msg); - new_port->conn_instance = msg_nameinst(msg); + new_tsock->conn_type = msg_nametype(msg); + new_tsock->conn_instance = msg_nameinst(msg); } /* @@ -1730,7 +2027,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags) if (!msg_data_sz(msg)) { struct msghdr m = {NULL,}; - advance_rx_queue(sk); + tsk_advance_rx_queue(sk); tipc_send_packet(NULL, new_sock, &m, 0); } else { __skb_dequeue(&sk->sk_receive_queue); @@ -1756,8 +2053,8 @@ static int tipc_shutdown(struct socket *sock, int how) { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; struct sk_buff *buf; + u32 dnode; int res; if (how != SHUT_RDWR) @@ -1777,14 +2074,21 @@ restart: kfree_skb(buf); goto restart; } - tipc_port_disconnect(port->ref); - tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN); + if (tipc_msg_reverse(buf, &dnode, TIPC_CONN_SHUTDOWN)) + tipc_link_xmit(buf, dnode, tsk->ref); + tipc_node_remove_conn(dnode, tsk->ref); } else { - tipc_port_shutdown(port->ref); + dnode = tsk_peer_node(tsk); + buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, + TIPC_CONN_MSG, SHORT_H_SIZE, + 0, dnode, tipc_own_addr, + tsk_peer_port(tsk), + tsk->ref, TIPC_CONN_SHUTDOWN); + tipc_link_xmit(buf, dnode, tsk->ref); } - + tsk->connected = 0; sock->state = SS_DISCONNECTING; - + tipc_node_remove_conn(dnode, tsk->ref); /* fall through */ case SS_DISCONNECTING: @@ -1805,6 +2109,432 @@ restart: return res; } +static void tipc_sk_timeout(unsigned long ref) +{ + struct tipc_sock *tsk; + struct sock *sk; + struct sk_buff *buf = NULL; + u32 peer_port, peer_node; + + tsk = tipc_sk_get(ref); + if (!tsk) + return; + + sk = &tsk->sk; + bh_lock_sock(sk); + if (!tsk->connected) { + bh_unlock_sock(sk); + goto exit; + } + peer_port = tsk_peer_port(tsk); + peer_node = tsk_peer_node(tsk); + + if (tsk->probing_state == TIPC_CONN_PROBING) { + /* Previous probe not answered -> self abort */ + buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, + SHORT_H_SIZE, 0, tipc_own_addr, + peer_node, ref, peer_port, + TIPC_ERR_NO_PORT); + } else { + buf = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, + 0, peer_node, tipc_own_addr, + peer_port, ref, TIPC_OK); + tsk->probing_state = TIPC_CONN_PROBING; + k_start_timer(&tsk->timer, tsk->probing_interval); + } + bh_unlock_sock(sk); + if (buf) + tipc_link_xmit(buf, peer_node, ref); +exit: + tipc_sk_put(tsk); +} + +static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, + struct tipc_name_seq const *seq) +{ + struct publication *publ; + u32 key; + + if (tsk->connected) + return -EINVAL; + key = tsk->ref + tsk->pub_count + 1; + if (key == tsk->ref) + return -EADDRINUSE; + + publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, + scope, tsk->ref, key); + if (unlikely(!publ)) + return -EINVAL; + + list_add(&publ->pport_list, &tsk->publications); + tsk->pub_count++; + tsk->published = 1; + return 0; +} + +static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, + struct tipc_name_seq const *seq) +{ + struct publication *publ; + struct publication *safe; + int rc = -EINVAL; + + list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) { + if (seq) { + if (publ->scope != scope) + continue; + if (publ->type != seq->type) + continue; + if (publ->lower != seq->lower) + continue; + if (publ->upper != seq->upper) + break; + tipc_nametbl_withdraw(publ->type, publ->lower, + publ->ref, publ->key); + rc = 0; + break; + } + tipc_nametbl_withdraw(publ->type, publ->lower, + publ->ref, publ->key); + rc = 0; + } + if (list_empty(&tsk->publications)) + tsk->published = 0; + return rc; +} + +static int tipc_sk_show(struct tipc_sock *tsk, char *buf, + int len, int full_id) +{ + struct publication *publ; + int ret; + + if (full_id) + ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:", + tipc_zone(tipc_own_addr), + tipc_cluster(tipc_own_addr), + tipc_node(tipc_own_addr), tsk->ref); + else + ret = tipc_snprintf(buf, len, "%-10u:", tsk->ref); + + if (tsk->connected) { + u32 dport = tsk_peer_port(tsk); + u32 destnode = tsk_peer_node(tsk); + + ret += tipc_snprintf(buf + ret, len - ret, + " connected to <%u.%u.%u:%u>", + tipc_zone(destnode), + tipc_cluster(destnode), + tipc_node(destnode), dport); + if (tsk->conn_type != 0) + ret += tipc_snprintf(buf + ret, len - ret, + " via {%u,%u}", tsk->conn_type, + tsk->conn_instance); + } else if (tsk->published) { + ret += tipc_snprintf(buf + ret, len - ret, " bound to"); + list_for_each_entry(publ, &tsk->publications, pport_list) { + if (publ->lower == publ->upper) + ret += tipc_snprintf(buf + ret, len - ret, + " {%u,%u}", publ->type, + publ->lower); + else + ret += tipc_snprintf(buf + ret, len - ret, + " {%u,%u,%u}", publ->type, + publ->lower, publ->upper); + } + } + ret += tipc_snprintf(buf + ret, len - ret, "\n"); + return ret; +} + +struct sk_buff *tipc_sk_socks_show(void) +{ + struct sk_buff *buf; + struct tlv_desc *rep_tlv; + char *pb; + int pb_len; + struct tipc_sock *tsk; + int str_len = 0; + u32 ref = 0; + + buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN)); + if (!buf) + return NULL; + rep_tlv = (struct tlv_desc *)buf->data; + pb = TLV_DATA(rep_tlv); + pb_len = ULTRA_STRING_MAX_LEN; + + tsk = tipc_sk_get_next(&ref); + for (; tsk; tsk = tipc_sk_get_next(&ref)) { + lock_sock(&tsk->sk); + str_len += tipc_sk_show(tsk, pb + str_len, + pb_len - str_len, 0); + release_sock(&tsk->sk); + tipc_sk_put(tsk); + } + str_len += 1; /* for "\0" */ + skb_put(buf, TLV_SPACE(str_len)); + TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); + + return buf; +} + +/* tipc_sk_reinit: set non-zero address in all existing sockets + * when we go from standalone to network mode. + */ +void tipc_sk_reinit(void) +{ + struct tipc_msg *msg; + u32 ref = 0; + struct tipc_sock *tsk = tipc_sk_get_next(&ref); + + for (; tsk; tsk = tipc_sk_get_next(&ref)) { + lock_sock(&tsk->sk); + msg = &tsk->phdr; + msg_set_prevnode(msg, tipc_own_addr); + msg_set_orignode(msg, tipc_own_addr); + release_sock(&tsk->sk); + tipc_sk_put(tsk); + } +} + +/** + * struct reference - TIPC socket reference entry + * @tsk: pointer to socket associated with reference entry + * @ref: reference value for socket (combines instance & array index info) + */ +struct reference { + struct tipc_sock *tsk; + u32 ref; +}; + +/** + * struct tipc_ref_table - table of TIPC socket reference entries + * @entries: pointer to array of reference entries + * @capacity: array index of first unusable entry + * @init_point: array index of first uninitialized entry + * @first_free: array index of first unused socket reference entry + * @last_free: array index of last unused socket reference entry + * @index_mask: bitmask for array index portion of reference values + * @start_mask: initial value for instance value portion of reference values + */ +struct ref_table { + struct reference *entries; + u32 capacity; + u32 init_point; + u32 first_free; + u32 last_free; + u32 index_mask; + u32 start_mask; +}; + +/* Socket reference table consists of 2**N entries. + * + * State Socket ptr Reference + * ----- ---------- --------- + * In use non-NULL XXXX|own index + * (XXXX changes each time entry is acquired) + * Free NULL YYYY|next free index + * (YYYY is one more than last used XXXX) + * Uninitialized NULL 0 + * + * Entry 0 is not used; this allows index 0 to denote the end of the free list. + * + * Note that a reference value of 0 does not necessarily indicate that an + * entry is uninitialized, since the last entry in the free list could also + * have a reference value of 0 (although this is unlikely). + */ + +static struct ref_table tipc_ref_table; + +static DEFINE_RWLOCK(ref_table_lock); + +/** + * tipc_ref_table_init - create reference table for sockets + */ +int tipc_sk_ref_table_init(u32 req_sz, u32 start) +{ + struct reference *table; + u32 actual_sz; + + /* account for unused entry, then round up size to a power of 2 */ + + req_sz++; + for (actual_sz = 16; actual_sz < req_sz; actual_sz <<= 1) { + /* do nothing */ + }; + + /* allocate table & mark all entries as uninitialized */ + table = vzalloc(actual_sz * sizeof(struct reference)); + if (table == NULL) + return -ENOMEM; + + tipc_ref_table.entries = table; + tipc_ref_table.capacity = req_sz; + tipc_ref_table.init_point = 1; + tipc_ref_table.first_free = 0; + tipc_ref_table.last_free = 0; + tipc_ref_table.index_mask = actual_sz - 1; + tipc_ref_table.start_mask = start & ~tipc_ref_table.index_mask; + + return 0; +} + +/** + * tipc_ref_table_stop - destroy reference table for sockets + */ +void tipc_sk_ref_table_stop(void) +{ + if (!tipc_ref_table.entries) + return; + vfree(tipc_ref_table.entries); + tipc_ref_table.entries = NULL; +} + +/* tipc_ref_acquire - create reference to a socket + * + * Register an socket pointer in the reference table. + * Returns a unique reference value that is used from then on to retrieve the + * socket pointer, or to determine if the socket has been deregistered. + */ +u32 tipc_sk_ref_acquire(struct tipc_sock *tsk) +{ + u32 index; + u32 index_mask; + u32 next_plus_upper; + u32 ref = 0; + struct reference *entry; + + if (unlikely(!tsk)) { + pr_err("Attempt to acquire ref. to non-existent obj\n"); + return 0; + } + if (unlikely(!tipc_ref_table.entries)) { + pr_err("Ref. table not found in acquisition attempt\n"); + return 0; + } + + /* Take a free entry, if available; otherwise initialize a new one */ + write_lock_bh(&ref_table_lock); + index = tipc_ref_table.first_free; + entry = &tipc_ref_table.entries[index]; + + if (likely(index)) { + index = tipc_ref_table.first_free; + entry = &tipc_ref_table.entries[index]; + index_mask = tipc_ref_table.index_mask; + next_plus_upper = entry->ref; + tipc_ref_table.first_free = next_plus_upper & index_mask; + ref = (next_plus_upper & ~index_mask) + index; + entry->tsk = tsk; + } else if (tipc_ref_table.init_point < tipc_ref_table.capacity) { + index = tipc_ref_table.init_point++; + entry = &tipc_ref_table.entries[index]; + ref = tipc_ref_table.start_mask + index; + } + + if (ref) { + entry->ref = ref; + entry->tsk = tsk; + } + write_unlock_bh(&ref_table_lock); + return ref; +} + +/* tipc_sk_ref_discard - invalidate reference to an socket + * + * Disallow future references to an socket and free up the entry for re-use. + */ +void tipc_sk_ref_discard(u32 ref) +{ + struct reference *entry; + u32 index; + u32 index_mask; + + if (unlikely(!tipc_ref_table.entries)) { + pr_err("Ref. table not found during discard attempt\n"); + return; + } + + index_mask = tipc_ref_table.index_mask; + index = ref & index_mask; + entry = &tipc_ref_table.entries[index]; + + write_lock_bh(&ref_table_lock); + + if (unlikely(!entry->tsk)) { + pr_err("Attempt to discard ref. to non-existent socket\n"); + goto exit; + } + if (unlikely(entry->ref != ref)) { + pr_err("Attempt to discard non-existent reference\n"); + goto exit; + } + + /* Mark entry as unused; increment instance part of entry's + * reference to invalidate any subsequent references + */ + + entry->tsk = NULL; + entry->ref = (ref & ~index_mask) + (index_mask + 1); + + /* Append entry to free entry list */ + if (unlikely(tipc_ref_table.first_free == 0)) + tipc_ref_table.first_free = index; + else + tipc_ref_table.entries[tipc_ref_table.last_free].ref |= index; + tipc_ref_table.last_free = index; +exit: + write_unlock_bh(&ref_table_lock); +} + +/* tipc_sk_get - find referenced socket and return pointer to it + */ +struct tipc_sock *tipc_sk_get(u32 ref) +{ + struct reference *entry; + struct tipc_sock *tsk; + + if (unlikely(!tipc_ref_table.entries)) + return NULL; + read_lock_bh(&ref_table_lock); + entry = &tipc_ref_table.entries[ref & tipc_ref_table.index_mask]; + tsk = entry->tsk; + if (likely(tsk && (entry->ref == ref))) + sock_hold(&tsk->sk); + else + tsk = NULL; + read_unlock_bh(&ref_table_lock); + return tsk; +} + +/* tipc_sk_get_next - lock & return next socket after referenced one +*/ +struct tipc_sock *tipc_sk_get_next(u32 *ref) +{ + struct reference *entry; + struct tipc_sock *tsk = NULL; + uint index = *ref & tipc_ref_table.index_mask; + + read_lock_bh(&ref_table_lock); + while (++index < tipc_ref_table.capacity) { + entry = &tipc_ref_table.entries[index]; + if (!entry->tsk) + continue; + tsk = entry->tsk; + sock_hold(&tsk->sk); + *ref = entry->ref; + break; + } + read_unlock_bh(&ref_table_lock); + return tsk; +} + +static void tipc_sk_put(struct tipc_sock *tsk) +{ + sock_put(&tsk->sk); +} + /** * tipc_setsockopt - set socket option * @sock: socket structure @@ -1823,7 +2553,6 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; u32 value; int res; @@ -1841,16 +2570,16 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, switch (opt) { case TIPC_IMPORTANCE: - tipc_port_set_importance(port, value); + res = tsk_set_importance(tsk, value); break; case TIPC_SRC_DROPPABLE: if (sock->type != SOCK_STREAM) - tipc_port_set_unreliable(port, value); + tsk_set_unreliable(tsk, value); else res = -ENOPROTOOPT; break; case TIPC_DEST_DROPPABLE: - tipc_port_set_unreturnable(port, value); + tsk_set_unreturnable(tsk, value); break; case TIPC_CONN_TIMEOUT: tipc_sk(sk)->conn_timeout = value; @@ -1883,7 +2612,6 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_port *port = &tsk->port; int len; u32 value; int res; @@ -1900,16 +2628,16 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, switch (opt) { case TIPC_IMPORTANCE: - value = tipc_port_importance(port); + value = tsk_importance(tsk); break; case TIPC_SRC_DROPPABLE: - value = tipc_port_unreliable(port); + value = tsk_unreliable(tsk); break; case TIPC_DEST_DROPPABLE: - value = tipc_port_unreturnable(port); + value = tsk_unreturnable(tsk); break; case TIPC_CONN_TIMEOUT: - value = tipc_sk(sk)->conn_timeout; + value = tsk->conn_timeout; /* no need to set "res", since already 0 at this point */ break; case TIPC_NODE_RECVQ_DEPTH: @@ -1936,7 +2664,7 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, return put_user(sizeof(value), ol); } -int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg) +static int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg) { struct tipc_sioc_ln_req lnr; void __user *argp = (void __user *)arg; @@ -1952,7 +2680,6 @@ int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg) return 0; } return -EADDRNOTAVAIL; - break; default: return -ENOIOCTLCMD; } |