return 0;
}
- void AioCompletion::finalize(CephContext *cct, ssize_t rval)
+ void AioCompletion::finalize(ssize_t rval)
{
+ assert(lock.is_locked());
+ assert(ictx != nullptr);
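+ // cct is now derived from the image context rather than passed in:
+ // init_time() must have associated an ImageCtx before finalize runs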
+ CephContext *cct = ictx->cct;
+
ldout(cct, 20) << this << " " << __func__ << ": r=" << rval << ", "
<< "read_buf=" << reinterpret_cast<void*>(read_buf) << ", "
<< "real_bl=" << reinterpret_cast<void*>(read_bl) << dendl;
}
}
- void AioCompletion::complete(CephContext *cct) {
+ void AioCompletion::complete() {
+ assert(lock.is_locked());
+ assert(ictx != nullptr);
+ CephContext *cct = ictx->cct;
+
tracepoint(librbd, aio_complete_enter, this, rval);
utime_t elapsed;
- assert(lock.is_locked());
elapsed = ceph_clock_now(cct) - start_time;
switch (aio_type) {
case AIO_TYPE_OPEN:
}
void AioCompletion::init_time(ImageCtx *i, aio_type_t t) {
- if (ictx == NULL) {
+ Mutex::Locker locker(lock);
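+ // only the first call (while ictx is still nullptr) records the image
+ // context, AIO type, and start time; subsequent calls are no-ops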
+ if (ictx == nullptr) {
ictx = i;
aio_type = t;
start_time = ceph_clock_now(ictx->cct);
}
}
- void AioCompletion::start_op(ImageCtx *i, aio_type_t t) {
- init_time(i, t);
-
+ void AioCompletion::start_op() {
Mutex::Locker locker(lock);
- if (state == STATE_PENDING && !async_op.started()) {
+ assert(ictx != nullptr);
+ assert(!async_op.started());
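+ // the old !async_op.started() condition is now an assert: callers are
+ // expected to invoke start_op() at most once per completion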
+ if (state == STATE_PENDING) {
async_op.start_op(*ictx);
}
}
- void AioCompletion::fail(CephContext *cct, int r)
+ void AioCompletion::fail(int r)
{
+ lock.Lock();
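+ // take the lock before touching ictx: init_time() assigns it under lock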
+ assert(ictx != nullptr);
+ CephContext *cct = ictx->cct;
+
lderr(cct) << this << " " << __func__ << ": " << cpp_strerror(r)
<< dendl;
- lock.Lock();
assert(pending_count == 0);
rval = r;
- complete(cct);
+ complete();
put_unlock();
}
- void AioCompletion::set_request_count(CephContext *cct, uint32_t count) {
- ldout(cct, 20) << this << " " << __func__ << ": pending=" << count << dendl;
+ void AioCompletion::set_request_count(uint32_t count) {
lock.Lock();
+ assert(ictx != nullptr);
+ CephContext *cct = ictx->cct;
+
+ ldout(cct, 20) << this << " " << __func__ << ": pending=" << count << dendl;
assert(pending_count == 0);
pending_count = count;
lock.Unlock();
// if no pending requests, completion will fire now
- unblock(cct);
+ unblock();
}
- void AioCompletion::complete_request(CephContext *cct, ssize_t r)
+ void AioCompletion::complete_request(ssize_t r)
{
lock.Lock();
+ assert(ictx != nullptr);
+ CephContext *cct = ictx->cct;
+
if (rval >= 0) {
  if (r < 0 && r != -EEXIST)
    rval = r;
  else if (r > 0)
    rval += r;
}
assert(pending_count);
int count = --pending_count;
ldout(cct, 20) << this << " " << __func__ << ": cb=" << complete_cb << ", "
<< "pending=" << pending_count << dendl;
if (!count && blockers == 0) {
- finalize(cct, rval);
- complete(cct);
+ finalize(rval);
+ complete();
}
put_unlock();
}
void C_AioRead::finish(int r)
{
- ldout(m_cct, 10) << "C_AioRead::finish() " << this << " r = " << r << dendl;
+ m_completion->lock.Lock();
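+ // the completion lock is now held across the whole sparse-read fixup,
+ // not just the destriper update below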
+ CephContext *cct = m_completion->ictx->cct;
+ ldout(cct, 10) << "C_AioRead::finish() " << this << " r = " << r << dendl;
+
if (r >= 0 || r == -ENOENT) { // this was a sparse_read operation
- ldout(m_cct, 10) << " got " << m_req->m_ext_map
- << " for " << m_req->m_buffer_extents
- << " bl " << m_req->data().length() << dendl;
+ ldout(cct, 10) << " got " << m_req->m_ext_map
+ << " for " << m_req->m_buffer_extents
+ << " bl " << m_req->data().length() << dendl;
// reads from the parent don't populate the m_ext_map and the overlap
// may not be the full buffer. compensate here by filling in m_ext_map
// with the read extent when it is empty.
if (m_req->m_ext_map.empty())
m_req->m_ext_map[m_req->m_object_off] = m_req->data().length();
- m_completion->lock.Lock();
m_completion->destriper.add_partial_sparse_result(
- m_cct, m_req->data(), m_req->m_ext_map, m_req->m_object_off,
+ cct, m_req->data(), m_req->m_ext_map, m_req->m_object_off,
m_req->m_buffer_extents);
- m_completion->lock.Unlock();
r = m_req->m_object_len;
}
+ m_completion->lock.Unlock();
+
C_AioRequest::finish(r);
}
int wait_for_complete();
- void finalize(CephContext *cct, ssize_t rval);
+ void finalize(ssize_t rval);
void init_time(ImageCtx *i, aio_type_t t);
- void start_op(ImageCtx *i, aio_type_t t);
- void fail(CephContext *cct, int r);
+ void start_op();
+ void fail(int r);
- void complete(CephContext *cct);
+ void complete();
void set_complete_cb(void *cb_arg, callback_t cb) {
complete_cb = cb;
complete_arg = cb_arg;
}
- void set_request_count(CephContext *cct, uint32_t num);
+ void set_request_count(uint32_t num);
void add_request() {
lock.Lock();
assert(pending_count > 0);
lock.Unlock();
get();
}
- void complete_request(CephContext *cct, ssize_t r);
+ void complete_request(ssize_t r);
void associate_journal_event(uint64_t tid);
void block() {
  Mutex::Locker l(lock);
++blockers;
}
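+ // blockers defer completion: the callbacks fire only once both the
+ // blocker count and the pending request count reach zero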
- void unblock(CephContext *cct) {
+ void unblock() {
Mutex::Locker l(lock);
assert(blockers > 0);
--blockers;
if (pending_count == 0 && blockers == 0) {
- finalize(cct, rval);
- complete(cct);
+ finalize(rval);
+ complete();
}
}
class C_AioRequest : public Context {
public:
- C_AioRequest(CephContext *cct, AioCompletion *completion)
- : m_cct(cct), m_completion(completion) {
+ C_AioRequest(AioCompletion *completion) : m_completion(completion) {
m_completion->add_request();
}
virtual ~C_AioRequest() {}
virtual void finish(int r) {
- m_completion->complete_request(m_cct, r);
+ m_completion->complete_request(r);
}
protected:
- CephContext *m_cct;
AioCompletion *m_completion;
};
class C_AioRead : public C_AioRequest {
public:
- C_AioRead(CephContext *cct, AioCompletion *completion)
- : C_AioRequest(cct, completion), m_req(NULL) {
+ C_AioRead(AioCompletion *completion)
+ : C_AioRequest(completion), m_req(nullptr) {
}
virtual ~C_AioRead() {}
virtual void finish(int r);
Mutex::Locker cache_locker(image_ctx.cache_lock);
image_ctx.object_cacher->discard_set(image_ctx.object_set, object_extents);
- aio_comp->complete_request(cct, r);
+ aio_comp->complete_request(r);
}
};
CephContext *cct = image_ctx.cct;
ldout(cct, 20) << this << " C_FlushJournalCommit: journal committed"
<< dendl;
- aio_comp->complete_request(cct, r);
+ aio_comp->complete_request(r);
}
};
template <typename I>
void AioImageRequest<I>::aio_read(I *ictx, AioCompletion *c,
const std::vector<std::pair<uint64_t,uint64_t> > &extents,
char *buf, bufferlist *pbl, int op_flags) {
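+ // associate the completion with the image and record the start time up
+ // front, since AioCompletion now resolves cct through its ImageCtx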
+ c->init_time(ictx, librbd::AIO_TYPE_READ);
AioImageRead req(*ictx, c, extents, buf, pbl, op_flags);
req.send();
}
template <typename I>
void AioImageRequest<I>::aio_read(I *ictx, AioCompletion *c,
uint64_t off, size_t len, char *buf,
bufferlist *pbl, int op_flags) {
+ c->init_time(ictx, librbd::AIO_TYPE_READ);
AioImageRead req(*ictx, c, off, len, buf, pbl, op_flags);
req.send();
}
template <typename I>
void AioImageRequest<I>::aio_write(I *ictx, AioCompletion *c,
uint64_t off, size_t len, const char *buf,
int op_flags) {
+ c->init_time(ictx, librbd::AIO_TYPE_WRITE);
AioImageWrite req(*ictx, c, off, len, buf, op_flags);
req.send();
}
template <typename I>
void AioImageRequest<I>::aio_discard(I *ictx, AioCompletion *c,
uint64_t off, uint64_t len) {
+ c->init_time(ictx, librbd::AIO_TYPE_DISCARD);
AioImageDiscard req(*ictx, c, off, len);
req.send();
}
template <typename I>
void AioImageRequest<I>::aio_flush(I *ictx, AioCompletion *c) {
+ c->init_time(ictx, librbd::AIO_TYPE_FLUSH);
AioImageFlush req(*ictx, c);
req.send();
}
template <typename I>
void AioImageRequest<I>::fail(int r) {
m_aio_comp->get();
- m_aio_comp->fail(m_image_ctx.cct, r);
+ m_aio_comp->fail(r);
}
void AioImageRead::send_request() {
uint64_t len = p->second;
int r = clip_io(&m_image_ctx, p->first, &len);
if (r < 0) {
- m_aio_comp->fail(cct, r);
+ m_aio_comp->fail(r);
return;
}
if (len == 0) {
buffer_ofs += len;
}
- m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_READ);
+ m_aio_comp->start_op();
}
m_aio_comp->read_buf = m_buf;
for (auto &object_extent : object_extents) {
request_count += object_extent.second.size();
}
- m_aio_comp->set_request_count(cct, request_count);
+ m_aio_comp->set_request_count(request_count);
// issue the requests
for (auto &object_extent : object_extents) {
<< extent.length << " from " << extent.buffer_extents
<< dendl;
- C_AioRead *req_comp = new C_AioRead(cct, m_aio_comp);
+ C_AioRead *req_comp = new C_AioRead(m_aio_comp);
AioObjectRead *req = new AioObjectRead(&m_image_ctx, extent.oid.name,
extent.objectno, extent.offset,
extent.length,
// prevent image size from changing between computing clip and recording
// pending async operation
RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
- m_aio_comp->fail(cct, -EROFS);
+ m_aio_comp->fail(-EROFS);
return;
}
int r = clip_io(&m_image_ctx, m_off, &clip_len);
if (r < 0) {
- m_aio_comp->fail(cct, r);
+ m_aio_comp->fail(r);
return;
}
snapc = m_image_ctx.snapc;
- m_aio_comp->start_op(&m_image_ctx, get_aio_type());
+ m_aio_comp->start_op();
// map to object extents
if (clip_len > 0) {
if (!object_extents.empty()) {
uint64_t journal_tid = 0;
m_aio_comp->set_request_count(
- cct, object_extents.size() + get_cache_request_count(journaling));
+ object_extents.size() + get_cache_request_count(journaling));
AioObjectRequests requests;
send_object_requests(object_extents, snapc,
}
} else {
// no IO to perform -- fire completion
- m_aio_comp->unblock(cct);
+ m_aio_comp->unblock();
}
update_stats(clip_len);
p != object_extents.end(); ++p) {
ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length
<< " from " << p->buffer_extents << dendl;
- C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
+ C_AioRequest *req_comp = new C_AioRequest(m_aio_comp);
AioObjectRequest *request = create_object_request(*p, snapc, req_comp);
// if journaling, stash the request for later; otherwise send
void AioImageWrite::send_cache_requests(const ObjectExtents &object_extents,
uint64_t journal_tid) {
- CephContext *cct = m_image_ctx.cct;
for (ObjectExtents::const_iterator p = object_extents.begin();
p != object_extents.end(); ++p) {
const ObjectExtent &object_extent = *p;
bufferlist bl;
assemble_extent(object_extent, &bl);
- C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
+ C_AioRequest *req_comp = new C_AioRequest(m_aio_comp);
m_image_ctx.write_to_cache(object_extent.oid, bl, object_extent.length,
object_extent.offset, req_comp, m_op_flags,
journal_tid);
}
void AioImageFlush::send_request() {
- CephContext *cct = m_image_ctx.cct;
-
bool journaling = false;
{
RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
!m_image_ctx.journal->is_journal_replaying());
}
- m_aio_comp->set_request_count(cct, journaling ? 2 : 1);
+ m_aio_comp->set_request_count(journaling ? 2 : 1);
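+ // when journaling, the journal commit callback counts as a second
+ // outstanding request (see C_FlushJournalCommit above)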
if (journaling) {
// in-flight ops are flushed prior to closing the journal
m_aio_comp->associate_journal_event(journal_tid);
}
- C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
+ C_AioRequest *req_comp = new C_AioRequest(m_aio_comp);
m_image_ctx.flush(req_comp);
- m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_FLUSH);
+ m_aio_comp->start_op();
m_aio_comp->put();
m_image_ctx.perfcounter->inc(l_librbd_aio_flush);
m_synchronous(false) {
}
- virtual aio_type_t get_aio_type() const = 0;
-
virtual void send_request();
virtual uint32_t get_cache_request_count(bool journaling) const {
}
protected:
- virtual aio_type_t get_aio_type() const {
- return AIO_TYPE_WRITE;
- }
virtual const char *get_request_type() const {
return "aio_write";
}
}
protected:
- virtual aio_type_t get_aio_type() const {
- return AIO_TYPE_DISCARD;
- }
virtual const char *get_request_type() const {
return "aio_discard";
}
lderr(cct) << "IO received on closed image" << dendl;
c->get();
- c->fail(cct, -ESHUTDOWN);
+ c->fail(-ESHUTDOWN);
return false;
}
// release reference to the parent read completion. this request
// might be completed after unblock is invoked.
AioCompletion *parent_completion = m_parent_completion;
- parent_completion->unblock(m_ictx->cct);
+ parent_completion->unblock();
parent_completion->put();
}
}
}
if (r < 0) {
*ictxp = nullptr;
- comp->fail(ictx->cct, r);
+ comp->fail(r);
} else {
*ictxp = ictx;
comp->lock.Lock();
- comp->complete(ictx->cct);
+ comp->complete();
comp->put_unlock();
}
}
virtual void finish(int r) {
ldout(cct, 20) << "C_CloseComplete::finish: r=" << r << dendl;
if (r < 0) {
- comp->fail(cct, r);
+ comp->fail(r);
} else {
comp->lock.Lock();
- comp->complete(cct);
+ comp->complete();
comp->put_unlock();
}
}
}
ACTION_P2(CompleteAioCompletion, r, image_ctx) {
- CephContext *cct = image_ctx->cct;
- image_ctx->op_work_queue->queue(new FunctionContext([cct, arg0](int r) {
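+ // ACTION_P2 expands into a functor class, so 'this' is captured to give
+ // the lambda access to the image_ctx action parameter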
+ image_ctx->op_work_queue->queue(new FunctionContext([this, arg0](int r) {
arg0->get();
- arg0->set_request_count(cct, 1);
- arg0->complete_request(cct, r);
+ arg0->init_time(image_ctx, librbd::AIO_TYPE_NONE);
+ arg0->set_request_count(1);
+ arg0->complete_request(r);
}), r);
}
void when_complete(MockReplayImageCtx &mock_image_ctx, AioCompletion *aio_comp,
int r) {
aio_comp->get();
- aio_comp->set_request_count(mock_image_ctx.cct, 1);
- aio_comp->complete_request(mock_image_ctx.cct, r);
+ aio_comp->init_time(mock_image_ctx.image_ctx, librbd::AIO_TYPE_NONE);
+ aio_comp->set_request_count(1);
+ aio_comp->complete_request(r);
}
int when_flush(MockJournalReplay &mock_journal_replay) {
expect_op_work_queue(mock_image_ctx);
InSequence seq;
- AioCompletion *aio_comp;
+ AioCompletion *aio_comp = nullptr;
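+ // initialized to nullptr so a missed expectation fails deterministically
+ // instead of dereferencing an uninitialized pointer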
C_SaferCond on_ready;
C_SaferCond on_safe;
expect_aio_discard(mock_aio_image_request, &aio_comp, 123, 456);
bl.append_zero(length);
RWLock::RLocker owner_locker(mock_image_ctx.owner_lock);
- return mock_journal.append_write_event(nullptr, 0, length, bl, {}, false);
+ return mock_journal.append_write_event(0, length, bl, {}, false);
}
uint64_t when_append_io_event(MockJournalImageCtx &mock_image_ctx,