diff options
Diffstat (limited to 'net/ceph')
-rw-r--r-- | net/ceph/Kconfig | 4 | ||||
-rw-r--r-- | net/ceph/ceph_common.c | 27 | ||||
-rw-r--r-- | net/ceph/ceph_strings.c | 39 | ||||
-rw-r--r-- | net/ceph/crush/mapper.c | 15 | ||||
-rw-r--r-- | net/ceph/crypto.c | 7 | ||||
-rw-r--r-- | net/ceph/debugfs.c | 29 | ||||
-rw-r--r-- | net/ceph/messenger.c | 283 | ||||
-rw-r--r-- | net/ceph/mon_client.c | 2 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 669 | ||||
-rw-r--r-- | net/ceph/osdmap.c | 290 | ||||
-rw-r--r-- | net/ceph/pagevec.c | 24 |
11 files changed, 785 insertions, 604 deletions
diff --git a/net/ceph/Kconfig b/net/ceph/Kconfig index cc04dd667a1..e50cc69ae8c 100644 --- a/net/ceph/Kconfig +++ b/net/ceph/Kconfig @@ -1,6 +1,6 @@ config CEPH_LIB - tristate "Ceph core library (EXPERIMENTAL)" - depends on INET && EXPERIMENTAL + tristate "Ceph core library" + depends on INET select LIBCRC32C select CRYPTO_AES select CRYPTO diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index ee71ea26777..e65e6e4be38 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -15,6 +15,8 @@ #include <linux/slab.h> #include <linux/statfs.h> #include <linux/string.h> +#include <linux/nsproxy.h> +#include <net/net_namespace.h> #include <linux/ceph/ceph_features.h> @@ -26,6 +28,22 @@ #include "crypto.h" +/* + * Module compatibility interface. For now it doesn't do anything, + * but its existence signals a certain level of functionality. + * + * The data buffer is used to pass information both to and from + * libceph. The return value indicates whether libceph determines + * it is compatible with the caller (from another kernel module), + * given the provided data. + * + * The data pointer can be null. + */ +bool libceph_compatible(void *data) +{ + return true; +} +EXPORT_SYMBOL(libceph_compatible); /* * find filename portion of a path (/foo/bar/baz -> baz) @@ -292,6 +310,9 @@ ceph_parse_options(char *options, const char *dev_name, int err = -ENOMEM; substring_t argstr[MAX_OPT_ARGS]; + if (current->nsproxy->net_ns != &init_net) + return ERR_PTR(-EINVAL); + opt = kzalloc(sizeof(*opt), GFP_KERNEL); if (!opt) return ERR_PTR(-ENOMEM); @@ -585,10 +606,8 @@ static int __init init_ceph_lib(void) if (ret < 0) goto out_crypto; - pr_info("loaded (mon/osd proto %d/%d, osdmap %d/%d %d/%d)\n", - CEPH_MONC_PROTOCOL, CEPH_OSDC_PROTOCOL, - CEPH_OSDMAP_VERSION, CEPH_OSDMAP_VERSION_EXT, - CEPH_OSDMAP_INC_VERSION, CEPH_OSDMAP_INC_VERSION_EXT); + pr_info("loaded (mon/osd proto %d/%d)\n", + CEPH_MONC_PROTOCOL, CEPH_OSDC_PROTOCOL); return 0; diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c index 3fbda04de29..1348df96fe1 100644 --- a/net/ceph/ceph_strings.c +++ b/net/ceph/ceph_strings.c @@ -21,9 +21,15 @@ const char *ceph_osd_op_name(int op) switch (op) { case CEPH_OSD_OP_READ: return "read"; case CEPH_OSD_OP_STAT: return "stat"; + case CEPH_OSD_OP_MAPEXT: return "mapext"; + case CEPH_OSD_OP_SPARSE_READ: return "sparse-read"; + case CEPH_OSD_OP_NOTIFY: return "notify"; + case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack"; + case CEPH_OSD_OP_ASSERT_VER: return "assert-version"; case CEPH_OSD_OP_MASKTRUNC: return "masktrunc"; + case CEPH_OSD_OP_CREATE: return "create"; case CEPH_OSD_OP_WRITE: return "write"; case CEPH_OSD_OP_DELETE: return "delete"; case CEPH_OSD_OP_TRUNCATE: return "truncate"; @@ -39,6 +45,11 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_TMAPUP: return "tmapup"; case CEPH_OSD_OP_TMAPGET: return "tmapget"; case CEPH_OSD_OP_TMAPPUT: return "tmapput"; + case CEPH_OSD_OP_WATCH: return "watch"; + + case CEPH_OSD_OP_CLONERANGE: return "clonerange"; + case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version"; + case CEPH_OSD_OP_SRC_CMPXATTR: return "src-cmpxattr"; case CEPH_OSD_OP_GETXATTR: return "getxattr"; case CEPH_OSD_OP_GETXATTRS: return "getxattrs"; @@ -53,6 +64,10 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_BALANCEREADS: return "balance-reads"; case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads"; case CEPH_OSD_OP_SCRUB: return "scrub"; + case CEPH_OSD_OP_SCRUB_RESERVE: return "scrub-reserve"; + case CEPH_OSD_OP_SCRUB_UNRESERVE: return "scrub-unreserve"; + case CEPH_OSD_OP_SCRUB_STOP: return "scrub-stop"; + case CEPH_OSD_OP_SCRUB_MAP: return "scrub-map"; case CEPH_OSD_OP_WRLOCK: return "wrlock"; case CEPH_OSD_OP_WRUNLOCK: return "wrunlock"; @@ -64,10 +79,34 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_CALL: return "call"; case CEPH_OSD_OP_PGLS: return "pgls"; + case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter"; + case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys"; + case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals"; + case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header"; + case CEPH_OSD_OP_OMAPGETVALSBYKEYS: return "omap-get-vals-by-keys"; + case CEPH_OSD_OP_OMAPSETVALS: return "omap-set-vals"; + case CEPH_OSD_OP_OMAPSETHEADER: return "omap-set-header"; + case CEPH_OSD_OP_OMAPCLEAR: return "omap-clear"; + case CEPH_OSD_OP_OMAPRMKEYS: return "omap-rm-keys"; } return "???"; } +const char *ceph_osd_state_name(int s) +{ + switch (s) { + case CEPH_OSD_EXISTS: + return "exists"; + case CEPH_OSD_UP: + return "up"; + case CEPH_OSD_AUTOOUT: + return "autoout"; + case CEPH_OSD_NEW: + return "new"; + default: + return "???"; + } +} const char *ceph_pool_op_name(int op) { diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 35fce755ce1..cbd06a91941 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -287,6 +287,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in * @outpos: our position in that vector * @firstn: true if choosing "first n" items, false if choosing "indep" * @recurse_to_leaf: true if we want one device under each item of given type + * @descend_once: true if we should only try one descent before giving up * @out2: second output vector for leaf items (if @recurse_to_leaf) */ static int crush_choose(const struct crush_map *map, @@ -295,7 +296,7 @@ static int crush_choose(const struct crush_map *map, int x, int numrep, int type, int *out, int outpos, int firstn, int recurse_to_leaf, - int *out2) + int descend_once, int *out2) { int rep; unsigned int ftotal, flocal; @@ -391,7 +392,7 @@ static int crush_choose(const struct crush_map *map, } reject = 0; - if (recurse_to_leaf) { + if (!collide && recurse_to_leaf) { if (item < 0) { if (crush_choose(map, map->buckets[-1-item], @@ -399,6 +400,7 @@ static int crush_choose(const struct crush_map *map, x, outpos+1, 0, out2, outpos, firstn, 0, + map->chooseleaf_descend_once, NULL) <= outpos) /* didn't get leaf */ reject = 1; @@ -422,7 +424,10 @@ reject: ftotal++; flocal++; - if (collide && flocal <= map->choose_local_tries) + if (reject && descend_once) + /* let outer call try again */ + skip_rep = 1; + else if (collide && flocal <= map->choose_local_tries) /* retry locally a few times */ retry_bucket = 1; else if (map->choose_local_fallback_tries > 0 && @@ -485,6 +490,7 @@ int crush_do_rule(const struct crush_map *map, int i, j; int numrep; int firstn; + const int descend_once = 0; if ((__u32)ruleno >= map->max_rules) { dprintk(" bad ruleno %d\n", ruleno); @@ -544,7 +550,8 @@ int crush_do_rule(const struct crush_map *map, curstep->arg2, o+osize, j, firstn, - recurse_to_leaf, c+osize); + recurse_to_leaf, + descend_once, c+osize); } if (recurse_to_leaf) diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c index af14cb42516..6e7a236525b 100644 --- a/net/ceph/crypto.c +++ b/net/ceph/crypto.c @@ -423,7 +423,8 @@ int ceph_encrypt2(struct ceph_crypto_key *secret, void *dst, size_t *dst_len, } } -int ceph_key_instantiate(struct key *key, struct key_preparsed_payload *prep) +static int ceph_key_instantiate(struct key *key, + struct key_preparsed_payload *prep) { struct ceph_crypto_key *ckey; size_t datalen = prep->datalen; @@ -458,12 +459,12 @@ err: return ret; } -int ceph_key_match(const struct key *key, const void *description) +static int ceph_key_match(const struct key *key, const void *description) { return strcmp(key->description, description) == 0; } -void ceph_key_destroy(struct key *key) { +static void ceph_key_destroy(struct key *key) { struct ceph_crypto_key *ckey = key->payload.data; ceph_crypto_key_destroy(ckey); diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 38b5dc1823d..00d051f4894 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -66,9 +66,9 @@ static int osdmap_show(struct seq_file *s, void *p) for (n = rb_first(&client->osdc.osdmap->pg_pools); n; n = rb_next(n)) { struct ceph_pg_pool_info *pool = rb_entry(n, struct ceph_pg_pool_info, node); - seq_printf(s, "pg_pool %d pg_num %d / %d, lpg_num %d / %d\n", - pool->id, pool->v.pg_num, pool->pg_num_mask, - pool->v.lpg_num, pool->lpg_num_mask); + seq_printf(s, "pg_pool %llu pg_num %d / %d\n", + (unsigned long long)pool->id, pool->pg_num, + pool->pg_num_mask); } for (i = 0; i < client->osdc.osdmap->max_osd; i++) { struct ceph_entity_addr *addr = @@ -123,26 +123,16 @@ static int osdc_show(struct seq_file *s, void *pp) mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { struct ceph_osd_request *req; - struct ceph_osd_request_head *head; - struct ceph_osd_op *op; - int num_ops; - int opcode, olen; + int opcode; int i; req = rb_entry(p, struct ceph_osd_request, r_node); - seq_printf(s, "%lld\tosd%d\t%d.%x\t", req->r_tid, + seq_printf(s, "%lld\tosd%d\t%lld.%x\t", req->r_tid, req->r_osd ? req->r_osd->o_osd : -1, - le32_to_cpu(req->r_pgid.pool), - le16_to_cpu(req->r_pgid.ps)); + req->r_pgid.pool, req->r_pgid.seed); - head = req->r_request->front.iov_base; - op = (void *)(head + 1); - - num_ops = le16_to_cpu(head->num_ops); - olen = le32_to_cpu(head->object_len); - seq_printf(s, "%.*s", olen, - (const char *)(head->ops + num_ops)); + seq_printf(s, "%.*s", req->r_oid_len, req->r_oid); if (req->r_reassert_version.epoch) seq_printf(s, "\t%u'%llu", @@ -151,10 +141,9 @@ static int osdc_show(struct seq_file *s, void *pp) else seq_printf(s, "\t"); - for (i = 0; i < num_ops; i++) { - opcode = le16_to_cpu(op->op); + for (i = 0; i < req->r_num_ops; i++) { + opcode = le16_to_cpu(req->r_request_ops[i].op); seq_printf(s, "\t%s", ceph_osd_op_name(opcode)); - op++; } seq_printf(s, "\n"); diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 4d111fd2b49..2c0669fb54e 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -9,8 +9,9 @@ #include <linux/slab.h> #include <linux/socket.h> #include <linux/string.h> +#ifdef CONFIG_BLOCK #include <linux/bio.h> -#include <linux/blkdev.h> +#endif /* CONFIG_BLOCK */ #include <linux/dns_resolver.h> #include <net/tcp.h> @@ -97,6 +98,57 @@ #define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */ #define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */ +static bool con_flag_valid(unsigned long con_flag) +{ + switch (con_flag) { + case CON_FLAG_LOSSYTX: + case CON_FLAG_KEEPALIVE_PENDING: + case CON_FLAG_WRITE_PENDING: + case CON_FLAG_SOCK_CLOSED: + case CON_FLAG_BACKOFF: + return true; + default: + return false; + } +} + +static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + clear_bit(con_flag, &con->flags); +} + +static void con_flag_set(struct ceph_connection *con, unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + set_bit(con_flag, &con->flags); +} + +static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + return test_bit(con_flag, &con->flags); +} + +static bool con_flag_test_and_clear(struct ceph_connection *con, + unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + return test_and_clear_bit(con_flag, &con->flags); +} + +static bool con_flag_test_and_set(struct ceph_connection *con, + unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + return test_and_set_bit(con_flag, &con->flags); +} + /* static tag bytes (protocol control messages) */ static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; @@ -114,7 +166,7 @@ static struct lock_class_key socket_class; static void queue_con(struct ceph_connection *con); static void con_work(struct work_struct *); -static void ceph_fault(struct ceph_connection *con); +static void con_fault(struct ceph_connection *con); /* * Nicely render a sockaddr as a string. An array of formatted @@ -171,7 +223,7 @@ static void encode_my_addr(struct ceph_messenger *msgr) */ static struct workqueue_struct *ceph_msgr_wq; -void _ceph_msgr_exit(void) +static void _ceph_msgr_exit(void) { if (ceph_msgr_wq) { destroy_workqueue(ceph_msgr_wq); @@ -308,7 +360,7 @@ static void ceph_sock_write_space(struct sock *sk) * buffer. See net/ipv4/tcp_input.c:tcp_check_space() * and net/core/stream.c:sk_stream_write_space(). */ - if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) { + if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) { if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { dout("%s %p queueing write work\n", __func__, con); clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); @@ -333,7 +385,7 @@ static void ceph_sock_state_change(struct sock *sk) case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); con_sock_state_closing(con); - set_bit(CON_FLAG_SOCK_CLOSED, &con->flags); + con_flag_set(con, CON_FLAG_SOCK_CLOSED); queue_con(con); break; case TCP_ESTABLISHED: @@ -474,7 +526,7 @@ static int con_close_socket(struct ceph_connection *con) * received a socket close event before we had the chance to * shut the socket down. */ - clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags); + con_flag_clear(con, CON_FLAG_SOCK_CLOSED); con_sock_state_closed(con); return rc; @@ -506,6 +558,7 @@ static void reset_connection(struct ceph_connection *con) { /* reset connection, out_queue, msg_ and connect_seq */ /* discard existing out_queue and msg_seq */ + dout("reset_connection %p\n", con); ceph_msg_remove_list(&con->out_queue); ceph_msg_remove_list(&con->out_sent); @@ -537,11 +590,10 @@ void ceph_con_close(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr)); con->state = CON_STATE_CLOSED; - clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */ - clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); - clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); - clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); - clear_bit(CON_FLAG_BACKOFF, &con->flags); + con_flag_clear(con, CON_FLAG_LOSSYTX); /* so we retry next connect */ + con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING); + con_flag_clear(con, CON_FLAG_WRITE_PENDING); + con_flag_clear(con, CON_FLAG_BACKOFF); reset_connection(con); con->peer_global_seq = 0; @@ -561,7 +613,7 @@ void ceph_con_open(struct ceph_connection *con, mutex_lock(&con->mutex); dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); - BUG_ON(con->state != CON_STATE_CLOSED); + WARN_ON(con->state != CON_STATE_CLOSED); con->state = CON_STATE_PREOPEN; con->peer_name.type = (__u8) entity_type; @@ -797,7 +849,7 @@ static void prepare_write_message(struct ceph_connection *con) /* no, queue up footer too and be done */ prepare_write_message_footer(con); - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); } /* @@ -818,7 +870,7 @@ static void prepare_write_ack(struct ceph_connection *con) &con->out_temp_ack); con->out_more = 1; /* more will follow.. eventually.. */ - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); } /* @@ -829,7 +881,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) dout("prepare_write_keepalive %p\n", con); con_out_kvec_reset(con); con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); } /* @@ -872,7 +924,7 @@ static void prepare_write_banner(struct ceph_connection *con) &con->msgr->my_enc_addr); con->out_more = 0; - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); } static int prepare_write_connect(struct ceph_connection *con) @@ -922,7 +974,7 @@ static int prepare_write_connect(struct ceph_connection *con) auth->authorizer_buf); con->out_more = 0; - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); return 0; } @@ -1506,13 +1558,6 @@ static int process_banner(struct ceph_connection *con) return 0; } -static void fail_protocol(struct ceph_connection *con) -{ - reset_connection(con); - BUG_ON(con->state != CON_STATE_NEGOTIATING); - con->state = CON_STATE_CLOSED; -} - static int process_connect(struct ceph_connection *con) { u64 sup_feat = con->msgr->supported_features; @@ -1530,7 +1575,7 @@ static int process_connect(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr), sup_feat, server_feat, server_feat & ~sup_feat); con->error_msg = "missing required protocol features"; - fail_protocol(con); + reset_connection(con); return -1; case CEPH_MSGR_TAG_BADPROTOVER: @@ -1541,7 +1586,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->out_connect.protocol_version), le32_to_cpu(con->in_reply.protocol_version)); con->error_msg = "protocol version mismatch"; - fail_protocol(con); + reset_connection(con); return -1; case CEPH_MSGR_TAG_BADAUTHORIZER: @@ -1631,11 +1676,11 @@ static int process_connect(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr), req_feat, server_feat, req_feat & ~server_feat); con->error_msg = "missing required protocol features"; - fail_protocol(con); + reset_connection(con); return -1; } - BUG_ON(con->state != CON_STATE_NEGOTIATING); + WARN_ON(con->state != CON_STATE_NEGOTIATING); con->state = CON_STATE_OPEN; con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); @@ -1649,7 +1694,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_reply.connect_seq)); if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) - set_bit(CON_FLAG_LOSSYTX, &con->flags); + con_flag_set(con, CON_FLAG_LOSSYTX); con->delay = 0; /* reset backoff memory */ @@ -2086,15 +2131,14 @@ do_next: prepare_write_ack(con); goto more; } - if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING, - &con->flags)) { + if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { prepare_write_keepalive(con); goto more; } } /* Nothing to do! */ - clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_clear(con, CON_FLAG_WRITE_PENDING); dout("try_write nothing else to write.\n"); ret = 0; out: @@ -2132,7 +2176,6 @@ more: if (ret < 0) goto out; - BUG_ON(con->state != CON_STATE_CONNECTING); con->state = CON_STATE_NEGOTIATING; /* @@ -2160,7 +2203,7 @@ more: goto more; } - BUG_ON(con->state != CON_STATE_OPEN); + WARN_ON(con->state != CON_STATE_OPEN); if (con->in_base_pos < 0) { /* @@ -2275,7 +2318,7 @@ static void queue_con(struct ceph_connection *con) static bool con_sock_closed(struct ceph_connection *con) { - if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) + if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED)) return false; #define CASE(x) \ @@ -2302,6 +2345,41 @@ static bool con_sock_closed(struct ceph_connection *con) return true; } +static bool con_backoff(struct ceph_connection *con) +{ + int ret; + + if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF)) + return false; + + ret = queue_con_delay(con, round_jiffies_relative(con->delay)); + if (ret) { + dout("%s: con %p FAILED to back off %lu\n", __func__, + con, con->delay); + BUG_ON(ret == -ENOENT); + con_flag_set(con, CON_FLAG_BACKOFF); + } + + return true; +} + +/* Finish fault handling; con->mutex must *not* be held here */ + +static void con_fault_finish(struct ceph_connection *con) +{ + /* + * in case we faulted due to authentication, invalidate our + * current tickets so that we can get new ones. + */ + if (con->auth_retry && con->ops->invalidate_authorizer) { + dout("calling invalidate_authorizer()\n"); + con->ops->invalidate_authorizer(con); + } + + if (con->ops->fault) + con->ops->fault(con); +} + /* * Do some work on a connection. Drop a connection ref when we're done. */ @@ -2309,89 +2387,84 @@ static void con_work(struct work_struct *work) { struct ceph_connection *con = container_of(work, struct ceph_connection, work.work); - int ret; + bool fault; mutex_lock(&con->mutex); -restart: - if (con_sock_closed(con)) - goto fault; + while (true) { + int ret; - if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) { - dout("con_work %p backing off\n", con); - ret = queue_con_delay(con, round_jiffies_relative(con->delay)); - if (ret) { - dout("con_work %p FAILED to back off %lu\n", con, - con->delay); - BUG_ON(ret == -ENOENT); - set_bit(CON_FLAG_BACKOFF, &con->flags); + if ((fault = con_sock_closed(con))) { + dout("%s: con %p SOCK_CLOSED\n", __func__, con); + break; + } + if (con_backoff(con)) { + dout("%s: con %p BACKOFF\n", __func__, con); + break; + } + if (con->state == CON_STATE_STANDBY) { + dout("%s: con %p STANDBY\n", __func__, con); + break; + } + if (con->state == CON_STATE_CLOSED) { + dout("%s: con %p CLOSED\n", __func__, con); + BUG_ON(con->sock); + break; + } + if (con->state == CON_STATE_PREOPEN) { + dout("%s: con %p PREOPEN\n", __func__, con); + BUG_ON(con->sock); } - goto done; - } - if (con->state == CON_STATE_STANDBY) { - dout("con_work %p STANDBY\n", con); - goto done; - } - if (con->state == CON_STATE_CLOSED) { - dout("con_work %p CLOSED\n", con); - BUG_ON(con->sock); - goto done; - } - if (con->state == CON_STATE_PREOPEN) { - dout("con_work OPENING\n"); - BUG_ON(con->sock); - } + ret = try_read(con); + if (ret < 0) { + if (ret == -EAGAIN) + continue; + con->error_msg = "socket error on read"; + fault = true; + break; + } - ret = try_read(con); - if (ret == -EAGAIN) - goto restart; - if (ret < 0) { - con->error_msg = "socket error on read"; - goto fault; - } + ret = try_write(con); + if (ret < 0) { + if (ret == -EAGAIN) + continue; + con->error_msg = "socket error on write"; + fault = true; + } - ret = try_write(con); - if (ret == -EAGAIN) - goto restart; - if (ret < 0) { - con->error_msg = "socket error on write"; - goto fault; + break; /* If we make it to here, we're done */ } - -done: + if (fault) + con_fault(con); mutex_unlock(&con->mutex); -done_unlocked: - con->ops->put(con); - return; -fault: - ceph_fault(con); /* error/fault path */ - goto done_unlocked; -} + if (fault) + con_fault_finish(con); + con->ops->put(con); +} /* * Generic error/fault handler. A retry mechanism is used with * exponential backoff */ -static void ceph_fault(struct ceph_connection *con) - __releases(con->mutex) +static void con_fault(struct ceph_connection *con) { pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); dout("fault %p state %lu to peer %s\n", con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); - BUG_ON(con->state != CON_STATE_CONNECTING && + WARN_ON(con->state != CON_STATE_CONNECTING && con->state != CON_STATE_NEGOTIATING && con->state != CON_STATE_OPEN); con_close_socket(con); - if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) { + if (con_flag_test(con, CON_FLAG_LOSSYTX)) { dout("fault on LOSSYTX channel, marking CLOSED\n"); con->state = CON_STATE_CLOSED; - goto out_unlock; + return; } if (con->in_msg) { @@ -2408,9 +2481,9 @@ static void ceph_fault(struct ceph_connection *con) /* If there are no messages queued or keepalive pending, place * the connection in a STANDBY state */ if (list_empty(&con->out_queue) && - !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) { + !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); - clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_clear(con, CON_FLAG_WRITE_PENDING); con->state = CON_STATE_STANDBY; } else { /* retry after a delay. */ @@ -2419,23 +2492,9 @@ static void ceph_fault(struct ceph_connection *con) con->delay = BASE_DELAY_INTERVAL; else if (con->delay < MAX_DELAY_INTERVAL) con->delay *= 2; - set_bit(CON_FLAG_BACKOFF, &con->flags); + con_flag_set(con, CON_FLAG_BACKOFF); queue_con(con); } - -out_unlock: - mutex_unlock(&con->mutex); - /* - * in case we faulted due to authentication, invalidate our - * current tickets so that we can get new ones. - */ - if (con->auth_retry && con->ops->invalidate_authorizer) { - dout("calling invalidate_authorizer()\n"); - con->ops->invalidate_authorizer(con); - } - - if (con->ops->fault) - con->ops->fault(con); } @@ -2476,8 +2535,8 @@ static void clear_standby(struct ceph_connection *con) dout("clear_standby %p and ++connect_seq\n", con); con->state = CON_STATE_PREOPEN; con->connect_seq++; - WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags)); - WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)); + WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING)); + WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)); } } @@ -2518,7 +2577,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* if there wasn't anything waiting to send before, queue * new work */ - if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) + if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_send); @@ -2607,8 +2666,8 @@ void ceph_con_keepalive(struct ceph_connection *con) mutex_lock(&con->mutex); clear_standby(con); mutex_unlock(&con->mutex); - if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 && - test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) + if (con_flag_test_and_set(con, CON_FLAG_KEEPALIVE_PENDING) == 0 && + con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_keepalive); @@ -2658,9 +2717,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, m->page_alignment = 0; m->pages = NULL; m->pagelist = NULL; +#ifdef CONFIG_BLOCK m->bio = NULL; m->bio_iter = NULL; m->bio_seg = 0; +#endif /* CONFIG_BLOCK */ m->trail = NULL; /* front */ diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 812eb3b46c1..aef5b1062be 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -697,7 +697,7 @@ int ceph_monc_delete_snapid(struct ceph_mon_client *monc, u32 pool, u64 snapid) { return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, - pool, snapid, 0, 0); + pool, snapid, NULL, 0); } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 780caf6b049..d730dd4d8eb 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -23,7 +23,7 @@ static const struct ceph_connection_operations osd_con_ops; -static void send_queued(struct ceph_osd_client *osdc); +static void __send_queued(struct ceph_osd_client *osdc); static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); static void __register_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req); @@ -32,64 +32,12 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc, static void __send_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req); -static int op_needs_trail(int op) -{ - switch (op) { - case CEPH_OSD_OP_GETXATTR: - case CEPH_OSD_OP_SETXATTR: - case CEPH_OSD_OP_CMPXATTR: - case CEPH_OSD_OP_CALL: - case CEPH_OSD_OP_NOTIFY: - return 1; - default: - return 0; - } -} - static int op_has_extent(int op) { return (op == CEPH_OSD_OP_READ || op == CEPH_OSD_OP_WRITE); } -int ceph_calc_raw_layout(struct ceph_osd_client *osdc, - struct ceph_file_layout *layout, - u64 snapid, - u64 off, u64 *plen, u64 *bno, - struct ceph_osd_request *req, - struct ceph_osd_req_op *op) -{ - struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; - u64 orig_len = *plen; - u64 objoff, objlen; /* extent in object */ - int r; - - reqhead->snapid = cpu_to_le64(snapid); - - /* object extent? */ - r = ceph_calc_file_object_mapping(layout, off, plen, bno, - &objoff, &objlen); - if (r < 0) - return r; - if (*plen < orig_len) - dout(" skipping last %llu, final file extent %llu~%llu\n", - orig_len - *plen, off, *plen); - - if (op_has_extent(op->op)) { - op->extent.offset = objoff; - op->extent.length = objlen; - } - req->r_num_pages = calc_pages_for(off, *plen); - req->r_page_alignment = off & ~PAGE_MASK; - if (op->op == CEPH_OSD_OP_WRITE) - op->payload_len = *plen; - - dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", - *bno, objoff, objlen, req->r_num_pages); - return 0; -} -EXPORT_SYMBOL(ceph_calc_raw_layout); - /* * Implement client access to distributed object storage cluster. * @@ -115,20 +63,48 @@ EXPORT_SYMBOL(ceph_calc_raw_layout); * * fill osd op in request message. */ -static int calc_layout(struct ceph_osd_client *osdc, - struct ceph_vino vino, +static int calc_layout(struct ceph_vino vino, struct ceph_file_layout *layout, u64 off, u64 *plen, struct ceph_osd_request *req, struct ceph_osd_req_op *op) { - u64 bno; + u64 orig_len = *plen; + u64 bno = 0; + u64 objoff = 0; + u64 objlen = 0; int r; - r = ceph_calc_raw_layout(osdc, layout, vino.snap, off, - plen, &bno, req, op); + /* object extent? */ + r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno, + &objoff, &objlen); if (r < 0) return r; + if (objlen < orig_len) { + *plen = objlen; + dout(" skipping last %llu, final file extent %llu~%llu\n", + orig_len - *plen, off, *plen); + } + + if (op_has_extent(op->op)) { + u32 osize = le32_to_cpu(layout->fl_object_size); + op->extent.offset = objoff; + op->extent.length = objlen; + if (op->extent.truncate_size <= off - objoff) { + op->extent.truncate_size = 0; + } else { + op->extent.truncate_size -= off - objoff; + if (op->extent.truncate_size > osize) + op->extent.truncate_size = osize; + } + } + req->r_num_pages = calc_pages_for(off, *plen); + req->r_page_alignment = off & ~PAGE_MASK; + if (op->op == CEPH_OSD_OP_WRITE) + op->payload_len = *plen; + + dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", + bno, objoff, objlen, req->r_num_pages); snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno); req->r_oid_len = strlen(req->r_oid); @@ -148,25 +124,19 @@ void ceph_osdc_release_request(struct kref *kref) if (req->r_request) ceph_msg_put(req->r_request); if (req->r_con_filling_msg) { - dout("%s revoking pages %p from con %p\n", __func__, - req->r_pages, req->r_con_filling_msg); + dout("%s revoking msg %p from con %p\n", __func__, + req->r_reply, req->r_con_filling_msg); ceph_msg_revoke_incoming(req->r_reply); req->r_con_filling_msg->ops->put(req->r_con_filling_msg); + req->r_con_filling_msg = NULL; } if (req->r_reply) ceph_msg_put(req->r_reply); if (req->r_own_pages) ceph_release_page_vector(req->r_pages, req->r_num_pages); -#ifdef CONFIG_BLOCK - if (req->r_bio) - bio_put(req->r_bio); -#endif ceph_put_snap_context(req->r_snapc); - if (req->r_trail) { - ceph_pagelist_release(req->r_trail); - kfree(req->r_trail); - } + ceph_pagelist_release(&req->r_trail); if (req->r_mempool) mempool_free(req, req->r_osdc->req_mempool); else @@ -174,37 +144,25 @@ void ceph_osdc_release_request(struct kref *kref) } EXPORT_SYMBOL(ceph_osdc_release_request); -static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail) -{ - int i = 0; - - if (needs_trail) - *needs_trail = 0; - while (ops[i].op) { - if (needs_trail && op_needs_trail(ops[i].op)) - *needs_trail = 1; - i++; - } - - return i; -} - struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, - int flags, struct ceph_snap_context *snapc, - struct ceph_osd_req_op *ops, + unsigned int num_ops, bool use_mempool, - gfp_t gfp_flags, - struct page **pages, - struct bio *bio) + gfp_t gfp_flags) { struct ceph_osd_request *req; struct ceph_msg *msg; - int needs_trail; - int num_op = get_num_ops(ops, &needs_trail); - size_t msg_size = sizeof(struct ceph_osd_request_head); - - msg_size += num_op*sizeof(struct ceph_osd_op); + size_t msg_size; + + msg_size = 4 + 4 + 8 + 8 + 4+8; + msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ + msg_size += 1 + 8 + 4 + 4; /* pg_t */ + msg_size += 4 + MAX_OBJ_NAME_SIZE; + msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); + msg_size += 8; /* snapid */ + msg_size += 8; /* snap_seq */ + msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */ + msg_size += 4; if (use_mempool) { req = mempool_alloc(osdc->req_mempool, gfp_flags); @@ -228,10 +186,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, INIT_LIST_HEAD(&req->r_req_lru_item); INIT_LIST_HEAD(&req->r_osd_item); - req->r_flags = flags; - - WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); - /* create reply message */ if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); @@ -244,20 +198,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, } req->r_reply = msg; - /* allocate space for the trailing data */ - if (needs_trail) { - req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags); - if (!req->r_trail) { - ceph_osdc_put_request(req); - return NULL; - } - ceph_pagelist_init(req->r_trail); - } + ceph_pagelist_init(&req->r_trail); /* create request message; allow space for oid */ - msg_size += MAX_OBJ_NAME_SIZE; - if (snapc) - msg_size += sizeof(u64) * snapc->num_snaps; if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op, 0); else @@ -270,13 +213,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, memset(msg->front.iov_base, 0, msg->front.iov_len); req->r_request = msg; - req->r_pages = pages; -#ifdef CONFIG_BLOCK - if (bio) { - req->r_bio = bio; - bio_get(req->r_bio); - } -#endif return req; } @@ -289,6 +225,8 @@ static void osd_req_encode_op(struct ceph_osd_request *req, dst->op = cpu_to_le16(src->op); switch (src->op) { + case CEPH_OSD_OP_STAT: + break; case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: dst->extent.offset = @@ -300,52 +238,20 @@ static void osd_req_encode_op(struct ceph_osd_request *req, dst->extent.truncate_seq = cpu_to_le32(src->extent.truncate_seq); break; - - case CEPH_OSD_OP_GETXATTR: - case CEPH_OSD_OP_SETXATTR: - case CEPH_OSD_OP_CMPXATTR: - BUG_ON(!req->r_trail); - - dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); - dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); - dst->xattr.cmp_op = src->xattr.cmp_op; - dst->xattr.cmp_mode = src->xattr.cmp_mode; - ceph_pagelist_append(req->r_trail, src->xattr.name, - src->xattr.name_len); - ceph_pagelist_append(req->r_trail, src->xattr.val, - src->xattr.value_len); - break; case CEPH_OSD_OP_CALL: - BUG_ON(!req->r_trail); - dst->cls.class_len = src->cls.class_len; dst->cls.method_len = src->cls.method_len; dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); - ceph_pagelist_append(req->r_trail, src->cls.class_name, + ceph_pagelist_append(&req->r_trail, src->cls.class_name, src->cls.class_len); - ceph_pagelist_append(req->r_trail, src->cls.method_name, + ceph_pagelist_append(&req->r_trail, src->cls.method_name, src->cls.method_len); - ceph_pagelist_append(req->r_trail, src->cls.indata, + ceph_pagelist_append(&req->r_trail, src->cls.indata, src->cls.indata_len); break; - case CEPH_OSD_OP_ROLLBACK: - dst->snap.snapid = cpu_to_le64(src->snap.snapid); - break; case CEPH_OSD_OP_STARTSYNC: break; - case CEPH_OSD_OP_NOTIFY: - { - __le32 prot_ver = cpu_to_le32(src->watch.prot_ver); - __le32 timeout = cpu_to_le32(src->watch.timeout); - - BUG_ON(!req->r_trail); - - ceph_pagelist_append(req->r_trail, - &prot_ver, sizeof(prot_ver)); - ceph_pagelist_append(req->r_trail, - &timeout, sizeof(timeout)); - } case CEPH_OSD_OP_NOTIFY_ACK: case CEPH_OSD_OP_WATCH: dst->watch.cookie = cpu_to_le64(src->watch.cookie); @@ -356,6 +262,64 @@ static void osd_req_encode_op(struct ceph_osd_request *req, pr_err("unrecognized osd opcode %d\n", dst->op); WARN_ON(1); break; + case CEPH_OSD_OP_MAPEXT: + case CEPH_OSD_OP_MASKTRUNC: + case CEPH_OSD_OP_SPARSE_READ: + case CEPH_OSD_OP_NOTIFY: + case CEPH_OSD_OP_ASSERT_VER: + case CEPH_OSD_OP_WRITEFULL: + case CEPH_OSD_OP_TRUNCATE: + case CEPH_OSD_OP_ZERO: + case CEPH_OSD_OP_DELETE: + case CEPH_OSD_OP_APPEND: + case CEPH_OSD_OP_SETTRUNC: + case CEPH_OSD_OP_TRIMTRUNC: + case CEPH_OSD_OP_TMAPUP: + case CEPH_OSD_OP_TMAPPUT: + case CEPH_OSD_OP_TMAPGET: + case CEPH_OSD_OP_CREATE: + case CEPH_OSD_OP_ROLLBACK: + case CEPH_OSD_OP_OMAPGETKEYS: + case CEPH_OSD_OP_OMAPGETVALS: + case CEPH_OSD_OP_OMAPGETHEADER: + case CEPH_OSD_OP_OMAPGETVALSBYKEYS: + case CEPH_OSD_OP_MODE_RD: + case CEPH_OSD_OP_OMAPSETVALS: + case CEPH_OSD_OP_OMAPSETHEADER: + case CEPH_OSD_OP_OMAPCLEAR: + case CEPH_OSD_OP_OMAPRMKEYS: + case CEPH_OSD_OP_OMAP_CMP: + case CEPH_OSD_OP_CLONERANGE: + case CEPH_OSD_OP_ASSERT_SRC_VERSION: + case CEPH_OSD_OP_SRC_CMPXATTR: + case CEPH_OSD_OP_GETXATTR: + case CEPH_OSD_OP_GETXATTRS: + case CEPH_OSD_OP_CMPXATTR: + case CEPH_OSD_OP_SETXATTR: + case CEPH_OSD_OP_SETXATTRS: + case CEPH_OSD_OP_RESETXATTRS: + case CEPH_OSD_OP_RMXATTR: + case CEPH_OSD_OP_PULL: + case CEPH_OSD_OP_PUSH: + case CEPH_OSD_OP_BALANCEREADS: + case CEPH_OSD_OP_UNBALANCEREADS: + case CEPH_OSD_OP_SCRUB: + case CEPH_OSD_OP_SCRUB_RESERVE: + case CEPH_OSD_OP_SCRUB_UNRESERVE: + case CEPH_OSD_OP_SCRUB_STOP: + case CEPH_OSD_OP_SCRUB_MAP: + case CEPH_OSD_OP_WRLOCK: + case CEPH_OSD_OP_WRUNLOCK: + case CEPH_OSD_OP_RDLOCK: + case CEPH_OSD_OP_RDUNLOCK: + case CEPH_OSD_OP_UPLOCK: + case CEPH_OSD_OP_DNLOCK: + case CEPH_OSD_OP_PGLS: + case CEPH_OSD_OP_PGLS_FILTER: + pr_err("unsupported osd opcode %s\n", + ceph_osd_op_name(dst->op)); + WARN_ON(1); + break; } dst->payload_len = cpu_to_le32(src->payload_len); } @@ -365,75 +329,95 @@ static void osd_req_encode_op(struct ceph_osd_request *req, * */ void ceph_osdc_build_request(struct ceph_osd_request *req, - u64 off, u64 *plen, + u64 off, u64 len, unsigned int num_ops, struct ceph_osd_req_op *src_ops, - struct ceph_snap_context *snapc, - struct timespec *mtime, - const char *oid, - int oid_len) + struct ceph_snap_context *snapc, u64 snap_id, + struct timespec *mtime) { struct ceph_msg *msg = req->r_request; - struct ceph_osd_request_head *head; struct ceph_osd_req_op *src_op; - struct ceph_osd_op *op; void *p; - int num_op = get_num_ops(src_ops, NULL); - size_t msg_size = sizeof(*head) + num_op*sizeof(*op); + size_t msg_size; int flags = req->r_flags; - u64 data_len = 0; + u64 data_len; int i; - head = msg->front.iov_base; - op = (void *)(head + 1); - p = (void *)(op + num_op); - + req->r_num_ops = num_ops; + req->r_snapid = snap_id; req->r_snapc = ceph_get_snap_context(snapc); - head->client_inc = cpu_to_le32(1); /* always, for now. */ - head->flags = cpu_to_le32(flags); - if (flags & CEPH_OSD_FLAG_WRITE) - ceph_encode_timespec(&head->mtime, mtime); - head->num_ops = cpu_to_le16(num_op); - - - /* fill in oid */ - head->object_len = cpu_to_le32(oid_len); - memcpy(p, oid, oid_len); - p += oid_len; + /* encode request */ + msg->hdr.version = cpu_to_le16(4); + p = msg->front.iov_base; + ceph_encode_32(&p, 1); /* client_inc is always 1 */ + req->r_request_osdmap_epoch = p; + p += 4; + req->r_request_flags = p; + p += 4; + if (req->r_flags & CEPH_OSD_FLAG_WRITE) + ceph_encode_timespec(p, mtime); + p += sizeof(struct ceph_timespec); + req->r_request_reassert_version = p; + p += sizeof(struct ceph_eversion); /* will get filled in */ + + /* oloc */ + ceph_encode_8(&p, 4); + ceph_encode_8(&p, 4); + ceph_encode_32(&p, 8 + 4 + 4); + req->r_request_pool = p; + p += 8; + ceph_encode_32(&p, -1); /* preferred */ + ceph_encode_32(&p, 0); /* key len */ + + ceph_encode_8(&p, 1); + req->r_request_pgid = p; + p += 8 + 4; + ceph_encode_32(&p, -1); /* preferred */ + + /* oid */ + ceph_encode_32(&p, req->r_oid_len); + memcpy(p, req->r_oid, req->r_oid_len); + dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); + p += req->r_oid_len; + + /* ops */ + ceph_encode_16(&p, num_ops); src_op = src_ops; - while (src_op->op) { - osd_req_encode_op(req, op, src_op); - src_op++; - op++; + req->r_request_ops = p; + for (i = 0; i < num_ops; i++, src_op++) { + osd_req_encode_op(req, p, src_op); + p += sizeof(struct ceph_osd_op); } - if (req->r_trail) - data_len += req->r_trail->length; - - if (snapc) { - head->snap_seq = cpu_to_le64(snapc->seq); - head->num_snaps = cpu_to_le32(snapc->num_snaps); + /* snaps */ + ceph_encode_64(&p, req->r_snapid); + ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); + ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); + if (req->r_snapc) { for (i = 0; i < snapc->num_snaps; i++) { - put_unaligned_le64(snapc->snaps[i], p); - p += sizeof(u64); + ceph_encode_64(&p, req->r_snapc->snaps[i]); } } + req->r_request_attempts = p; + p += 4; + + data_len = req->r_trail.length; if (flags & CEPH_OSD_FLAG_WRITE) { req->r_request->hdr.data_off = cpu_to_le16(off); - req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len); - } else if (data_len) { - req->r_request->hdr.data_off = 0; - req->r_request->hdr.data_len = cpu_to_le32(data_len); + data_len += len; } - + req->r_request->hdr.data_len = cpu_to_le32(data_len); req->r_request->page_alignment = req->r_page_alignment; BUG_ON(p > msg->front.iov_base + msg->front.iov_len); msg_size = p - msg->front.iov_base; msg->front.iov_len = msg_size; msg->hdr.front_len = cpu_to_le32(msg_size); + + dout("build_request msg_size was %d num_ops %d\n", (int)msg_size, + num_ops); return; } EXPORT_SYMBOL(ceph_osdc_build_request); @@ -459,34 +443,33 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, u32 truncate_seq, u64 truncate_size, struct timespec *mtime, - bool use_mempool, int num_reply, + bool use_mempool, int page_align) { - struct ceph_osd_req_op ops[3]; + struct ceph_osd_req_op ops[2]; struct ceph_osd_request *req; + unsigned int num_op = 1; int r; + memset(&ops, 0, sizeof ops); + ops[0].op = opcode; ops[0].extent.truncate_seq = truncate_seq; ops[0].extent.truncate_size = truncate_size; - ops[0].payload_len = 0; if (do_sync) { ops[1].op = CEPH_OSD_OP_STARTSYNC; - ops[1].payload_len = 0; - ops[2].op = 0; - } else - ops[1].op = 0; - - req = ceph_osdc_alloc_request(osdc, flags, - snapc, ops, - use_mempool, - GFP_NOFS, NULL, NULL); + num_op++; + } + + req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool, + GFP_NOFS); if (!req) return ERR_PTR(-ENOMEM); + req->r_flags = flags; /* calculate max write size */ - r = calc_layout(osdc, vino, layout, off, plen, req, ops); + r = calc_layout(vino, layout, off, plen, req, ops); if (r < 0) return ERR_PTR(r); req->r_file_layout = *layout; /* keep a copy */ @@ -496,10 +479,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, req->r_num_pages = calc_pages_for(page_align, *plen); req->r_page_alignment = page_align; - ceph_osdc_build_request(req, off, plen, ops, - snapc, - mtime, - req->r_oid, req->r_oid_len); + ceph_osdc_build_request(req, off, *plen, num_op, ops, + snapc, vino.snap, mtime); return req; } @@ -623,8 +604,8 @@ static void osd_reset(struct ceph_connection *con) down_read(&osdc->map_sem); mutex_lock(&osdc->request_mutex); __kick_osd_requests(osdc, osd); + __send_queued(osdc); mutex_unlock(&osdc->request_mutex); - send_queued(osdc); up_read(&osdc->map_sem); } @@ -739,31 +720,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc) */ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) { - struct ceph_osd_request *req; - int ret = 0; + struct ceph_entity_addr *peer_addr; dout("__reset_osd %p osd%d\n", osd, osd->o_osd); if (list_empty(&osd->o_requests) && list_empty(&osd->o_linger_requests)) { __remove_osd(osdc, osd); - ret = -ENODEV; - } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], - &osd->o_con.peer_addr, - sizeof(osd->o_con.peer_addr)) == 0 && - !ceph_con_opened(&osd->o_con)) { + + return -ENODEV; + } + + peer_addr = &osdc->osdmap->osd_addr[osd->o_osd]; + if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && + !ceph_con_opened(&osd->o_con)) { + struct ceph_osd_request *req; + dout(" osd addr hasn't changed and connection never opened," " letting msgr retry"); /* touch each r_stamp for handle_timeout()'s benfit */ list_for_each_entry(req, &osd->o_requests, r_osd_item) req->r_stamp = jiffies; - ret = -EAGAIN; - } else { - ceph_con_close(&osd->o_con); - ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, - &osdc->osdmap->osd_addr[osd->o_osd]); - osd->o_incarnation++; + + return -EAGAIN; } - return ret; + + ceph_con_close(&osd->o_con); + ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr); + osd->o_incarnation++; + + return 0; } static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) @@ -961,20 +946,18 @@ EXPORT_SYMBOL(ceph_osdc_set_request_linger); static int __map_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req, int force_resend) { - struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; struct ceph_pg pgid; int acting[CEPH_PG_MAX_SIZE]; int o = -1, num = 0; int err; dout("map_request %p tid %lld\n", req, req->r_tid); - err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, + err = ceph_calc_object_layout(&pgid, req->r_oid, &req->r_file_layout, osdc->osdmap); if (err) { list_move(&req->r_req_lru_item, &osdc->req_notarget); return err; } - pgid = reqhead->layout.ol_pgid; req->r_pgid = pgid; err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); @@ -991,8 +974,8 @@ static int __map_request(struct ceph_osd_client *osdc, (req->r_osd == NULL && o == -1)) return 0; /* no change */ - dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n", - req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, + dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", + req->r_tid, pgid.pool, pgid.seed, o, req->r_osd ? req->r_osd->o_osd : -1); /* record full pg acting set */ @@ -1041,15 +1024,22 @@ out: static void __send_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { - struct ceph_osd_request_head *reqhead; - - dout("send_request %p tid %llu to osd%d flags %d\n", - req, req->r_tid, req->r_osd->o_osd, req->r_flags); + void *p; - reqhead = req->r_request->front.iov_base; - reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch); - reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ - reqhead->reassert_version = req->r_reassert_version; + dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n", + req, req->r_tid, req->r_osd->o_osd, req->r_flags, + (unsigned long long)req->r_pgid.pool, req->r_pgid.seed); + + /* fill in message content that changes each time we send it */ + put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); + put_unaligned_le32(req->r_flags, req->r_request_flags); + put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); + p = req->r_request_pgid; + ceph_encode_64(&p, req->r_pgid.pool); + ceph_encode_32(&p, req->r_pgid.seed); + put_unaligned_le64(1, req->r_request_attempts); /* FIXME */ + memcpy(req->r_request_reassert_version, &req->r_reassert_version, + sizeof(req->r_reassert_version)); req->r_stamp = jiffies; list_move_tail(&req->r_req_lru_item, &osdc->req_lru); @@ -1062,16 +1052,13 @@ static void __send_request(struct ceph_osd_client *osdc, /* * Send any requests in the queue (req_unsent). */ -static void send_queued(struct ceph_osd_client *osdc) +static void __send_queued(struct ceph_osd_client *osdc) { struct ceph_osd_request *req, *tmp; - dout("send_queued\n"); - mutex_lock(&osdc->request_mutex); - list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) { + dout("__send_queued\n"); + list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) __send_request(osdc, req); - } - mutex_unlock(&osdc->request_mutex); } /* @@ -1123,8 +1110,8 @@ static void handle_timeout(struct work_struct *work) } __schedule_osd_timeout(osdc); + __send_queued(osdc); mutex_unlock(&osdc->request_mutex); - send_queued(osdc); up_read(&osdc->map_sem); } @@ -1152,6 +1139,26 @@ static void complete_request(struct ceph_osd_request *req) complete_all(&req->r_safe_completion); /* fsync waiter */ } +static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid) +{ + __u8 v; + + ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad); + v = ceph_decode_8(p); + if (v > 1) { + pr_warning("do not understand pg encoding %d > 1", v); + return -EINVAL; + } + pgid->pool = ceph_decode_64(p); + pgid->seed = ceph_decode_32(p); + *p += 4; + return 0; + +bad: + pr_warning("incomplete pg encoding"); + return -EINVAL; +} + /* * handle osd op reply. either call the callback if it is specified, * or do the completion to wake up the waiting thread. @@ -1159,22 +1166,42 @@ static void complete_request(struct ceph_osd_request *req) static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, struct ceph_connection *con) { - struct ceph_osd_reply_head *rhead = msg->front.iov_base; + void *p, *end; struct ceph_osd_request *req; u64 tid; - int numops, object_len, flags; + int object_len; + int numops, payload_len, flags; s32 result; + s32 retry_attempt; + struct ceph_pg pg; + int err; + u32 reassert_epoch; + u64 reassert_version; + u32 osdmap_epoch; + int i; tid = le64_to_cpu(msg->hdr.tid); - if (msg->front.iov_len < sizeof(*rhead)) - goto bad; - numops = le32_to_cpu(rhead->num_ops); - object_len = le32_to_cpu(rhead->object_len); - result = le32_to_cpu(rhead->result); - if (msg->front.iov_len != sizeof(*rhead) + object_len + - numops * sizeof(struct ceph_osd_op)) + dout("handle_reply %p tid %llu\n", msg, tid); + + p = msg->front.iov_base; + end = p + msg->front.iov_len; + + ceph_decode_need(&p, end, 4, bad); + object_len = ceph_decode_32(&p); + ceph_decode_need(&p, end, object_len, bad); + p += object_len; + + err = __decode_pgid(&p, end, &pg); + if (err) goto bad; - dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); + + ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad); + flags = ceph_decode_64(&p); + result = ceph_decode_32(&p); + reassert_epoch = ceph_decode_32(&p); + reassert_version = ceph_decode_64(&p); + osdmap_epoch = ceph_decode_32(&p); + /* lookup */ mutex_lock(&osdc->request_mutex); req = __lookup_request(osdc, tid); @@ -1184,7 +1211,38 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, return; } ceph_osdc_get_request(req); - flags = le32_to_cpu(rhead->flags); + + dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, + req, result); + + ceph_decode_need(&p, end, 4, bad); + numops = ceph_decode_32(&p); + if (numops > CEPH_OSD_MAX_OP) + goto bad_put; + if (numops != req->r_num_ops) + goto bad_put; + payload_len = 0; + ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad); + for (i = 0; i < numops; i++) { + struct ceph_osd_op *op = p; + int len; + + len = le32_to_cpu(op->payload_len); + req->r_reply_op_len[i] = len; + dout(" op %d has %d bytes\n", i, len); + payload_len += len; + p += sizeof(*op); + } + if (payload_len != le32_to_cpu(msg->hdr.data_len)) { + pr_warning("sum of op payload lens %d != data_len %d", + payload_len, le32_to_cpu(msg->hdr.data_len)); + goto bad_put; + } + + ceph_decode_need(&p, end, 4 + numops * 4, bad); + retry_attempt = ceph_decode_32(&p); + for (i = 0; i < numops; i++) + req->r_reply_op_result[i] = ceph_decode_32(&p); /* * if this connection filled our message, drop our reference now, to @@ -1199,7 +1257,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, if (!req->r_got_reply) { unsigned int bytes; - req->r_result = le32_to_cpu(rhead->result); + req->r_result = result; bytes = le32_to_cpu(msg->hdr.data_len); dout("handle_reply result %d bytes %d\n", req->r_result, bytes); @@ -1207,7 +1265,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, req->r_result = bytes; /* in case this is a write and we need to replay, */ - req->r_reassert_version = rhead->reassert_version; + req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); + req->r_reassert_version.version = cpu_to_le64(reassert_version); req->r_got_reply = 1; } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { @@ -1242,10 +1301,11 @@ done: ceph_osdc_put_request(req); return; +bad_put: + ceph_osdc_put_request(req); bad: - pr_err("corrupt osd_op_reply got %d %d expected %d\n", - (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len), - (int)sizeof(*rhead)); + pr_err("corrupt osd_op_reply got %d %d\n", + (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); ceph_msg_dump(msg); } @@ -1270,7 +1330,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc) * Requeue requests whose mapping to an OSD has changed. If requests map to * no osd, request a new map. * - * Caller should hold map_sem for read and request_mutex. + * Caller should hold map_sem for read. */ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) { @@ -1284,6 +1344,24 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) for (p = rb_first(&osdc->requests); p; ) { req = rb_entry(p, struct ceph_osd_request, r_node); p = rb_next(p); + + /* + * For linger requests that have not yet been + * registered, move them to the linger list; they'll + * be sent to the osd in the loop below. Unregister + * the request before re-registering it as a linger + * request to ensure the __map_request() below + * will decide it needs to be sent. + */ + if (req->r_linger && list_empty(&req->r_linger_item)) { + dout("%p tid %llu restart on osd%d\n", + req, req->r_tid, + req->r_osd ? req->r_osd->o_osd : -1); + __unregister_request(osdc, req); + __register_linger_request(osdc, req); + continue; + } + err = __map_request(osdc, req, force_resend); if (err < 0) continue; /* error */ @@ -1298,17 +1376,6 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) req->r_flags |= CEPH_OSD_FLAG_RETRY; } } - if (req->r_linger && list_empty(&req->r_linger_item)) { - /* - * register as a linger so that we will - * re-submit below and get a new tid - */ - dout("%p tid %llu restart on osd%d\n", - req, req->r_tid, - req->r_osd ? req->r_osd->o_osd : -1); - __register_linger_request(osdc, req); - __unregister_request(osdc, req); - } } list_for_each_entry_safe(req, nreq, &osdc->req_linger, @@ -1316,6 +1383,7 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); err = __map_request(osdc, req, force_resend); + dout("__map_request returned %d\n", err); if (err == 0) continue; /* no change and no osd was specified */ if (err < 0) @@ -1337,6 +1405,7 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) dout("%d requests for down osds, need new map\n", needmap); ceph_monc_request_next_osdmap(&osdc->client->monc); } + reset_changed_osds(osdc); } @@ -1393,7 +1462,6 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) osdc->osdmap = newmap; } kick_requests(osdc, 0); - reset_changed_osds(osdc); } else { dout("ignoring incremental map %u len %d\n", epoch, maplen); @@ -1454,7 +1522,9 @@ done: if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) ceph_monc_request_next_osdmap(&osdc->client->monc); - send_queued(osdc); + mutex_lock(&osdc->request_mutex); + __send_queued(osdc); + mutex_unlock(&osdc->request_mutex); up_read(&osdc->map_sem); wake_up_all(&osdc->client->auth_wq); return; @@ -1548,8 +1618,7 @@ static void __remove_event(struct ceph_osd_event *event) int ceph_osdc_create_event(struct ceph_osd_client *osdc, void (*event_cb)(u64, u64, u8, void *), - int one_shot, void *data, - struct ceph_osd_event **pevent) + void *data, struct ceph_osd_event **pevent) { struct ceph_osd_event *event; @@ -1559,14 +1628,13 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc, dout("create_event %p\n", event); event->cb = event_cb; - event->one_shot = one_shot; + event->one_shot = 0; event->data = data; event->osdc = osdc; INIT_LIST_HEAD(&event->osd_node); RB_CLEAR_NODE(&event->node); kref_init(&event->kref); /* one ref for us */ kref_get(&event->kref); /* one ref for the caller */ - init_completion(&event->completion); spin_lock(&osdc->event_lock); event->cookie = ++osdc->event_count; @@ -1602,7 +1670,6 @@ static void do_event_work(struct work_struct *work) dout("do_event_work completing %p\n", event); event->cb(ver, notify_id, opcode, event->data); - complete(&event->completion); dout("do_event_work completed %p\n", event); ceph_osdc_put_event(event); kfree(event_work); @@ -1612,7 +1679,8 @@ static void do_event_work(struct work_struct *work) /* * Process osd watch notifications */ -void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) +static void handle_watch_notify(struct ceph_osd_client *osdc, + struct ceph_msg *msg) { void *p, *end; u8 proto_ver; @@ -1633,9 +1701,8 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) spin_lock(&osdc->event_lock); event = __find_event(osdc, cookie); if (event) { + BUG_ON(event->one_shot); get_event(event); - if (event->one_shot) - __remove_event(event); } spin_unlock(&osdc->event_lock); dout("handle_watch_notify cookie %lld ver %lld event %p\n", @@ -1660,7 +1727,6 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) return; done_err: - complete(&event->completion); ceph_osdc_put_event(event); return; @@ -1669,21 +1735,6 @@ bad: return; } -int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout) -{ - int err; - - dout("wait_event %p\n", event); - err = wait_for_completion_interruptible_timeout(&event->completion, - timeout * HZ); - ceph_osdc_put_event(event); - if (err > 0) - err = 0; - dout("wait_event %p returns %d\n", event, err); - return err; -} -EXPORT_SYMBOL(ceph_osdc_wait_event); - /* * Register request, send initial attempt. */ @@ -1698,7 +1749,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, #ifdef CONFIG_BLOCK req->r_request->bio = req->r_bio; #endif - req->r_request->trail = req->r_trail; + req->r_request->trail = &req->r_trail; register_request(osdc, req); @@ -1857,7 +1908,6 @@ out_mempool: out: return err; } -EXPORT_SYMBOL(ceph_osdc_init); void ceph_osdc_stop(struct ceph_osd_client *osdc) { @@ -1874,7 +1924,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) ceph_msgpool_destroy(&osdc->msgpool_op); ceph_msgpool_destroy(&osdc->msgpool_op_reply); } -EXPORT_SYMBOL(ceph_osdc_stop); /* * Read some contiguous pages. If we cross a stripe boundary, shorten @@ -1894,7 +1943,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, req = ceph_osdc_new_request(osdc, layout, vino, off, plen, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, 0, truncate_seq, truncate_size, NULL, - false, 1, page_align); + false, page_align); if (IS_ERR(req)) return PTR_ERR(req); @@ -1923,8 +1972,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, u64 off, u64 len, u32 truncate_seq, u64 truncate_size, struct timespec *mtime, - struct page **pages, int num_pages, - int flags, int do_sync, bool nofail) + struct page **pages, int num_pages) { struct ceph_osd_request *req; int rc = 0; @@ -1933,11 +1981,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, BUG_ON(vino.snap != CEPH_NOSNAP); req = ceph_osdc_new_request(osdc, layout, vino, off, &len, CEPH_OSD_OP_WRITE, - flags | CEPH_OSD_FLAG_ONDISK | - CEPH_OSD_FLAG_WRITE, - snapc, do_sync, + CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, + snapc, 0, truncate_seq, truncate_size, mtime, - nofail, 1, page_align); + true, page_align); if (IS_ERR(req)) return PTR_ERR(req); @@ -1946,7 +1993,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, dout("writepages %llu~%llu (%d pages)\n", off, len, req->r_num_pages); - rc = ceph_osdc_start_request(osdc, req, nofail); + rc = ceph_osdc_start_request(osdc, req, true); if (!rc) rc = ceph_osdc_wait_request(osdc, req); @@ -2039,7 +2086,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, if (data_len > 0) { int want = calc_pages_for(req->r_page_alignment, data_len); - if (unlikely(req->r_num_pages < want)) { + if (req->r_pages && unlikely(req->r_num_pages < want)) { pr_warning("tid %lld reply has %d bytes %d pages, we" " had only %d pages ready\n", tid, data_len, want, req->r_num_pages); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index de73214b5d2..69bc4bf89e3 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -13,26 +13,18 @@ char *ceph_osdmap_state_str(char *str, int len, int state) { - int flag = 0; - if (!len) - goto done; - - *str = '\0'; - if (state) { - if (state & CEPH_OSD_EXISTS) { - snprintf(str, len, "exists"); - flag = 1; - } - if (state & CEPH_OSD_UP) { - snprintf(str, len, "%s%s%s", str, (flag ? ", " : ""), - "up"); - flag = 1; - } - } else { + return str; + + if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP)) + snprintf(str, len, "exists, up"); + else if (state & CEPH_OSD_EXISTS) + snprintf(str, len, "exists"); + else if (state & CEPH_OSD_UP) + snprintf(str, len, "up"); + else snprintf(str, len, "doesn't exist"); - } -done: + return str; } @@ -53,13 +45,8 @@ static int calc_bits_of(unsigned int t) */ static void calc_pg_masks(struct ceph_pg_pool_info *pi) { - pi->pg_num_mask = (1 << calc_bits_of(le32_to_cpu(pi->v.pg_num)-1)) - 1; - pi->pgp_num_mask = - (1 << calc_bits_of(le32_to_cpu(pi->v.pgp_num)-1)) - 1; - pi->lpg_num_mask = - (1 << calc_bits_of(le32_to_cpu(pi->v.lpg_num)-1)) - 1; - pi->lpgp_num_mask = - (1 << calc_bits_of(le32_to_cpu(pi->v.lpgp_num)-1)) - 1; + pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1; + pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1; } /* @@ -170,6 +157,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end) c->choose_local_tries = 2; c->choose_local_fallback_tries = 5; c->choose_total_tries = 19; + c->chooseleaf_descend_once = 0; ceph_decode_need(p, end, 4*sizeof(u32), bad); magic = ceph_decode_32(p); @@ -336,6 +324,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end) dout("crush decode tunable choose_total_tries = %d", c->choose_total_tries); + ceph_decode_need(p, end, sizeof(u32), done); + c->chooseleaf_descend_once = ceph_decode_32(p); + dout("crush decode tunable chooseleaf_descend_once = %d", + c->chooseleaf_descend_once); + done: dout("crush_decode success\n"); return c; @@ -354,12 +347,13 @@ bad: */ static int pgid_cmp(struct ceph_pg l, struct ceph_pg r) { - u64 a = *(u64 *)&l; - u64 b = *(u64 *)&r; - - if (a < b) + if (l.pool < r.pool) + return -1; + if (l.pool > r.pool) + return 1; + if (l.seed < r.seed) return -1; - if (a > b) + if (l.seed > r.seed) return 1; return 0; } @@ -405,8 +399,8 @@ static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root, } else if (c > 0) { n = n->rb_right; } else { - dout("__lookup_pg_mapping %llx got %p\n", - *(u64 *)&pgid, pg); + dout("__lookup_pg_mapping %lld.%x got %p\n", + pgid.pool, pgid.seed, pg); return pg; } } @@ -418,12 +412,13 @@ static int __remove_pg_mapping(struct rb_root *root, struct ceph_pg pgid) struct ceph_pg_mapping *pg = __lookup_pg_mapping(root, pgid); if (pg) { - dout("__remove_pg_mapping %llx %p\n", *(u64 *)&pgid, pg); + dout("__remove_pg_mapping %lld.%x %p\n", pgid.pool, pgid.seed, + pg); rb_erase(&pg->node, root); kfree(pg); return 0; } - dout("__remove_pg_mapping %llx dne\n", *(u64 *)&pgid); + dout("__remove_pg_mapping %lld.%x dne\n", pgid.pool, pgid.seed); return -ENOENT; } @@ -452,7 +447,7 @@ static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new) return 0; } -static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id) +static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id) { struct ceph_pg_pool_info *pi; struct rb_node *n = root->rb_node; @@ -508,24 +503,57 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi) static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) { - unsigned int n, m; + u8 ev, cv; + unsigned len, num; + void *pool_end; + + ceph_decode_need(p, end, 2 + 4, bad); + ev = ceph_decode_8(p); /* encoding version */ + cv = ceph_decode_8(p); /* compat version */ + if (ev < 5) { + pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv); + return -EINVAL; + } + if (cv > 7) { + pr_warning("got v %d cv %d > 7 of ceph_pg_pool\n", ev, cv); + return -EINVAL; + } + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, bad); + pool_end = *p + len; - ceph_decode_copy(p, &pi->v, sizeof(pi->v)); - calc_pg_masks(pi); + pi->type = ceph_decode_8(p); + pi->size = ceph_decode_8(p); + pi->crush_ruleset = ceph_decode_8(p); + pi->object_hash = ceph_decode_8(p); + + pi->pg_num = ceph_decode_32(p); + pi->pgp_num = ceph_decode_32(p); + + *p += 4 + 4; /* skip lpg* */ + *p += 4; /* skip last_change */ + *p += 8 + 4; /* skip snap_seq, snap_epoch */ - /* num_snaps * snap_info_t */ - n = le32_to_cpu(pi->v.num_snaps); - while (n--) { - ceph_decode_need(p, end, sizeof(u64) + 1 + sizeof(u64) + - sizeof(struct ceph_timespec), bad); - *p += sizeof(u64) + /* key */ - 1 + sizeof(u64) + /* u8, snapid */ - sizeof(struct ceph_timespec); - m = ceph_decode_32(p); /* snap name */ - *p += m; + /* skip snaps */ + num = ceph_decode_32(p); + while (num--) { + *p += 8; /* snapid key */ + *p += 1 + 1; /* versions */ + len = ceph_decode_32(p); + *p += len; } - *p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2; + /* skip removed snaps */ + num = ceph_decode_32(p); + *p += num * (8 + 8); + + *p += 8; /* skip auid */ + pi->flags = ceph_decode_64(p); + + /* ignore the rest */ + + *p = pool_end; + calc_pg_masks(pi); return 0; bad: @@ -535,14 +563,15 @@ bad: static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map) { struct ceph_pg_pool_info *pi; - u32 num, len, pool; + u32 num, len; + u64 pool; ceph_decode_32_safe(p, end, num, bad); dout(" %d pool names\n", num); while (num--) { - ceph_decode_32_safe(p, end, pool, bad); + ceph_decode_64_safe(p, end, pool, bad); ceph_decode_32_safe(p, end, len, bad); - dout(" pool %d len %d\n", pool, len); + dout(" pool %llu len %d\n", pool, len); ceph_decode_need(p, end, len, bad); pi = __lookup_pg_pool(&map->pg_pools, pool); if (pi) { @@ -633,7 +662,6 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) struct ceph_osdmap *map; u16 version; u32 len, max, i; - u8 ev; int err = -EINVAL; void *start = *p; struct ceph_pg_pool_info *pi; @@ -646,9 +674,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) map->pg_temp = RB_ROOT; ceph_decode_16_safe(p, end, version, bad); - if (version > CEPH_OSDMAP_VERSION) { - pr_warning("got unknown v %d > %d of osdmap\n", version, - CEPH_OSDMAP_VERSION); + if (version > 6) { + pr_warning("got unknown v %d > 6 of osdmap\n", version); + goto bad; + } + if (version < 6) { + pr_warning("got old v %d < 6 of osdmap\n", version); goto bad; } @@ -660,20 +691,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) ceph_decode_32_safe(p, end, max, bad); while (max--) { - ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad); + ceph_decode_need(p, end, 8 + 2, bad); err = -ENOMEM; pi = kzalloc(sizeof(*pi), GFP_NOFS); if (!pi) goto bad; - pi->id = ceph_decode_32(p); - err = -EINVAL; - ev = ceph_decode_8(p); /* encoding version */ - if (ev > CEPH_PG_POOL_VERSION) { - pr_warning("got unknown v %d > %d of ceph_pg_pool\n", - ev, CEPH_PG_POOL_VERSION); - kfree(pi); - goto bad; - } + pi->id = ceph_decode_64(p); err = __decode_pool(p, end, pi); if (err < 0) { kfree(pi); @@ -682,12 +705,10 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) __insert_pg_pool(&map->pg_pools, pi); } - if (version >= 5) { - err = __decode_pool_names(p, end, map); - if (err < 0) { - dout("fail to decode pool names"); - goto bad; - } + err = __decode_pool_names(p, end, map); + if (err < 0) { + dout("fail to decode pool names"); + goto bad; } ceph_decode_32_safe(p, end, map->pool_max, bad); @@ -724,10 +745,13 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) for (i = 0; i < len; i++) { int n, j; struct ceph_pg pgid; + struct ceph_pg_v1 pgid_v1; struct ceph_pg_mapping *pg; ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad); - ceph_decode_copy(p, &pgid, sizeof(pgid)); + ceph_decode_copy(p, &pgid_v1, sizeof(pgid_v1)); + pgid.pool = le32_to_cpu(pgid_v1.pool); + pgid.seed = le16_to_cpu(pgid_v1.ps); n = ceph_decode_32(p); err = -EINVAL; if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) @@ -745,7 +769,8 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) err = __insert_pg_mapping(pg, &map->pg_temp); if (err) goto bad; - dout(" added pg_temp %llx len %d\n", *(u64 *)&pgid, len); + dout(" added pg_temp %lld.%x len %d\n", pgid.pool, pgid.seed, + len); } /* crush */ @@ -784,16 +809,17 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, struct ceph_fsid fsid; u32 epoch = 0; struct ceph_timespec modified; - u32 len, pool; - __s32 new_pool_max, new_flags, max; + s32 len; + u64 pool; + __s64 new_pool_max; + __s32 new_flags, max; void *start = *p; int err = -EINVAL; u16 version; ceph_decode_16_safe(p, end, version, bad); - if (version > CEPH_OSDMAP_INC_VERSION) { - pr_warning("got unknown v %d > %d of inc osdmap\n", version, - CEPH_OSDMAP_INC_VERSION); + if (version > 6) { + pr_warning("got unknown v %d > %d of inc osdmap\n", version, 6); goto bad; } @@ -803,7 +829,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, epoch = ceph_decode_32(p); BUG_ON(epoch != map->epoch+1); ceph_decode_copy(p, &modified, sizeof(modified)); - new_pool_max = ceph_decode_32(p); + new_pool_max = ceph_decode_64(p); new_flags = ceph_decode_32(p); /* full map? */ @@ -853,18 +879,9 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, /* new_pool */ ceph_decode_32_safe(p, end, len, bad); while (len--) { - __u8 ev; struct ceph_pg_pool_info *pi; - ceph_decode_32_safe(p, end, pool, bad); - ceph_decode_need(p, end, 1 + sizeof(pi->v), bad); - ev = ceph_decode_8(p); /* encoding version */ - if (ev > CEPH_PG_POOL_VERSION) { - pr_warning("got unknown v %d > %d of ceph_pg_pool\n", - ev, CEPH_PG_POOL_VERSION); - err = -EINVAL; - goto bad; - } + ceph_decode_64_safe(p, end, pool, bad); pi = __lookup_pg_pool(&map->pg_pools, pool); if (!pi) { pi = kzalloc(sizeof(*pi), GFP_NOFS); @@ -890,7 +907,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, while (len--) { struct ceph_pg_pool_info *pi; - ceph_decode_32_safe(p, end, pool, bad); + ceph_decode_64_safe(p, end, pool, bad); pi = __lookup_pg_pool(&map->pg_pools, pool); if (pi) __remove_pg_pool(&map->pg_pools, pi); @@ -946,10 +963,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, while (len--) { struct ceph_pg_mapping *pg; int j; + struct ceph_pg_v1 pgid_v1; struct ceph_pg pgid; u32 pglen; ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad); - ceph_decode_copy(p, &pgid, sizeof(pgid)); + ceph_decode_copy(p, &pgid_v1, sizeof(pgid_v1)); + pgid.pool = le32_to_cpu(pgid_v1.pool); + pgid.seed = le16_to_cpu(pgid_v1.ps); pglen = ceph_decode_32(p); if (pglen) { @@ -975,8 +995,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, kfree(pg); goto bad; } - dout(" added pg_temp %llx len %d\n", *(u64 *)&pgid, - pglen); + dout(" added pg_temp %lld.%x len %d\n", pgid.pool, + pgid.seed, pglen); } else { /* remove */ __remove_pg_mapping(&map->pg_temp, pgid); @@ -1010,7 +1030,7 @@ bad: * pass a stride back to the caller. */ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, - u64 off, u64 *plen, + u64 off, u64 len, u64 *ono, u64 *oxoff, u64 *oxlen) { @@ -1021,7 +1041,7 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, u32 su_per_object; u64 t, su_offset; - dout("mapping %llu~%llu osize %u fl_su %u\n", off, *plen, + dout("mapping %llu~%llu osize %u fl_su %u\n", off, len, osize, su); if (su == 0 || sc == 0) goto invalid; @@ -1054,11 +1074,10 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, /* * Calculate the length of the extent being written to the selected - * object. This is the minimum of the full length requested (plen) or + * object. This is the minimum of the full length requested (len) or * the remainder of the current stripe being written to. */ - *oxlen = min_t(u64, *plen, su - su_offset); - *plen = *oxlen; + *oxlen = min_t(u64, len, su - su_offset); dout(" obj extent %llu~%llu\n", *oxoff, *oxlen); return 0; @@ -1076,33 +1095,24 @@ EXPORT_SYMBOL(ceph_calc_file_object_mapping); * calculate an object layout (i.e. pgid) from an oid, * file_layout, and osdmap */ -int ceph_calc_object_layout(struct ceph_object_layout *ol, +int ceph_calc_object_layout(struct ceph_pg *pg, const char *oid, struct ceph_file_layout *fl, struct ceph_osdmap *osdmap) { unsigned int num, num_mask; - struct ceph_pg pgid; - int poolid = le32_to_cpu(fl->fl_pg_pool); struct ceph_pg_pool_info *pool; - unsigned int ps; BUG_ON(!osdmap); - - pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); + pg->pool = le32_to_cpu(fl->fl_pg_pool); + pool = __lookup_pg_pool(&osdmap->pg_pools, pg->pool); if (!pool) return -EIO; - ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid)); - num = le32_to_cpu(pool->v.pg_num); + pg->seed = ceph_str_hash(pool->object_hash, oid, strlen(oid)); + num = pool->pg_num; num_mask = pool->pg_num_mask; - pgid.ps = cpu_to_le16(ps); - pgid.preferred = cpu_to_le16(-1); - pgid.pool = fl->fl_pg_pool; - dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps); - - ol->ol_pgid = pgid; - ol->ol_stripe_unit = fl->fl_object_stripe_unit; + dout("calc_object_layout '%s' pgid %lld.%x\n", oid, pg->pool, pg->seed); return 0; } EXPORT_SYMBOL(ceph_calc_object_layout); @@ -1117,19 +1127,16 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, struct ceph_pg_mapping *pg; struct ceph_pg_pool_info *pool; int ruleno; - unsigned int poolid, ps, pps, t, r; - - poolid = le32_to_cpu(pgid.pool); - ps = le16_to_cpu(pgid.ps); + int r; + u32 pps; - pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); + pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool); if (!pool) return NULL; /* pg_temp? */ - t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num), - pool->pgp_num_mask); - pgid.ps = cpu_to_le16(t); + pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num, + pool->pgp_num_mask); pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); if (pg) { *num = pg->len; @@ -1137,26 +1144,39 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, } /* crush */ - ruleno = crush_find_rule(osdmap->crush, pool->v.crush_ruleset, - pool->v.type, pool->v.size); + ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset, + pool->type, pool->size); if (ruleno < 0) { - pr_err("no crush rule pool %d ruleset %d type %d size %d\n", - poolid, pool->v.crush_ruleset, pool->v.type, - pool->v.size); + pr_err("no crush rule pool %lld ruleset %d type %d size %d\n", + pgid.pool, pool->crush_ruleset, pool->type, + pool->size); return NULL; } - pps = ceph_stable_mod(ps, - le32_to_cpu(pool->v.pgp_num), - pool->pgp_num_mask); - pps += poolid; + if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) { + /* hash pool id and seed sothat pool PGs do not overlap */ + pps = crush_hash32_2(CRUSH_HASH_RJENKINS1, + ceph_stable_mod(pgid.seed, pool->pgp_num, + pool->pgp_num_mask), + pgid.pool); + } else { + /* + * legacy ehavior: add ps and pool together. this is + * not a great approach because the PGs from each pool + * will overlap on top of each other: 0.5 == 1.4 == + * 2.3 == ... + */ + pps = ceph_stable_mod(pgid.seed, pool->pgp_num, + pool->pgp_num_mask) + + (unsigned)pgid.pool; + } r = crush_do_rule(osdmap->crush, ruleno, pps, osds, - min_t(int, pool->v.size, *num), + min_t(int, pool->size, *num), osdmap->osd_weight); if (r < 0) { - pr_err("error %d from crush rule: pool %d ruleset %d type %d" - " size %d\n", r, poolid, pool->v.crush_ruleset, - pool->v.type, pool->v.size); + pr_err("error %d from crush rule: pool %lld ruleset %d type %d" + " size %d\n", r, pgid.pool, pool->crush_ruleset, + pool->type, pool->size); return NULL; } *num = r; diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c index cd9c21df87d..815a2249cfa 100644 --- a/net/ceph/pagevec.c +++ b/net/ceph/pagevec.c @@ -12,7 +12,7 @@ /* * build a vector of user pages */ -struct page **ceph_get_direct_page_vector(const char __user *data, +struct page **ceph_get_direct_page_vector(const void __user *data, int num_pages, bool write_page) { struct page **pages; @@ -93,7 +93,7 @@ EXPORT_SYMBOL(ceph_alloc_page_vector); * copy user data into a page vector */ int ceph_copy_user_to_page_vector(struct page **pages, - const char __user *data, + const void __user *data, loff_t off, size_t len) { int i = 0; @@ -118,17 +118,17 @@ int ceph_copy_user_to_page_vector(struct page **pages, } EXPORT_SYMBOL(ceph_copy_user_to_page_vector); -int ceph_copy_to_page_vector(struct page **pages, - const char *data, +void ceph_copy_to_page_vector(struct page **pages, + const void *data, loff_t off, size_t len) { int i = 0; size_t po = off & ~PAGE_CACHE_MASK; size_t left = len; - size_t l; while (left > 0) { - l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + memcpy(page_address(pages[i]) + po, data, l); data += l; left -= l; @@ -138,21 +138,20 @@ int ceph_copy_to_page_vector(struct page **pages, i++; } } - return len; } EXPORT_SYMBOL(ceph_copy_to_page_vector); -int ceph_copy_from_page_vector(struct page **pages, - char *data, +void ceph_copy_from_page_vector(struct page **pages, + void *data, loff_t off, size_t len) { int i = 0; size_t po = off & ~PAGE_CACHE_MASK; size_t left = len; - size_t l; while (left > 0) { - l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + memcpy(data, page_address(pages[i]) + po, l); data += l; left -= l; @@ -162,7 +161,6 @@ int ceph_copy_from_page_vector(struct page **pages, i++; } } - return len; } EXPORT_SYMBOL(ceph_copy_from_page_vector); @@ -170,7 +168,7 @@ EXPORT_SYMBOL(ceph_copy_from_page_vector); * copy user data from a page vector into a user pointer */ int ceph_copy_page_vector_to_user(struct page **pages, - char __user *data, + void __user *data, loff_t off, size_t len) { int i = 0; |