diff options
Diffstat (limited to 'fs/dlm')
-rw-r--r-- | fs/dlm/dlm_internal.h | 13 | ||||
-rw-r--r-- | fs/dlm/lock.c | 181 | ||||
-rw-r--r-- | fs/dlm/lockspace.c | 21 |
3 files changed, 176 insertions, 39 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index a5f82d5b394..9d3e485f88c 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -498,6 +498,13 @@ struct rcom_lock { char rl_lvb[0]; }; +/* + * The max number of resources per rsbtbl bucket that shrink will attempt + * to remove in each iteration. + */ + +#define DLM_REMOVE_NAMES_MAX 8 + struct dlm_ls { struct list_head ls_list; /* list of lockspaces */ dlm_lockspace_t *ls_local_handle; @@ -531,6 +538,12 @@ struct dlm_ls { int ls_new_rsb_count; struct list_head ls_new_rsb; /* new rsb structs */ + spinlock_t ls_remove_spin; + char ls_remove_name[DLM_RESNAME_MAXLEN+1]; + char *ls_remove_names[DLM_REMOVE_NAMES_MAX]; + int ls_remove_len; + int ls_remove_lens[DLM_REMOVE_NAMES_MAX]; + struct list_head ls_nodes; /* current nodes in ls */ struct list_head ls_nodes_gone; /* dead node list, recovery */ int ls_num_nodes; /* number of nodes in ls */ diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index d9ee1b96549..c7c6cf9e868 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1624,65 +1624,170 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) return error; } -/* FIXME: make this more efficient */ +/* If there's an rsb for the same resource being removed, ensure + that the remove message is sent before the new lookup message. + It should be rare to need a delay here, but if not, then it may + be worthwhile to add a proper wait mechanism rather than a delay. */ -static int shrink_bucket(struct dlm_ls *ls, int b) +static void wait_pending_remove(struct dlm_rsb *r) { - struct rb_node *n; + struct dlm_ls *ls = r->res_ls; + restart: + spin_lock(&ls->ls_remove_spin); + if (ls->ls_remove_len && + !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) { + log_debug(ls, "delay lookup for remove dir %d %s", + r->res_dir_nodeid, r->res_name); + spin_unlock(&ls->ls_remove_spin); + msleep(1); + goto restart; + } + spin_unlock(&ls->ls_remove_spin); +} + +/* + * ls_remove_spin protects ls_remove_name and ls_remove_len which are + * read by other threads in wait_pending_remove. ls_remove_names + * and ls_remove_lens are only used by the scan thread, so they do + * not need protection. + */ + +static void shrink_bucket(struct dlm_ls *ls, int b) +{ + struct rb_node *n, *next; struct dlm_rsb *r; + char *name; int our_nodeid = dlm_our_nodeid(); - int count = 0, found; + int remote_count = 0; + int i, len, rv; - for (;;) { - found = 0; - spin_lock(&ls->ls_rsbtbl[b].lock); - for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) { - r = rb_entry(n, struct dlm_rsb, res_hashnode); + memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); - /* If we're the directory record for this rsb, and - we're not the master of it, then we need to wait - for the master node to send us a dir remove for - before removing the dir record. */ + spin_lock(&ls->ls_rsbtbl[b].lock); + for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) { + next = rb_next(n); + r = rb_entry(n, struct dlm_rsb, res_hashnode); - if (!dlm_no_directory(ls) && !is_master(r) && - (dlm_dir_nodeid(r) == our_nodeid)) { - continue; - } + /* If we're the directory record for this rsb, and + we're not the master of it, then we need to wait + for the master node to send us a dir remove for + before removing the dir record. */ - if (!time_after_eq(jiffies, r->res_toss_time + - dlm_config.ci_toss_secs * HZ)) - continue; - found = 1; - break; + if (!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)) { + continue; } - if (!found) { - spin_unlock(&ls->ls_rsbtbl[b].lock); - break; + if (!time_after_eq(jiffies, r->res_toss_time + + dlm_config.ci_toss_secs * HZ)) { + continue; } - if (kref_put(&r->res_ref, kill_rsb)) { - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - spin_unlock(&ls->ls_rsbtbl[b].lock); + if (!dlm_no_directory(ls) && + (r->res_master_nodeid == our_nodeid) && + (dlm_dir_nodeid(r) != our_nodeid)) { /* We're the master of this rsb but we're not the directory record, so we need to tell the dir node to remove the dir record. */ - if (!dlm_no_directory(ls) && is_master(r) && - (dlm_dir_nodeid(r) != our_nodeid)) { - send_remove(r); - } + ls->ls_remove_lens[remote_count] = r->res_length; + memcpy(ls->ls_remove_names[remote_count], r->res_name, + DLM_RESNAME_MAXLEN); + remote_count++; - dlm_free_rsb(r); - count++; - } else { - spin_unlock(&ls->ls_rsbtbl[b].lock); + if (remote_count >= DLM_REMOVE_NAMES_MAX) + break; + continue; + } + + if (!kref_put(&r->res_ref, kill_rsb)) { log_error(ls, "tossed rsb in use %s", r->res_name); + continue; } + + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + dlm_free_rsb(r); } + spin_unlock(&ls->ls_rsbtbl[b].lock); - return count; + /* + * While searching for rsb's to free, we found some that require + * remote removal. We leave them in place and find them again here + * so there is a very small gap between removing them from the toss + * list and sending the removal. Keeping this gap small is + * important to keep us (the master node) from being out of sync + * with the remote dir node for very long. + * + * From the time the rsb is removed from toss until just after + * send_remove, the rsb name is saved in ls_remove_name. A new + * lookup checks this to ensure that a new lookup message for the + * same resource name is not sent just before the remove message. + */ + + for (i = 0; i < remote_count; i++) { + name = ls->ls_remove_names[i]; + len = ls->ls_remove_lens[i]; + + spin_lock(&ls->ls_rsbtbl[b].lock); + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (rv) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_debug(ls, "remove_name not toss %s", name); + continue; + } + + if (r->res_master_nodeid != our_nodeid) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_debug(ls, "remove_name master %d dir %d our %d %s", + r->res_master_nodeid, r->res_dir_nodeid, + our_nodeid, name); + continue; + } + + if (r->res_dir_nodeid == our_nodeid) { + /* should never happen */ + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_error(ls, "remove_name dir %d master %d our %d %s", + r->res_dir_nodeid, r->res_master_nodeid, + our_nodeid, name); + continue; + } + + if (!time_after_eq(jiffies, r->res_toss_time + + dlm_config.ci_toss_secs * HZ)) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_debug(ls, "remove_name toss_time %lu now %lu %s", + r->res_toss_time, jiffies, name); + continue; + } + + if (!kref_put(&r->res_ref, kill_rsb)) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_error(ls, "remove_name in use %s", name); + continue; + } + + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + + /* block lookup of same name until we've sent remove */ + spin_lock(&ls->ls_remove_spin); + ls->ls_remove_len = len; + memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); + spin_unlock(&ls->ls_remove_spin); + spin_unlock(&ls->ls_rsbtbl[b].lock); + + send_remove(r); + + /* allow lookup of name again */ + spin_lock(&ls->ls_remove_spin); + ls->ls_remove_len = 0; + memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); + spin_unlock(&ls->ls_remove_spin); + + dlm_free_rsb(r); + } } void dlm_scan_rsbs(struct dlm_ls *ls) @@ -2608,6 +2713,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) return 0; } + wait_pending_remove(r); + r->res_first_lkid = lkb->lkb_id; send_lookup(r, lkb); return 1; diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index d4d3b3165c6..952557d00cc 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -506,6 +506,15 @@ static int new_lockspace(const char *name, const char *cluster, spin_lock_init(&ls->ls_rsbtbl[i].lock); } + spin_lock_init(&ls->ls_remove_spin); + + for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { + ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1, + GFP_KERNEL); + if (!ls->ls_remove_names[i]) + goto out_rsbtbl; + } + idr_init(&ls->ls_lkbidr); spin_lock_init(&ls->ls_lkbidr_spin); @@ -556,7 +565,7 @@ static int new_lockspace(const char *name, const char *cluster, ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS); if (!ls->ls_recover_buf) - goto out_lkbfree; + goto out_lkbidr; ls->ls_slot = 0; ls->ls_num_slots = 0; @@ -640,8 +649,13 @@ static int new_lockspace(const char *name, const char *cluster, spin_unlock(&lslist_lock); idr_destroy(&ls->ls_recover_idr); kfree(ls->ls_recover_buf); - out_lkbfree: + out_lkbidr: idr_destroy(&ls->ls_lkbidr); + for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { + if (ls->ls_remove_names[i]) + kfree(ls->ls_remove_names[i]); + } + out_rsbtbl: vfree(ls->ls_rsbtbl); out_lsfree: if (do_unreg) @@ -796,6 +810,9 @@ static int release_lockspace(struct dlm_ls *ls, int force) vfree(ls->ls_rsbtbl); + for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) + kfree(ls->ls_remove_names[i]); + while (!list_empty(&ls->ls_new_rsb)) { rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); |