diff options
Diffstat (limited to 'fs/ceph/osd_client.c')
-rw-r--r-- | fs/ceph/osd_client.c | 124 |
1 files changed, 55 insertions, 69 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index c7b4dedaace..afa7bb3895c 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c @@ -16,7 +16,7 @@ #define OSD_OP_FRONT_LEN 4096 #define OSD_OPREPLY_FRONT_LEN 512 -const static struct ceph_connection_operations osd_con_ops; +static const struct ceph_connection_operations osd_con_ops; static int __kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *kickosd); @@ -147,7 +147,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, req = kzalloc(sizeof(*req), GFP_NOFS); } if (req == NULL) - return ERR_PTR(-ENOMEM); + return NULL; req->r_osdc = osdc; req->r_mempool = use_mempool; @@ -164,10 +164,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); else msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, - OSD_OPREPLY_FRONT_LEN, 0, 0, NULL); - if (IS_ERR(msg)) { + OSD_OPREPLY_FRONT_LEN, GFP_NOFS); + if (!msg) { ceph_osdc_put_request(req); - return ERR_PTR(PTR_ERR(msg)); + return NULL; } req->r_reply = msg; @@ -178,10 +178,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op, 0); else - msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL); - if (IS_ERR(msg)) { + msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, GFP_NOFS); + if (!msg) { ceph_osdc_put_request(req); - return ERR_PTR(PTR_ERR(msg)); + return NULL; } msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP); memset(msg->front.iov_base, 0, msg->front.iov_len); @@ -565,7 +565,8 @@ static int __map_osds(struct ceph_osd_client *osdc, { struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; struct ceph_pg pgid; - int o = -1; + int acting[CEPH_PG_MAX_SIZE]; + int o = -1, num = 0; int err; dout("map_osds %p tid %lld\n", req, req->r_tid); @@ -576,10 +577,16 @@ static int __map_osds(struct ceph_osd_client *osdc, pgid = reqhead->layout.ol_pgid; req->r_pgid = pgid; - o = ceph_calc_pg_primary(osdc->osdmap, pgid); + err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); + if (err > 0) { + o = acting[0]; + num = err; + } if ((req->r_osd && req->r_osd->o_osd == o && - req->r_sent >= req->r_osd->o_incarnation) || + req->r_sent >= req->r_osd->o_incarnation && + req->r_num_pg_osds == num && + memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || (req->r_osd == NULL && o == -1)) return 0; /* no change */ @@ -587,6 +594,10 @@ static int __map_osds(struct ceph_osd_client *osdc, req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, req->r_osd ? req->r_osd->o_osd : -1); + /* record full pg acting set */ + memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num); + req->r_num_pg_osds = num; + if (req->r_osd) { __cancel_request(req); list_del_init(&req->r_osd_item); @@ -612,7 +623,7 @@ static int __map_osds(struct ceph_osd_client *osdc, __remove_osd_from_lru(req->r_osd); list_add(&req->r_osd_item, &req->r_osd->o_requests); } - err = 1; /* osd changed */ + err = 1; /* osd or pg changed */ out: return err; @@ -704,7 +715,7 @@ static void handle_timeout(struct work_struct *work) * should mark the osd as failed and we should find out about * it from an updated osd map. */ - while (!list_empty(&osdc->req_lru)) { + while (timeout && !list_empty(&osdc->req_lru)) { req = list_entry(osdc->req_lru.next, struct ceph_osd_request, r_req_lru_item); @@ -779,16 +790,18 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, struct ceph_osd_request *req; u64 tid; int numops, object_len, flags; + s32 result; tid = le64_to_cpu(msg->hdr.tid); if (msg->front.iov_len < sizeof(*rhead)) goto bad; numops = le32_to_cpu(rhead->num_ops); object_len = le32_to_cpu(rhead->object_len); + result = le32_to_cpu(rhead->result); if (msg->front.iov_len != sizeof(*rhead) + object_len + numops * sizeof(struct ceph_osd_op)) goto bad; - dout("handle_reply %p tid %llu\n", msg, tid); + dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); /* lookup */ mutex_lock(&osdc->request_mutex); @@ -834,7 +847,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, dout("handle_reply tid %llu flags %d\n", tid, flags); /* either this is a read, or we got the safe response */ - if ((flags & CEPH_OSD_FLAG_ONDISK) || + if (result < 0 || + (flags & CEPH_OSD_FLAG_ONDISK) || ((flags & CEPH_OSD_FLAG_WRITE) == 0)) __unregister_request(osdc, req); @@ -1064,6 +1078,7 @@ done: if (newmap) kick_requests(osdc, NULL); up_read(&osdc->map_sem); + wake_up(&osdc->client->auth_wq); return; bad: @@ -1073,45 +1088,6 @@ bad: return; } - -/* - * A read request prepares specific pages that data is to be read into. - * When a message is being read off the wire, we call prepare_pages to - * find those pages. - * 0 = success, -1 failure. - */ -static int __prepare_pages(struct ceph_connection *con, - struct ceph_msg_header *hdr, - struct ceph_osd_request *req, - u64 tid, - struct ceph_msg *m) -{ - struct ceph_osd *osd = con->private; - struct ceph_osd_client *osdc; - int ret = -1; - int data_len = le32_to_cpu(hdr->data_len); - unsigned data_off = le16_to_cpu(hdr->data_off); - - int want = calc_pages_for(data_off & ~PAGE_MASK, data_len); - - if (!osd) - return -1; - - osdc = osd->o_osdc; - - dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m, - tid, req->r_num_pages, want); - if (unlikely(req->r_num_pages < want)) - goto out; - m->pages = req->r_pages; - m->nr_pages = req->r_num_pages; - ret = 0; /* success */ -out: - BUG_ON(ret < 0 || m->nr_pages < want); - - return ret; -} - /* * Register request, send initial attempt. */ @@ -1238,11 +1214,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) if (!osdc->req_mempool) goto out; - err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true); + err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true, + "osd_op"); if (err < 0) goto out_mempool; err = ceph_msgpool_init(&osdc->msgpool_op_reply, - OSD_OPREPLY_FRONT_LEN, 10, true); + OSD_OPREPLY_FRONT_LEN, 10, true, + "osd_op_reply"); if (err < 0) goto out_msgpool; return 0; @@ -1288,8 +1266,8 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, 0, truncate_seq, truncate_size, NULL, false, 1); - if (IS_ERR(req)) - return PTR_ERR(req); + if (!req) + return -ENOMEM; /* it may be a short read due to an object boundary */ req->r_pages = pages; @@ -1331,8 +1309,8 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, snapc, do_sync, truncate_seq, truncate_size, mtime, nofail, 1); - if (IS_ERR(req)) - return PTR_ERR(req); + if (!req) + return -ENOMEM; /* it may be a short write due to an object boundary */ req->r_pages = pages; @@ -1380,7 +1358,8 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) } /* - * lookup and return message for incoming reply + * lookup and return message for incoming reply. set up reply message + * pages. */ static struct ceph_msg *get_reply(struct ceph_connection *con, struct ceph_msg_header *hdr, @@ -1393,7 +1372,6 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, int front = le32_to_cpu(hdr->front_len); int data_len = le32_to_cpu(hdr->data_len); u64 tid; - int err; tid = le64_to_cpu(hdr->tid); mutex_lock(&osdc->request_mutex); @@ -1411,13 +1389,14 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, req->r_reply, req->r_con_filling_msg); ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); ceph_con_put(req->r_con_filling_msg); + req->r_con_filling_msg = NULL; } if (front > req->r_reply->front.iov_len) { pr_warning("get_reply front %d > preallocated %d\n", front, (int)req->r_reply->front.iov_len); - m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL); - if (IS_ERR(m)) + m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS); + if (!m) goto out; ceph_msg_put(req->r_reply); req->r_reply = m; @@ -1425,12 +1404,19 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, m = ceph_msg_get(req->r_reply); if (data_len > 0) { - err = __prepare_pages(con, hdr, req, tid, m); - if (err < 0) { + unsigned data_off = le16_to_cpu(hdr->data_off); + int want = calc_pages_for(data_off & ~PAGE_MASK, data_len); + + if (unlikely(req->r_num_pages < want)) { + pr_warning("tid %lld reply %d > expected %d pages\n", + tid, want, m->nr_pages); *skip = 1; ceph_msg_put(m); - m = ERR_PTR(err); + m = NULL; + goto out; } + m->pages = req->r_pages; + m->nr_pages = req->r_num_pages; } *skip = 0; req->r_con_filling_msg = ceph_con_get(con); @@ -1452,7 +1438,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, switch (type) { case CEPH_MSG_OSD_MAP: - return ceph_msg_new(type, front, 0, 0, NULL); + return ceph_msg_new(type, front, GFP_NOFS); case CEPH_MSG_OSD_OPREPLY: return get_reply(con, hdr, skip); default: @@ -1538,7 +1524,7 @@ static int invalidate_authorizer(struct ceph_connection *con) return ceph_monc_validate_auth(&osdc->client->monc); } -const static struct ceph_connection_operations osd_con_ops = { +static const struct ceph_connection_operations osd_con_ops = { .get = get_osd_con, .put = put_osd_con, .dispatch = dispatch, |