ImageMap<I>::~ImageMap() {
assert(m_async_op_tracker.empty());
assert(m_timer_task == nullptr);
+ assert(m_rebalance_task == nullptr);
}
template <typename I>
template <typename I>
void ImageMap<I>::schedule_update_task() {
Mutex::Locker timer_lock(m_threads->timer_lock);
+ schedule_update_task(m_threads->timer_lock);
+}
+
+template <typename I>
+void ImageMap<I>::schedule_update_task(const Mutex &timer_lock) {
+ assert(m_threads->timer_lock.is_locked());
+
+ schedule_rebalance_task();
+
if (m_timer_task != nullptr) {
return;
}
m_threads->timer->add_event_after(after, m_timer_task);
}
+template <typename I>
+void ImageMap<I>::rebalance() {
+ assert(m_rebalance_task == nullptr);
+
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_async_op_tracker.empty() && m_global_image_ids.empty()){
+ dout(20) << "starting rebalance" << dendl;
+
+ std::set<std::string> remap_global_image_ids;
+ m_policy->add_instances({}, &remap_global_image_ids);
+
+ for (auto const &global_image_id : remap_global_image_ids) {
+ schedule_action(global_image_id);
+ }
+ }
+ }
+
+ schedule_update_task(m_threads->timer_lock);
+}
+
+template <typename I>
+void ImageMap<I>::schedule_rebalance_task() {
+ assert(m_threads->timer_lock.is_locked());
+
+ CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+
+ // fetch the updated value of idle timeout for (re)scheduling
+ double resched_after = cct->_conf->get_val<double>(
+ "rbd_mirror_image_policy_rebalance_timeout");
+ if (!resched_after) {
+ return;
+ }
+
+ // cancel existing rebalance task if any before scheduling
+ if (m_rebalance_task != nullptr) {
+ m_threads->timer->cancel_event(m_rebalance_task);
+ }
+
+ m_rebalance_task = new FunctionContext([this](int _) {
+ assert(m_threads->timer_lock.is_locked());
+ m_rebalance_task = nullptr;
+
+ rebalance();
+ });
+
+ dout(20) << "scheduling rebalance (" << m_rebalance_task << ")"
+ << " after " << resched_after << " second(s)" << dendl;
+ m_threads->timer->add_event_after(resched_after, m_rebalance_task);
+}
+
template <typename I>
void ImageMap<I>::schedule_action(const std::string &global_image_id) {
dout(20) << "global_image_id=" << global_image_id << dendl;
m_threads->timer->cancel_event(m_timer_task);
m_timer_task = nullptr;
}
+ if (m_rebalance_task != nullptr) {
+ m_threads->timer->cancel_event(m_rebalance_task);
+ m_rebalance_task = nullptr;
+ }
}
wait_for_async_ops(on_finish);
std::set<std::string> m_global_image_ids;
+ Context *m_rebalance_task = nullptr;
+
struct C_LoadMap : Context {
ImageMap *image_map;
Context *on_finish;
void schedule_action(const std::string &global_image_id);
void schedule_update_task();
+ void schedule_update_task(const Mutex &timer_lock);
void process_updates();
void update_image_mapping(Updates&& map_updates,
std::set<std::string>&& map_removals);
+ void rebalance();
+ void schedule_rebalance_task();
+
void notify_listener_acquire_release_images(const Updates &acquire, const Updates &release);
void notify_listener_remove_images(const std::string &peer_uuid, const Updates &remove);