rgw/CloudTransition: Tier objects to remote cloud
author     Soumya Koduri <skoduri@redhat.com>
           Wed, 23 Dec 2020 05:44:53 +0000 (11:14 +0530)
committer  Soumya Koduri <skoduri@redhat.com>
           Thu, 18 Nov 2021 07:22:47 +0000 (12:52 +0530)
If the configured storage class is of cloud tier type, transition
the objects to the configured remote endpoint.

If the object size exceeds the multipart size limit (say 5M),
upload the object in multiple parts.

As part of the transition, map RGW attributes to HTTP attributes,
including ACLs.

A new attribute (x-amz-meta-rgwx-source: rgw) is added to denote
that the object was transitioned from an RGW source.
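
For illustration, the attributes sent with the cloud PUT request look
roughly like the sketch below (header names mirror
RGWLCStreamPutCRF::init_send_attrs() in the new rgw_lc_tier.cc; the
values, object key and grantee are hypothetical):

  // Illustrative only -- sample of the headers attached to the cloud PUT,
  // mirroring init_send_attrs(); all values below are made up.
  #include <map>
  #include <string>

  const std::map<std::string, std::string> sample_cloud_put_headers = {
    { "x-amz-storage-class",             "STANDARD" },       // target storage class
    { "x-amz-meta-rgwx-source",          "rgw" },            // marks RGW as the source
    { "x-amz-meta-rgwx-versioned-epoch", "0" },
    { "x-amz-meta-rgwx-source-mtime",    "1608702293.000000000" },
    { "x-amz-meta-rgwx-source-etag",     "d41d8cd98f00b204e9800998ecf8427e" },
    { "x-amz-meta-rgwx-source-key",      "myobject" },
    { "x-amz-grant-full-control",        "id=remote-user" }, // derived from acl_mappings
  };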

Added two new tier-config options to configure multipart sizes -
* multipart_sync_threshold - the object size limit beyond which the
  object is transitioned using multipart upload
* multipart_min_part_size - the minimum size of each multipart upload part

The default value for both options is 32M and the minimum supported
value is 5M.
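
For illustration, here is a minimal standalone sketch (a hypothetical
helper, not part of this patch) of how these two options, together with
the 5M floor and the 10000-part cap used in rgw_lc_tier.cc, decide
between a plain PUT and a multipart upload and derive the part size:

  // Sketch only; mirrors the size logic in RGWLCCloudTierCR::operate() and
  // RGWLCStreamObjToCloudMultipartCR::operate().
  #include <algorithm>
  #include <cstdint>

  constexpr uint64_t MULTIPART_MAX_PARTS = 10000;                  // S3 part-count cap
  constexpr uint64_t MIN_POSSIBLE_PART_SIZE = 5ull * 1024 * 1024;  // 5M floor

  struct upload_plan {
    bool     use_multipart;
    uint64_t part_size;
    uint64_t num_parts;
  };

  upload_plan plan_upload(uint64_t obj_size,
                          uint64_t multipart_sync_threshold,   // default 32M
                          uint64_t multipart_min_part_size) {  // default 32M
    // The threshold is never allowed to drop below the 5M floor.
    multipart_sync_threshold =
        std::max(multipart_sync_threshold, MIN_POSSIBLE_PART_SIZE);

    if (obj_size < multipart_sync_threshold) {
      return {false, obj_size, 1};                      // plain PUT
    }

    // Keep the part count within the 10000-part limit.
    uint64_t size_based_min = obj_size / MULTIPART_MAX_PARTS;
    uint64_t part_size = std::max({multipart_min_part_size,
                                   MIN_POSSIBLE_PART_SIZE, size_based_min});
    uint64_t num_parts = (obj_size + part_size - 1) / part_size;
    return {true, part_size, num_parts};
  }

With the defaults, a 100M object is uploaded in four parts of at most
32M each, while a 20M object is copied with a single PUT.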

Signed-off-by: Soumya Koduri <skoduri@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_cr_rest.cc
src/rgw/rgw_cr_rest.h
src/rgw/rgw_json_enc.cc
src/rgw/rgw_lc.cc
src/rgw/rgw_lc_tier.cc [new file with mode: 0644]
src/rgw/rgw_lc_tier.h [new file with mode: 0644]
src/rgw/rgw_zone.cc
src/rgw/rgw_zone.h

index 5acbd17a2dd07e8369a99a9262c0efd89374685a..0f1ddd9f5bceec7b46a0cabddfe54df1eb6f47da 100644 (file)
@@ -72,6 +72,7 @@ set(librgw_common_srcs
   rgw_ldap.cc
   rgw_lc.cc
   rgw_lc_s3.cc
+  rgw_lc_tier.cc
   rgw_metadata.cc
   rgw_multi.cc
   rgw_multi_del.cc
index 0bd169f99e7d36b9c6e32c72f669d5046c40ce10..2a20db0acba080c2d2ad63ea172fdd64176af05b 100644 (file)
@@ -349,3 +349,105 @@ int RGWStreamSpliceCR::operate(const DoutPrefixProvider *dpp) {
   return 0;
 }
 
+RGWStreamReadCRF::RGWStreamReadCRF(RGWRados* rados, RGWBucketInfo& bucket_info,
+                  RGWObjectCtx& obj_ctx, rgw_obj& obj) :
+      op_target(rados, bucket_info, obj_ctx, obj), read_op(&op_target) {}
+RGWStreamReadCRF::~RGWStreamReadCRF() {}
+
+RGWStreamWriteCR::RGWStreamWriteCR(CephContext *_cct, RGWHTTPManager *_mgr,
+                           shared_ptr<RGWStreamReadCRF>& _in_crf,
+                           shared_ptr<RGWStreamWriteHTTPResourceCRF>& _out_crf) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
+                           in_crf(_in_crf), out_crf(_out_crf) {}
+RGWStreamWriteCR::~RGWStreamWriteCR() { }
+
+int RGWStreamWriteCR::operate() {
+  off_t ofs;
+  off_t end;
+  int ret;
+  uint64_t read_len = 0;
+  rgw_rest_obj rest_obj;
+
+  reenter(this) {
+    ret = in_crf->init();
+    if (ret < 0) {
+      ldout(cct, 0) << "ERROR: failed to initialize in_crf, ret = " << ret << dendl;
+      return set_cr_error(ret);
+    }
+    in_crf->get_range(ofs, end);
+    rest_obj = in_crf->get_rest_obj();
+
+    do {
+      bl.clear();
+      ret = in_crf->read(ofs, end, bl);
+      if (ret < 0) {
+        ldout(cct, 0) << "ERROR: failed to read object data, ret = " << ret << dendl;
+        return set_cr_error(ret);
+      }
+      read_len = ret;
+      if (retcode < 0) {
+        ldout(cct, 20) << __func__ << ": read_op.read() retcode=" << retcode << dendl;
+        return set_cr_error(ret);
+      }
+
+      if (bl.length() == 0) {
+        break;
+      } 
+
+      if (!sent_attrs) {
+        ret = out_crf->init();
+        if (ret < 0) {
+          ldout(cct, 0) << "ERROR: failed to initialize out_crf, ret = " << ret << dendl;
+          return set_cr_error(ret);
+        }
+              
+        out_crf->send_ready(rest_obj);
+        ret = out_crf->send();
+        if (ret < 0) {
+          return set_cr_error(ret);
+        }
+        sent_attrs = true;
+      }
+
+      total_read += bl.length();
+
+      do {
+        /* Can't yield here, as read_op doesn't work well with yield and
+         * results in a deadlock.
+         */
+        ret = out_crf->write(bl, &need_retry);
+        if (ret < 0)  {
+          return set_cr_error(ret);
+        }
+
+        if (retcode < 0) {
+          ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl;
+          return set_cr_error(ret);
+        }
+      } while (need_retry);
+
+      ofs += read_len;
+
+    } while (ofs <= end);
+
+    do {
+      /* Ensure out_crf is initialized */
+      if (!sent_attrs) {
+        break;
+      }
+
+      /* This has to be under yield. Otherwise sometimes this loop
+       * never finishes, infinitely waiting for req to be done.
+       */
+      yield {
+        int ret = out_crf->drain_writes(&need_retry);
+        if (ret < 0) {
+          return set_cr_error(ret);
+        }
+      }
+    } while (need_retry);
+
+    return set_cr_done();
+  }
+  return 0;
+}
index cb103aeb83455637379a83810cee87f498a3fd79..46d050c5cd763dc6b6af74778763460b25cae740 100644 (file)
@@ -588,3 +588,60 @@ public:
 
   int operate(const DoutPrefixProvider *dpp) override;
 };
+
+class RGWStreamReadCRF {
+public:
+  RGWRados::Object op_target;
+  RGWRados::Object::Read read_op;
+  off_t ofs;
+  off_t end;
+  rgw_rest_obj rest_obj;
+
+  RGWStreamReadCRF(RGWRados* rados, RGWBucketInfo& bucket_info,
+                  RGWObjectCtx& obj_ctx, rgw_obj& obj);
+  virtual ~RGWStreamReadCRF();
+
+  virtual int init() {return 0; }
+  virtual int init_rest_obj() {return 0;}
+
+  int set_range(off_t _ofs, off_t _end) {
+    ofs = _ofs;
+    end = _end;
+
+    return 0;
+  }
+
+  int get_range(off_t &_ofs, off_t &_end) {
+    _ofs = ofs;
+    _end = end;
+
+    return 0;
+  }
+
+  rgw_rest_obj get_rest_obj() {
+    return rest_obj;
+  }
+
+  virtual int read(off_t ofs, off_t end, bufferlist &bl) {return 0;};
+
+};
+
+class RGWStreamWriteCR : public RGWCoroutine {
+  CephContext *cct;
+  RGWHTTPManager *http_manager;
+  string url;
+  std::shared_ptr<RGWStreamReadCRF> in_crf;
+  std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+  bufferlist bl;
+  bool need_retry{false};
+  bool sent_attrs{false};
+  uint64_t total_read{0};
+  int ret{0};
+public:
+  RGWStreamWriteCR(CephContext *_cct, RGWHTTPManager *_mgr,
+                    std::shared_ptr<RGWStreamReadCRF>& _in_crf,
+                    std::shared_ptr<RGWStreamWriteHTTPResourceCRF>& _out_crf);
+  ~RGWStreamWriteCR();
+
+  int operate() override;
+};
index 9f8ab1626c26cfbe727f592b71f9844ddde02614..879f0f9164505951f3a7c621947447d2c3bca17a 100644 (file)
@@ -1449,6 +1449,8 @@ void RGWZoneGroupPlacementTier::dump(Formatter *f) const
   encode_json("tier_storage_class", tier_storage_class, f);
   encode_json("target_path", target_path, f);
   encode_json("acl_mappings", acl_mappings, f);
+  encode_json("multipart_sync_threshold", multipart_sync_threshold, f);
+  encode_json("multipart_min_part_size", multipart_min_part_size, f);
 }
 
 void RGWZoneGroupPlacementTier::decode_json(JSONObj *obj)
@@ -1468,6 +1470,8 @@ void RGWZoneGroupPlacementTier::decode_json(JSONObj *obj)
   JSONDecoder::decode_json("tier_storage_class", tier_storage_class, obj);
   JSONDecoder::decode_json("target_path", target_path, obj);
   JSONDecoder::decode_json("acl_mappings", acl_mappings, obj);
+  JSONDecoder::decode_json("multipart_sync_threshold", multipart_sync_threshold, obj);
+  JSONDecoder::decode_json("multipart_min_part_size", multipart_min_part_size, obj);
 }
 
 void RGWZoneGroupPlacementTarget::dump(Formatter *f) const
index bd6d997ed9e1d17d891fc1fbacab7a6c60502ed2..83f1ae4c65683fe9436a60e0d1ffb189045ab2c7 100644 (file)
@@ -28,6 +28,7 @@
 #include "rgw_multi.h"
 #include "rgw_sal.h"
 #include "rgw_rados.h"
+#include "rgw_lc_tier.h"
 
 // this seems safe to use, at least for now--arguably, we should
 // prefer header-only fmt, in general
@@ -541,6 +542,8 @@ struct lc_op_ctx {
   const DoutPrefixProvider *dpp;
   WorkQ* wq;
 
+  RGWZoneGroupPlacementTier tier = {};
+
   lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
            boost::optional<std::string> next_key_name,
            ceph::real_time effective_mtime,
@@ -1248,32 +1251,113 @@ public:
     return need_to_process;
   }
 
+ /* find out if the storage class is a remote cloud tier */
+ int get_tier_target(const RGWZoneGroup &zonegroup, rgw_placement_rule& rule,
+         string& storage_class, RGWZoneGroupPlacementTier &tier) {
+   std::map<std::string, RGWZoneGroupPlacementTarget>::const_iterator titer;
+   titer = zonegroup.placement_targets.find(rule.name);
+   if (titer == zonegroup.placement_targets.end()) {
+     return -1;
+   }
+
+   if (storage_class.empty()) {
+       storage_class = rule.storage_class;
+   }
+
+   const auto& target_rule = titer->second;
+   std::map<std::string, RGWZoneGroupPlacementTier>::const_iterator ttier;
+   ttier = target_rule.tier_targets.find(storage_class);
+   if (ttier != target_rule.tier_targets.end()) {
+       tier = ttier->second;
+   }
+
+   return 0;
+ }
+  int transition_obj_to_cloud(lc_op_ctx& oc) {
+    std::shared_ptr<RGWRESTConn> conn;
+
+    /* init */
+    string id = "cloudid";
+    string endpoint = oc.tier.endpoint;
+    RGWAccessKey key = oc.tier.key;
+    HostStyle host_style = oc.tier.host_style;
+    string bucket_name = oc.tier.target_path;
+   
+    if (bucket_name.empty()) {
+      bucket_name = "cloud-bucket";
+    }
+
+    conn.reset(new S3RESTConn(oc.cct, oc.store->svc()->zone,
+                                id, { endpoint }, key, host_style));
+
+    /* http_mngr */
+    RGWCoroutinesManager crs(oc.store->ctx(), oc.store->getRados()->get_cr_registry());
+    RGWHTTPManager http_manager(oc.store->ctx(), crs.get_completion_mgr());
+
+    int ret = http_manager.start();
+    if (ret < 0) {
+      ldpp_dout(oc.dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
+      return ret;
+    }
+
+    RGWLCCloudTierCtx tier_ctx(oc.cct, oc.o, oc.store, oc.bucket_info,
+                        oc.obj, oc.rctx, conn, bucket_name, oc.tier.tier_storage_class,
+                        &http_manager);
+    tier_ctx.acl_mappings = oc.tier.acl_mappings;
+    tier_ctx.multipart_min_part_size = oc.tier.multipart_min_part_size;
+    tier_ctx.multipart_sync_threshold = oc.tier.multipart_sync_threshold;
+
+    ret = crs.run(new RGWLCCloudTierCR(tier_ctx));
+    http_manager.stop();
+         
+    if (ret < 0) {
+      ldpp_dout(oc.dpp, 0) << "failed in RGWLCCloudTierCR() ret=" << ret << dendl;
+      return ret;
+    }
+
+    return 0;
+  }
+
   int process(lc_op_ctx& oc) {
     auto& o = oc.o;
+    int r;
+    std::string tier_type = ""; 
+    const RGWZoneGroup& zonegroup = oc.store->svc()->zone->get_zonegroup();
 
     rgw_placement_rule target_placement;
     target_placement.inherit_from(oc.bucket->get_placement_rule());
     target_placement.storage_class = transition.storage_class;
 
-    if (!oc.store->get_zone()->get_params().
-       valid_placement(target_placement)) {
-      ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: "
-                          << target_placement
-                           << " bucket="<< oc.bucket
-                           << " rule_id=" << oc.op.id
-                          << " " << oc.wq->thr_name() << dendl;
-      return -EINVAL;
-    }
+    r = get_tier_target(zonegroup, target_placement, target_placement.storage_class, oc.tier);
 
-    int r = oc.obj->transition(oc.rctx, oc.bucket, target_placement, o.meta.mtime,
-                              o.versioned_epoch, oc.dpp, null_yield);
-    if (r < 0) {
-      ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj " 
-                          << oc.bucket << ":" << o.key
-                          << " -> " << transition.storage_class 
-                          << " " << cpp_strerror(r)
-                          << " " << oc.wq->thr_name() << dendl;
-      return r;
+    if (!r && oc.tier.tier_type == "cloud") {
+      ldpp_dout(oc.dpp, 0) << "Found cloud tier: " << target_placement.storage_class << dendl;
+      r = transition_obj_to_cloud(oc);
+      if (r < 0) {
+        ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj to cloud (r=" << r << ")"
+                             << dendl;
+      }
+    } else {
+      if (!oc.store->get_zone()->get_params().
+           valid_placement(target_placement)) {
+        ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: "
+                            << target_placement
+                             << " bucket="<< oc.bucket
+                             << " rule_id=" << oc.op.id
+                            << " " << oc.wq->thr_name() << dendl;
+        return -EINVAL;
+      }
+
+      int r = oc.obj->transition(oc.rctx, oc.bucket, target_placement, o.meta.mtime,
+                                o.versioned_epoch, oc.dpp, null_yield);
+      if (r < 0) {
+        ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj " 
+                            << oc.bucket << ":" << o.key 
+                            << " -> " << transition.storage_class 
+                            << " " << cpp_strerror(r)
+                            << " " << oc.wq->thr_name() << dendl;
+        return r;
+      }
     }
     ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket
                         << ":" << o.key << " -> "
diff --git a/src/rgw/rgw_lc_tier.cc b/src/rgw/rgw_lc_tier.cc
new file mode 100644 (file)
index 0000000..e1582c4
--- /dev/null
@@ -0,0 +1,1046 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <string.h>
+#include <iostream>
+#include <map>
+
+#include "common/Formatter.h"
+#include <common/errno.h>
+#include "rgw_lc.h"
+#include "rgw_lc_tier.h"
+#include "rgw_string.h"
+#include "rgw_zone.h"
+#include "rgw_common.h"
+#include "rgw_rest.h"
+
+#include <boost/algorithm/string/split.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/predicate.hpp>
+
+#include <boost/asio/yield.hpp>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+using namespace std;
+
+static string get_key_oid(const rgw_obj_key& key)
+{
+  string oid = key.name;
+  if (!key.instance.empty() &&
+      !key.have_null_instance()) {
+    oid += string(":") + key.instance;
+  }
+  return oid;
+}
+
+static string obj_to_aws_path(const rgw_obj& obj)
+{
+  string path = obj.bucket.name + "/" + get_key_oid(obj.key);
+  return path;
+}
+
+static std::set<string> keep_headers = { "CONTENT_TYPE",
+  "CONTENT_ENCODING",
+  "CONTENT_DISPOSITION",
+  "CONTENT_LANGUAGE" };
+
+/*
+ * mapping between rgw object attrs and output http fields
+ *
+ static const struct rgw_http_attr base_rgw_to_http_attrs[] = {
+ { RGW_ATTR_CONTENT_LANG,      "Content-Language" },
+ { RGW_ATTR_EXPIRES,           "Expires" },
+ { RGW_ATTR_CACHE_CONTROL,     "Cache-Control" },
+ { RGW_ATTR_CONTENT_DISP,      "Content-Disposition" },
+ { RGW_ATTR_CONTENT_ENC,       "Content-Encoding" },
+ { RGW_ATTR_USER_MANIFEST,     "X-Object-Manifest" },
+ { RGW_ATTR_X_ROBOTS_TAG ,     "X-Robots-Tag" },
+ { RGW_ATTR_STORAGE_CLASS ,    "X-Amz-Storage-Class" },
+// RGW_ATTR_AMZ_WEBSITE_REDIRECT_LOCATION header depends on access mode:
+// S3 endpoint: x-amz-website-redirect-location
+// S3Website endpoint: Location
+{ RGW_ATTR_AMZ_WEBSITE_REDIRECT_LOCATION, "x-amz-website-redirect-location" },
+}; */
+
+static void init_headers(map<string, bufferlist>& attrs,
+                         map<string, string>& headers)
+{
+  for (auto kv : attrs) {
+    const char * name = kv.first.c_str();
+    const auto aiter = rgw_to_http_attrs.find(name);
+
+    if (aiter != std::end(rgw_to_http_attrs)) {
+      headers[aiter->second] = rgw_bl_str(kv.second);
+    } else if (strcmp(name, RGW_ATTR_SLO_UINDICATOR) == 0) {
+      // this attr has an extra length prefix from encode() in prior versions
+      headers["X-Object-Meta-Static-Large-Object"] = "True";
+    } else if (strncmp(name, RGW_ATTR_META_PREFIX,
+          sizeof(RGW_ATTR_META_PREFIX)-1) == 0) {
+      name += sizeof(RGW_ATTR_META_PREFIX) - 1;
+      string sname(name);
+      string name_prefix = "X-Object-Meta-";
+      char full_name_buf[name_prefix.size() + sname.size() + 1];
+      snprintf(full_name_buf, sizeof(full_name_buf), "%.*s%.*s",
+          static_cast<int>(name_prefix.length()),
+          name_prefix.data(),
+          static_cast<int>(sname.length()),
+          sname.data());
+      headers[full_name_buf] = rgw_bl_str(kv.second);
+    } else if (strcmp(name,RGW_ATTR_CONTENT_TYPE) == 0) {
+      /* Verify if this is the right way to copy this field */
+      headers["CONTENT_TYPE"] = rgw_bl_str(kv.second);
+    }
+  }
+}
+
+class RGWLCStreamReadCRF : public RGWStreamReadCRF
+{
+  CephContext *cct;
+  map<string, bufferlist> attrs;
+  uint64_t obj_size;
+  rgw_obj& obj;
+  const real_time &mtime;
+
+  bool multipart;
+  uint64_t m_part_size;
+  off_t m_part_off;
+  off_t m_part_end;
+
+  public:
+  RGWLCStreamReadCRF(CephContext *_cct, RGWRados* rados, RGWBucketInfo& bucket_info,
+      RGWObjectCtx& obj_ctx, rgw_obj& _obj, const real_time &_mtime) : RGWStreamReadCRF(rados, bucket_info, obj_ctx, _obj), cct(_cct), obj(_obj), mtime(_mtime) {}
+
+  ~RGWLCStreamReadCRF() {};
+
+  void set_multipart(uint64_t part_size, off_t part_off, off_t part_end) {
+    multipart = true;
+    m_part_size = part_size;
+    m_part_off = part_off;
+    m_part_end = part_end;
+  }
+
+  int init() override {
+    optional_yield y = null_yield;
+    real_time read_mtime;
+
+    read_op.params.attrs = &attrs;
+    read_op.params.lastmod = &read_mtime;
+    read_op.params.obj_size = &obj_size;
+
+    int ret = read_op.prepare(y);
+    if (ret < 0) {
+      ldout(cct, 0) << "ERROR: failed to prepare read_op, ret = " << ret << dendl;
+      return ret;
+    }
+
+    if (read_mtime != mtime) {
+      /* raced */
+      return -ECANCELED;
+    }
+
+    ret = init_rest_obj();
+    if (ret < 0) {
+      ldout(cct, 0) << "ERROR: failed to initialize rest_obj, ret = " << ret << dendl;
+      return ret;
+    }
+
+    if (!multipart) {
+      set_range(0, obj_size - 1);
+    } else {
+      set_range(m_part_off, m_part_end);
+    }
+    return 0;
+  }
+
+  int init_rest_obj() override {
+    /* Initialize rgw_rest_obj.
+     * Reference: do_decode_rest_obj().
+     * TODO: check how to copy the headers content. */
+    rest_obj.init(obj.key);
+
+    if (!multipart) {
+      rest_obj.content_len = obj_size;
+    } else {
+      rest_obj.content_len = m_part_size;
+    }
+
+    /* For multipart, the attrs are sent as part of InitMultipartCR itself */
+    if (multipart) {
+      return 0;
+    }
+    /*
+     * XXX: verify whether this is the right way to copy attrs into
+     * the rest obj
+     */
+    init_headers(attrs, rest_obj.attrs);
+
+    rest_obj.acls.set_ctx(cct);
+    auto aiter = attrs.find(RGW_ATTR_ACL);
+    if (aiter != attrs.end()) {
+      bufferlist& bl = aiter->second;
+      auto bliter = bl.cbegin();
+      try {
+        rest_obj.acls.decode(bliter);
+      } catch (buffer::error& err) {
+        ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl;
+        return -EIO;
+      }
+    } else {
+      ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl;
+    }
+    return 0;
+  }
+
+  int read(off_t ofs, off_t end, bufferlist &bl) {
+    optional_yield y = null_yield;
+
+    return read_op.read(ofs, end, bl, y);
+  }
+};
+
+
+class RGWLCStreamPutCRF : public RGWStreamWriteHTTPResourceCRF
+{
+  CephContext *cct;
+  RGWHTTPManager *http_manager;
+  rgw_lc_obj_properties obj_properties;
+  std::shared_ptr<RGWRESTConn> conn;
+  rgw::sal::RGWObject* dest_obj;
+  string etag;
+
+  public:
+  RGWLCStreamPutCRF(CephContext *_cct,
+      RGWCoroutinesEnv *_env,
+      RGWCoroutine *_caller,
+      RGWHTTPManager *_http_manager,
+      const rgw_lc_obj_properties&  _obj_properties,
+      std::shared_ptr<RGWRESTConn> _conn,
+      rgw::sal::RGWObject* _dest_obj) :
+    RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _http_manager),
+    cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) {
+    }
+
+
+  int init() override {
+    /* init output connection */
+    RGWRESTStreamS3PutObj *out_req{nullptr};
+
+    if (multipart.is_multipart) {
+      char buf[32];
+      snprintf(buf, sizeof(buf), "%d", multipart.part_num);
+      rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() },
+        { "partNumber", buf },
+        { nullptr, nullptr } };
+      conn->put_obj_send_init(dest_obj, params, &out_req);
+    } else {
+      conn->put_obj_send_init(dest_obj, nullptr, &out_req);
+    }
+
+    set_req(out_req);
+
+    return RGWStreamWriteHTTPResourceCRF::init();
+  }
+
+  static bool keep_attr(const string& h) {
+    return (keep_headers.find(h) != keep_headers.end() ||
+        boost::algorithm::starts_with(h, "X_AMZ_"));
+  }
+
+  static void init_send_attrs(CephContext *cct,
+      const rgw_rest_obj& rest_obj,
+      const rgw_lc_obj_properties& obj_properties,
+      map<string, string> *attrs) {
+
+    map<string, RGWTierACLMapping>& acl_mappings(obj_properties.target_acl_mappings);
+    string target_storage_class = obj_properties.target_storage_class;
+
+    auto& new_attrs = *attrs;
+
+    new_attrs.clear();
+
+    for (auto& hi : rest_obj.attrs) {
+      if (keep_attr(hi.first)) {
+        new_attrs.insert(hi);
+      }
+    }
+
+    auto acl = rest_obj.acls.get_acl();
+
+    map<int, vector<string> > access_map;
+
+    if (!acl_mappings.empty()) {
+      for (auto& grant : acl.get_grant_map()) {
+        auto& orig_grantee = grant.first;
+        auto& perm = grant.second;
+
+        string grantee;
+
+        const auto& am = acl_mappings;
+
+        auto iter = am.find(orig_grantee);
+        if (iter == am.end()) {
+          ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
+          continue;
+        }
+
+        grantee = iter->second.dest_id;
+
+        string type;
+
+        switch (iter->second.type) {
+          case ACL_TYPE_CANON_USER:
+            type = "id";
+            break;
+          case ACL_TYPE_EMAIL_USER:
+            type = "emailAddress";
+            break;
+          case ACL_TYPE_GROUP:
+            type = "uri";
+            break;
+          default:
+            continue;
+        }
+
+        string tv = type + "=" + grantee;
+
+        int flags = perm.get_permission().get_permissions();
+        if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) {
+          access_map[flags].push_back(tv);
+          continue;
+        }
+
+        for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) {
+          if (flags & i) {
+            access_map[i].push_back(tv);
+          }
+        }
+      }
+    }
+
+    for (auto aiter : access_map) {
+      int grant_type = aiter.first;
+
+      string header_str("x-amz-grant-");
+
+      switch (grant_type) {
+        case RGW_PERM_READ:
+          header_str.append("read");
+          break;
+        case RGW_PERM_WRITE:
+          header_str.append("write");
+          break;
+        case RGW_PERM_READ_ACP:
+          header_str.append("read-acp");
+          break;
+        case RGW_PERM_WRITE_ACP:
+          header_str.append("write-acp");
+          break;
+        case RGW_PERM_FULL_CONTROL:
+          header_str.append("full-control");
+          break;
+      }
+
+      string s;
+
+      for (auto viter : aiter.second) {
+        if (!s.empty()) {
+          s.append(", ");
+        }
+        s.append(viter);
+      }
+
+      ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
+
+      new_attrs[header_str] = s;
+    }
+
+    /* Copy target storage class */
+    if (!target_storage_class.empty()) {
+      new_attrs["x-amz-storage-class"] = target_storage_class;
+    } else {
+      new_attrs["x-amz-storage-class"] = "STANDARD";
+    }
+
+    /* New attribute to specify that it was transitioned from RGW */
+    new_attrs["x-amz-meta-rgwx-source"] = "rgw";
+
+    char buf[32];
+    snprintf(buf, sizeof(buf), "%llu", (long long)obj_properties.versioned_epoch);
+    new_attrs["x-amz-meta-rgwx-versioned-epoch"] = buf;
+
+    utime_t ut(obj_properties.mtime);
+    snprintf(buf, sizeof(buf), "%lld.%09lld",
+        (long long)ut.sec(),
+        (long long)ut.nsec());
+
+    new_attrs["x-amz-meta-rgwx-source-mtime"] = buf;
+    new_attrs["x-amz-meta-rgwx-source-etag"] = obj_properties.etag;
+    new_attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name;
+    if (!rest_obj.key.instance.empty()) {
+      new_attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
+    }
+  }
+
+  void send_ready(const rgw_rest_obj& rest_obj) override {
+    RGWRESTStreamS3PutObj *r = static_cast<RGWRESTStreamS3PutObj *>(req);
+
+    map<string, string> new_attrs;
+    if (!multipart.is_multipart) {
+      init_send_attrs(cct, rest_obj, obj_properties, &new_attrs);
+    }
+
+    r->set_send_length(rest_obj.content_len);
+
+    RGWAccessControlPolicy policy;
+
+    r->send_ready(conn->get_key(), new_attrs, policy, false);
+  }
+
+  void handle_headers(const map<string, string>& headers) {
+    for (auto h : headers) {
+      if (h.first == "ETAG") {
+        etag = h.second;
+      }
+    }
+  }
+
+  bool get_etag(string *petag) {
+    if (etag.empty()) {
+      return false;
+    }
+    *petag = etag;
+    return true;
+  }
+};
+
+
+
+class RGWLCStreamObjToCloudPlainCR : public RGWCoroutine {
+  RGWLCCloudTierCtx& tier_ctx;
+
+  std::shared_ptr<RGWStreamReadCRF> in_crf;
+  std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+
+  public:
+  RGWLCStreamObjToCloudPlainCR(RGWLCCloudTierCtx& _tier_ctx)
+    : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx) {}
+
+  int operate() override {
+    rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime,
+        tier_ctx.o.meta.etag,
+        tier_ctx.o.versioned_epoch,
+        tier_ctx.acl_mappings,
+        tier_ctx.target_storage_class);
+
+    rgw_bucket target_bucket;
+    string target_obj_name;
+
+    target_bucket.name = tier_ctx.target_bucket_name;
+    target_obj_name = tier_ctx.obj.key.name; // cross check with aws module
+
+    std::shared_ptr<rgw::sal::RGWRadosBucket> dest_bucket;
+    dest_bucket.reset(new rgw::sal::RGWRadosBucket(tier_ctx.store, target_bucket));
+
+    std::shared_ptr<rgw::sal::RGWRadosObject> dest_obj;
+    dest_obj.reset(new rgw::sal::RGWRadosObject(tier_ctx.store, rgw_obj_key(target_obj_name), (rgw::sal::RGWRadosBucket *)(dest_bucket.get())));
+
+
+    reenter(this) {
+      /* Prepare Read from source */
+      in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.store->getRados(), tier_ctx.bucket_info, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
+
+      out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this,
+            (RGWHTTPManager*)(tier_ctx.http_manager),
+            obj_properties, tier_ctx.conn, static_cast<rgw::sal::RGWObject *>(dest_obj.get())));
+
+      /* actual Read & Write */
+      yield call(new RGWStreamWriteCR(cct, (RGWHTTPManager*)(tier_ctx.http_manager), in_crf, out_crf));
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+
+      return set_cr_done(); 
+    }
+
+    return 0;
+  }
+};
+
+class RGWLCStreamObjToCloudMultipartPartCR : public RGWCoroutine {
+  RGWLCCloudTierCtx& tier_ctx;
+
+  string upload_id;
+
+  rgw_lc_multipart_part_info part_info;
+
+  string *petag;
+  std::shared_ptr<RGWStreamReadCRF> in_crf;
+  std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+
+  public:
+  RGWLCStreamObjToCloudMultipartPartCR(RGWLCCloudTierCtx& _tier_ctx,
+      const string& _upload_id,
+      const rgw_lc_multipart_part_info& _part_info,
+      string *_petag)
+    : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx),
+    upload_id(_upload_id),
+    part_info(_part_info),
+    petag(_petag) {}
+
+  int operate() override {
+    rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime,
+        tier_ctx.o.meta.etag,
+        tier_ctx.o.versioned_epoch,
+        tier_ctx.acl_mappings,
+        tier_ctx.target_storage_class);
+    rgw_bucket target_bucket;
+    string target_obj_name;
+    off_t end;
+
+    target_bucket.name = tier_ctx.target_bucket_name;
+    target_obj_name = tier_ctx.obj.key.name; // cross check with aws module
+
+    std::shared_ptr<rgw::sal::RGWRadosBucket> dest_bucket;
+    dest_bucket.reset(new rgw::sal::RGWRadosBucket(tier_ctx.store, target_bucket));
+
+    std::shared_ptr<rgw::sal::RGWRadosObject> dest_obj;
+    dest_obj.reset(new rgw::sal::RGWRadosObject(tier_ctx.store, rgw_obj_key(target_obj_name), (rgw::sal::RGWRadosBucket *)(dest_bucket.get())));
+
+    reenter(this) {
+      /* Prepare Read from source */
+      in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.store->getRados(), tier_ctx.bucket_info, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
+
+      end = part_info.ofs + part_info.size - 1;
+      std::static_pointer_cast<RGWLCStreamReadCRF>(in_crf)->set_multipart(part_info.size, part_info.ofs, end);
+
+      /* Prepare write */
+      out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this,
+            (RGWHTTPManager*)(tier_ctx.http_manager),
+            obj_properties, tier_ctx.conn, static_cast<rgw::sal::RGWObject *>(dest_obj.get())));
+
+      out_crf->set_multipart(upload_id, part_info.part_num, part_info.size);
+
+      /* actual Read & Write */
+      yield call(new RGWStreamWriteCR(cct, (RGWHTTPManager*)(tier_ctx.http_manager), in_crf, out_crf));
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+
+      if (!(static_cast<RGWLCStreamPutCRF *>(out_crf.get()))->get_etag(petag)) {
+        ldout(tier_ctx.cct, 0) << "ERROR: failed to get etag from PUT request" << dendl;
+        return set_cr_error(-EIO);
+      }
+
+      return set_cr_done(); 
+    }
+
+    return 0;
+  }
+};
+
+class RGWLCAbortMultipartCR : public RGWCoroutine {
+  CephContext *cct;
+  RGWHTTPManager *http_manager;
+  RGWRESTConn *dest_conn;
+  rgw_obj dest_obj;
+
+  string upload_id;
+
+  public:
+  RGWLCAbortMultipartCR(CephContext *_cct,
+      RGWHTTPManager *_http_manager,
+      RGWRESTConn *_dest_conn,
+      const rgw_obj& _dest_obj,
+      const string& _upload_id) : RGWCoroutine(_cct),
+  cct(_cct), http_manager(_http_manager),
+  dest_conn(_dest_conn),
+  dest_obj(_dest_obj),
+  upload_id(_upload_id) {}
+
+  int operate() override {
+    reenter(this) {
+
+      yield {
+        rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
+        bufferlist bl;
+        call(new RGWDeleteRESTResourceCR(cct, dest_conn, http_manager,
+              obj_to_aws_path(dest_obj), params));
+      }
+
+      if (retcode < 0) {
+        ldout(cct, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl;
+        return set_cr_error(retcode);
+      }
+
+      return set_cr_done();
+    }
+
+    return 0;
+  }
+};
+
+class RGWLCInitMultipartCR : public RGWCoroutine {
+  CephContext *cct;
+  RGWHTTPManager *http_manager;
+  RGWRESTConn *dest_conn;
+  rgw_obj dest_obj;
+
+  uint64_t obj_size;
+  map<string, string> attrs;
+
+  bufferlist out_bl;
+
+  string *upload_id;
+
+  struct InitMultipartResult {
+    string bucket;
+    string key;
+    string upload_id;
+
+    void decode_xml(XMLObj *obj) {
+      RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
+      RGWXMLDecoder::decode_xml("Key", key, obj);
+      RGWXMLDecoder::decode_xml("UploadId", upload_id, obj);
+    }
+  } result;
+
+  public:
+  RGWLCInitMultipartCR(CephContext *_cct,
+      RGWHTTPManager *_http_manager,
+      RGWRESTConn *_dest_conn,
+      const rgw_obj& _dest_obj,
+      uint64_t _obj_size,
+      const map<string, string>& _attrs,
+      string *_upload_id) : RGWCoroutine(_cct),
+  cct(_cct),
+  http_manager(_http_manager),
+  dest_conn(_dest_conn),
+  dest_obj(_dest_obj),
+  obj_size(_obj_size),
+  attrs(_attrs),
+  upload_id(_upload_id) {}
+
+  int operate() override {
+    reenter(this) {
+
+      yield {
+        rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
+        bufferlist bl;
+        call(new RGWPostRawRESTResourceCR <bufferlist> (cct, dest_conn, http_manager,
+              obj_to_aws_path(dest_obj), params, &attrs, bl, &out_bl));
+      }
+
+      if (retcode < 0) {
+        ldout(cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
+        return set_cr_error(retcode);
+      }
+      {
+        /*
+         * If one of the following fails, we cannot abort the upload, as we
+         * cannot extract the upload id. If one of these fails, it is very
+         * likely the least of our problems.
+         */
+        RGWXMLDecoder::XMLParser parser;
+        if (!parser.init()) {
+          ldout(cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
+          return set_cr_error(-EIO);
+        }
+
+        if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+          string str(out_bl.c_str(), out_bl.length());
+          ldout(cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
+          return set_cr_error(-EIO);
+        }
+
+        try {
+          RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true);
+        } catch (RGWXMLDecoder::err& err) {
+          string str(out_bl.c_str(), out_bl.length());
+          ldout(cct, 5) << "ERROR: unexpected xml: " << str << dendl;
+          return set_cr_error(-EIO);
+        }
+      }
+
+      ldout(cct, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl;
+
+      *upload_id = result.upload_id;
+
+      return set_cr_done();
+    }
+
+    return 0;
+  }
+};
+
+class RGWLCCompleteMultipartCR : public RGWCoroutine {
+  CephContext *cct;
+  RGWHTTPManager *http_manager;
+  RGWRESTConn *dest_conn;
+  rgw_obj dest_obj;
+
+  bufferlist out_bl;
+
+  string upload_id;
+
+  struct CompleteMultipartReq {
+    map<int, rgw_lc_multipart_part_info> parts;
+
+    explicit CompleteMultipartReq(const map<int, rgw_lc_multipart_part_info>& _parts) : parts(_parts) {}
+
+    void dump_xml(Formatter *f) const {
+      for (auto p : parts) {
+        f->open_object_section("Part");
+        encode_xml("PartNumber", p.first, f);
+        encode_xml("ETag", p.second.etag, f);
+        f->close_section();
+      };
+    }
+  } req_enc;
+
+  struct CompleteMultipartResult {
+    string location;
+    string bucket;
+    string key;
+    string etag;
+
+    void decode_xml(XMLObj *obj) {
+      RGWXMLDecoder::decode_xml("Location", bucket, obj);
+      RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
+      RGWXMLDecoder::decode_xml("Key", key, obj);
+      RGWXMLDecoder::decode_xml("ETag", etag, obj);
+    }
+  } result;
+
+  public:
+  RGWLCCompleteMultipartCR(CephContext *_cct,
+      RGWHTTPManager *_http_manager,
+      RGWRESTConn *_dest_conn,
+      const rgw_obj& _dest_obj,
+      string _upload_id,
+      const map<int, rgw_lc_multipart_part_info>& _parts) : RGWCoroutine(_cct),
+  cct(_cct), http_manager(_http_manager),
+  dest_conn(_dest_conn),
+  dest_obj(_dest_obj),
+  upload_id(_upload_id),
+  req_enc(_parts) {}
+
+  int operate() override {
+    reenter(this) {
+
+      yield {
+        rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
+        stringstream ss;
+        XMLFormatter formatter;
+
+        encode_xml("CompleteMultipartUpload", req_enc, &formatter);
+
+        formatter.flush(ss);
+
+        bufferlist bl;
+        bl.append(ss.str());
+
+        call(new RGWPostRawRESTResourceCR <bufferlist> (cct, dest_conn, http_manager,
+              obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl));
+      }
+
+      if (retcode < 0) {
+        ldout(cct, 0) << "ERROR: failed to complete multipart upload for dest object=" << dest_obj << dendl;
+        return set_cr_error(retcode);
+      }
+      {
+        /*
+         * If parsing the following response fails, we cannot verify that the
+         * complete request succeeded. If one of these fails, it is very
+         * likely the least of our problems.
+         */
+        RGWXMLDecoder::XMLParser parser;
+        if (!parser.init()) {
+          ldout(cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart complete response from server" << dendl;
+          return set_cr_error(-EIO);
+        }
+
+        if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+          string str(out_bl.c_str(), out_bl.length());
+          ldout(cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
+          return set_cr_error(-EIO);
+        }
+
+        try {
+          RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
+        } catch (RGWXMLDecoder::err& err) {
+          string str(out_bl.c_str(), out_bl.length());
+          ldout(cct, 5) << "ERROR: unexpected xml: " << str << dendl;
+          return set_cr_error(-EIO);
+        }
+      }
+
+      ldout(cct, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;
+
+      return set_cr_done();
+    }
+
+    return 0;
+  }
+};
+
+
+class RGWLCStreamAbortMultipartUploadCR : public RGWCoroutine {
+  CephContext *cct;
+  RGWHTTPManager *http_manager;
+  RGWRESTConn *dest_conn;
+  const rgw_obj dest_obj;
+  const rgw_raw_obj status_obj;
+
+  string upload_id;
+
+  public:
+
+  RGWLCStreamAbortMultipartUploadCR(CephContext *_cct,
+      RGWHTTPManager *_http_manager,
+      RGWRESTConn *_dest_conn,
+      const rgw_obj& _dest_obj,
+      const rgw_raw_obj& _status_obj,
+      const string& _upload_id) : RGWCoroutine(_cct), cct(_cct), http_manager(_http_manager),
+  dest_conn(_dest_conn),
+  dest_obj(_dest_obj),
+  status_obj(_status_obj),
+  upload_id(_upload_id) {}
+
+  int operate() override {
+    reenter(this) {
+      yield call(new RGWLCAbortMultipartCR(cct, http_manager, dest_conn, dest_obj, upload_id));
+      if (retcode < 0) {
+        ldout(cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
+        /* ignore error, best effort */
+      }
+#ifdef TODO_STATUS_OBJ
+      yield call(new RGWRadosRemoveCR(tier_ctx.store, status_obj));
+      if (retcode < 0) {
+        ldout(tier_ctx.cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl;
+        /* ignore error, best effort */
+      }
+#endif
+      return set_cr_done();
+    }
+
+    return 0;
+  }
+};
+
+class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine {
+  RGWLCCloudTierCtx& tier_ctx;
+  RGWRESTConn *source_conn;
+  rgw_obj src_obj;
+  rgw_obj dest_obj;
+
+  uint64_t obj_size;
+  string src_etag;
+  rgw_rest_obj rest_obj;
+
+  rgw_lc_multipart_upload_info status;
+
+  map<string, string> new_attrs;
+
+  rgw_lc_multipart_part_info *pcur_part_info{nullptr};
+
+  int ret_err{0};
+
+  rgw_raw_obj status_obj;
+
+  public:
+  RGWLCStreamObjToCloudMultipartCR(RGWLCCloudTierCtx& _tier_ctx) : RGWCoroutine(_tier_ctx.cct),  tier_ctx(_tier_ctx) {}
+
+  int operate() override {
+    rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime,
+        tier_ctx.o.meta.etag,
+        tier_ctx.o.versioned_epoch,
+        tier_ctx.acl_mappings,
+        tier_ctx.target_storage_class);
+    bool init_multipart{false};
+
+    rgw_obj& obj = tier_ctx.obj;
+    obj_size = tier_ctx.o.meta.size;
+
+    rgw_bucket target_bucket;
+    target_bucket.name = tier_ctx.target_bucket_name;
+    string target_obj_name = obj.key.name; // cross check with aws module
+    rgw_obj dest_obj(target_bucket, target_obj_name);
+    std::shared_ptr<RGWStreamReadCRF> in_crf;
+    rgw_rest_obj rest_obj;
+
+    reenter(this) {
+#ifdef TODO_STATUS_OBJ
+      yield call(new RGWSimpleRadosReadCR<rgw_lc_multipart_upload_info>(tier_ctx.async_rados, tier_ctx.store->svc()->sysobj,
+            status_obj, &status, false));
+
+      if (retcode < 0 && retcode != -ENOENT) {
+        ldout(tier_ctx.cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl;
+        return retcode;
+      }
+
+      if (retcode >= 0) {
+        /* check here that mtime and size did not change */
+        if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size ||
+            status.src_properties.etag != src_properties.etag) {
+          yield call(new RGWLCStreamAbortMultipartUploadCR( tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status_obj, status.upload_id));
+          retcode = -ENOENT;
+        }
+      }
+
+      if (retcode == -ENOENT) {
+      }
+#endif
+      if (!init_multipart) {
+        in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.store->getRados(), tier_ctx.bucket_info, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
+
+        in_crf->init();
+
+        rest_obj = in_crf->get_rest_obj();
+
+        RGWLCStreamPutCRF::init_send_attrs(tier_ctx.cct, rest_obj, obj_properties, &new_attrs);
+
+        yield call(new RGWLCInitMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, obj_size, std::move(new_attrs), &status.upload_id));
+        if (retcode < 0) {
+          return set_cr_error(retcode);
+        }
+
+        init_multipart = true;
+        status.obj_size = obj_size;
+#define MULTIPART_MAX_PARTS 10000
+        uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
+        uint64_t min_conf_size = tier_ctx.multipart_min_part_size;
+
+        if (min_conf_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
+          min_conf_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
+        }
+
+        status.part_size = std::max(min_conf_size, min_part_size);
+        status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
+        status.cur_part = 1;
+      }
+
+      for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) {
+        ldout(tier_ctx.cct, 20) << "status.cur_part = "<<status.cur_part <<", info.ofs = "<< status.cur_ofs <<", info.size = "<< status.part_size<< ", obj size = " << status.obj_size<<dendl;
+        yield {
+          rgw_lc_multipart_part_info& cur_part_info = status.parts[status.cur_part];
+          cur_part_info.part_num = status.cur_part;
+          cur_part_info.ofs = status.cur_ofs;
+          cur_part_info.size = std::min((uint64_t)status.part_size, status.obj_size - status.cur_ofs);
+
+          pcur_part_info = &cur_part_info;
+
+          status.cur_ofs += status.part_size;
+
+          call(new RGWLCStreamObjToCloudMultipartPartCR(tier_ctx,
+                status.upload_id,
+                cur_part_info,
+                &cur_part_info.etag));
+        }
+
+        if (retcode < 0) {
+          ldout(tier_ctx.cct, 0) << "ERROR: failed to sync obj=" << tier_ctx.obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
+          ret_err = retcode;
+          yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status_obj, status.upload_id));
+          return set_cr_error(ret_err);
+        }
+
+#ifdef TODO_STATUS_OBJ
+        yield call(new RGWSimpleRadosWriteCR<rgw_lc_multipart_upload_info>(sync_env->async_rados, sync_env->store->svc()->sysobj, status_obj, status));
+        if (retcode < 0) {
+          ldout(tier_ctx.cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
+          /* continue with upload anyway */
+        }
+#endif
+        ldout(tier_ctx.cct, 0) << "sync of object=" << tier_ctx.obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl;
+      }
+
+      yield call(new RGWLCCompleteMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status.upload_id, status.parts));
+      if (retcode < 0) {
+        ldout(tier_ctx.cct, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
+        ret_err = retcode;
+        yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status_obj, status.upload_id));
+        return set_cr_error(ret_err);
+      }
+
+#ifdef TODO_STATUS_OBJ
+      /* remove status obj */
+      yield call(new RGWRadosRemoveCR(tier_ctx.store, status_obj));
+      if (retcode < 0) {
+        ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
+        /* ignore error, best effort */
+      }
+#endif
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
+int RGWLCCloudTierCR::operate() {
+  reenter(this) {
+
+    yield {
+      // xxx: find if bucket is already created
+      ldout(tier_ctx.cct,0) << "Cloud_tier_ctx: creating bucket " << tier_ctx.target_bucket_name << dendl;
+      bufferlist bl;
+      call(new RGWPutRawRESTResourceCR <bufferlist> (tier_ctx.cct, tier_ctx.conn.get(),
+            tier_ctx.http_manager,
+            tier_ctx.target_bucket_name, nullptr, bl, &out_bl));
+    }
+    if (retcode < 0 ) {
+      RGWXMLDecoder::XMLParser parser;
+      if (!parser.init()) {
+        ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl;
+        return set_cr_error(retcode);
+      }
+
+      if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+        string str(out_bl.c_str(), out_bl.length());
+        ldout(tier_ctx.cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
+        return set_cr_error(retcode);
+      }
+
+      try {
+        RGWXMLDecoder::decode_xml("Error", result, &parser, true);
+      } catch (RGWXMLDecoder::err& err) {
+        string str(out_bl.c_str(), out_bl.length());
+        ldout(tier_ctx.cct, 5) << "ERROR: unexpected xml: " << str << dendl;
+        return set_cr_error(retcode);
+      }
+
+      if ((result.code != "BucketAlreadyOwnedByYou") &&
+          (result.code != "BucketAlreadyExists")) {
+        return set_cr_error(retcode);
+      }
+    }
+
+    bucket_created = true;
+
+    yield {
+      uint64_t size = tier_ctx.o.meta.size;
+      uint64_t multipart_sync_threshold = tier_ctx.multipart_sync_threshold;
+
+      if (multipart_sync_threshold < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
+        multipart_sync_threshold = MULTIPART_MIN_POSSIBLE_PART_SIZE;
+      }
+
+      if (size < multipart_sync_threshold) {
+        call (new RGWLCStreamObjToCloudPlainCR(tier_ctx));
+      } else {
+        call(new RGWLCStreamObjToCloudMultipartCR(tier_ctx));
+
+      } 
+    }
+
+    if (retcode < 0) {
+      return set_cr_error(retcode);
+    }
+
+    return set_cr_done();
+  } //reenter
+
+  return 0;
+}
+
diff --git a/src/rgw/rgw_lc_tier.h b/src/rgw/rgw_lc_tier.h
new file mode 100644 (file)
index 0000000..0a4da62
--- /dev/null
@@ -0,0 +1,169 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#ifndef CEPH_RGW_LC_TIER_H
+#define CEPH_RGW_LC_TIER_H
+
+#include "rgw_lc.h"
+#include "rgw_cr_rados.h"
+#include "rgw_rest_conn.h"
+#include "rgw_cr_rest.h"
+#include "rgw_coroutine.h"
+#include "rgw_rados.h"
+#include "rgw_zone.h"
+
+#define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024)
+#define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
+
+struct RGWLCCloudTierCtx {
+  CephContext *cct;
+
+  /* Source */
+  rgw_bucket_dir_entry& o;
+  rgw::sal::RGWRadosStore *store;
+  RGWBucketInfo& bucket_info;
+
+  rgw_obj obj;
+  RGWObjectCtx& rctx;
+
+  /* Remote */
+  std::shared_ptr<RGWRESTConn> conn;
+  string target_bucket_name;
+  string target_storage_class;
+  RGWHTTPManager *http_manager;
+
+  map<string, RGWTierACLMapping> acl_mappings;
+  uint64_t multipart_min_part_size;
+  uint64_t multipart_sync_threshold;
+
+  RGWLCCloudTierCtx(CephContext* _cct, rgw_bucket_dir_entry& _o,
+            rgw::sal::RGWRadosStore* _store, RGWBucketInfo &_binfo, rgw_obj _obj,
+            RGWObjectCtx& _rctx, std::shared_ptr<RGWRESTConn> _conn, string _bucket,
+            string _storage_class, RGWHTTPManager *_http)
+            : cct(_cct), o(_o), store(_store), bucket_info(_binfo),
+              obj(_obj), rctx(_rctx), conn(_conn), target_bucket_name(_bucket),
+              target_storage_class(_storage_class), http_manager(_http) {}
+};
+
+class RGWLCCloudTierCR : public RGWCoroutine {
+  RGWLCCloudTierCtx& tier_ctx;
+  bufferlist out_bl;
+  int retcode;
+  bool bucket_created = false;
+  struct CreateBucketResult {
+    string code;
+
+    void decode_xml(XMLObj *obj) {
+      RGWXMLDecoder::decode_xml("Code", code, obj);
+    }
+  } result;
+
+  public:
+    RGWLCCloudTierCR(RGWLCCloudTierCtx& _tier_ctx) :
+          RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx) {}
+
+    int operate() override;
+};
+
+struct rgw_lc_multipart_part_info {
+  int part_num{0};
+  uint64_t ofs{0};
+  uint64_t size{0};
+  string etag;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(part_num, bl);
+    encode(ofs, bl);
+    encode(size, bl);
+    encode(etag, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(part_num, bl);
+    decode(ofs, bl);
+    decode(size, bl);
+    decode(etag, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_lc_multipart_part_info)
+
+struct rgw_lc_obj_properties {
+  ceph::real_time mtime;
+  string etag;
+  uint64_t versioned_epoch{0};
+  map<string, RGWTierACLMapping>& target_acl_mappings;
+  string target_storage_class;
+
+  rgw_lc_obj_properties(ceph::real_time _mtime, string _etag,
+                        uint64_t _versioned_epoch, map<string,
+                        RGWTierACLMapping>& _t_acl_mappings,
+                        string _t_storage_class) :
+                        mtime(_mtime), etag(_etag),
+                        versioned_epoch(_versioned_epoch),
+                        target_acl_mappings(_t_acl_mappings),
+                        target_storage_class(_t_storage_class) {}
+  
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(mtime, bl);
+    encode(etag, bl);
+    encode(versioned_epoch, bl);
+    encode(target_acl_mappings, bl);
+    encode(target_storage_class, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(mtime, bl);
+    decode(etag, bl);
+    decode(versioned_epoch, bl);
+    decode(target_acl_mappings, bl);
+    decode(target_storage_class, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_lc_obj_properties)
+
+struct rgw_lc_multipart_upload_info {
+  string upload_id;
+  uint64_t obj_size;
+  uint32_t part_size{0};
+  uint32_t num_parts{0};
+
+  int cur_part{0};
+  uint64_t cur_ofs{0};
+
+  std::map<int, rgw_lc_multipart_part_info> parts;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(upload_id, bl);
+    encode(obj_size, bl);
+    encode(part_size, bl);
+    encode(num_parts, bl);
+    encode(cur_part, bl);
+    encode(cur_ofs, bl);
+    encode(parts, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(upload_id, bl);
+    decode(obj_size, bl);
+    decode(part_size, bl);
+    decode(num_parts, bl);
+    decode(cur_part, bl);
+    decode(cur_ofs, bl);
+    decode(parts, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_lc_multipart_upload_info)
+
+#endif
index 1d013ae5a97c01db48aec1b1617320e25d19637f..578b4f81bf3f2062b047314949536d88981b1583 100644 (file)
@@ -2081,8 +2081,23 @@ void RGWZoneGroupMap::decode(bufferlist::const_iterator& bl) {
   }
 }
 
+static int conf_to_uint64(const JSONFormattable& config, const string& key, uint64_t *pval)
+{
+  string sval;
+  if (config.find(key, &sval)) {
+    string err;
+    uint64_t val = strict_strtoll(sval.c_str(), 10, &err);
+    if (!err.empty()) {
+      return -EINVAL;
+    }
+    *pval = val;
+  }
+  return 0;
+}
+
 int RGWZoneGroupPlacementTier::update_params(const JSONFormattable& config)
 {
+  int r = -1;
 
   if (config.exists("endpoint")) {
     endpoint = config["endpoint"];
@@ -2108,6 +2123,21 @@ int RGWZoneGroupPlacementTier::update_params(const JSONFormattable& config)
   if (config.exists("secret")) {
     key.key = config["secret"];
   }
+
+  if (config.exists("multipart_sync_threshold")) {
+    r = conf_to_uint64(config, "multipart_sync_threshold", &multipart_sync_threshold);
+    if (r < 0) {
+      multipart_sync_threshold = DEFAULT_MULTIPART_SYNC_PART_SIZE;
+    }
+  }
+
+  if (config.exists("multipart_min_part_size")) {
+    r = conf_to_uint64(config, "multipart_min_part_size", &multipart_min_part_size);
+    if (r < 0) {
+      multipart_min_part_size = DEFAULT_MULTIPART_SYNC_PART_SIZE;
+    }
+  }
+
   if (config.exists("acls")) {
     const JSONFormattable& cc = config["acls"];
     if (cc.is_array()) {
@@ -2149,6 +2179,12 @@ int RGWZoneGroupPlacementTier::clear_params(const JSONFormattable& config)
   if (config.exists("secret")) {
     key.key.clear();
   }
+  if (config.exists("multipart_sync_threshold")) {
+    multipart_sync_threshold = DEFAULT_MULTIPART_SYNC_PART_SIZE;
+  }
+  if (config.exists("multipart_min_part_size")) {
+    multipart_min_part_size = DEFAULT_MULTIPART_SYNC_PART_SIZE;
+  }
   if (config.exists("acls")) {
     const JSONFormattable& cc = config["acls"];
     if (cc.is_array()) {
index c145ca1b8423b0a38de3d71fc6dcd21feccded7f..886e63ed93b7bf736faeb2e27c5c44eddc28cc14 100644 (file)
@@ -748,6 +748,7 @@ struct RGWTierACLMapping {
 WRITE_CLASS_ENCODER(RGWTierACLMapping)
 
 struct RGWZoneGroupPlacementTier {
+#define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024)
   std::string storage_class;
   std::string tier_type;
   std::string endpoint;
@@ -759,6 +760,9 @@ struct RGWZoneGroupPlacementTier {
   string target_path;
   map<string, RGWTierACLMapping> acl_mappings;
 
+  uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE};
+  uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE};
+
   int update_params(const JSONFormattable& config);
   int clear_params(const JSONFormattable& config);
 
@@ -773,6 +777,8 @@ struct RGWZoneGroupPlacementTier {
     encode(tier_storage_class, bl);
     encode(target_path, bl);
     encode(acl_mappings, bl);
+    encode(multipart_sync_threshold, bl);
+    encode(multipart_min_part_size, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -792,6 +798,8 @@ struct RGWZoneGroupPlacementTier {
     decode(tier_storage_class, bl);
     decode(target_path, bl);
     decode(acl_mappings, bl);
+    decode(multipart_sync_threshold, bl);
+    decode(multipart_min_part_size, bl);
     DECODE_FINISH(bl);
   }
   void dump(Formatter *f) const;
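
As a further illustration, here is a minimal standalone sketch (a
hypothetical helper, not part of this patch) of the fallback behaviour
implemented by conf_to_uint64()/update_params() above: a tier-config
value that does not parse as an unsigned integer falls back to the 32M
default:

  // Sketch only; uses strtoull in place of strict_strtoll purely to stay
  // self-contained.
  #include <cstdint>
  #include <cstdlib>
  #include <string>

  constexpr uint64_t DEFAULT_MULTIPART_SYNC_PART_SIZE = 32ull * 1024 * 1024;

  uint64_t parse_size_or_default(const std::string& sval) {
    char* end = nullptr;
    uint64_t val = std::strtoull(sval.c_str(), &end, 10);
    if (end == sval.c_str() || *end != '\0') {
      return DEFAULT_MULTIPART_SYNC_PART_SIZE;   // mirrors the r < 0 fallback
    }
    return val;
  }

  // parse_size_or_default("5242880") == 5 * 1024 * 1024
  // parse_size_or_default("32M")     -> falls back to the 32M default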