static CreateNonPrimaryRequest* s_instance;
static CreateNonPrimaryRequest* create(MockTestImageCtx *image_ctx,
bool demoted,
+ const std::string group_id,
+ const std::string group_snap_id,
const std::string &primary_mirror_uuid,
uint64_t primary_snap_id,
const SnapSeqs& snap_seqs,
- uint64_t local_group_pool_id,
- const std::string &local_group_id,
- const std::string &local_group_snap_id,
const ImageState &image_state,
uint64_t *snap_id,
- Context *on_finish) {
+ Context *on_finish){
ceph_assert(s_instance != nullptr);
s_instance->demoted = demoted;
s_instance->primary_mirror_uuid = primary_mirror_uuid;
MOCK_CONST_METHOD0(is_resync_requested, bool());
MOCK_CONST_METHOD0(get_error_code, int());
MOCK_CONST_METHOD0(get_error_description, std::string());
+ MOCK_METHOD1(prune_snapshot, void(uint64_t snapshot_id));
+ MOCK_METHOD1(set_remote_snap_id_end_limit, void(uint64_t snapshot_id));
+ MOCK_METHOD0(get_remote_snap_id_end_limit, uint64_t());
};
template <>
}
MOCK_METHOD0(destroy, void());
- MOCK_METHOD4(start, void(Context *, bool, bool, bool));
+ MOCK_METHOD3(start, void(Context *, bool, bool));
MOCK_METHOD2(stop, void(Context *, bool));
MOCK_METHOD2(restart, void(Context*, bool));
MOCK_METHOD0(flush, void());
void expect_work_queue(MockThreads &mock_threads) {
EXPECT_CALL(*mock_threads.work_queue, queue(_, _))
- .WillOnce(Invoke([this](Context *ctx, int r) {
+ .Times(testing::AtLeast(1))
+ .WillRepeatedly(Invoke([this](Context *ctx, int r) {
m_threads->work_queue->queue(ctx, r);
}));
}
void expect_cancel_event(MockThreads &mock_threads, bool canceled) {
EXPECT_CALL(*mock_threads.timer, cancel_event(_))
- .WillOnce(Return(canceled));
+ .Times(testing::AtLeast(1))
+ .WillRepeatedly(Return(canceled));
}
};
expect_work_queue(mock_threads);
expect_cancel_event(mock_threads, true);
+ expect_work_queue(mock_threads);
EXPECT_CALL(mock_image_replayer, is_stopped()).WillOnce(Return(true));
expect_work_queue(mock_threads);
expect_work_queue(mock_threads);
schedule_update_task(m_threads->timer_lock, after);
}
+template <typename I>
+void ImageMap<I>::schedule_update_task(const ceph::mutex &timer_lock) {
+ ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+ CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+ double after = cct->_conf.get_val<double>("rbd_mirror_image_policy_update_throttle_interval");
+ schedule_update_task(m_threads->timer_lock, after);
+}
+
template <typename I>
void ImageMap<I>::schedule_update_task(const ceph::mutex &timer_lock,
double after) {
}
}
- schedule_update_task();
+ schedule_update_task(m_threads->timer_lock);
}
template <typename I>
for (auto &entity : entities) {
auto global_id = GlobalId(entity.type, entity.global_id);
auto result = m_peer_map[global_id].insert(mirror_uuid);
- if ((result.second || entity.type == MIRROR_ENTITY_TYPE_GROUP)) {
+ if ((result.second && m_peer_map[global_id].size() == 1) || entity.type == MIRROR_ENTITY_TYPE_GROUP) {
if (m_policy->add_entity(global_id, entity.count)) {
schedule_action(global_id);
}
void schedule_action(const image_map::GlobalId &global_id);
void schedule_update_task();
+ void schedule_update_task(const ceph::mutex &timer_lock);
void schedule_update_task(const ceph::mutex &timer_lock, double after);
void process_updates();
void update_image_mapping(Updates&& map_updates,
return;
}
- if (!m_status_removed) {
- auto ctx = new LambdaContext([this, r](int) {
- m_status_removed = true;
- handle_shut_down(r);
- });
- remove_image_status(m_delete_in_progress, ctx);
- return;
- }
if (m_local_group_ctx != nullptr) {
if (m_local_status_updater->mirror_group_image_exists(
return;
}
} else {
- if (m_local_status_updater->mirror_image_exists(m_global_image_id)) {
- dout(15) << "removing local mirror image status" << dendl;
+ if (!m_status_removed) {
auto ctx = new LambdaContext([this, r](int) {
- handle_shut_down(r);
- });
- m_local_status_updater->remove_mirror_image_status(m_global_image_id,
- true, ctx);
- return;
- }
-
- if (m_remote_image_peer.mirror_status_updater != nullptr &&
- m_remote_image_peer.mirror_status_updater->mirror_image_exists(
- m_global_image_id)) {
- dout(15) << "removing remote mirror image status" << dendl;
- auto ctx = new LambdaContext([this, r](int) {
- handle_shut_down(r);
- });
- m_remote_image_peer.mirror_status_updater->remove_mirror_image_status(
- m_global_image_id, true, ctx);
+ m_status_removed = true;
+ handle_shut_down(r);
+ });
+ remove_image_status(m_delete_in_progress, ctx);
return;
}
}
}
m_image_replayers.clear();
- std::swap(on_finish, m_on_shut_down);
}
- if (on_finish) {
+ if (--m_shutdown_counter == 0 ) {
+ std::swap(on_finish, m_on_shut_down);
on_finish->complete(r);
}
}
}
m_group_replayers.clear();
- std::swap(on_finish, m_on_shut_down);
}
- if (on_finish) {
+ if (--m_shutdown_counter == 0) {
+ std::swap(on_finish, m_on_shut_down);
on_finish->complete(r);
}
}
Context *m_image_state_check_task = nullptr;
Context *m_group_state_check_task = nullptr;
Context *m_on_shut_down = nullptr;
+ std::atomic<int> m_shutdown_counter{2};
bool m_manual_stop = false;
bool m_blocklisted = false;