diff options
-rw-r--r-- | drivers/block/drbd/drbd_int.h | 3 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_main.c | 2 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_req.c | 518 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_req.h | 9 |
4 files changed, 277 insertions, 255 deletions
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h index 52ad1bfce85..8b26a2c954d 100644 --- a/drivers/block/drbd/drbd_int.h +++ b/drivers/block/drbd/drbd_int.h @@ -575,13 +575,14 @@ struct drbd_request { struct list_head tl_requests; /* ring list in the transfer log */ struct bio *master_bio; /* master bio pointer */ - unsigned long rq_state; /* see comments above _req_mod() */ unsigned long start_time; /* once it hits 0, we may complete the master_bio */ atomic_t completion_ref; /* once it hits 0, we may destroy this drbd_request object */ struct kref kref; + + unsigned rq_state; /* see comments above _req_mod() */ }; struct drbd_epoch { diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c index bedfeeccd51..d07cb31a36e 100644 --- a/drivers/block/drbd/drbd_main.c +++ b/drivers/block/drbd/drbd_main.c @@ -210,7 +210,7 @@ void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr, /* find latest not yet barrier-acked write request, * count writes in its epoch. */ list_for_each_entry(r, &tconn->transfer_log, tl_requests) { - const unsigned long s = r->rq_state; + const unsigned s = r->rq_state; if (!req) { if (!(s & RQ_WRITE)) continue; diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c index ae894af428c..329528d9dec 100644 --- a/drivers/block/drbd/drbd_req.c +++ b/drivers/block/drbd/drbd_req.c @@ -85,7 +85,9 @@ static struct drbd_request *drbd_req_new(struct drbd_conf *mdev, INIT_LIST_HEAD(&req->tl_requests); INIT_LIST_HEAD(&req->w.list); + /* one reference to be put by __drbd_make_request */ atomic_set(&req->completion_ref, 1); + /* one kref as long as completion_ref > 0 */ kref_init(&req->kref); return req; } @@ -94,7 +96,16 @@ static void drbd_req_destroy(struct kref *kref) { struct drbd_request *req = container_of(kref, struct drbd_request, kref); struct drbd_conf *mdev = req->w.mdev; - const unsigned long s = req->rq_state; + const unsigned s = req->rq_state; + + if ((req->master_bio && !(s & RQ_POSTPONED)) || + atomic_read(&req->completion_ref) || + (s & RQ_LOCAL_PENDING) || + ((s & RQ_NET_MASK) && !(s & RQ_NET_DONE))) { + dev_err(DEV, "drbd_req_destroy: Logic BUG rq_state = 0x%x, completion_ref = %d\n", + s, atomic_read(&req->completion_ref)); + return; + } /* remove it from the transfer log. * well, only if it had been there in the first @@ -180,44 +191,6 @@ static void drbd_remove_request_interval(struct rb_root *root, wake_up(&mdev->misc_wait); } -static void maybe_wakeup_conflicting_requests(struct drbd_request *req) -{ - const unsigned long s = req->rq_state; - if (s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED)) - return; - if (req->i.waiting) - /* Retry all conflicting peer requests. */ - wake_up(&req->w.mdev->misc_wait); -} - -static -void req_may_be_done(struct drbd_request *req) -{ - const unsigned long s = req->rq_state; - - /* req->master_bio still present means: Not yet completed. - * - * Unless this is RQ_POSTPONED, which will cause drbd_req_destroy() to - * queue it on the retry workqueue instead of destroying it. - */ - if (req->master_bio && !(s & RQ_POSTPONED)) - return; - - /* Local still pending, even though master_bio is already completed? - * may happen for RQ_LOCAL_ABORTED requests. */ - if (s & RQ_LOCAL_PENDING) - return; - - if ((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE)) { - /* this is disconnected (local only) operation, - * or protocol A, B, or C P_BARRIER_ACK, - * or killed from the transfer log due to connection loss. */ - kref_put(&req->kref, drbd_req_destroy); - } - /* else: network part and not DONE yet. that is - * protocol A, B, or C, barrier ack still pending... */ -} - /* Helper for __req_mod(). * Set m->bio to the master bio, if it is fit to be completed, * or leave it alone (it is initialized to NULL in __req_mod), @@ -225,10 +198,12 @@ void req_may_be_done(struct drbd_request *req) * If m->bio is set, the error status to be returned is placed in m->error. */ static -void req_may_be_completed(struct drbd_request *req, struct bio_and_error *m) +void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m) { - const unsigned long s = req->rq_state; + const unsigned s = req->rq_state; struct drbd_conf *mdev = req->w.mdev; + int rw; + int error, ok; /* we must not complete the master bio, while it is * still being processed by _drbd_send_zc_bio (drbd_send_dblock) @@ -239,116 +214,208 @@ void req_may_be_completed(struct drbd_request *req, struct bio_and_error *m) * the receiver, * the bio_endio completion callbacks. */ - if (s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED)) - return; - if (s & RQ_NET_QUEUED) + if ((s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED)) || + (s & RQ_NET_QUEUED) || (s & RQ_NET_PENDING) || + (s & RQ_COMPLETION_SUSP)) { + dev_err(DEV, "drbd_req_complete: Logic BUG rq_state = 0x%x\n", s); return; - if (s & RQ_NET_PENDING) + } + + if (!req->master_bio) { + dev_err(DEV, "drbd_req_complete: Logic BUG, master_bio == NULL!\n"); return; + } - /* FIXME - * instead of all the RQ_FLAGS, actually use the completion_ref - * to decide if this is ready to be completed. */ - if (req->master_bio) { - int complete = atomic_dec_and_test(&req->completion_ref); - D_ASSERT(complete != 0); - } else - D_ASSERT(atomic_read(&req->completion_ref) == 0); + rw = bio_rw(req->master_bio); - if (req->master_bio) { - int rw = bio_rw(req->master_bio); + /* + * figure out whether to report success or failure. + * + * report success when at least one of the operations succeeded. + * or, to put the other way, + * only report failure, when both operations failed. + * + * what to do about the failures is handled elsewhere. + * what we need to do here is just: complete the master_bio. + * + * local completion error, if any, has been stored as ERR_PTR + * in private_bio within drbd_request_endio. + */ + ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK); + error = PTR_ERR(req->private_bio); - /* this is DATA_RECEIVED (remote read) - * or protocol C P_WRITE_ACK - * or protocol B P_RECV_ACK - * or protocol A "HANDED_OVER_TO_NETWORK" (SendAck) - * or canceled or failed, - * or killed from the transfer log due to connection loss. - */ + /* remove the request from the conflict detection + * respective block_id verification hash */ + if (!drbd_interval_empty(&req->i)) { + struct rb_root *root; - /* - * figure out whether to report success or failure. - * - * report success when at least one of the operations succeeded. - * or, to put the other way, - * only report failure, when both operations failed. - * - * what to do about the failures is handled elsewhere. - * what we need to do here is just: complete the master_bio. - * - * local completion error, if any, has been stored as ERR_PTR - * in private_bio within drbd_request_endio. - */ - int ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK); - int error = PTR_ERR(req->private_bio); - - /* remove the request from the conflict detection - * respective block_id verification hash */ - if (!drbd_interval_empty(&req->i)) { - struct rb_root *root; - - if (rw == WRITE) - root = &mdev->write_requests; - else - root = &mdev->read_requests; - drbd_remove_request_interval(root, req); - } else if (!(s & RQ_POSTPONED)) - D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0); - - /* Before we can signal completion to the upper layers, - * we may need to close the current transfer log epoch. - * We are within the request lock, so we can simply compare - * the request epoch number with the current transfer log - * epoch number. If they match, increase the current_tle_nr, - * and reset the transfer log epoch write_cnt. - */ - if (rw == WRITE && - req->epoch == atomic_read(&mdev->tconn->current_tle_nr)) - start_new_tl_epoch(mdev->tconn); + if (rw == WRITE) + root = &mdev->write_requests; + else + root = &mdev->read_requests; + drbd_remove_request_interval(root, req); + } else if (!(s & RQ_POSTPONED)) + D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0); + + /* Before we can signal completion to the upper layers, + * we may need to close the current transfer log epoch. + * We are within the request lock, so we can simply compare + * the request epoch number with the current transfer log + * epoch number. If they match, increase the current_tle_nr, + * and reset the transfer log epoch write_cnt. + */ + if (rw == WRITE && + req->epoch == atomic_read(&mdev->tconn->current_tle_nr)) + start_new_tl_epoch(mdev->tconn); - /* Update disk stats */ - _drbd_end_io_acct(mdev, req); + /* Update disk stats */ + _drbd_end_io_acct(mdev, req); - /* If READ failed, - * have it be pushed back to the retry work queue, - * so it will re-enter __drbd_make_request(), - * and be re-assigned to a suitable local or remote path, - * or failed if we do not have access to good data anymore. - * - * Unless it was failed early by __drbd_make_request(), - * because no path was available, in which case - * it was not even added to the transfer_log. - * - * READA may fail, and will not be retried. - * - * WRITE should have used all available paths already. - */ - if (!ok && rw == READ && !list_empty(&req->tl_requests)) - req->rq_state |= RQ_POSTPONED; - - if (!(req->rq_state & RQ_POSTPONED)) { - m->error = ok ? 0 : (error ?: -EIO); - m->bio = req->master_bio; - req->master_bio = NULL; - } else { - /* Assert that this will be _req_is_done() - * with this very invokation. */ - /* FIXME: - * what about (RQ_LOCAL_PENDING | RQ_LOCAL_ABORTED)? - */ - D_ASSERT(!(s & RQ_LOCAL_PENDING)); - D_ASSERT((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE)); - } + /* If READ failed, + * have it be pushed back to the retry work queue, + * so it will re-enter __drbd_make_request(), + * and be re-assigned to a suitable local or remote path, + * or failed if we do not have access to good data anymore. + * + * Unless it was failed early by __drbd_make_request(), + * because no path was available, in which case + * it was not even added to the transfer_log. + * + * READA may fail, and will not be retried. + * + * WRITE should have used all available paths already. + */ + if (!ok && rw == READ && !list_empty(&req->tl_requests)) + req->rq_state |= RQ_POSTPONED; + + if (!(req->rq_state & RQ_POSTPONED)) { + m->error = ok ? 0 : (error ?: -EIO); + m->bio = req->master_bio; + req->master_bio = NULL; + } else { + /* Assert that this will be drbd_req_destroy()ed + * with this very invokation. */ + D_ASSERT(atomic_read(&req->kref.refcount) == 1); } - req_may_be_done(req); } -static void req_may_be_completed_not_susp(struct drbd_request *req, struct bio_and_error *m) +static int drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put) { struct drbd_conf *mdev = req->w.mdev; + D_ASSERT(m || (req->rq_state & RQ_POSTPONED)); + + if (!atomic_sub_and_test(put, &req->completion_ref)) + return 0; + + if (drbd_suspended(mdev)) { + /* We do not allow completion while suspended. Re-get a + * reference, so whatever happens when this is resumed + * may put and complete. */ + + D_ASSERT(!(req->rq_state & RQ_COMPLETION_SUSP)); + req->rq_state |= RQ_COMPLETION_SUSP; + atomic_inc(&req->completion_ref); + return 0; + } + + /* else */ + drbd_req_complete(req, m); + return 1; +} + +/* I'd like this to be the only place that manipulates + * req->completion_ref and req->kref. */ +static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m, + int clear, int set) +{ + struct drbd_conf *mdev = req->w.mdev; + unsigned s = req->rq_state; + int c_put = 0; + int k_put = 0; + + /* apply */ + + req->rq_state &= ~clear; + req->rq_state |= set; + + /* no change? */ + if (req->rq_state == s) + return; + + /* intent: get references */ - if (!drbd_suspended(mdev)) - req_may_be_completed(req, m); + if (!(s & RQ_LOCAL_PENDING) && (set & RQ_LOCAL_PENDING)) + atomic_inc(&req->completion_ref); + + if (!(s & RQ_NET_PENDING) && (set & RQ_NET_PENDING)) { + inc_ap_pending(mdev); + atomic_inc(&req->completion_ref); + } + + if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED)) + atomic_inc(&req->completion_ref); + + if (!(s & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK)) + kref_get(&req->kref); /* wait for the DONE */ + + if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT)) + atomic_add(req->i.size >> 9, &mdev->ap_in_flight); + + /* progress: put references */ + + if ((s & RQ_COMPLETION_SUSP) && (clear & RQ_COMPLETION_SUSP)) + ++c_put; + + if (!(s & RQ_LOCAL_ABORTED) && (set & RQ_LOCAL_ABORTED)) { + D_ASSERT(req->rq_state & RQ_LOCAL_PENDING); + /* local completion may still come in later, + * we need to keep the req object around. */ + kref_get(&req->kref); + ++c_put; + } + + if ((s & RQ_LOCAL_PENDING) && (clear & RQ_LOCAL_PENDING)) { + if (req->rq_state & RQ_LOCAL_ABORTED) + ++k_put; + else + ++c_put; + } + + if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) { + dec_ap_pending(mdev); + ++c_put; + } + + if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED)) + ++c_put; + + if ((s & RQ_EXP_BARR_ACK) && !(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) { + if (req->rq_state & RQ_NET_SENT) + atomic_sub(req->i.size >> 9, &mdev->ap_in_flight); + ++k_put; + } + + /* potentially complete and destroy */ + + if (k_put || c_put) { + /* Completion does it's own kref_put. If we are going to + * kref_sub below, we need req to be still around then. */ + int at_least = k_put + !!c_put; + int refcount = atomic_read(&req->kref.refcount); + if (refcount < at_least) + dev_err(DEV, + "mod_rq_state: Logic BUG: %x -> %x: refcount = %d, should be >= %d\n", + s, req->rq_state, refcount, at_least); + } + + /* If we made progress, retry conflicting peer requests, if any. */ + if (req->i.waiting) + wake_up(&mdev->misc_wait); + + if (c_put) + k_put += drbd_req_put_completion_ref(req, m, c_put); + if (k_put) + kref_sub(&req->kref, k_put, drbd_req_destroy); } /* obviously this could be coded as many single functions @@ -388,7 +455,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, /* reached via __drbd_make_request * and from w_read_retry_remote */ D_ASSERT(!(req->rq_state & RQ_NET_MASK)); - req->rq_state |= RQ_NET_PENDING; rcu_read_lock(); nc = rcu_dereference(mdev->tconn->net_conf); p = nc->wire_protocol; @@ -396,13 +462,13 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, req->rq_state |= p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK : p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0; - inc_ap_pending(mdev); + mod_rq_state(req, m, 0, RQ_NET_PENDING); break; case TO_BE_SUBMITTED: /* locally */ /* reached via __drbd_make_request */ D_ASSERT(!(req->rq_state & RQ_LOCAL_MASK)); - req->rq_state |= RQ_LOCAL_PENDING; + mod_rq_state(req, m, 0, RQ_LOCAL_PENDING); break; case COMPLETED_OK: @@ -411,44 +477,23 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, else mdev->read_cnt += req->i.size >> 9; - req->rq_state |= (RQ_LOCAL_COMPLETED|RQ_LOCAL_OK); - req->rq_state &= ~RQ_LOCAL_PENDING; - - maybe_wakeup_conflicting_requests(req); - req_may_be_completed_not_susp(req, m); + mod_rq_state(req, m, RQ_LOCAL_PENDING, + RQ_LOCAL_COMPLETED|RQ_LOCAL_OK); break; case ABORT_DISK_IO: - req->rq_state |= RQ_LOCAL_ABORTED; - req_may_be_completed_not_susp(req, m); - break; - - case WRITE_COMPLETED_WITH_ERROR: - req->rq_state |= RQ_LOCAL_COMPLETED; - req->rq_state &= ~RQ_LOCAL_PENDING; - - __drbd_chk_io_error(mdev, false); - maybe_wakeup_conflicting_requests(req); - req_may_be_completed_not_susp(req, m); - break; - - case READ_AHEAD_COMPLETED_WITH_ERROR: - /* it is legal to fail READA */ - req->rq_state |= RQ_LOCAL_COMPLETED; - req->rq_state &= ~RQ_LOCAL_PENDING; - req_may_be_completed_not_susp(req, m); + mod_rq_state(req, m, 0, RQ_LOCAL_ABORTED); break; case READ_COMPLETED_WITH_ERROR: drbd_set_out_of_sync(mdev, req->i.sector, req->i.size); - - req->rq_state |= RQ_LOCAL_COMPLETED; - req->rq_state &= ~RQ_LOCAL_PENDING; - - D_ASSERT(!(req->rq_state & RQ_NET_MASK)); - + /* fall through. */ + case WRITE_COMPLETED_WITH_ERROR: __drbd_chk_io_error(mdev, false); - req_may_be_completed_not_susp(req, m); + /* fall through. */ + case READ_AHEAD_COMPLETED_WITH_ERROR: + /* it is legal to fail READA, no __drbd_chk_io_error in that case. */ + mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED); break; case QUEUE_FOR_NET_READ: @@ -461,7 +506,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, /* So we can verify the handle in the answer packet. * Corresponding drbd_remove_request_interval is in - * req_may_be_completed() */ + * drbd_req_complete() */ D_ASSERT(drbd_interval_empty(&req->i)); drbd_insert_interval(&mdev->read_requests, &req->i); @@ -469,7 +514,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, D_ASSERT(req->rq_state & RQ_NET_PENDING); D_ASSERT((req->rq_state & RQ_LOCAL_MASK) == 0); - req->rq_state |= RQ_NET_QUEUED; + mod_rq_state(req, m, 0, RQ_NET_QUEUED); req->w.cb = w_send_read_req; drbd_queue_work(&mdev->tconn->sender_work, &req->w); break; @@ -479,7 +524,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, /* from __drbd_make_request only */ /* Corresponding drbd_remove_request_interval is in - * req_may_be_completed() */ + * drbd_req_complete() */ D_ASSERT(drbd_interval_empty(&req->i)); drbd_insert_interval(&mdev->write_requests, &req->i); @@ -504,7 +549,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, /* queue work item to send data */ D_ASSERT(req->rq_state & RQ_NET_PENDING); - req->rq_state |= RQ_NET_QUEUED; + mod_rq_state(req, m, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK); req->w.cb = w_send_dblock; drbd_queue_work(&mdev->tconn->sender_work, &req->w); @@ -519,7 +564,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, break; case QUEUE_FOR_SEND_OOS: - req->rq_state |= RQ_NET_QUEUED; + mod_rq_state(req, m, 0, RQ_NET_QUEUED); req->w.cb = w_send_out_of_sync; drbd_queue_work(&mdev->tconn->sender_work, &req->w); break; @@ -529,64 +574,43 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, case SEND_FAILED: /* real cleanup will be done from tl_clear. just update flags * so it is no longer marked as on the worker queue */ - req->rq_state &= ~RQ_NET_QUEUED; - /* if we did it right, tl_clear should be scheduled only after - * this, so this should not be necessary! */ - req_may_be_completed_not_susp(req, m); + mod_rq_state(req, m, RQ_NET_QUEUED, 0); break; case HANDED_OVER_TO_NETWORK: /* assert something? */ - if (bio_data_dir(req->master_bio) == WRITE) - atomic_add(req->i.size >> 9, &mdev->ap_in_flight); - if (bio_data_dir(req->master_bio) == WRITE && !(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK))) { /* this is what is dangerous about protocol A: * pretend it was successfully written on the peer. */ - if (req->rq_state & RQ_NET_PENDING) { - dec_ap_pending(mdev); - req->rq_state &= ~RQ_NET_PENDING; - req->rq_state |= RQ_NET_OK; - } /* else: neg-ack was faster... */ + if (req->rq_state & RQ_NET_PENDING) + mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK); + /* else: neg-ack was faster... */ /* it is still not yet RQ_NET_DONE until the * corresponding epoch barrier got acked as well, * so we know what to dirty on connection loss */ } - req->rq_state &= ~RQ_NET_QUEUED; - req->rq_state |= RQ_NET_SENT; - req_may_be_completed_not_susp(req, m); + mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT); break; case OOS_HANDED_TO_NETWORK: /* Was not set PENDING, no longer QUEUED, so is now DONE * as far as this connection is concerned. */ - req->rq_state &= ~RQ_NET_QUEUED; - req->rq_state |= RQ_NET_DONE; - req_may_be_completed_not_susp(req, m); + mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_DONE); break; case CONNECTION_LOST_WHILE_PENDING: /* transfer log cleanup after connection loss */ - /* assert something? */ - if (req->rq_state & RQ_NET_PENDING) - dec_ap_pending(mdev); - - p = !(req->rq_state & RQ_WRITE) && req->rq_state & RQ_NET_PENDING; - - req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING); - req->rq_state |= RQ_NET_DONE; - if (req->rq_state & RQ_NET_SENT && req->rq_state & RQ_WRITE) - atomic_sub(req->i.size >> 9, &mdev->ap_in_flight); - - req_may_be_completed(req, m); /* Allowed while state.susp */ + mod_rq_state(req, m, + RQ_NET_OK|RQ_NET_PENDING|RQ_COMPLETION_SUSP, + RQ_NET_DONE); break; case DISCARD_WRITE: /* for discarded conflicting writes of multiple primaries, * there is no need to keep anything in the tl, potential * node crashes are covered by the activity log. */ - req->rq_state |= RQ_NET_DONE; + mod_rq_state(req, NULL, 0, RQ_NET_DONE); /* fall through */ case WRITE_ACKED_BY_PEER_AND_SIS: case WRITE_ACKED_BY_PEER: @@ -605,13 +629,8 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, * see also notes above in HANDED_OVER_TO_NETWORK about * protocol != C */ ack_common: - req->rq_state |= RQ_NET_OK; D_ASSERT(req->rq_state & RQ_NET_PENDING); - dec_ap_pending(mdev); - atomic_sub(req->i.size >> 9, &mdev->ap_in_flight); - req->rq_state &= ~RQ_NET_PENDING; - maybe_wakeup_conflicting_requests(req); - req_may_be_completed_not_susp(req, m); + mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK); break; case POSTPONE_WRITE: @@ -622,64 +641,61 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, */ D_ASSERT(req->rq_state & RQ_NET_PENDING); req->rq_state |= RQ_POSTPONED; - maybe_wakeup_conflicting_requests(req); - req_may_be_completed_not_susp(req, m); + if (req->i.waiting) + wake_up(&mdev->misc_wait); + /* Do not clear RQ_NET_PENDING. This request will make further + * progress via restart_conflicting_writes() or + * fail_postponed_requests(). Hopefully. */ break; case NEG_ACKED: - /* assert something? */ - if (req->rq_state & RQ_NET_PENDING) { - dec_ap_pending(mdev); - if (req->rq_state & RQ_WRITE) - atomic_sub(req->i.size >> 9, &mdev->ap_in_flight); - } - req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING); - - req->rq_state |= RQ_NET_DONE; - - maybe_wakeup_conflicting_requests(req); - req_may_be_completed_not_susp(req, m); - /* else: done by HANDED_OVER_TO_NETWORK */ + mod_rq_state(req, m, RQ_NET_OK|RQ_NET_PENDING, RQ_NET_DONE); break; case FAIL_FROZEN_DISK_IO: if (!(req->rq_state & RQ_LOCAL_COMPLETED)) break; - - req_may_be_completed(req, m); /* Allowed while state.susp */ + mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0); break; case RESTART_FROZEN_DISK_IO: if (!(req->rq_state & RQ_LOCAL_COMPLETED)) break; - req->rq_state &= ~RQ_LOCAL_COMPLETED; + mod_rq_state(req, m, + RQ_COMPLETION_SUSP|RQ_LOCAL_COMPLETED, + RQ_LOCAL_PENDING); rv = MR_READ; if (bio_data_dir(req->master_bio) == WRITE) rv = MR_WRITE; - get_ldev(mdev); + get_ldev(mdev); /* always succeeds in this call path */ req->w.cb = w_restart_disk_io; drbd_queue_work(&mdev->tconn->sender_work, &req->w); break; case RESEND: /* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK - before the connection loss (B&C only); only P_BARRIER_ACK was missing. + before the connection loss (B&C only); only P_BARRIER_ACK + (or the local completion?) was missing when we suspended. Throwing them out of the TL here by pretending we got a BARRIER_ACK. During connection handshake, we ensure that the peer was not rebooted. */ if (!(req->rq_state & RQ_NET_OK)) { + /* FIXME could this possibly be a req->w.cb == w_send_out_of_sync? + * in that case we must not set RQ_NET_PENDING. */ + + mod_rq_state(req, m, RQ_COMPLETION_SUSP, RQ_NET_QUEUED|RQ_NET_PENDING); if (req->w.cb) { - /* w.cb expected to be w_send_dblock, or w_send_read_req */ drbd_queue_work(&mdev->tconn->sender_work, &req->w); rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ; - } + } /* else: FIXME can this happen? */ break; } /* else, fall through to BARRIER_ACKED */ case BARRIER_ACKED: + /* barrier ack for READ requests does not make sense */ if (!(req->rq_state & RQ_WRITE)) break; @@ -689,20 +705,17 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what, * we won't be able to clean them up... */ dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n"); } - if ((req->rq_state & RQ_NET_MASK) != 0) { - req->rq_state |= RQ_NET_DONE; - if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK))) - atomic_sub(req->i.size>>9, &mdev->ap_in_flight); - } - req_may_be_done(req); /* Allowed while state.susp */ + /* Allowed to complete requests, even while suspended. + * As this is called for all requests within a matching epoch, + * we need to filter, and only set RQ_NET_DONE for those that + * have actually been on the wire. */ + mod_rq_state(req, m, RQ_COMPLETION_SUSP, + (req->rq_state & RQ_NET_MASK) ? RQ_NET_DONE : 0); break; case DATA_RECEIVED: D_ASSERT(req->rq_state & RQ_NET_PENDING); - dec_ap_pending(mdev); - req->rq_state &= ~RQ_NET_PENDING; - req->rq_state |= (RQ_NET_OK|RQ_NET_DONE); - req_may_be_completed_not_susp(req, m); + mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK|RQ_NET_DONE); break; }; @@ -867,6 +880,9 @@ static bool do_remote_read(struct drbd_request *req) if (mdev->state.pdsk != D_UP_TO_DATE) return false; + if (req->private_bio == NULL) + return true; + /* TODO: improve read balancing decisions, take into account drbd * protocol, pending requests etc. */ @@ -877,9 +893,6 @@ static bool do_remote_read(struct drbd_request *req) if (rbm == RB_PREFER_LOCAL && req->private_bio) return false; /* submit locally */ - if (req->private_bio == NULL) - return true; - if (remote_due_to_read_balancing(mdev, req->i.sector, rbm)) { if (req->private_bio) { bio_put(req->private_bio); @@ -1010,7 +1023,7 @@ void __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long /* We fail READ/READA early, if we can not serve it. * We must do this before req is registered on any lists. - * Otherwise, req_may_be_completed() will queue failed READ for retry. */ + * Otherwise, drbd_req_complete() will queue failed READ for retry. */ if (rw != WRITE) { if (!do_remote_read(req) && !req->private_bio) goto nodata; @@ -1042,19 +1055,18 @@ void __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long /* but we need to give up the spinlock to submit */ spin_unlock_irq(&mdev->tconn->req_lock); drbd_submit_req_private_bio(req); - /* once we have submitted, we must no longer look at req, - * it may already be destroyed. */ - return; + spin_lock_irq(&mdev->tconn->req_lock); } else if (no_remote) { nodata: if (__ratelimit(&drbd_ratelimit_state)) dev_err(DEV, "IO ERROR: neither local nor remote disk\n"); /* A write may have been queued for send_oos, however. - * So we can not simply free it, we must go through req_may_be_completed() */ + * So we can not simply free it, we must go through drbd_req_put_completion_ref() */ } out: - req_may_be_completed(req, &m); + if (drbd_req_put_completion_ref(req, &m, 1)) + kref_put(&req->kref, drbd_req_destroy); spin_unlock_irq(&mdev->tconn->req_lock); if (m.bio) diff --git a/drivers/block/drbd/drbd_req.h b/drivers/block/drbd/drbd_req.h index f80af27fa5e..90e5a1eea72 100644 --- a/drivers/block/drbd/drbd_req.h +++ b/drivers/block/drbd/drbd_req.h @@ -203,11 +203,18 @@ enum drbd_req_state_bits { /* The peer has sent a retry ACK */ __RQ_POSTPONED, + /* would have been completed, + * but was not, because of drbd_suspended() */ + __RQ_COMPLETION_SUSP, + /* We expect a receive ACK (wire proto B) */ __RQ_EXP_RECEIVE_ACK, /* We expect a write ACK (wite proto C) */ __RQ_EXP_WRITE_ACK, + + /* waiting for a barrier ack, did an extra kref_get */ + __RQ_EXP_BARR_ACK, }; #define RQ_LOCAL_PENDING (1UL << __RQ_LOCAL_PENDING) @@ -230,8 +237,10 @@ enum drbd_req_state_bits { #define RQ_WRITE (1UL << __RQ_WRITE) #define RQ_IN_ACT_LOG (1UL << __RQ_IN_ACT_LOG) #define RQ_POSTPONED (1UL << __RQ_POSTPONED) +#define RQ_COMPLETION_SUSP (1UL << __RQ_COMPLETION_SUSP) #define RQ_EXP_RECEIVE_ACK (1UL << __RQ_EXP_RECEIVE_ACK) #define RQ_EXP_WRITE_ACK (1UL << __RQ_EXP_WRITE_ACK) +#define RQ_EXP_BARR_ACK (1UL << __RQ_EXP_BARR_ACK) /* For waking up the frozen transfer log mod_req() has to return if the request should be counted in the epoch object*/ |