diff options
Diffstat (limited to 'fs/ocfs2/dlm/dlmmaster.c')
-rw-r--r-- | fs/ocfs2/dlm/dlmmaster.c | 143 |
1 files changed, 91 insertions, 52 deletions
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 03ccf9a7b1f..f564b0e5f80 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -366,7 +366,7 @@ void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up) struct dlm_master_list_entry *mle; assert_spin_locked(&dlm->spinlock); - + list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) { if (node_up) dlm_mle_node_up(dlm, mle, NULL, idx); @@ -511,8 +511,6 @@ static void dlm_lockres_release(struct kref *kref) atomic_dec(&dlm->res_cur_count); - dlm_put(dlm); - if (!hlist_unhashed(&res->hash_node) || !list_empty(&res->granted) || !list_empty(&res->converting) || @@ -585,8 +583,6 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, res->migration_pending = 0; res->inflight_locks = 0; - /* put in dlm_lockres_release */ - dlm_grab(dlm); res->dlm = dlm; kref_init(&res->refs); @@ -617,13 +613,11 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, { struct dlm_lock_resource *res = NULL; - res = (struct dlm_lock_resource *) - kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS); + res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS); if (!res) goto error; - res->lockname.name = (char *) - kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS); + res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS); if (!res->lockname.name) goto error; @@ -757,8 +751,7 @@ lookup: spin_unlock(&dlm->spinlock); mlog(0, "allocating a new resource\n"); /* nothing found and we need to allocate one. */ - alloc_mle = (struct dlm_master_list_entry *) - kmem_cache_alloc(dlm_mle_cache, GFP_NOFS); + alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS); if (!alloc_mle) goto leave; res = dlm_new_lockres(dlm, lockid, namelen); @@ -833,7 +826,7 @@ lookup: __dlm_insert_mle(dlm, mle); /* still holding the dlm spinlock, check the recovery map - * to see if there are any nodes that still need to be + * to see if there are any nodes that still need to be * considered. these will not appear in the mle nodemap * but they might own this lockres. wait on them. */ bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); @@ -883,7 +876,7 @@ redo_request: msleep(500); } continue; - } + } dlm_kick_recovery_thread(dlm); msleep(1000); @@ -939,8 +932,8 @@ wait: res->lockname.name, blocked); if (++tries > 20) { mlog(ML_ERROR, "%s:%.*s: spinning on " - "dlm_wait_for_lock_mastery, blocked=%d\n", - dlm->name, res->lockname.len, + "dlm_wait_for_lock_mastery, blocked=%d\n", + dlm->name, res->lockname.len, res->lockname.name, blocked); dlm_print_one_lock_resource(res); dlm_print_one_mle(mle); @@ -1029,7 +1022,7 @@ recheck: ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked); b = (mle->type == DLM_MLE_BLOCK); if ((*blocked && !b) || (!*blocked && b)) { - mlog(0, "%s:%.*s: status change: old=%d new=%d\n", + mlog(0, "%s:%.*s: status change: old=%d new=%d\n", dlm->name, res->lockname.len, res->lockname.name, *blocked, b); *blocked = b; @@ -1542,8 +1535,7 @@ way_up_top: spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); - mle = (struct dlm_master_list_entry *) - kmem_cache_alloc(dlm_mle_cache, GFP_NOFS); + mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS); if (!mle) { response = DLM_MASTER_RESP_ERROR; mlog_errno(-ENOMEM); @@ -1602,7 +1594,7 @@ send_response: } mlog(0, "%u is the owner of %.*s, cleaning everyone else\n", dlm->node_num, res->lockname.len, res->lockname.name); - ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx, + ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx, DLM_ASSERT_MASTER_MLE_CLEANUP); if (ret < 0) { mlog(ML_ERROR, "failed to dispatch assert master work\n"); @@ -1666,7 +1658,9 @@ again: tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key, &assert, sizeof(assert), to, &r); if (tmpret < 0) { - mlog(0, "assert_master returned %d!\n", tmpret); + mlog(ML_ERROR, "Error %d when sending message %u (key " + "0x%x) to node %u\n", tmpret, + DLM_ASSERT_MASTER_MSG, dlm->key, to); if (!dlm_is_host_down(tmpret)) { mlog(ML_ERROR, "unhandled error=%d!\n", tmpret); BUG(); @@ -1701,7 +1695,7 @@ again: if (r & DLM_ASSERT_RESPONSE_REASSERT) { mlog(0, "%.*s: node %u create mles on other " - "nodes and requests a re-assert\n", + "nodes and requests a re-assert\n", namelen, lockname, to); reassert = 1; } @@ -1812,7 +1806,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data, spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); goto done; - } + } } } spin_unlock(&dlm->master_lock); @@ -1875,7 +1869,6 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data, ok: spin_unlock(&res->spinlock); } - spin_unlock(&dlm->spinlock); // mlog(0, "woo! got an assert_master from node %u!\n", // assert->node_idx); @@ -1883,7 +1876,7 @@ ok: int extra_ref = 0; int nn = -1; int rr, err = 0; - + spin_lock(&mle->spinlock); if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION) extra_ref = 1; @@ -1891,7 +1884,7 @@ ok: /* MASTER mle: if any bits set in the response map * then the calling node needs to re-assert to clear * up nodes that this node contacted */ - while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES, + while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES, nn+1)) < O2NM_MAX_NODES) { if (nn != dlm->node_num && nn != assert->node_idx) master_request = 1; @@ -1926,7 +1919,6 @@ ok: /* master is known, detach if not already detached. * ensures that only one assert_master call will happen * on this mle. */ - spin_lock(&dlm->spinlock); spin_lock(&dlm->master_lock); rr = atomic_read(&mle->mle_refs.refcount); @@ -1959,7 +1951,6 @@ ok: __dlm_put_mle(mle); } spin_unlock(&dlm->master_lock); - spin_unlock(&dlm->spinlock); } else if (res) { if (res->owner != assert->node_idx) { mlog(0, "assert_master from %u, but current " @@ -1967,6 +1958,7 @@ ok: res->owner, namelen, name); } } + spin_unlock(&dlm->spinlock); done: ret = 0; @@ -2002,7 +1994,7 @@ kill: __dlm_print_one_lock_resource(res); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); - *ret_data = (void *)res; + *ret_data = (void *)res; dlm_put(dlm); return -EINVAL; } @@ -2040,10 +2032,10 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, item->u.am.request_from = request_from; item->u.am.flags = flags; - if (ignore_higher) - mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len, + if (ignore_higher) + mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len, res->lockname.name); - + spin_lock(&dlm->work_lock); list_add_tail(&item->list, &dlm->work_list); spin_unlock(&dlm->work_lock); @@ -2133,7 +2125,7 @@ put: * think that $RECOVERY is currently mastered by a dead node. If so, * we wait a short time to allow that node to get notified by its own * heartbeat stack, then check again. All $RECOVERY lock resources - * mastered by dead nodes are purged when the hearbeat callback is + * mastered by dead nodes are purged when the hearbeat callback is * fired, so we can know for sure that it is safe to continue once * the node returns a live node or no node. */ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm, @@ -2174,7 +2166,7 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm, ret = -EAGAIN; } spin_unlock(&dlm->spinlock); - mlog(0, "%s: reco lock master is %u\n", dlm->name, + mlog(0, "%s: reco lock master is %u\n", dlm->name, master); break; } @@ -2207,7 +2199,9 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key, &deref, sizeof(deref), res->owner, &r); if (ret < 0) - mlog_errno(ret); + mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " + "node %u\n", ret, DLM_DEREF_LOCKRES_MSG, dlm->key, + res->owner); else if (r < 0) { /* BAD. other node says I did not have a ref. */ mlog(ML_ERROR,"while dropping ref on %s:%.*s " @@ -2454,8 +2448,7 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm, goto leave; } - mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, - GFP_NOFS); + mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS); if (!mle) { mlog_errno(ret); goto leave; @@ -2602,7 +2595,7 @@ fail: mlog(0, "%s:%.*s: timed out during migration\n", dlm->name, res->lockname.len, res->lockname.name); - /* avoid hang during shutdown when migrating lockres + /* avoid hang during shutdown when migrating lockres * to a node which also goes down */ if (dlm_is_node_dead(dlm, target)) { mlog(0, "%s:%.*s: expected migration " @@ -2738,7 +2731,7 @@ static int dlm_migration_can_proceed(struct dlm_ctxt *dlm, can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING); spin_unlock(&res->spinlock); - /* target has died, so make the caller break out of the + /* target has died, so make the caller break out of the * wait_event, but caller must recheck the domain_map */ spin_lock(&dlm->spinlock); if (!test_bit(mig_target, dlm->domain_map)) @@ -2811,14 +2804,8 @@ again: mlog(0, "trying again...\n"); goto again; } - /* now that we are sure the MIGRATING state is there, drop - * the unneded state which blocked threads trying to DIRTY */ - spin_lock(&res->spinlock); - BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY)); - BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING)); - res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY; - spin_unlock(&res->spinlock); + ret = 0; /* did the target go down or die? */ spin_lock(&dlm->spinlock); if (!test_bit(target, dlm->domain_map)) { @@ -2829,9 +2816,21 @@ again: spin_unlock(&dlm->spinlock); /* + * if target is down, we need to clear DLM_LOCK_RES_BLOCK_DIRTY for + * another try; otherwise, we are sure the MIGRATING state is there, + * drop the unneded state which blocked threads trying to DIRTY + */ + spin_lock(&res->spinlock); + BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY)); + res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY; + if (!ret) + BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING)); + spin_unlock(&res->spinlock); + + /* * at this point: * - * o the DLM_LOCK_RES_MIGRATING flag is set + * o the DLM_LOCK_RES_MIGRATING flag is set if target not down * o there are no pending asts on this lockres * o all processes trying to reserve an ast on this * lockres must wait for the MIGRATING flag to clear @@ -2977,7 +2976,9 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm, &migrate, sizeof(migrate), nodenum, &status); if (ret < 0) { - mlog(0, "migrate_request returned %d!\n", ret); + mlog(ML_ERROR, "Error %d when sending message %u (key " + "0x%x) to node %u\n", ret, DLM_MIGRATE_REQUEST_MSG, + dlm->key, nodenum); if (!dlm_is_host_down(ret)) { mlog(ML_ERROR, "unhandled error=%d!\n", ret); BUG(); @@ -3035,8 +3036,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data, hash = dlm_lockid_hash(name, namelen); /* preallocate.. if this fails, abort */ - mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, - GFP_NOFS); + mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS); if (!mle) { ret = -ENOMEM; @@ -3046,8 +3046,6 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data, /* check for pre-existing lock */ spin_lock(&dlm->spinlock); res = __dlm_lookup_lockres(dlm, name, namelen, hash); - spin_lock(&dlm->master_lock); - if (res) { spin_lock(&res->spinlock); if (res->state & DLM_LOCK_RES_RECOVERING) { @@ -3065,14 +3063,15 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data, spin_unlock(&res->spinlock); } + spin_lock(&dlm->master_lock); /* ignore status. only nonzero status would BUG. */ ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name, namelen, migrate->new_master, migrate->master); -unlock: spin_unlock(&dlm->master_lock); +unlock: spin_unlock(&dlm->spinlock); if (oldmle) { @@ -3434,3 +3433,43 @@ void dlm_lockres_release_ast(struct dlm_ctxt *dlm, wake_up(&res->wq); wake_up(&dlm->migration_wq); } + +void dlm_force_free_mles(struct dlm_ctxt *dlm) +{ + int i; + struct hlist_head *bucket; + struct dlm_master_list_entry *mle; + struct hlist_node *tmp, *list; + + /* + * We notified all other nodes that we are exiting the domain and + * marked the dlm state to DLM_CTXT_LEAVING. If any mles are still + * around we force free them and wake any processes that are waiting + * on the mles + */ + spin_lock(&dlm->spinlock); + spin_lock(&dlm->master_lock); + + BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING); + BUG_ON((find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES)); + + for (i = 0; i < DLM_HASH_BUCKETS; i++) { + bucket = dlm_master_hash(dlm, i); + hlist_for_each_safe(list, tmp, bucket) { + mle = hlist_entry(list, struct dlm_master_list_entry, + master_hash_node); + if (mle->type != DLM_MLE_BLOCK) { + mlog(ML_ERROR, "bad mle: %p\n", mle); + dlm_print_one_mle(mle); + } + atomic_set(&mle->woken, 1); + wake_up(&mle->wq); + + __dlm_unlink_mle(dlm, mle); + __dlm_mle_detach_hb_events(dlm, mle); + __dlm_put_mle(mle); + } + } + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); +} |