]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd map decoding in kernel client
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 20 Nov 2007 19:36:45 +0000 (19:36 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 20 Nov 2007 19:36:45 +0000 (19:36 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2098 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/TODO
trunk/ceph/kernel/osd_client.c
trunk/ceph/messages/MOSDMap.h

index 9babaf1d210f817014ddeb3437b70105af265b61..6cc4b221019d1ebc1a00e3758f58e17611d9e7e2 100644 (file)
@@ -74,7 +74,6 @@ mds
 - efficient stat for single writers
 - lstat vs stat?
 - add FILE_CAP_EXTEND capability bit
-- only share osdmap updates with clients holding capabilities
 - delayed replica caps release... we need to set a timer event? (and cancel it when appropriate?)
 
 
index 53ec45273bf7a04ac9fe70f9da3a92b6847e8f11..e4c88a5da12406e36de4e1fa21351f592e53f7ea 100644 (file)
@@ -41,17 +41,42 @@ bad:
        return ERR_PTR(err);
 }
 
-void osdmap_destroy(struct ceph_osdmap *map)
+static void osdmap_destroy(struct ceph_osdmap *map)
 {
        if (map->osd_state) kfree(map->osd_state);
        if (map->crush) kfree(map->crush);
        kfree(map);
 }
 
+static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
+{
+       __u8 *state;
+       struct ceph_entity_addr *addr;
+
+       state = kzalloc(max * (sizeof(__u32) + 
+                              sizeof(struct ceph_entity_addr)),
+                       GFP_KERNEL);
+       if (state == NULL) 
+               return -ENOMEM;
+       addr = (void*)((__u32*)state + max);
+
+       /* copy old? */
+       if (map->osd_state) {
+               memcpy(state, map->osd_state, map->max_osd);
+               memcpy(addr, map->osd_addr, map->max_osd);
+               kfree(map->osd_state);
+       }
+
+       map->osd_state = state;
+       map->osd_addr = addr;
+       map->max_osd = max;
+       return 0;
+}
+
 static struct ceph_osdmap *osdmap_decode(void **p, void *end)
 {
        struct ceph_osdmap *map;
-       __u32 crushlen;
+       __u32 crushlen, max;
        int err;
 
        map = kmalloc(sizeof(*map), GFP_KERNEL);
@@ -77,23 +102,16 @@ static struct ceph_osdmap *osdmap_decode(void **p, void *end)
                goto bad;
        calc_pg_masks(map);
 
-       if ((err = ceph_decode_32(p, end, &map->max_osd)) < 0)
+       if ((err = ceph_decode_32(p, end, &max)) < 0)
                goto bad;
 
        /* alloc */
-       map->osd_state = kmalloc(map->max_osd * (sizeof(__u32)*2 + 
-                                                sizeof(struct ceph_entity_addr)),
-                                GFP_KERNEL);
-       if (map->osd_state == NULL) 
-               goto bad;
-       map->osd_offload = (void*)((__u32*)map->osd_state + map->max_osd);
-       map->osd_addr = (void*)(map->osd_offload + map->max_osd);
+       if ((err = osdmap_set_max_osd(map, max)) < 0)
+           goto bad;
        
        /* osds */
        if ((err = ceph_decode_copy(p, end, &map->osd_state, map->max_osd)) < 0)
                goto bad;
-       if ((err = ceph_decode_copy(p, end, &map->osd_offload, map->max_osd*sizeof(*map->osd_offload))) < 0)
-               goto bad;
        if ((err = ceph_decode_copy(p, end, &map->osd_addr, map->max_osd*sizeof(*map->osd_addr))) < 0)
                goto bad;
 
@@ -114,18 +132,186 @@ bad:
        return ERR_PTR(err);
 }
 
+static struct ceph_osdmap *apply_incremental(void **p, void *end, struct ceph_osdmap *map)
+{
+       struct ceph_osdmap *newmap = map;
+       struct crush_map *newcrush = 0;
+       struct ceph_fsid fsid;
+       __u64 epoch, mon_epoch;
+       struct ceph_timeval ctime;
+       __u32 len;
+       __u32 max;
+       int err;
+
+       if ((err = ceph_decode_64(p, end, &fsid.major)) < 0)
+               goto bad;
+       if ((err = ceph_decode_64(p, end, &fsid.minor)) < 0)
+               goto bad;
+       if ((err = ceph_decode_64(p, end, &epoch)) < 0)
+               goto bad;
+       if ((err = ceph_decode_64(p, end, &mon_epoch)) < 0)
+               goto bad;
+       if ((err = ceph_decode_32(p, end, &ctime.tv_sec)) < 0)
+               goto bad;
+       if ((err = ceph_decode_32(p, end, &ctime.tv_usec)) < 0)
+               goto bad;
+
+       /* full map? */
+       if ((err = ceph_decode_32(p, end, &len)) < 0)
+               goto bad;
+       if (len > 0) {
+               newmap = osdmap_decode(p, min(*p+len, end));
+               return newmap;  /* error or not */
+       }
+       if (!map || epoch != map->epoch+1) 
+               return 0; /* old or new, or no existing; done */
 
+       /* new crush? */
+       if ((err = ceph_decode_32(p, end, &len)) < 0)
+               goto bad;
+       if (len > 0) {
+               newcrush = crush_decode(p, min(*p+len, end));
+               if (IS_ERR(newcrush))
+                       return ERR_PTR(PTR_ERR(newcrush));
+       }
 
+       /* FIXME: from this point on i'm optimisticaly assuming the message is complete */
 
-void ceph_osdc_init(struct ceph_osd_client *osdc)
-{
-       osdc->osdmap = NULL;
+       /* new max? */
+       if ((err = ceph_decode_32(p, end, &max)) < 0)
+               goto bad;
+       if (max > 0) {
+               if ((err = osdmap_set_max_osd(map, max)) < 0)
+                       goto bad;
+       }
+
+       map->epoch++;
+       map->mon_epoch = mon_epoch;
+       map->ctime = map->ctime;
+       if (newcrush) {
+               if (map->crush) 
+                       crush_destroy(map->crush);
+               map->crush = newcrush;
+       }
+
+       /* new_up */
+       if ((err = ceph_decode_32(p, end, &len)) < 0)
+               goto bad;
+       while (len--) {
+               __u32 osd;
+               struct ceph_entity_addr addr;
+               if ((err = ceph_decode_32(p, end, &osd)) < 0)
+                       goto bad;
+               if ((err = ceph_decode_addr(p, end, &addr)) < 0)
+                       goto bad;
+               dout(1, "osd%d up\n", osd);
+               BUG_ON(osd >= map->max_osd);
+               map->osd_state[osd] |= CEPH_OSD_UP;
+               map->osd_addr[osd] = addr;
+       }
+
+       /* new_down */
+       if ((err = ceph_decode_32(p, end, &len)) < 0)
+               goto bad;
+       while (len--) {
+               __u32 osd;
+               if ((err = ceph_decode_32(p, end, &osd)) < 0)
+                       goto bad;
+               dout(1, "osd%d down\n", osd);
+               if (osd < map->max_osd) 
+                       map->osd_state[osd] &= ~CEPH_OSD_UP;
+       }
+
+       /* new_offload */
+       if ((err = ceph_decode_32(p, end, &len)) < 0)
+               goto bad;
+       while (len--) {
+               __u32 osd, off;
+               if ((err = ceph_decode_32(p, end, &osd)) < 0)
+                       goto bad;
+               if ((err = ceph_decode_32(p, end, &off)) < 0)
+                       goto bad;
+               dout(1, "osd%d offload %x\n", osd, off);
+               if (osd < map->max_osd) 
+                       map->crush->device_offload[osd] = off;
+       }
+       
+       return map;
+
+bad:
+       return ERR_PTR(err);
 }
 
 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 {
-       dout(1, "ceph_osdc_handle_map - implement me\n");
+       void *p, *end, *next;
+       __u32 nr_maps, maplen;
+       struct ceph_osdmap *newmap = 0;
+       int err;
+
+       dout(1, "ceph_osdc_handle_map\n");
+       p = msg->front.iov_base;
+       end = p + msg->front.iov_len;
+
+       /* incremental maps */
+       if ((err = ceph_decode_32(&p, end, &nr_maps)) < 0)
+               goto bad;
+       while (nr_maps--) {
+               if ((err = ceph_decode_32(&p, end, &maplen)) < 0)
+                       goto bad;
+               next = p + maplen;
+               newmap = apply_incremental(p, min(p+maplen,end), osdc->osdmap);
+               if (IS_ERR(newmap)) {
+                       err = PTR_ERR(newmap);
+                       goto bad;
+               }
+               if (newmap != osdc->osdmap) {
+                       osdmap_destroy(osdc->osdmap);
+                       osdc->osdmap = newmap;
+               }
+               dout(1, "got incremental map %llu\n", newmap->epoch);
+               p = next;
+       }
+       if (newmap) 
+               goto out;
+       
+       /* full maps */
+       if ((err = ceph_decode_32(&p, end, &nr_maps)) < 0)
+               goto bad;
+       while (nr_maps > 1) {
+               dout(5, "ignoring non-latest full map\n");
+               if ((err = ceph_decode_32(&p, end, &maplen)) < 0)
+                       goto bad;
+               p += maplen;
+       }
+       if (nr_maps) {
+               if ((err = ceph_decode_32(&p, end, &maplen)) < 0)
+                       goto bad;
+               newmap = osdmap_decode(&p, min(p+maplen,end));
+               if (IS_ERR(newmap)) {
+                       err = PTR_ERR(newmap);
+                       goto bad;
+               }
+               osdmap_destroy(osdc->osdmap);
+               osdc->osdmap = newmap;
+               dout(1, "got full map %llu\n", newmap->epoch);
+       }
+       
+out:
        ceph_msg_put(msg);
+       return;
+
+bad:
+       derr(1, "corrupt osd map message\n");
+       goto out;
 }
 
 
+
+void ceph_osdc_init(struct ceph_osd_client *osdc)
+{
+       osdc->osdmap = NULL;
+}
+
+
+
index 33ea1494728e72c66fb565e8d640ad0040711e74..08f8300c2e08dd09103741ba34435ab10f1b5d2b 100644 (file)
@@ -54,12 +54,12 @@ class MOSDMap : public Message {
   // marshalling
   virtual void decode_payload() {
     int off = 0;
-    ::_decode(maps, payload, off);
     ::_decode(incremental_maps, payload, off);
+    ::_decode(maps, payload, off);
   }
   virtual void encode_payload() {
-    ::_encode(maps, payload);
     ::_encode(incremental_maps, payload);
+    ::_encode(maps, payload);
   }
 
   virtual char *get_type_name() { return "omap"; }