From 38c2c646da52d48d69831d0eddc49ad3e08bbddb Mon Sep 17 00:00:00 2001 From: Daniel Gryniewicz Date: Wed, 21 Jul 2021 10:56:59 -0400 Subject: [PATCH] RGW - Zipper - Proper Writer API With the implementation of DBStore, it was determined that the API used for writing in Zipper was too tied to RADOS. Implement a clean writing API named Writer. Signed-off-by: Daniel Gryniewicz --- src/rgw/rgw_admin.cc | 4 +- src/rgw/rgw_compression.h | 2 +- src/rgw/rgw_crypt.cc | 2 +- src/rgw/rgw_crypt.h | 2 +- src/rgw/rgw_etag_verifier.cc | 2 +- src/rgw/rgw_etag_verifier.h | 8 +- src/rgw/rgw_file.cc | 9 +- src/rgw/rgw_file.h | 4 +- src/rgw/rgw_op.cc | 72 ++++------ src/rgw/rgw_op.h | 8 +- src/rgw/rgw_putobj.h | 25 +--- src/rgw/rgw_putobj_processor.cc | 159 ++++++++++++++++++---- src/rgw/rgw_putobj_processor.h | 108 +++++++++------ src/rgw/rgw_rados.cc | 17 ++- src/rgw/rgw_rados.h | 2 +- src/rgw/rgw_rest_s3.cc | 8 +- src/rgw/rgw_rest_s3.h | 8 +- src/rgw/rgw_sal.h | 111 +++++++++------ src/rgw/rgw_sal_rados.cc | 194 ++++++++++++++------------- src/rgw/rgw_sal_rados.h | 162 +++++++++++++++++++--- src/rgw/rgw_tools.cc | 14 +- src/rgw/rgw_tools.h | 2 +- src/test/rgw/test_rgw_compression.cc | 2 +- src/test/rgw/test_rgw_crypto.cc | 2 +- src/test/rgw/test_rgw_putobj.cc | 2 +- 25 files changed, 593 insertions(+), 336 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 3a52b17bdf0..a26f828475f 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -6770,7 +6770,7 @@ next: } } if (need_rewrite) { - ret = static_cast(store)->getRados()->rewrite_obj(bucket->get_info(), obj.get(), dpp(), null_yield); + ret = static_cast(store)->getRados()->rewrite_obj(obj.get(), dpp(), null_yield); if (ret < 0) { cerr << "ERROR: object rewrite returned: " << cpp_strerror(-ret) << std::endl; return -ret; @@ -6899,7 +6899,7 @@ next: if (!need_rewrite) { formatter->dump_string("status", "Skipped"); } else { - r = static_cast(store)->getRados()->rewrite_obj(bucket->get_info(), 
obj.get(), dpp(), null_yield); + r = static_cast(store)->getRados()->rewrite_obj(obj.get(), dpp(), null_yield); if (r == 0) { formatter->dump_string("status", "Success"); } else { diff --git a/src/rgw/rgw_compression.h b/src/rgw/rgw_compression.h index cc9386d2c05..6e5d17c2232 100644 --- a/src/rgw/rgw_compression.h +++ b/src/rgw/rgw_compression.h @@ -49,7 +49,7 @@ class RGWPutObj_Compress : public rgw::putobj::Pipe std::vector blocks; public: RGWPutObj_Compress(CephContext* cct_, CompressorRef compressor, - rgw::putobj::DataProcessor *next) + rgw::sal::DataProcessor *next) : Pipe(next), cct(cct_), compressor(compressor) {} int process(bufferlist&& data, uint64_t logical_offset) override; diff --git a/src/rgw/rgw_crypt.cc b/src/rgw/rgw_crypt.cc index 9f7e6925642..319d06d0c82 100644 --- a/src/rgw/rgw_crypt.cc +++ b/src/rgw/rgw_crypt.cc @@ -791,7 +791,7 @@ int RGWGetObj_BlockDecrypt::flush() { } RGWPutObj_BlockEncrypt::RGWPutObj_BlockEncrypt(CephContext* cct, - rgw::putobj::DataProcessor *next, + rgw::sal::DataProcessor *next, std::unique_ptr crypt) : Pipe(next), cct(cct), diff --git a/src/rgw/rgw_crypt.h b/src/rgw/rgw_crypt.h index ff221549d6f..96e52afaa54 100644 --- a/src/rgw/rgw_crypt.h +++ b/src/rgw/rgw_crypt.h @@ -128,7 +128,7 @@ class RGWPutObj_BlockEncrypt : public rgw::putobj::Pipe const size_t block_size; /**< snapshot of \ref BlockCrypt.get_block_size() */ public: RGWPutObj_BlockEncrypt(CephContext* cct, - rgw::putobj::DataProcessor *next, + rgw::sal::DataProcessor *next, std::unique_ptr crypt); int process(bufferlist&& data, uint64_t logical_offset) override; diff --git a/src/rgw/rgw_etag_verifier.cc b/src/rgw/rgw_etag_verifier.cc index 6a455e18b23..3a22e7e6b16 100644 --- a/src/rgw/rgw_etag_verifier.cc +++ b/src/rgw/rgw_etag_verifier.cc @@ -8,7 +8,7 @@ namespace rgw::putobj { int create_etag_verifier(const DoutPrefixProvider *dpp, - CephContext* cct, DataProcessor* filter, + CephContext* cct, rgw::sal::DataProcessor* filter, const bufferlist& manifest_bl, 
const std::optional& compression, etag_verifier_ptr& verifier) diff --git a/src/rgw/rgw_etag_verifier.h b/src/rgw/rgw_etag_verifier.h index 48007cf1699..a94c6065feb 100644 --- a/src/rgw/rgw_etag_verifier.h +++ b/src/rgw/rgw_etag_verifier.h @@ -29,7 +29,7 @@ protected: string calculated_etag; public: - ETagVerifier(CephContext* cct_, rgw::putobj::DataProcessor *next) + ETagVerifier(CephContext* cct_, rgw::sal::DataProcessor *next) : Pipe(next), cct(cct_) {} virtual void calculate_etag() = 0; @@ -40,7 +40,7 @@ public: class ETagVerifier_Atomic : public ETagVerifier { public: - ETagVerifier_Atomic(CephContext* cct_, rgw::putobj::DataProcessor *next) + ETagVerifier_Atomic(CephContext* cct_, rgw::sal::DataProcessor *next) : ETagVerifier(cct_, next) {} int process(bufferlist&& data, uint64_t logical_offset) override; @@ -59,7 +59,7 @@ class ETagVerifier_MPU : public ETagVerifier public: ETagVerifier_MPU(CephContext* cct, std::vector part_ofs, - rgw::putobj::DataProcessor *next) + rgw::sal::DataProcessor *next) : ETagVerifier(cct, next), part_ofs(std::move(part_ofs)) {} @@ -76,7 +76,7 @@ constexpr auto max_etag_verifier_size = std::max( using etag_verifier_ptr = ceph::static_ptr; int create_etag_verifier(const DoutPrefixProvider *dpp, - CephContext* cct, DataProcessor* next, + CephContext* cct, rgw::sal::DataProcessor* next, const bufferlist& manifest_bl, const std::optional& compression, etag_verifier_ptr& verifier); diff --git a/src/rgw/rgw_file.cc b/src/rgw/rgw_file.cc index 70cb7738830..d56c6e599fe 100644 --- a/src/rgw/rgw_file.cc +++ b/src/rgw/rgw_file.cc @@ -1825,12 +1825,9 @@ namespace rgw { version_id = state->object->get_instance(); } } - processor.emplace(&*aio, get_store(), state->bucket.get(), - &state->dest_placement, - state->bucket_owner.get_id(), - *static_cast(state->obj_ctx), - state->object->clone(), olh_epoch, state->req_id, - this, state->yield); + processor = get_store()->get_atomic_writer(this, state->yield, state->object->clone(), + 
state->bucket_owner.get_id(), *state->obj_ctx, + &state->dest_placement, 0, state->req_id); op_ret = processor->prepare(state->yield); if (op_ret < 0) { diff --git a/src/rgw/rgw_file.h b/src/rgw/rgw_file.h index ed9243b3dd6..fc42887fe24 100644 --- a/src/rgw/rgw_file.h +++ b/src/rgw/rgw_file.h @@ -2489,8 +2489,8 @@ public: const std::string& obj_name; RGWFileHandle* rgw_fh; std::optional aio; - std::optional processor; - rgw::putobj::DataProcessor* filter; + std::unique_ptr processor; + rgw::sal::DataProcessor* filter; boost::optional compressor; CompressorRef plugin; buffer::list data; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 0b578d1bab9..02e5ddeaef6 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -3733,13 +3733,9 @@ void RGWPutObj::execute(optional_yield y) // create the object processor auto aio = rgw::make_throttle(s->cct->_conf->rgw_put_obj_min_window_size, s->yield); - using namespace rgw::putobj; - constexpr auto max_processor_size = std::max({sizeof(MultipartObjectProcessor), - sizeof(AtomicObjectProcessor), - sizeof(AppendObjectProcessor)}); - ceph::static_ptr processor; + std::unique_ptr processor; - rgw_placement_rule *pdest_placement; + rgw_placement_rule *pdest_placement = &s->dest_placement; if (multipart) { std::unique_ptr upload; @@ -3758,21 +3754,18 @@ void RGWPutObj::execute(optional_yield y) s->dest_placement = *pdest_placement; pdest_placement = &s->dest_placement; ldpp_dout(this, 20) << "dest_placement for part=" << *pdest_placement << dendl; - processor.emplace( - &*aio, store, s->bucket.get(), pdest_placement, - s->owner.get_id(), obj_ctx, s->object->clone(), - multipart_upload_id, multipart_part_num, multipart_part_str, - this, s->yield); + processor = upload->get_writer(this, s->yield, s->object->clone(), + s->user->get_id(), obj_ctx, pdest_placement, + multipart_part_num, multipart_part_str); } else if(append) { if (s->bucket->versioned()) { op_ret = -ERR_INVALID_BUCKET_STATE; return; } - pdest_placement = 
&s->dest_placement; - processor.emplace( - &*aio, store, s->bucket.get(), pdest_placement, s->bucket_owner.get_id(), - obj_ctx, s->object->clone(), - s->req_id, position, &cur_accounted_size, this, s->yield); + processor = store->get_append_writer(this, s->yield, s->object->clone(), + s->bucket_owner.get_id(), obj_ctx, + pdest_placement, s->req_id, position, + &cur_accounted_size); } else { if (s->bucket->versioning_enabled()) { if (!version_id.empty()) { @@ -3782,11 +3775,9 @@ void RGWPutObj::execute(optional_yield y) version_id = s->object->get_instance(); } } - pdest_placement = &s->dest_placement; - processor.emplace( - &*aio, store, s->bucket.get(), pdest_placement, - s->bucket_owner.get_id(), obj_ctx, s->object->clone(), - olh_epoch, s->req_id, this, s->yield); + processor = store->get_atomic_writer(this, s->yield, s->object->clone(), + s->bucket_owner.get_id(), obj_ctx, + pdest_placement, olh_epoch, s->req_id); } op_ret = processor->prepare(s->yield); @@ -3824,13 +3815,13 @@ void RGWPutObj::execute(optional_yield y) fst = copy_source_range_fst; // no filters by default - DataProcessor *filter = processor.get(); + rgw::sal::DataProcessor *filter = processor.get(); const auto& compression_type = store->get_zone()->get_params().get_compression_type(*pdest_placement); CompressorRef plugin; boost::optional compressor; - std::unique_ptr encrypt; + std::unique_ptr encrypt; if (!append) { // compression and encryption only apply to full object uploads op_ret = get_encrypt_filter(&encrypt, filter); @@ -4159,21 +4150,19 @@ void RGWPostObj::execute(optional_yield y) auto aio = rgw::make_throttle(s->cct->_conf->rgw_put_obj_min_window_size, s->yield); - using namespace rgw::putobj; - AtomicObjectProcessor processor(&*aio, store, s->bucket.get(), - &s->dest_placement, - s->bucket_owner.get_id(), - *static_cast(s->obj_ctx), - std::move(obj), 0, s->req_id, this, s->yield); - op_ret = processor.prepare(s->yield); + std::unique_ptr processor; + processor = 
store->get_atomic_writer(this, s->yield, std::move(obj), + s->bucket_owner.get_id(), *s->obj_ctx, + &s->dest_placement, 0, s->req_id); + op_ret = processor->prepare(s->yield); if (op_ret < 0) { return; } /* No filters by default. */ - DataProcessor *filter = &processor; + rgw::sal::DataProcessor *filter = processor.get(); - std::unique_ptr encrypt; + std::unique_ptr encrypt; op_ret = get_encrypt_filter(&encrypt, filter); if (op_ret < 0) { return; @@ -4274,7 +4263,7 @@ void RGWPostObj::execute(optional_yield y) emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp)); } - op_ret = processor.complete(s->obj_size, etag, nullptr, real_time(), attrs, + op_ret = processor->complete(s->obj_size, etag, nullptr, real_time(), attrs, (delete_at ? *delete_at : real_time()), nullptr, nullptr, nullptr, nullptr, nullptr, s->yield); @@ -7144,21 +7133,18 @@ int RGWBulkUploadOp::handle_file(const std::string_view path, rgw_placement_rule dest_placement = s->dest_placement; dest_placement.inherit_from(bucket->get_placement_rule()); - auto aio = rgw::make_throttle(s->cct->_conf->rgw_put_obj_min_window_size, - s->yield); - - using namespace rgw::putobj; - AtomicObjectProcessor processor(&*aio, store, bucket.get(), &s->dest_placement, bowner.get_id(), - obj_ctx, std::move(obj), 0, s->req_id, this, s->yield); - - op_ret = processor.prepare(s->yield); + std::unique_ptr processor; + processor = store->get_atomic_writer(this, s->yield, std::move(obj), + bowner.get_id(), obj_ctx, + &s->dest_placement, 0, s->req_id); + op_ret = processor->prepare(s->yield); if (op_ret < 0) { ldpp_dout(this, 20) << "cannot prepare processor due to ret=" << op_ret << dendl; return op_ret; } /* No filters by default. 
*/ - DataProcessor *filter = &processor; + rgw::sal::DataProcessor *filter = processor.get(); const auto& compression_type = store->get_zone()->get_params().get_compression_type( dest_placement); @@ -7250,7 +7236,7 @@ int RGWBulkUploadOp::handle_file(const std::string_view path, } /* Complete the transaction. */ - op_ret = processor.complete(size, etag, nullptr, ceph::real_time(), + op_ret = processor->complete(size, etag, nullptr, ceph::real_time(), attrs, ceph::real_time() /* delete_at */, nullptr, nullptr, nullptr, nullptr, nullptr, s->yield); diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 0c8e8ad40b5..5f2f2f88fed 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -1269,8 +1269,8 @@ public: *filter = nullptr; return 0; } - virtual int get_encrypt_filter(std::unique_ptr *filter, - rgw::putobj::DataProcessor *cb) { + virtual int get_encrypt_filter(std::unique_ptr *filter, + rgw::sal::DataProcessor *cb) { return 0; } @@ -1327,8 +1327,8 @@ public: void pre_exec() override; void execute(optional_yield y) override; - virtual int get_encrypt_filter(std::unique_ptr *filter, - rgw::putobj::DataProcessor *cb) { + virtual int get_encrypt_filter(std::unique_ptr *filter, + rgw::sal::DataProcessor *cb) { return 0; } virtual int get_params(optional_yield y) = 0; diff --git a/src/rgw/rgw_putobj.h b/src/rgw/rgw_putobj.h index 4151294d1cb..1c16b22540a 100644 --- a/src/rgw/rgw_putobj.h +++ b/src/rgw/rgw_putobj.h @@ -16,26 +16,15 @@ #pragma once #include "include/buffer.h" +#include "rgw_sal.h" namespace rgw::putobj { -// a simple streaming data processing abstraction -class DataProcessor { - public: - virtual ~DataProcessor() {} - - // consume a bufferlist in its entirety at the given object offset. 
an - // empty bufferlist is given to request that any buffered data be flushed, - // though this doesn't wait for completions - virtual int process(bufferlist&& data, uint64_t offset) = 0; -}; - // for composing data processors into a pipeline -class Pipe : public DataProcessor { - DataProcessor *next; +class Pipe : public rgw::sal::DataProcessor { + rgw::sal::DataProcessor *next; public: - explicit Pipe() : next(nullptr) {} - explicit Pipe(DataProcessor *next) : next(next) {} + explicit Pipe(rgw::sal::DataProcessor *next) : next(next) {} // passes the data on to the next processor int process(bufferlist&& data, uint64_t offset) override { @@ -48,8 +37,7 @@ class ChunkProcessor : public Pipe { uint64_t chunk_size; bufferlist chunk; // leftover bytes from the last call to process() public: - ChunkProcessor() {} - ChunkProcessor(DataProcessor *next, uint64_t chunk_size) + ChunkProcessor(rgw::sal::DataProcessor *next, uint64_t chunk_size) : Pipe(next), chunk_size(chunk_size) {} @@ -70,8 +58,7 @@ class StripeProcessor : public Pipe { StripeGenerator *gen; std::pair bounds; // bounds of current stripe public: - StripeProcessor() {} - StripeProcessor(DataProcessor *next, StripeGenerator *gen, + StripeProcessor(rgw::sal::DataProcessor *next, StripeGenerator *gen, uint64_t first_stripe_size) : Pipe(next), gen(gen), bounds(0, first_stripe_size) {} diff --git a/src/rgw/rgw_putobj_processor.cc b/src/rgw/rgw_putobj_processor.cc index 270ccf0960c..d6bb52b7128 100644 --- a/src/rgw/rgw_putobj_processor.cc +++ b/src/rgw/rgw_putobj_processor.cc @@ -60,6 +60,113 @@ int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset) return processor->process(std::move(data), write_offset); } + +static int process_completed(const AioResultList& completed, RawObjSet *written) +{ + std::optional error; + for (auto& r : completed) { + if (r.result >= 0) { + written->insert(r.obj.get_ref().obj); + } else if (!error) { // record first error code + error = r.result; + } + } + 
return error.value_or(0); +} + +int RadosWriter::set_stripe_obj(const rgw_raw_obj& raw_obj) +{ + stripe_obj = store->svc()->rados->obj(raw_obj); + return stripe_obj.open(dpp); +} + +int RadosWriter::process(bufferlist&& bl, uint64_t offset) +{ + bufferlist data = std::move(bl); + const uint64_t cost = data.length(); + if (cost == 0) { // no empty writes, use aio directly for creates + return 0; + } + librados::ObjectWriteOperation op; + if (offset == 0) { + op.write_full(data); + } else { + op.write(offset, data); + } + constexpr uint64_t id = 0; // unused + auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id); + return process_completed(c, &written); +} + +int RadosWriter::write_exclusive(const bufferlist& data) +{ + const uint64_t cost = data.length(); + + librados::ObjectWriteOperation op; + op.create(true); // exclusive create + op.write_full(data); + + constexpr uint64_t id = 0; // unused + auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id); + auto d = aio->drain(); + c.splice(c.end(), d); + return process_completed(c, &written); +} + +int RadosWriter::drain() +{ + return process_completed(aio->drain(), &written); +} + +RadosWriter::~RadosWriter() +{ + // wait on any outstanding aio completions + process_completed(aio->drain(), &written); + + bool need_to_remove_head = false; + std::optional raw_head; + if (!rgw::sal::Object::empty(head_obj.get())) { + raw_head.emplace(); + rgw::sal::RadosObject* obj = dynamic_cast(head_obj.get()); + obj->get_raw_obj(&*raw_head); + } + + /** + * We should delete the object in the "multipart" namespace to avoid race condition. + * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart + * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects + * written by the second upload may be deleted by the first upload. 
+ * details are described in #11749 + * + * The above comment still stands, but instead of searching for a specific object in the multipart + * namespace, we just make sure that we remove the object that is marked as the head object after + * we remove all the other raw objects. Note that we use a different call to remove the head object, + * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme. + */ + for (const auto& obj : written) { + if (raw_head && obj == *raw_head) { + ldpp_dout(dpp, 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl; + need_to_remove_head = true; + continue; + } + + int r = store->delete_raw_obj(dpp, obj); + if (r < 0 && r != -ENOENT) { + ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl; + } + } + + if (need_to_remove_head) { + std::string version_id; + ldpp_dout(dpp, 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl; + int r = head_obj->delete_object(dpp, &obj_ctx, null_yield); + if (r < 0 && r != -ENOENT) { + ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl; + } + } +} + + // advance to the next stripe int ManifestObjectProcessor::next(uint64_t offset, uint64_t *pstripe_size) { @@ -76,12 +183,12 @@ int ManifestObjectProcessor::next(uint64_t offset, uint64_t *pstripe_size) if (r < 0) { return r; } - r = writer->set_stripe_obj(stripe_obj); + r = writer.set_stripe_obj(stripe_obj); if (r < 0) { return r; } - chunk = ChunkProcessor(writer.get(), chunk_size); + chunk = ChunkProcessor(&writer, chunk_size); *pstripe_size = manifest_gen.cur_stripe_max_size(); return 0; } @@ -103,15 +210,15 @@ int AtomicObjectProcessor::prepare(optional_yield y) uint64_t chunk_size = 0; uint64_t alignment; - int r = head_obj->get_max_chunk_size(dpp, bucket->get_placement_rule(), + int r = head_obj->get_max_chunk_size(dpp, head_obj->get_bucket()->get_placement_rule(), &max_head_chunk_size, 
&alignment); if (r < 0) { return r; } bool same_pool = true; - if (bucket->get_placement_rule() != tail_placement_rule) { - if (!head_obj->placement_rules_match(bucket->get_placement_rule(), tail_placement_rule)) { + if (head_obj->get_bucket()->get_placement_rule() != tail_placement_rule) { + if (!head_obj->placement_rules_match(head_obj->get_bucket()->get_placement_rule(), tail_placement_rule)) { same_pool = false; r = head_obj->get_max_chunk_size(dpp, tail_placement_rule, &chunk_size); if (r < 0) { @@ -136,7 +243,7 @@ int AtomicObjectProcessor::prepare(optional_yield y) rgw_obj obj = head_obj->get_obj(); r = manifest_gen.create_begin(store->ctx(), &manifest, - bucket->get_placement_rule(), + head_obj->get_bucket()->get_placement_rule(), &tail_placement_rule, obj.bucket, obj); if (r < 0) { @@ -145,14 +252,14 @@ int AtomicObjectProcessor::prepare(optional_yield y) rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store); - r = writer->set_stripe_obj(stripe_obj); + r = writer.set_stripe_obj(stripe_obj); if (r < 0) { return r; } set_head_chunk_size(head_max_size); // initialize the processors - chunk = ChunkProcessor(writer.get(), chunk_size); + chunk = ChunkProcessor(&writer, chunk_size); stripe = StripeProcessor(&chunk, this, head_max_size); return 0; } @@ -169,7 +276,7 @@ int AtomicObjectProcessor::complete(size_t accounted_size, rgw_zone_set *zones_trace, bool *pcanceled, optional_yield y) { - int r = writer->drain(); + int r = writer.drain(); if (r < 0) { return r; } @@ -184,7 +291,7 @@ int AtomicObjectProcessor::complete(size_t accounted_size, std::unique_ptr obj_op = head_obj->get_write_op(&obj_ctx); /* some object types shouldn't be versioned, e.g., multipart parts */ - obj_op->params.versioning_disabled = !bucket->versioning_enabled(); + obj_op->params.versioning_disabled = !head_obj->get_bucket()->versioning_enabled(); obj_op->params.data = &first_chunk; obj_op->params.manifest = &manifest; obj_op->params.ptag = &unique_tag; /* use req_id as operation tag 
*/ @@ -212,7 +319,7 @@ int AtomicObjectProcessor::complete(size_t accounted_size, } if (!obj_op->params.canceled) { // on success, clear the set of objects for deletion - writer->clear_written(); + writer.clear_written(); } if (pcanceled) { *pcanceled = obj_op->params.canceled; @@ -226,7 +333,7 @@ int MultipartObjectProcessor::process_first_chunk(bufferlist&& data, { // write the first chunk of the head object as part of an exclusive create, // then drain to wait for the result in case of EEXIST - int r = writer->write_exclusive(data); + int r = writer.write_exclusive(data); if (r == -EEXIST) { // randomize the oid prefix and reprepare the head/manifest std::string oid_rand = gen_rand_alphanumeric(store->ctx(), 32); @@ -239,7 +346,7 @@ int MultipartObjectProcessor::process_first_chunk(bufferlist&& data, return r; } // resubmit the write op on the new head object - r = writer->write_exclusive(data); + r = writer.write_exclusive(data); } if (r < 0) { return r; @@ -265,7 +372,7 @@ int MultipartObjectProcessor::prepare_head() manifest.set_multipart_part_rule(stripe_size, part_num); r = manifest_gen.create_begin(store->ctx(), &manifest, - bucket->get_placement_rule(), + head_obj->get_bucket()->get_placement_rule(), &tail_placement_rule, target_obj->get_bucket()->get_key(), target_obj->get_obj()); @@ -277,14 +384,14 @@ int MultipartObjectProcessor::prepare_head() head_obj->raw_obj_to_obj(stripe_obj); head_obj->set_hash_source(target_obj->get_name()); - r = writer->set_stripe_obj(stripe_obj); + r = writer.set_stripe_obj(stripe_obj); if (r < 0) { return r; } stripe_size = manifest_gen.cur_stripe_max_size(); set_head_chunk_size(stripe_size); - chunk = ChunkProcessor(writer.get(), chunk_size); + chunk = ChunkProcessor(&writer, chunk_size); stripe = StripeProcessor(&chunk, this, stripe_size); return 0; } @@ -308,7 +415,7 @@ int MultipartObjectProcessor::complete(size_t accounted_size, rgw_zone_set *zones_trace, bool *pcanceled, optional_yield y) { - int r = writer->drain(); + 
int r = writer.drain(); if (r < 0) { return r; } @@ -367,7 +474,7 @@ int MultipartObjectProcessor::complete(size_t accounted_size, encode(info, bl); std::unique_ptr meta_obj = - bucket->get_object(rgw_obj_key(mp.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART)); + head_obj->get_bucket()->get_object(rgw_obj_key(mp.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART)); meta_obj->set_in_extra_data(true); r = meta_obj->omap_set_val_by_key(dpp, p, bl, true, null_yield); @@ -377,7 +484,7 @@ int MultipartObjectProcessor::complete(size_t accounted_size, if (!obj_op->params.canceled) { // on success, clear the set of objects for deletion - writer->clear_written(); + writer.clear_written(); } if (pcanceled) { *pcanceled = obj_op->params.canceled; @@ -385,9 +492,9 @@ int MultipartObjectProcessor::complete(size_t accounted_size, return 0; } -int AppendObjectProcessor::process_first_chunk(bufferlist &&data, rgw::putobj::DataProcessor **processor) +int AppendObjectProcessor::process_first_chunk(bufferlist &&data, rgw::sal::DataProcessor **processor) { - int r = writer->write_exclusive(data); + int r = writer.write_exclusive(data); if (r < 0) { return r; } @@ -458,7 +565,7 @@ int AppendObjectProcessor::prepare(optional_yield y) rgw_obj obj = head_obj->get_obj(); - r = manifest_gen.create_begin(store->ctx(), &manifest, bucket->get_placement_rule(), &tail_placement_rule, obj.bucket, obj); + r = manifest_gen.create_begin(store->ctx(), &manifest, head_obj->get_bucket()->get_placement_rule(), &tail_placement_rule, obj.bucket, obj); if (r < 0) { return r; } @@ -469,7 +576,7 @@ int AppendObjectProcessor::prepare(optional_yield y) if (r < 0) { return r; } - r = writer->set_stripe_obj(std::move(stripe_obj)); + r = writer.set_stripe_obj(std::move(stripe_obj)); if (r < 0) { return r; } @@ -480,7 +587,7 @@ int AppendObjectProcessor::prepare(optional_yield y) set_head_chunk_size(max_head_size); // initialize the processors - chunk = ChunkProcessor(writer.get(), chunk_size); + chunk = 
ChunkProcessor(&writer, chunk_size); stripe = StripeProcessor(&chunk, this, stripe_size); return 0; @@ -492,7 +599,7 @@ int AppendObjectProcessor::complete(size_t accounted_size, const string &etag, c const string *user_data, rgw_zone_set *zones_trace, bool *pcanceled, optional_yield y) { - int r = writer->drain(); + int r = writer.drain(); if (r < 0) return r; const uint64_t actual_size = get_actual_size(); @@ -554,7 +661,7 @@ int AppendObjectProcessor::complete(size_t accounted_size, const string &etag, c } if (!obj_op->params.canceled) { // on success, clear the set of objects for deletion - writer->clear_written(); + writer.clear_written(); } if (pcanceled) { *pcanceled = obj_op->params.canceled; diff --git a/src/rgw/rgw_putobj_processor.h b/src/rgw/rgw_putobj_processor.h index 5d450b098de..77cb4f7c2f8 100644 --- a/src/rgw/rgw_putobj_processor.h +++ b/src/rgw/rgw_putobj_processor.h @@ -25,43 +25,30 @@ namespace rgw { +namespace sal { + class RadosStore; +} + class Aio; namespace putobj { -// a data consumer that writes an object in a bucket -class ObjectProcessor : public DataProcessor { - public: - // prepare to start processing object data - virtual int prepare(optional_yield y) = 0; - - // complete the operation and make its result visible to clients - virtual int complete(size_t accounted_size, const std::string& etag, - ceph::real_time *mtime, ceph::real_time set_mtime, - std::map& attrs, - ceph::real_time delete_at, - const char *if_match, const char *if_nomatch, - const std::string *user_data, - rgw_zone_set *zones_trace, bool *canceled, - optional_yield y) = 0; -}; - // an object processor with special handling for the first chunk of the head. 
// the virtual process_first_chunk() function returns a processor to handle the // rest of the object -class HeadObjectProcessor : public ObjectProcessor { +class HeadObjectProcessor : public rgw::sal::ObjectProcessor { uint64_t head_chunk_size; // buffer to capture the first chunk of the head object bufferlist head_data; // initialized after process_first_chunk() to process everything else - DataProcessor *processor = nullptr; + rgw::sal::DataProcessor *processor = nullptr; uint64_t data_offset = 0; // maximum offset of data written (ie compressed) protected: uint64_t get_actual_size() const { return data_offset; } // process the first chunk of data and return a processor for the rest virtual int process_first_chunk(bufferlist&& data, - DataProcessor **processor) = 0; + rgw::sal::DataProcessor **processor) = 0; public: HeadObjectProcessor(uint64_t head_chunk_size) : head_chunk_size(head_chunk_size) @@ -74,19 +61,62 @@ class HeadObjectProcessor : public ObjectProcessor { int process(bufferlist&& data, uint64_t logical_offset) final override; }; +using RawObjSet = std::set; + +// a data sink that writes to rados objects and deletes them on cancelation +class RadosWriter : public rgw::sal::DataProcessor { + Aio *const aio; + rgw::sal::RadosStore *const store; + RGWObjectCtx& obj_ctx; + std::unique_ptr head_obj; + RGWSI_RADOS::Obj stripe_obj; // current stripe object + RawObjSet written; // set of written objects for deletion + const DoutPrefixProvider *dpp; + optional_yield y; + + public: + RadosWriter(Aio *aio, rgw::sal::RadosStore *store, + RGWObjectCtx& obj_ctx, std::unique_ptr _head_obj, + const DoutPrefixProvider *dpp, optional_yield y) + : aio(aio), store(store), + obj_ctx(obj_ctx), head_obj(std::move(_head_obj)), dpp(dpp), y(y) + {} + RadosWriter(RadosWriter&& r) + : aio(r.aio), store(r.store), + obj_ctx(r.obj_ctx), head_obj(std::move(r.head_obj)), dpp(r.dpp), y(r.y) + {} + + ~RadosWriter(); + + // change the current stripe object + int set_stripe_obj(const 
rgw_raw_obj& obj); + + // write the data at the given offset of the current stripe object + int process(bufferlist&& data, uint64_t stripe_offset) override; + + // write the data as an exclusive create and wait for it to complete + int write_exclusive(const bufferlist& data); + + int drain(); + + // when the operation completes successfully, clear the set of written objects + // so they aren't deleted on destruction + void clear_written() { written.clear(); } + +}; + // a rados object processor that stripes according to RGWObjManifest class ManifestObjectProcessor : public HeadObjectProcessor, public StripeGenerator { protected: - rgw::sal::Store* const store; - rgw::sal::Bucket* bucket; + rgw::sal::RadosStore* const store; rgw_placement_rule tail_placement_rule; rgw_user owner; RGWObjectCtx& obj_ctx; std::unique_ptr head_obj; - std::unique_ptr writer; + RadosWriter writer; RGWObjManifest manifest; RGWObjManifest::generator manifest_gen; ChunkProcessor chunk; @@ -97,20 +127,17 @@ class ManifestObjectProcessor : public HeadObjectProcessor, int next(uint64_t offset, uint64_t *stripe_size) override; public: - ManifestObjectProcessor(Aio *aio, rgw::sal::Store* store, - rgw::sal::Bucket* bucket, + ManifestObjectProcessor(Aio *aio, rgw::sal::RadosStore* store, const rgw_placement_rule *ptail_placement_rule, const rgw_user& owner, RGWObjectCtx& obj_ctx, std::unique_ptr _head_obj, const DoutPrefixProvider* dpp, optional_yield y) : HeadObjectProcessor(0), - store(store), bucket(bucket), + store(store), owner(owner), obj_ctx(obj_ctx), head_obj(std::move(_head_obj)), - dpp(dpp) { - writer = store->get_writer(aio, bucket, obj_ctx, head_obj->clone(), dpp, y); - chunk = ChunkProcessor(writer.get(), 0); - stripe = StripeProcessor(&chunk, this, 0); + writer(aio, store, obj_ctx, head_obj->clone(), dpp, y), + chunk(&writer, 0), stripe(&chunk, this, 0), dpp(dpp) { if (ptail_placement_rule) { tail_placement_rule = *ptail_placement_rule; } @@ -137,10 +164,9 @@ class 
AtomicObjectProcessor : public ManifestObjectProcessor { const std::string unique_tag; bufferlist first_chunk; // written with the head in complete() - int process_first_chunk(bufferlist&& data, DataProcessor **processor) override; + int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override; public: - AtomicObjectProcessor(Aio *aio, rgw::sal::Store* store, - rgw::sal::Bucket* bucket, + AtomicObjectProcessor(Aio *aio, rgw::sal::RadosStore* store, const rgw_placement_rule *ptail_placement_rule, const rgw_user& owner, RGWObjectCtx& obj_ctx, @@ -148,7 +174,7 @@ class AtomicObjectProcessor : public ManifestObjectProcessor { std::optional olh_epoch, const std::string& unique_tag, const DoutPrefixProvider *dpp, optional_yield y) - : ManifestObjectProcessor(aio, store, bucket, ptail_placement_rule, + : ManifestObjectProcessor(aio, store, ptail_placement_rule, owner, obj_ctx, std::move(_head_obj), dpp, y), olh_epoch(olh_epoch), unique_tag(unique_tag) {} @@ -180,19 +206,18 @@ class MultipartObjectProcessor : public ManifestObjectProcessor { // write the first chunk and wait on aio->drain() for its completion. 
// on EEXIST, retry with random prefix - int process_first_chunk(bufferlist&& data, DataProcessor **processor) override; + int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override; // prepare the head stripe and manifest int prepare_head(); public: - MultipartObjectProcessor(Aio *aio, rgw::sal::Store* store, - rgw::sal::Bucket* bucket, + MultipartObjectProcessor(Aio *aio, rgw::sal::RadosStore* store, const rgw_placement_rule *ptail_placement_rule, const rgw_user& owner, RGWObjectCtx& obj_ctx, std::unique_ptr _head_obj, const std::string& upload_id, uint64_t part_num, const std::string& part_num_str, const DoutPrefixProvider *dpp, optional_yield y) - : ManifestObjectProcessor(aio, store, bucket, ptail_placement_rule, + : ManifestObjectProcessor(aio, store, ptail_placement_rule, owner, obj_ctx, std::move(_head_obj), dpp, y), target_obj(head_obj->clone()), upload_id(upload_id), part_num(part_num), part_num_str(part_num_str), @@ -224,18 +249,17 @@ class MultipartObjectProcessor : public ManifestObjectProcessor { RGWObjManifest *cur_manifest; - int process_first_chunk(bufferlist&& data, DataProcessor **processor) override; + int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override; public: - AppendObjectProcessor(Aio *aio, rgw::sal::Store* store, - rgw::sal::Bucket* bucket, + AppendObjectProcessor(Aio *aio, rgw::sal::RadosStore* store, const rgw_placement_rule *ptail_placement_rule, const rgw_user& owner, RGWObjectCtx& obj_ctx, std::unique_ptr _head_obj, const std::string& unique_tag, uint64_t position, uint64_t *cur_accounted_size, const DoutPrefixProvider *dpp, optional_yield y) - : ManifestObjectProcessor(aio, store, bucket, ptail_placement_rule, + : ManifestObjectProcessor(aio, store, ptail_placement_rule, owner, obj_ctx, std::move(_head_obj), dpp, y), position(position), cur_size(0), cur_accounted_size(cur_accounted_size), unique_tag(unique_tag), cur_manifest(nullptr) diff --git a/src/rgw/rgw_rados.cc 
b/src/rgw/rgw_rados.cc index e1b31cfd998..5f96a5b329c 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3328,13 +3328,13 @@ class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB const DoutPrefixProvider *dpp; CephContext* cct; rgw_obj obj; - rgw::putobj::DataProcessor *filter; + rgw::sal::DataProcessor *filter; boost::optional& compressor; bool try_etag_verify; rgw::putobj::etag_verifier_ptr etag_verifier; boost::optional buffering; CompressorRef& plugin; - rgw::putobj::ObjectProcessor *processor; + rgw::sal::ObjectProcessor *processor; void (*progress_cb)(off_t, void *); void *progress_data; bufferlist extra_data_bl, manifest_bl; @@ -3352,7 +3352,7 @@ public: CephContext* cct, CompressorRef& plugin, boost::optional& compressor, - rgw::putobj::ObjectProcessor *p, + rgw::sal::ObjectProcessor *p, void (*_progress_cb)(off_t, void *), void *_progress_data, std::function&)> _attrs_handler) : @@ -3557,12 +3557,11 @@ static void set_copy_attrs(map& src_attrs, } } -int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y) +int RGWRados::rewrite_obj(rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y) { RGWObjectCtx rctx(this->store); - rgw::sal::RadosBucket bucket(store, dest_bucket_info); - return obj->copy_obj_data(rctx, &bucket, obj, 0, NULL, dpp, y); + return obj->copy_obj_data(rctx, obj->get_bucket(), obj, 0, NULL, dpp, y); } struct obj_time_weight { @@ -3841,7 +3840,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size); using namespace rgw::putobj; - AtomicObjectProcessor processor(&aio, this->store, dest_bucket, nullptr, user_id, + AtomicObjectProcessor processor(&aio, this->store, nullptr, user_id, obj_ctx, dest_obj->clone(), olh_epoch, tag, dpp, null_yield); RGWRESTConn *conn; @@ -4510,7 +4509,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx, using namespace 
rgw::putobj; // do not change the null_yield in the initialization of this AtomicObjectProcessor // it causes crashes in the ragweed tests - AtomicObjectProcessor processor(&aio, this->store, bucket, &dest_placement, + AtomicObjectProcessor processor(&aio, this->store, &dest_placement, bucket->get_info().owner, obj_ctx, dest_obj->clone(), olh_epoch, tag, dpp, null_yield); @@ -6146,7 +6145,7 @@ int RGWRados::Bucket::UpdateIndex::complete(const DoutPrefixProvider *dpp, int64 return 0; } RGWRados *store = target->get_store(); - BucketShard *bs; + BucketShard *bs = nullptr; int ret = get_bucket_shard(&bs, dpp); if (ret < 0) { diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index a94bf6c9fb0..6df125049d9 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1104,7 +1104,7 @@ public: D3nDataCache* d3n_data_cache{nullptr}; - int rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y); + int rewrite_obj(rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y); int stat_remote_obj(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 03608d9e09d..6597eb4e78b 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -2572,8 +2572,8 @@ int RGWPutObj_ObjStore_S3::get_decrypt_filter( } int RGWPutObj_ObjStore_S3::get_encrypt_filter( - std::unique_ptr *filter, - rgw::putobj::DataProcessor *cb) + std::unique_ptr *filter, + rgw::sal::DataProcessor *cb) { int res = 0; if (!multipart_upload_id.empty()) { @@ -3125,8 +3125,8 @@ done: } int RGWPostObj_ObjStore_S3::get_encrypt_filter( - std::unique_ptr *filter, - rgw::putobj::DataProcessor *cb) + std::unique_ptr *filter, + rgw::sal::DataProcessor *cb) { std::unique_ptr block_crypt; int res = rgw_s3_prepare_encrypt(s, attrs, &parts, &block_crypt, diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h index cc562ece807..7e2d787cf97 100644 --- a/src/rgw/rgw_rest_s3.h +++ 
b/src/rgw/rgw_rest_s3.h @@ -277,8 +277,8 @@ public: int get_data(bufferlist& bl) override; void send_response() override; - int get_encrypt_filter(std::unique_ptr *filter, - rgw::putobj::DataProcessor *cb) override; + int get_encrypt_filter(std::unique_ptr *filter, + rgw::sal::DataProcessor *cb) override; int get_decrypt_filter(std::unique_ptr* filter, RGWGetObj_Filter* cb, map& attrs, @@ -316,8 +316,8 @@ public: void send_response() override; int get_data(ceph::bufferlist& bl, bool& again) override; - int get_encrypt_filter(std::unique_ptr *filter, - rgw::putobj::DataProcessor *cb) override; + int get_encrypt_filter(std::unique_ptr *filter, + rgw::sal::DataProcessor *cb) override; }; class RGWDeleteObj_ObjStore_S3 : public RGWDeleteObj_ObjStore { diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 79bced546b9..a48b09353da 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -17,7 +17,6 @@ #include "rgw_user.h" #include "rgw_notify_event_type.h" -#include "rgw_putobj.h" class RGWGetDataCB; struct RGWObjState; @@ -118,6 +117,34 @@ enum AttrsMod { ATTRSMOD_MERGE = 2 }; +// a simple streaming data processing abstraction +class DataProcessor { + public: + virtual ~DataProcessor() {} + + // consume a bufferlist in its entirety at the given object offset. 
an + // empty bufferlist is given to request that any buffered data be flushed, + // though this doesn't wait for completions + virtual int process(bufferlist&& data, uint64_t offset) = 0; +}; + +// a data consumer that writes an object in a bucket +class ObjectProcessor : public DataProcessor { + public: + // prepare to start processing object data + virtual int prepare(optional_yield y) = 0; + + // complete the operation and make its result visible to clients + virtual int complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) = 0; +}; + /** * Base class for AIO completions */ @@ -176,9 +203,6 @@ class Store { virtual std::unique_ptr get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) = 0; virtual std::unique_ptr get_gc_chain(rgw::sal::Object* obj) = 0; - virtual std::unique_ptr get_writer(Aio* aio, rgw::sal::Bucket* bucket, - RGWObjectCtx& obj_ctx, std::unique_ptr _head_obj, - const DoutPrefixProvider* dpp, optional_yield y) = 0; virtual RGWLC* get_rgwlc(void) = 0; virtual RGWCoroutinesManagerRegistry* get_cr_registry() = 0; virtual int delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj) = 0; @@ -232,6 +256,21 @@ class Store { const std::string& tenant, vector>& providers) = 0; virtual std::unique_ptr get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional upload_id=std::nullopt, ceph::real_time mtime=real_clock::now()) = 0; + virtual std::unique_ptr get_append_writer(const DoutPrefixProvider *dpp, + optional_yield y, + std::unique_ptr _head_obj, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + const std::string& unique_tag, + uint64_t position, + 
uint64_t *cur_accounted_size) = 0; + virtual std::unique_ptr get_atomic_writer(const DoutPrefixProvider *dpp, + optional_yield y, + std::unique_ptr _head_obj, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + uint64_t olh_epoch, + const std::string& unique_tag) = 0; virtual void finalize(void) = 0; @@ -848,6 +887,14 @@ public: virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) = 0; + virtual std::unique_ptr get_writer(const DoutPrefixProvider *dpp, + optional_yield y, + std::unique_ptr _head_obj, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + uint64_t part_num, + const std::string& part_num_str) = 0; + friend inline ostream& operator<<(ostream& out, const MultipartUpload& u) { out << u.get_meta(); if (!u.get_upload_id().empty()) @@ -953,45 +1000,29 @@ protected: virtual void delete_inline(const DoutPrefixProvider *dpp, const std::string& tag) = 0; }; -using RawObjSet = std::set; - -class Writer : public rgw::putobj::DataProcessor { +class Writer : public ObjectProcessor { protected: - Aio* const aio; - rgw::sal::Bucket* bucket; - RGWObjectCtx& obj_ctx; - std::unique_ptr head_obj; - RawObjSet written; // set of written objects for deletion const DoutPrefixProvider* dpp; - optional_yield y; - - public: - Writer(Aio* aio, - rgw::sal::Bucket* bucket, - RGWObjectCtx& obj_ctx, std::unique_ptr _head_obj, - const DoutPrefixProvider* dpp, optional_yield y) - : aio(aio), bucket(bucket), - obj_ctx(obj_ctx), head_obj(std::move(_head_obj)), dpp(dpp), y(y) - {} - Writer(Writer&& r) - : aio(r.aio), bucket(r.bucket), - obj_ctx(r.obj_ctx), head_obj(std::move(r.head_obj)), dpp(r.dpp), y(r.y) - {} - - ~Writer() = default; - - // change the current stripe object - virtual int set_stripe_obj(const rgw_raw_obj& obj) = 0; - - // write the data as an exclusive create and wait for it to 
complete - virtual int write_exclusive(const bufferlist& data) = 0; - - virtual int drain() = 0; - - // when the operation completes successfully, clear the set of written objects - // so they aren't deleted on destruction - virtual void clear_written() { written.clear(); } +public: + Writer(const DoutPrefixProvider *_dpp, optional_yield y) : dpp(_dpp) {} + virtual ~Writer() = default; + + // prepare to start processing object data + virtual int prepare(optional_yield y) = 0; + + // Process a bufferlist + virtual int process(bufferlist&& data, uint64_t offset) = 0; + + // complete the operation and make its result visible to clients + virtual int complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) = 0; }; class Zone { diff --git a/src/rgw/rgw_sal_rados.cc b/src/rgw/rgw_sal_rados.cc index 94b0d04e739..8b5ff1fbc92 100644 --- a/src/rgw/rgw_sal_rados.cc +++ b/src/rgw/rgw_sal_rados.cc @@ -28,6 +28,7 @@ #include "rgw_multi.h" #include "rgw_acl_s3.h" #include "rgw_aio.h" +#include "rgw_aio_throttle.h" #include "rgw_zone.h" #include "rgw_rest_conn.h" @@ -115,19 +116,6 @@ static int rgw_op_get_bucket_policy_from_attr(const DoutPrefixProvider* dpp, return 0; } -static int process_completed(const AioResultList& completed, RawObjSet* written) -{ - std::optional error; - for (auto& r : completed) { - if (r.result >= 0) { - written->insert(r.obj.get_ref().obj); - } else if (!error) { // record first error code - error = r.result; - } - } - return error.value_or(0); -} - int RadosCompletions::drain() { int ret = 0; @@ -1199,13 +1187,6 @@ std::unique_ptr RadosStore::get_gc_chain(rgw::sal::Object* obj) return std::unique_ptr(new RadosGCChain(this, obj)); } -std::unique_ptr RadosStore::get_writer(Aio* aio, rgw::sal::Bucket* bucket, - 
RGWObjectCtx& obj_ctx, std::unique_ptr _head_obj, - const DoutPrefixProvider* dpp, optional_yield y) -{ - return std::unique_ptr(new RadosWriter(aio, this, bucket, obj_ctx, std::move(_head_obj), dpp, y)); -} - int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj) { return rados->delete_raw_obj(dpp, obj); @@ -1471,6 +1452,40 @@ std::unique_ptr RadosStore::get_multipart_upload(Bucket* bucket return std::unique_ptr(new RadosMultipartUpload(this, bucket, oid, upload_id, mtime)); } +std::unique_ptr RadosStore::get_append_writer(const DoutPrefixProvider *dpp, + optional_yield y, + std::unique_ptr _head_obj, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + const std::string& unique_tag, + uint64_t position, + uint64_t *cur_accounted_size) +{ + auto aio = rgw::make_throttle(ctx()->_conf->rgw_put_obj_min_window_size, y); + return std::unique_ptr(new RadosAppendWriter(dpp, y, + std::move(_head_obj), + this, std::move(aio), owner, obj_ctx, + ptail_placement_rule, + unique_tag, position, + cur_accounted_size)); +} + +std::unique_ptr RadosStore::get_atomic_writer(const DoutPrefixProvider *dpp, + optional_yield y, + std::unique_ptr _head_obj, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + uint64_t olh_epoch, + const std::string& unique_tag) +{ + auto aio = rgw::make_throttle(ctx()->_conf->rgw_put_obj_min_window_size, y); + return std::unique_ptr(new RadosAtomicWriter(dpp, y, + std::move(_head_obj), + this, std::move(aio), owner, obj_ctx, + ptail_placement_rule, + olh_epoch, unique_tag)); +} + int RadosStore::get_obj_head_ioctx(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::IoCtx* ioctx) { return rados->get_obj_head_ioctx(dpp, bucket_info, obj, ioctx); @@ -2478,6 +2493,21 @@ int RadosMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield return 0; } +std::unique_ptr 
RadosMultipartUpload::get_writer( + const DoutPrefixProvider *dpp, + optional_yield y, + std::unique_ptr _head_obj, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + uint64_t part_num, + const std::string& part_num_str) +{ + auto aio = rgw::make_throttle(store->ctx()->_conf->rgw_put_obj_min_window_size, y); + return std::unique_ptr(new RadosMultipartWriter(dpp, y, this, + std::move(_head_obj), store, std::move(aio), owner, + obj_ctx, ptail_placement_rule, part_num, part_num_str)); +} + MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore* store, RadosObject* obj, const std::string& lock_name) : lock(lock_name) { @@ -2636,97 +2666,73 @@ void RadosGCChain::delete_inline(const DoutPrefixProvider *dpp, const std::strin store->getRados()->delete_objs_inline(dpp, chain, tag); } -int RadosWriter::set_stripe_obj(const rgw_raw_obj& raw_obj) +int RadosAtomicWriter::prepare(optional_yield y) { - stripe_obj = store->svc()->rados->obj(raw_obj); - return stripe_obj.open(dpp); + return processor.prepare(y); } -int RadosWriter::process(bufferlist&& bl, uint64_t offset) +int RadosAtomicWriter::process(bufferlist&& data, uint64_t offset) { - bufferlist data = std::move(bl); - const uint64_t cost = data.length(); - if (cost == 0) { // no empty writes, use aio directly for creates - return 0; - } - librados::ObjectWriteOperation op; - if (offset == 0) { - op.write_full(data); - } else { - op.write(offset, data); - } - constexpr uint64_t id = 0; // unused - auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id); - return process_completed(c, &written); + return processor.process(std::move(data), offset); } -int RadosWriter::write_exclusive(const bufferlist& data) +int RadosAtomicWriter::complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + 
const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) { - const uint64_t cost = data.length(); - - librados::ObjectWriteOperation op; - op.create(true); // exclusive create - op.write_full(data); - - constexpr uint64_t id = 0; // unused - auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id); - auto d = aio->drain(); - c.splice(c.end(), d); - return process_completed(c, &written); + return processor.complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at, + if_match, if_nomatch, user_data, zones_trace, canceled, y); } -int RadosWriter::drain() +int RadosAppendWriter::prepare(optional_yield y) { - return process_completed(aio->drain(), &written); + return processor.prepare(y); } -RadosWriter::~RadosWriter() +int RadosAppendWriter::process(bufferlist&& data, uint64_t offset) { - // wait on any outstanding aio completions - process_completed(aio->drain(), &written); - - bool need_to_remove_head = false; - std::optional raw_head; - if (!rgw::sal::Object::empty(head_obj.get())) { - raw_head.emplace(); - dynamic_cast(head_obj.get())->get_raw_obj(&*raw_head); - } + return processor.process(std::move(data), offset); +} - /** - * We should delete the object in the "multipart" namespace to avoid race condition. - * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart - * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects - * written by the second upload may be deleted by the first upload. - * details is describled on #11749 - * - * The above comment still stands, but instead of searching for a specific object in the multipart - * namespace, we just make sure that we remove the object that is marked as the head object after - * we remove all the other raw objects. 
Note that we use different call to remove the head object, - * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme. - */ - for (const auto& obj : written) { - if (raw_head && obj == *raw_head) { - ldpp_dout(dpp, 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl; - need_to_remove_head = true; - continue; - } +int RadosAppendWriter::complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) +{ + return processor.complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at, + if_match, if_nomatch, user_data, zones_trace, canceled, y); +} - int r = store->delete_raw_obj(dpp, obj); - if (r < 0 && r != -ENOENT) { - ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl; - } - } +int RadosMultipartWriter::prepare(optional_yield y) +{ + return processor.prepare(y); +} - if (need_to_remove_head) { - ldpp_dout(dpp, 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl; - std::unique_ptr del_op = head_obj->get_delete_op(&obj_ctx); - del_op->params.bucket_owner = bucket->get_acl_owner(); +int RadosMultipartWriter::process(bufferlist&& data, uint64_t offset) +{ + return processor.process(std::move(data), offset); +} - int r = del_op->delete_obj(dpp, y); - if (r < 0 && r != -ENOENT) { - ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl; - } - } +int RadosMultipartWriter::complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) +{ + 
return processor.complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at, + if_match, if_nomatch, user_data, zones_trace, canceled, y); } const RGWZoneGroup& RadosZone::get_zonegroup() diff --git a/src/rgw/rgw_sal_rados.h b/src/rgw/rgw_sal_rados.h index dc057ffba40..7d51e24637e 100644 --- a/src/rgw/rgw_sal_rados.h +++ b/src/rgw/rgw_sal_rados.h @@ -21,6 +21,7 @@ #include "rgw_oidc_provider.h" #include "rgw_role.h" #include "rgw_multi.h" +#include "rgw_putobj_processor.h" #include "services/svc_tier_rados.h" #include "cls/lock/cls_lock_client.h" @@ -424,9 +425,6 @@ class RadosStore : public Store { virtual std::unique_ptr get_completions(void) override; virtual std::unique_ptr get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override; virtual std::unique_ptr get_gc_chain(rgw::sal::Object* obj) override; - virtual std::unique_ptr get_writer(Aio* aio, rgw::sal::Bucket* bucket, - RGWObjectCtx& obj_ctx, std::unique_ptr _head_obj, - const DoutPrefixProvider* dpp, optional_yield y) override; virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); } virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); } virtual int delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj) override; @@ -480,6 +478,21 @@ class RadosStore : public Store { const std::string& tenant, vector>& providers) override; virtual std::unique_ptr get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional upload_id=std::nullopt, ceph::real_time mtime=real_clock::now()) override; + virtual std::unique_ptr get_append_writer(const DoutPrefixProvider *dpp, + optional_yield y, + std::unique_ptr _head_obj, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + const std::string& unique_tag, + uint64_t position, + uint64_t *cur_accounted_size) override; + virtual std::unique_ptr 
get_atomic_writer(const DoutPrefixProvider *dpp, + optional_yield y, + std::unique_ptr _head_obj, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + uint64_t olh_epoch, + const std::string& unique_tag) override; virtual void finalize(void) override; @@ -556,6 +569,13 @@ public: uint64_t& accounted_size, bool& compressed, RGWCompressionInfo& cs_info, off_t& ofs) override; virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) override; + virtual std::unique_ptr get_writer(const DoutPrefixProvider *dpp, + optional_yield y, + std::unique_ptr _head_obj, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + uint64_t part_num, + const std::string& part_num_str) override; }; class MPRadosSerializer : public MPSerializer { @@ -632,30 +652,130 @@ protected: virtual void delete_inline(const DoutPrefixProvider *dpp, const std::string& tag) override; }; -class RadosWriter : public Writer { +class RadosAtomicWriter : public Writer { +protected: rgw::sal::RadosStore* store; - RGWSI_RADOS::Obj stripe_obj; // current stripe object - - public: - RadosWriter(Aio* aio, rgw::sal::RadosStore* _store, - rgw::sal::Bucket* bucket, - RGWObjectCtx& obj_ctx, std::unique_ptr _head_obj, - const DoutPrefixProvider* dpp, optional_yield y) - : Writer(aio, bucket, obj_ctx, std::move(_head_obj), dpp, y), store(_store) - {} + std::unique_ptr aio; + rgw::putobj::AtomicObjectProcessor processor; - ~RadosWriter(); +public: + RadosAtomicWriter(const DoutPrefixProvider *dpp, + optional_yield y, + std::unique_ptr _head_obj, + RadosStore* _store, std::unique_ptr _aio, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + uint64_t olh_epoch, + const std::string& unique_tag) : + Writer(dpp, y), + store(_store), + aio(std::move(_aio)), + processor(&*aio, store, + 
ptail_placement_rule, owner, obj_ctx, + std::move(_head_obj), olh_epoch, unique_tag, + dpp, y) + {} + ~RadosAtomicWriter() = default; + + // prepare to start processing object data + virtual int prepare(optional_yield y) override; + + // Process a bufferlist + virtual int process(bufferlist&& data, uint64_t offset) override; + + // complete the operation and make its result visible to clients + virtual int complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) override; +}; - // change the current stripe object - virtual int set_stripe_obj(const rgw_raw_obj& obj) override; +class RadosAppendWriter : public Writer { +protected: + rgw::sal::RadosStore* store; + std::unique_ptr aio; + rgw::putobj::AppendObjectProcessor processor; - // write the data at the given offset of the current stripe object - virtual int process(bufferlist&& data, uint64_t stripe_offset) override; +public: + RadosAppendWriter(const DoutPrefixProvider *dpp, + optional_yield y, + std::unique_ptr _head_obj, + RadosStore* _store, std::unique_ptr _aio, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + const std::string& unique_tag, + uint64_t position, + uint64_t *cur_accounted_size) : + Writer(dpp, y), + store(_store), + aio(std::move(_aio)), + processor(&*aio, store, + ptail_placement_rule, owner, obj_ctx, + std::move(_head_obj), unique_tag, position, + cur_accounted_size, dpp, y) + {} + ~RadosAppendWriter() = default; + + // prepare to start processing object data + virtual int prepare(optional_yield y) override; + + // Process a bufferlist + virtual int process(bufferlist&& data, uint64_t offset) override; + + // complete the operation and make its result visible to clients + virtual int complete(size_t 
accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) override; +}; - // write the data as an exclusive create and wait for it to complete - virtual int write_exclusive(const bufferlist& data) override; +class RadosMultipartWriter : public Writer { +protected: + rgw::sal::RadosStore* store; + std::unique_ptr aio; + rgw::putobj::MultipartObjectProcessor processor; - virtual int drain() override; +public: + RadosMultipartWriter(const DoutPrefixProvider *dpp, + optional_yield y, MultipartUpload* upload, + std::unique_ptr _head_obj, + RadosStore* _store, std::unique_ptr _aio, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_placement_rule *ptail_placement_rule, + uint64_t part_num, const std::string& part_num_str) : + Writer(dpp, y), + store(_store), + aio(std::move(_aio)), + processor(&*aio, store, + ptail_placement_rule, owner, obj_ctx, + std::move(_head_obj), upload->get_upload_id(), + part_num, part_num_str, dpp, y) + {} + ~RadosMultipartWriter() = default; + + // prepare to start processing object data + virtual int prepare(optional_yield y) override; + + // Process a bufferlist + virtual int process(bufferlist&& data, uint64_t offset) override; + + // complete the operation and make its result visible to clients + virtual int complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled, + optional_yield y) override; }; class RadosLuaScriptManager : public LuaScriptManager { diff --git a/src/rgw/rgw_tools.cc b/src/rgw/rgw_tools.cc index 1a3d33f7f52..be95c472e40 100644 --- a/src/rgw/rgw_tools.cc +++ 
b/src/rgw/rgw_tools.cc @@ -488,16 +488,16 @@ int RGWDataAccess::Object::put(bufferlist& data, string req_id = store->zone_unique_id(store->get_new_req_id()); - using namespace rgw::putobj; - AtomicObjectProcessor processor(&aio, store, b.get(), nullptr, - owner.get_id(), obj_ctx, std::move(obj), olh_epoch, - req_id, dpp, y); + std::unique_ptr processor; + processor = store->get_atomic_writer(dpp, y, std::move(obj), + owner.get_id(), obj_ctx, + nullptr, olh_epoch, req_id); - int ret = processor.prepare(y); + int ret = processor->prepare(y); if (ret < 0) return ret; - DataProcessor *filter = &processor; + rgw::sal::DataProcessor *filter = processor.get(); CompressorRef plugin; boost::optional compressor; @@ -570,7 +570,7 @@ int RGWDataAccess::Object::put(bufferlist& data, puser_data = &(*user_data); } - return processor.complete(obj_size, etag, + return processor->complete(obj_size, etag, &mtime, mtime, attrs, delete_at, nullptr, nullptr, diff --git a/src/rgw/rgw_tools.h b/src/rgw/rgw_tools.h index 218a8c01bd3..d0ecfe7bb3a 100644 --- a/src/rgw/rgw_tools.h +++ b/src/rgw/rgw_tools.h @@ -194,7 +194,7 @@ public: ceph::real_time mtime; string etag; - std::optional olh_epoch; + uint64_t olh_epoch{0}; ceph::real_time delete_at; std::optional user_data; diff --git a/src/test/rgw/test_rgw_compression.cc b/src/test/rgw/test_rgw_compression.cc index b1e3403281a..81b213e24b3 100644 --- a/src/test/rgw/test_rgw_compression.cc +++ b/src/test/rgw/test_rgw_compression.cc @@ -48,7 +48,7 @@ public: } }; -class ut_put_sink: public rgw::putobj::DataProcessor +class ut_put_sink: public rgw::sal::DataProcessor { bufferlist sink; public: diff --git a/src/test/rgw/test_rgw_crypto.cc b/src/test/rgw/test_rgw_crypto.cc index b0dbabfd4e2..edf24f41bb6 100644 --- a/src/test/rgw/test_rgw_crypto.cc +++ b/src/test/rgw/test_rgw_crypto.cc @@ -44,7 +44,7 @@ public: } }; -class ut_put_sink: public rgw::putobj::DataProcessor +class ut_put_sink: public rgw::sal::DataProcessor { std::stringstream sink; 
public: diff --git a/src/test/rgw/test_rgw_putobj.cc b/src/test/rgw/test_rgw_putobj.cc index 91661f45055..f26a9c2d385 100644 --- a/src/test/rgw/test_rgw_putobj.cc +++ b/src/test/rgw/test_rgw_putobj.cc @@ -32,7 +32,7 @@ inline std::ostream& operator<<(std::ostream& out, const Op& op) { return out << "{off=" << op.offset << " data='" << op.data << "'}"; } -struct MockProcessor : rgw::putobj::DataProcessor { +struct MockProcessor : rgw::sal::DataProcessor { std::vector ops; int process(bufferlist&& data, uint64_t offset) override { -- 2.39.5