diff options
-rw-r--r-- | include/linux/workqueue.h | 6 | ||||
-rw-r--r-- | kernel/workqueue.c | 135 |
2 files changed, 103 insertions, 38 deletions
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h index ab0b7fb99bc..10611f7fc80 100644 --- a/include/linux/workqueue.h +++ b/include/linux/workqueue.h @@ -221,7 +221,7 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; } enum { WQ_FREEZEABLE = 1 << 0, /* freeze during suspend */ - WQ_SINGLE_THREAD = 1 << 1, /* no per-cpu worker */ + WQ_SINGLE_CPU = 1 << 1, /* only single cpu at a time */ }; extern struct workqueue_struct * @@ -250,9 +250,9 @@ __create_workqueue_key(const char *name, unsigned int flags, int max_active, #define create_workqueue(name) \ __create_workqueue((name), 0, 1) #define create_freezeable_workqueue(name) \ - __create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_THREAD, 1) + __create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_CPU, 1) #define create_singlethread_workqueue(name) \ - __create_workqueue((name), WQ_SINGLE_THREAD, 1) + __create_workqueue((name), WQ_SINGLE_CPU, 1) extern void destroy_workqueue(struct workqueue_struct *wq); diff --git a/kernel/workqueue.c b/kernel/workqueue.c index f57855f718d..cfb8aa567e1 100644 --- a/kernel/workqueue.c +++ b/kernel/workqueue.c @@ -114,8 +114,7 @@ struct global_cwq { } ____cacheline_aligned_in_smp; /* - * The per-CPU workqueue (if single thread, we always use the first - * possible cpu). The lower WORK_STRUCT_FLAG_BITS of + * The per-CPU workqueue. The lower WORK_STRUCT_FLAG_BITS of * work_struct->data are used for flags and thus cwqs need to be * aligned at two's power of the number of flag bits. */ @@ -159,6 +158,8 @@ struct workqueue_struct { struct list_head flusher_queue; /* F: flush waiters */ struct list_head flusher_overflow; /* F: flush overflow list */ + unsigned long single_cpu; /* cpu for single cpu wq */ + int saved_max_active; /* I: saved cwq max_active */ const char *name; /* I: workqueue name */ #ifdef CONFIG_LOCKDEP @@ -289,8 +290,6 @@ static DEFINE_PER_CPU(struct global_cwq, global_cwq); static int worker_thread(void *__worker); -static int singlethread_cpu __read_mostly; - static struct global_cwq *get_gcwq(unsigned int cpu) { return &per_cpu(global_cwq, cpu); @@ -302,14 +301,6 @@ static struct cpu_workqueue_struct *get_cwq(unsigned int cpu, return per_cpu_ptr(wq->cpu_wq, cpu); } -static struct cpu_workqueue_struct *target_cwq(unsigned int cpu, - struct workqueue_struct *wq) -{ - if (unlikely(wq->flags & WQ_SINGLE_THREAD)) - cpu = singlethread_cpu; - return get_cwq(cpu, wq); -} - static unsigned int work_color_to_flags(int color) { return color << WORK_STRUCT_COLOR_SHIFT; @@ -410,17 +401,87 @@ static void insert_work(struct cpu_workqueue_struct *cwq, wake_up_process(cwq->worker->task); } +/** + * cwq_unbind_single_cpu - unbind cwq from single cpu workqueue processing + * @cwq: cwq to unbind + * + * Try to unbind @cwq from single cpu workqueue processing. If + * @cwq->wq is frozen, unbind is delayed till the workqueue is thawed. + * + * CONTEXT: + * spin_lock_irq(gcwq->lock). + */ +static void cwq_unbind_single_cpu(struct cpu_workqueue_struct *cwq) +{ + struct workqueue_struct *wq = cwq->wq; + struct global_cwq *gcwq = cwq->gcwq; + + BUG_ON(wq->single_cpu != gcwq->cpu); + /* + * Unbind from workqueue if @cwq is not frozen. If frozen, + * thaw_workqueues() will either restart processing on this + * cpu or unbind if empty. This keeps works queued while + * frozen fully ordered and flushable. + */ + if (likely(!(gcwq->flags & GCWQ_FREEZING))) { + smp_wmb(); /* paired with cmpxchg() in __queue_work() */ + wq->single_cpu = NR_CPUS; + } +} + static void __queue_work(unsigned int cpu, struct workqueue_struct *wq, struct work_struct *work) { - struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq); - struct global_cwq *gcwq = cwq->gcwq; + struct global_cwq *gcwq; + struct cpu_workqueue_struct *cwq; struct list_head *worklist; unsigned long flags; + bool arbitrate; debug_work_activate(work); - spin_lock_irqsave(&gcwq->lock, flags); + /* determine gcwq to use */ + if (!(wq->flags & WQ_SINGLE_CPU)) { + /* just use the requested cpu for multicpu workqueues */ + gcwq = get_gcwq(cpu); + spin_lock_irqsave(&gcwq->lock, flags); + } else { + unsigned int req_cpu = cpu; + + /* + * It's a bit more complex for single cpu workqueues. + * We first need to determine which cpu is going to be + * used. If no cpu is currently serving this + * workqueue, arbitrate using atomic accesses to + * wq->single_cpu; otherwise, use the current one. + */ + retry: + cpu = wq->single_cpu; + arbitrate = cpu == NR_CPUS; + if (arbitrate) + cpu = req_cpu; + + gcwq = get_gcwq(cpu); + spin_lock_irqsave(&gcwq->lock, flags); + + /* + * The following cmpxchg() is a full barrier paired + * with smp_wmb() in cwq_unbind_single_cpu() and + * guarantees that all changes to wq->st_* fields are + * visible on the new cpu after this point. + */ + if (arbitrate) + cmpxchg(&wq->single_cpu, NR_CPUS, cpu); + + if (unlikely(wq->single_cpu != cpu)) { + spin_unlock_irqrestore(&gcwq->lock, flags); + goto retry; + } + } + + /* gcwq determined, get cwq and queue */ + cwq = get_cwq(gcwq->cpu, wq); + BUG_ON(!list_empty(&work->entry)); cwq->nr_in_flight[cwq->work_color]++; @@ -530,7 +591,7 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, timer_stats_timer_set_start_info(&dwork->timer); /* This stores cwq for the moment, for the timer_fn */ - set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0); + set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0); timer->expires = jiffies + delay; timer->data = (unsigned long)dwork; timer->function = delayed_work_timer_fn; @@ -790,10 +851,14 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color) cwq->nr_in_flight[color]--; cwq->nr_active--; - /* one down, submit a delayed one */ - if (!list_empty(&cwq->delayed_works) && - cwq->nr_active < cwq->max_active) - cwq_activate_first_delayed(cwq); + if (!list_empty(&cwq->delayed_works)) { + /* one down, submit a delayed one */ + if (cwq->nr_active < cwq->max_active) + cwq_activate_first_delayed(cwq); + } else if (!cwq->nr_active && cwq->wq->flags & WQ_SINGLE_CPU) { + /* this was the last work, unbind from single cpu */ + cwq_unbind_single_cpu(cwq); + } /* is flush in progress and are we at the flushing tip? */ if (likely(cwq->flush_color != color)) @@ -1727,7 +1792,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name, struct lock_class_key *key, const char *lock_name) { - bool singlethread = flags & WQ_SINGLE_THREAD; struct workqueue_struct *wq; bool failed = false; unsigned int cpu; @@ -1748,6 +1812,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name, atomic_set(&wq->nr_cwqs_to_flush, 0); INIT_LIST_HEAD(&wq->flusher_queue); INIT_LIST_HEAD(&wq->flusher_overflow); + wq->single_cpu = NR_CPUS; + wq->name = name; lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); INIT_LIST_HEAD(&wq->list); @@ -1773,8 +1839,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name, if (failed) continue; - cwq->worker = create_worker(cwq, - cpu_online(cpu) && !singlethread); + cwq->worker = create_worker(cwq, cpu_online(cpu)); if (cwq->worker) start_worker(cwq->worker); else @@ -1958,18 +2023,16 @@ static int __cpuinit trustee_thread(void *__gcwq) spin_lock_irq(&gcwq->lock); /* - * Make all multithread workers rogue. Trustee must be bound - * to the target cpu and can't be cancelled. + * Make all workers rogue. Trustee must be bound to the + * target cpu and can't be cancelled. */ BUG_ON(gcwq->cpu != smp_processor_id()); list_for_each_entry(worker, &gcwq->idle_list, entry) - if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD)) - worker->flags |= WORKER_ROGUE; + worker->flags |= WORKER_ROGUE; for_each_busy_worker(worker, i, pos, gcwq) - if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD)) - worker->flags |= WORKER_ROGUE; + worker->flags |= WORKER_ROGUE; /* * We're now in charge. Notify and proceed to drain. We need @@ -2074,14 +2137,12 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, wait_trustee_state(gcwq, TRUSTEE_DONE); } - /* clear ROGUE from all multithread workers */ + /* clear ROGUE from all workers */ list_for_each_entry(worker, &gcwq->idle_list, entry) - if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD)) - worker->flags &= ~WORKER_ROGUE; + worker->flags &= ~WORKER_ROGUE; for_each_busy_worker(worker, i, pos, gcwq) - if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD)) - worker->flags &= ~WORKER_ROGUE; + worker->flags &= ~WORKER_ROGUE; break; } @@ -2266,6 +2327,11 @@ void thaw_workqueues(void) cwq->nr_active < cwq->max_active) cwq_activate_first_delayed(cwq); + /* perform delayed unbind from single cpu if empty */ + if (wq->single_cpu == gcwq->cpu && + !cwq->nr_active && list_empty(&cwq->delayed_works)) + cwq_unbind_single_cpu(cwq); + wake_up_process(cwq->worker->task); } @@ -2283,7 +2349,6 @@ void __init init_workqueues(void) unsigned int cpu; int i; - singlethread_cpu = cpumask_first(cpu_possible_mask); hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE); /* initialize gcwqs */ |