m_journaler->flush_commit_position(on_finish);
}
+template <typename I>
+uint64_t Journal<I>::append_write_event(AioCompletion *aio_comp,
+ uint64_t offset, size_t length,
+ const bufferlist &bl,
+ const AioObjectRequests &requests,
+ bool flush_entry) {
+ assert(m_image_ctx.owner_lock.is_locked());
+
+ assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
+ uint64_t max_write_data_size =
+ m_max_append_size - journal::AioWriteEvent::get_fixed_size();
+
+ // ensure that the write event fits within the journal entry
+ Bufferlists bufferlists;
+ uint64_t bytes_remaining = length;
+ uint64_t event_offset = 0;
+ do {
+ uint64_t event_length = MIN(bytes_remaining, max_write_data_size);
+
+ bufferlist event_bl;
+ event_bl.substr_of(bl, event_offset, event_length);
+ journal::EventEntry event_entry(journal::AioWriteEvent(offset + event_offset,
+ event_length,
+ event_bl));
+
+ bufferlists.emplace_back();
+ ::encode(event_entry, bufferlists.back());
+
+ event_offset += event_length;
+ bytes_remaining -= event_length;
+ } while (bytes_remaining > 0);
+
+ return append_io_events(aio_comp, journal::EVENT_TYPE_AIO_WRITE, bufferlists,
+ requests, offset, length, flush_entry);
+}
+
template <typename I>
uint64_t Journal<I>::append_io_event(AioCompletion *aio_comp,
journal::EventEntry &&event_entry,
bufferlist bl;
::encode(event_entry, bl);
+ return append_io_events(aio_comp, event_entry.get_event_type(), {bl},
+ requests, offset, length, flush_entry);
+}
- Future future;
+template <typename I>
+uint64_t Journal<I>::append_io_events(AioCompletion *aio_comp,
+ journal::EventType event_type,
+ const Bufferlists &bufferlists,
+ const AioObjectRequests &requests,
+ uint64_t offset, size_t length,
+ bool flush_entry) {
+ assert(m_image_ctx.owner_lock.is_locked());
+ assert(!bufferlists.empty());
+
+ Futures futures;
uint64_t tid;
{
Mutex::Locker locker(m_lock);
tid = ++m_event_tid;
assert(tid != 0);
- future = m_journaler->append(m_tag_tid, bl);
- m_events[tid] = Event(future, aio_comp, requests, offset, length);
+ for (auto &bl : bufferlists) {
+ assert(bl.length() <= m_max_append_size);
+ futures.push_back(m_journaler->append(m_tag_tid, bl));
+ }
+ m_events[tid] = Event(futures, aio_comp, requests, offset, length);
}
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": "
- << "event=" << event_entry.get_event_type() << ", "
+ << "event=" << event_type << ", "
<< "new_reqs=" << requests.size() << ", "
<< "offset=" << offset << ", "
<< "length=" << length << ", "
Context *on_safe = create_async_context_callback(
m_image_ctx, new C_IOEventSafe(this, tid));
if (flush_entry) {
- future.flush(on_safe);
+ futures.back().flush(on_safe);
} else {
- future.wait(on_safe);
+ futures.back().wait(on_safe);
}
return tid;
}
event.on_safe_contexts.push_back(create_async_context_callback(m_image_ctx,
on_safe));
- return event.future;
+ return event.futures.back();
}
template <typename I>
event.committed_io = true;
if (event.safe) {
if (r >= 0) {
- m_journaler->committed(event.future);
+ for (auto &future : event.futures) {
+ m_journaler->committed(future);
+ }
}
m_events.erase(it);
}
return;
}
+ m_max_append_size = m_journaler->get_max_append_size();
+ ldout(cct, 20) << this << " max_append_size=" << m_max_append_size << dendl;
+
// locate the master image client record
cls::journal::Client client;
r = m_journaler->get_cached_client(Journal<ImageCtx>::IMAGE_CLIENT_ID,
// failed journal write so IO won't be sent -- or IO extent was
// overwritten by future IO operations so this was a no-op IO event
event.ret_val = r;
- m_journaler->committed(event.future);
+ for (auto &future : event.futures) {
+ m_journaler->committed(future);
+ }
}
if (event.committed_io) {
void flush_commit_position(Context *on_finish);
+ uint64_t append_write_event(AioCompletion *aio_comp,
+ uint64_t offset, size_t length,
+ const bufferlist &bl,
+ const AioObjectRequests &requests,
+ bool flush_entry);
uint64_t append_io_event(AioCompletion *aio_comp,
journal::EventEntry &&event_entry,
const AioObjectRequests &requests,
typedef typename TypeTraits::Future Future;
typedef typename TypeTraits::ReplayEntry ReplayEntry;
+ typedef std::list<bufferlist> Bufferlists;
typedef std::list<Context *> Contexts;
+ typedef std::list<Future> Futures;
typedef interval_set<uint64_t> ExtentInterval;
struct Event {
- Future future;
+ Futures futures;
AioCompletion *aio_comp = nullptr;
AioObjectRequests aio_object_requests;
Contexts on_safe_contexts;
Event() {
}
- Event(const Future &_future, AioCompletion *_aio_comp,
+ Event(const Futures &_futures, AioCompletion *_aio_comp,
const AioObjectRequests &_requests, uint64_t offset, size_t length)
- : future(_future), aio_comp(_aio_comp), aio_object_requests(_requests) {
+ : futures(_futures), aio_comp(_aio_comp), aio_object_requests(_requests) {
if (length > 0) {
pending_extents.insert(offset, length);
}
Journaler *m_journaler;
mutable Mutex m_lock;
State m_state;
+ uint64_t m_max_append_size = 0;
uint64_t m_tag_class = 0;
uint64_t m_tag_tid = 0;
journal::TagData m_tag_data;
journal::Replay<ImageCtxT> *m_journal_replay;
+ uint64_t append_io_events(AioCompletion *aio_comp,
+ journal::EventType event_type,
+ const Bufferlists &bufferlists,
+ const AioObjectRequests &requests,
+ uint64_t offset, size_t length, bool flush_entry);
Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
void create_journaler();
.WillOnce(CompleteContext(r, NULL));
}
+ void expect_get_max_append_size(::journal::MockJournaler &mock_journaler,
+ uint32_t max_size) {
+ EXPECT_CALL(mock_journaler, get_max_append_size())
+ .WillOnce(Return(max_size));
+ }
+
void expect_get_journaler_cached_client(::journal::MockJournaler &mock_journaler, int r) {
journal::ImageClientMeta image_client_meta;
return ctx.wait();
}
+ uint64_t when_append_write_event(MockJournalImageCtx &mock_image_ctx,
+ MockJournal &mock_journal, uint64_t length) {
+ bufferlist bl;
+ bl.append_zero(length);
+
+ RWLock::RLocker owner_locker(mock_image_ctx.owner_lock);
+ return mock_journal.append_write_event(nullptr, 0, length, bl, {}, false);
+ }
+
uint64_t when_append_io_event(MockJournalImageCtx &mock_image_ctx,
MockJournal &mock_journal,
AioCompletion *aio_comp = nullptr) {
InSequence seq;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, -ENOENT);
ASSERT_EQ(-ENOENT, when_open(mock_journal));
}
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, -EBADMSG);
ASSERT_EQ(-EBADMSG, when_open(mock_journal));
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
// replay failure should result in replay-restart
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
// replay flush failure should result in replay-restart
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
// replay write-to-disk failure should result in replay-restart
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
// replay write-to-disk failure should result in replay-restart
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
+ expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
ASSERT_EQ(0, event_ctx.wait());
}
+TEST_F(TestMockJournal, AppendWriteEvent) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockJournalImageCtx mock_image_ctx(*ictx);
+ MockJournal mock_journal(mock_image_ctx);
+ ::journal::MockJournaler mock_journaler;
+ open_journal(mock_image_ctx, mock_journal, mock_journaler);
+ BOOST_SCOPE_EXIT_ALL(&) {
+ close_journal(mock_journal, mock_journaler);
+ };
+
+ InSequence seq;
+
+ ::journal::MockFuture mock_future;
+ Context *on_journal_safe = nullptr;
+ expect_append_journaler(mock_journaler);
+ expect_append_journaler(mock_journaler);
+ expect_append_journaler(mock_journaler);
+ expect_wait_future(mock_future, &on_journal_safe);
+ ASSERT_EQ(1U, when_append_write_event(mock_image_ctx, mock_journal, 1 << 17));
+
+ on_journal_safe->complete(0);
+ C_SaferCond event_ctx;
+ mock_journal.wait_event(1U, &event_ctx);
+ ASSERT_EQ(0, event_ctx.wait());
+
+ expect_future_committed(mock_journaler);
+ expect_future_committed(mock_journaler);
+ expect_future_committed(mock_journaler);
+ mock_journal.commit_io_event(1U, 0);
+ ictx->op_work_queue->drain();
+}
+
TEST_F(TestMockJournal, EventCommitError) {
REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);