--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/AioImageRequestWQ.h"
+#include "librbd/AioCompletion.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/internal.h"
+
+namespace librbd {
+
+namespace {
+
+class C_AioReadWQ : public Context {
+public:
+ C_AioReadWQ(ImageCtx *ictx, uint64_t off, size_t len, char *buf,
+ bufferlist *pbl, AioCompletion *c, int op_flags)
+ : m_ictx(ictx), m_off(off), m_len(len), m_buf(buf), m_pbl(pbl), m_comp(c),
+ m_op_flags(op_flags) {
+ }
+protected:
+ virtual void finish(int r) {
+ aio_read(m_ictx, m_off, m_len, m_buf, m_pbl, m_comp, m_op_flags);
+ }
+private:
+ ImageCtx *m_ictx;
+ uint64_t m_off;
+ uint64_t m_len;
+ char *m_buf;
+ bufferlist *m_pbl;
+ AioCompletion *m_comp;
+ int m_op_flags;
+};
+
+class C_AioWriteWQ : public Context {
+public:
+ C_AioWriteWQ(ImageCtx *ictx, uint64_t off, size_t len, const char *buf,
+ AioCompletion *c, int op_flags)
+ : m_ictx(ictx), m_off(off), m_len(len), m_buf(buf), m_comp(c),
+ m_op_flags(op_flags) {
+ }
+protected:
+ virtual void finish(int r) {
+ aio_write(m_ictx, m_off, m_len, m_buf, m_comp, m_op_flags);
+ }
+private:
+ ImageCtx *m_ictx;
+ uint64_t m_off;
+ uint64_t m_len;
+ const char *m_buf;
+ AioCompletion *m_comp;
+ int m_op_flags;
+};
+
+class C_AioDiscardWQ : public Context {
+public:
+ C_AioDiscardWQ(ImageCtx *ictx, uint64_t off, uint64_t len, AioCompletion *c)
+ : m_ictx(ictx), m_off(off), m_len(len), m_comp(c) {
+ }
+protected:
+ virtual void finish(int r) {
+ aio_discard(m_ictx, m_off, m_len, m_comp);
+ }
+private:
+ ImageCtx *m_ictx;
+ uint64_t m_off;
+ uint64_t m_len;
+ AioCompletion *m_comp;
+};
+
+class C_AioFlushWQ : public Context {
+public:
+ C_AioFlushWQ(ImageCtx *ictx, AioCompletion *c)
+ : m_ictx(ictx), m_comp(c) {
+ }
+protected:
+ virtual void finish(int r) {
+ aio_flush(m_ictx, m_comp);
+ }
+private:
+ ImageCtx *m_ictx;
+ AioCompletion *m_comp;
+};
+
+} // anonymous namespace
+
+void AioImageRequestWQ::aio_read(ImageCtx *ictx, uint64_t off, size_t len,
+ char *buf, bufferlist *pbl, AioCompletion *c,
+ int op_flags) {
+ c->init_time(ictx, librbd::AIO_TYPE_READ);
+ if (ictx->non_blocking_aio) {
+ queue(new C_AioReadWQ(ictx, off, len, buf, pbl, c, op_flags),
+ Metadata(false, c));
+ } else {
+ librbd::aio_read(ictx, off, len, buf, pbl, c, op_flags);
+ }
+}
+
+void AioImageRequestWQ::aio_write(ImageCtx *ictx, uint64_t off, size_t len,
+ const char *buf, AioCompletion *c,
+ int op_flags) {
+ c->init_time(ictx, librbd::AIO_TYPE_WRITE);
+ if (ictx->non_blocking_aio) {
+ queue(new C_AioWriteWQ(ictx, off, len, buf, c, op_flags),
+ Metadata(true, c));
+ } else {
+ librbd::aio_write(ictx, off, len, buf, c, op_flags);
+ }
+}
+
+void AioImageRequestWQ::aio_discard(ImageCtx *ictx, uint64_t off, uint64_t len,
+ AioCompletion *c) {
+ c->init_time(ictx, librbd::AIO_TYPE_DISCARD);
+ if (ictx->non_blocking_aio) {
+ queue(new C_AioDiscardWQ(ictx, off, len, c), Metadata(true, c));
+ } else {
+ librbd::aio_discard(ictx, off, len, c);
+ }
+}
+
+void AioImageRequestWQ::aio_flush(ImageCtx *ictx, AioCompletion *c) {
+ c->init_time(ictx, librbd::AIO_TYPE_FLUSH);
+ if (ictx->non_blocking_aio) {
+ queue(new C_AioFlushWQ(ictx, c), Metadata(false, c));
+ } else {
+ librbd::aio_flush(ictx, c);
+ }
+}
+
+bool AioImageRequestWQ::writes_empty() const {
+ Mutex::Locker locker(m_lock);
+ for (ceph::unordered_map<Context *, Metadata>::const_iterator it =
+ m_context_metadata.begin();
+ it != m_context_metadata.end(); ++it) {
+ if (it->second.write_op) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void *AioImageRequestWQ::_void_dequeue() {
+ Context *peek_item = front();
+ if (peek_item == NULL) {
+ return NULL;
+ }
+
+ {
+ Mutex::Locker locker(m_lock);
+ ceph::unordered_map<Context *, Metadata>::iterator it =
+ m_context_metadata.find(peek_item);
+ assert(it != m_context_metadata.end());
+
+ if (it->second.write_op) {
+ if (m_writes_suspended) {
+ return NULL;
+ }
+ ++m_in_progress_writes;
+ }
+ }
+
+ Context *item = reinterpret_cast<Context *>(
+ ThreadPool::PointerWQ<Context>::_void_dequeue());
+ assert(peek_item == item);
+ return item;
+}
+
+void AioImageRequestWQ::process(Context *ctx) {
+ Metadata metadata;
+ {
+ Mutex::Locker locker(m_lock);
+ ceph::unordered_map<Context *, Metadata>::iterator it =
+ m_context_metadata.find(ctx);
+ assert(it != m_context_metadata.end());
+
+ metadata = it->second;
+ m_context_metadata.erase(it);
+ }
+
+ // TODO
+ ctx->complete(0);
+
+ {
+ Mutex::Locker locker(m_lock);
+ if (metadata.write_op) {
+ assert(m_in_progress_writes > 0);
+ if (--m_in_progress_writes == 0) {
+ m_cond.Signal();
+ }
+ }
+ }
+}
+
+void AioImageRequestWQ::queue(Context *ctx, const Metadata &metadata) {
+ {
+ Mutex::Locker locker(m_lock);
+ m_context_metadata[ctx] = metadata;
+ }
+ ThreadPool::PointerWQ<Context>::queue(ctx);
+}
+
+} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H
+#define CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H
+
+#include "common/WorkQueue.h"
+#include "common/Mutex.h"
+#include "include/unordered_map.h"
+
+namespace librbd {
+
+class AioCompletion;
+class ImageCtx;
+
+class AioImageRequestWQ : protected ThreadPool::PointerWQ<Context> {
+public:
+ AioImageRequestWQ(const string &name, time_t ti, ThreadPool *tp)
+ : ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
+ m_lock("AioImageRequestWQ::m_lock"), m_writes_suspended(false),
+ m_in_progress_writes(0) {
+ }
+
+ void aio_read(ImageCtx *ictx, uint64_t off, size_t len, char *buf,
+ bufferlist *pbl, AioCompletion *c, int op_flags);
+ void aio_write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf,
+ AioCompletion *c, int op_flags);
+ void aio_discard(ImageCtx *ictx, uint64_t off, uint64_t len,
+ AioCompletion *c);
+ void aio_flush(ImageCtx *ictx, AioCompletion *c);
+
+ using ThreadPool::PointerWQ<Context>::drain;
+
+ bool writes_empty() const;
+ inline bool writes_suspended() const {
+ Mutex::Locker locker(m_lock);
+ return m_writes_suspended;
+ }
+
+ void suspend_writes() {
+ Mutex::Locker locker(m_lock);
+ while (m_in_progress_writes > 0) {
+ m_cond.Wait(m_lock);
+ }
+ }
+
+ void resume_writes() {
+ {
+ Mutex::Locker locker(m_lock);
+ m_writes_suspended = false;
+ }
+ signal();
+ }
+protected:
+ virtual void _clear() {
+ ThreadPool::PointerWQ<Context>::_clear();
+ m_context_metadata.clear();
+ }
+
+ virtual void *_void_dequeue();
+ virtual void process(Context *ctx);
+private:
+ struct Metadata {
+ bool write_op;
+ AioCompletion *aio_comp;
+
+ Metadata() : write_op(false), aio_comp(NULL) {}
+ Metadata(bool _write_op, AioCompletion *_aio_comp)
+ : write_op(_write_op), aio_comp(_aio_comp) {}
+ };
+
+ mutable Mutex m_lock;
+ Cond m_cond;
+ bool m_writes_suspended;
+ uint32_t m_in_progress_writes;
+ ceph::unordered_map<Context *, Metadata> m_context_metadata;
+
+ void queue(Context *ctx, const Metadata &metadata);
+};
+
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H
#include "osdc/ObjectCacher.h"
#include "librbd/AioCompletion.h"
+#include "librbd/AioImageRequestWQ.h"
#include "cls/rbd/cls_rbd_client.h"
#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
TracepointProvider::Traits tracepoint_traits("librbd_tp.so", "rbd_tracing");
-class C_AioReadWQ : public Context {
-public:
- C_AioReadWQ(librbd::ImageCtx *ictx, uint64_t off, size_t len,
- char *buf, bufferlist *pbl, librbd::AioCompletion *c,
- int op_flags)
- : m_ictx(ictx), m_off(off), m_len(len), m_buf(buf), m_pbl(pbl), m_comp(c),
- m_op_flags(op_flags) {
- }
-protected:
- virtual void finish(int r) {
- librbd::aio_read(m_ictx, m_off, m_len, m_buf, m_pbl, m_comp, m_op_flags);
- }
-private:
- librbd::ImageCtx *m_ictx;
- uint64_t m_off;
- uint64_t m_len;
- char *m_buf;
- bufferlist *m_pbl;
- librbd::AioCompletion *m_comp;
- int m_op_flags;
-};
-
-class C_AioWriteWQ : public Context {
-public:
- C_AioWriteWQ(librbd::ImageCtx *ictx, uint64_t off, size_t len,
- const char *buf, librbd::AioCompletion *c, int op_flags)
- : m_ictx(ictx), m_off(off), m_len(len), m_buf(buf), m_comp(c),
- m_op_flags(op_flags) {
- }
-protected:
- virtual void finish(int r) {
- librbd::aio_write(m_ictx, m_off, m_len, m_buf, m_comp, m_op_flags);
- }
-private:
- librbd::ImageCtx *m_ictx;
- uint64_t m_off;
- uint64_t m_len;
- const char *m_buf;
- librbd::AioCompletion *m_comp;
- int m_op_flags;
-};
-
-class C_AioDiscardWQ : public Context {
-public:
- C_AioDiscardWQ(librbd::ImageCtx *ictx, uint64_t off, uint64_t len,
- librbd::AioCompletion *c)
- : m_ictx(ictx), m_off(off), m_len(len), m_comp(c) {
- }
-protected:
- virtual void finish(int r) {
- librbd::aio_discard(m_ictx, m_off, m_len, m_comp);
- }
-private:
- librbd::ImageCtx *m_ictx;
- uint64_t m_off;
- uint64_t m_len;
- librbd::AioCompletion *m_comp;
-};
-
-class C_AioFlushWQ : public Context {
-public:
- C_AioFlushWQ(librbd::ImageCtx *ictx, librbd::AioCompletion *c)
- : m_ictx(ictx), m_comp(c) {
- }
-protected:
- virtual void finish(int r) {
- librbd::aio_flush(m_ictx, m_comp);
- }
-private:
- librbd::ImageCtx *m_ictx;
- librbd::AioCompletion *m_comp;
-};
-
-void submit_aio_read(librbd::ImageCtx *ictx, uint64_t off, size_t len,
- char *buf, bufferlist *pbl, librbd::AioCompletion *c,
- int op_flags) {
- c->init_time(ictx, librbd::AIO_TYPE_READ);
- if (ictx->non_blocking_aio) {
- ictx->aio_work_queue->queue(new C_AioReadWQ(ictx, off, len, buf, pbl, c,
- op_flags));
- } else {
- librbd::aio_read(ictx, off, len, buf, pbl, c, op_flags);
- }
-}
-
-void submit_aio_write(librbd::ImageCtx *ictx, uint64_t off, size_t len,
- const char *buf, librbd::AioCompletion *c, int op_flags) {
- c->init_time(ictx, librbd::AIO_TYPE_WRITE);
- if (ictx->non_blocking_aio) {
- ictx->aio_work_queue->queue(new C_AioWriteWQ(ictx, off, len, buf, c,
- op_flags));
- } else {
- librbd::aio_write(ictx, off, len, buf, c, op_flags);
- }
-}
-
-void submit_aio_discard(librbd::ImageCtx *ictx, uint64_t off, uint64_t len,
- librbd::AioCompletion *c) {
- c->init_time(ictx, librbd::AIO_TYPE_DISCARD);
- if (ictx->non_blocking_aio) {
- ictx->aio_work_queue->queue(new C_AioDiscardWQ(ictx, off, len, c));
- } else {
- librbd::aio_discard(ictx, off, len, c);
- }
-}
-
-void submit_aio_flush(librbd::ImageCtx *ictx, librbd::AioCompletion *c) {
- c->init_time(ictx, librbd::AIO_TYPE_FLUSH);
- if (ictx->non_blocking_aio) {
- ictx->aio_work_queue->queue(new C_AioFlushWQ(ictx, c));
- } else {
- librbd::aio_flush(ictx, c);
- }
-}
-
CephContext* get_cct(IoCtx &io_ctx) {
return reinterpret_cast<CephContext*>(io_ctx.cct());
}
tracepoint(librbd, aio_write_exit, -EINVAL);
return -EINVAL;
}
- submit_aio_write(ictx, off, len, bl.c_str(), get_aio_completion(c), 0);
+ ictx->aio_work_queue->aio_write(ictx, off, len, bl.c_str(),
+ get_aio_completion(c), 0);
tracepoint(librbd, aio_write_exit, 0);
return 0;
}
tracepoint(librbd, aio_write_exit, -EINVAL);
return -EINVAL;
}
- submit_aio_write(ictx, off, len, bl.c_str(), get_aio_completion(c),
- op_flags);
+ ictx->aio_work_queue->aio_write(ictx, off, len, bl.c_str(),
+ get_aio_completion(c), op_flags);
tracepoint(librbd, aio_write_exit, 0);
return 0;
}
{
ImageCtx *ictx = (ImageCtx *)ctx;
tracepoint(librbd, aio_discard_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(), ictx->read_only, off, len, c->pc);
- submit_aio_discard(ictx, off, len, get_aio_completion(c));
+ ictx->aio_work_queue->aio_discard(ictx, off, len, get_aio_completion(c));
tracepoint(librbd, aio_discard_exit, 0);
return 0;
}
tracepoint(librbd, aio_read_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(), ictx->read_only, off, len, bl.c_str(), c->pc);
ldout(ictx->cct, 10) << "Image::aio_read() buf=" << (void *)bl.c_str() << "~"
<< (void *)(bl.c_str() + len - 1) << dendl;
- submit_aio_read(ictx, off, len, NULL, &bl, get_aio_completion(c), 0);
+ ictx->aio_work_queue->aio_read(ictx, off, len, NULL, &bl,
+ get_aio_completion(c), 0);
tracepoint(librbd, aio_read_exit, 0);
return 0;
}
ictx->read_only, off, len, bl.c_str(), c->pc, op_flags);
ldout(ictx->cct, 10) << "Image::aio_read() buf=" << (void *)bl.c_str() << "~"
<< (void *)(bl.c_str() + len - 1) << dendl;
- submit_aio_read(ictx, off, len, NULL, &bl, get_aio_completion(c), op_flags);
+ ictx->aio_work_queue->aio_read(ictx, off, len, NULL, &bl,
+ get_aio_completion(c), op_flags);
tracepoint(librbd, aio_read_exit, 0);
return 0;
}
{
ImageCtx *ictx = (ImageCtx *)ctx;
tracepoint(librbd, aio_flush_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(), ictx->read_only, c->pc);
- submit_aio_flush(ictx, get_aio_completion(c));
+ ictx->aio_work_queue->aio_flush(ictx, get_aio_completion(c));
tracepoint(librbd, aio_flush_exit, 0);
return 0;
}
librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
tracepoint(librbd, aio_write_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(), ictx->read_only, off, len, buf, comp->pc);
- submit_aio_write(ictx, off, len, buf, get_aio_completion(comp), 0);
+ ictx->aio_work_queue->aio_write(ictx, off, len, buf,
+ get_aio_completion(comp), 0);
tracepoint(librbd, aio_write_exit, 0);
return 0;
}
librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
tracepoint(librbd, aio_write2_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(),
ictx->read_only, off, len, buf, comp->pc, op_flags);
- submit_aio_write(ictx, off, len, buf, get_aio_completion(comp), op_flags);
+ ictx->aio_work_queue->aio_write(ictx, off, len, buf, get_aio_completion(comp),
+ op_flags);
tracepoint(librbd, aio_write_exit, 0);
return 0;
}
librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
tracepoint(librbd, aio_discard_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(), ictx->read_only, off, len, comp->pc);
- submit_aio_discard(ictx, off, len, get_aio_completion(comp));
+ ictx->aio_work_queue->aio_discard(ictx, off, len, get_aio_completion(comp));
tracepoint(librbd, aio_discard_exit, 0);
return 0;
}
librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
tracepoint(librbd, aio_read_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(), ictx->read_only, off, len, buf, comp->pc);
- submit_aio_read(ictx, off, len, buf, NULL, get_aio_completion(comp), 0);
+ ictx->aio_work_queue->aio_read(ictx, off, len, buf, NULL,
+ get_aio_completion(comp), 0);
tracepoint(librbd, aio_read_exit, 0);
return 0;
}
librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
tracepoint(librbd, aio_read2_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(),
ictx->read_only, off, len, buf, comp->pc, op_flags);
- submit_aio_read(ictx, off, len, buf, NULL, get_aio_completion(comp),
- op_flags);
+ ictx->aio_work_queue->aio_read(ictx, off, len, buf, NULL,
+ get_aio_completion(comp), op_flags);
tracepoint(librbd, aio_read_exit, 0);
return 0;
}
librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
tracepoint(librbd, aio_flush_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(), ictx->read_only, comp->pc);
- submit_aio_flush(ictx, get_aio_completion(comp));
+ ictx->aio_work_queue->aio_flush(ictx, get_aio_completion(comp));
tracepoint(librbd, aio_flush_exit, 0);
return 0;
}