// vim: ts=8 sw=2 smarttab
#include "librbd/AioImageRequestWQ.h"
+#include "common/errno.h"
#include "librbd/AioCompletion.h"
#include "librbd/AioImageRequest.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
+#include "librbd/Utils.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
// NOTE(review): SOURCE is a unified-diff fragment with hunk headers and
// indentation stripped; leading '+'/'-' below are patch markers, not C++.
//
// Constructor hunk: the lock now gets a per-instance unique name
// (presumably so lockdep can tell instances apart -- TODO confirm
// util::unique_lock_name semantics), and three new members join the
// initializer list: an in-flight op counter, a shutdown flag, and a
// deferred shutdown callback.
AioImageRequestWQ::AioImageRequestWQ(ImageCtx *image_ctx, const string &name,
time_t ti, ThreadPool *tp)
: ThreadPool::PointerWQ<AioImageRequest>(name, ti, 0, tp),
- m_image_ctx(*image_ctx), m_lock("AioImageRequestWQ::m_lock"),
- m_write_blockers(0), m_in_progress_writes(0), m_queued_writes(0) {
+ m_image_ctx(*image_ctx),
+ m_lock(util::unique_lock_name("AioImageRequestWQ::m_lock", this)),
+ m_write_blockers(0), m_in_progress_writes(0), m_queued_writes(0),
+ m_in_flight_ops(0), m_shutdown(false), m_on_shutdown(nullptr) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 5) << this << " " << ": ictx=" << image_ctx << dendl;
}
// NOTE(review): stray second closing brace -- looks like diff-extraction
// residue (old and new closings both kept); verify against the full file.
}
// aio_read hunk: braces the single-statement 'if', registers the op via
// start_in_flight_op() (bailing out if the queue is shut down), and widens
// the queue-instead-of-execute condition -- reads are now also queued when
// writes are blocked or writes are already queued, preserving read/write
// ordering.  The direct-execution path balances the in-flight count with
// finish_in_flight_op(); the queued path presumably balances it when the
// request is later processed (see the process-path hunks further down --
// TODO confirm against the full file).
void AioImageRequestWQ::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
- char *buf, bufferlist *pbl, int op_flags, bool native_async) {
+ char *buf, bufferlist *pbl, int op_flags,
+ bool native_async) {
c->init_time(&m_image_ctx, librbd::AIO_TYPE_READ);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "aio_read: ictx=" << &m_image_ctx << ", "
<< "completion=" << c << ", off=" << off << ", "
<< "len=" << len << ", " << "flags=" << op_flags << dendl;
- if (native_async && m_image_ctx.event_socket.is_valid())
+ if (native_async && m_image_ctx.event_socket.is_valid()) {
c->set_event_notify(true);
+ }
+
+ if (!start_in_flight_op(c)) {
+ return;
+ }
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- if (m_image_ctx.non_blocking_aio) {
+ if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
queue(new AioImageRead(m_image_ctx, c, off, len, buf, pbl, op_flags));
} else {
AioImageRequest::aio_read(&m_image_ctx, c, off, len, buf, pbl, op_flags);
+ finish_in_flight_op();
}
}
// aio_write hunk: same shape as aio_read -- braced 'if', in-flight op
// registration with early return on shutdown, and finish_in_flight_op()
// on the direct-execution path.
void AioImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
- const char *buf, int op_flags, bool native_async) {
+ const char *buf, int op_flags,
+ bool native_async) {
c->init_time(&m_image_ctx, librbd::AIO_TYPE_WRITE);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "aio_write: ictx=" << &m_image_ctx << ", "
<< "completion=" << c << ", off=" << off << ", "
<< "len=" << len << ", flags=" << op_flags << dendl;
- if (native_async && m_image_ctx.event_socket.is_valid())
+ if (native_async && m_image_ctx.event_socket.is_valid()) {
c->set_event_notify(true);
+ }
+
+ if (!start_in_flight_op(c)) {
+ return;
+ }
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
// NOTE(review): the diff elided the rest of this condition and its '{' --
// the line below ends in '||' with no continuation; incomplete as shown.
if (m_image_ctx.non_blocking_aio || is_journal_required() ||
queue(new AioImageWrite(m_image_ctx, c, off, len, buf, op_flags));
} else {
AioImageRequest::aio_write(&m_image_ctx, c, off, len, buf, op_flags);
+ finish_in_flight_op();
}
}
// NOTE(review): fragment starts mid-function -- the signature and opening
// log line were elided by the diff.  Given the AioImageDiscard request
// queued below, this is presumably aio_discard(); confirm against the
// full file.  Same hunk shape as aio_read/aio_write.
<< "completion=" << c << ", off=" << off << ", len=" << len
<< dendl;
- if (native_async && m_image_ctx.event_socket.is_valid())
+ if (native_async && m_image_ctx.event_socket.is_valid()) {
c->set_event_notify(true);
+ }
+
+ if (!start_in_flight_op(c)) {
+ return;
+ }
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
// NOTE(review): condition truncated by the diff (trailing '||', no '{').
if (m_image_ctx.non_blocking_aio || is_journal_required() ||
queue(new AioImageDiscard(m_image_ctx, c, off, len));
} else {
AioImageRequest::aio_discard(&m_image_ctx, c, off, len);
+ finish_in_flight_op();
}
}
// NOTE(review): fragment starts mid-function (signature elided); the
// AioImageFlush request and log tag indicate this is aio_flush().
// Same hunk shape as the other aio_* entry points.
ldout(cct, 20) << "aio_flush: ictx=" << &m_image_ctx << ", "
<< "completion=" << c << dendl;
- if (native_async && m_image_ctx.event_socket.is_valid())
+ if (native_async && m_image_ctx.event_socket.is_valid()) {
c->set_event_notify(true);
+ }
+
+ if (!start_in_flight_op(c)) {
+ return;
+ }
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
// NOTE(review): condition truncated by the diff (trailing '||', no '{').
if (m_image_ctx.non_blocking_aio || is_journal_required() ||
queue(new AioImageFlush(m_image_ctx, c));
} else {
AioImageRequest::aio_flush(&m_image_ctx, c);
+ finish_in_flight_op();
+ }
+}
+
+// New method: marks the work queue shut down.  Under the exclusive lock the
+// shutdown flag is set (a double shutdown is a programming error -- assert);
+// if ops are still in flight the callback is stashed for
+// finish_in_flight_op() to fire when the last one drains, otherwise it is
+// completed immediately outside the lock.
+void AioImageRequestWQ::shut_down(Context *on_shutdown) {
+ {
+ RWLock::WLocker locker(m_lock);
+ assert(!m_shutdown);
+ m_shutdown = true;
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ops.read()
+ << dendl;
+ if (m_in_flight_ops.read() > 0) {
+ m_on_shutdown = on_shutdown;
+ return;
+ }
}
+
+ on_shutdown->complete(0);
}
// NOTE(review): the diff elided the boundaries between several functions in
// this region (block_writes overloads, unblock_writes, the dequeue/peek
// path, and the request-finish path).  The recurring theme of the hunks is:
//   * Mutex::Locker  -> RWLock::{W,R}Locker (m_lock became an RWLock), and
//   * plain int counters -> atomics accessed via .read()/.inc()/.dec().
void AioImageRequestWQ::block_writes() {
CephContext *cct = m_image_ctx.cct;
{
- Mutex::Locker locker(m_lock);
+ RWLock::WLocker locker(m_lock);
++m_write_blockers;
ldout(cct, 5) << __func__ << ": " << &m_image_ctx << ", "
<< "num=" << m_write_blockers << dendl;
- if (!m_write_blocker_contexts.empty() || m_in_progress_writes > 0) {
+ if (!m_write_blocker_contexts.empty() || m_in_progress_writes.read() > 0) {
// NOTE(review): 'on_blocked' is used here but no visible signature
// declares it -- the block_writes(Context *on_blocked) overload's
// signature was elided by the diff.
m_write_blocker_contexts.push_back(on_blocked);
return;
}
// NOTE(review): presumably the start of unblock_writes() -- header elided.
bool wake_up = false;
{
- Mutex::Locker locker(m_lock);
+ RWLock::WLocker locker(m_lock);
assert(m_write_blockers > 0);
--m_write_blockers;
// NOTE(review): presumably the dequeue/peek path -- only a read lock is
// needed now that the write counter is atomic.
{
if (peek_item->is_write_op()) {
- Mutex::Locker locker(m_lock);
+ RWLock::RLocker locker(m_lock);
if (m_write_blockers > 0) {
return NULL;
}
- ++m_in_progress_writes;
+ m_in_progress_writes.inc();
}
}
// NOTE(review): presumably the request-finish path: atomic decrements of
// the queued/in-progress write counters; when the last in-progress write
// drains and blockers are waiting, the blocked-writes flush is triggered.
bool writes_blocked = false;
{
- Mutex::Locker locker(m_lock);
+ RWLock::RLocker locker(m_lock);
if (req->is_write_op()) {
- assert(m_queued_writes > 0);
- --m_queued_writes;
+ assert(m_queued_writes.read() > 0);
+ m_queued_writes.dec();
- assert(m_in_progress_writes > 0);
- if (--m_in_progress_writes == 0 && !m_write_blocker_contexts.empty()) {
+ assert(m_in_progress_writes.read() > 0);
+ if (m_in_progress_writes.dec() == 0 &&
+ !m_write_blocker_contexts.empty()) {
writes_blocked = true;
}
}
m_image_ctx.flush(new C_BlockedWrites(this));
}
delete req;
+
+ finish_in_flight_op();
+}
+
+// Register a new in-flight AIO op.  Returns false -- after failing the
+// completion with -ESHUTDOWN -- if the queue has already been shut down;
+// otherwise bumps the atomic in-flight counter and returns true.  Takes
+// m_lock shared: only shut_down() needs it exclusive to flip m_shutdown.
+// FIX(review): the declared return type was 'int' while the body returns
+// true/false and every visible caller tests the result as a boolean
+// ('if (!start_in_flight_op(c))'); changed to 'bool'.  Keep the header
+// declaration in sync.
+bool AioImageRequestWQ::start_in_flight_op(AioCompletion *c) {
+ RWLock::RLocker locker(m_lock);
+
+ if (m_shutdown) {
+ CephContext *cct = m_image_ctx.cct;
+ lderr(cct) << "IO received on closed image" << dendl;
+
+ c->get();
+ c->fail(cct, -ESHUTDOWN);
+ return false;
+ }
+
+ m_in_flight_ops.inc();
+ return true;
+}
+
+// Balance a successful start_in_flight_op().  Decrements the atomic
+// in-flight counter under the shared lock; when the last op drains after
+// shut_down() has been called, fires the deferred shutdown callback
+// outside the lock.
+void AioImageRequestWQ::finish_in_flight_op() {
+ {
+ RWLock::RLocker locker(m_lock);
+ // Fast path: other ops still in flight, or no shutdown pending.
+ if (m_in_flight_ops.dec() > 0 || !m_shutdown) {
+ return;
+ }
+ }
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 5) << __func__ << ": completing shut down" << dendl;
+
+ // shut_down() must have stashed the callback before we got here.
+ assert(m_on_shutdown != nullptr);
+ m_on_shutdown->complete(0);
}
// True when the image currently has a journal attached; snap_lock guards
// the m_image_ctx.journal pointer read.
bool AioImageRequestWQ::is_journal_required() const {
+ // TODO eliminate once journal startup state is integrated
RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
return (m_image_ctx.journal != NULL);
}
// NOTE(review): fragment starts mid-statement (log continuation) -- this is
// the tail of queue(AioImageRequest *req).  The hunk drops the m_lock scope
// around the queued-writes counter, which is now atomic.
<< "req=" << req << dendl;
assert(m_image_ctx.owner_lock.is_locked());
-
- {
- Mutex::Locker locker(m_lock);
- if (req->is_write_op()) {
- ++m_queued_writes;
- }
+ if (req->is_write_op()) {
+ m_queued_writes.inc();
}
+
ThreadPool::PointerWQ<AioImageRequest>::queue(req);
// NOTE(review): hunk truncated here -- body of this 'if' elided.
if (is_lock_required()) {
// NOTE(review): hunk continues past this view; only the Mutex -> RWLock
// change is visible.  handle_blocked_writes() swaps out the pending
// blocker contexts under the exclusive lock, presumably completing them
// afterwards -- confirm against the full file.
void AioImageRequestWQ::handle_blocked_writes(int r) {
Contexts contexts;
{
- Mutex::Locker locker(m_lock);
+ RWLock::WLocker locker(m_lock);
contexts.swap(m_write_blocker_contexts);
}