static string mp_ns = RGW_OBJ_NS_MULTIPART;
static string shadow_ns = RGW_OBJ_NS_SHADOW;
+#define MULTIPART_UPLOAD_ID_PREFIX "2/" // must contain a unique char that may not come up in gen_rand_alpha()
+
class MultipartMetaFilter : public RGWAccessListFilter {
public:
MultipartMetaFilter() {}
string part_num;
RGWMPObj mp;
req_state *s;
+ string upload_id;
protected:
bool immutable_head() { return true; }
RGWPutObjProcessor::prepare(store, obj_ctx);
string oid = obj_str;
- string upload_id;
upload_id = s->info.args.get("uploadId");
mp.init(oid, upload_id);
return 0;
}
+static bool is_v2_upload_id(const string& upload_id)
+{
+ const char *uid = upload_id.c_str();
+
+ return (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX, sizeof(MULTIPART_UPLOAD_ID_PREFIX) - 1) == 0);
+}
+
int RGWPutObjProcessor_Multipart::do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs)
{
complete_writing_data();
bufferlist bl;
RGWUploadPartInfo info;
string p = "part.";
- p.append(part_num);
+ bool sorted_omap = is_v2_upload_id(upload_id);
+
+ if (sorted_omap) {
+ string err;
+ int part_num_int = strict_strtol(part_num.c_str(), 10, &err);
+ if (!err.empty()) {
+ dout(10) << "bad part number specified: " << part_num << dendl;
+ return -EINVAL;
+ }
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%08d", part_num_int);
+ p.append(buf);
+ } else {
+ p.append(part_num);
+ }
info.num = atoi(part_num.c_str());
info.etag = etag;
info.size = s->obj_size;
do {
char buf[33];
gen_rand_alphanumeric(s->cct, buf, sizeof(buf) - 1);
- upload_id = buf;
+ upload_id = "2/"; /* v2 upload id */
+ upload_id.append(buf);
string tmp_obj_name;
RGWMPObj mp(s->object_str, upload_id);
} while (ret == -EEXIST);
}
-static int get_multiparts_info(RGWRados *store, struct req_state *s, string& meta_oid, map<uint32_t, RGWUploadPartInfo>& parts,
- RGWAccessControlPolicy& policy, map<string, bufferlist>& attrs)
+static int get_multipart_info(RGWRados *store, struct req_state *s, string& meta_oid,
+ RGWAccessControlPolicy *policy, map<string, bufferlist>& attrs)
{
map<string, bufferlist> parts_map;
map<string, bufferlist>::iterator iter;
if (ret < 0)
return ret;
- ret = store->omap_get_all(obj, header, parts_map);
- if (ret < 0)
- return ret;
-
- for (iter = attrs.begin(); iter != attrs.end(); ++iter) {
- string name = iter->first;
- if (name.compare(RGW_ATTR_ACL) == 0) {
- bufferlist& bl = iter->second;
- bufferlist::iterator bli = bl.begin();
- try {
- ::decode(policy, bli);
- } catch (buffer::error& err) {
- ldout(s->cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
- return -EIO;
+ if (policy) {
+ for (iter = attrs.begin(); iter != attrs.end(); ++iter) {
+ string name = iter->first;
+ if (name.compare(RGW_ATTR_ACL) == 0) {
+ bufferlist& bl = iter->second;
+ bufferlist::iterator bli = bl.begin();
+ try {
+ ::decode(*policy, bli);
+ } catch (buffer::error& err) {
+ ldout(s->cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
+ return -EIO;
+ }
+ break;
}
- break;
}
}
+ return 0;
+}
+
+static int list_multipart_parts(RGWRados *store, struct req_state *s,
+ const string& upload_id,
+ string& meta_oid, int num_parts,
+ int marker, map<uint32_t, RGWUploadPartInfo>& parts,
+ int *next_marker, bool *truncated,
+ bool assume_unsorted = false)
+{
+ map<string, bufferlist> parts_map;
+ map<string, bufferlist>::iterator iter;
+ bufferlist header;
+
+ rgw_obj obj;
+ obj.init_ns(s->bucket, meta_oid, mp_ns);
+
+ bool sorted_omap = is_v2_upload_id(upload_id) && !assume_unsorted;
+
+ int ret;
+
+ parts.clear();
+
+ if (sorted_omap) {
+ string p;
+ p = "part.";
+ char buf[32];
+
+ snprintf(buf, sizeof(buf), "%08d", marker);
+ p.append(buf);
+
+ ret = store->omap_get_vals(obj, header, p, num_parts + 1, parts_map);
+ } else {
+ ret = store->omap_get_all(obj, header, parts_map);
+ }
+ if (ret < 0)
+ return ret;
+
+ int i;
+ int last_num = 0;
+
+ uint32_t expected_next = marker + 1;
- for (iter = parts_map.begin(); iter != parts_map.end(); ++iter) {
+ for (i = 0, iter = parts_map.begin(); (i < num_parts || !sorted_omap) && iter != parts_map.end(); ++iter, ++i) {
bufferlist& bl = iter->second;
bufferlist::iterator bli = bl.begin();
RGWUploadPartInfo info;
ldout(s->cct, 0) << "ERROR: could not part info, caught buffer::error" << dendl;
return -EIO;
}
- parts[info.num] = info;
+ if (sorted_omap) {
+ if (info.num != expected_next) {
+ /* ouch, we expected a specific part num here, but we got a different one. Either
+ * a part is missing, or it could be a case of mixed rgw versions working on the same
+ * upload, where one gateway doesn't support correctly sorted omap keys for multipart
+ * upload just assume data is unsorted.
+ */
+ return list_multipart_parts(store, s, upload_id, meta_oid, num_parts, marker, parts, next_marker, truncated, true);
+ }
+ expected_next++;
+ }
+ if (sorted_omap ||
+ (int)info.num > marker) {
+ parts[info.num] = info;
+ last_num = info.num;
+ }
}
+
+ if (sorted_omap) {
+ if (truncated)
+ *truncated = (iter != parts_map.end());
+ } else {
+ /* rebuild a map with only num_parts entries */
+
+ map<uint32_t, RGWUploadPartInfo> new_parts;
+ map<uint32_t, RGWUploadPartInfo>::iterator piter;
+
+ for (i = 0, piter = parts.begin(); i < num_parts && piter != parts.end(); ++i, ++piter) {
+ new_parts[piter->first] = piter->second;
+ last_num = piter->first;
+ }
+
+ if (truncated)
+ *truncated = (piter != parts.end());
+
+ parts.swap(new_parts);
+ }
+
+ if (next_marker) {
+ *next_marker = last_num;
+ }
+
return 0;
}
string meta_oid;
map<uint32_t, RGWUploadPartInfo> obj_parts;
map<uint32_t, RGWUploadPartInfo>::iterator obj_iter;
- RGWAccessControlPolicy policy(s->cct);
map<string, bufferlist> attrs;
off_t ofs = 0;
MD5 hash;
return;
}
- // ensure that each part if of the minimum size
- for (obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) {
- if ((obj_iter->second).size < min_part_size) {
- ret = -ERR_TOO_SMALL;
- return;
- }
- }
-
mp.init(s->object_str, upload_id);
meta_oid = mp.get_meta();
- ret = get_multiparts_info(store, s, meta_oid, obj_parts, policy, attrs);
- if (ret == -ENOENT)
- ret = -ERR_NO_SUCH_UPLOAD;
- if (parts->parts.size() != obj_parts.size())
- ret = -ERR_INVALID_PART;
- if (ret < 0)
- return;
+ int total_parts = 0;
+ int max_parts = 1000;
+ int marker = 0;
+ bool truncated;
- for (iter = parts->parts.begin(), obj_iter = obj_parts.begin();
- iter != parts->parts.end() && obj_iter != obj_parts.end();
- ++iter, ++obj_iter) {
- char etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
- if (iter->first != (int)obj_iter->first) {
- ldout(s->cct, 0) << "NOTICE: parts num mismatch: next requested: " << iter->first << " next uploaded: " << obj_iter->first << dendl;
- ret = -ERR_INVALID_PART;
- return;
+ list<string> remove_objs; /* objects to be removed from index listing */
+
+ iter = parts->parts.begin();
+
+ do {
+ ret = list_multipart_parts(store, s, upload_id, meta_oid, max_parts, marker, obj_parts, &marker, &truncated);
+ if (ret == -ENOENT) {
+ ret = -ERR_NO_SUCH_UPLOAD;
}
- string part_etag = rgw_string_unquote(iter->second);
- if (part_etag.compare(obj_iter->second.etag) != 0) {
- ldout(s->cct, 0) << "NOTICE: etag mismatch: part: " << iter->first << " etag: " << iter->second << dendl;
+ if (ret < 0)
+ return;
+
+ total_parts += obj_parts.size();
+ if (!truncated && total_parts != (int)parts->parts.size()) {
ret = -ERR_INVALID_PART;
return;
}
- hex_to_buf(obj_iter->second.etag.c_str(), etag, CEPH_CRYPTO_MD5_DIGESTSIZE);
- hash.Update((const byte *)etag, sizeof(etag));
- }
+ for (obj_iter = obj_parts.begin(); iter != parts->parts.end() && obj_iter != obj_parts.end(); ++iter, ++obj_iter) {
+ char etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
+ if (iter->first != (int)obj_iter->first) {
+ ldout(s->cct, 0) << "NOTICE: parts num mismatch: next requested: " << iter->first << " next uploaded: " << obj_iter->first << dendl;
+ ret = -ERR_INVALID_PART;
+ return;
+ }
+ string part_etag = rgw_string_unquote(iter->second);
+ if (part_etag.compare(obj_iter->second.etag) != 0) {
+ ldout(s->cct, 0) << "NOTICE: etag mismatch: part: " << iter->first << " etag: " << iter->second << dendl;
+ ret = -ERR_INVALID_PART;
+ return;
+ }
+
+ hex_to_buf(obj_iter->second.etag.c_str(), etag, CEPH_CRYPTO_MD5_DIGESTSIZE);
+ hash.Update((const byte *)etag, sizeof(etag));
+
+ RGWUploadPartInfo& obj_part = obj_iter->second;
+
+ /* update manifest for part */
+ string oid = mp.get_part(obj_iter->second.num);
+ rgw_obj src_obj;
+ src_obj.init_ns(s->bucket, oid, mp_ns);
+
+ if (obj_part.manifest.empty()) {
+ RGWObjManifestPart& part = manifest.objs[ofs];
+
+ part.loc = src_obj;
+ part.loc_ofs = 0;
+ part.size = obj_iter->second.size;
+ } else {
+ manifest.append(obj_part.manifest);
+ }
+
+ remove_objs.push_back(src_obj.object);
+
+ ofs += obj_part.size;
+ }
+ } while (truncated);
hash.Final((byte *)final_etag);
buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
target_obj.init(s->bucket, s->object_str);
- list<string> remove_objs; /* objects to be removed from index listing */
-
- for (obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) {
- RGWUploadPartInfo& obj_part = obj_iter->second;
- string oid = mp.get_part(obj_iter->second.num);
- rgw_obj src_obj;
- src_obj.init_ns(s->bucket, oid, mp_ns);
-
- if (obj_part.manifest.empty()) {
- RGWObjManifestPart& part = manifest.objs[ofs];
-
- part.loc = src_obj;
- part.loc_ofs = 0;
- part.size = obj_iter->second.size;
- } else {
- manifest.append(obj_part.manifest);
- }
-
- remove_objs.push_back(src_obj.object);
-
- ofs += obj_part.size;
- }
-
manifest.obj_size = ofs;
store->set_atomic(s->obj_ctx, target_obj);
upload_id = s->info.args.get("uploadId");
map<uint32_t, RGWUploadPartInfo> obj_parts;
map<uint32_t, RGWUploadPartInfo>::iterator obj_iter;
- RGWAccessControlPolicy policy(s->cct);
map<string, bufferlist> attrs;
rgw_obj meta_obj;
RGWMPObj mp;
mp.init(s->object_str, upload_id);
meta_oid = mp.get_meta();
- ret = get_multiparts_info(store, s, meta_oid, obj_parts, policy, attrs);
+ ret = get_multipart_info(store, s, meta_oid, NULL, attrs);
if (ret < 0)
return;
- for (obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) {
- RGWUploadPartInfo& obj_part = obj_iter->second;
+ bool truncated;
+ int marker = 0;
+ int max_parts = 1000;
- if (obj_part.manifest.empty()) {
- string oid = mp.get_part(obj_iter->second.num);
- rgw_obj obj;
- obj.init_ns(s->bucket, oid, mp_ns);
- ret = store->delete_obj(s->obj_ctx, owner, obj);
- if (ret < 0 && ret != -ENOENT)
- return;
- } else {
- RGWObjManifest& manifest = obj_part.manifest;
- map<uint64_t, RGWObjManifestPart>::iterator oiter;
- for (oiter = manifest.objs.begin(); oiter != manifest.objs.end(); ++oiter) {
- RGWObjManifestPart& part = oiter->second;
- ret = store->delete_obj(s->obj_ctx, owner, part.loc);
+ do {
+ ret = list_multipart_parts(store, s, upload_id, meta_oid, max_parts, marker, obj_parts, &marker, &truncated);
+ if (ret < 0)
+ return;
+
+ for (obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) {
+ RGWUploadPartInfo& obj_part = obj_iter->second;
+
+ if (obj_part.manifest.empty()) {
+ string oid = mp.get_part(obj_iter->second.num);
+ rgw_obj obj;
+ obj.init_ns(s->bucket, oid, mp_ns);
+ ret = store->delete_obj(s->obj_ctx, owner, obj);
if (ret < 0 && ret != -ENOENT)
return;
+ } else {
+ RGWObjManifest& manifest = obj_part.manifest;
+ map<uint64_t, RGWObjManifestPart>::iterator oiter;
+ for (oiter = manifest.objs.begin(); oiter != manifest.objs.end(); ++oiter) {
+ RGWObjManifestPart& part = oiter->second;
+ ret = store->delete_obj(s->obj_ctx, owner, part.loc);
+ if (ret < 0 && ret != -ENOENT)
+ return;
+ }
}
}
- }
+ } while (truncated);
+
// and also remove the metadata obj
meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
ret = store->delete_obj(s->obj_ctx, owner, meta_obj);
mp.init(s->object_str, upload_id);
meta_oid = mp.get_meta();
- ret = get_multiparts_info(store, s, meta_oid, parts, policy, xattrs);
+ ret = get_multipart_info(store, s, meta_oid, &policy, xattrs);
+ if (ret < 0)
+ return;
+
+ ret = list_multipart_parts(store, s, upload_id, meta_oid, max_parts, marker, parts, NULL, &truncated);
}
int RGWListBucketMultiparts::verify_permission()