static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001;
static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002;
static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
+ static constexpr uint32_t FLAG_HTTP_MGR = 0x0008;
private:
+ class C_WorkQTimerCtx: public Context {
+ WorkQ* wq;
+ public:
+ C_WorkQTimerCtx(WorkQ* _wq): wq(_wq) {}
+ void finish(int r) override {
+ wq->stop_http_manager();
+ }
+ };
+
const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {};
RGWLC::LCWorker* wk;
uint32_t qmax;
uint32_t flags;
vector<WorkItem> items;
work_f f;
+ std::unique_ptr<RGWCoroutinesManager> crs;
+ std::unique_ptr<RGWHTTPManager> http_manager;
+ bool is_http_mgr_started{false};
+ ceph::mutex timer_mtx;
+ SafeTimer timer;
+ int timer_wait_sec = 200; //seconds
+ C_WorkQTimerCtx* timer_ctx = nullptr;
public:
WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
- : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf)
+ : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf),
+ timer_mtx(ceph::make_mutex("WorkQTimerMutex")),
+ timer((CephContext*)(wk->cct), timer_mtx)
{
create(thr_name().c_str());
+ timer.init();
}
+ ~WorkQ() {
+ timer.shutdown();
+ }
+
std::string thr_name() {
return std::string{"wp_thrd: "}
+ std::to_string(wk->ix) + ", " + std::to_string(ix);
f = _f;
}
+ RGWCoroutinesManager* get_crs() { return crs.get(); }
+ RGWHTTPManager* get_http_manager() { return http_manager.get(); }
+
+ int start_http_manager(rgw::sal::Store* store) {
+ int ret = 0;
+
+ if (is_http_mgr_started)
+ return 0;
+
+ /* http_mngr */
+ if(!crs) {
+ crs.reset(new RGWCoroutinesManager(store->ctx(), store->get_cr_registry()));
+ }
+ if (!http_manager) {
+ http_manager.reset(new RGWHTTPManager(store->ctx(), crs.get()->get_completion_mgr()));
+ }
+
+ ret = http_manager->start();
+ if (ret < 0) {
+ dout(5) << "RGWLC:: http_manager->start() failed ret = "
+ << ret << dendl;
+ return ret;
+ }
+
+ is_http_mgr_started = true;
+ flags |= FLAG_HTTP_MGR;
+
+ return ret;
+ }
+
+ int stop_http_manager() {
+ if (!is_http_mgr_started) {
+ return 0;
+ }
+
+ http_manager.reset();
+ crs.reset();
+
+ is_http_mgr_started = false;
+ flags &= ~FLAG_HTTP_MGR;
+ timer.cancel_all_events();
+ timer_ctx = nullptr;
+ return 0;
+ }
+
void enqueue(WorkItem&& item) {
unique_lock uniq(mtx);
while ((!wk->get_lc()->going_down()) &&
flags |= FLAG_EWAIT_SYNC;
cv.wait_for(uniq, 200ms);
}
+ if (timer_ctx && (flags & FLAG_HTTP_MGR)) {
+ timer.cancel_all_events();
+ timer_ctx = nullptr;
+ }
items.push_back(item);
if (flags & FLAG_DWAIT_SYNC) {
flags &= ~FLAG_DWAIT_SYNC;
if (flags & FLAG_EDRAIN_SYNC) {
flags &= ~FLAG_EDRAIN_SYNC;
}
+ if ((flags & FLAG_HTTP_MGR) && !timer_ctx) {
+ timer_ctx = new C_WorkQTimerCtx(this);
+ timer.add_event_after(timer_wait_sec, timer_ctx);
+ }
flags |= FLAG_DWAIT_SYNC;
cv.wait_for(uniq, 200ms);
}
*/
if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) {
ret = remove_expired_obj(oc.dpp, oc, false);
- ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << "s versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
+ ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
} else {
ret = remove_expired_obj(oc.dpp, oc, true);
- ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "s versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
+ ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
}
return ret;
}
real_time read_mtime;
- std::unique_ptr<rgw::sal::RGWObject::ReadOp> read_op(oc.obj->get_read_op(&oc.rctx));
+ std::unique_ptr<rgw::sal::Object::ReadOp> read_op(oc.obj->get_read_op(&oc.rctx));
read_op->params.lastmod = &read_mtime;
(*tier_ctx.obj)->set_atomic(&tier_ctx.rctx);
RGWObjState *s = tier_ctx.rctx.get_state((*tier_ctx.obj)->get_obj());
- std::unique_ptr<rgw::sal::RGWObject::WriteOp> obj_op(oc.obj->get_write_op(&oc.rctx));
+ std::unique_ptr<rgw::sal::Object::WriteOp> obj_op(oc.obj->get_write_op(&oc.rctx));
obj_op->params.modify_tail = true;
obj_op->params.flags = PUT_OBJ_CREATE;
tier_config.tier_placement = oc.tier;
tier_config.is_multipart_upload = tier_ctx.is_multipart_upload;
- pmanifest->set_tier_type("cloud");
+ pmanifest->set_tier_type("cloud-s3");
pmanifest->set_tier_config(tier_config);
/* check if its necessary */
* obj_size remains the same even when object is moved to other
* storage class. So maybe better to keep it the same way.
*/
- //pmanifest->set_obj_size(0);
obj_op->params.manifest = pmanifest;
obj_op->params.attrs = &attrs;
r = obj_op->prepare(null_yield);
+ if (r < 0) {
+ return r;
+ }
+
r = obj_op->write_meta(oc.dpp, tier_ctx.o.meta.size, 0, null_yield);
if (r < 0) {
/* init */
string id = "cloudid";
- string endpoint=oc.tier.t.s3.endpoint;
+ string endpoint = oc.tier.t.s3.endpoint;
RGWAccessKey key = oc.tier.t.s3.key;
+ string region = oc.tier.t.s3.region;
HostStyle host_style = oc.tier.t.s3.host_style;
string bucket_name = oc.tier.t.s3.target_path;
const RGWZoneGroup& zonegroup = oc.store->get_zone()->get_zonegroup();
boost::algorithm::to_lower(bucket_name);
}
- conn.reset(new S3RESTConn(oc.cct, oc.store, id, { endpoint }, key, host_style));
+ conn.reset(new S3RESTConn(oc.cct, oc.store, id, { endpoint }, key, region, host_style));
- /* http_mngr */
- RGWCoroutinesManager crs(oc.store->ctx(), oc.store->get_cr_registry());
- RGWHTTPManager http_manager(oc.store->ctx(), crs.get_completion_mgr());
-
- int ret = http_manager.start();
+ int ret = oc.wq->start_http_manager(oc.store);
if (ret < 0) {
- ldpp_dout(oc.dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
+ ldpp_dout(oc.dpp, 0) << "failed in start_http_manager() ret=" << ret << dendl;
return ret;
}
+ RGWCoroutinesManager* crs = oc.wq->get_crs();
+ RGWHTTPManager* http_manager = oc.wq->get_http_manager();
+
+ if (!crs || !http_manager) {
+ /* maybe race..return and retry */
+ ldpp_dout(oc.dpp, 0) << " http_manager and crs not initialized" << dendl;
+ return -1;
+ }
+
RGWLCCloudTierCtx tier_ctx(oc.cct, oc.dpp, oc.o, oc.store, oc.bucket->get_info(),
&oc.obj, oc.rctx, conn, bucket_name,
- oc.tier.t.s3.target_storage_class, &http_manager);
+ oc.tier.t.s3.target_storage_class, http_manager);
tier_ctx.acl_mappings = oc.tier.t.s3.acl_mappings;
tier_ctx.multipart_min_part_size = oc.tier.t.s3.multipart_min_part_size;
tier_ctx.multipart_sync_threshold = oc.tier.t.s3.multipart_sync_threshold;
* verify if the object is already transitioned. And since its just a best
* effort, do not bail out in case of any errors.
*/
- ret = crs.run(new RGWLCCloudCheckCR(tier_ctx, &al_tiered));
+ ret = crs->run(oc.dpp, new RGWLCCloudCheckCR(tier_ctx, &al_tiered));
if (ret < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: failed in RGWCloudCheckCR() ret=" << ret << dendl;
if (al_tiered) {
ldout(tier_ctx.cct, 20) << "Object (" << oc.o.key << ") is already tiered" << dendl;
- http_manager.stop();
return 0;
} else {
- ret = crs.run(new RGWLCCloudTierCR(tier_ctx));
+ ret = crs->run(oc.dpp, new RGWLCCloudTierCR(tier_ctx));
}
if (ret < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: failed in RGWCloudTierCR() ret=" << ret << dendl;
+ return ret;
}
if (delete_object) {
return ret;
}
}
- http_manager.stop();
return 0;
}
return 0;
}
+ /* Allow transition for only RadosStore */
+ rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(oc.store);
+
+ if (!rados) {
+ ldpp_dout(oc.dpp, 10) << "Object(key:" << oc.o.key << ") is not on RadosStore. Skipping transition to cloud-s3 tier: " << target_placement.storage_class << dendl;
+ return -1;
+ }
+
r = transition_obj_to_cloud(oc);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj(key:" << oc.o.key << ") to cloud (r=" << r << ")"
sleep(5);
continue;
}
+
if (ret < 0)
return 0;
ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
using namespace std;
-static string get_key_instance(const rgw_obj_key& key)
+static inline string get_key_instance(const rgw_obj_key& key)
{
if (!key.instance.empty() &&
!key.have_null_instance()) {
return "";
}
-static string get_key_oid(const rgw_obj_key& key)
+static inline string get_key_oid(const rgw_obj_key& key)
{
string oid = key.name;
if (!key.instance.empty() &&
return oid;
}
-static string obj_to_aws_path(const rgw_obj& obj)
+static inline string obj_to_aws_path(const rgw_obj& obj)
{
string path = obj.bucket.name + "/" + get_key_oid(obj.key);
return path;
static void init_headers(map<string, bufferlist>& attrs,
map<string, string>& headers)
{
- for (auto kv : attrs) {
+ for (auto& kv : attrs) {
const char * name = kv.first.c_str();
const auto aiter = rgw_to_http_attrs.find(name);
}
}
-class RGWLCStreamGetCRF : public RGWStreamReadHTTPResourceCRF
-{
- RGWRESTConn::get_obj_params req_params;
-
- CephContext *cct;
- RGWHTTPManager *http_manager;
- rgw_lc_obj_properties obj_properties;
- std::shared_ptr<RGWRESTConn> conn;
- rgw::sal::RGWObject* dest_obj;
- string etag;
- RGWRESTStreamRWRequest *in_req;
- map<string, string> headers;
-
- public:
- RGWLCStreamGetCRF(CephContext *_cct,
- RGWCoroutinesEnv *_env,
- RGWCoroutine *_caller,
- RGWHTTPManager *_http_manager,
- const rgw_lc_obj_properties& _obj_properties,
- std::shared_ptr<RGWRESTConn> _conn,
- rgw::sal::RGWObject* _dest_obj) :
- RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _http_manager, _dest_obj->get_key()),
- cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties),
- conn(_conn), dest_obj(_dest_obj) {}
-
- int init() {
+int RGWLCStreamGetCRF::init(const DoutPrefixProvider *dpp) {
/* init input connection */
req_params.get_op = false; /* Need only headers */
req_params.prepend_metadata = true;
string etag;
real_time set_mtime;
- int ret = conn->get_obj(dest_obj, req_params, true /* send */, &in_req);
+ int ret = conn->get_obj(dpp, dest_obj, req_params, true /* send */, &in_req);
if (ret < 0) {
ldout(cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
return ret;
}
/* fetch only headers */
- ret = conn->complete_request(in_req, nullptr, nullptr,
- nullptr, nullptr, &headers, null_yield);
+ ret = conn->complete_request(in_req, nullptr, nullptr, nullptr, nullptr, &headers, null_yield);
if (ret < 0 && ret != -ENOENT) {
ldout(cct, 20) << "ERROR: " << __func__ << "(): conn->complete_request() returned ret=" << ret << dendl;
return ret;
return 0;
}
- int is_already_tiered() {
+int RGWLCStreamGetCRF::is_already_tiered() {
char buf[32];
map<string, string> attrs = headers;
- for (auto a : attrs) {
+ for (const auto& a : attrs) {
ldout(cct, 20) << "GetCrf attr[" << a.first << "] = " << a.second <<dendl;
}
utime_t ut(obj_properties.mtime);
}
return 0;
}
-};
class RGWLCStreamReadCRF : public RGWStreamReadCRF
{
const DoutPrefixProvider *dpp;
map<string, bufferlist> attrs;
uint64_t obj_size;
- std::unique_ptr<rgw::sal::RGWObject>* obj;
+ std::unique_ptr<rgw::sal::Object>* obj;
const real_time &mtime;
bool multipart;
public:
RGWLCStreamReadCRF(CephContext *_cct, const DoutPrefixProvider *_dpp,
- RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::RGWObject>* _obj,
+ RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object>* _obj,
const real_time &_mtime) :
RGWStreamReadCRF(_obj, obj_ctx), cct(_cct),
dpp(_dpp), obj(_obj), mtime(_mtime) {}
init_headers(attrs, rest_obj.attrs);
rest_obj.acls.set_ctx(cct);
- auto aiter = attrs.find(RGW_ATTR_ACL);
+ const auto aiter = attrs.find(RGW_ATTR_ACL);
if (aiter != attrs.end()) {
bufferlist& bl = aiter->second;
auto bliter = bl.cbegin();
RGWHTTPManager *http_manager;
rgw_lc_obj_properties obj_properties;
std::shared_ptr<RGWRESTConn> conn;
- rgw::sal::RGWObject* dest_obj;
+ rgw::sal::Object* dest_obj;
string etag;
public:
RGWHTTPManager *_http_manager,
const rgw_lc_obj_properties& _obj_properties,
std::shared_ptr<RGWRESTConn> _conn,
- rgw::sal::RGWObject* _dest_obj) :
+ rgw::sal::Object* _dest_obj) :
RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _http_manager),
cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) {
}
map<string, RGWTierACLMapping>& acl_mappings(obj_properties.target_acl_mappings);
string target_storage_class = obj_properties.target_storage_class;
- auto& new_attrs = *attrs;
-
- new_attrs.clear();
+ attrs->clear();
for (auto& hi : rest_obj.attrs) {
if (keep_attr(hi.first)) {
- new_attrs.insert(hi);
+ attrs->insert(hi);
}
}
- auto acl = rest_obj.acls.get_acl();
+ const auto acl = rest_obj.acls.get_acl();
map<int, vector<string> > access_map;
const auto& am = acl_mappings;
- auto iter = am.find(orig_grantee);
+ const auto iter = am.find(orig_grantee);
if (iter == am.end()) {
ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
continue;
}
}
- for (auto aiter : access_map) {
+ for (const auto& aiter : access_map) {
int grant_type = aiter.first;
string header_str("x-amz-grant-");
string s;
- for (auto viter : aiter.second) {
+ for (const auto& viter : aiter.second) {
if (!s.empty()) {
s.append(", ");
}
ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
- new_attrs[header_str] = s;
+ (*attrs)[header_str] = s;
}
/* Copy target storage class */
if (!target_storage_class.empty()) {
- new_attrs["x-amz-storage-class"] = target_storage_class;
+ (*attrs)["x-amz-storage-class"] = target_storage_class;
} else {
- new_attrs["x-amz-storage-class"] = "STANDARD";
+ (*attrs)["x-amz-storage-class"] = "STANDARD";
}
/* New attribute to specify its transitioned from RGW */
- new_attrs["x-amz-meta-rgwx-source"] = "rgw";
+ (*attrs)["x-amz-meta-rgwx-source"] = "rgw";
char buf[32];
snprintf(buf, sizeof(buf), "%llu", (long long)obj_properties.versioned_epoch);
- new_attrs["x-amz-meta-rgwx-versioned-epoch"] = buf;
+ (*attrs)["x-amz-meta-rgwx-versioned-epoch"] = buf;
utime_t ut(obj_properties.mtime);
snprintf(buf, sizeof(buf), "%lld.%09lld",
(long long)ut.sec(),
(long long)ut.nsec());
- new_attrs["x-amz-meta-rgwx-source-mtime"] = buf;
- new_attrs["x-amz-meta-rgwx-source-etag"] = obj_properties.etag;
- new_attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name;
+ (*attrs)["x-amz-meta-rgwx-source-mtime"] = buf;
+ (*attrs)["x-amz-meta-rgwx-source-etag"] = obj_properties.etag;
+ (*attrs)["x-amz-meta-rgwx-source-key"] = rest_obj.key.name;
if (!rest_obj.key.instance.empty()) {
- new_attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
+ (*attrs)["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
+ }
+ for (const auto& a : (*attrs)) {
+ ldout(cct, 30) << "init_send_attrs attr[" << a.first << "] = " << a.second <<dendl;
}
}
- void send_ready(const rgw_rest_obj& rest_obj) override {
+ void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) override {
RGWRESTStreamS3PutObj *r = static_cast<RGWRESTStreamS3PutObj *>(req);
map<string, string> new_attrs;
RGWAccessControlPolicy policy;
- r->send_ready(conn->get_key(), new_attrs, policy, false);
+ r->send_ready(dpp, conn->get_key(), new_attrs, policy);
}
void handle_headers(const map<string, string>& headers) {
- for (auto h : headers) {
+ for (const auto& h : headers) {
if (h.first == "ETAG") {
etag = h.second;
}
std::shared_ptr<RGWStreamReadCRF> in_crf;
std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
- std::unique_ptr<rgw::sal::RGWBucket> dest_bucket;
- std::unique_ptr<rgw::sal::RGWObject> dest_obj;
+ std::unique_ptr<rgw::sal::Bucket> dest_bucket;
+ std::unique_ptr<rgw::sal::Object> dest_obj;
+
+ rgw_lc_obj_properties obj_properties;
+ RGWBucketInfo b;
+ string target_obj_name;
+
+ rgw::sal::Object *o;
public:
RGWLCStreamObjToCloudPlainCR(RGWLCCloudTierCtx& _tier_ctx)
- : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx) {}
-
- int operate() override {
- rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime,
- tier_ctx.o.meta.etag,
- tier_ctx.o.versioned_epoch,
- tier_ctx.acl_mappings,
- tier_ctx.target_storage_class);
+ : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx),
+ obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
+ tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
+ tier_ctx.target_storage_class){}
- string target_obj_name;
- RGWBucketInfo b;
- int reterr = 0;
+ int operate(const DoutPrefixProvider *dpp) {
- b.bucket.name = tier_ctx.target_bucket_name;
- target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
- (*tier_ctx.obj)->get_name();
- if (!tier_ctx.o.is_current()) {
- target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
- }
+ reenter(this) {
+ b.bucket.name = tier_ctx.target_bucket_name;
+ target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
+ (*tier_ctx.obj)->get_name();
+ if (!tier_ctx.o.is_current()) {
+ target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
+ }
- reterr = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
- if (reterr < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << reterr << dendl;
- return reterr;
- }
+ retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
+ if (retcode < 0) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , retcode = " << retcode << dendl;
+ return retcode;
+ }
- dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
- if (!dest_obj) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
- return -1;
- }
-
- rgw::sal::RGWObject *o = static_cast<rgw::sal::RGWObject *>(dest_obj.get());
+ dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
+ if (!dest_obj) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
+ return -1;
+ }
+ o = dest_obj.get();
- reenter(this) {
// tier_ctx.obj.set_atomic(&tier_ctx.rctx); -- might need when updated to zipper SAL
/* Prepare Read from source */
- in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp,
+ in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, dpp,
tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this,
std::shared_ptr<RGWStreamReadCRF> in_crf;
std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
- std::unique_ptr<rgw::sal::RGWBucket> dest_bucket;
- std::unique_ptr<rgw::sal::RGWObject> dest_obj;
+ std::unique_ptr<rgw::sal::Bucket> dest_bucket;
+ std::unique_ptr<rgw::sal::Object> dest_obj;
+
+ rgw_lc_obj_properties obj_properties;
+ RGWBucketInfo b;
+ string target_obj_name;
+ off_t end;
public:
RGWLCStreamObjToCloudMultipartPartCR(RGWLCCloudTierCtx& _tier_ctx, const string& _upload_id,
const rgw_lc_multipart_part_info& _part_info,
string *_petag) : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx),
- upload_id(_upload_id), part_info(_part_info), petag(_petag) {}
-
- int operate() override {
- ldout(cct, 0) << "In CloudMultipartPartCR XXXXXXXXXXXXXXXXXXX" << dendl;
- rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
- tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
- tier_ctx.target_storage_class);
- string target_obj_name;
- off_t end;
- RGWBucketInfo b;
- int reterr = 0;
+ upload_id(_upload_id), part_info(_part_info), petag(_petag),
+ obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
+ tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
+ tier_ctx.target_storage_class){}
- b.bucket.name = tier_ctx.target_bucket_name;
- target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
- (*tier_ctx.obj)->get_name();
- if (!tier_ctx.o.is_current()) {
- target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
- }
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+ b.bucket.name = tier_ctx.target_bucket_name;
+ target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
+ (*tier_ctx.obj)->get_name();
+ if (!tier_ctx.o.is_current()) {
+ target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
+ }
- reterr = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
- if (reterr < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << reterr << dendl;
- return reterr;
- }
+ retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
+ if (retcode < 0) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , retcode = " << retcode << dendl;
+ return retcode;
+ }
- dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
- if (!dest_obj) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
- return -1;
- }
+ dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
+ if (!dest_obj) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
+ return -1;
+ }
- reenter(this) {
// tier_ctx.obj.set_atomic(&tier_ctx.rctx); -- might need when updated to zipper SAL
/* Prepare Read from source */
/* Prepare write */
out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this,
(RGWHTTPManager*)(tier_ctx.http_manager), obj_properties, tier_ctx.conn,
- static_cast<rgw::sal::RGWObject *>(dest_obj.get())));
+ dest_obj.get()));
out_crf->set_multipart(upload_id, part_info.part_num, part_info.size);
dest_conn(_dest_conn), dest_obj(_dest_obj),
upload_id(_upload_id) {}
- int operate() override {
+ int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
yield {
dest_obj(_dest_obj), obj_size(_obj_size),
attrs(_attrs), upload_id(_upload_id) {}
- int operate() override {
+ int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
yield {
explicit CompleteMultipartReq(const map<int, rgw_lc_multipart_part_info>& _parts) : parts(_parts) {}
void dump_xml(Formatter *f) const {
- for (auto p : parts) {
+ for (const auto& p : parts) {
f->open_object_section("Part");
encode_xml("PartNumber", p.first, f);
encode_xml("ETag", p.second.etag, f);
dest_conn(_dest_conn), dest_obj(_dest_obj), upload_id(_upload_id),
req_enc(_parts) {}
- int operate() override {
+ int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
yield {
const rgw_raw_obj status_obj;
string upload_id;
- int ret = -1;
public:
tier_ctx(_tier_ctx), dest_obj(_dest_obj), status_obj(_status_obj),
upload_id(_upload_id) {}
- int operate() override {
+ int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
yield call(new RGWLCAbortMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, upload_id));
if (retcode < 0) {
ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
/* ignore error, best effort */
}
- ret = tier_ctx.store->delete_system_obj(status_obj.pool, status_obj.oid, nullptr, null_yield);
-
- if (ret < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << ret << dendl;
+ yield call(new RGWRadosRemoveCR(dynamic_cast<rgw::sal::RadosStore*>(tier_ctx.store), status_obj));
+ if (retcode < 0) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl;
/* ignore error, best effort */
}
return set_cr_done();
map<string, string> new_attrs;
- int ret_err{0};
-
rgw_raw_obj status_obj;
- bufferlist bl;
+
+ rgw_lc_obj_properties obj_properties;
+ RGWBucketInfo b;
+ string target_obj_name;
+ rgw_bucket target_bucket;
+ rgw::sal::RadosStore *rados;
public:
- RGWLCStreamObjToCloudMultipartCR(RGWLCCloudTierCtx& _tier_ctx) : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx) {}
+ RGWLCStreamObjToCloudMultipartCR(RGWLCCloudTierCtx& _tier_ctx)
+ : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx),
+ obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
+ tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
+ tier_ctx.target_storage_class){}
- int operate() override {
- rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime,
- tier_ctx.o.meta.etag,
- tier_ctx.o.versioned_epoch,
- tier_ctx.acl_mappings,
- tier_ctx.target_storage_class);
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
- //rgw_obj& obj = (*tier_ctx.obj)->get_obj();
- obj_size = tier_ctx.o.meta.size;
+ obj_size = tier_ctx.o.meta.size;
- rgw_bucket target_bucket;
- target_bucket.name = tier_ctx.target_bucket_name;
+ target_bucket.name = tier_ctx.target_bucket_name;
- string target_obj_name;
- target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
- (*tier_ctx.obj)->get_name();
- if (!tier_ctx.o.is_current()) {
- target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
- }
- rgw_obj dest_obj(target_bucket, target_obj_name);
- rgw_rest_obj rest_obj;
-
- reenter(this) {
+ target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
+ (*tier_ctx.obj)->get_name();
+ if (!tier_ctx.o.is_current()) {
+ target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
+ }
+ dest_obj.init(target_bucket, target_obj_name);
- status_obj = rgw_raw_obj(tier_ctx.store->get_zone()->get_params().log_pool,
- "lc_multipart_" + (*tier_ctx.obj)->get_oid());
+ status_obj = rgw_raw_obj(tier_ctx.store->get_zone()->get_params().log_pool,
+ "lc_multipart_" + (*tier_ctx.obj)->get_oid());
- ret_err = tier_ctx.store->get_system_obj(tier_ctx.dpp, status_obj.pool,
- status_obj.oid, bl, NULL, NULL, null_yield);
+ rados = dynamic_cast<rgw::sal::RadosStore*>(tier_ctx.store);
- if (ret_err < 0 && ret_err != -ENOENT) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << ret_err << dendl;
- return retcode;
+ if (!rados) {
+ ldout(tier_ctx.cct, 0) << "ERROR: Not a RadosStore. Cannot be transitioned to cloud." << dendl;
+ return -1;
}
- if (ret_err >= 0) {
- auto iter = bl.cbegin();
- try {
- decode(status, iter);
- } catch (buffer::error& err) {
- return -EIO;
- }
+ yield call(new RGWSimpleRadosReadCR<rgw_lc_multipart_upload_info>(dpp, rados->svc()->rados->get_async_processor(), rados->svc()->sysobj,
+ status_obj, &status, false));
+
+ if (retcode < 0 && retcode != -ENOENT) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl;
+ return retcode;
}
- if (ret_err >= 0) {
+ if (retcode >= 0) {
/* check here that mtime and size did not change */
if (status.mtime != obj_properties.mtime || status.obj_size != obj_size ||
status.etag != obj_properties.etag) {
}
}
- if (ret_err == -ENOENT) {
+ if (retcode == -ENOENT) {
in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
in_crf->init();
if (retcode < 0) {
ldout(tier_ctx.cct, 0) << "ERROR: failed to sync obj=" << tier_ctx.obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
- ret_err = retcode;
yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id));
- return set_cr_error(ret_err);
+ return set_cr_error(retcode);
}
- encode(status, bl);
- ret_err = tier_ctx.store->put_system_obj(status_obj.pool, status_obj.oid,
- bl, false, NULL, real_time(), null_yield, NULL);
- if (ret_err < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << ret_err << dendl;
+ yield call(new RGWSimpleRadosWriteCR<rgw_lc_multipart_upload_info>(dpp, rados->svc()->rados->get_async_processor(), rados->svc()->sysobj, status_obj, status));
+
+ if (retcode < 0) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
/* continue with upload anyway */
}
ldout(tier_ctx.cct, 0) << "sync of object=" << tier_ctx.obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << status.parts[status.cur_part].etag << dendl;
yield call(new RGWLCCompleteMultipartCR(tier_ctx.cct, tier_ctx.http_manager, tier_ctx.conn.get(), dest_obj, status.upload_id, status.parts));
if (retcode < 0) {
ldout(tier_ctx.cct, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
- ret_err = retcode;
yield call(new RGWLCStreamAbortMultipartUploadCR(tier_ctx, dest_obj, status_obj, status.upload_id));
- return set_cr_error(ret_err);
+ return set_cr_error(retcode);
}
/* remove status obj */
- ret_err = tier_ctx.store->delete_system_obj(status_obj.pool, status_obj.oid, nullptr, null_yield);
-
- if (ret_err < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-ret_err) << ")" << dendl;
+ yield call(new RGWRadosRemoveCR(rados, status_obj));
+ if (retcode < 0) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
/* ignore error, best effort */
}
return set_cr_done();
}
};
-int RGWLCCloudCheckCR::operate() {
+int RGWLCCloudCheckCR::operate(const DoutPrefixProvider *dpp) {
/* Check if object has already been transitioned */
- rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime, tier_ctx.o.meta.etag,
- tier_ctx.o.versioned_epoch, tier_ctx.acl_mappings,
- tier_ctx.target_storage_class);
-
- RGWBucketInfo b;
- string target_obj_name;
- int reterr = 0;
-
- b.bucket.name = tier_ctx.target_bucket_name;
- target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
- (*tier_ctx.obj)->get_name();
+ reenter(this) {
+ b.bucket.name = tier_ctx.target_bucket_name;
+ target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
+ (*tier_ctx.obj)->get_name();
if (!tier_ctx.o.is_current()) {
target_obj_name += get_key_instance((*tier_ctx.obj)->get_key());
}
- std::unique_ptr<rgw::sal::RGWBucket> dest_bucket;
- std::unique_ptr<rgw::sal::RGWObject> dest_obj;
-
-
- reterr = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
- if (reterr < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << reterr << dendl;
- return reterr;
- }
+ retcode = tier_ctx.store->get_bucket(nullptr, b, &dest_bucket);
+ if (retcode < 0) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_bucket - " << tier_ctx.target_bucket_name << " , reterr = " << retcode << dendl;
+ return ret;
+ }
- dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
- if (!dest_obj) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
- return -1;
- }
+ dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
+ if (!dest_obj) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to initialize dest_object path - " << target_obj_name << dendl;
+ return -1;
+ }
- std::shared_ptr<RGWLCStreamGetCRF> get_crf;
- get_crf.reset(new RGWLCStreamGetCRF((CephContext *)(tier_ctx.cct), get_env(), this,
- (RGWHTTPManager*)(tier_ctx.http_manager),
- obj_properties, tier_ctx.conn, static_cast<rgw::sal::RGWObject *>(dest_obj.get())));
- int ret = 0;
+ get_crf.reset(new RGWLCStreamGetCRF(tier_ctx.cct, get_env(), this, tier_ctx.http_manager, obj_properties,
+ tier_ctx.conn, dest_obj.get()));
- reenter(this) {
/* Having yield here doesn't seem to wait for init2() to fetch the headers
* before calling is_already_tiered() below
*/
- ret = get_crf->init();
- if (ret < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to fetch HEAD from cloud for obj=" << tier_ctx.obj << " , ret = " << ret << dendl;
- return set_cr_error(ret);
+ yield {
+ retcode = get_crf->init(dpp);
+ if (retcode < 0) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to fetch HEAD from cloud for obj=" << tier_ctx.obj << " , retcode = " << retcode << dendl;
+ return set_cr_error(ret);
+ }
+ }
+ if (retcode < 0) {
+ ldout(tier_ctx.cct, 20) << __func__ << ": get_crf()->init retcode=" << retcode << dendl;
+ return set_cr_error(retcode);
}
- if ((static_cast<RGWLCStreamGetCRF *>(get_crf.get()))->is_already_tiered()) {
+ if (get_crf.get()->is_already_tiered()) {
*already_tiered = true;
ldout(tier_ctx.cct, 20) << "is_already_tiered true" << dendl;
return set_cr_done();
map <pair<string, string>, utime_t> target_buckets;
-int RGWLCCloudTierCR::operate() {
+int RGWLCCloudTierCR::operate(const DoutPrefixProvider *dpp) {
pair<string, string> key(tier_ctx.storage_class, tier_ctx.target_bucket_name);
bool bucket_created = false;