From 0631674bd5fb87d5b679680d4cf193c44a2c76fc Mon Sep 17 00:00:00 2001
From: sageweil
Date: Tue, 20 Nov 2007 19:36:45 +0000
Subject: [PATCH] osd map decoding in kernel client

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2098 29311d96-e01e-0410-9327-a35deaab8ce9
---
 trunk/ceph/TODO                |   1 -
 trunk/ceph/kernel/osd_client.c | 285 ++++++++++++++++++++++++++++++---
 trunk/ceph/messages/MOSDMap.h  |   4 +-
 3 files changed, 269 insertions(+), 21 deletions(-)

diff --git a/trunk/ceph/TODO b/trunk/ceph/TODO
index 9babaf1d210f8..6cc4b221019d1 100644
--- a/trunk/ceph/TODO
+++ b/trunk/ceph/TODO
@@ -74,7 +74,6 @@ mds
 - efficient stat for single writers
 - lstat vs stat?
 - add FILE_CAP_EXTEND capability bit
-- only share osdmap updates with clients holding capabilities
 - delayed replica caps release... we need to set a timer event? (and cancel it when appropriate?)
 
 
diff --git a/trunk/ceph/kernel/osd_client.c b/trunk/ceph/kernel/osd_client.c
index 53ec45273bf7a..e4c88a5da1240 100644
--- a/trunk/ceph/kernel/osd_client.c
+++ b/trunk/ceph/kernel/osd_client.c
@@ -41,17 +41,60 @@ bad:
 	return ERR_PTR(err);
 }
 
-void osdmap_destroy(struct ceph_osdmap *map)
+static void osdmap_destroy(struct ceph_osdmap *map)
 {
 	if (map->osd_state) kfree(map->osd_state);
 	if (map->crush) kfree(map->crush);
 	kfree(map);
 }
 
+/*
+ * Resize the per-osd arrays of @map so they can hold @max entries.
+ *
+ * The state bytes and the address array share one allocation: the
+ * state bytes come first and the addresses start max*sizeof(__u32)
+ * bytes in.  Old entries that still fit are carried over; the rest
+ * of the new arrays is left zeroed by kzalloc.
+ *
+ * Returns 0 on success, -EINVAL if @max is negative or too large to
+ * size the allocation, or -ENOMEM if the allocation fails.
+ */
+static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
+{
+	const size_t per_osd = sizeof(__u32) + sizeof(struct ceph_entity_addr);
+	__u8 *state;
+	struct ceph_entity_addr *addr;
+	int keep;
+
+	/* max may come off the wire; don't let the size computation wrap */
+	if (max < 0 || max > INT_MAX / per_osd)
+		return -EINVAL;
+
+	state = kzalloc(max * per_osd, GFP_KERNEL);
+	if (state == NULL)
+		return -ENOMEM;
+	addr = (void*)((__u32*)state + max);
+
+	/* copy old?  only the entries that exist in both old and new */
+	if (map->osd_state) {
+		keep = map->max_osd;
+		if (keep > max)
+			keep = max;
+		memcpy(state, map->osd_state, keep * sizeof(*state));
+		memcpy(addr, map->osd_addr, keep * sizeof(*addr));
+		kfree(map->osd_state);
+	}
+
+	map->osd_state = state;
+	map->osd_addr = addr;
+	map->max_osd = max;
+	return 0;
+}
+
 static struct ceph_osdmap *osdmap_decode(void **p, void *end)
 {
 	struct ceph_osdmap *map;
-	__u32 crushlen;
+	__u32 crushlen, max;
 	int err;
 
 	map = kmalloc(sizeof(*map), GFP_KERNEL);
@@ -77,23 +120,18 @@ static struct ceph_osdmap *osdmap_decode(void **p, void *end)
 		goto bad;
 	calc_pg_masks(map);
 
-	if ((err = ceph_decode_32(p, end, &map->max_osd)) < 0)
+	if ((err = ceph_decode_32(p, end, &max)) < 0)
 		goto bad;
 
 	/* alloc */
-	map->osd_state = kmalloc(map->max_osd * (sizeof(__u32)*2 +
-						 sizeof(struct ceph_entity_addr)),
-				 GFP_KERNEL);
-	if (map->osd_state == NULL)
-		goto bad;
-	map->osd_offload = (void*)((__u32*)map->osd_state + map->max_osd);
-	map->osd_addr = (void*)(map->osd_offload + map->max_osd);
+	/* map was kmalloc'ed; there are no old arrays to carry over */
+	map->osd_state = NULL;
+	if ((err = osdmap_set_max_osd(map, max)) < 0)
+		goto bad;
 
 	/* osds */
-	if ((err = ceph_decode_copy(p, end, &map->osd_state, map->max_osd)) < 0)
+	if ((err = ceph_decode_copy(p, end, map->osd_state, map->max_osd)) < 0)
 		goto bad;
-	if ((err = ceph_decode_copy(p, end, &map->osd_offload, map->max_osd*sizeof(*map->osd_offload))) < 0)
-		goto bad;
-	if ((err = ceph_decode_copy(p, end, &map->osd_addr, map->max_osd*sizeof(*map->osd_addr))) < 0)
+	if ((err = ceph_decode_copy(p, end, map->osd_addr, map->max_osd*sizeof(*map->osd_addr))) < 0)
 		goto bad;
 
@@ -114,18 +152,229 @@ bad:
 	return ERR_PTR(err);
 }
 
+/*
+ * Decode one incremental osdmap from *p and apply it.
+ *
+ * An incremental may embed a complete map; in that case the freshly
+ * decoded map (or the decode error) is returned and @map is left
+ * untouched.  Otherwise the changes are applied to @map in place, but
+ * only when the incremental is for the epoch right after map->epoch.
+ *
+ * Returns the resulting map (a new one, or @map itself), NULL if the
+ * incremental does not apply (no current map, or wrong epoch), or an
+ * ERR_PTR if decoding or allocation fails.
+ */
+static struct ceph_osdmap *apply_incremental(void **p, void *end, struct ceph_osdmap *map)
+{
+	struct crush_map *newcrush = NULL;
+	struct ceph_fsid fsid;
+	__u64 epoch, mon_epoch;
+	struct ceph_timeval ctime;
+	__u32 len;
+	__u32 max;
+	int err;
+
+	if ((err = ceph_decode_64(p, end, &fsid.major)) < 0)
+		goto bad;
+	if ((err = ceph_decode_64(p, end, &fsid.minor)) < 0)
+		goto bad;
+	if ((err = ceph_decode_64(p, end, &epoch)) < 0)
+		goto bad;
+	if ((err = ceph_decode_64(p, end, &mon_epoch)) < 0)
+		goto bad;
+	if ((err = ceph_decode_32(p, end, &ctime.tv_sec)) < 0)
+		goto bad;
+	if ((err = ceph_decode_32(p, end, &ctime.tv_usec)) < 0)
+		goto bad;
+
+	/* full map? */
+	if ((err = ceph_decode_32(p, end, &len)) < 0)
+		goto bad;
+	if (len > 0)
+		return osdmap_decode(p, min(*p+len, end));  /* error or not */
 
+	if (!map || epoch != map->epoch+1)
+		return NULL;  /* old or new, or no existing; done */
 
+	/* new crush? */
+	if ((err = ceph_decode_32(p, end, &len)) < 0)
+		goto bad;
+	if (len > 0) {
+		newcrush = crush_decode(p, min(*p+len, end));
+		if (IS_ERR(newcrush))
+			return ERR_PTR(PTR_ERR(newcrush));
+	}
 
+	/* FIXME: from this point on I'm optimistically assuming the message is complete */
 
-void ceph_osdc_init(struct ceph_osd_client *osdc)
-{
-	osdc->osdmap = NULL;
+	/* new max? */
+	if ((err = ceph_decode_32(p, end, &max)) < 0)
+		goto bad;
+	if (max > 0) {
+		if ((err = osdmap_set_max_osd(map, max)) < 0)
+			goto bad;
+	}
+
+	map->epoch++;
+	map->mon_epoch = mon_epoch;
+	map->ctime = ctime;
+	if (newcrush) {
+		if (map->crush)
+			crush_destroy(map->crush);
+		map->crush = newcrush;
+		newcrush = NULL;  /* now owned by the map */
+	}
+
+	/* new_up */
+	if ((err = ceph_decode_32(p, end, &len)) < 0)
+		goto bad;
+	while (len--) {
+		__u32 osd;
+		struct ceph_entity_addr addr;
+		if ((err = ceph_decode_32(p, end, &osd)) < 0)
+			goto bad;
+		if ((err = ceph_decode_addr(p, end, &addr)) < 0)
+			goto bad;
+		dout(1, "osd%d up\n", osd);
+		if (osd >= map->max_osd) {
+			/* bad index off the wire; fail the decode, don't BUG */
+			err = -EINVAL;
+			goto bad;
+		}
+		map->osd_state[osd] |= CEPH_OSD_UP;
+		map->osd_addr[osd] = addr;
+	}
+
+	/* new_down */
+	if ((err = ceph_decode_32(p, end, &len)) < 0)
+		goto bad;
+	while (len--) {
+		__u32 osd;
+		if ((err = ceph_decode_32(p, end, &osd)) < 0)
+			goto bad;
+		dout(1, "osd%d down\n", osd);
+		if (osd < map->max_osd)
+			map->osd_state[osd] &= ~CEPH_OSD_UP;
+	}
+
+	/* new_offload */
+	if ((err = ceph_decode_32(p, end, &len)) < 0)
+		goto bad;
+	while (len--) {
+		__u32 osd, off;
+		if ((err = ceph_decode_32(p, end, &osd)) < 0)
+			goto bad;
+		if ((err = ceph_decode_32(p, end, &off)) < 0)
+			goto bad;
+		dout(1, "osd%d offload %x\n", osd, off);
+		if (osd < map->max_osd)
+			map->crush->device_offload[osd] = off;
+	}
+
+	return map;
+
+bad:
+	/* a crush map we decoded but never installed is still ours to free */
+	if (newcrush)
+		crush_destroy(newcrush);
+	return ERR_PTR(err);
 }
 
+/*
+ * Handle an incoming osdmap message.
+ *
+ * The payload is a list of incremental maps followed by a list of full
+ * maps, each entry prefixed by its byte length.  Incrementals are
+ * applied in order to osdc->osdmap; unless the last one yielded a map,
+ * we then fall back to the last full map in the message.  The message
+ * reference is dropped on every path.
+ */
 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 {
-	dout(1, "ceph_osdc_handle_map - implement me\n");
+	void *p, *end, *next;
+	__u32 nr_maps, maplen;
+	struct ceph_osdmap *newmap = NULL;
+	int err;
+
+	dout(1, "ceph_osdc_handle_map\n");
+	p = msg->front.iov_base;
+	end = p + msg->front.iov_len;
+
+	/* incremental maps */
+	if ((err = ceph_decode_32(&p, end, &nr_maps)) < 0)
+		goto bad;
+	while (nr_maps--) {
+		if ((err = ceph_decode_32(&p, end, &maplen)) < 0)
+			goto bad;
+		if (maplen > end - p) {
+			err = -EINVAL;
+			goto bad;
+		}
+		next = p + maplen;
+		newmap = apply_incremental(&p, next, osdc->osdmap);
+		if (IS_ERR(newmap)) {
+			err = PTR_ERR(newmap);
+			goto bad;
+		}
+		if (newmap == NULL) {
+			/* didn't apply (no map yet, or wrong epoch); skip it */
+			p = next;
+			continue;
+		}
+		if (newmap != osdc->osdmap) {
+			if (osdc->osdmap)
+				osdmap_destroy(osdc->osdmap);
+			osdc->osdmap = newmap;
+		}
+		dout(1, "got incremental map %llu\n", newmap->epoch);
+		p = next;
+	}
+	if (newmap)
+		goto out;
+
+	/* full maps */
+	if ((err = ceph_decode_32(&p, end, &nr_maps)) < 0)
+		goto bad;
+	while (nr_maps > 1) {
+		dout(5, "ignoring non-latest full map\n");
+		if ((err = ceph_decode_32(&p, end, &maplen)) < 0)
+			goto bad;
+		if (maplen > end - p) {
+			err = -EINVAL;
+			goto bad;
+		}
+		p += maplen;
+		nr_maps--;
+	}
+	if (nr_maps) {
+		if ((err = ceph_decode_32(&p, end, &maplen)) < 0)
+			goto bad;
+		newmap = osdmap_decode(&p, min(p+maplen,end));
+		if (IS_ERR(newmap)) {
+			err = PTR_ERR(newmap);
+			goto bad;
+		}
+		if (osdc->osdmap)
+			osdmap_destroy(osdc->osdmap);
+		osdc->osdmap = newmap;
+		dout(1, "got full map %llu\n", newmap->epoch);
+	}
+
+out:
 	ceph_msg_put(msg);
+	return;
+
+bad:
+	derr(1, "corrupt osd map message\n");
+	goto out;
 }
+
+/* Initialize @osdc with no osdmap. */
+void ceph_osdc_init(struct ceph_osd_client *osdc)
+{
+	osdc->osdmap = NULL;
+}
+
+
+
 
diff --git a/trunk/ceph/messages/MOSDMap.h b/trunk/ceph/messages/MOSDMap.h
index 33ea1494728e7..08f8300c2e08d 100644
--- a/trunk/ceph/messages/MOSDMap.h
+++ b/trunk/ceph/messages/MOSDMap.h
@@ -54,12 +54,12 @@ class MOSDMap : public Message {
   // marshalling
   virtual void decode_payload() {
     int off = 0;
-    ::_decode(maps, payload, off);
     ::_decode(incremental_maps, payload, off);
+    ::_decode(maps, payload, off);
   }
   virtual void encode_payload() {
-    ::_encode(maps, payload);
     ::_encode(incremental_maps, payload);
+    ::_encode(maps, payload);
   }
 
   virtual char *get_type_name() { return "omap"; }
-- 
2.39.5