#include "common/containers.h"
#include <common/errno.h>
#include "include/random.h"
-#include "cls/rgw/cls_rgw_client.h"
#include "cls/lock/cls_lock_client.h"
#include "rgw_perf_counters.h"
#include "rgw_common.h"
void RGWLC::initialize(CephContext *_cct, rgw::sal::RGWRadosStore *_store) {
cct = _cct;
store = _store;
+ sal_lc = std::move(store->get_lifecycle());
max_objs = cct->_conf->rgw_lc_max_objs;
if (max_objs > HASH_PRIME)
max_objs = HASH_PRIME;
return false;
}
-static inline std::ostream& operator<<(std::ostream &os, cls_rgw_lc_entry& ent) {
+static inline std::ostream& operator<<(std::ostream &os, rgw::sal::Lifecycle::LCEntry& ent) {
os << "<ent: bucket=";
os << ent.bucket;
os << "; start_time=";
int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
{
- vector<cls_rgw_lc_entry> entries;
+ vector<rgw::sal::Lifecycle::LCEntry> entries;
string marker;
dout(5) << "RGWLC::bucket_lc_prepare(): PREPARE "
#define MAX_LC_LIST_ENTRIES 100
do {
- int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index],
- marker, MAX_LC_LIST_ENTRIES, entries);
+ int ret = sal_lc->list_entries(obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
if (ret < 0)
return ret;
for (auto& entry : entries) {
entry.start_time = ceph_clock_now();
entry.status = lc_uninitial; // lc_uninitial? really?
- ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
- obj_names[index], entry);
+ ret = sal_lc->set_entry(obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0)
<< "RGWLC::bucket_lc_prepare() failed to set entry on "
return (timediff >= cmp);
}
-static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info,
- rgw_obj& obj, RGWObjectCtx& ctx)
+static bool pass_object_lock_check(rgw::sal::RGWStore* store, rgw::sal::RGWObject* obj, RGWObjectCtx& ctx)
{
- if (!bucket_info.obj_lock_enabled()) {
+ if (!obj->get_bucket()->get_info().obj_lock_enabled()) {
return true;
}
- RGWRados::Object op_target(store, bucket_info, ctx, obj);
- RGWRados::Object::Read read_op(&op_target);
- map<string, bufferlist> attrs;
- read_op.params.attrs = &attrs;
- int ret = read_op.prepare(null_yield);
+ std::unique_ptr<rgw::sal::RGWObject::ReadOp> read_op = obj->get_read_op(&ctx);
+ int ret = read_op->prepare(null_yield);
if (ret < 0) {
if (ret == -ENOENT) {
return true;
return false;
}
} else {
- auto iter = attrs.find(RGW_ATTR_OBJECT_RETENTION);
- if (iter != attrs.end()) {
+ auto iter = obj->get_attrs().find(RGW_ATTR_OBJECT_RETENTION);
+ if (iter != obj->get_attrs().end()) {
RGWObjectRetention retention;
try {
decode(retention, iter->second);
return false;
}
}
- iter = attrs.find(RGW_ATTR_OBJECT_LEGAL_HOLD);
- if (iter != attrs.end()) {
+ iter = obj->get_attrs().find(RGW_ATTR_OBJECT_LEGAL_HOLD);
+ if (iter != obj->get_attrs().end()) {
RGWObjectLegalHold obj_legal_hold;
try {
decode(obj_legal_hold, iter->second);
}
class LCObjsLister {
- rgw::sal::RGWRadosStore *store;
- RGWBucketInfo& bucket_info;
- RGWRados::Bucket target;
- RGWRados::Bucket::List list_op;
- bool is_truncated{false};
- rgw_obj_key next_marker;
+ rgw::sal::RGWStore *store;
+ rgw::sal::RGWBucket* bucket;
+ rgw::sal::RGWBucket::ListParams list_params;
+ rgw::sal::RGWBucket::ListResults list_results;
string prefix;
- vector<rgw_bucket_dir_entry> objs;
vector<rgw_bucket_dir_entry>::iterator obj_iter;
rgw_bucket_dir_entry pre_obj;
int64_t delay_ms;
public:
- LCObjsLister(rgw::sal::RGWRadosStore *_store, RGWBucketInfo& _bucket_info) :
- store(_store), bucket_info(_bucket_info),
- target(store->getRados(), bucket_info), list_op(&target) {
- list_op.params.list_versions = bucket_info.versioned();
- list_op.params.allow_unordered = true;
+ LCObjsLister(rgw::sal::RGWStore *_store, rgw::sal::RGWBucket* _bucket) :
+ store(_store), bucket(_bucket) {
+ list_params.list_versions = bucket->versioned();
+ list_params.allow_unordered = true;
delay_ms = store->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay");
}
void set_prefix(const string& p) {
prefix = p;
- list_op.params.prefix = prefix;
+ list_params.prefix = prefix;
}
int init() {
}
int fetch() {
- int ret = list_op.list_objects(
- 1000, &objs, NULL, &is_truncated, null_yield);
+ int ret = bucket->list(list_params, 1000, list_results, null_yield);
if (ret < 0) {
return ret;
}
- obj_iter = objs.begin();
+ obj_iter = list_results.objs.begin();
return 0;
}
bool get_obj(rgw_bucket_dir_entry **obj,
std::function<void(void)> fetch_barrier
= []() { /* nada */}) {
- if (obj_iter == objs.end()) {
- if (!is_truncated) {
+ if (obj_iter == list_results.objs.end()) {
+ if (!list_results.is_truncated) {
delay();
return false;
} else {
fetch_barrier();
- list_op.params.marker = pre_obj.key;
+ list_params.marker = pre_obj.key;
int ret = fetch();
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret
}
/* returning address of entry in objs */
*obj = &(*obj_iter);
- return obj_iter != objs.end();
+ return obj_iter != list_results.objs.end();
}
rgw_bucket_dir_entry get_prev_obj() {
}
boost::optional<std::string> next_key_name() {
- if (obj_iter == objs.end() ||
- (obj_iter + 1) == objs.end()) {
+ if (obj_iter == list_results.objs.end() ||
+ (obj_iter + 1) == list_results.objs.end()) {
/* this should have been called after get_obj() was called, so this should
* only happen if is_truncated is false */
return boost::none;
lc_op op;
rgw::sal::RGWRadosStore *store;
LCWorker* worker;
- RGWBucketInfo& bucket_info;
+ rgw::sal::RGWBucket* bucket;
LCObjsLister& ol;
op_env(lc_op& _op, rgw::sal::RGWRadosStore *_store, LCWorker* _worker,
- RGWBucketInfo& _bucket_info, LCObjsLister& _ol)
- : op(_op), store(_store), worker(_worker), bucket_info(_bucket_info),
+ rgw::sal::RGWBucket* _bucket, LCObjsLister& _ol)
+ : op(_op), store(_store), worker(_worker), bucket(_bucket),
ol(_ol) {}
}; /* op_env */
ceph::real_time effective_mtime;
rgw::sal::RGWRadosStore *store;
- RGWBucketInfo& bucket_info;
+ rgw::sal::RGWBucket* bucket;
lc_op& op; // ok--refers to expanded env.op
LCObjsLister& ol;
- rgw_obj obj;
+ std::unique_ptr<rgw::sal::RGWObject> obj;
RGWObjectCtx rctx;
const DoutPrefixProvider *dpp;
WorkQ* wq;
const DoutPrefixProvider *dpp, WorkQ* wq)
: cct(env.store->ctx()), env(env), o(o), next_key_name(next_key_name),
effective_mtime(effective_mtime),
- store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
- obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(dpp), wq(wq)
- {}
+ store(env.store), bucket(env.bucket), op(env.op), ol(env.ol),
+ rctx(env.store), dpp(dpp), wq(wq)
+ {
+ obj = bucket->get_object(o.key);
+ }
bool next_has_same_name(const std::string& key_name) {
return (next_key_name && key_name.compare(
static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
{
auto& store = oc.store;
- auto& bucket_info = oc.bucket_info;
+ auto& bucket_info = oc.bucket->get_info();
auto& o = oc.o;
auto obj_key = o.key;
auto& meta = o.meta;
+ int ret;
+ std::string version_id;
if (!remove_indeed) {
obj_key.instance.clear();
obj_key.instance = "null";
}
- rgw_obj obj(bucket_info.bucket, obj_key);
+ std::unique_ptr<rgw::sal::RGWBucket> bucket;
+ std::unique_ptr<rgw::sal::RGWObject> obj;
+
+ ret = store->get_bucket(nullptr, bucket_info, &bucket);
+ if (ret < 0) {
+ return ret;
+ }
+
+ obj = bucket->get_object(obj_key);
+
ACLOwner obj_owner;
obj_owner.set_id(rgw_user {meta.owner});
obj_owner.set_name(meta.owner_display_name);
+ ACLOwner bucket_owner;
+ bucket_owner.set_id(bucket_info.owner);
- RGWRados::Object del_target(store->getRados(), bucket_info, oc.rctx, obj);
- RGWRados::Object::Delete del_op(&del_target);
-
- del_op.params.bucket_owner = bucket_info.owner;
- del_op.params.versioning_status = bucket_info.versioning_status();
- del_op.params.obj_owner = obj_owner;
- del_op.params.unmod_since = meta.mtime;
-
- return del_op.delete_obj(null_yield);
+ return obj->delete_object(&oc.rctx, obj_owner, bucket_owner, meta.mtime, false, 0,
+ version_id, null_yield);
} /* remove_expired_obj */
class LCOpAction {
return !once && stop_at < time(nullptr);
}
-int RGWLC::handle_multipart_expiration(
- RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map,
- LCWorker* worker, time_t stop_at, bool once)
+int RGWLC::handle_multipart_expiration(rgw::sal::RGWBucket* target,
+ const multimap<string, lc_op>& prefix_map,
+ LCWorker* worker, time_t stop_at, bool once)
{
MultipartMetaFilter mp_filter;
vector<rgw_bucket_dir_entry> objs;
- bool is_truncated;
int ret;
- RGWBucketInfo& bucket_info = target->get_bucket_info();
- RGWRados::Bucket::List list_op(target);
+ rgw::sal::RGWBucket::ListParams params;
+ rgw::sal::RGWBucket::ListResults results;
auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
- list_op.params.list_versions = false;
+ params.list_versions = false;
/* lifecycle processing does not depend on total order, so can
* take advantage of unordered listing optimizations--such as
* operating on one shard at a time */
- list_op.params.allow_unordered = true;
- list_op.params.ns = RGW_OBJ_NS_MULTIPART;
- list_op.params.filter = &mp_filter;
+ params.allow_unordered = true;
+ params.ns = RGW_OBJ_NS_MULTIPART;
+ params.filter = &mp_filter;
auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
return;
}
RGWObjectCtx rctx(store);
- int ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
+ int ret = abort_multipart_upload(store, cct, &rctx, target->get_info(), mp_obj);
if (ret == 0) {
if (perfcounter) {
perfcounter->inc(l_rgw_lc_abort_mpu, 1);
if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
continue;
}
- list_op.params.prefix = prefix_iter->first;
+ params.prefix = prefix_iter->first;
do {
objs.clear();
- list_op.params.marker = list_op.get_next_marker();
- ret = list_op.list_objects(1000, &objs, NULL, &is_truncated, null_yield);
+ ret = target->list(params, 1000, results, null_yield);
if (ret < 0) {
if (ret == (-ENOENT))
return 0;
} /* for objs */
std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
- } while(is_truncated);
+ } while(results.is_truncated);
} /* for prefix_map */
worker->workpool->drain();
return 0;
}
-static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info,
- rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
+static int read_obj_tags(rgw::sal::RGWObject* obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
{
- RGWRados::Object op_target(store, bucket_info, ctx, obj);
- RGWRados::Object::Read read_op(&op_target);
+ std::unique_ptr<rgw::sal::RGWObject::ReadOp> rop = obj->get_read_op(&ctx);
- return read_op.get_attr(RGW_ATTR_TAGS, tags_bl, null_yield);
+ return rop->get_attr(RGW_ATTR_TAGS, tags_bl, null_yield);
}
static bool is_valid_op(const lc_op& op)
*skip = true;
bufferlist tags_bl;
- int ret = read_obj_tags(oc.store->getRados(), oc.bucket_info, oc.obj,
- oc.rctx, tags_bl);
+ int ret = read_obj_tags(oc.obj.get(), oc.rctx, tags_bl);
if (ret < 0) {
if (ret != -ENODATA) {
ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r="
r = remove_expired_obj(oc, true);
if (r < 0) {
ldout(oc.cct, 0) << "ERROR: current is-dm remove_expired_obj "
- << oc.bucket_info.bucket << ":" << o.key
+ << oc.bucket << ":" << o.key
<< " " << cpp_strerror(r) << " "
<< oc.wq->thr_name() << dendl;
return r;
}
ldout(oc.cct, 2) << "DELETED: current is-dm "
- << oc.bucket_info.bucket << ":" << o.key
+ << oc.bucket << ":" << o.key
<< " " << oc.wq->thr_name() << dendl;
} else {
/* ! o.is_delete_marker() */
- r = remove_expired_obj(oc, !oc.bucket_info.versioned());
+ r = remove_expired_obj(oc, !oc.bucket->versioned());
if (r < 0) {
ldout(oc.cct, 0) << "ERROR: remove_expired_obj "
- << oc.bucket_info.bucket << ":" << o.key
+ << oc.bucket << ":" << o.key
<< " " << cpp_strerror(r) << " "
<< oc.wq->thr_name() << dendl;
return r;
if (perfcounter) {
perfcounter->inc(l_rgw_lc_expire_current, 1);
}
- ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+ ldout(oc.cct, 2) << "DELETED:" << oc.bucket << ":" << o.key
<< " " << oc.wq->thr_name() << dendl;
}
return 0;
<< oc.wq->thr_name() << dendl;
return is_expired &&
- pass_object_lock_check(oc.store->getRados(),
- oc.bucket_info, oc.obj, oc.rctx);
+ pass_object_lock_check(oc.store, oc.obj.get(), oc.rctx);
}
int process(lc_op_ctx& oc) {
int r = remove_expired_obj(oc, true);
if (r < 0) {
ldout(oc.cct, 0) << "ERROR: remove_expired_obj (non-current expiration) "
- << oc.bucket_info.bucket << ":" << o.key
+ << oc.bucket << ":" << o.key
<< " " << cpp_strerror(r)
<< " " << oc.wq->thr_name() << dendl;
return r;
if (perfcounter) {
perfcounter->inc(l_rgw_lc_expire_noncurrent, 1);
}
- ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+ ldout(oc.cct, 2) << "DELETED:" << oc.bucket << ":" << o.key
<< " (non-current expiration) "
<< oc.wq->thr_name() << dendl;
return 0;
int r = remove_expired_obj(oc, true);
if (r < 0) {
ldout(oc.cct, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
- << oc.bucket_info.bucket << ":" << o.key
+ << oc.bucket << ":" << o.key
<< " " << cpp_strerror(r)
<< " " << oc.wq->thr_name()
<< dendl;
if (perfcounter) {
perfcounter->inc(l_rgw_lc_expire_dm, 1);
}
- ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+ ldout(oc.cct, 2) << "DELETED:" << oc.bucket << ":" << o.key
<< " (delete marker expiration) "
<< oc.wq->thr_name() << dendl;
return 0;
auto& o = oc.o;
rgw_placement_rule target_placement;
- target_placement.inherit_from(oc.bucket_info.placement_rule);
+ target_placement.inherit_from(oc.bucket->get_placement_rule());
target_placement.storage_class = transition.storage_class;
if (!oc.store->svc()->zone->get_zone_params().
valid_placement(target_placement)) {
ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: "
<< target_placement
- << " bucket="<< oc.bucket_info.bucket
+ << " bucket="<< oc.bucket
<< " rule_id=" << oc.op.id
<< " " << oc.wq->thr_name() << dendl;
return -EINVAL;
}
- rgw::sal::RGWRadosBucket bucket(oc.store, oc.bucket_info);
- rgw::sal::RGWRadosObject obj(oc.store, oc.obj.key, &bucket);
- int r = oc.store->getRados()->transition_obj(
- oc.rctx, &bucket, obj, target_placement, o.meta.mtime,
- o.versioned_epoch, oc.dpp, null_yield);
+ int r = oc.obj->transition(oc.rctx, oc.bucket, target_placement, o.meta.mtime,
+ o.versioned_epoch, oc.dpp, null_yield);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj "
- << oc.bucket_info.bucket << ":" << o.key
+ << oc.bucket << ":" << o.key
<< " -> " << transition.storage_class
<< " " << cpp_strerror(r)
<< " " << oc.wq->thr_name() << dendl;
return r;
}
- ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket
+ ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket
<< ":" << o.key << " -> "
<< transition.storage_class
<< " " << oc.wq->thr_name() << dendl;
int r = (*selected)->process(ctx);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj "
- << env.bucket_info.bucket << ":" << o.key
+ << env.bucket << ":" << o.key
<< " " << cpp_strerror(r)
<< " " << wq->thr_name() << dendl;
return r;
}
- ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":"
+ ldpp_dout(dpp, 20) << "processed:" << env.bucket << ":"
<< o.key << " " << wq->thr_name() << dendl;
}
time_t stop_at, bool once)
{
RGWLifecycleConfiguration config(cct);
- RGWBucketInfo bucket_info;
- map<string, bufferlist> bucket_attrs;
+ std::unique_ptr<rgw::sal::RGWBucket> bucket;
string no_ns, list_versions;
vector<rgw_bucket_dir_entry> objs;
vector<std::string> result;
string bucket_tenant = result[0];
string bucket_name = result[1];
string bucket_marker = result[2];
- int ret = store->getRados()->get_bucket_info(
- store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield,
- &bucket_attrs);
+ int ret = store->get_bucket(nullptr, bucket_tenant, bucket_name, &bucket, null_yield);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name
+ << " failed" << dendl;
+ return ret;
+ }
+
+ ret = bucket->get_bucket_info(null_yield);
if (ret < 0) {
ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name
<< " failed" << dendl;
}
);
- if (bucket_info.bucket.marker != bucket_marker) {
+ if (bucket->get_marker() != bucket_marker) {
ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket="
<< bucket_tenant << ":" << bucket_name
- << " cur_marker=" << bucket_info.bucket.marker
+ << " cur_marker=" << bucket->get_marker()
<< " orig_marker=" << bucket_marker << dendl;
return -ENOENT;
}
- RGWRados::Bucket target(store->getRados(), bucket_info);
-
- map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
- if (aiter == bucket_attrs.end())
+ map<string, bufferlist>::iterator aiter = bucket->get_attrs().find(RGW_ATTR_LC);
+ if (aiter == bucket->get_attrs().end())
return 0;
bufferlist::const_iterator iter{&aiter->second};
pre_marker = next_marker;
}
- LCObjsLister ol(store, bucket_info);
+ LCObjsLister ol(store, bucket.get());
ol.set_prefix(prefix_iter->first);
ret = ol.init();
return ret;
}
- op_env oenv(op, store, worker, bucket_info, ol);
+ op_env oenv(op, store, worker, bucket.get(), ol);
LCOpRule orule(oenv);
orule.build(); // why can't ctor do it?
rgw_bucket_dir_entry* o{nullptr};
worker->workpool->drain();
}
- ret = handle_multipart_expiration(&target, prefix_map, worker, stop_at, once);
+ ret = handle_multipart_expiration(bucket.get(), prefix_map, worker, stop_at, once);
return ret;
}
int RGWLC::bucket_lc_post(int index, int max_lock_sec,
- cls_rgw_lc_entry& entry, int& result,
+ rgw::sal::Lifecycle::LCEntry& entry, int& result,
LCWorker* worker)
{
utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
- rados::cls::lock::Lock l(lc_index_lock_name);
- l.set_cookie(cookie);
- l.set_duration(lock_duration);
+ rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
+ obj_names[index],
+ cookie);
dout(5) << "RGWLC::bucket_lc_post(): POST " << entry
<< " index: " << index << " worker ix: " << worker->ix
<< dendl;
do {
- int ret = l.lock_exclusive(
- &store->getRados()->lc_pool_ctx, obj_names[index]);
+ int ret = lock->try_lock(lock_duration, null_yield);
if (ret == -EBUSY || ret == -EEXIST) {
/* already locked by another lc processor */
ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
<< dendl;
if (result == -ENOENT) {
- ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx,
- obj_names[index], entry);
+ ret = sal_lc->rm_entry(obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
<< obj_names[index] << dendl;
entry.status = lc_complete;
}
- ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
- obj_names[index], entry);
+ ret = sal_lc->set_entry(obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
<< obj_names[index] << dendl;
}
clean:
- l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
+ lock->unlock();
+ delete lock;
ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock "
<< obj_names[index] << dendl;
return 0;
}
int RGWLC::list_lc_progress(string& marker, uint32_t max_entries,
- vector<cls_rgw_lc_entry>& progress_map,
+ vector<rgw::sal::Lifecycle::LCEntry>& progress_map,
int& index)
{
progress_map.clear();
for(; index < max_objs; index++, marker="") {
- vector<cls_rgw_lc_entry> entries;
- int ret =
- cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker,
- max_entries, entries);
+ vector<rgw::sal::Lifecycle::LCEntry> entries;
+ int ret = sal_lc->list_entries(obj_names[index], marker, max_entries, entries);
if (ret < 0) {
if (ret == -ENOENT) {
ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object="
<< "index: " << index << " worker ix: " << worker->ix
<< dendl;
- rados::cls::lock::Lock l(lc_index_lock_name);
+ rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
+ obj_names[index],
+ std::string());
do {
utime_t now = ceph_clock_now();
//string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
- cls_rgw_lc_entry entry;
+ rgw::sal::Lifecycle::LCEntry entry;
if (max_lock_secs <= 0)
return -EAGAIN;
utime_t time(max_lock_secs, 0);
- l.set_duration(time);
- int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx,
- obj_names[index]);
+ int ret = lock->try_lock(time, null_yield);
if (ret == -EBUSY || ret == -EEXIST) {
/* already locked by another lc processor */
ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
if (ret < 0)
return 0;
- cls_rgw_lc_obj_head head;
- ret = cls_rgw_lc_get_head(store->getRados()->lc_pool_ctx, obj_names[index],
- head);
+ rgw::sal::Lifecycle::LCHead head;
+ ret = sal_lc->get_head(obj_names[index], head);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
<< obj_names[index] << ", ret=" << ret << dendl;
}
if (! (cct->_conf->rgw_lc_lock_max_time == 9969)) {
- ret = cls_rgw_lc_get_entry(store->getRados()->lc_pool_ctx,
- obj_names[index], head.marker, entry);
+ ret = sal_lc->get_entry(obj_names[index], head.marker, entry);
if (ret >= 0) {
if (entry.status == lc_processing) {
if (expired_session(entry.start_time)) {
}
}
- ret = cls_rgw_lc_get_next_entry(store->getRados()->lc_pool_ctx,
- obj_names[index], head.marker, entry);
+ ret = sal_lc->get_next_entry(obj_names[index], head.marker, entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
<< obj_names[index] << dendl;
<< dendl;
entry.status = lc_processing;
- ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
- obj_names[index], entry);
+ ret = sal_lc->set_entry(obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
<< obj_names[index] << entry.bucket << entry.status << dendl;
}
head.marker = entry.bucket;
- ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx,
- obj_names[index], head);
+ ret = sal_lc->put_head(obj_names[index], head);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
<< obj_names[index]
<< " index: " << index << " worker ix: " << worker->ix
<< dendl;
- l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
+ lock->unlock();
+ delete lock;
ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
bucket_lc_post(index, max_lock_secs, entry, ret, worker);
} while(1 && !once);
return 0;
exit:
- l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
+ lock->unlock();
+ delete lock;
return 0;
}
template<typename F>
static int guard_lc_modify(rgw::sal::RGWRadosStore* store,
+ rgw::sal::Lifecycle* sal_lc,
const rgw_bucket& bucket, const string& cookie,
const F& f) {
CephContext *cct = store->ctx();
get_lc_oid(cct, shard_id, &oid);
/* XXX it makes sense to take shard_id for a bucket_id? */
- cls_rgw_lc_entry entry;
+ rgw::sal::Lifecycle::LCEntry entry;
entry.bucket = shard_id;
entry.status = lc_uninitial;
int max_lock_secs = cct->_conf->rgw_lc_lock_max_time;
- rados::cls::lock::Lock l(lc_index_lock_name);
+ rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
+ oid,
+ cookie);
utime_t time(max_lock_secs, 0);
- l.set_duration(time);
- l.set_cookie(cookie);
- librados::IoCtx *ctx = store->getRados()->get_lc_pool_ctx();
int ret;
do {
- ret = l.lock_exclusive(ctx, oid);
+ ret = lock->try_lock(time, null_yield);
if (ret == -EBUSY || ret == -EEXIST) {
ldout(cct, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
<< oid << ", sleep 5, try again" << dendl;
<< oid << ", ret=" << ret << dendl;
break;
}
- ret = f(ctx, oid, entry);
+ ret = f(sal_lc, oid, entry);
if (ret < 0) {
ldout(cct, 0) << "RGWLC::RGWPutLC() failed to set entry on "
<< oid << ", ret=" << ret << dendl;
}
break;
} while(true);
- l.unlock(ctx, oid);
+ lock->unlock();
+ delete lock;
return ret;
}
rgw_bucket& bucket = bucket_info.bucket;
- ret = guard_lc_modify(store, bucket, cookie,
- [&](librados::IoCtx *ctx, const string& oid,
- const cls_rgw_lc_entry& entry) {
- return cls_rgw_lc_set_entry(*ctx, oid, entry);
+ ret = guard_lc_modify(store, sal_lc.get(), bucket, cookie,
+ [&](rgw::sal::Lifecycle* sal_lc, const string& oid,
+ const rgw::sal::Lifecycle::LCEntry& entry) {
+ return sal_lc->set_entry(oid, entry);
});
return ret;
}
- ret = guard_lc_modify(store, bucket, cookie,
- [&](librados::IoCtx *ctx, const string& oid,
- const cls_rgw_lc_entry& entry) {
- return cls_rgw_lc_rm_entry(*ctx, oid, entry);
+ ret = guard_lc_modify(store, sal_lc.get(), bucket, cookie,
+ [&](rgw::sal::Lifecycle* sal_lc, const string& oid,
+ const rgw::sal::Lifecycle::LCEntry& entry) {
+ return sal_lc->rm_entry(oid, entry);
});
return ret;
namespace rgw::lc {
int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store,
+ rgw::sal::Lifecycle* sal_lc,
const RGWBucketInfo& bucket_info,
const map<std::string,bufferlist>& battrs)
{
std::string lc_oid;
get_lc_oid(store->ctx(), shard_name, &lc_oid);
- cls_rgw_lc_entry entry;
+ rgw::sal::Lifecycle::LCEntry entry;
// There are multiple cases we need to encounter here
// 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
// 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update
// 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker)
// We are not dropping the old marker here as that would be caught by the next LC process update
- auto lc_pool_ctx = store->getRados()->get_lc_pool_ctx();
- int ret = cls_rgw_lc_get_entry(*lc_pool_ctx,
- lc_oid, shard_name, entry);
+ int ret = sal_lc->get_entry(lc_oid, shard_name, entry);
if (ret == 0) {
ldout(store->ctx(), 5) << "Entry already exists, nothing to do" << dendl;
return ret; // entry is already existing correctly set to marker
}
- ldout(store->ctx(), 5) << "cls_rgw_lc_get_entry errored ret code=" << ret << dendl;
+ ldout(store->ctx(), 5) << "lc_get_entry errored ret code=" << ret << dendl;
if (ret == -ENOENT) {
ldout(store->ctx(), 1) << "No entry for bucket=" << bucket_info.bucket.name
<< " creating " << dendl;
std::string cookie = cookie_buf;
ret = guard_lc_modify(
- store, bucket_info.bucket, cookie,
- [&lc_pool_ctx, &lc_oid](librados::IoCtx* ctx,
+ store, sal_lc, bucket_info.bucket, cookie,
+ [&sal_lc, &lc_oid](rgw::sal::Lifecycle* slc,
const string& oid,
- const cls_rgw_lc_entry& entry) {
- return cls_rgw_lc_set_entry(*lc_pool_ctx, lc_oid, entry);
+ const rgw::sal::Lifecycle::LCEntry& entry) {
+ return slc->set_entry(lc_oid, entry);
});
}
#include "rgw_multi.h"
#include "rgw_acl_s3.h"
-/* Stuff for RGWRadosStore. Move to separate file when store split out */
#include "rgw_zone.h"
#include "rgw_rest_conn.h"
#include "services/svc_sys_obj.h"
#include "services/svc_zone.h"
#include "services/svc_tier_rados.h"
+#include "cls/rgw/cls_rgw_client.h"
#define dout_subsys ceph_subsys_rgw
set_key(tobj.key);
}
+void RGWRadosObject::get_raw_obj(rgw_raw_obj* raw_obj)
+{
+ store->getRados()->obj_to_raw((bucket->get_info()).placement_rule, get_obj(), raw_obj);
+}
+
int RGWRadosObject::omap_get_vals_by_keys(const std::string& oid,
const std::set<std::string>& keys,
RGWAttrs *vals)
return cur_ioctx.omap_get_vals_by_keys(oid, keys, vals);
}
+MPSerializer* RGWRadosObject::get_serializer(const std::string& lock_name)
+{
+ return new MPRadosSerializer(store, this, lock_name);
+}
+
+int RGWRadosObject::transition(RGWObjectCtx& rctx,
+ RGWBucket* bucket,
+ const rgw_placement_rule& placement_rule,
+ const real_time& mtime,
+ uint64_t olh_epoch,
+ const DoutPrefixProvider *dpp,
+ optional_yield y)
+{
+ return store->getRados()->transition_obj(rctx, bucket, *this, placement_rule, mtime, olh_epoch, dpp, y);
+}
+
std::unique_ptr<RGWObject::ReadOp> RGWRadosObject::get_read_op(RGWObjectCtx *ctx)
{
return std::unique_ptr<RGWObject::ReadOp>(new RGWRadosObject::RadosReadOp(this, ctx));
return ret;
}
+int RGWRadosObject::swift_versioning_restore(RGWObjectCtx* obj_ctx,
+ bool& restored,
+ const DoutPrefixProvider *dpp)
+{
+ return store->getRados()->swift_versioning_restore(*obj_ctx,
+ bucket->get_owner()->get_id(),
+ bucket,
+ this,
+ restored,
+ dpp);
+}
+
+int RGWRadosObject::swift_versioning_copy(RGWObjectCtx* obj_ctx,
+ const DoutPrefixProvider *dpp,
+ optional_yield y)
+{
+ return store->getRados()->swift_versioning_copy(*obj_ctx,
+ bucket->get_info().owner,
+ bucket,
+ this,
+ dpp,
+ y);
+}
+
int RGWRadosStore::get_bucket(RGWUser* u, const rgw_bucket& b, std::unique_ptr<RGWBucket>* bucket, optional_yield y)
{
int ret;
return rados->svc.zone->get_zonegroup(id, zonegroup);
}
+int RGWRadosStore::cluster_stat(RGWClusterStat& stats)
+{
+ rados_cluster_stat_t rados_stats;
+ int ret;
+
+ ret = rados->get_rados_handle()->cluster_stat(rados_stats);
+ if (ret < 0)
+ return ret;
+
+ stats.kb = rados_stats.kb;
+ stats.kb_used = rados_stats.kb_used;
+ stats.kb_avail = rados_stats.kb_avail;
+ stats.num_objects = rados_stats.num_objects;
+
+ return ret;
+}
+
int RGWRadosStore::create_bucket(RGWUser& u, const rgw_bucket& b,
const string& zonegroup_id,
rgw_placement_rule& placement_rule,
return ret;
}
+std::unique_ptr<Lifecycle> RGWRadosStore::get_lifecycle(void)
+{
+ return std::unique_ptr<Lifecycle>(new RadosLifecycle(this));
+}
+
+
+MPRadosSerializer::MPRadosSerializer(RGWRadosStore* store, RGWRadosObject* obj, const std::string& lock_name) :
+ lock(lock_name)
+{
+ rgw_pool meta_pool;
+ rgw_raw_obj raw_obj;
+
+ obj->get_raw_obj(&raw_obj);
+ oid = raw_obj.oid;
+ store->getRados()->get_obj_data_pool(obj->get_bucket()->get_placement_rule(),
+ obj->get_obj(), &meta_pool);
+ store->getRados()->open_pool_ctx(meta_pool, ioctx, true);
+}
+
+int MPRadosSerializer::try_lock(utime_t dur, optional_yield y)
+{
+ op.assert_exists();
+ lock.set_duration(dur);
+ lock.lock_exclusive(&op);
+ int ret = rgw_rados_operate(ioctx, oid, &op, y);
+ if (! ret) {
+ locked = true;
+ }
+ return ret;
+}
+
+LCRadosSerializer::LCRadosSerializer(RGWRadosStore* store, const std::string& _oid, const std::string& lock_name, const std::string& cookie) :
+ lock(lock_name), oid(_oid)
+{
+ ioctx = &store->getRados()->lc_pool_ctx;
+ lock.set_cookie(cookie);
+}
+
+int LCRadosSerializer::try_lock(utime_t dur, optional_yield y)
+{
+ lock.set_duration(dur);
+ return lock.lock_exclusive(ioctx, oid);
+}
+
+int RadosLifecycle::get_entry(const string& oid, const std::string& marker,
+ LCEntry& entry)
+{
+ cls_rgw_lc_entry cls_entry;
+ int ret = cls_rgw_lc_get_entry(*store->getRados()->get_lc_pool_ctx(), oid, marker, cls_entry);
+
+ entry.bucket = cls_entry.bucket;
+ entry.start_time = cls_entry.start_time;
+ entry.status = cls_entry.status;
+
+ return ret;
+}
+
+int RadosLifecycle::get_next_entry(const string& oid, std::string& marker,
+ LCEntry& entry)
+{
+ cls_rgw_lc_entry cls_entry;
+ int ret = cls_rgw_lc_get_next_entry(*store->getRados()->get_lc_pool_ctx(), oid, marker,
+ cls_entry);
+
+ entry.bucket = cls_entry.bucket;
+ entry.start_time = cls_entry.start_time;
+ entry.status = cls_entry.status;
+
+ return ret;
+}
+
+int RadosLifecycle::set_entry(const string& oid, const LCEntry& entry)
+{
+ cls_rgw_lc_entry cls_entry;
+
+ cls_entry.bucket = entry.bucket;
+ cls_entry.start_time = entry.start_time;
+ cls_entry.status = entry.status;
+
+ return cls_rgw_lc_set_entry(*store->getRados()->get_lc_pool_ctx(), oid, cls_entry);
+}
+
+int RadosLifecycle::list_entries(const string& oid, const string& marker,
+ uint32_t max_entries, vector<LCEntry>& entries)
+{
+ vector<cls_rgw_lc_entry> cls_entries;
+ int ret = cls_rgw_lc_list(*store->getRados()->get_lc_pool_ctx(), oid, marker, max_entries, cls_entries);
+
+ if (ret < 0)
+ return ret;
+
+ for (auto& entry : cls_entries) {
+ entries.push_back(LCEntry(entry.bucket, entry.start_time, entry.status));
+ }
+
+ return ret;
+}
+
+int RadosLifecycle::rm_entry(const string& oid, const LCEntry& entry)
+{
+ cls_rgw_lc_entry cls_entry;
+
+ cls_entry.bucket = entry.bucket;
+ cls_entry.start_time = entry.start_time;
+ cls_entry.status = entry.status;
+
+ return cls_rgw_lc_rm_entry(*store->getRados()->get_lc_pool_ctx(), oid, cls_entry);
+}
+
+int RadosLifecycle::get_head(const string& oid, LCHead& head)
+{
+ cls_rgw_lc_obj_head cls_head;
+ int ret = cls_rgw_lc_get_head(*store->getRados()->get_lc_pool_ctx(), oid, cls_head);
+
+ head.marker = cls_head.marker;
+ head.start_date = cls_head.start_date;
+
+ return ret;
+}
+
+int RadosLifecycle::put_head(const string& oid, const LCHead& head)
+{
+ cls_rgw_lc_obj_head cls_head;
+
+ cls_head.marker = head.marker;
+ cls_head.start_date = head.start_date;
+
+ return cls_rgw_lc_put_head(*store->getRados()->get_lc_pool_ctx(), oid, cls_head);
+}
+
+LCSerializer* RadosLifecycle::get_serializer(const std::string& lock_name, const std::string& oid, const std::string& cookie)
+{
+ return new LCRadosSerializer(store, oid, lock_name, cookie);
+}
+
} // namespace rgw::sal
rgw::sal::RGWRadosStore *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_cache)