]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: add rgw::putobj::RadosWriter adapter
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 19:11:58 +0000 (15:11 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 16 Oct 2018 15:06:14 +0000 (11:06 -0400)
implements the DataProcessor interface by writing its buffers with Aio,
and tracks the set of successful writes so they can be deleted on
failure/cancelation

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_putobj_processor.cc
src/rgw/rgw_putobj_processor.h

index cb6083321fb0c48c5df6226b8b5ffd80798a29ea..4661bbe51ed094c58799682020cc3552d134b70e 100644 (file)
@@ -85,6 +85,7 @@ set(librgw_common_srcs
   rgw_otp.cc
   rgw_policy_s3.cc
   rgw_putobj.cc
+  rgw_putobj_processor.cc
   rgw_putobj_throttle.cc
   rgw_quota.cc
   rgw_rados.cc
index 9fe46f1bddab9e814633d70cfffc5bc0ac221f98..12fccedd5b6e485c4f8ac328cab97628b5a92d2d 100644 (file)
  *
  */
 
+#include "rgw_putobj_aio.h"
 #include "rgw_putobj_processor.h"
 
+#define dout_subsys ceph_subsys_rgw
+
 namespace rgw::putobj {
 
 int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)
@@ -52,4 +55,112 @@ int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)
   return processor->process(std::move(data), write_offset);
 }
 
+
+static int process_completed(const ResultList& completed, RawObjSet *written)
+{
+  std::optional<int> error;
+  for (auto& r : completed) {
+    if (r.result >= 0) {
+      written->insert(r.obj);
+    } else if (!error) { // record first error code
+      error = r.result;
+    }
+  }
+  return error.value_or(0);
+}
+
+int RadosWriter::set_stripe_obj(rgw_raw_obj&& obj)
+{
+  rgw_rados_ref ref;
+  int r = store->get_raw_obj_ref(obj, &ref);
+  if (r < 0) {
+    return r;
+  }
+  stripe_obj = std::move(obj);
+  stripe_ref = std::move(ref);
+  return 0;
+}
+
+int RadosWriter::process(bufferlist&& bl, uint64_t offset)
+{
+  bufferlist data = std::move(bl);
+  const uint64_t cost = data.length();
+  if (cost == 0) { // no empty writes, use aio directly for creates
+    return 0;
+  }
+  librados::ObjectWriteOperation op;
+  if (offset == 0) {
+    op.write_full(data);
+  } else {
+    op.write(offset, data);
+  }
+  auto c = aio->submit(stripe_ref, stripe_obj, &op, cost);
+  return process_completed(c, &written);
+}
+
+int RadosWriter::write_exclusive(const bufferlist& data)
+{
+  const uint64_t cost = data.length();
+
+  librados::ObjectWriteOperation op;
+  op.create(true); // exclusive create
+  op.write_full(data);
+
+  auto c = aio->submit(stripe_ref, stripe_obj, &op, cost);
+  auto d = aio->drain();
+  c.splice(c.end(), d);
+  return process_completed(c, &written);
+}
+
+int RadosWriter::drain()
+{
+  return process_completed(aio->drain(), &written);
+}
+
+RadosWriter::~RadosWriter()
+{
+  // wait on any outstanding aio completions
+  process_completed(aio->drain(), &written);
+
+  bool need_to_remove_head = false;
+  std::optional<rgw_raw_obj> raw_head;
+  if (!head_obj.empty()) {
+    raw_head.emplace();
+    store->obj_to_raw(bucket_info.placement_rule, head_obj, &*raw_head);
+  }
+
+  /**
+   * We should delete the object in the "multipart" namespace to avoid race condition.
+   * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart
+   * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects
+   * written by the second upload may be deleted by the first upload.
+   * details is describled on #11749
+   *
+   * The above comment still stands, but instead of searching for a specific object in the multipart
+   * namespace, we just make sure that we remove the object that is marked as the head object after
+   * we remove all the other raw objects. Note that we use different call to remove the head object,
+   * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
+   */
+  for (const auto& obj : written) {
+    if (raw_head && obj == *raw_head) {
+      ldout(store->ctx(), 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl;
+      need_to_remove_head = true;
+      continue;
+    }
+
+    int r = store->delete_raw_obj(obj);
+    if (r < 0 && r != -ENOENT) {
+      ldout(store->ctx(), 5) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
+    }
+  }
+
+  if (need_to_remove_head) {
+    ldout(store->ctx(), 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl;
+    int r = store->delete_obj(obj_ctx, bucket_info, head_obj, 0, 0);
+    if (r < 0 && r != -ENOENT) {
+      ldout(store->ctx(), 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl;
+    }
+  }
+}
+
 } // namespace rgw::putobj
index a567de434a1b7b80148c616ccfe128967acbc4f6..2057cc26d5155131a2cdccf8afb763c8e2d527b3 100644 (file)
@@ -65,4 +65,43 @@ class HeadObjectProcessor : public ObjectProcessor {
   int process(bufferlist&& data, uint64_t logical_offset) final override;
 };
 
+
+class Aio;
+using RawObjSet = std::set<rgw_raw_obj>;
+
+// a data sink that writes to rados objects and deletes them on cancelation
+class RadosWriter : public DataProcessor {
+  Aio *const aio;
+  RGWRados *const store;
+  const RGWBucketInfo& bucket_info;
+  RGWObjectCtx& obj_ctx;
+  const rgw_obj& head_obj;
+  rgw_rados_ref stripe_ref; // current stripe ref
+  rgw_raw_obj stripe_obj; // current stripe object
+  RawObjSet written; // set of written objects for deletion
+
+ public:
+  RadosWriter(Aio *aio, RGWRados *store, const RGWBucketInfo& bucket_info,
+              RGWObjectCtx& obj_ctx, const rgw_obj& head_obj)
+    : aio(aio), store(store), bucket_info(bucket_info),
+      obj_ctx(obj_ctx), head_obj(head_obj)
+  {}
+  ~RadosWriter();
+
+  // change the current stripe object
+  int set_stripe_obj(rgw_raw_obj&& obj);
+
+  // write the data at the given offset of the current stripe object
+  int process(bufferlist&& data, uint64_t stripe_offset) override;
+
+  // write the data as an exclusive create and wait for it to complete
+  int write_exclusive(const bufferlist& data);
+
+  int drain();
+
+  // when the operation completes successfully, clear the set of written objects
+  // so they aren't deleted on destruction
+  void clear_written() { written.clear(); }
+};
+
 } // namespace rgw::putobj