]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: aws sync module, fix target object naming
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 13 Oct 2017 23:47:38 +0000 (16:47 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:38 +0000 (08:05 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rest.cc
src/rgw/rgw_cr_rest.h
src/rgw/rgw_rest_conn.h
src/rgw/rgw_sync.cc
src/rgw/rgw_sync_module_aws.cc

index cfcf737892454d9fdd889b80c0525bc20f5d29da..cf6f178b6e0430fe71300df016924abf16b6974e 100644 (file)
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
-class TestSpliceCR : public RGWCoroutine {
-  CephContext *cct;
-  RGWHTTPManager *http_manager;
-  RGWHTTPStreamRWRequest *in_req{nullptr};
-  RGWHTTPStreamRWRequest *out_req{nullptr};
-  std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
-  std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
-public:
-  TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
-               RGWHTTPStreamRWRequest *_in_req,
-               RGWHTTPStreamRWRequest *_out_req);
-
-  int operate();
-};
-
 class RGWCRHTTPGetDataCB : public RGWGetDataCB {
   Mutex lock;
   RGWCoroutinesEnv *env;
@@ -310,27 +295,3 @@ int RGWStreamSpliceCR::operate() {
   return 0;
 }
 
-TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
-                           RGWHTTPStreamRWRequest *_in_req,
-                           RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
-                                                               in_req(_in_req), out_req(_out_req) {
-    in_crf.reset(new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager));
-    in_crf->set_req(in_req);
-    out_crf.reset(new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager));
-    out_crf->set_req(out_req);
-}
-
-int TestSpliceCR::operate() {
-  reenter(this) {
-    yield call(new RGWStreamSpliceCR(cct, http_manager, in_crf, out_crf));
-
-    if (retcode < 0) {
-      return set_cr_error(retcode);
-    }
-
-    return set_cr_done();
-  }
-
-  return 0;
-};
-
index 370e56658d2483b991250f8ffd84cac539e0a2b1..a0fc93c1acfc16b8c4594dd88b0ed0796e3f8953 100644 (file)
@@ -64,7 +64,7 @@ class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine {
 
 
   virtual int wait_result() {
-    return http_op->wait_bl(result);
+    return http_op->wait(result);
   }
 
   int request_complete() override {
@@ -172,7 +172,7 @@ class RGWSendRawRESTResourceCR: public RGWSimpleCoroutine {
       ret = http_op->wait(result);
     } else {
       bufferlist bl;
-      ret = http_op->wait_bl(&bl);
+      ret = http_op->wait(&bl);
     }
     auto op = std::move(http_op); // release ref on return
     if (ret < 0) {
@@ -238,6 +238,17 @@ class RGWPutRawRESTResourceCR: public RGWSendRawRESTResourceCR <T> {
 
 };
 
+template <class T>
+class RGWPostRawRESTResourceCR: public RGWSendRawRESTResourceCR <T> {
+ public:
+  RGWPostRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
+                          RGWHTTPManager *_http_manager,
+                          const string& _path,
+                          rgw_http_param_pair *_params, bufferlist& _input, T *_result)
+    : RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, "POST", _path, _params, _input, _result, true){}
+
+};
+
 
 template <class S, class T>
 class RGWPutRESTResourceCR : public RGWSendRESTResourceCR<S, T> {
@@ -293,7 +304,7 @@ public:
   int request_complete() override {
     int ret;
     bufferlist bl;
-    ret = http_op->wait_bl(&bl);
+    ret = http_op->wait(&bl);
     auto op = std::move(http_op); // release ref on return
     if (ret < 0) {
       error_stream << "http operation failed: " << op->to_str()
index 76e62220c427e2e5b4a574795b8c4848cc17ed44..215f007424dea7ea229ecb08c35504036089cca6 100644 (file)
@@ -215,7 +215,7 @@ public:
     return req.get_http_status();
   }
 
-  int wait_bl(bufferlist *pbl) {
+  int wait(bufferlist *pbl) {
     int ret = req.wait();
     if (ret < 0) {
       return ret;
@@ -341,7 +341,7 @@ public:
     return req.get_http_status();
   }
 
-  int wait_bl(bufferlist *pbl) {
+  int wait(bufferlist *pbl) {
     int ret = req.wait();
     if (ret < 0) {
       return ret;
index 8f04a993bd816a5a3ef8b79eb22a082ec7de6228..7b9f88c466266015561fde251c58954a0b0db9ee 100644 (file)
@@ -1039,7 +1039,7 @@ public:
         return io_block(0);
       }
       yield {
-        int ret = http_op->wait_bl(pbl);
+        int ret = http_op->wait(pbl);
         http_op->put();
         if (ret < 0) {
           return set_cr_error(ret);
index dd9cb8ae263a8cc5b461f9b6c246eefa8c468b65..113eb72e7aa44e8fd101174d9abbda61d7dc527e 100644 (file)
@@ -135,6 +135,251 @@ public:
   }
 };
 
+
+class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRESTConn *source_conn;
+  RGWRESTConn *dest_conn;
+  rgw_obj src_obj;
+  rgw_obj dest_obj;
+
+  std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
+  std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+
+public:
+  RGWAWSStreamObjToCloudPlainCR(RGWDataSyncEnv *_sync_env,
+                                RGWRESTConn *_source_conn,
+                                const rgw_obj& _src_obj,
+                                RGWRESTConn *_dest_conn,
+                                const rgw_obj& _dest_obj) : RGWCoroutine(_sync_env->cct),
+                                                   sync_env(_sync_env),
+                                                   source_conn(_source_conn),
+                                                   dest_conn(_dest_conn),
+                                                   src_obj(_src_obj),
+                                                   dest_obj(_dest_obj) {}
+
+  int operate() {
+    reenter(this) {
+      /* init input */
+      in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
+
+      /* init output */
+      out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conn,
+                                           dest_obj));
+
+      yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+
+      return set_cr_done();
+    }
+
+    return 0;
+  }
+};
+
+#if 0
+class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRESTConn *source_conn;
+  RGWRESTConn *dest_conn;
+  rgw_obj src_obj;
+  rgw_obj dest_obj;
+
+  uint64_t ofs;
+  uint64_t size;
+  int part_num;
+
+  std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
+  std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+
+public:
+  RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncEnv *_sync_env,
+                                RGWRESTConn *_source_conn,
+                                const rgw_obj& _src_obj,
+                                RGWRESTConn *_dest_conn,
+                                const rgw_obj& _dest_obj,
+                                uint64_t _ofs,
+                                uint64_t _size,
+                                int _part_num) : RGWCoroutine(_sync_env->cct),
+                                                   sync_env(_sync_env),
+                                                   source_conn(_source_conn),
+                                                   dest_conn(_dest_conn),
+                                                   src_obj(_src_obj),
+                                                   dest_obj(_dest_obj),
+                                                   ofs(_ofs), size(_size),
+                                                   part_num(_part_num) {}
+
+  int operate() {
+    reenter(this) {
+      /* init input */
+      in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
+
+      in_crf->set_range(ofs, size);
+
+      /* init output */
+      out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conn,
+                                           dest_obj));
+
+      out_crf->set_multipart(part_num, size);
+
+      yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+
+      return set_cr_done();
+    }
+
+    return 0;
+  }
+};
+#endif
+
+class RGWAWSInitMultipartCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRESTConn *dest_conn;
+  rgw_obj dest_obj;
+
+  uint64_t obj_size;
+  ceph::real_time mtime;
+
+  bufferlist out_bl;
+
+  string *upload_id;
+
+  struct InitMultipartResult {
+    string bucket;
+    string key;
+    string upload_id;
+
+    void decode_xml(XMLObj *obj) {
+      RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
+      RGWXMLDecoder::decode_xml("Key", key, obj);
+      RGWXMLDecoder::decode_xml("UploadId", upload_id, obj);
+    }
+  } result;
+
+public:
+  RGWAWSInitMultipartCR(RGWDataSyncEnv *_sync_env,
+                        RGWRESTConn *_dest_conn,
+                        const rgw_obj& _dest_obj,
+                        uint64_t _obj_size,
+                        const ceph::real_time& _mtime,
+                        string *_upload_id) : RGWCoroutine(_sync_env->cct),
+                                                   sync_env(_sync_env),
+                                                   dest_conn(_dest_conn),
+                                                   dest_obj(_dest_obj),
+                                                   obj_size(_obj_size),
+                                                   mtime(_mtime),
+                                                   upload_id(_upload_id) {}
+
+  int operate() {
+    reenter(this) {
+
+      yield {
+        string path = dest_obj.bucket.name + "/" + dest_obj.key.name;
+        rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
+        bufferlist bl;
+        call(new RGWPostRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn, sync_env->http_manager,
+                                                 path, params, bl, &out_bl));
+      }
+
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
+        return set_cr_error(retcode);
+      }
+      {
+#warning need to cancel upload in case of error here
+        RGWXMLDecoder::XMLParser parser;
+        if (!parser.init()) {
+          ldout(sync_env->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
+          return set_cr_error(-EIO);
+        }
+
+        if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+          string str(out_bl.c_str(), out_bl.length());
+          ldout(sync_env->cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
+          return set_cr_error(-EIO);
+        }
+
+        try {
+          RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true);
+        } catch (RGWXMLDecoder::err& err) {
+          string str(out_bl.c_str(), out_bl.length());
+          ldout(sync_env->cct, 5) << "ERROR: unexpected xml: " << str << dendl;
+          return set_cr_error(-EIO);
+        }
+      }
+
+      ldout(sync_env->cct, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl;
+
+      *upload_id = result.upload_id;
+
+      return set_cr_done();
+    }
+
+    return 0;
+  }
+};
+
+class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRESTConn *source_conn;
+  RGWRESTConn *dest_conn;
+  rgw_obj src_obj;
+  rgw_obj dest_obj;
+
+  uint64_t obj_size;
+  ceph::real_time mtime;
+
+  string upload_id;
+
+public:
+  RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncEnv *_sync_env,
+                                RGWRESTConn *_source_conn,
+                                const rgw_obj& _src_obj,
+                                RGWRESTConn *_dest_conn,
+                                const rgw_obj& _dest_obj,
+                                uint64_t _obj_size,
+                                const ceph::real_time& _mtime) : RGWCoroutine(_sync_env->cct),
+                                                   sync_env(_sync_env),
+                                                   source_conn(_source_conn),
+                                                   dest_conn(_dest_conn),
+                                                   src_obj(_src_obj),
+                                                   dest_obj(_dest_obj),
+                                                   obj_size(_obj_size),
+                                                   mtime(_mtime) {}
+
+  int operate() {
+    reenter(this) {
+#if 0
+      /* init input */
+      in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
+
+      /* init output */
+      out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conn,
+                                           dest_obj));
+
+      yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+#endif
+
+      yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, obj_size, mtime, &upload_id));
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+
+      return set_cr_done();
+    }
+
+    return 0;
+  }
+};
+
 // maybe use Fetch Remote Obj instead?
 class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
   const AWSConfig& conf;
@@ -142,12 +387,12 @@ class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
   bufferlist res;
   unordered_map <string, bool> bucket_created;
   string target_bucket_name;
-  std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
-  std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
   rgw_rest_obj rest_obj;
   string obj_path;
   int ret{0};
 
+  static constexpr uint32_t multipart_threshold = 8 * 1024 * 1024;
+
 public:
   RGWAWSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
                             RGWBucketInfo& _bucket_info,
@@ -191,31 +436,22 @@ public:
         bucket_created[target_bucket_name] = true;
       }
 
-      {
+      yield {
         rgw_obj src_obj(bucket_info.bucket, key);
-        /* init input */
-        in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
-
-        ret = in_crf->init();
-        if (ret < 0) {
-          return set_cr_error(ret);
-        }
 
         /* init output */
         rgw_bucket target_bucket;
         target_bucket.name = target_bucket_name; /* this is only possible because we only use bucket name for
-                                                uri resolution */
+                                                    uri resolution */
         rgw_obj dest_obj(target_bucket, aws_object_name(bucket_info, key));
 
-        out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, conf.conn.get(),
-                                             dest_obj));
-        ret = out_crf->init();
-        if (ret < 0) {
-          return set_cr_error(ret);
+        if (size < multipart_threshold) {
+          call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj, conf.conn.get(), dest_obj));
+        } else {
+          call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, source_conn, src_obj, conf.conn.get(),
+                                                     dest_obj, size, mtime));
         }
       }
-
-      yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }