summaryrefslogtreecommitdiffstats
path: root/kernel/workqueue.c
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2012-10-02 09:54:49 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2012-10-02 09:54:49 -0700
commit033d9959ed2dc1029217d4165f80a71702dc578e (patch)
tree3d306316e44bdabce2e0bf2ef7e466e525f90b4c /kernel/workqueue.c
parent974a847e00cf3ff1695e62b276892137893706ab (diff)
parent7c6e72e46c9ea4a88f3f8ba96edce9db4bd48726 (diff)
Merge branch 'for-3.7' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq
Pull workqueue changes from Tejun Heo: "This is workqueue updates for v3.7-rc1. A lot of activities this round including considerable API and behavior cleanups. * delayed_work combines a timer and a work item. The handling of the timer part has always been a bit clunky leading to confusing cancelation API with weird corner-case behaviors. delayed_work is updated to use new IRQ safe timer and cancelation now works as expected. * Another deficiency of delayed_work was lack of the counterpart of mod_timer() which led to cancel+queue combinations or open-coded timer+work usages. mod_delayed_work[_on]() are added. These two delayed_work changes make delayed_work provide interface and behave like timer which is executed with process context. * A work item could be executed concurrently on multiple CPUs, which is rather unintuitive and made flush_work() behavior confusing and half-broken under certain circumstances. This problem doesn't exist for non-reentrant workqueues. While non-reentrancy check isn't free, the overhead is incurred only when a work item bounces across different CPUs and even in simulated pathological scenario the overhead isn't too high. All workqueues are made non-reentrant. This removes the distinction between flush_[delayed_]work() and flush_[delayed_]_work_sync(). The former is now as strong as the latter and the specified work item is guaranteed to have finished execution of any previous queueing on return. * In addition to the various bug fixes, Lai redid and simplified CPU hotplug handling significantly. * Joonsoo introduced system_highpri_wq and used it during CPU hotplug. There are two merge commits - one to pull in IRQ safe timer from tip/timers/core and the other to pull in CPU hotplug fixes from wq/for-3.6-fixes as Lai's hotplug restructuring depended on them." Fixed a number of trivial conflicts, but the more interesting conflicts were silent ones where the deprecated interfaces had been used by new code in the merge window, and thus didn't cause any real data conflicts. Tejun pointed out a few of them, I fixed a couple more. * 'for-3.7' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq: (46 commits) workqueue: remove spurious WARN_ON_ONCE(in_irq()) from try_to_grab_pending() workqueue: use cwq_set_max_active() helper for workqueue_set_max_active() workqueue: introduce cwq_set_max_active() helper for thaw_workqueues() workqueue: remove @delayed from cwq_dec_nr_in_flight() workqueue: fix possible stall on try_to_grab_pending() of a delayed work item workqueue: use hotcpu_notifier() for workqueue_cpu_down_callback() workqueue: use __cpuinit instead of __devinit for cpu callbacks workqueue: rename manager_mutex to assoc_mutex workqueue: WORKER_REBIND is no longer necessary for idle rebinding workqueue: WORKER_REBIND is no longer necessary for busy rebinding workqueue: reimplement idle worker rebinding workqueue: deprecate __cancel_delayed_work() workqueue: reimplement cancel_delayed_work() using try_to_grab_pending() workqueue: use mod_delayed_work() instead of __cancel + queue workqueue: use irqsafe timer for delayed_work workqueue: clean up delayed_work initializers and add missing one workqueue: make deferrable delayed_work initializer names consistent workqueue: cosmetic whitespace updates for macro definitions workqueue: deprecate system_nrt[_freezable]_wq workqueue: deprecate flush[_delayed]_work_sync() ...
Diffstat (limited to 'kernel/workqueue.c')
-rw-r--r--kernel/workqueue.c1217
1 files changed, 639 insertions, 578 deletions
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 3c5a79e2134..d951daa0ca9 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -58,7 +58,7 @@ enum {
* be executing on any CPU. The gcwq behaves as an unbound one.
*
* Note that DISASSOCIATED can be flipped only while holding
- * managership of all pools on the gcwq to avoid changing binding
+ * assoc_mutex of all pools on the gcwq to avoid changing binding
* state while create_worker() is in progress.
*/
GCWQ_DISASSOCIATED = 1 << 0, /* cpu can't serve workers */
@@ -73,11 +73,10 @@ enum {
WORKER_DIE = 1 << 1, /* die die die */
WORKER_IDLE = 1 << 2, /* is idle */
WORKER_PREP = 1 << 3, /* preparing to run works */
- WORKER_REBIND = 1 << 5, /* mom is home, come back */
WORKER_CPU_INTENSIVE = 1 << 6, /* cpu intensive */
WORKER_UNBOUND = 1 << 7, /* worker is unbound */
- WORKER_NOT_RUNNING = WORKER_PREP | WORKER_REBIND | WORKER_UNBOUND |
+ WORKER_NOT_RUNNING = WORKER_PREP | WORKER_UNBOUND |
WORKER_CPU_INTENSIVE,
NR_WORKER_POOLS = 2, /* # worker pools per gcwq */
@@ -126,7 +125,6 @@ enum {
struct global_cwq;
struct worker_pool;
-struct idle_rebind;
/*
* The poor guys doing the actual heavy lifting. All on-duty workers
@@ -150,7 +148,6 @@ struct worker {
int id; /* I: worker id */
/* for rebinding worker to CPU */
- struct idle_rebind *idle_rebind; /* L: for idle worker */
struct work_struct rebind_work; /* L: for busy worker */
};
@@ -160,13 +157,15 @@ struct worker_pool {
struct list_head worklist; /* L: list of pending works */
int nr_workers; /* L: total number of workers */
+
+ /* nr_idle includes the ones off idle_list for rebinding */
int nr_idle; /* L: currently idle ones */
struct list_head idle_list; /* X: list of idle workers */
struct timer_list idle_timer; /* L: worker idle timeout */
struct timer_list mayday_timer; /* L: SOS timer for workers */
- struct mutex manager_mutex; /* mutex manager should hold */
+ struct mutex assoc_mutex; /* protect GCWQ_DISASSOCIATED */
struct ida worker_ida; /* L: for worker IDs */
};
@@ -184,9 +183,8 @@ struct global_cwq {
struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];
/* L: hash of busy workers */
- struct worker_pool pools[2]; /* normal and highpri pools */
-
- wait_queue_head_t rebind_hold; /* rebind hold wait */
+ struct worker_pool pools[NR_WORKER_POOLS];
+ /* normal and highpri pools */
} ____cacheline_aligned_in_smp;
/*
@@ -269,17 +267,15 @@ struct workqueue_struct {
};
struct workqueue_struct *system_wq __read_mostly;
-struct workqueue_struct *system_long_wq __read_mostly;
-struct workqueue_struct *system_nrt_wq __read_mostly;
-struct workqueue_struct *system_unbound_wq __read_mostly;
-struct workqueue_struct *system_freezable_wq __read_mostly;
-struct workqueue_struct *system_nrt_freezable_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_wq);
+struct workqueue_struct *system_highpri_wq __read_mostly;
+EXPORT_SYMBOL_GPL(system_highpri_wq);
+struct workqueue_struct *system_long_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_long_wq);
-EXPORT_SYMBOL_GPL(system_nrt_wq);
+struct workqueue_struct *system_unbound_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_unbound_wq);
+struct workqueue_struct *system_freezable_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_freezable_wq);
-EXPORT_SYMBOL_GPL(system_nrt_freezable_wq);
#define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h>
@@ -534,18 +530,24 @@ static int work_next_color(int color)
}
/*
- * A work's data points to the cwq with WORK_STRUCT_CWQ set while the
- * work is on queue. Once execution starts, WORK_STRUCT_CWQ is
- * cleared and the work data contains the cpu number it was last on.
+ * While queued, %WORK_STRUCT_CWQ is set and non flag bits of a work's data
+ * contain the pointer to the queued cwq. Once execution starts, the flag
+ * is cleared and the high bits contain OFFQ flags and CPU number.
*
- * set_work_{cwq|cpu}() and clear_work_data() can be used to set the
- * cwq, cpu or clear work->data. These functions should only be
- * called while the work is owned - ie. while the PENDING bit is set.
+ * set_work_cwq(), set_work_cpu_and_clear_pending(), mark_work_canceling()
+ * and clear_work_data() can be used to set the cwq, cpu or clear
+ * work->data. These functions should only be called while the work is
+ * owned - ie. while the PENDING bit is set.
*
- * get_work_[g]cwq() can be used to obtain the gcwq or cwq
- * corresponding to a work. gcwq is available once the work has been
- * queued anywhere after initialization. cwq is available only from
- * queueing until execution starts.
+ * get_work_[g]cwq() can be used to obtain the gcwq or cwq corresponding to
+ * a work. gcwq is available once the work has been queued anywhere after
+ * initialization until it is sync canceled. cwq is available only while
+ * the work item is queued.
+ *
+ * %WORK_OFFQ_CANCELING is used to mark a work item which is being
+ * canceled. While being canceled, a work item may have its PENDING set
+ * but stay off timer and worklist for arbitrarily long and nobody should
+ * try to steal the PENDING bit.
*/
static inline void set_work_data(struct work_struct *work, unsigned long data,
unsigned long flags)
@@ -562,13 +564,22 @@ static void set_work_cwq(struct work_struct *work,
WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags);
}
-static void set_work_cpu(struct work_struct *work, unsigned int cpu)
+static void set_work_cpu_and_clear_pending(struct work_struct *work,
+ unsigned int cpu)
{
- set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING);
+ /*
+ * The following wmb is paired with the implied mb in
+ * test_and_set_bit(PENDING) and ensures all updates to @work made
+ * here are visible to and precede any updates by the next PENDING
+ * owner.
+ */
+ smp_wmb();
+ set_work_data(work, (unsigned long)cpu << WORK_OFFQ_CPU_SHIFT, 0);
}
static void clear_work_data(struct work_struct *work)
{
+ smp_wmb(); /* see set_work_cpu_and_clear_pending() */
set_work_data(work, WORK_STRUCT_NO_CPU, 0);
}
@@ -591,7 +602,7 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
return ((struct cpu_workqueue_struct *)
(data & WORK_STRUCT_WQ_DATA_MASK))->pool->gcwq;
- cpu = data >> WORK_STRUCT_FLAG_BITS;
+ cpu = data >> WORK_OFFQ_CPU_SHIFT;
if (cpu == WORK_CPU_NONE)
return NULL;
@@ -599,6 +610,22 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
return get_gcwq(cpu);
}
+static void mark_work_canceling(struct work_struct *work)
+{
+ struct global_cwq *gcwq = get_work_gcwq(work);
+ unsigned long cpu = gcwq ? gcwq->cpu : WORK_CPU_NONE;
+
+ set_work_data(work, (cpu << WORK_OFFQ_CPU_SHIFT) | WORK_OFFQ_CANCELING,
+ WORK_STRUCT_PENDING);
+}
+
+static bool work_is_canceling(struct work_struct *work)
+{
+ unsigned long data = atomic_long_read(&work->data);
+
+ return !(data & WORK_STRUCT_CWQ) && (data & WORK_OFFQ_CANCELING);
+}
+
/*
* Policy functions. These define the policies on how the global worker
* pools are managed. Unless noted otherwise, these functions assume that
@@ -657,6 +684,13 @@ static bool too_many_workers(struct worker_pool *pool)
int nr_idle = pool->nr_idle + managing; /* manager is considered idle */
int nr_busy = pool->nr_workers - nr_idle;
+ /*
+ * nr_idle and idle_list may disagree if idle rebinding is in
+ * progress. Never return %true if idle_list is empty.
+ */
+ if (list_empty(&pool->idle_list))
+ return false;
+
return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
}
@@ -903,6 +937,206 @@ static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
}
/**
+ * move_linked_works - move linked works to a list
+ * @work: start of series of works to be scheduled
+ * @head: target list to append @work to
+ * @nextp: out paramter for nested worklist walking
+ *
+ * Schedule linked works starting from @work to @head. Work series to
+ * be scheduled starts at @work and includes any consecutive work with
+ * WORK_STRUCT_LINKED set in its predecessor.
+ *
+ * If @nextp is not NULL, it's updated to point to the next work of
+ * the last scheduled work. This allows move_linked_works() to be
+ * nested inside outer list_for_each_entry_safe().
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void move_linked_works(struct work_struct *work, struct list_head *head,
+ struct work_struct **nextp)
+{
+ struct work_struct *n;
+
+ /*
+ * Linked worklist will always end before the end of the list,
+ * use NULL for list head.
+ */
+ list_for_each_entry_safe_from(work, n, NULL, entry) {
+ list_move_tail(&work->entry, head);
+ if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
+ break;
+ }
+
+ /*
+ * If we're already inside safe list traversal and have moved
+ * multiple works to the scheduled queue, the next position
+ * needs to be updated.
+ */
+ if (nextp)
+ *nextp = n;
+}
+
+static void cwq_activate_delayed_work(struct work_struct *work)
+{
+ struct cpu_workqueue_struct *cwq = get_work_cwq(work);
+
+ trace_workqueue_activate_work(work);
+ move_linked_works(work, &cwq->pool->worklist, NULL);
+ __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
+ cwq->nr_active++;
+}
+
+static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
+{
+ struct work_struct *work = list_first_entry(&cwq->delayed_works,
+ struct work_struct, entry);
+
+ cwq_activate_delayed_work(work);
+}
+
+/**
+ * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
+ * @cwq: cwq of interest
+ * @color: color of work which left the queue
+ *
+ * A work either has completed or is removed from pending queue,
+ * decrement nr_in_flight of its cwq and handle workqueue flushing.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
+{
+ /* ignore uncolored works */
+ if (color == WORK_NO_COLOR)
+ return;
+
+ cwq->nr_in_flight[color]--;
+
+ cwq->nr_active--;
+ if (!list_empty(&cwq->delayed_works)) {
+ /* one down, submit a delayed one */
+ if (cwq->nr_active < cwq->max_active)
+ cwq_activate_first_delayed(cwq);
+ }
+
+ /* is flush in progress and are we at the flushing tip? */
+ if (likely(cwq->flush_color != color))
+ return;
+
+ /* are there still in-flight works? */
+ if (cwq->nr_in_flight[color])
+ return;
+
+ /* this cwq is done, clear flush_color */
+ cwq->flush_color = -1;
+
+ /*
+ * If this was the last cwq, wake up the first flusher. It
+ * will handle the rest.
+ */
+ if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
+ complete(&cwq->wq->first_flusher->done);
+}
+
+/**
+ * try_to_grab_pending - steal work item from worklist and disable irq
+ * @work: work item to steal
+ * @is_dwork: @work is a delayed_work
+ * @flags: place to store irq state
+ *
+ * Try to grab PENDING bit of @work. This function can handle @work in any
+ * stable state - idle, on timer or on worklist. Return values are
+ *
+ * 1 if @work was pending and we successfully stole PENDING
+ * 0 if @work was idle and we claimed PENDING
+ * -EAGAIN if PENDING couldn't be grabbed at the moment, safe to busy-retry
+ * -ENOENT if someone else is canceling @work, this state may persist
+ * for arbitrarily long
+ *
+ * On >= 0 return, the caller owns @work's PENDING bit. To avoid getting
+ * interrupted while holding PENDING and @work off queue, irq must be
+ * disabled on entry. This, combined with delayed_work->timer being
+ * irqsafe, ensures that we return -EAGAIN for finite short period of time.
+ *
+ * On successful return, >= 0, irq is disabled and the caller is
+ * responsible for releasing it using local_irq_restore(*@flags).
+ *
+ * This function is safe to call from any context including IRQ handler.
+ */
+static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
+ unsigned long *flags)
+{
+ struct global_cwq *gcwq;
+
+ local_irq_save(*flags);
+
+ /* try to steal the timer if it exists */
+ if (is_dwork) {
+ struct delayed_work *dwork = to_delayed_work(work);
+
+ /*
+ * dwork->timer is irqsafe. If del_timer() fails, it's
+ * guaranteed that the timer is not queued anywhere and not
+ * running on the local CPU.
+ */
+ if (likely(del_timer(&dwork->timer)))
+ return 1;
+ }
+
+ /* try to claim PENDING the normal way */
+ if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
+ return 0;
+
+ /*
+ * The queueing is in progress, or it is already queued. Try to
+ * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
+ */
+ gcwq = get_work_gcwq(work);
+ if (!gcwq)
+ goto fail;
+
+ spin_lock(&gcwq->lock);
+ if (!list_empty(&work->entry)) {
+ /*
+ * This work is queued, but perhaps we locked the wrong gcwq.
+ * In that case we must see the new value after rmb(), see
+ * insert_work()->wmb().
+ */
+ smp_rmb();
+ if (gcwq == get_work_gcwq(work)) {
+ debug_work_deactivate(work);
+
+ /*
+ * A delayed work item cannot be grabbed directly
+ * because it might have linked NO_COLOR work items
+ * which, if left on the delayed_list, will confuse
+ * cwq->nr_active management later on and cause
+ * stall. Make sure the work item is activated
+ * before grabbing.
+ */
+ if (*work_data_bits(work) & WORK_STRUCT_DELAYED)
+ cwq_activate_delayed_work(work);
+
+ list_del_init(&work->entry);
+ cwq_dec_nr_in_flight(get_work_cwq(work),
+ get_work_color(work));
+
+ spin_unlock(&gcwq->lock);
+ return 1;
+ }
+ }
+ spin_unlock(&gcwq->lock);
+fail:
+ local_irq_restore(*flags);
+ if (work_is_canceling(work))
+ return -ENOENT;
+ cpu_relax();
+ return -EAGAIN;
+}
+
+/**
* insert_work - insert a work into gcwq
* @cwq: cwq @work belongs to
* @work: work to insert
@@ -982,7 +1216,15 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct cpu_workqueue_struct *cwq;
struct list_head *worklist;
unsigned int work_flags;
- unsigned long flags;
+ unsigned int req_cpu = cpu;
+
+ /*
+ * While a work item is PENDING && off queue, a task trying to
+ * steal the PENDING will busy-loop waiting for it to either get
+ * queued or lose PENDING. Grabbing PENDING and queueing should
+ * happen with IRQ disabled.
+ */
+ WARN_ON_ONCE(!irqs_disabled());
debug_work_activate(work);
@@ -995,21 +1237,22 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
if (!(wq->flags & WQ_UNBOUND)) {
struct global_cwq *last_gcwq;
- if (unlikely(cpu == WORK_CPU_UNBOUND))
+ if (cpu == WORK_CPU_UNBOUND)
cpu = raw_smp_processor_id();
/*
- * It's multi cpu. If @wq is non-reentrant and @work
- * was previously on a different cpu, it might still
- * be running there, in which case the work needs to
- * be queued on that cpu to guarantee non-reentrance.
+ * It's multi cpu. If @work was previously on a different
+ * cpu, it might still be running there, in which case the
+ * work needs to be queued on that cpu to guarantee
+ * non-reentrancy.
*/
gcwq = get_gcwq(cpu);
- if (wq->flags & WQ_NON_REENTRANT &&
- (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {
+ last_gcwq = get_work_gcwq(work);
+
+ if (last_gcwq && last_gcwq != gcwq) {
struct worker *worker;
- spin_lock_irqsave(&last_gcwq->lock, flags);
+ spin_lock(&last_gcwq->lock);
worker = find_worker_executing_work(last_gcwq, work);
@@ -1017,22 +1260,23 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
gcwq = last_gcwq;
else {
/* meh... not running there, queue here */
- spin_unlock_irqrestore(&last_gcwq->lock, flags);
- spin_lock_irqsave(&gcwq->lock, flags);
+ spin_unlock(&last_gcwq->lock);
+ spin_lock(&gcwq->lock);
}
- } else
- spin_lock_irqsave(&gcwq->lock, flags);
+ } else {
+ spin_lock(&gcwq->lock);
+ }
} else {
gcwq = get_gcwq(WORK_CPU_UNBOUND);
- spin_lock_irqsave(&gcwq->lock, flags);
+ spin_lock(&gcwq->lock);
}
/* gcwq determined, get cwq and queue */
cwq = get_cwq(gcwq->cpu, wq);
- trace_workqueue_queue_work(cpu, cwq, work);
+ trace_workqueue_queue_work(req_cpu, cwq, work);
if (WARN_ON(!list_empty(&work->entry))) {
- spin_unlock_irqrestore(&gcwq->lock, flags);
+ spin_unlock(&gcwq->lock);
return;
}
@@ -1050,79 +1294,110 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
insert_work(cwq, work, worklist, work_flags);
- spin_unlock_irqrestore(&gcwq->lock, flags);
+ spin_unlock(&gcwq->lock);
}
/**
- * queue_work - queue work on a workqueue
+ * queue_work_on - queue work on specific cpu
+ * @cpu: CPU number to execute work on
* @wq: workqueue to use
* @work: work to queue
*
- * Returns 0 if @work was already on a queue, non-zero otherwise.
+ * Returns %false if @work was already on a queue, %true otherwise.
*
- * We queue the work to the CPU on which it was submitted, but if the CPU dies
- * it can be processed by another CPU.
+ * We queue the work to a specific CPU, the caller must ensure it
+ * can't go away.
*/
-int queue_work(struct workqueue_struct *wq, struct work_struct *work)
+bool queue_work_on(int cpu, struct workqueue_struct *wq,
+ struct work_struct *work)
{
- int ret;
+ bool ret = false;
+ unsigned long flags;
- ret = queue_work_on(get_cpu(), wq, work);
- put_cpu();
+ local_irq_save(flags);
+
+ if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+ __queue_work(cpu, wq, work);
+ ret = true;
+ }
+ local_irq_restore(flags);
return ret;
}
-EXPORT_SYMBOL_GPL(queue_work);
+EXPORT_SYMBOL_GPL(queue_work_on);
/**
- * queue_work_on - queue work on specific cpu
- * @cpu: CPU number to execute work on
+ * queue_work - queue work on a workqueue
* @wq: workqueue to use
* @work: work to queue
*
- * Returns 0 if @work was already on a queue, non-zero otherwise.
+ * Returns %false if @work was already on a queue, %true otherwise.
*
- * We queue the work to a specific CPU, the caller must ensure it
- * can't go away.
+ * We queue the work to the CPU on which it was submitted, but if the CPU dies
+ * it can be processed by another CPU.
*/
-int
-queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
+bool queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
- int ret = 0;
-
- if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
- __queue_work(cpu, wq, work);
- ret = 1;
- }
- return ret;
+ return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}
-EXPORT_SYMBOL_GPL(queue_work_on);
+EXPORT_SYMBOL_GPL(queue_work);
-static void delayed_work_timer_fn(unsigned long __data)
+void delayed_work_timer_fn(unsigned long __data)
{
struct delayed_work *dwork = (struct delayed_work *)__data;
struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work);
- __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
+ /* should have been called from irqsafe timer with irq already off */
+ __queue_work(dwork->cpu, cwq->wq, &dwork->work);
}
+EXPORT_SYMBOL_GPL(delayed_work_timer_fn);
-/**
- * queue_delayed_work - queue work on a workqueue after delay
- * @wq: workqueue to use
- * @dwork: delayable work to queue
- * @delay: number of jiffies to wait before queueing
- *
- * Returns 0 if @work was already on a queue, non-zero otherwise.
- */
-int queue_delayed_work(struct workqueue_struct *wq,
- struct delayed_work *dwork, unsigned long delay)
+static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
+ struct delayed_work *dwork, unsigned long delay)
{
- if (delay == 0)
- return queue_work(wq, &dwork->work);
+ struct timer_list *timer = &dwork->timer;
+ struct work_struct *work = &dwork->work;
+ unsigned int lcpu;
+
+ WARN_ON_ONCE(timer->function != delayed_work_timer_fn ||
+ timer->data != (unsigned long)dwork);
+ BUG_ON(timer_pending(timer));
+ BUG_ON(!list_empty(&work->entry));
+
+ timer_stats_timer_set_start_info(&dwork->timer);
+
+ /*
+ * This stores cwq for the moment, for the timer_fn. Note that the
+ * work's gcwq is preserved to allow reentrance detection for
+ * delayed works.
+ */
+ if (!(wq->flags & WQ_UNBOUND)) {
+ struct global_cwq *gcwq = get_work_gcwq(work);
- return queue_delayed_work_on(-1, wq, dwork, delay);
+ /*
+ * If we cannot get the last gcwq from @work directly,
+ * select the last CPU such that it avoids unnecessarily
+ * triggering non-reentrancy check in __queue_work().
+ */
+ lcpu = cpu;
+ if (gcwq)
+ lcpu = gcwq->cpu;
+ if (lcpu == WORK_CPU_UNBOUND)
+ lcpu = raw_smp_processor_id();
+ } else {
+ lcpu = WORK_CPU_UNBOUND;
+ }
+
+ set_work_cwq(work, get_cwq(lcpu, wq), 0);
+
+ dwork->cpu = cpu;
+ timer->expires = jiffies + delay;
+
+ if (unlikely(cpu != WORK_CPU_UNBOUND))
+ add_timer_on(timer, cpu);
+ else
+ add_timer(timer);
}
-EXPORT_SYMBOL_GPL(queue_delayed_work);
/**
* queue_delayed_work_on - queue work on specific CPU after delay
@@ -1131,53 +1406,100 @@ EXPORT_SYMBOL_GPL(queue_delayed_work);
* @dwork: work to queue
* @delay: number of jiffies to wait before queueing
*
- * Returns 0 if @work was already on a queue, non-zero otherwise.
+ * Returns %false if @work was already on a queue, %true otherwise. If
+ * @delay is zero and @dwork is idle, it will be scheduled for immediate
+ * execution.
*/
-int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
- struct delayed_work *dwork, unsigned long delay)
+bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
+ struct delayed_work *dwork, unsigned long delay)
{
- int ret = 0;
- struct timer_list *timer = &dwork->timer;
struct work_struct *work = &dwork->work;
+ bool ret = false;
+ unsigned long flags;
- if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
- unsigned int lcpu;
+ if (!delay)
+ return queue_work_on(cpu, wq, &dwork->work);
- BUG_ON(timer_pending(timer));
- BUG_ON(!list_empty(&work->entry));
+ /* read the comment in __queue_work() */
+ local_irq_save(flags);
- timer_stats_timer_set_start_info(&dwork->timer);
+ if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+ __queue_delayed_work(cpu, wq, dwork, delay);
+ ret = true;
+ }
- /*
- * This stores cwq for the moment, for the timer_fn.
- * Note that the work's gcwq is preserved to allow
- * reentrance detection for delayed works.
- */
- if (!(wq->flags & WQ_UNBOUND)) {
- struct global_cwq *gcwq = get_work_gcwq(work);
+ local_irq_restore(flags);
+ return ret;
+}
+EXPORT_SYMBOL_GPL(queue_delayed_work_on);
- if (gcwq && gcwq->cpu != WORK_CPU_UNBOUND)
- lcpu = gcwq->cpu;
- else
- lcpu = raw_smp_processor_id();
- } else
- lcpu = WORK_CPU_UNBOUND;
+/**
+ * queue_delayed_work - queue work on a workqueue after delay
+ * @wq: workqueue to use
+ * @dwork: delayable work to queue
+ * @delay: number of jiffies to wait before queueing
+ *
+ * Equivalent to queue_delayed_work_on() but tries to use the local CPU.
+ */
+bool queue_delayed_work(struct workqueue_struct *wq,
+ struct delayed_work *dwork, unsigned long delay)
+{
+ return queue_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
+}
+EXPORT_SYMBOL_GPL(queue_delayed_work);
- set_work_cwq(work, get_cwq(lcpu, wq), 0);
+/**
+ * mod_delayed_work_on - modify delay of or queue a delayed work on specific CPU
+ * @cpu: CPU number to execute work on
+ * @wq: workqueue to use
+ * @dwork: work to queue
+ * @delay: number of jiffies to wait before queueing
+ *
+ * If @dwork is idle, equivalent to queue_delayed_work_on(); otherwise,
+ * modify @dwork's timer so that it expires after @delay. If @delay is
+ * zero, @work is guaranteed to be scheduled immediately regardless of its
+ * current state.
+ *
+ * Returns %false if @dwork was idle and queued, %true if @dwork was
+ * pending and its timer was modified.
+ *
+ * This function is safe to call from any context including IRQ handler.
+ * See try_to_grab_pending() for details.
+ */
+bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
+ struct delayed_work *dwork, unsigned long delay)
+{
+ unsigned long flags;
+ int ret;
- timer->expires = jiffies + delay;
- timer->data = (unsigned long)dwork;
- timer->function = delayed_work_timer_fn;
+ do {
+ ret = try_to_grab_pending(&dwork->work, true, &flags);
+ } while (unlikely(ret == -EAGAIN));
- if (unlikely(cpu >= 0))
- add_timer_on(timer, cpu);
- else
- add_timer(timer);
- ret = 1;
+ if (likely(ret >= 0)) {
+ __queue_delayed_work(cpu, wq, dwork, delay);
+ local_irq_restore(flags);
}
+
+ /* -ENOENT from try_to_grab_pending() becomes %true */
return ret;
}
-EXPORT_SYMBOL_GPL(queue_delayed_work_on);
+EXPORT_SYMBOL_GPL(mod_delayed_work_on);
+
+/**
+ * mod_delayed_work - modify delay of or queue a delayed work
+ * @wq: workqueue to use
+ * @dwork: work to queue
+ * @delay: number of jiffies to wait before queueing
+ *
+ * mod_delayed_work_on() on local CPU.
+ */
+bool mod_delayed_work(struct workqueue_struct *wq, struct delayed_work *dwork,
+ unsigned long delay)
+{
+ return mod_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
+}
+EXPORT_SYMBOL_GPL(mod_delayed_work);
/**
* worker_enter_idle - enter idle state
@@ -1305,37 +1627,21 @@ __acquires(&gcwq->lock)
}
}
-struct idle_rebind {
- int cnt; /* # workers to be rebound */
- struct completion done; /* all workers rebound */
-};
-
/*
- * Rebind an idle @worker to its CPU. During CPU onlining, this has to
- * happen synchronously for idle workers. worker_thread() will test
- * %WORKER_REBIND before leaving idle and call this function.
+ * Rebind an idle @worker to its CPU. worker_thread() will test
+ * list_empty(@worker->entry) before leaving idle and call this function.
*/
static void idle_worker_rebind(struct worker *worker)
{
struct global_cwq *gcwq = worker->pool->gcwq;
- /* CPU must be online at this point */
- WARN_ON(!worker_maybe_bind_and_lock(worker));
- if (!--worker->idle_rebind->cnt)
- complete(&worker->idle_rebind->done);
- spin_unlock_irq(&worker->pool->gcwq->lock);
+ /* CPU may go down again inbetween, clear UNBOUND only on success */
+ if (worker_maybe_bind_and_lock(worker))
+ worker_clr_flags(worker, WORKER_UNBOUND);
- /* we did our part, wait for rebind_workers() to finish up */
- wait_event(gcwq->rebind_hold, !(worker->flags & WORKER_REBIND));
-
- /*
- * rebind_workers() shouldn't finish until all workers passed the
- * above WORKER_REBIND wait. Tell it when done.
- */
- spin_lock_irq(&worker->pool->gcwq->lock);
- if (!--worker->idle_rebind->cnt)
- complete(&worker->idle_rebind->done);
- spin_unlock_irq(&worker->pool->gcwq->lock);
+ /* rebind complete, become available again */
+ list_add(&worker->entry, &worker->pool->idle_list);
+ spin_unlock_irq(&gcwq->lock);
}
/*
@@ -1349,16 +1655,8 @@ static void busy_worker_rebind_fn(struct work_struct *work)
struct worker *worker = container_of(work, struct worker, rebind_work);
struct global_cwq *gcwq = worker->pool->gcwq;
- worker_maybe_bind_and_lock(worker);
-
- /*
- * %WORKER_REBIND must be cleared even if the above binding failed;
- * otherwise, we may confuse the next CPU_UP cycle or oops / get
- * stuck by calling idle_worker_rebind() prematurely. If CPU went
- * down again inbetween, %WORKER_UNBOUND would be set, so clearing
- * %WORKER_REBIND is always safe.
- */
- worker_clr_flags(worker, WORKER_REBIND);
+ if (worker_maybe_bind_and_lock(worker))
+ worker_clr_flags(worker, WORKER_UNBOUND);
spin_unlock_irq(&gcwq->lock);
}
@@ -1370,123 +1668,74 @@ static void busy_worker_rebind_fn(struct work_struct *work)
* @gcwq->cpu is coming online. Rebind all workers to the CPU. Rebinding
* is different for idle and busy ones.
*
- * The idle ones should be rebound synchronously and idle rebinding should
- * be complete before any worker starts executing work items with
- * concurrency management enabled; otherwise, scheduler may oops trying to
- * wake up non-local idle worker from wq_worker_sleeping().
+ * Idle ones will be removed from the idle_list and woken up. They will
+ * add themselves back after completing rebind. This ensures that the
+ * idle_list doesn't contain any unbound workers when re-bound busy workers
+ * try to perform local wake-ups for concurrency management.
*
- * This is achieved by repeatedly requesting rebinding until all idle
- * workers are known to have been rebound under @gcwq->lock and holding all
- * idle workers from becoming busy until idle rebinding is complete.
+ * Busy workers can rebind after they finish their current work items.
+ * Queueing the rebind work item at the head of the scheduled list is
+ * enough. Note that nr_running will be properly bumped as busy workers
+ * rebind.
*
- * Once idle workers are rebound, busy workers can be rebound as they
- * finish executing their current work items. Queueing the rebind work at
- * the head of their scheduled lists is enough. Note that nr_running will
- * be properbly bumped as busy workers rebind.
- *
- * On return, all workers are guaranteed to either be bound or have rebind
- * work item scheduled.
+ * On return, all non-manager workers are scheduled for rebind - see
+ * manage_workers() for the manager special case. Any idle worker
+ * including the manager will not appear on @idle_list until rebind is
+ * complete, making local wake-ups safe.
*/
static void rebind_workers(struct global_cwq *gcwq)
- __releases(&gcwq->lock) __acquires(&gcwq->lock)
{
- struct idle_rebind idle_rebind;
struct worker_pool *pool;
- struct worker *worker;
+ struct worker *worker, *n;
struct hlist_node *pos;
int i;
lockdep_assert_held(&gcwq->lock);
for_each_worker_pool(pool, gcwq)
- lockdep_assert_held(&pool->manager_mutex);
+ lockdep_assert_held(&pool->assoc_mutex);
- /*
- * Rebind idle workers. Interlocked both ways. We wait for
- * workers to rebind via @idle_rebind.done. Workers will wait for
- * us to finish up by watching %WORKER_REBIND.
- */
- init_completion(&idle_rebind.done);
-retry:
- idle_rebind.cnt = 1;
- INIT_COMPLETION(idle_rebind.done);
-
- /* set REBIND and kick idle ones, we'll wait for these later */
+ /* dequeue and kick idle ones */
for_each_worker_pool(pool, gcwq) {
- list_for_each_entry(worker, &pool->idle_list, entry) {
- unsigned long worker_flags = worker->flags;
-
- if (worker->flags & WORKER_REBIND)
- continue;
-
- /* morph UNBOUND to REBIND atomically */
- worker_flags &= ~WORKER_UNBOUND;
- worker_flags |= WORKER_REBIND;
- ACCESS_ONCE(worker->flags) = worker_flags;
-
- idle_rebind.cnt++;
- worker->idle_rebind = &idle_rebind;
+ list_for_each_entry_safe(worker, n, &pool->idle_list, entry) {
+ /*
+ * idle workers should be off @pool->idle_list
+ * until rebind is complete to avoid receiving
+ * premature local wake-ups.
+ */
+ list_del_init(&worker->entry);
- /* worker_thread() will call idle_worker_rebind() */
+ /*
+ * worker_thread() will see the above dequeuing
+ * and call idle_worker_rebind().
+ */
wake_up_process(worker->task);
}
}
- if (--idle_rebind.cnt) {
- spin_unlock_irq(&gcwq->lock);
- wait_for_completion(&idle_rebind.done);
- spin_lock_irq(&gcwq->lock);
- /* busy ones might have become idle while waiting, retry */
- goto retry;
- }
-
- /* all idle workers are rebound, rebind busy workers */
+ /* rebind busy workers */
for_each_busy_worker(worker, i, pos, gcwq) {
struct work_struct *rebind_work = &worker->rebind_work;
- unsigned long worker_flags = worker->flags;
-
- /* morph UNBOUND to REBIND atomically */
- worker_flags &= ~WORKER_UNBOUND;
- worker_flags |= WORKER_REBIND;
- ACCESS_ONCE(worker->flags) = worker_flags;
+ struct workqueue_struct *wq;
if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
work_data_bits(rebind_work)))
continue;
- /* wq doesn't matter, use the default one */
debug_work_activate(rebind_work);
- insert_work(get_cwq(gcwq->cpu, system_wq), rebind_work,
- worker->scheduled.next,
- work_color_to_flags(WORK_NO_COLOR));
- }
-
- /*
- * All idle workers are rebound and waiting for %WORKER_REBIND to
- * be cleared inside idle_worker_rebind(). Clear and release.
- * Clearing %WORKER_REBIND from this foreign context is safe
- * because these workers are still guaranteed to be idle.
- *
- * We need to make sure all idle workers passed WORKER_REBIND wait
- * in idle_worker_rebind() before returning; otherwise, workers can
- * get stuck at the wait if hotplug cycle repeats.
- */
- idle_rebind.cnt = 1;
- INIT_COMPLETION(idle_rebind.done);
-
- for_each_worker_pool(pool, gcwq) {
- list_for_each_entry(worker, &pool->idle_list, entry) {
- worker->flags &= ~WORKER_REBIND;
- idle_rebind.cnt++;
- }
- }
- wake_up_all(&gcwq->rebind_hold);
+ /*
+ * wq doesn't really matter but let's keep @worker->pool
+ * and @cwq->pool consistent for sanity.
+ */
+ if (worker_pool_pri(worker->pool))
+ wq = system_highpri_wq;
+ else
+ wq = system_wq;
- if (--idle_rebind.cnt) {
- spin_unlock_irq(&gcwq->lock);
- wait_for_completion(&idle_rebind.done);
- spin_lock_irq(&gcwq->lock);
+ insert_work(get_cwq(gcwq->cpu, wq), rebind_work,
+ worker->scheduled.next,
+ work_color_to_flags(WORK_NO_COLOR));
}
}
@@ -1844,22 +2093,22 @@ static bool manage_workers(struct worker *worker)
* grab %POOL_MANAGING_WORKERS to achieve this because that can
* lead to idle worker depletion (all become busy thinking someone
* else is managing) which in turn can result in deadlock under
- * extreme circumstances. Use @pool->manager_mutex to synchronize
+ * extreme circumstances. Use @pool->assoc_mutex to synchronize
* manager against CPU hotplug.
*
- * manager_mutex would always be free unless CPU hotplug is in
+ * assoc_mutex would always be free unless CPU hotplug is in
* progress. trylock first without dropping @gcwq->lock.
*/
- if (unlikely(!mutex_trylock(&pool->manager_mutex))) {
+ if (unlikely(!mutex_trylock(&pool->assoc_mutex))) {
spin_unlock_irq(&pool->gcwq->lock);
- mutex_lock(&pool->manager_mutex);
+ mutex_lock(&pool->assoc_mutex);
/*
* CPU hotplug could have happened while we were waiting
- * for manager_mutex. Hotplug itself can't handle us
+ * for assoc_mutex. Hotplug itself can't handle us
* because manager isn't either on idle or busy list, and
* @gcwq's state and ours could have deviated.
*
- * As hotplug is now excluded via manager_mutex, we can
+ * As hotplug is now excluded via assoc_mutex, we can
* simply try to bind. It will succeed or fail depending
* on @gcwq's current state. Try it and adjust
* %WORKER_UNBOUND accordingly.
@@ -1882,112 +2131,11 @@ static bool manage_workers(struct worker *worker)
ret |= maybe_create_worker(pool);
pool->flags &= ~POOL_MANAGING_WORKERS;
- mutex_unlock(&pool->manager_mutex);
+ mutex_unlock(&pool->assoc_mutex);
return ret;
}
/**
- * move_linked_works - move linked works to a list
- * @work: start of series of works to be scheduled
- * @head: target list to append @work to
- * @nextp: out paramter for nested worklist walking
- *
- * Schedule linked works starting from @work to @head. Work series to
- * be scheduled starts at @work and includes any consecutive work with
- * WORK_STRUCT_LINKED set in its predecessor.
- *
- * If @nextp is not NULL, it's updated to point to the next work of
- * the last scheduled work. This allows move_linked_works() to be
- * nested inside outer list_for_each_entry_safe().
- *
- * CONTEXT:
- * spin_lock_irq(gcwq->lock).
- */
-static void move_linked_works(struct work_struct *work, struct list_head *head,
- struct work_struct **nextp)
-{
- struct work_struct *n;
-
- /*
- * Linked worklist will always end before the end of the list,
- * use NULL for list head.
- */
- list_for_each_entry_safe_from(work, n, NULL, entry) {
- list_move_tail(&work->entry, head);
- if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
- break;
- }
-
- /*
- * If we're already inside safe list traversal and have moved
- * multiple works to the scheduled queue, the next position
- * needs to be updated.
- */
- if (nextp)
- *nextp = n;
-}
-
-static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
-{
- struct work_struct *work = list_first_entry(&cwq->delayed_works,
- struct work_struct, entry);
-
- trace_workqueue_activate_work(work);
- move_linked_works(work, &cwq->pool->worklist, NULL);
- __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
- cwq->nr_active++;
-}
-
-/**
- * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
- * @cwq: cwq of interest
- * @color: color of work which left the queue
- * @delayed: for a delayed work
- *
- * A work either has completed or is removed from pending queue,
- * decrement nr_in_flight of its cwq and handle workqueue flushing.
- *
- * CONTEXT:
- * spin_lock_irq(gcwq->lock).
- */
-static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color,
- bool delayed)
-{
- /* ignore uncolored works */
- if (color == WORK_NO_COLOR)
- return;
-
- cwq->nr_in_flight[color]--;
-
- if (!delayed) {
- cwq->nr_active--;
- if (!list_empty(&cwq->delayed_works)) {
- /* one down, submit a delayed one */
- if (cwq->nr_active < cwq->max_active)
- cwq_activate_first_delayed(cwq);
- }
- }
-
- /* is flush in progress and are we at the flushing tip? */
- if (likely(cwq->flush_color != color))
- return;
-
- /* are there still in-flight works? */
- if (cwq->nr_in_flight[color])
- return;
-
- /* this cwq is done, clear flush_color */
- cwq->flush_color = -1;
-
- /*
- * If this was the last cwq, wake up the first flusher. It
- * will handle the rest.
- */
- if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
- complete(&cwq->wq->first_flusher->done);
-}
-
-/**
* process_one_work - process single work
* @worker: self
* @work: work to process
@@ -2030,7 +2178,7 @@ __acquires(&gcwq->lock)
* necessary to avoid spurious warnings from rescuers servicing the
* unbound or a disassociated gcwq.
*/
- WARN_ON_ONCE(!(worker->flags & (WORKER_UNBOUND | WORKER_REBIND)) &&
+ WARN_ON_ONCE(!(worker->flags & WORKER_UNBOUND) &&
!(gcwq->flags & GCWQ_DISASSOCIATED) &&
raw_smp_processor_id() != gcwq->cpu);
@@ -2046,15 +2194,13 @@ __acquires(&gcwq->lock)
return;
}
- /* claim and process */
+ /* claim and dequeue */
debug_work_deactivate(work);
hlist_add_head(&worker->hentry, bwh);
worker->current_work = work;
worker->current_cwq = cwq;
work_color = get_work_color(work);
- /* record the current cpu number in the work data and dequeue */
- set_work_cpu(work, gcwq->cpu);
list_del_init(&work->entry);
/*
@@ -2071,9 +2217,16 @@ __acquires(&gcwq->lock)
if ((worker->flags & WORKER_UNBOUND) && need_more_worker(pool))
wake_up_worker(pool);
+ /*
+ * Record the last CPU and clear PENDING which should be the last
+ * update to @work. Also, do this inside @gcwq->lock so that
+ * PENDING and queued state changes happen together while IRQ is
+ * disabled.
+ */
+ set_work_cpu_and_clear_pending(work, gcwq->cpu);
+
spin_unlock_irq(&gcwq->lock);
- work_clear_pending(work);
lock_map_acquire_read(&cwq->wq->lockdep_map);
lock_map_acquire(&lockdep_map);
trace_workqueue_execute_start(work);
@@ -2087,11 +2240,9 @@ __acquires(&gcwq->lock)
lock_map_release(&cwq->wq->lockdep_map);
if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
- printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
- "%s/0x%08x/%d\n",
- current->comm, preempt_count(), task_pid_nr(current));
- printk(KERN_ERR " last function: ");
- print_symbol("%s\n", (unsigned long)f);
+ pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
+ " last function: %pf\n",
+ current->comm, preempt_count(), task_pid_nr(current), f);
debug_show_held_locks(current);
dump_stack();
}
@@ -2106,7 +2257,7 @@ __acquires(&gcwq->lock)
hlist_del_init(&worker->hentry);
worker->current_work = NULL;
worker->current_cwq = NULL;
- cwq_dec_nr_in_flight(cwq, work_color, false);
+ cwq_dec_nr_in_flight(cwq, work_color);
}
/**
@@ -2151,18 +2302,17 @@ static int worker_thread(void *__worker)
woke_up:
spin_lock_irq(&gcwq->lock);
- /*
- * DIE can be set only while idle and REBIND set while busy has
- * @worker->rebind_work scheduled. Checking here is enough.
- */
- if (unlikely(worker->flags & (WORKER_REBIND | WORKER_DIE))) {
+ /* we are off idle list if destruction or rebind is requested */
+ if (unlikely(list_empty(&worker->entry))) {
spin_unlock_irq(&gcwq->lock);
+ /* if DIE is set, destruction is requested */
if (worker->flags & WORKER_DIE) {
worker->task->flags &= ~PF_WQ_WORKER;
return 0;
}
+ /* otherwise, rebind */
idle_worker_rebind(worker);
goto woke_up;
}
@@ -2645,8 +2795,8 @@ reflush:
if (++flush_cnt == 10 ||
(flush_cnt % 100 == 0 && flush_cnt <= 1000))
- pr_warning("workqueue %s: flush on destruction isn't complete after %u tries\n",
- wq->name, flush_cnt);
+ pr_warn("workqueue %s: flush on destruction isn't complete after %u tries\n",
+ wq->name, flush_cnt);
goto reflush;
}
@@ -2657,8 +2807,7 @@ reflush:
}
EXPORT_SYMBOL_GPL(drain_workqueue);
-static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
- bool wait_executing)
+static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)
{
struct worker *worker = NULL;
struct global_cwq *gcwq;
@@ -2680,13 +2829,12 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
cwq = get_work_cwq(work);
if (unlikely(!cwq || gcwq != cwq->pool->gcwq))
goto already_gone;
- } else if (wait_executing) {
+ } else {
worker = find_worker_executing_work(gcwq, work);
if (!worker)
goto already_gone;
cwq = worker->current_cwq;
- } else
- goto already_gone;
+ }
insert_wq_barrier(cwq, barr, work, worker);
spin_unlock_irq(&gcwq->lock);
@@ -2713,15 +2861,8 @@ already_gone:
* flush_work - wait for a work to finish executing the last queueing instance
* @work: the work to flush
*
- * Wait until @work has finished execution. This function considers
- * only the last queueing instance of @work. If @work has been
- * enqueued across different CPUs on a non-reentrant workqueue or on
- * multiple workqueues, @work might still be executing on return on
- * some of the CPUs from earlier queueing.
- *
- * If @work was queued only on a non-reentrant, ordered or unbound
- * workqueue, @work is guaranteed to be idle on return if it hasn't
- * been requeued since flush started.
+ * Wait until @work has finished execution. @work is guaranteed to be idle
+ * on return if it hasn't been requeued since flush started.
*
* RETURNS:
* %true if flush_work() waited for the work to finish execution,
@@ -2734,140 +2875,36 @@ bool flush_work(struct work_struct *work)
lock_map_acquire(&work->lockdep_map);
lock_map_release(&work->lockdep_map);
- if (start_flush_work(work, &barr, true)) {
+ if (start_flush_work(work, &barr)) {
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
return true;
- } else
- return false;
-}
-EXPORT_SYMBOL_GPL(flush_work);
-
-static bool wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work)
-{
- struct wq_barrier barr;
- struct worker *worker;
-
- spin_lock_irq(&gcwq->lock);
-
- worker = find_worker_executing_work(gcwq, work);
- if (unlikely(worker))
- insert_wq_barrier(worker->current_cwq, &barr, work, worker);
-
- spin_unlock_irq(&gcwq->lock);
-
- if (unlikely(worker)) {
- wait_for_completion(&barr.done);
- destroy_work_on_stack(&barr.work);
- return true;
- } else
+ } else {
return false;
-}
-
-static bool wait_on_work(struct work_struct *work)
-{
- bool ret = false;
- int cpu;
-
- might_sleep();
-
- lock_map_acquire(&work->lockdep_map);
- lock_map_release(&work->lockdep_map);
-
- for_each_gcwq_cpu(cpu)
- ret |= wait_on_cpu_work(get_gcwq(cpu), work);
- return ret;
-}
-
-/**
- * flush_work_sync - wait until a work has finished execution
- * @work: the work to flush
- *
- * Wait until @work has finished execution. On return, it's
- * guaranteed that all queueing instances of @work which happened
- * before this function is called are finished. In other words, if
- * @work hasn't been requeued since this function was called, @work is
- * guaranteed to be idle on return.
- *
- * RETURNS:
- * %true if flush_work_sync() waited for the work to finish execution,
- * %false if it was already idle.
- */
-bool flush_work_sync(struct work_struct *work)
-{
- struct wq_barrier barr;
- bool pending, waited;
-
- /* we'll wait for executions separately, queue barr only if pending */
- pending = start_flush_work(work, &barr, false);
-
- /* wait for executions to finish */
- waited = wait_on_work(work);
-
- /* wait for the pending one */
- if (pending) {
- wait_for_completion(&barr.done);
- destroy_work_on_stack(&barr.work);
}
-
- return pending || waited;
-}
-EXPORT_SYMBOL_GPL(flush_work_sync);
-
-/*
- * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
- * so this work can't be re-armed in any way.
- */
-static int try_to_grab_pending(struct work_struct *work)
-{
- struct global_cwq *gcwq;
- int ret = -1;
-
- if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
- return 0;
-
- /*
- * The queueing is in progress, or it is already queued. Try to
- * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
- */
- gcwq = get_work_gcwq(work);
- if (!gcwq)
- return ret;
-
- spin_lock_irq(&gcwq->lock);
- if (!list_empty(&work->entry)) {
- /*
- * This work is queued, but perhaps we locked the wrong gcwq.
- * In that case we must see the new value after rmb(), see
- * insert_work()->wmb().
- */
- smp_rmb();
- if (gcwq == get_work_gcwq(work)) {
- debug_work_deactivate(work);
- list_del_init(&work->entry);
- cwq_dec_nr_in_flight(get_work_cwq(work),
- get_work_color(work),
- *work_data_bits(work) & WORK_STRUCT_DELAYED);
- ret = 1;
- }
- }
- spin_unlock_irq(&gcwq->lock);
-
- return ret;
}
+EXPORT_SYMBOL_GPL(flush_work);
-static bool __cancel_work_timer(struct work_struct *work,
- struct timer_list* timer)
+static bool __cancel_work_timer(struct work_struct *work, bool is_dwork)
{
+ unsigned long flags;
int ret;
do {
- ret = (timer && likely(del_timer(timer)));
- if (!ret)
- ret = try_to_grab_pending(work);
- wait_on_work(work);
+ ret = try_to_grab_pending(work, is_dwork, &flags);
+ /*
+ * If someone else is canceling, wait for the same event it
+ * would be waiting for before retrying.
+ */
+ if (unlikely(ret == -ENOENT))
+ flush_work(work);
} while (unlikely(ret < 0));
+ /* tell other tasks trying to grab @work to back off */
+ mark_work_canceling(work);
+ local_irq_restore(flags);
+
+ flush_work(work);
clear_work_data(work);
return ret;
}
@@ -2892,7 +2929,7 @@ static bool __cancel_work_timer(struct work_struct *work,
*/
bool cancel_work_sync(struct work_struct *work)
{
- return __cancel_work_timer(work, NULL);
+ return __cancel_work_timer(work, false);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);
@@ -2910,33 +2947,44 @@ EXPORT_SYMBOL_GPL(cancel_work_sync);
*/
bool flush_delayed_work(struct delayed_work *dwork)
{
+ local_irq_disable();
if (del_timer_sync(&dwork->timer))
- __queue_work(raw_smp_processor_id(),
+ __queue_work(dwork->cpu,
get_work_cwq(&dwork->work)->wq, &dwork->work);
+ local_irq_enable();
return flush_work(&dwork->work);
}
EXPORT_SYMBOL(flush_delayed_work);
/**
- * flush_delayed_work_sync - wait for a dwork to finish
- * @dwork: the delayed work to flush
+ * cancel_delayed_work - cancel a delayed work
+ * @dwork: delayed_work to cancel
*
- * Delayed timer is cancelled and the pending work is queued for
- * execution immediately. Other than timer handling, its behavior
- * is identical to flush_work_sync().
+ * Kill off a pending delayed_work. Returns %true if @dwork was pending
+ * and canceled; %false if wasn't pending. Note that the work callback
+ * function may still be running on return, unless it returns %true and the
+ * work doesn't re-arm itself. Explicitly flush or use
+ * cancel_delayed_work_sync() to wait on it.
*
- * RETURNS:
- * %true if flush_work_sync() waited for the work to finish execution,
- * %false if it was already idle.
+ * This function is safe to call from any context including IRQ handler.
*/
-bool flush_delayed_work_sync(struct delayed_work *dwork)
+bool cancel_delayed_work(struct delayed_work *dwork)
{
- if (del_timer_sync(&dwork->timer))
- __queue_work(raw_smp_processor_id(),
- get_work_cwq(&dwork->work)->wq, &dwork->work);
- return flush_work_sync(&dwork->work);
+ unsigned long flags;
+ int ret;
+
+ do {
+ ret = try_to_grab_pending(&dwork->work, true, &flags);
+ } while (unlikely(ret == -EAGAIN));
+
+ if (unlikely(ret < 0))
+ return false;
+
+ set_work_cpu_and_clear_pending(&dwork->work, work_cpu(&dwork->work));
+ local_irq_restore(flags);
+ return true;
}
-EXPORT_SYMBOL(flush_delayed_work_sync);
+EXPORT_SYMBOL(cancel_delayed_work);
/**
* cancel_delayed_work_sync - cancel a delayed work and wait for it to finish
@@ -2949,54 +2997,39 @@ EXPORT_SYMBOL(flush_delayed_work_sync);
*/
bool cancel_delayed_work_sync(struct delayed_work *dwork)
{
- return __cancel_work_timer(&dwork->work, &dwork->timer);
+ return __cancel_work_timer(&dwork->work, true);
}
EXPORT_SYMBOL(cancel_delayed_work_sync);
/**
- * schedule_work - put work task in global workqueue
- * @work: job to be done
- *
- * Returns zero if @work was already on the kernel-global workqueue and
- * non-zero otherwise.
- *
- * This puts a job in the kernel-global workqueue if it was not already
- * queued and leaves it in the same position on the kernel-global
- * workqueue otherwise.
- */
-int schedule_work(struct work_struct *work)
-{
- return queue_work(system_wq, work);
-}
-EXPORT_SYMBOL(schedule_work);
-
-/*
* schedule_work_on - put work task on a specific cpu
* @cpu: cpu to put the work task on
* @work: job to be done
*
* This puts a job on a specific cpu
*/
-int schedule_work_on(int cpu, struct work_struct *work)
+bool schedule_work_on(int cpu, struct work_struct *work)
{
return queue_work_on(cpu, system_wq, work);
}
EXPORT_SYMBOL(schedule_work_on);
/**
- * schedule_delayed_work - put work task in global workqueue after delay
- * @dwork: job to be done
- * @delay: number of jiffies to wait or 0 for immediate execution
+ * schedule_work - put work task in global workqueue
+ * @work: job to be done
*
- * After waiting for a given time this puts a job in the kernel-global
- * workqueue.
+ * Returns %false if @work was already on the kernel-global workqueue and
+ * %true otherwise.
+ *
+ * This puts a job in the kernel-global workqueue if it was not already
+ * queued and leaves it in the same position on the kernel-global
+ * workqueue otherwise.
*/
-int schedule_delayed_work(struct delayed_work *dwork,
- unsigned long delay)
+bool schedule_work(struct work_struct *work)
{
- return queue_delayed_work(system_wq, dwork, delay);
+ return queue_work(system_wq, work);
}
-EXPORT_SYMBOL(schedule_delayed_work);
+EXPORT_SYMBOL(schedule_work);
/**
* schedule_delayed_work_on - queue work in global workqueue on CPU after delay
@@ -3007,14 +3040,28 @@ EXPORT_SYMBOL(schedule_delayed_work);
* After waiting for a given time this puts a job in the kernel-global
* workqueue on the specified CPU.
*/
-int schedule_delayed_work_on(int cpu,
- struct delayed_work *dwork, unsigned long delay)
+bool schedule_delayed_work_on(int cpu, struct delayed_work *dwork,
+ unsigned long delay)
{
return queue_delayed_work_on(cpu, system_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);
/**
+ * schedule_delayed_work - put work task in global workqueue after delay
+ * @dwork: job to be done
+ * @delay: number of jiffies to wait or 0 for immediate execution
+ *
+ * After waiting for a given time this puts a job in the kernel-global
+ * workqueue.
+ */
+bool schedule_delayed_work(struct delayed_work *dwork, unsigned long delay)
+{
+ return queue_delayed_work(system_wq, dwork, delay);
+}
+EXPORT_SYMBOL(schedule_delayed_work);
+
+/**
* schedule_on_each_cpu - execute a function synchronously on each online CPU
* @func: the function to call
*
@@ -3161,9 +3208,8 @@ static int wq_clamp_max_active(int max_active, unsigned int flags,
int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
if (max_active < 1 || max_active > lim)
- printk(KERN_WARNING "workqueue: max_active %d requested for %s "
- "is out of range, clamping between %d and %d\n",
- max_active, name, 1, lim);
+ pr_warn("workqueue: max_active %d requested for %s is out of range, clamping between %d and %d\n",
+ max_active, name, 1, lim);
return clamp_val(max_active, 1, lim);
}
@@ -3319,6 +3365,26 @@ void destroy_workqueue(struct workqueue_struct *wq)
EXPORT_SYMBOL_GPL(destroy_workqueue);
/**
+ * cwq_set_max_active - adjust max_active of a cwq
+ * @cwq: target cpu_workqueue_struct
+ * @max_active: new max_active value.
+ *
+ * Set @cwq->max_active to @max_active and activate delayed works if
+ * increased.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void cwq_set_max_active(struct cpu_workqueue_struct *cwq, int max_active)
+{
+ cwq->max_active = max_active;
+
+ while (!list_empty(&cwq->delayed_works) &&
+ cwq->nr_active < cwq->max_active)
+ cwq_activate_first_delayed(cwq);
+}
+
+/**
* workqueue_set_max_active - adjust max_active of a workqueue
* @wq: target workqueue
* @max_active: new max_active value.
@@ -3345,7 +3411,7 @@ void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
if (!(wq->flags & WQ_FREEZABLE) ||
!(gcwq->flags & GCWQ_FREEZING))
- get_cwq(gcwq->cpu, wq)->max_active = max_active;
+ cwq_set_max_active(get_cwq(gcwq->cpu, wq), max_active);
spin_unlock_irq(&gcwq->lock);
}
@@ -3440,23 +3506,23 @@ EXPORT_SYMBOL_GPL(work_busy);
*/
/* claim manager positions of all pools */
-static void gcwq_claim_management_and_lock(struct global_cwq *gcwq)
+static void gcwq_claim_assoc_and_lock(struct global_cwq *gcwq)
{
struct worker_pool *pool;
for_each_worker_pool(pool, gcwq)
- mutex_lock_nested(&pool->manager_mutex, pool - gcwq->pools);
+ mutex_lock_nested(&pool->assoc_mutex, pool - gcwq->pools);
spin_lock_irq(&gcwq->lock);
}
/* release manager positions */
-static void gcwq_release_management_and_unlock(struct global_cwq *gcwq)
+static void gcwq_release_assoc_and_unlock(struct global_cwq *gcwq)
{
struct worker_pool *pool;
spin_unlock_irq(&gcwq->lock);
for_each_worker_pool(pool, gcwq)
- mutex_unlock(&pool->manager_mutex);
+ mutex_unlock(&pool->assoc_mutex);
}
static void gcwq_unbind_fn(struct work_struct *work)
@@ -3469,7 +3535,7 @@ static void gcwq_unbind_fn(struct work_struct *work)
BUG_ON(gcwq->cpu != smp_processor_id());
- gcwq_claim_management_and_lock(gcwq);
+ gcwq_claim_assoc_and_lock(gcwq);
/*
* We've claimed all manager positions. Make all workers unbound
@@ -3486,7 +3552,7 @@ static void gcwq_unbind_fn(struct work_struct *work)
gcwq->flags |= GCWQ_DISASSOCIATED;
- gcwq_release_management_and_unlock(gcwq);
+ gcwq_release_assoc_and_unlock(gcwq);
/*
* Call schedule() so that we cross rq->lock and thus can guarantee
@@ -3514,7 +3580,7 @@ static void gcwq_unbind_fn(struct work_struct *work)
* Workqueues should be brought up before normal priority CPU notifiers.
* This will be registered high priority CPU notifier.
*/
-static int __devinit workqueue_cpu_up_callback(struct notifier_block *nfb,
+static int __cpuinit workqueue_cpu_up_callback(struct notifier_block *nfb,
unsigned long action,
void *hcpu)
{
@@ -3542,10 +3608,10 @@ static int __devinit workqueue_cpu_up_callback(struct notifier_block *nfb,
case CPU_DOWN_FAILED:
case CPU_ONLINE:
- gcwq_claim_management_and_lock(gcwq);
+ gcwq_claim_assoc_and_lock(gcwq);
gcwq->flags &= ~GCWQ_DISASSOCIATED;
rebind_workers(gcwq);
- gcwq_release_management_and_unlock(gcwq);
+ gcwq_release_assoc_and_unlock(gcwq);
break;
}
return NOTIFY_OK;
@@ -3555,7 +3621,7 @@ static int __devinit workqueue_cpu_up_callback(struct notifier_block *nfb,
* Workqueues should be brought down after normal priority CPU notifiers.
* This will be registered as low priority CPU notifier.
*/
-static int __devinit workqueue_cpu_down_callback(struct notifier_block *nfb,
+static int __cpuinit workqueue_cpu_down_callback(struct notifier_block *nfb,
unsigned long action,
void *hcpu)
{
@@ -3566,7 +3632,7 @@ static int __devinit workqueue_cpu_down_callback(struct notifier_block *nfb,
case CPU_DOWN_PREPARE:
/* unbinding should happen on the local CPU */
INIT_WORK_ONSTACK(&unbind_work, gcwq_unbind_fn);
- schedule_work_on(cpu, &unbind_work);
+ queue_work_on(cpu, system_highpri_wq, &unbind_work);
flush_work(&unbind_work);
break;
}
@@ -3735,11 +3801,7 @@ void thaw_workqueues(void)
continue;
/* restore max_active and repopulate worklist */
- cwq->max_active = wq->saved_max_active;
-
- while (!list_empty(&cwq->delayed_works) &&
- cwq->nr_active < cwq->max_active)
- cwq_activate_first_delayed(cwq);
+ cwq_set_max_active(cwq, wq->saved_max_active);
}
for_each_worker_pool(pool, gcwq)
@@ -3759,8 +3821,12 @@ static int __init init_workqueues(void)
unsigned int cpu;
int i;
+ /* make sure we have enough bits for OFFQ CPU number */
+ BUILD_BUG_ON((1LU << (BITS_PER_LONG - WORK_OFFQ_CPU_SHIFT)) <
+ WORK_CPU_LAST);
+
cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);
- cpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);
+ hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);
/* initialize gcwqs */
for_each_gcwq_cpu(cpu) {
@@ -3786,11 +3852,9 @@ static int __init init_workqueues(void)
setup_timer(&pool->mayday_timer, gcwq_mayday_timeout,
(unsigned long)pool);
- mutex_init(&pool->manager_mutex);
+ mutex_init(&pool->assoc_mutex);
ida_init(&pool->worker_ida);
}
-
- init_waitqueue_head(&gcwq->rebind_hold);
}
/* create the initial worker */
@@ -3813,17 +3877,14 @@ static int __init init_workqueues(void)
}
system_wq = alloc_workqueue("events", 0, 0);
+ system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
system_long_wq = alloc_workqueue("events_long", 0, 0);
- system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
WQ_UNBOUND_MAX_ACTIVE);
system_freezable_wq = alloc_workqueue("events_freezable",
WQ_FREEZABLE, 0);
- system_nrt_freezable_wq = alloc_workqueue("events_nrt_freezable",
- WQ_NON_REENTRANT | WQ_FREEZABLE, 0);
- BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq ||
- !system_unbound_wq || !system_freezable_wq ||
- !system_nrt_freezable_wq);
+ BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
+ !system_unbound_wq || !system_freezable_wq);
return 0;
}
early_initcall(init_workqueues);