int total_entries;
+ bool *reset_backoff;
public:
RGWDataSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
- uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWCoroutine(_store->ctx()), store(_store),
+ uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_store->ctx()), store(_store),
http_manager(_mgr),
async_rados(_async_rados),
conn(_conn),
shard_id(_shard_id),
sync_marker(_marker),
marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
- total_entries(0) {
+ total_entries(0), reset_backoff(NULL) {
}
~RGWDataSyncShardCR() {
}
int operate() {
+ int r;
while (true) {
switch (sync_marker.state) {
case rgw_data_sync_marker::FullSync:
- return full_sync();
+ r = full_sync();
+ if (r < 0) {
+ ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
+ return set_cr_error(r);
+ }
+ return 0;
case rgw_data_sync_marker::IncrementalSync:
- return incremental_sync();
- break;
+ r = incremental_sync();
+ if (r < 0) {
+ ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
+ return set_cr_error(r);
+ }
+ return 0;
default:
return set_cr_error(-EIO);
}
}
int full_sync() {
+#warning lock shard for full_sync
#define OMAP_GET_MAX_ENTRIES 100
int max_entries = OMAP_GET_MAX_ENTRIES;
reenter(&full_cr) {
int incremental_sync() {
+#warning lock shard for inc_sync
reenter(&incremental_cr) {
set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados,
RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id),
}
};
+class RGWControlCR : public RGWCoroutine
+{
+ RGWCoroutine *cr;
+ Mutex lock;
+
+ RGWSyncBackoff backoff;
+ bool reset_backoff;
+
+protected:
+ bool *backoff_ptr() {
+ return &reset_backoff;
+ }
+
+ Mutex& cr_lock() {
+ return lock;
+ }
+
+ RGWCoroutine *get_cr() {
+ return cr;
+ }
+
+public:
+ RGWControlCR(CephContext *_cct) : RGWCoroutine(_cct), cr(NULL), lock("RGWControlCR::lock"), reset_backoff(false) {
+ }
+
+ virtual ~RGWControlCR() {
+ if (cr) {
+ cr->put();
+ }
+ }
+
+ virtual RGWCoroutine *alloc_cr() = 0;
+
+ int operate() {
+ reenter(this) {
+ while (true) {
+ yield {
+ Mutex::Locker l(lock);
+ cr = alloc_cr();
+ int r = call(cr);
+ if (r < 0) {
+ cr->put();
+ cr = NULL;
+ ldout(cct, 0) << "ERROR: call() returned " << r << dendl;
+ return set_cr_error(r);
+ }
+ }
+ {
+ Mutex::Locker l(lock);
+ cr->put();
+ cr = NULL;
+ }
+ if (retcode < 0 && retcode != -EBUSY && retcode != -EAGAIN) {
+ ldout(cct, 0) << "ERROR: RGWControlCR called coroutine returned " << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+ if (reset_backoff) {
+ backoff.reset();
+ }
+ yield backoff.backoff(this);
+ }
+ }
+ return 0;
+ }
+};
+
+class RGWDataSyncShardControlCR : public RGWControlCR {
+ RGWRados *store;
+ RGWHTTPManager *http_manager;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRESTConn *conn;
+
+ rgw_bucket pool;
+
+ string source_zone;
+ uint32_t shard_id;
+ rgw_data_sync_marker sync_marker;
+
+public:
+ RGWDataSyncShardControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
+ uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWControlCR(_store->ctx()), store(_store),
+ http_manager(_mgr),
+ async_rados(_async_rados),
+ conn(_conn),
+ pool(_pool),
+ source_zone(_source_zone),
+ shard_id(_shard_id),
+ sync_marker(_marker) {
+ }
+
+ RGWCoroutine *alloc_cr() {
+ return new RGWDataSyncShardCR(store, http_manager, async_rados, conn, pool, source_zone, shard_id, sync_marker, backoff_ptr());
+ }
+
+ void append_modified_shards(set<string>& keys) {
+ Mutex::Locker l(cr_lock());
+
+ RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
+ if (!cr) {
+ return;
+ }
+
+ cr->append_modified_shards(keys);
+ }
+};
+
class RGWDataSyncCR : public RGWCoroutine {
RGWRados *store;
RGWHTTPManager *http_manager;
RGWDataSyncShardMarkerTrack *marker_tracker;
Mutex shard_crs_lock;
- map<int, RGWDataSyncShardCR *> shard_crs;
+ map<int, RGWDataSyncShardControlCR *> shard_crs;
+
+ bool *reset_backoff;
public:
RGWDataSyncCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn, const string& _source_zone) : RGWCoroutine(_store->ctx()), store(_store),
+ RGWRESTConn *_conn, const string& _source_zone, bool *_reset_backoff) : RGWCoroutine(_store->ctx()), store(_store),
http_manager(_mgr),
async_rados(_async_rados),
conn(_conn),
source_zone(_source_zone),
obj_ctx(store),
marker_tracker(NULL),
- shard_crs_lock("RGWDataSyncCR::shard_crs_lock") {
+ shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
+ reset_backoff(_reset_backoff) {
}
int operate() {
return set_cr_error(retcode);
}
- yield {
- /* state: init status */
- if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
- ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
+ /* state: init status */
+ if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
+ ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
+ yield {
r = call(new RGWInitDataSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone, sync_status.sync_info.num_shards));
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to call RGWReadDataSyncStatusCoroutine r=" << r << dendl;
return set_cr_error(r);
}
- sync_status.sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
- /* update new state */
- yield {
- r = call(set_sync_info_cr());
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl;
- return r;
- }
+ }
+ if (retcode < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+ sync_status.sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
+ /* update new state */
+ yield {
+ r = call(set_sync_info_cr());
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl;
+ return set_cr_error(r);
}
}
- }
- if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
- return set_cr_error(retcode);
+ if (retcode < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+
+ *reset_backoff = true;
}
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
r = call(set_sync_info_cr());
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl;
- return r;
+ return set_cr_error(r);
}
}
+
+ if (retcode < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+
+ *reset_backoff = true;
}
+
yield {
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
case rgw_data_sync_info::StateSync:
for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
iter != sync_status.sync_markers.end(); ++iter) {
- RGWDataSyncShardCR *cr = new RGWDataSyncShardCR(store, http_manager, async_rados,
+ RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(store, http_manager, async_rados,
conn, store->get_zone_params().log_pool, source_zone,
iter->first, iter->second);
shard_crs_lock.Lock();
void wakeup(int shard_id, set<string>& keys) {
Mutex::Locker l(shard_crs_lock);
- map<int, RGWDataSyncShardCR *>::iterator iter = shard_crs.find(shard_id);
+ map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
if (iter == shard_crs.end()) {
return;
}
}
};
-class RGWDataSyncControlCR : public RGWCoroutine
+class RGWDataSyncControlCR : public RGWControlCR
{
RGWRados *store;
RGWHTTPManager *http_manager;
RGWRESTConn *conn;
string source_zone;
- Mutex lock;
- RGWDataSyncCR *cr;
-
- RGWSyncBackoff backoff;
- bool reset_backoff;
-
public:
RGWDataSyncControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn, const string& _source_zone) : RGWCoroutine(_store->ctx()), store(_store),
+ RGWRESTConn *_conn, const string& _source_zone) : RGWControlCR(_store->ctx()), store(_store),
http_manager(_mgr),
async_rados(_async_rados),
conn(_conn),
- source_zone(_source_zone),
- lock("RGWDataSyncControlCR::lock"),
- cr(NULL),
- reset_backoff(false) {
+ source_zone(_source_zone) {
}
- ~RGWDataSyncControlCR() {
- Mutex::Locker l(lock);
- if (cr) {
- cr->put();
- cr = NULL;
- }
- }
-
- int operate() {
- reenter(this) {
- while (true) {
- yield {
- Mutex::Locker l(lock);
- cr = new RGWDataSyncCR(store, http_manager, async_rados, conn, source_zone);
- cr->get();
- int r = call(new RGWDataSyncCR(store, http_manager, async_rados, conn, source_zone));
- if (r < 0) {
- cr->put();
- cr = NULL;
- ldout(store->ctx(), 0) << "ERROR: call(RGWDataSyncControlCR()) returned " << r << dendl;
- return set_cr_error(r);
- }
- }
- {
- Mutex::Locker l(lock);
- cr->put();
- cr = NULL;
- }
- if (retcode < 0 && retcode != -EBUSY && retcode != -EAGAIN) {
- ldout(store->ctx(), 0) << "ERROR: RGWDataSyncControlCR() returned " << retcode << dendl;
- return set_cr_error(retcode);
- }
- if (reset_backoff) {
- backoff.reset();
- }
- yield backoff.backoff(this);
- }
- }
- return 0;
+ RGWCoroutine *alloc_cr() {
+ return new RGWDataSyncCR(store, http_manager, async_rados, conn, source_zone, backoff_ptr());
}
void wakeup(int shard_id, set<string>& keys) {
- Mutex::Locker l(lock);
+ Mutex::Locker l(cr_lock());
+
+ RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
if (cr) {
cr->wakeup(shard_id, keys);
}