From 91177781121f7e461a19ff7b748538558e46ad23 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Tue, 7 Jul 2015 19:05:47 -0400 Subject: [PATCH] librbd: refactor AIO request handling Move aio_read/write/discard/flush to new AioImageRequest classes in support of a unified aio queue / journaling. Signed-off-by: Jason Dillaman --- src/librbd/AioImageRequest.cc | 325 ++++++++++++++++++++++++++++++++ src/librbd/AioImageRequest.h | 136 +++++++++++++ src/librbd/AioImageRequestWQ.cc | 147 +++------------ src/librbd/AioImageRequestWQ.h | 30 +-- src/librbd/AioObjectRequest.cc | 5 +- src/librbd/AsyncOperation.cc | 6 +- src/librbd/CopyupRequest.cc | 4 +- src/librbd/Makefile.am | 2 + src/librbd/internal.cc | 321 +------------------------------ src/librbd/internal.h | 10 +- 10 files changed, 520 insertions(+), 466 deletions(-) create mode 100644 src/librbd/AioImageRequest.cc create mode 100644 src/librbd/AioImageRequest.h diff --git a/src/librbd/AioImageRequest.cc b/src/librbd/AioImageRequest.cc new file mode 100644 index 0000000000000..ed8a9dc20c6ac --- /dev/null +++ b/src/librbd/AioImageRequest.cc @@ -0,0 +1,325 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/AioImageRequest.h" +#include "librbd/AioCompletion.h" +#include "librbd/AioObjectRequest.h" +#include "librbd/ImageCtx.h" +#include "librbd/ImageWatcher.h" +#include "librbd/internal.h" +#include +#include "include/assert.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::AioImageRequest: " + +namespace librbd { + +void AioImageRequest::read( + ImageCtx *ictx, AioCompletion *c, + const std::vector > &extents, + char *buf, bufferlist *pbl, int op_flags) { + AioImageRead req(*ictx, c, extents, buf, pbl, op_flags); + req.send(); +} + +void AioImageRequest::read(ImageCtx *ictx, AioCompletion *c, uint64_t off, + size_t len, char *buf, bufferlist *pbl, + int op_flags) { + AioImageRead req(*ictx, c, off, len, buf, pbl, op_flags); + req.send(); +} + +void AioImageRequest::write(ImageCtx *ictx, AioCompletion *c, uint64_t off, + size_t len, const char *buf, int op_flags) { + AioImageWrite req(*ictx, c, off, len, buf, op_flags); + req.send(); +} + +void AioImageRequest::discard(ImageCtx *ictx, AioCompletion *c, uint64_t off, + uint64_t len) { + AioImageDiscard req(*ictx, c, off, len); + req.send(); +} + +void AioImageRequest::flush(ImageCtx *ictx, AioCompletion *c) { + AioImageFlush req(*ictx, c); + req.send(); +} + +void AioImageRequest::send() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << get_request_type() << ": ictx=" << &m_image_ctx << ", " + << "completion=" << m_aio_comp << dendl; + + m_aio_comp->get(); + int r = ictx_check(&m_image_ctx); + if (r < 0) { + m_aio_comp->fail(cct, r); + return; + } + + { + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + execute_request(); + } +} + + +void AioImageRead::execute_request() { + CephContext *cct = m_image_ctx.cct; + + if (m_image_ctx.object_cacher && m_image_ctx.readahead_max_bytes > 0 && + !(m_op_flags & LIBRADOS_OP_FLAG_FADVISE_RANDOM)) { + readahead(&m_image_ctx, m_image_extents); + } + + librados::snap_t snap_id; + map > object_extents; + uint64_t buffer_ofs = 0; + { + // prevent image size from changing between computing clip and recording + // pending async operation + RWLock::RLocker snap_locker(m_image_ctx.snap_lock); + snap_id = m_image_ctx.snap_id; + + // map + for (vector >::const_iterator p = + m_image_extents.begin(); + p != m_image_extents.end(); ++p) { + uint64_t len = p->second; + int r = clip_io(&m_image_ctx, p->first, &len); + if (r < 0) { + m_aio_comp->fail(cct, r); + return; + } + if (len == 0) { + continue; + } + + Striper::file_to_extents(cct, m_image_ctx.format_string, + &m_image_ctx.layout, p->first, len, 0, + object_extents, buffer_ofs); + buffer_ofs += len; + } + + m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_READ); + } + + m_aio_comp->read_buf = m_buf; + m_aio_comp->read_buf_len = buffer_ofs; + m_aio_comp->read_bl = m_pbl; + + for (map >::iterator p = object_extents.begin(); + p != object_extents.end(); ++p) { + for (vector::iterator q = p->second.begin(); + q != p->second.end(); ++q) { + ldout(cct, 20) << " oid " << q->oid << " " << q->offset << "~" + << q->length << " from " << q->buffer_extents + << dendl; + + C_AioRead *req_comp = new C_AioRead(cct, m_aio_comp); + AioObjectRead *req = new AioObjectRead(&m_image_ctx, q->oid.name, + q->objectno, q->offset, q->length, + q->buffer_extents, snap_id, true, + req_comp, m_op_flags); + req_comp->set_req(req); + + if (m_image_ctx.object_cacher) { + C_CacheRead *cache_comp = new C_CacheRead(&m_image_ctx, req); + m_image_ctx.aio_read_from_cache(q->oid, q->objectno, &req->data(), + q->length, q->offset, + cache_comp, m_op_flags); + } else { + req->send(); + } + } + } + + m_aio_comp->finish_adding_requests(cct); + m_aio_comp->put(); + + m_image_ctx.perfcounter->inc(l_librbd_rd); + m_image_ctx.perfcounter->inc(l_librbd_rd_bytes, buffer_ofs); +} + +void AioImageWrite::execute_request() { + CephContext *cct = m_image_ctx.cct; + + RWLock::RLocker md_locker(m_image_ctx.md_lock); + + uint64_t clip_len = m_len; + ::SnapContext snapc; + { + // prevent image size from changing between computing clip and recording + // pending async operation + RWLock::RLocker snap_locker(m_image_ctx.snap_lock); + if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) { + m_aio_comp->fail(cct, -EROFS); + return; + } + + int r = clip_io(&m_image_ctx, m_off, &clip_len); + if (r < 0) { + m_aio_comp->fail(cct, r); + return; + } + + snapc = m_image_ctx.snapc; + m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_WRITE); + } + + if (m_image_ctx.image_watcher->is_lock_supported() && + !m_image_ctx.image_watcher->is_lock_owner()) { + m_image_ctx.image_watcher->request_lock( + boost::bind(&AioImageRequest::write, &m_image_ctx, _1, m_off, m_len, + m_buf, m_op_flags), m_aio_comp); + m_aio_comp->put(); + return; + } + + // map + vector extents; + if (m_len > 0) { + Striper::file_to_extents(cct, m_image_ctx.format_string, + &m_image_ctx.layout, m_off, clip_len, 0, extents); + } + + for (vector::iterator p = extents.begin(); + p != extents.end(); ++p) { + ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length + << " from " << p->buffer_extents << dendl; + // assemble extent + bufferlist bl; + for (vector >::iterator q = + p->buffer_extents.begin(); + q != p->buffer_extents.end(); ++q) { + bl.append(m_buf + q->first, q->second); + } + + C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp); + if (m_image_ctx.object_cacher) { + m_image_ctx.write_to_cache(p->oid, bl, p->length, p->offset, req_comp, + m_op_flags); + } else { + AioObjectWrite *req = new AioObjectWrite(&m_image_ctx, p->oid.name, + p->objectno, p->offset, bl, + snapc, req_comp); + + req->set_op_flags(m_op_flags); + req->send(); + } + } + + m_aio_comp->finish_adding_requests(cct); + m_aio_comp->put(); + + m_image_ctx.perfcounter->inc(l_librbd_wr); + m_image_ctx.perfcounter->inc(l_librbd_wr_bytes, clip_len); +} + +void AioImageDiscard::execute_request() { + CephContext *cct = m_image_ctx.cct; + + RWLock::RLocker md_locker(m_image_ctx.md_lock); + + uint64_t clip_len = m_len; + ::SnapContext snapc; + { + // prevent image size from changing between computing clip and recording + // pending async operation + RWLock::RLocker snap_locker(m_image_ctx.snap_lock); + if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) { + m_aio_comp->fail(cct, -EROFS); + return; + } + + int r = clip_io(&m_image_ctx, m_off, &clip_len); + if (r < 0) { + m_aio_comp->fail(cct, r); + return; + } + + snapc = m_image_ctx.snapc; + m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_DISCARD); + } + + if (m_image_ctx.image_watcher->is_lock_supported() && + !m_image_ctx.image_watcher->is_lock_owner()) { + m_image_ctx.image_watcher->request_lock( + boost::bind(&AioImageRequest::discard, &m_image_ctx, _1, m_off, m_len), + m_aio_comp); + m_aio_comp->put(); + return; + } + + // map + vector extents; + if (m_len > 0) { + Striper::file_to_extents(cct, m_image_ctx.format_string, + &m_image_ctx.layout, m_off, clip_len, 0, extents); + } + + for (vector::iterator p = extents.begin(); + p != extents.end(); ++p) { + ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length + << " from " << p->buffer_extents << dendl; + C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp); + AioObjectRequest *req; + + if (p->length == m_image_ctx.layout.fl_object_size) { + req = new AioObjectRemove(&m_image_ctx, p->oid.name, p->objectno, snapc, + req_comp); + } else if (p->offset + p->length == m_image_ctx.layout.fl_object_size) { + req = new AioObjectTruncate(&m_image_ctx, p->oid.name, p->objectno, + p->offset, snapc, req_comp); + } else { + if(cct->_conf->rbd_skip_partial_discard) { + delete req_comp; + continue; + } + req = new AioObjectZero(&m_image_ctx, p->oid.name, p->objectno, p->offset, + p->length, snapc, req_comp); + } + req->send(); + } + + if (m_image_ctx.object_cacher) { + Mutex::Locker l(m_image_ctx.cache_lock); + m_image_ctx.object_cacher->discard_set(m_image_ctx.object_set, extents); + } + + m_aio_comp->finish_adding_requests(cct); + m_aio_comp->put(); + + m_image_ctx.perfcounter->inc(l_librbd_discard); + m_image_ctx.perfcounter->inc(l_librbd_discard_bytes, clip_len); +} + +void AioImageFlush::execute_request() { + CephContext *cct = m_image_ctx.cct; + + // TODO race condition between registering op and submitting to cache + // (might not be flushed -- backport needed) + C_AioRequest *flush_ctx = new C_AioRequest(cct, m_aio_comp); + m_image_ctx.flush_async_operations(flush_ctx); + + m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_FLUSH); + C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp); + if (m_image_ctx.object_cacher != NULL) { + m_image_ctx.flush_cache_aio(req_comp); + } else { + librados::AioCompletion *rados_completion = + librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb); + m_image_ctx.data_ctx.aio_flush_async(rados_completion); + rados_completion->release(); + } + + m_aio_comp->finish_adding_requests(cct); + m_aio_comp->put(); + + m_image_ctx.perfcounter->inc(l_librbd_aio_flush); +} + +} // namespace librbd diff --git a/src/librbd/AioImageRequest.h b/src/librbd/AioImageRequest.h new file mode 100644 index 0000000000000..1d8586b201d43 --- /dev/null +++ b/src/librbd/AioImageRequest.h @@ -0,0 +1,136 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_AIO_IMAGE_REQUEST_H +#define CEPH_LIBRBD_AIO_IMAGE_REQUEST_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include +#include + +namespace librbd { + +class AioCompletion; +class ImageCtx; + +class AioImageRequest { +public: + AioImageRequest(ImageCtx &image_ctx, AioCompletion *aio_comp) + : m_image_ctx(image_ctx), m_aio_comp(aio_comp) {} + virtual ~AioImageRequest() {} + + static void read(ImageCtx *ictx, AioCompletion *c, + const std::vector > &extents, + char *buf, bufferlist *pbl, int op_flags); + static void read(ImageCtx *ictx, AioCompletion *c, uint64_t off, size_t len, + char *buf, bufferlist *pbl, int op_flags); + static void write(ImageCtx *ictx, AioCompletion *c, uint64_t off, size_t len, + const char *buf, int op_flags); + static void discard(ImageCtx *ictx, AioCompletion *c, uint64_t off, + uint64_t len); + static void flush(ImageCtx *ictx, AioCompletion *c); + + virtual bool is_write_op() const = 0; + + void send(); +protected: + ImageCtx &m_image_ctx; + AioCompletion *m_aio_comp; + + virtual void execute_request() = 0; + virtual const char *get_request_type() const = 0; +}; + +class AioImageRead : public AioImageRequest { +public: + AioImageRead(ImageCtx &image_ctx, AioCompletion *aio_comp, uint64_t off, + size_t len, char *buf, bufferlist *pbl, int op_flags) + : AioImageRequest(image_ctx, aio_comp), m_buf(buf), m_pbl(pbl), + m_op_flags(op_flags) { + m_image_extents.push_back(std::make_pair(off, len)); + } + AioImageRead(ImageCtx &image_ctx, AioCompletion *aio_comp, + const std::vector > &image_extents, + char *buf, bufferlist *pbl, int op_flags) + : AioImageRequest(image_ctx, aio_comp), m_image_extents(image_extents), + m_buf(buf), m_pbl(pbl), m_op_flags(op_flags) { + } + + virtual bool is_write_op() const { + return false; + } +protected: + virtual void execute_request(); + virtual const char *get_request_type() const { + return "aio_read"; + } +private: + std::vector > m_image_extents; + char *m_buf; + bufferlist *m_pbl; + int m_op_flags; +}; + +class AioImageWrite : public AioImageRequest { +public: + AioImageWrite(ImageCtx &image_ctx, AioCompletion *aio_comp, uint64_t off, + size_t len, const char *buf, int op_flags) + : AioImageRequest(image_ctx, aio_comp), m_off(off), m_len(len), m_buf(buf), + m_op_flags(op_flags) { + } + + virtual bool is_write_op() const { + return true; + } +protected: + virtual void execute_request(); + virtual const char *get_request_type() const { + return "aio_write"; + } +private: + uint64_t m_off; + uint64_t m_len; + const char *m_buf; + int m_op_flags; +}; + +class AioImageDiscard : public AioImageRequest { +public: + AioImageDiscard(ImageCtx &image_ctx, AioCompletion *aio_comp, uint64_t off, + uint64_t len) + : AioImageRequest(image_ctx, aio_comp), m_off(off), m_len(len) { + } + + virtual bool is_write_op() const { + return true; + } +protected: + virtual void execute_request(); + virtual const char *get_request_type() const { + return "aio_discard"; + } +private: + uint64_t m_off; + uint64_t m_len; +}; + +class AioImageFlush : public AioImageRequest { +public: + AioImageFlush(ImageCtx &image_ctx, AioCompletion *aio_comp) + : AioImageRequest(image_ctx, aio_comp) { + } + + virtual bool is_write_op() const { + return false; + } +protected: + virtual void execute_request(); + virtual const char *get_request_type() const { + return "aio_flush"; + } +}; + +} // namespace librbd + +#endif // CEPH_LIBRBD_AIO_IMAGE_REQUEST_H diff --git a/src/librbd/AioImageRequestWQ.cc b/src/librbd/AioImageRequestWQ.cc index 03e2af3e02b14..82cdb0202cb52 100644 --- a/src/librbd/AioImageRequestWQ.cc +++ b/src/librbd/AioImageRequestWQ.cc @@ -3,95 +3,20 @@ #include "librbd/AioImageRequestWQ.h" #include "librbd/AioCompletion.h" +#include "librbd/AioImageRequest.h" #include "librbd/ImageCtx.h" #include "librbd/internal.h" namespace librbd { -namespace { - -class C_AioReadWQ : public Context { -public: - C_AioReadWQ(ImageCtx *ictx, uint64_t off, size_t len, char *buf, - bufferlist *pbl, AioCompletion *c, int op_flags) - : m_ictx(ictx), m_off(off), m_len(len), m_buf(buf), m_pbl(pbl), m_comp(c), - m_op_flags(op_flags) { - } -protected: - virtual void finish(int r) { - aio_read(m_ictx, m_off, m_len, m_buf, m_pbl, m_comp, m_op_flags); - } -private: - ImageCtx *m_ictx; - uint64_t m_off; - uint64_t m_len; - char *m_buf; - bufferlist *m_pbl; - AioCompletion *m_comp; - int m_op_flags; -}; - -class C_AioWriteWQ : public Context { -public: - C_AioWriteWQ(ImageCtx *ictx, uint64_t off, size_t len, const char *buf, - AioCompletion *c, int op_flags) - : m_ictx(ictx), m_off(off), m_len(len), m_buf(buf), m_comp(c), - m_op_flags(op_flags) { - } -protected: - virtual void finish(int r) { - aio_write(m_ictx, m_off, m_len, m_buf, m_comp, m_op_flags); - } -private: - ImageCtx *m_ictx; - uint64_t m_off; - uint64_t m_len; - const char *m_buf; - AioCompletion *m_comp; - int m_op_flags; -}; - -class C_AioDiscardWQ : public Context { -public: - C_AioDiscardWQ(ImageCtx *ictx, uint64_t off, uint64_t len, AioCompletion *c) - : m_ictx(ictx), m_off(off), m_len(len), m_comp(c) { - } -protected: - virtual void finish(int r) { - aio_discard(m_ictx, m_off, m_len, m_comp); - } -private: - ImageCtx *m_ictx; - uint64_t m_off; - uint64_t m_len; - AioCompletion *m_comp; -}; - -class C_AioFlushWQ : public Context { -public: - C_AioFlushWQ(ImageCtx *ictx, AioCompletion *c) - : m_ictx(ictx), m_comp(c) { - } -protected: - virtual void finish(int r) { - aio_flush(m_ictx, m_comp); - } -private: - ImageCtx *m_ictx; - AioCompletion *m_comp; -}; - -} // anonymous namespace - void AioImageRequestWQ::aio_read(ImageCtx *ictx, uint64_t off, size_t len, char *buf, bufferlist *pbl, AioCompletion *c, int op_flags) { c->init_time(ictx, librbd::AIO_TYPE_READ); if (ictx->non_blocking_aio) { - queue(new C_AioReadWQ(ictx, off, len, buf, pbl, c, op_flags), - Metadata(false, c)); + queue(new AioImageRead(*ictx, c, off, len, buf, pbl, op_flags)); } else { - librbd::aio_read(ictx, off, len, buf, pbl, c, op_flags); + AioImageRequest::read(ictx, c, off, len, buf, pbl, op_flags); } } @@ -100,10 +25,9 @@ void AioImageRequestWQ::aio_write(ImageCtx *ictx, uint64_t off, size_t len, int op_flags) { c->init_time(ictx, librbd::AIO_TYPE_WRITE); if (ictx->non_blocking_aio) { - queue(new C_AioWriteWQ(ictx, off, len, buf, c, op_flags), - Metadata(true, c)); + queue(new AioImageWrite(*ictx, c, off, len, buf, op_flags)); } else { - librbd::aio_write(ictx, off, len, buf, c, op_flags); + AioImageRequest::write(ictx, c, off, len, buf, op_flags); } } @@ -111,46 +35,35 @@ void AioImageRequestWQ::aio_discard(ImageCtx *ictx, uint64_t off, uint64_t len, AioCompletion *c) { c->init_time(ictx, librbd::AIO_TYPE_DISCARD); if (ictx->non_blocking_aio) { - queue(new C_AioDiscardWQ(ictx, off, len, c), Metadata(true, c)); + queue(new AioImageDiscard(*ictx, c, off, len)); } else { - librbd::aio_discard(ictx, off, len, c); + AioImageRequest::discard(ictx, c, off, len); } } void AioImageRequestWQ::aio_flush(ImageCtx *ictx, AioCompletion *c) { c->init_time(ictx, librbd::AIO_TYPE_FLUSH); if (ictx->non_blocking_aio) { - queue(new C_AioFlushWQ(ictx, c), Metadata(false, c)); + queue(new AioImageFlush(*ictx, c)); } else { - librbd::aio_flush(ictx, c); + AioImageRequest::flush(ictx, c); } } bool AioImageRequestWQ::writes_empty() const { Mutex::Locker locker(m_lock); - for (ceph::unordered_map::const_iterator it = - m_context_metadata.begin(); - it != m_context_metadata.end(); ++it) { - if (it->second.write_op) { - return false; - } - } - return true; + return (m_queued_writes > 0); } void *AioImageRequestWQ::_void_dequeue() { - Context *peek_item = front(); + AioImageRequest *peek_item = front(); if (peek_item == NULL) { return NULL; } { - Mutex::Locker locker(m_lock); - ceph::unordered_map::iterator it = - m_context_metadata.find(peek_item); - assert(it != m_context_metadata.end()); - - if (it->second.write_op) { + if (peek_item->is_write_op()) { + Mutex::Locker locker(m_lock); if (m_writes_suspended) { return NULL; } @@ -158,44 +71,38 @@ void *AioImageRequestWQ::_void_dequeue() { } } - Context *item = reinterpret_cast( - ThreadPool::PointerWQ::_void_dequeue()); + AioImageRequest *item = reinterpret_cast( + ThreadPool::PointerWQ::_void_dequeue()); assert(peek_item == item); return item; } -void AioImageRequestWQ::process(Context *ctx) { - Metadata metadata; - { - Mutex::Locker locker(m_lock); - ceph::unordered_map::iterator it = - m_context_metadata.find(ctx); - assert(it != m_context_metadata.end()); - - metadata = it->second; - m_context_metadata.erase(it); - } - - // TODO - ctx->complete(0); +void AioImageRequestWQ::process(AioImageRequest *req) { + req->send(); { Mutex::Locker locker(m_lock); - if (metadata.write_op) { + if (req->is_write_op()) { + assert(m_queued_writes > 0); + --m_queued_writes; + assert(m_in_progress_writes > 0); if (--m_in_progress_writes == 0) { m_cond.Signal(); } } } + delete req; } -void AioImageRequestWQ::queue(Context *ctx, const Metadata &metadata) { +void AioImageRequestWQ::queue(AioImageRequest *req) { { Mutex::Locker locker(m_lock); - m_context_metadata[ctx] = metadata; + if (req->is_write_op()) { + ++m_queued_writes; + } } - ThreadPool::PointerWQ::queue(ctx); + ThreadPool::PointerWQ::queue(req); } } // namespace librbd diff --git a/src/librbd/AioImageRequestWQ.h b/src/librbd/AioImageRequestWQ.h index d4abfa998a47b..f5c0b8e647da6 100644 --- a/src/librbd/AioImageRequestWQ.h +++ b/src/librbd/AioImageRequestWQ.h @@ -6,19 +6,19 @@ #include "common/WorkQueue.h" #include "common/Mutex.h" -#include "include/unordered_map.h" namespace librbd { class AioCompletion; +class AioImageRequest; class ImageCtx; -class AioImageRequestWQ : protected ThreadPool::PointerWQ { +class AioImageRequestWQ : protected ThreadPool::PointerWQ { public: AioImageRequestWQ(const string &name, time_t ti, ThreadPool *tp) - : ThreadPool::PointerWQ(name, ti, 0, tp), + : ThreadPool::PointerWQ(name, ti, 0, tp), m_lock("AioImageRequestWQ::m_lock"), m_writes_suspended(false), - m_in_progress_writes(0) { + m_in_progress_writes(0), m_queued_writes(0) { } void aio_read(ImageCtx *ictx, uint64_t off, size_t len, char *buf, @@ -29,7 +29,7 @@ public: AioCompletion *c); void aio_flush(ImageCtx *ictx, AioCompletion *c); - using ThreadPool::PointerWQ::drain; + using ThreadPool::PointerWQ::drain; bool writes_empty() const; inline bool writes_suspended() const { @@ -52,30 +52,16 @@ public: signal(); } protected: - virtual void _clear() { - ThreadPool::PointerWQ::_clear(); - m_context_metadata.clear(); - } - virtual void *_void_dequeue(); - virtual void process(Context *ctx); + virtual void process(AioImageRequest *req); private: - struct Metadata { - bool write_op; - AioCompletion *aio_comp; - - Metadata() : write_op(false), aio_comp(NULL) {} - Metadata(bool _write_op, AioCompletion *_aio_comp) - : write_op(_write_op), aio_comp(_aio_comp) {} - }; - mutable Mutex m_lock; Cond m_cond; bool m_writes_suspended; uint32_t m_in_progress_writes; - ceph::unordered_map m_context_metadata; + uint32_t m_queued_writes; - void queue(Context *ctx, const Metadata &metadata); + void queue(AioImageRequest *req); }; } // namespace librbd diff --git a/src/librbd/AioObjectRequest.cc b/src/librbd/AioObjectRequest.cc index d09c6ddf351ad..dbaee3a8523e9 100644 --- a/src/librbd/AioObjectRequest.cc +++ b/src/librbd/AioObjectRequest.cc @@ -8,6 +8,7 @@ #include "common/RWLock.h" #include "librbd/AioCompletion.h" +#include "librbd/AioImageRequest.h" #include "librbd/ImageCtx.h" #include "librbd/ImageWatcher.h" #include "librbd/internal.h" @@ -271,8 +272,8 @@ namespace librbd { << " parent completion " << m_parent_completion << " extents " << parent_extents << dendl; - aio_read(m_ictx->parent, parent_extents, NULL, &m_read_data, - m_parent_completion, 0); + AioImageRequest::read(m_ictx->parent, m_parent_completion, parent_extents, + NULL, &m_read_data, 0); } /** write **/ diff --git a/src/librbd/AsyncOperation.cc b/src/librbd/AsyncOperation.cc index dfb1e61a10095..3114d549f7715 100644 --- a/src/librbd/AsyncOperation.cc +++ b/src/librbd/AsyncOperation.cc @@ -15,7 +15,7 @@ void AsyncOperation::start_op(ImageCtx &image_ctx) { assert(m_image_ctx == NULL); m_image_ctx = &image_ctx; - ldout(m_image_ctx->cct, 20) << this << " " << __func__ << dendl; + ldout(m_image_ctx->cct, 20) << this << " " << __func__ << dendl; Mutex::Locker l(m_image_ctx->async_ops_lock); m_image_ctx->async_ops.push_front(&m_xlist_item); } @@ -50,7 +50,9 @@ void AsyncOperation::finish_op() { void AsyncOperation::add_flush_context(Context *on_finish) { assert(m_image_ctx->async_ops_lock.is_locked()); + ldout(m_image_ctx->cct, 20) << this << " " << __func__ << ": " + << "flush=" << on_finish << dendl; m_flush_contexts.push_back(on_finish); -} +} } // namespace librbd diff --git a/src/librbd/CopyupRequest.cc b/src/librbd/CopyupRequest.cc index 6be414f9067aa..4f162bb253d38 100644 --- a/src/librbd/CopyupRequest.cc +++ b/src/librbd/CopyupRequest.cc @@ -7,6 +7,7 @@ #include "common/Mutex.h" #include "librbd/AioCompletion.h" +#include "librbd/AioImageRequest.h" #include "librbd/AioObjectRequest.h" #include "librbd/AsyncObjectThrottle.h" #include "librbd/CopyupRequest.h" @@ -185,7 +186,8 @@ private: << ", oid " << m_oid << ", extents " << m_image_extents << dendl; - aio_read(m_ictx->parent, m_image_extents, NULL, &m_copyup_data, comp, 0); + AioImageRequest::read(m_ictx->parent, comp, m_image_extents, NULL, + &m_copyup_data, 0); } void CopyupRequest::queue_send() diff --git a/src/librbd/Makefile.am b/src/librbd/Makefile.am index 29d45b34ca5ab..f8691124415f1 100644 --- a/src/librbd/Makefile.am +++ b/src/librbd/Makefile.am @@ -8,6 +8,7 @@ if WITH_RBD librbd_internal_la_SOURCES = \ librbd/AioCompletion.cc \ + librbd/AioImageRequest.cc \ librbd/AioImageRequestWQ.cc \ librbd/AioObjectRequest.cc \ librbd/AsyncFlattenRequest.cc \ @@ -50,6 +51,7 @@ lib_LTLIBRARIES += librbd.la noinst_HEADERS += \ librbd/AioCompletion.h \ + librbd/AioImageRequest.h \ librbd/AioImageRequestWQ.h \ librbd/AioObjectRequest.h \ librbd/AsyncFlattenRequest.h \ diff --git a/src/librbd/internal.cc b/src/librbd/internal.cc index 78fb8235a853b..a94597971e0d4 100644 --- a/src/librbd/internal.cc +++ b/src/librbd/internal.cc @@ -18,6 +18,7 @@ #include "cls/rbd/cls_rbd_client.h" #include "librbd/AioCompletion.h" +#include "librbd/AioImageRequest.h" #include "librbd/AioImageRequestWQ.h" #include "librbd/AioObjectRequest.h" #include "librbd/AsyncFlattenRequest.h" @@ -2896,7 +2897,8 @@ reprotect_and_return_err: Context *ctx = new C_CopyWrite(m_throttle, m_bl); AioCompletion *comp = aio_create_completion_internal(ctx, rbd_ctx_cb); - aio_write(m_dest, m_offset, m_bl->length(), m_bl->c_str(), comp, LIBRADOS_OP_FLAG_FADVISE_DONTNEED); + AioImageRequest::write(m_dest, comp, m_offset, m_bl->length(), + m_bl->c_str(), LIBRADOS_OP_FLAG_FADVISE_DONTNEED); } private: SimpleThrottle *m_throttle; @@ -2948,7 +2950,7 @@ reprotect_and_return_err: bufferlist *bl = new bufferlist(); Context *ctx = new C_CopyRead(&throttle, dest, offset, bl); AioCompletion *comp = aio_create_completion_internal(ctx, rbd_ctx_cb); - aio_read(src, offset, len, NULL, bl, comp, fadvise_flags); + AioImageRequest::read(src, comp, offset, len, NULL, bl, fadvise_flags); prog_ctx.update_progress(offset, src_size); } @@ -3469,7 +3471,7 @@ reprotect_and_return_err: Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret); AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb); - aio_read(ictx, off, read_len, NULL, &bl, c, 0); + AioImageRequest::read(ictx, c, off, read_len, NULL, &bl, 0); mylock.Lock(); while (!done) @@ -3551,7 +3553,7 @@ reprotect_and_return_err: Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret); AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb); - aio_read(ictx, image_extents, buf, pbl, c, op_flags); + AioImageRequest::read(ictx, c, image_extents, buf, pbl, op_flags); mylock.Lock(); while (!done) @@ -3581,7 +3583,7 @@ reprotect_and_return_err: Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret); AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb); - aio_write(ictx, off, mylen, buf, c, op_flags); + AioImageRequest::write(ictx, c, off, mylen, buf, op_flags); mylock.Lock(); while (!done) @@ -3615,7 +3617,7 @@ reprotect_and_return_err: Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret); AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb); - aio_discard(ictx, off, mylen, c); + AioImageRequest::discard(ictx, c, off, mylen); mylock.Lock(); while (!done) @@ -3666,39 +3668,6 @@ reprotect_and_return_err: return 0; } - void aio_flush(ImageCtx *ictx, AioCompletion *c) - { - CephContext *cct = ictx->cct; - ldout(cct, 20) << "aio_flush " << ictx << " completion " << c << dendl; - - c->get(); - int r = ictx_check(ictx); - if (r < 0) { - c->fail(cct, r); - return; - } - - RWLock::RLocker owner_locker(ictx->owner_lock); - ictx->user_flushed(); - - C_AioRequest *flush_ctx = new C_AioRequest(cct, c); - ictx->flush_async_operations(flush_ctx); - - c->start_op(ictx, AIO_TYPE_FLUSH); - C_AioRequest *req_comp = new C_AioRequest(cct, c); - if (ictx->object_cacher) { - ictx->flush_cache_aio(req_comp); - } else { - librados::AioCompletion *rados_completion = - librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb); - ictx->data_ctx.aio_flush_async(rados_completion); - rados_completion->release(); - } - c->finish_adding_requests(cct); - c->put(); - ictx->perfcounter->inc(l_librbd_aio_flush); - } - int flush(ImageCtx *ictx) { CephContext *cct = ictx->cct; @@ -3756,90 +3725,6 @@ reprotect_and_return_err: return r; } - void aio_write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf, - AioCompletion *c, int op_flags) - { - CephContext *cct = ictx->cct; - ldout(cct, 20) << "aio_write " << ictx << " off = " << off << " len = " - << len << " buf = " << (void*)buf << dendl; - - c->get(); - int r = ictx_check(ictx); - if (r < 0) { - c->fail(cct, r); - return; - } - - RWLock::RLocker owner_locker(ictx->owner_lock); - RWLock::RLocker md_locker(ictx->md_lock); - - uint64_t clip_len = len; - ::SnapContext snapc; - { - // prevent image size from changing between computing clip and recording - // pending async operation - RWLock::RLocker snap_locker(ictx->snap_lock); - if (ictx->snap_id != CEPH_NOSNAP || ictx->read_only) { - c->fail(cct, -EROFS); - return; - } - - r = clip_io(ictx, off, &clip_len); - if (r < 0) { - c->fail(cct, r); - return; - } - - snapc = ictx->snapc; - c->start_op(ictx, AIO_TYPE_WRITE); - } - - if (ictx->image_watcher->is_lock_supported() && - !ictx->image_watcher->is_lock_owner()) { - c->put(); - ictx->image_watcher->request_lock( - boost::bind(&librbd::aio_write, ictx, off, len, buf, _1, op_flags), c); - return; - } - - // map - vector extents; - if (len > 0) { - Striper::file_to_extents(ictx->cct, ictx->format_string, - &ictx->layout, off, clip_len, 0, extents); - } - - for (vector::iterator p = extents.begin(); p != extents.end(); ++p) { - ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length - << " from " << p->buffer_extents << dendl; - // assemble extent - bufferlist bl; - for (vector >::iterator q = p->buffer_extents.begin(); - q != p->buffer_extents.end(); - ++q) { - bl.append(buf + q->first, q->second); - } - - C_AioRequest *req_comp = new C_AioRequest(cct, c); - if (ictx->object_cacher) { - ictx->write_to_cache(p->oid, bl, p->length, p->offset, req_comp, op_flags); - } else { - AioObjectWrite *req = new AioObjectWrite(ictx, p->oid.name, p->objectno, - p->offset, bl, snapc, - req_comp); - - req->set_op_flags(op_flags); - req->send(); - } - } - - c->finish_adding_requests(ictx->cct); - c->put(); - - ictx->perfcounter->inc(l_librbd_wr); - ictx->perfcounter->inc(l_librbd_wr_bytes, clip_len); - } - int metadata_get(ImageCtx *ictx, const string &key, string *value) { CephContext *cct = ictx->cct; @@ -3894,95 +3779,6 @@ reprotect_and_return_err: return cls_client::metadata_list(&ictx->md_ctx, ictx->header_oid, start, max, pairs); } - void aio_discard(ImageCtx *ictx, uint64_t off, uint64_t len, AioCompletion *c) - { - CephContext *cct = ictx->cct; - ldout(cct, 20) << "aio_discard " << ictx << " off = " << off << " len = " - << len << dendl; - - c->get(); - int r = ictx_check(ictx); - if (r < 0) { - c->fail(cct, r); - return; - } - - RWLock::RLocker owner_locker(ictx->owner_lock); - RWLock::RLocker md_locker(ictx->md_lock); - - uint64_t clip_len = len; - ::SnapContext snapc; - { - // prevent image size from changing between computing clip and recording - // pending async operation - RWLock::RLocker snap_locker(ictx->snap_lock); - if (ictx->snap_id != CEPH_NOSNAP || ictx->read_only) { - c->fail(cct, -EROFS); - return; - } - - r = clip_io(ictx, off, &clip_len); - if (r < 0) { - c->fail(cct, r); - return; - } - - // TODO: check for snap - snapc = ictx->snapc; - c->start_op(ictx, AIO_TYPE_DISCARD); - } - - if (ictx->image_watcher->is_lock_supported() && - !ictx->image_watcher->is_lock_owner()) { - c->put(); - ictx->image_watcher->request_lock( - boost::bind(&librbd::aio_discard, ictx, off, len, _1), c); - return; - } - - // map - vector extents; - if (len > 0) { - Striper::file_to_extents(ictx->cct, ictx->format_string, - &ictx->layout, off, clip_len, 0, extents); - } - - for (vector::iterator p = extents.begin(); p != extents.end(); ++p) { - ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length - << " from " << p->buffer_extents << dendl; - C_AioRequest *req_comp = new C_AioRequest(cct, c); - AioObjectRequest *req; - - if (p->length == ictx->layout.fl_object_size) { - req = new AioObjectRemove(ictx, p->oid.name, p->objectno, snapc, - req_comp); - } else if (p->offset + p->length == ictx->layout.fl_object_size) { - req = new AioObjectTruncate(ictx, p->oid.name, p->objectno, p->offset, - snapc, req_comp); - } else { - if(ictx->cct->_conf->rbd_skip_partial_discard) { - delete req_comp; - continue; - } - req = new AioObjectZero(ictx, p->oid.name, p->objectno, p->offset, - p->length, snapc, req_comp); - } - - req->send(); - } - - if (ictx->object_cacher) { - Mutex::Locker l(ictx->cache_lock); - ictx->object_cacher->discard_set(ictx->object_set, extents); - } - - c->finish_adding_requests(ictx->cct); - c->put(); - - ictx->perfcounter->inc(l_librbd_discard); - ictx->perfcounter->inc(l_librbd_discard_bytes, clip_len); - } - void rbd_req_cb(completion_t cb, void *arg) { AioObjectRequest *req = reinterpret_cast(arg); @@ -3990,15 +3786,6 @@ reprotect_and_return_err: req->complete(comp->get_return_value()); } - void aio_read(ImageCtx *ictx, uint64_t off, size_t len, - char *buf, bufferlist *bl, - AioCompletion *c, int op_flags) - { - vector > image_extents(1); - image_extents[0] = make_pair(off, len); - aio_read(ictx, image_extents, buf, bl, c, op_flags); - } - struct C_RBD_Readahead : public Context { ImageCtx *ictx; object_t oid; @@ -4012,8 +3799,8 @@ reprotect_and_return_err: } }; - static void readahead(ImageCtx *ictx, - const vector >& image_extents) + void readahead(ImageCtx *ictx, + const vector >& image_extents) { uint64_t total_bytes = 0; for (vector >::const_iterator p = image_extents.begin(); @@ -4057,94 +3844,6 @@ reprotect_and_return_err: } } - void aio_read(ImageCtx *ictx, const vector >& image_extents, - char *buf, bufferlist *pbl, AioCompletion *c, int op_flags) - { - CephContext *cct = ictx->cct; - ldout(cct, 20) << "aio_read " << ictx << " completion " << c << " " << image_extents << dendl; - - c->get(); - int r = ictx_check(ictx); - if (r < 0) { - c->fail(cct, r); - return; - } - - RWLock::RLocker owner_locker(ictx->owner_lock); - - // readahead - if (ictx->object_cacher && ictx->readahead_max_bytes > 0 && - !(op_flags & LIBRADOS_OP_FLAG_FADVISE_RANDOM)) { - readahead(ictx, image_extents); - } - - snap_t snap_id; - map > object_extents; - uint64_t buffer_ofs = 0; - { - // prevent image size from changing between computing clip and recording - // pending async operation - RWLock::RLocker snap_locker(ictx->snap_lock); - snap_id = ictx->snap_id; - - // map - for (vector >::const_iterator p = - image_extents.begin(); - p != image_extents.end(); ++p) { - uint64_t len = p->second; - r = clip_io(ictx, p->first, &len); - if (r < 0) { - c->fail(cct, r); - return; - } - if (len == 0) { - continue; - } - - Striper::file_to_extents(cct, ictx->format_string, &ictx->layout, - p->first, len, 0, object_extents, buffer_ofs); - buffer_ofs += len; - } - c->start_op(ictx, AIO_TYPE_READ); - } - - c->read_buf = buf; - c->read_buf_len = buffer_ofs; - c->read_bl = pbl; - - for (map >::iterator p = object_extents.begin(); - p != object_extents.end(); ++p) { - for (vector::iterator q = p->second.begin(); - q != p->second.end(); ++q) { - ldout(ictx->cct, 20) << " oid " << q->oid << " " << q->offset << "~" - << q->length << " from " << q->buffer_extents - << dendl; - - C_AioRead *req_comp = new C_AioRead(ictx->cct, c); - AioObjectRead *req = new AioObjectRead(ictx, q->oid.name, q->objectno, - q->offset, q->length, - q->buffer_extents, snap_id, true, - req_comp, op_flags); - req_comp->set_req(req); - - if (ictx->object_cacher) { - C_CacheRead *cache_comp = new C_CacheRead(ictx, req); - ictx->aio_read_from_cache(q->oid, q->objectno, &req->data(), - q->length, q->offset, - cache_comp, op_flags); - } else { - req->send(); - } - } - } - - c->finish_adding_requests(cct); - c->put(); - - ictx->perfcounter->inc(l_librbd_rd); - ictx->perfcounter->inc(l_librbd_rd_bytes, buffer_ofs); - } - AioCompletion *aio_create_completion() { AioCompletion *c = new AioCompletion(); return c; diff --git a/src/librbd/internal.h b/src/librbd/internal.h index 33d24011e3690..2dc0aa0ef984d 100644 --- a/src/librbd/internal.h +++ b/src/librbd/internal.h @@ -198,6 +198,8 @@ namespace librbd { uint64_t len, bool include_parent, bool whole_object, int (*cb)(uint64_t, size_t, int, void *), void *arg); + void readahead(ImageCtx *ictx, + const vector >& image_extents); ssize_t read(ImageCtx *ictx, uint64_t off, size_t len, char *buf, int op_flags); ssize_t read(ImageCtx *ictx, const vector >& image_extents, char *buf, bufferlist *pbl, int op_flags); @@ -212,14 +214,6 @@ namespace librbd { int async_rebuild_object_map(ImageCtx *ictx, Context *ctx, ProgressContext &prog_ctx); - void aio_write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf, - AioCompletion *c, int op_flags); - void aio_discard(ImageCtx *ictx, uint64_t off, uint64_t len, AioCompletion *c); - void aio_read(ImageCtx *ictx, uint64_t off, size_t len, - char *buf, bufferlist *pbl, AioCompletion *c, int op_flags); - void aio_read(ImageCtx *ictx, const vector >& image_extents, - char *buf, bufferlist *pbl, AioCompletion *c, int op_flags); - void aio_flush(ImageCtx *ictx, AioCompletion *c); int flush(ImageCtx *ictx); int _flush(ImageCtx *ictx); int invalidate_cache(ImageCtx *ictx); -- 2.39.5