#include "LeaderWatcher.h"
#include "Replayer.h"
#include "Threads.h"
+#include "pool_watcher/RefreshImagesRequest.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
Commands commands;
};
+struct Replayer::C_RefreshLocalImages : public Context {
+ Replayer *replayer;
+ Context *on_finish;
+ ImageIds image_ids;
+
+ C_RefreshLocalImages(Replayer *replayer, Context *on_finish)
+ : replayer(replayer), on_finish(on_finish) {
+ }
+
+ void finish(int r) override {
+ replayer->handle_refresh_local_images(r, std::move(image_ids), on_finish);
+ }
+};
+
Replayer::Replayer(Threads<librbd::ImageCtx> *threads,
std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<> image_sync_throttler,
dout(20) << "connected to " << m_peer << dendl;
- r = init_local_mirroring_images();
- if (r < 0) {
- return r;
- }
-
m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
&m_leader_listener));
r = m_leader_watcher->init();
return 0;
}
-int Replayer::init_local_mirroring_images() {
- dout(20) << dendl;
-
- rbd_mirror_mode_t mirror_mode;
- int r = librbd::api::Mirror<>::mode_get(m_local_io_ctx, &mirror_mode);
- if (r < 0) {
- derr << "could not tell whether mirroring was enabled for "
- << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
- return r;
- }
- if (mirror_mode == RBD_MIRROR_MODE_DISABLED) {
- dout(20) << "pool " << m_local_io_ctx.get_pool_name() << " "
- << "has mirroring disabled" << dendl;
- return -ENOENT;
- }
-
- ImageIds image_ids;
-
- std::string last_read = "";
- int max_read = 1024;
- do {
- std::map<std::string, std::string> mirror_images;
- r = librbd::cls_client::mirror_image_list(&m_local_io_ctx, last_read,
- max_read, &mirror_images);
- if (r < 0) {
- derr << "error listing mirrored image directory: "
- << cpp_strerror(r) << dendl;
- return r;
- }
- for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) {
- std::string image_name;
- r = dir_get_name(&m_local_io_ctx, RBD_DIRECTORY, it->first, &image_name);
- if (r == -ENOENT) {
- dout(20) << "orphaned mirror image: " << it->first << dendl;
- continue;
- } else if (r < 0) {
- derr << "error retrieving local image name: " << cpp_strerror(r)
- << dendl;
- return r;
- }
-
- dout(20) << "local image: " << it->second << " (" << it->first << ")"
- << dendl;
- image_ids.insert(ImageId(it->second, it->first, image_name));
- }
- if (!mirror_images.empty()) {
- last_read = mirror_images.rbegin()->first;
- }
- r = mirror_images.size();
- } while (r == max_read);
-
- m_init_image_ids = std::move(image_ids);
- return 0;
-}
-
void Replayer::run()
{
dout(20) << "enter" << dendl;
void Replayer::handle_post_acquire_leader(Context *on_finish) {
dout(20) << dendl;
+ refresh_local_images(on_finish);
+}
+
+void Replayer::handle_pre_release_leader(Context *on_finish) {
+ dout(20) << dendl;
+ shut_down_pool_watcher(on_finish);
+}
+
+void Replayer::refresh_local_images(Context *on_finish) {
+ dout(20) << dendl;
+
+ // ensure the initial set of local images is up-to-date
+ // after acquiring the leader role
+ auto ctx = new C_RefreshLocalImages(this, on_finish);
+ auto req = pool_watcher::RefreshImagesRequest<>::create(
+ m_local_io_ctx, &ctx->image_ids, ctx);
+ req->send();
+}
+
+void Replayer::handle_refresh_local_images(int r, ImageIds &&image_ids,
+ Context *on_finish) {
+ dout(20) << "r=" << r << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ m_init_image_ids = std::move(image_ids);
+ }
+
+ if (r < 0) {
+ derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
+ on_finish->complete(r);
+ return;
+ }
+
+ init_pool_watcher(on_finish);
+}
+
+void Replayer::init_pool_watcher(Context *on_finish) {
+ dout(20) << dendl;
Mutex::Locker locker(m_lock);
assert(!m_pool_watcher);
m_cond.Signal();
}
-void Replayer::handle_pre_release_leader(Context *on_finish) {
- dout(20) << dendl;
- shut_down_pool_watcher(on_finish);
-}
-
void Replayer::shut_down_pool_watcher(Context *on_finish) {
dout(20) << dendl;
- Context *ctx = new FunctionContext([this, on_finish](int r) {
- handle_shut_down_pool_watcher(r, on_finish);
- });
- ctx = create_async_context_callback(m_threads->work_queue, ctx);
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_pool_watcher) {
+ Context *ctx = new FunctionContext([this, on_finish](int r) {
+ handle_shut_down_pool_watcher(r, on_finish);
+ });
+ ctx = create_async_context_callback(m_threads->work_queue, ctx);
+
+ m_pool_watcher->shut_down(ctx);
+ return;
+ }
+ }
- Mutex::Locker locker(m_lock);
- assert(m_pool_watcher);
- m_pool_watcher->shut_down(ctx);
+ on_finish->complete(0);
}
void Replayer::handle_shut_down_pool_watcher(int r, Context *on_finish) {