ceph-objs := super.o inode.o \
bufferlist.o ktcp.o messenger.o \
+ client.o \
mds_client.o mdsmap.o \
mon_client.o monmap.o
+#include <linux/ceph_fs.h>
+#include <linux/wait.h>
+#include <linux/sched.h>
#include "client.h"
#include "super.h"
-#include <include/ceph_msgs.h>
/* debug level; defined in include/ceph_fs.h */
/*
* create a fresh client instance
*/
-static struct ceph_client *create_client(ceph_mount_args *args)
+static struct ceph_client *create_client(struct ceph_mount_args *args)
{
struct ceph_client *cl;
- cl = kmalloc(sizeof(*cl), GFP_KERNEL);
+ cl = kzalloc(sizeof(*cl), GFP_KERNEL);
if (cl == NULL)
return ERR_PTR(-ENOMEM);
- memset(cl, 0, sizeof(*cl));
+
+ atomic_set(&cl->nref, 0);
+ init_waitqueue_head(&cl->mount_wq);
+ spin_lock_init(&cl->sb_lock);
cl->whoami = -1;
ceph_monc_init(&cl->monc);
*/
static int mount(struct ceph_client *client, struct ceph_mount_args *args)
{
- struct ceph_message *mount_msg;
- struct ceph_entity_inst inst;
- int ret;
+ struct ceph_msg *mount_msg;
+ int err;
int attempts = 10;
+ int which;
client->mounting = 7;
/* send mount request */
- mount_msg = ceph_new_message(CEPH_MSG_CLIENT_MOUNT, 0);
+ mount_msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, 0, 0, 0);
if (IS_ERR(mount_msg))
return PTR_ERR(mount_msg);
- ceph_get_msg(mount_msg); /* grab ref; we may retry */
+ ceph_msg_get(mount_msg); /* grab ref; we may retry */
trymount:
- inst.name.type = CEPH_ENTITY_TYPE_MON;
- inst.name.num = get_random_int() % args->num_mon;
- inst.addr = args->mon_addr[inst.name.num];
- dout(1, "mount from mon%d, %d attempts left\n",
- inst.name.num, attempts);
- ceph_messenger_send(client->msgr, mount_msg, &inst);
+ which = get_random_int() % args->num_mon;
+ mount_msg->hdr.dst.name.type = CEPH_ENTITY_TYPE_MON;
+ mount_msg->hdr.dst.name.num = which;
+ mount_msg->hdr.dst.addr = args->mon_addr[which];
+ dout(1, "mount from mon%d, %d attempts left\n", which, attempts);
+
+ ceph_msg_send(client->msgr, mount_msg);
/* wait */
- err = wait_event_interruptible_timeout(client->mounted_wq,
- client->mounting == 0,
+ err = wait_event_interruptible_timeout(client->mount_wq,
+ (client->mounting == 0),
6*HZ);
if (err == -EINTR)
return err;
- if (atomic_read(&client->mounting)) {
+ if (client->mounting) {
dout(1, "ceph_get_client still waiting for mount, attempts=%d\n", attempts);
if (--attempts)
goto trymount;
* the monitor responds to monmap to indicate mount success.
* (or, someday, to indicate a change in the monitor cluster?)
*/
-static void handle_mon_map(struct ceph_client *client, struct ceph_message *msg)
+static void handle_mon_map(struct ceph_client *client, struct ceph_msg *msg)
{
int err;
dout(1, "handle_mon_map");
/* parse */
- err = ceph_monmap_decode(&client->monc->monmap, &msg->payload);
+ err = ceph_monmap_decode(&client->monc.monmap, msg->front.iov_base,
+ msg->front.iov_base + msg->front.iov_len);
if (err != 0)
return;
if (client->whoami < 0) {
- client->whoami = msg->dst.name.num;
- client->msgr->inst.name = msg->dst.name;
+ client->whoami = msg->hdr.dst.name.num;
+ client->msgr->inst.name = msg->hdr.dst.name;
}
clear_bit(4, &client->mounting);
if (client->mounting == 0)
- wake_up(&client->mount_wq);
+ wake_up_all(&client->mount_wq);
}
}
*/
-struct ceph_client *ceph_get_client(ceph_mount_args *args)
+struct ceph_client *ceph_get_client(struct ceph_mount_args *args)
{
struct ceph_client *client = 0;
int ret;
/* write me. */
/* create new client */
- client = create_client();
+ client = create_client(args);
if (IS_ERR(client))
- return PTR_ERR(client);
+ return client;
atomic_inc(&client->nref);
/* request mount */
*
* should be fast and non-blocking, as it is called with locks held.
*/
-static void dispatch(struct ceph_client *client, struct ceph_message *msg)
+static void dispatch(struct ceph_client *client, struct ceph_msg *msg)
{
dout(5, "dispatch %p type %d\n", (void*)msg, msg->hdr.type);
/* mds client */
case CEPH_MSG_MDS_MAP:
- ceph_mdsc_handle_map(&client->mds_client, msg);
+ ceph_mdsc_handle_map(&client->mdsc, msg);
break;
case CEPH_MSG_CLIENT_REPLY:
- ceph_mdsc_handle_reply(&client->mds_client, msg);
+ ceph_mdsc_handle_reply(&client->mdsc, msg);
break;
case CEPH_MSG_CLIENT_REQUEST_FORWARD:
- ceph_mdsc_handle_forward(&client->mds_client, msg);
+ ceph_mdsc_handle_forward(&client->mdsc, msg);
break;
/* osd client */
case CEPH_MSG_OSD_MAP:
- ceph_osdc_handle_map(&client->osd_client, msg);
+ ceph_osdc_handle_map(&client->osdc, msg);
break;
case CEPH_MSG_OSD_OPREPLY:
- ceph_osdc_handle_reply(&client->osd_client, msg);
+ ceph_osdc_handle_reply(&client->osdc, msg);
break;
default:
derr(1, "dispatch unknown message type %d\n", msg->hdr.type);
- ceph_put_msg(msg);
+ ceph_msg_put(msg);
}
}
atomic_t nref;
int mounting; /* map bitset; 4=mon, 2=mds, 1=osd map */
- wait_queue_t mount_wq;
+ wait_queue_head_t mount_wq;
struct ceph_messenger *msgr; /* messenger instance */
struct ceph_mon_client monc;
- struct ceph_mds_client mdss;
+ struct ceph_mds_client mdsc;
struct ceph_osd_client osdc;
/* lets ignore all this until later */
#include <linux/ceph_fs.h>
+#include <linux/wait.h>
+#include <linux/sched.h>
#include "mds_client.h"
#include "mon_client.h"
#include "super.h"
+/*
+ * Allocate and fill in a CEPH_MSG_CLIENT_REQUEST message.
+ *
+ * The front section holds a ceph_client_request_head (op and client inst)
+ * followed by baseino and the two paths, each as a NUL-terminated string.
+ * A NULL path or path2 is encoded as an empty string.
+ *
+ * Returns the new message, or the ERR_PTR value handed back by
+ * ceph_msg_new() if allocation fails.
+ */
struct ceph_msg *
-ceph_mdsc_make_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds)
+ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op,
+			 ceph_ino_t baseino, const char *path, const char *path2)
+{
+	struct ceph_msg *req;
+	struct ceph_client_request_head *head;
+	size_t pathlen, path2len;
+	void *p, *end;
+
+	/* treat a missing path as "" rather than handing NULL to strlen() */
+	if (path == NULL)
+		path = "";
+	if (path2 == NULL)
+		path2 = "";
+
+	/* lengths include the terminating NUL; computed once, reused below */
+	pathlen = strlen(path) + 1;
+	path2len = strlen(path2) + 1;
+
+	req = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST,
+			   sizeof(struct ceph_client_request_head) +
+			   sizeof(baseino) + pathlen + path2len,
+			   0, 0);
+	if (IS_ERR(req))
+		return req;
+	memset(req->front.iov_base, 0, req->front.iov_len);
+	head = req->front.iov_base;
+	p = req->front.iov_base + sizeof(*head);
+	end = req->front.iov_base + req->front.iov_len;
+
+	/* encode head */
+	head->op = cpu_to_le32(op);
+	ceph_encode_inst(&head->client_inst, &mdsc->client->msgr->inst);
+	/*FIXME: head->oldest_client_tid = cpu_to_le64(....);*/
+
+	/* encode paths */
+	ceph_encode_64(&p, end, baseino);
+	memcpy(p, path, pathlen);
+	p += pathlen;
+	memcpy(p, path2, path2len);
+	p += path2len;
+	/* the size passed to ceph_msg_new() must match what was written */
+	BUG_ON(p != end);
+
+	return req;
+}
+
+struct ceph_msg *
+ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds)
{
struct ceph_mds_request *req;
struct ceph_mds_session *session;
+/*
+ * Handle an incoming MDS map message.
+ *
+ * The message front is decoded as a 64-bit epoch, a 32-bit map length and
+ * then the encoded map itself.  A map whose epoch is not newer than the one
+ * already held is ignored.  Otherwise the new map is decoded with the lock
+ * dropped and swapped in under mdsc->lock; the epoch is rechecked at that
+ * point because the lock was released during the decode.  Installing the
+ * very first map also clears a bit in client->mounting and wakes the mount
+ * waiters once that field reaches zero.
+ *
+ * The reference on msg is dropped on every path.
+ */
void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
__u64 epoch;
- __u32 left;
+ __u32 maplen;
int err;
void *p = msg->front.iov_base;
void *end = p + msg->front.iov_len;
+ struct ceph_mdsmap *newmap, *oldmap;
if ((err = ceph_decode_64(&p, end, &epoch)) != 0)
goto bad;
- if ((err = ceph_decode_32(&p, end, &left)) != 0)
+ if ((err = ceph_decode_32(&p, end, &maplen)) != 0)
goto bad;
- dout(2, "ceph_mdsc_handle_map epoch %llu\n", epoch);
+ dout(2, "ceph_mdsc_handle_map epoch %llu len %d\n", epoch, (int)maplen);
+ /* do we need it? */
spin_lock(&mdsc->lock);
- if (epoch > mdsc->mdsmap->m_epoch) {
- ceph_mdsmap_decode(mdsc->mdsmap, &p, end);
+ if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
+ dout(2, "ceph_mdsc_handle_map epoch %llu <= our %llu\n", epoch, mdsc->mdsmap->m_epoch);
spin_unlock(&mdsc->lock);
- complete(&mdsc->map_waiters);
+ goto out;
+ }
+ spin_unlock(&mdsc->lock);
+
+ /* decode */
+ newmap = ceph_mdsmap_decode(&p, end);
+ if (IS_ERR(newmap))
+ goto bad2;
+
+ /* swap into place */
+ spin_lock(&mdsc->lock);
+ if (mdsc->mdsmap) {
+ if (mdsc->mdsmap->m_epoch < newmap->m_epoch) {
+ oldmap = mdsc->mdsmap;
+ mdsc->mdsmap = newmap;
+ spin_unlock(&mdsc->lock);
+ ceph_mdsmap_destroy(oldmap);
+ } else {
+ spin_unlock(&mdsc->lock);
+ dout(2, "ceph_mdsc_handle_map lost decode race?\n");
+ ceph_mdsmap_destroy(newmap);
+ }
} else {
+ mdsc->mdsmap = newmap;
spin_unlock(&mdsc->lock);
+ /*
+ * NOTE(review): clear_bit() takes a bit index, so 2 here is mask 0x4,
+ * while the mounting field is commented as "4=mon, 2=mds, 1=osd map"
+ * and declared int -- confirm the intended bit and the field type.
+ */
+ clear_bit(2, &mdsc->client->mounting);
+ if (mdsc->client->mounting == 0)
+ wake_up(&mdsc->client->mount_wq);
}
+ complete(&mdsc->map_waiters);
out:
ceph_msg_put(msg);
+ /*
+ * Stop here: without this return, control falls into the error labels
+ * below, which jump back to out: and loop forever dropping references.
+ */
+ return;
bad:
dout(1, "corrupt map\n");
goto out;
+bad2:
+ dout(1, "no memory to decode new mdsmap\n");
+ goto out;
}
-
+#include <linux/err.h>
#include <linux/types.h>
#include <linux/random.h>
#include <linux/slab.h>
return &m->m_addr[w];
}
-int ceph_mdsmap_decode(struct ceph_mdsmap *m, void **p, void *end)
+struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
{
+ struct ceph_mdsmap *m;
int i, n;
__u32 mds;
int err;
+ m = kzalloc(sizeof(*m), GFP_KERNEL);
+ if (m == NULL)
+ return ERR_PTR(-ENOMEM);
+
if ((err = ceph_decode_64(p, end, &m->m_epoch)) != 0)
goto bad;
if ((err = ceph_decode_64(p, end, &m->m_client_epoch)) != 0)
goto bad;
m->m_addr = kmalloc(m->m_max_mds*sizeof(*m->m_addr), GFP_KERNEL);
- m->m_state = kmalloc(m->m_max_mds*sizeof(*m->m_state), GFP_KERNEL);
- memset(m->m_state, 0, m->m_max_mds);
+ m->m_state = kzalloc(m->m_max_mds*sizeof(*m->m_state), GFP_KERNEL);
/* state */
if ((err = ceph_decode_32(p, end, &n)) != 0)
}
/* ok, we don't care about the rest. */
- return 0;
+ return m;
bad:
derr(0, "corrupt mdsmap");
- return -EINVAL;
+ ceph_mdsmap_destroy(m);
+ return ERR_PTR(-EINVAL);
}
-
+/*
+ * Free an mdsmap together with its m_addr and m_state arrays.
+ * Passing NULL is allowed and does nothing.
+ */
+void ceph_mdsmap_destroy(struct ceph_mdsmap *m)
+{
+	if (m == NULL)
+		return;
+	/* kfree(NULL) is a no-op, so the members need no individual guards */
+	kfree(m->m_addr);
+	kfree(m->m_state);
+	kfree(m);
+}
extern int ceph_mdsmap_get_state(struct ceph_mdsmap *m, int w);
extern struct ceph_entity_addr *ceph_mdsmap_get_addr(struct ceph_mdsmap *m, int w);
-extern int ceph_mdsmap_decode(struct ceph_mdsmap *m, void **p, void *end);
+extern struct ceph_mdsmap * ceph_mdsmap_decode(void **p, void *end);
+extern void ceph_mdsmap_destroy(struct ceph_mdsmap *m);
#endif
}
+/*
+ * Store v in little-endian byte order at the cursor *p and advance the
+ * cursor past it.  It is a BUG if fewer than sizeof(v) bytes remain before
+ * end.  Always returns 0.
+ */
+static __inline__ int ceph_encode_64(void **p, void *end, __u64 v) {
+	BUG_ON(*p + sizeof(v) > end);
+	/* write through the cursor (*p), not over the caller's pointer variable */
+	*(__u64*)*p = cpu_to_le64(v);
+	/* advance the caller's cursor; bumping the local p would be lost */
+	*p += sizeof(v);
+	return 0;
+}
+
+
#endif