git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: aws sync, more work on large object sync via multipart upload
author: Yehuda Sadeh <yehuda@redhat.com>
Wed, 18 Oct 2017 21:44:03 +0000 (14:44 -0700)
committer: Yehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:38 +0000 (08:05 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rest.h
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_rest_conn.h
src/rgw/rgw_sync_module_aws.cc

index a0fc93c1acfc16b8c4594dd88b0ed0796e3f8953..ee2b944bb97e137b3dc6c77f29fe2c68ae11ad75 100644 (file)
@@ -371,6 +371,12 @@ class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF {
 protected:
   rgw_rest_obj rest_obj;
 
+  /* Optional byte range to read; is_set stays false until set_range() is called. */
+  struct range_info {
+    bool is_set{false};
+    uint64_t ofs{0};   /* offset of the first byte of the range */
+    uint64_t size{0};  /* number of bytes in the range */
+  } range;
+
 public:
   RGWStreamReadHTTPResourceCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
@@ -396,6 +402,12 @@ public:
   rgw_rest_obj& get_rest_obj() {
     return rest_obj;
   }
+
+  /* Record the byte range to read: `size` bytes starting at offset `ofs`. */
+  void set_range(uint64_t ofs, uint64_t size) {
+    range.ofs = ofs;
+    range.size = size;
+    range.is_set = true;  /* raise the flag once both bounds are stored */
+  }
 };
 
 class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF {
@@ -406,6 +418,13 @@ protected:
 
   RGWHTTPStreamRWRequest *req{nullptr};
 
+  /* Multipart upload state; is_multipart stays false until set_multipart() is called. */
+  struct multipart_info {
+    bool is_multipart{false};
+    string upload_id;        /* upload id given to set_multipart() */
+    int part_num{0};         /* part number given to set_multipart() */
+    uint64_t part_size{0};   /* part size given to set_multipart() */
+  } multipart;
+
 public:
   RGWStreamWriteHTTPResourceCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
@@ -426,6 +445,13 @@ public:
   void set_req(RGWHTTPStreamRWRequest *r) {
     req = r;
   }
+
+  /* Mark this writer as one part of a multipart upload and store the part's parameters. */
+  void set_multipart(const string& upload_id, int part_num, uint64_t part_size) {
+    multipart.upload_id = upload_id;
+    multipart.part_num = part_num;
+    multipart.part_size = part_size;
+    multipart.is_multipart = true;  /* raise the flag once all fields are stored */
+  }
 };
 
 class RGWStreamSpliceCR : public RGWCoroutine {
index 749fe4371957c1a9958529c0be29f2b0a1db25a6..0723baf853feef1c9f118fcc1907a38e77474440 100644 (file)
@@ -549,7 +549,7 @@ static void send_prepare_convert(const rgw_obj& obj, string *resource)
   *resource = urlsafe_bucket + "/" + urlsafe_object;
 }
 
-int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
+int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj, RGWHTTPManager *mgr)
 {
   string resource;
   send_prepare_convert(obj, &resource);
@@ -557,7 +557,7 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>&
   return send_request(&key, extra_headers, resource, mgr);
 }
 
-int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj)
+int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj)
 {
   string resource;
   send_prepare_convert(obj, &resource);
index e990715c84e4222faa0d36667d7356ab684c61c1..5ab40a3cc91c4edf3cc648d2f0206bbb629d695d 100644 (file)
@@ -118,13 +118,15 @@ public:
   virtual ~RGWRESTStreamRWRequest() override {}
 
   int send_prepare(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, bufferlist *send_data = nullptr /* optional input data */);
-  int send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj);
+  int send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj);
   int send(RGWHTTPManager *mgr);
 
-  int send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr);
+  int send_request(RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj, RGWHTTPManager *mgr);
   int send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, RGWHTTPManager *mgr, bufferlist *send_data = nullptr /* optional input data */);
 
   int complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs);
+
+  void add_params(param_vec_t *params);
 };
 
 class RGWRESTStreamReadRequest : public RGWRESTStreamRWRequest {
index 70ee7f0bccf038e786bfc189db6b3af5fab62b25..f35d119d76349c6d9e1aab7b7e3fa10ce77220ff 100644 (file)
@@ -118,7 +118,7 @@ public:
     explicit StreamObjData(rgw_obj& _obj) : obj(_obj) {}
 };
 
-int RGWRESTConn::put_obj_send_init(rgw_obj& obj, RGWRESTStreamS3PutObj **req)
+int RGWRESTConn::put_obj_send_init(rgw_obj& obj, const rgw_http_param_pair *extra_params, RGWRESTStreamS3PutObj **req)
 {
   string url;
   int ret = get_url(url);
@@ -128,6 +128,11 @@ int RGWRESTConn::put_obj_send_init(rgw_obj& obj, RGWRESTStreamS3PutObj **req)
   rgw_user uid;
   param_vec_t params;
   populate_params(params, &uid, self_zone_group);
+
+  if (extra_params) {
+    append_param_list(params, extra_params);
+  }
+
   RGWRESTStreamS3PutObj *wr = new RGWRESTStreamS3PutObj(cct, "PUT", url, NULL, &params);
   wr->send_init(obj);
   *req = wr;
@@ -184,12 +189,28 @@ static void set_header(T val, map<string, string>& headers, const string& header
 }
 
 
-int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw_obj& obj,
+/*
+ * Positional-argument overload: packs its arguments into a get_obj_params
+ * and forwards to get_obj(obj, params, send, req). Returns whatever that
+ * overload returns.
+ */
+int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, const rgw_obj& obj,
                          const real_time *mod_ptr, const real_time *unmod_ptr,
                          uint32_t mod_zone_id, uint64_t mod_pg_ver,
                          bool prepend_metadata, bool get_op, bool rgwx_stat,
                          bool sync_manifest, bool skip_decrypt,
                          bool send, RGWGetDataCB *cb, RGWRESTStreamRWRequest **req)
+{
+  get_obj_params params;
+  params.uid = uid;
+  params.info = info;
+  params.mod_ptr = mod_ptr;
+  /* unmod_ptr and mod_zone_id have to be forwarded as well; left at their
+   * defaults, the HTTP_IF_UNMODIFIED_SINCE and HTTP_DEST_ZONE_SHORT_ID
+   * headers would never reflect the caller's arguments */
+  params.unmod_ptr = unmod_ptr;
+  params.mod_zone_id = mod_zone_id;
+  params.mod_pg_ver = mod_pg_ver;
+  params.prepend_metadata = prepend_metadata;
+  params.get_op = get_op;
+  params.rgwx_stat = rgwx_stat;
+  params.sync_manifest = sync_manifest;
+  params.skip_decrypt = skip_decrypt;
+  params.cb = cb;
+  return get_obj(obj, params, send, req);
+}
+
+int RGWRESTConn::get_obj(const rgw_obj& obj, const get_obj_params& in_params, bool send, RGWRESTStreamRWRequest **req)
 {
   string url;
   int ret = get_url(url);
@@ -197,31 +218,31 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw
     return ret;
 
   param_vec_t params;
-  populate_params(params, &uid, self_zone_group);
-  if (prepend_metadata) {
+  populate_params(params, &in_params.uid, self_zone_group);
+  if (in_params.prepend_metadata) {
     params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "prepend-metadata", "true"));
   }
-  if (rgwx_stat) {
+  if (in_params.rgwx_stat) {
     params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "stat", "true"));
   }
-  if (sync_manifest) {
+  if (in_params.sync_manifest) {
     params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "sync-manifest", ""));
   }
-  if (skip_decrypt) {
+  if (in_params.skip_decrypt) {
     params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "skip-decrypt", ""));
   }
   if (!obj.key.instance.empty()) {
     const string& instance = obj.key.instance;
     params.push_back(param_pair_t("versionId", instance));
   }
-  if (get_op) {
-    *req = new RGWRESTStreamReadRequest(cct, url, cb, NULL, &params);
+  if (in_params.get_op) {
+    *req = new RGWRESTStreamReadRequest(cct, url, in_params.cb, NULL, &params);
   } else {
-    *req = new RGWRESTStreamHeadRequest(cct, url, cb, NULL, &params);
+    *req = new RGWRESTStreamHeadRequest(cct, url, in_params.cb, NULL, &params);
   }
   map<string, string> extra_headers;
-  if (info) {
-    const auto& orig_map = info->env->get_map();
+  if (in_params.info) {
+    const auto& orig_map = in_params.info->env->get_map();
 
     /* add original headers that start with HTTP_X_AMZ_ */
     static constexpr char SEARCH_AMZ_PREFIX[] = "HTTP_X_AMZ_";
@@ -235,13 +256,18 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw
     }
   }
 
-  set_date_header(mod_ptr, extra_headers, "HTTP_IF_MODIFIED_SINCE");
-  set_date_header(unmod_ptr, extra_headers, "HTTP_IF_UNMODIFIED_SINCE");
-  if (mod_zone_id != 0) {
-    set_header(mod_zone_id, extra_headers, "HTTP_DEST_ZONE_SHORT_ID");
+  set_date_header(in_params.mod_ptr, extra_headers, "HTTP_IF_MODIFIED_SINCE");
+  set_date_header(in_params.unmod_ptr, extra_headers, "HTTP_IF_UNMODIFIED_SINCE");
+  if (in_params.mod_zone_id != 0) {
+    set_header(in_params.mod_zone_id, extra_headers, "HTTP_DEST_ZONE_SHORT_ID");
+  }
+  if (in_params.mod_pg_ver != 0) {
+    set_header(in_params.mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER");
   }
-  if (mod_pg_ver != 0) {
-    set_header(mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER");
+  if (in_params.range_is_set) {
+    char buf[64];
+    snprintf(buf, sizeof(buf), "bytes=%lld-%lld", (long long)in_params.range_start, (long long)in_params.range_end);
+    set_header(buf, extra_headers, "RANGE");
   }
 
   int r = (*req)->send_prepare(key, extra_headers, obj);
index 215f007424dea7ea229ecb08c35504036089cca6..bdbdd4822db1aae89adc267e813456798bca5cac 100644 (file)
@@ -36,16 +36,22 @@ struct rgw_http_param_pair {
   const char *val;
 };
 
-// copy a null-terminated rgw_http_param_pair list into a list of string pairs
-inline param_vec_t make_param_list(const rgw_http_param_pair* pp)
+// append a null-terminated rgw_http_param_pair list into a list of string pairs.
+// the input ends at the first entry whose key is null; a null pp appends nothing,
+// and an entry with a null val is stored with an empty string value.
+inline void append_param_list(param_vec_t& params, const rgw_http_param_pair* pp)
 {
-  param_vec_t params;
   while (pp && pp->key) {
     string k = pp->key;
     string v = (pp->val ? pp->val : "");
     params.emplace_back(make_pair(std::move(k), std::move(v)));
     ++pp;
   }
+}
+
+// copy a null-terminated rgw_http_param_pair list into a list of string pairs
+// (starts from an empty list and fills it through append_param_list)
+inline param_vec_t make_param_list(const rgw_http_param_pair* pp)
+{
+  param_vec_t params;
+  append_param_list(params, pp);
   return params;
 }
 
@@ -89,12 +95,36 @@ public:
 
 
   /* async requests */
-  int put_obj_send_init(rgw_obj& obj, RGWRESTStreamS3PutObj **req);
+  int put_obj_send_init(rgw_obj& obj, const rgw_http_param_pair *extra_params, RGWRESTStreamS3PutObj **req);
   int put_obj_async(const rgw_user& uid, rgw_obj& obj, uint64_t obj_size,
                     map<string, bufferlist>& attrs, bool send, RGWRESTStreamS3PutObj **req);
   int complete_request(RGWRESTStreamS3PutObj *req, string& etag, ceph::real_time *mtime);
 
-  int get_obj(const rgw_user& uid, req_info *info /* optional */, rgw_obj& obj,
+  /* Argument bundle for get_obj(obj, params, send, req). */
+  struct get_obj_params {
+    rgw_user uid;                               /* handed to populate_params() */
+    req_info *info = nullptr;                   /* optional; source of forwarded HTTP_X_AMZ_* headers */
+    const ceph::real_time *mod_ptr = nullptr;   /* feeds HTTP_IF_MODIFIED_SINCE */
+    const ceph::real_time *unmod_ptr = nullptr; /* feeds HTTP_IF_UNMODIFIED_SINCE */
+
+    /* each is sent as a header only when non-zero */
+    uint32_t mod_zone_id = 0;                   /* HTTP_DEST_ZONE_SHORT_ID */
+    uint64_t mod_pg_ver = 0;                    /* HTTP_DEST_PG_VER */
+
+    bool prepend_metadata = false;              /* adds the "prepend-metadata" system param */
+    bool get_op = false;                        /* true: read request, false: head request */
+    bool rgwx_stat = false;                     /* adds the "stat" system param */
+    bool sync_manifest = false;                 /* adds the "sync-manifest" system param */
+
+    bool skip_decrypt = true;                   /* adds the "skip-decrypt" system param */
+    RGWGetDataCB *cb = nullptr;                 /* passed to the request constructor */
+
+    /* when range_is_set, a RANGE header "bytes=<range_start>-<range_end>" is added */
+    bool range_is_set = false;
+    uint64_t range_start = 0;
+    uint64_t range_end = 0;
+  };
+
+  int get_obj(const rgw_obj& obj, const get_obj_params& params, bool send, RGWRESTStreamRWRequest **req);
+
+  int get_obj(const rgw_user& uid, req_info *info /* optional */, const rgw_obj& obj,
               const ceph::real_time *mod_ptr, const ceph::real_time *unmod_ptr,
               uint32_t mod_zone_id, uint64_t mod_pg_ver,
               bool prepend_metadata, bool get_op, bool rgwx_stat, bool sync_manifest,
index 113eb72e7aa44e8fd101174d9abbda61d7dc527e..571947d99f554bfc5827580c3148efd6f8084dfe 100644 (file)
@@ -30,6 +30,11 @@ static string aws_object_name(const RGWBucketInfo& bucket_info, const rgw_obj_ke
   return object_name;
 }
 
+/* Build the "<bucket name>/<object key name>" path for an object. */
+static string obj_to_aws_path(const rgw_obj& obj)
+{
+  string path = obj.bucket.name;
+  path += "/";
+  path += obj.key.name;
+  return path;
+}
+
 struct AWSConfig {
   string id;
   std::unique_ptr<RGWRESTConn> conn;
@@ -40,6 +45,7 @@ class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
   RGWDataSyncEnv *sync_env;
   RGWRESTConn *conn;
   rgw_obj src_obj;
+  RGWRESTConn::get_obj_params req_params;
 public:
   RGWRESTStreamGetCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
@@ -52,12 +58,19 @@ public:
 
   int init() override {
     /* init input connection */
+
+
+    req_params.get_op = true;
+    req_params.prepend_metadata = true;
+
+    if (range.is_set) {
+      req_params.range_is_set = true;
+      req_params.range_start = range.ofs;
+      req_params.range_end = range.ofs + range.size - 1;
+    }
+
     RGWRESTStreamRWRequest *in_req;
-    int ret = conn->get_obj(rgw_user(),  nullptr, src_obj,
-                            nullptr /* mod_ptr */, nullptr /* unmod_ptr */, 0 /* mod_zone_id */, 0 /* mod_pg_ver */,
-                            true /* prepend_metadata */, true /* get_op */, false /*rgwx_stat */,
-                            false /* sync_manifest */, true /* skip_descrypt */, false /* send */,
-                            nullptr /* cb */, &in_req);
+    int ret = conn->get_obj(src_obj, req_params, false /* send */, &in_req);
     if (ret < 0) {
       ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
       return ret;
@@ -108,7 +121,16 @@ public:
     /* init output connection */
     RGWRESTStreamS3PutObj *out_req{nullptr};
 
-    conn->put_obj_send_init(dest_obj, &out_req);
+    if (multipart.is_multipart) {
+      char buf[32];
+      snprintf(buf, sizeof(buf), "%d", multipart.part_num);
+      rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() },
+                                       { "partNumber", buf },
+                                       { nullptr, nullptr } };
+      conn->put_obj_send_init(dest_obj, params, &out_req);
+    } else {
+      conn->put_obj_send_init(dest_obj, nullptr, &out_req);
+    }
 
     set_req(out_req);
 
@@ -179,7 +201,6 @@ public:
   }
 };
 
-#if 0
 class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   RGWRESTConn *source_conn;
@@ -187,6 +208,7 @@ class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
   rgw_obj src_obj;
   rgw_obj dest_obj;
 
+  string upload_id;
   uint64_t ofs;
   uint64_t size;
   int part_num;
@@ -200,6 +222,7 @@ public:
                                 const rgw_obj& _src_obj,
                                 RGWRESTConn *_dest_conn,
                                 const rgw_obj& _dest_obj,
+                                const string& _upload_id,
                                 uint64_t _ofs,
                                 uint64_t _size,
                                 int _part_num) : RGWCoroutine(_sync_env->cct),
@@ -208,6 +231,7 @@ public:
                                                    dest_conn(_dest_conn),
                                                    src_obj(_src_obj),
                                                    dest_obj(_dest_obj),
+                                                   upload_id(_upload_id),
                                                    ofs(_ofs), size(_size),
                                                    part_num(_part_num) {}
 
@@ -222,7 +246,7 @@ public:
       out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conn,
                                            dest_obj));
 
-      out_crf->set_multipart(part_num, size);
+      out_crf->set_multipart(upload_id, part_num, size);
 
       yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
       if (retcode < 0) {
@@ -235,7 +259,45 @@ public:
     return 0;
   }
 };
-#endif
+
+/*
+ * Coroutine that aborts a multipart upload on the destination: it runs
+ * RGWDeleteRESTResourceCR on obj_to_aws_path(dest_obj) with the "uploadId"
+ * parameter set to upload_id. Completes with set_cr_error(retcode) when the
+ * call reports a negative retcode, and with set_cr_done() otherwise.
+ */
+class RGWAWSAbortMultipartCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRESTConn *dest_conn;
+  rgw_obj dest_obj;
+
+  string upload_id;
+
+public:
+  RGWAWSAbortMultipartCR(RGWDataSyncEnv *_sync_env,
+                        RGWRESTConn *_dest_conn,
+                        const rgw_obj& _dest_obj,
+                        const string& _upload_id) : RGWCoroutine(_sync_env->cct),
+                                                   sync_env(_sync_env),
+                                                   dest_conn(_dest_conn),
+                                                   dest_obj(_dest_obj),
+                                                   upload_id(_upload_id) {}
+
+  int operate() override {
+    reenter(this) {
+
+      yield {
+        /* null-terminated parameter list: only the upload id is sent */
+        rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
+        call(new RGWDeleteRESTResourceCR(sync_env->cct, dest_conn, sync_env->http_manager,
+                                         obj_to_aws_path(dest_obj), params));
+      }
+
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl;
+        return set_cr_error(retcode);
+      }
+
+      return set_cr_done();
+    }
+
+    return 0;
+  }
+};
 
 class RGWAWSInitMultipartCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
@@ -279,11 +341,10 @@ public:
     reenter(this) {
 
       yield {
-        string path = dest_obj.bucket.name + "/" + dest_obj.key.name;
         rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
         bufferlist bl;
         call(new RGWPostRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn, sync_env->http_manager,
-                                                 path, params, bl, &out_bl));
+                                                 obj_to_aws_path(dest_obj), params, bl, &out_bl));
       }
 
       if (retcode < 0) {
@@ -291,7 +352,11 @@ public:
         return set_cr_error(retcode);
       }
       {
-#warning need to cancel upload in case of error here
+        /*
+         * If any of the following steps fails we cannot abort the upload,
+         * because we cannot extract the upload id. If one of them does fail,
+         * being unable to abort is very likely the least of our problems.
+         */
         RGWXMLDecoder::XMLParser parser;
         if (!parser.init()) {
           ldout(sync_env->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;