summaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
Diffstat (limited to 'fs')
-rw-r--r--fs/dlm/dlm_internal.h13
-rw-r--r--fs/dlm/lock.c181
-rw-r--r--fs/dlm/lockspace.c21
3 files changed, 176 insertions, 39 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index a5f82d5b394..9d3e485f88c 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -498,6 +498,13 @@ struct rcom_lock {
char rl_lvb[0];
};
+/*
+ * The max number of resources per rsbtbl bucket that shrink will attempt
+ * to remove in each iteration.
+ */
+
+#define DLM_REMOVE_NAMES_MAX 8
+
struct dlm_ls {
struct list_head ls_list; /* list of lockspaces */
dlm_lockspace_t *ls_local_handle;
@@ -531,6 +538,12 @@ struct dlm_ls {
int ls_new_rsb_count;
struct list_head ls_new_rsb; /* new rsb structs */
+ spinlock_t ls_remove_spin;
+ char ls_remove_name[DLM_RESNAME_MAXLEN+1];
+ char *ls_remove_names[DLM_REMOVE_NAMES_MAX];
+ int ls_remove_len;
+ int ls_remove_lens[DLM_REMOVE_NAMES_MAX];
+
struct list_head ls_nodes; /* current nodes in ls */
struct list_head ls_nodes_gone; /* dead node list, recovery */
int ls_num_nodes; /* number of nodes in ls */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index d9ee1b96549..c7c6cf9e868 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1624,65 +1624,170 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
return error;
}
-/* FIXME: make this more efficient */
+/* If there's an rsb for the same resource being removed, ensure
+ that the remove message is sent before the new lookup message.
+ It should be rare to need a delay here, but if not, then it may
+ be worthwhile to add a proper wait mechanism rather than a delay. */
-static int shrink_bucket(struct dlm_ls *ls, int b)
+static void wait_pending_remove(struct dlm_rsb *r)
{
- struct rb_node *n;
+ struct dlm_ls *ls = r->res_ls;
+ restart:
+ spin_lock(&ls->ls_remove_spin);
+ if (ls->ls_remove_len &&
+ !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
+ log_debug(ls, "delay lookup for remove dir %d %s",
+ r->res_dir_nodeid, r->res_name);
+ spin_unlock(&ls->ls_remove_spin);
+ msleep(1);
+ goto restart;
+ }
+ spin_unlock(&ls->ls_remove_spin);
+}
+
+/*
+ * ls_remove_spin protects ls_remove_name and ls_remove_len which are
+ * read by other threads in wait_pending_remove. ls_remove_names
+ * and ls_remove_lens are only used by the scan thread, so they do
+ * not need protection.
+ */
+
+static void shrink_bucket(struct dlm_ls *ls, int b)
+{
+ struct rb_node *n, *next;
struct dlm_rsb *r;
+ char *name;
int our_nodeid = dlm_our_nodeid();
- int count = 0, found;
+ int remote_count = 0;
+ int i, len, rv;
- for (;;) {
- found = 0;
- spin_lock(&ls->ls_rsbtbl[b].lock);
- for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
- r = rb_entry(n, struct dlm_rsb, res_hashnode);
+ memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
- /* If we're the directory record for this rsb, and
- we're not the master of it, then we need to wait
- for the master node to send us a dir remove for
- before removing the dir record. */
+ spin_lock(&ls->ls_rsbtbl[b].lock);
+ for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
+ next = rb_next(n);
+ r = rb_entry(n, struct dlm_rsb, res_hashnode);
- if (!dlm_no_directory(ls) && !is_master(r) &&
- (dlm_dir_nodeid(r) == our_nodeid)) {
- continue;
- }
+ /* If we're the directory record for this rsb, and
+ we're not the master of it, then we need to wait
+ for the master node to send us a dir remove for
+ before removing the dir record. */
- if (!time_after_eq(jiffies, r->res_toss_time +
- dlm_config.ci_toss_secs * HZ))
- continue;
- found = 1;
- break;
+ if (!dlm_no_directory(ls) &&
+ (r->res_master_nodeid != our_nodeid) &&
+ (dlm_dir_nodeid(r) == our_nodeid)) {
+ continue;
}
- if (!found) {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- break;
+ if (!time_after_eq(jiffies, r->res_toss_time +
+ dlm_config.ci_toss_secs * HZ)) {
+ continue;
}
- if (kref_put(&r->res_ref, kill_rsb)) {
- rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ if (!dlm_no_directory(ls) &&
+ (r->res_master_nodeid == our_nodeid) &&
+ (dlm_dir_nodeid(r) != our_nodeid)) {
/* We're the master of this rsb but we're not
the directory record, so we need to tell the
dir node to remove the dir record. */
- if (!dlm_no_directory(ls) && is_master(r) &&
- (dlm_dir_nodeid(r) != our_nodeid)) {
- send_remove(r);
- }
+ ls->ls_remove_lens[remote_count] = r->res_length;
+ memcpy(ls->ls_remove_names[remote_count], r->res_name,
+ DLM_RESNAME_MAXLEN);
+ remote_count++;
- dlm_free_rsb(r);
- count++;
- } else {
- spin_unlock(&ls->ls_rsbtbl[b].lock);
+ if (remote_count >= DLM_REMOVE_NAMES_MAX)
+ break;
+ continue;
+ }
+
+ if (!kref_put(&r->res_ref, kill_rsb)) {
log_error(ls, "tossed rsb in use %s", r->res_name);
+ continue;
}
+
+ rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+ dlm_free_rsb(r);
}
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
- return count;
+ /*
+ * While searching for rsb's to free, we found some that require
+ * remote removal. We leave them in place and find them again here
+ * so there is a very small gap between removing them from the toss
+ * list and sending the removal. Keeping this gap small is
+ * important to keep us (the master node) from being out of sync
+ * with the remote dir node for very long.
+ *
+ * From the time the rsb is removed from toss until just after
+ * send_remove, the rsb name is saved in ls_remove_name. A new
+ * lookup checks this to ensure that a new lookup message for the
+ * same resource name is not sent just before the remove message.
+ */
+
+ for (i = 0; i < remote_count; i++) {
+ name = ls->ls_remove_names[i];
+ len = ls->ls_remove_lens[i];
+
+ spin_lock(&ls->ls_rsbtbl[b].lock);
+ rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+ if (rv) {
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+ log_debug(ls, "remove_name not toss %s", name);
+ continue;
+ }
+
+ if (r->res_master_nodeid != our_nodeid) {
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+ log_debug(ls, "remove_name master %d dir %d our %d %s",
+ r->res_master_nodeid, r->res_dir_nodeid,
+ our_nodeid, name);
+ continue;
+ }
+
+ if (r->res_dir_nodeid == our_nodeid) {
+ /* should never happen */
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+ log_error(ls, "remove_name dir %d master %d our %d %s",
+ r->res_dir_nodeid, r->res_master_nodeid,
+ our_nodeid, name);
+ continue;
+ }
+
+ if (!time_after_eq(jiffies, r->res_toss_time +
+ dlm_config.ci_toss_secs * HZ)) {
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+ log_debug(ls, "remove_name toss_time %lu now %lu %s",
+ r->res_toss_time, jiffies, name);
+ continue;
+ }
+
+ if (!kref_put(&r->res_ref, kill_rsb)) {
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+ log_error(ls, "remove_name in use %s", name);
+ continue;
+ }
+
+ rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+
+ /* block lookup of same name until we've sent remove */
+ spin_lock(&ls->ls_remove_spin);
+ ls->ls_remove_len = len;
+ memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
+ spin_unlock(&ls->ls_remove_spin);
+ spin_unlock(&ls->ls_rsbtbl[b].lock);
+
+ send_remove(r);
+
+ /* allow lookup of name again */
+ spin_lock(&ls->ls_remove_spin);
+ ls->ls_remove_len = 0;
+ memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
+ spin_unlock(&ls->ls_remove_spin);
+
+ dlm_free_rsb(r);
+ }
}
void dlm_scan_rsbs(struct dlm_ls *ls)
@@ -2608,6 +2713,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
return 0;
}
+ wait_pending_remove(r);
+
r->res_first_lkid = lkb->lkb_id;
send_lookup(r, lkb);
return 1;
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index d4d3b3165c6..952557d00cc 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -506,6 +506,15 @@ static int new_lockspace(const char *name, const char *cluster,
spin_lock_init(&ls->ls_rsbtbl[i].lock);
}
+ spin_lock_init(&ls->ls_remove_spin);
+
+ for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
+ ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
+ GFP_KERNEL);
+ if (!ls->ls_remove_names[i])
+ goto out_rsbtbl;
+ }
+
idr_init(&ls->ls_lkbidr);
spin_lock_init(&ls->ls_lkbidr_spin);
@@ -556,7 +565,7 @@ static int new_lockspace(const char *name, const char *cluster,
ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
if (!ls->ls_recover_buf)
- goto out_lkbfree;
+ goto out_lkbidr;
ls->ls_slot = 0;
ls->ls_num_slots = 0;
@@ -640,8 +649,13 @@ static int new_lockspace(const char *name, const char *cluster,
spin_unlock(&lslist_lock);
idr_destroy(&ls->ls_recover_idr);
kfree(ls->ls_recover_buf);
- out_lkbfree:
+ out_lkbidr:
idr_destroy(&ls->ls_lkbidr);
+ for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
+ if (ls->ls_remove_names[i])
+ kfree(ls->ls_remove_names[i]);
+ }
+ out_rsbtbl:
vfree(ls->ls_rsbtbl);
out_lsfree:
if (do_unreg)
@@ -796,6 +810,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
vfree(ls->ls_rsbtbl);
+ for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
+ kfree(ls->ls_remove_names[i]);
+
while (!list_empty(&ls->ls_new_rsb)) {
rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
res_hashchain);