diff options
author | Lars Ellenberg <lars.ellenberg@linbit.com> | 2011-10-19 11:50:57 +0200 |
---|---|---|
committer | Philipp Reisner <philipp.reisner@linbit.com> | 2012-11-08 16:58:34 +0100 |
commit | 8c0785a5c9a0f2472aff68dc32247be01728c416 (patch) | |
tree | adb036acb283550aab1a1860bff454a86eb446d5 /drivers | |
parent | b379c41ed78e83c4443fca4dbfbc358c19e4f24c (diff) |
drbd: allow to dequeue batches of work at a time
cherry-picked and adapted from drbd 9 devel branch
In 8.4, we still use drbd_queue_work_front(),
so in normal operation, we can not dequeue batches,
but only single items.
Still, followup commits will wake the worker
without explicitly queueing a work item,
so up() is replaced by a simple wake_up().
Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Diffstat (limited to 'drivers')
-rw-r--r-- | drivers/block/drbd/drbd_int.h | 8 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_main.c | 2 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_worker.c | 88 |
3 files changed, 43 insertions, 55 deletions
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h index d7ca76ce00c..e84c7b6a6ba 100644 --- a/drivers/block/drbd/drbd_int.h +++ b/drivers/block/drbd/drbd_int.h @@ -735,8 +735,8 @@ enum bm_flag { struct drbd_work_queue { struct list_head q; - struct semaphore s; /* producers up it, worker down()s it */ spinlock_t q_lock; /* to protect the list. */ + wait_queue_head_t q_wait; }; struct drbd_socket { @@ -1832,9 +1832,8 @@ drbd_queue_work_front(struct drbd_work_queue *q, struct drbd_work *w) unsigned long flags; spin_lock_irqsave(&q->q_lock, flags); list_add(&w->list, &q->q); - up(&q->s); /* within the spinlock, - see comment near end of drbd_worker() */ spin_unlock_irqrestore(&q->q_lock, flags); + wake_up(&q->q_wait); } static inline void @@ -1843,9 +1842,8 @@ drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w) unsigned long flags; spin_lock_irqsave(&q->q_lock, flags); list_add_tail(&w->list, &q->q); - up(&q->s); /* within the spinlock, - see comment near end of drbd_worker() */ spin_unlock_irqrestore(&q->q_lock, flags); + wake_up(&q->q_wait); } static inline void wake_asender(struct drbd_tconn *tconn) diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c index bfe6975ef94..f379d33b10a 100644 --- a/drivers/block/drbd/drbd_main.c +++ b/drivers/block/drbd/drbd_main.c @@ -2535,9 +2535,9 @@ out: static void drbd_init_workqueue(struct drbd_work_queue* wq) { - sema_init(&wq->s, 0); spin_lock_init(&wq->q_lock); INIT_LIST_HEAD(&wq->q); + init_waitqueue_head(&wq->q_wait); } struct drbd_tconn *conn_get_by_name(const char *name) diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c index d7573f4b742..fb2e6c8d45c 100644 --- a/drivers/block/drbd/drbd_worker.c +++ b/drivers/block/drbd/drbd_worker.c @@ -1673,6 +1673,23 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side) mutex_unlock(mdev->state_mutex); } +bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list) +{ + spin_lock_irq(&queue->q_lock); + list_splice_init(&queue->q, work_list); + spin_unlock_irq(&queue->q_lock); + return !list_empty(work_list); +} + +bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list) +{ + spin_lock_irq(&queue->q_lock); + if (!list_empty(&queue->q)) + list_move(queue->q.next, work_list); + spin_unlock_irq(&queue->q_lock); + return !list_empty(work_list); +} + int drbd_worker(struct drbd_thread *thi) { struct drbd_tconn *tconn = thi->tconn; @@ -1680,15 +1697,21 @@ int drbd_worker(struct drbd_thread *thi) struct drbd_conf *mdev; struct net_conf *nc; LIST_HEAD(work_list); - int vnr, intr = 0; + int vnr; int cork; while (get_t_state(thi) == RUNNING) { drbd_thread_current_set_cpu(thi); - if (down_trylock(&tconn->data.work.s)) { - mutex_lock(&tconn->data.mutex); + /* as long as we use drbd_queue_work_front(), + * we may only dequeue single work items here, not batches. */ + if (list_empty(&work_list)) + dequeue_work_item(&tconn->data.work, &work_list); + /* Still nothing to do? Poke TCP, just in case, + * then wait for new work (or signal). */ + if (list_empty(&work_list)) { + mutex_lock(&tconn->data.mutex); rcu_read_lock(); nc = rcu_dereference(tconn->net_conf); cork = nc ? nc->tcp_cork : 0; @@ -1698,15 +1721,16 @@ int drbd_worker(struct drbd_thread *thi) drbd_tcp_uncork(tconn->data.socket); mutex_unlock(&tconn->data.mutex); - intr = down_interruptible(&tconn->data.work.s); + wait_event_interruptible(tconn->data.work.q_wait, + dequeue_work_item(&tconn->data.work, &work_list)); mutex_lock(&tconn->data.mutex); - if (tconn->data.socket && cork) + if (tconn->data.socket && cork) drbd_tcp_cork(tconn->data.socket); mutex_unlock(&tconn->data.mutex); } - if (intr) { + if (signal_pending(current)) { flush_signals(current); if (get_t_state(thi) == RUNNING) { conn_warn(tconn, "Worker got an unexpected signal\n"); @@ -1717,59 +1741,25 @@ int drbd_worker(struct drbd_thread *thi) if (get_t_state(thi) != RUNNING) break; - /* With this break, we have done a down() but not consumed - the entry from the list. The cleanup code takes care of - this... */ - - w = NULL; - spin_lock_irq(&tconn->data.work.q_lock); - if (list_empty(&tconn->data.work.q)) { - /* something terribly wrong in our logic. - * we were able to down() the semaphore, - * but the list is empty... doh. - * - * what is the best thing to do now? - * try again from scratch, restarting the receiver, - * asender, whatnot? could break even more ugly, - * e.g. when we are primary, but no good local data. - * - * I'll try to get away just starting over this loop. - */ - conn_warn(tconn, "Work list unexpectedly empty\n"); - spin_unlock_irq(&tconn->data.work.q_lock); - continue; - } - w = list_entry(tconn->data.work.q.next, struct drbd_work, list); - list_del_init(&w->list); - spin_unlock_irq(&tconn->data.work.q_lock); - if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) { - /* dev_warn(DEV, "worker: a callback failed! \n"); */ + while (!list_empty(&work_list)) { + w = list_first_entry(&work_list, struct drbd_work, list); + list_del_init(&w->list); + if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS) == 0) + continue; if (tconn->cstate >= C_WF_REPORT_PARAMS) conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD); } } - spin_lock_irq(&tconn->data.work.q_lock); - while (!list_empty(&tconn->data.work.q)) { - list_splice_init(&tconn->data.work.q, &work_list); - spin_unlock_irq(&tconn->data.work.q_lock); - + do { while (!list_empty(&work_list)) { - w = list_entry(work_list.next, struct drbd_work, list); + w = list_first_entry(&work_list, struct drbd_work, list); list_del_init(&w->list); w->cb(w, 1); } - - spin_lock_irq(&tconn->data.work.q_lock); - } - sema_init(&tconn->data.work.s, 0); - /* DANGEROUS race: if someone did queue his work within the spinlock, - * but up() ed outside the spinlock, we could get an up() on the - * semaphore without corresponding list entry. - * So don't do that. - */ - spin_unlock_irq(&tconn->data.work.q_lock); + dequeue_work_batch(&tconn->data.work, &work_list); + } while (!list_empty(&work_list)); rcu_read_lock(); idr_for_each_entry(&tconn->volumes, mdev, vnr) { |