From: Jason Dillaman Date: Fri, 9 Mar 2018 04:18:35 +0000 (-0500) Subject: rbd-mirror: batch peer instances ack and timeout handlers X-Git-Tag: v13.1.0~312^2~11 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=e31ef59aedf4bab06fbc9614411f1e9536f8f43f;p=ceph.git rbd-mirror: batch peer instances ack and timeout handlers This throttles the on-disk updates and also will eventually help to throttle the shuffling of images between alive peer instances. Signed-off-by: Jason Dillaman --- diff --git a/src/test/rbd_mirror/test_Instances.cc b/src/test/rbd_mirror/test_Instances.cc index e2f8008e00eef..eaf86d25f2b35 100644 --- a/src/test/rbd_mirror/test_Instances.cc +++ b/src/test/rbd_mirror/test_Instances.cc @@ -82,7 +82,7 @@ TEST_F(TestInstances, NotifyRemove) std::vector instance_ids; for (int i = 0; i < 10; i++) { - instances.notify(instance_id1); + instances.acked({instance_id1}); sleep(1); C_SaferCond on_get; InstanceWatcher<>::get_instances(m_local_io_ctx, &instance_ids, &on_get); diff --git a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc index a4ee8fecd4b41..f39667d6dd6b1 100644 --- a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc +++ b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc @@ -230,7 +230,7 @@ struct Instances { MOCK_METHOD0(destroy, void()); MOCK_METHOD1(init, void(Context *)); MOCK_METHOD1(shut_down, void(Context *)); - MOCK_METHOD1(notify, void(const std::string &)); + MOCK_METHOD1(acked, void(const std::vector &)); }; Instances *Instances::s_instance = nullptr; diff --git a/src/tools/rbd_mirror/Instances.cc b/src/tools/rbd_mirror/Instances.cc index 56feb760ae3fb..1c24d5d7a3bc7 100644 --- a/src/tools/rbd_mirror/Instances.cc +++ b/src/tools/rbd_mirror/Instances.cc @@ -57,10 +57,7 @@ void Instances::shut_down(Context *on_finish) { [this](int r) { Mutex::Locker timer_locker(m_threads->timer_lock); Mutex::Locker locker(m_lock); - - for (auto it : m_instances) { - cancel_remove_task(it.second); - } + cancel_remove_task(); wait_for_ops(); }); @@ -68,37 +65,38 @@ void Instances::shut_down(Context *on_finish) { } template -void Instances::notify(const std::string &instance_id) { - dout(20) << instance_id << dendl; +void Instances::acked(const InstanceIds& instance_ids) { + dout(20) << "instance_ids=" << instance_ids << dendl; Mutex::Locker locker(m_lock); - if (m_on_finish != nullptr) { dout(20) << "received on shut down, ignoring" << dendl; return; } - Context *ctx = new C_Notify(this, instance_id); - + Context *ctx = new C_HandleAcked(this, instance_ids); m_threads->work_queue->queue(ctx, 0); } template -void Instances::handle_notify(const std::string &instance_id) { - dout(20) << instance_id << dendl; +void Instances::handle_acked(const InstanceIds& instance_ids) { + dout(5) << "instance_ids=" << instance_ids << dendl; Mutex::Locker timer_locker(m_threads->timer_lock); Mutex::Locker locker(m_lock); - if (m_on_finish != nullptr) { dout(20) << "handled on shut down, ignoring" << dendl; return; } - auto &instance = m_instances.insert( - std::make_pair(instance_id, Instance(instance_id))).first->second; + auto time = ceph_clock_now(); + for (auto& instance_id : instance_ids) { + auto &instance = m_instances.insert( + std::make_pair(instance_id, Instance{})).first->second; + instance.acked_time = time; + } - schedule_remove_task(instance); + schedule_remove_task(time); } template @@ -131,24 +129,15 @@ void Instances::handle_get_instances(int r) { Context *on_finish = nullptr; { - Mutex::Locker timer_locker(m_threads->timer_lock); Mutex::Locker locker(m_lock); - - if (r < 0) { - derr << "error retrieving instances: " << cpp_strerror(r) << dendl; - } else { - auto my_instance_id = stringify(m_ioctx.get_instance_id()); - for (auto &instance_id : m_instance_ids) { - if (instance_id == my_instance_id) { - continue; - } - auto &instance = m_instances.insert( - std::make_pair(instance_id, Instance(instance_id))).first->second; - schedule_remove_task(instance); - } - } std::swap(on_finish, m_on_finish); } + + if (r < 0) { + derr << "error retrieving instances: " << cpp_strerror(r) << dendl; + } else { + handle_acked(m_instance_ids); + } on_finish->complete(r); } @@ -180,70 +169,109 @@ void Instances::handle_wait_for_ops(int r) { } template -void Instances::remove_instance(Instance &instance) { +void Instances::remove_instances(const utime_t& time) { assert(m_lock.is_locked()); - dout(20) << instance.id << dendl; + InstanceIds instance_ids; + for (auto& instance_pair : m_instances) { + auto& instance = instance_pair.second; + if (instance.state != INSTANCE_STATE_REMOVING && + instance.acked_time <= time) { + instance.state = INSTANCE_STATE_REMOVING; + instance_ids.push_back(instance_pair.first); + } + } - Context *ctx = create_async_context_callback( - m_threads->work_queue, create_context_callback< - Instances, &Instances::handle_remove_instance>(this)); + dout(20) << "instance_ids=" << instance_ids << dendl; + Context* ctx = new FunctionContext([this, instance_ids](int r) { + handle_remove_instances(r, instance_ids); + }); + ctx = create_async_context_callback(m_threads->work_queue, ctx); + + auto gather_ctx = new C_Gather(m_cct, ctx); + for (auto& instance_id : instance_ids) { + InstanceWatcher::remove_instance(m_ioctx, m_threads->work_queue, + instance_id, gather_ctx->new_sub()); + } m_async_op_tracker.start_op(); - InstanceWatcher::remove_instance(m_ioctx, m_threads->work_queue, - instance.id, ctx); - m_instances.erase(instance.id); + gather_ctx->activate(); } template -void Instances::handle_remove_instance(int r) { +void Instances::handle_remove_instances( + int r, const InstanceIds& instance_ids) { + Mutex::Locker timer_locker(m_threads->timer_lock); Mutex::Locker locker(m_lock); - dout(20) << " r=" << r << dendl; - + dout(20) << "r=" << r << ", instance_ids=" << instance_ids << dendl; assert(r == 0); + for (auto& instance_id : instance_ids) { + m_instances.erase(instance_id); + } + + // reschedule the timer for the next batch + schedule_remove_task(ceph_clock_now()); m_async_op_tracker.finish_op(); } template -void Instances::cancel_remove_task(Instance &instance) { +void Instances::cancel_remove_task() { assert(m_threads->timer_lock.is_locked()); assert(m_lock.is_locked()); - if (instance.timer_task == nullptr) { + if (m_timer_task == nullptr) { return; } - dout(20) << instance.timer_task << dendl; + dout(20) << dendl; - bool canceled = m_threads->timer->cancel_event(instance.timer_task); + bool canceled = m_threads->timer->cancel_event(m_timer_task); assert(canceled); - instance.timer_task = nullptr; + m_timer_task = nullptr; } template -void Instances::schedule_remove_task(Instance &instance) { - dout(20) << dendl; - - cancel_remove_task(instance); +void Instances::schedule_remove_task(const utime_t& time) { + cancel_remove_task(); + if (m_on_finish != nullptr) { + dout(20) << "received on shut down, ignoring" << dendl; + return; + } + dout(20) << dendl; int after = m_cct->_conf->get_val("rbd_mirror_leader_heartbeat_interval") * (1 + m_cct->_conf->get_val("rbd_mirror_leader_max_missed_heartbeats") + m_cct->_conf->get_val("rbd_mirror_leader_max_acquire_attempts_before_break")); - instance.timer_task = new FunctionContext( - [this, &instance](int r) { + bool schedule = false; + utime_t oldest_time = time; + for (auto& instance : m_instances) { + if (instance.second.state == INSTANCE_STATE_REMOVING) { + continue; + } + + oldest_time = std::min(oldest_time, instance.second.acked_time); + schedule = true; + } + + if (!schedule) { + return; + } + + // schedule a time to fire when the oldest instance should be removed + m_timer_task = new FunctionContext( + [this, oldest_time](int r) { assert(m_threads->timer_lock.is_locked()); Mutex::Locker locker(m_lock); - instance.timer_task = nullptr; - remove_instance(instance); - }); + m_timer_task = nullptr; - dout(20) << "scheduling instance " << instance.id << " remove after " << after - << " sec (task " << instance.timer_task << ")" << dendl; + remove_instances(oldest_time); + }); - m_threads->timer->add_event_after(after, instance.timer_task); + oldest_time += after; + m_threads->timer->add_event_at(oldest_time, m_timer_task); } } // namespace mirror diff --git a/src/tools/rbd_mirror/Instances.h b/src/tools/rbd_mirror/Instances.h index 2aa4bcf721db3..875e041744caa 100644 --- a/src/tools/rbd_mirror/Instances.h +++ b/src/tools/rbd_mirror/Instances.h @@ -23,6 +23,8 @@ template struct Threads; template class Instances { public: + typedef std::vector InstanceIds; + static Instances *create(Threads *threads, librados::IoCtx &ioctx) { return new Instances(threads, ioctx); @@ -37,7 +39,8 @@ public: void init(Context *on_finish); void shut_down(Context *on_finish); - void notify(const std::string &instance_id); + void acked(const InstanceIds& instance_ids); + void list(std::vector *instance_ids); private: @@ -59,27 +62,41 @@ private: * @endverbatim */ - struct Instance { - std::string id; - Context *timer_task = nullptr; + enum InstanceState { + INSTANCE_STATE_IDLE, + INSTANCE_STATE_REMOVING + }; - Instance(const std::string &instance_id) : id(instance_id) { - } + struct Instance { + utime_t acked_time{}; + InstanceState state = INSTANCE_STATE_IDLE; }; - struct C_Notify : Context { + struct C_NotifyBase : public Context { Instances *instances; - std::string instance_id; + InstanceIds instance_ids; - C_Notify(Instances *instances, const std::string &instance_id) - : instances(instances), instance_id(instance_id) { + C_NotifyBase(Instances *instances, const InstanceIds& instance_ids) + : instances(instances), instance_ids(instance_ids) { instances->m_async_op_tracker.start_op(); } void finish(int r) override { - instances->handle_notify(instance_id); + execute(); instances->m_async_op_tracker.finish_op(); } + + virtual void execute() = 0; + }; + + struct C_HandleAcked : public C_NotifyBase { + C_HandleAcked(Instances *instances, const InstanceIds& instance_ids) + : C_NotifyBase(instances, instance_ids) { + } + + void execute() override { + this->instances->handle_acked(this->instance_ids); + } }; Threads *m_threads; @@ -87,12 +104,14 @@ private: CephContext *m_cct; Mutex m_lock; - std::vector m_instance_ids; + InstanceIds m_instance_ids; std::map m_instances; Context *m_on_finish = nullptr; AsyncOpTracker m_async_op_tracker; - void handle_notify(const std::string &instance_id); + Context *m_timer_task = nullptr; + + void handle_acked(const InstanceIds& instance_ids); void get_instances(); void handle_get_instances(int r); @@ -100,11 +119,11 @@ private: void wait_for_ops(); void handle_wait_for_ops(int r); - void remove_instance(Instance &instance); - void handle_remove_instance(int r); + void remove_instances(const utime_t& time); + void handle_remove_instances(int r, const InstanceIds& instance_ids); - void cancel_remove_task(Instance &instance); - void schedule_remove_task(Instance &instance); + void cancel_remove_task(); + void schedule_remove_task(const utime_t& time); }; } // namespace mirror diff --git a/src/tools/rbd_mirror/LeaderWatcher.cc b/src/tools/rbd_mirror/LeaderWatcher.cc index a376cc1a6b336..1b2867e96273a 100644 --- a/src/tools/rbd_mirror/LeaderWatcher.cc +++ b/src/tools/rbd_mirror/LeaderWatcher.cc @@ -986,14 +986,17 @@ void LeaderWatcher::handle_notify_heartbeat(int r) { dout(20) << m_heartbeat_response.acks.size() << " acks received, " << m_heartbeat_response.timeouts.size() << " timed out" << dendl; + std::vector instance_ids; for (auto &it: m_heartbeat_response.acks) { uint64_t notifier_id = it.first.gid; if (notifier_id == m_notifier_id) { continue; } - std::string instance_id = stringify(notifier_id); - m_instances->notify(instance_id); + instance_ids.push_back(stringify(notifier_id)); + } + if (!instance_ids.empty()) { + m_instances->acked(instance_ids); } schedule_timer_task("heartbeat", 1, true,