#include "super.h"
#include "osd_client.h"
#include "messenger.h"
-#include "crush/mapper.h"
#include "decode.h"
/*
{
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
union ceph_pg pgid;
- struct ceph_pg_pool_info *pool;
- int ruleno;
- unsigned pps; /* placement ps */
- int osds[10], osd = -1;
- int i, num;
+ int osd = -1;
int err;
err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
if (err)
return err;
pgid.pg64 = le64_to_cpu(reqhead->layout.ol_pgid);
- if (pgid.pg.pool >= osdc->osdmap->num_pools)
- return -1;
- pool = &osdc->osdmap->pg_pool[pgid.pg.pool];
- ruleno = crush_find_rule(osdc->osdmap->crush, pool->v.crush_ruleset,
- pool->v.type, pool->v.size);
- if (ruleno < 0) {
- pr_err("ceph map_osds no crush rule pool %d type %d size %d\n",
- pgid.pg.pool, pool->v.type, pool->v.size);
- return -1;
- }
-
- if (pgid.pg.preferred >= 0)
- pps = ceph_stable_mod(pgid.pg.ps,
- le32_to_cpu(pool->v.lpgp_num),
- pool->lpgp_num_mask);
- else
- pps = ceph_stable_mod(pgid.pg.ps,
- le32_to_cpu(pool->v.pgp_num),
- pool->pgp_num_mask);
- pps += pgid.pg.pool;
- num = crush_do_rule(osdc->osdmap->crush, ruleno, pps, osds,
- min_t(int, pool->v.size, ARRAY_SIZE(osds)),
- pgid.pg.preferred, osdc->osdmap->osd_weight);
-
- /* primary is first up osd */
- for (i = 0; i < num; i++)
- if (ceph_osd_is_up(osdc->osdmap, osds[i])) {
- osd = osds[i];
- break;
- }
+ osd = ceph_calc_pg_primary(osdc->osdmap, pgid);
dout("map_osds tid %llu pgid %llx pool %d osd%d (was osd%d)\n",
req->r_tid, pgid.pg64, pgid.pg.pool, osd, req->r_last_osd);
if (req->r_last_osd == osd &&
if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
dout("applying incremental map %u len %d\n",
epoch, maplen);
- newmap = apply_incremental(&p, next, osdc->osdmap,
- osdc->client->msgr);
+ newmap = osdmap_apply_incremental(&p, next,
+ osdc->osdmap,
+ osdc->client->msgr);
if (IS_ERR(newmap)) {
err = PTR_ERR(newmap);
goto bad;
#include "super.h"
#include "osdmap.h"
#include "crush/hash.h"
+#include "crush/mapper.h"
#include "decode.h"
#include "ceph_debug.h"
c = kzalloc(sizeof(*c), GFP_NOFS);
if (c == NULL)
return ERR_PTR(-ENOMEM);
-
+
ceph_decode_need(p, end, 4*sizeof(u32), bad);
ceph_decode_32(p, magic);
if (magic != CRUSH_MAGIC) {
dout("osdmap_destroy %p\n", map);
if (map->crush)
crush_destroy(map->crush);
+ while (!RB_EMPTY_ROOT(&map->pg_temp))
+ rb_erase(rb_first(&map->pg_temp), &map->pg_temp);
kfree(map->osd_state);
kfree(map->osd_weight);
kfree(map->pg_pool);
return 0;
}
+/*
+ * Insert a new pg_temp mapping
+ */
+void __insert_pg_mapping(struct ceph_pg_mapping *new, struct rb_root *root)
+{
+ struct rb_node **p = &root->rb_node;
+ struct rb_node *parent = NULL;
+ struct ceph_pg_mapping *pg = NULL;
+
+ while (*p) {
+ parent = *p;
+ pg = rb_entry(parent, struct ceph_pg_mapping, node);
+ if (new->pgid < pg->pgid)
+ p = &(*p)->rb_left;
+ else if (new->pgid > pg->pgid)
+ p = &(*p)->rb_right;
+ else
+ BUG();
+ }
+
+ rb_link_node(&new->node, parent, p);
+ rb_insert_color(&new->node, root);
+}
+
/*
* decode a full map.
*/
map = kzalloc(sizeof(*map), GFP_NOFS);
if (map == NULL)
return ERR_PTR(-ENOMEM);
+ map->pg_temp = RB_ROOT;
ceph_decode_16_safe(p, end, version, bad);
*p += 4; /* skip length field (should match max) */
ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
+ /* pg_temp */
+ ceph_decode_32_safe(p, end, len, bad);
+ for (i = 0; i < len; i++) {
+ int n, j;
+ u64 pgid;
+ struct ceph_pg_mapping *pg;
+
+ ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad);
+ ceph_decode_64(p, pgid);
+ ceph_decode_32(p, n);
+ ceph_decode_need(p, end, n * sizeof(u32), bad);
+ pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS);
+ if (!pg) {
+ err = -ENOMEM;
+ goto bad;
+ }
+ pg->pgid = pgid;
+ pg->len = n;
+ for (j = 0; j < n; j++)
+ ceph_decode_32(p, pg->osds[j]);
+
+ __insert_pg_mapping(pg, &map->pg_temp);
+ dout(" added pg_temp %llx len %d\n", pgid, len);
+ }
+
/* crush */
ceph_decode_32_safe(p, end, len, bad);
dout("osdmap_decode crush len %d from off 0x%x\n", len,
/*
* decode and apply an incremental map update.
*/
-struct ceph_osdmap *apply_incremental(void **p, void *end,
- struct ceph_osdmap *map,
- struct ceph_messenger *msgr)
+struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
+ struct ceph_osdmap *map,
+ struct ceph_messenger *msgr)
{
struct ceph_osdmap *newmap = map;
struct crush_map *newcrush = NULL;
void *start = *p;
int err = -EINVAL;
u16 version;
+ struct rb_node *rbp;
ceph_decode_16_safe(p, end, version, bad);
map->osd_weight[osd] = off;
}
+ /* new_pg_temp */
+ rbp = rb_first(&map->pg_temp);
+ ceph_decode_32_safe(p, end, len, bad);
+ while (len--) {
+ struct ceph_pg_mapping *pg;
+ int j;
+ u64 pgid;
+ u32 len;
+ ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad);
+ ceph_decode_64(p, pgid);
+ ceph_decode_32(p, len);
+
+ /* remove any? */
+ while (rbp && rb_entry(rbp, struct ceph_pg_mapping,
+ node)->pgid <= pgid) {
+ struct rb_node *cur = rbp;
+ rbp = rb_next(rbp);
+ dout(" removed pg_temp %llx\n",
+ rb_entry(cur, struct ceph_pg_mapping, node)->pgid);
+ rb_erase(cur, &map->pg_temp);
+ }
+
+ if (len) {
+ /* insert */
+ ceph_decode_need(p, end, len*sizeof(u32), bad);
+ pg = kmalloc(sizeof(*pg) + sizeof(u32)*len, GFP_NOFS);
+ if (!pg) {
+ err = -ENOMEM;
+ goto bad;
+ }
+ pg->pgid = pgid;
+ pg->len = len;
+ for (j = 0; j < len; j++)
+ ceph_decode_32(p, pg->osds[j]);
+ __insert_pg_mapping(pg, &map->pg_temp);
+ dout(" added pg_temp %llx len %d\n", pgid, len);
+ }
+ }
+ while (rbp) {
+ struct rb_node *cur = rbp;
+ rbp = rb_next(rbp);
+ dout(" removed pg_temp %llx\n",
+ rb_entry(cur, struct ceph_pg_mapping, node)->pgid);
+ rb_erase(cur, &map->pg_temp);
+ }
+
/* ignore the rest */
*p = end;
return map;
return 0;
}
+
+/*
+ * Calculate raw osd vector for the given pgid. Return pointer to osd
+ * array, or NULL on failure.
+ */
+static int *calc_pg_raw(struct ceph_osdmap *osdmap, union ceph_pg pgid,
+ int *osds, int *num)
+{
+ struct rb_node *n = osdmap->pg_temp.rb_node;
+ struct ceph_pg_mapping *pg;
+ struct ceph_pg_pool_info *pool;
+ int ruleno;
+ unsigned pps; /* placement ps */
+
+ /* pg_temp? */
+ while (n) {
+ pg = rb_entry(n, struct ceph_pg_mapping, node);
+ if (pgid.pg64 < pg->pgid)
+ n = n->rb_left;
+ else if (pgid.pg64 > pg->pgid)
+ n = n->rb_right;
+ else {
+ *num = pg->len;
+ return pg->osds;
+ }
+ }
+
+ /* crush */
+ if (pgid.pg.pool >= osdmap->num_pools)
+ return NULL;
+ pool = &osdmap->pg_pool[pgid.pg.pool];
+ ruleno = crush_find_rule(osdmap->crush, pool->v.crush_ruleset,
+ pool->v.type, pool->v.size);
+ if (ruleno < 0) {
+ pr_err("ceph no crush rule pool %d type %d size %d\n",
+ pgid.pg.pool, pool->v.type, pool->v.size);
+ return NULL;
+ }
+
+ if (pgid.pg.preferred >= 0)
+ pps = ceph_stable_mod(pgid.pg.ps,
+ le32_to_cpu(pool->v.lpgp_num),
+ pool->lpgp_num_mask);
+ else
+ pps = ceph_stable_mod(pgid.pg.ps,
+ le32_to_cpu(pool->v.pgp_num),
+ pool->pgp_num_mask);
+ pps += pgid.pg.pool;
+ *num = crush_do_rule(osdmap->crush, ruleno, pps, osds,
+ min_t(int, pool->v.size, *num),
+ pgid.pg.preferred, osdmap->osd_weight);
+ return osds;
+}
+
+/*
+ * Return primary osd for given pgid, or -1 if none.
+ */
+int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, union ceph_pg pgid)
+{
+ int rawosds[10], *osds;
+ int i, num = ARRAY_SIZE(rawosds);
+
+ osds = calc_pg_raw(osdmap, pgid, rawosds, &num);
+ if (!osds)
+ return -1;
+
+ /* primary is first up osd */
+ for (i = 0; i < num; i++)
+ if (ceph_osd_is_up(osdmap, osds[i])) {
+ return osds[i];
+ break;
+ }
+ return -1;
+}