summaryrefslogtreecommitdiffstats
path: root/fs/dlm
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/ast.c3
-rw-r--r--fs/dlm/dlm_internal.h16
-rw-r--r--fs/dlm/lock.c541
-rw-r--r--fs/dlm/lock.h7
-rw-r--r--fs/dlm/lockspace.c20
-rw-r--r--fs/dlm/lowcomms.c28
-rw-r--r--fs/dlm/memory.c8
-rw-r--r--fs/dlm/rcom.c61
-rw-r--r--fs/dlm/recover.c73
-rw-r--r--fs/dlm/recoverd.c15
-rw-r--r--fs/dlm/requestqueue.c43
11 files changed, 531 insertions, 284 deletions
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 90e5997262e..63dc19c54d5 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -310,6 +310,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
}
mutex_unlock(&ls->ls_cb_mutex);
- log_debug(ls, "dlm_callback_resume %d", count);
+ if (count)
+ log_debug(ls, "dlm_callback_resume %d", count);
}
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 3a564d197e9..bc342f7ac3a 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -38,6 +38,7 @@
#include <linux/miscdevice.h>
#include <linux/mutex.h>
#include <linux/idr.h>
+#include <linux/ratelimit.h>
#include <asm/uaccess.h>
#include <linux/dlm.h>
@@ -74,6 +75,13 @@ do { \
(ls)->ls_name , ##args); \
} while (0)
+#define log_limit(ls, fmt, args...) \
+do { \
+ if (dlm_config.ci_log_debug) \
+ printk_ratelimited(KERN_DEBUG "dlm: %s: " fmt "\n", \
+ (ls)->ls_name , ##args); \
+} while (0)
+
#define DLM_ASSERT(x, do) \
{ \
if (!(x)) \
@@ -263,6 +271,8 @@ struct dlm_lkb {
ktime_t lkb_last_cast_time; /* for debugging */
ktime_t lkb_last_bast_time; /* for debugging */
+ uint64_t lkb_recover_seq; /* from ls_recover_seq */
+
char *lkb_lvbptr;
struct dlm_lksb *lkb_lksb; /* caller's status block */
void (*lkb_astfn) (void *astparam);
@@ -317,7 +327,7 @@ enum rsb_flags {
RSB_NEW_MASTER,
RSB_NEW_MASTER2,
RSB_RECOVER_CONVERT,
- RSB_LOCKS_PURGED,
+ RSB_RECOVER_GRANT,
};
static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
@@ -563,6 +573,7 @@ struct dlm_ls {
struct mutex ls_requestqueue_mutex;
struct dlm_rcom *ls_recover_buf;
int ls_recover_nodeid; /* for debugging */
+ unsigned int ls_recover_locks_in; /* for log info */
uint64_t ls_rcom_seq;
spinlock_t ls_rcom_spin;
struct list_head ls_recover_list;
@@ -589,6 +600,7 @@ struct dlm_ls {
#define LSFL_UEVENT_WAIT 5
#define LSFL_TIMEWARN 6
#define LSFL_CB_DELAY 7
+#define LSFL_NODIR 8
/* much of this is just saving user space pointers associated with the
lock that we pass back to the user lib with an ast */
@@ -636,7 +648,7 @@ static inline int dlm_recovery_stopped(struct dlm_ls *ls)
static inline int dlm_no_directory(struct dlm_ls *ls)
{
- return (ls->ls_exflags & DLM_LSFL_NODIR) ? 1 : 0;
+ return test_bit(LSFL_NODIR, &ls->ls_flags);
}
int dlm_netlink_init(void);
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 4c58d4a3adc..bdafb65a523 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -160,11 +160,12 @@ static const int __quecvt_compat_matrix[8][8] = {
void dlm_print_lkb(struct dlm_lkb *lkb)
{
- printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
- " status %d rqmode %d grmode %d wait_type %d\n",
+ printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
+ "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
- lkb->lkb_grmode, lkb->lkb_wait_type);
+ lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
+ (unsigned long long)lkb->lkb_recover_seq);
}
static void dlm_print_rsb(struct dlm_rsb *r)
@@ -251,8 +252,6 @@ static inline int is_process_copy(struct dlm_lkb *lkb)
static inline int is_master_copy(struct dlm_lkb *lkb)
{
- if (lkb->lkb_flags & DLM_IFL_MSTCPY)
- DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}
@@ -479,6 +478,9 @@ static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
kref_get(&r->res_ref);
goto out;
}
+ if (error == -ENOTBLK)
+ goto out;
+
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
if (error)
goto out;
@@ -586,6 +588,23 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
return error;
}
+static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
+{
+ struct rb_node *n;
+ struct dlm_rsb *r;
+ int i;
+
+ for (i = 0; i < ls->ls_rsbtbl_size; i++) {
+ spin_lock(&ls->ls_rsbtbl[i].lock);
+ for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+ r = rb_entry(n, struct dlm_rsb, res_hashnode);
+ if (r->res_hash == hash)
+ dlm_dump_rsb(r);
+ }
+ spin_unlock(&ls->ls_rsbtbl[i].lock);
+ }
+}
+
/* This is only called to add a reference when the code already holds
a valid reference to the rsb, so there's no need for locking. */
@@ -1064,8 +1083,9 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
goto out_del;
}
- log_error(ls, "remwait error %x reply %d flags %x no wait_type",
- lkb->lkb_id, mstype, lkb->lkb_flags);
+ log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
+ lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
+ mstype, lkb->lkb_flags);
return -1;
out_del:
@@ -1498,13 +1518,13 @@ static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
}
lkb->lkb_rqmode = DLM_LOCK_IV;
+ lkb->lkb_highbast = 0;
}
static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
set_lvb_lock(r, lkb);
_grant_lock(r, lkb);
- lkb->lkb_highbast = 0;
}
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -1866,7 +1886,8 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
/* Returns the highest requested mode of all blocked conversions; sets
cw if there's a blocked conversion to DLM_LOCK_CW. */
-static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
+static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
+ unsigned int *count)
{
struct dlm_lkb *lkb, *s;
int hi, demoted, quit, grant_restart, demote_restart;
@@ -1885,6 +1906,8 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
if (can_be_granted(r, lkb, 0, &deadlk)) {
grant_lock_pending(r, lkb);
grant_restart = 1;
+ if (count)
+ (*count)++;
continue;
}
@@ -1918,14 +1941,17 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
return max_t(int, high, hi);
}
-static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
+static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
+ unsigned int *count)
{
struct dlm_lkb *lkb, *s;
list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
- if (can_be_granted(r, lkb, 0, NULL))
+ if (can_be_granted(r, lkb, 0, NULL)) {
grant_lock_pending(r, lkb);
- else {
+ if (count)
+ (*count)++;
+ } else {
high = max_t(int, lkb->lkb_rqmode, high);
if (lkb->lkb_rqmode == DLM_LOCK_CW)
*cw = 1;
@@ -1954,16 +1980,20 @@ static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
return 0;
}
-static void grant_pending_locks(struct dlm_rsb *r)
+static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
{
struct dlm_lkb *lkb, *s;
int high = DLM_LOCK_IV;
int cw = 0;
- DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
+ if (!is_master(r)) {
+ log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
+ dlm_dump_rsb(r);
+ return;
+ }
- high = grant_pending_convert(r, high, &cw);
- high = grant_pending_wait(r, high, &cw);
+ high = grant_pending_convert(r, high, &cw, count);
+ high = grant_pending_wait(r, high, &cw, count);
if (high == DLM_LOCK_IV)
return;
@@ -2499,7 +2529,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
before we try again to grant this one. */
if (is_demoted(lkb)) {
- grant_pending_convert(r, DLM_LOCK_IV, NULL);
+ grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
if (_can_be_granted(r, lkb, 1)) {
grant_lock(r, lkb);
queue_cast(r, lkb, 0);
@@ -2527,7 +2557,7 @@ static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
{
switch (error) {
case 0:
- grant_pending_locks(r);
+ grant_pending_locks(r, NULL);
/* grant_pending_locks also sends basts */
break;
case -EAGAIN:
@@ -2550,7 +2580,7 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
int error)
{
- grant_pending_locks(r);
+ grant_pending_locks(r, NULL);
}
/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
@@ -2571,7 +2601,7 @@ static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
int error)
{
if (error)
- grant_pending_locks(r);
+ grant_pending_locks(r, NULL);
}
/*
@@ -3372,7 +3402,7 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
return error;
}
-static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -3412,14 +3442,15 @@ static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
error = 0;
if (error)
dlm_put_lkb(lkb);
- return;
+ return 0;
fail:
setup_stub_lkb(ls, ms);
send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+ return error;
}
-static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -3429,6 +3460,15 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
if (error)
goto fail;
+ if (lkb->lkb_remid != ms->m_lkid) {
+ log_error(ls, "receive_convert %x remid %x recover_seq %llu "
+ "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
+ (unsigned long long)lkb->lkb_recover_seq,
+ ms->m_header.h_nodeid, ms->m_lkid);
+ error = -ENOENT;
+ goto fail;
+ }
+
r = lkb->lkb_resource;
hold_rsb(r);
@@ -3456,14 +3496,15 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
- return;
+ return 0;
fail:
setup_stub_lkb(ls, ms);
send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+ return error;
}
-static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -3473,6 +3514,14 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
if (error)
goto fail;
+ if (lkb->lkb_remid != ms->m_lkid) {
+ log_error(ls, "receive_unlock %x remid %x remote %d %x",
+ lkb->lkb_id, lkb->lkb_remid,
+ ms->m_header.h_nodeid, ms->m_lkid);
+ error = -ENOENT;
+ goto fail;
+ }
+
r = lkb->lkb_resource;
hold_rsb(r);
@@ -3497,14 +3546,15 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
- return;
+ return 0;
fail:
setup_stub_lkb(ls, ms);
send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+ return error;
}
-static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
@@ -3532,25 +3582,23 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
- return;
+ return 0;
fail:
setup_stub_lkb(ls, ms);
send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+ return error;
}
-static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
int error;
error = find_lkb(ls, ms->m_remid, &lkb);
- if (error) {
- log_debug(ls, "receive_grant from %d no lkb %x",
- ms->m_header.h_nodeid, ms->m_remid);
- return;
- }
+ if (error)
+ return error;
r = lkb->lkb_resource;
@@ -3570,20 +3618,18 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
+ return 0;
}
-static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
int error;
error = find_lkb(ls, ms->m_remid, &lkb);
- if (error) {
- log_debug(ls, "receive_bast from %d no lkb %x",
- ms->m_header.h_nodeid, ms->m_remid);
- return;
- }
+ if (error)
+ return error;
r = lkb->lkb_resource;
@@ -3595,10 +3641,12 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
goto out;
queue_bast(r, lkb, ms->m_bastmode);
+ lkb->lkb_highbast = ms->m_bastmode;
out:
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
+ return 0;
}
static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3653,18 +3701,15 @@ static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
do_purge(ls, ms->m_nodeid, ms->m_pid);
}
-static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
struct dlm_rsb *r;
int error, mstype, result;
error = find_lkb(ls, ms->m_remid, &lkb);
- if (error) {
- log_debug(ls, "receive_request_reply from %d no lkb %x",
- ms->m_header.h_nodeid, ms->m_remid);
- return;
- }
+ if (error)
+ return error;
r = lkb->lkb_resource;
hold_rsb(r);
@@ -3676,8 +3721,13 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
mstype = lkb->lkb_wait_type;
error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
- if (error)
+ if (error) {
+ log_error(ls, "receive_request_reply %x remote %d %x result %d",
+ lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
+ ms->m_result);
+ dlm_dump_rsb(r);
goto out;
+ }
/* Optimization: the dir node was also the master, so it took our
lookup as a request and sent request reply instead of lookup reply */
@@ -3755,6 +3805,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
+ return 0;
}
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -3793,8 +3844,11 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
break;
default:
- log_error(r->res_ls, "receive_convert_reply %x error %d",
- lkb->lkb_id, ms->m_result);
+ log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
+ lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
+ ms->m_result);
+ dlm_print_rsb(r);
+ dlm_print_lkb(lkb);
}
}
@@ -3821,20 +3875,18 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
put_rsb(r);
}
-static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
int error;
error = find_lkb(ls, ms->m_remid, &lkb);
- if (error) {
- log_debug(ls, "receive_convert_reply from %d no lkb %x",
- ms->m_header.h_nodeid, ms->m_remid);
- return;
- }
+ if (error)
+ return error;
_receive_convert_reply(lkb, ms);
dlm_put_lkb(lkb);
+ return 0;
}
static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
@@ -3873,20 +3925,18 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
put_rsb(r);
}
-static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
int error;
error = find_lkb(ls, ms->m_remid, &lkb);
- if (error) {
- log_debug(ls, "receive_unlock_reply from %d no lkb %x",
- ms->m_header.h_nodeid, ms->m_remid);
- return;
- }
+ if (error)
+ return error;
_receive_unlock_reply(lkb, ms);
dlm_put_lkb(lkb);
+ return 0;
}
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
@@ -3925,20 +3975,18 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
put_rsb(r);
}
-static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
struct dlm_lkb *lkb;
int error;
error = find_lkb(ls, ms->m_remid, &lkb);
- if (error) {
- log_debug(ls, "receive_cancel_reply from %d no lkb %x",
- ms->m_header.h_nodeid, ms->m_remid);
- return;
- }
+ if (error)
+ return error;
_receive_cancel_reply(lkb, ms);
dlm_put_lkb(lkb);
+ return 0;
}
static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3949,7 +3997,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
error = find_lkb(ls, ms->m_lkid, &lkb);
if (error) {
- log_error(ls, "receive_lookup_reply no lkb");
+ log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
return;
}
@@ -3993,8 +4041,11 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
dlm_put_lkb(lkb);
}
-static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
+static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
+ uint32_t saved_seq)
{
+ int error = 0, noent = 0;
+
if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
log_debug(ls, "ignore non-member message %d from %d %x %x %d",
ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
@@ -4007,47 +4058,50 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
/* messages sent to a master node */
case DLM_MSG_REQUEST:
- receive_request(ls, ms);
+ error = receive_request(ls, ms);
break;
case DLM_MSG_CONVERT:
- receive_convert(ls, ms);
+ error = receive_convert(ls, ms);
break;
case DLM_MSG_UNLOCK:
- receive_unlock(ls, ms);
+ error = receive_unlock(ls, ms);
break;
case DLM_MSG_CANCEL:
- receive_cancel(ls, ms);
+ noent = 1;
+ error = receive_cancel(ls, ms);
break;
/* messages sent from a master node (replies to above) */
case DLM_MSG_REQUEST_REPLY:
- receive_request_reply(ls, ms);
+ error = receive_request_reply(ls, ms);
break;
case DLM_MSG_CONVERT_REPLY:
- receive_convert_reply(ls, ms);
+ error = receive_convert_reply(ls, ms);
break;
case DLM_MSG_UNLOCK_REPLY:
- receive_unlock_reply(ls, ms);
+ error = receive_unlock_reply(ls, ms);
break;
case DLM_MSG_CANCEL_REPLY:
- receive_cancel_reply(ls, ms);
+ error = receive_cancel_reply(ls, ms);
break;
/* messages sent from a master node (only two types of async msg) */
case DLM_MSG_GRANT:
- receive_grant(ls, ms);
+ noent = 1;
+ error = receive_grant(ls, ms);
break;
case DLM_MSG_BAST:
- receive_bast(ls, ms);
+ noent = 1;
+ error = receive_bast(ls, ms);
break;
/* messages sent to a dir node */
@@ -4075,6 +4129,37 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
default:
log_error(ls, "unknown message type %d", ms->m_type);
}
+
+ /*
+ * When checking for ENOENT, we're checking the result of
+ * find_lkb(m_remid):
+ *
+ * The lock id referenced in the message wasn't found. This may
+ * happen in normal usage for the async messages and cancel, so
+ * only use log_debug for them.
+ *
+ * Some errors are expected and normal.
+ */
+
+ if (error == -ENOENT && noent) {
+ log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
+ ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
+ ms->m_lkid, saved_seq);
+ } else if (error == -ENOENT) {
+ log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
+ ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
+ ms->m_lkid, saved_seq);
+
+ if (ms->m_type == DLM_MSG_CONVERT)
+ dlm_dump_rsb_hash(ls, ms->m_hash);
+ }
+
+ if (error == -EINVAL) {
+ log_error(ls, "receive %d inval from %d lkid %x remid %x "
+ "saved_seq %u",
+ ms->m_type, ms->m_header.h_nodeid,
+ ms->m_lkid, ms->m_remid, saved_seq);
+ }
}
/* If the lockspace is in recovery mode (locking stopped), then normal
@@ -4092,16 +4177,17 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
dlm_add_requestqueue(ls, nodeid, ms);
} else {
dlm_wait_requestqueue(ls);
- _receive_message(ls, ms);
+ _receive_message(ls, ms, 0);
}
}
/* This is called by dlm_recoverd to process messages that were saved on
the requestqueue. */
-void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
+ uint32_t saved_seq)
{
- _receive_message(ls, ms);
+ _receive_message(ls, ms, saved_seq);
}
/* This is called by the midcomms layer when something is received for
@@ -4137,9 +4223,11 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
ls = dlm_find_lockspace_global(hd->h_lockspace);
if (!ls) {
- if (dlm_config.ci_log_debug)
- log_print("invalid lockspace %x from %d cmd %d type %d",
- hd->h_lockspace, nodeid, hd->h_cmd, type);
+ if (dlm_config.ci_log_debug) {
+ printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
+ "%u from %d cmd %d type %d\n",
+ hd->h_lockspace, nodeid, hd->h_cmd, type);
+ }
if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
dlm_send_ls_not_ready(nodeid, &p->rcom);
@@ -4187,15 +4275,13 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
/* A waiting lkb needs recovery if the master node has failed, or
the master node is changing (only when no directory is used) */
-static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
+static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
+ int dir_nodeid)
{
- if (dlm_is_removed(ls, lkb->lkb_nodeid))
+ if (dlm_no_directory(ls))
return 1;
- if (!dlm_no_directory(ls))
- return 0;
-
- if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
+ if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
return 1;
return 0;
@@ -4212,6 +4298,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
struct dlm_lkb *lkb, *safe;
struct dlm_message *ms_stub;
int wait_type, stub_unlock_result, stub_cancel_result;
+ int dir_nodeid;
ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
if (!ms_stub) {
@@ -4223,13 +4310,21 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
+ dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
+
/* exclude debug messages about unlocks because there can be so
many and they aren't very interesting */
if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
- log_debug(ls, "recover_waiter %x nodeid %d "
- "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
- lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
+ log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
+ "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
+ lkb->lkb_id,
+ lkb->lkb_remid,
+ lkb->lkb_wait_type,
+ lkb->lkb_resource->res_nodeid,
+ lkb->lkb_nodeid,
+ lkb->lkb_wait_nodeid,
+ dir_nodeid);
}
/* all outstanding lookups, regardless of destination will be
@@ -4240,7 +4335,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
continue;
}
- if (!waiter_needs_recovery(ls, lkb))
+ if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
continue;
wait_type = lkb->lkb_wait_type;
@@ -4373,8 +4468,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
ou = is_overlap_unlock(lkb);
err = 0;
- log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
- lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
+ log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
+ "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
+ "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
+ r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
+ dlm_dir_nodeid(r), oc, ou);
/* At this point we assume that we won't get a reply to any
previous op or overlap op on this lock. First, do a big
@@ -4426,9 +4524,12 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
}
}
- if (err)
- log_error(ls, "recover_waiters_post %x %d %x %d %d",
- lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
+ if (err) {
+ log_error(ls, "waiter %x msg %d r_nodeid %d "
+ "dir_nodeid %d overlap %d %d",
+ lkb->lkb_id, mstype, r->res_nodeid,
+ dlm_dir_nodeid(r), oc, ou);
+ }
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
@@ -4437,112 +4538,177 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
return error;
}
-static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
- int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
+static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
+ struct list_head *list)
{
- struct dlm_ls *ls = r->res_ls;
struct dlm_lkb *lkb, *safe;
- list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
- if (test(ls, lkb)) {
- rsb_set_flag(r, RSB_LOCKS_PURGED);
- del_lkb(r, lkb);
- /* this put should free the lkb */
- if (!dlm_put_lkb(lkb))
- log_error(ls, "purged lkb not released");
- }
+ list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
+ if (!is_master_copy(lkb))
+ continue;
+
+ /* don't purge lkbs we've added in recover_master_copy for
+ the current recovery seq */
+
+ if (lkb->lkb_recover_seq == ls->ls_recover_seq)
+ continue;
+
+ del_lkb(r, lkb);
+
+ /* this put should free the lkb */
+ if (!dlm_put_lkb(lkb))
+ log_error(ls, "purged mstcpy lkb not released");
}
}
-static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
+void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
{
- return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
-}
+ struct dlm_ls *ls = r->res_ls;
-static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
-{
- return is_master_copy(lkb);
+ purge_mstcpy_list(ls, r, &r->res_grantqueue);
+ purge_mstcpy_list(ls, r, &r->res_convertqueue);
+ purge_mstcpy_list(ls, r, &r->res_waitqueue);
}
-static void purge_dead_locks(struct dlm_rsb *r)
+static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
+ struct list_head *list,
+ int nodeid_gone, unsigned int *count)
{
- purge_queue(r, &r->res_grantqueue, &purge_dead_test);
- purge_queue(r, &r->res_convertqueue, &purge_dead_test);
- purge_queue(r, &r->res_waitqueue, &purge_dead_test);
-}
+ struct dlm_lkb *lkb, *safe;
-void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
-{
- purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
- purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
- purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
+ list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
+ if (!is_master_copy(lkb))
+ continue;
+
+ if ((lkb->lkb_nodeid == nodeid_gone) ||
+ dlm_is_removed(ls, lkb->lkb_nodeid)) {
+
+ del_lkb(r, lkb);
+
+ /* this put should free the lkb */
+ if (!dlm_put_lkb(lkb))
+ log_error(ls, "purged dead lkb not released");
+
+ rsb_set_flag(r, RSB_RECOVER_GRANT);
+
+ (*count)++;
+ }
+ }
}
/* Get rid of locks held by nodes that are gone. */
-int dlm_purge_locks(struct dlm_ls *ls)
+void dlm_recover_purge(struct dlm_ls *ls)
{
struct dlm_rsb *r;
+ struct dlm_member *memb;
+ int nodes_count = 0;
+ int nodeid_gone = 0;
+ unsigned int lkb_count = 0;
- log_debug(ls, "dlm_purge_locks");
+ /* cache one removed nodeid to optimize the common
+ case of a single node removed */
+
+ list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
+ nodes_count++;
+ nodeid_gone = memb->nodeid;
+ }
+
+ if (!nodes_count)
+ return;
down_write(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
hold_rsb(r);
lock_rsb(r);
- if (is_master(r))
- purge_dead_locks(r);
+ if (is_master(r)) {
+ purge_dead_list(ls, r, &r->res_grantqueue,
+ nodeid_gone, &lkb_count);
+ purge_dead_list(ls, r, &r->res_convertqueue,
+ nodeid_gone, &lkb_count);
+ purge_dead_list(ls, r, &r->res_waitqueue,
+ nodeid_gone, &lkb_count);
+ }
unlock_rsb(r);
unhold_rsb(r);
-
- schedule();
+ cond_resched();
}
up_write(&ls->ls_root_sem);
- return 0;
+ if (lkb_count)
+ log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
+ lkb_count, nodes_count);
}
-static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
+static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
{
struct rb_node *n;
- struct dlm_rsb *r, *r_ret = NULL;
+ struct dlm_rsb *r;
spin_lock(&ls->ls_rsbtbl[bucket].lock);
for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
- if (!rsb_flag(r, RSB_LOCKS_PURGED))
+
+ if (!rsb_flag(r, RSB_RECOVER_GRANT))
+ continue;
+ rsb_clear_flag(r, RSB_RECOVER_GRANT);
+ if (!is_master(r))
continue;
hold_rsb(r);
- rsb_clear_flag(r, RSB_LOCKS_PURGED);
- r_ret = r;
- break;
+ spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ return r;
}
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
- return r_ret;
+ return NULL;
}
-void dlm_grant_after_purge(struct dlm_ls *ls)
+/*
+ * Attempt to grant locks on resources that we are the master of.
+ * Locks may have become grantable during recovery because locks
+ * from departed nodes have been purged (or not rebuilt), allowing
+ * previously blocked locks to now be granted. The subset of rsb's
+ * we are interested in are those with lkb's on either the convert or
+ * waiting queues.
+ *
+ * Simplest would be to go through each master rsb and check for non-empty
+ * convert or waiting queues, and attempt to grant on those rsbs.
+ * Checking the queues requires lock_rsb, though, for which we'd need
+ * to release the rsbtbl lock. This would make iterating through all
+ * rsb's very inefficient. So, we rely on earlier recovery routines
+ * to set RECOVER_GRANT on any rsb's that we should attempt to grant
+ * locks for.
+ */
+
+void dlm_recover_grant(struct dlm_ls *ls)
{
struct dlm_rsb *r;
int bucket = 0;
+ unsigned int count = 0;
+ unsigned int rsb_count = 0;
+ unsigned int lkb_count = 0;
while (1) {
- r = find_purged_rsb(ls, bucket);
+ r = find_grant_rsb(ls, bucket);
if (!r) {
if (bucket == ls->ls_rsbtbl_size - 1)
break;
bucket++;
continue;
}
+ rsb_count++;
+ count = 0;
lock_rsb(r);
- if (is_master(r)) {
- grant_pending_locks(r);
- confirm_master(r, 0);
- }
+ grant_pending_locks(r, &count);
+ lkb_count += count;
+ confirm_master(r, 0);
unlock_rsb(r);
put_rsb(r);
- schedule();
+ cond_resched();
}
+
+ if (lkb_count)
+ log_debug(ls, "dlm_recover_grant %u locks on %u resources",
+ lkb_count, rsb_count);
}
static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
@@ -4631,6 +4797,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
struct dlm_rsb *r;
struct dlm_lkb *lkb;
+ uint32_t remid = 0;
int error;
if (rl->rl_parent_lkid) {
@@ -4638,14 +4805,31 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
goto out;
}
- error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
- R_MASTER, &r);
+ remid = le32_to_cpu(rl->rl_lkid);
+
+ /* In general we expect the rsb returned to be R_MASTER, but we don't
+ have to require it. Recovery of masters on one node can overlap
+ recovery of locks on another node, so one node can send us MSTCPY
+ locks before we've made ourselves master of this rsb. We can still
+ add new MSTCPY locks that we receive here without any harm; when
+ we make ourselves master, dlm_recover_masters() won't touch the
+ MSTCPY locks we've received early. */
+
+ error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
if (error)
goto out;
+ if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
+ log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
+ rc->rc_header.h_nodeid, remid);
+ error = -EBADR;
+ put_rsb(r);
+ goto out;
+ }
+
lock_rsb(r);
- lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
+ lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
if (lkb) {
error = -EEXIST;
goto out_remid;
@@ -4664,19 +4848,25 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
attach_lkb(r, lkb);
add_lkb(r, lkb, rl->rl_status);
error = 0;
+ ls->ls_recover_locks_in++;
+
+ if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
+ rsb_set_flag(r, RSB_RECOVER_GRANT);
out_remid:
/* this is the new value returned to the lock holder for
saving in its process-copy lkb */
rl->rl_remid = cpu_to_le32(lkb->lkb_id);
+ lkb->lkb_recover_seq = ls->ls_recover_seq;
+
out_unlock:
unlock_rsb(r);
put_rsb(r);
out:
- if (error)
- log_debug(ls, "recover_master_copy %d %x", error,
- le32_to_cpu(rl->rl_lkid));
+ if (error && error != -EEXIST)
+ log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
+ rc->rc_header.h_nodeid, remid, error);
rl->rl_result = cpu_to_le32(error);
return error;
}
@@ -4687,41 +4877,52 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
struct dlm_rsb *r;
struct dlm_lkb *lkb;
- int error;
+ uint32_t lkid, remid;
+ int error, result;
+
+ lkid = le32_to_cpu(rl->rl_lkid);
+ remid = le32_to_cpu(rl->rl_remid);
+ result = le32_to_cpu(rl->rl_result);
- error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
+ error = find_lkb(ls, lkid, &lkb);
if (error) {
- log_error(ls, "recover_process_copy no lkid %x",
- le32_to_cpu(rl->rl_lkid));
+ log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
+ lkid, rc->rc_header.h_nodeid, remid, result);
return error;
}
- DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
-
- error = le32_to_cpu(rl->rl_result);
-
r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);
- switch (error) {
+ if (!is_process_copy(lkb)) {
+ log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
+ lkid, rc->rc_header.h_nodeid, remid, result);
+ dlm_dump_rsb(r);
+ unlock_rsb(r);
+ put_rsb(r);
+ dlm_put_lkb(lkb);
+ return -EINVAL;
+ }
+
+ switch (result) {
case -EBADR:
/* There's a chance the new master received our lock before
dlm_recover_master_reply(), this wouldn't happen if we did
a barrier between recover_masters and recover_locks. */
- log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
- (unsigned long)r, r->res_name);
+
+ log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
+ lkid, rc->rc_header.h_nodeid, remid, result);
+
dlm_send_rcom_lock(r, lkb);
goto out;
case -EEXIST:
- log_debug(ls, "master copy exists %x", lkb->lkb_id);
- /* fall through */
case 0:
- lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
+ lkb->lkb_remid = remid;
break;
default:
- log_error(ls, "dlm_recover_process_copy unknown error %d %x",
- error, lkb->lkb_id);
+ log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
+ lkid, rc->rc_header.h_nodeid, remid, result);
}
/* an ack for dlm_recover_locks() which waits for replies from
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 1a255307f6f..c8b226c6280 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -15,7 +15,8 @@
void dlm_dump_rsb(struct dlm_rsb *r);
void dlm_print_lkb(struct dlm_lkb *lkb);
-void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms);
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
+ uint32_t saved_seq);
void dlm_receive_buffer(union dlm_packet *p, int nodeid);
int dlm_modes_compat(int mode1, int mode2);
void dlm_put_rsb(struct dlm_rsb *r);
@@ -31,9 +32,9 @@ void dlm_adjust_timeouts(struct dlm_ls *ls);
int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
unsigned int flags, struct dlm_rsb **r_ret);
-int dlm_purge_locks(struct dlm_ls *ls);
+void dlm_recover_purge(struct dlm_ls *ls);
void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
-void dlm_grant_after_purge(struct dlm_ls *ls);
+void dlm_recover_grant(struct dlm_ls *ls);
int dlm_recover_waiters_post(struct dlm_ls *ls);
void dlm_recover_waiters_pre(struct dlm_ls *ls);
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index a1ea25face8..ca506abbdd3 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -74,6 +74,19 @@ static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
return len;
}
+static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
+{
+ return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
+}
+
+static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
+{
+ int val = simple_strtoul(buf, NULL, 0);
+ if (val == 1)
+ set_bit(LSFL_NODIR, &ls->ls_flags);
+ return len;
+}
+
static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
uint32_t status = dlm_recover_status(ls);
@@ -107,6 +120,12 @@ static struct dlm_attr dlm_attr_id = {
.store = dlm_id_store
};
+static struct dlm_attr dlm_attr_nodir = {
+ .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
+ .show = dlm_nodir_show,
+ .store = dlm_nodir_store
+};
+
static struct dlm_attr dlm_attr_recover_status = {
.attr = {.name = "recover_status", .mode = S_IRUGO},
.show = dlm_recover_status_show
@@ -121,6 +140,7 @@ static struct attribute *dlm_attrs[] = {
&dlm_attr_control.attr,
&dlm_attr_event.attr,
&dlm_attr_id.attr,
+ &dlm_attr_nodir.attr,
&dlm_attr_recover_status.attr,
&dlm_attr_recover_nodeid.attr,
NULL,
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 133ef6dc7cb..5c1b0e38c7a 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -142,6 +142,7 @@ struct writequeue_entry {
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
+static int dlm_allow_conn;
/* Work queues */
static struct workqueue_struct *recv_workqueue;
@@ -710,6 +711,13 @@ static int tcp_accept_from_sock(struct connection *con)
struct connection *newcon;
struct connection *addcon;
+ mutex_lock(&connections_lock);
+ if (!dlm_allow_conn) {
+ mutex_unlock(&connections_lock);
+ return -1;
+ }
+ mutex_unlock(&connections_lock);
+
memset(&peeraddr, 0, sizeof(peeraddr));
result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
IPPROTO_TCP, &newsock);
@@ -1503,6 +1511,7 @@ void dlm_lowcomms_stop(void)
socket activity.
*/
mutex_lock(&connections_lock);
+ dlm_allow_conn = 0;
foreach_conn(stop_conn);
mutex_unlock(&connections_lock);
@@ -1530,7 +1539,7 @@ int dlm_lowcomms_start(void)
if (!dlm_local_count) {
error = -ENOTCONN;
log_print("no local IP address has been set");
- goto out;
+ goto fail;
}
error = -ENOMEM;
@@ -1538,7 +1547,13 @@ int dlm_lowcomms_start(void)
__alignof__(struct connection), 0,
NULL);
if (!con_cache)
- goto out;
+ goto fail;
+
+ error = work_start();
+ if (error)
+ goto fail_destroy;
+
+ dlm_allow_conn = 1;
/* Start listening */
if (dlm_config.ci_protocol == 0)
@@ -1548,20 +1563,17 @@ int dlm_lowcomms_start(void)
if (error)
goto fail_unlisten;
- error = work_start();
- if (error)
- goto fail_unlisten;
-
return 0;
fail_unlisten:
+ dlm_allow_conn = 0;
con = nodeid2con(0,0);
if (con) {
close_connection(con, false);
kmem_cache_free(con_cache, con);
}
+fail_destroy:
kmem_cache_destroy(con_cache);
-
-out:
+fail:
return error;
}
diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c
index da64df7576e..7cd24bccd4f 100644
--- a/fs/dlm/memory.c
+++ b/fs/dlm/memory.c
@@ -21,21 +21,19 @@ static struct kmem_cache *rsb_cache;
int __init dlm_memory_init(void)
{
- int ret = 0;
-
lkb_cache = kmem_cache_create("dlm_lkb", sizeof(struct dlm_lkb),
__alignof__(struct dlm_lkb), 0, NULL);
if (!lkb_cache)
- ret = -ENOMEM;
+ return -ENOMEM;
rsb_cache = kmem_cache_create("dlm_rsb", sizeof(struct dlm_rsb),
__alignof__(struct dlm_rsb), 0, NULL);
if (!rsb_cache) {
kmem_cache_destroy(lkb_cache);
- ret = -ENOMEM;
+ return -ENOMEM;
}
- return ret;
+ return 0;
}
void dlm_memory_exit(void)
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index ac5c616c969..64d3e2b958c 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -486,47 +486,50 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
return 0;
}
-static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
+/* Called by dlm_recv; corresponds to dlm_receive_message() but special
+ recovery-only comms are sent through here. */
+
+void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
{
+ int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
+ int stop, reply = 0, lock = 0;
+ uint32_t status;
uint64_t seq;
- int rv = 0;
switch (rc->rc_type) {
+ case DLM_RCOM_LOCK:
+ lock = 1;
+ break;
+ case DLM_RCOM_LOCK_REPLY:
+ lock = 1;
+ reply = 1;
+ break;
case DLM_RCOM_STATUS_REPLY:
case DLM_RCOM_NAMES_REPLY:
case DLM_RCOM_LOOKUP_REPLY:
- case DLM_RCOM_LOCK_REPLY:
- spin_lock(&ls->ls_recover_lock);
- seq = ls->ls_recover_seq;
- spin_unlock(&ls->ls_recover_lock);
- if (rc->rc_seq_reply != seq) {
- log_debug(ls, "ignoring old reply %x from %d "
- "seq_reply %llx expect %llx",
- rc->rc_type, rc->rc_header.h_nodeid,
- (unsigned long long)rc->rc_seq_reply,
- (unsigned long long)seq);
- rv = 1;
- }
- }
- return rv;
-}
-
-/* Called by dlm_recv; corresponds to dlm_receive_message() but special
- recovery-only comms are sent through here. */
+ reply = 1;
+ };
-void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
-{
- int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
+ spin_lock(&ls->ls_recover_lock);
+ status = ls->ls_recover_status;
+ stop = test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
+ seq = ls->ls_recover_seq;
+ spin_unlock(&ls->ls_recover_lock);
- if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
- log_debug(ls, "ignoring recovery message %x from %d",
- rc->rc_type, nodeid);
+ if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) ||
+ (reply && (rc->rc_seq_reply != seq)) ||
+ (lock && !(status & DLM_RS_DIR))) {
+ log_limit(ls, "dlm_receive_rcom ignore msg %d "
+ "from %d %llu %llu recover seq %llu sts %x gen %u",
+ rc->rc_type,
+ nodeid,
+ (unsigned long long)rc->rc_seq,
+ (unsigned long long)rc->rc_seq_reply,
+ (unsigned long long)seq,
+ status, ls->ls_generation);
goto out;
}
- if (is_old_reply(ls, rc))
- goto out;
-
switch (rc->rc_type) {
case DLM_RCOM_STATUS:
receive_rcom_status(ls, rc);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 34d5adf1fce..7554e4dac6b 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -339,9 +339,12 @@ static void set_lock_master(struct list_head *queue, int nodeid)
{
struct dlm_lkb *lkb;
- list_for_each_entry(lkb, queue, lkb_statequeue)
- if (!(lkb->lkb_flags & DLM_IFL_MSTCPY))
+ list_for_each_entry(lkb, queue, lkb_statequeue) {
+ if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
lkb->lkb_nodeid = nodeid;
+ lkb->lkb_remid = 0;
+ }
+ }
}
static void set_master_lkbs(struct dlm_rsb *r)
@@ -354,18 +357,16 @@ static void set_master_lkbs(struct dlm_rsb *r)
/*
* Propagate the new master nodeid to locks
* The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
- * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which
+ * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
* rsb's to consider.
*/
static void set_new_master(struct dlm_rsb *r, int nodeid)
{
- lock_rsb(r);
r->res_nodeid = nodeid;
set_master_lkbs(r);
rsb_set_flag(r, RSB_NEW_MASTER);
rsb_set_flag(r, RSB_NEW_MASTER2);
- unlock_rsb(r);
}
/*
@@ -376,9 +377,9 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
static int recover_master(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
- int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
-
- dir_nodeid = dlm_dir_nodeid(r);
+ int error, ret_nodeid;
+ int our_nodeid = dlm_our_nodeid();
+ int dir_nodeid = dlm_dir_nodeid(r);
if (dir_nodeid == our_nodeid) {
error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
@@ -388,7 +389,9 @@ static int recover_master(struct dlm_rsb *r)
if (ret_nodeid == our_nodeid)
ret_nodeid = 0;
+ lock_rsb(r);
set_new_master(r, ret_nodeid);
+ unlock_rsb(r);
} else {
recover_list_add(r);
error = dlm_send_rcom_lookup(r, dir_nodeid);
@@ -398,24 +401,33 @@ static int recover_master(struct dlm_rsb *r)
}
/*
- * When not using a directory, most resource names will hash to a new static
- * master nodeid and the resource will need to be remastered.
+ * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
+ * This is necessary because recovery can be started, aborted and restarted,
+ * causing the master nodeid to briefly change during the aborted recovery, and
+ * change back to the original value in the second recovery. The MSTCPY locks
+ * may or may not have been purged during the aborted recovery. Another node
+ * with an outstanding request in waiters list and a request reply saved in the
+ * requestqueue, cannot know whether it should ignore the reply and resend the
+ * request, or accept the reply and complete the request. It must do the
+ * former if the remote node purged MSTCPY locks, and it must do the later if
+ * the remote node did not. This is solved by always purging MSTCPY locks, in
+ * which case, the request reply would always be ignored and the request
+ * resent.
*/
static int recover_master_static(struct dlm_rsb *r)
{
- int master = dlm_dir_nodeid(r);
+ int dir_nodeid = dlm_dir_nodeid(r);
+ int new_master = dir_nodeid;
- if (master == dlm_our_nodeid())
- master = 0;
+ if (dir_nodeid == dlm_our_nodeid())
+ new_master = 0;
- if (r->res_nodeid != master) {
- if (is_master(r))
- dlm_purge_mstcpy_locks(r);
- set_new_master(r, master);
- return 1;
- }
- return 0;
+ lock_rsb(r);
+ dlm_purge_mstcpy_locks(r);
+ set_new_master(r, new_master);
+ unlock_rsb(r);
+ return 1;
}
/*
@@ -481,7 +493,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
if (nodeid == dlm_our_nodeid())
nodeid = 0;
+ lock_rsb(r);
set_new_master(r, nodeid);
+ unlock_rsb(r);
recover_list_del(r);
if (recover_list_empty(ls))
@@ -556,8 +570,6 @@ int dlm_recover_locks(struct dlm_ls *ls)
struct dlm_rsb *r;
int error, count = 0;
- log_debug(ls, "dlm_recover_locks");
-
down_read(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (is_master(r)) {
@@ -584,7 +596,7 @@ int dlm_recover_locks(struct dlm_ls *ls)
}
up_read(&ls->ls_root_sem);
- log_debug(ls, "dlm_recover_locks %d locks", count);
+ log_debug(ls, "dlm_recover_locks %d out", count);
error = dlm_wait_function(ls, &recover_list_empty);
out:
@@ -721,21 +733,19 @@ static void recover_conversion(struct dlm_rsb *r)
}
/* We've become the new master for this rsb and waiting/converting locks may
- need to be granted in dlm_grant_after_purge() due to locks that may have
+ need to be granted in dlm_recover_grant() due to locks that may have
existed from a removed node. */
-static void set_locks_purged(struct dlm_rsb *r)
+static void recover_grant(struct dlm_rsb *r)
{
if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
- rsb_set_flag(r, RSB_LOCKS_PURGED);
+ rsb_set_flag(r, RSB_RECOVER_GRANT);
}
void dlm_recover_rsbs(struct dlm_ls *ls)
{
struct dlm_rsb *r;
- int count = 0;
-
- log_debug(ls, "dlm_recover_rsbs");
+ unsigned int count = 0;
down_read(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
@@ -744,7 +754,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
if (rsb_flag(r, RSB_RECOVER_CONVERT))
recover_conversion(r);
if (rsb_flag(r, RSB_NEW_MASTER2))
- set_locks_purged(r);
+ recover_grant(r);
recover_lvb(r);
count++;
}
@@ -754,7 +764,8 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
}
up_read(&ls->ls_root_sem);
- log_debug(ls, "dlm_recover_rsbs %d rsbs", count);
+ if (count)
+ log_debug(ls, "dlm_recover_rsbs %d done", count);
}
/* Create a single list of all root rsb's to be used during recovery */
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 3780caf7ae0..f1a9073c083 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -54,7 +54,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
unsigned long start;
int error, neg = 0;
- log_debug(ls, "dlm_recover %llx", (unsigned long long)rv->seq);
+ log_debug(ls, "dlm_recover %llu", (unsigned long long)rv->seq);
mutex_lock(&ls->ls_recoverd_active);
@@ -84,6 +84,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
goto fail;
}
+ ls->ls_recover_locks_in = 0;
+
dlm_set_recover_status(ls, DLM_RS_NODES);
error = dlm_recover_members_wait(ls);
@@ -130,7 +132,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* Clear lkb's for departed nodes.
*/
- dlm_purge_locks(ls);
+ dlm_recover_purge(ls);
/*
* Get new master nodeid's for rsb's that were mastered on
@@ -161,6 +163,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
goto fail;
}
+ log_debug(ls, "dlm_recover_locks %u in",
+ ls->ls_recover_locks_in);
+
/*
* Finalize state in master rsb's now that all locks can be
* checked. This includes conversion resolution and lvb
@@ -225,9 +230,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
goto fail;
}
- dlm_grant_after_purge(ls);
+ dlm_recover_grant(ls);
- log_debug(ls, "dlm_recover %llx generation %u done: %u ms",
+ log_debug(ls, "dlm_recover %llu generation %u done: %u ms",
(unsigned long long)rv->seq, ls->ls_generation,
jiffies_to_msecs(jiffies - start));
mutex_unlock(&ls->ls_recoverd_active);
@@ -237,7 +242,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
fail:
dlm_release_root_list(ls);
- log_debug(ls, "dlm_recover %llx error %d",
+ log_debug(ls, "dlm_recover %llu error %d",
(unsigned long long)rv->seq, error);
mutex_unlock(&ls->ls_recoverd_active);
return error;
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index a44fa22890e..1695f1b0dd4 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -19,6 +19,7 @@
struct rq_entry {
struct list_head list;
+ uint32_t recover_seq;
int nodeid;
struct dlm_message request;
};
@@ -41,6 +42,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
return;
}
+ e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF;
e->nodeid = nodeid;
memcpy(&e->request, ms, ms->m_header.h_length);
@@ -63,6 +65,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
int dlm_process_requestqueue(struct dlm_ls *ls)
{
struct rq_entry *e;
+ struct dlm_message *ms;
int error = 0;
mutex_lock(&ls->ls_requestqueue_mutex);
@@ -76,7 +79,15 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
mutex_unlock(&ls->ls_requestqueue_mutex);
- dlm_receive_message_saved(ls, &e->request);
+ ms = &e->request;
+
+ log_limit(ls, "dlm_process_requestqueue msg %d from %d "
+ "lkid %x remid %x result %d seq %u",
+ ms->m_type, ms->m_header.h_nodeid,
+ ms->m_lkid, ms->m_remid, ms->m_result,
+ e->recover_seq);
+
+ dlm_receive_message_saved(ls, &e->request, e->recover_seq);
mutex_lock(&ls->ls_requestqueue_mutex);
list_del(&e->list);
@@ -138,35 +149,7 @@ static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
if (!dlm_no_directory(ls))
return 0;
- /* with no directory, the master is likely to change as a part of
- recovery; requests to/from the defunct master need to be purged */
-
- switch (type) {
- case DLM_MSG_REQUEST:
- case DLM_MSG_CONVERT:
- case DLM_MSG_UNLOCK:
- case DLM_MSG_CANCEL:
- /* we're no longer the master of this resource, the sender
- will resend to the new master (see waiter_needs_recovery) */
-
- if (dlm_hash2nodeid(ls, ms->m_hash) != dlm_our_nodeid())
- return 1;
- break;
-
- case DLM_MSG_REQUEST_REPLY:
- case DLM_MSG_CONVERT_REPLY:
- case DLM_MSG_UNLOCK_REPLY:
- case DLM_MSG_CANCEL_REPLY:
- case DLM_MSG_GRANT:
- /* this reply is from the former master of the resource,
- we'll resend to the new master if needed */
-
- if (dlm_hash2nodeid(ls, ms->m_hash) != nodeid)
- return 1;
- break;
- }
-
- return 0;
+ return 1;
}
void dlm_purge_requestqueue(struct dlm_ls *ls)