#include <string.h>
#include <iostream>
#include <map>
+#include <algorithm>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string.hpp>
#include "include/random.h"
#include "cls/rgw/cls_rgw_client.h"
#include "cls/lock/cls_lock_client.h"
+#include "rgw_perf_counters.h"
#include "rgw_common.h"
#include "rgw_bucket.h"
#include "rgw_lc.h"
if (id.length() > MAX_ID_LEN) {
return false;
}
- else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty() && !dm_expiration &&
+ else if(expiration.empty() && noncur_expiration.empty() &&
+ mp_expiration.empty() && !dm_expiration &&
transitions.empty() && noncur_transitions.empty()) {
return false;
}
- else if (!expiration.valid() || !noncur_expiration.valid() || !mp_expiration.valid()) {
+ else if (!expiration.valid() || !noncur_expiration.valid() ||
+ !mp_expiration.valid()) {
return false;
}
if (!transitions.empty()) {
return true;
}
-void LCRule::init_simple_days_rule(std::string_view _id, std::string_view _prefix, int num_days)
+void LCRule::init_simple_days_rule(std::string_view _id,
+ std::string_view _prefix, int num_days)
{
id = _id;
prefix = _prefix;
} else {
action.date = ceph::from_iso_8601(elem.second.get_date());
}
- action.storage_class = rgw_placement_rule::get_canonical_storage_class(elem.first);
+ action.storage_class
+ = rgw_placement_rule::get_canonical_storage_class(elem.first);
op.transitions.emplace(elem.first, std::move(action));
}
for (const auto &elem : rule.get_noncur_transitions()) {
if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same
return -EINVAL;
}
- if (rule.get_filter().has_tags() && (rule.get_dm_expiration() || !rule.get_mp_expiration().empty())) {
+ if (rule.get_filter().has_tags() && (rule.get_dm_expiration() ||
+ !rule.get_mp_expiration().empty())) {
return -ERR_INVALID_REQUEST;
}
rule_map.insert(pair<string, LCRule>(id, rule));
return 0;
}
-bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op& second) {
+bool RGWLifecycleConfiguration::has_same_action(const lc_op& first,
+ const lc_op& second) {
if ((first.expiration > 0 || first.expiration_date != boost::none) &&
(second.expiration > 0 || second.expiration_date != boost::none)) {
return true;
return true;
}
}
- } else if (!first.noncur_transitions.empty() && !second.noncur_transitions.empty()) {
+ } else if (!first.noncur_transitions.empty() &&
+ !second.noncur_transitions.empty()) {
for (auto &elem : first.noncur_transitions) {
- if (second.noncur_transitions.find(elem.first) != second.noncur_transitions.end()) {
+ if (second.noncur_transitions.find(elem.first) !=
+ second.noncur_transitions.end()) {
return true;
}
}
return true;
}
+RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct,
+ RGWLC *_lc)
+ : dpp(_dpp), cct(_cct), lc(_lc) {}
+
void *RGWLC::LCWorker::entry() {
do {
utime_t start = ceph_clock_now();
if (should_work(start)) {
ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
- int r = lc->process();
+ int r = lc->process(this);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
}
return false;
}
-int RGWLC::bucket_lc_prepare(int index)
+int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
{
map<string, int > entries;
#define MAX_LC_LIST_ENTRIES 100
do {
- int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
+ int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index],
+ marker, MAX_LC_LIST_ENTRIES, entries);
if (ret < 0)
return ret;
map<string, int>::iterator iter;
for (iter = entries.begin(); iter != entries.end(); ++iter) {
pair<string, int > entry(iter->first, lc_uninitial);
- ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, obj_names[index], entry);
+ ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
+ obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::bucket_lc_prepare() failed to set entry on "
<< obj_names[index] << dendl;
return 0;
}
-static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days, ceph::real_time *expire_time = nullptr)
+static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days,
+ ceph::real_time *expire_time = nullptr)
{
double timediff, cmp;
utime_t base_time;
return (timediff >= cmp);
}
-static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info, rgw_obj& obj, RGWObjectCtx& ctx)
+static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info,
+ rgw_obj& obj, RGWObjectCtx& ctx)
{
if (!bucket_info.obj_lock_enabled()) {
return true;
ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention" << dendl;
return false;
}
- if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) > ceph_clock_now()) {
+ if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) >
+ ceph_clock_now()) {
return false;
}
}
auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
list_op.params.list_versions = false;
/* lifecycle processing does not depend on total order, so can
- * take advantage of unorderd listing optimizations--such as
+ * take advantage of unordered listing optimizations--such as
* operating on one shard at a time */
list_op.params.allow_unordered = true;
list_op.params.ns = RGW_OBJ_NS_MULTIPART;
list_op.params.filter = &mp_filter;
- for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
+ for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
+ ++prefix_iter) {
if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
continue;
}
}
for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
- if (obj_has_expired(cct, obj_iter->meta.mtime, prefix_iter->second.mp_expiration)) {
+ if (obj_has_expired(cct, obj_iter->meta.mtime,
+ prefix_iter->second.mp_expiration)) {
rgw_obj_key key(obj_iter->key);
if (!mp_obj.from_meta(key.name)) {
continue;
}
RGWObjectCtx rctx(store);
+ /* XXXX where is this defined? */
ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
ldpp_dout(this, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret << ", meta:" << obj_iter->key << dendl;
return 0;
}
-static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info, rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
+static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info,
+ rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
{
RGWRados::Object op_target(store, bucket_info, ctx, obj);
RGWRados::Object::Read read_op(&op_target);
}
int fetch() {
- int ret = list_op.list_objects(1000, &objs, NULL, &is_truncated, null_yield);
+ int ret = list_op.list_objects(
+ 1000, &objs, NULL, &is_truncated, null_yield);
if (ret < 0) {
return ret;
}
struct op_env {
+
+ using LCWorker = RGWLC::LCWorker;
+
lc_op& op;
rgw::sal::RGWRadosStore *store;
- RGWLC *lc;
+ LCWorker* worker;
RGWBucketInfo& bucket_info;
LCObjsLister& ol;
- op_env(lc_op& _op, rgw::sal::RGWRadosStore *_store, RGWLC *_lc, RGWBucketInfo& _bucket_info,
- LCObjsLister& _ol) : op(_op), store(_store), lc(_lc), bucket_info(_bucket_info), ol(_ol) {}
-};
+ op_env(lc_op& _op, rgw::sal::RGWRadosStore *_store, LCWorker* _worker,
+ RGWBucketInfo& _bucket_info, LCObjsLister& _ol)
+ : op(_op), store(_store), worker(_worker), bucket_info(_bucket_info),
+ ol(_ol) {}
+}; /* op_env */
class LCRuleOp;
RGWObjectCtx rctx;
const DoutPrefixProvider *dpp;
- lc_op_ctx(op_env& _env, rgw_bucket_dir_entry& _o, const DoutPrefixProvider *_dpp) : cct(_env.store->ctx()), env(_env), o(_o),
- store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
- obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(_dpp) {}
-};
+ lc_op_ctx(op_env& _env, rgw_bucket_dir_entry& _o,
+ const DoutPrefixProvider *_dpp)
+ : cct(_env.store->ctx()), env(_env), o(_o),
+ store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
+ obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(_dpp) {}
+}; /* lc_op_ctx */
static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
{
del_op.params.obj_owner = obj_owner;
del_op.params.unmod_since = meta.mtime;
+ if (perfcounter) {
+ perfcounter->inc(l_rgw_lc_remove_expired, 1);
+ }
+
return del_op.delete_obj(null_yield);
}
*skip = true;
bufferlist tags_bl;
- int ret = read_obj_tags(oc.store->getRados(), oc.bucket_info, oc.obj, oc.rctx, tags_bl);
+ int ret = read_obj_tags(oc.store->getRados(), oc.bucket_info, oc.obj,
+ oc.rctx, tags_bl);
if (ret < 0) {
if (ret != -ENODATA) {
ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" << ret << dendl;
ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no expiration set in rule, skipping" << dendl;
return false;
}
- is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date);
+ is_expired = ceph_clock_now() >=
+ ceph::real_clock::to_time_t(*op.expiration_date);
*exp_time = *op.expiration_date;
} else {
is_expired = obj_has_expired(oc.cct, mtime, op.expiration, exp_time);
bool is_expired = obj_has_expired(oc.cct, mtime, expiration, exp_time);
ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
- return is_expired && pass_object_lock_check(oc.store->getRados(), oc.bucket_info, oc.obj, oc.rctx);
+ return is_expired &&
+ pass_object_lock_check(oc.store->getRados(),
+ oc.bucket_info, oc.obj, oc.rctx);
}
int process(lc_op_ctx& oc) {
virtual bool check_current_state(bool is_current) = 0;
virtual ceph::real_time get_effective_mtime(lc_op_ctx& oc) = 0;
public:
- LCOpAction_Transition(const transition_action& _transition) : transition(_transition) {}
+ LCOpAction_Transition(const transition_action& _transition)
+ : transition(_transition) {}
bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
auto& o = oc.o;
ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no transition day/date set in rule, skipping" << dendl;
return false;
}
- is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*transition.date);
+ is_expired = ceph_clock_now() >=
+ ceph::real_clock::to_time_t(*transition.date);
*exp_time = *transition.date;
} else {
is_expired = obj_has_expired(oc.cct, mtime, transition.days, exp_time);
ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
- need_to_process = (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) != transition.storage_class);
+ need_to_process =
+ (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) !=
+ transition.storage_class);
return is_expired;
}
target_placement.inherit_from(oc.bucket_info.placement_rule);
target_placement.storage_class = transition.storage_class;
- if (!oc.store->svc()->zone->get_zone_params().valid_placement(target_placement)) {
+ if (!oc.store->svc()->zone->get_zone_params().
+ valid_placement(target_placement)) {
ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " << target_placement
<< " bucket="<< oc.bucket_info.bucket
<< " rule_id=" << oc.op.id << dendl;
return -EINVAL;
}
- int r = oc.store->getRados()->transition_obj(oc.rctx, oc.bucket_info, oc.obj,
- target_placement, o.meta.mtime, o.versioned_epoch, oc.dpp, null_yield);
+ int r = oc.store->getRados()->transition_obj(
+ oc.rctx, oc.bucket_info, oc.obj, target_placement, o.meta.mtime,
+ o.versioned_epoch, oc.dpp, null_yield);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj "
<< oc.bucket_info.bucket << ":" << o.key
return oc.o.meta.mtime;
}
public:
- LCOpAction_CurrentTransition(const transition_action& _transition) : LCOpAction_Transition(_transition) {}
+ LCOpAction_CurrentTransition(const transition_action& _transition)
+ : LCOpAction_Transition(_transition) {}
};
class LCOpAction_NonCurrentTransition : public LCOpAction_Transition {
return oc.ol.get_prev_obj().meta.mtime;
}
public:
- LCOpAction_NonCurrentTransition(const transition_action& _transition) : LCOpAction_Transition(_transition) {}
+ LCOpAction_NonCurrentTransition(const transition_action& _transition)
+ : LCOpAction_Transition(_transition) {}
};
void LCOpRule::build()
}
-int RGWLC::bucket_lc_process(string& shard_id)
+int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
{
RGWLifecycleConfiguration config(cct);
RGWBucketInfo bucket_info;
string bucket_tenant = result[0];
string bucket_name = result[1];
string bucket_marker = result[2];
- int ret = store->getRados()->get_bucket_info(store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield, &bucket_attrs);
+ int ret = store->getRados()->get_bucket_info(
+ store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield,
+ &bucket_attrs);
if (ret < 0) {
ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name << " failed" << dendl;
return ret;
rgw_obj_key pre_marker;
rgw_obj_key next_marker;
- for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
+ for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
+ ++prefix_iter) {
auto& op = prefix_iter->second;
if (!is_valid_op(op)) {
continue;
}
ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first << dendl;
if (prefix_iter != prefix_map.begin() &&
- (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
+ (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(),
+ prev(prefix_iter)->first) == 0)) {
next_marker = pre_marker;
} else {
pre_marker = next_marker;
return ret;
}
- op_env oenv(op, store, this, bucket_info, ol);
+ op_env oenv(op, store, worker, bucket_info, ol);
LCOpRule orule(oenv);
return ret;
}
-int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result)
+int RGWLC::bucket_lc_post(int index, int max_lock_sec,
+ pair<string, int>& entry, int& result,
+ LCWorker* worker)
{
utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
l.set_duration(lock_duration);
do {
- int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx, obj_names[index]);
+ int ret = l.lock_exclusive(
+ &store->getRados()->lc_pool_ctx, obj_names[index]);
if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
<< obj_names[index] << ", sleep 5, try again" << dendl;
return 0;
ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] << dendl;
if (result == -ENOENT) {
- ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx, obj_names[index], entry);
+ ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx,
+ obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
<< obj_names[index] << dendl;
entry.second = lc_complete;
}
- ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, obj_names[index], entry);
+ ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
+ obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
<< obj_names[index] << dendl;
} while (true);
}
-int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map)
+int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries,
+ map<string, int>* progress_map)
{
int index = 0;
progress_map->clear();
for(; index <max_objs; index++) {
map<string, int > entries;
- int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker, max_entries, entries);
+ int ret =
+ cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker,
+ max_entries, entries);
if (ret < 0) {
if (ret == -ENOENT) {
ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object="
return 0;
}
-int RGWLC::process()
+static inline vector<int> random_sequence(uint32_t n)
{
- int max_secs = cct->_conf->rgw_lc_lock_max_time;
+ vector<int> v(n-1, 0);
+ std::generate(v.begin(), v.end(),
+ [ix = 0]() mutable {
+ return ix++;
+ });
+ std::random_shuffle(v.begin(), v.end());
+ return v;
+}
- const int start = ceph::util::generate_random_number(0, max_objs - 1);
+int RGWLC::process(LCWorker* worker)
+{
+ int max_secs = cct->_conf->rgw_lc_lock_max_time;
- for (int i = 0; i < max_objs; i++) {
- int index = (i + start) % max_objs;
- int ret = process(index, max_secs);
+ /* generate an index-shard sequence unrelated to any other
+ * that might be running in parallel */
+ vector<int> shard_seq = random_sequence(max_objs);
+ for (auto index : shard_seq) {
+ int ret = process(index, max_secs, worker);
if (ret < 0)
return ret;
}
return 0;
}
-int RGWLC::process(int index, int max_lock_secs)
+int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
{
rados::cls::lock::Lock l(lc_index_lock_name);
do {
utime_t time(max_lock_secs, 0);
l.set_duration(time);
- int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx, obj_names[index]);
+ int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx,
+ obj_names[index]);
if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
<< obj_names[index] << ", sleep 5, try again" << dendl;
return 0;
cls_rgw_lc_obj_head head;
- ret = cls_rgw_lc_get_head(store->getRados()->lc_pool_ctx, obj_names[index], head);
+ ret = cls_rgw_lc_get_head(store->getRados()->lc_pool_ctx, obj_names[index],
+ head);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
<< obj_names[index] << ", ret=" << ret << dendl;
if(!if_already_run_today(head.start_date)) {
head.start_date = now;
head.marker.clear();
- ret = bucket_lc_prepare(index);
+ ret = bucket_lc_prepare(index, worker);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object "
- << obj_names[index] << ", ret=" << ret << dendl;
+ << obj_names[index]
+ << ", ret=" << ret
+ << dendl;
goto exit;
}
}
- ret = cls_rgw_lc_get_next_entry(store->getRados()->lc_pool_ctx, obj_names[index], head.marker, entry);
+ ret = cls_rgw_lc_get_next_entry(store->getRados()->lc_pool_ctx,
+ obj_names[index], head.marker, entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
<< obj_names[index] << dendl;
goto exit;
}
+ /* termination condition (eof) */
if (entry.first.empty())
goto exit;
entry.second = lc_processing;
- ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, obj_names[index], entry);
+ ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
+ obj_names[index], entry);
if (ret < 0) {
- ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry " << obj_names[index]
- << " (" << entry.first << "," << entry.second << ")" << dendl;
+ ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
+ << obj_names[index]
+ << " (" << entry.first << ","
+ << entry.second << ")"
+ << dendl;
goto exit;
}
head.marker = entry.first;
- ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx, obj_names[index], head);
+ ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx, obj_names[index],
+ head);
if (ret < 0) {
- ldpp_dout(this, 0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;
+ ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+ << obj_names[index]
+ << dendl;
goto exit;
}
l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
- ret = bucket_lc_process(entry.first);
- bucket_lc_post(index, max_lock_secs, entry, ret);
- }while(1);
+ ret = bucket_lc_process(entry.first, worker);
+ bucket_lc_post(index, max_lock_secs, entry, ret, worker);
+ } while(1);
exit:
l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
void RGWLC::start_processor()
{
- worker = new LCWorker(this, cct, this);
- worker->create("lifecycle_thr");
+ auto maxw = cct->_conf->rgw_lc_max_worker;
+ workers.reserve(maxw);
+ for (int ix = 0; ix < maxw; ++ix) {
+ auto worker =
+ std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this);
+ worker->create((string{"lifecycle_thr_"} + to_string(ix)).c_str());
+ workers.emplace_back(std::move(worker));
+ }
}
void RGWLC::stop_processor()
{
down_flag = true;
- if (worker) {
+ for (auto& worker : workers) {
worker->stop();
worker->join();
}
- delete worker;
- worker = NULL;
+ workers.clear();
}
-
unsigned RGWLC::get_subsys() const
{
return dout_subsys;
int end_hour;
int end_minute;
string worktime = cct->_conf->rgw_lifecycle_work_time;
- sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
+ sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute,
+ &end_hour, &end_minute);
struct tm bdt;
time_t tt = now.sec();
localtime_r(&tt, &bdt);
int end_hour;
int end_minute;
string worktime = cct->_conf->rgw_lifecycle_work_time;
- sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
+ sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour,
+ &end_minute);
struct tm bdt;
time_t tt = now.sec();
time_t nt;
return secs>0 ? secs : secs+24*60*60;
}
-void RGWLifecycleConfiguration::generate_test_instances(list<RGWLifecycleConfiguration*>& o)
+void RGWLifecycleConfiguration::generate_test_instances(
+ list<RGWLifecycleConfiguration*>& o)
{
o.push_back(new RGWLifecycleConfiguration);
}
void get_lc_oid(CephContext *cct, const string& shard_id, string *oid)
{
- int max_objs = (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME : cct->_conf->rgw_lc_max_objs);
- int index = ceph_str_hash_linux(shard_id.c_str(), shard_id.size()) % HASH_PRIME % max_objs;
+ int max_objs =
+ (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME :
+ cct->_conf->rgw_lc_max_objs);
+ /* XXXX oh noes!!! */
+ int index = ceph_str_hash_linux(shard_id.c_str(),
+ shard_id.size()) % HASH_PRIME % max_objs;
*oid = lc_oid_prefix;
char buf[32];
snprintf(buf, 32, ".%d", index);
return;
}
-
-
static std::string get_lc_shard_name(const rgw_bucket& bucket){
return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker);
}
template<typename F>
-static int guard_lc_modify(rgw::sal::RGWRadosStore* store, const rgw_bucket& bucket, const string& cookie, const F& f) {
+static int guard_lc_modify(
+ rgw::sal::RGWRadosStore* store, const rgw_bucket& bucket,
+ const string& cookie, const F& f) {
CephContext *cct = store->ctx();
string shard_id = get_lc_shard_name(bucket);
attrs[RGW_ATTR_LC] = std::move(lc_bl);
- int ret = store->ctl()->bucket->set_bucket_instance_attrs(bucket_info, attrs,
- &bucket_info.objv_tracker,
- null_yield);
+ int ret =
+ store->ctl()->bucket->set_bucket_instance_attrs(
+ bucket_info, attrs, &bucket_info.objv_tracker, null_yield);
if (ret < 0)
return ret;
rgw_bucket& bucket = bucket_info.bucket;
-
- ret = guard_lc_modify(store, bucket, cookie, [&](librados::IoCtx *ctx, const string& oid,
- const pair<string, int>& entry) {
+ ret = guard_lc_modify(store, bucket, cookie,
+ [&](librados::IoCtx *ctx, const string& oid,
+ const pair<string, int>& entry) {
return cls_rgw_lc_set_entry(*ctx, oid, entry);
});
{
map<string, bufferlist> attrs = bucket_attrs;
attrs.erase(RGW_ATTR_LC);
- int ret = store->ctl()->bucket->set_bucket_instance_attrs(bucket_info, attrs,
- &bucket_info.objv_tracker,
- null_yield);
+ int ret =
+ store->ctl()->bucket->set_bucket_instance_attrs(
+ bucket_info, attrs, &bucket_info.objv_tracker, null_yield);
rgw_bucket& bucket = bucket_info.bucket;
}
- ret = guard_lc_modify(store, bucket, cookie, [&](librados::IoCtx *ctx, const string& oid,
- const pair<string, int>& entry) {
+ ret = guard_lc_modify(store, bucket, cookie,
+ [&](librados::IoCtx *ctx, const string& oid,
+ const pair<string, int>& entry) {
return cls_rgw_lc_rm_entry(*ctx, oid, entry);
});
namespace rgw::lc {
-int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store, const RGWBucketInfo& bucket_info,
+int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store,
+ const RGWBucketInfo& bucket_info,
const map<std::string,bufferlist>& battrs)
{
if (auto aiter = battrs.find(RGW_ATTR_LC);
gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
std::string cookie = cookie_buf;
- ret = guard_lc_modify(store, bucket_info.bucket, cookie,
- [&lc_pool_ctx, &lc_oid](librados::IoCtx *ctx, const string& oid,
- const pair<string, int>& entry) {
- return cls_rgw_lc_set_entry(*lc_pool_ctx,
- lc_oid, entry);
- });
+ ret = guard_lc_modify(
+ store, bucket_info.bucket, cookie,
+ [&lc_pool_ctx, &lc_oid](librados::IoCtx *ctx, const string& oid,
+ const pair<string, int>& entry) {
+ return cls_rgw_lc_set_entry(*lc_pool_ctx, lc_oid, entry);
+ });
}