summaryrefslogtreecommitdiffstats
path: root/fs/dlm/rcom.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/rcom.c')
-rw-r--r--fs/dlm/rcom.c85
1 files changed, 54 insertions, 31 deletions
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 4cc31be9cd9..6bfbd615380 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -56,6 +56,10 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
rc->rc_type = type;
+ spin_lock(&ls->ls_recover_lock);
+ rc->rc_seq = ls->ls_recover_seq;
+ spin_unlock(&ls->ls_recover_lock);
+
*mh_ret = mh;
*rc_ret = rc;
return 0;
@@ -78,8 +82,17 @@ static void make_config(struct dlm_ls *ls, struct rcom_config *rf)
rf->rf_lsflags = ls->ls_exflags;
}
-static int check_config(struct dlm_ls *ls, struct rcom_config *rf, int nodeid)
+static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
{
+ struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
+
+ if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
+ log_error(ls, "version mismatch: %x nodeid %d: %x",
+ DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
+ rc->rc_header.h_version);
+ return -EINVAL;
+ }
+
if (rf->rf_lvblen != ls->ls_lvblen ||
rf->rf_lsflags != ls->ls_exflags) {
log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
@@ -125,7 +138,7 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
goto out;
allow_sync_reply(ls, &rc->rc_id);
- memset(ls->ls_recover_buf, 0, dlm_config.buffer_size);
+ memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
send_rcom(ls, mh, rc);
@@ -141,8 +154,7 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
log_debug(ls, "remote node %d not ready", nodeid);
rc->rc_result = 0;
} else
- error = check_config(ls, (struct rcom_config *) rc->rc_buf,
- nodeid);
+ error = check_config(ls, rc, nodeid);
/* the caller looks at rc_result for the remote recovery status */
out:
return error;
@@ -159,6 +171,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
if (error)
return;
rc->rc_id = rc_in->rc_id;
+ rc->rc_seq_reply = rc_in->rc_seq;
rc->rc_result = dlm_recover_status(ls);
make_config(ls, (struct rcom_config *) rc->rc_buf);
@@ -200,7 +213,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
if (nodeid == dlm_our_nodeid()) {
dlm_copy_master_names(ls, last_name, last_len,
ls->ls_recover_buf + len,
- dlm_config.buffer_size - len, nodeid);
+ dlm_config.ci_buffer_size - len, nodeid);
goto out;
}
@@ -210,7 +223,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
memcpy(rc->rc_buf, last_name, last_len);
allow_sync_reply(ls, &rc->rc_id);
- memset(ls->ls_recover_buf, 0, dlm_config.buffer_size);
+ memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
send_rcom(ls, mh, rc);
@@ -224,30 +237,17 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
- int error, inlen, outlen;
- int nodeid = rc_in->rc_header.h_nodeid;
- uint32_t status = dlm_recover_status(ls);
-
- /*
- * We can't run dlm_dir_rebuild_send (which uses ls_nodes) while
- * dlm_recoverd is running ls_nodes_reconfig (which changes ls_nodes).
- * It could only happen in rare cases where we get a late NAMES
- * message from a previous instance of recovery.
- */
-
- if (!(status & DLM_RS_NODES)) {
- log_debug(ls, "ignoring RCOM_NAMES from %u", nodeid);
- return;
- }
+ int error, inlen, outlen, nodeid;
nodeid = rc_in->rc_header.h_nodeid;
inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
- outlen = dlm_config.buffer_size - sizeof(struct dlm_rcom);
+ outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
if (error)
return;
rc->rc_id = rc_in->rc_id;
+ rc->rc_seq_reply = rc_in->rc_seq;
dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
nodeid);
@@ -294,6 +294,7 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
ret_nodeid = error;
rc->rc_result = ret_nodeid;
rc->rc_id = rc_in->rc_id;
+ rc->rc_seq_reply = rc_in->rc_seq;
send_rcom(ls, mh, rc);
}
@@ -375,20 +376,13 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
rc->rc_id = rc_in->rc_id;
+ rc->rc_seq_reply = rc_in->rc_seq;
send_rcom(ls, mh, rc);
}
static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
- uint32_t status = dlm_recover_status(ls);
-
- if (!(status & DLM_RS_DIR)) {
- log_debug(ls, "ignoring RCOM_LOCK_REPLY from %u",
- rc_in->rc_header.h_nodeid);
- return;
- }
-
dlm_recover_process_copy(ls, rc_in);
}
@@ -415,6 +409,7 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
rc->rc_type = DLM_RCOM_STATUS_REPLY;
rc->rc_id = rc_in->rc_id;
+ rc->rc_seq_reply = rc_in->rc_seq;
rc->rc_result = -ESRCH;
rf = (struct rcom_config *) rc->rc_buf;
@@ -426,6 +421,31 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
return 0;
}
+static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
+{
+ uint64_t seq;
+ int rv = 0;
+
+ switch (rc->rc_type) {
+ case DLM_RCOM_STATUS_REPLY:
+ case DLM_RCOM_NAMES_REPLY:
+ case DLM_RCOM_LOOKUP_REPLY:
+ case DLM_RCOM_LOCK_REPLY:
+ spin_lock(&ls->ls_recover_lock);
+ seq = ls->ls_recover_seq;
+ spin_unlock(&ls->ls_recover_lock);
+ if (rc->rc_seq_reply != seq) {
+ log_debug(ls, "ignoring old reply %x from %d "
+ "seq_reply %llx expect %llx",
+ rc->rc_type, rc->rc_header.h_nodeid,
+ (unsigned long long)rc->rc_seq_reply,
+ (unsigned long long)seq);
+ rv = 1;
+ }
+ }
+ return rv;
+}
+
/* Called by dlm_recvd; corresponds to dlm_receive_message() but special
recovery-only comms are sent through here. */
@@ -449,11 +469,14 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
}
if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
- log_error(ls, "ignoring recovery message %x from %d",
+ log_debug(ls, "ignoring recovery message %x from %d",
rc->rc_type, nodeid);
goto out;
}
+ if (is_old_reply(ls, rc))
+ goto out;
+
if (nodeid != rc->rc_header.h_nodeid) {
log_error(ls, "bad rcom nodeid %d from %d",
rc->rc_header.h_nodeid, nodeid);