summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2013-04-29 19:07:40 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2013-04-29 19:07:40 -0700
commit46d9be3e5eb01f71fc02653755d970247174b400 (patch)
tree01534c9ebfa5f52a7133e34354d2831fe6704f15
parentce8aa48929449b491149b6c87861ac69cb797a42 (diff)
parentcece95dfe5aa56ba99e51b4746230ff0b8542abd (diff)
Merge branch 'for-3.10' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq
Pull workqueue updates from Tejun Heo: "A lot of activities on workqueue side this time. The changes achieve the followings. - WQ_UNBOUND workqueues - the workqueues which are per-cpu - are updated to be able to interface with multiple backend worker pools. This involved a lot of churning but the end result seems actually neater as unbound workqueues are now a lot closer to per-cpu ones. - The ability to interface with multiple backend worker pools are used to implement unbound workqueues with custom attributes. Currently the supported attributes are the nice level and CPU affinity. It may be expanded to include cgroup association in future. The attributes can be specified either by calling apply_workqueue_attrs() or through /sys/bus/workqueue/WQ_NAME/* if the workqueue in question is exported through sysfs. The backend worker pools are keyed by the actual attributes and shared by any workqueues which share the same attributes. When attributes of a workqueue are changed, the workqueue binds to the worker pool with the specified attributes while leaving the work items which are already executing in its previous worker pools alone. This allows converting custom worker pool implementations which want worker attribute tuning to use workqueues. The writeback pool is already converted in block tree and there are a couple others are likely to follow including btrfs io workers. - WQ_UNBOUND's ability to bind to multiple worker pools is also used to make it NUMA-aware. Because there's no association between work item issuer and the specific worker assigned to execute it, before this change, using unbound workqueue led to unnecessary cross-node bouncing and it couldn't be helped by autonuma as it requires tasks to have implicit node affinity and workers are assigned randomly. After these changes, an unbound workqueue now binds to multiple NUMA-affine worker pools so that queued work items are executed in the same node. This is turned on by default but can be disabled system-wide or for individual workqueues. Crypto was requesting NUMA affinity as encrypting data across different nodes can contribute noticeable overhead and doing it per-cpu was too limiting for certain cases and IO throughput could be bottlenecked by one CPU being fully occupied while others have idle cycles. While the new features required a lot of changes including restructuring locking, it didn't complicate the execution paths much. The unbound workqueue handling is now closer to per-cpu ones and the new features are implemented by simply associating a workqueue with different sets of backend worker pools without changing queue, execution or flush paths. As such, even though the amount of change is very high, I feel relatively safe in that it isn't likely to cause subtle issues with basic correctness of work item execution and handling. If something is wrong, it's likely to show up as being associated with worker pools with the wrong attributes or OOPS while workqueue attributes are being changed or during CPU hotplug. While this creates more backend worker pools, it doesn't add too many more workers unless, of course, there are many workqueues with unique combinations of attributes. Assuming everything else is the same, NUMA awareness costs an extra worker pool per NUMA node with online CPUs. There are also a couple things which are being routed outside the workqueue tree. - block tree pulled in workqueue for-3.10 so that writeback worker pool can be converted to unbound workqueue with sysfs control exposed. This simplifies the code, makes writeback workers NUMA-aware and allows tuning nice level and CPU affinity via sysfs. - The conversion to workqueue means that there's no 1:1 association between a specific worker, which makes writeback folks unhappy as they want to be able to tell which filesystem caused a problem from backtrace on systems with many filesystems mounted. This is resolved by allowing work items to set debug info string which is printed when the task is dumped. As this change involves unifying implementations of dump_stack() and friends in arch codes, it's being routed through Andrew's -mm tree." * 'for-3.10' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq: (84 commits) workqueue: use kmem_cache_free() instead of kfree() workqueue: avoid false negative WARN_ON() in destroy_workqueue() workqueue: update sysfs interface to reflect NUMA awareness and a kernel param to disable NUMA affinity workqueue: implement NUMA affinity for unbound workqueues workqueue: introduce put_pwq_unlocked() workqueue: introduce numa_pwq_tbl_install() workqueue: use NUMA-aware allocation for pool_workqueues workqueue: break init_and_link_pwq() into two functions and introduce alloc_unbound_pwq() workqueue: map an unbound workqueues to multiple per-node pool_workqueues workqueue: move hot fields of workqueue_struct to the end workqueue: make workqueue->name[] fixed len workqueue: add workqueue->unbound_attrs workqueue: determine NUMA node of workers accourding to the allowed cpumask workqueue: drop 'H' from kworker names of unbound worker pools workqueue: add wq_numa_tbl_len and wq_numa_possible_cpumask[] workqueue: move pwq_pool_locking outside of get/put_unbound_pool() workqueue: fix memory leak in apply_workqueue_attrs() workqueue: fix unbound workqueue attrs hashing / comparison workqueue: fix race condition in unbound workqueue free path workqueue: remove pwq_lock which is no longer used ...
-rw-r--r--Documentation/kernel-parameters.txt9
-rw-r--r--drivers/base/base.h2
-rw-r--r--drivers/base/bus.c73
-rw-r--r--drivers/base/core.c2
-rw-r--r--include/linux/cpumask.h15
-rw-r--r--include/linux/device.h2
-rw-r--r--include/linux/sched.h2
-rw-r--r--include/linux/workqueue.h166
-rw-r--r--kernel/cgroup.c4
-rw-r--r--kernel/cpuset.c16
-rw-r--r--kernel/kthread.c2
-rw-r--r--kernel/sched/core.c9
-rw-r--r--kernel/workqueue.c2828
-rw-r--r--kernel/workqueue_internal.h9
14 files changed, 2214 insertions, 925 deletions
diff --git a/Documentation/kernel-parameters.txt b/Documentation/kernel-parameters.txt
index 365f7bd40ee..12bbce346d5 100644
--- a/Documentation/kernel-parameters.txt
+++ b/Documentation/kernel-parameters.txt
@@ -3260,6 +3260,15 @@ bytes respectively. Such letter suffixes can also be entirely omitted.
or other driver-specific files in the
Documentation/watchdog/ directory.
+ workqueue.disable_numa
+ By default, all work items queued to unbound
+ workqueues are affine to the NUMA nodes they're
+ issued on, which results in better behavior in
+ general. If NUMA affinity needs to be disabled for
+ whatever reason, this option can be used. Note
+ that this also can be controlled per-workqueue for
+ workqueues visible under /sys/bus/workqueue/.
+
x2apic_phys [X86-64,APIC] Use x2apic physical mode instead of
default x2apic cluster mode on platforms
supporting x2apic.
diff --git a/drivers/base/base.h b/drivers/base/base.h
index 6ee17bb391a..b8bdfe61daa 100644
--- a/drivers/base/base.h
+++ b/drivers/base/base.h
@@ -101,6 +101,8 @@ static inline int hypervisor_init(void) { return 0; }
extern int platform_bus_init(void);
extern void cpu_dev_init(void);
+struct kobject *virtual_device_parent(struct device *dev);
+
extern int bus_add_device(struct device *dev);
extern void bus_probe_device(struct device *dev);
extern void bus_remove_device(struct device *dev);
diff --git a/drivers/base/bus.c b/drivers/base/bus.c
index 8a00dec574d..1a68f947ded 100644
--- a/drivers/base/bus.c
+++ b/drivers/base/bus.c
@@ -1205,26 +1205,10 @@ static void system_root_device_release(struct device *dev)
{
kfree(dev);
}
-/**
- * subsys_system_register - register a subsystem at /sys/devices/system/
- * @subsys: system subsystem
- * @groups: default attributes for the root device
- *
- * All 'system' subsystems have a /sys/devices/system/<name> root device
- * with the name of the subsystem. The root device can carry subsystem-
- * wide attributes. All registered devices are below this single root
- * device and are named after the subsystem with a simple enumeration
- * number appended. The registered devices are not explicitely named;
- * only 'id' in the device needs to be set.
- *
- * Do not use this interface for anything new, it exists for compatibility
- * with bad ideas only. New subsystems should use plain subsystems; and
- * add the subsystem-wide attributes should be added to the subsystem
- * directory itself and not some create fake root-device placed in
- * /sys/devices/system/<name>.
- */
-int subsys_system_register(struct bus_type *subsys,
- const struct attribute_group **groups)
+
+static int subsys_register(struct bus_type *subsys,
+ const struct attribute_group **groups,
+ struct kobject *parent_of_root)
{
struct device *dev;
int err;
@@ -1243,7 +1227,7 @@ int subsys_system_register(struct bus_type *subsys,
if (err < 0)
goto err_name;
- dev->kobj.parent = &system_kset->kobj;
+ dev->kobj.parent = parent_of_root;
dev->groups = groups;
dev->release = system_root_device_release;
@@ -1263,8 +1247,55 @@ err_dev:
bus_unregister(subsys);
return err;
}
+
+/**
+ * subsys_system_register - register a subsystem at /sys/devices/system/
+ * @subsys: system subsystem
+ * @groups: default attributes for the root device
+ *
+ * All 'system' subsystems have a /sys/devices/system/<name> root device
+ * with the name of the subsystem. The root device can carry subsystem-
+ * wide attributes. All registered devices are below this single root
+ * device and are named after the subsystem with a simple enumeration
+ * number appended. The registered devices are not explicitely named;
+ * only 'id' in the device needs to be set.
+ *
+ * Do not use this interface for anything new, it exists for compatibility
+ * with bad ideas only. New subsystems should use plain subsystems; and
+ * add the subsystem-wide attributes should be added to the subsystem
+ * directory itself and not some create fake root-device placed in
+ * /sys/devices/system/<name>.
+ */
+int subsys_system_register(struct bus_type *subsys,
+ const struct attribute_group **groups)
+{
+ return subsys_register(subsys, groups, &system_kset->kobj);
+}
EXPORT_SYMBOL_GPL(subsys_system_register);
+/**
+ * subsys_virtual_register - register a subsystem at /sys/devices/virtual/
+ * @subsys: virtual subsystem
+ * @groups: default attributes for the root device
+ *
+ * All 'virtual' subsystems have a /sys/devices/system/<name> root device
+ * with the name of the subystem. The root device can carry subsystem-wide
+ * attributes. All registered devices are below this single root device.
+ * There's no restriction on device naming. This is for kernel software
+ * constructs which need sysfs interface.
+ */
+int subsys_virtual_register(struct bus_type *subsys,
+ const struct attribute_group **groups)
+{
+ struct kobject *virtual_dir;
+
+ virtual_dir = virtual_device_parent(NULL);
+ if (!virtual_dir)
+ return -ENOMEM;
+
+ return subsys_register(subsys, groups, virtual_dir);
+}
+
int __init buses_init(void)
{
bus_kset = kset_create_and_add("bus", &bus_uevent_ops, NULL);
diff --git a/drivers/base/core.c b/drivers/base/core.c
index f88d9e259a3..01631243757 100644
--- a/drivers/base/core.c
+++ b/drivers/base/core.c
@@ -703,7 +703,7 @@ void device_initialize(struct device *dev)
set_dev_node(dev, -1);
}
-static struct kobject *virtual_device_parent(struct device *dev)
+struct kobject *virtual_device_parent(struct device *dev)
{
static struct kobject *virtual_dir = NULL;
diff --git a/include/linux/cpumask.h b/include/linux/cpumask.h
index 032560295fc..d08e4d2a9b9 100644
--- a/include/linux/cpumask.h
+++ b/include/linux/cpumask.h
@@ -591,6 +591,21 @@ static inline int cpulist_scnprintf(char *buf, int len,
}
/**
+ * cpumask_parse - extract a cpumask from from a string
+ * @buf: the buffer to extract from
+ * @dstp: the cpumask to set.
+ *
+ * Returns -errno, or 0 for success.
+ */
+static inline int cpumask_parse(const char *buf, struct cpumask *dstp)
+{
+ char *nl = strchr(buf, '\n');
+ int len = nl ? nl - buf : strlen(buf);
+
+ return bitmap_parse(buf, len, cpumask_bits(dstp), nr_cpumask_bits);
+}
+
+/**
* cpulist_parse - extract a cpumask from a user string of ranges
* @buf: the buffer to extract from
* @dstp: the cpumask to set.
diff --git a/include/linux/device.h b/include/linux/device.h
index 88615ccaf23..711793b145f 100644
--- a/include/linux/device.h
+++ b/include/linux/device.h
@@ -297,6 +297,8 @@ void subsys_interface_unregister(struct subsys_interface *sif);
int subsys_system_register(struct bus_type *subsys,
const struct attribute_group **groups);
+int subsys_virtual_register(struct bus_type *subsys,
+ const struct attribute_group **groups);
/**
* struct class - device classes
diff --git a/include/linux/sched.h b/include/linux/sched.h
index 2d02c76a01b..bcbc30397f2 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -1793,7 +1793,7 @@ extern void thread_group_cputime_adjusted(struct task_struct *p, cputime_t *ut,
#define PF_SWAPWRITE 0x00800000 /* Allowed to write to swap */
#define PF_SPREAD_PAGE 0x01000000 /* Spread page cache over cpuset */
#define PF_SPREAD_SLAB 0x02000000 /* Spread some slab caches over cpuset */
-#define PF_THREAD_BOUND 0x04000000 /* Thread bound to specific cpu */
+#define PF_NO_SETAFFINITY 0x04000000 /* Userland is not allowed to meddle with cpus_allowed */
#define PF_MCE_EARLY 0x08000000 /* Early kill for mce process policy */
#define PF_MEMPOLICY 0x10000000 /* Non-default NUMA mempolicy */
#define PF_MUTEX_TESTER 0x20000000 /* Thread belongs to the rt mutex tester */
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 8afab27cdbc..71797563937 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -11,6 +11,7 @@
#include <linux/lockdep.h>
#include <linux/threads.h>
#include <linux/atomic.h>
+#include <linux/cpumask.h>
struct workqueue_struct;
@@ -68,7 +69,7 @@ enum {
WORK_STRUCT_COLOR_BITS,
/* data contains off-queue information when !WORK_STRUCT_PWQ */
- WORK_OFFQ_FLAG_BASE = WORK_STRUCT_FLAG_BITS,
+ WORK_OFFQ_FLAG_BASE = WORK_STRUCT_COLOR_SHIFT,
WORK_OFFQ_CANCELING = (1 << WORK_OFFQ_FLAG_BASE),
@@ -115,6 +116,20 @@ struct delayed_work {
int cpu;
};
+/*
+ * A struct for workqueue attributes. This can be used to change
+ * attributes of an unbound workqueue.
+ *
+ * Unlike other fields, ->no_numa isn't a property of a worker_pool. It
+ * only modifies how apply_workqueue_attrs() select pools and thus doesn't
+ * participate in pool hash calculations or equality comparisons.
+ */
+struct workqueue_attrs {
+ int nice; /* nice level */
+ cpumask_var_t cpumask; /* allowed CPUs */
+ bool no_numa; /* disable NUMA affinity */
+};
+
static inline struct delayed_work *to_delayed_work(struct work_struct *work)
{
return container_of(work, struct delayed_work, work);
@@ -283,9 +298,10 @@ enum {
WQ_MEM_RECLAIM = 1 << 3, /* may be used for memory reclaim */
WQ_HIGHPRI = 1 << 4, /* high priority */
WQ_CPU_INTENSIVE = 1 << 5, /* cpu instensive workqueue */
+ WQ_SYSFS = 1 << 6, /* visible in sysfs, see wq_sysfs_register() */
- WQ_DRAINING = 1 << 6, /* internal: workqueue is draining */
- WQ_RESCUER = 1 << 7, /* internal: workqueue has rescuer */
+ __WQ_DRAINING = 1 << 16, /* internal: workqueue is draining */
+ __WQ_ORDERED = 1 << 17, /* internal: workqueue is ordered */
WQ_MAX_ACTIVE = 512, /* I like 512, better ideas? */
WQ_MAX_UNBOUND_PER_CPU = 4, /* 4 * #cpus for unbound wq */
@@ -388,7 +404,7 @@ __alloc_workqueue_key(const char *fmt, unsigned int flags, int max_active,
* Pointer to the allocated workqueue on success, %NULL on failure.
*/
#define alloc_ordered_workqueue(fmt, flags, args...) \
- alloc_workqueue(fmt, WQ_UNBOUND | (flags), 1, ##args)
+ alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args)
#define create_workqueue(name) \
alloc_workqueue((name), WQ_MEM_RECLAIM, 1)
@@ -399,30 +415,23 @@ __alloc_workqueue_key(const char *fmt, unsigned int flags, int max_active,
extern void destroy_workqueue(struct workqueue_struct *wq);
+struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask);
+void free_workqueue_attrs(struct workqueue_attrs *attrs);
+int apply_workqueue_attrs(struct workqueue_struct *wq,
+ const struct workqueue_attrs *attrs);
+
extern bool queue_work_on(int cpu, struct workqueue_struct *wq,
struct work_struct *work);
-extern bool queue_work(struct workqueue_struct *wq, struct work_struct *work);
extern bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct delayed_work *work, unsigned long delay);
-extern bool queue_delayed_work(struct workqueue_struct *wq,
- struct delayed_work *work, unsigned long delay);
extern bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay);
-extern bool mod_delayed_work(struct workqueue_struct *wq,
- struct delayed_work *dwork, unsigned long delay);
extern void flush_workqueue(struct workqueue_struct *wq);
extern void drain_workqueue(struct workqueue_struct *wq);
extern void flush_scheduled_work(void);
-extern bool schedule_work_on(int cpu, struct work_struct *work);
-extern bool schedule_work(struct work_struct *work);
-extern bool schedule_delayed_work_on(int cpu, struct delayed_work *work,
- unsigned long delay);
-extern bool schedule_delayed_work(struct delayed_work *work,
- unsigned long delay);
extern int schedule_on_each_cpu(work_func_t func);
-extern int keventd_up(void);
int execute_in_process_context(work_func_t fn, struct execute_work *);
@@ -435,9 +444,121 @@ extern bool cancel_delayed_work_sync(struct delayed_work *dwork);
extern void workqueue_set_max_active(struct workqueue_struct *wq,
int max_active);
-extern bool workqueue_congested(unsigned int cpu, struct workqueue_struct *wq);
+extern bool current_is_workqueue_rescuer(void);
+extern bool workqueue_congested(int cpu, struct workqueue_struct *wq);
extern unsigned int work_busy(struct work_struct *work);
+/**
+ * queue_work - queue work on a workqueue
+ * @wq: workqueue to use
+ * @work: work to queue
+ *
+ * Returns %false if @work was already on a queue, %true otherwise.
+ *
+ * We queue the work to the CPU on which it was submitted, but if the CPU dies
+ * it can be processed by another CPU.
+ */
+static inline bool queue_work(struct workqueue_struct *wq,
+ struct work_struct *work)
+{
+ return queue_work_on(WORK_CPU_UNBOUND, wq, work);
+}
+
+/**
+ * queue_delayed_work - queue work on a workqueue after delay
+ * @wq: workqueue to use
+ * @dwork: delayable work to queue
+ * @delay: number of jiffies to wait before queueing
+ *
+ * Equivalent to queue_delayed_work_on() but tries to use the local CPU.
+ */
+static inline bool queue_delayed_work(struct workqueue_struct *wq,
+ struct delayed_work *dwork,
+ unsigned long delay)
+{
+ return queue_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
+}
+
+/**
+ * mod_delayed_work - modify delay of or queue a delayed work
+ * @wq: workqueue to use
+ * @dwork: work to queue
+ * @delay: number of jiffies to wait before queueing
+ *
+ * mod_delayed_work_on() on local CPU.
+ */
+static inline bool mod_delayed_work(struct workqueue_struct *wq,
+ struct delayed_work *dwork,
+ unsigned long delay)
+{
+ return mod_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
+}
+
+/**
+ * schedule_work_on - put work task on a specific cpu
+ * @cpu: cpu to put the work task on
+ * @work: job to be done
+ *
+ * This puts a job on a specific cpu
+ */
+static inline bool schedule_work_on(int cpu, struct work_struct *work)
+{
+ return queue_work_on(cpu, system_wq, work);
+}
+
+/**
+ * schedule_work - put work task in global workqueue
+ * @work: job to be done
+ *
+ * Returns %false if @work was already on the kernel-global workqueue and
+ * %true otherwise.
+ *
+ * This puts a job in the kernel-global workqueue if it was not already
+ * queued and leaves it in the same position on the kernel-global
+ * workqueue otherwise.
+ */
+static inline bool schedule_work(struct work_struct *work)
+{
+ return queue_work(system_wq, work);
+}
+
+/**
+ * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
+ * @cpu: cpu to use
+ * @dwork: job to be done
+ * @delay: number of jiffies to wait
+ *
+ * After waiting for a given time this puts a job in the kernel-global
+ * workqueue on the specified CPU.
+ */
+static inline bool schedule_delayed_work_on(int cpu, struct delayed_work *dwork,
+ unsigned long delay)
+{
+ return queue_delayed_work_on(cpu, system_wq, dwork, delay);
+}
+
+/**
+ * schedule_delayed_work - put work task in global workqueue after delay
+ * @dwork: job to be done
+ * @delay: number of jiffies to wait or 0 for immediate execution
+ *
+ * After waiting for a given time this puts a job in the kernel-global
+ * workqueue.
+ */
+static inline bool schedule_delayed_work(struct delayed_work *dwork,
+ unsigned long delay)
+{
+ return queue_delayed_work(system_wq, dwork, delay);
+}
+
+/**
+ * keventd_up - is workqueue initialized yet?
+ */
+static inline bool keventd_up(void)
+{
+ return system_wq != NULL;
+}
+
/*
* Like above, but uses del_timer() instead of del_timer_sync(). This means,
* if it returns 0 the timer function may be running and the queueing is in
@@ -466,12 +587,12 @@ static inline bool __deprecated flush_delayed_work_sync(struct delayed_work *dwo
}
#ifndef CONFIG_SMP
-static inline long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
+static inline long work_on_cpu(int cpu, long (*fn)(void *), void *arg)
{
return fn(arg);
}
#else
-long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg);
+long work_on_cpu(int cpu, long (*fn)(void *), void *arg);
#endif /* CONFIG_SMP */
#ifdef CONFIG_FREEZER
@@ -480,4 +601,11 @@ extern bool freeze_workqueues_busy(void);
extern void thaw_workqueues(void);
#endif /* CONFIG_FREEZER */
+#ifdef CONFIG_SYSFS
+int workqueue_sysfs_register(struct workqueue_struct *wq);
+#else /* CONFIG_SYSFS */
+static inline int workqueue_sysfs_register(struct workqueue_struct *wq)
+{ return 0; }
+#endif /* CONFIG_SYSFS */
+
#endif
diff --git a/kernel/cgroup.c b/kernel/cgroup.c
index dfaf50d4705..1f628bc039f 100644
--- a/kernel/cgroup.c
+++ b/kernel/cgroup.c
@@ -2224,11 +2224,11 @@ retry_find_task:
tsk = tsk->group_leader;
/*
- * Workqueue threads may acquire PF_THREAD_BOUND and become
+ * Workqueue threads may acquire PF_NO_SETAFFINITY and become
* trapped in a cpuset, or RT worker may be born in a cgroup
* with no rt_runtime allocated. Just say no.
*/
- if (tsk == kthreadd_task || (tsk->flags & PF_THREAD_BOUND)) {
+ if (tsk == kthreadd_task || (tsk->flags & PF_NO_SETAFFINITY)) {
ret = -EINVAL;
rcu_read_unlock();
goto out_unlock_cgroup;
diff --git a/kernel/cpuset.c b/kernel/cpuset.c
index 334d983a36b..027a6f65f2a 100644
--- a/kernel/cpuset.c
+++ b/kernel/cpuset.c
@@ -1388,16 +1388,16 @@ static int cpuset_can_attach(struct cgroup *cgrp, struct cgroup_taskset *tset)
cgroup_taskset_for_each(task, cgrp, tset) {
/*
- * Kthreads bound to specific cpus cannot be moved to a new
- * cpuset; we cannot change their cpu affinity and
- * isolating such threads by their set of allowed nodes is
- * unnecessary. Thus, cpusets are not applicable for such
- * threads. This prevents checking for success of
- * set_cpus_allowed_ptr() on all attached tasks before
- * cpus_allowed may be changed.
+ * Kthreads which disallow setaffinity shouldn't be moved
+ * to a new cpuset; we don't want to change their cpu
+ * affinity and isolating such threads by their set of
+ * allowed nodes is unnecessary. Thus, cpusets are not
+ * applicable for such threads. This prevents checking for
+ * success of set_cpus_allowed_ptr() on all attached tasks
+ * before cpus_allowed may be changed.
*/
ret = -EINVAL;
- if (task->flags & PF_THREAD_BOUND)
+ if (task->flags & PF_NO_SETAFFINITY)
goto out_unlock;
ret = security_task_setscheduler(task);
if (ret)
diff --git a/kernel/kthread.c b/kernel/kthread.c
index 9b12d65186f..16d8ddd268b 100644
--- a/kernel/kthread.c
+++ b/kernel/kthread.c
@@ -278,7 +278,7 @@ static void __kthread_bind(struct task_struct *p, unsigned int cpu, long state)
}
/* It's safe because the task is inactive. */
do_set_cpus_allowed(p, cpumask_of(cpu));
- p->flags |= PF_THREAD_BOUND;
+ p->flags |= PF_NO_SETAFFINITY;
}
/**
diff --git a/kernel/sched/core.c b/kernel/sched/core.c
index 42053547e0f..d8285eb0cde 100644
--- a/kernel/sched/core.c
+++ b/kernel/sched/core.c
@@ -4083,6 +4083,10 @@ long sched_setaffinity(pid_t pid, const struct cpumask *in_mask)
get_task_struct(p);
rcu_read_unlock();
+ if (p->flags & PF_NO_SETAFFINITY) {
+ retval = -EINVAL;
+ goto out_put_task;
+ }
if (!alloc_cpumask_var(&cpus_allowed, GFP_KERNEL)) {
retval = -ENOMEM;
goto out_put_task;
@@ -4730,11 +4734,6 @@ int set_cpus_allowed_ptr(struct task_struct *p, const struct cpumask *new_mask)
goto out;
}
- if (unlikely((p->flags & PF_THREAD_BOUND) && p != current)) {
- ret = -EINVAL;
- goto out;
- }
-
do_set_cpus_allowed(p, new_mask);
/* Can the task run on the task's current CPU? If so, we're done */
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index b48cd597145..154aa12af48 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -41,7 +41,11 @@
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#include <linux/idr.h>
+#include <linux/jhash.h>
#include <linux/hashtable.h>
+#include <linux/rculist.h>
+#include <linux/nodemask.h>
+#include <linux/moduleparam.h>
#include "workqueue_internal.h"
@@ -58,12 +62,11 @@ enum {
* %WORKER_UNBOUND set and concurrency management disabled, and may
* be executing on any CPU. The pool behaves as an unbound one.
*
- * Note that DISASSOCIATED can be flipped only while holding
- * assoc_mutex to avoid changing binding state while
+ * Note that DISASSOCIATED should be flipped only while holding
+ * manager_mutex to avoid changing binding state while
* create_worker() is in progress.
*/
POOL_MANAGE_WORKERS = 1 << 0, /* need to manage workers */
- POOL_MANAGING_WORKERS = 1 << 1, /* managing workers */
POOL_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */
POOL_FREEZING = 1 << 3, /* freeze in progress */
@@ -74,12 +77,14 @@ enum {
WORKER_PREP = 1 << 3, /* preparing to run works */
WORKER_CPU_INTENSIVE = 1 << 6, /* cpu intensive */
WORKER_UNBOUND = 1 << 7, /* worker is unbound */
+ WORKER_REBOUND = 1 << 8, /* worker was rebound */
- WORKER_NOT_RUNNING = WORKER_PREP | WORKER_UNBOUND |
- WORKER_CPU_INTENSIVE,
+ WORKER_NOT_RUNNING = WORKER_PREP | WORKER_CPU_INTENSIVE |
+ WORKER_UNBOUND | WORKER_REBOUND,
NR_STD_WORKER_POOLS = 2, /* # standard pools per cpu */
+ UNBOUND_POOL_HASH_ORDER = 6, /* hashed by pool->attrs */
BUSY_WORKER_HASH_ORDER = 6, /* 64 pointers */
MAX_IDLE_WORKERS_RATIO = 4, /* 1/4 of busy can be idle */
@@ -97,6 +102,8 @@ enum {
*/
RESCUER_NICE_LEVEL = -20,
HIGHPRI_NICE_LEVEL = -20,
+
+ WQ_NAME_LEN = 24,
};
/*
@@ -115,16 +122,26 @@ enum {
* cpu or grabbing pool->lock is enough for read access. If
* POOL_DISASSOCIATED is set, it's identical to L.
*
- * F: wq->flush_mutex protected.
+ * MG: pool->manager_mutex and pool->lock protected. Writes require both
+ * locks. Reads can happen under either lock.
+ *
+ * PL: wq_pool_mutex protected.
+ *
+ * PR: wq_pool_mutex protected for writes. Sched-RCU protected for reads.
+ *
+ * WQ: wq->mutex protected.
*
- * W: workqueue_lock protected.
+ * WR: wq->mutex protected for writes. Sched-RCU protected for reads.
+ *
+ * MD: wq_mayday_lock protected.
*/
/* struct worker is defined in workqueue_internal.h */
struct worker_pool {
spinlock_t lock; /* the pool lock */
- unsigned int cpu; /* I: the associated cpu */
+ int cpu; /* I: the associated cpu */
+ int node; /* I: the associated node ID */
int id; /* I: pool ID */
unsigned int flags; /* X: flags */
@@ -138,12 +155,18 @@ struct worker_pool {
struct timer_list idle_timer; /* L: worker idle timeout */
struct timer_list mayday_timer; /* L: SOS timer for workers */
- /* workers are chained either in busy_hash or idle_list */
+ /* a workers is either on busy_hash or idle_list, or the manager */
DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
/* L: hash of busy workers */
- struct mutex assoc_mutex; /* protect POOL_DISASSOCIATED */
- struct ida worker_ida; /* L: for worker IDs */
+ /* see manage_workers() for details on the two manager mutexes */
+ struct mutex manager_arb; /* manager arbitration */
+ struct mutex manager_mutex; /* manager exclusion */
+ struct idr worker_idr; /* MG: worker IDs and iteration */
+
+ struct workqueue_attrs *attrs; /* I: worker attributes */
+ struct hlist_node hash_node; /* PL: unbound_pool_hash node */
+ int refcnt; /* PL: refcnt for unbound pools */
/*
* The current concurrency level. As it's likely to be accessed
@@ -151,6 +174,12 @@ struct worker_pool {
* cacheline.
*/
atomic_t nr_running ____cacheline_aligned_in_smp;
+
+ /*
+ * Destruction of pool is sched-RCU protected to allow dereferences
+ * from get_work_pool().
+ */
+ struct rcu_head rcu;
} ____cacheline_aligned_in_smp;
/*
@@ -164,75 +193,107 @@ struct pool_workqueue {
struct workqueue_struct *wq; /* I: the owning workqueue */
int work_color; /* L: current color */
int flush_color; /* L: flushing color */
+ int refcnt; /* L: reference count */
int nr_in_flight[WORK_NR_COLORS];
/* L: nr of in_flight works */
int nr_active; /* L: nr of active works */
int max_active; /* L: max active works */
struct list_head delayed_works; /* L: delayed works */
-};
+ struct list_head pwqs_node; /* WR: node on wq->pwqs */
+ struct list_head mayday_node; /* MD: node on wq->maydays */
+
+ /*
+ * Release of unbound pwq is punted to system_wq. See put_pwq()
+ * and pwq_unbound_release_workfn() for details. pool_workqueue
+ * itself is also sched-RCU protected so that the first pwq can be
+ * determined without grabbing wq->mutex.
+ */
+ struct work_struct unbound_release_work;
+ struct rcu_head rcu;
+} __aligned(1 << WORK_STRUCT_FLAG_BITS);
/*
* Structure used to wait for workqueue flush.
*/
struct wq_flusher {
- struct list_head list; /* F: list of flushers */
- int flush_color; /* F: flush color waiting for */
+ struct list_head list; /* WQ: list of flushers */
+ int flush_color; /* WQ: flush color waiting for */
struct completion done; /* flush completion */
};
-/*
- * All cpumasks are assumed to be always set on UP and thus can't be
- * used to determine whether there's something to be done.
- */
-#ifdef CONFIG_SMP
-typedef cpumask_var_t mayday_mask_t;
-#define mayday_test_and_set_cpu(cpu, mask) \
- cpumask_test_and_set_cpu((cpu), (mask))
-#define mayday_clear_cpu(cpu, mask) cpumask_clear_cpu((cpu), (mask))
-#define for_each_mayday_cpu(cpu, mask) for_each_cpu((cpu), (mask))
-#define alloc_mayday_mask(maskp, gfp) zalloc_cpumask_var((maskp), (gfp))
-#define free_mayday_mask(mask) free_cpumask_var((mask))
-#else
-typedef unsigned long mayday_mask_t;
-#define mayday_test_and_set_cpu(cpu, mask) test_and_set_bit(0, &(mask))
-#define mayday_clear_cpu(cpu, mask) clear_bit(0, &(mask))
-#define for_each_mayday_cpu(cpu, mask) if ((cpu) = 0, (mask))
-#define alloc_mayday_mask(maskp, gfp) true
-#define free_mayday_mask(mask) do { } while (0)
-#endif
+struct wq_device;
/*
- * The externally visible workqueue abstraction is an array of
- * per-CPU workqueues:
+ * The externally visible workqueue. It relays the issued work items to
+ * the appropriate worker_pool through its pool_workqueues.
*/
struct workqueue_struct {
- unsigned int flags; /* W: WQ_* flags */
- union {
- struct pool_workqueue __percpu *pcpu;
- struct pool_workqueue *single;
- unsigned long v;
- } pool_wq; /* I: pwq's */
- struct list_head list; /* W: list of all workqueues */
-
- struct mutex flush_mutex; /* protects wq flushing */
- int work_color; /* F: current work color */
- int flush_color; /* F: current flush color */
+ struct list_head pwqs; /* WR: all pwqs of this wq */
+ struct list_head list; /* PL: list of all workqueues */
+
+ struct mutex mutex; /* protects this wq */
+ int work_color; /* WQ: current work color */
+ int flush_color; /* WQ: current flush color */
atomic_t nr_pwqs_to_flush; /* flush in progress */
- struct wq_flusher *first_flusher; /* F: first flusher */
- struct list_head flusher_queue; /* F: flush waiters */
- struct list_head flusher_overflow; /* F: flush overflow list */
+ struct wq_flusher *first_flusher; /* WQ: first flusher */
+ struct list_head flusher_queue; /* WQ: flush waiters */
+ struct list_head flusher_overflow; /* WQ: flush overflow list */
- mayday_mask_t mayday_mask; /* cpus requesting rescue */
+ struct list_head maydays; /* MD: pwqs requesting rescue */
struct worker *rescuer; /* I: rescue worker */
- int nr_drainers; /* W: drain in progress */
- int saved_max_active; /* W: saved pwq max_active */
+ int nr_drainers; /* WQ: drain in progress */
+ int saved_max_active; /* WQ: saved pwq max_active */
+
+ struct workqueue_attrs *unbound_attrs; /* WQ: only for unbound wqs */
+ struct pool_workqueue *dfl_pwq; /* WQ: only for unbound wqs */
+
+#ifdef CONFIG_SYSFS
+ struct wq_device *wq_dev; /* I: for sysfs interface */
+#endif
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
- char name[]; /* I: workqueue name */
+ char name[WQ_NAME_LEN]; /* I: workqueue name */
+
+ /* hot fields used during command issue, aligned to cacheline */
+ unsigned int flags ____cacheline_aligned; /* WQ: WQ_* flags */
+ struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
+ struct pool_workqueue __rcu *numa_pwq_tbl[]; /* FR: unbound pwqs indexed by node */
};
+static struct kmem_cache *pwq_cache;
+
+static int wq_numa_tbl_len; /* highest possible NUMA node id + 1 */
+static cpumask_var_t *wq_numa_possible_cpumask;
+ /* possible CPUs of each node */
+
+static bool wq_disable_numa;
+module_param_named(disable_numa, wq_disable_numa, bool, 0444);
+
+static bool wq_numa_enabled; /* unbound NUMA affinity enabled */
+
+/* buf for wq_update_unbound_numa_attrs(), protected by CPU hotplug exclusion */
+static struct workqueue_attrs *wq_update_unbound_numa_attrs_buf;
+
+static DEFINE_MUTEX(wq_pool_mutex); /* protects pools and workqueues list */
+static DEFINE_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */
+
+static LIST_HEAD(workqueues); /* PL: list of all workqueues */
+static bool workqueue_freezing; /* PL: have wqs started freezing? */
+
+/* the per-cpu worker pools */
+static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
+ cpu_worker_pools);
+
+static DEFINE_IDR(worker_pool_idr); /* PR: idr of all pools */
+
+/* PL: hash of all unbound pools keyed by pool->attrs */
+static DEFINE_HASHTABLE(unbound_pool_hash, UNBOUND_POOL_HASH_ORDER);
+
+/* I: attributes used when instantiating standard unbound pools on demand */
+static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
+
struct workqueue_struct *system_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_wq);
struct workqueue_struct *system_highpri_wq __read_mostly;
@@ -244,64 +305,87 @@ EXPORT_SYMBOL_GPL(system_unbound_wq);
struct workqueue_struct *system_freezable_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_freezable_wq);
+static int worker_thread(void *__worker);
+static void copy_workqueue_attrs(struct workqueue_attrs *to,
+ const struct workqueue_attrs *from);
+
#define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h>
-#define for_each_std_worker_pool(pool, cpu) \
- for ((pool) = &std_worker_pools(cpu)[0]; \
- (pool) < &std_worker_pools(cpu)[NR_STD_WORKER_POOLS]; (pool)++)
+#define assert_rcu_or_pool_mutex() \
+ rcu_lockdep_assert(rcu_read_lock_sched_held() || \
+ lockdep_is_held(&wq_pool_mutex), \
+ "sched RCU or wq_pool_mutex should be held")
-#define for_each_busy_worker(worker, i, pool) \
- hash_for_each(pool->busy_hash, i, worker, hentry)
+#define assert_rcu_or_wq_mutex(wq) \
+ rcu_lockdep_assert(rcu_read_lock_sched_held() || \
+ lockdep_is_held(&wq->mutex), \
+ "sched RCU or wq->mutex should be held")
-static inline int __next_wq_cpu(int cpu, const struct cpumask *mask,
- unsigned int sw)
-{
- if (cpu < nr_cpu_ids) {
- if (sw & 1) {
- cpu = cpumask_next(cpu, mask);
- if (cpu < nr_cpu_ids)
- return cpu;
- }
- if (sw & 2)
- return WORK_CPU_UNBOUND;
- }
- return WORK_CPU_END;
-}
+#ifdef CONFIG_LOCKDEP
+#define assert_manager_or_pool_lock(pool) \
+ WARN_ONCE(debug_locks && \
+ !lockdep_is_held(&(pool)->manager_mutex) && \
+ !lockdep_is_held(&(pool)->lock), \
+ "pool->manager_mutex or ->lock should be held")
+#else
+#define assert_manager_or_pool_lock(pool) do { } while (0)
+#endif
-static inline int __next_pwq_cpu(int cpu, const struct cpumask *mask,
- struct workqueue_struct *wq)
-{
- return __next_wq_cpu(cpu, mask, !(wq->flags & WQ_UNBOUND) ? 1 : 2);
-}
+#define for_each_cpu_worker_pool(pool, cpu) \
+ for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0]; \
+ (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
+ (pool)++)
-/*
- * CPU iterators
+/**
+ * for_each_pool - iterate through all worker_pools in the system
+ * @pool: iteration cursor
+ * @pi: integer used for iteration
*
- * An extra cpu number is defined using an invalid cpu number
- * (WORK_CPU_UNBOUND) to host workqueues which are not bound to any
- * specific CPU. The following iterators are similar to for_each_*_cpu()
- * iterators but also considers the unbound CPU.
+ * This must be called either with wq_pool_mutex held or sched RCU read
+ * locked. If the pool needs to be used beyond the locking in effect, the
+ * caller is responsible for guaranteeing that the pool stays online.
*
- * for_each_wq_cpu() : possible CPUs + WORK_CPU_UNBOUND
- * for_each_online_wq_cpu() : online CPUs + WORK_CPU_UNBOUND
- * for_each_pwq_cpu() : possible CPUs for bound workqueues,
- * WORK_CPU_UNBOUND for unbound workqueues
+ * The if/else clause exists only for the lockdep assertion and can be
+ * ignored.
*/
-#define for_each_wq_cpu(cpu) \
- for ((cpu) = __next_wq_cpu(-1, cpu_possible_mask, 3); \
- (cpu) < WORK_CPU_END; \
- (cpu) = __next_wq_cpu((cpu), cpu_possible_mask, 3))
+#define for_each_pool(pool, pi) \
+ idr_for_each_entry(&worker_pool_idr, pool, pi) \
+ if (({ assert_rcu_or_pool_mutex(); false; })) { } \
+ else
-#define for_each_online_wq_cpu(cpu) \
- for ((cpu) = __next_wq_cpu(-1, cpu_online_mask, 3); \
- (cpu) < WORK_CPU_END; \
- (cpu) = __next_wq_cpu((cpu), cpu_online_mask, 3))
+/**
+ * for_each_pool_worker - iterate through all workers of a worker_pool
+ * @worker: iteration cursor
+ * @wi: integer used for iteration
+ * @pool: worker_pool to iterate workers of
+ *
+ * This must be called with either @pool->manager_mutex or ->lock held.
+ *
+ * The if/else clause exists only for the lockdep assertion and can be
+ * ignored.
+ */
+#define for_each_pool_worker(worker, wi, pool) \
+ idr_for_each_entry(&(pool)->worker_idr, (worker), (wi)) \
+ if (({ assert_manager_or_pool_lock((pool)); false; })) { } \
+ else
-#define for_each_pwq_cpu(cpu, wq) \
- for ((cpu) = __next_pwq_cpu(-1, cpu_possible_mask, (wq)); \
- (cpu) < WORK_CPU_END; \
- (cpu) = __next_pwq_cpu((cpu), cpu_possible_mask, (wq)))
+/**
+ * for_each_pwq - iterate through all pool_workqueues of the specified workqueue
+ * @pwq: iteration cursor
+ * @wq: the target workqueue
+ *
+ * This must be called either with wq->mutex held or sched RCU read locked.
+ * If the pwq needs to be used beyond the locking in effect, the caller is
+ * responsible for guaranteeing that the pwq stays online.
+ *
+ * The if/else clause exists only for the lockdep assertion and can be
+ * ignored.
+ */
+#define for_each_pwq(pwq, wq) \
+ list_for_each_entry_rcu((pwq), &(wq)->pwqs, pwqs_node) \
+ if (({ assert_rcu_or_wq_mutex(wq); false; })) { } \
+ else
#ifdef CONFIG_DEBUG_OBJECTS_WORK
@@ -419,77 +503,35 @@ static inline void debug_work_activate(struct work_struct *work) { }
static inline void debug_work_deactivate(struct work_struct *work) { }
#endif
-/* Serializes the accesses to the list of workqueues. */
-static DEFINE_SPINLOCK(workqueue_lock);
-static LIST_HEAD(workqueues);
-static bool workqueue_freezing; /* W: have wqs started freezing? */
-
-/*
- * The CPU and unbound standard worker pools. The unbound ones have
- * POOL_DISASSOCIATED set, and their workers have WORKER_UNBOUND set.
- */
-static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
- cpu_std_worker_pools);
-static struct worker_pool unbound_std_worker_pools[NR_STD_WORKER_POOLS];
-
-/* idr of all pools */
-static DEFINE_MUTEX(worker_pool_idr_mutex);
-static DEFINE_IDR(worker_pool_idr);
-
-static int worker_thread(void *__worker);
-
-static struct worker_pool *std_worker_pools(int cpu)
-{
- if (cpu != WORK_CPU_UNBOUND)
- return per_cpu(cpu_std_worker_pools, cpu);
- else
- return unbound_std_worker_pools;
-}
-
-static int std_worker_pool_pri(struct worker_pool *pool)
-{
- return pool - std_worker_pools(pool->cpu);
-}
-
/* allocate ID and assign it to @pool */
static int worker_pool_assign_id(struct worker_pool *pool)
{
int ret;
- mutex_lock(&worker_pool_idr_mutex);
+ lockdep_assert_held(&wq_pool_mutex);
+
ret = idr_alloc(&worker_pool_idr, pool, 0, 0, GFP_KERNEL);
- if (ret >= 0)
+ if (ret >= 0) {
pool->id = ret;
- mutex_unlock(&worker_pool_idr_mutex);
-
- return ret < 0 ? ret : 0;
+ return 0;
+ }
+ return ret;
}
-/*
- * Lookup worker_pool by id. The idr currently is built during boot and
- * never modified. Don't worry about locking for now.
+/**
+ * unbound_pwq_by_node - return the unbound pool_workqueue for the given node
+ * @wq: the target workqueue
+ * @node: the node ID
+ *
+ * This must be called either with pwq_lock held or sched RCU read locked.
+ * If the pwq needs to be used beyond the locking in effect, the caller is
+ * responsible for guaranteeing that the pwq stays online.
*/
-static struct worker_pool *worker_pool_by_id(int pool_id)
+static struct pool_workqueue *unbound_pwq_by_node(struct workqueue_struct *wq,
+ int node)
{
- return idr_find(&worker_pool_idr, pool_id);
-}
-
-static struct worker_pool *get_std_worker_pool(int cpu, bool highpri)
-{
- struct worker_pool *pools = std_worker_pools(cpu);
-
- return &pools[highpri];
-}
-
-static struct pool_workqueue *get_pwq(unsigned int cpu,
- struct workqueue_struct *wq)
-{
- if (!(wq->flags & WQ_UNBOUND)) {
- if (likely(cpu < nr_cpu_ids))
- return per_cpu_ptr(wq->pool_wq.pcpu, cpu);
- } else if (likely(cpu == WORK_CPU_UNBOUND))
- return wq->pool_wq.single;
- return NULL;
+ assert_rcu_or_wq_mutex(wq);
+ return rcu_dereference_raw(wq->numa_pwq_tbl[node]);
}
static unsigned int work_color_to_flags(int color)
@@ -531,7 +573,7 @@ static int work_next_color(int color)
static inline void set_work_data(struct work_struct *work, unsigned long data,
unsigned long flags)
{
- BUG_ON(!work_pending(work));
+ WARN_ON_ONCE(!work_pending(work));
atomic_long_set(&work->data, data | flags | work_static(work));
}
@@ -583,13 +625,23 @@ static struct pool_workqueue *get_work_pwq(struct work_struct *work)
* @work: the work item of interest
*
* Return the worker_pool @work was last associated with. %NULL if none.
+ *
+ * Pools are created and destroyed under wq_pool_mutex, and allows read
+ * access under sched-RCU read lock. As such, this function should be
+ * called under wq_pool_mutex or with preemption disabled.
+ *
+ * All fields of the returned pool are accessible as long as the above
+ * mentioned locking is in effect. If the returned pool needs to be used
+ * beyond the critical section, the caller is responsible for ensuring the
+ * returned pool is and stays online.
*/
static struct worker_pool *get_work_pool(struct work_struct *work)
{
unsigned long data = atomic_long_read(&work->data);
- struct worker_pool *pool;
int pool_id;
+ assert_rcu_or_pool_mutex();
+
if (data & WORK_STRUCT_PWQ)
return ((struct pool_workqueue *)
(data & WORK_STRUCT_WQ_DATA_MASK))->pool;
@@ -598,9 +650,7 @@ static struct worker_pool *get_work_pool(struct work_struct *work)
if (pool_id == WORK_OFFQ_POOL_NONE)
return NULL;
- pool = worker_pool_by_id(pool_id);
- WARN_ON_ONCE(!pool);
- return pool;
+ return idr_find(&worker_pool_idr, pool_id);
}
/**
@@ -689,7 +739,7 @@ static bool need_to_manage_workers(struct worker_pool *pool)
/* Do we have too many workers and should some go away? */
static bool too_many_workers(struct worker_pool *pool)
{
- bool managing = pool->flags & POOL_MANAGING_WORKERS;
+ bool managing = mutex_is_locked(&pool->manager_arb);
int nr_idle = pool->nr_idle + managing; /* manager is considered idle */
int nr_busy = pool->nr_workers - nr_idle;
@@ -744,7 +794,7 @@ static void wake_up_worker(struct worker_pool *pool)
* CONTEXT:
* spin_lock_irq(rq->lock)
*/
-void wq_worker_waking_up(struct task_struct *task, unsigned int cpu)
+void wq_worker_waking_up(struct task_struct *task, int cpu)
{
struct worker *worker = kthread_data(task);
@@ -769,8 +819,7 @@ void wq_worker_waking_up(struct task_struct *task, unsigned int cpu)
* RETURNS:
* Worker task on @cpu to wake up, %NULL if none.
*/
-struct task_struct *wq_worker_sleeping(struct task_struct *task,
- unsigned int cpu)
+struct task_struct *wq_worker_sleeping(struct task_struct *task, int cpu)
{
struct worker *worker = kthread_data(task), *to_wakeup = NULL;
struct worker_pool *pool;
@@ -786,7 +835,8 @@ struct task_struct *wq_worker_sleeping(struct task_struct *task,
pool = worker->pool;
/* this can only happen on the local cpu */
- BUG_ON(cpu != raw_smp_processor_id());
+ if (WARN_ON_ONCE(cpu != raw_smp_processor_id()))
+ return NULL;
/*
* The counterpart of the following dec_and_test, implied mb,
@@ -891,13 +941,12 @@ static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
* recycled work item as currently executing and make it wait until the
* current execution finishes, introducing an unwanted dependency.
*
- * This function checks the work item address, work function and workqueue
- * to avoid false positives. Note that this isn't complete as one may
- * construct a work function which can introduce dependency onto itself
- * through a recycled work item. Well, if somebody wants to shoot oneself
- * in the foot that badly, there's only so much we can do, and if such
- * deadlock actually occurs, it should be easy to locate the culprit work
- * function.
+ * This function checks the work item address and work function to avoid
+ * false positives. Note that this isn't complete as one may construct a
+ * work function which can introduce dependency onto itself through a
+ * recycled work item. Well, if somebody wants to shoot oneself in the
+ * foot that badly, there's only so much we can do, and if such deadlock
+ * actually occurs, it should be easy to locate the culprit work function.
*
* CONTEXT:
* spin_lock_irq(pool->lock).
@@ -961,6 +1010,64 @@ static void move_linked_works(struct work_struct *work, struct list_head *head,
*nextp = n;
}
+/**
+ * get_pwq - get an extra reference on the specified pool_workqueue
+ * @pwq: pool_workqueue to get
+ *
+ * Obtain an extra reference on @pwq. The caller should guarantee that
+ * @pwq has positive refcnt and be holding the matching pool->lock.
+ */
+static void get_pwq(struct pool_workqueue *pwq)
+{
+ lockdep_assert_held(&pwq->pool->lock);
+ WARN_ON_ONCE(pwq->refcnt <= 0);
+ pwq->refcnt++;
+}
+
+/**
+ * put_pwq - put a pool_workqueue reference
+ * @pwq: pool_workqueue to put
+ *
+ * Drop a reference of @pwq. If its refcnt reaches zero, schedule its
+ * destruction. The caller should be holding the matching pool->lock.
+ */
+static void put_pwq(struct pool_workqueue *pwq)
+{
+ lockdep_assert_held(&pwq->pool->lock);
+ if (likely(--pwq->refcnt))
+ return;
+ if (WARN_ON_ONCE(!(pwq->wq->flags & WQ_UNBOUND)))
+ return;
+ /*
+ * @pwq can't be released under pool->lock, bounce to
+ * pwq_unbound_release_workfn(). This never recurses on the same
+ * pool->lock as this path is taken only for unbound workqueues and
+ * the release work item is scheduled on a per-cpu workqueue. To
+ * avoid lockdep warning, unbound pool->locks are given lockdep
+ * subclass of 1 in get_unbound_pool().
+ */
+ schedule_work(&pwq->unbound_release_work);
+}
+
+/**
+ * put_pwq_unlocked - put_pwq() with surrounding pool lock/unlock
+ * @pwq: pool_workqueue to put (can be %NULL)
+ *
+ * put_pwq() with locking. This function also allows %NULL @pwq.
+ */
+static void put_pwq_unlocked(struct pool_workqueue *pwq)
+{
+ if (pwq) {
+ /*
+ * As both pwqs and pools are sched-RCU protected, the
+ * following lock operations are safe.
+ */
+ spin_lock_irq(&pwq->pool->lock);
+ put_pwq(pwq);
+ spin_unlock_irq(&pwq->pool->lock);
+ }
+}
+
static void pwq_activate_delayed_work(struct work_struct *work)
{
struct pool_workqueue *pwq = get_work_pwq(work);
@@ -992,9 +1099,9 @@ static void pwq_activate_first_delayed(struct pool_workqueue *pwq)
*/
static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, int color)
{
- /* ignore uncolored works */
+ /* uncolored work items don't participate in flushing or nr_active */
if (color == WORK_NO_COLOR)
- return;
+ goto out_put;
pwq->nr_in_flight[color]--;
@@ -1007,11 +1114,11 @@ static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, int color)
/* is flush in progress and are we at the flushing tip? */
if (likely(pwq->flush_color != color))
- return;
+ goto out_put;
/* are there still in-flight works? */
if (pwq->nr_in_flight[color])
- return;
+ goto out_put;
/* this pwq is done, clear flush_color */
pwq->flush_color = -1;
@@ -1022,6 +1129,8 @@ static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, int color)
*/
if (atomic_dec_and_test(&pwq->wq->nr_pwqs_to_flush))
complete(&pwq->wq->first_flusher->done);
+out_put:
+ put_pwq(pwq);
}
/**
@@ -1144,11 +1253,12 @@ static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
/* we own @work, set data and link */
set_work_pwq(work, pwq, extra_flags);
list_add_tail(&work->entry, head);
+ get_pwq(pwq);
/*
- * Ensure either worker_sched_deactivated() sees the above
- * list_add_tail() or we see zero nr_running to avoid workers
- * lying around lazily while there are works to be processed.
+ * Ensure either wq_worker_sleeping() sees the above
+ * list_add_tail() or we see zero nr_running to avoid workers lying
+ * around lazily while there are works to be processed.
*/
smp_mb();
@@ -1172,10 +1282,11 @@ static bool is_chained_work(struct workqueue_struct *wq)
return worker && worker->current_pwq->wq == wq;
}
-static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
+static void __queue_work(int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
struct pool_workqueue *pwq;
+ struct worker_pool *last_pool;
struct list_head *worklist;
unsigned int work_flags;
unsigned int req_cpu = cpu;
@@ -1191,48 +1302,62 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
debug_work_activate(work);
/* if dying, only works from the same workqueue are allowed */
- if (unlikely(wq->flags & WQ_DRAINING) &&
+ if (unlikely(wq->flags & __WQ_DRAINING) &&
WARN_ON_ONCE(!is_chained_work(wq)))
return;
+retry:
+ if (req_cpu == WORK_CPU_UNBOUND)
+ cpu = raw_smp_processor_id();
- /* determine the pwq to use */
- if (!(wq->flags & WQ_UNBOUND)) {
- struct worker_pool *last_pool;
-
- if (cpu == WORK_CPU_UNBOUND)
- cpu = raw_smp_processor_id();
-
- /*
- * It's multi cpu. If @work was previously on a different
- * cpu, it might still be running there, in which case the
- * work needs to be queued on that cpu to guarantee
- * non-reentrancy.
- */
- pwq = get_pwq(cpu, wq);
- last_pool = get_work_pool(work);
+ /* pwq which will be used unless @work is executing elsewhere */
+ if (!(wq->flags & WQ_UNBOUND))
+ pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
+ else
+ pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
- if (last_pool && last_pool != pwq->pool) {
- struct worker *worker;
+ /*
+ * If @work was previously on a different pool, it might still be
+ * running there, in which case the work needs to be queued on that
+ * pool to guarantee non-reentrancy.
+ */
+ last_pool = get_work_pool(work);
+ if (last_pool && last_pool != pwq->pool) {
+ struct worker *worker;
- spin_lock(&last_pool->lock);
+ spin_lock(&last_pool->lock);
- worker = find_worker_executing_work(last_pool, work);
+ worker = find_worker_executing_work(last_pool, work);
- if (worker && worker->current_pwq->wq == wq) {
- pwq = get_pwq(last_pool->cpu, wq);
- } else {
- /* meh... not running there, queue here */
- spin_unlock(&last_pool->lock);
- spin_lock(&pwq->pool->lock);
- }
+ if (worker && worker->current_pwq->wq == wq) {
+ pwq = worker->current_pwq;
} else {
+ /* meh... not running there, queue here */
+ spin_unlock(&last_pool->lock);
spin_lock(&pwq->pool->lock);
}
} else {
- pwq = get_pwq(WORK_CPU_UNBOUND, wq);
spin_lock(&pwq->pool->lock);
}
+ /*
+ * pwq is determined and locked. For unbound pools, we could have
+ * raced with pwq release and it could already be dead. If its
+ * refcnt is zero, repeat pwq selection. Note that pwqs never die
+ * without another pwq replacing it in the numa_pwq_tbl or while
+ * work items are executing on it, so the retrying is guaranteed to
+ * make forward-progress.
+ */
+ if (unlikely(!pwq->refcnt)) {
+ if (wq->flags & WQ_UNBOUND) {
+ spin_unlock(&pwq->pool->lock);
+ cpu_relax();
+ goto retry;
+ }
+ /* oops */
+ WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
+ wq->name, cpu);
+ }
+
/* pwq determined, queue */
trace_workqueue_queue_work(req_cpu, pwq, work);
@@ -1287,22 +1412,6 @@ bool queue_work_on(int cpu, struct workqueue_struct *wq,
}
EXPORT_SYMBOL_GPL(queue_work_on);
-/**
- * queue_work - queue work on a workqueue
- * @wq: workqueue to use
- * @work: work to queue
- *
- * Returns %false if @work was already on a queue, %true otherwise.
- *
- * We queue the work to the CPU on which it was submitted, but if the CPU dies
- * it can be processed by another CPU.
- */
-bool queue_work(struct workqueue_struct *wq, struct work_struct *work)
-{
- return queue_work_on(WORK_CPU_UNBOUND, wq, work);
-}
-EXPORT_SYMBOL_GPL(queue_work);
-
void delayed_work_timer_fn(unsigned long __data)
{
struct delayed_work *dwork = (struct delayed_work *)__data;
@@ -1378,21 +1487,6 @@ bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
/**
- * queue_delayed_work - queue work on a workqueue after delay
- * @wq: workqueue to use
- * @dwork: delayable work to queue
- * @delay: number of jiffies to wait before queueing
- *
- * Equivalent to queue_delayed_work_on() but tries to use the local CPU.
- */
-bool queue_delayed_work(struct workqueue_struct *wq,
- struct delayed_work *dwork, unsigned long delay)
-{
- return queue_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
-}
-EXPORT_SYMBOL_GPL(queue_delayed_work);
-
-/**
* mod_delayed_work_on - modify delay of or queue a delayed work on specific CPU
* @cpu: CPU number to execute work on
* @wq: workqueue to use
@@ -1431,21 +1525,6 @@ bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
EXPORT_SYMBOL_GPL(mod_delayed_work_on);
/**
- * mod_delayed_work - modify delay of or queue a delayed work
- * @wq: workqueue to use
- * @dwork: work to queue
- * @delay: number of jiffies to wait before queueing
- *
- * mod_delayed_work_on() on local CPU.
- */
-bool mod_delayed_work(struct workqueue_struct *wq, struct delayed_work *dwork,
- unsigned long delay)
-{
- return mod_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
-}
-EXPORT_SYMBOL_GPL(mod_delayed_work);
-
-/**
* worker_enter_idle - enter idle state
* @worker: worker which is entering idle state
*
@@ -1459,9 +1538,10 @@ static void worker_enter_idle(struct worker *worker)
{
struct worker_pool *pool = worker->pool;
- BUG_ON(worker->flags & WORKER_IDLE);
- BUG_ON(!list_empty(&worker->entry) &&
- (worker->hentry.next || worker->hentry.pprev));
+ if (WARN_ON_ONCE(worker->flags & WORKER_IDLE) ||
+ WARN_ON_ONCE(!list_empty(&worker->entry) &&
+ (worker->hentry.next || worker->hentry.pprev)))
+ return;
/* can't use worker_set_flags(), also called from start_worker() */
worker->flags |= WORKER_IDLE;
@@ -1498,22 +1578,25 @@ static void worker_leave_idle(struct worker *worker)
{
struct worker_pool *pool = worker->pool;
- BUG_ON(!(worker->flags & WORKER_IDLE));
+ if (WARN_ON_ONCE(!(worker->flags & WORKER_IDLE)))
+ return;
worker_clr_flags(worker, WORKER_IDLE);
pool->nr_idle--;
list_del_init(&worker->entry);
}
/**
- * worker_maybe_bind_and_lock - bind worker to its cpu if possible and lock pool
- * @worker: self
+ * worker_maybe_bind_and_lock - try to bind %current to worker_pool and lock it
+ * @pool: target worker_pool
+ *
+ * Bind %current to the cpu of @pool if it is associated and lock @pool.
*
* Works which are scheduled while the cpu is online must at least be
* scheduled to a worker which is bound to the cpu so that if they are
* flushed from cpu callbacks while cpu is going down, they are
* guaranteed to execute on the cpu.
*
- * This function is to be used by rogue workers and rescuers to bind
+ * This function is to be used by unbound workers and rescuers to bind
* themselves to the target cpu and may race with cpu going down or
* coming online. kthread_bind() can't be used because it may put the
* worker to already dead cpu and set_cpus_allowed_ptr() can't be used
@@ -1534,12 +1617,9 @@ static void worker_leave_idle(struct worker *worker)
* %true if the associated pool is online (@worker is successfully
* bound), %false if offline.
*/
-static bool worker_maybe_bind_and_lock(struct worker *worker)
+static bool worker_maybe_bind_and_lock(struct worker_pool *pool)
__acquires(&pool->lock)
{
- struct worker_pool *pool = worker->pool;
- struct task_struct *task = worker->task;
-
while (true) {
/*
* The following call may fail, succeed or succeed
@@ -1548,14 +1628,13 @@ __acquires(&pool->lock)
* against POOL_DISASSOCIATED.
*/
if (!(pool->flags & POOL_DISASSOCIATED))
- set_cpus_allowed_ptr(task, get_cpu_mask(pool->cpu));
+ set_cpus_allowed_ptr(current, pool->attrs->cpumask);
spin_lock_irq(&pool->lock);
if (pool->flags & POOL_DISASSOCIATED)
return false;
- if (task_cpu(task) == pool->cpu &&
- cpumask_equal(&current->cpus_allowed,
- get_cpu_mask(pool->cpu)))
+ if (task_cpu(current) == pool->cpu &&
+ cpumask_equal(&current->cpus_allowed, pool->attrs->cpumask))
return true;
spin_unlock_irq(&pool->lock);
@@ -1570,108 +1649,6 @@ __acquires(&pool->lock)
}
}
-/*
- * Rebind an idle @worker to its CPU. worker_thread() will test
- * list_empty(@worker->entry) before leaving idle and call this function.
- */
-static void idle_worker_rebind(struct worker *worker)
-{
- /* CPU may go down again inbetween, clear UNBOUND only on success */
- if (worker_maybe_bind_and_lock(worker))
- worker_clr_flags(worker, WORKER_UNBOUND);
-
- /* rebind complete, become available again */
- list_add(&worker->entry, &worker->pool->idle_list);
- spin_unlock_irq(&worker->pool->lock);
-}
-
-/*
- * Function for @worker->rebind.work used to rebind unbound busy workers to
- * the associated cpu which is coming back online. This is scheduled by
- * cpu up but can race with other cpu hotplug operations and may be
- * executed twice without intervening cpu down.
- */
-static void busy_worker_rebind_fn(struct work_struct *work)
-{
- struct worker *worker = container_of(work, struct worker, rebind_work);
-
- if (worker_maybe_bind_and_lock(worker))
- worker_clr_flags(worker, WORKER_UNBOUND);
-
- spin_unlock_irq(&worker->pool->lock);
-}
-
-/**
- * rebind_workers - rebind all workers of a pool to the associated CPU
- * @pool: pool of interest
- *
- * @pool->cpu is coming online. Rebind all workers to the CPU. Rebinding
- * is different for idle and busy ones.
- *
- * Idle ones will be removed from the idle_list and woken up. They will
- * add themselves back after completing rebind. This ensures that the
- * idle_list doesn't contain any unbound workers when re-bound busy workers
- * try to perform local wake-ups for concurrency management.
- *
- * Busy workers can rebind after they finish their current work items.
- * Queueing the rebind work item at the head of the scheduled list is
- * enough. Note that nr_running will be properly bumped as busy workers
- * rebind.
- *
- * On return, all non-manager workers are scheduled for rebind - see
- * manage_workers() for the manager special case. Any idle worker
- * including the manager will not appear on @idle_list until rebind is
- * complete, making local wake-ups safe.
- */
-static void rebind_workers(struct worker_pool *pool)
-{
- struct worker *worker, *n;
- int i;
-
- lockdep_assert_held(&pool->assoc_mutex);
- lockdep_assert_held(&pool->lock);
-
- /* dequeue and kick idle ones */
- list_for_each_entry_safe(worker, n, &pool->idle_list, entry) {
- /*
- * idle workers should be off @pool->idle_list until rebind
- * is complete to avoid receiving premature local wake-ups.
- */
- list_del_init(&worker->entry);
-
- /*
- * worker_thread() will see the above dequeuing and call
- * idle_worker_rebind().
- */
- wake_up_process(worker->task);
- }
-
- /* rebind busy workers */
- for_each_busy_worker(worker, i, pool) {
- struct work_struct *rebind_work = &worker->rebind_work;
- struct workqueue_struct *wq;
-
- if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
- work_data_bits(rebind_work)))
- continue;
-
- debug_work_activate(rebind_work);
-
- /*
- * wq doesn't really matter but let's keep @worker->pool
- * and @pwq->pool consistent for sanity.
- */
- if (std_worker_pool_pri(worker->pool))
- wq = system_highpri_wq;
- else
- wq = system_wq;
-
- insert_work(get_pwq(pool->cpu, wq), rebind_work,
- worker->scheduled.next,
- work_color_to_flags(WORK_NO_COLOR));
- }
-}
-
static struct worker *alloc_worker(void)
{
struct worker *worker;
@@ -1680,7 +1657,6 @@ static struct worker *alloc_worker(void)
if (worker) {
INIT_LIST_HEAD(&worker->entry);
INIT_LIST_HEAD(&worker->scheduled);
- INIT_WORK(&worker->rebind_work, busy_worker_rebind_fn);
/* on creation a worker is in !idle && prep state */
worker->flags = WORKER_PREP;
}
@@ -1703,18 +1679,25 @@ static struct worker *alloc_worker(void)
*/
static struct worker *create_worker(struct worker_pool *pool)
{
- const char *pri = std_worker_pool_pri(pool) ? "H" : "";
struct worker *worker = NULL;
int id = -1;
+ char id_buf[16];
+
+ lockdep_assert_held(&pool->manager_mutex);
+ /*
+ * ID is needed to determine kthread name. Allocate ID first
+ * without installing the pointer.
+ */
+ idr_preload(GFP_KERNEL);
spin_lock_irq(&pool->lock);
- while (ida_get_new(&pool->worker_ida, &id)) {
- spin_unlock_irq(&pool->lock);
- if (!ida_pre_get(&pool->worker_ida, GFP_KERNEL))
- goto fail;
- spin_lock_irq(&pool->lock);
- }
+
+ id = idr_alloc(&pool->worker_idr, NULL, 0, 0, GFP_NOWAIT);
+
spin_unlock_irq(&pool->lock);
+ idr_preload_end();
+ if (id < 0)
+ goto fail;
worker = alloc_worker();
if (!worker)
@@ -1723,40 +1706,46 @@ static struct worker *create_worker(struct worker_pool *pool)
worker->pool = pool;
worker->id = id;
- if (pool->cpu != WORK_CPU_UNBOUND)
- worker->task = kthread_create_on_node(worker_thread,
- worker, cpu_to_node(pool->cpu),
- "kworker/%u:%d%s", pool->cpu, id, pri);
+ if (pool->cpu >= 0)
+ snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
+ pool->attrs->nice < 0 ? "H" : "");
else
- worker->task = kthread_create(worker_thread, worker,
- "kworker/u:%d%s", id, pri);
+ snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
+
+ worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
+ "kworker/%s", id_buf);
if (IS_ERR(worker->task))
goto fail;
- if (std_worker_pool_pri(pool))
- set_user_nice(worker->task, HIGHPRI_NICE_LEVEL);
+ /*
+ * set_cpus_allowed_ptr() will fail if the cpumask doesn't have any
+ * online CPUs. It'll be re-applied when any of the CPUs come up.
+ */
+ set_user_nice(worker->task, pool->attrs->nice);
+ set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
+
+ /* prevent userland from meddling with cpumask of workqueue workers */
+ worker->task->flags |= PF_NO_SETAFFINITY;
/*
- * Determine CPU binding of the new worker depending on
- * %POOL_DISASSOCIATED. The caller is responsible for ensuring the
- * flag remains stable across this function. See the comments
- * above the flag definition for details.
- *
- * As an unbound worker may later become a regular one if CPU comes
- * online, make sure every worker has %PF_THREAD_BOUND set.
+ * The caller is responsible for ensuring %POOL_DISASSOCIATED
+ * remains stable across this function. See the comments above the
+ * flag definition for details.
*/
- if (!(pool->flags & POOL_DISASSOCIATED)) {
- kthread_bind(worker->task, pool->cpu);
- } else {
- worker->task->flags |= PF_THREAD_BOUND;
+ if (pool->flags & POOL_DISASSOCIATED)
worker->flags |= WORKER_UNBOUND;
- }
+
+ /* successful, commit the pointer to idr */
+ spin_lock_irq(&pool->lock);
+ idr_replace(&pool->worker_idr, worker, worker->id);
+ spin_unlock_irq(&pool->lock);
return worker;
+
fail:
if (id >= 0) {
spin_lock_irq(&pool->lock);
- ida_remove(&pool->worker_ida, id);
+ idr_remove(&pool->worker_idr, id);
spin_unlock_irq(&pool->lock);
}
kfree(worker);
@@ -1781,6 +1770,30 @@ static void start_worker(struct worker *worker)
}
/**
+ * create_and_start_worker - create and start a worker for a pool
+ * @pool: the target pool
+ *
+ * Grab the managership of @pool and create and start a new worker for it.
+ */
+static int create_and_start_worker(struct worker_pool *pool)
+{
+ struct worker *worker;
+
+ mutex_lock(&pool->manager_mutex);
+
+ worker = create_worker(pool);
+ if (worker) {
+ spin_lock_irq(&pool->lock);
+ start_worker(worker);
+ spin_unlock_irq(&pool->lock);
+ }
+
+ mutex_unlock(&pool->manager_mutex);
+
+ return worker ? 0 : -ENOMEM;
+}
+
+/**
* destroy_worker - destroy a workqueue worker
* @worker: worker to be destroyed
*
@@ -1792,11 +1805,14 @@ static void start_worker(struct worker *worker)
static void destroy_worker(struct worker *worker)
{
struct worker_pool *pool = worker->pool;
- int id = worker->id;
+
+ lockdep_assert_held(&pool->manager_mutex);
+ lockdep_assert_held(&pool->lock);
/* sanity check frenzy */
- BUG_ON(worker->current_work);
- BUG_ON(!list_empty(&worker->scheduled));
+ if (WARN_ON(worker->current_work) ||
+ WARN_ON(!list_empty(&worker->scheduled)))
+ return;
if (worker->flags & WORKER_STARTED)
pool->nr_workers--;
@@ -1806,13 +1822,14 @@ static void destroy_worker(struct worker *worker)
list_del_init(&worker->entry);
worker->flags |= WORKER_DIE;
+ idr_remove(&pool->worker_idr, worker->id);
+
spin_unlock_irq(&pool->lock);
kthread_stop(worker->task);
kfree(worker);
spin_lock_irq(&pool->lock);
- ida_remove(&pool->worker_ida, id);
}
static void idle_worker_timeout(unsigned long __pool)
@@ -1841,23 +1858,21 @@ static void idle_worker_timeout(unsigned long __pool)
spin_unlock_irq(&pool->lock);
}
-static bool send_mayday(struct work_struct *work)
+static void send_mayday(struct work_struct *work)
{
struct pool_workqueue *pwq = get_work_pwq(work);
struct workqueue_struct *wq = pwq->wq;
- unsigned int cpu;
- if (!(wq->flags & WQ_RESCUER))
- return false;
+ lockdep_assert_held(&wq_mayday_lock);
+
+ if (!wq->rescuer)
+ return;
/* mayday mayday mayday */
- cpu = pwq->pool->cpu;
- /* WORK_CPU_UNBOUND can't be set in cpumask, use cpu 0 instead */
- if (cpu == WORK_CPU_UNBOUND)
- cpu = 0;
- if (!mayday_test_and_set_cpu(cpu, wq->mayday_mask))
+ if (list_empty(&pwq->mayday_node)) {
+ list_add_tail(&pwq->mayday_node, &wq->maydays);
wake_up_process(wq->rescuer->task);
- return true;
+ }
}
static void pool_mayday_timeout(unsigned long __pool)
@@ -1865,7 +1880,8 @@ static void pool_mayday_timeout(unsigned long __pool)
struct worker_pool *pool = (void *)__pool;
struct work_struct *work;
- spin_lock_irq(&pool->lock);
+ spin_lock_irq(&wq_mayday_lock); /* for wq->maydays */
+ spin_lock(&pool->lock);
if (need_to_create_worker(pool)) {
/*
@@ -1878,7 +1894,8 @@ static void pool_mayday_timeout(unsigned long __pool)
send_mayday(work);
}
- spin_unlock_irq(&pool->lock);
+ spin_unlock(&pool->lock);
+ spin_unlock_irq(&wq_mayday_lock);
mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INTERVAL);
}
@@ -1893,8 +1910,8 @@ static void pool_mayday_timeout(unsigned long __pool)
* sent to all rescuers with works scheduled on @pool to resolve
* possible allocation deadlock.
*
- * On return, need_to_create_worker() is guaranteed to be false and
- * may_start_working() true.
+ * On return, need_to_create_worker() is guaranteed to be %false and
+ * may_start_working() %true.
*
* LOCKING:
* spin_lock_irq(pool->lock) which may be released and regrabbed
@@ -1902,7 +1919,7 @@ static void pool_mayday_timeout(unsigned long __pool)
* manager.
*
* RETURNS:
- * false if no action was taken and pool->lock stayed locked, true
+ * %false if no action was taken and pool->lock stayed locked, %true
* otherwise.
*/
static bool maybe_create_worker(struct worker_pool *pool)
@@ -1925,7 +1942,8 @@ restart:
del_timer_sync(&pool->mayday_timer);
spin_lock_irq(&pool->lock);
start_worker(worker);
- BUG_ON(need_to_create_worker(pool));
+ if (WARN_ON_ONCE(need_to_create_worker(pool)))
+ goto restart;
return true;
}
@@ -1958,7 +1976,7 @@ restart:
* multiple times. Called only from manager.
*
* RETURNS:
- * false if no action was taken and pool->lock stayed locked, true
+ * %false if no action was taken and pool->lock stayed locked, %true
* otherwise.
*/
static bool maybe_destroy_workers(struct worker_pool *pool)
@@ -2009,42 +2027,37 @@ static bool manage_workers(struct worker *worker)
struct worker_pool *pool = worker->pool;
bool ret = false;
- if (pool->flags & POOL_MANAGING_WORKERS)
+ /*
+ * Managership is governed by two mutexes - manager_arb and
+ * manager_mutex. manager_arb handles arbitration of manager role.
+ * Anyone who successfully grabs manager_arb wins the arbitration
+ * and becomes the manager. mutex_trylock() on pool->manager_arb
+ * failure while holding pool->lock reliably indicates that someone
+ * else is managing the pool and the worker which failed trylock
+ * can proceed to executing work items. This means that anyone
+ * grabbing manager_arb is responsible for actually performing
+ * manager duties. If manager_arb is grabbed and released without
+ * actual management, the pool may stall indefinitely.
+ *
+ * manager_mutex is used for exclusion of actual management
+ * operations. The holder of manager_mutex can be sure that none
+ * of management operations, including creation and destruction of
+ * workers, won't take place until the mutex is released. Because
+ * manager_mutex doesn't interfere with manager role arbitration,
+ * it is guaranteed that the pool's management, while may be
+ * delayed, won't be disturbed by someone else grabbing
+ * manager_mutex.
+ */
+ if (!mutex_trylock(&pool->manager_arb))
return ret;
- pool->flags |= POOL_MANAGING_WORKERS;
-
/*
- * To simplify both worker management and CPU hotplug, hold off
- * management while hotplug is in progress. CPU hotplug path can't
- * grab %POOL_MANAGING_WORKERS to achieve this because that can
- * lead to idle worker depletion (all become busy thinking someone
- * else is managing) which in turn can result in deadlock under
- * extreme circumstances. Use @pool->assoc_mutex to synchronize
- * manager against CPU hotplug.
- *
- * assoc_mutex would always be free unless CPU hotplug is in
- * progress. trylock first without dropping @pool->lock.
+ * With manager arbitration won, manager_mutex would be free in
+ * most cases. trylock first without dropping @pool->lock.
*/
- if (unlikely(!mutex_trylock(&pool->assoc_mutex))) {
+ if (unlikely(!mutex_trylock(&pool->manager_mutex))) {
spin_unlock_irq(&pool->lock);
- mutex_lock(&pool->assoc_mutex);
- /*
- * CPU hotplug could have happened while we were waiting
- * for assoc_mutex. Hotplug itself can't handle us
- * because manager isn't either on idle or busy list, and
- * @pool's state and ours could have deviated.
- *
- * As hotplug is now excluded via assoc_mutex, we can
- * simply try to bind. It will succeed or fail depending
- * on @pool's current state. Try it and adjust
- * %WORKER_UNBOUND accordingly.
- */
- if (worker_maybe_bind_and_lock(worker))
- worker->flags &= ~WORKER_UNBOUND;
- else
- worker->flags |= WORKER_UNBOUND;
-
+ mutex_lock(&pool->manager_mutex);
ret = true;
}
@@ -2057,8 +2070,8 @@ static bool manage_workers(struct worker *worker)
ret |= maybe_destroy_workers(pool);
ret |= maybe_create_worker(pool);
- pool->flags &= ~POOL_MANAGING_WORKERS;
- mutex_unlock(&pool->assoc_mutex);
+ mutex_unlock(&pool->manager_mutex);
+ mutex_unlock(&pool->manager_arb);
return ret;
}
@@ -2212,11 +2225,11 @@ static void process_scheduled_works(struct worker *worker)
* worker_thread - the worker thread function
* @__worker: self
*
- * The worker thread function. There are NR_CPU_WORKER_POOLS dynamic pools
- * of these per each cpu. These workers process all works regardless of
- * their specific target workqueue. The only exception is works which
- * belong to workqueues with a rescuer which will be explained in
- * rescuer_thread().
+ * The worker thread function. All workers belong to a worker_pool -
+ * either a per-cpu one or dynamic unbound one. These workers process all
+ * work items regardless of their specific target workqueue. The only
+ * exception is work items which belong to workqueues with a rescuer which
+ * will be explained in rescuer_thread().
*/
static int worker_thread(void *__worker)
{
@@ -2228,19 +2241,12 @@ static int worker_thread(void *__worker)
woke_up:
spin_lock_irq(&pool->lock);
- /* we are off idle list if destruction or rebind is requested */
- if (unlikely(list_empty(&worker->entry))) {
+ /* am I supposed to die? */
+ if (unlikely(worker->flags & WORKER_DIE)) {
spin_unlock_irq(&pool->lock);
-
- /* if DIE is set, destruction is requested */
- if (worker->flags & WORKER_DIE) {
- worker->task->flags &= ~PF_WQ_WORKER;
- return 0;
- }
-
- /* otherwise, rebind */
- idle_worker_rebind(worker);
- goto woke_up;
+ WARN_ON_ONCE(!list_empty(&worker->entry));
+ worker->task->flags &= ~PF_WQ_WORKER;
+ return 0;
}
worker_leave_idle(worker);
@@ -2258,14 +2264,16 @@ recheck:
* preparing to process a work or actually processing it.
* Make sure nobody diddled with it while I was sleeping.
*/
- BUG_ON(!list_empty(&worker->scheduled));
+ WARN_ON_ONCE(!list_empty(&worker->scheduled));
/*
- * When control reaches this point, we're guaranteed to have
- * at least one idle worker or that someone else has already
- * assumed the manager role.
+ * Finish PREP stage. We're guaranteed to have at least one idle
+ * worker or that someone else has already assumed the manager
+ * role. This is where @worker starts participating in concurrency
+ * management if applicable and concurrency management is restored
+ * after being rebound. See rebind_workers() for details.
*/
- worker_clr_flags(worker, WORKER_PREP);
+ worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
do {
struct work_struct *work =
@@ -2307,7 +2315,7 @@ sleep:
* @__rescuer: self
*
* Workqueue rescuer thread function. There's one rescuer for each
- * workqueue which has WQ_RESCUER set.
+ * workqueue which has WQ_MEM_RECLAIM set.
*
* Regular work processing on a pool may block trying to create a new
* worker which uses GFP_KERNEL allocation which has slight chance of
@@ -2326,8 +2334,6 @@ static int rescuer_thread(void *__rescuer)
struct worker *rescuer = __rescuer;
struct workqueue_struct *wq = rescuer->rescue_wq;
struct list_head *scheduled = &rescuer->scheduled;
- bool is_unbound = wq->flags & WQ_UNBOUND;
- unsigned int cpu;
set_user_nice(current, RESCUER_NICE_LEVEL);
@@ -2345,28 +2351,29 @@ repeat:
return 0;
}
- /*
- * See whether any cpu is asking for help. Unbounded
- * workqueues use cpu 0 in mayday_mask for CPU_UNBOUND.
- */
- for_each_mayday_cpu(cpu, wq->mayday_mask) {
- unsigned int tcpu = is_unbound ? WORK_CPU_UNBOUND : cpu;
- struct pool_workqueue *pwq = get_pwq(tcpu, wq);
+ /* see whether any pwq is asking for help */
+ spin_lock_irq(&wq_mayday_lock);
+
+ while (!list_empty(&wq->maydays)) {
+ struct pool_workqueue *pwq = list_first_entry(&wq->maydays,
+ struct pool_workqueue, mayday_node);
struct worker_pool *pool = pwq->pool;
struct work_struct *work, *n;
__set_current_state(TASK_RUNNING);
- mayday_clear_cpu(cpu, wq->mayday_mask);
+ list_del_init(&pwq->mayday_node);
+
+ spin_unlock_irq(&wq_mayday_lock);
/* migrate to the target cpu if possible */
+ worker_maybe_bind_and_lock(pool);
rescuer->pool = pool;
- worker_maybe_bind_and_lock(rescuer);
/*
* Slurp in all works issued via this workqueue and
* process'em.
*/
- BUG_ON(!list_empty(&rescuer->scheduled));
+ WARN_ON_ONCE(!list_empty(&rescuer->scheduled));
list_for_each_entry_safe(work, n, &pool->worklist, entry)
if (get_work_pwq(work) == pwq)
move_linked_works(work, scheduled, &n);
@@ -2381,9 +2388,13 @@ repeat:
if (keep_working(pool))
wake_up_worker(pool);
- spin_unlock_irq(&pool->lock);
+ rescuer->pool = NULL;
+ spin_unlock(&pool->lock);
+ spin_lock(&wq_mayday_lock);
}
+ spin_unlock_irq(&wq_mayday_lock);
+
/* rescuers should never participate in concurrency management */
WARN_ON_ONCE(!(rescuer->flags & WORKER_NOT_RUNNING));
schedule();
@@ -2487,7 +2498,7 @@ static void insert_wq_barrier(struct pool_workqueue *pwq,
* advanced to @work_color.
*
* CONTEXT:
- * mutex_lock(wq->flush_mutex).
+ * mutex_lock(wq->mutex).
*
* RETURNS:
* %true if @flush_color >= 0 and there's something to flush. %false
@@ -2497,21 +2508,20 @@ static bool flush_workqueue_prep_pwqs(struct workqueue_struct *wq,
int flush_color, int work_color)
{
bool wait = false;
- unsigned int cpu;
+ struct pool_workqueue *pwq;
if (flush_color >= 0) {
- BUG_ON(atomic_read(&wq->nr_pwqs_to_flush));
+ WARN_ON_ONCE(atomic_read(&wq->nr_pwqs_to_flush));
atomic_set(&wq->nr_pwqs_to_flush, 1);
}
- for_each_pwq_cpu(cpu, wq) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
+ for_each_pwq(pwq, wq) {
struct worker_pool *pool = pwq->pool;
spin_lock_irq(&pool->lock);
if (flush_color >= 0) {
- BUG_ON(pwq->flush_color != -1);
+ WARN_ON_ONCE(pwq->flush_color != -1);
if (pwq->nr_in_flight[flush_color]) {
pwq->flush_color = flush_color;
@@ -2521,7 +2531,7 @@ static bool flush_workqueue_prep_pwqs(struct workqueue_struct *wq,
}
if (work_color >= 0) {
- BUG_ON(work_color != work_next_color(pwq->work_color));
+ WARN_ON_ONCE(work_color != work_next_color(pwq->work_color));
pwq->work_color = work_color;
}
@@ -2538,11 +2548,8 @@ static bool flush_workqueue_prep_pwqs(struct workqueue_struct *wq,
* flush_workqueue - ensure that any scheduled work has run to completion.
* @wq: workqueue to flush
*
- * Forces execution of the workqueue and blocks until its completion.
- * This is typically used in driver shutdown handlers.
- *
- * We sleep until all works which were queued on entry have been handled,
- * but we are not livelocked by new incoming ones.
+ * This function sleeps until all work items which were queued on entry
+ * have finished execution, but it is not livelocked by new incoming ones.
*/
void flush_workqueue(struct workqueue_struct *wq)
{
@@ -2556,7 +2563,7 @@ void flush_workqueue(struct workqueue_struct *wq)
lock_map_acquire(&wq->lockdep_map);
lock_map_release(&wq->lockdep_map);
- mutex_lock(&wq->flush_mutex);
+ mutex_lock(&wq->mutex);
/*
* Start-to-wait phase
@@ -2569,13 +2576,13 @@ void flush_workqueue(struct workqueue_struct *wq)
* becomes our flush_color and work_color is advanced
* by one.
*/
- BUG_ON(!list_empty(&wq->flusher_overflow));
+ WARN_ON_ONCE(!list_empty(&wq->flusher_overflow));
this_flusher.flush_color = wq->work_color;
wq->work_color = next_color;
if (!wq->first_flusher) {
/* no flush in progress, become the first flusher */
- BUG_ON(wq->flush_color != this_flusher.flush_color);
+ WARN_ON_ONCE(wq->flush_color != this_flusher.flush_color);
wq->first_flusher = &this_flusher;
@@ -2588,7 +2595,7 @@ void flush_workqueue(struct workqueue_struct *wq)
}
} else {
/* wait in queue */
- BUG_ON(wq->flush_color == this_flusher.flush_color);
+ WARN_ON_ONCE(wq->flush_color == this_flusher.flush_color);
list_add_tail(&this_flusher.list, &wq->flusher_queue);
flush_workqueue_prep_pwqs(wq, -1, wq->work_color);
}
@@ -2601,7 +2608,7 @@ void flush_workqueue(struct workqueue_struct *wq)
list_add_tail(&this_flusher.list, &wq->flusher_overflow);
}
- mutex_unlock(&wq->flush_mutex);
+ mutex_unlock(&wq->mutex);
wait_for_completion(&this_flusher.done);
@@ -2614,7 +2621,7 @@ void flush_workqueue(struct workqueue_struct *wq)
if (wq->first_flusher != &this_flusher)
return;
- mutex_lock(&wq->flush_mutex);
+ mutex_lock(&wq->mutex);
/* we might have raced, check again with mutex held */
if (wq->first_flusher != &this_flusher)
@@ -2622,8 +2629,8 @@ void flush_workqueue(struct workqueue_struct *wq)
wq->first_flusher = NULL;
- BUG_ON(!list_empty(&this_flusher.list));
- BUG_ON(wq->flush_color != this_flusher.flush_color);
+ WARN_ON_ONCE(!list_empty(&this_flusher.list));
+ WARN_ON_ONCE(wq->flush_color != this_flusher.flush_color);
while (true) {
struct wq_flusher *next, *tmp;
@@ -2636,8 +2643,8 @@ void flush_workqueue(struct workqueue_struct *wq)
complete(&next->done);
}
- BUG_ON(!list_empty(&wq->flusher_overflow) &&
- wq->flush_color != work_next_color(wq->work_color));
+ WARN_ON_ONCE(!list_empty(&wq->flusher_overflow) &&
+ wq->flush_color != work_next_color(wq->work_color));
/* this flush_color is finished, advance by one */
wq->flush_color = work_next_color(wq->flush_color);
@@ -2661,7 +2668,7 @@ void flush_workqueue(struct workqueue_struct *wq)
}
if (list_empty(&wq->flusher_queue)) {
- BUG_ON(wq->flush_color != wq->work_color);
+ WARN_ON_ONCE(wq->flush_color != wq->work_color);
break;
}
@@ -2669,8 +2676,8 @@ void flush_workqueue(struct workqueue_struct *wq)
* Need to flush more colors. Make the next flusher
* the new first flusher and arm pwqs.
*/
- BUG_ON(wq->flush_color == wq->work_color);
- BUG_ON(wq->flush_color != next->flush_color);
+ WARN_ON_ONCE(wq->flush_color == wq->work_color);
+ WARN_ON_ONCE(wq->flush_color != next->flush_color);
list_del_init(&next->list);
wq->first_flusher = next;
@@ -2686,7 +2693,7 @@ void flush_workqueue(struct workqueue_struct *wq)
}
out_unlock:
- mutex_unlock(&wq->flush_mutex);
+ mutex_unlock(&wq->mutex);
}
EXPORT_SYMBOL_GPL(flush_workqueue);
@@ -2704,22 +2711,23 @@ EXPORT_SYMBOL_GPL(flush_workqueue);
void drain_workqueue(struct workqueue_struct *wq)
{
unsigned int flush_cnt = 0;
- unsigned int cpu;
+ struct pool_workqueue *pwq;
/*
* __queue_work() needs to test whether there are drainers, is much
* hotter than drain_workqueue() and already looks at @wq->flags.
- * Use WQ_DRAINING so that queue doesn't have to check nr_drainers.
+ * Use __WQ_DRAINING so that queue doesn't have to check nr_drainers.
*/
- spin_lock(&workqueue_lock);
+ mutex_lock(&wq->mutex);
if (!wq->nr_drainers++)
- wq->flags |= WQ_DRAINING;
- spin_unlock(&workqueue_lock);
+ wq->flags |= __WQ_DRAINING;
+ mutex_unlock(&wq->mutex);
reflush:
flush_workqueue(wq);
- for_each_pwq_cpu(cpu, wq) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
+ mutex_lock(&wq->mutex);
+
+ for_each_pwq(pwq, wq) {
bool drained;
spin_lock_irq(&pwq->pool->lock);
@@ -2731,15 +2739,16 @@ reflush:
if (++flush_cnt == 10 ||
(flush_cnt % 100 == 0 && flush_cnt <= 1000))
- pr_warn("workqueue %s: flush on destruction isn't complete after %u tries\n",
+ pr_warn("workqueue %s: drain_workqueue() isn't complete after %u tries\n",
wq->name, flush_cnt);
+
+ mutex_unlock(&wq->mutex);
goto reflush;
}
- spin_lock(&workqueue_lock);
if (!--wq->nr_drainers)
- wq->flags &= ~WQ_DRAINING;
- spin_unlock(&workqueue_lock);
+ wq->flags &= ~__WQ_DRAINING;
+ mutex_unlock(&wq->mutex);
}
EXPORT_SYMBOL_GPL(drain_workqueue);
@@ -2750,11 +2759,15 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)
struct pool_workqueue *pwq;
might_sleep();
+
+ local_irq_disable();
pool = get_work_pool(work);
- if (!pool)
+ if (!pool) {
+ local_irq_enable();
return false;
+ }
- spin_lock_irq(&pool->lock);
+ spin_lock(&pool->lock);
/* see the comment in try_to_grab_pending() with the same code */
pwq = get_work_pwq(work);
if (pwq) {
@@ -2776,7 +2789,7 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)
* flusher is not running on the same workqueue by verifying write
* access.
*/
- if (pwq->wq->saved_max_active == 1 || pwq->wq->flags & WQ_RESCUER)
+ if (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer)
lock_map_acquire(&pwq->wq->lockdep_map);
else
lock_map_acquire_read(&pwq->wq->lockdep_map);
@@ -2933,66 +2946,6 @@ bool cancel_delayed_work_sync(struct delayed_work *dwork)
EXPORT_SYMBOL(cancel_delayed_work_sync);
/**
- * schedule_work_on - put work task on a specific cpu
- * @cpu: cpu to put the work task on
- * @work: job to be done
- *
- * This puts a job on a specific cpu
- */
-bool schedule_work_on(int cpu, struct work_struct *work)
-{
- return queue_work_on(cpu, system_wq, work);
-}
-EXPORT_SYMBOL(schedule_work_on);
-
-/**
- * schedule_work - put work task in global workqueue
- * @work: job to be done
- *
- * Returns %false if @work was already on the kernel-global workqueue and
- * %true otherwise.
- *
- * This puts a job in the kernel-global workqueue if it was not already
- * queued and leaves it in the same position on the kernel-global
- * workqueue otherwise.
- */
-bool schedule_work(struct work_struct *work)
-{
- return queue_work(system_wq, work);
-}
-EXPORT_SYMBOL(schedule_work);
-
-/**
- * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
- * @cpu: cpu to use
- * @dwork: job to be done
- * @delay: number of jiffies to wait
- *
- * After waiting for a given time this puts a job in the kernel-global
- * workqueue on the specified CPU.
- */
-bool schedule_delayed_work_on(int cpu, struct delayed_work *dwork,
- unsigned long delay)
-{
- return queue_delayed_work_on(cpu, system_wq, dwork, delay);
-}
-EXPORT_SYMBOL(schedule_delayed_work_on);
-
-/**
- * schedule_delayed_work - put work task in global workqueue after delay
- * @dwork: job to be done
- * @delay: number of jiffies to wait or 0 for immediate execution
- *
- * After waiting for a given time this puts a job in the kernel-global
- * workqueue.
- */
-bool schedule_delayed_work(struct delayed_work *dwork, unsigned long delay)
-{
- return queue_delayed_work(system_wq, dwork, delay);
-}
-EXPORT_SYMBOL(schedule_delayed_work);
-
-/**
* schedule_on_each_cpu - execute a function synchronously on each online CPU
* @func: the function to call
*
@@ -3085,51 +3038,1025 @@ int execute_in_process_context(work_func_t fn, struct execute_work *ew)
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
-int keventd_up(void)
+#ifdef CONFIG_SYSFS
+/*
+ * Workqueues with WQ_SYSFS flag set is visible to userland via
+ * /sys/bus/workqueue/devices/WQ_NAME. All visible workqueues have the
+ * following attributes.
+ *
+ * per_cpu RO bool : whether the workqueue is per-cpu or unbound
+ * max_active RW int : maximum number of in-flight work items
+ *
+ * Unbound workqueues have the following extra attributes.
+ *
+ * id RO int : the associated pool ID
+ * nice RW int : nice value of the workers
+ * cpumask RW mask : bitmask of allowed CPUs for the workers
+ */
+struct wq_device {
+ struct workqueue_struct *wq;
+ struct device dev;
+};
+
+static struct workqueue_struct *dev_to_wq(struct device *dev)
+{
+ struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
+
+ return wq_dev->wq;
+}
+
+static ssize_t wq_per_cpu_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+
+ return scnprintf(buf, PAGE_SIZE, "%d\n", (bool)!(wq->flags & WQ_UNBOUND));
+}
+
+static ssize_t wq_max_active_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+
+ return scnprintf(buf, PAGE_SIZE, "%d\n", wq->saved_max_active);
+}
+
+static ssize_t wq_max_active_store(struct device *dev,
+ struct device_attribute *attr,
+ const char *buf, size_t count)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ int val;
+
+ if (sscanf(buf, "%d", &val) != 1 || val <= 0)
+ return -EINVAL;
+
+ workqueue_set_max_active(wq, val);
+ return count;
+}
+
+static struct device_attribute wq_sysfs_attrs[] = {
+ __ATTR(per_cpu, 0444, wq_per_cpu_show, NULL),
+ __ATTR(max_active, 0644, wq_max_active_show, wq_max_active_store),
+ __ATTR_NULL,
+};
+
+static ssize_t wq_pool_ids_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ const char *delim = "";
+ int node, written = 0;
+
+ rcu_read_lock_sched();
+ for_each_node(node) {
+ written += scnprintf(buf + written, PAGE_SIZE - written,
+ "%s%d:%d", delim, node,
+ unbound_pwq_by_node(wq, node)->pool->id);
+ delim = " ";
+ }
+ written += scnprintf(buf + written, PAGE_SIZE - written, "\n");
+ rcu_read_unlock_sched();
+
+ return written;
+}
+
+static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr,
+ char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ int written;
+
+ mutex_lock(&wq->mutex);
+ written = scnprintf(buf, PAGE_SIZE, "%d\n", wq->unbound_attrs->nice);
+ mutex_unlock(&wq->mutex);
+
+ return written;
+}
+
+/* prepare workqueue_attrs for sysfs store operations */
+static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq)
+{
+ struct workqueue_attrs *attrs;
+
+ attrs = alloc_workqueue_attrs(GFP_KERNEL);
+ if (!attrs)
+ return NULL;
+
+ mutex_lock(&wq->mutex);
+ copy_workqueue_attrs(attrs, wq->unbound_attrs);
+ mutex_unlock(&wq->mutex);
+ return attrs;
+}
+
+static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr,
+ const char *buf, size_t count)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ struct workqueue_attrs *attrs;
+ int ret;
+
+ attrs = wq_sysfs_prep_attrs(wq);
+ if (!attrs)
+ return -ENOMEM;
+
+ if (sscanf(buf, "%d", &attrs->nice) == 1 &&
+ attrs->nice >= -20 && attrs->nice <= 19)
+ ret = apply_workqueue_attrs(wq, attrs);
+ else
+ ret = -EINVAL;
+
+ free_workqueue_attrs(attrs);
+ return ret ?: count;
+}
+
+static ssize_t wq_cpumask_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
{
- return system_wq != NULL;
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ int written;
+
+ mutex_lock(&wq->mutex);
+ written = cpumask_scnprintf(buf, PAGE_SIZE, wq->unbound_attrs->cpumask);
+ mutex_unlock(&wq->mutex);
+
+ written += scnprintf(buf + written, PAGE_SIZE - written, "\n");
+ return written;
}
-static int alloc_pwqs(struct workqueue_struct *wq)
+static ssize_t wq_cpumask_store(struct device *dev,
+ struct device_attribute *attr,
+ const char *buf, size_t count)
{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ struct workqueue_attrs *attrs;
+ int ret;
+
+ attrs = wq_sysfs_prep_attrs(wq);
+ if (!attrs)
+ return -ENOMEM;
+
+ ret = cpumask_parse(buf, attrs->cpumask);
+ if (!ret)
+ ret = apply_workqueue_attrs(wq, attrs);
+
+ free_workqueue_attrs(attrs);
+ return ret ?: count;
+}
+
+static ssize_t wq_numa_show(struct device *dev, struct device_attribute *attr,
+ char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ int written;
+
+ mutex_lock(&wq->mutex);
+ written = scnprintf(buf, PAGE_SIZE, "%d\n",
+ !wq->unbound_attrs->no_numa);
+ mutex_unlock(&wq->mutex);
+
+ return written;
+}
+
+static ssize_t wq_numa_store(struct device *dev, struct device_attribute *attr,
+ const char *buf, size_t count)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ struct workqueue_attrs *attrs;
+ int v, ret;
+
+ attrs = wq_sysfs_prep_attrs(wq);
+ if (!attrs)
+ return -ENOMEM;
+
+ ret = -EINVAL;
+ if (sscanf(buf, "%d", &v) == 1) {
+ attrs->no_numa = !v;
+ ret = apply_workqueue_attrs(wq, attrs);
+ }
+
+ free_workqueue_attrs(attrs);
+ return ret ?: count;
+}
+
+static struct device_attribute wq_sysfs_unbound_attrs[] = {
+ __ATTR(pool_ids, 0444, wq_pool_ids_show, NULL),
+ __ATTR(nice, 0644, wq_nice_show, wq_nice_store),
+ __ATTR(cpumask, 0644, wq_cpumask_show, wq_cpumask_store),
+ __ATTR(numa, 0644, wq_numa_show, wq_numa_store),
+ __ATTR_NULL,
+};
+
+static struct bus_type wq_subsys = {
+ .name = "workqueue",
+ .dev_attrs = wq_sysfs_attrs,
+};
+
+static int __init wq_sysfs_init(void)
+{
+ return subsys_virtual_register(&wq_subsys, NULL);
+}
+core_initcall(wq_sysfs_init);
+
+static void wq_device_release(struct device *dev)
+{
+ struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
+
+ kfree(wq_dev);
+}
+
+/**
+ * workqueue_sysfs_register - make a workqueue visible in sysfs
+ * @wq: the workqueue to register
+ *
+ * Expose @wq in sysfs under /sys/bus/workqueue/devices.
+ * alloc_workqueue*() automatically calls this function if WQ_SYSFS is set
+ * which is the preferred method.
+ *
+ * Workqueue user should use this function directly iff it wants to apply
+ * workqueue_attrs before making the workqueue visible in sysfs; otherwise,
+ * apply_workqueue_attrs() may race against userland updating the
+ * attributes.
+ *
+ * Returns 0 on success, -errno on failure.
+ */
+int workqueue_sysfs_register(struct workqueue_struct *wq)
+{
+ struct wq_device *wq_dev;
+ int ret;
+
/*
- * pwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
- * Make sure that the alignment isn't lower than that of
- * unsigned long long.
+ * Adjusting max_active or creating new pwqs by applyting
+ * attributes breaks ordering guarantee. Disallow exposing ordered
+ * workqueues.
*/
- const size_t size = sizeof(struct pool_workqueue);
- const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
- __alignof__(unsigned long long));
+ if (WARN_ON(wq->flags & __WQ_ORDERED))
+ return -EINVAL;
- if (!(wq->flags & WQ_UNBOUND))
- wq->pool_wq.pcpu = __alloc_percpu(size, align);
- else {
- void *ptr;
+ wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL);
+ if (!wq_dev)
+ return -ENOMEM;
+
+ wq_dev->wq = wq;
+ wq_dev->dev.bus = &wq_subsys;
+ wq_dev->dev.init_name = wq->name;
+ wq_dev->dev.release = wq_device_release;
+
+ /*
+ * unbound_attrs are created separately. Suppress uevent until
+ * everything is ready.
+ */
+ dev_set_uevent_suppress(&wq_dev->dev, true);
+
+ ret = device_register(&wq_dev->dev);
+ if (ret) {
+ kfree(wq_dev);
+ wq->wq_dev = NULL;
+ return ret;
+ }
+
+ if (wq->flags & WQ_UNBOUND) {
+ struct device_attribute *attr;
+
+ for (attr = wq_sysfs_unbound_attrs; attr->attr.name; attr++) {
+ ret = device_create_file(&wq_dev->dev, attr);
+ if (ret) {
+ device_unregister(&wq_dev->dev);
+ wq->wq_dev = NULL;
+ return ret;
+ }
+ }
+ }
+
+ kobject_uevent(&wq_dev->dev.kobj, KOBJ_ADD);
+ return 0;
+}
+
+/**
+ * workqueue_sysfs_unregister - undo workqueue_sysfs_register()
+ * @wq: the workqueue to unregister
+ *
+ * If @wq is registered to sysfs by workqueue_sysfs_register(), unregister.
+ */
+static void workqueue_sysfs_unregister(struct workqueue_struct *wq)
+{
+ struct wq_device *wq_dev = wq->wq_dev;
+
+ if (!wq->wq_dev)
+ return;
+
+ wq->wq_dev = NULL;
+ device_unregister(&wq_dev->dev);
+}
+#else /* CONFIG_SYSFS */
+static void workqueue_sysfs_unregister(struct workqueue_struct *wq) { }
+#endif /* CONFIG_SYSFS */
+
+/**
+ * free_workqueue_attrs - free a workqueue_attrs
+ * @attrs: workqueue_attrs to free
+ *
+ * Undo alloc_workqueue_attrs().
+ */
+void free_workqueue_attrs(struct workqueue_attrs *attrs)
+{
+ if (attrs) {
+ free_cpumask_var(attrs->cpumask);
+ kfree(attrs);
+ }
+}
+
+/**
+ * alloc_workqueue_attrs - allocate a workqueue_attrs
+ * @gfp_mask: allocation mask to use
+ *
+ * Allocate a new workqueue_attrs, initialize with default settings and
+ * return it. Returns NULL on failure.
+ */
+struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask)
+{
+ struct workqueue_attrs *attrs;
+
+ attrs = kzalloc(sizeof(*attrs), gfp_mask);
+ if (!attrs)
+ goto fail;
+ if (!alloc_cpumask_var(&attrs->cpumask, gfp_mask))
+ goto fail;
+
+ cpumask_copy(attrs->cpumask, cpu_possible_mask);
+ return attrs;
+fail:
+ free_workqueue_attrs(attrs);
+ return NULL;
+}
+
+static void copy_workqueue_attrs(struct workqueue_attrs *to,
+ const struct workqueue_attrs *from)
+{
+ to->nice = from->nice;
+ cpumask_copy(to->cpumask, from->cpumask);
+}
+
+/* hash value of the content of @attr */
+static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
+{
+ u32 hash = 0;
+
+ hash = jhash_1word(attrs->nice, hash);
+ hash = jhash(cpumask_bits(attrs->cpumask),
+ BITS_TO_LONGS(nr_cpumask_bits) * sizeof(long), hash);
+ return hash;
+}
+
+/* content equality test */
+static bool wqattrs_equal(const struct workqueue_attrs *a,
+ const struct workqueue_attrs *b)
+{
+ if (a->nice != b->nice)
+ return false;
+ if (!cpumask_equal(a->cpumask, b->cpumask))
+ return false;
+ return true;
+}
+
+/**
+ * init_worker_pool - initialize a newly zalloc'd worker_pool
+ * @pool: worker_pool to initialize
+ *
+ * Initiailize a newly zalloc'd @pool. It also allocates @pool->attrs.
+ * Returns 0 on success, -errno on failure. Even on failure, all fields
+ * inside @pool proper are initialized and put_unbound_pool() can be called
+ * on @pool safely to release it.
+ */
+static int init_worker_pool(struct worker_pool *pool)
+{
+ spin_lock_init(&pool->lock);
+ pool->id = -1;
+ pool->cpu = -1;
+ pool->node = NUMA_NO_NODE;
+ pool->flags |= POOL_DISASSOCIATED;
+ INIT_LIST_HEAD(&pool->worklist);
+ INIT_LIST_HEAD(&pool->idle_list);
+ hash_init(pool->busy_hash);
+
+ init_timer_deferrable(&pool->idle_timer);
+ pool->idle_timer.function = idle_worker_timeout;
+ pool->idle_timer.data = (unsigned long)pool;
+
+ setup_timer(&pool->mayday_timer, pool_mayday_timeout,
+ (unsigned long)pool);
+
+ mutex_init(&pool->manager_arb);
+ mutex_init(&pool->manager_mutex);
+ idr_init(&pool->worker_idr);
+
+ INIT_HLIST_NODE(&pool->hash_node);
+ pool->refcnt = 1;
+
+ /* shouldn't fail above this point */
+ pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
+ if (!pool->attrs)
+ return -ENOMEM;
+ return 0;
+}
+
+static void rcu_free_pool(struct rcu_head *rcu)
+{
+ struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu);
+
+ idr_destroy(&pool->worker_idr);
+ free_workqueue_attrs(pool->attrs);
+ kfree(pool);
+}
+
+/**
+ * put_unbound_pool - put a worker_pool
+ * @pool: worker_pool to put
+ *
+ * Put @pool. If its refcnt reaches zero, it gets destroyed in sched-RCU
+ * safe manner. get_unbound_pool() calls this function on its failure path
+ * and this function should be able to release pools which went through,
+ * successfully or not, init_worker_pool().
+ *
+ * Should be called with wq_pool_mutex held.
+ */
+static void put_unbound_pool(struct worker_pool *pool)
+{
+ struct worker *worker;
+
+ lockdep_assert_held(&wq_pool_mutex);
+
+ if (--pool->refcnt)
+ return;
+
+ /* sanity checks */
+ if (WARN_ON(!(pool->flags & POOL_DISASSOCIATED)) ||
+ WARN_ON(!list_empty(&pool->worklist)))
+ return;
+
+ /* release id and unhash */
+ if (pool->id >= 0)
+ idr_remove(&worker_pool_idr, pool->id);
+ hash_del(&pool->hash_node);
+
+ /*
+ * Become the manager and destroy all workers. Grabbing
+ * manager_arb prevents @pool's workers from blocking on
+ * manager_mutex.
+ */
+ mutex_lock(&pool->manager_arb);
+ mutex_lock(&pool->manager_mutex);
+ spin_lock_irq(&pool->lock);
+
+ while ((worker = first_worker(pool)))
+ destroy_worker(worker);
+ WARN_ON(pool->nr_workers || pool->nr_idle);
+
+ spin_unlock_irq(&pool->lock);
+ mutex_unlock(&pool->manager_mutex);
+ mutex_unlock(&pool->manager_arb);
+
+ /* shut down the timers */
+ del_timer_sync(&pool->idle_timer);
+ del_timer_sync(&pool->mayday_timer);
+
+ /* sched-RCU protected to allow dereferences from get_work_pool() */
+ call_rcu_sched(&pool->rcu, rcu_free_pool);
+}
+
+/**
+ * get_unbound_pool - get a worker_pool with the specified attributes
+ * @attrs: the attributes of the worker_pool to get
+ *
+ * Obtain a worker_pool which has the same attributes as @attrs, bump the
+ * reference count and return it. If there already is a matching
+ * worker_pool, it will be used; otherwise, this function attempts to
+ * create a new one. On failure, returns NULL.
+ *
+ * Should be called with wq_pool_mutex held.
+ */
+static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
+{
+ u32 hash = wqattrs_hash(attrs);
+ struct worker_pool *pool;
+ int node;
+
+ lockdep_assert_held(&wq_pool_mutex);
+
+ /* do we already have a matching pool? */
+ hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
+ if (wqattrs_equal(pool->attrs, attrs)) {
+ pool->refcnt++;
+ goto out_unlock;
+ }
+ }
+
+ /* nope, create a new one */
+ pool = kzalloc(sizeof(*pool), GFP_KERNEL);
+ if (!pool || init_worker_pool(pool) < 0)
+ goto fail;
+
+ if (workqueue_freezing)
+ pool->flags |= POOL_FREEZING;
+
+ lockdep_set_subclass(&pool->lock, 1); /* see put_pwq() */
+ copy_workqueue_attrs(pool->attrs, attrs);
+
+ /* if cpumask is contained inside a NUMA node, we belong to that node */
+ if (wq_numa_enabled) {
+ for_each_node(node) {
+ if (cpumask_subset(pool->attrs->cpumask,
+ wq_numa_possible_cpumask[node])) {
+ pool->node = node;
+ break;
+ }
+ }
+ }
+
+ if (worker_pool_assign_id(pool) < 0)
+ goto fail;
+
+ /* create and start the initial worker */
+ if (create_and_start_worker(pool) < 0)
+ goto fail;
+
+ /* install */
+ hash_add(unbound_pool_hash, &pool->hash_node, hash);
+out_unlock:
+ return pool;
+fail:
+ if (pool)
+ put_unbound_pool(pool);
+ return NULL;
+}
+
+static void rcu_free_pwq(struct rcu_head *rcu)
+{
+ kmem_cache_free(pwq_cache,
+ container_of(rcu, struct pool_workqueue, rcu));
+}
+
+/*
+ * Scheduled on system_wq by put_pwq() when an unbound pwq hits zero refcnt
+ * and needs to be destroyed.
+ */
+static void pwq_unbound_release_workfn(struct work_struct *work)
+{
+ struct pool_workqueue *pwq = container_of(work, struct pool_workqueue,
+ unbound_release_work);
+ struct workqueue_struct *wq = pwq->wq;
+ struct worker_pool *pool = pwq->pool;
+ bool is_last;
+
+ if (WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND)))
+ return;
+
+ /*
+ * Unlink @pwq. Synchronization against wq->mutex isn't strictly
+ * necessary on release but do it anyway. It's easier to verify
+ * and consistent with the linking path.
+ */
+ mutex_lock(&wq->mutex);
+ list_del_rcu(&pwq->pwqs_node);
+ is_last = list_empty(&wq->pwqs);
+ mutex_unlock(&wq->mutex);
+
+ mutex_lock(&wq_pool_mutex);
+ put_unbound_pool(pool);
+ mutex_unlock(&wq_pool_mutex);
+
+ call_rcu_sched(&pwq->rcu, rcu_free_pwq);
+
+ /*
+ * If we're the last pwq going away, @wq is already dead and no one
+ * is gonna access it anymore. Free it.
+ */
+ if (is_last) {
+ free_workqueue_attrs(wq->unbound_attrs);
+ kfree(wq);
+ }
+}
+
+/**
+ * pwq_adjust_max_active - update a pwq's max_active to the current setting
+ * @pwq: target pool_workqueue
+ *
+ * If @pwq isn't freezing, set @pwq->max_active to the associated
+ * workqueue's saved_max_active and activate delayed work items
+ * accordingly. If @pwq is freezing, clear @pwq->max_active to zero.
+ */
+static void pwq_adjust_max_active(struct pool_workqueue *pwq)
+{
+ struct workqueue_struct *wq = pwq->wq;
+ bool freezable = wq->flags & WQ_FREEZABLE;
+
+ /* for @wq->saved_max_active */
+ lockdep_assert_held(&wq->mutex);
+
+ /* fast exit for non-freezable wqs */
+ if (!freezable && pwq->max_active == wq->saved_max_active)
+ return;
+
+ spin_lock_irq(&pwq->pool->lock);
+
+ if (!freezable || !(pwq->pool->flags & POOL_FREEZING)) {
+ pwq->max_active = wq->saved_max_active;
+
+ while (!list_empty(&pwq->delayed_works) &&
+ pwq->nr_active < pwq->max_active)
+ pwq_activate_first_delayed(pwq);
/*
- * Allocate enough room to align pwq and put an extra
- * pointer at the end pointing back to the originally
- * allocated pointer which will be used for free.
+ * Need to kick a worker after thawed or an unbound wq's
+ * max_active is bumped. It's a slow path. Do it always.
*/
- ptr = kzalloc(size + align + sizeof(void *), GFP_KERNEL);
- if (ptr) {
- wq->pool_wq.single = PTR_ALIGN(ptr, align);
- *(void **)(wq->pool_wq.single + 1) = ptr;
+ wake_up_worker(pwq->pool);
+ } else {
+ pwq->max_active = 0;
+ }
+
+ spin_unlock_irq(&pwq->pool->lock);
+}
+
+/* initialize newly alloced @pwq which is associated with @wq and @pool */
+static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
+ struct worker_pool *pool)
+{
+ BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);
+
+ memset(pwq, 0, sizeof(*pwq));
+
+ pwq->pool = pool;
+ pwq->wq = wq;
+ pwq->flush_color = -1;
+ pwq->refcnt = 1;
+ INIT_LIST_HEAD(&pwq->delayed_works);
+ INIT_LIST_HEAD(&pwq->pwqs_node);
+ INIT_LIST_HEAD(&pwq->mayday_node);
+ INIT_WORK(&pwq->unbound_release_work, pwq_unbound_release_workfn);
+}
+
+/* sync @pwq with the current state of its associated wq and link it */
+static void link_pwq(struct pool_workqueue *pwq)
+{
+ struct workqueue_struct *wq = pwq->wq;
+
+ lockdep_assert_held(&wq->mutex);
+
+ /* may be called multiple times, ignore if already linked */
+ if (!list_empty(&pwq->pwqs_node))
+ return;
+
+ /*
+ * Set the matching work_color. This is synchronized with
+ * wq->mutex to avoid confusing flush_workqueue().
+ */
+ pwq->work_color = wq->work_color;
+
+ /* sync max_active to the current setting */
+ pwq_adjust_max_active(pwq);
+
+ /* link in @pwq */
+ list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
+}
+
+/* obtain a pool matching @attr and create a pwq associating the pool and @wq */
+static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
+ const struct workqueue_attrs *attrs)
+{
+ struct worker_pool *pool;
+ struct pool_workqueue *pwq;
+
+ lockdep_assert_held(&wq_pool_mutex);
+
+ pool = get_unbound_pool(attrs);
+ if (!pool)
+ return NULL;
+
+ pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
+ if (!pwq) {
+ put_unbound_pool(pool);
+ return NULL;
+ }
+
+ init_pwq(pwq, wq, pool);
+ return pwq;
+}
+
+/* undo alloc_unbound_pwq(), used only in the error path */
+static void free_unbound_pwq(struct pool_workqueue *pwq)
+{
+ lockdep_assert_held(&wq_pool_mutex);
+
+ if (pwq) {
+ put_unbound_pool(pwq->pool);
+ kmem_cache_free(pwq_cache, pwq);
+ }
+}
+
+/**
+ * wq_calc_node_mask - calculate a wq_attrs' cpumask for the specified node
+ * @attrs: the wq_attrs of interest
+ * @node: the target NUMA node
+ * @cpu_going_down: if >= 0, the CPU to consider as offline
+ * @cpumask: outarg, the resulting cpumask
+ *
+ * Calculate the cpumask a workqueue with @attrs should use on @node. If
+ * @cpu_going_down is >= 0, that cpu is considered offline during
+ * calculation. The result is stored in @cpumask. This function returns
+ * %true if the resulting @cpumask is different from @attrs->cpumask,
+ * %false if equal.
+ *
+ * If NUMA affinity is not enabled, @attrs->cpumask is always used. If
+ * enabled and @node has online CPUs requested by @attrs, the returned
+ * cpumask is the intersection of the possible CPUs of @node and
+ * @attrs->cpumask.
+ *
+ * The caller is responsible for ensuring that the cpumask of @node stays
+ * stable.
+ */
+static bool wq_calc_node_cpumask(const struct workqueue_attrs *attrs, int node,
+ int cpu_going_down, cpumask_t *cpumask)
+{
+ if (!wq_numa_enabled || attrs->no_numa)
+ goto use_dfl;
+
+ /* does @node have any online CPUs @attrs wants? */
+ cpumask_and(cpumask, cpumask_of_node(node), attrs->cpumask);
+ if (cpu_going_down >= 0)
+ cpumask_clear_cpu(cpu_going_down, cpumask);
+
+ if (cpumask_empty(cpumask))
+ goto use_dfl;
+
+ /* yeap, return possible CPUs in @node that @attrs wants */
+ cpumask_and(cpumask, attrs->cpumask, wq_numa_possible_cpumask[node]);
+ return !cpumask_equal(cpumask, attrs->cpumask);
+
+use_dfl:
+ cpumask_copy(cpumask, attrs->cpumask);
+ return false;
+}
+
+/* install @pwq into @wq's numa_pwq_tbl[] for @node and return the old pwq */
+static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
+ int node,
+ struct pool_workqueue *pwq)
+{
+ struct pool_workqueue *old_pwq;
+
+ lockdep_assert_held(&wq->mutex);
+
+ /* link_pwq() can handle duplicate calls */
+ link_pwq(pwq);
+
+ old_pwq = rcu_access_pointer(wq->numa_pwq_tbl[node]);
+ rcu_assign_pointer(wq->numa_pwq_tbl[node], pwq);
+ return old_pwq;
+}
+
+/**
+ * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
+ * @wq: the target workqueue
+ * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
+ *
+ * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
+ * machines, this function maps a separate pwq to each NUMA node with
+ * possibles CPUs in @attrs->cpumask so that work items are affine to the
+ * NUMA node it was issued on. Older pwqs are released as in-flight work
+ * items finish. Note that a work item which repeatedly requeues itself
+ * back-to-back will stay on its current pwq.
+ *
+ * Performs GFP_KERNEL allocations. Returns 0 on success and -errno on
+ * failure.
+ */
+int apply_workqueue_attrs(struct workqueue_struct *wq,
+ const struct workqueue_attrs *attrs)
+{
+ struct workqueue_attrs *new_attrs, *tmp_attrs;
+ struct pool_workqueue **pwq_tbl, *dfl_pwq;
+ int node, ret;
+
+ /* only unbound workqueues can change attributes */
+ if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
+ return -EINVAL;
+
+ /* creating multiple pwqs breaks ordering guarantee */
+ if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
+ return -EINVAL;
+
+ pwq_tbl = kzalloc(wq_numa_tbl_len * sizeof(pwq_tbl[0]), GFP_KERNEL);
+ new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
+ tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
+ if (!pwq_tbl || !new_attrs || !tmp_attrs)
+ goto enomem;
+
+ /* make a copy of @attrs and sanitize it */
+ copy_workqueue_attrs(new_attrs, attrs);
+ cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
+
+ /*
+ * We may create multiple pwqs with differing cpumasks. Make a
+ * copy of @new_attrs which will be modified and used to obtain
+ * pools.
+ */
+ copy_workqueue_attrs(tmp_attrs, new_attrs);
+
+ /*
+ * CPUs should stay stable across pwq creations and installations.
+ * Pin CPUs, determine the target cpumask for each node and create
+ * pwqs accordingly.
+ */
+ get_online_cpus();
+
+ mutex_lock(&wq_pool_mutex);
+
+ /*
+ * If something goes wrong during CPU up/down, we'll fall back to
+ * the default pwq covering whole @attrs->cpumask. Always create
+ * it even if we don't use it immediately.
+ */
+ dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
+ if (!dfl_pwq)
+ goto enomem_pwq;
+
+ for_each_node(node) {
+ if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
+ pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
+ if (!pwq_tbl[node])
+ goto enomem_pwq;
+ } else {
+ dfl_pwq->refcnt++;
+ pwq_tbl[node] = dfl_pwq;
}
}
- /* just in case, make sure it's actually aligned */
- BUG_ON(!IS_ALIGNED(wq->pool_wq.v, align));
- return wq->pool_wq.v ? 0 : -ENOMEM;
+ mutex_unlock(&wq_pool_mutex);
+
+ /* all pwqs have been created successfully, let's install'em */
+ mutex_lock(&wq->mutex);
+
+ copy_workqueue_attrs(wq->unbound_attrs, new_attrs);
+
+ /* save the previous pwq and install the new one */
+ for_each_node(node)
+ pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
+
+ /* @dfl_pwq might not have been used, ensure it's linked */
+ link_pwq(dfl_pwq);
+ swap(wq->dfl_pwq, dfl_pwq);
+
+ mutex_unlock(&wq->mutex);
+
+ /* put the old pwqs */
+ for_each_node(node)
+ put_pwq_unlocked(pwq_tbl[node]);
+ put_pwq_unlocked(dfl_pwq);
+
+ put_online_cpus();
+ ret = 0;
+ /* fall through */
+out_free:
+ free_workqueue_attrs(tmp_attrs);
+ free_workqueue_attrs(new_attrs);
+ kfree(pwq_tbl);
+ return ret;
+
+enomem_pwq:
+ free_unbound_pwq(dfl_pwq);
+ for_each_node(node)
+ if (pwq_tbl && pwq_tbl[node] != dfl_pwq)
+ free_unbound_pwq(pwq_tbl[node]);
+ mutex_unlock(&wq_pool_mutex);
+ put_online_cpus();
+enomem:
+ ret = -ENOMEM;
+ goto out_free;
}
-static void free_pwqs(struct workqueue_struct *wq)
+/**
+ * wq_update_unbound_numa - update NUMA affinity of a wq for CPU hot[un]plug
+ * @wq: the target workqueue
+ * @cpu: the CPU coming up or going down
+ * @online: whether @cpu is coming up or going down
+ *
+ * This function is to be called from %CPU_DOWN_PREPARE, %CPU_ONLINE and
+ * %CPU_DOWN_FAILED. @cpu is being hot[un]plugged, update NUMA affinity of
+ * @wq accordingly.
+ *
+ * If NUMA affinity can't be adjusted due to memory allocation failure, it
+ * falls back to @wq->dfl_pwq which may not be optimal but is always
+ * correct.
+ *
+ * Note that when the last allowed CPU of a NUMA node goes offline for a
+ * workqueue with a cpumask spanning multiple nodes, the workers which were
+ * already executing the work items for the workqueue will lose their CPU
+ * affinity and may execute on any CPU. This is similar to how per-cpu
+ * workqueues behave on CPU_DOWN. If a workqueue user wants strict
+ * affinity, it's the user's responsibility to flush the work item from
+ * CPU_DOWN_PREPARE.
+ */
+static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
+ bool online)
{
- if (!(wq->flags & WQ_UNBOUND))
- free_percpu(wq->pool_wq.pcpu);
- else if (wq->pool_wq.single) {
- /* the pointer to free is stored right after the pwq */
- kfree(*(void **)(wq->pool_wq.single + 1));
+ int node = cpu_to_node(cpu);
+ int cpu_off = online ? -1 : cpu;
+ struct pool_workqueue *old_pwq = NULL, *pwq;
+ struct workqueue_attrs *target_attrs;
+ cpumask_t *cpumask;
+
+ lockdep_assert_held(&wq_pool_mutex);
+
+ if (!wq_numa_enabled || !(wq->flags & WQ_UNBOUND))
+ return;
+
+ /*
+ * We don't wanna alloc/free wq_attrs for each wq for each CPU.
+ * Let's use a preallocated one. The following buf is protected by
+ * CPU hotplug exclusion.
+ */
+ target_attrs = wq_update_unbound_numa_attrs_buf;
+ cpumask = target_attrs->cpumask;
+
+ mutex_lock(&wq->mutex);
+ if (wq->unbound_attrs->no_numa)
+ goto out_unlock;
+
+ copy_workqueue_attrs(target_attrs, wq->unbound_attrs);
+ pwq = unbound_pwq_by_node(wq, node);
+
+ /*
+ * Let's determine what needs to be done. If the target cpumask is
+ * different from wq's, we need to compare it to @pwq's and create
+ * a new one if they don't match. If the target cpumask equals
+ * wq's, the default pwq should be used. If @pwq is already the
+ * default one, nothing to do; otherwise, install the default one.
+ */
+ if (wq_calc_node_cpumask(wq->unbound_attrs, node, cpu_off, cpumask)) {
+ if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask))
+ goto out_unlock;
+ } else {
+ if (pwq == wq->dfl_pwq)
+ goto out_unlock;
+ else
+ goto use_dfl_pwq;
+ }
+
+ mutex_unlock(&wq->mutex);
+
+ /* create a new pwq */
+ pwq = alloc_unbound_pwq(wq, target_attrs);
+ if (!pwq) {
+ pr_warning("workqueue: allocation failed while updating NUMA affinity of \"%s\"\n",
+ wq->name);
+ goto out_unlock;
+ }
+
+ /*
+ * Install the new pwq. As this function is called only from CPU
+ * hotplug callbacks and applying a new attrs is wrapped with
+ * get/put_online_cpus(), @wq->unbound_attrs couldn't have changed
+ * inbetween.
+ */
+ mutex_lock(&wq->mutex);
+ old_pwq = numa_pwq_tbl_install(wq, node, pwq);
+ goto out_unlock;
+
+use_dfl_pwq:
+ spin_lock_irq(&wq->dfl_pwq->pool->lock);
+ get_pwq(wq->dfl_pwq);
+ spin_unlock_irq(&wq->dfl_pwq->pool->lock);
+ old_pwq = numa_pwq_tbl_install(wq, node, wq->dfl_pwq);
+out_unlock:
+ mutex_unlock(&wq->mutex);
+ put_pwq_unlocked(old_pwq);
+}
+
+static int alloc_and_link_pwqs(struct workqueue_struct *wq)
+{
+ bool highpri = wq->flags & WQ_HIGHPRI;
+ int cpu;
+
+ if (!(wq->flags & WQ_UNBOUND)) {
+ wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
+ if (!wq->cpu_pwqs)
+ return -ENOMEM;
+
+ for_each_possible_cpu(cpu) {
+ struct pool_workqueue *pwq =
+ per_cpu_ptr(wq->cpu_pwqs, cpu);
+ struct worker_pool *cpu_pools =
+ per_cpu(cpu_worker_pools, cpu);
+
+ init_pwq(pwq, wq, &cpu_pools[highpri]);
+
+ mutex_lock(&wq->mutex);
+ link_pwq(pwq);
+ mutex_unlock(&wq->mutex);
+ }
+ return 0;
+ } else {
+ return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
}
}
@@ -3151,30 +4078,28 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
struct lock_class_key *key,
const char *lock_name, ...)
{
- va_list args, args1;
+ size_t tbl_size = 0;
+ va_list args;
struct workqueue_struct *wq;
- unsigned int cpu;
- size_t namelen;
+ struct pool_workqueue *pwq;
- /* determine namelen, allocate wq and format name */
- va_start(args, lock_name);
- va_copy(args1, args);
- namelen = vsnprintf(NULL, 0, fmt, args) + 1;
+ /* allocate wq and format name */
+ if (flags & WQ_UNBOUND)
+ tbl_size = wq_numa_tbl_len * sizeof(wq->numa_pwq_tbl[0]);
- wq = kzalloc(sizeof(*wq) + namelen, GFP_KERNEL);
+ wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
if (!wq)
- goto err;
+ return NULL;
- vsnprintf(wq->name, namelen, fmt, args1);
- va_end(args);
- va_end(args1);
+ if (flags & WQ_UNBOUND) {
+ wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
+ if (!wq->unbound_attrs)
+ goto err_free_wq;
+ }
- /*
- * Workqueues which may be used during memory reclaim should
- * have a rescuer to guarantee forward progress.
- */
- if (flags & WQ_MEM_RECLAIM)
- flags |= WQ_RESCUER;
+ va_start(args, lock_name);
+ vsnprintf(wq->name, sizeof(wq->name), fmt, args);
+ va_end(args);
max_active = max_active ?: WQ_DFL_ACTIVE;
max_active = wq_clamp_max_active(max_active, flags, wq->name);
@@ -3182,71 +4107,70 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
/* init wq */
wq->flags = flags;
wq->saved_max_active = max_active;
- mutex_init(&wq->flush_mutex);
+ mutex_init(&wq->mutex);
atomic_set(&wq->nr_pwqs_to_flush, 0);
+ INIT_LIST_HEAD(&wq->pwqs);
INIT_LIST_HEAD(&wq->flusher_queue);
INIT_LIST_HEAD(&wq->flusher_overflow);
+ INIT_LIST_HEAD(&wq->maydays);
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
INIT_LIST_HEAD(&wq->list);
- if (alloc_pwqs(wq) < 0)
- goto err;
-
- for_each_pwq_cpu(cpu, wq) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
+ if (alloc_and_link_pwqs(wq) < 0)
+ goto err_free_wq;
- BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);
- pwq->pool = get_std_worker_pool(cpu, flags & WQ_HIGHPRI);
- pwq->wq = wq;
- pwq->flush_color = -1;
- pwq->max_active = max_active;
- INIT_LIST_HEAD(&pwq->delayed_works);
- }
-
- if (flags & WQ_RESCUER) {
+ /*
+ * Workqueues which may be used during memory reclaim should
+ * have a rescuer to guarantee forward progress.
+ */
+ if (flags & WQ_MEM_RECLAIM) {
struct worker *rescuer;
- if (!alloc_mayday_mask(&wq->mayday_mask, GFP_KERNEL))
- goto err;
-
- wq->rescuer = rescuer = alloc_worker();
+ rescuer = alloc_worker();
if (!rescuer)
- goto err;
+ goto err_destroy;
rescuer->rescue_wq = wq;
rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
wq->name);
- if (IS_ERR(rescuer->task))
- goto err;
+ if (IS_ERR(rescuer->task)) {
+ kfree(rescuer);
+ goto err_destroy;
+ }
- rescuer->task->flags |= PF_THREAD_BOUND;
+ wq->rescuer = rescuer;
+ rescuer->task->flags |= PF_NO_SETAFFINITY;
wake_up_process(rescuer->task);
}
+ if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
+ goto err_destroy;
+
/*
- * workqueue_lock protects global freeze state and workqueues
- * list. Grab it, set max_active accordingly and add the new
- * workqueue to workqueues list.
+ * wq_pool_mutex protects global freeze state and workqueues list.
+ * Grab it, adjust max_active and add the new @wq to workqueues
+ * list.
*/
- spin_lock(&workqueue_lock);
+ mutex_lock(&wq_pool_mutex);
- if (workqueue_freezing && wq->flags & WQ_FREEZABLE)
- for_each_pwq_cpu(cpu, wq)
- get_pwq(cpu, wq)->max_active = 0;
+ mutex_lock(&wq->mutex);
+ for_each_pwq(pwq, wq)
+ pwq_adjust_max_active(pwq);
+ mutex_unlock(&wq->mutex);
list_add(&wq->list, &workqueues);
- spin_unlock(&workqueue_lock);
+ mutex_unlock(&wq_pool_mutex);
return wq;
-err:
- if (wq) {
- free_pwqs(wq);
- free_mayday_mask(wq->mayday_mask);
- kfree(wq->rescuer);
- kfree(wq);
- }
+
+err_free_wq:
+ free_workqueue_attrs(wq->unbound_attrs);
+ kfree(wq);
+ return NULL;
+err_destroy:
+ destroy_workqueue(wq);
return NULL;
}
EXPORT_SYMBOL_GPL(__alloc_workqueue_key);
@@ -3259,60 +4183,78 @@ EXPORT_SYMBOL_GPL(__alloc_workqueue_key);
*/
void destroy_workqueue(struct workqueue_struct *wq)
{
- unsigned int cpu;
+ struct pool_workqueue *pwq;
+ int node;
/* drain it before proceeding with destruction */
drain_workqueue(wq);
+ /* sanity checks */
+ mutex_lock(&wq->mutex);
+ for_each_pwq(pwq, wq) {
+ int i;
+
+ for (i = 0; i < WORK_NR_COLORS; i++) {
+ if (WARN_ON(pwq->nr_in_flight[i])) {
+ mutex_unlock(&wq->mutex);
+ return;
+ }
+ }
+
+ if (WARN_ON((pwq != wq->dfl_pwq) && (pwq->refcnt > 1)) ||
+ WARN_ON(pwq->nr_active) ||
+ WARN_ON(!list_empty(&pwq->delayed_works))) {
+ mutex_unlock(&wq->mutex);
+ return;
+ }
+ }
+ mutex_unlock(&wq->mutex);
+
/*
* wq list is used to freeze wq, remove from list after
* flushing is complete in case freeze races us.
*/
- spin_lock(&workqueue_lock);
- list_del(&wq->list);
- spin_unlock(&workqueue_lock);
+ mutex_lock(&wq_pool_mutex);
+ list_del_init(&wq->list);
+ mutex_unlock(&wq_pool_mutex);
- /* sanity check */
- for_each_pwq_cpu(cpu, wq) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
- int i;
+ workqueue_sysfs_unregister(wq);
- for (i = 0; i < WORK_NR_COLORS; i++)
- BUG_ON(pwq->nr_in_flight[i]);
- BUG_ON(pwq->nr_active);
- BUG_ON(!list_empty(&pwq->delayed_works));
- }
-
- if (wq->flags & WQ_RESCUER) {
+ if (wq->rescuer) {
kthread_stop(wq->rescuer->task);
- free_mayday_mask(wq->mayday_mask);
kfree(wq->rescuer);
+ wq->rescuer = NULL;
}
- free_pwqs(wq);
- kfree(wq);
-}
-EXPORT_SYMBOL_GPL(destroy_workqueue);
-
-/**
- * pwq_set_max_active - adjust max_active of a pwq
- * @pwq: target pool_workqueue
- * @max_active: new max_active value.
- *
- * Set @pwq->max_active to @max_active and activate delayed works if
- * increased.
- *
- * CONTEXT:
- * spin_lock_irq(pool->lock).
- */
-static void pwq_set_max_active(struct pool_workqueue *pwq, int max_active)
-{
- pwq->max_active = max_active;
+ if (!(wq->flags & WQ_UNBOUND)) {
+ /*
+ * The base ref is never dropped on per-cpu pwqs. Directly
+ * free the pwqs and wq.
+ */
+ free_percpu(wq->cpu_pwqs);
+ kfree(wq);
+ } else {
+ /*
+ * We're the sole accessor of @wq at this point. Directly
+ * access numa_pwq_tbl[] and dfl_pwq to put the base refs.
+ * @wq will be freed when the last pwq is released.
+ */
+ for_each_node(node) {
+ pwq = rcu_access_pointer(wq->numa_pwq_tbl[node]);
+ RCU_INIT_POINTER(wq->numa_pwq_tbl[node], NULL);
+ put_pwq_unlocked(pwq);
+ }
- while (!list_empty(&pwq->delayed_works) &&
- pwq->nr_active < pwq->max_active)
- pwq_activate_first_delayed(pwq);
+ /*
+ * Put dfl_pwq. @wq may be freed any time after dfl_pwq is
+ * put. Don't access it afterwards.
+ */
+ pwq = wq->dfl_pwq;
+ wq->dfl_pwq = NULL;
+ put_pwq_unlocked(pwq);
+ }
}
+EXPORT_SYMBOL_GPL(destroy_workqueue);
/**
* workqueue_set_max_active - adjust max_active of a workqueue
@@ -3326,30 +4268,37 @@ static void pwq_set_max_active(struct pool_workqueue *pwq, int max_active)
*/
void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
{
- unsigned int cpu;
+ struct pool_workqueue *pwq;
+
+ /* disallow meddling with max_active for ordered workqueues */
+ if (WARN_ON(wq->flags & __WQ_ORDERED))
+ return;
max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
- spin_lock(&workqueue_lock);
+ mutex_lock(&wq->mutex);
wq->saved_max_active = max_active;
- for_each_pwq_cpu(cpu, wq) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
- struct worker_pool *pool = pwq->pool;
-
- spin_lock_irq(&pool->lock);
+ for_each_pwq(pwq, wq)
+ pwq_adjust_max_active(pwq);
- if (!(wq->flags & WQ_FREEZABLE) ||
- !(pool->flags & POOL_FREEZING))
- pwq_set_max_active(pwq, max_active);
+ mutex_unlock(&wq->mutex);
+}
+EXPORT_SYMBOL_GPL(workqueue_set_max_active);
- spin_unlock_irq(&pool->lock);
- }
+/**
+ * current_is_workqueue_rescuer - is %current workqueue rescuer?
+ *
+ * Determine whether %current is a workqueue rescuer. Can be used from
+ * work functions to determine whether it's being run off the rescuer task.
+ */
+bool current_is_workqueue_rescuer(void)
+{
+ struct worker *worker = current_wq_worker();
- spin_unlock(&workqueue_lock);
+ return worker && worker->rescue_wq;
}
-EXPORT_SYMBOL_GPL(workqueue_set_max_active);
/**
* workqueue_congested - test whether a workqueue is congested
@@ -3363,11 +4312,22 @@ EXPORT_SYMBOL_GPL(workqueue_set_max_active);
* RETURNS:
* %true if congested, %false otherwise.
*/
-bool workqueue_congested(unsigned int cpu, struct workqueue_struct *wq)
+bool workqueue_congested(int cpu, struct workqueue_struct *wq)
{
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
+ struct pool_workqueue *pwq;
+ bool ret;
- return !list_empty(&pwq->delayed_works);
+ rcu_read_lock_sched();
+
+ if (!(wq->flags & WQ_UNBOUND))
+ pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
+ else
+ pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
+
+ ret = !list_empty(&pwq->delayed_works);
+ rcu_read_unlock_sched();
+
+ return ret;
}
EXPORT_SYMBOL_GPL(workqueue_congested);
@@ -3384,19 +4344,22 @@ EXPORT_SYMBOL_GPL(workqueue_congested);
*/
unsigned int work_busy(struct work_struct *work)
{
- struct worker_pool *pool = get_work_pool(work);
+ struct worker_pool *pool;
unsigned long flags;
unsigned int ret = 0;
if (work_pending(work))
ret |= WORK_BUSY_PENDING;
+ local_irq_save(flags);
+ pool = get_work_pool(work);
if (pool) {
- spin_lock_irqsave(&pool->lock, flags);
+ spin_lock(&pool->lock);
if (find_worker_executing_work(pool, work))
ret |= WORK_BUSY_RUNNING;
- spin_unlock_irqrestore(&pool->lock, flags);
+ spin_unlock(&pool->lock);
}
+ local_irq_restore(flags);
return ret;
}
@@ -3422,31 +4385,28 @@ static void wq_unbind_fn(struct work_struct *work)
int cpu = smp_processor_id();
struct worker_pool *pool;
struct worker *worker;
- int i;
+ int wi;
- for_each_std_worker_pool(pool, cpu) {
- BUG_ON(cpu != smp_processor_id());
+ for_each_cpu_worker_pool(pool, cpu) {
+ WARN_ON_ONCE(cpu != smp_processor_id());
- mutex_lock(&pool->assoc_mutex);
+ mutex_lock(&pool->manager_mutex);
spin_lock_irq(&pool->lock);
/*
- * We've claimed all manager positions. Make all workers
+ * We've blocked all manager operations. Make all workers
* unbound and set DISASSOCIATED. Before this, all workers
* except for the ones which are still executing works from
* before the last CPU down must be on the cpu. After
* this, they may become diasporas.
*/
- list_for_each_entry(worker, &pool->idle_list, entry)
- worker->flags |= WORKER_UNBOUND;
-
- for_each_busy_worker(worker, i, pool)
+ for_each_pool_worker(worker, wi, pool)
worker->flags |= WORKER_UNBOUND;
pool->flags |= POOL_DISASSOCIATED;
spin_unlock_irq(&pool->lock);
- mutex_unlock(&pool->assoc_mutex);
+ mutex_unlock(&pool->manager_mutex);
/*
* Call schedule() so that we cross rq->lock and thus can
@@ -3477,6 +4437,103 @@ static void wq_unbind_fn(struct work_struct *work)
}
}
+/**
+ * rebind_workers - rebind all workers of a pool to the associated CPU
+ * @pool: pool of interest
+ *
+ * @pool->cpu is coming online. Rebind all workers to the CPU.
+ */
+static void rebind_workers(struct worker_pool *pool)
+{
+ struct worker *worker;
+ int wi;
+
+ lockdep_assert_held(&pool->manager_mutex);
+
+ /*
+ * Restore CPU affinity of all workers. As all idle workers should
+ * be on the run-queue of the associated CPU before any local
+ * wake-ups for concurrency management happen, restore CPU affinty
+ * of all workers first and then clear UNBOUND. As we're called
+ * from CPU_ONLINE, the following shouldn't fail.
+ */
+ for_each_pool_worker(worker, wi, pool)
+ WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task,
+ pool->attrs->cpumask) < 0);
+
+ spin_lock_irq(&pool->lock);
+
+ for_each_pool_worker(worker, wi, pool) {
+ unsigned int worker_flags = worker->flags;
+
+ /*
+ * A bound idle worker should actually be on the runqueue
+ * of the associated CPU for local wake-ups targeting it to
+ * work. Kick all idle workers so that they migrate to the
+ * associated CPU. Doing this in the same loop as
+ * replacing UNBOUND with REBOUND is safe as no worker will
+ * be bound before @pool->lock is released.
+ */
+ if (worker_flags & WORKER_IDLE)
+ wake_up_process(worker->task);
+
+ /*
+ * We want to clear UNBOUND but can't directly call
+ * worker_clr_flags() or adjust nr_running. Atomically
+ * replace UNBOUND with another NOT_RUNNING flag REBOUND.
+ * @worker will clear REBOUND using worker_clr_flags() when
+ * it initiates the next execution cycle thus restoring
+ * concurrency management. Note that when or whether
+ * @worker clears REBOUND doesn't affect correctness.
+ *
+ * ACCESS_ONCE() is necessary because @worker->flags may be
+ * tested without holding any lock in
+ * wq_worker_waking_up(). Without it, NOT_RUNNING test may
+ * fail incorrectly leading to premature concurrency
+ * management operations.
+ */
+ WARN_ON_ONCE(!(worker_flags & WORKER_UNBOUND));
+ worker_flags |= WORKER_REBOUND;
+ worker_flags &= ~WORKER_UNBOUND;
+ ACCESS_ONCE(worker->flags) = worker_flags;
+ }
+
+ spin_unlock_irq(&pool->lock);
+}
+
+/**
+ * restore_unbound_workers_cpumask - restore cpumask of unbound workers
+ * @pool: unbound pool of interest
+ * @cpu: the CPU which is coming up
+ *
+ * An unbound pool may end up with a cpumask which doesn't have any online
+ * CPUs. When a worker of such pool get scheduled, the scheduler resets
+ * its cpus_allowed. If @cpu is in @pool's cpumask which didn't have any
+ * online CPU before, cpus_allowed of all its workers should be restored.
+ */
+static void restore_unbound_workers_cpumask(struct worker_pool *pool, int cpu)
+{
+ static cpumask_t cpumask;
+ struct worker *worker;
+ int wi;
+
+ lockdep_assert_held(&pool->manager_mutex);
+
+ /* is @cpu allowed for @pool? */
+ if (!cpumask_test_cpu(cpu, pool->attrs->cpumask))
+ return;
+
+ /* is @cpu the only online CPU? */
+ cpumask_and(&cpumask, pool->attrs->cpumask, cpu_online_mask);
+ if (cpumask_weight(&cpumask) != 1)
+ return;
+
+ /* as we're called from CPU_ONLINE, the following shouldn't fail */
+ for_each_pool_worker(worker, wi, pool)
+ WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task,
+ pool->attrs->cpumask) < 0);
+}
+
/*
* Workqueues should be brought up before normal priority CPU notifiers.
* This will be registered high priority CPU notifier.
@@ -3485,39 +4542,46 @@ static int __cpuinit workqueue_cpu_up_callback(struct notifier_block *nfb,
unsigned long action,
void *hcpu)
{
- unsigned int cpu = (unsigned long)hcpu;
+ int cpu = (unsigned long)hcpu;
struct worker_pool *pool;
+ struct workqueue_struct *wq;
+ int pi;
switch (action & ~CPU_TASKS_FROZEN) {
case CPU_UP_PREPARE:
- for_each_std_worker_pool(pool, cpu) {
- struct worker *worker;
-
+ for_each_cpu_worker_pool(pool, cpu) {
if (pool->nr_workers)
continue;
-
- worker = create_worker(pool);
- if (!worker)
+ if (create_and_start_worker(pool) < 0)
return NOTIFY_BAD;
-
- spin_lock_irq(&pool->lock);
- start_worker(worker);
- spin_unlock_irq(&pool->lock);
}
break;
case CPU_DOWN_FAILED:
case CPU_ONLINE:
- for_each_std_worker_pool(pool, cpu) {
- mutex_lock(&pool->assoc_mutex);
- spin_lock_irq(&pool->lock);
+ mutex_lock(&wq_pool_mutex);
- pool->flags &= ~POOL_DISASSOCIATED;
- rebind_workers(pool);
+ for_each_pool(pool, pi) {
+ mutex_lock(&pool->manager_mutex);
+
+ if (pool->cpu == cpu) {
+ spin_lock_irq(&pool->lock);
+ pool->flags &= ~POOL_DISASSOCIATED;
+ spin_unlock_irq(&pool->lock);
+
+ rebind_workers(pool);
+ } else if (pool->cpu < 0) {
+ restore_unbound_workers_cpumask(pool, cpu);
+ }
- spin_unlock_irq(&pool->lock);
- mutex_unlock(&pool->assoc_mutex);
+ mutex_unlock(&pool->manager_mutex);
}
+
+ /* update NUMA affinity of unbound workqueues */
+ list_for_each_entry(wq, &workqueues, list)
+ wq_update_unbound_numa(wq, cpu, true);
+
+ mutex_unlock(&wq_pool_mutex);
break;
}
return NOTIFY_OK;
@@ -3531,14 +4595,23 @@ static int __cpuinit workqueue_cpu_down_callback(struct notifier_block *nfb,
unsigned long action,
void *hcpu)
{
- unsigned int cpu = (unsigned long)hcpu;
+ int cpu = (unsigned long)hcpu;
struct work_struct unbind_work;
+ struct workqueue_struct *wq;
switch (action & ~CPU_TASKS_FROZEN) {
case CPU_DOWN_PREPARE:
- /* unbinding should happen on the local CPU */
+ /* unbinding per-cpu workers should happen on the local CPU */
INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn);
queue_work_on(cpu, system_highpri_wq, &unbind_work);
+
+ /* update NUMA affinity of unbound workqueues */
+ mutex_lock(&wq_pool_mutex);
+ list_for_each_entry(wq, &workqueues, list)
+ wq_update_unbound_numa(wq, cpu, false);
+ mutex_unlock(&wq_pool_mutex);
+
+ /* wait for per-cpu unbinding to finish */
flush_work(&unbind_work);
break;
}
@@ -3571,7 +4644,7 @@ static void work_for_cpu_fn(struct work_struct *work)
* It is up to the caller to ensure that the cpu doesn't go offline.
* The caller must not hold any locks which would prevent @fn from completing.
*/
-long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
+long work_on_cpu(int cpu, long (*fn)(void *), void *arg)
{
struct work_for_cpu wfc = { .fn = fn, .arg = arg };
@@ -3589,44 +4662,40 @@ EXPORT_SYMBOL_GPL(work_on_cpu);
* freeze_workqueues_begin - begin freezing workqueues
*
* Start freezing workqueues. After this function returns, all freezable
- * workqueues will queue new works to their frozen_works list instead of
+ * workqueues will queue new works to their delayed_works list instead of
* pool->worklist.
*
* CONTEXT:
- * Grabs and releases workqueue_lock and pool->lock's.
+ * Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
*/
void freeze_workqueues_begin(void)
{
- unsigned int cpu;
+ struct worker_pool *pool;
+ struct workqueue_struct *wq;
+ struct pool_workqueue *pwq;
+ int pi;
- spin_lock(&workqueue_lock);
+ mutex_lock(&wq_pool_mutex);
- BUG_ON(workqueue_freezing);
+ WARN_ON_ONCE(workqueue_freezing);
workqueue_freezing = true;
- for_each_wq_cpu(cpu) {
- struct worker_pool *pool;
- struct workqueue_struct *wq;
-
- for_each_std_worker_pool(pool, cpu) {
- spin_lock_irq(&pool->lock);
-
- WARN_ON_ONCE(pool->flags & POOL_FREEZING);
- pool->flags |= POOL_FREEZING;
-
- list_for_each_entry(wq, &workqueues, list) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
-
- if (pwq && pwq->pool == pool &&
- (wq->flags & WQ_FREEZABLE))
- pwq->max_active = 0;
- }
+ /* set FREEZING */
+ for_each_pool(pool, pi) {
+ spin_lock_irq(&pool->lock);
+ WARN_ON_ONCE(pool->flags & POOL_FREEZING);
+ pool->flags |= POOL_FREEZING;
+ spin_unlock_irq(&pool->lock);
+ }
- spin_unlock_irq(&pool->lock);
- }
+ list_for_each_entry(wq, &workqueues, list) {
+ mutex_lock(&wq->mutex);
+ for_each_pwq(pwq, wq)
+ pwq_adjust_max_active(pwq);
+ mutex_unlock(&wq->mutex);
}
- spin_unlock(&workqueue_lock);
+ mutex_unlock(&wq_pool_mutex);
}
/**
@@ -3636,7 +4705,7 @@ void freeze_workqueues_begin(void)
* between freeze_workqueues_begin() and thaw_workqueues().
*
* CONTEXT:
- * Grabs and releases workqueue_lock.
+ * Grabs and releases wq_pool_mutex.
*
* RETURNS:
* %true if some freezable workqueues are still busy. %false if freezing
@@ -3644,34 +4713,34 @@ void freeze_workqueues_begin(void)
*/
bool freeze_workqueues_busy(void)
{
- unsigned int cpu;
bool busy = false;
+ struct workqueue_struct *wq;
+ struct pool_workqueue *pwq;
- spin_lock(&workqueue_lock);
+ mutex_lock(&wq_pool_mutex);
- BUG_ON(!workqueue_freezing);
+ WARN_ON_ONCE(!workqueue_freezing);
- for_each_wq_cpu(cpu) {
- struct workqueue_struct *wq;
+ list_for_each_entry(wq, &workqueues, list) {
+ if (!(wq->flags & WQ_FREEZABLE))
+ continue;
/*
* nr_active is monotonically decreasing. It's safe
* to peek without lock.
*/
- list_for_each_entry(wq, &workqueues, list) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
-
- if (!pwq || !(wq->flags & WQ_FREEZABLE))
- continue;
-
- BUG_ON(pwq->nr_active < 0);
+ rcu_read_lock_sched();
+ for_each_pwq(pwq, wq) {
+ WARN_ON_ONCE(pwq->nr_active < 0);
if (pwq->nr_active) {
busy = true;
+ rcu_read_unlock_sched();
goto out_unlock;
}
}
+ rcu_read_unlock_sched();
}
out_unlock:
- spin_unlock(&workqueue_lock);
+ mutex_unlock(&wq_pool_mutex);
return busy;
}
@@ -3682,104 +4751,141 @@ out_unlock:
* frozen works are transferred to their respective pool worklists.
*
* CONTEXT:
- * Grabs and releases workqueue_lock and pool->lock's.
+ * Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
*/
void thaw_workqueues(void)
{
- unsigned int cpu;
+ struct workqueue_struct *wq;
+ struct pool_workqueue *pwq;
+ struct worker_pool *pool;
+ int pi;
- spin_lock(&workqueue_lock);
+ mutex_lock(&wq_pool_mutex);
if (!workqueue_freezing)
goto out_unlock;
- for_each_wq_cpu(cpu) {
- struct worker_pool *pool;
- struct workqueue_struct *wq;
+ /* clear FREEZING */
+ for_each_pool(pool, pi) {
+ spin_lock_irq(&pool->lock);
+ WARN_ON_ONCE(!(pool->flags & POOL_FREEZING));
+ pool->flags &= ~POOL_FREEZING;
+ spin_unlock_irq(&pool->lock);
+ }
- for_each_std_worker_pool(pool, cpu) {
- spin_lock_irq(&pool->lock);
+ /* restore max_active and repopulate worklist */
+ list_for_each_entry(wq, &workqueues, list) {
+ mutex_lock(&wq->mutex);
+ for_each_pwq(pwq, wq)
+ pwq_adjust_max_active(pwq);
+ mutex_unlock(&wq->mutex);
+ }
- WARN_ON_ONCE(!(pool->flags & POOL_FREEZING));
- pool->flags &= ~POOL_FREEZING;
+ workqueue_freezing = false;
+out_unlock:
+ mutex_unlock(&wq_pool_mutex);
+}
+#endif /* CONFIG_FREEZER */
- list_for_each_entry(wq, &workqueues, list) {
- struct pool_workqueue *pwq = get_pwq(cpu, wq);
+static void __init wq_numa_init(void)
+{
+ cpumask_var_t *tbl;
+ int node, cpu;
- if (!pwq || pwq->pool != pool ||
- !(wq->flags & WQ_FREEZABLE))
- continue;
+ /* determine NUMA pwq table len - highest node id + 1 */
+ for_each_node(node)
+ wq_numa_tbl_len = max(wq_numa_tbl_len, node + 1);
- /* restore max_active and repopulate worklist */
- pwq_set_max_active(pwq, wq->saved_max_active);
- }
+ if (num_possible_nodes() <= 1)
+ return;
- wake_up_worker(pool);
+ if (wq_disable_numa) {
+ pr_info("workqueue: NUMA affinity support disabled\n");
+ return;
+ }
+
+ wq_update_unbound_numa_attrs_buf = alloc_workqueue_attrs(GFP_KERNEL);
+ BUG_ON(!wq_update_unbound_numa_attrs_buf);
- spin_unlock_irq(&pool->lock);
+ /*
+ * We want masks of possible CPUs of each node which isn't readily
+ * available. Build one from cpu_to_node() which should have been
+ * fully initialized by now.
+ */
+ tbl = kzalloc(wq_numa_tbl_len * sizeof(tbl[0]), GFP_KERNEL);
+ BUG_ON(!tbl);
+
+ for_each_node(node)
+ BUG_ON(!alloc_cpumask_var_node(&tbl[node], GFP_KERNEL, node));
+
+ for_each_possible_cpu(cpu) {
+ node = cpu_to_node(cpu);
+ if (WARN_ON(node == NUMA_NO_NODE)) {
+ pr_warn("workqueue: NUMA node mapping not available for cpu%d, disabling NUMA support\n", cpu);
+ /* happens iff arch is bonkers, let's just proceed */
+ return;
}
+ cpumask_set_cpu(cpu, tbl[node]);
}
- workqueue_freezing = false;
-out_unlock:
- spin_unlock(&workqueue_lock);
+ wq_numa_possible_cpumask = tbl;
+ wq_numa_enabled = true;
}
-#endif /* CONFIG_FREEZER */
static int __init init_workqueues(void)
{
- unsigned int cpu;
+ int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
+ int i, cpu;
/* make sure we have enough bits for OFFQ pool ID */
BUILD_BUG_ON((1LU << (BITS_PER_LONG - WORK_OFFQ_POOL_SHIFT)) <
WORK_CPU_END * NR_STD_WORKER_POOLS);
+ WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));
+
+ pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);
+
cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);
hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);
+ wq_numa_init();
+
/* initialize CPU pools */
- for_each_wq_cpu(cpu) {
+ for_each_possible_cpu(cpu) {
struct worker_pool *pool;
- for_each_std_worker_pool(pool, cpu) {
- spin_lock_init(&pool->lock);
+ i = 0;
+ for_each_cpu_worker_pool(pool, cpu) {
+ BUG_ON(init_worker_pool(pool));
pool->cpu = cpu;
- pool->flags |= POOL_DISASSOCIATED;
- INIT_LIST_HEAD(&pool->worklist);
- INIT_LIST_HEAD(&pool->idle_list);
- hash_init(pool->busy_hash);
-
- init_timer_deferrable(&pool->idle_timer);
- pool->idle_timer.function = idle_worker_timeout;
- pool->idle_timer.data = (unsigned long)pool;
-
- setup_timer(&pool->mayday_timer, pool_mayday_timeout,
- (unsigned long)pool);
-
- mutex_init(&pool->assoc_mutex);
- ida_init(&pool->worker_ida);
+ cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
+ pool->attrs->nice = std_nice[i++];
+ pool->node = cpu_to_node(cpu);
/* alloc pool ID */
+ mutex_lock(&wq_pool_mutex);
BUG_ON(worker_pool_assign_id(pool));
+ mutex_unlock(&wq_pool_mutex);
}
}
/* create the initial worker */
- for_each_online_wq_cpu(cpu) {
+ for_each_online_cpu(cpu) {
struct worker_pool *pool;
- for_each_std_worker_pool(pool, cpu) {
- struct worker *worker;
+ for_each_cpu_worker_pool(pool, cpu) {
+ pool->flags &= ~POOL_DISASSOCIATED;
+ BUG_ON(create_and_start_worker(pool) < 0);
+ }
+ }
- if (cpu != WORK_CPU_UNBOUND)
- pool->flags &= ~POOL_DISASSOCIATED;
+ /* create default unbound wq attrs */
+ for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
+ struct workqueue_attrs *attrs;
- worker = create_worker(pool);
- BUG_ON(!worker);
- spin_lock_irq(&pool->lock);
- start_worker(worker);
- spin_unlock_irq(&pool->lock);
- }
+ BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
+ attrs->nice = std_nice[i];
+ unbound_std_wq_attrs[i] = attrs;
}
system_wq = alloc_workqueue("events", 0, 0);
diff --git a/kernel/workqueue_internal.h b/kernel/workqueue_internal.h
index 07650264ec1..84ab6e1dc6f 100644
--- a/kernel/workqueue_internal.h
+++ b/kernel/workqueue_internal.h
@@ -32,14 +32,12 @@ struct worker {
struct list_head scheduled; /* L: scheduled works */
struct task_struct *task; /* I: worker task */
struct worker_pool *pool; /* I: the associated pool */
+ /* L: for rescuers */
/* 64 bytes boundary on 64bit, 32 on 32bit */
unsigned long last_active; /* L: last active timestamp */
unsigned int flags; /* X: flags */
int id; /* I: worker id */
- /* for rebinding worker to CPU */
- struct work_struct rebind_work; /* L: for busy worker */
-
/* used only by rescuers to point to the target workqueue */
struct workqueue_struct *rescue_wq; /* I: the workqueue to rescue */
};
@@ -58,8 +56,7 @@ static inline struct worker *current_wq_worker(void)
* Scheduler hooks for concurrency managed workqueue. Only to be used from
* sched.c and workqueue.c.
*/
-void wq_worker_waking_up(struct task_struct *task, unsigned int cpu);
-struct task_struct *wq_worker_sleeping(struct task_struct *task,
- unsigned int cpu);
+void wq_worker_waking_up(struct task_struct *task, int cpu);
+struct task_struct *wq_worker_sleeping(struct task_struct *task, int cpu);
#endif /* _KERNEL_WORKQUEUE_INTERNAL_H */