diff options
author | Sage Weil <sage@newdream.net> | 2011-08-03 09:58:09 -0700 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2011-10-25 16:10:14 -0700 |
commit | 7c272194e66e91830b90f6202e61c69f8590f1eb (patch) | |
tree | 9a899f357cbf005235fd80ab3b4a240e42498b54 /fs | |
parent | c3b92c8787367a8bb53d57d9789b558f1295cc96 (diff) |
ceph: make readpages fully async
When we get a ->readpages() aop, submit async reads for all page ranges
in the provided page list. Lock the pages immediately, so that VFS/MM
will block until the reads complete.
Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs')
-rw-r--r-- | fs/ceph/addr.c | 185 |
1 files changed, 115 insertions, 70 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 5a3953db811..5bb39a50f90 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -228,102 +228,147 @@ static int ceph_readpage(struct file *filp, struct page *page) } /* - * Build a vector of contiguous pages from the provided page list. + * Finish an async read(ahead) op. */ -static struct page **page_vector_from_list(struct list_head *page_list, - unsigned *nr_pages) +static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) { - struct page **pages; - struct page *page; - int next_index, contig_pages = 0; + struct inode *inode = req->r_inode; + struct ceph_osd_reply_head *replyhead; + int rc, bytes; + int i; - /* build page vector */ - pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS); - if (!pages) - return ERR_PTR(-ENOMEM); + /* parse reply */ + replyhead = msg->front.iov_base; + WARN_ON(le32_to_cpu(replyhead->num_ops) == 0); + rc = le32_to_cpu(replyhead->result); + bytes = le32_to_cpu(msg->hdr.data_len); - BUG_ON(list_empty(page_list)); - next_index = list_entry(page_list->prev, struct page, lru)->index; - list_for_each_entry_reverse(page, page_list, lru) { - if (page->index == next_index) { - dout("readpages page %d %p\n", contig_pages, page); - pages[contig_pages] = page; - contig_pages++; - next_index++; - } else { - break; + dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); + + /* unlock all pages, zeroing any data we didn't read */ + for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) { + struct page *page = req->r_pages[i]; + + if (bytes < (int)PAGE_CACHE_SIZE) { + /* zero (remainder of) page */ + int s = bytes < 0 ? 0 : bytes; + zero_user_segment(page, s, PAGE_CACHE_SIZE); } + dout("finish_read %p uptodate %p idx %lu\n", inode, page, + page->index); + flush_dcache_page(page); + SetPageUptodate(page); + unlock_page(page); + page_cache_release(page); } - *nr_pages = contig_pages; - return pages; + kfree(req->r_pages); } /* - * Read multiple pages. Leave pages we don't read + unlock in page_list; - * the caller (VM) cleans them up. + * start an async read(ahead) operation. return nr_pages we submitted + * a read for on success, or negative error code. */ -static int ceph_readpages(struct file *file, struct address_space *mapping, - struct list_head *page_list, unsigned nr_pages) +static int start_read(struct inode *inode, struct list_head *page_list) { - struct inode *inode = file->f_dentry->d_inode; - struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->client->osdc; - int rc = 0; - struct page **pages; - loff_t offset; + struct ceph_inode_info *ci = ceph_inode(inode); + struct page *page = list_entry(page_list->prev, struct page, lru); + struct ceph_osd_request *req; + u64 off; u64 len; + int i; + struct page **pages; + pgoff_t next_index; + int nr_pages = 0; + int ret; - dout("readpages %p file %p nr_pages %d\n", - inode, file, nr_pages); - - pages = page_vector_from_list(page_list, &nr_pages); - if (IS_ERR(pages)) - return PTR_ERR(pages); + off = page->index << PAGE_CACHE_SHIFT; - /* guess read extent */ - offset = pages[0]->index << PAGE_CACHE_SHIFT; + /* count pages */ + next_index = page->index; + list_for_each_entry_reverse(page, page_list, lru) { + if (page->index != next_index) + break; + nr_pages++; + next_index++; + } len = nr_pages << PAGE_CACHE_SHIFT; - rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, - offset, &len, - ci->i_truncate_seq, ci->i_truncate_size, - pages, nr_pages, 0); - if (rc == -ENOENT) - rc = 0; - if (rc < 0) - goto out; - - for (; !list_empty(page_list) && len > 0; - rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) { - struct page *page = - list_entry(page_list->prev, struct page, lru); + dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages, + off, len); + + req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode), + off, &len, + CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, + NULL, 0, + ci->i_truncate_seq, ci->i_truncate_size, + NULL, false, 1, 0); + if (!req) + return -ENOMEM; + /* build page vector */ + nr_pages = len >> PAGE_CACHE_SHIFT; + pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS); + ret = -ENOMEM; + if (!pages) + goto out; + for (i = 0; i < nr_pages; ++i) { + page = list_entry(page_list->prev, struct page, lru); + BUG_ON(PageLocked(page)); list_del(&page->lru); - - if (rc < (int)PAGE_CACHE_SIZE) { - /* zero (remainder of) page */ - int s = rc < 0 ? 0 : rc; - zero_user_segment(page, s, PAGE_CACHE_SIZE); - } - - if (add_to_page_cache_lru(page, mapping, page->index, + + dout("start_read %p adding %p idx %lu\n", inode, page, + page->index); + if (add_to_page_cache_lru(page, &inode->i_data, page->index, GFP_NOFS)) { page_cache_release(page); - dout("readpages %p add_to_page_cache failed %p\n", + dout("start_read %p add_to_page_cache failed %p\n", inode, page); - continue; + nr_pages = i; + goto out_pages; } - dout("readpages %p adding %p idx %lu\n", inode, page, - page->index); - flush_dcache_page(page); - SetPageUptodate(page); - unlock_page(page); - page_cache_release(page); + pages[i] = page; } - rc = 0; + req->r_pages = pages; + req->r_num_pages = nr_pages; + req->r_callback = finish_read; + req->r_inode = inode; + + dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len); + ret = ceph_osdc_start_request(osdc, req, false); + if (ret < 0) + goto out_pages; + ceph_osdc_put_request(req); + return nr_pages; -out: +out_pages: + ceph_release_page_vector(pages, nr_pages); kfree(pages); +out: + ceph_osdc_put_request(req); + return ret; +} + + +/* + * Read multiple pages. Leave pages we don't read + unlock in page_list; + * the caller (VM) cleans them up. + */ +static int ceph_readpages(struct file *file, struct address_space *mapping, + struct list_head *page_list, unsigned nr_pages) +{ + struct inode *inode = file->f_dentry->d_inode; + int rc = 0; + + dout("readpages %p file %p nr_pages %d\n", inode, file, nr_pages); + while (!list_empty(page_list)) { + rc = start_read(inode, page_list); + if (rc < 0) + goto out; + BUG_ON(rc == 0); + } +out: + dout("readpages %p file %p ret %d\n", inode, file, rc); return rc; } |