dout(30, "wait_for_new_map exit\n");
}
-static int open_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, int mds)
+static int open_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *session)
{
struct ceph_msg *msg;
int mstate;
+ int mds = session->s_mds;
/* mds active? */
mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
return 0;
}
+static int resume_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *session)
+{
+ int mds = session->s_mds;
+ struct ceph_msg *msg;
+ struct list_head *cp;
+ struct ceph_inode_cap *cap;
+
+ dout(10, "resume_session to mds%d\n", mds);
+
+ /* note cap staleness */
+ spin_lock(&session->s_cap_lock);
+ list_for_each(cp, &session->s_caps) {
+ cap = list_entry(cp, struct ceph_inode_cap, session_caps);
+ cap->caps = 0;
+ }
+ spin_unlock(&session->s_cap_lock);
+
+ /* send request_resume message */
+ msg = create_session_msg(CEPH_SESSION_REQUEST_RESUME, session->s_cap_seq);
+ if (IS_ERR(msg))
+ return PTR_ERR(msg); /* fixme */
+ //session->s_state = CEPH_MDS_SESSION_OPENING;
+ send_msg_mds(mdsc, msg, mds);
+ return 0;
+}
+
void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
__u32 op;
__u64 seq;
struct ceph_mds_session *session;
- int from = msg->hdr.src.name.num;
+ int mds = msg->hdr.src.name.num;
struct ceph_mds_session_head *h = msg->front.iov_base;
if (msg->front.iov_len != sizeof(*h))
seq = le64_to_cpu(h->seq);
/* handle */
+ session = get_session(mdsc, mds);
dout(1, "handle_session op %d seq %llu\n", op, seq);
spin_lock(&mdsc->lock);
switch (op) {
case CEPH_SESSION_OPEN:
- dout(1, "session open from mds%d\n", from);
- session = get_session(mdsc, from);
+ dout(1, "session open from mds%d\n", mds);
session->s_state = CEPH_MDS_SESSION_OPEN;
complete(&session->s_completion);
ceph_mdsc_put_session(session);
break;
case CEPH_SESSION_CLOSE:
- session = get_session(mdsc, from);
if (session->s_cap_seq == seq) {
- dout(1, "session close from mds%d\n", from);
+ dout(1, "session close from mds%d\n", mds);
complete(&session->s_completion); /* for good measure */
- unregister_session(mdsc, from);
+ unregister_session(mdsc, mds);
} else {
dout(1, "ignoring session close from mds%d, seq %llu < my seq %llu\n",
msg->hdr.src.name.num, seq, session->s_cap_seq);
ceph_mdsc_put_session(session);
break;
+ case CEPH_SESSION_RENEWCAPS:
+ dout(10, "session renewed caps from mds%d\n", mds);
+ break;
+
+ case CEPH_SESSION_STALE:
+ dout(10, "session stale from mds%d\n", mds);
+ resume_session(mdsc, session);
+ break;
+
+ case CEPH_SESSION_RESUME:
+ dout(10, "session resumed by mds%d\n", mds);
+ break;
+
default:
dout(0, "bad session op %d\n", op);
BUG_ON(1);
/* exported functions */
+void schedule_delayed(struct ceph_mds_client *mdsc)
+{
+ schedule_delayed_work(&mdsc->delayed_work, HZ*60);
+}
+
+void delayed_work(struct work_struct *work);
+
void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
{
spin_lock_init(&mdsc->lock);
INIT_RADIX_TREE(&mdsc->request_tree, GFP_KERNEL);
mdsc->last_requested_map = 0;
init_completion(&mdsc->map_waiters);
+ INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
+
+ /* hack fixme */
+ schedule_delayed(mdsc);
+}
+
+void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
+{
+ /* close sessions, caps */
+ /* IMPLEMENT ME */
+ cancel_delayed_work_sync(&mdsc->delayed_work);
}
/* open? */
if (session->s_state == CEPH_MDS_SESSION_NEW ||
session->s_state == CEPH_MDS_SESSION_CLOSING) {
- err = open_session(mdsc, session, mds);
+ err = open_session(mdsc, session);
BUG_ON(err && err != -EAGAIN);
}
if (session->s_state != CEPH_MDS_SESSION_OPEN) {
return 0;
}
+int send_renewcaps(struct ceph_mds_client *mdsc, int mds)
+{
+ struct ceph_msg *msg;
+
+ dout(10, "send_renew_caps to mds%d\n", mds);
+
+ msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 0);
+ if (IS_ERR(msg))
+ return PTR_ERR(msg);
+ send_msg_mds(mdsc, msg, mds);
+ return 0;
+}
+
+int ceph_mdsc_renew_caps(struct ceph_mds_client *mdsc)
+{
+ int i;
+ dout(10, "renew_caps\n");
+ for (i=0; i<mdsc->max_sessions; i++) {
+ if (mdsc->sessions[i] == 0 ||
+ mdsc->sessions[i]->s_state < CEPH_MDS_SESSION_OPEN)
+ continue;
+ send_renewcaps(mdsc, i);
+ }
+ return 0;
+}
+void delayed_work(struct work_struct *work)
+{
+ int i;
+ struct ceph_mds_client *mdsc = container_of(work, struct ceph_mds_client, delayed_work.work);
+ dout(10, "delayed_work on %p\n", mdsc);
+
+ /* renew caps */
+ for (i=0; i<mdsc->max_sessions; i++) {
+ if (mdsc->sessions[i] == 0 ||
+ mdsc->sessions[i]->s_state < CEPH_MDS_SESSION_OPEN)
+ continue;
+ send_renewcaps(mdsc, i);
+ }
+
+ schedule_delayed(mdsc);
+}
/* eof */