]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: RGWDataSyncSingleEntryCR calls BucketChangeObserver
authorCasey Bodley <cbodley@redhat.com>
Fri, 15 Sep 2017 18:48:43 +0000 (14:48 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 23 Jan 2018 15:19:07 +0000 (10:19 -0500)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 1c50d727b5df574e28d90cd99abe15db1742e4b1)

Conflicts: sync tracing not backported
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h

src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_rados.cc

index ec8d93b8789d52de09a95ac5b60450ff95797912..108000c15a105826c13c932b95bbeb56eaacbf93 100644 (file)
@@ -18,6 +18,7 @@
 #include "rgw_bucket.h"
 #include "rgw_metadata.h"
 #include "rgw_sync_module.h"
+#include "rgw_sync_log_trim.h"
 
 #include "cls/lock/cls_lock_client.h"
 
@@ -613,7 +614,8 @@ int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers
 
 int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
 {
-  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module);
+  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
+                _source_zone, _sync_module, observer);
 
   if (initialized) {
     return 0;
@@ -1003,6 +1005,9 @@ public:
              << error_repo->get_obj() << " retcode=" << retcode << dendl;
         }
       }
+      if (sync_env->observer) {
+        sync_env->observer->on_bucket_changed(bs.bucket.get_key());
+      }
       /* FIXME: what do do in case of error */
       if (marker_tracker && !entry_marker.empty()) {
         /* update marker */
@@ -1741,7 +1746,8 @@ int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
   bs.bucket = bucket;
   bs.shard_id = shard_id;
 
-  sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone, _sync_module);
+  sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
+                _error_logger, source_zone, _sync_module, nullptr);
 
   return 0;
 }
@@ -3015,7 +3021,7 @@ int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
   RGWDataSyncEnv env;
   RGWSyncModuleInstanceRef module; // null sync module
   env.init(store->ctx(), store, nullptr, store->get_async_rados(),
-           nullptr, nullptr, nullptr, source_zone, module);
+           nullptr, nullptr, source_zone, module, nullptr);
 
   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
   return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, info.num_shards,
index 125754003ad6724c2783cdc7797773538094e1f9..7716fabadcc6e57468b5b4ae11d878d9ce626f19 100644 (file)
@@ -10,6 +10,9 @@
 #include "common/RWLock.h"
 #include "common/ceph_json.h"
 
+namespace rgw {
+class BucketChangeObserver;
+}
 
 struct rgw_datalog_info {
   uint32_t num_shards;
@@ -219,13 +222,15 @@ struct RGWDataSyncEnv {
   RGWSyncErrorLogger *error_logger;
   string source_zone;
   RGWSyncModuleInstanceRef sync_module;
+  rgw::BucketChangeObserver *observer{nullptr};
 
   RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL), sync_module(NULL) {}
 
   void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
             RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
             RGWSyncErrorLogger *_error_logger, const string& _source_zone,
-            RGWSyncModuleInstanceRef& _sync_module) {
+            RGWSyncModuleInstanceRef& _sync_module,
+            rgw::BucketChangeObserver *_observer) {
     cct = _cct;
     store = _store;
     conn = _conn;
@@ -234,6 +239,7 @@ struct RGWDataSyncEnv {
     error_logger = _error_logger;
     source_zone = _source_zone;
     sync_module = _sync_module;
+    observer = _observer;
   }
 
   string shard_obj_name(int shard_id);
@@ -243,6 +249,7 @@ struct RGWDataSyncEnv {
 class RGWRemoteDataLog : public RGWCoroutinesManager {
   RGWRados *store;
   RGWAsyncRadosProcessor *async_rados;
+  rgw::BucketChangeObserver *observer;
   RGWHTTPManager http_manager;
 
   RGWDataSyncEnv sync_env;
@@ -253,9 +260,10 @@ class RGWRemoteDataLog : public RGWCoroutinesManager {
   bool initialized;
 
 public:
-  RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
+  RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
+                   rgw::BucketChangeObserver *observer)
     : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
-      store(_store), async_rados(async_rados),
+      store(_store), async_rados(async_rados), observer(observer),
       http_manager(store->ctx(), completion_mgr),
       lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
       initialized(false) {}
@@ -292,10 +300,11 @@ class RGWDataSyncStatusManager {
 
 public:
   RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
-                           const string& _source_zone)
+                           const string& _source_zone,
+                           rgw::BucketChangeObserver *observer = nullptr)
     : store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
       sync_module(nullptr),
-      source_log(store, async_rados), num_shards(0) {}
+      source_log(store, async_rados, observer), num_shards(0) {}
   ~RGWDataSyncStatusManager() {
     finalize();
   }
index 5c34c063de66d56cf7867be5b70be10d86e4e0fb..9a0d7edee066118b6b9927fb34535c80cb6fc77e 100644 (file)
@@ -3210,8 +3210,10 @@ class RGWDataSyncProcessorThread : public RGWSyncProcessorThread
   }
 public:
   RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
-                             const string& _source_zone)
-    : RGWSyncProcessorThread(_store, "data-sync"), sync(_store, async_rados, _source_zone),
+                             const string& _source_zone,
+                             rgw::BucketChangeObserver *observer)
+    : RGWSyncProcessorThread(_store, "data-sync"),
+      sync(_store, async_rados, _source_zone, observer),
       initialized(false) {}
 
   void wakeup_sync_shards(map<int, set<string> >& shard_ids) {
@@ -4554,7 +4556,8 @@ int RGWRados::init_complete()
     Mutex::Locker dl(data_sync_thread_lock);
     for (auto iter : zone_data_sync_from_map) {
       ldout(cct, 5) << "starting data sync thread for zone " << iter.first << dendl;
-      RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first);
+      auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first,
+                                                    &*bucket_trim);
       ret = thread->init();
       if (ret < 0) {
         ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;