]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: tiering: add support for multipart uploads
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Oct 2018 17:36:18 +0000 (10:36 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 4 Jan 2019 03:00:22 +0000 (19:00 -0800)
Encode multipart info structure in the upload meta object,
store the storage_cass info there. Use the storage class
that was set on upload init when uploading all the parts,
and when completing the upload.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_op.cc
src/rgw/rgw_rados.h

index c6b52c9486bff5eb16b814a79a09c79152eeb948..30282babbf23c06cc3a660e587508af74d79584b 100644 (file)
@@ -307,7 +307,7 @@ vector<Policy> get_iam_user_policy_from_attr(CephContext* cct,
   return policies;
 }
 
-static int get_obj_attrs(RGWRados *store, struct req_state *s, rgw_obj& obj, map<string, bufferlist>& attrs)
+static int get_obj_attrs(RGWRados *store, struct req_state *s, const rgw_obj& obj, map<string, bufferlist>& attrs)
 {
   RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
   RGWRados::Object::Read read_op(&op_target);
@@ -317,7 +317,116 @@ static int get_obj_attrs(RGWRados *store, struct req_state *s, rgw_obj& obj, map
   return read_op.prepare();
 }
 
-static int modify_obj_attr(RGWRados *store, struct req_state *s, rgw_obj& obj, const char* attr_name, bufferlist& attr_val)
+static int get_obj_head(RGWRados *store, struct req_state *s,
+                        const rgw_obj& obj,
+                        map<string, bufferlist> *attrs,
+                         bufferlist *pbl)
+{
+  store->set_prefetch_data(s->obj_ctx, obj);
+
+  RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
+  RGWRados::Object::Read read_op(&op_target);
+
+  read_op.params.attrs = attrs;
+
+  int ret = read_op.prepare();
+  if (ret < 0) {
+    return ret;
+  }
+
+  if (!pbl) {
+    return 0;
+  }
+
+  ret = read_op.read(0, s->cct->_conf->rgw_max_chunk_size, *pbl);
+
+  return 0;
+}
+
+struct multipart_upload_info
+{
+  rgw_placement_rule dest_placement;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(dest_placement, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(dest_placement, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(multipart_upload_info)
+
+static int get_multipart_info(RGWRados *store, struct req_state *s,
+                             const rgw_obj& obj,
+                              RGWAccessControlPolicy *policy,
+                             map<string, bufferlist> *attrs,
+                              multipart_upload_info *upload_info)
+{
+  bufferlist header;
+
+  bufferlist headbl;
+  bufferlist *pheadbl = (upload_info ? &headbl : nullptr);
+
+  int op_ret = get_obj_head(store, s, obj, attrs, pheadbl);
+  if (op_ret < 0) {
+    if (op_ret == -ENOENT) {
+      return -ERR_NO_SUCH_UPLOAD;
+    }
+    return op_ret;
+  }
+
+  if (upload_info && headbl.length() > 0) {
+    auto hiter = headbl.cbegin();
+    try {
+      decode(*upload_info, hiter);
+    } catch (buffer::error& err) {
+      ldpp_dout(s, 0) << "ERROR: failed to decode multipart upload info" << dendl;
+      return -EIO;
+    }
+  }
+
+  if (policy && attrs) {
+    for (auto& iter : *attrs) {
+      string name = iter.first;
+      if (name.compare(RGW_ATTR_ACL) == 0) {
+        bufferlist& bl = iter.second;
+        auto bli = bl.cbegin();
+        try {
+          decode(*policy, bli);
+        } catch (buffer::error& err) {
+          ldpp_dout(s, 0) << "ERROR: could not decode policy" << dendl;
+          return -EIO;
+        }
+        break;
+      }
+    }
+  }
+
+  return 0;
+}
+
+static int get_multipart_info(RGWRados *store, struct req_state *s,
+                             string& meta_oid,
+                              RGWAccessControlPolicy *policy,
+                             map<string, bufferlist> *attrs,
+                              multipart_upload_info *upload_info)
+{
+  map<string, bufferlist>::iterator iter;
+  bufferlist header;
+
+  rgw_obj meta_obj;
+  meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
+  meta_obj.set_in_extra_data(true);
+
+  return get_multipart_info(store, s, meta_obj, policy, attrs, upload_info);
+}
+
+static int modify_obj_attr(RGWRados *store, struct req_state *s, const rgw_obj& obj, const char* attr_name, bufferlist& attr_val)
 {
   map<string, bufferlist> attrs;
   RGWRados::Object op_target(store, s->bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
@@ -3512,8 +3621,21 @@ void RGWPutObj::execute()
   ceph::static_ptr<ObjectProcessor, max_processor_size> processor;
 
   if (multipart) {
+    RGWMPObj mp(s->object.name, multipart_upload_id);
+
+    multipart_upload_info upload_info;
+    op_ret = get_multipart_info(store, s, mp.get_meta(), nullptr, nullptr, &upload_info);
+    if (op_ret < 0) {
+      if (op_ret != -ENOENT) {
+        ldpp_dout(this, 0) << "ERROR: get_multipart_info returned " << op_ret << ": " << cpp_strerror(-op_ret) << dendl;
+      } else {// -ENOENT: raced with upload complete/cancel, no need to spam log
+        ldpp_dout(this, 20) << "failed to get multipart info (returned " << op_ret << ": " << cpp_strerror(-op_ret) << "): probably raced with upload complete / cancel" << dendl;
+      }
+      return;
+    }
+    ldpp_dout(this, 20) << "dest_placement for part=" << upload_info.dest_placement << dendl;
     processor.emplace<MultipartObjectProcessor>(
-        &aio, store, s->bucket_info, &s->dest_placement,
+        &aio, store, s->bucket_info, &upload_info.dest_placement,
         s->owner.get_id(), obj_ctx, obj,
         multipart_upload_id, multipart_part_num, multipart_part_str);
   } else {
@@ -5349,48 +5471,15 @@ void RGWInitMultipart::execute()
     obj_op.meta.category = RGWObjCategory::MultiMeta;
     obj_op.meta.flags = PUT_OBJ_CREATE_EXCL;
 
-    op_ret = obj_op.write_meta(0, 0, attrs);
-  } while (op_ret == -EEXIST);
-}
+    multipart_upload_info upload_info;
+    upload_info.dest_placement = s->dest_placement;
 
-static int get_multipart_info(RGWRados *store, struct req_state *s,
-                             string& meta_oid,
-                              RGWAccessControlPolicy *policy,
-                             map<string, bufferlist>& attrs)
-{
-  map<string, bufferlist>::iterator iter;
-  bufferlist header;
-
-  rgw_obj obj;
-  obj.init_ns(s->bucket, meta_oid, mp_ns);
-  obj.set_in_extra_data(true);
-
-  int op_ret = get_obj_attrs(store, s, obj, attrs);
-  if (op_ret < 0) {
-    if (op_ret == -ENOENT) {
-      return -ERR_NO_SUCH_UPLOAD;
-    }
-    return op_ret;
-  }
-
-  if (policy) {
-    for (iter = attrs.begin(); iter != attrs.end(); ++iter) {
-      string name = iter->first;
-      if (name.compare(RGW_ATTR_ACL) == 0) {
-        bufferlist& bl = iter->second;
-        auto bli = bl.cbegin();
-        try {
-          decode(*policy, bli);
-        } catch (buffer::error& err) {
-          ldpp_dout(s, 0) << "ERROR: could not decode policy" << dendl;
-          return -EIO;
-        }
-        break;
-      }
-    }
-  }
+    bufferlist bl;
+    encode(upload_info, bl);
+    obj_op.meta.data = &bl;
 
-  return 0;
+    op_ret = obj_op.write_meta(bl.length(), 0, attrs);
+  } while (op_ret == -EEXIST);
 }
 
 int RGWCompleteMultipart::verify_permission()
@@ -5769,7 +5858,6 @@ void RGWAbortMultipart::execute()
   string upload_id;
   string meta_oid;
   upload_id = s->info.args.get("uploadId");
-  map<string, bufferlist> attrs;
   rgw_obj meta_obj;
   RGWMPObj mp;
 
@@ -5779,7 +5867,7 @@ void RGWAbortMultipart::execute()
   mp.init(s->object.name, upload_id);
   meta_oid = mp.get_meta();
 
-  op_ret = get_multipart_info(store, s, meta_oid, NULL, attrs);
+  op_ret = get_multipart_info(store, s, meta_oid, nullptr, nullptr, nullptr);
   if (op_ret < 0)
     return;
 
@@ -5802,7 +5890,6 @@ void RGWListMultipart::pre_exec()
 
 void RGWListMultipart::execute()
 {
-  map<string, bufferlist> xattrs;
   string meta_oid;
   RGWMPObj mp;
 
@@ -5813,7 +5900,7 @@ void RGWListMultipart::execute()
   mp.init(s->object.name, upload_id);
   meta_oid = mp.get_meta();
 
-  op_ret = get_multipart_info(store, s, meta_oid, &policy, xattrs);
+  op_ret = get_multipart_info(store, s, meta_oid, &policy, nullptr, nullptr);
   if (op_ret < 0)
     return;
 
index 3114a083d0310ac2c0d82a5a9709f50fb8394190..91d02e139fb355b4f7d9bf2e7452e99710144922 100644 (file)
@@ -1105,7 +1105,7 @@ public:
     assert (!obj.empty());
     objs_state[obj].is_atomic = true;
   }
-  void set_prefetch_data(rgw_obj& obj) {
+  void set_prefetch_data(const rgw_obj& obj) {
     RWLock::WLocker wl(lock);
     assert (!obj.empty());
     objs_state[obj].prefetch_data = true;
@@ -2102,7 +2102,7 @@ public:
     RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
     rctx->set_atomic(obj);
   }
-  void set_prefetch_data(void *ctx, rgw_obj& obj) {
+  void set_prefetch_data(void *ctx, const rgw_obj& obj) {
     RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
     rctx->set_prefetch_data(obj);
   }