]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: fix multipart upload listing
authorYehuda Sadeh <yehuda@inktank.com>
Fri, 17 Jan 2014 17:30:30 +0000 (09:30 -0800)
committerYehuda Sadeh <yehuda@inktank.com>
Tue, 28 Jan 2014 19:59:11 +0000 (11:59 -0800)
Fixes: #7169
A separate fix has been created for dumpling.

Previously we read the entire list of parts, disregarding the actual
marker and the requested max parts. This fix refactors the way we read
the list of parts (doing it in parts, using marker).

Create new upload-id format that is used to identify uploads with sorted
omap entries. Make sure we're backward compatible and handle correctly
mixed-versions rgw uploads.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rest.cc
src/rgw/rgw_rest_s3.cc

index a73cf395bb4bac3fc7a86a51c8f296e982b73104..e0bbd0f2b42190e935ed49e9854059950f31481f 100644 (file)
@@ -33,6 +33,8 @@ using ceph::crypto::MD5;
 static string mp_ns = RGW_OBJ_NS_MULTIPART;
 static string shadow_ns = RGW_OBJ_NS_SHADOW;
 
+#define MULTIPART_UPLOAD_ID_PREFIX "2/" // must contain a unique char that may not come up in gen_rand_alpha()
+
 class MultipartMetaFilter : public RGWAccessListFilter {
 public:
   MultipartMetaFilter() {}
@@ -1339,6 +1341,7 @@ class RGWPutObjProcessor_Multipart : public RGWPutObjProcessor_Atomic
   string part_num;
   RGWMPObj mp;
   req_state *s;
+  string upload_id;
 
 protected:
   bool immutable_head() { return true; }
@@ -1355,7 +1358,6 @@ int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx)
   RGWPutObjProcessor::prepare(store, obj_ctx);
 
   string oid = obj_str;
-  string upload_id;
   upload_id = s->info.args.get("uploadId");
   mp.init(oid, upload_id);
 
@@ -1374,6 +1376,13 @@ int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx)
   return 0;
 }
 
+static bool is_v2_upload_id(const string& upload_id)
+{
+  const char *uid = upload_id.c_str();
+
+  return (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX, sizeof(MULTIPART_UPLOAD_ID_PREFIX) - 1) == 0);
+}
+
 int RGWPutObjProcessor_Multipart::do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs)
 {
   complete_writing_data();
@@ -1390,7 +1399,21 @@ int RGWPutObjProcessor_Multipart::do_complete(string& etag, time_t *mtime, time_
   bufferlist bl;
   RGWUploadPartInfo info;
   string p = "part.";
-  p.append(part_num);
+  bool sorted_omap = is_v2_upload_id(upload_id);
+
+  if (sorted_omap) {
+    string err;
+    int part_num_int = strict_strtol(part_num.c_str(), 10, &err);
+    if (!err.empty()) {
+      dout(10) << "bad part number specified: " << part_num << dendl;
+      return -EINVAL;
+    }
+    char buf[32];
+    snprintf(buf, sizeof(buf), "%08d", part_num_int);
+    p.append(buf);
+  } else {
+    p.append(part_num);
+  }
   info.num = atoi(part_num.c_str());
   info.etag = etag;
   info.size = s->obj_size;
@@ -2323,7 +2346,8 @@ void RGWInitMultipart::execute()
   do {
     char buf[33];
     gen_rand_alphanumeric(s->cct, buf, sizeof(buf) - 1);
-    upload_id = buf;
+    upload_id = "2/"; /* v2 upload id */
+    upload_id.append(buf);
 
     string tmp_obj_name;
     RGWMPObj mp(s->object_str, upload_id);
@@ -2335,8 +2359,8 @@ void RGWInitMultipart::execute()
   } while (ret == -EEXIST);
 }
 
-static int get_multiparts_info(RGWRados *store, struct req_state *s, string& meta_oid, map<uint32_t, RGWUploadPartInfo>& parts,
-                               RGWAccessControlPolicy& policy, map<string, bufferlist>& attrs)
+static int get_multipart_info(RGWRados *store, struct req_state *s, string& meta_oid,
+                              RGWAccessControlPolicy *policy, map<string, bufferlist>& attrs)
 {
   map<string, bufferlist> parts_map;
   map<string, bufferlist>::iterator iter;
@@ -2349,27 +2373,67 @@ static int get_multiparts_info(RGWRados *store, struct req_state *s, string& met
   if (ret < 0)
     return ret;
 
-  ret = store->omap_get_all(obj, header, parts_map);
-  if (ret < 0)
-    return ret;
-
-  for (iter = attrs.begin(); iter != attrs.end(); ++iter) {
-    string name = iter->first;
-    if (name.compare(RGW_ATTR_ACL) == 0) {
-      bufferlist& bl = iter->second;
-      bufferlist::iterator bli = bl.begin();
-      try {
-        ::decode(policy, bli);
-      } catch (buffer::error& err) {
-        ldout(s->cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
-        return -EIO;
+  if (policy) {
+    for (iter = attrs.begin(); iter != attrs.end(); ++iter) {
+      string name = iter->first;
+      if (name.compare(RGW_ATTR_ACL) == 0) {
+        bufferlist& bl = iter->second;
+        bufferlist::iterator bli = bl.begin();
+        try {
+          ::decode(*policy, bli);
+        } catch (buffer::error& err) {
+          ldout(s->cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
+          return -EIO;
+        }
+        break;
       }
-      break;
     }
   }
 
+  return 0;
+}
+
+static int list_multipart_parts(RGWRados *store, struct req_state *s,
+                                const string& upload_id,
+                                string& meta_oid, int num_parts,
+                                int marker, map<uint32_t, RGWUploadPartInfo>& parts,
+                                int *next_marker, bool *truncated,
+                                bool assume_unsorted = false)
+{
+  map<string, bufferlist> parts_map;
+  map<string, bufferlist>::iterator iter;
+  bufferlist header;
+
+  rgw_obj obj;
+  obj.init_ns(s->bucket, meta_oid, mp_ns);
+
+  bool sorted_omap = is_v2_upload_id(upload_id) && !assume_unsorted;
+
+  int ret;
+
+  parts.clear();
+  
+  if (sorted_omap) {
+    string p;
+    p = "part.";
+    char buf[32];
+
+    snprintf(buf, sizeof(buf), "%08d", marker);
+    p.append(buf);
+
+    ret = store->omap_get_vals(obj, header, p, num_parts + 1, parts_map);
+  } else {
+    ret = store->omap_get_all(obj, header, parts_map);
+  }
+  if (ret < 0)
+    return ret;
+
+  int i;
+  int last_num = 0;
+
+  uint32_t expected_next = marker + 1;
 
-  for (iter = parts_map.begin(); iter != parts_map.end(); ++iter) {
+  for (i = 0, iter = parts_map.begin(); (i < num_parts || !sorted_omap) && iter != parts_map.end(); ++iter, ++i) {
     bufferlist& bl = iter->second;
     bufferlist::iterator bli = bl.begin();
     RGWUploadPartInfo info;
@@ -2379,8 +2443,48 @@ static int get_multiparts_info(RGWRados *store, struct req_state *s, string& met
       ldout(s->cct, 0) << "ERROR: could not part info, caught buffer::error" << dendl;
       return -EIO;
     }
-    parts[info.num] = info;
+    if (sorted_omap) {
+      if (info.num != expected_next) {
+        /* ouch, we expected a specific part num here, but we got a different one. Either
+         * a part is missing, or it could be a case of mixed rgw versions working on the same
+         * upload, where one gateway doesn't support correctly sorted omap keys for multipart
+         * upload just assume data is unsorted.
+         */
+        return list_multipart_parts(store, s, upload_id, meta_oid, num_parts, marker, parts, next_marker, truncated, true);
+      }
+      expected_next++;
+    }
+    if (sorted_omap ||
+      (int)info.num > marker) {
+      parts[info.num] = info;
+      last_num = info.num;
+    }
   }
+
+  if (sorted_omap) {
+    if (truncated)
+      *truncated = (iter != parts_map.end());
+  } else {
+    /* rebuild a map with only num_parts entries */
+
+    map<uint32_t, RGWUploadPartInfo> new_parts;
+    map<uint32_t, RGWUploadPartInfo>::iterator piter;
+
+    for (i = 0, piter = parts.begin(); i < num_parts && piter != parts.end(); ++i, ++piter) {
+      new_parts[piter->first] = piter->second;
+      last_num = piter->first;
+    }
+
+    if (truncated)
+      *truncated = (piter != parts.end());
+
+    parts.swap(new_parts);
+  }
+
+  if (next_marker) {
+    *next_marker = last_num;
+  }
+
   return 0;
 }
 
@@ -2405,7 +2509,6 @@ void RGWCompleteMultipart::execute()
   string meta_oid;
   map<uint32_t, RGWUploadPartInfo> obj_parts;
   map<uint32_t, RGWUploadPartInfo>::iterator obj_iter;
-  RGWAccessControlPolicy policy(s->cct);
   map<string, bufferlist> attrs;
   off_t ofs = 0;
   MD5 hash;
@@ -2442,44 +2545,71 @@ void RGWCompleteMultipart::execute()
     return;
   }
 
-  // ensure that each part if of the minimum size
-  for (obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) {
-    if ((obj_iter->second).size < min_part_size) {
-      ret = -ERR_TOO_SMALL;
-      return;
-    }
-  }
-
   mp.init(s->object_str, upload_id);
   meta_oid = mp.get_meta();
 
-  ret = get_multiparts_info(store, s, meta_oid, obj_parts, policy, attrs);
-  if (ret == -ENOENT)
-    ret = -ERR_NO_SUCH_UPLOAD;
-  if (parts->parts.size() != obj_parts.size())
-    ret = -ERR_INVALID_PART;
-  if (ret < 0)
-    return;
+  int total_parts = 0;
+  int max_parts = 1000;
+  int marker = 0;
+  bool truncated;
 
-  for (iter = parts->parts.begin(), obj_iter = obj_parts.begin();
-       iter != parts->parts.end() && obj_iter != obj_parts.end();
-       ++iter, ++obj_iter) {
-    char etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
-    if (iter->first != (int)obj_iter->first) {
-      ldout(s->cct, 0) << "NOTICE: parts num mismatch: next requested: " << iter->first << " next uploaded: " << obj_iter->first << dendl;
-      ret = -ERR_INVALID_PART;
-      return;
+  list<string> remove_objs; /* objects to be removed from index listing */
+
+  iter = parts->parts.begin();
+
+  do {
+    ret = list_multipart_parts(store, s, upload_id, meta_oid, max_parts, marker, obj_parts, &marker, &truncated);
+    if (ret == -ENOENT) {
+      ret = -ERR_NO_SUCH_UPLOAD;
     }
-    string part_etag = rgw_string_unquote(iter->second);
-    if (part_etag.compare(obj_iter->second.etag) != 0) {
-      ldout(s->cct, 0) << "NOTICE: etag mismatch: part: " << iter->first << " etag: " << iter->second << dendl;
+    if (ret < 0)
+      return;
+
+    total_parts += obj_parts.size();
+    if (!truncated && total_parts != (int)parts->parts.size()) {
       ret = -ERR_INVALID_PART;
       return;
     }
 
-    hex_to_buf(obj_iter->second.etag.c_str(), etag, CEPH_CRYPTO_MD5_DIGESTSIZE);
-    hash.Update((const byte *)etag, sizeof(etag));
-  }
+    for (obj_iter = obj_parts.begin(); iter != parts->parts.end() && obj_iter != obj_parts.end(); ++iter, ++obj_iter) {
+      char etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
+      if (iter->first != (int)obj_iter->first) {
+        ldout(s->cct, 0) << "NOTICE: parts num mismatch: next requested: " << iter->first << " next uploaded: " << obj_iter->first << dendl;
+        ret = -ERR_INVALID_PART;
+        return;
+      }
+      string part_etag = rgw_string_unquote(iter->second);
+      if (part_etag.compare(obj_iter->second.etag) != 0) {
+        ldout(s->cct, 0) << "NOTICE: etag mismatch: part: " << iter->first << " etag: " << iter->second << dendl;
+        ret = -ERR_INVALID_PART;
+        return;
+      }
+
+      hex_to_buf(obj_iter->second.etag.c_str(), etag, CEPH_CRYPTO_MD5_DIGESTSIZE);
+      hash.Update((const byte *)etag, sizeof(etag));
+
+      RGWUploadPartInfo& obj_part = obj_iter->second;
+
+      /* update manifest for part */
+      string oid = mp.get_part(obj_iter->second.num);
+      rgw_obj src_obj;
+      src_obj.init_ns(s->bucket, oid, mp_ns);
+
+      if (obj_part.manifest.empty()) {
+        RGWObjManifestPart& part = manifest.objs[ofs];
+
+        part.loc = src_obj;
+        part.loc_ofs = 0;
+        part.size = obj_iter->second.size;
+      } else {
+        manifest.append(obj_part.manifest);
+      }
+
+      remove_objs.push_back(src_obj.object);
+
+      ofs += obj_part.size;
+    }
+  } while (truncated);
   hash.Final((byte *)final_etag);
 
   buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
@@ -2493,29 +2623,6 @@ void RGWCompleteMultipart::execute()
 
   target_obj.init(s->bucket, s->object_str);
 
-  list<string> remove_objs; /* objects to be removed from index listing */
-
-  for (obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) {
-    RGWUploadPartInfo& obj_part = obj_iter->second;
-    string oid = mp.get_part(obj_iter->second.num);
-    rgw_obj src_obj;
-    src_obj.init_ns(s->bucket, oid, mp_ns);
-
-    if (obj_part.manifest.empty()) {
-      RGWObjManifestPart& part = manifest.objs[ofs];
-
-      part.loc = src_obj;
-      part.loc_ofs = 0;
-      part.size = obj_iter->second.size;
-    } else {
-      manifest.append(obj_part.manifest);
-    }
-
-    remove_objs.push_back(src_obj.object);
-
-    ofs += obj_part.size;
-  }
-
   manifest.obj_size = ofs;
 
   store->set_atomic(s->obj_ctx, target_obj);
@@ -2560,7 +2667,6 @@ void RGWAbortMultipart::execute()
   upload_id = s->info.args.get("uploadId");
   map<uint32_t, RGWUploadPartInfo> obj_parts;
   map<uint32_t, RGWUploadPartInfo>::iterator obj_iter;
-  RGWAccessControlPolicy policy(s->cct);
   map<string, bufferlist> attrs;
   rgw_obj meta_obj;
   RGWMPObj mp;
@@ -2572,31 +2678,42 @@ void RGWAbortMultipart::execute()
   mp.init(s->object_str, upload_id);
   meta_oid = mp.get_meta();
 
-  ret = get_multiparts_info(store, s, meta_oid, obj_parts, policy, attrs);
+  ret = get_multipart_info(store, s, meta_oid, NULL, attrs);
   if (ret < 0)
     return;
 
-  for (obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) {
-    RGWUploadPartInfo& obj_part = obj_iter->second;
+  bool truncated;
+  int marker = 0;
+  int max_parts = 1000;
 
-    if (obj_part.manifest.empty()) {
-      string oid = mp.get_part(obj_iter->second.num);
-      rgw_obj obj;
-      obj.init_ns(s->bucket, oid, mp_ns);
-      ret = store->delete_obj(s->obj_ctx, owner, obj);
-      if (ret < 0 && ret != -ENOENT)
-        return;
-    } else {
-      RGWObjManifest& manifest = obj_part.manifest;
-      map<uint64_t, RGWObjManifestPart>::iterator oiter;
-      for (oiter = manifest.objs.begin(); oiter != manifest.objs.end(); ++oiter) {
-        RGWObjManifestPart& part = oiter->second;
-        ret = store->delete_obj(s->obj_ctx, owner, part.loc);
+  do {
+    ret = list_multipart_parts(store, s, upload_id, meta_oid, max_parts, marker, obj_parts, &marker, &truncated);
+    if (ret < 0)
+      return;
+
+    for (obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) {
+      RGWUploadPartInfo& obj_part = obj_iter->second;
+
+      if (obj_part.manifest.empty()) {
+        string oid = mp.get_part(obj_iter->second.num);
+        rgw_obj obj;
+        obj.init_ns(s->bucket, oid, mp_ns);
+        ret = store->delete_obj(s->obj_ctx, owner, obj);
         if (ret < 0 && ret != -ENOENT)
           return;
+      } else {
+        RGWObjManifest& manifest = obj_part.manifest;
+        map<uint64_t, RGWObjManifestPart>::iterator oiter;
+        for (oiter = manifest.objs.begin(); oiter != manifest.objs.end(); ++oiter) {
+          RGWObjManifestPart& part = oiter->second;
+          ret = store->delete_obj(s->obj_ctx, owner, part.loc);
+          if (ret < 0 && ret != -ENOENT)
+            return;
+        }
       }
     }
-  }
+  } while (truncated);
+
   // and also remove the metadata obj
   meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
   ret = store->delete_obj(s->obj_ctx, owner, meta_obj);
@@ -2631,7 +2748,11 @@ void RGWListMultipart::execute()
   mp.init(s->object_str, upload_id);
   meta_oid = mp.get_meta();
 
-  ret = get_multiparts_info(store, s, meta_oid, parts, policy, xattrs);
+  ret = get_multipart_info(store, s, meta_oid, &policy, xattrs);
+  if (ret < 0)
+    return;
+
+  ret = list_multipart_parts(store, s, upload_id, meta_oid, max_parts, marker, parts, NULL, &truncated);
 }
 
 int RGWListBucketMultiparts::verify_permission()
index 39a6acfe79114df255404037ce48165685d32b0f..3724def2cf033295ae6c033031cf25c40abb9869 100644 (file)
@@ -679,7 +679,8 @@ public:
     ret = 0;
     data = NULL;
     len = 0;
-    min_part_size = RGW_MIN_MULTIPART_SIZE;
+    // min_part_size = RGW_MIN_MULTIPART_SIZE;
+    min_part_size = 0;
   }
   virtual ~RGWCompleteMultipart() {
     free(data);
@@ -719,12 +720,14 @@ protected:
   int max_parts;
   int marker;
   RGWAccessControlPolicy policy;
+  bool truncated;
 
 public:
   RGWListMultipart() {
     ret = 0;
     max_parts = 1000;
     marker = 0;
+    truncated = false;
   }
 
   virtual void init(RGWRados *store, struct req_state *s, RGWHandler *h) {
index aca7873f88aea2fa63d68bf6620e72e695fd0dbc..2201d4f92d712d121080319b7781e2404eefd8f5 100644 (file)
@@ -890,11 +890,19 @@ int RGWListMultipart_ObjStore::get_params()
   if (upload_id.empty()) {
     ret = -ENOTSUP;
   }
-  string str = s->info.args.get("part-number-marker");
-  if (!str.empty())
-    marker = atoi(str.c_str());
+  string marker_str = s->info.args.get("part-number-marker");
+
+  if (!marker_str.empty()) {
+    string err;
+    marker = strict_strtol(marker_str.c_str(), 10, &err);
+    if (!err.empty()) {
+      ldout(s->cct, 20) << "bad marker: "  << marker << dendl;
+      ret = -EINVAL;
+      return ret;
+    }
+  }
   
-  str = s->info.args.get("max-parts");
+  string str = s->info.args.get("max-parts");
   if (!str.empty())
     max_parts = atoi(str.c_str());
 
index 68dba64375e34f13181ad06e0779b604fb60ec6b..9dfede449c92400bd664e71b0ae85830b4398598 100644 (file)
@@ -1524,11 +1524,13 @@ void RGWListMultipart_ObjStore_S3::send_response()
     dump_start(s);
     s->formatter->open_object_section_in_ns("ListMultipartUploadResult",
                    "http://s3.amazonaws.com/doc/2006-03-01/");
-    map<uint32_t, RGWUploadPartInfo>::iterator iter, test_iter;
-    int i, cur_max = 0;
+    map<uint32_t, RGWUploadPartInfo>::iterator iter;
+    map<uint32_t, RGWUploadPartInfo>::reverse_iterator test_iter;
+    int cur_max = 0;
 
-    iter = parts.upper_bound(marker);
-    for (i = 0, test_iter = iter; test_iter != parts.end() && i < max_parts; ++test_iter, ++i) {
+    iter = parts.begin();
+    test_iter = parts.rbegin();
+    if (test_iter != parts.rend()) {
       cur_max = test_iter->first;
     }
     s->formatter->dump_string("Bucket", s->bucket_name_str);
@@ -1536,9 +1538,9 @@ void RGWListMultipart_ObjStore_S3::send_response()
     s->formatter->dump_string("UploadId", upload_id);
     s->formatter->dump_string("StorageClass", "STANDARD");
     s->formatter->dump_int("PartNumberMarker", marker);
-    s->formatter->dump_int("NextPartNumberMarker", cur_max + 1);
+    s->formatter->dump_int("NextPartNumberMarker", cur_max);
     s->formatter->dump_int("MaxParts", max_parts);
-    s->formatter->dump_string("IsTruncated", (test_iter == parts.end() ? "false" : "true"));
+    s->formatter->dump_string("IsTruncated", (truncated ? "true" : "false"));
 
     ACLOwner& owner = policy.get_owner();
     dump_owner(s, owner.get_id(), owner.get_display_name());