From: Casey Bodley Date: Wed, 10 Oct 2018 19:12:07 +0000 (-0400) Subject: rgw: add rgw::putobj::MultipartObjectProcessor X-Git-Tag: v14.1.0~1156^2~18 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5de7718f455e94cbc3ea40a955277ea7e1c7e26c;p=ceph.git rgw: add rgw::putobj::MultipartObjectProcessor Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_putobj_processor.cc b/src/rgw/rgw_putobj_processor.cc index 1d6ce79069e2..b68d7b701a24 100644 --- a/src/rgw/rgw_putobj_processor.cc +++ b/src/rgw/rgw_putobj_processor.cc @@ -14,6 +14,7 @@ #include "rgw_putobj_aio.h" #include "rgw_putobj_processor.h" +#include "rgw_multi.h" #define dout_subsys ceph_subsys_rgw @@ -298,4 +299,162 @@ int AtomicObjectProcessor::complete(size_t accounted_size, return 0; } + +int MultipartObjectProcessor::process_first_chunk(bufferlist&& data, + DataProcessor **processor) +{ + // write the first chunk of the head object as part of an exclusive create, + // then drain to wait for the result in case of EEXIST + int r = writer.write_exclusive(data); + if (r == -EEXIST) { + // randomize the oid prefix and reprepare the head/manifest + std::string oid_rand(32, 0); + gen_rand_alphanumeric(store->ctx(), oid_rand.data(), oid_rand.size()); + + mp.init(target_obj.key.name, upload_id, oid_rand); + manifest.set_prefix(target_obj.key.name + "." + oid_rand); + + r = prepare_head(); + if (r < 0) { + return r; + } + // resubmit the write op on the new head object + r = writer.write_exclusive(data); + } + if (r < 0) { + return r; + } + *processor = &stripe; + return 0; +} + +int MultipartObjectProcessor::prepare_head() +{ + int r = manifest_gen.create_begin(store->ctx(), &manifest, + bucket_info.placement_rule, + target_obj.bucket, target_obj); + if (r < 0) { + return r; + } + + rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store); + rgw_raw_obj_to_obj(head_obj.bucket, stripe_obj, &head_obj); + head_obj.index_hash_source = target_obj.key.name; + + uint64_t chunk_size = 0; + r = store->get_max_chunk_size(stripe_obj.pool, &chunk_size); + if (r < 0) { + return r; + } + r = writer.set_stripe_obj(std::move(stripe_obj)); + if (r < 0) { + return r; + } + uint64_t stripe_size = manifest_gen.cur_stripe_max_size(); + + uint64_t max_head_size = std::min(chunk_size, stripe_size); + set_head_chunk_size(max_head_size); + + chunk = ChunkProcessor(&writer, chunk_size); + stripe = StripeProcessor(&chunk, this, stripe_size); + return 0; +} + +int MultipartObjectProcessor::prepare() +{ + const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size; + manifest.set_multipart_part_rule(default_stripe_size, part_num); + manifest.set_prefix(target_obj.key.name + "." + upload_id); + + return prepare_head(); +} + +int MultipartObjectProcessor::complete(size_t accounted_size, + const std::string& etag, + ceph::real_time *mtime, + ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, + const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, + bool *pcanceled) +{ + int r = writer.drain(); + if (r < 0) { + return r; + } + const uint64_t actual_size = get_actual_size(); + r = manifest_gen.create_next(actual_size); + if (r < 0) { + return r; + } + + RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj); + op_target.set_versioning_disabled(true); + RGWRados::Object::Write obj_op(&op_target); + + obj_op.meta.set_mtime = set_mtime; + obj_op.meta.mtime = mtime; + obj_op.meta.owner = owner; + obj_op.meta.delete_at = delete_at; + obj_op.meta.zones_trace = zones_trace; + obj_op.meta.modify_tail = true; + + r = obj_op.write_meta(actual_size, accounted_size, attrs); + if (r < 0) + return r; + + bufferlist bl; + RGWUploadPartInfo info; + string p = "part."; + bool sorted_omap = is_v2_upload_id(upload_id); + + if (sorted_omap) { + char buf[32]; + snprintf(buf, sizeof(buf), "%08d", part_num); + p.append(buf); + } else { + p.append(part_num_str); + } + info.num = part_num; + info.etag = etag; + info.size = actual_size; + info.accounted_size = accounted_size; + info.modified = real_clock::now(); + info.manifest = manifest; + + bool compressed; + r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info); + if (r < 0) { + ldout(store->ctx(), 1) << "cannot get compression info" << dendl; + return r; + } + + encode(info, bl); + + rgw_obj meta_obj; + meta_obj.init_ns(bucket_info.bucket, mp.get_meta(), RGW_OBJ_NS_MULTIPART); + meta_obj.set_in_extra_data(true); + + rgw_raw_obj raw_meta_obj; + + store->obj_to_raw(bucket_info.placement_rule, meta_obj, &raw_meta_obj); + const bool must_exist = true;// detect races with abort + r = store->omap_set(raw_meta_obj, p, bl, must_exist); + if (r < 0) { + return r; + } + + if (!obj_op.meta.canceled) { + // on success, clear the set of objects for deletion + writer.clear_written(); + } + if (pcanceled) { + *pcanceled = obj_op.meta.canceled; + } + return 0; +} + } // namespace rgw::putobj diff --git a/src/rgw/rgw_putobj_processor.h b/src/rgw/rgw_putobj_processor.h index 670161640686..c7342833b1ab 100644 --- a/src/rgw/rgw_putobj_processor.h +++ b/src/rgw/rgw_putobj_processor.h @@ -167,4 +167,46 @@ class AtomicObjectProcessor : public ManifestObjectProcessor { rgw_zone_set *zones_trace, bool *canceled) override; }; + +// a processor for multipart parts, which don't require atomic completion. the +// part's head is written with an exclusive create to detect racing uploads of +// the same part/upload id, which are restarted with a random oid prefix +class MultipartObjectProcessor : public ManifestObjectProcessor { + const rgw_obj target_obj; // target multipart object + const std::string upload_id; + const int part_num; + const std::string part_num_str; + RGWMPObj mp; + + // write the first chunk and wait on aio->drain() for its completion. + // on EEXIST, retry with random prefix + int process_first_chunk(bufferlist&& data, DataProcessor **processor) override; + // prepare the head stripe and manifest + int prepare_head(); + public: + MultipartObjectProcessor(Aio *aio, RGWRados *store, + const RGWBucketInfo& bucket_info, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_obj& head_obj, + const std::string& upload_id, uint64_t part_num, + const std::string& part_num_str) + : ManifestObjectProcessor(aio, store, bucket_info, owner, obj_ctx, head_obj), + target_obj(head_obj), upload_id(upload_id), + part_num(part_num), part_num_str(part_num_str), + mp(head_obj.key.name, upload_id) + {} + + // prepare a multipart manifest + int prepare() override; + // write the head object attributes in a bucket index transaction, then + // register the completed part with the multipart meta object + int complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled) override; +}; + } // namespace rgw::putobj