RGWMultiCompleteUpload *parts;
RGWMultiXMLParser parser;
std::unique_ptr<rgw::sal::MultipartUpload> upload;
- rgw::sal::Attrs attrs;
off_t ofs = 0;
- bufferlist etag_bl;
std::unique_ptr<rgw::sal::Object> meta_obj;
std::unique_ptr<rgw::sal::Object> target_obj;
- RGWObjManifest manifest;
uint64_t olh_epoch = 0;
op_ret = get_params(y);
list<rgw_obj_index_key> remove_objs; /* objects to be removed from index listing */
- bool versioned_object = s->bucket->versioning_enabled();
-
meta_obj = upload->get_meta_obj();
meta_obj->set_in_extra_data(true);
meta_obj->set_hash_source(s->object->get_name());
<< " ret=" << op_ret << dendl;
return;
}
- attrs = meta_obj->get_attrs();
// make reservation for notification if needed
std::unique_ptr<rgw::sal::Notification> res = store->get_notification(meta_obj.get(),
return;
}
- op_ret = upload->complete(this, s->cct, etag, manifest, parts->parts, remove_objs, accounted_size, compressed, cs_info, ofs);
- if (op_ret < 0) {
- ldpp_dout(this, 0) << "ERROR: upload complete failed ret=" << op_ret << dendl;
- return;
- }
-
- etag_bl.append(etag);
-
- attrs[RGW_ATTR_ETAG] = etag_bl;
-
- if (compressed) {
- // write compression attribute to full object
- bufferlist tmp;
- encode(cs_info, tmp);
- attrs[RGW_ATTR_COMPRESSION] = tmp;
- }
-
target_obj = s->bucket->get_object(rgw_obj_key(s->object->get_name()));
- if (versioned_object) {
+ if (s->bucket->versioning_enabled()) {
if (!version_id.empty()) {
target_obj->set_instance(version_id);
} else {
version_id = target_obj->get_instance();
}
}
+ target_obj->set_attrs(meta_obj->get_attrs());
- RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
-
- target_obj->set_atomic(&obj_ctx);
-
- std::unique_ptr<rgw::sal::Object::WriteOp> obj_op = target_obj->get_write_op(&obj_ctx);
-
- obj_op->params.manifest = &manifest;
- obj_op->params.remove_objs = &remove_objs;
-
- obj_op->params.ptag = &s->req_id; /* use req_id as operation tag */
- obj_op->params.owner = s->owner;
- obj_op->params.flags = PUT_OBJ_CREATE;
- obj_op->params.modify_tail = true;
- obj_op->params.completeMultipart = true;
- obj_op->params.olh_epoch = olh_epoch;
- obj_op->params.attrs = &attrs;
- op_ret = obj_op->prepare(s->yield);
- if (op_ret < 0)
- return;
-
- op_ret = obj_op->write_meta(this, ofs, accounted_size, s->yield);
- if (op_ret < 0)
+ op_ret = upload->complete(this, y, s->cct, parts->parts, remove_objs, accounted_size, compressed, cs_info, ofs, s->req_id, s->owner, olh_epoch, target_obj.get(), s->obj_ctx);
+ if (op_ret < 0) {
+ ldpp_dout(this, 0) << "ERROR: upload complete failed ret=" << op_ret << dendl;
return;
+ }
// remove the upload meta object ; the meta object is not versioned
// when the bucket is, as that would add an unneeded delete marker
head_obj->set_atomic(&obj_ctx);
- std::unique_ptr<rgw::sal::Object::WriteOp> obj_op = head_obj->get_write_op(&obj_ctx);
+ RGWRados::Object op_target(store->getRados(),
+ head_obj->get_bucket()->get_info(),
+ obj_ctx, head_obj->get_obj());
+ RGWRados::Object::Write obj_op(&op_target);
/* some object types shouldn't be versioned, e.g., multipart parts */
- obj_op->params.versioning_disabled = !head_obj->get_bucket()->versioning_enabled();
- obj_op->params.data = &first_chunk;
- obj_op->params.manifest = &manifest;
- obj_op->params.ptag = &unique_tag; /* use req_id as operation tag */
- obj_op->params.if_match = if_match;
- obj_op->params.if_nomatch = if_nomatch;
- obj_op->params.mtime = mtime;
- obj_op->params.set_mtime = set_mtime;
- obj_op->params.owner = ACLOwner(owner);
- obj_op->params.flags = PUT_OBJ_CREATE;
- obj_op->params.olh_epoch = olh_epoch;
- obj_op->params.delete_at = delete_at;
- obj_op->params.user_data = user_data;
- obj_op->params.zones_trace = zones_trace;
- obj_op->params.modify_tail = true;
- obj_op->params.attrs = &attrs;
-
- r = obj_op->prepare(y);
+ op_target.set_versioning_disabled(!head_obj->get_bucket()->versioning_enabled());
+ obj_op.meta.data = &first_chunk;
+ obj_op.meta.manifest = &manifest;
+ obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
+ obj_op.meta.if_match = if_match;
+ obj_op.meta.if_nomatch = if_nomatch;
+ obj_op.meta.mtime = mtime;
+ obj_op.meta.set_mtime = set_mtime;
+ obj_op.meta.owner = owner;
+ obj_op.meta.flags = PUT_OBJ_CREATE;
+ obj_op.meta.olh_epoch = olh_epoch;
+ obj_op.meta.delete_at = delete_at;
+ obj_op.meta.user_data = user_data;
+ obj_op.meta.zones_trace = zones_trace;
+ obj_op.meta.modify_tail = true;
+
+ r = obj_op.write_meta(dpp, actual_size, accounted_size, attrs, y);
if (r < 0) {
return r;
}
-
- r = obj_op->write_meta(dpp, actual_size, accounted_size, y);
- if (r < 0) {
- return r;
- }
- if (!obj_op->params.canceled) {
+ if (!obj_op.meta.canceled) {
// on success, clear the set of objects for deletion
writer.clear_written();
}
if (pcanceled) {
- *pcanceled = obj_op->params.canceled;
+ *pcanceled = obj_op.meta.canceled;
}
return 0;
}
return r;
}
- std::unique_ptr<rgw::sal::Object::WriteOp> obj_op = head_obj->get_write_op(&obj_ctx);
+ RGWRados::Object op_target(store->getRados(),
+ head_obj->get_bucket()->get_info(),
+ obj_ctx, head_obj->get_obj());
+ RGWRados::Object::Write obj_op(&op_target);
- obj_op->params.versioning_disabled = true;
- obj_op->params.set_mtime = set_mtime;
- obj_op->params.mtime = mtime;
- obj_op->params.owner = ACLOwner(owner);
- obj_op->params.delete_at = delete_at;
- obj_op->params.zones_trace = zones_trace;
- obj_op->params.modify_tail = true;
- obj_op->params.attrs = &attrs;
- obj_op->params.pmeta_placement_rule = &tail_placement_rule;
- r = obj_op->prepare(y);
- if (r < 0) {
- return r;
- }
+ op_target.set_versioning_disabled(true);
+ op_target.set_meta_placement_rule(&tail_placement_rule);
+ obj_op.meta.set_mtime = set_mtime;
+ obj_op.meta.mtime = mtime;
+ obj_op.meta.owner = owner;
+ obj_op.meta.delete_at = delete_at;
+ obj_op.meta.zones_trace = zones_trace;
+ obj_op.meta.modify_tail = true;
- r = obj_op->write_meta(dpp, actual_size, accounted_size, y);
+ r = obj_op.write_meta(dpp, actual_size, accounted_size, attrs, y);
if (r < 0)
return r;
return r == -ENOENT ? -ERR_NO_SUCH_UPLOAD : r;
}
- if (!obj_op->params.canceled) {
+ if (!obj_op.meta.canceled) {
// on success, clear the set of objects for deletion
writer.clear_written();
}
if (pcanceled) {
- *pcanceled = obj_op->params.canceled;
+ *pcanceled = obj_op.meta.canceled;
}
return 0;
}
return r;
}
head_obj->set_atomic(&obj_ctx);
- std::unique_ptr<rgw::sal::Object::WriteOp> obj_op = head_obj->get_write_op(&obj_ctx);
+ RGWRados::Object op_target(store->getRados(),
+ head_obj->get_bucket()->get_info(),
+ obj_ctx, head_obj->get_obj());
+ RGWRados::Object::Write obj_op(&op_target);
//For Append obj, disable versioning
- obj_op->params.versioning_disabled = true;
+ op_target.set_versioning_disabled(true);
if (cur_manifest) {
cur_manifest->append(dpp, manifest, store->get_zone());
- obj_op->params.manifest = cur_manifest;
+ obj_op.meta.manifest = cur_manifest;
} else {
- obj_op->params.manifest = &manifest;
- }
- obj_op->params.ptag = &unique_tag; /* use req_id as operation tag */
- obj_op->params.mtime = mtime;
- obj_op->params.set_mtime = set_mtime;
- obj_op->params.owner = ACLOwner(owner);
- obj_op->params.flags = PUT_OBJ_CREATE;
- obj_op->params.delete_at = delete_at;
- obj_op->params.user_data = user_data;
- obj_op->params.zones_trace = zones_trace;
- obj_op->params.modify_tail = true;
- obj_op->params.appendable = true;
- obj_op->params.attrs = &attrs;
+ obj_op.meta.manifest = &manifest;
+ }
+ obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
+ obj_op.meta.mtime = mtime;
+ obj_op.meta.set_mtime = set_mtime;
+ obj_op.meta.owner = owner;
+ obj_op.meta.flags = PUT_OBJ_CREATE;
+ obj_op.meta.delete_at = delete_at;
+ obj_op.meta.user_data = user_data;
+ obj_op.meta.zones_trace = zones_trace;
+ obj_op.meta.modify_tail = true;
+ obj_op.meta.appendable = true;
//Add the append part number
bufferlist cur_part_num_bl;
using ceph::encode;
etag_bl.append(final_etag_str, strlen(final_etag_str) + 1);
attrs[RGW_ATTR_ETAG] = etag_bl;
}
- r = obj_op->prepare(y);
- if (r < 0) {
- return r;
- }
- r = obj_op->write_meta(dpp, actual_size + cur_size, accounted_size + *cur_accounted_size, y);
+ r = obj_op.write_meta(dpp, actual_size + cur_size,
+ accounted_size + *cur_accounted_size,
+ attrs, y);
if (r < 0) {
return r;
}
- if (!obj_op->params.canceled) {
+ if (!obj_op.meta.canceled) {
// on success, clear the set of objects for deletion
writer.clear_written();
}
if (pcanceled) {
- *pcanceled = obj_op->params.canceled;
+ *pcanceled = obj_op.meta.canceled;
}
*cur_accounted_size += accounted_size;
virtual int get_attr(const DoutPrefixProvider* dpp, const char* name, bufferlist& dest, optional_yield y) = 0;
};
- struct WriteOp {
- struct Params {
- bool versioning_disabled{false};
- ceph::real_time* mtime{nullptr};
- Attrs* rmattrs{nullptr};
- const bufferlist* data{nullptr};
- RGWObjManifest* manifest{nullptr};
- const std::string* ptag{nullptr};
- std::list<rgw_obj_index_key>* remove_objs{nullptr};
- ceph::real_time set_mtime;
- ACLOwner owner;
- RGWObjCategory category{RGWObjCategory::Main};
- int flags{0};
- const char* if_match{nullptr};
- const char* if_nomatch{nullptr};
- std::optional<uint64_t> olh_epoch;
- ceph::real_time delete_at;
- bool canceled{false};
- const std::string* user_data{nullptr};
- rgw_zone_set* zones_trace{nullptr};
- bool modify_tail{false};
- bool completeMultipart{false};
- bool appendable{false};
- Attrs* attrs{nullptr};
- // In MultipartObjectProcessor::complete, we need this parameter
- // to tell the exact placement rule since it may be different from
- // bucket.placement_rule when Storage Class is specified explicitly
- const rgw_placement_rule *pmeta_placement_rule{nullptr};
- } params;
-
- virtual ~WriteOp() = default;
-
- virtual int prepare(optional_yield y) = 0;
- virtual int write_meta(const DoutPrefixProvider* dpp, uint64_t size, uint64_t accounted_size, optional_yield y) = 0;
- //virtual int write_data(const char* data, uint64_t ofs, uint64_t len, bool exclusive) = 0;
- };
-
struct DeleteOp {
struct Params {
ACLOwner bucket_owner;
Attrs& get_attrs(void) { return attrs; }
const Attrs& get_attrs(void) const { return attrs; }
+ virtual int set_attrs(Attrs a) { attrs = a; return 0; }
ceph::real_time get_mtime(void) const { return mtime; }
uint64_t get_obj_size(void) const { return obj_size; }
Bucket* get_bucket(void) const { return bucket; }
/* OPs */
virtual std::unique_ptr<ReadOp> get_read_op(RGWObjectCtx*) = 0;
- virtual std::unique_ptr<WriteOp> get_write_op(RGWObjectCtx*) = 0;
virtual std::unique_ptr<DeleteOp> get_delete_op(RGWObjectCtx*) = 0;
virtual std::unique_ptr<StatOp> get_stat_op(RGWObjectCtx*) = 0;
bool assume_unsorted = false) = 0;
virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct,
RGWObjectCtx* obj_ctx) = 0;
- virtual int complete(const DoutPrefixProvider* dpp, CephContext* cct,
- std::string& etag, RGWObjManifest& manifest,
+ virtual int complete(const DoutPrefixProvider* dpp,
+ optional_yield y, CephContext* cct,
std::map<int, std::string>& part_etags,
std::list<rgw_obj_index_key>& remove_objs,
uint64_t& accounted_size, bool& compressed,
- RGWCompressionInfo& cs_info, off_t& ofs) = 0;
+ RGWCompressionInfo& cs_info, off_t& ofs,
+ std::string& tag, ACLOwner& owner,
+ uint64_t olh_epoch,
+ rgw::sal::Object* target_obj,
+ RGWObjectCtx* obj_ctx) = 0;
virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) = 0;
return parent_op.iterate(dpp, ofs, end, cb, y);
}
-std::unique_ptr<Object::WriteOp> RadosObject::get_write_op(RGWObjectCtx* ctx)
-{
- return std::unique_ptr<Object::WriteOp>(new RadosObject::RadosWriteOp(this, ctx));
-}
-
-RadosObject::RadosWriteOp::RadosWriteOp(RadosObject* _source, RGWObjectCtx* _rctx) :
- source(_source),
- rctx(_rctx),
- op_target(_source->store->getRados(),
- _source->get_bucket()->get_info(),
- *static_cast<RGWObjectCtx *>(rctx),
- _source->get_obj()),
- parent_op(&op_target)
-{ }
-
-int RadosObject::RadosWriteOp::prepare(optional_yield y)
-{
- op_target.set_versioning_disabled(params.versioning_disabled);
- op_target.set_meta_placement_rule(params.pmeta_placement_rule);
- parent_op.meta.mtime = params.mtime;
- parent_op.meta.rmattrs = params.rmattrs;
- parent_op.meta.data = params.data;
- parent_op.meta.manifest = params.manifest;
- parent_op.meta.ptag = params.ptag;
- parent_op.meta.remove_objs = params.remove_objs;
- parent_op.meta.set_mtime = params.set_mtime;
- parent_op.meta.owner = params.owner.get_id();
- parent_op.meta.category = params.category;
- parent_op.meta.flags = params.flags;
- parent_op.meta.if_match = params.if_match;
- parent_op.meta.if_nomatch = params.if_nomatch;
- parent_op.meta.olh_epoch = params.olh_epoch;
- parent_op.meta.delete_at = params.delete_at;
- parent_op.meta.canceled = params.canceled;
- parent_op.meta.user_data = params.user_data;
- parent_op.meta.zones_trace = params.zones_trace;
- parent_op.meta.modify_tail = params.modify_tail;
- parent_op.meta.completeMultipart = params.completeMultipart;
- parent_op.meta.appendable = params.appendable;
-
- return 0;
-}
-
-int RadosObject::RadosWriteOp::write_meta(const DoutPrefixProvider* dpp, uint64_t size, uint64_t accounted_size, optional_yield y)
-{
- int ret = parent_op.write_meta(dpp, size, accounted_size, *params.attrs, y);
- params.canceled = parent_op.meta.canceled;
-
- return ret;
-}
-
int RadosObject::swift_versioning_restore(RGWObjectCtx* obj_ctx,
bool& restored,
const DoutPrefixProvider* dpp)
obj->set_in_extra_data(true);
obj->set_hash_source(oid);
- std::unique_ptr<rgw::sal::Object::WriteOp> obj_op = obj->get_write_op(obj_ctx);
+ RGWRados::Object op_target(store->getRados(),
+ obj->get_bucket()->get_info(),
+ *obj_ctx, obj->get_obj());
+ RGWRados::Object::Write obj_op(&op_target);
- obj_op->params.versioning_disabled = true; /* no versioning for multipart meta */
- obj_op->params.owner = owner;
- obj_op->params.category = RGWObjCategory::MultiMeta;
- obj_op->params.flags = PUT_OBJ_CREATE_EXCL;
- obj_op->params.mtime = &mtime;
- obj_op->params.attrs = &attrs;
+ op_target.set_versioning_disabled(true); /* no versioning for multipart meta */
+ obj_op.meta.owner = owner.get_id();
+ obj_op.meta.category = RGWObjCategory::MultiMeta;
+ obj_op.meta.flags = PUT_OBJ_CREATE_EXCL;
+ obj_op.meta.mtime = &mtime;
multipart_upload_info upload_info;
upload_info.dest_placement = dest_placement;
bufferlist bl;
encode(upload_info, bl);
- obj_op->params.data = &bl;
+ obj_op.meta.data = &bl;
- ret = obj_op->prepare(y);
-
- ret = obj_op->write_meta(dpp, bl.length(), 0, y);
+ ret = obj_op.write_meta(dpp, bl.length(), 0, attrs, y);
} while (ret == -EEXIST);
return ret;
return 0;
}
-int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp, CephContext* cct,
- std::string& etag, RGWObjManifest& manifest,
+int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp,
+ optional_yield y, CephContext* cct,
map<int, string>& part_etags,
list<rgw_obj_index_key>& remove_objs,
uint64_t& accounted_size, bool& compressed,
- RGWCompressionInfo& cs_info, off_t& ofs)
+ RGWCompressionInfo& cs_info, off_t& ofs,
+ std::string& tag, ACLOwner& owner,
+ uint64_t olh_epoch,
+ rgw::sal::Object* target_obj,
+ RGWObjectCtx* obj_ctx)
{
char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
+ std::string etag;
+ bufferlist etag_bl;
MD5 hash;
bool truncated;
int ret;
int marker = 0;
uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
auto etags_iter = part_etags.begin();
+ rgw::sal::Attrs attrs = target_obj->get_attrs();
do {
ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated);
etag = final_etag_str;
ldpp_dout(dpp, 10) << "calculated etag: " << etag << dendl;
+ etag_bl.append(etag);
+
+ attrs[RGW_ATTR_ETAG] = etag_bl;
+
+ if (compressed) {
+ // write compression attribute to full object
+ bufferlist tmp;
+ encode(cs_info, tmp);
+ attrs[RGW_ATTR_COMPRESSION] = tmp;
+ }
+
+ target_obj->set_atomic(obj_ctx);
+
+ RGWRados::Object op_target(store->getRados(),
+ target_obj->get_bucket()->get_info(),
+ *obj_ctx, target_obj->get_obj());
+ RGWRados::Object::Write obj_op(&op_target);
+
+ obj_op.meta.manifest = &manifest;
+ obj_op.meta.remove_objs = &remove_objs;
+
+ obj_op.meta.ptag = &tag; /* use req_id as operation tag */
+ obj_op.meta.owner = owner.get_id();
+ obj_op.meta.flags = PUT_OBJ_CREATE;
+ obj_op.meta.modify_tail = true;
+ obj_op.meta.completeMultipart = true;
+ obj_op.meta.olh_epoch = olh_epoch;
+
+ ret = obj_op.write_meta(dpp, ofs, accounted_size, attrs, y);
+ if (ret < 0)
+ return ret;
+
return ret;
}
virtual int get_attr(const DoutPrefixProvider* dpp, const char* name, bufferlist& dest, optional_yield y) override;
};
- struct RadosWriteOp : public WriteOp {
- private:
- RadosObject* source;
- RGWObjectCtx* rctx;
- RGWRados::Object op_target;
- RGWRados::Object::Write parent_op;
-
- public:
- RadosWriteOp(RadosObject* _source, RGWObjectCtx* _rctx);
-
- virtual int prepare(optional_yield y) override;
- virtual int write_meta(const DoutPrefixProvider* dpp, uint64_t size, uint64_t accounted_size, optional_yield y) override;
- //virtual int write_data(const char* data, uint64_t ofs, uint64_t len, bool exclusive) override;
- };
-
struct RadosDeleteOp : public DeleteOp {
private:
RadosObject* source;
/* OPs */
virtual std::unique_ptr<ReadOp> get_read_op(RGWObjectCtx *) override;
- virtual std::unique_ptr<WriteOp> get_write_op(RGWObjectCtx *) override;
virtual std::unique_ptr<DeleteOp> get_delete_op(RGWObjectCtx*) override;
virtual std::unique_ptr<StatOp> get_stat_op(RGWObjectCtx*) override;
RGWMPObj mp_obj;
ceph::real_time mtime;
rgw_placement_rule placement;
+ RGWObjManifest manifest;
public:
RadosMultipartUpload(RadosStore* _store, Bucket* _bucket, const std::string& oid, std::optional<std::string> upload_id, ceph::real_time _mtime) : MultipartUpload(_bucket), store(_store), mp_obj(oid, upload_id), mtime(_mtime) {}
bool assume_unsorted = false) override;
virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct,
RGWObjectCtx* obj_ctx) override;
- virtual int complete(const DoutPrefixProvider* dpp, CephContext* cct,
- std::string& etag, RGWObjManifest& manifest,
+ virtual int complete(const DoutPrefixProvider* dpp,
+ optional_yield y, CephContext* cct,
std::map<int, std::string>& part_etags,
std::list<rgw_obj_index_key>& remove_objs,
uint64_t& accounted_size, bool& compressed,
- RGWCompressionInfo& cs_info, off_t& ofs) override;
+ RGWCompressionInfo& cs_info, off_t& ofs,
+ std::string& tag, ACLOwner& owner,
+ uint64_t olh_epoch,
+ rgw::sal::Object* target_obj,
+ RGWObjectCtx* obj_ctx) override;
virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) override;
virtual std::unique_ptr<Writer> get_writer(const DoutPrefixProvider *dpp,
optional_yield y,