summaryrefslogtreecommitdiffstats
path: root/net/sunrpc/sched.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/sched.c')
-rw-r--r--net/sunrpc/sched.c209
1 files changed, 60 insertions, 149 deletions
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 944d75396fb..2ac43c41c3a 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -25,7 +25,6 @@
#ifdef RPC_DEBUG
#define RPCDBG_FACILITY RPCDBG_SCHED
#define RPC_TASK_MAGIC_ID 0xf00baa
-static int rpc_task_id;
#endif
/*
@@ -40,7 +39,6 @@ static mempool_t *rpc_task_mempool __read_mostly;
static mempool_t *rpc_buffer_mempool __read_mostly;
static void __rpc_default_timer(struct rpc_task *task);
-static void rpciod_killall(void);
static void rpc_async_schedule(struct work_struct *);
static void rpc_release_task(struct rpc_task *task);
@@ -50,23 +48,13 @@ static void rpc_release_task(struct rpc_task *task);
static RPC_WAITQ(delay_queue, "delayq");
/*
- * All RPC tasks are linked into this list
- */
-static LIST_HEAD(all_tasks);
-
-/*
* rpciod-related stuff
*/
static DEFINE_MUTEX(rpciod_mutex);
-static unsigned int rpciod_users;
+static atomic_t rpciod_users = ATOMIC_INIT(0);
struct workqueue_struct *rpciod_workqueue;
/*
- * Spinlock for other critical sections of code.
- */
-static DEFINE_SPINLOCK(rpc_sched_lock);
-
-/*
* Disable the timer for a given RPC task. Should be called with
* queue->lock and bh_disabled in order to avoid races within
* rpc_run_timer().
@@ -267,18 +255,33 @@ static int rpc_wait_bit_interruptible(void *word)
return 0;
}
+#ifdef RPC_DEBUG
+static void rpc_task_set_debuginfo(struct rpc_task *task)
+{
+ static atomic_t rpc_pid;
+
+ task->tk_magic = RPC_TASK_MAGIC_ID;
+ task->tk_pid = atomic_inc_return(&rpc_pid);
+}
+#else
+static inline void rpc_task_set_debuginfo(struct rpc_task *task)
+{
+}
+#endif
+
static void rpc_set_active(struct rpc_task *task)
{
+ struct rpc_clnt *clnt;
if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0)
return;
- spin_lock(&rpc_sched_lock);
-#ifdef RPC_DEBUG
- task->tk_magic = RPC_TASK_MAGIC_ID;
- task->tk_pid = rpc_task_id++;
-#endif
+ rpc_task_set_debuginfo(task);
/* Add to global list of all tasks */
- list_add_tail(&task->tk_task, &all_tasks);
- spin_unlock(&rpc_sched_lock);
+ clnt = task->tk_client;
+ if (clnt != NULL) {
+ spin_lock(&clnt->cl_lock);
+ list_add_tail(&task->tk_task, &clnt->cl_tasks);
+ spin_unlock(&clnt->cl_lock);
+ }
}
/*
@@ -818,6 +821,7 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, cons
if (tk_ops->rpc_call_prepare != NULL)
task->tk_action = rpc_prepare_task;
task->tk_calldata = calldata;
+ INIT_LIST_HEAD(&task->tk_task);
/* Initialize retry counters */
task->tk_garb_retry = 2;
@@ -830,7 +834,7 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, cons
task->tk_workqueue = rpciod_workqueue;
if (clnt) {
- atomic_inc(&clnt->cl_users);
+ kref_get(&clnt->cl_kref);
if (clnt->cl_softrtry)
task->tk_flags |= RPC_TASK_SOFT;
if (!clnt->cl_intr)
@@ -860,9 +864,7 @@ static void rpc_free_task(struct rcu_head *rcu)
}
/*
- * Create a new task for the specified client. We have to
- * clean up after an allocation failure, as the client may
- * have specified "oneshot".
+ * Create a new task for the specified client.
*/
struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
{
@@ -870,7 +872,7 @@ struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc
task = rpc_alloc_task();
if (!task)
- goto cleanup;
+ goto out;
rpc_init_task(task, clnt, flags, tk_ops, calldata);
@@ -878,16 +880,6 @@ struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc
task->tk_flags |= RPC_TASK_DYNAMIC;
out:
return task;
-
-cleanup:
- /* Check whether to release the client */
- if (clnt) {
- printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
- atomic_read(&clnt->cl_users), clnt->cl_oneshot);
- atomic_inc(&clnt->cl_users); /* pretend we were used ... */
- rpc_release_client(clnt);
- }
- goto out;
}
@@ -920,11 +912,13 @@ static void rpc_release_task(struct rpc_task *task)
#endif
dprintk("RPC: %5u release task\n", task->tk_pid);
- /* Remove from global task list */
- spin_lock(&rpc_sched_lock);
- list_del(&task->tk_task);
- spin_unlock(&rpc_sched_lock);
-
+ if (!list_empty(&task->tk_task)) {
+ struct rpc_clnt *clnt = task->tk_client;
+ /* Remove from client task list */
+ spin_lock(&clnt->cl_lock);
+ list_del(&task->tk_task);
+ spin_unlock(&clnt->cl_lock);
+ }
BUG_ON (RPC_IS_QUEUED(task));
/* Synchronously delete any running timer */
@@ -939,29 +933,6 @@ static void rpc_release_task(struct rpc_task *task)
rpc_put_task(task);
}
-/**
- * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
- * @clnt: pointer to RPC client
- * @flags: RPC flags
- * @ops: RPC call ops
- * @data: user call data
- */
-struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
- const struct rpc_call_ops *ops,
- void *data)
-{
- struct rpc_task *task;
- task = rpc_new_task(clnt, flags, ops, data);
- if (task == NULL) {
- rpc_release_calldata(ops, data);
- return ERR_PTR(-ENOMEM);
- }
- atomic_inc(&task->tk_count);
- rpc_execute(task);
- return task;
-}
-EXPORT_SYMBOL(rpc_run_task);
-
/*
* Kill all tasks for the given client.
* XXX: kill their descendants as well?
@@ -969,44 +940,25 @@ EXPORT_SYMBOL(rpc_run_task);
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
struct rpc_task *rovr;
- struct list_head *le;
- dprintk("RPC: killing all tasks for client %p\n", clnt);
+ if (list_empty(&clnt->cl_tasks))
+ return;
+ dprintk("RPC: killing all tasks for client %p\n", clnt);
/*
* Spin lock all_tasks to prevent changes...
*/
- spin_lock(&rpc_sched_lock);
- alltask_for_each(rovr, le, &all_tasks) {
+ spin_lock(&clnt->cl_lock);
+ list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
if (! RPC_IS_ACTIVATED(rovr))
continue;
- if (!clnt || rovr->tk_client == clnt) {
+ if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
rovr->tk_flags |= RPC_TASK_KILLED;
rpc_exit(rovr, -EIO);
rpc_wake_up_task(rovr);
}
}
- spin_unlock(&rpc_sched_lock);
-}
-
-static void rpciod_killall(void)
-{
- unsigned long flags;
-
- while (!list_empty(&all_tasks)) {
- clear_thread_flag(TIF_SIGPENDING);
- rpc_killall_tasks(NULL);
- flush_workqueue(rpciod_workqueue);
- if (!list_empty(&all_tasks)) {
- dprintk("RPC: rpciod_killall: waiting for tasks "
- "to exit\n");
- yield();
- }
- }
-
- spin_lock_irqsave(&current->sighand->siglock, flags);
- recalc_sigpending();
- spin_unlock_irqrestore(&current->sighand->siglock, flags);
+ spin_unlock(&clnt->cl_lock);
}
/*
@@ -1018,28 +970,27 @@ rpciod_up(void)
struct workqueue_struct *wq;
int error = 0;
+ if (atomic_inc_not_zero(&rpciod_users))
+ return 0;
+
mutex_lock(&rpciod_mutex);
- dprintk("RPC: rpciod_up: users %u\n", rpciod_users);
- rpciod_users++;
- if (rpciod_workqueue)
- goto out;
- /*
- * If there's no pid, we should be the first user.
- */
- if (rpciod_users > 1)
- printk(KERN_WARNING "rpciod_up: no workqueue, %u users??\n", rpciod_users);
+
+ /* Guard against races with rpciod_down() */
+ if (rpciod_workqueue != NULL)
+ goto out_ok;
/*
* Create the rpciod thread and wait for it to start.
*/
+ dprintk("RPC: creating workqueue rpciod\n");
error = -ENOMEM;
wq = create_workqueue("rpciod");
- if (wq == NULL) {
- printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
- rpciod_users--;
+ if (wq == NULL)
goto out;
- }
+
rpciod_workqueue = wq;
error = 0;
+out_ok:
+ atomic_inc(&rpciod_users);
out:
mutex_unlock(&rpciod_mutex);
return error;
@@ -1048,59 +999,19 @@ out:
void
rpciod_down(void)
{
+ if (!atomic_dec_and_test(&rpciod_users))
+ return;
+
mutex_lock(&rpciod_mutex);
- dprintk("RPC: rpciod_down sema %u\n", rpciod_users);
- if (rpciod_users) {
- if (--rpciod_users)
- goto out;
- } else
- printk(KERN_WARNING "rpciod_down: no users??\n");
+ dprintk("RPC: destroying workqueue rpciod\n");
- if (!rpciod_workqueue) {
- dprintk("RPC: rpciod_down: Nothing to do!\n");
- goto out;
+ if (atomic_read(&rpciod_users) == 0 && rpciod_workqueue != NULL) {
+ destroy_workqueue(rpciod_workqueue);
+ rpciod_workqueue = NULL;
}
- rpciod_killall();
-
- destroy_workqueue(rpciod_workqueue);
- rpciod_workqueue = NULL;
- out:
mutex_unlock(&rpciod_mutex);
}
-#ifdef RPC_DEBUG
-void rpc_show_tasks(void)
-{
- struct list_head *le;
- struct rpc_task *t;
-
- spin_lock(&rpc_sched_lock);
- if (list_empty(&all_tasks)) {
- spin_unlock(&rpc_sched_lock);
- return;
- }
- printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
- "-rpcwait -action- ---ops--\n");
- alltask_for_each(t, le, &all_tasks) {
- const char *rpc_waitq = "none";
-
- if (RPC_IS_QUEUED(t))
- rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
-
- printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
- t->tk_pid,
- (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
- t->tk_flags, t->tk_status,
- t->tk_client,
- (t->tk_client ? t->tk_client->cl_prog : 0),
- t->tk_rqstp, t->tk_timeout,
- rpc_waitq,
- t->tk_action, t->tk_ops);
- }
- spin_unlock(&rpc_sched_lock);
-}
-#endif
-
void
rpc_destroy_mempool(void)
{