From 61120ff1470eb6a9c3abb86cf5c88ee83d018ca2 Mon Sep 17 00:00:00 2001 From: Daniel Gryniewicz Date: Tue, 6 Jul 2021 10:32:19 -0400 Subject: [PATCH] RGW - Zipper - MultipartUpload Create a MultipartUpload object in the Zipper API. Signed-off-by: Daniel Gryniewicz --- src/rgw/rgw_lc.cc | 7 +- src/rgw/rgw_multi.cc | 293 ---------------- src/rgw/rgw_multi.h | 34 -- src/rgw/rgw_op.cc | 338 +++--------------- src/rgw/rgw_op.h | 23 +- src/rgw/rgw_orphan.cc | 50 +-- src/rgw/rgw_rest.cc | 10 +- src/rgw/rgw_rest_s3.cc | 63 ++-- src/rgw/rgw_sal.h | 84 +++++ src/rgw/rgw_sal_rados.cc | 560 +++++++++++++++++++++++++++++- src/rgw/rgw_sal_rados.h | 65 ++++ src/rgw/services/svc_tier_rados.h | 7 + 12 files changed, 813 insertions(+), 721 deletions(-) diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index 9ea78c93b35f0..1931dd4d78ba3 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -840,14 +840,11 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target, auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) { auto wt = boost::get>(wi); auto& [rule, obj] = wt; - RGWMPObj mp_obj; if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) { rgw_obj_key key(obj.key); - if (!mp_obj.from_meta(key.name)) { - return; - } + std::unique_ptr mpu = store->get_multipart_upload(target, key.name); RGWObjectCtx rctx(store); - int ret = abort_multipart_upload(this, store, cct, &rctx, target, mp_obj); + int ret = mpu->abort(this, cct, &rctx); if (ret == 0) { if (perfcounter) { perfcounter->inc(l_rgw_lc_abort_mpu, 1); diff --git a/src/rgw/rgw_multi.cc b/src/rgw/rgw_multi.cc index 3f209b9934a40..b6edef13e72e9 100644 --- a/src/rgw/rgw_multi.cc +++ b/src/rgw/rgw_multi.cc @@ -79,296 +79,3 @@ bool is_v2_upload_id(const string& upload_id) (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX_LEGACY, sizeof(MULTIPART_UPLOAD_ID_PREFIX_LEGACY) - 1) == 0); } -int list_multipart_parts(const DoutPrefixProvider *dpp, rgw::sal::Bucket* bucket, - CephContext *cct, - const string& upload_id, - const string& meta_oid, int num_parts, - int marker, map& parts, - int *next_marker, bool *truncated, - bool assume_unsorted) -{ - map parts_map; - map::iterator iter; - - std::unique_ptr obj = bucket->get_object( - rgw_obj_key(meta_oid, std::string(), RGW_OBJ_NS_MULTIPART)); - obj->set_in_extra_data(true); - - bool sorted_omap = is_v2_upload_id(upload_id) && !assume_unsorted; - - parts.clear(); - - int ret; - if (sorted_omap) { - string p; - p = "part."; - char buf[32]; - - snprintf(buf, sizeof(buf), "%08d", marker); - p.append(buf); - - ret = obj->omap_get_vals(dpp, p, num_parts + 1, &parts_map, - nullptr, null_yield); - } else { - ret = obj->omap_get_all(dpp, &parts_map, null_yield); - } - if (ret < 0) { - return ret; - } - - int i; - int last_num = 0; - - uint32_t expected_next = marker + 1; - - for (i = 0, iter = parts_map.begin(); - (i < num_parts || !sorted_omap) && iter != parts_map.end(); - ++iter, ++i) { - bufferlist& bl = iter->second; - auto bli = bl.cbegin(); - RGWUploadPartInfo info; - try { - decode(info, bli); - } catch (buffer::error& err) { - ldpp_dout(dpp, 0) << "ERROR: could not part info, caught buffer::error" << - dendl; - return -EIO; - } - if (sorted_omap) { - if (info.num != expected_next) { - /* ouch, we expected a specific part num here, but we got a - * different one. Either a part is missing, or it could be a - * case of mixed rgw versions working on the same upload, - * where one gateway doesn't support correctly sorted omap - * keys for multipart upload just assume data is unsorted. - */ - return list_multipart_parts(dpp, bucket, cct, upload_id, - meta_oid, num_parts, marker, parts, - next_marker, truncated, true); - } - expected_next++; - } - if (sorted_omap || - (int)info.num > marker) { - parts[info.num] = info; - last_num = info.num; - } - } - - if (sorted_omap) { - if (truncated) { - *truncated = (iter != parts_map.end()); - } - } else { - /* rebuild a map with only num_parts entries */ - map new_parts; - map::iterator piter; - for (i = 0, piter = parts.begin(); - i < num_parts && piter != parts.end(); - ++i, ++piter) { - new_parts[piter->first] = piter->second; - last_num = piter->first; - } - - if (truncated) { - *truncated = (piter != parts.end()); - } - - parts.swap(new_parts); - } - - if (next_marker) { - *next_marker = last_num; - } - - return 0; -} - -int list_multipart_parts(const DoutPrefixProvider *dpp, struct req_state *s, - const string& upload_id, - const string& meta_oid, int num_parts, - int marker, map& parts, - int *next_marker, bool *truncated, - bool assume_unsorted) -{ - return list_multipart_parts(dpp, s->bucket.get(), s->cct, upload_id, - meta_oid, num_parts, marker, parts, - next_marker, truncated, assume_unsorted); -} - -int abort_multipart_upload(const DoutPrefixProvider *dpp, - rgw::sal::Store* store, CephContext *cct, - RGWObjectCtx *obj_ctx, rgw::sal::Bucket* bucket, - RGWMPObj& mp_obj) -{ - std::unique_ptr meta_obj = bucket->get_object( - rgw_obj_key(mp_obj.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART)); - meta_obj->set_in_extra_data(true); - meta_obj->set_hash_source(mp_obj.get_key()); - std::unique_ptr chain = store->get_gc_chain(meta_obj.get()); - list remove_objs; - map obj_parts; - bool truncated; - int marker = 0; - int ret; - uint64_t parts_accounted_size = 0; - - do { - ret = list_multipart_parts(dpp, bucket, cct, - mp_obj.get_upload_id(), mp_obj.get_meta(), - 1000, marker, obj_parts, &marker, &truncated); - if (ret < 0) { - ldpp_dout(dpp, 20) << __func__ << ": list_multipart_parts returned " << - ret << dendl; - return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret; - } - - for (auto obj_iter = obj_parts.begin(); - obj_iter != obj_parts.end(); - ++obj_iter) { - RGWUploadPartInfo& obj_part = obj_iter->second; - if (obj_part.manifest.empty()) { - string oid = mp_obj.get_part(obj_iter->second.num); - std::unique_ptr obj = bucket->get_object( - rgw_obj_key(oid, std::string(), RGW_OBJ_NS_MULTIPART)); - obj->set_hash_source(mp_obj.get_key()); - ret = obj->delete_object(dpp, obj_ctx, null_yield); - if (ret < 0 && ret != -ENOENT) - return ret; - } else { - chain->update(dpp, &obj_part.manifest); - RGWObjManifest::obj_iterator oiter = obj_part.manifest.obj_begin(dpp); - if (oiter != obj_part.manifest.obj_end(dpp)) { - std::unique_ptr head = bucket->get_object(rgw_obj_key()); - rgw_raw_obj raw_head = oiter.get_location().get_raw_obj(store); - head->raw_obj_to_obj(raw_head); - - rgw_obj_index_key key; - head->get_key().get_index_key(&key); - remove_objs.push_back(key); - } - } - parts_accounted_size += obj_part.accounted_size; - } - } while (truncated); - - /* use upload id as tag and do it synchronously */ - ret = chain->send(mp_obj.get_upload_id()); - if (ret < 0) { - ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl; - if (ret == -ENOENT) { - return -ERR_NO_SUCH_UPLOAD; - } - //Delete objects inline if send chain to gc fails - chain->delete_inline(dpp, mp_obj.get_upload_id()); - } - - std::unique_ptr del_op = meta_obj->get_delete_op(obj_ctx); - del_op->params.bucket_owner = bucket->get_acl_owner(); - del_op->params.versioning_status = 0; - if (!remove_objs.empty()) { - del_op->params.remove_objs = &remove_objs; - } - - del_op->params.abortmp = true; - del_op->params.parts_accounted_size = parts_accounted_size; - - // and also remove the metadata obj - ret = del_op->delete_obj(dpp, null_yield); - if (ret < 0) { - ldpp_dout(dpp, 20) << __func__ << ": del_op.delete_obj returned " << - ret << dendl; - } - return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret; -} - -int list_bucket_multiparts(const DoutPrefixProvider *dpp, - rgw::sal::Bucket* bucket, - const string& prefix, const string& marker, - const string& delim, - const int& max_uploads, - vector *objs, - map *common_prefixes, bool *is_truncated) -{ - rgw::sal::Bucket::ListParams params; - rgw::sal::Bucket::ListResults results; - MultipartMetaFilter mp_filter; - - params.prefix = prefix; - params.delim = delim; - params.marker = marker; - params.ns = RGW_OBJ_NS_MULTIPART; - params.filter = &mp_filter; - - int ret = bucket->list(dpp, params, max_uploads, results, null_yield); - - if (ret < 0) - return ret; - - *objs = std::move(results.objs); - *common_prefixes = std::move(results.common_prefixes); - *is_truncated = results.is_truncated; - - return ret; -} - -int abort_bucket_multiparts(const DoutPrefixProvider *dpp, - rgw::sal::Store* store, CephContext *cct, - rgw::sal::Bucket* bucket, string& prefix, string& delim) -{ - constexpr int max = 1000; - int ret, num_deleted = 0; - vector objs; - RGWObjectCtx obj_ctx(store); - string marker; - bool is_truncated; - - do { - ret = list_bucket_multiparts(dpp, bucket, prefix, marker, delim, - max, &objs, nullptr, &is_truncated); - if (ret < 0) { - ldpp_dout(dpp, 0) << __func__ << - " ERROR : calling list_bucket_multiparts; ret=" << ret << - "; bucket=\"" << bucket << "\"; prefix=\"" << - prefix << "\"; delim=\"" << delim << "\"" << dendl; - return ret; - } - ldpp_dout(dpp, 20) << __func__ << - " INFO: aborting and cleaning up multipart upload(s); bucket=\"" << - bucket << "\"; objs.size()=" << objs.size() << - "; is_truncated=" << is_truncated << dendl; - - if (!objs.empty()) { - RGWMPObj mp; - for (const auto& obj : objs) { - rgw_obj_key key(obj.key); - if (!mp.from_meta(key.name)) - continue; - ret = abort_multipart_upload(dpp, store, cct, &obj_ctx, bucket, mp); - if (ret < 0) { - // we're doing a best-effort; if something cannot be found, - // log it and keep moving forward - if (ret != -ENOENT && ret != -ERR_NO_SUCH_UPLOAD) { - ldpp_dout(dpp, 0) << __func__ << - " ERROR : failed to abort and clean-up multipart upload \"" << - key.get_oid() << "\"" << dendl; - return ret; - } else { - ldpp_dout(dpp, 10) << __func__ << - " NOTE : unable to find part(s) of " - "aborted multipart upload of \"" << key.get_oid() << - "\" for cleaning up" << dendl; - } - } - num_deleted++; - } - if (num_deleted) { - ldpp_dout(dpp, 0) << __func__ << - " WARNING : aborted " << num_deleted << - " incomplete multipart uploads" << dendl; - } - } - } while (is_truncated); - - return 0; -} diff --git a/src/rgw/rgw_multi.h b/src/rgw/rgw_multi.h index aeb983e0009b1..7174dfbb80c7f 100644 --- a/src/rgw/rgw_multi.h +++ b/src/rgw/rgw_multi.h @@ -109,38 +109,4 @@ public: extern bool is_v2_upload_id(const string& upload_id); -extern int list_multipart_parts(const DoutPrefixProvider *dpp, - rgw::sal::Bucket* bucket, - CephContext *cct, - const string& upload_id, - const string& meta_oid, int num_parts, - int marker, map& parts, - int *next_marker, bool *truncated, - bool assume_unsorted = false); - -extern int list_multipart_parts(const DoutPrefixProvider *dpp, - struct req_state *s, - const string& upload_id, - const string& meta_oid, int num_parts, - int marker, map& parts, - int *next_marker, bool *truncated, - bool assume_unsorted = false); - -extern int abort_multipart_upload(const DoutPrefixProvider *dpp, rgw::sal::Store* store, - CephContext *cct, RGWObjectCtx *obj_ctx, - rgw::sal::Bucket* bucket, RGWMPObj& mp_obj); - -extern int list_bucket_multiparts(const DoutPrefixProvider *dpp, - rgw::sal::Bucket* bucket, - const string& prefix, - const string& marker, - const string& delim, - const int& max_uploads, - vector *objs, - map *common_prefixes, bool *is_truncated); - -extern int abort_bucket_multiparts(const DoutPrefixProvider *dpp, - rgw::sal::Store* store, CephContext *cct, - rgw::sal::Bucket* bucket, - string& prefix, string& delim); #endif diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 829774cbe2a09..0b578d1bab93e 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -329,95 +329,6 @@ vector get_iam_user_policy_from_attr(CephContext* cct, return policies; } -static int get_obj_head(const DoutPrefixProvider *dpp, - struct req_state *s, - rgw::sal::Object* obj, - bufferlist *pbl) -{ - std::unique_ptr read_op = obj->get_read_op(s->obj_ctx); - obj->set_prefetch_data(s->obj_ctx); - - int ret = read_op->prepare(s->yield, dpp); - if (ret < 0) { - return ret; - } - - if (!pbl) { - return 0; - } - - ret = read_op->read(0, s->cct->_conf->rgw_max_chunk_size, *pbl, s->yield, dpp); - - return 0; -} - -struct multipart_upload_info -{ - rgw_placement_rule dest_placement; - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(dest_placement, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(dest_placement, bl); - DECODE_FINISH(bl); - } -}; -WRITE_CLASS_ENCODER(multipart_upload_info) - -static int get_multipart_info(const DoutPrefixProvider *dpp, struct req_state *s, - rgw::sal::Object* obj, - multipart_upload_info *upload_info) -{ - bufferlist header; - - bufferlist headbl; - bufferlist *pheadbl = (upload_info ? &headbl : nullptr); - - int op_ret = get_obj_head(dpp, s, obj, pheadbl); - if (op_ret < 0) { - if (op_ret == -ENOENT) { - return -ERR_NO_SUCH_UPLOAD; - } - return op_ret; - } - - if (upload_info && headbl.length() > 0) { - auto hiter = headbl.cbegin(); - try { - decode(*upload_info, hiter); - } catch (buffer::error& err) { - ldpp_dout(s, 0) << "ERROR: failed to decode multipart upload info" << dendl; - return -EIO; - } - } - - return 0; -} - -static int get_multipart_info(const DoutPrefixProvider *dpp, struct req_state *s, - const string& meta_oid, - multipart_upload_info *upload_info, - rgw::sal::Attrs* attrs = nullptr) -{ - map::iterator iter; - bufferlist header; - - std::unique_ptr meta_obj; - meta_obj = s->bucket->get_object(rgw_obj_key(meta_oid, string(), mp_ns)); - meta_obj->set_in_extra_data(true); - - int ret = get_multipart_info(dpp, s, meta_obj.get(), upload_info); - if (ret >= 0 && attrs) { - *attrs = meta_obj->get_attrs(); - } - return ret; -} - static int read_bucket_policy(const DoutPrefixProvider *dpp, rgw::sal::Store* store, struct req_state *s, @@ -473,9 +384,9 @@ static int read_obj_policy(const DoutPrefixProvider *dpp, // 'copy_src' is used to make this function backward compatible. if (!upload_id.empty() && !copy_src) { /* multipart upload */ - RGWMPObj mp(object->get_name(), upload_id); - string oid = mp.get_meta(); - mpobj = bucket->get_object(rgw_obj_key(oid, string(), mp_ns)); + std::unique_ptr upload; + upload = store->get_multipart_upload(bucket, object->get_name(), upload_id); + mpobj = upload->get_meta_obj(); mpobj->set_in_extra_data(true); object = mpobj.get(); } @@ -3830,11 +3741,11 @@ void RGWPutObj::execute(optional_yield y) rgw_placement_rule *pdest_placement; - multipart_upload_info upload_info; if (multipart) { - RGWMPObj mp(s->object->get_name(), multipart_upload_id); - - op_ret = get_multipart_info(this, s, mp.get_meta(), &upload_info); + std::unique_ptr upload; + upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(), + multipart_upload_id); + op_ret = upload->get_info(this, s->yield, s->obj_ctx, &pdest_placement); if (op_ret < 0) { if (op_ret != -ENOENT) { ldpp_dout(this, 0) << "ERROR: get_multipart_info returned " << op_ret << ": " << cpp_strerror(-op_ret) << dendl; @@ -3843,8 +3754,10 @@ void RGWPutObj::execute(optional_yield y) } return; } - pdest_placement = &upload_info.dest_placement; - ldpp_dout(this, 20) << "dest_placement for part=" << upload_info.dest_placement << dendl; + /* upload will go out of scope, so copy the dest placement for later use */ + s->dest_placement = *pdest_placement; + pdest_placement = &s->dest_placement; + ldpp_dout(this, 20) << "dest_placement for part=" << *pdest_placement << dendl; processor.emplace( &*aio, store, s->bucket.get(), pdest_placement, s->owner.get_id(), obj_ctx, s->object->clone(), @@ -5998,43 +5911,15 @@ void RGWInitMultipart::execute(optional_yield y) return; } - do { - char buf[33]; - std::unique_ptr obj; - gen_rand_alphanumeric(s->cct, buf, sizeof(buf) - 1); - upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */ - upload_id.append(buf); - - string tmp_obj_name; - RGWMPObj mp(s->object->get_name(), upload_id); - tmp_obj_name = mp.get_meta(); - - obj = s->bucket->get_object(rgw_obj_key(tmp_obj_name, string(), mp_ns)); - // the meta object will be indexed with 0 size, we c - obj->set_in_extra_data(true); - obj->set_hash_source(s->object->get_name()); - - std::unique_ptr obj_op = obj->get_write_op(s->obj_ctx); - - obj_op->params.versioning_disabled = true; /* no versioning for multipart meta */ - obj_op->params.owner = s->owner; - obj_op->params.category = RGWObjCategory::MultiMeta; - obj_op->params.flags = PUT_OBJ_CREATE_EXCL; - obj_op->params.mtime = &mtime; - obj_op->params.attrs = &attrs; + std::unique_ptr upload; + upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(), + upload_id); + op_ret = upload->init(this, s->yield, s->obj_ctx, s->owner, s->dest_placement, attrs); - multipart_upload_info upload_info; - upload_info.dest_placement = s->dest_placement; - - bufferlist bl; - encode(upload_info, bl); - obj_op->params.data = &bl; - - op_ret = obj_op->prepare(s->yield); + if (op_ret == 0) { + upload_id = upload->get_upload_id(); + } - op_ret = obj_op->write_meta(this, bl.length(), 0, s->yield); - } while (op_ret == -EEXIST); - } int RGWCompleteMultipart::verify_permission(optional_yield y) @@ -6106,20 +5991,13 @@ void RGWCompleteMultipart::pre_exec() void RGWCompleteMultipart::execute(optional_yield y) { RGWMultiCompleteUpload *parts; - map::iterator iter; RGWMultiXMLParser parser; - string meta_oid; - map obj_parts; - map::iterator obj_iter; + std::unique_ptr upload; rgw::sal::Attrs attrs; off_t ofs = 0; - MD5 hash; - char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE]; - char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16]; bufferlist etag_bl; std::unique_ptr meta_obj; std::unique_ptr target_obj; - RGWMPObj mp; RGWObjManifest manifest; uint64_t olh_epoch = 0; @@ -6158,29 +6036,18 @@ void RGWCompleteMultipart::execute(optional_yield y) return; } - mp.init(s->object->get_name(), upload_id); + upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(), upload_id); - meta_oid = mp.get_meta(); - - int total_parts = 0; - int handled_parts = 0; - int max_parts = 1000; - int marker = 0; - bool truncated; RGWCompressionInfo cs_info; bool compressed = false; uint64_t accounted_size = 0; - uint64_t min_part_size = s->cct->_conf->rgw_multipart_min_part_size; - list remove_objs; /* objects to be removed from index listing */ bool versioned_object = s->bucket->versioning_enabled(); - iter = parts->parts.begin(); - - meta_obj = s->bucket->get_object(rgw_obj_key(meta_oid, string(), mp_ns)); + meta_obj = upload->get_meta_obj(); meta_obj->set_in_extra_data(true); meta_obj->set_hash_source(s->object->get_name()); @@ -6220,115 +6087,13 @@ void RGWCompleteMultipart::execute(optional_yield y) return; } - do { - op_ret = list_multipart_parts(this, s, upload_id, meta_oid, max_parts, - marker, obj_parts, &marker, &truncated); - if (op_ret == -ENOENT) { - op_ret = -ERR_NO_SUCH_UPLOAD; - } - if (op_ret < 0) - return; - - total_parts += obj_parts.size(); - if (!truncated && total_parts != (int)parts->parts.size()) { - ldpp_dout(this, 0) << "NOTICE: total parts mismatch: have: " << total_parts - << " expected: " << parts->parts.size() << dendl; - op_ret = -ERR_INVALID_PART; - return; - } - - for (obj_iter = obj_parts.begin(); iter != parts->parts.end() && obj_iter != obj_parts.end(); ++iter, ++obj_iter, ++handled_parts) { - uint64_t part_size = obj_iter->second.accounted_size; - if (handled_parts < (int)parts->parts.size() - 1 && - part_size < min_part_size) { - op_ret = -ERR_TOO_SMALL; - return; - } - - char petag[CEPH_CRYPTO_MD5_DIGESTSIZE]; - if (iter->first != (int)obj_iter->first) { - ldpp_dout(this, 0) << "NOTICE: parts num mismatch: next requested: " - << iter->first << " next uploaded: " - << obj_iter->first << dendl; - op_ret = -ERR_INVALID_PART; - return; - } - string part_etag = rgw_string_unquote(iter->second); - if (part_etag.compare(obj_iter->second.etag) != 0) { - ldpp_dout(this, 0) << "NOTICE: etag mismatch: part: " << iter->first - << " etag: " << iter->second << dendl; - op_ret = -ERR_INVALID_PART; - return; - } - - hex_to_buf(obj_iter->second.etag.c_str(), petag, - CEPH_CRYPTO_MD5_DIGESTSIZE); - hash.Update((const unsigned char *)petag, sizeof(petag)); - - RGWUploadPartInfo& obj_part = obj_iter->second; - - /* update manifest for part */ - string oid = mp.get_part(obj_iter->second.num); - rgw_obj src_obj; - src_obj.init_ns(s->bucket->get_key(), oid, mp_ns); - - if (obj_part.manifest.empty()) { - ldpp_dout(this, 0) << "ERROR: empty manifest for object part: obj=" - << src_obj << dendl; - op_ret = -ERR_INVALID_PART; - return; - } else { - manifest.append(this, obj_part.manifest, store->get_zone()); - } - - bool part_compressed = (obj_part.cs_info.compression_type != "none"); - if ((handled_parts > 0) && - ((part_compressed != compressed) || - (cs_info.compression_type != obj_part.cs_info.compression_type))) { - ldpp_dout(this, 0) << "ERROR: compression type was changed during multipart upload (" - << cs_info.compression_type << ">>" << obj_part.cs_info.compression_type << ")" << dendl; - op_ret = -ERR_INVALID_PART; - return; - } - - if (part_compressed) { - int64_t new_ofs; // offset in compression data for new part - if (cs_info.blocks.size() > 0) - new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len; - else - new_ofs = 0; - for (const auto& block : obj_part.cs_info.blocks) { - compression_block cb; - cb.old_ofs = block.old_ofs + cs_info.orig_size; - cb.new_ofs = new_ofs; - cb.len = block.len; - cs_info.blocks.push_back(cb); - new_ofs = cb.new_ofs + cb.len; - } - if (!compressed) - cs_info.compression_type = obj_part.cs_info.compression_type; - cs_info.orig_size += obj_part.cs_info.orig_size; - compressed = true; - } - - rgw_obj_index_key remove_key; - src_obj.key.get_index_key(&remove_key); - - remove_objs.push_back(remove_key); - - ofs += obj_part.size; - accounted_size += obj_part.accounted_size; - } - } while (truncated); - hash.Final((unsigned char *)final_etag); - - buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str); - snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2, - "-%lld", (long long)parts->parts.size()); - etag = final_etag_str; - ldpp_dout(this, 10) << "calculated etag: " << final_etag_str << dendl; + op_ret = upload->complete(this, s->cct, etag, manifest, parts->parts, remove_objs, accounted_size, compressed, cs_info, ofs); + if (op_ret < 0) { + ldpp_dout(this, 0) << "ERROR: upload complete failed ret=" << op_ret << dendl; + return; + } - etag_bl.append(final_etag_str, strlen(final_etag_str)); + etag_bl.append(etag); attrs[RGW_ATTR_ETAG] = etag_bl; @@ -6384,7 +6149,7 @@ void RGWCompleteMultipart::execute(optional_yield y) } // send request to notification manager - int ret = res->publish_commit(this, ofs, ceph::real_clock::now(), final_etag_str, target_obj->get_instance()); + int ret = res->publish_commit(this, ofs, target_obj->get_mtime(), etag, target_obj->get_instance()); if (ret < 0) { ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl; // too late to rollback operation, hence op_ret is not set here @@ -6511,23 +6276,16 @@ void RGWAbortMultipart::execute(optional_yield y) { op_ret = -EINVAL; string upload_id; - string meta_oid; upload_id = s->info.args.get("uploadId"); rgw_obj meta_obj; - RGWMPObj mp; + std::unique_ptr upload; if (upload_id.empty() || rgw::sal::Object::empty(s->object.get())) return; - mp.init(s->object->get_name(), upload_id); - meta_oid = mp.get_meta(); - - op_ret = get_multipart_info(this, s, meta_oid, nullptr); - if (op_ret < 0) - return; - + upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(), upload_id); RGWObjectCtx *obj_ctx = static_cast(s->obj_ctx); - op_ret = abort_multipart_upload(this, store, s->cct, obj_ctx, s->bucket.get(), mp); + op_ret = upload->abort(this, s->cct, obj_ctx); } int RGWListMultipart::verify_permission(optional_yield y) @@ -6545,18 +6303,14 @@ void RGWListMultipart::pre_exec() void RGWListMultipart::execute(optional_yield y) { - string meta_oid; - RGWMPObj mp; - op_ret = get_params(y); if (op_ret < 0) return; - mp.init(s->object->get_name(), upload_id); - meta_oid = mp.get_meta(); + upload = store->get_multipart_upload(s->bucket.get(), s->object->get_name(), upload_id); rgw::sal::Attrs attrs; - op_ret = get_multipart_info(this, s, meta_oid, nullptr, &attrs); + op_ret = upload->get_info(this, s->yield, s->obj_ctx, nullptr, &attrs); /* decode policy */ map::iterator iter = attrs.find(RGW_ATTR_ACL); if (iter != attrs.end()) { @@ -6571,8 +6325,7 @@ void RGWListMultipart::execute(optional_yield y) if (op_ret < 0) return; - op_ret = list_multipart_parts(this, s, upload_id, meta_oid, max_parts, - marker, parts, NULL, &truncated); + op_ret = upload->list_parts(this, s->cct, max_parts, marker, NULL, &truncated); } int RGWListBucketMultiparts::verify_permission(optional_yield y) @@ -6592,9 +6345,6 @@ void RGWListBucketMultiparts::pre_exec() void RGWListBucketMultiparts::execute(optional_yield y) { - vector objs; - string marker_meta; - op_ret = get_params(y); if (op_ret < 0) return; @@ -6611,25 +6361,17 @@ void RGWListBucketMultiparts::execute(optional_yield y) delimiter="/"; } } - marker_meta = marker.get_meta(); - op_ret = list_bucket_multiparts(this, s->bucket.get(), prefix, marker_meta, delimiter, - max_uploads, &objs, &common_prefixes, &is_truncated); + op_ret = s->bucket->list_multiparts(this, prefix, marker_meta, + delimiter, max_uploads, uploads, + &common_prefixes, &is_truncated); if (op_ret < 0) { return; } - if (!objs.empty()) { - vector::iterator iter; - RGWMultipartUploadEntry entry; - for (iter = objs.begin(); iter != objs.end(); ++iter) { - rgw_obj_key key(iter->key); - if (!entry.mp.from_meta(key.name)) - continue; - entry.obj = *iter; - uploads.push_back(entry); - } - next_marker = entry; + if (!uploads.empty()) { + next_marker_key = uploads.back()->get_key(); + next_marker_upload_id = uploads.back()->get_upload_id(); } } diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 156d0c541b0dc..0c8e8ad40b52a 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -1884,7 +1884,7 @@ public: class RGWListMultipart : public RGWOp { protected: string upload_id; - map parts; + std::unique_ptr upload; int max_parts; int marker; RGWAccessControlPolicy policy; @@ -1912,26 +1912,17 @@ public: uint32_t op_mask() override { return RGW_OP_TYPE_READ; } }; -struct RGWMultipartUploadEntry { - rgw_bucket_dir_entry obj; - RGWMPObj mp; - - friend std::ostream& operator<<(std::ostream& out, - const RGWMultipartUploadEntry& e) { - constexpr char quote = '"'; - return out << "RGWMultipartUploadEntry{ obj.key=" << - quote << e.obj.key << quote << " mp=" << e.mp << " }"; - } -}; - class RGWListBucketMultiparts : public RGWOp { protected: string prefix; - RGWMPObj marker; - RGWMultipartUploadEntry next_marker; + string marker_meta; + string marker_key; + string marker_upload_id; + string next_marker_key; + string next_marker_upload_id; int max_uploads; string delimiter; - vector uploads; + vector> uploads; map common_prefixes; bool is_truncated; int default_max; diff --git a/src/rgw/rgw_orphan.cc b/src/rgw/rgw_orphan.cc index 39f2bb00a1359..921b226738948 100644 --- a/src/rgw/rgw_orphan.cc +++ b/src/rgw/rgw_orphan.cc @@ -1449,21 +1449,15 @@ int RGWRadosList::do_incomplete_multipart(const DoutPrefixProvider *dpp, { constexpr int max_uploads = 1000; constexpr int max_parts = 1000; - static const std::string mp_ns = RGW_OBJ_NS_MULTIPART; - static MultipartMetaFilter mp_filter; - + std::string marker; + vector> uploads; + bool is_truncated; int ret; - rgw::sal::Bucket::ListParams params; - rgw::sal::Bucket::ListResults results; - - params.ns = mp_ns; - params.filter = &mp_filter; - // use empty string for initial params.marker // use empty strings for params.{prefix,delim} do { - ret = bucket->list(dpp, params, max_uploads, results, null_yield); + ret = bucket->list_multiparts(dpp, string(), marker, string(), max_uploads, uploads, nullptr, &is_truncated); if (ret == -ENOENT) { // could bucket have been removed while this is running? ldpp_dout(dpp, 5) << "RGWRadosList::" << __func__ << @@ -1476,39 +1470,19 @@ int RGWRadosList::do_incomplete_multipart(const DoutPrefixProvider *dpp, return ret; } - if (!results.objs.empty()) { - std::vector uploads; - RGWMultipartUploadEntry entry; - for (const rgw_bucket_dir_entry& obj : results.objs) { - const rgw_obj_key& key = obj.key; - if (!entry.mp.from_meta(key.name)) { - // we only want the meta objects, so skip all the components - continue; - } - entry.obj = obj; - uploads.push_back(entry); - ldpp_dout(dpp, 20) << "RGWRadosList::" << __func__ << - " processing incomplete multipart entry " << - entry << dendl; - } - + if (!uploads.empty()) { // now process the uploads vector for (const auto& upload : uploads) { - const RGWMPObj& mp = upload.mp; int parts_marker = 0; bool is_parts_truncated = false; do { // while (is_parts_truncated); - std::map parts; - ret = list_multipart_parts(dpp, bucket, store->ctx(), - mp.get_upload_id(), mp.get_meta(), - max_parts, parts_marker, - parts, &parts_marker, - &is_parts_truncated); + ret = upload->list_parts(dpp, store->ctx(), max_parts, parts_marker, + &parts_marker, &is_parts_truncated); if (ret == -ENOENT) { ldpp_dout(dpp, 5) << "RGWRadosList::" << __func__ << ": WARNING: list_multipart_parts returned ret=-ENOENT " - "for " << mp.get_upload_id() << ", moving on" << dendl; + "for " << upload->get_upload_id() << ", moving on" << dendl; break; } else if (ret < 0) { ldpp_dout(dpp, -1) << "RGWRadosList::" << __func__ << @@ -1517,8 +1491,10 @@ int RGWRadosList::do_incomplete_multipart(const DoutPrefixProvider *dpp, return ret; } - for (auto& p : parts) { - RGWObjManifest& manifest = p.second.manifest; + for (auto& p : upload->get_parts()) { + rgw::sal::RadosMultipartPart* part = + dynamic_cast(p.second.get()); + RGWObjManifest& manifest = part->get_manifest(); for (auto obj_it = manifest.obj_begin(dpp); obj_it != manifest.obj_end(dpp); ++obj_it) { @@ -1530,7 +1506,7 @@ int RGWRadosList::do_incomplete_multipart(const DoutPrefixProvider *dpp, } while (is_parts_truncated); } // for (const auto& upload } // if objs not empty - } while (results.is_truncated); + } while (is_truncated); return 0; } // RGWRadosList::do_incomplete_multipart diff --git a/src/rgw/rgw_rest.cc b/src/rgw/rgw_rest.cc index e807c295ac4f7..dc1135d9bfd53 100644 --- a/src/rgw/rgw_rest.cc +++ b/src/rgw/rgw_rest.cc @@ -1612,8 +1612,14 @@ int RGWListBucketMultiparts_ObjStore::get_params(optional_yield y) string key_marker = s->info.args.get("key-marker"); string upload_id_marker = s->info.args.get("upload-id-marker"); - if (!key_marker.empty()) - marker.init(key_marker, upload_id_marker); + if (!key_marker.empty()) { + std::unique_ptr upload; + upload = store->get_multipart_upload(s->bucket.get(), key_marker, + upload_id_marker); + marker_meta = upload->get_meta(); + marker_key = upload->get_key(); + marker_upload_id = upload->get_upload_id(); + } return 0; } diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 316b36ef2f4fd..03608d9e09d83 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -2577,11 +2577,10 @@ int RGWPutObj_ObjStore_S3::get_encrypt_filter( { int res = 0; if (!multipart_upload_id.empty()) { - RGWMPObj mp(s->object->get_name(), multipart_upload_id); - std::unique_ptr obj = s->bucket->get_object( - rgw_obj_key(mp.get_meta(), - std::string(), - RGW_OBJ_NS_MULTIPART)); + std::unique_ptr upload = + store->get_multipart_upload(s->bucket.get(), s->object->get_name(), + multipart_upload_id); + std::unique_ptr obj = upload->get_meta_obj(); obj->set_in_extra_data(true); res = obj->get_obj_attrs(s->obj_ctx, s->yield, this); if (res == 0) { @@ -3786,13 +3785,13 @@ void RGWListMultipart_ObjStore_S3::send_response() if (op_ret == 0) { dump_start(s); s->formatter->open_object_section_in_ns("ListPartsResult", XMLNS_AWS_S3); - map::iterator iter; - map::reverse_iterator test_iter; + map>::iterator iter; + map>::reverse_iterator test_iter; int cur_max = 0; - iter = parts.begin(); - test_iter = parts.rbegin(); - if (test_iter != parts.rend()) { + iter = upload->get_parts().begin(); + test_iter = upload->get_parts().rbegin(); + if (test_iter != upload->get_parts().rend()) { cur_max = test_iter->first; } if (!s->bucket_tenant.empty()) @@ -3809,16 +3808,16 @@ void RGWListMultipart_ObjStore_S3::send_response() ACLOwner& owner = policy.get_owner(); dump_owner(s, owner.get_id(), owner.get_display_name()); - for (; iter != parts.end(); ++iter) { - RGWUploadPartInfo& info = iter->second; + for (; iter != upload->get_parts().end(); ++iter) { + rgw::sal::MultipartPart* part = iter->second.get(); s->formatter->open_object_section("Part"); - dump_time(s, "LastModified", &info.modified); + dump_time(s, "LastModified", &part->get_mtime()); - s->formatter->dump_unsigned("PartNumber", info.num); - s->formatter->dump_format("ETag", "\"%s\"", info.etag.c_str()); - s->formatter->dump_unsigned("Size", info.accounted_size); + s->formatter->dump_unsigned("PartNumber", part->get_num()); + s->formatter->dump_format("ETag", "\"%s\"", part->get_etag().c_str()); + s->formatter->dump_unsigned("Size", part->get_size()); s->formatter->close_section(); } s->formatter->close_section(); @@ -3845,38 +3844,34 @@ void RGWListBucketMultiparts_ObjStore_S3::send_response() s->formatter->dump_string("Bucket", s->bucket_name); if (!prefix.empty()) s->formatter->dump_string("ListMultipartUploadsResult.Prefix", prefix); - const string& key_marker = marker.get_key(); - if (!key_marker.empty()) - s->formatter->dump_string("KeyMarker", key_marker); - const string& upload_id_marker = marker.get_upload_id(); - if (!upload_id_marker.empty()) - s->formatter->dump_string("UploadIdMarker", upload_id_marker); - string next_key = next_marker.mp.get_key(); - if (!next_key.empty()) - s->formatter->dump_string("NextKeyMarker", next_key); - string next_upload_id = next_marker.mp.get_upload_id(); - if (!next_upload_id.empty()) - s->formatter->dump_string("NextUploadIdMarker", next_upload_id); + if (!marker_key.empty()) + s->formatter->dump_string("KeyMarker", marker_key); + if (!marker_upload_id.empty()) + s->formatter->dump_string("UploadIdMarker", marker_upload_id); + if (!next_marker_key.empty()) + s->formatter->dump_string("NextKeyMarker", next_marker_key); + if (!next_marker_upload_id.empty()) + s->formatter->dump_string("NextUploadIdMarker", next_marker_upload_id); s->formatter->dump_int("MaxUploads", max_uploads); if (!delimiter.empty()) s->formatter->dump_string("Delimiter", delimiter); s->formatter->dump_string("IsTruncated", (is_truncated ? "true" : "false")); if (op_ret >= 0) { - vector::iterator iter; + vector>::iterator iter; for (iter = uploads.begin(); iter != uploads.end(); ++iter) { - RGWMPObj& mp = iter->mp; + rgw::sal::MultipartUpload* upload = iter->get(); s->formatter->open_array_section("Upload"); if (encode_url) { - s->formatter->dump_string("Key", url_encode(mp.get_key(), false)); + s->formatter->dump_string("Key", url_encode(upload->get_key(), false)); } else { - s->formatter->dump_string("Key", mp.get_key()); + s->formatter->dump_string("Key", upload->get_key()); } - s->formatter->dump_string("UploadId", mp.get_upload_id()); + s->formatter->dump_string("UploadId", upload->get_upload_id()); dump_owner(s, s->user->get_id(), s->user->get_display_name(), "Initiator"); dump_owner(s, s->user->get_id(), s->user->get_display_name()); s->formatter->dump_string("StorageClass", "STANDARD"); - dump_time(s, "Initiated", &iter->obj.meta.mtime); + dump_time(s, "Initiated", &upload->get_mtime()); s->formatter->close_section(); } if (!common_prefixes.empty()) { diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 7aa34080240a1..79bced546b9c9 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -37,6 +37,8 @@ using RGWBucketSyncPolicyHandlerRef = std::shared_ptr RGWSyncModuleInstanceRef; +class RGWCompressionInfo; + namespace rgw { class Aio; namespace IAM { struct Policy; } @@ -99,6 +101,7 @@ class User; class Bucket; class Object; class BucketList; +class MultipartUpload; struct MPSerializer; class Lifecycle; class Notification; @@ -228,6 +231,7 @@ class Store { virtual int get_oidc_providers(const DoutPrefixProvider *dpp, const std::string& tenant, vector>& providers) = 0; + virtual std::unique_ptr get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional upload_id=std::nullopt, ceph::real_time mtime=real_clock::now()) = 0; virtual void finalize(void) = 0; @@ -438,6 +442,18 @@ class Bucket { static bool empty(Bucket* b) { return (!b || b->empty()); } virtual std::unique_ptr clone() = 0; + virtual int list_multiparts(const DoutPrefixProvider *dpp, + const string& prefix, + string& marker, + const string& delim, + const int& max_uploads, + vector>& uploads, + map *common_prefixes, + bool *is_truncated) = 0; + virtual int abort_multiparts(const DoutPrefixProvider *dpp, + CephContext *cct, + string& prefix, string& delim) = 0; + /* dang - This is temporary, until the API is completed */ rgw_bucket& get_key() { return info.bucket; } RGWBucketInfo& get_info() { return info; } @@ -784,6 +800,74 @@ class Object { } }; +class MultipartPart { +protected: + std::string oid; + +public: + MultipartPart() = default; + virtual ~MultipartPart() = default; + + virtual uint32_t get_num() = 0; + virtual uint64_t get_size() = 0; + virtual const std::string& get_etag() = 0; + virtual ceph::real_time& get_mtime() = 0; +}; + +class MultipartUpload { +protected: + Bucket* bucket; + std::map> parts; + +public: + MultipartUpload(Bucket* _bucket) : bucket(_bucket) {} + virtual ~MultipartUpload() = default; + + virtual const std::string& get_meta() const = 0; + virtual const std::string& get_key() const = 0; + virtual const std::string& get_upload_id() const = 0; + virtual ceph::real_time& get_mtime() = 0; + + std::map>& get_parts() { return parts; } + + virtual std::unique_ptr get_meta_obj() = 0; + + virtual int init(const DoutPrefixProvider* dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) = 0; + virtual int list_parts(const DoutPrefixProvider* dpp, CephContext* cct, + int num_parts, int marker, + int* next_marker, bool* truncated, + bool assume_unsorted = false) = 0; + virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct, + RGWObjectCtx* obj_ctx) = 0; + virtual int complete(const DoutPrefixProvider* dpp, CephContext* cct, + std::string& etag, RGWObjManifest& manifest, + map& part_etags, + list& remove_objs, + uint64_t& accounted_size, bool& compressed, + RGWCompressionInfo& cs_info, off_t& ofs) = 0; + + virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) = 0; + + friend inline ostream& operator<<(ostream& out, const MultipartUpload& u) { + out << u.get_meta(); + if (!u.get_upload_id().empty()) + out << ":" << u.get_upload_id(); + return out; + } + friend inline ostream& operator<<(ostream& out, const MultipartUpload* u) { + if (!u) + out << ""; + else + out << *u; + return out; + } + friend inline ostream& operator<<(ostream& out, const + std::unique_ptr& p) { + out << p.get(); + return out; + } +}; + struct Serializer { Serializer() = default; virtual ~Serializer() = default; diff --git a/src/rgw/rgw_sal_rados.cc b/src/rgw/rgw_sal_rados.cc index be0c29eec1023..94b0d04e739a3 100644 --- a/src/rgw/rgw_sal_rados.cc +++ b/src/rgw/rgw_sal_rados.cc @@ -44,8 +44,28 @@ #define dout_subsys ceph_subsys_rgw +static string mp_ns = RGW_OBJ_NS_MULTIPART; + namespace rgw::sal { +struct multipart_upload_info +{ + rgw_placement_rule dest_placement; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(dest_placement, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(dest_placement, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(multipart_upload_info) + // default number of entries to list with each bucket listing call // (use marker to bridge between calls) static constexpr size_t listing_max_entries = 1000; @@ -266,7 +286,7 @@ int RadosBucket::remove_bucket(const DoutPrefixProvider* dpp, /* If there's a prefix, then we are aborting multiparts as well */ if (!prefix.empty()) { - ret = abort_bucket_multiparts(dpp, store, store->ctx(), this, prefix, delimiter); + ret = abort_multiparts(dpp, store->ctx(), prefix, delimiter); if (ret < 0) { return ret; } @@ -341,7 +361,7 @@ int RadosBucket::remove_bucket_bypass_gc(int concurrent_max, bool string prefix, delimiter; - ret = abort_bucket_multiparts(dpp, store, cct, this, prefix, delimiter); + ret = abort_multiparts(dpp, cct, prefix, delimiter); if (ret < 0) { return ret; } @@ -744,6 +764,102 @@ int RadosBucket::list(const DoutPrefixProvider* dpp, ListParams& params, int max return ret; } +int RadosBucket::list_multiparts(const DoutPrefixProvider *dpp, + const string& prefix, + string& marker, + const string& delim, + const int& max_uploads, + vector>& uploads, + map *common_prefixes, + bool *is_truncated) +{ + rgw::sal::Bucket::ListParams params; + rgw::sal::Bucket::ListResults results; + MultipartMetaFilter mp_filter; + + params.prefix = prefix; + params.delim = delim; + params.marker = marker; + params.ns = RGW_OBJ_NS_MULTIPART; + params.filter = &mp_filter; + + int ret = list(dpp, params, max_uploads, results, null_yield); + + if (ret < 0) + return ret; + + if (!results.objs.empty()) { + for (const rgw_bucket_dir_entry& dentry : results.objs) { + rgw_obj_key key(dentry.key); + uploads.push_back(store->get_multipart_upload(this, key.name)); + } + } + if (common_prefixes) { + *common_prefixes = std::move(results.common_prefixes); + } + *is_truncated = results.is_truncated; + marker = params.marker.name; + + return 0; +} + +int RadosBucket::abort_multiparts(const DoutPrefixProvider *dpp, + CephContext *cct, + string& prefix, string& delim) +{ + constexpr int max = 1000; + int ret, num_deleted = 0; + vector> uploads; + RGWObjectCtx obj_ctx(store); + string marker; + bool is_truncated; + + do { + ret = list_multiparts(dpp, prefix, marker, delim, + max, uploads, nullptr, &is_truncated); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << + " ERROR : calling list_bucket_multiparts; ret=" << ret << + "; bucket=\"" << this << "\"; prefix=\"" << + prefix << "\"; delim=\"" << delim << "\"" << dendl; + return ret; + } + ldpp_dout(dpp, 20) << __func__ << + " INFO: aborting and cleaning up multipart upload(s); bucket=\"" << + this << "\"; uploads.size()=" << uploads.size() << + "; is_truncated=" << is_truncated << dendl; + + if (!uploads.empty()) { + for (const auto& upload : uploads) { + ret = upload->abort(dpp, cct, &obj_ctx); + if (ret < 0) { + // we're doing a best-effort; if something cannot be found, + // log it and keep moving forward + if (ret != -ENOENT && ret != -ERR_NO_SUCH_UPLOAD) { + ldpp_dout(dpp, 0) << __func__ << + " ERROR : failed to abort and clean-up multipart upload \"" << + upload->get_meta() << "\"" << dendl; + return ret; + } else { + ldpp_dout(dpp, 10) << __func__ << + " NOTE : unable to find part(s) of " + "aborted multipart upload of \"" << upload->get_meta() << + "\" for cleaning up" << dendl; + } + } + num_deleted++; + } + if (num_deleted) { + ldpp_dout(dpp, 0) << __func__ << + " WARNING : aborted " << num_deleted << + " incomplete multipart uploads" << dendl; + } + } + } while (is_truncated); + + return 0; +} + std::unique_ptr RadosStore::get_user(const rgw_user &u) { return std::unique_ptr(new RadosUser(this, u)); @@ -1350,6 +1466,11 @@ int RadosStore::get_oidc_providers(const DoutPrefixProvider *dpp, return 0; } +std::unique_ptr RadosStore::get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional upload_id, ceph::real_time mtime) +{ + return std::unique_ptr(new RadosMultipartUpload(this, bucket, oid, upload_id, mtime)); +} + int RadosStore::get_obj_head_ioctx(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::IoCtx* ioctx) { return rados->get_obj_head_ioctx(dpp, bucket_info, obj, ioctx); @@ -1922,6 +2043,441 @@ int RadosObject::swift_versioning_copy(RGWObjectCtx* obj_ctx, y); } +int RadosMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct, + RGWObjectCtx *obj_ctx) +{ + std::unique_ptr meta_obj = get_meta_obj(); + meta_obj->set_in_extra_data(true); + meta_obj->set_hash_source(mp_obj.get_key()); + std::unique_ptr chain = store->get_gc_chain(meta_obj.get()); + list remove_objs; + bool truncated; + int marker = 0; + int ret; + uint64_t parts_accounted_size = 0; + + do { + ret = list_parts(dpp, cct, 1000, marker, &marker, &truncated); + if (ret < 0) { + ldpp_dout(dpp, 20) << __func__ << ": RadosMultipartUpload::list_parts returned " << + ret << dendl; + return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret; + } + + for (auto part_it = parts.begin(); + part_it != parts.end(); + ++part_it) { + RadosMultipartPart* obj_part = dynamic_cast(part_it->second.get()); + if (obj_part->info.manifest.empty()) { + std::unique_ptr obj = bucket->get_object( + rgw_obj_key(obj_part->oid, std::string(), RGW_OBJ_NS_MULTIPART)); + obj->set_hash_source(mp_obj.get_key()); + ret = obj->delete_object(dpp, obj_ctx, null_yield); + if (ret < 0 && ret != -ENOENT) + return ret; + } else { + chain->update(dpp, &obj_part->info.manifest); + RGWObjManifest::obj_iterator oiter = obj_part->info.manifest.obj_begin(dpp); + if (oiter != obj_part->info.manifest.obj_end(dpp)) { + std::unique_ptr head = bucket->get_object(rgw_obj_key()); + rgw_raw_obj raw_head = oiter.get_location().get_raw_obj(store); + head->raw_obj_to_obj(raw_head); + + rgw_obj_index_key key; + head->get_key().get_index_key(&key); + remove_objs.push_back(key); + } + } + parts_accounted_size += obj_part->info.accounted_size; + } + } while (truncated); + + /* use upload id as tag and do it synchronously */ + ret = chain->send(mp_obj.get_upload_id()); + if (ret < 0) { + ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl; + if (ret == -ENOENT) { + return -ERR_NO_SUCH_UPLOAD; + } + //Delete objects inline if send chain to gc fails + chain->delete_inline(dpp, mp_obj.get_upload_id()); + } + + std::unique_ptr del_op = meta_obj->get_delete_op(obj_ctx); + del_op->params.bucket_owner = bucket->get_acl_owner(); + del_op->params.versioning_status = 0; + if (!remove_objs.empty()) { + del_op->params.remove_objs = &remove_objs; + } + + del_op->params.abortmp = true; + del_op->params.parts_accounted_size = parts_accounted_size; + + // and also remove the metadata obj + ret = del_op->delete_obj(dpp, null_yield); + if (ret < 0) { + ldpp_dout(dpp, 20) << __func__ << ": del_op.delete_obj returned " << + ret << dendl; + } + return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret; +} + +std::unique_ptr RadosMultipartUpload::get_meta_obj() +{ + return bucket->get_object(rgw_obj_key(get_meta(), string(), mp_ns)); +} + +int RadosMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) +{ + int ret; + std::string oid = mp_obj.get_key(); + + do { + char buf[33]; + string tmp_obj_name; + std::unique_ptr obj; + gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1); + std::string upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */ + upload_id.append(buf); + + mp_obj.init(oid, upload_id); + tmp_obj_name = mp_obj.get_meta(); + + obj = bucket->get_object(rgw_obj_key(tmp_obj_name, string(), mp_ns)); + // the meta object will be indexed with 0 size, we c + obj->set_in_extra_data(true); + obj->set_hash_source(oid); + + std::unique_ptr obj_op = obj->get_write_op(obj_ctx); + + obj_op->params.versioning_disabled = true; /* no versioning for multipart meta */ + obj_op->params.owner = owner; + obj_op->params.category = RGWObjCategory::MultiMeta; + obj_op->params.flags = PUT_OBJ_CREATE_EXCL; + obj_op->params.mtime = &mtime; + obj_op->params.attrs = &attrs; + + multipart_upload_info upload_info; + upload_info.dest_placement = dest_placement; + + bufferlist bl; + encode(upload_info, bl); + obj_op->params.data = &bl; + + ret = obj_op->prepare(y); + + ret = obj_op->write_meta(dpp, bl.length(), 0, y); + } while (ret == -EEXIST); + + return ret; +} + +int RadosMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct, + int num_parts, int marker, + int *next_marker, bool *truncated, + bool assume_unsorted) +{ + map parts_map; + map::iterator iter; + + std::unique_ptr obj = bucket->get_object( + rgw_obj_key(get_meta(), std::string(), RGW_OBJ_NS_MULTIPART)); + obj->set_in_extra_data(true); + + bool sorted_omap = is_v2_upload_id(get_upload_id()) && !assume_unsorted; + + parts.clear(); + + int ret; + if (sorted_omap) { + string p; + p = "part."; + char buf[32]; + + snprintf(buf, sizeof(buf), "%08d", marker); + p.append(buf); + + ret = obj->omap_get_vals(dpp, p, num_parts + 1, &parts_map, + nullptr, null_yield); + } else { + ret = obj->omap_get_all(dpp, &parts_map, null_yield); + } + if (ret < 0) { + return ret; + } + + int i; + int last_num = 0; + + uint32_t expected_next = marker + 1; + + for (i = 0, iter = parts_map.begin(); + (i < num_parts || !sorted_omap) && iter != parts_map.end(); + ++iter, ++i) { + bufferlist& bl = iter->second; + auto bli = bl.cbegin(); + std::unique_ptr part = std::make_unique(); + try { + decode(part->info, bli); + } catch (buffer::error& err) { + ldpp_dout(dpp, 0) << "ERROR: could not part info, caught buffer::error" << + dendl; + return -EIO; + } + if (sorted_omap) { + if (part->info.num != expected_next) { + /* ouch, we expected a specific part num here, but we got a + * different one. Either a part is missing, or it could be a + * case of mixed rgw versions working on the same upload, + * where one gateway doesn't support correctly sorted omap + * keys for multipart upload just assume data is unsorted. + */ + return list_parts(dpp, cct, num_parts, marker, next_marker, truncated, true); + } + expected_next++; + } + if (sorted_omap || + (int)part->info.num > marker) { + last_num = part->info.num; + parts[part->info.num] = std::move(part); + } + } + + if (sorted_omap) { + if (truncated) { + *truncated = (iter != parts_map.end()); + } + } else { + /* rebuild a map with only num_parts entries */ + std::map> new_parts; + std::map>::iterator piter; + for (i = 0, piter = parts.begin(); + i < num_parts && piter != parts.end(); + ++i, ++piter) { + last_num = piter->first; + new_parts[piter->first] = std::move(piter->second); + } + + if (truncated) { + *truncated = (piter != parts.end()); + } + + parts.swap(new_parts); + } + + if (next_marker) { + *next_marker = last_num; + } + + return 0; +} + +int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp, CephContext* cct, + std::string& etag, RGWObjManifest& manifest, + map& part_etags, + list& remove_objs, + uint64_t& accounted_size, bool& compressed, + RGWCompressionInfo& cs_info, off_t& ofs) +{ + char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE]; + char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16]; + MD5 hash; + bool truncated; + int ret; + + int total_parts = 0; + int handled_parts = 0; + int max_parts = 1000; + int marker = 0; + uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size; + auto etags_iter = part_etags.begin(); + + do { + ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated); + if (ret == -ENOENT) { + ret = -ERR_NO_SUCH_UPLOAD; + } + if (ret < 0) + return ret; + + total_parts += parts.size(); + if (!truncated && total_parts != (int)part_etags.size()) { + ldpp_dout(dpp, 0) << "NOTICE: total parts mismatch: have: " << total_parts + << " expected: " << part_etags.size() << dendl; + ret = -ERR_INVALID_PART; + return ret; + } + + for (auto obj_iter = parts.begin(); etags_iter != part_etags.end() && obj_iter != parts.end(); ++etags_iter, ++obj_iter, ++handled_parts) { + RadosMultipartPart* part = dynamic_cast(obj_iter->second.get()); + uint64_t part_size = part->get_size(); + if (handled_parts < (int)part_etags.size() - 1 && + part_size < min_part_size) { + ret = -ERR_TOO_SMALL; + return ret; + } + + char petag[CEPH_CRYPTO_MD5_DIGESTSIZE]; + if (etags_iter->first != (int)obj_iter->first) { + ldpp_dout(dpp, 0) << "NOTICE: parts num mismatch: next requested: " + << etags_iter->first << " next uploaded: " + << obj_iter->first << dendl; + ret = -ERR_INVALID_PART; + return ret; + } + string part_etag = rgw_string_unquote(etags_iter->second); + if (part_etag.compare(part->get_etag()) != 0) { + ldpp_dout(dpp, 0) << "NOTICE: etag mismatch: part: " << etags_iter->first + << " etag: " << etags_iter->second << dendl; + ret = -ERR_INVALID_PART; + return ret; + } + + hex_to_buf(part->get_etag().c_str(), petag, + CEPH_CRYPTO_MD5_DIGESTSIZE); + hash.Update((const unsigned char *)petag, sizeof(petag)); + + RGWUploadPartInfo& obj_part = part->info; + + /* update manifest for part */ + string oid = mp_obj.get_part(part->info.num); + rgw_obj src_obj; + src_obj.init_ns(bucket->get_key(), oid, mp_ns); + + if (obj_part.manifest.empty()) { + ldpp_dout(dpp, 0) << "ERROR: empty manifest for object part: obj=" + << src_obj << dendl; + ret = -ERR_INVALID_PART; + return ret; + } else { + manifest.append(dpp, obj_part.manifest, store->get_zone()); + } + + bool part_compressed = (obj_part.cs_info.compression_type != "none"); + if ((handled_parts > 0) && + ((part_compressed != compressed) || + (cs_info.compression_type != obj_part.cs_info.compression_type))) { + ldpp_dout(dpp, 0) << "ERROR: compression type was changed during multipart upload (" + << cs_info.compression_type << ">>" << obj_part.cs_info.compression_type << ")" << dendl; + ret = -ERR_INVALID_PART; + return ret; + } + + if (part_compressed) { + int64_t new_ofs; // offset in compression data for new part + if (cs_info.blocks.size() > 0) + new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len; + else + new_ofs = 0; + for (const auto& block : obj_part.cs_info.blocks) { + compression_block cb; + cb.old_ofs = block.old_ofs + cs_info.orig_size; + cb.new_ofs = new_ofs; + cb.len = block.len; + cs_info.blocks.push_back(cb); + new_ofs = cb.new_ofs + cb.len; + } + if (!compressed) + cs_info.compression_type = obj_part.cs_info.compression_type; + cs_info.orig_size += obj_part.cs_info.orig_size; + compressed = true; + } + + rgw_obj_index_key remove_key; + src_obj.key.get_index_key(&remove_key); + + remove_objs.push_back(remove_key); + + ofs += obj_part.size; + accounted_size += obj_part.accounted_size; + } + } while (truncated); + hash.Final((unsigned char *)final_etag); + + buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str); + snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], + sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2, + "-%lld", (long long)part_etags.size()); + etag = final_etag_str; + ldpp_dout(dpp, 10) << "calculated etag: " << etag << dendl; + + return ret; +} + +int RadosMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs) +{ + if (!rule && !attrs) { + return 0; + } + + if (rule) { + if (!placement.empty()) { + *rule = &placement; + if (!attrs) { + /* Don't need attrs, done */ + return 0; + } + } else { + *rule = nullptr; + } + } + + /* We need either attributes or placement, so we need a read */ + std::unique_ptr meta_obj; + meta_obj = get_meta_obj(); + meta_obj->set_in_extra_data(true); + + multipart_upload_info upload_info; + bufferlist headbl; + + /* Read the obj head which contains the multipart_upload_info */ + std::unique_ptr read_op = meta_obj->get_read_op(obj_ctx); + meta_obj->set_prefetch_data(obj_ctx); + + int ret = read_op->prepare(y, dpp); + if (ret < 0) { + if (ret == -ENOENT) { + return -ERR_NO_SUCH_UPLOAD; + } + return ret; + } + + if (attrs) { + /* Attrs are filled in by prepare */ + *attrs = meta_obj->get_attrs(); + if (!rule || *rule != nullptr) { + /* placement was cached; don't actually read */ + return 0; + } + } + + /* Now read the placement from the head */ + ret = read_op->read(0, store->ctx()->_conf->rgw_max_chunk_size, headbl, y, dpp); + if (ret < 0) { + if (ret == -ENOENT) { + return -ERR_NO_SUCH_UPLOAD; + } + return ret; + } + + if (headbl.length() <= 0) { + return -ERR_NO_SUCH_UPLOAD; + } + + /* Decode multipart_upload_info */ + auto hiter = headbl.cbegin(); + try { + decode(upload_info, hiter); + } catch (buffer::error& err) { + ldpp_dout(dpp, 0) << "ERROR: failed to decode multipart upload info" << dendl; + return -EIO; + } + placement = upload_info.dest_placement; + *rule = &placement; + + return 0; +} + MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore* store, RadosObject* obj, const std::string& lock_name) : lock(lock_name) { diff --git a/src/rgw/rgw_sal_rados.h b/src/rgw/rgw_sal_rados.h index 5fdf440bff586..dc057ffba4031 100644 --- a/src/rgw/rgw_sal_rados.h +++ b/src/rgw/rgw_sal_rados.h @@ -20,11 +20,14 @@ #include "rgw_notify.h" #include "rgw_oidc_provider.h" #include "rgw_role.h" +#include "rgw_multi.h" +#include "services/svc_tier_rados.h" #include "cls/lock/cls_lock_client.h" namespace rgw { namespace sal { class RadosStore; +class RadosMultipartUpload; class RadosCompletions : public Completions { public: @@ -331,6 +334,17 @@ class RadosBucket : public Bucket { virtual std::unique_ptr clone() override { return std::make_unique(*this); } + virtual int list_multiparts(const DoutPrefixProvider *dpp, + const string& prefix, + string& marker, + const string& delim, + const int& max_uploads, + vector>& uploads, + map *common_prefixes, + bool *is_truncated) override; + virtual int abort_multiparts(const DoutPrefixProvider *dpp, + CephContext *cct, + string& prefix, string& delim) override; private: int link(const DoutPrefixProvider* dpp, User* new_user, optional_yield y, bool update_entrypoint = true, RGWObjVersionTracker* objv = nullptr); @@ -465,6 +479,7 @@ class RadosStore : public Store { virtual int get_oidc_providers(const DoutPrefixProvider *dpp, const std::string& tenant, vector>& providers) override; + virtual std::unique_ptr get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional upload_id=std::nullopt, ceph::real_time mtime=real_clock::now()) override; virtual void finalize(void) override; @@ -493,6 +508,56 @@ class RadosStore : public Store { void setUserCtl(RGWUserCtl *_ctl) { user_ctl = _ctl; } }; +class RadosMultipartPart : public MultipartPart { +protected: + RGWUploadPartInfo info; + +public: + RadosMultipartPart() = default; + virtual ~RadosMultipartPart() = default; + + virtual uint32_t get_num() { return info.num; } + virtual uint64_t get_size() { return info.accounted_size; } + virtual const std::string& get_etag() { return info.etag; } + virtual ceph::real_time& get_mtime() { return info.modified; } + + /* For RadosStore code */ + RGWObjManifest& get_manifest() { return info.manifest; } + + friend class RadosMultipartUpload; +}; + +class RadosMultipartUpload : public MultipartUpload { + RadosStore* store; + RGWMPObj mp_obj; + ceph::real_time mtime; + rgw_placement_rule placement; + +public: + RadosMultipartUpload(RadosStore* _store, Bucket* _bucket, const std::string& oid, std::optional upload_id, ceph::real_time _mtime) : MultipartUpload(_bucket), store(_store), mp_obj(oid, upload_id), mtime(_mtime) {} + virtual ~RadosMultipartUpload() = default; + + virtual const std::string& get_meta() const { return mp_obj.get_meta(); } + virtual const std::string& get_key() const { return mp_obj.get_key(); } + virtual const std::string& get_upload_id() const { return mp_obj.get_upload_id(); } + virtual ceph::real_time& get_mtime() { return mtime; } + virtual std::unique_ptr get_meta_obj() override; + virtual int init(const DoutPrefixProvider* dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) override; + virtual int list_parts(const DoutPrefixProvider* dpp, CephContext* cct, + int num_parts, int marker, + int* next_marker, bool* truncated, + bool assume_unsorted = false) override; + virtual int abort(const DoutPrefixProvider* dpp, CephContext* cct, + RGWObjectCtx* obj_ctx) override; + virtual int complete(const DoutPrefixProvider* dpp, CephContext* cct, + std::string& etag, RGWObjManifest& manifest, + map& part_etags, + list& remove_objs, + uint64_t& accounted_size, bool& compressed, + RGWCompressionInfo& cs_info, off_t& ofs) override; + virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) override; +}; + class MPRadosSerializer : public MPSerializer { librados::IoCtx ioctx; rados::cls::lock::Lock lock; diff --git a/src/rgw/services/svc_tier_rados.h b/src/rgw/services/svc_tier_rados.h index e46868db225ec..fd48e40e8a76d 100644 --- a/src/rgw/services/svc_tier_rados.h +++ b/src/rgw/services/svc_tier_rados.h @@ -34,6 +34,13 @@ public: RGWMPObj(const string& _oid, const string& _upload_id) { init(_oid, _upload_id, _upload_id); } + RGWMPObj(const string& _oid, std::optional _upload_id) { + if (_upload_id) { + init(_oid, *_upload_id, *_upload_id); + } else { + from_meta(_oid); + } + } void init(const string& _oid, const string& _upload_id) { init(_oid, _upload_id, _upload_id); } -- 2.39.5