]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add support for multipart upload expiration. 13622/head
authorZhang Shaowen <zhangshaowen@cmss.chinamobile.com>
Thu, 23 Feb 2017 01:36:27 +0000 (09:36 +0800)
committerZhang Shaowen <zhangshaowen@cmss.chinamobile.com>
Wed, 5 Apr 2017 11:01:35 +0000 (19:01 +0800)
Fixes: http://tracker.ceph.com/issues/19088
Signed-off-by: Zhang Shaowen <zhangshaowen@cmss.chinamobile.com>
src/rgw/rgw_lc.cc
src/rgw/rgw_lc.h
src/rgw/rgw_lc_s3.cc
src/rgw/rgw_lc_s3.h
src/rgw/rgw_multi.cc
src/rgw/rgw_multi.h
src/rgw/rgw_op.cc
src/rgw/rgw_op.h

index 5efc6e38507e526c7f82d8ee0c3df29144413e8d..3b62b23fd934d8121bac10b1584cfab5c2cdda84 100644 (file)
@@ -35,7 +35,7 @@ bool LCRule::validate()
   if (id.length() > MAX_ID_LEN) {
     return false;
   }
-  else if(expiration.empty() && noncur_expiration.empty()) {
+  else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty()) {
     return false;
   }
   else if (!expiration.empty() && expiration.get_days() <= 0) {
@@ -44,6 +44,9 @@ bool LCRule::validate()
   else if (!noncur_expiration.empty() && noncur_expiration.get_days() <=0) {
     return false;
   }
+  else if (!mp_expiration.empty() && mp_expiration.get_days() <= 0) {
+    return false;
+  }
   return true;
 }
 
@@ -66,6 +69,9 @@ bool RGWLifecycleConfiguration::_add_rule(LCRule *rule)
   if (!rule->get_noncur_expiration().empty()) {
     op.noncur_expiration = rule->get_noncur_expiration().get_days();
   }
+  if (!rule->get_mp_expiration().empty()) {
+    op.mp_expiration = rule->get_mp_expiration().get_days();
+  }
   auto ret = prefix_map.insert(pair<string, lc_op>(rule->get_prefix(), op));
   return ret.second;
 }
@@ -89,7 +95,7 @@ int RGWLifecycleConfiguration::check_and_add_rule(LCRule *rule)
 }
 
 //Rules are conflicted: if one rule's prefix starts with other rule's prefix, and these two rules
-//define same action(now only support expiration days)
+//define same action. 
 bool RGWLifecycleConfiguration::validate() 
 {
   if (prefix_map.size() < 2) {
@@ -104,7 +110,8 @@ bool RGWLifecycleConfiguration::validate()
       string n_pre = next_iter->first;
       if (n_pre.compare(0, c_pre.length(), c_pre) == 0) {
         if ((cur_iter->second.expiration > 0 && next_iter->second.expiration > 0) ||
-          (cur_iter->second.noncur_expiration > 0 && next_iter->second.noncur_expiration > 0)) {
+          (cur_iter->second.noncur_expiration > 0 && next_iter->second.noncur_expiration > 0) || 
+          (cur_iter->second.mp_expiration > 0 && next_iter->second.mp_expiration > 0)) {
           return false;
         } else {
           ++next_iter;
@@ -264,6 +271,54 @@ int RGWLC::remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, b
   }
 }
 
+int RGWLC::handle_multipart_expiration(RGWRados::Bucket *target, const map<string, lc_op>& prefix_map)
+{
+  MultipartMetaFilter mp_filter;
+  vector<rgw_bucket_dir_entry> objs;
+  RGWMPObj mp_obj;
+  bool is_truncated;
+  int ret;
+  RGWBucketInfo& bucket_info = target->get_bucket_info();
+  RGWRados::Bucket::List list_op(target);
+  list_op.params.list_versions = false;
+  list_op.params.ns = RGW_OBJ_NS_MULTIPART;
+  list_op.params.filter = &mp_filter;
+  for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
+    if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
+      continue;
+    }
+    list_op.params.prefix = prefix_iter->first;
+    do {
+      objs.clear();
+      list_op.params.marker = list_op.get_next_marker();
+      ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
+      if (ret < 0) {
+          if (ret == (-ENOENT))
+            return 0;
+          ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
+          return ret;
+      }
+
+      utime_t now = ceph_clock_now();
+      for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
+        if (obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.mp_expiration)) {
+          rgw_obj_key key(obj_iter->key);
+          if (!mp_obj.from_meta(key.name)) {
+            continue;
+          }
+          RGWObjectCtx rctx(store);
+          ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
+          if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
+            ldout(cct, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret <<dendl;
+            return ret;
+          }
+        }
+      }
+    } while(is_truncated);
+  }
+  return 0;
+}
+
 int RGWLC::bucket_lc_process(string& shard_id)
 {
   RGWLifecycleConfiguration  config(cct);
@@ -358,7 +413,7 @@ int RGWLC::bucket_lc_process(string& shard_id)
   //bucket versioning is enabled or suspended
     rgw_obj_key pre_marker;
     for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
-      if (!prefix_iter->second.status) {
+      if (!prefix_iter->second.status || (prefix_iter->second.expiration <= 0 && prefix_iter->second.noncur_expiration <= 0)) {
         continue;
       }
       if (prefix_iter != prefix_map.begin() && 
@@ -441,6 +496,8 @@ int RGWLC::bucket_lc_process(string& shard_id)
     }
   }
 
+  ret = handle_multipart_expiration(&target, prefix_map);
+
   return ret;
 }
 
index 37956f474b03623a8864fd564828f6300aaa3d2e..61edd78aac720818efc8b44adfabcf2c359161c7 100644 (file)
@@ -16,6 +16,7 @@
 #include "common/Thread.h"
 #include "rgw_common.h"
 #include "rgw_rados.h"
+#include "rgw_multi.h"
 #include "cls/rgw/cls_rgw_types.h"
 
 using namespace std;
@@ -69,6 +70,7 @@ protected:
   string status;
   LCExpiration expiration;
   LCExpiration noncur_expiration;
+  LCExpiration mp_expiration;
 
 public:
 
@@ -96,6 +98,10 @@ public:
     return noncur_expiration;
   }
 
+  LCExpiration& get_mp_expiration() {
+    return mp_expiration;
+  }
+
   void set_id(string*_id) {
     id = *_id;
   }
@@ -116,19 +122,24 @@ public:
     noncur_expiration = *_noncur_expiration;
   }
 
+  void set_mp_expiration(LCExpiration* _mp_expiration) {
+    mp_expiration = *_mp_expiration;
+  }
+
   bool validate();
   
   void encode(bufferlist& bl) const {
-     ENCODE_START(2, 1, bl);
+     ENCODE_START(3, 1, bl);
      ::encode(id, bl);
      ::encode(prefix, bl);
      ::encode(status, bl);
      ::encode(expiration, bl);
      ::encode(noncur_expiration, bl);
+     ::encode(mp_expiration, bl);
      ENCODE_FINISH(bl);
    }
    void decode(bufferlist::iterator& bl) {
-     DECODE_START_LEGACY_COMPAT_LEN(2, 1, 1, bl);
+     DECODE_START_LEGACY_COMPAT_LEN(3, 1, 1, bl);
      ::decode(id, bl);
      ::decode(prefix, bl);
      ::decode(status, bl);
@@ -136,6 +147,9 @@ public:
      if (struct_v >=2) {
        ::decode(noncur_expiration, bl);
      }
+     if (struct_v >= 3) {
+       ::decode(mp_expiration, bl);
+     }
      DECODE_FINISH(bl);
    }
 
@@ -147,8 +161,9 @@ struct lc_op
   bool status;
   int expiration;
   int noncur_expiration;
+  int mp_expiration;
 
-  lc_op() : status(false), expiration(0), noncur_expiration(0) {}
+  lc_op() : status(false), expiration(0), noncur_expiration(0), mp_expiration(0) {}
   
 };
 
@@ -255,6 +270,7 @@ class RGWLC {
   private:
   int remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, bool remove_indeed = true);
   bool obj_has_expired(double timediff, int days);
+  int handle_multipart_expiration(RGWRados::Bucket *target, const map<string, lc_op>& prefix_map);
 };
 
 
index 2b1e1d29508134eaf9b9b3f20fd98c66da2b3c6c..99ed7b70cd688a708f01f15add19ab92e1cc52c3 100644 (file)
@@ -16,7 +16,6 @@ using namespace std;
 bool LCExpiration_S3::xml_end(const char * el) {
   LCDays_S3 *lc_days = static_cast<LCDays_S3 *>(find_first("Days"));
 
-  // ID is mandatory
   if (!lc_days)
     return false;
   days = lc_days->get_data();
@@ -24,7 +23,7 @@ bool LCExpiration_S3::xml_end(const char * el) {
 }
 
 bool LCNoncurExpiration_S3::xml_end(const char *el) {
-  LCNoncurDays_S3 *lc_noncur_days = static_cast<LCNoncurDays_S3 *>(find_first("NoncurrentDays"));
+  LCDays_S3 *lc_noncur_days = static_cast<LCDays_S3 *>(find_first("NoncurrentDays"));
   if (!lc_noncur_days) {
     return false;
   }
@@ -32,6 +31,15 @@ bool LCNoncurExpiration_S3::xml_end(const char *el) {
   return true;
 }
 
+bool LCMPExpiration_S3::xml_end(const char *el) {
+  LCDays_S3 *lc_mp_days = static_cast<LCDays_S3 *>(find_first("DaysAfterInitiation"));
+  if (!lc_mp_days) {
+    return false;
+  }
+  days = lc_mp_days->get_data();
+  return true;
+}
+
 bool RGWLifecycleConfiguration_S3::xml_end(const char *el) {
   XMLObjIter iter = find("Rule");
   LCRule_S3 *rule = static_cast<LCRule_S3 *>(iter.get_next());
@@ -48,6 +56,7 @@ bool LCRule_S3::xml_end(const char *el) {
   LCStatus_S3 *lc_status;
   LCExpiration_S3 *lc_expiration;
   LCNoncurExpiration_S3 *lc_noncur_expiration;
+  LCMPExpiration_S3 *lc_mp_expiration;
 
   id.clear();
   prefix.clear();
@@ -72,7 +81,8 @@ bool LCRule_S3::xml_end(const char *el) {
 
   lc_expiration = static_cast<LCExpiration_S3 *>(find_first("Expiration"));
   lc_noncur_expiration = static_cast<LCNoncurExpiration_S3 *>(find_first("NoncurrentVersionExpiration"));
-  if (!lc_expiration && !lc_noncur_expiration) {
+  lc_mp_expiration = static_cast<LCMPExpiration_S3 *>(find_first("AbortIncompleteMultipartUpload"));
+  if (!lc_expiration && !lc_noncur_expiration && !lc_mp_expiration) {
     return false;
   } else {
     if (lc_expiration) {
@@ -81,6 +91,9 @@ bool LCRule_S3::xml_end(const char *el) {
     if (lc_noncur_expiration) {
       noncur_expiration = *lc_noncur_expiration;
     }
+    if (lc_mp_expiration) {
+      mp_expiration = *lc_mp_expiration;
+    }
   }
 
   return true;
@@ -99,6 +112,10 @@ void LCRule_S3::to_xml(CephContext *cct, ostream& out) {
     LCNoncurExpiration_S3& noncur_expir = static_cast<LCNoncurExpiration_S3&>(noncur_expiration);
     noncur_expir.to_xml(out);
   }
+  if (!mp_expiration.empty()) {
+    LCMPExpiration_S3& mp_expir = static_cast<LCMPExpiration_S3&>(mp_expiration);
+    mp_expir.to_xml(out);
+  }
   out << "</Rule>";
 }
 
@@ -150,7 +167,11 @@ XMLObj *RGWLCXMLParser_S3::alloc_obj(const char *el)
   } else if (strcmp(el, "NoncurrentVersionExpiration") == 0) {
     obj = new LCNoncurExpiration_S3();
   } else if (strcmp(el, "NoncurrentDays") == 0) {
-    obj = new LCNoncurDays_S3();
+    obj = new LCDays_S3();
+  } else if (strcmp(el, "AbortIncompleteMultipartUpload") == 0) {
+    obj = new LCMPExpiration_S3();
+  } else if (strcmp(el, "DaysAfterInitiation") == 0) {
+    obj = new LCDays_S3();
   }
   return obj;
 }
index f04e5b88e137341bbc668e3da2c96c108c97fce1..c969ac979c93cf6d859a28d61f76c0dd1a4b133e 100644 (file)
@@ -48,16 +48,6 @@ public:
   string& to_str() { return data; }
 };
 
-class LCNoncurDays_S3 : public XMLObj
-{
-public:
-  LCNoncurDays_S3() {}
-  ~LCNoncurDays_S3() override {}
-  string& to_str() {
-    return data;
-  }
-};
-
 class LCExpiration_S3 : public LCExpiration, public XMLObj
 {
 public:
@@ -92,6 +82,23 @@ public:
   }
 };
 
+class LCMPExpiration_S3 : public LCExpiration, public XMLObj
+{
+public:
+  LCMPExpiration_S3() {}
+  ~LCMPExpiration_S3() {}
+
+  bool xml_end(const char *el);
+  void to_xml(ostream& out) {
+    out << "<AbortIncompleteMultipartUpload>" << "<DaysAfterInitiation>" << days << "</DaysAfterInitiation>" << "</AbortIncompleteMultipartUpload>";
+  }
+  void dump_xml(Formatter *f) const {
+    f->open_object_section("AbortIncompleteMultipartUpload");
+    encode_xml("DaysAfterInitiation", days, f);
+    f->close_section();
+  }
+};
+
 class LCRule_S3 : public LCRule, public XMLObj
 {
 public:
@@ -114,6 +121,10 @@ public:
       const LCNoncurExpiration_S3& noncur_expir = static_cast<const LCNoncurExpiration_S3&>(noncur_expiration);
       noncur_expir.dump_xml(f);
     }
+    if (!mp_expiration.empty()) {
+      const LCMPExpiration_S3& mp_expir = static_cast<const LCMPExpiration_S3&>(mp_expiration);
+      mp_expir.dump_xml(f);
+    }
     f->close_section(); // Rule
   }
 };
index a1fba2d64fffdc61d313103216a2665cff0e301d..220cc5071948827195a497e15db47ac6cc8052e4 100644 (file)
@@ -65,3 +65,183 @@ XMLObj *RGWMultiXMLParser::alloc_obj(const char *el) {
   return obj;
 }
 
+bool is_v2_upload_id(const string& upload_id)
+{
+  const char *uid = upload_id.c_str();
+
+  return (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX, sizeof(MULTIPART_UPLOAD_ID_PREFIX) - 1) == 0) ||
+         (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX_LEGACY, sizeof(MULTIPART_UPLOAD_ID_PREFIX_LEGACY) - 1) == 0);
+}
+
+int list_multipart_parts(RGWRados *store, RGWBucketInfo& bucket_info, CephContext *cct,
+                                const string& upload_id,
+                                string& meta_oid, int num_parts,
+                                int marker, map<uint32_t, RGWUploadPartInfo>& parts,
+                                int *next_marker, bool *truncated,
+                                bool assume_unsorted)
+{
+  map<string, bufferlist> parts_map;
+  map<string, bufferlist>::iterator iter;
+  bufferlist header;
+
+  rgw_obj obj;
+  obj.init_ns(bucket_info.bucket, meta_oid, RGW_OBJ_NS_MULTIPART);
+  obj.set_in_extra_data(true);
+
+  rgw_raw_obj raw_obj;
+  store->obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
+
+  bool sorted_omap = is_v2_upload_id(upload_id) && !assume_unsorted;
+
+  int ret;
+
+  parts.clear();
+
+  if (sorted_omap) {
+    string p;
+    p = "part.";
+    char buf[32];
+
+    snprintf(buf, sizeof(buf), "%08d", marker);
+    p.append(buf);
+
+    ret = store->omap_get_vals(raw_obj, header, p, num_parts + 1, parts_map);
+  } else {
+    ret = store->omap_get_all(raw_obj, header, parts_map);
+  }
+  if (ret < 0)
+    return ret;
+
+  int i;
+  int last_num = 0;
+
+  uint32_t expected_next = marker + 1;
+
+  for (i = 0, iter = parts_map.begin(); (i < num_parts || !sorted_omap) && iter != parts_map.end(); ++iter, ++i) {
+    bufferlist& bl = iter->second;
+    bufferlist::iterator bli = bl.begin();
+    RGWUploadPartInfo info;
+    try {
+      ::decode(info, bli);
+    } catch (buffer::error& err) {
+      ldout(cct, 0) << "ERROR: could not part info, caught buffer::error" << dendl;
+      return -EIO;
+    }
+    if (sorted_omap) {
+      if (info.num != expected_next) {
+        /* ouch, we expected a specific part num here, but we got a different one. Either
+         * a part is missing, or it could be a case of mixed rgw versions working on the same
+         * upload, where one gateway doesn't support correctly sorted omap keys for multipart
+         * upload just assume data is unsorted.
+         */
+        return list_multipart_parts(store, bucket_info, cct, upload_id, meta_oid, num_parts, marker, parts, next_marker, truncated, true);
+      }
+      expected_next++;
+    }
+    if (sorted_omap ||
+      (int)info.num > marker) {
+      parts[info.num] = info;
+      last_num = info.num;
+    }
+  }
+
+  if (sorted_omap) {
+    if (truncated)
+      *truncated = (iter != parts_map.end());
+  } else {
+    /* rebuild a map with only num_parts entries */
+
+    map<uint32_t, RGWUploadPartInfo> new_parts;
+    map<uint32_t, RGWUploadPartInfo>::iterator piter;
+
+    for (i = 0, piter = parts.begin(); i < num_parts && piter != parts.end(); ++i, ++piter) {
+      new_parts[piter->first] = piter->second;
+      last_num = piter->first;
+    }
+
+    if (truncated)
+      *truncated = (piter != parts.end());
+
+    parts.swap(new_parts);
+  }
+
+  if (next_marker) {
+    *next_marker = last_num;
+  }
+
+  return 0;
+}
+
+int list_multipart_parts(RGWRados *store, struct req_state *s,
+                                const string& upload_id,
+                                string& meta_oid, int num_parts,
+                                int marker, map<uint32_t, RGWUploadPartInfo>& parts,
+                                int *next_marker, bool *truncated,
+                                bool assume_unsorted)
+{
+  return list_multipart_parts(store, s->bucket_info, s->cct, upload_id, meta_oid, num_parts, marker, parts, next_marker, truncated, assume_unsorted);
+}
+
+int abort_multipart_upload(RGWRados *store, CephContext *cct, RGWObjectCtx *obj_ctx, RGWBucketInfo& bucket_info, RGWMPObj& mp_obj)
+{
+  rgw_obj meta_obj;
+  meta_obj.init_ns(bucket_info.bucket, mp_obj.get_meta(), RGW_OBJ_NS_MULTIPART);
+  meta_obj.set_in_extra_data(true);
+  meta_obj.index_hash_source = mp_obj.get_key();
+  cls_rgw_obj_chain chain;
+  list<rgw_obj_index_key> remove_objs;
+  map<uint32_t, RGWUploadPartInfo> obj_parts;
+  bool truncated;
+  int marker = 0;
+  int ret;
+
+  do {
+    ret = list_multipart_parts(store, bucket_info, cct, mp_obj.get_upload_id(), mp_obj.get_meta(), 1000,
+      marker, obj_parts, &marker, &truncated);
+    if (ret < 0)
+      return ret;
+    for (auto obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) {
+      RGWUploadPartInfo& obj_part = obj_iter->second;
+      rgw_obj obj;
+      if (obj_part.manifest.empty()) {
+        string oid = mp_obj.get_part(obj_iter->second.num);
+        obj.init_ns(bucket_info.bucket, oid, RGW_OBJ_NS_MULTIPART);
+        obj.index_hash_source = mp_obj.get_key();
+        ret = store->delete_obj(*obj_ctx, bucket_info, obj, 0);
+        if (ret < 0 && ret != -ENOENT)
+          return ret;
+      } else {
+        store->update_gc_chain(meta_obj, obj_part.manifest, &chain);
+        RGWObjManifest::obj_iterator oiter = obj_part.manifest.obj_begin();
+        if (oiter != obj_part.manifest.obj_end()) {
+          rgw_obj head;
+          rgw_raw_obj raw_head = oiter.get_location().get_raw_obj(store);
+          rgw_raw_obj_to_obj(bucket_info.bucket, raw_head, &head);
+
+          rgw_obj_index_key key;
+          head.key.get_index_key(&key);
+          remove_objs.push_back(key);
+        }
+      }
+    }
+  } while (truncated);
+  /* use upload id as tag */
+  ret = store->send_chain_to_gc(chain, mp_obj.get_upload_id() , false);  // do it async
+  if (ret < 0) {
+    ldout(cct, 5) << "gc->send_chain() returned " << ret << dendl;
+    return ret;
+  }
+  RGWRados::Object del_target(store, bucket_info, *obj_ctx, meta_obj);
+  RGWRados::Object::Delete del_op(&del_target);
+
+  del_op.params.bucket_owner = bucket_info.owner;
+  del_op.params.versioning_status = 0;
+  if (!remove_objs.empty()) {
+    del_op.params.remove_objs = &remove_objs;
+  }
+
+  // and also remove the metadata obj
+  ret = del_op.delete_obj();
+  return ret == -ENOENT?-ERR_NO_SUCH_UPLOAD:ret;
+}
+
index 3512f18e7b121a777bae0627e946958fe23bfb65..a2b03bdf5ec6909b411c6614a7d7ef693cc46ee1 100644 (file)
@@ -6,6 +6,11 @@
 
 #include <map>
 #include "rgw_xml.h"
+#include "rgw_rados.h"
+
+#define MP_META_SUFFIX ".meta"
+#define MULTIPART_UPLOAD_ID_PREFIX_LEGACY "2/"
+#define MULTIPART_UPLOAD_ID_PREFIX "2~" // must contain a unique char that may not come up in gen_rand_alpha()
 
 class RGWMultiCompleteUpload : public XMLObj
 {
@@ -52,4 +57,109 @@ public:
   ~RGWMultiXMLParser() override {}
 };
 
+class MultipartMetaFilter : public RGWAccessListFilter {
+public:
+  MultipartMetaFilter() {}
+  bool filter(string& name, string& key) override {
+    int len = name.size();
+    if (len < 6)
+      return false;
+
+    size_t pos = name.find(MP_META_SUFFIX, len - 5);
+    if (pos == string::npos)
+      return false;
+
+    pos = name.rfind('.', pos - 1);
+    if (pos == string::npos)
+      return false;
+
+    key = name.substr(0, pos);
+
+    return true;
+  }
+};
+
+class RGWMPObj {
+  string oid;
+  string prefix;
+  string meta;
+  string upload_id;
+public:
+  RGWMPObj() {}
+  RGWMPObj(const string& _oid, const string& _upload_id) {
+    init(_oid, _upload_id, _upload_id);
+  }
+  void init(const string& _oid, const string& _upload_id) {
+    init(_oid, _upload_id, _upload_id);
+  }
+  void init(const string& _oid, const string& _upload_id, const string& part_unique_str) {
+    if (_oid.empty()) {
+      clear();
+      return;
+    }
+    oid = _oid;
+    upload_id = _upload_id;
+    prefix = oid + ".";
+    meta = prefix + upload_id + MP_META_SUFFIX;
+    prefix.append(part_unique_str);
+  }
+  string& get_meta() { return meta; }
+  string get_part(int num) {
+    char buf[16];
+    snprintf(buf, 16, ".%d", num);
+    string s = prefix;
+    s.append(buf);
+    return s;
+  }
+  string get_part(string& part) {
+    string s = prefix;
+    s.append(".");
+    s.append(part);
+    return s;
+  }
+  string& get_upload_id() {
+    return upload_id;
+  }
+  string& get_key() {
+    return oid;
+  }
+  bool from_meta(string& meta) {
+    int end_pos = meta.rfind('.'); // search for ".meta"
+    if (end_pos < 0)
+      return false;
+    int mid_pos = meta.rfind('.', end_pos - 1); // <key>.<upload_id>
+    if (mid_pos < 0)
+      return false;
+    oid = meta.substr(0, mid_pos);
+    upload_id = meta.substr(mid_pos + 1, end_pos - mid_pos - 1);
+    init(oid, upload_id, upload_id);
+    return true;
+  }
+  void clear() {
+    oid = "";
+    prefix = "";
+    meta = "";
+    upload_id = "";
+  }
+};
+
+extern bool is_v2_upload_id(const string& upload_id);
+
+extern int list_multipart_parts(RGWRados *store, RGWBucketInfo& bucket_info, CephContext *cct,
+                                const string& upload_id,
+                                string& meta_oid, int num_parts,
+                                int marker, map<uint32_t, RGWUploadPartInfo>& parts,
+                                int *next_marker, bool *truncated,
+                                bool assume_unsorted = false);
+
+extern int list_multipart_parts(RGWRados *store, struct req_state *s,
+                                const string& upload_id,
+                                string& meta_oid, int num_parts,
+                                int marker, map<uint32_t, RGWUploadPartInfo>& parts,
+                                int *next_marker, bool *truncated,
+                                bool assume_unsorted = false);
+
+extern int abort_multipart_upload(RGWRados *store, CephContext *cct, RGWObjectCtx *obj_ctx,
+                                RGWBucketInfo& bucket_info, RGWMPObj& mp_obj);
+
 #endif
index a4572dd3a9478139b37753cfe125636fe7175651..a041d5574a463013eaf82abe517a9c357410e8f6 100644 (file)
@@ -57,31 +57,6 @@ static string shadow_ns = RGW_OBJ_NS_SHADOW;
 
 static int forward_request_to_master(struct req_state *s, obj_version *objv, RGWRados *store, bufferlist& in_data, JSONParser *jp);
 
-#define MULTIPART_UPLOAD_ID_PREFIX_LEGACY "2/"
-#define MULTIPART_UPLOAD_ID_PREFIX "2~" // must contain a unique char that may not come up in gen_rand_alpha()
-
-class MultipartMetaFilter : public RGWAccessListFilter {
-public:
-  MultipartMetaFilter() {}
-  bool filter(string& name, string& key) override {
-    int len = name.size();
-    if (len < 6)
-      return false;
-
-    size_t pos = name.find(MP_META_SUFFIX, len - 5);
-    if (pos == string::npos)
-      return false;
-
-    pos = name.rfind('.', pos - 1);
-    if (pos == string::npos)
-      return false;
-
-    key = name.substr(0, pos);
-
-    return true;
-  }
-};
-
 static MultipartMetaFilter mp_filter;
 
 static int parse_range(const char *range, off_t& ofs, off_t& end, bool *partial_content)
@@ -2678,14 +2653,6 @@ int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, string *oid_rand)
   return 0;
 }
 
-static bool is_v2_upload_id(const string& upload_id)
-{
-  const char *uid = upload_id.c_str();
-
-  return (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX, sizeof(MULTIPART_UPLOAD_ID_PREFIX) - 1) == 0) ||
-         (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX_LEGACY, sizeof(MULTIPART_UPLOAD_ID_PREFIX_LEGACY) - 1) == 0);
-}
-
 int RGWPutObjProcessor_Multipart::do_complete(size_t accounted_size,
                                               const string& etag,
                                               real_time *mtime, real_time set_mtime,
@@ -4677,105 +4644,6 @@ static int get_multipart_info(RGWRados *store, struct req_state *s,
   return 0;
 }
 
-static int list_multipart_parts(RGWRados *store, struct req_state *s,
-                                const string& upload_id,
-                                string& meta_oid, int num_parts,
-                                int marker, map<uint32_t, RGWUploadPartInfo>& parts,
-                                int *next_marker, bool *truncated,
-                                bool assume_unsorted = false)
-{
-  map<string, bufferlist> parts_map;
-  map<string, bufferlist>::iterator iter;
-  bufferlist header;
-
-  rgw_obj obj;
-  obj.init_ns(s->bucket, meta_oid, mp_ns);
-  obj.set_in_extra_data(true);
-
-  rgw_raw_obj raw_obj;
-  store->obj_to_raw(s->bucket_info.placement_rule, obj, &raw_obj);
-
-  bool sorted_omap = is_v2_upload_id(upload_id) && !assume_unsorted;
-
-  int ret;
-
-  parts.clear();
-
-  if (sorted_omap) {
-    string p;
-    p = "part.";
-    char buf[32];
-
-    snprintf(buf, sizeof(buf), "%08d", marker);
-    p.append(buf);
-
-    ret = store->omap_get_vals(raw_obj, header, p, num_parts + 1, parts_map);
-  } else {
-    ret = store->omap_get_all(raw_obj, header, parts_map);
-  }
-  if (ret < 0)
-    return ret;
-
-  int i;
-  int last_num = 0;
-
-  uint32_t expected_next = marker + 1;
-
-  for (i = 0, iter = parts_map.begin(); (i < num_parts || !sorted_omap) && iter != parts_map.end(); ++iter, ++i) {
-    bufferlist& bl = iter->second;
-    bufferlist::iterator bli = bl.begin();
-    RGWUploadPartInfo info;
-    try {
-      ::decode(info, bli);
-    } catch (buffer::error& err) {
-      ldout(s->cct, 0) << "ERROR: could not part info, caught buffer::error" << dendl;
-      return -EIO;
-    }
-    if (sorted_omap) {
-      if (info.num != expected_next) {
-        /* ouch, we expected a specific part num here, but we got a different one. Either
-         * a part is missing, or it could be a case of mixed rgw versions working on the same
-         * upload, where one gateway doesn't support correctly sorted omap keys for multipart
-         * upload just assume data is unsorted.
-         */
-        return list_multipart_parts(store, s, upload_id, meta_oid, num_parts, marker, parts, next_marker, truncated, true);
-      }
-      expected_next++;
-    }
-    if (sorted_omap ||
-      (int)info.num > marker) {
-      parts[info.num] = info;
-      last_num = info.num;
-    }
-  }
-
-  if (sorted_omap) {
-    if (truncated)
-      *truncated = (iter != parts_map.end());
-  } else {
-    /* rebuild a map with only num_parts entries */
-
-    map<uint32_t, RGWUploadPartInfo> new_parts;
-    map<uint32_t, RGWUploadPartInfo>::iterator piter;
-
-    for (i = 0, piter = parts.begin(); i < num_parts && piter != parts.end(); ++i, ++piter) {
-      new_parts[piter->first] = piter->second;
-      last_num = piter->first;
-    }
-
-    if (truncated)
-      *truncated = (piter != parts.end());
-
-    parts.swap(new_parts);
-  }
-
-  if (next_marker) {
-    *next_marker = last_num;
-  }
-
-  return 0;
-}
-
 int RGWCompleteMultipart::verify_permission()
 {
   if (!verify_bucket_permission(s, RGW_PERM_WRITE))
@@ -5041,8 +4909,6 @@ void RGWAbortMultipart::execute()
   string upload_id;
   string meta_oid;
   upload_id = s->info.args.get("uploadId");
-  map<uint32_t, RGWUploadPartInfo> obj_parts;
-  map<uint32_t, RGWUploadPartInfo>::iterator obj_iter;
   map<string, bufferlist> attrs;
   rgw_obj meta_obj;
   RGWMPObj mp;
@@ -5057,75 +4923,8 @@ void RGWAbortMultipart::execute()
   if (op_ret < 0)
     return;
 
-  bool truncated;
-  int marker = 0;
-  int max_parts = 1000;
-
-
   RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
-
-  meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
-  meta_obj.set_in_extra_data(true);
-  meta_obj.index_hash_source = s->object.name;
-
-  cls_rgw_obj_chain chain;
-  list<rgw_obj_index_key> remove_objs;
-
-  do {
-    op_ret = list_multipart_parts(store, s, upload_id, meta_oid, max_parts,
-                                 marker, obj_parts, &marker, &truncated);
-    if (op_ret < 0)
-      return;
-
-    for (obj_iter = obj_parts.begin();
-        obj_iter != obj_parts.end(); ++obj_iter) {
-      RGWUploadPartInfo& obj_part = obj_iter->second;
-      rgw_obj obj;
-
-      if (obj_part.manifest.empty()) {
-        string oid = mp.get_part(obj_iter->second.num);
-        obj.init_ns(s->bucket, oid, mp_ns);
-        obj.index_hash_source = s->object.name;
-        op_ret = store->delete_obj(*obj_ctx, s->bucket_info, obj, 0);
-        if (op_ret < 0 && op_ret != -ENOENT)
-          return;
-      } else {
-        store->update_gc_chain(meta_obj, obj_part.manifest, &chain);
-        RGWObjManifest::obj_iterator oiter = obj_part.manifest.obj_begin();
-        if (oiter != obj_part.manifest.obj_end()) {
-          rgw_obj head;
-          rgw_raw_obj raw_head = oiter.get_location().get_raw_obj(store);
-          rgw_raw_obj_to_obj(s->bucket, raw_head, &head);
-
-          rgw_obj_index_key key;
-          head.key.get_index_key(&key);
-          remove_objs.push_back(key);
-        }
-      }
-    }
-  } while (truncated);
-
-  /* use upload id as tag */
-  op_ret = store->send_chain_to_gc(chain, upload_id , false);  // do it async
-  if (op_ret < 0) {
-    ldout(store->ctx(), 5) << "gc->send_chain() returned " << op_ret << dendl;
-    return;
-  }
-
-  RGWRados::Object del_target(store, s->bucket_info, *obj_ctx, meta_obj);
-  RGWRados::Object::Delete del_op(&del_target);
-
-  del_op.params.bucket_owner = s->bucket_info.owner;
-  del_op.params.versioning_status = 0;
-  if (!remove_objs.empty()) {
-    del_op.params.remove_objs = &remove_objs;
-  }
-
-  // and also remove the metadata obj
-  op_ret = del_op.delete_obj();
-  if (op_ret == -ENOENT) {
-    op_ret = -ERR_NO_SUCH_UPLOAD;
-  }
+  op_ret = abort_multipart_upload(store, s->cct, obj_ctx, s->bucket_info, mp);
 }
 
 int RGWListMultipart::verify_permission()
index abb1969eec9b666fe86af0180ab75f86ab0f4269..49ec7e337e94f86889b5c23010c237ef3482b9bb 100644 (file)
@@ -1560,72 +1560,6 @@ public:
   uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
 };
 
-#define MP_META_SUFFIX ".meta"
-
-class RGWMPObj {
-  string oid;
-  string prefix;
-  string meta;
-  string upload_id;
-public:
-  RGWMPObj() {}
-  RGWMPObj(const string& _oid, const string& _upload_id) {
-    init(_oid, _upload_id, _upload_id);
-  }
-  void init(const string& _oid, const string& _upload_id) {
-    init(_oid, _upload_id, _upload_id);
-  }
-  void init(const string& _oid, const string& _upload_id, const string& part_unique_str) {
-    if (_oid.empty()) {
-      clear();
-      return;
-    }
-    oid = _oid;
-    upload_id = _upload_id;
-    prefix = oid + ".";
-    meta = prefix + upload_id + MP_META_SUFFIX;
-    prefix.append(part_unique_str);
-  }
-  string& get_meta() { return meta; }
-  string get_part(int num) {
-    char buf[16];
-    snprintf(buf, 16, ".%d", num);
-    string s = prefix;
-    s.append(buf);
-    return s;
-  }
-  string get_part(string& part) {
-    string s = prefix;
-    s.append(".");
-    s.append(part);
-    return s;
-  }
-  string& get_upload_id() {
-    return upload_id;
-  }
-  string& get_key() {
-    return oid;
-  }
-  bool from_meta(string& meta) {
-    int end_pos = meta.rfind('.'); // search for ".meta"
-    if (end_pos < 0)
-      return false;
-    int mid_pos = meta.rfind('.', end_pos - 1); // <key>.<upload_id>
-    if (mid_pos < 0)
-      return false;
-    oid = meta.substr(0, mid_pos);
-    upload_id = meta.substr(mid_pos + 1, end_pos - mid_pos - 1);
-    init(oid, upload_id, upload_id);
-    return true;
-  }
-  void clear() {
-    oid = "";
-    prefix = "";
-    meta = "";
-    upload_id = "";
-  }
-};
-
 struct RGWMultipartUploadEntry {
   rgw_bucket_dir_entry obj;
   RGWMPObj mp;