summaryrefslogtreecommitdiffstats
path: root/net/ceph/osd_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r--net/ceph/osd_client.c669
1 files changed, 358 insertions, 311 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 780caf6b049..d730dd4d8eb 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -23,7 +23,7 @@
static const struct ceph_connection_operations osd_con_ops;
-static void send_queued(struct ceph_osd_client *osdc);
+static void __send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
@@ -32,64 +32,12 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
static void __send_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
-static int op_needs_trail(int op)
-{
- switch (op) {
- case CEPH_OSD_OP_GETXATTR:
- case CEPH_OSD_OP_SETXATTR:
- case CEPH_OSD_OP_CMPXATTR:
- case CEPH_OSD_OP_CALL:
- case CEPH_OSD_OP_NOTIFY:
- return 1;
- default:
- return 0;
- }
-}
-
static int op_has_extent(int op)
{
return (op == CEPH_OSD_OP_READ ||
op == CEPH_OSD_OP_WRITE);
}
-int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
- struct ceph_file_layout *layout,
- u64 snapid,
- u64 off, u64 *plen, u64 *bno,
- struct ceph_osd_request *req,
- struct ceph_osd_req_op *op)
-{
- struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
- u64 orig_len = *plen;
- u64 objoff, objlen; /* extent in object */
- int r;
-
- reqhead->snapid = cpu_to_le64(snapid);
-
- /* object extent? */
- r = ceph_calc_file_object_mapping(layout, off, plen, bno,
- &objoff, &objlen);
- if (r < 0)
- return r;
- if (*plen < orig_len)
- dout(" skipping last %llu, final file extent %llu~%llu\n",
- orig_len - *plen, off, *plen);
-
- if (op_has_extent(op->op)) {
- op->extent.offset = objoff;
- op->extent.length = objlen;
- }
- req->r_num_pages = calc_pages_for(off, *plen);
- req->r_page_alignment = off & ~PAGE_MASK;
- if (op->op == CEPH_OSD_OP_WRITE)
- op->payload_len = *plen;
-
- dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
- *bno, objoff, objlen, req->r_num_pages);
- return 0;
-}
-EXPORT_SYMBOL(ceph_calc_raw_layout);
-
/*
* Implement client access to distributed object storage cluster.
*
@@ -115,20 +63,48 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
*
* fill osd op in request message.
*/
-static int calc_layout(struct ceph_osd_client *osdc,
- struct ceph_vino vino,
+static int calc_layout(struct ceph_vino vino,
struct ceph_file_layout *layout,
u64 off, u64 *plen,
struct ceph_osd_request *req,
struct ceph_osd_req_op *op)
{
- u64 bno;
+ u64 orig_len = *plen;
+ u64 bno = 0;
+ u64 objoff = 0;
+ u64 objlen = 0;
int r;
- r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
- plen, &bno, req, op);
+ /* object extent? */
+ r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno,
+ &objoff, &objlen);
if (r < 0)
return r;
+ if (objlen < orig_len) {
+ *plen = objlen;
+ dout(" skipping last %llu, final file extent %llu~%llu\n",
+ orig_len - *plen, off, *plen);
+ }
+
+ if (op_has_extent(op->op)) {
+ u32 osize = le32_to_cpu(layout->fl_object_size);
+ op->extent.offset = objoff;
+ op->extent.length = objlen;
+ if (op->extent.truncate_size <= off - objoff) {
+ op->extent.truncate_size = 0;
+ } else {
+ op->extent.truncate_size -= off - objoff;
+ if (op->extent.truncate_size > osize)
+ op->extent.truncate_size = osize;
+ }
+ }
+ req->r_num_pages = calc_pages_for(off, *plen);
+ req->r_page_alignment = off & ~PAGE_MASK;
+ if (op->op == CEPH_OSD_OP_WRITE)
+ op->payload_len = *plen;
+
+ dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
+ bno, objoff, objlen, req->r_num_pages);
snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
req->r_oid_len = strlen(req->r_oid);
@@ -148,25 +124,19 @@ void ceph_osdc_release_request(struct kref *kref)
if (req->r_request)
ceph_msg_put(req->r_request);
if (req->r_con_filling_msg) {
- dout("%s revoking pages %p from con %p\n", __func__,
- req->r_pages, req->r_con_filling_msg);
+ dout("%s revoking msg %p from con %p\n", __func__,
+ req->r_reply, req->r_con_filling_msg);
ceph_msg_revoke_incoming(req->r_reply);
req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
+ req->r_con_filling_msg = NULL;
}
if (req->r_reply)
ceph_msg_put(req->r_reply);
if (req->r_own_pages)
ceph_release_page_vector(req->r_pages,
req->r_num_pages);
-#ifdef CONFIG_BLOCK
- if (req->r_bio)
- bio_put(req->r_bio);
-#endif
ceph_put_snap_context(req->r_snapc);
- if (req->r_trail) {
- ceph_pagelist_release(req->r_trail);
- kfree(req->r_trail);
- }
+ ceph_pagelist_release(&req->r_trail);
if (req->r_mempool)
mempool_free(req, req->r_osdc->req_mempool);
else
@@ -174,37 +144,25 @@ void ceph_osdc_release_request(struct kref *kref)
}
EXPORT_SYMBOL(ceph_osdc_release_request);
-static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
-{
- int i = 0;
-
- if (needs_trail)
- *needs_trail = 0;
- while (ops[i].op) {
- if (needs_trail && op_needs_trail(ops[i].op))
- *needs_trail = 1;
- i++;
- }
-
- return i;
-}
-
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
- int flags,
struct ceph_snap_context *snapc,
- struct ceph_osd_req_op *ops,
+ unsigned int num_ops,
bool use_mempool,
- gfp_t gfp_flags,
- struct page **pages,
- struct bio *bio)
+ gfp_t gfp_flags)
{
struct ceph_osd_request *req;
struct ceph_msg *msg;
- int needs_trail;
- int num_op = get_num_ops(ops, &needs_trail);
- size_t msg_size = sizeof(struct ceph_osd_request_head);
-
- msg_size += num_op*sizeof(struct ceph_osd_op);
+ size_t msg_size;
+
+ msg_size = 4 + 4 + 8 + 8 + 4+8;
+ msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
+ msg_size += 1 + 8 + 4 + 4; /* pg_t */
+ msg_size += 4 + MAX_OBJ_NAME_SIZE;
+ msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
+ msg_size += 8; /* snapid */
+ msg_size += 8; /* snap_seq */
+ msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
+ msg_size += 4;
if (use_mempool) {
req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -228,10 +186,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
INIT_LIST_HEAD(&req->r_req_lru_item);
INIT_LIST_HEAD(&req->r_osd_item);
- req->r_flags = flags;
-
- WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
-
/* create reply message */
if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -244,20 +198,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
}
req->r_reply = msg;
- /* allocate space for the trailing data */
- if (needs_trail) {
- req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
- if (!req->r_trail) {
- ceph_osdc_put_request(req);
- return NULL;
- }
- ceph_pagelist_init(req->r_trail);
- }
+ ceph_pagelist_init(&req->r_trail);
/* create request message; allow space for oid */
- msg_size += MAX_OBJ_NAME_SIZE;
- if (snapc)
- msg_size += sizeof(u64) * snapc->num_snaps;
if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
else
@@ -270,13 +213,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
memset(msg->front.iov_base, 0, msg->front.iov_len);
req->r_request = msg;
- req->r_pages = pages;
-#ifdef CONFIG_BLOCK
- if (bio) {
- req->r_bio = bio;
- bio_get(req->r_bio);
- }
-#endif
return req;
}
@@ -289,6 +225,8 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
dst->op = cpu_to_le16(src->op);
switch (src->op) {
+ case CEPH_OSD_OP_STAT:
+ break;
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
dst->extent.offset =
@@ -300,52 +238,20 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
dst->extent.truncate_seq =
cpu_to_le32(src->extent.truncate_seq);
break;
-
- case CEPH_OSD_OP_GETXATTR:
- case CEPH_OSD_OP_SETXATTR:
- case CEPH_OSD_OP_CMPXATTR:
- BUG_ON(!req->r_trail);
-
- dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
- dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
- dst->xattr.cmp_op = src->xattr.cmp_op;
- dst->xattr.cmp_mode = src->xattr.cmp_mode;
- ceph_pagelist_append(req->r_trail, src->xattr.name,
- src->xattr.name_len);
- ceph_pagelist_append(req->r_trail, src->xattr.val,
- src->xattr.value_len);
- break;
case CEPH_OSD_OP_CALL:
- BUG_ON(!req->r_trail);
-
dst->cls.class_len = src->cls.class_len;
dst->cls.method_len = src->cls.method_len;
dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
- ceph_pagelist_append(req->r_trail, src->cls.class_name,
+ ceph_pagelist_append(&req->r_trail, src->cls.class_name,
src->cls.class_len);
- ceph_pagelist_append(req->r_trail, src->cls.method_name,
+ ceph_pagelist_append(&req->r_trail, src->cls.method_name,
src->cls.method_len);
- ceph_pagelist_append(req->r_trail, src->cls.indata,
+ ceph_pagelist_append(&req->r_trail, src->cls.indata,
src->cls.indata_len);
break;
- case CEPH_OSD_OP_ROLLBACK:
- dst->snap.snapid = cpu_to_le64(src->snap.snapid);
- break;
case CEPH_OSD_OP_STARTSYNC:
break;
- case CEPH_OSD_OP_NOTIFY:
- {
- __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
- __le32 timeout = cpu_to_le32(src->watch.timeout);
-
- BUG_ON(!req->r_trail);
-
- ceph_pagelist_append(req->r_trail,
- &prot_ver, sizeof(prot_ver));
- ceph_pagelist_append(req->r_trail,
- &timeout, sizeof(timeout));
- }
case CEPH_OSD_OP_NOTIFY_ACK:
case CEPH_OSD_OP_WATCH:
dst->watch.cookie = cpu_to_le64(src->watch.cookie);
@@ -356,6 +262,64 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
pr_err("unrecognized osd opcode %d\n", dst->op);
WARN_ON(1);
break;
+ case CEPH_OSD_OP_MAPEXT:
+ case CEPH_OSD_OP_MASKTRUNC:
+ case CEPH_OSD_OP_SPARSE_READ:
+ case CEPH_OSD_OP_NOTIFY:
+ case CEPH_OSD_OP_ASSERT_VER:
+ case CEPH_OSD_OP_WRITEFULL:
+ case CEPH_OSD_OP_TRUNCATE:
+ case CEPH_OSD_OP_ZERO:
+ case CEPH_OSD_OP_DELETE:
+ case CEPH_OSD_OP_APPEND:
+ case CEPH_OSD_OP_SETTRUNC:
+ case CEPH_OSD_OP_TRIMTRUNC:
+ case CEPH_OSD_OP_TMAPUP:
+ case CEPH_OSD_OP_TMAPPUT:
+ case CEPH_OSD_OP_TMAPGET:
+ case CEPH_OSD_OP_CREATE:
+ case CEPH_OSD_OP_ROLLBACK:
+ case CEPH_OSD_OP_OMAPGETKEYS:
+ case CEPH_OSD_OP_OMAPGETVALS:
+ case CEPH_OSD_OP_OMAPGETHEADER:
+ case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
+ case CEPH_OSD_OP_MODE_RD:
+ case CEPH_OSD_OP_OMAPSETVALS:
+ case CEPH_OSD_OP_OMAPSETHEADER:
+ case CEPH_OSD_OP_OMAPCLEAR:
+ case CEPH_OSD_OP_OMAPRMKEYS:
+ case CEPH_OSD_OP_OMAP_CMP:
+ case CEPH_OSD_OP_CLONERANGE:
+ case CEPH_OSD_OP_ASSERT_SRC_VERSION:
+ case CEPH_OSD_OP_SRC_CMPXATTR:
+ case CEPH_OSD_OP_GETXATTR:
+ case CEPH_OSD_OP_GETXATTRS:
+ case CEPH_OSD_OP_CMPXATTR:
+ case CEPH_OSD_OP_SETXATTR:
+ case CEPH_OSD_OP_SETXATTRS:
+ case CEPH_OSD_OP_RESETXATTRS:
+ case CEPH_OSD_OP_RMXATTR:
+ case CEPH_OSD_OP_PULL:
+ case CEPH_OSD_OP_PUSH:
+ case CEPH_OSD_OP_BALANCEREADS:
+ case CEPH_OSD_OP_UNBALANCEREADS:
+ case CEPH_OSD_OP_SCRUB:
+ case CEPH_OSD_OP_SCRUB_RESERVE:
+ case CEPH_OSD_OP_SCRUB_UNRESERVE:
+ case CEPH_OSD_OP_SCRUB_STOP:
+ case CEPH_OSD_OP_SCRUB_MAP:
+ case CEPH_OSD_OP_WRLOCK:
+ case CEPH_OSD_OP_WRUNLOCK:
+ case CEPH_OSD_OP_RDLOCK:
+ case CEPH_OSD_OP_RDUNLOCK:
+ case CEPH_OSD_OP_UPLOCK:
+ case CEPH_OSD_OP_DNLOCK:
+ case CEPH_OSD_OP_PGLS:
+ case CEPH_OSD_OP_PGLS_FILTER:
+ pr_err("unsupported osd opcode %s\n",
+ ceph_osd_op_name(dst->op));
+ WARN_ON(1);
+ break;
}
dst->payload_len = cpu_to_le32(src->payload_len);
}
@@ -365,75 +329,95 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
*
*/
void ceph_osdc_build_request(struct ceph_osd_request *req,
- u64 off, u64 *plen,
+ u64 off, u64 len, unsigned int num_ops,
struct ceph_osd_req_op *src_ops,
- struct ceph_snap_context *snapc,
- struct timespec *mtime,
- const char *oid,
- int oid_len)
+ struct ceph_snap_context *snapc, u64 snap_id,
+ struct timespec *mtime)
{
struct ceph_msg *msg = req->r_request;
- struct ceph_osd_request_head *head;
struct ceph_osd_req_op *src_op;
- struct ceph_osd_op *op;
void *p;
- int num_op = get_num_ops(src_ops, NULL);
- size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
+ size_t msg_size;
int flags = req->r_flags;
- u64 data_len = 0;
+ u64 data_len;
int i;
- head = msg->front.iov_base;
- op = (void *)(head + 1);
- p = (void *)(op + num_op);
-
+ req->r_num_ops = num_ops;
+ req->r_snapid = snap_id;
req->r_snapc = ceph_get_snap_context(snapc);
- head->client_inc = cpu_to_le32(1); /* always, for now. */
- head->flags = cpu_to_le32(flags);
- if (flags & CEPH_OSD_FLAG_WRITE)
- ceph_encode_timespec(&head->mtime, mtime);
- head->num_ops = cpu_to_le16(num_op);
-
-
- /* fill in oid */
- head->object_len = cpu_to_le32(oid_len);
- memcpy(p, oid, oid_len);
- p += oid_len;
+ /* encode request */
+ msg->hdr.version = cpu_to_le16(4);
+ p = msg->front.iov_base;
+ ceph_encode_32(&p, 1); /* client_inc is always 1 */
+ req->r_request_osdmap_epoch = p;
+ p += 4;
+ req->r_request_flags = p;
+ p += 4;
+ if (req->r_flags & CEPH_OSD_FLAG_WRITE)
+ ceph_encode_timespec(p, mtime);
+ p += sizeof(struct ceph_timespec);
+ req->r_request_reassert_version = p;
+ p += sizeof(struct ceph_eversion); /* will get filled in */
+
+ /* oloc */
+ ceph_encode_8(&p, 4);
+ ceph_encode_8(&p, 4);
+ ceph_encode_32(&p, 8 + 4 + 4);
+ req->r_request_pool = p;
+ p += 8;
+ ceph_encode_32(&p, -1); /* preferred */
+ ceph_encode_32(&p, 0); /* key len */
+
+ ceph_encode_8(&p, 1);
+ req->r_request_pgid = p;
+ p += 8 + 4;
+ ceph_encode_32(&p, -1); /* preferred */
+
+ /* oid */
+ ceph_encode_32(&p, req->r_oid_len);
+ memcpy(p, req->r_oid, req->r_oid_len);
+ dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
+ p += req->r_oid_len;
+
+ /* ops */
+ ceph_encode_16(&p, num_ops);
src_op = src_ops;
- while (src_op->op) {
- osd_req_encode_op(req, op, src_op);
- src_op++;
- op++;
+ req->r_request_ops = p;
+ for (i = 0; i < num_ops; i++, src_op++) {
+ osd_req_encode_op(req, p, src_op);
+ p += sizeof(struct ceph_osd_op);
}
- if (req->r_trail)
- data_len += req->r_trail->length;
-
- if (snapc) {
- head->snap_seq = cpu_to_le64(snapc->seq);
- head->num_snaps = cpu_to_le32(snapc->num_snaps);
+ /* snaps */
+ ceph_encode_64(&p, req->r_snapid);
+ ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
+ ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
+ if (req->r_snapc) {
for (i = 0; i < snapc->num_snaps; i++) {
- put_unaligned_le64(snapc->snaps[i], p);
- p += sizeof(u64);
+ ceph_encode_64(&p, req->r_snapc->snaps[i]);
}
}
+ req->r_request_attempts = p;
+ p += 4;
+
+ data_len = req->r_trail.length;
if (flags & CEPH_OSD_FLAG_WRITE) {
req->r_request->hdr.data_off = cpu_to_le16(off);
- req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
- } else if (data_len) {
- req->r_request->hdr.data_off = 0;
- req->r_request->hdr.data_len = cpu_to_le32(data_len);
+ data_len += len;
}
-
+ req->r_request->hdr.data_len = cpu_to_le32(data_len);
req->r_request->page_alignment = req->r_page_alignment;
BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
msg_size = p - msg->front.iov_base;
msg->front.iov_len = msg_size;
msg->hdr.front_len = cpu_to_le32(msg_size);
+
+ dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
+ num_ops);
return;
}
EXPORT_SYMBOL(ceph_osdc_build_request);
@@ -459,34 +443,33 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
u32 truncate_seq,
u64 truncate_size,
struct timespec *mtime,
- bool use_mempool, int num_reply,
+ bool use_mempool,
int page_align)
{
- struct ceph_osd_req_op ops[3];
+ struct ceph_osd_req_op ops[2];
struct ceph_osd_request *req;
+ unsigned int num_op = 1;
int r;
+ memset(&ops, 0, sizeof ops);
+
ops[0].op = opcode;
ops[0].extent.truncate_seq = truncate_seq;
ops[0].extent.truncate_size = truncate_size;
- ops[0].payload_len = 0;
if (do_sync) {
ops[1].op = CEPH_OSD_OP_STARTSYNC;
- ops[1].payload_len = 0;
- ops[2].op = 0;
- } else
- ops[1].op = 0;
-
- req = ceph_osdc_alloc_request(osdc, flags,
- snapc, ops,
- use_mempool,
- GFP_NOFS, NULL, NULL);
+ num_op++;
+ }
+
+ req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
+ GFP_NOFS);
if (!req)
return ERR_PTR(-ENOMEM);
+ req->r_flags = flags;
/* calculate max write size */
- r = calc_layout(osdc, vino, layout, off, plen, req, ops);
+ r = calc_layout(vino, layout, off, plen, req, ops);
if (r < 0)
return ERR_PTR(r);
req->r_file_layout = *layout; /* keep a copy */
@@ -496,10 +479,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
req->r_num_pages = calc_pages_for(page_align, *plen);
req->r_page_alignment = page_align;
- ceph_osdc_build_request(req, off, plen, ops,
- snapc,
- mtime,
- req->r_oid, req->r_oid_len);
+ ceph_osdc_build_request(req, off, *plen, num_op, ops,
+ snapc, vino.snap, mtime);
return req;
}
@@ -623,8 +604,8 @@ static void osd_reset(struct ceph_connection *con)
down_read(&osdc->map_sem);
mutex_lock(&osdc->request_mutex);
__kick_osd_requests(osdc, osd);
+ __send_queued(osdc);
mutex_unlock(&osdc->request_mutex);
- send_queued(osdc);
up_read(&osdc->map_sem);
}
@@ -739,31 +720,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
*/
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
- struct ceph_osd_request *req;
- int ret = 0;
+ struct ceph_entity_addr *peer_addr;
dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
if (list_empty(&osd->o_requests) &&
list_empty(&osd->o_linger_requests)) {
__remove_osd(osdc, osd);
- ret = -ENODEV;
- } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
- &osd->o_con.peer_addr,
- sizeof(osd->o_con.peer_addr)) == 0 &&
- !ceph_con_opened(&osd->o_con)) {
+
+ return -ENODEV;
+ }
+
+ peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
+ if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
+ !ceph_con_opened(&osd->o_con)) {
+ struct ceph_osd_request *req;
+
dout(" osd addr hasn't changed and connection never opened,"
" letting msgr retry");
/* touch each r_stamp for handle_timeout()'s benfit */
list_for_each_entry(req, &osd->o_requests, r_osd_item)
req->r_stamp = jiffies;
- ret = -EAGAIN;
- } else {
- ceph_con_close(&osd->o_con);
- ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
- &osdc->osdmap->osd_addr[osd->o_osd]);
- osd->o_incarnation++;
+
+ return -EAGAIN;
}
- return ret;
+
+ ceph_con_close(&osd->o_con);
+ ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
+ osd->o_incarnation++;
+
+ return 0;
}
static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -961,20 +946,18 @@ EXPORT_SYMBOL(ceph_osdc_set_request_linger);
static int __map_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req, int force_resend)
{
- struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
struct ceph_pg pgid;
int acting[CEPH_PG_MAX_SIZE];
int o = -1, num = 0;
int err;
dout("map_request %p tid %lld\n", req, req->r_tid);
- err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
+ err = ceph_calc_object_layout(&pgid, req->r_oid,
&req->r_file_layout, osdc->osdmap);
if (err) {
list_move(&req->r_req_lru_item, &osdc->req_notarget);
return err;
}
- pgid = reqhead->layout.ol_pgid;
req->r_pgid = pgid;
err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
@@ -991,8 +974,8 @@ static int __map_request(struct ceph_osd_client *osdc,
(req->r_osd == NULL && o == -1))
return 0; /* no change */
- dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
- req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
+ dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
+ req->r_tid, pgid.pool, pgid.seed, o,
req->r_osd ? req->r_osd->o_osd : -1);
/* record full pg acting set */
@@ -1041,15 +1024,22 @@ out:
static void __send_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
- struct ceph_osd_request_head *reqhead;
-
- dout("send_request %p tid %llu to osd%d flags %d\n",
- req, req->r_tid, req->r_osd->o_osd, req->r_flags);
+ void *p;
- reqhead = req->r_request->front.iov_base;
- reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
- reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
- reqhead->reassert_version = req->r_reassert_version;
+ dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
+ req, req->r_tid, req->r_osd->o_osd, req->r_flags,
+ (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
+
+ /* fill in message content that changes each time we send it */
+ put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
+ put_unaligned_le32(req->r_flags, req->r_request_flags);
+ put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
+ p = req->r_request_pgid;
+ ceph_encode_64(&p, req->r_pgid.pool);
+ ceph_encode_32(&p, req->r_pgid.seed);
+ put_unaligned_le64(1, req->r_request_attempts); /* FIXME */
+ memcpy(req->r_request_reassert_version, &req->r_reassert_version,
+ sizeof(req->r_reassert_version));
req->r_stamp = jiffies;
list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
@@ -1062,16 +1052,13 @@ static void __send_request(struct ceph_osd_client *osdc,
/*
* Send any requests in the queue (req_unsent).
*/
-static void send_queued(struct ceph_osd_client *osdc)
+static void __send_queued(struct ceph_osd_client *osdc)
{
struct ceph_osd_request *req, *tmp;
- dout("send_queued\n");
- mutex_lock(&osdc->request_mutex);
- list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
+ dout("__send_queued\n");
+ list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
__send_request(osdc, req);
- }
- mutex_unlock(&osdc->request_mutex);
}
/*
@@ -1123,8 +1110,8 @@ static void handle_timeout(struct work_struct *work)
}
__schedule_osd_timeout(osdc);
+ __send_queued(osdc);
mutex_unlock(&osdc->request_mutex);
- send_queued(osdc);
up_read(&osdc->map_sem);
}
@@ -1152,6 +1139,26 @@ static void complete_request(struct ceph_osd_request *req)
complete_all(&req->r_safe_completion); /* fsync waiter */
}
+static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
+{
+ __u8 v;
+
+ ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
+ v = ceph_decode_8(p);
+ if (v > 1) {
+ pr_warning("do not understand pg encoding %d > 1", v);
+ return -EINVAL;
+ }
+ pgid->pool = ceph_decode_64(p);
+ pgid->seed = ceph_decode_32(p);
+ *p += 4;
+ return 0;
+
+bad:
+ pr_warning("incomplete pg encoding");
+ return -EINVAL;
+}
+
/*
* handle osd op reply. either call the callback if it is specified,
* or do the completion to wake up the waiting thread.
@@ -1159,22 +1166,42 @@ static void complete_request(struct ceph_osd_request *req)
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
struct ceph_connection *con)
{
- struct ceph_osd_reply_head *rhead = msg->front.iov_base;
+ void *p, *end;
struct ceph_osd_request *req;
u64 tid;
- int numops, object_len, flags;
+ int object_len;
+ int numops, payload_len, flags;
s32 result;
+ s32 retry_attempt;
+ struct ceph_pg pg;
+ int err;
+ u32 reassert_epoch;
+ u64 reassert_version;
+ u32 osdmap_epoch;
+ int i;
tid = le64_to_cpu(msg->hdr.tid);
- if (msg->front.iov_len < sizeof(*rhead))
- goto bad;
- numops = le32_to_cpu(rhead->num_ops);
- object_len = le32_to_cpu(rhead->object_len);
- result = le32_to_cpu(rhead->result);
- if (msg->front.iov_len != sizeof(*rhead) + object_len +
- numops * sizeof(struct ceph_osd_op))
+ dout("handle_reply %p tid %llu\n", msg, tid);
+
+ p = msg->front.iov_base;
+ end = p + msg->front.iov_len;
+
+ ceph_decode_need(&p, end, 4, bad);
+ object_len = ceph_decode_32(&p);
+ ceph_decode_need(&p, end, object_len, bad);
+ p += object_len;
+
+ err = __decode_pgid(&p, end, &pg);
+ if (err)
goto bad;
- dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
+
+ ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
+ flags = ceph_decode_64(&p);
+ result = ceph_decode_32(&p);
+ reassert_epoch = ceph_decode_32(&p);
+ reassert_version = ceph_decode_64(&p);
+ osdmap_epoch = ceph_decode_32(&p);
+
/* lookup */
mutex_lock(&osdc->request_mutex);
req = __lookup_request(osdc, tid);
@@ -1184,7 +1211,38 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
return;
}
ceph_osdc_get_request(req);
- flags = le32_to_cpu(rhead->flags);
+
+ dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
+ req, result);
+
+ ceph_decode_need(&p, end, 4, bad);
+ numops = ceph_decode_32(&p);
+ if (numops > CEPH_OSD_MAX_OP)
+ goto bad_put;
+ if (numops != req->r_num_ops)
+ goto bad_put;
+ payload_len = 0;
+ ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
+ for (i = 0; i < numops; i++) {
+ struct ceph_osd_op *op = p;
+ int len;
+
+ len = le32_to_cpu(op->payload_len);
+ req->r_reply_op_len[i] = len;
+ dout(" op %d has %d bytes\n", i, len);
+ payload_len += len;
+ p += sizeof(*op);
+ }
+ if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
+ pr_warning("sum of op payload lens %d != data_len %d",
+ payload_len, le32_to_cpu(msg->hdr.data_len));
+ goto bad_put;
+ }
+
+ ceph_decode_need(&p, end, 4 + numops * 4, bad);
+ retry_attempt = ceph_decode_32(&p);
+ for (i = 0; i < numops; i++)
+ req->r_reply_op_result[i] = ceph_decode_32(&p);
/*
* if this connection filled our message, drop our reference now, to
@@ -1199,7 +1257,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
if (!req->r_got_reply) {
unsigned int bytes;
- req->r_result = le32_to_cpu(rhead->result);
+ req->r_result = result;
bytes = le32_to_cpu(msg->hdr.data_len);
dout("handle_reply result %d bytes %d\n", req->r_result,
bytes);
@@ -1207,7 +1265,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
req->r_result = bytes;
/* in case this is a write and we need to replay, */
- req->r_reassert_version = rhead->reassert_version;
+ req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
+ req->r_reassert_version.version = cpu_to_le64(reassert_version);
req->r_got_reply = 1;
} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -1242,10 +1301,11 @@ done:
ceph_osdc_put_request(req);
return;
+bad_put:
+ ceph_osdc_put_request(req);
bad:
- pr_err("corrupt osd_op_reply got %d %d expected %d\n",
- (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
- (int)sizeof(*rhead));
+ pr_err("corrupt osd_op_reply got %d %d\n",
+ (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
ceph_msg_dump(msg);
}
@@ -1270,7 +1330,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
* Requeue requests whose mapping to an OSD has changed. If requests map to
* no osd, request a new map.
*
- * Caller should hold map_sem for read and request_mutex.
+ * Caller should hold map_sem for read.
*/
static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
{
@@ -1284,6 +1344,24 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
for (p = rb_first(&osdc->requests); p; ) {
req = rb_entry(p, struct ceph_osd_request, r_node);
p = rb_next(p);
+
+ /*
+ * For linger requests that have not yet been
+ * registered, move them to the linger list; they'll
+ * be sent to the osd in the loop below. Unregister
+ * the request before re-registering it as a linger
+ * request to ensure the __map_request() below
+ * will decide it needs to be sent.
+ */
+ if (req->r_linger && list_empty(&req->r_linger_item)) {
+ dout("%p tid %llu restart on osd%d\n",
+ req, req->r_tid,
+ req->r_osd ? req->r_osd->o_osd : -1);
+ __unregister_request(osdc, req);
+ __register_linger_request(osdc, req);
+ continue;
+ }
+
err = __map_request(osdc, req, force_resend);
if (err < 0)
continue; /* error */
@@ -1298,17 +1376,6 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
req->r_flags |= CEPH_OSD_FLAG_RETRY;
}
}
- if (req->r_linger && list_empty(&req->r_linger_item)) {
- /*
- * register as a linger so that we will
- * re-submit below and get a new tid
- */
- dout("%p tid %llu restart on osd%d\n",
- req, req->r_tid,
- req->r_osd ? req->r_osd->o_osd : -1);
- __register_linger_request(osdc, req);
- __unregister_request(osdc, req);
- }
}
list_for_each_entry_safe(req, nreq, &osdc->req_linger,
@@ -1316,6 +1383,7 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
err = __map_request(osdc, req, force_resend);
+ dout("__map_request returned %d\n", err);
if (err == 0)
continue; /* no change and no osd was specified */
if (err < 0)
@@ -1337,6 +1405,7 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
dout("%d requests for down osds, need new map\n", needmap);
ceph_monc_request_next_osdmap(&osdc->client->monc);
}
+ reset_changed_osds(osdc);
}
@@ -1393,7 +1462,6 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
osdc->osdmap = newmap;
}
kick_requests(osdc, 0);
- reset_changed_osds(osdc);
} else {
dout("ignoring incremental map %u len %d\n",
epoch, maplen);
@@ -1454,7 +1522,9 @@ done:
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
ceph_monc_request_next_osdmap(&osdc->client->monc);
- send_queued(osdc);
+ mutex_lock(&osdc->request_mutex);
+ __send_queued(osdc);
+ mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
wake_up_all(&osdc->client->auth_wq);
return;
@@ -1548,8 +1618,7 @@ static void __remove_event(struct ceph_osd_event *event)
int ceph_osdc_create_event(struct ceph_osd_client *osdc,
void (*event_cb)(u64, u64, u8, void *),
- int one_shot, void *data,
- struct ceph_osd_event **pevent)
+ void *data, struct ceph_osd_event **pevent)
{
struct ceph_osd_event *event;
@@ -1559,14 +1628,13 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
dout("create_event %p\n", event);
event->cb = event_cb;
- event->one_shot = one_shot;
+ event->one_shot = 0;
event->data = data;
event->osdc = osdc;
INIT_LIST_HEAD(&event->osd_node);
RB_CLEAR_NODE(&event->node);
kref_init(&event->kref); /* one ref for us */
kref_get(&event->kref); /* one ref for the caller */
- init_completion(&event->completion);
spin_lock(&osdc->event_lock);
event->cookie = ++osdc->event_count;
@@ -1602,7 +1670,6 @@ static void do_event_work(struct work_struct *work)
dout("do_event_work completing %p\n", event);
event->cb(ver, notify_id, opcode, event->data);
- complete(&event->completion);
dout("do_event_work completed %p\n", event);
ceph_osdc_put_event(event);
kfree(event_work);
@@ -1612,7 +1679,8 @@ static void do_event_work(struct work_struct *work)
/*
* Process osd watch notifications
*/
-void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_watch_notify(struct ceph_osd_client *osdc,
+ struct ceph_msg *msg)
{
void *p, *end;
u8 proto_ver;
@@ -1633,9 +1701,8 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
spin_lock(&osdc->event_lock);
event = __find_event(osdc, cookie);
if (event) {
+ BUG_ON(event->one_shot);
get_event(event);
- if (event->one_shot)
- __remove_event(event);
}
spin_unlock(&osdc->event_lock);
dout("handle_watch_notify cookie %lld ver %lld event %p\n",
@@ -1660,7 +1727,6 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
return;
done_err:
- complete(&event->completion);
ceph_osdc_put_event(event);
return;
@@ -1669,21 +1735,6 @@ bad:
return;
}
-int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
-{
- int err;
-
- dout("wait_event %p\n", event);
- err = wait_for_completion_interruptible_timeout(&event->completion,
- timeout * HZ);
- ceph_osdc_put_event(event);
- if (err > 0)
- err = 0;
- dout("wait_event %p returns %d\n", event, err);
- return err;
-}
-EXPORT_SYMBOL(ceph_osdc_wait_event);
-
/*
* Register request, send initial attempt.
*/
@@ -1698,7 +1749,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
#ifdef CONFIG_BLOCK
req->r_request->bio = req->r_bio;
#endif
- req->r_request->trail = req->r_trail;
+ req->r_request->trail = &req->r_trail;
register_request(osdc, req);
@@ -1857,7 +1908,6 @@ out_mempool:
out:
return err;
}
-EXPORT_SYMBOL(ceph_osdc_init);
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
@@ -1874,7 +1924,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
ceph_msgpool_destroy(&osdc->msgpool_op);
ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
-EXPORT_SYMBOL(ceph_osdc_stop);
/*
* Read some contiguous pages. If we cross a stripe boundary, shorten
@@ -1894,7 +1943,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
NULL, 0, truncate_seq, truncate_size, NULL,
- false, 1, page_align);
+ false, page_align);
if (IS_ERR(req))
return PTR_ERR(req);
@@ -1923,8 +1972,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
u64 off, u64 len,
u32 truncate_seq, u64 truncate_size,
struct timespec *mtime,
- struct page **pages, int num_pages,
- int flags, int do_sync, bool nofail)
+ struct page **pages, int num_pages)
{
struct ceph_osd_request *req;
int rc = 0;
@@ -1933,11 +1981,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
BUG_ON(vino.snap != CEPH_NOSNAP);
req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
CEPH_OSD_OP_WRITE,
- flags | CEPH_OSD_FLAG_ONDISK |
- CEPH_OSD_FLAG_WRITE,
- snapc, do_sync,
+ CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+ snapc, 0,
truncate_seq, truncate_size, mtime,
- nofail, 1, page_align);
+ true, page_align);
if (IS_ERR(req))
return PTR_ERR(req);
@@ -1946,7 +1993,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
dout("writepages %llu~%llu (%d pages)\n", off, len,
req->r_num_pages);
- rc = ceph_osdc_start_request(osdc, req, nofail);
+ rc = ceph_osdc_start_request(osdc, req, true);
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);
@@ -2039,7 +2086,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
if (data_len > 0) {
int want = calc_pages_for(req->r_page_alignment, data_len);
- if (unlikely(req->r_num_pages < want)) {
+ if (req->r_pages && unlikely(req->r_num_pages < want)) {
pr_warning("tid %lld reply has %d bytes %d pages, we"
" had only %d pages ready\n", tid, data_len,
want, req->r_num_pages);