with new decode, minimum_to_decode in ErasureCodeInterface.
Updated ECBackend and ECUtil to use the new functions. Fixed the test cases
to use the new functions.
Addressed the review comments.
Authors: Myna, Elita.
Signed-off-by: Myna Vajha <mynaramana@gmail.com>
return 0;
}
+int ErasureCode::minimum_to_decode(const set<int> &want_to_read,
+                                   const set<int> &available_chunks,
+                                   map<int, vector<pair<int, int>>> *minimum)
+{
+  // Default implementation for scalar codes: resolve the minimal chunk
+  // set via the legacy overload, then attach the default sub-chunk range
+  // (all sub-chunks of each chunk) to every selected shard.
+  set<int> chunk_ids;
+  int err = minimum_to_decode(want_to_read, available_chunks, &chunk_ids);
+  if (err) {
+    return err;
+  }
+  const vector<pair<int, int>> whole_chunk = {
+    make_pair(0, get_sub_chunk_count())
+  };
+  for (const auto &shard : chunk_ids) {
+    minimum->emplace(shard, whole_chunk);
+  }
+  return 0;
+}
+
int ErasureCode::minimum_to_decode_with_cost(const set<int> &want_to_read,
const map<int, int> &available,
set<int> *minimum)
return decode_chunks(want_to_read, chunks, decoded);
}
+// Default 4-argument decode for scalar (sub_chunk_count == 1) codes:
+// chunk_size is not needed here, so delegate to the legacy 3-argument
+// decode(). Array codes override this to use chunk_size for sub-chunk
+// addressing.
+int ErasureCode::decode(const set<int> &want_to_read,
+                        const map<int, bufferlist> &chunks,
+                        map<int, bufferlist> *decoded, int chunk_size)
+{
+  return decode(want_to_read, chunks, decoded);
+}
+
int ErasureCode::decode_chunks(const set<int> &want_to_read,
const map<int, bufferlist> &chunks,
map<int, bufferlist> *decoded)
return get_chunk_count() - get_data_chunk_count();
}
- int minimum_to_decode(const std::set<int> &want_to_read,
+ virtual int get_sub_chunk_count() {
+ return 1;
+ }
+
+ virtual int minimum_to_decode(const std::set<int> &want_to_read,
const std::set<int> &available_chunks,
- std::set<int> *minimum) override;
+ std::set<int> *minimum);
+
+ virtual int minimum_to_decode(const std::set<int> &want_to_read,
+ const std::set<int> &available,
+ std::map<int, std::vector<std::pair<int, int>>> *minimum)override;
int minimum_to_decode_with_cost(const std::set<int> &want_to_read,
const std::map<int, int> &available,
int encode_chunks(const std::set<int> &want_to_encode,
std::map<int, bufferlist> *encoded) override;
- int decode(const std::set<int> &want_to_read,
+ virtual int decode(const std::set<int> &want_to_read,
const std::map<int, bufferlist> &chunks,
- std::map<int, bufferlist> *decoded) override;
+ std::map<int, bufferlist> *decoded);
+
+ int decode(const std::set<int> &want_to_read,
+ const std::map<int, bufferlist> &chunks,
+ std::map<int, bufferlist> *decoded, int chunk_size) override;
int decode_chunks(const std::set<int> &want_to_read,
const std::map<int, bufferlist> &chunks,
*/
virtual unsigned int get_coding_chunk_count() const = 0;
+ /**
+ * Return the number of sub chunks chunks created by a call to the
+ * **encode** method. Each chunk can be viewed as union of sub-chunks
+ * For the case of array codes, the sub-chunk count > 1, where as the
+ * scalar codes have sub-chunk count = 1.
+ *
+ * @return the number of sub-chunks per chunk created by encode()
+ */
+ virtual int get_sub_chunk_count() = 0;
+
/**
* Return the size (in bytes) of a single chunk created by a call
* to the **decode** method. The returned size multiplied by
*
* @param [in] want_to_read chunk indexes to be decoded
* @param [in] available chunk indexes containing valid data
- * @param [out] minimum chunk indexes to retrieve
+ * @param [out] minimum chunk indexes and corresponding
+ * subchunk index offsets, count.
* @return **0** on success or a negative errno on error.
*/
virtual int minimum_to_decode(const std::set<int> &want_to_read,
const std::set<int> &available,
- std::set<int> *minimum) = 0;
+ std::map<int, std::vector<std::pair<int, int>>>
+ *minimum) = 0;
/**
* Compute the smallest subset of **available** chunks that needs
* @param [in] want_to_read chunk indexes to be decoded
* @param [in] chunks map chunk indexes to chunk data
* @param [out] decoded map chunk indexes to chunk data
+ * @param [in] chunk_size chunk size
* @return **0** on success or a negative errno on error.
*/
virtual int decode(const std::set<int> &want_to_read,
const std::map<int, bufferlist> &chunks,
- std::map<int, bufferlist> *decoded) = 0;
+ std::map<int, bufferlist> *decoded, int chunk_size) = 0;
virtual int decode_chunks(const std::set<int> &want_to_read,
const std::map<int, bufferlist> &chunks,
int minimum_to_decode(const set<int> &want_to_read,
const set<int> &available_chunks,
- set<int> *minimum) override;
+ set<int> *minimum);
int minimum_to_decode_with_cost(const set<int> &want_to_read,
const map<int, int> &available,
int decode(const set<int> &want_to_read,
const map<int, bufferlist> &chunks,
- map<int, bufferlist> *decoded) override;
+ map<int, bufferlist> *decoded);
int decode_chunks(const set<int> &want_to_read,
const map<int, bufferlist> &chunks,
map<int, bufferlist> *decoded) override;
void read(
ECBackend *ec,
const hobject_t &hoid, uint64_t off, uint64_t len,
- const set<pg_shard_t> &need,
+ const map<pg_shard_t, vector<pair<int, int>>> &need,
bool attrs) {
list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
to_read.push_back(boost::make_tuple(off, len, 0));
from[i->first.shard].claim(i->second);
}
dout(10) << __func__ << ": " << from << dendl;
- int r = ECUtil::decode(sinfo, ec_impl, from, target);
+ int r;
+ r = ECUtil::decode(sinfo, ec_impl, from, target);
assert(r == 0);
if (attrs) {
op.xattrs.swap(*attrs);
::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
}
- set<pg_shard_t> to_read;
+ map<pg_shard_t, vector<pair<int, int>>> to_read;
int r = get_min_avail_to_read_shards(
op.hoid, want, true, false, &to_read);
if (r != 0) {
++i) {
int r = 0;
ECUtil::HashInfoRef hinfo;
+ int subchunk_size = sinfo.get_chunk_size() / ec_impl->get_sub_chunk_count();
if (!get_parent()->get_pool().allows_ecoverwrites()) {
+
hinfo = get_hash_info(i->first);
if (!hinfo) {
r = -EIO;
}
for (auto j = i->second.begin(); j != i->second.end(); ++j) {
bufferlist bl;
- r = store->read(
- ch,
- ghobject_t(i->first, ghobject_t::NO_GEN, shard),
- j->get<0>(),
- j->get<1>(),
- bl, j->get<2>());
+ // NOTE(review): op.subchunks.find(i->first) is dereferenced below
+ // without an end() check; this relies on every object being read
+ // having a subchunks entry (see the legacy-peer fill in
+ // ECSubRead::decode) -- TODO confirm that guarantee holds.
+ // Fast path: the request covers all sub-chunks of the shard, so one
+ // contiguous read suffices.
+ if ((op.subchunks.find(i->first)->second.size() == 1) &&
+ (op.subchunks.find(i->first)->second.front().second ==
+ ec_impl->get_sub_chunk_count())) {
+ dout(25) << __func__ << " case1: reading the complete chunk/shard." << dendl;
+ r = store->read(
+ ch,
+ ghobject_t(i->first, ghobject_t::NO_GEN, shard),
+ j->get<0>(),
+ j->get<1>(),
+ bl, j->get<2>()); // Allow EIO return
+ } else {
+ dout(25) << __func__ << " case2: going to do fragmented read." << dendl;
+ // Slow path: for each chunk-sized unit, read only the requested
+ // sub-chunk ranges; k.first is the sub-chunk offset, k.second the
+ // number of consecutive sub-chunks.
+ for (int m = 0; m < (int)j->get<1>(); m += sinfo.get_chunk_size()) {
+ for (auto &&k:op.subchunks.find(i->first)->second) {
+ bufferlist bl0;
+ // NOTE(review): r is overwritten by every sub-read, so a failure
+ // here can be masked by a later successful read -- TODO confirm
+ // whether an early break on r < 0 is intended.
+ r = store->read(
+ ch,
+ ghobject_t(i->first, ghobject_t::NO_GEN, shard),
+ j->get<0>() + m + (k.first)*subchunk_size,
+ (k.second)*subchunk_size,
+ bl0, j->get<2>());
+ bl.claim_append(bl0);
+ }
+ }
+ }
+
if (r < 0) {
get_parent()->clog_error() << "Error " << r
<< " reading object "
have.insert(j->first.shard);
dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
}
- set<int> want_to_read, dummy_minimum;
+ set<int> want_to_read;
+ map<int, vector<pair<int, int>>> dummy_minimum;
get_want_to_read_shards(&want_to_read);
int err;
if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) {
const set<int> &want,
bool for_recovery,
bool do_redundant_reads,
- set<pg_shard_t> *to_read)
+ map<pg_shard_t, vector<pair<int, int>>> *to_read)
{
// Make sure we don't do redundant reads for recovery
assert(!for_recovery || !do_redundant_reads);
get_all_avail_shards(hoid, have, shards, for_recovery);
- set<int> need;
+ map<int, vector<pair<int, int>>> need;
int r = ec_impl->minimum_to_decode(want, have, &need);
if (r < 0)
return r;
if (do_redundant_reads) {
- need.swap(have);
+ // Redundant reads: instead of the minimal decode set, request every
+ // sub-chunk from every available shard to reduce tail latency.
+ vector<pair<int, int>> subchunks_list;
+ subchunks_list.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
+ for (auto &&i: have) {
+ need[i] = subchunks_list;
+ }
}
if (!to_read)
return 0;
- for (set<int>::iterator i = need.begin();
- i != need.end();
- ++i) {
- assert(shards.count(shard_id_t(*i)));
- to_read->insert(shards[shard_id_t(*i)]);
+ // Translate shard ids into the pg_shard_t that holds each shard,
+ // carrying the per-shard sub-chunk ranges through to the caller.
+ for (auto &&i:need) {
+ assert(shards.count(shard_id_t(i.first)));
+ to_read->insert(make_pair(shards[shard_id_t(i.first)], i.second));
}
return 0;
}
int ECBackend::get_remaining_shards(
const hobject_t &hoid,
const set<int> &avail,
- set<pg_shard_t> *to_read,
+ map<pg_shard_t, vector<pair<int, int>>> *to_read,
bool for_recovery)
{
assert(to_read);
get_all_avail_shards(hoid, have, shards, for_recovery);
+ vector<pair<int, int>> subchunks;
+ subchunks.push_back(make_pair(0, ec_impl->get_sub_chunk_count()));
for (set<int>::iterator i = have.begin();
i != have.end();
++i) {
assert(shards.count(shard_id_t(*i)));
if (avail.find(*i) == avail.end())
- to_read->insert(shards[shard_id_t(*i)]);
+ to_read->insert(make_pair(shards[shard_id_t(*i)], subchunks));
}
return 0;
}
i != op.to_read.end();
++i) {
bool need_attrs = i->second.want_attrs;
- for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
+
+ for (auto j = i->second.need.begin();
j != i->second.need.end();
++j) {
if (need_attrs) {
- messages[*j].attrs_to_read.insert(i->first);
+ messages[j->first].attrs_to_read.insert(i->first);
need_attrs = false;
}
- op.obj_to_source[i->first].insert(*j);
- op.source_to_obj[*j].insert(i->first);
+ messages[j->first].subchunks[i->first] = j->second;
+ op.obj_to_source[i->first].insert(j->first);
+ op.source_to_obj[j->first].insert(i->first);
}
for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
i->second.to_read.begin();
++j) {
pair<uint64_t, uint64_t> chunk_off_len =
sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
- for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
+ for (auto k = i->second.need.begin();
k != i->second.need.end();
++k) {
- messages[*k].to_read[i->first].push_back(
+ messages[k->first].to_read[i->first].push_back(
boost::make_tuple(
chunk_off_len.first,
chunk_off_len.second,
map<hobject_t, read_request_t> for_read_op;
for (auto &&to_read: reads) {
- set<pg_shard_t> shards;
+ map<pg_shard_t, vector<pair<int, int>>> shards;
int r = get_min_avail_to_read_shards(
to_read.first,
want_to_read,
for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
already_read.insert(i->shard);
dout(10) << __func__ << " have/error shards=" << already_read << dendl;
- set<pg_shard_t> shards;
+ map<pg_shard_t, vector<pair<int, int>>> shards;
int r = get_remaining_shards(hoid, already_read, &shards, rop.for_recovery);
if (r)
return r;
};
struct read_request_t {
const list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
- const set<pg_shard_t> need;
+ const map<pg_shard_t, vector<pair<int, int>>> need;
const bool want_attrs;
GenContext<pair<RecoveryMessages *, read_result_t& > &> *cb;
read_request_t(
const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
- const set<pg_shard_t> &need,
+ const map<pg_shard_t, vector<pair<int, int>>> &need,
bool want_attrs,
GenContext<pair<RecoveryMessages *, read_result_t& > &> *cb)
: to_read(to_read), need(need), want_attrs(want_attrs),
++i) {
have.insert(i->shard);
}
- set<int> min;
+ map<int, vector<pair<int, int>>> min;
return ec_impl->minimum_to_decode(want, have, &min) == 0;
}
};
const set<int> &want, ///< [in] desired shards
bool for_recovery, ///< [in] true if we may use non-acting replicas
bool do_redundant_reads, ///< [in] true if we want to issue redundant reads to reduce latency
- set<pg_shard_t> *to_read ///< [out] shards to read
+ map<pg_shard_t, vector<pair<int, int>>> *to_read ///< [out] shards, corresponding subchunks to read
); ///< @return error code, 0 on success
int get_remaining_shards(
const hobject_t &hoid,
const set<int> &avail,
- set<pg_shard_t> *to_read,
+ map<pg_shard_t, vector<pair<int, int>>> *to_read,
bool for_recovery);
int objects_get_attrs(
void ECSubRead::encode(bufferlist &bl, uint64_t features) const
{
if ((features & CEPH_FEATURE_OSD_FADVISE_FLAGS) == 0) {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
::encode(from, bl);
::encode(tid, bl);
map<hobject_t, list<pair<uint64_t, uint64_t> >> tmp;
}
::encode(tmp, bl);
::encode(attrs_to_read, bl);
+ ::encode(subchunks, bl);
ENCODE_FINISH(bl);
return;
}
- ENCODE_START(2, 2, bl);
+ ENCODE_START(3, 2, bl);
::encode(from, bl);
::encode(tid, bl);
::encode(to_read, bl);
::encode(attrs_to_read, bl);
+ ::encode(subchunks, bl);
ENCODE_FINISH(bl);
}
void ECSubRead::decode(bufferlist::iterator &bl)
{
- DECODE_START(2, bl);
+ // v3 adds the per-object sub-chunk map used by vector/array codes.
+ DECODE_START(3, bl);
::decode(from, bl);
::decode(tid, bl);
if (struct_v == 1) {
::decode(to_read, bl);
}
::decode(attrs_to_read, bl);
+ if (struct_v > 2 && struct_v > struct_compat) {
+ ::decode(subchunks, bl);
+ } else {
+ // Legacy peer: synthesize the default whole-chunk sub-chunk range.
+ // NOTE(review): this iterates attrs_to_read, but reads are keyed by
+ // to_read -- an object being read without an attr request gets no
+ // subchunks entry, and handle_sub_read dereferences that lookup
+ // unconditionally. Should this loop over to_read instead? TODO confirm.
+ for (auto &&i:attrs_to_read) {
+ subchunks[i].push_back(make_pair(0,1));
+ }
+ }
DECODE_FINISH(bl);
}
ceph_tid_t tid;
map<hobject_t, list<boost::tuple<uint64_t, uint64_t, uint32_t> >> to_read;
set<hobject_t> attrs_to_read;
+ map<hobject_t, vector<pair<int, int>>> subchunks;
void encode(bufferlist &bl, uint64_t features) const;
void decode(bufferlist::iterator &bl);
void dump(Formatter *f) const;
#include <errno.h>
#include "include/encoding.h"
#include "ECUtil.h"
+
using namespace std;
int ECUtil::decode(
}
bufferlist bl;
int r = ec_impl->decode_concat(chunks, &bl);
- assert(bl.length() == sinfo.get_stripe_width());
assert(r == 0);
+ assert(bl.length() == sinfo.get_stripe_width());
out->claim_append(bl);
}
return 0;
ErasureCodeInterfaceRef &ec_impl,
map<int, bufferlist> &to_decode,
map<int, bufferlist*> &out) {
- assert(to_decode.size());
-
- uint64_t total_data_size = to_decode.begin()->second.length();
- assert(total_data_size % sinfo.get_chunk_size() == 0);
-
- for (map<int, bufferlist>::iterator i = to_decode.begin();
- i != to_decode.end();
- ++i) {
- assert(i->second.length() == total_data_size);
- }
- if (total_data_size == 0)
- return 0;
+ assert(to_decode.size());
set<int> need;
for (map<int, bufferlist*>::iterator i = out.begin();
need.insert(i->first);
}
- for (uint64_t i = 0; i < total_data_size; i += sinfo.get_chunk_size()) {
+ set<int> avail;
+ for (auto i = to_decode.begin();
+ i != to_decode.end();
+ ++i) {
+ assert(i->second.length() != 0);
+ avail.insert(i->first);
+ }
+
+ map<int, vector<pair<int, int>>> min;
+ // NOTE(review): the decode plan is computed inside assert(); if this is
+ // the C library assert and NDEBUG is defined, the call is compiled out
+ // and `min` stays empty. Call minimum_to_decode() into a local first and
+ // assert on its return value -- TODO confirm which assert macro applies.
+ assert(ec_impl->minimum_to_decode(need, avail, &min) == 0);
+
+ int chunks_count = 0;
+ map<int, int> repair_data_per_chunk;
+ // A chunk is a union of sub_chunk_count equally sized sub-chunks.
+ int subchunk_size = sinfo.get_chunk_size()/ec_impl->get_sub_chunk_count();
+
+ for (map<int, bufferlist>::iterator i = to_decode.begin();
+ i != to_decode.end();
+ ++i) {
+ // Every supplied shard must be part of the decode plan; otherwise we
+ // cannot tell how many sub-chunks its buffer carries.
+ assert(min.find(i->first) != min.end());
+
+ // Bytes fetched per stripe unit for this shard = sum of the counts in
+ // its (offset, count) sub-chunk ranges, times the sub-chunk size.
+ int repair_subchunk_count = 0;
+ for (auto j = min[i->first].begin();
+ j != min[i->first].end(); ++j) {
+ repair_subchunk_count += j->second;
+ }
+ repair_data_per_chunk[i->first] = repair_subchunk_count*subchunk_size;
+ assert(i->second.length() % repair_data_per_chunk[i->first] == 0);
+
+ // All shards must cover the same number of stripe units.
+ if (i == to_decode.begin()) {
+ chunks_count = (int) i->second.length() / repair_data_per_chunk[i->first];
+ }
+ else {
+ assert(chunks_count ==
+ (int) i->second.length() / repair_data_per_chunk[i->first]);
+ }
+ }
+
+ for (int i = 0; i < chunks_count; i++) {
map<int, bufferlist> chunks;
- for (map<int, bufferlist>::iterator j = to_decode.begin();
+ for (auto j = to_decode.begin();
j != to_decode.end();
++j) {
- chunks[j->first].substr_of(j->second, i, sinfo.get_chunk_size());
+ chunks[j->first].substr_of(j->second,
+ i*repair_data_per_chunk[j->first],
+ repair_data_per_chunk[j->first]);
}
map<int, bufferlist> out_bls;
- int r = ec_impl->decode(need, chunks, &out_bls);
+ int r = ec_impl->decode(need, chunks, &out_bls, sinfo.get_chunk_size());
assert(r == 0);
- for (map<int, bufferlist*>::iterator j = out.begin();
- j != out.end();
- ++j) {
+ for (auto j = out.begin(); j != out.end(); ++j) {
assert(out_bls.count(j->first));
assert(out_bls[j->first].length() == sinfo.get_chunk_size());
j->second->claim_append(out_bls[j->first]);
}
}
- for (map<int, bufferlist*>::iterator i = out.begin();
- i != out.end();
- ++i) {
- assert(i->second->length() == total_data_size);
+ for (auto i = out.begin(); i != out.end(); ++i) {
+ assert(i->second->length() == chunks_count*sinfo.get_chunk_size());
}
return 0;
}
int minimum_to_decode(const set<int> &want_to_read,
const set<int> &available_chunks,
- set<int> *minimum) override {
+ set<int> *minimum) {
if (includes(available_chunks.begin(), available_chunks.end(),
want_to_read.begin(), want_to_read.end())) {
*minimum = want_to_read;
int decode(const set<int> &want_to_read,
const map<int, bufferlist> &chunks,
- map<int, bufferlist> *decoded) override {
+ map<int, bufferlist> *decoded) {
//
// All chunks have the same size
//
want_to_read.insert(chunk);
map<int,bufferlist> decoded;
- code = erasure_code->decode(want_to_read, chunks, &decoded);
+ code = erasure_code->decode(want_to_read, chunks, &decoded, 0);
if (code)
return code;
for (set<int>::iterator chunk = want_to_read.begin();
return code;
} else if (erased.size() > 0) {
map<int,bufferlist> decoded;
- code = erasure_code->decode(want_to_read, encoded, &decoded);
+ code = erasure_code->decode(want_to_read, encoded, &decoded, 0);
if (code)
return code;
} else {
chunks.erase(erasure);
}
map<int,bufferlist> decoded;
- code = erasure_code->decode(want_to_read, chunks, &decoded);
+ code = erasure_code->decode(want_to_read, chunks, &decoded, 0);
if (code)
return code;
}
}
map<int,bufferlist> decoded;
- int code = erasure_code->decode(erasures, available, &decoded);
+ int code = erasure_code->decode(erasures, available, &decoded, 0);
if (code)
return code;
for (set<int>::iterator erasure = erasures.begin();