From: Casey Bodley Date: Wed, 10 Oct 2018 19:12:05 +0000 (-0400) Subject: rgw: add rgw::putobj::AtomicObjectProcessor X-Git-Tag: v14.1.0~1156^2~19 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9ff79e39dc662e49fbac36dbfaabe768dbde98ef;p=ceph.git rgw: add rgw::putobj::AtomicObjectProcessor Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_putobj_processor.cc b/src/rgw/rgw_putobj_processor.cc index c671fffd0b1d..1d6ce79069e2 100644 --- a/src/rgw/rgw_putobj_processor.cc +++ b/src/rgw/rgw_putobj_processor.cc @@ -190,4 +190,112 @@ int ManifestObjectProcessor::next(uint64_t offset, uint64_t *pstripe_size) return 0; } + +int AtomicObjectProcessor::process_first_chunk(bufferlist&& data, + DataProcessor **processor) +{ + first_chunk = std::move(data); + *processor = &stripe; + return 0; +} + +int AtomicObjectProcessor::prepare() +{ + uint64_t max_chunk_size = 0; + int r = store->get_max_chunk_size(bucket_info.placement_rule, head_obj, + &max_chunk_size); + if (r < 0) { + return r; + } + const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size; + manifest.set_trivial_rule(max_chunk_size, default_stripe_size); + + r = manifest_gen.create_begin(store->ctx(), &manifest, + bucket_info.placement_rule, + head_obj.bucket, head_obj); + if (r < 0) { + return r; + } + + rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store); + + uint64_t chunk_size = 0; + r = store->get_max_chunk_size(stripe_obj.pool, &chunk_size); + if (r < 0) { + return r; + } + r = writer.set_stripe_obj(std::move(stripe_obj)); + if (r < 0) { + return r; + } + // only the first chunk goes to the head object + uint64_t stripe_size = chunk_size; + + set_head_chunk_size(chunk_size); + // initialize the processors + chunk = ChunkProcessor(&writer, chunk_size); + stripe = StripeProcessor(&chunk, this, stripe_size); + return 0; +} + +int AtomicObjectProcessor::complete(size_t accounted_size, + const std::string& etag, + ceph::real_time *mtime, + ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, + const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, + bool *pcanceled) +{ + int r = writer.drain(); + if (r < 0) { + return r; + } + const uint64_t actual_size = get_actual_size(); + r = manifest_gen.create_next(actual_size); + if (r < 0) { + return r; + } + + obj_ctx.obj.set_atomic(head_obj); + + RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj); + + /* some object types shouldn't be versioned, e.g., multipart parts */ + op_target.set_versioning_disabled(!bucket_info.versioning_enabled()); + + RGWRados::Object::Write obj_op(&op_target); + + obj_op.meta.data = &first_chunk; + obj_op.meta.manifest = &manifest; + obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */ + obj_op.meta.if_match = if_match; + obj_op.meta.if_nomatch = if_nomatch; + obj_op.meta.mtime = mtime; + obj_op.meta.set_mtime = set_mtime; + obj_op.meta.owner = owner; + obj_op.meta.flags = PUT_OBJ_CREATE; + obj_op.meta.olh_epoch = olh_epoch; + obj_op.meta.delete_at = delete_at; + obj_op.meta.user_data = user_data; + obj_op.meta.zones_trace = zones_trace; + obj_op.meta.modify_tail = true; + + r = obj_op.write_meta(actual_size, accounted_size, attrs); + if (r < 0) { + return r; + } + if (!obj_op.meta.canceled) { + // on success, clear the set of objects for deletion + writer.clear_written(); + } + if (pcanceled) { + *pcanceled = obj_op.meta.canceled; + } + return 0; +} + } // namespace rgw::putobj diff --git a/src/rgw/rgw_putobj_processor.h b/src/rgw/rgw_putobj_processor.h index 3e5a9077e819..670161640686 100644 --- a/src/rgw/rgw_putobj_processor.h +++ b/src/rgw/rgw_putobj_processor.h @@ -136,4 +136,35 @@ class ManifestObjectProcessor : public HeadObjectProcessor, {} }; + +// a processor that completes with an atomic write to the head object as part of +// a bucket index transaction +class AtomicObjectProcessor : public ManifestObjectProcessor { + const std::optional olh_epoch; + const std::string unique_tag; + bufferlist first_chunk; // written with the head in complete() + + int process_first_chunk(bufferlist&& data, DataProcessor **processor) override; + public: + AtomicObjectProcessor(Aio *aio, RGWRados *store, + const RGWBucketInfo& bucket_info, const rgw_user& owner, + RGWObjectCtx& obj_ctx, const rgw_obj& head_obj, + std::optional olh_epoch, + const std::string& unique_tag) + : ManifestObjectProcessor(aio, store, bucket_info, owner, obj_ctx, head_obj), + olh_epoch(olh_epoch), unique_tag(unique_tag) + {} + + // prepare a trivial manifest + int prepare() override; + // write the head object atomically in a bucket index transaction + int complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled) override; +}; + } // namespace rgw::putobj