]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/CloudTransition: Verify if the object is already tiered
authorSoumya Koduri <skoduri@redhat.com>
Sun, 16 Aug 2020 09:01:50 +0000 (14:31 +0530)
committerSoumya Koduri <skoduri@redhat.com>
Thu, 18 Nov 2021 07:22:48 +0000 (12:52 +0530)
Add class to fetch headers from remote endpoint and verify if the object
is already tiered.

& Few other fixes stated below -

* Erase data in the head of cloud transitioned object
* 'placement rm' command should erase tier_config details
* A new option added in the object manifest to denote if the
  object is tiered in multiparts

Signed-off-by: Soumya Koduri <skoduri@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_json_enc.cc
src/rgw/rgw_lc.cc
src/rgw/rgw_lc_tier.cc
src/rgw/rgw_lc_tier.h
src/rgw/rgw_obj_manifest.h

index 2378499fa94ef49b93af5f75a58251c9bcd8f0be..0552da0246268995da41fed3fd959fd6a173492b 100644 (file)
@@ -5109,6 +5109,11 @@ int main(int argc, const char **argv)
             if (iter != zonegroup.placement_targets.end()) {
               RGWZoneGroupPlacementTarget& info = zonegroup.placement_targets[placement_id];
               info.storage_classes.erase(*opt_storage_class);
+
+             auto ptiter = info.tier_targets.find(*opt_storage_class);
+             if (ptiter != info.tier_targets.end()) {
+               info.tier_targets.erase(ptiter);
+             }
             }
           }
         } else if (opt_cmd == OPT::ZONEGROUP_PLACEMENT_DEFAULT) {
index c054c12bb3c9e66e51a7f227f7216fc372474f23..df2f37c5e3a518358540796ac2d63d6ef9eb5a15 100644 (file)
@@ -103,6 +103,7 @@ void RGWObjTier::dump(Formatter *f) const
 {
   f->dump_string("name", name);
   f->dump_object("tier_placement", tier_placement);
+  f->dump_bool("is_multipart_upload", is_multipart_upload);
 }
 
 void rgw_bucket_placement::dump(Formatter *f) const
index 578e4453354330813e4d5f257428dd40f6aa3e4c..225b62667b29a502c7a679dce65471aa88553253 100644 (file)
@@ -1291,29 +1291,40 @@ public:
     RGWRados::Object op_target(tier_ctx.store->getRados(),
                                tier_ctx.bucket_info,
                                tier_ctx.rctx, tier_ctx.obj);
+    real_time read_mtime;
 
     RGWRados::Object::Read read_op(&op_target);
 
     read_op.params.attrs = &attrs;
+    read_op.params.lastmod = &read_mtime;
 
     int r = read_op.prepare(null_yield);
     if (r < 0) {
       return r;
     }
 
+    if (read_mtime != tier_ctx.o.meta.mtime) {
+      /* raced */
+      return -ECANCELED;
+    }
+
     tier_ctx.rctx.set_atomic(tier_ctx.obj);
     
     RGWRados::Object::Write obj_op(&op_target);
     RGWObjState *s = tier_ctx.rctx.get_state(tier_ctx.obj);
 
     obj_op.meta.modify_tail = true;
+    obj_op.meta.flags = PUT_OBJ_CREATE;
     obj_op.meta.category = RGWObjCategory::CloudTiered;
     obj_op.meta.delete_at = real_time();
-    obj_op.meta.data = NULL;
+    bufferlist blo;
+    blo.append("");
+    obj_op.meta.data = &blo;
     obj_op.meta.if_match = NULL;
     obj_op.meta.if_nomatch = NULL;
     obj_op.meta.user_data = NULL;
     obj_op.meta.zones_trace = NULL;
+    obj_op.meta.delete_at = real_time();
     
     RGWObjManifest *pmanifest; 
 
@@ -1321,6 +1332,7 @@ public:
     RGWObjTier tier_config;
     tier_config.name = oc.tier.storage_class;
     tier_config.tier_placement = oc.tier;
+    tier_config.is_multipart_upload = tier_ctx.is_multipart_upload;
 
     pmanifest->set_tier_type("cloud");
     pmanifest->set_tier_config(tier_config);
@@ -1336,7 +1348,9 @@ public:
     /* should the obj_size also be set to '0' or is it needed
      * to keep track of original size before transition. 
      * But unless obj_size is set to '0', obj_iters cannot
-     * be reset I guess
+     * be reset I guess. For regular transitioned objects
+     * obj_size remains the same even when object is moved to other
+     * storage class. So maybe better to keep it the same way.
      */
     //pmanifest->set_obj_size(0);
 
@@ -1347,6 +1361,8 @@ public:
     bl.append(oc.tier.storage_class);
     attrs[RGW_ATTR_STORAGE_CLASS] = bl;
 
+    attrs.erase(RGW_ATTR_ID_TAG);
+    attrs.erase(RGW_ATTR_TAIL_TAG);
 
     obj_op.write_meta(tier_ctx.o.meta.size, 0, attrs, null_yield);
     if (r < 0) {
@@ -1391,7 +1407,19 @@ public:
     tier_ctx.multipart_sync_threshold = oc.tier.multipart_sync_threshold;
     tier_ctx.storage_class = oc.tier.storage_class;
 
-    ret = crs.run(new RGWLCCloudTierCR(tier_ctx));
+    bool al_tiered = false;
+    ret = crs.run(new RGWLCCloudCheckCR(tier_ctx, &al_tiered));
+    
+    if (ret < 0) {
+      ldpp_dout(oc.dpp, 0) << "XXXXXXXXXXXXXX failed in RGWCloudCheckCR() ret=" << ret << dendl;
+    }
+
+    if (!al_tiered) {
+        ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX lc.cc is_already_tiered false" << dendl;
+          ret = crs.run(new RGWLCCloudTierCR(tier_ctx));
+    } else {
+        ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX lc.cc is_already_tiered true" << dendl;
+    }
     http_manager.stop();
          
     if (ret < 0) {
index bb9c64516654a8833cf49258a31257e3c609b63e..b8af3e6ab2e681f5d316b7d53fd6d79e2e2e8e18 100644 (file)
@@ -96,6 +96,207 @@ static void init_headers(map<string, bufferlist>& attrs,
   }
 }
 
+static int do_decode_rest_obj(CephContext *cct, map<string, bufferlist>& attrs, map<string, string>& headers, rgw_rest_obj *info)
+{
+  for (auto header : headers) {
+    const string& val = header.second;
+    if (header.first == "RGWX_OBJECT_SIZE") {
+      info->content_len = atoi(val.c_str());
+    } else {
+      info->attrs[header.first] = val;
+    }
+  }
+
+  info->acls.set_ctx(cct);
+  auto aiter = attrs.find(RGW_ATTR_ACL);
+  if (aiter != attrs.end()) {
+    bufferlist& bl = aiter->second;
+    auto bliter = bl.cbegin();
+    try {
+      info->acls.decode(bliter);
+    } catch (buffer::error& err) {
+      ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl;
+      return -EIO;
+    }
+  } else {
+    ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl;
+  }
+
+  return 0;
+}
+
+class RGWLCStreamGetCRF : public RGWStreamReadHTTPResourceCRF
+{
+  RGWRESTConn::get_obj_params req_params;
+
+  CephContext *cct;
+  RGWHTTPManager *http_manager;
+  rgw_lc_obj_properties obj_properties;
+  std::shared_ptr<RGWRESTConn> conn;
+  rgw::sal::RGWObject* dest_obj;
+  string etag;
+  RGWRESTStreamRWRequest *in_req;
+  map<string, string> headers;
+
+public:
+  RGWLCStreamGetCRF(CephContext *_cct,
+      RGWCoroutinesEnv *_env,
+      RGWCoroutine *_caller,
+      RGWHTTPManager *_http_manager,
+      const rgw_lc_obj_properties&  _obj_properties,
+      std::shared_ptr<RGWRESTConn> _conn,
+      rgw::sal::RGWObject* _dest_obj) :
+    RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _http_manager, _dest_obj->get_key()),
+    cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) {
+    }
+
+
+
+  int init() override {
+    /* init input connection */
+
+    req_params.get_op = false; /* Need only headers */
+//    req_params.skip_decrypt = false;
+    req_params.prepend_metadata = true;
+    req_params.rgwx_stat = true;
+    req_params.sync_manifest = true;
+    req_params.skip_decrypt = true;
+
+//    req_params.unmod_ptr = &src_properties.mtime;
+//    req_params.etag = src_properties.etag;
+//    req_params.mod_zone_id = src_properties.zone_short_id;
+//    req_params.mod_pg_ver = src_properties.pg_ver;
+
+//    if (range.is_set) {
+//      req_params.range_is_set = true;
+//      req_params.range_start = range.ofs;
+//      req_params.range_end = range.ofs + range.size - 1;
+//    }
+
+    int ret = conn->get_obj(dest_obj, req_params, false /* send */, &in_req);
+    if (ret < 0) {
+      ldout(cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
+      return ret;
+    }
+
+    set_req(in_req);
+
+    return RGWStreamReadHTTPResourceCRF::init();
+  }
+
+  int init2()  {
+    /* init input connection */
+
+    req_params.get_op = false; /* Need only headers */
+//    req_params.skip_decrypt = false;
+    req_params.prepend_metadata = true;
+    req_params.rgwx_stat = true;
+    req_params.sync_manifest = true;
+    req_params.skip_decrypt = true;
+
+//    req_params.unmod_ptr = &src_properties.mtime;
+//    req_params.etag = src_properties.etag;
+//    req_params.mod_zone_id = src_properties.zone_short_id;
+//    req_params.mod_pg_ver = src_properties.pg_ver;
+
+//    if (range.is_set) {
+//      req_params.range_is_set = true;
+//      req_params.range_start = range.ofs;
+//      req_params.range_end = range.ofs + range.size - 1;
+//    }
+
+  string etag;
+  real_time set_mtime;
+
+    int ret = conn->get_obj(dest_obj, req_params, true /* send */, &in_req);
+    if (ret < 0) {
+      ldout(cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
+      return ret;
+    }
+
+  ret = conn->complete_request(in_req, nullptr, nullptr,
+                               nullptr, nullptr, &headers);
+  if (ret < 0 && ret != -ENOENT) {
+      ldout(cct, 0) << "ERROR: " << __func__ << "(): XXXXXXXXXXXX conn->complete_request() returned ret=" << ret << dendl;
+      return ret;
+  }
+ //   set_req(in_req);
+
+   // return RGWStreamReadHTTPResourceCRF::init();
+   return 0;
+  }
+
+  int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) override {
+    map<string, bufferlist> src_attrs;
+
+    ldout(cct, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;
+
+    if (extra_data.length() > 0) {
+      JSONParser jp;
+      if (!jp.parse(extra_data.c_str(), extra_data.length())) {
+        ldout(cct, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl;
+        return -EIO;
+      }
+
+      JSONDecoder::decode_json("attrs", src_attrs, &jp);
+    }
+    return do_decode_rest_obj(cct, src_attrs, headers, &rest_obj);
+  }
+
+  void handle_headers(const map<string, string>& _headers) {
+    headers = _headers;
+  }
+
+  int is_already_tiered() {
+    char buf[32];
+    /*rgw_rest_obj rest_obj;
+    rest_obj.init(dest_obj->get_key());
+
+    
+    if (do_decode_rest_obj(cct, attrs, headers, &rest_obj)) {
+      ldout(sc->cct, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl;
+      return set_cr_error(-EINVAL);
+    }
+
+  for (auto header : headers) {
+    const string& val = header.second;
+    if (header.first == "RGWX_OBJECT_SIZE") {
+      info->content_len = atoi(val.c_str());
+    } else {
+      info->attrs[header.first] = val;
+    }
+  }*/
+
+    map<string, string> attrs = headers;
+//     req->get_out_headers(&attrs);
+  //      get_attrs(&attrs);
+
+  for (auto a : attrs) {
+    ldout(cct, 0) << "XXXXXXXXXXXXXX GetCrf attr[" << a.first << "] = " << a.second <<dendl;
+  }
+    utime_t ut(obj_properties.mtime);
+    snprintf(buf, sizeof(buf), "%lld.%09lld",
+        (long long)ut.sec(),
+        (long long)ut.nsec());
+
+    string s = attrs["X_AMZ_META_RGWX_SOURCE_MTIME"];
+
+    if (s.empty())
+        s = attrs["x_amz_meta_rgwx_source_mtime"];
+
+    ldout(cct, 0) << "XXXXXXXXXXXXXX is_already_tiered attrs[X_AMZ_META_RGWX_SOURCE_MTIME] = " << s <<dendl;
+    ldout(cct, 0) << "XXXXXXXXXXXXXX is_already_tiered mtime buf = " << buf <<dendl;
+    if (!s.empty() && !strcmp(s.c_str(), buf)){
+           return 1;
+    }
+    return 0;
+  }
+
+  bool need_extra_data() override {
+    return true;
+  }
+};
+
 class RGWLCStreamReadCRF : public RGWStreamReadCRF
 {
   CephContext *cct;
@@ -966,6 +1167,71 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine {
   }
 };
 
+int RGWLCCloudCheckCR::operate() {
+    /* Check if object has already been transitioned */
+     rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime,
+        tier_ctx.o.meta.etag,
+        tier_ctx.o.versioned_epoch,
+        tier_ctx.acl_mappings,
+        tier_ctx.target_storage_class);
+
+    rgw_bucket target_bucket;
+    string target_obj_name;
+
+    target_bucket.name = tier_ctx.target_bucket_name;
+    target_obj_name = tier_ctx.obj.key.name; // cross check with aws module
+
+    std::shared_ptr<rgw::sal::RGWRadosBucket> dest_bucket;
+    dest_bucket.reset(new rgw::sal::RGWRadosBucket(tier_ctx.store, target_bucket));
+
+    std::shared_ptr<rgw::sal::RGWRadosObject> dest_obj;
+    dest_obj.reset(new rgw::sal::RGWRadosObject(tier_ctx.store, rgw_obj_key(target_obj_name), (rgw::sal::RGWRadosBucket *)(dest_bucket.get())));
+
+
+//    std::shared_ptr<RGWStreamReadHTTPResourceCRF> get_crf;
+    std::shared_ptr<RGWLCStreamGetCRF> get_crf;
+    get_crf.reset(new RGWLCStreamGetCRF((CephContext *)(tier_ctx.cct), get_env(), this,
+                  (RGWHTTPManager*)(tier_ctx.http_manager),
+                  obj_properties, tier_ctx.conn, static_cast<rgw::sal::RGWObject *>(dest_obj.get())));
+   int ret;
+
+//   yield {
+    ret = get_crf->init2();
+ //  }
+    if (ret < 0) {
+        ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX get_crf->init failed, ret = " << ret << dendl;
+       return set_cr_error(ret);
+    }
+//reenter(this) {
+       bl.clear();
+/*     do {
+//     yield {
+       ret = get_crf->get_headers(&need_retry);
+       if (ret < 0) {
+               ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX get_crf->read failed, ret = " << ret << dendl;
+               return set_cr_error(ret);
+//     }
+       }
+        if (retcode < 0) {
+          ldout(cct, 20) << __func__ << ": in_crf->read() retcode=" << retcode << dendl;
+          return set_cr_error(ret);
+        }
+       } while (need_retry); */
+
+        if ((static_cast<RGWLCStreamGetCRF *>(get_crf.get()))->is_already_tiered()) {
+       *already_tiered = true;
+        ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX is_already_tiered true" << dendl;
+      return set_cr_done(); 
+       }
+
+    ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX is_already_tiered false..going with out_crf writing" << dendl;
+
+    return set_cr_done();
+//  } //reenter
+
+  return 0;
+}
+
 map <pair<string, string>, utime_t> target_buckets;
 
 int RGWLCCloudTierCR::operate() {
@@ -1035,6 +1301,7 @@ int RGWLCCloudTierCR::operate() {
       if (size < multipart_sync_threshold) {
         call (new RGWLCStreamObjToCloudPlainCR(tier_ctx));
       } else {
+        tier_ctx.is_multipart_upload = true;
         call(new RGWLCStreamObjToCloudMultipartCR(tier_ctx));
 
       } 
index 070920c7c8299dca00282f43e8062a60e67c66e6..0e6477f9512abad4ea936769d3865f4a77df5cbe 100644 (file)
@@ -37,6 +37,8 @@ struct RGWLCCloudTierCtx {
   uint64_t multipart_min_part_size;
   uint64_t multipart_sync_threshold;
 
+  bool is_multipart_upload{false};
+
   RGWLCCloudTierCtx(CephContext* _cct, rgw_bucket_dir_entry& _o,
             rgw::sal::RGWRadosStore* _store, RGWBucketInfo &_binfo, rgw_obj _obj,
             RGWObjectCtx& _rctx, std::shared_ptr<RGWRESTConn> _conn, string _bucket,
@@ -65,6 +67,20 @@ class RGWLCCloudTierCR : public RGWCoroutine {
     int operate() override;
 };
 
+class RGWLCCloudCheckCR : public RGWCoroutine {
+  RGWLCCloudTierCtx& tier_ctx;
+  bufferlist bl;
+  bool need_retry{false};
+  int retcode;
+  bool *already_tiered;
+
+  public:
+    RGWLCCloudCheckCR(RGWLCCloudTierCtx& _tier_ctx, bool *_al_ti) :
+          RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx), already_tiered(_al_ti) {}
+
+    int operate() override;
+};
+
 struct rgw_lc_multipart_part_info {
   int part_num{0};
   uint64_t ofs{0};
index 9cb7fec8216b73ad0d65a194fba690b2577b2ad8..b5a31b5f7be9b6abae92c9ed7a5993a03ae3d720 100644 (file)
@@ -151,7 +151,8 @@ WRITE_CLASS_ENCODER(RGWObjManifestRule)
 struct RGWObjTier {
     string name;
     RGWZoneGroupPlacementTier tier_placement;
-    /* XXX: Add multipart upload details */
+    bool is_multipart_upload{false};
+    /* XXX: Add any multipart upload details */
 
     RGWObjTier(): name("none") {}
 
@@ -159,6 +160,7 @@ struct RGWObjTier {
       ENCODE_START(2, 2, bl);
       encode(name, bl);
       encode(tier_placement, bl);
+      encode(is_multipart_upload, bl);
       ENCODE_FINISH(bl);
     }
 
@@ -166,6 +168,7 @@ struct RGWObjTier {
       DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
       decode(name, bl);
       decode(tier_placement, bl);
+      decode(is_multipart_upload, bl);
       DECODE_FINISH(bl);
     }
     void dump(Formatter *f) const;
@@ -466,6 +469,7 @@ public:
 
       tier_config.name = t.name;
       tier_config.tier_placement = t.tier_placement;
+      tier_config.is_multipart_upload = t.is_multipart_upload;
   }
 
   void get_tier_config(RGWObjTier* t) {
@@ -474,6 +478,7 @@ public:
 
       t->name = tier_config.name;
       t->tier_placement = tier_config.tier_placement;
+      t->is_multipart_upload = tier_config.is_multipart_upload;
   }
 
   class obj_iterator {