diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2010-08-07 12:42:58 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2010-08-07 12:42:58 -0700 |
commit | 3b7433b8a8a83c87972065b1852b7dcae691e464 (patch) | |
tree | 93fa2c003f8baef5ab0733b53bac77961ed5240c /kernel | |
parent | 4a386c3e177ca2fbc70c9283d0b46537844763a0 (diff) | |
parent | 6ee0578b4daaea01c96b172c6aacca43fd9807a6 (diff) |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq: (55 commits)
workqueue: mark init_workqueues() as early_initcall()
workqueue: explain for_each_*cwq_cpu() iterators
fscache: fix build on !CONFIG_SYSCTL
slow-work: kill it
gfs2: use workqueue instead of slow-work
drm: use workqueue instead of slow-work
cifs: use workqueue instead of slow-work
fscache: drop references to slow-work
fscache: convert operation to use workqueue instead of slow-work
fscache: convert object to use workqueue instead of slow-work
workqueue: fix how cpu number is stored in work->data
workqueue: fix mayday_mask handling on UP
workqueue: fix build problem on !CONFIG_SMP
workqueue: fix locking in retry path of maybe_create_worker()
async: use workqueue for worker pool
workqueue: remove WQ_SINGLE_CPU and use WQ_UNBOUND instead
workqueue: implement unbound workqueue
workqueue: prepare for WQ_UNBOUND implementation
libata: take advantage of cmwq and remove concurrency limitations
workqueue: fix worker management invocation without pending works
...
Fixed up conflicts in fs/cifs/* as per Tejun. Other trivial conflicts in
include/linux/workqueue.h, kernel/trace/Kconfig and kernel/workqueue.c
Diffstat (limited to 'kernel')
-rw-r--r-- | kernel/Makefile | 2 | ||||
-rw-r--r-- | kernel/async.c | 141 | ||||
-rw-r--r-- | kernel/kthread.c | 164 | ||||
-rw-r--r-- | kernel/power/process.c | 21 | ||||
-rw-r--r-- | kernel/slow-work-debugfs.c | 227 | ||||
-rw-r--r-- | kernel/slow-work.c | 1068 | ||||
-rw-r--r-- | kernel/slow-work.h | 72 | ||||
-rw-r--r-- | kernel/sysctl.c | 8 | ||||
-rw-r--r-- | kernel/trace/Kconfig | 11 | ||||
-rw-r--r-- | kernel/workqueue.c | 3160 | ||||
-rw-r--r-- | kernel/workqueue_sched.h | 13 |
11 files changed, 2963 insertions, 1924 deletions
diff --git a/kernel/Makefile b/kernel/Makefile index ce53fb2bd1d..c53e491e25a 100644 --- a/kernel/Makefile +++ b/kernel/Makefile @@ -99,8 +99,6 @@ obj-$(CONFIG_TRACING) += trace/ obj-$(CONFIG_X86_DS) += trace/ obj-$(CONFIG_RING_BUFFER) += trace/ obj-$(CONFIG_SMP) += sched_cpupri.o -obj-$(CONFIG_SLOW_WORK) += slow-work.o -obj-$(CONFIG_SLOW_WORK_DEBUG) += slow-work-debugfs.o obj-$(CONFIG_PERF_EVENTS) += perf_event.o obj-$(CONFIG_HAVE_HW_BREAKPOINT) += hw_breakpoint.o obj-$(CONFIG_USER_RETURN_NOTIFIER) += user-return-notifier.o diff --git a/kernel/async.c b/kernel/async.c index 15319d6c18f..cd9dbb913c7 100644 --- a/kernel/async.c +++ b/kernel/async.c @@ -49,40 +49,33 @@ asynchronous and synchronous parts of the kernel. */ #include <linux/async.h> -#include <linux/bug.h> #include <linux/module.h> #include <linux/wait.h> #include <linux/sched.h> -#include <linux/init.h> -#include <linux/kthread.h> -#include <linux/delay.h> #include <linux/slab.h> +#include <linux/workqueue.h> #include <asm/atomic.h> static async_cookie_t next_cookie = 1; -#define MAX_THREADS 256 #define MAX_WORK 32768 static LIST_HEAD(async_pending); static LIST_HEAD(async_running); static DEFINE_SPINLOCK(async_lock); -static int async_enabled = 0; - struct async_entry { - struct list_head list; - async_cookie_t cookie; - async_func_ptr *func; - void *data; - struct list_head *running; + struct list_head list; + struct work_struct work; + async_cookie_t cookie; + async_func_ptr *func; + void *data; + struct list_head *running; }; static DECLARE_WAIT_QUEUE_HEAD(async_done); -static DECLARE_WAIT_QUEUE_HEAD(async_new); static atomic_t entry_count; -static atomic_t thread_count; extern int initcall_debug; @@ -117,27 +110,23 @@ static async_cookie_t lowest_in_progress(struct list_head *running) spin_unlock_irqrestore(&async_lock, flags); return ret; } + /* * pick the first pending entry and run it */ -static void run_one_entry(void) +static void async_run_entry_fn(struct work_struct *work) { + struct async_entry *entry = + container_of(work, struct async_entry, work); unsigned long flags; - struct async_entry *entry; ktime_t calltime, delta, rettime; - /* 1) pick one task from the pending queue */ - + /* 1) move self to the running queue */ spin_lock_irqsave(&async_lock, flags); - if (list_empty(&async_pending)) - goto out; - entry = list_first_entry(&async_pending, struct async_entry, list); - - /* 2) move it to the running queue */ list_move_tail(&entry->list, entry->running); spin_unlock_irqrestore(&async_lock, flags); - /* 3) run it (and print duration)*/ + /* 2) run (and print duration) */ if (initcall_debug && system_state == SYSTEM_BOOTING) { printk("calling %lli_%pF @ %i\n", (long long)entry->cookie, entry->func, task_pid_nr(current)); @@ -153,31 +142,25 @@ static void run_one_entry(void) (long long)ktime_to_ns(delta) >> 10); } - /* 4) remove it from the running queue */ + /* 3) remove self from the running queue */ spin_lock_irqsave(&async_lock, flags); list_del(&entry->list); - /* 5) free the entry */ + /* 4) free the entry */ kfree(entry); atomic_dec(&entry_count); spin_unlock_irqrestore(&async_lock, flags); - /* 6) wake up any waiters. */ + /* 5) wake up any waiters */ wake_up(&async_done); - return; - -out: - spin_unlock_irqrestore(&async_lock, flags); } - static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct list_head *running) { struct async_entry *entry; unsigned long flags; async_cookie_t newcookie; - /* allow irq-off callers */ entry = kzalloc(sizeof(struct async_entry), GFP_ATOMIC); @@ -186,7 +169,7 @@ static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct l * If we're out of memory or if there's too much work * pending already, we execute synchronously. */ - if (!async_enabled || !entry || atomic_read(&entry_count) > MAX_WORK) { + if (!entry || atomic_read(&entry_count) > MAX_WORK) { kfree(entry); spin_lock_irqsave(&async_lock, flags); newcookie = next_cookie++; @@ -196,6 +179,7 @@ static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct l ptr(data, newcookie); return newcookie; } + INIT_WORK(&entry->work, async_run_entry_fn); entry->func = ptr; entry->data = data; entry->running = running; @@ -205,7 +189,10 @@ static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct l list_add_tail(&entry->list, &async_pending); atomic_inc(&entry_count); spin_unlock_irqrestore(&async_lock, flags); - wake_up(&async_new); + + /* schedule for execution */ + queue_work(system_unbound_wq, &entry->work); + return newcookie; } @@ -312,87 +299,3 @@ void async_synchronize_cookie(async_cookie_t cookie) async_synchronize_cookie_domain(cookie, &async_running); } EXPORT_SYMBOL_GPL(async_synchronize_cookie); - - -static int async_thread(void *unused) -{ - DECLARE_WAITQUEUE(wq, current); - add_wait_queue(&async_new, &wq); - - while (!kthread_should_stop()) { - int ret = HZ; - set_current_state(TASK_INTERRUPTIBLE); - /* - * check the list head without lock.. false positives - * are dealt with inside run_one_entry() while holding - * the lock. - */ - rmb(); - if (!list_empty(&async_pending)) - run_one_entry(); - else - ret = schedule_timeout(HZ); - - if (ret == 0) { - /* - * we timed out, this means we as thread are redundant. - * we sign off and die, but we to avoid any races there - * is a last-straw check to see if work snuck in. - */ - atomic_dec(&thread_count); - wmb(); /* manager must see our departure first */ - if (list_empty(&async_pending)) - break; - /* - * woops work came in between us timing out and us - * signing off; we need to stay alive and keep working. - */ - atomic_inc(&thread_count); - } - } - remove_wait_queue(&async_new, &wq); - - return 0; -} - -static int async_manager_thread(void *unused) -{ - DECLARE_WAITQUEUE(wq, current); - add_wait_queue(&async_new, &wq); - - while (!kthread_should_stop()) { - int tc, ec; - - set_current_state(TASK_INTERRUPTIBLE); - - tc = atomic_read(&thread_count); - rmb(); - ec = atomic_read(&entry_count); - - while (tc < ec && tc < MAX_THREADS) { - if (IS_ERR(kthread_run(async_thread, NULL, "async/%i", - tc))) { - msleep(100); - continue; - } - atomic_inc(&thread_count); - tc++; - } - - schedule(); - } - remove_wait_queue(&async_new, &wq); - - return 0; -} - -static int __init async_init(void) -{ - async_enabled = - !IS_ERR(kthread_run(async_manager_thread, NULL, "async/mgr")); - - WARN_ON(!async_enabled); - return 0; -} - -core_initcall(async_init); diff --git a/kernel/kthread.c b/kernel/kthread.c index 83911c78017..2dc3786349d 100644 --- a/kernel/kthread.c +++ b/kernel/kthread.c @@ -14,6 +14,8 @@ #include <linux/file.h> #include <linux/module.h> #include <linux/mutex.h> +#include <linux/slab.h> +#include <linux/freezer.h> #include <trace/events/sched.h> static DEFINE_SPINLOCK(kthread_create_lock); @@ -35,6 +37,7 @@ struct kthread_create_info struct kthread { int should_stop; + void *data; struct completion exited; }; @@ -54,6 +57,19 @@ int kthread_should_stop(void) } EXPORT_SYMBOL(kthread_should_stop); +/** + * kthread_data - return data value specified on kthread creation + * @task: kthread task in question + * + * Return the data value specified when kthread @task was created. + * The caller is responsible for ensuring the validity of @task when + * calling this function. + */ +void *kthread_data(struct task_struct *task) +{ + return to_kthread(task)->data; +} + static int kthread(void *_create) { /* Copy data: it's on kthread's stack */ @@ -64,6 +80,7 @@ static int kthread(void *_create) int ret; self.should_stop = 0; + self.data = data; init_completion(&self.exited); current->vfork_done = &self.exited; @@ -247,3 +264,150 @@ int kthreadd(void *unused) return 0; } + +/** + * kthread_worker_fn - kthread function to process kthread_worker + * @worker_ptr: pointer to initialized kthread_worker + * + * This function can be used as @threadfn to kthread_create() or + * kthread_run() with @worker_ptr argument pointing to an initialized + * kthread_worker. The started kthread will process work_list until + * the it is stopped with kthread_stop(). A kthread can also call + * this function directly after extra initialization. + * + * Different kthreads can be used for the same kthread_worker as long + * as there's only one kthread attached to it at any given time. A + * kthread_worker without an attached kthread simply collects queued + * kthread_works. + */ +int kthread_worker_fn(void *worker_ptr) +{ + struct kthread_worker *worker = worker_ptr; + struct kthread_work *work; + + WARN_ON(worker->task); + worker->task = current; +repeat: + set_current_state(TASK_INTERRUPTIBLE); /* mb paired w/ kthread_stop */ + + if (kthread_should_stop()) { + __set_current_state(TASK_RUNNING); + spin_lock_irq(&worker->lock); + worker->task = NULL; + spin_unlock_irq(&worker->lock); + return 0; + } + + work = NULL; + spin_lock_irq(&worker->lock); + if (!list_empty(&worker->work_list)) { + work = list_first_entry(&worker->work_list, + struct kthread_work, node); + list_del_init(&work->node); + } + spin_unlock_irq(&worker->lock); + + if (work) { + __set_current_state(TASK_RUNNING); + work->func(work); + smp_wmb(); /* wmb worker-b0 paired with flush-b1 */ + work->done_seq = work->queue_seq; + smp_mb(); /* mb worker-b1 paired with flush-b0 */ + if (atomic_read(&work->flushing)) + wake_up_all(&work->done); + } else if (!freezing(current)) + schedule(); + + try_to_freeze(); + goto repeat; +} +EXPORT_SYMBOL_GPL(kthread_worker_fn); + +/** + * queue_kthread_work - queue a kthread_work + * @worker: target kthread_worker + * @work: kthread_work to queue + * + * Queue @work to work processor @task for async execution. @task + * must have been created with kthread_worker_create(). Returns %true + * if @work was successfully queued, %false if it was already pending. + */ +bool queue_kthread_work(struct kthread_worker *worker, + struct kthread_work *work) +{ + bool ret = false; + unsigned long flags; + + spin_lock_irqsave(&worker->lock, flags); + if (list_empty(&work->node)) { + list_add_tail(&work->node, &worker->work_list); + work->queue_seq++; + if (likely(worker->task)) + wake_up_process(worker->task); + ret = true; + } + spin_unlock_irqrestore(&worker->lock, flags); + return ret; +} +EXPORT_SYMBOL_GPL(queue_kthread_work); + +/** + * flush_kthread_work - flush a kthread_work + * @work: work to flush + * + * If @work is queued or executing, wait for it to finish execution. + */ +void flush_kthread_work(struct kthread_work *work) +{ + int seq = work->queue_seq; + + atomic_inc(&work->flushing); + + /* + * mb flush-b0 paired with worker-b1, to make sure either + * worker sees the above increment or we see done_seq update. + */ + smp_mb__after_atomic_inc(); + + /* A - B <= 0 tests whether B is in front of A regardless of overflow */ + wait_event(work->done, seq - work->done_seq <= 0); + atomic_dec(&work->flushing); + + /* + * rmb flush-b1 paired with worker-b0, to make sure our caller + * sees every change made by work->func(). + */ + smp_mb__after_atomic_dec(); +} +EXPORT_SYMBOL_GPL(flush_kthread_work); + +struct kthread_flush_work { + struct kthread_work work; + struct completion done; +}; + +static void kthread_flush_work_fn(struct kthread_work *work) +{ + struct kthread_flush_work *fwork = + container_of(work, struct kthread_flush_work, work); + complete(&fwork->done); +} + +/** + * flush_kthread_worker - flush all current works on a kthread_worker + * @worker: worker to flush + * + * Wait until all currently executing or pending works on @worker are + * finished. + */ +void flush_kthread_worker(struct kthread_worker *worker) +{ + struct kthread_flush_work fwork = { + KTHREAD_WORK_INIT(fwork.work, kthread_flush_work_fn), + COMPLETION_INITIALIZER_ONSTACK(fwork.done), + }; + + queue_kthread_work(worker, &fwork.work); + wait_for_completion(&fwork.done); +} +EXPORT_SYMBOL_GPL(flush_kthread_worker); diff --git a/kernel/power/process.c b/kernel/power/process.c index 71ae29052ab..028a99598f4 100644 --- a/kernel/power/process.c +++ b/kernel/power/process.c @@ -15,6 +15,7 @@ #include <linux/syscalls.h> #include <linux/freezer.h> #include <linux/delay.h> +#include <linux/workqueue.h> /* * Timeout for stopping processes @@ -35,6 +36,7 @@ static int try_to_freeze_tasks(bool sig_only) struct task_struct *g, *p; unsigned long end_time; unsigned int todo; + bool wq_busy = false; struct timeval start, end; u64 elapsed_csecs64; unsigned int elapsed_csecs; @@ -42,6 +44,10 @@ static int try_to_freeze_tasks(bool sig_only) do_gettimeofday(&start); end_time = jiffies + TIMEOUT; + + if (!sig_only) + freeze_workqueues_begin(); + while (true) { todo = 0; read_lock(&tasklist_lock); @@ -63,6 +69,12 @@ static int try_to_freeze_tasks(bool sig_only) todo++; } while_each_thread(g, p); read_unlock(&tasklist_lock); + + if (!sig_only) { + wq_busy = freeze_workqueues_busy(); + todo += wq_busy; + } + if (!todo || time_after(jiffies, end_time)) break; @@ -86,8 +98,12 @@ static int try_to_freeze_tasks(bool sig_only) */ printk("\n"); printk(KERN_ERR "Freezing of tasks failed after %d.%02d seconds " - "(%d tasks refusing to freeze):\n", - elapsed_csecs / 100, elapsed_csecs % 100, todo); + "(%d tasks refusing to freeze, wq_busy=%d):\n", + elapsed_csecs / 100, elapsed_csecs % 100, + todo - wq_busy, wq_busy); + + thaw_workqueues(); + read_lock(&tasklist_lock); do_each_thread(g, p) { task_lock(p); @@ -157,6 +173,7 @@ void thaw_processes(void) oom_killer_enable(); printk("Restarting tasks ... "); + thaw_workqueues(); thaw_tasks(true); thaw_tasks(false); schedule(); diff --git a/kernel/slow-work-debugfs.c b/kernel/slow-work-debugfs.c deleted file mode 100644 index e45c4364529..00000000000 --- a/kernel/slow-work-debugfs.c +++ /dev/null @@ -1,227 +0,0 @@ -/* Slow work debugging - * - * Copyright (C) 2009 Red Hat, Inc. All Rights Reserved. - * Written by David Howells (dhowells@redhat.com) - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public Licence - * as published by the Free Software Foundation; either version - * 2 of the Licence, or (at your option) any later version. - */ - -#include <linux/module.h> -#include <linux/slow-work.h> -#include <linux/fs.h> -#include <linux/time.h> -#include <linux/seq_file.h> -#include "slow-work.h" - -#define ITERATOR_SHIFT (BITS_PER_LONG - 4) -#define ITERATOR_SELECTOR (0xfUL << ITERATOR_SHIFT) -#define ITERATOR_COUNTER (~ITERATOR_SELECTOR) - -void slow_work_new_thread_desc(struct slow_work *work, struct seq_file *m) -{ - seq_puts(m, "Slow-work: New thread"); -} - -/* - * Render the time mark field on a work item into a 5-char time with units plus - * a space - */ -static void slow_work_print_mark(struct seq_file *m, struct slow_work *work) -{ - struct timespec now, diff; - - now = CURRENT_TIME; - diff = timespec_sub(now, work->mark); - - if (diff.tv_sec < 0) - seq_puts(m, " -ve "); - else if (diff.tv_sec == 0 && diff.tv_nsec < 1000) - seq_printf(m, "%3luns ", diff.tv_nsec); - else if (diff.tv_sec == 0 && diff.tv_nsec < 1000000) - seq_printf(m, "%3luus ", diff.tv_nsec / 1000); - else if (diff.tv_sec == 0 && diff.tv_nsec < 1000000000) - seq_printf(m, "%3lums ", diff.tv_nsec / 1000000); - else if (diff.tv_sec <= 1) - seq_puts(m, " 1s "); - else if (diff.tv_sec < 60) - seq_printf(m, "%4lus ", diff.tv_sec); - else if (diff.tv_sec < 60 * 60) - seq_printf(m, "%4lum ", diff.tv_sec / 60); - else if (diff.tv_sec < 60 * 60 * 24) - seq_printf(m, "%4luh ", diff.tv_sec / 3600); - else - seq_puts(m, "exces "); -} - -/* - * Describe a slow work item for debugfs - */ -static int slow_work_runqueue_show(struct seq_file *m, void *v) -{ - struct slow_work *work; - struct list_head *p = v; - unsigned long id; - - switch ((unsigned long) v) { - case 1: - seq_puts(m, "THR PID ITEM ADDR FL MARK DESC\n"); - return 0; - case 2: - seq_puts(m, "=== ===== ================ == ===== ==========\n"); - return 0; - - case 3 ... 3 + SLOW_WORK_THREAD_LIMIT - 1: - id = (unsigned long) v - 3; - - read_lock(&slow_work_execs_lock); - work = slow_work_execs[id]; - if (work) { - smp_read_barrier_depends(); - - seq_printf(m, "%3lu %5d %16p %2lx ", - id, slow_work_pids[id], work, work->flags); - slow_work_print_mark(m, work); - - if (work->ops->desc) - work->ops->desc(work, m); - seq_putc(m, '\n'); - } - read_unlock(&slow_work_execs_lock); - return 0; - - default: - work = list_entry(p, struct slow_work, link); - seq_printf(m, "%3s - %16p %2lx ", - work->flags & SLOW_WORK_VERY_SLOW ? "vsq" : "sq", - work, work->flags); - slow_work_print_mark(m, work); - - if (work->ops->desc) - work->ops->desc(work, m); - seq_putc(m, '\n'); - return 0; - } -} - -/* - * map the iterator to a work item - */ -static void *slow_work_runqueue_index(struct seq_file *m, loff_t *_pos) -{ - struct list_head *p; - unsigned long count, id; - - switch (*_pos >> ITERATOR_SHIFT) { - case 0x0: - if (*_pos == 0) - *_pos = 1; - if (*_pos < 3) - return (void *)(unsigned long) *_pos; - if (*_pos < 3 + SLOW_WORK_THREAD_LIMIT) - for (id = *_pos - 3; - id < SLOW_WORK_THREAD_LIMIT; - id++, (*_pos)++) - if (slow_work_execs[id]) - return (void *)(unsigned long) *_pos; - *_pos = 0x1UL << ITERATOR_SHIFT; - - case 0x1: - count = *_pos & ITERATOR_COUNTER; - list_for_each(p, &slow_work_queue) { - if (count == 0) - return p; - count--; - } - *_pos = 0x2UL << ITERATOR_SHIFT; - - case 0x2: - count = *_pos & ITERATOR_COUNTER; - list_for_each(p, &vslow_work_queue) { - if (count == 0) - return p; - count--; - } - *_pos = 0x3UL << ITERATOR_SHIFT; - - default: - return NULL; - } -} - -/* - * set up the iterator to start reading from the first line - */ -static void *slow_work_runqueue_start(struct seq_file *m, loff_t *_pos) -{ - spin_lock_irq(&slow_work_queue_lock); - return slow_work_runqueue_index(m, _pos); -} - -/* - * move to the next line - */ -static void *slow_work_runqueue_next(struct seq_file *m, void *v, loff_t *_pos) -{ - struct list_head *p = v; - unsigned long selector = *_pos >> ITERATOR_SHIFT; - - (*_pos)++; - switch (selector) { - case 0x0: - return slow_work_runqueue_index(m, _pos); - - case 0x1: - if (*_pos >> ITERATOR_SHIFT == 0x1) { - p = p->next; - if (p != &slow_work_queue) - return p; - } - *_pos = 0x2UL << ITERATOR_SHIFT; - p = &vslow_work_queue; - - case 0x2: - if (*_pos >> ITERATOR_SHIFT == 0x2) { - p = p->next; - if (p != &vslow_work_queue) - return p; - } - *_pos = 0x3UL << ITERATOR_SHIFT; - - default: - return NULL; - } -} - -/* - * clean up after reading - */ -static void slow_work_runqueue_stop(struct seq_file *m, void *v) -{ - spin_unlock_irq(&slow_work_queue_lock); -} - -static const struct seq_operations slow_work_runqueue_ops = { - .start = slow_work_runqueue_start, - .stop = slow_work_runqueue_stop, - .next = slow_work_runqueue_next, - .show = slow_work_runqueue_show, -}; - -/* - * open "/sys/kernel/debug/slow_work/runqueue" to list queue contents - */ -static int slow_work_runqueue_open(struct inode *inode, struct file *file) -{ - return seq_open(file, &slow_work_runqueue_ops); -} - -const struct file_operations slow_work_runqueue_fops = { - .owner = THIS_MODULE, - .open = slow_work_runqueue_open, - .read = seq_read, - .llseek = seq_lseek, - .release = seq_release, -}; diff --git a/kernel/slow-work.c b/kernel/slow-work.c deleted file mode 100644 index 7d3f4fa9ef4..00000000000 --- a/kernel/slow-work.c +++ /dev/null @@ -1,1068 +0,0 @@ -/* Worker thread pool for slow items, such as filesystem lookups or mkdirs - * - * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved. - * Written by David Howells (dhowells@redhat.com) - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public Licence - * as published by the Free Software Foundation; either version - * 2 of the Licence, or (at your option) any later version. - * - * See Documentation/slow-work.txt - */ - -#include <linux/module.h> -#include <linux/slow-work.h> -#include <linux/kthread.h> -#include <linux/freezer.h> -#include <linux/wait.h> -#include <linux/debugfs.h> -#include "slow-work.h" - -static void slow_work_cull_timeout(unsigned long); -static void slow_work_oom_timeout(unsigned long); - -#ifdef CONFIG_SYSCTL -static int slow_work_min_threads_sysctl(struct ctl_table *, int, - void __user *, size_t *, loff_t *); - -static int slow_work_max_threads_sysctl(struct ctl_table *, int , - void __user *, size_t *, loff_t *); -#endif - -/* - * The pool of threads has at least min threads in it as long as someone is - * using the facility, and may have as many as max. - * - * A portion of the pool may be processing very slow operations. - */ -static unsigned slow_work_min_threads = 2; -static unsigned slow_work_max_threads = 4; -static unsigned vslow_work_proportion = 50; /* % of threads that may process - * very slow work */ - -#ifdef CONFIG_SYSCTL -static const int slow_work_min_min_threads = 2; -static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT; -static const int slow_work_min_vslow = 1; -static const int slow_work_max_vslow = 99; - -ctl_table slow_work_sysctls[] = { - { - .procname = "min-threads", - .data = &slow_work_min_threads, - .maxlen = sizeof(unsigned), - .mode = 0644, - .proc_handler = slow_work_min_threads_sysctl, - .extra1 = (void *) &slow_work_min_min_threads, - .extra2 = &slow_work_max_threads, - }, - { - .procname = "max-threads", - .data = &slow_work_max_threads, - .maxlen = sizeof(unsigned), - .mode = 0644, - .proc_handler = slow_work_max_threads_sysctl, - .extra1 = &slow_work_min_threads, - .extra2 = (void *) &slow_work_max_max_threads, - }, - { - .procname = "vslow-percentage", - .data = &vslow_work_proportion, - .maxlen = sizeof(unsigned), - .mode = 0644, - .proc_handler = proc_dointvec_minmax, - .extra1 = (void *) &slow_work_min_vslow, - .extra2 = (void *) &slow_work_max_vslow, - }, - {} -}; -#endif - -/* - * The active state of the thread pool - */ -static atomic_t slow_work_thread_count; -static atomic_t vslow_work_executing_count; - -static bool slow_work_may_not_start_new_thread; -static bool slow_work_cull; /* cull a thread due to lack of activity */ -static DEFINE_TIMER(slow_work_cull_timer, slow_work_cull_timeout, 0, 0); -static DEFINE_TIMER(slow_work_oom_timer, slow_work_oom_timeout, 0, 0); -static struct slow_work slow_work_new_thread; /* new thread starter */ - -/* - * slow work ID allocation (use slow_work_queue_lock) - */ -static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT); - -/* - * Unregistration tracking to prevent put_ref() from disappearing during module - * unload - */ -#ifdef CONFIG_MODULES -static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT]; -static struct module *slow_work_unreg_module; -static struct slow_work *slow_work_unreg_work_item; -static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq); -static DEFINE_MUTEX(slow_work_unreg_sync_lock); - -static void slow_work_set_thread_processing(int id, struct slow_work *work) -{ - if (work) - slow_work_thread_processing[id] = work->owner; -} -static void slow_work_done_thread_processing(int id, struct slow_work *work) -{ - struct module *module = slow_work_thread_processing[id]; - - slow_work_thread_processing[id] = NULL; - smp_mb(); - if (slow_work_unreg_work_item == work || - slow_work_unreg_module == module) - wake_up_all(&slow_work_unreg_wq); -} -static void slow_work_clear_thread_processing(int id) -{ - slow_work_thread_processing[id] = NULL; -} -#else -static void slow_work_set_thread_processing(int id, struct slow_work *work) {} -static void slow_work_done_thread_processing(int id, struct slow_work *work) {} -static void slow_work_clear_thread_processing(int id) {} -#endif - -/* - * Data for tracking currently executing items for indication through /proc - */ -#ifdef CONFIG_SLOW_WORK_DEBUG -struct slow_work *slow_work_execs[SLOW_WORK_THREAD_LIMIT]; -pid_t slow_work_pids[SLOW_WORK_THREAD_LIMIT]; -DEFINE_RWLOCK(slow_work_execs_lock); -#endif - -/* - * The queues of work items and the lock governing access to them. These are - * shared between all the CPUs. It doesn't make sense to have per-CPU queues - * as the number of threads bears no relation to the number of CPUs. - * - * There are two queues of work items: one for slow work items, and one for - * very slow work items. - */ -LIST_HEAD(slow_work_queue); -LIST_HEAD(vslow_work_queue); -DEFINE_SPINLOCK(slow_work_queue_lock); - -/* - * The following are two wait queues that get pinged when a work item is placed - * on an empty queue. These allow work items that are hogging a thread by - * sleeping in a way that could be deferred to yield their thread and enqueue - * themselves. - */ -static DECLARE_WAIT_QUEUE_HEAD(slow_work_queue_waits_for_occupation); -static DECLARE_WAIT_QUEUE_HEAD(vslow_work_queue_waits_for_occupation); - -/* - * The thread controls. A variable used to signal to the threads that they - * should exit when the queue is empty, a waitqueue used by the threads to wait - * for signals, and a completion set by the last thread to exit. - */ -static bool slow_work_threads_should_exit; -static DECLARE_WAIT_QUEUE_HEAD(slow_work_thread_wq); -static DECLARE_COMPLETION(slow_work_last_thread_exited); - -/* - * The number of users of the thread pool and its lock. Whilst this is zero we - * have no threads hanging around, and when this reaches zero, we wait for all - * active or queued work items to complete and kill all the threads we do have. - */ -static int slow_work_user_count; -static DEFINE_MUTEX(slow_work_user_lock); - -static inline int slow_work_get_ref(struct slow_work *work) -{ - if (work->ops->get_ref) - return work->ops->get_ref(work); - - return 0; -} - -static inline void slow_work_put_ref(struct slow_work *work) -{ - if (work->ops->put_ref) - work->ops->put_ref(work); -} - -/* - * Calculate the maximum number of active threads in the pool that are - * permitted to process very slow work items. - * - * The answer is rounded up to at least 1, but may not equal or exceed the - * maximum number of the threads in the pool. This means we always have at - * least one thread that can process slow work items, and we always have at - * least one thread that won't get tied up doing so. - */ -static unsigned slow_work_calc_vsmax(void) -{ - unsigned vsmax; - - vsmax = atomic_read(&slow_work_thread_count) * vslow_work_proportion; - vsmax /= 100; - vsmax = max(vsmax, 1U); - return min(vsmax, slow_work_max_threads - 1); -} - -/* - * Attempt to execute stuff queued on a slow thread. Return true if we managed - * it, false if there was nothing to do. - */ -static noinline bool slow_work_execute(int id) -{ - struct slow_work *work = NULL; - unsigned vsmax; - bool very_slow; - - vsmax = slow_work_calc_vsmax(); - - /* see if we can schedule a new thread to be started if we're not - * keeping up with the work */ - if (!waitqueue_active(&slow_work_thread_wq) && - (!list_empty(&slow_work_queue) || !list_empty(&vslow_work_queue)) && - atomic_read(&slow_work_thread_count) < slow_work_max_threads && - !slow_work_may_not_start_new_thread) - slow_work_enqueue(&slow_work_new_thread); - - /* find something to execute */ - spin_lock_irq(&slow_work_queue_lock); - if (!list_empty(&vslow_work_queue) && - atomic_read(&vslow_work_executing_count) < vsmax) { - work = list_entry(vslow_work_queue.next, - struct slow_work, link); - if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags)) - BUG(); - list_del_init(&work->link); - atomic_inc(&vslow_work_executing_count); - very_slow = true; - } else if (!list_empty(&slow_work_queue)) { - work = list_entry(slow_work_queue.next, - struct slow_work, link); - if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags)) - BUG(); - list_del_init(&work->link); - very_slow = false; - } else { - very_slow = false; /* avoid the compiler warning */ - } - - slow_work_set_thread_processing(id, work); - if (work) { - slow_work_mark_time(work); - slow_work_begin_exec(id, work); - } - - spin_unlock_irq(&slow_work_queue_lock); - - if (!work) - return false; - - if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags)) - BUG(); - - /* don't execute if the work is in the process of being cancelled */ - if (!test_bit(SLOW_WORK_CANCELLING, &work->flags)) - work->ops->execute(work); - - if (very_slow) - atomic_dec(&vslow_work_executing_count); - clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags); - - /* wake up anyone waiting for this work to be complete */ - wake_up_bit(&work->flags, SLOW_WORK_EXECUTING); - - slow_work_end_exec(id, work); - - /* if someone tried to enqueue the item whilst we were executing it, - * then it'll be left unenqueued to avoid multiple threads trying to - * execute it simultaneously - * - * there is, however, a race between us testing the pending flag and - * getting the spinlock, and between the enqueuer setting the pending - * flag and getting the spinlock, so we use a deferral bit to tell us - * if the enqueuer got there first - */ - if (test_bit(SLOW_WORK_PENDING, &work->flags)) { - spin_lock_irq(&slow_work_queue_lock); - - if (!test_bit(SLOW_WORK_EXECUTING, &work->flags) && - test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags)) - goto auto_requeue; - - spin_unlock_irq(&slow_work_queue_lock); - } - - /* sort out the race between module unloading and put_ref() */ - slow_work_put_ref(work); - slow_work_done_thread_processing(id, work); - - return true; - -auto_requeue: - /* we must complete the enqueue operation - * - we transfer our ref on the item back to the appropriate queue - * - don't wake another thread up as we're awake already - */ - slow_work_mark_time(work); - if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) - list_add_tail(&work->link, &vslow_work_queue); - else - list_add_tail(&work->link, &slow_work_queue); - spin_unlock_irq(&slow_work_queue_lock); - slow_work_clear_thread_processing(id); - return true; -} - -/** - * slow_work_sleep_till_thread_needed - Sleep till thread needed by other work - * work: The work item under execution that wants to sleep - * _timeout: Scheduler sleep timeout - * - * Allow a requeueable work item to sleep on a slow-work processor thread until - * that thread is needed to do some other work or the sleep is interrupted by - * some other event. - * - * The caller must set up a wake up event before calling this and must have set - * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own - * condition before calling this function as no test is made here. - * - * False is returned if there is nothing on the queue; true is returned if the - * work item should be requeued - */ -bool slow_work_sleep_till_thread_needed(struct slow_work *work, - signed long *_timeout) -{ - wait_queue_head_t *wfo_wq; - struct list_head *queue; - - DEFINE_WAIT(wait); - - if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) { - wfo_wq = &vslow_work_queue_waits_for_occupation; - queue = &vslow_work_queue; - } else { - wfo_wq = &slow_work_queue_waits_for_occupation; - queue = &slow_work_queue; - } - - if (!list_empty(queue)) - return true; - - add_wait_queue_exclusive(wfo_wq, &wait); - if (list_empty(queue)) - *_timeout = schedule_timeout(*_timeout); - finish_wait(wfo_wq, &wait); - - return !list_empty(queue); -} -EXPORT_SYMBOL(slow_work_sleep_till_thread_needed); - -/** - * slow_work_enqueue - Schedule a slow work item for processing - * @work: The work item to queue - * - * Schedule a slow work item for processing. If the item is already undergoing - * execution, this guarantees not to re-enter the execution routine until the - * first execution finishes. - * - * The item is pinned by this function as it retains a reference to it, managed - * through the item operations. The item is unpinned once it has been - * executed. - * - * An item may hog the thread that is running it for a relatively large amount - * of time, sufficient, for example, to perform several lookup, mkdir, create - * and setxattr operations. It may sleep on I/O and may sleep to obtain locks. - * - * Conversely, if a number of items are awaiting processing, it may take some - * time before any given item is given attention. The number of threads in the - * pool may be increased to deal with demand, but only up to a limit. - * - * If SLOW_WORK_VERY_SLOW is set on the work item, then it will be placed in - * the very slow queue, from which only a portion of the threads will be - * allowed to pick items to execute. This ensures that very slow items won't - * overly block ones that are just ordinarily slow. - * - * Returns 0 if successful, -EAGAIN if not (or -ECANCELED if cancelled work is - * attempted queued) - */ -int slow_work_enqueue(struct slow_work *work) -{ - wait_queue_head_t *wfo_wq; - struct list_head *queue; - unsigned long flags; - int ret; - - if (test_bit(SLOW_WORK_CANCELLING, &work->flags)) - return -ECANCELED; - - BUG_ON(slow_work_user_count <= 0); - BUG_ON(!work); - BUG_ON(!work->ops); - - /* when honouring an enqueue request, we only promise that we will run - * the work function in the future; we do not promise to run it once - * per enqueue request - * - * we use the PENDING bit to merge together repeat requests without - * having to disable IRQs and take the spinlock, whilst still - * maintaining our promise - */ - if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) { - if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) { - wfo_wq = &vslow_work_queue_waits_for_occupation; - queue = &vslow_work_queue; - } else { - wfo_wq = &slow_work_queue_waits_for_occupation; - queue = &slow_work_queue; - } - - spin_lock_irqsave(&slow_work_queue_lock, flags); - - if (unlikely(test_bit(SLOW_WORK_CANCELLING, &work->flags))) - goto cancelled; - - /* we promise that we will not attempt to execute the work - * function in more than one thread simultaneously - * - * this, however, leaves us with a problem if we're asked to - * enqueue the work whilst someone is executing the work - * function as simply queueing the work immediately means that - * another thread may try executing it whilst it is already - * under execution - * - * to deal with this, we set the ENQ_DEFERRED bit instead of - * enqueueing, and the thread currently executing the work - * function will enqueue the work item when the work function - * returns and it has cleared the EXECUTING bit - */ - if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) { - set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags); - } else { - ret = slow_work_get_ref(work); - if (ret < 0) - goto failed; - slow_work_mark_time(work); - list_add_tail(&work->link, queue); - wake_up(&slow_work_thread_wq); - - /* if someone who could be requeued is sleeping on a - * thread, then ask them to yield their thread */ - if (work->link.prev == queue) - wake_up(wfo_wq); - } - - spin_unlock_irqrestore(&slow_work_queue_lock, flags); - } - return 0; - -cancelled: - ret = -ECANCELED; -failed: - spin_unlock_irqrestore(&slow_work_queue_lock, flags); - return ret; -} -EXPORT_SYMBOL(slow_work_enqueue); - -static int slow_work_wait(void *word) -{ - schedule(); - return 0; -} - -/** - * slow_work_cancel - Cancel a slow work item - * @work: The work item to cancel - * - * This function will cancel a previously enqueued work item. If we cannot - * cancel the work item, it is guarenteed to have run when this function - * returns. - */ -void slow_work_cancel(struct slow_work *work) -{ - bool wait = true, put = false; - - set_bit(SLOW_WORK_CANCELLING, &work->flags); - smp_mb(); - - /* if the work item is a delayed work item with an active timer, we - * need to wait for the timer to finish _before_ getting the spinlock, - * lest we deadlock against the timer routine - * - * the timer routine will leave DELAYED set if it notices the - * CANCELLING flag in time - */ - if (test_bit(SLOW_WORK_DELAYED, &work->flags)) { - struct delayed_slow_work *dwork = - container_of(work, struct delayed_slow_work, work); - del_timer_sync(&dwork->timer); - } - - spin_lock_irq(&slow_work_queue_lock); - - if (test_bit(SLOW_WORK_DELAYED, &work->flags)) { - /* the timer routine aborted or never happened, so we are left - * holding the timer's reference on the item and should just - * drop the pending flag and wait for any ongoing execution to - * finish */ - struct delayed_slow_work *dwork = - container_of(work, struct delayed_slow_work, work); - - BUG_ON(timer_pending(&dwork->timer)); - BUG_ON(!list_empty(&work->link)); - - clear_bit(SLOW_WORK_DELAYED, &work->flags); - put = true; - clear_bit(SLOW_WORK_PENDING, &work->flags); - - } else if (test_bit(SLOW_WORK_PENDING, &work->flags) && - !list_empty(&work->link)) { - /* the link in the pending queue holds a reference on the item - * that we will need to release */ - list_del_init(&work->link); - wait = false; - put = true; - clear_bit(SLOW_WORK_PENDING, &work->flags); - - } else if (test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags)) { - /* the executor is holding our only reference on the item, so - * we merely need to wait for it to finish executing */ - clear_bit(SLOW_WORK_PENDING, &work->flags); - } - - spin_unlock_irq(&slow_work_queue_lock); - - /* the EXECUTING flag is set by the executor whilst the spinlock is set - * and before the item is dequeued - so assuming the above doesn't - * actually dequeue it, simply waiting for the EXECUTING flag to be - * released here should be sufficient */ - if (wait) - wait_on_bit(&work->flags, SLOW_WORK_EXECUTING, slow_work_wait, - TASK_UNINTERRUPTIBLE); - - clear_bit(SLOW_WORK_CANCELLING, &work->flags); - if (put) - slow_work_put_ref(work); -} -EXPORT_SYMBOL(slow_work_cancel); - -/* - * Handle expiry of the delay timer, indicating that a delayed slow work item - * should now be queued if not cancelled - */ -static void delayed_slow_work_timer(unsigned long data) -{ - wait_queue_head_t *wfo_wq; - struct list_head *queue; - struct slow_work *work = (struct slow_work *) data; - unsigned long flags; - bool queued = false, put = false, first = false; - - if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) { - wfo_wq = &vslow_work_queue_waits_for_occupation; - queue = &vslow_work_queue; - } else { - wfo_wq = &slow_work_queue_waits_for_occupation; - queue = &slow_work_queue; - } - - spin_lock_irqsave(&slow_work_queue_lock, flags); - if (likely(!test_bit(SLOW_WORK_CANCELLING, &work->flags))) { - clear_bit(SLOW_WORK_DELAYED, &work->flags); - - if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) { - /* we discard the reference the timer was holding in - * favour of the one the executor holds */ - set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags); - put = true; - } else { - slow_work_mark_time(work); - list_add_tail(&work->link, queue); - queued = true; - if (work->link.prev == queue) - first = true; - } - } - - spin_unlock_irqrestore(&slow_work_queue_lock, flags); - if (put) - slow_work_put_ref(work); - if (first) - wake_up(wfo_wq); - if (queued) - wake_up(&slow_work_thread_wq); -} - -/** - * delayed_slow_work_enqueue - Schedule a delayed slow work item for processing - * @dwork: The delayed work item to queue - * @delay: When to start executing the work, in jiffies from now - * - * This is similar to slow_work_enqueue(), but it adds a delay before the work - * is actually queued for processing. - * - * The item can have delayed processing requested on it whilst it is being - * executed. The delay will begin immediately, and if it expires before the - * item finishes executing, the item will be placed back on the queue when it - * has done executing. - */ -int delayed_slow_work_enqueue(struct delayed_slow_work *dwork, - unsigned long delay) -{ - struct slow_work *work = &dwork->work; - unsigned long flags; - int ret; - - if (delay == 0) - return slow_work_enqueue(&dwork->work); - - BUG_ON(slow_work_user_count <= 0); - BUG_ON(!work); - BUG_ON(!work->ops); - - if (test_bit(SLOW_WORK_CANCELLING, &work->flags)) - return -ECANCELED; - - if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) { - spin_lock_irqsave(&slow_work_queue_lock, flags); - - if (test_bit(SLOW_WORK_CANCELLING, &work->flags)) - goto cancelled; - - /* the timer holds a reference whilst it is pending */ - ret = slow_work_get_ref(work); - if (ret < 0) - goto cant_get_ref; - - if (test_and_set_bit(SLOW_WORK_DELAYED, &work->flags)) - BUG(); - dwork->timer.expires = jiffies + delay; - dwork->timer.data = (unsigned long) work; - dwork->timer.function = delayed_slow_work_timer; - add_timer(&dwork->timer); - - spin_unlock_irqrestore(&slow_work_queue_lock, flags); - } - - return 0; - -cancelled: - ret = -ECANCELED; -cant_get_ref: - spin_unlock_irqrestore(&slow_work_queue_lock, flags); - return ret; -} -EXPORT_SYMBOL(delayed_slow_work_enqueue); - -/* - * Schedule a cull of the thread pool at some time in the near future - */ -static void slow_work_schedule_cull(void) -{ - mod_timer(&slow_work_cull_timer, - round_jiffies(jiffies + SLOW_WORK_CULL_TIMEOUT)); -} - -/* - * Worker thread culling algorithm - */ -static bool slow_work_cull_thread(void) -{ - unsigned long flags; - bool do_cull = false; - - spin_lock_irqsave(&slow_work_queue_lock, flags); - - if (slow_work_cull) { - slow_work_cull = false; - - if (list_empty(&slow_work_queue) && - list_empty(&vslow_work_queue) && - atomic_read(&slow_work_thread_count) > - slow_work_min_threads) { - slow_work_schedule_cull(); - do_cull = true; - } - } - - spin_unlock_irqrestore(&slow_work_queue_lock, flags); - return do_cull; -} - -/* - * Determine if there is slow work available for dispatch - */ -static inline bool slow_work_available(int vsmax) -{ - return !list_empty(&slow_work_queue) || - (!list_empty(&vslow_work_queue) && - atomic_read(&vslow_work_executing_count) < vsmax); -} - -/* - * Worker thread dispatcher - */ -static int slow_work_thread(void *_data) -{ - int vsmax, id; - - DEFINE_WAIT(wait); - - set_freezable(); - set_user_nice(current, -5); - - /* allocate ourselves an ID */ - spin_lock_irq(&slow_work_queue_lock); - id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT); - BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT); - __set_bit(id, slow_work_ids); - slow_work_set_thread_pid(id, current->pid); - spin_unlock_irq(&slow_work_queue_lock); - - sprintf(current->comm, "kslowd%03u", id); - - for (;;) { - vsmax = vslow_work_proportion; - vsmax *= atomic_read(&slow_work_thread_count); - vsmax /= 100; - - prepare_to_wait_exclusive(&slow_work_thread_wq, &wait, - TASK_INTERRUPTIBLE); - if (!freezing(current) && - !slow_work_threads_should_exit && - !slow_work_available(vsmax) && - !slow_work_cull) - schedule(); - finish_wait(&slow_work_thread_wq, &wait); - - try_to_freeze(); - - vsmax = vslow_work_proportion; - vsmax *= atomic_read(&slow_work_thread_count); - vsmax /= 100; - - if (slow_work_available(vsmax) && slow_work_execute(id)) { - cond_resched(); - if (list_empty(&slow_work_queue) && - list_empty(&vslow_work_queue) && - atomic_read(&slow_work_thread_count) > - slow_work_min_threads) - slow_work_schedule_cull(); - continue; - } - - if (slow_work_threads_should_exit) - break; - - if (slow_work_cull && slow_work_cull_thread()) - break; - } - - spin_lock_irq(&slow_work_queue_lock); - slow_work_set_thread_pid(id, 0); - __clear_bit(id, slow_work_ids); - spin_unlock_irq(&slow_work_queue_lock); - - if (atomic_dec_and_test(&slow_work_thread_count)) - complete_and_exit(&slow_work_last_thread_exited, 0); - return 0; -} - -/* - * Handle thread cull timer expiration - */ -static void slow_work_cull_timeout(unsigned long data) -{ - slow_work_cull = true; - wake_up(&slow_work_thread_wq); -} - -/* - * Start a new slow work thread - */ -static void slow_work_new_thread_execute(struct slow_work *work) -{ - struct task_struct *p; - - if (slow_work_threads_should_exit) - return; - - if (atomic_read(&slow_work_thread_count) >= slow_work_max_threads) - return; - - if (!mutex_trylock(&slow_work_user_lock)) - return; - - slow_work_may_not_start_new_thread = true; - atomic_inc(&slow_work_thread_count); - p = kthread_run(slow_work_thread, NULL, "kslowd"); - if (IS_ERR(p)) { - printk(KERN_DEBUG "Slow work thread pool: OOM\n"); - if (atomic_dec_and_test(&slow_work_thread_count)) - BUG(); /* we're running on a slow work thread... */ - mod_timer(&slow_work_oom_timer, - round_jiffies(jiffies + SLOW_WORK_OOM_TIMEOUT)); - } else { - /* ratelimit the starting of new threads */ - mod_timer(&slow_work_oom_timer, jiffies + 1); - } - - mutex_unlock(&slow_work_user_lock); -} - -static const struct slow_work_ops slow_work_new_thread_ops = { - .owner = THIS_MODULE, - .execute = slow_work_new_thread_execute, -#ifdef CONFIG_SLOW_WORK_DEBUG - .desc = slow_work_new_thread_desc, -#endif -}; - -/* - * post-OOM new thread start suppression expiration - */ -static void slow_work_oom_timeout(unsigned long data) -{ - slow_work_may_not_start_new_thread = false; -} - -#ifdef CONFIG_SYSCTL -/* - * Handle adjustment of the minimum number of threads - */ -static int slow_work_min_threads_sysctl(struct ctl_table *table, int write, - void __user *buffer, - size_t *lenp, loff_t *ppos) -{ - int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos); - int n; - - if (ret == 0) { - mutex_lock(&slow_work_user_lock); - if (slow_work_user_count > 0) { - /* see if we need to start or stop threads */ - n = atomic_read(&slow_work_thread_count) - - slow_work_min_threads; - - if (n < 0 && !slow_work_may_not_start_new_thread) - slow_work_enqueue(&slow_work_new_thread); - else if (n > 0) - slow_work_schedule_cull(); - } - mutex_unlock(&slow_work_user_lock); - } - - return ret; -} - -/* - * Handle adjustment of the maximum number of threads - */ -static int slow_work_max_threads_sysctl(struct ctl_table *table, int write, - void __user *buffer, - size_t *lenp, loff_t *ppos) -{ - int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos); - int n; - - if (ret == 0) { - mutex_lock(&slow_work_user_lock); - if (slow_work_user_count > 0) { - /* see if we need to stop threads */ - n = slow_work_max_threads - - atomic_read(&slow_work_thread_count); - - if (n < 0) - slow_work_schedule_cull(); - } - mutex_unlock(&slow_work_user_lock); - } - - return ret; -} -#endif /* CONFIG_SYSCTL */ - -/** - * slow_work_register_user - Register a user of the facility - * @module: The module about to make use of the facility - * - * Register a user of the facility, starting up the initial threads if there - * aren't any other users at this point. This will return 0 if successful, or - * an error if not. - */ -int slow_work_register_user(struct module *module) -{ - struct task_struct *p; - int loop; - - mutex_lock(&slow_work_user_lock); - - if (slow_work_user_count == 0) { - printk(KERN_NOTICE "Slow work thread pool: Starting up\n"); - init_completion(&slow_work_last_thread_exited); - - slow_work_threads_should_exit = false; - slow_work_init(&slow_work_new_thread, - &slow_work_new_thread_ops); - slow_work_may_not_start_new_thread = false; - slow_work_cull = false; - - /* start the minimum number of threads */ - for (loop = 0; loop < slow_work_min_threads; loop++) { - atomic_inc(&slow_work_thread_count); - p = kthread_run(slow_work_thread, NULL, "kslowd"); - if (IS_ERR(p)) - goto error; - } - printk(KERN_NOTICE "Slow work thread pool: Ready\n"); - } - - slow_work_user_count++; - mutex_unlock(&slow_work_user_lock); - return 0; - -error: - if (atomic_dec_and_test(&slow_work_thread_count)) - complete(&slow_work_last_thread_exited); - if (loop > 0) { - printk(KERN_ERR "Slow work thread pool:" - " Aborting startup on ENOMEM\n"); - slow_work_threads_should_exit = true; - wake_up_all(&slow_work_thread_wq); - wait_for_completion(&slow_work_last_thread_exited); - printk(KERN_ERR "Slow work thread pool: Aborted\n"); - } - mutex_unlock(&slow_work_user_lock); - return PTR_ERR(p); -} -EXPORT_SYMBOL(slow_work_register_user); - -/* - * wait for all outstanding items from the calling module to complete - * - note that more items may be queued whilst we're waiting - */ -static void slow_work_wait_for_items(struct module *module) -{ -#ifdef CONFIG_MODULES - DECLARE_WAITQUEUE(myself, current); - struct slow_work *work; - int loop; - - mutex_lock(&slow_work_unreg_sync_lock); - add_wait_queue(&slow_work_unreg_wq, &myself); - - for (;;) { - spin_lock_irq(&slow_work_queue_lock); - - /* first of all, we wait for the last queued item in each list - * to be processed */ - list_for_each_entry_reverse(work, &vslow_work_queue, link) { - if (work->owner == module) { - set_current_state(TASK_UNINTERRUPTIBLE); - slow_work_unreg_work_item = work; - goto do_wait; - } - } - list_for_each_entry_reverse(work, &slow_work_queue, link) { - if (work->owner == module) { - set_current_state(TASK_UNINTERRUPTIBLE); - slow_work_unreg_work_item = work; - goto do_wait; - } - } - - /* then we wait for the items being processed to finish */ - slow_work_unreg_module = module; - smp_mb(); - for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) { - if (slow_work_thread_processing[loop] == module) - goto do_wait; - } - spin_unlock_irq(&slow_work_queue_lock); - break; /* okay, we're done */ - - do_wait: - spin_unlock_irq(&slow_work_queue_lock); - schedule(); - slow_work_unreg_work_item = NULL; - slow_work_unreg_module = NULL; - } - - remove_wait_queue(&slow_work_unreg_wq, &myself); - mutex_unlock(&slow_work_unreg_sync_lock); -#endif /* CONFIG_MODULES */ -} - -/** - * slow_work_unregister_user - Unregister a user of the facility - * @module: The module whose items should be cleared - * - * Unregister a user of the facility, killing all the threads if this was the - * last one. - * - * This waits for all the work items belonging to the nominated module to go - * away before proceeding. - */ -void slow_work_unregister_user(struct module *module) -{ - /* first of all, wait for all outstanding items from the calling module - * to complete */ - if (module) - slow_work_wait_for_items(module); - - /* then we can actually go about shutting down the facility if need - * be */ - mutex_lock(&slow_work_user_lock); - - BUG_ON(slow_work_user_count <= 0); - - slow_work_user_count--; - if (slow_work_user_count == 0) { - printk(KERN_NOTICE "Slow work thread pool: Shutting down\n"); - slow_work_threads_should_exit = true; - del_timer_sync(&slow_work_cull_timer); - del_timer_sync(&slow_work_oom_timer); - wake_up_all(&slow_work_thread_wq); - wait_for_completion(&slow_work_last_thread_exited); - printk(KERN_NOTICE "Slow work thread pool:" - " Shut down complete\n"); - } - - mutex_unlock(&slow_work_user_lock); -} -EXPORT_SYMBOL(slow_work_unregister_user); - -/* - * Initialise the slow work facility - */ -static int __init init_slow_work(void) -{ - unsigned nr_cpus = num_possible_cpus(); - - if (slow_work_max_threads < nr_cpus) - slow_work_max_threads = nr_cpus; -#ifdef CONFIG_SYSCTL - if (slow_work_max_max_threads < nr_cpus * 2) - slow_work_max_max_threads = nr_cpus * 2; -#endif -#ifdef CONFIG_SLOW_WORK_DEBUG - { - struct dentry *dbdir; - - dbdir = debugfs_create_dir("slow_work", NULL); - if (dbdir && !IS_ERR(dbdir)) - debugfs_create_file("runqueue", S_IFREG | 0400, dbdir, - NULL, &slow_work_runqueue_fops); - } -#endif - return 0; -} - -subsys_initcall(init_slow_work); diff --git a/kernel/slow-work.h b/kernel/slow-work.h deleted file mode 100644 index a29ebd1ef41..00000000000 --- a/kernel/slow-work.h +++ /dev/null @@ -1,72 +0,0 @@ -/* Slow work private definitions - * - * Copyright (C) 2009 Red Hat, Inc. All Rights Reserved. - * Written by David Howells (dhowells@redhat.com) - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public Licence - * as published by the Free Software Foundation; either version - * 2 of the Licence, or (at your option) any later version. - */ - -#define SLOW_WORK_CULL_TIMEOUT (5 * HZ) /* cull threads 5s after running out of - * things to do */ -#define SLOW_WORK_OOM_TIMEOUT (5 * HZ) /* can't start new threads for 5s after - * OOM */ - -#define SLOW_WORK_THREAD_LIMIT 255 /* abs maximum number of slow-work threads */ - -/* - * slow-work.c - */ -#ifdef CONFIG_SLOW_WORK_DEBUG -extern struct slow_work *slow_work_execs[]; -extern pid_t slow_work_pids[]; -extern rwlock_t slow_work_execs_lock; -#endif - -extern struct list_head slow_work_queue; -extern struct list_head vslow_work_queue; -extern spinlock_t slow_work_queue_lock; - -/* - * slow-work-debugfs.c - */ -#ifdef CONFIG_SLOW_WORK_DEBUG -extern const struct file_operations slow_work_runqueue_fops; - -extern void slow_work_new_thread_desc(struct slow_work *, struct seq_file *); -#endif - -/* - * Helper functions - */ -static inline void slow_work_set_thread_pid(int id, pid_t pid) -{ -#ifdef CONFIG_SLOW_WORK_DEBUG - slow_work_pids[id] = pid; -#endif -} - -static inline void slow_work_mark_time(struct slow_work *work) -{ -#ifdef CONFIG_SLOW_WORK_DEBUG - work->mark = CURRENT_TIME; -#endif -} - -static inline void slow_work_begin_exec(int id, struct slow_work *work) -{ -#ifdef CONFIG_SLOW_WORK_DEBUG - slow_work_execs[id] = work; -#endif -} - -static inline void slow_work_end_exec(int id, struct slow_work *work) -{ -#ifdef CONFIG_SLOW_WORK_DEBUG - write_lock(&slow_work_execs_lock); - slow_work_execs[id] = NULL; - write_unlock(&slow_work_execs_lock); -#endif -} diff --git a/kernel/sysctl.c b/kernel/sysctl.c index 9acfce0cdfd..6d850bf0a51 100644 --- a/kernel/sysctl.c +++ b/kernel/sysctl.c @@ -50,7 +50,6 @@ #include <linux/acpi.h> #include <linux/reboot.h> #include <linux/ftrace.h> -#include <linux/slow-work.h> #include <linux/perf_event.h> #include <linux/kprobes.h> #include <linux/pipe_fs_i.h> @@ -917,13 +916,6 @@ static struct ctl_table kern_table[] = { .proc_handler = proc_dointvec, }, #endif -#ifdef CONFIG_SLOW_WORK - { - .procname = "slow-work", - .mode = 0555, - .child = slow_work_sysctls, - }, -#endif #ifdef CONFIG_PERF_EVENTS { .procname = "perf_event_paranoid", diff --git a/kernel/trace/Kconfig b/kernel/trace/Kconfig index 6eb97bbdefb..538501c6ea5 100644 --- a/kernel/trace/Kconfig +++ b/kernel/trace/Kconfig @@ -323,17 +323,6 @@ config STACK_TRACER Say N if unsure. -config WORKQUEUE_TRACER - bool "Trace workqueues" - select GENERIC_TRACER - help - The workqueue tracer provides some statistical information - about each cpu workqueue thread such as the number of the - works inserted and executed since their creation. It can help - to evaluate the amount of work each of them has to perform. - For example it can help a developer to decide whether he should - choose a per-cpu workqueue instead of a singlethreaded one. - config BLK_DEV_IO_TRACE bool "Support for tracing block IO actions" depends on SYSFS diff --git a/kernel/workqueue.c b/kernel/workqueue.c index 59fef1531dd..9ca34cddaf6 100644 --- a/kernel/workqueue.c +++ b/kernel/workqueue.c @@ -33,41 +33,272 @@ #include <linux/kallsyms.h> #include <linux/debug_locks.h> #include <linux/lockdep.h> -#define CREATE_TRACE_POINTS -#include <trace/events/workqueue.h> +#include <linux/idr.h> + +#include "workqueue_sched.h" + +enum { + /* global_cwq flags */ + GCWQ_MANAGE_WORKERS = 1 << 0, /* need to manage workers */ + GCWQ_MANAGING_WORKERS = 1 << 1, /* managing workers */ + GCWQ_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */ + GCWQ_FREEZING = 1 << 3, /* freeze in progress */ + GCWQ_HIGHPRI_PENDING = 1 << 4, /* highpri works on queue */ + + /* worker flags */ + WORKER_STARTED = 1 << 0, /* started */ + WORKER_DIE = 1 << 1, /* die die die */ + WORKER_IDLE = 1 << 2, /* is idle */ + WORKER_PREP = 1 << 3, /* preparing to run works */ + WORKER_ROGUE = 1 << 4, /* not bound to any cpu */ + WORKER_REBIND = 1 << 5, /* mom is home, come back */ + WORKER_CPU_INTENSIVE = 1 << 6, /* cpu intensive */ + WORKER_UNBOUND = 1 << 7, /* worker is unbound */ + + WORKER_NOT_RUNNING = WORKER_PREP | WORKER_ROGUE | WORKER_REBIND | + WORKER_CPU_INTENSIVE | WORKER_UNBOUND, + + /* gcwq->trustee_state */ + TRUSTEE_START = 0, /* start */ + TRUSTEE_IN_CHARGE = 1, /* trustee in charge of gcwq */ + TRUSTEE_BUTCHER = 2, /* butcher workers */ + TRUSTEE_RELEASE = 3, /* release workers */ + TRUSTEE_DONE = 4, /* trustee is done */ + + BUSY_WORKER_HASH_ORDER = 6, /* 64 pointers */ + BUSY_WORKER_HASH_SIZE = 1 << BUSY_WORKER_HASH_ORDER, + BUSY_WORKER_HASH_MASK = BUSY_WORKER_HASH_SIZE - 1, + + MAX_IDLE_WORKERS_RATIO = 4, /* 1/4 of busy can be idle */ + IDLE_WORKER_TIMEOUT = 300 * HZ, /* keep idle ones for 5 mins */ + + MAYDAY_INITIAL_TIMEOUT = HZ / 100, /* call for help after 10ms */ + MAYDAY_INTERVAL = HZ / 10, /* and then every 100ms */ + CREATE_COOLDOWN = HZ, /* time to breath after fail */ + TRUSTEE_COOLDOWN = HZ / 10, /* for trustee draining */ + + /* + * Rescue workers are used only on emergencies and shared by + * all cpus. Give -20. + */ + RESCUER_NICE_LEVEL = -20, +}; /* - * The per-CPU workqueue (if single thread, we always use the first - * possible cpu). + * Structure fields follow one of the following exclusion rules. + * + * I: Set during initialization and read-only afterwards. + * + * P: Preemption protected. Disabling preemption is enough and should + * only be modified and accessed from the local cpu. + * + * L: gcwq->lock protected. Access with gcwq->lock held. + * + * X: During normal operation, modification requires gcwq->lock and + * should be done only from local cpu. Either disabling preemption + * on local cpu or grabbing gcwq->lock is enough for read access. + * If GCWQ_DISASSOCIATED is set, it's identical to L. + * + * F: wq->flush_mutex protected. + * + * W: workqueue_lock protected. */ -struct cpu_workqueue_struct { - spinlock_t lock; +struct global_cwq; - struct list_head worklist; - wait_queue_head_t more_work; - struct work_struct *current_work; +/* + * The poor guys doing the actual heavy lifting. All on-duty workers + * are either serving the manager role, on idle list or on busy hash. + */ +struct worker { + /* on idle list while idle, on busy hash table while busy */ + union { + struct list_head entry; /* L: while idle */ + struct hlist_node hentry; /* L: while busy */ + }; - struct workqueue_struct *wq; - struct task_struct *thread; -} ____cacheline_aligned; + struct work_struct *current_work; /* L: work being processed */ + struct cpu_workqueue_struct *current_cwq; /* L: current_work's cwq */ + struct list_head scheduled; /* L: scheduled works */ + struct task_struct *task; /* I: worker task */ + struct global_cwq *gcwq; /* I: the associated gcwq */ + /* 64 bytes boundary on 64bit, 32 on 32bit */ + unsigned long last_active; /* L: last active timestamp */ + unsigned int flags; /* X: flags */ + int id; /* I: worker id */ + struct work_struct rebind_work; /* L: rebind worker to cpu */ +}; + +/* + * Global per-cpu workqueue. There's one and only one for each cpu + * and all works are queued and processed here regardless of their + * target workqueues. + */ +struct global_cwq { + spinlock_t lock; /* the gcwq lock */ + struct list_head worklist; /* L: list of pending works */ + unsigned int cpu; /* I: the associated cpu */ + unsigned int flags; /* L: GCWQ_* flags */ + + int nr_workers; /* L: total number of workers */ + int nr_idle; /* L: currently idle ones */ + + /* workers are chained either in the idle_list or busy_hash */ + struct list_head idle_list; /* X: list of idle workers */ + struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE]; + /* L: hash of busy workers */ + + struct timer_list idle_timer; /* L: worker idle timeout */ + struct timer_list mayday_timer; /* L: SOS timer for dworkers */ + + struct ida worker_ida; /* L: for worker IDs */ + + struct task_struct *trustee; /* L: for gcwq shutdown */ + unsigned int trustee_state; /* L: trustee state */ + wait_queue_head_t trustee_wait; /* trustee wait */ + struct worker *first_idle; /* L: first idle worker */ +} ____cacheline_aligned_in_smp; + +/* + * The per-CPU workqueue. The lower WORK_STRUCT_FLAG_BITS of + * work_struct->data are used for flags and thus cwqs need to be + * aligned at two's power of the number of flag bits. + */ +struct cpu_workqueue_struct { + struct global_cwq *gcwq; /* I: the associated gcwq */ + struct workqueue_struct *wq; /* I: the owning workqueue */ + int work_color; /* L: current color */ + int flush_color; /* L: flushing color */ + int nr_in_flight[WORK_NR_COLORS]; + /* L: nr of in_flight works */ + int nr_active; /* L: nr of active works */ + int max_active; /* L: max active works */ + struct list_head delayed_works; /* L: delayed works */ +}; + +/* + * Structure used to wait for workqueue flush. + */ +struct wq_flusher { + struct list_head list; /* F: list of flushers */ + int flush_color; /* F: flush color waiting for */ + struct completion done; /* flush completion */ +}; + +/* + * All cpumasks are assumed to be always set on UP and thus can't be + * used to determine whether there's something to be done. + */ +#ifdef CONFIG_SMP +typedef cpumask_var_t mayday_mask_t; +#define mayday_test_and_set_cpu(cpu, mask) \ + cpumask_test_and_set_cpu((cpu), (mask)) +#define mayday_clear_cpu(cpu, mask) cpumask_clear_cpu((cpu), (mask)) +#define for_each_mayday_cpu(cpu, mask) for_each_cpu((cpu), (mask)) +#define alloc_mayday_mask(maskp, gfp) alloc_cpumask_var((maskp), (gfp)) +#define free_mayday_mask(mask) free_cpumask_var((mask)) +#else +typedef unsigned long mayday_mask_t; +#define mayday_test_and_set_cpu(cpu, mask) test_and_set_bit(0, &(mask)) +#define mayday_clear_cpu(cpu, mask) clear_bit(0, &(mask)) +#define for_each_mayday_cpu(cpu, mask) if ((cpu) = 0, (mask)) +#define alloc_mayday_mask(maskp, gfp) true +#define free_mayday_mask(mask) do { } while (0) +#endif /* * The externally visible workqueue abstraction is an array of * per-CPU workqueues: */ struct workqueue_struct { - struct cpu_workqueue_struct *cpu_wq; - struct list_head list; - const char *name; - int singlethread; - int freezeable; /* Freeze threads during suspend */ - int rt; + unsigned int flags; /* I: WQ_* flags */ + union { + struct cpu_workqueue_struct __percpu *pcpu; + struct cpu_workqueue_struct *single; + unsigned long v; + } cpu_wq; /* I: cwq's */ + struct list_head list; /* W: list of all workqueues */ + + struct mutex flush_mutex; /* protects wq flushing */ + int work_color; /* F: current work color */ + int flush_color; /* F: current flush color */ + atomic_t nr_cwqs_to_flush; /* flush in progress */ + struct wq_flusher *first_flusher; /* F: first flusher */ + struct list_head flusher_queue; /* F: flush waiters */ + struct list_head flusher_overflow; /* F: flush overflow list */ + + mayday_mask_t mayday_mask; /* cpus requesting rescue */ + struct worker *rescuer; /* I: rescue worker */ + + int saved_max_active; /* W: saved cwq max_active */ + const char *name; /* I: workqueue name */ #ifdef CONFIG_LOCKDEP - struct lockdep_map lockdep_map; + struct lockdep_map lockdep_map; #endif }; +struct workqueue_struct *system_wq __read_mostly; +struct workqueue_struct *system_long_wq __read_mostly; +struct workqueue_struct *system_nrt_wq __read_mostly; +struct workqueue_struct *system_unbound_wq __read_mostly; +EXPORT_SYMBOL_GPL(system_wq); +EXPORT_SYMBOL_GPL(system_long_wq); +EXPORT_SYMBOL_GPL(system_nrt_wq); +EXPORT_SYMBOL_GPL(system_unbound_wq); + +#define for_each_busy_worker(worker, i, pos, gcwq) \ + for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) \ + hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry) + +static inline int __next_gcwq_cpu(int cpu, const struct cpumask *mask, + unsigned int sw) +{ + if (cpu < nr_cpu_ids) { + if (sw & 1) { + cpu = cpumask_next(cpu, mask); + if (cpu < nr_cpu_ids) + return cpu; + } + if (sw & 2) + return WORK_CPU_UNBOUND; + } + return WORK_CPU_NONE; +} + +static inline int __next_wq_cpu(int cpu, const struct cpumask *mask, + struct workqueue_struct *wq) +{ + return __next_gcwq_cpu(cpu, mask, !(wq->flags & WQ_UNBOUND) ? 1 : 2); +} + +/* + * CPU iterators + * + * An extra gcwq is defined for an invalid cpu number + * (WORK_CPU_UNBOUND) to host workqueues which are not bound to any + * specific CPU. The following iterators are similar to + * for_each_*_cpu() iterators but also considers the unbound gcwq. + * + * for_each_gcwq_cpu() : possible CPUs + WORK_CPU_UNBOUND + * for_each_online_gcwq_cpu() : online CPUs + WORK_CPU_UNBOUND + * for_each_cwq_cpu() : possible CPUs for bound workqueues, + * WORK_CPU_UNBOUND for unbound workqueues + */ +#define for_each_gcwq_cpu(cpu) \ + for ((cpu) = __next_gcwq_cpu(-1, cpu_possible_mask, 3); \ + (cpu) < WORK_CPU_NONE; \ + (cpu) = __next_gcwq_cpu((cpu), cpu_possible_mask, 3)) + +#define for_each_online_gcwq_cpu(cpu) \ + for ((cpu) = __next_gcwq_cpu(-1, cpu_online_mask, 3); \ + (cpu) < WORK_CPU_NONE; \ + (cpu) = __next_gcwq_cpu((cpu), cpu_online_mask, 3)) + +#define for_each_cwq_cpu(cpu, wq) \ + for ((cpu) = __next_wq_cpu(-1, cpu_possible_mask, (wq)); \ + (cpu) < WORK_CPU_NONE; \ + (cpu) = __next_wq_cpu((cpu), cpu_possible_mask, (wq))) + #ifdef CONFIG_LOCKDEP /** * in_workqueue_context() - in context of specified workqueue? @@ -122,7 +353,7 @@ static int work_fixup_activate(void *addr, enum debug_obj_state state) * statically initialized. We just make sure that it * is tracked in the object tracker. */ - if (test_bit(WORK_STRUCT_STATIC, work_data_bits(work))) { + if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) { debug_object_init(work, &work_debug_descr); debug_object_activate(work, &work_debug_descr); return 0; @@ -196,94 +427,575 @@ static inline void debug_work_deactivate(struct work_struct *work) { } /* Serializes the accesses to the list of workqueues. */ static DEFINE_SPINLOCK(workqueue_lock); static LIST_HEAD(workqueues); +static bool workqueue_freezing; /* W: have wqs started freezing? */ + +/* + * The almighty global cpu workqueues. nr_running is the only field + * which is expected to be used frequently by other cpus via + * try_to_wake_up(). Put it in a separate cacheline. + */ +static DEFINE_PER_CPU(struct global_cwq, global_cwq); +static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running); + +/* + * Global cpu workqueue and nr_running counter for unbound gcwq. The + * gcwq is always online, has GCWQ_DISASSOCIATED set, and all its + * workers have WORKER_UNBOUND set. + */ +static struct global_cwq unbound_global_cwq; +static atomic_t unbound_gcwq_nr_running = ATOMIC_INIT(0); /* always 0 */ + +static int worker_thread(void *__worker); + +static struct global_cwq *get_gcwq(unsigned int cpu) +{ + if (cpu != WORK_CPU_UNBOUND) + return &per_cpu(global_cwq, cpu); + else + return &unbound_global_cwq; +} + +static atomic_t *get_gcwq_nr_running(unsigned int cpu) +{ + if (cpu != WORK_CPU_UNBOUND) + return &per_cpu(gcwq_nr_running, cpu); + else + return &unbound_gcwq_nr_running; +} + +static struct cpu_workqueue_struct *get_cwq(unsigned int cpu, + struct workqueue_struct *wq) +{ + if (!(wq->flags & WQ_UNBOUND)) { + if (likely(cpu < nr_cpu_ids)) { +#ifdef CONFIG_SMP + return per_cpu_ptr(wq->cpu_wq.pcpu, cpu); +#else + return wq->cpu_wq.single; +#endif + } + } else if (likely(cpu == WORK_CPU_UNBOUND)) + return wq->cpu_wq.single; + return NULL; +} + +static unsigned int work_color_to_flags(int color) +{ + return color << WORK_STRUCT_COLOR_SHIFT; +} + +static int get_work_color(struct work_struct *work) +{ + return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) & + ((1 << WORK_STRUCT_COLOR_BITS) - 1); +} + +static int work_next_color(int color) +{ + return (color + 1) % WORK_NR_COLORS; +} -static int singlethread_cpu __read_mostly; -static const struct cpumask *cpu_singlethread_map __read_mostly; /* - * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD - * flushes cwq->worklist. This means that flush_workqueue/wait_on_work - * which comes in between can't use for_each_online_cpu(). We could - * use cpu_possible_map, the cpumask below is more a documentation - * than optimization. + * A work's data points to the cwq with WORK_STRUCT_CWQ set while the + * work is on queue. Once execution starts, WORK_STRUCT_CWQ is + * cleared and the work data contains the cpu number it was last on. + * + * set_work_{cwq|cpu}() and clear_work_data() can be used to set the + * cwq, cpu or clear work->data. These functions should only be + * called while the work is owned - ie. while the PENDING bit is set. + * + * get_work_[g]cwq() can be used to obtain the gcwq or cwq + * corresponding to a work. gcwq is available once the work has been + * queued anywhere after initialization. cwq is available only from + * queueing until execution starts. */ -static cpumask_var_t cpu_populated_map __read_mostly; +static inline void set_work_data(struct work_struct *work, unsigned long data, + unsigned long flags) +{ + BUG_ON(!work_pending(work)); + atomic_long_set(&work->data, data | flags | work_static(work)); +} -/* If it's single threaded, it isn't in the list of workqueues. */ -static inline int is_wq_single_threaded(struct workqueue_struct *wq) +static void set_work_cwq(struct work_struct *work, + struct cpu_workqueue_struct *cwq, + unsigned long extra_flags) { - return wq->singlethread; + set_work_data(work, (unsigned long)cwq, + WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags); } -static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq) +static void set_work_cpu(struct work_struct *work, unsigned int cpu) { - return is_wq_single_threaded(wq) - ? cpu_singlethread_map : cpu_populated_map; + set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING); } -static -struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu) +static void clear_work_data(struct work_struct *work) { - if (unlikely(is_wq_single_threaded(wq))) - cpu = singlethread_cpu; - return per_cpu_ptr(wq->cpu_wq, cpu); + set_work_data(work, WORK_STRUCT_NO_CPU, 0); +} + +static struct cpu_workqueue_struct *get_work_cwq(struct work_struct *work) +{ + unsigned long data = atomic_long_read(&work->data); + + if (data & WORK_STRUCT_CWQ) + return (void *)(data & WORK_STRUCT_WQ_DATA_MASK); + else + return NULL; +} + +static struct global_cwq *get_work_gcwq(struct work_struct *work) +{ + unsigned long data = atomic_long_read(&work->data); + unsigned int cpu; + + if (data & WORK_STRUCT_CWQ) + return ((struct cpu_workqueue_struct *) + (data & WORK_STRUCT_WQ_DATA_MASK))->gcwq; + + cpu = data >> WORK_STRUCT_FLAG_BITS; + if (cpu == WORK_CPU_NONE) + return NULL; + + BUG_ON(cpu >= nr_cpu_ids && cpu != WORK_CPU_UNBOUND); + return get_gcwq(cpu); } /* - * Set the workqueue on which a work item is to be run - * - Must *only* be called if the pending flag is set + * Policy functions. These define the policies on how the global + * worker pool is managed. Unless noted otherwise, these functions + * assume that they're being called with gcwq->lock held. */ -static inline void set_wq_data(struct work_struct *work, - struct cpu_workqueue_struct *cwq) + +static bool __need_more_worker(struct global_cwq *gcwq) { - unsigned long new; + return !atomic_read(get_gcwq_nr_running(gcwq->cpu)) || + gcwq->flags & GCWQ_HIGHPRI_PENDING; +} - BUG_ON(!work_pending(work)); +/* + * Need to wake up a worker? Called from anything but currently + * running workers. + */ +static bool need_more_worker(struct global_cwq *gcwq) +{ + return !list_empty(&gcwq->worklist) && __need_more_worker(gcwq); +} + +/* Can I start working? Called from busy but !running workers. */ +static bool may_start_working(struct global_cwq *gcwq) +{ + return gcwq->nr_idle; +} + +/* Do I need to keep working? Called from currently running workers. */ +static bool keep_working(struct global_cwq *gcwq) +{ + atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu); + + return !list_empty(&gcwq->worklist) && atomic_read(nr_running) <= 1; +} + +/* Do we need a new worker? Called from manager. */ +static bool need_to_create_worker(struct global_cwq *gcwq) +{ + return need_more_worker(gcwq) && !may_start_working(gcwq); +} - new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING); - new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work); - atomic_long_set(&work->data, new); +/* Do I need to be the manager? */ +static bool need_to_manage_workers(struct global_cwq *gcwq) +{ + return need_to_create_worker(gcwq) || gcwq->flags & GCWQ_MANAGE_WORKERS; +} + +/* Do we have too many workers and should some go away? */ +static bool too_many_workers(struct global_cwq *gcwq) +{ + bool managing = gcwq->flags & GCWQ_MANAGING_WORKERS; + int nr_idle = gcwq->nr_idle + managing; /* manager is considered idle */ + int nr_busy = gcwq->nr_workers - nr_idle; + + return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy; } /* - * Clear WORK_STRUCT_PENDING and the workqueue on which it was queued. + * Wake up functions. + */ + +/* Return the first worker. Safe with preemption disabled */ +static struct worker *first_worker(struct global_cwq *gcwq) +{ + if (unlikely(list_empty(&gcwq->idle_list))) + return NULL; + + return list_first_entry(&gcwq->idle_list, struct worker, entry); +} + +/** + * wake_up_worker - wake up an idle worker + * @gcwq: gcwq to wake worker for + * + * Wake up the first idle worker of @gcwq. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock). + */ +static void wake_up_worker(struct global_cwq *gcwq) +{ + struct worker *worker = first_worker(gcwq); + + if (likely(worker)) + wake_up_process(worker->task); +} + +/** + * wq_worker_waking_up - a worker is waking up + * @task: task waking up + * @cpu: CPU @task is waking up to + * + * This function is called during try_to_wake_up() when a worker is + * being awoken. + * + * CONTEXT: + * spin_lock_irq(rq->lock) + */ +void wq_worker_waking_up(struct task_struct *task, unsigned int cpu) +{ + struct worker *worker = kthread_data(task); + + if (likely(!(worker->flags & WORKER_NOT_RUNNING))) + atomic_inc(get_gcwq_nr_running(cpu)); +} + +/** + * wq_worker_sleeping - a worker is going to sleep + * @task: task going to sleep + * @cpu: CPU in question, must be the current CPU number + * + * This function is called during schedule() when a busy worker is + * going to sleep. Worker on the same cpu can be woken up by + * returning pointer to its task. + * + * CONTEXT: + * spin_lock_irq(rq->lock) + * + * RETURNS: + * Worker task on @cpu to wake up, %NULL if none. + */ +struct task_struct *wq_worker_sleeping(struct task_struct *task, + unsigned int cpu) +{ + struct worker *worker = kthread_data(task), *to_wakeup = NULL; + struct global_cwq *gcwq = get_gcwq(cpu); + atomic_t *nr_running = get_gcwq_nr_running(cpu); + + if (unlikely(worker->flags & WORKER_NOT_RUNNING)) + return NULL; + + /* this can only happen on the local cpu */ + BUG_ON(cpu != raw_smp_processor_id()); + + /* + * The counterpart of the following dec_and_test, implied mb, + * worklist not empty test sequence is in insert_work(). + * Please read comment there. + * + * NOT_RUNNING is clear. This means that trustee is not in + * charge and we're running on the local cpu w/ rq lock held + * and preemption disabled, which in turn means that none else + * could be manipulating idle_list, so dereferencing idle_list + * without gcwq lock is safe. + */ + if (atomic_dec_and_test(nr_running) && !list_empty(&gcwq->worklist)) + to_wakeup = first_worker(gcwq); + return to_wakeup ? to_wakeup->task : NULL; +} + +/** + * worker_set_flags - set worker flags and adjust nr_running accordingly + * @worker: self + * @flags: flags to set + * @wakeup: wakeup an idle worker if necessary + * + * Set @flags in @worker->flags and adjust nr_running accordingly. If + * nr_running becomes zero and @wakeup is %true, an idle worker is + * woken up. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock) + */ +static inline void worker_set_flags(struct worker *worker, unsigned int flags, + bool wakeup) +{ + struct global_cwq *gcwq = worker->gcwq; + + WARN_ON_ONCE(worker->task != current); + + /* + * If transitioning into NOT_RUNNING, adjust nr_running and + * wake up an idle worker as necessary if requested by + * @wakeup. + */ + if ((flags & WORKER_NOT_RUNNING) && + !(worker->flags & WORKER_NOT_RUNNING)) { + atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu); + + if (wakeup) { + if (atomic_dec_and_test(nr_running) && + !list_empty(&gcwq->worklist)) + wake_up_worker(gcwq); + } else + atomic_dec(nr_running); + } + + worker->flags |= flags; +} + +/** + * worker_clr_flags - clear worker flags and adjust nr_running accordingly + * @worker: self + * @flags: flags to clear + * + * Clear @flags in @worker->flags and adjust nr_running accordingly. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock) + */ +static inline void worker_clr_flags(struct worker *worker, unsigned int flags) +{ + struct global_cwq *gcwq = worker->gcwq; + unsigned int oflags = worker->flags; + + WARN_ON_ONCE(worker->task != current); + + worker->flags &= ~flags; + + /* if transitioning out of NOT_RUNNING, increment nr_running */ + if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING)) + if (!(worker->flags & WORKER_NOT_RUNNING)) + atomic_inc(get_gcwq_nr_running(gcwq->cpu)); +} + +/** + * busy_worker_head - return the busy hash head for a work + * @gcwq: gcwq of interest + * @work: work to be hashed + * + * Return hash head of @gcwq for @work. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock). + * + * RETURNS: + * Pointer to the hash head. + */ +static struct hlist_head *busy_worker_head(struct global_cwq *gcwq, + struct work_struct *work) +{ + const int base_shift = ilog2(sizeof(struct work_struct)); + unsigned long v = (unsigned long)work; + + /* simple shift and fold hash, do we need something better? */ + v >>= base_shift; + v += v >> BUSY_WORKER_HASH_ORDER; + v &= BUSY_WORKER_HASH_MASK; + + return &gcwq->busy_hash[v]; +} + +/** + * __find_worker_executing_work - find worker which is executing a work + * @gcwq: gcwq of interest + * @bwh: hash head as returned by busy_worker_head() + * @work: work to find worker for + * + * Find a worker which is executing @work on @gcwq. @bwh should be + * the hash head obtained by calling busy_worker_head() with the same + * work. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock). + * + * RETURNS: + * Pointer to worker which is executing @work if found, NULL + * otherwise. + */ +static struct worker *__find_worker_executing_work(struct global_cwq *gcwq, + struct hlist_head *bwh, + struct work_struct *work) +{ + struct worker *worker; + struct hlist_node *tmp; + + hlist_for_each_entry(worker, tmp, bwh, hentry) + if (worker->current_work == work) + return worker; + return NULL; +} + +/** + * find_worker_executing_work - find worker which is executing a work + * @gcwq: gcwq of interest + * @work: work to find worker for + * + * Find a worker which is executing @work on @gcwq. This function is + * identical to __find_worker_executing_work() except that this + * function calculates @bwh itself. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock). + * + * RETURNS: + * Pointer to worker which is executing @work if found, NULL + * otherwise. */ -static inline void clear_wq_data(struct work_struct *work) +static struct worker *find_worker_executing_work(struct global_cwq *gcwq, + struct work_struct *work) { - unsigned long flags = *work_data_bits(work) & - (1UL << WORK_STRUCT_STATIC); - atomic_long_set(&work->data, flags); + return __find_worker_executing_work(gcwq, busy_worker_head(gcwq, work), + work); } -static inline -struct cpu_workqueue_struct *get_wq_data(struct work_struct *work) +/** + * gcwq_determine_ins_pos - find insertion position + * @gcwq: gcwq of interest + * @cwq: cwq a work is being queued for + * + * A work for @cwq is about to be queued on @gcwq, determine insertion + * position for the work. If @cwq is for HIGHPRI wq, the work is + * queued at the head of the queue but in FIFO order with respect to + * other HIGHPRI works; otherwise, at the end of the queue. This + * function also sets GCWQ_HIGHPRI_PENDING flag to hint @gcwq that + * there are HIGHPRI works pending. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock). + * + * RETURNS: + * Pointer to inserstion position. + */ +static inline struct list_head *gcwq_determine_ins_pos(struct global_cwq *gcwq, + struct cpu_workqueue_struct *cwq) { - return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK); + struct work_struct *twork; + + if (likely(!(cwq->wq->flags & WQ_HIGHPRI))) + return &gcwq->worklist; + + list_for_each_entry(twork, &gcwq->worklist, entry) { + struct cpu_workqueue_struct *tcwq = get_work_cwq(twork); + + if (!(tcwq->wq->flags & WQ_HIGHPRI)) + break; + } + + gcwq->flags |= GCWQ_HIGHPRI_PENDING; + return &twork->entry; } +/** + * insert_work - insert a work into gcwq + * @cwq: cwq @work belongs to + * @work: work to insert + * @head: insertion point + * @extra_flags: extra WORK_STRUCT_* flags to set + * + * Insert @work which belongs to @cwq into @gcwq after @head. + * @extra_flags is or'd to work_struct flags. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock). + */ static void insert_work(struct cpu_workqueue_struct *cwq, - struct work_struct *work, struct list_head *head) + struct work_struct *work, struct list_head *head, + unsigned int extra_flags) { - trace_workqueue_insertion(cwq->thread, work); + struct global_cwq *gcwq = cwq->gcwq; + + /* we own @work, set data and link */ + set_work_cwq(work, cwq, extra_flags); - set_wq_data(work, cwq); /* * Ensure that we get the right work->data if we see the * result of list_add() below, see try_to_grab_pending(). */ smp_wmb(); + list_add_tail(&work->entry, head); - wake_up(&cwq->more_work); + + /* + * Ensure either worker_sched_deactivated() sees the above + * list_add_tail() or we see zero nr_running to avoid workers + * lying around lazily while there are works to be processed. + */ + smp_mb(); + + if (__need_more_worker(gcwq)) + wake_up_worker(gcwq); } -static void __queue_work(struct cpu_workqueue_struct *cwq, +static void __queue_work(unsigned int cpu, struct workqueue_struct *wq, struct work_struct *work) { + struct global_cwq *gcwq; + struct cpu_workqueue_struct *cwq; + struct list_head *worklist; unsigned long flags; debug_work_activate(work); - spin_lock_irqsave(&cwq->lock, flags); - insert_work(cwq, work, &cwq->worklist); - spin_unlock_irqrestore(&cwq->lock, flags); + + /* determine gcwq to use */ + if (!(wq->flags & WQ_UNBOUND)) { + struct global_cwq *last_gcwq; + + if (unlikely(cpu == WORK_CPU_UNBOUND)) + cpu = raw_smp_processor_id(); + + /* + * It's multi cpu. If @wq is non-reentrant and @work + * was previously on a different cpu, it might still + * be running there, in which case the work needs to + * be queued on that cpu to guarantee non-reentrance. + */ + gcwq = get_gcwq(cpu); + if (wq->flags & WQ_NON_REENTRANT && + (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) { + struct worker *worker; + + spin_lock_irqsave(&last_gcwq->lock, flags); + + worker = find_worker_executing_work(last_gcwq, work); + + if (worker && worker->current_cwq->wq == wq) + gcwq = last_gcwq; + else { + /* meh... not running there, queue here */ + spin_unlock_irqrestore(&last_gcwq->lock, flags); + spin_lock_irqsave(&gcwq->lock, flags); + } + } else + spin_lock_irqsave(&gcwq->lock, flags); + } else { + gcwq = get_gcwq(WORK_CPU_UNBOUND); + spin_lock_irqsave(&gcwq->lock, flags); + } + + /* gcwq determined, get cwq and queue */ + cwq = get_cwq(gcwq->cpu, wq); + + BUG_ON(!list_empty(&work->entry)); + + cwq->nr_in_flight[cwq->work_color]++; + + if (likely(cwq->nr_active < cwq->max_active)) { + cwq->nr_active++; + worklist = gcwq_determine_ins_pos(gcwq, cwq); + } else + worklist = &cwq->delayed_works; + + insert_work(cwq, work, worklist, work_color_to_flags(cwq->work_color)); + + spin_unlock_irqrestore(&gcwq->lock, flags); } /** @@ -323,9 +1035,8 @@ queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work) { int ret = 0; - if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { - BUG_ON(!list_empty(&work->entry)); - __queue_work(wq_per_cpu(wq, cpu), work); + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { + __queue_work(cpu, wq, work); ret = 1; } return ret; @@ -335,10 +1046,9 @@ EXPORT_SYMBOL_GPL(queue_work_on); static void delayed_work_timer_fn(unsigned long __data) { struct delayed_work *dwork = (struct delayed_work *)__data; - struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work); - struct workqueue_struct *wq = cwq->wq; + struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work); - __queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work); + __queue_work(smp_processor_id(), cwq->wq, &dwork->work); } /** @@ -375,14 +1085,31 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, struct timer_list *timer = &dwork->timer; struct work_struct *work = &dwork->work; - if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { + unsigned int lcpu; + BUG_ON(timer_pending(timer)); BUG_ON(!list_empty(&work->entry)); timer_stats_timer_set_start_info(&dwork->timer); - /* This stores cwq for the moment, for the timer_fn */ - set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id())); + /* + * This stores cwq for the moment, for the timer_fn. + * Note that the work's gcwq is preserved to allow + * reentrance detection for delayed works. + */ + if (!(wq->flags & WQ_UNBOUND)) { + struct global_cwq *gcwq = get_work_gcwq(work); + + if (gcwq && gcwq->cpu != WORK_CPU_UNBOUND) + lcpu = gcwq->cpu; + else + lcpu = raw_smp_processor_id(); + } else + lcpu = WORK_CPU_UNBOUND; + + set_work_cwq(work, get_cwq(lcpu, wq), 0); + timer->expires = jiffies + delay; timer->data = (unsigned long)dwork; timer->function = delayed_work_timer_fn; @@ -397,80 +1124,872 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, } EXPORT_SYMBOL_GPL(queue_delayed_work_on); -static void run_workqueue(struct cpu_workqueue_struct *cwq) +/** + * worker_enter_idle - enter idle state + * @worker: worker which is entering idle state + * + * @worker is entering idle state. Update stats and idle timer if + * necessary. + * + * LOCKING: + * spin_lock_irq(gcwq->lock). + */ +static void worker_enter_idle(struct worker *worker) { - spin_lock_irq(&cwq->lock); - while (!list_empty(&cwq->worklist)) { - struct work_struct *work = list_entry(cwq->worklist.next, - struct work_struct, entry); - work_func_t f = work->func; -#ifdef CONFIG_LOCKDEP + struct global_cwq *gcwq = worker->gcwq; + + BUG_ON(worker->flags & WORKER_IDLE); + BUG_ON(!list_empty(&worker->entry) && + (worker->hentry.next || worker->hentry.pprev)); + + /* can't use worker_set_flags(), also called from start_worker() */ + worker->flags |= WORKER_IDLE; + gcwq->nr_idle++; + worker->last_active = jiffies; + + /* idle_list is LIFO */ + list_add(&worker->entry, &gcwq->idle_list); + + if (likely(!(worker->flags & WORKER_ROGUE))) { + if (too_many_workers(gcwq) && !timer_pending(&gcwq->idle_timer)) + mod_timer(&gcwq->idle_timer, + jiffies + IDLE_WORKER_TIMEOUT); + } else + wake_up_all(&gcwq->trustee_wait); + + /* sanity check nr_running */ + WARN_ON_ONCE(gcwq->nr_workers == gcwq->nr_idle && + atomic_read(get_gcwq_nr_running(gcwq->cpu))); +} + +/** + * worker_leave_idle - leave idle state + * @worker: worker which is leaving idle state + * + * @worker is leaving idle state. Update stats. + * + * LOCKING: + * spin_lock_irq(gcwq->lock). + */ +static void worker_leave_idle(struct worker *worker) +{ + struct global_cwq *gcwq = worker->gcwq; + + BUG_ON(!(worker->flags & WORKER_IDLE)); + worker_clr_flags(worker, WORKER_IDLE); + gcwq->nr_idle--; + list_del_init(&worker->entry); +} + +/** + * worker_maybe_bind_and_lock - bind worker to its cpu if possible and lock gcwq + * @worker: self + * + * Works which are scheduled while the cpu is online must at least be + * scheduled to a worker which is bound to the cpu so that if they are + * flushed from cpu callbacks while cpu is going down, they are + * guaranteed to execute on the cpu. + * + * This function is to be used by rogue workers and rescuers to bind + * themselves to the target cpu and may race with cpu going down or + * coming online. kthread_bind() can't be used because it may put the + * worker to already dead cpu and set_cpus_allowed_ptr() can't be used + * verbatim as it's best effort and blocking and gcwq may be + * [dis]associated in the meantime. + * + * This function tries set_cpus_allowed() and locks gcwq and verifies + * the binding against GCWQ_DISASSOCIATED which is set during + * CPU_DYING and cleared during CPU_ONLINE, so if the worker enters + * idle state or fetches works without dropping lock, it can guarantee + * the scheduling requirement described in the first paragraph. + * + * CONTEXT: + * Might sleep. Called without any lock but returns with gcwq->lock + * held. + * + * RETURNS: + * %true if the associated gcwq is online (@worker is successfully + * bound), %false if offline. + */ +static bool worker_maybe_bind_and_lock(struct worker *worker) +{ + struct global_cwq *gcwq = worker->gcwq; + struct task_struct *task = worker->task; + + while (true) { /* - * It is permissible to free the struct work_struct - * from inside the function that is called from it, - * this we need to take into account for lockdep too. - * To avoid bogus "held lock freed" warnings as well - * as problems when looking into work->lockdep_map, - * make a copy and use that here. + * The following call may fail, succeed or succeed + * without actually migrating the task to the cpu if + * it races with cpu hotunplug operation. Verify + * against GCWQ_DISASSOCIATED. */ - struct lockdep_map lockdep_map = work->lockdep_map; -#endif - trace_workqueue_execution(cwq->thread, work); - debug_work_deactivate(work); - cwq->current_work = work; - list_del_init(cwq->worklist.next); - spin_unlock_irq(&cwq->lock); - - BUG_ON(get_wq_data(work) != cwq); - work_clear_pending(work); - lock_map_acquire(&cwq->wq->lockdep_map); - lock_map_acquire(&lockdep_map); - f(work); - lock_map_release(&lockdep_map); - lock_map_release(&cwq->wq->lockdep_map); - - if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { - printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " - "%s/0x%08x/%d\n", - current->comm, preempt_count(), - task_pid_nr(current)); - printk(KERN_ERR " last function: "); - print_symbol("%s\n", (unsigned long)f); - debug_show_held_locks(current); - dump_stack(); + if (!(gcwq->flags & GCWQ_DISASSOCIATED)) + set_cpus_allowed_ptr(task, get_cpu_mask(gcwq->cpu)); + + spin_lock_irq(&gcwq->lock); + if (gcwq->flags & GCWQ_DISASSOCIATED) + return false; + if (task_cpu(task) == gcwq->cpu && + cpumask_equal(¤t->cpus_allowed, + get_cpu_mask(gcwq->cpu))) + return true; + spin_unlock_irq(&gcwq->lock); + + /* CPU has come up inbetween, retry migration */ + cpu_relax(); + } +} + +/* + * Function for worker->rebind_work used to rebind rogue busy workers + * to the associated cpu which is coming back online. This is + * scheduled by cpu up but can race with other cpu hotplug operations + * and may be executed twice without intervening cpu down. + */ +static void worker_rebind_fn(struct work_struct *work) +{ + struct worker *worker = container_of(work, struct worker, rebind_work); + struct global_cwq *gcwq = worker->gcwq; + + if (worker_maybe_bind_and_lock(worker)) + worker_clr_flags(worker, WORKER_REBIND); + + spin_unlock_irq(&gcwq->lock); +} + +static struct worker *alloc_worker(void) +{ + struct worker *worker; + + worker = kzalloc(sizeof(*worker), GFP_KERNEL); + if (worker) { + INIT_LIST_HEAD(&worker->entry); + INIT_LIST_HEAD(&worker->scheduled); + INIT_WORK(&worker->rebind_work, worker_rebind_fn); + /* on creation a worker is in !idle && prep state */ + worker->flags = WORKER_PREP; + } + return worker; +} + +/** + * create_worker - create a new workqueue worker + * @gcwq: gcwq the new worker will belong to + * @bind: whether to set affinity to @cpu or not + * + * Create a new worker which is bound to @gcwq. The returned worker + * can be started by calling start_worker() or destroyed using + * destroy_worker(). + * + * CONTEXT: + * Might sleep. Does GFP_KERNEL allocations. + * + * RETURNS: + * Pointer to the newly created worker. + */ +static struct worker *create_worker(struct global_cwq *gcwq, bool bind) +{ + bool on_unbound_cpu = gcwq->cpu == WORK_CPU_UNBOUND; + struct worker *worker = NULL; + int id = -1; + + spin_lock_irq(&gcwq->lock); + while (ida_get_new(&gcwq->worker_ida, &id)) { + spin_unlock_irq(&gcwq->lock); + if (!ida_pre_get(&gcwq->worker_ida, GFP_KERNEL)) + goto fail; + spin_lock_irq(&gcwq->lock); + } + spin_unlock_irq(&gcwq->lock); + + worker = alloc_worker(); + if (!worker) + goto fail; + + worker->gcwq = gcwq; + worker->id = id; + + if (!on_unbound_cpu) + worker->task = kthread_create(worker_thread, worker, + "kworker/%u:%d", gcwq->cpu, id); + else + worker->task = kthread_create(worker_thread, worker, + "kworker/u:%d", id); + if (IS_ERR(worker->task)) + goto fail; + + /* + * A rogue worker will become a regular one if CPU comes + * online later on. Make sure every worker has + * PF_THREAD_BOUND set. + */ + if (bind && !on_unbound_cpu) + kthread_bind(worker->task, gcwq->cpu); + else { + worker->task->flags |= PF_THREAD_BOUND; + if (on_unbound_cpu) + worker->flags |= WORKER_UNBOUND; + } + + return worker; +fail: + if (id >= 0) { + spin_lock_irq(&gcwq->lock); + ida_remove(&gcwq->worker_ida, id); + spin_unlock_irq(&gcwq->lock); + } + kfree(worker); + return NULL; +} + +/** + * start_worker - start a newly created worker + * @worker: worker to start + * + * Make the gcwq aware of @worker and start it. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock). + */ +static void start_worker(struct worker *worker) +{ + worker->flags |= WORKER_STARTED; + worker->gcwq->nr_workers++; + worker_enter_idle(worker); + wake_up_process(worker->task); +} + +/** + * destroy_worker - destroy a workqueue worker + * @worker: worker to be destroyed + * + * Destroy @worker and adjust @gcwq stats accordingly. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock) which is released and regrabbed. + */ +static void destroy_worker(struct worker *worker) +{ + struct global_cwq *gcwq = worker->gcwq; + int id = worker->id; + + /* sanity check frenzy */ + BUG_ON(worker->current_work); + BUG_ON(!list_empty(&worker->scheduled)); + + if (worker->flags & WORKER_STARTED) + gcwq->nr_workers--; + if (worker->flags & WORKER_IDLE) + gcwq->nr_idle--; + + list_del_init(&worker->entry); + worker->flags |= WORKER_DIE; + + spin_unlock_irq(&gcwq->lock); + + kthread_stop(worker->task); + kfree(worker); + + spin_lock_irq(&gcwq->lock); + ida_remove(&gcwq->worker_ida, id); +} + +static void idle_worker_timeout(unsigned long __gcwq) +{ + struct global_cwq *gcwq = (void *)__gcwq; + + spin_lock_irq(&gcwq->lock); + + if (too_many_workers(gcwq)) { + struct worker *worker; + unsigned long expires; + + /* idle_list is kept in LIFO order, check the last one */ + worker = list_entry(gcwq->idle_list.prev, struct worker, entry); + expires = worker->last_active + IDLE_WORKER_TIMEOUT; + + if (time_before(jiffies, expires)) + mod_timer(&gcwq->idle_timer, expires); + else { + /* it's been idle for too long, wake up manager */ + gcwq->flags |= GCWQ_MANAGE_WORKERS; + wake_up_worker(gcwq); + } + } + + spin_unlock_irq(&gcwq->lock); +} + +static bool send_mayday(struct work_struct *work) +{ + struct cpu_workqueue_struct *cwq = get_work_cwq(work); + struct workqueue_struct *wq = cwq->wq; + unsigned int cpu; + + if (!(wq->flags & WQ_RESCUER)) + return false; + + /* mayday mayday mayday */ + cpu = cwq->gcwq->cpu; + /* WORK_CPU_UNBOUND can't be set in cpumask, use cpu 0 instead */ + if (cpu == WORK_CPU_UNBOUND) + cpu = 0; + if (!mayday_test_and_set_cpu(cpu, wq->mayday_mask)) + wake_up_process(wq->rescuer->task); + return true; +} + +static void gcwq_mayday_timeout(unsigned long __gcwq) +{ + struct global_cwq *gcwq = (void *)__gcwq; + struct work_struct *work; + + spin_lock_irq(&gcwq->lock); + + if (need_to_create_worker(gcwq)) { + /* + * We've been trying to create a new worker but + * haven't been successful. We might be hitting an + * allocation deadlock. Send distress signals to + * rescuers. + */ + list_for_each_entry(work, &gcwq->worklist, entry) + send_mayday(work); + } + + spin_unlock_irq(&gcwq->lock); + + mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INTERVAL); +} + +/** + * maybe_create_worker - create a new worker if necessary + * @gcwq: gcwq to create a new worker for + * + * Create a new worker for @gcwq if necessary. @gcwq is guaranteed to + * have at least one idle worker on return from this function. If + * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is + * sent to all rescuers with works scheduled on @gcwq to resolve + * possible allocation deadlock. + * + * On return, need_to_create_worker() is guaranteed to be false and + * may_start_working() true. + * + * LOCKING: + * spin_lock_irq(gcwq->lock) which may be released and regrabbed + * multiple times. Does GFP_KERNEL allocations. Called only from + * manager. + * + * RETURNS: + * false if no action was taken and gcwq->lock stayed locked, true + * otherwise. + */ +static bool maybe_create_worker(struct global_cwq *gcwq) +{ + if (!need_to_create_worker(gcwq)) + return false; +restart: + spin_unlock_irq(&gcwq->lock); + + /* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */ + mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT); + + while (true) { + struct worker *worker; + + worker = create_worker(gcwq, true); + if (worker) { + del_timer_sync(&gcwq->mayday_timer); + spin_lock_irq(&gcwq->lock); + start_worker(worker); + BUG_ON(need_to_create_worker(gcwq)); + return true; + } + + if (!need_to_create_worker(gcwq)) + break; + + __set_current_state(TASK_INTERRUPTIBLE); + schedule_timeout(CREATE_COOLDOWN); + + if (!need_to_create_worker(gcwq)) + break; + } + + del_timer_sync(&gcwq->mayday_timer); + spin_lock_irq(&gcwq->lock); + if (need_to_create_worker(gcwq)) + goto restart; + return true; +} + +/** + * maybe_destroy_worker - destroy workers which have been idle for a while + * @gcwq: gcwq to destroy workers for + * + * Destroy @gcwq workers which have been idle for longer than + * IDLE_WORKER_TIMEOUT. + * + * LOCKING: + * spin_lock_irq(gcwq->lock) which may be released and regrabbed + * multiple times. Called only from manager. + * + * RETURNS: + * false if no action was taken and gcwq->lock stayed locked, true + * otherwise. + */ +static bool maybe_destroy_workers(struct global_cwq *gcwq) +{ + bool ret = false; + + while (too_many_workers(gcwq)) { + struct worker *worker; + unsigned long expires; + + worker = list_entry(gcwq->idle_list.prev, struct worker, entry); + expires = worker->last_active + IDLE_WORKER_TIMEOUT; + + if (time_before(jiffies, expires)) { + mod_timer(&gcwq->idle_timer, expires); + break; } - spin_lock_irq(&cwq->lock); - cwq->current_work = NULL; + destroy_worker(worker); + ret = true; } - spin_unlock_irq(&cwq->lock); + + return ret; } -static int worker_thread(void *__cwq) +/** + * manage_workers - manage worker pool + * @worker: self + * + * Assume the manager role and manage gcwq worker pool @worker belongs + * to. At any given time, there can be only zero or one manager per + * gcwq. The exclusion is handled automatically by this function. + * + * The caller can safely start processing works on false return. On + * true return, it's guaranteed that need_to_create_worker() is false + * and may_start_working() is true. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock) which may be released and regrabbed + * multiple times. Does GFP_KERNEL allocations. + * + * RETURNS: + * false if no action was taken and gcwq->lock stayed locked, true if + * some action was taken. + */ +static bool manage_workers(struct worker *worker) { - struct cpu_workqueue_struct *cwq = __cwq; - DEFINE_WAIT(wait); + struct global_cwq *gcwq = worker->gcwq; + bool ret = false; - if (cwq->wq->freezeable) - set_freezable(); + if (gcwq->flags & GCWQ_MANAGING_WORKERS) + return ret; - for (;;) { - prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE); - if (!freezing(current) && - !kthread_should_stop() && - list_empty(&cwq->worklist)) - schedule(); - finish_wait(&cwq->more_work, &wait); + gcwq->flags &= ~GCWQ_MANAGE_WORKERS; + gcwq->flags |= GCWQ_MANAGING_WORKERS; - try_to_freeze(); + /* + * Destroy and then create so that may_start_working() is true + * on return. + */ + ret |= maybe_destroy_workers(gcwq); + ret |= maybe_create_worker(gcwq); + + gcwq->flags &= ~GCWQ_MANAGING_WORKERS; + + /* + * The trustee might be waiting to take over the manager + * position, tell it we're done. + */ + if (unlikely(gcwq->trustee)) + wake_up_all(&gcwq->trustee_wait); + + return ret; +} + +/** + * move_linked_works - move linked works to a list + * @work: start of series of works to be scheduled + * @head: target list to append @work to + * @nextp: out paramter for nested worklist walking + * + * Schedule linked works starting from @work to @head. Work series to + * be scheduled starts at @work and includes any consecutive work with + * WORK_STRUCT_LINKED set in its predecessor. + * + * If @nextp is not NULL, it's updated to point to the next work of + * the last scheduled work. This allows move_linked_works() to be + * nested inside outer list_for_each_entry_safe(). + * + * CONTEXT: + * spin_lock_irq(gcwq->lock). + */ +static void move_linked_works(struct work_struct *work, struct list_head *head, + struct work_struct **nextp) +{ + struct work_struct *n; - if (kthread_should_stop()) + /* + * Linked worklist will always end before the end of the list, + * use NULL for list head. + */ + list_for_each_entry_safe_from(work, n, NULL, entry) { + list_move_tail(&work->entry, head); + if (!(*work_data_bits(work) & WORK_STRUCT_LINKED)) break; + } + + /* + * If we're already inside safe list traversal and have moved + * multiple works to the scheduled queue, the next position + * needs to be updated. + */ + if (nextp) + *nextp = n; +} - run_workqueue(cwq); +static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq) +{ + struct work_struct *work = list_first_entry(&cwq->delayed_works, + struct work_struct, entry); + struct list_head *pos = gcwq_determine_ins_pos(cwq->gcwq, cwq); + + move_linked_works(work, pos, NULL); + cwq->nr_active++; +} + +/** + * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight + * @cwq: cwq of interest + * @color: color of work which left the queue + * + * A work either has completed or is removed from pending queue, + * decrement nr_in_flight of its cwq and handle workqueue flushing. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock). + */ +static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color) +{ + /* ignore uncolored works */ + if (color == WORK_NO_COLOR) + return; + + cwq->nr_in_flight[color]--; + cwq->nr_active--; + + if (!list_empty(&cwq->delayed_works)) { + /* one down, submit a delayed one */ + if (cwq->nr_active < cwq->max_active) + cwq_activate_first_delayed(cwq); } - return 0; + /* is flush in progress and are we at the flushing tip? */ + if (likely(cwq->flush_color != color)) + return; + + /* are there still in-flight works? */ + if (cwq->nr_in_flight[color]) + return; + + /* this cwq is done, clear flush_color */ + cwq->flush_color = -1; + + /* + * If this was the last cwq, wake up the first flusher. It + * will handle the rest. + */ + if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush)) + complete(&cwq->wq->first_flusher->done); +} + +/** + * process_one_work - process single work + * @worker: self + * @work: work to process + * + * Process @work. This function contains all the logics necessary to + * process a single work including synchronization against and + * interaction with other workers on the same cpu, queueing and + * flushing. As long as context requirement is met, any worker can + * call this function to process a work. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock) which is released and regrabbed. + */ +static void process_one_work(struct worker *worker, struct work_struct *work) +{ + struct cpu_workqueue_struct *cwq = get_work_cwq(work); + struct global_cwq *gcwq = cwq->gcwq; + struct hlist_head *bwh = busy_worker_head(gcwq, work); + bool cpu_intensive = cwq->wq->flags & WQ_CPU_INTENSIVE; + work_func_t f = work->func; + int work_color; + struct worker *collision; +#ifdef CONFIG_LOCKDEP + /* + * It is permissible to free the struct work_struct from + * inside the function that is called from it, this we need to + * take into account for lockdep too. To avoid bogus "held + * lock freed" warnings as well as problems when looking into + * work->lockdep_map, make a copy and use that here. + */ + struct lockdep_map lockdep_map = work->lockdep_map; +#endif + /* + * A single work shouldn't be executed concurrently by + * multiple workers on a single cpu. Check whether anyone is + * already processing the work. If so, defer the work to the + * currently executing one. + */ + collision = __find_worker_executing_work(gcwq, bwh, work); + if (unlikely(collision)) { + move_linked_works(work, &collision->scheduled, NULL); + return; + } + + /* claim and process */ + debug_work_deactivate(work); + hlist_add_head(&worker->hentry, bwh); + worker->current_work = work; + worker->current_cwq = cwq; + work_color = get_work_color(work); + + /* record the current cpu number in the work data and dequeue */ + set_work_cpu(work, gcwq->cpu); + list_del_init(&work->entry); + + /* + * If HIGHPRI_PENDING, check the next work, and, if HIGHPRI, + * wake up another worker; otherwise, clear HIGHPRI_PENDING. + */ + if (unlikely(gcwq->flags & GCWQ_HIGHPRI_PENDING)) { + struct work_struct *nwork = list_first_entry(&gcwq->worklist, + struct work_struct, entry); + + if (!list_empty(&gcwq->worklist) && + get_work_cwq(nwork)->wq->flags & WQ_HIGHPRI) + wake_up_worker(gcwq); + else + gcwq->flags &= ~GCWQ_HIGHPRI_PENDING; + } + + /* + * CPU intensive works don't participate in concurrency + * management. They're the scheduler's responsibility. + */ + if (unlikely(cpu_intensive)) + worker_set_flags(worker, WORKER_CPU_INTENSIVE, true); + + spin_unlock_irq(&gcwq->lock); + + work_clear_pending(work); + lock_map_acquire(&cwq->wq->lockdep_map); + lock_map_acquire(&lockdep_map); + f(work); + lock_map_release(&lockdep_map); + lock_map_release(&cwq->wq->lockdep_map); + + if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { + printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " + "%s/0x%08x/%d\n", + current->comm, preempt_count(), task_pid_nr(current)); + printk(KERN_ERR " last function: "); + print_symbol("%s\n", (unsigned long)f); + debug_show_held_locks(current); + dump_stack(); + } + + spin_lock_irq(&gcwq->lock); + + /* clear cpu intensive status */ + if (unlikely(cpu_intensive)) + worker_clr_flags(worker, WORKER_CPU_INTENSIVE); + + /* we're done with it, release */ + hlist_del_init(&worker->hentry); + worker->current_work = NULL; + worker->current_cwq = NULL; + cwq_dec_nr_in_flight(cwq, work_color); +} + +/** + * process_scheduled_works - process scheduled works + * @worker: self + * + * Process all scheduled works. Please note that the scheduled list + * may change while processing a work, so this function repeatedly + * fetches a work from the top and executes it. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock) which may be released and regrabbed + * multiple times. + */ +static void process_scheduled_works(struct worker *worker) +{ + while (!list_empty(&worker->scheduled)) { + struct work_struct *work = list_first_entry(&worker->scheduled, + struct work_struct, entry); + process_one_work(worker, work); + } +} + +/** + * worker_thread - the worker thread function + * @__worker: self + * + * The gcwq worker thread function. There's a single dynamic pool of + * these per each cpu. These workers process all works regardless of + * their specific target workqueue. The only exception is works which + * belong to workqueues with a rescuer which will be explained in + * rescuer_thread(). + */ +static int worker_thread(void *__worker) +{ + struct worker *worker = __worker; + struct global_cwq *gcwq = worker->gcwq; + + /* tell the scheduler that this is a workqueue worker */ + worker->task->flags |= PF_WQ_WORKER; +woke_up: + spin_lock_irq(&gcwq->lock); + + /* DIE can be set only while we're idle, checking here is enough */ + if (worker->flags & WORKER_DIE) { + spin_unlock_irq(&gcwq->lock); + worker->task->flags &= ~PF_WQ_WORKER; + return 0; + } + + worker_leave_idle(worker); +recheck: + /* no more worker necessary? */ + if (!need_more_worker(gcwq)) + goto sleep; + + /* do we need to manage? */ + if (unlikely(!may_start_working(gcwq)) && manage_workers(worker)) + goto recheck; + + /* + * ->scheduled list can only be filled while a worker is + * preparing to process a work or actually processing it. + * Make sure nobody diddled with it while I was sleeping. + */ + BUG_ON(!list_empty(&worker->scheduled)); + + /* + * When control reaches this point, we're guaranteed to have + * at least one idle worker or that someone else has already + * assumed the manager role. + */ + worker_clr_flags(worker, WORKER_PREP); + + do { + struct work_struct *work = + list_first_entry(&gcwq->worklist, + struct work_struct, entry); + + if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) { + /* optimization path, not strictly necessary */ + process_one_work(worker, work); + if (unlikely(!list_empty(&worker->scheduled))) + process_scheduled_works(worker); + } else { + move_linked_works(work, &worker->scheduled, NULL); + process_scheduled_works(worker); + } + } while (keep_working(gcwq)); + + worker_set_flags(worker, WORKER_PREP, false); +sleep: + if (unlikely(need_to_manage_workers(gcwq)) && manage_workers(worker)) + goto recheck; + + /* + * gcwq->lock is held and there's no work to process and no + * need to manage, sleep. Workers are woken up only while + * holding gcwq->lock or from local cpu, so setting the + * current state before releasing gcwq->lock is enough to + * prevent losing any event. + */ + worker_enter_idle(worker); + __set_current_state(TASK_INTERRUPTIBLE); + spin_unlock_irq(&gcwq->lock); + schedule(); + goto woke_up; +} + +/** + * rescuer_thread - the rescuer thread function + * @__wq: the associated workqueue + * + * Workqueue rescuer thread function. There's one rescuer for each + * workqueue which has WQ_RESCUER set. + * + * Regular work processing on a gcwq may block trying to create a new + * worker which uses GFP_KERNEL allocation which has slight chance of + * developing into deadlock if some works currently on the same queue + * need to be processed to satisfy the GFP_KERNEL allocation. This is + * the problem rescuer solves. + * + * When such condition is possible, the gcwq summons rescuers of all + * workqueues which have works queued on the gcwq and let them process + * those works so that forward progress can be guaranteed. + * + * This should happen rarely. + */ +static int rescuer_thread(void *__wq) +{ + struct workqueue_struct *wq = __wq; + struct worker *rescuer = wq->rescuer; + struct list_head *scheduled = &rescuer->scheduled; + bool is_unbound = wq->flags & WQ_UNBOUND; + unsigned int cpu; + + set_user_nice(current, RESCUER_NICE_LEVEL); +repeat: + set_current_state(TASK_INTERRUPTIBLE); + + if (kthread_should_stop()) + return 0; + + /* + * See whether any cpu is asking for help. Unbounded + * workqueues use cpu 0 in mayday_mask for CPU_UNBOUND. + */ + for_each_mayday_cpu(cpu, wq->mayday_mask) { + unsigned int tcpu = is_unbound ? WORK_CPU_UNBOUND : cpu; + struct cpu_workqueue_struct *cwq = get_cwq(tcpu, wq); + struct global_cwq *gcwq = cwq->gcwq; + struct work_struct *work, *n; + + __set_current_state(TASK_RUNNING); + mayday_clear_cpu(cpu, wq->mayday_mask); + + /* migrate to the target cpu if possible */ + rescuer->gcwq = gcwq; + worker_maybe_bind_and_lock(rescuer); + + /* + * Slurp in all works issued via this workqueue and + * process'em. + */ + BUG_ON(!list_empty(&rescuer->scheduled)); + list_for_each_entry_safe(work, n, &gcwq->worklist, entry) + if (get_work_cwq(work) == cwq) + move_linked_works(work, scheduled, &n); + + process_scheduled_works(rescuer); + spin_unlock_irq(&gcwq->lock); + } + + schedule(); + goto repeat; } struct wq_barrier { @@ -484,44 +2003,137 @@ static void wq_barrier_func(struct work_struct *work) complete(&barr->done); } +/** + * insert_wq_barrier - insert a barrier work + * @cwq: cwq to insert barrier into + * @barr: wq_barrier to insert + * @target: target work to attach @barr to + * @worker: worker currently executing @target, NULL if @target is not executing + * + * @barr is linked to @target such that @barr is completed only after + * @target finishes execution. Please note that the ordering + * guarantee is observed only with respect to @target and on the local + * cpu. + * + * Currently, a queued barrier can't be canceled. This is because + * try_to_grab_pending() can't determine whether the work to be + * grabbed is at the head of the queue and thus can't clear LINKED + * flag of the previous work while there must be a valid next work + * after a work with LINKED flag set. + * + * Note that when @worker is non-NULL, @target may be modified + * underneath us, so we can't reliably determine cwq from @target. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock). + */ static void insert_wq_barrier(struct cpu_workqueue_struct *cwq, - struct wq_barrier *barr, struct list_head *head) + struct wq_barrier *barr, + struct work_struct *target, struct worker *worker) { + struct list_head *head; + unsigned int linked = 0; + /* - * debugobject calls are safe here even with cwq->lock locked + * debugobject calls are safe here even with gcwq->lock locked * as we know for sure that this will not trigger any of the * checks and call back into the fixup functions where we * might deadlock. */ INIT_WORK_ON_STACK(&barr->work, wq_barrier_func); - __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work)); - + __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work)); init_completion(&barr->done); + /* + * If @target is currently being executed, schedule the + * barrier to the worker; otherwise, put it after @target. + */ + if (worker) + head = worker->scheduled.next; + else { + unsigned long *bits = work_data_bits(target); + + head = target->entry.next; + /* there can already be other linked works, inherit and set */ + linked = *bits & WORK_STRUCT_LINKED; + __set_bit(WORK_STRUCT_LINKED_BIT, bits); + } + debug_work_activate(&barr->work); - insert_work(cwq, &barr->work, head); + insert_work(cwq, &barr->work, head, + work_color_to_flags(WORK_NO_COLOR) | linked); } -static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) +/** + * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing + * @wq: workqueue being flushed + * @flush_color: new flush color, < 0 for no-op + * @work_color: new work color, < 0 for no-op + * + * Prepare cwqs for workqueue flushing. + * + * If @flush_color is non-negative, flush_color on all cwqs should be + * -1. If no cwq has in-flight commands at the specified color, all + * cwq->flush_color's stay at -1 and %false is returned. If any cwq + * has in flight commands, its cwq->flush_color is set to + * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq + * wakeup logic is armed and %true is returned. + * + * The caller should have initialized @wq->first_flusher prior to + * calling this function with non-negative @flush_color. If + * @flush_color is negative, no flush color update is done and %false + * is returned. + * + * If @work_color is non-negative, all cwqs should have the same + * work_color which is previous to @work_color and all will be + * advanced to @work_color. + * + * CONTEXT: + * mutex_lock(wq->flush_mutex). + * + * RETURNS: + * %true if @flush_color >= 0 and there's something to flush. %false + * otherwise. + */ +static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq, + int flush_color, int work_color) { - int active = 0; - struct wq_barrier barr; - - WARN_ON(cwq->thread == current); + bool wait = false; + unsigned int cpu; - spin_lock_irq(&cwq->lock); - if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) { - insert_wq_barrier(cwq, &barr, &cwq->worklist); - active = 1; + if (flush_color >= 0) { + BUG_ON(atomic_read(&wq->nr_cwqs_to_flush)); + atomic_set(&wq->nr_cwqs_to_flush, 1); } - spin_unlock_irq(&cwq->lock); - if (active) { - wait_for_completion(&barr.done); - destroy_work_on_stack(&barr.work); + for_each_cwq_cpu(cpu, wq) { + struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); + struct global_cwq *gcwq = cwq->gcwq; + + spin_lock_irq(&gcwq->lock); + + if (flush_color >= 0) { + BUG_ON(cwq->flush_color != -1); + + if (cwq->nr_in_flight[flush_color]) { + cwq->flush_color = flush_color; + atomic_inc(&wq->nr_cwqs_to_flush); + wait = true; + } + } + + if (work_color >= 0) { + BUG_ON(work_color != work_next_color(cwq->work_color)); + cwq->work_color = work_color; + } + + spin_unlock_irq(&gcwq->lock); } - return active; + if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush)) + complete(&wq->first_flusher->done); + + return wait; } /** @@ -533,20 +2145,150 @@ static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) * * We sleep until all works which were queued on entry have been handled, * but we are not livelocked by new incoming ones. - * - * This function used to run the workqueues itself. Now we just wait for the - * helper threads to do it. */ void flush_workqueue(struct workqueue_struct *wq) { - const struct cpumask *cpu_map = wq_cpu_map(wq); - int cpu; + struct wq_flusher this_flusher = { + .list = LIST_HEAD_INIT(this_flusher.list), + .flush_color = -1, + .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done), + }; + int next_color; - might_sleep(); lock_map_acquire(&wq->lockdep_map); lock_map_release(&wq->lockdep_map); - for_each_cpu(cpu, cpu_map) - flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); + + mutex_lock(&wq->flush_mutex); + + /* + * Start-to-wait phase + */ + next_color = work_next_color(wq->work_color); + + if (next_color != wq->flush_color) { + /* + * Color space is not full. The current work_color + * becomes our flush_color and work_color is advanced + * by one. + */ + BUG_ON(!list_empty(&wq->flusher_overflow)); + this_flusher.flush_color = wq->work_color; + wq->work_color = next_color; + + if (!wq->first_flusher) { + /* no flush in progress, become the first flusher */ + BUG_ON(wq->flush_color != this_flusher.flush_color); + + wq->first_flusher = &this_flusher; + + if (!flush_workqueue_prep_cwqs(wq, wq->flush_color, + wq->work_color)) { + /* nothing to flush, done */ + wq->flush_color = next_color; + wq->first_flusher = NULL; + goto out_unlock; + } + } else { + /* wait in queue */ + BUG_ON(wq->flush_color == this_flusher.flush_color); + list_add_tail(&this_flusher.list, &wq->flusher_queue); + flush_workqueue_prep_cwqs(wq, -1, wq->work_color); + } + } else { + /* + * Oops, color space is full, wait on overflow queue. + * The next flush completion will assign us + * flush_color and transfer to flusher_queue. + */ + list_add_tail(&this_flusher.list, &wq->flusher_overflow); + } + + mutex_unlock(&wq->flush_mutex); + + wait_for_completion(&this_flusher.done); + + /* + * Wake-up-and-cascade phase + * + * First flushers are responsible for cascading flushes and + * handling overflow. Non-first flushers can simply return. + */ + if (wq->first_flusher != &this_flusher) + return; + + mutex_lock(&wq->flush_mutex); + + /* we might have raced, check again with mutex held */ + if (wq->first_flusher != &this_flusher) + goto out_unlock; + + wq->first_flusher = NULL; + + BUG_ON(!list_empty(&this_flusher.list)); + BUG_ON(wq->flush_color != this_flusher.flush_color); + + while (true) { + struct wq_flusher *next, *tmp; + + /* complete all the flushers sharing the current flush color */ + list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) { + if (next->flush_color != wq->flush_color) + break; + list_del_init(&next->list); + complete(&next->done); + } + + BUG_ON(!list_empty(&wq->flusher_overflow) && + wq->flush_color != work_next_color(wq->work_color)); + + /* this flush_color is finished, advance by one */ + wq->flush_color = work_next_color(wq->flush_color); + + /* one color has been freed, handle overflow queue */ + if (!list_empty(&wq->flusher_overflow)) { + /* + * Assign the same color to all overflowed + * flushers, advance work_color and append to + * flusher_queue. This is the start-to-wait + * phase for these overflowed flushers. + */ + list_for_each_entry(tmp, &wq->flusher_overflow, list) + tmp->flush_color = wq->work_color; + + wq->work_color = work_next_color(wq->work_color); + + list_splice_tail_init(&wq->flusher_overflow, + &wq->flusher_queue); + flush_workqueue_prep_cwqs(wq, -1, wq->work_color); + } + + if (list_empty(&wq->flusher_queue)) { + BUG_ON(wq->flush_color != wq->work_color); + break; + } + + /* + * Need to flush more colors. Make the next flusher + * the new first flusher and arm cwqs. + */ + BUG_ON(wq->flush_color == wq->work_color); + BUG_ON(wq->flush_color != next->flush_color); + + list_del_init(&next->list); + wq->first_flusher = next; + + if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1)) + break; + + /* + * Meh... this color is already done, clear first + * flusher and repeat cascading. + */ + wq->first_flusher = NULL; + } + +out_unlock: + mutex_unlock(&wq->flush_mutex); } EXPORT_SYMBOL_GPL(flush_workqueue); @@ -562,43 +2304,46 @@ EXPORT_SYMBOL_GPL(flush_workqueue); */ int flush_work(struct work_struct *work) { + struct worker *worker = NULL; + struct global_cwq *gcwq; struct cpu_workqueue_struct *cwq; - struct list_head *prev; struct wq_barrier barr; might_sleep(); - cwq = get_wq_data(work); - if (!cwq) + gcwq = get_work_gcwq(work); + if (!gcwq) return 0; - lock_map_acquire(&cwq->wq->lockdep_map); - lock_map_release(&cwq->wq->lockdep_map); - - prev = NULL; - spin_lock_irq(&cwq->lock); + spin_lock_irq(&gcwq->lock); if (!list_empty(&work->entry)) { /* * See the comment near try_to_grab_pending()->smp_rmb(). - * If it was re-queued under us we are not going to wait. + * If it was re-queued to a different gcwq under us, we + * are not going to wait. */ smp_rmb(); - if (unlikely(cwq != get_wq_data(work))) - goto out; - prev = &work->entry; + cwq = get_work_cwq(work); + if (unlikely(!cwq || gcwq != cwq->gcwq)) + goto already_gone; } else { - if (cwq->current_work != work) - goto out; - prev = &cwq->worklist; + worker = find_worker_executing_work(gcwq, work); + if (!worker) + goto already_gone; + cwq = worker->current_cwq; } - insert_wq_barrier(cwq, &barr, prev->next); -out: - spin_unlock_irq(&cwq->lock); - if (!prev) - return 0; + + insert_wq_barrier(cwq, &barr, work, worker); + spin_unlock_irq(&gcwq->lock); + + lock_map_acquire(&cwq->wq->lockdep_map); + lock_map_release(&cwq->wq->lockdep_map); wait_for_completion(&barr.done); destroy_work_on_stack(&barr.work); return 1; +already_gone: + spin_unlock_irq(&gcwq->lock); + return 0; } EXPORT_SYMBOL_GPL(flush_work); @@ -608,54 +2353,55 @@ EXPORT_SYMBOL_GPL(flush_work); */ static int try_to_grab_pending(struct work_struct *work) { - struct cpu_workqueue_struct *cwq; + struct global_cwq *gcwq; int ret = -1; - if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) return 0; /* * The queueing is in progress, or it is already queued. Try to * steal it from ->worklist without clearing WORK_STRUCT_PENDING. */ - - cwq = get_wq_data(work); - if (!cwq) + gcwq = get_work_gcwq(work); + if (!gcwq) return ret; - spin_lock_irq(&cwq->lock); + spin_lock_irq(&gcwq->lock); if (!list_empty(&work->entry)) { /* - * This work is queued, but perhaps we locked the wrong cwq. + * This work is queued, but perhaps we locked the wrong gcwq. * In that case we must see the new value after rmb(), see * insert_work()->wmb(). */ smp_rmb(); - if (cwq == get_wq_data(work)) { + if (gcwq == get_work_gcwq(work)) { debug_work_deactivate(work); list_del_init(&work->entry); + cwq_dec_nr_in_flight(get_work_cwq(work), + get_work_color(work)); ret = 1; } } - spin_unlock_irq(&cwq->lock); + spin_unlock_irq(&gcwq->lock); return ret; } -static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq, - struct work_struct *work) +static void wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work) { struct wq_barrier barr; - int running = 0; + struct worker *worker; - spin_lock_irq(&cwq->lock); - if (unlikely(cwq->current_work == work)) { - insert_wq_barrier(cwq, &barr, cwq->worklist.next); - running = 1; - } - spin_unlock_irq(&cwq->lock); + spin_lock_irq(&gcwq->lock); + + worker = find_worker_executing_work(gcwq, work); + if (unlikely(worker)) + insert_wq_barrier(worker->current_cwq, &barr, work, worker); - if (unlikely(running)) { + spin_unlock_irq(&gcwq->lock); + + if (unlikely(worker)) { wait_for_completion(&barr.done); destroy_work_on_stack(&barr.work); } @@ -663,9 +2409,6 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq, static void wait_on_work(struct work_struct *work) { - struct cpu_workqueue_struct *cwq; - struct workqueue_struct *wq; - const struct cpumask *cpu_map; int cpu; might_sleep(); @@ -673,15 +2416,8 @@ static void wait_on_work(struct work_struct *work) lock_map_acquire(&work->lockdep_map); lock_map_release(&work->lockdep_map); - cwq = get_wq_data(work); - if (!cwq) - return; - - wq = cwq->wq; - cpu_map = wq_cpu_map(wq); - - for_each_cpu(cpu, cpu_map) - wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work); + for_each_gcwq_cpu(cpu) + wait_on_cpu_work(get_gcwq(cpu), work); } static int __cancel_work_timer(struct work_struct *work, @@ -696,7 +2432,7 @@ static int __cancel_work_timer(struct work_struct *work, wait_on_work(work); } while (unlikely(ret < 0)); - clear_wq_data(work); + clear_work_data(work); return ret; } @@ -742,8 +2478,6 @@ int cancel_delayed_work_sync(struct delayed_work *dwork) } EXPORT_SYMBOL(cancel_delayed_work_sync); -static struct workqueue_struct *keventd_wq __read_mostly; - /** * schedule_work - put work task in global workqueue * @work: job to be done @@ -757,7 +2491,7 @@ static struct workqueue_struct *keventd_wq __read_mostly; */ int schedule_work(struct work_struct *work) { - return queue_work(keventd_wq, work); + return queue_work(system_wq, work); } EXPORT_SYMBOL(schedule_work); @@ -770,7 +2504,7 @@ EXPORT_SYMBOL(schedule_work); */ int schedule_work_on(int cpu, struct work_struct *work) { - return queue_work_on(cpu, keventd_wq, work); + return queue_work_on(cpu, system_wq, work); } EXPORT_SYMBOL(schedule_work_on); @@ -785,7 +2519,7 @@ EXPORT_SYMBOL(schedule_work_on); int schedule_delayed_work(struct delayed_work *dwork, unsigned long delay) { - return queue_delayed_work(keventd_wq, dwork, delay); + return queue_delayed_work(system_wq, dwork, delay); } EXPORT_SYMBOL(schedule_delayed_work); @@ -798,9 +2532,8 @@ EXPORT_SYMBOL(schedule_delayed_work); void flush_delayed_work(struct delayed_work *dwork) { if (del_timer_sync(&dwork->timer)) { - struct cpu_workqueue_struct *cwq; - cwq = wq_per_cpu(get_wq_data(&dwork->work)->wq, get_cpu()); - __queue_work(cwq, &dwork->work); + __queue_work(get_cpu(), get_work_cwq(&dwork->work)->wq, + &dwork->work); put_cpu(); } flush_work(&dwork->work); @@ -819,7 +2552,7 @@ EXPORT_SYMBOL(flush_delayed_work); int schedule_delayed_work_on(int cpu, struct delayed_work *dwork, unsigned long delay) { - return queue_delayed_work_on(cpu, keventd_wq, dwork, delay); + return queue_delayed_work_on(cpu, system_wq, dwork, delay); } EXPORT_SYMBOL(schedule_delayed_work_on); @@ -835,7 +2568,6 @@ EXPORT_SYMBOL(schedule_delayed_work_on); int schedule_on_each_cpu(work_func_t func) { int cpu; - int orig = -1; struct work_struct *works; works = alloc_percpu(struct work_struct); @@ -844,23 +2576,12 @@ int schedule_on_each_cpu(work_func_t func) get_online_cpus(); - /* - * When running in keventd don't schedule a work item on - * itself. Can just call directly because the work queue is - * already bound. This also is faster. - */ - if (current_is_keventd()) - orig = raw_smp_processor_id(); - for_each_online_cpu(cpu) { struct work_struct *work = per_cpu_ptr(works, cpu); INIT_WORK(work, func); - if (cpu != orig) - schedule_work_on(cpu, work); + schedule_work_on(cpu, work); } - if (orig >= 0) - func(per_cpu_ptr(works, orig)); for_each_online_cpu(cpu) flush_work(per_cpu_ptr(works, cpu)); @@ -896,7 +2617,7 @@ int schedule_on_each_cpu(work_func_t func) */ void flush_scheduled_work(void) { - flush_workqueue(keventd_wq); + flush_workqueue(system_wq); } EXPORT_SYMBOL(flush_scheduled_work); @@ -928,170 +2649,170 @@ EXPORT_SYMBOL_GPL(execute_in_process_context); int keventd_up(void) { - return keventd_wq != NULL; + return system_wq != NULL; } -int current_is_keventd(void) +static int alloc_cwqs(struct workqueue_struct *wq) { - struct cpu_workqueue_struct *cwq; - int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */ - int ret = 0; - - BUG_ON(!keventd_wq); + /* + * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS. + * Make sure that the alignment isn't lower than that of + * unsigned long long. + */ + const size_t size = sizeof(struct cpu_workqueue_struct); + const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS, + __alignof__(unsigned long long)); +#ifdef CONFIG_SMP + bool percpu = !(wq->flags & WQ_UNBOUND); +#else + bool percpu = false; +#endif - cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu); - if (current == cwq->thread) - ret = 1; + if (percpu) + wq->cpu_wq.pcpu = __alloc_percpu(size, align); + else { + void *ptr; - return ret; + /* + * Allocate enough room to align cwq and put an extra + * pointer at the end pointing back to the originally + * allocated pointer which will be used for free. + */ + ptr = kzalloc(size + align + sizeof(void *), GFP_KERNEL); + if (ptr) { + wq->cpu_wq.single = PTR_ALIGN(ptr, align); + *(void **)(wq->cpu_wq.single + 1) = ptr; + } + } + /* just in case, make sure it's actually aligned */ + BUG_ON(!IS_ALIGNED(wq->cpu_wq.v, align)); + return wq->cpu_wq.v ? 0 : -ENOMEM; } -static struct cpu_workqueue_struct * -init_cpu_workqueue(struct workqueue_struct *wq, int cpu) +static void free_cwqs(struct workqueue_struct *wq) { - struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); - - cwq->wq = wq; - spin_lock_init(&cwq->lock); - INIT_LIST_HEAD(&cwq->worklist); - init_waitqueue_head(&cwq->more_work); +#ifdef CONFIG_SMP + bool percpu = !(wq->flags & WQ_UNBOUND); +#else + bool percpu = false; +#endif - return cwq; + if (percpu) + free_percpu(wq->cpu_wq.pcpu); + else if (wq->cpu_wq.single) { + /* the pointer to free is stored right after the cwq */ + kfree(*(void **)(wq->cpu_wq.single + 1)); + } } -static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) +static int wq_clamp_max_active(int max_active, unsigned int flags, + const char *name) { - struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 }; - struct workqueue_struct *wq = cwq->wq; - const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d"; - struct task_struct *p; + int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE; - p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu); - /* - * Nobody can add the work_struct to this cwq, - * if (caller is __create_workqueue) - * nobody should see this wq - * else // caller is CPU_UP_PREPARE - * cpu is not on cpu_online_map - * so we can abort safely. - */ - if (IS_ERR(p)) - return PTR_ERR(p); - if (cwq->wq->rt) - sched_setscheduler_nocheck(p, SCHED_FIFO, ¶m); - cwq->thread = p; - - trace_workqueue_creation(cwq->thread, cpu); + if (max_active < 1 || max_active > lim) + printk(KERN_WARNING "workqueue: max_active %d requested for %s " + "is out of range, clamping between %d and %d\n", + max_active, name, 1, lim); - return 0; + return clamp_val(max_active, 1, lim); } -static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) +struct workqueue_struct *__alloc_workqueue_key(const char *name, + unsigned int flags, + int max_active, + struct lock_class_key *key, + const char *lock_name) { - struct task_struct *p = cwq->thread; + struct workqueue_struct *wq; + unsigned int cpu; - if (p != NULL) { - if (cpu >= 0) - kthread_bind(p, cpu); - wake_up_process(p); - } -} + /* + * Unbound workqueues aren't concurrency managed and should be + * dispatched to workers immediately. + */ + if (flags & WQ_UNBOUND) + flags |= WQ_HIGHPRI; -struct workqueue_struct *__create_workqueue_key(const char *name, - int singlethread, - int freezeable, - int rt, - struct lock_class_key *key, - const char *lock_name) -{ - struct workqueue_struct *wq; - struct cpu_workqueue_struct *cwq; - int err = 0, cpu; + max_active = max_active ?: WQ_DFL_ACTIVE; + max_active = wq_clamp_max_active(max_active, flags, name); wq = kzalloc(sizeof(*wq), GFP_KERNEL); if (!wq) - return NULL; + goto err; - wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); - if (!wq->cpu_wq) { - kfree(wq); - return NULL; - } + wq->flags = flags; + wq->saved_max_active = max_active; + mutex_init(&wq->flush_mutex); + atomic_set(&wq->nr_cwqs_to_flush, 0); + INIT_LIST_HEAD(&wq->flusher_queue); + INIT_LIST_HEAD(&wq->flusher_overflow); wq->name = name; lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); - wq->singlethread = singlethread; - wq->freezeable = freezeable; - wq->rt = rt; INIT_LIST_HEAD(&wq->list); - if (singlethread) { - cwq = init_cpu_workqueue(wq, singlethread_cpu); - err = create_workqueue_thread(cwq, singlethread_cpu); - start_workqueue_thread(cwq, -1); - } else { - cpu_maps_update_begin(); - /* - * We must place this wq on list even if the code below fails. - * cpu_down(cpu) can remove cpu from cpu_populated_map before - * destroy_workqueue() takes the lock, in that case we leak - * cwq[cpu]->thread. - */ - spin_lock(&workqueue_lock); - list_add(&wq->list, &workqueues); - spin_unlock(&workqueue_lock); - /* - * We must initialize cwqs for each possible cpu even if we - * are going to call destroy_workqueue() finally. Otherwise - * cpu_up() can hit the uninitialized cwq once we drop the - * lock. - */ - for_each_possible_cpu(cpu) { - cwq = init_cpu_workqueue(wq, cpu); - if (err || !cpu_online(cpu)) - continue; - err = create_workqueue_thread(cwq, cpu); - start_workqueue_thread(cwq, cpu); - } - cpu_maps_update_done(); + if (alloc_cwqs(wq) < 0) + goto err; + + for_each_cwq_cpu(cpu, wq) { + struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); + struct global_cwq *gcwq = get_gcwq(cpu); + + BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK); + cwq->gcwq = gcwq; + cwq->wq = wq; + cwq->flush_color = -1; + cwq->max_active = max_active; + INIT_LIST_HEAD(&cwq->delayed_works); } - if (err) { - destroy_workqueue(wq); - wq = NULL; + if (flags & WQ_RESCUER) { + struct worker *rescuer; + + if (!alloc_mayday_mask(&wq->mayday_mask, GFP_KERNEL)) + goto err; + + wq->rescuer = rescuer = alloc_worker(); + if (!rescuer) + goto err; + + rescuer->task = kthread_create(rescuer_thread, wq, "%s", name); + if (IS_ERR(rescuer->task)) + goto err; + + wq->rescuer = rescuer; + rescuer->task->flags |= PF_THREAD_BOUND; + wake_up_process(rescuer->task); } - return wq; -} -EXPORT_SYMBOL_GPL(__create_workqueue_key); -static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq) -{ /* - * Our caller is either destroy_workqueue() or CPU_POST_DEAD, - * cpu_add_remove_lock protects cwq->thread. + * workqueue_lock protects global freeze state and workqueues + * list. Grab it, set max_active accordingly and add the new + * workqueue to workqueues list. */ - if (cwq->thread == NULL) - return; + spin_lock(&workqueue_lock); - lock_map_acquire(&cwq->wq->lockdep_map); - lock_map_release(&cwq->wq->lockdep_map); + if (workqueue_freezing && wq->flags & WQ_FREEZEABLE) + for_each_cwq_cpu(cpu, wq) + get_cwq(cpu, wq)->max_active = 0; - flush_cpu_workqueue(cwq); - /* - * If the caller is CPU_POST_DEAD and cwq->worklist was not empty, - * a concurrent flush_workqueue() can insert a barrier after us. - * However, in that case run_workqueue() won't return and check - * kthread_should_stop() until it flushes all work_struct's. - * When ->worklist becomes empty it is safe to exit because no - * more work_structs can be queued on this cwq: flush_workqueue - * checks list_empty(), and a "normal" queue_work() can't use - * a dead CPU. - */ - trace_workqueue_destruction(cwq->thread); - kthread_stop(cwq->thread); - cwq->thread = NULL; + list_add(&wq->list, &workqueues); + + spin_unlock(&workqueue_lock); + + return wq; +err: + if (wq) { + free_cwqs(wq); + free_mayday_mask(wq->mayday_mask); + kfree(wq->rescuer); + kfree(wq); + } + return NULL; } +EXPORT_SYMBOL_GPL(__alloc_workqueue_key); /** * destroy_workqueue - safely terminate a workqueue @@ -1101,72 +2822,516 @@ static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq) */ void destroy_workqueue(struct workqueue_struct *wq) { - const struct cpumask *cpu_map = wq_cpu_map(wq); - int cpu; + unsigned int cpu; + + flush_workqueue(wq); - cpu_maps_update_begin(); + /* + * wq list is used to freeze wq, remove from list after + * flushing is complete in case freeze races us. + */ spin_lock(&workqueue_lock); list_del(&wq->list); spin_unlock(&workqueue_lock); - for_each_cpu(cpu, cpu_map) - cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu)); - cpu_maps_update_done(); + /* sanity check */ + for_each_cwq_cpu(cpu, wq) { + struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); + int i; + + for (i = 0; i < WORK_NR_COLORS; i++) + BUG_ON(cwq->nr_in_flight[i]); + BUG_ON(cwq->nr_active); + BUG_ON(!list_empty(&cwq->delayed_works)); + } + + if (wq->flags & WQ_RESCUER) { + kthread_stop(wq->rescuer->task); + free_mayday_mask(wq->mayday_mask); + } - free_percpu(wq->cpu_wq); + free_cwqs(wq); kfree(wq); } EXPORT_SYMBOL_GPL(destroy_workqueue); +/** + * workqueue_set_max_active - adjust max_active of a workqueue + * @wq: target workqueue + * @max_active: new max_active value. + * + * Set max_active of @wq to @max_active. + * + * CONTEXT: + * Don't call from IRQ context. + */ +void workqueue_set_max_active(struct workqueue_struct *wq, int max_active) +{ + unsigned int cpu; + + max_active = wq_clamp_max_active(max_active, wq->flags, wq->name); + + spin_lock(&workqueue_lock); + + wq->saved_max_active = max_active; + + for_each_cwq_cpu(cpu, wq) { + struct global_cwq *gcwq = get_gcwq(cpu); + + spin_lock_irq(&gcwq->lock); + + if (!(wq->flags & WQ_FREEZEABLE) || + !(gcwq->flags & GCWQ_FREEZING)) + get_cwq(gcwq->cpu, wq)->max_active = max_active; + + spin_unlock_irq(&gcwq->lock); + } + + spin_unlock(&workqueue_lock); +} +EXPORT_SYMBOL_GPL(workqueue_set_max_active); + +/** + * workqueue_congested - test whether a workqueue is congested + * @cpu: CPU in question + * @wq: target workqueue + * + * Test whether @wq's cpu workqueue for @cpu is congested. There is + * no synchronization around this function and the test result is + * unreliable and only useful as advisory hints or for debugging. + * + * RETURNS: + * %true if congested, %false otherwise. + */ +bool workqueue_congested(unsigned int cpu, struct workqueue_struct *wq) +{ + struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); + + return !list_empty(&cwq->delayed_works); +} +EXPORT_SYMBOL_GPL(workqueue_congested); + +/** + * work_cpu - return the last known associated cpu for @work + * @work: the work of interest + * + * RETURNS: + * CPU number if @work was ever queued. WORK_CPU_NONE otherwise. + */ +unsigned int work_cpu(struct work_struct *work) +{ + struct global_cwq *gcwq = get_work_gcwq(work); + + return gcwq ? gcwq->cpu : WORK_CPU_NONE; +} +EXPORT_SYMBOL_GPL(work_cpu); + +/** + * work_busy - test whether a work is currently pending or running + * @work: the work to be tested + * + * Test whether @work is currently pending or running. There is no + * synchronization around this function and the test result is + * unreliable and only useful as advisory hints or for debugging. + * Especially for reentrant wqs, the pending state might hide the + * running state. + * + * RETURNS: + * OR'd bitmask of WORK_BUSY_* bits. + */ +unsigned int work_busy(struct work_struct *work) +{ + struct global_cwq *gcwq = get_work_gcwq(work); + unsigned long flags; + unsigned int ret = 0; + + if (!gcwq) + return false; + + spin_lock_irqsave(&gcwq->lock, flags); + + if (work_pending(work)) + ret |= WORK_BUSY_PENDING; + if (find_worker_executing_work(gcwq, work)) + ret |= WORK_BUSY_RUNNING; + + spin_unlock_irqrestore(&gcwq->lock, flags); + + return ret; +} +EXPORT_SYMBOL_GPL(work_busy); + +/* + * CPU hotplug. + * + * There are two challenges in supporting CPU hotplug. Firstly, there + * are a lot of assumptions on strong associations among work, cwq and + * gcwq which make migrating pending and scheduled works very + * difficult to implement without impacting hot paths. Secondly, + * gcwqs serve mix of short, long and very long running works making + * blocked draining impractical. + * + * This is solved by allowing a gcwq to be detached from CPU, running + * it with unbound (rogue) workers and allowing it to be reattached + * later if the cpu comes back online. A separate thread is created + * to govern a gcwq in such state and is called the trustee of the + * gcwq. + * + * Trustee states and their descriptions. + * + * START Command state used on startup. On CPU_DOWN_PREPARE, a + * new trustee is started with this state. + * + * IN_CHARGE Once started, trustee will enter this state after + * assuming the manager role and making all existing + * workers rogue. DOWN_PREPARE waits for trustee to + * enter this state. After reaching IN_CHARGE, trustee + * tries to execute the pending worklist until it's empty + * and the state is set to BUTCHER, or the state is set + * to RELEASE. + * + * BUTCHER Command state which is set by the cpu callback after + * the cpu has went down. Once this state is set trustee + * knows that there will be no new works on the worklist + * and once the worklist is empty it can proceed to + * killing idle workers. + * + * RELEASE Command state which is set by the cpu callback if the + * cpu down has been canceled or it has come online + * again. After recognizing this state, trustee stops + * trying to drain or butcher and clears ROGUE, rebinds + * all remaining workers back to the cpu and releases + * manager role. + * + * DONE Trustee will enter this state after BUTCHER or RELEASE + * is complete. + * + * trustee CPU draining + * took over down complete + * START -----------> IN_CHARGE -----------> BUTCHER -----------> DONE + * | | ^ + * | CPU is back online v return workers | + * ----------------> RELEASE -------------- + */ + +/** + * trustee_wait_event_timeout - timed event wait for trustee + * @cond: condition to wait for + * @timeout: timeout in jiffies + * + * wait_event_timeout() for trustee to use. Handles locking and + * checks for RELEASE request. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock) which may be released and regrabbed + * multiple times. To be used by trustee. + * + * RETURNS: + * Positive indicating left time if @cond is satisfied, 0 if timed + * out, -1 if canceled. + */ +#define trustee_wait_event_timeout(cond, timeout) ({ \ + long __ret = (timeout); \ + while (!((cond) || (gcwq->trustee_state == TRUSTEE_RELEASE)) && \ + __ret) { \ + spin_unlock_irq(&gcwq->lock); \ + __wait_event_timeout(gcwq->trustee_wait, (cond) || \ + (gcwq->trustee_state == TRUSTEE_RELEASE), \ + __ret); \ + spin_lock_irq(&gcwq->lock); \ + } \ + gcwq->trustee_state == TRUSTEE_RELEASE ? -1 : (__ret); \ +}) + +/** + * trustee_wait_event - event wait for trustee + * @cond: condition to wait for + * + * wait_event() for trustee to use. Automatically handles locking and + * checks for CANCEL request. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock) which may be released and regrabbed + * multiple times. To be used by trustee. + * + * RETURNS: + * 0 if @cond is satisfied, -1 if canceled. + */ +#define trustee_wait_event(cond) ({ \ + long __ret1; \ + __ret1 = trustee_wait_event_timeout(cond, MAX_SCHEDULE_TIMEOUT);\ + __ret1 < 0 ? -1 : 0; \ +}) + +static int __cpuinit trustee_thread(void *__gcwq) +{ + struct global_cwq *gcwq = __gcwq; + struct worker *worker; + struct work_struct *work; + struct hlist_node *pos; + long rc; + int i; + + BUG_ON(gcwq->cpu != smp_processor_id()); + + spin_lock_irq(&gcwq->lock); + /* + * Claim the manager position and make all workers rogue. + * Trustee must be bound to the target cpu and can't be + * cancelled. + */ + BUG_ON(gcwq->cpu != smp_processor_id()); + rc = trustee_wait_event(!(gcwq->flags & GCWQ_MANAGING_WORKERS)); + BUG_ON(rc < 0); + + gcwq->flags |= GCWQ_MANAGING_WORKERS; + + list_for_each_entry(worker, &gcwq->idle_list, entry) + worker->flags |= WORKER_ROGUE; + + for_each_busy_worker(worker, i, pos, gcwq) + worker->flags |= WORKER_ROGUE; + + /* + * Call schedule() so that we cross rq->lock and thus can + * guarantee sched callbacks see the rogue flag. This is + * necessary as scheduler callbacks may be invoked from other + * cpus. + */ + spin_unlock_irq(&gcwq->lock); + schedule(); + spin_lock_irq(&gcwq->lock); + + /* + * Sched callbacks are disabled now. Zap nr_running. After + * this, nr_running stays zero and need_more_worker() and + * keep_working() are always true as long as the worklist is + * not empty. + */ + atomic_set(get_gcwq_nr_running(gcwq->cpu), 0); + + spin_unlock_irq(&gcwq->lock); + del_timer_sync(&gcwq->idle_timer); + spin_lock_irq(&gcwq->lock); + + /* + * We're now in charge. Notify and proceed to drain. We need + * to keep the gcwq running during the whole CPU down + * procedure as other cpu hotunplug callbacks may need to + * flush currently running tasks. + */ + gcwq->trustee_state = TRUSTEE_IN_CHARGE; + wake_up_all(&gcwq->trustee_wait); + + /* + * The original cpu is in the process of dying and may go away + * anytime now. When that happens, we and all workers would + * be migrated to other cpus. Try draining any left work. We + * want to get it over with ASAP - spam rescuers, wake up as + * many idlers as necessary and create new ones till the + * worklist is empty. Note that if the gcwq is frozen, there + * may be frozen works in freezeable cwqs. Don't declare + * completion while frozen. + */ + while (gcwq->nr_workers != gcwq->nr_idle || + gcwq->flags & GCWQ_FREEZING || + gcwq->trustee_state == TRUSTEE_IN_CHARGE) { + int nr_works = 0; + + list_for_each_entry(work, &gcwq->worklist, entry) { + send_mayday(work); + nr_works++; + } + + list_for_each_entry(worker, &gcwq->idle_list, entry) { + if (!nr_works--) + break; + wake_up_process(worker->task); + } + + if (need_to_create_worker(gcwq)) { + spin_unlock_irq(&gcwq->lock); + worker = create_worker(gcwq, false); + spin_lock_irq(&gcwq->lock); + if (worker) { + worker->flags |= WORKER_ROGUE; + start_worker(worker); + } + } + + /* give a breather */ + if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0) + break; + } + + /* + * Either all works have been scheduled and cpu is down, or + * cpu down has already been canceled. Wait for and butcher + * all workers till we're canceled. + */ + do { + rc = trustee_wait_event(!list_empty(&gcwq->idle_list)); + while (!list_empty(&gcwq->idle_list)) + destroy_worker(list_first_entry(&gcwq->idle_list, + struct worker, entry)); + } while (gcwq->nr_workers && rc >= 0); + + /* + * At this point, either draining has completed and no worker + * is left, or cpu down has been canceled or the cpu is being + * brought back up. There shouldn't be any idle one left. + * Tell the remaining busy ones to rebind once it finishes the + * currently scheduled works by scheduling the rebind_work. + */ + WARN_ON(!list_empty(&gcwq->idle_list)); + + for_each_busy_worker(worker, i, pos, gcwq) { + struct work_struct *rebind_work = &worker->rebind_work; + + /* + * Rebind_work may race with future cpu hotplug + * operations. Use a separate flag to mark that + * rebinding is scheduled. + */ + worker->flags |= WORKER_REBIND; + worker->flags &= ~WORKER_ROGUE; + + /* queue rebind_work, wq doesn't matter, use the default one */ + if (test_and_set_bit(WORK_STRUCT_PENDING_BIT, + work_data_bits(rebind_work))) + continue; + + debug_work_activate(rebind_work); + insert_work(get_cwq(gcwq->cpu, system_wq), rebind_work, + worker->scheduled.next, + work_color_to_flags(WORK_NO_COLOR)); + } + + /* relinquish manager role */ + gcwq->flags &= ~GCWQ_MANAGING_WORKERS; + + /* notify completion */ + gcwq->trustee = NULL; + gcwq->trustee_state = TRUSTEE_DONE; + wake_up_all(&gcwq->trustee_wait); + spin_unlock_irq(&gcwq->lock); + return 0; +} + +/** + * wait_trustee_state - wait for trustee to enter the specified state + * @gcwq: gcwq the trustee of interest belongs to + * @state: target state to wait for + * + * Wait for the trustee to reach @state. DONE is already matched. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock) which may be released and regrabbed + * multiple times. To be used by cpu_callback. + */ +static void __cpuinit wait_trustee_state(struct global_cwq *gcwq, int state) +{ + if (!(gcwq->trustee_state == state || + gcwq->trustee_state == TRUSTEE_DONE)) { + spin_unlock_irq(&gcwq->lock); + __wait_event(gcwq->trustee_wait, + gcwq->trustee_state == state || + gcwq->trustee_state == TRUSTEE_DONE); + spin_lock_irq(&gcwq->lock); + } +} + static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, unsigned long action, void *hcpu) { unsigned int cpu = (unsigned long)hcpu; - struct cpu_workqueue_struct *cwq; - struct workqueue_struct *wq; - int err = 0; + struct global_cwq *gcwq = get_gcwq(cpu); + struct task_struct *new_trustee = NULL; + struct worker *uninitialized_var(new_worker); + unsigned long flags; action &= ~CPU_TASKS_FROZEN; switch (action) { + case CPU_DOWN_PREPARE: + new_trustee = kthread_create(trustee_thread, gcwq, + "workqueue_trustee/%d\n", cpu); + if (IS_ERR(new_trustee)) + return notifier_from_errno(PTR_ERR(new_trustee)); + kthread_bind(new_trustee, cpu); + /* fall through */ case CPU_UP_PREPARE: - cpumask_set_cpu(cpu, cpu_populated_map); - } -undo: - list_for_each_entry(wq, &workqueues, list) { - cwq = per_cpu_ptr(wq->cpu_wq, cpu); - - switch (action) { - case CPU_UP_PREPARE: - err = create_workqueue_thread(cwq, cpu); - if (!err) - break; - printk(KERN_ERR "workqueue [%s] for %i failed\n", - wq->name, cpu); - action = CPU_UP_CANCELED; - err = -ENOMEM; - goto undo; - - case CPU_ONLINE: - start_workqueue_thread(cwq, cpu); - break; - - case CPU_UP_CANCELED: - start_workqueue_thread(cwq, -1); - case CPU_POST_DEAD: - cleanup_workqueue_thread(cwq); - break; + BUG_ON(gcwq->first_idle); + new_worker = create_worker(gcwq, false); + if (!new_worker) { + if (new_trustee) + kthread_stop(new_trustee); + return NOTIFY_BAD; } } + /* some are called w/ irq disabled, don't disturb irq status */ + spin_lock_irqsave(&gcwq->lock, flags); + switch (action) { - case CPU_UP_CANCELED: + case CPU_DOWN_PREPARE: + /* initialize trustee and tell it to acquire the gcwq */ + BUG_ON(gcwq->trustee || gcwq->trustee_state != TRUSTEE_DONE); + gcwq->trustee = new_trustee; + gcwq->trustee_state = TRUSTEE_START; + wake_up_process(gcwq->trustee); + wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE); + /* fall through */ + case CPU_UP_PREPARE: + BUG_ON(gcwq->first_idle); + gcwq->first_idle = new_worker; + break; + + case CPU_DYING: + /* + * Before this, the trustee and all workers except for + * the ones which are still executing works from + * before the last CPU down must be on the cpu. After + * this, they'll all be diasporas. + */ + gcwq->flags |= GCWQ_DISASSOCIATED; + break; + case CPU_POST_DEAD: - cpumask_clear_cpu(cpu, cpu_populated_map); + gcwq->trustee_state = TRUSTEE_BUTCHER; + /* fall through */ + case CPU_UP_CANCELED: + destroy_worker(gcwq->first_idle); + gcwq->first_idle = NULL; + break; + + case CPU_DOWN_FAILED: + case CPU_ONLINE: + gcwq->flags &= ~GCWQ_DISASSOCIATED; + if (gcwq->trustee_state != TRUSTEE_DONE) { + gcwq->trustee_state = TRUSTEE_RELEASE; + wake_up_process(gcwq->trustee); + wait_trustee_state(gcwq, TRUSTEE_DONE); + } + + /* + * Trustee is done and there might be no worker left. + * Put the first_idle in and request a real manager to + * take a look. + */ + spin_unlock_irq(&gcwq->lock); + kthread_bind(gcwq->first_idle->task, cpu); + spin_lock_irq(&gcwq->lock); + gcwq->flags |= GCWQ_MANAGE_WORKERS; + start_worker(gcwq->first_idle); + gcwq->first_idle = NULL; + break; } - return notifier_from_errno(err); + spin_unlock_irqrestore(&gcwq->lock, flags); + + return notifier_from_errno(0); } #ifdef CONFIG_SMP @@ -1216,14 +3381,199 @@ long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg) EXPORT_SYMBOL_GPL(work_on_cpu); #endif /* CONFIG_SMP */ -void __init init_workqueues(void) +#ifdef CONFIG_FREEZER + +/** + * freeze_workqueues_begin - begin freezing workqueues + * + * Start freezing workqueues. After this function returns, all + * freezeable workqueues will queue new works to their frozen_works + * list instead of gcwq->worklist. + * + * CONTEXT: + * Grabs and releases workqueue_lock and gcwq->lock's. + */ +void freeze_workqueues_begin(void) +{ + unsigned int cpu; + + spin_lock(&workqueue_lock); + + BUG_ON(workqueue_freezing); + workqueue_freezing = true; + + for_each_gcwq_cpu(cpu) { + struct global_cwq *gcwq = get_gcwq(cpu); + struct workqueue_struct *wq; + + spin_lock_irq(&gcwq->lock); + + BUG_ON(gcwq->flags & GCWQ_FREEZING); + gcwq->flags |= GCWQ_FREEZING; + + list_for_each_entry(wq, &workqueues, list) { + struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); + + if (cwq && wq->flags & WQ_FREEZEABLE) + cwq->max_active = 0; + } + + spin_unlock_irq(&gcwq->lock); + } + + spin_unlock(&workqueue_lock); +} + +/** + * freeze_workqueues_busy - are freezeable workqueues still busy? + * + * Check whether freezing is complete. This function must be called + * between freeze_workqueues_begin() and thaw_workqueues(). + * + * CONTEXT: + * Grabs and releases workqueue_lock. + * + * RETURNS: + * %true if some freezeable workqueues are still busy. %false if + * freezing is complete. + */ +bool freeze_workqueues_busy(void) +{ + unsigned int cpu; + bool busy = false; + + spin_lock(&workqueue_lock); + + BUG_ON(!workqueue_freezing); + + for_each_gcwq_cpu(cpu) { + struct workqueue_struct *wq; + /* + * nr_active is monotonically decreasing. It's safe + * to peek without lock. + */ + list_for_each_entry(wq, &workqueues, list) { + struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); + + if (!cwq || !(wq->flags & WQ_FREEZEABLE)) + continue; + + BUG_ON(cwq->nr_active < 0); + if (cwq->nr_active) { + busy = true; + goto out_unlock; + } + } + } +out_unlock: + spin_unlock(&workqueue_lock); + return busy; +} + +/** + * thaw_workqueues - thaw workqueues + * + * Thaw workqueues. Normal queueing is restored and all collected + * frozen works are transferred to their respective gcwq worklists. + * + * CONTEXT: + * Grabs and releases workqueue_lock and gcwq->lock's. + */ +void thaw_workqueues(void) +{ + unsigned int cpu; + + spin_lock(&workqueue_lock); + + if (!workqueue_freezing) + goto out_unlock; + + for_each_gcwq_cpu(cpu) { + struct global_cwq *gcwq = get_gcwq(cpu); + struct workqueue_struct *wq; + + spin_lock_irq(&gcwq->lock); + + BUG_ON(!(gcwq->flags & GCWQ_FREEZING)); + gcwq->flags &= ~GCWQ_FREEZING; + + list_for_each_entry(wq, &workqueues, list) { + struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); + + if (!cwq || !(wq->flags & WQ_FREEZEABLE)) + continue; + + /* restore max_active and repopulate worklist */ + cwq->max_active = wq->saved_max_active; + + while (!list_empty(&cwq->delayed_works) && + cwq->nr_active < cwq->max_active) + cwq_activate_first_delayed(cwq); + } + + wake_up_worker(gcwq); + + spin_unlock_irq(&gcwq->lock); + } + + workqueue_freezing = false; +out_unlock: + spin_unlock(&workqueue_lock); +} +#endif /* CONFIG_FREEZER */ + +static int __init init_workqueues(void) { - alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL); + unsigned int cpu; + int i; + + hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE); + + /* initialize gcwqs */ + for_each_gcwq_cpu(cpu) { + struct global_cwq *gcwq = get_gcwq(cpu); + + spin_lock_init(&gcwq->lock); + INIT_LIST_HEAD(&gcwq->worklist); + gcwq->cpu = cpu; + if (cpu == WORK_CPU_UNBOUND) + gcwq->flags |= GCWQ_DISASSOCIATED; + + INIT_LIST_HEAD(&gcwq->idle_list); + for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) + INIT_HLIST_HEAD(&gcwq->busy_hash[i]); + + init_timer_deferrable(&gcwq->idle_timer); + gcwq->idle_timer.function = idle_worker_timeout; + gcwq->idle_timer.data = (unsigned long)gcwq; + + setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout, + (unsigned long)gcwq); - cpumask_copy(cpu_populated_map, cpu_online_mask); - singlethread_cpu = cpumask_first(cpu_possible_mask); - cpu_singlethread_map = cpumask_of(singlethread_cpu); - hotcpu_notifier(workqueue_cpu_callback, 0); - keventd_wq = create_workqueue("events"); - BUG_ON(!keventd_wq); + ida_init(&gcwq->worker_ida); + + gcwq->trustee_state = TRUSTEE_DONE; + init_waitqueue_head(&gcwq->trustee_wait); + } + + /* create the initial worker */ + for_each_online_gcwq_cpu(cpu) { + struct global_cwq *gcwq = get_gcwq(cpu); + struct worker *worker; + + worker = create_worker(gcwq, true); + BUG_ON(!worker); + spin_lock_irq(&gcwq->lock); + start_worker(worker); + spin_unlock_irq(&gcwq->lock); + } + + system_wq = alloc_workqueue("events", 0, 0); + system_long_wq = alloc_workqueue("events_long", 0, 0); + system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0); + system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, + WQ_UNBOUND_MAX_ACTIVE); + BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq); + return 0; } +early_initcall(init_workqueues); diff --git a/kernel/workqueue_sched.h b/kernel/workqueue_sched.h index af040babb74..2d10fc98dc7 100644 --- a/kernel/workqueue_sched.h +++ b/kernel/workqueue_sched.h @@ -4,13 +4,6 @@ * Scheduler hooks for concurrency managed workqueue. Only to be * included from sched.c and workqueue.c. */ -static inline void wq_worker_waking_up(struct task_struct *task, - unsigned int cpu) -{ -} - -static inline struct task_struct *wq_worker_sleeping(struct task_struct *task, - unsigned int cpu) -{ - return NULL; -} +void wq_worker_waking_up(struct task_struct *task, unsigned int cpu); +struct task_struct *wq_worker_sleeping(struct task_struct *task, + unsigned int cpu); |