]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
more mds_client, first pass at bufferlist, marshalling
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 22 Oct 2007 16:58:44 +0000 (16:58 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 22 Oct 2007 16:58:44 +0000 (16:58 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1978 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/kernel/bufferlist.h
trunk/ceph/kernel/kmsg.h
trunk/ceph/kernel/mds_client.c
trunk/ceph/kernel/mds_client.h
trunk/ceph/kernel/mdsmap.c
trunk/ceph/kernel/mdsmap.h

index fac32dc89c339aa2c4e46524844dc82483969258..17978ff5430fd0e5d159edc01455821340eba624 100644 (file)
 #define CEPH_BUFFERLIST_START_IOVLEN  8  /* embed some statically, for fast normal case */
 
 struct ceph_bufferlist {
-       struct iovec *b_iov;   /* data payload */
-       struct iovec b_iov_array[CEPH_BUFFERLIST_START_IOVLEN];  
-       int b_iovlen;          /* used/defined elements in b_iov */         
-       int b_iovmax;          /* allocated size of b_iov array */
-       struct iovec b_append; /* preallocated memory for appending data to this bufferlist */
+       struct kvec *b_kv;   /* data payload */
+       struct kvec b_kv_array[CEPH_BUFFERLIST_START_KVLEN];  
+       int b_kvlen;          /* used/defined elements in b_kv */         
+       int b_kvmax;          /* allocated size of b_kv array */
+       struct kvec b_append; /* preallocated memory for appending data to this bufferlist */
 };
 
 struct ceph_bufferlist_iterator {
-       int i_iov;  /* which iov */
-       int i_off;  /* offset in that iov */
+       int i_kv;  /* which kv */
+       int i_off;  /* offset in that kv */
 };
 
+
+
+void ceph_bl_init(struct ceph_bufferlist *bl)
+{
+       memset(bl, 0, sizeof(*bl));
+}
+
+void ceph_bl_clear(struct ceph_bufferlist *bl)
+{
+       int i;
+       for (i=0; i<bl->b_kvlen; i++)
+               kfree(bl->b_kv[i]->iov_base);
+       bl->b_kvlen = 0;
+       if (bl->b_kv != bl->b_kv_array) {
+               kfree(bl->b_kv);
+               bl->b_kv = bl->b_kv_array;
+               bl->b_kvmax = CEPH_BUFFERLIST_START_KVLEN;
+       }       
+       if (bl->b_append.iov_base) {
+               kfree(bl->b_append.iov_base);
+               bl->b_append.iov_base = 0;
+       }       
+}
+
+
 /*
  * add referenced memory to the bufferlist.
- * expand b_iov array if necessary.
- * extend tail iovec if the added region is contiguous.
+ * expand b_kv array if necessary.
+ * extend tail kvec if the added region is contiguous.
  */
-void ceph_bufferlist_append_ref(struct ceph_bufferlist *bl, void *p, int len)
+void ceph_bl_append_ref(struct ceph_bufferlist *bl, void *p, int len)
 {
-       struct iovec *tmpvec;
-       if (bl->b_iovlen == bl->b_iovmax) {
-               if (bl->b_iovmax) {
-                       bl->b_iovmax *= 2;
-                       tmpvec = kmalloc(bl->b_iovmax);
-                       memcpy(tmpvec, bl->b_iov, sizeof(iovec)*bl->b_iovlen);
-                       if (bl->b_iovlen > CEPH_BUFFERLIST_START_IOVLEN)
-                               kfree(bl->b_iov);
-                       bl->b_iov = tmpvec;
-                       memset(tmpvec + bl->b_iovlen, 0, 
-                              sizeof(iovec)*(bl->b_iovmax - bl->b_iovlen));
+       struct kvec *tmpvec;
+       if (bl->b_kvlen == bl->b_kvmax) {
+               if (bl->b_kvmax) {
+                       bl->b_kvmax *= 2;
+                       tmpvec = kmalloc(bl->b_kvmax);
+                       memcpy(tmpvec, bl->b_kv, sizeof(struct kvec)*bl->b_kvlen);
+                       if (bl->b_kvlen > CEPH_BUFFERLIST_START_KVLEN)
+                               kfree(bl->b_kv);
+                       bl->b_kv = tmpvec;
+                       memset(tmpvec + bl->b_kvlen, 0, 
+                              sizeof(struct kvec)*(bl->b_kvmax - bl->b_kvlen));
                } else {
-                       bl->b_iovmax = CEPH_BUFFERLIST_START_IOVLEN;
-                       bl->b_iov = bl->b_iov_array;
+                       bl->b_kvmax = CEPH_BUFFERLIST_START_KVLEN;
+                       bl->b_kv = bl->b_kv_array;
                }
        }
 
-       if (bl->b_iovlen && 
-           p == bl->b_iov[bl->b_iovlen-1].iov_base + bl->b_iov[bl->b_iovlen-1].iov_base) {
-               bl->b_iov[bl->b_iovlen-1].iov_len += len;
+       if (bl->b_kvlen && 
+           p == bl->b_kv[bl->b_kvlen-1].kv_base + bl->b_kv[bl->b_kvlen-1].kv_base) {
+               bl->b_kv[bl->b_kvlen-1].kv_len += len;
        } else {
-               bl->b_iov[bl->b_iovlen].iov_base = p;
-               bl->b_iov[bl->b_iovlen].iov_len = len;
-               bl->b_iovlen++;
+               bl->b_kv[bl->b_kvlen].kv_base = p;
+               bl->b_kv[bl->b_kvlen].kv_len = len;
+               bl->b_kvlen++;
        }
 }
 
-void ceph_bufferlist_append_copy(struct ceph_bufferlist *bl, void *p, int len)
+void ceph_bl_append_copy(struct ceph_bufferlist *bl, void *p, int len)
 {
        int s;
        while (len > 0) {
                /* allocate more space? */
-               if (!bl->b_append.iov_len) {
-                       bl->b_append.iov_len = (len + PAGE_SIZE - 1) & ~(PAGE_SIZE-1);
-                       bl->b_append.iov_base = kmalloc(bl->b_append.iov_len, GFP_KERNEL);
+               if (!bl->b_append.kv_len) {
+                       bl->b_append.kv_len = (len + PAGE_SIZE - 1) & ~(PAGE_SIZE-1);
+                       bl->b_append.kv_base = kmalloc(bl->b_append.kv_len, GFP_KERNEL);
                }
 
                /* copy what we can */
-               s = min(bl->b_append.iov_len, len);
-               memcpy(bl->b_append.iov_base, s);
-               ceph_bufferlist_append_ref(bl, b_append.iov_base, b_append.iov_len);
+               s = min(bl->b_append.kv_len, len);
+               memcpy(bl->b_append.kv_base, s);
+               ceph_bl_append_ref(bl, b_append.kv_base, b_append.kv_len);
                len -= s;
-               bl->b_append.iov_len -= s;
+               bl->b_append.kv_len -= s;
        }
 }
 
+
+
+
+
+
+
+
+void ceph_bl_iterator_init(struct ceph_bufferlist_iterator *bli)
+{
+       memset(bli, 0, sizeof(*bli));
+}
+
+void ceph_bl_iterator_advance(struct ceph_bufferlist *bl, 
+                             struct ceph_bufferlist_iterator *bli, 
+                             int off)
+{
+
+}
+
+__u64 ceph_bl_decode_u64(struct ceph_bufferlist *bl, ceph_bufferlist_iterator *bli)
+{
+       __u64 r;
+       r = le64_to_cpu((__u64*)(bl->b_kv[bli->i_kv] + bli->i_off));
+       ceph_bl_iterator_advance(bl, bli, sizeof(__u64));
+}
+__s64 ceph_bl_decode_s64(struct ceph_bufferlist *bl, ceph_bufferlist_iterator *bli)
+{
+       __s64 r;
+       r = le64_to_cpu((__s64*)(bl->b_kv[bli->i_kv] + bli->i_off));
+       ceph_bl_iterator_advance(bl, bli, sizeof(__s64));
+}
+
+__u32 ceph_bl_decode_u32(struct ceph_bufferlist *bl, ceph_bufferlist_iterator *bli)
+{
+       __u32 r;
+       r = le32_to_cpu((__u32*)(bl->b_kv[bli->i_kv] + bli->i_off));
+       ceph_bl_iterator_advance(bl, bli, sizeof(__u32));
+}
+__s32 ceph_bl_decode_s32(struct ceph_bufferlist *bl, ceph_bufferlist_iterator *bli)
+{
+       __s32 r;
+       r = le32_to_cpu((__s32*)(bl->b_kv[bli->i_kv] + bli->i_off));
+       ceph_bl_iterator_advance(bl, bli, sizeof(__s32));
+}
+
+__u8 ceph_bl_decode_u8(struct ceph_bufferlist *bl, ceph_bufferlist_iterator *bli)
+{
+       __u8 r;
+       r = (__u8*)(bl->b_kv[bli->i_kv] + bli->i_off);
+       ceph_bl_iterator_advance(bl, bli, sizeof(__u8));
+}
+
 #endif
index d292c11ea28bcb95cc8ced6c61cb096b0c7bb8c5..2b6281ce6a483ccdd3bac53efe3f824660d0b15c 100644 (file)
@@ -21,8 +21,7 @@ struct ceph_kmsgr {
 
 struct ceph_message {
        struct ceph_message_header *msghdr;     /* header */
-       struct kvec *m_iov;                     /* data storage */
-       size_t m_iovlen;        /* is this kvec.iov_len why need it in kvec? */
+       struct ceph_bufferlist payload;
        struct list_head m_list_head;
        atomic_t nref;
 };
@@ -35,15 +34,11 @@ struct ceph_kmsg_pipe {
        /* out queue */
        struct list_head p_out_queue;
        struct ceph_message *p_out_partial;  /* partially sent message */
-       int p_out_partial_pos;
+       struct ceph_bufferlist_iterator p_out_pos;
        struct list_head p_out_sent;  /* sent but unacked; may need resend if connection drops */
 
        /* partially read message contents */
-       struct kvec *p_in_partial_iov;   /* hrm, this probably isn't what we want */
-       size_t p_in_partial_iovlen;
-       size_t p_in_parital_iovmax;  /* size of currently allocated m_iov array */
-       /* .. or something like that? .. */
-
+       struct ceph_message *p_in_partial;
 };
 
 /* 
@@ -54,7 +49,7 @@ extern void ceph_write_message(struct ceph_message *message);
 
 __inline__ void ceph_put_msg(struct ceph_message *msg) {
        if (atomic_dec_and_test(&msg->nref)) {
-               /*ceph_bufferlist_destroy(msg->payload);*/
+               ceph_bufferlist_clear(msg->payload);
                kfree(msg);
        }
 }
index 9db8c8344dbb03d9fce9619a49c25fc6f8a2b17c..e78f5f14fcb660babe84af7e8ce7e2f132648980 100644 (file)
@@ -264,8 +264,24 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_message
 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, 
                          struct ceph_message *msg)
 {
-       /* write me */
-       
-       
+       struct ceph_bufferlist_iterator bli;
+       __u64 epoch;
+       __u32 left;
+
+       ceph_bl_iterator_init(&bli);
+       epoch = ceph_bl_decode_u64(&msg->payload, &bli);
+       left = ceph_bl_decode_u32(&msg->payload, &bli);
+
+       printk("ceph_mdsc_handle_map epoch %ld\n", epoch);
+
+       spin_lock(&mdsc->lock);
+       if (epoch > mdsc->mdsmap->m_epoch) {
+               ceph_mdsmap_decode(mdsc->mdsmap, &msg->payload, bli);
+               spin_unlock(&mdsc->lock);
+               complete(&mdsc->waiting_for_map);
+       } else {
+               spin_unlock(&mdsc->lock);
+       }
+
        ceph_put_msg(msg);
 }
index ea81b5ac7ed27dd982894c95352229c245c1c671..355ff4ae689e4dd56ec94140c3a41fb36487f0b0 100644 (file)
@@ -19,9 +19,7 @@ enum {
 };
 struct ceph_mds_session {
        int s_state;
-
-       /*__u64 s_push_seq;  */
-       
+       __u64 s_cap_seq;    /* cap message count from mds */
        atomic_t s_ref;
        struct completion s_completion;
 };
index ff783a1da76eef5f8f18722cc8ecaf862d9a30fa..93e26f57fb6792efd262bb2da15e30d2b1ed5767 100644 (file)
@@ -38,7 +38,59 @@ struct ceph_entity_addr *ceph_mdsmap_get_addr(ceph_mdsmap *m, int w)
        return m->m_addr[w];
 }
 
-int ceph_mdsmap_decode(ceph_mdsmap *m, ceph_bufferlist *bl)
+int ceph_mdsmap_decode(struct ceph_mdsmap *m, 
+                      struct ceph_bufferlist *bl, 
+                      struct ceph_bufferlist_iterator *bli)
 {
-       /* write me */
+       int i, n;
+       __u32 mds;
+       struct ceph_entity_inst *inst;
+       
+       m->m_epoch = ceph_bl_decode_u64(bl, bli);
+       ceph_bl_decode_u32(bl, bli); /* target_num */
+       m->m_created.tv_sec = ceph_bl_decode_u32(bl, bli);
+       m->m_created.tv_usec = ceph_bl_decode_u32(bl, bli);
+       ceph_bl_decode_u64(bl, bli); /* same_in_set_since */
+       m->m_anchortable = ceph_bl_decode_s32(bl, bli);
+       m->m_root = ceph_bl_decode_s32(bl, bli);
+       m->m_max_mds = ceph_bl_decode_u32(bl, bli);
+
+       m->m_addr = kmalloc(sizeof(struct ceph_entity_addr)*m->m_max_mds, GFP_KERNEL);
+       m->m_state = kmalloc(sizeof(__u8)*m->m_max_mds, GFP_KERNEL);
+       memset(m->m_state, 0, sizeof(__u8)*m->m_max_mds);
+       
+       /* created */
+       n = ceph_bl_decode_u32(bl, bli);
+       ceph_bl_iterator_advance(bli, n*sizeof(__u32));
+       
+       /* state */
+       n = ceph_bl_decode_u32(bl, bli);
+       for (i=0; i<n; i++) {
+               mds = ceph_bl_decode_u32(bl, bli);
+               m->m_state[mds] = ceph_bl_decode_s32(bl, bli);
+       }
+
+       /* state_seq */
+       n = ceph_bl_decode_u32(bl, bli);
+       ceph_bl_iterator_advance(bli, n*2*sizeof(__u32));
+
+       /* mds_inst */
+       n = ceph_bl_decode_u32(bl, bli);
+       for (i=0; i<n; i++) {
+               mds = ceph_bl_decode_u32(bl, bli);
+               inst = ceph
+               ceph_bl_iterator_advance(bli, sizeof(struct ceph_entity_name));
+               m->m_addr[mds].nonce = ceph_bl_decode_u64(bl, bli);
+               m->m_addr[mds].port = ceph_bl_decode_u32(bl, bli);
+               m->m_addr[mds].ipq[0] = ceph_bl_decode_u8(bl, bli);
+               m->m_addr[mds].ipq[1] = ceph_bl_decode_u8(bl, bli);
+               m->m_addr[mds].ipq[2] = ceph_bl_decode_u8(bl, bli);
+               m->m_addr[mds].ipq[3] = ceph_bl_decode_u8(bl, bli);
+       }
+
+       /* mds_inc */
+
+       return 0;
 }
+
+
index 9e169e85da16e5fc0d706b5096503d09a0d42b56..da620b99c89cacd61641a3315724801a16a8fb8e 100644 (file)
  */
 struct ceph_mdsmap {
        __u64 m_epoch;
-       __u64 m_same_in_set_since;
-       struct timeval m_created;
+       struct ceph_timeval m_created;
        __u32 m_anchortable;
        __u32 m_root;
+       __u32 m_max_mds;  /* size of m_addr, m_state arrays */
        struct ceph_entity_addr *m_addr;  /* array of addresses */
        __u8 *m_state;                    /* array of states */
-       __u32 m_max_mds;  /* size of m_addr, m_state arrays */
 };
 
 extern int ceph_mdsmap_get_random_mds(ceph_mdsmap *m);
 extern int ceph_mdsmap_get_state(ceph_mdsmap *m, int w);
 extern struct ceph_entity_addr *ceph_mdsmap_get_addr(ceph_mdsmap *m, int w);
-extern int ceph_mdsmap_decode(ceph_mdsmap *m, iovec *v);
+
+extern int ceph_mdsmap_decode(struct ceph_mdsmap *m, 
+                             struct ceph_bufferlist *bl, 
+                             struct ceph_bufferlist_iterator *bli);
 
 #endif