namespace librbd {
namespace io {
-ImageRequestWQ::ImageRequestWQ(ImageCtx *image_ctx, const string &name,
- time_t ti, ThreadPool *tp)
- : ThreadPool::PointerWQ<ImageRequest<> >(name, ti, 0, tp),
+template <typename I>
+ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
+ time_t ti, ThreadPool *tp)
+ : ThreadPool::PointerWQ<ImageRequest<I> >(name, ti, 0, tp),
m_image_ctx(*image_ctx),
- m_lock(util::unique_lock_name("ImageRequestWQ::m_lock", this)),
+ m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)),
m_write_blockers(0), m_in_progress_writes(0), m_queued_reads(0),
m_queued_writes(0), m_in_flight_ops(0), m_refresh_in_progress(false),
m_shutdown(false), m_on_shutdown(nullptr) {
tp->add_work_queue(this);
}
-ssize_t ImageRequestWQ::read(uint64_t off, uint64_t len,
- ReadResult &&read_result, int op_flags) {
+template <typename I>
+ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
+ ReadResult &&read_result, int op_flags) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
<< "len = " << len << dendl;
return cond.wait();
}
-ssize_t ImageRequestWQ::write(uint64_t off, uint64_t len,
- bufferlist &&bl, int op_flags) {
+template <typename I>
+ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len,
+ bufferlist &&bl, int op_flags) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
<< "len = " << len << dendl;
return len;
}
-ssize_t ImageRequestWQ::discard(uint64_t off, uint64_t len, bool skip_partial_discard) {
+template <typename I>
+ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len,
+ bool skip_partial_discard) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
<< "len = " << len << dendl;
return len;
}
-ssize_t ImageRequestWQ::writesame(uint64_t off, uint64_t len, bufferlist &&bl,
- int op_flags) {
+template <typename I>
+ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len,
+ bufferlist &&bl, int op_flags) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
<< "len = " << len << ", data_len " << bl.length() << dendl;
return len;
}
-void ImageRequestWQ::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
- ReadResult &&read_result, int op_flags,
- bool native_async) {
+template <typename I>
+void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
+ ReadResult &&read_result, int op_flags,
+ bool native_async) {
CephContext *cct = m_image_ctx.cct;
ZTracer::Trace trace;
if (cct->_conf->rbd_blkin_trace_all) {
if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
lock_required) {
- queue(new ImageReadRequest<>(m_image_ctx, c, {{off, len}},
- std::move(read_result), op_flags, trace));
+ queue(new ImageReadRequest<I>(m_image_ctx, c, {{off, len}},
+ std::move(read_result), op_flags, trace));
} else {
c->start_op();
- ImageRequest<>::aio_read(&m_image_ctx, c, {{off, len}},
- std::move(read_result), op_flags, trace);
+ ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}},
+ std::move(read_result), op_flags, trace);
finish_in_flight_op();
}
trace.event("finish");
}
-void ImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
- bufferlist &&bl, int op_flags,
- bool native_async) {
+template <typename I>
+void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
+ bufferlist &&bl, int op_flags,
+ bool native_async) {
CephContext *cct = m_image_ctx.cct;
ZTracer::Trace trace;
if (cct->_conf->rbd_blkin_trace_all) {
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
- queue(new ImageWriteRequest<>(m_image_ctx, c, {{off, len}},
- std::move(bl), op_flags, trace));
+ queue(new ImageWriteRequest<I>(m_image_ctx, c, {{off, len}},
+ std::move(bl), op_flags, trace));
} else {
c->start_op();
- ImageRequest<>::aio_write(&m_image_ctx, c, {{off, len}},
- std::move(bl), op_flags, trace);
+ ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}},
+ std::move(bl), op_flags, trace);
finish_in_flight_op();
}
trace.event("finish");
}
-void ImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off,
- uint64_t len, bool skip_partial_discard,
- bool native_async) {
+template <typename I>
+void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
+ uint64_t len, bool skip_partial_discard,
+ bool native_async) {
CephContext *cct = m_image_ctx.cct;
ZTracer::Trace trace;
if (cct->_conf->rbd_blkin_trace_all) {
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
- queue(new ImageDiscardRequest<>(m_image_ctx, c, off, len,
- skip_partial_discard, trace));
+ queue(new ImageDiscardRequest<I>(m_image_ctx, c, off, len,
+ skip_partial_discard, trace));
} else {
c->start_op();
- ImageRequest<>::aio_discard(&m_image_ctx, c, off, len,
- skip_partial_discard, trace);
+ ImageRequest<I>::aio_discard(&m_image_ctx, c, off, len,
+ skip_partial_discard, trace);
finish_in_flight_op();
}
trace.event("finish");
}
-void ImageRequestWQ::aio_flush(AioCompletion *c, bool native_async) {
+template <typename I>
+void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
CephContext *cct = m_image_ctx.cct;
ZTracer::Trace trace;
if (cct->_conf->rbd_blkin_trace_all) {
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
- queue(new ImageFlushRequest<>(m_image_ctx, c, trace));
+ queue(new ImageFlushRequest<I>(m_image_ctx, c, trace));
} else {
- ImageRequest<>::aio_flush(&m_image_ctx, c, trace);
+ ImageRequest<I>::aio_flush(&m_image_ctx, c, trace);
finish_in_flight_op();
}
trace.event("finish");
}
-void ImageRequestWQ::aio_writesame(AioCompletion *c, uint64_t off, uint64_t len,
- bufferlist &&bl, int op_flags,
- bool native_async) {
+template <typename I>
+void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
+ uint64_t len, bufferlist &&bl,
+ int op_flags, bool native_async) {
CephContext *cct = m_image_ctx.cct;
ZTracer::Trace trace;
if (cct->_conf->rbd_blkin_trace_all) {
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
- queue(new ImageWriteSameRequest<>(m_image_ctx, c, off, len, std::move(bl),
- op_flags, trace));
+ queue(new ImageWriteSameRequest<I>(m_image_ctx, c, off, len, std::move(bl),
+ op_flags, trace));
} else {
c->start_op();
- ImageRequest<>::aio_writesame(&m_image_ctx, c, off, len, std::move(bl),
- op_flags, trace);
+ ImageRequest<I>::aio_writesame(&m_image_ctx, c, off, len, std::move(bl),
+ op_flags, trace);
finish_in_flight_op();
}
trace.event("finish");
}
-void ImageRequestWQ::shut_down(Context *on_shutdown) {
+template <typename I>
+void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
assert(m_image_ctx.owner_lock.is_locked());
{
m_image_ctx.flush(on_shutdown);
}
-bool ImageRequestWQ::is_lock_request_needed() const {
+template <typename I>
+bool ImageRequestWQ<I>::is_lock_request_needed() const {
RWLock::RLocker locker(m_lock);
return (m_queued_writes > 0 ||
(m_require_lock_on_read && m_queued_reads > 0));
}
-int ImageRequestWQ::block_writes() {
+template <typename I>
+int ImageRequestWQ<I>::block_writes() {
C_SaferCond cond_ctx;
block_writes(&cond_ctx);
return cond_ctx.wait();
}
-void ImageRequestWQ::block_writes(Context *on_blocked) {
+template <typename I>
+void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
assert(m_image_ctx.owner_lock.is_locked());
CephContext *cct = m_image_ctx.cct;
m_image_ctx.flush(on_blocked);
}
-void ImageRequestWQ::unblock_writes() {
+template <typename I>
+void ImageRequestWQ<I>::unblock_writes() {
CephContext *cct = m_image_ctx.cct;
bool wake_up = false;
}
if (wake_up) {
- signal();
+ this->signal();
}
}
-void ImageRequestWQ::set_require_lock_on_read() {
+template <typename I>
+void ImageRequestWQ<I>::set_require_lock_on_read() {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << dendl;
m_require_lock_on_read = true;
}
-void ImageRequestWQ::clear_require_lock_on_read() {
+template <typename I>
+void ImageRequestWQ<I>::clear_require_lock_on_read() {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << dendl;
m_require_lock_on_read = false;
}
- signal();
+ this->signal();
}
-void *ImageRequestWQ::_void_dequeue() {
- ImageRequest<> *peek_item = front();
+template <typename I>
+void *ImageRequestWQ<I>::_void_dequeue() {
+ ImageRequest<I> *peek_item = this->front();
// no IO ops available or refresh in-progress (IO stalled)
if (peek_item == nullptr || m_refresh_in_progress) {
}
}
- ImageRequest<> *item = reinterpret_cast<ImageRequest<> *>(
- ThreadPool::PointerWQ<ImageRequest<> >::_void_dequeue());
+ ImageRequest<I> *item = reinterpret_cast<ImageRequest<I> *>(
+ ThreadPool::PointerWQ<ImageRequest<I> >::_void_dequeue());
assert(peek_item == item);
if (refresh_required) {
// stall IO until the refresh completes
m_refresh_in_progress = true;
- get_pool_lock().Unlock();
+ this->get_pool_lock().Unlock();
m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
- get_pool_lock().Lock();
+ this->get_pool_lock().Lock();
return nullptr;
}
return item;
}
-void ImageRequestWQ::process(ImageRequest<> *req) {
+template <typename I>
+void ImageRequestWQ<I>::process(ImageRequest<I> *req) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
<< "req=" << req << dendl;
finish_in_flight_op();
}
-void ImageRequestWQ::finish_queued_op(ImageRequest<> *req) {
+template <typename I>
+void ImageRequestWQ<I>::finish_queued_op(ImageRequest<I> *req) {
RWLock::RLocker locker(m_lock);
if (req->is_write_op()) {
assert(m_queued_writes > 0);
}
}
-void ImageRequestWQ::finish_in_progress_write() {
+template <typename I>
+void ImageRequestWQ<I>::finish_in_progress_write() {
bool writes_blocked = false;
{
RWLock::RLocker locker(m_lock);
}
}
-int ImageRequestWQ::start_in_flight_op(AioCompletion *c) {
+template <typename I>
+int ImageRequestWQ<I>::start_in_flight_op(AioCompletion *c) {
RWLock::RLocker locker(m_lock);
if (m_shutdown) {
return true;
}
-void ImageRequestWQ::finish_in_flight_op() {
+template <typename I>
+void ImageRequestWQ<I>::finish_in_flight_op() {
Context *on_shutdown;
{
RWLock::RLocker locker(m_lock);
m_image_ctx.flush(on_shutdown);
}
-bool ImageRequestWQ::is_lock_required() const {
+template <typename I>
+bool ImageRequestWQ<I>::is_lock_required() const {
assert(m_image_ctx.owner_lock.is_locked());
if (m_image_ctx.exclusive_lock == NULL) {
return false;
return (!m_image_ctx.exclusive_lock->is_lock_owner());
}
-void ImageRequestWQ::queue(ImageRequest<> *req) {
+template <typename I>
+void ImageRequestWQ<I>::queue(ImageRequest<I> *req) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
<< "req=" << req << dendl;
m_queued_reads++;
}
- ThreadPool::PointerWQ<ImageRequest<> >::queue(req);
+ ThreadPool::PointerWQ<ImageRequest<I> >::queue(req);
if (lock_required) {
m_image_ctx.exclusive_lock->acquire_lock(nullptr);
}
}
-void ImageRequestWQ::handle_refreshed(int r, ImageRequest<> *req) {
+template <typename I>
+void ImageRequestWQ<I>::handle_refreshed(int r, ImageRequest<I> *req) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 15) << "resuming IO after image refresh: r=" << r << ", "
<< "req=" << req << dendl;
if (r < 0) {
- process_finish();
+ this->process_finish();
req->fail(r);
finish_queued_op(req);
delete req;
} else {
// since IO was stalled for refresh -- original IO order is preserved
// if we requeue this op for work queue processing
- requeue(req);
+ this->requeue(req);
}
m_refresh_in_progress = false;
- signal();
+ this->signal();
// refresh might have enabled exclusive lock -- IO stalled until
// we acquire the lock
}
}
-void ImageRequestWQ::handle_blocked_writes(int r) {
+template <typename I>
+void ImageRequestWQ<I>::handle_blocked_writes(int r) {
Contexts contexts;
{
RWLock::WLocker locker(m_lock);
}
}
+template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;
+
} // namespace io
} // namespace librbd