diff options
-rw-r--r-- | fs/ocfs2/heartbeat.c | 1 | ||||
-rw-r--r-- | fs/ocfs2/inode.c | 46 | ||||
-rw-r--r-- | fs/ocfs2/journal.c | 124 | ||||
-rw-r--r-- | fs/ocfs2/ocfs2.h | 4 | ||||
-rw-r--r-- | fs/ocfs2/super.c | 11 |
5 files changed, 154 insertions, 32 deletions
diff --git a/fs/ocfs2/heartbeat.c b/fs/ocfs2/heartbeat.c index 0bbd22f46c8..cbfd45a97a6 100644 --- a/fs/ocfs2/heartbeat.c +++ b/fs/ocfs2/heartbeat.c @@ -67,6 +67,7 @@ void ocfs2_init_node_maps(struct ocfs2_super *osb) ocfs2_node_map_init(&osb->mounted_map); ocfs2_node_map_init(&osb->recovery_map); ocfs2_node_map_init(&osb->umount_map); + ocfs2_node_map_init(&osb->osb_recovering_orphan_dirs); } static void ocfs2_do_node_down(int node_num, diff --git a/fs/ocfs2/inode.c b/fs/ocfs2/inode.c index 8122489c576..315472a5c19 100644 --- a/fs/ocfs2/inode.c +++ b/fs/ocfs2/inode.c @@ -41,6 +41,7 @@ #include "dlmglue.h" #include "extent_map.h" #include "file.h" +#include "heartbeat.h" #include "inode.h" #include "journal.h" #include "namei.h" @@ -544,6 +545,42 @@ bail: return status; } +/* + * Serialize with orphan dir recovery. If the process doing + * recovery on this orphan dir does an iget() with the dir + * i_mutex held, we'll deadlock here. Instead we detect this + * and exit early - recovery will wipe this inode for us. + */ +static int ocfs2_check_orphan_recovery_state(struct ocfs2_super *osb, + int slot) +{ + int ret = 0; + + spin_lock(&osb->osb_lock); + if (ocfs2_node_map_test_bit(osb, &osb->osb_recovering_orphan_dirs, slot)) { + mlog(0, "Recovery is happening on orphan dir %d, will skip " + "this inode\n", slot); + ret = -EDEADLK; + goto out; + } + /* This signals to the orphan recovery process that it should + * wait for us to handle the wipe. */ + osb->osb_orphan_wipes[slot]++; +out: + spin_unlock(&osb->osb_lock); + return ret; +} + +static void ocfs2_signal_wipe_completion(struct ocfs2_super *osb, + int slot) +{ + spin_lock(&osb->osb_lock); + osb->osb_orphan_wipes[slot]--; + spin_unlock(&osb->osb_lock); + + wake_up(&osb->osb_wipe_event); +} + static int ocfs2_wipe_inode(struct inode *inode, struct buffer_head *di_bh) { @@ -555,6 +592,11 @@ static int ocfs2_wipe_inode(struct inode *inode, /* We've already voted on this so it should be readonly - no * spinlock needed. */ orphaned_slot = OCFS2_I(inode)->ip_orphaned_slot; + + status = ocfs2_check_orphan_recovery_state(osb, orphaned_slot); + if (status) + return status; + orphan_dir_inode = ocfs2_get_system_file_inode(osb, ORPHAN_DIR_SYSTEM_INODE, orphaned_slot); @@ -597,6 +639,7 @@ bail_unlock_dir: brelse(orphan_dir_bh); bail: iput(orphan_dir_inode); + ocfs2_signal_wipe_completion(osb, orphaned_slot); return status; } @@ -822,7 +865,8 @@ void ocfs2_delete_inode(struct inode *inode) status = ocfs2_wipe_inode(inode, di_bh); if (status < 0) { - mlog_errno(status); + if (status != -EDEADLK) + mlog_errno(status); goto bail_unlock_inode; } diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c index d329c9df90a..4be801f4559 100644 --- a/fs/ocfs2/journal.c +++ b/fs/ocfs2/journal.c @@ -1408,21 +1408,17 @@ bail: return status; } -static int ocfs2_recover_orphans(struct ocfs2_super *osb, - int slot) +static int ocfs2_queue_orphans(struct ocfs2_super *osb, + int slot, + struct inode **head) { - int status = 0; - int have_disk_lock = 0; - struct inode *inode = NULL; - struct inode *iter; + int status; struct inode *orphan_dir_inode = NULL; + struct inode *iter; unsigned long offset, blk, local; struct buffer_head *bh = NULL; struct ocfs2_dir_entry *de; struct super_block *sb = osb->sb; - struct ocfs2_inode_info *oi; - - mlog(0, "Recover inodes from orphan dir in slot %d\n", slot); orphan_dir_inode = ocfs2_get_system_file_inode(osb, ORPHAN_DIR_SYSTEM_INODE, @@ -1430,17 +1426,15 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb, if (!orphan_dir_inode) { status = -ENOENT; mlog_errno(status); - goto out; - } + return status; + } mutex_lock(&orphan_dir_inode->i_mutex); status = ocfs2_meta_lock(orphan_dir_inode, NULL, NULL, 0); if (status < 0) { - mutex_unlock(&orphan_dir_inode->i_mutex); mlog_errno(status); goto out; } - have_disk_lock = 1; offset = 0; iter = NULL; @@ -1451,11 +1445,10 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb, if (!bh) status = -EINVAL; if (status < 0) { - mutex_unlock(&orphan_dir_inode->i_mutex); if (bh) brelse(bh); mlog_errno(status); - goto out; + goto out_unlock; } local = 0; @@ -1465,11 +1458,10 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb, if (!ocfs2_check_dir_entry(orphan_dir_inode, de, bh, local)) { - mutex_unlock(&orphan_dir_inode->i_mutex); status = -EINVAL; mlog_errno(status); brelse(bh); - goto out; + goto out_unlock; } local += le16_to_cpu(de->rec_len); @@ -1504,18 +1496,95 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb, mlog(0, "queue orphan %"MLFu64"\n", OCFS2_I(iter)->ip_blkno); - OCFS2_I(iter)->ip_next_orphan = inode; - inode = iter; + /* No locking is required for the next_orphan + * queue as there is only ever a single + * process doing orphan recovery. */ + OCFS2_I(iter)->ip_next_orphan = *head; + *head = iter; } brelse(bh); } - mutex_unlock(&orphan_dir_inode->i_mutex); +out_unlock: ocfs2_meta_unlock(orphan_dir_inode, 0); - have_disk_lock = 0; - +out: + mutex_unlock(&orphan_dir_inode->i_mutex); iput(orphan_dir_inode); - orphan_dir_inode = NULL; + return status; +} + +static int ocfs2_orphan_recovery_can_continue(struct ocfs2_super *osb, + int slot) +{ + int ret; + + spin_lock(&osb->osb_lock); + ret = !osb->osb_orphan_wipes[slot]; + spin_unlock(&osb->osb_lock); + return ret; +} + +static void ocfs2_mark_recovering_orphan_dir(struct ocfs2_super *osb, + int slot) +{ + spin_lock(&osb->osb_lock); + /* Mark ourselves such that new processes in delete_inode() + * know to quit early. */ + ocfs2_node_map_set_bit(osb, &osb->osb_recovering_orphan_dirs, slot); + while (osb->osb_orphan_wipes[slot]) { + /* If any processes are already in the middle of an + * orphan wipe on this dir, then we need to wait for + * them. */ + spin_unlock(&osb->osb_lock); + wait_event_interruptible(osb->osb_wipe_event, + ocfs2_orphan_recovery_can_continue(osb, slot)); + spin_lock(&osb->osb_lock); + } + spin_unlock(&osb->osb_lock); +} + +static void ocfs2_clear_recovering_orphan_dir(struct ocfs2_super *osb, + int slot) +{ + ocfs2_node_map_clear_bit(osb, &osb->osb_recovering_orphan_dirs, slot); +} + +/* + * Orphan recovery. Each mounted node has it's own orphan dir which we + * must run during recovery. Our strategy here is to build a list of + * the inodes in the orphan dir and iget/iput them. The VFS does + * (most) of the rest of the work. + * + * Orphan recovery can happen at any time, not just mount so we have a + * couple of extra considerations. + * + * - We grab as many inodes as we can under the orphan dir lock - + * doing iget() outside the orphan dir risks getting a reference on + * an invalid inode. + * - We must be sure not to deadlock with other processes on the + * system wanting to run delete_inode(). This can happen when they go + * to lock the orphan dir and the orphan recovery process attempts to + * iget() inside the orphan dir lock. This can be avoided by + * advertising our state to ocfs2_delete_inode(). + */ +static int ocfs2_recover_orphans(struct ocfs2_super *osb, + int slot) +{ + int ret = 0; + struct inode *inode = NULL; + struct inode *iter; + struct ocfs2_inode_info *oi; + + mlog(0, "Recover inodes from orphan dir in slot %d\n", slot); + + ocfs2_mark_recovering_orphan_dir(osb, slot); + ret = ocfs2_queue_orphans(osb, slot, &inode); + ocfs2_clear_recovering_orphan_dir(osb, slot); + + /* Error here should be noted, but we want to continue with as + * many queued inodes as we've got. */ + if (ret) + mlog_errno(ret); while (inode) { oi = OCFS2_I(inode); @@ -1541,14 +1610,7 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb, inode = iter; } -out: - if (have_disk_lock) - ocfs2_meta_unlock(orphan_dir_inode, 0); - - if (orphan_dir_inode) - iput(orphan_dir_inode); - - return status; + return ret; } static int ocfs2_wait_on_mount(struct ocfs2_super *osb) diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h index 19360e3d842..e89de9b6e49 100644 --- a/fs/ocfs2/ocfs2.h +++ b/fs/ocfs2/ocfs2.h @@ -287,6 +287,10 @@ struct ocfs2_super struct inode *osb_tl_inode; struct buffer_head *osb_tl_bh; struct work_struct osb_truncate_log_wq; + + struct ocfs2_node_map osb_recovering_orphan_dirs; + unsigned int *osb_orphan_wipes; + wait_queue_head_t osb_wipe_event; }; #define OCFS2_SB(sb) ((struct ocfs2_super *)(sb)->s_fs_info) diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c index 046824b6b62..8dd3aafec49 100644 --- a/fs/ocfs2/super.c +++ b/fs/ocfs2/super.c @@ -1325,6 +1325,16 @@ static int ocfs2_initialize_super(struct super_block *sb, } mlog(ML_NOTICE, "max_slots for this device: %u\n", osb->max_slots); + init_waitqueue_head(&osb->osb_wipe_event); + osb->osb_orphan_wipes = kcalloc(osb->max_slots, + sizeof(*osb->osb_orphan_wipes), + GFP_KERNEL); + if (!osb->osb_orphan_wipes) { + status = -ENOMEM; + mlog_errno(status); + goto bail; + } + osb->s_feature_compat = le32_to_cpu(OCFS2_RAW_SB(di)->s_feature_compat); osb->s_feature_ro_compat = @@ -1638,6 +1648,7 @@ static void ocfs2_delete_osb(struct ocfs2_super *osb) if (osb->slot_info) ocfs2_free_slot_info(osb->slot_info); + kfree(osb->osb_orphan_wipes); /* FIXME * This belongs in journal shutdown, but because we have to * allocate osb->journal at the start of ocfs2_initalize_osb(), |