namespace librbd {
- void AioCompletion::finish_adding_requests(CephContext *cct)
- {
- ldout(cct, 20) << "AioCompletion::finish_adding_requests " << (void*)this << " pending " << pending_count << dendl;
- unblock(cct);
- }
-
int AioCompletion::wait_for_complete() {
tracepoint(librbd, aio_wait_for_complete_enter, this);
lock.Lock();
void AioCompletion::finalize(CephContext *cct, ssize_t rval)
{
- ldout(cct, 20) << "AioCompletion::finalize() " << (void*)this << " rval " << rval << " read_buf " << (void*)read_buf
- << " read_bl " << (void*)read_bl << dendl;
+ ldout(cct, 20) << this << " " << __func__ << ": r=" << rval << ", "
+ << "read_buf=" << reinterpret_cast<void*>(read_buf) << ", "
+ << "real_bl=" << reinterpret_cast<void*>(read_bl) << dendl;
if (rval >= 0 && aio_type == AIO_TYPE_READ) {
// FIXME: make the destriper write directly into a buffer so
// that we avoid shuffling pointers and copying zeros around.
if (read_buf) {
assert(bl.length() == read_buf_len);
bl.copy(0, read_buf_len, read_buf);
- ldout(cct, 20) << "AioCompletion::finalize() copied resulting " << bl.length()
+ ldout(cct, 20) << "copied resulting " << bl.length()
<< " bytes to " << (void*)read_buf << dendl;
}
if (read_bl) {
- ldout(cct, 20) << "AioCompletion::finalize() moving resulting " << bl.length()
+ ldout(cct, 20) << "moving resulting " << bl.length()
<< " bytes to bl " << (void*)read_bl << dendl;
read_bl->claim(bl);
}
}
// note: possible for image to be closed after op marked finished
+ done = true;
if (async_op.started()) {
async_op.finish_op();
}
lock.Lock();
}
- done = true;
if (ictx && event_notify && ictx->event_socket.is_valid()) {
ictx->completed_reqs_lock.Lock();
ictx->completed_reqs.push_back(&m_xlist_item);
void AioCompletion::start_op(ImageCtx *i, aio_type_t t) {
init_time(i, t);
- if (!async_op.started()) {
+
+ Mutex::Locker locker(lock);
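+ // skip starting the async op if the completion has already fired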
+ if (!done && !async_op.started()) {
async_op.start_op(*ictx);
}
}
void AioCompletion::fail(CephContext *cct, int r)
{
- lderr(cct) << "AioCompletion::fail() " << this << ": " << cpp_strerror(r)
+ lderr(cct) << this << " " << __func__ << ": " << cpp_strerror(r)
<< dendl;
lock.Lock();
assert(pending_count == 0);
put_unlock();
}
+ void AioCompletion::set_request_count(CephContext *cct, uint32_t count) {
+ ldout(cct, 20) << this << " " << __func__ << ": pending=" << count << dendl;
+ lock.Lock();
+ assert(pending_count == 0);
+ pending_count = count;
+ lock.Unlock();
+
+ // if no pending requests, completion will fire now
+ unblock(cct);
+ }
+
void AioCompletion::complete_request(CephContext *cct, ssize_t r)
{
- ldout(cct, 20) << "AioCompletion::complete_request() "
- << (void *)this << " complete_cb=" << (void *)complete_cb
- << " pending " << pending_count << dendl;
lock.Lock();
if (rval >= 0) {
if (r < 0 && r != -EEXIST)
}
assert(pending_count);
int count = --pending_count;
+
+ ldout(cct, 20) << this << " " << __func__ << ": cb=" << complete_cb << ", "
+ << "pending=" << pending_count << dendl;
if (!count && blockers == 0) {
finalize(cct, rval);
complete(cct);
*
* The retrying of individual requests is handled at a lower level,
* so all AioCompletion cares about is the count of outstanding
- * requests. Note that this starts at 1 to prevent the reference
- * count from reaching 0 while more requests are being added. When
- * all requests have been added, finish_adding_requests() releases
- * this initial reference.
+ * requests. The expected number of individual requests must be
+ * set via set_request_count() prior to issuing the requests.
+ * This ensures the completion cannot fire within the caller's
+ * thread of execution; instead it fires via a librados context
+ * (or a thread pool context for cache read hits).
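+ *
+ * A minimal caller sketch (illustrative; issue_sub_request() is a
+ * hypothetical helper whose callback invokes complete_request()):
+ *
+ *   comp->set_request_count(cct, 2);
+ *   issue_sub_request(comp);  // -> comp->complete_request(cct, r)
+ *   issue_sub_request(comp);  // completion fires after the second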
*/
struct AioCompletion {
Mutex lock;
callback_t complete_cb;
void *complete_arg;
rbd_completion_t rbd_comp;
- int pending_count; ///< number of requests
+ uint32_t pending_count; ///< number of requests
uint32_t blockers;
int ref;
bool released;
int wait_for_complete();
- void add_request() {
- lock.Lock();
- pending_count++;
- lock.Unlock();
- get();
- }
-
void finalize(CephContext *cct, ssize_t rval);
- void finish_adding_requests(CephContext *cct);
-
void init_time(ImageCtx *i, aio_type_t t);
void start_op(ImageCtx *i, aio_type_t t);
void fail(CephContext *cct, int r);
complete_arg = cb_arg;
}
+ void set_request_count(CephContext *cct, uint32_t count);
+ void add_request() {
+ lock.Lock();
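+ // set_request_count() must prime pending_count before requests are added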
+ assert(pending_count > 0);
+ lock.Unlock();
+ get();
+ }
void complete_request(CephContext *cct, ssize_t r);
void associate_journal_event(uint64_t tid);
m_aio_comp->read_buf_len = buffer_ofs;
m_aio_comp->read_bl = m_pbl;
- for (map<object_t,vector<ObjectExtent> >::iterator p = object_extents.begin();
- p != object_extents.end(); ++p) {
- for (vector<ObjectExtent>::iterator q = p->second.begin();
- q != p->second.end(); ++q) {
- ldout(cct, 20) << " oid " << q->oid << " " << q->offset << "~"
- << q->length << " from " << q->buffer_extents
+ // pre-calculate the expected number of read requests
+ uint32_t request_count = 0;
+ for (auto &object_extent : object_extents) {
+ request_count += object_extent.second.size();
+ }
+ m_aio_comp->set_request_count(cct, request_count);
+
+ // issue the requests
+ for (auto &object_extent : object_extents) {
+ for (auto &extent : object_extent.second) {
+ ldout(cct, 20) << " oid " << extent.oid << " " << extent.offset << "~"
+ << extent.length << " from " << extent.buffer_extents
<< dendl;
C_AioRead *req_comp = new C_AioRead(cct, m_aio_comp);
- AioObjectRead *req = new AioObjectRead(&m_image_ctx, q->oid.name,
- q->objectno, q->offset, q->length,
- q->buffer_extents, snap_id, true,
- req_comp, m_op_flags);
+ AioObjectRead *req = new AioObjectRead(&m_image_ctx, extent.oid.name,
+ extent.objectno, extent.offset,
+ extent.length,
+ extent.buffer_extents, snap_id,
+ true, req_comp, m_op_flags);
req_comp->set_req(req);
if (m_image_ctx.object_cacher) {
C_CacheRead *cache_comp = new C_CacheRead(&m_image_ctx, req);
- m_image_ctx.aio_read_from_cache(q->oid, q->objectno, &req->data(),
- q->length, q->offset,
- cache_comp, m_op_flags);
+ m_image_ctx.aio_read_from_cache(extent.oid, extent.objectno,
+ &req->data(), extent.length,
+ extent.offset, cache_comp, m_op_flags);
} else {
req->send();
}
}
}
- m_aio_comp->finish_adding_requests(cct);
m_aio_comp->put();
m_image_ctx.perfcounter->inc(l_librbd_rd);
assert(!m_image_ctx.image_watcher->is_lock_supported() ||
m_image_ctx.image_watcher->is_lock_owner());
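+ // one request per object extent, plus an extra completion to track
+ // the journal commit when journaling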
+ m_aio_comp->set_request_count(
+ m_image_ctx.cct, object_extents.size() +
+ get_cache_request_count(journaling));
+
AioObjectRequests requests;
send_object_requests(object_extents, snapc, (journaling ? &requests : NULL));
send_cache_requests(object_extents, journal_tid);
}
update_stats(clip_len);
-
- m_aio_comp->finish_adding_requests(cct);
m_aio_comp->put();
}
return tid;
}
+uint32_t AioImageDiscard::get_cache_request_count(bool journaling) const {
+ // extra completion request is required for tracking journal commit
+ return (journaling ? 1 : 0);
+}
+
void AioImageDiscard::send_cache_requests(const ObjectExtents &object_extents,
uint64_t journal_tid) {
if (journal_tid == 0) {
void AioImageFlush::send_request() {
CephContext *cct = m_image_ctx.cct;
+ bool journaling = false;
{
- // journal the flush event
RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
- if (m_image_ctx.journal != NULL &&
- !m_image_ctx.journal->is_journal_replaying()) {
- uint64_t journal_tid = m_image_ctx.journal->append_io_event(
- m_aio_comp, journal::EventEntry(journal::AioFlushEvent()),
- AioObjectRequests(), 0, 0, false);
-
- C_FlushJournalCommit *ctx = new C_FlushJournalCommit(m_image_ctx,
- m_aio_comp,
- journal_tid);
- m_image_ctx.journal->flush_event(journal_tid, ctx);
- m_aio_comp->associate_journal_event(journal_tid);
- }
+ journaling = (m_image_ctx.journal != NULL &&
+ !m_image_ctx.journal->is_journal_replaying());
+ }
+
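+ // one flush request, plus a journal commit event when journaling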
+ m_aio_comp->set_request_count(cct, journaling ? 2 : 1);
+
+ if (journaling) {
+ // in-flight ops are flushed prior to closing the journal
+ uint64_t journal_tid = m_image_ctx.journal->append_io_event(
+ m_aio_comp, journal::EventEntry(journal::AioFlushEvent()),
+ AioObjectRequests(), 0, 0, false);
+
+ C_FlushJournalCommit *ctx = new C_FlushJournalCommit(m_image_ctx,
+ m_aio_comp,
+ journal_tid);
+ m_image_ctx.journal->flush_event(journal_tid, ctx);
+ m_aio_comp->associate_journal_event(journal_tid);
}
C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
m_image_ctx.flush(req_comp);
m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_FLUSH);
- m_aio_comp->finish_adding_requests(cct);
m_aio_comp->put();
m_image_ctx.perfcounter->inc(l_librbd_aio_flush);