#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
#include "Threads.h"
-#include "pool_watcher/RefreshImagesRequest.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
} // anonymous namespace
-struct PoolReplayer::C_RefreshLocalImages : public Context {
- PoolReplayer *pool_replayer;
- Context *on_finish;
- ImageIds image_ids;
-
- C_RefreshLocalImages(PoolReplayer *pool_replayer, Context *on_finish)
- : pool_replayer(pool_replayer), on_finish(on_finish) {
- }
-
- void finish(int r) override {
- pool_replayer->handle_refresh_local_images(r, std::move(image_ids), on_finish);
- }
-};
-
PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<> image_sync_throttler,
m_peer(peer),
m_args(args),
m_local_pool_id(local_pool_id),
- m_pool_watcher_listener(this),
+ m_local_pool_watcher_listener(this, true),
+ m_remote_pool_watcher_listener(this, false),
m_asok_hook(nullptr),
m_pool_replayer_thread(this),
m_leader_listener(this)
m_instance_replayer->shut_down();
}
- assert(!m_pool_watcher);
+ assert(!m_local_pool_watcher);
+ assert(!m_remote_pool_watcher);
}
bool PoolReplayer::is_blacklisted() const {
}
Mutex::Locker locker(m_lock);
- if (m_pool_watcher && m_pool_watcher->is_blacklisted()) {
+ if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
+ (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
m_blacklisted = true;
m_stopping.set(1);
break;
void PoolReplayer::handle_update(const std::string &mirror_uuid,
ImageIds &&added_image_ids,
ImageIds &&removed_image_ids) {
- assert(!mirror_uuid.empty());
if (m_stopping.read()) {
return;
}
- dout(10) << dendl;
+ dout(10) << "mirror_uuid=" << mirror_uuid << ", "
+ << "added_count=" << added_image_ids.size() << ", "
+ << "removed_count=" << removed_image_ids.size() << dendl;
Mutex::Locker locker(m_lock);
if (!m_leader_watcher->is_leader()) {
return;
}
- if (m_peer.uuid != mirror_uuid) {
- m_instance_replayer->remove_peer(m_peer.uuid);
- m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
- m_peer.uuid = mirror_uuid;
- }
+ if (m_initial_mirror_image_ids.find(mirror_uuid) ==
+ m_initial_mirror_image_ids.end() &&
+ m_initial_mirror_image_ids.size() < 2) {
+ m_initial_mirror_image_ids[mirror_uuid] = added_image_ids;
+
+ if (m_initial_mirror_image_ids.size() == 2) {
+ dout(10) << "local and remote pools refreshed" << dendl;
- // first callback will be a full directory -- so see if we need to remove
- // any local images that no longer exist on the remote side
- if (!m_init_image_ids.empty()) {
- dout(20) << "scanning initial local image set" << dendl;
- for (auto &image_id : added_image_ids) {
- auto it = m_init_image_ids.find(image_id);
- if (it != m_init_image_ids.end()) {
- m_init_image_ids.erase(it);
+ // both local and remote initial pool listing received. derive
+ // removal notifications for the remote pool
+ auto &local_image_ids = m_initial_mirror_image_ids.begin()->second;
+ auto &remote_image_ids = m_initial_mirror_image_ids.rbegin()->second;
+ for (auto &local_image_id : local_image_ids) {
+ if (remote_image_ids.find(local_image_id) == remote_image_ids.end()) {
+ removed_image_ids.emplace(local_image_id.global_id, "");
+ }
}
+ local_image_ids.clear();
+ remote_image_ids.clear();
}
+ }
- // the remaining images in m_init_image_ids must be deleted
- for (auto &image_id : m_init_image_ids) {
- dout(20) << "scheduling the deletion of init image: "
- << image_id.global_id << " (" << image_id.id << ")" << dendl;
- m_image_deleter->schedule_image_delete(m_local_rados, m_local_pool_id,
- image_id.global_id);
- }
- m_init_image_ids.clear();
+ if (!mirror_uuid.empty() && m_peer.uuid != mirror_uuid) {
+ m_instance_replayer->remove_peer(m_peer.uuid);
+ m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
+ m_peer.uuid = mirror_uuid;
}
m_update_op_tracker.start_op();
C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
- for (auto &image_id : removed_image_ids) {
+ for (auto &image_id : added_image_ids) {
// for now always send to myself (the leader)
std::string &instance_id = m_instance_watcher->get_instance_id();
- m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
- mirror_uuid, image_id.id, true,
+ m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
+ mirror_uuid, image_id.id,
gather_ctx->new_sub());
}
- for (auto &image_id : added_image_ids) {
+ for (auto &image_id : removed_image_ids) {
// for now always send to myself (the leader)
std::string &instance_id = m_instance_watcher->get_instance_id();
- m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
- mirror_uuid, image_id.id,
+ m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
+ mirror_uuid, image_id.id, true,
gather_ctx->new_sub());
}
void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
dout(20) << dendl;
- refresh_local_images(on_finish);
+ init_local_pool_watcher(on_finish);
}
void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
dout(20) << dendl;
- shut_down_pool_watcher(on_finish);
+ shut_down_pool_watchers(on_finish);
}
-void PoolReplayer::refresh_local_images(Context *on_finish) {
+void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
dout(20) << dendl;
+ Mutex::Locker locker(m_lock);
+ assert(!m_local_pool_watcher);
+ m_local_pool_watcher.reset(new PoolWatcher<>(
+ m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
+ m_initial_mirror_image_ids.clear();
+
// ensure the initial set of local images is up-to-date
// after acquiring the leader role
- auto ctx = new C_RefreshLocalImages(this, on_finish);
- auto req = pool_watcher::RefreshImagesRequest<>::create(
- m_local_io_ctx, &ctx->image_ids, ctx);
- req->send();
+ auto ctx = new FunctionContext([this, on_finish](int r) {
+ handle_init_local_pool_watcher(r, on_finish);
+ });
+ m_local_pool_watcher->init(create_async_context_callback(
+ m_threads->work_queue, ctx));
}
-void PoolReplayer::handle_refresh_local_images(int r, ImageIds &&image_ids,
- Context *on_finish) {
+void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
dout(20) << "r=" << r << dendl;
-
- {
- Mutex::Locker locker(m_lock);
- m_init_image_ids = std::move(image_ids);
- }
-
if (r < 0) {
derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
on_finish->complete(r);
return;
}
- init_pool_watcher(on_finish);
+ init_remote_pool_watcher(on_finish);
}
-void PoolReplayer::init_pool_watcher(Context *on_finish) {
+void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
dout(20) << dendl;
Mutex::Locker locker(m_lock);
- assert(!m_pool_watcher);
- m_pool_watcher.reset(new PoolWatcher<>(
- m_threads, m_remote_io_ctx, m_pool_watcher_listener));
- m_pool_watcher->init(create_async_context_callback(
+ assert(!m_remote_pool_watcher);
+ m_remote_pool_watcher.reset(new PoolWatcher<>(
+ m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
+ m_remote_pool_watcher->init(create_async_context_callback(
m_threads->work_queue, on_finish));
m_cond.Signal();
}
-void PoolReplayer::shut_down_pool_watcher(Context *on_finish) {
+void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
dout(20) << dendl;
{
Mutex::Locker locker(m_lock);
- if (m_pool_watcher) {
+ if (m_local_pool_watcher) {
Context *ctx = new FunctionContext([this, on_finish](int r) {
- handle_shut_down_pool_watcher(r, on_finish);
- });
+ handle_shut_down_pool_watchers(r, on_finish);
+ });
ctx = create_async_context_callback(m_threads->work_queue, ctx);
- m_pool_watcher->shut_down(ctx);
+ auto gather_ctx = new C_Gather(g_ceph_context, ctx);
+ m_local_pool_watcher->shut_down(gather_ctx->new_sub());
+ if (m_remote_pool_watcher) {
+ m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
+ }
+ gather_ctx->activate();
return;
}
}
on_finish->complete(0);
}
-void PoolReplayer::handle_shut_down_pool_watcher(int r, Context *on_finish) {
+void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
dout(20) << "r=" << r << dendl;
{
Mutex::Locker locker(m_lock);
- assert(m_pool_watcher);
- m_pool_watcher.reset();
+ assert(m_local_pool_watcher);
+ m_local_pool_watcher.reset();
+
+ if (m_remote_pool_watcher) {
+ m_remote_pool_watcher.reset();
+ }
}
wait_for_update_ops(on_finish);
}
private:
struct PoolWatcherListener : public PoolWatcher<>::Listener {
PoolReplayer *pool_replayer;
+ bool local;
- PoolWatcherListener(PoolReplayer *pool_replayer)
- : pool_replayer(pool_replayer) {
+ PoolWatcherListener(PoolReplayer *pool_replayer, bool local)
+ : pool_replayer(pool_replayer), local(local) {
}
void handle_update(const std::string &mirror_uuid,
ImageIds &&added_image_ids,
ImageIds &&removed_image_ids) override {
- pool_replayer->handle_update(mirror_uuid, std::move(added_image_ids),
- std::move(removed_image_ids));
+ pool_replayer->handle_update((local ? "" : mirror_uuid),
+ std::move(added_image_ids),
+ std::move(removed_image_ids));
}
};
- struct C_RefreshLocalImages;
-
void handle_update(const std::string &mirror_uuid,
ImageIds &&added_image_ids,
ImageIds &&removed_image_ids);
void handle_post_acquire_leader(Context *on_finish);
void handle_pre_release_leader(Context *on_finish);
- void refresh_local_images(Context *on_finish);
- void handle_refresh_local_images(int r, ImageIds &&image_ids,
- Context *on_finish);
+ void init_local_pool_watcher(Context *on_finish);
+ void handle_init_local_pool_watcher(int r, Context *on_finish);
+
+ void init_remote_pool_watcher(Context *on_finish);
- void init_pool_watcher(Context *on_finish);
- void shut_down_pool_watcher(Context *on_finish);
- void handle_shut_down_pool_watcher(int r, Context *on_finish);
+ void shut_down_pool_watchers(Context *on_finish);
+ void handle_shut_down_pool_watchers(int r, Context *on_finish);
void wait_for_update_ops(Context *on_finish);
void handle_wait_for_update_ops(int r, Context *on_finish);
int64_t m_local_pool_id = -1;
- PoolWatcherListener m_pool_watcher_listener;
- std::unique_ptr<PoolWatcher<> > m_pool_watcher;
+ PoolWatcherListener m_local_pool_watcher_listener;
+ std::unique_ptr<PoolWatcher<> > m_local_pool_watcher;
+
+ PoolWatcherListener m_remote_pool_watcher_listener;
+ std::unique_ptr<PoolWatcher<> > m_remote_pool_watcher;
std::unique_ptr<InstanceReplayer<librbd::ImageCtx>> m_instance_replayer;
std::string m_asok_hook_name;
AdminSocketHook *m_asok_hook;
- std::set<ImageId> m_init_image_ids;
+ std::map<std::string, ImageIds> m_initial_mirror_image_ids;
class PoolReplayerThread : public Thread {
PoolReplayer *m_pool_replayer;