diff options
Diffstat (limited to 'fs/ocfs2/dlmglue.c')
-rw-r--r-- | fs/ocfs2/dlmglue.c | 546 |
1 files changed, 403 insertions, 143 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c index 4e97dcceaf8..3867244fb14 100644 --- a/fs/ocfs2/dlmglue.c +++ b/fs/ocfs2/dlmglue.c @@ -55,7 +55,6 @@ #include "slot_map.h" #include "super.h" #include "uptodate.h" -#include "vote.h" #include "buffer_head_io.h" @@ -69,6 +68,7 @@ struct ocfs2_mask_waiter { static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres); +static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres); /* * Return value from ->downconvert_worker functions. @@ -153,10 +153,10 @@ struct ocfs2_lock_res_ops { struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); /* - * Optionally called in the downconvert (or "vote") thread - * after a successful downconvert. The lockres will not be - * referenced after this callback is called, so it is safe to - * free memory, etc. + * Optionally called in the downconvert thread after a + * successful downconvert. The lockres will not be referenced + * after this callback is called, so it is safe to free + * memory, etc. * * The exact semantics of when this is called are controlled * by ->downconvert_worker() @@ -225,17 +225,12 @@ static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { .flags = 0, }; -static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = { +static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { .get_osb = ocfs2_get_inode_osb, .check_downconvert = ocfs2_check_meta_downconvert, .set_lvb = ocfs2_set_meta_lvb, - .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, -}; - -static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = { - .get_osb = ocfs2_get_inode_osb, .downconvert_worker = ocfs2_data_convert_worker, - .flags = 0, + .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, }; static struct ocfs2_lock_res_ops ocfs2_super_lops = { @@ -258,10 +253,14 @@ static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { .flags = 0, }; +static struct ocfs2_lock_res_ops ocfs2_flock_lops = { + .get_osb = ocfs2_get_file_osb, + .flags = 0, +}; + static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) { return lockres->l_type == OCFS2_LOCK_TYPE_META || - lockres->l_type == OCFS2_LOCK_TYPE_DATA || lockres->l_type == OCFS2_LOCK_TYPE_RW || lockres->l_type == OCFS2_LOCK_TYPE_OPEN; } @@ -310,12 +309,24 @@ static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, "resource %s: %s\n", dlm_errname(_stat), _func, \ _lockres->l_name, dlm_errmsg(_stat)); \ } while (0) -static void ocfs2_vote_on_unlock(struct ocfs2_super *osb, - struct ocfs2_lock_res *lockres); -static int ocfs2_meta_lock_update(struct inode *inode, +static int ocfs2_downconvert_thread(void *arg); +static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, + struct ocfs2_lock_res *lockres); +static int ocfs2_inode_lock_update(struct inode *inode, struct buffer_head **bh); static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); static inline int ocfs2_highest_compat_lock_level(int level); +static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, + int new_level); +static int ocfs2_downconvert_lock(struct ocfs2_super *osb, + struct ocfs2_lock_res *lockres, + int new_level, + int lvb); +static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, + struct ocfs2_lock_res *lockres); +static int ocfs2_cancel_convert(struct ocfs2_super *osb, + struct ocfs2_lock_res *lockres); + static void ocfs2_build_lock_name(enum ocfs2_lock_type type, u64 blkno, @@ -402,10 +413,7 @@ void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, ops = &ocfs2_inode_rw_lops; break; case OCFS2_LOCK_TYPE_META: - ops = &ocfs2_inode_meta_lops; - break; - case OCFS2_LOCK_TYPE_DATA: - ops = &ocfs2_inode_data_lops; + ops = &ocfs2_inode_inode_lops; break; case OCFS2_LOCK_TYPE_OPEN: ops = &ocfs2_inode_open_lops; @@ -428,6 +436,13 @@ static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) return OCFS2_SB(inode->i_sb); } +static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) +{ + struct ocfs2_file_private *fp = lockres->l_priv; + + return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); +} + static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) { __be64 inode_blkno_be; @@ -508,6 +523,21 @@ static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, &ocfs2_rename_lops, osb); } +void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, + struct ocfs2_file_private *fp) +{ + struct inode *inode = fp->fp_file->f_mapping->host; + struct ocfs2_inode_info *oi = OCFS2_I(inode); + + ocfs2_lock_res_init_once(lockres); + ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, + inode->i_generation, lockres->l_name); + ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, + OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, + fp); + lockres->l_flags |= OCFS2_LOCK_NOCACHE; +} + void ocfs2_lock_res_free(struct ocfs2_lock_res *res) { mlog_entry_void(); @@ -724,6 +754,13 @@ static void ocfs2_blocking_ast(void *opaque, int level) lockres->l_name, level, lockres->l_level, ocfs2_lock_type_string(lockres->l_type)); + /* + * We can skip the bast for locks which don't enable caching - + * they'll be dropped at the earliest possible time anyway. + */ + if (lockres->l_flags & OCFS2_LOCK_NOCACHE) + return; + spin_lock_irqsave(&lockres->l_lock, flags); needs_downconvert = ocfs2_generic_handle_bast(lockres, level); if (needs_downconvert) @@ -732,7 +769,7 @@ static void ocfs2_blocking_ast(void *opaque, int level) wake_up(&lockres->l_event); - ocfs2_kick_vote_thread(osb); + ocfs2_wake_downconvert_thread(osb); } static void ocfs2_locking_ast(void *opaque) @@ -935,6 +972,21 @@ static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, } +static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, + struct ocfs2_lock_res *lockres) +{ + int ret; + + ret = wait_for_completion_interruptible(&mw->mw_complete); + if (ret) + lockres_remove_mask_waiter(lockres, mw); + else + ret = mw->mw_status; + /* Re-arm the completion in case we want to wait on it again */ + INIT_COMPLETION(mw->mw_complete); + return ret; +} + static int ocfs2_cluster_lock(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres, int level, @@ -1089,7 +1141,7 @@ static void ocfs2_cluster_unlock(struct ocfs2_super *osb, mlog_entry_void(); spin_lock_irqsave(&lockres->l_lock, flags); ocfs2_dec_holders(lockres, level); - ocfs2_vote_on_unlock(osb, lockres); + ocfs2_downconvert_on_unlock(osb, lockres); spin_unlock_irqrestore(&lockres->l_lock, flags); mlog_exit_void(); } @@ -1147,13 +1199,7 @@ int ocfs2_create_new_inode_locks(struct inode *inode) * We don't want to use LKM_LOCAL on a meta data lock as they * don't use a generation in their lock names. */ - ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0); - if (ret) { - mlog_errno(ret); - goto bail; - } - - ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1); + ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); if (ret) { mlog_errno(ret); goto bail; @@ -1311,76 +1357,221 @@ out: mlog_exit_void(); } -int ocfs2_data_lock_full(struct inode *inode, - int write, - int arg_flags) +static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, + int level) { - int status = 0, level; - struct ocfs2_lock_res *lockres; - struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); + int ret; + struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); + unsigned long flags; + struct ocfs2_mask_waiter mw; - BUG_ON(!inode); + ocfs2_init_mask_waiter(&mw); - mlog_entry_void(); +retry_cancel: + spin_lock_irqsave(&lockres->l_lock, flags); + if (lockres->l_flags & OCFS2_LOCK_BUSY) { + ret = ocfs2_prepare_cancel_convert(osb, lockres); + if (ret) { + spin_unlock_irqrestore(&lockres->l_lock, flags); + ret = ocfs2_cancel_convert(osb, lockres); + if (ret < 0) { + mlog_errno(ret); + goto out; + } + goto retry_cancel; + } + lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); + spin_unlock_irqrestore(&lockres->l_lock, flags); - mlog(0, "inode %llu take %s DATA lock\n", - (unsigned long long)OCFS2_I(inode)->ip_blkno, - write ? "EXMODE" : "PRMODE"); + ocfs2_wait_for_mask(&mw); + goto retry_cancel; + } - /* We'll allow faking a readonly data lock for - * rodevices. */ - if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) { - if (write) { - status = -EROFS; - mlog_errno(status); + ret = -ERESTARTSYS; + /* + * We may still have gotten the lock, in which case there's no + * point to restarting the syscall. + */ + if (lockres->l_level == level) + ret = 0; + + mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret, + lockres->l_flags, lockres->l_level, lockres->l_action); + + spin_unlock_irqrestore(&lockres->l_lock, flags); + +out: + return ret; +} + +/* + * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of + * flock() calls. The locking approach this requires is sufficiently + * different from all other cluster lock types that we implement a + * seperate path to the "low-level" dlm calls. In particular: + * + * - No optimization of lock levels is done - we take at exactly + * what's been requested. + * + * - No lock caching is employed. We immediately downconvert to + * no-lock at unlock time. This also means flock locks never go on + * the blocking list). + * + * - Since userspace can trivially deadlock itself with flock, we make + * sure to allow cancellation of a misbehaving applications flock() + * request. + * + * - Access to any flock lockres doesn't require concurrency, so we + * can simplify the code by requiring the caller to guarantee + * serialization of dlmglue flock calls. + */ +int ocfs2_file_lock(struct file *file, int ex, int trylock) +{ + int ret, level = ex ? LKM_EXMODE : LKM_PRMODE; + unsigned int lkm_flags = trylock ? LKM_NOQUEUE : 0; + unsigned long flags; + struct ocfs2_file_private *fp = file->private_data; + struct ocfs2_lock_res *lockres = &fp->fp_flock; + struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); + struct ocfs2_mask_waiter mw; + + ocfs2_init_mask_waiter(&mw); + + if ((lockres->l_flags & OCFS2_LOCK_BUSY) || + (lockres->l_level > LKM_NLMODE)) { + mlog(ML_ERROR, + "File lock \"%s\" has busy or locked state: flags: 0x%lx, " + "level: %u\n", lockres->l_name, lockres->l_flags, + lockres->l_level); + return -EINVAL; + } + + spin_lock_irqsave(&lockres->l_lock, flags); + if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { + lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); + spin_unlock_irqrestore(&lockres->l_lock, flags); + + /* + * Get the lock at NLMODE to start - that way we + * can cancel the upconvert request if need be. + */ + ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0); + if (ret < 0) { + mlog_errno(ret); + goto out; } - goto out; + + ret = ocfs2_wait_for_mask(&mw); + if (ret) { + mlog_errno(ret); + goto out; + } + spin_lock_irqsave(&lockres->l_lock, flags); } - if (ocfs2_mount_local(osb)) - goto out; + lockres->l_action = OCFS2_AST_CONVERT; + lkm_flags |= LKM_CONVERT; + lockres->l_requested = level; + lockres_or_flags(lockres, OCFS2_LOCK_BUSY); - lockres = &OCFS2_I(inode)->ip_data_lockres; + lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); + spin_unlock_irqrestore(&lockres->l_lock, flags); - level = write ? LKM_EXMODE : LKM_PRMODE; + ret = dlmlock(osb->dlm, level, &lockres->l_lksb, lkm_flags, + lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1, + ocfs2_locking_ast, lockres, ocfs2_blocking_ast); + if (ret != DLM_NORMAL) { + if (trylock && ret == DLM_NOTQUEUED) + ret = -EAGAIN; + else { + ocfs2_log_dlm_error("dlmlock", ret, lockres); + ret = -EINVAL; + } - status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, - 0, arg_flags); - if (status < 0 && status != -EAGAIN) - mlog_errno(status); + ocfs2_recover_from_dlm_error(lockres, 1); + lockres_remove_mask_waiter(lockres, &mw); + goto out; + } + + ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); + if (ret == -ERESTARTSYS) { + /* + * Userspace can cause deadlock itself with + * flock(). Current behavior locally is to allow the + * deadlock, but abort the system call if a signal is + * received. We follow this example, otherwise a + * poorly written program could sit in kernel until + * reboot. + * + * Handling this is a bit more complicated for Ocfs2 + * though. We can't exit this function with an + * outstanding lock request, so a cancel convert is + * required. We intentionally overwrite 'ret' - if the + * cancel fails and the lock was granted, it's easier + * to just bubble sucess back up to the user. + */ + ret = ocfs2_flock_handle_signal(lockres, level); + } out: - mlog_exit(status); - return status; + + mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", + lockres->l_name, ex, trylock, ret); + return ret; } -/* see ocfs2_meta_lock_with_page() */ -int ocfs2_data_lock_with_page(struct inode *inode, - int write, - struct page *page) +void ocfs2_file_unlock(struct file *file) { int ret; + unsigned long flags; + struct ocfs2_file_private *fp = file->private_data; + struct ocfs2_lock_res *lockres = &fp->fp_flock; + struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); + struct ocfs2_mask_waiter mw; - ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK); - if (ret == -EAGAIN) { - unlock_page(page); - if (ocfs2_data_lock(inode, write) == 0) - ocfs2_data_unlock(inode, write); - ret = AOP_TRUNCATED_PAGE; + ocfs2_init_mask_waiter(&mw); + + if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) + return; + + if (lockres->l_level == LKM_NLMODE) + return; + + mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", + lockres->l_name, lockres->l_flags, lockres->l_level, + lockres->l_action); + + spin_lock_irqsave(&lockres->l_lock, flags); + /* + * Fake a blocking ast for the downconvert code. + */ + lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); + lockres->l_blocking = LKM_EXMODE; + + ocfs2_prepare_downconvert(lockres, LKM_NLMODE); + lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); + spin_unlock_irqrestore(&lockres->l_lock, flags); + + ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0); + if (ret) { + mlog_errno(ret); + return; } - return ret; + ret = ocfs2_wait_for_mask(&mw); + if (ret) + mlog_errno(ret); } -static void ocfs2_vote_on_unlock(struct ocfs2_super *osb, - struct ocfs2_lock_res *lockres) +static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, + struct ocfs2_lock_res *lockres) { int kick = 0; mlog_entry_void(); /* If we know that another node is waiting on our lock, kick - * the vote thread * pre-emptively when we reach a release + * the downconvert thread * pre-emptively when we reach a release * condition. */ if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { switch(lockres->l_blocking) { @@ -1398,27 +1589,7 @@ static void ocfs2_vote_on_unlock(struct ocfs2_super *osb, } if (kick) - ocfs2_kick_vote_thread(osb); - - mlog_exit_void(); -} - -void ocfs2_data_unlock(struct inode *inode, - int write) -{ - int level = write ? LKM_EXMODE : LKM_PRMODE; - struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres; - struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); - - mlog_entry_void(); - - mlog(0, "inode %llu drop %s DATA lock\n", - (unsigned long long)OCFS2_I(inode)->ip_blkno, - write ? "EXMODE" : "PRMODE"); - - if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && - !ocfs2_mount_local(osb)) - ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); + ocfs2_wake_downconvert_thread(osb); mlog_exit_void(); } @@ -1442,11 +1613,11 @@ static u64 ocfs2_pack_timespec(struct timespec *spec) /* Call this with the lockres locked. I am reasonably sure we don't * need ip_lock in this function as anyone who would be changing those - * values is supposed to be blocked in ocfs2_meta_lock right now. */ + * values is supposed to be blocked in ocfs2_inode_lock right now. */ static void __ocfs2_stuff_meta_lvb(struct inode *inode) { struct ocfs2_inode_info *oi = OCFS2_I(inode); - struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres; + struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; struct ocfs2_meta_lvb *lvb; mlog_entry_void(); @@ -1496,7 +1667,7 @@ static void ocfs2_unpack_timespec(struct timespec *spec, static void ocfs2_refresh_inode_from_lvb(struct inode *inode) { struct ocfs2_inode_info *oi = OCFS2_I(inode); - struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres; + struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; struct ocfs2_meta_lvb *lvb; mlog_entry_void(); @@ -1604,12 +1775,12 @@ static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockre } /* may or may not return a bh if it went to disk. */ -static int ocfs2_meta_lock_update(struct inode *inode, +static int ocfs2_inode_lock_update(struct inode *inode, struct buffer_head **bh) { int status = 0; struct ocfs2_inode_info *oi = OCFS2_I(inode); - struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres; + struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; struct ocfs2_dinode *fe; struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); @@ -1721,7 +1892,7 @@ static int ocfs2_assign_bh(struct inode *inode, * returns < 0 error if the callback will never be called, otherwise * the result of the lock will be communicated via the callback. */ -int ocfs2_meta_lock_full(struct inode *inode, +int ocfs2_inode_lock_full(struct inode *inode, struct buffer_head **ret_bh, int ex, int arg_flags) @@ -1756,7 +1927,7 @@ int ocfs2_meta_lock_full(struct inode *inode, wait_event(osb->recovery_event, ocfs2_node_map_is_empty(osb, &osb->recovery_map)); - lockres = &OCFS2_I(inode)->ip_meta_lockres; + lockres = &OCFS2_I(inode)->ip_inode_lockres; level = ex ? LKM_EXMODE : LKM_PRMODE; dlm_flags = 0; if (arg_flags & OCFS2_META_LOCK_NOQUEUE) @@ -1795,11 +1966,11 @@ local: } /* This is fun. The caller may want a bh back, or it may - * not. ocfs2_meta_lock_update definitely wants one in, but + * not. ocfs2_inode_lock_update definitely wants one in, but * may or may not read one, depending on what's in the * LVB. The result of all of this is that we've *only* gone to * disk if we have to, so the complexity is worthwhile. */ - status = ocfs2_meta_lock_update(inode, &local_bh); + status = ocfs2_inode_lock_update(inode, &local_bh); if (status < 0) { if (status != -ENOENT) mlog_errno(status); @@ -1821,7 +1992,7 @@ bail: *ret_bh = NULL; } if (acquired) - ocfs2_meta_unlock(inode, ex); + ocfs2_inode_unlock(inode, ex); } if (local_bh) @@ -1832,19 +2003,20 @@ bail: } /* - * This is working around a lock inversion between tasks acquiring DLM locks - * while holding a page lock and the vote thread which blocks dlm lock acquiry - * while acquiring page locks. + * This is working around a lock inversion between tasks acquiring DLM + * locks while holding a page lock and the downconvert thread which + * blocks dlm lock acquiry while acquiring page locks. * * ** These _with_page variantes are only intended to be called from aop * methods that hold page locks and return a very specific *positive* error * code that aop methods pass up to the VFS -- test for errors with != 0. ** * - * The DLM is called such that it returns -EAGAIN if it would have blocked - * waiting for the vote thread. In that case we unlock our page so the vote - * thread can make progress. Once we've done this we have to return - * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up - * into the VFS who will then immediately retry the aop call. + * The DLM is called such that it returns -EAGAIN if it would have + * blocked waiting for the downconvert thread. In that case we unlock + * our page so the downconvert thread can make progress. Once we've + * done this we have to return AOP_TRUNCATED_PAGE so the aop method + * that called us can bubble that back up into the VFS who will then + * immediately retry the aop call. * * We do a blocking lock and immediate unlock before returning, though, so that * the lock has a great chance of being cached on this node by the time the VFS @@ -1852,32 +2024,32 @@ bail: * ping locks back and forth, but that's a risk we're willing to take to avoid * the lock inversion simply. */ -int ocfs2_meta_lock_with_page(struct inode *inode, +int ocfs2_inode_lock_with_page(struct inode *inode, struct buffer_head **ret_bh, int ex, struct page *page) { int ret; - ret = ocfs2_meta_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); + ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); if (ret == -EAGAIN) { unlock_page(page); - if (ocfs2_meta_lock(inode, ret_bh, ex) == 0) - ocfs2_meta_unlock(inode, ex); + if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) + ocfs2_inode_unlock(inode, ex); ret = AOP_TRUNCATED_PAGE; } return ret; } -int ocfs2_meta_lock_atime(struct inode *inode, +int ocfs2_inode_lock_atime(struct inode *inode, struct vfsmount *vfsmnt, int *level) { int ret; mlog_entry_void(); - ret = ocfs2_meta_lock(inode, NULL, 0); + ret = ocfs2_inode_lock(inode, NULL, 0); if (ret < 0) { mlog_errno(ret); return ret; @@ -1890,8 +2062,8 @@ int ocfs2_meta_lock_atime(struct inode *inode, if (ocfs2_should_update_atime(inode, vfsmnt)) { struct buffer_head *bh = NULL; - ocfs2_meta_unlock(inode, 0); - ret = ocfs2_meta_lock(inode, &bh, 1); + ocfs2_inode_unlock(inode, 0); + ret = ocfs2_inode_lock(inode, &bh, 1); if (ret < 0) { mlog_errno(ret); return ret; @@ -1908,11 +2080,11 @@ int ocfs2_meta_lock_atime(struct inode *inode, return ret; } -void ocfs2_meta_unlock(struct inode *inode, +void ocfs2_inode_unlock(struct inode *inode, int ex) { int level = ex ? LKM_EXMODE : LKM_PRMODE; - struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres; + struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); mlog_entry_void(); @@ -2320,11 +2492,11 @@ int ocfs2_dlm_init(struct ocfs2_super *osb) goto bail; } - /* launch vote thread */ - osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote"); - if (IS_ERR(osb->vote_task)) { - status = PTR_ERR(osb->vote_task); - osb->vote_task = NULL; + /* launch downconvert thread */ + osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc"); + if (IS_ERR(osb->dc_task)) { + status = PTR_ERR(osb->dc_task); + osb->dc_task = NULL; mlog_errno(status); goto bail; } @@ -2353,8 +2525,8 @@ local: bail: if (status < 0) { ocfs2_dlm_shutdown_debug(osb); - if (osb->vote_task) - kthread_stop(osb->vote_task); + if (osb->dc_task) + kthread_stop(osb->dc_task); } mlog_exit(status); @@ -2369,9 +2541,9 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb) ocfs2_drop_osb_locks(osb); - if (osb->vote_task) { - kthread_stop(osb->vote_task); - osb->vote_task = NULL; + if (osb->dc_task) { + kthread_stop(osb->dc_task); + osb->dc_task = NULL; } ocfs2_lock_res_free(&osb->osb_super_lockres); @@ -2527,7 +2699,7 @@ out: /* Mark the lockres as being dropped. It will no longer be * queued if blocking, but we still may have to wait on it - * being dequeued from the vote thread before we can consider + * being dequeued from the downconvert thread before we can consider * it safe to drop. * * You can *not* attempt to call cluster_lock on this lockres anymore. */ @@ -2590,14 +2762,7 @@ int ocfs2_drop_inode_locks(struct inode *inode) status = err; err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), - &OCFS2_I(inode)->ip_data_lockres); - if (err < 0) - mlog_errno(err); - if (err < 0 && !status) - status = err; - - err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), - &OCFS2_I(inode)->ip_meta_lockres); + &OCFS2_I(inode)->ip_inode_lockres); if (err < 0) mlog_errno(err); if (err < 0 && !status) @@ -2850,6 +3015,9 @@ static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, inode = ocfs2_lock_res_inode(lockres); mapping = inode->i_mapping; + if (S_ISREG(inode->i_mode)) + goto out; + /* * We need this before the filemap_fdatawrite() so that it can * transfer the dirty bit from the PTE to the @@ -2875,6 +3043,7 @@ static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, filemap_fdatawait(mapping); } +out: return UNBLOCK_CONTINUE; } @@ -2903,7 +3072,7 @@ static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) /* * Does the final reference drop on our dentry lock. Right now this - * happens in the vote thread, but we could choose to simplify the + * happens in the downconvert thread, but we could choose to simplify the * dlmglue API and push these off to the ocfs2_wq in the future. */ static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, @@ -3042,7 +3211,7 @@ void ocfs2_process_blocked_lock(struct ocfs2_super *osb, mlog(0, "lockres %s blocked.\n", lockres->l_name); /* Detect whether a lock has been marked as going away while - * the vote thread was processing other things. A lock can + * the downconvert thread was processing other things. A lock can * still be marked with OCFS2_LOCK_FREEING after this check, * but short circuiting here will still save us some * performance. */ @@ -3091,13 +3260,104 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); - spin_lock(&osb->vote_task_lock); + spin_lock(&osb->dc_task_lock); if (list_empty(&lockres->l_blocked_list)) { list_add_tail(&lockres->l_blocked_list, &osb->blocked_lock_list); osb->blocked_lock_count++; } - spin_unlock(&osb->vote_task_lock); + spin_unlock(&osb->dc_task_lock); + + mlog_exit_void(); +} + +static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) +{ + unsigned long processed; + struct ocfs2_lock_res *lockres; + + mlog_entry_void(); + + spin_lock(&osb->dc_task_lock); + /* grab this early so we know to try again if a state change and + * wake happens part-way through our work */ + osb->dc_work_sequence = osb->dc_wake_sequence; + + processed = osb->blocked_lock_count; + while (processed) { + BUG_ON(list_empty(&osb->blocked_lock_list)); + + lockres = list_entry(osb->blocked_lock_list.next, + struct ocfs2_lock_res, l_blocked_list); + list_del_init(&lockres->l_blocked_list); + osb->blocked_lock_count--; + spin_unlock(&osb->dc_task_lock); + + BUG_ON(!processed); + processed--; + + ocfs2_process_blocked_lock(osb, lockres); + + spin_lock(&osb->dc_task_lock); + } + spin_unlock(&osb->dc_task_lock); mlog_exit_void(); } + +static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) +{ + int empty = 0; + + spin_lock(&osb->dc_task_lock); + if (list_empty(&osb->blocked_lock_list)) + empty = 1; + + spin_unlock(&osb->dc_task_lock); + return empty; +} + +static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) +{ + int should_wake = 0; + + spin_lock(&osb->dc_task_lock); + if (osb->dc_work_sequence != osb->dc_wake_sequence) + should_wake = 1; + spin_unlock(&osb->dc_task_lock); + + return should_wake; +} + +int ocfs2_downconvert_thread(void *arg) +{ + int status = 0; + struct ocfs2_super *osb = arg; + + /* only quit once we've been asked to stop and there is no more + * work available */ + while (!(kthread_should_stop() && + ocfs2_downconvert_thread_lists_empty(osb))) { + + wait_event_interruptible(osb->dc_event, + ocfs2_downconvert_thread_should_wake(osb) || + kthread_should_stop()); + + mlog(0, "downconvert_thread: awoken\n"); + + ocfs2_downconvert_thread_do_work(osb); + } + + osb->dc_task = NULL; + return status; +} + +void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) +{ + spin_lock(&osb->dc_task_lock); + /* make sure the voting thread gets a swipe at whatever changes + * the caller may have made to the voting state */ + osb->dc_wake_sequence++; + spin_unlock(&osb->dc_task_lock); + wake_up(&osb->dc_event); +} |