fi
}
+# Print a timestamped message on stdout and append the same line to
+# ${TEMPDIR}/rbd-mirror.test.log.
+#
+# Arguments: the words of the message.
+testlog()
+{
+    # Both expansions are quoted so the message is printed as given and is
+    # not subject to pathname expansion or re-splitting by the shell.
+    echo "$(date '+%F %T')" "$@" | tee -a "${TEMPDIR}/rbd-mirror.test.log"
+}
+
setup()
{
local c
rm -f $(daemon_pid_file "${cluster}")
}
+admin_daemon()
+{
+ local cluster=$1 ; shift
+
+ local asok_file=$(daemon_asok_file "${cluster}" "${cluster}")
+ test -S "${asok_file}"
+
+ ceph --admin-daemon ${asok_file} $@
+}
+
status()
{
local cluster daemon image
cmd="${cmd} ${POOL}/${image}"
fi
- local asok_file=$(daemon_asok_file "${cluster}" "${cluster}")
- test -S "${asok_file}"
-
- ceph --admin-daemon ${asok_file} ${cmd}
+ admin_daemon "${cluster}" ${cmd}
}
test_image_replay_state()
local test_state=$3
local current_state=stopped
- local asok_file=$(daemon_asok_file "${cluster}" "${cluster}")
- test -S "${asok_file}"
-
- ceph --admin-daemon ${asok_file} help |
+ admin_daemon "${cluster}" help |
fgrep "\"rbd mirror status ${POOL}/${image}\"" &&
- ceph --admin-daemon ${asok_file} rbd mirror status ${POOL}/${image} |
+ admin_daemon "${cluster}" rbd mirror status ${POOL}/${image} |
grep -i 'state.*Replaying' &&
current_state=started
setup
-echo "TEST: add image and test replay"
+testlog "TEST: add image and test replay"
start_mirror ${CLUSTER1}
image=test
create_image ${CLUSTER2} ${image}
test_status_in_pool_dir ${CLUSTER2} ${image} 'down+unknown'
compare_images ${image}
-echo "TEST: stop mirror, add image, start mirror and test replay"
+testlog "TEST: stop mirror, add image, start mirror and test replay"
stop_mirror ${CLUSTER1}
image1=test1
create_image ${CLUSTER2} ${image1}
test_status_in_pool_dir ${CLUSTER2} ${image1} 'down+unknown'
compare_images ${image1}
-echo "TEST: test the first image is replaying after restart"
+testlog "TEST: test the first image is replaying after restart"
write_image ${CLUSTER2} ${image} 100
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${image}
test_status_in_pool_dir ${CLUSTER1} ${image} 'up+replaying' 'master_position'
compare_images ${image}
-echo "TEST: failover and failback"
+testlog "TEST: stop/start/restart mirror via admin socket"
+admin_daemon ${CLUSTER1} rbd mirror stop
+wait_for_image_replay_stopped ${CLUSTER1} ${image}
+wait_for_image_replay_stopped ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror start
+wait_for_image_replay_started ${CLUSTER1} ${image}
+wait_for_image_replay_started ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror restart
+wait_for_image_replay_started ${CLUSTER1} ${image}
+wait_for_image_replay_started ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror stop
+wait_for_image_replay_stopped ${CLUSTER1} ${image}
+wait_for_image_replay_stopped ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror restart
+wait_for_image_replay_started ${CLUSTER1} ${image}
+wait_for_image_replay_started ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror stop ${CLUSTER2}
+wait_for_image_replay_stopped ${CLUSTER1} ${image}
+wait_for_image_replay_stopped ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror start ${POOL}/${image}
+wait_for_image_replay_started ${CLUSTER1} ${image}
+
+admin_daemon ${CLUSTER1} rbd mirror start
+wait_for_image_replay_started ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror start ${CLUSTER2}
+
+admin_daemon ${CLUSTER1} rbd mirror restart ${POOL}/${image}
+wait_for_image_replay_started ${CLUSTER1} ${image}
+
+admin_daemon ${CLUSTER1} rbd mirror restart ${CLUSTER2}
+wait_for_image_replay_started ${CLUSTER1} ${image}
+wait_for_image_replay_started ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror flush
+admin_daemon ${CLUSTER1} rbd mirror status
+
+testlog "TEST: failover and failback"
start_mirror ${CLUSTER2}
# failover
ImageReplayer<I> *replayer;
};
+// Admin socket command that manually starts one image replayer.
+template <typename I>
+class StartCommand : public ImageReplayerAdminSocketCommand {
+public:
+  explicit StartCommand(ImageReplayer<I> *image_replayer)
+    : replayer(image_replayer) {
+  }
+
+  // Requests a start with manual=true, so it is not ignored after a manual
+  // stop. Nothing is written to f or ss; always returns true.
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->start(nullptr /* on_finish */, nullptr /* bootstrap_params */,
+                    true /* manual */);
+    return true;
+  }
+
+private:
+  ImageReplayer<I> *replayer;
+};
+
+// Admin socket command that manually stops one image replayer.
+template <typename I>
+class StopCommand : public ImageReplayerAdminSocketCommand {
+public:
+  explicit StopCommand(ImageReplayer<I> *image_replayer)
+    : replayer(image_replayer) {
+  }
+
+  // Requests a stop with manual=true and no completion context.
+  // Nothing is written to f or ss; always returns true.
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->stop(nullptr /* on_finish */, true /* manual */);
+    return true;
+  }
+
+private:
+  ImageReplayer<I> *replayer;
+};
+
+// Admin socket command that restarts one image replayer.
+template <typename I>
+class RestartCommand : public ImageReplayerAdminSocketCommand {
+public:
+  explicit RestartCommand(ImageReplayer<I> *image_replayer)
+    : replayer(image_replayer) {
+  }
+
+  // Calls restart() with its default arguments.
+  // Nothing is written to f or ss; always returns true.
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->restart();
+    return true;
+  }
+
+private:
+  ImageReplayer<I> *replayer;
+};
+
template <typename I>
class FlushCommand : public ImageReplayerAdminSocketCommand {
public:
commands[command] = new StatusCommand<I>(replayer);
}
+ command = "rbd mirror start " + name;
+ r = admin_socket->register_command(command, command, this,
+ "start rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new StartCommand<I>(replayer);
+ }
+
+ command = "rbd mirror stop " + name;
+ r = admin_socket->register_command(command, command, this,
+ "stop rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new StopCommand<I>(replayer);
+ }
+
+ command = "rbd mirror restart " + name;
+ r = admin_socket->register_command(command, command, this,
+ "restart rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new RestartCommand<I>(replayer);
+ }
+
command = "rbd mirror flush " + name;
r = admin_socket->register_command(command, command, this,
"flush rbd mirror " + name);
template <typename I>
void ImageReplayer<I>::start(Context *on_finish,
- const BootstrapParams *bootstrap_params)
+ const BootstrapParams *bootstrap_params,
+ bool manual)
{
assert(m_on_start_finish == nullptr);
assert(m_on_stop_finish == nullptr);
dout(20) << "on_finish=" << on_finish << dendl;
+ int r = 0;
{
Mutex::Locker locker(m_lock);
- assert(is_stopped_());
- m_state = STATE_STARTING;
- m_last_r = 0;
- m_state_desc.clear();
- m_on_start_finish = on_finish;
+ if (!is_stopped_()) {
+ derr << "already running" << dendl;
+ r = -EINVAL;
+ } else if (m_manual_stop && !manual) {
+ dout(5) << "stopped manually, ignoring start without manual flag"
+ << dendl;
+ r = -EPERM;
+ } else {
+ m_state = STATE_STARTING;
+ m_last_r = 0;
+ m_state_desc.clear();
+ m_on_start_finish = on_finish;
+ m_manual_stop = false;
+ }
+ }
+
+ if (r < 0) {
+ if (on_finish) {
+ on_finish->complete(r);
+ }
+ return;
}
- int r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx);
+ r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx);
if (r < 0) {
derr << "error opening ioctx for remote pool " << m_remote_pool_id
<< ": " << cpp_strerror(r) << dendl;
}
template <typename I>
-void ImageReplayer<I>::stop(Context *on_finish)
+void ImageReplayer<I>::stop(Context *on_finish, bool manual)
{
dout(20) << "on_finish=" << on_finish << dendl;
bool shut_down_replay = false;
+ bool running = true;
{
Mutex::Locker locker(m_lock);
- assert(is_running_());
+ if (!is_running_()) {
+ running = false;
+ } else {
+ if (!is_stopped_()) {
+ if (m_state == STATE_STARTING) {
+ dout(20) << "interrupting start" << dendl;
+ } else {
+ dout(20) << "interrupting replay" << dendl;
+ shut_down_replay = true;
+ }
- if (!is_stopped_()) {
- if (m_state == STATE_STARTING) {
- dout(20) << "interrupting start" << dendl;
- } else {
- dout(20) << "interrupting replay" << dendl;
- shut_down_replay = true;
+ assert(m_on_stop_finish == nullptr);
+ std::swap(m_on_stop_finish, on_finish);
+ m_stop_requested = true;
+ m_manual_stop = manual;
}
+ }
+ }
- assert(m_on_stop_finish == nullptr);
- std::swap(m_on_stop_finish, on_finish);
- m_stop_requested = true;
+ if (!running) {
+ derr << "not running" << dendl;
+ if (on_finish) {
+ on_finish->complete(-EINVAL);
}
+ return;
}
if (shut_down_replay) {
replay_flush();
}
+// Stop the replayer, then start it again with manual=true.
+//
+// on_finish is handed to start(). The stop is issued with a completion
+// context that calls start() whatever result the stop reports.
+template <typename I>
+void ImageReplayer<I>::restart(Context *on_finish)
+{
+  Context *start_after_stop = new FunctionContext(
+    [this, on_finish](int /* r */) {
+      // The stop result is deliberately ignored: try start anyway.
+      start(on_finish, nullptr, true);
+    });
+
+  stop(start_after_stop);
+}
+
template <typename I>
void ImageReplayer<I>::flush(Context *on_finish)
{
void set_state_description(int r, const std::string &desc);
void start(Context *on_finish = nullptr,
- const BootstrapParams *bootstrap_params = nullptr);
- void stop(Context *on_finish = nullptr);
+ const BootstrapParams *bootstrap_params = nullptr,
+ bool manual = false);
+ void stop(Context *on_finish = nullptr, bool manual = false);
+ void restart(Context *on_finish = nullptr);
void flush(Context *on_finish = nullptr);
void print_status(Formatter *f, stringstream *ss);
librados::AioCompletion *m_update_status_comp = nullptr;
bool m_update_status_pending = false;
bool m_stop_requested = false;
+ bool m_manual_stop = false;
AdminSocketHook *m_asok_hook = nullptr;
Mirror *mirror;
};
+// Admin socket command that starts mirroring daemon-wide.
+class StartCommand : public MirrorAdminSocketCommand {
+public:
+  explicit StartCommand(Mirror *mirror_instance)
+    : mirror(mirror_instance) {
+  }
+
+  // Delegates to Mirror::start(). Nothing is written to f or ss; always
+  // returns true.
+  bool call(Formatter *f, stringstream *ss) {
+    mirror->start();
+    return true;
+  }
+
+private:
+  Mirror *mirror;
+};
+
+// Admin socket command that stops mirroring daemon-wide.
+class StopCommand : public MirrorAdminSocketCommand {
+public:
+  explicit StopCommand(Mirror *mirror_instance)
+    : mirror(mirror_instance) {
+  }
+
+  // Delegates to Mirror::stop(). Nothing is written to f or ss; always
+  // returns true.
+  bool call(Formatter *f, stringstream *ss) {
+    mirror->stop();
+    return true;
+  }
+
+private:
+  Mirror *mirror;
+};
+
+// Admin socket command that restarts mirroring daemon-wide.
+class RestartCommand : public MirrorAdminSocketCommand {
+public:
+  explicit RestartCommand(Mirror *mirror_instance)
+    : mirror(mirror_instance) {
+  }
+
+  // Delegates to Mirror::restart(). Nothing is written to f or ss; always
+  // returns true.
+  bool call(Formatter *f, stringstream *ss) {
+    mirror->restart();
+    return true;
+  }
+
+private:
+  Mirror *mirror;
+};
+
class FlushCommand : public MirrorAdminSocketCommand {
public:
explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}
commands[command] = new StatusCommand(mirror);
}
+ command = "rbd mirror start";
+ r = admin_socket->register_command(command, command, this,
+ "start rbd mirror");
+ if (r == 0) {
+ commands[command] = new StartCommand(mirror);
+ }
+
+ command = "rbd mirror stop";
+ r = admin_socket->register_command(command, command, this,
+ "stop rbd mirror");
+ if (r == 0) {
+ commands[command] = new StopCommand(mirror);
+ }
+
+ command = "rbd mirror restart";
+ r = admin_socket->register_command(command, command, this,
+ "restart rbd mirror");
+ if (r == 0) {
+ commands[command] = new RestartCommand(mirror);
+ }
+
command = "rbd mirror flush";
r = admin_socket->register_command(command, command, this,
"flush rbd mirror");
while (!m_stopping.read()) {
m_local_cluster_watcher->refresh_pools();
Mutex::Locker l(m_lock);
- update_replayers(m_local_cluster_watcher->get_peer_configs());
+ if (!m_manual_stop) {
+ update_replayers(m_local_cluster_watcher->get_peer_configs());
+ }
// TODO: make interval configurable
m_cond.WaitInterval(g_ceph_context, m_lock, seconds(30));
}
}
}
-void Mirror::flush()
+// Clear the manual-stop flag and start every known replayer.
+//
+// Does nothing when m_stopping is set. Runs with m_lock held.
+void Mirror::start()
+{
+  dout(20) << "enter" << dendl;
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read()) {
+    return;
+  }
+
+  // With the flag cleared, the run loop calls update_replayers() again.
+  m_manual_stop = false;
+
+  for (auto &peer_replayer : m_replayers) {
+    peer_replayer.second->start();
+  }
+}
+
+void Mirror::stop()
{
dout(20) << "enter" << dendl;
Mutex::Locker l(m_lock);
return;
}
+ m_manual_stop = true;
+
+ for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
+ auto &replayer = it->second;
+ replayer->stop();
+ }
+}
+
+// Clear the manual-stop flag and restart every known replayer.
+//
+// Does nothing when m_stopping is set. Runs with m_lock held.
+void Mirror::restart()
+{
+  dout(20) << "enter" << dendl;
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read()) {
+    return;
+  }
+
+  // With the flag cleared, the run loop calls update_replayers() again.
+  m_manual_stop = false;
+
+  for (auto &peer_replayer : m_replayers) {
+    peer_replayer.second->restart();
+  }
+}
+
+void Mirror::flush()
+{
+ dout(20) << "enter" << dendl;
+ Mutex::Locker l(m_lock);
+
+ if (m_stopping.read() || m_manual_stop) {
+ return;
+ }
+
for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
auto &replayer = it->second;
replayer->flush();
void handle_signal(int signum);
void print_status(Formatter *f, stringstream *ss);
+ void start();
+ void stop();
+ void restart();
void flush();
private:
std::unique_ptr<ClusterWatcher> m_local_cluster_watcher;
std::map<peer_t, std::unique_ptr<Replayer> > m_replayers;
atomic_t m_stopping;
+ bool m_manual_stop = false;
MirrorAdminSocketHook *m_asok_hook;
};
Replayer *replayer;
};
+// Admin socket command that starts all image replayers of one Replayer.
+class StartCommand : public ReplayerAdminSocketCommand {
+public:
+  explicit StartCommand(Replayer *target)
+    : replayer(target) {
+  }
+
+  // Delegates to Replayer::start(). Nothing is written to f or ss; always
+  // returns true.
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->start();
+    return true;
+  }
+
+private:
+  Replayer *replayer;
+};
+
+// Admin socket command that stops all image replayers of one Replayer.
+class StopCommand : public ReplayerAdminSocketCommand {
+public:
+  explicit StopCommand(Replayer *target)
+    : replayer(target) {
+  }
+
+  // Delegates to Replayer::stop(). Nothing is written to f or ss; always
+  // returns true.
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->stop();
+    return true;
+  }
+
+private:
+  Replayer *replayer;
+};
+
+// Admin socket command that restarts all image replayers of one Replayer.
+class RestartCommand : public ReplayerAdminSocketCommand {
+public:
+  explicit RestartCommand(Replayer *target)
+    : replayer(target) {
+  }
+
+  // Delegates to Replayer::restart(). Nothing is written to f or ss; always
+  // returns true.
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->restart();
+    return true;
+  }
+
+private:
+  Replayer *replayer;
+};
+
class FlushCommand : public ReplayerAdminSocketCommand {
public:
explicit FlushCommand(Replayer *replayer) : replayer(replayer) {}
commands[command] = new StatusCommand(replayer);
}
+ command = "rbd mirror start " + name;
+ r = admin_socket->register_command(command, command, this,
+ "start rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new StartCommand(replayer);
+ }
+
+ command = "rbd mirror stop " + name;
+ r = admin_socket->register_command(command, command, this,
+ "stop rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new StopCommand(replayer);
+ }
+
+ command = "rbd mirror restart " + name;
+ r = admin_socket->register_command(command, command, this,
+ "restart rbd mirror " + name);
+ if (r == 0) {
+ commands[command] = new RestartCommand(replayer);
+ }
+
command = "rbd mirror flush " + name;
r = admin_socket->register_command(command, command, this,
"flush rbd mirror " + name);
while (!m_stopping.read()) {
Mutex::Locker l(m_lock);
- set_sources(m_pool_watcher->get_images());
+ if (!m_manual_stop) {
+ set_sources(m_pool_watcher->get_images());
+ }
m_cond.WaitInterval(g_ceph_context, m_lock, seconds(30));
}
}
}
-void Replayer::flush()
+// Clear the manual-stop flag and manually start every image replayer held
+// in m_images.
+//
+// Does nothing when m_stopping is set. Runs with m_lock held.
+void Replayer::start()
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read()) {
+    return;
+  }
+
+  // With the flag cleared, the run loop calls set_sources() again.
+  m_manual_stop = false;
+
+  for (auto &pool_entry : m_images) {
+    for (auto &image_entry : pool_entry.second) {
+      // manual=true so replayers that were stopped manually start again.
+      image_entry.second->start(nullptr, nullptr, true);
+    }
+  }
+}
+
+void Replayer::stop()
{
dout(20) << "enter" << dendl;
return;
}
+ m_manual_stop = true;
+
+ for (auto it = m_images.begin(); it != m_images.end(); it++) {
+ auto &pool_images = it->second;
+ for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
+ auto &image_replayer = i->second;
+ image_replayer->stop(nullptr, true);
+ }
+ }
+}
+
+// Clear the manual-stop flag and restart every image replayer held in
+// m_images.
+//
+// Does nothing when m_stopping is set. Runs with m_lock held.
+void Replayer::restart()
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read()) {
+    return;
+  }
+
+  // With the flag cleared, the run loop calls set_sources() again.
+  m_manual_stop = false;
+
+  for (auto &pool_entry : m_images) {
+    for (auto &image_entry : pool_entry.second) {
+      image_entry.second->restart();
+    }
+  }
+}
+
+void Replayer::flush()
+{
+ dout(20) << "enter" << dendl;
+
+ Mutex::Locker l(m_lock);
+
+ if (m_stopping.read() || m_manual_stop) {
+ return;
+ }
+
for (auto it = m_images.begin(); it != m_images.end(); it++) {
auto &pool_images = it->second;
for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
void run();
void print_status(Formatter *f, stringstream *ss);
+ void start();
+ void stop();
+ void restart();
void flush();
private:
Mutex m_lock;
Cond m_cond;
atomic_t m_stopping;
+ bool m_manual_stop = false;
peer_t m_peer;
std::vector<const char*> m_args;