return 0;
}
+
+int AtomicObjectProcessor::process_first_chunk(bufferlist&& data,
+ DataProcessor **processor)
+{
+ first_chunk = std::move(data);
+ *processor = &stripe;
+ return 0;
+}
+
+int AtomicObjectProcessor::prepare()
+{
+ uint64_t max_chunk_size = 0;
+ int r = store->get_max_chunk_size(bucket_info.placement_rule, head_obj,
+ &max_chunk_size);
+ if (r < 0) {
+ return r;
+ }
+ const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size;
+ manifest.set_trivial_rule(max_chunk_size, default_stripe_size);
+
+ r = manifest_gen.create_begin(store->ctx(), &manifest,
+ bucket_info.placement_rule,
+ head_obj.bucket, head_obj);
+ if (r < 0) {
+ return r;
+ }
+
+ rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
+
+ uint64_t chunk_size = 0;
+ r = store->get_max_chunk_size(stripe_obj.pool, &chunk_size);
+ if (r < 0) {
+ return r;
+ }
+ r = writer.set_stripe_obj(std::move(stripe_obj));
+ if (r < 0) {
+ return r;
+ }
+ // only the first chunk goes to the head object
+ uint64_t stripe_size = chunk_size;
+
+ set_head_chunk_size(chunk_size);
+ // initialize the processors
+ chunk = ChunkProcessor(&writer, chunk_size);
+ stripe = StripeProcessor(&chunk, this, stripe_size);
+ return 0;
+}
+
+int AtomicObjectProcessor::complete(size_t accounted_size,
+ const std::string& etag,
+ ceph::real_time *mtime,
+ ceph::real_time set_mtime,
+ std::map<std::string, bufferlist>& attrs,
+ ceph::real_time delete_at,
+ const char *if_match,
+ const char *if_nomatch,
+ const std::string *user_data,
+ rgw_zone_set *zones_trace,
+ bool *pcanceled)
+{
+ int r = writer.drain();
+ if (r < 0) {
+ return r;
+ }
+ const uint64_t actual_size = get_actual_size();
+ r = manifest_gen.create_next(actual_size);
+ if (r < 0) {
+ return r;
+ }
+
+ obj_ctx.obj.set_atomic(head_obj);
+
+ RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj);
+
+ /* some object types shouldn't be versioned, e.g., multipart parts */
+ op_target.set_versioning_disabled(!bucket_info.versioning_enabled());
+
+ RGWRados::Object::Write obj_op(&op_target);
+
+ obj_op.meta.data = &first_chunk;
+ obj_op.meta.manifest = &manifest;
+ obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
+ obj_op.meta.if_match = if_match;
+ obj_op.meta.if_nomatch = if_nomatch;
+ obj_op.meta.mtime = mtime;
+ obj_op.meta.set_mtime = set_mtime;
+ obj_op.meta.owner = owner;
+ obj_op.meta.flags = PUT_OBJ_CREATE;
+ obj_op.meta.olh_epoch = olh_epoch;
+ obj_op.meta.delete_at = delete_at;
+ obj_op.meta.user_data = user_data;
+ obj_op.meta.zones_trace = zones_trace;
+ obj_op.meta.modify_tail = true;
+
+ r = obj_op.write_meta(actual_size, accounted_size, attrs);
+ if (r < 0) {
+ return r;
+ }
+ if (!obj_op.meta.canceled) {
+ // on success, clear the set of objects for deletion
+ writer.clear_written();
+ }
+ if (pcanceled) {
+ *pcanceled = obj_op.meta.canceled;
+ }
+ return 0;
+}
+
} // namespace rgw::putobj
{}
};
+
+// a processor that completes with an atomic write to the head object as part of
+// a bucket index transaction
+class AtomicObjectProcessor : public ManifestObjectProcessor {
+ const std::optional<uint64_t> olh_epoch;
+ const std::string unique_tag;
+ bufferlist first_chunk; // written with the head in complete()
+
+ int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
+ public:
+ AtomicObjectProcessor(Aio *aio, RGWRados *store,
+ const RGWBucketInfo& bucket_info, const rgw_user& owner,
+ RGWObjectCtx& obj_ctx, const rgw_obj& head_obj,
+ std::optional<uint64_t> olh_epoch,
+ const std::string& unique_tag)
+ : ManifestObjectProcessor(aio, store, bucket_info, owner, obj_ctx, head_obj),
+ olh_epoch(olh_epoch), unique_tag(unique_tag)
+ {}
+
+ // prepare a trivial manifest
+ int prepare() override;
+ // write the head object atomically in a bucket index transaction
+ int complete(size_t accounted_size, const std::string& etag,
+ ceph::real_time *mtime, ceph::real_time set_mtime,
+ std::map<std::string, bufferlist>& attrs,
+ ceph::real_time delete_at,
+ const char *if_match, const char *if_nomatch,
+ const std::string *user_data,
+ rgw_zone_set *zones_trace, bool *canceled) override;
+};
+
} // namespace rgw::putobj