From: Yehuda Sadeh Date: Fri, 17 Jan 2014 17:30:30 +0000 (-0800) Subject: rgw: fix multipart upload listing X-Git-Tag: v0.78~262^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2e7bd83f999e9cc865de71ff3063816239db3b44;p=ceph.git rgw: fix multipart upload listing Fixes: #7169 A separate fix has been created for dumpling. Previously we read the entire list of parts, disregarding the actual marker and the requested max parts. This fix refactors the way we read the list of parts (doing it in parts, using marker). Create new upload-id format that is used to identify uploads with sorted omap entries. Make sure we're backward compatible and handle correctly mixed-versions rgw uploads. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index a73cf395bb4b..e0bbd0f2b421 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -33,6 +33,8 @@ using ceph::crypto::MD5; static string mp_ns = RGW_OBJ_NS_MULTIPART; static string shadow_ns = RGW_OBJ_NS_SHADOW; +#define MULTIPART_UPLOAD_ID_PREFIX "2/" // must contain a unique char that may not come up in gen_rand_alpha() + class MultipartMetaFilter : public RGWAccessListFilter { public: MultipartMetaFilter() {} @@ -1339,6 +1341,7 @@ class RGWPutObjProcessor_Multipart : public RGWPutObjProcessor_Atomic string part_num; RGWMPObj mp; req_state *s; + string upload_id; protected: bool immutable_head() { return true; } @@ -1355,7 +1358,6 @@ int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx) RGWPutObjProcessor::prepare(store, obj_ctx); string oid = obj_str; - string upload_id; upload_id = s->info.args.get("uploadId"); mp.init(oid, upload_id); @@ -1374,6 +1376,13 @@ int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx) return 0; } +static bool is_v2_upload_id(const string& upload_id) +{ + const char *uid = upload_id.c_str(); + + return (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX, sizeof(MULTIPART_UPLOAD_ID_PREFIX) - 1) == 0); +} + int RGWPutObjProcessor_Multipart::do_complete(string& etag, time_t *mtime, time_t set_mtime, map& attrs) { complete_writing_data(); @@ -1390,7 +1399,21 @@ int RGWPutObjProcessor_Multipart::do_complete(string& etag, time_t *mtime, time_ bufferlist bl; RGWUploadPartInfo info; string p = "part."; - p.append(part_num); + bool sorted_omap = is_v2_upload_id(upload_id); + + if (sorted_omap) { + string err; + int part_num_int = strict_strtol(part_num.c_str(), 10, &err); + if (!err.empty()) { + dout(10) << "bad part number specified: " << part_num << dendl; + return -EINVAL; + } + char buf[32]; + snprintf(buf, sizeof(buf), "%08d", part_num_int); + p.append(buf); + } else { + p.append(part_num); + } info.num = atoi(part_num.c_str()); info.etag = etag; info.size = s->obj_size; @@ -2323,7 +2346,8 @@ void RGWInitMultipart::execute() do { char buf[33]; gen_rand_alphanumeric(s->cct, buf, sizeof(buf) - 1); - upload_id = buf; + upload_id = "2/"; /* v2 upload id */ + upload_id.append(buf); string tmp_obj_name; RGWMPObj mp(s->object_str, upload_id); @@ -2335,8 +2359,8 @@ void RGWInitMultipart::execute() } while (ret == -EEXIST); } -static int get_multiparts_info(RGWRados *store, struct req_state *s, string& meta_oid, map& parts, - RGWAccessControlPolicy& policy, map& attrs) +static int get_multipart_info(RGWRados *store, struct req_state *s, string& meta_oid, + RGWAccessControlPolicy *policy, map& attrs) { map parts_map; map::iterator iter; @@ -2349,27 +2373,67 @@ static int get_multiparts_info(RGWRados *store, struct req_state *s, string& met if (ret < 0) return ret; - ret = store->omap_get_all(obj, header, parts_map); - if (ret < 0) - return ret; - - for (iter = attrs.begin(); iter != attrs.end(); ++iter) { - string name = iter->first; - if (name.compare(RGW_ATTR_ACL) == 0) { - bufferlist& bl = iter->second; - bufferlist::iterator bli = bl.begin(); - try { - ::decode(policy, bli); - } catch (buffer::error& err) { - ldout(s->cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl; - return -EIO; + if (policy) { + for (iter = attrs.begin(); iter != attrs.end(); ++iter) { + string name = iter->first; + if (name.compare(RGW_ATTR_ACL) == 0) { + bufferlist& bl = iter->second; + bufferlist::iterator bli = bl.begin(); + try { + ::decode(*policy, bli); + } catch (buffer::error& err) { + ldout(s->cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl; + return -EIO; + } + break; } - break; } } + return 0; +} + +static int list_multipart_parts(RGWRados *store, struct req_state *s, + const string& upload_id, + string& meta_oid, int num_parts, + int marker, map& parts, + int *next_marker, bool *truncated, + bool assume_unsorted = false) +{ + map parts_map; + map::iterator iter; + bufferlist header; + + rgw_obj obj; + obj.init_ns(s->bucket, meta_oid, mp_ns); + + bool sorted_omap = is_v2_upload_id(upload_id) && !assume_unsorted; + + int ret; + + parts.clear(); + + if (sorted_omap) { + string p; + p = "part."; + char buf[32]; + + snprintf(buf, sizeof(buf), "%08d", marker); + p.append(buf); + + ret = store->omap_get_vals(obj, header, p, num_parts + 1, parts_map); + } else { + ret = store->omap_get_all(obj, header, parts_map); + } + if (ret < 0) + return ret; + + int i; + int last_num = 0; + + uint32_t expected_next = marker + 1; - for (iter = parts_map.begin(); iter != parts_map.end(); ++iter) { + for (i = 0, iter = parts_map.begin(); (i < num_parts || !sorted_omap) && iter != parts_map.end(); ++iter, ++i) { bufferlist& bl = iter->second; bufferlist::iterator bli = bl.begin(); RGWUploadPartInfo info; @@ -2379,8 +2443,48 @@ static int get_multiparts_info(RGWRados *store, struct req_state *s, string& met ldout(s->cct, 0) << "ERROR: could not part info, caught buffer::error" << dendl; return -EIO; } - parts[info.num] = info; + if (sorted_omap) { + if (info.num != expected_next) { + /* ouch, we expected a specific part num here, but we got a different one. Either + * a part is missing, or it could be a case of mixed rgw versions working on the same + * upload, where one gateway doesn't support correctly sorted omap keys for multipart + * upload just assume data is unsorted. + */ + return list_multipart_parts(store, s, upload_id, meta_oid, num_parts, marker, parts, next_marker, truncated, true); + } + expected_next++; + } + if (sorted_omap || + (int)info.num > marker) { + parts[info.num] = info; + last_num = info.num; + } } + + if (sorted_omap) { + if (truncated) + *truncated = (iter != parts_map.end()); + } else { + /* rebuild a map with only num_parts entries */ + + map new_parts; + map::iterator piter; + + for (i = 0, piter = parts.begin(); i < num_parts && piter != parts.end(); ++i, ++piter) { + new_parts[piter->first] = piter->second; + last_num = piter->first; + } + + if (truncated) + *truncated = (piter != parts.end()); + + parts.swap(new_parts); + } + + if (next_marker) { + *next_marker = last_num; + } + return 0; } @@ -2405,7 +2509,6 @@ void RGWCompleteMultipart::execute() string meta_oid; map obj_parts; map::iterator obj_iter; - RGWAccessControlPolicy policy(s->cct); map attrs; off_t ofs = 0; MD5 hash; @@ -2442,44 +2545,71 @@ void RGWCompleteMultipart::execute() return; } - // ensure that each part if of the minimum size - for (obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) { - if ((obj_iter->second).size < min_part_size) { - ret = -ERR_TOO_SMALL; - return; - } - } - mp.init(s->object_str, upload_id); meta_oid = mp.get_meta(); - ret = get_multiparts_info(store, s, meta_oid, obj_parts, policy, attrs); - if (ret == -ENOENT) - ret = -ERR_NO_SUCH_UPLOAD; - if (parts->parts.size() != obj_parts.size()) - ret = -ERR_INVALID_PART; - if (ret < 0) - return; + int total_parts = 0; + int max_parts = 1000; + int marker = 0; + bool truncated; - for (iter = parts->parts.begin(), obj_iter = obj_parts.begin(); - iter != parts->parts.end() && obj_iter != obj_parts.end(); - ++iter, ++obj_iter) { - char etag[CEPH_CRYPTO_MD5_DIGESTSIZE]; - if (iter->first != (int)obj_iter->first) { - ldout(s->cct, 0) << "NOTICE: parts num mismatch: next requested: " << iter->first << " next uploaded: " << obj_iter->first << dendl; - ret = -ERR_INVALID_PART; - return; + list remove_objs; /* objects to be removed from index listing */ + + iter = parts->parts.begin(); + + do { + ret = list_multipart_parts(store, s, upload_id, meta_oid, max_parts, marker, obj_parts, &marker, &truncated); + if (ret == -ENOENT) { + ret = -ERR_NO_SUCH_UPLOAD; } - string part_etag = rgw_string_unquote(iter->second); - if (part_etag.compare(obj_iter->second.etag) != 0) { - ldout(s->cct, 0) << "NOTICE: etag mismatch: part: " << iter->first << " etag: " << iter->second << dendl; + if (ret < 0) + return; + + total_parts += obj_parts.size(); + if (!truncated && total_parts != (int)parts->parts.size()) { ret = -ERR_INVALID_PART; return; } - hex_to_buf(obj_iter->second.etag.c_str(), etag, CEPH_CRYPTO_MD5_DIGESTSIZE); - hash.Update((const byte *)etag, sizeof(etag)); - } + for (obj_iter = obj_parts.begin(); iter != parts->parts.end() && obj_iter != obj_parts.end(); ++iter, ++obj_iter) { + char etag[CEPH_CRYPTO_MD5_DIGESTSIZE]; + if (iter->first != (int)obj_iter->first) { + ldout(s->cct, 0) << "NOTICE: parts num mismatch: next requested: " << iter->first << " next uploaded: " << obj_iter->first << dendl; + ret = -ERR_INVALID_PART; + return; + } + string part_etag = rgw_string_unquote(iter->second); + if (part_etag.compare(obj_iter->second.etag) != 0) { + ldout(s->cct, 0) << "NOTICE: etag mismatch: part: " << iter->first << " etag: " << iter->second << dendl; + ret = -ERR_INVALID_PART; + return; + } + + hex_to_buf(obj_iter->second.etag.c_str(), etag, CEPH_CRYPTO_MD5_DIGESTSIZE); + hash.Update((const byte *)etag, sizeof(etag)); + + RGWUploadPartInfo& obj_part = obj_iter->second; + + /* update manifest for part */ + string oid = mp.get_part(obj_iter->second.num); + rgw_obj src_obj; + src_obj.init_ns(s->bucket, oid, mp_ns); + + if (obj_part.manifest.empty()) { + RGWObjManifestPart& part = manifest.objs[ofs]; + + part.loc = src_obj; + part.loc_ofs = 0; + part.size = obj_iter->second.size; + } else { + manifest.append(obj_part.manifest); + } + + remove_objs.push_back(src_obj.object); + + ofs += obj_part.size; + } + } while (truncated); hash.Final((byte *)final_etag); buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str); @@ -2493,29 +2623,6 @@ void RGWCompleteMultipart::execute() target_obj.init(s->bucket, s->object_str); - list remove_objs; /* objects to be removed from index listing */ - - for (obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) { - RGWUploadPartInfo& obj_part = obj_iter->second; - string oid = mp.get_part(obj_iter->second.num); - rgw_obj src_obj; - src_obj.init_ns(s->bucket, oid, mp_ns); - - if (obj_part.manifest.empty()) { - RGWObjManifestPart& part = manifest.objs[ofs]; - - part.loc = src_obj; - part.loc_ofs = 0; - part.size = obj_iter->second.size; - } else { - manifest.append(obj_part.manifest); - } - - remove_objs.push_back(src_obj.object); - - ofs += obj_part.size; - } - manifest.obj_size = ofs; store->set_atomic(s->obj_ctx, target_obj); @@ -2560,7 +2667,6 @@ void RGWAbortMultipart::execute() upload_id = s->info.args.get("uploadId"); map obj_parts; map::iterator obj_iter; - RGWAccessControlPolicy policy(s->cct); map attrs; rgw_obj meta_obj; RGWMPObj mp; @@ -2572,31 +2678,42 @@ void RGWAbortMultipart::execute() mp.init(s->object_str, upload_id); meta_oid = mp.get_meta(); - ret = get_multiparts_info(store, s, meta_oid, obj_parts, policy, attrs); + ret = get_multipart_info(store, s, meta_oid, NULL, attrs); if (ret < 0) return; - for (obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) { - RGWUploadPartInfo& obj_part = obj_iter->second; + bool truncated; + int marker = 0; + int max_parts = 1000; - if (obj_part.manifest.empty()) { - string oid = mp.get_part(obj_iter->second.num); - rgw_obj obj; - obj.init_ns(s->bucket, oid, mp_ns); - ret = store->delete_obj(s->obj_ctx, owner, obj); - if (ret < 0 && ret != -ENOENT) - return; - } else { - RGWObjManifest& manifest = obj_part.manifest; - map::iterator oiter; - for (oiter = manifest.objs.begin(); oiter != manifest.objs.end(); ++oiter) { - RGWObjManifestPart& part = oiter->second; - ret = store->delete_obj(s->obj_ctx, owner, part.loc); + do { + ret = list_multipart_parts(store, s, upload_id, meta_oid, max_parts, marker, obj_parts, &marker, &truncated); + if (ret < 0) + return; + + for (obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) { + RGWUploadPartInfo& obj_part = obj_iter->second; + + if (obj_part.manifest.empty()) { + string oid = mp.get_part(obj_iter->second.num); + rgw_obj obj; + obj.init_ns(s->bucket, oid, mp_ns); + ret = store->delete_obj(s->obj_ctx, owner, obj); if (ret < 0 && ret != -ENOENT) return; + } else { + RGWObjManifest& manifest = obj_part.manifest; + map::iterator oiter; + for (oiter = manifest.objs.begin(); oiter != manifest.objs.end(); ++oiter) { + RGWObjManifestPart& part = oiter->second; + ret = store->delete_obj(s->obj_ctx, owner, part.loc); + if (ret < 0 && ret != -ENOENT) + return; + } } } - } + } while (truncated); + // and also remove the metadata obj meta_obj.init_ns(s->bucket, meta_oid, mp_ns); ret = store->delete_obj(s->obj_ctx, owner, meta_obj); @@ -2631,7 +2748,11 @@ void RGWListMultipart::execute() mp.init(s->object_str, upload_id); meta_oid = mp.get_meta(); - ret = get_multiparts_info(store, s, meta_oid, parts, policy, xattrs); + ret = get_multipart_info(store, s, meta_oid, &policy, xattrs); + if (ret < 0) + return; + + ret = list_multipart_parts(store, s, upload_id, meta_oid, max_parts, marker, parts, NULL, &truncated); } int RGWListBucketMultiparts::verify_permission() diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 39a6acfe7911..3724def2cf03 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -679,7 +679,8 @@ public: ret = 0; data = NULL; len = 0; - min_part_size = RGW_MIN_MULTIPART_SIZE; + // min_part_size = RGW_MIN_MULTIPART_SIZE; + min_part_size = 0; } virtual ~RGWCompleteMultipart() { free(data); @@ -719,12 +720,14 @@ protected: int max_parts; int marker; RGWAccessControlPolicy policy; + bool truncated; public: RGWListMultipart() { ret = 0; max_parts = 1000; marker = 0; + truncated = false; } virtual void init(RGWRados *store, struct req_state *s, RGWHandler *h) { diff --git a/src/rgw/rgw_rest.cc b/src/rgw/rgw_rest.cc index aca7873f88ae..2201d4f92d71 100644 --- a/src/rgw/rgw_rest.cc +++ b/src/rgw/rgw_rest.cc @@ -890,11 +890,19 @@ int RGWListMultipart_ObjStore::get_params() if (upload_id.empty()) { ret = -ENOTSUP; } - string str = s->info.args.get("part-number-marker"); - if (!str.empty()) - marker = atoi(str.c_str()); + string marker_str = s->info.args.get("part-number-marker"); + + if (!marker_str.empty()) { + string err; + marker = strict_strtol(marker_str.c_str(), 10, &err); + if (!err.empty()) { + ldout(s->cct, 20) << "bad marker: " << marker << dendl; + ret = -EINVAL; + return ret; + } + } - str = s->info.args.get("max-parts"); + string str = s->info.args.get("max-parts"); if (!str.empty()) max_parts = atoi(str.c_str()); diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 68dba64375e3..9dfede449c92 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -1524,11 +1524,13 @@ void RGWListMultipart_ObjStore_S3::send_response() dump_start(s); s->formatter->open_object_section_in_ns("ListMultipartUploadResult", "http://s3.amazonaws.com/doc/2006-03-01/"); - map::iterator iter, test_iter; - int i, cur_max = 0; + map::iterator iter; + map::reverse_iterator test_iter; + int cur_max = 0; - iter = parts.upper_bound(marker); - for (i = 0, test_iter = iter; test_iter != parts.end() && i < max_parts; ++test_iter, ++i) { + iter = parts.begin(); + test_iter = parts.rbegin(); + if (test_iter != parts.rend()) { cur_max = test_iter->first; } s->formatter->dump_string("Bucket", s->bucket_name_str); @@ -1536,9 +1538,9 @@ void RGWListMultipart_ObjStore_S3::send_response() s->formatter->dump_string("UploadId", upload_id); s->formatter->dump_string("StorageClass", "STANDARD"); s->formatter->dump_int("PartNumberMarker", marker); - s->formatter->dump_int("NextPartNumberMarker", cur_max + 1); + s->formatter->dump_int("NextPartNumberMarker", cur_max); s->formatter->dump_int("MaxParts", max_parts); - s->formatter->dump_string("IsTruncated", (test_iter == parts.end() ? "false" : "true")); + s->formatter->dump_string("IsTruncated", (truncated ? "true" : "false")); ACLOwner& owner = policy.get_owner(); dump_owner(s, owner.get_id(), owner.get_display_name());