diff options
author | David Teigland <teigland@redhat.com> | 2012-06-14 12:17:32 -0500 |
---|---|---|
committer | David Teigland <teigland@redhat.com> | 2012-07-16 14:18:01 -0500 |
commit | 05c32f47bfae74dabff05208957768078b53cc49 (patch) | |
tree | 71034eba054f49723a0dac41f6bcd9d4f37eb2bc /fs/dlm/lock.c | |
parent | 1d7c484eeb167fc374294e38ae402de4097c8611 (diff) |
dlm: fix race between remove and lookup
It was possible for a remove message on an old
rsb to be sent after a lookup message on a new
rsb, where the rsbs were for the same resource
name. This could lead to a missing directory
entry for the new rsb.
It is fixed by keeping a copy of the resource
name being removed until after the remove has
been sent. A lookup checks if this in-progress
remove matches the name it is looking up.
Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r-- | fs/dlm/lock.c | 181 |
1 files changed, 144 insertions, 37 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index d9ee1b96549..c7c6cf9e868 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1624,65 +1624,170 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) return error; } -/* FIXME: make this more efficient */ +/* If there's an rsb for the same resource being removed, ensure + that the remove message is sent before the new lookup message. + It should be rare to need a delay here, but if not, then it may + be worthwhile to add a proper wait mechanism rather than a delay. */ -static int shrink_bucket(struct dlm_ls *ls, int b) +static void wait_pending_remove(struct dlm_rsb *r) { - struct rb_node *n; + struct dlm_ls *ls = r->res_ls; + restart: + spin_lock(&ls->ls_remove_spin); + if (ls->ls_remove_len && + !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) { + log_debug(ls, "delay lookup for remove dir %d %s", + r->res_dir_nodeid, r->res_name); + spin_unlock(&ls->ls_remove_spin); + msleep(1); + goto restart; + } + spin_unlock(&ls->ls_remove_spin); +} + +/* + * ls_remove_spin protects ls_remove_name and ls_remove_len which are + * read by other threads in wait_pending_remove. ls_remove_names + * and ls_remove_lens are only used by the scan thread, so they do + * not need protection. + */ + +static void shrink_bucket(struct dlm_ls *ls, int b) +{ + struct rb_node *n, *next; struct dlm_rsb *r; + char *name; int our_nodeid = dlm_our_nodeid(); - int count = 0, found; + int remote_count = 0; + int i, len, rv; - for (;;) { - found = 0; - spin_lock(&ls->ls_rsbtbl[b].lock); - for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) { - r = rb_entry(n, struct dlm_rsb, res_hashnode); + memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); - /* If we're the directory record for this rsb, and - we're not the master of it, then we need to wait - for the master node to send us a dir remove for - before removing the dir record. */ + spin_lock(&ls->ls_rsbtbl[b].lock); + for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) { + next = rb_next(n); + r = rb_entry(n, struct dlm_rsb, res_hashnode); - if (!dlm_no_directory(ls) && !is_master(r) && - (dlm_dir_nodeid(r) == our_nodeid)) { - continue; - } + /* If we're the directory record for this rsb, and + we're not the master of it, then we need to wait + for the master node to send us a dir remove for + before removing the dir record. */ - if (!time_after_eq(jiffies, r->res_toss_time + - dlm_config.ci_toss_secs * HZ)) - continue; - found = 1; - break; + if (!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)) { + continue; } - if (!found) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - break; + if (!time_after_eq(jiffies, r->res_toss_time + + dlm_config.ci_toss_secs * HZ)) { + continue; } - if (kref_put(&r->res_ref, kill_rsb)) { - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - spin_unlock(&ls->ls_rsbtbl[b].lock); + if (!dlm_no_directory(ls) && + (r->res_master_nodeid == our_nodeid) && + (dlm_dir_nodeid(r) != our_nodeid)) { /* We're the master of this rsb but we're not the directory record, so we need to tell the dir node to remove the dir record. */ - if (!dlm_no_directory(ls) && is_master(r) && - (dlm_dir_nodeid(r) != our_nodeid)) { - send_remove(r); - } + ls->ls_remove_lens[remote_count] = r->res_length; + memcpy(ls->ls_remove_names[remote_count], r->res_name, + DLM_RESNAME_MAXLEN); + remote_count++; - dlm_free_rsb(r); - count++; - } else { - spin_unlock(&ls->ls_rsbtbl[b].lock); + if (remote_count >= DLM_REMOVE_NAMES_MAX) + break; + continue; + } + + if (!kref_put(&r->res_ref, kill_rsb)) { log_error(ls, "tossed rsb in use %s", r->res_name); + continue; } + + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + dlm_free_rsb(r); } + spin_unlock(&ls->ls_rsbtbl[b].lock); - return count; + /* + * While searching for rsb's to free, we found some that require + * remote removal. We leave them in place and find them again here + * so there is a very small gap between removing them from the toss + * list and sending the removal. Keeping this gap small is + * important to keep us (the master node) from being out of sync + * with the remote dir node for very long. + * + * From the time the rsb is removed from toss until just after + * send_remove, the rsb name is saved in ls_remove_name. A new + * lookup checks this to ensure that a new lookup message for the + * same resource name is not sent just before the remove message. + */ + + for (i = 0; i < remote_count; i++) { + name = ls->ls_remove_names[i]; + len = ls->ls_remove_lens[i]; + + spin_lock(&ls->ls_rsbtbl[b].lock); + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (rv) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_debug(ls, "remove_name not toss %s", name); + continue; + } + + if (r->res_master_nodeid != our_nodeid) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_debug(ls, "remove_name master %d dir %d our %d %s", + r->res_master_nodeid, r->res_dir_nodeid, + our_nodeid, name); + continue; + } + + if (r->res_dir_nodeid == our_nodeid) { + /* should never happen */ + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_error(ls, "remove_name dir %d master %d our %d %s", + r->res_dir_nodeid, r->res_master_nodeid, + our_nodeid, name); + continue; + } + + if (!time_after_eq(jiffies, r->res_toss_time + + dlm_config.ci_toss_secs * HZ)) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_debug(ls, "remove_name toss_time %lu now %lu %s", + r->res_toss_time, jiffies, name); + continue; + } + + if (!kref_put(&r->res_ref, kill_rsb)) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_error(ls, "remove_name in use %s", name); + continue; + } + + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + + /* block lookup of same name until we've sent remove */ + spin_lock(&ls->ls_remove_spin); + ls->ls_remove_len = len; + memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); + spin_unlock(&ls->ls_remove_spin); + spin_unlock(&ls->ls_rsbtbl[b].lock); + + send_remove(r); + + /* allow lookup of name again */ + spin_lock(&ls->ls_remove_spin); + ls->ls_remove_len = 0; + memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); + spin_unlock(&ls->ls_remove_spin); + + dlm_free_rsb(r); + } } void dlm_scan_rsbs(struct dlm_ls *ls) @@ -2608,6 +2713,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) return 0; } + wait_pending_remove(r); + r->res_first_lkid = lkb->lkb_id; send_lookup(r, lkb); return 1; |