summaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlm/dlmrecovery.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmrecovery.c')
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c188
1 files changed, 157 insertions, 31 deletions
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index fb3e2b0817f..6d4a83d5015 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -163,9 +163,6 @@ void dlm_dispatch_work(struct work_struct *work)
dlm_workfunc_t *workfunc;
int tot=0;
- if (!dlm_joined(dlm))
- return;
-
spin_lock(&dlm->work_lock);
list_splice_init(&dlm->work_list, &tmp_list);
spin_unlock(&dlm->work_lock);
@@ -757,7 +754,7 @@ static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
}
BUG_ON(num == dead_node);
- ndata = kcalloc(1, sizeof(*ndata), GFP_NOFS);
+ ndata = kzalloc(sizeof(*ndata), GFP_NOFS);
if (!ndata) {
dlm_destroy_recovery_area(dlm, dead_node);
return -ENOMEM;
@@ -821,7 +818,8 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,
}
-int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf;
@@ -842,7 +840,7 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
}
BUG_ON(lr->dead_node != dlm->reco.dead_node);
- item = kcalloc(1, sizeof(*item), GFP_NOFS);
+ item = kzalloc(sizeof(*item), GFP_NOFS);
if (!item) {
dlm_put(dlm);
return -ENOMEM;
@@ -978,7 +976,8 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
}
-int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf;
@@ -1129,6 +1128,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
if (total_locks == mres_total_locks)
mres->flags |= DLM_MRES_ALL_DONE;
+ mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",
+ send_to);
+
/* send it */
ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
sz, send_to, &status);
@@ -1213,6 +1217,34 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
return 0;
}
+static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
+ struct dlm_migratable_lockres *mres)
+{
+ struct dlm_lock dummy;
+ memset(&dummy, 0, sizeof(dummy));
+ dummy.ml.cookie = 0;
+ dummy.ml.type = LKM_IVMODE;
+ dummy.ml.convert_type = LKM_IVMODE;
+ dummy.ml.highest_blocked = LKM_IVMODE;
+ dummy.lksb = NULL;
+ dummy.ml.node = dlm->node_num;
+ dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
+}
+
+static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
+ struct dlm_migratable_lock *ml,
+ u8 *nodenum)
+{
+ if (unlikely(ml->cookie == 0 &&
+ ml->type == LKM_IVMODE &&
+ ml->convert_type == LKM_IVMODE &&
+ ml->highest_blocked == LKM_IVMODE &&
+ ml->list == DLM_BLOCKED_LIST)) {
+ *nodenum = ml->node;
+ return 1;
+ }
+ return 0;
+}
int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
struct dlm_migratable_lockres *mres,
@@ -1260,6 +1292,14 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
goto error;
}
}
+ if (total_locks == 0) {
+ /* send a dummy lock to indicate a mastery reference only */
+ mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
+ "migration");
+ dlm_add_dummy_lock(dlm, mres);
+ }
/* flush any remaining locks */
ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
if (ret < 0)
@@ -1293,7 +1333,8 @@ error:
* do we spin? returning an error only delays the problem really
*/
-int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_migratable_lockres *mres =
@@ -1323,7 +1364,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
ret = -ENOMEM;
buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS);
- item = kcalloc(1, sizeof(*item), GFP_NOFS);
+ item = kzalloc(sizeof(*item), GFP_NOFS);
if (!buf || !item)
goto leave;
@@ -1382,17 +1423,21 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
/* add an extra ref for just-allocated lockres
* otherwise the lockres will be purged immediately */
dlm_lockres_get(res);
-
}
/* at this point we have allocated everything we need,
* and we have a hashed lockres with an extra ref and
* the proper res->state flags. */
ret = 0;
+ spin_lock(&res->spinlock);
+ /* drop this either when master requery finds a different master
+ * or when a lock is added by the recovery worker */
+ dlm_lockres_grab_inflight_ref(dlm, res);
if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
/* migration cannot have an unknown master */
BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
@@ -1400,10 +1445,11 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
"unknown owner.. will need to requery: "
"%.*s\n", mres->lockname_len, mres->lockname);
} else {
- spin_lock(&res->spinlock);
+ /* take a reference now to pin the lockres, drop it
+ * when locks are added in the worker */
dlm_change_lockres_owner(dlm, res, dlm->node_num);
- spin_unlock(&res->spinlock);
}
+ spin_unlock(&res->spinlock);
/* queue up work for dlm_mig_lockres_worker */
dlm_grab(dlm); /* get an extra ref for the work item */
@@ -1459,6 +1505,9 @@ again:
"this node will take it.\n",
res->lockname.len, res->lockname.name);
} else {
+ spin_lock(&res->spinlock);
+ dlm_lockres_drop_inflight_ref(dlm, res);
+ spin_unlock(&res->spinlock);
mlog(0, "master needs to respond to sender "
"that node %u still owns %.*s\n",
real_master, res->lockname.len,
@@ -1578,7 +1627,8 @@ int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
/* this function cannot error, so unless the sending
* or receiving of the message failed, the owner can
* be trusted */
-int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
@@ -1660,21 +1710,38 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
{
struct dlm_migratable_lock *ml;
struct list_head *queue;
+ struct list_head *tmpq = NULL;
struct dlm_lock *newlock = NULL;
struct dlm_lockstatus *lksb = NULL;
int ret = 0;
- int i, bad;
+ int i, j, bad;
struct list_head *iter;
struct dlm_lock *lock = NULL;
+ u8 from = O2NM_MAX_NODES;
+ unsigned int added = 0;
mlog(0, "running %d locks for this lockres\n", mres->num_locks);
for (i=0; i<mres->num_locks; i++) {
ml = &(mres->ml[i]);
+
+ if (dlm_is_dummy_lock(dlm, ml, &from)) {
+ /* placeholder, just need to set the refmap bit */
+ BUG_ON(mres->num_locks != 1);
+ mlog(0, "%s:%.*s: dummy lock for %u\n",
+ dlm->name, mres->lockname_len, mres->lockname,
+ from);
+ spin_lock(&res->spinlock);
+ dlm_lockres_set_refmap_bit(from, res);
+ spin_unlock(&res->spinlock);
+ added++;
+ break;
+ }
BUG_ON(ml->highest_blocked != LKM_IVMODE);
newlock = NULL;
lksb = NULL;
queue = dlm_list_num_to_pointer(res, ml->list);
+ tmpq = NULL;
/* if the lock is for the local node it needs to
* be moved to the proper location within the queue.
@@ -1684,11 +1751,16 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
BUG_ON(!(mres->flags & DLM_MRES_MIGRATION));
spin_lock(&res->spinlock);
- list_for_each(iter, queue) {
- lock = list_entry (iter, struct dlm_lock, list);
- if (lock->ml.cookie != ml->cookie)
- lock = NULL;
- else
+ for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) {
+ tmpq = dlm_list_idx_to_ptr(res, j);
+ list_for_each(iter, tmpq) {
+ lock = list_entry (iter, struct dlm_lock, list);
+ if (lock->ml.cookie != ml->cookie)
+ lock = NULL;
+ else
+ break;
+ }
+ if (lock)
break;
}
@@ -1698,12 +1770,20 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
u64 c = ml->cookie;
mlog(ML_ERROR, "could not find local lock "
"with cookie %u:%llu!\n",
- dlm_get_lock_cookie_node(c),
- dlm_get_lock_cookie_seq(c));
+ dlm_get_lock_cookie_node(be64_to_cpu(c)),
+ dlm_get_lock_cookie_seq(be64_to_cpu(c)));
+ __dlm_print_one_lock_resource(res);
BUG();
}
BUG_ON(lock->ml.node != ml->node);
+ if (tmpq != queue) {
+ mlog(0, "lock was on %u instead of %u for %.*s\n",
+ j, ml->list, res->lockname.len, res->lockname.name);
+ spin_unlock(&res->spinlock);
+ continue;
+ }
+
/* see NOTE above about why we do not update
* to match the master here */
@@ -1711,6 +1791,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
/* do not alter lock refcount. switching lists. */
list_move_tail(&lock->list, queue);
spin_unlock(&res->spinlock);
+ added++;
mlog(0, "just reordered a local lock!\n");
continue;
@@ -1799,14 +1880,14 @@ skip_lvb:
mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
"exists on this lockres!\n", dlm->name,
res->lockname.len, res->lockname.name,
- dlm_get_lock_cookie_node(c),
- dlm_get_lock_cookie_seq(c));
+ dlm_get_lock_cookie_node(be64_to_cpu(c)),
+ dlm_get_lock_cookie_seq(be64_to_cpu(c)));
mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
"node=%u, cookie=%u:%llu, queue=%d\n",
ml->type, ml->convert_type, ml->node,
- dlm_get_lock_cookie_node(ml->cookie),
- dlm_get_lock_cookie_seq(ml->cookie),
+ dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)),
+ dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)),
ml->list);
__dlm_print_one_lock_resource(res);
@@ -1817,12 +1898,22 @@ skip_lvb:
if (!bad) {
dlm_lock_get(newlock);
list_add_tail(&newlock->list, queue);
+ mlog(0, "%s:%.*s: added lock for node %u, "
+ "setting refmap bit\n", dlm->name,
+ res->lockname.len, res->lockname.name, ml->node);
+ dlm_lockres_set_refmap_bit(ml->node, res);
+ added++;
}
spin_unlock(&res->spinlock);
}
mlog(0, "done running all the locks\n");
leave:
+ /* balance the ref taken when the work was queued */
+ spin_lock(&res->spinlock);
+ dlm_lockres_drop_inflight_ref(dlm, res);
+ spin_unlock(&res->spinlock);
+
if (ret < 0) {
mlog_errno(ret);
if (newlock)
@@ -1935,9 +2026,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
if (res->owner == dead_node) {
list_del_init(&res->recovering);
spin_lock(&res->spinlock);
+ /* new_master has our reference from
+ * the lock state sent during recovery */
dlm_change_lockres_owner(dlm, res, new_master);
res->state &= ~DLM_LOCK_RES_RECOVERING;
- if (!__dlm_lockres_unused(res))
+ if (__dlm_lockres_has_locks(res))
__dlm_dirty_lockres(dlm, res);
spin_unlock(&res->spinlock);
wake_up(&res->wq);
@@ -1977,9 +2070,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
dlm_lockres_put(res);
}
spin_lock(&res->spinlock);
+ /* new_master has our reference from
+ * the lock state sent during recovery */
dlm_change_lockres_owner(dlm, res, new_master);
res->state &= ~DLM_LOCK_RES_RECOVERING;
- if (!__dlm_lockres_unused(res))
+ if (__dlm_lockres_has_locks(res))
__dlm_dirty_lockres(dlm, res);
spin_unlock(&res->spinlock);
wake_up(&res->wq);
@@ -2048,6 +2143,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
{
struct list_head *iter, *tmpiter;
struct dlm_lock *lock;
+ unsigned int freed = 0;
/* this node is the lockres master:
* 1) remove any stale locks for the dead node
@@ -2062,6 +2158,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
if (lock->ml.node == dead_node) {
list_del_init(&lock->list);
dlm_lock_put(lock);
+ freed++;
}
}
list_for_each_safe(iter, tmpiter, &res->converting) {
@@ -2069,6 +2166,7 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
if (lock->ml.node == dead_node) {
list_del_init(&lock->list);
dlm_lock_put(lock);
+ freed++;
}
}
list_for_each_safe(iter, tmpiter, &res->blocked) {
@@ -2076,9 +2174,23 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
if (lock->ml.node == dead_node) {
list_del_init(&lock->list);
dlm_lock_put(lock);
+ freed++;
}
}
+ if (freed) {
+ mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
+ "dropping ref from lockres\n", dlm->name,
+ res->lockname.len, res->lockname.name, freed, dead_node);
+ BUG_ON(!test_bit(dead_node, res->refmap));
+ dlm_lockres_clear_refmap_bit(dead_node, res);
+ } else if (test_bit(dead_node, res->refmap)) {
+ mlog(0, "%s:%.*s: dead node %u had a ref, but had "
+ "no locks and had not purged before dying\n", dlm->name,
+ res->lockname.len, res->lockname.name, dead_node);
+ dlm_lockres_clear_refmap_bit(dead_node, res);
+ }
+
/* do not kick thread yet */
__dlm_dirty_lockres(dlm, res);
}
@@ -2141,9 +2253,21 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
spin_lock(&res->spinlock);
/* zero the lvb if necessary */
dlm_revalidate_lvb(dlm, res, dead_node);
- if (res->owner == dead_node)
+ if (res->owner == dead_node) {
+ if (res->state & DLM_LOCK_RES_DROPPING_REF)
+ mlog(0, "%s:%.*s: owned by "
+ "dead node %u, this node was "
+ "dropping its ref when it died. "
+ "continue, dropping the flag.\n",
+ dlm->name, res->lockname.len,
+ res->lockname.name, dead_node);
+
+ /* the wake_up for this will happen when the
+ * RECOVERING flag is dropped later */
+ res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+
dlm_move_lockres_to_recovery_list(dlm, res);
- else if (res->owner == dlm->node_num) {
+ } else if (res->owner == dlm->node_num) {
dlm_free_dead_locks(dlm, res, dead_node);
__dlm_lockres_calc_usage(dlm, res);
}
@@ -2480,7 +2604,8 @@ retry:
return ret;
}
-int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf;
@@ -2608,7 +2733,8 @@ stage2:
return ret;
}
-int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;