summaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlmglue.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlmglue.c')
-rw-r--r--fs/ocfs2/dlmglue.c475
1 files changed, 371 insertions, 104 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 20c6ca8ac7f..764d15defd8 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -46,6 +46,7 @@
#include "ocfs2.h"
#include "alloc.h"
+#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "heartbeat.h"
@@ -69,6 +70,9 @@ struct ocfs2_mask_waiter {
static void ocfs2_inode_ast_func(void *opaque);
static void ocfs2_inode_bast_func(void *opaque,
int level);
+static void ocfs2_dentry_ast_func(void *opaque);
+static void ocfs2_dentry_bast_func(void *opaque,
+ int level);
static void ocfs2_super_ast_func(void *opaque);
static void ocfs2_super_bast_func(void *opaque,
int level);
@@ -76,32 +80,57 @@ static void ocfs2_rename_ast_func(void *opaque);
static void ocfs2_rename_bast_func(void *opaque,
int level);
+/*
+ * Return value from ocfs2_convert_worker_t functions.
+ *
+ * These control the precise actions of ocfs2_generic_unblock_lock()
+ * and ocfs2_process_blocked_lock()
+ *
+ */
+enum ocfs2_unblock_action {
+ UNBLOCK_CONTINUE = 0, /* Continue downconvert */
+ UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire
+ * ->post_unlock callback */
+ UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire
+ * ->post_unlock() callback. */
+};
+
+struct ocfs2_unblock_ctl {
+ int requeue;
+ enum ocfs2_unblock_action unblock_action;
+};
+
/* so far, all locks have gotten along with the same unlock ast */
static void ocfs2_unlock_ast_func(void *opaque,
enum dlm_status status);
-static int ocfs2_do_unblock_meta(struct inode *inode,
- int *requeue);
static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
- int *requeue);
+ struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
- int *requeue);
+ struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
- int *requeue);
+ struct ocfs2_unblock_ctl *ctl);
+static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
+ struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
- int *requeue);
-typedef void (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
-static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
- struct ocfs2_lock_res *lockres,
- int *requeue,
- ocfs2_convert_worker_t *worker);
+ struct ocfs2_unblock_ctl *ctl);
+
+static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres);
struct ocfs2_lock_res_ops {
void (*ast)(void *);
void (*bast)(void *, int);
void (*unlock_ast)(void *, enum dlm_status);
- int (*unblock)(struct ocfs2_lock_res *, int *);
+ int (*unblock)(struct ocfs2_lock_res *, struct ocfs2_unblock_ctl *);
+ void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);
};
+typedef int (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
+static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres,
+ struct ocfs2_unblock_ctl *ctl,
+ ocfs2_convert_worker_t *worker);
+
static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
.ast = ocfs2_inode_ast_func,
.bast = ocfs2_inode_bast_func,
@@ -116,9 +145,6 @@ static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
.unblock = ocfs2_unblock_meta,
};
-static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
- int blocking);
-
static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
.ast = ocfs2_inode_ast_func,
.bast = ocfs2_inode_bast_func,
@@ -140,6 +166,14 @@ static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
.unblock = ocfs2_unblock_osb_lock,
};
+static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
+ .ast = ocfs2_dentry_ast_func,
+ .bast = ocfs2_dentry_bast_func,
+ .unlock_ast = ocfs2_unlock_ast_func,
+ .unblock = ocfs2_unblock_dentry_lock,
+ .post_unlock = ocfs2_dentry_post_unlock,
+};
+
static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
return lockres->l_type == OCFS2_LOCK_TYPE_META ||
@@ -172,6 +206,13 @@ static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
return (struct inode *) lockres->l_priv;
}
+static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
+{
+ BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
+
+ return (struct ocfs2_dentry_lock *)lockres->l_priv;
+}
+
static int ocfs2_lock_create(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int level,
@@ -204,22 +245,6 @@ static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
struct ocfs2_lock_res *lockres,
int new_level);
-static char *ocfs2_lock_type_strings[] = {
- [OCFS2_LOCK_TYPE_META] = "Meta",
- [OCFS2_LOCK_TYPE_DATA] = "Data",
- [OCFS2_LOCK_TYPE_SUPER] = "Super",
- [OCFS2_LOCK_TYPE_RENAME] = "Rename",
- /* Need to differntiate from [R]ename.. serializing writes is the
- * important job it does, anyway. */
- [OCFS2_LOCK_TYPE_RW] = "Write/Read",
-};
-
-static char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
-{
- mlog_bug_on_msg(type >= OCFS2_NUM_LOCK_TYPES, "%d\n", type);
- return ocfs2_lock_type_strings[type];
-}
-
static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
u64 blkno,
u32 generation,
@@ -265,13 +290,9 @@ static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
struct ocfs2_lock_res *res,
enum ocfs2_lock_type type,
- u64 blkno,
- u32 generation,
struct ocfs2_lock_res_ops *ops,
void *priv)
{
- ocfs2_build_lock_name(type, blkno, generation, res->l_name);
-
res->l_type = type;
res->l_ops = ops;
res->l_priv = priv;
@@ -319,9 +340,59 @@ void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
break;
};
- ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type,
- OCFS2_I(inode)->ip_blkno,
- inode->i_generation, ops, inode);
+ ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
+ inode->i_generation, res->l_name);
+ ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
+}
+
+static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
+{
+ __be64 inode_blkno_be;
+
+ memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
+ sizeof(__be64));
+
+ return be64_to_cpu(inode_blkno_be);
+}
+
+void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
+ u64 parent, struct inode *inode)
+{
+ int len;
+ u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
+ __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
+ struct ocfs2_lock_res *lockres = &dl->dl_lockres;
+
+ ocfs2_lock_res_init_once(lockres);
+
+ /*
+ * Unfortunately, the standard lock naming scheme won't work
+ * here because we have two 16 byte values to use. Instead,
+ * we'll stuff the inode number as a binary value. We still
+ * want error prints to show something without garbling the
+ * display, so drop a null byte in there before the inode
+ * number. A future version of OCFS2 will likely use all
+ * binary lock names. The stringified names have been a
+ * tremendous aid in debugging, but now that the debugfs
+ * interface exists, we can mangle things there if need be.
+ *
+ * NOTE: We also drop the standard "pad" value (the total lock
+ * name size stays the same though - the last part is all
+ * zeros due to the memset in ocfs2_lock_res_init_once()
+ */
+ len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
+ "%c%016llx",
+ ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
+ (long long)parent);
+
+ BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
+
+ memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
+ sizeof(__be64));
+
+ ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
+ OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
+ dl);
}
static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
@@ -330,8 +401,9 @@ static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
/* Superblock lockres doesn't come from a slab so we call init
* once on it manually. */
ocfs2_lock_res_init_once(res);
+ ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
+ 0, res->l_name);
ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
- OCFS2_SUPER_BLOCK_BLKNO, 0,
&ocfs2_super_lops, osb);
}
@@ -341,7 +413,8 @@ static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
/* Rename lockres doesn't come from a slab so we call init
* once on it manually. */
ocfs2_lock_res_init_once(res);
- ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 0, 0,
+ ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
+ ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
&ocfs2_rename_lops, osb);
}
@@ -627,9 +700,10 @@ static void ocfs2_generic_bast_func(struct ocfs2_super *osb,
ocfs2_schedule_blocked_lock(osb, lockres);
spin_unlock_irqrestore(&lockres->l_lock, flags);
+ wake_up(&lockres->l_event);
+
ocfs2_kick_vote_thread(osb);
- wake_up(&lockres->l_event);
mlog_exit_void();
}
@@ -690,9 +764,9 @@ static void ocfs2_generic_ast_func(struct ocfs2_lock_res *lockres,
/* set it to something invalid so if we get called again we
* can catch it. */
lockres->l_action = OCFS2_AST_INVALID;
- spin_unlock_irqrestore(&lockres->l_lock, flags);
wake_up(&lockres->l_event);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
}
static void ocfs2_super_ast_func(void *opaque)
@@ -757,6 +831,27 @@ static void ocfs2_rename_bast_func(void *opaque,
mlog_exit_void();
}
+static void ocfs2_dentry_ast_func(void *opaque)
+{
+ struct ocfs2_lock_res *lockres = opaque;
+
+ BUG_ON(!lockres);
+
+ ocfs2_generic_ast_func(lockres, 1);
+}
+
+static void ocfs2_dentry_bast_func(void *opaque, int level)
+{
+ struct ocfs2_lock_res *lockres = opaque;
+ struct ocfs2_dentry_lock *dl = lockres->l_priv;
+ struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
+
+ mlog(0, "Dentry bast: level: %d, name: %s\n", level,
+ lockres->l_name);
+
+ ocfs2_generic_bast_func(osb, lockres, level);
+}
+
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
int convert)
{
@@ -1076,10 +1171,11 @@ static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
mlog_exit_void();
}
-static int ocfs2_create_new_inode_lock(struct inode *inode,
- struct ocfs2_lock_res *lockres)
+int ocfs2_create_new_lock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres,
+ int ex)
{
- struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+ int level = ex ? LKM_EXMODE : LKM_PRMODE;
unsigned long flags;
spin_lock_irqsave(&lockres->l_lock, flags);
@@ -1087,7 +1183,7 @@ static int ocfs2_create_new_inode_lock(struct inode *inode,
lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
spin_unlock_irqrestore(&lockres->l_lock, flags);
- return ocfs2_lock_create(osb, lockres, LKM_EXMODE, LKM_LOCAL);
+ return ocfs2_lock_create(osb, lockres, level, LKM_LOCAL);
}
/* Grants us an EX lock on the data and metadata resources, skipping
@@ -1099,6 +1195,7 @@ static int ocfs2_create_new_inode_lock(struct inode *inode,
int ocfs2_create_new_inode_locks(struct inode *inode)
{
int ret;
+ struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
BUG_ON(!inode);
BUG_ON(!ocfs2_inode_is_new(inode));
@@ -1115,22 +1212,19 @@ int ocfs2_create_new_inode_locks(struct inode *inode)
* on a resource which has an invalid one -- we'll set it
* valid when we release the EX. */
- ret = ocfs2_create_new_inode_lock(inode,
- &OCFS2_I(inode)->ip_rw_lockres);
+ ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1);
if (ret) {
mlog_errno(ret);
goto bail;
}
- ret = ocfs2_create_new_inode_lock(inode,
- &OCFS2_I(inode)->ip_meta_lockres);
+ ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1);
if (ret) {
mlog_errno(ret);
goto bail;
}
- ret = ocfs2_create_new_inode_lock(inode,
- &OCFS2_I(inode)->ip_data_lockres);
+ ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1);
if (ret) {
mlog_errno(ret);
goto bail;
@@ -1809,6 +1903,34 @@ void ocfs2_rename_unlock(struct ocfs2_super *osb)
ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
}
+int ocfs2_dentry_lock(struct dentry *dentry, int ex)
+{
+ int ret;
+ int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
+ struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
+
+ BUG_ON(!dl);
+
+ if (ocfs2_is_hard_readonly(osb))
+ return -EROFS;
+
+ ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
+ if (ret < 0)
+ mlog_errno(ret);
+
+ return ret;
+}
+
+void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
+{
+ int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
+ struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
+
+ ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
+}
+
/* Reference counting of the dlm debug structure. We want this because
* open references on the debug inodes can live on after a mount, so
* we can't rely on the ocfs2_super to always exist. */
@@ -1939,9 +2061,16 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
if (!lockres)
return -EINVAL;
- seq_printf(m, "0x%x\t"
- "%.*s\t"
- "%d\t"
+ seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
+
+ if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
+ seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
+ lockres->l_name,
+ (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
+ else
+ seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
+
+ seq_printf(m, "%d\t"
"0x%lx\t"
"0x%x\t"
"0x%x\t"
@@ -1949,8 +2078,6 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
"%u\t"
"%d\t"
"%d\t",
- OCFS2_DLM_DEBUG_STR_VERSION,
- OCFS2_LOCK_ID_MAX_LEN, lockres->l_name,
lockres->l_level,
lockres->l_flags,
lockres->l_action,
@@ -2311,25 +2438,21 @@ void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
spin_unlock_irqrestore(&lockres->l_lock, flags);
}
-static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
+void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres)
{
- int status;
-
- mlog_entry_void();
-
- ocfs2_mark_lockres_freeing(&osb->osb_super_lockres);
-
- status = ocfs2_drop_lock(osb, &osb->osb_super_lockres, NULL);
- if (status < 0)
- mlog_errno(status);
-
- ocfs2_mark_lockres_freeing(&osb->osb_rename_lockres);
+ int ret;
- status = ocfs2_drop_lock(osb, &osb->osb_rename_lockres, NULL);
- if (status < 0)
- mlog_errno(status);
+ ocfs2_mark_lockres_freeing(lockres);
+ ret = ocfs2_drop_lock(osb, lockres, NULL);
+ if (ret)
+ mlog_errno(ret);
+}
- mlog_exit(status);
+static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
+{
+ ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
+ ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
}
static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
@@ -2599,7 +2722,7 @@ leave:
static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
- int *requeue,
+ struct ocfs2_unblock_ctl *ctl,
ocfs2_convert_worker_t *worker)
{
unsigned long flags;
@@ -2615,7 +2738,7 @@ static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
recheck:
if (lockres->l_flags & OCFS2_LOCK_BUSY) {
- *requeue = 1;
+ ctl->requeue = 1;
ret = ocfs2_prepare_cancel_convert(osb, lockres);
spin_unlock_irqrestore(&lockres->l_lock, flags);
if (ret) {
@@ -2631,7 +2754,7 @@ recheck:
if ((lockres->l_blocking == LKM_EXMODE)
&& (lockres->l_ex_holders || lockres->l_ro_holders)) {
spin_unlock_irqrestore(&lockres->l_lock, flags);
- *requeue = 1;
+ ctl->requeue = 1;
ret = 0;
goto leave;
}
@@ -2641,7 +2764,7 @@ recheck:
if (lockres->l_blocking == LKM_PRMODE &&
lockres->l_ex_holders) {
spin_unlock_irqrestore(&lockres->l_lock, flags);
- *requeue = 1;
+ ctl->requeue = 1;
ret = 0;
goto leave;
}
@@ -2659,7 +2782,10 @@ recheck:
blocking = lockres->l_blocking;
spin_unlock_irqrestore(&lockres->l_lock, flags);
- worker(lockres, blocking);
+ ctl->unblock_action = worker(lockres, blocking);
+
+ if (ctl->unblock_action == UNBLOCK_STOP_POST)
+ goto leave;
spin_lock_irqsave(&lockres->l_lock, flags);
if (blocking != lockres->l_blocking) {
@@ -2669,7 +2795,7 @@ recheck:
}
downconvert:
- *requeue = 0;
+ ctl->requeue = 0;
new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
ocfs2_prepare_downconvert(lockres, new_level);
@@ -2680,14 +2806,12 @@ leave:
return ret;
}
-static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
- int blocking)
+static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
+ int blocking)
{
struct inode *inode;
struct address_space *mapping;
- mlog_entry_void();
-
inode = ocfs2_lock_res_inode(lockres);
mapping = inode->i_mapping;
@@ -2708,11 +2832,11 @@ static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
filemap_fdatawait(mapping);
}
- mlog_exit_void();
+ return UNBLOCK_CONTINUE;
}
int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
- int *requeue)
+ struct ocfs2_unblock_ctl *ctl)
{
int status;
struct inode *inode;
@@ -2726,22 +2850,20 @@ int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
mlog(0, "unblock inode %llu\n",
(unsigned long long)OCFS2_I(inode)->ip_blkno);
- status = ocfs2_generic_unblock_lock(osb,
- lockres,
- requeue,
+ status = ocfs2_generic_unblock_lock(osb, lockres, ctl,
ocfs2_data_convert_worker);
if (status < 0)
mlog_errno(status);
mlog(0, "inode %llu, requeue = %d\n",
- (unsigned long long)OCFS2_I(inode)->ip_blkno, *requeue);
+ (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
mlog_exit(status);
return status;
}
static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
- int *requeue)
+ struct ocfs2_unblock_ctl *ctl)
{
int status;
struct inode *inode;
@@ -2753,9 +2875,7 @@ static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
inode = ocfs2_lock_res_inode(lockres);
status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
- lockres,
- requeue,
- NULL);
+ lockres, ctl, NULL);
if (status < 0)
mlog_errno(status);
@@ -2763,9 +2883,8 @@ static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
return status;
}
-
-int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
- int *requeue)
+static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
+ struct ocfs2_unblock_ctl *ctl)
{
int status;
struct inode *inode;
@@ -2777,21 +2896,165 @@ int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
mlog(0, "unblock inode %llu\n",
(unsigned long long)OCFS2_I(inode)->ip_blkno);
- status = ocfs2_do_unblock_meta(inode, requeue);
+ status = ocfs2_do_unblock_meta(inode, &ctl->requeue);
if (status < 0)
mlog_errno(status);
mlog(0, "inode %llu, requeue = %d\n",
- (unsigned long long)OCFS2_I(inode)->ip_blkno, *requeue);
+ (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
mlog_exit(status);
return status;
}
+/*
+ * Does the final reference drop on our dentry lock. Right now this
+ * happens in the vote thread, but we could choose to simplify the
+ * dlmglue API and push these off to the ocfs2_wq in the future.
+ */
+static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres)
+{
+ struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
+ ocfs2_dentry_lock_put(osb, dl);
+}
+
+/*
+ * d_delete() matching dentries before the lock downconvert.
+ *
+ * At this point, any process waiting to destroy the
+ * dentry_lock due to last ref count is stopped by the
+ * OCFS2_LOCK_QUEUED flag.
+ *
+ * We have two potential problems
+ *
+ * 1) If we do the last reference drop on our dentry_lock (via dput)
+ * we'll wind up in ocfs2_release_dentry_lock(), waiting on
+ * the downconvert to finish. Instead we take an elevated
+ * reference and push the drop until after we've completed our
+ * unblock processing.
+ *
+ * 2) There might be another process with a final reference,
+ * waiting on us to finish processing. If this is the case, we
+ * detect it and exit out - there's no more dentries anyway.
+ */
+static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
+ int blocking)
+{
+ struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
+ struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
+ struct dentry *dentry;
+ unsigned long flags;
+ int extra_ref = 0;
+
+ /*
+ * This node is blocking another node from getting a read
+ * lock. This happens when we've renamed within a
+ * directory. We've forced the other nodes to d_delete(), but
+ * we never actually dropped our lock because it's still
+ * valid. The downconvert code will retain a PR for this node,
+ * so there's no further work to do.
+ */
+ if (blocking == LKM_PRMODE)
+ return UNBLOCK_CONTINUE;
+
+ /*
+ * Mark this inode as potentially orphaned. The code in
+ * ocfs2_delete_inode() will figure out whether it actually
+ * needs to be freed or not.
+ */
+ spin_lock(&oi->ip_lock);
+ oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
+ spin_unlock(&oi->ip_lock);
+
+ /*
+ * Yuck. We need to make sure however that the check of
+ * OCFS2_LOCK_FREEING and the extra reference are atomic with
+ * respect to a reference decrement or the setting of that
+ * flag.
+ */
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ spin_lock(&dentry_attach_lock);
+ if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
+ && dl->dl_count) {
+ dl->dl_count++;
+ extra_ref = 1;
+ }
+ spin_unlock(&dentry_attach_lock);
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+ mlog(0, "extra_ref = %d\n", extra_ref);
+
+ /*
+ * We have a process waiting on us in ocfs2_dentry_iput(),
+ * which means we can't have any more outstanding
+ * aliases. There's no need to do any more work.
+ */
+ if (!extra_ref)
+ return UNBLOCK_CONTINUE;
+
+ spin_lock(&dentry_attach_lock);
+ while (1) {
+ dentry = ocfs2_find_local_alias(dl->dl_inode,
+ dl->dl_parent_blkno, 1);
+ if (!dentry)
+ break;
+ spin_unlock(&dentry_attach_lock);
+
+ mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
+ dentry->d_name.name);
+
+ /*
+ * The following dcache calls may do an
+ * iput(). Normally we don't want that from the
+ * downconverting thread, but in this case it's ok
+ * because the requesting node already has an
+ * exclusive lock on the inode, so it can't be queued
+ * for a downconvert.
+ */
+ d_delete(dentry);
+ dput(dentry);
+
+ spin_lock(&dentry_attach_lock);
+ }
+ spin_unlock(&dentry_attach_lock);
+
+ /*
+ * If we are the last holder of this dentry lock, there is no
+ * reason to downconvert so skip straight to the unlock.
+ */
+ if (dl->dl_count == 1)
+ return UNBLOCK_STOP_POST;
+
+ return UNBLOCK_CONTINUE_POST;
+}
+
+static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
+ struct ocfs2_unblock_ctl *ctl)
+{
+ int ret;
+ struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
+ struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
+
+ mlog(0, "unblock dentry lock: %llu\n",
+ (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno);
+
+ ret = ocfs2_generic_unblock_lock(osb,
+ lockres,
+ ctl,
+ ocfs2_dentry_convert_worker);
+ if (ret < 0)
+ mlog_errno(ret);
+
+ mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action);
+
+ return ret;
+}
+
/* Generic unblock function for any lockres whose private data is an
* ocfs2_super pointer. */
static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
- int *requeue)
+ struct ocfs2_unblock_ctl *ctl)
{
int status;
struct ocfs2_super *osb;
@@ -2804,7 +3067,7 @@ static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
status = ocfs2_generic_unblock_lock(osb,
lockres,
- requeue,
+ ctl,
NULL);
if (status < 0)
mlog_errno(status);
@@ -2817,7 +3080,7 @@ void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres)
{
int status;
- int requeue = 0;
+ struct ocfs2_unblock_ctl ctl = {0, 0,};
unsigned long flags;
/* Our reference to the lockres in this function can be
@@ -2842,21 +3105,25 @@ void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
goto unqueue;
spin_unlock_irqrestore(&lockres->l_lock, flags);
- status = lockres->l_ops->unblock(lockres, &requeue);
+ status = lockres->l_ops->unblock(lockres, &ctl);
if (status < 0)
mlog_errno(status);
spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
- if (lockres->l_flags & OCFS2_LOCK_FREEING || !requeue) {
+ if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
} else
ocfs2_schedule_blocked_lock(osb, lockres);
mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
- requeue ? "yes" : "no");
+ ctl.requeue ? "yes" : "no");
spin_unlock_irqrestore(&lockres->l_lock, flags);
+ if (ctl.unblock_action != UNBLOCK_CONTINUE
+ && lockres->l_ops->post_unlock)
+ lockres->l_ops->post_unlock(osb, lockres);
+
mlog_exit_void();
}