From dc9766f9fada7afe0fe06fad3f20d7613ed75fea Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 9 Jan 2008 17:15:37 -0800 Subject: [PATCH] new MClientReconnect encoding, basic support in kernel client --- src/TODO | 1 + src/include/ceph_fs.h | 6 +++ src/kernel/dir.c | 52 +++++++++++++++++++ src/kernel/inode.c | 2 +- src/kernel/mds_client.c | 92 +++++++++++++++++++++++++++++++-- src/kernel/mds_client.h | 2 + src/kernel/messenger.h | 36 +++++++++++-- src/kernel/super.h | 7 ++- src/messages/MClientReconnect.h | 27 +++++++--- 9 files changed, 209 insertions(+), 16 deletions(-) diff --git a/src/TODO b/src/TODO index 75e6b84b02ee8..5f96603180a03 100644 --- a/src/TODO +++ b/src/TODO @@ -25,6 +25,7 @@ kernel client - mechanism to close out old caps - handle file caps, ack back to mds, etc. - actually flush dirty data, too + - integrate mds reply trace into cache - osd client - readpages (readahead) - async (caching) mode diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index f2155822ed0aa..bbcf6bb70a685 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -388,6 +388,12 @@ struct ceph_mds_reply_dirfrag { __u32 dist[]; } __attribute__ ((packed)); +struct ceph_mds_cap_reconnect { + __le32 wanted; + __le32 issued; + __le64 size; + struct ceph_timeval mtime, atime; +} __attribute__ ((packed)); diff --git a/src/kernel/dir.c b/src/kernel/dir.c index 7c5e5f803b140..61349cd3502fd 100644 --- a/src/kernel/dir.c +++ b/src/kernel/dir.c @@ -29,6 +29,58 @@ int ceph_get_dentry_path(struct dentry *dn, char *buf, struct dentry *base) return len; } +int ceph_build_dentry_path(struct dentry *dentry, char **path, int *len) +{ + struct dentry *temp; + + if (dentry == NULL) + return -EINVAL; /* not much we can do if dentry is freed and + we need to reopen the file after it was closed implicitly + when the server crashed */ + +retry: + *len = 0; + for (temp = dentry; !IS_ROOT(temp);) { + *len += (1 + temp->d_name.len); + temp = temp->d_parent; + if (temp == NULL) { + derr(1, "corrupt dentry"); + return -EINVAL; + } + } + + *path = kmalloc(*len+1, GFP_KERNEL); + if (*path == NULL) + return -ENOMEM; + (*path)[*len] = 0; /* trailing null */ + for (temp = dentry; !IS_ROOT(temp);) { + *len -= 1 + temp->d_name.len; + if (*len < 0) { + break; + } else { + (*path)[*len] = '/'; + strncpy(*path + *len + 1, temp->d_name.name, + temp->d_name.len); + dout(0, "name: %s", *path + *len); + } + temp = temp->d_parent; + if (temp == NULL) { + dout(1, "corrupt dentry"); + kfree(*path); + return -EINVAL; + } + } + if (*len != 0) { + derr(1, "did not end path lookup where expected namelen is %d", *len); + /* presumably this is only possible if racing with a rename + of one of the parent directories (we can not lock the dentries + above us to prevent this, but retrying should be harmless) */ + kfree(*path); + goto retry; + } + return 0; +} + /* * build fpos from fragment id and offset within that fragment. diff --git a/src/kernel/inode.c b/src/kernel/inode.c index 105f1c727cb0a..03745704d362e 100644 --- a/src/kernel/inode.c +++ b/src/kernel/inode.c @@ -54,7 +54,7 @@ int ceph_fill_inode(struct inode *inode, struct ceph_mds_reply_inode *info) ci->i_wr_size = 0; ci->i_wr_mtime.tv_sec = 0; - ci->i_wr_mtime.tv_usec = 0; + ci->i_wr_mtime.tv_nsec = 0; inode->i_mapping->a_ops = &ceph_aops; diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c index 2b09b835af7f1..4219c02bc6c10 100644 --- a/src/kernel/mds_client.c +++ b/src/kernel/mds_client.c @@ -110,6 +110,8 @@ static void register_session(struct ceph_mds_client *mdsc, int mds) s = kmalloc(sizeof(struct ceph_mds_session), GFP_KERNEL); s->s_state = CEPH_MDS_SESSION_NEW; s->s_cap_seq = 0; + INIT_LIST_HEAD(&s->s_caps); + s->s_nr_caps = 0; atomic_set(&s->s_ref, 1); init_completion(&s->s_completion); mdsc->sessions[mds] = s; @@ -675,6 +677,81 @@ void kick_requests(struct ceph_mds_client *mdsc, int m) } } +/* + * send an MClientReconnect to a recovering mds + */ +void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) +{ + struct ceph_mds_session *session; + struct ceph_msg *reply; + int len = 4 + 1; + void *p, *end; + struct list_head *cp; + struct ceph_inode_cap *cap; + char *path; + int pathlen, err; + struct dentry *dentry; + + dout(10, "send_mds_reconnect mds%d\n", mds); + + /* find session */ + session = get_session(mdsc, mds); + if (!session) { + dout(20, "no session for mds%d, sending short reconnect\n", mds); + reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, 0); + if (IS_ERR(reply)) + return; + *(__u32*)reply->front.iov_base = 0; + *(__u8*)(reply->front.iov_base + 4) = 1; /* session was closed */ + goto send; + } + + /* estimate needed space */ + len += session->s_nr_caps * sizeof(struct ceph_mds_cap_reconnect); + len += session->s_nr_caps * (100); /* ugly hack */ + + /* build reply */ + reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, 0); + if (IS_ERR(reply)) + return; + p = reply->front.iov_base; + end = p + len; + + /* traverse this session's caps */ + ceph_encode_32(&p, end, session->s_nr_caps); + list_for_each(cp, &session->s_caps) { + cap = list_entry(cp, struct ceph_inode_cap, session_caps); + ceph_encode_32(&p, end, cap->ci->i_cap_wanted); + ceph_encode_32(&p, end, cap->ci->i_cap_issued); + ceph_encode_64(&p, end, cap->ci->i_wr_size); + ceph_encode_timespec(&p, end, &cap->ci->vfs_inode.i_mtime); //i_wr_mtime + ceph_encode_timespec(&p, end, &cap->ci->vfs_inode.i_atime); /* atime.. fixme */ + dentry = list_entry(&cap->ci->vfs_inode.i_dentry, struct dentry, d_alias); + err = ceph_build_dentry_path(dentry, &path, &pathlen); + BUG_ON(err); + if (p + pathlen + 4 + sizeof(struct ceph_mds_cap_reconnect) > end) { + /* der, realloc front */ + int off = end-p; + void *t; + dout(30, "blech, reallocating larger buffer, from %d\n", off); + t = kmalloc(off*2, GFP_KERNEL); + memcpy(t, reply->front.iov_base, off); + kfree(reply->front.iov_base); + reply->front.iov_base = t; + reply->front.iov_len = off*2; + p = t + off; + end = p + off*2; + } + ceph_encode_string(&p, end, path, pathlen); + kfree(path); + } + ceph_encode_8(&p, end, 0); + reply->front.iov_len = end-p; + +send: + send_msg_mds(mdsc, reply, mds); +} + /* * compare old and new mdsmaps, kicking requests * and closing out old connections as necessary @@ -695,12 +772,12 @@ void check_new_map(struct ceph_mds_client *mdsc, continue; oldstate = ceph_mdsmap_get_state(oldmap, i); newstate = ceph_mdsmap_get_state(newmap, i); + + dout(20, "check_new_map mds%d state %d -> %d\n", + i, oldstate, newstate); if (newstate >= oldstate) continue; /* no problem */ - dout(20, "check_new_map mds%d state %d -> %d\n", - i, oldstate, newstate); - /* notify messenger */ ceph_messenger_mark_down(mdsc->client->msgr, &oldmap->m_addr[i]); @@ -726,6 +803,8 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) void *p = msg->front.iov_base; void *end = p + msg->front.iov_len; struct ceph_mdsmap *newmap, *oldmap; + int from = msg->hdr.src.name.num; + int newstate; if ((err = ceph_decode_32(&p, end, &epoch)) != 0) goto bad; @@ -759,6 +838,13 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) if (oldmap) { check_new_map(mdsc, newmap, oldmap); ceph_mdsmap_destroy(oldmap); + + /* reconnect? */ + if (from < newmap->m_max_mds) { + newstate = ceph_mdsmap_get_state(newmap, from); + if (newstate == CEPH_MDS_STATE_RECONNECT) + send_mds_reconnect(mdsc, from); + } } } else { dout(2, "ceph_mdsc_handle_map lost decode race?\n"); diff --git a/src/kernel/mds_client.h b/src/kernel/mds_client.h index d6ae6334bb5d7..5afc22763336f 100644 --- a/src/kernel/mds_client.h +++ b/src/kernel/mds_client.h @@ -23,6 +23,8 @@ enum { struct ceph_mds_session { int s_state; __u64 s_cap_seq; /* cap message count/seq from mds */ + struct list_head s_caps; + int s_nr_caps; atomic_t s_ref; struct completion s_completion; }; diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index a5ef7b6538998..bc69dff246d1d 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -232,11 +232,25 @@ static __inline__ int ceph_encode_64(void **p, void *end, __u64 v) { static __inline__ int ceph_encode_32(void **p, void *end, __u32 v) { BUG_ON(*p + sizeof(v) > end); - *(__u32*)*p = cpu_to_le64(v); + *(__u32*)*p = cpu_to_le32(v); *p += sizeof(v); return 0; } +static __inline__ int ceph_encode_16(void **p, void *end, __u16 v) { + BUG_ON(*p + sizeof(v) > end); + *(__u16*)*p = cpu_to_le16(v); + *p += sizeof(v); + return 0; +} + +static __inline__ int ceph_encode_8(void **p, void *end, __u8 v) { + BUG_ON(*p < end); + *(__u8*)*p = v; + (*p)++; + return 0; +} + static __inline__ int ceph_encode_filepath(void **p, void *end, ceph_ino_t ino, const char *path) { __u32 len = path ? strlen(path):0; @@ -248,13 +262,27 @@ static __inline__ int ceph_encode_filepath(void **p, void *end, ceph_ino_t ino, return 0; } -static void __inline__ ceph_decode_timespec(struct timespec *ts, struct ceph_timeval *tv) +static __inline__ int ceph_encode_string(void **p, void *end, const char *s, __u32 len) +{ + BUG_ON(*p + sizeof(len) > end); + ceph_encode_32(p, end, len); + if (len) memcpy(*p, s, len); + *p += len; + return 0; +} + +static __inline__ void ceph_decode_timespec(struct timespec *ts, struct ceph_timeval *tv) { ts->tv_sec = le32_to_cpu(tv->tv_sec); ts->tv_nsec = 1000*le32_to_cpu(tv->tv_usec); } - - +static __inline__ int ceph_encode_timespec(void **p, void *end, struct timespec *ts) +{ + BUG_ON(*p + sizeof(struct ceph_timeval) > end); + ceph_encode_32(p, end, ts->tv_sec); + ceph_encode_32(p, end, ts->tv_nsec/1000); + return 0; +} #endif diff --git a/src/kernel/super.h b/src/kernel/super.h index 91041316c3a8e..238aa5d1c67e6 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -57,6 +57,8 @@ struct ceph_inode_cap { int caps; u64 seq; int flags; /* stale, etc.? */ + struct ceph_inode_info *ci; + struct list_head session_caps; /* per-session caplist */ }; struct ceph_inode_frag_map_item { @@ -77,8 +79,10 @@ struct ceph_inode_info { struct ceph_inode_cap i_caps_static[STATIC_CAPS]; atomic_t i_cap_count; /* ref count (e.g. from file*) */ + int i_cap_wanted; + int i_cap_issued; loff_t i_wr_size; - struct ceph_timeval i_wr_mtime; + struct timespec i_wr_mtime; struct inode vfs_inode; /* at end */ }; @@ -148,6 +152,7 @@ extern int ceph_release(struct inode *inode, struct file *filp); extern const struct inode_operations ceph_dir_iops; extern const struct file_operations ceph_dir_fops; extern int ceph_get_dentry_path(struct dentry *dn, char *buf, struct dentry *base); /* move me */ +extern int ceph_build_dentry_path(struct dentry *dentry, char **path, int *len); #endif /* _FS_CEPH_CEPH_H */ diff --git a/src/messages/MClientReconnect.h b/src/messages/MClientReconnect.h index ebcd36f4ae0a7..32f9612a8714b 100644 --- a/src/messages/MClientReconnect.h +++ b/src/messages/MClientReconnect.h @@ -22,7 +22,7 @@ class MClientReconnect : public Message { public: map inode_caps; map inode_path; - bool closed; // true if this session was closed by the client. + __u8 closed; // true if this session was closed by the client. MClientReconnect() : Message(CEPH_MSG_CLIENT_RECONNECT), closed(false) { } @@ -42,15 +42,28 @@ public: } void encode_payload() { + __u32 n = inode_caps.size(); + ::_encode_simple(n, payload); + for (map::iterator p = inode_caps.begin(); + p != inode_caps.end(); + p++) { + ::_encode_simple(p->first, payload); + ::_encode_simple(p->second, payload); + ::_encode_simple(inode_path[p->first], payload); + } ::_encode(closed, payload); - ::_encode(inode_caps, payload); - ::_encode(inode_path, payload); } void decode_payload() { - int off = 0; - ::_decode(closed, payload, off); - ::_decode(inode_caps, payload, off); - ::_decode(inode_path, payload, off); + bufferlist::iterator p = payload.begin(); + __u32 n; + ::_decode_simple(n, p); + while (n--) { + inodeno_t ino; + ::_decode_simple(ino, p); + ::_decode_simple(inode_caps[ino], p); + ::_decode_simple(inode_path[ino], p); + } + ::_decode_simple(closed, p); } }; -- 2.39.5