diff options
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/sched.c | 77 | ||||
-rw-r--r-- | net/sunrpc/svcsock.c | 32 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_transport.c | 1 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 3 |
4 files changed, 85 insertions, 28 deletions
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 243fc09b164..3fc8624fcd1 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -252,23 +252,37 @@ static void rpc_set_active(struct rpc_task *task) /* * Mark an RPC call as having completed by clearing the 'active' bit + * and then waking up all tasks that were sleeping. */ -static void rpc_mark_complete_task(struct rpc_task *task) +static int rpc_complete_task(struct rpc_task *task) { - smp_mb__before_clear_bit(); + void *m = &task->tk_runstate; + wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE); + struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE); + unsigned long flags; + int ret; + + spin_lock_irqsave(&wq->lock, flags); clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate); - smp_mb__after_clear_bit(); - wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE); + ret = atomic_dec_and_test(&task->tk_count); + if (waitqueue_active(wq)) + __wake_up_locked_key(wq, TASK_NORMAL, &k); + spin_unlock_irqrestore(&wq->lock, flags); + return ret; } /* * Allow callers to wait for completion of an RPC call + * + * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit() + * to enforce taking of the wq->lock and hence avoid races with + * rpc_complete_task(). */ int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *)) { if (action == NULL) action = rpc_wait_bit_killable; - return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE, + return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE, action, TASK_KILLABLE); } EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task); @@ -857,34 +871,67 @@ static void rpc_async_release(struct work_struct *work) rpc_free_task(container_of(work, struct rpc_task, u.tk_work)); } -void rpc_put_task(struct rpc_task *task) +static void rpc_release_resources_task(struct rpc_task *task) { - if (!atomic_dec_and_test(&task->tk_count)) - return; - /* Release resources */ if (task->tk_rqstp) xprt_release(task); if (task->tk_msg.rpc_cred) put_rpccred(task->tk_msg.rpc_cred); rpc_task_release_client(task); - if (task->tk_workqueue != NULL) { +} + +static void rpc_final_put_task(struct rpc_task *task, + struct workqueue_struct *q) +{ + if (q != NULL) { INIT_WORK(&task->u.tk_work, rpc_async_release); - queue_work(task->tk_workqueue, &task->u.tk_work); + queue_work(q, &task->u.tk_work); } else rpc_free_task(task); } + +static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q) +{ + if (atomic_dec_and_test(&task->tk_count)) { + rpc_release_resources_task(task); + rpc_final_put_task(task, q); + } +} + +void rpc_put_task(struct rpc_task *task) +{ + rpc_do_put_task(task, NULL); +} EXPORT_SYMBOL_GPL(rpc_put_task); +void rpc_put_task_async(struct rpc_task *task) +{ + rpc_do_put_task(task, task->tk_workqueue); +} +EXPORT_SYMBOL_GPL(rpc_put_task_async); + static void rpc_release_task(struct rpc_task *task) { dprintk("RPC: %5u release task\n", task->tk_pid); BUG_ON (RPC_IS_QUEUED(task)); - /* Wake up anyone who is waiting for task completion */ - rpc_mark_complete_task(task); + rpc_release_resources_task(task); - rpc_put_task(task); + /* + * Note: at this point we have been removed from rpc_clnt->cl_tasks, + * so it should be safe to use task->tk_count as a test for whether + * or not any other processes still hold references to our rpc_task. + */ + if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) { + /* Wake up anyone who may be waiting for task completion */ + if (!rpc_complete_task(task)) + return; + } else { + if (!atomic_dec_and_test(&task->tk_count)) + return; + } + rpc_final_put_task(task, task->tk_workqueue); } int rpciod_up(void) @@ -908,7 +955,7 @@ static int rpciod_start(void) * Create the rpciod thread and wait for it to start. */ dprintk("RPC: creating workqueue rpciod\n"); - wq = alloc_workqueue("rpciod", WQ_RESCUER, 0); + wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0); rpciod_workqueue = wq; return rpciod_workqueue != NULL; } diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index d802e941d36..b7d435c3f19 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -420,6 +420,7 @@ static void svc_sock_setbufsize(struct socket *sock, unsigned int snd, static void svc_udp_data_ready(struct sock *sk, int count) { struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; + wait_queue_head_t *wq = sk_sleep(sk); if (svsk) { dprintk("svc: socket %p(inet %p), count=%d, busy=%d\n", @@ -428,8 +429,8 @@ static void svc_udp_data_ready(struct sock *sk, int count) set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); svc_xprt_enqueue(&svsk->sk_xprt); } - if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) - wake_up_interruptible(sk_sleep(sk)); + if (wq && waitqueue_active(wq)) + wake_up_interruptible(wq); } /* @@ -438,6 +439,7 @@ static void svc_udp_data_ready(struct sock *sk, int count) static void svc_write_space(struct sock *sk) { struct svc_sock *svsk = (struct svc_sock *)(sk->sk_user_data); + wait_queue_head_t *wq = sk_sleep(sk); if (svsk) { dprintk("svc: socket %p(inet %p), write_space busy=%d\n", @@ -445,10 +447,10 @@ static void svc_write_space(struct sock *sk) svc_xprt_enqueue(&svsk->sk_xprt); } - if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) { + if (wq && waitqueue_active(wq)) { dprintk("RPC svc_write_space: someone sleeping on %p\n", svsk); - wake_up_interruptible(sk_sleep(sk)); + wake_up_interruptible(wq); } } @@ -739,6 +741,7 @@ static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv) static void svc_tcp_listen_data_ready(struct sock *sk, int count_unused) { struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; + wait_queue_head_t *wq; dprintk("svc: socket %p TCP (listen) state change %d\n", sk, sk->sk_state); @@ -761,8 +764,9 @@ static void svc_tcp_listen_data_ready(struct sock *sk, int count_unused) printk("svc: socket %p: no user data\n", sk); } - if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) - wake_up_interruptible_all(sk_sleep(sk)); + wq = sk_sleep(sk); + if (wq && waitqueue_active(wq)) + wake_up_interruptible_all(wq); } /* @@ -771,6 +775,7 @@ static void svc_tcp_listen_data_ready(struct sock *sk, int count_unused) static void svc_tcp_state_change(struct sock *sk) { struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; + wait_queue_head_t *wq = sk_sleep(sk); dprintk("svc: socket %p TCP (connected) state change %d (svsk %p)\n", sk, sk->sk_state, sk->sk_user_data); @@ -781,13 +786,14 @@ static void svc_tcp_state_change(struct sock *sk) set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags); svc_xprt_enqueue(&svsk->sk_xprt); } - if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) - wake_up_interruptible_all(sk_sleep(sk)); + if (wq && waitqueue_active(wq)) + wake_up_interruptible_all(wq); } static void svc_tcp_data_ready(struct sock *sk, int count) { struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; + wait_queue_head_t *wq = sk_sleep(sk); dprintk("svc: socket %p TCP data ready (svsk %p)\n", sk, sk->sk_user_data); @@ -795,8 +801,8 @@ static void svc_tcp_data_ready(struct sock *sk, int count) set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); svc_xprt_enqueue(&svsk->sk_xprt); } - if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) - wake_up_interruptible(sk_sleep(sk)); + if (wq && waitqueue_active(wq)) + wake_up_interruptible(wq); } /* @@ -1531,6 +1537,7 @@ static void svc_sock_detach(struct svc_xprt *xprt) { struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt); struct sock *sk = svsk->sk_sk; + wait_queue_head_t *wq; dprintk("svc: svc_sock_detach(%p)\n", svsk); @@ -1539,8 +1546,9 @@ static void svc_sock_detach(struct svc_xprt *xprt) sk->sk_data_ready = svsk->sk_odata; sk->sk_write_space = svsk->sk_owspace; - if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) - wake_up_interruptible(sk_sleep(sk)); + wq = sk_sleep(sk); + if (wq && waitqueue_active(wq)) + wake_up_interruptible(wq); } /* diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index 9df1eadc912..1a10dcd999e 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -1335,6 +1335,7 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, p, 0, length, DMA_FROM_DEVICE); if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) { put_page(p); + svc_rdma_put_context(ctxt, 1); return; } atomic_inc(&xprt->sc_dma_used); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index c431f5a5796..be96d429b47 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1631,7 +1631,8 @@ static struct socket *xs_create_sock(struct rpc_xprt *xprt, } xs_reclassify_socket(family, sock); - if (xs_bind(transport, sock)) { + err = xs_bind(transport, sock); + if (err) { sock_release(sock); goto out; } |