ImageCtx *image_ctx;
std::list<Context *> flush_contexts;
- explicit C_CompleteFlushes(ImageCtx *image_ctx, std::list<Context *> &&flush_contexts)
+ explicit C_CompleteFlushes(ImageCtx *image_ctx,
+ std::list<Context *> &&flush_contexts)
: image_ctx(image_ctx), flush_contexts(std::move(flush_contexts)) {
}
void finish(int r) override {
m_flush_contexts.push_back(on_finish);
}
+void AsyncOperation::flush(Context* on_finish) {
+ {
+ Mutex::Locker locker(m_image_ctx->async_ops_lock);
+ xlist<AsyncOperation *>::iterator iter(&m_xlist_item);
+ ++iter;
+
+ // linked list stored newest -> oldest ops
+ if (!iter.end()) {
+ (*iter)->m_flush_contexts.push_back(on_finish);
+ return;
+ }
+ }
+
+ m_image_ctx->op_work_queue->queue(on_finish);
+}
+
} // namespace io
} // namespace librbd
}
};
-template <typename ImageCtxT = ImageCtx>
-struct C_FlushJournalCommit : public Context {
- ImageCtxT &image_ctx;
- AioCompletion *aio_comp;
-
- C_FlushJournalCommit(ImageCtxT &_image_ctx, AioCompletion *_aio_comp,
- uint64_t tid)
- : image_ctx(_image_ctx), aio_comp(_aio_comp) {
- CephContext *cct = image_ctx.cct;
- ldout(cct, 20) << "delaying flush until journal tid " << tid << " "
- << "safe" << dendl;
-
- aio_comp->add_request();
- }
-
- void finish(int r) override {
- CephContext *cct = image_ctx.cct;
- ldout(cct, 20) << "C_FlushJournalCommit: journal committed" << dendl;
- aio_comp->complete_request(r);
- }
-};
-
} // anonymous namespace
template <typename I>
}
AioCompletion *aio_comp = this->m_aio_comp;
+ aio_comp->set_request_count(1);
+
+ Context* ctx = new C_AioRequest(aio_comp);
+
+ // ensure no locks are held when flush is complete
+ ctx = util::create_async_context_callback(image_ctx, ctx);
+
if (journaling) {
// in-flight ops are flushed prior to closing the journal
uint64_t journal_tid = image_ctx.journal->append_io_event(
journal::EventEntry(journal::AioFlushEvent()),
ObjectRequests(), 0, 0, false, 0);
- aio_comp->set_request_count(1);
aio_comp->associate_journal_event(journal_tid);
- FunctionContext *flush_ctx = new FunctionContext(
- [aio_comp, &image_ctx, journal_tid] (int r) {
- auto ctx = new C_FlushJournalCommit<I>(image_ctx, aio_comp,
- journal_tid);
- image_ctx.journal->flush_event(journal_tid, ctx);
-
- // track flush op for block writes
- aio_comp->start_op(true);
- aio_comp->put();
+ ctx = new FunctionContext([ctx, &image_ctx, journal_tid](int r) {
+ image_ctx.journal->flush_event(journal_tid, ctx);
});
- image_ctx.flush_async_operations(flush_ctx);
- } else {
+ } else if (image_ctx.object_cacher != nullptr) {
// flush rbd cache only when journaling is not enabled
- aio_comp->set_request_count(1);
- C_AioRequest *req_comp = new C_AioRequest(aio_comp);
- image_ctx.flush(req_comp);
-
- aio_comp->start_op(true);
- aio_comp->put();
+ ctx = new FunctionContext([ctx, &image_ctx](int r) {
+ image_ctx.flush_cache(ctx);
+ });
}
+ // ensure all in-flight IOs are settled if non-user flush request
+ aio_comp->start_op(true);
+ aio_comp->async_op.flush(ctx);
+ aio_comp->put();
+
image_ctx.perfcounter->inc(l_librbd_aio_flush);
}
EXPECT_CALL(mock_image_ctx, user_flushed());
}
- void expect_flush(MockImageCtx &mock_image_ctx, int r) {
- EXPECT_CALL(mock_image_ctx, flush(_))
- .WillOnce(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue));
+ void expect_flush_cache(MockImageCtx &mock_image_ctx, int r) {
+ if (mock_image_ctx.object_cacher != nullptr) {
+ EXPECT_CALL(mock_image_ctx, flush_cache(_))
+ .WillOnce(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue));
+ }
}
};
MockJournal mock_journal;
mock_image_ctx.journal = &mock_journal;
+ expect_op_work_queue(mock_image_ctx);
+
InSequence seq;
expect_user_flushed(mock_image_ctx);
expect_is_journal_appending(mock_journal, false);
- expect_flush(mock_image_ctx, 0);
+ expect_flush_cache(mock_image_ctx, 0);
C_SaferCond aio_comp_ctx;
AioCompletion *aio_comp = AioCompletion::create_and_start(