RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module)
{
sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
- _sync_tracer, _source_zone, _sync_module, observer);
+ _sync_tracer, _source_zone, _sync_module);
if (initialized) {
return 0;
<< error_repo->get_obj() << " retcode=" << retcode));
}
}
- if (sync_env->observer) {
- sync_env->observer->on_bucket_changed(bs.bucket.get_key());
- }
/* FIXME: what do do in case of error */
if (marker_tracker && !entry_marker.empty()) {
/* update marker */
bs.shard_id = shard_id;
sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
- _error_logger, _sync_tracer, source_zone, _sync_module,
- nullptr);
+ _error_logger, _sync_tracer, source_zone, _sync_module);
return 0;
}
RGWDataSyncEnv env;
RGWSyncModuleInstanceRef module; // null sync module
env.init(store->ctx(), store, nullptr, store->get_async_rados(),
- nullptr, nullptr, nullptr, source_zone, module, nullptr);
+ nullptr, nullptr, nullptr, source_zone, module);
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
#include "rgw_sync_module.h"
#include "rgw_sync_trace.h"
-namespace rgw {
-struct BucketChangeObserver;
-}
-
struct rgw_datalog_info {
uint32_t num_shards;
RGWSyncTraceManager *sync_tracer{nullptr};
string source_zone;
RGWSyncModuleInstanceRef sync_module{nullptr};
- rgw::BucketChangeObserver *observer{nullptr};
RGWDataSyncEnv() {}
void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer,
- const string& _source_zone, RGWSyncModuleInstanceRef& _sync_module,
- rgw::BucketChangeObserver *_observer) {
+ const string& _source_zone, RGWSyncModuleInstanceRef& _sync_module) {
cct = _cct;
store = _store;
conn = _conn;
sync_tracer = _sync_tracer;
source_zone = _source_zone;
sync_module = _sync_module;
- observer = _observer;
}
string shard_obj_name(int shard_id);
const DoutPrefixProvider *dpp;
RGWRados *store;
RGWAsyncRadosProcessor *async_rados;
- rgw::BucketChangeObserver *observer;
RGWHTTPManager http_manager;
RGWDataSyncEnv sync_env;
public:
RGWRemoteDataLog(const DoutPrefixProvider *dpp, RGWRados *_store,
- RGWAsyncRadosProcessor *async_rados,
- rgw::BucketChangeObserver *observer)
+ RGWAsyncRadosProcessor *async_rados)
: RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
- dpp(dpp), store(_store), async_rados(async_rados), observer(observer),
+ dpp(dpp), store(_store), async_rados(async_rados),
http_manager(store->ctx(), completion_mgr),
lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
initialized(false) {}
public:
RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
- const string& _source_zone,
- rgw::BucketChangeObserver *observer = nullptr)
+ const string& _source_zone)
: store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
sync_module(nullptr),
- source_log(this, store, async_rados, observer), num_shards(0) {}
+ source_log(this, store, async_rados), num_shards(0) {}
RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
- const string& _source_zone, const RGWSyncModuleInstanceRef& _sync_module,
- rgw::BucketChangeObserver *observer = nullptr)
+ const string& _source_zone, const RGWSyncModuleInstanceRef& _sync_module)
: store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
sync_module(_sync_module),
- source_log(this, store, async_rados, observer), num_shards(0) {}
+ source_log(this, store, async_rados), num_shards(0) {}
~RGWDataSyncStatusManager() {
finalize();
}
}
public:
RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
- const string& _source_zone,
- rgw::BucketChangeObserver *observer)
+ const string& _source_zone)
: RGWSyncProcessorThread(_store, "data-sync"),
- sync(_store, async_rados, _source_zone, observer),
+ sync(_store, async_rados, _source_zone),
initialized(false) {}
void wakeup_sync_shards(map<int, set<string> >& shard_ids) {
Mutex::Locker dl(data_sync_thread_lock);
for (auto iter : zone_data_sync_from_map) {
ldout(cct, 5) << "starting data sync thread for zone " << iter.first << dendl;
- auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first,
- &*bucket_trim);
+ auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first);
ret = thread->init();
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;