]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdmap/crush decoding works; cleaning up mount wakeup, but still need monmap
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 30 Nov 2007 06:12:17 +0000 (06:12 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 30 Nov 2007 06:12:17 +0000 (06:12 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2155 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/kernel/client.c
trunk/ceph/kernel/mds_client.c
trunk/ceph/kernel/mdsmap.c
trunk/ceph/kernel/mdsmap.h
trunk/ceph/kernel/osd_client.c
trunk/ceph/kernel/osd_client.h

index b64bda1124621a94dab011101978b1cbbccedeec..a397bcce40f3f27d2704a76d13974ad18cf85ec4 100644 (file)
@@ -98,7 +98,7 @@ static int mount(struct ceph_client *client, struct ceph_mount_args *args)
        int which;
        char r;
        
-       client->mounting = 06;  /* FIXME don't wait for osd map, for now */
+       client->mounting = 07;  /* wait for mon+mds+osd */
 
        /* send mount request */
 trymount:
@@ -158,10 +158,6 @@ static void handle_mon_map(struct ceph_client *client, struct ceph_msg *msg)
                client->whoami = msg->hdr.dst.name.num;
                client->msgr->inst.name = msg->hdr.dst.name;
        }
-
-       clear_bit(4, &client->mounting);
-       if (client->mounting == 0)
-               wake_up(&client->mount_wq);
 }
 
 
@@ -224,7 +220,13 @@ void ceph_put_client(struct ceph_client *cl)
 }
 
 
-
+void got_first_map(struct ceph_client *client, int type)
+{
+       dout(10, "got_first_map type %d\n", type);
+       clear_bit(type, &client->mounting);
+       if (client->mounting == 0)
+               wake_up(&client->mount_wq);
+}
 
 
 /*
@@ -234,6 +236,8 @@ void ceph_put_client(struct ceph_client *cl)
  */
 void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg)
 {
+       int had;
+
        dout(5, "dispatch from %s%d type %d len %d+%d\n",
             ceph_name_type_str(msg->hdr.src.name.type), msg->hdr.src.name.num,
             msg->hdr.type, msg->hdr.front_len, msg->hdr.data_len);
@@ -242,12 +246,18 @@ void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg)
        switch (msg->hdr.type) {
                /* me */
        case CEPH_MSG_MON_MAP:
+               had = client->monc.monmap.epoch ? 1:0;
                handle_mon_map(client, msg);
+               if (!had && client->monc.monmap.epoch)
+                       got_first_map(client, 4);
                break;
 
                /* mds client */
        case CEPH_MSG_MDS_MAP:
+               had = client->mdsc.mdsmap ? 1:0;
                ceph_mdsc_handle_map(&client->mdsc, msg);
+               if (!had && client->mdsc.mdsmap) 
+                       got_first_map(client, 2);
                break;
        case CEPH_MSG_CLIENT_REPLY:
                ceph_mdsc_handle_reply(&client->mdsc, msg);
@@ -258,7 +268,10 @@ void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg)
 
                /* osd client */
        case CEPH_MSG_OSD_MAP:
+               had = client->osdc.osdmap ? 1:0;
                ceph_osdc_handle_map(&client->osdc, msg);
+               if (!had && client->osdc.osdmap) 
+                       got_first_map(client, 1);
                break;
        case CEPH_MSG_OSD_OPREPLY:
                ceph_osdc_handle_reply(&client->osdc, msg);
index 8604b0c22aa2ed193c6e305b80e854b0f08f6cbd..63e4464e5bd804d5ec4e801363e76fbf65f691ed 100644 (file)
@@ -632,24 +632,24 @@ bad:
 
 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 {
-       __u64 epoch;
+       ceph_epoch_t epoch;
        __u32 maplen;
        int err;
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
        struct ceph_mdsmap *newmap, *oldmap;
 
-       if ((err = ceph_decode_64(&p, end, &epoch)) != 0)
+       if ((err = ceph_decode_32(&p, end, &epoch)) != 0)
                goto bad;
        if ((err = ceph_decode_32(&p, end, &maplen)) != 0)
                goto bad;
 
-       dout(2, "ceph_mdsc_handle_map epoch %llu len %d\n", epoch, (int)maplen);
+       dout(2, "ceph_mdsc_handle_map epoch %u len %d\n", epoch, (int)maplen);
 
        /* do we need it? */
        spin_lock(&mdsc->lock);
        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
-               dout(2, "ceph_mdsc_handle_map epoch %llu < our %llu\n", 
+               dout(2, "ceph_mdsc_handle_map epoch %u < our %u\n", 
                     epoch, mdsc->mdsmap->m_epoch);
                spin_unlock(&mdsc->lock);
                goto out;
@@ -678,9 +678,6 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
        } else {
                mdsc->mdsmap = newmap;
                spin_unlock(&mdsc->lock);
-               clear_bit(2, &mdsc->client->mounting);
-               if (mdsc->client->mounting == 0)
-                       wake_up(&mdsc->client->mount_wq);
        }
        complete(&mdsc->map_waiters);
 
index 35ecc7a5bb40c4a56e72d6a38c16e868cade2f87..68dea0c6188f46e53df7ee21afa72d08847fb48c 100644 (file)
@@ -57,9 +57,9 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
        if (m == NULL) 
                return ERR_PTR(-ENOMEM);
 
-       if ((err = ceph_decode_64(p, end, &m->m_epoch)) != 0)
+       if ((err = ceph_decode_32(p, end, &m->m_epoch)) != 0)
                goto bad;
-       if ((err = ceph_decode_64(p, end, &m->m_client_epoch)) != 0)
+       if ((err = ceph_decode_32(p, end, &m->m_client_epoch)) != 0)
                goto bad;
        if ((err = ceph_decode_32(p, end, &m->m_created.tv_sec)) != 0)
                goto bad;
@@ -106,7 +106,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
        }
 
        /* ok, we don't care about the rest. */
-       dout(30, "mdsmap_decode success epoch %llu\n", m->m_epoch);
+       dout(30, "mdsmap_decode success epoch %u\n", m->m_epoch);
        return m;
 
 bad:
index d4ccb1fdf6a0b7f9ef4919819b10632a7d61805e..5911ec83955f99a43b54fed22769cca467f48081 100644 (file)
@@ -26,7 +26,7 @@
  * fields limited to those the client cares about
  */
 struct ceph_mdsmap {
-       __u64 m_epoch, m_client_epoch;
+       ceph_epoch_t m_epoch, m_client_epoch;
        struct ceph_timeval m_created;
        __u32 m_anchortable;
        __u32 m_root;
index c5fa8f0f7c150e167b62869844194d6976a17f5f..fc1c6a610dab226c4a040d024c74f7257f7d8bcf 100644 (file)
@@ -8,7 +8,7 @@
 
 /* maps */
 
-static int calc_bits_of(int t) 
+static int calc_bits_of(unsigned t) 
 {
        int b = 0;
        while (t) {
@@ -27,6 +27,7 @@ static void calc_pg_masks(struct ceph_osdmap *map)
 static int crush_decode_uniform_bucket(void **p, void *end, struct crush_bucket_uniform *b)
 {
        int j, err;
+       dout(30, "crush_decode_uniform_bucket %p to %p\n", *p, end);
        b->primes = kmalloc(b->h.size * sizeof(__u32), GFP_KERNEL);
        if (b->primes == NULL)
                return -ENOMEM;
@@ -41,6 +42,7 @@ static int crush_decode_uniform_bucket(void **p, void *end, struct crush_bucket_
 static int crush_decode_list_bucket(void **p, void *end, struct crush_bucket_list *b)
 {
        int j, err;
+       dout(30, "crush_decode_list_bucket %p to %p\n", *p, end);
        b->item_weights = kmalloc(b->h.size * sizeof(__u32), GFP_KERNEL);
        if (b->item_weights == NULL)
                return -ENOMEM;
@@ -59,6 +61,7 @@ static int crush_decode_list_bucket(void **p, void *end, struct crush_bucket_lis
 static int crush_decode_tree_bucket(void **p, void *end, struct crush_bucket_tree *b)
 {
        int j, err;
+       dout(30, "crush_decode_tree_bucket %p to %p\n", *p, end);
        b->node_weights = kmalloc(b->h.size * sizeof(__u32), GFP_KERNEL);
        if (b->node_weights == NULL)
                return -ENOMEM;
@@ -71,6 +74,7 @@ static int crush_decode_tree_bucket(void **p, void *end, struct crush_bucket_tre
 static int crush_decode_straw_bucket(void **p, void *end, struct crush_bucket_straw *b)
 {
        int j, err;
+       dout(30, "crush_decode_straw_bucket %p to %p\n", *p, end);
        b->straws = kmalloc(b->h.size * sizeof(__u32), GFP_KERNEL);
        if (b->straws == NULL)
                return -ENOMEM;
@@ -85,7 +89,10 @@ static struct crush_map *crush_decode(void **p, void *end)
        struct crush_map *c;
        int err = -EINVAL;
        int i, j;
-       
+       void *start = *p;
+
+       dout(30, "crush_decode %p to %p\n", *p, end);
+
        c = kzalloc(sizeof(*c), GFP_KERNEL);
        if (c == NULL)
                return ERR_PTR(-ENOMEM);
@@ -96,6 +103,10 @@ static struct crush_map *crush_decode(void **p, void *end)
                goto bad;
        if ((err = ceph_decode_32(p, end, &c->max_devices)) < 0)
                goto bad;
+       dout(30, "max-devices %d, max buckets %d, rules %d\n", c->max_devices, c->max_buckets, c->max_rules);
+
+
+       dout(30, "crush_decode 1 %x %p to %p\n", (int)(*p-start), *p, end);
 
        c->device_offload = kmalloc(c->max_devices * sizeof(__u32), GFP_KERNEL);
        if (c->device_offload == NULL) 
@@ -107,25 +118,38 @@ static struct crush_map *crush_decode(void **p, void *end)
        if (c->bucket_parents == NULL) 
                goto badmem;
 
-       c->buckets = kzalloc(c->max_buckets * sizeof(*c->buckets), GFP_KERNEL);
+       dout(30, "crush_decode 2 %x %p to %p\n", (int)(*p-start), *p, end);
+
+       c->buckets = kmalloc(c->max_buckets * sizeof(*c->buckets), GFP_KERNEL);
        if (c->buckets == NULL) 
                goto badmem;
-       c->rules = kzalloc(c->max_rules * sizeof(*c->rules), GFP_KERNEL);
+       c->rules = kmalloc(c->max_rules * sizeof(*c->rules), GFP_KERNEL);
        if (c->rules == NULL)
                goto badmem;
 
+       dout(30, "crush_decode 3 %x %p to %p\n", (int)(*p-start), *p, end);
+
        for (i=0; i<c->max_devices; i++)
                if ((err = ceph_decode_32(p, end, &c->device_offload[i])) < 0)
                        goto bad;
 
+       dout(30, "crush_decode 5 %x %p to %p\n", (int)(*p-start), *p, end);
+
        /* buckets */
        for (i=0; i<c->max_buckets; i++) {
                int size = 0;
                __u32 type;
                struct crush_bucket *b;
 
+               dout(30, "crush_decode bucket %d off %x %p to %p\n", i, (int)(*p-start), *p, end);
+
                if ((err = ceph_decode_32(p, end, &type)) < 0)
                        goto bad;
+               dout(30, "crush_decode type %d\n", type);
+               if (type == 0) {
+                       c->buckets[i] = 0;
+                       continue;
+               }
 
                switch (type) {
                case CRUSH_BUCKET_UNIFORM:
@@ -157,6 +181,8 @@ static struct crush_map *crush_decode(void **p, void *end)
                if ((err = ceph_decode_32(p, end, &b->size)) < 0)
                        goto bad;
 
+               dout(30, "crush_decode bucket size %d off %x %p to %p\n", b->size, (int)(*p-start), *p, end);
+
                b->items = kmalloc(b->size * sizeof(__s32), GFP_KERNEL);
                if (b->items == NULL)
                        goto badmem;
@@ -191,35 +217,45 @@ static struct crush_map *crush_decode(void **p, void *end)
        /* rules */
        for (i=0; i<c->max_rules; i++) {
                __u32 yes;
+               struct crush_rule *r;
+
+               dout(30, "crush_decode rule %d off %x %p to %p\n", i, (int)(*p-start), *p, end);
+
                if ((err = ceph_decode_32(p, end, &yes)) < 0)
                        goto bad;
+               dout(30, "crush_decode yes = %d off %x %p to %p\n", yes, (int)(*p-start), *p, end);
                if (!yes) {
                        c->rules[i] = 0;
                        continue;
                }
-               c->rules[i] = kmalloc(sizeof(**c->rules), GFP_KERNEL);
-               if (c->rules[i] == NULL)
-                       goto badmem;
 
-               if ((err = ceph_decode_32(p, end, &c->rules[i]->len)) < 0)
+               if ((err = ceph_decode_32(p, end, &yes)) < 0)
                        goto bad;
-               for (j=0; j<c->rules[i]->len; j++) {
-                       if ((err = ceph_decode_32(p, end, &c->rules[i]->steps[j].op)) < 0)
+               dout(30, "crush_decode len = %d off %x %p to %p\n", yes, (int)(*p-start), *p, end);
+
+               r = c->rules[i] = kmalloc(sizeof(**c->rules) + yes*sizeof(struct crush_rule_step),
+                                         GFP_KERNEL);
+               if (r == NULL)
+                       goto badmem;
+               r->len = yes;
+               for (j=0; j<r->len; j++) {
+                       if ((err = ceph_decode_32(p, end, &r->steps[j].op)) < 0)
                                goto bad;
-                       if ((err = ceph_decode_32(p, end, &c->rules[i]->steps[j].arg1)) < 0)
+                       if ((err = ceph_decode_32(p, end, &r->steps[j].arg1)) < 0)
                                goto bad;
-                       if ((err = ceph_decode_32(p, end, &c->rules[i]->steps[j].arg2)) < 0)
+                       if ((err = ceph_decode_32(p, end, &r->steps[j].arg2)) < 0)
                                goto bad;
                }
        }
 
        
-
+       dout(30, "crush_decode done\n");
        return c;
        
 badmem:
        err = -ENOMEM;
 bad:
+       dout(30, "crush_decode fail %d\n", err);
        crush_destroy(c);
        return ERR_PTR(err);
 }
@@ -261,8 +297,11 @@ static struct ceph_osdmap *osdmap_decode(void **p, void *end)
        struct ceph_osdmap *map;
        __u32 crushlen, max;
        int err;
+       void *start = *p;
+
+       dout(30, "osdmap_decode from %p to %p\n", *p, end);
 
-       map = kmalloc(sizeof(*map), GFP_KERNEL);
+       map = kzalloc(sizeof(*map), GFP_KERNEL);
        if (map == NULL) 
                return ERR_PTR(-ENOMEM);
 
@@ -278,29 +317,38 @@ static struct ceph_osdmap *osdmap_decode(void **p, void *end)
                goto bad;
        if ((err = ceph_decode_32(p, end, &map->ctime.tv_usec)) < 0)
                goto bad;
+       if ((err = ceph_decode_32(p, end, &map->mtime.tv_sec)) < 0)
+               goto bad;
+       if ((err = ceph_decode_32(p, end, &map->mtime.tv_usec)) < 0)
+               goto bad;
 
        if ((err = ceph_decode_32(p, end, &map->pg_num)) < 0)
                goto bad;
        if ((err = ceph_decode_32(p, end, &map->localized_pg_num)) < 0)
                goto bad;
+
        calc_pg_masks(map);
 
        if ((err = ceph_decode_32(p, end, &max)) < 0)
                goto bad;
 
-       /* alloc */
+       /* (re)alloc osd arrays */
        if ((err = osdmap_set_max_osd(map, max)) < 0)
            goto bad;
+       dout(30, "osdmap_decode max_osd = %d\n", map->max_osd);
        
        /* osds */
-       if ((err = ceph_decode_copy(p, end, &map->osd_state, map->max_osd)) < 0)
+       *p += 4; /* skip length field (should match max) */
+       if ((err = ceph_decode_copy(p, end, map->osd_state, map->max_osd)) < 0)
                goto bad;
-       if ((err = ceph_decode_copy(p, end, &map->osd_addr, map->max_osd*sizeof(*map->osd_addr))) < 0)
+       *p += 4; /* skip length field (should match max) */
+       if ((err = ceph_decode_copy(p, end, map->osd_addr, map->max_osd*sizeof(*map->osd_addr))) < 0)
                goto bad;
 
        /* crush */
        if ((err = ceph_decode_32(p, end, &crushlen)) < 0)
                goto bad;
+       dout(30, "osdmap_decode crush len %d from off %x\n", crushlen, (int)(*p - start));
        map->crush = crush_decode(p, end);
        if (IS_ERR(map->crush)) {
                err = PTR_ERR(map->crush);
@@ -308,9 +356,13 @@ static struct ceph_osdmap *osdmap_decode(void **p, void *end)
                goto bad;
        }
 
+       dout(30, "osdmap_decode done %p %p\n", *p, end);
+       BUG_ON(*p < end);
+
        return map;
 
 bad:
+       dout(30, "osdmap_decode fail\n");
        osdmap_destroy(map);
        return ERR_PTR(err);
 }
@@ -447,7 +499,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                        goto bad;
                next = p + maplen;
                if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
-                       dout(10, "applying incremental map %llu len %d\n", epoch, maplen);
+                       dout(10, "applying incremental map %u len %d\n", epoch, maplen);
                        newmap = apply_incremental(p, min(p+maplen,end), osdc->osdmap);
                        if (IS_ERR(newmap)) {
                                err = PTR_ERR(newmap);
@@ -458,7 +510,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                                osdc->osdmap = newmap;
                        }
                } else {
-                       dout(10, "ignoring incremental map %llu len %d\n", epoch, maplen);
+                       dout(10, "ignoring incremental map %u len %d\n", epoch, maplen);
                }
                p = next;
        }
@@ -466,33 +518,28 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                goto out;
        
        /* full maps */
-       dout(10, " at %p of %p offset %d\n", p, end, (int)(p - msg->front.iov_base));
        if ((err = ceph_decode_32(&p, end, &nr_maps)) < 0)
                goto bad;
-       dout(10, " at %p of %p offset %d\n", p, end, (int)(p - msg->front.iov_base));
        dout(30, " %d full maps\n", nr_maps);
        while (nr_maps > 1) {
                if ((err = ceph_decode_32(&p, end, &epoch)) < 0)
                        goto bad;
                if ((err = ceph_decode_32(&p, end, &maplen)) < 0)
                        goto bad;
-               dout(5, "skipping non-latest full map %lld len %d\n", epoch, maplen);
+               dout(5, "skipping non-latest full map %u len %d\n", epoch, maplen);
                p += maplen;
        }
        if (nr_maps) {
                if ((err = ceph_decode_32(&p, end, &epoch)) < 0)
                        goto bad;
-       dout(10, " at %p of %p offset %d\n", p, end, (int)(p - msg->front.iov_base));
-       dout(10, "got %llu\n", epoch);
                if ((err = ceph_decode_32(&p, end, &maplen)) < 0)
                        goto bad;
-       dout(10, " at %p of %p offset %d\n", p, end, (int)(p - msg->front.iov_base));
                if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
-                       dout(10, "skipping full map %llu len %d, older than our %llu\n", 
+                       dout(10, "skipping full map %u len %d, older than our %u\n", 
                             epoch, maplen, osdc->osdmap->epoch);
                        p += maplen;
                } else {
-                       dout(10, "taking full map %llu len %d\n", epoch, maplen);
+                       dout(10, "taking full map %u len %d\n", epoch, maplen);
                        newmap = osdmap_decode(&p, min(p+maplen,end));
                        if (IS_ERR(newmap)) {
                                err = PTR_ERR(newmap);
index c8b1df80c8380739451b1e9d4f67e77732bc5d10..598d58bae0ac1a017eddfa27e6c89281956f2e70 100644 (file)
@@ -11,8 +11,8 @@ struct ceph_msg;
 
 struct ceph_osdmap {
        struct ceph_fsid fsid;
-       __u64 epoch;
-       __u64 mon_epoch;
+       ceph_epoch_t epoch;
+       ceph_epoch_t mon_epoch;
        struct ceph_timeval ctime, mtime;
        
        __u32 pg_num, pg_num_mask;