obj-$(CONFIG_CEPH_FS) += ceph.o
-ceph-objs := inode.o bufferlist.o ktcp.o super.o messenger.o poll.o mds_client.o osd_client.o
+ceph-objs := super.o inode.o \
+ bufferlist.o ktcp.o messenger.o poll.o \
+ mds_client.o mdsmap.o \
+ mon_client.o monmap.o
struct ceph_bufferlist_iterator *bli,
int off)
{
-
+
}
int ceph_bl_decode_have(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, int s)
{
{
if (!ceph_bl_decode_have(bl, bli, sizeof(*v)))
return -EINVAL;
- *v = le64_to_cpu((__u64*)(bl->b_kv[bli->i_kv].iov_base + bli->i_off));
+ *v = le64_to_cpu(*(__u64*)(bl->b_kv[bli->i_kv].iov_base + bli->i_off));
ceph_bl_iterator_advance(bl, bli, sizeof(*v));
return 0;
}
{
if (!ceph_bl_decode_have(bl, bli, sizeof(*v)))
return -EINVAL;
- *v = le32_to_cpu((__u64*)(bl->b_kv[bli->i_kv].iov_base + bli->i_off));
+ *v = le32_to_cpu(*(__u32*)(bl->b_kv[bli->i_kv].iov_base + bli->i_off));
ceph_bl_iterator_advance(bl, bli, sizeof(*v));
return 0;
}
{
if (!ceph_bl_decode_have(bl, bli, sizeof(*v)))
return -EINVAL;
- *v = le16_to_cpu((__u64*)(bl->b_kv[bli->i_kv].iov_base + bli->i_off));
+ *v = le16_to_cpu(*(__u16*)(bl->b_kv[bli->i_kv].iov_base + bli->i_off));
ceph_bl_iterator_advance(bl, bli, sizeof(*v));
return 0;
}
ceph_bl_copy(bl, bli, v, sizeof(v));
return 0;
}
+
+
extern void ceph_bl_iterator_init(struct ceph_bufferlist_iterator *bli);
+extern int ceph_bl_copy(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, void *p, int len);
+
+extern void ceph_bl_iterator_advance(struct ceph_bufferlist *bl,
+ struct ceph_bufferlist_iterator *bli,
+ int off);
+extern int ceph_bl_decode_have(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, int s);
+
extern int ceph_bl_decode_64(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, __u64 *v);
extern int ceph_bl_decode_32(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, __u32 *v);
extern int ceph_bl_decode_16(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, __u16 *v);
extern int ceph_bl_decode_8(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, __u8 *v);
-extern int ceph_bl_copy(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, void *p, int len);
+
#endif
+#include <linux/ceph_fs.h>
#include "mds_client.h"
#include "mon_client.h"
#include "super.h"
}
}
-static void get_session(struct ceph_mds_session *s)
-{
- atomic_inc(&s->s_ref);
-}
-
-static void put_session(struct ceph_mds_session *s)
-{
- if (atomic_dec_and_test(&s->s_ref))
- kfree(s);
-}
/*
* register an in-flight request
atomic_set(&mdsc->sessions[mds]->s_ref, 1);
}
+/*
+ * Return the session for @mds with an extra reference taken.
+ *
+ * If @mds is at or beyond max_sessions, or its slot is empty,
+ * register_session() is called first and the slot is read afterwards.
+ */
+static struct ceph_mds_session *get_session(struct ceph_mds_client *mdsc, int mds)
+{
+	struct ceph_mds_session *s;
+
+	/* the range test must come first so the slot is only indexed when valid */
+	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
+		register_session(mdsc, mds);
+
+	s = mdsc->sessions[mds];
+	atomic_inc(&s->s_ref);
+	return s;
+}
+
+/*
+ * Drop one reference on @s; the session is kfree'd when the count hits zero.
+ */
+static void put_session(struct ceph_mds_session *s)
+{
+	if (!atomic_dec_and_test(&s->s_ref))
+		return;		/* other references remain */
+	kfree(s);
+}
+
+/*
+ * Release the reference held through mdsc->sessions[@mds] and empty the slot.
+ */
+static void unregister_session(struct ceph_mds_client *mdsc, int mds)
+{
+	struct ceph_mds_session **slot = &mdsc->sessions[mds];
+
+	put_session(*slot);
+	*slot = NULL;
+}
+
+/*
+ * Build a CEPH_MSG_CLIENT_SESSION message.
+ *
+ * The payload is @op as a little-endian 32-bit value followed by @seq as a
+ * little-endian 64-bit value.
+ *
+ * Returns the new message, or the ERR_PTR handed back by ceph_new_message()
+ * when the message could not be created.
+ */
+static struct ceph_message *create_session_msg(__u32 op, __u64 seq)
+{
+	struct ceph_message *msg;
+	__u32 wire_op = cpu_to_le32(op);
+	__u64 wire_seq = cpu_to_le64(seq);	/* convert seq itself, not op */
+
+	msg = ceph_new_message(CEPH_MSG_CLIENT_SESSION,
+			       sizeof(wire_op) + sizeof(wire_seq));
+	if (IS_ERR(msg))
+		return msg;	/* propagate the actual error code */
+
+	ceph_bl_append_copy(&msg->payload, &wire_op, sizeof(wire_op));
+	ceph_bl_append_copy(&msg->payload, &wire_seq, sizeof(wire_seq));
+	return msg;
+}
+
static void open_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, int mds)
{
struct ceph_message *msg;
return;
}
- /* prepare connect message */
-
- /* send */
+ /* send connect message */
+ msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_cap_seq);
+ if (IS_ERR(msg))
+ return; /* fixme */
send_msg_mds(mdsc, msg, mds);
}
+/*
+ * Handle a session message received from an mds.
+ *
+ * The payload is decoded as a 32-bit op followed by a 64-bit seq.
+ *  - CEPH_SESSION_OPEN: mark the sender's session open and signal its
+ *    completion.
+ *  - CEPH_SESSION_CLOSE: unregister the sender's session if seq equals the
+ *    session's s_cap_seq; otherwise the close is logged and ignored.
+ *  - anything else: logged and dropped.
+ *
+ * The message is released with ceph_put_msg() on every path.
+ */
+void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_message *msg)
+{
+	__u32 op;
+	__u64 seq;
+	int err;
+	struct ceph_mds_session *session;
+	struct ceph_bufferlist_iterator bli = {0, 0};
+	int from = msg->hdr.src.name.num;
+
+	/* decode */
+	if ((err = ceph_bl_decode_32(&msg->payload, &bli, &op)) != 0)
+		goto bad;
+	if ((err = ceph_bl_decode_64(&msg->payload, &bli, &seq)) != 0)
+		goto bad;
+
+	/* handle */
+	dout(1, "handle_session op %d seq %llu\n", op, seq);
+	spin_lock(&mdsc->lock);
+	switch (op) {
+	case CEPH_SESSION_OPEN:
+		dout(1, "session open from mds%d\n", from);
+		session = get_session(mdsc, from);
+		session->s_state = CEPH_MDS_SESSION_OPEN;
+		complete(&session->s_completion);
+		put_session(session);
+		break;
+
+	case CEPH_SESSION_CLOSE:
+		session = get_session(mdsc, from);
+		if (session->s_cap_seq == seq) {
+			dout(1, "session close from mds%d\n", from);
+			complete(&session->s_completion); /* for good measure */
+			/* drops the reference held through the sessions[] slot */
+			unregister_session(mdsc, from);
+		} else {
+			dout(1, "ignoring session close from mds%d, seq %llu < my seq %llu\n",
+			     from, seq, session->s_cap_seq);
+		}
+		/* drops the reference taken by get_session() above */
+		put_session(session);
+		break;
+
+	default:
+		/*
+		 * op was decoded from the message payload; an unknown value
+		 * must not bring the machine down, least of all with
+		 * mdsc->lock held.  Log it and drop the message.
+		 */
+		dout(0, "bad session op %d\n", op);
+		break;
+	}
+	spin_unlock(&mdsc->lock);
+
+out:
+	ceph_put_msg(msg);
+	return;
+
+bad:
+	dout(1, "corrupt session message\n");
+	goto out;
+}
static void wait_for_new_map(struct ceph_mds_client *mdsc)
}
/* get session */
- if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == 0)
- register_session(mdsc, mds);
- session = mdsc->sessions[mds];
- get_session(session);
+ session = get_session(mdsc, mds);
/* open? */
if (mdsc->sessions[mds]->s_state == CEPH_MDS_SESSION_IDLE)
struct ceph_mds_request *req;
__u64 tid;
- /* parse reply */
+ /* decode */
+
+ /* handle */
spin_lock(&mdsc->lock);
req = radix_tree_lookup(&mdsc->request_tree, tid);
if (!req) {
void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_message *msg)
{
struct ceph_mds_request *req;
- int next_mds;
- int fwd_seq;
__u64 tid;
-
- /* parse reply */
+ __u32 next_mds;
+ __u32 fwd_seq;
+ int err;
+ struct ceph_bufferlist_iterator bli = {0, 0};
+ /* decode */
+ if ((err = ceph_bl_decode_64(&msg->payload, &bli, &tid)) != 0)
+ goto bad;
+ if ((err = ceph_bl_decode_32(&msg->payload, &bli, &next_mds)) != 0)
+ goto bad;
+ if ((err = ceph_bl_decode_32(&msg->payload, &bli, &fwd_seq)) != 0)
+ goto bad;
-
+ /* handle */
spin_lock(&mdsc->lock);
req = radix_tree_lookup(&mdsc->request_tree, tid);
if (req) get_request(req);
}
put_request(req);
+
+out:
ceph_put_msg(msg);
+ return;
+
+bad:
+ derr(0, "corrupt forward message\n");
+ goto out;
}
};
+/*
+ * State kept for the session with one mds.
+ *
+ * s_ref is the reference count: put_session() frees the structure when it
+ * drops to zero.  s_completion is signalled by ceph_mdsc_handle_session()
+ * when a session open or a matching session close arrives.
+ */
struct ceph_mds_session {
int s_state;
- __u64 s_cap_seq; /* cap message count from mds */
+ __u64 s_cap_seq; /* cap message count/seq from mds */
atomic_t s_ref;
struct completion s_completion;
};
-#include "mdsmap.h"
+#include <linux/types.h>
#include <linux/random.h>
+#include <linux/slab.h>
+#include <asm/bug.h>
+
+#include "mdsmap.h"
+#include "messenger.h"
-int ceph_mdsmap_get_state(ceph_mdsmap *m, int w)
+/*
+ * Return the state recorded in the map for mds w.
+ *
+ * Yields CEPH_MDS_STATE_DNE when w is at or beyond m_max_mds.  A negative
+ * w trips the BUG_ON.
+ */
+int ceph_mdsmap_get_state(struct ceph_mdsmap *m, int w)
{
BUG_ON(w < 0);
if (w >= m->m_max_mds)
return CEPH_MDS_STATE_DNE;
- return = m->m_state[w];
+ return m->m_state[w];
}
-int ceph_mdsmap_get_random_mds(ceph_mdsmap *m)
+int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m)
{
int n = 0;
int i;
n = get_random_int() % n;
i = 0;
for (i=0; n>0; i++, n--)
- while (m->state[i] <= 0) i++;
+ while (m->m_state[i] <= 0)
+ i++;
return i;
}
-struct ceph_entity_addr *ceph_mdsmap_get_addr(ceph_mdsmap *m, int w)
+/*
+ * Return a pointer to the m_addr entry for mds w, or NULL when w is at or
+ * beyond m_max_mds.  The pointer refers to storage inside the map's m_addr
+ * array; nothing is copied.
+ */
+struct ceph_entity_addr *ceph_mdsmap_get_addr(struct ceph_mdsmap *m, int w)
{
if (w >= m->m_max_mds)
return NULL;
- return m->m_addr[w];
+ return &m->m_addr[w];
}
int ceph_mdsmap_decode(struct ceph_mdsmap *m,
{
int i, n;
__u32 mds;
- struct ceph_entity_inst *inst;
+ int err;
- m->m_epoch = ceph_bl_decode_u64(bl, bli);
- ceph_bl_decode_u32(bl, bli); /* target_num */
- m->m_created.tv_sec = ceph_bl_decode_u32(bl, bli);
- m->m_created.tv_usec = ceph_bl_decode_u32(bl, bli);
- ceph_bl_decode_u64(bl, bli); /* same_in_set_since */
- m->m_anchortable = ceph_bl_decode_s32(bl, bli);
- m->m_root = ceph_bl_decode_s32(bl, bli);
- m->m_max_mds = ceph_bl_decode_u32(bl, bli);
+ if ((err = ceph_bl_decode_64(bl, bli, &m->m_epoch)) != 0)
+ goto bad;
+ if ((err = ceph_bl_decode_64(bl, bli, &m->m_client_epoch)) != 0)
+ goto bad;
+ if ((err = ceph_bl_decode_32(bl, bli, &m->m_created.tv_sec)) != 0)
+ goto bad;
+ if ((err = ceph_bl_decode_32(bl, bli, &m->m_created.tv_usec)) != 0)
+ goto bad;
+ if ((err = ceph_bl_decode_32(bl, bli, &m->m_anchortable)) != 0)
+ goto bad;
+ if ((err = ceph_bl_decode_32(bl, bli, &m->m_root)) != 0)
+ goto bad;
+ if ((err = ceph_bl_decode_32(bl, bli, &m->m_max_mds)) != 0)
+ goto bad;
- m->m_addr = kmalloc(sizeof(struct ceph_entity_addr)*m->m_max_mds, GFP_KERNEL);
- m->m_state = kmalloc(sizeof(__u8)*m->m_max_mds, GFP_KERNEL);
- memset(m->m_state, 0, sizeof(__u8)*m->m_max_mds);
-
- /* created */
- n = ceph_bl_decode_u32(bl, bli);
- ceph_bl_iterator_advance(bli, n*sizeof(__u32));
+ m->m_addr = kmalloc(m->m_max_mds*sizeof(*m->m_addr), GFP_KERNEL);
+ m->m_state = kmalloc(m->m_max_mds*sizeof(*m->m_state), GFP_KERNEL);
+ memset(m->m_state, 0, m->m_max_mds);
/* state */
- n = ceph_bl_decode_u32(bl, bli);
+ if ((err = ceph_bl_decode_32(bl, bli, &n)) != 0)
+ goto bad;
for (i=0; i<n; i++) {
- mds = ceph_bl_decode_u32(bl, bli);
- m->m_state[mds] = ceph_bl_decode_s32(bl, bli);
+ if ((err = ceph_bl_decode_32(bl, bli, &mds)) != 0)
+ goto bad;
+ if ((err = ceph_bl_decode_32(bl, bli, &m->m_state[mds])) != 0)
+ goto bad;
}
/* state_seq */
- n = ceph_bl_decode_u32(bl, bli);
- ceph_bl_iterator_advance(bli, n*2*sizeof(__u32));
-
+ if ((err = ceph_bl_decode_32(bl, bli, &n)) != 0)
+ goto bad;
+ ceph_bl_iterator_advance(bl, bli, n*(sizeof(__u32)+sizeof(__u64)));
+
/* mds_inst */
- n = ceph_bl_decode_u32(bl, bli);
+ if ((err = ceph_bl_decode_32(bl, bli, &n)) != 0)
+ goto bad;
for (i=0; i<n; i++) {
- mds = ceph_bl_decode_u32(bl, bli);
- inst = ceph
- ceph_bl_iterator_advance(bli, sizeof(struct ceph_entity_name));
- m->m_addr[mds].nonce = ceph_bl_decode_u64(bl, bli);
- m->m_addr[mds].port = ceph_bl_decode_u32(bl, bli);
- m->m_addr[mds].ipq[0] = ceph_bl_decode_u8(bl, bli);
- m->m_addr[mds].ipq[1] = ceph_bl_decode_u8(bl, bli);
- m->m_addr[mds].ipq[2] = ceph_bl_decode_u8(bl, bli);
- m->m_addr[mds].ipq[3] = ceph_bl_decode_u8(bl, bli);
+ if ((err = ceph_bl_decode_32(bl, bli, &mds)) != 0)
+ goto bad;
+ ceph_bl_iterator_advance(bl, bli, sizeof(struct ceph_entity_name));
+ if ((err = ceph_bl_decode_addr(bl, bli, &m->m_addr[mds])) != 0)
+ goto bad;
}
- /* mds_inc */
-
+ /* ok, we don't care about the rest. */
return 0;
+
+bad:
+ derr(0, "corrupt mdsmap");
+ return -EINVAL;
}
#define _FS_CEPH_MDSMAP_H
#include <linux/ceph_fs.h>
+#include "bufferlist.h"
/* see mds/MDSMap.h */
#define CEPH_MDS_STATE_DNE 0 /* down, never existed. */
* fields limited to those the client cares about
*/
struct ceph_mdsmap {
- __u64 m_epoch;
+ __u64 m_epoch, m_client_epoch;
struct ceph_timeval m_created;
__u32 m_anchortable;
__u32 m_root;
__u32 m_max_mds; /* size of m_addr, m_state arrays */
struct ceph_entity_addr *m_addr; /* array of addresses */
- __u8 *m_state; /* array of states */
+ __s32 *m_state; /* array of states */
};
extern int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m);
#include <net/tcp.h>
#include <linux/ceph_fs.h>
-#include <linux/ceph_fs_msgs.h>
#include "messenger.h"
#include "ktcp.h"
return m;
}
+
+
+/*
+ * Decode a ceph_entity_addr from the bufferlist at the iterator position.
+ *
+ * erank and nonce are read with ceph_bl_decode_32(); ipaddr is copied out
+ * with ceph_bl_copy().  Returns 0 on success, or -EINVAL if the
+ * ceph_bl_decode_have() check for sizeof(*v) bytes fails or either 32-bit
+ * decode fails.
+ */
+int ceph_bl_decode_addr(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, struct ceph_entity_addr *v)
+{
+	if (!ceph_bl_decode_have(bl, bli, sizeof(*v)))
+		return -EINVAL;
+
+	/* short-circuit keeps the original order: erank first, then nonce */
+	if (ceph_bl_decode_32(bl, bli, &v->erank) != 0 ||
+	    ceph_bl_decode_32(bl, bli, &v->nonce) != 0)
+		return -EINVAL;
+
+	ceph_bl_copy(bl, bli, &v->ipaddr, sizeof(v->ipaddr));
+	return 0;
+}
}
+extern int ceph_bl_decode_addr(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, struct ceph_entity_addr *v);
+
#endif
#define _FS_CEPH_MON_CLIENT_H
#include "monmap.h"
+#include "messenger.h"
struct ceph_mount_args;
+#include <linux/module.h>
#include <linux/parser.h>
#include <linux/fs.h>
#include <linux/mount.h>
inode_init_once(&ci->vfs_inode);
}
-static int init_inodecache(void *foo, struct kmem_cache *cachep, unsigned long flags)
+static int init_inodecache(void)
{
ceph_inode_cachep = kmem_cache_create("ceph_inode_cache",
sizeof(struct ceph_inode_info),
{Opt_monport, "monport=%d"}
};
+static int parse_ip(char *c, int len, struct ceph_entity_addr *addr)
+{
+ dout(1, "parse_ip on %s len %d\n", c, len);
-static int parse_mount_args(int flags, char *options, char *dev_name, struct ceph_mount_args *args)
+ return 0;
+}
+
+static int parse_mount_args(int flags, char *options, const char *dev_name, struct ceph_mount_args *args)
{
char *c;
int len;
args->mntflags = flags;
args->flags = 0;
args->mon_port = CEPH_MON_PORT;
-
- /* get mon hostname, relative path */
+
+ /* ip1[,ip2...]:/server/path */
c = strchr(dev_name, ':');
if (c == NULL)
return -EINVAL;
+
+ /* get mon ip */
+ /* er, just one for now. later, comma-separate... */
len = c - dev_name;
- if (len >= sizeof(args->mon_hostname))
- return -ENAMETOOLONG;
- strncpy(args->mon_hostname, dev_name, len);
+ parse_ip(c, len, &args->mon_addr[0]);
+ args->mon_addr[0].ipaddr.sin_family = AF_INET;
+ args->mon_addr[0].ipaddr.sin_port = CEPH_MON_PORT;
+ args->mon_addr[0].erank = 0;
+ args->mon_addr[0].nonce = 0;
+ args->num_mon = 1;
+ /* path on server */
c++;
- if (strlen(c) >= sizeof(data->path))
+ if (strlen(c) >= sizeof(args->path))
return -ENAMETOOLONG;
- strcpy(args.path, c);
+ strcpy(args->path, c);
- dout(1, "mon %s, path %s\n", args->mon_hostname, args->path);
+ dout(1, "server path %s\n", args->path);
/* parse mount options */
while ((c = strsep(&options, ",")) != NULL) {
int token;
int intval;
int ret;
- if (!*p)
+ if (!*c)
continue;
- token = match_token(p, arg_tokens, argstr);
+ token = match_token(c, arg_tokens, argstr);
ret = match_int(&argstr[0], &intval);
if (ret < 0) {
dout(0, "bad mount arg\n");
struct super_block *s;
struct ceph_mount_args mount_args;
struct ceph_super_info *sbinfo;
- int ret;
+ int error;
int (*compare_super)(struct super_block *, void *) = ceph_compare_super;
dout(1, "ceph_get_sb\n");
- error = parse_mount_args(data, dev_name, &mount_args);
+ error = parse_mount_args(flags, data, dev_name, &mount_args);
if (error < 0)
goto out;
/* client */
if (!sbinfo->sb_client) {
sbinfo->sb_client = ceph_get_client(&mount_args);
- if (PTR_ERR(!sbinfo->sb_client)) {
+ if (PTR_ERR(sbinfo->sb_client)) {
error = PTR_ERR(sbinfo->sb_client);
goto out_splat;
}