RGWRados::Object op_target(tier_ctx.store->getRados(),
tier_ctx.bucket_info,
tier_ctx.rctx, tier_ctx.obj);
+ real_time read_mtime;
RGWRados::Object::Read read_op(&op_target);
read_op.params.attrs = &attrs;
+ read_op.params.lastmod = &read_mtime;
int r = read_op.prepare(null_yield);
if (r < 0) {
return r;
}
+ if (read_mtime != tier_ctx.o.meta.mtime) {
+ /* raced */
+ return -ECANCELED;
+ }
+
tier_ctx.rctx.set_atomic(tier_ctx.obj);
RGWRados::Object::Write obj_op(&op_target);
RGWObjState *s = tier_ctx.rctx.get_state(tier_ctx.obj);
obj_op.meta.modify_tail = true;
+ obj_op.meta.flags = PUT_OBJ_CREATE;
obj_op.meta.category = RGWObjCategory::CloudTiered;
obj_op.meta.delete_at = real_time();
- obj_op.meta.data = NULL;
+ bufferlist blo;
+ blo.append("");
+ obj_op.meta.data = &blo;
obj_op.meta.if_match = NULL;
obj_op.meta.if_nomatch = NULL;
obj_op.meta.user_data = NULL;
obj_op.meta.zones_trace = NULL;
+ obj_op.meta.delete_at = real_time();
RGWObjManifest *pmanifest;
RGWObjTier tier_config;
tier_config.name = oc.tier.storage_class;
tier_config.tier_placement = oc.tier;
+ tier_config.is_multipart_upload = tier_ctx.is_multipart_upload;
pmanifest->set_tier_type("cloud");
pmanifest->set_tier_config(tier_config);
/* should the obj_size also be set to '0' or is it needed
* to keep track of original size before transition.
* But unless obj_size is set to '0', obj_iters cannot
- * be reset I guess
+ * be reset I guess. For regular transitioned objects
+ * obj_size remains the same even when object is moved to other
+ * storage class. So maybe better to keep it the same way.
*/
//pmanifest->set_obj_size(0);
bl.append(oc.tier.storage_class);
attrs[RGW_ATTR_STORAGE_CLASS] = bl;
+ attrs.erase(RGW_ATTR_ID_TAG);
+ attrs.erase(RGW_ATTR_TAIL_TAG);
obj_op.write_meta(tier_ctx.o.meta.size, 0, attrs, null_yield);
if (r < 0) {
tier_ctx.multipart_sync_threshold = oc.tier.multipart_sync_threshold;
tier_ctx.storage_class = oc.tier.storage_class;
- ret = crs.run(new RGWLCCloudTierCR(tier_ctx));
+ bool al_tiered = false;
+ ret = crs.run(new RGWLCCloudCheckCR(tier_ctx, &al_tiered));
+
+ if (ret < 0) {
+ ldpp_dout(oc.dpp, 0) << "XXXXXXXXXXXXXX failed in RGWCloudCheckCR() ret=" << ret << dendl;
+ }
+
+ if (!al_tiered) {
+ ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX lc.cc is_already_tiered false" << dendl;
+ ret = crs.run(new RGWLCCloudTierCR(tier_ctx));
+ } else {
+ ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX lc.cc is_already_tiered true" << dendl;
+ }
http_manager.stop();
if (ret < 0) {
}
}
+static int do_decode_rest_obj(CephContext *cct, map<string, bufferlist>& attrs, map<string, string>& headers, rgw_rest_obj *info)
+{
+ for (auto header : headers) {
+ const string& val = header.second;
+ if (header.first == "RGWX_OBJECT_SIZE") {
+ info->content_len = atoi(val.c_str());
+ } else {
+ info->attrs[header.first] = val;
+ }
+ }
+
+ info->acls.set_ctx(cct);
+ auto aiter = attrs.find(RGW_ATTR_ACL);
+ if (aiter != attrs.end()) {
+ bufferlist& bl = aiter->second;
+ auto bliter = bl.cbegin();
+ try {
+ info->acls.decode(bliter);
+ } catch (buffer::error& err) {
+ ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl;
+ return -EIO;
+ }
+ } else {
+ ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl;
+ }
+
+ return 0;
+}
+
+class RGWLCStreamGetCRF : public RGWStreamReadHTTPResourceCRF
+{
+ RGWRESTConn::get_obj_params req_params;
+
+ CephContext *cct;
+ RGWHTTPManager *http_manager;
+ rgw_lc_obj_properties obj_properties;
+ std::shared_ptr<RGWRESTConn> conn;
+ rgw::sal::RGWObject* dest_obj;
+ string etag;
+ RGWRESTStreamRWRequest *in_req;
+ map<string, string> headers;
+
+public:
+ RGWLCStreamGetCRF(CephContext *_cct,
+ RGWCoroutinesEnv *_env,
+ RGWCoroutine *_caller,
+ RGWHTTPManager *_http_manager,
+ const rgw_lc_obj_properties& _obj_properties,
+ std::shared_ptr<RGWRESTConn> _conn,
+ rgw::sal::RGWObject* _dest_obj) :
+ RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _http_manager, _dest_obj->get_key()),
+ cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) {
+ }
+
+
+
+ int init() override {
+ /* init input connection */
+
+ req_params.get_op = false; /* Need only headers */
+// req_params.skip_decrypt = false;
+ req_params.prepend_metadata = true;
+ req_params.rgwx_stat = true;
+ req_params.sync_manifest = true;
+ req_params.skip_decrypt = true;
+
+// req_params.unmod_ptr = &src_properties.mtime;
+// req_params.etag = src_properties.etag;
+// req_params.mod_zone_id = src_properties.zone_short_id;
+// req_params.mod_pg_ver = src_properties.pg_ver;
+
+// if (range.is_set) {
+// req_params.range_is_set = true;
+// req_params.range_start = range.ofs;
+// req_params.range_end = range.ofs + range.size - 1;
+// }
+
+ int ret = conn->get_obj(dest_obj, req_params, false /* send */, &in_req);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ set_req(in_req);
+
+ return RGWStreamReadHTTPResourceCRF::init();
+ }
+
+ int init2() {
+ /* init input connection */
+
+ req_params.get_op = false; /* Need only headers */
+// req_params.skip_decrypt = false;
+ req_params.prepend_metadata = true;
+ req_params.rgwx_stat = true;
+ req_params.sync_manifest = true;
+ req_params.skip_decrypt = true;
+
+// req_params.unmod_ptr = &src_properties.mtime;
+// req_params.etag = src_properties.etag;
+// req_params.mod_zone_id = src_properties.zone_short_id;
+// req_params.mod_pg_ver = src_properties.pg_ver;
+
+// if (range.is_set) {
+// req_params.range_is_set = true;
+// req_params.range_start = range.ofs;
+// req_params.range_end = range.ofs + range.size - 1;
+// }
+
+ string etag;
+ real_time set_mtime;
+
+ int ret = conn->get_obj(dest_obj, req_params, true /* send */, &in_req);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ ret = conn->complete_request(in_req, nullptr, nullptr,
+ nullptr, nullptr, &headers);
+ if (ret < 0 && ret != -ENOENT) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): XXXXXXXXXXXX conn->complete_request() returned ret=" << ret << dendl;
+ return ret;
+ }
+ // set_req(in_req);
+
+ // return RGWStreamReadHTTPResourceCRF::init();
+ return 0;
+ }
+
+ int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) override {
+ map<string, bufferlist> src_attrs;
+
+ ldout(cct, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;
+
+ if (extra_data.length() > 0) {
+ JSONParser jp;
+ if (!jp.parse(extra_data.c_str(), extra_data.length())) {
+ ldout(cct, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl;
+ return -EIO;
+ }
+
+ JSONDecoder::decode_json("attrs", src_attrs, &jp);
+ }
+ return do_decode_rest_obj(cct, src_attrs, headers, &rest_obj);
+ }
+
+ void handle_headers(const map<string, string>& _headers) {
+ headers = _headers;
+ }
+
+ int is_already_tiered() {
+ char buf[32];
+ /*rgw_rest_obj rest_obj;
+ rest_obj.init(dest_obj->get_key());
+
+
+ if (do_decode_rest_obj(cct, attrs, headers, &rest_obj)) {
+ ldout(sc->cct, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl;
+ return set_cr_error(-EINVAL);
+ }
+
+ for (auto header : headers) {
+ const string& val = header.second;
+ if (header.first == "RGWX_OBJECT_SIZE") {
+ info->content_len = atoi(val.c_str());
+ } else {
+ info->attrs[header.first] = val;
+ }
+ }*/
+
+ map<string, string> attrs = headers;
+// req->get_out_headers(&attrs);
+ // get_attrs(&attrs);
+
+ for (auto a : attrs) {
+ ldout(cct, 0) << "XXXXXXXXXXXXXX GetCrf attr[" << a.first << "] = " << a.second <<dendl;
+ }
+ utime_t ut(obj_properties.mtime);
+ snprintf(buf, sizeof(buf), "%lld.%09lld",
+ (long long)ut.sec(),
+ (long long)ut.nsec());
+
+ string s = attrs["X_AMZ_META_RGWX_SOURCE_MTIME"];
+
+ if (s.empty())
+ s = attrs["x_amz_meta_rgwx_source_mtime"];
+
+ ldout(cct, 0) << "XXXXXXXXXXXXXX is_already_tiered attrs[X_AMZ_META_RGWX_SOURCE_MTIME] = " << s <<dendl;
+ ldout(cct, 0) << "XXXXXXXXXXXXXX is_already_tiered mtime buf = " << buf <<dendl;
+ if (!s.empty() && !strcmp(s.c_str(), buf)){
+ return 1;
+ }
+ return 0;
+ }
+
+ bool need_extra_data() override {
+ return true;
+ }
+};
+
class RGWLCStreamReadCRF : public RGWStreamReadCRF
{
CephContext *cct;
}
};
+int RGWLCCloudCheckCR::operate() {
+ /* Check if object has already been transitioned */
+ rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime,
+ tier_ctx.o.meta.etag,
+ tier_ctx.o.versioned_epoch,
+ tier_ctx.acl_mappings,
+ tier_ctx.target_storage_class);
+
+ rgw_bucket target_bucket;
+ string target_obj_name;
+
+ target_bucket.name = tier_ctx.target_bucket_name;
+ target_obj_name = tier_ctx.obj.key.name; // cross check with aws module
+
+ std::shared_ptr<rgw::sal::RGWRadosBucket> dest_bucket;
+ dest_bucket.reset(new rgw::sal::RGWRadosBucket(tier_ctx.store, target_bucket));
+
+ std::shared_ptr<rgw::sal::RGWRadosObject> dest_obj;
+ dest_obj.reset(new rgw::sal::RGWRadosObject(tier_ctx.store, rgw_obj_key(target_obj_name), (rgw::sal::RGWRadosBucket *)(dest_bucket.get())));
+
+
+// std::shared_ptr<RGWStreamReadHTTPResourceCRF> get_crf;
+ std::shared_ptr<RGWLCStreamGetCRF> get_crf;
+ get_crf.reset(new RGWLCStreamGetCRF((CephContext *)(tier_ctx.cct), get_env(), this,
+ (RGWHTTPManager*)(tier_ctx.http_manager),
+ obj_properties, tier_ctx.conn, static_cast<rgw::sal::RGWObject *>(dest_obj.get())));
+ int ret;
+
+// yield {
+ ret = get_crf->init2();
+ // }
+ if (ret < 0) {
+ ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX get_crf->init failed, ret = " << ret << dendl;
+ return set_cr_error(ret);
+ }
+//reenter(this) {
+ bl.clear();
+/* do {
+// yield {
+ ret = get_crf->get_headers(&need_retry);
+ if (ret < 0) {
+ ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX get_crf->read failed, ret = " << ret << dendl;
+ return set_cr_error(ret);
+// }
+ }
+ if (retcode < 0) {
+ ldout(cct, 20) << __func__ << ": in_crf->read() retcode=" << retcode << dendl;
+ return set_cr_error(ret);
+ }
+ } while (need_retry); */
+
+ if ((static_cast<RGWLCStreamGetCRF *>(get_crf.get()))->is_already_tiered()) {
+ *already_tiered = true;
+ ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX is_already_tiered true" << dendl;
+ return set_cr_done();
+ }
+
+ ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX is_already_tiered false..going with out_crf writing" << dendl;
+
+ return set_cr_done();
+// } //reenter
+
+ return 0;
+}
+
map <pair<string, string>, utime_t> target_buckets;
int RGWLCCloudTierCR::operate() {
if (size < multipart_sync_threshold) {
call (new RGWLCStreamObjToCloudPlainCR(tier_ctx));
} else {
+ tier_ctx.is_multipart_upload = true;
call(new RGWLCStreamObjToCloudMultipartCR(tier_ctx));
}