#include <bit>
#include <utility>
+#include <memory>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
return r;
}
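+// Read [offset, offset+length) of the onode into bl and zero-pad the
+// result so it is always exactly `length` bytes, even past object end.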
+inline void BlueStore::_do_read_and_pad(
+ Collection* c,
+ OnodeRef& o,
+ uint32_t offset,
+ uint32_t length,
+ ceph::buffer::list& bl)
+{
+ int r = _do_read(c, o, offset, length, bl, 0);
+ ceph_assert(r >= 0 && r <= (int)length);
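+  // _do_read returns fewer than `length` bytes when the range extends
+  // past the object's end; fill the tail with zeros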
+ size_t zlen = length - r;
+ if (zlen > 0) {
+ bl.append_zero(zlen);
+ logger->inc(l_bluestore_write_pad_bytes, zlen);
+ }
+}
+
int BlueStore::_verify_csum(OnodeRef& o,
const bluestore_blob_t* blob, uint64_t blob_xoffset,
const bufferlist& bl,
if (length == 0) {
return 0;
}
- WriteContext wctx;
- _choose_write_options(c, o, fadvise_flags, &wctx);
- if (wctx.compress) {
- // if we have compression, skip to write_v1
- return _do_write(txc, c, o, offset, length, bl, fadvise_flags);
- }
- if (o->onode.segment_size != 0 && wctx.target_blob_size > o->onode.segment_size) {
- wctx.target_blob_size = o->onode.segment_size;
- }
+
if (bl.length() != length) {
bl.splice(length, bl.length() - length);
}
- BlueStore::Writer wr(this, txc, &wctx, o);
- uint64_t start = p2align(offset, min_alloc_size);
- uint64_t end = p2roundup(offset + length, min_alloc_size);
- wr.left_affected_range = start;
- wr.right_affected_range = end;
- std::tie(wr.left_shard_bound, wr.right_shard_bound) =
- o->extent_map.fault_range_ex(db, start, end - start);
- wr.do_write(offset, bl);
- o->extent_map.dirty_range(wr.left_affected_range, wr.right_affected_range - wr.left_affected_range);
- o->extent_map.maybe_reshard(wr.left_affected_range, wr.right_affected_range);
+
+ WriteContext wctx;
+ _choose_write_options(c, o, fadvise_flags, &wctx);
+ if (wctx.compressor) {
+ uint32_t end = offset + length;
+ uint32_t segment_size = o->onode.segment_size;
+ if (segment_size) {
+ // split data into segments
+ // first and last segments will do lookaround scan
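+      // e.g. with segment_size = 0x10000 a write 0x18000~0x10000 is
+      // issued as two segment writes: 0x18000~0x8000 and 0x20000~0x8000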
+ uint32_t write_offset = offset;
+ while (write_offset != end) {
+ uint32_t this_segment_begin = p2align(write_offset, segment_size);
+ uint32_t this_segment_end = this_segment_begin + segment_size;
+ uint32_t write_length = std::min(this_segment_end, end) - write_offset;
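+        // substr_of shares the first write_length bytes into chunk,
+        // then splice drops them from the front of bl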
+ bufferlist chunk;
+ chunk.substr_of(bl, 0, write_length);
+ bl.splice(0, write_length);
+ _do_write_v2_compressed(txc, c, o, wctx, write_offset, write_length, chunk,
+ this_segment_begin, this_segment_end);
+ write_offset += write_length;
+      }
+ } else {
+      const uint32_t scan_range = 0x20000; // 128 KiB
+      uint32_t scan_left = offset < scan_range ? 0 : offset - scan_range;
+ uint32_t scan_right = end + scan_range;
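+      // give the scanner 128 KiB on each side of the write so neighboring
+      // extents can be folded into the same recompression decision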
+ _do_write_v2_compressed(txc, c, o, wctx, offset, length, bl,
+ scan_left, scan_right);
+ }
+ } else {
+ // normal uncompressed path
+ BlueStore::Writer wr(this, txc, &wctx, o);
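+    // the affected range is expanded to min_alloc_size boundaries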
+ uint64_t start = p2align(offset, min_alloc_size);
+ uint64_t end = p2roundup(offset + length, min_alloc_size);
+ wr.left_affected_range = start;
+ wr.right_affected_range = end;
+ std::tie(wr.left_shard_bound, wr.right_shard_bound) =
+ o->extent_map.fault_range_ex(db, start, end - start);
+ wr.do_write(offset, bl);
+ o->extent_map.dirty_range(wr.left_affected_range, wr.right_affected_range - wr.left_affected_range);
+ o->extent_map.maybe_reshard(wr.left_affected_range, wr.right_affected_range);
+ }
return r;
}
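+// Compress-aware write path: the estimator proposes regions around the
+// write; each region is rebuilt from the new data plus bytes read back
+// from disk, then written compressed only if that saves disk space.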
+int BlueStore::_do_write_v2_compressed(
+ TransContext *txc,
+ CollectionRef &c,
+ OnodeRef& o,
+ WriteContext& wctx,
+ uint32_t offset, uint32_t length,
+ ceph::buffer::list& input_bl,
+ uint32_t scan_left, uint32_t scan_right)
+{
+ o->extent_map.fault_range(db, scan_left, scan_right - scan_left);
+ if (!c->estimator) c->estimator.reset(create_estimator());
+ Estimator* estimator = c->estimator.get();
+ Scanner scanner(this);
+ scanner.write_lookaround(o.get(), offset, length, scan_left, scan_right, estimator);
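+  // the scanner walks [scan_left, scan_right) and feeds the estimator,
+  // which groups the write and suitable neighbors into regions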
+ std::vector<Estimator::region_t> regions;
+ estimator->get_regions(regions);
+ dout(15) << __func__ << " " << std::hex << offset << "~" << length << " -> ";
+ for (const auto& i : regions) {
+ *_dout << i.offset << "~" << i.length << " ";
+ }
+ *_dout << std::dec << dendl;
+ for (const auto& i : regions) {
+ ceph::buffer::list data_bl;
+ if (i.offset <= offset && offset < i.offset + i.length) {
+      // the starting point is within the region, so the end must be too
+ ceph_assert(offset + length <= i.offset + i.length);
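+      // this region contains the write; read back the bytes on either
+      // side so the whole region is rewritten as one unit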
+ if (i.offset < offset) {
+ _do_read_and_pad(c.get(), o, i.offset, offset - i.offset, data_bl);
+ }
+ data_bl.claim_append(input_bl);
+ if (offset + length < i.offset + i.length) {
+ ceph::buffer::list right_bl;
+ _do_read_and_pad(c.get(), o, offset + length,
+ i.offset + i.length - (offset + length), right_bl);
+ data_bl.claim_append(right_bl);
+ }
+ } else {
+      // the starting point is not within the region, so the end must not be inside it either
+ ceph_assert(offset + length < i.offset || offset + length >= i.offset + i.length);
+ _do_read_and_pad(c.get(), o, i.offset, i.length, data_bl);
+ }
+ ceph_assert(data_bl.length() == i.length);
+ Writer::blob_vec bd;
+ int32_t disk_for_compressed;
+ int32_t disk_for_raw;
+ uint32_t au_size = min_alloc_size;
+ uint32_t max_blob_size = c->pool_opts.value_or(
+ pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, (int64_t)comp_max_blob_size.load());
+ disk_for_compressed = estimator->split_and_compress(wctx.compressor, max_blob_size, data_bl, bd);
+ disk_for_raw = p2roundup(i.offset + i.length, au_size) - p2align(i.offset, au_size);
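+    // e.g. with au_size = 0x1000 a region 0x1800~0x2000 spans allocation
+    // units 0x1000..0x4000, so disk_for_raw = 0x3000; compressed blobs
+    // win only when they occupy strictly less disk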
+ BlueStore::Writer wr(this, txc, &wctx, o);
+ if (disk_for_compressed < disk_for_raw) {
+ wr.do_write_with_blobs(i.offset, i.offset + i.length, i.offset + i.length, bd);
+ } else {
+ wr.do_write(i.offset, data_bl);
+ }
+ }
+ estimator->finish();
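+  // merge adjacent extents the rewrite may have split, then mark the
+  // touched range dirty and reshard if the extent map grew too large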
+ uint32_t changes_start = regions.front().offset;
+ uint32_t changes_end = regions.back().offset + regions.back().length;
+ o->extent_map.compress_extent_map(changes_start, changes_end - changes_start);
+ o->extent_map.dirty_range(changes_start, changes_end - changes_start);
+ o->extent_map.maybe_reshard(changes_start, changes_end);
+ return 0;
+}
+
int BlueStore::_write(TransContext *txc,
CollectionRef& c,
OnodeRef& o,