#include "common/strtol.h"
#include "ErasureCode.h"
+static const unsigned SIMD_ALIGN = 32;
+
int ErasureCode::chunk_index(unsigned int i) const
{
return chunk_mapping.size() > i ? chunk_mapping[i] : i;
}
int ErasureCode::encode_prepare(const bufferlist &raw,
- bufferlist *prepared) const
+ map<int, bufferlist> &encoded) const
{
unsigned int k = get_data_chunk_count();
unsigned int m = get_chunk_count() - k;
unsigned blocksize = get_chunk_size(raw.length());
- unsigned padded_length = blocksize * k;
- *prepared = raw;
- if (padded_length - raw.length() > 0) {
- bufferptr pad(padded_length - raw.length());
- pad.zero();
- prepared->push_back(pad);
+ unsigned pad_len = blocksize * k - raw.length();
+ unsigned padded_chunks = k - raw.length() / blocksize;
+ bufferlist prepared = raw;
+
+ if (!prepared.is_aligned(SIMD_ALIGN)) {
+ // splice padded chunks off to make the rebuild faster
+ if (padded_chunks)
+ prepared.splice((k - padded_chunks) * blocksize,
+ padded_chunks * blocksize - pad_len);
+ prepared.rebuild_aligned(SIMD_ALIGN);
+ }
+
+ for (unsigned int i = 0; i < k - padded_chunks; i++) {
+ bufferlist &chunk = encoded[chunk_index(i)];
+ chunk.substr_of(prepared, i * blocksize, blocksize);
+ }
+ if (padded_chunks) {
+ unsigned remainder = raw.length() - (k - padded_chunks) * blocksize;
+ bufferptr buf(buffer::create_aligned(blocksize, SIMD_ALIGN));
+
+ raw.copy((k - padded_chunks) * blocksize, remainder, buf.c_str());
+ buf.zero(remainder, blocksize - remainder);
+ encoded[chunk_index(k-padded_chunks)].push_back(buf);
+
+ for (unsigned int i = k - padded_chunks + 1; i < k; i++) {
+ bufferptr buf(buffer::create_aligned(blocksize, SIMD_ALIGN));
+ buf.zero();
+ encoded[chunk_index(i)].push_back(buf);
+ }
+ }
+ for (unsigned int i = k; i < k + m; i++) {
+ bufferlist &chunk = encoded[chunk_index(i)];
+ chunk.push_back(buffer::create_aligned(blocksize, SIMD_ALIGN));
}
- unsigned coding_length = blocksize * m;
- bufferptr coding(buffer::create_page_aligned(coding_length));
- prepared->push_back(coding);
- prepared->rebuild_page_aligned();
+
return 0;
}
unsigned int k = get_data_chunk_count();
unsigned int m = get_chunk_count() - k;
bufferlist out;
- int err = encode_prepare(in, &out);
+ int err = encode_prepare(in, *encoded);
if (err)
return err;
- unsigned blocksize = get_chunk_size(in.length());
- for (unsigned int i = 0; i < k + m; i++) {
- bufferlist &chunk = (*encoded)[chunk_index(i)];
- chunk.substr_of(out, i * blocksize, blocksize);
- }
encode_chunks(want_to_encode, encoded);
for (unsigned int i = 0; i < k + m; i++) {
if (want_to_encode.count(i) == 0)
unsigned blocksize = (*chunks.begin()).second.length();
for (unsigned int i = 0; i < k + m; i++) {
if (chunks.find(i) == chunks.end()) {
- bufferptr ptr(buffer::create_page_aligned(blocksize));
+ bufferptr ptr(buffer::create_aligned(blocksize, SIMD_ALIGN));
(*decoded)[i].push_front(ptr);
} else {
(*decoded)[i] = chunks.find(i)->second;
- (*decoded)[i].rebuild_page_aligned();
+ (*decoded)[i].rebuild_aligned(SIMD_ALIGN);
}
}
return decode_chunks(want_to_read, chunks, decoded);