]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: support bucket removal in Bulk Delete API implementation.
authorRadoslaw Zarzynski <rzarzynski@mirantis.com>
Sun, 25 Oct 2015 13:15:32 +0000 (14:15 +0100)
committerRadoslaw Zarzynski <rzarzynski@mirantis.com>
Tue, 8 Dec 2015 16:57:24 +0000 (17:57 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzynski@mirantis.com>
src/rgw/rgw_common.cc
src/rgw/rgw_common.h
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rest_swift.cc

index 57aaf1b7eadd7fa8afc7b6ca76a7c0463e3671c5..9e20fa328195376bdba5469932e08890ac89c956 100644 (file)
@@ -828,7 +828,7 @@ static char hex_to_num(char c)
   return hex_table.to_num(c);
 }
 
-bool url_decode(string& src_str, string& dest_str, bool in_query)
+bool url_decode(const string& src_str, string& dest_str, bool in_query)
 {
   const char *src = src_str.c_str();
   char dest[src_str.size() + 1];
index 234e4d16abd523fc2c85894a371fcbbfc7607a3e..641e2d57c929769aec253cee37d1063c58c31b3f 100644 (file)
@@ -983,7 +983,7 @@ struct rgw_obj_key {
     instance = i;
   }
 
-  bool empty() {
+  bool empty() const {
     return name.empty();
   }
   bool operator==(const rgw_obj_key& k) const {
@@ -1662,7 +1662,7 @@ extern bool verify_object_permission(struct req_state *s,
 extern bool verify_object_permission(struct req_state *s, int perm);
 /** Convert an input URL into a sane object name
  * by converting %-escaped strings into characters, etc*/
-extern bool url_decode(string& src_str, string& dest_str, bool in_query = false);
+extern bool url_decode(const string& src_str, string& dest_str, bool in_query = false);
 extern void url_encode(const string& src, string& dst);
 
 extern void calc_hmac_sha1(const char *key, int key_len,
index 397f839f2f09902e9a075179ccb5355c3cfb283d..dce18d110b16115d08d0ff8676e8402ad09900f8 100644 (file)
@@ -3797,11 +3797,26 @@ bool RGWBulkDelete::Deleter::verify_permission(RGWBucketInfo& binfo,
   return verify_object_permission(s, &bacl, &oacl, RGW_PERM_WRITE);
 }
 
-bool RGWBulkDelete::Deleter::delete_single(const acct_path_t& path)
+bool RGWBulkDelete::Deleter::verify_permission(RGWBucketInfo& binfo,
+                                               map<string, bufferlist>& battrs)
 {
   int ret = 0;
 
+  RGWAccessControlPolicy bacl(store->ctx());
+  rgw_obj_key no_obj;
+  ret = read_policy(store, s, binfo, battrs, &bacl, binfo.bucket, no_obj);
+  if (ret < 0) {
+    return false;
+  }
+
+  return verify_bucket_permission(s, &bacl, RGW_PERM_WRITE);
+}
+
+bool RGWBulkDelete::Deleter::delete_single(const acct_path_t& path)
+{
+  int ret = 0;
   auto& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
+
   RGWBucketInfo binfo;
   map<string, bufferlist> battrs;
   ret = store->get_bucket_info(obj_ctx, path.bucket_name, binfo, NULL, &battrs);
@@ -3809,8 +3824,7 @@ bool RGWBulkDelete::Deleter::delete_single(const acct_path_t& path)
     goto binfo_fail;
   }
 
-  /* We do need a new scope due to goto. */
-  {
+  if (!path.obj_key.empty()) {
     rgw_obj obj(binfo.bucket, path.obj_key);
     obj_ctx.set_atomic(obj);
 
@@ -3831,6 +3845,38 @@ bool RGWBulkDelete::Deleter::delete_single(const acct_path_t& path)
     if (ret < 0) {
       goto delop_fail;
     }
+  } else {
+    RGWObjVersionTracker ot;
+    ot.read_version = binfo.ep_objv;
+
+    if (!verify_permission(binfo, battrs)) {
+      ret = -EACCES;
+      goto auth_fail;
+    }
+
+    ret = store->delete_bucket(binfo.bucket, ot);
+    if (0 == ret) {
+      ret = rgw_unlink_bucket(store, binfo.owner, binfo.bucket.name, false);
+      if (ret < 0) {
+        ldout(s->cct, 0) << "WARNING: failed to unlink bucket: ret=" << ret << dendl;
+      }
+    }
+    if (ret < 0) {
+      goto delop_fail;
+    }
+
+    if (!store->region.is_master) {
+      bufferlist in_data;
+      JSONParser jp;
+      ret = forward_request_to_master(s, &ot.read_version, store, in_data, &jp);
+      if (ret < 0) {
+        if (ret == -ENOENT) { /* adjust error,
+                               we want to return with NoSuchBucket and not NoSuchKey */
+          ret = -ERR_NO_SUCH_BUCKET;
+        }
+        goto delop_fail;
+      }
+    }
   }
 
   num_deleted++;
@@ -3865,7 +3911,7 @@ auth_fail:
 
 delop_fail:
     if (-ENOENT == ret) {
-      ldout(store->ctx(), 20) << "cannot find the object" << dendl;
+      ldout(store->ctx(), 20) << "cannot find entry " << path << dendl;
       num_unfound++;
     } else {
       fail_desc_t failed_item = {
index 78776ffe65079bd85f07e64dcfa633c5f997d2b8..8046f346022b029b63513851bd429596f3600432 100644 (file)
@@ -1138,6 +1138,8 @@ public:
                            map<string, bufferlist>& battrs,
                            rgw_obj& obj,
                            ACLOwner& bucket_owner /* out */);
+    bool verify_permission(RGWBucketInfo& binfo,
+                           map<string, bufferlist>& battrs);
     bool delete_single(const acct_path_t& path);
     bool delete_chunk(const std::list<acct_path_t>& paths);
   };
index 223060c15713a785da70087879c49fa751a277be..a7ad0603bf65f3144fb67fcbc9c4a508e4812d7b 100644 (file)
@@ -952,20 +952,26 @@ int RGWBulkDelete_ObjStore_SWIFT::get_data(list<RGWBulkDelete::acct_path_t>& ite
 
     ldout(s->cct, 20) << "extracted Bulk Delete entry: " << path_str << dendl;
 
+    RGWBulkDelete::acct_path_t path;
+
     const size_t sep_pos = path_str.find('/');
     if (string::npos == sep_pos) {
-      ldout(s->cct, 20) << "wrongly formatted item: " << path_str << dendl;
-      continue;
-    }
+      url_decode(path_str, path.bucket_name);
+    } else {
+      string bucket_name;
+      url_decode(path_str.substr(0, sep_pos), bucket_name);
 
-    RGWBulkDelete::acct_path_t path;
-    path.bucket_name = path_str.substr(0, sep_pos);
-    path.obj_key     = path_str.substr(sep_pos + 1);
+      string obj_name;
+      url_decode(path_str.substr(sep_pos + 1), obj_name);
+
+      path.bucket_name = bucket_name;
+      path.obj_key = obj_name;
+    }
 
     items.push_back(path);
 
     if (items.size() == MAX_CHUNK_ENTRIES) {
-      is_truncated = true;
+      *is_truncated = true;
       return 0;
     }
   }