From 7c430a08afaf6700cf4e64b7f7333e6ceafab2be Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 15 Sep 2017 14:48:43 -0400 Subject: [PATCH] rgw: RGWDataSyncSingleEntryCR calls BucketChangeObserver Signed-off-by: Casey Bodley (cherry picked from commit 1c50d727b5df574e28d90cd99abe15db1742e4b1) Conflicts: sync tracing not backported src/rgw/rgw_data_sync.cc src/rgw/rgw_data_sync.h --- src/rgw/rgw_data_sync.cc | 12 +++++++++--- src/rgw/rgw_data_sync.h | 19 ++++++++++++++----- src/rgw/rgw_rados.cc | 9 ++++++--- 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index ec8d93b8789..108000c15a1 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -18,6 +18,7 @@ #include "rgw_bucket.h" #include "rgw_metadata.h" #include "rgw_sync_module.h" +#include "rgw_sync_log_trim.h" #include "cls/lock/cls_lock_client.h" @@ -613,7 +614,8 @@ int RGWRemoteDataLog::read_source_log_shards_next(map shard_markers int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module) { - sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module); + sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, + _source_zone, _sync_module, observer); if (initialized) { return 0; @@ -1003,6 +1005,9 @@ public: << error_repo->get_obj() << " retcode=" << retcode << dendl; } } + if (sync_env->observer) { + sync_env->observer->on_bucket_changed(bs.bucket.get_key()); + } /* FIXME: what do do in case of error */ if (marker_tracker && !entry_marker.empty()) { /* update marker */ @@ -1741,7 +1746,8 @@ int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, bs.bucket = bucket; bs.shard_id = shard_id; - sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone, _sync_module); + sync_env.init(store->ctx(), store, conn, async_rados, http_manager, + _error_logger, source_zone, _sync_module, nullptr); return 0; } @@ -3015,7 +3021,7 @@ int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone, RGWDataSyncEnv env; RGWSyncModuleInstanceRef module; // null sync module env.init(store->ctx(), store, nullptr, store->get_async_rados(), - nullptr, nullptr, nullptr, source_zone, module); + nullptr, nullptr, source_zone, module, nullptr); RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry()); return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, info.num_shards, diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 125754003ad..7716fabadcc 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -10,6 +10,9 @@ #include "common/RWLock.h" #include "common/ceph_json.h" +namespace rgw { +class BucketChangeObserver; +} struct rgw_datalog_info { uint32_t num_shards; @@ -219,13 +222,15 @@ struct RGWDataSyncEnv { RGWSyncErrorLogger *error_logger; string source_zone; RGWSyncModuleInstanceRef sync_module; + rgw::BucketChangeObserver *observer{nullptr}; RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {} void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn, RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager, RGWSyncErrorLogger *_error_logger, const string& _source_zone, - RGWSyncModuleInstanceRef& _sync_module) { + RGWSyncModuleInstanceRef& _sync_module, + rgw::BucketChangeObserver *_observer) { cct = _cct; store = _store; conn = _conn; @@ -234,6 +239,7 @@ struct RGWDataSyncEnv { error_logger = _error_logger; source_zone = _source_zone; sync_module = _sync_module; + observer = _observer; } string shard_obj_name(int shard_id); @@ -243,6 +249,7 @@ struct RGWDataSyncEnv { class RGWRemoteDataLog : public RGWCoroutinesManager { RGWRados *store; RGWAsyncRadosProcessor *async_rados; + rgw::BucketChangeObserver *observer; RGWHTTPManager http_manager; RGWDataSyncEnv sync_env; @@ -253,9 +260,10 @@ class RGWRemoteDataLog : public RGWCoroutinesManager { bool initialized; public: - RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados) + RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, + rgw::BucketChangeObserver *observer) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), - store(_store), async_rados(async_rados), + store(_store), async_rados(async_rados), observer(observer), http_manager(store->ctx(), completion_mgr), lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL), initialized(false) {} @@ -292,10 +300,11 @@ class RGWDataSyncStatusManager { public: RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, - const string& _source_zone) + const string& _source_zone, + rgw::BucketChangeObserver *observer = nullptr) : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL), sync_module(nullptr), - source_log(store, async_rados), num_shards(0) {} + source_log(store, async_rados, observer), num_shards(0) {} ~RGWDataSyncStatusManager() { finalize(); } diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 5c34c063de6..9a0d7edee06 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3210,8 +3210,10 @@ class RGWDataSyncProcessorThread : public RGWSyncProcessorThread } public: RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, - const string& _source_zone) - : RGWSyncProcessorThread(_store, "data-sync"), sync(_store, async_rados, _source_zone), + const string& _source_zone, + rgw::BucketChangeObserver *observer) + : RGWSyncProcessorThread(_store, "data-sync"), + sync(_store, async_rados, _source_zone, observer), initialized(false) {} void wakeup_sync_shards(map >& shard_ids) { @@ -4554,7 +4556,8 @@ int RGWRados::init_complete() Mutex::Locker dl(data_sync_thread_lock); for (auto iter : zone_data_sync_from_map) { ldout(cct, 5) << "starting data sync thread for zone " << iter.first << dendl; - RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first); + auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first, + &*bucket_trim); ret = thread->init(); if (ret < 0) { ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl; -- 2.47.3