]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
erasure-code: locally repairable code plugin
authorLoic Dachary <loic@dachary.org>
Sun, 8 Jun 2014 12:19:18 +0000 (14:19 +0200)
committerLoic Dachary <loic-201408@dachary.org>
Fri, 29 Aug 2014 17:38:57 +0000 (19:38 +0200)
Recursively apply erasure code techniques so that recovering from the
loss of some chunks only require a subset of the available chunks, most
of the time.

http://tracker.ceph.com/issues/7238 Fixes: #7238

Signed-off-by: Loic Dachary <loic@dachary.org>
doc/rados/operations/index.rst
src/erasure-code/LRC/ErasureCodeLRC.cc [new file with mode: 0644]
src/erasure-code/LRC/ErasureCodeLRC.h [new file with mode: 0644]
src/erasure-code/LRC/ErasureCodePluginLRC.cc [new file with mode: 0644]
src/erasure-code/LRC/Makefile.am [new file with mode: 0644]
src/erasure-code/Makefile.am
src/test/erasure-code/Makefile.am
src/test/erasure-code/TestErasureCodeLRC.cc [new file with mode: 0644]
src/test/erasure-code/TestErasureCodePluginLRC.cc [new file with mode: 0644]

index 96bbdda4d158d0d75157575282b47ad9902f2bbc..47aa7804b5a5b65e3d771b9cb58de42dc7e95bc1 100644 (file)
@@ -34,6 +34,7 @@ CRUSH algorithm.
        data-placement
        pools
        erasure-code-profile
+       erasure-code-lrc
        cache-tiering
        placement-groups
        crush-map
diff --git a/src/erasure-code/LRC/ErasureCodeLRC.cc b/src/erasure-code/LRC/ErasureCodeLRC.cc
new file mode 100644 (file)
index 0000000..02d22ce
--- /dev/null
@@ -0,0 +1,723 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Cloudwatt <libre.licensing@cloudwatt.com>
+ *
+ * Author: Loic Dachary <loic@dachary.org>
+ *
+ *  This library is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public
+ *  License as published by the Free Software Foundation; either
+ *  version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#include <errno.h>
+#include <algorithm>
+
+#include "include/str_map.h"
+#include "common/debug.h"
+#include "crush/CrushWrapper.h"
+#include "osd/osd_types.h"
+#include "include/stringify.h"
+#include "erasure-code/ErasureCodePlugin.h"
+#include "json_spirit/json_spirit_writer.h"
+
+#include "ErasureCodeLRC.h"
+
+// re-include our assert to clobber boost's
+#include "include/assert.h"
+
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix _prefix(_dout)
+
+static ostream& _prefix(std::ostream* _dout)
+{
+  return *_dout << "ErasureCodeLRC: ";
+}
+
+int ErasureCodeLRC::create_ruleset(const string &name,
+                                  CrushWrapper &crush,
+                                  ostream *ss) const
+{
+  if (crush.rule_exists(name)) {
+    *ss << "rule " << name << " exists";
+    return -EEXIST;
+  }
+  if (!crush.name_exists(ruleset_root)) {
+    *ss << "root item " << ruleset_root << " does not exist";
+    return -ENOENT;
+  }
+  int root = crush.get_item_id(ruleset_root);
+
+  int ruleset = 0;
+  for (int i = 0; i < crush.get_max_rules(); i++) {
+    if (crush.rule_exists(i) &&
+       crush.get_rule_mask_ruleset(i) >= ruleset) {
+      ruleset = crush.get_rule_mask_ruleset(i) + 1;
+    }
+  }
+
+  int steps = 3 + ruleset_steps.size();
+  int min_rep = 3;
+  int max_rep = 30;
+  crush_rule *rule = crush_make_rule(steps, ruleset,
+                                    pg_pool_t::TYPE_ERASURE,
+                                    min_rep, max_rep);
+  assert(rule);
+  int step = 0;
+  crush_rule_set_step(rule, step++, CRUSH_RULE_SET_CHOOSELEAF_TRIES, 5, 0);
+  crush_rule_set_step(rule, step++, CRUSH_RULE_TAKE, root, 0);
+  // [ [ "choose", "rack", 2 ],
+  //   [ "chooseleaf", "host", 5 ] ]
+  for (vector<Step>::const_iterator i = ruleset_steps.begin();
+       i != ruleset_steps.end();
+       ++i) {
+    int op = i->op == "chooseleaf" ?
+      CRUSH_RULE_CHOOSELEAF_INDEP : CRUSH_RULE_CHOOSE_INDEP;
+    int type = crush.get_type_id(i->type);
+    if (type < 0) {
+      *ss << "unknown crush type " << i->type;
+      return -EINVAL;
+    }
+    crush_rule_set_step(rule, step++, op, i->n, type);
+  }
+  crush_rule_set_step(rule, step++, CRUSH_RULE_EMIT, 0, 0);
+  int rno = crush_add_rule(crush.crush, rule, -1);
+  crush.set_rule_name(rno, name);
+
+  return ruleset;
+}
+
+int ErasureCodeLRC::layers_description(const map<string,string> &parameters,
+                                      json_spirit::mArray *description,
+                                      ostream *ss) const
+{
+  if (parameters.count("layers") == 0) {
+    *ss << "could not find 'layers' in " << parameters << std::endl;
+    return ERROR_LRC_DESCRIPTION;
+  }
+  string str = parameters.find("layers")->second;
+  try {
+    json_spirit::mValue json;
+    json_spirit::read_or_throw(str, json);
+
+    if (json.type() != json_spirit::array_type) {
+      *ss << "layers='" << str
+         << "' must be a JSON array but is of type "
+         << json.type() << " instead" << std::endl;
+      return ERROR_LRC_ARRAY;
+    }
+    *description = json.get_array();
+  } catch (json_spirit::Error_position &e) {
+    *ss << "failed to parse layers='" << str << "'"
+       << " at line " << e.line_ << ", column " << e.column_
+       << " : " << e.reason_ << std::endl;
+    return ERROR_LRC_PARSE_JSON;
+  }
+  return 0;
+}
+
+int ErasureCodeLRC::layers_parse(string description_string,
+                                json_spirit::mArray description,
+                                ostream *ss)
+{
+  int position = 0;
+  for (vector<json_spirit::mValue>::iterator i = description.begin();
+       i != description.end();
+       i++, position++) {
+    if (i->type() != json_spirit::array_type) {
+      stringstream json_string;
+      json_spirit::write(*i, json_string);
+      *ss << "each element of the array "
+         << description_string << " must be a JSON array but "
+         << json_string.str() << " at position " << position
+         << " is of type " << i->type() << " instead" << std::endl;
+      return ERROR_LRC_ARRAY;
+    }
+    json_spirit::mArray layer_json = i->get_array();
+    map<string, string> parameters;
+    int index = 0;
+    for (vector<json_spirit::mValue>::iterator j = layer_json.begin();
+        j != layer_json.end();
+        ++j, ++index) {
+      if (index == 0) {
+       if (j->type() != json_spirit::str_type) {
+         stringstream element;
+         json_spirit::write(*j, element);
+         *ss << "the first element of the entry " 
+             << element.str() << " (first is zero) "
+             << position << " in " << description_string 
+             << " is of type " << (*j).type() << " instead of string" << std::endl;
+         return ERROR_LRC_STR;
+       }
+       layers.push_back(Layer(j->get_str()));
+       Layer &layer = layers.back();
+       layer.chunks_map = j->get_str();
+      } else if(index == 1) {
+       Layer &layer = layers.back();
+       if (j->type() != json_spirit::str_type &&
+           j->type() != json_spirit::obj_type) {
+         stringstream element;
+         json_spirit::write(*j, element);
+         *ss << "the second element of the entry " 
+             << element.str() << " (first is zero) "
+             << position << " in " << description_string 
+             << " is of type " << (*j).type() << " instead of string or object"
+             << std::endl;
+         return ERROR_LRC_CONFIG_OPTIONS;
+       }
+       if (j->type() == json_spirit::str_type) {
+         int err = get_str_map(j->get_str(), *ss, &layer.parameters);
+         if (err)
+           return err;
+       } else if (j->type() == json_spirit::obj_type) {
+         json_spirit::mObject o = j->get_obj();
+
+         for (map<string, json_spirit::mValue>::iterator i = o.begin();
+              i != o.end();
+              ++i) {
+           layer.parameters[i->first] = i->second.get_str();
+         }
+       }
+      } else {
+         // ignore trailing elements
+      }
+    }
+  }
+  return 0;
+} 
+
+int ErasureCodeLRC::layers_init()
+{
+  ErasureCodePluginRegistry &registry = ErasureCodePluginRegistry::instance();
+  int err;
+  for (unsigned int i = 0; i < layers.size(); i++) {
+    Layer &layer = layers[i];
+    int position = 0;
+    for(std::string::iterator it = layer.chunks_map.begin();
+       it != layer.chunks_map.end();
+       ++it) {
+      if (*it == 'D')
+       layer.data.push_back(position);
+      if (*it == 'c')
+       layer.coding.push_back(position);
+      if (*it == 'c' || *it == 'D')
+       layer.chunks_as_set.insert(position);
+      position++;
+    }
+    layer.chunks = layer.data;
+    layer.chunks.insert(layer.chunks.end(),
+                       layer.coding.begin(), layer.coding.end());
+    if (layer.parameters.find("k") == layer.parameters.end())
+      layer.parameters["k"] = stringify(layer.data.size());
+    if (layer.parameters.find("m") == layer.parameters.end())
+      layer.parameters["m"] = stringify(layer.coding.size());
+    if (layer.parameters.find("plugin") == layer.parameters.end())
+      layer.parameters["plugin"] = "jerasure";
+    if (layer.parameters.find("technique") == layer.parameters.end())
+      layer.parameters["technique"] = "reed_sol_van";
+    if (layer.parameters.find("directory") == layer.parameters.end()) 
+      layer.parameters["directory"] = directory;
+    stringstream ss;
+    err = registry.factory(layer.parameters["plugin"],
+                          layer.parameters,
+                          &layer.erasure_code,
+                          ss);
+    if (err) {
+      derr << ss.str() << dendl;
+      return err;
+    }
+  }
+  return 0;
+}
+
+int ErasureCodeLRC::layers_sanity_checks(string description_string,
+                                        ostream *ss) const
+{
+  int position = 0;
+
+  if (layers.size() < 1) {
+    *ss << "layers parameter has " << layers.size()
+       << " which is less than the minimum of one. "
+       << description_string << std::endl;
+    return ERROR_LRC_LAYERS_COUNT;
+  }
+  for (vector<Layer>::const_iterator layer = layers.begin();
+       layer != layers.end();
+       ++layer) {
+    if (chunk_count != layer->chunks_map.length()) {
+      *ss << "the first element of the array at position " 
+         << position << " (starting from zero) " 
+         << " is the string '" << layer->chunks_map
+         << " found in the layers parameter "
+         << description_string << ". It is expected to be "
+         << chunk_count << " characters long but is "
+         << layer->chunks_map.length() << " characters long instead "
+         << std::endl;
+      return ERROR_LRC_MAPPING_SIZE;
+    }
+  }
+  return 0;
+}
+
+int ErasureCodeLRC::parse(const map<string,string> &parameters,
+                         ostream *ss)
+{
+  int r = ErasureCode::parse(parameters, ss);
+  if (r)
+    return r;
+
+  if (parameters.count("directory") != 0)
+    directory = parameters.find("directory")->second;
+  
+  return parse_ruleset(parameters, ss);
+}
+
+int ErasureCodeLRC::parse_ruleset(const map<string,string> &parameters,
+                                 ostream *ss)
+{
+  map<string,string>::const_iterator parameter;
+  parameter = parameters.find("ruleset-root");
+  if (parameter != parameters.end())
+    ruleset_root = parameter->second;
+
+  if (parameters.count("ruleset-steps") != 0) {
+    ruleset_steps.clear();
+    string str = parameters.find("ruleset-steps")->second;
+    json_spirit::mArray description;
+    try {
+      json_spirit::mValue json;
+      json_spirit::read_or_throw(str, json);
+
+      if (json.type() != json_spirit::array_type) {
+       *ss << "ruleset-steps='" << str
+           << "' must be a JSON array but is of type "
+           << json.type() << " instead" << std::endl;
+       return ERROR_LRC_ARRAY;
+      }
+      description = json.get_array();
+    } catch (json_spirit::Error_position &e) {
+      *ss << "failed to parse ruleset-steps='" << str << "'"
+         << " at line " << e.line_ << ", column " << e.column_
+         << " : " << e.reason_ << std::endl;
+      return ERROR_LRC_PARSE_JSON;
+    }
+
+    int position = 0;
+    for (vector<json_spirit::mValue>::iterator i = description.begin();
+        i != description.end();
+        i++, position++) {
+      if (i->type() != json_spirit::array_type) {
+       stringstream json_string;
+       json_spirit::write(*i, json_string);
+       *ss << "element of the array "
+           << str << " must be a JSON array but "
+           << json_string.str() << " at position " << position
+           << " is of type " << i->type() << " instead" << std::endl;
+       return ERROR_LRC_ARRAY;
+      }
+      int r = parse_ruleset_step(str, i->get_array(), ss);
+      if (r)
+       return r;
+    }
+  }
+  return 0;
+}
+
+int ErasureCodeLRC::parse_ruleset_step(string description_string,
+                                      json_spirit::mArray description,
+                                      ostream *ss)
+{
+  stringstream json_string;
+  json_spirit::write(description, json_string);
+  string op;
+  string type;
+  int n;
+  int position = 0;
+  for (vector<json_spirit::mValue>::iterator i = description.begin();
+       i != description.end();
+       i++, position++) {
+    if ((position == 0 || position == 1) &&
+       i->type() != json_spirit::str_type) {
+      *ss << "element " << position << " of the array "
+         << json_string.str() << " found in " << description_string
+         << " must be a JSON string but is of type "
+         << i->type() << " instead" << std::endl;
+      return position == 0 ? ERROR_LRC_RULESET_OP : ERROR_LRC_RULESET_TYPE;
+    }
+    if (position == 2 && i->type() != json_spirit::int_type) {
+      *ss << "element " << position << " of the array "
+         << json_string.str() << " found in " << description_string
+         << " must be a JSON int but is of type "
+         << i->type() << " instead" << std::endl;
+      return ERROR_LRC_RULESET_N;
+    }
+
+    if (position == 0)
+      op = i->get_str();
+    if (position == 1)
+      type = i->get_str();
+    if (position == 2)
+      n = i->get_int();
+  }
+  ruleset_steps.push_back(Step(op, type, n));
+  return 0;
+}
+
+int ErasureCodeLRC::init(const map<string,string> &parameters,
+                        ostream *ss)
+{
+  int r;
+
+  r = parse(parameters, ss);
+  if (r)
+    return r;
+
+  json_spirit::mArray description;
+  r = layers_description(parameters, &description, ss);
+  if (r)
+    return r;
+
+  string description_string = parameters.find("layers")->second;
+
+  dout(10) << "init(" << description_string << ")" << dendl;
+
+  r = layers_parse(description_string, description, ss);
+  if (r)
+    return r;
+
+  r = layers_init();
+  if (r)
+    return r;
+
+  if (parameters.count("mapping") == 0) {
+    *ss << "the 'mapping' parameter is missing from " << parameters;
+    return ERROR_LRC_MAPPING;
+  }
+  string mapping = parameters.find("mapping")->second;
+  data_chunk_count = 0;
+  for(std::string::iterator it = mapping.begin(); it != mapping.end(); ++it) {
+    if (*it == 'D')
+      data_chunk_count++;
+  }
+  chunk_count = mapping.length();
+
+  return layers_sanity_checks(description_string, ss);
+}
+
+set<int> ErasureCodeLRC::get_erasures(const set<int> &want,
+                                     const set<int> &available) const
+{
+  set<int> result;
+  set_difference(want.begin(), want.end(),
+                available.begin(), available.end(),
+                inserter(result, result.end()));
+  return result;
+}
+
+unsigned int ErasureCodeLRC::get_chunk_size(unsigned int object_size) const
+{
+  return layers.front().erasure_code->get_chunk_size(object_size);
+}
+
+void p(const set<int> &s) { cerr << s; } // for gdb
+
+int ErasureCodeLRC::minimum_to_decode(const set<int> &want_to_read,
+                                     const set<int> &available_chunks,
+                                     set<int> *minimum)
+{
+  dout(20) << __func__ << " want_to_read " << want_to_read
+          << " available_chunks " << available_chunks << dendl;
+  {
+    set<int> erasures_total;
+    set<int> erasures_not_recovered;
+    set<int> erasures_want;
+    for (unsigned int i = 0; i < get_chunk_count(); ++i) {
+      if (available_chunks.count(i) == 0) {
+       erasures_total.insert(i);
+       erasures_not_recovered.insert(i);
+       if (want_to_read.count(i) != 0)
+         erasures_want.insert(i);
+      }
+    }
+
+    //
+    // Case 1:
+    //
+    // When no chunk is missing there is no need to read more than what
+    // is wanted.
+    //
+    if (erasures_want.empty()) {
+      *minimum = want_to_read;
+      dout(20) << __func__ << " minimum == want_to_read == "
+              << want_to_read << dendl;
+      return 0;
+    }
+
+    //
+    // Case 2: 
+    // 
+    // Try to recover erasures with as few chunks as possible.
+    //
+    for (vector<Layer>::reverse_iterator i = layers.rbegin();
+        i != layers.rend();
+        ++i) {
+      //
+      // If this layer has no chunk that we want, skip it.
+      //
+      set<int> layer_want;
+      set_intersection(want_to_read.begin(), want_to_read.end(),
+                      i->chunks_as_set.begin(), i->chunks_as_set.end(),
+                      inserter(layer_want, layer_want.end()));
+      if (layer_want.empty())
+       continue;
+      //
+      // Are some of the chunks we want missing ?
+      //
+      set<int> layer_erasures;
+      set_intersection(layer_want.begin(), layer_want.end(),
+                      erasures_want.begin(), erasures_want.end(),
+                      inserter(layer_erasures, layer_erasures.end()));
+      set<int> layer_minimum;
+      if (layer_erasures.empty()) {
+       //
+       // The chunks we want are available, this is the minimum we need
+       // to read.
+       //
+       layer_minimum = layer_want;
+      } else {
+       set<int> erasures;
+       set_intersection(i->chunks_as_set.begin(), i->chunks_as_set.end(),
+                        erasures_not_recovered.begin(), erasures_not_recovered.end(),
+                        inserter(erasures, erasures.end()));
+
+       if (erasures.size() > i->erasure_code->get_coding_chunk_count()) {
+         //
+         // There are too many erasures for this layer to recover: skip
+         // it and hope that an upper layer will be do better.
+         //
+         continue;
+       } else {
+         //
+         // Get all available chunks in that layer to recover the
+         // missing one(s).
+         //
+         set_difference(i->chunks_as_set.begin(), i->chunks_as_set.end(),
+                        erasures_not_recovered.begin(), erasures_not_recovered.end(),
+                        inserter(layer_minimum, layer_minimum.end()));
+         //
+         // Chunks recovered by this layer are removed from the list of
+         // erasures so that upper levels do not attempt to recover
+         // them.
+         //
+         for (set<int>::const_iterator j = erasures.begin();
+              j != erasures.end();
+              j++) {
+           erasures_not_recovered.erase(*j);
+           if (erasures_want.count(*j))
+             erasures_want.erase(*j);
+         }
+       }
+      }
+      minimum->insert(layer_minimum.begin(), layer_minimum.end());
+    }
+    if (erasures_want.empty()) {
+      minimum->insert(want_to_read.begin(), want_to_read.end());
+      for (set<int>::const_iterator i = erasures_total.begin();
+          i != erasures_total.end();
+          i++) {
+       if (minimum->count(*i))
+         minimum->erase(*i);
+      }
+      dout(20) << __func__ << " minimum = " << *minimum << dendl;
+      return 0;
+    }
+  }
+
+  {  
+    //
+    // Case 3:
+    //
+    // The previous strategy failed to recover from all erasures.
+    //
+    // Try to recover as many chunks as possible, even from layers
+    // that do not contain chunks that we want, in the hope that it
+    // will help the upper layers. 
+    //
+    set<int> erasures_total;
+    for (unsigned int i = 0; i < get_chunk_count(); ++i) {
+      if (available_chunks.count(i) == 0)
+       erasures_total.insert(i);
+    }
+  
+    for (vector<Layer>::reverse_iterator i = layers.rbegin();
+        i != layers.rend();
+        ++i) {
+      set<int> layer_erasures;
+      set_intersection(i->chunks_as_set.begin(), i->chunks_as_set.end(),
+                      erasures_total.begin(), erasures_total.end(),
+                      inserter(layer_erasures, layer_erasures.end()));
+      //
+      // If this layer has no erasure, skip it
+      //
+      if (layer_erasures.empty())
+       continue;
+
+      if (layer_erasures.size() > 0 &&
+         layer_erasures.size() <= i->erasure_code->get_coding_chunk_count()) {
+       //
+       // chunks recovered by this layer are removed from the list of
+       // erasures so that upper levels know they can rely on their
+       // availability
+       //
+       for (set<int>::const_iterator j = layer_erasures.begin();
+            j != layer_erasures.end();
+            j++) {
+         erasures_total.erase(*j);
+       }
+      }
+    }
+    if (erasures_total.empty()) {
+      //
+      // Do not try to be smart about what chunks are necessary to
+      // recover, use all available chunks. 
+      //
+      *minimum = available_chunks;
+      dout(20) << __func__ << " minimum == available_chunks == "
+              << available_chunks << dendl;
+      return 0;
+    }
+  }
+  
+  derr << __func__ << " not enough chunks in " << available_chunks
+       << " to read " << want_to_read << dendl;
+  return -EIO;
+}
+
+int ErasureCodeLRC::encode_chunks(const set<int> &want_to_encode,
+                                 map<int, bufferlist> *encoded)
+{
+  unsigned int top = layers.size();
+  for (vector<Layer>::reverse_iterator i = layers.rbegin();
+       i != layers.rend();
+       ++i) {
+    --top;
+    if (includes(i->chunks_as_set.begin(), i->chunks_as_set.end(),
+                want_to_encode.begin(), want_to_encode.end()))
+      break;
+  }
+
+  for (unsigned int i = top; i < layers.size(); ++i) {
+    const Layer &layer = layers[i];
+    set<int> layer_want_to_encode;
+    map<int, bufferlist> layer_encoded;
+    int j = 0;
+    for (vector<int>::const_iterator c = layer.chunks.begin();
+        c != layer.chunks.end();
+        c++) {
+      layer_encoded[j] = (*encoded)[*c];
+      if (want_to_encode.find(*c) != want_to_encode.end())
+       layer_want_to_encode.insert(j);
+      j++;
+    }
+    int err = layer.erasure_code->encode_chunks(layer_want_to_encode,
+                                               &layer_encoded);
+    if (err) {
+      derr << __func__ << " layer " << layer.chunks_map
+          << " failed with " << err << " trying to encode "
+          << layer_want_to_encode << dendl;
+      return err;
+    }
+  }
+  return 0;
+}
+
+int ErasureCodeLRC::decode_chunks(const set<int> &want_to_read,
+                                 const map<int, bufferlist> &chunks,
+                                 map<int, bufferlist> *decoded)
+{
+  set<int> available_chunks;
+  set<int> erasures;
+  for (unsigned int i = 0; i < get_chunk_count(); ++i) {
+    if (chunks.count(i) != 0)
+      available_chunks.insert(i);
+    else
+      erasures.insert(i);
+  }
+
+  set<int> want_to_read_erasures;
+
+  for (vector<Layer>::reverse_iterator layer = layers.rbegin();
+       layer != layers.rend();
+       ++layer) {
+    set<int> layer_erasures;
+    set_intersection(layer->chunks_as_set.begin(), layer->chunks_as_set.end(),
+                    erasures.begin(), erasures.end(),
+                    inserter(layer_erasures, layer_erasures.end()));
+
+    if (layer_erasures.size() >
+       layer->erasure_code->get_coding_chunk_count()) {
+      // skip because there are too many erasures for this layer to recover
+    } else if(layer_erasures.size() == 0) {
+      // skip because all chunks are already available
+    } else {
+      set<int> layer_want_to_read;
+      map<int, bufferlist> layer_chunks;
+      map<int, bufferlist> layer_decoded;
+      int j = 0;
+      for (vector<int>::const_iterator c = layer->chunks.begin();
+          c != layer->chunks.end();
+          c++) {
+       //
+       // Pick chunks from *decoded* instead of *chunks* to re-use
+       // chunks recovered by previous layers. In other words
+       // *chunks* does not change but *decoded* gradually improves
+       // as more layers recover from erasures.
+       //
+       if (erasures.count(*c) == 0)
+         layer_chunks[j] = (*decoded)[*c];
+       if (want_to_read.count(*c) != 0)
+         layer_want_to_read.insert(j);
+       layer_decoded[j] = (*decoded)[*c];
+       ++j;
+      }
+      int err = layer->erasure_code->decode_chunks(layer_want_to_read,
+                                                  layer_chunks,
+                                                  &layer_decoded);
+      if (err) {
+       derr << __func__ << " layer " << layer->chunks_map
+            << " failed with " << err << " trying to decode "
+            << layer_want_to_read << " with " << available_chunks << dendl;
+       return err;
+      }
+      j = 0;
+      for (vector<int>::const_iterator c = layer->chunks.begin();
+          c != layer->chunks.end();
+          c++) {
+       (*decoded)[*c] = layer_decoded[j];
+       ++j;
+       if (erasures.count(*c) != 0)
+         erasures.erase(*c);
+      }
+      want_to_read_erasures.clear();
+      set_intersection(erasures.begin(), erasures.end(),
+                      want_to_read.begin(), want_to_read.end(),
+                      inserter(want_to_read_erasures, want_to_read_erasures.end()));
+      if (want_to_read_erasures.size() == 0)
+       break;
+    }
+  }
+
+  if (want_to_read_erasures.size() > 0) {
+    derr << __func__ << " want to read " << want_to_read
+        << " with available_chunks = " << available_chunks
+        << " end up being unable to read " << want_to_read_erasures << dendl;
+    return -EIO;
+  } else {
+    return 0;
+  }
+}
diff --git a/src/erasure-code/LRC/ErasureCodeLRC.h b/src/erasure-code/LRC/ErasureCodeLRC.h
new file mode 100644 (file)
index 0000000..f12f429
--- /dev/null
@@ -0,0 +1,133 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Cloudwatt <libre.licensing@cloudwatt.com>
+ *
+ * Author: Loic Dachary <loic@dachary.org>
+ *
+ *  This library is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public
+ *  License as published by the Free Software Foundation; either
+ *  version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#ifndef CEPH_ERASURE_CODE_LRC_H
+#define CEPH_ERASURE_CODE_LRC_H
+
+#include "include/err.h"
+#include "json_spirit/json_spirit.h"
+#include "erasure-code/ErasureCode.h"
+
+#define ERROR_LRC_ARRAY                        -(MAX_ERRNO + 1)
+#define ERROR_LRC_OBJECT               -(MAX_ERRNO + 2)
+#define ERROR_LRC_INT                  -(MAX_ERRNO + 3)
+#define ERROR_LRC_STR                  -(MAX_ERRNO + 4)
+#define ERROR_LRC_PLUGIN               -(MAX_ERRNO + 5)
+#define ERROR_LRC_DESCRIPTION          -(MAX_ERRNO + 6)
+#define ERROR_LRC_PARSE_JSON           -(MAX_ERRNO + 7)
+#define ERROR_LRC_MAPPING              -(MAX_ERRNO + 8)
+#define ERROR_LRC_MAPPING_SIZE         -(MAX_ERRNO + 9)
+#define ERROR_LRC_FIRST_MAPPING                -(MAX_ERRNO + 10)
+#define ERROR_LRC_COUNT_CONSTRAINT     -(MAX_ERRNO + 11)
+#define ERROR_LRC_CONFIG_OPTIONS       -(MAX_ERRNO + 12)
+#define ERROR_LRC_LAYERS_COUNT         -(MAX_ERRNO + 13)
+#define ERROR_LRC_RULESET_OP           -(MAX_ERRNO + 14)
+#define ERROR_LRC_RULESET_TYPE         -(MAX_ERRNO + 15)
+#define ERROR_LRC_RULESET_N            -(MAX_ERRNO + 16)
+
+class ErasureCodeLRC : public ErasureCode {
+public:
+  struct Layer {
+    Layer(string _chunks_map) : chunks_map(_chunks_map) { }
+    ErasureCodeInterfaceRef erasure_code;
+    vector<int> data;
+    vector<int> coding;
+    vector<int> chunks;
+    set<int> chunks_as_set;
+    string chunks_map;
+    map<string,string> parameters;
+  };
+  vector<Layer> layers;
+  string directory;
+  unsigned int chunk_count;
+  unsigned int data_chunk_count;
+  string ruleset_root;
+  struct Step {
+    Step(string _op, string _type, int _n) :
+      op(_op),
+      type(_type),
+      n(_n) {}
+    string op;
+    string type;
+    int n;
+  };
+  vector<Step> ruleset_steps;
+
+  ErasureCodeLRC() :
+    ruleset_root("default")
+  {
+    ruleset_steps.push_back(Step("chooseleaf", "host", 0));
+  }
+
+  virtual ~ErasureCodeLRC() {}
+
+  set<int> get_erasures(const set<int> &need,
+                       const set<int> &available) const;
+
+  virtual int minimum_to_decode(const set<int> &want_to_read,
+                               const set<int> &available,
+                               set<int> *minimum);
+
+  int layer_minimum_to_decode(const Layer &layer,
+                             const set<int> &want,
+                             const set<int> &available,
+                             set<int> *minimum) const;
+
+  virtual int create_ruleset(const string &name,
+                            CrushWrapper &crush,
+                            ostream *ss) const;
+
+  virtual unsigned int get_chunk_count() const {
+    return chunk_count;
+  }
+
+  virtual unsigned int get_data_chunk_count() const {
+    return data_chunk_count;
+  }
+
+  virtual unsigned int get_chunk_size(unsigned int object_size) const;
+
+  int layer_encode(const Layer &layer, vector<bufferlist> &chunks);
+
+  virtual int encode_chunks(const set<int> &want_to_encode,
+                           map<int, bufferlist> *encoded);
+
+  virtual int decode_chunks(const set<int> &want_to_read,
+                           const map<int, bufferlist> &chunks,
+                           map<int, bufferlist> *decoded);
+
+  int init(const map<string,string> &parameters, ostream *ss);
+
+  virtual int parse(const map<string,string> &parameters, ostream *ss);
+
+  int parse_ruleset(const map<string,string> &parameters, ostream *ss);
+
+  int parse_ruleset_step(string description_string,
+                        json_spirit::mArray description,
+                        ostream *ss);
+
+  int layers_description(const map<string,string> &parameters,
+                        json_spirit::mArray *description,
+                        ostream *ss) const;
+  int layers_parse(string description_string,
+                  json_spirit::mArray description,
+                  ostream *ss);
+  int layers_init();
+  int layers_sanity_checks(string description_string,
+                          ostream *ss) const;
+};
+
+#endif
diff --git a/src/erasure-code/LRC/ErasureCodePluginLRC.cc b/src/erasure-code/LRC/ErasureCodePluginLRC.cc
new file mode 100644 (file)
index 0000000..ce4ae7c
--- /dev/null
@@ -0,0 +1,59 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Cloudwatt <libre.licensing@cloudwatt.com>
+ *
+ * Author: Loic Dachary <loic@dachary.org>
+ *
+ *  This library is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public
+ *  License as published by the Free Software Foundation; either
+ *  version 2.1 of the License, or (at your option) any later version.
+ * 
+ */
+
+#include "ceph_ver.h"
+#include "common/debug.h"
+#include "erasure-code/ErasureCodePlugin.h"
+#include "ErasureCodeLRC.h"
+
+// re-include our assert
+#include "include/assert.h" 
+
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix _prefix(_dout)
+
+static ostream& _prefix(std::ostream* _dout)
+{
+  return *_dout << "ErasureCodePluginLRC: ";
+}
+
+class ErasureCodePluginLRC : public ErasureCodePlugin {
+public:
+  virtual int factory(const map<std::string,std::string> &parameters,
+                     ErasureCodeInterfaceRef *erasure_code) {
+    ErasureCodeLRC *interface;
+    interface = new ErasureCodeLRC();
+    stringstream ss;
+    assert(parameters.count("directory") != 0);
+    int r = interface->init(parameters, &ss);
+    if (r) {
+      derr << ss.str() << dendl;
+      delete interface;
+      return r;
+    }
+    *erasure_code = ErasureCodeInterfaceRef(interface);
+    return 0;
+  }
+};
+
+const char *__erasure_code_version() { return CEPH_GIT_NICE_VER; }
+
+int __erasure_code_init(char *plugin_name, char *directory)
+{
+  ErasureCodePluginRegistry &instance = ErasureCodePluginRegistry::instance();
+  return instance.add(plugin_name, new ErasureCodePluginLRC());
+}
diff --git a/src/erasure-code/LRC/Makefile.am b/src/erasure-code/LRC/Makefile.am
new file mode 100644 (file)
index 0000000..a1a9e23
--- /dev/null
@@ -0,0 +1,21 @@
+# LRC plugin
+noinst_HEADERS += \
+  erasure-code/LRC/ErasureCodeLRC.h
+
+LRC_sources = \
+  erasure-code/ErasureCode.cc \
+  erasure-code/LRC/ErasureCodePluginLRC.cc \
+  erasure-code/LRC/ErasureCodeLRC.cc
+
+erasure-code/LRC/ErasureCodePluginLRC.cc: ./ceph_ver.h
+
+libec_LRC_la_SOURCES = ${LRC_sources} common/str_map.cc
+libec_LRC_la_CFLAGS = ${AM_CFLAGS} 
+libec_LRC_la_CXXFLAGS= ${AM_CXXFLAGS} 
+libec_LRC_la_LIBADD = $(LIBCRUSH) $(PTHREAD_LIBS) $(LIBJSON_SPIRIT)
+libec_LRC_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0
+if LINUX
+libec_LRC_la_LDFLAGS += -export-symbols-regex '.*__erasure_code_.*'
+endif
+
+erasure_codelib_LTLIBRARIES += libec_LRC.la
index bada0a422f683f61665448cb3d88a5e09a836f87..223d16634aec6ce47425a92b9fa59fb4a3328760 100644 (file)
@@ -4,6 +4,7 @@ erasure_codelibdir = $(pkglibdir)/erasure-code
 erasure_codelib_LTLIBRARIES =  
 
 include erasure-code/jerasure/Makefile.am
+include erasure-code/LRC/Makefile.am
 
 if WITH_BETTER_YASM_ELF64
 include erasure-code/isa/Makefile.am
index e142f4e552b89724bd10d9288680396612ac05b0..d69e80e38e75d3e04df79412143038adf3954290 100644 (file)
@@ -144,6 +144,25 @@ endif
 check_PROGRAMS += unittest_erasure_code_plugin_isa
 endif
 
+unittest_erasure_code_LRC_SOURCES = \
+       test/erasure-code/TestErasureCodeLRC.cc \
+       ${LRC_sources}
+unittest_erasure_code_LRC_CXXFLAGS = $(UNITTEST_CXXFLAGS)
+unittest_erasure_code_LRC_LDADD = $(LIBOSD) $(LIBCOMMON) $(UNITTEST_LDADD) $(CEPH_GLOBAL)
+if LINUX
+unittest_erasure_code_LRC_LDADD += -ldl
+endif
+check_PROGRAMS += unittest_erasure_code_LRC
+
+unittest_erasure_code_plugin_LRC_SOURCES = \
+       test/erasure-code/TestErasureCodePluginLRC.cc
+unittest_erasure_code_plugin_LRC_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS}
+unittest_erasure_code_plugin_LRC_LDADD = $(LIBOSD) $(LIBCOMMON) $(UNITTEST_LDADD) $(CEPH_GLOBAL)
+if LINUX
+unittest_erasure_code_plugin_LRC_LDADD += -ldl
+endif
+check_PROGRAMS += unittest_erasure_code_plugin_LRC
+
 unittest_erasure_code_example_SOURCES = \
        erasure-code/ErasureCode.cc \
        test/erasure-code/TestErasureCodeExample.cc
diff --git a/src/test/erasure-code/TestErasureCodeLRC.cc b/src/test/erasure-code/TestErasureCodeLRC.cc
new file mode 100644 (file)
index 0000000..02e6082
--- /dev/null
@@ -0,0 +1,848 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph distributed storage system
+ *
+ * Copyright (C) 2014 Cloudwatt <libre.licensing@cloudwatt.com>
+ *
+ * Author: Loic Dachary <loic@dachary.org>
+ *
+ *  This library is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public
+ *  License as published by the Free Software Foundation; either
+ *  version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#include <errno.h>
+
+#include "crush/CrushWrapper.h"
+#include "common/config.h"
+#include "include/stringify.h"
+#include "global/global_init.h"
+#include "erasure-code/LRC/ErasureCodeLRC.h"
+#include "common/ceph_argparse.h"
+#include "global/global_context.h"
+#include "gtest/gtest.h"
+
+TEST(ErasureCodeLRC, parse_ruleset)
+{
+  ErasureCodeLRC LRC;
+  EXPECT_EQ("default", LRC.ruleset_root);
+  EXPECT_EQ("host", LRC.ruleset_steps.front().type);
+
+  map<std::string,std::string> parameters;
+  parameters["ruleset-root"] = "other";
+  EXPECT_EQ(0, LRC.parse_ruleset(parameters, &cerr));
+  EXPECT_EQ("other", LRC.ruleset_root);
+
+  parameters["ruleset-steps"] = "[]";
+  EXPECT_EQ(0, LRC.parse_ruleset(parameters, &cerr));
+  EXPECT_TRUE(LRC.ruleset_steps.empty());
+
+  parameters["ruleset-steps"] = "0";
+  EXPECT_EQ(ERROR_LRC_ARRAY, LRC.parse_ruleset(parameters, &cerr));
+
+  parameters["ruleset-steps"] = "{";
+  EXPECT_EQ(ERROR_LRC_PARSE_JSON, LRC.parse_ruleset(parameters, &cerr));
+
+  parameters["ruleset-steps"] = "[0]";
+  EXPECT_EQ(ERROR_LRC_ARRAY, LRC.parse_ruleset(parameters, &cerr));
+
+  parameters["ruleset-steps"] = "[[0]]";
+  EXPECT_EQ(ERROR_LRC_RULESET_OP, LRC.parse_ruleset(parameters, &cerr));
+
+  parameters["ruleset-steps"] = "[[\"choose\", 0]]";
+  EXPECT_EQ(ERROR_LRC_RULESET_TYPE, LRC.parse_ruleset(parameters, &cerr));
+
+  parameters["ruleset-steps"] = "[[\"choose\", \"host\", []]]";
+  EXPECT_EQ(ERROR_LRC_RULESET_N, LRC.parse_ruleset(parameters, &cerr));
+
+  parameters["ruleset-steps"] = "[[\"choose\", \"host\", 2]]";
+  EXPECT_EQ(0, LRC.parse_ruleset(parameters, &cerr));
+
+  const ErasureCodeLRC::Step &step = LRC.ruleset_steps.front();
+  EXPECT_EQ("choose", step.op);
+  EXPECT_EQ("host", step.type);
+  EXPECT_EQ(2, step.n);
+
+  parameters["ruleset-steps"] =
+    "["
+    " [\"choose\", \"rack\", 2], "
+    " [\"chooseleaf\", \"host\", 5], "
+    "]";
+  EXPECT_EQ(0, LRC.parse_ruleset(parameters, &cerr));
+  EXPECT_EQ(2U, LRC.ruleset_steps.size());
+  {
+    const ErasureCodeLRC::Step &step = LRC.ruleset_steps[0];
+    EXPECT_EQ("choose", step.op);
+    EXPECT_EQ("rack", step.type);
+    EXPECT_EQ(2, step.n);
+  }
+  {
+    const ErasureCodeLRC::Step &step = LRC.ruleset_steps[1];
+    EXPECT_EQ("chooseleaf", step.op);
+    EXPECT_EQ("host", step.type);
+    EXPECT_EQ(5, step.n);
+  }
+}
+
+TEST(ErasureCodeTest, create_ruleset)
+{
+  CrushWrapper *c = new CrushWrapper;
+  c->create();
+  int root_type = 3;
+  c->set_type_name(root_type, "root");
+  int rack_type = 2;
+  c->set_type_name(rack_type, "rack");
+  int host_type = 1;
+  c->set_type_name(host_type, "host");
+  int osd_type = 0;
+  c->set_type_name(osd_type, "osd");
+
+  int rootno;
+  c->add_bucket(0, CRUSH_BUCKET_STRAW, CRUSH_HASH_RJENKINS1,
+               root_type, 0, NULL, NULL, &rootno);
+  c->set_item_name(rootno, "default");
+
+  map<string,string> loc;
+  loc["root"] = "default";
+
+  //
+  // Set all to 10 so that the item number it trivial to decompose
+  // into rack/host/osd.
+  //
+  int num_rack;
+  int num_host;
+  int num_osd;
+  num_rack = num_host = num_osd = 10;
+  int osd = 0;
+  for (int r=0; r<num_rack; ++r) {
+    loc["rack"] = string("rack-") + stringify(r);
+    for (int h=0; h<num_host; ++h) {
+      loc["host"] = string("host-") + stringify(r) + string("-") + stringify(h);
+      for (int o=0; o<num_osd; ++o, ++osd) {
+       c->insert_item(g_ceph_context, osd, 1.0, string("osd.") + stringify(osd), loc);
+      }
+    }
+  }
+
+  ErasureCodeLRC LRC;
+  EXPECT_EQ(0, LRC.create_ruleset("rule1", *c, &cerr));
+
+  map<std::string,std::string> parameters;
+  unsigned int racks = 2;
+  unsigned int hosts = 5;
+  parameters["ruleset-steps"] =
+    "["
+    " [\"choose\", \"rack\", " + stringify(racks) + "], "
+    " [\"chooseleaf\", \"host\", " + stringify(hosts) + "], "
+    "]";
+  const char *rule_name = "rule2";
+  EXPECT_EQ(0, LRC.parse_ruleset(parameters, &cerr));
+  EXPECT_EQ(1, LRC.create_ruleset(rule_name, *c, &cerr));
+
+  vector<__u32> weight;
+  for (int o = 0; o < c->get_max_devices(); o++)
+    weight.push_back(0x10000);
+  int rule = c->get_rule_id(rule_name);
+  vector<int> out;
+  unsigned int n = racks * hosts;
+  c->do_rule(rule, 1, out, n, weight);
+  EXPECT_EQ(n, out.size());
+  //
+  // check that the first five are in the same rack and the next five
+  // in the same rack
+  //
+  int first_rack = out[0] / num_host / num_osd;
+  EXPECT_EQ(first_rack, out[1] / num_host / num_osd);
+  EXPECT_EQ(first_rack, out[2] / num_host / num_osd);
+  EXPECT_EQ(first_rack, out[3] / num_host / num_osd);
+  EXPECT_EQ(first_rack, out[4] / num_host / num_osd);
+  int second_rack = out[5] / num_host / num_osd;
+  EXPECT_EQ(second_rack, out[6] / num_host / num_osd);
+  EXPECT_EQ(second_rack, out[7] / num_host / num_osd);
+  EXPECT_EQ(second_rack, out[8] / num_host / num_osd);
+  EXPECT_EQ(second_rack, out[9] / num_host / num_osd);
+}
+
+TEST(ErasureCodeLRC, layers_description)
+{
+  ErasureCodeLRC LRC;
+  map<std::string,std::string> parameters;
+
+  json_spirit::mArray description;
+  EXPECT_EQ(ERROR_LRC_DESCRIPTION,
+           LRC.layers_description(parameters, &description, &cerr));
+
+  {
+    const char *description_string = "\"not an array\"";
+    parameters["layers"] = description_string;
+    EXPECT_EQ(ERROR_LRC_ARRAY,
+             LRC.layers_description(parameters, &description, &cerr));
+  }
+  {
+    const char *description_string = "invalid json";
+    parameters["layers"] = description_string;
+    EXPECT_EQ(ERROR_LRC_PARSE_JSON,
+             LRC.layers_description(parameters, &description, &cerr));
+  }
+  {
+    const char *description_string = "[]";
+    parameters["layers"] = description_string;
+    EXPECT_EQ(0, LRC.layers_description(parameters, &description, &cerr));
+  }
+}
+
+TEST(ErasureCodeLRC, layers_parse)
+{
+  {
+    ErasureCodeLRC LRC;
+    map<std::string,std::string> parameters;
+
+    const char *description_string ="[ 0 ]";
+    parameters["layers"] = description_string;
+    json_spirit::mArray description;
+    EXPECT_EQ(0, LRC.layers_description(parameters, &description, &cerr));
+    EXPECT_EQ(ERROR_LRC_ARRAY,
+             LRC.layers_parse(description_string, description, &cerr));
+  }
+
+  {
+    ErasureCodeLRC LRC;
+    map<std::string,std::string> parameters;
+
+    const char *description_string ="[ [ 0 ] ]";
+    parameters["layers"] = description_string;
+    json_spirit::mArray description;
+    EXPECT_EQ(0, LRC.layers_description(parameters, &description, &cerr));
+    EXPECT_EQ(ERROR_LRC_STR,
+             LRC.layers_parse(description_string, description, &cerr));
+  }
+
+  {
+    ErasureCodeLRC LRC;
+    map<std::string,std::string> parameters;
+
+    const char *description_string ="[ [ \"\", 0 ] ]";
+    parameters["layers"] = description_string;
+    json_spirit::mArray description;
+    EXPECT_EQ(0, LRC.layers_description(parameters, &description, &cerr));
+    EXPECT_EQ(ERROR_LRC_CONFIG_OPTIONS,
+             LRC.layers_parse(description_string, description, &cerr));
+  }
+
+  //
+  // The second element can be an object describing the plugin
+  // parameters.
+  //
+  {
+    ErasureCodeLRC LRC;
+    map<std::string,std::string> parameters;
+
+    const char *description_string ="[ [ \"\", { \"a\": \"b\" }, \"ignored\" ] ]";
+    parameters["layers"] = description_string;
+    json_spirit::mArray description;
+    EXPECT_EQ(0, LRC.layers_description(parameters, &description, &cerr));
+    EXPECT_EQ(0, LRC.layers_parse(description_string, description, &cerr));
+    EXPECT_EQ("b", LRC.layers.front().parameters["a"]);
+  }
+
+  //
+  // The second element can be a str_map parseable string describing the plugin
+  // parameters.
+  //
+  {
+    ErasureCodeLRC LRC;
+    map<std::string,std::string> parameters;
+
+    const char *description_string ="[ [ \"\", \"a=b c=d\" ] ]";
+    parameters["layers"] = description_string;
+    json_spirit::mArray description;
+    EXPECT_EQ(0, LRC.layers_description(parameters, &description, &cerr));
+    EXPECT_EQ(0, LRC.layers_parse(description_string, description, &cerr));
+    EXPECT_EQ("b", LRC.layers.front().parameters["a"]);
+    EXPECT_EQ("d", LRC.layers.front().parameters["c"]);
+  }
+
+}
+
+TEST(ErasureCodeLRC, layers_sanity_checks)
+{
+  {
+    ErasureCodeLRC LRC;
+    map<std::string,std::string> parameters;
+    parameters["mapping"] =
+           "__DDD__DD";
+    parameters["directory"] = ".libs";
+    const char *description_string =
+      "[ "
+      "  [ \"_cDDD_cDD\", \"\" ],"
+      "  [ \"c_DDD____\", \"\" ],"
+      "  [ \"_____cDDD\", \"\" ],"
+      "]";
+    parameters["layers"] = description_string;
+    EXPECT_EQ(0, LRC.init(parameters, &cerr));
+  }
+  {
+    ErasureCodeLRC LRC;
+    map<std::string,std::string> parameters;
+    const char *description_string =
+      "[ "
+      "]";
+    parameters["layers"] = description_string;
+    EXPECT_EQ(ERROR_LRC_MAPPING, LRC.init(parameters, &cerr));
+  }
+  {
+    ErasureCodeLRC LRC;
+    map<std::string,std::string> parameters;
+    parameters["mapping"] = "";
+    const char *description_string =
+      "[ "
+      "]";
+    parameters["layers"] = description_string;
+    EXPECT_EQ(ERROR_LRC_LAYERS_COUNT, LRC.init(parameters, &cerr));
+  }
+  {
+    ErasureCodeLRC LRC;
+    map<std::string,std::string> parameters;
+    parameters["directory"] = ".libs";
+    parameters["mapping"] =
+           "AA";
+    const char *description_string =
+      "[ "
+      "  [ \"AA??\", \"\" ], "
+      "  [ \"AA\", \"\" ], "
+      "  [ \"AA\", \"\" ], "
+      "]";
+    parameters["layers"] = description_string;
+    EXPECT_EQ(ERROR_LRC_MAPPING_SIZE, LRC.init(parameters, &cerr));
+  }
+}
+
+TEST(ErasureCodeLRC, layers_init)
+{
+  {
+    ErasureCodeLRC LRC;
+    map<std::string,std::string> parameters;
+
+    const char *description_string =
+      "[ "
+      "  [ \"_cDDD_cDD_\", \"directory=.libs\" ],"
+      "]";
+    parameters["layers"] = description_string;
+    parameters["directory"] = ".libs";
+    json_spirit::mArray description;
+    EXPECT_EQ(0, LRC.layers_description(parameters, &description, &cerr));
+    EXPECT_EQ(0, LRC.layers_parse(description_string, description, &cerr));
+    EXPECT_EQ(0, LRC.layers_init());
+    EXPECT_EQ("5", LRC.layers.front().parameters["k"]);
+    EXPECT_EQ("2", LRC.layers.front().parameters["m"]);
+    EXPECT_EQ("jerasure", LRC.layers.front().parameters["plugin"]);
+    EXPECT_EQ("reed_sol_van", LRC.layers.front().parameters["technique"]);
+  }
+}
+
+TEST(ErasureCodeLRC, init)
+{
+  ErasureCodeLRC LRC;
+  map<std::string,std::string> parameters;
+  parameters["mapping"] =
+    "__DDD__DD";
+  const char *description_string =
+    "[ "
+    "  [ \"_cDDD_cDD\", \"\" ],"
+    "  [ \"c_DDD____\", \"\" ],"
+    "  [ \"_____cDDD\", \"\" ],"
+    "]";
+  parameters["layers"] = description_string;
+  parameters["directory"] = ".libs";
+  EXPECT_EQ(0, LRC.init(parameters, &cerr));
+}
+
+TEST(ErasureCodeLRC, minimum_to_decode)
+{
+  // trivial : no erasures, the minimum is want_to_read
+  {
+    ErasureCodeLRC LRC;
+    map<std::string,std::string> parameters;
+    parameters["mapping"] =
+      "__DDD__DD";
+    const char *description_string =
+      "[ "
+      "  [ \"_cDDD_cDD\", \"\" ],"
+      "  [ \"c_DDD____\", \"\" ],"
+      "  [ \"_____cDDD\", \"\" ],"
+      "]";
+    parameters["layers"] = description_string;
+    parameters["directory"] = ".libs";
+    EXPECT_EQ(0, LRC.init(parameters, &cerr));
+    set<int> want_to_read;
+    want_to_read.insert(1);
+    set<int> available_chunks;
+    available_chunks.insert(1);
+    available_chunks.insert(2);
+    set<int> minimum;
+    EXPECT_EQ(0, LRC.minimum_to_decode(want_to_read, available_chunks, &minimum));
+    EXPECT_EQ(want_to_read, minimum);
+  }
+  // locally repairable erasure
+  {
+    ErasureCodeLRC LRC;
+    map<std::string,std::string> parameters;
+    parameters["mapping"] =
+           "__DDD__DD_";
+    const char *description_string =
+      "[ "
+      "  [ \"_cDDD_cDD_\", \"\" ],"
+      "  [ \"c_DDD_____\", \"\" ],"
+      "  [ \"_____cDDD_\", \"\" ],"
+      "  [ \"_____DDDDc\", \"\" ],"
+      "]";
+    parameters["layers"] = description_string;
+    parameters["directory"] = ".libs";
+    EXPECT_EQ(0, LRC.init(parameters, &cerr));
+    EXPECT_EQ(parameters["mapping"].length(),
+             LRC.get_chunk_count());
+    {
+      // want to read the last chunk
+      set<int> want_to_read;
+      want_to_read.insert(LRC.get_chunk_count() - 1);
+      // all chunks are available except the last chunk
+      set<int> available_chunks;
+      for (int i = 0; i < (int)LRC.get_chunk_count() - 1; i++)
+       available_chunks.insert(i);
+      // _____DDDDc can recover c
+      set<int> minimum;
+      EXPECT_EQ(0, LRC.minimum_to_decode(want_to_read, available_chunks, &minimum));
+      set<int> expected_minimum;
+      expected_minimum.insert(5);
+      expected_minimum.insert(6);
+      expected_minimum.insert(7);
+      expected_minimum.insert(8);
+      EXPECT_EQ(expected_minimum, minimum);
+    }
+    {
+      set<int> want_to_read;
+      want_to_read.insert(0);
+      set<int> available_chunks;
+      for (int i = 1; i < (int)LRC.get_chunk_count(); i++)
+       available_chunks.insert(i);
+      set<int> minimum;
+      EXPECT_EQ(0, LRC.minimum_to_decode(want_to_read, available_chunks, &minimum));
+      set<int> expected_minimum;
+      expected_minimum.insert(2);
+      expected_minimum.insert(3);
+      expected_minimum.insert(4);
+      EXPECT_EQ(expected_minimum, minimum);
+    }
+  }
+  // implicit parity required
+  {
+    ErasureCodeLRC LRC;
+    map<std::string,std::string> parameters;
+    parameters["mapping"] =
+           "__DDD__DD";
+    const char *description_string =
+      "[ "
+      "  [ \"_cDDD_cDD\", \"\" ],"
+      "  [ \"c_DDD____\", \"\" ],"
+      "  [ \"_____cDDD\", \"\" ],"
+      "]";
+    parameters["layers"] = description_string;
+    parameters["directory"] = ".libs";
+    EXPECT_EQ(0, LRC.init(parameters, &cerr));
+    EXPECT_EQ(parameters["mapping"].length(),
+             LRC.get_chunk_count());
+    set<int> want_to_read;
+    want_to_read.insert(8);
+    //
+    // unable to recover, too many chunks missing
+    //
+    {
+      set<int> available_chunks;
+      available_chunks.insert(0);
+      available_chunks.insert(1);
+      // missing             (2)
+      // missing             (3)
+      available_chunks.insert(4);
+      available_chunks.insert(5);
+      available_chunks.insert(6);
+      // missing             (7)
+      // missing             (8)
+      set<int> minimum;
+      EXPECT_EQ(-EIO, LRC.minimum_to_decode(want_to_read, available_chunks, &minimum));
+    }
+    //
+    // We want to read chunk 8 and encoding was done with
+    //
+    //     _cDDD_cDD
+    //    c_DDD____
+    //    _____cDDD
+    //
+    // First strategy fails:
+    //
+    // 012345678
+    // xxXXXxxXX  initial chunks
+    // xx.XXxx..  missing (2, 7, 8)
+    // _____cDDD  fail : can recover 1 but 2 are missing
+    // c_DDD____  ignored because 8 is not used (i.e. _)
+    // _cDDD_cDD  fail : can recover 2 but 3 are missing
+    //
+    // Second strategy succeeds:
+    //
+    // 012345678
+    // xxXXXxxXX  initial chunks
+    // xx.XXxx..  missing (2, 7, 8)
+    // _____cDDD  fail : can recover 1 but 2 are missing
+    // c_DDD____  success: recovers chunk 2
+    // _cDDD_cDD  success: recovers chunk 7, 8
+    //
+    {
+      set<int> available_chunks;
+      available_chunks.insert(0);
+      available_chunks.insert(1);
+      // missing             (2)
+      available_chunks.insert(3);
+      available_chunks.insert(4);
+      available_chunks.insert(5);
+      available_chunks.insert(6);
+      // missing             (7)
+      // missing             (8)
+      set<int> minimum;
+      EXPECT_EQ(0, LRC.minimum_to_decode(want_to_read, available_chunks, &minimum));
+      EXPECT_EQ(available_chunks, minimum);
+    }
+  }
+}
+
+TEST(ErasureCodeLRC, encode_decode)
+{
+  ErasureCodeLRC LRC;
+  map<std::string,std::string> parameters;
+  parameters["mapping"] =
+    "__DD__DD";
+  const char *description_string =
+    "[ "
+    "  [ \"_cDD_cDD\", \"\" ]," // global layer 
+    "  [ \"c_DD____\", \"\" ]," // first local layer 
+    "  [ \"____cDDD\", \"\" ]," // second local layer 
+    "]";
+  parameters["layers"] = description_string;
+  parameters["directory"] = ".libs";
+  EXPECT_EQ(0, LRC.init(parameters, &cerr));
+  EXPECT_EQ(4U, LRC.get_data_chunk_count());
+  unsigned int stripe_width = g_conf->osd_pool_erasure_code_stripe_width;
+  unsigned int chunk_size = stripe_width / LRC.get_data_chunk_count();
+  EXPECT_EQ(chunk_size, LRC.get_chunk_size(stripe_width));
+  set<int> want_to_encode;
+  map<int, bufferlist> encoded;
+  for (unsigned int i = 0; i < LRC.get_chunk_count(); ++i) {
+    want_to_encode.insert(i);
+    bufferptr ptr(buffer::create_page_aligned(chunk_size));
+    encoded[i].push_front(ptr);
+  }
+  const vector<int> &mapping = LRC.get_chunk_mapping();
+  char c = 'A';
+  for (unsigned int i = 0; i < LRC.get_data_chunk_count(); i++) {
+    int j = mapping[i];
+    string s(chunk_size, c);
+    encoded[j].clear();
+    encoded[j].append(s);
+    c++;
+  }
+  EXPECT_EQ(0, LRC.encode_chunks(want_to_encode, &encoded));
+
+  {
+    map<int, bufferlist> chunks;
+    chunks[4] = encoded[4];
+    chunks[5] = encoded[5];
+    chunks[6] = encoded[6];
+    set<int> want_to_read;
+    want_to_read.insert(7);
+    set<int> available_chunks;
+    available_chunks.insert(4);
+    available_chunks.insert(5);
+    available_chunks.insert(6);
+    set<int> minimum;
+    EXPECT_EQ(0, LRC.minimum_to_decode(want_to_read, available_chunks, &minimum));
+    // only need three chunks from the second local layer
+    EXPECT_EQ(3U, minimum.size()); 
+    EXPECT_EQ(1U, minimum.count(4));
+    EXPECT_EQ(1U, minimum.count(5));
+    EXPECT_EQ(1U, minimum.count(6));
+    map<int, bufferlist> decoded;
+    EXPECT_EQ(0, LRC.decode(want_to_read, chunks, &decoded));
+    string s(chunk_size, 'D');
+    EXPECT_EQ(s, string(decoded[7].c_str(), chunk_size));
+  }
+  {
+    set<int> want_to_read;
+    want_to_read.insert(2);
+    map<int, bufferlist> chunks;
+    chunks[1] = encoded[1];
+    chunks[3] = encoded[3];
+    chunks[5] = encoded[5];
+    chunks[6] = encoded[6];
+    chunks[7] = encoded[7];
+    set<int> available_chunks;
+    available_chunks.insert(1);
+    available_chunks.insert(3);
+    available_chunks.insert(5);
+    available_chunks.insert(6);
+    available_chunks.insert(7);
+    set<int> minimum;
+    EXPECT_EQ(0, LRC.minimum_to_decode(want_to_read, available_chunks, &minimum));
+    EXPECT_EQ(5U, minimum.size()); 
+    EXPECT_EQ(available_chunks, minimum);
+
+    map<int, bufferlist> decoded;
+    EXPECT_EQ(0, LRC.decode(want_to_read, encoded, &decoded));
+    string s(chunk_size, 'A');
+    EXPECT_EQ(s, string(decoded[2].c_str(), chunk_size));
+  }
+  {
+    set<int> want_to_read;
+    want_to_read.insert(3);
+    want_to_read.insert(6);
+    want_to_read.insert(7);
+    set<int> available_chunks;
+    available_chunks.insert(0);
+    available_chunks.insert(1);
+    available_chunks.insert(2);
+    // available_chunks.insert(3);
+    available_chunks.insert(4);
+    available_chunks.insert(5);
+    // available_chunks.insert(6);
+    // available_chunks.insert(7);
+    encoded.erase(3);
+    encoded.erase(6);
+    set<int> minimum;
+    EXPECT_EQ(0, LRC.minimum_to_decode(want_to_read, available_chunks, &minimum));
+    EXPECT_EQ(4U, minimum.size()); 
+    // only need two chunks from the first local layer
+    EXPECT_EQ(1U, minimum.count(0));
+    EXPECT_EQ(1U, minimum.count(2));
+    // the above chunks will rebuild chunk 3 and the global layer only needs
+    // three more chunks to reach the required amount of chunks (4) to recover
+    // the last two
+    EXPECT_EQ(1U, minimum.count(1));
+    EXPECT_EQ(1U, minimum.count(2));
+    EXPECT_EQ(1U, minimum.count(5));
+
+    map<int, bufferlist> decoded;
+    EXPECT_EQ(0, LRC.decode(want_to_read, encoded, &decoded));
+    {
+      string s(chunk_size, 'B');
+      EXPECT_EQ(s, string(decoded[3].c_str(), chunk_size));
+    }
+    {
+      string s(chunk_size, 'C');
+      EXPECT_EQ(s, string(decoded[6].c_str(), chunk_size));
+    }
+    {
+      string s(chunk_size, 'D');
+      EXPECT_EQ(s, string(decoded[7].c_str(), chunk_size));
+    }
+  }
+}
+
+TEST(ErasureCodeLRC, encode_decode_2)
+{
+  ErasureCodeLRC LRC;
+  map<std::string,std::string> parameters;
+  parameters["mapping"] =
+    "DD__DD__";
+  const char *description_string =
+    "[ "
+    " [ \"DDc_DDc_\", \"\" ],"
+    " [ \"DDDc____\", \"\" ],"
+    " [ \"____DDDc\", \"\" ],"
+    "]";
+  parameters["layers"] = description_string;
+  parameters["directory"] = ".libs";
+  EXPECT_EQ(0, LRC.init(parameters, &cerr));
+  EXPECT_EQ(4U, LRC.get_data_chunk_count());
+  unsigned int stripe_width = g_conf->osd_pool_erasure_code_stripe_width;
+  unsigned int chunk_size = stripe_width / LRC.get_data_chunk_count();
+  EXPECT_EQ(chunk_size, LRC.get_chunk_size(stripe_width));
+  set<int> want_to_encode;
+  map<int, bufferlist> encoded;
+  for (unsigned int i = 0; i < LRC.get_chunk_count(); ++i) {
+    want_to_encode.insert(i);
+    bufferptr ptr(buffer::create_page_aligned(chunk_size));
+    encoded[i].push_front(ptr);
+  }
+  const vector<int> &mapping = LRC.get_chunk_mapping();
+  char c = 'A';
+  for (unsigned int i = 0; i < LRC.get_data_chunk_count(); i++) {
+    int j = mapping[i];
+    string s(chunk_size, c);
+    encoded[j].clear();
+    encoded[j].append(s);
+    c++;
+  }
+  EXPECT_EQ(0, LRC.encode_chunks(want_to_encode, &encoded));
+
+  {
+    set<int> want_to_read;
+    want_to_read.insert(0);
+    map<int, bufferlist> chunks;
+    chunks[1] = encoded[1];
+    chunks[3] = encoded[3];
+    chunks[4] = encoded[4];
+    chunks[5] = encoded[5];
+    chunks[6] = encoded[6];
+    chunks[7] = encoded[7];
+    set<int> available_chunks;
+    available_chunks.insert(1);
+    available_chunks.insert(3);
+    available_chunks.insert(4);
+    available_chunks.insert(5);
+    available_chunks.insert(6);
+    available_chunks.insert(7);
+    set<int> minimum;
+    EXPECT_EQ(0, LRC.minimum_to_decode(want_to_read, available_chunks, &minimum));
+    EXPECT_EQ(4U, minimum.size()); 
+    EXPECT_EQ(1U, minimum.count(1));
+    EXPECT_EQ(1U, minimum.count(4));
+    EXPECT_EQ(1U, minimum.count(5));
+    EXPECT_EQ(1U, minimum.count(6));
+
+    map<int, bufferlist> decoded;
+    EXPECT_EQ(0, LRC.decode(want_to_read, chunks, &decoded));
+    string s(chunk_size, 'A');
+    EXPECT_EQ(s, string(decoded[0].c_str(), chunk_size));
+  }
+  {
+    set<int> want_to_read;
+    for (unsigned int i = 0; i < LRC.get_chunk_count(); i++)
+      want_to_read.insert(i);
+    map<int, bufferlist> chunks;
+    chunks[1] = encoded[1];
+    chunks[3] = encoded[3];
+    chunks[5] = encoded[5];
+    chunks[6] = encoded[6];
+    chunks[7] = encoded[7];
+    set<int> available_chunks;
+    available_chunks.insert(1);
+    available_chunks.insert(3);
+    available_chunks.insert(5);
+    available_chunks.insert(6);
+    available_chunks.insert(7);
+    set<int> minimum;
+    EXPECT_EQ(0, LRC.minimum_to_decode(want_to_read, available_chunks, &minimum));
+    EXPECT_EQ(5U, minimum.size()); 
+    EXPECT_EQ(1U, minimum.count(1));
+    EXPECT_EQ(1U, minimum.count(3));
+    EXPECT_EQ(1U, minimum.count(5));
+    EXPECT_EQ(1U, minimum.count(6));
+    EXPECT_EQ(1U, minimum.count(7));
+
+    map<int, bufferlist> decoded;
+    EXPECT_EQ(0, LRC.decode(want_to_read, chunks, &decoded));
+    {
+      string s(chunk_size, 'A');
+      EXPECT_EQ(s, string(decoded[0].c_str(), chunk_size));
+    }
+    {
+      string s(chunk_size, 'B');
+      EXPECT_EQ(s, string(decoded[1].c_str(), chunk_size));
+    }
+    {
+      string s(chunk_size, 'C');
+      EXPECT_EQ(s, string(decoded[4].c_str(), chunk_size));
+    }
+    {
+      string s(chunk_size, 'D');
+      EXPECT_EQ(s, string(decoded[5].c_str(), chunk_size));
+    }
+  }
+  {
+    set<int> want_to_read;
+    for (unsigned int i = 0; i < LRC.get_chunk_count(); i++)
+      want_to_read.insert(i);
+    map<int, bufferlist> chunks;
+    chunks[1] = encoded[1];
+    chunks[3] = encoded[3];
+    chunks[5] = encoded[5];
+    chunks[6] = encoded[6];
+    chunks[7] = encoded[7];
+    set<int> available_chunks;
+    available_chunks.insert(1);
+    available_chunks.insert(3);
+    available_chunks.insert(5);
+    available_chunks.insert(6);
+    available_chunks.insert(7);
+    set<int> minimum;
+    EXPECT_EQ(0, LRC.minimum_to_decode(want_to_read, available_chunks, &minimum));
+    EXPECT_EQ(5U, minimum.size()); 
+    EXPECT_EQ(1U, minimum.count(1));
+    EXPECT_EQ(1U, minimum.count(3));
+    EXPECT_EQ(1U, minimum.count(5));
+    EXPECT_EQ(1U, minimum.count(6));
+    EXPECT_EQ(1U, minimum.count(7));
+
+    map<int, bufferlist> decoded;
+    EXPECT_EQ(0, LRC.decode(want_to_read, chunks, &decoded));
+    {
+      string s(chunk_size, 'A');
+      EXPECT_EQ(s, string(decoded[0].c_str(), chunk_size));
+    }
+    {
+      string s(chunk_size, 'B');
+      EXPECT_EQ(s, string(decoded[1].c_str(), chunk_size));
+    }
+    {
+      string s(chunk_size, 'C');
+      EXPECT_EQ(s, string(decoded[4].c_str(), chunk_size));
+    }
+    {
+      string s(chunk_size, 'D');
+      EXPECT_EQ(s, string(decoded[5].c_str(), chunk_size));
+    }
+  }
+  {
+    set<int> want_to_read;
+    want_to_read.insert(6);
+    map<int, bufferlist> chunks;
+    chunks[0] = encoded[0];
+    chunks[1] = encoded[1];
+    chunks[3] = encoded[3];
+    chunks[5] = encoded[5];
+    chunks[7] = encoded[7];
+    set<int> available_chunks;
+    available_chunks.insert(0);
+    available_chunks.insert(1);
+    available_chunks.insert(3);
+    available_chunks.insert(5);
+    available_chunks.insert(7);
+    set<int> minimum;
+    EXPECT_EQ(0, LRC.minimum_to_decode(want_to_read, available_chunks, &minimum));
+    EXPECT_EQ(available_chunks, minimum); 
+
+    map<int, bufferlist> decoded;
+    EXPECT_EQ(0, LRC.decode(want_to_read, chunks, &decoded));
+  }
+}
+
+int main(int argc, char **argv)
+{
+  vector<const char*> args;
+  argv_to_vec(argc, (const char **)argv, args);
+
+  global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+  common_init_finish(g_ceph_context);
+
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
+
+/*
+ * Local Variables:
+ * compile-command: "cd ../.. ;
+ *   make -j4 && valgrind --tool=memcheck --leak-check=full \
+ *      ./unittest_erasure_code_LRC \
+ *      --gtest_filter=*.* --log-to-stderr=true --debug-osd=20"
+ * End:
+ */
diff --git a/src/test/erasure-code/TestErasureCodePluginLRC.cc b/src/test/erasure-code/TestErasureCodePluginLRC.cc
new file mode 100644 (file)
index 0000000..d2f300d
--- /dev/null
@@ -0,0 +1,58 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph distributed storage system
+ *
+ * Copyright (C) 2014 Cloudwatt <libre.licensing@cloudwatt.com>
+ *
+ * Author: Loic Dachary <loic@dachary.org>
+ *
+ *  This library is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public
+ *  License as published by the Free Software Foundation; either
+ *  version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#include <errno.h>
+#include "arch/probe.h"
+#include "arch/intel.h"
+#include "global/global_init.h"
+#include "erasure-code/ErasureCodePlugin.h"
+#include "common/ceph_argparse.h"
+#include "global/global_context.h"
+#include "gtest/gtest.h"
+
+TEST(ErasureCodePlugin, factory)
+{
+  ErasureCodePluginRegistry &instance = ErasureCodePluginRegistry::instance();
+  map<std::string,std::string> parameters;
+  parameters["directory"] = ".libs";
+  parameters["mapping"] = "DD_";
+  parameters["layers"] = "[ [ \"DDc\", \"\" ] ]";
+  ErasureCodeInterfaceRef erasure_code;
+  EXPECT_FALSE(erasure_code);
+  EXPECT_EQ(0, instance.factory("LRC", parameters, &erasure_code, cerr));
+  EXPECT_TRUE(erasure_code);
+}
+
+int main(int argc, char **argv)
+{
+  vector<const char*> args;
+  argv_to_vec(argc, (const char **)argv, args);
+
+  global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+  common_init_finish(g_ceph_context);
+
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
+
+/*
+ * Local Variables:
+ * compile-command: "cd ../.. ; make -j4 &&
+ *   make unittest_erasure_code_plugin_LRC &&
+ *   valgrind --tool=memcheck ./unittest_erasure_code_plugin_LRC \
+ *      --gtest_filter=*.* --log-to-stderr=true --debug-osd=20"
+ * End:
+ */