summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/ceph/messenger.c219
-rw-r--r--fs/ceph/messenger.h4
-rw-r--r--fs/ceph/osd_client.c249
-rw-r--r--fs/ceph/osd_client.h61
-rw-r--r--fs/ceph/pagelist.c2
-rw-r--r--fs/ceph/pagelist.h2
6 files changed, 436 insertions, 101 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 2502d76fcec..17a09b32a59 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -9,6 +9,8 @@
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
+#include <linux/bio.h>
+#include <linux/blkdev.h>
#include <net/tcp.h>
#include "super.h"
@@ -529,8 +531,11 @@ static void prepare_write_message(struct ceph_connection *con)
if (le32_to_cpu(m->hdr.data_len) > 0) {
/* initialize page iterator */
con->out_msg_pos.page = 0;
- con->out_msg_pos.page_pos =
- le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+ if (m->pages)
+ con->out_msg_pos.page_pos =
+ le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+ else
+ con->out_msg_pos.page_pos = 0;
con->out_msg_pos.data_pos = 0;
con->out_msg_pos.did_page_crc = 0;
con->out_more = 1; /* data + footer will follow */
@@ -712,6 +717,31 @@ out:
return ret; /* done! */
}
+#ifdef CONFIG_BLOCK
+static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+{
+ if (!bio) {
+ *iter = NULL;
+ *seg = 0;
+ return;
+ }
+ *iter = bio;
+ *seg = bio->bi_idx;
+}
+
+static void iter_bio_next(struct bio **bio_iter, int *seg)
+{
+ if (*bio_iter == NULL)
+ return;
+
+ BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+
+ (*seg)++;
+ if (*seg == (*bio_iter)->bi_vcnt)
+ init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+}
+#endif
+
/*
* Write as much message data payload as we can. If we finish, queue
* up the footer.
@@ -726,21 +756,46 @@ static int write_partial_msg_pages(struct ceph_connection *con)
size_t len;
int crc = con->msgr->nocrc;
int ret;
+ int total_max_write;
+ int in_trail = 0;
+ size_t trail_len = (msg->trail ? msg->trail->length : 0);
dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
con->out_msg_pos.page_pos);
- while (con->out_msg_pos.page < con->out_msg->nr_pages) {
+#ifdef CONFIG_BLOCK
+ if (msg->bio && !msg->bio_iter)
+ init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
+#endif
+
+ while (data_len > con->out_msg_pos.data_pos) {
struct page *page = NULL;
void *kaddr = NULL;
+ int max_write = PAGE_SIZE;
+ int page_shift = 0;
+
+ total_max_write = data_len - trail_len -
+ con->out_msg_pos.data_pos;
/*
* if we are calculating the data crc (the default), we need
* to map the page. if our pages[] has been revoked, use the
* zero page.
*/
- if (msg->pages) {
+
+ /* have we reached the trail part of the data? */
+ if (con->out_msg_pos.data_pos >= data_len - trail_len) {
+ in_trail = 1;
+
+ total_max_write = data_len - con->out_msg_pos.data_pos;
+
+ page = list_first_entry(&msg->trail->head,
+ struct page, lru);
+ if (crc)
+ kaddr = kmap(page);
+ max_write = PAGE_SIZE;
+ } else if (msg->pages) {
page = msg->pages[con->out_msg_pos.page];
if (crc)
kaddr = kmap(page);
@@ -749,13 +804,25 @@ static int write_partial_msg_pages(struct ceph_connection *con)
struct page, lru);
if (crc)
kaddr = kmap(page);
+#ifdef CONFIG_BLOCK
+ } else if (msg->bio) {
+ struct bio_vec *bv;
+
+ bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
+ page = bv->bv_page;
+ page_shift = bv->bv_offset;
+ if (crc)
+ kaddr = kmap(page) + page_shift;
+ max_write = bv->bv_len;
+#endif
} else {
page = con->msgr->zero_page;
if (crc)
kaddr = page_address(con->msgr->zero_page);
}
- len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
- (int)(data_len - con->out_msg_pos.data_pos));
+ len = min_t(int, max_write - con->out_msg_pos.page_pos,
+ total_max_write);
+
if (crc && !con->out_msg_pos.did_page_crc) {
void *base = kaddr + con->out_msg_pos.page_pos;
u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
@@ -765,13 +832,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
cpu_to_le32(crc32c(tmpcrc, base, len));
con->out_msg_pos.did_page_crc = 1;
}
-
ret = kernel_sendpage(con->sock, page,
- con->out_msg_pos.page_pos, len,
+ con->out_msg_pos.page_pos + page_shift,
+ len,
MSG_DONTWAIT | MSG_NOSIGNAL |
MSG_MORE);
- if (crc && (msg->pages || msg->pagelist))
+ if (crc &&
+ (msg->pages || msg->pagelist || msg->bio || in_trail))
kunmap(page);
if (ret <= 0)
@@ -783,9 +851,16 @@ static int write_partial_msg_pages(struct ceph_connection *con)
con->out_msg_pos.page_pos = 0;
con->out_msg_pos.page++;
con->out_msg_pos.did_page_crc = 0;
- if (msg->pagelist)
+ if (in_trail)
+ list_move_tail(&page->lru,
+ &msg->trail->head);
+ else if (msg->pagelist)
list_move_tail(&page->lru,
&msg->pagelist->head);
+#ifdef CONFIG_BLOCK
+ else if (msg->bio)
+ iter_bio_next(&msg->bio_iter, &msg->bio_seg);
+#endif
}
}
@@ -1305,8 +1380,7 @@ static int read_partial_message_section(struct ceph_connection *con,
struct kvec *section,
unsigned int sec_len, u32 *crc)
{
- int left;
- int ret;
+ int ret, left;
BUG_ON(!section);
@@ -1329,13 +1403,83 @@ static int read_partial_message_section(struct ceph_connection *con,
static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip);
+
+
+static int read_partial_message_pages(struct ceph_connection *con,
+ struct page **pages,
+ unsigned data_len, int datacrc)
+{
+ void *p;
+ int ret;
+ int left;
+
+ left = min((int)(data_len - con->in_msg_pos.data_pos),
+ (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+ /* (page) data */
+ BUG_ON(pages == NULL);
+ p = kmap(pages[con->in_msg_pos.page]);
+ ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+ left);
+ if (ret > 0 && datacrc)
+ con->in_data_crc =
+ crc32c(con->in_data_crc,
+ p + con->in_msg_pos.page_pos, ret);
+ kunmap(pages[con->in_msg_pos.page]);
+ if (ret <= 0)
+ return ret;
+ con->in_msg_pos.data_pos += ret;
+ con->in_msg_pos.page_pos += ret;
+ if (con->in_msg_pos.page_pos == PAGE_SIZE) {
+ con->in_msg_pos.page_pos = 0;
+ con->in_msg_pos.page++;
+ }
+
+ return ret;
+}
+
+#ifdef CONFIG_BLOCK
+static int read_partial_message_bio(struct ceph_connection *con,
+ struct bio **bio_iter, int *bio_seg,
+ unsigned data_len, int datacrc)
+{
+ struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
+ void *p;
+ int ret, left;
+
+ if (IS_ERR(bv))
+ return PTR_ERR(bv);
+
+ left = min((int)(data_len - con->in_msg_pos.data_pos),
+ (int)(bv->bv_len - con->in_msg_pos.page_pos));
+
+ p = kmap(bv->bv_page) + bv->bv_offset;
+
+ ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+ left);
+ if (ret > 0 && datacrc)
+ con->in_data_crc =
+ crc32c(con->in_data_crc,
+ p + con->in_msg_pos.page_pos, ret);
+ kunmap(bv->bv_page);
+ if (ret <= 0)
+ return ret;
+ con->in_msg_pos.data_pos += ret;
+ con->in_msg_pos.page_pos += ret;
+ if (con->in_msg_pos.page_pos == bv->bv_len) {
+ con->in_msg_pos.page_pos = 0;
+ iter_bio_next(bio_iter, bio_seg);
+ }
+
+ return ret;
+}
+#endif
+
/*
* read (part of) a message.
*/
static int read_partial_message(struct ceph_connection *con)
{
struct ceph_msg *m = con->in_msg;
- void *p;
int ret;
int to, left;
unsigned front_len, middle_len, data_len, data_off;
@@ -1422,7 +1566,10 @@ static int read_partial_message(struct ceph_connection *con)
m->middle->vec.iov_len = 0;
con->in_msg_pos.page = 0;
- con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+ if (m->pages)
+ con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+ else
+ con->in_msg_pos.page_pos = 0;
con->in_msg_pos.data_pos = 0;
}
@@ -1440,27 +1587,29 @@ static int read_partial_message(struct ceph_connection *con)
if (ret <= 0)
return ret;
}
+#ifdef CONFIG_BLOCK
+ if (m->bio && !m->bio_iter)
+ init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
+#endif
/* (page) data */
while (con->in_msg_pos.data_pos < data_len) {
- left = min((int)(data_len - con->in_msg_pos.data_pos),
- (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
- BUG_ON(m->pages == NULL);
- p = kmap(m->pages[con->in_msg_pos.page]);
- ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
- left);
- if (ret > 0 && datacrc)
- con->in_data_crc =
- crc32c(con->in_data_crc,
- p + con->in_msg_pos.page_pos, ret);
- kunmap(m->pages[con->in_msg_pos.page]);
- if (ret <= 0)
- return ret;
- con->in_msg_pos.data_pos += ret;
- con->in_msg_pos.page_pos += ret;
- if (con->in_msg_pos.page_pos == PAGE_SIZE) {
- con->in_msg_pos.page_pos = 0;
- con->in_msg_pos.page++;
+ if (m->pages) {
+ ret = read_partial_message_pages(con, m->pages,
+ data_len, datacrc);
+ if (ret <= 0)
+ return ret;
+#ifdef CONFIG_BLOCK
+ } else if (m->bio) {
+
+ ret = read_partial_message_bio(con,
+ &m->bio_iter, &m->bio_seg,
+ data_len, datacrc);
+ if (ret <= 0)
+ return ret;
+#endif
+ } else {
+ BUG_ON(1);
}
}
@@ -2136,6 +2285,10 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
m->nr_pages = 0;
m->pages = NULL;
m->pagelist = NULL;
+ m->bio = NULL;
+ m->bio_iter = NULL;
+ m->bio_seg = 0;
+ m->trail = NULL;
dout("ceph_msg_new %p front %d\n", m, front_len);
return m;
@@ -2250,6 +2403,8 @@ void ceph_msg_last_put(struct kref *kref)
m->pagelist = NULL;
}
+ m->trail = NULL;
+
if (m->pool)
ceph_msgpool_put(m->pool, m);
else
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index 76fbc957bc1..5a79450604e 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -82,6 +82,10 @@ struct ceph_msg {
struct ceph_pagelist *pagelist; /* instead of pages */
struct list_head list_head;
struct kref kref;
+ struct bio *bio; /* instead of pages/pagelist */
+ struct bio *bio_iter; /* bio iterator */
+ int bio_seg; /* current bio segment */
+ struct ceph_pagelist *trail; /* the trailing part of the data */
bool front_is_vmalloc;
bool more_to_follow;
bool needs_out_seq;
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 2647dafd96f..c5d818e73ad 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -6,12 +6,16 @@
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
+#ifdef CONFIG_BLOCK
+#include <linux/bio.h>
+#endif
#include "super.h"
#include "osd_client.h"
#include "messenger.h"
#include "decode.h"
#include "auth.h"
+#include "pagelist.h"
#define OSD_OP_FRONT_LEN 4096
#define OSD_OPREPLY_FRONT_LEN 512
@@ -22,29 +26,50 @@ static int __kick_requests(struct ceph_osd_client *osdc,
static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
+static int op_needs_trail(int op)
+{
+ switch (op) {
+ case CEPH_OSD_OP_GETXATTR:
+ case CEPH_OSD_OP_SETXATTR:
+ case CEPH_OSD_OP_CMPXATTR:
+ case CEPH_OSD_OP_CALL:
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+static int op_has_extent(int op)
+{
+ return (op == CEPH_OSD_OP_READ ||
+ op == CEPH_OSD_OP_WRITE);
+}
+
void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
struct ceph_file_layout *layout,
u64 snapid,
- u64 off, u64 len, u64 *bno,
- struct ceph_osd_request *req)
+ u64 off, u64 *plen, u64 *bno,
+ struct ceph_osd_request *req,
+ struct ceph_osd_req_op *op)
{
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
- struct ceph_osd_op *op = (void *)(reqhead + 1);
- u64 orig_len = len;
+ u64 orig_len = *plen;
u64 objoff, objlen; /* extent in object */
reqhead->snapid = cpu_to_le64(snapid);
/* object extent? */
- ceph_calc_file_object_mapping(layout, off, &len, bno,
+ ceph_calc_file_object_mapping(layout, off, plen, bno,
&objoff, &objlen);
- if (len < orig_len)
+ if (*plen < orig_len)
dout(" skipping last %llu, final file extent %llu~%llu\n",
- orig_len - len, off, len);
+ orig_len - *plen, off, *plen);
- op->extent.offset = cpu_to_le64(objoff);
- op->extent.length = cpu_to_le64(objlen);
- req->r_num_pages = calc_pages_for(off, len);
+ if (op_has_extent(op->op)) {
+ op->extent.offset = objoff;
+ op->extent.length = objlen;
+ }
+ req->r_num_pages = calc_pages_for(off, *plen);
dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
*bno, objoff, objlen, req->r_num_pages);
@@ -80,11 +105,13 @@ static void calc_layout(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
u64 off, u64 *plen,
- struct ceph_osd_request *req)
+ struct ceph_osd_request *req,
+ struct ceph_osd_req_op *op)
{
u64 bno;
- ceph_calc_raw_layout(osdc, layout, vino.snap, off, *plen, &bno, req);
+ ceph_calc_raw_layout(osdc, layout, vino.snap, off,
+ plen, &bno, req, op);
sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
req->r_oid_len = strlen(req->r_oid);
@@ -113,35 +140,64 @@ void ceph_osdc_release_request(struct kref *kref)
if (req->r_own_pages)
ceph_release_page_vector(req->r_pages,
req->r_num_pages);
+#ifdef CONFIG_BLOCK
+ if (req->r_bio)
+ bio_put(req->r_bio);
+#endif
ceph_put_snap_context(req->r_snapc);
+ if (req->r_trail) {
+ ceph_pagelist_release(req->r_trail);
+ kfree(req->r_trail);
+ }
if (req->r_mempool)
mempool_free(req, req->r_osdc->req_mempool);
else
kfree(req);
}
+static int op_needs_trail(int op)
+{
+ switch (op) {
+ case CEPH_OSD_OP_GETXATTR:
+ case CEPH_OSD_OP_SETXATTR:
+ case CEPH_OSD_OP_CMPXATTR:
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
+{
+ int i = 0;
+
+ if (needs_trail)
+ *needs_trail = 0;
+ while (ops[i].op) {
+ if (needs_trail && op_needs_trail(ops[i].op))
+ *needs_trail = 1;
+ i++;
+ }
+
+ return i;
+}
+
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
int flags,
struct ceph_snap_context *snapc,
- int do_sync,
+ struct ceph_osd_req_op *ops,
bool use_mempool,
gfp_t gfp_flags,
- struct page **pages)
+ struct page **pages,
+ struct bio *bio)
{
struct ceph_osd_request *req;
struct ceph_msg *msg;
- int num_op = 1 + do_sync;
- size_t msg_size = sizeof(struct ceph_osd_request_head) +
- num_op*sizeof(struct ceph_osd_op);
+ int needs_trail;
+ int num_op = get_num_ops(ops, &needs_trail);
+ size_t msg_size = sizeof(struct ceph_osd_request_head);
- if (use_mempool) {
- req = mempool_alloc(osdc->req_mempool, gfp_flags);
- memset(req, 0, sizeof(*req));
- } else {
- req = kzalloc(sizeof(*req), gfp_flags);
- }
- if (!req)
- return NULL;
+ msg_size += num_op*sizeof(struct ceph_osd_op);
if (use_mempool) {
req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -154,6 +210,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
req->r_osdc = osdc;
req->r_mempool = use_mempool;
+
kref_init(&req->r_kref);
init_completion(&req->r_completion);
init_completion(&req->r_safe_completion);
@@ -174,6 +231,15 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
}
req->r_reply = msg;
+ /* allocate space for the trailing data */
+ if (needs_trail) {
+ req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
+ if (!req->r_trail) {
+ ceph_osdc_put_request(req);
+ return NULL;
+ }
+ ceph_pagelist_init(req->r_trail);
+ }
/* create request message; allow space for oid */
msg_size += 40;
if (snapc)
@@ -186,38 +252,87 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
ceph_osdc_put_request(req);
return NULL;
}
+
msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
memset(msg->front.iov_base, 0, msg->front.iov_len);
req->r_request = msg;
req->r_pages = pages;
+#ifdef CONFIG_BLOCK
+ if (bio) {
+ req->r_bio = bio;
+ bio_get(req->r_bio);
+ }
+#endif
return req;
}
+static void osd_req_encode_op(struct ceph_osd_request *req,
+ struct ceph_osd_op *dst,
+ struct ceph_osd_req_op *src)
+{
+ dst->op = cpu_to_le16(src->op);
+
+ switch (dst->op) {
+ case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_WRITE:
+ dst->extent.offset =
+ cpu_to_le64(src->extent.offset);
+ dst->extent.length =
+ cpu_to_le64(src->extent.length);
+ dst->extent.truncate_size =
+ cpu_to_le64(src->extent.truncate_size);
+ dst->extent.truncate_seq =
+ cpu_to_le32(src->extent.truncate_seq);
+ break;
+
+ case CEPH_OSD_OP_GETXATTR:
+ case CEPH_OSD_OP_SETXATTR:
+ case CEPH_OSD_OP_CMPXATTR:
+ BUG_ON(!req->r_trail);
+
+ dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
+ dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
+ dst->xattr.cmp_op = src->xattr.cmp_op;
+ dst->xattr.cmp_mode = src->xattr.cmp_mode;
+ ceph_pagelist_append(req->r_trail, src->xattr.name,
+ src->xattr.name_len);
+ ceph_pagelist_append(req->r_trail, src->xattr.val,
+ src->xattr.value_len);
+ break;
+ case CEPH_OSD_OP_STARTSYNC:
+ break;
+ default:
+ pr_err("unrecognized osd opcode %d\n", dst->op);
+ WARN_ON(1);
+ break;
+ }
+ dst->payload_len = cpu_to_le32(src->payload_len);
+}
+
/*
* build new request AND message
*
*/
void ceph_osdc_build_request(struct ceph_osd_request *req,
- u64 off, u64 *plen,
- int opcode,
- struct ceph_snap_context *snapc,
- int do_sync,
- u32 truncate_seq,
- u64 truncate_size,
- struct timespec *mtime,
- const char *oid,
- int oid_len)
+ u64 off, u64 *plen,
+ struct ceph_osd_req_op *src_ops,
+ struct ceph_snap_context *snapc,
+ struct timespec *mtime,
+ const char *oid,
+ int oid_len)
{
struct ceph_msg *msg = req->r_request;
struct ceph_osd_request_head *head;
+ struct ceph_osd_req_op *src_op;
struct ceph_osd_op *op;
void *p;
- int num_op = 1 + do_sync;
+ int num_op = get_num_ops(src_ops, NULL);
size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
- int i;
int flags = req->r_flags;
+ u64 data_len = 0;
+ int i;
head = msg->front.iov_base;
op = (void *)(head + 1);
@@ -230,25 +345,23 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
if (flags & CEPH_OSD_FLAG_WRITE)
ceph_encode_timespec(&head->mtime, mtime);
head->num_ops = cpu_to_le16(num_op);
- op->op = cpu_to_le16(opcode);
- if (flags & CEPH_OSD_FLAG_WRITE) {
- req->r_request->hdr.data_off = cpu_to_le16(off);
- req->r_request->hdr.data_len = cpu_to_le32(*plen);
- op->payload_len = cpu_to_le32(*plen);
- }
- op->extent.truncate_size = cpu_to_le64(truncate_size);
- op->extent.truncate_seq = cpu_to_le32(truncate_seq);
/* fill in oid */
head->object_len = cpu_to_le32(oid_len);
memcpy(p, oid, oid_len);
p += oid_len;
- if (do_sync) {
+ src_op = src_ops;
+ while (src_op->op) {
+ osd_req_encode_op(req, op, src_op);
+ src_op++;
op++;
- op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
}
+
+ if (req->r_trail)
+ data_len += req->r_trail->length;
+
if (snapc) {
head->snap_seq = cpu_to_le64(snapc->seq);
head->num_snaps = cpu_to_le32(snapc->num_snaps);
@@ -258,6 +371,14 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
}
}
+ if (flags & CEPH_OSD_FLAG_WRITE) {
+ req->r_request->hdr.data_off = cpu_to_le16(off);
+ req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
+ } else if (data_len) {
+ req->r_request->hdr.data_off = 0;
+ req->r_request->hdr.data_len = cpu_to_le32(data_len);
+ }
+
BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
msg_size = p - msg->front.iov_base;
msg->front.iov_len = msg_size;
@@ -288,21 +409,34 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
struct timespec *mtime,
bool use_mempool, int num_reply)
{
- struct ceph_osd_request *req =
- ceph_osdc_alloc_request(osdc, flags,
- snapc, do_sync,
+ struct ceph_osd_req_op ops[3];
+ struct ceph_osd_request *req;
+
+ ops[0].op = opcode;
+ ops[0].extent.truncate_seq = truncate_seq;
+ ops[0].extent.truncate_size = truncate_size;
+ ops[0].payload_len = 0;
+
+ if (do_sync) {
+ ops[1].op = CEPH_OSD_OP_STARTSYNC;
+ ops[1].payload_len = 0;
+ ops[2].op = 0;
+ } else
+ ops[1].op = 0;
+
+ req = ceph_osdc_alloc_request(osdc, flags,
+ snapc, ops,
use_mempool,
- GFP_NOFS, NULL);
+ GFP_NOFS, NULL, NULL);
if (IS_ERR(req))
return req;
/* calculate max write size */
- calc_layout(osdc, vino, layout, off, plen, req);
+ calc_layout(osdc, vino, layout, off, plen, req, ops);
req->r_file_layout = *layout; /* keep a copy */
- ceph_osdc_build_request(req, off, plen, opcode,
- snapc, do_sync,
- truncate_seq, truncate_size,
+ ceph_osdc_build_request(req, off, plen, ops,
+ snapc,
mtime,
req->r_oid, req->r_oid_len);
@@ -1177,6 +1311,10 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
req->r_request->pages = req->r_pages;
req->r_request->nr_pages = req->r_num_pages;
+#ifdef CONFIG_BLOCK
+ req->r_request->bio = req->r_bio;
+#endif
+ req->r_request->trail = req->r_trail;
register_request(osdc, req);
@@ -1493,6 +1631,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
}
m->pages = req->r_pages;
m->nr_pages = req->r_num_pages;
+#ifdef CONFIG_BLOCK
+ m->bio = req->r_bio;
+#endif
}
*skip = 0;
req->r_con_filling_msg = ceph_con_get(con);
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index b687c2ea72e..d583d1bf6cd 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -15,6 +15,7 @@ struct ceph_snap_context;
struct ceph_osd_request;
struct ceph_osd_client;
struct ceph_authorizer;
+struct ceph_pagelist;
/*
* completion callback for async writepages
@@ -80,6 +81,11 @@ struct ceph_osd_request {
struct page **r_pages; /* pages for data payload */
int r_pages_from_pool;
int r_own_pages; /* if true, i own page list */
+#ifdef CONFIG_BLOCK
+ struct bio *r_bio; /* instead of pages */
+#endif
+
+ struct ceph_pagelist *r_trail; /* trailing part of the data */
};
struct ceph_osd_client {
@@ -110,6 +116,36 @@ struct ceph_osd_client {
struct ceph_msgpool msgpool_op_reply;
};
+struct ceph_osd_req_op {
+ u16 op; /* CEPH_OSD_OP_* */
+ u32 flags; /* CEPH_OSD_FLAG_* */
+ union {
+ struct {
+ u64 offset, length;
+ u64 truncate_size;
+ u32 truncate_seq;
+ } extent;
+ struct {
+ const char *name;
+ u32 name_len;
+ const char *val;
+ u32 value_len;
+ __u8 cmp_op; /* CEPH_OSD_CMPXATTR_OP_* */
+ __u8 cmp_mode; /* CEPH_OSD_CMPXATTR_MODE_* */
+ } xattr;
+ struct {
+ __u8 class_len;
+ __u8 method_len;
+ __u8 argc;
+ u32 indata_len;
+ } cls;
+ struct {
+ u64 cookie, count;
+ } pgls;
+ };
+ u32 payload_len;
+};
+
extern int ceph_osdc_init(struct ceph_osd_client *osdc,
struct ceph_client *client);
extern void ceph_osdc_stop(struct ceph_osd_client *osdc);
@@ -122,27 +158,26 @@ extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
struct ceph_file_layout *layout,
u64 snapid,
- u64 off, u64 len, u64 *bno,
- struct ceph_osd_request *req);
+ u64 off, u64 *plen, u64 *bno,
+ struct ceph_osd_request *req,
+ struct ceph_osd_req_op *op);
extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
int flags,
struct ceph_snap_context *snapc,
- int do_sync,
+ struct ceph_osd_req_op *ops,
bool use_mempool,
gfp_t gfp_flags,
- struct page **pages);
+ struct page **pages,
+ struct bio *bio);
extern void ceph_osdc_build_request(struct ceph_osd_request *req,
- u64 off, u64 *plen,
- int opcode,
- struct ceph_snap_context *snapc,
- int do_sync,
- u32 truncate_seq,
- u64 truncate_size,
- struct timespec *mtime,
- const char *oid,
- int oid_len);
+ u64 off, u64 *plen,
+ struct ceph_osd_req_op *src_ops,
+ struct ceph_snap_context *snapc,
+ struct timespec *mtime,
+ const char *oid,
+ int oid_len);
extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
struct ceph_file_layout *layout,
diff --git a/fs/ceph/pagelist.c b/fs/ceph/pagelist.c
index 46a368b6dce..326e1c04176 100644
--- a/fs/ceph/pagelist.c
+++ b/fs/ceph/pagelist.c
@@ -39,7 +39,7 @@ static int ceph_pagelist_addpage(struct ceph_pagelist *pl)
return 0;
}
-int ceph_pagelist_append(struct ceph_pagelist *pl, void *buf, size_t len)
+int ceph_pagelist_append(struct ceph_pagelist *pl, const void *buf, size_t len)
{
while (pl->room < len) {
size_t bit = pl->room;
diff --git a/fs/ceph/pagelist.h b/fs/ceph/pagelist.h
index e8a4187e108..cc9327aa1c9 100644
--- a/fs/ceph/pagelist.h
+++ b/fs/ceph/pagelist.h
@@ -19,7 +19,7 @@ static inline void ceph_pagelist_init(struct ceph_pagelist *pl)
}
extern int ceph_pagelist_release(struct ceph_pagelist *pl);
-extern int ceph_pagelist_append(struct ceph_pagelist *pl, void *d, size_t l);
+extern int ceph_pagelist_append(struct ceph_pagelist *pl, const void *d, size_t l);
static inline int ceph_pagelist_encode_64(struct ceph_pagelist *pl, u64 v)
{