]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: cloud sync via multipart upload, send attrs on init
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 11 Apr 2018 03:05:33 +0000 (20:05 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Wed, 25 Apr 2018 16:03:53 +0000 (09:03 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_cr_rest.cc
src/rgw/rgw_cr_rest.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest_conn.h
src/rgw/rgw_sync_module.cc
src/rgw/rgw_sync_module.h
src/rgw/rgw_sync_module_aws.cc

index 6d21be1dee69553d3ed11101844f251879e9ad85..dbd863a5846d1871d80618f174fed5409e688c10 100644 (file)
@@ -616,6 +616,7 @@ int RGWAsyncStatRemoteObj::_send_request()
                        nullptr, /* const char *if_match, */
                        nullptr, /* const char *if_nomatch, */
                        pattrs,
+                       pheaders,
                        nullptr,
                        nullptr, /* string *ptag, */
                        petag); /* string *petag, */
index 80b96a3056a6c9349b09e7fddaf55e8670a07e62..82505f3b69dd3a80ffe8218e9f3efe043d4775d8 100644 (file)
@@ -832,6 +832,7 @@ class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
   uint64_t *psize;
   string *petag;
   map<string, bufferlist> *pattrs;
+  map<string, string> *pheaders;
 
 protected:
   int _send_request() override;
@@ -843,14 +844,16 @@ public:
                          ceph::real_time *_pmtime,
                          uint64_t *_psize,
                          string *_petag,
-                         map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
+                         map<string, bufferlist> *_pattrs,
+                         map<string, string> *_pheaders) : RGWAsyncRadosRequest(caller, cn), store(_store),
                                                       source_zone(_source_zone),
                                                       bucket_info(_bucket_info),
                                                       key(_key),
                                                       pmtime(_pmtime),
                                                       psize(_psize),
                                                       petag(_petag),
-                                                      pattrs(_pattrs) {}
+                                                      pattrs(_pattrs),
+                                                      pheaders(_pheaders) {}
 };
 
 class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
@@ -867,6 +870,7 @@ class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
   uint64_t *psize;
   string *petag;
   map<string, bufferlist> *pattrs;
+  map<string, string> *pheaders;
 
   RGWAsyncStatRemoteObj *req;
 
@@ -878,7 +882,8 @@ public:
                       ceph::real_time *_pmtime,
                       uint64_t *_psize,
                       string *_petag,
-                      map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+                      map<string, bufferlist> *_pattrs,
+                      map<string, string> *_pheaders) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
                                        async_rados(_async_rados), store(_store),
                                        source_zone(_source_zone),
                                        bucket_info(_bucket_info),
@@ -887,6 +892,7 @@ public:
                                        psize(_psize),
                                        petag(_petag),
                                        pattrs(_pattrs),
+                                       pheaders(_pheaders),
                                        req(NULL) {}
 
 
@@ -903,7 +909,7 @@ public:
 
   int send_request() override {
     req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
-                                    bucket_info, key, pmtime, psize, petag, pattrs);
+                                    bucket_info, key, pmtime, psize, petag, pattrs, pheaders);
     async_rados->queue(req);
     return 0;
   }
index d00e5455945792c2c637062c4d57e8a54a53f97a..b9a1f5e0266ce063a44d91f3f20d5cc4c521a479 100644 (file)
@@ -113,7 +113,7 @@ void RGWStreamReadHTTPResourceCRF::get_attrs(std::map<string, string> *attrs)
   req->get_out_headers(attrs);
 }
 
-int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) {
+int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) {
   /* basic generic implementation */
   for (auto header : headers) {
     const string& val = header.second;
@@ -142,7 +142,7 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool
         extra_data.claim_append(in_cb->get_extra_data());
         map<string, string> attrs;
         req->get_out_headers(&attrs);
-        int ret = decode_rest_obj(attrs, extra_data, &rest_obj);
+        int ret = decode_rest_obj(attrs, extra_data);
         if (ret < 0) {
           ldout(cct, 0) << "ERROR: " << __func__ << " decode_rest_obj() returned ret=" << ret << dendl;
           return ret;
index 87fee91ed5794ba4fa315a27cf51111ed94c701f..43448836af4bd9f31a413006bc276c72343a565c 100644 (file)
@@ -118,6 +118,7 @@ class RGWSendRawRESTResourceCR: public RGWSimpleCoroutine {
   string method;
   string path;
   param_vec_t params;
+  param_vec_t headers;
   T *result;
   bufferlist input_bl;
   bool send_content_length=false;
@@ -127,18 +128,21 @@ class RGWSendRawRESTResourceCR: public RGWSimpleCoroutine {
  RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
                           RGWHTTPManager *_http_manager,
                           const string& _method, const string& _path,
-                          rgw_http_param_pair *_params, bufferlist& _input, T *_result, bool _send_content_length)
+                          rgw_http_param_pair *_params,
+                          map<string, string> *_headers,
+                          bufferlist& _input, T *_result, bool _send_content_length)
    : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
-    method(_method), path(_path), params(make_param_list(_params)), result(_result),
+    method(_method), path(_path), params(make_param_list(_params)), headers(make_param_list(_headers)), result(_result),
     input_bl(_input), send_content_length(_send_content_length)
     {}
 
  RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
                           RGWHTTPManager *_http_manager,
                           const string& _method, const string& _path,
-                          rgw_http_param_pair *_params, T *_result)
+                          rgw_http_param_pair *_params, map<string, string> *_headers,
+                          T *_result)
    : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
-    method(_method), path(_path), params(make_param_list(_params)), result(_result)
+    method(_method), path(_path), params(make_param_list(_params)), headers(make_param_list(_headers)), result(_result)
     {}
 
 
@@ -149,7 +153,7 @@ class RGWSendRawRESTResourceCR: public RGWSimpleCoroutine {
 
   int send_request() override {
     auto op = boost::intrusive_ptr<RGWRESTSendResource>(
-        new RGWRESTSendResource(conn, method, path, params, nullptr, http_manager));
+        new RGWRESTSendResource(conn, method, path, params, &headers, http_manager));
 
     init_new_io(op.get());
 
@@ -198,8 +202,9 @@ class RGWSendRESTResourceCR : public RGWSendRawRESTResourceCR<T> {
   RGWSendRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
                            RGWHTTPManager *_http_manager,
                            const string& _method, const string& _path,
-                        rgw_http_param_pair *_params,S& _input, T *_result)
-    : RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, _method, _path, _params, _result){
+                        rgw_http_param_pair *_params, map<string, string> *_headers,
+                        S& _input, T *_result)
+    : RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, _method, _path, _params, _headers, _result) {
 
     JSONFormatter jf;
     encode_json("data", _input, &jf);
@@ -220,7 +225,7 @@ public:
                         rgw_http_param_pair *_params, S& _input, T *_result)
     : RGWSendRESTResourceCR<S, T>(_cct, _conn, _http_manager,
                             "POST", _path,
-                            _params, _input, _result) {}
+                            _params, nullptr, _input, _result) {}
 };
 
 template <class T>
@@ -230,7 +235,7 @@ class RGWPutRawRESTResourceCR: public RGWSendRawRESTResourceCR <T> {
                           RGWHTTPManager *_http_manager,
                           const string& _path,
                           rgw_http_param_pair *_params, bufferlist& _input, T *_result)
-    : RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, "PUT", _path, _params, _input, _result, true){}
+    : RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, "PUT", _path, _params, nullptr, _input, _result, true){}
 
 };
 
@@ -240,8 +245,10 @@ class RGWPostRawRESTResourceCR: public RGWSendRawRESTResourceCR <T> {
   RGWPostRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
                           RGWHTTPManager *_http_manager,
                           const string& _path,
-                          rgw_http_param_pair *_params, bufferlist& _input, T *_result)
-    : RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, "POST", _path, _params, _input, _result, true){}
+                          rgw_http_param_pair *_params,
+                          map<string, string> * _headers,
+                          bufferlist& _input, T *_result)
+    : RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, "POST", _path, _params, _headers, _input, _result, true){}
 
 };
 
@@ -255,7 +262,7 @@ public:
                         rgw_http_param_pair *_params, S& _input, T *_result)
     : RGWSendRESTResourceCR<S, T>(_cct, _conn, _http_manager,
                                   "PUT", _path,
-                                  _params, _input, _result) {}
+                                  _params, nullptr, _input, _result) {}
 };
 
 class RGWDeleteRESTResourceCR : public RGWSimpleCoroutine {
@@ -361,7 +368,7 @@ protected:
 public:
   virtual int init() = 0;
   virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */
-  virtual int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) = 0;
+  virtual int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) = 0;
   virtual bool has_attrs() = 0;
   virtual void get_attrs(std::map<string, string> *attrs) = 0;
   virtual ~RGWStreamReadResourceCRF() = default;
@@ -425,7 +432,7 @@ public:
 
   int init() override;
   int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
-  int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) override;
+  int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) override;
   bool has_attrs() override;
   void get_attrs(std::map<string, string> *attrs);
   bool is_done();
index 425306736a0987c9b5898eeb0587b2530b5a6575..9eb866a98698908c683da9284900e3b4c1a6d7b9 100644 (file)
@@ -7814,6 +7814,7 @@ int RGWRados::stat_remote_obj(RGWObjectCtx& obj_ctx,
                const char *if_match,
                const char *if_nomatch,
                map<string, bufferlist> *pattrs,
+               map<string, string> *pheaders,
                string *version_id,
                string *ptag,
                string *petag)
@@ -7906,7 +7907,11 @@ int RGWRados::stat_remote_obj(RGWObjectCtx& obj_ctx,
   }
 
   if (pattrs) {
-    *pattrs = src_attrs;
+    *pattrs = std::move(src_attrs);
+  }
+
+  if (pheaders) {
+    *pheaders = std::move(req_headers);
   }
 
   return 0;
index 82b8cabbf3de63d5577215217ef1c1acd2d726e0..fc6ce437e86b8090883eb467feb0e1dca025b9c9 100644 (file)
@@ -3161,6 +3161,7 @@ public:
                const char *if_match,
                const char *if_nomatch,
                map<string, bufferlist> *pattrs,
+               map<string, string> *pheaders,
                string *version_id,
                string *ptag,
                string *petag);
index 416a63cd0f4cb6aa0496337b161d78a71c33a8d7..00a24d2d4d3584e28fd70efccb1755120cbdc1f1 100644 (file)
@@ -55,6 +55,18 @@ inline param_vec_t make_param_list(const rgw_http_param_pair* pp)
   return params;
 }
 
+inline param_vec_t make_param_list(const map<string, string> *pp)
+{
+  param_vec_t params;
+  if (!pp) {
+    return params;
+  }
+  for (auto iter : *pp) {
+    params.emplace_back(make_pair(iter.first, iter.second));
+  }
+  return params;
+}
+
 class RGWRESTConn
 {
   CephContext *cct;
index fb97a6acf391ef5baadb6896c4f910f0b18573c5..196f98541e94a3e4d5d96db955a87c0ec169064a 100644 (file)
@@ -29,7 +29,7 @@ int RGWCallStatRemoteObjCR::operate() {
     yield {
       call(new RGWStatRemoteObjCR(sync_env->async_rados, sync_env->store,
                                   sync_env->source_zone,
-                                  bucket_info, key, &mtime, &size, &etag, &attrs));
+                                  bucket_info, key, &mtime, &size, &etag, &attrs, &headers));
     }
     if (retcode < 0) {
       ldout(sync_env->cct, 0) << "RGWStatRemoteObjCR() returned " << retcode << dendl;
@@ -37,11 +37,11 @@ int RGWCallStatRemoteObjCR::operate() {
     }
     ldout(sync_env->cct, 20) << "stat of remote obj: z=" << sync_env->source_zone
       << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
-      << " attrs=" << attrs << dendl;
+      << " attrs=" << attrs << " headers=" << headers << dendl;
     yield {
       RGWStatRemoteObjCBCR *cb = allocate_callback();
       if (cb) {
-        cb->set_result(mtime, size, etag, std::move(attrs));
+        cb->set_result(mtime, size, etag, std::move(attrs), std::move(headers));
         call(cb);
       }
     }
index 8b06767c0512cfc016429a937995d4d75dce0ff9..dfc26214041f5d57a14164401c07659758b929cb 100644 (file)
@@ -126,6 +126,7 @@ protected:
   uint64_t size = 0;
   string etag;
   map<string, bufferlist> attrs;
+  map<string, string> headers;
 public:
   RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
                        RGWBucketInfo& _bucket_info, rgw_obj_key& _key);
@@ -134,7 +135,8 @@ public:
   void set_result(ceph::real_time& _mtime,
                   uint64_t _size,
                   const string& _etag,
-                  map<string, bufferlist>&& _attrs) {
+                  map<string, bufferlist>&& _attrs,
+                  map<string, string>&& _headers) {
     mtime = _mtime;
     size = _size;
     etag = _etag;
@@ -147,6 +149,7 @@ class RGWCallStatRemoteObjCR : public RGWCoroutine {
   uint64_t size{0};
   string etag;
   map<string, bufferlist> attrs;
+  map<string, string> headers;
 
 protected:
   RGWDataSyncEnv *sync_env;
index 346504b9a48bb57c5f89534aba63369677805810..56731489238e5012a6638c6c8f18cc703b05daa1 100644 (file)
@@ -676,6 +676,35 @@ struct AWSSyncInstanceEnv {
   }
 };
 
+int do_decode_rest_obj(CephContext *cct, map<string, bufferlist>& attrs, map<string, string>& headers, rgw_rest_obj *info)
+{
+  for (auto header : headers) {
+    const string& val = header.second;
+    if (header.first == "RGWX_OBJECT_SIZE") {
+      info->content_len = atoi(val.c_str());
+    } else {
+      info->attrs[header.first] = val;
+    }
+  }
+
+  info->acls.set_ctx(cct);
+  auto aiter = attrs.find(RGW_ATTR_ACL);
+  if (aiter != attrs.end()) {
+    bufferlist& bl = aiter->second;
+    bufferlist::iterator bliter = bl.begin();
+    try {
+      info->acls.decode(bliter);
+    } catch (buffer::error& err) {
+      ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl;
+      return -EIO;
+    }
+  } else {
+    ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl;
+  }
+
+  return 0;
+}
+
 class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
 {
   RGWDataSyncEnv *sync_env;
@@ -727,15 +756,8 @@ public:
     return RGWStreamReadHTTPResourceCRF::init();
   }
 
-  int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) override {
-    for (auto header : headers) {
-      const string& val = header.second;
-      if (header.first == "RGWX_OBJECT_SIZE") {
-        rest_obj.content_len = atoi(val.c_str());
-      } else {
-        rest_obj.attrs[header.first] = val;
-      }
-    }
+  int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) override {
+    map<string, bufferlist> src_attrs;
 
     ldout(sync_env->cct, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;
 
@@ -746,28 +768,9 @@ public:
         return -EIO;
       }
 
-      map<string, bufferlist> src_attrs;
-
       JSONDecoder::decode_json("attrs", src_attrs, &jp);
-
-      info->acls.set_ctx(sync_env->cct);
-      auto aiter = src_attrs.find(RGW_ATTR_ACL);
-      if (aiter != src_attrs.end()) {
-        bufferlist& bl = aiter->second;
-        bufferlist::iterator bliter = bl.begin();
-        try {
-          info->acls.decode(bliter);
-        } catch (buffer::error& err) {
-          ldout(sync_env->cct, 0) << "ERROR: failed to decode policy off extra data" << dendl;
-          return -EIO;
-        }
-      } else {
-        ldout(sync_env->cct, 0) << "WARNING: acl attrs not provided in extra data" << dendl;
-      }
     }
-
-    return 0;
-
+    return do_decode_rest_obj(sync_env->cct, src_attrs, headers, &rest_obj);
   }
 
   bool need_extra_data() override {
@@ -813,10 +816,14 @@ public:
     return RGWStreamWriteHTTPResourceCRF::init();
   }
 
-  void send_ready(const rgw_rest_obj& rest_obj) override {
-    RGWRESTStreamS3PutObj *r = (RGWRESTStreamS3PutObj *)req;
+  static void init_send_attrs(CephContext *cct,
+                              const rgw_rest_obj& rest_obj,
+                              const rgw_sync_aws_src_obj_properties& src_properties,
+                              const AWSSyncConfig_Profile *target,
+                              map<string, string> *attrs) {
+    auto& new_attrs = *attrs;
 
-    map<string, string> new_attrs = rest_obj.attrs;
+    new_attrs = rest_obj.attrs;
 
     auto acl = rest_obj.acls.get_acl();
 
@@ -833,7 +840,7 @@ public:
 
         auto iter = am.find(orig_grantee);
         if (iter == am.end()) {
-          ldout(sync_env->cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
+          ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
           continue;
         }
 
@@ -903,7 +910,7 @@ public:
         s.append(viter);
       }
 
-      ldout(sync_env->cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
+      ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
 
       new_attrs[header_str] = s;
     }
@@ -923,6 +930,17 @@ public:
     if (!rest_obj.key.instance.empty()) {
       new_attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
     }
+  }
+
+  void send_ready(const rgw_rest_obj& rest_obj) override {
+    RGWRESTStreamS3PutObj *r = (RGWRESTStreamS3PutObj *)req;
+
+    map<string, string> new_attrs;
+    if (!multipart.is_multipart) {
+      init_send_attrs(sync_env->cct, rest_obj, src_properties, target.get(), &new_attrs);
+    } else {
+      new_attrs = rest_obj.attrs;
+    }
 
     r->set_send_length(rest_obj.content_len);
 
@@ -1113,6 +1131,7 @@ class RGWAWSInitMultipartCR : public RGWCoroutine {
   rgw_obj dest_obj;
 
   uint64_t obj_size;
+  map<string, string> obj_headers;
 
   bufferlist out_bl;
 
@@ -1135,11 +1154,13 @@ public:
                         RGWRESTConn *_dest_conn,
                         const rgw_obj& _dest_obj,
                         uint64_t _obj_size,
+                        const map<string, string>& _obj_headers,
                         string *_upload_id) : RGWCoroutine(_sync_env->cct),
                                                    sync_env(_sync_env),
                                                    dest_conn(_dest_conn),
                                                    dest_obj(_dest_obj),
                                                    obj_size(_obj_size),
+                                                   obj_headers(_obj_headers),
                                                    upload_id(_upload_id) {}
 
   int operate() {
@@ -1149,7 +1170,7 @@ public:
         rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
         bufferlist bl;
         call(new RGWPostRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn, sync_env->http_manager,
-                                                 obj_to_aws_path(dest_obj), params, bl, &out_bl));
+                                                 obj_to_aws_path(dest_obj), params, &obj_headers, bl, &out_bl));
       }
 
       if (retcode < 0) {
@@ -1260,7 +1281,7 @@ public:
         bl.append(ss.str());
 
         call(new RGWPostRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn, sync_env->http_manager,
-                                                 obj_to_aws_path(dest_obj), params, bl, &out_bl));
+                                                 obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl));
       }
 
       if (retcode < 0) {
@@ -1354,9 +1375,12 @@ class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
   uint64_t obj_size;
   string src_etag;
   rgw_sync_aws_src_obj_properties src_properties;
+  rgw_rest_obj rest_obj;
 
   rgw_sync_aws_multipart_upload_info status;
 
+  map<string, string> obj_headers;
+
   rgw_sync_aws_multipart_part_info *pcur_part_info{nullptr};
 
   int ret_err{0};
@@ -1371,7 +1395,8 @@ public:
                                 std::shared_ptr<AWSSyncConfig_Profile>& _target,
                                 const rgw_obj& _dest_obj,
                                 uint64_t _obj_size,
-                                const rgw_sync_aws_src_obj_properties& _src_properties) : RGWCoroutine(_sync_env->cct),
+                                const rgw_sync_aws_src_obj_properties& _src_properties,
+                                const rgw_rest_obj& _rest_obj) : RGWCoroutine(_sync_env->cct),
                                                    sync_env(_sync_env),
                                                    conf(_conf),
                                                    source_conn(_source_conn),
@@ -1380,6 +1405,7 @@ public:
                                                    dest_obj(_dest_obj),
                                                    obj_size(_obj_size),
                                                    src_properties(_src_properties),
+                                                   rest_obj(_rest_obj),
                                                    status_obj(sync_env->store->get_zone_params().log_pool,
                                                               RGWBucketSyncStatusManager::obj_status_oid(sync_env->source_zone, src_obj)) {
   }
@@ -1406,7 +1432,9 @@ public:
       }
 
       if (retcode == -ENOENT) {
-        yield call(new RGWAWSInitMultipartCR(sync_env, target->conn.get(), dest_obj, status.obj_size, &status.upload_id));
+        RGWAWSStreamPutCRF::init_send_attrs(sync_env->cct, rest_obj, src_properties, target.get(), &obj_headers);
+
+        yield call(new RGWAWSInitMultipartCR(sync_env, target->conn.get(), dest_obj, status.obj_size, std::move(obj_headers), &status.upload_id));
         if (retcode < 0) {
           return set_cr_error(retcode);
         }
@@ -1626,8 +1654,13 @@ public:
                                                  target,
                                                  dest_obj));
         } else {
+          rgw_rest_obj rest_obj;
+          if (do_decode_rest_obj(sync_env->cct, attrs, headers, &rest_obj)) {
+            ldout(sync_env->cct, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl;
+            return set_cr_error(-EINVAL);
+          }
           call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, instance.conf, source_conn, src_obj,
-                                                     target, dest_obj, size, src_properties));
+                                                     target, dest_obj, size, src_properties, rest_obj));
         }
       }
       if (retcode < 0) {