Create a MultipartUpload object in the Zipper API.
Signed-off-by: Daniel Gryniewicz <dang@redhat.com>
auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
auto& [rule, obj] = wt;
- RGWMPObj mp_obj;
if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) {
rgw_obj_key key(obj.key);
- if (!mp_obj.from_meta(key.name)) {
- return;
- }
+ std::unique_ptr<rgw::sal::MultipartUpload> mpu = store->get_multipart_upload(target, key.name);
RGWObjectCtx rctx(store);
- int ret = abort_multipart_upload(this, store, cct, &rctx, target, mp_obj);
+ int ret = mpu->abort(this, cct, &rctx);
if (ret == 0) {
if (perfcounter) {
perfcounter->inc(l_rgw_lc_abort_mpu, 1);
(strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX_LEGACY, sizeof(MULTIPART_UPLOAD_ID_PREFIX_LEGACY) - 1) == 0);
}
-int list_multipart_parts(const DoutPrefixProvider *dpp, rgw::sal::Bucket* bucket,
- CephContext *cct,
- const string& upload_id,
- const string& meta_oid, int num_parts,
- int marker, map<uint32_t, RGWUploadPartInfo>& parts,
- int *next_marker, bool *truncated,
- bool assume_unsorted)
-{
- map<string, bufferlist> parts_map;
- map<string, bufferlist>::iterator iter;
-
- std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(
- rgw_obj_key(meta_oid, std::string(), RGW_OBJ_NS_MULTIPART));
- obj->set_in_extra_data(true);
-
- bool sorted_omap = is_v2_upload_id(upload_id) && !assume_unsorted;
-
- parts.clear();
-
- int ret;
- if (sorted_omap) {
- string p;
- p = "part.";
- char buf[32];
-
- snprintf(buf, sizeof(buf), "%08d", marker);
- p.append(buf);
-
- ret = obj->omap_get_vals(dpp, p, num_parts + 1, &parts_map,
- nullptr, null_yield);
- } else {
- ret = obj->omap_get_all(dpp, &parts_map, null_yield);
- }
- if (ret < 0) {
- return ret;
- }
-
- int i;
- int last_num = 0;
-
- uint32_t expected_next = marker + 1;
-
- for (i = 0, iter = parts_map.begin();
- (i < num_parts || !sorted_omap) && iter != parts_map.end();
- ++iter, ++i) {
- bufferlist& bl = iter->second;
- auto bli = bl.cbegin();
- RGWUploadPartInfo info;
- try {
- decode(info, bli);
- } catch (buffer::error& err) {
- ldpp_dout(dpp, 0) << "ERROR: could not part info, caught buffer::error" <<
- dendl;
- return -EIO;
- }
- if (sorted_omap) {
- if (info.num != expected_next) {
- /* ouch, we expected a specific part num here, but we got a
- * different one. Either a part is missing, or it could be a
- * case of mixed rgw versions working on the same upload,
- * where one gateway doesn't support correctly sorted omap
- * keys for multipart upload just assume data is unsorted.
- */
- return list_multipart_parts(dpp, bucket, cct, upload_id,
- meta_oid, num_parts, marker, parts,
- next_marker, truncated, true);
- }
- expected_next++;
- }
- if (sorted_omap ||
- (int)info.num > marker) {
- parts[info.num] = info;
- last_num = info.num;
- }
- }
-
- if (sorted_omap) {
- if (truncated) {
- *truncated = (iter != parts_map.end());
- }
- } else {
- /* rebuild a map with only num_parts entries */
- map<uint32_t, RGWUploadPartInfo> new_parts;
- map<uint32_t, RGWUploadPartInfo>::iterator piter;
- for (i = 0, piter = parts.begin();
- i < num_parts && piter != parts.end();
- ++i, ++piter) {
- new_parts[piter->first] = piter->second;
- last_num = piter->first;
- }
-
- if (truncated) {
- *truncated = (piter != parts.end());
- }
-
- parts.swap(new_parts);
- }
-
- if (next_marker) {
- *next_marker = last_num;
- }
-
- return 0;
-}
-
-int list_multipart_parts(const DoutPrefixProvider *dpp, struct req_state *s,
- const string& upload_id,
- const string& meta_oid, int num_parts,
- int marker, map<uint32_t, RGWUploadPartInfo>& parts,
- int *next_marker, bool *truncated,
- bool assume_unsorted)
-{
- return list_multipart_parts(dpp, s->bucket.get(), s->cct, upload_id,
- meta_oid, num_parts, marker, parts,
- next_marker, truncated, assume_unsorted);
-}
-
-int abort_multipart_upload(const DoutPrefixProvider *dpp,
- rgw::sal::Store* store, CephContext *cct,
- RGWObjectCtx *obj_ctx, rgw::sal::Bucket* bucket,
- RGWMPObj& mp_obj)
-{
- std::unique_ptr<rgw::sal::Object> meta_obj = bucket->get_object(
- rgw_obj_key(mp_obj.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART));
- meta_obj->set_in_extra_data(true);
- meta_obj->set_hash_source(mp_obj.get_key());
- std::unique_ptr<rgw::sal::GCChain> chain = store->get_gc_chain(meta_obj.get());
- list<rgw_obj_index_key> remove_objs;
- map<uint32_t, RGWUploadPartInfo> obj_parts;
- bool truncated;
- int marker = 0;
- int ret;
- uint64_t parts_accounted_size = 0;
-
- do {
- ret = list_multipart_parts(dpp, bucket, cct,
- mp_obj.get_upload_id(), mp_obj.get_meta(),
- 1000, marker, obj_parts, &marker, &truncated);
- if (ret < 0) {
- ldpp_dout(dpp, 20) << __func__ << ": list_multipart_parts returned " <<
- ret << dendl;
- return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
- }
-
- for (auto obj_iter = obj_parts.begin();
- obj_iter != obj_parts.end();
- ++obj_iter) {
- RGWUploadPartInfo& obj_part = obj_iter->second;
- if (obj_part.manifest.empty()) {
- string oid = mp_obj.get_part(obj_iter->second.num);
- std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(
- rgw_obj_key(oid, std::string(), RGW_OBJ_NS_MULTIPART));
- obj->set_hash_source(mp_obj.get_key());
- ret = obj->delete_object(dpp, obj_ctx, null_yield);
- if (ret < 0 && ret != -ENOENT)
- return ret;
- } else {
- chain->update(dpp, &obj_part.manifest);
- RGWObjManifest::obj_iterator oiter = obj_part.manifest.obj_begin(dpp);
- if (oiter != obj_part.manifest.obj_end(dpp)) {
- std::unique_ptr<rgw::sal::Object> head = bucket->get_object(rgw_obj_key());
- rgw_raw_obj raw_head = oiter.get_location().get_raw_obj(store);
- head->raw_obj_to_obj(raw_head);
-
- rgw_obj_index_key key;
- head->get_key().get_index_key(&key);
- remove_objs.push_back(key);
- }
- }
- parts_accounted_size += obj_part.accounted_size;
- }
- } while (truncated);
-
- /* use upload id as tag and do it synchronously */
- ret = chain->send(mp_obj.get_upload_id());
- if (ret < 0) {
- ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl;
- if (ret == -ENOENT) {
- return -ERR_NO_SUCH_UPLOAD;
- }
- //Delete objects inline if send chain to gc fails
- chain->delete_inline(dpp, mp_obj.get_upload_id());
- }
-
- std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = meta_obj->get_delete_op(obj_ctx);
- del_op->params.bucket_owner = bucket->get_acl_owner();
- del_op->params.versioning_status = 0;
- if (!remove_objs.empty()) {
- del_op->params.remove_objs = &remove_objs;
- }
-
- del_op->params.abortmp = true;
- del_op->params.parts_accounted_size = parts_accounted_size;
-
- // and also remove the metadata obj
- ret = del_op->delete_obj(dpp, null_yield);
- if (ret < 0) {
- ldpp_dout(dpp, 20) << __func__ << ": del_op.delete_obj returned " <<
- ret << dendl;
- }
- return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
-}
-
-int list_bucket_multiparts(const DoutPrefixProvider *dpp,
- rgw::sal::Bucket* bucket,
- const string& prefix, const string& marker,
- const string& delim,
- const int& max_uploads,
- vector<rgw_bucket_dir_entry> *objs,
- map<string, bool> *common_prefixes, bool *is_truncated)
-{
- rgw::sal::Bucket::ListParams params;
- rgw::sal::Bucket::ListResults results;
- MultipartMetaFilter mp_filter;
-
- params.prefix = prefix;
- params.delim = delim;
- params.marker = marker;
- params.ns = RGW_OBJ_NS_MULTIPART;
- params.filter = &mp_filter;
-
- int ret = bucket->list(dpp, params, max_uploads, results, null_yield);
-
- if (ret < 0)
- return ret;
-
- *objs = std::move(results.objs);
- *common_prefixes = std::move(results.common_prefixes);
- *is_truncated = results.is_truncated;
-
- return ret;
-}
-
-int abort_bucket_multiparts(const DoutPrefixProvider *dpp,
- rgw::sal::Store* store, CephContext *cct,
- rgw::sal::Bucket* bucket, string& prefix, string& delim)
-{
- constexpr int max = 1000;
- int ret, num_deleted = 0;
- vector<rgw_bucket_dir_entry> objs;
- RGWObjectCtx obj_ctx(store);
- string marker;
- bool is_truncated;
-
- do {
- ret = list_bucket_multiparts(dpp, bucket, prefix, marker, delim,
- max, &objs, nullptr, &is_truncated);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << __func__ <<
- " ERROR : calling list_bucket_multiparts; ret=" << ret <<
- "; bucket=\"" << bucket << "\"; prefix=\"" <<
- prefix << "\"; delim=\"" << delim << "\"" << dendl;
- return ret;
- }
- ldpp_dout(dpp, 20) << __func__ <<
- " INFO: aborting and cleaning up multipart upload(s); bucket=\"" <<
- bucket << "\"; objs.size()=" << objs.size() <<
- "; is_truncated=" << is_truncated << dendl;
-
- if (!objs.empty()) {
- RGWMPObj mp;
- for (const auto& obj : objs) {
- rgw_obj_key key(obj.key);
- if (!mp.from_meta(key.name))
- continue;
- ret = abort_multipart_upload(dpp, store, cct, &obj_ctx, bucket, mp);
- if (ret < 0) {
- // we're doing a best-effort; if something cannot be found,
- // log it and keep moving forward
- if (ret != -ENOENT && ret != -ERR_NO_SUCH_UPLOAD) {
- ldpp_dout(dpp, 0) << __func__ <<
- " ERROR : failed to abort and clean-up multipart upload \"" <<
- key.get_oid() << "\"" << dendl;
- return ret;
- } else {
- ldpp_dout(dpp, 10) << __func__ <<
- " NOTE : unable to find part(s) of "
- "aborted multipart upload of \"" << key.get_oid() <<
- "\" for cleaning up" << dendl;
- }
- }
- num_deleted++;
- }
- if (num_deleted) {
- ldpp_dout(dpp, 0) << __func__ <<
- " WARNING : aborted " << num_deleted <<
- " incomplete multipart uploads" << dendl;
- }
- }
- } while (is_truncated);
-
- return 0;
-}
extern bool is_v2_upload_id(const string& upload_id);
-extern int list_multipart_parts(const DoutPrefixProvider *dpp,
- rgw::sal::Bucket* bucket,
- CephContext *cct,
- const string& upload_id,
- const string& meta_oid, int num_parts,
- int marker, map<uint32_t, RGWUploadPartInfo>& parts,
- int *next_marker, bool *truncated,
- bool assume_unsorted = false);
-
-extern int list_multipart_parts(const DoutPrefixProvider *dpp,
- struct req_state *s,
- const string& upload_id,
- const string& meta_oid, int num_parts,
- int marker, map<uint32_t, RGWUploadPartInfo>& parts,
- int *next_marker, bool *truncated,
- bool assume_unsorted = false);
-
-extern int abort_multipart_upload(const DoutPrefixProvider *dpp, rgw::sal::Store* store,
- CephContext *cct, RGWObjectCtx *obj_ctx,
- rgw::sal::Bucket* bucket, RGWMPObj& mp_obj);
-
-extern int list_bucket_multiparts(const DoutPrefixProvider *dpp,
- rgw::sal::Bucket* bucket,
- const string& prefix,
- const string& marker,
- const string& delim,
- const int& max_uploads,
- vector<rgw_bucket_dir_entry> *objs,
- map<string, bool> *common_prefixes, bool *is_truncated);
-
-extern int abort_bucket_multiparts(const DoutPrefixProvider *dpp,
- rgw::sal::Store* store, CephContext *cct,
- rgw::sal::Bucket* bucket,
- string& prefix, string& delim);
#endif
return policies;
}
-static int get_obj_head(const DoutPrefixProvider *dpp,
- struct req_state *s,
- rgw::sal::Object* obj,
- bufferlist *pbl)
-{
- std::unique_ptr<rgw::sal::Object::ReadOp> read_op = obj->get_read_op(s->obj_ctx);
- obj->set_prefetch_data(s->obj_ctx);
-
- int ret = read_op->prepare(s->yield, dpp);
- if (ret < 0) {
- return ret;
- }
-
- if (!pbl) {
- return 0;
- }
-
- ret = read_op->read(0, s->cct->_conf->rgw_max_chunk_size, *pbl, s->yield, dpp);
-
- return 0;
-}
-
-struct multipart_upload_info
-{
- rgw_placement_rule dest_placement;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(dest_placement, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(dest_placement, bl);
- DECODE_FINISH(bl);
- }
-};
-WRITE_CLASS_ENCODER(multipart_upload_info)
-
-static int get_multipart_info(const DoutPrefixProvider *dpp, struct req_state *s,
- rgw::sal::Object* obj,
- multipart_upload_info *upload_info)
-{
- bufferlist header;
-
- bufferlist headbl;
- bufferlist *pheadbl = (upload_info ? &headbl : nullptr);
-
- int op_ret = get_obj_head(dpp, s, obj, pheadbl);
- if (op_ret < 0) {
- if (op_ret == -ENOENT) {
- return -ERR_NO_SUCH_UPLOAD;
- }
- return op_ret;
- }
-
- if (upload_info && headbl.length() > 0) {
- auto hiter = headbl.cbegin();
- try {
- decode(*upload_info, hiter);
- } catch (buffer::error& err) {
- ldpp_dout(s, 0) << "ERROR: failed to decode multipart upload info" << dendl;
- return -EIO;
- }
- }
-
- return 0;
-}
-
-static int get_multipart_info(const DoutPrefixProvider *dpp, struct req_state *s,
- const string& meta_oid,
- multipart_upload_info *upload_info,
- rgw::sal::Attrs* attrs = nullptr)
-{
- map<string, bufferlist>::iterator iter;
- bufferlist header;
-
- std::unique_ptr<rgw::sal::Object> meta_obj;
- meta_obj = s->bucket->get_object(rgw_obj_key(meta_oid, string(), mp_ns));
- meta_obj->set_in_extra_data(true);
-
- int ret = get_multipart_info(dpp, s, meta_obj.get(), upload_info);
- if (ret >= 0 && attrs) {
- *attrs = meta_obj->get_attrs();
- }
- return ret;
-}
-
static int read_bucket_policy(const DoutPrefixProvider *dpp,
rgw::sal::Store* store,
struct req_state *s,
// 'copy_src' is used to make this function backward compatible.
if (!upload_id.empty() && !copy_src) {
/* multipart upload */
- RGWMPObj mp(object->get_name(), upload_id);
- string oid = mp.get_meta();
- mpobj = bucket->get_object(rgw_obj_key(oid, string(), mp_ns));
+ std::unique_ptr<rgw::sal::MultipartUpload> upload;
+ upload = store->get_multipart_upload(bucket, object->get_name(), upload_id);
+ mpobj = upload->get_meta_obj();
mpobj->set_in_extra_data(true);
object = mpobj.get();
}
rgw_placement_rule *pdest_placement;
- multipart_upload_info upload_info;
if (multipart) {
- RGWMPObj mp(s->object->get_name(), multipart_upload_id);
-
- op_ret = get_multipart_info(this, s, mp.get_meta(), &upload_info);
+ std::unique_ptr<rgw::sal::MultipartUpload> upload;
+ upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(),
+ multipart_upload_id);
+ op_ret = upload->get_info(this, s->yield, s->obj_ctx, &pdest_placement);
if (op_ret < 0) {
if (op_ret != -ENOENT) {
ldpp_dout(this, 0) << "ERROR: get_multipart_info returned " << op_ret << ": " << cpp_strerror(-op_ret) << dendl;
}
return;
}
- pdest_placement = &upload_info.dest_placement;
- ldpp_dout(this, 20) << "dest_placement for part=" << upload_info.dest_placement << dendl;
+ /* upload will go out of scope, so copy the dest placement for later use */
+ s->dest_placement = *pdest_placement;
+ pdest_placement = &s->dest_placement;
+ ldpp_dout(this, 20) << "dest_placement for part=" << *pdest_placement << dendl;
processor.emplace<MultipartObjectProcessor>(
&*aio, store, s->bucket.get(), pdest_placement,
s->owner.get_id(), obj_ctx, s->object->clone(),
return;
}
- do {
- char buf[33];
- std::unique_ptr<rgw::sal::Object> obj;
- gen_rand_alphanumeric(s->cct, buf, sizeof(buf) - 1);
- upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */
- upload_id.append(buf);
-
- string tmp_obj_name;
- RGWMPObj mp(s->object->get_name(), upload_id);
- tmp_obj_name = mp.get_meta();
-
- obj = s->bucket->get_object(rgw_obj_key(tmp_obj_name, string(), mp_ns));
- // the meta object will be indexed with 0 size, we c
- obj->set_in_extra_data(true);
- obj->set_hash_source(s->object->get_name());
-
- std::unique_ptr<rgw::sal::Object::WriteOp> obj_op = obj->get_write_op(s->obj_ctx);
-
- obj_op->params.versioning_disabled = true; /* no versioning for multipart meta */
- obj_op->params.owner = s->owner;
- obj_op->params.category = RGWObjCategory::MultiMeta;
- obj_op->params.flags = PUT_OBJ_CREATE_EXCL;
- obj_op->params.mtime = &mtime;
- obj_op->params.attrs = &attrs;
+ std::unique_ptr<rgw::sal::MultipartUpload> upload;
+ upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(),
+ upload_id);
+ op_ret = upload->init(this, s->yield, s->obj_ctx, s->owner, s->dest_placement, attrs);
- multipart_upload_info upload_info;
- upload_info.dest_placement = s->dest_placement;
-
- bufferlist bl;
- encode(upload_info, bl);
- obj_op->params.data = &bl;
-
- op_ret = obj_op->prepare(s->yield);
+ if (op_ret == 0) {
+ upload_id = upload->get_upload_id();
+ }
- op_ret = obj_op->write_meta(this, bl.length(), 0, s->yield);
- } while (op_ret == -EEXIST);
-
}
int RGWCompleteMultipart::verify_permission(optional_yield y)
void RGWCompleteMultipart::execute(optional_yield y)
{
RGWMultiCompleteUpload *parts;
- map<int, string>::iterator iter;
RGWMultiXMLParser parser;
- string meta_oid;
- map<uint32_t, RGWUploadPartInfo> obj_parts;
- map<uint32_t, RGWUploadPartInfo>::iterator obj_iter;
+ std::unique_ptr<rgw::sal::MultipartUpload> upload;
rgw::sal::Attrs attrs;
off_t ofs = 0;
- MD5 hash;
- char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
- char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
bufferlist etag_bl;
std::unique_ptr<rgw::sal::Object> meta_obj;
std::unique_ptr<rgw::sal::Object> target_obj;
- RGWMPObj mp;
RGWObjManifest manifest;
uint64_t olh_epoch = 0;
return;
}
- mp.init(s->object->get_name(), upload_id);
+ upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(), upload_id);
- meta_oid = mp.get_meta();
-
- int total_parts = 0;
- int handled_parts = 0;
- int max_parts = 1000;
- int marker = 0;
- bool truncated;
RGWCompressionInfo cs_info;
bool compressed = false;
uint64_t accounted_size = 0;
- uint64_t min_part_size = s->cct->_conf->rgw_multipart_min_part_size;
-
list<rgw_obj_index_key> remove_objs; /* objects to be removed from index listing */
bool versioned_object = s->bucket->versioning_enabled();
- iter = parts->parts.begin();
-
- meta_obj = s->bucket->get_object(rgw_obj_key(meta_oid, string(), mp_ns));
+ meta_obj = upload->get_meta_obj();
meta_obj->set_in_extra_data(true);
meta_obj->set_hash_source(s->object->get_name());
return;
}
- do {
- op_ret = list_multipart_parts(this, s, upload_id, meta_oid, max_parts,
- marker, obj_parts, &marker, &truncated);
- if (op_ret == -ENOENT) {
- op_ret = -ERR_NO_SUCH_UPLOAD;
- }
- if (op_ret < 0)
- return;
-
- total_parts += obj_parts.size();
- if (!truncated && total_parts != (int)parts->parts.size()) {
- ldpp_dout(this, 0) << "NOTICE: total parts mismatch: have: " << total_parts
- << " expected: " << parts->parts.size() << dendl;
- op_ret = -ERR_INVALID_PART;
- return;
- }
-
- for (obj_iter = obj_parts.begin(); iter != parts->parts.end() && obj_iter != obj_parts.end(); ++iter, ++obj_iter, ++handled_parts) {
- uint64_t part_size = obj_iter->second.accounted_size;
- if (handled_parts < (int)parts->parts.size() - 1 &&
- part_size < min_part_size) {
- op_ret = -ERR_TOO_SMALL;
- return;
- }
-
- char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
- if (iter->first != (int)obj_iter->first) {
- ldpp_dout(this, 0) << "NOTICE: parts num mismatch: next requested: "
- << iter->first << " next uploaded: "
- << obj_iter->first << dendl;
- op_ret = -ERR_INVALID_PART;
- return;
- }
- string part_etag = rgw_string_unquote(iter->second);
- if (part_etag.compare(obj_iter->second.etag) != 0) {
- ldpp_dout(this, 0) << "NOTICE: etag mismatch: part: " << iter->first
- << " etag: " << iter->second << dendl;
- op_ret = -ERR_INVALID_PART;
- return;
- }
-
- hex_to_buf(obj_iter->second.etag.c_str(), petag,
- CEPH_CRYPTO_MD5_DIGESTSIZE);
- hash.Update((const unsigned char *)petag, sizeof(petag));
-
- RGWUploadPartInfo& obj_part = obj_iter->second;
-
- /* update manifest for part */
- string oid = mp.get_part(obj_iter->second.num);
- rgw_obj src_obj;
- src_obj.init_ns(s->bucket->get_key(), oid, mp_ns);
-
- if (obj_part.manifest.empty()) {
- ldpp_dout(this, 0) << "ERROR: empty manifest for object part: obj="
- << src_obj << dendl;
- op_ret = -ERR_INVALID_PART;
- return;
- } else {
- manifest.append(this, obj_part.manifest, store->get_zone());
- }
-
- bool part_compressed = (obj_part.cs_info.compression_type != "none");
- if ((handled_parts > 0) &&
- ((part_compressed != compressed) ||
- (cs_info.compression_type != obj_part.cs_info.compression_type))) {
- ldpp_dout(this, 0) << "ERROR: compression type was changed during multipart upload ("
- << cs_info.compression_type << ">>" << obj_part.cs_info.compression_type << ")" << dendl;
- op_ret = -ERR_INVALID_PART;
- return;
- }
-
- if (part_compressed) {
- int64_t new_ofs; // offset in compression data for new part
- if (cs_info.blocks.size() > 0)
- new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len;
- else
- new_ofs = 0;
- for (const auto& block : obj_part.cs_info.blocks) {
- compression_block cb;
- cb.old_ofs = block.old_ofs + cs_info.orig_size;
- cb.new_ofs = new_ofs;
- cb.len = block.len;
- cs_info.blocks.push_back(cb);
- new_ofs = cb.new_ofs + cb.len;
- }
- if (!compressed)
- cs_info.compression_type = obj_part.cs_info.compression_type;
- cs_info.orig_size += obj_part.cs_info.orig_size;
- compressed = true;
- }
-
- rgw_obj_index_key remove_key;
- src_obj.key.get_index_key(&remove_key);
-
- remove_objs.push_back(remove_key);
-
- ofs += obj_part.size;
- accounted_size += obj_part.accounted_size;
- }
- } while (truncated);
- hash.Final((unsigned char *)final_etag);
-
- buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
- snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
- "-%lld", (long long)parts->parts.size());
- etag = final_etag_str;
- ldpp_dout(this, 10) << "calculated etag: " << final_etag_str << dendl;
+ op_ret = upload->complete(this, s->cct, etag, manifest, parts->parts, remove_objs, accounted_size, compressed, cs_info, ofs);
+ if (op_ret < 0) {
+ ldpp_dout(this, 0) << "ERROR: upload complete failed ret=" << op_ret << dendl;
+ return;
+ }
- etag_bl.append(final_etag_str, strlen(final_etag_str));
+ etag_bl.append(etag);
attrs[RGW_ATTR_ETAG] = etag_bl;
}
// send request to notification manager
- int ret = res->publish_commit(this, ofs, ceph::real_clock::now(), final_etag_str, target_obj->get_instance());
+ int ret = res->publish_commit(this, ofs, target_obj->get_mtime(), etag, target_obj->get_instance());
if (ret < 0) {
ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
// too late to rollback operation, hence op_ret is not set here
{
op_ret = -EINVAL;
string upload_id;
- string meta_oid;
upload_id = s->info.args.get("uploadId");
rgw_obj meta_obj;
- RGWMPObj mp;
+ std::unique_ptr<rgw::sal::MultipartUpload> upload;
if (upload_id.empty() || rgw::sal::Object::empty(s->object.get()))
return;
- mp.init(s->object->get_name(), upload_id);
- meta_oid = mp.get_meta();
-
- op_ret = get_multipart_info(this, s, meta_oid, nullptr);
- if (op_ret < 0)
- return;
-
+ upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(), upload_id);
RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
- op_ret = abort_multipart_upload(this, store, s->cct, obj_ctx, s->bucket.get(), mp);
+ op_ret = upload->abort(this, s->cct, obj_ctx);
}
int RGWListMultipart::verify_permission(optional_yield y)
void RGWListMultipart::execute(optional_yield y)
{
- string meta_oid;
- RGWMPObj mp;
-
op_ret = get_params(y);
if (op_ret < 0)
return;
- mp.init(s->object->get_name(), upload_id);
- meta_oid = mp.get_meta();
+ upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(), upload_id);
rgw::sal::Attrs attrs;
- op_ret = get_multipart_info(this, s, meta_oid, nullptr, &attrs);
+ op_ret = upload->get_info(this, s->yield, s->obj_ctx, nullptr, &attrs);
/* decode policy */
map<string, bufferlist>::iterator iter = attrs.find(RGW_ATTR_ACL);
if (iter != attrs.end()) {
if (op_ret < 0)
return;
- op_ret = list_multipart_parts(this, s, upload_id, meta_oid, max_parts,
- marker, parts, NULL, &truncated);
+ op_ret = upload->list_parts(this, s->cct, max_parts, marker, NULL, &truncated);
}
int RGWListBucketMultiparts::verify_permission(optional_yield y)
void RGWListBucketMultiparts::execute(optional_yield y)
{
- vector<rgw_bucket_dir_entry> objs;
- string marker_meta;
-
op_ret = get_params(y);
if (op_ret < 0)
return;
delimiter="/";
}
}
- marker_meta = marker.get_meta();
- op_ret = list_bucket_multiparts(this, s->bucket.get(), prefix, marker_meta, delimiter,
- max_uploads, &objs, &common_prefixes, &is_truncated);
+ op_ret = s->bucket->list_multiparts(this, prefix, marker_meta,
+ delimiter, max_uploads, uploads,
+ &common_prefixes, &is_truncated);
if (op_ret < 0) {
return;
}
- if (!objs.empty()) {
- vector<rgw_bucket_dir_entry>::iterator iter;
- RGWMultipartUploadEntry entry;
- for (iter = objs.begin(); iter != objs.end(); ++iter) {
- rgw_obj_key key(iter->key);
- if (!entry.mp.from_meta(key.name))
- continue;
- entry.obj = *iter;
- uploads.push_back(entry);
- }
- next_marker = entry;
+ if (!uploads.empty()) {
+ next_marker_key = uploads.back()->get_key();
+ next_marker_upload_id = uploads.back()->get_upload_id();
}
}
class RGWListMultipart : public RGWOp {
protected:
string upload_id;
- map<uint32_t, RGWUploadPartInfo> parts;
+ std::unique_ptr<rgw::sal::MultipartUpload> upload;
int max_parts;
int marker;
RGWAccessControlPolicy policy;
uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
};
-struct RGWMultipartUploadEntry {
- rgw_bucket_dir_entry obj;
- RGWMPObj mp;
-
- friend std::ostream& operator<<(std::ostream& out,
- const RGWMultipartUploadEntry& e) {
- constexpr char quote = '"';
- return out << "RGWMultipartUploadEntry{ obj.key=" <<
- quote << e.obj.key << quote << " mp=" << e.mp << " }";
- }
-};
-
class RGWListBucketMultiparts : public RGWOp {
protected:
string prefix;
- RGWMPObj marker;
- RGWMultipartUploadEntry next_marker;
+ string marker_meta;
+ string marker_key;
+ string marker_upload_id;
+ string next_marker_key;
+ string next_marker_upload_id;
int max_uploads;
string delimiter;
- vector<RGWMultipartUploadEntry> uploads;
+ vector<std::unique_ptr<rgw::sal::MultipartUpload>> uploads;
map<string, bool> common_prefixes;
bool is_truncated;
int default_max;
{
constexpr int max_uploads = 1000;
constexpr int max_parts = 1000;
- static const std::string mp_ns = RGW_OBJ_NS_MULTIPART;
- static MultipartMetaFilter mp_filter;
-
+ std::string marker;
+ vector<std::unique_ptr<rgw::sal::MultipartUpload>> uploads;
+ bool is_truncated;
int ret;
- rgw::sal::Bucket::ListParams params;
- rgw::sal::Bucket::ListResults results;
-
- params.ns = mp_ns;
- params.filter = &mp_filter;
- // use empty string for initial params.marker
// use empty strings for params.{prefix,delim}
do {
- ret = bucket->list(dpp, params, max_uploads, results, null_yield);
+ ret = bucket->list_multiparts(dpp, string(), marker, string(), max_uploads, uploads, nullptr, &is_truncated);
if (ret == -ENOENT) {
// could bucket have been removed while this is running?
ldpp_dout(dpp, 5) << "RGWRadosList::" << __func__ <<
return ret;
}
- if (!results.objs.empty()) {
- std::vector<RGWMultipartUploadEntry> uploads;
- RGWMultipartUploadEntry entry;
- for (const rgw_bucket_dir_entry& obj : results.objs) {
- const rgw_obj_key& key = obj.key;
- if (!entry.mp.from_meta(key.name)) {
- // we only want the meta objects, so skip all the components
- continue;
- }
- entry.obj = obj;
- uploads.push_back(entry);
- ldpp_dout(dpp, 20) << "RGWRadosList::" << __func__ <<
- " processing incomplete multipart entry " <<
- entry << dendl;
- }
-
+ if (!uploads.empty()) {
// now process the uploads vector
for (const auto& upload : uploads) {
- const RGWMPObj& mp = upload.mp;
int parts_marker = 0;
bool is_parts_truncated = false;
do { // while (is_parts_truncated);
- std::map<uint32_t, RGWUploadPartInfo> parts;
- ret = list_multipart_parts(dpp, bucket, store->ctx(),
- mp.get_upload_id(), mp.get_meta(),
- max_parts, parts_marker,
- parts, &parts_marker,
- &is_parts_truncated);
+ ret = upload->list_parts(dpp, store->ctx(), max_parts, parts_marker,
+ &parts_marker, &is_parts_truncated);
if (ret == -ENOENT) {
ldpp_dout(dpp, 5) << "RGWRadosList::" << __func__ <<
": WARNING: list_multipart_parts returned ret=-ENOENT "
- "for " << mp.get_upload_id() << ", moving on" << dendl;
+ "for " << upload->get_upload_id() << ", moving on" << dendl;
break;
} else if (ret < 0) {
ldpp_dout(dpp, -1) << "RGWRadosList::" << __func__ <<
return ret;
}
- for (auto& p : parts) {
- RGWObjManifest& manifest = p.second.manifest;
+ for (auto& p : upload->get_parts()) {
+ rgw::sal::RadosMultipartPart* part =
+ dynamic_cast<rgw::sal::RadosMultipartPart*>(p.second.get());
+ RGWObjManifest& manifest = part->get_manifest();
for (auto obj_it = manifest.obj_begin(dpp);
obj_it != manifest.obj_end(dpp);
++obj_it) {
} while (is_parts_truncated);
} // for (const auto& upload
} // if objs not empty
- } while (results.is_truncated);
+ } while (is_truncated);
return 0;
} // RGWRadosList::do_incomplete_multipart
string key_marker = s->info.args.get("key-marker");
string upload_id_marker = s->info.args.get("upload-id-marker");
- if (!key_marker.empty())
- marker.init(key_marker, upload_id_marker);
+ if (!key_marker.empty()) {
+ std::unique_ptr<rgw::sal::MultipartUpload> upload;
+ upload = store->get_multipart_upload(s->bucket.get(), key_marker,
+ upload_id_marker);
+ marker_meta = upload->get_meta();
+ marker_key = upload->get_key();
+ marker_upload_id = upload->get_upload_id();
+ }
return 0;
}
{
int res = 0;
if (!multipart_upload_id.empty()) {
- RGWMPObj mp(s->object->get_name(), multipart_upload_id);
- std::unique_ptr<rgw::sal::Object> obj = s->bucket->get_object(
- rgw_obj_key(mp.get_meta(),
- std::string(),
- RGW_OBJ_NS_MULTIPART));
+ std::unique_ptr<rgw::sal::MultipartUpload> upload =
+ store->get_multipart_upload(s->bucket.get(), s->object->get_name(),
+ multipart_upload_id);
+ std::unique_ptr<rgw::sal::Object> obj = upload->get_meta_obj();
obj->set_in_extra_data(true);
res = obj->get_obj_attrs(s->obj_ctx, s->yield, this);
if (res == 0) {
if (op_ret == 0) {
dump_start(s);
s->formatter->open_object_section_in_ns("ListPartsResult", XMLNS_AWS_S3);
- map<uint32_t, RGWUploadPartInfo>::iterator iter;
- map<uint32_t, RGWUploadPartInfo>::reverse_iterator test_iter;
+ map<uint32_t, std::unique_ptr<rgw::sal::MultipartPart>>::iterator iter;
+ map<uint32_t, std::unique_ptr<rgw::sal::MultipartPart>>::reverse_iterator test_iter;
int cur_max = 0;
- iter = parts.begin();
- test_iter = parts.rbegin();
- if (test_iter != parts.rend()) {
+ iter = upload->get_parts().begin();
+ test_iter = upload->get_parts().rbegin();
+ if (test_iter != upload->get_parts().rend()) {
cur_max = test_iter->first;
}
if (!s->bucket_tenant.empty())
ACLOwner& owner = policy.get_owner();
dump_owner(s, owner.get_id(), owner.get_display_name());
- for (; iter != parts.end(); ++iter) {
- RGWUploadPartInfo& info = iter->second;
+ for (; iter != upload->get_parts().end(); ++iter) {
+ rgw::sal::MultipartPart* part = iter->second.get();
s->formatter->open_object_section("Part");
- dump_time(s, "LastModified", &info.modified);
+ dump_time(s, "LastModified", &part->get_mtime());
- s->formatter->dump_unsigned("PartNumber", info.num);
- s->formatter->dump_format("ETag", "\"%s\"", info.etag.c_str());
- s->formatter->dump_unsigned("Size", info.accounted_size);
+ s->formatter->dump_unsigned("PartNumber", part->get_num());
+ s->formatter->dump_format("ETag", "\"%s\"", part->get_etag().c_str());
+ s->formatter->dump_unsigned("Size", part->get_size());
s->formatter->close_section();
}
s->formatter->close_section();
s->formatter->dump_string("Bucket", s->bucket_name);
if (!prefix.empty())
s->formatter->dump_string("ListMultipartUploadsResult.Prefix", prefix);
- const string& key_marker = marker.get_key();
- if (!key_marker.empty())
- s->formatter->dump_string("KeyMarker", key_marker);
- const string& upload_id_marker = marker.get_upload_id();
- if (!upload_id_marker.empty())
- s->formatter->dump_string("UploadIdMarker", upload_id_marker);
- string next_key = next_marker.mp.get_key();
- if (!next_key.empty())
- s->formatter->dump_string("NextKeyMarker", next_key);
- string next_upload_id = next_marker.mp.get_upload_id();
- if (!next_upload_id.empty())
- s->formatter->dump_string("NextUploadIdMarker", next_upload_id);
+ if (!marker_key.empty())
+ s->formatter->dump_string("KeyMarker", marker_key);
+ if (!marker_upload_id.empty())
+ s->formatter->dump_string("UploadIdMarker", marker_upload_id);
+ if (!next_marker_key.empty())
+ s->formatter->dump_string("NextKeyMarker", next_marker_key);
+ if (!next_marker_upload_id.empty())
+ s->formatter->dump_string("NextUploadIdMarker", next_marker_upload_id);
s->formatter->dump_int("MaxUploads", max_uploads);
if (!delimiter.empty())
s->formatter->dump_string("Delimiter", delimiter);
s->formatter->dump_string("IsTruncated", (is_truncated ? "true" : "false"));
if (op_ret >= 0) {
- vector<RGWMultipartUploadEntry>::iterator iter;
+ vector<std::unique_ptr<rgw::sal::MultipartUpload>>::iterator iter;
for (iter = uploads.begin(); iter != uploads.end(); ++iter) {
- RGWMPObj& mp = iter->mp;
+ rgw::sal::MultipartUpload* upload = iter->get();
s->formatter->open_array_section("Upload");
if (encode_url) {
- s->formatter->dump_string("Key", url_encode(mp.get_key(), false));
+ s->formatter->dump_string("Key", url_encode(upload->get_key(), false));
} else {
- s->formatter->dump_string("Key", mp.get_key());
+ s->formatter->dump_string("Key", upload->get_key());
}
- s->formatter->dump_string("UploadId", mp.get_upload_id());
+ s->formatter->dump_string("UploadId", upload->get_upload_id());
dump_owner(s, s->user->get_id(), s->user->get_display_name(), "Initiator");
dump_owner(s, s->user->get_id(), s->user->get_display_name());
s->formatter->dump_string("StorageClass", "STANDARD");
- dump_time(s, "Initiated", &iter->obj.meta.mtime);
+ dump_time(s, "Initiated", &upload->get_mtime());
s->formatter->close_section();
}
if (!common_prefixes.empty()) {
class RGWDataSyncStatusManager;
class RGWSyncModuleInstance;
typedef std::shared_ptr<RGWSyncModuleInstance> RGWSyncModuleInstanceRef;
+class RGWCompressionInfo;
+
namespace rgw {
class Aio;
namespace IAM { struct Policy; }
class Bucket;
class Object;
class BucketList;
+class MultipartUpload;
struct MPSerializer;
class Lifecycle;
class Notification;
virtual int get_oidc_providers(const DoutPrefixProvider *dpp,
const std::string& tenant,
vector<std::unique_ptr<RGWOIDCProvider>>& providers) = 0;
+ virtual std::unique_ptr<MultipartUpload> get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional<std::string> upload_id=std::nullopt, ceph::real_time mtime=real_clock::now()) = 0;
virtual void finalize(void) = 0;
static bool empty(Bucket* b) { return (!b || b->empty()); }
virtual std::unique_ptr<Bucket> clone() = 0;
+ virtual int list_multiparts(const DoutPrefixProvider *dpp,
+ const string& prefix,
+ string& marker,
+ const string& delim,
+ const int& max_uploads,
+ vector<std::unique_ptr<MultipartUpload>>& uploads,
+ map<string, bool> *common_prefixes,
+ bool *is_truncated) = 0;
+ virtual int abort_multiparts(const DoutPrefixProvider *dpp,
+ CephContext *cct,
+ string& prefix, string& delim) = 0;
+
/* dang - This is temporary, until the API is completed */
rgw_bucket& get_key() { return info.bucket; }
RGWBucketInfo& get_info() { return info; }
}
};
+class MultipartPart {
+protected:
+ std::string oid;
+
+public:
+ MultipartPart() = default;
+ virtual ~MultipartPart() = default;
+
+ virtual uint32_t get_num() = 0;
+ virtual uint64_t get_size() = 0;
+ virtual const std::string& get_etag() = 0;
+ virtual ceph::real_time& get_mtime() = 0;
+};
+
+class MultipartUpload {
+protected:
+ Bucket* bucket;
+ std::map<uint32_t, std::unique_ptr<MultipartPart>> parts;
+
+public:
+ MultipartUpload(Bucket* _bucket) : bucket(_bucket) {}
+ virtual ~MultipartUpload() = default;
+
+ virtual const std::string& get_meta() const = 0;
+ virtual const std::string& get_key() const = 0;
+ virtual const std::string& get_upload_id() const = 0;
+ virtual ceph::real_time& get_mtime() = 0;
+
+ std::map<uint32_t, std::unique_ptr<MultipartPart>>& get_parts() { return parts; }
+
+ virtual std::unique_ptr<rgw::sal::Object> get_meta_obj() = 0;
+
+ virtual int init(const DoutPrefixProvider* dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) = 0;
+ virtual int list_parts(const DoutPrefixProvider* dpp, CephContext* cct,
+ int num_parts, int marker,
+ int* next_marker, bool* truncated,
+ bool assume_unsorted = false) = 0;
+ virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct,
+ RGWObjectCtx* obj_ctx) = 0;
+ virtual int complete(const DoutPrefixProvider* dpp, CephContext* cct,
+ std::string& etag, RGWObjManifest& manifest,
+ map<int, string>& part_etags,
+ list<rgw_obj_index_key>& remove_objs,
+ uint64_t& accounted_size, bool& compressed,
+ RGWCompressionInfo& cs_info, off_t& ofs) = 0;
+
+ virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) = 0;
+
+ friend inline ostream& operator<<(ostream& out, const MultipartUpload& u) {
+ out << u.get_meta();
+ if (!u.get_upload_id().empty())
+ out << ":" << u.get_upload_id();
+ return out;
+ }
+ friend inline ostream& operator<<(ostream& out, const MultipartUpload* u) {
+ if (!u)
+ out << "<NULL>";
+ else
+ out << *u;
+ return out;
+ }
+ friend inline ostream& operator<<(ostream& out, const
+ std::unique_ptr<MultipartUpload>& p) {
+ out << p.get();
+ return out;
+ }
+};
+
struct Serializer {
Serializer() = default;
virtual ~Serializer() = default;
#define dout_subsys ceph_subsys_rgw
+static string mp_ns = RGW_OBJ_NS_MULTIPART;
+
namespace rgw::sal {
+struct multipart_upload_info
+{
+ rgw_placement_rule dest_placement;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(dest_placement, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(dest_placement, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(multipart_upload_info)
+
// default number of entries to list with each bucket listing call
// (use marker to bridge between calls)
static constexpr size_t listing_max_entries = 1000;
/* If there's a prefix, then we are aborting multiparts as well */
if (!prefix.empty()) {
- ret = abort_bucket_multiparts(dpp, store, store->ctx(), this, prefix, delimiter);
+ ret = abort_multiparts(dpp, store->ctx(), prefix, delimiter);
if (ret < 0) {
return ret;
}
string prefix, delimiter;
- ret = abort_bucket_multiparts(dpp, store, cct, this, prefix, delimiter);
+ ret = abort_multiparts(dpp, cct, prefix, delimiter);
if (ret < 0) {
return ret;
}
return ret;
}
+int RadosBucket::list_multiparts(const DoutPrefixProvider *dpp,
+ const string& prefix,
+ string& marker,
+ const string& delim,
+ const int& max_uploads,
+ vector<std::unique_ptr<MultipartUpload>>& uploads,
+ map<string, bool> *common_prefixes,
+ bool *is_truncated)
+{
+ rgw::sal::Bucket::ListParams params;
+ rgw::sal::Bucket::ListResults results;
+ MultipartMetaFilter mp_filter;
+
+ params.prefix = prefix;
+ params.delim = delim;
+ params.marker = marker;
+ params.ns = RGW_OBJ_NS_MULTIPART;
+ params.filter = &mp_filter;
+
+ int ret = list(dpp, params, max_uploads, results, null_yield);
+
+ if (ret < 0)
+ return ret;
+
+ if (!results.objs.empty()) {
+ for (const rgw_bucket_dir_entry& dentry : results.objs) {
+ rgw_obj_key key(dentry.key);
+ uploads.push_back(store->get_multipart_upload(this, key.name));
+ }
+ }
+ if (common_prefixes) {
+ *common_prefixes = std::move(results.common_prefixes);
+ }
+ *is_truncated = results.is_truncated;
+ marker = params.marker.name;
+
+ return 0;
+}
+
+int RadosBucket::abort_multiparts(const DoutPrefixProvider *dpp,
+ CephContext *cct,
+ string& prefix, string& delim)
+{
+ constexpr int max = 1000;
+ int ret, num_deleted = 0;
+ vector<std::unique_ptr<MultipartUpload>> uploads;
+ RGWObjectCtx obj_ctx(store);
+ string marker;
+ bool is_truncated;
+
+ do {
+ ret = list_multiparts(dpp, prefix, marker, delim,
+ max, uploads, nullptr, &is_truncated);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ <<
+ " ERROR : calling list_bucket_multiparts; ret=" << ret <<
+ "; bucket=\"" << this << "\"; prefix=\"" <<
+ prefix << "\"; delim=\"" << delim << "\"" << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 20) << __func__ <<
+ " INFO: aborting and cleaning up multipart upload(s); bucket=\"" <<
+ this << "\"; uploads.size()=" << uploads.size() <<
+ "; is_truncated=" << is_truncated << dendl;
+
+ if (!uploads.empty()) {
+ for (const auto& upload : uploads) {
+ ret = upload->abort(dpp, cct, &obj_ctx);
+ if (ret < 0) {
+ // we're doing a best-effort; if something cannot be found,
+ // log it and keep moving forward
+ if (ret != -ENOENT && ret != -ERR_NO_SUCH_UPLOAD) {
+ ldpp_dout(dpp, 0) << __func__ <<
+ " ERROR : failed to abort and clean-up multipart upload \"" <<
+ upload->get_meta() << "\"" << dendl;
+ return ret;
+ } else {
+ ldpp_dout(dpp, 10) << __func__ <<
+ " NOTE : unable to find part(s) of "
+ "aborted multipart upload of \"" << upload->get_meta() <<
+ "\" for cleaning up" << dendl;
+ }
+ }
+ num_deleted++;
+ }
+ if (num_deleted) {
+ ldpp_dout(dpp, 0) << __func__ <<
+ " WARNING : aborted " << num_deleted <<
+ " incomplete multipart uploads" << dendl;
+ }
+ }
+ } while (is_truncated);
+
+ return 0;
+}
+
std::unique_ptr<User> RadosStore::get_user(const rgw_user &u)
{
return std::unique_ptr<User>(new RadosUser(this, u));
return 0;
}
+std::unique_ptr<MultipartUpload> RadosStore::get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional<std::string> upload_id, ceph::real_time mtime)
+{
+ return std::unique_ptr<MultipartUpload>(new RadosMultipartUpload(this, bucket, oid, upload_id, mtime));
+}
+
int RadosStore::get_obj_head_ioctx(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::IoCtx* ioctx)
{
return rados->get_obj_head_ioctx(dpp, bucket_info, obj, ioctx);
y);
}
+int RadosMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct,
+ RGWObjectCtx *obj_ctx)
+{
+ std::unique_ptr<rgw::sal::Object> meta_obj = get_meta_obj();
+ meta_obj->set_in_extra_data(true);
+ meta_obj->set_hash_source(mp_obj.get_key());
+ std::unique_ptr<rgw::sal::GCChain> chain = store->get_gc_chain(meta_obj.get());
+ list<rgw_obj_index_key> remove_objs;
+ bool truncated;
+ int marker = 0;
+ int ret;
+ uint64_t parts_accounted_size = 0;
+
+ do {
+ ret = list_parts(dpp, cct, 1000, marker, &marker, &truncated);
+ if (ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ << ": RadosMultipartUpload::list_parts returned " <<
+ ret << dendl;
+ return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
+ }
+
+ for (auto part_it = parts.begin();
+ part_it != parts.end();
+ ++part_it) {
+ RadosMultipartPart* obj_part = dynamic_cast<RadosMultipartPart*>(part_it->second.get());
+ if (obj_part->info.manifest.empty()) {
+ std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(
+ rgw_obj_key(obj_part->oid, std::string(), RGW_OBJ_NS_MULTIPART));
+ obj->set_hash_source(mp_obj.get_key());
+ ret = obj->delete_object(dpp, obj_ctx, null_yield);
+ if (ret < 0 && ret != -ENOENT)
+ return ret;
+ } else {
+ chain->update(dpp, &obj_part->info.manifest);
+ RGWObjManifest::obj_iterator oiter = obj_part->info.manifest.obj_begin(dpp);
+ if (oiter != obj_part->info.manifest.obj_end(dpp)) {
+ std::unique_ptr<rgw::sal::Object> head = bucket->get_object(rgw_obj_key());
+ rgw_raw_obj raw_head = oiter.get_location().get_raw_obj(store);
+ head->raw_obj_to_obj(raw_head);
+
+ rgw_obj_index_key key;
+ head->get_key().get_index_key(&key);
+ remove_objs.push_back(key);
+ }
+ }
+ parts_accounted_size += obj_part->info.accounted_size;
+ }
+ } while (truncated);
+
+ /* use upload id as tag and do it synchronously */
+ ret = chain->send(mp_obj.get_upload_id());
+ if (ret < 0) {
+ ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl;
+ if (ret == -ENOENT) {
+ return -ERR_NO_SUCH_UPLOAD;
+ }
+ //Delete objects inline if send chain to gc fails
+ chain->delete_inline(dpp, mp_obj.get_upload_id());
+ }
+
+ std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = meta_obj->get_delete_op(obj_ctx);
+ del_op->params.bucket_owner = bucket->get_acl_owner();
+ del_op->params.versioning_status = 0;
+ if (!remove_objs.empty()) {
+ del_op->params.remove_objs = &remove_objs;
+ }
+
+ del_op->params.abortmp = true;
+ del_op->params.parts_accounted_size = parts_accounted_size;
+
+ // and also remove the metadata obj
+ ret = del_op->delete_obj(dpp, null_yield);
+ if (ret < 0) {
+ ldpp_dout(dpp, 20) << __func__ << ": del_op.delete_obj returned " <<
+ ret << dendl;
+ }
+ return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
+}
+
+std::unique_ptr<rgw::sal::Object> RadosMultipartUpload::get_meta_obj()
+{
+ return bucket->get_object(rgw_obj_key(get_meta(), string(), mp_ns));
+}
+
+int RadosMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs)
+{
+ int ret;
+ std::string oid = mp_obj.get_key();
+
+ do {
+ char buf[33];
+ string tmp_obj_name;
+ std::unique_ptr<rgw::sal::Object> obj;
+ gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
+ std::string upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */
+ upload_id.append(buf);
+
+ mp_obj.init(oid, upload_id);
+ tmp_obj_name = mp_obj.get_meta();
+
+ obj = bucket->get_object(rgw_obj_key(tmp_obj_name, string(), mp_ns));
+ // the meta object will be indexed with 0 size, we c
+ obj->set_in_extra_data(true);
+ obj->set_hash_source(oid);
+
+ std::unique_ptr<rgw::sal::Object::WriteOp> obj_op = obj->get_write_op(obj_ctx);
+
+ obj_op->params.versioning_disabled = true; /* no versioning for multipart meta */
+ obj_op->params.owner = owner;
+ obj_op->params.category = RGWObjCategory::MultiMeta;
+ obj_op->params.flags = PUT_OBJ_CREATE_EXCL;
+ obj_op->params.mtime = &mtime;
+ obj_op->params.attrs = &attrs;
+
+ multipart_upload_info upload_info;
+ upload_info.dest_placement = dest_placement;
+
+ bufferlist bl;
+ encode(upload_info, bl);
+ obj_op->params.data = &bl;
+
+ ret = obj_op->prepare(y);
+
+ ret = obj_op->write_meta(dpp, bl.length(), 0, y);
+ } while (ret == -EEXIST);
+
+ return ret;
+}
+
+int RadosMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct,
+ int num_parts, int marker,
+ int *next_marker, bool *truncated,
+ bool assume_unsorted)
+{
+ map<string, bufferlist> parts_map;
+ map<string, bufferlist>::iterator iter;
+
+ std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(
+ rgw_obj_key(get_meta(), std::string(), RGW_OBJ_NS_MULTIPART));
+ obj->set_in_extra_data(true);
+
+ bool sorted_omap = is_v2_upload_id(get_upload_id()) && !assume_unsorted;
+
+ parts.clear();
+
+ int ret;
+ if (sorted_omap) {
+ string p;
+ p = "part.";
+ char buf[32];
+
+ snprintf(buf, sizeof(buf), "%08d", marker);
+ p.append(buf);
+
+ ret = obj->omap_get_vals(dpp, p, num_parts + 1, &parts_map,
+ nullptr, null_yield);
+ } else {
+ ret = obj->omap_get_all(dpp, &parts_map, null_yield);
+ }
+ if (ret < 0) {
+ return ret;
+ }
+
+ int i;
+ int last_num = 0;
+
+ uint32_t expected_next = marker + 1;
+
+ for (i = 0, iter = parts_map.begin();
+ (i < num_parts || !sorted_omap) && iter != parts_map.end();
+ ++iter, ++i) {
+ bufferlist& bl = iter->second;
+ auto bli = bl.cbegin();
+ std::unique_ptr<RadosMultipartPart> part = std::make_unique<RadosMultipartPart>();
+ try {
+ decode(part->info, bli);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 0) << "ERROR: could not part info, caught buffer::error" <<
+ dendl;
+ return -EIO;
+ }
+ if (sorted_omap) {
+ if (part->info.num != expected_next) {
+ /* ouch, we expected a specific part num here, but we got a
+ * different one. Either a part is missing, or it could be a
+ * case of mixed rgw versions working on the same upload,
+ * where one gateway doesn't support correctly sorted omap
+ * keys for multipart upload just assume data is unsorted.
+ */
+ return list_parts(dpp, cct, num_parts, marker, next_marker, truncated, true);
+ }
+ expected_next++;
+ }
+ if (sorted_omap ||
+ (int)part->info.num > marker) {
+ last_num = part->info.num;
+ parts[part->info.num] = std::move(part);
+ }
+ }
+
+ if (sorted_omap) {
+ if (truncated) {
+ *truncated = (iter != parts_map.end());
+ }
+ } else {
+ /* rebuild a map with only num_parts entries */
+ std::map<uint32_t, std::unique_ptr<MultipartPart>> new_parts;
+ std::map<uint32_t, std::unique_ptr<MultipartPart>>::iterator piter;
+ for (i = 0, piter = parts.begin();
+ i < num_parts && piter != parts.end();
+ ++i, ++piter) {
+ last_num = piter->first;
+ new_parts[piter->first] = std::move(piter->second);
+ }
+
+ if (truncated) {
+ *truncated = (piter != parts.end());
+ }
+
+ parts.swap(new_parts);
+ }
+
+ if (next_marker) {
+ *next_marker = last_num;
+ }
+
+ return 0;
+}
+
+int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp, CephContext* cct,
+ std::string& etag, RGWObjManifest& manifest,
+ map<int, string>& part_etags,
+ list<rgw_obj_index_key>& remove_objs,
+ uint64_t& accounted_size, bool& compressed,
+ RGWCompressionInfo& cs_info, off_t& ofs)
+{
+ char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
+ char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
+ MD5 hash;
+ bool truncated;
+ int ret;
+
+ int total_parts = 0;
+ int handled_parts = 0;
+ int max_parts = 1000;
+ int marker = 0;
+ uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
+ auto etags_iter = part_etags.begin();
+
+ do {
+ ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated);
+ if (ret == -ENOENT) {
+ ret = -ERR_NO_SUCH_UPLOAD;
+ }
+ if (ret < 0)
+ return ret;
+
+ total_parts += parts.size();
+ if (!truncated && total_parts != (int)part_etags.size()) {
+ ldpp_dout(dpp, 0) << "NOTICE: total parts mismatch: have: " << total_parts
+ << " expected: " << part_etags.size() << dendl;
+ ret = -ERR_INVALID_PART;
+ return ret;
+ }
+
+ for (auto obj_iter = parts.begin(); etags_iter != part_etags.end() && obj_iter != parts.end(); ++etags_iter, ++obj_iter, ++handled_parts) {
+ RadosMultipartPart* part = dynamic_cast<rgw::sal::RadosMultipartPart*>(obj_iter->second.get());
+ uint64_t part_size = part->get_size();
+ if (handled_parts < (int)part_etags.size() - 1 &&
+ part_size < min_part_size) {
+ ret = -ERR_TOO_SMALL;
+ return ret;
+ }
+
+ char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
+ if (etags_iter->first != (int)obj_iter->first) {
+ ldpp_dout(dpp, 0) << "NOTICE: parts num mismatch: next requested: "
+ << etags_iter->first << " next uploaded: "
+ << obj_iter->first << dendl;
+ ret = -ERR_INVALID_PART;
+ return ret;
+ }
+ string part_etag = rgw_string_unquote(etags_iter->second);
+ if (part_etag.compare(part->get_etag()) != 0) {
+ ldpp_dout(dpp, 0) << "NOTICE: etag mismatch: part: " << etags_iter->first
+ << " etag: " << etags_iter->second << dendl;
+ ret = -ERR_INVALID_PART;
+ return ret;
+ }
+
+ hex_to_buf(part->get_etag().c_str(), petag,
+ CEPH_CRYPTO_MD5_DIGESTSIZE);
+ hash.Update((const unsigned char *)petag, sizeof(petag));
+
+ RGWUploadPartInfo& obj_part = part->info;
+
+ /* update manifest for part */
+ string oid = mp_obj.get_part(part->info.num);
+ rgw_obj src_obj;
+ src_obj.init_ns(bucket->get_key(), oid, mp_ns);
+
+ if (obj_part.manifest.empty()) {
+ ldpp_dout(dpp, 0) << "ERROR: empty manifest for object part: obj="
+ << src_obj << dendl;
+ ret = -ERR_INVALID_PART;
+ return ret;
+ } else {
+ manifest.append(dpp, obj_part.manifest, store->get_zone());
+ }
+
+ bool part_compressed = (obj_part.cs_info.compression_type != "none");
+ if ((handled_parts > 0) &&
+ ((part_compressed != compressed) ||
+ (cs_info.compression_type != obj_part.cs_info.compression_type))) {
+ ldpp_dout(dpp, 0) << "ERROR: compression type was changed during multipart upload ("
+ << cs_info.compression_type << ">>" << obj_part.cs_info.compression_type << ")" << dendl;
+ ret = -ERR_INVALID_PART;
+ return ret;
+ }
+
+ if (part_compressed) {
+ int64_t new_ofs; // offset in compression data for new part
+ if (cs_info.blocks.size() > 0)
+ new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len;
+ else
+ new_ofs = 0;
+ for (const auto& block : obj_part.cs_info.blocks) {
+ compression_block cb;
+ cb.old_ofs = block.old_ofs + cs_info.orig_size;
+ cb.new_ofs = new_ofs;
+ cb.len = block.len;
+ cs_info.blocks.push_back(cb);
+ new_ofs = cb.new_ofs + cb.len;
+ }
+ if (!compressed)
+ cs_info.compression_type = obj_part.cs_info.compression_type;
+ cs_info.orig_size += obj_part.cs_info.orig_size;
+ compressed = true;
+ }
+
+ rgw_obj_index_key remove_key;
+ src_obj.key.get_index_key(&remove_key);
+
+ remove_objs.push_back(remove_key);
+
+ ofs += obj_part.size;
+ accounted_size += obj_part.accounted_size;
+ }
+ } while (truncated);
+ hash.Final((unsigned char *)final_etag);
+
+ buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
+ snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2],
+ sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
+ "-%lld", (long long)part_etags.size());
+ etag = final_etag_str;
+ ldpp_dout(dpp, 10) << "calculated etag: " << etag << dendl;
+
+ return ret;
+}
+
+int RadosMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs)
+{
+ if (!rule && !attrs) {
+ return 0;
+ }
+
+ if (rule) {
+ if (!placement.empty()) {
+ *rule = &placement;
+ if (!attrs) {
+ /* Don't need attrs, done */
+ return 0;
+ }
+ } else {
+ *rule = nullptr;
+ }
+ }
+
+ /* We need either attributes or placement, so we need a read */
+ std::unique_ptr<rgw::sal::Object> meta_obj;
+ meta_obj = get_meta_obj();
+ meta_obj->set_in_extra_data(true);
+
+ multipart_upload_info upload_info;
+ bufferlist headbl;
+
+ /* Read the obj head which contains the multipart_upload_info */
+ std::unique_ptr<rgw::sal::Object::ReadOp> read_op = meta_obj->get_read_op(obj_ctx);
+ meta_obj->set_prefetch_data(obj_ctx);
+
+ int ret = read_op->prepare(y, dpp);
+ if (ret < 0) {
+ if (ret == -ENOENT) {
+ return -ERR_NO_SUCH_UPLOAD;
+ }
+ return ret;
+ }
+
+ if (attrs) {
+ /* Attrs are filled in by prepare */
+ *attrs = meta_obj->get_attrs();
+ if (!rule || *rule != nullptr) {
+ /* placement was cached; don't actually read */
+ return 0;
+ }
+ }
+
+ /* Now read the placement from the head */
+ ret = read_op->read(0, store->ctx()->_conf->rgw_max_chunk_size, headbl, y, dpp);
+ if (ret < 0) {
+ if (ret == -ENOENT) {
+ return -ERR_NO_SUCH_UPLOAD;
+ }
+ return ret;
+ }
+
+ if (headbl.length() <= 0) {
+ return -ERR_NO_SUCH_UPLOAD;
+ }
+
+ /* Decode multipart_upload_info */
+ auto hiter = headbl.cbegin();
+ try {
+ decode(upload_info, hiter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to decode multipart upload info" << dendl;
+ return -EIO;
+ }
+ placement = upload_info.dest_placement;
+ *rule = &placement;
+
+ return 0;
+}
+
MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore* store, RadosObject* obj, const std::string& lock_name) :
lock(lock_name)
{
#include "rgw_notify.h"
#include "rgw_oidc_provider.h"
#include "rgw_role.h"
+#include "rgw_multi.h"
+#include "services/svc_tier_rados.h"
#include "cls/lock/cls_lock_client.h"
namespace rgw { namespace sal {
class RadosStore;
+class RadosMultipartUpload;
class RadosCompletions : public Completions {
public:
virtual std::unique_ptr<Bucket> clone() override {
return std::make_unique<RadosBucket>(*this);
}
+ virtual int list_multiparts(const DoutPrefixProvider *dpp,
+ const string& prefix,
+ string& marker,
+ const string& delim,
+ const int& max_uploads,
+ vector<std::unique_ptr<MultipartUpload>>& uploads,
+ map<string, bool> *common_prefixes,
+ bool *is_truncated) override;
+ virtual int abort_multiparts(const DoutPrefixProvider *dpp,
+ CephContext *cct,
+ string& prefix, string& delim) override;
private:
int link(const DoutPrefixProvider* dpp, User* new_user, optional_yield y, bool update_entrypoint = true, RGWObjVersionTracker* objv = nullptr);
virtual int get_oidc_providers(const DoutPrefixProvider *dpp,
const std::string& tenant,
vector<std::unique_ptr<RGWOIDCProvider>>& providers) override;
+ virtual std::unique_ptr<MultipartUpload> get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional<std::string> upload_id=std::nullopt, ceph::real_time mtime=real_clock::now()) override;
virtual void finalize(void) override;
void setUserCtl(RGWUserCtl *_ctl) { user_ctl = _ctl; }
};
+class RadosMultipartPart : public MultipartPart {
+protected:
+ RGWUploadPartInfo info;
+
+public:
+ RadosMultipartPart() = default;
+ virtual ~RadosMultipartPart() = default;
+
+ virtual uint32_t get_num() { return info.num; }
+ virtual uint64_t get_size() { return info.accounted_size; }
+ virtual const std::string& get_etag() { return info.etag; }
+ virtual ceph::real_time& get_mtime() { return info.modified; }
+
+ /* For RadosStore code */
+ RGWObjManifest& get_manifest() { return info.manifest; }
+
+ friend class RadosMultipartUpload;
+};
+
+class RadosMultipartUpload : public MultipartUpload {
+ RadosStore* store;
+ RGWMPObj mp_obj;
+ ceph::real_time mtime;
+ rgw_placement_rule placement;
+
+public:
+ RadosMultipartUpload(RadosStore* _store, Bucket* _bucket, const std::string& oid, std::optional<std::string> upload_id, ceph::real_time _mtime) : MultipartUpload(_bucket), store(_store), mp_obj(oid, upload_id), mtime(_mtime) {}
+ virtual ~RadosMultipartUpload() = default;
+
+ virtual const std::string& get_meta() const { return mp_obj.get_meta(); }
+ virtual const std::string& get_key() const { return mp_obj.get_key(); }
+ virtual const std::string& get_upload_id() const { return mp_obj.get_upload_id(); }
+ virtual ceph::real_time& get_mtime() { return mtime; }
+ virtual std::unique_ptr<rgw::sal::Object> get_meta_obj() override;
+ virtual int init(const DoutPrefixProvider* dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) override;
+ virtual int list_parts(const DoutPrefixProvider* dpp, CephContext* cct,
+ int num_parts, int marker,
+ int* next_marker, bool* truncated,
+ bool assume_unsorted = false) override;
+ virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct,
+ RGWObjectCtx* obj_ctx) override;
+ virtual int complete(const DoutPrefixProvider* dpp, CephContext* cct,
+ std::string& etag, RGWObjManifest& manifest,
+ map<int, string>& part_etags,
+ list<rgw_obj_index_key>& remove_objs,
+ uint64_t& accounted_size, bool& compressed,
+ RGWCompressionInfo& cs_info, off_t& ofs) override;
+ virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) override;
+};
+
class MPRadosSerializer : public MPSerializer {
librados::IoCtx ioctx;
rados::cls::lock::Lock lock;
RGWMPObj(const string& _oid, const string& _upload_id) {
init(_oid, _upload_id, _upload_id);
}
+ RGWMPObj(const string& _oid, std::optional<string> _upload_id) {
+ if (_upload_id) {
+ init(_oid, *_upload_id, *_upload_id);
+ } else {
+ from_meta(_oid);
+ }
+ }
void init(const string& _oid, const string& _upload_id) {
init(_oid, _upload_id, _upload_id);
}