From cef50120b61c2af4ce34bc165e19cad66296f93d Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Sun, 5 Feb 2012 07:42:44 -0800 Subject: rcu: Direct algorithmic SRCU implementation The current implementation of synchronize_srcu_expedited() can cause severe OS jitter due to its use of synchronize_sched(), which in turn invokes try_stop_cpus(), which causes each CPU to be sent an IPI. This can result in severe performance degradation for real-time workloads and especially for short-interation-length HPC workloads. Furthermore, because only one instance of try_stop_cpus() can be making forward progress at a given time, only one instance of synchronize_srcu_expedited() can make forward progress at a time, even if they are all operating on distinct srcu_struct structures. This commit, inspired by an earlier implementation by Peter Zijlstra (https://lkml.org/lkml/2012/1/31/211) and by further offline discussions, takes a strictly algorithmic bits-in-memory approach. This has the disadvantage of requiring one explicit memory-barrier instruction in each of srcu_read_lock() and srcu_read_unlock(), but on the other hand completely dispenses with OS jitter and furthermore allows SRCU to be used freely by CPUs that RCU believes to be idle or offline. The update-side implementation handles the single read-side memory barrier by rechecking the per-CPU counters after summing them and by running through the update-side state machine twice. This implementation has passed moderate rcutorture testing on both x86 and Power. Also updated to use this_cpu_ptr() instead of per_cpu_ptr(), as suggested by Peter Zijlstra. Reported-by: Peter Zijlstra Signed-off-by: Paul E. McKenney Signed-off-by: Paul E. McKenney Acked-by: Peter Zijlstra Reviewed-by: Lai Jiangshan --- kernel/srcu.c | 284 ++++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 189 insertions(+), 95 deletions(-) (limited to 'kernel/srcu.c') diff --git a/kernel/srcu.c b/kernel/srcu.c index ba35f3a4a1f..84c9b97dc3d 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -73,19 +73,102 @@ EXPORT_SYMBOL_GPL(init_srcu_struct); #endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */ /* - * srcu_readers_active_idx -- returns approximate number of readers - * active on the specified rank of per-CPU counters. + * Returns approximate number of readers active on the specified rank + * of per-CPU counters. Also snapshots each counter's value in the + * corresponding element of sp->snap[] for later use validating + * the sum. */ +static unsigned long srcu_readers_active_idx(struct srcu_struct *sp, int idx) +{ + int cpu; + unsigned long sum = 0; + unsigned long t; -static int srcu_readers_active_idx(struct srcu_struct *sp, int idx) + for_each_possible_cpu(cpu) { + t = ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[idx]); + sum += t; + sp->snap[cpu] = t; + } + return sum & SRCU_REF_MASK; +} + +/* + * To be called from the update side after an index flip. Returns true + * if the modulo sum of the counters is stably zero, false if there is + * some possibility of non-zero. + */ +static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx) { int cpu; - int sum; - sum = 0; + /* + * Note that srcu_readers_active_idx() can incorrectly return + * zero even though there is a pre-existing reader throughout. + * To see this, suppose that task A is in a very long SRCU + * read-side critical section that started on CPU 0, and that + * no other reader exists, so that the modulo sum of the counters + * is equal to one. Then suppose that task B starts executing + * srcu_readers_active_idx(), summing up to CPU 1, and then that + * task C starts reading on CPU 0, so that its increment is not + * summed, but finishes reading on CPU 2, so that its decrement + * -is- summed. Then when task B completes its sum, it will + * incorrectly get zero, despite the fact that task A has been + * in its SRCU read-side critical section the whole time. + * + * We therefore do a validation step should srcu_readers_active_idx() + * return zero. + */ + if (srcu_readers_active_idx(sp, idx) != 0) + return false; + + /* + * Since the caller recently flipped ->completed, we can see at + * most one increment of each CPU's counter from this point + * forward. The reason for this is that the reader CPU must have + * fetched the index before srcu_readers_active_idx checked + * that CPU's counter, but not yet incremented its counter. + * Its eventual counter increment will follow the read in + * srcu_readers_active_idx(), and that increment is immediately + * followed by smp_mb() B. Because smp_mb() D is between + * the ->completed flip and srcu_readers_active_idx()'s read, + * that CPU's subsequent load of ->completed must see the new + * value, and therefore increment the counter in the other rank. + */ + smp_mb(); /* A */ + + /* + * Now, we check the ->snap array that srcu_readers_active_idx() + * filled in from the per-CPU counter values. Since both + * __srcu_read_lock() and __srcu_read_unlock() increment the + * upper bits of the per-CPU counter, an increment/decrement + * pair will change the value of the counter. Since there is + * only one possible increment, the only way to wrap the counter + * is to have a huge number of counter decrements, which requires + * a huge number of tasks and huge SRCU read-side critical-section + * nesting levels, even on 32-bit systems. + * + * All of the ways of confusing the readings require that the scan + * in srcu_readers_active_idx() see the read-side task's decrement, + * but not its increment. However, between that decrement and + * increment are smb_mb() B and C. Either or both of these pair + * with smp_mb() A above to ensure that the scan below will see + * the read-side tasks's increment, thus noting a difference in + * the counter values between the two passes. + * + * Therefore, if srcu_readers_active_idx() returned zero, and + * none of the counters changed, we know that the zero was the + * correct sum. + * + * Of course, it is possible that a task might be delayed + * for a very long time in __srcu_read_lock() after fetching + * the index but before incrementing its counter. This + * possibility will be dealt with in __synchronize_srcu(). + */ for_each_possible_cpu(cpu) - sum += per_cpu_ptr(sp->per_cpu_ref, cpu)->c[idx]; - return sum; + if (sp->snap[cpu] != + ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[idx])) + return false; /* False zero reading! */ + return true; } /** @@ -131,10 +214,11 @@ int __srcu_read_lock(struct srcu_struct *sp) int idx; preempt_disable(); - idx = sp->completed & 0x1; - barrier(); /* ensure compiler looks -once- at sp->completed. */ - per_cpu_ptr(sp->per_cpu_ref, smp_processor_id())->c[idx]++; - srcu_barrier(); /* ensure compiler won't misorder critical section. */ + idx = rcu_dereference_index_check(sp->completed, + rcu_read_lock_sched_held()) & 0x1; + ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->c[idx]) += + SRCU_USAGE_COUNT + 1; + smp_mb(); /* B */ /* Avoid leaking the critical section. */ preempt_enable(); return idx; } @@ -149,8 +233,9 @@ EXPORT_SYMBOL_GPL(__srcu_read_lock); void __srcu_read_unlock(struct srcu_struct *sp, int idx) { preempt_disable(); - srcu_barrier(); /* ensure compiler won't misorder critical section. */ - per_cpu_ptr(sp->per_cpu_ref, smp_processor_id())->c[idx]--; + smp_mb(); /* C */ /* Avoid leaking the critical section. */ + ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->c[idx]) += + SRCU_USAGE_COUNT - 1; preempt_enable(); } EXPORT_SYMBOL_GPL(__srcu_read_unlock); @@ -163,12 +248,65 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); * we repeatedly block for 1-millisecond time periods. This approach * has done well in testing, so there is no need for a config parameter. */ -#define SYNCHRONIZE_SRCU_READER_DELAY 10 +#define SYNCHRONIZE_SRCU_READER_DELAY 5 + +/* + * Flip the readers' index by incrementing ->completed, then wait + * until there are no more readers using the counters referenced by + * the old index value. (Recall that the index is the bottom bit + * of ->completed.) + * + * Of course, it is possible that a reader might be delayed for the + * full duration of flip_idx_and_wait() between fetching the + * index and incrementing its counter. This possibility is handled + * by __synchronize_srcu() invoking flip_idx_and_wait() twice. + */ +static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) +{ + int idx; + int trycount = 0; + + idx = sp->completed++ & 0x1; + + /* + * If a reader fetches the index before the above increment, + * but increments its counter after srcu_readers_active_idx_check() + * sums it, then smp_mb() D will pair with __srcu_read_lock()'s + * smp_mb() B to ensure that the SRCU read-side critical section + * will see any updates that the current task performed before its + * call to synchronize_srcu(), or to synchronize_srcu_expedited(), + * as the case may be. + */ + smp_mb(); /* D */ + + /* + * SRCU read-side critical sections are normally short, so wait + * a small amount of time before possibly blocking. + */ + if (!srcu_readers_active_idx_check(sp, idx)) { + udelay(SYNCHRONIZE_SRCU_READER_DELAY); + while (!srcu_readers_active_idx_check(sp, idx)) { + if (expedited && ++ trycount < 10) + udelay(SYNCHRONIZE_SRCU_READER_DELAY); + else + schedule_timeout_interruptible(1); + } + } + + /* + * The following smp_mb() E pairs with srcu_read_unlock()'s + * smp_mb C to ensure that if srcu_readers_active_idx_check() + * sees srcu_read_unlock()'s counter decrement, then any + * of the current task's subsequent code will happen after + * that SRCU read-side critical section. + */ + smp_mb(); /* E */ +} /* * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). */ -static void __synchronize_srcu(struct srcu_struct *sp, void (*sync_func)(void)) +static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) { int idx; @@ -178,90 +316,53 @@ static void __synchronize_srcu(struct srcu_struct *sp, void (*sync_func)(void)) !lock_is_held(&rcu_sched_lock_map), "Illegal synchronize_srcu() in same-type SRCU (or RCU) read-side critical section"); - idx = sp->completed; + smp_mb(); /* Ensure prior action happens before grace period. */ + idx = ACCESS_ONCE(sp->completed); + smp_mb(); /* Access to ->completed before lock acquisition. */ mutex_lock(&sp->mutex); /* * Check to see if someone else did the work for us while we were - * waiting to acquire the lock. We need -two- advances of + * waiting to acquire the lock. We need -three- advances of * the counter, not just one. If there was but one, we might have * shown up -after- our helper's first synchronize_sched(), thus * having failed to prevent CPU-reordering races with concurrent - * srcu_read_unlock()s on other CPUs (see comment below). So we - * either (1) wait for two or (2) supply the second ourselves. + * srcu_read_unlock()s on other CPUs (see comment below). If there + * was only two, we are guaranteed to have waited through only one + * full index-flip phase. So we either (1) wait for three or + * (2) supply the additional ones we need. */ - if ((sp->completed - idx) >= 2) { + if (sp->completed == idx + 2) + idx = 1; + else if (sp->completed == idx + 3) { mutex_unlock(&sp->mutex); return; - } - - sync_func(); /* Force memory barrier on all CPUs. */ + } else + idx = 0; /* - * The preceding synchronize_sched() ensures that any CPU that - * sees the new value of sp->completed will also see any preceding - * changes to data structures made by this CPU. This prevents - * some other CPU from reordering the accesses in its SRCU - * read-side critical section to precede the corresponding - * srcu_read_lock() -- ensuring that such references will in - * fact be protected. + * If there were no helpers, then we need to do two flips of + * the index. The first flip is required if there are any + * outstanding SRCU readers even if there are no new readers + * running concurrently with the first counter flip. * - * So it is now safe to do the flip. - */ - - idx = sp->completed & 0x1; - sp->completed++; - - sync_func(); /* Force memory barrier on all CPUs. */ - - /* - * At this point, because of the preceding synchronize_sched(), - * all srcu_read_lock() calls using the old counters have completed. - * Their corresponding critical sections might well be still - * executing, but the srcu_read_lock() primitives themselves - * will have finished executing. We initially give readers - * an arbitrarily chosen 10 microseconds to get out of their - * SRCU read-side critical sections, then loop waiting 1/HZ - * seconds per iteration. The 10-microsecond value has done - * very well in testing. - */ - - if (srcu_readers_active_idx(sp, idx)) - udelay(SYNCHRONIZE_SRCU_READER_DELAY); - while (srcu_readers_active_idx(sp, idx)) - schedule_timeout_interruptible(1); - - sync_func(); /* Force memory barrier on all CPUs. */ - - /* - * The preceding synchronize_sched() forces all srcu_read_unlock() - * primitives that were executing concurrently with the preceding - * for_each_possible_cpu() loop to have completed by this point. - * More importantly, it also forces the corresponding SRCU read-side - * critical sections to have also completed, and the corresponding - * references to SRCU-protected data items to be dropped. + * The second flip is required when a new reader picks up + * the old value of the index, but does not increment its + * counter until after its counters is summed/rechecked by + * srcu_readers_active_idx_check(). In this case, the current SRCU + * grace period would be OK because the SRCU read-side critical + * section started after this SRCU grace period started, so the + * grace period is not required to wait for the reader. * - * Note: - * - * Despite what you might think at first glance, the - * preceding synchronize_sched() -must- be within the - * critical section ended by the following mutex_unlock(). - * Otherwise, a task taking the early exit can race - * with a srcu_read_unlock(), which might have executed - * just before the preceding srcu_readers_active() check, - * and whose CPU might have reordered the srcu_read_unlock() - * with the preceding critical section. In this case, there - * is nothing preventing the synchronize_sched() task that is - * taking the early exit from freeing a data structure that - * is still being referenced (out of order) by the task - * doing the srcu_read_unlock(). - * - * Alternatively, the comparison with "2" on the early exit - * could be changed to "3", but this increases synchronize_srcu() - * latency for bulk loads. So the current code is preferred. + * However, the next SRCU grace period would be waiting for the + * other set of counters to go to zero, and therefore would not + * wait for the reader, which would be very bad. To avoid this + * bad scenario, we flip and wait twice, clearing out both sets + * of counters. */ - + for (; idx < 2; idx++) + flip_idx_and_wait(sp, expedited); mutex_unlock(&sp->mutex); } @@ -281,7 +382,7 @@ static void __synchronize_srcu(struct srcu_struct *sp, void (*sync_func)(void)) */ void synchronize_srcu(struct srcu_struct *sp) { - __synchronize_srcu(sp, synchronize_sched); + __synchronize_srcu(sp, 0); } EXPORT_SYMBOL_GPL(synchronize_srcu); @@ -289,18 +390,11 @@ EXPORT_SYMBOL_GPL(synchronize_srcu); * synchronize_srcu_expedited - Brute-force SRCU grace period * @sp: srcu_struct with which to synchronize. * - * Wait for an SRCU grace period to elapse, but use a "big hammer" - * approach to force the grace period to end quickly. This consumes - * significant time on all CPUs and is unfriendly to real-time workloads, - * so is thus not recommended for any sort of common-case code. In fact, - * if you are using synchronize_srcu_expedited() in a loop, please - * restructure your code to batch your updates, and then use a single - * synchronize_srcu() instead. + * Wait for an SRCU grace period to elapse, but be more aggressive about + * spinning rather than blocking when waiting. * * Note that it is illegal to call this function while holding any lock - * that is acquired by a CPU-hotplug notifier. And yes, it is also illegal - * to call this function from a CPU-hotplug notifier. Failing to observe - * these restriction will result in deadlock. It is also illegal to call + * that is acquired by a CPU-hotplug notifier. It is also illegal to call * synchronize_srcu_expedited() from the corresponding SRCU read-side * critical section; doing so will result in deadlock. However, it is * perfectly legal to call synchronize_srcu_expedited() on one srcu_struct @@ -309,7 +403,7 @@ EXPORT_SYMBOL_GPL(synchronize_srcu); */ void synchronize_srcu_expedited(struct srcu_struct *sp) { - __synchronize_srcu(sp, synchronize_sched_expedited); + __synchronize_srcu(sp, 1); } EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); -- cgit v1.2.3-70-g09d2 From 4b7a3e9e32114a09c61995048f055615b5d4c26d Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Wed, 22 Feb 2012 13:06:51 -0800 Subject: rcu: Remove fast check path from __synchronize_srcu() The fastpath in __synchronize_srcu() is designed to handle cases where there are a large number of concurrent calls for the same srcu_struct structure. However, the Linux kernel currently does not use SRCU in this manner, so remove the fastpath checks for simplicity. Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney --- kernel/srcu.c | 25 +------------------------ 1 file changed, 1 insertion(+), 24 deletions(-) (limited to 'kernel/srcu.c') diff --git a/kernel/srcu.c b/kernel/srcu.c index 84c9b97dc3d..17e95bcc901 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -308,7 +308,7 @@ static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) */ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) { - int idx; + int idx = 0; rcu_lockdep_assert(!lock_is_held(&sp->dep_map) && !lock_is_held(&rcu_bh_lock_map) && @@ -316,31 +316,8 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) !lock_is_held(&rcu_sched_lock_map), "Illegal synchronize_srcu() in same-type SRCU (or RCU) read-side critical section"); - smp_mb(); /* Ensure prior action happens before grace period. */ - idx = ACCESS_ONCE(sp->completed); - smp_mb(); /* Access to ->completed before lock acquisition. */ mutex_lock(&sp->mutex); - /* - * Check to see if someone else did the work for us while we were - * waiting to acquire the lock. We need -three- advances of - * the counter, not just one. If there was but one, we might have - * shown up -after- our helper's first synchronize_sched(), thus - * having failed to prevent CPU-reordering races with concurrent - * srcu_read_unlock()s on other CPUs (see comment below). If there - * was only two, we are guaranteed to have waited through only one - * full index-flip phase. So we either (1) wait for three or - * (2) supply the additional ones we need. - */ - - if (sp->completed == idx + 2) - idx = 1; - else if (sp->completed == idx + 3) { - mutex_unlock(&sp->mutex); - return; - } else - idx = 0; - /* * If there were no helpers, then we need to do two flips of * the index. The first flip is required if there are any -- cgit v1.2.3-70-g09d2 From 440253c17fc4ed41d778492a7fb44dc0d756eccc Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Wed, 22 Feb 2012 13:29:06 -0800 Subject: rcu: Increment upper bit only for srcu_read_lock() The purpose of the upper bit of SRCU's per-CPU counters is to guarantee that no reasonable series of srcu_read_lock() and srcu_read_unlock() operations can return the value of the counter to its original value. This guarantee is require only after the index has been switched to the other set of counters, so at most one srcu_read_lock() can affect a given CPU's counter. The number of srcu_read_unlock() operations on a given counter is limited to the number of tasks in the system, which given the Linux kernel's current structure is limited to far less than 2^30 on 32-bit systems and far less than 2^62 on 64-bit systems. (Something about a limited number of bytes in the kernel's address space.) Therefore, if srcu_read_lock() increments the upper bits, then srcu_read_unlock() need not do so. In this case, an srcu_read_lock() and an srcu_read_unlock() will flip the lower bit of the upper field of the counter. An unreasonably large additional number of srcu_read_unlock() operations would be required to return the counter to its initial value, thus preserving the guarantee. This commit takes this approach, which further allows it to shrink the size of the upper field to one bit, making the number of srcu_read_unlock() operations required to return the counter to its initial value even more unreasonable than before. Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney --- include/linux/srcu.h | 2 +- kernel/srcu.c | 19 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) (limited to 'kernel/srcu.c') diff --git a/include/linux/srcu.h b/include/linux/srcu.h index a478c8eb847..5b49d41868c 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -35,7 +35,7 @@ struct srcu_struct_array { }; /* Bit definitions for field ->c above and ->snap below. */ -#define SRCU_USAGE_BITS 2 +#define SRCU_USAGE_BITS 1 #define SRCU_REF_MASK (ULONG_MAX >> SRCU_USAGE_BITS) #define SRCU_USAGE_COUNT (SRCU_REF_MASK + 1) diff --git a/kernel/srcu.c b/kernel/srcu.c index 17e95bcc901..43f1d61e513 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -138,14 +138,14 @@ static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx) /* * Now, we check the ->snap array that srcu_readers_active_idx() - * filled in from the per-CPU counter values. Since both - * __srcu_read_lock() and __srcu_read_unlock() increment the - * upper bits of the per-CPU counter, an increment/decrement - * pair will change the value of the counter. Since there is - * only one possible increment, the only way to wrap the counter - * is to have a huge number of counter decrements, which requires - * a huge number of tasks and huge SRCU read-side critical-section - * nesting levels, even on 32-bit systems. + * filled in from the per-CPU counter values. Since + * __srcu_read_lock() increments the upper bits of the per-CPU + * counter, an increment/decrement pair will change the value + * of the counter. Since there is only one possible increment, + * the only way to wrap the counter is to have a huge number of + * counter decrements, which requires a huge number of tasks and + * huge SRCU read-side critical-section nesting levels, even on + * 32-bit systems. * * All of the ways of confusing the readings require that the scan * in srcu_readers_active_idx() see the read-side task's decrement, @@ -234,8 +234,7 @@ void __srcu_read_unlock(struct srcu_struct *sp, int idx) { preempt_disable(); smp_mb(); /* C */ /* Avoid leaking the critical section. */ - ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->c[idx]) += - SRCU_USAGE_COUNT - 1; + ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->c[idx]) -= 1; preempt_enable(); } EXPORT_SYMBOL_GPL(__srcu_read_unlock); -- cgit v1.2.3-70-g09d2 From 944ce9af4767ca085d465e4add69df11a8faa9ef Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Wed, 22 Feb 2012 16:43:55 -0800 Subject: rcu: Flip ->completed only once per SRCU grace period This is an optimization of the SRCU grace period. To guard against preempted readers with old values of the counter, it suffices to scan the old counters once more, then flip ->completed only one time. The reason this works is that the old readers must have incremented the old set of counters (if they have not yet incremented, then their critical section starts after this grace period, so they may be safely ignored). This commit therefore optimizes the second flip out in favor of a simple rescan. Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney --- kernel/srcu.c | 92 ++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 56 insertions(+), 36 deletions(-) (limited to 'kernel/srcu.c') diff --git a/kernel/srcu.c b/kernel/srcu.c index 43f1d61e513..b6b9ea2eb51 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -249,26 +249,12 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); */ #define SYNCHRONIZE_SRCU_READER_DELAY 5 -/* - * Flip the readers' index by incrementing ->completed, then wait - * until there are no more readers using the counters referenced by - * the old index value. (Recall that the index is the bottom bit - * of ->completed.) - * - * Of course, it is possible that a reader might be delayed for the - * full duration of flip_idx_and_wait() between fetching the - * index and incrementing its counter. This possibility is handled - * by __synchronize_srcu() invoking flip_idx_and_wait() twice. - */ -static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) +static void wait_idx(struct srcu_struct *sp, int idx, bool expedited) { - int idx; int trycount = 0; - idx = sp->completed++ & 0x1; - /* - * If a reader fetches the index before the above increment, + * If a reader fetches the index before the ->completed increment, * but increments its counter after srcu_readers_active_idx_check() * sums it, then smp_mb() D will pair with __srcu_read_lock()'s * smp_mb() B to ensure that the SRCU read-side critical section @@ -298,17 +284,38 @@ static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) * sees srcu_read_unlock()'s counter decrement, then any * of the current task's subsequent code will happen after * that SRCU read-side critical section. + * + * It also ensures the order between the above waiting and + * the next flipping. */ smp_mb(); /* E */ } +/* + * Flip the readers' index by incrementing ->completed, then wait + * until there are no more readers using the counters referenced by + * the old index value. (Recall that the index is the bottom bit + * of ->completed.) + * + * Of course, it is possible that a reader might be delayed for the + * full duration of flip_idx_and_wait() between fetching the + * index and incrementing its counter. This possibility is handled + * by the next __synchronize_srcu() invoking wait_idx() for such readers + * before starting a new grace period. + */ +static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) +{ + int idx; + + idx = sp->completed++ & 0x1; + wait_idx(sp, idx, expedited); +} + /* * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). */ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) { - int idx = 0; - rcu_lockdep_assert(!lock_is_held(&sp->dep_map) && !lock_is_held(&rcu_bh_lock_map) && !lock_is_held(&rcu_lock_map) && @@ -318,27 +325,40 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) mutex_lock(&sp->mutex); /* - * If there were no helpers, then we need to do two flips of - * the index. The first flip is required if there are any - * outstanding SRCU readers even if there are no new readers - * running concurrently with the first counter flip. + * Suppose that during the previous grace period, a reader + * picked up the old value of the index, but did not increment + * its counter until after the previous instance of + * __synchronize_srcu() did the counter summation and recheck. + * That previous grace period was OK because the reader did + * not start until after the grace period started, so the grace + * period was not obligated to wait for that reader. + * + * However, the current SRCU grace period does have to wait for + * that reader. This is handled by invoking wait_idx() on the + * non-active set of counters (hence sp->completed - 1). Once + * wait_idx() returns, we know that all readers that picked up + * the old value of ->completed and that already incremented their + * counter will have completed. * - * The second flip is required when a new reader picks up - * the old value of the index, but does not increment its - * counter until after its counters is summed/rechecked by - * srcu_readers_active_idx_check(). In this case, the current SRCU - * grace period would be OK because the SRCU read-side critical - * section started after this SRCU grace period started, so the - * grace period is not required to wait for the reader. + * But what about readers that picked up the old value of + * ->completed, but -still- have not managed to increment their + * counter? We do not need to wait for those readers, because + * they will have started their SRCU read-side critical section + * after the current grace period starts. * - * However, the next SRCU grace period would be waiting for the - * other set of counters to go to zero, and therefore would not - * wait for the reader, which would be very bad. To avoid this - * bad scenario, we flip and wait twice, clearing out both sets - * of counters. + * Because it is unlikely that readers will be preempted between + * fetching ->completed and incrementing their counter, wait_idx() + * will normally not need to wait. */ - for (; idx < 2; idx++) - flip_idx_and_wait(sp, expedited); + wait_idx(sp, (sp->completed - 1) & 0x1, expedited); + + /* + * Now that wait_idx() has waited for the really old readers, + * invoke flip_idx_and_wait() to flip the counter and wait + * for current SRCU readers. + */ + flip_idx_and_wait(sp, expedited); + mutex_unlock(&sp->mutex); } -- cgit v1.2.3-70-g09d2 From 18108ebfebe9e871d0a9af830baf8f5df69eb5fc Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Mon, 27 Feb 2012 09:28:10 -0800 Subject: rcu: Improve SRCU's wait_idx() comments The safety of SRCU is provided byy wait_idx() rather than flipping. The flipping actually prevents starvation. This commit therefore updates the comments to more accurately and precisely describe what is going on. Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney --- kernel/srcu.c | 77 ++++++++++++++++++++++++++++------------------------------- 1 file changed, 37 insertions(+), 40 deletions(-) (limited to 'kernel/srcu.c') diff --git a/kernel/srcu.c b/kernel/srcu.c index b6b9ea2eb51..1fecb4d858e 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -249,6 +249,10 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); */ #define SYNCHRONIZE_SRCU_READER_DELAY 5 +/* + * Wait until all pre-existing readers complete. Such readers + * will have used the index specified by "idx". + */ static void wait_idx(struct srcu_struct *sp, int idx, bool expedited) { int trycount = 0; @@ -291,24 +295,9 @@ static void wait_idx(struct srcu_struct *sp, int idx, bool expedited) smp_mb(); /* E */ } -/* - * Flip the readers' index by incrementing ->completed, then wait - * until there are no more readers using the counters referenced by - * the old index value. (Recall that the index is the bottom bit - * of ->completed.) - * - * Of course, it is possible that a reader might be delayed for the - * full duration of flip_idx_and_wait() between fetching the - * index and incrementing its counter. This possibility is handled - * by the next __synchronize_srcu() invoking wait_idx() for such readers - * before starting a new grace period. - */ -static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) +static void srcu_flip(struct srcu_struct *sp) { - int idx; - - idx = sp->completed++ & 0x1; - wait_idx(sp, idx, expedited); + sp->completed++; } /* @@ -316,6 +305,8 @@ static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) */ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) { + int busy_idx; + rcu_lockdep_assert(!lock_is_held(&sp->dep_map) && !lock_is_held(&rcu_bh_lock_map) && !lock_is_held(&rcu_lock_map) && @@ -323,8 +314,28 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) "Illegal synchronize_srcu() in same-type SRCU (or RCU) read-side critical section"); mutex_lock(&sp->mutex); + busy_idx = sp->completed & 0X1UL; /* + * If we recently flipped the index, there will be some readers + * using idx=0 and others using idx=1. Therefore, two calls to + * wait_idx()s suffice to ensure that all pre-existing readers + * have completed: + * + * __synchronize_srcu() { + * wait_idx(sp, 0, expedited); + * wait_idx(sp, 1, expedited); + * } + * + * Starvation is prevented by the fact that we flip the index. + * While we wait on one index to clear out, almost all new readers + * will be using the other index. The number of new readers using the + * index we are waiting on is sharply bounded by roughly the number + * of CPUs. + * + * How can new readers possibly using the old pre-flip value of + * the index? Consider the following sequence of events: + * * Suppose that during the previous grace period, a reader * picked up the old value of the index, but did not increment * its counter until after the previous instance of @@ -333,31 +344,17 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) * not start until after the grace period started, so the grace * period was not obligated to wait for that reader. * - * However, the current SRCU grace period does have to wait for - * that reader. This is handled by invoking wait_idx() on the - * non-active set of counters (hence sp->completed - 1). Once - * wait_idx() returns, we know that all readers that picked up - * the old value of ->completed and that already incremented their - * counter will have completed. - * - * But what about readers that picked up the old value of - * ->completed, but -still- have not managed to increment their - * counter? We do not need to wait for those readers, because - * they will have started their SRCU read-side critical section - * after the current grace period starts. - * - * Because it is unlikely that readers will be preempted between - * fetching ->completed and incrementing their counter, wait_idx() - * will normally not need to wait. + * However, this sequence of events is quite improbable, so + * this call to wait_idx(), which waits on really old readers + * describe in this comment above, will almost never need to wait. */ - wait_idx(sp, (sp->completed - 1) & 0x1, expedited); + wait_idx(sp, 1 - busy_idx, expedited); - /* - * Now that wait_idx() has waited for the really old readers, - * invoke flip_idx_and_wait() to flip the counter and wait - * for current SRCU readers. - */ - flip_idx_and_wait(sp, expedited); + /* Flip the index to avoid reader-induced starvation. */ + srcu_flip(sp); + + /* Wait for recent pre-existing readers. */ + wait_idx(sp, busy_idx, expedited); mutex_unlock(&sp->mutex); } -- cgit v1.2.3-70-g09d2 From b52ce066c55a6a53cf1f8d71308d74f908e31b99 Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Mon, 27 Feb 2012 09:29:09 -0800 Subject: rcu: Implement a variant of Peter's SRCU algorithm This commit implements a variant of Peter's algorithm, which may be found at https://lkml.org/lkml/2012/2/1/119. o Make the checking lock-free to enable parallel checking. Parallel checking is required when (1) the original checking task is preempted for a long time, (2) sychronize_srcu_expedited() starts during an ongoing SRCU grace period, or (3) we wish to avoid acquiring a lock. o Since the checking is lock-free, we avoid a mutex in state machine for call_srcu(). o Remove the SRCU_REF_MASK and remove the coupling with the flipping. This might allow us to remove the preempt_disable() in future versions, though such removal will need great care because it rescinds the one-old-reader-per-CPU guarantee. o Remove a smp_mb(), simplify the comments and make the smp_mb() pairs more intuitive. Inspired-by: Peter Zijlstra Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney --- include/linux/srcu.h | 7 +-- kernel/srcu.c | 149 ++++++++++++++++++++++++--------------------------- 2 files changed, 70 insertions(+), 86 deletions(-) (limited to 'kernel/srcu.c') diff --git a/include/linux/srcu.h b/include/linux/srcu.h index 5b49d41868c..15354db3e86 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -32,18 +32,13 @@ struct srcu_struct_array { unsigned long c[2]; + unsigned long seq[2]; }; -/* Bit definitions for field ->c above and ->snap below. */ -#define SRCU_USAGE_BITS 1 -#define SRCU_REF_MASK (ULONG_MAX >> SRCU_USAGE_BITS) -#define SRCU_USAGE_COUNT (SRCU_REF_MASK + 1) - struct srcu_struct { unsigned completed; struct srcu_struct_array __percpu *per_cpu_ref; struct mutex mutex; - unsigned long snap[NR_CPUS]; #ifdef CONFIG_DEBUG_LOCK_ALLOC struct lockdep_map dep_map; #endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ diff --git a/kernel/srcu.c b/kernel/srcu.c index 1fecb4d858e..e0139a27485 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -72,11 +72,26 @@ EXPORT_SYMBOL_GPL(init_srcu_struct); #endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */ +/* + * Returns approximate total of the readers' ->seq[] values for the + * rank of per-CPU counters specified by idx. + */ +static unsigned long srcu_readers_seq_idx(struct srcu_struct *sp, int idx) +{ + int cpu; + unsigned long sum = 0; + unsigned long t; + + for_each_possible_cpu(cpu) { + t = ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->seq[idx]); + sum += t; + } + return sum; +} + /* * Returns approximate number of readers active on the specified rank - * of per-CPU counters. Also snapshots each counter's value in the - * corresponding element of sp->snap[] for later use validating - * the sum. + * of the per-CPU ->c[] counters. */ static unsigned long srcu_readers_active_idx(struct srcu_struct *sp, int idx) { @@ -87,26 +102,45 @@ static unsigned long srcu_readers_active_idx(struct srcu_struct *sp, int idx) for_each_possible_cpu(cpu) { t = ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[idx]); sum += t; - sp->snap[cpu] = t; } - return sum & SRCU_REF_MASK; + return sum; } /* - * To be called from the update side after an index flip. Returns true - * if the modulo sum of the counters is stably zero, false if there is - * some possibility of non-zero. + * Return true if the number of pre-existing readers is determined to + * be stably zero. An example unstable zero can occur if the call + * to srcu_readers_active_idx() misses an __srcu_read_lock() increment, + * but due to task migration, sees the corresponding __srcu_read_unlock() + * decrement. This can happen because srcu_readers_active_idx() takes + * time to sum the array, and might in fact be interrupted or preempted + * partway through the summation. */ static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx) { - int cpu; + unsigned long seq; + + seq = srcu_readers_seq_idx(sp, idx); + + /* + * The following smp_mb() A pairs with the smp_mb() B located in + * __srcu_read_lock(). This pairing ensures that if an + * __srcu_read_lock() increments its counter after the summation + * in srcu_readers_active_idx(), then the corresponding SRCU read-side + * critical section will see any changes made prior to the start + * of the current SRCU grace period. + * + * Also, if the above call to srcu_readers_seq_idx() saw the + * increment of ->seq[], then the call to srcu_readers_active_idx() + * must see the increment of ->c[]. + */ + smp_mb(); /* A */ /* * Note that srcu_readers_active_idx() can incorrectly return * zero even though there is a pre-existing reader throughout. * To see this, suppose that task A is in a very long SRCU * read-side critical section that started on CPU 0, and that - * no other reader exists, so that the modulo sum of the counters + * no other reader exists, so that the sum of the counters * is equal to one. Then suppose that task B starts executing * srcu_readers_active_idx(), summing up to CPU 1, and then that * task C starts reading on CPU 0, so that its increment is not @@ -122,53 +156,31 @@ static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx) return false; /* - * Since the caller recently flipped ->completed, we can see at - * most one increment of each CPU's counter from this point - * forward. The reason for this is that the reader CPU must have - * fetched the index before srcu_readers_active_idx checked - * that CPU's counter, but not yet incremented its counter. - * Its eventual counter increment will follow the read in - * srcu_readers_active_idx(), and that increment is immediately - * followed by smp_mb() B. Because smp_mb() D is between - * the ->completed flip and srcu_readers_active_idx()'s read, - * that CPU's subsequent load of ->completed must see the new - * value, and therefore increment the counter in the other rank. - */ - smp_mb(); /* A */ - - /* - * Now, we check the ->snap array that srcu_readers_active_idx() - * filled in from the per-CPU counter values. Since - * __srcu_read_lock() increments the upper bits of the per-CPU - * counter, an increment/decrement pair will change the value - * of the counter. Since there is only one possible increment, - * the only way to wrap the counter is to have a huge number of - * counter decrements, which requires a huge number of tasks and - * huge SRCU read-side critical-section nesting levels, even on - * 32-bit systems. + * The remainder of this function is the validation step. + * The following smp_mb() D pairs with the smp_mb() C in + * __srcu_read_unlock(). If the __srcu_read_unlock() was seen + * by srcu_readers_active_idx() above, then any destructive + * operation performed after the grace period will happen after + * the corresponding SRCU read-side critical section. * - * All of the ways of confusing the readings require that the scan - * in srcu_readers_active_idx() see the read-side task's decrement, - * but not its increment. However, between that decrement and - * increment are smb_mb() B and C. Either or both of these pair - * with smp_mb() A above to ensure that the scan below will see - * the read-side tasks's increment, thus noting a difference in - * the counter values between the two passes. - * - * Therefore, if srcu_readers_active_idx() returned zero, and - * none of the counters changed, we know that the zero was the - * correct sum. - * - * Of course, it is possible that a task might be delayed - * for a very long time in __srcu_read_lock() after fetching - * the index but before incrementing its counter. This - * possibility will be dealt with in __synchronize_srcu(). + * Note that there can be at most NR_CPUS worth of readers using + * the old index, which is not enough to overflow even a 32-bit + * integer. (Yes, this does mean that systems having more than + * a billion or so CPUs need to be 64-bit systems.) Therefore, + * the sum of the ->seq[] counters cannot possibly overflow. + * Therefore, the only way that the return values of the two + * calls to srcu_readers_seq_idx() can be equal is if there were + * no increments of the corresponding rank of ->seq[] counts + * in the interim. But the missed-increment scenario laid out + * above includes an increment of the ->seq[] counter by + * the corresponding __srcu_read_lock(). Therefore, if this + * scenario occurs, the return values from the two calls to + * srcu_readers_seq_idx() will differ, and thus the validation + * step below suffices. */ - for_each_possible_cpu(cpu) - if (sp->snap[cpu] != - ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[idx])) - return false; /* False zero reading! */ - return true; + smp_mb(); /* D */ + + return srcu_readers_seq_idx(sp, idx) == seq; } /** @@ -216,9 +228,9 @@ int __srcu_read_lock(struct srcu_struct *sp) preempt_disable(); idx = rcu_dereference_index_check(sp->completed, rcu_read_lock_sched_held()) & 0x1; - ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->c[idx]) += - SRCU_USAGE_COUNT + 1; + ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->c[idx]) += 1; smp_mb(); /* B */ /* Avoid leaking the critical section. */ + ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->seq[idx]) += 1; preempt_enable(); return idx; } @@ -257,17 +269,6 @@ static void wait_idx(struct srcu_struct *sp, int idx, bool expedited) { int trycount = 0; - /* - * If a reader fetches the index before the ->completed increment, - * but increments its counter after srcu_readers_active_idx_check() - * sums it, then smp_mb() D will pair with __srcu_read_lock()'s - * smp_mb() B to ensure that the SRCU read-side critical section - * will see any updates that the current task performed before its - * call to synchronize_srcu(), or to synchronize_srcu_expedited(), - * as the case may be. - */ - smp_mb(); /* D */ - /* * SRCU read-side critical sections are normally short, so wait * a small amount of time before possibly blocking. @@ -281,18 +282,6 @@ static void wait_idx(struct srcu_struct *sp, int idx, bool expedited) schedule_timeout_interruptible(1); } } - - /* - * The following smp_mb() E pairs with srcu_read_unlock()'s - * smp_mb C to ensure that if srcu_readers_active_idx_check() - * sees srcu_read_unlock()'s counter decrement, then any - * of the current task's subsequent code will happen after - * that SRCU read-side critical section. - * - * It also ensures the order between the above waiting and - * the next flipping. - */ - smp_mb(); /* E */ } static void srcu_flip(struct srcu_struct *sp) -- cgit v1.2.3-70-g09d2 From dc87917501e324701dbfb249def44054b5220187 Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Tue, 6 Mar 2012 17:57:34 +0800 Subject: rcu: Improve srcu_readers_active_idx()'s cache locality Expand the calls to srcu_readers_active_idx() from srcu_readers_active() inline. This change improves cache locality by interating over the CPUs once rather than twice. Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney --- kernel/srcu.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'kernel/srcu.c') diff --git a/kernel/srcu.c b/kernel/srcu.c index e0139a27485..a43211c9286 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -193,7 +193,14 @@ static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx) */ static int srcu_readers_active(struct srcu_struct *sp) { - return srcu_readers_active_idx(sp, 0) + srcu_readers_active_idx(sp, 1); + int cpu; + unsigned long sum = 0; + + for_each_possible_cpu(cpu) { + sum += ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[0]); + sum += ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[1]); + } + return sum; } /** -- cgit v1.2.3-70-g09d2 From d9792edd7a9a0858a3b1df92cf8beb31e4191e3c Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Mon, 19 Mar 2012 16:12:12 +0800 Subject: rcu: Use single value to handle expedited SRCU grace periods The earlier algorithm used an "expedited" flag combined with a "trycount" counter to differentiate between normal and expedited SRCU grace periods. However, the difference can be encoded into a single counter with a cutoff value and different initial values for expedited and normal SRCU grace periods. This commit makes that change. Signed-off-by: Lai Jiangshan Signed-off-by: Paul E. McKenney Conflicts: kernel/srcu.c --- kernel/srcu.c | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) (limited to 'kernel/srcu.c') diff --git a/kernel/srcu.c b/kernel/srcu.c index a43211c9286..b9088524935 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -266,16 +266,16 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); * we repeatedly block for 1-millisecond time periods. This approach * has done well in testing, so there is no need for a config parameter. */ -#define SYNCHRONIZE_SRCU_READER_DELAY 5 +#define SYNCHRONIZE_SRCU_READER_DELAY 5 +#define SYNCHRONIZE_SRCU_TRYCOUNT 2 +#define SYNCHRONIZE_SRCU_EXP_TRYCOUNT 12 /* * Wait until all pre-existing readers complete. Such readers * will have used the index specified by "idx". */ -static void wait_idx(struct srcu_struct *sp, int idx, bool expedited) +static void wait_idx(struct srcu_struct *sp, int idx, int trycount) { - int trycount = 0; - /* * SRCU read-side critical sections are normally short, so wait * a small amount of time before possibly blocking. @@ -283,9 +283,10 @@ static void wait_idx(struct srcu_struct *sp, int idx, bool expedited) if (!srcu_readers_active_idx_check(sp, idx)) { udelay(SYNCHRONIZE_SRCU_READER_DELAY); while (!srcu_readers_active_idx_check(sp, idx)) { - if (expedited && ++ trycount < 10) + if (trycount > 0) { + trycount--; udelay(SYNCHRONIZE_SRCU_READER_DELAY); - else + } else schedule_timeout_interruptible(1); } } @@ -299,7 +300,7 @@ static void srcu_flip(struct srcu_struct *sp) /* * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). */ -static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) +static void __synchronize_srcu(struct srcu_struct *sp, int trycount) { int busy_idx; @@ -319,8 +320,8 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) * have completed: * * __synchronize_srcu() { - * wait_idx(sp, 0, expedited); - * wait_idx(sp, 1, expedited); + * wait_idx(sp, 0, trycount); + * wait_idx(sp, 1, trycount); * } * * Starvation is prevented by the fact that we flip the index. @@ -344,13 +345,13 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) * this call to wait_idx(), which waits on really old readers * describe in this comment above, will almost never need to wait. */ - wait_idx(sp, 1 - busy_idx, expedited); + wait_idx(sp, 1 - busy_idx, trycount); /* Flip the index to avoid reader-induced starvation. */ srcu_flip(sp); /* Wait for recent pre-existing readers. */ - wait_idx(sp, busy_idx, expedited); + wait_idx(sp, busy_idx, trycount); mutex_unlock(&sp->mutex); } @@ -371,7 +372,7 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) */ void synchronize_srcu(struct srcu_struct *sp) { - __synchronize_srcu(sp, 0); + __synchronize_srcu(sp, SYNCHRONIZE_SRCU_TRYCOUNT); } EXPORT_SYMBOL_GPL(synchronize_srcu); @@ -392,7 +393,7 @@ EXPORT_SYMBOL_GPL(synchronize_srcu); */ void synchronize_srcu_expedited(struct srcu_struct *sp) { - __synchronize_srcu(sp, 1); + __synchronize_srcu(sp, SYNCHRONIZE_SRCU_EXP_TRYCOUNT); } EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); -- cgit v1.2.3-70-g09d2 From 931ea9d1a6e06a5e3af03aa4aaaa7c7fd90e163f Mon Sep 17 00:00:00 2001 From: Lai Jiangshan Date: Mon, 19 Mar 2012 16:12:13 +0800 Subject: rcu: Implement per-domain single-threaded call_srcu() state machine This commit implements an SRCU state machine in support of call_srcu(). The state machine is preemptible, light-weight, and single-threaded, minimizing synchronization overhead. In particular, there is no longer any need for synchronize_srcu() to be guarded by a mutex. Expedited processing is handled, at least in the absence of concurrent grace-period operations on that same srcu_struct structure, by having the synchronize_srcu_expedited() thread take on the role of the workqueue thread for one iteration. There is a reasonable probability that a given SRCU callback will be invoked on the same CPU that registered it, however, there is no guarantee. Concurrent SRCU grace-period primitives can cause callbacks to be executed elsewhere, even in absence of CPU-hotplug operations. Callbacks execute in process context, but under the influence of local_bh_disable(), so it is illegal to sleep in an SRCU callback function. Signed-off-by: Lai Jiangshan Acked-by: Peter Zijlstra Signed-off-by: Paul E. McKenney --- include/linux/srcu.h | 37 +++++- kernel/srcu.c | 362 ++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 336 insertions(+), 63 deletions(-) (limited to 'kernel/srcu.c') diff --git a/include/linux/srcu.h b/include/linux/srcu.h index e5ce80452b6..55a5c52cbb2 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -29,16 +29,30 @@ #include #include +#include struct srcu_struct_array { unsigned long c[2]; unsigned long seq[2]; }; +struct rcu_batch { + struct rcu_head *head, **tail; +}; + struct srcu_struct { unsigned completed; struct srcu_struct_array __percpu *per_cpu_ref; - struct mutex mutex; + spinlock_t queue_lock; /* protect ->batch_queue, ->running */ + bool running; + /* callbacks just queued */ + struct rcu_batch batch_queue; + /* callbacks try to do the first check_zero */ + struct rcu_batch batch_check0; + /* callbacks done with the first check_zero and the flip */ + struct rcu_batch batch_check1; + struct rcu_batch batch_done; + struct delayed_work work; #ifdef CONFIG_DEBUG_LOCK_ALLOC struct lockdep_map dep_map; #endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ @@ -62,12 +76,33 @@ int init_srcu_struct(struct srcu_struct *sp); #endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */ +/** + * call_srcu() - Queue a callback for invocation after an SRCU grace period + * @sp: srcu_struct in queue the callback + * @head: structure to be used for queueing the SRCU callback. + * @func: function to be invoked after the SRCU grace period + * + * The callback function will be invoked some time after a full SRCU + * grace period elapses, in other words after all pre-existing SRCU + * read-side critical sections have completed. However, the callback + * function might well execute concurrently with other SRCU read-side + * critical sections that started after call_srcu() was invoked. SRCU + * read-side critical sections are delimited by srcu_read_lock() and + * srcu_read_unlock(), and may be nested. + * + * The callback will be invoked from process context, but must nevertheless + * be fast and must not block. + */ +void call_srcu(struct srcu_struct *sp, struct rcu_head *head, + void (*func)(struct rcu_head *head)); + void cleanup_srcu_struct(struct srcu_struct *sp); int __srcu_read_lock(struct srcu_struct *sp) __acquires(sp); void __srcu_read_unlock(struct srcu_struct *sp, int idx) __releases(sp); void synchronize_srcu(struct srcu_struct *sp); void synchronize_srcu_expedited(struct srcu_struct *sp); long srcu_batches_completed(struct srcu_struct *sp); +void srcu_barrier(struct srcu_struct *sp); #ifdef CONFIG_DEBUG_LOCK_ALLOC diff --git a/kernel/srcu.c b/kernel/srcu.c index b9088524935..2095be3318d 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -34,10 +34,77 @@ #include #include +/* + * Initialize an rcu_batch structure to empty. + */ +static inline void rcu_batch_init(struct rcu_batch *b) +{ + b->head = NULL; + b->tail = &b->head; +} + +/* + * Enqueue a callback onto the tail of the specified rcu_batch structure. + */ +static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head) +{ + *b->tail = head; + b->tail = &head->next; +} + +/* + * Is the specified rcu_batch structure empty? + */ +static inline bool rcu_batch_empty(struct rcu_batch *b) +{ + return b->tail == &b->head; +} + +/* + * Remove the callback at the head of the specified rcu_batch structure + * and return a pointer to it, or return NULL if the structure is empty. + */ +static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b) +{ + struct rcu_head *head; + + if (rcu_batch_empty(b)) + return NULL; + + head = b->head; + b->head = head->next; + if (b->tail == &head->next) + rcu_batch_init(b); + + return head; +} + +/* + * Move all callbacks from the rcu_batch structure specified by "from" to + * the structure specified by "to". + */ +static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from) +{ + if (!rcu_batch_empty(from)) { + *to->tail = from->head; + to->tail = from->tail; + rcu_batch_init(from); + } +} + +/* single-thread state-machine */ +static void process_srcu(struct work_struct *work); + static int init_srcu_struct_fields(struct srcu_struct *sp) { sp->completed = 0; - mutex_init(&sp->mutex); + spin_lock_init(&sp->queue_lock); + sp->running = false; + rcu_batch_init(&sp->batch_queue); + rcu_batch_init(&sp->batch_check0); + rcu_batch_init(&sp->batch_check1); + rcu_batch_init(&sp->batch_done); + INIT_DELAYED_WORK(&sp->work, process_srcu); sp->per_cpu_ref = alloc_percpu(struct srcu_struct_array); return sp->per_cpu_ref ? 0 : -ENOMEM; } @@ -266,43 +333,86 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); * we repeatedly block for 1-millisecond time periods. This approach * has done well in testing, so there is no need for a config parameter. */ -#define SYNCHRONIZE_SRCU_READER_DELAY 5 +#define SRCU_RETRY_CHECK_DELAY 5 #define SYNCHRONIZE_SRCU_TRYCOUNT 2 #define SYNCHRONIZE_SRCU_EXP_TRYCOUNT 12 /* - * Wait until all pre-existing readers complete. Such readers + * @@@ Wait until all pre-existing readers complete. Such readers * will have used the index specified by "idx". + * the caller should ensures the ->completed is not changed while checking + * and idx = (->completed & 1) ^ 1 */ -static void wait_idx(struct srcu_struct *sp, int idx, int trycount) +static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount) { - /* - * SRCU read-side critical sections are normally short, so wait - * a small amount of time before possibly blocking. - */ - if (!srcu_readers_active_idx_check(sp, idx)) { - udelay(SYNCHRONIZE_SRCU_READER_DELAY); - while (!srcu_readers_active_idx_check(sp, idx)) { - if (trycount > 0) { - trycount--; - udelay(SYNCHRONIZE_SRCU_READER_DELAY); - } else - schedule_timeout_interruptible(1); - } + for (;;) { + if (srcu_readers_active_idx_check(sp, idx)) + return true; + if (--trycount <= 0) + return false; + udelay(SRCU_RETRY_CHECK_DELAY); } } +/* + * Increment the ->completed counter so that future SRCU readers will + * use the other rank of the ->c[] and ->seq[] arrays. This allows + * us to wait for pre-existing readers in a starvation-free manner. + */ static void srcu_flip(struct srcu_struct *sp) { sp->completed++; } +/* + * Enqueue an SRCU callback on the specified srcu_struct structure, + * initiating grace-period processing if it is not already running. + */ +void call_srcu(struct srcu_struct *sp, struct rcu_head *head, + void (*func)(struct rcu_head *head)) +{ + unsigned long flags; + + head->next = NULL; + head->func = func; + spin_lock_irqsave(&sp->queue_lock, flags); + rcu_batch_queue(&sp->batch_queue, head); + if (!sp->running) { + sp->running = true; + queue_delayed_work(system_nrt_wq, &sp->work, 0); + } + spin_unlock_irqrestore(&sp->queue_lock, flags); +} +EXPORT_SYMBOL_GPL(call_srcu); + +struct rcu_synchronize { + struct rcu_head head; + struct completion completion; +}; + +/* + * Awaken the corresponding synchronize_srcu() instance now that a + * grace period has elapsed. + */ +static void wakeme_after_rcu(struct rcu_head *head) +{ + struct rcu_synchronize *rcu; + + rcu = container_of(head, struct rcu_synchronize, head); + complete(&rcu->completion); +} + +static void srcu_advance_batches(struct srcu_struct *sp, int trycount); +static void srcu_reschedule(struct srcu_struct *sp); + /* * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). */ static void __synchronize_srcu(struct srcu_struct *sp, int trycount) { - int busy_idx; + struct rcu_synchronize rcu; + struct rcu_head *head = &rcu.head; + bool done = false; rcu_lockdep_assert(!lock_is_held(&sp->dep_map) && !lock_is_held(&rcu_bh_lock_map) && @@ -310,50 +420,32 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount) !lock_is_held(&rcu_sched_lock_map), "Illegal synchronize_srcu() in same-type SRCU (or RCU) read-side critical section"); - mutex_lock(&sp->mutex); - busy_idx = sp->completed & 0X1UL; - - /* - * If we recently flipped the index, there will be some readers - * using idx=0 and others using idx=1. Therefore, two calls to - * wait_idx()s suffice to ensure that all pre-existing readers - * have completed: - * - * __synchronize_srcu() { - * wait_idx(sp, 0, trycount); - * wait_idx(sp, 1, trycount); - * } - * - * Starvation is prevented by the fact that we flip the index. - * While we wait on one index to clear out, almost all new readers - * will be using the other index. The number of new readers using the - * index we are waiting on is sharply bounded by roughly the number - * of CPUs. - * - * How can new readers possibly using the old pre-flip value of - * the index? Consider the following sequence of events: - * - * Suppose that during the previous grace period, a reader - * picked up the old value of the index, but did not increment - * its counter until after the previous instance of - * __synchronize_srcu() did the counter summation and recheck. - * That previous grace period was OK because the reader did - * not start until after the grace period started, so the grace - * period was not obligated to wait for that reader. - * - * However, this sequence of events is quite improbable, so - * this call to wait_idx(), which waits on really old readers - * describe in this comment above, will almost never need to wait. - */ - wait_idx(sp, 1 - busy_idx, trycount); - - /* Flip the index to avoid reader-induced starvation. */ - srcu_flip(sp); - - /* Wait for recent pre-existing readers. */ - wait_idx(sp, busy_idx, trycount); + init_completion(&rcu.completion); + + head->next = NULL; + head->func = wakeme_after_rcu; + spin_lock_irq(&sp->queue_lock); + if (!sp->running) { + /* steal the processing owner */ + sp->running = true; + rcu_batch_queue(&sp->batch_check0, head); + spin_unlock_irq(&sp->queue_lock); + + srcu_advance_batches(sp, trycount); + if (!rcu_batch_empty(&sp->batch_done)) { + BUG_ON(sp->batch_done.head != head); + rcu_batch_dequeue(&sp->batch_done); + done = true; + } + /* give the processing owner to work_struct */ + srcu_reschedule(sp); + } else { + rcu_batch_queue(&sp->batch_queue, head); + spin_unlock_irq(&sp->queue_lock); + } - mutex_unlock(&sp->mutex); + if (!done) + wait_for_completion(&rcu.completion); } /** @@ -397,6 +489,15 @@ void synchronize_srcu_expedited(struct srcu_struct *sp) } EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); +/** + * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete. + */ +void srcu_barrier(struct srcu_struct *sp) +{ + synchronize_srcu(sp); +} +EXPORT_SYMBOL_GPL(srcu_barrier); + /** * srcu_batches_completed - return batches completed. * @sp: srcu_struct on which to report batch completion. @@ -404,9 +505,146 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); * Report the number of batches, correlated with, but not necessarily * precisely the same as, the number of grace periods that have elapsed. */ - long srcu_batches_completed(struct srcu_struct *sp) { return sp->completed; } EXPORT_SYMBOL_GPL(srcu_batches_completed); + +#define SRCU_CALLBACK_BATCH 10 +#define SRCU_INTERVAL 1 + +/* + * Move any new SRCU callbacks to the first stage of the SRCU grace + * period pipeline. + */ +static void srcu_collect_new(struct srcu_struct *sp) +{ + if (!rcu_batch_empty(&sp->batch_queue)) { + spin_lock_irq(&sp->queue_lock); + rcu_batch_move(&sp->batch_check0, &sp->batch_queue); + spin_unlock_irq(&sp->queue_lock); + } +} + +/* + * Core SRCU state machine. Advance callbacks from ->batch_check0 to + * ->batch_check1 and then to ->batch_done as readers drain. + */ +static void srcu_advance_batches(struct srcu_struct *sp, int trycount) +{ + int idx = 1 ^ (sp->completed & 1); + + /* + * Because readers might be delayed for an extended period after + * fetching ->completed for their index, at any point in time there + * might well be readers using both idx=0 and idx=1. We therefore + * need to wait for readers to clear from both index values before + * invoking a callback. + */ + + if (rcu_batch_empty(&sp->batch_check0) && + rcu_batch_empty(&sp->batch_check1)) + return; /* no callbacks need to be advanced */ + + if (!try_check_zero(sp, idx, trycount)) + return; /* failed to advance, will try after SRCU_INTERVAL */ + + /* + * The callbacks in ->batch_check1 have already done with their + * first zero check and flip back when they were enqueued on + * ->batch_check0 in a previous invocation of srcu_advance_batches(). + * (Presumably try_check_zero() returned false during that + * invocation, leaving the callbacks stranded on ->batch_check1.) + * They are therefore ready to invoke, so move them to ->batch_done. + */ + rcu_batch_move(&sp->batch_done, &sp->batch_check1); + + if (rcu_batch_empty(&sp->batch_check0)) + return; /* no callbacks need to be advanced */ + srcu_flip(sp); + + /* + * The callbacks in ->batch_check0 just finished their + * first check zero and flip, so move them to ->batch_check1 + * for future checking on the other idx. + */ + rcu_batch_move(&sp->batch_check1, &sp->batch_check0); + + /* + * SRCU read-side critical sections are normally short, so check + * at least twice in quick succession after a flip. + */ + trycount = trycount < 2 ? 2 : trycount; + if (!try_check_zero(sp, idx^1, trycount)) + return; /* failed to advance, will try after SRCU_INTERVAL */ + + /* + * The callbacks in ->batch_check1 have now waited for all + * pre-existing readers using both idx values. They are therefore + * ready to invoke, so move them to ->batch_done. + */ + rcu_batch_move(&sp->batch_done, &sp->batch_check1); +} + +/* + * Invoke a limited number of SRCU callbacks that have passed through + * their grace period. If there are more to do, SRCU will reschedule + * the workqueue. + */ +static void srcu_invoke_callbacks(struct srcu_struct *sp) +{ + int i; + struct rcu_head *head; + + for (i = 0; i < SRCU_CALLBACK_BATCH; i++) { + head = rcu_batch_dequeue(&sp->batch_done); + if (!head) + break; + local_bh_disable(); + head->func(head); + local_bh_enable(); + } +} + +/* + * Finished one round of SRCU grace period. Start another if there are + * more SRCU callbacks queued, otherwise put SRCU into not-running state. + */ +static void srcu_reschedule(struct srcu_struct *sp) +{ + bool pending = true; + + if (rcu_batch_empty(&sp->batch_done) && + rcu_batch_empty(&sp->batch_check1) && + rcu_batch_empty(&sp->batch_check0) && + rcu_batch_empty(&sp->batch_queue)) { + spin_lock_irq(&sp->queue_lock); + if (rcu_batch_empty(&sp->batch_done) && + rcu_batch_empty(&sp->batch_check1) && + rcu_batch_empty(&sp->batch_check0) && + rcu_batch_empty(&sp->batch_queue)) { + sp->running = false; + pending = false; + } + spin_unlock_irq(&sp->queue_lock); + } + + if (pending) + queue_delayed_work(system_nrt_wq, &sp->work, SRCU_INTERVAL); +} + +/* + * This is the work-queue function that handles SRCU grace periods. + */ +static void process_srcu(struct work_struct *work) +{ + struct srcu_struct *sp; + + sp = container_of(work, struct srcu_struct, work.work); + + srcu_collect_new(sp); + srcu_advance_batches(sp, 1); + srcu_invoke_callbacks(sp); + srcu_reschedule(sp); +} -- cgit v1.2.3-70-g09d2