diff options
Diffstat (limited to 'drivers/block/drbd/drbd_receiver.c')
-rw-r--r-- | drivers/block/drbd/drbd_receiver.c | 1169 |
1 files changed, 573 insertions, 596 deletions
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c index 081522d3c74..24487d4fb20 100644 --- a/drivers/block/drbd/drbd_receiver.c +++ b/drivers/block/drbd/drbd_receiver.c @@ -36,7 +36,6 @@ #include <linux/memcontrol.h> #include <linux/mm_inline.h> #include <linux/slab.h> -#include <linux/smp_lock.h> #include <linux/pkt_sched.h> #define __KERNEL_SYSCALLS__ #include <linux/unistd.h> @@ -49,11 +48,6 @@ #include "drbd_vli.h" -struct flush_work { - struct drbd_work w; - struct drbd_epoch *epoch; -}; - enum finish_epoch { FE_STILL_LIVE, FE_DESTROYED, @@ -66,16 +60,6 @@ static int drbd_do_auth(struct drbd_conf *mdev); static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event); static int e_end_block(struct drbd_conf *, struct drbd_work *, int); -static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch) -{ - struct drbd_epoch *prev; - spin_lock(&mdev->epoch_lock); - prev = list_entry(epoch->list.prev, struct drbd_epoch, list); - if (prev == epoch || prev == mdev->current_epoch) - prev = NULL; - spin_unlock(&mdev->epoch_lock); - return prev; -} #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) @@ -241,7 +225,7 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev) spin_unlock_irq(&mdev->req_lock); list_for_each_entry_safe(e, t, &reclaimed, w.list) - drbd_free_ee(mdev, e); + drbd_free_net_ee(mdev, e); } /** @@ -298,9 +282,11 @@ static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool * Is also used from inside an other spin_lock_irq(&mdev->req_lock); * Either links the page chain back to the global pool, * or returns all pages to the system. */ -static void drbd_pp_free(struct drbd_conf *mdev, struct page *page) +static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net) { + atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use; int i; + if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) i = page_chain_free(page); else { @@ -311,10 +297,10 @@ static void drbd_pp_free(struct drbd_conf *mdev, struct page *page) drbd_pp_vacant += i; spin_unlock(&drbd_pp_lock); } - atomic_sub(i, &mdev->pp_in_use); - i = atomic_read(&mdev->pp_in_use); + i = atomic_sub_return(i, a); if (i < 0) - dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i); + dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n", + is_net ? "pp_in_use_by_net" : "pp_in_use", i); wake_up(&drbd_pp_wait); } @@ -365,7 +351,6 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, e->size = data_size; e->flags = 0; e->sector = sector; - e->sector = sector; e->block_id = id; return e; @@ -375,9 +360,11 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, return NULL; } -void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e) +void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net) { - drbd_pp_free(mdev, e->pages); + if (e->flags & EE_HAS_DIGEST) + kfree(e->digest); + drbd_pp_free(mdev, e->pages, is_net); D_ASSERT(atomic_read(&e->pending_bios) == 0); D_ASSERT(hlist_unhashed(&e->colision)); mempool_free(e, drbd_ee_mempool); @@ -388,13 +375,14 @@ int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list) LIST_HEAD(work_list); struct drbd_epoch_entry *e, *t; int count = 0; + int is_net = list == &mdev->net_ee; spin_lock_irq(&mdev->req_lock); list_splice_init(list, &work_list); spin_unlock_irq(&mdev->req_lock); list_for_each_entry_safe(e, t, &work_list, w.list) { - drbd_free_ee(mdev, e); + drbd_free_some_ee(mdev, e, is_net); count++; } return count; @@ -423,7 +411,7 @@ static int drbd_process_done_ee(struct drbd_conf *mdev) spin_unlock_irq(&mdev->req_lock); list_for_each_entry_safe(e, t, &reclaimed, w.list) - drbd_free_ee(mdev, e); + drbd_free_net_ee(mdev, e); /* possible callbacks here: * e_end_block, and e_end_resync_block, e_send_discard_ack. @@ -719,14 +707,14 @@ out: static int drbd_send_fp(struct drbd_conf *mdev, struct socket *sock, enum drbd_packets cmd) { - struct p_header *h = (struct p_header *) &mdev->data.sbuf.header; + struct p_header80 *h = &mdev->data.sbuf.header.h80; return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0); } static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock) { - struct p_header *h = (struct p_header *) &mdev->data.sbuf.header; + struct p_header80 *h = &mdev->data.rbuf.header.h80; int rr; rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0); @@ -776,9 +764,6 @@ static int drbd_connect(struct drbd_conf *mdev) D_ASSERT(!mdev->data.socket); - if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) - dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n"); - if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS) return -2; @@ -927,6 +912,11 @@ retry: drbd_thread_start(&mdev->asender); + if (mdev->agreed_pro_version < 95 && get_ldev(mdev)) { + drbd_setup_queue_param(mdev, DRBD_MAX_SIZE_H80_PACKET); + put_ldev(mdev); + } + if (!drbd_send_protocol(mdev)) return -1; drbd_send_sync_param(mdev, &mdev->sync_conf); @@ -946,22 +936,28 @@ out_release_sockets: return -1; } -static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h) +static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size) { + union p_header *h = &mdev->data.rbuf.header; int r; r = drbd_recv(mdev, h, sizeof(*h)); - if (unlikely(r != sizeof(*h))) { dev_err(DEV, "short read expecting header on sock: r=%d\n", r); return FALSE; - }; - h->command = be16_to_cpu(h->command); - h->length = be16_to_cpu(h->length); - if (unlikely(h->magic != BE_DRBD_MAGIC)) { - dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n", - (long)be32_to_cpu(h->magic), - h->command, h->length); + } + + if (likely(h->h80.magic == BE_DRBD_MAGIC)) { + *cmd = be16_to_cpu(h->h80.command); + *packet_size = be16_to_cpu(h->h80.length); + } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) { + *cmd = be16_to_cpu(h->h95.command); + *packet_size = be32_to_cpu(h->h95.length); + } else { + dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n", + be32_to_cpu(h->h80.magic), + be16_to_cpu(h->h80.command), + be16_to_cpu(h->h80.length)); return FALSE; } mdev->last_received = jiffies; @@ -969,13 +965,13 @@ static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch) +static void drbd_flush(struct drbd_conf *mdev) { int rv; if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) { rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL, - NULL, BLKDEV_IFL_WAIT); + NULL); if (rv) { dev_err(DEV, "local disk flush failed with status %d\n", rv); /* would rather check on EOPNOTSUPP, but that is not reliable. @@ -985,24 +981,6 @@ static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct d } put_ldev(mdev); } - - return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE); -} - -static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel) -{ - struct flush_work *fw = (struct flush_work *)w; - struct drbd_epoch *epoch = fw->epoch; - - kfree(w); - - if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags)) - drbd_flush_after_epoch(mdev, epoch); - - drbd_may_finish_epoch(mdev, epoch, EV_PUT | - (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0)); - - return 1; } /** @@ -1015,15 +993,13 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch, enum epoch_event ev) { - int finish, epoch_size; + int epoch_size; struct drbd_epoch *next_epoch; - int schedule_flush = 0; enum finish_epoch rv = FE_STILL_LIVE; spin_lock(&mdev->epoch_lock); do { next_epoch = NULL; - finish = 0; epoch_size = atomic_read(&epoch->epoch_size); @@ -1033,16 +1009,6 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev, break; case EV_GOT_BARRIER_NR: set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags); - - /* Special case: If we just switched from WO_bio_barrier to - WO_bdev_flush we should not finish the current epoch */ - if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 && - mdev->write_ordering != WO_bio_barrier && - epoch == mdev->current_epoch) - clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags); - break; - case EV_BARRIER_DONE: - set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags); break; case EV_BECAME_LAST: /* nothing to do*/ @@ -1051,23 +1017,7 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev, if (epoch_size != 0 && atomic_read(&epoch->active) == 0 && - test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) && - epoch->list.prev == &mdev->current_epoch->list && - !test_bit(DE_IS_FINISHING, &epoch->flags)) { - /* Nearly all conditions are met to finish that epoch... */ - if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) || - mdev->write_ordering == WO_none || - (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) || - ev & EV_CLEANUP) { - finish = 1; - set_bit(DE_IS_FINISHING, &epoch->flags); - } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) && - mdev->write_ordering == WO_bio_barrier) { - atomic_inc(&epoch->active); - schedule_flush = 1; - } - } - if (finish) { + test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) { if (!(ev & EV_CLEANUP)) { spin_unlock(&mdev->epoch_lock); drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size); @@ -1090,6 +1040,7 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev, /* atomic_set(&epoch->active, 0); is already zero */ if (rv == FE_STILL_LIVE) rv = FE_RECYCLED; + wake_up(&mdev->ee_wait); } } @@ -1101,22 +1052,6 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev, spin_unlock(&mdev->epoch_lock); - if (schedule_flush) { - struct flush_work *fw; - fw = kmalloc(sizeof(*fw), GFP_ATOMIC); - if (fw) { - fw->w.cb = w_flush; - fw->epoch = epoch; - drbd_queue_work(&mdev->data.work, &fw->w); - } else { - dev_warn(DEV, "Could not kmalloc a flush_work obj\n"); - set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags); - /* That is not a recursion, only one level */ - drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE); - drbd_may_finish_epoch(mdev, epoch, EV_PUT); - } - } - return rv; } @@ -1132,19 +1067,16 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) [WO_none] = "none", [WO_drain_io] = "drain", [WO_bdev_flush] = "flush", - [WO_bio_barrier] = "barrier", }; pwo = mdev->write_ordering; wo = min(pwo, wo); - if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier) - wo = WO_bdev_flush; if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush) wo = WO_drain_io; if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain) wo = WO_none; mdev->write_ordering = wo; - if (pwo != mdev->write_ordering || wo == WO_bio_barrier) + if (pwo != mdev->write_ordering || wo == WO_bdev_flush) dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]); } @@ -1180,7 +1112,7 @@ next_bio: bio->bi_sector = sector; bio->bi_bdev = mdev->ldev->backing_bdev; /* we special case some flags in the multi-bio case, see below - * (REQ_UNPLUG, REQ_HARDBARRIER) */ + * (REQ_UNPLUG) */ bio->bi_rw = rw; bio->bi_private = e; bio->bi_end_io = drbd_endio_sec; @@ -1214,11 +1146,6 @@ next_bio: bio->bi_rw &= ~REQ_UNPLUG; drbd_generic_make_request(mdev, fault_type, bio); - - /* strip off REQ_HARDBARRIER, - * unless it is the first or last bio */ - if (bios && bios->bi_next) - bios->bi_rw &= ~REQ_HARDBARRIER; } while (bios); maybe_kick_lo(mdev); return 0; @@ -1232,53 +1159,12 @@ fail: return -ENOMEM; } -/** - * w_e_reissue() - Worker callback; Resubmit a bio, without REQ_HARDBARRIER set - * @mdev: DRBD device. - * @w: work object. - * @cancel: The connection will be closed anyways (unused in this callback) - */ -int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local) -{ - struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w; - /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place, - (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch) - so that we can finish that epoch in drbd_may_finish_epoch(). - That is necessary if we already have a long chain of Epochs, before - we realize that REQ_HARDBARRIER is actually not supported */ - - /* As long as the -ENOTSUPP on the barrier is reported immediately - that will never trigger. If it is reported late, we will just - print that warning and continue correctly for all future requests - with WO_bdev_flush */ - if (previous_epoch(mdev, e->epoch)) - dev_warn(DEV, "Write ordering was not enforced (one time event)\n"); - - /* we still have a local reference, - * get_ldev was done in receive_Data. */ - - e->w.cb = e_end_block; - if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) { - /* drbd_submit_ee fails for one reason only: - * if was not able to allocate sufficient bios. - * requeue, try again later. */ - e->w.cb = w_e_reissue; - drbd_queue_work(&mdev->data.work, &e->w); - } - return 1; -} - -static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h) +static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { - int rv, issue_flush; - struct p_barrier *p = (struct p_barrier *)h; + int rv; + struct p_barrier *p = &mdev->data.rbuf.barrier; struct drbd_epoch *epoch; - ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; - - rv = drbd_recv(mdev, h->payload, h->length); - ERR_IF(rv != h->length) return FALSE; - inc_unacked(mdev); if (mdev->net_conf->wire_protocol != DRBD_PROT_C) @@ -1293,44 +1179,40 @@ static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h) * Therefore we must send the barrier_ack after the barrier request was * completed. */ switch (mdev->write_ordering) { - case WO_bio_barrier: case WO_none: if (rv == FE_RECYCLED) return TRUE; - break; + + /* receiver context, in the writeout path of the other node. + * avoid potential distributed deadlock */ + epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO); + if (epoch) + break; + else + dev_warn(DEV, "Allocation of an epoch failed, slowing down\n"); + /* Fall through */ case WO_bdev_flush: case WO_drain_io: - if (rv == FE_STILL_LIVE) { - set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags); - drbd_wait_ee_list_empty(mdev, &mdev->active_ee); - rv = drbd_flush_after_epoch(mdev, mdev->current_epoch); - } - if (rv == FE_RECYCLED) - return TRUE; - - /* The asender will send all the ACKs and barrier ACKs out, since - all EEs moved from the active_ee to the done_ee. We need to - provide a new epoch object for the EEs that come in soon */ - break; - } - - /* receiver context, in the writeout path of the other node. - * avoid potential distributed deadlock */ - epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO); - if (!epoch) { - dev_warn(DEV, "Allocation of an epoch failed, slowing down\n"); - issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags); drbd_wait_ee_list_empty(mdev, &mdev->active_ee); - if (issue_flush) { - rv = drbd_flush_after_epoch(mdev, mdev->current_epoch); - if (rv == FE_RECYCLED) - return TRUE; + drbd_flush(mdev); + + if (atomic_read(&mdev->current_epoch->epoch_size)) { + epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO); + if (epoch) + break; } - drbd_wait_ee_list_empty(mdev, &mdev->done_ee); + epoch = mdev->current_epoch; + wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0); + + D_ASSERT(atomic_read(&epoch->active) == 0); + D_ASSERT(epoch->flags == 0); return TRUE; + default: + dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering); + return FALSE; } epoch->flags = 0; @@ -1457,7 +1339,7 @@ static int drbd_drain_block(struct drbd_conf *mdev, int data_size) data_size -= rr; } kunmap(page); - drbd_pp_free(mdev, page); + drbd_pp_free(mdev, page, 0); return rv; } @@ -1562,30 +1444,29 @@ static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_si list_add(&e->w.list, &mdev->sync_ee); spin_unlock_irq(&mdev->req_lock); + atomic_add(data_size >> 9, &mdev->rs_sect_ev); if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0) return TRUE; + /* drbd_submit_ee currently fails for one reason only: + * not being able to allocate enough bios. + * Is dropping the connection going to help? */ + spin_lock_irq(&mdev->req_lock); + list_del(&e->w.list); + spin_unlock_irq(&mdev->req_lock); + drbd_free_ee(mdev, e); fail: put_ldev(mdev); return FALSE; } -static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h) +static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { struct drbd_request *req; sector_t sector; - unsigned int header_size, data_size; int ok; - struct p_data *p = (struct p_data *)h; - - header_size = sizeof(*p) - sizeof(*h); - data_size = h->length - header_size; - - ERR_IF(data_size == 0) return FALSE; - - if (drbd_recv(mdev, h->payload, header_size) != header_size) - return FALSE; + struct p_data *p = &mdev->data.rbuf.data; sector = be64_to_cpu(p->sector); @@ -1611,20 +1492,11 @@ static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h) return ok; } -static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h) +static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { sector_t sector; - unsigned int header_size, data_size; int ok; - struct p_data *p = (struct p_data *)h; - - header_size = sizeof(*p) - sizeof(*h); - data_size = h->length - header_size; - - ERR_IF(data_size == 0) return FALSE; - - if (drbd_recv(mdev, h->payload, header_size) != header_size) - return FALSE; + struct p_data *p = &mdev->data.rbuf.data; sector = be64_to_cpu(p->sector); D_ASSERT(p->block_id == ID_SYNCER); @@ -1640,9 +1512,11 @@ static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h) ok = drbd_drain_block(mdev, data_size); - drbd_send_ack_dp(mdev, P_NEG_ACK, p); + drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size); } + atomic_add(data_size >> 9, &mdev->rs_sect_in); + return ok; } @@ -1653,15 +1527,8 @@ static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel) { struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w; sector_t sector = e->sector; - struct drbd_epoch *epoch; int ok = 1, pcmd; - if (e->flags & EE_IS_BARRIER) { - epoch = previous_epoch(mdev, e->epoch); - if (epoch) - drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0)); - } - if (mdev->net_conf->wire_protocol == DRBD_PROT_C) { if (likely((e->flags & EE_WAS_ERROR) == 0)) { pcmd = (mdev->state.conn >= C_SYNC_SOURCE && @@ -1765,24 +1632,27 @@ static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq) return ret; } +static unsigned long write_flags_to_bio(struct drbd_conf *mdev, u32 dpf) +{ + if (mdev->agreed_pro_version >= 95) + return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) | + (dpf & DP_UNPLUG ? REQ_UNPLUG : 0) | + (dpf & DP_FUA ? REQ_FUA : 0) | + (dpf & DP_FLUSH ? REQ_FUA : 0) | + (dpf & DP_DISCARD ? REQ_DISCARD : 0); + else + return dpf & DP_RW_SYNC ? (REQ_SYNC | REQ_UNPLUG) : 0; +} + /* mirrored write */ -static int receive_Data(struct drbd_conf *mdev, struct p_header *h) +static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { sector_t sector; struct drbd_epoch_entry *e; - struct p_data *p = (struct p_data *)h; - int header_size, data_size; + struct p_data *p = &mdev->data.rbuf.data; int rw = WRITE; u32 dp_flags; - header_size = sizeof(*p) - sizeof(*h); - data_size = h->length - header_size; - - ERR_IF(data_size == 0) return FALSE; - - if (drbd_recv(mdev, h->payload, header_size) != header_size) - return FALSE; - if (!get_ldev(mdev)) { if (__ratelimit(&drbd_ratelimit_state)) dev_err(DEV, "Can not write mirrored data block " @@ -1792,7 +1662,7 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h) mdev->peer_seq++; spin_unlock(&mdev->peer_seq_lock); - drbd_send_ack_dp(mdev, P_NEG_ACK, p); + drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size); atomic_inc(&mdev->current_epoch->epoch_size); return drbd_drain_block(mdev, data_size); } @@ -1815,36 +1685,11 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h) e->epoch = mdev->current_epoch; atomic_inc(&e->epoch->epoch_size); atomic_inc(&e->epoch->active); - - if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) { - struct drbd_epoch *epoch; - /* Issue a barrier if we start a new epoch, and the previous epoch - was not a epoch containing a single request which already was - a Barrier. */ - epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list); - if (epoch == e->epoch) { - set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags); - rw |= REQ_HARDBARRIER; - e->flags |= EE_IS_BARRIER; - } else { - if (atomic_read(&epoch->epoch_size) > 1 || - !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) { - set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags); - set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags); - rw |= REQ_HARDBARRIER; - e->flags |= EE_IS_BARRIER; - } - } - } spin_unlock(&mdev->epoch_lock); dp_flags = be32_to_cpu(p->dp_flags); - if (dp_flags & DP_HARDBARRIER) { - dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n"); - /* rw |= REQ_HARDBARRIER; */ - } - if (dp_flags & DP_RW_SYNC) - rw |= REQ_SYNC | REQ_UNPLUG; + rw |= write_flags_to_bio(mdev, dp_flags); + if (dp_flags & DP_MAY_SET_IN_SYNC) e->flags |= EE_MAY_SET_IN_SYNC; @@ -1997,16 +1842,27 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h) break; } - if (mdev->state.pdsk == D_DISKLESS) { + if (mdev->state.pdsk < D_INCONSISTENT) { /* In case we have the only disk of the cluster, */ drbd_set_out_of_sync(mdev, e->sector, e->size); e->flags |= EE_CALL_AL_COMPLETE_IO; + e->flags &= ~EE_MAY_SET_IN_SYNC; drbd_al_begin_io(mdev, e->sector); } if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0) return TRUE; + /* drbd_submit_ee currently fails for one reason only: + * not being able to allocate enough bios. + * Is dropping the connection going to help? */ + spin_lock_irq(&mdev->req_lock); + list_del(&e->w.list); + hlist_del_init(&e->colision); + spin_unlock_irq(&mdev->req_lock); + if (e->flags & EE_CALL_AL_COMPLETE_IO) + drbd_al_complete_io(mdev, e->sector); + out_interrupted: /* yes, the epoch_size now is imbalanced. * but we drop the connection anyways, so we don't have a chance to @@ -2016,20 +1872,64 @@ out_interrupted: return FALSE; } -static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) +/* We may throttle resync, if the lower device seems to be busy, + * and current sync rate is above c_min_rate. + * + * To decide whether or not the lower device is busy, we use a scheme similar + * to MD RAID is_mddev_idle(): if the partition stats reveal "significant" + * (more than 64 sectors) of activity we cannot account for with our own resync + * activity, it obviously is "busy". + * + * The current sync rate used here uses only the most recent two step marks, + * to have a short time average so we can react faster. + */ +int drbd_rs_should_slow_down(struct drbd_conf *mdev) +{ + struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk; + unsigned long db, dt, dbdt; + int curr_events; + int throttle = 0; + + /* feature disabled? */ + if (mdev->sync_conf.c_min_rate == 0) + return 0; + + curr_events = (int)part_stat_read(&disk->part0, sectors[0]) + + (int)part_stat_read(&disk->part0, sectors[1]) - + atomic_read(&mdev->rs_sect_ev); + if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) { + unsigned long rs_left; + int i; + + mdev->rs_last_events = curr_events; + + /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP, + * approx. */ + i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-2) % DRBD_SYNC_MARKS; + rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed; + + dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ; + if (!dt) + dt++; + db = mdev->rs_mark_left[i] - rs_left; + dbdt = Bit2KB(db/dt); + + if (dbdt > mdev->sync_conf.c_min_rate) + throttle = 1; + } + return throttle; +} + + +static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size) { sector_t sector; const sector_t capacity = drbd_get_capacity(mdev->this_bdev); struct drbd_epoch_entry *e; struct digest_info *di = NULL; - int size, digest_size; + int size, verb; unsigned int fault_type; - struct p_block_req *p = - (struct p_block_req *)h; - const int brps = sizeof(*p)-sizeof(*h); - - if (drbd_recv(mdev, h->payload, brps) != brps) - return FALSE; + struct p_block_req *p = &mdev->data.rbuf.block_req; sector = be64_to_cpu(p->sector); size = be32_to_cpu(p->blksize); @@ -2046,12 +1946,31 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) } if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) { - if (__ratelimit(&drbd_ratelimit_state)) + verb = 1; + switch (cmd) { + case P_DATA_REQUEST: + drbd_send_ack_rp(mdev, P_NEG_DREPLY, p); + break; + case P_RS_DATA_REQUEST: + case P_CSUM_RS_REQUEST: + case P_OV_REQUEST: + drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p); + break; + case P_OV_REPLY: + verb = 0; + dec_rs_pending(mdev); + drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC); + break; + default: + dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n", + cmdname(cmd)); + } + if (verb && __ratelimit(&drbd_ratelimit_state)) dev_err(DEV, "Can not satisfy peer's read request, " "no local data.\n"); - drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY : - P_NEG_RS_DREPLY , p); - return drbd_drain_block(mdev, h->length - brps); + + /* drain possibly payload */ + return drbd_drain_block(mdev, digest_size); } /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD @@ -2063,31 +1982,21 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) return FALSE; } - switch (h->command) { + switch (cmd) { case P_DATA_REQUEST: e->w.cb = w_e_end_data_req; fault_type = DRBD_FAULT_DT_RD; - break; + /* application IO, don't drbd_rs_begin_io */ + goto submit; + case P_RS_DATA_REQUEST: e->w.cb = w_e_end_rsdata_req; fault_type = DRBD_FAULT_RS_RD; - /* Eventually this should become asynchronously. Currently it - * blocks the whole receiver just to delay the reading of a - * resync data block. - * the drbd_work_queue mechanism is made for this... - */ - if (!drbd_rs_begin_io(mdev, sector)) { - /* we have been interrupted, - * probably connection lost! */ - D_ASSERT(signal_pending(current)); - goto out_free_e; - } break; case P_OV_REPLY: case P_CSUM_RS_REQUEST: fault_type = DRBD_FAULT_RS_RD; - digest_size = h->length - brps ; di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO); if (!di) goto out_free_e; @@ -2095,31 +2004,25 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) di->digest_size = digest_size; di->digest = (((char *)di)+sizeof(struct digest_info)); + e->digest = di; + e->flags |= EE_HAS_DIGEST; + if (drbd_recv(mdev, di->digest, digest_size) != digest_size) goto out_free_e; - e->block_id = (u64)(unsigned long)di; - if (h->command == P_CSUM_RS_REQUEST) { + if (cmd == P_CSUM_RS_REQUEST) { D_ASSERT(mdev->agreed_pro_version >= 89); e->w.cb = w_e_end_csum_rs_req; - } else if (h->command == P_OV_REPLY) { + } else if (cmd == P_OV_REPLY) { e->w.cb = w_e_end_ov_reply; dec_rs_pending(mdev); - break; - } - - if (!drbd_rs_begin_io(mdev, sector)) { - /* we have been interrupted, probably connection lost! */ - D_ASSERT(signal_pending(current)); - goto out_free_e; + /* drbd_rs_begin_io done when we sent this request, + * but accounting still needs to be done. */ + goto submit_for_resync; } break; case P_OV_REQUEST: - if (mdev->state.conn >= C_CONNECTED && - mdev->state.conn != C_VERIFY_T) - dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n", - drbd_conn_str(mdev->state.conn)); if (mdev->ov_start_sector == ~(sector_t)0 && mdev->agreed_pro_version >= 90) { mdev->ov_start_sector = sector; @@ -2130,37 +2033,63 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) } e->w.cb = w_e_end_ov_req; fault_type = DRBD_FAULT_RS_RD; - /* Eventually this should become asynchronous. Currently it - * blocks the whole receiver just to delay the reading of a - * resync data block. - * the drbd_work_queue mechanism is made for this... - */ - if (!drbd_rs_begin_io(mdev, sector)) { - /* we have been interrupted, - * probably connection lost! */ - D_ASSERT(signal_pending(current)); - goto out_free_e; - } break; - default: dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n", - cmdname(h->command)); + cmdname(cmd)); fault_type = DRBD_FAULT_MAX; + goto out_free_e; } - spin_lock_irq(&mdev->req_lock); - list_add(&e->w.list, &mdev->read_ee); - spin_unlock_irq(&mdev->req_lock); + /* Throttle, drbd_rs_begin_io and submit should become asynchronous + * wrt the receiver, but it is not as straightforward as it may seem. + * Various places in the resync start and stop logic assume resync + * requests are processed in order, requeuing this on the worker thread + * introduces a bunch of new code for synchronization between threads. + * + * Unlimited throttling before drbd_rs_begin_io may stall the resync + * "forever", throttling after drbd_rs_begin_io will lock that extent + * for application writes for the same time. For now, just throttle + * here, where the rest of the code expects the receiver to sleep for + * a while, anyways. + */ + + /* Throttle before drbd_rs_begin_io, as that locks out application IO; + * this defers syncer requests for some time, before letting at least + * on request through. The resync controller on the receiving side + * will adapt to the incoming rate accordingly. + * + * We cannot throttle here if remote is Primary/SyncTarget: + * we would also throttle its application reads. + * In that case, throttling is done on the SyncTarget only. + */ + if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev)) + msleep(100); + if (drbd_rs_begin_io(mdev, e->sector)) + goto out_free_e; +submit_for_resync: + atomic_add(size >> 9, &mdev->rs_sect_ev); + +submit: inc_unacked(mdev); + spin_lock_irq(&mdev->req_lock); + list_add_tail(&e->w.list, &mdev->read_ee); + spin_unlock_irq(&mdev->req_lock); if (drbd_submit_ee(mdev, e, READ, fault_type) == 0) return TRUE; + /* drbd_submit_ee currently fails for one reason only: + * not being able to allocate enough bios. + * Is dropping the connection going to help? */ + spin_lock_irq(&mdev->req_lock); + list_del(&e->w.list); + spin_unlock_irq(&mdev->req_lock); + /* no drbd_rs_complete_io(), we are dropping the connection anyways */ + out_free_e: - kfree(di); put_ldev(mdev); drbd_free_ee(mdev, e); return FALSE; @@ -2699,20 +2628,13 @@ static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self) return 1; } -static int receive_protocol(struct drbd_conf *mdev, struct p_header *h) +static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { - struct p_protocol *p = (struct p_protocol *)h; - int header_size, data_size; + struct p_protocol *p = &mdev->data.rbuf.protocol; int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p; int p_want_lose, p_two_primaries, cf; char p_integrity_alg[SHARED_SECRET_MAX] = ""; - header_size = sizeof(*p) - sizeof(*h); - data_size = h->length - header_size; - - if (drbd_recv(mdev, h->payload, header_size) != header_size) - return FALSE; - p_proto = be32_to_cpu(p->protocol); p_after_sb_0p = be32_to_cpu(p->after_sb_0p); p_after_sb_1p = be32_to_cpu(p->after_sb_1p); @@ -2805,39 +2727,46 @@ struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev, return tfm; } -static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h) +static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size) { int ok = TRUE; - struct p_rs_param_89 *p = (struct p_rs_param_89 *)h; + struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95; unsigned int header_size, data_size, exp_max_sz; struct crypto_hash *verify_tfm = NULL; struct crypto_hash *csums_tfm = NULL; const int apv = mdev->agreed_pro_version; + int *rs_plan_s = NULL; + int fifo_size = 0; exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param) : apv == 88 ? sizeof(struct p_rs_param) + SHARED_SECRET_MAX - : /* 89 */ sizeof(struct p_rs_param_89); + : apv <= 94 ? sizeof(struct p_rs_param_89) + : /* apv >= 95 */ sizeof(struct p_rs_param_95); - if (h->length > exp_max_sz) { + if (packet_size > exp_max_sz) { dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n", - h->length, exp_max_sz); + packet_size, exp_max_sz); return FALSE; } if (apv <= 88) { - header_size = sizeof(struct p_rs_param) - sizeof(*h); - data_size = h->length - header_size; - } else /* apv >= 89 */ { - header_size = sizeof(struct p_rs_param_89) - sizeof(*h); - data_size = h->length - header_size; + header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80); + data_size = packet_size - header_size; + } else if (apv <= 94) { + header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80); + data_size = packet_size - header_size; + D_ASSERT(data_size == 0); + } else { + header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80); + data_size = packet_size - header_size; D_ASSERT(data_size == 0); } /* initialize verify_alg and csums_alg */ memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX); - if (drbd_recv(mdev, h->payload, header_size) != header_size) + if (drbd_recv(mdev, &p->head.payload, header_size) != header_size) return FALSE; mdev->sync_conf.rate = be32_to_cpu(p->rate); @@ -2896,6 +2825,22 @@ static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h) } } + if (apv > 94) { + mdev->sync_conf.rate = be32_to_cpu(p->rate); + mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead); + mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target); + mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target); + mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate); + + fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; + if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) { + rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL); + if (!rs_plan_s) { + dev_err(DEV, "kmalloc of fifo_buffer failed"); + goto disconnect; + } + } + } spin_lock(&mdev->peer_seq_lock); /* lock against drbd_nl_syncer_conf() */ @@ -2913,6 +2858,12 @@ static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h) mdev->csums_tfm = csums_tfm; dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg); } + if (fifo_size != mdev->rs_plan_s.size) { + kfree(mdev->rs_plan_s.values); + mdev->rs_plan_s.values = rs_plan_s; + mdev->rs_plan_s.size = fifo_size; + mdev->rs_planed = 0; + } spin_unlock(&mdev->peer_seq_lock); } @@ -2946,19 +2897,15 @@ static void warn_if_differ_considerably(struct drbd_conf *mdev, (unsigned long long)a, (unsigned long long)b); } -static int receive_sizes(struct drbd_conf *mdev, struct p_header *h) +static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { - struct p_sizes *p = (struct p_sizes *)h; + struct p_sizes *p = &mdev->data.rbuf.sizes; enum determine_dev_size dd = unchanged; unsigned int max_seg_s; sector_t p_size, p_usize, my_usize; int ldsc = 0; /* local disk size changed */ enum dds_flags ddsf; - ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; - if (drbd_recv(mdev, h->payload, h->length) != h->length) - return FALSE; - p_size = be64_to_cpu(p->d_size); p_usize = be64_to_cpu(p->u_size); @@ -2972,7 +2919,6 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h) * we still need to figure out whether we accept that. */ mdev->p_size = p_size; -#define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? l : min(l, r)) if (get_ldev(mdev)) { warn_if_differ_considerably(mdev, "lower level device sizes", p_size, drbd_get_max_capacity(mdev->ldev)); @@ -3029,6 +2975,8 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h) if (mdev->agreed_pro_version < 94) max_seg_s = be32_to_cpu(p->max_segment_size); + else if (mdev->agreed_pro_version == 94) + max_seg_s = DRBD_MAX_SIZE_H80_PACKET; else /* drbd 8.3.8 onwards */ max_seg_s = DRBD_MAX_SEGMENT_SIZE; @@ -3062,16 +3010,12 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static int receive_uuids(struct drbd_conf *mdev, struct p_header *h) +static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { - struct p_uuids *p = (struct p_uuids *)h; + struct p_uuids *p = &mdev->data.rbuf.uuids; u64 *p_uuid; int i; - ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; - if (drbd_recv(mdev, h->payload, h->length) != h->length) - return FALSE; - p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO); for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++) @@ -3107,6 +3051,11 @@ static int receive_uuids(struct drbd_conf *mdev, struct p_header *h) drbd_md_sync(mdev); } put_ldev(mdev); + } else if (mdev->state.disk < D_INCONSISTENT && + mdev->state.role == R_PRIMARY) { + /* I am a diskless primary, the peer just created a new current UUID + for me. */ + drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]); } /* Before we test for the disk state, we should wait until an eventually @@ -3150,16 +3099,12 @@ static union drbd_state convert_state(union drbd_state ps) return ms; } -static int receive_req_state(struct drbd_conf *mdev, struct p_header *h) +static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { - struct p_req_state *p = (struct p_req_state *)h; + struct p_req_state *p = &mdev->data.rbuf.req_state; union drbd_state mask, val; int rv; - ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; - if (drbd_recv(mdev, h->payload, h->length) != h->length) - return FALSE; - mask.i = be32_to_cpu(p->mask); val.i = be32_to_cpu(p->val); @@ -3180,20 +3125,14 @@ static int receive_req_state(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static int receive_state(struct drbd_conf *mdev, struct p_header *h) +static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { - struct p_state *p = (struct p_state *)h; - enum drbd_conns nconn, oconn; - union drbd_state ns, peer_state; + struct p_state *p = &mdev->data.rbuf.state; + union drbd_state os, ns, peer_state; enum drbd_disk_state real_peer_disk; + enum chg_state_flags cs_flags; int rv; - ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) - return FALSE; - - if (drbd_recv(mdev, h->payload, h->length) != h->length) - return FALSE; - peer_state.i = be32_to_cpu(p->state); real_peer_disk = peer_state.disk; @@ -3204,40 +3143,74 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h) spin_lock_irq(&mdev->req_lock); retry: - oconn = nconn = mdev->state.conn; + os = ns = mdev->state; spin_unlock_irq(&mdev->req_lock); - if (nconn == C_WF_REPORT_PARAMS) - nconn = C_CONNECTED; + /* peer says his disk is uptodate, while we think it is inconsistent, + * and this happens while we think we have a sync going on. */ + if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE && + os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) { + /* If we are (becoming) SyncSource, but peer is still in sync + * preparation, ignore its uptodate-ness to avoid flapping, it + * will change to inconsistent once the peer reaches active + * syncing states. + * It may have changed syncer-paused flags, however, so we + * cannot ignore this completely. */ + if (peer_state.conn > C_CONNECTED && + peer_state.conn < C_SYNC_SOURCE) + real_peer_disk = D_INCONSISTENT; + + /* if peer_state changes to connected at the same time, + * it explicitly notifies us that it finished resync. + * Maybe we should finish it up, too? */ + else if (os.conn >= C_SYNC_SOURCE && + peer_state.conn == C_CONNECTED) { + if (drbd_bm_total_weight(mdev) <= mdev->rs_failed) + drbd_resync_finished(mdev); + return TRUE; + } + } + + /* peer says his disk is inconsistent, while we think it is uptodate, + * and this happens while the peer still thinks we have a sync going on, + * but we think we are already done with the sync. + * We ignore this to avoid flapping pdsk. + * This should not happen, if the peer is a recent version of drbd. */ + if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT && + os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE) + real_peer_disk = D_UP_TO_DATE; + + if (ns.conn == C_WF_REPORT_PARAMS) + ns.conn = C_CONNECTED; if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING && get_ldev_if_state(mdev, D_NEGOTIATING)) { int cr; /* consider resync */ /* if we established a new connection */ - cr = (oconn < C_CONNECTED); + cr = (os.conn < C_CONNECTED); /* if we had an established connection * and one of the nodes newly attaches a disk */ - cr |= (oconn == C_CONNECTED && + cr |= (os.conn == C_CONNECTED && (peer_state.disk == D_NEGOTIATING || - mdev->state.disk == D_NEGOTIATING)); + os.disk == D_NEGOTIATING)); /* if we have both been inconsistent, and the peer has been * forced to be UpToDate with --overwrite-data */ cr |= test_bit(CONSIDER_RESYNC, &mdev->flags); /* if we had been plain connected, and the admin requested to * start a sync by "invalidate" or "invalidate-remote" */ - cr |= (oconn == C_CONNECTED && + cr |= (os.conn == C_CONNECTED && (peer_state.conn >= C_STARTING_SYNC_S && peer_state.conn <= C_WF_BITMAP_T)); if (cr) - nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk); + ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk); put_ldev(mdev); - if (nconn == C_MASK) { - nconn = C_CONNECTED; + if (ns.conn == C_MASK) { + ns.conn = C_CONNECTED; if (mdev->state.disk == D_NEGOTIATING) { - drbd_force_state(mdev, NS(disk, D_DISKLESS)); + drbd_force_state(mdev, NS(disk, D_FAILED)); } else if (peer_state.disk == D_NEGOTIATING) { dev_err(DEV, "Disk attach process on the peer node was aborted.\n"); peer_state.disk = D_DISKLESS; @@ -3245,7 +3218,7 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h) } else { if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags)) return FALSE; - D_ASSERT(oconn == C_WF_REPORT_PARAMS); + D_ASSERT(os.conn == C_WF_REPORT_PARAMS); drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); return FALSE; } @@ -3253,18 +3226,28 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h) } spin_lock_irq(&mdev->req_lock); - if (mdev->state.conn != oconn) + if (mdev->state.i != os.i) goto retry; clear_bit(CONSIDER_RESYNC, &mdev->flags); - ns.i = mdev->state.i; - ns.conn = nconn; ns.peer = peer_state.role; ns.pdsk = real_peer_disk; ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp); - if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING) + if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING) ns.disk = mdev->new_state_tmp.disk; - - rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL); + cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD); + if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED && + test_bit(NEW_CUR_UUID, &mdev->flags)) { + /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this + for temporal network outages! */ + spin_unlock_irq(&mdev->req_lock); + dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n"); + tl_clear(mdev); + drbd_uuid_new_current(mdev); + clear_bit(NEW_CUR_UUID, &mdev->flags); + drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0)); + return FALSE; + } + rv = _drbd_set_state(mdev, ns, cs_flags, NULL); ns = mdev->state; spin_unlock_irq(&mdev->req_lock); @@ -3273,8 +3256,8 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h) return FALSE; } - if (oconn > C_WF_REPORT_PARAMS) { - if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED && + if (os.conn > C_WF_REPORT_PARAMS) { + if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED && peer_state.disk != D_NEGOTIATING ) { /* we want resync, peer has not yet decided to sync... */ /* Nowadays only used when forcing a node into primary role and @@ -3291,9 +3274,9 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h) +static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { - struct p_rs_uuid *p = (struct p_rs_uuid *)h; + struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid; wait_event(mdev->misc_wait, mdev->state.conn == C_WF_SYNC_UUID || @@ -3302,10 +3285,6 @@ static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h) /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */ - ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; - if (drbd_recv(mdev, h->payload, h->length) != h->length) - return FALSE; - /* Here the _drbd_uuid_ functions are right, current should _not_ be rotated into the history */ if (get_ldev_if_state(mdev, D_NEGOTIATING)) { @@ -3324,14 +3303,14 @@ static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h) enum receive_bitmap_ret { OK, DONE, FAILED }; static enum receive_bitmap_ret -receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h, - unsigned long *buffer, struct bm_xfer_ctx *c) +receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size, + unsigned long *buffer, struct bm_xfer_ctx *c) { unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset); unsigned want = num_words * sizeof(long); - if (want != h->length) { - dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length); + if (want != data_size) { + dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size); return FAILED; } if (want == 0) @@ -3360,7 +3339,7 @@ recv_bm_rle_bits(struct drbd_conf *mdev, u64 tmp; unsigned long s = c->bit_offset; unsigned long e; - int len = p->head.length - (sizeof(*p) - sizeof(p->head)); + int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head)); int toggle = DCBP_get_start(p); int have; int bits; @@ -3429,7 +3408,7 @@ void INFO_bm_xfer_stats(struct drbd_conf *mdev, const char *direction, struct bm_xfer_ctx *c) { /* what would it take to transfer it "plaintext" */ - unsigned plain = sizeof(struct p_header) * + unsigned plain = sizeof(struct p_header80) * ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1) + c->bm_words * sizeof(long); unsigned total = c->bytes[0] + c->bytes[1]; @@ -3467,12 +3446,13 @@ void INFO_bm_xfer_stats(struct drbd_conf *mdev, in order to be agnostic to the 32 vs 64 bits issue. returns 0 on failure, 1 if we successfully received it. */ -static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h) +static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { struct bm_xfer_ctx c; void *buffer; enum receive_bitmap_ret ret; int ok = FALSE; + struct p_header80 *h = &mdev->data.rbuf.header.h80; wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt)); @@ -3492,39 +3472,39 @@ static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h) }; do { - if (h->command == P_BITMAP) { - ret = receive_bitmap_plain(mdev, h, buffer, &c); - } else if (h->command == P_COMPRESSED_BITMAP) { + if (cmd == P_BITMAP) { + ret = receive_bitmap_plain(mdev, data_size, buffer, &c); + } else if (cmd == P_COMPRESSED_BITMAP) { /* MAYBE: sanity check that we speak proto >= 90, * and the feature is enabled! */ struct p_compressed_bm *p; - if (h->length > BM_PACKET_PAYLOAD_BYTES) { + if (data_size > BM_PACKET_PAYLOAD_BYTES) { dev_err(DEV, "ReportCBitmap packet too large\n"); goto out; } /* use the page buff */ p = buffer; memcpy(p, h, sizeof(*h)); - if (drbd_recv(mdev, p->head.payload, h->length) != h->length) + if (drbd_recv(mdev, p->head.payload, data_size) != data_size) goto out; - if (p->head.length <= (sizeof(*p) - sizeof(p->head))) { - dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length); + if (data_size <= (sizeof(*p) - sizeof(p->head))) { + dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size); return FAILED; } ret = decode_bitmap_c(mdev, p, &c); } else { - dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command); + dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd); goto out; } - c.packets[h->command == P_BITMAP]++; - c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length; + c.packets[cmd == P_BITMAP]++; + c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size; if (ret != OK) break; - if (!drbd_recv_header(mdev, h)) + if (!drbd_recv_header(mdev, &cmd, &data_size)) goto out; } while (ret == OK); if (ret == FAILED) @@ -3555,17 +3535,16 @@ static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h) return ok; } -static int receive_skip_(struct drbd_conf *mdev, struct p_header *h, int silent) +static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { /* TODO zero copy sink :) */ static char sink[128]; int size, want, r; - if (!silent) - dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n", - h->command, h->length); + dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n", + cmd, data_size); - size = h->length; + size = data_size; while (size > 0) { want = min_t(int, size, sizeof(sink)); r = drbd_recv(mdev, sink, want); @@ -3575,17 +3554,7 @@ static int receive_skip_(struct drbd_conf *mdev, struct p_header *h, int silent) return size == 0; } -static int receive_skip(struct drbd_conf *mdev, struct p_header *h) -{ - return receive_skip_(mdev, h, 0); -} - -static int receive_skip_silent(struct drbd_conf *mdev, struct p_header *h) -{ - return receive_skip_(mdev, h, 1); -} - -static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h) +static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) { if (mdev->state.disk >= D_INCONSISTENT) drbd_kick_lo(mdev); @@ -3597,108 +3566,96 @@ static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *); - -static drbd_cmd_handler_f drbd_default_handler[] = { - [P_DATA] = receive_Data, - [P_DATA_REPLY] = receive_DataReply, - [P_RS_DATA_REPLY] = receive_RSDataReply, - [P_BARRIER] = receive_Barrier, - [P_BITMAP] = receive_bitmap, - [P_COMPRESSED_BITMAP] = receive_bitmap, - [P_UNPLUG_REMOTE] = receive_UnplugRemote, - [P_DATA_REQUEST] = receive_DataRequest, - [P_RS_DATA_REQUEST] = receive_DataRequest, - [P_SYNC_PARAM] = receive_SyncParam, - [P_SYNC_PARAM89] = receive_SyncParam, - [P_PROTOCOL] = receive_protocol, - [P_UUIDS] = receive_uuids, - [P_SIZES] = receive_sizes, - [P_STATE] = receive_state, - [P_STATE_CHG_REQ] = receive_req_state, - [P_SYNC_UUID] = receive_sync_uuid, - [P_OV_REQUEST] = receive_DataRequest, - [P_OV_REPLY] = receive_DataRequest, - [P_CSUM_RS_REQUEST] = receive_DataRequest, - [P_DELAY_PROBE] = receive_skip_silent, +typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive); + +struct data_cmd { + int expect_payload; + size_t pkt_size; + drbd_cmd_handler_f function; +}; + +static struct data_cmd drbd_cmd_handler[] = { + [P_DATA] = { 1, sizeof(struct p_data), receive_Data }, + [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply }, + [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } , + [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } , + [P_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } , + [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } , + [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header80), receive_UnplugRemote }, + [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, + [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, + [P_SYNC_PARAM] = { 1, sizeof(struct p_header80), receive_SyncParam }, + [P_SYNC_PARAM89] = { 1, sizeof(struct p_header80), receive_SyncParam }, + [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol }, + [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids }, + [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes }, + [P_STATE] = { 0, sizeof(struct p_state), receive_state }, + [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state }, + [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid }, + [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, + [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest }, + [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest }, + [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip }, /* anything missing from this table is in * the asender_tbl, see get_asender_cmd */ - [P_MAX_CMD] = NULL, + [P_MAX_CMD] = { 0, 0, NULL }, }; -static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler; -static drbd_cmd_handler_f *drbd_opt_cmd_handler; +/* All handler functions that expect a sub-header get that sub-heder in + mdev->data.rbuf.header.head.payload. + + Usually in mdev->data.rbuf.header.head the callback can find the usual + p_header, but they may not rely on that. Since there is also p_header95 ! + */ static void drbdd(struct drbd_conf *mdev) { - drbd_cmd_handler_f handler; - struct p_header *header = &mdev->data.rbuf.header; + union p_header *header = &mdev->data.rbuf.header; + unsigned int packet_size; + enum drbd_packets cmd; + size_t shs; /* sub header size */ + int rv; while (get_t_state(&mdev->receiver) == Running) { drbd_thread_current_set_cpu(mdev); - if (!drbd_recv_header(mdev, header)) { - drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR)); - break; - } + if (!drbd_recv_header(mdev, &cmd, &packet_size)) + goto err_out; - if (header->command < P_MAX_CMD) - handler = drbd_cmd_handler[header->command]; - else if (P_MAY_IGNORE < header->command - && header->command < P_MAX_OPT_CMD) - handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE]; - else if (header->command > P_MAX_OPT_CMD) - handler = receive_skip; - else - handler = NULL; + if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) { + dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size); + goto err_out; + } - if (unlikely(!handler)) { - dev_err(DEV, "unknown packet type %d, l: %d!\n", - header->command, header->length); - drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR)); - break; + shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header); + if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) { + dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size); + goto err_out; } - if (unlikely(!handler(mdev, header))) { - dev_err(DEV, "error receiving %s, l: %d!\n", - cmdname(header->command), header->length); - drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR)); - break; + + if (shs) { + rv = drbd_recv(mdev, &header->h80.payload, shs); + if (unlikely(rv != shs)) { + dev_err(DEV, "short read while reading sub header: rv=%d\n", rv); + goto err_out; + } } - } -} -static void drbd_fail_pending_reads(struct drbd_conf *mdev) -{ - struct hlist_head *slot; - struct hlist_node *pos; - struct hlist_node *tmp; - struct drbd_request *req; - int i; + rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs); - /* - * Application READ requests - */ - spin_lock_irq(&mdev->req_lock); - for (i = 0; i < APP_R_HSIZE; i++) { - slot = mdev->app_reads_hash+i; - hlist_for_each_entry_safe(req, pos, tmp, slot, colision) { - /* it may (but should not any longer!) - * be on the work queue; if that assert triggers, - * we need to also grab the - * spin_lock_irq(&mdev->data.work.q_lock); - * and list_del_init here. */ - D_ASSERT(list_empty(&req->w.list)); - /* It would be nice to complete outside of spinlock. - * But this is easier for now. */ - _req_mod(req, connection_lost_while_pending); + if (unlikely(!rv)) { + dev_err(DEV, "error receiving %s, l: %d!\n", + cmdname(cmd), packet_size); + goto err_out; } } - for (i = 0; i < APP_R_HSIZE; i++) - if (!hlist_empty(mdev->app_reads_hash+i)) - dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: " - "%p, should be NULL\n", i, mdev->app_reads_hash[i].first); - memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *)); - spin_unlock_irq(&mdev->req_lock); + if (0) { + err_out: + drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR)); + } + /* If we leave here, we probably want to update at least the + * "Connected" indicator on stable storage. Do so explicitly here. */ + drbd_md_sync(mdev); } void drbd_flush_workqueue(struct drbd_conf *mdev) @@ -3711,6 +3668,36 @@ void drbd_flush_workqueue(struct drbd_conf *mdev) wait_for_completion(&barr.done); } +void drbd_free_tl_hash(struct drbd_conf *mdev) +{ + struct hlist_head *h; + + spin_lock_irq(&mdev->req_lock); + + if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) { + spin_unlock_irq(&mdev->req_lock); + return; + } + /* paranoia code */ + for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++) + if (h->first) + dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n", + (int)(h - mdev->ee_hash), h->first); + kfree(mdev->ee_hash); + mdev->ee_hash = NULL; + mdev->ee_hash_s = 0; + + /* paranoia code */ + for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++) + if (h->first) + dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n", + (int)(h - mdev->tl_hash), h->first); + kfree(mdev->tl_hash); + mdev->tl_hash = NULL; + mdev->tl_hash_s = 0; + spin_unlock_irq(&mdev->req_lock); +} + static void drbd_disconnect(struct drbd_conf *mdev) { enum drbd_fencing_p fp; @@ -3728,6 +3715,7 @@ static void drbd_disconnect(struct drbd_conf *mdev) drbd_thread_stop(&mdev->asender); drbd_free_sock(mdev); + /* wait for current activity to cease. */ spin_lock_irq(&mdev->req_lock); _drbd_wait_ee_list_empty(mdev, &mdev->active_ee); _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee); @@ -3752,7 +3740,6 @@ static void drbd_disconnect(struct drbd_conf *mdev) /* make sure syncer is stopped and w_resume_next_sg queued */ del_timer_sync(&mdev->resync_timer); - set_bit(STOP_SYNC_TIMER, &mdev->flags); resync_timer_fn((unsigned long)mdev); /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier, @@ -3767,11 +3754,9 @@ static void drbd_disconnect(struct drbd_conf *mdev) kfree(mdev->p_uuid); mdev->p_uuid = NULL; - if (!mdev->state.susp) + if (!is_susp(mdev->state)) tl_clear(mdev); - drbd_fail_pending_reads(mdev); - dev_info(DEV, "Connection closed\n"); drbd_md_sync(mdev); @@ -3782,12 +3767,8 @@ static void drbd_disconnect(struct drbd_conf *mdev) put_ldev(mdev); } - if (mdev->state.role == R_PRIMARY) { - if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) { - enum drbd_disk_state nps = drbd_try_outdate_peer(mdev); - drbd_request_state(mdev, NS(pdsk, nps)); - } - } + if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) + drbd_try_outdate_peer_async(mdev); spin_lock_irq(&mdev->req_lock); os = mdev->state; @@ -3800,32 +3781,14 @@ static void drbd_disconnect(struct drbd_conf *mdev) spin_unlock_irq(&mdev->req_lock); if (os.conn == C_DISCONNECTING) { - struct hlist_head *h; - wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0); - - /* we must not free the tl_hash - * while application io is still on the fly */ - wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0); + wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0); - spin_lock_irq(&mdev->req_lock); - /* paranoia code */ - for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++) - if (h->first) - dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n", - (int)(h - mdev->ee_hash), h->first); - kfree(mdev->ee_hash); - mdev->ee_hash = NULL; - mdev->ee_hash_s = 0; - - /* paranoia code */ - for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++) - if (h->first) - dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n", - (int)(h - mdev->tl_hash), h->first); - kfree(mdev->tl_hash); - mdev->tl_hash = NULL; - mdev->tl_hash_s = 0; - spin_unlock_irq(&mdev->req_lock); + if (!is_susp(mdev->state)) { + /* we must not free the tl_hash + * while application io is still on the fly */ + wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt)); + drbd_free_tl_hash(mdev); + } crypto_free_hash(mdev->cram_hmac_tfm); mdev->cram_hmac_tfm = NULL; @@ -3845,6 +3808,9 @@ static void drbd_disconnect(struct drbd_conf *mdev) i = drbd_release_ee(mdev, &mdev->net_ee); if (i) dev_info(DEV, "net_ee not empty, killed %u entries\n", i); + i = atomic_read(&mdev->pp_in_use_by_net); + if (i) + dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i); i = atomic_read(&mdev->pp_in_use); if (i) dev_info(DEV, "pp_in_use = %d, expected 0\n", i); @@ -3888,7 +3854,7 @@ static int drbd_send_handshake(struct drbd_conf *mdev) p->protocol_min = cpu_to_be32(PRO_VERSION_MIN); p->protocol_max = cpu_to_be32(PRO_VERSION_MAX); ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE, - (struct p_header *)p, sizeof(*p), 0 ); + (struct p_header80 *)p, sizeof(*p), 0 ); mutex_unlock(&mdev->data.mutex); return ok; } @@ -3904,27 +3870,28 @@ static int drbd_do_handshake(struct drbd_conf *mdev) { /* ASSERT current == mdev->receiver ... */ struct p_handshake *p = &mdev->data.rbuf.handshake; - const int expect = sizeof(struct p_handshake) - -sizeof(struct p_header); + const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80); + unsigned int length; + enum drbd_packets cmd; int rv; rv = drbd_send_handshake(mdev); if (!rv) return 0; - rv = drbd_recv_header(mdev, &p->head); + rv = drbd_recv_header(mdev, &cmd, &length); if (!rv) return 0; - if (p->head.command != P_HAND_SHAKE) { + if (cmd != P_HAND_SHAKE) { dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n", - cmdname(p->head.command), p->head.command); + cmdname(cmd), cmd); return -1; } - if (p->head.length != expect) { + if (length != expect) { dev_err(DEV, "expected HandShake length: %u, received: %u\n", - expect, p->head.length); + expect, length); return -1; } @@ -3982,10 +3949,11 @@ static int drbd_do_auth(struct drbd_conf *mdev) char *response = NULL; char *right_response = NULL; char *peers_ch = NULL; - struct p_header p; unsigned int key_len = strlen(mdev->net_conf->shared_secret); unsigned int resp_size; struct hash_desc desc; + enum drbd_packets cmd; + unsigned int length; int rv; desc.tfm = mdev->cram_hmac_tfm; @@ -4005,33 +3973,33 @@ static int drbd_do_auth(struct drbd_conf *mdev) if (!rv) goto fail; - rv = drbd_recv_header(mdev, &p); + rv = drbd_recv_header(mdev, &cmd, &length); if (!rv) goto fail; - if (p.command != P_AUTH_CHALLENGE) { + if (cmd != P_AUTH_CHALLENGE) { dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n", - cmdname(p.command), p.command); + cmdname(cmd), cmd); rv = 0; goto fail; } - if (p.length > CHALLENGE_LEN*2) { + if (length > CHALLENGE_LEN * 2) { dev_err(DEV, "expected AuthChallenge payload too big.\n"); rv = -1; goto fail; } - peers_ch = kmalloc(p.length, GFP_NOIO); + peers_ch = kmalloc(length, GFP_NOIO); if (peers_ch == NULL) { dev_err(DEV, "kmalloc of peers_ch failed\n"); rv = -1; goto fail; } - rv = drbd_recv(mdev, peers_ch, p.length); + rv = drbd_recv(mdev, peers_ch, length); - if (rv != p.length) { + if (rv != length) { dev_err(DEV, "short read AuthChallenge: l=%u\n", rv); rv = 0; goto fail; @@ -4046,7 +4014,7 @@ static int drbd_do_auth(struct drbd_conf *mdev) } sg_init_table(&sg, 1); - sg_set_buf(&sg, peers_ch, p.length); + sg_set_buf(&sg, peers_ch, length); rv = crypto_hash_digest(&desc, &sg, sg.length, response); if (rv) { @@ -4059,18 +4027,18 @@ static int drbd_do_auth(struct drbd_conf *mdev) if (!rv) goto fail; - rv = drbd_recv_header(mdev, &p); + rv = drbd_recv_header(mdev, &cmd, &length); if (!rv) goto fail; - if (p.command != P_AUTH_RESPONSE) { + if (cmd != P_AUTH_RESPONSE) { dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n", - cmdname(p.command), p.command); + cmdname(cmd), cmd); rv = 0; goto fail; } - if (p.length != resp_size) { + if (length != resp_size) { dev_err(DEV, "expected AuthResponse payload of wrong size\n"); rv = 0; goto fail; @@ -4155,7 +4123,7 @@ int drbdd_init(struct drbd_thread *thi) /* ********* acknowledge sender ******** */ -static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h) +static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h) { struct p_req_state_reply *p = (struct p_req_state_reply *)h; @@ -4173,13 +4141,13 @@ static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static int got_Ping(struct drbd_conf *mdev, struct p_header *h) +static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h) { return drbd_send_ping_ack(mdev); } -static int got_PingAck(struct drbd_conf *mdev, struct p_header *h) +static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h) { /* restore idle timeout */ mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ; @@ -4189,7 +4157,7 @@ static int got_PingAck(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h) +static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h) { struct p_block_ack *p = (struct p_block_ack *)h; sector_t sector = be64_to_cpu(p->sector); @@ -4199,11 +4167,15 @@ static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h) update_peer_seq(mdev, be32_to_cpu(p->seq_num)); - drbd_rs_complete_io(mdev, sector); - drbd_set_in_sync(mdev, sector, blksize); - /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */ - mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT); + if (get_ldev(mdev)) { + drbd_rs_complete_io(mdev, sector); + drbd_set_in_sync(mdev, sector, blksize); + /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */ + mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT); + put_ldev(mdev); + } dec_rs_pending(mdev); + atomic_add(blksize >> 9, &mdev->rs_sect_in); return TRUE; } @@ -4259,7 +4231,7 @@ static int validate_req_change_req_state(struct drbd_conf *mdev, return TRUE; } -static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h) +static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h) { struct p_block_ack *p = (struct p_block_ack *)h; sector_t sector = be64_to_cpu(p->sector); @@ -4299,7 +4271,7 @@ static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h) _ack_id_to_req, __func__ , what); } -static int got_NegAck(struct drbd_conf *mdev, struct p_header *h) +static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h) { struct p_block_ack *p = (struct p_block_ack *)h; sector_t sector = be64_to_cpu(p->sector); @@ -4319,7 +4291,7 @@ static int got_NegAck(struct drbd_conf *mdev, struct p_header *h) _ack_id_to_req, __func__ , neg_acked); } -static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h) +static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h) { struct p_block_ack *p = (struct p_block_ack *)h; sector_t sector = be64_to_cpu(p->sector); @@ -4332,7 +4304,7 @@ static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h) _ar_id_to_req, __func__ , neg_acked); } -static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h) +static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h) { sector_t sector; int size; @@ -4354,7 +4326,7 @@ static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h) +static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h) { struct p_barrier_ack *p = (struct p_barrier_ack *)h; @@ -4363,7 +4335,7 @@ static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h) return TRUE; } -static int got_OVResult(struct drbd_conf *mdev, struct p_header *h) +static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h) { struct p_block_ack *p = (struct p_block_ack *)h; struct drbd_work *w; @@ -4380,6 +4352,9 @@ static int got_OVResult(struct drbd_conf *mdev, struct p_header *h) else ov_oos_print(mdev); + if (!get_ldev(mdev)) + return TRUE; + drbd_rs_complete_io(mdev, sector); dec_rs_pending(mdev); @@ -4394,18 +4369,18 @@ static int got_OVResult(struct drbd_conf *mdev, struct p_header *h) drbd_resync_finished(mdev); } } + put_ldev(mdev); return TRUE; } -static int got_something_to_ignore_m(struct drbd_conf *mdev, struct p_header *h) +static int got_skip(struct drbd_conf *mdev, struct p_header80 *h) { - /* IGNORE */ return TRUE; } struct asender_cmd { size_t pkt_size; - int (*process)(struct drbd_conf *mdev, struct p_header *h); + int (*process)(struct drbd_conf *mdev, struct p_header80 *h); }; static struct asender_cmd *get_asender_cmd(int cmd) @@ -4414,8 +4389,8 @@ static struct asender_cmd *get_asender_cmd(int cmd) /* anything missing from this table is in * the drbd_cmd_handler (drbd_default_handler) table, * see the beginning of drbdd() */ - [P_PING] = { sizeof(struct p_header), got_Ping }, - [P_PING_ACK] = { sizeof(struct p_header), got_PingAck }, + [P_PING] = { sizeof(struct p_header80), got_Ping }, + [P_PING_ACK] = { sizeof(struct p_header80), got_PingAck }, [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, @@ -4427,7 +4402,7 @@ static struct asender_cmd *get_asender_cmd(int cmd) [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck }, [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply }, [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync }, - [P_DELAY_PROBE] = { sizeof(struct p_delay_probe), got_something_to_ignore_m }, + [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip }, [P_MAX_CMD] = { 0, NULL }, }; if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL) @@ -4438,13 +4413,13 @@ static struct asender_cmd *get_asender_cmd(int cmd) int drbd_asender(struct drbd_thread *thi) { struct drbd_conf *mdev = thi->mdev; - struct p_header *h = &mdev->meta.rbuf.header; + struct p_header80 *h = &mdev->meta.rbuf.header.h80; struct asender_cmd *cmd = NULL; int rv, len; void *buf = h; int received = 0; - int expect = sizeof(struct p_header); + int expect = sizeof(struct p_header80); int empty; sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev)); @@ -4468,10 +4443,8 @@ int drbd_asender(struct drbd_thread *thi) while (1) { clear_bit(SIGNAL_ASENDER, &mdev->flags); flush_signals(current); - if (!drbd_process_done_ee(mdev)) { - dev_err(DEV, "process_done_ee() = NOT_OK\n"); + if (!drbd_process_done_ee(mdev)) goto reconnect; - } /* to avoid race with newly queued ACKs */ set_bit(SIGNAL_ASENDER, &mdev->flags); spin_lock_irq(&mdev->req_lock); @@ -4530,21 +4503,23 @@ int drbd_asender(struct drbd_thread *thi) if (received == expect && cmd == NULL) { if (unlikely(h->magic != BE_DRBD_MAGIC)) { - dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n", - (long)be32_to_cpu(h->magic), - h->command, h->length); + dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n", + be32_to_cpu(h->magic), + be16_to_cpu(h->command), + be16_to_cpu(h->length)); goto reconnect; } cmd = get_asender_cmd(be16_to_cpu(h->command)); len = be16_to_cpu(h->length); if (unlikely(cmd == NULL)) { - dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n", - (long)be32_to_cpu(h->magic), - h->command, h->length); + dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n", + be32_to_cpu(h->magic), + be16_to_cpu(h->command), + be16_to_cpu(h->length)); goto disconnect; } expect = cmd->pkt_size; - ERR_IF(len != expect-sizeof(struct p_header)) + ERR_IF(len != expect-sizeof(struct p_header80)) goto reconnect; } if (received == expect) { @@ -4554,7 +4529,7 @@ int drbd_asender(struct drbd_thread *thi) buf = h; received = 0; - expect = sizeof(struct p_header); + expect = sizeof(struct p_header80); cmd = NULL; } } @@ -4562,10 +4537,12 @@ int drbd_asender(struct drbd_thread *thi) if (0) { reconnect: drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE)); + drbd_md_sync(mdev); } if (0) { disconnect: drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); + drbd_md_sync(mdev); } clear_bit(SIGNAL_ASENDER, &mdev->flags); |