return l.lock_exclusive(&ref.pool.ioctx(), ref.obj.oid);
}
-RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(
- RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore *_store,
- RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
- const string& _name,
- const string& _cookie,
- uint32_t _duration_secs) : RGWAsyncRadosRequest(caller, cn),
- store(_store),
+RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
+ RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
+ const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(caller, cn), store(_store),
obj(_obj),
lock_name(_name),
cookie(_cookie),
duration_secs(_duration_secs)
- {}
+{
+}
int RGWAsyncUnlockSystemObj::_send_request(const DoutPrefixProvider *dpp)
{
return r;
}
-RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(
- RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore *_store,
+RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store,
const rgw_raw_obj& _obj,
const string& _lock_name,
const string& _cookie,
- uint32_t _duration_secs) : RGWSimpleCoroutine(_store->ctx()),
+ uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
async_rados(_async_rados),
store(_store),
lock_name(_lock_name),
cookie(_cookie),
- duration_secs(_duration_secs),
+ duration(_duration),
obj(_obj),
req(nullptr)
{
- set_description() << "rados lock dest=" << obj << " lock=" <<
- lock_name << " cookie=" << cookie << " duration_secs=" << duration_secs;
+ set_description() << "rados lock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie << " duration=" << duration;
}
void RGWSimpleRadosLockCR::request_cleanup()
{
set_status() << "sending request";
req = new RGWAsyncLockSystemObj(this, stack->create_completion_notifier(),
- store, NULL, obj, lock_name, cookie,
- duration_secs);
+ store, NULL, obj, lock_name, cookie, duration);
async_rados->queue(req);
return 0;
}
int RGWContinuousLeaseCR::operate(const DoutPrefixProvider *dpp)
{
- bool renew = false;
if (aborted) {
caller->set_sleeping(false);
return set_cr_done();
last_renew_try_time = ceph::coarse_mono_clock::now();
while (!going_down) {
current_time = ceph::coarse_mono_clock::now();
- if (bid_mgr) {
- if(renew || bid_mgr->is_highest_bidder(shard_id)) {
- yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, lock_duration_secs));
- }
- else {
- retcode = -EBUSY;
- }
- } else {
- yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, lock_duration_secs));
- }
+ yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
if (latency) {
latency->add_latency(ceph::coarse_mono_clock::now() - current_time);
}
+ current_time = ceph::coarse_mono_clock::now();
if (current_time - last_renew_try_time > interval_tolerance) {
// renewal should happen between 50%-90% of interval
ldout(store->ctx(), 1) << *this << ": WARNING: did not renew lock " << obj << ":" << lock_name << ": within 90\% of interval. " <<
ldout(store->ctx(), 20) << *this << ": couldn't lock " << obj << ":" << lock_name << ": retcode=" << retcode << dendl;
return set_state(RGWCoroutine_Error, retcode);
}
- renew = true;
ldout(store->ctx(), 20) << *this << ": successfully locked " << obj << ":" << lock_name << dendl;
set_locked(true);
- yield wait(utime_t(lock_duration_secs / 2, 0));
- if (releasing_lock) {
- set_locked(false);
- yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie));
- releasing_lock = false;
- renew = false;
- }
+ yield wait(utime_t(interval / 2, 0));
}
set_locked(false); /* moot at this point anyway */
current_time = ceph::coarse_mono_clock::now();
#include "rgw_sal_rados.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
-#include "cls/lock/cls_lock_client.h"
-#include "sync_fairness.h"
+
#include <atomic>
#include "common/ceph_time.h"
-#include <functional>
#include "services/svc_sys_obj.h"
#include "services/svc_bucket.h"
struct rgw_http_param_pair;
class RGWRESTConn;
+
class RGWAsyncRadosRequest : public RefCountedObject {
RGWCoroutine *caller;
RGWAioCompletionNotifier *notifier;
protected:
int _send_request(const DoutPrefixProvider *dpp) override;
public:
- RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
- rgw::sal::RadosStore *_store, RGWObjVersionTracker *_objv_tracker,
- const rgw_raw_obj& _obj, const std::string& _name,
- const std::string& _cookie, uint32_t _duration_secs);
+ RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
+ RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
+ const std::string& _name, const std::string& _cookie, uint32_t _duration_secs);
};
class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
};
class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
- protected:
RGWAsyncRadosProcessor *async_rados;
rgw::sal::RadosStore* store;
std::string lock_name;
std::string cookie;
- uint32_t duration_secs;
+ uint32_t duration;
rgw_raw_obj obj;
const rgw_raw_obj& _obj,
const std::string& _lock_name,
const std::string& _cookie,
- uint32_t _duration_secs);
+ uint32_t _duration);
~RGWSimpleRadosLockCR() override {
request_cleanup();
}
return buf;
}
};
+
class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
RGWAsyncRadosProcessor *async_rados;
rgw::sal::RadosStore* store;
const std::string lock_name;
const std::string cookie{RGWSimpleRadosLockCR::gen_random_cookie(cct)};
- int lock_duration_secs;
- bool going_down{ false };
+ int interval;
+ bool going_down{false};
bool locked{false};
const ceph::timespan interval_tolerance;
ceph::coarse_mono_time last_renew_try_time;
ceph::coarse_mono_time current_time;
- std::atomic<bool> releasing_lock = { false };
LatencyMonitor* latency;
- uint32_t shard_id;
- rgw::sync_fairness::BidManager* bid_mgr;
-
-
public:
RGWContinuousLeaseCR(RGWAsyncRadosProcessor* async_rados,
rgw::sal::RadosStore* _store,
rgw_raw_obj obj, std::string lock_name,
- int _lock_duration_secs, RGWCoroutine* caller,
+ int interval, RGWCoroutine* caller,
LatencyMonitor* const latency)
: RGWCoroutine(_store->ctx()), async_rados(async_rados), store(_store),
obj(std::move(obj)), lock_name(std::move(lock_name)),
- lock_duration_secs(_lock_duration_secs), interval_tolerance(ceph::make_timespan(9*_lock_duration_secs/10)),
- ts_interval(ceph::make_timespan(_lock_duration_secs)), caller(caller), latency(latency), shard_id(0), bid_mgr(nullptr)
- {}
- RGWContinuousLeaseCR(RGWAsyncRadosProcessor* async_rados,
- rgw::sal::RadosStore* _store,
- rgw_raw_obj obj, std::string lock_name,
- int _lock_duration_secs, RGWCoroutine* caller,
- LatencyMonitor* const latency, uint32_t _shard_id, rgw::sync_fairness::BidManager* _bid_mgr)
- : RGWCoroutine(_store->ctx()), async_rados(async_rados), store(_store),
- obj(std::move(obj)), lock_name(std::move(lock_name)),
- lock_duration_secs(_lock_duration_secs), interval_tolerance(ceph::make_timespan(9*_lock_duration_secs/10)),
- ts_interval(ceph::make_timespan(_lock_duration_secs)), caller(caller), latency(latency), shard_id(_shard_id), bid_mgr(_bid_mgr)
+ interval(interval), interval_tolerance(ceph::make_timespan(9*interval/10)),
+ ts_interval(ceph::make_timespan(interval)), caller(caller), latency(latency)
{}
+
virtual ~RGWContinuousLeaseCR() override;
int operate(const DoutPrefixProvider *dpp) override;
- virtual bool is_locked() const {
+ bool is_locked() const {
if (ceph::coarse_mono_clock::now() - last_renew_try_time > ts_interval) {
return false;
}
locked = status;
}
- void release_lock() {
- releasing_lock = true;
- wakeup();
- }
-
void go_down() {
going_down = true;
wakeup();
}
+
void abort() {
aborted = true;
}
-};// RGWContinuousLeaseCR
+};
class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
const DoutPrefixProvider *dpp;
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
- ceph::coarse_mono_clock::duration work_duration;
- bool work_period_done = false;
- static constexpr ceph::coarse_mono_clock::time_point work_period_end_unset =
- ceph::coarse_mono_clock::time_point::min();
- ceph::coarse_mono_clock::time_point work_period_end = work_period_end_unset;
-
bool lost_lock = false;
bool *reset_backoff;
int total_entries = 0;
RGWSyncTraceNodeRef tn;
-
-
public:
RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
const std::string& period, epoch_t realm_epoch,
period(period), realm_epoch(realm_epoch), mdlog(mdlog),
shard_id(_shard_id), sync_marker(_marker),
period_marker(period_marker),
- work_duration(std::chrono::seconds(
- cct->_conf.get_val<uint64_t>("rgw_sync_work_period"))),
- reset_backoff(_reset_backoff),
- tn(_tn)
- {
+ reset_backoff(_reset_backoff), tn(_tn) {
*reset_backoff = false;
}
yield;
}
tn->log(10, "took lease");
- //work_period_end = ceph::coarse_mono_clock::now() + work_duration;
/* lock succeeded, a retry now should avoid previous backoff status */
*reset_backoff = true;
total_entries = sync_marker.pos;
/* sync! */
- work_period_end = ceph::coarse_mono_clock::now() + work_duration;
do {
if (!lease_cr->is_locked()) {
if (sync_env->bid_manager->is_highest_bidder(shard_id, ceph::coarse_mono_clock::now())) {
}
}
collect_children();
- if(ceph::coarse_mono_clock::now() >= work_period_end)
- {
- ldpp_dout(sync_env->dpp, 20) << "metadata sync work period end releasing lock" << dendl;
- yield lease_cr->release_lock();
- }
} while (omapkeys->more && can_adjust_marker);
tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
}
tn->log(10, "took lease");
// if the period has advanced, we can't use the existing marker
-/*
- if (work_period_end_unset == work_period_end) {
- work_period_end = ceph::coarse_mono_clock::now() + work_duration;
- tn->log(10, SSTR(*this << "took lease until " << work_period_end));
- } // if the period has advanced, we can't use the existing marker
if (sync_marker.realm_epoch < realm_epoch) {
ldpp_dout(sync_env->dpp, 4) << "clearing marker=" << sync_marker.marker
<< " from old realm_epoch=" << sync_marker.realm_epoch
sync_marker.realm_epoch = realm_epoch;
sync_marker.marker.clear();
}
- */
-
mdlog_marker = sync_marker.marker;
set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
sync_env->shard_obj_name(shard_id),
*/
marker = max_marker = sync_marker.marker;
/* inc sync */
- work_period_end = ceph::coarse_mono_clock::now() + work_duration;
do {
if (!lease_cr->is_locked()) {
if (sync_env->bid_manager->is_highest_bidder(shard_id, ceph::coarse_mono_clock::now())) {
tn->log(10, SSTR(*this << ": done with period"));
break;
}
- if(ceph::coarse_mono_clock::now() >= work_period_end)
- {
- ldpp_dout(sync_env->dpp, 20) << "metadata sync work period end releasing lock" << dendl;
- yield lease_cr->release_lock();
- }
if (mdlog_marker == max_marker && can_adjust_marker) {
tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
yield wait(utime_t(cct->_conf->rgw_meta_sync_poll_interval, 0));
}
yield lease_cr->go_down();
+
drain_all();
if (lost_lock) {
return set_cr_done();
}
+ /* TODO */
return 0;
}
};
: RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
shard_id(_shard_id), sync_marker(_marker),
- period_marker(std::move(period_marker))
- {
+ period_marker(std::move(period_marker)) {
tn = sync_env->sync_tracer->add_node(_tn_parent, "shard",
std::to_string(shard_id));
- };
+ }
RGWCoroutine *alloc_cr() override {
return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
- shard_id, sync_marker, period_marker,
- backoff_ptr(), tn);
+ shard_id, sync_marker, period_marker, backoff_ptr(), tn);
}
RGWCoroutine *alloc_finisher_cr() override {
public:
RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, const RGWPeriodHistory::Cursor &cursor,
- const rgw_meta_sync_status& _sync_status,
- RGWSyncTraceNodeRef& _tn)
+ const rgw_meta_sync_status& _sync_status, RGWSyncTraceNodeRef& _tn)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
pool(sync_env->store->svc()->zone->get_zone_params().log_pool),
cursor(cursor), sync_status(_sync_status), tn(_tn) {}
if (r < 0) {
return r;
}
- {
meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status, tn);
r = run(dpp, meta_sync_cr);
if (r < 0) {
tn->log(0, "ERROR: failed to fetch all metadata keys");
return r;
- }
}
break;
default: