]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: osdmap decoding bugfix; request osdmaps when pg has no primary
authorSage Weil <sage@newdream.net>
Wed, 2 Apr 2008 23:06:14 +0000 (16:06 -0700)
committerSage Weil <sage@newdream.net>
Wed, 2 Apr 2008 23:06:14 +0000 (16:06 -0700)
src/kernel/addr.c
src/kernel/client.c
src/kernel/mon_client.c
src/kernel/mon_client.h
src/kernel/osd_client.c
src/kernel/osdmap.c

index 5bc3ec282558d78e6c28c6af8e406679af23478a..ef35e941d155a5af30d4c4c052a2c41e3a3d15e3 100644 (file)
@@ -208,7 +208,6 @@ retry:
                
                for (i = 0; i < nr_pages; i++) {
                        page = pvec.pages[i];
-                       dout(30, "trying page %p\n", page);
                        if (first < 0)
                                lock_page(page);
                        else if (TestSetPageLocked(page))
@@ -243,8 +242,10 @@ retry:
                                end_page_writeback(page);
                                break;
                        }
+                       /*
                        dout(20, "writepages locked page %p index %lu\n",
                             page, page->index);
+                       */
                        kmap(page);
                        if (first < 0)
                                first = i;
index a549a4350952a003f71d5ac29c85f32021eedd5d..84cffe331d145bd4c0548c8d5a683b81d32b5830 100644 (file)
@@ -92,21 +92,25 @@ int ceph_mount(struct ceph_client *client, struct ceph_mount_args *args, struct
                mount_msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, 0, 0, 0, 0);
                if (IS_ERR(mount_msg))
                        return PTR_ERR(mount_msg);
-               mount_msg->hdr.dst.name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MON);
+               mount_msg->hdr.dst.name.type = 
+                       cpu_to_le32(CEPH_ENTITY_TYPE_MON);
                mount_msg->hdr.dst.name.num = cpu_to_le32(which);
                mount_msg->hdr.dst.addr = args->mon_addr[which];
                
                ceph_msg_send(client->msgr, mount_msg, 0);
-               dout(10, "mount from mon%d, %d attempts left\n", which, attempts);
+               dout(10, "mount from mon%d, %d attempts left\n", 
+                    which, attempts);
                
                /* wait */
                dout(10, "mount sent mount request, waiting for maps\n");
-               err = wait_for_completion_timeout(&client->mount_completion, 6*HZ);
+               err = wait_for_completion_timeout(&client->mount_completion, 
+                                                 6*HZ);
                if (err == -EINTR)
                        return err; 
                if (client->mounting == 7) 
                        break;  /* success */
-               dout(10, "mount still waiting for mount, attempts=%d\n", attempts);
+               dout(10, "mount still waiting for mount, attempts=%d\n", 
+                    attempts);
                if (--attempts == 0)
                        return -EIO;
        }
@@ -146,7 +150,9 @@ static void handle_monmap(struct ceph_client *client, struct ceph_msg *msg)
        if (first) {
                client->whoami = le32_to_cpu(msg->hdr.dst.name.num);
                client->msgr->inst.name = msg->hdr.dst.name;
-               dout(1, "i am client%d\n", client->whoami);
+               dout(1, "i am client%d, fsid is %llx.%llx\n", client->whoami,
+                    le64_to_cpu(client->monc.monmap->fsid.major),
+                    le64_to_cpu(client->monc.monmap->fsid.minor));
        }
 }
 
index b13df90c1b559248628ff18c50acac2682f72e68..dced9d7715c0f184edd29a132531b5183fa63b19 100644 (file)
@@ -112,6 +112,41 @@ int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, __u32 have)
        }
 }
 
+int ceph_monc_request_osdmap(struct ceph_mon_client *monc,
+                            __u32 have, __u32 want)
+{
+       struct ceph_msg *msg;
+       int mon = pick_mon(monc, -1);
+       
+       dout(5, "ceph_monc_request_osdmap from mon%d have %u want %u\n", 
+            mon, have, want);
+       monc->want_mdsmap = have;
+       msg = ceph_msg_new(CEPH_MSG_OSD_GETMAP, 2*sizeof(__u32), 0, 0, 0);
+       if (IS_ERR(msg))
+               return PTR_ERR(msg);
+       *(__le32*)msg->front.iov_base = cpu_to_le32(have);
+       *(((__le32*)msg->front.iov_base)+1) = cpu_to_le32(want);
+       msg->hdr.dst = monc->monmap->mon_inst[mon];
+       ceph_msg_send(monc->client->msgr, msg, 0);
+       return 0;
+
+}
+
+int ceph_monc_got_osdmap(struct ceph_mon_client *monc, __u32 have)
+{
+       if (have > monc->want_osdmap) {
+               monc->want_osdmap = 0;
+               dout(5, "ceph_monc_got_osdmap have %u > wanted %u\n", 
+                    have, monc->want_osdmap);
+               return 0;
+       } else {
+               dout(5, "ceph_monc_got_osdmap have %u <= wanted %u *****\n", 
+                    have, monc->want_osdmap);
+               return -EAGAIN;
+       }
+}
+
+
 
 /*
  * statfs
index e272621a71ffe89c65ad9ac7300353e39e5306fe..19274b1438c161846d003d60533902e1ffd4f203 100644 (file)
@@ -32,6 +32,7 @@ struct ceph_mon_client {
        u64 last_tid;
 
        u32 want_mdsmap;  /* protected by caller's lock */
+       u32 want_osdmap;  /* protected by caller's lock */
 };
 
 extern struct ceph_monmap *ceph_monmap_decode(void *p, void *end);
@@ -42,8 +43,10 @@ extern int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl);
 extern int ceph_monc_request_mdsmap(struct ceph_mon_client *monc, __u32 have);
 extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, __u32 have);
 
+extern int ceph_monc_request_osdmap(struct ceph_mon_client *monc,
+                                   __u32 have, __u32 want);
+extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, __u32 have);
 
-extern void ceph_monc_request_osdmap(struct ceph_mon_client *monc, __u64 have);
 extern void ceph_monc_request_umount(struct ceph_mon_client *monc);
 extern void ceph_monc_report_failure(struct ceph_mon_client *monc, struct ceph_entity_inst *who);
 
index 604099bf427c738bd4b54040481ec3d63ba6f5cc..46ed855ed9d9db73d808555791114567aaa3d01d 100644 (file)
@@ -42,14 +42,14 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        struct ceph_osdmap *newmap = 0;
        int err;
 
-       dout(1, "handle_map\n");
+       dout(1, "handle_map, have %u\n", osdc->osdmap ? osdc->osdmap->epoch:0);
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;
 
        /* incremental maps */
        ceph_decode_32_safe(&p, end, nr_maps, bad);
        dout(10, " %d inc maps\n", nr_maps);
-       while (nr_maps--) {
+       while (nr_maps > 0) {
                ceph_decode_need(&p, end, 2*sizeof(__u32), bad);
                ceph_decode_32(&p, epoch);
                ceph_decode_32(&p, maplen);
@@ -68,9 +68,11 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                                osdc->osdmap = newmap;
                        }
                } else {
-                       dout(10, "ignoring incremental map %u len %d\n", epoch, maplen);
+                       dout(10, "ignoring incremental map %u len %d\n",
+                            epoch, maplen);
                }
                p = next;
+               nr_maps--;
        }
        if (newmap) 
                goto out;
@@ -86,6 +88,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                dout(5, "skipping non-latest full map %u len %d\n", 
                     epoch, maplen);
                p += maplen;
+               nr_maps--;
        }
        if (nr_maps) {
                ceph_decode_need(&p, end, 2*sizeof(__u32), bad);
@@ -111,6 +114,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        }
        dout(1, "handle_map done\n");
        
+       ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
+
        /* kick any pending requests that need kicking */
        /* WRITE ME */
 
@@ -229,7 +234,9 @@ static void send_request(struct ceph_osd_client *osdc,
                ceph_msg_get(req->r_request); /* send consumes a ref */
                ceph_msg_send(osdc->client->msgr, req->r_request, 0);
        } else {
-               dout(10, "send_request no osds in pg are up\n");
+               dout(10, "send_request no osds in this pg are up\n");
+               ceph_monc_request_osdmap(&osdc->client->monc, 
+                                        osdc->osdmap->epoch, 0);
        }
 }
 
index 7a1d6d4815848b7e80c639280e27e2e2bc7545e6..0bd042e90653f256d35add3c5dad5afa577d527c 100644 (file)
@@ -391,7 +391,7 @@ struct ceph_osdmap *apply_incremental(void **p, void *end,
        __u32 epoch;
        struct ceph_timespec ctime;
        __u32 len;
-       __u32 max;
+       __s32 max;
        int err = -EINVAL;
 
        ceph_decode_need(p, end, 2*sizeof(__u64)+4*sizeof(__u32), bad);
@@ -405,6 +405,8 @@ struct ceph_osdmap *apply_incremental(void **p, void *end,
        /* full map? */
        ceph_decode_32(p, len);
        if (len > 0) {
+               dout(20, "apply_incremental full map len %d, %p to %p\n",
+                    len, *p, end);
                newmap = osdmap_decode(p, min(*p+len, end));
                return newmap;  /* error or not */
        }
@@ -412,6 +414,8 @@ struct ceph_osdmap *apply_incremental(void **p, void *end,
        /* new crush? */
        ceph_decode_32_safe(p, end, len, bad);
        if (len > 0) {
+               dout(20, "apply_incremental new crush map len %d, %p to %p\n",
+                    len, *p, end);
                newcrush = crush_decode(p, min(*p+len, end));
                if (IS_ERR(newcrush))
                        return ERR_PTR(PTR_ERR(newcrush));
@@ -426,7 +430,7 @@ struct ceph_osdmap *apply_incremental(void **p, void *end,
        ceph_decode_need(p, end, 3*sizeof(__u32), bad);
        ceph_decode_32(p, max);
        *p += 4*sizeof(__u32);  /* skip new_pg_num et al for now.  FIXME. */
-       if (max > 0) {
+       if (max >= 0) {
                if ((err = osdmap_set_max_osd(map, max)) < 0)
                        goto bad;
        }
@@ -474,7 +478,15 @@ struct ceph_osdmap *apply_incremental(void **p, void *end,
                if (osd < map->max_osd) 
                        map->crush->device_offload[osd] = off;
        }
-       
+
+       /* skip old/new pg_swap stuff */
+       ceph_decode_32_safe(p, end, len, bad);
+       *p += len * (sizeof(__u64) + sizeof(__u32));
+       ceph_decode_32_safe(p, end, len, bad);
+       *p += len * sizeof(__u64);
+
+       if (*p != end)
+               goto bad;
        return map;
 
 bad: