int delete_tier_obj(lc_op_ctx& oc, RGWLCCloudTierCtx& tier_ctx) {
int ret = 0;
- /* XXX: do we need to check for retention attributes * as done in RGWDeleteObj?
+ /* If bucket is versioned, create delete_marker for current version
*/
- ret = oc.store->getRados()->delete_obj(oc.dpp, oc.rctx, oc.bucket->get_info(), oc.obj->get_obj(), tier_ctx.bucket_info.versioning_status());
-
+ ret = remove_expired_obj(oc.dpp, oc,
+ !(oc.o.is_current() && oc.bucket->versioned()));
return ret;
}
int update_tier_obj(lc_op_ctx& oc, RGWLCCloudTierCtx& tier_ctx) {
map<string, bufferlist> attrs;
- RGWRados::Object op_target(tier_ctx.store->getRados(),
- tier_ctx.bucket_info,
- tier_ctx.rctx, tier_ctx.obj);
+ int r = 0;
+
real_time read_mtime;
- RGWRados::Object::Read read_op(&op_target);
+ std::unique_ptr<rgw::sal::RGWObject::ReadOp> read_op(oc.obj->get_read_op(&oc.rctx));
- read_op.params.attrs = &attrs;
- read_op.params.lastmod = &read_mtime;
+ read_op->params.lastmod = &read_mtime;
- int r = read_op.prepare(null_yield, oc.dpp);
+ r = read_op->prepare(null_yield, oc.dpp);
if (r < 0) {
return r;
}
return -ECANCELED;
}
- tier_ctx.rctx.set_atomic(tier_ctx.obj);
+ attrs = oc.obj->get_attrs();
+ (*tier_ctx.obj)->set_atomic(&tier_ctx.rctx);
- RGWRados::Object::Write obj_op(&op_target);
- RGWObjState *s = tier_ctx.rctx.get_state(tier_ctx.obj);
+ RGWObjState *s = tier_ctx.rctx.get_state((*tier_ctx.obj)->get_obj());
+ std::unique_ptr<rgw::sal::RGWObject::WriteOp> obj_op(oc.obj->get_write_op(&oc.rctx));
- obj_op.meta.modify_tail = true;
- obj_op.meta.flags = PUT_OBJ_CREATE;
- obj_op.meta.category = RGWObjCategory::CloudTiered;
- obj_op.meta.delete_at = real_time();
+ obj_op->params.modify_tail = true;
+ obj_op->params.flags = PUT_OBJ_CREATE;
+ obj_op->params.category = RGWObjCategory::CloudTiered;
+ obj_op->params.delete_at = real_time();
bufferlist blo;
blo.append("");
- obj_op.meta.data = &blo;
- obj_op.meta.if_match = NULL;
- obj_op.meta.if_nomatch = NULL;
- obj_op.meta.user_data = NULL;
- obj_op.meta.zones_trace = NULL;
- obj_op.meta.delete_at = real_time();
- obj_op.meta.olh_epoch = tier_ctx.o.versioned_epoch;
+ obj_op->params.data = &blo;
+ obj_op->params.if_match = NULL;
+ obj_op->params.if_nomatch = NULL;
+ obj_op->params.user_data = NULL;
+ obj_op->params.zones_trace = NULL;
+ obj_op->params.delete_at = real_time();
+ obj_op->params.olh_epoch = tier_ctx.o.versioned_epoch;
RGWObjManifest *pmanifest;
rgw_placement_rule target_placement;
target_placement.inherit_from(tier_ctx.bucket_info.placement_rule);
target_placement.storage_class = oc.tier.storage_class;
- pmanifest->set_head(target_placement, tier_ctx.obj, 0);
+ pmanifest->set_head(target_placement, (*tier_ctx.obj)->get_obj(), 0);
- pmanifest->set_tail_placement(target_placement, tier_ctx.obj.bucket);
+ pmanifest->set_tail_placement(target_placement, (*tier_ctx.obj)->get_obj().bucket);
/* should the obj_size also be set to '0' or is it needed
* to keep track of original size before transition.
*/
//pmanifest->set_obj_size(0);
- obj_op.meta.manifest = pmanifest;
+ obj_op->params.manifest = pmanifest;
/* update storage class */
bufferlist bl;
attrs.erase(RGW_ATTR_ID_TAG);
attrs.erase(RGW_ATTR_TAIL_TAG);
- obj_op.write_meta(oc.dpp, tier_ctx.o.meta.size, 0, attrs, null_yield);
+ obj_op->params.attrs = &attrs;
+
+ r = obj_op->write_meta(oc.dpp, tier_ctx.o.meta.size, 0, null_yield);
if (r < 0) {
return r;
}
RGWAccessKey key = oc.tier.t.s3.key;
HostStyle host_style = oc.tier.t.s3.host_style;
string bucket_name = oc.tier.t.s3.target_path;
- const RGWZoneGroup& zonegroup = oc.store->svc()->zone->get_zonegroup();
+ const RGWZoneGroup& zonegroup = oc.store->get_zone()->get_zonegroup();
+ bool delete_object;
+ /* Option 'retain_object' is not applicable for CurrentVersionTransition */
+ delete_object = (!oc.tier.retain_object ||
+ (oc.o.is_current() && oc.bucket->versioned()));
+
if (bucket_name.empty()) {
bucket_name = "rgwx-" + zonegroup.get_name() + "-" + oc.tier.storage_class +
"-cloud-bucket";
boost::algorithm::to_lower(bucket_name);
}
- conn.reset(new S3RESTConn(oc.cct, oc.store->svc()->zone,
- id, { endpoint }, key, host_style));
+ conn.reset(new S3RESTConn(oc.cct, oc.store, id, { endpoint }, key, host_style));
/* http_mngr */
- RGWCoroutinesManager crs(oc.store->ctx(), oc.store->getRados()->get_cr_registry());
+ RGWCoroutinesManager crs(oc.store->ctx(), oc.store->get_cr_registry());
RGWHTTPManager http_manager(oc.store->ctx(), crs.get_completion_mgr());
int ret = http_manager.start();
}
RGWLCCloudTierCtx tier_ctx(oc.cct, oc.dpp, oc.o, oc.store, oc.bucket->get_info(),
- oc.obj->get_obj(), oc.rctx, conn, bucket_name,
+ &oc.obj, oc.rctx, conn, bucket_name,
oc.tier.t.s3.target_storage_class, &http_manager);
tier_ctx.acl_mappings = oc.tier.t.s3.acl_mappings;
tier_ctx.multipart_min_part_size = oc.tier.t.s3.multipart_min_part_size;
return ret;
}
- if (oc.tier.retain_object) {
- ret = update_tier_obj(oc, tier_ctx);
+ if (delete_object) {
+ ret = delete_tier_obj(oc, tier_ctx);
if (ret < 0) {
- ldpp_dout(oc.dpp, 0) << "ERROR: Updating tier object failed ret=" << ret << dendl;
+ ldpp_dout(oc.dpp, 0) << "ERROR: Deleting tier object failed ret=" << ret << dendl;
return ret;
}
} else {
- ret = delete_tier_obj(oc, tier_ctx);
+ ret = update_tier_obj(oc, tier_ctx);
if (ret < 0) {
- ldpp_dout(oc.dpp, 0) << "ERROR: Deleting tier object failed ret=" << ret << dendl;
+ ldpp_dout(oc.dpp, 0) << "ERROR: Updating tier object failed ret=" << ret << dendl;
return ret;
}
}
auto& o = oc.o;
int r;
std::string tier_type = "";
- const RGWZoneGroup& zonegroup = oc.store->svc()->zone->get_zonegroup();
+ const RGWZoneGroup& zonegroup = oc.store->get_zone()->get_zonegroup();
rgw_placement_rule target_placement;
target_placement.inherit_from(oc.bucket->get_placement_rule());
if (!r && oc.tier.tier_type == "cloud-s3") {
ldpp_dout(oc.dpp, 20) << "Found cloud s3 tier: " << target_placement.storage_class << dendl;
+
+ if (!oc.o.is_current() &&
+ !pass_object_lock_check(oc.store, oc.obj.get(), oc.rctx, oc.dpp)) {
+ /* Skip objects which has object lock enabled. */
+ ldpp_dout(oc.dpp, 10) << "Object(key:" << oc.o.key << ") is locked Skipping transition to cloud-s3 tier: " << target_placement.storage_class << dendl;
+ }
+
r = transition_obj_to_cloud(oc);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj to cloud (r=" << r << ")"
const DoutPrefixProvider *dpp;
map<string, bufferlist> attrs;
uint64_t obj_size;
- rgw_obj& obj;
+ std::unique_ptr<rgw::sal::RGWObject>* obj;
const real_time &mtime;
bool multipart;
public:
RGWLCStreamReadCRF(CephContext *_cct, const DoutPrefixProvider *_dpp,
- RGWRados* rados, RGWBucketInfo& bucket_info,
- RGWObjectCtx& obj_ctx, rgw_obj& _obj, const real_time &_mtime) :
- RGWStreamReadCRF(rados, bucket_info, obj_ctx, _obj), cct(_cct),
+ RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::RGWObject>* _obj,
+ const real_time &_mtime) :
+ RGWStreamReadCRF(_obj, obj_ctx), cct(_cct),
dpp(_dpp), obj(_obj), mtime(_mtime) {}
~RGWLCStreamReadCRF() {};
optional_yield y = null_yield;
real_time read_mtime;
- read_op.params.attrs = &attrs;
- read_op.params.lastmod = &read_mtime;
- read_op.params.obj_size = &obj_size;
+ read_op->params.lastmod = &read_mtime;
- int ret = read_op.prepare(y, dpp);
+ int ret = read_op->prepare(y, dpp);
if (ret < 0) {
ldout(cct, 0) << "ERROR: fail to prepare read_op, ret = " << ret << dendl;
return ret;
return -ECANCELED;
}
+ attrs = (*obj)->get_attrs();
+ obj_size = (*obj)->get_obj_size();
+
ret = init_rest_obj();
if (ret < 0) {
ldout(cct, 0) << "ERROR: fail to initialize rest_obj, ret = " << ret << dendl;
/* Initialize rgw_rest_obj.
* Reference: do_decode_rest_obj
* Check how to copy headers content */
- rest_obj.init(obj.key);
+ rest_obj.init((*obj)->get_key());
if (!multipart) {
rest_obj.content_len = obj_size;
int read(off_t ofs, off_t end, bufferlist &bl) {
optional_yield y = null_yield;
- return read_op.read(ofs, end, bl, y, dpp);
+ return read_op->read(ofs, end, bl, y, dpp);
}
};
std::shared_ptr<RGWStreamReadCRF> in_crf;
std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
- std::shared_ptr<rgw::sal::RGWRadosBucket> dest_bucket;
- std::shared_ptr<rgw::sal::RGWRadosObject> dest_obj;
+ std::unique_ptr<rgw::sal::RGWBucket> dest_bucket;
+ std::unique_ptr<rgw::sal::RGWObject> dest_obj;
public:
RGWLCStreamObjToCloudPlainCR(RGWLCCloudTierCtx& _tier_ctx)
target_bucket.name = tier_ctx.target_bucket_name;
target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
- tier_ctx.obj.key.name + get_key_instance(tier_ctx.obj.key);
+ (*tier_ctx.obj)->get_name() + get_key_instance((*tier_ctx.obj)->get_key());
- dest_bucket.reset(new rgw::sal::RGWRadosBucket(tier_ctx.store, target_bucket));
+ tier_ctx.store->get_bucket(tier_ctx.dpp, nullptr, target_bucket, &dest_bucket, null_yield);
- dest_obj.reset(new rgw::sal::RGWRadosObject(tier_ctx.store, rgw_obj_key(target_obj_name),
- (rgw::sal::RGWRadosBucket *)(dest_bucket.get())));
+ dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
rgw::sal::RGWObject *o = static_cast<rgw::sal::RGWObject *>(dest_obj.get());
/* Prepare Read from source */
in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp,
- tier_ctx.store->getRados(), tier_ctx.bucket_info,
tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
out_crf.reset(new RGWLCStreamPutCRF((CephContext *)(tier_ctx.cct), get_env(), this,
std::shared_ptr<RGWStreamReadCRF> in_crf;
std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
- std::shared_ptr<rgw::sal::RGWRadosBucket> dest_bucket;
- std::shared_ptr<rgw::sal::RGWRadosObject> dest_obj;
+ std::unique_ptr<rgw::sal::RGWBucket> dest_bucket;
+ std::unique_ptr<rgw::sal::RGWObject> dest_obj;
public:
RGWLCStreamObjToCloudMultipartPartCR(RGWLCCloudTierCtx& _tier_ctx, const string& _upload_id,
target_bucket.name = tier_ctx.target_bucket_name;
target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
- tier_ctx.obj.key.name + get_key_instance(tier_ctx.obj.key);
-
- dest_bucket.reset(new rgw::sal::RGWRadosBucket(tier_ctx.store, target_bucket));
+ (*tier_ctx.obj)->get_name() + get_key_instance((*tier_ctx.obj)->get_key());
- dest_obj.reset(new rgw::sal::RGWRadosObject(tier_ctx.store, rgw_obj_key(target_obj_name),
- (rgw::sal::RGWRadosBucket *)(dest_bucket.get())));
+ tier_ctx.store->get_bucket(tier_ctx.dpp, nullptr, target_bucket, &dest_bucket, null_yield);
+ dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
reenter(this) {
// tier_ctx.obj.set_atomic(&tier_ctx.rctx); -- might need when updated to zipper SAL
/* Prepare Read from source */
- in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp, tier_ctx.store->getRados(),
- tier_ctx.bucket_info, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
+ in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp,
+ tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
end = part_info.ofs + part_info.size - 1;
std::static_pointer_cast<RGWLCStreamReadCRF>(in_crf)->set_multipart(part_info.size, part_info.ofs, end);
const rgw_raw_obj status_obj;
string upload_id;
+ int ret = -1;
public:
ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
/* ignore error, best effort */
}
- yield call(new RGWRadosRemoveCR(tier_ctx.store, status_obj));
- if (retcode < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl;
+ ret = tier_ctx.store->delete_system_obj(status_obj.pool, status_obj.oid, nullptr, null_yield);
+
+ if (ret < 0) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << ret << dendl;
/* ignore error, best effort */
}
return set_cr_done();
int ret_err{0};
rgw_raw_obj status_obj;
+ bufferlist bl;
public:
RGWLCStreamObjToCloudMultipartCR(RGWLCCloudTierCtx& _tier_ctx) : RGWCoroutine(_tier_ctx.cct), tier_ctx(_tier_ctx) {}
tier_ctx.acl_mappings,
tier_ctx.target_storage_class);
- rgw_obj& obj = tier_ctx.obj;
+ //rgw_obj& obj = (*tier_ctx.obj)->get_obj();
obj_size = tier_ctx.o.meta.size;
rgw_bucket target_bucket;
string target_obj_name;
target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
- tier_ctx.obj.key.name + get_key_instance(tier_ctx.obj.key);
+ (*tier_ctx.obj)->get_name() + get_key_instance((*tier_ctx.obj)->get_key());
rgw_obj dest_obj(target_bucket, target_obj_name);
std::shared_ptr<RGWStreamReadCRF> in_crf;
rgw_rest_obj rest_obj;
- status_obj = rgw_raw_obj(tier_ctx.store->svc()->zone->get_zone_params().log_pool,
- "lc_multipart_" + obj.get_oid());
+ status_obj = rgw_raw_obj(tier_ctx.store->get_zone()->get_params().log_pool,
+ "lc_multipart_" + (*tier_ctx.obj)->get_oid());
- reenter(this) {
- yield call(new RGWSimpleRadosReadCR<rgw_lc_multipart_upload_info>(tier_ctx.store->svc()->rados->get_async_processor(), tier_ctx.store->svc()->sysobj,
- status_obj, &status, false));
+ ret_err = tier_ctx.store->get_system_obj(tier_ctx.dpp, status_obj.pool,
+ status_obj.oid, bl, NULL, NULL, null_yield);
- if (retcode < 0 && retcode != -ENOENT) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl;
+ if (ret_err < 0 && ret_err != -ENOENT) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << ret_err << dendl;
return retcode;
}
- if (retcode >= 0) {
+ if (ret_err >= 0) {
+ auto iter = bl.cbegin();
+ try {
+ decode(status, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ }
+
+ reenter(this) {
+
+ if (ret_err >= 0) {
/* check here that mtime and size did not change */
if (status.mtime != obj_properties.mtime || status.obj_size != obj_size ||
status.etag != obj_properties.etag) {
}
}
- if (retcode == -ENOENT) {
- in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp, tier_ctx.store->getRados(), tier_ctx.bucket_info, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
+ if (ret_err == -ENOENT) {
+ in_crf.reset(new RGWLCStreamReadCRF(tier_ctx.cct, tier_ctx.dpp, tier_ctx.rctx, tier_ctx.obj, tier_ctx.o.meta.mtime));
in_crf->init();
return set_cr_error(ret_err);
}
- yield call(new RGWSimpleRadosWriteCR<rgw_lc_multipart_upload_info>(tier_ctx.store->svc()->rados->get_async_processor(), tier_ctx.store->svc()->sysobj, status_obj, status));
- if (retcode < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
+ encode(status, bl);
+ ret_err = tier_ctx.store->put_system_obj(status_obj.pool, status_obj.oid,
+ bl, false, NULL, real_time(), null_yield, NULL);
+ if (ret_err < 0) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << ret_err << dendl;
/* continue with upload anyway */
}
ldout(tier_ctx.cct, 0) << "sync of object=" << tier_ctx.obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl;
}
/* remove status obj */
- yield call(new RGWRadosRemoveCR(tier_ctx.store, status_obj));
- if (retcode < 0) {
- ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
+ ret_err = tier_ctx.store->delete_system_obj(status_obj.pool, status_obj.oid, nullptr, null_yield);
+
+ if (ret_err < 0) {
+ ldout(tier_ctx.cct, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-ret_err) << ")" << dendl;
/* ignore error, best effort */
}
return set_cr_done();
target_bucket.name = tier_ctx.target_bucket_name;
target_obj_name = tier_ctx.bucket_info.bucket.name + "/" +
- tier_ctx.obj.key.name + get_key_instance(tier_ctx.obj.key);
+ (*tier_ctx.obj)->get_name() + get_key_instance((*tier_ctx.obj)->get_key());
+
+ std::unique_ptr<rgw::sal::RGWBucket> dest_bucket;
+ std::unique_ptr<rgw::sal::RGWObject> dest_obj;
- std::shared_ptr<rgw::sal::RGWRadosBucket> dest_bucket;
- dest_bucket.reset(new rgw::sal::RGWRadosBucket(tier_ctx.store, target_bucket));
+ tier_ctx.store->get_bucket(tier_ctx.dpp, nullptr, target_bucket, &dest_bucket, null_yield);
- std::shared_ptr<rgw::sal::RGWRadosObject> dest_obj;
- dest_obj.reset(new rgw::sal::RGWRadosObject(tier_ctx.store, rgw_obj_key(target_obj_name), (rgw::sal::RGWRadosBucket *)(dest_bucket.get())));
+ dest_obj = dest_bucket->get_object(rgw_obj_key(target_obj_name));
std::shared_ptr<RGWLCStreamGetCRF> get_crf;