#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "global/global_context.h"
+#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/internal.h"
+#include "LeaderWatcher.h"
#include "Replayer.h"
#include "Threads.h"
using std::vector;
using librbd::cls_client::dir_get_name;
+using librbd::util::create_async_context_callback;
namespace rbd {
namespace mirror {
Replayer *replayer;
};
+class LeaderReleaseCommand : public ReplayerAdminSocketCommand {
+public:
+ explicit LeaderReleaseCommand(Replayer *replayer) : replayer(replayer) {}
+
+ bool call(Formatter *f, stringstream *ss) {
+ replayer->release_leader();
+ return true;
+ }
+
+private:
+ Replayer *replayer;
+};
+
} // anonymous namespace
class ReplayerAdminSocketHook : public AdminSocketHook {
if (r == 0) {
commands[command] = new FlushCommand(replayer);
}
+
+ command = "rbd mirror leader release " + name;
+ r = admin_socket->register_command(command, command, this,
+ "release rbd mirror leader " + name);
+ if (r == 0) {
+ commands[command] = new LeaderReleaseCommand(replayer);
+ }
}
~ReplayerAdminSocketHook() {
Commands commands;
};
-class MirrorStatusWatchCtx {
-public:
- MirrorStatusWatchCtx(librados::IoCtx &ioctx, ContextWQ *work_queue) {
- m_ioctx.dup(ioctx);
- m_watcher = new Watcher(m_ioctx, work_queue);
- }
-
- ~MirrorStatusWatchCtx() {
- delete m_watcher;
- }
-
- int register_watch() {
- C_SaferCond cond;
- m_watcher->register_watch(&cond);
- return cond.wait();
- }
-
- int unregister_watch() {
- C_SaferCond cond;
- m_watcher->unregister_watch(&cond);
- return cond.wait();
- }
-
- std::string get_oid() const {
- return m_watcher->get_oid();
- }
-
-private:
- class Watcher : public librbd::Watcher {
- public:
- Watcher(librados::IoCtx &ioctx, ContextWQ *work_queue) :
- librbd::Watcher(ioctx, work_queue, RBD_MIRRORING) {
- }
-
- virtual std::string get_oid() const {
- return RBD_MIRRORING;
- }
-
- virtual void handle_notify(uint64_t notify_id, uint64_t handle,
- uint64_t notifier_id, bufferlist &bl) {
- bufferlist out;
- acknowledge_notify(notify_id, handle, out);
- }
- };
-
- librados::IoCtx m_ioctx;
- Watcher *m_watcher;
-};
-
Replayer::Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<> image_sync_throttler,
int64_t local_pool_id, const peer_t &peer,
m_args(args),
m_local_pool_id(local_pool_id),
m_asok_hook(nullptr),
- m_replayer_thread(this)
+ m_replayer_thread(this),
+ m_leader_listener(this)
{
}
if (m_replayer_thread.is_started()) {
m_replayer_thread.join();
}
+ if (m_leader_watcher) {
+ m_leader_watcher->shut_down();
+ }
}
bool Replayer::is_blacklisted() const {
return m_blacklisted;
}
+bool Replayer::is_leader() const {
+ Mutex::Locker locker(m_lock);
+ return m_leader_watcher && m_leader_watcher->is_leader();
+}
+
int Replayer::init()
{
dout(20) << "replaying for " << m_peer << dendl;
dout(20) << "connected to " << m_peer << dendl;
+ m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
+ &m_leader_listener));
+ r = m_leader_watcher->init();
+ if (r < 0) {
+ derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
// Bootstrap existing mirroring images
init_local_mirroring_images();
if (m_pool_watcher->is_blacklisted()) {
m_blacklisted = true;
m_stopping.set(1);
- } else if (!m_manual_stop) {
+ } else if (!m_manual_stop && m_leader_watcher->is_leader()) {
set_sources(m_pool_watcher->get_images());
}
ImageIds empty_sources;
while (true) {
- Mutex::Locker l(m_lock);
+ Mutex::Locker locker(m_lock);
set_sources(empty_sources);
if (m_image_replayers.empty()) {
break;
f->open_object_section("replayer_status");
f->dump_string("pool", m_local_io_ctx.get_pool_name());
f->dump_stream("peer") << m_peer;
+ f->dump_bool("leader", m_leader_watcher->is_leader());
f->open_array_section("image_replayers");
};
}
}
+void Replayer::release_leader()
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker l(m_lock);
+
+ if (m_stopping.read() || !m_leader_watcher) {
+ return;
+ }
+
+ m_leader_watcher->release_leader();
+}
+
void Replayer::set_sources(const ImageIds &image_ids)
{
dout(20) << "enter" << dendl;
assert(m_lock.is_locked());
- if (!m_init_images.empty()) {
+ if (!m_init_images.empty() && !m_stopping.read() &&
+ m_leader_watcher->is_leader()) {
dout(20) << "scanning initial local image set" << dendl;
for (auto &remote_image : image_ids) {
auto it = m_init_images.find(InitImageInfo(remote_image.global_id));
}
// shut down replayers for non-mirrored images
- bool existing_image_replayers = !m_image_replayers.empty();
for (auto image_it = m_image_replayers.begin();
image_it != m_image_replayers.end();) {
if (image_ids.find(ImageId(image_it->first)) == image_ids.end()) {
}
if (image_ids.empty()) {
- if (existing_image_replayers && m_image_replayers.empty()) {
- mirror_image_status_shut_down();
- }
return;
}
return;
}
- if (m_image_replayers.empty() && !existing_image_replayers) {
- // create entry for pool if it doesn't exist
- r = mirror_image_status_init();
- if (r < 0) {
- return;
- }
- }
-
for (auto &image_id : image_ids) {
auto it = m_image_replayers.find(image_id.id);
if (it == m_image_replayers.end()) {
}
}
-int Replayer::mirror_image_status_init() {
- assert(!m_status_watcher);
-
- uint64_t instance_id = librados::Rados(m_local_io_ctx).get_instance_id();
- dout(20) << "pool_id=" << m_local_pool_id << ", "
- << "instance_id=" << instance_id << dendl;
-
- librados::ObjectWriteOperation op;
- librbd::cls_client::mirror_image_status_remove_down(&op);
- int r = m_local_io_ctx.operate(RBD_MIRRORING, &op);
- if (r < 0) {
- derr << "error initializing " << RBD_MIRRORING << "object: "
- << cpp_strerror(r) << dendl;
- return r;
- }
-
- unique_ptr<MirrorStatusWatchCtx> watch_ctx(
- new MirrorStatusWatchCtx(m_local_io_ctx, m_threads->work_queue));
-
- r = watch_ctx->register_watch();
- if (r < 0) {
- derr << "error registering watcher for " << watch_ctx->get_oid()
- << " object: " << cpp_strerror(r) << dendl;
- return r;
- }
-
- m_status_watcher = std::move(watch_ctx);
- return 0;
-}
-
-void Replayer::mirror_image_status_shut_down() {
- assert(m_status_watcher);
-
- int r = m_status_watcher->unregister_watch();
- if (r < 0) {
- derr << "error unregistering watcher for " << m_status_watcher->get_oid()
- << " object: " << cpp_strerror(r) << dendl;
- }
- m_status_watcher.reset();
-}
-
void Replayer::start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer,
const std::string &image_id,
const boost::optional<std::string>& image_name)
m_image_deleter->cancel_waiter(m_local_pool_id,
image_replayer->get_global_image_id());
- if (!m_stopping.read()) {
+ if (!m_stopping.read() && m_leader_watcher->is_leader()) {
dout(20) << "scheduling delete" << dendl;
m_image_deleter->schedule_image_delete(
m_local_rados,
}
FunctionContext *ctx = new FunctionContext(
[&image_replayer, this] (int r) {
- if (!m_stopping.read() && r >= 0) {
+ if (!m_stopping.read() && m_leader_watcher->is_leader() && r >= 0) {
m_image_deleter->schedule_image_delete(
m_local_rados,
image_replayer->get_local_pool_id(),
return false;
}
+void Replayer::handle_post_acquire_leader(Context *on_finish) {
+ dout(20) << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ m_cond.Signal();
+ }
+
+ on_finish->complete(0);
+}
+
+void Replayer::handle_pre_release_leader(Context *on_finish) {
+ dout(20) << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ set_sources(ImageIds());
+ if (!m_image_replayers.empty()) {
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ Context *task = create_async_context_callback(
+ m_threads->work_queue, new FunctionContext(
+ [this, on_finish](int r) {
+ handle_pre_release_leader(on_finish);
+ }));
+ m_threads->timer->add_event_after(1, task);
+ return;
+ }
+ }
+
+ on_finish->complete(0);
+}
+
} // namespace mirror
} // namespace rbd
#include "ClusterWatcher.h"
#include "ImageReplayer.h"
+#include "LeaderWatcher.h"
#include "PoolWatcher.h"
#include "ImageDeleter.h"
#include "types.h"
struct Threads;
class ReplayerAdminSocketHook;
-class MirrorStatusWatchCtx;
/**
* Controls mirroring for a single remote cluster.
Replayer& operator=(const Replayer&) = delete;
bool is_blacklisted() const;
+ bool is_leader() const;
int init();
void run();
void stop(bool manual);
void restart();
void flush();
+ void release_leader();
private:
typedef PoolWatcher::ImageId ImageId;
const boost::optional<std::string>& image_name);
bool stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
- int mirror_image_status_init();
- void mirror_image_status_shut_down();
-
int init_rados(const std::string &cluster_name, const std::string &client_name,
const std::string &description, RadosRef *rados_ref);
+ void handle_post_acquire_leader(Context *on_finish);
+ void handle_pre_release_leader(Context *on_finish);
+
Threads *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
ImageSyncThrottlerRef<> m_image_sync_throttler;
std::unique_ptr<PoolWatcher> m_pool_watcher;
std::map<std::string, std::unique_ptr<ImageReplayer<> > > m_image_replayers;
- std::unique_ptr<MirrorStatusWatchCtx> m_status_watcher;
std::string m_asok_hook_name;
ReplayerAdminSocketHook *m_asok_hook;
return 0;
}
} m_replayer_thread;
+
+ class LeaderListener : public LeaderWatcher<>::Listener {
+ public:
+ LeaderListener(Replayer *replayer) : m_replayer(replayer) {
+ }
+
+ protected:
+ virtual void post_acquire_handler(Context *on_finish) {
+ m_replayer->handle_post_acquire_leader(on_finish);
+ }
+
+ virtual void pre_release_handler(Context *on_finish) {
+ m_replayer->handle_pre_release_leader(on_finish);
+ }
+
+ private:
+ Replayer *m_replayer;
+ } m_leader_listener;
+
+ std::unique_ptr<LeaderWatcher<> > m_leader_watcher;
};
} // namespace mirror