#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
-#include <boost/asio/yield.hpp>
-
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
using namespace std;
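+/* Per-part state (part number, offset, size, etag) tracked while an object is
+ * transitioned via multipart upload. */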
+struct rgw_lc_multipart_part_info {
+ int part_num{0};
+ uint64_t ofs{0};
+ uint64_t size{0};
+ std::string etag;
+};
+
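+/* Properties of the source object plus the target tier's ACL mappings and
+ * storage class, carried through the transition. */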
+struct rgw_lc_obj_properties {
+ ceph::real_time mtime;
+ std::string etag;
+ uint64_t versioned_epoch{0};
+ std::map<std::string, RGWTierACLMapping>& target_acl_mappings;
+ std::string target_storage_class;
+
+ rgw_lc_obj_properties(ceph::real_time _mtime, std::string _etag,
+ uint64_t _versioned_epoch, std::map<std::string,
+ RGWTierACLMapping>& _t_acl_mappings,
+ std::string _t_storage_class) :
+ mtime(_mtime), etag(_etag),
+ versioned_epoch(_versioned_epoch),
+ target_acl_mappings(_t_acl_mappings),
+ target_storage_class(_t_storage_class) {}
+};
+
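+/* Multipart upload progress, encoded and persisted as a system object while an
+ * object is being transitioned to the cloud tier. */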
+struct rgw_lc_multipart_upload_info {
+ std::string upload_id;
+ uint64_t obj_size;
+ ceph::real_time mtime;
+ std::string etag;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(upload_id, bl);
+ encode(obj_size, bl);
+ encode(mtime, bl);
+ encode(etag, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(upload_id, bl);
+ decode(obj_size, bl);
+ decode(mtime, bl);
+ decode(etag, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(rgw_lc_multipart_upload_info)
+
static inline string get_key_instance(const rgw_obj_key& key)
{
if (!key.instance.empty() &&
return path;
}
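+/* Read and decode the multipart upload status (rgw_lc_multipart_upload_info)
+ * stored as a system object. */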
+static int read_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Store *store,
+ const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status)
+{
+ int ret = 0;
+ rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(store);
+
+ if (!rados) {
+ ldpp_dout(dpp, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl;
+ return -1;
+ }
+
+ auto& pool = status_obj->pool;
+ const auto oid = status_obj->oid;
+ auto obj_ctx = rados->svc()->sysobj->init_obj_ctx();
+ bufferlist bl;
+
+ ret = rgw_get_system_obj(obj_ctx, pool, oid, bl, nullptr, nullptr,
+ null_yield, dpp);
+
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (bl.length() > 0) {
+ try {
+ auto p = bl.cbegin();
+ status->decode(p);
+ } catch (buffer::error& e) {
+ ldpp_dout(dpp, 10) << "failed to decode status obj: "
+ << e.what() << dendl;
+ return -EIO;
+ }
+ } else {
+ return -EIO;
+ }
+
+ return 0;
+}
+
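+/* Encode and store the multipart upload status as a system object. */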
+static int put_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Store *store,
+ const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status)
+{
+ int ret = 0;
+ rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(store);
+
+ if (!rados) {
+ ldpp_dout(dpp, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl;
+ return -1;
+ }
+
+ auto& pool = status_obj->pool;
+ const auto oid = status_obj->oid;
+ auto obj_ctx = rados->svc()->sysobj->init_obj_ctx();
+ bufferlist bl;
+ status->encode(bl);
+
+ ret = rgw_put_system_obj(dpp, obj_ctx, pool, oid, bl, true, nullptr,
+ real_time{}, null_yield);
+
+ return ret;
+}
+
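+/* Remove the multipart upload status system object. */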
+static int delete_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Store *store,
+ const rgw_raw_obj *status_obj)
+{
+ int ret = 0;
+ rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(store);
+
+ if (!rados) {
+ ldpp_dout(dpp, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl;
+ return -1;
+ }
+
+ auto& pool = status_obj->pool;
+ const auto oid = status_obj->oid;
+ auto sysobj = rados->svc()->sysobj;
+
+ ret = rgw_delete_system_obj(dpp, sysobj, pool, oid, nullptr, null_yield);
+
+ return ret;
+}
+
static std::set<string> keep_headers = { "CONTENT_TYPE",
- "CONTENT_ENCODING",
- "CONTENT_DISPOSITION",
- "CONTENT_LANGUAGE" };
+ "CONTENT_ENCODING",
+ "CONTENT_DISPOSITION",
+ "CONTENT_LANGUAGE" };
/*
* mapping between rgw object attrs and output http fields
}; */
static void init_headers(map<string, bufferlist>& attrs,
- map<string, string>& headers)
+ map<string, string>& headers)
{
for (auto& kv : attrs) {
const char * name = kv.first.c_str();
if (aiter != std::end(rgw_to_http_attrs)) {
headers[aiter->second] = rgw_bl_str(kv.second);
- } else if (strcmp(name, RGW_ATTR_SLO_UINDICATOR) == 0) {
- // this attr has an extra length prefix from encode() in prior versions
- headers["X-Object-Meta-Static-Large-Object"] = "True";
} else if (strncmp(name, RGW_ATTR_META_PREFIX,
sizeof(RGW_ATTR_META_PREFIX)-1) == 0) {
name += sizeof(RGW_ATTR_META_PREFIX) - 1;
string sname(name);
- string name_prefix = "X-Object-Meta-";
+ string name_prefix = RGW_ATTR_META_PREFIX;
char full_name_buf[name_prefix.size() + sname.size() + 1];
snprintf(full_name_buf, sizeof(full_name_buf), "%.*s%.*s",
static_cast<int>(name_prefix.length()),
sname.data());
headers[full_name_buf] = rgw_bl_str(kv.second);
} else if (strcmp(name,RGW_ATTR_CONTENT_TYPE) == 0) {
- /* Verify if its right way to copy this field */
headers["CONTENT_TYPE"] = rgw_bl_str(kv.second);
}
}
}
-int RGWLCStreamGetCRF::init(const DoutPrefixProvider *dpp) {
- /* init input connection */
- req_params.get_op = false; /* Need only headers */
- req_params.prepend_metadata = true;
- req_params.rgwx_stat = true;
- req_params.sync_manifest = true;
- req_params.skip_decrypt = true;
+/* Read the object (or just its head) from the remote endpoint. For now this
+ * initializes only the headers, but it can be extended to fetch etag, mtime etc.
+ */
+static int cloud_tier_get_object(RGWLCCloudTierCtx& tier_ctx, bool head,
+ std::map<std::string, std::string>& headers) {
+ RGWRESTConn::get_obj_params req_params;
+ RGWBucketInfo b;
+ std::string target_obj_name;
+ int ret = 0;
+ std::unique_ptr<rgw::sal::Bucket> dest_bucket;
+ std::unique_ptr<rgw::sal::Object> dest_obj;
+ rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
+ tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
+ tier_ctx.target_storage_class);
+ std::string etag;
+ RGWRESTStreamRWRequest *in_req;
+
+ b.bucket.name = tier_ctx.target_bucket_name;
+ target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
+ tier_ctx.obj->get_name();
+ if (!tier_ctx.o.is_current()) {
+ target_obj_name += get_key_instance(tier_ctx.obj->get_key());
+ }
- string etag;
- real_time set_mtime;
+ ret = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << ret << dendl;
+ return ret;
+ }
- int ret = conn->get_obj(dpp, dest_obj, req_params, true /* send */, &in_req);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
- return ret;
- }
+ dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
+ if (!dest_obj) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
+ return -1;
+ }
+ /* init input connection */
+ req_params.get_op = !head;
+ req_params.prepend_metadata = true;
+ req_params.rgwx_stat = true;
+ req_params.sync_manifest = true;
+ req_params.skip_decrypt = true;
+
+ ret = tier_ctx.conn.get_obj(tier_ctx.dpp, dest_obj.get(), req_params, true /* send */, &in_req);
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: " << __func__ << "(): conn.get_obj() returned ret=" << ret << dendl;
+ return ret;
+ }
- /* fetch only headers */
- ret = conn->complete_request(in_req, nullptr, nullptr, nullptr, nullptr, &headers, null_yield);
- if (ret < 0 && ret != -ENOENT) {
- ldout(cct, 20) << "ERROR: " << __func__ << "(): conn->complete_request() returned ret=" << ret << dendl;
- return ret;
- }
- return 0;
+ /* fetch headers */
+ ret = tier_ctx.conn.complete_request(in_req, nullptr, nullptr, nullptr, nullptr, &headers, null_yield);
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(tier_ctx.dpp, 20) << "ERROR: " << __func__ << "(): conn.complete_request() returned ret=" << ret << dendl;
+ return ret;
}
+ return 0;
+}
-int RGWLCStreamGetCRF::is_already_tiered() {
- char buf[32];
- map<string, string> attrs = headers;
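+/* Check whether the remote copy already matches the local object by comparing
+ * its rgwx-source-mtime header against the local mtime. */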
+static bool is_already_tiered(const DoutPrefixProvider *dpp,
+ std::map<std::string, std::string>& headers,
+ ceph::real_time& mtime) {
+ char buf[32];
+ map<string, string> attrs = headers;
- for (const auto& a : attrs) {
- ldout(cct, 20) << "GetCrf attr[" << a.first << "] = " << a.second <<dendl;
- }
- utime_t ut(obj_properties.mtime);
- snprintf(buf, sizeof(buf), "%lld.%09lld",
- (long long)ut.sec(),
- (long long)ut.nsec());
+ for (const auto& a : attrs) {
+ ldpp_dout(dpp, 20) << "GetCrf attr[" << a.first << "] = " << a.second <<dendl;
+ }
+ utime_t ut(mtime);
+ snprintf(buf, sizeof(buf), "%lld.%09lld",
+ (long long)ut.sec(),
+ (long long)ut.nsec());
- string s = attrs["X_AMZ_META_RGWX_SOURCE_MTIME"];
+ string s = attrs["X_AMZ_META_RGWX_SOURCE_MTIME"];
- if (s.empty())
- s = attrs["x_amz_meta_rgwx_source_mtime"];
+ if (s.empty())
+ s = attrs["x_amz_meta_rgwx_source_mtime"];
- ldout(cct, 20) << "is_already_tiered attrs[X_AMZ_META_RGWX_SOURCE_MTIME] = " << s <<dendl;
- ldout(cct, 20) << "is_already_tiered mtime buf = " << buf <<dendl;
+ ldpp_dout(dpp, 20) << "is_already_tiered attrs[X_AMZ_META_RGWX_SOURCE_MTIME] = " << s <<dendl;
+ ldpp_dout(dpp, 20) << "is_already_tiered mtime buf = " << buf <<dendl;
- if (!s.empty() && !strcmp(s.c_str(), buf)){
- return 1;
- }
- return 0;
+  if (!s.empty() && !strcmp(s.c_str(), buf)) {
+    return true;
}
+  return false;
+}
-class RGWLCStreamReadCRF : public RGWStreamReadCRF
+/* Read object locally & also initialize dest rest obj based on read attrs */
+class RGWLCStreamRead
{
CephContext *cct;
const DoutPrefixProvider *dpp;
- map<string, bufferlist> attrs;
+ std::map<std::string, bufferlist> attrs;
uint64_t obj_size;
- std::unique_ptr<rgw::sal::Object>* obj;
+ rgw::sal::Object *obj;
const real_time &mtime;
bool multipart;
off_t m_part_off;
off_t m_part_end;
- public:
- RGWLCStreamReadCRF(CephContext *_cct, const DoutPrefixProvider *_dpp,
- RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object>* _obj,
- const real_time &_mtime) :
- RGWStreamReadCRF(_obj, obj_ctx), cct(_cct),
- dpp(_dpp), obj(_obj), mtime(_mtime) {}
+ std::unique_ptr<rgw::sal::Object::ReadOp> read_op;
+ off_t ofs;
+ off_t end;
+ rgw_rest_obj rest_obj;
- ~RGWLCStreamReadCRF() {};
+ int retcode;
- void set_multipart(uint64_t part_size, off_t part_off, off_t part_end) {
- multipart = true;
- m_part_size = part_size;
- m_part_off = part_off;
- m_part_end = part_end;
- }
+ public:
+ RGWLCStreamRead(CephContext *_cct, const DoutPrefixProvider *_dpp,
+ RGWObjectCtx& obj_ctx, rgw::sal::Object *_obj,
+ const real_time &_mtime) :
+ cct(_cct), dpp(_dpp), obj(_obj), mtime(_mtime),
+ read_op(obj->get_read_op(&obj_ctx)) {}
+
+ ~RGWLCStreamRead() {};
+ int set_range(off_t _ofs, off_t _end);
+ int get_range(off_t &_ofs, off_t &_end);
+ rgw_rest_obj& get_rest_obj();
+ void set_multipart(uint64_t part_size, off_t part_off, off_t part_end);
+ int init();
+ int init_rest_obj();
+ int read(off_t ofs, off_t end, RGWGetDataCB *out_cb);
+};
- int init() override {
- optional_yield y = null_yield;
- real_time read_mtime;
+/* Send PUT op to remote endpoint */
+class RGWLCCloudStreamPut
+{
+ const DoutPrefixProvider *dpp;
+ rgw_lc_obj_properties obj_properties;
+ RGWRESTConn& conn;
+ rgw::sal::Object *dest_obj;
+ std::string etag;
+ RGWRESTStreamS3PutObj *out_req{nullptr};
- read_op->params.lastmod = &read_mtime;
+ struct multipart_info {
+ bool is_multipart{false};
+ std::string upload_id;
+ int part_num{0};
+ uint64_t part_size;
+ } multipart;
- int ret = read_op->prepare(y, dpp);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: fail to prepare read_op, ret = " << ret << dendl;
- return ret;
- }
+ int retcode;
- if (read_mtime != mtime) {
- /* raced */
- return -ECANCELED;
+ public:
+ RGWLCCloudStreamPut(const DoutPrefixProvider *_dpp,
+ const rgw_lc_obj_properties& _obj_properties,
+ RGWRESTConn& _conn,
+ rgw::sal::Object *_dest_obj) :
+ dpp(_dpp), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) {
}
+ int init();
+ static bool keep_attr(const std::string& h);
+ static void init_send_attrs(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj,
+ const rgw_lc_obj_properties& obj_properties,
+ std::map<std::string, std::string>& attrs);
+ void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj);
+ void handle_headers(const std::map<std::string, std::string>& headers);
+ bool get_etag(std::string *petag);
+ void set_multipart(const std::string& upload_id, int part_num, uint64_t part_size);
+ int send();
+ RGWGetDataCB *get_cb();
+ int complete_request();
+};
- attrs = (*obj)->get_attrs();
- obj_size = (*obj)->get_obj_size();
+int RGWLCStreamRead::set_range(off_t _ofs, off_t _end) {
+ ofs = _ofs;
+ end = _end;
- ret = init_rest_obj();
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: fail to initialize rest_obj, ret = " << ret << dendl;
- return ret;
- }
+ return 0;
+}
- if (!multipart) {
- set_range(0, obj_size - 1);
- } else {
- set_range(m_part_off, m_part_end);
- }
- return 0;
- }
+int RGWLCStreamRead::get_range(off_t &_ofs, off_t &_end) {
+ _ofs = ofs;
+ _end = end;
- int init_rest_obj() override {
- /* Initialize rgw_rest_obj.
- * Reference: do_decode_rest_obj
- * Check how to copy headers content */
- rest_obj.init((*obj)->get_key());
+ return 0;
+}
- if (!multipart) {
- rest_obj.content_len = obj_size;
- } else {
- rest_obj.content_len = m_part_size;
- }
+rgw_rest_obj& RGWLCStreamRead::get_rest_obj() {
+ return rest_obj;
+}
- /* For mulitpart attrs are sent as part of InitMultipartCR itself */
- if (multipart) {
- return 0;
- }
+void RGWLCStreamRead::set_multipart(uint64_t part_size, off_t part_off, off_t part_end) {
+ multipart = true;
+ m_part_size = part_size;
+ m_part_off = part_off;
+ m_part_end = part_end;
+}
- /*
- * XXX: verify if its right way to copy attrs into
- * rest obj
- */
- init_headers(attrs, rest_obj.attrs);
-
- rest_obj.acls.set_ctx(cct);
- const auto aiter = attrs.find(RGW_ATTR_ACL);
- if (aiter != attrs.end()) {
- bufferlist& bl = aiter->second;
- auto bliter = bl.cbegin();
- try {
- rest_obj.acls.decode(bliter);
- } catch (buffer::error& err) {
- ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl;
- return -EIO;
- }
- } else {
- ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl;
- }
- return 0;
- }
+int RGWLCStreamRead::init() {
+ optional_yield y = null_yield;
+ real_time read_mtime;
- int read(off_t ofs, off_t end, bufferlist &bl) {
- optional_yield y = null_yield;
+ read_op->params.lastmod = &read_mtime;
- return read_op->read(ofs, end, bl, y, dpp);
+ int ret = read_op->prepare(y, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: fail to prepare read_op, ret = " << ret << dendl;
+ return ret;
}
-};
-
-class RGWLCStreamPutCRF : public RGWStreamWriteHTTPResourceCRF
-{
- CephContext *cct;
- RGWHTTPManager *http_manager;
- rgw_lc_obj_properties obj_properties;
- std::shared_ptr<RGWRESTConn> conn;
- rgw::sal::Object* dest_obj;
- string etag;
+ if (read_mtime != mtime) {
+ /* raced */
+ return -ECANCELED;
+ }
- public:
- RGWLCStreamPutCRF(CephContext *_cct,
- RGWCoroutinesEnv *_env,
- RGWCoroutine *_caller,
- RGWHTTPManager *_http_manager,
- const rgw_lc_obj_properties& _obj_properties,
- std::shared_ptr<RGWRESTConn> _conn,
- rgw::sal::Object* _dest_obj) :
- RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _http_manager),
- cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) {
- }
+ attrs = obj->get_attrs();
+ obj_size = obj->get_obj_size();
+ ret = init_rest_obj();
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: fail to initialize rest_obj, ret = " << ret << dendl;
+ return ret;
+ }
- int init() override {
- /* init output connection */
- RGWRESTStreamS3PutObj *out_req{nullptr};
+ if (!multipart) {
+ set_range(0, obj_size - 1);
+ } else {
+ set_range(m_part_off, m_part_end);
+ }
+ return 0;
+}
- if (multipart.is_multipart) {
- char buf[32];
- snprintf(buf, sizeof(buf), "%d", multipart.part_num);
- rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() },
- { "partNumber", buf },
- { nullptr, nullptr } };
- conn->put_obj_send_init(dest_obj, params, &out_req);
- } else {
- conn->put_obj_send_init(dest_obj, nullptr, &out_req);
- }
+int RGWLCStreamRead::init_rest_obj() {
+ /* Initialize rgw_rest_obj.
+ * Reference: do_decode_rest_obj
+ * Check how to copy headers content */
+ rest_obj.init(obj->get_key());
- set_req(out_req);
+ if (!multipart) {
+ rest_obj.content_len = obj_size;
+ } else {
+ rest_obj.content_len = m_part_size;
+ }
- return RGWStreamWriteHTTPResourceCRF::init();
+  /* For multipart, the attrs are sent as part of the multipart upload init request itself */
+ if (multipart) {
+ return 0;
}
- static bool keep_attr(const string& h) {
- return (keep_headers.find(h) != keep_headers.end() ||
- boost::algorithm::starts_with(h, "X_AMZ_"));
+ /*
+ * XXX: verify if its right way to copy attrs into rest obj
+ */
+ init_headers(attrs, rest_obj.attrs);
+
+ rest_obj.acls.set_ctx(cct);
+ const auto aiter = attrs.find(RGW_ATTR_ACL);
+ if (aiter != attrs.end()) {
+ bufferlist& bl = aiter->second;
+ auto bliter = bl.cbegin();
+ try {
+ rest_obj.acls.decode(bliter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to decode policy off attrs" << dendl;
+ return -EIO;
+ }
+ } else {
+ ldpp_dout(dpp, 0) << "WARNING: acl attrs not provided" << dendl;
}
+ return 0;
+}
- static void init_send_attrs(CephContext *cct, const rgw_rest_obj& rest_obj,
- const rgw_lc_obj_properties& obj_properties,
- map<string, string> *attrs) {
+int RGWLCStreamRead::read(off_t ofs, off_t end, RGWGetDataCB *out_cb) {
+ int ret = read_op->iterate(dpp, ofs, end, out_cb, null_yield);
+ return ret;
+}
- map<string, RGWTierACLMapping>& acl_mappings(obj_properties.target_acl_mappings);
- string target_storage_class = obj_properties.target_storage_class;
+int RGWLCCloudStreamPut::init() {
+ /* init output connection */
+ if (multipart.is_multipart) {
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%d", multipart.part_num);
+ rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() },
+ { "partNumber", buf },
+ { nullptr, nullptr } };
+ conn.put_obj_send_init(dest_obj, params, &out_req);
+ } else {
+ conn.put_obj_send_init(dest_obj, nullptr, &out_req);
+ }
- attrs->clear();
+ return 0;
+}
- for (auto& hi : rest_obj.attrs) {
- if (keep_attr(hi.first)) {
- attrs->insert(hi);
- }
- }
+bool RGWLCCloudStreamPut::keep_attr(const string& h) {
+ return (keep_headers.find(h) != keep_headers.end() ||
+ boost::algorithm::starts_with(h, "X_AMZ_"));
+}
- const auto acl = rest_obj.acls.get_acl();
+void RGWLCCloudStreamPut::init_send_attrs(const DoutPrefixProvider *dpp,
+ const rgw_rest_obj& rest_obj,
+ const rgw_lc_obj_properties& obj_properties,
+ std::map<string, string>& attrs) {
- map<int, vector<string> > access_map;
+ map<string, RGWTierACLMapping>& acl_mappings(obj_properties.target_acl_mappings);
+ const std::string& target_storage_class = obj_properties.target_storage_class;
- if (!acl_mappings.empty()) {
- for (auto& grant : acl.get_grant_map()) {
- auto& orig_grantee = grant.first;
- auto& perm = grant.second;
+ attrs.clear();
- string grantee;
+ for (auto& hi : rest_obj.attrs) {
+ if (keep_attr(hi.first)) {
+ attrs.insert(hi);
+ }
+ }
- const auto& am = acl_mappings;
+ const auto acl = rest_obj.acls.get_acl();
- const auto iter = am.find(orig_grantee);
- if (iter == am.end()) {
- ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
- continue;
- }
+ map<int, vector<string> > access_map;
- grantee = iter->second.dest_id;
-
- string type;
-
- switch (iter->second.type) {
- case ACL_TYPE_CANON_USER:
- type = "id";
- break;
- case ACL_TYPE_EMAIL_USER:
- type = "emailAddress";
- break;
- case ACL_TYPE_GROUP:
- type = "uri";
- break;
- default:
- continue;
- }
+ if (!acl_mappings.empty()) {
+ for (auto& grant : acl.get_grant_map()) {
+ auto& orig_grantee = grant.first;
+ auto& perm = grant.second;
- string tv = type + "=" + grantee;
+ string grantee;
- int flags = perm.get_permission().get_permissions();
- if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) {
- access_map[flags].push_back(tv);
- continue;
- }
+ const auto& am = acl_mappings;
- for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) {
- if (flags & i) {
- access_map[i].push_back(tv);
- }
- }
+ const auto iter = am.find(orig_grantee);
+ if (iter == am.end()) {
+ ldpp_dout(dpp, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
+ continue;
}
- }
- for (const auto& aiter : access_map) {
- int grant_type = aiter.first;
+ grantee = iter->second.dest_id;
- string header_str("x-amz-grant-");
+ string type;
- switch (grant_type) {
- case RGW_PERM_READ:
- header_str.append("read");
- break;
- case RGW_PERM_WRITE:
- header_str.append("write");
- break;
- case RGW_PERM_READ_ACP:
- header_str.append("read-acp");
+ switch (iter->second.type) {
+ case ACL_TYPE_CANON_USER:
+ type = "id";
break;
- case RGW_PERM_WRITE_ACP:
- header_str.append("write-acp");
+ case ACL_TYPE_EMAIL_USER:
+ type = "emailAddress";
break;
- case RGW_PERM_FULL_CONTROL:
- header_str.append("full-control");
+ case ACL_TYPE_GROUP:
+ type = "uri";
break;
+ default:
+ continue;
}
- string s;
+ string tv = type + "=" + grantee;
- for (const auto& viter : aiter.second) {
- if (!s.empty()) {
- s.append(", ");
- }
- s.append(viter);
+ int flags = perm.get_permission().get_permissions();
+ if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) {
+ access_map[flags].push_back(tv);
+ continue;
}
- ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
-
- (*attrs)[header_str] = s;
+ for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) {
+ if (flags & i) {
+ access_map[i].push_back(tv);
+ }
+ }
}
+ }
- /* Copy target storage class */
- if (!target_storage_class.empty()) {
- (*attrs)["x-amz-storage-class"] = target_storage_class;
- } else {
- (*attrs)["x-amz-storage-class"] = "STANDARD";
+ for (const auto& aiter : access_map) {
+ int grant_type = aiter.first;
+
+ string header_str("x-amz-grant-");
+
+ switch (grant_type) {
+ case RGW_PERM_READ:
+ header_str.append("read");
+ break;
+ case RGW_PERM_WRITE:
+ header_str.append("write");
+ break;
+ case RGW_PERM_READ_ACP:
+ header_str.append("read-acp");
+ break;
+ case RGW_PERM_WRITE_ACP:
+ header_str.append("write-acp");
+ break;
+ case RGW_PERM_FULL_CONTROL:
+ header_str.append("full-control");
+ break;
}
- /* New attribute to specify its transitioned from RGW */
- (*attrs)["x-amz-meta-rgwx-source"] = "rgw";
+ string s;
- char buf[32];
- snprintf(buf, sizeof(buf), "%llu", (long long)obj_properties.versioned_epoch);
- (*attrs)["x-amz-meta-rgwx-versioned-epoch"] = buf;
-
- utime_t ut(obj_properties.mtime);
- snprintf(buf, sizeof(buf), "%lld.%09lld",
- (long long)ut.sec(),
- (long long)ut.nsec());
-
- (*attrs)["x-amz-meta-rgwx-source-mtime"] = buf;
- (*attrs)["x-amz-meta-rgwx-source-etag"] = obj_properties.etag;
- (*attrs)["x-amz-meta-rgwx-source-key"] = rest_obj.key.name;
- if (!rest_obj.key.instance.empty()) {
- (*attrs)["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
- }
- for (const auto& a : (*attrs)) {
- ldout(cct, 30) << "init_send_attrs attr[" << a.first << "] = " << a.second <<dendl;
+ for (const auto& viter : aiter.second) {
+ if (!s.empty()) {
+ s.append(", ");
+ }
+ s.append(viter);
}
+
+ ldpp_dout(dpp, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
+
+ attrs[header_str] = s;
}
- void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) override {
- RGWRESTStreamS3PutObj *r = static_cast<RGWRESTStreamS3PutObj *>(req);
+ /* Copy target storage class */
+ if (!target_storage_class.empty()) {
+ attrs["x-amz-storage-class"] = target_storage_class;
+ } else {
+ attrs["x-amz-storage-class"] = "STANDARD";
+ }
- map<string, string> new_attrs;
- if (!multipart.is_multipart) {
- init_send_attrs(cct, rest_obj, obj_properties, &new_attrs);
- }
+ /* New attribute to specify its transitioned from RGW */
+ attrs["x-amz-meta-rgwx-source"] = "rgw";
- r->set_send_length(rest_obj.content_len);
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%llu", (long long)obj_properties.versioned_epoch);
+ attrs["x-amz-meta-rgwx-versioned-epoch"] = buf;
- RGWAccessControlPolicy policy;
+ utime_t ut(obj_properties.mtime);
+ snprintf(buf, sizeof(buf), "%lld.%09lld",
+ (long long)ut.sec(),
+ (long long)ut.nsec());
- r->send_ready(dpp, conn->get_key(), new_attrs, policy);
+ attrs["x-amz-meta-rgwx-source-mtime"] = buf;
+ attrs["x-amz-meta-rgwx-source-etag"] = obj_properties.etag;
+ attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name;
+ if (!rest_obj.key.instance.empty()) {
+ attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
+ }
+ for (const auto& a : attrs) {
+ ldpp_dout(dpp, 30) << "init_send_attrs attr[" << a.first << "] = " << a.second <<dendl;
}
+}
- void handle_headers(const map<string, string>& headers) {
- for (const auto& h : headers) {
- if (h.first == "ETAG") {
- etag = h.second;
- }
- }
+void RGWLCCloudStreamPut::send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) {
+ auto r = static_cast<RGWRESTStreamS3PutObj *>(out_req);
+
+ std::map<std::string, std::string> new_attrs;
+ if (!multipart.is_multipart) {
+ init_send_attrs(dpp, rest_obj, obj_properties, new_attrs);
}
- bool get_etag(string *petag) {
- if (etag.empty()) {
- return false;
+ r->set_send_length(rest_obj.content_len);
+
+ RGWAccessControlPolicy policy;
+
+ r->send_ready(dpp, conn.get_key(), new_attrs, policy);
+}
+
+void RGWLCCloudStreamPut::handle_headers(const map<string, string>& headers) {
+ for (const auto& h : headers) {
+ if (h.first == "ETAG") {
+ etag = h.second;
}
- *petag = etag;
- return true;
}
-};
+}
+bool RGWLCCloudStreamPut::get_etag(string *petag) {
+ if (etag.empty()) {
+ return false;
+ }
+ *petag = etag;
+ return true;
+}
+
+void RGWLCCloudStreamPut::set_multipart(const string& upload_id, int part_num, uint64_t part_size) {
+ multipart.is_multipart = true;
+ multipart.upload_id = upload_id;
+ multipart.part_num = part_num;
+ multipart.part_size = part_size;
+}
+int RGWLCCloudStreamPut::send() {
+ int ret = RGWHTTP::send(out_req);
+ return ret;
+}
-class RGWLCStreamObjToCloudPlainCR : public RGWCoroutine {
- RGWLCCloudTierCtx& tier_ctx;
+RGWGetDataCB *RGWLCCloudStreamPut::get_cb() {
+ return out_req->get_out_cb();
+}
- std::shared_ptr<RGWStreamReadCRF> in_crf;
- std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+int RGWLCCloudStreamPut::complete_request() {
+ int ret = conn.complete_request(out_req, etag, &obj_properties.mtime, null_yield);
+ return ret;
+}
- std::unique_ptr<rgw::sal::Bucket> dest_bucket;
- std::unique_ptr<rgw::sal::Object> dest_obj;
+/* Read local copy and write to Cloud endpoint */
+static int cloud_tier_transfer_object(const DoutPrefixProvider* dpp,
+ RGWLCStreamRead* readf, RGWLCCloudStreamPut* writef) {
+ std::string url;
+ bufferlist bl;
+ bool sent_attrs{false};
+ int ret{0};
+ off_t ofs;
+ off_t end;
- rgw_lc_obj_properties obj_properties;
- RGWBucketInfo b;
- string target_obj_name;
+ ret = readf->init();
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: fail to initialize in_crf, ret = " << ret << dendl;
+ return ret;
+ }
+ readf->get_range(ofs, end);
+ rgw_rest_obj& rest_obj = readf->get_rest_obj();
+ if (!sent_attrs) {
+ ret = writef->init();
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: fail to initialize out_crf, ret = " << ret << dendl;
+ return ret;
+ }
- rgw::sal::Object *o;
+ writef->send_ready(dpp, rest_obj);
+ ret = writef->send();
+ if (ret < 0) {
+ return ret;
+ }
+ sent_attrs = true;
+ }
- public:
- RGWLCStreamObjToCloudPlainCR(RGWLCCloudTierCtx& _tier_ctx)
- : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx),
- obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
- tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
- tier_ctx.target_storage_class){}
-
- int operate(const DoutPrefixProvider *dpp) {
-
- reenter(this) {
- b.bucket.name = tier_ctx.target_bucket_name;
- target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
- (*tier_ctx.obj)->get_name();
- if (!tier_ctx.o.is_current()) {
- target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
- }
+ ret = readf->read(ofs, end, writef->get_cb());
- retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
- if (retcode < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , retcode = " << retcode << dendl;
- return retcode;
- }
-
- dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
- if (!dest_obj) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
- return -1;
- }
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: fail to read from in_crf, ret = " << ret << dendl;
+ return ret;
+ }
- o = dest_obj.get();
+ ret = writef->complete_request();
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: fail to complete request, ret = " << ret << dendl;
+ return ret;
+ }
- // tier_ctx.obj.set_atomic(&tier_ctx.rctx); -- might need when updated to zipper SAL
+ return 0;
+}
- /* Prepare Read from source */
- in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, dpp,
- tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
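+/* Transfer the entire object to the cloud endpoint in a single PUT. */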
+static int cloud_tier_plain_transfer(RGWLCCloudTierCtx& tier_ctx) {
+ int ret;
+ std::unique_ptr<rgw::sal::Bucket> dest_bucket;
+ std::unique_ptr<rgw::sal::Object> dest_obj;
- out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this,
- (RGWHTTPManager*)(tier_ctx.http_manager), obj_properties, tier_ctx.conn, o));
+ rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
+ tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
+ tier_ctx.target_storage_class);
+ RGWBucketInfo b;
+ std::string target_obj_name;
- /* actual Read & Write */
- yield call(new RGWStreamWriteCR(cct, (RGWHTTPManager*)(tier_ctx.http_manager), in_crf, out_crf));
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
+ b.bucket.name = tier_ctx.target_bucket_name;
+ target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
+ tier_ctx.obj->get_name();
+ if (!tier_ctx.o.is_current()) {
+ target_obj_name += get_key_instance(tier_ctx.obj->get_key());
+ }
- return set_cr_done();
- }
+ ret = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , ret = " << ret << dendl;
+ return ret;
+ }
- return 0;
+ dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
+ if (!dest_obj) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
+ return -1;
}
-};
-class RGWLCStreamObjToCloudMultipartPartCR : public RGWCoroutine {
- RGWLCCloudTierCtx& tier_ctx;
+ tier_ctx.obj->set_atomic(&tier_ctx.rctx);
+
+ /* Prepare Read from source */
+  /* TODO: Define readf and writef as stack variables. For some reason,
+   * when they are used as stack variables (especially readf), the transition
+   * seems to take a long time and eventually errors out at times.
+ */
+ std::shared_ptr<RGWLCStreamRead> readf;
+ readf.reset(new RGWLCStreamRead(tier_ctx.cct, tier_ctx.dpp,
+ tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
- string upload_id;
+ std::shared_ptr<RGWLCCloudStreamPut> writef;
+ writef.reset(new RGWLCCloudStreamPut(tier_ctx.dpp, obj_properties, tier_ctx.conn,
+ dest_obj.get()));
- rgw_lc_multipart_part_info part_info;
+ /* actual Read & Write */
+ ret = cloud_tier_transfer_object(tier_ctx.dpp, readf.get(), writef.get());
- string *petag;
- std::shared_ptr<RGWStreamReadCRF> in_crf;
- std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+ return ret;
+}
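+/* Read one range of the object locally and upload it as a single part of a
+ * multipart upload on the cloud endpoint. */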
+static int cloud_tier_send_multipart_part(RGWLCCloudTierCtx& tier_ctx,
+ const std::string& upload_id,
+ const rgw_lc_multipart_part_info& part_info,
+ std::string *petag) {
+ int ret;
std::unique_ptr<rgw::sal::Bucket> dest_bucket;
std::unique_ptr<rgw::sal::Object> dest_obj;
- rgw_lc_obj_properties obj_properties;
+ rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
+ tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
+ tier_ctx.target_storage_class);
RGWBucketInfo b;
- string target_obj_name;
+ std::string target_obj_name;
off_t end;
- public:
- RGWLCStreamObjToCloudMultipartPartCR(RGWLCCloudTierCtx& _tier_ctx, const string& _upload_id,
- const rgw_lc_multipart_part_info& _part_info,
- string *_petag) : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx),
- upload_id(_upload_id), part_info(_part_info), petag(_petag),
- obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
- tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
- tier_ctx.target_storage_class){}
-
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- b.bucket.name = tier_ctx.target_bucket_name;
- target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
- (*tier_ctx.obj)->get_name();
- if (!tier_ctx.o.is_current()) {
- target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
- }
-
- retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
- if (retcode < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , retcode = " << retcode << dendl;
- return retcode;
- }
-
- dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
- if (!dest_obj) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
- return -1;
- }
+ b.bucket.name = tier_ctx.target_bucket_name;
+ target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
+ tier_ctx.obj->get_name();
+ if (!tier_ctx.o.is_current()) {
+ target_obj_name += get_key_instance(tier_ctx.obj->get_key());
+ }
- // tier_ctx.obj.set_atomic(&tier_ctx.rctx); -- might need when updated to zipper SAL
+ ret = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , ret = " << ret << dendl;
+ return ret;
+ }
- /* Prepare Read from source */
- in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp,
- tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
+ dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
+ if (!dest_obj) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
+ return -1;
+ }
- end = part_info.ofs + part_info.size - 1;
- std::static_pointer_cast<RGWLCStreamReadCRF>(in_crf)->set_multipart(part_info.size, part_info.ofs, end);
+ tier_ctx.obj->set_atomic(&tier_ctx.rctx);
- /* Prepare write */
- out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this,
- (RGWHTTPManager*)(tier_ctx.http_manager), obj_properties, tier_ctx.conn,
- dest_obj.get()));
+  /* TODO: Define readf and writef as stack variables. For some reason,
+   * when they are used as stack variables (especially readf), the transition
+   * seems to take a long time and eventually errors out at times. */
+ std::shared_ptr<RGWLCStreamRead> readf;
+ readf.reset(new RGWLCStreamRead(tier_ctx.cct, tier_ctx.dpp,
+ tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
- out_crf->set_multipart(upload_id, part_info.part_num, part_info.size);
+ std::shared_ptr<RGWLCCloudStreamPut> writef;
+ writef.reset(new RGWLCCloudStreamPut(tier_ctx.dpp, obj_properties, tier_ctx.conn,
+ dest_obj.get()));
- /* actual Read & Write */
- yield call(new RGWStreamWriteCR(cct, (RGWHTTPManager*)(tier_ctx.http_manager), in_crf, out_crf));
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
+ /* Prepare Read from source */
+ end = part_info.ofs + part_info.size - 1;
+ readf->set_multipart(part_info.size, part_info.ofs, end);
- if (!(static_cast<RGWLCStreamPutCRF *>(out_crf.get()))->get_etag(petag)) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to get etag from PUT request" << dendl;
- return set_cr_error(-EIO);
- }
+ /* Prepare write */
+ writef->set_multipart(upload_id, part_info.part_num, part_info.size);
- return set_cr_done();
- }
+ /* actual Read & Write */
+ ret = cloud_tier_transfer_object(tier_ctx.dpp, readf.get(), writef.get());
+ if (ret < 0) {
+ return ret;
+ }
- return 0;
+ if (!(writef->get_etag(petag))) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to get etag from PUT request" << dendl;
+ return -EIO;
}
-};
-class RGWLCAbortMultipartCR : public RGWCoroutine {
- CephContext *cct;
- RGWHTTPManager *http_manager;
- RGWRESTConn *dest_conn;
- rgw_obj dest_obj;
+ return 0;
+}
- string upload_id;
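+/* Abort a multipart upload on the cloud endpoint (DELETE with uploadId). */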
+static int cloud_tier_abort_multipart(const DoutPrefixProvider *dpp,
+ RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
+ const std::string& upload_id) {
+ int ret;
+ bufferlist out_bl;
+ bufferlist bl;
+ rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
- public:
- RGWLCAbortMultipartCR(CephContext *_cct, RGWHTTPManager *_http_manager,
- RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj,
- const string& _upload_id) : RGWCoroutine(_cct),
- cct(_cct), http_manager(_http_manager),
- dest_conn(_dest_conn), dest_obj(_dest_obj),
- upload_id(_upload_id) {}
-
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
-
- yield {
- rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
- bufferlist bl;
- call(new RGWDeleteRESTResourceCR(cct, dest_conn, http_manager,
- obj_to_aws_path(dest_obj), params));
- }
+ string resource = obj_to_aws_path(dest_obj);
+ ret = dest_conn.send_resource(dpp, "DELETE", resource, params, nullptr,
+ out_bl, &bl, nullptr, null_yield);
- if (retcode < 0) {
- ldout(cct, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl;
- return set_cr_error(retcode);
- }
- return set_cr_done();
- }
-
- return 0;
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (ret=" << ret << ")" << dendl;
+ return ret;
}
-};
-
-class RGWLCInitMultipartCR : public RGWCoroutine {
- CephContext *cct;
- RGWHTTPManager *http_manager;
- RGWRESTConn *dest_conn;
- rgw_obj dest_obj;
- uint64_t obj_size;
- map<string, string> attrs;
+ return 0;
+}
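+/* Initiate a multipart upload on the cloud endpoint and return the upload id
+ * parsed from the response. */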
+static int cloud_tier_init_multipart(const DoutPrefixProvider *dpp,
+ RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
+ uint64_t obj_size, std::map<std::string, std::string>& attrs,
+ std::string& upload_id) {
bufferlist out_bl;
-
- string *upload_id;
+ bufferlist bl;
struct InitMultipartResult {
- string bucket;
- string key;
- string upload_id;
+ std::string bucket;
+ std::string key;
+ std::string upload_id;
void decode_xml(XMLObj *obj) {
RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
}
} result;
- public:
- RGWLCInitMultipartCR(CephContext *_cct, RGWHTTPManager *_http_manager,
- RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj,
- uint64_t _obj_size, const map<string, string>& _attrs,
- string *_upload_id) : RGWCoroutine(_cct), cct(_cct),
- http_manager(_http_manager), dest_conn(_dest_conn),
- dest_obj(_dest_obj), obj_size(_obj_size),
- attrs(_attrs), upload_id(_upload_id) {}
-
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
-
- yield {
- rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
- bufferlist bl;
- call(new RGWPostRawRESTResourceCR <bufferlist> (cct, dest_conn, http_manager,
- obj_to_aws_path(dest_obj), params, &attrs, bl, &out_bl));
- }
+ int ret;
+ rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
- if (retcode < 0) {
- ldout(cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
- return set_cr_error(retcode);
- }
- {
- /*
- * If one of the following fails we cannot abort upload, as we cannot
- * extract the upload id. If one of these fail it's very likely that that's
- * the least of our problem.
- */
- RGWXMLDecoder::XMLParser parser;
- if (!parser.init()) {
- ldout(cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
- return set_cr_error(-EIO);
- }
+ string resource = obj_to_aws_path(dest_obj);
- if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
- string str(out_bl.c_str(), out_bl.length());
- ldout(cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
- return set_cr_error(-EIO);
- }
+ ret = dest_conn.send_resource(dpp, "POST", resource, params, &attrs,
+ out_bl, &bl, nullptr, null_yield);
- try {
- RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true);
- } catch (RGWXMLDecoder::err& err) {
- string str(out_bl.c_str(), out_bl.length());
- ldout(cct, 5) << "ERROR: unexpected xml: " << str << dendl;
- return set_cr_error(-EIO);
- }
- }
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
+ return ret;
+ }
+  /*
+   * If one of the following fails we cannot abort the upload, as we cannot
+   * extract the upload id. If one of these fails, it's very likely that that's
+   * the least of our problems.
+   */
+ RGWXMLDecoder::XMLParser parser;
+ if (!parser.init()) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
+ return -EIO;
+ }
- ldout(cct, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl;
+ if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldpp_dout(dpp, 5) << "ERROR: failed to parse xml initmultipart: " << str << dendl;
+ return -EIO;
+ }
- *upload_id = result.upload_id;
+ try {
+ RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true);
+ } catch (RGWXMLDecoder::err& err) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
+ return -EIO;
+ }
- return set_cr_done();
- }
+ ldpp_dout(dpp, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl;
- return 0;
- }
-};
+ upload_id = result.upload_id;
-class RGWLCCompleteMultipartCR : public RGWCoroutine {
- CephContext *cct;
- RGWHTTPManager *http_manager;
- RGWRESTConn *dest_conn;
- rgw_obj dest_obj;
+ return 0;
+}
- bufferlist out_bl;
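+/* Complete a multipart upload on the cloud endpoint by POSTing the list of
+ * parts and their etags. */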
+static int cloud_tier_complete_multipart(const DoutPrefixProvider *dpp,
+ RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
+ std::string& upload_id,
+ const std::map<int, rgw_lc_multipart_part_info>& parts) {
+ rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
+
+ stringstream ss;
+ XMLFormatter formatter;
+ int ret;
- string upload_id;
+ bufferlist bl, out_bl;
+ string resource = obj_to_aws_path(dest_obj);
struct CompleteMultipartReq {
- map<int, rgw_lc_multipart_part_info> parts;
+ std::map<int, rgw_lc_multipart_part_info> parts;
- explicit CompleteMultipartReq(const map<int, rgw_lc_multipart_part_info>& _parts) : parts(_parts) {}
+ explicit CompleteMultipartReq(const std::map<int, rgw_lc_multipart_part_info>& _parts) : parts(_parts) {}
void dump_xml(Formatter *f) const {
for (const auto& p : parts) {
f->close_section();
};
}
- } req_enc;
+ } req_enc(parts);
struct CompleteMultipartResult {
- string location;
- string bucket;
- string key;
- string etag;
+ std::string location;
+ std::string bucket;
+ std::string key;
+ std::string etag;
void decode_xml(XMLObj *obj) {
RGWXMLDecoder::decode_xml("Location", bucket, obj);
}
} result;
- public:
- RGWLCCompleteMultipartCR(CephContext *_cct, RGWHTTPManager *_http_manager,
- RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj,
- string _upload_id, const map<int, rgw_lc_multipart_part_info>& _parts) :
- RGWCoroutine(_cct), cct(_cct), http_manager(_http_manager),
- dest_conn(_dest_conn), dest_obj(_dest_obj), upload_id(_upload_id),
- req_enc(_parts) {}
-
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
-
- yield {
- rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
- stringstream ss;
- XMLFormatter formatter;
+ encode_xml("CompleteMultipartUpload", req_enc, &formatter);
- encode_xml("CompleteMultipartUpload", req_enc, &formatter);
+ formatter.flush(ss);
+ bl.append(ss.str());
- formatter.flush(ss);
+ ret = dest_conn.send_resource(dpp, "POST", resource, params, nullptr,
+ out_bl, &bl, nullptr, null_yield);
- bufferlist bl;
- bl.append(ss.str());
- call(new RGWPostRawRESTResourceCR <bufferlist> (cct, dest_conn, http_manager,
- obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl));
- }
-
- if (retcode < 0) {
- ldout(cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
- return set_cr_error(retcode);
- }
- {
- /*
- * If one of the following fails we cannot abort upload, as we cannot
- * extract the upload id. If one of these fail it's very likely that that's
- * the least of our problem.
- */
- RGWXMLDecoder::XMLParser parser;
- if (!parser.init()) {
- ldout(cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
- return set_cr_error(-EIO);
- }
-
- if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
- string str(out_bl.c_str(), out_bl.length());
- ldout(cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
- return set_cr_error(-EIO);
- }
-
- try {
- RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
- } catch (RGWXMLDecoder::err& err) {
- string str(out_bl.c_str(), out_bl.length());
- ldout(cct, 5) << "ERROR: unexpected xml: " << str << dendl;
- return set_cr_error(-EIO);
- }
- }
-
- ldout(cct, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;
-
- return set_cr_done();
- }
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to complete multipart upload for dest object=" << dest_obj << dendl;
+ return ret;
+ }
+  /*
+   * If one of the following fails, the upload has most likely already been
+   * completed on the remote end; failing to parse the response is very likely
+   * the least of our problems.
+   */
+ RGWXMLDecoder::XMLParser parser;
+ if (!parser.init()) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
+ return -EIO;
+ }
- return 0;
+ if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldpp_dout(dpp, 5) << "ERROR: failed to parse xml Completemultipart: " << str << dendl;
+ return -EIO;
}
-};
+ try {
+ RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
+ } catch (RGWXMLDecoder::err& err) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldpp_dout(dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
+ return -EIO;
+ }
-class RGWLCStreamAbortMultipartUploadCR : public RGWCoroutine {
- RGWLCCloudTierCtx& tier_ctx;
- const rgw_obj dest_obj;
- const rgw_raw_obj status_obj;
+ ldpp_dout(dpp, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;
- string upload_id;
+ return ret;
+}
- public:
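+/* Best-effort cleanup: abort the multipart upload on the cloud endpoint and
+ * remove the local upload status object. */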
+static int cloud_tier_abort_multipart_upload(RGWLCCloudTierCtx& tier_ctx,
+ const rgw_obj& dest_obj, const rgw_raw_obj& status_obj,
+ const std::string& upload_id) {
+ int ret;
- RGWLCStreamAbortMultipartUploadCR(RGWLCCloudTierCtx& _tier_ctx,
- const rgw_obj& _dest_obj, const rgw_raw_obj& _status_obj,
- const string& _upload_id) : RGWCoroutine(_tier_ctx.cct),
- tier_ctx(_tier_ctx), dest_obj(_dest_obj), status_obj(_status_obj),
- upload_id(_upload_id) {}
-
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- yield call(new RGWLCAbortMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, upload_id));
- if (retcode < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
- /* ignore error, best effort */
- }
- yield call(new RGWRadosRemoveCR(dynamic_cast<rgw::sal::RadosStore*>(tier_ctx.store), status_obj));
- if (retcode < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl;
- /* ignore error, best effort */
- }
- return set_cr_done();
- }
+ ret = cloud_tier_abort_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, upload_id);
- return 0;
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " ret=" << ret << dendl;
+ /* ignore error, best effort */
}
-};
+ /* remove status obj */
+ ret = delete_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj);
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " ret=" << ret << dendl;
+ // ignore error, best effort
+ }
+ return 0;
+}
-class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine {
- RGWLCCloudTierCtx& tier_ctx;
- RGWRESTConn *source_conn;
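+/* Transfer a large object to the cloud endpoint via multipart upload, reusing
+ * any matching upload state recorded in the status object. */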
+static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) {
rgw_obj src_obj;
rgw_obj dest_obj;
uint64_t obj_size;
- string src_etag;
+ std::string src_etag;
rgw_rest_obj rest_obj;
rgw_lc_multipart_upload_info status;
- std::shared_ptr<RGWStreamReadCRF> in_crf;
- map<string, string> new_attrs;
+ std::map<std::string, std::string> new_attrs;
rgw_raw_obj status_obj;
- rgw_lc_obj_properties obj_properties;
RGWBucketInfo b;
- string target_obj_name;
+ std::string target_obj_name;
rgw_bucket target_bucket;
- rgw::sal::RadosStore *rados;
-
- public:
- RGWLCStreamObjToCloudMultipartCR(RGWLCCloudTierCtx& _tier_ctx)
- : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx),
- obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
- tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
- tier_ctx.target_storage_class){}
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
+ int ret;
- obj_size = tier_ctx.o.meta.size;
+ rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
+ tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
+ tier_ctx.target_storage_class);
- target_bucket.name = tier_ctx.target_bucket_name;
+ uint32_t part_size{0};
+ uint32_t num_parts{0};
- target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
- (*tier_ctx.obj)->get_name();
- if (!tier_ctx.o.is_current()) {
- target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
- }
- dest_obj.init(target_bucket, target_obj_name);
+ int cur_part{0};
+ uint64_t cur_ofs{0};
+ std::map<int, rgw_lc_multipart_part_info> parts;
- status_obj = rgw_raw_obj(tier_ctx.store->get_zone()->get_params().log_pool,
- "lc_multipart_" + (*tier_ctx.obj)->get_oid());
+ obj_size = tier_ctx.o.meta.size;
- rados = dynamic_cast<rgw::sal::RadosStore*>(tier_ctx.store);
+ target_bucket.name = tier_ctx.target_bucket_name;
- if (!rados) {
- ldout(tier_ctx.cct, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl;
- return -1;
- }
+ target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
+ tier_ctx.obj->get_name();
+ if (!tier_ctx.o.is_current()) {
+ target_obj_name += get_key_instance(tier_ctx.obj->get_key());
+ }
+ dest_obj.init(target_bucket, target_obj_name);
- yield call(new RGWSimpleRadosReadCR<rgw_lc_multipart_upload_info>(dpp, rados->svc()->rados->get_async_processor(), rados->svc()->sysobj,
- status_obj, &status, false));
+ status_obj = rgw_raw_obj(tier_ctx.store->get_zone()->get_params().log_pool,
+ "lc_multipart_" + tier_ctx.obj->get_oid());
- if (retcode < 0 && retcode != -ENOENT) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl;
- return retcode;
- }
+ ret = read_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj, &status);
- if (retcode >= 0) {
- /* check here that mtime and size did not change */
- if (status.mtime != obj_properties.mtime || status.obj_size != obj_size ||
- status.etag != obj_properties.etag) {
- yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id));
- retcode = -ENOENT;
- }
- }
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " ret=" << ret << dendl;
+ return ret;
+ }
- if (retcode == -ENOENT) {
- in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
+ if (ret >= 0) {
+ // check here that mtime and size did not change
+ if (status.mtime != obj_properties.mtime || status.obj_size != obj_size ||
+ status.etag != obj_properties.etag) {
+ cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id);
+ ret = -ENOENT;
+ }
+ }
- in_crf->init();
+ if (ret == -ENOENT) {
+ RGWLCStreamRead readf(tier_ctx.cct, tier_ctx.dpp, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime);
- rest_obj = in_crf->get_rest_obj();
+ readf.init();
- RGWLCStreamPutCRF::init_send_attrs(tier_ctx.cct, rest_obj, obj_properties, &new_attrs);
+ rest_obj = readf.get_rest_obj();
- yield call(new RGWLCInitMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, obj_size, std::move(new_attrs), &status.upload_id));
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
+ RGWLCCloudStreamPut::init_send_attrs(tier_ctx.dpp, rest_obj, obj_properties, new_attrs);
- status.obj_size = obj_size;
- status.mtime = obj_properties.mtime;
- status.etag = obj_properties.etag;
-#define MULTIPART_MAX_PARTS 10000
- uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
- uint64_t min_conf_size = tier_ctx.multipart_min_part_size;
+ ret = cloud_tier_init_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, obj_size, new_attrs, status.upload_id);
+ if (ret < 0) {
+ return ret;
+ }
- if (min_conf_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
- min_conf_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
- }
+ status.obj_size = obj_size;
+ status.mtime = obj_properties.mtime;
+ status.etag = obj_properties.etag;
- status.part_size = std::max(min_conf_size, min_part_size);
- status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
- status.cur_part = 1;
- status.cur_ofs = 0;
- }
+ ret = put_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj, &status);
- for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) {
- ldout(tier_ctx.cct, 20) << "status.cur_part = "<<status.cur_part <<", info.ofs = "<< status.cur_ofs <<", info.size = "<< status.part_size<< ", obj size = " << status.obj_size<< ", status.num_parts:" << status.num_parts << dendl;
- yield {
- rgw_lc_multipart_part_info& cur_part_info = status.parts[status.cur_part];
- cur_part_info.part_num = status.cur_part;
- cur_part_info.ofs = status.cur_ofs;
- cur_part_info.size = std::min((uint64_t)status.part_size, status.obj_size - status.cur_ofs);
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to store multipart upload state, ret=" << ret << dendl;
+ // continue with upload anyway
+ }
- status.cur_ofs += cur_part_info.size;
+  }
+
+#define MULTIPART_MAX_PARTS 10000
+ uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
+ uint64_t min_conf_size = tier_ctx.multipart_min_part_size;
- call(new RGWLCStreamObjToCloudMultipartPartCR(tier_ctx,
- status.upload_id,
- cur_part_info,
- &cur_part_info.etag));
- }
+ if (min_conf_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
+ min_conf_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
+ }
- if (retcode < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to sync obj=" << tier_ctx.obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
- yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id));
- return set_cr_error(retcode);
- }
+ part_size = std::max(min_conf_size, min_part_size);
+ num_parts = (obj_size + part_size - 1) / part_size;
+ cur_part = 1;
+ cur_ofs = 0;
- yield call(new RGWSimpleRadosWriteCR<rgw_lc_multipart_upload_info>(dpp, rados->svc()->rados->get_async_processor(), rados->svc()->sysobj, status_obj, status));
+ for (; (uint32_t)cur_part <= num_parts; ++cur_part) {
+ ldpp_dout(tier_ctx.dpp, 20) << "cur_part = "<< cur_part << ", info.ofs = " << cur_ofs << ", info.size = " << part_size << ", obj size = " << obj_size<< ", num_parts:" << num_parts << dendl;
+ rgw_lc_multipart_part_info& cur_part_info = parts[cur_part];
+ cur_part_info.part_num = cur_part;
+ cur_part_info.ofs = cur_ofs;
+ cur_part_info.size = std::min((uint64_t)part_size, obj_size - cur_ofs);
- if (retcode < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
- /* continue with upload anyway */
- }
- ldout(tier_ctx.cct, 0) << "sync of object=" << tier_ctx.obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << status.parts[status.cur_part].etag << dendl;
- }
+ cur_ofs += cur_part_info.size;
- yield call(new RGWLCCompleteMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status.upload_id, status.parts));
- if (retcode < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
- yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id));
- return set_cr_error(retcode);
- }
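+ // Stream this byte range of the source object as the next part of the upload.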
+ ret = cloud_tier_send_multipart_part(tier_ctx,
+ status.upload_id,
+ cur_part_info,
+ &cur_part_info.etag);
- /* remove status obj */
- yield call(new RGWRadosRemoveCR(rados, status_obj));
- if (retcode < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
- /* ignore error, best effort */
- }
- return set_cr_done();
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to send multipart part of obj=" << tier_ctx.obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << cur_part << " (error: " << cpp_strerror(-ret) << ")" << dendl;
+ cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id);
+ return ret;
}
- return 0;
+
}
-};
-int RGWLCCloudCheckCR::operate(const DoutPrefixProvider *dpp) {
- /* Check if object has already been transitioned */
- reenter(this) {
- b.bucket.name = tier_ctx.target_bucket_name;
- target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
- (*tier_ctx.obj)->get_name();
- if (!tier_ctx.o.is_current()) {
- target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
- }
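+ // All parts were sent successfully; complete the multipart upload on the cloud endpoint.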
+ ret = cloud_tier_complete_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, status.upload_id, parts);
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-ret) << ")" << dendl;
+ cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id);
+ return ret;
+ }
- retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
- if (retcode < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << retcode << dendl;
- return ret;
- }
-
- dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
- if (!dest_obj) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
- return -1;
- }
+ /* remove status obj */
+ ret = delete_upload_status(tier_ctx.dpp, tier_ctx.store, &status_obj);
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << cur_part << " (" << cpp_strerror(-ret) << ")" << dendl;
+ // ignore error, best effort
+ }
+ return 0;
+}
- get_crf.reset(new RGWLCStreamGetCRF(tier_ctx.cct, get_env(), this, tier_ctx.http_manager, obj_properties,
- tier_ctx.conn, dest_obj.get()));
-
- /* Having yield here doesn't seem to wait for init2() to fetch the headers
- * before calling is_already_tiered() below
- */
- yield {
- retcode = get_crf->init(dpp);
- if (retcode < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to fetch HEAD from cloud for obj=" << tier_ctx.obj << " , retcode = " << retcode << dendl;
- return set_cr_error(ret);
- }
- }
- if (retcode < 0) {
- ldout(tier_ctx.cct, 20) << __func__ << ": get_crf()->init retcode=" << retcode << dendl;
- return set_cr_error(retcode);
- }
- if (get_crf.get()->is_already_tiered()) {
- *already_tiered = true;
- ldout(tier_ctx.cct, 20) << "is_already_tiered true" << dendl;
- return set_cr_done();
- }
+/* Check if object has already been transitioned */
+static int cloud_tier_check_object(RGWLCCloudTierCtx& tier_ctx, bool& already_tiered) {
+ int ret;
+ std::map<std::string, std::string> headers;
- ldout(tier_ctx.cct, 20) << "is_already_tiered false..going with out_crf writing" << dendl;
+ /* Fetch Head object */
+ ret = cloud_tier_get_object(tier_ctx, true, headers);
- return set_cr_done();
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to fetch HEAD from cloud for obj=" << tier_ctx.obj << " , ret = " << ret << dendl;
+ return ret;
+ }
+
+ already_tiered = is_already_tiered(tier_ctx.dpp, headers, tier_ctx.o.meta.mtime);
+
+ if (already_tiered) {
+ ldpp_dout(tier_ctx.dpp, 20) << "is_already_tiered true" << dendl;
+ } else {
+ ldpp_dout(tier_ctx.dpp, 20) << "is_already_tiered false..going with out_crf writing" << dendl;
}
- return 0;
-}
-map <pair<string, string>, utime_t> target_buckets;
+ return ret;
+}
-int RGWLCCloudTierCR::operate(const DoutPrefixProvider *dpp) {
+static int cloud_tier_create_bucket(RGWLCCloudTierCtx& tier_ctx) {
+ bufferlist out_bl;
+ int ret = 0;
pair<string, string> key(tier_ctx.storage_class, tier_ctx.target_bucket_name);
- bool bucket_created = false;
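+ // Minimal XML decoder for the error document returned by the endpoint; only the Error/Code element is needed.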
+ struct CreateBucketResult {
+ std::string code;
- reenter(this) {
+ void decode_xml(XMLObj *obj) {
+ RGWXMLDecoder::decode_xml("Code", code, obj);
+ }
+ } result;
- if (target_buckets.find(key) != target_buckets.end()) {
- utime_t t = target_buckets[key];
+ ldpp_dout(tier_ctx.dpp, 30) << "Cloud_tier_ctx: creating bucket:" << tier_ctx.target_bucket_name << dendl;
+ bufferlist bl;
+ string resource = tier_ctx.target_bucket_name;
- utime_t now = ceph_clock_now();
+ ret = tier_ctx.conn.send_resource(tier_ctx.dpp, "PUT", resource, nullptr, nullptr,
+ out_bl, &bl, nullptr, null_yield);
- if (now - t < (2 * cct->_conf->rgw_lc_debug_interval)) { /* not expired */
- bucket_created = true;
- }
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to create target bucket: " << tier_ctx.target_bucket_name << ", ret:" << ret << dendl;
+ return ret;
+ }
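+ // Parse any returned error document; a BucketAlreadyOwnedByYou error is treated as success.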
+ if (out_bl.length() > 0) {
+ RGWXMLDecoder::XMLParser parser;
+ if (!parser.init()) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl;
+ return -EIO;
}
- if (!bucket_created){
- yield {
- ldout(tier_ctx.cct,10) << "Cloud_tier_ctx: creating bucket:" << tier_ctx.target_bucket_name << dendl;
- bufferlist bl;
- call(new RGWPutRawRESTResourceCR <bufferlist> (tier_ctx.cct, tier_ctx.conn.get(),
- tier_ctx.http_manager,
- tier_ctx.target_bucket_name, nullptr, bl, &out_bl));
- }
- if (retcode < 0 ) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to create target bucket: " << tier_ctx.target_bucket_name << ", retcode:" << retcode << dendl;
- return set_cr_error(retcode);
- }
- if (out_bl.length() > 0) {
- RGWXMLDecoder::XMLParser parser;
- if (!parser.init()) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize xml parser for parsing create_bucket response from server" << dendl;
- return set_cr_error(-EIO);
- }
+ if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldpp_dout(tier_ctx.dpp, 5) << "ERROR: failed to parse xml createbucket: " << str << dendl;
+ return -EIO;
+ }
- if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
- string str(out_bl.c_str(), out_bl.length());
- ldout(tier_ctx.cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
- return set_cr_error(-EIO);
- }
+ try {
+ RGWXMLDecoder::decode_xml("Error", result, &parser, true);
+ } catch (RGWXMLDecoder::err& err) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldpp_dout(tier_ctx.dpp, 5) << "ERROR: unexpected xml: " << str << dendl;
+ return -EIO;
+ }
- try {
- RGWXMLDecoder::decode_xml("Error", result, &parser, true);
- } catch (RGWXMLDecoder::err& err) {
- string str(out_bl.c_str(), out_bl.length());
- ldout(tier_ctx.cct, 5) << "ERROR: unexpected xml: " << str << dendl;
- return set_cr_error(-EIO);
- }
+ if (result.code != "BucketAlreadyOwnedByYou") {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: Creating target bucket failed with error: " << result.code << dendl;
+ return -EIO;
+ }
+ }
- if (result.code != "BucketAlreadyOwnedByYou") {
- ldout(tier_ctx.cct, 0) << "ERROR: Creating target bucket failed with error: " << result.code << dendl;
- return set_cr_error(-EIO);
- }
- }
+ return 0;
+}
+
+int rgw_cloud_tier_transfer_object(RGWLCCloudTierCtx& tier_ctx) {
+ int ret = 0;
+
+ /* If run first time attempt to create the target bucket */
+ if (!tier_ctx.target_bucket_created) {
+ ret = cloud_tier_create_bucket(tier_ctx);
- target_buckets[key] = ceph_clock_now();
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to create target bucket on the cloud endpoint ret=" << ret << dendl;
+ return ret;
}
+ tier_ctx.target_bucket_created = true;
+ }
- yield {
- uint64_t size = tier_ctx.o.meta.size;
- uint64_t multipart_sync_threshold = tier_ctx.multipart_sync_threshold;
+ /* Since multiple zones may try to transition the same object to the cloud,
+ * verify whether the object has already been transitioned. Since this is
+ * only a best-effort check, do not bail out on errors.
+ */
+ bool already_tiered = false;
+ ret = cloud_tier_check_object(tier_ctx, already_tiered);
- if (multipart_sync_threshold < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
- multipart_sync_threshold = MULTIPART_MIN_POSSIBLE_PART_SIZE;
- }
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to check object on the cloud endpoint ret=" << ret << dendl;
+ }
- if (size < multipart_sync_threshold) {
- call (new RGWLCStreamObjToCloudPlainCR(tier_ctx));
- } else {
- tier_ctx.is_multipart_upload = true;
- call(new RGWLCStreamObjToCloudMultipartCR(tier_ctx));
+ if (already_tiered) {
+ ldpp_dout(tier_ctx.dpp, 20) << "Object (" << tier_ctx.o.key << ") is already tiered" << dendl;
+ return 0;
+ }
- }
- }
+ uint64_t size = tier_ctx.o.meta.size;
+ uint64_t multipart_sync_threshold = tier_ctx.multipart_sync_threshold;
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
+ if (multipart_sync_threshold < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
+ multipart_sync_threshold = MULTIPART_MIN_POSSIBLE_PART_SIZE;
+ }
- return set_cr_done();
- } //reenter
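+ // Use a single PUT for small objects; objects at or above the threshold go through multipart upload.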
+ if (size < multipart_sync_threshold) {
+ ret = cloud_tier_plain_transfer(tier_ctx);
+ } else {
+ tier_ctx.is_multipart_upload = true;
+ ret = cloud_tier_multipart_transfer(tier_ctx);
+ }
- return 0;
-}
+ if (ret < 0) {
+ ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to transition object ret=" << ret << dendl;
+ }
+ return ret;
+}