]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crush: improve error handling in map decoding
authorColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Mon, 25 Oct 2010 19:06:01 +0000 (12:06 -0700)
committerColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Mon, 25 Oct 2010 19:08:30 +0000 (12:08 -0700)
Signed-off-by: Colin McCabe <colinm@hq.newdream.net>
src/crush/CrushWrapper.h

index 3bd64b5733a77e14464e73f6fea3003918bd6c55..da2ef161e9ac39c615a1c108e670f41646312c20 100644 (file)
@@ -61,9 +61,9 @@ private:
     build_rmap(rule_name_map, rule_name_rmap);
     have_rmaps = true;
   }
-  void build_rmap(map<int, string> &f, std::map<string, int> &r) {
+  void build_rmap(const map<int, string> &f, std::map<string, int> &r) {
     r.clear();
-    for (std::map<int, string>::iterator p = f.begin(); p != f.end(); p++)
+    for (std::map<int, string>::const_iterator p = f.begin(); p != f.end(); ++p)
       r[p->second] = p->first;
   }
 
@@ -429,29 +429,70 @@ public:
     ::encode(rule_name_map, bl);
   }
 
-  void decode(bufferlist::iterator &blp) {
+  void decode(bufferlist::iterator &blp)
+  {
     create();
 
     __u32 magic;
     ::decode(magic, blp);
-    assert(magic == CRUSH_MAGIC);
+    if (magic != CRUSH_MAGIC)
+      throw buffer::malformed_input("bad magic number");
 
     ::decode(crush->max_buckets, blp);
     ::decode(crush->max_rules, blp);
     ::decode(crush->max_devices, blp);
 
-    // buckets
-    crush->buckets = (crush_bucket**)malloc(sizeof(crush_bucket*)*crush->max_buckets);
-    for (int i=0; i<crush->max_buckets; i++) {
-      __u32 alg;
-      ::decode(alg, blp);
-      if (!alg) {
-       crush->buckets[i] = 0;
-       continue;
+    try {
+      // buckets
+      crush->buckets = (crush_bucket**)calloc(1, crush->max_buckets * sizeof(crush_bucket*));
+      for (int i=0; i<crush->max_buckets; i++) {
+       decode_crush_bucket(&crush->buckets[i], blp);
+      }
+
+      // rules
+      crush->rules = (crush_rule**)calloc(1, crush->max_rules * sizeof(crush_rule*));
+      for (unsigned i = 0; i < crush->max_rules; ++i) {
+       __u32 yes;
+       ::decode(yes, blp);
+       if (!yes) {
+         crush->rules[i] = NULL;
+         continue;
+       }
+
+       __u32 len;
+       ::decode(len, blp);
+       crush->rules[i] = (crush_rule*)calloc(1, crush_rule_size(len));
+       crush->rules[i]->len = len;
+       ::decode(crush->rules[i]->mask, blp);
+       for (unsigned j=0; j<crush->rules[i]->len; j++)
+         ::decode(crush->rules[i]->steps[j], blp);
       }
 
-      int size = 0;
-      switch (alg) {
+      // name info
+      ::decode(type_map, blp);
+      ::decode(name_map, blp);
+      ::decode(rule_name_map, blp);
+      build_rmaps();
+
+      finalize();
+    }
+    catch (...) {
+      crush_destroy(crush);
+      throw;
+    }
+  }
+
+  void decode_crush_bucket(crush_bucket** bptr, bufferlist::iterator &blp)
+  {
+    __u32 alg;
+    ::decode(alg, blp);
+    if (!alg) {
+      *bptr = NULL;
+      return;
+    }
+
+    int size = 0;
+    switch (alg) {
       case CRUSH_BUCKET_UNIFORM:
        size = sizeof(crush_bucket_uniform);
        break;
@@ -464,93 +505,76 @@ public:
       case CRUSH_BUCKET_STRAW:
        size = sizeof(crush_bucket_straw);
        break;
-      default:
-       assert(0);
+      default: {
+       char str[128];
+       snprintf(str, sizeof(str), "unsupported bucket algorithm: %d", alg);
+       throw buffer::malformed_input(str);
       }
-      crush->buckets[i] = (crush_bucket*)malloc(size);
-      memset(crush->buckets[i], 0, size);
-
-      ::decode(crush->buckets[i]->id, blp);
-      ::decode(crush->buckets[i]->type, blp);
-      ::decode(crush->buckets[i]->alg, blp);
-      ::decode(crush->buckets[i]->hash, blp);
-      ::decode(crush->buckets[i]->weight, blp);
-      ::decode(crush->buckets[i]->size, blp);
-
-      crush->buckets[i]->items = (__s32*)malloc(sizeof(__s32)*crush->buckets[i]->size);
-      for (unsigned j=0; j<crush->buckets[i]->size; j++)
-       ::decode(crush->buckets[i]->items[j], blp);
+    }
+    crush_bucket *bucket = (crush_bucket*)calloc(1, size);
+    *bptr = bucket;
+
+    ::decode(bucket->id, blp);
+    ::decode(bucket->type, blp);
+    ::decode(bucket->alg, blp);
+    ::decode(bucket->hash, blp);
+    ::decode(bucket->weight, blp);
+    ::decode(bucket->size, blp);
+
+    bucket->items = (__s32*)calloc(1, bucket->size * sizeof(__s32));
+    for (unsigned j = 0; j < bucket->size; ++j) {
+      ::decode(bucket->items[j], blp);
+    }
 
-      crush->buckets[i]->perm = (__u32*)malloc(sizeof(__s32)*crush->buckets[i]->size);
-      crush->buckets[i]->perm_n = 0;
+    bucket->perm = (__u32*)calloc(1, bucket->size * sizeof(__s32));
+    bucket->perm_n = 0;
 
-      switch (crush->buckets[i]->alg) {
-      case CRUSH_BUCKET_UNIFORM:
-       ::decode(((crush_bucket_uniform*)crush->buckets[i])->item_weight, blp);
+    switch (bucket->alg) {
+      case CRUSH_BUCKET_UNIFORM: {
+       ::decode(((crush_bucket_uniform*)bucket)->item_weight, blp);
        break;
+      }
 
-      case CRUSH_BUCKET_LIST:
-       ((crush_bucket_list*)crush->buckets[i])->item_weights =
-         (__u32*)malloc(crush->buckets[i]->size * sizeof(__u32));
-       ((crush_bucket_list*)crush->buckets[i])->sum_weights =
-         (__u32*)malloc(crush->buckets[i]->size * sizeof(__u32));
+      case CRUSH_BUCKET_LIST: {
+       crush_bucket_list* cbl = (crush_bucket_list*)bucket;
+       cbl->item_weights = (__u32*)calloc(1, bucket->size * sizeof(__u32));
+       cbl->sum_weights = (__u32*)calloc(1, bucket->size * sizeof(__u32));
 
-       for (unsigned j=0; j<crush->buckets[i]->size; j++) {
-         ::decode(((crush_bucket_list*)crush->buckets[i])->item_weights[j], blp);
-         ::decode(((crush_bucket_list*)crush->buckets[i])->sum_weights[j], blp);
+       for (unsigned j = 0; j < bucket->size; ++j) {
+         ::decode(cbl->item_weights[j], blp);
+         ::decode(cbl->sum_weights[j], blp);
        }
        break;
+      }
 
-      case CRUSH_BUCKET_TREE:
-       {
-         ::decode(((crush_bucket_tree*)crush->buckets[i])->num_nodes, blp);
-         unsigned num_nodes = ((crush_bucket_tree*)crush->buckets[i])->num_nodes;
-         ((crush_bucket_tree*)crush->buckets[i])->node_weights =
-           (__u32*)malloc(num_nodes * sizeof(__u32));
-         for (unsigned j=0; j<num_nodes; j++)
-           ::decode(((crush_bucket_tree*)crush->buckets[i])->node_weights[j], blp);
+      case CRUSH_BUCKET_TREE: {
+       unsigned num_nodes;
+       crush_bucket_tree* cbt = (crush_bucket_tree*)bucket;
+       ::decode(num_nodes, blp);
+       cbt->num_nodes = num_nodes;
+       cbt->node_weights = (__u32*)calloc(1, num_nodes * sizeof(__u32));
+       for (unsigned j=0; j<num_nodes; j++) {
+         ::decode(cbt->node_weights[j], blp);
        }
        break;
+      }
 
-      case CRUSH_BUCKET_STRAW:
-       ((crush_bucket_straw*)crush->buckets[i])->straws =
-         (__u32*)malloc(crush->buckets[i]->size * sizeof(__u32));
-       ((crush_bucket_straw*)crush->buckets[i])->item_weights =
-         (__u32*)malloc(crush->buckets[i]->size * sizeof(__u32));
-       for (unsigned j=0; j<crush->buckets[i]->size; j++) {
-         ::decode(((crush_bucket_straw*)crush->buckets[i])->item_weights[j], blp);
-         ::decode(((crush_bucket_straw*)crush->buckets[i])->straws[j], blp);
+      case CRUSH_BUCKET_STRAW: {
+       crush_bucket_straw* cbs = (crush_bucket_straw*)bucket;
+       cbs->straws = (__u32*)calloc(1, bucket->size * sizeof(__u32));
+       cbs->item_weights = (__u32*)calloc(1, bucket->size * sizeof(__u32));
+       for (unsigned j = 0; j < bucket->size; ++j) {
+         ::decode(cbs->item_weights[j], blp);
+         ::decode(cbs->straws[j], blp);
        }
        break;
       }
-    }
 
-    // rules
-    crush->rules = (crush_rule**)malloc(sizeof(crush_rule*)*crush->max_rules);
-    for (unsigned i=0; i<crush->max_rules; i++) {
-      __u32 yes;
-      ::decode(yes, blp);
-      if (!yes) {
-       crush->rules[i] = 0;
-       continue;
-      }
-
-      __u32 len;
-      ::decode(len, blp);
-      crush->rules[i] = (crush_rule*)malloc(crush_rule_size(len));
-      crush->rules[i]->len = len;
-      ::decode(crush->rules[i]->mask, blp);
-      for (unsigned j=0; j<crush->rules[i]->len; j++)
-       ::decode(crush->rules[i]->steps[j], blp);
+      default:
+       // We should have handled this case in the first switch statement
+       assert(0);
+       break;
     }
-
-    // name info
-    ::decode(type_map, blp);
-    ::decode(name_map, blp);
-    ::decode(rule_name_map, blp);
-    build_rmaps();
-
-    finalize();
   }
 };