return 0;
}
-void RGWPutObjProcessor_Multipart::get_mp(RGWMPObj** _mp){
- *_mp = &mp;
-}
-
-int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, string *oid_rand)
-{
- string oid = obj_str;
- upload_id = s->info.args.get("uploadId");
- if (!oid_rand) {
- mp.init(oid, upload_id);
- } else {
- mp.init(oid, upload_id, *oid_rand);
- }
-
- part_num = s->info.args.get("partNumber");
- if (part_num.empty()) {
- ldpp_dout(s, 10) << "part number is empty" << dendl;
- return -EINVAL;
- }
-
- string err;
- uint64_t num = (uint64_t)strict_strtol(part_num.c_str(), 10, &err);
-
- if (!err.empty()) {
- ldpp_dout(s, 10) << "bad part number: " << part_num << ": " << err << dendl;
- return -EINVAL;
- }
-
- string upload_prefix = oid + ".";
-
- if (!oid_rand) {
- upload_prefix.append(upload_id);
- } else {
- upload_prefix.append(*oid_rand);
- }
-
- rgw_obj target_obj;
- target_obj.init(bucket, oid);
-
- manifest.set_prefix(upload_prefix);
-
- manifest.set_multipart_part_rule(store->ctx()->_conf->rgw_obj_stripe_size, num);
-
- int r = manifest_gen.create_begin(store->ctx(), &manifest, s->bucket_info.placement_rule, bucket, target_obj);
- if (r < 0) {
- return r;
- }
-
- cur_obj = manifest_gen.get_cur_obj(store);
- rgw_raw_obj_to_obj(bucket, cur_obj, &head_obj);
- head_obj.index_hash_source = obj_str;
-
- r = prepare_init(store, NULL);
- if (r < 0) {
- return r;
- }
-
- return 0;
-}
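
A standalone sketch of the prepare() logic above (plain C++ with hypothetical names, not the RGW API): the multipart stripe prefix is the object name plus either the upload id or a caller-supplied random suffix, and a non-numeric partNumber is rejected. The real code parses with strict_strtol; std::strtoul stands in here.

#include <cstdlib>
#include <iostream>
#include <optional>
#include <string>

// Hypothetical helper mirroring RGWPutObjProcessor_Multipart::prepare():
// build the upload prefix and validate the part number; returns std::nullopt
// on a malformed part number.
static std::optional<std::string> make_upload_prefix(const std::string& oid,
                                                     const std::string& upload_id,
                                                     const std::string* oid_rand,
                                                     const std::string& part_num_str,
                                                     unsigned long* part_num_out) {
  char* end = nullptr;
  unsigned long num = std::strtoul(part_num_str.c_str(), &end, 10);
  if (part_num_str.empty() || *end != '\0') {
    return std::nullopt;                               // reject "", "abc", "3x"
  }
  *part_num_out = num;
  // The prefix keys all raw stripes of this part in the data pool.
  return oid + "." + (oid_rand ? *oid_rand : upload_id);
}

int main() {
  unsigned long num = 0;
  auto prefix = make_upload_prefix("photo.jpg", "2~abc123", nullptr, "7", &num);
  if (prefix) {
    std::cout << "prefix=" << *prefix << " part=" << num << "\n";  // photo.jpg.2~abc123 part=7
  }
}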
-
-int RGWPutObjProcessor_Multipart::do_complete(size_t accounted_size,
- const string& etag,
- real_time *mtime, real_time set_mtime,
- map<string, bufferlist>& attrs,
- real_time delete_at,
- const char *if_match,
- const char *if_nomatch, const string *user_data, rgw_zone_set *zones_trace)
-{
- complete_writing_data();
-
- RGWRados::Object op_target(store, s->bucket_info, obj_ctx, head_obj);
- op_target.set_versioning_disabled(true);
- RGWRados::Object::Write head_obj_op(&op_target);
-
- head_obj_op.meta.set_mtime = set_mtime;
- head_obj_op.meta.mtime = mtime;
- head_obj_op.meta.owner = s->owner.get_id();
- head_obj_op.meta.delete_at = delete_at;
- head_obj_op.meta.zones_trace = zones_trace;
- head_obj_op.meta.modify_tail = true;
-
- int r = head_obj_op.write_meta(obj_len, accounted_size, attrs);
- if (r < 0)
- return r;
-
- bufferlist bl;
- RGWUploadPartInfo info;
- string p = "part.";
- bool sorted_omap = is_v2_upload_id(upload_id);
-
- if (sorted_omap) {
- string err;
- int part_num_int = strict_strtol(part_num.c_str(), 10, &err);
- if (!err.empty()) {
- dout(10) << "bad part number specified: " << part_num << dendl;
- return -EINVAL;
- }
- char buf[32];
- snprintf(buf, sizeof(buf), "%08d", part_num_int);
- p.append(buf);
- } else {
- p.append(part_num);
- }
- info.num = atoi(part_num.c_str());
- info.etag = etag;
- info.size = obj_len;
- info.accounted_size = accounted_size;
- info.modified = real_clock::now();
- info.manifest = manifest;
-
- bool compressed;
- r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info);
- if (r < 0) {
- dout(1) << "cannot get compression info" << dendl;
- return r;
- }
-
- encode(info, bl);
-
- string multipart_meta_obj = mp.get_meta();
-
- rgw_obj meta_obj;
- meta_obj.init_ns(bucket, multipart_meta_obj, mp_ns);
- meta_obj.set_in_extra_data(true);
-
- rgw_raw_obj raw_meta_obj;
-
- store->obj_to_raw(s->bucket_info.placement_rule, meta_obj, &raw_meta_obj);
- const bool must_exist = true; // detect races with abort
- r = store->omap_set(raw_meta_obj, p, bl, must_exist);
- return r;
-}
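
For reference, a self-contained sketch (standard C++ only, hypothetical helper) of the part key naming in do_complete() above: v2-style upload ids get a zero-padded "part.%08d" omap key so part entries sort numerically, while legacy uploads keep the raw part number string.

#include <cstdio>
#include <iostream>
#include <string>

// Hypothetical helper: build the omap key for a part entry, assuming the
// caller has already validated part_num as an integer string.
static std::string part_omap_key(int part_num, bool sorted_omap) {
  std::string key = "part.";
  if (sorted_omap) {
    char buf[32];
    std::snprintf(buf, sizeof(buf), "%08d", part_num);  // "00000007" sorts correctly
    key += buf;
  } else {
    key += std::to_string(part_num);                    // legacy: "7"
  }
  return key;
}

int main() {
  std::cout << part_omap_key(7, true) << "\n";   // part.00000007
  std::cout << part_omap_key(7, false) << "\n";  // part.7
}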
-
void RGWPutObj::pre_exec()
{
rgw_bucket_object_pre_exec(s);
uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
};
-class RGWPutObj_Filter : public RGWPutObjDataProcessor
-{
-protected:
- RGWPutObjDataProcessor* next;
-public:
- explicit RGWPutObj_Filter(RGWPutObjDataProcessor* next) :
- next(next){}
- ~RGWPutObj_Filter() override {}
- int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) override {
- return next->handle_data(bl, ofs, phandle, pobj, again);
- }
- int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) override {
- return next->throttle_data(handle, obj, size, need_to_wait);
- }
-}; /* RGWPutObj_Filter */
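
A compressor or encryptor filter is built by subclassing this type, transforming the buffer, and delegating to next. A minimal standalone sketch of the same decorator shape (plain C++, hypothetical class names, no RGW types):

#include <cctype>
#include <iostream>
#include <string>

// Hypothetical minimal processor interface standing in for RGWPutObjDataProcessor.
struct DataProcessor {
  virtual ~DataProcessor() = default;
  virtual int handle_data(std::string& bl, long ofs) = 0;
};

// Terminal stage: "writes" the data.
struct Writer : DataProcessor {
  int handle_data(std::string& bl, long ofs) override {
    std::cout << "write " << bl.size() << " bytes at " << ofs << "\n";
    return 0;
  }
};

// Filter stage: transform, then delegate to the next stage (decorator pattern).
struct UpperCaseFilter : DataProcessor {
  DataProcessor* next;
  explicit UpperCaseFilter(DataProcessor* n) : next(n) {}
  int handle_data(std::string& bl, long ofs) override {
    for (char& c : bl)
      c = static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
    return next->handle_data(bl, ofs);   // chain to the next processor
  }
};

int main() {
  Writer w;
  UpperCaseFilter f(&w);       // filters can be stacked: filter -> filter -> writer
  std::string data = "hello";
  return f.handle_data(data, 0);
}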
-
class RGWPostObj : public RGWOp {
protected:
off_t min_len;
extern rgw::IAM::Environment rgw_build_iam_environment(RGWRados* store,
struct req_state* s);
-static inline int put_data_and_throttle(RGWPutObjDataProcessor *processor,
- bufferlist& data, off_t ofs,
- bool need_to_wait)
-{
- bool again = false;
- do {
- void *handle = nullptr;
- rgw_raw_obj obj;
-
- uint64_t size = data.length();
-
- int ret = processor->handle_data(data, ofs, &handle, &obj, &again);
- if (ret < 0)
- return ret;
- if (handle != nullptr)
- {
- ret = processor->throttle_data(handle, obj, size, need_to_wait);
- if (ret < 0)
- return ret;
- }
- else
- break;
- need_to_wait = false; /* the need to wait only applies to the first
- * iteration */
- } while (again);
-
- return 0;
-} /* put_data_and_throttle */
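
A standalone sketch of the same feed contract (mock processor, hypothetical names, not the RGW types): handle_data() may buffer input and ask to be called again, and the blocking hint applies only to the first IO of a call.

#include <iostream>
#include <string>

// Mock processor: buffers input and emits at most one fixed-size chunk per
// handle_data() call, setting *again while more full chunks remain buffered.
struct MockProcessor {
  static constexpr size_t chunk = 4;
  std::string pending;
  int handle_data(std::string& bl, void** handle, bool* again) {
    pending += bl;
    bl.clear();
    if (pending.size() < chunk) { *handle = nullptr; *again = false; return 0; }
    static std::string out;
    out = pending.substr(0, chunk);
    pending.erase(0, chunk);
    *handle = &out;                       // non-null handle: an IO was issued
    *again = pending.size() >= chunk;     // more buffered data to drain
    return 0;
  }
  int throttle_data(void* handle, bool need_to_wait) {
    auto* s = static_cast<std::string*>(handle);
    std::cout << (need_to_wait ? "[wait] " : "") << "flush '" << *s << "'\n";
    return 0;
  }
};

// Same shape as put_data_and_throttle(): loop until the processor stops
// asking to be called again, waiting only on the first iteration.
static int feed(MockProcessor& p, std::string data, bool need_to_wait) {
  bool again = false;
  do {
    void* handle = nullptr;
    int r = p.handle_data(data, &handle, &again);
    if (r < 0) return r;
    if (!handle) break;
    r = p.throttle_data(handle, need_to_wait);
    if (r < 0) return r;
    need_to_wait = false;   // only the first IO of this call may block
  } while (again);
  return 0;
}

int main() {
  MockProcessor p;
  feed(p, "abcdefghij", true);   // emits "abcd", "efgh"; "ij" stays pending
}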
-
-
-
-
static inline int get_system_versioning_params(req_state *s,
uint64_t *olh_epoch,
append_rand_alpha(cct, write_version.tag, write_version.tag, TAG_LEN);
}
-int RGWPutObjProcessor::complete(size_t accounted_size, const string& etag,
- real_time *mtime, real_time set_mtime,
- map<string, bufferlist>& attrs, real_time delete_at,
- const char *if_match, const char *if_nomatch, const string *user_data,
- rgw_zone_set *zones_trace)
-{
- int r = do_complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at, if_match, if_nomatch, user_data, zones_trace);
- if (r < 0)
- return r;
-
- is_complete = !canceled;
- return 0;
-}
-
-CephContext *RGWPutObjProcessor::ctx()
-{
- return store->ctx();
-}
-
-RGWPutObjProcessor_Aio::~RGWPutObjProcessor_Aio()
-{
- drain_pending();
-
- if (is_complete)
- return;
-
- set<rgw_raw_obj>::iterator iter;
- bool need_to_remove_head = false;
- rgw_raw_obj raw_head;
-
- if (!head_obj.empty()) {
- store->obj_to_raw(bucket_info.placement_rule, head_obj, &raw_head);
- }
-
- /**
- * We should delete the object in the "multipart" namespace to avoid a race condition.
- * The race is caused by the fact that the multipart object is the gatekeeper of a multipart
- * upload: once it is deleted, a second upload could start with the same suffix("2/"), and objects
- * written by the second upload may then be deleted by the first upload.
- * Details are described in #11749.
- *
- * The above comment still stands, but instead of searching for a specific object in the multipart
- * namespace, we just make sure that we remove the object that is marked as the head object after
- * we remove all the other raw objects. Note that we use different call to remove the head object,
- * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
- */
- for (iter = written_objs.begin(); iter != written_objs.end(); ++iter) {
- const rgw_raw_obj& obj = *iter;
- if (!head_obj.empty() && obj == raw_head) {
- ldout(store->ctx(), 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl;
- need_to_remove_head = true;
- continue;
- }
-
- int r = store->delete_raw_obj(obj);
- if (r < 0 && r != -ENOENT) {
- ldout(store->ctx(), 5) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
- }
- }
-
- if (need_to_remove_head) {
- ldout(store->ctx(), 5) << "NOTE: we are going to process the head obj (" << raw_head << ")" << dendl;
- int r = store->delete_obj(obj_ctx, bucket_info, head_obj, 0, 0);
- if (r < 0 && r != -ENOENT) {
- ldout(store->ctx(), 0) << "WARNING: failed to remove obj (" << raw_head << "), leaked" << dendl;
- }
- }
-}
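
A minimal sketch of that cleanup ordering (plain C++, hypothetical names): tail raw objects are removed with plain deletes first, and the head object, which must go through the bucket index, is removed last.

#include <iostream>
#include <set>
#include <string>

// Hypothetical sketch of the abort/cleanup ordering used by the destructor
// above: delete tail objects first, then the head object, if one was written.
static void cleanup(const std::set<std::string>& written, const std::string& head) {
  bool remove_head = false;
  for (const auto& obj : written) {
    if (!head.empty() && obj == head) { remove_head = true; continue; }  // defer
    std::cout << "delete tail " << obj << "\n";      // plain raw-object delete
  }
  if (remove_head)
    std::cout << "delete head " << head << "\n";     // goes via the bucket index
}

int main() {
  cleanup({"obj.1", "obj.head", "obj.2"}, "obj.head");
}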
-
-int RGWPutObjProcessor_Aio::handle_obj_data(rgw_raw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive)
-{
- if ((uint64_t)abs_ofs + bl.length() > obj_len)
- obj_len = abs_ofs + bl.length();
-
- if (!(obj == last_written_obj)) {
- last_written_obj = obj;
- }
-
- // For the first call pass -1 as the offset to
- // do a write_full.
- return store->aio_put_obj_data(NULL, obj, bl, ((ofs != 0) ? ofs : -1), exclusive, phandle);
-}
-
-struct put_obj_aio_info RGWPutObjProcessor_Aio::pop_pending()
-{
- struct put_obj_aio_info info;
- info = pending.front();
- pending.pop_front();
- pending_size -= info.size;
- return info;
-}
-
-int RGWPutObjProcessor_Aio::wait_pending_front()
-{
- if (pending.empty()) {
- return 0;
- }
- struct put_obj_aio_info info = pop_pending();
- int ret = store->aio_wait(info.handle);
-
- if (ret >= 0) {
- add_written_obj(info.obj);
- }
-
- return ret;
-}
-
-bool RGWPutObjProcessor_Aio::pending_has_completed()
-{
- if (pending.empty())
- return false;
-
- struct put_obj_aio_info& info = pending.front();
- return store->aio_completed(info.handle);
-}
-
-int RGWPutObjProcessor_Aio::drain_pending()
-{
- int ret = 0;
- while (!pending.empty()) {
- int r = wait_pending_front();
- if (r < 0)
- ret = r;
- }
- return ret;
-}
-
-int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait)
-{
- bool _wait = need_to_wait;
-
- if (handle) {
- struct put_obj_aio_info info;
- info.handle = handle;
- info.obj = obj;
- info.size = size;
- pending_size += size;
- pending.push_back(info);
- }
- size_t orig_size = pending_size;
-
- /* first drain complete IOs */
- while (pending_has_completed()) {
- int r = wait_pending_front();
- if (r < 0)
- return r;
-
- _wait = false;
- }
-
- /* resize window in case messages are draining too fast */
- if (orig_size - pending_size >= window_size) {
- window_size += store->ctx()->_conf->rgw_max_chunk_size;
- uint64_t max_window_size = store->ctx()->_conf->rgw_put_obj_max_window_size;
- if (window_size > max_window_size) {
- window_size = max_window_size;
- }
- }
-
- /* now throttle. Note that need_to_wait should only affect the first IO operation */
- if (pending_size > window_size || _wait) {
- int r = wait_pending_front();
- if (r < 0)
- return r;
- }
- return 0;
-}
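
A standalone sketch of the windowed throttle policy above (plain C++, simulated IOs, hypothetical names; the real bounds come from rgw_put_obj_min_window_size and rgw_put_obj_max_window_size): completed IOs are reaped opportunistically, the window grows when a full window's worth drains at once, and at most one blocking wait is taken per call.

#include <algorithm>
#include <cstdint>
#include <deque>
#include <iostream>

struct Throttle {
  struct Io { uint64_t size; bool done; };
  std::deque<Io> pending;                      // in-flight IOs, oldest first
  uint64_t pending_size = 0;
  uint64_t window_size = 16ull << 20;          // e.g. rgw_put_obj_min_window_size
  static constexpr uint64_t max_window = 64ull << 20;
  static constexpr uint64_t chunk      = 4ull << 20;

  void wait_front() {                          // blocking reap of the oldest IO
    pending_size -= pending.front().size;
    pending.pop_front();
  }

  int throttle(uint64_t size, bool need_to_wait) {
    pending.push_back({size, false});
    pending_size += size;
    uint64_t orig = pending_size;

    while (!pending.empty() && pending.front().done) {  // reap completed IOs
      wait_front();
      need_to_wait = false;
    }
    if (orig - pending_size >= window_size)    // draining fast: widen the window
      window_size = std::min(window_size + chunk, max_window);

    if (pending_size > window_size || need_to_wait)     // enforce the limit
      wait_front();
    return 0;
  }
};

int main() {
  Throttle t;
  for (int i = 0; i < 8; ++i)
    t.throttle(4ull << 20, i == 0);            // wait only on the first IO
  std::cout << "in flight: " << (t.pending_size >> 20) << " MiB\n";
}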
-
-int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool exclusive)
-{
- if (ofs >= next_part_ofs) {
- int r = prepare_next_part(ofs);
- if (r < 0) {
- return r;
- }
- }
-
- *pobj = cur_obj;
-
- if (!bl.length()) {
- *phandle = nullptr;
- return 0;
- }
-
- return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
-}
-
-int RGWPutObjProcessor_Aio::prepare(RGWRados *store, string *oid_rand)
-{
- RGWPutObjProcessor::prepare(store, oid_rand);
-
- window_size = store->ctx()->_conf->rgw_put_obj_min_window_size;
-
- return 0;
-}
-
-int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again)
-{
- *phandle = NULL;
- uint64_t max_write_size = std::min(max_chunk_size, (uint64_t)next_part_ofs - data_ofs);
-
- pending_data_bl.claim_append(bl);
- if (pending_data_bl.length() < max_write_size) {
- *again = false;
- return 0;
- }
-
- pending_data_bl.splice(0, max_write_size, &bl);
-
- /* do we have enough data pending accumulated that needs to be written? */
- *again = (pending_data_bl.length() >= max_chunk_size);
-
- if (!data_ofs && !immutable_head()) {
- first_chunk.claim(bl);
- obj_len = (uint64_t)first_chunk.length();
- int r = prepare_next_part(obj_len);
- if (r < 0) {
- return r;
- }
- data_ofs = obj_len;
- return 0;
- }
- off_t write_ofs = data_ofs;
- data_ofs = write_ofs + bl.length();
- bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
- we could be racing with another upload, to the same
- object and cleanup can be messy */
- int ret = write_data(bl, write_ofs, phandle, pobj, exclusive);
- if (ret >= 0) { /* we might return, need to clear bl as it was already sent */
- bl.clear();
- }
- return ret;
-}
-
-
-int RGWPutObjProcessor_Atomic::prepare_init(RGWRados *store, string *oid_rand)
-{
- RGWPutObjProcessor_Aio::prepare(store, oid_rand);
-
- int r = store->get_max_chunk_size(bucket_info.placement_rule, head_obj, &max_chunk_size);
- if (r < 0) {
- return r;
- }
-
- return 0;
-}
-
-int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, string *oid_rand)
-{
- head_obj.init(bucket, obj_str);
-
- int r = prepare_init(store, oid_rand);
- if (r < 0) {
- return r;
- }
-
- if (versioned_object) {
- if (!version_id.empty()) {
- head_obj.key.set_instance(version_id);
- } else {
- store->gen_rand_obj_instance_name(&head_obj);
- version_id = head_obj.key.get_instance();
- }
- }
-
- manifest.set_trivial_rule(max_chunk_size, store->ctx()->_conf->rgw_obj_stripe_size);
-
- r = manifest_gen.create_begin(store->ctx(), &manifest, bucket_info.placement_rule, head_obj.bucket, head_obj);
- if (r < 0) {
- return r;
- }
-
- return 0;
-}
-
-int RGWPutObjProcessor_Atomic::prepare_next_part(off_t ofs) {
-
- int ret = manifest_gen.create_next(ofs);
- if (ret < 0) {
- lderr(store->ctx()) << "ERROR: manifest_gen.create_next() returned ret=" << ret << dendl;
- return ret;
- }
- cur_part_ofs = ofs;
- next_part_ofs = ofs + manifest_gen.cur_stripe_max_size();
- cur_obj = manifest_gen.get_cur_obj(store);
-
- return 0;
-}
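
A simplified sketch of the striping bookkeeping in prepare_next_part() (plain C++, hypothetical names; in the real manifest generator the head object and tail stripes may have different sizes): when the sequential write offset reaches next_part_ofs, the cursor switches to a new stripe and advances the boundary by the stripe size.

#include <cstdint>
#include <iostream>

struct StripeCursor {
  uint64_t stripe_size;
  uint64_t cur_part_ofs = 0;
  uint64_t next_part_ofs;
  int cur_stripe = 0;                    // 0 == head object, >0 == tail stripes

  explicit StripeCursor(uint64_t s) : stripe_size(s), next_part_ofs(s) {}

  // Returns the stripe index for a sequential write at 'ofs'.
  int stripe_for(uint64_t ofs) {
    while (ofs >= next_part_ofs) {       // crossed a boundary: open next stripe
      cur_part_ofs = next_part_ofs;
      next_part_ofs += stripe_size;
      ++cur_stripe;
    }
    return cur_stripe;
  }
};

int main() {
  StripeCursor c(4ull << 20);                       // 4 MiB stripes
  std::cout << c.stripe_for(0) << "\n";             // 0 -> head object
  std::cout << c.stripe_for(5ull << 20) << "\n";    // 1 -> first tail stripe
  std::cout << c.stripe_for(9ull << 20) << "\n";    // 2 -> second tail stripe
}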
-
-int RGWPutObjProcessor_Atomic::complete_parts()
-{
- if (obj_len > (uint64_t)cur_part_ofs) {
- return prepare_next_part(obj_len);
- }
- return 0;
-}
-
-int RGWPutObjProcessor_Atomic::complete_writing_data()
-{
- if (!data_ofs && !immutable_head()) {
- /* only claim if pending_data_bl is not empty. This is needed because we might be called twice
- * (e.g., when a retry due to race happens). So a second call to first_chunk.claim() would
- * clobber first_chunk
- */
- if (pending_data_bl.length() > 0) {
- first_chunk.claim(pending_data_bl);
- }
- obj_len = (uint64_t)first_chunk.length();
- }
- while (pending_data_bl.length()) {
- void *handle = nullptr;
- rgw_raw_obj obj;
- uint64_t max_write_size = std::min(max_chunk_size, (uint64_t)next_part_ofs - data_ofs);
- if (max_write_size > pending_data_bl.length()) {
- max_write_size = pending_data_bl.length();
- }
- bufferlist bl;
- pending_data_bl.splice(0, max_write_size, &bl);
- uint64_t write_len = bl.length();
- int r = write_data(bl, data_ofs, &handle, &obj, false);
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
- return r;
- }
- data_ofs += write_len;
- r = throttle_data(handle, obj, write_len, false);
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
- return r;
- }
-
- if (data_ofs >= next_part_ofs) {
- r = prepare_next_part(data_ofs);
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: prepare_next_part() returned " << r << dendl;
- return r;
- }
- }
- }
- int r = complete_parts();
- if (r < 0) {
- return r;
- }
-
- r = drain_pending();
- if (r < 0)
- return r;
-
- return 0;
-}
-
-int RGWPutObjProcessor_Atomic::do_complete(size_t accounted_size, const string& etag,
- real_time *mtime, real_time set_mtime,
- map<string, bufferlist>& attrs,
- real_time delete_at,
- const char *if_match,
- const char *if_nomatch, const string *user_data,
- rgw_zone_set *zones_trace) {
- int r = complete_writing_data();
- if (r < 0)
- return r;
-
- obj_ctx.obj.set_atomic(head_obj);
-
- RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj);
-
- /* some object types shouldn't be versioned, e.g., multipart parts */
- op_target.set_versioning_disabled(!versioned_object);
-
- RGWRados::Object::Write obj_op(&op_target);
-
- obj_op.meta.data = &first_chunk;
- obj_op.meta.manifest = &manifest;
- obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
- obj_op.meta.if_match = if_match;
- obj_op.meta.if_nomatch = if_nomatch;
- obj_op.meta.mtime = mtime;
- obj_op.meta.set_mtime = set_mtime;
- obj_op.meta.owner = bucket_info.owner;
- obj_op.meta.flags = PUT_OBJ_CREATE;
- obj_op.meta.olh_epoch = olh_epoch;
- obj_op.meta.delete_at = delete_at;
- obj_op.meta.user_data = user_data;
- obj_op.meta.zones_trace = zones_trace;
- obj_op.meta.modify_tail = true;
-
- r = obj_op.write_meta(obj_len, accounted_size, attrs);
- if (r < 0) {
- return r;
- }
-
- canceled = obj_op.meta.canceled;
-
- return 0;
-}
int RGWRados::watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx) {
int r = control_pool_ctx.watch2(oid, watch_handle, ctx);
}
}; /* RGWChainedCacheImpl */
-/**
- * Base of PUT operation.
- * Allows creating chained data transformers such as compressors and encryptors.
- */
-class RGWPutObjDataProcessor
-{
-public:
- RGWPutObjDataProcessor(){}
- virtual ~RGWPutObjDataProcessor(){}
- virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) = 0;
- virtual int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) = 0;
-}; /* RGWPutObjDataProcessor */
-
-
-class RGWPutObjProcessor : public RGWPutObjDataProcessor
-{
-protected:
- RGWRados *store;
- RGWObjectCtx& obj_ctx;
- bool is_complete;
- RGWBucketInfo bucket_info;
- bool canceled;
-
- virtual int do_complete(size_t accounted_size, const string& etag,
- ceph::real_time *mtime, ceph::real_time set_mtime,
- map<string, bufferlist>& attrs, ceph::real_time delete_at,
- const char *if_match, const char *if_nomatch, const string *user_data,
- rgw_zone_set* zones_trace = nullptr) = 0;
-
-public:
- RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL),
- obj_ctx(_obj_ctx),
- is_complete(false),
- bucket_info(_bi),
- canceled(false) {}
- ~RGWPutObjProcessor() override {}
- virtual int prepare(RGWRados *_store, string *oid_rand) {
- store = _store;
- return 0;
- }
-
- int complete(size_t accounted_size, const string& etag,
- ceph::real_time *mtime, ceph::real_time set_mtime,
- map<string, bufferlist>& attrs, ceph::real_time delete_at,
- const char *if_match = NULL, const char *if_nomatch = NULL, const string *user_data = nullptr,
- rgw_zone_set *zones_trace = nullptr);
-
- CephContext *ctx();
-
- bool is_canceled() { return canceled; }
-}; /* RGWPutObjProcessor */
-
-struct put_obj_aio_info {
- void *handle;
- rgw_raw_obj obj;
- uint64_t size;
-};
-
-#define RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT (16 * 1024 * 1024)
-
-class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
-{
- list<struct put_obj_aio_info> pending;
- uint64_t window_size{RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT};
- uint64_t pending_size{0};
-
- struct put_obj_aio_info pop_pending();
- int wait_pending_front();
- bool pending_has_completed();
-
- rgw_raw_obj last_written_obj;
-
-protected:
- uint64_t obj_len{0};
-
- set<rgw_raw_obj> written_objs;
- rgw_obj head_obj;
-
- void add_written_obj(const rgw_raw_obj& obj) {
- written_objs.insert(obj);
- }
-
- int drain_pending();
- int handle_obj_data(rgw_raw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
-
-public:
- int prepare(RGWRados *store, string *oid_rand) override;
- int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) override;
-
- RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info) {}
- ~RGWPutObjProcessor_Aio() override;
-}; /* RGWPutObjProcessor_Aio */
-
-class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio
-{
- bufferlist first_chunk;
- uint64_t part_size;
- off_t cur_part_ofs;
- off_t next_part_ofs;
- int cur_part_id;
- off_t data_ofs;
-
- bufferlist pending_data_bl;
- uint64_t max_chunk_size;
-
- bool versioned_object;
- std::optional<uint64_t> olh_epoch;
- string version_id;
-
-protected:
- rgw_bucket bucket;
- string obj_str;
-
- string unique_tag;
-
- rgw_raw_obj cur_obj;
- RGWObjManifest manifest;
- RGWObjManifest::generator manifest_gen;
-
- int write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool exclusive);
- int do_complete(size_t accounted_size, const string& etag,
- ceph::real_time *mtime, ceph::real_time set_mtime,
- map<string, bufferlist>& attrs, ceph::real_time delete_at,
- const char *if_match, const char *if_nomatch, const string *user_data, rgw_zone_set *zones_trace) override;
-
- int prepare_next_part(off_t ofs);
- int complete_parts();
- int complete_writing_data();
-
- int prepare_init(RGWRados *store, string *oid_rand);
-
-public:
- ~RGWPutObjProcessor_Atomic() override {}
- RGWPutObjProcessor_Atomic(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info,
- rgw_bucket& _b, const string& _o, uint64_t _p, const string& _t, bool versioned) :
- RGWPutObjProcessor_Aio(obj_ctx, bucket_info),
- part_size(_p),
- cur_part_ofs(0),
- next_part_ofs(_p),
- cur_part_id(0),
- data_ofs(0),
- max_chunk_size(0),
- versioned_object(versioned),
- bucket(_b),
- obj_str(_o),
- unique_tag(_t) {}
- int prepare(RGWRados *store, string *oid_rand) override;
- virtual bool immutable_head() { return false; }
- int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) override;
-
- void set_olh_epoch(uint64_t epoch) {
- olh_epoch = epoch;
- }
-
- void set_version_id(const string& vid) {
- version_id = vid;
- }
-
- const string& get_version_id() const {
- return version_id;
- }
-}; /* RGWPutObjProcessor_Atomic */
#define MP_META_SUFFIX ".meta"
}
};
-class RGWPutObjProcessor_Multipart : public RGWPutObjProcessor_Atomic
-{
- string part_num;
- RGWMPObj mp;
- req_state *s;
- string upload_id;
-
-protected:
- int prepare(RGWRados *store, string *oid_rand) override;
- int do_complete(size_t accounted_size, const string& etag,
- ceph::real_time *mtime, ceph::real_time set_mtime,
- map<string, bufferlist>& attrs, ceph::real_time delete_at,
- const char *if_match, const char *if_nomatch, const string *user_data,
- rgw_zone_set *zones_trace) override;
-public:
- bool immutable_head() override { return true; }
- RGWPutObjProcessor_Multipart(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, uint64_t _p, req_state *_s) :
- RGWPutObjProcessor_Atomic(obj_ctx, bucket_info, _s->bucket, _s->object.name, _p, _s->req_id, false), s(_s) {}
- void get_mp(RGWMPObj** _mp);
-}; /* RGWPutObjProcessor_Multipart */
class RGWRadosThread {
class Worker : public Thread {