]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crush: add add_item and reweight functions
authorSage Weil <sage@newdream.net>
Tue, 17 May 2011 20:04:24 +0000 (13:04 -0700)
committerSage Weil <sage@newdream.net>
Tue, 17 May 2011 20:30:49 +0000 (13:30 -0700)
Insert a device at a particular point in the hierarchy, and adjust weights
as appropriate.

Signed-off-by: Sage Weil <sage@newdream.net>
src/Makefile.am
src/crush/CrushWrapper.cc [new file with mode: 0644]
src/crush/CrushWrapper.h
src/crush/builder.c
src/crush/builder.h
src/crushtool.cc

index a5b4229dc729fa97badaedecdfa816b377a4b374..998d9a1a90afa15683bcb337bdbf352cd93ef132 100644 (file)
@@ -629,6 +629,7 @@ libcommon_files = \
        common/str_list.cc \
        common/errno.cc \
        msg/SimpleMessenger.cc \
+       crush/CrushWrapper.cc \
        mon/MonMap.cc \
        mon/MonClient.cc \
        osd/OSDMap.cc \
diff --git a/src/crush/CrushWrapper.cc b/src/crush/CrushWrapper.cc
new file mode 100644 (file)
index 0000000..216f6d3
--- /dev/null
@@ -0,0 +1,88 @@
+
+#include "common/debug.h"
+
+#include "CrushWrapper.h"
+
+void CrushWrapper::find_roots(set<int>& roots) const
+{
+  for (unsigned i=0; i<crush->max_rules; i++) {
+    crush_rule *r = crush->rules[i];
+    if (!r)
+      continue;
+    for (unsigned j=0; j<r->len; j++) {
+      if (r->steps[j].op == CRUSH_RULE_TAKE)
+       roots.insert(r->steps[j].arg1);
+    }
+  }
+}
+
+
+int CrushWrapper::insert_device(int item, int weight, string name,
+                               map<string,string>& loc)  // typename -> bucketname
+{
+  dout(0) << "insert_device item " << item << " weight " << weight
+         << " name " << name << " loc " << loc << dendl;
+  set_item_name(item, name.c_str());
+
+  for (map<int,string>::iterator p = type_map.begin(); p != type_map.end(); p++) {
+    if (p->first == 0)
+      continue;
+
+    int id = get_item_id(loc[p->second].c_str());
+    if (!id) {
+      // create the bucket
+      dout(0) << "insert_device creating bucket " << loc[p->second] << dendl;
+      int empty = 0;
+      id = add_bucket(0, CRUSH_BUCKET_STRAW, CRUSH_HASH_DEFAULT, p->first, 1, &item, &empty);
+      set_item_name(id, loc[p->second].c_str());
+      item = id;
+      continue;
+    }
+
+    // add to an existing bucket
+    if (!bucket_exists(id)) {
+      dout(0) << "insert_device don't have bucket " << id << dendl;
+      return -EINVAL;
+    }
+      
+    dout(0) << "insert_device adding " << item << " weight " << weight
+           << " to bucket " << id << dendl;
+    crush_bucket *b = (crush_bucket *)get_bucket(id);
+    assert(b);
+    crush_bucket_add_item(b, item, 0);
+    adjust_item_weight(item, weight);
+    return 0;
+  }
+
+  dout(0) << "warning: didn't find anywhere to add item " << item << " in " << loc << dendl;
+  return -EINVAL;
+}
+
+void CrushWrapper::adjust_item_weight(int id, int weight)
+{
+  dout(0) << "adjust_item_weight " << id << " weight " << weight << dendl;
+  for (int bidx = 0; bidx < crush->max_buckets; bidx++) {
+    crush_bucket *b = crush->buckets[bidx];
+    if (b == 0)
+      continue;
+    for (unsigned i = 0; i < b->size; i++)
+      if (b->items[i] == id) {
+       int diff = crush_bucket_adjust_item_weight(b, id, weight);
+       dout(0) << "adjust_item_weight " << id << " diff " << diff << dendl;
+       adjust_item_weight(-1 - bidx, b->weight);
+      }
+  }
+}
+
+void CrushWrapper::reweight()
+{
+  set<int> roots;
+  find_roots(roots);
+  for (set<int>::iterator p = roots.begin(); p != roots.end(); p++) {
+    if (*p >= 0)
+      continue;
+    crush_bucket *b = (struct crush_bucket *)get_bucket(*p);
+    dout(0) << "reweight bucket " << *p << dendl;
+    crush_reweight_bucket(crush, b);
+  }
+}
index 5d0dfe27621562a16db1fbdf836a4bf8d7a62dbf..6baaa70125afc8d76fdbc651901d6b384bc69f1b 100644 (file)
@@ -144,6 +144,13 @@ public:
       rule_name_rmap[name] = i;
   }
 
+
+  void find_roots(set<int>& roots) const;
+  int insert_device(int id, int weight, string name, map<string,string>& loc);
+  void adjust_item_weight(int id, int weight);
+  void reweight();
+
+
   /*** devices ***/
   int get_max_devices() {
     if (!crush) return 0;
index bff8ee4e8a56826862502474a3b7cb47bef839da..8049291afbe92c2ba6fac9db27f13429598dcd49 100644 (file)
@@ -235,6 +235,22 @@ static int parent(int n)
                return n + (1<<h);
 }
 
+static int calc_depth(int size)
+{
+       int depth = 1;
+       int t = size - 1;
+       while (t) {
+               t = t >> 1;
+               depth++;
+       }
+       return depth;
+}
+
+static int calc_node(int i)
+{
+       return ((i+1) << 1)-1;
+}
+
 struct crush_bucket_tree*
 crush_make_tree_bucket(int hash, int type, int size,
                       int *items,    /* in leaf order */
@@ -243,7 +259,7 @@ crush_make_tree_bucket(int hash, int type, int size,
        struct crush_bucket_tree *bucket;
        int depth;
        int node;
-       int t, i, j;
+       int i, j;
 
        bucket = malloc(sizeof(*bucket));
        memset(bucket, 0, sizeof(*bucket));
@@ -256,12 +272,7 @@ crush_make_tree_bucket(int hash, int type, int size,
        bucket->h.perm = malloc(sizeof(__u32)*size);
 
        /* calc tree depth */
-       depth = 1;
-       t = size - 1;
-       while (t) {
-               t = t >> 1;
-               depth++;
-       }
+       depth = calc_depth(size);
        bucket->num_nodes = 1 << depth;
        bucket->node_weights = malloc(sizeof(__u32)*bucket->num_nodes);
 
@@ -270,7 +281,7 @@ crush_make_tree_bucket(int hash, int type, int size,
 
        for (i=0; i<size; i++) {
                bucket->h.items[i] = items[i];
-               node = ((i+1) << 1)-1;
+               node = calc_node(i);
                printf("item %d node %d weight %d\n", i, node, weights[i]);
                bucket->node_weights[node] = weights[i];
                bucket->h.weight += weights[i];
@@ -289,38 +300,14 @@ crush_make_tree_bucket(int hash, int type, int size,
 
 /* straw bucket */
 
-struct crush_bucket_straw *
-crush_make_straw_bucket(int hash, 
-                       int type,
-                       int size,
-                       int *items,
-                       int *weights)
+int crush_calc_straw(struct crush_bucket_straw *bucket)
 {
-       struct crush_bucket_straw *bucket;
        int *reverse;
        int i, j, k;
-
        double straw, wbelow, lastw, wnext, pbelow;
        int numleft;
-
-       bucket = malloc(sizeof(*bucket));
-       memset(bucket, 0, sizeof(*bucket));
-       bucket->h.alg = CRUSH_BUCKET_STRAW;
-       bucket->h.hash = hash;
-       bucket->h.type = type;
-       bucket->h.size = size;
-
-       bucket->h.items = malloc(sizeof(__u32)*size);
-       bucket->h.perm = malloc(sizeof(__u32)*size);
-       bucket->item_weights = malloc(sizeof(__u32)*size);
-       bucket->straws = malloc(sizeof(__u32)*size);
-
-       bucket->h.weight = 0;
-       for (i=0; i<size; i++) {
-               bucket->h.items[i] = items[i];
-               bucket->h.weight += weights[i];
-               bucket->item_weights[i] = weights[i];
-       }
+       int size = bucket->h.size;
+       __u32 *weights = bucket->item_weights;
 
        /* reverse sort by weight (simple insertion sort) */
        reverse = malloc(sizeof(int) * size);
@@ -384,6 +371,39 @@ crush_make_straw_bucket(int hash,
        }
 
        free(reverse);
+       return 0;
+}
+
+struct crush_bucket_straw *
+crush_make_straw_bucket(int hash, 
+                       int type,
+                       int size,
+                       int *items,
+                       int *weights)
+{
+       struct crush_bucket_straw *bucket;
+       int i;
+
+       bucket = malloc(sizeof(*bucket));
+       memset(bucket, 0, sizeof(*bucket));
+       bucket->h.alg = CRUSH_BUCKET_STRAW;
+       bucket->h.hash = hash;
+       bucket->h.type = type;
+       bucket->h.size = size;
+
+       bucket->h.items = malloc(sizeof(__u32)*size);
+       bucket->h.perm = malloc(sizeof(__u32)*size);
+       bucket->item_weights = malloc(sizeof(__u32)*size);
+       bucket->straws = malloc(sizeof(__u32)*size);
+
+       bucket->h.weight = 0;
+       for (i=0; i<size; i++) {
+               bucket->h.items[i] = items[i];
+               bucket->h.weight += weights[i];
+               bucket->item_weights[i] = weights[i];
+       }
+
+       crush_calc_straw(bucket);
 
        return bucket;
 }
@@ -416,3 +436,299 @@ crush_make_bucket(int alg, int hash, int type, int size,
        }
        return 0;
 }
+
+
+/************************************************/
+
+int crush_add_uniform_bucket_item(struct crush_bucket_uniform *bucket, int item, int weight)
+{
+       int newsize = bucket->h.size + 1;
+
+       bucket->h.items = realloc(bucket->h.items, sizeof(__u32)*newsize);
+       bucket->h.perm = realloc(bucket->h.perm, sizeof(__u32)*newsize);
+
+       bucket->h.items[newsize-1] = item;
+       bucket->h.weight += weight;
+       bucket->h.size++;
+
+       return 0;
+}
+
+int crush_add_list_bucket_item(struct crush_bucket_list *bucket, int item, int weight)
+{
+       int newsize = bucket->h.size + 1;
+
+       bucket->h.items = realloc(bucket->h.items, sizeof(__u32)*newsize);
+       bucket->h.perm = realloc(bucket->h.perm, sizeof(__u32)*newsize);
+       bucket->item_weights = realloc(bucket->item_weights, sizeof(__u32)*newsize);
+       bucket->sum_weights = realloc(bucket->sum_weights, sizeof(__u32)*newsize);
+
+       bucket->h.items[newsize-1] = item;
+       bucket->item_weights[newsize-1] = weight;
+       if (newsize > 1)
+               bucket->sum_weights[newsize-1] = bucket->sum_weights[newsize-2] + weight;
+       else
+               bucket->sum_weights[newsize-1] = weight;
+
+       bucket->h.weight += weight;
+       bucket->h.size++;
+       return 0;
+}
+
+int crush_add_tree_bucket_item(struct crush_bucket_tree *bucket, int item, int weight)
+{
+       int newsize = bucket->h.size + 1;
+       int depth = calc_depth(newsize);;
+       int node;
+       int j;
+
+       bucket->num_nodes = 1 << depth;
+       bucket->h.items = realloc(bucket->h.items, sizeof(__u32)*newsize);
+       bucket->h.perm = realloc(bucket->h.perm, sizeof(__u32)*newsize);
+       bucket->node_weights = realloc(bucket->node_weights, sizeof(__u32)*bucket->num_nodes);
+       
+       node = calc_node(newsize-1);
+       bucket->node_weights[node] = weight;
+
+       for (j=1; j<depth; j++) {
+               node = parent(node);
+               bucket->node_weights[node] += weight;
+               printf(" node %d weight %d\n", node, bucket->node_weights[node]);
+       }
+       
+       bucket->h.weight += weight;
+       bucket->h.size++;
+       return 0;
+}
+
+int crush_add_straw_bucket_item(struct crush_bucket_straw *bucket, int item, int weight)
+{
+       int newsize = bucket->h.size + 1;
+       
+       bucket->h.items = realloc(bucket->h.items, sizeof(__u32)*newsize);
+       bucket->h.perm = realloc(bucket->h.perm, sizeof(__u32)*newsize);
+       bucket->item_weights = realloc(bucket->item_weights, sizeof(__u32)*newsize);
+       bucket->straws = realloc(bucket->straws, sizeof(__u32)*newsize);
+
+       bucket->h.items[newsize-1] = item;
+       bucket->item_weights[newsize-1] = weight;
+
+       bucket->h.weight += weight;
+       bucket->h.size++;
+       
+       return crush_calc_straw(bucket);
+}
+
+int crush_bucket_add_item(struct crush_bucket *b, int item, int weight)
+{
+       switch (b->alg) {
+       case CRUSH_BUCKET_UNIFORM:
+               return crush_add_uniform_bucket_item((struct crush_bucket_uniform *)b, item, weight);
+       case CRUSH_BUCKET_LIST:
+               return crush_add_list_bucket_item((struct crush_bucket_list *)b, item, weight);
+       case CRUSH_BUCKET_TREE:
+               return crush_add_tree_bucket_item((struct crush_bucket_tree *)b, item, weight);
+       case CRUSH_BUCKET_STRAW:
+               return crush_add_straw_bucket_item((struct crush_bucket_straw *)b, item, weight);
+       default:
+               return -1;
+       }
+       return 0;
+}
+
+/************************************************/
+
+int crush_adjust_uniform_bucket_item_weight(struct crush_bucket_uniform *bucket, int item, int weight)
+{
+       int diff = (weight - bucket->item_weight) * bucket->h.size;
+
+       bucket->item_weight = weight;
+       bucket->h.weight = bucket->item_weight * bucket->h.size;
+
+       return diff;
+}
+
+int crush_adjust_list_bucket_item_weight(struct crush_bucket_list *bucket, int item, int weight)
+{
+       int diff;
+       unsigned i, j;
+
+       for (i = 0; i < bucket->h.size; i++) {
+               if (bucket->h.items[i] == item)
+                       break;
+       }
+       if (i == bucket->h.size)
+               return 0;
+
+       diff = weight = bucket->item_weights[i];
+       bucket->item_weights[i] = weight;
+       bucket->h.weight += diff;
+
+       for (j = i; j < bucket->h.size; j++)
+               bucket->sum_weights[j] += diff;
+
+       return diff;
+}
+
+int crush_adjust_tree_bucket_item_weight(struct crush_bucket_tree *bucket, int item, int weight)
+{
+       int diff;
+       int node;
+       unsigned i, j;
+       unsigned depth = calc_depth(bucket->h.size);
+
+       for (i = 0; i < bucket->h.size; i++) {
+               if (bucket->h.items[i] == item)
+                       break;
+       }
+       if (i == bucket->h.size)
+               return 0;
+       
+       node = calc_node(i);
+       diff = weight = bucket->node_weights[node];
+       bucket->node_weights[node] = weight;
+       bucket->h.weight += diff;
+
+       for (j=1; j<depth; j++) {
+               node = parent(node);
+               bucket->node_weights[node] += diff;
+       }
+
+       return diff;
+}
+
+int crush_adjust_straw_bucket_item_weight(struct crush_bucket_straw *bucket, int item, int weight)
+{
+       unsigned idx;
+       int diff;
+
+       for (idx = 0; idx < bucket->h.size; idx++)
+               if (bucket->h.items[idx] == item)
+                       break;
+       if (idx == bucket->h.size)
+               return 0;
+
+       diff = weight - bucket->item_weights[idx];
+       bucket->item_weights[idx] = weight;
+       bucket->h.weight += diff;
+
+       crush_calc_straw(bucket);
+
+       return diff;
+}
+
+int crush_bucket_adjust_item_weight(struct crush_bucket *b, int item, int weight)
+{
+       switch (b->alg) {
+       case CRUSH_BUCKET_UNIFORM:
+               return crush_adjust_uniform_bucket_item_weight((struct crush_bucket_uniform *)b,
+                                                            item, weight);
+       case CRUSH_BUCKET_LIST:
+               return crush_adjust_list_bucket_item_weight((struct crush_bucket_list *)b,
+                                                           item, weight);
+       case CRUSH_BUCKET_TREE:
+               return crush_adjust_tree_bucket_item_weight((struct crush_bucket_tree *)b,
+                                                           item, weight);
+       case CRUSH_BUCKET_STRAW:
+               return crush_adjust_straw_bucket_item_weight((struct crush_bucket_straw *)b,
+                                                            item, weight);
+       default:
+               return -1;
+       }
+       return 0;
+}
+
+/************************************************/
+
+int crush_reweight_uniform_bucket(struct crush_map *crush, struct crush_bucket_uniform *bucket)
+{
+       unsigned i;
+       unsigned sum = 0, n = 0, leaves = 0;
+
+       for (i = 0; i < bucket->h.size; i++) {
+               int id = bucket->h.items[i];
+               if (id < 0) {
+                       struct crush_bucket *c = crush->buckets[-1-id];
+                       crush_reweight_bucket(crush, c);
+                       sum += c->weight;
+                       n++;
+               } else {
+                       leaves++;
+               }
+       }
+
+       if (n > leaves)
+               bucket->item_weight = sum / n;  // more bucket children than leaves, average!
+       bucket->h.weight = bucket->item_weight * bucket->h.size;
+
+       return 0;
+}
+
+int crush_reweight_list_bucket(struct crush_map *crush, struct crush_bucket_list *bucket)
+{
+       unsigned i;
+
+       bucket->h.weight = 0;
+       for (i = 0; i < bucket->h.size; i++) {
+               int id = bucket->h.items[i];
+               if (id < 0) {
+                       struct crush_bucket *c = crush->buckets[-1-id];
+                       crush_reweight_bucket(crush, c);
+                       bucket->item_weights[i] = c->weight;
+               }
+               bucket->h.weight += bucket->item_weights[i];
+       }
+       return 0;
+}
+
+int crush_reweight_tree_bucket(struct crush_map *crush, struct crush_bucket_tree *bucket)
+{
+       unsigned i;
+
+       bucket->h.weight = 0;
+       for (i = 0; i < bucket->h.size; i++) {
+               int node = calc_node(i);
+               int id = bucket->h.items[i];
+               if (id < 0) {
+                       struct crush_bucket *c = crush->buckets[-1-id];
+                       crush_reweight_bucket(crush, c);
+                       bucket->node_weights[node] = c->weight;
+               }
+               bucket->h.weight += bucket->node_weights[node];
+       }
+       return 0;
+}
+
+int crush_reweight_straw_bucket(struct crush_map *crush, struct crush_bucket_straw *bucket)
+{
+       unsigned i;
+
+       bucket->h.weight = 0;
+       for (i = 0; i < bucket->h.size; i++) {
+               int id = bucket->h.items[i];
+               if (id < 0) {
+                       struct crush_bucket *c = crush->buckets[-1-id];
+                       crush_reweight_bucket(crush, c);
+                       bucket->item_weights[i] = c->weight;
+               }
+               bucket->h.weight += bucket->item_weights[i];
+       }
+       return 0;
+}
+
+int crush_reweight_bucket(struct crush_map *crush, struct crush_bucket *b)
+{
+       switch (b->alg) {
+       case CRUSH_BUCKET_UNIFORM:
+               return crush_reweight_uniform_bucket(crush, (struct crush_bucket_uniform *)b);
+       case CRUSH_BUCKET_LIST:
+               return crush_reweight_list_bucket(crush, (struct crush_bucket_list *)b);
+       case CRUSH_BUCKET_TREE:
+               return crush_reweight_tree_bucket(crush, (struct crush_bucket_tree *)b);
+       case CRUSH_BUCKET_STRAW:
+               return crush_reweight_straw_bucket(crush, (struct crush_bucket_straw *)b);
+       default:
+               return -1;
+       }
+       return 0;
+}
index 02ed57f05336316212e1b7aa2c7232a46211938f..245cfb378b3b437931fba0298399ea45512bb48a 100644 (file)
@@ -17,6 +17,9 @@ extern int crush_add_bucket(struct crush_map *map,
                            int bucketno,
                            struct crush_bucket *bucket);
 struct crush_bucket *crush_make_bucket(int alg, int hash, int type, int size, int *items, int *weights);
+extern int crush_bucket_add_item(struct crush_bucket *bucket, int item, int weight);
+extern int crush_bucket_adjust_item_weight(struct crush_bucket *bucket, int item, int weight);
+extern int crush_reweight_bucket(struct crush_map *crush, struct crush_bucket *bucket);
 
 struct crush_bucket_uniform *
 crush_make_uniform_bucket(int hash, int type, int size,
index a8cd485c34289d653390e73ea3a9e4774ad283dd..5c66c4b6854801b4d3f5ead50dcbbda9448ad6de 100644 (file)
@@ -791,6 +791,12 @@ int main(int argc, const char **argv)
   const char *outfn = 0;
   bool clobber = false;
 
+  bool reweight = false;
+  int add_item = -1;
+  float add_weight = 0;
+  const char *add_name = 0;
+  map<string,string> add_loc;
+
   int build = 0;
   int num_osds =0;
   vector<layer_t> layers;
@@ -806,14 +812,26 @@ int main(int argc, const char **argv)
     } else if (CEPH_ARGPARSE_EQ("decompile", 'd')) {
       CEPH_ARGPARSE_SET_ARG_VAL(&infn, OPT_STR);
       decompile = true;
+    } else if (CEPH_ARGPARSE_EQ("infn", 'i')) {
+      CEPH_ARGPARSE_SET_ARG_VAL(&infn, OPT_STR);
     } else if (CEPH_ARGPARSE_EQ("outfn", 'o')) {
       CEPH_ARGPARSE_SET_ARG_VAL(&outfn, OPT_STR);
     } else if (CEPH_ARGPARSE_EQ("compile", 'c')) {
       CEPH_ARGPARSE_SET_ARG_VAL(&srcfn, OPT_STR);
       compile = true;
     } else if (CEPH_ARGPARSE_EQ("test", 't')) {
-      CEPH_ARGPARSE_SET_ARG_VAL(&infn, OPT_STR);
       test = true;
+    } else if (CEPH_ARGPARSE_EQ("reweight", '\0')) {
+      reweight = true;
+    } else if (CEPH_ARGPARSE_EQ("add_item", '\0')) {
+      CEPH_ARGPARSE_SET_ARG_VAL(&add_item, OPT_INT);
+      CEPH_ARGPARSE_SET_ARG_VAL(&add_weight, OPT_FLOAT);
+      CEPH_ARGPARSE_SET_ARG_VAL(&add_name, OPT_STR);
+    } else if (CEPH_ARGPARSE_EQ("loc", '\0')) {
+      const char *type, *name;
+      CEPH_ARGPARSE_SET_ARG_VAL(&type, OPT_STR);
+      CEPH_ARGPARSE_SET_ARG_VAL(&name, OPT_STR);
+      add_loc[type] = name;
     } else if (CEPH_ARGPARSE_EQ("verbose", 'v')) {
       verbose++;
     } else if (CEPH_ARGPARSE_EQ("build", '\0')) {
@@ -859,7 +877,7 @@ int main(int argc, const char **argv)
   }
   if (decompile + compile + build > 1)
     usage();
-  if (!compile && !decompile && !build && !test)
+  if (!compile && !decompile && !build && !test && !reweight && add_item < 0)
     usage();
 
   /*
@@ -869,6 +887,7 @@ int main(int argc, const char **argv)
   */
 
   CrushWrapper crush;
+  bool modified = false;
 
   if (infn) {
     bufferlist bl;
@@ -902,8 +921,7 @@ int main(int argc, const char **argv)
     crush.finalize();
     if (r < 0) 
       exit(1);
-    if (!outfn)
-      cout << me << " successfully compiled '" << srcfn << "'.  Use -o file to write it out." << std::endl;
+    modified = true;
   }
 
   if (build) {
@@ -1010,11 +1028,24 @@ int main(int argc, const char **argv)
     crush.finalize();
     dout(0) << "crush max_devices " << crush.crush->max_devices << dendl;
 
-    if (!outfn)
-      cout << me << " successfully built map.  Use -o file to write it out." << std::endl;
+    modified = true;
   }
-  if (compile || build) {
-    if (outfn) {
+
+  if (add_item >= 0) {
+    cout << me << " adding item " << add_item << " weight " << add_weight
+        << " at " << add_loc << std::endl;
+    crush.insert_device(add_item, (int)(add_weight * (float)0x10000), add_name, add_loc);
+    modified = true;
+  }
+  if (reweight) {
+    crush.reweight();
+    modified = true;
+  }
+
+  if (modified) {
+    if (!outfn) {
+      cout << me << " successfully built or modified map.  Use '-o <file>' to write it out." << std::endl;
+    } else {
       bufferlist bl;
       crush.encode(bl);
       int r = bl.write_file(outfn);