From 3e7b86e64269f98cd77eaaf7fa8e87a017357a9d Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 8 Sep 2015 18:41:43 -0700 Subject: [PATCH] rgw: wake up appropriate shard sync handler When receiving notification about modified shard, wake up the appropriate handler Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_rados.cc | 16 ++++++++++++++++ src/rgw/rgw_rados.h | 6 +++++- src/rgw/rgw_rest_log.cc | 8 ++++++-- src/rgw/rgw_sync.cc | 35 ++++++++++++++++++++++++++++++++--- src/rgw/rgw_sync.h | 8 +++++++- 5 files changed, 66 insertions(+), 7 deletions(-) diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index d2474b7db2798..352b974d98980 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2189,6 +2189,12 @@ public: int init(); int process(); + + void wakeup_sync_shards(set& shard_ids) { + for (set::iterator iter = shard_ids.begin(); iter != shard_ids.end(); ++iter) { + sync.wakeup(*iter); + } + } }; int RGWSyncProcessorThread::init() @@ -2208,6 +2214,14 @@ int RGWSyncProcessorThread::process() return 0; } +void RGWRados::wakeup_sync_shards(set& shard_ids) +{ + Mutex::Locker l(sync_thread_lock); + if (sync_processor_thread) { + sync_processor_thread->wakeup_sync_shards(shard_ids); + } +} + int RGWRados::get_required_alignment(rgw_bucket& bucket, uint64_t *alignment) { IoCtx ioctx; @@ -2282,6 +2296,7 @@ void RGWRados::finalize() delete finisher; } if (run_sync_thread) { + Mutex::Locker l(sync_thread_lock); sync_processor_thread->stop(); delete sync_processor_thread; sync_processor_thread = NULL; @@ -2593,6 +2608,7 @@ int RGWRados::init_complete() } if (run_sync_thread) { + Mutex::Locker l(sync_thread_lock); sync_processor_thread = new RGWSyncProcessorThread(this); ret = sync_processor_thread->init(); if (ret < 0) { diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 006f0a99d3aa2..d11de3f64c534 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1576,6 +1576,8 @@ class RGWRados RGWMetaNotifier *meta_notifier; RGWSyncProcessorThread *sync_processor_thread; + Mutex sync_thread_lock; + int num_watchers; RGWWatcher **watchers; std::set watchers_set; @@ -1633,6 +1635,7 @@ public: RGWRados() : max_req_id(0), lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL), gc(NULL), obj_expirer(NULL), use_gc_thread(false), quota_threads(false), run_sync_thread(false), meta_notifier(NULL), + sync_processor_thread(NULL), sync_thread_lock("sync_thread_lock"), num_watchers(0), watchers(NULL), watch_initialized(false), bucket_id_lock("rados_bucket_id"), @@ -2241,7 +2244,8 @@ public: * Check to see if the bucket metadata is synced */ bool is_syncing_bucket_meta(rgw_bucket& bucket); - + void wakeup_sync_shards(set& shard_ids); + int set_bucket_owner(rgw_bucket& bucket, ACLOwner& owner); int set_buckets_enabled(std::vector& buckets, bool enabled); int bucket_suspended(rgw_bucket& bucket, bool *suspended); diff --git a/src/rgw/rgw_rest_log.cc b/src/rgw/rgw_rest_log.cc index c2da1126928c0..4c6cd407f343d 100644 --- a/src/rgw/rgw_rest_log.cc +++ b/src/rgw/rgw_rest_log.cc @@ -292,10 +292,14 @@ void RGWOp_MDLog_Notify::execute() { set updated_shards; decode_json_obj(updated_shards, &p); - for (set::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) { - ldout(s->cct, 0) << __func__ << "(): updated shard=" << *iter << dendl; + if (store->ctx()->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) { + for (set::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) { + ldout(s->cct, 20) << __func__ << "(): updated shard=" << *iter << dendl; + } } + store->wakeup_sync_shards(updated_shards); + http_ret = 0; } diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index a944807854fb7..f04a134255199 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -1730,6 +1730,11 @@ public: /* TODO */ return 0; } + + void wakeup() { + Mutex::Locker l(inc_lock); + inc_cond.Signal(); + } }; class RGWMetaSyncCR : public RGWCoroutine { @@ -1739,6 +1744,8 @@ class RGWMetaSyncCR : public RGWCoroutine { rgw_meta_sync_status sync_status; + map shard_crs; + public: RGWMetaSyncCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, rgw_meta_sync_status& _sync_status) : RGWCoroutine(_store->ctx()), store(_store), @@ -1754,15 +1761,28 @@ public: for (; iter != sync_status.sync_markers.end(); ++iter) { uint32_t shard_id = iter->first; rgw_meta_sync_marker marker; - spawn(new RGWMetaSyncShardCR(store, http_manager, async_rados, store->get_zone_params().log_pool, + + RGWMetaSyncShardCR *shard_cr = new RGWMetaSyncShardCR(store, http_manager, async_rados, store->get_zone_params().log_pool, shard_id, - sync_status.sync_markers[shard_id]), true); + sync_status.sync_markers[shard_id]); + + + shard_crs[shard_id] = shard_cr; + spawn(shard_cr, true); } } yield return set_state(RGWCoroutine_Done); } return 0; } + + void wakeup(int shard_id) { + map::iterator iter = shard_crs.find(shard_id); + if (iter == shard_crs.end()) { + return; + } + iter->second->wakeup(); + } }; int RGWRemoteMetaLog::clone_shards(int num_shards, vector& clone_markers) @@ -1853,7 +1873,8 @@ int RGWRemoteMetaLog::run_sync(int num_shards, rgw_meta_sync_status& sync_status /* fall through */ case rgw_meta_sync_info::StateSync: ldout(store->ctx(), 20) << __func__ << "(): sync" << dendl; - r = run(new RGWMetaSyncCR(store, &http_manager, async_rados, sync_status)); + meta_sync_cr = new RGWMetaSyncCR(store, &http_manager, async_rados, sync_status); + r = run(meta_sync_cr); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl; return r; @@ -1867,6 +1888,14 @@ int RGWRemoteMetaLog::run_sync(int num_shards, rgw_meta_sync_status& sync_status return 0; } +void RGWRemoteMetaLog::wakeup(int shard_id) +{ + if (!meta_sync_cr) { + return; + } + meta_sync_cr->wakeup(shard_id); +} + int RGWCloneMetaLogCoroutine::operate() { reenter(this) { diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 8db33c9fab4fb..6dbcb70d00b11 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -126,6 +126,7 @@ WRITE_CLASS_ENCODER(rgw_meta_sync_status) class RGWAsyncRadosProcessor; class RGWMetaSyncStatusManager; +class RGWMetaSyncCR; class RGWRemoteMetaLog : public RGWCoroutinesManager { RGWRados *store; @@ -135,11 +136,13 @@ class RGWRemoteMetaLog : public RGWCoroutinesManager { RGWHTTPManager http_manager; RGWMetaSyncStatusManager *status_manager; + RGWMetaSyncCR *meta_sync_cr; + public: RGWRemoteMetaLog(RGWRados *_store, RGWMetaSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx()), store(_store), conn(NULL), http_manager(store->ctx(), &completion_mgr), - status_manager(_sm) {} + status_manager(_sm), meta_sync_cr(NULL) {} int init(); void finish(); @@ -154,6 +157,8 @@ public: int init_sync_status(int num_shards); int set_sync_info(const rgw_meta_sync_info& sync_info); int run_sync(int num_shards, rgw_meta_sync_status& sync_status); + + void wakeup(int shard_id); }; class RGWMetaSyncStatusManager { @@ -205,6 +210,7 @@ public: int run() { return master_log.run_sync(num_shards, sync_status); } + void wakeup(int shard_id) { return master_log.wakeup(shard_id); } void stop() { master_log.stop(); } -- 2.39.5