From: Casey Bodley Date: Fri, 15 Sep 2017 18:48:43 +0000 (-0400) Subject: rgw: RGWDataSyncSingleEntryCR calls BucketChangeObserver X-Git-Tag: v13.0.1~210^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1c50d727b5df574e28d90cd99abe15db1742e4b1;p=ceph.git rgw: RGWDataSyncSingleEntryCR calls BucketChangeObserver Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 2a29a18fb0d..e01a247569a 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -18,6 +18,7 @@ #include "rgw_bucket.h" #include "rgw_metadata.h" #include "rgw_sync_module.h" +#include "rgw_sync_log_trim.h" #include "cls/lock/cls_lock_client.h" @@ -626,7 +627,7 @@ int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSy RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module) { sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, - _sync_tracer, _source_zone, _sync_module); + _sync_tracer, _source_zone, _sync_module, observer); if (initialized) { return 0; @@ -1050,6 +1051,9 @@ public: << error_repo->get_obj() << " retcode=" << retcode)); } } + if (sync_env->observer) { + sync_env->observer->on_bucket_changed(bs.bucket.get_key()); + } /* FIXME: what do do in case of error */ if (marker_tracker && !entry_marker.empty()) { /* update marker */ @@ -1835,7 +1839,9 @@ int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, bs.bucket = bucket; bs.shard_id = shard_id; - sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, _sync_tracer, source_zone, _sync_module); + sync_env.init(store->ctx(), store, conn, async_rados, http_manager, + _error_logger, _sync_tracer, source_zone, _sync_module, + nullptr); return 0; } @@ -3154,7 +3160,7 @@ int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone, RGWDataSyncEnv env; RGWSyncModuleInstanceRef module; // null sync module env.init(store->ctx(), store, nullptr, store->get_async_rados(), - nullptr, nullptr, nullptr, source_zone, module); + nullptr, nullptr, nullptr, source_zone, module, nullptr); RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry()); return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, info.num_shards, diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 11232202486..6c56c86687b 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -11,6 +11,9 @@ #include "common/RWLock.h" #include "common/ceph_json.h" +namespace rgw { +class BucketChangeObserver; +} struct rgw_datalog_info { uint32_t num_shards; @@ -218,13 +221,15 @@ struct RGWDataSyncEnv { RGWSyncTraceManager *sync_tracer{nullptr}; string source_zone; RGWSyncModuleInstanceRef sync_module{nullptr}; + rgw::BucketChangeObserver *observer{nullptr}; RGWDataSyncEnv() {} void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn, RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager, RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer, - const string& _source_zone, RGWSyncModuleInstanceRef& _sync_module) { + const string& _source_zone, RGWSyncModuleInstanceRef& _sync_module, + rgw::BucketChangeObserver *_observer) { cct = _cct; store = _store; conn = _conn; @@ -234,6 +239,7 @@ struct RGWDataSyncEnv { sync_tracer = _sync_tracer; source_zone = _source_zone; sync_module = _sync_module; + observer = _observer; } string shard_obj_name(int shard_id); @@ -243,6 +249,7 @@ struct RGWDataSyncEnv { class RGWRemoteDataLog : public RGWCoroutinesManager { RGWRados *store; RGWAsyncRadosProcessor *async_rados; + rgw::BucketChangeObserver *observer; RGWHTTPManager http_manager; RGWDataSyncEnv sync_env; @@ -255,9 +262,10 @@ class RGWRemoteDataLog : public RGWCoroutinesManager { bool initialized; public: - RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados) + RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, + rgw::BucketChangeObserver *observer) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), - store(_store), async_rados(async_rados), + store(_store), async_rados(async_rados), observer(observer), http_manager(store->ctx(), completion_mgr), lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL), initialized(false) {} @@ -295,10 +303,11 @@ class RGWDataSyncStatusManager { public: RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, - const string& _source_zone) + const string& _source_zone, + rgw::BucketChangeObserver *observer = nullptr) : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL), sync_module(nullptr), - source_log(store, async_rados), num_shards(0) {} + source_log(store, async_rados, observer), num_shards(0) {} ~RGWDataSyncStatusManager() { finalize(); } diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index c61ae9dc28a..060498c60be 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3153,8 +3153,10 @@ class RGWDataSyncProcessorThread : public RGWSyncProcessorThread } public: RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, - const string& _source_zone) - : RGWSyncProcessorThread(_store, "data-sync"), sync(_store, async_rados, _source_zone), + const string& _source_zone, + rgw::BucketChangeObserver *observer) + : RGWSyncProcessorThread(_store, "data-sync"), + sync(_store, async_rados, _source_zone, observer), initialized(false) {} void wakeup_sync_shards(map >& shard_ids) { @@ -4517,7 +4519,8 @@ int RGWRados::init_complete() Mutex::Locker dl(data_sync_thread_lock); for (auto iter : zone_data_sync_from_map) { ldout(cct, 5) << "starting data sync thread for zone " << iter.first << dendl; - RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first); + auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first, + &*bucket_trim); ret = thread->init(); if (ret < 0) { ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;