namespace librbd {
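+// dispatches exclusive lock acquisition results back to the work queue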
+template <typename I>
+struct AioImageRequestWQ<I>::C_AcquireLock : public Context {
+ AioImageRequestWQ *work_queue;
+ AioImageRequest<I> *image_request;
+
+ C_AcquireLock(AioImageRequestWQ *work_queue, AioImageRequest<I> *image_request)
+ : work_queue(work_queue), image_request(image_request) {
+ }
+
+ void finish(int r) override {
+ work_queue->handle_acquire_lock(r, image_request);
+ }
+};
+
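+// dispatches the blocked-writes notification back to the work queue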
+template <typename I>
+struct AioImageRequestWQ<I>::C_BlockedWrites : public Context {
+ AioImageRequestWQ *work_queue;
+ C_BlockedWrites(AioImageRequestWQ *_work_queue)
+ : work_queue(_work_queue) {
+ }
+
+ void finish(int r) override {
+ work_queue->handle_blocked_writes(r);
+ }
+};
+
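+// dispatches image refresh completion back to the work queue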
+template <typename I>
+struct AioImageRequestWQ<I>::C_RefreshFinish : public Context {
+ AioImageRequestWQ *work_queue;
+ AioImageRequest<I> *image_request;
+
+ C_RefreshFinish(AioImageRequestWQ *work_queue,
+ AioImageRequest<I> *image_request)
+ : work_queue(work_queue), image_request(image_request) {
+ }
+ void finish(int r) override {
+ work_queue->handle_refreshed(r, image_request);
+ }
+};
+
template <typename I>
AioImageRequestWQ<I>::AioImageRequestWQ(I *image_ctx, const string &name,
time_t ti, ThreadPool *tp)
return;
}
- RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
-
// if journaling is enabled -- we need to replay the journal because
// it might contain an uncommitted write
- bool lock_required;
- {
- RWLock::RLocker locker(m_lock);
- lock_required = m_require_lock_on_read;
- }
-
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
- lock_required) {
+ require_lock_on_read()) {
queue(new AioImageRead<>(m_image_ctx, c, off, len, buf, pbl, op_flags));
} else {
c->start_op();
m_image_ctx.flush(on_shutdown);
}
-template <typename I>
-bool AioImageRequestWQ<I>::is_lock_request_needed() const {
- RWLock::RLocker locker(m_lock);
- return (m_queued_writes.read() > 0 ||
- (m_require_lock_on_read && m_queued_reads.read() > 0));
-}
-
template <typename I>
int AioImageRequestWQ<I>::block_writes() {
C_SaferCond cond_ctx;
template <typename I>
void *AioImageRequestWQ<I>::_void_dequeue() {
+ CephContext *cct = m_image_ctx.cct;
AioImageRequest<I> *peek_item = this->front();
- // no IO ops available or refresh in-progress (IO stalled)
- if (peek_item == nullptr || m_refresh_in_progress) {
+ // no queued IO requests or all IO is blocked/stalled
+ if (peek_item == nullptr || m_io_blockers.read() > 0) {
return nullptr;
}
+ bool lock_required;
bool refresh_required = m_image_ctx.state->is_refresh_required();
{
RWLock::RLocker locker(m_lock);
- if (peek_item->is_write_op()) {
- if (m_write_blockers > 0) {
- return nullptr;
- }
-
- // refresh will requeue the op -- don't count it as in-progress
- if (!refresh_required) {
- m_in_flight_writes.inc();
- }
- } else if (m_require_lock_on_read) {
- return nullptr;
+    bool write_op = peek_item->is_write_op();
+    lock_required = is_lock_required(write_op);
+    if (write_op) {
+      if (!lock_required && m_write_blockers > 0) {
+        return nullptr;   // writes blocked -- the missing lock is not the blocker
+      }
+      if (!lock_required && !refresh_required) {
+        // lock acquisition / refresh will requeue the op -- don't count it as in-progress
+        m_in_flight_writes.inc();
+      }
}
}
ThreadPool::PointerWQ<AioImageRequest<I> >::_void_dequeue());
assert(peek_item == item);
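+  // op requires the exclusive lock -- drop the pool lock while requesting it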
+ if (lock_required) {
+ this->get_pool_lock().Unlock();
+ m_image_ctx.owner_lock.get_read();
+ if (m_image_ctx.exclusive_lock != nullptr) {
+ ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
+ if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
+ lderr(cct) << "op requires exclusive lock" << dendl;
+ fail_in_flight_io(-EROFS, item);
+
+        // signal the thread pool since we won't be returning a request to process
+ this->signal();
+ } else {
+ // stall IO until the acquire completes
+ m_io_blockers.inc();
+ m_image_ctx.exclusive_lock->request_lock(new C_AcquireLock(this, item));
+ }
+ } else {
+ // raced with the exclusive lock being disabled
+ lock_required = false;
+ }
+ m_image_ctx.owner_lock.put_read();
+ this->get_pool_lock().Lock();
+
+ if (lock_required) {
+ return nullptr;
+ }
+ }
+
if (refresh_required) {
- ldout(m_image_ctx.cct, 15) << "image refresh required: delaying IO " << item
- << dendl;
+ ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;
// stall IO until the refresh completes
- m_refresh_in_progress = true;
+ m_io_blockers.inc();
this->get_pool_lock().Unlock();
m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
}
template <typename I>
-bool AioImageRequestWQ<I>::is_lock_required() const {
- assert(m_image_ctx.owner_lock.is_locked());
- if (m_image_ctx.exclusive_lock == NULL) {
- return false;
- }
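+// fail a dequeued request and unwind the queued/in-flight IO accounting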
+void AioImageRequestWQ<I>::fail_in_flight_io(int r, AioImageRequest<I> *req) {
+ this->process_finish();
+ req->fail(r);
+ finish_queued_io(req);
+ delete req;
+ finish_in_flight_io();
+}
- return (!m_image_ctx.exclusive_lock->is_lock_owner());
+template <typename I>
+bool AioImageRequestWQ<I>::is_lock_required(bool write_op) const {
+ assert(m_lock.is_locked());
+ return ((write_op && m_require_lock_on_write) ||
+ (!write_op && m_require_lock_on_read));
}
template <typename I>
void AioImageRequestWQ<I>::queue(AioImageRequest<I> *req) {
+ assert(m_image_ctx.owner_lock.is_locked());
+
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << ", "
<< "req=" << req << dendl;
- assert(m_image_ctx.owner_lock.is_locked());
- bool write_op = req->is_write_op();
- bool lock_required = (write_op && is_lock_required()) ||
- (!write_op && m_require_lock_on_read);
-
- if (lock_required && !m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
- lderr(cct) << "op requires exclusive lock" << dendl;
- req->fail(-EROFS);
- delete req;
- finish_in_flight_io();
- return;
- }
-
- if (write_op) {
+ if (req->is_write_op()) {
m_queued_writes.inc();
} else {
m_queued_reads.inc();
}
ThreadPool::PointerWQ<AioImageRequest<I> >::queue(req);
+}
- if (lock_required) {
- m_image_ctx.exclusive_lock->request_lock(nullptr);
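+// requeue (or fail) the stalled IO once the deferred lock acquisition completes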
+template <typename I>
+void AioImageRequestWQ<I>::handle_acquire_lock(int r, AioImageRequest<I> *req) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl;
+
+ if (r < 0) {
+ fail_in_flight_io(r, req);
+ } else {
+ // since IO was stalled for acquire -- original IO order is preserved
+ // if we requeue this op for work queue processing
+ this->requeue(req);
}
+
+ assert(m_io_blockers.read() > 0);
+ m_io_blockers.dec();
+ this->signal();
}
template <typename I>
void AioImageRequestWQ<I>::handle_refreshed(int r, AioImageRequest<I> *req) {
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 15) << "resuming IO after image refresh: r=" << r << ", "
- << "req=" << req << dendl;
+ ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", "
+ << "req=" << req << dendl;
if (r < 0) {
- this->process_finish();
- req->fail(r);
- finish_queued_io(req);
- delete req;
- finish_in_flight_io();
+ fail_in_flight_io(r, req);
} else {
// since IO was stalled for refresh -- original IO order is preserved
// if we requeue this op for work queue processing
this->requeue(req);
}
- m_refresh_in_progress = false;
+ assert(m_io_blockers.read() > 0);
+ m_io_blockers.dec();
this->signal();
-
- // refresh might have enabled exclusive lock -- IO stalled until
- // we acquire the lock
- RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- if (is_lock_required() && is_lock_request_needed()) {
- m_image_ctx.exclusive_lock->request_lock(nullptr);
- }
}
template <typename I>
void shut_down(Context *on_shutdown);
- bool is_lock_required() const;
- bool is_lock_request_needed() const;
-
inline bool writes_blocked() const {
RWLock::RLocker locker(m_lock);
return (m_write_blockers > 0);
private:
typedef std::list<Context *> Contexts;
- struct C_RefreshFinish : public Context {
- AioImageRequestWQ *aio_work_queue;
- AioImageRequest<ImageCtxT> *aio_image_request;
-
- C_RefreshFinish(AioImageRequestWQ *aio_work_queue,
- AioImageRequest<ImageCtxT> *aio_image_request)
- : aio_work_queue(aio_work_queue), aio_image_request(aio_image_request) {
- }
- virtual void finish(int r) override {
- aio_work_queue->handle_refreshed(r, aio_image_request);
- }
- };
-
- struct C_BlockedWrites : public Context {
- AioImageRequestWQ *aio_work_queue;
- C_BlockedWrites(AioImageRequestWQ *_aio_work_queue)
- : aio_work_queue(_aio_work_queue) {
- }
-
- virtual void finish(int r) {
- aio_work_queue->handle_blocked_writes(r);
- }
- };
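+  // completion contexts (defined in AioImageRequestWQ.cc)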
+ struct C_AcquireLock;
+ struct C_BlockedWrites;
+ struct C_RefreshFinish;
ImageCtxT &m_image_ctx;
mutable RWLock m_lock;
atomic_t m_queued_reads {0};
atomic_t m_queued_writes {0};
atomic_t m_in_flight_ios {0};
-
- bool m_refresh_in_progress = false;
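+  // non-zero while queued IO is stalled (e.g. lock acquisition, image refresh)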
+ atomic_t m_io_blockers {0};
bool m_shutdown = false;
Context *m_on_shutdown = nullptr;
+ bool is_lock_required(bool write_op) const;
+
+ inline bool require_lock_on_read() const {
+ RWLock::RLocker locker(m_lock);
+    return m_require_lock_on_read;
+  }
inline bool writes_empty() const {
RWLock::RLocker locker(m_lock);
return (m_queued_writes.read() == 0);
int start_in_flight_io(AioCompletion *c);
void finish_in_flight_io();
+ void fail_in_flight_io(int r, AioImageRequest<ImageCtxT> *req);
void queue(AioImageRequest<ImageCtxT> *req);
+ void handle_acquire_lock(int r, AioImageRequest<ImageCtxT> *req);
void handle_refreshed(int r, AioImageRequest<ImageCtxT> *req);
void handle_blocked_writes(int r);
};