OPTION(rbd_validate_names, OPT_BOOL, true) // true if image specs should be validated
OPTION(rbd_auto_exclusive_lock_until_manual_request, OPT_BOOL, true) // whether to automatically acquire/release exclusive lock until it is explicitly requested, i.e. before we know the user of librbd is properly using the lock API
OPTION(rbd_mirroring_resync_after_disconnect, OPT_BOOL, false) // automatically start image resync after mirroring is disconnected due to being laggy
+OPTION(rbd_mirroring_replay_delay, OPT_INT, 0) // minimum time in seconds between a journal event's timestamp and its replay by rbd-mirror (values <= 0 disable the delay)
/*
* The following options change the behavior for librbd's image creation methods that
"rbd_journal_pool", false)(
"rbd_journal_max_payload_bytes", false)(
"rbd_journal_max_concurrent_object_sets", false)(
- "rbd_mirroring_resync_after_disconnect", false);
+ "rbd_mirroring_resync_after_disconnect", false)(
+ "rbd_mirroring_replay_delay", false);
md_config_t local_config_t;
std::map<std::string, bufferlist> res;
ASSIGN_OPTION(journal_max_payload_bytes);
ASSIGN_OPTION(journal_max_concurrent_object_sets);
ASSIGN_OPTION(mirroring_resync_after_disconnect);
+ ASSIGN_OPTION(mirroring_replay_delay);
}
ExclusiveLock<ImageCtx> *ImageCtx::create_exclusive_lock() {
uint32_t journal_max_payload_bytes;
int journal_max_concurrent_object_sets;
bool mirroring_resync_after_disconnect;
+ int mirroring_replay_delay;
LibrbdAdminSocketHook *asok_hook;
return r;
}
- journal::EventEntry event_entry{journal::DemoteEvent{}};
+ journal::EventEntry event_entry{journal::DemoteEvent{}, ceph_clock_now()};
bufferlist event_entry_bl;
::encode(event_entry, event_entry_bl);
event_bl.substr_of(bl, event_offset, event_length);
journal::EventEntry event_entry(journal::AioWriteEvent(offset + event_offset,
event_length,
- event_bl));
+ event_bl),
+ ceph_clock_now());
bufferlists.emplace_back();
::encode(event_entry, bufferlists.back());
uint64_t offset, size_t length,
bool flush_entry) {
bufferlist bl;
+ event_entry.timestamp = ceph_clock_now();
::encode(event_entry, bl);
return append_io_events(event_entry.get_event_type(), {bl}, requests, offset,
length, flush_entry);
assert(m_image_ctx.owner_lock.is_locked());
bufferlist bl;
+ event_entry.timestamp = ceph_clock_now();
::encode(event_entry, bl);
Future future;
ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << ", "
<< "r=" << r << dendl;
- journal::EventEntry event_entry((journal::OpFinishEvent(op_tid, r)));
+ journal::EventEntry event_entry((journal::OpFinishEvent(op_tid, r)),
+ ceph_clock_now());
bufferlist bl;
::encode(event_entry, bl);
f->dump_unsigned("length", length);
}
+/// Size in bytes of the fixed part of an encoded write event: the EventEntry
+/// fixed size plus the offset and length fields.
+uint32_t AioWriteEvent::get_fixed_size() {
+  const uint32_t offset_length_size = 16; /// offset, length
+  return EventEntry::get_fixed_size() + offset_length_size;
+}
+
void AioWriteEvent::encode(bufferlist& bl) const {
::encode(offset, bl);
::encode(length, bl);
}
void EventEntry::encode(bufferlist& bl) const {  // encodes the event, then appends the metadata section
- ENCODE_START(3, 1, bl);
+ ENCODE_START(4, 1, bl);  // v4: decoders read the trailing metadata only when struct_v >= 4
boost::apply_visitor(EncodeVisitor(bl), event);
ENCODE_FINISH(bl);
+ encode_metadata(bl);  // written after the event's ENCODE_FINISH, i.e. outside the event envelope
}
void EventEntry::decode(bufferlist::iterator& it) {
boost::apply_visitor(DecodeVisitor(struct_v, it), event);
DECODE_FINISH(it);
+ if (struct_v >= 4) {
+ decode_metadata(it);
+ }
}
void EventEntry::dump(Formatter *f) const {  // dumps the event via DumpVisitor, then the entry timestamp
boost::apply_visitor(DumpVisitor(f, "event_type"), event);
+ f->dump_stream("timestamp") << timestamp;  // same timestamp that encode_metadata() writes
+}
+
+void EventEntry::encode_metadata(bufferlist& bl) const {  // appends the metadata section (currently only the timestamp) to bl
+ ENCODE_START(1, 1, bl);  // metadata carries its own version, separate from the event's
+ ::encode(timestamp, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EventEntry::decode_metadata(bufferlist::iterator& it) {  // reads the metadata section written by encode_metadata()
+ DECODE_START(1, it);
+ ::decode(timestamp, it);  // only field in metadata version 1
+ DECODE_FINISH(it);
}
void EventEntry::generate_test_instances(std::list<EventEntry *> &o) {  // appends heap-allocated sample entries to o
o.push_back(new EventEntry(AioDiscardEvent()));  // no second argument: timestamp takes the constructor default
- o.push_back(new EventEntry(AioDiscardEvent(123, 345)));
+ o.push_back(new EventEntry(AioDiscardEvent(123, 345), utime_t(1, 1)));  // explicit, fixed timestamp
bufferlist bl;
bl.append(std::string(32, '1'));  // 32-byte payload for the write event below
o.push_back(new EventEntry(AioWriteEvent()));
- o.push_back(new EventEntry(AioWriteEvent(123, 456, bl)));
+ o.push_back(new EventEntry(AioWriteEvent(123, 456, bl), utime_t(1, 1)));
o.push_back(new EventEntry(AioFlushEvent()));
- o.push_back(new EventEntry(OpFinishEvent(123, -1)));
+ o.push_back(new EventEntry(OpFinishEvent(123, -1), utime_t(1, 1)));
- o.push_back(new EventEntry(SnapCreateEvent()));
+ o.push_back(new EventEntry(SnapCreateEvent(), utime_t(1, 1)));
o.push_back(new EventEntry(SnapCreateEvent(234, "snap",
- cls::rbd::UserSnapshotNamespace())));
+ cls::rbd::UserSnapshotNamespace()),
+ utime_t(1, 1)));
o.push_back(new EventEntry(SnapRemoveEvent()));
- o.push_back(new EventEntry(SnapRemoveEvent(345, "snap")));
+ o.push_back(new EventEntry(SnapRemoveEvent(345, "snap"), utime_t(1, 1)));
o.push_back(new EventEntry(SnapRenameEvent()));
- o.push_back(new EventEntry(SnapRenameEvent(456, 1, "src snap", "dest snap")));
+ o.push_back(new EventEntry(SnapRenameEvent(456, 1, "src snap", "dest snap"),
+ utime_t(1, 1)));
o.push_back(new EventEntry(SnapProtectEvent()));
- o.push_back(new EventEntry(SnapProtectEvent(567, "snap")));
+ o.push_back(new EventEntry(SnapProtectEvent(567, "snap"), utime_t(1, 1)));
o.push_back(new EventEntry(SnapUnprotectEvent()));
- o.push_back(new EventEntry(SnapUnprotectEvent(678, "snap")));
+ o.push_back(new EventEntry(SnapUnprotectEvent(678, "snap"), utime_t(1, 1)));
o.push_back(new EventEntry(SnapRollbackEvent()));
- o.push_back(new EventEntry(SnapRollbackEvent(789, "snap")));
+ o.push_back(new EventEntry(SnapRollbackEvent(789, "snap"), utime_t(1, 1)));
o.push_back(new EventEntry(RenameEvent()));
- o.push_back(new EventEntry(RenameEvent(890, "image name")));
+ o.push_back(new EventEntry(RenameEvent(890, "image name"), utime_t(1, 1)));
o.push_back(new EventEntry(ResizeEvent()));
- o.push_back(new EventEntry(ResizeEvent(901, 1234)));
+ o.push_back(new EventEntry(ResizeEvent(901, 1234), utime_t(1, 1)));
- o.push_back(new EventEntry(FlattenEvent(123)));
+ o.push_back(new EventEntry(FlattenEvent(123), utime_t(1, 1)));
o.push_back(new EventEntry(DemoteEvent()));
o.push_back(new EventEntry(UpdateFeaturesEvent()));
- o.push_back(new EventEntry(UpdateFeaturesEvent(123, 127, true)));
+ o.push_back(new EventEntry(UpdateFeaturesEvent(123, 127, true), utime_t(1, 1)));
o.push_back(new EventEntry(MetadataSetEvent()));
- o.push_back(new EventEntry(MetadataSetEvent(123, "key", "value")));
+ o.push_back(new EventEntry(MetadataSetEvent(123, "key", "value"), utime_t(1, 1)));
o.push_back(new EventEntry(MetadataRemoveEvent()));
- o.push_back(new EventEntry(MetadataRemoveEvent(123, "key")));
+ o.push_back(new EventEntry(MetadataRemoveEvent(123, "key"), utime_t(1, 1)));
}
// Journal Client
#include "include/buffer.h"
#include "include/encoding.h"
#include "include/types.h"
+#include "include/utime.h"
#include <iosfwd>
#include <list>
#include <boost/none.hpp>
uint64_t length;
bufferlist data;
- static uint32_t get_fixed_size() {
- return 30; /// version encoding, type, offset, length
- }
+ static uint32_t get_fixed_size();
AioWriteEvent() : offset(0), length(0) {
}
UnknownEvent> Event;
struct EventEntry {
+ static uint32_t get_fixed_size() {  // fixed bytes per encoded entry: event part plus metadata part
+ return EVENT_FIXED_SIZE + METADATA_FIXED_SIZE;
+ }
+
EventEntry() : event(UnknownEvent()) {
}
- EventEntry(const Event &_event) : event(_event) {
+ EventEntry(const Event &_event, const utime_t &_timestamp = utime_t())  // timestamp is optional; omitted => default-constructed utime_t
+ : event(_event), timestamp(_timestamp) {
}
Event event;
+ utime_t timestamp;
EventType get_event_type() const;
void dump(Formatter *f) const;
static void generate_test_instances(std::list<EventEntry *> &o);
+
+private:
+ static const uint32_t EVENT_FIXED_SIZE = 14; /// version encoding, type
+ static const uint32_t METADATA_FIXED_SIZE = 14; /// version encoding, timestamp
+
+ void encode_metadata(bufferlist& bl) const;
+ void decode_metadata(bufferlist::iterator& it);
};
// Journal Client data structures
journal_max_concurrent_object_sets(
image_ctx.journal_max_concurrent_object_sets),
mirroring_resync_after_disconnect(
- image_ctx.mirroring_resync_after_disconnect)
+ image_ctx.mirroring_resync_after_disconnect),
+ mirroring_replay_delay(image_ctx.mirroring_replay_delay)
{
md_ctx.dup(image_ctx.md_ctx);
data_ctx.dup(image_ctx.data_ctx);
uint32_t journal_max_payload_bytes;
int journal_max_concurrent_object_sets;
bool mirroring_resync_after_disconnect;
+ int mirroring_replay_delay;
};
} // namespace librbd
stop();
}
+
+// Checks that rbd_mirroring_replay_delay postpones replay of remote writes:
+// first for an uninterrupted replay, then when the replayer is stopped and
+// restarted half way through the delay.
+TEST_F(TestImageReplayer, MirroringDelay)
+{
+  const double DELAY = 10; // set less than wait_for_replay_complete timeout
+
+  librbd::ImageCtx *ictx;
+  utime_t start_time;
+  double delay;
+
+  bootstrap();
+
+  // the option must be picked up by the local image context
+  ASSERT_EQ(0, m_local_cluster->conf_set("rbd_mirroring_replay_delay",
+                                         stringify(DELAY).c_str()));
+  open_local_image(&ictx);
+  ASSERT_EQ(DELAY, ictx->mirroring_replay_delay);
+  close_image(ictx);
+
+  start();
+
+  // Test delay
+
+  generate_test_data();
+  open_remote_image(&ictx);
+  start_time = ceph_clock_now();
+  for (int i = 0; i < TEST_IO_COUNT; ++i) {
+    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
+  }
+  flush(ictx);
+  close_image(ictx);
+
+  // replay must not complete before the configured delay has elapsed
+  wait_for_replay_complete();
+  delay = ceph_clock_now() - start_time;
+  ASSERT_GE(delay, DELAY);
+
+  // Test stop when delaying replay
+
+  open_remote_image(&ictx);
+  start_time = ceph_clock_now();
+  for (int i = 0; i < TEST_IO_COUNT; ++i) {
+    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
+  }
+  // flush and close the remote image exactly as in the first phase; without
+  // this the image context opened above is never released
+  flush(ictx);
+  close_image(ictx);
+
+  // interrupt the replayer while the entries are still being delayed
+  sleep(DELAY / 2);
+  stop();
+  start();
+
+  // the delay is measured from the write time, so it still applies after
+  // the restart
+  wait_for_replay_complete();
+  delay = ceph_clock_now() - start_time;
+  ASSERT_GE(delay, DELAY);
+
+  stop();
+}
ASSERT_EQ(0, close_ctx.wait());
}
+TEST_F(TestMockImageReplayer, DelayedReplay) {  // replays one entry with a replay delay set, then stops while a second entry is still delayed
+
+ // START
+
+ create_local_image();
+ librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
+
+ librbd::MockTestJournal mock_local_journal;
+ mock_local_image_ctx.journal = &mock_local_journal;
+
+ journal::MockJournaler mock_remote_journaler;
+ MockBootstrapRequest mock_bootstrap_request;
+ MockReplay mock_local_replay;
+ MockEventPreprocessor mock_event_preprocessor;
+ MockReplayStatusFormatter mock_replay_status_formatter;
+ ::journal::MockReplayEntry mock_replay_entry;
+
+ expect_get_or_send_update(mock_replay_status_formatter);
+ expect_get_commit_tid_in_debug(mock_replay_entry);
+ expect_committed(mock_remote_journaler, 1);
+
+ InSequence seq;  // expectations set from here on must be satisfied in this order
+ EXPECT_CALL(mock_remote_journaler, construct());
+ expect_send(mock_bootstrap_request, mock_local_image_ctx, false, 0);
+
+ EXPECT_CALL(mock_local_journal, add_listener(_));
+
+ expect_init(mock_remote_journaler, 0);
+
+ EXPECT_CALL(mock_remote_journaler, add_listener(_));
+ expect_get_cached_client(mock_remote_journaler, 0);
+
+ expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
+
+ EXPECT_CALL(mock_remote_journaler, start_live_replay(_, _));
+
+ C_SaferCond start_ctx;
+ m_image_replayer->start(&start_ctx);
+ ASSERT_EQ(0, start_ctx.wait());
+
+ // REPLAY
+
+ cls::journal::Tag tag =
+ {1, 0, encode_tag_data({librbd::Journal<>::LOCAL_MIRROR_UUID,
+ librbd::Journal<>::LOCAL_MIRROR_UUID,
+ true, 0, 0})};
+
+ expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+
+ // replay_flush
+ expect_shut_down(mock_local_replay, false, 0);
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_start_external_replay(mock_local_journal, &mock_local_replay, 0);
+ expect_get_tag(mock_remote_journaler, tag, 0);
+ expect_allocate_tag(mock_local_journal, 0);
+
+ // process with delay
+ EXPECT_CALL(mock_replay_entry, get_data());
+ librbd::journal::EventEntry event_entry(
+ librbd::journal::AioDiscardEvent(123, 345), ceph_clock_now());  // stamped "now", so a positive delay applies to it
+ EXPECT_CALL(mock_local_replay, decode(_, _))
+ .WillOnce(DoAll(SetArgPointee<1>(event_entry),
+ Return(0)));
+ expect_preprocess(mock_event_preprocessor, false, 0);
+ expect_process(mock_local_replay, 0, 0);
+
+ // attempt to process the next event
+ C_SaferCond replay_ctx;
+ expect_try_pop_front_return_no_entries(mock_remote_journaler, &replay_ctx);
+
+ // fire
+ mock_local_image_ctx.mirroring_replay_delay = 2;  // short delay: this entry is expected to be processed within the test
+ m_image_replayer->handle_replay_ready();
+ ASSERT_EQ(0, replay_ctx.wait());  // presumably completed by the no-entries pop expectation above -- confirm in the helper
+
+ // add a pending (delayed) entry before stop
+ expect_try_pop_front(mock_remote_journaler, tag.tid, true);
+ EXPECT_CALL(mock_replay_entry, get_data());
+ C_SaferCond decode_ctx;
+ EXPECT_CALL(mock_local_replay, decode(_, _))
+ .WillOnce(DoAll(Invoke([&decode_ctx](bufferlist::iterator* it,
+ librbd::journal::EventEntry *e) {
+ decode_ctx.complete(0);  // signals the test thread that the second entry reached decode
+ }),
+ Return(0)));
+
+ mock_local_image_ctx.mirroring_replay_delay = 10;  // longer delay; no preprocess/process expectation is set for this entry
+ m_image_replayer->handle_replay_ready();
+ ASSERT_EQ(0, decode_ctx.wait());
+
+ // STOP
+
+ MockCloseImageRequest mock_close_local_image_request;
+
+ expect_stop_replay(mock_remote_journaler, 0);
+ expect_shut_down(mock_local_replay, true, 0);
+
+ EXPECT_CALL(mock_local_journal, remove_listener(_));
+ EXPECT_CALL(mock_local_journal, stop_external_replay());
+
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, 0);
+
+ expect_send(mock_close_local_image_request, 0);
+
+ C_SaferCond stop_ctx;
+ m_image_replayer->stop(&stop_ctx);
+ ASSERT_EQ(0, stop_ctx.wait());
+}
+
} // namespace mirror
} // namespace rbd
Commands commands;
};
+/// Returns how many seconds replay of an event stamped with event_time must
+/// still be postponed so that it is at least mirroring_replay_delay seconds
+/// old, or 0 when it may be replayed immediately.
+uint32_t calculate_replay_delay(const utime_t &event_time,
+                                int mirroring_replay_delay) {
+  // a non-positive setting disables the delay; the clock is not consulted
+  if (mirroring_replay_delay > 0) {
+    utime_t current_time = ceph_clock_now();
+    auto replay_time = event_time + mirroring_replay_delay;
+    if (!(replay_time <= current_time)) {
+      // the extra second rounds the remainder up once it is converted to an
+      // integer
+      return (replay_time - current_time) + 1;
+    }
+  }
+
+  // delay disabled, or the event is already old enough
+  return 0;
+}
+
} // anonymous namespace
template <typename I>
image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
bool shut_down_replay = false;
bool running = true;
+ bool canceled_task = false;
{
Mutex::Locker locker(m_lock);
std::swap(m_on_stop_finish, on_finish);
m_stop_requested = true;
m_manual_stop = manual;
+
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ if (m_delayed_preprocess_task != nullptr) {
+ canceled_task = m_threads->timer->cancel_event(
+ m_delayed_preprocess_task);
+ assert(canceled_task);
+ m_delayed_preprocess_task = nullptr;
+ }
}
}
}
bootstrap_request->put();
}
+ if (canceled_task) {
+ m_event_replay_tracker.finish_op();
+ on_replay_interrupted();
+ }
+
if (!running) {
dout(20) << "not running" << dendl;
if (on_finish) {
return;
}
+ uint32_t delay = calculate_replay_delay(
+ m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
+ if (delay == 0) {
+ handle_preprocess_entry_ready(0);
+ return;
+ }
+
+ dout(20) << "delaying replay by " << delay << " sec" << dendl;
+
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ assert(m_delayed_preprocess_task == nullptr);
+ m_delayed_preprocess_task = new FunctionContext(
+ [this](int r) {
+ assert(m_threads->timer_lock.is_locked());
+ m_delayed_preprocess_task = nullptr;
+ m_threads->work_queue->queue(
+ create_context_callback<ImageReplayer,
+ &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
+ });
+ m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {  // entered directly when no delay is needed, or from the work queue once the delay timer fires
+ dout(20) << "r=" << r << dendl;
+ assert(r == 0);  // both the direct call and the queued callback pass 0
+
if (!m_event_preprocessor->is_required(m_event_entry)) {
process_entry();  // no preprocessing needed for this entry
return;
}
Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry>(this);
+ ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
m_event_preprocessor->preprocess(&m_event_entry, ctx);  // completion is delivered to handle_preprocess_entry_safe
}
template <typename I>
-void ImageReplayer<I>::handle_preprocess_entry(int r) {
+void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
dout(20) << "r=" << r << dendl;
if (r < 0) {
std::swap(on_start, m_on_start_finish);
std::swap(on_stop, m_on_stop_finish);
m_stop_requested = false;
+ assert(m_delayed_preprocess_task == nullptr);
assert(m_state == STATE_STOPPING);
m_state = STATE_STOPPED;
}
librbd::journal::TagData m_replay_tag_data;
librbd::journal::EventEntry m_event_entry;
AsyncOpTracker m_event_replay_tracker;
+ Context *m_delayed_preprocess_task = nullptr;
struct RemoteJournalerListener : public ::journal::JournalMetadataListener {
ImageReplayer *replayer;
void handle_allocate_local_tag(int r);
void preprocess_entry();
- void handle_preprocess_entry(int r);
+ void handle_preprocess_entry_ready(int r);
+ void handle_preprocess_entry_safe(int r);
void process_entry();
void handle_process_entry_ready(int r);