__le64 size;
struct ceph_timeval mtime, atime;
} __attribute__ ((packed));
+/* followed by encoded string */
}
-int parse_open_reply(struct ceph_msg *reply, struct inode *inode)
+int parse_open_reply(struct ceph_msg *reply, struct inode *inode, struct ceph_mds_session *session)
{
struct ceph_mds_reply_head *head;
struct ceph_mds_reply_info rinfo;
- int frommds = reply->hdr.src.name.num;
+ int frommds = session->s_mds;
int err;
struct ceph_inode_cap *cap;
return err;
/* fill in cap */
- cap = ceph_add_cap(inode, frommds,
+ cap = ceph_add_cap(inode, session,
le32_to_cpu(head->file_caps),
le32_to_cpu(head->file_caps_seq));
if (IS_ERR(cap))
struct ceph_msg *req = 0;
struct ceph_mds_request_head *reqhead;
struct ceph_mds_reply_info rinfo;
+ struct ceph_mds_session *session;
int frommds;
int err;
struct ceph_inode_cap *cap;
reqhead = req->front.iov_base;
reqhead->args.open.flags = O_DIRECTORY;
reqhead->args.open.mode = 0;
- if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0)
+ if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, &session)) < 0)
return err;
err = le32_to_cpu(rinfo.head->result);
- if (err != 0)
- return err;
+ if (err != 0)
+ goto out;
if (rinfo.trace_nr == 0) {
dout(10, "open_root_inode wtf, mds returns 0 but no trace\n");
err = -EINVAL;
/* create root inode */
inode = iget_locked(client->sb, rinfo.trace_in[rinfo.trace_nr-1].in->ino);
- if (inode == NULL)
- return -ENOMEM;
+ if (inode == NULL) {
+ err = -ENOMEM;
+ goto out;
+ }
if (inode->i_state & I_NEW)
unlock_new_inode(inode);
if ((err = ceph_fill_inode(inode, rinfo.trace_in[rinfo.trace_nr-1].in)) < 0)
- goto out;
+ goto out2;
/* fill in cap */
frommds = rinfo.reply->hdr.src.name.num;
- cap = ceph_add_cap(inode, frommds,
+ cap = ceph_add_cap(inode, session,
le32_to_cpu(rinfo.head->file_caps),
le32_to_cpu(rinfo.head->file_caps_seq));
if (IS_ERR(cap)) {
err = PTR_ERR(cap);
- goto out;
+ goto out2;
}
ci = ceph_inode(inode);
ci->i_nr_by_mode[FILE_MODE_PIN]++;
if (root == NULL) {
err = -ENOMEM;
/* fixme: also close? */
- goto out;
+ goto out2;
}
client->sb->s_root = root;
- dout(30, "open_root_inode success.\n");
+ dout(30, "open_root_inode success, root d is %p.\n", root);
return 0;
-out:
+out2:
dout(30, "open_root_inode failure %d\n", err);
iput(inode);
+out:
+ ceph_mdsc_put_session(session);
return err;
}
return PTR_ERR(req);
rhead = req->front.iov_base;
rhead->args.readdir.frag = cpu_to_le32(frag);
- if ((err = ceph_mdsc_do_request(mdsc, req, &fi->rinfo, -1)) < 0)
+ if ((err = ceph_mdsc_do_request(mdsc, req, &fi->rinfo, 0)) < 0)
return err;
err = le32_to_cpu(fi->rinfo.head->result);
dout(10, "dir_readdir got and parsed readdir result=%d on frag %u\n", err, frag);
kfree(path);
if (IS_ERR(req))
return ERR_PTR(PTR_ERR(req));
- if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0)
+ if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0)
return ERR_PTR(err);
err = le32_to_cpu(rinfo.head->result);
dout(20, "dir_lookup result=%d\n", err);
rhead = req->front.iov_base;
rhead->args.mknod.mode = cpu_to_le32(mode);
rhead->args.mknod.rdev = cpu_to_le32(rdev);
- if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0) {
+ if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0) {
d_drop(dentry);
return err;
}
d_drop(dentry);
return PTR_ERR(req);
}
- if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0) {
+ if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0) {
d_drop(dentry);
return err;
}
}
rhead = req->front.iov_base;
rhead->args.mkdir.mode = cpu_to_le32(mode);
- if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0) {
+ if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0) {
d_drop(dentry);
return err;
}
kfree(path);
if (IS_ERR(req))
return PTR_ERR(req);
- if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0)
+ if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0)
return err;
err = le32_to_cpu(rinfo.head->result);
kfree(newpath);
if (IS_ERR(req))
return PTR_ERR(req);
- if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0)
+ if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0)
return err;
err = le32_to_cpu(rinfo.head->result);
struct ceph_msg *req;
struct ceph_mds_request_head *rhead;
struct ceph_mds_reply_info rinfo;
+ struct ceph_mds_session *session;
struct dentry *dentry;
int frommds;
struct ceph_inode_cap *cap;
return ERR_PTR(PTR_ERR(req));
rhead = req->front.iov_base;
rhead->args.open.flags = cpu_to_le32(flags);
- if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0)
+ if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, &session)) < 0)
return ERR_PTR(err);
dout(10, "open got and parsed result\n");
frommds = rinfo.reply->hdr.src.name.num;
- cap = ceph_add_cap(inode, frommds,
+ cap = ceph_add_cap(inode, session,
le32_to_cpu(rinfo.head->file_caps),
le32_to_cpu(rinfo.head->file_caps_seq));
+ ceph_mdsc_put_session(session);
return cap;
}
}
-struct ceph_inode_cap *ceph_add_cap(struct inode *inode, int mds, u32 cap, u32 seq)
+struct ceph_inode_cap *ceph_add_cap(struct inode *inode, struct ceph_mds_session *session, u32 cap, u32 seq)
{
+ int mds = session->s_mds;
struct ceph_inode_info *ci = ceph_inode(inode);
int i;
kfree(o);
ci->i_max_caps *= 2;
}
-
+ ci->i_caps[i].ci = ci;
ci->i_caps[i].caps = 0;
ci->i_caps[i].mds = mds;
ci->i_caps[i].seq = 0;
ci->i_caps[i].flags = 0;
ci->i_nr_caps++;
+
+ ci->i_caps[i].session = session;
+ spin_lock(&session->s_cap_lock);
+ list_add(&ci->i_caps[i].session_caps, &session->s_caps);
+ session->s_nr_caps++;
+ spin_unlock(&session->s_cap_lock);
}
dout(10, "add_cap inode %p (%lu) got cap %d %xh now %xh seq %d from %d\n",
return have;
}
+void __remove_cap(struct ceph_inode_cap *cap)
+{
+ /* remove from session list */
+ struct ceph_mds_session *session = cap->session;
+ spin_lock(&session->s_cap_lock);
+ list_del(&cap->session_caps);
+ session->s_nr_caps--;
+ spin_unlock(&session->s_cap_lock);
+ cap->session = 0;
+}
+
void ceph_remove_caps(struct ceph_inode_info *ci)
{
- dout(10, "remove_caps on %p nr %d i_caps %p\n", &ci->vfs_inode,
- ci->i_nr_caps, ci->i_caps);
+ int i;
+ dout(10, "remove_caps on %p nr %d\n", &ci->vfs_inode, ci->i_nr_caps);
if (ci->i_nr_caps) {
+ for (i=0; i<ci->i_nr_caps; i++)
+ __remove_cap(&ci->i_caps[i]);
iput(&ci->vfs_inode);
ci->i_nr_caps = 0;
if (ci->i_caps != ci->i_caps_static) {
dout(10, "2\n");
if (!cap) {
dout(10, "adding new cap inode %p for mds%d\n", inode, mds);
- cap = ceph_add_cap(inode, mds, le32_to_cpu(grant->caps), le32_to_cpu(grant->seq));
+ cap = ceph_add_cap(inode, session, le32_to_cpu(grant->caps), le32_to_cpu(grant->seq));
return 0;
}
s->s_mds = mds;
s->s_state = CEPH_MDS_SESSION_NEW;
s->s_cap_seq = 0;
+ spin_lock_init(&s->s_cap_lock);
INIT_LIST_HEAD(&s->s_caps);
s->s_nr_caps = 0;
atomic_set(&s->s_ref, 1);
return session;
}
-static void put_session(struct ceph_mds_session *s)
-{
- if (atomic_dec_and_test(&s->s_ref))
- kfree(s);
-}
-
static void unregister_session(struct ceph_mds_client *mdsc, int mds)
{
dout(10, "unregister_session mds%d %p\n", mds, mdsc->sessions[mds]);
- put_session(mdsc->sessions[mds]);
+ ceph_mdsc_put_session(mdsc->sessions[mds]);
mdsc->sessions[mds] = 0;
}
session = get_session(mdsc, from);
session->s_state = CEPH_MDS_SESSION_OPEN;
complete(&session->s_completion);
- put_session(session);
+ ceph_mdsc_put_session(session);
break;
case CEPH_SESSION_CLOSE:
dout(1, "ignoring session close from mds%d, seq %llu < my seq %llu\n",
msg->hdr.src.name.num, seq, session->s_cap_seq);
}
- put_session(session);
+ ceph_mdsc_put_session(session);
break;
default:
}
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg,
- struct ceph_mds_reply_info *rinfo, int mds)
+ struct ceph_mds_reply_info *rinfo, struct ceph_mds_session **psession)
{
struct ceph_mds_request *req;
struct ceph_mds_request_head *rhead;
struct ceph_mds_session *session;
struct ceph_msg *reply = 0;
int err;
+ int mds = -1;
dout(30, "do_request on %p type %d\n", msg, msg->hdr.type);
}
if (session->s_state != CEPH_MDS_SESSION_OPEN) {
dout(30, "do_request session %p not open, %d\n", session, session->s_state);
- put_session(session);
+ ceph_mdsc_put_session(session);
goto retry;
}
- put_session(session);
/* make request? */
BUG_ON(req->r_num_mds >= 2);
spin_lock(&mdsc->lock);
/* clean up request, parse reply */
- if (!req->r_reply)
+ if (!req->r_reply) {
+ ceph_mdsc_put_session(session);
goto retry;
+ }
reply = req->r_reply;
unregister_request(mdsc, req);
spin_unlock(&mdsc->lock);
put_request(req);
- if ((err = ceph_mdsc_parse_reply_info(reply, rinfo)) < 0)
+ if ((err = ceph_mdsc_parse_reply_info(reply, rinfo)) < 0) {
+ ceph_mdsc_put_session(session);
return err;
+ }
dout(30, "do_request done on %p result %d tracelen %d\n", msg,
rinfo->head->result, rinfo->trace_nr);
+ if (psession)
+ *psession = session;
+ else
+ ceph_mdsc_put_session(session);
return 0;
}
char *path;
int pathlen, err;
struct dentry *dentry;
+ struct ceph_inode_info *ci;
dout(10, "send_mds_reconnect mds%d\n", mds);
dout(10, "session %p state %d\n", session, session->s_state);
/* estimate needed space */
- len += session->s_nr_caps * sizeof(struct ceph_mds_cap_reconnect);
- len += session->s_nr_caps * (100); /* ugly hack */
+ len += (session->s_nr_caps+1) * sizeof(struct ceph_mds_cap_reconnect);
+ len += (session->s_nr_caps+1) * (100); /* ugly hack */
dout(40, "estimating i need %d bytes for %d caps\n", len, session->s_nr_caps);
/* build reply */
return;
p = reply->front.iov_base;
end = p + len;
-
+
/* traverse this session's caps */
ceph_encode_8(&p, end, 0);
+ dout(10, "locking\n");
+ spin_lock(&session->s_cap_lock);
+ dout(10, "locked\n");
ceph_encode_32(&p, end, session->s_nr_caps);
+ dout(10, "iterating %p %p\n", session->s_caps.next, session->s_caps.prev);
list_for_each(cp, &session->s_caps) {
+ dout(10, "cp is %p .. %p %p\n", cp, cp->next, cp->prev);
cap = list_entry(cp, struct ceph_inode_cap, session_caps);
- dout(10, " adding cap %p on ino %lx\n", cap, cap->ci->vfs_inode.i_ino);
- ceph_encode_32(&p, end, ceph_caps_wanted(cap->ci));
- ceph_encode_32(&p, end, ceph_caps_issued(cap->ci));
- ceph_encode_64(&p, end, cap->ci->i_wr_size);
- ceph_encode_timespec(&p, end, &cap->ci->vfs_inode.i_mtime); //i_wr_mtime
- ceph_encode_timespec(&p, end, &cap->ci->vfs_inode.i_atime); /* atime.. fixme */
- dentry = list_entry(&cap->ci->vfs_inode.i_dentry, struct dentry, d_alias);
+ ci = cap->ci;
+ dout(10, "cap is %p, ci is %p, inode is %p\n", cap, ci, &ci->vfs_inode);
+ dout(10, " adding cap %p on ino %lx\n", cap, ci->vfs_inode.i_ino);
+ ceph_encode_64(&p, end, ci->vfs_inode.i_ino);
+ ceph_encode_32(&p, end, ceph_caps_wanted(ci));
+ ceph_encode_32(&p, end, ceph_caps_issued(ci));
+ ceph_encode_64(&p, end, ci->i_wr_size);
+ ceph_encode_timespec(&p, end, &ci->vfs_inode.i_mtime); //i_wr_mtime
+ ceph_encode_timespec(&p, end, &ci->vfs_inode.i_atime); /* atime.. fixme */
+ dentry = d_find_alias(&ci->vfs_inode);
path = ceph_build_dentry_path(dentry, &pathlen);
if (IS_ERR(path)) {
err = PTR_ERR(path);
ceph_encode_string(&p, end, path, pathlen);
kfree(path);
}
+ dout(10, "unlocking\n");
+ spin_unlock(&session->s_cap_lock);
len = p - reply->front.iov_base;
reply->hdr.front_len = reply->front.iov_len = len;
int s_mds;
int s_state;
__u64 s_cap_seq; /* cap message count/seq from mds */
+ spinlock_t s_cap_lock;
struct list_head s_caps;
int s_nr_caps;
atomic_t s_ref;
extern struct ceph_msg *ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, ceph_ino_t ino1, const char *path1, ceph_ino_t ino2, const char *path2);
extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg,
- struct ceph_mds_reply_info *rinfo, int mds);
+ struct ceph_mds_reply_info *rinfo, struct ceph_mds_session **psession);
+
+static __inline__ void ceph_mdsc_put_session(struct ceph_mds_session *s)
+{
+ if (atomic_dec_and_test(&s->s_ref))
+ kfree(s);
+}
+
extern int ceph_mdsc_parse_reply_info(struct ceph_msg *msg, struct ceph_mds_reply_info *info);
extern void ceph_mdsc_destroy_reply_info(struct ceph_mds_reply_info *info);
static __inline__ int ceph_encode_string(void **p, void *end, const char *s, __u32 len)
{
- BUG_ON(*p + sizeof(len) > end);
+ BUG_ON(*p + sizeof(len) + len > end);
ceph_encode_32(p, end, len);
if (len) memcpy(*p, s, len);
*p += len;
u64 seq;
int flags; /* stale, etc.? */
struct ceph_inode_info *ci;
+ struct ceph_mds_session *session;
struct list_head session_caps; /* per-session caplist */
};
/* inode.c */
extern int ceph_fill_inode(struct inode *inode, struct ceph_mds_reply_inode *info);
extern struct ceph_inode_cap *ceph_find_cap(struct inode *inode, int want);
-extern struct ceph_inode_cap *ceph_add_cap(struct inode *inode, int mds, u32 cap, u32 seq);
+extern struct ceph_inode_cap *ceph_add_cap(struct inode *inode, struct ceph_mds_session *session, u32 cap, u32 seq);
extern void ceph_remove_caps(struct ceph_inode_info *ci);
extern int ceph_handle_cap_grant(struct inode *inode, struct ceph_mds_file_caps *grant, struct ceph_mds_session *session);
void terminate_sessions();
void reconnect_clients();
void handle_client_reconnect(class MClientReconnect *m);
- void process_reconnect_cap(CInode *in, int from, inode_caps_reconnect_t& capinfo);
+ void process_reconnect_cap(CInode *in, int from, ceph_mds_cap_reconnect& capinfo);
void add_reconnected_cap_inode(CInode *in) {
reconnected_caps.insert(in);
}