From f5a07532842a15830ef220bf1515e397fb51bebe Mon Sep 17 00:00:00 2001
From: Yehuda Sadeh
Date: Wed, 11 Nov 2015 12:43:31 -0800
Subject: [PATCH] rgw: backoff mechanism for data sync

Signed-off-by: Yehuda Sadeh
---
 src/rgw/rgw_data_sync.cc | 251 +++++++++++++++++++++++++++------------
 1 file changed, 172 insertions(+), 79 deletions(-)

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 8f6dd40d1923e..34251b65603f0 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -837,11 +837,12 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 
   int total_entries;
 
+  bool *reset_backoff;
+
 public:
   RGWDataSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
                      RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
-                     uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWCoroutine(_store->ctx()), store(_store),
+                     uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_store->ctx()), store(_store),
                                                       http_manager(_mgr),
                                                       async_rados(_async_rados),
                                                       conn(_conn),
@@ -850,7 +851,7 @@ public:
                                                       shard_id(_shard_id),
                                                       sync_marker(_marker),
                                                       marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
-                                                      total_entries(0) {
+                                                      total_entries(0), reset_backoff(NULL) {
   }
 
   ~RGWDataSyncShardCR() {
@@ -868,13 +869,23 @@ public:
   }
 
   int operate() {
+    int r;
     while (true) {
       switch (sync_marker.state) {
       case rgw_data_sync_marker::FullSync:
-        return full_sync();
+        r = full_sync();
+        if (r < 0) {
+          ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
+          return set_cr_error(r);
+        }
+        return 0;
       case rgw_data_sync_marker::IncrementalSync:
-        return incremental_sync();
-        break;
+        r = incremental_sync();
+        if (r < 0) {
+          ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
+          return set_cr_error(r);
+        }
+        return 0;
       default:
         return set_cr_error(-EIO);
       }
     }
@@ -883,6 +894,7 @@ public:
   }
 
   int full_sync() {
+#warning lock shard for full_sync
 #define OMAP_GET_MAX_ENTRIES 100
     int max_entries = OMAP_GET_MAX_ENTRIES;
     reenter(&full_cr) {
@@ -931,6 +943,7 @@ public:
 
   int incremental_sync() {
+#warning lock shard for inc_sync
     reenter(&incremental_cr) {
       set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados,
                                                          RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id),
@@ -989,6 +1002,113 @@ public:
   }
 };
 
+class RGWControlCR : public RGWCoroutine
+{
+  RGWCoroutine *cr;
+  Mutex lock;
+
+  RGWSyncBackoff backoff;
+  bool reset_backoff;
+
+protected:
+  bool *backoff_ptr() {
+    return &reset_backoff;
+  }
+
+  Mutex& cr_lock() {
+    return lock;
+  }
+
+  RGWCoroutine *get_cr() {
+    return cr;
+  }
+
+public:
+  RGWControlCR(CephContext *_cct) : RGWCoroutine(_cct), cr(NULL), lock("RGWControlCR::lock"), reset_backoff(false) {
+  }
+
+  virtual ~RGWControlCR() {
+    if (cr) {
+      cr->put();
+    }
+  }
+
+  virtual RGWCoroutine *alloc_cr() = 0;
+
+  int operate() {
+    reenter(this) {
+      while (true) {
+        yield {
+          Mutex::Locker l(lock);
+          cr = alloc_cr();
+          int r = call(cr);
+          if (r < 0) {
+            cr->put();
+            cr = NULL;
+            ldout(cct, 0) << "ERROR: call() returned " << r << dendl;
+            return set_cr_error(r);
+          }
+        }
+        {
+          Mutex::Locker l(lock);
+          cr->put();
+          cr = NULL;
+        }
+        if (retcode < 0 && retcode != -EBUSY && retcode != -EAGAIN) {
+          ldout(cct, 0) << "ERROR: RGWControlCR called coroutine returned " << retcode << dendl;
+          return set_cr_error(retcode);
+        }
+        if (reset_backoff) {
+          backoff.reset();
+        }
+        yield backoff.backoff(this);
+      }
+    }
+    return 0;
+  }
+};
+
+class RGWDataSyncShardControlCR : public RGWControlCR {
+  RGWRados *store;
+  RGWHTTPManager *http_manager;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRESTConn *conn;
+
+  rgw_bucket pool;
+
+  string source_zone;
+  uint32_t shard_id;
+  rgw_data_sync_marker sync_marker;
+
+public:
+  RGWDataSyncShardControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+                            RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
+                            uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWControlCR(_store->ctx()), store(_store),
+                                                      http_manager(_mgr),
+                                                      async_rados(_async_rados),
+                                                      conn(_conn),
+                                                      pool(_pool),
+                                                      source_zone(_source_zone),
+                                                      shard_id(_shard_id),
+                                                      sync_marker(_marker) {
+  }
+
+  RGWCoroutine *alloc_cr() {
+    return new RGWDataSyncShardCR(store, http_manager, async_rados, conn, pool, source_zone, shard_id, sync_marker, backoff_ptr());
+  }
+
+  void append_modified_shards(set<string>& keys) {
+    Mutex::Locker l(cr_lock());
+
+    RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
+    if (!cr) {
+      return;
+    }
+
+    cr->append_modified_shards(keys);
+  }
+};
+
 class RGWDataSyncCR : public RGWCoroutine {
   RGWRados *store;
   RGWHTTPManager *http_manager;
@@ -1003,18 +1123,21 @@ class RGWDataSyncCR : public RGWCoroutine {
   RGWDataSyncShardMarkerTrack *marker_tracker;
 
   Mutex shard_crs_lock;
-  map<int, RGWDataSyncShardCR *> shard_crs;
+  map<int, RGWDataSyncShardControlCR *> shard_crs;
+
+  bool *reset_backoff;
 
 public:
   RGWDataSyncCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                RGWRESTConn *_conn, const string& _source_zone) : RGWCoroutine(_store->ctx()), store(_store),
+                RGWRESTConn *_conn, const string& _source_zone, bool *_reset_backoff) : RGWCoroutine(_store->ctx()), store(_store),
                                                       http_manager(_mgr),
                                                       async_rados(_async_rados),
                                                       conn(_conn),
                                                       source_zone(_source_zone),
                                                       obj_ctx(store),
                                                       marker_tracker(NULL),
-                                                      shard_crs_lock("RGWDataSyncCR::shard_crs_lock") {
+                                                      shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
+                                                      reset_backoff(_reset_backoff) {
   }
 
   int operate() {
@@ -1035,30 +1158,36 @@ public:
       return set_cr_error(retcode);
     }
 
-    yield {
-      /* state: init status */
-      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
-        ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
+    /* state: init status */
+    if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
+      ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
+      yield {
        r = call(new RGWInitDataSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone,
                                                    sync_status.sync_info.num_shards));
        if (r < 0) {
          ldout(store->ctx(), 0) << "ERROR: failed to call RGWReadDataSyncStatusCoroutine r=" << r << dendl;
          return set_cr_error(r);
        }
-        sync_status.sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
-        /* update new state */
-        yield {
-          r = call(set_sync_info_cr());
-          if (r < 0) {
-            ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl;
-            return r;
-          }
+      }
+      if (retcode < 0) {
+        ldout(store->ctx(), 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
+        return set_cr_error(retcode);
+      }
+      sync_status.sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
+      /* update new state */
+      yield {
+        r = call(set_sync_info_cr());
+        if (r < 0) {
+          ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl;
+          return set_cr_error(r);
        }
      }
-    }
-    if (retcode < 0) {
-      ldout(store->ctx(), 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
-      return set_cr_error(retcode);
+      if (retcode < 0) {
+        ldout(store->ctx(), 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
"ERROR: failed to write sync status, retcode=" << retcode << dendl; + return set_cr_error(retcode); + } + + *reset_backoff = true; } if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) { @@ -1077,16 +1206,24 @@ public: r = call(set_sync_info_cr()); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl; - return r; + return set_cr_error(r); } } + + if (retcode < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl; + return set_cr_error(retcode); + } + + *reset_backoff = true; } + yield { if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) { case rgw_data_sync_info::StateSync: for (map::iterator iter = sync_status.sync_markers.begin(); iter != sync_status.sync_markers.end(); ++iter) { - RGWDataSyncShardCR *cr = new RGWDataSyncShardCR(store, http_manager, async_rados, + RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(store, http_manager, async_rados, conn, store->get_zone_params().log_pool, source_zone, iter->first, iter->second); shard_crs_lock.Lock(); @@ -1110,7 +1247,7 @@ public: void wakeup(int shard_id, set& keys) { Mutex::Locker l(shard_crs_lock); - map::iterator iter = shard_crs.find(shard_id); + map::iterator iter = shard_crs.find(shard_id); if (iter == shard_crs.end()) { return; } @@ -1119,7 +1256,7 @@ public: } }; -class RGWDataSyncControlCR : public RGWCoroutine +class RGWDataSyncControlCR : public RGWControlCR { RGWRados *store; RGWHTTPManager *http_manager; @@ -1127,67 +1264,23 @@ class RGWDataSyncControlCR : public RGWCoroutine RGWRESTConn *conn; string source_zone; - Mutex lock; - RGWDataSyncCR *cr; - - RGWSyncBackoff backoff; - bool reset_backoff; - public: RGWDataSyncControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWRESTConn *_conn, const string& _source_zone) : RGWCoroutine(_store->ctx()), store(_store), + RGWRESTConn *_conn, const string& _source_zone) : RGWControlCR(_store->ctx()), store(_store), http_manager(_mgr), async_rados(_async_rados), conn(_conn), - source_zone(_source_zone), - lock("RGWDataSyncControlCR::lock"), - cr(NULL), - reset_backoff(false) { + source_zone(_source_zone) { } - ~RGWDataSyncControlCR() { - Mutex::Locker l(lock); - if (cr) { - cr->put(); - cr = NULL; - } - } - - int operate() { - reenter(this) { - while (true) { - yield { - Mutex::Locker l(lock); - cr = new RGWDataSyncCR(store, http_manager, async_rados, conn, source_zone); - cr->get(); - int r = call(new RGWDataSyncCR(store, http_manager, async_rados, conn, source_zone)); - if (r < 0) { - cr->put(); - cr = NULL; - ldout(store->ctx(), 0) << "ERROR: call(RGWDataSyncControlCR()) returned " << r << dendl; - return set_cr_error(r); - } - } - { - Mutex::Locker l(lock); - cr->put(); - cr = NULL; - } - if (retcode < 0 && retcode != -EBUSY && retcode != -EAGAIN) { - ldout(store->ctx(), 0) << "ERROR: RGWDataSyncControlCR() returned " << retcode << dendl; - return set_cr_error(retcode); - } - if (reset_backoff) { - backoff.reset(); - } - yield backoff.backoff(this); - } - } - return 0; + RGWCoroutine *alloc_cr() { + return new RGWDataSyncCR(store, http_manager, async_rados, conn, source_zone, backoff_ptr()); } void wakeup(int shard_id, set& keys) { - Mutex::Locker l(lock); + Mutex::Locker l(cr_lock()); + + RGWDataSyncCR *cr = static_cast(get_cr()); if (cr) { cr->wakeup(shard_id, keys); } -- 2.39.5