diff options
Diffstat (limited to 'net/sunrpc/sched.c')
-rw-r--r-- | net/sunrpc/sched.c | 106 |
1 files changed, 69 insertions, 37 deletions
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 243fc09b164..ffb687671da 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -252,23 +252,37 @@ static void rpc_set_active(struct rpc_task *task) /* * Mark an RPC call as having completed by clearing the 'active' bit + * and then waking up all tasks that were sleeping. */ -static void rpc_mark_complete_task(struct rpc_task *task) +static int rpc_complete_task(struct rpc_task *task) { - smp_mb__before_clear_bit(); + void *m = &task->tk_runstate; + wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE); + struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE); + unsigned long flags; + int ret; + + spin_lock_irqsave(&wq->lock, flags); clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate); - smp_mb__after_clear_bit(); - wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE); + ret = atomic_dec_and_test(&task->tk_count); + if (waitqueue_active(wq)) + __wake_up_locked_key(wq, TASK_NORMAL, &k); + spin_unlock_irqrestore(&wq->lock, flags); + return ret; } /* * Allow callers to wait for completion of an RPC call + * + * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit() + * to enforce taking of the wq->lock and hence avoid races with + * rpc_complete_task(). */ int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *)) { if (action == NULL) action = rpc_wait_bit_killable; - return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE, + return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE, action, TASK_KILLABLE); } EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task); @@ -285,15 +299,8 @@ static void rpc_make_runnable(struct rpc_task *task) if (rpc_test_and_set_running(task)) return; if (RPC_IS_ASYNC(task)) { - int status; - INIT_WORK(&task->u.tk_work, rpc_async_schedule); - status = queue_work(rpciod_workqueue, &task->u.tk_work); - if (status < 0) { - printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); - task->tk_status = status; - return; - } + queue_work(rpciod_workqueue, &task->u.tk_work); } else wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED); } @@ -623,14 +630,12 @@ static void __rpc_execute(struct rpc_task *task) save_callback = task->tk_callback; task->tk_callback = NULL; save_callback(task); - } - - /* - * Perform the next FSM step. - * tk_action may be NULL when the task has been killed - * by someone else. - */ - if (!RPC_IS_QUEUED(task)) { + } else { + /* + * Perform the next FSM step. + * tk_action may be NULL when the task has been killed + * by someone else. + */ if (task->tk_action == NULL) break; task->tk_action(task); @@ -829,12 +834,6 @@ struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data) } rpc_init_task(task, setup_data); - if (task->tk_status < 0) { - int err = task->tk_status; - rpc_put_task(task); - return ERR_PTR(err); - } - task->tk_flags |= flags; dprintk("RPC: allocated task %p\n", task); return task; @@ -857,34 +856,67 @@ static void rpc_async_release(struct work_struct *work) rpc_free_task(container_of(work, struct rpc_task, u.tk_work)); } -void rpc_put_task(struct rpc_task *task) +static void rpc_release_resources_task(struct rpc_task *task) { - if (!atomic_dec_and_test(&task->tk_count)) - return; - /* Release resources */ if (task->tk_rqstp) xprt_release(task); if (task->tk_msg.rpc_cred) put_rpccred(task->tk_msg.rpc_cred); rpc_task_release_client(task); - if (task->tk_workqueue != NULL) { +} + +static void rpc_final_put_task(struct rpc_task *task, + struct workqueue_struct *q) +{ + if (q != NULL) { INIT_WORK(&task->u.tk_work, rpc_async_release); - queue_work(task->tk_workqueue, &task->u.tk_work); + queue_work(q, &task->u.tk_work); } else rpc_free_task(task); } + +static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q) +{ + if (atomic_dec_and_test(&task->tk_count)) { + rpc_release_resources_task(task); + rpc_final_put_task(task, q); + } +} + +void rpc_put_task(struct rpc_task *task) +{ + rpc_do_put_task(task, NULL); +} EXPORT_SYMBOL_GPL(rpc_put_task); +void rpc_put_task_async(struct rpc_task *task) +{ + rpc_do_put_task(task, task->tk_workqueue); +} +EXPORT_SYMBOL_GPL(rpc_put_task_async); + static void rpc_release_task(struct rpc_task *task) { dprintk("RPC: %5u release task\n", task->tk_pid); BUG_ON (RPC_IS_QUEUED(task)); - /* Wake up anyone who is waiting for task completion */ - rpc_mark_complete_task(task); + rpc_release_resources_task(task); - rpc_put_task(task); + /* + * Note: at this point we have been removed from rpc_clnt->cl_tasks, + * so it should be safe to use task->tk_count as a test for whether + * or not any other processes still hold references to our rpc_task. + */ + if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) { + /* Wake up anyone who may be waiting for task completion */ + if (!rpc_complete_task(task)) + return; + } else { + if (!atomic_dec_and_test(&task->tk_count)) + return; + } + rpc_final_put_task(task, task->tk_workqueue); } int rpciod_up(void) @@ -908,7 +940,7 @@ static int rpciod_start(void) * Create the rpciod thread and wait for it to start. */ dprintk("RPC: creating workqueue rpciod\n"); - wq = alloc_workqueue("rpciod", WQ_RESCUER, 0); + wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0); rpciod_workqueue = wq; return rpciod_workqueue != NULL; } |