return l.lock_exclusive(&ref.pool.ioctx(), ref.obj.oid);
}
-RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
- RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
- const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(caller, cn), store(_store),
- obj(_obj),
- lock_name(_name),
- cookie(_cookie),
- duration_secs(_duration_secs)
-{
-}
+RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(
+ RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore *_store,
+ RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
+ const string& _name,
+ const string& _cookie,
+ uint32_t _duration_secs) : RGWAsyncRadosRequest(caller, cn),
+ store(_store),
+ obj(_obj),
+ lock_name(_name),
+ cookie(_cookie),
+ duration_secs(_duration_secs)
+ {}
int RGWAsyncUnlockSystemObj::_send_request(const DoutPrefixProvider *dpp)
{
return r;
}
-RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store,
- const rgw_raw_obj& _obj,
- const string& _lock_name,
- const string& _cookie,
- uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
- async_rados(_async_rados),
- store(_store),
- lock_name(_lock_name),
- cookie(_cookie),
- duration(_duration),
- obj(_obj),
- req(NULL)
+RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(
+ RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore *_store,
+ const rgw_raw_obj& _obj,
+ const string& _lock_name,
+ const string& _cookie,
+ uint32_t _duration_secs) : RGWSimpleCoroutine(_store->ctx()),
+ async_rados(_async_rados),
+ store(_store),
+ lock_name(_lock_name),
+ cookie(_cookie),
+ duration_secs(_duration_secs),
+ obj(_obj),
+ req(nullptr)
{
- set_description() << "rados lock dest=" << obj << " lock=" << lock_name << " cookie=" << cookie << " duration=" << duration;
+ set_description() << "rados lock dest=" << obj << " lock=" <<
+ lock_name << " cookie=" << cookie << " duration_secs=" << duration_secs;
}
int RGWSimpleRadosLockCR::send_request(const DoutPrefixProvider *dpp)
{
set_status() << "sending request";
req = new RGWAsyncLockSystemObj(this, stack->create_completion_notifier(),
- store, NULL, obj, lock_name, cookie, duration);
+                                  store, nullptr, obj, lock_name, cookie,
+                                  duration_secs);
async_rados->queue(req);
return 0;
}
int RGWContinuousLeaseCR::operate(const DoutPrefixProvider *dpp)
{
if (aborted) {
caller->set_sleeping(false);
    return set_cr_done();
  }
  reenter(this) {
    last_renew_try_time = ceph::coarse_mono_clock::now();
while (!going_down) {
current_time = ceph::coarse_mono_clock::now();
- yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
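+      // sync fairness: renewals of a lock we already hold always proceed;
+      // a fresh lock is only attempted while this gateway is the highest
+      // bidder for the shard, otherwise report EBUSY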
+ if (bid_mgr) {
+        if (renew || bid_mgr->is_highest_bidder(shard_id)) {
+          yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, lock_duration_secs));
+        } else {
+          retcode = -EBUSY;
+        }
+ } else {
+ yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, lock_duration_secs));
+ }
if (latency) {
- latency->add_latency(ceph::coarse_mono_clock::now() - current_time);
+ latency->add_latency(ceph::coarse_mono_clock::now() - current_time);
}
- current_time = ceph::coarse_mono_clock::now();
if (current_time - last_renew_try_time > interval_tolerance) {
// renewal should happen between 50%-90% of interval
ldout(store->ctx(), 1) << *this << ": WARNING: did not renew lock " << obj << ":" << lock_name << ": within 90% of interval. " <<
ldout(store->ctx(), 20) << *this << ": couldn't lock " << obj << ":" << lock_name << ": retcode=" << retcode << dendl;
return set_state(RGWCoroutine_Error, retcode);
}
+ renew = true;
ldout(store->ctx(), 20) << *this << ": successfully locked " << obj << ":" << lock_name << dendl;
set_locked(true);
- yield wait(utime_t(interval / 2, 0));
+ yield wait(utime_t(lock_duration_secs / 2, 0));
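+      // release_lock() may have been called while we slept, asking us to
+      // voluntarily drop the lock at the end of a work period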
+ if (releasing_lock) {
+ set_locked(false);
+ yield call(new RGWSimpleRadosUnlockCR(async_rados, store, obj, lock_name, cookie));
+ releasing_lock = false;
+ renew = false;
+ }
}
set_locked(false); /* moot at this point anyway */
current_time = ceph::coarse_mono_clock::now();
#include "rgw_sal_rados.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
-
+#include "cls/lock/cls_lock_client.h"
+#include "sync_fairness.h"
#include <atomic>
#include "common/ceph_time.h"
+#include <functional>
#include "services/svc_sys_obj.h"
#include "services/svc_bucket.h"
struct rgw_http_param_pair;
class RGWRESTConn;
-
class RGWAsyncRadosRequest : public RefCountedObject {
RGWCoroutine *caller;
RGWAioCompletionNotifier *notifier;
protected:
int _send_request(const DoutPrefixProvider *dpp) override;
public:
- RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store,
- RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
- const std::string& _name, const std::string& _cookie, uint32_t _duration_secs);
+ RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
+ rgw::sal::RadosStore *_store, RGWObjVersionTracker *_objv_tracker,
+ const rgw_raw_obj& _obj, const std::string& _name,
+ const std::string& _cookie, uint32_t _duration_secs);
};
class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
};
class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
- RGWAsyncRadosProcessor *async_rados;
- rgw::sal::RadosStore* store;
- std::string lock_name;
- std::string cookie;
- uint32_t duration;
+ protected:
+ RGWAsyncRadosProcessor *async_rados;
+ rgw::sal::RadosStore* store;
+ std::string lock_name;
+ std::string cookie;
+ uint32_t duration_secs;
- rgw_raw_obj obj;
+ rgw_raw_obj obj;
- RGWAsyncLockSystemObj *req;
+ RGWAsyncLockSystemObj *req;
public:
RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store,
const rgw_raw_obj& _obj,
- const std::string& _lock_name,
+ const std::string& _lock_name,
const std::string& _cookie,
- uint32_t _duration);
+ uint32_t _duration_secs);
~RGWSimpleRadosLockCR() override {
request_cleanup();
}
return buf;
}
};
-
class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
RGWAsyncRadosProcessor *async_rados;
rgw::sal::RadosStore* store;
const std::string lock_name;
const std::string cookie{RGWSimpleRadosLockCR::gen_random_cookie(cct)};
- int interval;
- bool going_down{false};
+ int lock_duration_secs;
+  bool going_down{false};
bool locked{false};
const ceph::timespan interval_tolerance;
ceph::coarse_mono_time last_renew_try_time;
ceph::coarse_mono_time current_time;
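+  // set by release_lock(), potentially from a different stack; polled in
+  // operate() after each renewal sleep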
+  std::atomic<bool> releasing_lock{false};
+  bool renew{false}; // member, not a local: a local in operate() would be
+                     // reinitialized on every coroutine reentry
LatencyMonitor* latency;
+ uint32_t shard_id;
+ rgw::sync_fairness::BidManager* bid_mgr;
public:
  RGWContinuousLeaseCR(RGWAsyncRadosProcessor* async_rados,
                       rgw::sal::RadosStore* _store,
                       rgw_raw_obj obj, std::string lock_name,
-                      int interval, RGWCoroutine* caller,
-                      LatencyMonitor* const latency)
+                      int _lock_duration_secs, RGWCoroutine* caller,
+                      LatencyMonitor* const latency,
+                      // optional sync-fairness arguments; the defaults keep
+                      // the old behavior of always competing for the lock
+                      uint32_t _shard_id = 0,
+                      rgw::sync_fairness::BidManager* _bid_mgr = nullptr)
    : RGWCoroutine(_store->ctx()), async_rados(async_rados), store(_store),
      obj(std::move(obj)), lock_name(std::move(lock_name)),
-     interval(interval), interval_tolerance(ceph::make_timespan(9*interval/10)),
-     ts_interval(ceph::make_timespan(interval)), caller(caller), latency(latency)
+     lock_duration_secs(_lock_duration_secs),
+     interval_tolerance(ceph::make_timespan(9 * _lock_duration_secs / 10)),
+     ts_interval(ceph::make_timespan(_lock_duration_secs)),
+     caller(caller), latency(latency), shard_id(_shard_id), bid_mgr(_bid_mgr)
  {}
-
virtual ~RGWContinuousLeaseCR() override;
int operate(const DoutPrefixProvider *dpp) override;
- bool is_locked() const {
+ virtual bool is_locked() const {
if (ceph::coarse_mono_clock::now() - last_renew_try_time > ts_interval) {
return false;
}
locked = status;
}
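+  /* request a voluntary release of the lock, e.g. at the end of a
+   * sync-fairness work period; wakes the coroutine so it can unlock */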
+ void release_lock() {
+ releasing_lock = true;
+ wakeup();
+ }
+
void go_down() {
going_down = true;
wakeup();
}
-
void abort() {
aborted = true;
}
-};
+}; // RGWContinuousLeaseCR
class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
const DoutPrefixProvider *dpp;
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
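+  // sync fairness: work on the shard for a bounded period (the
+  // rgw_sync_work_period option introduced with this change), then release
+  // the lock so other bidders can take the shard over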
+ ceph::coarse_mono_clock::duration work_duration;
+ bool work_period_done = false;
+ static constexpr ceph::coarse_mono_clock::time_point work_period_end_unset =
+ ceph::coarse_mono_clock::time_point::min();
+ ceph::coarse_mono_clock::time_point work_period_end = work_period_end_unset;
+
bool lost_lock = false;
bool *reset_backoff;
int total_entries = 0;
RGWSyncTraceNodeRef tn;
public:
RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
const std::string& period, epoch_t realm_epoch,
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
period(period), realm_epoch(realm_epoch), mdlog(mdlog),
shard_id(_shard_id), sync_marker(_marker),
- period_marker(period_marker),
- reset_backoff(_reset_backoff), tn(_tn) {
+ period_marker(period_marker),
+ work_duration(std::chrono::seconds(
+ cct->_conf.get_val<uint64_t>("rgw_sync_work_period"))),
+ reset_backoff(_reset_backoff),
+ tn(_tn)
+ {
*reset_backoff = false;
}
return set_cr_error(r);
}
return 0;
- }
- }
+ } // switch
+ } // while (true)
+
/* unreachable */
return 0;
- }
+ } // RGWMetaSyncShardCR::operate()
void collect_children()
{
rgw::sal::RadosStore* store = sync_env->store;
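+      // pass the shard id and bid manager so the lease coroutine only
+      // competes for the lock while this gateway is the highest bidder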
lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
- lock_name, lock_duration, this, nullptr));
+ lock_name, lock_duration, this, nullptr, shard_id, sync_env->bid_manager));
lease_stack.reset(spawn(lease_cr.get(), false));
lost_lock = false;
}
set_sleeping(true);
yield;
}
+
tn->log(10, "took lease");
/* lock succeeded, a retry now should avoid previous backoff status */
*reset_backoff = true;
total_entries = sync_marker.pos;
/* sync! */
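+      // bound how long this gateway keeps the shard before offering it
+      // to other bidders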
+ work_period_end = ceph::coarse_mono_clock::now() + work_duration;
do {
- if (!lease_cr->is_locked()) {
- tn->log(1, "lease is lost, abort");
- lost_lock = true;
- break;
+ while (!lease_cr->is_locked()) {
+ if (lease_cr->is_done()) {
+ drain_all();
+ tn->log(5, "failed to take lease");
+ return lease_cr->get_ret_status();
}
+        set_sleeping(true);
+        yield;
+      }
+      if (work_period_end == work_period_end_unset) {
+        // the lock was just reacquired after a work-period release; start a
+        // fresh period so it is not released again immediately
+        work_period_end = ceph::coarse_mono_clock::now() + work_duration;
+      }
+
omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
marker, max_entries, omapkeys));
}
}
collect_children();
+        if (ceph::coarse_mono_clock::now() >= work_period_end) {
+          ldpp_dout(sync_env->dpp, 20) << "metadata sync: work period ended; releasing shard lock" << dendl;
+          yield lease_cr->release_lock();
+          // mark the period as unset; a new one starts once the lock is
+          // reacquired at the top of the loop
+          work_period_end = work_period_end_unset;
+        }
} while (omapkeys->more && can_adjust_marker);
tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
* if we reached here, it means that lost_lock is true, otherwise the state
* change in the previous block will prevent us from reaching here
*/
-
yield lease_cr->go_down();
lease_cr.reset();
tn->log(10, "start incremental sync");
can_adjust_marker = true;
/* grab lock */
-      if (!lease_cr) { /* could have had a lease_cr lock from previous state */
+      if (!lease_cr) { /* could still hold a lease from the previous state */
yield {
+ tn->log(20, "creating RGWContinuousLeaseCR");
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
string lock_name = "sync_lock";
rgw::sal::RadosStore* store = sync_env->store;
lease_cr.reset( new RGWContinuousLeaseCR(sync_env->async_rados, store,
rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
- lock_name, lock_duration, this, nullptr));
+ lock_name, lock_duration, this, nullptr, shard_id, sync_env->bid_manager));
lease_stack.reset(spawn(lease_cr.get(), false));
lost_lock = false;
}
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
drain_all();
- tn->log(5, "failed to take lease");
+ tn->log(10, "failed to take lease");
return lease_cr->get_ret_status();
}
set_sleeping(true);
yield;
}
}
- tn->log(10, "took lease");
- // if the period has advanced, we can't use the existing marker
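+  // even if lease_cr survived from a previous state, the lock may have been
+  // released at the end of a work period; wait here until it is held again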
+ while (!lease_cr->is_locked()) {
+ if (lease_cr->is_done()) {
+ drain_all();
+ tn->log(10, "failed to take lease");
+ return lease_cr->get_ret_status();
+ }
+ tn->log(20, "do not yet have lock; will try again");
+ set_sleeping(true);
+ yield;
+ }
+  tn->log(10, "took lease");
+  // if the period has advanced, we can't use the existing marker
if (sync_marker.realm_epoch < realm_epoch) {
ldpp_dout(sync_env->dpp, 4) << "clearing marker=" << sync_marker.marker
<< " from old realm_epoch=" << sync_marker.realm_epoch
sync_marker.realm_epoch = realm_epoch;
sync_marker.marker.clear();
}
mdlog_marker = sync_marker.marker;
set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
sync_env->shard_obj_name(shard_id),
*/
marker = max_marker = sync_marker.marker;
/* inc sync */
+ work_period_end = ceph::coarse_mono_clock::now() + work_duration;
do {
if (!lease_cr->is_locked()) {
lost_lock = true;
- tn->log(1, "lease is lost, abort");
+        tn->log(10, "lease no longer held (expired or released for fairness); exiting shard");
break;
}
#define INCREMENTAL_MAX_ENTRIES 100
tn->log(10, SSTR(*this << ": done with period"));
break;
}
+      if (ceph::coarse_mono_clock::now() >= work_period_end) {
+        ldpp_dout(sync_env->dpp, 20) << "metadata sync: work period ended; releasing shard lock" << dendl;
+        yield lease_cr->release_lock();
+      }
if (mdlog_marker == max_marker && can_adjust_marker) {
tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
yield wait(utime_t(cct->_conf->rgw_meta_sync_poll_interval, 0));
}
yield lease_cr->go_down();
-
drain_all();
if (lost_lock) {
return set_cr_done();
}
- /* TODO */
return 0;
}
};
: RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
shard_id(_shard_id), sync_marker(_marker),
- period_marker(std::move(period_marker)) {
+ period_marker(std::move(period_marker))
+ {
tn = sync_env->sync_tracer->add_node(_tn_parent, "shard",
std::to_string(shard_id));
- }
+  }
RGWCoroutine *alloc_cr() override {
return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
- shard_id, sync_marker, period_marker, backoff_ptr(), tn);
+ shard_id, sync_marker, period_marker,
+ backoff_ptr(), tn);
}
RGWCoroutine *alloc_finisher_cr() override {
public:
RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, const RGWPeriodHistory::Cursor &cursor,
- const rgw_meta_sync_status& _sync_status, RGWSyncTraceNodeRef& _tn)
+ const rgw_meta_sync_status& _sync_status,
+ RGWSyncTraceNodeRef& _tn)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
pool(sync_env->store->svc()->zone->get_zone_params().log_pool),
cursor(cursor), sync_status(_sync_status), tn(_tn) {}
continue;
}
}
-
using ShardCR = RGWMetaSyncShardControlCR;
auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
mdlog, shard_id, marker,
- std::move(period_marker), tn);
+ std::move(period_marker), tn);
auto stack = spawn(cr, false);
shard_crs[shard_id] = RefPair{cr, stack};
}
ldpp_dout(dpp, -1) << "ERROR: can't sync, mismatch between num shards, master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
return -EINVAL;
}
-
  // construct and start the bid manager for sync fairness
const auto& control_pool = store->svc()->zone->get_zone_params().control_pool;
auto control_obj = rgw_raw_obj{control_pool, meta_sync_bids_oid};
store, control_obj, num_shards);
r = bid_manager->start();
if (r < 0) {
+    tn->log(0, SSTR("ERROR: failed to start the sync-fairness bid manager: r=" << r));
return r;
}
sync_env.bid_manager = bid_manager.get();
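+  // broadcast this gateway's shard bids to its peers and collect theirs, so
+  // that is_highest_bidder() reflects the current set of active gateways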
+  auto notify_bids_cr = sync_env.bid_manager->notify_cr();
+  r = run(dpp, notify_bids_cr);
+  if (r < 0) {
+    tn->log(0, SSTR("ERROR: failed to announce sync-fairness bids: r=" << r));
+    return r;
+  }
RGWPeriodHistory::Cursor cursor;
do {
if (r < 0) {
return r;
}
- meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status, tn);
- r = run(dpp, meta_sync_cr);
- if (r < 0) {
- tn->log(0, "ERROR: failed to fetch all metadata keys");
- return r;
+ {
+ meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status, tn);
+ r = run(dpp, meta_sync_cr);
+ if (r < 0) {
+ tn->log(0, "ERROR: failed to fetch all metadata keys");
+ return r;
+ }
}
break;
default:
encode_json("error_code", error_code, f);
encode_json("message", message, f);
}
-