test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position'
compare_images ${POOL} ${image}
+testlog "TEST: client disconnect"
+image=laggy
+create_image ${CLUSTER2} ${POOL} ${image} 128 --journal-object-size 64K
+write_image ${CLUSTER2} ${POOL} ${image} 10
+
+testlog " - replay stopped after disconnect"
+wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
+wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
+test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
+disconnect_image ${CLUSTER2} ${POOL} ${image}
+test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
+wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
+test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected'
+
+testlog " - replay started after resync requested"
+request_resync_image ${CLUSTER1} ${POOL} ${image}
+wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted'
+wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
+wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
+test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
+compare_images ${POOL} ${image}
+
+testlog " - disconnected after max_concurrent_object_sets reached"
+admin_daemon ${CLUSTER1} rbd mirror stop ${POOL}/${image}
+wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
+test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
+set_image_meta ${CLUSTER2} ${POOL} ${image} \
+ conf_rbd_journal_max_concurrent_object_sets 1
+write_image ${CLUSTER2} ${POOL} ${image} 20 16384
+write_image ${CLUSTER2} ${POOL} ${image} 20 16384
+test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
+set_image_meta ${CLUSTER2} ${POOL} ${image} \
+ conf_rbd_journal_max_concurrent_object_sets 0
+
+testlog " - replay is still stopped (disconnected) after restart"
+admin_daemon ${CLUSTER1} rbd mirror start ${POOL}/${image}
+wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
+test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected'
+
+testlog " - replay started after resync requested"
+request_resync_image ${CLUSTER1} ${POOL} ${image}
+wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted'
+wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
+wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
+test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
+compare_images ${POOL} ${image}
+
echo OK
local status_log=${TEMPDIR}/${CLUSTER2}-${pool}-${image}.status
rbd --cluster ${cluster} -p ${pool} journal status --image ${image} |
tee ${status_log} >&2
- sed -nEe 's/^.*\[id='"${id_regexp}"',.*positions=\[\[([^]]*)\],.*$/\1/p' \
+ sed -nEe 's/^.*\[id='"${id_regexp}"',.*positions=\[\[([^]]*)\],.*state=connected.*$/\1/p' \
${status_log}
}
}
create_image()
+{
+ local cluster=$1 ; shift
+ local pool=$1 ; shift
+ local image=$1 ; shift
+ local size=128
+
+ if [ -n "$1" ]; then
+ size=$1
+ shift
+ fi
+
+ rbd --cluster ${cluster} -p ${pool} create --size ${size} \
+ --image-feature layering,exclusive-lock,journaling $@ ${image}
+}
+
+set_image_meta()
{
local cluster=$1
local pool=$2
local image=$3
+ local key=$4
+ local val=$5
- rbd --cluster ${cluster} -p ${pool} create --size 128 \
- --image-feature layering,exclusive-lock,journaling ${image}
+ rbd --cluster ${cluster} -p ${pool} image-meta set ${image} $key $val
}
remove_image()
${clone_pool}/${clone_image} --image-feature layering,exclusive-lock,journaling
}
+disconnect_image()
+{
+ local cluster=$1
+ local pool=$2
+ local image=$3
+
+ rbd --cluster ${cluster} -p ${pool} journal client disconnect \
+ --image ${image}
+}
+
create_snapshot()
{
local cluster=$1
local pool=$2
local image=$3
local count=$4
+ local size=$5
+
+ test -n "${size}" || size=4096
rbd --cluster ${cluster} -p ${pool} bench-write ${image} \
- --io-size 4096 --io-threads 1 --io-total $((4096 * count)) \
+ --io-size ${size} --io-threads 1 --io-total $((size * count)) \
--io-pattern rand
}
Context*));
MOCK_METHOD2(register_client, void(const bufferlist &, Context *));
+ MOCK_METHOD1(unregister_client, void(Context *));
MOCK_METHOD3(get_client, void(const std::string &, cls::journal::Client *,
Context *));
MOCK_METHOD2(get_cached_client, int(const std::string&, cls::journal::Client*));
int register_client(const bufferlist &data) {
return -EINVAL;
}
- void unregister_client(Context *ctx) {
- ctx->complete(-EINVAL);
- }
void allocate_tag(uint64_t, const bufferlist &,
cls::journal::Tag*, Context *on_finish) {
MockJournaler::get_instance().register_client(data, on_finish);
}
+ void unregister_client(Context *on_finish) {
+ MockJournaler::get_instance().unregister_client(on_finish);
+ }
+
void get_client(const std::string &client_id, cls::journal::Client *client,
Context *on_finish) {
MockJournaler::get_instance().get_client(client_id, client, on_finish);
std::set<cls::journal::Client>::const_iterator c;
for (c = registered_clients.begin(); c != registered_clients.end(); c++) {
std::cout << __func__ << ": client: " << *c << std::endl;
+ if (c->state != cls::journal::CLIENT_STATE_CONNECTED) {
+ continue;
+ }
cls::journal::ObjectPositions object_positions =
c->commit_position.object_positions;
cls::journal::ObjectPositions::const_iterator p =
close_image(ictx);
}
+TEST_F(TestImageReplayer, Disconnect)
+{
+ bootstrap();
+
+ // Test start fails if disconnected
+
+ librbd::ImageCtx *ictx;
+
+ generate_test_data();
+ open_remote_image(&ictx);
+ for (int i = 0; i < TEST_IO_COUNT; ++i) {
+ write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
+ }
+ flush(ictx);
+ close_image(ictx);
+
+ std::string oid = ::journal::Journaler::header_oid(m_remote_image_id);
+ ASSERT_EQ(0, cls::journal::client::client_update_state(m_remote_ioctx, oid,
+ m_local_mirror_uuid, cls::journal::CLIENT_STATE_DISCONNECTED));
+
+ C_SaferCond cond1;
+ m_replayer->start(&cond1);
+ ASSERT_EQ(-ENOTCONN, cond1.wait());
+
+ // Test start succeeds after resync
+
+ open_local_image(&ictx);
+ librbd::Journal<>::request_resync(ictx);
+ close_image(ictx);
+ C_SaferCond cond2;
+ m_replayer->start(&cond2);
+ ASSERT_EQ(-ENOTCONN, cond2.wait());
+ C_SaferCond delete_cond;
+ m_image_deleter->wait_for_scheduled_deletion(
+ m_local_ioctx.get_id(), m_replayer->get_global_image_id(), &delete_cond);
+ EXPECT_EQ(0, delete_cond.wait());
+
+ start();
+ wait_for_replay_complete();
+
+ // Test replay stopped after disconnect
+
+ open_remote_image(&ictx);
+ for (int i = TEST_IO_COUNT; i < 2 * TEST_IO_COUNT; ++i) {
+ write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
+ }
+ flush(ictx);
+ close_image(ictx);
+
+ ASSERT_EQ(0, cls::journal::client::client_update_state(m_remote_ioctx, oid,
+ m_local_mirror_uuid, cls::journal::CLIENT_STATE_DISCONNECTED));
+ bufferlist bl;
+ ASSERT_EQ(0, m_remote_ioctx.notify2(oid, bl, 5000, NULL));
+
+ wait_for_stopped();
+
+ // Test start fails after disconnect
+
+ C_SaferCond cond3;
+ m_replayer->start(&cond3);
+ ASSERT_EQ(-ENOTCONN, cond3.wait());
+ C_SaferCond cond4;
+ m_replayer->start(&cond4);
+ ASSERT_EQ(-ENOTCONN, cond4.wait());
+}
}
}
+template <typename I>
+void ImageReplayer<I>::RemoteJournalerListener::handle_update(
+ ::journal::JournalMetadata *) {
+ FunctionContext *ctx = new FunctionContext([this](int r) {
+ replayer->handle_remote_journal_metadata_updated();
+ });
+ replayer->m_threads->work_queue->queue(ctx, 0);
+}
+
template <typename I>
ImageReplayer<I>::ImageReplayer(Threads *threads,
shared_ptr<ImageDeleter> image_deleter,
m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " +
remote_image_id),
m_progress_cxt(this),
- m_resync_listener(new ResyncListener<I>(this))
+ m_resync_listener(new ResyncListener<I>(this)),
+ m_remote_listener(this)
{
// Register asok commands using a temporary "remote_pool_name/global_image_id"
// name. When the image name becomes known on start the asok commands will be
return;
}
+ m_remote_journaler->add_listener(&m_remote_listener);
+
+ cls::journal::Client client;
+ r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
+ if (r < 0) {
+ derr << "error retrieving remote journal client: " << cpp_strerror(r)
+ << dendl;
+ on_start_fail(r, "error retrieving remote journal client");
+ return;
+ }
+
+ if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
+ dout(5) << "client flagged disconnected, stopping image replay" << dendl;
+ on_start_fail(-ENOTCONN, "disconnected");
+ return;
+ }
+
start_replay();
}
}
template <typename I>
-void ImageReplayer<I>::stop(Context *on_finish, bool manual)
+void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
+ const std::string& desc)
{
- dout(20) << "on_finish=" << on_finish << dendl;
+ dout(20) << "on_finish=" << on_finish << ", manual=" << manual
+ << ", desc=" << desc << dendl;
image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
bool shut_down_replay = false;
bool running = true;
{
Mutex::Locker locker(m_lock);
+
if (!is_running_()) {
running = false;
} else {
}
if (shut_down_replay) {
- on_stop_journal_replay();
+ on_stop_journal_replay(r, desc);
} else if (on_finish != nullptr) {
on_finish->complete(0);
}
}
template <typename I>
-void ImageReplayer<I>::on_stop_journal_replay()
+void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
{
dout(20) << "enter" << dendl;
m_state = STATE_STOPPING;
}
- set_state_description(0, "");
+ set_state_description(r, desc);
update_mirror_image_status(false, boost::none);
reschedule_update_status_task(-1);
shut_down(0);
ctx->complete(0);
});
ctx = new FunctionContext([this, ctx](int r) {
+ m_remote_journaler->remove_listener(&m_remote_listener);
m_remote_journaler->shut_down(ctx);
});
if (m_stopping_for_resync) {
}
}
+template <typename I>
+void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
+ dout(20) << dendl;
+
+ cls::journal::Client client;
+ {
+ Mutex::Locker locker(m_lock);
+ if (!is_running_()) {
+ return;
+ }
+
+ int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
+ if (r < 0) {
+ derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
+ return;
+ }
+ }
+
+ if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
+ dout(0) << "client flagged disconnected, stopping image replay" << dendl;
+ stop(nullptr, false, -ENOTCONN, "disconnected");
+ }
+}
+
template <typename I>
std::string ImageReplayer<I>::to_string(const State state) {
switch (state) {
#include "include/rados/librados.hpp"
#include "cls/journal/cls_journal_types.h"
#include "cls/rbd/cls_rbd_types.h"
+#include "journal/JournalMetadataListener.h"
#include "journal/ReplayEntry.h"
#include "librbd/ImageCtx.h"
#include "librbd/journal/Types.h"
}
void start(Context *on_finish = nullptr, bool manual = false);
- void stop(Context *on_finish = nullptr, bool manual = false);
+ void stop(Context *on_finish = nullptr, bool manual = false,
+ int r = 0, const std::string& desc = "");
void restart(Context *on_finish = nullptr);
void flush(Context *on_finish = nullptr);
virtual void on_start_fail(int r, const std::string &desc = "");
virtual bool on_start_interrupted();
- virtual void on_stop_journal_replay();
+ virtual void on_stop_journal_replay(int r = 0, const std::string &desc = "");
virtual void on_flush_local_replay_flush_start(Context *on_flush);
virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r);
librbd::journal::TagData m_replay_tag_data;
librbd::journal::EventEntry m_event_entry;
+ struct RemoteJournalerListener : public ::journal::JournalMetadataListener {
+ ImageReplayer *replayer;
+
+ RemoteJournalerListener(ImageReplayer *replayer) : replayer(replayer) { }
+
+ void handle_update(::journal::JournalMetadata *);
+ } m_remote_listener;
+
struct C_ReplayCommitted : public Context {
ImageReplayer *replayer;
ReplayEntry replay_entry;
void shut_down(int r);
void handle_shut_down(int r);
+ void handle_remote_journal_metadata_updated();
void bootstrap();
void handle_bootstrap(int r);
m_ret_val = r;
close_remote_image();
return;
+ } if (m_client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
+ dout(10) << ": client flagged disconnected -- skipping bootstrap" << dendl;
+ // The caller is expected to detect disconnect initializing remote journal.
+ m_ret_val = 0;
+ close_remote_image();
+ return;
}
update_client_image();