summaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmmaster.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmmaster.c')
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c579
1 files changed, 510 insertions, 69 deletions
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 0ad872055cb..77e4e6169a0 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -99,9 +99,10 @@ static void dlm_mle_node_up(struct dlm_ctxt *dlm,
int idx);
static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
-static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
- unsigned int namelen, void *nodemap,
- u32 flags);
+static int dlm_do_assert_master(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ void *nodemap, u32 flags);
+static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
struct dlm_master_list_entry *mle,
@@ -237,7 +238,8 @@ static int dlm_find_mle(struct dlm_ctxt *dlm,
struct dlm_master_list_entry **mle,
char *name, unsigned int namelen);
-static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);
+static int dlm_do_master_request(struct dlm_lock_resource *res,
+ struct dlm_master_list_entry *mle, int to);
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
@@ -687,6 +689,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
INIT_LIST_HEAD(&res->purge);
atomic_set(&res->asts_reserved, 0);
res->migration_pending = 0;
+ res->inflight_locks = 0;
kref_init(&res->refs);
@@ -700,6 +703,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
res->last_used = 0;
memset(res->lvb, 0, DLM_LVB_LEN);
+ memset(res->refmap, 0, sizeof(res->refmap));
}
struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
@@ -722,6 +726,42 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
return res;
}
+void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ int new_lockres,
+ const char *file,
+ int line)
+{
+ if (!new_lockres)
+ assert_spin_locked(&res->spinlock);
+
+ if (!test_bit(dlm->node_num, res->refmap)) {
+ BUG_ON(res->inflight_locks != 0);
+ dlm_lockres_set_refmap_bit(dlm->node_num, res);
+ }
+ res->inflight_locks++;
+ mlog(0, "%s:%.*s: inflight++: now %u\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ res->inflight_locks);
+}
+
+void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ const char *file,
+ int line)
+{
+ assert_spin_locked(&res->spinlock);
+
+ BUG_ON(res->inflight_locks == 0);
+ res->inflight_locks--;
+ mlog(0, "%s:%.*s: inflight--: now %u\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ res->inflight_locks);
+ if (res->inflight_locks == 0)
+ dlm_lockres_clear_refmap_bit(dlm->node_num, res);
+ wake_up(&res->wq);
+}
+
/*
* lookup a lock resource by name.
* may already exist in the hashtable.
@@ -752,6 +792,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
unsigned int hash;
int tries = 0;
int bit, wait_on_recovery = 0;
+ int drop_inflight_if_nonlocal = 0;
BUG_ON(!lockid);
@@ -761,9 +802,30 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
lookup:
spin_lock(&dlm->spinlock);
- tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash);
+ tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
if (tmpres) {
+ int dropping_ref = 0;
+
+ spin_lock(&tmpres->spinlock);
+ if (tmpres->owner == dlm->node_num) {
+ BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
+ dlm_lockres_grab_inflight_ref(dlm, tmpres);
+ } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
+ dropping_ref = 1;
+ spin_unlock(&tmpres->spinlock);
spin_unlock(&dlm->spinlock);
+
+ /* wait until done messaging the master, drop our ref to allow
+ * the lockres to be purged, start over. */
+ if (dropping_ref) {
+ spin_lock(&tmpres->spinlock);
+ __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
+ spin_unlock(&tmpres->spinlock);
+ dlm_lockres_put(tmpres);
+ tmpres = NULL;
+ goto lookup;
+ }
+
mlog(0, "found in hash!\n");
if (res)
dlm_lockres_put(res);
@@ -793,6 +855,7 @@ lookup:
spin_lock(&res->spinlock);
dlm_change_lockres_owner(dlm, res, dlm->node_num);
__dlm_insert_lockres(dlm, res);
+ dlm_lockres_grab_inflight_ref(dlm, res);
spin_unlock(&res->spinlock);
spin_unlock(&dlm->spinlock);
/* lockres still marked IN_PROGRESS */
@@ -805,29 +868,40 @@ lookup:
/* if we found a block, wait for lock to be mastered by another node */
blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
if (blocked) {
+ int mig;
if (mle->type == DLM_MLE_MASTER) {
mlog(ML_ERROR, "master entry for nonexistent lock!\n");
BUG();
- } else if (mle->type == DLM_MLE_MIGRATION) {
- /* migration is in progress! */
- /* the good news is that we now know the
- * "current" master (mle->master). */
-
+ }
+ mig = (mle->type == DLM_MLE_MIGRATION);
+ /* if there is a migration in progress, let the migration
+ * finish before continuing. we can wait for the absence
+ * of the MIGRATION mle: either the migrate finished or
+ * one of the nodes died and the mle was cleaned up.
+ * if there is a BLOCK here, but it already has a master
+ * set, we are too late. the master does not have a ref
+ * for us in the refmap. detach the mle and drop it.
+ * either way, go back to the top and start over. */
+ if (mig || mle->master != O2NM_MAX_NODES) {
+ BUG_ON(mig && mle->master == dlm->node_num);
+ /* we arrived too late. the master does not
+ * have a ref for us. retry. */
+ mlog(0, "%s:%.*s: late on %s\n",
+ dlm->name, namelen, lockid,
+ mig ? "MIGRATION" : "BLOCK");
spin_unlock(&dlm->master_lock);
- assert_spin_locked(&dlm->spinlock);
-
- /* set the lockres owner and hash it */
- spin_lock(&res->spinlock);
- dlm_set_lockres_owner(dlm, res, mle->master);
- __dlm_insert_lockres(dlm, res);
- spin_unlock(&res->spinlock);
spin_unlock(&dlm->spinlock);
/* master is known, detach */
- dlm_mle_detach_hb_events(dlm, mle);
+ if (!mig)
+ dlm_mle_detach_hb_events(dlm, mle);
dlm_put_mle(mle);
mle = NULL;
- goto wake_waiters;
+ /* this is lame, but we cant wait on either
+ * the mle or lockres waitqueue here */
+ if (mig)
+ msleep(100);
+ goto lookup;
}
} else {
/* go ahead and try to master lock on this node */
@@ -858,6 +932,13 @@ lookup:
/* finally add the lockres to its hash bucket */
__dlm_insert_lockres(dlm, res);
+ /* since this lockres is new it doesnt not require the spinlock */
+ dlm_lockres_grab_inflight_ref_new(dlm, res);
+
+ /* if this node does not become the master make sure to drop
+ * this inflight reference below */
+ drop_inflight_if_nonlocal = 1;
+
/* get an extra ref on the mle in case this is a BLOCK
* if so, the creator of the BLOCK may try to put the last
* ref at this time in the assert master handler, so we
@@ -910,7 +991,7 @@ redo_request:
ret = -EINVAL;
dlm_node_iter_init(mle->vote_map, &iter);
while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
- ret = dlm_do_master_request(mle, nodenum);
+ ret = dlm_do_master_request(res, mle, nodenum);
if (ret < 0)
mlog_errno(ret);
if (mle->master != O2NM_MAX_NODES) {
@@ -960,6 +1041,8 @@ wait:
wake_waiters:
spin_lock(&res->spinlock);
+ if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
+ dlm_lockres_drop_inflight_ref(dlm, res);
res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
spin_unlock(&res->spinlock);
wake_up(&res->wq);
@@ -998,7 +1081,7 @@ recheck:
/* this will cause the master to re-assert across
* the whole cluster, freeing up mles */
if (res->owner != dlm->node_num) {
- ret = dlm_do_master_request(mle, res->owner);
+ ret = dlm_do_master_request(res, mle, res->owner);
if (ret < 0) {
/* give recovery a chance to run */
mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
@@ -1062,6 +1145,8 @@ recheck:
* now tell other nodes that I am
* mastering this. */
mle->master = dlm->node_num;
+ /* ref was grabbed in get_lock_resource
+ * will be dropped in dlmlock_master */
assert = 1;
sleep = 0;
}
@@ -1087,7 +1172,8 @@ recheck:
(atomic_read(&mle->woken) == 1),
timeo);
if (res->owner == O2NM_MAX_NODES) {
- mlog(0, "waiting again\n");
+ mlog(0, "%s:%.*s: waiting again\n", dlm->name,
+ res->lockname.len, res->lockname.name);
goto recheck;
}
mlog(0, "done waiting, master is %u\n", res->owner);
@@ -1100,8 +1186,7 @@ recheck:
m = dlm->node_num;
mlog(0, "about to master %.*s here, this=%u\n",
res->lockname.len, res->lockname.name, m);
- ret = dlm_do_assert_master(dlm, res->lockname.name,
- res->lockname.len, mle->vote_map, 0);
+ ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
if (ret) {
/* This is a failure in the network path,
* not in the response to the assert_master
@@ -1117,6 +1202,8 @@ recheck:
/* set the lockres owner */
spin_lock(&res->spinlock);
+ /* mastery reference obtained either during
+ * assert_master_handler or in get_lock_resource */
dlm_change_lockres_owner(dlm, res, m);
spin_unlock(&res->spinlock);
@@ -1283,7 +1370,8 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
*
*/
-static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
+static int dlm_do_master_request(struct dlm_lock_resource *res,
+ struct dlm_master_list_entry *mle, int to)
{
struct dlm_ctxt *dlm = mle->dlm;
struct dlm_master_request request;
@@ -1339,6 +1427,9 @@ again:
case DLM_MASTER_RESP_YES:
set_bit(to, mle->response_map);
mlog(0, "node %u is the master, response=YES\n", to);
+ mlog(0, "%s:%.*s: master node %u now knows I have a "
+ "reference\n", dlm->name, res->lockname.len,
+ res->lockname.name, to);
mle->master = to;
break;
case DLM_MASTER_RESP_NO:
@@ -1379,7 +1470,8 @@ out:
*
* if possible, TRIM THIS DOWN!!!
*/
-int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
u8 response = DLM_MASTER_RESP_MAYBE;
struct dlm_ctxt *dlm = data;
@@ -1417,10 +1509,11 @@ way_up_top:
/* take care of the easy cases up front */
spin_lock(&res->spinlock);
- if (res->state & DLM_LOCK_RES_RECOVERING) {
+ if (res->state & (DLM_LOCK_RES_RECOVERING|
+ DLM_LOCK_RES_MIGRATING)) {
spin_unlock(&res->spinlock);
mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
- "being recovered\n");
+ "being recovered/migrated\n");
response = DLM_MASTER_RESP_ERROR;
if (mle)
kmem_cache_free(dlm_mle_cache, mle);
@@ -1428,8 +1521,10 @@ way_up_top:
}
if (res->owner == dlm->node_num) {
+ mlog(0, "%s:%.*s: setting bit %u in refmap\n",
+ dlm->name, namelen, name, request->node_idx);
+ dlm_lockres_set_refmap_bit(request->node_idx, res);
spin_unlock(&res->spinlock);
- // mlog(0, "this node is the master\n");
response = DLM_MASTER_RESP_YES;
if (mle)
kmem_cache_free(dlm_mle_cache, mle);
@@ -1477,7 +1572,6 @@ way_up_top:
mlog(0, "node %u is master, but trying to migrate to "
"node %u.\n", tmpmle->master, tmpmle->new_master);
if (tmpmle->master == dlm->node_num) {
- response = DLM_MASTER_RESP_YES;
mlog(ML_ERROR, "no owner on lockres, but this "
"node is trying to migrate it to %u?!\n",
tmpmle->new_master);
@@ -1494,6 +1588,10 @@ way_up_top:
* go back and clean the mles on any
* other nodes */
dispatch_assert = 1;
+ dlm_lockres_set_refmap_bit(request->node_idx, res);
+ mlog(0, "%s:%.*s: setting bit %u in refmap\n",
+ dlm->name, namelen, name,
+ request->node_idx);
} else
response = DLM_MASTER_RESP_NO;
} else {
@@ -1607,17 +1705,24 @@ send_response:
* can periodically run all locks owned by this node
* and re-assert across the cluster...
*/
-static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
- unsigned int namelen, void *nodemap,
- u32 flags)
+int dlm_do_assert_master(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ void *nodemap, u32 flags)
{
struct dlm_assert_master assert;
int to, tmpret;
struct dlm_node_iter iter;
int ret = 0;
int reassert;
+ const char *lockname = res->lockname.name;
+ unsigned int namelen = res->lockname.len;
BUG_ON(namelen > O2NM_MAX_NAME_LEN);
+
+ spin_lock(&res->spinlock);
+ res->state |= DLM_LOCK_RES_SETREF_INPROG;
+ spin_unlock(&res->spinlock);
+
again:
reassert = 0;
@@ -1647,6 +1752,7 @@ again:
mlog(0, "link to %d went down!\n", to);
/* any nonzero status return will do */
ret = tmpret;
+ r = 0;
} else if (r < 0) {
/* ok, something horribly messed. kill thyself. */
mlog(ML_ERROR,"during assert master of %.*s to %u, "
@@ -1661,17 +1767,39 @@ again:
spin_unlock(&dlm->master_lock);
spin_unlock(&dlm->spinlock);
BUG();
- } else if (r == EAGAIN) {
+ }
+
+ if (r & DLM_ASSERT_RESPONSE_REASSERT &&
+ !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
+ mlog(ML_ERROR, "%.*s: very strange, "
+ "master MLE but no lockres on %u\n",
+ namelen, lockname, to);
+ }
+
+ if (r & DLM_ASSERT_RESPONSE_REASSERT) {
mlog(0, "%.*s: node %u create mles on other "
"nodes and requests a re-assert\n",
namelen, lockname, to);
reassert = 1;
}
+ if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
+ mlog(0, "%.*s: node %u has a reference to this "
+ "lockres, set the bit in the refmap\n",
+ namelen, lockname, to);
+ spin_lock(&res->spinlock);
+ dlm_lockres_set_refmap_bit(to, res);
+ spin_unlock(&res->spinlock);
+ }
}
if (reassert)
goto again;
+ spin_lock(&res->spinlock);
+ res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+
return ret;
}
@@ -1684,7 +1812,8 @@ again:
*
* if possible, TRIM THIS DOWN!!!
*/
-int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_master_list_entry *mle = NULL;
@@ -1693,7 +1822,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
char *name;
unsigned int namelen, hash;
u32 flags;
- int master_request = 0;
+ int master_request = 0, have_lockres_ref = 0;
int ret = 0;
if (!dlm_grab(dlm))
@@ -1851,6 +1980,7 @@ ok:
spin_unlock(&mle->spinlock);
if (res) {
+ int wake = 0;
spin_lock(&res->spinlock);
if (mle->type == DLM_MLE_MIGRATION) {
mlog(0, "finishing off migration of lockres %.*s, "
@@ -1858,12 +1988,16 @@ ok:
res->lockname.len, res->lockname.name,
dlm->node_num, mle->new_master);
res->state &= ~DLM_LOCK_RES_MIGRATING;
+ wake = 1;
dlm_change_lockres_owner(dlm, res, mle->new_master);
BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
} else {
dlm_change_lockres_owner(dlm, res, mle->master);
}
spin_unlock(&res->spinlock);
+ have_lockres_ref = 1;
+ if (wake)
+ wake_up(&res->wq);
}
/* master is known, detach if not already detached.
@@ -1913,12 +2047,28 @@ ok:
done:
ret = 0;
- if (res)
- dlm_lockres_put(res);
+ if (res) {
+ spin_lock(&res->spinlock);
+ res->state |= DLM_LOCK_RES_SETREF_INPROG;
+ spin_unlock(&res->spinlock);
+ *ret_data = (void *)res;
+ }
dlm_put(dlm);
if (master_request) {
mlog(0, "need to tell master to reassert\n");
- ret = EAGAIN; // positive. negative would shoot down the node.
+ /* positive. negative would shoot down the node. */
+ ret |= DLM_ASSERT_RESPONSE_REASSERT;
+ if (!have_lockres_ref) {
+ mlog(ML_ERROR, "strange, got assert from %u, MASTER "
+ "mle present here for %s:%.*s, but no lockres!\n",
+ assert->node_idx, dlm->name, namelen, name);
+ }
+ }
+ if (have_lockres_ref) {
+ /* let the master know we have a reference to the lockres */
+ ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
+ mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
+ dlm->name, namelen, name, assert->node_idx);
}
return ret;
@@ -1929,11 +2079,25 @@ kill:
__dlm_print_one_lock_resource(res);
spin_unlock(&res->spinlock);
spin_unlock(&dlm->spinlock);
- dlm_lockres_put(res);
+ *ret_data = (void *)res;
dlm_put(dlm);
return -EINVAL;
}
+void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
+{
+ struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
+
+ if (ret_data) {
+ spin_lock(&res->spinlock);
+ res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+ dlm_lockres_put(res);
+ }
+ return;
+}
+
int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
int ignore_higher, u8 request_from, u32 flags)
@@ -2023,9 +2187,7 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
* even if one or more nodes die */
mlog(0, "worker about to master %.*s here, this=%u\n",
res->lockname.len, res->lockname.name, dlm->node_num);
- ret = dlm_do_assert_master(dlm, res->lockname.name,
- res->lockname.len,
- nodemap, flags);
+ ret = dlm_do_assert_master(dlm, res, nodemap, flags);
if (ret < 0) {
/* no need to restart, we are done */
if (!dlm_is_host_down(ret))
@@ -2097,14 +2259,180 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
return ret;
}
+/*
+ * DLM_DEREF_LOCKRES_MSG
+ */
+
+int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
+{
+ struct dlm_deref_lockres deref;
+ int ret = 0, r;
+ const char *lockname;
+ unsigned int namelen;
+
+ lockname = res->lockname.name;
+ namelen = res->lockname.len;
+ BUG_ON(namelen > O2NM_MAX_NAME_LEN);
+
+ mlog(0, "%s:%.*s: sending deref to %d\n",
+ dlm->name, namelen, lockname, res->owner);
+ memset(&deref, 0, sizeof(deref));
+ deref.node_idx = dlm->node_num;
+ deref.namelen = namelen;
+ memcpy(deref.name, lockname, namelen);
+
+ ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
+ &deref, sizeof(deref), res->owner, &r);
+ if (ret < 0)
+ mlog_errno(ret);
+ else if (r < 0) {
+ /* BAD. other node says I did not have a ref. */
+ mlog(ML_ERROR,"while dropping ref on %s:%.*s "
+ "(master=%u) got %d.\n", dlm->name, namelen,
+ lockname, res->owner, r);
+ dlm_print_one_lock_resource(res);
+ BUG();
+ }
+ return ret;
+}
+
+int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
+{
+ struct dlm_ctxt *dlm = data;
+ struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
+ struct dlm_lock_resource *res = NULL;
+ char *name;
+ unsigned int namelen;
+ int ret = -EINVAL;
+ u8 node;
+ unsigned int hash;
+ struct dlm_work_item *item;
+ int cleared = 0;
+ int dispatch = 0;
+
+ if (!dlm_grab(dlm))
+ return 0;
+
+ name = deref->name;
+ namelen = deref->namelen;
+ node = deref->node_idx;
+
+ if (namelen > DLM_LOCKID_NAME_MAX) {
+ mlog(ML_ERROR, "Invalid name length!");
+ goto done;
+ }
+ if (deref->node_idx >= O2NM_MAX_NODES) {
+ mlog(ML_ERROR, "Invalid node number: %u\n", node);
+ goto done;
+ }
+
+ hash = dlm_lockid_hash(name, namelen);
+
+ spin_lock(&dlm->spinlock);
+ res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
+ if (!res) {
+ spin_unlock(&dlm->spinlock);
+ mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
+ dlm->name, namelen, name);
+ goto done;
+ }
+ spin_unlock(&dlm->spinlock);
+
+ spin_lock(&res->spinlock);
+ if (res->state & DLM_LOCK_RES_SETREF_INPROG)
+ dispatch = 1;
+ else {
+ BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
+ if (test_bit(node, res->refmap)) {
+ dlm_lockres_clear_refmap_bit(node, res);
+ cleared = 1;
+ }
+ }
+ spin_unlock(&res->spinlock);
+
+ if (!dispatch) {
+ if (cleared)
+ dlm_lockres_calc_usage(dlm, res);
+ else {
+ mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
+ "but it is already dropped!\n", dlm->name,
+ res->lockname.len, res->lockname.name, node);
+ __dlm_print_one_lock_resource(res);
+ }
+ ret = 0;
+ goto done;
+ }
+
+ item = kzalloc(sizeof(*item), GFP_NOFS);
+ if (!item) {
+ ret = -ENOMEM;
+ mlog_errno(ret);
+ goto done;
+ }
+
+ dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
+ item->u.dl.deref_res = res;
+ item->u.dl.deref_node = node;
+
+ spin_lock(&dlm->work_lock);
+ list_add_tail(&item->list, &dlm->work_list);
+ spin_unlock(&dlm->work_lock);
+
+ queue_work(dlm->dlm_worker, &dlm->dispatched_work);
+ return 0;
+
+done:
+ if (res)
+ dlm_lockres_put(res);
+ dlm_put(dlm);
+
+ return ret;
+}
+
+static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
+{
+ struct dlm_ctxt *dlm;
+ struct dlm_lock_resource *res;
+ u8 node;
+ u8 cleared = 0;
+
+ dlm = item->dlm;
+ res = item->u.dl.deref_res;
+ node = item->u.dl.deref_node;
+
+ spin_lock(&res->spinlock);
+ BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
+ if (test_bit(node, res->refmap)) {
+ __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
+ dlm_lockres_clear_refmap_bit(node, res);
+ cleared = 1;
+ }
+ spin_unlock(&res->spinlock);
+
+ if (cleared) {
+ mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
+ dlm->name, res->lockname.len, res->lockname.name, node);
+ dlm_lockres_calc_usage(dlm, res);
+ } else {
+ mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
+ "but it is already dropped!\n", dlm->name,
+ res->lockname.len, res->lockname.name, node);
+ __dlm_print_one_lock_resource(res);
+ }
+
+ dlm_lockres_put(res);
+}
+
/*
* DLM_MIGRATE_LOCKRES
*/
-int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
- u8 target)
+static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ u8 target)
{
struct dlm_master_list_entry *mle = NULL;
struct dlm_master_list_entry *oldmle = NULL;
@@ -2116,7 +2444,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
struct list_head *queue, *iter;
int i;
struct dlm_lock *lock;
- int empty = 1;
+ int empty = 1, wake = 0;
if (!dlm_grab(dlm))
return -EINVAL;
@@ -2241,6 +2569,7 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
res->lockname.name, target);
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_MIGRATING;
+ wake = 1;
spin_unlock(&res->spinlock);
ret = -EINVAL;
}
@@ -2268,6 +2597,9 @@ fail:
* the lockres
*/
+ /* now that remote nodes are spinning on the MIGRATING flag,
+ * ensure that all assert_master work is flushed. */
+ flush_workqueue(dlm->dlm_worker);
/* get an extra reference on the mle.
* otherwise the assert_master from the new
@@ -2296,6 +2628,7 @@ fail:
dlm_put_mle_inuse(mle);
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_MIGRATING;
+ wake = 1;
spin_unlock(&res->spinlock);
goto leave;
}
@@ -2322,7 +2655,8 @@ fail:
res->owner == target)
break;
- mlog(0, "timed out during migration\n");
+ mlog(0, "%s:%.*s: timed out during migration\n",
+ dlm->name, res->lockname.len, res->lockname.name);
/* avoid hang during shutdown when migrating lockres
* to a node which also goes down */
if (dlm_is_node_dead(dlm, target)) {
@@ -2330,20 +2664,20 @@ fail:
"target %u is no longer up, restarting\n",
dlm->name, res->lockname.len,
res->lockname.name, target);
- ret = -ERESTARTSYS;
+ ret = -EINVAL;
+ /* migration failed, detach and clean up mle */
+ dlm_mle_detach_hb_events(dlm, mle);
+ dlm_put_mle(mle);
+ dlm_put_mle_inuse(mle);
+ spin_lock(&res->spinlock);
+ res->state &= ~DLM_LOCK_RES_MIGRATING;
+ wake = 1;
+ spin_unlock(&res->spinlock);
+ goto leave;
}
- }
- if (ret == -ERESTARTSYS) {
- /* migration failed, detach and clean up mle */
- dlm_mle_detach_hb_events(dlm, mle);
- dlm_put_mle(mle);
- dlm_put_mle_inuse(mle);
- spin_lock(&res->spinlock);
- res->state &= ~DLM_LOCK_RES_MIGRATING;
- spin_unlock(&res->spinlock);
- goto leave;
- }
- /* TODO: if node died: stop, clean up, return error */
+ } else
+ mlog(0, "%s:%.*s: caught signal during migration\n",
+ dlm->name, res->lockname.len, res->lockname.name);
}
/* all done, set the owner, clear the flag */
@@ -2366,6 +2700,11 @@ leave:
if (ret < 0)
dlm_kick_thread(dlm, res);
+ /* wake up waiters if the MIGRATING flag got set
+ * but migration failed */
+ if (wake)
+ wake_up(&res->wq);
+
/* TODO: cleanup */
if (mres)
free_page((unsigned long)mres);
@@ -2376,6 +2715,53 @@ leave:
return ret;
}
+#define DLM_MIGRATION_RETRY_MS 100
+
+/* Should be called only after beginning the domain leave process.
+ * There should not be any remaining locks on nonlocal lock resources,
+ * and there should be no local locks left on locally mastered resources.
+ *
+ * Called with the dlm spinlock held, may drop it to do migration, but
+ * will re-acquire before exit.
+ *
+ * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
+int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
+{
+ int ret;
+ int lock_dropped = 0;
+
+ if (res->owner != dlm->node_num) {
+ if (!__dlm_lockres_unused(res)) {
+ mlog(ML_ERROR, "%s:%.*s: this node is not master, "
+ "trying to free this but locks remain\n",
+ dlm->name, res->lockname.len, res->lockname.name);
+ }
+ goto leave;
+ }
+
+ /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
+ spin_unlock(&dlm->spinlock);
+ lock_dropped = 1;
+ while (1) {
+ ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
+ if (ret >= 0)
+ break;
+ if (ret == -ENOTEMPTY) {
+ mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
+ res->lockname.len, res->lockname.name);
+ BUG();
+ }
+
+ mlog(0, "lockres %.*s: migrate failed, "
+ "retrying\n", res->lockname.len,
+ res->lockname.name);
+ msleep(DLM_MIGRATION_RETRY_MS);
+ }
+ spin_lock(&dlm->spinlock);
+leave:
+ return lock_dropped;
+}
+
int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
{
int ret;
@@ -2405,7 +2791,8 @@ static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
return can_proceed;
}
-int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
+static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res)
{
int ret;
spin_lock(&res->spinlock);
@@ -2434,8 +2821,15 @@ static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
__dlm_lockres_reserve_ast(res);
spin_unlock(&res->spinlock);
- /* now flush all the pending asts.. hang out for a bit */
+ /* now flush all the pending asts */
dlm_kick_thread(dlm, res);
+ /* before waiting on DIRTY, block processes which may
+ * try to dirty the lockres before MIGRATING is set */
+ spin_lock(&res->spinlock);
+ BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
+ res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
+ spin_unlock(&res->spinlock);
+ /* now wait on any pending asts and the DIRTY state */
wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
dlm_lockres_release_ast(dlm, res);
@@ -2461,6 +2855,13 @@ again:
mlog(0, "trying again...\n");
goto again;
}
+ /* now that we are sure the MIGRATING state is there, drop
+ * the unneded state which blocked threads trying to DIRTY */
+ spin_lock(&res->spinlock);
+ BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
+ BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
+ res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
+ spin_unlock(&res->spinlock);
/* did the target go down or die? */
spin_lock(&dlm->spinlock);
@@ -2490,7 +2891,7 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
{
struct list_head *iter, *iter2;
struct list_head *queue = &res->granted;
- int i;
+ int i, bit;
struct dlm_lock *lock;
assert_spin_locked(&res->spinlock);
@@ -2508,12 +2909,28 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
BUG_ON(!list_empty(&lock->bast_list));
BUG_ON(lock->ast_pending);
BUG_ON(lock->bast_pending);
+ dlm_lockres_clear_refmap_bit(lock->ml.node, res);
list_del_init(&lock->list);
dlm_lock_put(lock);
}
}
queue++;
}
+ bit = 0;
+ while (1) {
+ bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
+ if (bit >= O2NM_MAX_NODES)
+ break;
+ /* do not clear the local node reference, if there is a
+ * process holding this, let it drop the ref itself */
+ if (bit != dlm->node_num) {
+ mlog(0, "%s:%.*s: node %u had a ref to this "
+ "migrating lockres, clearing\n", dlm->name,
+ res->lockname.len, res->lockname.name, bit);
+ dlm_lockres_clear_refmap_bit(bit, res);
+ }
+ bit++;
+ }
}
/* for now this is not too intelligent. we will
@@ -2601,6 +3018,16 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
mlog(0, "migrate request (node %u) returned %d!\n",
nodenum, status);
ret = status;
+ } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
+ /* during the migration request we short-circuited
+ * the mastery of the lockres. make sure we have
+ * a mastery ref for nodenum */
+ mlog(0, "%s:%.*s: need ref for node %u\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ nodenum);
+ spin_lock(&res->spinlock);
+ dlm_lockres_set_refmap_bit(nodenum, res);
+ spin_unlock(&res->spinlock);
}
}
@@ -2619,7 +3046,8 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
* we will have no mle in the list to start with. now we can add an mle for
* the migration and this should be the only one found for those scanning the
* list. */
-int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_lock_resource *res = NULL;
@@ -2745,7 +3173,13 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
/* remove it from the list so that only one
* mle will be found */
list_del_init(&tmp->list);
- __dlm_mle_detach_hb_events(dlm, mle);
+ /* this was obviously WRONG. mle is uninited here. should be tmp. */
+ __dlm_mle_detach_hb_events(dlm, tmp);
+ ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
+ mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
+ "telling master to get ref for cleared out mle "
+ "during migration\n", dlm->name, namelen, name,
+ master, new_master);
}
spin_unlock(&tmp->spinlock);
}
@@ -2753,6 +3187,8 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
/* now add a migration mle to the tail of the list */
dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
mle->new_master = new_master;
+ /* the new master will be sending an assert master for this.
+ * at that point we will get the refmap reference */
mle->master = master;
/* do this for consistency with other mle types */
set_bit(new_master, mle->maybe_map);
@@ -2902,6 +3338,13 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
clear_bit(dlm->node_num, iter.node_map);
spin_unlock(&dlm->spinlock);
+ /* ownership of the lockres is changing. account for the
+ * mastery reference here since old_master will briefly have
+ * a reference after the migration completes */
+ spin_lock(&res->spinlock);
+ dlm_lockres_set_refmap_bit(old_master, res);
+ spin_unlock(&res->spinlock);
+
mlog(0, "now time to do a migrate request to other nodes\n");
ret = dlm_do_migrate_request(dlm, res, old_master,
dlm->node_num, &iter);
@@ -2914,8 +3357,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
res->lockname.len, res->lockname.name);
/* this call now finishes out the nodemap
* even if one or more nodes die */
- ret = dlm_do_assert_master(dlm, res->lockname.name,
- res->lockname.len, iter.node_map,
+ ret = dlm_do_assert_master(dlm, res, iter.node_map,
DLM_ASSERT_MASTER_FINISH_MIGRATION);
if (ret < 0) {
/* no longer need to retry. all living nodes contacted. */
@@ -2927,8 +3369,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
set_bit(old_master, iter.node_map);
mlog(0, "doing assert master of %.*s back to %u\n",
res->lockname.len, res->lockname.name, old_master);
- ret = dlm_do_assert_master(dlm, res->lockname.name,
- res->lockname.len, iter.node_map,
+ ret = dlm_do_assert_master(dlm, res, iter.node_map,
DLM_ASSERT_MASTER_FINISH_MIGRATION);
if (ret < 0) {
mlog(0, "assert master to original master failed "