/*
* Open a session with a (new) monitor.
*/
-static int open_session(struct ceph_mon_client *monc, int newmon)
+static int __open_session(struct ceph_mon_client *monc, int newmon)
{
char r;
return 0;
}
-static void send_subscribe(struct ceph_mon_client *monc)
+/*
+ * (Re)arm the monitor client's delayed work.  The timer fires after
+ * 10 * HZ while a mount is still wanted and after 20 * HZ otherwise.
+ */
+static void __schedule_delayed(struct ceph_mon_client *monc)
+{
+	unsigned timeout = monc->want_mount ? 10 * HZ : 20 * HZ;
+
+	dout("__schedule_delayed after %u\n", timeout);
+	schedule_delayed_work(&monc->delayed_work, timeout);
+}
+
+/*
+ * Send subscribe request for mdsmap and/or osdmap.
+ *
+ * The message always carries an "mdsmap" item (onetime = 0).  An
+ * "osdmap" item (onetime = 1) is appended only while
+ * monc->want_next_osdmap is set, and the leading item count is
+ * 1 + want_next_osdmap to match.
+ *
+ * NOTE(review): two terms of the send condition below look inverted;
+ * confirm against the intended behaviour:
+ *  - time_before(jiffies, sub_renew_after) holds while the renewal
+ *    time is still in the future; a "renewal is due" test would
+ *    normally be time_after().
+ *  - !want_next_osdmap does not by itself trigger a send when an
+ *    osdmap has been requested, yet ceph_monc_request_next_osdmap()
+ *    sets the flag and then calls this function.
+ */
+static void __send_subscribe(struct ceph_mon_client *monc)
{
- if (!monc->subscribed && !monc->sub_sent) {
+ if ((!monc->subscribed && !monc->sub_sent) ||
+ time_before(jiffies, monc->sub_renew_after) ||
+ !monc->want_next_osdmap) {
struct ceph_msg *msg;
struct ceph_mon_subscribe_item *i;
void *p, *end;
p = msg->front.iov_base;
end = p + msg->front.iov_len;
- ceph_encode_32(&p, 1);
+ ceph_encode_32(&p, 1 + monc->want_next_osdmap); /* item count: mdsmap, plus osdmap when wanted */
ceph_encode_string(&p, end, "mdsmap", 6);
i = p;
i->have = cpu_to_le64(monc->have_mdsmap);
i->onetime = 0;
p += sizeof(*i);
+ if (monc->want_next_osdmap) {
+ ceph_encode_string(&p, end, "osdmap", 6);
+ i = p;
+ i->have = cpu_to_le64(monc->have_osdmap);
+ i->onetime = 1; /* unlike the mdsmap item above, which uses onetime = 0 */
+ p += sizeof(*i);
+ }
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
static void handle_subscribe_ack(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
- unsigned ms;
+ unsigned seconds; /* NOTE(review): assigned from a 64-bit decode below; confirm the wire field width */
void *p = msg->front.iov_base;
void *end = p + msg->front.iov_len;
dout("handle_subscribe_ack\n");
- ceph_decode_64_safe(&p, end, ms, bad);
+ ceph_decode_64_safe(&p, end, seconds, bad); /* presumably jumps to bad: on a short buffer -- confirm */
monc->subscribed = true;
- monc->sub_renew_after = monc->sub_sent + ms / 1000;
+ monc->sub_renew_after = monc->sub_sent + seconds*HZ; /* duration scaled by HZ (previously ms / 1000).
+  * NOTE(review): no return is visible between this line and bad:, so as
+  * shown here the error message below would also be printed on success;
+  * confirm against the full file. */
bad:
pr_err("ceph got corrupt subscribe-ack msg\n");
}
/*
- * Generic timeout mechanism for monitor requests, so we can resend if
- * we don't get a timely reply. Exponential backoff.
- */
-static void reschedule_timeout(struct ceph_mon_request *req)
-{
- schedule_delayed_work(&req->delayed_work, req->delay);
- if (req->delay < MAX_DELAY_INTERVAL)
- req->delay *= 2;
- else
- req->delay = MAX_DELAY_INTERVAL;
-}
-
-static void retry_request(struct work_struct *work)
-{
- struct ceph_mon_request *req =
- container_of(work, struct ceph_mon_request,
- delayed_work.work);
-
- /*
- * if lock is contended, reschedule sooner. we can't wait for
- * mutex because we cancel the timeout sync with lock held.
- */
- if (mutex_trylock(&req->monc->req_mutex)) {
- req->do_request(req->monc, 1);
- reschedule_timeout(req);
- mutex_unlock(&req->monc->req_mutex);
- } else
- schedule_delayed_work(&req->delayed_work, BASE_DELAY_INTERVAL);
-}
-
-static void cancel_timeout(struct ceph_mon_request *req)
-{
- cancel_delayed_work_sync(&req->delayed_work);
- req->delay = BASE_DELAY_INTERVAL;
-}
-
-static void init_request_type(struct ceph_mon_client *monc,
- struct ceph_mon_request *req,
- ceph_monc_request_func_t func)
-{
- req->monc = monc;
- INIT_DELAYED_WORK(&req->delayed_work, retry_request);
- req->delay = 0;
- req->do_request = func;
-}
-
-
-/*
- * Possibly cancel our desire for a new map
+ * Keep track of which maps we have
+ *
+ * ceph_monc_got_mdsmap() stores `got` in monc->have_mdsmap while
+ * holding monc->mutex (which replaces the former req_mutex here).
+ * __send_subscribe() reports that value as the "have" field of its
+ * mdsmap item.  Always returns 0.
*/
int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
{
- mutex_lock(&monc->req_mutex);
+ mutex_lock(&monc->mutex);
monc->have_mdsmap = got;
- mutex_unlock(&monc->req_mutex);
+ mutex_unlock(&monc->mutex);
return 0;
}
-
-/*
- * osd map
- */
-static void request_osdmap(struct ceph_mon_client *monc, int newmon)
-{
- struct ceph_msg *msg;
- struct ceph_osd_getmap *h;
- int err;
-
- dout("request_osdmap want %u\n", monc->want_osdmap);
- err = open_session(monc, newmon);
- if (err)
- return;
- msg = ceph_msg_new(CEPH_MSG_OSD_GETMAP, sizeof(*h), 0, 0, NULL);
- if (IS_ERR(msg))
- return;
- h = msg->front.iov_base;
- h->fsid = monc->monmap->fsid;
- h->start = cpu_to_le32(monc->want_osdmap);
- h->have_version = cpu_to_le64(monc->want_osdmap ?
- monc->want_osdmap-1 : 0);
- ceph_con_send(monc->con, msg);
-}
-
-void ceph_monc_request_osdmap(struct ceph_mon_client *monc, u32 want)
+int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
{
- dout("request_osdmap want %u\n", want);
- mutex_lock(&monc->req_mutex);
- monc->osdreq.delay = BASE_DELAY_INTERVAL;
- monc->want_osdmap = want;
- request_osdmap(monc, 0);
- reschedule_timeout(&monc->osdreq);
- mutex_unlock(&monc->req_mutex);
+ mutex_lock(&monc->mutex);
+ monc->have_osdmap = got; /* reported as "have" in the osdmap subscribe item */
+ monc->want_next_osdmap = false; /* set again by ceph_monc_request_next_osdmap() */
+ mutex_unlock(&monc->mutex);
+ return 0; /* the version removed by this patch could return -EAGAIN; this one never does */
}
-int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
+/*
+ * Register interest in the next osdmap
+ *
+ * Sets monc->want_next_osdmap and calls __send_subscribe() with
+ * monc->mutex held.  The flag is cleared again by
+ * ceph_monc_got_osdmap().
+ *
+ * NOTE(review): the dout() below reads monc->have_osdmap before the
+ * mutex is taken; the value only feeds a debug message.
+ */
+void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
{
- int ret = 0;
-
- mutex_lock(&monc->req_mutex);
- if (got < monc->want_osdmap) {
- dout("got_osdmap %u < wanted %u\n", got, monc->want_osdmap);
- ret = -EAGAIN;
- } else {
- dout("got_osdmap %u >= wanted %u\n", got, monc->want_osdmap);
- monc->want_osdmap = 0;
- cancel_timeout(&monc->osdreq);
- }
- mutex_unlock(&monc->req_mutex);
- return ret;
+ dout("request_next_osdmap have %u\n", monc->have_osdmap);
+ mutex_lock(&monc->mutex);
+ monc->want_next_osdmap = true;
+ __send_subscribe(monc);
+ mutex_unlock(&monc->mutex);
}
/*
* mount
*/
-static void request_mount(struct ceph_mon_client *monc, int newmon)
+static void __request_mount(struct ceph_mon_client *monc)
{
struct ceph_msg *msg;
struct ceph_client_mount *h;
int err;
- dout("request_mount\n");
- err = open_session(monc, newmon);
+ dout("__request_mount\n");
+ err = __open_session(monc, 1);
if (err)
return;
msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, sizeof(*h), 0, 0, NULL);
void ceph_monc_request_mount(struct ceph_mon_client *monc)
{
- mutex_lock(&monc->req_mutex);
- monc->mountreq.delay = BASE_DELAY_INTERVAL;
- request_mount(monc, 0);
- reschedule_timeout(&monc->mountreq);
- mutex_unlock(&monc->req_mutex);
+ mutex_lock(&monc->mutex); /* single monc->mutex replaces the former req_mutex */
+ __request_mount(monc);
+ __schedule_delayed(monc); /* arm the timer; delayed_work() re-requests the mount while want_mount is set */
+ mutex_unlock(&monc->mutex);
}
/*
return;
}
+ mutex_lock(&monc->mutex);
+
dout("handle_mount_ack\n");
p = msg->front.iov_base;
end = p + msg->front.iov_len;
client->signed_ticket = NULL;
client->signed_ticket_len = 0;
+ monc->want_mount = false;
+
client->whoami = cnum;
client->msgr->inst.name.num = cpu_to_le32(cnum);
client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
-
memcpy(&client->msgr->inst.addr, &addr, sizeof(addr));
-
pr_info("ceph client%d %u.%u.%u.%u:%u fsid %llx.%llx\n", client->whoami,
IPQUADPORT(addr.ipaddr),
le64_to_cpu(__ceph_fsid_major(&client->monc.monmap->fsid)),
le64_to_cpu(__ceph_fsid_minor(&client->monc.monmap->fsid)));
- ceph_debugfs_client_init(client);
- send_subscribe(monc);
+ ceph_debugfs_client_init(client);
+ __send_subscribe(monc);
err = 0;
goto out;
pr_err("ceph error decoding mount_ack message\n");
out:
client->mount_err = err;
- mutex_lock(&monc->req_mutex);
- cancel_timeout(&monc->mountreq);
- mutex_unlock(&monc->req_mutex);
+ mutex_unlock(&monc->mutex);
wake_up(&client->mount_wq);
}
tid = le64_to_cpu(reply->tid);
dout("handle_statfs_reply %p tid %llu\n", msg, tid);
- mutex_lock(&monc->statfs_mutex);
+ mutex_lock(&monc->mutex);
req = radix_tree_lookup(&monc->statfs_request_tree, tid);
if (req) {
*req->buf = reply->st;
req->result = 0;
}
- mutex_unlock(&monc->statfs_mutex);
+ mutex_unlock(&monc->mutex);
if (req)
complete(&req->completion);
return;
int err;
dout("send_statfs tid %llu\n", req->tid);
- err = open_session(monc, 0);
+ err = __open_session(monc, 0);
if (err)
return err;
msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL);
init_completion(&req.completion);
/* register request */
- mutex_lock(&monc->statfs_mutex);
+ mutex_lock(&monc->mutex);
req.tid = ++monc->last_tid;
req.last_attempt = jiffies;
req.delay = BASE_DELAY_INTERVAL;
if (radix_tree_insert(&monc->statfs_request_tree, req.tid, &req) < 0) {
- mutex_unlock(&monc->statfs_mutex);
+ mutex_unlock(&monc->mutex);
pr_err("ceph ENOMEM in do_statfs\n");
return -ENOMEM;
}
monc->num_statfs_requests++;
ceph_msgpool_resv(&monc->client->msgpool_statfs_reply, 1);
- mutex_unlock(&monc->statfs_mutex);
+ mutex_unlock(&monc->mutex);
/* send request and wait */
err = send_statfs(monc, &req);
if (!err)
err = wait_for_completion_interruptible(&req.completion);
- mutex_lock(&monc->statfs_mutex);
+ mutex_lock(&monc->mutex);
radix_tree_delete(&monc->statfs_request_tree, req.tid);
monc->num_statfs_requests--;
ceph_msgpool_resv(&monc->client->msgpool_statfs_reply, -1);
- mutex_unlock(&monc->statfs_mutex);
+ mutex_unlock(&monc->mutex);
if (!err)
err = req.result;
int did = 0;
struct ceph_mon_statfs_request *req;
- mutex_lock(&monc->statfs_mutex);
+ mutex_lock(&monc->mutex);
while (1) {
got = radix_tree_gang_lookup(&monc->statfs_request_tree,
(void **)&req,
send_statfs(monc, req);
}
- mutex_unlock(&monc->statfs_mutex);
+ mutex_unlock(&monc->mutex);
}
+/*
+ * Periodic work item for the monitor client.  While the mount is
+ * still pending, issue the mount request again.  Once mounted, send a
+ * keepalive on the monitor connection and give __send_subscribe() a
+ * chance to renew or retry the subscription.  In both cases the timer
+ * is re-armed before monc->mutex is released.
+ */
+static void delayed_work(struct work_struct *work)
+{
+	struct ceph_mon_client *monc;
+
+	monc = container_of(work, struct ceph_mon_client, delayed_work.work);
+	dout("monc delayed_work\n");
+
+	mutex_lock(&monc->mutex);
+	if (!monc->want_mount) {
+		ceph_con_keepalive(monc->con);
+		__send_subscribe(monc);
+	} else {
+		__request_mount(monc);
+	}
+	__schedule_delayed(monc);
+	mutex_unlock(&monc->mutex);
+}
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
memset(monc, 0, sizeof(*monc));
monc->client = cl;
monc->monmap = NULL;
- mutex_init(&monc->statfs_mutex);
+ mutex_init(&monc->mutex); /* one mutex now stands in for both statfs_mutex and req_mutex */
+ monc->con = NULL;
+
+ INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); /* single timer replaces the per-request osdreq/mountreq timers */
INIT_RADIX_TREE(&monc->statfs_request_tree, GFP_NOFS);
monc->num_statfs_requests = 0;
monc->last_tid = 0;
- init_request_type(monc, &monc->osdreq, request_osdmap);
- init_request_type(monc, &monc->mountreq, request_mount);
- mutex_init(&monc->req_mutex);
+
monc->have_mdsmap = 0;
- monc->want_osdmap = 0;
- monc->con = NULL;
+ monc->have_osdmap = 0;
+ monc->want_next_osdmap = true; /* a subscribe message built now would include the osdmap item */
+ monc->want_mount = true; /* delayed_work() keeps requesting the mount until the mount-ack path clears this */
return 0;
}
void ceph_monc_stop(struct ceph_mon_client *monc)
{
dout("stop\n");
- cancel_timeout(&monc->osdreq);
- cancel_timeout(&monc->mountreq);
+ cancel_delayed_work_sync(&monc->delayed_work);
if (monc->con) {
monc->con->ops->put(monc->con);
monc->con = NULL;
struct ceph_mon_client *monc = con->private;
dout("mon_reset\n");
- if (open_session(monc, 1) < 0)
+ if (__open_session(monc, 1) < 0)
return; /* delayed work handler will retry */
+ __send_subscribe(monc);
resend_statfs(monc);
}