]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: add rgw::putobj::MultipartObjectProcessor
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 19:12:07 +0000 (15:12 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 16 Oct 2018 15:06:14 +0000 (11:06 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_putobj_processor.cc
src/rgw/rgw_putobj_processor.h

index 1d6ce79069e27afa1728857f2f5642a1d544d807..b68d7b701a2487f0d4e216453e6ccd48f7614a0d 100644 (file)
@@ -14,6 +14,7 @@
 
 #include "rgw_putobj_aio.h"
 #include "rgw_putobj_processor.h"
+#include "rgw_multi.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -298,4 +299,162 @@ int AtomicObjectProcessor::complete(size_t accounted_size,
   return 0;
 }
 
+
+int MultipartObjectProcessor::process_first_chunk(bufferlist&& data,
+                                                  DataProcessor **processor)
+{
+  // write the first chunk of the head object as part of an exclusive create,
+  // then drain to wait for the result in case of EEXIST
+  int r = writer.write_exclusive(data);
+  if (r == -EEXIST) {
+    // randomize the oid prefix and reprepare the head/manifest
+    std::string oid_rand(32, 0);
+    gen_rand_alphanumeric(store->ctx(), oid_rand.data(), oid_rand.size());
+
+    mp.init(target_obj.key.name, upload_id, oid_rand);
+    manifest.set_prefix(target_obj.key.name + "." + oid_rand);
+
+    r = prepare_head();
+    if (r < 0) {
+      return r;
+    }
+    // resubmit the write op on the new head object
+    r = writer.write_exclusive(data);
+  }
+  if (r < 0) {
+    return r;
+  }
+  *processor = &stripe;
+  return 0;
+}
+
+int MultipartObjectProcessor::prepare_head()
+{
+  int r = manifest_gen.create_begin(store->ctx(), &manifest,
+                                    bucket_info.placement_rule,
+                                    target_obj.bucket, target_obj);
+  if (r < 0) {
+    return r;
+  }
+
+  rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
+  rgw_raw_obj_to_obj(head_obj.bucket, stripe_obj, &head_obj);
+  head_obj.index_hash_source = target_obj.key.name;
+
+  uint64_t chunk_size = 0;
+  r = store->get_max_chunk_size(stripe_obj.pool, &chunk_size);
+  if (r < 0) {
+    return r;
+  }
+  r = writer.set_stripe_obj(std::move(stripe_obj));
+  if (r < 0) {
+    return r;
+  }
+  uint64_t stripe_size = manifest_gen.cur_stripe_max_size();
+
+  uint64_t max_head_size = std::min(chunk_size, stripe_size);
+  set_head_chunk_size(max_head_size);
+
+  chunk = ChunkProcessor(&writer, chunk_size);
+  stripe = StripeProcessor(&chunk, this, stripe_size);
+  return 0;
+}
+
+int MultipartObjectProcessor::prepare()
+{
+  const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size;
+  manifest.set_multipart_part_rule(default_stripe_size, part_num);
+  manifest.set_prefix(target_obj.key.name + "." + upload_id);
+
+  return prepare_head();
+}
+
+int MultipartObjectProcessor::complete(size_t accounted_size,
+                                       const std::string& etag,
+                                       ceph::real_time *mtime,
+                                       ceph::real_time set_mtime,
+                                       std::map<std::string, bufferlist>& attrs,
+                                       ceph::real_time delete_at,
+                                       const char *if_match,
+                                       const char *if_nomatch,
+                                       const std::string *user_data,
+                                       rgw_zone_set *zones_trace,
+                                       bool *pcanceled)
+{
+  int r = writer.drain();
+  if (r < 0) {
+    return r;
+  }
+  const uint64_t actual_size = get_actual_size();
+  r = manifest_gen.create_next(actual_size);
+  if (r < 0) {
+    return r;
+  }
+
+  RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj);
+  op_target.set_versioning_disabled(true);
+  RGWRados::Object::Write obj_op(&op_target);
+
+  obj_op.meta.set_mtime = set_mtime;
+  obj_op.meta.mtime = mtime;
+  obj_op.meta.owner = owner;
+  obj_op.meta.delete_at = delete_at;
+  obj_op.meta.zones_trace = zones_trace;
+  obj_op.meta.modify_tail = true;
+
+  r = obj_op.write_meta(actual_size, accounted_size, attrs);
+  if (r < 0)
+    return r;
+
+  bufferlist bl;
+  RGWUploadPartInfo info;
+  string p = "part.";
+  bool sorted_omap = is_v2_upload_id(upload_id);
+
+  if (sorted_omap) {
+    char buf[32];
+    snprintf(buf, sizeof(buf), "%08d", part_num);
+    p.append(buf);
+  } else {
+    p.append(part_num_str);
+  }
+  info.num = part_num;
+  info.etag = etag;
+  info.size = actual_size;
+  info.accounted_size = accounted_size;
+  info.modified = real_clock::now();
+  info.manifest = manifest;
+
+  bool compressed;
+  r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info);
+  if (r < 0) {
+    ldout(store->ctx(), 1) << "cannot get compression info" << dendl;
+    return r;
+  }
+
+  encode(info, bl);
+
+  rgw_obj meta_obj;
+  meta_obj.init_ns(bucket_info.bucket, mp.get_meta(), RGW_OBJ_NS_MULTIPART);
+  meta_obj.set_in_extra_data(true);
+
+  rgw_raw_obj raw_meta_obj;
+
+  store->obj_to_raw(bucket_info.placement_rule, meta_obj, &raw_meta_obj);
+  const bool must_exist = true;// detect races with abort
+  r = store->omap_set(raw_meta_obj, p, bl, must_exist);
+  if (r < 0) {
+    return r;
+  }
+
+  if (!obj_op.meta.canceled) {
+    // on success, clear the set of objects for deletion
+    writer.clear_written();
+  }
+  if (pcanceled) {
+    *pcanceled = obj_op.meta.canceled;
+  }
+  return 0;
+}
+
 } // namespace rgw::putobj
index 670161640686f53f759b04004244dc1fe5a43f69..c7342833b1abfbb220fc0c7b26278ca2f5ea42c8 100644 (file)
@@ -167,4 +167,46 @@ class AtomicObjectProcessor : public ManifestObjectProcessor {
                rgw_zone_set *zones_trace, bool *canceled) override;
 };
 
+
+// a processor for multipart parts, which don't require atomic completion. the
+// part's head is written with an exclusive create to detect racing uploads of
+// the same part/upload id, which are restarted with a random oid prefix
+class MultipartObjectProcessor : public ManifestObjectProcessor {
+  const rgw_obj target_obj; // target multipart object
+  const std::string upload_id;
+  const int part_num;
+  const std::string part_num_str;
+  RGWMPObj mp;
+
+  // write the first chunk and wait on aio->drain() for its completion.
+  // on EEXIST, retry with random prefix
+  int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
+  // prepare the head stripe and manifest
+  int prepare_head();
+ public:
+  MultipartObjectProcessor(Aio *aio, RGWRados *store,
+                           const RGWBucketInfo& bucket_info,
+                           const rgw_user& owner, RGWObjectCtx& obj_ctx,
+                           const rgw_obj& head_obj,
+                           const std::string& upload_id, uint64_t part_num,
+                           const std::string& part_num_str)
+    : ManifestObjectProcessor(aio, store, bucket_info, owner, obj_ctx, head_obj),
+      target_obj(head_obj), upload_id(upload_id),
+      part_num(part_num), part_num_str(part_num_str),
+      mp(head_obj.key.name, upload_id)
+  {}
+
+  // prepare a multipart manifest
+  int prepare() override;
+  // write the head object attributes in a bucket index transaction, then
+  // register the completed part with the multipart meta object
+  int complete(size_t accounted_size, const std::string& etag,
+               ceph::real_time *mtime, ceph::real_time set_mtime,
+               std::map<std::string, bufferlist>& attrs,
+               ceph::real_time delete_at,
+               const char *if_match, const char *if_nomatch,
+               const std::string *user_data,
+               rgw_zone_set *zones_trace, bool *canceled) override;
+};
+
 } // namespace rgw::putobj