#include "rgw_putobj_aio.h"
#include "rgw_putobj_processor.h"
+#include "rgw_multi.h"
#define dout_subsys ceph_subsys_rgw
return 0;
}
+
+int MultipartObjectProcessor::process_first_chunk(bufferlist&& data,
+ DataProcessor **processor)
+{
+ // write the first chunk of the head object as part of an exclusive create,
+ // then drain to wait for the result in case of EEXIST
+ int r = writer.write_exclusive(data);
+ if (r == -EEXIST) {
+ // randomize the oid prefix and reprepare the head/manifest
+ std::string oid_rand(32, 0);
+ gen_rand_alphanumeric(store->ctx(), oid_rand.data(), oid_rand.size());
+
+ mp.init(target_obj.key.name, upload_id, oid_rand);
+ manifest.set_prefix(target_obj.key.name + "." + oid_rand);
+
+ r = prepare_head();
+ if (r < 0) {
+ return r;
+ }
+ // resubmit the write op on the new head object
+ r = writer.write_exclusive(data);
+ }
+ if (r < 0) {
+ return r;
+ }
+ *processor = &stripe;
+ return 0;
+}
+
+int MultipartObjectProcessor::prepare_head()
+{
+ int r = manifest_gen.create_begin(store->ctx(), &manifest,
+ bucket_info.placement_rule,
+ target_obj.bucket, target_obj);
+ if (r < 0) {
+ return r;
+ }
+
+ rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
+ rgw_raw_obj_to_obj(head_obj.bucket, stripe_obj, &head_obj);
+ head_obj.index_hash_source = target_obj.key.name;
+
+ uint64_t chunk_size = 0;
+ r = store->get_max_chunk_size(stripe_obj.pool, &chunk_size);
+ if (r < 0) {
+ return r;
+ }
+ r = writer.set_stripe_obj(std::move(stripe_obj));
+ if (r < 0) {
+ return r;
+ }
+ uint64_t stripe_size = manifest_gen.cur_stripe_max_size();
+
+ uint64_t max_head_size = std::min(chunk_size, stripe_size);
+ set_head_chunk_size(max_head_size);
+
+ chunk = ChunkProcessor(&writer, chunk_size);
+ stripe = StripeProcessor(&chunk, this, stripe_size);
+ return 0;
+}
+
+int MultipartObjectProcessor::prepare()
+{
+ const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size;
+ manifest.set_multipart_part_rule(default_stripe_size, part_num);
+ manifest.set_prefix(target_obj.key.name + "." + upload_id);
+
+ return prepare_head();
+}
+
+int MultipartObjectProcessor::complete(size_t accounted_size,
+ const std::string& etag,
+ ceph::real_time *mtime,
+ ceph::real_time set_mtime,
+ std::map<std::string, bufferlist>& attrs,
+ ceph::real_time delete_at,
+ const char *if_match,
+ const char *if_nomatch,
+ const std::string *user_data,
+ rgw_zone_set *zones_trace,
+ bool *pcanceled)
+{
+ int r = writer.drain();
+ if (r < 0) {
+ return r;
+ }
+ const uint64_t actual_size = get_actual_size();
+ r = manifest_gen.create_next(actual_size);
+ if (r < 0) {
+ return r;
+ }
+
+ RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj);
+ op_target.set_versioning_disabled(true);
+ RGWRados::Object::Write obj_op(&op_target);
+
+ obj_op.meta.set_mtime = set_mtime;
+ obj_op.meta.mtime = mtime;
+ obj_op.meta.owner = owner;
+ obj_op.meta.delete_at = delete_at;
+ obj_op.meta.zones_trace = zones_trace;
+ obj_op.meta.modify_tail = true;
+
+ r = obj_op.write_meta(actual_size, accounted_size, attrs);
+ if (r < 0)
+ return r;
+
+ bufferlist bl;
+ RGWUploadPartInfo info;
+ string p = "part.";
+ bool sorted_omap = is_v2_upload_id(upload_id);
+
+ if (sorted_omap) {
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%08d", part_num);
+ p.append(buf);
+ } else {
+ p.append(part_num_str);
+ }
+ info.num = part_num;
+ info.etag = etag;
+ info.size = actual_size;
+ info.accounted_size = accounted_size;
+ info.modified = real_clock::now();
+ info.manifest = manifest;
+
+ bool compressed;
+ r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info);
+ if (r < 0) {
+ ldout(store->ctx(), 1) << "cannot get compression info" << dendl;
+ return r;
+ }
+
+ encode(info, bl);
+
+ rgw_obj meta_obj;
+ meta_obj.init_ns(bucket_info.bucket, mp.get_meta(), RGW_OBJ_NS_MULTIPART);
+ meta_obj.set_in_extra_data(true);
+
+ rgw_raw_obj raw_meta_obj;
+
+ store->obj_to_raw(bucket_info.placement_rule, meta_obj, &raw_meta_obj);
+ const bool must_exist = true;// detect races with abort
+ r = store->omap_set(raw_meta_obj, p, bl, must_exist);
+ if (r < 0) {
+ return r;
+ }
+
+ if (!obj_op.meta.canceled) {
+ // on success, clear the set of objects for deletion
+ writer.clear_written();
+ }
+ if (pcanceled) {
+ *pcanceled = obj_op.meta.canceled;
+ }
+ return 0;
+}
+
} // namespace rgw::putobj
rgw_zone_set *zones_trace, bool *canceled) override;
};
+
+// a processor for multipart parts, which don't require atomic completion. the
+// part's head is written with an exclusive create to detect racing uploads of
+// the same part/upload id, which are restarted with a random oid prefix
+class MultipartObjectProcessor : public ManifestObjectProcessor {
+ const rgw_obj target_obj; // target multipart object
+ const std::string upload_id;
+ const int part_num;
+ const std::string part_num_str;
+ RGWMPObj mp;
+
+ // write the first chunk and wait on aio->drain() for its completion.
+ // on EEXIST, retry with random prefix
+ int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
+ // prepare the head stripe and manifest
+ int prepare_head();
+ public:
+ MultipartObjectProcessor(Aio *aio, RGWRados *store,
+ const RGWBucketInfo& bucket_info,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,
+ const rgw_obj& head_obj,
+ const std::string& upload_id, uint64_t part_num,
+ const std::string& part_num_str)
+ : ManifestObjectProcessor(aio, store, bucket_info, owner, obj_ctx, head_obj),
+ target_obj(head_obj), upload_id(upload_id),
+ part_num(part_num), part_num_str(part_num_str),
+ mp(head_obj.key.name, upload_id)
+ {}
+
+ // prepare a multipart manifest
+ int prepare() override;
+ // write the head object attributes in a bucket index transaction, then
+ // register the completed part with the multipart meta object
+ int complete(size_t accounted_size, const std::string& etag,
+ ceph::real_time *mtime, ceph::real_time set_mtime,
+ std::map<std::string, bufferlist>& attrs,
+ ceph::real_time delete_at,
+ const char *if_match, const char *if_nomatch,
+ const std::string *user_data,
+ rgw_zone_set *zones_trace, bool *canceled) override;
+};
+
} // namespace rgw::putobj