From: Casey Bodley Date: Wed, 10 Oct 2018 19:11:58 +0000 (-0400) Subject: rgw: add rgw::putobj::RadosWriter adapter X-Git-Tag: 3.2-0~167^2~23 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=53a22e886298aed09fa27429dc1c19f958491d79;p=ceph-ci.git rgw: add rgw::putobj::RadosWriter adapter implements the DataProcessor interface by writing its buffers with Aio, and tracks the set of successful writes so they can be deleted on failure/cancelation Signed-off-by: Casey Bodley --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index cb6083321fb..4661bbe51ed 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -85,6 +85,7 @@ set(librgw_common_srcs rgw_otp.cc rgw_policy_s3.cc rgw_putobj.cc + rgw_putobj_processor.cc rgw_putobj_throttle.cc rgw_quota.cc rgw_rados.cc diff --git a/src/rgw/rgw_putobj_processor.cc b/src/rgw/rgw_putobj_processor.cc index 9fe46f1bdda..12fccedd5b6 100644 --- a/src/rgw/rgw_putobj_processor.cc +++ b/src/rgw/rgw_putobj_processor.cc @@ -12,8 +12,11 @@ * */ +#include "rgw_putobj_aio.h" #include "rgw_putobj_processor.h" +#define dout_subsys ceph_subsys_rgw + namespace rgw::putobj { int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset) @@ -52,4 +55,112 @@ int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset) return processor->process(std::move(data), write_offset); } + +static int process_completed(const ResultList& completed, RawObjSet *written) +{ + std::optional error; + for (auto& r : completed) { + if (r.result >= 0) { + written->insert(r.obj); + } else if (!error) { // record first error code + error = r.result; + } + } + return error.value_or(0); +} + +int RadosWriter::set_stripe_obj(rgw_raw_obj&& obj) +{ + rgw_rados_ref ref; + int r = store->get_raw_obj_ref(obj, &ref); + if (r < 0) { + return r; + } + stripe_obj = std::move(obj); + stripe_ref = std::move(ref); + return 0; +} + +int RadosWriter::process(bufferlist&& bl, uint64_t offset) +{ + bufferlist data = std::move(bl); + const uint64_t cost = data.length(); + if (cost == 0) { // no empty writes, use aio directly for creates + return 0; + } + librados::ObjectWriteOperation op; + if (offset == 0) { + op.write_full(data); + } else { + op.write(offset, data); + } + auto c = aio->submit(stripe_ref, stripe_obj, &op, cost); + return process_completed(c, &written); +} + +int RadosWriter::write_exclusive(const bufferlist& data) +{ + const uint64_t cost = data.length(); + + librados::ObjectWriteOperation op; + op.create(true); // exclusive create + op.write_full(data); + + auto c = aio->submit(stripe_ref, stripe_obj, &op, cost); + auto d = aio->drain(); + c.splice(c.end(), d); + return process_completed(c, &written); +} + +int RadosWriter::drain() +{ + return process_completed(aio->drain(), &written); +} + +RadosWriter::~RadosWriter() +{ + // wait on any outstanding aio completions + process_completed(aio->drain(), &written); + + bool need_to_remove_head = false; + std::optional raw_head; + if (!head_obj.empty()) { + raw_head.emplace(); + store->obj_to_raw(bucket_info.placement_rule, head_obj, &*raw_head); + } + + /** + * We should delete the object in the "multipart" namespace to avoid race condition. + * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart + * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects + * written by the second upload may be deleted by the first upload. + * details is describled on #11749 + * + * The above comment still stands, but instead of searching for a specific object in the multipart + * namespace, we just make sure that we remove the object that is marked as the head object after + * we remove all the other raw objects. Note that we use different call to remove the head object, + * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme. + */ + for (const auto& obj : written) { + if (raw_head && obj == *raw_head) { + ldout(store->ctx(), 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl; + need_to_remove_head = true; + continue; + } + + int r = store->delete_raw_obj(obj); + if (r < 0 && r != -ENOENT) { + ldout(store->ctx(), 5) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl; + } + } + + if (need_to_remove_head) { + ldout(store->ctx(), 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl; + int r = store->delete_obj(obj_ctx, bucket_info, head_obj, 0, 0); + if (r < 0 && r != -ENOENT) { + ldout(store->ctx(), 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl; + } + } +} + } // namespace rgw::putobj diff --git a/src/rgw/rgw_putobj_processor.h b/src/rgw/rgw_putobj_processor.h index a567de434a1..2057cc26d51 100644 --- a/src/rgw/rgw_putobj_processor.h +++ b/src/rgw/rgw_putobj_processor.h @@ -65,4 +65,43 @@ class HeadObjectProcessor : public ObjectProcessor { int process(bufferlist&& data, uint64_t logical_offset) final override; }; + +class Aio; +using RawObjSet = std::set; + +// a data sink that writes to rados objects and deletes them on cancelation +class RadosWriter : public DataProcessor { + Aio *const aio; + RGWRados *const store; + const RGWBucketInfo& bucket_info; + RGWObjectCtx& obj_ctx; + const rgw_obj& head_obj; + rgw_rados_ref stripe_ref; // current stripe ref + rgw_raw_obj stripe_obj; // current stripe object + RawObjSet written; // set of written objects for deletion + + public: + RadosWriter(Aio *aio, RGWRados *store, const RGWBucketInfo& bucket_info, + RGWObjectCtx& obj_ctx, const rgw_obj& head_obj) + : aio(aio), store(store), bucket_info(bucket_info), + obj_ctx(obj_ctx), head_obj(head_obj) + {} + ~RadosWriter(); + + // change the current stripe object + int set_stripe_obj(rgw_raw_obj&& obj); + + // write the data at the given offset of the current stripe object + int process(bufferlist&& data, uint64_t stripe_offset) override; + + // write the data as an exclusive create and wait for it to complete + int write_exclusive(const bufferlist& data); + + int drain(); + + // when the operation completes successfully, clear the set of written objects + // so they aren't deleted on destruction + void clear_written() { written.clear(); } +}; + } // namespace rgw::putobj