diff options
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r-- | net/ceph/osd_client.c | 326 |
1 files changed, 178 insertions, 148 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 05be0c18169..6f164289bde 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -30,8 +30,11 @@ static void __send_queued(struct ceph_osd_client *osdc); static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); static void __register_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req); +static void __unregister_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req); static void __unregister_linger_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req); +static void __enqueue_request(struct ceph_osd_request *req); static void __send_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req); @@ -297,12 +300,21 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, /* * requests */ -void ceph_osdc_release_request(struct kref *kref) +static void ceph_osdc_release_request(struct kref *kref) { - struct ceph_osd_request *req; + struct ceph_osd_request *req = container_of(kref, + struct ceph_osd_request, r_kref); unsigned int which; - req = container_of(kref, struct ceph_osd_request, r_kref); + dout("%s %p (r_request %p r_reply %p)\n", __func__, req, + req->r_request, req->r_reply); + WARN_ON(!RB_EMPTY_NODE(&req->r_node)); + WARN_ON(!list_empty(&req->r_req_lru_item)); + WARN_ON(!list_empty(&req->r_osd_item)); + WARN_ON(!list_empty(&req->r_linger_item)); + WARN_ON(!list_empty(&req->r_linger_osd_item)); + WARN_ON(req->r_osd); + if (req->r_request) ceph_msg_put(req->r_request); if (req->r_reply) { @@ -320,7 +332,22 @@ void ceph_osdc_release_request(struct kref *kref) kmem_cache_free(ceph_osd_request_cache, req); } -EXPORT_SYMBOL(ceph_osdc_release_request); + +void ceph_osdc_get_request(struct ceph_osd_request *req) +{ + dout("%s %p (was %d)\n", __func__, req, + atomic_read(&req->r_kref.refcount)); + kref_get(&req->r_kref); +} +EXPORT_SYMBOL(ceph_osdc_get_request); + +void ceph_osdc_put_request(struct ceph_osd_request *req) +{ + dout("%s %p (was %d)\n", __func__, req, + atomic_read(&req->r_kref.refcount)); + kref_put(&req->r_kref, ceph_osdc_release_request); +} +EXPORT_SYMBOL(ceph_osdc_put_request); struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_snap_context *snapc, @@ -364,7 +391,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, RB_CLEAR_NODE(&req->r_node); INIT_LIST_HEAD(&req->r_unsafe_item); INIT_LIST_HEAD(&req->r_linger_item); - INIT_LIST_HEAD(&req->r_linger_osd); + INIT_LIST_HEAD(&req->r_linger_osd_item); INIT_LIST_HEAD(&req->r_req_lru_item); INIT_LIST_HEAD(&req->r_osd_item); @@ -404,68 +431,9 @@ EXPORT_SYMBOL(ceph_osdc_alloc_request); static bool osd_req_opcode_valid(u16 opcode) { switch (opcode) { - case CEPH_OSD_OP_READ: - case CEPH_OSD_OP_STAT: - case CEPH_OSD_OP_MAPEXT: - case CEPH_OSD_OP_MASKTRUNC: - case CEPH_OSD_OP_SPARSE_READ: - case CEPH_OSD_OP_NOTIFY: - case CEPH_OSD_OP_NOTIFY_ACK: - case CEPH_OSD_OP_ASSERT_VER: - case CEPH_OSD_OP_WRITE: - case CEPH_OSD_OP_WRITEFULL: - case CEPH_OSD_OP_TRUNCATE: - case CEPH_OSD_OP_ZERO: - case CEPH_OSD_OP_DELETE: - case CEPH_OSD_OP_APPEND: - case CEPH_OSD_OP_STARTSYNC: - case CEPH_OSD_OP_SETTRUNC: - case CEPH_OSD_OP_TRIMTRUNC: - case CEPH_OSD_OP_TMAPUP: - case CEPH_OSD_OP_TMAPPUT: - case CEPH_OSD_OP_TMAPGET: - case CEPH_OSD_OP_CREATE: - case CEPH_OSD_OP_ROLLBACK: - case CEPH_OSD_OP_WATCH: - case CEPH_OSD_OP_OMAPGETKEYS: - case CEPH_OSD_OP_OMAPGETVALS: - case CEPH_OSD_OP_OMAPGETHEADER: - case CEPH_OSD_OP_OMAPGETVALSBYKEYS: - case CEPH_OSD_OP_OMAPSETVALS: - case CEPH_OSD_OP_OMAPSETHEADER: - case CEPH_OSD_OP_OMAPCLEAR: - case CEPH_OSD_OP_OMAPRMKEYS: - case CEPH_OSD_OP_OMAP_CMP: - case CEPH_OSD_OP_SETALLOCHINT: - case CEPH_OSD_OP_CLONERANGE: - case CEPH_OSD_OP_ASSERT_SRC_VERSION: - case CEPH_OSD_OP_SRC_CMPXATTR: - case CEPH_OSD_OP_GETXATTR: - case CEPH_OSD_OP_GETXATTRS: - case CEPH_OSD_OP_CMPXATTR: - case CEPH_OSD_OP_SETXATTR: - case CEPH_OSD_OP_SETXATTRS: - case CEPH_OSD_OP_RESETXATTRS: - case CEPH_OSD_OP_RMXATTR: - case CEPH_OSD_OP_PULL: - case CEPH_OSD_OP_PUSH: - case CEPH_OSD_OP_BALANCEREADS: - case CEPH_OSD_OP_UNBALANCEREADS: - case CEPH_OSD_OP_SCRUB: - case CEPH_OSD_OP_SCRUB_RESERVE: - case CEPH_OSD_OP_SCRUB_UNRESERVE: - case CEPH_OSD_OP_SCRUB_STOP: - case CEPH_OSD_OP_SCRUB_MAP: - case CEPH_OSD_OP_WRLOCK: - case CEPH_OSD_OP_WRUNLOCK: - case CEPH_OSD_OP_RDLOCK: - case CEPH_OSD_OP_RDUNLOCK: - case CEPH_OSD_OP_UPLOCK: - case CEPH_OSD_OP_DNLOCK: - case CEPH_OSD_OP_CALL: - case CEPH_OSD_OP_PGLS: - case CEPH_OSD_OP_PGLS_FILTER: - return true; +#define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return true; +__CEPH_FORALL_OSD_OPS(GENERATE_CASE) +#undef GENERATE_CASE default: return false; } @@ -868,6 +836,37 @@ __lookup_request_ge(struct ceph_osd_client *osdc, return NULL; } +static void __kick_linger_request(struct ceph_osd_request *req) +{ + struct ceph_osd_client *osdc = req->r_osdc; + struct ceph_osd *osd = req->r_osd; + + /* + * Linger requests need to be resent with a new tid to avoid + * the dup op detection logic on the OSDs. Achieve this with + * a re-register dance instead of open-coding. + */ + ceph_osdc_get_request(req); + if (!list_empty(&req->r_linger_item)) + __unregister_linger_request(osdc, req); + else + __unregister_request(osdc, req); + __register_request(osdc, req); + ceph_osdc_put_request(req); + + /* + * Unless request has been registered as both normal and + * lingering, __unregister{,_linger}_request clears r_osd. + * However, here we need to preserve r_osd to make sure we + * requeue on the same OSD. + */ + WARN_ON(req->r_osd || !osd); + req->r_osd = osd; + + dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid); + __enqueue_request(req); +} + /* * Resubmit requests pending on the given osd. */ @@ -876,12 +875,14 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc, { struct ceph_osd_request *req, *nreq; LIST_HEAD(resend); + LIST_HEAD(resend_linger); int err; - dout("__kick_osd_requests osd%d\n", osd->o_osd); + dout("%s osd%d\n", __func__, osd->o_osd); err = __reset_osd(osdc, osd); if (err) return; + /* * Build up a list of requests to resend by traversing the * osd's list of requests. Requests for a given object are @@ -902,33 +903,32 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc, list_for_each_entry(req, &osd->o_requests, r_osd_item) { if (!req->r_sent) break; - list_move_tail(&req->r_req_lru_item, &resend); - dout("requeueing %p tid %llu osd%d\n", req, req->r_tid, - osd->o_osd); - if (!req->r_linger) + + if (!req->r_linger) { + dout("%s requeueing %p tid %llu\n", __func__, req, + req->r_tid); + list_move_tail(&req->r_req_lru_item, &resend); req->r_flags |= CEPH_OSD_FLAG_RETRY; + } else { + list_move_tail(&req->r_req_lru_item, &resend_linger); + } } list_splice(&resend, &osdc->req_unsent); /* - * Linger requests are re-registered before sending, which - * sets up a new tid for each. We add them to the unsent - * list at the end to keep things in tid order. + * Both registered and not yet registered linger requests are + * enqueued with a new tid on the same OSD. We add/move them + * to req_unsent/o_requests at the end to keep things in tid + * order. */ list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, - r_linger_osd) { - /* - * reregister request prior to unregistering linger so - * that r_osd is preserved. - */ - BUG_ON(!list_empty(&req->r_req_lru_item)); - __register_request(osdc, req); - list_add_tail(&req->r_req_lru_item, &osdc->req_unsent); - list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); - __unregister_linger_request(osdc, req); - dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid, - osd->o_osd); + r_linger_osd_item) { + WARN_ON(!list_empty(&req->r_req_lru_item)); + __kick_linger_request(req); } + + list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item) + __kick_linger_request(req); } /* @@ -1007,7 +1007,9 @@ static void put_osd(struct ceph_osd *osd) static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) { dout("__remove_osd %p\n", osd); - BUG_ON(!list_empty(&osd->o_requests)); + WARN_ON(!list_empty(&osd->o_requests)); + WARN_ON(!list_empty(&osd->o_linger_requests)); + rb_erase(&osd->o_node, &osdc->osds); list_del_init(&osd->o_osd_lru); ceph_con_close(&osd->o_con); @@ -1029,12 +1031,23 @@ static void remove_all_osds(struct ceph_osd_client *osdc) static void __move_osd_to_lru(struct ceph_osd_client *osdc, struct ceph_osd *osd) { - dout("__move_osd_to_lru %p\n", osd); + dout("%s %p\n", __func__, osd); BUG_ON(!list_empty(&osd->o_osd_lru)); + list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; } +static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc, + struct ceph_osd *osd) +{ + dout("%s %p\n", __func__, osd); + + if (list_empty(&osd->o_requests) && + list_empty(&osd->o_linger_requests)) + __move_osd_to_lru(osdc, osd); +} + static void __remove_osd_from_lru(struct ceph_osd *osd) { dout("__remove_osd_from_lru %p\n", osd); @@ -1175,6 +1188,7 @@ static void __unregister_request(struct ceph_osd_client *osdc, dout("__unregister_request %p tid %lld\n", req, req->r_tid); rb_erase(&req->r_node, &osdc->requests); + RB_CLEAR_NODE(&req->r_node); osdc->num_requests--; if (req->r_osd) { @@ -1182,12 +1196,8 @@ static void __unregister_request(struct ceph_osd_client *osdc, ceph_msg_revoke(req->r_request); list_del_init(&req->r_osd_item); - if (list_empty(&req->r_osd->o_requests) && - list_empty(&req->r_osd->o_linger_requests)) { - dout("moving osd to %p lru\n", req->r_osd); - __move_osd_to_lru(osdc, req->r_osd); - } - if (list_empty(&req->r_linger_item)) + maybe_move_osd_to_lru(osdc, req->r_osd); + if (list_empty(&req->r_linger_osd_item)) req->r_osd = NULL; } @@ -1214,44 +1224,40 @@ static void __cancel_request(struct ceph_osd_request *req) static void __register_linger_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { - dout("__register_linger_request %p\n", req); + dout("%s %p tid %llu\n", __func__, req, req->r_tid); + WARN_ON(!req->r_linger); + ceph_osdc_get_request(req); list_add_tail(&req->r_linger_item, &osdc->req_linger); if (req->r_osd) - list_add_tail(&req->r_linger_osd, + list_add_tail(&req->r_linger_osd_item, &req->r_osd->o_linger_requests); } static void __unregister_linger_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { - dout("__unregister_linger_request %p\n", req); + WARN_ON(!req->r_linger); + + if (list_empty(&req->r_linger_item)) { + dout("%s %p tid %llu not registered\n", __func__, req, + req->r_tid); + return; + } + + dout("%s %p tid %llu\n", __func__, req, req->r_tid); list_del_init(&req->r_linger_item); - if (req->r_osd) { - list_del_init(&req->r_linger_osd); - if (list_empty(&req->r_osd->o_requests) && - list_empty(&req->r_osd->o_linger_requests)) { - dout("moving osd to %p lru\n", req->r_osd); - __move_osd_to_lru(osdc, req->r_osd); - } + if (req->r_osd) { + list_del_init(&req->r_linger_osd_item); + maybe_move_osd_to_lru(osdc, req->r_osd); if (list_empty(&req->r_osd_item)) req->r_osd = NULL; } - ceph_osdc_put_request(req); -} -void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) -{ - mutex_lock(&osdc->request_mutex); - if (req->r_linger) { - req->r_linger = 0; - __unregister_linger_request(osdc, req); - } - mutex_unlock(&osdc->request_mutex); + list_del_init(&req->r_req_lru_item); /* can be on notarget */ + ceph_osdc_put_request(req); } -EXPORT_SYMBOL(ceph_osdc_unregister_linger_request); void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, struct ceph_osd_request *req) @@ -1318,6 +1324,22 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap, &req->r_target_oid, pg_out); } +static void __enqueue_request(struct ceph_osd_request *req) +{ + struct ceph_osd_client *osdc = req->r_osdc; + + dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid, + req->r_osd ? req->r_osd->o_osd : -1); + + if (req->r_osd) { + __remove_osd_from_lru(req->r_osd); + list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); + list_move_tail(&req->r_req_lru_item, &osdc->req_unsent); + } else { + list_move_tail(&req->r_req_lru_item, &osdc->req_notarget); + } +} + /* * Pick an osd (the first 'up' osd in the pg), allocate the osd struct * (as needed), and set the request r_osd appropriately. If there is @@ -1375,6 +1397,7 @@ static int __map_request(struct ceph_osd_client *osdc, if (req->r_osd) { __cancel_request(req); list_del_init(&req->r_osd_item); + list_del_init(&req->r_linger_osd_item); req->r_osd = NULL; } @@ -1395,13 +1418,7 @@ static int __map_request(struct ceph_osd_client *osdc, &osdc->osdmap->osd_addr[o]); } - if (req->r_osd) { - __remove_osd_from_lru(req->r_osd); - list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); - list_move_tail(&req->r_req_lru_item, &osdc->req_unsent); - } else { - list_move_tail(&req->r_req_lru_item, &osdc->req_notarget); - } + __enqueue_request(req); err = 1; /* osd or pg changed */ out: @@ -1746,8 +1763,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, } bytes = le32_to_cpu(msg->hdr.data_len); if (payload_len != bytes) { - pr_warning("sum of op payload lens %d != data_len %d", - payload_len, bytes); + pr_warn("sum of op payload lens %d != data_len %d\n", + payload_len, bytes); goto bad_put; } @@ -2285,26 +2302,21 @@ static void handle_watch_notify(struct ceph_osd_client *osdc, if (event) { event_work = kmalloc(sizeof(*event_work), GFP_NOIO); if (!event_work) { - dout("ERROR: could not allocate event_work\n"); - goto done_err; + pr_err("couldn't allocate event_work\n"); + ceph_osdc_put_event(event); + return; } INIT_WORK(&event_work->work, do_event_work); event_work->event = event; event_work->ver = ver; event_work->notify_id = notify_id; event_work->opcode = opcode; - if (!queue_work(osdc->notify_wq, &event_work->work)) { - dout("WARNING: failed to queue notify event work\n"); - goto done_err; - } + + queue_work(osdc->notify_wq, &event_work->work); } return; -done_err: - ceph_osdc_put_event(event); - return; - bad: pr_err("osdc handle_watch_notify corrupt msg\n"); } @@ -2430,6 +2442,25 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, EXPORT_SYMBOL(ceph_osdc_start_request); /* + * Unregister a registered request. The request is not completed (i.e. + * no callbacks or wakeups) - higher layers are supposed to know what + * they are canceling. + */ +void ceph_osdc_cancel_request(struct ceph_osd_request *req) +{ + struct ceph_osd_client *osdc = req->r_osdc; + + mutex_lock(&osdc->request_mutex); + if (req->r_linger) + __unregister_linger_request(osdc, req); + __unregister_request(osdc, req); + mutex_unlock(&osdc->request_mutex); + + dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid); +} +EXPORT_SYMBOL(ceph_osdc_cancel_request); + +/* * wait for a request to complete */ int ceph_osdc_wait_request(struct ceph_osd_client *osdc, @@ -2437,18 +2468,18 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc, { int rc; + dout("%s %p tid %llu\n", __func__, req, req->r_tid); + rc = wait_for_completion_interruptible(&req->r_completion); if (rc < 0) { - mutex_lock(&osdc->request_mutex); - __cancel_request(req); - __unregister_request(osdc, req); - mutex_unlock(&osdc->request_mutex); + dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid); + ceph_osdc_cancel_request(req); complete_request(req); - dout("wait_request tid %llu canceled/timed out\n", req->r_tid); return rc; } - dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result); + dout("%s %p tid %llu result %d\n", __func__, req, req->r_tid, + req->r_result); return req->r_result; } EXPORT_SYMBOL(ceph_osdc_wait_request); @@ -2750,10 +2781,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ceph_msg_revoke_incoming(req->r_reply); if (front_len > req->r_reply->front_alloc_len) { - pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n", - front_len, req->r_reply->front_alloc_len, - (unsigned int)con->peer_name.type, - le64_to_cpu(con->peer_name.num)); + pr_warn("get_reply front %d > preallocated %d (%u#%llu)\n", + front_len, req->r_reply->front_alloc_len, + (unsigned int)con->peer_name.type, + le64_to_cpu(con->peer_name.num)); m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, false); if (!m) @@ -2776,8 +2807,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, if (osd_data->pages && unlikely(osd_data->length < data_len)) { - pr_warning("tid %lld reply has %d bytes " - "we had only %llu bytes ready\n", + pr_warn("tid %lld reply has %d bytes we had only %llu bytes ready\n", tid, data_len, osd_data->length); *skip = 1; ceph_msg_put(m); |