]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: add rgw::putobj::AtomicObjectProcessor
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 19:12:05 +0000 (15:12 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 16 Oct 2018 15:06:14 +0000 (11:06 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_putobj_processor.cc
src/rgw/rgw_putobj_processor.h

index c671fffd0b1d54ea1f497fc56b55f5cce8d0b22a..1d6ce79069e27afa1728857f2f5642a1d544d807 100644 (file)
@@ -190,4 +190,112 @@ int ManifestObjectProcessor::next(uint64_t offset, uint64_t *pstripe_size)
   return 0;
 }
 
+
+int AtomicObjectProcessor::process_first_chunk(bufferlist&& data,
+                                               DataProcessor **processor)
+{
+  first_chunk = std::move(data);
+  *processor = &stripe;
+  return 0;
+}
+
+int AtomicObjectProcessor::prepare()
+{
+  uint64_t max_chunk_size = 0;
+  int r = store->get_max_chunk_size(bucket_info.placement_rule, head_obj,
+                                    &max_chunk_size);
+  if (r < 0) {
+    return r;
+  }
+  const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size;
+  manifest.set_trivial_rule(max_chunk_size, default_stripe_size);
+
+  r = manifest_gen.create_begin(store->ctx(), &manifest,
+                                bucket_info.placement_rule,
+                                head_obj.bucket, head_obj);
+  if (r < 0) {
+    return r;
+  }
+
+  rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
+
+  uint64_t chunk_size = 0;
+  r = store->get_max_chunk_size(stripe_obj.pool, &chunk_size);
+  if (r < 0) {
+    return r;
+  }
+  r = writer.set_stripe_obj(std::move(stripe_obj));
+  if (r < 0) {
+    return r;
+  }
+  // only the first chunk goes to the head object
+  uint64_t stripe_size = chunk_size;
+
+  set_head_chunk_size(chunk_size);
+  // initialize the processors
+  chunk = ChunkProcessor(&writer, chunk_size);
+  stripe = StripeProcessor(&chunk, this, stripe_size);
+  return 0;
+}
+
+int AtomicObjectProcessor::complete(size_t accounted_size,
+                                    const std::string& etag,
+                                    ceph::real_time *mtime,
+                                    ceph::real_time set_mtime,
+                                    std::map<std::string, bufferlist>& attrs,
+                                    ceph::real_time delete_at,
+                                    const char *if_match,
+                                    const char *if_nomatch,
+                                    const std::string *user_data,
+                                    rgw_zone_set *zones_trace,
+                                    bool *pcanceled)
+{
+  int r = writer.drain();
+  if (r < 0) {
+    return r;
+  }
+  const uint64_t actual_size = get_actual_size();
+  r = manifest_gen.create_next(actual_size);
+  if (r < 0) {
+    return r;
+  }
+
+  obj_ctx.obj.set_atomic(head_obj);
+
+  RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj);
+
+  /* some object types shouldn't be versioned, e.g., multipart parts */
+  op_target.set_versioning_disabled(!bucket_info.versioning_enabled());
+
+  RGWRados::Object::Write obj_op(&op_target);
+
+  obj_op.meta.data = &first_chunk;
+  obj_op.meta.manifest = &manifest;
+  obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
+  obj_op.meta.if_match = if_match;
+  obj_op.meta.if_nomatch = if_nomatch;
+  obj_op.meta.mtime = mtime;
+  obj_op.meta.set_mtime = set_mtime;
+  obj_op.meta.owner = owner;
+  obj_op.meta.flags = PUT_OBJ_CREATE;
+  obj_op.meta.olh_epoch = olh_epoch;
+  obj_op.meta.delete_at = delete_at;
+  obj_op.meta.user_data = user_data;
+  obj_op.meta.zones_trace = zones_trace;
+  obj_op.meta.modify_tail = true;
+
+  r = obj_op.write_meta(actual_size, accounted_size, attrs);
+  if (r < 0) {
+    return r;
+  }
+  if (!obj_op.meta.canceled) {
+    // on success, clear the set of objects for deletion
+    writer.clear_written();
+  }
+  if (pcanceled) {
+    *pcanceled = obj_op.meta.canceled;
+  }
+  return 0;
+}
+
 } // namespace rgw::putobj
index 3e5a9077e8193b40b70a3adae244d20aa592b7c6..670161640686f53f759b04004244dc1fe5a43f69 100644 (file)
@@ -136,4 +136,35 @@ class ManifestObjectProcessor : public HeadObjectProcessor,
   {}
 };
 
+
+// a processor that completes with an atomic write to the head object as part of
+// a bucket index transaction
+class AtomicObjectProcessor : public ManifestObjectProcessor {
+  const std::optional<uint64_t> olh_epoch;
+  const std::string unique_tag;
+  bufferlist first_chunk; // written with the head in complete()
+
+  int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
+ public:
+  AtomicObjectProcessor(Aio *aio, RGWRados *store,
+                        const RGWBucketInfo& bucket_info, const rgw_user& owner,
+                        RGWObjectCtx& obj_ctx, const rgw_obj& head_obj,
+                        std::optional<uint64_t> olh_epoch,
+                        const std::string& unique_tag)
+    : ManifestObjectProcessor(aio, store, bucket_info, owner, obj_ctx, head_obj),
+      olh_epoch(olh_epoch), unique_tag(unique_tag)
+  {}
+
+  // prepare a trivial manifest
+  int prepare() override;
+  // write the head object atomically in a bucket index transaction
+  int complete(size_t accounted_size, const std::string& etag,
+               ceph::real_time *mtime, ceph::real_time set_mtime,
+               std::map<std::string, bufferlist>& attrs,
+               ceph::real_time delete_at,
+               const char *if_match, const char *if_nomatch,
+               const std::string *user_data,
+               rgw_zone_set *zones_trace, bool *canceled) override;
+};
+
 } // namespace rgw::putobj