diff options
author | Yehuda Sadeh <yehuda@hq.newdream.net> | 2010-02-26 15:32:31 -0800 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2010-03-04 11:26:35 -0800 |
commit | 422d2cb8f9afadba1ecd3614f658b6daaaa480fb (patch) | |
tree | 22e1a61acdbbe1459b190c4dbb6019360464b2e9 /fs | |
parent | e9964c102312967a4bc1fd501cb628c4a3b19034 (diff) |
ceph: reset osd after relevant messages timed out
This simplifies the process of timing out messages. We
keep lru of current messages that are in flight. If a
timeout has passed, we reset the osd connection, so that
messages will be retransmitted. This is a failsafe in case
we hit some sort of problem sending out message to the OSD.
Normally, we'll get notification via an updated osdmap if
there are problems.
If a request is older than the keepalive timeout, send a
keepalive to ensure we detect any breaks in the TCP connection.
Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs')
-rw-r--r-- | fs/ceph/osd_client.c | 153 | ||||
-rw-r--r-- | fs/ceph/osd_client.h | 6 | ||||
-rw-r--r-- | fs/ceph/super.c | 8 | ||||
-rw-r--r-- | fs/ceph/super.h | 3 |
4 files changed, 113 insertions, 57 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index c4763bff97b..dbe63db9762 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c @@ -17,6 +17,8 @@ #define OSD_OPREPLY_FRONT_LEN 512 const static struct ceph_connection_operations osd_con_ops; +static int __kick_requests(struct ceph_osd_client *osdc, + struct ceph_osd *kickosd); static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); @@ -339,6 +341,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) osd->o_con.ops = &osd_con_ops; osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD; + INIT_LIST_HEAD(&osd->o_keepalive_item); return osd; } @@ -461,6 +464,16 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) return NULL; } +static void __schedule_osd_timeout(struct ceph_osd_client *osdc) +{ + schedule_delayed_work(&osdc->timeout_work, + osdc->client->mount_args->osd_keepalive_timeout * HZ); +} + +static void __cancel_osd_timeout(struct ceph_osd_client *osdc) +{ + cancel_delayed_work(&osdc->timeout_work); +} /* * Register request, assign tid. If this is the first request, set up @@ -472,21 +485,16 @@ static void register_request(struct ceph_osd_client *osdc, mutex_lock(&osdc->request_mutex); req->r_tid = ++osdc->last_tid; req->r_request->hdr.tid = cpu_to_le64(req->r_tid); + INIT_LIST_HEAD(&req->r_req_lru_item); dout("register_request %p tid %lld\n", req, req->r_tid); __insert_request(osdc, req); ceph_osdc_get_request(req); osdc->num_requests++; - req->r_timeout_stamp = - jiffies + osdc->client->mount_args->osd_timeout*HZ; - if (osdc->num_requests == 1) { - osdc->timeout_tid = req->r_tid; - dout(" timeout on tid %llu at %lu\n", req->r_tid, - req->r_timeout_stamp); - schedule_delayed_work(&osdc->timeout_work, - round_jiffies_relative(req->r_timeout_stamp - jiffies)); + dout(" first request, scheduling timeout\n"); + __schedule_osd_timeout(osdc); } mutex_unlock(&osdc->request_mutex); } @@ -513,21 +521,10 @@ static void __unregister_request(struct ceph_osd_client *osdc, ceph_osdc_put_request(req); - if (req->r_tid == osdc->timeout_tid) { - if (osdc->num_requests == 0) { - dout("no requests, canceling timeout\n"); - osdc->timeout_tid = 0; - cancel_delayed_work(&osdc->timeout_work); - } else { - req = rb_entry(rb_first(&osdc->requests), - struct ceph_osd_request, r_node); - osdc->timeout_tid = req->r_tid; - dout("rescheduled timeout on tid %llu at %lu\n", - req->r_tid, req->r_timeout_stamp); - schedule_delayed_work(&osdc->timeout_work, - round_jiffies_relative(req->r_timeout_stamp - - jiffies)); - } + list_del_init(&req->r_req_lru_item); + if (osdc->num_requests == 0) { + dout(" no requests, canceling timeout\n"); + __cancel_osd_timeout(osdc); } } @@ -540,6 +537,7 @@ static void __cancel_request(struct ceph_osd_request *req) ceph_con_revoke(&req->r_osd->o_con, req->r_request); req->r_sent = 0; } + list_del_init(&req->r_req_lru_item); } /* @@ -635,7 +633,8 @@ static int __send_request(struct ceph_osd_client *osdc, reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ reqhead->reassert_version = req->r_reassert_version; - req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ; + req->r_sent_stamp = jiffies; + list_move_tail(&osdc->req_lru, &req->r_req_lru_item); ceph_msg_get(req->r_request); /* send consumes a ref */ ceph_con_send(&req->r_osd->o_con, req->r_request); @@ -656,11 +655,14 @@ static void handle_timeout(struct work_struct *work) { struct ceph_osd_client *osdc = container_of(work, struct ceph_osd_client, timeout_work.work); - struct ceph_osd_request *req; + struct ceph_osd_request *req, *last_req = NULL; struct ceph_osd *osd; unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ; - unsigned long next_timeout = timeout + jiffies; + unsigned long keepalive = + osdc->client->mount_args->osd_keepalive_timeout * HZ; + unsigned long last_sent = 0; struct rb_node *p; + struct list_head slow_osds; dout("timeout\n"); down_read(&osdc->map_sem); @@ -683,25 +685,56 @@ static void handle_timeout(struct work_struct *work) continue; } } - for (p = rb_first(&osdc->osds); p; p = rb_next(p)) { - osd = rb_entry(p, struct ceph_osd, o_node); - if (list_empty(&osd->o_requests)) - continue; - req = list_first_entry(&osd->o_requests, - struct ceph_osd_request, r_osd_item); - if (time_before(jiffies, req->r_timeout_stamp)) - continue; - dout(" tid %llu (at least) timed out on osd%d\n", + /* + * reset osds that appear to be _really_ unresponsive. this + * is a failsafe measure.. we really shouldn't be getting to + * this point if the system is working properly. the monitors + * should mark the osd as failed and we should find out about + * it from an updated osd map. + */ + while (!list_empty(&osdc->req_lru)) { + req = list_entry(osdc->req_lru.next, struct ceph_osd_request, + r_req_lru_item); + + if (time_before(jiffies, req->r_sent_stamp + timeout)) + break; + + BUG_ON(req == last_req && req->r_sent_stamp == last_sent); + last_req = req; + last_sent = req->r_sent_stamp; + + osd = req->r_osd; + BUG_ON(!osd); + pr_warning(" tid %llu timed out on osd%d, will reset osd\n", + req->r_tid, osd->o_osd); + __kick_requests(osdc, osd); + } + + /* + * ping osds that are a bit slow. this ensures that if there + * is a break in the TCP connection we will notice, and reopen + * a connection with that osd (from the fault callback). + */ + INIT_LIST_HEAD(&slow_osds); + list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { + if (time_before(jiffies, req->r_sent_stamp + keepalive)) + break; + + osd = req->r_osd; + BUG_ON(!osd); + dout(" tid %llu is slow, will send keepalive on osd%d\n", req->r_tid, osd->o_osd); - req->r_timeout_stamp = next_timeout; + list_move_tail(&osd->o_keepalive_item, &slow_osds); + } + while (!list_empty(&slow_osds)) { + osd = list_entry(slow_osds.next, struct ceph_osd, + o_keepalive_item); + list_del_init(&osd->o_keepalive_item); ceph_con_keepalive(&osd->o_con); } - if (osdc->timeout_tid) - schedule_delayed_work(&osdc->timeout_work, - round_jiffies_relative(timeout)); - + __schedule_osd_timeout(osdc); mutex_unlock(&osdc->request_mutex); up_read(&osdc->map_sem); @@ -819,18 +852,7 @@ bad: } -/* - * Resubmit osd requests whose osd or osd address has changed. Request - * a new osd map if osds are down, or we are otherwise unable to determine - * how to direct a request. - * - * Close connections to down osds. - * - * If @who is specified, resubmit requests for that specific osd. - * - * Caller should hold map_sem for read and request_mutex. - */ -static void kick_requests(struct ceph_osd_client *osdc, +static int __kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *kickosd) { struct ceph_osd_request *req; @@ -839,7 +861,6 @@ static void kick_requests(struct ceph_osd_client *osdc, int err; dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1); - mutex_lock(&osdc->request_mutex); if (kickosd) { __reset_osd(osdc, kickosd); } else { @@ -900,14 +921,36 @@ kick: req->r_resend = true; } } + + return needmap; +} + +/* + * Resubmit osd requests whose osd or osd address has changed. Request + * a new osd map if osds are down, or we are otherwise unable to determine + * how to direct a request. + * + * Close connections to down osds. + * + * If @who is specified, resubmit requests for that specific osd. + * + * Caller should hold map_sem for read and request_mutex. + */ +static void kick_requests(struct ceph_osd_client *osdc, + struct ceph_osd *kickosd) +{ + int needmap; + + mutex_lock(&osdc->request_mutex); + needmap = __kick_requests(osdc, kickosd); mutex_unlock(&osdc->request_mutex); if (needmap) { dout("%d requests for down osds, need new map\n", needmap); ceph_monc_request_next_osdmap(&osdc->client->monc); } -} +} /* * Process updated osd map. * @@ -1164,11 +1207,11 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) init_completion(&osdc->map_waiters); osdc->last_requested_map = 0; mutex_init(&osdc->request_mutex); - osdc->timeout_tid = 0; osdc->last_tid = 0; osdc->osds = RB_ROOT; INIT_LIST_HEAD(&osdc->osd_lru); osdc->requests = RB_ROOT; + INIT_LIST_HEAD(&osdc->req_lru); osdc->num_requests = 0; INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h index f256eba6fe7..1b1a3ca43af 100644 --- a/fs/ceph/osd_client.h +++ b/fs/ceph/osd_client.h @@ -36,12 +36,15 @@ struct ceph_osd { void *o_authorizer_buf, *o_authorizer_reply_buf; size_t o_authorizer_buf_len, o_authorizer_reply_buf_len; unsigned long lru_ttl; + int o_marked_for_keepalive; + struct list_head o_keepalive_item; }; /* an in-flight request */ struct ceph_osd_request { u64 r_tid; /* unique for this client */ struct rb_node r_node; + struct list_head r_req_lru_item; struct list_head r_osd_item; struct ceph_osd *r_osd; struct ceph_pg r_pgid; @@ -67,7 +70,7 @@ struct ceph_osd_request { char r_oid[40]; /* object name */ int r_oid_len; - unsigned long r_timeout_stamp; + unsigned long r_sent_stamp; bool r_resend; /* msg send failed, needs retry */ struct ceph_file_layout r_file_layout; @@ -92,6 +95,7 @@ struct ceph_osd_client { u64 timeout_tid; /* tid of timeout triggering rq */ u64 last_tid; /* tid of last request */ struct rb_root requests; /* pending requests */ + struct list_head req_lru; /* pending requests lru */ int num_requests; struct delayed_work timeout_work; struct delayed_work osds_timeout_work; diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 74953be75f8..4290a6e860b 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -292,6 +292,7 @@ enum { Opt_wsize, Opt_rsize, Opt_osdtimeout, + Opt_osdkeepalivetimeout, Opt_mount_timeout, Opt_osd_idle_ttl, Opt_caps_wanted_delay_min, @@ -322,6 +323,7 @@ static match_table_t arg_tokens = { {Opt_wsize, "wsize=%d"}, {Opt_rsize, "rsize=%d"}, {Opt_osdtimeout, "osdtimeout=%d"}, + {Opt_osdkeepalivetimeout, "osdkeepalive=%d"}, {Opt_mount_timeout, "mount_timeout=%d"}, {Opt_osd_idle_ttl, "osd_idle_ttl=%d"}, {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"}, @@ -367,7 +369,8 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options, /* start with defaults */ args->sb_flags = flags; args->flags = CEPH_OPT_DEFAULT; - args->osd_timeout = 5; /* seconds */ + args->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT; + args->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT; args->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */ args->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */ args->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT; @@ -468,6 +471,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options, case Opt_osdtimeout: args->osd_timeout = intval; break; + case Opt_osdkeepalivetimeout: + args->osd_keepalive_timeout = intval; + break; case Opt_mount_timeout: args->mount_timeout = intval; break; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 6a778f2c3f6..02c0ddcf3ea 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -62,6 +62,7 @@ struct ceph_mount_args { int max_readdir; /* max readdir size */ int congestion_kb; /* max readdir size */ int osd_timeout; + int osd_keepalive_timeout; char *snapdir_name; /* default ".snap" */ char *name; char *secret; @@ -72,6 +73,8 @@ struct ceph_mount_args { * defaults */ #define CEPH_MOUNT_TIMEOUT_DEFAULT 60 +#define CEPH_OSD_TIMEOUT_DEFAULT 60 /* seconds */ +#define CEPH_OSD_KEEPALIVE_DEFAULT 5 #define CEPH_OSD_IDLE_TTL_DEFAULT 60 #define CEPH_MOUNT_RSIZE_DEFAULT (512*1024) /* readahead */ |