From: Casey Bodley Date: Wed, 10 Oct 2018 19:54:41 +0000 (-0400) Subject: rgw: remove old PutObjProcessor stack X-Git-Tag: v14.1.0~1156^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F24453%2Fhead;p=ceph.git rgw: remove old PutObjProcessor stack Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 81eea2e1fd48..f5bab237faeb 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -3255,139 +3255,6 @@ int RGWPutObj::verify_permission() return 0; } -void RGWPutObjProcessor_Multipart::get_mp(RGWMPObj** _mp){ - *_mp = ∓ -} - -int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, string *oid_rand) -{ - string oid = obj_str; - upload_id = s->info.args.get("uploadId"); - if (!oid_rand) { - mp.init(oid, upload_id); - } else { - mp.init(oid, upload_id, *oid_rand); - } - - part_num = s->info.args.get("partNumber"); - if (part_num.empty()) { - ldpp_dout(s, 10) << "part number is empty" << dendl; - return -EINVAL; - } - - string err; - uint64_t num = (uint64_t)strict_strtol(part_num.c_str(), 10, &err); - - if (!err.empty()) { - ldpp_dout(s, 10) << "bad part number: " << part_num << ": " << err << dendl; - return -EINVAL; - } - - string upload_prefix = oid + "."; - - if (!oid_rand) { - upload_prefix.append(upload_id); - } else { - upload_prefix.append(*oid_rand); - } - - rgw_obj target_obj; - target_obj.init(bucket, oid); - - manifest.set_prefix(upload_prefix); - - manifest.set_multipart_part_rule(store->ctx()->_conf->rgw_obj_stripe_size, num); - - int r = manifest_gen.create_begin(store->ctx(), &manifest, s->bucket_info.placement_rule, bucket, target_obj); - if (r < 0) { - return r; - } - - cur_obj = manifest_gen.get_cur_obj(store); - rgw_raw_obj_to_obj(bucket, cur_obj, &head_obj); - head_obj.index_hash_source = obj_str; - - r = prepare_init(store, NULL); - if (r < 0) { - return r; - } - - return 0; -} - -int RGWPutObjProcessor_Multipart::do_complete(size_t accounted_size, - const string& etag, - real_time *mtime, real_time set_mtime, - map& attrs, - real_time delete_at, - const char *if_match, - const char *if_nomatch, const string *user_data, rgw_zone_set *zones_trace) -{ - complete_writing_data(); - - RGWRados::Object op_target(store, s->bucket_info, obj_ctx, head_obj); - op_target.set_versioning_disabled(true); - RGWRados::Object::Write head_obj_op(&op_target); - - head_obj_op.meta.set_mtime = set_mtime; - head_obj_op.meta.mtime = mtime; - head_obj_op.meta.owner = s->owner.get_id(); - head_obj_op.meta.delete_at = delete_at; - head_obj_op.meta.zones_trace = zones_trace; - head_obj_op.meta.modify_tail = true; - - int r = head_obj_op.write_meta(obj_len, accounted_size, attrs); - if (r < 0) - return r; - - bufferlist bl; - RGWUploadPartInfo info; - string p = "part."; - bool sorted_omap = is_v2_upload_id(upload_id); - - if (sorted_omap) { - string err; - int part_num_int = strict_strtol(part_num.c_str(), 10, &err); - if (!err.empty()) { - dout(10) << "bad part number specified: " << part_num << dendl; - return -EINVAL; - } - char buf[32]; - snprintf(buf, sizeof(buf), "%08d", part_num_int); - p.append(buf); - } else { - p.append(part_num); - } - info.num = atoi(part_num.c_str()); - info.etag = etag; - info.size = obj_len; - info.accounted_size = accounted_size; - info.modified = real_clock::now(); - info.manifest = manifest; - - bool compressed; - r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info); - if (r < 0) { - dout(1) << "cannot get compression info" << dendl; - return r; - } - - encode(info, bl); - - string multipart_meta_obj = mp.get_meta(); - - rgw_obj meta_obj; - meta_obj.init_ns(bucket, multipart_meta_obj, mp_ns); - meta_obj.set_in_extra_data(true); - - rgw_raw_obj raw_meta_obj; - - store->obj_to_raw(s->bucket_info.placement_rule, meta_obj, &raw_meta_obj); - const bool must_exist = true;// detect races with abort - r = store->omap_set(raw_meta_obj, p, bl, must_exist); - return r; -} - void RGWPutObj::pre_exec() { rgw_bucket_object_pre_exec(s); diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 61428021c825..e727fe18b287 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -1086,22 +1086,6 @@ public: uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } }; -class RGWPutObj_Filter : public RGWPutObjDataProcessor -{ -protected: - RGWPutObjDataProcessor* next; -public: - explicit RGWPutObj_Filter(RGWPutObjDataProcessor* next) : - next(next){} - ~RGWPutObj_Filter() override {} - int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) override { - return next->handle_data(bl, ofs, phandle, pobj, again); - } - int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) override { - return next->throttle_data(handle, obj, size, need_to_wait); - } -}; /* RGWPutObj_Filter */ - class RGWPostObj : public RGWOp { protected: off_t min_len; @@ -1854,38 +1838,6 @@ extern int rgw_build_object_policies(RGWRados *store, struct req_state *s, extern rgw::IAM::Environment rgw_build_iam_environment(RGWRados* store, struct req_state* s); -static inline int put_data_and_throttle(RGWPutObjDataProcessor *processor, - bufferlist& data, off_t ofs, - bool need_to_wait) -{ - bool again = false; - do { - void *handle = nullptr; - rgw_raw_obj obj; - - uint64_t size = data.length(); - - int ret = processor->handle_data(data, ofs, &handle, &obj, &again); - if (ret < 0) - return ret; - if (handle != nullptr) - { - ret = processor->throttle_data(handle, obj, size, need_to_wait); - if (ret < 0) - return ret; - } - else - break; - need_to_wait = false; /* the need to wait only applies to the first - * iteration */ - } while (again); - - return 0; -} /* put_data_and_throttle */ - - - - static inline int get_system_versioning_params(req_state *s, uint64_t *olh_epoch, diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 7bc52497a14c..770a84334d18 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2464,401 +2464,6 @@ void RGWObjVersionTracker::generate_new_write_ver(CephContext *cct) append_rand_alpha(cct, write_version.tag, write_version.tag, TAG_LEN); } -int RGWPutObjProcessor::complete(size_t accounted_size, const string& etag, - real_time *mtime, real_time set_mtime, - map& attrs, real_time delete_at, - const char *if_match, const char *if_nomatch, const string *user_data, - rgw_zone_set *zones_trace) -{ - int r = do_complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at, if_match, if_nomatch, user_data, zones_trace); - if (r < 0) - return r; - - is_complete = !canceled; - return 0; -} - -CephContext *RGWPutObjProcessor::ctx() -{ - return store->ctx(); -} - -RGWPutObjProcessor_Aio::~RGWPutObjProcessor_Aio() -{ - drain_pending(); - - if (is_complete) - return; - - set::iterator iter; - bool need_to_remove_head = false; - rgw_raw_obj raw_head; - - if (!head_obj.empty()) { - store->obj_to_raw(bucket_info.placement_rule, head_obj, &raw_head); - } - - /** - * We should delete the object in the "multipart" namespace to avoid race condition. - * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart - * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects - * written by the second upload may be deleted by the first upload. - * details is describled on #11749 - * - * The above comment still stands, but instead of searching for a specific object in the multipart - * namespace, we just make sure that we remove the object that is marked as the head object after - * we remove all the other raw objects. Note that we use different call to remove the head object, - * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme. - */ - for (iter = written_objs.begin(); iter != written_objs.end(); ++iter) { - const rgw_raw_obj& obj = *iter; - if (!head_obj.empty() && obj == raw_head) { - ldout(store->ctx(), 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl; - need_to_remove_head = true; - continue; - } - - int r = store->delete_raw_obj(obj); - if (r < 0 && r != -ENOENT) { - ldout(store->ctx(), 5) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl; - } - } - - if (need_to_remove_head) { - ldout(store->ctx(), 5) << "NOTE: we are going to process the head obj (" << raw_head << ")" << dendl; - int r = store->delete_obj(obj_ctx, bucket_info, head_obj, 0, 0); - if (r < 0 && r != -ENOENT) { - ldout(store->ctx(), 0) << "WARNING: failed to remove obj (" << raw_head << "), leaked" << dendl; - } - } -} - -int RGWPutObjProcessor_Aio::handle_obj_data(rgw_raw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive) -{ - if ((uint64_t)abs_ofs + bl.length() > obj_len) - obj_len = abs_ofs + bl.length(); - - if (!(obj == last_written_obj)) { - last_written_obj = obj; - } - - // For the first call pass -1 as the offset to - // do a write_full. - return store->aio_put_obj_data(NULL, obj, bl, ((ofs != 0) ? ofs : -1), exclusive, phandle); -} - -struct put_obj_aio_info RGWPutObjProcessor_Aio::pop_pending() -{ - struct put_obj_aio_info info; - info = pending.front(); - pending.pop_front(); - pending_size -= info.size; - return info; -} - -int RGWPutObjProcessor_Aio::wait_pending_front() -{ - if (pending.empty()) { - return 0; - } - struct put_obj_aio_info info = pop_pending(); - int ret = store->aio_wait(info.handle); - - if (ret >= 0) { - add_written_obj(info.obj); - } - - return ret; -} - -bool RGWPutObjProcessor_Aio::pending_has_completed() -{ - if (pending.empty()) - return false; - - struct put_obj_aio_info& info = pending.front(); - return store->aio_completed(info.handle); -} - -int RGWPutObjProcessor_Aio::drain_pending() -{ - int ret = 0; - while (!pending.empty()) { - int r = wait_pending_front(); - if (r < 0) - ret = r; - } - return ret; -} - -int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) -{ - bool _wait = need_to_wait; - - if (handle) { - struct put_obj_aio_info info; - info.handle = handle; - info.obj = obj; - info.size = size; - pending_size += size; - pending.push_back(info); - } - size_t orig_size = pending_size; - - /* first drain complete IOs */ - while (pending_has_completed()) { - int r = wait_pending_front(); - if (r < 0) - return r; - - _wait = false; - } - - /* resize window in case messages are draining too fast */ - if (orig_size - pending_size >= window_size) { - window_size += store->ctx()->_conf->rgw_max_chunk_size; - uint64_t max_window_size = store->ctx()->_conf->rgw_put_obj_max_window_size; - if (window_size > max_window_size) { - window_size = max_window_size; - } - } - - /* now throttle. Note that need_to_wait should only affect the first IO operation */ - if (pending_size > window_size || _wait) { - int r = wait_pending_front(); - if (r < 0) - return r; - } - return 0; -} - -int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool exclusive) -{ - if (ofs >= next_part_ofs) { - int r = prepare_next_part(ofs); - if (r < 0) { - return r; - } - } - - *pobj = cur_obj; - - if (!bl.length()) { - *phandle = nullptr; - return 0; - } - - return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive); -} - -int RGWPutObjProcessor_Aio::prepare(RGWRados *store, string *oid_rand) -{ - RGWPutObjProcessor::prepare(store, oid_rand); - - window_size = store->ctx()->_conf->rgw_put_obj_min_window_size; - - return 0; -} - -int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) -{ - *phandle = NULL; - uint64_t max_write_size = std::min(max_chunk_size, (uint64_t)next_part_ofs - data_ofs); - - pending_data_bl.claim_append(bl); - if (pending_data_bl.length() < max_write_size) { - *again = false; - return 0; - } - - pending_data_bl.splice(0, max_write_size, &bl); - - /* do we have enough data pending accumulated that needs to be written? */ - *again = (pending_data_bl.length() >= max_chunk_size); - - if (!data_ofs && !immutable_head()) { - first_chunk.claim(bl); - obj_len = (uint64_t)first_chunk.length(); - int r = prepare_next_part(obj_len); - if (r < 0) { - return r; - } - data_ofs = obj_len; - return 0; - } - off_t write_ofs = data_ofs; - data_ofs = write_ofs + bl.length(); - bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there - we could be racing with another upload, to the same - object and cleanup can be messy */ - int ret = write_data(bl, write_ofs, phandle, pobj, exclusive); - if (ret >= 0) { /* we might return, need to clear bl as it was already sent */ - bl.clear(); - } - return ret; -} - - -int RGWPutObjProcessor_Atomic::prepare_init(RGWRados *store, string *oid_rand) -{ - RGWPutObjProcessor_Aio::prepare(store, oid_rand); - - int r = store->get_max_chunk_size(bucket_info.placement_rule, head_obj, &max_chunk_size); - if (r < 0) { - return r; - } - - return 0; -} - -int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, string *oid_rand) -{ - head_obj.init(bucket, obj_str); - - int r = prepare_init(store, oid_rand); - if (r < 0) { - return r; - } - - if (versioned_object) { - if (!version_id.empty()) { - head_obj.key.set_instance(version_id); - } else { - store->gen_rand_obj_instance_name(&head_obj); - version_id = head_obj.key.get_instance(); - } - } - - manifest.set_trivial_rule(max_chunk_size, store->ctx()->_conf->rgw_obj_stripe_size); - - r = manifest_gen.create_begin(store->ctx(), &manifest, bucket_info.placement_rule, head_obj.bucket, head_obj); - if (r < 0) { - return r; - } - - return 0; -} - -int RGWPutObjProcessor_Atomic::prepare_next_part(off_t ofs) { - - int ret = manifest_gen.create_next(ofs); - if (ret < 0) { - lderr(store->ctx()) << "ERROR: manifest_gen.create_next() returned ret=" << ret << dendl; - return ret; - } - cur_part_ofs = ofs; - next_part_ofs = ofs + manifest_gen.cur_stripe_max_size(); - cur_obj = manifest_gen.get_cur_obj(store); - - return 0; -} - -int RGWPutObjProcessor_Atomic::complete_parts() -{ - if (obj_len > (uint64_t)cur_part_ofs) { - return prepare_next_part(obj_len); - } - return 0; -} - -int RGWPutObjProcessor_Atomic::complete_writing_data() -{ - if (!data_ofs && !immutable_head()) { - /* only claim if pending_data_bl() is not empty. This is needed because we might be called twice - * (e.g., when a retry due to race happens). So a second call to first_chunk.claim() would - * clobber first_chunk - */ - if (pending_data_bl.length() > 0) { - first_chunk.claim(pending_data_bl); - } - obj_len = (uint64_t)first_chunk.length(); - } - while (pending_data_bl.length()) { - void *handle = nullptr; - rgw_raw_obj obj; - uint64_t max_write_size = std::min(max_chunk_size, (uint64_t)next_part_ofs - data_ofs); - if (max_write_size > pending_data_bl.length()) { - max_write_size = pending_data_bl.length(); - } - bufferlist bl; - pending_data_bl.splice(0, max_write_size, &bl); - uint64_t write_len = bl.length(); - int r = write_data(bl, data_ofs, &handle, &obj, false); - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl; - return r; - } - data_ofs += write_len; - r = throttle_data(handle, obj, write_len, false); - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl; - return r; - } - - if (data_ofs >= next_part_ofs) { - r = prepare_next_part(data_ofs); - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: prepare_next_part() returned " << r << dendl; - return r; - } - } - } - int r = complete_parts(); - if (r < 0) { - return r; - } - - r = drain_pending(); - if (r < 0) - return r; - - return 0; -} - -int RGWPutObjProcessor_Atomic::do_complete(size_t accounted_size, const string& etag, - real_time *mtime, real_time set_mtime, - map& attrs, - real_time delete_at, - const char *if_match, - const char *if_nomatch, const string *user_data, - rgw_zone_set *zones_trace) { - int r = complete_writing_data(); - if (r < 0) - return r; - - obj_ctx.obj.set_atomic(head_obj); - - RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj); - - /* some object types shouldn't be versioned, e.g., multipart parts */ - op_target.set_versioning_disabled(!versioned_object); - - RGWRados::Object::Write obj_op(&op_target); - - obj_op.meta.data = &first_chunk; - obj_op.meta.manifest = &manifest; - obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */ - obj_op.meta.if_match = if_match; - obj_op.meta.if_nomatch = if_nomatch; - obj_op.meta.mtime = mtime; - obj_op.meta.set_mtime = set_mtime; - obj_op.meta.owner = bucket_info.owner; - obj_op.meta.flags = PUT_OBJ_CREATE; - obj_op.meta.olh_epoch = olh_epoch; - obj_op.meta.delete_at = delete_at; - obj_op.meta.user_data = user_data; - obj_op.meta.zones_trace = zones_trace; - obj_op.meta.modify_tail = true; - - r = obj_op.write_meta(obj_len, accounted_size, attrs); - if (r < 0) { - return r; - } - - canceled = obj_op.meta.canceled; - - return 0; -} int RGWRados::watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx) { int r = control_pool_ctx.watch2(oid, watch_handle, ctx); diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 641fc9a79bed..ccf067d197f1 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -3818,168 +3818,6 @@ public: } }; /* RGWChainedCacheImpl */ -/** - * Base of PUT operation. - * Allow to create chained data transformers like compresors and encryptors. - */ -class RGWPutObjDataProcessor -{ -public: - RGWPutObjDataProcessor(){} - virtual ~RGWPutObjDataProcessor(){} - virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) = 0; - virtual int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) = 0; -}; /* RGWPutObjDataProcessor */ - - -class RGWPutObjProcessor : public RGWPutObjDataProcessor -{ -protected: - RGWRados *store; - RGWObjectCtx& obj_ctx; - bool is_complete; - RGWBucketInfo bucket_info; - bool canceled; - - virtual int do_complete(size_t accounted_size, const string& etag, - ceph::real_time *mtime, ceph::real_time set_mtime, - map& attrs, ceph::real_time delete_at, - const char *if_match, const char *if_nomatch, const string *user_data, - rgw_zone_set* zones_trace = nullptr) = 0; - -public: - RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), - obj_ctx(_obj_ctx), - is_complete(false), - bucket_info(_bi), - canceled(false) {} - ~RGWPutObjProcessor() override {} - virtual int prepare(RGWRados *_store, string *oid_rand) { - store = _store; - return 0; - } - - int complete(size_t accounted_size, const string& etag, - ceph::real_time *mtime, ceph::real_time set_mtime, - map& attrs, ceph::real_time delete_at, - const char *if_match = NULL, const char *if_nomatch = NULL, const string *user_data = nullptr, - rgw_zone_set *zones_trace = nullptr); - - CephContext *ctx(); - - bool is_canceled() { return canceled; } -}; /* RGWPutObjProcessor */ - -struct put_obj_aio_info { - void *handle; - rgw_raw_obj obj; - uint64_t size; -}; - -#define RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT (16 * 1024 * 1024) - -class RGWPutObjProcessor_Aio : public RGWPutObjProcessor -{ - list pending; - uint64_t window_size{RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT}; - uint64_t pending_size{0}; - - struct put_obj_aio_info pop_pending(); - int wait_pending_front(); - bool pending_has_completed(); - - rgw_raw_obj last_written_obj; - -protected: - uint64_t obj_len{0}; - - set written_objs; - rgw_obj head_obj; - - void add_written_obj(const rgw_raw_obj& obj) { - written_objs.insert(obj); - } - - int drain_pending(); - int handle_obj_data(rgw_raw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive); - -public: - int prepare(RGWRados *store, string *oid_rand) override; - int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) override; - - RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info) {} - ~RGWPutObjProcessor_Aio() override; -}; /* RGWPutObjProcessor_Aio */ - -class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio -{ - bufferlist first_chunk; - uint64_t part_size; - off_t cur_part_ofs; - off_t next_part_ofs; - int cur_part_id; - off_t data_ofs; - - bufferlist pending_data_bl; - uint64_t max_chunk_size; - - bool versioned_object; - std::optional olh_epoch; - string version_id; - -protected: - rgw_bucket bucket; - string obj_str; - - string unique_tag; - - rgw_raw_obj cur_obj; - RGWObjManifest manifest; - RGWObjManifest::generator manifest_gen; - - int write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool exclusive); - int do_complete(size_t accounted_size, const string& etag, - ceph::real_time *mtime, ceph::real_time set_mtime, - map& attrs, ceph::real_time delete_at, - const char *if_match, const char *if_nomatch, const string *user_data, rgw_zone_set *zones_trace) override; - - int prepare_next_part(off_t ofs); - int complete_parts(); - int complete_writing_data(); - - int prepare_init(RGWRados *store, string *oid_rand); - -public: - ~RGWPutObjProcessor_Atomic() override {} - RGWPutObjProcessor_Atomic(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, - rgw_bucket& _b, const string& _o, uint64_t _p, const string& _t, bool versioned) : - RGWPutObjProcessor_Aio(obj_ctx, bucket_info), - part_size(_p), - cur_part_ofs(0), - next_part_ofs(_p), - cur_part_id(0), - data_ofs(0), - max_chunk_size(0), - versioned_object(versioned), - bucket(_b), - obj_str(_o), - unique_tag(_t) {} - int prepare(RGWRados *store, string *oid_rand) override; - virtual bool immutable_head() { return false; } - int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) override; - - void set_olh_epoch(uint64_t epoch) { - olh_epoch = epoch; - } - - void set_version_id(const string& vid) { - version_id = vid; - } - - const string& get_version_id() const { - return version_id; - } -}; /* RGWPutObjProcessor_Atomic */ #define MP_META_SUFFIX ".meta" @@ -4047,26 +3885,6 @@ public: } }; -class RGWPutObjProcessor_Multipart : public RGWPutObjProcessor_Atomic -{ - string part_num; - RGWMPObj mp; - req_state *s; - string upload_id; - -protected: - int prepare(RGWRados *store, string *oid_rand) override; - int do_complete(size_t accounted_size, const string& etag, - ceph::real_time *mtime, ceph::real_time set_mtime, - map& attrs, ceph::real_time delete_at, - const char *if_match, const char *if_nomatch, const string *user_data, - rgw_zone_set *zones_trace) override; -public: - bool immutable_head() override { return true; } - RGWPutObjProcessor_Multipart(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, uint64_t _p, req_state *_s) : - RGWPutObjProcessor_Atomic(obj_ctx, bucket_info, _s->bucket, _s->object.name, _p, _s->req_id, false), s(_s) {} - void get_mp(RGWMPObj** _mp); -}; /* RGWPutObjProcessor_Multipart */ class RGWRadosThread { class Worker : public Thread {