]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
RGW - Zipper - Proper Writer API
authorDaniel Gryniewicz <dang@redhat.com>
Wed, 21 Jul 2021 14:56:59 +0000 (10:56 -0400)
committerDaniel Gryniewicz <dang@redhat.com>
Fri, 30 Jul 2021 16:47:32 +0000 (12:47 -0400)
With the implementation of DBStore, it was determined that the API used
for writing in Zipper was too tied to RADOS.  Implement a clean writing
API named Writer.

Signed-off-by: Daniel Gryniewicz <dang@redhat.com>
25 files changed:
src/rgw/rgw_admin.cc
src/rgw/rgw_compression.h
src/rgw/rgw_crypt.cc
src/rgw/rgw_crypt.h
src/rgw/rgw_etag_verifier.cc
src/rgw/rgw_etag_verifier.h
src/rgw/rgw_file.cc
src/rgw/rgw_file.h
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_putobj.h
src/rgw/rgw_putobj_processor.cc
src/rgw/rgw_putobj_processor.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h
src/rgw/rgw_sal.h
src/rgw/rgw_sal_rados.cc
src/rgw/rgw_sal_rados.h
src/rgw/rgw_tools.cc
src/rgw/rgw_tools.h
src/test/rgw/test_rgw_compression.cc
src/test/rgw/test_rgw_crypto.cc
src/test/rgw/test_rgw_putobj.cc

index 3a52b17bdf0f9c42c882e358707f6c099b905694..a26f828475f2f4f11e66322e84ff7b3f32c98c94 100644 (file)
@@ -6770,7 +6770,7 @@ next:
       }
     }
     if (need_rewrite) {
-      ret = static_cast<rgw::sal::RadosStore*>(store)->getRados()->rewrite_obj(bucket->get_info(), obj.get(), dpp(), null_yield);
+      ret = static_cast<rgw::sal::RadosStore*>(store)->getRados()->rewrite_obj(obj.get(), dpp(), null_yield);
       if (ret < 0) {
         cerr << "ERROR: object rewrite returned: " << cpp_strerror(-ret) << std::endl;
         return -ret;
@@ -6899,7 +6899,7 @@ next:
           if (!need_rewrite) {
             formatter->dump_string("status", "Skipped");
           } else {
-            r = static_cast<rgw::sal::RadosStore*>(store)->getRados()->rewrite_obj(bucket->get_info(), obj.get(), dpp(), null_yield);
+            r = static_cast<rgw::sal::RadosStore*>(store)->getRados()->rewrite_obj(obj.get(), dpp(), null_yield);
             if (r == 0) {
               formatter->dump_string("status", "Success");
             } else {
index cc9386d2c05da72c86439c76331c326eafe6e573..6e5d17c2232abfaf4b95a46d6e9ef9679f781ece 100644 (file)
@@ -49,7 +49,7 @@ class RGWPutObj_Compress : public rgw::putobj::Pipe
   std::vector<compression_block> blocks;
 public:
   RGWPutObj_Compress(CephContext* cct_, CompressorRef compressor,
-                     rgw::putobj::DataProcessor *next)
+                     rgw::sal::DataProcessor *next)
     : Pipe(next), cct(cct_), compressor(compressor) {}
 
   int process(bufferlist&& data, uint64_t logical_offset) override;
index 9f7e692564218e62d524dece6f1e6521683ca8cf..319d06d0c8254e0f4dd4a321b0b8fb9ff279f942 100644 (file)
@@ -791,7 +791,7 @@ int RGWGetObj_BlockDecrypt::flush() {
 }
 
 RGWPutObj_BlockEncrypt::RGWPutObj_BlockEncrypt(CephContext* cct,
-                                               rgw::putobj::DataProcessor *next,
+                                               rgw::sal::DataProcessor *next,
                                                std::unique_ptr<BlockCrypt> crypt)
   : Pipe(next),
     cct(cct),
index ff221549d6fde83a0f05aca08d4ffcf53020d29c..96e52afaa54b14051c6c02efe08984aaebfb44d3 100644 (file)
@@ -128,7 +128,7 @@ class RGWPutObj_BlockEncrypt : public rgw::putobj::Pipe
   const size_t block_size; /**< snapshot of \ref BlockCrypt.get_block_size() */
 public:
   RGWPutObj_BlockEncrypt(CephContext* cct,
-                         rgw::putobj::DataProcessor *next,
+                         rgw::sal::DataProcessor *next,
                          std::unique_ptr<BlockCrypt> crypt);
 
   int process(bufferlist&& data, uint64_t logical_offset) override;
index 6a455e18b23112eccc2bb4b2cddb6b7e74afd594..3a22e7e6b16bc59ca0d57e08d118d4628089eada 100644 (file)
@@ -8,7 +8,7 @@
 namespace rgw::putobj {
 
 int create_etag_verifier(const DoutPrefixProvider *dpp, 
-                         CephContext* cct, DataProcessor* filter,
+                         CephContext* cct, rgw::sal::DataProcessor* filter,
                          const bufferlist& manifest_bl,
                          const std::optional<RGWCompressionInfo>& compression,
                          etag_verifier_ptr& verifier)
index 48007cf169984d614bc16c0befc8317b25281c38..a94c6065feb8bee0888b10de51fb11068eeb90e9 100644 (file)
@@ -29,7 +29,7 @@ protected:
   string calculated_etag;
 
 public:
-  ETagVerifier(CephContext* cct_, rgw::putobj::DataProcessor *next)
+  ETagVerifier(CephContext* cct_, rgw::sal::DataProcessor *next)
     : Pipe(next), cct(cct_) {}
 
   virtual void calculate_etag() = 0;
@@ -40,7 +40,7 @@ public:
 class ETagVerifier_Atomic : public ETagVerifier
 {
 public:
-  ETagVerifier_Atomic(CephContext* cct_, rgw::putobj::DataProcessor *next)
+  ETagVerifier_Atomic(CephContext* cct_, rgw::sal::DataProcessor *next)
     : ETagVerifier(cct_, next) {}
 
   int process(bufferlist&& data, uint64_t logical_offset) override;
@@ -59,7 +59,7 @@ class ETagVerifier_MPU : public ETagVerifier
 public:
   ETagVerifier_MPU(CephContext* cct,
                              std::vector<uint64_t> part_ofs,
-                             rgw::putobj::DataProcessor *next)
+                             rgw::sal::DataProcessor *next)
     : ETagVerifier(cct, next),
       part_ofs(std::move(part_ofs))
   {}
@@ -76,7 +76,7 @@ constexpr auto max_etag_verifier_size = std::max(
 using etag_verifier_ptr = ceph::static_ptr<ETagVerifier, max_etag_verifier_size>;
 
 int create_etag_verifier(const DoutPrefixProvider *dpp, 
-                         CephContext* cct, DataProcessor* next,
+                         CephContext* cct, rgw::sal::DataProcessor* next,
                          const bufferlist& manifest_bl,
                          const std::optional<RGWCompressionInfo>& compression,
                          etag_verifier_ptr& verifier);
index 70cb773883057d79a02cbd0fce78dc6206d02d1b..d56c6e599fe74c7b8828fdbfca23dfcf9d7e333f 100644 (file)
@@ -1825,12 +1825,9 @@ namespace rgw {
         version_id = state->object->get_instance();
       }
     }
-    processor.emplace(&*aio, get_store(), state->bucket.get(),
-                      &state->dest_placement,
-                      state->bucket_owner.get_id(),
-                      *static_cast<RGWObjectCtx *>(state->obj_ctx),
-                      state->object->clone(), olh_epoch, state->req_id,
-                     this, state->yield);
+    processor = get_store()->get_atomic_writer(this, state->yield, state->object->clone(),
+                                        state->bucket_owner.get_id(), *state->obj_ctx,
+                                        &state->dest_placement, 0, state->req_id);
 
     op_ret = processor->prepare(state->yield);
     if (op_ret < 0) {
index ed9243b3dd618ce3d30c99843e5f04842709953f..fc42887fe24d5c99f0cf0ad2b1113c017a1f08c6 100644 (file)
@@ -2489,8 +2489,8 @@ public:
   const std::string& obj_name;
   RGWFileHandle* rgw_fh;
   std::optional<rgw::BlockingAioThrottle> aio;
-  std::optional<rgw::putobj::AtomicObjectProcessor> processor;
-  rgw::putobj::DataProcessor* filter;
+  std::unique_ptr<rgw::sal::Writer> processor;
+  rgw::sal::DataProcessor* filter;
   boost::optional<RGWPutObj_Compress> compressor;
   CompressorRef plugin;
   buffer::list data;
index 0b578d1bab93ea8a4ee1b4bb79343c59df612eb5..02e5ddeaef60f8903aaa52c74ea69b9e932ef1bc 100644 (file)
@@ -3733,13 +3733,9 @@ void RGWPutObj::execute(optional_yield y)
   // create the object processor
   auto aio = rgw::make_throttle(s->cct->_conf->rgw_put_obj_min_window_size,
                                 s->yield);
-  using namespace rgw::putobj;
-  constexpr auto max_processor_size = std::max({sizeof(MultipartObjectProcessor),
-                                               sizeof(AtomicObjectProcessor),
-                                               sizeof(AppendObjectProcessor)});
-  ceph::static_ptr<ObjectProcessor, max_processor_size> processor;
+  std::unique_ptr<rgw::sal::Writer> processor;
 
-  rgw_placement_rule *pdest_placement;
+  rgw_placement_rule *pdest_placement = &s->dest_placement;
 
   if (multipart) {
     std::unique_ptr<rgw::sal::MultipartUpload> upload;
@@ -3758,21 +3754,18 @@ void RGWPutObj::execute(optional_yield y)
     s->dest_placement = *pdest_placement;
     pdest_placement = &s->dest_placement;
     ldpp_dout(this, 20) << "dest_placement for part=" << *pdest_placement << dendl;
-    processor.emplace<MultipartObjectProcessor>(
-        &*aio, store, s->bucket.get(), pdest_placement,
-        s->owner.get_id(), obj_ctx, s->object->clone(),
-        multipart_upload_id, multipart_part_num, multipart_part_str,
-        this, s->yield);
+    processor = upload->get_writer(this, s->yield, s->object->clone(),
+                                  s->user->get_id(), obj_ctx, pdest_placement,
+                                  multipart_part_num, multipart_part_str);
   } else if(append) {
     if (s->bucket->versioned()) {
       op_ret = -ERR_INVALID_BUCKET_STATE;
       return;
     }
-    pdest_placement = &s->dest_placement;
-    processor.emplace<AppendObjectProcessor>(
-            &*aio, store, s->bucket.get(), pdest_placement, s->bucket_owner.get_id(),
-           obj_ctx, s->object->clone(),
-            s->req_id, position, &cur_accounted_size, this, s->yield);
+    processor = store->get_append_writer(this, s->yield, s->object->clone(),
+                                        s->bucket_owner.get_id(), obj_ctx,
+                                        pdest_placement, s->req_id, position,
+                                        &cur_accounted_size);
   } else {
     if (s->bucket->versioning_enabled()) {
       if (!version_id.empty()) {
@@ -3782,11 +3775,9 @@ void RGWPutObj::execute(optional_yield y)
         version_id = s->object->get_instance();
       }
     }
-    pdest_placement = &s->dest_placement;
-    processor.emplace<AtomicObjectProcessor>(
-        &*aio, store, s->bucket.get(), pdest_placement,
-        s->bucket_owner.get_id(), obj_ctx, s->object->clone(),
-       olh_epoch, s->req_id, this, s->yield);
+    processor = store->get_atomic_writer(this, s->yield, s->object->clone(),
+                                        s->bucket_owner.get_id(), obj_ctx,
+                                        pdest_placement, olh_epoch, s->req_id);
   }
 
   op_ret = processor->prepare(s->yield);
@@ -3824,13 +3815,13 @@ void RGWPutObj::execute(optional_yield y)
   fst = copy_source_range_fst;
 
   // no filters by default
-  DataProcessor *filter = processor.get();
+  rgw::sal::DataProcessor *filter = processor.get();
 
   const auto& compression_type = store->get_zone()->get_params().get_compression_type(*pdest_placement);
   CompressorRef plugin;
   boost::optional<RGWPutObj_Compress> compressor;
 
-  std::unique_ptr<DataProcessor> encrypt;
+  std::unique_ptr<rgw::sal::DataProcessor> encrypt;
 
   if (!append) { // compression and encryption only apply to full object uploads
     op_ret = get_encrypt_filter(&encrypt, filter);
@@ -4159,21 +4150,19 @@ void RGWPostObj::execute(optional_yield y)
     auto aio = rgw::make_throttle(s->cct->_conf->rgw_put_obj_min_window_size,
                                   s->yield);
 
-    using namespace rgw::putobj;
-    AtomicObjectProcessor processor(&*aio, store, s->bucket.get(),
-                                    &s->dest_placement,
-                                    s->bucket_owner.get_id(),
-                                    *static_cast<RGWObjectCtx*>(s->obj_ctx),
-                                    std::move(obj), 0, s->req_id, this, s->yield);
-    op_ret = processor.prepare(s->yield);
+    std::unique_ptr<rgw::sal::Writer> processor;
+    processor = store->get_atomic_writer(this, s->yield, std::move(obj),
+                                        s->bucket_owner.get_id(), *s->obj_ctx,
+                                        &s->dest_placement, 0, s->req_id);
+    op_ret = processor->prepare(s->yield);
     if (op_ret < 0) {
       return;
     }
 
     /* No filters by default. */
-    DataProcessor *filter = &processor;
+    rgw::sal::DataProcessor *filter = processor.get();
 
-    std::unique_ptr<DataProcessor> encrypt;
+    std::unique_ptr<rgw::sal::DataProcessor> encrypt;
     op_ret = get_encrypt_filter(&encrypt, filter);
     if (op_ret < 0) {
       return;
@@ -4274,7 +4263,7 @@ void RGWPostObj::execute(optional_yield y)
       emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp));
     }
 
-    op_ret = processor.complete(s->obj_size, etag, nullptr, real_time(), attrs,
+    op_ret = processor->complete(s->obj_size, etag, nullptr, real_time(), attrs,
                                 (delete_at ? *delete_at : real_time()),
                                 nullptr, nullptr, nullptr, nullptr, nullptr,
                                 s->yield);
@@ -7144,21 +7133,18 @@ int RGWBulkUploadOp::handle_file(const std::string_view path,
   rgw_placement_rule dest_placement = s->dest_placement;
   dest_placement.inherit_from(bucket->get_placement_rule());
 
-  auto aio = rgw::make_throttle(s->cct->_conf->rgw_put_obj_min_window_size,
-                                s->yield);
-
-  using namespace rgw::putobj;
-  AtomicObjectProcessor processor(&*aio, store, bucket.get(), &s->dest_placement, bowner.get_id(),
-                                  obj_ctx, std::move(obj), 0, s->req_id, this, s->yield);
-
-  op_ret = processor.prepare(s->yield);
+  std::unique_ptr<rgw::sal::Writer> processor;
+  processor = store->get_atomic_writer(this, s->yield, std::move(obj),
+                                      bowner.get_id(), obj_ctx,
+                                      &s->dest_placement, 0, s->req_id);
+  op_ret = processor->prepare(s->yield);
   if (op_ret < 0) {
     ldpp_dout(this, 20) << "cannot prepare processor due to ret=" << op_ret << dendl;
     return op_ret;
   }
 
   /* No filters by default. */
-  DataProcessor *filter = &processor;
+  rgw::sal::DataProcessor *filter = processor.get();
 
   const auto& compression_type = store->get_zone()->get_params().get_compression_type(
       dest_placement);
@@ -7250,7 +7236,7 @@ int RGWBulkUploadOp::handle_file(const std::string_view path,
   }
 
   /* Complete the transaction. */
-  op_ret = processor.complete(size, etag, nullptr, ceph::real_time(),
+  op_ret = processor->complete(size, etag, nullptr, ceph::real_time(),
                               attrs, ceph::real_time() /* delete_at */,
                               nullptr, nullptr, nullptr, nullptr, nullptr,
                               s->yield);
index 0c8e8ad40b52af2d817d85bdb419ddeefe8ba1b2..5f2f2f88fed3d6d94e445b5b75dcc5e19b18e56c 100644 (file)
@@ -1269,8 +1269,8 @@ public:
     *filter = nullptr;
     return 0;
   }
-  virtual int get_encrypt_filter(std::unique_ptr<rgw::putobj::DataProcessor> *filter,
-                                 rgw::putobj::DataProcessor *cb) {
+  virtual int get_encrypt_filter(std::unique_ptr<rgw::sal::DataProcessor> *filter,
+                                 rgw::sal::DataProcessor *cb) {
     return 0;
   }
 
@@ -1327,8 +1327,8 @@ public:
   void pre_exec() override;
   void execute(optional_yield y) override;
 
-  virtual int get_encrypt_filter(std::unique_ptr<rgw::putobj::DataProcessor> *filter,
-                                 rgw::putobj::DataProcessor *cb) {
+  virtual int get_encrypt_filter(std::unique_ptr<rgw::sal::DataProcessor> *filter,
+                                 rgw::sal::DataProcessor *cb) {
     return 0;
   }
   virtual int get_params(optional_yield y) = 0;
index 4151294d1cb200516224fafb10edf5db6b1d6a25..1c16b22540a36c57c8e0c47116bea866f4873827 100644 (file)
 #pragma once
 
 #include "include/buffer.h"
+#include "rgw_sal.h"
 
 namespace rgw::putobj {
 
-// a simple streaming data processing abstraction
-class DataProcessor {
- public:
-  virtual ~DataProcessor() {}
-
-  // consume a bufferlist in its entirety at the given object offset. an
-  // empty bufferlist is given to request that any buffered data be flushed,
-  // though this doesn't wait for completions
-  virtual int process(bufferlist&& data, uint64_t offset) = 0;
-};
-
 // for composing data processors into a pipeline
-class Pipe : public DataProcessor {
-  DataProcessor *next;
+class Pipe : public rgw::sal::DataProcessor {
+  rgw::sal::DataProcessor *next;
  public:
-  explicit Pipe() : next(nullptr) {}
-  explicit Pipe(DataProcessor *next) : next(next) {}
+  explicit Pipe(rgw::sal::DataProcessor *next) : next(next) {}
 
   // passes the data on to the next processor
   int process(bufferlist&& data, uint64_t offset) override {
@@ -48,8 +37,7 @@ class ChunkProcessor : public Pipe {
   uint64_t chunk_size;
   bufferlist chunk; // leftover bytes from the last call to process()
  public:
-  ChunkProcessor() {}
-  ChunkProcessor(DataProcessor *next, uint64_t chunk_size)
+  ChunkProcessor(rgw::sal::DataProcessor *next, uint64_t chunk_size)
     : Pipe(next), chunk_size(chunk_size)
   {}
 
@@ -70,8 +58,7 @@ class StripeProcessor : public Pipe {
   StripeGenerator *gen;
   std::pair<uint64_t, uint64_t> bounds; // bounds of current stripe
  public:
-  StripeProcessor() {}
-  StripeProcessor(DataProcessor *next, StripeGenerator *gen,
+  StripeProcessor(rgw::sal::DataProcessor *next, StripeGenerator *gen,
                   uint64_t first_stripe_size)
     : Pipe(next), gen(gen), bounds(0, first_stripe_size)
   {}
index 270ccf0960c3a3fb26f605c50838088036ba6e39..d6bb52b71285756d1b8df2e32f815be6d7d0693b 100644 (file)
@@ -60,6 +60,113 @@ int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)
   return processor->process(std::move(data), write_offset);
 }
 
+
+static int process_completed(const AioResultList& completed, RawObjSet *written)
+{
+  std::optional<int> error;
+  for (auto& r : completed) {
+    if (r.result >= 0) {
+      written->insert(r.obj.get_ref().obj);
+    } else if (!error) { // record first error code
+      error = r.result;
+    }
+  }
+  return error.value_or(0);
+}
+
+int RadosWriter::set_stripe_obj(const rgw_raw_obj& raw_obj)
+{
+  stripe_obj = store->svc()->rados->obj(raw_obj);
+  return stripe_obj.open(dpp);
+}
+
+int RadosWriter::process(bufferlist&& bl, uint64_t offset)
+{
+  bufferlist data = std::move(bl);
+  const uint64_t cost = data.length();
+  if (cost == 0) { // no empty writes, use aio directly for creates
+    return 0;
+  }
+  librados::ObjectWriteOperation op;
+  if (offset == 0) {
+    op.write_full(data);
+  } else {
+    op.write(offset, data);
+  }
+  constexpr uint64_t id = 0; // unused
+  auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
+  return process_completed(c, &written);
+}
+
+int RadosWriter::write_exclusive(const bufferlist& data)
+{
+  const uint64_t cost = data.length();
+
+  librados::ObjectWriteOperation op;
+  op.create(true); // exclusive create
+  op.write_full(data);
+
+  constexpr uint64_t id = 0; // unused
+  auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
+  auto d = aio->drain();
+  c.splice(c.end(), d);
+  return process_completed(c, &written);
+}
+
+int RadosWriter::drain()
+{
+  return process_completed(aio->drain(), &written);
+}
+
+RadosWriter::~RadosWriter()
+{
+  // wait on any outstanding aio completions
+  process_completed(aio->drain(), &written);
+
+  bool need_to_remove_head = false;
+  std::optional<rgw_raw_obj> raw_head;
+  if (!rgw::sal::Object::empty(head_obj.get())) {
+    raw_head.emplace();
+    rgw::sal::RadosObject* obj = dynamic_cast<rgw::sal::RadosObject*>(head_obj.get());
+    obj->get_raw_obj(&*raw_head);
+  }
+
+  /**
+   * We should delete the object in the "multipart" namespace to avoid race condition.
+   * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart
+   * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects
+   * written by the second upload may be deleted by the first upload.
+   * details is describled on #11749
+   *
+   * The above comment still stands, but instead of searching for a specific object in the multipart
+   * namespace, we just make sure that we remove the object that is marked as the head object after
+   * we remove all the other raw objects. Note that we use different call to remove the head object,
+   * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
+   */
+  for (const auto& obj : written) {
+    if (raw_head && obj == *raw_head) {
+      ldpp_dout(dpp, 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl;
+      need_to_remove_head = true;
+      continue;
+    }
+
+    int r = store->delete_raw_obj(dpp, obj);
+    if (r < 0 && r != -ENOENT) {
+      ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
+    }
+  }
+
+  if (need_to_remove_head) {
+    std::string version_id;
+    ldpp_dout(dpp, 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl;
+    int r = head_obj->delete_object(dpp, &obj_ctx, null_yield);
+    if (r < 0 && r != -ENOENT) {
+      ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl;
+    }
+  }
+}
+
+
 // advance to the next stripe
 int ManifestObjectProcessor::next(uint64_t offset, uint64_t *pstripe_size)
 {
@@ -76,12 +183,12 @@ int ManifestObjectProcessor::next(uint64_t offset, uint64_t *pstripe_size)
   if (r < 0) {
     return r;
   }
-  r = writer->set_stripe_obj(stripe_obj);
+  r = writer.set_stripe_obj(stripe_obj);
   if (r < 0) {
     return r;
   }
 
-  chunk = ChunkProcessor(writer.get(), chunk_size);
+  chunk = ChunkProcessor(&writer, chunk_size);
   *pstripe_size = manifest_gen.cur_stripe_max_size();
   return 0;
 }
@@ -103,15 +210,15 @@ int AtomicObjectProcessor::prepare(optional_yield y)
   uint64_t chunk_size = 0;
   uint64_t alignment;
 
-  int r = head_obj->get_max_chunk_size(dpp, bucket->get_placement_rule(),
+  int r = head_obj->get_max_chunk_size(dpp, head_obj->get_bucket()->get_placement_rule(),
                                       &max_head_chunk_size, &alignment);
   if (r < 0) {
     return r;
   }
 
   bool same_pool = true;
-  if (bucket->get_placement_rule() != tail_placement_rule) {
-    if (!head_obj->placement_rules_match(bucket->get_placement_rule(), tail_placement_rule)) {
+  if (head_obj->get_bucket()->get_placement_rule() != tail_placement_rule) {
+    if (!head_obj->placement_rules_match(head_obj->get_bucket()->get_placement_rule(), tail_placement_rule)) {
       same_pool = false;
       r = head_obj->get_max_chunk_size(dpp, tail_placement_rule, &chunk_size);
       if (r < 0) {
@@ -136,7 +243,7 @@ int AtomicObjectProcessor::prepare(optional_yield y)
   rgw_obj obj = head_obj->get_obj();
 
   r = manifest_gen.create_begin(store->ctx(), &manifest,
-                                bucket->get_placement_rule(),
+                                head_obj->get_bucket()->get_placement_rule(),
                                 &tail_placement_rule,
                                 obj.bucket, obj);
   if (r < 0) {
@@ -145,14 +252,14 @@ int AtomicObjectProcessor::prepare(optional_yield y)
 
   rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
 
-  r = writer->set_stripe_obj(stripe_obj);
+  r = writer.set_stripe_obj(stripe_obj);
   if (r < 0) {
     return r;
   }
 
   set_head_chunk_size(head_max_size);
   // initialize the processors
-  chunk = ChunkProcessor(writer.get(), chunk_size);
+  chunk = ChunkProcessor(&writer, chunk_size);
   stripe = StripeProcessor(&chunk, this, head_max_size);
   return 0;
 }
@@ -169,7 +276,7 @@ int AtomicObjectProcessor::complete(size_t accounted_size,
                                     rgw_zone_set *zones_trace,
                                     bool *pcanceled, optional_yield y)
 {
-  int r = writer->drain();
+  int r = writer.drain();
   if (r < 0) {
     return r;
   }
@@ -184,7 +291,7 @@ int AtomicObjectProcessor::complete(size_t accounted_size,
   std::unique_ptr<rgw::sal::Object::WriteOp> obj_op = head_obj->get_write_op(&obj_ctx);
 
   /* some object types shouldn't be versioned, e.g., multipart parts */
-  obj_op->params.versioning_disabled = !bucket->versioning_enabled();
+  obj_op->params.versioning_disabled = !head_obj->get_bucket()->versioning_enabled();
   obj_op->params.data = &first_chunk;
   obj_op->params.manifest = &manifest;
   obj_op->params.ptag = &unique_tag; /* use req_id as operation tag */
@@ -212,7 +319,7 @@ int AtomicObjectProcessor::complete(size_t accounted_size,
   }
   if (!obj_op->params.canceled) {
     // on success, clear the set of objects for deletion
-    writer->clear_written();
+    writer.clear_written();
   }
   if (pcanceled) {
     *pcanceled = obj_op->params.canceled;
@@ -226,7 +333,7 @@ int MultipartObjectProcessor::process_first_chunk(bufferlist&& data,
 {
   // write the first chunk of the head object as part of an exclusive create,
   // then drain to wait for the result in case of EEXIST
-  int r = writer->write_exclusive(data);
+  int r = writer.write_exclusive(data);
   if (r == -EEXIST) {
     // randomize the oid prefix and reprepare the head/manifest
     std::string oid_rand = gen_rand_alphanumeric(store->ctx(), 32);
@@ -239,7 +346,7 @@ int MultipartObjectProcessor::process_first_chunk(bufferlist&& data,
       return r;
     }
     // resubmit the write op on the new head object
-    r = writer->write_exclusive(data);
+    r = writer.write_exclusive(data);
   }
   if (r < 0) {
     return r;
@@ -265,7 +372,7 @@ int MultipartObjectProcessor::prepare_head()
   manifest.set_multipart_part_rule(stripe_size, part_num);
 
   r = manifest_gen.create_begin(store->ctx(), &manifest,
-                               bucket->get_placement_rule(),
+                               head_obj->get_bucket()->get_placement_rule(),
                                &tail_placement_rule,
                                target_obj->get_bucket()->get_key(),
                                target_obj->get_obj());
@@ -277,14 +384,14 @@ int MultipartObjectProcessor::prepare_head()
   head_obj->raw_obj_to_obj(stripe_obj);
   head_obj->set_hash_source(target_obj->get_name());
 
-  r = writer->set_stripe_obj(stripe_obj);
+  r = writer.set_stripe_obj(stripe_obj);
   if (r < 0) {
     return r;
   }
   stripe_size = manifest_gen.cur_stripe_max_size();
   set_head_chunk_size(stripe_size);
 
-  chunk = ChunkProcessor(writer.get(), chunk_size);
+  chunk = ChunkProcessor(&writer, chunk_size);
   stripe = StripeProcessor(&chunk, this, stripe_size);
   return 0;
 }
@@ -308,7 +415,7 @@ int MultipartObjectProcessor::complete(size_t accounted_size,
                                        rgw_zone_set *zones_trace,
                                        bool *pcanceled, optional_yield y)
 {
-  int r = writer->drain();
+  int r = writer.drain();
   if (r < 0) {
     return r;
   }
@@ -367,7 +474,7 @@ int MultipartObjectProcessor::complete(size_t accounted_size,
   encode(info, bl);
 
   std::unique_ptr<rgw::sal::Object> meta_obj =
-    bucket->get_object(rgw_obj_key(mp.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART));
+    head_obj->get_bucket()->get_object(rgw_obj_key(mp.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART));
   meta_obj->set_in_extra_data(true);
 
   r = meta_obj->omap_set_val_by_key(dpp, p, bl, true, null_yield);
@@ -377,7 +484,7 @@ int MultipartObjectProcessor::complete(size_t accounted_size,
 
   if (!obj_op->params.canceled) {
     // on success, clear the set of objects for deletion
-    writer->clear_written();
+    writer.clear_written();
   }
   if (pcanceled) {
     *pcanceled = obj_op->params.canceled;
@@ -385,9 +492,9 @@ int MultipartObjectProcessor::complete(size_t accounted_size,
   return 0;
 }
 
-int AppendObjectProcessor::process_first_chunk(bufferlist &&data, rgw::putobj::DataProcessor **processor)
+int AppendObjectProcessor::process_first_chunk(bufferlist &&data, rgw::sal::DataProcessor **processor)
 {
-  int r = writer->write_exclusive(data);
+  int r = writer.write_exclusive(data);
   if (r < 0) {
     return r;
   }
@@ -458,7 +565,7 @@ int AppendObjectProcessor::prepare(optional_yield y)
 
   rgw_obj obj = head_obj->get_obj();
 
-  r = manifest_gen.create_begin(store->ctx(), &manifest, bucket->get_placement_rule(), &tail_placement_rule, obj.bucket, obj);
+  r = manifest_gen.create_begin(store->ctx(), &manifest, head_obj->get_bucket()->get_placement_rule(), &tail_placement_rule, obj.bucket, obj);
   if (r < 0) {
     return r;
   }
@@ -469,7 +576,7 @@ int AppendObjectProcessor::prepare(optional_yield y)
   if (r < 0) {
     return r;
   }
-  r = writer->set_stripe_obj(std::move(stripe_obj));
+  r = writer.set_stripe_obj(std::move(stripe_obj));
   if (r < 0) {
     return r;
   }
@@ -480,7 +587,7 @@ int AppendObjectProcessor::prepare(optional_yield y)
   set_head_chunk_size(max_head_size);
 
   // initialize the processors
-  chunk = ChunkProcessor(writer.get(), chunk_size);
+  chunk = ChunkProcessor(&writer, chunk_size);
   stripe = StripeProcessor(&chunk, this, stripe_size);
 
   return 0;
@@ -492,7 +599,7 @@ int AppendObjectProcessor::complete(size_t accounted_size, const string &etag, c
                                     const string *user_data, rgw_zone_set *zones_trace, bool *pcanceled,
                                     optional_yield y)
 {
-  int r = writer->drain();
+  int r = writer.drain();
   if (r < 0)
     return r;
   const uint64_t actual_size = get_actual_size();
@@ -554,7 +661,7 @@ int AppendObjectProcessor::complete(size_t accounted_size, const string &etag, c
   }
   if (!obj_op->params.canceled) {
     // on success, clear the set of objects for deletion
-    writer->clear_written();
+    writer.clear_written();
   }
   if (pcanceled) {
     *pcanceled = obj_op->params.canceled;
index 5d450b098de728b31dccafa38c6435c32b22f552..77cb4f7c2f88a4fee77b7ee096d2aa1288dcd75f 100644 (file)
 
 namespace rgw {
 
+namespace sal {
+  class RadosStore;
+}
+
 class Aio;
 
 namespace putobj {
 
-// a data consumer that writes an object in a bucket
-class ObjectProcessor : public DataProcessor {
- public:
-  // prepare to start processing object data
-  virtual int prepare(optional_yield y) = 0;
-
-  // complete the operation and make its result visible to clients
-  virtual int complete(size_t accounted_size, const std::string& etag,
-                       ceph::real_time *mtime, ceph::real_time set_mtime,
-                       std::map<std::string, bufferlist>& attrs,
-                       ceph::real_time delete_at,
-                       const char *if_match, const char *if_nomatch,
-                       const std::string *user_data,
-                       rgw_zone_set *zones_trace, bool *canceled,
-                       optional_yield y) = 0;
-};
-
 // an object processor with special handling for the first chunk of the head.
 // the virtual process_first_chunk() function returns a processor to handle the
 // rest of the object
-class HeadObjectProcessor : public ObjectProcessor {
+class HeadObjectProcessor : public rgw::sal::ObjectProcessor {
   uint64_t head_chunk_size;
   // buffer to capture the first chunk of the head object
   bufferlist head_data;
   // initialized after process_first_chunk() to process everything else
-  DataProcessor *processor = nullptr;
+  rgw::sal::DataProcessor *processor = nullptr;
   uint64_t data_offset = 0; // maximum offset of data written (ie compressed)
  protected:
   uint64_t get_actual_size() const { return data_offset; }
 
   // process the first chunk of data and return a processor for the rest
   virtual int process_first_chunk(bufferlist&& data,
-                                  DataProcessor **processor) = 0;
+                                  rgw::sal::DataProcessor **processor) = 0;
  public:
   HeadObjectProcessor(uint64_t head_chunk_size)
     : head_chunk_size(head_chunk_size)
@@ -74,19 +61,62 @@ class HeadObjectProcessor : public ObjectProcessor {
   int process(bufferlist&& data, uint64_t logical_offset) final override;
 };
 
+using RawObjSet = std::set<rgw_raw_obj>;
+
+// a data sink that writes to rados objects and deletes them on cancelation
+class RadosWriter : public rgw::sal::DataProcessor {
+  Aio *const aio;
+  rgw::sal::RadosStore *const store;
+  RGWObjectCtx& obj_ctx;
+  std::unique_ptr<rgw::sal::Object> head_obj;
+  RGWSI_RADOS::Obj stripe_obj; // current stripe object
+  RawObjSet written; // set of written objects for deletion
+  const DoutPrefixProvider *dpp;
+  optional_yield y;
+
+ public:
+  RadosWriter(Aio *aio, rgw::sal::RadosStore *store,
+              RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
+              const DoutPrefixProvider *dpp, optional_yield y)
+    : aio(aio), store(store),
+      obj_ctx(obj_ctx), head_obj(std::move(_head_obj)), dpp(dpp), y(y)
+  {}
+  RadosWriter(RadosWriter&& r)
+    : aio(r.aio), store(r.store),
+      obj_ctx(r.obj_ctx), head_obj(std::move(r.head_obj)), dpp(r.dpp), y(r.y)
+  {}
+
+  ~RadosWriter();
+
+  // change the current stripe object
+  int set_stripe_obj(const rgw_raw_obj& obj);
+
+  // write the data at the given offset of the current stripe object
+  int process(bufferlist&& data, uint64_t stripe_offset) override;
+
+  // write the data as an exclusive create and wait for it to complete
+  int write_exclusive(const bufferlist& data);
+
+  int drain();
+
+  // when the operation completes successfully, clear the set of written objects
+  // so they aren't deleted on destruction
+  void clear_written() { written.clear(); }
+
+};
+
 
 // a rados object processor that stripes according to RGWObjManifest
 class ManifestObjectProcessor : public HeadObjectProcessor,
                                 public StripeGenerator {
  protected:
-  rgw::sal::Store* const store;
-  rgw::sal::Bucket* bucket;
+  rgw::sal::RadosStore* const store;
   rgw_placement_rule tail_placement_rule;
   rgw_user owner;
   RGWObjectCtx& obj_ctx;
   std::unique_ptr<rgw::sal::Object> head_obj;
 
-  std::unique_ptr<rgw::sal::Writer> writer;
+  RadosWriter writer;
   RGWObjManifest manifest;
   RGWObjManifest::generator manifest_gen;
   ChunkProcessor chunk;
@@ -97,20 +127,17 @@ class ManifestObjectProcessor : public HeadObjectProcessor,
   int next(uint64_t offset, uint64_t *stripe_size) override;
 
  public:
-  ManifestObjectProcessor(Aio *aio, rgw::sal::Store* store,
-                         rgw::sal::Bucket* bucket,
+  ManifestObjectProcessor(Aio *aio, rgw::sal::RadosStore* store,
                           const rgw_placement_rule *ptail_placement_rule,
                           const rgw_user& owner, RGWObjectCtx& obj_ctx,
                           std::unique_ptr<rgw::sal::Object> _head_obj,
                           const DoutPrefixProvider* dpp, optional_yield y)
     : HeadObjectProcessor(0),
-      store(store), bucket(bucket),
+      store(store),
       owner(owner),
       obj_ctx(obj_ctx), head_obj(std::move(_head_obj)),
-      dpp(dpp) {
-       writer = store->get_writer(aio, bucket, obj_ctx, head_obj->clone(), dpp, y);
-       chunk = ChunkProcessor(writer.get(), 0);
-       stripe = StripeProcessor(&chunk, this, 0);
+      writer(aio, store, obj_ctx, head_obj->clone(), dpp, y),
+      chunk(&writer, 0), stripe(&chunk, this, 0), dpp(dpp) {
         if (ptail_placement_rule) {
           tail_placement_rule = *ptail_placement_rule;
         }
@@ -137,10 +164,9 @@ class AtomicObjectProcessor : public ManifestObjectProcessor {
   const std::string unique_tag;
   bufferlist first_chunk; // written with the head in complete()
 
-  int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
+  int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override;
  public:
-  AtomicObjectProcessor(Aio *aio, rgw::sal::Store* store,
-                       rgw::sal::Bucket* bucket,
+  AtomicObjectProcessor(Aio *aio, rgw::sal::RadosStore* store,
                         const rgw_placement_rule *ptail_placement_rule,
                         const rgw_user& owner,
                         RGWObjectCtx& obj_ctx,
@@ -148,7 +174,7 @@ class AtomicObjectProcessor : public ManifestObjectProcessor {
                         std::optional<uint64_t> olh_epoch,
                         const std::string& unique_tag,
                         const DoutPrefixProvider *dpp, optional_yield y)
-    : ManifestObjectProcessor(aio, store, bucket, ptail_placement_rule,
+    : ManifestObjectProcessor(aio, store, ptail_placement_rule,
                               owner, obj_ctx, std::move(_head_obj), dpp, y),
       olh_epoch(olh_epoch), unique_tag(unique_tag)
   {}
@@ -180,19 +206,18 @@ class MultipartObjectProcessor : public ManifestObjectProcessor {
 
   // write the first chunk and wait on aio->drain() for its completion.
   // on EEXIST, retry with random prefix
-  int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
+  int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override;
   // prepare the head stripe and manifest
   int prepare_head();
  public:
-  MultipartObjectProcessor(Aio *aio, rgw::sal::Store* store,
-                          rgw::sal::Bucket* bucket,
+  MultipartObjectProcessor(Aio *aio, rgw::sal::RadosStore* store,
                            const rgw_placement_rule *ptail_placement_rule,
                            const rgw_user& owner, RGWObjectCtx& obj_ctx,
                            std::unique_ptr<rgw::sal::Object> _head_obj,
                            const std::string& upload_id, uint64_t part_num,
                            const std::string& part_num_str,
                            const DoutPrefixProvider *dpp, optional_yield y)
-    : ManifestObjectProcessor(aio, store, bucket, ptail_placement_rule,
+    : ManifestObjectProcessor(aio, store, ptail_placement_rule,
                               owner, obj_ctx, std::move(_head_obj), dpp, y),
       target_obj(head_obj->clone()), upload_id(upload_id),
       part_num(part_num), part_num_str(part_num_str),
@@ -224,18 +249,17 @@ class MultipartObjectProcessor : public ManifestObjectProcessor {
 
     RGWObjManifest *cur_manifest;
 
-    int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
+    int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override;
 
   public:
-    AppendObjectProcessor(Aio *aio, rgw::sal::Store* store,
-                         rgw::sal::Bucket* bucket,
+    AppendObjectProcessor(Aio *aio, rgw::sal::RadosStore* store,
                           const rgw_placement_rule *ptail_placement_rule,
                           const rgw_user& owner, RGWObjectCtx& obj_ctx,
                          std::unique_ptr<rgw::sal::Object> _head_obj,
                           const std::string& unique_tag, uint64_t position,
                           uint64_t *cur_accounted_size,
                           const DoutPrefixProvider *dpp, optional_yield y)
-            : ManifestObjectProcessor(aio, store, bucket, ptail_placement_rule,
+            : ManifestObjectProcessor(aio, store, ptail_placement_rule,
                                       owner, obj_ctx, std::move(_head_obj), dpp, y),
               position(position), cur_size(0), cur_accounted_size(cur_accounted_size),
               unique_tag(unique_tag), cur_manifest(nullptr)
index e1b31cfd998e7e74a6469f6adb2e088f76fa0890..5f96a5b329caf81d5b76d5a645d58c5e5090e555 100644 (file)
@@ -3328,13 +3328,13 @@ class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB
   const DoutPrefixProvider *dpp;
   CephContext* cct;
   rgw_obj obj;
-  rgw::putobj::DataProcessor *filter;
+  rgw::sal::DataProcessor *filter;
   boost::optional<RGWPutObj_Compress>& compressor;
   bool try_etag_verify;
   rgw::putobj::etag_verifier_ptr etag_verifier;
   boost::optional<rgw::putobj::ChunkProcessor> buffering;
   CompressorRef& plugin;
-  rgw::putobj::ObjectProcessor *processor;
+  rgw::sal::ObjectProcessor *processor;
   void (*progress_cb)(off_t, void *);
   void *progress_data;
   bufferlist extra_data_bl, manifest_bl;
@@ -3352,7 +3352,7 @@ public:
                  CephContext* cct,
                  CompressorRef& plugin,
                  boost::optional<RGWPutObj_Compress>& compressor,
-                 rgw::putobj::ObjectProcessor *p,
+                 rgw::sal::ObjectProcessor *p,
                  void (*_progress_cb)(off_t, void *),
                  void *_progress_data,
                  std::function<int(map<string, bufferlist>&)> _attrs_handler) :
@@ -3557,12 +3557,11 @@ static void set_copy_attrs(map<string, bufferlist>& src_attrs,
   }
 }
 
-int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y)
+int RGWRados::rewrite_obj(rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y)
 {
   RGWObjectCtx rctx(this->store);
-  rgw::sal::RadosBucket bucket(store, dest_bucket_info);
 
-  return obj->copy_obj_data(rctx, &bucket, obj, 0, NULL, dpp, y);
+  return obj->copy_obj_data(rctx, obj->get_bucket(), obj, 0, NULL, dpp, y);
 }
 
 struct obj_time_weight {
@@ -3841,7 +3840,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
 
   rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
   using namespace rgw::putobj;
-  AtomicObjectProcessor processor(&aio, this->store, dest_bucket, nullptr, user_id,
+  AtomicObjectProcessor processor(&aio, this->store, nullptr, user_id,
                                   obj_ctx, dest_obj->clone(), olh_epoch,
                                  tag, dpp, null_yield);
   RGWRESTConn *conn;
@@ -4510,7 +4509,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
   using namespace rgw::putobj;
   // do not change the null_yield in the initialization of this AtomicObjectProcessor
   // it causes crashes in the ragweed tests
-  AtomicObjectProcessor processor(&aio, this->store, bucket, &dest_placement,
+  AtomicObjectProcessor processor(&aio, this->store, &dest_placement,
                                   bucket->get_info().owner, obj_ctx,
                                   dest_obj->clone(), olh_epoch, tag,
                                  dpp, null_yield);
@@ -6146,7 +6145,7 @@ int RGWRados::Bucket::UpdateIndex::complete(const DoutPrefixProvider *dpp, int64
     return 0;
   }
   RGWRados *store = target->get_store();
-  BucketShard *bs;
+  BucketShard *bs = nullptr;
 
   int ret = get_bucket_shard(&bs, dpp);
   if (ret < 0) {
index a94bf6c9fb011075c8ca9cc986564339ca5c57b1..6df125049d9062c497659f7af8f39b93a2c1e8f3 100644 (file)
@@ -1104,7 +1104,7 @@ public:
 
   D3nDataCache* d3n_data_cache{nullptr};
 
-  int rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y);
+  int rewrite_obj(rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y);
 
   int stat_remote_obj(const DoutPrefixProvider *dpp,
                RGWObjectCtx& obj_ctx,
index 03608d9e09d83c5276130877c094f4025fb58c37..6597eb4e78b5105c5130a644fad654f2a7cbd91e 100644 (file)
@@ -2572,8 +2572,8 @@ int RGWPutObj_ObjStore_S3::get_decrypt_filter(
 }
 
 int RGWPutObj_ObjStore_S3::get_encrypt_filter(
-    std::unique_ptr<rgw::putobj::DataProcessor> *filter,
-    rgw::putobj::DataProcessor *cb)
+    std::unique_ptr<rgw::sal::DataProcessor> *filter,
+    rgw::sal::DataProcessor *cb)
 {
   int res = 0;
   if (!multipart_upload_id.empty()) {
@@ -3125,8 +3125,8 @@ done:
 }
 
 int RGWPostObj_ObjStore_S3::get_encrypt_filter(
-    std::unique_ptr<rgw::putobj::DataProcessor> *filter,
-    rgw::putobj::DataProcessor *cb)
+    std::unique_ptr<rgw::sal::DataProcessor> *filter,
+    rgw::sal::DataProcessor *cb)
 {
   std::unique_ptr<BlockCrypt> block_crypt;
   int res = rgw_s3_prepare_encrypt(s, attrs, &parts, &block_crypt,
index cc562ece807ea4c742e5aed6aef172a5d512185d..7e2d787cf97979a0a8566fcc4f894b9572af44a8 100644 (file)
@@ -277,8 +277,8 @@ public:
   int get_data(bufferlist& bl) override;
   void send_response() override;
 
-  int get_encrypt_filter(std::unique_ptr<rgw::putobj::DataProcessor> *filter,
-                         rgw::putobj::DataProcessor *cb) override;
+  int get_encrypt_filter(std::unique_ptr<rgw::sal::DataProcessor> *filter,
+                         rgw::sal::DataProcessor *cb) override;
   int get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter>* filter,
                          RGWGetObj_Filter* cb,
                          map<string, bufferlist>& attrs,
@@ -316,8 +316,8 @@ public:
 
   void send_response() override;
   int get_data(ceph::bufferlist& bl, bool& again) override;
-  int get_encrypt_filter(std::unique_ptr<rgw::putobj::DataProcessor> *filter,
-                         rgw::putobj::DataProcessor *cb) override;
+  int get_encrypt_filter(std::unique_ptr<rgw::sal::DataProcessor> *filter,
+                         rgw::sal::DataProcessor *cb) override;
 };
 
 class RGWDeleteObj_ObjStore_S3 : public RGWDeleteObj_ObjStore {
index 79bced546b9c95bab62e34ee3f07fcd46b49d3bb..a48b09353da7d4f38daae8822e5ca0bcf01f88e0 100644 (file)
@@ -17,7 +17,6 @@
 
 #include "rgw_user.h"
 #include "rgw_notify_event_type.h"
-#include "rgw_putobj.h"
 
 class RGWGetDataCB;
 struct RGWObjState;
@@ -118,6 +117,34 @@ enum AttrsMod {
   ATTRSMOD_MERGE   = 2
 };
 
+// a simple streaming data processing abstraction
+class DataProcessor {
+ public:
+  virtual ~DataProcessor() {}
+
+  // consume a bufferlist in its entirety at the given object offset. an
+  // empty bufferlist is given to request that any buffered data be flushed,
+  // though this doesn't wait for completions
+  virtual int process(bufferlist&& data, uint64_t offset) = 0;
+};
+
+// a data consumer that writes an object in a bucket
+class ObjectProcessor : public DataProcessor {
+ public:
+  // prepare to start processing object data
+  virtual int prepare(optional_yield y) = 0;
+
+  // complete the operation and make its result visible to clients
+  virtual int complete(size_t accounted_size, const std::string& etag,
+                       ceph::real_time *mtime, ceph::real_time set_mtime,
+                       std::map<std::string, bufferlist>& attrs,
+                       ceph::real_time delete_at,
+                       const char *if_match, const char *if_nomatch,
+                       const std::string *user_data,
+                       rgw_zone_set *zones_trace, bool *canceled,
+                       optional_yield y) = 0;
+};
+
 /**
  * Base class for AIO completions
  */
@@ -176,9 +203,6 @@ class Store {
     virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, 
         rgw::notify::EventType event_type, const std::string* object_name=nullptr) = 0;
     virtual std::unique_ptr<GCChain> get_gc_chain(rgw::sal::Object* obj) = 0;
-    virtual std::unique_ptr<Writer> get_writer(Aio* aio, rgw::sal::Bucket* bucket,
-              RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
-              const DoutPrefixProvider* dpp, optional_yield y) = 0;
     virtual RGWLC* get_rgwlc(void) = 0;
     virtual RGWCoroutinesManagerRegistry* get_cr_registry() = 0;
     virtual int delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj) = 0;
@@ -232,6 +256,21 @@ class Store {
                                   const std::string& tenant,
                                   vector<std::unique_ptr<RGWOIDCProvider>>& providers) = 0;
     virtual std::unique_ptr<MultipartUpload> get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional<std::string> upload_id=std::nullopt, ceph::real_time mtime=real_clock::now()) = 0;
+    virtual std::unique_ptr<Writer> get_append_writer(const DoutPrefixProvider *dpp,
+                                 optional_yield y,
+                                 std::unique_ptr<rgw::sal::Object> _head_obj,
+                                 const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                                 const rgw_placement_rule *ptail_placement_rule,
+                                 const std::string& unique_tag,
+                                 uint64_t position,
+                                 uint64_t *cur_accounted_size) = 0;
+    virtual std::unique_ptr<Writer> get_atomic_writer(const DoutPrefixProvider *dpp,
+                                 optional_yield y,
+                                 std::unique_ptr<rgw::sal::Object> _head_obj,
+                                 const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                                 const rgw_placement_rule *ptail_placement_rule,
+                                 uint64_t olh_epoch,
+                                 const std::string& unique_tag) = 0;
 
     virtual void finalize(void) = 0;
 
@@ -848,6 +887,14 @@ public:
 
   virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) = 0;
 
+  virtual std::unique_ptr<Writer> get_writer(const DoutPrefixProvider *dpp,
+                         optional_yield y,
+                         std::unique_ptr<rgw::sal::Object> _head_obj,
+                         const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                         const rgw_placement_rule *ptail_placement_rule,
+                         uint64_t part_num,
+                         const std::string& part_num_str) = 0;
+
   friend inline ostream& operator<<(ostream& out, const MultipartUpload& u) {
     out << u.get_meta();
     if (!u.get_upload_id().empty())
@@ -953,45 +1000,29 @@ protected:
     virtual void delete_inline(const DoutPrefixProvider *dpp, const std::string& tag) = 0;
 };
 
-using RawObjSet = std::set<rgw_raw_obj>;
-
-class Writer : public rgw::putobj::DataProcessor {
+class Writer : public ObjectProcessor {
 protected:
-  Aio* const aio;
-  rgw::sal::Bucket* bucket;
-  RGWObjectCtx& obj_ctx;
-  std::unique_ptr<rgw::sal::Object> head_obj;
-  RawObjSet written; // set of written objects for deletion
   const DoutPrefixProvider* dpp;
-  optional_yield y;
-
- public:
-  Writer(Aio* aio,
-             rgw::sal::Bucket* bucket,
-              RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
-              const DoutPrefixProvider* dpp, optional_yield y)
-    : aio(aio), bucket(bucket),
-      obj_ctx(obj_ctx), head_obj(std::move(_head_obj)), dpp(dpp), y(y)
-  {}
-  Writer(Writer&& r)
-    : aio(r.aio), bucket(r.bucket),
-      obj_ctx(r.obj_ctx), head_obj(std::move(r.head_obj)), dpp(r.dpp), y(r.y)
-  {}
-
-  ~Writer() = default;
-
-  // change the current stripe object
-  virtual int set_stripe_obj(const rgw_raw_obj& obj) = 0;
-
-  // write the data as an exclusive create and wait for it to complete
-  virtual int write_exclusive(const bufferlist& data) = 0;
-
-  virtual int drain() = 0;
-
-  // when the operation completes successfully, clear the set of written objects
-  // so they aren't deleted on destruction
-  virtual void clear_written() { written.clear(); }
 
+public:
+  Writer(const DoutPrefixProvider *_dpp, optional_yield y) : dpp(_dpp) {}
+  virtual ~Writer() = default;
+
+  // prepare to start processing object data
+  virtual int prepare(optional_yield y) = 0;
+
+  // Process a bufferlist
+  virtual int process(bufferlist&& data, uint64_t offset) = 0;
+
+  // complete the operation and make its result visible to clients
+  virtual int complete(size_t accounted_size, const std::string& etag,
+                       ceph::real_time *mtime, ceph::real_time set_mtime,
+                       std::map<std::string, bufferlist>& attrs,
+                       ceph::real_time delete_at,
+                       const char *if_match, const char *if_nomatch,
+                       const std::string *user_data,
+                       rgw_zone_set *zones_trace, bool *canceled,
+                       optional_yield y) = 0;
 };
 
 class Zone {
index 94b0d04e739a36c601d6beab84ba04266db07cba..8b5ff1fbc92b227cc50a4cdceec6b53181c94cb4 100644 (file)
@@ -28,6 +28,7 @@
 #include "rgw_multi.h"
 #include "rgw_acl_s3.h"
 #include "rgw_aio.h"
+#include "rgw_aio_throttle.h"
 
 #include "rgw_zone.h"
 #include "rgw_rest_conn.h"
@@ -115,19 +116,6 @@ static int rgw_op_get_bucket_policy_from_attr(const DoutPrefixProvider* dpp,
   return 0;
 }
 
-static int process_completed(const AioResultList& completed, RawObjSet* written)
-{
-  std::optional<int> error;
-  for (auto& r : completed) {
-    if (r.result >= 0) {
-      written->insert(r.obj.get_ref().obj);
-    } else if (!error) { // record first error code
-      error = r.result;
-    }
-  }
-  return error.value_or(0);
-}
-
 int RadosCompletions::drain()
 {
   int ret = 0;
@@ -1199,13 +1187,6 @@ std::unique_ptr<GCChain> RadosStore::get_gc_chain(rgw::sal::Object* obj)
   return std::unique_ptr<GCChain>(new RadosGCChain(this, obj));
 }
 
-std::unique_ptr<Writer> RadosStore::get_writer(Aio* aio, rgw::sal::Bucket* bucket,
-              RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
-              const DoutPrefixProvider* dpp, optional_yield y)
-{
-  return std::unique_ptr<Writer>(new RadosWriter(aio, this, bucket, obj_ctx, std::move(_head_obj), dpp, y));
-}
-
 int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj)
 {
   return rados->delete_raw_obj(dpp, obj);
@@ -1471,6 +1452,40 @@ std::unique_ptr<MultipartUpload> RadosStore::get_multipart_upload(Bucket* bucket
   return std::unique_ptr<MultipartUpload>(new RadosMultipartUpload(this, bucket, oid, upload_id, mtime));
 }
 
+std::unique_ptr<Writer> RadosStore::get_append_writer(const DoutPrefixProvider *dpp,
+                                 optional_yield y,
+                                 std::unique_ptr<rgw::sal::Object> _head_obj,
+                                 const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                                 const rgw_placement_rule *ptail_placement_rule,
+                                 const std::string& unique_tag,
+                                 uint64_t position,
+                                 uint64_t *cur_accounted_size)
+{
+  auto aio = rgw::make_throttle(ctx()->_conf->rgw_put_obj_min_window_size, y);
+  return std::unique_ptr<Writer>(new RadosAppendWriter(dpp, y,
+                                std::move(_head_obj),
+                                this, std::move(aio), owner, obj_ctx,
+                                ptail_placement_rule,
+                                unique_tag, position,
+                                cur_accounted_size));
+}
+
+std::unique_ptr<Writer> RadosStore::get_atomic_writer(const DoutPrefixProvider *dpp,
+                                 optional_yield y,
+                                 std::unique_ptr<rgw::sal::Object> _head_obj,
+                                 const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                                 const rgw_placement_rule *ptail_placement_rule,
+                                 uint64_t olh_epoch,
+                                 const std::string& unique_tag)
+{
+  auto aio = rgw::make_throttle(ctx()->_conf->rgw_put_obj_min_window_size, y);
+  return std::unique_ptr<Writer>(new RadosAtomicWriter(dpp, y,
+                                std::move(_head_obj),
+                                this, std::move(aio), owner, obj_ctx,
+                                ptail_placement_rule,
+                                olh_epoch, unique_tag));
+}
+
 int RadosStore::get_obj_head_ioctx(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::IoCtx* ioctx)
 {
   return rados->get_obj_head_ioctx(dpp, bucket_info, obj, ioctx);
@@ -2478,6 +2493,21 @@ int RadosMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield
   return 0;
 }
 
+std::unique_ptr<Writer> RadosMultipartUpload::get_writer(
+                                 const DoutPrefixProvider *dpp,
+                                 optional_yield y,
+                                 std::unique_ptr<rgw::sal::Object> _head_obj,
+                                 const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                                 const rgw_placement_rule *ptail_placement_rule,
+                                 uint64_t part_num,
+                                 const std::string& part_num_str)
+{
+  auto aio = rgw::make_throttle(store->ctx()->_conf->rgw_put_obj_min_window_size, y);
+  return std::unique_ptr<Writer>(new RadosMultipartWriter(dpp, y, this,
+                                std::move(_head_obj), store, std::move(aio), owner,
+                                obj_ctx, ptail_placement_rule, part_num, part_num_str));
+}
+
 MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore* store, RadosObject* obj, const std::string& lock_name) :
   lock(lock_name)
 {
@@ -2636,97 +2666,73 @@ void RadosGCChain::delete_inline(const DoutPrefixProvider *dpp, const std::strin
   store->getRados()->delete_objs_inline(dpp, chain, tag);
 }
 
-int RadosWriter::set_stripe_obj(const rgw_raw_obj& raw_obj)
+int RadosAtomicWriter::prepare(optional_yield y)
 {
-  stripe_obj = store->svc()->rados->obj(raw_obj);
-  return stripe_obj.open(dpp);
+  return processor.prepare(y);
 }
 
-int RadosWriter::process(bufferlist&& bl, uint64_t offset)
+int RadosAtomicWriter::process(bufferlist&& data, uint64_t offset)
 {
-  bufferlist data = std::move(bl);
-  const uint64_t cost = data.length();
-  if (cost == 0) { // no empty writes, use aio directly for creates
-    return 0;
-  }
-  librados::ObjectWriteOperation op;
-  if (offset == 0) {
-    op.write_full(data);
-  } else {
-    op.write(offset, data);
-  }
-  constexpr uint64_t id = 0; // unused
-  auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
-  return process_completed(c, &written);
+  return processor.process(std::move(data), offset);
 }
 
-int RadosWriter::write_exclusive(const bufferlist& data)
+int RadosAtomicWriter::complete(size_t accounted_size, const std::string& etag,
+                       ceph::real_time *mtime, ceph::real_time set_mtime,
+                       std::map<std::string, bufferlist>& attrs,
+                       ceph::real_time delete_at,
+                       const char *if_match, const char *if_nomatch,
+                       const std::string *user_data,
+                       rgw_zone_set *zones_trace, bool *canceled,
+                       optional_yield y)
 {
-  const uint64_t cost = data.length();
-
-  librados::ObjectWriteOperation op;
-  op.create(true); // exclusive create
-  op.write_full(data);
-
-  constexpr uint64_t id = 0; // unused
-  auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
-  auto d = aio->drain();
-  c.splice(c.end(), d);
-  return process_completed(c, &written);
+  return processor.complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at,
+                           if_match, if_nomatch, user_data, zones_trace, canceled, y);
 }
 
-int RadosWriter::drain()
+int RadosAppendWriter::prepare(optional_yield y)
 {
-  return process_completed(aio->drain(), &written);
+  return processor.prepare(y);
 }
 
-RadosWriter::~RadosWriter()
+int RadosAppendWriter::process(bufferlist&& data, uint64_t offset)
 {
-  // wait on any outstanding aio completions
-  process_completed(aio->drain(), &written);
-
-  bool need_to_remove_head = false;
-  std::optional<rgw_raw_obj> raw_head;
-  if (!rgw::sal::Object::empty(head_obj.get())) {
-    raw_head.emplace();
-    dynamic_cast<RadosObject*>(head_obj.get())->get_raw_obj(&*raw_head);
-  }
+  return processor.process(std::move(data), offset);
+}
 
-  /**
-   * We should delete the object in the "multipart" namespace to avoid race condition.
-   * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart
-   * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects
-   * written by the second upload may be deleted by the first upload.
-   * details is describled on #11749
-   *
-   * The above comment still stands, but instead of searching for a specific object in the multipart
-   * namespace, we just make sure that we remove the object that is marked as the head object after
-   * we remove all the other raw objects. Note that we use different call to remove the head object,
-   * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
-   */
-  for (const auto& obj : written) {
-    if (raw_head && obj == *raw_head) {
-      ldpp_dout(dpp, 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl;
-      need_to_remove_head = true;
-      continue;
-    }
+int RadosAppendWriter::complete(size_t accounted_size, const std::string& etag,
+                       ceph::real_time *mtime, ceph::real_time set_mtime,
+                       std::map<std::string, bufferlist>& attrs,
+                       ceph::real_time delete_at,
+                       const char *if_match, const char *if_nomatch,
+                       const std::string *user_data,
+                       rgw_zone_set *zones_trace, bool *canceled,
+                       optional_yield y)
+{
+  return processor.complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at,
+                           if_match, if_nomatch, user_data, zones_trace, canceled, y);
+}
 
-    int r = store->delete_raw_obj(dpp, obj);
-    if (r < 0 && r != -ENOENT) {
-      ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
-    }
-  }
+int RadosMultipartWriter::prepare(optional_yield y)
+{
+  return processor.prepare(y);
+}
 
-  if (need_to_remove_head) {
-    ldpp_dout(dpp, 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl;
-    std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = head_obj->get_delete_op(&obj_ctx);
-    del_op->params.bucket_owner = bucket->get_acl_owner();
+int RadosMultipartWriter::process(bufferlist&& data, uint64_t offset)
+{
+  return processor.process(std::move(data), offset);
+}
 
-    int r = del_op->delete_obj(dpp, y);
-    if (r < 0 && r != -ENOENT) {
-      ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl;
-    }
-  }
+int RadosMultipartWriter::complete(size_t accounted_size, const std::string& etag,
+                       ceph::real_time *mtime, ceph::real_time set_mtime,
+                       std::map<std::string, bufferlist>& attrs,
+                       ceph::real_time delete_at,
+                       const char *if_match, const char *if_nomatch,
+                       const std::string *user_data,
+                       rgw_zone_set *zones_trace, bool *canceled,
+                       optional_yield y)
+{
+  return processor.complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at,
+                           if_match, if_nomatch, user_data, zones_trace, canceled, y);
 }
 
 const RGWZoneGroup& RadosZone::get_zonegroup()
index dc057ffba40312d1c86b94393952ab6ca062dafc..7d51e24637e95c1cab92bdba5071d5f7ea65e06b 100644 (file)
@@ -21,6 +21,7 @@
 #include "rgw_oidc_provider.h"
 #include "rgw_role.h"
 #include "rgw_multi.h"
+#include "rgw_putobj_processor.h"
 #include "services/svc_tier_rados.h"
 #include "cls/lock/cls_lock_client.h"
 
@@ -424,9 +425,6 @@ class RadosStore : public Store {
     virtual std::unique_ptr<Completions> get_completions(void) override;
     virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override;
     virtual std::unique_ptr<GCChain> get_gc_chain(rgw::sal::Object* obj) override;
-    virtual std::unique_ptr<Writer> get_writer(Aio* aio, rgw::sal::Bucket* bucket,
-              RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
-              const DoutPrefixProvider* dpp, optional_yield y) override;
     virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
     virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
     virtual int delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj) override;
@@ -480,6 +478,21 @@ class RadosStore : public Store {
                                   const std::string& tenant,
                                   vector<std::unique_ptr<RGWOIDCProvider>>& providers) override;
     virtual std::unique_ptr<MultipartUpload> get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional<std::string> upload_id=std::nullopt, ceph::real_time mtime=real_clock::now()) override;
+    virtual std::unique_ptr<Writer> get_append_writer(const DoutPrefixProvider *dpp,
+                                 optional_yield y,
+                                 std::unique_ptr<rgw::sal::Object> _head_obj,
+                                 const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                                 const rgw_placement_rule *ptail_placement_rule,
+                                 const std::string& unique_tag,
+                                 uint64_t position,
+                                 uint64_t *cur_accounted_size) override;
+    virtual std::unique_ptr<Writer> get_atomic_writer(const DoutPrefixProvider *dpp,
+                                 optional_yield y,
+                                 std::unique_ptr<rgw::sal::Object> _head_obj,
+                                 const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                                 const rgw_placement_rule *ptail_placement_rule,
+                                 uint64_t olh_epoch,
+                                 const std::string& unique_tag) override;
 
     virtual void finalize(void) override;
 
@@ -556,6 +569,13 @@ public:
                       uint64_t& accounted_size, bool& compressed,
                       RGWCompressionInfo& cs_info, off_t& ofs) override;
   virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) override;
+  virtual std::unique_ptr<Writer> get_writer(const DoutPrefixProvider *dpp,
+                         optional_yield y,
+                         std::unique_ptr<rgw::sal::Object> _head_obj,
+                         const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                         const rgw_placement_rule *ptail_placement_rule,
+                         uint64_t part_num,
+                         const std::string& part_num_str) override;
 };
 
 class MPRadosSerializer : public MPSerializer {
@@ -632,30 +652,130 @@ protected:
     virtual void delete_inline(const DoutPrefixProvider *dpp, const std::string& tag) override;
 };
 
-class RadosWriter : public Writer {
+class RadosAtomicWriter : public Writer {
+protected:
   rgw::sal::RadosStore* store;
-  RGWSI_RADOS::Obj stripe_obj; // current stripe object
-
- public:
-  RadosWriter(Aio* aio, rgw::sal::RadosStore* _store,
-             rgw::sal::Bucket* bucket,
-              RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
-              const DoutPrefixProvider* dpp, optional_yield y)
-    : Writer(aio, bucket, obj_ctx, std::move(_head_obj), dpp, y), store(_store)
-  {}
+  std::unique_ptr<Aio> aio;
+  rgw::putobj::AtomicObjectProcessor processor;
 
-  ~RadosWriter();
+public:
+  RadosAtomicWriter(const DoutPrefixProvider *dpp,
+                   optional_yield y,
+                   std::unique_ptr<rgw::sal::Object> _head_obj,
+                   RadosStore* _store, std::unique_ptr<Aio> _aio,
+                   const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                   const rgw_placement_rule *ptail_placement_rule,
+                   uint64_t olh_epoch,
+                   const std::string& unique_tag) :
+                       Writer(dpp, y),
+                       store(_store),
+                       aio(std::move(_aio)),
+                       processor(&*aio, store,
+                                 ptail_placement_rule, owner, obj_ctx,
+                                 std::move(_head_obj), olh_epoch, unique_tag,
+                                 dpp, y)
+  {}
+  ~RadosAtomicWriter() = default;
+
+  // prepare to start processing object data
+  virtual int prepare(optional_yield y) override;
+
+  // Process a bufferlist
+  virtual int process(bufferlist&& data, uint64_t offset) override;
+
+  // complete the operation and make its result visible to clients
+  virtual int complete(size_t accounted_size, const std::string& etag,
+                       ceph::real_time *mtime, ceph::real_time set_mtime,
+                       std::map<std::string, bufferlist>& attrs,
+                       ceph::real_time delete_at,
+                       const char *if_match, const char *if_nomatch,
+                       const std::string *user_data,
+                       rgw_zone_set *zones_trace, bool *canceled,
+                       optional_yield y) override;
+};
 
-  // change the current stripe object
-  virtual int set_stripe_obj(const rgw_raw_obj& obj) override;
+class RadosAppendWriter : public Writer {
+protected:
+  rgw::sal::RadosStore* store;
+  std::unique_ptr<Aio> aio;
+  rgw::putobj::AppendObjectProcessor processor;
 
-  // write the data at the given offset of the current stripe object
-  virtual int process(bufferlist&& data, uint64_t stripe_offset) override;
+public:
+  RadosAppendWriter(const DoutPrefixProvider *dpp,
+                   optional_yield y,
+                   std::unique_ptr<rgw::sal::Object> _head_obj,
+                   RadosStore* _store, std::unique_ptr<Aio> _aio,
+                   const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                   const rgw_placement_rule *ptail_placement_rule,
+                   const std::string& unique_tag,
+                   uint64_t position,
+                   uint64_t *cur_accounted_size) :
+                                 Writer(dpp, y),
+                                 store(_store),
+                                 aio(std::move(_aio)),
+                                 processor(&*aio, store,
+                                           ptail_placement_rule, owner, obj_ctx,
+                                           std::move(_head_obj), unique_tag, position,
+                                           cur_accounted_size, dpp, y)
+  {}
+  ~RadosAppendWriter() = default;
+
+  // prepare to start processing object data
+  virtual int prepare(optional_yield y) override;
+
+  // Process a bufferlist
+  virtual int process(bufferlist&& data, uint64_t offset) override;
+
+  // complete the operation and make its result visible to clients
+  virtual int complete(size_t accounted_size, const std::string& etag,
+                       ceph::real_time *mtime, ceph::real_time set_mtime,
+                       std::map<std::string, bufferlist>& attrs,
+                       ceph::real_time delete_at,
+                       const char *if_match, const char *if_nomatch,
+                       const std::string *user_data,
+                       rgw_zone_set *zones_trace, bool *canceled,
+                       optional_yield y) override;
+};
 
-  // write the data as an exclusive create and wait for it to complete
-  virtual int write_exclusive(const bufferlist& data) override;
+class RadosMultipartWriter : public Writer {
+protected:
+  rgw::sal::RadosStore* store;
+  std::unique_ptr<Aio> aio;
+  rgw::putobj::MultipartObjectProcessor processor;
 
-  virtual int drain() override;
+public:
+  RadosMultipartWriter(const DoutPrefixProvider *dpp,
+                      optional_yield y, MultipartUpload* upload,
+                      std::unique_ptr<rgw::sal::Object> _head_obj,
+                      RadosStore* _store, std::unique_ptr<Aio> _aio,
+                      const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                      const rgw_placement_rule *ptail_placement_rule,
+                      uint64_t part_num, const std::string& part_num_str) :
+                                 Writer(dpp, y),
+                                 store(_store),
+                                 aio(std::move(_aio)),
+                                 processor(&*aio, store,
+                                           ptail_placement_rule, owner, obj_ctx,
+                                           std::move(_head_obj), upload->get_upload_id(),
+                                           part_num, part_num_str, dpp, y)
+  {}
+  ~RadosMultipartWriter() = default;
+
+  // prepare to start processing object data
+  virtual int prepare(optional_yield y) override;
+
+  // Process a bufferlist
+  virtual int process(bufferlist&& data, uint64_t offset) override;
+
+  // complete the operation and make its result visible to clients
+  virtual int complete(size_t accounted_size, const std::string& etag,
+                       ceph::real_time *mtime, ceph::real_time set_mtime,
+                       std::map<std::string, bufferlist>& attrs,
+                       ceph::real_time delete_at,
+                       const char *if_match, const char *if_nomatch,
+                       const std::string *user_data,
+                       rgw_zone_set *zones_trace, bool *canceled,
+                       optional_yield y) override;
 };
 
 class RadosLuaScriptManager : public LuaScriptManager {
index 1a3d33f7f526bb58901d0df495954483992cb1f3..be95c472e40418aa83730c1eeca3ae15245f82da 100644 (file)
@@ -488,16 +488,16 @@ int RGWDataAccess::Object::put(bufferlist& data,
 
   string req_id = store->zone_unique_id(store->get_new_req_id());
 
-  using namespace rgw::putobj;
-  AtomicObjectProcessor processor(&aio, store, b.get(), nullptr,
-                                  owner.get_id(), obj_ctx, std::move(obj), olh_epoch,
-                                  req_id, dpp, y);
+  std::unique_ptr<rgw::sal::Writer> processor;
+  processor = store->get_atomic_writer(dpp, y, std::move(obj),
+                                      owner.get_id(), obj_ctx,
+                                      nullptr, olh_epoch, req_id);
 
-  int ret = processor.prepare(y);
+  int ret = processor->prepare(y);
   if (ret < 0)
     return ret;
 
-  DataProcessor *filter = &processor;
+  rgw::sal::DataProcessor *filter = processor.get();
 
   CompressorRef plugin;
   boost::optional<RGWPutObj_Compress> compressor;
@@ -570,7 +570,7 @@ int RGWDataAccess::Object::put(bufferlist& data,
     puser_data = &(*user_data);
   }
 
-  return processor.complete(obj_size, etag,
+  return processor->complete(obj_size, etag,
                            &mtime, mtime,
                            attrs, delete_at,
                             nullptr, nullptr,
index 218a8c01bd34d809a06b3ade785f48e3bb9e9f37..d0ecfe7bb3a641bca9e12d0ba220b9436536badc 100644 (file)
@@ -194,7 +194,7 @@ public:
 
     ceph::real_time mtime;
     string etag;
-    std::optional<uint64_t> olh_epoch;
+    uint64_t olh_epoch{0};
     ceph::real_time delete_at;
     std::optional<string> user_data;
 
index b1e3403281ad41fb8f3db927bd1129ce9574d2c1..81b213e24b32e130d310aa9029243c7fad842670 100644 (file)
@@ -48,7 +48,7 @@ public:
   }
 };
 
-class ut_put_sink: public rgw::putobj::DataProcessor
+class ut_put_sink: public rgw::sal::DataProcessor
 {
   bufferlist sink;
 public:
index b0dbabfd4e22ad8e87f47dc6b627efea936e099d..edf24f41bb68e5a050c96ad6d68b143d16f2a003 100644 (file)
@@ -44,7 +44,7 @@ public:
   }
 };
 
-class ut_put_sink: public rgw::putobj::DataProcessor
+class ut_put_sink: public rgw::sal::DataProcessor
 {
   std::stringstream sink;
 public:
index 91661f45055ff0268251079f2e6e5d4073a3b3c1..f26a9c2d3858def1621ad69607a5bca2e6a24c56 100644 (file)
@@ -32,7 +32,7 @@ inline std::ostream& operator<<(std::ostream& out, const Op& op) {
   return out << "{off=" << op.offset << " data='" << op.data << "'}";
 }
 
-struct MockProcessor : rgw::putobj::DataProcessor {
+struct MockProcessor : rgw::sal::DataProcessor {
   std::vector<Op> ops;
 
   int process(bufferlist&& data, uint64_t offset) override {