} while (bytes_remaining > 0);
return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, requests,
- offset, length, flush_entry);
+ offset, length, flush_entry, 0);
}
template <typename I>
uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
const IOObjectRequests &requests,
uint64_t offset, size_t length,
- bool flush_entry) {
+ bool flush_entry, int filter_ret_val) {
bufferlist bl;
event_entry.timestamp = ceph_clock_now();
::encode(event_entry, bl);
return append_io_events(event_entry.get_event_type(), {bl}, requests, offset,
- length, flush_entry);
+ length, flush_entry, filter_ret_val);
}
template <typename I>
const Bufferlists &bufferlists,
const IOObjectRequests &requests,
uint64_t offset, size_t length,
- bool flush_entry) {
+ bool flush_entry, int filter_ret_val) {
assert(!bufferlists.empty());
uint64_t tid;
{
Mutex::Locker event_locker(m_event_lock);
- m_events[tid] = Event(futures, requests, offset, length);
+ m_events[tid] = Event(futures, requests, offset, length, filter_ret_val);
}
CephContext *cct = m_image_ctx.cct;
<< "r=" << r << dendl;
Event &event = it->second;
+ if (r < 0 && r == event.filter_ret_val) {
+ // ignore allowed error codes
+ r = 0;
+ }
if (r < 0) {
// event recorded to journal but failed to update disk, we cannot
// commit this IO event. this event must be replayed.
uint64_t append_io_event(journal::EventEntry &&event_entry,
const IOObjectRequests &requests,
uint64_t offset, size_t length,
- bool flush_entry);
+ bool flush_entry, int filter_ret_val);
void commit_io_event(uint64_t tid, int r);
void commit_io_event_extent(uint64_t tid, uint64_t offset, uint64_t length,
int r);
IOObjectRequests aio_object_requests;
Contexts on_safe_contexts;
ExtentInterval pending_extents;
+ int filter_ret_val = 0;
bool committed_io = false;
bool safe = false;
int ret_val = 0;
Event() {
}
Event(const Futures &_futures, const IOObjectRequests &_requests,
- uint64_t offset, size_t length)
- : futures(_futures), aio_object_requests(_requests) {
+ uint64_t offset, size_t length, int filter_ret_val)
+ : futures(_futures), aio_object_requests(_requests),
+ filter_ret_val(filter_ret_val) {
if (length > 0) {
pending_extents.insert(offset, length);
}
uint64_t append_io_events(journal::EventType event_type,
const Bufferlists &bufferlists,
const IOObjectRequests &requests,
- uint64_t offset, size_t length, bool flush_entry);
+ uint64_t offset, size_t length, bool flush_entry,
+ int filter_ret_val);
Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
void create_journaler();
this->m_skip_partial_discard));
tid = image_ctx.journal->append_io_event(std::move(event_entry),
requests, extent.first,
- extent.second, synchronous);
+ extent.second, synchronous, 0);
}
AioCompletion *aio_comp = this->m_aio_comp;
// in-flight ops are flushed prior to closing the journal
uint64_t journal_tid = image_ctx.journal->append_io_event(
journal::EventEntry(journal::AioFlushEvent()),
- ObjectRequests(), 0, 0, false);
+ ObjectRequests(), 0, 0, false, 0);
aio_comp->set_request_count(1);
aio_comp->associate_journal_event(journal_tid);
m_data_bl));
tid = image_ctx.journal->append_io_event(std::move(event_entry),
requests, extent.first,
- extent.second, synchronous);
+ extent.second, synchronous, 0);
}
if (image_ctx.object_cacher == NULL) {
m_cmp_bl, m_bl));
tid = image_ctx.journal->append_io_event(std::move(event_entry),
requests, extent.first,
- extent.second, synchronous);
+ extent.second, synchronous, -EILSEQ);
AioCompletion *aio_comp = this->m_aio_comp;
aio_comp->associate_journal_event(tid);
{
RWLock::RLocker owner_locker(ictx->owner_lock);
uint64_t tid = ictx->journal->append_io_event(std::move(event_entry),
- requests, 0, 0, true);
+ requests, 0, 0, true, 0);
ictx->journal->wait_event(tid, &ctx);
}
ASSERT_EQ(0, ctx.wait());
MOCK_METHOD5(append_write_event, uint64_t(uint64_t, size_t,
const bufferlist &,
const ObjectRequests &, bool));
- MOCK_METHOD5(append_io_event_mock, uint64_t(const journal::EventEntry&,
+ MOCK_METHOD6(append_io_event_mock, uint64_t(const journal::EventEntry&,
const ObjectRequests &,
- uint64_t, size_t, bool));
+ uint64_t, size_t, bool, int));
uint64_t append_io_event(journal::EventEntry &&event_entry,
const ObjectRequests &requests,
uint64_t offset, size_t length,
- bool flush_entry) {
+ bool flush_entry, int filter_ret_val) {
// googlemock doesn't support move semantics
return append_io_event_mock(event_entry, requests, offset, length,
- flush_entry);
+ flush_entry, filter_ret_val);
}
MOCK_METHOD3(append_op_event_mock, void(uint64_t, const journal::EventEntry&,
uint64_t when_append_io_event(MockJournalImageCtx &mock_image_ctx,
MockJournal &mock_journal,
- io::ObjectRequest<> *object_request = nullptr) {
+ io::ObjectRequest<> *object_request,
+ int filter_ret_val) {
RWLock::RLocker owner_locker(mock_image_ctx.owner_lock);
MockJournal::IOObjectRequests object_requests;
if (object_request != nullptr) {
}
return mock_journal.append_io_event(
journal::EventEntry{journal::AioFlushEvent{}}, object_requests, 0, 0,
- false);
+ false, filter_ret_val);
}
void save_commit_context(Context *ctx) {
Context *on_journal_safe1;
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe1);
- ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal));
+ ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, nullptr, 0));
mock_journal.get_work_queue()->drain();
Context *on_journal_safe2;
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe2);
- ASSERT_EQ(2U, when_append_io_event(mock_image_ctx, mock_journal));
+ ASSERT_EQ(2U, when_append_io_event(mock_image_ctx, mock_journal, nullptr, 0));
mock_journal.get_work_queue()->drain();
// commit journal event followed by IO event (standard)
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe);
ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal,
- object_request));
+ object_request, 0));
mock_journal.get_work_queue()->drain();
// commit the event in the journal w/o waiting writeback
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe);
ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal,
- object_request));
+ object_request, 0));
mock_journal.get_work_queue()->drain();
expect_future_is_valid(mock_future);
Context *on_journal_safe;
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe);
- ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal));
+ ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, nullptr, 0));
mock_journal.get_work_queue()->drain();
// failed IO remains uncommitted in journal
expect_shut_down_journaler(mock_journaler);
}
+// Verify that an error code registered as the event's filter_ret_val
+// (-EILSEQ here, the code registered by the compare-and-write path) is
+// remapped to success when the IO completes: the journal event is then
+// committed instead of being left uncommitted for replay.
+TEST_F(TestMockJournal, IOCommitErrorFiltered) {
+  REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockJournalImageCtx mock_image_ctx(*ictx);
+  MockJournal mock_journal(mock_image_ctx);
+  ::journal::MockJournaler mock_journaler;
+  MockJournalOpenRequest mock_open_request;
+  open_journal(mock_image_ctx, mock_journal, mock_journaler, mock_open_request);
+  BOOST_SCOPE_EXIT_ALL(&) {
+    close_journal(mock_journal, mock_journaler);
+  };
+
+  ::journal::MockFuture mock_future;
+  Context *on_journal_safe;
+  expect_append_journaler(mock_journaler);
+  expect_wait_future(mock_future, &on_journal_safe);
+  // append an event that registers -EILSEQ as an allowed (filtered) error
+  ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, nullptr,
+                                     -EILSEQ));
+  mock_journal.get_work_queue()->drain();
+
+  // completing the IO with the filtered -EILSEQ is treated as r == 0, so the
+  // event must be committed in the journal (expect_future_committed) rather
+  // than held back for replay as a failed IO would be
+  on_journal_safe->complete(0);
+  ictx->op_work_queue->drain();
+  expect_future_committed(mock_journaler);
+  mock_journal.commit_io_event(1U, -EILSEQ);
+
+  expect_shut_down_journaler(mock_journaler);
+}
+
TEST_F(TestMockJournal, FlushCommitPosition) {
REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);