template <typename I>
AioImageRequestWQ<I>::AioImageRequestWQ(I *image_ctx, const string &name,
                                        time_t ti, ThreadPool *tp)
: ThreadPool::PointerWQ<AioImageRequest<I> >(name, ti, 0, tp),
m_image_ctx(*image_ctx),
- m_lock(util::unique_lock_name("AioImageRequestWQ::m_lock", this)),
- m_write_blockers(0), m_in_progress_writes(0), m_queued_reads(0),
- m_queued_writes(0), m_in_flight_ops(0), m_refresh_in_progress(false),
- m_shutdown(false), m_on_shutdown(nullptr) {
+ m_lock(util::unique_lock_name("AioImageRequestWQ::m_lock", this)) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 5) << this << " " << __func__ << ": ictx=" << image_ctx << dendl;
tp->add_work_queue(this);
c->set_event_notify(true);
}
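+ // all four AIO entry points follow the same pattern: take an in-flight
+ // reference, then either queue the request or dispatch it inline and
+ // immediately drop the reference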
- if (!start_in_flight_op(c)) {
+ if (!start_in_flight_io(c)) {
return;
}
} else {
c->start_op();
AioImageRequest<I>::aio_read(&m_image_ctx, c, off, len, buf, pbl, op_flags);
- finish_in_flight_op();
+ finish_in_flight_io();
}
}
c->set_event_notify(true);
}
- if (!start_in_flight_op(c)) {
+ if (!start_in_flight_io(c)) {
return;
}
} else {
c->start_op();
AioImageRequest<I>::aio_write(&m_image_ctx, c, off, len, buf, op_flags);
- finish_in_flight_op();
+ finish_in_flight_io();
}
}
c->set_event_notify(true);
}
- if (!start_in_flight_op(c)) {
+ if (!start_in_flight_io(c)) {
return;
}
} else {
c->start_op();
AioImageRequest<I>::aio_discard(&m_image_ctx, c, off, len);
- finish_in_flight_op();
+ finish_in_flight_io();
}
}
c->set_event_notify(true);
}
- if (!start_in_flight_op(c)) {
+ if (!start_in_flight_io(c)) {
return;
}
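+ // a queued flush drains behind any writes already sitting in the work
+ // queue, preserving the original IO ordering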
queue(new AioImageFlush<I>(m_image_ctx, c));
} else {
AioImageRequest<I>::aio_flush(&m_image_ctx, c);
- finish_in_flight_op();
+ finish_in_flight_io();
}
}
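+ // shut_down(): refuse any new IOs and defer the shutdown context until
+ // the last in-flight IO drains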
m_shutdown = true;
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ops.read()
+ ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.read()
<< dendl;
- if (m_in_flight_ops.read() > 0) {
+ if (m_in_flight_ios.read() > 0) {
m_on_shutdown = on_shutdown;
return;
}
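+ // each blocker bumps the count; on_blocked is deferred until all
+ // in-flight writes have drained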
++m_write_blockers;
ldout(cct, 5) << __func__ << ": " << &m_image_ctx << ", "
<< "num=" << m_write_blockers << dendl;
- if (!m_write_blocker_contexts.empty() || m_in_progress_writes.read() > 0) {
+ if (!m_write_blocker_contexts.empty() || m_in_flight_writes.read() > 0) {
m_write_blocker_contexts.push_back(on_blocked);
return;
}
- // refresh will requeue the op -- don't count it as in-progress
+ // refresh will requeue the IO -- don't count it as in-flight
if (!refresh_required) {
- m_in_progress_writes.inc();
+ m_in_flight_writes.inc();
}
} else if (m_require_lock_on_read) {
return nullptr;
req->send();
}
- finish_queued_op(req);
+ finish_queued_io(req);
if (req->is_write_op()) {
- finish_in_progress_write();
+ finish_in_flight_write();
}
delete req;
- finish_in_flight_op();
+ finish_in_flight_io();
}
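+ // drops the queued-IO accounting once a request has been dequeued for
+ // processing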
template <typename I>
-void AioImageRequestWQ<I>::finish_queued_op(AioImageRequest<I> *req) {
+void AioImageRequestWQ<I>::finish_queued_io(AioImageRequest<I> *req) {
RWLock::RLocker locker(m_lock);
if (req->is_write_op()) {
assert(m_queued_writes.read() > 0);
}
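+ // the last in-flight write to drain while writes are blocked wakes the
+ // contexts queued by block_writes()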
template <typename I>
-void AioImageRequestWQ<I>::finish_in_progress_write() {
+void AioImageRequestWQ<I>::finish_in_flight_write() {
bool writes_blocked = false;
{
RWLock::RLocker locker(m_lock);
- assert(m_in_progress_writes.read() > 0);
- if (m_in_progress_writes.dec() == 0 &&
+ assert(m_in_flight_writes.read() > 0);
+ if (m_in_flight_writes.dec() == 0 &&
!m_write_blocker_contexts.empty()) {
writes_blocked = true;
}
}
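+ // takes an in-flight reference; refused once a shut down has started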
template <typename I>
-int AioImageRequestWQ<I>::start_in_flight_op(AioCompletion *c) {
+bool AioImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) {
RWLock::RLocker locker(m_lock);
if (m_shutdown) {
return false;
}
- m_in_flight_ops.inc();
+ m_in_flight_ios.inc();
return true;
}
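+ // drops an in-flight reference; releasing the final reference during a
+ // shut down allows the deferred m_on_shutdown context to run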
template <typename I>
-void AioImageRequestWQ<I>::finish_in_flight_op() {
+void AioImageRequestWQ<I>::finish_in_flight_io() {
{
RWLock::RLocker locker(m_lock);
- if (m_in_flight_ops.dec() > 0 || !m_shutdown) {
+ if (m_in_flight_ios.dec() > 0 || !m_shutdown) {
return;
}
}
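+ // the IO needs the exclusive lock but it cannot be acquired here, so
+ // fail the request as read-only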
- lderr(cct) << "op requires exclusive lock" << dendl;
+ lderr(cct) << "IO requires exclusive lock" << dendl;
req->fail(-EROFS);
delete req;
- finish_in_flight_op();
+ finish_in_flight_io();
return;
}
if (r < 0) {
this->process_finish();
req->fail(r);
- finish_queued_op(req);
+ finish_queued_io(req);
delete req;
- finish_in_flight_op();
+ finish_in_flight_io();
} else {
// since IO was stalled for refresh -- original IO order is preserved
- // if we requeue this op for work queue processing
+ // if we requeue this IO for work queue processing
ImageCtxT &m_image_ctx;
mutable RWLock m_lock;
Contexts m_write_blocker_contexts;
- uint32_t m_write_blockers;
+ uint32_t m_write_blockers = 0;
bool m_require_lock_on_read = false;
- atomic_t m_in_progress_writes;
- atomic_t m_queued_reads;
- atomic_t m_queued_writes;
- atomic_t m_in_flight_ops;
+ atomic_t m_in_flight_writes {0};
+ atomic_t m_queued_reads {0};
+ atomic_t m_queued_writes {0};
+ atomic_t m_in_flight_ios {0};
- bool m_refresh_in_progress;
+ bool m_refresh_in_progress = false;
- bool m_shutdown;
- Context *m_on_shutdown;
+ bool m_shutdown = false;
+ Context *m_on_shutdown = nullptr;
inline bool writes_empty() const {
RWLock::RLocker locker(m_lock);
return (m_queued_writes.read() == 0);
}
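+ // queued/in-flight accounting helpers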
- void finish_queued_op(AioImageRequest<ImageCtxT> *req);
- void finish_in_progress_write();
+ void finish_queued_io(AioImageRequest<ImageCtxT> *req);
+ void finish_in_flight_write();
- int start_in_flight_op(AioCompletion *c);
- void finish_in_flight_op();
+ bool start_in_flight_io(AioCompletion *c);
+ void finish_in_flight_io();
void queue(AioImageRequest<ImageCtxT> *req);