utime_t start = ceph_clock_now();
if (should_work(start)) {
ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
- int r = lc->process(this);
+ int r = lc->process(this, false /* once */);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r="
<< r << dendl;
return v;
}
-int RGWLC::process(LCWorker* worker)
+int RGWLC::process(LCWorker* worker, bool once = false)
{
int max_secs = cct->_conf->rgw_lc_lock_max_time;
* that might be running in parallel */
vector<int> shard_seq = random_sequence(max_objs);
for (auto index : shard_seq) {
- int ret = process(index, max_secs, worker);
+ int ret = process(index, max_secs, worker, once);
if (ret < 0)
return ret;
}
return time(nullptr) + interval;
}
-int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
+int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
+ bool once = false)
{
dout(5) << "RGWLC::process(): ENTER: "
<< "index: " << index << " worker ix: " << worker->ix
l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
ret = bucket_lc_process(entry.bucket, worker, thread_stop_at());
bucket_lc_post(index, max_lock_secs, entry, ret, worker);
- } while(1);
+ } while(1 && !once);
+
+ return 0;
exit:
- l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
- return 0;
+ l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
+ return 0;
}
void RGWLC::start_processor()
RGWLC::LCWorker::~LCWorker()
{
- workpool->drain();
delete workpool;
} /* ~LCWorker */
void initialize(CephContext *_cct, rgw::sal::RGWRadosStore *_store);
void finalize();
- int process(LCWorker* worker);
- int process(int index, int max_secs, LCWorker* worker);
+ int process(LCWorker* worker, bool once);
+ int process(int index, int max_secs, LCWorker* worker, bool once);
bool if_already_run_today(time_t start_date);
bool expired_session(time_t started);
time_t thread_stop_at();
int RGWRados::process_lc()
{
- return lc->process(nullptr);
+ RGWLC lc;
+ lc.initialize(cct, this->store);
+ RGWLC::LCWorker worker(&lc, cct, &lc, 0);
+ auto ret = lc.process(&worker, true /* once */);
+ lc.stop_processor(); // sets down_flag, but returns immediately
+ return ret;
}
bool RGWRados::process_expire_objects()