#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/api/Mirror.h"
+#include "ImageMap.h"
#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
m_local_pool_watcher_listener(this, true),
m_remote_pool_watcher_listener(this, false),
+ m_image_map_listener(this),
m_pool_replayer_thread(this),
m_leader_listener(this)
{
m_instance_replayer.reset();
}
+ assert(!m_image_map);
assert(!m_image_deleter);
assert(!m_local_pool_watcher);
assert(!m_remote_pool_watcher);
m_service_daemon->add_or_update_attribute(m_local_pool_id,
SERVICE_DAEMON_LEADER_KEY, true);
m_instance_watcher->handle_acquire_leader();
- init_local_pool_watcher(on_finish);
+ init_image_map(on_finish);
}
template <typename I>
shut_down_image_deleter(on_finish);
}
+template <typename I>
+void PoolReplayer<I>::init_image_map(Context *on_finish) {
+ dout(5) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(!m_image_map);
+ m_image_map.reset(ImageMap<I>::create(m_local_io_ctx, m_threads,
+ m_image_map_listener));
+
+ auto ctx = new FunctionContext([this, on_finish](int r) {
+ handle_init_image_map(r, on_finish);
+ });
+ m_image_map->init(create_async_context_callback(
+ m_threads->work_queue, ctx));
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_init_image_map(int r, Context *on_finish) {
+ dout(5) << "r=" << r << dendl;
+ if (r < 0) {
+ derr << "failed to init image map: " << cpp_strerror(r) << dendl;
+ on_finish = new FunctionContext([this, on_finish, r](int) {
+ on_finish->complete(r);
+ });
+ shut_down_image_map(on_finish);
+ return;
+ }
+
+ init_local_pool_watcher(on_finish);
+}
+
template <typename I>
void PoolReplayer<I>::init_local_pool_watcher(Context *on_finish) {
dout(20) << dendl;
dout(20) << "r=" << r << dendl;
if (r < 0) {
derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
- on_finish->complete(r);
+ on_finish = new FunctionContext([this, on_finish, r](int) {
+ on_finish->complete(r);
+ });
+ shut_down_pool_watchers(on_finish);
return;
}
dout(20) << "r=" << r << dendl;
if (r < 0) {
derr << "failed to retrieve remote images: " << cpp_strerror(r) << dendl;
- on_finish->complete(r);
+ on_finish = new FunctionContext([this, on_finish, r](int) {
+ on_finish->complete(r);
+ });
+ shut_down_pool_watchers(on_finish);
return;
}
Mutex::Locker locker(m_lock);
assert(!m_image_deleter);
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ handle_init_image_deleter(r, on_finish);
+ });
m_image_deleter.reset(ImageDeleter<I>::create(m_local_io_ctx, m_threads,
m_service_daemon));
- m_image_deleter->init(on_finish);
+ m_image_deleter->init(create_async_context_callback(
+ m_threads->work_queue, on_finish));
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_init_image_deleter(int r, Context *on_finish) {
+ dout(20) << "r=" << r << dendl;
+ if (r < 0) {
+ derr << "failed to init image deleter: " << cpp_strerror(r) << dendl;
+ on_finish = new FunctionContext([this, on_finish, r](int) {
+ on_finish->complete(r);
+ });
+ shut_down_image_deleter(on_finish);
+ return;
+ }
+
+ on_finish->complete(0);
+
+ Mutex::Locker locker(m_lock);
m_cond.Signal();
}
template <typename I>
void PoolReplayer<I>::handle_wait_for_update_ops(int r, Context *on_finish) {
dout(20) << "r=" << r << dendl;
-
assert(r == 0);
+ shut_down_image_map(on_finish);
+}
+
+template <typename I>
+void PoolReplayer<I>::shut_down_image_map(Context *on_finish) {
+ dout(5) << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_image_map) {
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ handle_shut_down_image_map(r, on_finish);
+ });
+ m_image_map->shut_down(create_async_context_callback(
+ m_threads->work_queue, on_finish));
+ return;
+ }
+ }
+
+ on_finish->complete(0);
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_shut_down_image_map(int r, Context *on_finish) {
+ dout(5) << "r=" << r << dendl;
+ if (r < 0 && r != -EBLACKLISTED) {
+ derr << "failed to shut down image map: " << cpp_strerror(r) << dendl;
+ }
+
Mutex::Locker locker(m_lock);
+ assert(m_image_map);
+ m_image_map.reset();
+
m_instance_replayer->release_all(on_finish);
}
m_instance_watcher->handle_update_leader(leader_instance_id);
}
+template <typename I>
+void PoolReplayer<I>::handle_acquire_image(const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish) {
+ dout(5) << "global_image_id=" << global_image_id << ", "
+ << "instance_id=" << instance_id << dendl;
+ // TODO
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_release_image(const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish) {
+ dout(5) << "global_image_id=" << global_image_id << ", "
+ << "instance_id=" << instance_id << dendl;
+ // TODO
+}
+
+template <typename I>
+void PoolReplayer<I>::handle_remove_image(const std::string &mirror_uuid,
+ const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish) {
+ dout(5) << "mirror_uuid=" << mirror_uuid << ", "
+ << "global_image_id=" << global_image_id << ", "
+ << "instance_id=" << instance_id << dendl;
+ // TODO
+}
+
} // namespace mirror
} // namespace rbd
#include "PoolWatcher.h"
#include "ImageDeleter.h"
#include "types.h"
+#include "tools/rbd_mirror/image_map/Types.h"
#include "tools/rbd_mirror/leader_watcher/Types.h"
#include "tools/rbd_mirror/pool_watcher/Types.h"
#include "tools/rbd_mirror/service_daemon/Types.h"
namespace rbd {
namespace mirror {
+template <typename> class ImageMap;
template <typename> class InstanceReplayer;
template <typename> class InstanceWatcher;
template <typename> class ServiceDaemon;
* . |
* . |
* v (leader acquired) |
+ * INIT_IMAGE_MAP SHUT_DOWN_IMAGE_MAP
+ * | ^
+ * v |
* INIT_LOCAL_POOL_WATCHER WAIT_FOR_NOTIFICATIONS
* | ^
* v |
}
};
+ struct ImageMapListener : public image_map::Listener {
+ PoolReplayer *pool_replayer;
+
+ ImageMapListener(PoolReplayer *pool_replayer)
+ : pool_replayer(pool_replayer) {
+ }
+
+ void acquire_image(const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish) override {
+ pool_replayer->handle_acquire_image(global_image_id, instance_id,
+ on_finish);
+ }
+
+ void release_image(const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish) override {
+ pool_replayer->handle_release_image(global_image_id, instance_id,
+ on_finish);
+ }
+
+ void remove_image(const std::string &mirror_uuid,
+ const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish) override {
+ pool_replayer->handle_remove_image(mirror_uuid, global_image_id,
+ instance_id, on_finish);
+ }
+ };
+
void handle_update(const std::string &mirror_uuid,
ImageIds &&added_image_ids,
ImageIds &&removed_image_ids);
void handle_post_acquire_leader(Context *on_finish);
void handle_pre_release_leader(Context *on_finish);
+ void init_image_map(Context *on_finish);
+ void handle_init_image_map(int r, Context *on_finish);
+
void init_local_pool_watcher(Context *on_finish);
void handle_init_local_pool_watcher(int r, Context *on_finish);
void handle_init_remote_pool_watcher(int r, Context *on_finish);
void init_image_deleter(Context* on_finish);
+ void handle_init_image_deleter(int r, Context* on_finish);
void shut_down_image_deleter(Context* on_finish);
void handle_shut_down_image_deleter(int r, Context* on_finish);
void wait_for_update_ops(Context *on_finish);
void handle_wait_for_update_ops(int r, Context *on_finish);
+ void shut_down_image_map(Context *on_finish);
+ void handle_shut_down_image_map(int r, Context *on_finish);
+
void handle_update_leader(const std::string &leader_instance_id);
+ void handle_acquire_image(const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish);
+ void handle_release_image(const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish);
+ void handle_remove_image(const std::string &mirror_uuid,
+ const std::string &global_image_id,
+ const std::string &instance_id,
+ Context* on_finish);
+
Threads<ImageCtxT> *m_threads;
ServiceDaemon<ImageCtxT>* m_service_daemon;
int64_t m_local_pool_id = -1;
std::unique_ptr<InstanceReplayer<ImageCtxT>> m_instance_replayer;
std::unique_ptr<ImageDeleter<ImageCtxT>> m_image_deleter;
+ ImageMapListener m_image_map_listener;
+ std::unique_ptr<ImageMap<librbd::ImageCtx>> m_image_map;
+
std::string m_asok_hook_name;
AdminSocketHook *m_asok_hook = nullptr;