]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/CloudTransition: Replace Coroutines with RGWRestConn APIs 35100/head
authorSoumya Koduri <skoduri@redhat.com>
Wed, 23 Jun 2021 18:00:11 +0000 (23:30 +0530)
committerSoumya Koduri <skoduri@redhat.com>
Thu, 18 Nov 2021 07:22:48 +0000 (12:52 +0530)
To avoid the overhead of using coroutines during lifecycle transition,
RGWRESTStream* APIs are used to transition objects to remote cloud.

Also handled few optimizations and cleanup stated below:
* Store the list of cloud target buckets as part of LCWorker instead
  of making it global. This list is maintained for the duration of
  RGWLC::process(), post which discarded.
* Refactor code to remove coroutine based class definitions which are no
  longer needed and use direct function calls instead.
* Check for cloud transitioned objects using tier-type and return error if
  accessed in RGWGetObj, RGWCopyObj and RGWPutObj ops.

Signed-off-by: Soumya Koduri <skoduri@redhat.com>
12 files changed:
src/rgw/rgw_cr_rest.cc
src/rgw/rgw_cr_rest.h
src/rgw/rgw_lc.cc
src/rgw/rgw_lc.h
src/rgw/rgw_lc_tier.cc
src/rgw/rgw_lc_tier.h
src/rgw/rgw_obj_manifest.h
src/rgw/rgw_op.cc
src/rgw/rgw_rest_client.h
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_rest_conn.h
src/rgw/rgw_zone.h

index c270067b2b3ebe91f62a1c42739f4cb8514ab54f..0bd169f99e7d36b9c6e32c72f669d5046c40ce10 100644 (file)
@@ -349,99 +349,3 @@ int RGWStreamSpliceCR::operate(const DoutPrefixProvider *dpp) {
   return 0;
 }
 
-RGWStreamReadCRF::RGWStreamReadCRF(std::unique_ptr<rgw::sal::Object>* obj,
-             RGWObjectCtx& obj_ctx) : read_op((*obj)->get_read_op(&obj_ctx)) {}
-RGWStreamReadCRF::~RGWStreamReadCRF() {}
-
-RGWStreamWriteCR::RGWStreamWriteCR(CephContext* _cct, RGWHTTPManager* _mgr,
-                           shared_ptr<RGWStreamReadCRF>& _in_crf,
-                           shared_ptr<RGWStreamWriteHTTPResourceCRF>& _out_crf) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
-                           in_crf(_in_crf), out_crf(_out_crf) {}
-RGWStreamWriteCR::~RGWStreamWriteCR() { }
-
-int RGWStreamWriteCR::operate(const DoutPrefixProvider* dpp) {
-  reenter(this) {
-    ret = in_crf->init();
-    if (ret < 0) {
-      ldout(cct, 0) << "ERROR: fail to initialize in_crf, ret = " << ret << dendl;
-      return set_cr_error(ret);
-    }
-    in_crf->get_range(ofs, end);
-    rest_obj = in_crf->get_rest_obj();
-
-    do {
-      bl.clear();
-      yield {
-        ret = in_crf->read(ofs, end, bl);
-        if (ret < 0) {
-          ldout(cct, 0) << "ERROR: fail to read object data, ret = " << ret << dendl;
-          return set_cr_error(ret);
-        }
-      }
-      if (retcode < 0) {
-        ldout(cct, 20) << __func__ << ": read_op.read() retcode=" << retcode << dendl;
-        return set_cr_error(ret);
-      }
-
-      read_len = bl.length();
-      if (bl.length() == 0) {
-        break;
-      } 
-
-      if (!sent_attrs) {
-        ret = out_crf->init();
-        if (ret < 0) {
-          ldout(cct, 0) << "ERROR: fail to initialize out_crf, ret = " << ret << dendl;
-          return set_cr_error(ret);
-        }
-              
-        out_crf->send_ready(dpp, rest_obj);
-        ret = out_crf->send();
-        if (ret < 0) {
-          return set_cr_error(ret);
-        }
-        sent_attrs = true;
-      }
-
-      total_read += bl.length();
-
-      do {
-        yield {
-          ret = out_crf->write(bl, &need_retry);
-          if (ret < 0)  {
-            return set_cr_error(ret);
-          }
-        }
-
-        if (retcode < 0) {
-          ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl;
-          return set_cr_error(ret);
-        }
-      } while (need_retry);
-
-      ofs += read_len;
-
-    } while (ofs <= end);
-
-    do {
-      /* Ensure out_crf is initialized */
-      if (!sent_attrs) {
-        break;
-      }
-
-      /* This has to be under yield. Otherwise sometimes this loop
-       * never finishes, infinitely waiting for req to be done.
-       */
-      yield {
-        int ret = out_crf->drain_writes(&need_retry);
-        if (ret < 0) {
-          return set_cr_error(ret);
-        }
-      }
-    } while (need_retry);
-
-    return set_cr_done();
-  }
-  return 0;
-}
index e09c554f7aafed7e5aec10169fa9402d7c17432c..cb103aeb83455637379a83810cee87f498a3fd79 100644 (file)
@@ -9,7 +9,7 @@
 
 #include "rgw_coroutine.h"
 #include "rgw_rest_conn.h"
-#include "rgw_sal.h"
+
 
 struct rgw_rest_obj {
   rgw_obj_key key;
@@ -588,64 +588,3 @@ public:
 
   int operate(const DoutPrefixProvider *dpp) override;
 };
-
-class RGWStreamReadCRF {
-public:
-  std::unique_ptr<rgw::sal::Object::ReadOp> read_op;
-  off_t ofs;
-  off_t end;
-  rgw_rest_obj rest_obj;
-  std::unique_ptr<rgw::sal::Object>* obj;
-
-  RGWStreamReadCRF(std::unique_ptr<rgw::sal::Object>* obj, RGWObjectCtx& obj_ctx);
-  virtual ~RGWStreamReadCRF();
-
-  virtual int init() {return 0; }
-  virtual int init_rest_obj() {return 0;}
-
-  int set_range(off_t _ofs, off_t _end) {
-    ofs = _ofs;
-    end = _end;
-
-    return 0;
-  }
-
-  int get_range(off_t &_ofs, off_t &_end) {
-    _ofs = ofs;
-    _end = end;
-
-    return 0;
-  }
-
-  rgw_rest_obj get_rest_obj() {
-    return rest_obj;
-  }
-
-  virtual int read(off_t ofs, off_t end, bufferlist &bl) {return 0;};
-
-};
-
-class RGWStreamWriteCR : public RGWCoroutine {
-  CephContext *cct;
-  RGWHTTPManager *http_manager;
-  string url;
-  std::shared_ptr<RGWStreamReadCRF> in_crf;
-  std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
-  bufferlist bl;
-  bool need_retry{false};
-  bool sent_attrs{false};
-  uint64_t total_read{0};
-  int ret{0};
-  off_t ofs;
-  off_t end;
-  uint64_t read_len = 0;
-  rgw_rest_obj rest_obj;
-
-public:
-  RGWStreamWriteCR(CephContext *_cct, RGWHTTPManager *_mgr,
-                    std::shared_ptr<RGWStreamReadCRF>& _in_crf,
-                    std::shared_ptr<RGWStreamWriteHTTPResourceCRF>& _out_crf);
-  ~RGWStreamWriteCR();
-
-  int operate(const DoutPrefixProvider *dpp) override;
-};
index c9eb675f8c5121fe68addd16323ff23c07b31d27..fc7e2455ed92d47ec6614e260f07a38b092de4b0 100644 (file)
@@ -223,6 +223,7 @@ void *RGWLC::LCWorker::entry() {
                          << r << dendl;
       }
       ldpp_dout(dpp, 2) << "life cycle: stop" << dendl;
+      cloud_targets.clear(); // clear cloud targets
     }
     if (lc->going_down())
       break;
@@ -684,18 +685,8 @@ public:
   static constexpr uint32_t FLAG_EWAIT_SYNC =  0x0001;
   static constexpr uint32_t FLAG_DWAIT_SYNC =  0x0002;
   static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
-  static constexpr uint32_t FLAG_HTTP_MGR = 0x0008;
 
 private:
-  class C_WorkQTimerCtx: public Context {
-    WorkQ* wq;
-    public:
-      C_WorkQTimerCtx(WorkQ* _wq): wq(_wq) {}
-      void finish(int r) override {
-        wq->stop_http_manager();
-      }
-  };
-
   const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {};
   RGWLC::LCWorker* wk;
   uint32_t qmax;
@@ -705,28 +696,14 @@ private:
   uint32_t flags;
   vector<WorkItem> items;
   work_f f;
-  std::unique_ptr<RGWCoroutinesManager> crs;
-  std::unique_ptr<RGWHTTPManager> http_manager;
-  bool is_http_mgr_started{false};
-  ceph::mutex timer_mtx;
-  SafeTimer timer;
-  int timer_wait_sec = 200; //seconds
-  C_WorkQTimerCtx* timer_ctx = nullptr;
 
 public:
   WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
-    : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf),
-      timer_mtx(ceph::make_mutex("WorkQTimerMutex")),
-      timer((CephContext*)(wk->cct), timer_mtx)
+    : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf)
     {
       create(thr_name().c_str());
-      timer.init();
     }
 
-  ~WorkQ() {
-    timer.shutdown();
-  }
-
   std::string thr_name() {
     return std::string{"wp_thrd: "}
     + std::to_string(wk->ix) + ", " + std::to_string(ix);
@@ -736,51 +713,6 @@ public:
     f = _f;
   }
 
-  RGWCoroutinesManager* get_crs() { return crs.get(); }
-  RGWHTTPManager* get_http_manager() { return http_manager.get(); }
-
-  int start_http_manager(rgw::sal::Store* store) {
-    int ret = 0;
-
-    if (is_http_mgr_started)
-      return 0;
-
-    /* http_mngr */
-    if(!crs) {
-      crs.reset(new RGWCoroutinesManager(store->ctx(), store->get_cr_registry()));
-    }
-    if (!http_manager) {
-      http_manager.reset(new RGWHTTPManager(store->ctx(), crs.get()->get_completion_mgr()));
-    }      
-
-    ret = http_manager->start();
-    if (ret < 0) {
-      dout(5) << "RGWLC:: http_manager->start() failed ret = "
-                 << ret << dendl;
-      return ret;
-    }
-
-    is_http_mgr_started = true;
-    flags |= FLAG_HTTP_MGR;
-
-    return ret;
-  }
-
-  int stop_http_manager() {
-    if (!is_http_mgr_started) {
-      return 0;
-    }
-
-    http_manager.reset();
-    crs.reset();
-  
-    is_http_mgr_started = false;
-       flags &= ~FLAG_HTTP_MGR;
-    timer.cancel_all_events();
-    timer_ctx = nullptr;
-    return 0;
-  }
-
   void enqueue(WorkItem&& item) {
     unique_lock uniq(mtx);
     while ((!wk->get_lc()->going_down()) &&
@@ -788,10 +720,6 @@ public:
       flags |= FLAG_EWAIT_SYNC;
       cv.wait_for(uniq, 200ms);
     }
-    if (timer_ctx && (flags & FLAG_HTTP_MGR)) {
-      timer.cancel_all_events();
-      timer_ctx = nullptr;
-    }
     items.push_back(item);
     if (flags & FLAG_DWAIT_SYNC) {
       flags &= ~FLAG_DWAIT_SYNC;
@@ -816,10 +744,6 @@ private:
       if (flags & FLAG_EDRAIN_SYNC) {
        flags &= ~FLAG_EDRAIN_SYNC;
       }
-      if ((flags & FLAG_HTTP_MGR) && !timer_ctx) {
-        timer_ctx = new C_WorkQTimerCtx(this);
-        timer.add_event_after(timer_wait_sec, timer_ctx);
-      }
       flags |= FLAG_DWAIT_SYNC;
       cv.wait_for(uniq, 200ms);
     }
@@ -1329,28 +1253,26 @@ public:
   }
 
   /* find out if the the storage class is remote cloud */
-  int get_tier_target(const RGWZoneGroup &zonegroup, rgw_placement_rule& rule,
-                      string& storage_class, RGWZoneGroupPlacementTier &tier) {
+  int get_tier_target(const RGWZoneGroup &zonegroup, const rgw_placement_rule& rule,
+                      RGWZoneGroupPlacementTier &tier) {
     std::map<std::string, RGWZoneGroupPlacementTarget>::const_iterator titer;
     titer = zonegroup.placement_targets.find(rule.name);
     if (titer == zonegroup.placement_targets.end()) {
       return -ENOENT;
     }
 
-    if (storage_class.empty()) {
-      storage_class = rule.storage_class;
-    }
-
     const auto& target_rule = titer->second;
     std::map<std::string, RGWZoneGroupPlacementTier>::const_iterator ttier;
-    ttier = target_rule.tier_targets.find(storage_class);
+    ttier = target_rule.tier_targets.find(rule.storage_class);
     if (ttier != target_rule.tier_targets.end()) {
       tier = ttier->second;
+    } else { // not found
+      return -ENOENT;
     }
     return 0;
   }
 
-  int delete_tier_obj(lc_op_ctx& oc, RGWLCCloudTierCtx& tier_ctx) {
+  int delete_tier_obj(lc_op_ctx& oc) {
     int ret = 0;
 
     /* If bucket is versioned, create delete_marker for current version
@@ -1387,28 +1309,28 @@ public:
     }
 
     attrs = oc.obj->get_attrs();
-    (*tier_ctx.obj)->set_atomic(&tier_ctx.rctx);
     
-    RGWObjState *s = tier_ctx.rctx.get_state((*tier_ctx.obj)->get_obj());
-    std::unique_ptr<rgw::sal::Object::WriteOp> obj_op(oc.obj->get_write_op(&oc.rctx));
-
-    obj_op->params.modify_tail = true;
-    obj_op->params.flags = PUT_OBJ_CREATE;
-    obj_op->params.category = RGWObjCategory::CloudTiered;
-    obj_op->params.delete_at = real_time();
+    rgw::sal::RadosStore *rados = static_cast<rgw::sal::RadosStore*>(oc.store);
+    RGWRados::Object op_target(rados->getRados(), oc.bucket->get_info(), oc.rctx, oc.obj->get_obj());
+    RGWRados::Object::Write obj_op(&op_target);
+
+    obj_op.meta.modify_tail = true;
+    obj_op.meta.flags = PUT_OBJ_CREATE;
+    obj_op.meta.category = RGWObjCategory::CloudTiered;
+    obj_op.meta.delete_at = real_time();
     bufferlist blo;
-    blo.append("");
-    obj_op->params.data = &blo;
-    obj_op->params.if_match = NULL;
-    obj_op->params.if_nomatch = NULL;
-    obj_op->params.user_data = NULL;
-    obj_op->params.zones_trace = NULL;
-    obj_op->params.delete_at = real_time();
-    obj_op->params.olh_epoch = tier_ctx.o.versioned_epoch;
+    obj_op.meta.data = &blo;
+    obj_op.meta.if_match = NULL;
+    obj_op.meta.if_nomatch = NULL;
+    obj_op.meta.user_data = NULL;
+    obj_op.meta.zones_trace = NULL;
+    obj_op.meta.delete_at = real_time();
+    obj_op.meta.olh_epoch = tier_ctx.o.versioned_epoch;
     
     RGWObjManifest *pmanifest; 
+    RGWObjManifest manifest;
 
-    pmanifest = &(*s->manifest);
+    pmanifest = &manifest;
     RGWObjTier tier_config;
     tier_config.name = oc.tier.storage_class;
     tier_config.tier_placement = oc.tier;
@@ -1421,19 +1343,13 @@ public:
     rgw_placement_rule target_placement;
     target_placement.inherit_from(tier_ctx.bucket_info.placement_rule);
     target_placement.storage_class = oc.tier.storage_class;
-    pmanifest->set_head(target_placement, (*tier_ctx.obj)->get_obj(), 0);
+    pmanifest->set_head(target_placement, tier_ctx.obj->get_obj(), 0);
 
-    pmanifest->set_tail_placement(target_placement, (*tier_ctx.obj)->get_obj().bucket);
+    pmanifest->set_tail_placement(target_placement, tier_ctx.obj->get_obj().bucket);
 
-    /* should the obj_size also be set to '0' or is it needed
-     * to keep track of original size before transition. 
-     * But unless obj_size is set to '0', obj_iters cannot
-     * be reset I guess. For regular transitioned objects
-     * obj_size remains the same even when object is moved to other
-     * storage class. So maybe better to keep it the same way.
-     */
+    pmanifest->set_obj_size(0);
 
-    obj_op->params.manifest = pmanifest;
+    obj_op.meta.manifest = pmanifest;
 
     /* update storage class */
     bufferlist bl;
@@ -1443,15 +1359,7 @@ public:
     attrs.erase(RGW_ATTR_ID_TAG);
     attrs.erase(RGW_ATTR_TAIL_TAG);
 
-    obj_op->params.attrs = &attrs;
-
-    r = obj_op->prepare(null_yield);
-    if (r < 0) {
-      return r;
-    }
-
-
-    r = obj_op->write_meta(oc.dpp, tier_ctx.o.meta.size, 0, null_yield);
+    r = obj_op.write_meta(oc.dpp, 0, 0, attrs, null_yield);
     if (r < 0) {
       return r;
     }
@@ -1460,8 +1368,6 @@ public:
   }
 
   int transition_obj_to_cloud(lc_op_ctx& oc) {
-    std::shared_ptr<RGWRESTConn> conn;
-
     /* init */
     string id = "cloudid";
     string endpoint = oc.tier.t.s3.endpoint;
@@ -1482,65 +1388,48 @@ public:
       boost::algorithm::to_lower(bucket_name);
     }
 
-    conn.reset(new S3RESTConn(oc.cct, oc.store, id, { endpoint }, key, region, host_style));
-
-    int ret = oc.wq->start_http_manager(oc.store);
-    if (ret < 0) {
-      ldpp_dout(oc.dpp, 0) << "failed in start_http_manager() ret=" << ret << dendl;
-      return ret;
-    }
-
-    RGWCoroutinesManager* crs = oc.wq->get_crs();
-    RGWHTTPManager* http_manager = oc.wq->get_http_manager();
-
-    if (!crs || !http_manager) {
-      /* maybe race..return and retry */
-      ldpp_dout(oc.dpp, 0) << " http_manager and crs not initialized" << dendl;
-      return -1;
-    }
+    /* Create RGW REST connection */
+    S3RESTConn conn(oc.cct, oc.store, id, { endpoint }, key, region, host_style);
 
     RGWLCCloudTierCtx tier_ctx(oc.cct, oc.dpp, oc.o, oc.store, oc.bucket->get_info(),
-                        &oc.obj, oc.rctx, conn, bucket_name,
-                        oc.tier.t.s3.target_storage_class, http_manager);
+                        oc.obj.get(), oc.rctx, conn, bucket_name,
+                        oc.tier.t.s3.target_storage_class);
     tier_ctx.acl_mappings = oc.tier.t.s3.acl_mappings;
     tier_ctx.multipart_min_part_size = oc.tier.t.s3.multipart_min_part_size;
     tier_ctx.multipart_sync_threshold = oc.tier.t.s3.multipart_sync_threshold;
     tier_ctx.storage_class = oc.tier.storage_class;
 
-    bool al_tiered = false;
+    // check if target_path is already created
+    std::set<std::string>& cloud_targets = oc.env.worker->get_cloud_targets();
+    std::pair<std::set<std::string>::iterator, bool> it;
 
-    /* Since multiple zones may try to transition the same object to the cloud,
-     * verify if the object is already transitioned. And since its just a best
-     * effort, do not bail out in case of any errors.
-     */
-    ret = crs->run(oc.dpp, new RGWLCCloudCheckCR(tier_ctx, &al_tiered));
-    
-    if (ret < 0) {
-      ldpp_dout(oc.dpp, 0) << "ERROR: failed in RGWCloudCheckCR() ret=" << ret << dendl;
-    }
+    it = cloud_targets.insert(bucket_name);
+    tier_ctx.target_bucket_created = !(it.second);
+
+    ldpp_dout(oc.dpp, 0) << "Transitioning object(" << oc.o.key << ") to the cloud endpoint(" << endpoint << ")" << dendl;
+
+    /* Transition object to cloud end point */
+    int ret = rgw_cloud_tier_transfer_object(tier_ctx);
 
-    if (al_tiered) {
-      ldout(tier_ctx.cct, 20) << "Object (" << oc.o.key << ") is already tiered" << dendl;
-      return 0;
-    } else {
-         ret = crs->run(oc.dpp, new RGWLCCloudTierCR(tier_ctx));
-    }
-         
     if (ret < 0) {
-      ldpp_dout(oc.dpp, 0) << "ERROR: failed in RGWCloudTierCR() ret=" << ret << dendl;
+      ldpp_dout(oc.dpp, 0) << "ERROR: failed to transfer object(" << oc.o.key << ") to the cloud endpoint(" << endpoint << ") ret=" << ret << dendl;
       return ret;
+
+      if (!tier_ctx.target_bucket_created) {
+        cloud_targets.erase(it.first);
+      }
     }
 
     if (delete_object) {
-      ret = delete_tier_obj(oc, tier_ctx);
+      ret = delete_tier_obj(oc);
       if (ret < 0) {
-        ldpp_dout(oc.dpp, 0) << "ERROR: Deleting tier object failed ret=" << ret << dendl;
+        ldpp_dout(oc.dpp, 0) << "ERROR: Deleting tier object(" << oc.o.key << ") failed ret=" << ret << dendl;
         return ret;
       }
     } else {
       ret = update_tier_obj(oc, tier_ctx);
       if (ret < 0) {
-        ldpp_dout(oc.dpp, 0) << "ERROR: Updating tier object failed ret=" << ret << dendl;
+        ldpp_dout(oc.dpp, 0) << "ERROR: Updating tier object(" << oc.o.key << ") failed ret=" << ret << dendl;
         return ret;
       }
     }
@@ -1551,6 +1440,13 @@ public:
   int process(lc_op_ctx& oc) {
     auto& o = oc.o;
     int r;
+
+    if (oc.o.meta.category == RGWObjCategory::CloudTiered) {
+      /* Skip objects which are already cloud tiered. */
+      ldpp_dout(oc.dpp, 30) << "Object(key:" << oc.o.key << ") is already cloud tiered to cloud-s3 tier: " << oc.o.meta.storage_class << dendl;
+      return 0;
+    }
+
     std::string tier_type = ""; 
     const RGWZoneGroup& zonegroup = oc.store->get_zone()->get_zonegroup();
 
@@ -1558,16 +1454,10 @@ public:
     target_placement.inherit_from(oc.bucket->get_placement_rule());
     target_placement.storage_class = transition.storage_class;
 
-    r = get_tier_target(zonegroup, target_placement, target_placement.storage_class, oc.tier);
+    r = get_tier_target(zonegroup, target_placement, oc.tier);
 
     if (!r && oc.tier.tier_type == "cloud-s3") {
-      ldpp_dout(oc.dpp, 20) << "Found cloud s3 tier: " << target_placement.storage_class << dendl;
-      if (oc.o.meta.category == RGWObjCategory::CloudTiered) {
-        /* Skip objects which are already cloud tiered. */
-        ldpp_dout(oc.dpp, 20) << "Object(key:" << oc.o.key << ") is already cloud tiered to cloud-s3 tier: " << oc.o.meta.storage_class << dendl;
-        return 0;
-      }
-
+      ldpp_dout(oc.dpp, 30) << "Found cloud s3 tier: " << target_placement.storage_class << dendl;
       if (!oc.o.is_current() &&
           !pass_object_lock_check(oc.store, oc.obj.get(), oc.rctx, oc.dpp)) {
         /* Skip objects which has object lock enabled. */
index d66ad3a504a0021a8c46002ac01fba92f4dcedb2..ac1612f46aabef9ae6fe2ad3de8560d8516ee474 100644 (file)
@@ -480,6 +480,10 @@ public:
     std::mutex lock;
     std::condition_variable cond;
     WorkPool* workpool{nullptr};
+    /* save the target bucket names created as part of object transition
+     * to cloud. This list is maintained for the duration of each RGWLC::process()
+     * post which it is discarded. */
+    std::set<std::string> cloud_targets;
 
   public:
 
@@ -493,6 +497,7 @@ public:
     void stop();
     bool should_work(utime_t& now);
     int schedule_next_start_time(utime_t& start, utime_t& now);
+    std::set<std::string>& get_cloud_targets() { return cloud_targets; }
     ~LCWorker();
 
     friend class RGWRados;
index d7d62cbf6a6214298467ee5f064219e4bd14cf39..704b7dee2e2714fcc4d5d045c937f3eee2ace011 100644 (file)
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/predicate.hpp>
 
-#include <boost/asio/yield.hpp>
-
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
 using namespace std;
 
+struct rgw_lc_multipart_part_info {
+  int part_num{0};
+  uint64_t ofs{0};
+  uint64_t size{0};
+  std::string etag;
+};
+
+struct rgw_lc_obj_properties {
+  ceph::real_time mtime;
+  std::string etag;
+  uint64_t versioned_epoch{0};
+  std::map<std::string, RGWTierACLMapping>& target_acl_mappings;
+  std::string target_storage_class;
+
+  rgw_lc_obj_properties(ceph::real_time _mtime, std::string _etag,
+      uint64_t _versioned_epoch, std::map<std::string,
+      RGWTierACLMapping>& _t_acl_mappings,
+      std::string _t_storage_class) :
+    mtime(_mtime), etag(_etag),
+    versioned_epoch(_versioned_epoch),
+    target_acl_mappings(_t_acl_mappings),
+    target_storage_class(_t_storage_class) {}
+};
+
+struct rgw_lc_multipart_upload_info {
+  std::string upload_id;
+  uint64_t obj_size;
+  ceph::real_time mtime;
+  std::string etag;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(upload_id, bl);
+    encode(obj_size, bl);
+    encode(mtime, bl);
+    encode(etag, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(upload_id, bl);
+    decode(obj_size, bl);
+    decode(mtime, bl);
+    decode(etag, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_lc_multipart_upload_info)
+
 static inline string get_key_instance(const rgw_obj_key& key)
 {
   if (!key.instance.empty() &&
@@ -51,10 +99,92 @@ static inline string obj_to_aws_path(const rgw_obj& obj)
   return path;
 }
 
+static int read_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Store *store,
+    const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status)
+{
+  int ret = 0;
+  rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(store);
+
+  if (!rados) {
+    ldpp_dout(dpp, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl;
+    return -1;
+  }
+
+  auto& pool = status_obj->pool;
+  const auto oid = status_obj->oid;
+  auto obj_ctx = rados->svc()->sysobj->init_obj_ctx();
+  bufferlist bl;
+
+  ret = rgw_get_system_obj(obj_ctx, pool, oid, bl, nullptr, nullptr,
+      null_yield, dpp);
+
+  if (ret < 0) {
+    return ret;
+  }
+
+  if (bl.length() > 0) {
+    try {
+      auto p = bl.cbegin();
+      status->decode(p);
+    } catch (buffer::error& e) {
+      ldpp_dout(dpp, 10) << "failed to decode status obj: "
+        << e.what() << dendl;
+      return -EIO;
+    }
+  } else {
+    return -EIO;
+  }
+
+  return 0;
+}
+
+static int put_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Store *store,
+    const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status)
+{
+  int ret = 0;
+  rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(store);
+
+  if (!rados) {
+    ldpp_dout(dpp, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl;
+    return -1;
+  }
+
+  auto& pool = status_obj->pool;
+  const auto oid = status_obj->oid;
+  auto obj_ctx = rados->svc()->sysobj->init_obj_ctx();
+  bufferlist bl;
+  status->encode(bl);
+
+  ret = rgw_put_system_obj(dpp, obj_ctx, pool, oid, bl, true, nullptr,
+      real_time{}, null_yield);
+
+  return ret;
+}
+
+static int delete_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Store *store,
+    const rgw_raw_obj *status_obj)
+{
+  int ret = 0;
+  rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(store);
+
+  if (!rados) {
+    ldpp_dout(dpp, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl;
+    return -1;
+  }
+
+  auto& pool = status_obj->pool;
+  const auto oid = status_obj->oid;
+  auto sysobj = rados->svc()->sysobj;
+
+  ret = rgw_delete_system_obj(dpp, sysobj, pool, oid, nullptr, null_yield);
+
+  return ret;
+}
+
 static std::set<string> keep_headers = { "CONTENT_TYPE",
-  "CONTENT_ENCODING",
-  "CONTENT_DISPOSITION",
-  "CONTENT_LANGUAGE" };
+                                         "CONTENT_ENCODING",
+                                         "CONTENT_DISPOSITION",
+                                         "CONTENT_LANGUAGE" };
 
 /*
  * mapping between rgw object attrs and output http fields
@@ -75,7 +205,7 @@ static std::set<string> keep_headers = { "CONTENT_TYPE",
 }; */
 
 static void init_headers(map<string, bufferlist>& attrs,
-                         map<string, string>& headers)
+    map<string, string>& headers)
 {
   for (auto& kv : attrs) {
     const char * name = kv.first.c_str();
@@ -83,14 +213,11 @@ static void init_headers(map<string, bufferlist>& attrs,
 
     if (aiter != std::end(rgw_to_http_attrs)) {
       headers[aiter->second] = rgw_bl_str(kv.second);
-    } else if (strcmp(name, RGW_ATTR_SLO_UINDICATOR) == 0) {
-      // this attr has an extra length prefix from encode() in prior versions
-      headers["X-Object-Meta-Static-Large-Object"] = "True";
     } else if (strncmp(name, RGW_ATTR_META_PREFIX,
           sizeof(RGW_ATTR_META_PREFIX)-1) == 0) {
       name += sizeof(RGW_ATTR_META_PREFIX) - 1;
       string sname(name);
-      string name_prefix = "X-Object-Meta-";
+      string name_prefix = RGW_ATTR_META_PREFIX;
       char full_name_buf[name_prefix.size() + sname.size() + 1];
       snprintf(full_name_buf, sizeof(full_name_buf), "%.*s%.*s",
           static_cast<int>(name_prefix.length()),
@@ -99,71 +226,104 @@ static void init_headers(map<string, bufferlist>& attrs,
           sname.data());
       headers[full_name_buf] = rgw_bl_str(kv.second);
     } else if (strcmp(name,RGW_ATTR_CONTENT_TYPE) == 0) {
-      /* Verify if its right way to copy this field */
       headers["CONTENT_TYPE"] = rgw_bl_str(kv.second);
     }
   }
 }
 
-int RGWLCStreamGetCRF::init(const DoutPrefixProvider *dpp)  {
-    /* init input connection */
-    req_params.get_op = false; /* Need only headers */
-    req_params.prepend_metadata = true;
-    req_params.rgwx_stat = true;
-    req_params.sync_manifest = true;
-    req_params.skip_decrypt = true;
+/* Read object or just head from remote endpoint. For now initializes only headers,
+ * but can be extended to fetch etag, mtime etc if needed.
+ */
+static int cloud_tier_get_object(RGWLCCloudTierCtx& tier_ctx, bool head,
+                         std::map<std::string, std::string>& headers) {
+  RGWRESTConn::get_obj_params req_params;
+  RGWBucketInfo b;
+  std::string target_obj_name;
+  int ret = 0;
+  std::unique_ptr<rgw::sal::Bucket> dest_bucket;
+  std::unique_ptr<rgw::sal::Object> dest_obj;
+  rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
+        tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
+        tier_ctx.target_storage_class);
+  std::string etag;
+  RGWRESTStreamRWRequest *in_req;
+
+  b.bucket.name = tier_ctx.target_bucket_name;
+  target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
+                    tier_ctx.obj->get_name();
+  if (!tier_ctx.o.is_current()) {
+    target_obj_name += get_key_instance(tier_ctx.obj->get_key());
+  }
 
-    string etag;
-    real_time set_mtime;
+  ret = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
+  if (ret < 0) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << ret << dendl;
+    return ret;
+  }
 
-    int ret = conn->get_obj(dpp, dest_obj, req_params, true /* send */, &in_req);
-    if (ret < 0) {
-      ldout(cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
-      return ret;
-    }
+  dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
+  if (!dest_obj) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
+    return -1;
+  }
+  /* init input connection */
+  req_params.get_op = !head;
+  req_params.prepend_metadata = true;
+  req_params.rgwx_stat = true;
+  req_params.sync_manifest = true;
+  req_params.skip_decrypt = true;
+
+  ret = tier_ctx.conn.get_obj(tier_ctx.dpp, dest_obj.get(), req_params, true /* send */, &in_req);
+  if (ret < 0) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: " << __func__ << "(): conn.get_obj() returned ret=" << ret << dendl;
+    return ret;
+  }
 
-    /* fetch only headers */
-    ret = conn->complete_request(in_req, nullptr, nullptr, nullptr, nullptr, &headers, null_yield);
-    if (ret < 0 && ret != -ENOENT) {
-      ldout(cct, 20) << "ERROR: " << __func__ << "(): conn->complete_request() returned ret=" << ret << dendl;
-      return ret;
-    }
-    return 0;
+  /* fetch headers */
+  ret = tier_ctx.conn.complete_request(in_req, nullptr, nullptr, nullptr, nullptr, &headers, null_yield);
+  if (ret < 0 && ret != -ENOENT) {
+    ldpp_dout(tier_ctx.dpp, 20) << "ERROR: " << __func__ << "(): conn.complete_request() returned ret=" << ret << dendl;
+    return ret;
   }
+  return 0;
+}
 
-int RGWLCStreamGetCRF::is_already_tiered() {
-    char buf[32];
-    map<string, string> attrs = headers;
+static bool is_already_tiered(const DoutPrefixProvider *dpp,
+                             std::map<std::string, std::string>& headers,
+                             ceph::real_time& mtime) {
+  char buf[32];
+  map<string, string> attrs = headers;
 
-    for (const auto& a : attrs) {
-      ldout(cct, 20) << "GetCrf attr[" << a.first << "] = " << a.second <<dendl;
-    }
-    utime_t ut(obj_properties.mtime);
-    snprintf(buf, sizeof(buf), "%lld.%09lld",
-        (long long)ut.sec(),
-        (long long)ut.nsec());
+  for (const auto& a : attrs) {
+    ldpp_dout(dpp, 20) << "GetCrf attr[" << a.first << "] = " << a.second <<dendl;
+  }
+  utime_t ut(mtime);
+  snprintf(buf, sizeof(buf), "%lld.%09lld",
+      (long long)ut.sec(),
+      (long long)ut.nsec());
 
-    string s = attrs["X_AMZ_META_RGWX_SOURCE_MTIME"];
+  string s = attrs["X_AMZ_META_RGWX_SOURCE_MTIME"];
 
-    if (s.empty())
-      s = attrs["x_amz_meta_rgwx_source_mtime"];
+  if (s.empty())
+    s = attrs["x_amz_meta_rgwx_source_mtime"];
 
-    ldout(cct, 20) << "is_already_tiered attrs[X_AMZ_META_RGWX_SOURCE_MTIME] = " << s <<dendl;
-    ldout(cct, 20) << "is_already_tiered mtime buf = " << buf <<dendl;
+  ldpp_dout(dpp, 20) << "is_already_tiered attrs[X_AMZ_META_RGWX_SOURCE_MTIME] = " << s <<dendl;
+  ldpp_dout(dpp, 20) << "is_already_tiered mtime buf = " << buf <<dendl;
 
-    if (!s.empty() && !strcmp(s.c_str(), buf)){
-      return 1;
-    }
-    return 0;
+  if (!s.empty() && !strcmp(s.c_str(), buf)){
+    return 1;
   }
+  return 0;
+}
 
-class RGWLCStreamReadCRF : public RGWStreamReadCRF
+/* Read object locally & also initialize dest rest obj based on read attrs */
+class RGWLCStreamRead
 {
   CephContext *cct;
   const DoutPrefixProvider *dpp;
-  map<string, bufferlist> attrs;
+  std::map<std::string, bufferlist> attrs;
   uint64_t obj_size;
-  std::unique_ptr<rgw::sal::Object>* obj;
+  rgw::sal::Object *obj;
   const real_time &mtime;
 
   bool multipart;
@@ -171,526 +331,581 @@ class RGWLCStreamReadCRF : public RGWStreamReadCRF
   off_t m_part_off;
   off_t m_part_end;
 
-  public:
-  RGWLCStreamReadCRF(CephContext *_cct, const DoutPrefixProvider *_dpp,
-                     RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object>* _obj,
-                     const real_time &_mtime) :
-                     RGWStreamReadCRF(_obj, obj_ctx), cct(_cct),
-                     dpp(_dpp), obj(_obj), mtime(_mtime) {}
+  std::unique_ptr<rgw::sal::Object::ReadOp> read_op;
+  off_t ofs;
+  off_t end;
+  rgw_rest_obj rest_obj;
 
-  ~RGWLCStreamReadCRF() {};
+  int retcode;
 
-  void set_multipart(uint64_t part_size, off_t part_off, off_t part_end) {
-    multipart = true;
-    m_part_size = part_size;
-    m_part_off = part_off;
-    m_part_end = part_end;
-  }
+  public:
+  RGWLCStreamRead(CephContext *_cct, const DoutPrefixProvider *_dpp,
+      RGWObjectCtx& obj_ctx, rgw::sal::Object *_obj,
+      const real_time &_mtime) :
+    cct(_cct), dpp(_dpp), obj(_obj), mtime(_mtime),
+    read_op(obj->get_read_op(&obj_ctx)) {}
+
+  ~RGWLCStreamRead() {};
+  int set_range(off_t _ofs, off_t _end);
+  int get_range(off_t &_ofs, off_t &_end);
+  rgw_rest_obj& get_rest_obj();
+  void set_multipart(uint64_t part_size, off_t part_off, off_t part_end);
+  int init();
+  int init_rest_obj();
+  int read(off_t ofs, off_t end, RGWGetDataCB *out_cb);
+};
 
-  int init() override {
-    optional_yield y = null_yield;
-    real_time read_mtime;
+/* Send PUT op to remote endpoint */
+class RGWLCCloudStreamPut
+{
+  const DoutPrefixProvider *dpp;
+  rgw_lc_obj_properties obj_properties;
+  RGWRESTConn& conn;
+  rgw::sal::Object *dest_obj;
+  std::string etag;
+  RGWRESTStreamS3PutObj *out_req{nullptr};
 
-    read_op->params.lastmod = &read_mtime;
+  struct multipart_info {
+    bool is_multipart{false};
+    std::string upload_id;
+    int part_num{0};
+    uint64_t part_size;
+  } multipart;
 
-    int ret = read_op->prepare(y, dpp);
-    if (ret < 0) {
-      ldout(cct, 0) << "ERROR: fail to prepare read_op, ret = " << ret << dendl;
-      return ret;
-    }
+  int retcode;
 
-    if (read_mtime != mtime) {
-      /* raced */
-      return -ECANCELED;
+  public:
+  RGWLCCloudStreamPut(const DoutPrefixProvider *_dpp,
+      const rgw_lc_obj_properties&  _obj_properties,
+      RGWRESTConn& _conn,
+      rgw::sal::Object *_dest_obj) :
+    dpp(_dpp), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) {
     }
+  int init();
+  static bool keep_attr(const std::string& h);
+  static void init_send_attrs(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj,
+      const rgw_lc_obj_properties& obj_properties,
+      std::map<std::string, std::string>& attrs);
+  void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj);
+  void handle_headers(const std::map<std::string, std::string>& headers);
+  bool get_etag(std::string *petag);
+  void set_multipart(const std::string& upload_id, int part_num, uint64_t part_size);
+  int send();
+  RGWGetDataCB *get_cb();
+  int complete_request();
+};
 
-    attrs = (*obj)->get_attrs();
-    obj_size = (*obj)->get_obj_size();
+int RGWLCStreamRead::set_range(off_t _ofs, off_t _end) {
+  ofs = _ofs;
+  end = _end;
 
-    ret = init_rest_obj();
-    if (ret < 0) {
-      ldout(cct, 0) << "ERROR: fail to initialize rest_obj, ret = " << ret << dendl;
-      return ret;
-    }
+  return 0;
+}
 
-    if (!multipart) {
-      set_range(0, obj_size - 1);
-    } else {
-      set_range(m_part_off, m_part_end);
-    }
-    return 0;
-  }
+int RGWLCStreamRead::get_range(off_t &_ofs, off_t &_end) {
+  _ofs = ofs;
+  _end = end;
 
-  int init_rest_obj() override {
-    /* Initialize rgw_rest_obj. 
-     * Reference: do_decode_rest_obj
-     * Check how to copy headers content */ 
-    rest_obj.init((*obj)->get_key());
+  return 0;
+}
 
-    if (!multipart) {
-      rest_obj.content_len = obj_size;
-    } else {
-      rest_obj.content_len = m_part_size;
-    }
+rgw_rest_obj& RGWLCStreamRead::get_rest_obj() {
+  return rest_obj;
+}
 
-    /* For mulitpart attrs are sent as part of InitMultipartCR itself */
-    if (multipart) {
-      return 0;
-    }
+void RGWLCStreamRead::set_multipart(uint64_t part_size, off_t part_off, off_t part_end) {
+  multipart = true;
+  m_part_size = part_size;
+  m_part_off = part_off;
+  m_part_end = part_end;
+}
 
-    /*
-     * XXX: verify if its right way to copy attrs into
-     * rest obj
-     */
-    init_headers(attrs, rest_obj.attrs);
-
-    rest_obj.acls.set_ctx(cct);
-    const auto aiter = attrs.find(RGW_ATTR_ACL);
-    if (aiter != attrs.end()) {
-      bufferlist& bl = aiter->second;
-      auto bliter = bl.cbegin();
-      try {
-        rest_obj.acls.decode(bliter);
-      } catch (buffer::error& err) {
-        ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl;
-        return -EIO;
-      }
-    } else {
-      ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl;
-    }
-    return 0;
-  }
+int RGWLCStreamRead::init() {
+  optional_yield y = null_yield;
+  real_time read_mtime;
 
-  int read(off_t ofs, off_t end, bufferlist &bl) {
-    optional_yield y = null_yield;
+  read_op->params.lastmod = &read_mtime;
 
-    return read_op->read(ofs, end, bl, y, dpp);
+  int ret = read_op->prepare(y, dpp);
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: fail to prepare read_op, ret = " << ret << dendl;
+    return ret;
   }
-};
-
 
-class RGWLCStreamPutCRF : public RGWStreamWriteHTTPResourceCRF
-{
-  CephContext *cct;
-  RGWHTTPManager *http_manager;
-  rgw_lc_obj_properties obj_properties;
-  std::shared_ptr<RGWRESTConn> conn;
-  rgw::sal::Object* dest_obj;
-  string etag;
+  if (read_mtime != mtime) {
+    /* raced */
+    return -ECANCELED;
+  }
 
-  public:
-  RGWLCStreamPutCRF(CephContext *_cct,
-      RGWCoroutinesEnv *_env,
-      RGWCoroutine *_caller,
-      RGWHTTPManager *_http_manager,
-      const rgw_lc_obj_properties&  _obj_properties,
-      std::shared_ptr<RGWRESTConn> _conn,
-      rgw::sal::Object* _dest_obj) :
-    RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _http_manager),
-    cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) {
-    }
+  attrs = obj->get_attrs();
+  obj_size = obj->get_obj_size();
 
+  ret = init_rest_obj();
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: fail to initialize rest_obj, ret = " << ret << dendl;
+    return ret;
+  }
 
-  int init() override {
-    /* init output connection */
-    RGWRESTStreamS3PutObj *out_req{nullptr};
+  if (!multipart) {
+    set_range(0, obj_size - 1);
+  } else {
+    set_range(m_part_off, m_part_end);
+  }
+  return 0;
+}
 
-    if (multipart.is_multipart) {
-      char buf[32];
-      snprintf(buf, sizeof(buf), "%d", multipart.part_num);
-      rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() },
-        { "partNumber", buf },
-        { nullptr, nullptr } };
-      conn->put_obj_send_init(dest_obj, params, &out_req);
-    } else {
-      conn->put_obj_send_init(dest_obj, nullptr, &out_req);
-    }
+int RGWLCStreamRead::init_rest_obj() {
+  /* Initialize rgw_rest_obj. 
+   * Reference: do_decode_rest_obj
+   * Check how to copy headers content */ 
+  rest_obj.init(obj->get_key());
 
-    set_req(out_req);
+  if (!multipart) {
+    rest_obj.content_len = obj_size;
+  } else {
+    rest_obj.content_len = m_part_size;
+  }
 
-    return RGWStreamWriteHTTPResourceCRF::init();
+  /* For mulitpart attrs are sent as part of InitMultipartCR itself */
+  if (multipart) {
+    return 0;
   }
 
-  static bool keep_attr(const string& h) {
-    return (keep_headers.find(h) != keep_headers.end() ||
-        boost::algorithm::starts_with(h, "X_AMZ_"));
+  /*
+   * XXX: verify if its right way to copy attrs into rest obj
+   */
+  init_headers(attrs, rest_obj.attrs);
+
+  rest_obj.acls.set_ctx(cct);
+  const auto aiter = attrs.find(RGW_ATTR_ACL);
+  if (aiter != attrs.end()) {
+    bufferlist& bl = aiter->second;
+    auto bliter = bl.cbegin();
+    try {
+      rest_obj.acls.decode(bliter);
+    } catch (buffer::error& err) {
+      ldpp_dout(dpp, 0) << "ERROR: failed to decode policy off attrs" << dendl;
+      return -EIO;
+    }
+  } else {
+    ldpp_dout(dpp, 0) << "WARNING: acl attrs not provided" << dendl;
   }
+  return 0;
+}
 
-  static void init_send_attrs(CephContext *cct, const rgw_rest_obj& rest_obj,
-                              const rgw_lc_obj_properties& obj_properties,
-                              map<string, string> *attrs) {
+int RGWLCStreamRead::read(off_t ofs, off_t end, RGWGetDataCB *out_cb) {
+  int ret = read_op->iterate(dpp, ofs, end, out_cb, null_yield);
+  return ret;
+}
 
-    map<string, RGWTierACLMapping>& acl_mappings(obj_properties.target_acl_mappings);
-    string target_storage_class = obj_properties.target_storage_class;
+int RGWLCCloudStreamPut::init() {
+  /* init output connection */
+  if (multipart.is_multipart) {
+    char buf[32];
+    snprintf(buf, sizeof(buf), "%d", multipart.part_num);
+    rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() },
+                                     { "partNumber", buf },
+                                     { nullptr, nullptr } };
+    conn.put_obj_send_init(dest_obj, params, &out_req);
+  } else {
+    conn.put_obj_send_init(dest_obj, nullptr, &out_req);
+  }
 
-    attrs->clear();
+  return 0;
+}
 
-    for (auto& hi : rest_obj.attrs) {
-      if (keep_attr(hi.first)) {
-        attrs->insert(hi);
-      }
-    }
+bool RGWLCCloudStreamPut::keep_attr(const string& h) {
+  return (keep_headers.find(h) != keep_headers.end() ||
+      boost::algorithm::starts_with(h, "X_AMZ_"));
+}
 
-    const auto acl = rest_obj.acls.get_acl();
+void RGWLCCloudStreamPut::init_send_attrs(const DoutPrefixProvider *dpp,
+    const rgw_rest_obj& rest_obj,
+    const rgw_lc_obj_properties& obj_properties,
+    std::map<string, string>& attrs) {
 
-    map<int, vector<string> > access_map;
+  map<string, RGWTierACLMapping>& acl_mappings(obj_properties.target_acl_mappings);
+  const std::string& target_storage_class = obj_properties.target_storage_class;
 
-    if (!acl_mappings.empty()) {
-      for (auto& grant : acl.get_grant_map()) {
-        auto& orig_grantee = grant.first;
-        auto& perm = grant.second;
+  attrs.clear();
 
-        string grantee;
+  for (auto& hi : rest_obj.attrs) {
+    if (keep_attr(hi.first)) {
+      attrs.insert(hi);
+    }
+  }
 
-        const auto& am = acl_mappings;
+  const auto acl = rest_obj.acls.get_acl();
 
-        const auto iter = am.find(orig_grantee);
-        if (iter == am.end()) {
-          ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
-          continue;
-        }
+  map<int, vector<string> > access_map;
 
-        grantee = iter->second.dest_id;
-
-        string type;
-
-        switch (iter->second.type) {
-          case ACL_TYPE_CANON_USER:
-            type = "id";
-            break;
-          case ACL_TYPE_EMAIL_USER:
-            type = "emailAddress";
-            break;
-          case ACL_TYPE_GROUP:
-            type = "uri";
-            break;
-          default:
-            continue;
-        }
+  if (!acl_mappings.empty()) {
+    for (auto& grant : acl.get_grant_map()) {
+      auto& orig_grantee = grant.first;
+      auto& perm = grant.second;
 
-        string tv = type + "=" + grantee;
+      string grantee;
 
-        int flags = perm.get_permission().get_permissions();
-        if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) {
-          access_map[flags].push_back(tv);
-          continue;
-        }
+      const auto& am = acl_mappings;
 
-        for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) {
-          if (flags & i) {
-            access_map[i].push_back(tv);
-          }
-        }
+      const auto iter = am.find(orig_grantee);
+      if (iter == am.end()) {
+        ldpp_dout(dpp, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
+        continue;
       }
-    }
 
-    for (const auto& aiter : access_map) {
-      int grant_type = aiter.first;
+      grantee = iter->second.dest_id;
 
-      string header_str("x-amz-grant-");
+      string type;
 
-      switch (grant_type) {
-        case RGW_PERM_READ:
-          header_str.append("read");
-          break;
-        case RGW_PERM_WRITE:
-          header_str.append("write");
-          break;
-        case RGW_PERM_READ_ACP:
-          header_str.append("read-acp");
+      switch (iter->second.type) {
+        case ACL_TYPE_CANON_USER:
+          type = "id";
           break;
-        case RGW_PERM_WRITE_ACP:
-          header_str.append("write-acp");
+        case ACL_TYPE_EMAIL_USER:
+          type = "emailAddress";
           break;
-        case RGW_PERM_FULL_CONTROL:
-          header_str.append("full-control");
+        case ACL_TYPE_GROUP:
+          type = "uri";
           break;
+        default:
+          continue;
       }
 
-      string s;
+      string tv = type + "=" + grantee;
 
-      for (const auto& viter : aiter.second) {
-        if (!s.empty()) {
-          s.append(", ");
-        }
-        s.append(viter);
+      int flags = perm.get_permission().get_permissions();
+      if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) {
+        access_map[flags].push_back(tv);
+        continue;
       }
 
-      ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
-
-      (*attrs)[header_str] = s;
+      for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) {
+        if (flags & i) {
+          access_map[i].push_back(tv);
+        }
+      }
     }
+  }
 
-    /* Copy target storage class */
-    if (!target_storage_class.empty()) {
-      (*attrs)["x-amz-storage-class"] = target_storage_class;
-    } else {
-      (*attrs)["x-amz-storage-class"] = "STANDARD";
+  for (const auto& aiter : access_map) {
+    int grant_type = aiter.first;
+
+    string header_str("x-amz-grant-");
+
+    switch (grant_type) {
+      case RGW_PERM_READ:
+        header_str.append("read");
+        break;
+      case RGW_PERM_WRITE:
+        header_str.append("write");
+        break;
+      case RGW_PERM_READ_ACP:
+        header_str.append("read-acp");
+        break;
+      case RGW_PERM_WRITE_ACP:
+        header_str.append("write-acp");
+        break;
+      case RGW_PERM_FULL_CONTROL:
+        header_str.append("full-control");
+        break;
     }
 
-    /* New attribute to specify its transitioned from RGW */
-    (*attrs)["x-amz-meta-rgwx-source"] = "rgw";
+    string s;
 
-    char buf[32];
-    snprintf(buf, sizeof(buf), "%llu", (long long)obj_properties.versioned_epoch);
-    (*attrs)["x-amz-meta-rgwx-versioned-epoch"] = buf;
-
-    utime_t ut(obj_properties.mtime);
-    snprintf(buf, sizeof(buf), "%lld.%09lld",
-        (long long)ut.sec(),
-        (long long)ut.nsec());
-
-    (*attrs)["x-amz-meta-rgwx-source-mtime"] = buf;
-    (*attrs)["x-amz-meta-rgwx-source-etag"] = obj_properties.etag;
-    (*attrs)["x-amz-meta-rgwx-source-key"] = rest_obj.key.name;
-    if (!rest_obj.key.instance.empty()) {
-      (*attrs)["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
-    }
-    for (const auto& a : (*attrs)) {
-      ldout(cct, 30) << "init_send_attrs attr[" << a.first << "] = " << a.second <<dendl;
+    for (const auto& viter : aiter.second) {
+      if (!s.empty()) {
+        s.append(", ");
+      }
+      s.append(viter);
     }
+
+    ldpp_dout(dpp, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
+
+    attrs[header_str] = s;
   }
 
-  void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) override {
-    RGWRESTStreamS3PutObj *r = static_cast<RGWRESTStreamS3PutObj *>(req);
+  /* Copy target storage class */
+  if (!target_storage_class.empty()) {
+    attrs["x-amz-storage-class"] = target_storage_class;
+  } else {
+    attrs["x-amz-storage-class"] = "STANDARD";
+  }
 
-    map<string, string> new_attrs;
-    if (!multipart.is_multipart) {
-      init_send_attrs(cct, rest_obj, obj_properties, &new_attrs);
-    }
+  /* New attribute to specify its transitioned from RGW */
+  attrs["x-amz-meta-rgwx-source"] = "rgw";
 
-    r->set_send_length(rest_obj.content_len);
+  char buf[32];
+  snprintf(buf, sizeof(buf), "%llu", (long long)obj_properties.versioned_epoch);
+  attrs["x-amz-meta-rgwx-versioned-epoch"] = buf;
 
-    RGWAccessControlPolicy policy;
+  utime_t ut(obj_properties.mtime);
+  snprintf(buf, sizeof(buf), "%lld.%09lld",
+      (long long)ut.sec(),
+      (long long)ut.nsec());
 
-    r->send_ready(dpp, conn->get_key(), new_attrs, policy);
+  attrs["x-amz-meta-rgwx-source-mtime"] = buf;
+  attrs["x-amz-meta-rgwx-source-etag"] = obj_properties.etag;
+  attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name;
+  if (!rest_obj.key.instance.empty()) {
+    attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
+  }
+  for (const auto& a : attrs) {
+    ldpp_dout(dpp, 30) << "init_send_attrs attr[" << a.first << "] = " << a.second <<dendl;
   }
+}
 
-  void handle_headers(const map<string, string>& headers) {
-    for (const auto& h : headers) {
-      if (h.first == "ETAG") {
-        etag = h.second;
-      }
-    }
+void RGWLCCloudStreamPut::send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) {
+  auto r = static_cast<RGWRESTStreamS3PutObj *>(out_req);
+
+  std::map<std::string, std::string> new_attrs;
+  if (!multipart.is_multipart) {
+    init_send_attrs(dpp, rest_obj, obj_properties, new_attrs);
   }
 
-  bool get_etag(string *petag) {
-    if (etag.empty()) {
-      return false;
+  r->set_send_length(rest_obj.content_len);
+
+  RGWAccessControlPolicy policy;
+
+  r->send_ready(dpp, conn.get_key(), new_attrs, policy);
+}
+
+void RGWLCCloudStreamPut::handle_headers(const map<string, string>& headers) {
+  for (const auto& h : headers) {
+    if (h.first == "ETAG") {
+      etag = h.second;
     }
-    *petag = etag;
-    return true;
   }
-};
+}
 
+bool RGWLCCloudStreamPut::get_etag(string *petag) {
+  if (etag.empty()) {
+    return false;
+  }
+  *petag = etag;
+  return true;
+}
+
+void RGWLCCloudStreamPut::set_multipart(const string& upload_id, int part_num, uint64_t part_size) {
+  multipart.is_multipart = true;
+  multipart.upload_id = upload_id;
+  multipart.part_num = part_num;
+  multipart.part_size = part_size;
+}
 
+int RGWLCCloudStreamPut::send() {
+  int ret = RGWHTTP::send(out_req);
+  return ret;
+}
 
-class RGWLCStreamObjToCloudPlainCR : public RGWCoroutine {
-  RGWLCCloudTierCtx& tier_ctx;
+RGWGetDataCB *RGWLCCloudStreamPut::get_cb() {
+  return out_req->get_out_cb();
+}
 
-  std::shared_ptr<RGWStreamReadCRF> in_crf;
-  std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+int RGWLCCloudStreamPut::complete_request() {
+  int ret = conn.complete_request(out_req, etag, &obj_properties.mtime, null_yield);
+  return ret;
+}
 
-  std::unique_ptr<rgw::sal::Bucket> dest_bucket;
-  std::unique_ptr<rgw::sal::Object> dest_obj;
+/* Read local copy and write to Cloud endpoint */
+static int cloud_tier_transfer_object(const DoutPrefixProvider* dpp,
+                            RGWLCStreamRead* readf, RGWLCCloudStreamPut* writef) {
+  std::string url;
+  bufferlist bl;
+  bool sent_attrs{false};
+  int ret{0};
+  off_t ofs;
+  off_t end;
 
-  rgw_lc_obj_properties obj_properties;
-  RGWBucketInfo b;
-  string target_obj_name;
+  ret = readf->init();
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: fail to initialize in_crf, ret = " << ret << dendl;
+    return ret;
+  }
+  readf->get_range(ofs, end);
+  rgw_rest_obj& rest_obj = readf->get_rest_obj();
+  if (!sent_attrs) {
+    ret = writef->init();
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "ERROR: fail to initialize out_crf, ret = " << ret << dendl;
+      return ret;
+    }
 
-  rgw::sal::Object *o;
+    writef->send_ready(dpp, rest_obj);
+    ret = writef->send();
+    if (ret < 0) {
+      return ret;
+    }
+    sent_attrs = true;
+  }
 
-  public:
-  RGWLCStreamObjToCloudPlainCR(RGWLCCloudTierCtx& _tier_ctx)
-    : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx),
-          obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
-                         tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
-                         tier_ctx.target_storage_class){}
-
-  int operate(const DoutPrefixProvider *dpp) {
-
-    reenter(this) {
-      b.bucket.name = tier_ctx.target_bucket_name;
-      target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
-                        (*tier_ctx.obj)->get_name();
-      if (!tier_ctx.o.is_current()) {
-        target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
-      }
+  ret = readf->read(ofs, end, writef->get_cb());
 
-      retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
-      if (retcode < 0) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , retcode = " << retcode << dendl;
-        return retcode;
-      }
-  
-      dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
-      if (!dest_obj) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
-        return -1;
-      }
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: fail to read from in_crf, ret = " << ret << dendl;
+    return ret;
+  }
 
-      o = dest_obj.get();
+  ret = writef->complete_request();
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: fail to complete request, ret = " << ret << dendl;
+    return ret;
+  }
 
-    //  tier_ctx.obj.set_atomic(&tier_ctx.rctx); -- might need when updated to zipper SAL
+  return 0;
+}
 
-      /* Prepare Read from source */
-      in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, dpp,
-                   tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
+static int cloud_tier_plain_transfer(RGWLCCloudTierCtx& tier_ctx) {
+  int ret;
+  std::unique_ptr<rgw::sal::Bucket> dest_bucket;
+  std::unique_ptr<rgw::sal::Object> dest_obj;
 
-      out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this,
-                    (RGWHTTPManager*)(tier_ctx.http_manager), obj_properties, tier_ctx.conn, o));
+  rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
+                        tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
+                        tier_ctx.target_storage_class);
+  RGWBucketInfo b;
+  std::string target_obj_name;
 
-      /* actual Read & Write */
-      yield call(new RGWStreamWriteCR(cct, (RGWHTTPManager*)(tier_ctx.http_manager), in_crf, out_crf));
-      if (retcode < 0) {
-        return set_cr_error(retcode);
-      }
+  b.bucket.name = tier_ctx.target_bucket_name;
+  target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
+    tier_ctx.obj->get_name();
+  if (!tier_ctx.o.is_current()) {
+    target_obj_name += get_key_instance(tier_ctx.obj->get_key());
+  }
 
-      return set_cr_done(); 
-    }
+  ret = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
+  if (ret < 0) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , ret = " << ret << dendl;
+    return ret;
+  }
 
-    return 0;
+  dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
+  if (!dest_obj) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
+    return -1;
   }
-};
 
-class RGWLCStreamObjToCloudMultipartPartCR : public RGWCoroutine {
-  RGWLCCloudTierCtx& tier_ctx;
+  tier_ctx.obj->set_atomic(&tier_ctx.rctx);
+
+  /* Prepare Read from source */
+  /* TODO: Define readf, writef as stack variables. For some reason,
+   * when used as stack variables (esp., readf), the transition seems to
+   * be taking lot of time eventually erroring out at times.
+   */
+  std::shared_ptr<RGWLCStreamRead> readf;
+  readf.reset(new RGWLCStreamRead(tier_ctx.cct, tier_ctx.dpp,
+        tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
 
-  string upload_id;
+  std::shared_ptr<RGWLCCloudStreamPut> writef;
+  writef.reset(new RGWLCCloudStreamPut(tier_ctx.dpp, obj_properties, tier_ctx.conn,
+               dest_obj.get()));
 
-  rgw_lc_multipart_part_info part_info;
+  /* actual Read & Write */
+  ret = cloud_tier_transfer_object(tier_ctx.dpp, readf.get(), writef.get());
 
-  string *petag;
-  std::shared_ptr<RGWStreamReadCRF> in_crf;
-  std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+  return ret;
+}
 
+static int cloud_tier_send_multipart_part(RGWLCCloudTierCtx& tier_ctx,
+                                const std::string& upload_id,
+                                const rgw_lc_multipart_part_info& part_info,
+                                std::string *petag) {
+  int ret;
   std::unique_ptr<rgw::sal::Bucket> dest_bucket;
   std::unique_ptr<rgw::sal::Object> dest_obj;
 
-  rgw_lc_obj_properties obj_properties;
+  rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
+                        tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
+                        tier_ctx.target_storage_class);
   RGWBucketInfo b;
-  string target_obj_name;
+  std::string target_obj_name;
   off_t end;
 
-  public:
-  RGWLCStreamObjToCloudMultipartPartCR(RGWLCCloudTierCtx& _tier_ctx, const string& _upload_id,
-                                       const rgw_lc_multipart_part_info& _part_info,
-                                       string *_petag) : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx),
-                                       upload_id(_upload_id), part_info(_part_info), petag(_petag),
-          obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
-                         tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
-                         tier_ctx.target_storage_class){}
-
-  int operate(const DoutPrefixProvider *dpp) override {
-    reenter(this) {
-      b.bucket.name = tier_ctx.target_bucket_name;
-      target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
-                        (*tier_ctx.obj)->get_name();
-      if (!tier_ctx.o.is_current()) {
-        target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
-      }
-
-      retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
-      if (retcode < 0) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , retcode = " << retcode << dendl;
-        return retcode;
-      }
-  
-      dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
-      if (!dest_obj) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
-        return -1;
-      }
+  b.bucket.name = tier_ctx.target_bucket_name;
+  target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
+    tier_ctx.obj->get_name();
+  if (!tier_ctx.o.is_current()) {
+    target_obj_name += get_key_instance(tier_ctx.obj->get_key());
+  }
 
-    //  tier_ctx.obj.set_atomic(&tier_ctx.rctx); -- might need when updated to zipper SAL
+  ret = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
+  if (ret < 0) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , ret = " << ret << dendl;
+    return ret;
+  }
 
-      /* Prepare Read from source */
-      in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp, 
-                   tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
+  dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
+  if (!dest_obj) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
+    return -1;
+  }
 
-      end = part_info.ofs + part_info.size - 1;
-      std::static_pointer_cast<RGWLCStreamReadCRF>(in_crf)->set_multipart(part_info.size, part_info.ofs, end);
+  tier_ctx.obj->set_atomic(&tier_ctx.rctx);
 
-      /* Prepare write */
-      out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this,
-                    (RGWHTTPManager*)(tier_ctx.http_manager), obj_properties, tier_ctx.conn,
-                    dest_obj.get()));
+  /* TODO: Define readf, writef as stack variables. For some reason,
+   * when used as stack variables (esp., readf), the transition seems to
+   * be taking lot of time eventually erroring out at times. */
+  std::shared_ptr<RGWLCStreamRead> readf;
+  readf.reset(new RGWLCStreamRead(tier_ctx.cct, tier_ctx.dpp,
+        tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
 
-      out_crf->set_multipart(upload_id, part_info.part_num, part_info.size);
+  std::shared_ptr<RGWLCCloudStreamPut> writef;
+  writef.reset(new RGWLCCloudStreamPut(tier_ctx.dpp, obj_properties, tier_ctx.conn,
+               dest_obj.get()));
 
-      /* actual Read & Write */
-      yield call(new RGWStreamWriteCR(cct, (RGWHTTPManager*)(tier_ctx.http_manager), in_crf, out_crf));
-      if (retcode < 0) {
-        return set_cr_error(retcode);
-      }
+  /* Prepare Read from source */
+  end = part_info.ofs + part_info.size - 1;
+  readf->set_multipart(part_info.size, part_info.ofs, end);
 
-      if (!(static_cast<RGWLCStreamPutCRF *>(out_crf.get()))->get_etag(petag)) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to get etag from PUT request" << dendl;
-        return set_cr_error(-EIO);
-      }
+  /* Prepare write */
+  writef->set_multipart(upload_id, part_info.part_num, part_info.size);
 
-      return set_cr_done(); 
-    }
+  /* actual Read & Write */
+  ret = cloud_tier_transfer_object(tier_ctx.dpp, readf.get(), writef.get());
+  if (ret < 0) {
+    return ret;
+  }
 
-    return 0;
+  if (!(writef->get_etag(petag))) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to get etag from PUT request" << dendl;
+    return -EIO;
   }
-};
 
-class RGWLCAbortMultipartCR : public RGWCoroutine {
-  CephContext *cct;
-  RGWHTTPManager *http_manager;
-  RGWRESTConn *dest_conn;
-  rgw_obj dest_obj;
+  return 0;
+}
 
-  string upload_id;
+static int cloud_tier_abort_multipart(const DoutPrefixProvider *dpp,
+      RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
+      const std::string& upload_id) {
+  int ret;
+  bufferlist out_bl;
+  bufferlist bl;
+  rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
 
-  public:
-  RGWLCAbortMultipartCR(CephContext *_cct, RGWHTTPManager *_http_manager,
-                        RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj,
-                        const string& _upload_id) : RGWCoroutine(_cct),
-                        cct(_cct), http_manager(_http_manager),
-                        dest_conn(_dest_conn), dest_obj(_dest_obj),
-                        upload_id(_upload_id) {}
-
-  int operate(const DoutPrefixProvider *dpp) override {
-    reenter(this) {
-
-      yield {
-        rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
-        bufferlist bl;
-        call(new RGWDeleteRESTResourceCR(cct, dest_conn, http_manager,
-                                         obj_to_aws_path(dest_obj), params));
-      }
+  string resource = obj_to_aws_path(dest_obj);
+  ret = dest_conn.send_resource(dpp, "DELETE", resource, params, nullptr,
+      out_bl, &bl, nullptr, null_yield);
 
-      if (retcode < 0) {
-        ldout(cct, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl;
-        return set_cr_error(retcode);
-      }
 
-      return set_cr_done();
-    }
-
-    return 0;
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (ret=" << ret << ")" << dendl;
+    return ret;
   }
-};
-
-class RGWLCInitMultipartCR : public RGWCoroutine {
-  CephContext *cct;
-  RGWHTTPManager *http_manager;
-  RGWRESTConn *dest_conn;
-  rgw_obj dest_obj;
 
-  uint64_t obj_size;
-  map<string, string> attrs;
+  return 0;
+}
 
+static int cloud_tier_init_multipart(const DoutPrefixProvider *dpp,
+      RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
+      uint64_t obj_size, std::map<std::string, std::string>& attrs,
+      std::string& upload_id) {
   bufferlist out_bl;
-
-  string *upload_id;
+  bufferlist bl;
 
   struct InitMultipartResult {
-    string bucket;
-    string key;
-    string upload_id;
+    std::string bucket;
+    std::string key;
+    std::string upload_id;
 
     void decode_xml(XMLObj *obj) {
       RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
@@ -699,81 +914,67 @@ class RGWLCInitMultipartCR : public RGWCoroutine {
     }
   } result;
 
-  public:
-  RGWLCInitMultipartCR(CephContext *_cct, RGWHTTPManager *_http_manager,
-                       RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj,
-                       uint64_t _obj_size, const map<string, string>& _attrs,
-                       string *_upload_id) : RGWCoroutine(_cct), cct(_cct),
-                       http_manager(_http_manager), dest_conn(_dest_conn),
-                       dest_obj(_dest_obj), obj_size(_obj_size),
-                       attrs(_attrs), upload_id(_upload_id) {}
-
-  int operate(const DoutPrefixProvider *dpp) override {
-    reenter(this) {
-
-      yield {
-        rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
-        bufferlist bl;
-        call(new RGWPostRawRESTResourceCR <bufferlist> (cct, dest_conn, http_manager,
-              obj_to_aws_path(dest_obj), params, &attrs, bl, &out_bl));
-      }
+  int ret;
+  rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
 
-      if (retcode < 0) {
-        ldout(cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
-        return set_cr_error(retcode);
-      }
-      {
-        /*
-         * If one of the following fails we cannot abort upload, as we cannot
-         * extract the upload id. If one of these fail it's very likely that that's
-         * the least of our problem.
-         */
-        RGWXMLDecoder::XMLParser parser;
-        if (!parser.init()) {
-          ldout(cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
-          return set_cr_error(-EIO);
-        }
+  string resource = obj_to_aws_path(dest_obj);
 
-        if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
-          string str(out_bl.c_str(), out_bl.length());
-          ldout(cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
-          return set_cr_error(-EIO);
-        }
+  ret = dest_conn.send_resource(dpp, "POST", resource, params, &attrs,
+      out_bl, &bl, nullptr, null_yield);
 
-        try {
-          RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true);
-        } catch (RGWXMLDecoder::err& err) {
-          string str(out_bl.c_str(), out_bl.length());
-          ldout(cct, 5) << "ERROR: unexpected xml: " << str << dendl;
-          return set_cr_error(-EIO);
-        }
-      }
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
+    return ret;
+  }
+  /*
+   * If one of the following fails we cannot abort upload, as we cannot
+   * extract the upload id. If one of these fail it's very likely that that's
+   * the least of our problem.
+   */
+  RGWXMLDecoder::XMLParser parser;
+  if (!parser.init()) {
+    ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
+    return -EIO;
+  }
 
-      ldout(cct, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl;
+  if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+    string str(out_bl.c_str(), out_bl.length());
+    ldpp_dout(dpp, 5) << "ERROR: failed to parse xml initmultipart: " << str << dendl;
+    return -EIO;
+  }
 
-      *upload_id = result.upload_id;
+  try {
+    RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true);
+  } catch (RGWXMLDecoder::err& err) {
+    string str(out_bl.c_str(), out_bl.length());
+    ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
+    return -EIO;
+  }
 
-      return set_cr_done();
-    }
+  ldpp_dout(dpp, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl;
 
-    return 0;
-  }
-};
+  upload_id = result.upload_id;
 
-class RGWLCCompleteMultipartCR : public RGWCoroutine {
-  CephContext *cct;
-  RGWHTTPManager *http_manager;
-  RGWRESTConn *dest_conn;
-  rgw_obj dest_obj;
+  return 0;
+}
 
-  bufferlist out_bl;
+static int cloud_tier_complete_multipart(const DoutPrefixProvider *dpp,
+      RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
+      std::string& upload_id,
+      const std::map<int, rgw_lc_multipart_part_info>& parts) {
+  rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
+
+  stringstream ss;
+  XMLFormatter formatter;
+  int ret;
 
-  string upload_id;
+  bufferlist bl, out_bl;
+  string resource = obj_to_aws_path(dest_obj);
 
   struct CompleteMultipartReq {
-    map<int, rgw_lc_multipart_part_info> parts;
+    std::map<int, rgw_lc_multipart_part_info> parts;
 
-    explicit CompleteMultipartReq(const map<int, rgw_lc_multipart_part_info>& _parts) : parts(_parts) {}
+    explicit CompleteMultipartReq(const std::map<int, rgw_lc_multipart_part_info>& _parts) : parts(_parts) {}
 
     void dump_xml(Formatter *f) const {
       for (const auto& p : parts) {
@@ -783,13 +984,13 @@ class RGWLCCompleteMultipartCR : public RGWCoroutine {
         f->close_section();
       };
     }
-  } req_enc;
+  } req_enc(parts);
 
   struct CompleteMultipartResult {
-    string location;
-    string bucket;
-    string key;
-    string etag;
+    std::string location;
+    std::string bucket;
+    std::string key;
+    std::string etag;
 
     void decode_xml(XMLObj *obj) {
       RGWXMLDecoder::decode_xml("Location", bucket, obj);
@@ -799,396 +1000,332 @@ class RGWLCCompleteMultipartCR : public RGWCoroutine {
     }
   } result;
 
-  public:
-  RGWLCCompleteMultipartCR(CephContext *_cct, RGWHTTPManager *_http_manager,
-                           RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj,
-                           string _upload_id, const map<int, rgw_lc_multipart_part_info>& _parts) :
-                           RGWCoroutine(_cct), cct(_cct), http_manager(_http_manager),
-                           dest_conn(_dest_conn), dest_obj(_dest_obj), upload_id(_upload_id),
-                           req_enc(_parts) {}
-
-  int operate(const DoutPrefixProvider *dpp) override {
-    reenter(this) {
-
-      yield {
-        rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
-        stringstream ss;
-        XMLFormatter formatter;
+  encode_xml("CompleteMultipartUpload", req_enc, &formatter);
 
-        encode_xml("CompleteMultipartUpload", req_enc, &formatter);
+  formatter.flush(ss);
+  bl.append(ss.str());
 
-        formatter.flush(ss);
+  ret = dest_conn.send_resource(dpp, "POST", resource, params, nullptr,
+      out_bl, &bl, nullptr, null_yield);
 
-        bufferlist bl;
-        bl.append(ss.str());
 
-        call(new RGWPostRawRESTResourceCR <bufferlist> (cct, dest_conn, http_manager,
-              obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl));
-      }
-
-      if (retcode < 0) {
-        ldout(cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
-        return set_cr_error(retcode);
-      }
-      {
-        /*
-         * If one of the following fails we cannot abort upload, as we cannot
-         * extract the upload id. If one of these fail it's very likely that that's
-         * the least of our problem.
-         */
-        RGWXMLDecoder::XMLParser parser;
-        if (!parser.init()) {
-          ldout(cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
-          return set_cr_error(-EIO);
-        }
-
-        if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
-          string str(out_bl.c_str(), out_bl.length());
-          ldout(cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
-          return set_cr_error(-EIO);
-        }
-
-        try {
-          RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
-        } catch (RGWXMLDecoder::err& err) {
-          string str(out_bl.c_str(), out_bl.length());
-          ldout(cct, 5) << "ERROR: unexpected xml: " << str << dendl;
-          return set_cr_error(-EIO);
-        }
-      }
-
-      ldout(cct, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;
-
-      return set_cr_done();
-    }
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: failed to complete multipart upload for dest object=" << dest_obj << dendl;
+    return ret;
+  }
+  /*
+   * If one of the following fails we cannot abort upload, as we cannot
+   * extract the upload id. If one of these fail it's very likely that that's
+   * the least of our problem.
+   */
+  RGWXMLDecoder::XMLParser parser;
+  if (!parser.init()) {
+    ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
+    return -EIO;
+  }
 
-    return 0;
+  if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+    string str(out_bl.c_str(), out_bl.length());
+    ldpp_dout(dpp, 5) << "ERROR: failed to parse xml Completemultipart: " << str << dendl;
+    return -EIO;
   }
-};
 
+  try {
+    RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
+  } catch (RGWXMLDecoder::err& err) {
+    string str(out_bl.c_str(), out_bl.length());
+    ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
+    return -EIO;
+  }
 
-class RGWLCStreamAbortMultipartUploadCR : public RGWCoroutine {
-  RGWLCCloudTierCtx& tier_ctx;
-  const rgw_obj dest_obj;
-  const rgw_raw_obj status_obj;
+  ldpp_dout(dpp, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;
 
-  string upload_id;
+  return ret;
+}
 
-  public:
+static int cloud_tier_abort_multipart_upload(RGWLCCloudTierCtx& tier_ctx,
+      const rgw_obj& dest_obj, const rgw_raw_obj& status_obj,
+      const std::string& upload_id) {
+  int ret;
 
-  RGWLCStreamAbortMultipartUploadCR(RGWLCCloudTierCtx& _tier_ctx,
-                                    const rgw_obj& _dest_obj, const rgw_raw_obj& _status_obj,
-                                    const string& _upload_id) : RGWCoroutine(_tier_ctx.cct),
-                                    tier_ctx(_tier_ctx), dest_obj(_dest_obj), status_obj(_status_obj),
-                                    upload_id(_upload_id) {}
-
-  int operate(const DoutPrefixProvider *dpp) override {
-    reenter(this) {
-      yield call(new RGWLCAbortMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, upload_id));
-      if (retcode < 0) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
-        /* ignore error, best effort */
-      }
-      yield call(new RGWRadosRemoveCR(dynamic_cast<rgw::sal::RadosStore*>(tier_ctx.store), status_obj));
-      if (retcode < 0) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl;
-        /* ignore error, best effort */
-      }
-      return set_cr_done();
-    }
+  ret = cloud_tier_abort_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, upload_id);
 
-    return 0;
+  if (ret < 0) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " ret=" << ret << dendl;
+    /* ignore error, best effort */
   }
-};
+  /* remove status obj */
+  ret = delete_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj);
+  if (ret < 0) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " ret=" << ret << dendl;
+    // ignore error, best effort 
+  }
+  return 0;
+}
 
-class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine {
-  RGWLCCloudTierCtx& tier_ctx;
-  RGWRESTConn *source_conn;
+static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) {
   rgw_obj src_obj;
   rgw_obj dest_obj;
 
   uint64_t obj_size;
-  string src_etag;
+  std::string src_etag;
   rgw_rest_obj rest_obj;
 
   rgw_lc_multipart_upload_info status;
-  std::shared_ptr<RGWStreamReadCRF> in_crf;
 
-  map<string, string> new_attrs;
+  std::map<std::string, std::string> new_attrs;
 
   rgw_raw_obj status_obj;
 
-  rgw_lc_obj_properties obj_properties;
   RGWBucketInfo b;
-  string target_obj_name;
+  std::string target_obj_name;
   rgw_bucket target_bucket;
-  rgw::sal::RadosStore *rados;
-
-  public:
-  RGWLCStreamObjToCloudMultipartCR(RGWLCCloudTierCtx& _tier_ctx)
-    : RGWCoroutine(_tier_ctx.cct),  tier_ctx(_tier_ctx),
-          obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
-                         tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
-                         tier_ctx.target_storage_class){}
 
-  int operate(const DoutPrefixProvider *dpp) override {
-    reenter(this) {
+  int ret;
 
-      obj_size = tier_ctx.o.meta.size;
+  rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
+        tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
+        tier_ctx.target_storage_class);
 
-      target_bucket.name = tier_ctx.target_bucket_name;
+  uint32_t part_size{0};
+  uint32_t num_parts{0};
 
-      target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
-                        (*tier_ctx.obj)->get_name();
-      if (!tier_ctx.o.is_current()) {
-        target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
-      }
-      dest_obj.init(target_bucket, target_obj_name);
+  int cur_part{0};
+  uint64_t cur_ofs{0};
+  std::map<int, rgw_lc_multipart_part_info> parts;
 
-      status_obj = rgw_raw_obj(tier_ctx.store->get_zone()->get_params().log_pool,
-          "lc_multipart_" + (*tier_ctx.obj)->get_oid());
+  obj_size = tier_ctx.o.meta.size;
 
-      rados = dynamic_cast<rgw::sal::RadosStore*>(tier_ctx.store);
+  target_bucket.name = tier_ctx.target_bucket_name;
 
-      if (!rados) {
-        ldout(tier_ctx.cct, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl;
-        return -1;
-      }
+  target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
+    tier_ctx.obj->get_name();
+  if (!tier_ctx.o.is_current()) {
+    target_obj_name += get_key_instance(tier_ctx.obj->get_key());
+  }
+  dest_obj.init(target_bucket, target_obj_name);
 
-      yield call(new RGWSimpleRadosReadCR<rgw_lc_multipart_upload_info>(dpp, rados->svc()->rados->get_async_processor(), rados->svc()->sysobj,
-                                                                 status_obj, &status, false));
+  status_obj = rgw_raw_obj(tier_ctx.store->get_zone()->get_params().log_pool,
+      "lc_multipart_" + tier_ctx.obj->get_oid());
 
-      if (retcode < 0 && retcode != -ENOENT) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl;
-        return retcode;
-      }
+  ret = read_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj, &status);
 
-      if (retcode >= 0) {
-        /* check here that mtime and size did not change */
-        if (status.mtime != obj_properties.mtime || status.obj_size != obj_size ||
-            status.etag != obj_properties.etag) {
-          yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id));
-          retcode = -ENOENT;
-        }
-      }
+  if (ret < 0 && ret != -ENOENT) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " ret=" << ret << dendl;
+    return ret;
+  }
 
-      if (retcode == -ENOENT) {
-        in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
+  if (ret >= 0) {
+    // check here that mtime and size did not change 
+    if (status.mtime != obj_properties.mtime || status.obj_size != obj_size ||
+        status.etag != obj_properties.etag) {
+      cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id);
+      ret = -ENOENT;
+    }
+  }
 
-        in_crf->init();
+  if (ret == -ENOENT) { 
+    RGWLCStreamRead readf(tier_ctx.cct, tier_ctx.dpp, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime);
 
-        rest_obj = in_crf->get_rest_obj();
+    readf.init();
 
-        RGWLCStreamPutCRF::init_send_attrs(tier_ctx.cct, rest_obj, obj_properties, &new_attrs);
+    rest_obj = readf.get_rest_obj();
 
-        yield call(new RGWLCInitMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, obj_size, std::move(new_attrs), &status.upload_id));
-        if (retcode < 0) {
-          return set_cr_error(retcode);
-        }
+    RGWLCCloudStreamPut::init_send_attrs(tier_ctx.dpp, rest_obj, obj_properties, new_attrs);
 
-        status.obj_size = obj_size;
-        status.mtime = obj_properties.mtime;
-        status.etag = obj_properties.etag;
-#define MULTIPART_MAX_PARTS 10000
-        uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
-        uint64_t min_conf_size = tier_ctx.multipart_min_part_size;
+    ret = cloud_tier_init_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, obj_size, new_attrs, status.upload_id);
+    if (ret < 0) {
+      return ret;
+    }
 
-        if (min_conf_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
-          min_conf_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
-        }
+    status.obj_size = obj_size;
+    status.mtime = obj_properties.mtime;
+    status.etag = obj_properties.etag;
 
-        status.part_size = std::max(min_conf_size, min_part_size);
-        status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
-        status.cur_part = 1;
-        status.cur_ofs = 0;
-      }
+    ret = put_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj, &status);
 
-      for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) {
-        ldout(tier_ctx.cct, 20) << "status.cur_part = "<<status.cur_part <<", info.ofs = "<< status.cur_ofs <<", info.size = "<< status.part_size<< ", obj size = " << status.obj_size<< ", status.num_parts:" << status.num_parts << dendl;
-        yield {
-          rgw_lc_multipart_part_info& cur_part_info = status.parts[status.cur_part];
-          cur_part_info.part_num = status.cur_part;
-          cur_part_info.ofs = status.cur_ofs;
-          cur_part_info.size = std::min((uint64_t)status.part_size, status.obj_size - status.cur_ofs);
+    if (ret < 0) {
+      ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to store multipart upload state, ret=" << ret << dendl;
+      // continue with upload anyway 
+    }
 
-          status.cur_ofs += cur_part_info.size;
+#define MULTIPART_MAX_PARTS 10000
+#define MULTIPART_MAX_PARTS 10000
+    uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
+    uint64_t min_conf_size = tier_ctx.multipart_min_part_size;
 
-          call(new RGWLCStreamObjToCloudMultipartPartCR(tier_ctx,
-                status.upload_id,
-                cur_part_info,
-                &cur_part_info.etag));
-        }
+    if (min_conf_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
+      min_conf_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
+    }
 
-        if (retcode < 0) {
-          ldout(tier_ctx.cct, 0) << "ERROR: failed to sync obj=" << tier_ctx.obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
-          yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id));
-          return set_cr_error(retcode);
-        }
+    part_size = std::max(min_conf_size, min_part_size);
+    num_parts = (obj_size + part_size - 1) / part_size;
+    cur_part = 1;
+    cur_ofs = 0;
+  }
 
-      yield call(new RGWSimpleRadosWriteCR<rgw_lc_multipart_upload_info>(dpp, rados->svc()->rados->get_async_processor(), rados->svc()->sysobj, status_obj, status));
+  for (; (uint32_t)cur_part <= num_parts; ++cur_part) {
+    ldpp_dout(tier_ctx.dpp, 20) << "cur_part = "<< cur_part << ", info.ofs = " << cur_ofs << ", info.size = " << part_size << ", obj size = " << obj_size<< ", num_parts:" << num_parts << dendl;
+    rgw_lc_multipart_part_info& cur_part_info = parts[cur_part];
+    cur_part_info.part_num = cur_part;
+    cur_part_info.ofs = cur_ofs;
+    cur_part_info.size = std::min((uint64_t)part_size, obj_size - cur_ofs);
 
-        if (retcode < 0) {
-          ldout(tier_ctx.cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
-          /* continue with upload anyway */
-        }
-        ldout(tier_ctx.cct, 0) << "sync of object=" << tier_ctx.obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << status.parts[status.cur_part].etag << dendl;
-      }
+    cur_ofs += cur_part_info.size;
 
-      yield call(new RGWLCCompleteMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status.upload_id, status.parts));
-      if (retcode < 0) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
-        yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id));
-        return set_cr_error(retcode);
-      }
+    ret = cloud_tier_send_multipart_part(tier_ctx,
+            status.upload_id,
+            cur_part_info,
+            &cur_part_info.etag);
 
-      /* remove status obj */
-      yield call(new RGWRadosRemoveCR(rados, status_obj));
-      if (retcode < 0) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
-        /* ignore error, best effort */
-      }
-      return set_cr_done();
+    if (ret < 0) {
+      ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to send multipart part of obj=" << tier_ctx.obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << cur_part << " (error: " << cpp_strerror(-ret) << ")" << dendl;
+      cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id);
+      return ret;
     }
-    return 0;
+
   }
-};
 
-int RGWLCCloudCheckCR::operate(const DoutPrefixProvider *dpp) {
-  /* Check if object has already been transitioned */
-  reenter(this) {
-    b.bucket.name = tier_ctx.target_bucket_name;
-    target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
-                      (*tier_ctx.obj)->get_name();
-    if (!tier_ctx.o.is_current()) {
-      target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
-    }
+  ret = cloud_tier_complete_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, status.upload_id, parts);
+  if (ret < 0) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-ret) << ")" << dendl;
+    cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id);
+    return ret;
+  }
 
-    retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
-    if (retcode < 0) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << retcode << dendl;
-        return ret;
-    }
-  
-    dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
-    if (!dest_obj) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
-        return -1;
-    }
+  /* remove status obj */
+  ret = delete_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj);
+  if (ret < 0) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << cur_part << " (" << cpp_strerror(-ret) << ")" << dendl;
+    // ignore error, best effort 
+  }
+  return 0;
+}
 
-    get_crf.reset(new RGWLCStreamGetCRF(tier_ctx.cct, get_env(), this, tier_ctx.http_manager, obj_properties,
-                  tier_ctx.conn, dest_obj.get()));
-
-    /* Having yield here doesn't seem to wait for init2() to fetch the headers
-     * before calling is_already_tiered() below
-     */
-    yield {
-    retcode = get_crf->init(dpp);
-      if (retcode < 0) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to fetch HEAD from cloud for obj=" << tier_ctx.obj << " , retcode = " << retcode << dendl;
-        return set_cr_error(ret);
-        }
-    }
-    if (retcode < 0) {
-      ldout(tier_ctx.cct, 20) << __func__ << ": get_crf()->init retcode=" << retcode << dendl;
-      return set_cr_error(retcode);
-    }
-    if (get_crf.get()->is_already_tiered()) {
-      *already_tiered = true;
-      ldout(tier_ctx.cct, 20) << "is_already_tiered true" << dendl;
-      return set_cr_done(); 
-    }
+/* Check if object has already been transitioned */
+static int cloud_tier_check_object(RGWLCCloudTierCtx& tier_ctx, bool& already_tiered) {
+  int ret;
+  std::map<std::string, std::string> headers;
 
-    ldout(tier_ctx.cct, 20) << "is_already_tiered false..going with out_crf writing" << dendl;
+  /* Fetch Head object */
+  ret = cloud_tier_get_object(tier_ctx, true, headers);
 
-    return set_cr_done();
+  if (ret < 0) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to fetch HEAD from cloud for obj=" << tier_ctx.obj << " , ret = " << ret << dendl;
+    return ret;
+  }
+
+  already_tiered = is_already_tiered(tier_ctx.dpp, headers, tier_ctx.o.meta.mtime);
+
+  if (already_tiered) {
+    ldpp_dout(tier_ctx.dpp, 20) << "is_already_tiered true" << dendl;
+  } else {
+    ldpp_dout(tier_ctx.dpp, 20) << "is_already_tiered false..going with out_crf writing" << dendl;
   }
-  return 0;
-}
 
-map <pair<string, string>, utime_t> target_buckets;
+  return ret;
+}
 
-int RGWLCCloudTierCR::operate(const DoutPrefixProvider *dpp) {
+static int cloud_tier_create_bucket(RGWLCCloudTierCtx& tier_ctx) {
+  bufferlist out_bl;
+  int ret = 0;
   pair<string, string> key(tier_ctx.storage_class, tier_ctx.target_bucket_name);
-  bool bucket_created = false;
+  struct CreateBucketResult {
+    std::string code;
 
-  reenter(this) {
+    void decode_xml(XMLObj *obj) {
+      RGWXMLDecoder::decode_xml("Code", code, obj);
+    }
+  } result;
 
-    if (target_buckets.find(key) != target_buckets.end()) {
-      utime_t t = target_buckets[key];
+  ldpp_dout(tier_ctx.dpp, 30) << "Cloud_tier_ctx: creating bucket:" << tier_ctx.target_bucket_name << dendl;
+  bufferlist bl;
+  string resource = tier_ctx.target_bucket_name;
 
-      utime_t now = ceph_clock_now();
+  ret = tier_ctx.conn.send_resource(tier_ctx.dpp, "PUT", resource, nullptr, nullptr,
+                                    out_bl, &bl, nullptr, null_yield);
 
-      if (now - t <  (2 * cct->_conf->rgw_lc_debug_interval)) { /* not expired */
-        bucket_created = true;
-      }
+  if (ret < 0 ) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to create target bucket: " << tier_ctx.target_bucket_name << ", ret:" << ret << dendl;
+    return ret;
+  }
+  if (out_bl.length() > 0) {
+    RGWXMLDecoder::XMLParser parser;
+    if (!parser.init()) {
+      ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl;
+      return -EIO;
     }
 
-    if (!bucket_created){
-      yield {
-        ldout(tier_ctx.cct,10) << "Cloud_tier_ctx: creating bucket:" << tier_ctx.target_bucket_name << dendl;
-        bufferlist bl;
-        call(new RGWPutRawRESTResourceCR <bufferlist> (tier_ctx.cct, tier_ctx.conn.get(),
-             tier_ctx.http_manager,
-             tier_ctx.target_bucket_name, nullptr, bl, &out_bl));
-      }
-      if (retcode < 0 ) {
-        ldout(tier_ctx.cct, 0) << "ERROR: failed to create target bucket: " << tier_ctx.target_bucket_name << ", retcode:" << retcode << dendl;
-        return set_cr_error(retcode);
-      }
-      if (out_bl.length() > 0) {
-        RGWXMLDecoder::XMLParser parser;
-        if (!parser.init()) {
-          ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl;
-          return set_cr_error(-EIO);
-        }
+    if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+      string str(out_bl.c_str(), out_bl.length());
+      ldpp_dout(tier_ctx.dpp, 5) << "ERROR: failed to parse xml createbucket: " << str << dendl;
+      return -EIO;
+    }
 
-        if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
-          string str(out_bl.c_str(), out_bl.length());
-          ldout(tier_ctx.cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
-          return set_cr_error(-EIO);
-        }
+    try {
+      RGWXMLDecoder::decode_xml("Error", result, &parser, true);
+    } catch (RGWXMLDecoder::err& err) {
+      string str(out_bl.c_str(), out_bl.length());
+      ldpp_dout(tier_ctx.dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
+      return -EIO;
+    }
 
-        try {
-          RGWXMLDecoder::decode_xml("Error", result, &parser, true);
-        } catch (RGWXMLDecoder::err& err) {
-          string str(out_bl.c_str(), out_bl.length());
-          ldout(tier_ctx.cct, 5) << "ERROR: unexpected xml: " << str << dendl;
-          return set_cr_error(-EIO);
-        }
+    if (result.code != "BucketAlreadyOwnedByYou") {
+      ldpp_dout(tier_ctx.dpp, 0) << "ERROR: Creating target bucket failed with error: " << result.code << dendl;
+      return -EIO;
+    }
+  }
 
-        if (result.code != "BucketAlreadyOwnedByYou") {
-          ldout(tier_ctx.cct, 0) << "ERROR: Creating target bucket failed with error: " << result.code << dendl;
-          return set_cr_error(-EIO);
-        }
-      }
+  return 0;
+}
+
+int rgw_cloud_tier_transfer_object(RGWLCCloudTierCtx& tier_ctx) {
+  int ret = 0;
+
+  /* If run first time attempt to create the target bucket */
+  if (!tier_ctx.target_bucket_created) {
+    ret = cloud_tier_create_bucket(tier_ctx);
 
-      target_buckets[key] = ceph_clock_now();
+    if (ret < 0) {
+      ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to create target bucket on the cloud endpoint ret=" << ret << dendl;
+      return ret;
     }
+    tier_ctx.target_bucket_created = true;
+  }
 
-    yield {
-      uint64_t size = tier_ctx.o.meta.size;
-      uint64_t multipart_sync_threshold = tier_ctx.multipart_sync_threshold;
+  /* Since multiple zones may try to transition the same object to the cloud,
+   * verify if the object is already transitioned. And since its just a best
+   * effort, do not bail out in case of any errors.
+   */
+  bool already_tiered = false;
+  ret = cloud_tier_check_object(tier_ctx, already_tiered);
 
-      if (multipart_sync_threshold < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
-        multipart_sync_threshold = MULTIPART_MIN_POSSIBLE_PART_SIZE;
-      }
+  if (ret < 0) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to check object on the cloud endpoint ret=" << ret << dendl;
+  }
 
-      if (size < multipart_sync_threshold) {
-        call (new RGWLCStreamObjToCloudPlainCR(tier_ctx));
-      } else {
-        tier_ctx.is_multipart_upload = true;
-        call(new RGWLCStreamObjToCloudMultipartCR(tier_ctx));
+  if (already_tiered) {
+    ldpp_dout(tier_ctx.dpp, 20) << "Object (" << tier_ctx.o.key << ") is already tiered" << dendl;
+    return 0;
+  }
 
-      } 
-    }
+  uint64_t size = tier_ctx.o.meta.size;
+  uint64_t multipart_sync_threshold = tier_ctx.multipart_sync_threshold;
 
-    if (retcode < 0) {
-      return set_cr_error(retcode);
-    }
+  if (multipart_sync_threshold < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
+    multipart_sync_threshold = MULTIPART_MIN_POSSIBLE_PART_SIZE;
+  }
 
-    return set_cr_done();
-  } //reenter
+  if (size < multipart_sync_threshold) {
+    ret = cloud_tier_plain_transfer(tier_ctx);
+  } else {
+    tier_ctx.is_multipart_upload = true;
+    ret = cloud_tier_multipart_transfer(tier_ctx);
+  } 
 
-  return 0;
-}
+  if (ret < 0) {
+    ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to transition object ret=" << ret << dendl;
+  }
 
+  return ret;
+}
index 91053f6ea76cc735ef3c47ee6fe0abc8d81083bd..86df479e8280847c23414253ce19cddd12892312 100644 (file)
@@ -5,13 +5,11 @@
 #define CEPH_RGW_LC_TIER_H
 
 #include "rgw_lc.h"
-#include "rgw_cr_rados.h"
 #include "rgw_rest_conn.h"
-#include "rgw_cr_rest.h"
-#include "rgw_coroutine.h"
 #include "rgw_rados.h"
 #include "rgw_zone.h"
 #include "rgw_sal_rados.h"
+#include "rgw_cr_rest.h"
 
 #define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024)
 #define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
@@ -24,209 +22,34 @@ struct RGWLCCloudTierCtx {
   rgw_bucket_dir_entry& o;
   rgw::sal::Store *store;
   RGWBucketInfo& bucket_info;
-  string storage_class;
+  std::string storage_class;
 
-  std::unique_ptr<rgw::sal::Object>* obj;
+  rgw::sal::Object *obj;
   RGWObjectCtx& rctx;
 
   /* Remote */
-  std::shared_ptr<RGWRESTConn> conn;
-  string target_bucket_name;
-  string target_storage_class;
-  RGWHTTPManager *http_manager;
+  RGWRESTConn& conn;
+  std::string target_bucket_name;
+  std::string target_storage_class;
 
-  map<string, RGWTierACLMapping> acl_mappings;
+  std::map<std::string, RGWTierACLMapping> acl_mappings;
   uint64_t multipart_min_part_size;
   uint64_t multipart_sync_threshold;
 
   bool is_multipart_upload{false};
+  bool target_bucket_created{true};
 
   RGWLCCloudTierCtx(CephContext* _cct, const DoutPrefixProvider *_dpp,
-            rgw_bucket_dir_entry& _o, rgw::sal::Store* _store,
-            RGWBucketInfo &_binfo, std::unique_ptr<rgw::sal::Object>* _obj,
-            RGWObjectCtx& _rctx, std::shared_ptr<RGWRESTConn> _conn, string _bucket,
-            string _storage_class, RGWHTTPManager *_http)
-            : cct(_cct), dpp(_dpp), o(_o), store(_store), bucket_info(_binfo),
-              obj(_obj), rctx(_rctx), conn(_conn), target_bucket_name(_bucket),
-              target_storage_class(_storage_class), http_manager(_http) {}
-};
-
-struct rgw_lc_multipart_part_info {
-  int part_num{0};
-  uint64_t ofs{0};
-  uint64_t size{0};
-  string etag;
-
-  void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
-    encode(part_num, bl);
-    encode(ofs, bl);
-    encode(size, bl);
-    encode(etag, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(part_num, bl);
-    decode(ofs, bl);
-    decode(size, bl);
-    decode(etag, bl);
-    DECODE_FINISH(bl);
-  }
+      rgw_bucket_dir_entry& _o, rgw::sal::Store *_store,
+      RGWBucketInfo &_binfo, rgw::sal::Object *_obj,
+      RGWObjectCtx& _rctx, RGWRESTConn& _conn, std::string& _bucket,
+      std::string& _storage_class) :
+    cct(_cct), dpp(_dpp), o(_o), store(_store), bucket_info(_binfo),
+    obj(_obj), rctx(_rctx), conn(_conn), target_bucket_name(_bucket),
+    target_storage_class(_storage_class) {}
 };
-WRITE_CLASS_ENCODER(rgw_lc_multipart_part_info)
-
-struct rgw_lc_obj_properties {
-  ceph::real_time mtime;
-  string etag;
-  uint64_t versioned_epoch{0};
-  map<string, RGWTierACLMapping>& target_acl_mappings;
-  string target_storage_class;
-
-  rgw_lc_obj_properties(ceph::real_time _mtime, string _etag,
-                        uint64_t _versioned_epoch, map<string,
-                        RGWTierACLMapping>& _t_acl_mappings,
-                        string _t_storage_class) :
-                        mtime(_mtime), etag(_etag),
-                        versioned_epoch(_versioned_epoch),
-                        target_acl_mappings(_t_acl_mappings),
-                        target_storage_class(_t_storage_class) {}
-  
-  void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
-    encode(mtime, bl);
-    encode(etag, bl);
-    encode(versioned_epoch, bl);
-    encode(target_acl_mappings, bl);
-    encode(target_storage_class, bl);
-    ENCODE_FINISH(bl);
-  }
 
-  void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(mtime, bl);
-    decode(etag, bl);
-    decode(versioned_epoch, bl);
-    decode(target_acl_mappings, bl);
-    decode(target_storage_class, bl);
-    DECODE_FINISH(bl);
-  }
-};
-WRITE_CLASS_ENCODER(rgw_lc_obj_properties)
-
-struct rgw_lc_multipart_upload_info {
-  string upload_id;
-  uint64_t obj_size;
-  ceph::real_time mtime;
-  string etag;
-  uint32_t part_size{0};
-  uint32_t num_parts{0};
-
-  int cur_part{0};
-  uint64_t cur_ofs{0};
-
-  std::map<int, rgw_lc_multipart_part_info> parts;
-
-  void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
-    encode(upload_id, bl);
-    encode(obj_size, bl);
-    encode(mtime, bl);
-    encode(etag, bl);
-    encode(part_size, bl);
-    encode(num_parts, bl);
-    encode(cur_part, bl);
-    encode(cur_ofs, bl);
-    encode(parts, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(upload_id, bl);
-    decode(obj_size, bl);
-    decode(mtime, bl);
-    decode(etag, bl);
-    decode(part_size, bl);
-    decode(num_parts, bl);
-    decode(cur_part, bl);
-    decode(cur_ofs, bl);
-    decode(parts, bl);
-    DECODE_FINISH(bl);
-  }
-};
-WRITE_CLASS_ENCODER(rgw_lc_multipart_upload_info)
-
-class RGWLCStreamGetCRF : public RGWStreamReadHTTPResourceCRF
-{
-  RGWRESTConn::get_obj_params req_params;
-
-  CephContext *cct;
-  RGWHTTPManager *http_manager;
-  rgw_lc_obj_properties obj_properties;
-  std::shared_ptr<RGWRESTConn> conn;
-  rgw::sal::Object* dest_obj;
-  string etag;
-  RGWRESTStreamRWRequest *in_req;
-  map<string, string> headers;
-
-  public:
-  RGWLCStreamGetCRF(CephContext *_cct,
-      RGWCoroutinesEnv *_env,
-      RGWCoroutine *_caller,
-      RGWHTTPManager *_http_manager,
-      const rgw_lc_obj_properties&  _obj_properties,
-      std::shared_ptr<RGWRESTConn> _conn,
-      rgw::sal::Object* _dest_obj) :
-    RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _http_manager, _dest_obj->get_key()),
-                                 cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties),
-                                 conn(_conn), dest_obj(_dest_obj) {}
-  int init(const DoutPrefixProvider *dpp);
-  int is_already_tiered();
-};
-
-class RGWLCCloudTierCR : public RGWCoroutine {
-  RGWLCCloudTierCtx& tier_ctx;
-  bufferlist out_bl;
-  int retcode;
-  struct CreateBucketResult {
-    string code;
-
-    void decode_xml(XMLObj *obj) {
-      RGWXMLDecoder::decode_xml("Code", code, obj);
-    }
-  } result;
-
-  public:
-    RGWLCCloudTierCR(RGWLCCloudTierCtx& _tier_ctx) :
-          RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx) {}
-
-    int operate(const DoutPrefixProvider *dpp) override;
-};
-
-class RGWLCCloudCheckCR : public RGWCoroutine {
-  RGWLCCloudTierCtx& tier_ctx;
-  bufferlist bl;
-  bool need_retry{false};
-  int retcode;
-  bool *already_tiered;
-  rgw_lc_obj_properties obj_properties;
-  RGWBucketInfo b;
-  string target_obj_name;
-  int ret = 0;
-  std::unique_ptr<rgw::sal::Bucket> dest_bucket;
-  std::unique_ptr<rgw::sal::Object> dest_obj;
-  std::unique_ptr<RGWLCStreamGetCRF> get_crf;
-
-  public:
-    RGWLCCloudCheckCR(RGWLCCloudTierCtx& _tier_ctx, bool *_al_ti) :
-          RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx), already_tiered(_al_ti),
-          obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
-                         tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
-                         tier_ctx.target_storage_class){}
-
-    int operate(const DoutPrefixProvider *dpp) override;
-};
+/* Transition object to cloud endpoint */
+int rgw_cloud_tier_transfer_object(RGWLCCloudTierCtx& tier_ctx);
 
 #endif
index 03bfefad5c27780a5e1dc3f2bf1ec4fa09a96e24..a3d82a66808b8da0a763db77a0ac580e8d5cc96c 100644 (file)
@@ -149,7 +149,7 @@ struct RGWObjManifestRule {
 WRITE_CLASS_ENCODER(RGWObjManifestRule)
 
 struct RGWObjTier {
-    string name;
+    std::string name;
     RGWZoneGroupPlacementTier tier_placement;
     bool is_multipart_upload{false};
 
@@ -193,7 +193,7 @@ protected:
 
   std::string tail_instance; /* tail object's instance */
 
-  string tier_type;
+  std::string tier_type;
   RGWObjTier tier_config;
 
   void convert_to_explicit(const DoutPrefixProvider *dpp, const RGWZoneGroup& zonegroup, const RGWZoneParams& zone_params);
@@ -452,7 +452,7 @@ public:
       return tier_type;
   }
 
-  inline void set_tier_type(string value) {
+  inline void set_tier_type(std::string value) {
       /* Only "cloud-s3" tier-type is supported for now */
       if (value == "cloud-s3") {
         tier_type = value;
index 751b3e72f5133a94a9cb7b4a27f192c0e529007e..20ad0d985b87ecbfeabeef64156b84713316b9b5 100644 (file)
@@ -2210,7 +2210,7 @@ void RGWGetObj::execute(optional_yield y)
   if (attr_iter != attrs.end() && get_type() == RGW_OP_GET_OBJ && get_data) {
     RGWObjManifest m;
     decode(m, attr_iter->second);
-    if (m.get_tier_type() == "cloud") {
+    if (m.get_tier_type() == "cloud-s3") {
       /* XXX: Instead send presigned redirect or read-through */
       op_ret = -ERR_INVALID_OBJECT_STATE;
       ldpp_dout(this, 0) << "ERROR: Cannot get cloud tiered object. Failing with "
@@ -3988,6 +3988,18 @@ void RGWPutObj::execute(optional_yield y)
       ldpp_dout(this, 0) << "ERROR: get copy source obj state returned with error" << op_ret << dendl;
       return;
     }
+    bufferlist bl;
+    if (astate->get_attr(RGW_ATTR_MANIFEST, bl)) {
+      RGWObjManifest m;
+      decode(m, bl);
+      if (m.get_tier_type() == "cloud-s3") {
+        op_ret = -ERR_INVALID_OBJECT_STATE;
+        ldpp_dout(this, 0) << "ERROR: Cannot copy cloud tiered object. Failing with "
+                      << op_ret << dendl;
+        return;
+      }
+    }
+
     if (!astate->exists){
       op_ret = -ENOENT;
       return;
@@ -5419,6 +5431,20 @@ void RGWCopyObj::execute(optional_yield y)
     if (op_ret < 0) {
       return;
     }
+
+    /* Check if the src object is cloud-tiered */
+    bufferlist bl;
+    if (astate->get_attr(RGW_ATTR_MANIFEST, bl)) {
+      RGWObjManifest m;
+      decode(m, bl);
+      if (m.get_tier_type() == "cloud-s3") {
+        op_ret = -ERR_INVALID_OBJECT_STATE;
+        ldpp_dout(this, 0) << "ERROR: Cannot copy cloud tiered object. Failing with "
+                      << op_ret << dendl;
+        return;
+      }
+    }
+
     obj_size = astate->size;
   
     if (!s->system_request) { // no quota enforcement for system requests
index 7214babb9b9c8a66ea32860b13d9968e06a7ffec..caf6ffa35c80274391ec9a86a5da4e4895d039c2 100644 (file)
@@ -221,7 +221,8 @@ public:
 
 class RGWRESTStreamSendRequest : public RGWRESTStreamRWRequest {
 public:
-  RGWRESTStreamSendRequest(CephContext *_cct, const string& method, const string& _url,
+  RGWRESTStreamSendRequest(CephContext *_cct, const std::string& method,
+                           const std::string& _url,
                            ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params,
                            std::optional<std::string> _api_name,
                            HostStyle _host_style = PathStyle) : RGWRESTStreamRWRequest(_cct, method, _url, _cb, _headers, _params, _api_name, _host_style) {}
index 726bc58a5aaa870c768dbf2259fce97c2fa6f132..afcc86db11cd8ca71d7cb5148e2104aacf2bcd8f 100644 (file)
@@ -376,12 +376,12 @@ int RGWRESTConn::get_resource(const DoutPrefixProvider *dpp,
   return req.complete_request(y);
 }
 
-int RGWRESTConn::send_resource(const DoutPrefixProvider *dpp, const string& method,
-                        const string& resource, rgw_http_param_pair *extra_params,
-                               map<string, string> *extra_headers, bufferlist& bl,
+int RGWRESTConn::send_resource(const DoutPrefixProvider *dpp, const std::string& method,
+                        const std::string& resource, rgw_http_param_pair *extra_params,
+                               std::map<std::string, std::string> *extra_headers, bufferlist& bl,
                         bufferlist *send_data, RGWHTTPManager *mgr, optional_yield y)
 {
-  string url;
+  std::string url;
   int ret = get_url(url);
   if (ret < 0)
     return ret;
@@ -398,7 +398,7 @@ int RGWRESTConn::send_resource(const DoutPrefixProvider *dpp, const string& meth
 
   RGWRESTStreamSendRequest req(cct, method, url, &cb, NULL, &params, api_name, host_style);
 
-  map<string, string> headers;
+  std::map<std::string, std::string> headers;
   if (extra_headers) {
     headers.insert(extra_headers->begin(), extra_headers->end());
   }
index 0dfd0c7415c173d1f6cfac252f632d1e4a4a3286..cae8120705c3c0b57be18b242f082bdd8990ae4f 100644 (file)
@@ -201,10 +201,10 @@ public:
                    optional_yield y);
 
   int send_resource(const DoutPrefixProvider *dpp,
-                   const string& method,
-                   const string& resource,
+                   const std::string& method,
+                   const std::string& resource,
                           rgw_http_param_pair *extra_params,
-                   map<string, string>* extra_headers,
+                   std::map<std::string, std::string>* extra_headers,
                    bufferlist& bl,
                    bufferlist *send_data,
                    RGWHTTPManager *mgr,
index 05eb34db31a1d0f5d9590b5671b0bf99ecea062e..9a0066d9ad152d2be694c99ec341e49763b1cc82 100644 (file)
@@ -687,13 +687,13 @@ struct RGWTierACLMapping {
   RGWTierACLMapping() = default;
 
   RGWTierACLMapping(ACLGranteeTypeEnum t,
-             const string& s,
-             const string& d) : type(t),
+             const std::string& s,
+             const std::string& d) : type(t),
   source_id(s),
   dest_id(d) {}
 
   void init(const JSONFormattable& config) {
-    const string& t = config["type"];
+    const std::string& t = config["type"];
 
     if (t == "email") {
       type = ACL_TYPE_EMAIL_USER;
@@ -735,11 +735,11 @@ struct RGWZoneGroupPlacementTierS3 {
   RGWAccessKey key;
   std::string region;
   HostStyle host_style{PathStyle};
-  string target_storage_class;
+  std::string target_storage_class;
 
   /* Should below be bucket/zone specific?? */
-  string target_path;
-  map<string, RGWTierACLMapping> acl_mappings;
+  std::string target_path;
+  std::map<std::string, RGWTierACLMapping> acl_mappings;
 
   uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE};
   uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE};