namespace librbd {
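+// AioImageRequestWQ is now templated on the image context type I. Only the
+// concrete librbd::ImageCtx specialization is instantiated at the end of
+// this file, presumably so unit tests can instantiate the queue with a mock
+// image context.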
-AioImageRequestWQ::AioImageRequestWQ(ImageCtx *image_ctx, const string &name,
- time_t ti, ThreadPool *tp)
- : ThreadPool::PointerWQ<AioImageRequest<> >(name, ti, 0, tp),
+template <typename I>
+AioImageRequestWQ<I>::AioImageRequestWQ(I *image_ctx, const string &name,
+ time_t ti, ThreadPool *tp)
+ : ThreadPool::PointerWQ<AioImageRequest<I> >(name, ti, 0, tp),
m_image_ctx(*image_ctx),
m_lock(util::unique_lock_name("AioImageRequestWQ::m_lock", this)),
    m_write_blockers(0), m_in_progress_writes(0), m_queued_reads(0),
    m_queued_writes(0), m_in_flight_ops(0), m_refresh_in_progress(false),
    m_shutdown(false), m_on_shutdown(nullptr) {
  tp->add_work_queue(this);
}
-ssize_t AioImageRequestWQ::read(uint64_t off, uint64_t len, char *buf,
- int op_flags) {
+template <typename I>
+ssize_t AioImageRequestWQ<I>::read(uint64_t off, uint64_t len, char *buf,
+ int op_flags) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "read: ictx=" << &m_image_ctx << ", off=" << off << ", "
<< "len = " << len << dendl;
return cond.wait();
}
-ssize_t AioImageRequestWQ::write(uint64_t off, uint64_t len, const char *buf,
- int op_flags) {
+template <typename I>
+ssize_t AioImageRequestWQ<I>::write(uint64_t off, uint64_t len, const char *buf,
+ int op_flags) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "write: ictx=" << &m_image_ctx << ", off=" << off << ", "
<< "len = " << len << dendl;
return len;
}
-int AioImageRequestWQ::discard(uint64_t off, uint64_t len) {
+template <typename I>
+int AioImageRequestWQ<I>::discard(uint64_t off, uint64_t len) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "discard: ictx=" << &m_image_ctx << ", off=" << off << ", "
<< "len = " << len << dendl;
return len;
}
-void AioImageRequestWQ::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
- char *buf, bufferlist *pbl, int op_flags,
- bool native_async) {
+template <typename I>
+void AioImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
+ char *buf, bufferlist *pbl, int op_flags,
+ bool native_async) {
c->init_time(&m_image_ctx, librbd::AIO_TYPE_READ);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "aio_read: ictx=" << &m_image_ctx << ", "
-    queue(new AioImageRead<>(m_image_ctx, c, off, len, buf, pbl, op_flags));
+    queue(new AioImageRead<I>(m_image_ctx, c, off, len, buf, pbl, op_flags));
} else {
c->start_op();
- AioImageRequest<>::aio_read(&m_image_ctx, c, off, len, buf, pbl, op_flags);
+ AioImageRequest<I>::aio_read(&m_image_ctx, c, off, len, buf, pbl, op_flags);
finish_in_flight_op();
}
}
-void AioImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
- const char *buf, int op_flags,
- bool native_async) {
+template <typename I>
+void AioImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
+ const char *buf, int op_flags,
+ bool native_async) {
c->init_time(&m_image_ctx, librbd::AIO_TYPE_WRITE);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "aio_write: ictx=" << &m_image_ctx << ", "
-    queue(new AioImageWrite<>(m_image_ctx, c, off, len, buf, op_flags));
+    queue(new AioImageWrite<I>(m_image_ctx, c, off, len, buf, op_flags));
} else {
c->start_op();
- AioImageRequest<>::aio_write(&m_image_ctx, c, off, len, buf, op_flags);
+ AioImageRequest<I>::aio_write(&m_image_ctx, c, off, len, buf, op_flags);
finish_in_flight_op();
}
}
-void AioImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off,
- uint64_t len, bool native_async) {
+template <typename I>
+void AioImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
+ uint64_t len, bool native_async) {
c->init_time(&m_image_ctx, librbd::AIO_TYPE_DISCARD);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "aio_discard: ictx=" << &m_image_ctx << ", "
-    queue(new AioImageDiscard<>(m_image_ctx, c, off, len));
+    queue(new AioImageDiscard<I>(m_image_ctx, c, off, len));
} else {
c->start_op();
- AioImageRequest<>::aio_discard(&m_image_ctx, c, off, len);
+ AioImageRequest<I>::aio_discard(&m_image_ctx, c, off, len);
finish_in_flight_op();
}
}
-void AioImageRequestWQ::aio_flush(AioCompletion *c, bool native_async) {
+template <typename I>
+void AioImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
c->init_time(&m_image_ctx, librbd::AIO_TYPE_FLUSH);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "aio_flush: ictx=" << &m_image_ctx << ", "
if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
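+    // writes are blocked or still queued, or non-blocking AIO is enabled:
+    // queue the flush behind them to preserve ordering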
-    queue(new AioImageFlush<>(m_image_ctx, c));
+    queue(new AioImageFlush<I>(m_image_ctx, c));
} else {
- AioImageRequest<>::aio_flush(&m_image_ctx, c);
+ AioImageRequest<I>::aio_flush(&m_image_ctx, c);
finish_in_flight_op();
}
}
-void AioImageRequestWQ::shut_down(Context *on_shutdown) {
+template <typename I>
+void AioImageRequestWQ<I>::shut_down(Context *on_shutdown) {
assert(m_image_ctx.owner_lock.is_locked());
  {
    RWLock::WLocker locker(m_lock);
    assert(!m_shutdown);
    m_shutdown = true;
  }
  m_image_ctx.flush(on_shutdown);
}
-bool AioImageRequestWQ::is_lock_request_needed() const {
+template <typename I>
+bool AioImageRequestWQ<I>::is_lock_request_needed() const {
RWLock::RLocker locker(m_lock);
return (m_queued_writes.read() > 0 ||
(m_require_lock_on_read && m_queued_reads.read() > 0));
}
-int AioImageRequestWQ::block_writes() {
+template <typename I>
+int AioImageRequestWQ<I>::block_writes() {
C_SaferCond cond_ctx;
block_writes(&cond_ctx);
return cond_ctx.wait();
}
-void AioImageRequestWQ::block_writes(Context *on_blocked) {
+template <typename I>
+void AioImageRequestWQ<I>::block_writes(Context *on_blocked) {
assert(m_image_ctx.owner_lock.is_locked());
CephContext *cct = m_image_ctx.cct;
m_image_ctx.flush(on_blocked);
}
-void AioImageRequestWQ::unblock_writes() {
+template <typename I>
+void AioImageRequestWQ<I>::unblock_writes() {
CephContext *cct = m_image_ctx.cct;
  bool wake_up = false;
  {
    RWLock::WLocker locker(m_lock);
    assert(m_write_blockers > 0);
    if (--m_write_blockers == 0) {
      wake_up = true;
    }
  }
if (wake_up) {
- signal();
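+    // the base class is now dependent on I, so its members (signal(),
+    // front(), get_pool_lock(), ...) must be qualified with this-> to be
+    // found during name lookup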
+ this->signal();
}
}
-void AioImageRequestWQ::set_require_lock_on_read() {
+template <typename I>
+void AioImageRequestWQ<I>::set_require_lock_on_read() {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << __func__ << dendl;
m_require_lock_on_read = true;
}
-void AioImageRequestWQ::clear_require_lock_on_read() {
+template <typename I>
+void AioImageRequestWQ<I>::clear_require_lock_on_read() {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << __func__ << dendl;
m_require_lock_on_read = false;
}
- signal();
+ this->signal();
}
-void *AioImageRequestWQ::_void_dequeue() {
- AioImageRequest<> *peek_item = front();
+template <typename I>
+void *AioImageRequestWQ<I>::_void_dequeue() {
+ AioImageRequest<I> *peek_item = this->front();
// no IO ops available or refresh in-progress (IO stalled)
  if (peek_item == nullptr || m_refresh_in_progress) {
    return nullptr;
  }
}
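+  // the base class dequeues a void*; cast it back to the request type and
+  // verify it matches the item peeked above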
- AioImageRequest<> *item = reinterpret_cast<AioImageRequest<> *>(
- ThreadPool::PointerWQ<AioImageRequest<> >::_void_dequeue());
+ AioImageRequest<I> *item = reinterpret_cast<AioImageRequest<I> *>(
+ ThreadPool::PointerWQ<AioImageRequest<I> >::_void_dequeue());
assert(peek_item == item);
  bool refresh_required = m_image_ctx.state->is_refresh_required();
  if (refresh_required) {
// stall IO until the refresh completes
m_refresh_in_progress = true;
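+    // the pool lock is presumably dropped so that an in-line refresh
+    // completion, which re-enters the queue via requeue()/signal(), cannot
+    // deadlock on it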
- get_pool_lock().Unlock();
+ this->get_pool_lock().Unlock();
m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
- get_pool_lock().Lock();
+ this->get_pool_lock().Lock();
return nullptr;
}
return item;
}
-void AioImageRequestWQ::process(AioImageRequest<> *req) {
+template <typename I>
+void AioImageRequestWQ<I>::process(AioImageRequest<I> *req) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << ", "
<< "req=" << req << dendl;
  req->send();
  finish_queued_op(req);
  if (req->is_write_op()) {
    finish_in_progress_write();
  }
  delete req;
  finish_in_flight_op();
}
-void AioImageRequestWQ::finish_queued_op(AioImageRequest<> *req) {
+template <typename I>
+void AioImageRequestWQ<I>::finish_queued_op(AioImageRequest<I> *req) {
RWLock::RLocker locker(m_lock);
  if (req->is_write_op()) {
    assert(m_queued_writes.read() > 0);
    m_queued_writes.dec();
  } else {
    assert(m_queued_reads.read() > 0);
    m_queued_reads.dec();
  }
}
-void AioImageRequestWQ::finish_in_progress_write() {
+template <typename I>
+void AioImageRequestWQ<I>::finish_in_progress_write() {
bool writes_blocked = false;
{
RWLock::RLocker locker(m_lock);
}
}
-bool AioImageRequestWQ::start_in_flight_op(AioCompletion *c) {
+template <typename I>
+bool AioImageRequestWQ<I>::start_in_flight_op(AioCompletion *c) {
RWLock::RLocker locker(m_lock);
  if (m_shutdown) {
    // the image is being shut down: reject new ops
    return false;
  }
  m_in_flight_ops.inc();
  return true;
}
-void AioImageRequestWQ::finish_in_flight_op() {
+template <typename I>
+void AioImageRequestWQ<I>::finish_in_flight_op() {
  {
    RWLock::RLocker locker(m_lock);
    if (m_in_flight_ops.dec() > 0 || !m_shutdown) {
      return;
    }
  }
  // the last in-flight op completed during shut down: flush and notify
  m_image_ctx.flush(m_on_shutdown);
}
-bool AioImageRequestWQ::is_lock_required() const {
+template <typename I>
+bool AioImageRequestWQ<I>::is_lock_required() const {
assert(m_image_ctx.owner_lock.is_locked());
  if (m_image_ctx.exclusive_lock == NULL) {
    return false;
  }
  return (!m_image_ctx.exclusive_lock->is_lock_owner());
}
-void AioImageRequestWQ::queue(AioImageRequest<> *req) {
+template <typename I>
+void AioImageRequestWQ<I>::queue(AioImageRequest<I> *req) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << ", "
<< "req=" << req << dendl;
  bool lock_required = is_lock_required();
  if (req->is_write_op()) {
    m_queued_writes.inc();
  } else {
    m_queued_reads.inc();
  }
- ThreadPool::PointerWQ<AioImageRequest<> >::queue(req);
+ ThreadPool::PointerWQ<AioImageRequest<I> >::queue(req);
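+  // request the exclusive lock asynchronously; the op stays queued and is
+  // presumably dispatched once the lock has been acquired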
if (lock_required) {
m_image_ctx.exclusive_lock->request_lock(nullptr);
}
}
-void AioImageRequestWQ::handle_refreshed(int r, AioImageRequest<> *req) {
+template <typename I>
+void AioImageRequestWQ<I>::handle_refreshed(int r, AioImageRequest<I> *req) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 15) << "resuming IO after image refresh: r=" << r << ", "
<< "req=" << req << dendl;
if (r < 0) {
- process_finish();
+ this->process_finish();
req->fail(r);
finish_queued_op(req);
delete req;
} else {
// since IO was stalled for refresh -- original IO order is preserved
// if we requeue this op for work queue processing
- requeue(req);
+ this->requeue(req);
}
m_refresh_in_progress = false;
- signal();
+ this->signal();
// refresh might have enabled exclusive lock -- IO stalled until
// we acquire the lock
}
}
-void AioImageRequestWQ::handle_blocked_writes(int r) {
+template <typename I>
+void AioImageRequestWQ<I>::handle_blocked_writes(int r) {
Contexts contexts;
{
    RWLock::WLocker locker(m_lock);
    contexts.swap(m_write_blocker_contexts);
  }
  for (auto ctx : contexts) {
    ctx->complete(r);
  }
}
} // namespace librbd
+
+template class librbd::AioImageRequestWQ<librbd::ImageCtx>;
template <typename> class AioImageRequest;
class ImageCtx;
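+// ImageCtxT defaults to the concrete ImageCtx so existing callers are
+// unchanged; tests can presumably substitute a mock image context type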
-class AioImageRequestWQ : protected ThreadPool::PointerWQ<AioImageRequest<ImageCtx> > {
+template <typename ImageCtxT = librbd::ImageCtx>
+class AioImageRequestWQ
+ : protected ThreadPool::PointerWQ<AioImageRequest<ImageCtxT> > {
public:
- AioImageRequestWQ(ImageCtx *image_ctx, const string &name, time_t ti,
+ AioImageRequestWQ(ImageCtxT *image_ctx, const string &name, time_t ti,
ThreadPool *tp);
ssize_t read(uint64_t off, uint64_t len, char *buf, int op_flags);
bool native_async=true);
void aio_flush(AioCompletion *c, bool native_async=true);
-  using ThreadPool::PointerWQ<AioImageRequest<ImageCtx> >::drain;
-  using ThreadPool::PointerWQ<AioImageRequest<ImageCtx> >::empty;
+  using ThreadPool::PointerWQ<AioImageRequest<ImageCtxT> >::drain;
+  using ThreadPool::PointerWQ<AioImageRequest<ImageCtxT> >::empty;
void shut_down(Context *on_shutdown);
protected:
virtual void *_void_dequeue();
- virtual void process(AioImageRequest<ImageCtx> *req);
+ virtual void process(AioImageRequest<ImageCtxT> *req);
private:
typedef std::list<Context *> Contexts;
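+  // completion that resumes (or fails) a request stalled behind an image
+  // refresh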
struct C_RefreshFinish : public Context {
AioImageRequestWQ *aio_work_queue;
- AioImageRequest<ImageCtx> *aio_image_request;
+ AioImageRequest<ImageCtxT> *aio_image_request;
C_RefreshFinish(AioImageRequestWQ *aio_work_queue,
- AioImageRequest<ImageCtx> *aio_image_request)
+ AioImageRequest<ImageCtxT> *aio_image_request)
: aio_work_queue(aio_work_queue), aio_image_request(aio_image_request) {
}
    virtual void finish(int r) override {
      aio_work_queue->handle_refreshed(r, aio_image_request);
    }
};
- ImageCtx &m_image_ctx;
+ ImageCtxT &m_image_ctx;
mutable RWLock m_lock;
Contexts m_write_blocker_contexts;
uint32_t m_write_blockers;
  bool writes_empty() const {
    RWLock::RLocker locker(m_lock);
    return (m_queued_writes.read() == 0);
}
- void finish_queued_op(AioImageRequest<ImageCtx> *req);
+ void finish_queued_op(AioImageRequest<ImageCtxT> *req);
void finish_in_progress_write();
  bool start_in_flight_op(AioCompletion *c);
void finish_in_flight_op();
- void queue(AioImageRequest<ImageCtx> *req);
+ void queue(AioImageRequest<ImageCtxT> *req);
- void handle_refreshed(int r, AioImageRequest<ImageCtx> *req);
+ void handle_refreshed(int r, AioImageRequest<ImageCtxT> *req);
void handle_blocked_writes(int r);
};
} // namespace librbd
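+// the librbd::ImageCtx specialization is explicitly instantiated in the .cc
+// file; extern template suppresses implicit instantiation in other
+// translation units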
+extern template class librbd::AioImageRequestWQ<librbd::ImageCtx>;
+
#endif // CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H