template <typename I>
uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
const bufferlist &bl,
- const IOObjectRequests &requests,
bool flush_entry) {
assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
uint64_t max_write_data_size =
bytes_remaining -= event_length;
} while (bytes_remaining > 0);
- return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, requests,
- offset, length, flush_entry, 0);
+ return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, offset,
+ length, flush_entry, 0);
}
template <typename I>
uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
- const IOObjectRequests &requests,
uint64_t offset, size_t length,
bool flush_entry, int filter_ret_val) {
bufferlist bl;
event_entry.timestamp = ceph_clock_now();
encode(event_entry, bl);
- return append_io_events(event_entry.get_event_type(), {bl}, requests, offset,
- length, flush_entry, filter_ret_val);
+ return append_io_events(event_entry.get_event_type(), {bl}, offset, length,
+ flush_entry, filter_ret_val);
}
template <typename I>
uint64_t Journal<I>::append_io_events(journal::EventType event_type,
const Bufferlists &bufferlists,
- const IOObjectRequests &requests,
uint64_t offset, size_t length,
bool flush_entry, int filter_ret_val) {
assert(!bufferlists.empty());
{
Mutex::Locker event_locker(m_event_lock);
- m_events[tid] = Event(futures, requests, offset, length, filter_ret_val);
+ m_events[tid] = Event(futures, offset, length, filter_ret_val);
}
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": "
<< "event=" << event_type << ", "
- << "new_reqs=" << requests.size() << ", "
<< "offset=" << offset << ", "
<< "length=" << length << ", "
<< "flush=" << flush_entry << ", tid=" << tid << dendl;
<< "failed to commit IO event: " << cpp_strerror(r) << dendl;
}
- IOObjectRequests aio_object_requests;
Contexts on_safe_contexts;
{
Mutex::Locker event_locker(m_event_lock);
assert(it != m_events.end());
Event &event = it->second;
- aio_object_requests.swap(event.aio_object_requests);
on_safe_contexts.swap(event.on_safe_contexts);
if (r < 0 || event.committed_io) {
ldout(cct, 20) << this << " " << __func__ << ": "
<< "completing tid=" << tid << dendl;
- for (auto it = aio_object_requests.begin(); it != aio_object_requests.end();
- ++it) {
- if (r < 0) {
- // don't send aio requests if the journal fails -- bubble error up
- (*it)->fail(r);
- } else {
- // send any waiting aio requests now that journal entry is safe
- (*it)->send(tid);
- }
- }
// alert the cache about the journal event status
for (Contexts::iterator it = on_safe_contexts.begin();
class ImageCtx;
-namespace io { struct ObjectDispatchSpec; }
namespace journal { template <typename> class Replay; }
template <typename ImageCtxT = ImageCtx>
static const std::string LOCAL_MIRROR_UUID;
static const std::string ORPHAN_MIRROR_UUID;
- typedef std::list<io::ObjectDispatchSpec*> IOObjectRequests;
-
Journal(ImageCtxT &image_ctx);
~Journal();
uint64_t append_write_event(uint64_t offset, size_t length,
const bufferlist &bl,
- const IOObjectRequests &requests,
bool flush_entry);
uint64_t append_io_event(journal::EventEntry &&event_entry,
- const IOObjectRequests &requests,
uint64_t offset, size_t length,
bool flush_entry, int filter_ret_val);
void commit_io_event(uint64_t tid, int r);
struct Event {
Futures futures;
- IOObjectRequests aio_object_requests;
Contexts on_safe_contexts;
ExtentInterval pending_extents;
int filter_ret_val = 0;
Event() {
}
- Event(const Futures &_futures, const IOObjectRequests &_requests,
- uint64_t offset, size_t length, int filter_ret_val)
- : futures(_futures), aio_object_requests(_requests),
- filter_ret_val(filter_ret_val) {
+ Event(const Futures &_futures, uint64_t offset, size_t length,
+ int filter_ret_val)
+ : futures(_futures), filter_ret_val(filter_ret_val) {
if (length > 0) {
pending_extents.insert(offset, length);
}
uint64_t append_io_events(journal::EventType event_type,
const Bufferlists &bufferlists,
- const IOObjectRequests &requests,
uint64_t offset, size_t length, bool flush_entry,
int filter_ret_val);
Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
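With the per-event request list removed, a caller that needs to know when a journal entry is safe registers a context through wait_event() after appending, mirroring the test helper further below. A minimal sketch, assuming an ImageCtx with an open journal (not code from this patch):

  // Append a flush event with the new signature and wait for it to be safe.
  C_SaferCond on_safe;
  uint64_t tid;
  {
    RWLock::RLocker owner_locker(image_ctx.owner_lock);
    tid = image_ctx.journal->append_io_event(
      journal::EventEntry(journal::AioFlushEvent()), 0, 0, false, 0);
    image_ctx.journal->wait_event(tid, &on_safe);
  }
  int r = on_safe.wait();  // 0 once the entry is safe, <0 if the journal failed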
auto req = io::ObjectDispatchSpec::create_read(
m_ictx, io::OBJECT_DISPATCH_LAYER_CACHE, oid.name, object_no, off, len,
snapid, op_flags, trace, &req_comp->bl, &req_comp->extent_map, req_comp);
- req->send(0);
+ req->send();
}
bool LibrbdWriteback::may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid)
req->object_dispatch_flags = (
io::OBJECT_DISPATCH_FLAG_FLUSH |
io::OBJECT_DISPATCH_FLAG_WILL_RETRY_ON_ERROR);
- req->send(0);
+ req->send();
return ++m_tid;
}
ictx, io::OBJECT_DISPATCH_LAYER_NONE, q->oid.name, q->objectno,
q->offset, q->length, snap_id, 0, {}, &req_comp->read_data,
&req_comp->extent_map, req_comp);
- req->send(0);
+ req->send();
}
}
ictx->perfcounter->inc(l_librbd_readahead);
break;
}
- // inform the journal that the op has successfully committed
- if (journal_tid != 0) {
- assert(ictx->journal != NULL);
- ictx->journal->commit_io_event(journal_tid, rval);
- }
-
state = AIO_STATE_CALLBACK;
if (complete_cb) {
lock.Unlock();
put_unlock();
}
-void AioCompletion::associate_journal_event(uint64_t tid) {
- Mutex::Locker l(lock);
- assert(state == AIO_STATE_PENDING);
- journal_tid = tid;
-}
-
bool AioCompletion::is_complete() {
tracepoint(librbd, aio_is_complete_enter, this);
bool done;
AsyncOperation async_op;
- uint64_t journal_tid;
xlist<AioCompletion*>::item m_xlist_item;
bool event_notify;
complete_arg(NULL), rbd_comp(NULL),
pending_count(0), blockers(1),
ref(1), released(false), ictx(NULL),
- aio_type(AIO_TYPE_NONE),
- journal_tid(0), m_xlist_item(this), event_notify(false) {
+ aio_type(AIO_TYPE_NONE), m_xlist_item(this),
+ event_notify(false) {
}
~AioCompletion() {
}
void complete_request(ssize_t r);
- void associate_journal_event(uint64_t tid);
-
bool is_complete();
ssize_t get_return_value();
&image_ctx, OBJECT_DISPATCH_LAYER_NONE, extent.oid.name,
extent.objectno, extent.offset, extent.length, snap_id, m_op_flags,
this->m_trace, &req_comp->bl, &req_comp->extent_map, req_comp);
- req->send(0);
+ req->send();
}
}
// if journaling, stash the request for later; otherwise send
if (request != NULL) {
- request->send(0);
+ request->send();
}
}
}
buffer_offset += extent.second;
tid = image_ctx.journal->append_write_event(extent.first, extent.second,
- sub_bl, {}, synchronous);
+ sub_bl, synchronous);
}
return tid;
extent.second,
this->m_skip_partial_discard));
tid = image_ctx.journal->append_io_event(std::move(event_entry),
- {}, extent.first,
- extent.second, synchronous, 0);
+ extent.first, extent.second,
+ synchronous, 0);
}
return tid;
if (journaling) {
// in-flight ops are flushed prior to closing the journal
uint64_t journal_tid = image_ctx.journal->append_io_event(
- journal::EventEntry(journal::AioFlushEvent()),
- ObjectRequests(), 0, 0, false, 0);
+ journal::EventEntry(journal::AioFlushEvent()), 0, 0, false, 0);
ctx = new FunctionContext(
[&image_ctx, journal_tid, ctx](int r) {
&image_ctx, OBJECT_DISPATCH_LAYER_NONE, m_flush_source, this->m_trace,
ctx);
ctx = new FunctionContext([object_dispatch_spec](int r) {
- object_dispatch_spec->send(0);
+ object_dispatch_spec->send();
});
}
extent.second,
m_data_bl));
tid = image_ctx.journal->append_io_event(std::move(event_entry),
- {}, extent.first,
- extent.second, synchronous, 0);
+ extent.first, extent.second,
+ synchronous, 0);
}
return tid;
journal::AioCompareAndWriteEvent(extent.first, extent.second, m_cmp_bl,
m_bl));
tid = image_ctx.journal->append_io_event(std::move(event_entry),
- {}, extent.first,
- extent.second, synchronous, -EILSEQ);
+ extent.first, extent.second,
+ synchronous, -EILSEQ);
return tid;
}
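The -EILSEQ passed above is the event's filter_ret_val: a compare-and-write mismatch is an expected outcome, so it should not be recorded as a failed journal commit. A minimal sketch of the commit side, using the commit_io_event() declaration shown earlier (r is assumed to come from the completed object request):

  // Commit the result back to the journal; r == -EILSEQ matches
  // filter_ret_val and is expected to be filtered to a clean commit,
  // while any other negative r marks the event as failed.
  image_ctx.journal->commit_io_event(tid, r);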
switch (object_dispatch_spec->dispatch_result) {
case DISPATCH_RESULT_CONTINUE:
- object_dispatch_spec->send(0);
+ object_dispatch_spec->send();
break;
case DISPATCH_RESULT_COMPLETE:
finish(r);
delete object_dispatch_spec;
}
-struct ObjectDispatchSpec::SetJournalTid : public boost::static_visitor<void> {
- uint64_t journal_tid;
-
- SetJournalTid(uint64_t journal_tid) : journal_tid(journal_tid) {
- }
-
- template <typename T>
- void operator()(T& t) const {
- }
-
- void operator()(WriteRequestBase& write_request_base) const {
- write_request_base.journal_tid = journal_tid;
- }
-};
-
-void ObjectDispatchSpec::send(uint64_t journal_tid) {
- // TODO removed in future commit
- if (journal_tid != 0) {
- boost::apply_visitor(SetJournalTid{journal_tid}, request);
- }
-
+void ObjectDispatchSpec::send() {
object_dispatcher->send(this);
}
parent_trace, on_finish);
}
- void send(uint64_t journal_tid /* removed in future commit */);
+ void send();
void fail(int r);
private:
template <typename> friend class ObjectDispatcher;
- // TODO removed in future commmit
- struct SetJournalTid;
-
ObjectDispatchSpec(ObjectDispatcherInterface* object_dispatcher,
ObjectDispatchLayer object_dispatch_layer,
Request&& request, int op_flags,
&image_ctx, io::OBJECT_DISPATCH_LAYER_NONE, oid, m_object_no, 0,
image_ctx.layout.object_size, m_snapc,
io::OBJECT_DISCARD_FLAG_DISABLE_OBJECT_MAP_UPDATE, 0, {}, this);
- object_dispatch_spec->send(0);
+ object_dispatch_spec->send();
return 0;
}
private:
auto object_dispatch_spec = io::ObjectDispatchSpec::create_discard(
&image_ctx, io::OBJECT_DISPATCH_LAYER_NONE, p->oid.name, p->objectno,
p->offset, p->length, snapc, 0, 0, {}, req_comp);
- object_dispatch_spec->send(0);
+ object_dispatch_spec->send();
}
completion->finish_adding_requests();
}
#include "test/librbd/mock/MockJournal.h"
#include "test/librbd/mock/cache/MockImageCache.h"
#include "librbd/io/ImageRequest.h"
-#include "librbd/io/ObjectRequest.h"
#include "librbd/io/ObjectDispatchSpec.h"
namespace librbd {
struct MockTestImageCtx;
struct MockTestJournal : public MockJournal {
- typedef std::list<io::ObjectDispatchSpec*> ObjectRequests;
-
- MOCK_METHOD5(append_write_event, uint64_t(uint64_t, size_t,
- const bufferlist &,
- const ObjectRequests &, bool));
- MOCK_METHOD6(append_io_event_mock, uint64_t(const journal::EventEntry&,
- const ObjectRequests &,
+ MOCK_METHOD4(append_write_event, uint64_t(uint64_t, size_t,
+ const bufferlist &, bool));
+ MOCK_METHOD5(append_io_event_mock, uint64_t(const journal::EventEntry&,
uint64_t, size_t, bool, int));
uint64_t append_io_event(journal::EventEntry &&event_entry,
- const ObjectRequests &requests,
uint64_t offset, size_t length,
bool flush_entry, int filter_ret_val) {
// googlemock doesn't support move semantics
- return append_io_event_mock(event_entry, requests, offset, length,
- flush_entry, filter_ret_val);
+ return append_io_event_mock(event_entry, offset, length, flush_entry,
+ filter_ret_val);
}
MOCK_METHOD2(commit_io_event, void(uint64_t, int));
void inject_into_journal(librbd::ImageCtx *ictx, T event) {
C_SaferCond ctx;
librbd::journal::EventEntry event_entry(event);
- librbd::Journal<>::IOObjectRequests requests;
{
RWLock::RLocker owner_locker(ictx->owner_lock);
- uint64_t tid = ictx->journal->append_io_event(std::move(event_entry),
- requests, 0, 0, true, 0);
+ uint64_t tid = ictx->journal->append_io_event(std::move(event_entry), 0, 0,
+ true, 0);
ictx->journal->wait_event(tid, &ctx);
}
ASSERT_EQ(0, ctx.wait());
bl.append_zero(length);
RWLock::RLocker owner_locker(mock_image_ctx.owner_lock);
- return mock_journal.append_write_event(0, length, bl, {}, false);
+ return mock_journal.append_write_event(0, length, bl, false);
}
uint64_t when_append_io_event(MockJournalImageCtx &mock_image_ctx,
MockJournal &mock_journal,
- io::ObjectDispatchSpec *object_request,
int filter_ret_val) {
RWLock::RLocker owner_locker(mock_image_ctx.owner_lock);
- MockJournal::IOObjectRequests object_requests;
- if (object_request != nullptr) {
- object_requests.push_back(object_request);
- }
return mock_journal.append_io_event(
- journal::EventEntry{journal::AioFlushEvent{}}, object_requests, 0, 0,
- false, filter_ret_val);
+ journal::EventEntry{journal::AioFlushEvent{}}, 0, 0, false,
+ filter_ret_val);
}
void save_commit_context(Context *ctx) {
Context *on_journal_safe1;
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe1);
- ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, nullptr, 0));
+ ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, 0));
mock_journal.get_work_queue()->drain();
Context *on_journal_safe2;
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe2);
- ASSERT_EQ(2U, when_append_io_event(mock_image_ctx, mock_journal, nullptr, 0));
+ ASSERT_EQ(2U, when_append_io_event(mock_image_ctx, mock_journal, 0));
mock_journal.get_work_queue()->drain();
// commit journal event followed by IO event (standard)
close_journal(mock_image_ctx, mock_journal, mock_journaler);
};
- C_SaferCond object_request_ctx;
- auto object_request = io::ObjectDispatchSpec::create_discard(
- &mock_image_ctx, io::OBJECT_DISPATCH_LAYER_NONE, "object0", 0, 0, 0, {}, 0,
- 0, {}, &object_request_ctx);
-
::journal::MockFuture mock_future;
Context *on_journal_safe;
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe);
- ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal,
- object_request, 0));
+ ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, 0));
mock_journal.get_work_queue()->drain();
// commit the event in the journal w/o waiting writeback
expect_future_committed(mock_journaler);
+ C_SaferCond object_request_ctx;
+ mock_journal.wait_event(1U, &object_request_ctx);
on_journal_safe->complete(-EINVAL);
ASSERT_EQ(-EINVAL, object_request_ctx.wait());
close_journal(mock_image_ctx, mock_journal, mock_journaler);
};
- C_SaferCond object_request_ctx;
- auto object_request = io::ObjectDispatchSpec::create_discard(
- &mock_image_ctx, io::OBJECT_DISPATCH_LAYER_NONE, "object0", 0, 0, 0, {}, 0,
- 0, {}, &object_request_ctx);
-
::journal::MockFuture mock_future;
Context *on_journal_safe;
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe);
- ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal,
- object_request, 0));
+ ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, 0));
mock_journal.get_work_queue()->drain();
expect_future_is_valid(mock_future);
// commit the event in the journal w/ waiting cache writeback
expect_future_committed(mock_journaler);
+ C_SaferCond object_request_ctx;
+ mock_journal.wait_event(1U, &object_request_ctx);
on_journal_safe->complete(-EINVAL);
ASSERT_EQ(-EINVAL, object_request_ctx.wait());
Context *on_journal_safe;
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe);
- ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, nullptr, 0));
+ ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, 0));
mock_journal.get_work_queue()->drain();
// failed IO remains uncommitted in journal
Context *on_journal_safe;
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe);
- ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, nullptr,
- -EILSEQ));
+ ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, -EILSEQ));
mock_journal.get_work_queue()->drain();
// filter failed IO committed in journal