#include "decode.h"
-static void reschedule_timeout(struct ceph_osd_client *osdc,
- unsigned long base_time);
-
-
-
/*
* calculate the mapping of a file extent onto an object, and fill out the
* request accordingly. shorten extent as necessary if it crosses an
/*
- * register request, assign tid.
+ * Register request, assign tid. If this is the first request, set up
+ * the timeout event.
*/
static int register_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
goto out;
get_request(req);
- if (osdc->num_requests == 0) {
+ osdc->num_requests++;
+
+ req->r_timeout_stamp =
+ jiffies + osdc->client->mount_args.osd_timeout*HZ;
+
+ if (osdc->num_requests == 1) {
osdc->timeout_tid = req->r_tid;
- dout(10, "setting timeout_tid=%lld\n", osdc->timeout_tid);
- reschedule_timeout(osdc, 0);
+ dout(30, " timeout on tid %llu at %lu\n", req->r_tid,
+ req->r_timeout_stamp);
+ schedule_delayed_work(&osdc->timeout_work,
+ round_jiffies_relative(req->r_timeout_stamp - jiffies));
}
- osdc->num_requests++;
out:
mutex_unlock(&osdc->request_mutex);
return rc;
}
-
/*
- * register the request for timeout
+ * Timeout callback, called every N seconds when 1 or more osd
+ * requests has been active for more than N seconds. When this
+ * happens, we ping all OSDs with requests who have timed out to
+ * ensure any communications channel reset is detected. Reset the
+ * request timeouts another N seconds in the future as we go.
+ * Reschedule the timeout event another N seconds in future (unless
+ * there are no open requests).
*/
-static void __register_request_timeout(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
+static void handle_timeout(struct work_struct *work)
{
- dout(10, "replacing timeout_tid: %lld-> %lld\n",
- osdc->timeout_tid, req->r_tid);
- osdc->timeout_tid = req->r_tid;
- reschedule_timeout(osdc, req->r_timeout_stamp);
-}
+ struct ceph_osd_client *osdc =
+ container_of(work, struct ceph_osd_client, timeout_work.work);
+ struct ceph_osd_request *req;
+ unsigned long timeout = osdc->client->mount_args.osd_timeout * HZ;
+ unsigned long next_timeout = timeout + jiffies;
+ RADIX_TREE(pings, GFP_NOFS); /* only send 1 ping per osd */
+ u64 next_tid = 0;
+ int got;
-/*
- * register the next request as the base request for timeout, after current
- * request has been served
- */
-static void __register_next_request_timeout(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
-{
- struct ceph_osd_request *next_req;
- int ret;
+ dout(10, "timeout\n");
+ down_read(&osdc->map_sem);
- ret = radix_tree_gang_lookup(&osdc->request_tree, (void **)&next_req,
- req->r_tid+1, 1);
- if (!ret || (next_req->r_tid == osdc->timeout_tid))
- ret = radix_tree_gang_lookup(&osdc->request_tree,
- (void **)&next_req, 0, 1);
+ ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch+1);
+
+ mutex_lock(&osdc->request_mutex);
+ while (1) {
+ got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
+ next_tid, 1);
+ if (got == 0)
+ break;
+ next_tid = req->r_tid + 1;
+ if (time_before(jiffies, req->r_timeout_stamp))
+ goto next;
+
+ req->r_timeout_stamp = next_timeout;
+ if (req->r_last_osd >= 0 &&
+ radix_tree_lookup(&pings, req->r_last_osd) == NULL) {
+ struct ceph_entity_name n = {
+ .type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD),
+ .num = cpu_to_le32(req->r_last_osd)
+ };
+ dout(20, " tid %llu (at least) timed out on osd%d\n",
+ req->r_tid, req->r_last_osd);
+ radix_tree_insert(&pings, req->r_last_osd, req);
+ ceph_ping(osdc->client->msgr, n, &req->r_last_osd_addr);
+ }
+
+ next:
+ got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
+ next_tid, 1);
+ }
- /* is there a next request? */
- if (ret == 1)
- __register_request_timeout(osdc, next_req);
+ while (radix_tree_gang_lookup(&pings, (void **)&req, 0, 1))
+ radix_tree_delete(&pings, req->r_last_osd);
+
+ if (osdc->timeout_tid)
+ schedule_delayed_work(&osdc->timeout_work,
+ round_jiffies_relative(timeout));
+
+ mutex_unlock(&osdc->request_mutex);
+
+ up_read(&osdc->map_sem);
}
/*
radix_tree_delete(&osdc->request_tree, req->r_tid);
osdc->num_requests--;
+ ceph_osdc_put_request(req);
+
if (req->r_tid == osdc->timeout_tid) {
- cancel_delayed_work(&osdc->timeout_work);
- if (osdc->num_requests)
- __register_next_request_timeout(osdc, req);
- }
+ if (osdc->num_requests == 0) {
+ dout(30, "no requests, canceling timeout\n");
+ osdc->timeout_tid = 0;
+ cancel_delayed_work(&osdc->timeout_work);
+ } else {
+ struct ceph_osd_request *req;
+ int ret;
- ceph_osdc_put_request(req);
+ ret = radix_tree_gang_lookup(&osdc->request_tree,
+ (void **)&req, 0, 1);
+ BUG_ON(ret != 1);
+ osdc->timeout_tid = req->r_tid;
+ dout(30, "rescheduled timeout on tid %llu at %lu\n",
+ req->r_tid, req->r_timeout_stamp);
+ schedule_delayed_work(&osdc->timeout_work,
+ round_jiffies_relative(req->r_timeout_stamp -
+ jiffies));
+ }
+ }
}
/*
req->r_request->hdr.dst.addr = osdc->osdmap->osd_addr[osd];
req->r_last_osd_addr = req->r_request->hdr.dst.addr;
- req->r_timeout_stamp = jiffies + osdc->client->mount_args.osd_timeout * HZ;
+ req->r_timeout_stamp = jiffies+osdc->client->mount_args.osd_timeout*HZ;
ceph_msg_get(req->r_request); /* send consumes a ref */
return ceph_msg_send(osdc->client->msgr, req->r_request,
return rc;
return bytes;
}
-
-static long jiffies_to_timeout(struct ceph_osd_client *osdc,
- unsigned long timeout_time)
-{
- unsigned long jifs;
-
- if (timeout_time)
- jifs = timeout_time - jiffies;
- else
- jifs = osdc->client->mount_args.osd_timeout * HZ;
-
- return round_jiffies_relative(jifs);
-}
-
-/*
- * if one or more requests takes too long, a timeout expires.
- *
- * FIXME.
- */
-static void reschedule_timeout(struct ceph_osd_client *osdc,
- unsigned long timeout_time)
-{
- long jifs = jiffies_to_timeout(osdc, timeout_time);
- dout(10, "reschedule timeout (%ld jiffies)\n", jifs);
-
- schedule_delayed_work(&osdc->timeout_work,
- jifs);
-}
-
-/*
- * timeout callback, being called after osdc pending message has
- * been waiting for reply for a pre set time. When this happens
- * we iterate over all the pending osd requests, and ping all the
- * osds that their timeout has expired. We retrigger the timer,
- * based on the next pending message that its time has not
- * expired yet.
- */
-static void handle_timeout(struct work_struct *work)
-{
- u64 next_tid = 0;
- u64 timeout_tid = 0;
- struct ceph_osd_client *osdc =
- container_of(work, struct ceph_osd_client, timeout_work.work);
- unsigned long next_timeout_stamp;
- struct ceph_osd_request *req, *next_timeout_req;
- int got;
- int timeout = osdc->client->mount_args.osd_timeout * HZ;
- RADIX_TREE(pings, GFP_NOFS); /* only send 1 ping per osd */
- unsigned long t;
-
- dout(10, "timeout\n");
- down_read(&osdc->map_sem);
-
- ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch+1);
-
- /*
- * ping any osds with pending requests that have timeout to ensure the communications
- * channel hasn't reset
- */
- mutex_lock(&osdc->request_mutex);
-
- /* this is the request that we're serving */
- got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
- osdc->timeout_tid, 1);
-
- if (got == 0)
- goto done;
-
- req->r_timeout_stamp += timeout;
- next_timeout_req = req;
- next_timeout_stamp = req->r_timeout_stamp;
- timeout_tid = req->r_tid;
- dout(10, "tid=%lld timeout_tid=%lld\n", next_tid, osdc->timeout_tid);
-
- while (1) {
- next_tid = req->r_tid + 1;
- if (time_after(jiffies, req->r_timeout_stamp)) {
- /* is this osd already in the list */
- if (req->r_last_osd >= 0 &&
- radix_tree_lookup(&pings, req->r_last_osd) == NULL) {
- struct ceph_entity_name n = {
- .type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD),
- .num = cpu_to_le32(req->r_last_osd)
- };
- radix_tree_insert(&pings, req->r_last_osd, req);
- ceph_ping(osdc->client->msgr, n, &req->r_last_osd_addr);
- }
- req->r_timeout_stamp = req->r_timeout_stamp + timeout;
- } else {
- if (time_before(req->r_timeout_stamp, next_timeout_stamp)) {
- next_timeout_req = req;
- }
- }
-
- /* now get the next request */
- got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
- next_tid, 1);
-
- /* no next request? let's search from the start */
- if (got == 0) {
- got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
- 0, 1);
- }
-
- dout(30, "next_tid=%lld timeout_tid=%lld\n", next_tid, timeout_tid);
- /* no next request, or we're back where we started */
- if (got == 0 || req->r_tid == timeout_tid)
- break;
- }
-
- /*
- * register the next request for timeout, one that hasn't
- * timeout in this cycle
- */
- __register_request_timeout(osdc, next_timeout_req);
-
- t = 0;
- while (radix_tree_gang_lookup(&pings, (void **)&req, t, 1)) {
- radix_tree_delete(&pings, req->r_last_osd);
- t = req->r_last_osd + 1;
- }
-
-done:
- mutex_unlock(&osdc->request_mutex);
-
- up_read(&osdc->map_sem);
-}
-
-
/*
* init, shutdown
*/
init_completion(&osdc->map_waiters);
osdc->last_requested_map = 0;
mutex_init(&osdc->request_mutex);
+ osdc->timeout_tid = 0;
osdc->last_tid = 0;
INIT_RADIX_TREE(&osdc->request_tree, GFP_NOFS);
osdc->num_requests = 0;