diff options
Diffstat (limited to 'fs/ocfs2/dlm/dlmmaster.c')
-rw-r--r-- | fs/ocfs2/dlm/dlmmaster.c | 581 |
1 files changed, 511 insertions, 70 deletions
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 856012b4fa4..77e4e6169a0 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -99,9 +99,10 @@ static void dlm_mle_node_up(struct dlm_ctxt *dlm, int idx); static void dlm_assert_master_worker(struct dlm_work_item *item, void *data); -static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, - unsigned int namelen, void *nodemap, - u32 flags); +static int dlm_do_assert_master(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + void *nodemap, u32 flags); +static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data); static inline int dlm_mle_equal(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle, @@ -237,7 +238,8 @@ static int dlm_find_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry **mle, char *name, unsigned int namelen); -static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to); +static int dlm_do_master_request(struct dlm_lock_resource *res, + struct dlm_master_list_entry *mle, int to); static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm, @@ -687,6 +689,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, INIT_LIST_HEAD(&res->purge); atomic_set(&res->asts_reserved, 0); res->migration_pending = 0; + res->inflight_locks = 0; kref_init(&res->refs); @@ -700,6 +703,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, res->last_used = 0; memset(res->lvb, 0, DLM_LVB_LEN); + memset(res->refmap, 0, sizeof(res->refmap)); } struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, @@ -722,6 +726,42 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, return res; } +void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + int new_lockres, + const char *file, + int line) +{ + if (!new_lockres) + assert_spin_locked(&res->spinlock); + + if (!test_bit(dlm->node_num, res->refmap)) { + BUG_ON(res->inflight_locks != 0); + dlm_lockres_set_refmap_bit(dlm->node_num, res); + } + res->inflight_locks++; + mlog(0, "%s:%.*s: inflight++: now %u\n", + dlm->name, res->lockname.len, res->lockname.name, + res->inflight_locks); +} + +void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + const char *file, + int line) +{ + assert_spin_locked(&res->spinlock); + + BUG_ON(res->inflight_locks == 0); + res->inflight_locks--; + mlog(0, "%s:%.*s: inflight--: now %u\n", + dlm->name, res->lockname.len, res->lockname.name, + res->inflight_locks); + if (res->inflight_locks == 0) + dlm_lockres_clear_refmap_bit(dlm->node_num, res); + wake_up(&res->wq); +} + /* * lookup a lock resource by name. * may already exist in the hashtable. @@ -752,6 +792,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, unsigned int hash; int tries = 0; int bit, wait_on_recovery = 0; + int drop_inflight_if_nonlocal = 0; BUG_ON(!lockid); @@ -761,9 +802,30 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, lookup: spin_lock(&dlm->spinlock); - tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash); + tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash); if (tmpres) { + int dropping_ref = 0; + + spin_lock(&tmpres->spinlock); + if (tmpres->owner == dlm->node_num) { + BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF); + dlm_lockres_grab_inflight_ref(dlm, tmpres); + } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) + dropping_ref = 1; + spin_unlock(&tmpres->spinlock); spin_unlock(&dlm->spinlock); + + /* wait until done messaging the master, drop our ref to allow + * the lockres to be purged, start over. */ + if (dropping_ref) { + spin_lock(&tmpres->spinlock); + __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF); + spin_unlock(&tmpres->spinlock); + dlm_lockres_put(tmpres); + tmpres = NULL; + goto lookup; + } + mlog(0, "found in hash!\n"); if (res) dlm_lockres_put(res); @@ -793,6 +855,7 @@ lookup: spin_lock(&res->spinlock); dlm_change_lockres_owner(dlm, res, dlm->node_num); __dlm_insert_lockres(dlm, res); + dlm_lockres_grab_inflight_ref(dlm, res); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); /* lockres still marked IN_PROGRESS */ @@ -805,29 +868,40 @@ lookup: /* if we found a block, wait for lock to be mastered by another node */ blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen); if (blocked) { + int mig; if (mle->type == DLM_MLE_MASTER) { mlog(ML_ERROR, "master entry for nonexistent lock!\n"); BUG(); - } else if (mle->type == DLM_MLE_MIGRATION) { - /* migration is in progress! */ - /* the good news is that we now know the - * "current" master (mle->master). */ - + } + mig = (mle->type == DLM_MLE_MIGRATION); + /* if there is a migration in progress, let the migration + * finish before continuing. we can wait for the absence + * of the MIGRATION mle: either the migrate finished or + * one of the nodes died and the mle was cleaned up. + * if there is a BLOCK here, but it already has a master + * set, we are too late. the master does not have a ref + * for us in the refmap. detach the mle and drop it. + * either way, go back to the top and start over. */ + if (mig || mle->master != O2NM_MAX_NODES) { + BUG_ON(mig && mle->master == dlm->node_num); + /* we arrived too late. the master does not + * have a ref for us. retry. */ + mlog(0, "%s:%.*s: late on %s\n", + dlm->name, namelen, lockid, + mig ? "MIGRATION" : "BLOCK"); spin_unlock(&dlm->master_lock); - assert_spin_locked(&dlm->spinlock); - - /* set the lockres owner and hash it */ - spin_lock(&res->spinlock); - dlm_set_lockres_owner(dlm, res, mle->master); - __dlm_insert_lockres(dlm, res); - spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); /* master is known, detach */ - dlm_mle_detach_hb_events(dlm, mle); + if (!mig) + dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); mle = NULL; - goto wake_waiters; + /* this is lame, but we cant wait on either + * the mle or lockres waitqueue here */ + if (mig) + msleep(100); + goto lookup; } } else { /* go ahead and try to master lock on this node */ @@ -858,6 +932,13 @@ lookup: /* finally add the lockres to its hash bucket */ __dlm_insert_lockres(dlm, res); + /* since this lockres is new it doesnt not require the spinlock */ + dlm_lockres_grab_inflight_ref_new(dlm, res); + + /* if this node does not become the master make sure to drop + * this inflight reference below */ + drop_inflight_if_nonlocal = 1; + /* get an extra ref on the mle in case this is a BLOCK * if so, the creator of the BLOCK may try to put the last * ref at this time in the assert master handler, so we @@ -910,7 +991,7 @@ redo_request: ret = -EINVAL; dlm_node_iter_init(mle->vote_map, &iter); while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { - ret = dlm_do_master_request(mle, nodenum); + ret = dlm_do_master_request(res, mle, nodenum); if (ret < 0) mlog_errno(ret); if (mle->master != O2NM_MAX_NODES) { @@ -960,6 +1041,8 @@ wait: wake_waiters: spin_lock(&res->spinlock); + if (res->owner != dlm->node_num && drop_inflight_if_nonlocal) + dlm_lockres_drop_inflight_ref(dlm, res); res->state &= ~DLM_LOCK_RES_IN_PROGRESS; spin_unlock(&res->spinlock); wake_up(&res->wq); @@ -998,7 +1081,7 @@ recheck: /* this will cause the master to re-assert across * the whole cluster, freeing up mles */ if (res->owner != dlm->node_num) { - ret = dlm_do_master_request(mle, res->owner); + ret = dlm_do_master_request(res, mle, res->owner); if (ret < 0) { /* give recovery a chance to run */ mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); @@ -1062,6 +1145,8 @@ recheck: * now tell other nodes that I am * mastering this. */ mle->master = dlm->node_num; + /* ref was grabbed in get_lock_resource + * will be dropped in dlmlock_master */ assert = 1; sleep = 0; } @@ -1087,7 +1172,8 @@ recheck: (atomic_read(&mle->woken) == 1), timeo); if (res->owner == O2NM_MAX_NODES) { - mlog(0, "waiting again\n"); + mlog(0, "%s:%.*s: waiting again\n", dlm->name, + res->lockname.len, res->lockname.name); goto recheck; } mlog(0, "done waiting, master is %u\n", res->owner); @@ -1100,8 +1186,7 @@ recheck: m = dlm->node_num; mlog(0, "about to master %.*s here, this=%u\n", res->lockname.len, res->lockname.name, m); - ret = dlm_do_assert_master(dlm, res->lockname.name, - res->lockname.len, mle->vote_map, 0); + ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0); if (ret) { /* This is a failure in the network path, * not in the response to the assert_master @@ -1117,6 +1202,8 @@ recheck: /* set the lockres owner */ spin_lock(&res->spinlock); + /* mastery reference obtained either during + * assert_master_handler or in get_lock_resource */ dlm_change_lockres_owner(dlm, res, m); spin_unlock(&res->spinlock); @@ -1283,7 +1370,8 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, * */ -static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to) +static int dlm_do_master_request(struct dlm_lock_resource *res, + struct dlm_master_list_entry *mle, int to) { struct dlm_ctxt *dlm = mle->dlm; struct dlm_master_request request; @@ -1339,6 +1427,9 @@ again: case DLM_MASTER_RESP_YES: set_bit(to, mle->response_map); mlog(0, "node %u is the master, response=YES\n", to); + mlog(0, "%s:%.*s: master node %u now knows I have a " + "reference\n", dlm->name, res->lockname.len, + res->lockname.name, to); mle->master = to; break; case DLM_MASTER_RESP_NO: @@ -1379,7 +1470,8 @@ out: * * if possible, TRIM THIS DOWN!!! */ -int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { u8 response = DLM_MASTER_RESP_MAYBE; struct dlm_ctxt *dlm = data; @@ -1417,10 +1509,11 @@ way_up_top: /* take care of the easy cases up front */ spin_lock(&res->spinlock); - if (res->state & DLM_LOCK_RES_RECOVERING) { + if (res->state & (DLM_LOCK_RES_RECOVERING| + DLM_LOCK_RES_MIGRATING)) { spin_unlock(&res->spinlock); mlog(0, "returning DLM_MASTER_RESP_ERROR since res is " - "being recovered\n"); + "being recovered/migrated\n"); response = DLM_MASTER_RESP_ERROR; if (mle) kmem_cache_free(dlm_mle_cache, mle); @@ -1428,8 +1521,10 @@ way_up_top: } if (res->owner == dlm->node_num) { + mlog(0, "%s:%.*s: setting bit %u in refmap\n", + dlm->name, namelen, name, request->node_idx); + dlm_lockres_set_refmap_bit(request->node_idx, res); spin_unlock(&res->spinlock); - // mlog(0, "this node is the master\n"); response = DLM_MASTER_RESP_YES; if (mle) kmem_cache_free(dlm_mle_cache, mle); @@ -1477,7 +1572,6 @@ way_up_top: mlog(0, "node %u is master, but trying to migrate to " "node %u.\n", tmpmle->master, tmpmle->new_master); if (tmpmle->master == dlm->node_num) { - response = DLM_MASTER_RESP_YES; mlog(ML_ERROR, "no owner on lockres, but this " "node is trying to migrate it to %u?!\n", tmpmle->new_master); @@ -1494,6 +1588,10 @@ way_up_top: * go back and clean the mles on any * other nodes */ dispatch_assert = 1; + dlm_lockres_set_refmap_bit(request->node_idx, res); + mlog(0, "%s:%.*s: setting bit %u in refmap\n", + dlm->name, namelen, name, + request->node_idx); } else response = DLM_MASTER_RESP_NO; } else { @@ -1607,17 +1705,24 @@ send_response: * can periodically run all locks owned by this node * and re-assert across the cluster... */ -static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, - unsigned int namelen, void *nodemap, - u32 flags) +int dlm_do_assert_master(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + void *nodemap, u32 flags) { struct dlm_assert_master assert; int to, tmpret; struct dlm_node_iter iter; int ret = 0; int reassert; + const char *lockname = res->lockname.name; + unsigned int namelen = res->lockname.len; BUG_ON(namelen > O2NM_MAX_NAME_LEN); + + spin_lock(&res->spinlock); + res->state |= DLM_LOCK_RES_SETREF_INPROG; + spin_unlock(&res->spinlock); + again: reassert = 0; @@ -1647,6 +1752,7 @@ again: mlog(0, "link to %d went down!\n", to); /* any nonzero status return will do */ ret = tmpret; + r = 0; } else if (r < 0) { /* ok, something horribly messed. kill thyself. */ mlog(ML_ERROR,"during assert master of %.*s to %u, " @@ -1661,17 +1767,39 @@ again: spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); BUG(); - } else if (r == EAGAIN) { + } + + if (r & DLM_ASSERT_RESPONSE_REASSERT && + !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) { + mlog(ML_ERROR, "%.*s: very strange, " + "master MLE but no lockres on %u\n", + namelen, lockname, to); + } + + if (r & DLM_ASSERT_RESPONSE_REASSERT) { mlog(0, "%.*s: node %u create mles on other " "nodes and requests a re-assert\n", namelen, lockname, to); reassert = 1; } + if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) { + mlog(0, "%.*s: node %u has a reference to this " + "lockres, set the bit in the refmap\n", + namelen, lockname, to); + spin_lock(&res->spinlock); + dlm_lockres_set_refmap_bit(to, res); + spin_unlock(&res->spinlock); + } } if (reassert) goto again; + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_SETREF_INPROG; + spin_unlock(&res->spinlock); + wake_up(&res->wq); + return ret; } @@ -1684,7 +1812,8 @@ again: * * if possible, TRIM THIS DOWN!!! */ -int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; struct dlm_master_list_entry *mle = NULL; @@ -1693,7 +1822,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) char *name; unsigned int namelen, hash; u32 flags; - int master_request = 0; + int master_request = 0, have_lockres_ref = 0; int ret = 0; if (!dlm_grab(dlm)) @@ -1851,6 +1980,7 @@ ok: spin_unlock(&mle->spinlock); if (res) { + int wake = 0; spin_lock(&res->spinlock); if (mle->type == DLM_MLE_MIGRATION) { mlog(0, "finishing off migration of lockres %.*s, " @@ -1858,12 +1988,16 @@ ok: res->lockname.len, res->lockname.name, dlm->node_num, mle->new_master); res->state &= ~DLM_LOCK_RES_MIGRATING; + wake = 1; dlm_change_lockres_owner(dlm, res, mle->new_master); BUG_ON(res->state & DLM_LOCK_RES_DIRTY); } else { dlm_change_lockres_owner(dlm, res, mle->master); } spin_unlock(&res->spinlock); + have_lockres_ref = 1; + if (wake) + wake_up(&res->wq); } /* master is known, detach if not already detached. @@ -1913,12 +2047,28 @@ ok: done: ret = 0; - if (res) - dlm_lockres_put(res); + if (res) { + spin_lock(&res->spinlock); + res->state |= DLM_LOCK_RES_SETREF_INPROG; + spin_unlock(&res->spinlock); + *ret_data = (void *)res; + } dlm_put(dlm); if (master_request) { mlog(0, "need to tell master to reassert\n"); - ret = EAGAIN; // positive. negative would shoot down the node. + /* positive. negative would shoot down the node. */ + ret |= DLM_ASSERT_RESPONSE_REASSERT; + if (!have_lockres_ref) { + mlog(ML_ERROR, "strange, got assert from %u, MASTER " + "mle present here for %s:%.*s, but no lockres!\n", + assert->node_idx, dlm->name, namelen, name); + } + } + if (have_lockres_ref) { + /* let the master know we have a reference to the lockres */ + ret |= DLM_ASSERT_RESPONSE_MASTERY_REF; + mlog(0, "%s:%.*s: got assert from %u, need a ref\n", + dlm->name, namelen, name, assert->node_idx); } return ret; @@ -1929,17 +2079,31 @@ kill: __dlm_print_one_lock_resource(res); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); - dlm_lockres_put(res); + *ret_data = (void *)res; dlm_put(dlm); return -EINVAL; } +void dlm_assert_master_post_handler(int status, void *data, void *ret_data) +{ + struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data; + + if (ret_data) { + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_SETREF_INPROG; + spin_unlock(&res->spinlock); + wake_up(&res->wq); + dlm_lockres_put(res); + } + return; +} + int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, int ignore_higher, u8 request_from, u32 flags) { struct dlm_work_item *item; - item = kcalloc(1, sizeof(*item), GFP_NOFS); + item = kzalloc(sizeof(*item), GFP_NOFS); if (!item) return -ENOMEM; @@ -2023,9 +2187,7 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) * even if one or more nodes die */ mlog(0, "worker about to master %.*s here, this=%u\n", res->lockname.len, res->lockname.name, dlm->node_num); - ret = dlm_do_assert_master(dlm, res->lockname.name, - res->lockname.len, - nodemap, flags); + ret = dlm_do_assert_master(dlm, res, nodemap, flags); if (ret < 0) { /* no need to restart, we are done */ if (!dlm_is_host_down(ret)) @@ -2097,14 +2259,180 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm, return ret; } +/* + * DLM_DEREF_LOCKRES_MSG + */ + +int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) +{ + struct dlm_deref_lockres deref; + int ret = 0, r; + const char *lockname; + unsigned int namelen; + + lockname = res->lockname.name; + namelen = res->lockname.len; + BUG_ON(namelen > O2NM_MAX_NAME_LEN); + + mlog(0, "%s:%.*s: sending deref to %d\n", + dlm->name, namelen, lockname, res->owner); + memset(&deref, 0, sizeof(deref)); + deref.node_idx = dlm->node_num; + deref.namelen = namelen; + memcpy(deref.name, lockname, namelen); + + ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key, + &deref, sizeof(deref), res->owner, &r); + if (ret < 0) + mlog_errno(ret); + else if (r < 0) { + /* BAD. other node says I did not have a ref. */ + mlog(ML_ERROR,"while dropping ref on %s:%.*s " + "(master=%u) got %d.\n", dlm->name, namelen, + lockname, res->owner, r); + dlm_print_one_lock_resource(res); + BUG(); + } + return ret; +} + +int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) +{ + struct dlm_ctxt *dlm = data; + struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf; + struct dlm_lock_resource *res = NULL; + char *name; + unsigned int namelen; + int ret = -EINVAL; + u8 node; + unsigned int hash; + struct dlm_work_item *item; + int cleared = 0; + int dispatch = 0; + + if (!dlm_grab(dlm)) + return 0; + + name = deref->name; + namelen = deref->namelen; + node = deref->node_idx; + + if (namelen > DLM_LOCKID_NAME_MAX) { + mlog(ML_ERROR, "Invalid name length!"); + goto done; + } + if (deref->node_idx >= O2NM_MAX_NODES) { + mlog(ML_ERROR, "Invalid node number: %u\n", node); + goto done; + } + + hash = dlm_lockid_hash(name, namelen); + + spin_lock(&dlm->spinlock); + res = __dlm_lookup_lockres_full(dlm, name, namelen, hash); + if (!res) { + spin_unlock(&dlm->spinlock); + mlog(ML_ERROR, "%s:%.*s: bad lockres name\n", + dlm->name, namelen, name); + goto done; + } + spin_unlock(&dlm->spinlock); + + spin_lock(&res->spinlock); + if (res->state & DLM_LOCK_RES_SETREF_INPROG) + dispatch = 1; + else { + BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); + if (test_bit(node, res->refmap)) { + dlm_lockres_clear_refmap_bit(node, res); + cleared = 1; + } + } + spin_unlock(&res->spinlock); + + if (!dispatch) { + if (cleared) + dlm_lockres_calc_usage(dlm, res); + else { + mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " + "but it is already dropped!\n", dlm->name, + res->lockname.len, res->lockname.name, node); + __dlm_print_one_lock_resource(res); + } + ret = 0; + goto done; + } + + item = kzalloc(sizeof(*item), GFP_NOFS); + if (!item) { + ret = -ENOMEM; + mlog_errno(ret); + goto done; + } + + dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL); + item->u.dl.deref_res = res; + item->u.dl.deref_node = node; + + spin_lock(&dlm->work_lock); + list_add_tail(&item->list, &dlm->work_list); + spin_unlock(&dlm->work_lock); + + queue_work(dlm->dlm_worker, &dlm->dispatched_work); + return 0; + +done: + if (res) + dlm_lockres_put(res); + dlm_put(dlm); + + return ret; +} + +static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data) +{ + struct dlm_ctxt *dlm; + struct dlm_lock_resource *res; + u8 node; + u8 cleared = 0; + + dlm = item->dlm; + res = item->u.dl.deref_res; + node = item->u.dl.deref_node; + + spin_lock(&res->spinlock); + BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); + if (test_bit(node, res->refmap)) { + __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); + dlm_lockres_clear_refmap_bit(node, res); + cleared = 1; + } + spin_unlock(&res->spinlock); + + if (cleared) { + mlog(0, "%s:%.*s node %u ref dropped in dispatch\n", + dlm->name, res->lockname.len, res->lockname.name, node); + dlm_lockres_calc_usage(dlm, res); + } else { + mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " + "but it is already dropped!\n", dlm->name, + res->lockname.len, res->lockname.name, node); + __dlm_print_one_lock_resource(res); + } + + dlm_lockres_put(res); +} + /* * DLM_MIGRATE_LOCKRES */ -int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, - u8 target) +static int dlm_migrate_lockres(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, + u8 target) { struct dlm_master_list_entry *mle = NULL; struct dlm_master_list_entry *oldmle = NULL; @@ -2116,7 +2444,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, struct list_head *queue, *iter; int i; struct dlm_lock *lock; - int empty = 1; + int empty = 1, wake = 0; if (!dlm_grab(dlm)) return -EINVAL; @@ -2241,6 +2569,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, res->lockname.name, target); spin_lock(&res->spinlock); res->state &= ~DLM_LOCK_RES_MIGRATING; + wake = 1; spin_unlock(&res->spinlock); ret = -EINVAL; } @@ -2268,6 +2597,9 @@ fail: * the lockres */ + /* now that remote nodes are spinning on the MIGRATING flag, + * ensure that all assert_master work is flushed. */ + flush_workqueue(dlm->dlm_worker); /* get an extra reference on the mle. * otherwise the assert_master from the new @@ -2296,6 +2628,7 @@ fail: dlm_put_mle_inuse(mle); spin_lock(&res->spinlock); res->state &= ~DLM_LOCK_RES_MIGRATING; + wake = 1; spin_unlock(&res->spinlock); goto leave; } @@ -2322,7 +2655,8 @@ fail: res->owner == target) break; - mlog(0, "timed out during migration\n"); + mlog(0, "%s:%.*s: timed out during migration\n", + dlm->name, res->lockname.len, res->lockname.name); /* avoid hang during shutdown when migrating lockres * to a node which also goes down */ if (dlm_is_node_dead(dlm, target)) { @@ -2330,20 +2664,20 @@ fail: "target %u is no longer up, restarting\n", dlm->name, res->lockname.len, res->lockname.name, target); - ret = -ERESTARTSYS; + ret = -EINVAL; + /* migration failed, detach and clean up mle */ + dlm_mle_detach_hb_events(dlm, mle); + dlm_put_mle(mle); + dlm_put_mle_inuse(mle); + spin_lock(&res->spinlock); + res->state &= ~DLM_LOCK_RES_MIGRATING; + wake = 1; + spin_unlock(&res->spinlock); + goto leave; } - } - if (ret == -ERESTARTSYS) { - /* migration failed, detach and clean up mle */ - dlm_mle_detach_hb_events(dlm, mle); - dlm_put_mle(mle); - dlm_put_mle_inuse(mle); - spin_lock(&res->spinlock); - res->state &= ~DLM_LOCK_RES_MIGRATING; - spin_unlock(&res->spinlock); - goto leave; - } - /* TODO: if node died: stop, clean up, return error */ + } else + mlog(0, "%s:%.*s: caught signal during migration\n", + dlm->name, res->lockname.len, res->lockname.name); } /* all done, set the owner, clear the flag */ @@ -2366,6 +2700,11 @@ leave: if (ret < 0) dlm_kick_thread(dlm, res); + /* wake up waiters if the MIGRATING flag got set + * but migration failed */ + if (wake) + wake_up(&res->wq); + /* TODO: cleanup */ if (mres) free_page((unsigned long)mres); @@ -2376,6 +2715,53 @@ leave: return ret; } +#define DLM_MIGRATION_RETRY_MS 100 + +/* Should be called only after beginning the domain leave process. + * There should not be any remaining locks on nonlocal lock resources, + * and there should be no local locks left on locally mastered resources. + * + * Called with the dlm spinlock held, may drop it to do migration, but + * will re-acquire before exit. + * + * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */ +int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) +{ + int ret; + int lock_dropped = 0; + + if (res->owner != dlm->node_num) { + if (!__dlm_lockres_unused(res)) { + mlog(ML_ERROR, "%s:%.*s: this node is not master, " + "trying to free this but locks remain\n", + dlm->name, res->lockname.len, res->lockname.name); + } + goto leave; + } + + /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */ + spin_unlock(&dlm->spinlock); + lock_dropped = 1; + while (1) { + ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES); + if (ret >= 0) + break; + if (ret == -ENOTEMPTY) { + mlog(ML_ERROR, "lockres %.*s still has local locks!\n", + res->lockname.len, res->lockname.name); + BUG(); + } + + mlog(0, "lockres %.*s: migrate failed, " + "retrying\n", res->lockname.len, + res->lockname.name); + msleep(DLM_MIGRATION_RETRY_MS); + } + spin_lock(&dlm->spinlock); +leave: + return lock_dropped; +} + int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock) { int ret; @@ -2405,7 +2791,8 @@ static int dlm_migration_can_proceed(struct dlm_ctxt *dlm, return can_proceed; } -int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) +static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) { int ret; spin_lock(&res->spinlock); @@ -2434,8 +2821,15 @@ static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm, __dlm_lockres_reserve_ast(res); spin_unlock(&res->spinlock); - /* now flush all the pending asts.. hang out for a bit */ + /* now flush all the pending asts */ dlm_kick_thread(dlm, res); + /* before waiting on DIRTY, block processes which may + * try to dirty the lockres before MIGRATING is set */ + spin_lock(&res->spinlock); + BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY); + res->state |= DLM_LOCK_RES_BLOCK_DIRTY; + spin_unlock(&res->spinlock); + /* now wait on any pending asts and the DIRTY state */ wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res)); dlm_lockres_release_ast(dlm, res); @@ -2461,6 +2855,13 @@ again: mlog(0, "trying again...\n"); goto again; } + /* now that we are sure the MIGRATING state is there, drop + * the unneded state which blocked threads trying to DIRTY */ + spin_lock(&res->spinlock); + BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY)); + BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING)); + res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY; + spin_unlock(&res->spinlock); /* did the target go down or die? */ spin_lock(&dlm->spinlock); @@ -2490,7 +2891,7 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm, { struct list_head *iter, *iter2; struct list_head *queue = &res->granted; - int i; + int i, bit; struct dlm_lock *lock; assert_spin_locked(&res->spinlock); @@ -2508,12 +2909,28 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm, BUG_ON(!list_empty(&lock->bast_list)); BUG_ON(lock->ast_pending); BUG_ON(lock->bast_pending); + dlm_lockres_clear_refmap_bit(lock->ml.node, res); list_del_init(&lock->list); dlm_lock_put(lock); } } queue++; } + bit = 0; + while (1) { + bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit); + if (bit >= O2NM_MAX_NODES) + break; + /* do not clear the local node reference, if there is a + * process holding this, let it drop the ref itself */ + if (bit != dlm->node_num) { + mlog(0, "%s:%.*s: node %u had a ref to this " + "migrating lockres, clearing\n", dlm->name, + res->lockname.len, res->lockname.name, bit); + dlm_lockres_clear_refmap_bit(bit, res); + } + bit++; + } } /* for now this is not too intelligent. we will @@ -2601,6 +3018,16 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm, mlog(0, "migrate request (node %u) returned %d!\n", nodenum, status); ret = status; + } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) { + /* during the migration request we short-circuited + * the mastery of the lockres. make sure we have + * a mastery ref for nodenum */ + mlog(0, "%s:%.*s: need ref for node %u\n", + dlm->name, res->lockname.len, res->lockname.name, + nodenum); + spin_lock(&res->spinlock); + dlm_lockres_set_refmap_bit(nodenum, res); + spin_unlock(&res->spinlock); } } @@ -2619,7 +3046,8 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm, * we will have no mle in the list to start with. now we can add an mle for * the migration and this should be the only one found for those scanning the * list. */ -int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) +int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data, + void **ret_data) { struct dlm_ctxt *dlm = data; struct dlm_lock_resource *res = NULL; @@ -2745,7 +3173,13 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, /* remove it from the list so that only one * mle will be found */ list_del_init(&tmp->list); - __dlm_mle_detach_hb_events(dlm, mle); + /* this was obviously WRONG. mle is uninited here. should be tmp. */ + __dlm_mle_detach_hb_events(dlm, tmp); + ret = DLM_MIGRATE_RESPONSE_MASTERY_REF; + mlog(0, "%s:%.*s: master=%u, newmaster=%u, " + "telling master to get ref for cleared out mle " + "during migration\n", dlm->name, namelen, name, + master, new_master); } spin_unlock(&tmp->spinlock); } @@ -2753,6 +3187,8 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, /* now add a migration mle to the tail of the list */ dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen); mle->new_master = new_master; + /* the new master will be sending an assert master for this. + * at that point we will get the refmap reference */ mle->master = master; /* do this for consistency with other mle types */ set_bit(new_master, mle->maybe_map); @@ -2902,6 +3338,13 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, clear_bit(dlm->node_num, iter.node_map); spin_unlock(&dlm->spinlock); + /* ownership of the lockres is changing. account for the + * mastery reference here since old_master will briefly have + * a reference after the migration completes */ + spin_lock(&res->spinlock); + dlm_lockres_set_refmap_bit(old_master, res); + spin_unlock(&res->spinlock); + mlog(0, "now time to do a migrate request to other nodes\n"); ret = dlm_do_migrate_request(dlm, res, old_master, dlm->node_num, &iter); @@ -2914,8 +3357,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, res->lockname.len, res->lockname.name); /* this call now finishes out the nodemap * even if one or more nodes die */ - ret = dlm_do_assert_master(dlm, res->lockname.name, - res->lockname.len, iter.node_map, + ret = dlm_do_assert_master(dlm, res, iter.node_map, DLM_ASSERT_MASTER_FINISH_MIGRATION); if (ret < 0) { /* no longer need to retry. all living nodes contacted. */ @@ -2927,8 +3369,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, set_bit(old_master, iter.node_map); mlog(0, "doing assert master of %.*s back to %u\n", res->lockname.len, res->lockname.name, old_master); - ret = dlm_do_assert_master(dlm, res->lockname.name, - res->lockname.len, iter.node_map, + ret = dlm_do_assert_master(dlm, res, iter.node_map, DLM_ASSERT_MASTER_FINISH_MIGRATION); if (ret < 0) { mlog(0, "assert master to original master failed " |