public:
RGWDataSyncCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone) : RGWCoroutine(_store->ctx()), store(_store),
+ RGWRESTConn *_conn, const string& _source_zone) : RGWCoroutine(_store->ctx()), store(_store),
http_manager(_mgr),
async_rados(_async_rados),
conn(_conn),
}
};
+class RGWDataSyncControlCR : public RGWCoroutine
+{
+ RGWRados *store;
+ RGWHTTPManager *http_manager;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRESTConn *conn;
+ string source_zone;
+
+ Mutex lock;
+ RGWDataSyncCR *cr;
+
+ RGWSyncBackoff backoff;
+ bool reset_backoff;
+
+public:
+ RGWDataSyncControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ RGWRESTConn *_conn, const string& _source_zone) : RGWCoroutine(_store->ctx()), store(_store),
+ http_manager(_mgr),
+ async_rados(_async_rados),
+ conn(_conn),
+ source_zone(_source_zone),
+ lock("RGWDataSyncControlCR::lock"),
+ cr(NULL),
+ reset_backoff(false) {
+ }
+
+ ~RGWDataSyncControlCR() {
+ Mutex::Locker l(lock);
+ if (cr) {
+ cr->put();
+ cr = NULL;
+ }
+ }
+
+ int operate() {
+ reenter(this) {
+ while (true) {
+ yield {
+ Mutex::Locker l(lock);
+ cr = new RGWDataSyncCR(store, http_manager, async_rados, conn, source_zone);
+ cr->get();
+ int r = call(new RGWDataSyncCR(store, http_manager, async_rados, conn, source_zone));
+ if (r < 0) {
+ cr->put();
+ cr = NULL;
+ ldout(store->ctx(), 0) << "ERROR: call(RGWDataSyncControlCR()) returned " << r << dendl;
+ return set_cr_error(r);
+ }
+ }
+ {
+ Mutex::Locker l(lock);
+ cr->put();
+ cr = NULL;
+ }
+ if (retcode < 0 && retcode != -EBUSY && retcode != -EAGAIN) {
+ ldout(store->ctx(), 0) << "ERROR: RGWDataSyncControlCR() returned " << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+ if (reset_backoff) {
+ backoff.reset();
+ }
+ yield backoff.backoff(this);
+ }
+ }
+ return 0;
+ }
+
+ void wakeup(int shard_id, set<string>& keys) {
+ Mutex::Locker l(lock);
+ if (cr) {
+ cr->wakeup(shard_id, keys);
+ }
+ }
+};
+
void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
RWLock::RLocker rl(lock);
if (!data_sync_cr) {
RGWObjectCtx obj_ctx(store, NULL);
int r = run(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, &sync_status));
- if (r == -ENOENT) {
- r = run(new RGWInitDataSyncStatusCoroutine(async_rados, store, &http_manager, obj_ctx, source_zone, num_shards));
- }
- if (r < 0) {
+ if (r < 0 && r != -ENOENT) {
ldout(store->ctx(), 0) << "ERROR: failed to read sync status from source_zone=" << source_zone << " r=" << r << dendl;
return r;
}
-
+
lock.get_write();
- data_sync_cr = new RGWDataSyncCR(store, &http_manager, async_rados, conn, store->get_zone_params().log_pool, source_zone);
+ data_sync_cr = new RGWDataSyncControlCR(store, &http_manager, async_rados, conn, source_zone);
data_sync_cr->get();
lock.unlock();
r = run(data_sync_cr);