git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: decode, map osdmap pg_temp
author Sage Weil <sage@newdream.net>
Mon, 10 Aug 2009 23:42:31 +0000 (16:42 -0700)
committer Sage Weil <sage@newdream.net>
Mon, 10 Aug 2009 23:42:31 +0000 (16:42 -0700)
src/kernel/osd_client.c
src/kernel/osdmap.c
src/kernel/osdmap.h

index 553d58fb569a1f904c03fcf89278bc3ac2ededba..b59b6cc214ecf3c933ae599ac855a4488bdb32d0 100644 (file)
@@ -9,7 +9,6 @@
 #include "super.h"
 #include "osd_client.h"
 #include "messenger.h"
-#include "crush/mapper.h"
 #include "decode.h"
 
 /*
@@ -355,11 +354,7 @@ static int map_osds(struct ceph_osd_client *osdc,
 {
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        union ceph_pg pgid;
-       struct ceph_pg_pool_info *pool;
-       int ruleno;
-       unsigned pps; /* placement ps */
-       int osds[10], osd = -1;
-       int i, num;
+       int osd = -1;
        int err;
 
        err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
@@ -367,36 +362,7 @@ static int map_osds(struct ceph_osd_client *osdc,
        if (err)
                return err;
        pgid.pg64 = le64_to_cpu(reqhead->layout.ol_pgid);
-       if (pgid.pg.pool >= osdc->osdmap->num_pools)
-               return -1;
-       pool = &osdc->osdmap->pg_pool[pgid.pg.pool];
-       ruleno = crush_find_rule(osdc->osdmap->crush, pool->v.crush_ruleset,
-                                pool->v.type, pool->v.size);
-       if (ruleno < 0) {
-               pr_err("ceph map_osds no crush rule pool %d type %d size %d\n",
-                      pgid.pg.pool, pool->v.type, pool->v.size);
-               return -1;
-       }
-
-       if (pgid.pg.preferred >= 0)
-               pps = ceph_stable_mod(pgid.pg.ps,
-                                     le32_to_cpu(pool->v.lpgp_num),
-                                     pool->lpgp_num_mask);
-       else
-               pps = ceph_stable_mod(pgid.pg.ps,
-                                     le32_to_cpu(pool->v.pgp_num),
-                                     pool->pgp_num_mask);
-       pps += pgid.pg.pool;
-       num = crush_do_rule(osdc->osdmap->crush, ruleno, pps, osds,
-                           min_t(int, pool->v.size, ARRAY_SIZE(osds)),
-                           pgid.pg.preferred, osdc->osdmap->osd_weight);
-
-       /* primary is first up osd */
-       for (i = 0; i < num; i++)
-               if (ceph_osd_is_up(osdc->osdmap, osds[i])) {
-                       osd = osds[i];
-                       break;
-               }
+       osd = ceph_calc_pg_primary(osdc->osdmap, pgid);
        dout("map_osds tid %llu pgid %llx pool %d osd%d (was osd%d)\n",
             req->r_tid, pgid.pg64, pgid.pg.pool, osd, req->r_last_osd);
        if (req->r_last_osd == osd &&
@@ -706,8 +672,9 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
                        dout("applying incremental map %u len %d\n",
                             epoch, maplen);
-                       newmap = apply_incremental(&p, next, osdc->osdmap,
-                                                  osdc->client->msgr);
+                       newmap = osdmap_apply_incremental(&p, next,
+                                                         osdc->osdmap,
+                                                         osdc->client->msgr);
                        if (IS_ERR(newmap)) {
                                err = PTR_ERR(newmap);
                                goto bad;
index 362f5a047e30b885932157d7bbf838ddd2999bca..294168c6aaa63b68d84bbcc60e7417c914b7e49a 100644 (file)
@@ -4,6 +4,7 @@
 #include "super.h"
 #include "osdmap.h"
 #include "crush/hash.h"
+#include "crush/mapper.h"
 #include "decode.h"
 #include "ceph_debug.h"
 
@@ -146,7 +147,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
        c = kzalloc(sizeof(*c), GFP_NOFS);
        if (c == NULL)
                return ERR_PTR(-ENOMEM);
-
+       
        ceph_decode_need(p, end, 4*sizeof(u32), bad);
        ceph_decode_32(p, magic);
        if (magic != CRUSH_MAGIC) {
@@ -318,6 +319,8 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
        dout("osdmap_destroy %p\n", map);
        if (map->crush)
                crush_destroy(map->crush);
+       while (!RB_EMPTY_ROOT(&map->pg_temp))
+               rb_erase(rb_first(&map->pg_temp), &map->pg_temp);
        kfree(map->osd_state);
        kfree(map->osd_weight);
        kfree(map->pg_pool);
@@ -361,6 +364,30 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
        return 0;
 }
 
+/*
+ * Link @new into the rbtree at @root, which is kept ordered by pgid.
+ * The tree must not already hold a mapping with the same pgid; running
+ * into one hits BUG().
+ */
+void __insert_pg_mapping(struct ceph_pg_mapping *new, struct rb_root *root)
+{
+       struct rb_node **link = &root->rb_node;
+       struct rb_node *parent = NULL;
+
+       /* descend to the empty slot where the new node belongs */
+       while (*link) {
+               struct ceph_pg_mapping *cur;
+
+               parent = *link;
+               cur = rb_entry(parent, struct ceph_pg_mapping, node);
+               if (new->pgid == cur->pgid)
+                       BUG();
+               else if (new->pgid < cur->pgid)
+                       link = &parent->rb_left;
+               else
+                       link = &parent->rb_right;
+       }
+
+       rb_link_node(&new->node, parent, link);
+       rb_insert_color(&new->node, root);
+}
+
+
 /*
  * decode a full map.
  */
@@ -377,6 +404,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
        map = kzalloc(sizeof(*map), GFP_NOFS);
        if (map == NULL)
                return ERR_PTR(-ENOMEM);
+       map->pg_temp = RB_ROOT;
 
        ceph_decode_16_safe(p, end, version, bad);
 
@@ -432,6 +460,31 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
        *p += 4; /* skip length field (should match max) */
        ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
 
+       /* pg_temp */
+       ceph_decode_32_safe(p, end, len, bad);
+       for (i = 0; i < len; i++) {
+               int n, j;
+               u64 pgid;
+               struct ceph_pg_mapping *pg;
+
+               ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad);
+               ceph_decode_64(p, pgid);
+               ceph_decode_32(p, n);
+               ceph_decode_need(p, end, n * sizeof(u32), bad);
+               pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS);
+               if (!pg) {
+                       err = -ENOMEM;
+                       goto bad;
+               }
+               pg->pgid = pgid;
+               pg->len = n;
+               for (j = 0; j < n; j++)
+                       ceph_decode_32(p, pg->osds[j]);
+
+               __insert_pg_mapping(pg, &map->pg_temp);
+               dout(" added pg_temp %llx len %d\n", pgid, len);
+       }
+
        /* crush */
        ceph_decode_32_safe(p, end, len, bad);
        dout("osdmap_decode crush len %d from off 0x%x\n", len,
@@ -460,9 +513,9 @@ bad:
 /*
  * decode and apply an incremental map update.
  */
-struct ceph_osdmap *apply_incremental(void **p, void *end,
-                                     struct ceph_osdmap *map,
-                                     struct ceph_messenger *msgr)
+struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
+                                            struct ceph_osdmap *map,
+                                            struct ceph_messenger *msgr)
 {
        struct ceph_osdmap *newmap = map;
        struct crush_map *newcrush = NULL;
@@ -474,6 +527,7 @@ struct ceph_osdmap *apply_incremental(void **p, void *end,
        void *start = *p;
        int err = -EINVAL;
        u16 version;
+       struct rb_node *rbp;
 
        ceph_decode_16_safe(p, end, version, bad);
 
@@ -595,6 +649,52 @@ struct ceph_osdmap *apply_incremental(void **p, void *end,
                        map->osd_weight[osd] = off;
        }
 
+       /* new_pg_temp */
+       rbp = rb_first(&map->pg_temp);
+       ceph_decode_32_safe(p, end, len, bad);
+       while (len--) {
+               struct ceph_pg_mapping *pg;
+               int j;
+               u64 pgid;
+               u32 len;
+               ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad);
+               ceph_decode_64(p, pgid);
+               ceph_decode_32(p, len);
+
+               /* remove any? */
+               while (rbp && rb_entry(rbp, struct ceph_pg_mapping,
+                                      node)->pgid <= pgid) {
+                       struct rb_node *cur = rbp;
+                       rbp = rb_next(rbp);
+                       dout(" removed pg_temp %llx\n",
+                            rb_entry(cur, struct ceph_pg_mapping, node)->pgid);
+                       rb_erase(cur, &map->pg_temp);
+               }
+
+               if (len) {
+                       /* insert */
+                       ceph_decode_need(p, end, len*sizeof(u32), bad);
+                       pg = kmalloc(sizeof(*pg) + sizeof(u32)*len, GFP_NOFS);
+                       if (!pg) {
+                               err = -ENOMEM;
+                               goto bad;
+                       }
+                       pg->pgid = pgid;
+                       pg->len = len;
+                       for (j = 0; j < len; j++)
+                               ceph_decode_32(p, pg->osds[j]);
+                       __insert_pg_mapping(pg, &map->pg_temp);
+                       dout(" added pg_temp %llx len %d\n", pgid, len);
+               }
+       }
+       while (rbp) {
+               struct rb_node *cur = rbp;
+               rbp = rb_next(rbp);
+               dout(" removed pg_temp %llx\n",
+                    rb_entry(cur, struct ceph_pg_mapping, node)->pgid);
+               rb_erase(cur, &map->pg_temp);
+       }
+
        /* ignore the rest */
        *p = end;
        return map;
@@ -695,3 +795,77 @@ int ceph_calc_object_layout(struct ceph_object_layout *ol,
 
        return 0;
 }
+
+/*
+ * Work out the raw osd vector for a pg.
+ *
+ * A pg_temp mapping for the pgid takes precedence: its stored osd array
+ * is returned directly and *num is set to its length.  Otherwise the
+ * pool's crush rule is evaluated into the caller-supplied @osds buffer;
+ * on entry *num is the capacity of that buffer, and on return it holds
+ * the crush_do_rule() result.
+ *
+ * Returns the array holding the result, or NULL if the pool index is
+ * out of range or no crush rule is found for the pool.
+ */
+static int *calc_pg_raw(struct ceph_osdmap *osdmap, union ceph_pg pgid,
+                       int *osds, int *num)
+{
+       struct rb_node *cur;
+       struct ceph_pg_pool_info *pi;
+       int rule;
+       unsigned pps; /* placement ps */
+
+       /* a pg_temp entry, if present, overrides crush */
+       for (cur = osdmap->pg_temp.rb_node; cur; ) {
+               struct ceph_pg_mapping *m;
+
+               m = rb_entry(cur, struct ceph_pg_mapping, node);
+               if (pgid.pg64 == m->pgid) {
+                       *num = m->len;
+                       return m->osds;
+               }
+               if (pgid.pg64 < m->pgid)
+                       cur = cur->rb_left;
+               else
+                       cur = cur->rb_right;
+       }
+
+       /* no override: fall back to the pool's crush rule */
+       if (pgid.pg.pool >= osdmap->num_pools)
+               return NULL;
+       pi = &osdmap->pg_pool[pgid.pg.pool];
+
+       rule = crush_find_rule(osdmap->crush, pi->v.crush_ruleset,
+                              pi->v.type, pi->v.size);
+       if (rule < 0) {
+               pr_err("ceph no crush rule pool %d type %d size %d\n",
+                      pgid.pg.pool, pi->v.type, pi->v.size);
+               return NULL;
+       }
+
+       /* pgs with a preferred osd use the lpgp_* placement parameters */
+       if (pgid.pg.preferred < 0)
+               pps = ceph_stable_mod(pgid.pg.ps,
+                                     le32_to_cpu(pi->v.pgp_num),
+                                     pi->pgp_num_mask);
+       else
+               pps = ceph_stable_mod(pgid.pg.ps,
+                                     le32_to_cpu(pi->v.lpgp_num),
+                                     pi->lpgp_num_mask);
+       pps += pgid.pg.pool;
+
+       *num = crush_do_rule(osdmap->crush, rule, pps, osds,
+                            min_t(int, pi->v.size, *num),
+                            pgid.pg.preferred, osdmap->osd_weight);
+       return osds;
+}
+
+
+/*
+ * Return primary osd for given pgid, or -1 if none.
+ *
+ * The primary is the first osd in the pg's raw osd vector that is
+ * currently up.  -1 is also returned when the raw vector cannot be
+ * calculated at all.
+ */
+int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, union ceph_pg pgid)
+{
+       int rawosds[10], *osds;
+       int i, num = ARRAY_SIZE(rawosds);
+
+       /* osds points either at rawosds or at a pg_temp mapping's array */
+       osds = calc_pg_raw(osdmap, pgid, rawosds, &num);
+       if (!osds)
+               return -1;
+
+       /* primary is first up osd */
+       for (i = 0; i < num; i++)
+               if (ceph_osd_is_up(osdmap, osds[i]))
+                       return osds[i];
+       return -1;
+}
index 757aaf500759c744e532a94672e1b3be60daddb9..b2721ae375519d8d735f574b19ac018989fbefa9 100644 (file)
@@ -1,6 +1,7 @@
 #ifndef _FS_CEPH_OSDMAP_H
 #define _FS_CEPH_OSDMAP_H
 
+#include <linux/rbtree.h>
 #include "types.h"
 #include "ceph_fs.h"
 #include "crush/crush.h"
@@ -22,6 +23,13 @@ struct ceph_pg_pool_info {
        int pg_num_mask, pgp_num_mask, lpg_num_mask, lpgp_num_mask;
 };
 
+/*
+ * An explicit pgid -> osd vector mapping, kept in an rbtree ordered by
+ * pgid (see ceph_osdmap.pg_temp).
+ */
+struct ceph_pg_mapping {
+       struct rb_node node;   /* rbtree linkage */
+       u64 pgid;              /* lookup key */
+       int len;               /* number of entries in osds[] */
+       int osds[];            /* osd ids, allocated together with the struct */
+};
+
 struct ceph_osdmap {
        ceph_fsid_t fsid;
        u32 epoch;
@@ -35,6 +43,8 @@ struct ceph_osdmap {
        u32 *osd_weight;   /* 0 = failed, 0x10000 = 100% normal */
        struct ceph_entity_addr *osd_addr;
 
+       struct rb_root pg_temp;
+
        u32 num_pools;
        struct ceph_pg_pool_info *pg_pool;
 
@@ -64,9 +74,9 @@ static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map,
 }
 
 extern struct ceph_osdmap *osdmap_decode(void **p, void *end);
-extern struct ceph_osdmap *apply_incremental(void **p, void *end,
-                                            struct ceph_osdmap *map,
-                                            struct ceph_messenger *msgr);
+extern struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
+                                           struct ceph_osdmap *map,
+                                           struct ceph_messenger *msgr);
 extern void ceph_osdmap_destroy(struct ceph_osdmap *map);
 
 /* calculate mapping of a file extent to an object */
@@ -79,5 +89,6 @@ extern int ceph_calc_object_layout(struct ceph_object_layout *ol,
                                   const char *oid,
                                   struct ceph_file_layout *fl,
                                   struct ceph_osdmap *osdmap);
+extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, union ceph_pg pgid);
 
 #endif