--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/AioImageRequest.h"
+#include "librbd/AioCompletion.h"
+#include "librbd/AioObjectRequest.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ImageWatcher.h"
+#include "librbd/internal.h"
+#include <boost/bind.hpp>
+#include "include/assert.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::AioImageRequest: "
+
+namespace librbd {
+
+// Convenience entry point for a multi-extent read: builds a temporary
+// AioImageRead over the supplied extents and sends it before returning.
+void AioImageRequest::read(
+    ImageCtx *ictx, AioCompletion *c,
+    const std::vector<std::pair<uint64_t,uint64_t> > &extents,
+    char *buf, bufferlist *pbl, int op_flags) {
+  AioImageRead(*ictx, c, extents, buf, pbl, op_flags).send();
+}
+
+// Convenience entry point for a single-extent read (off, len): builds a
+// temporary AioImageRead and sends it before returning.
+void AioImageRequest::read(ImageCtx *ictx, AioCompletion *c, uint64_t off,
+                           size_t len, char *buf, bufferlist *pbl,
+                           int op_flags) {
+  AioImageRead(*ictx, c, off, len, buf, pbl, op_flags).send();
+}
+
+// Convenience entry point for a write: builds a temporary AioImageWrite and
+// sends it before returning.
+void AioImageRequest::write(ImageCtx *ictx, AioCompletion *c, uint64_t off,
+                            size_t len, const char *buf, int op_flags) {
+  AioImageWrite(*ictx, c, off, len, buf, op_flags).send();
+}
+
+// Convenience entry point for a discard: builds a temporary AioImageDiscard
+// and sends it before returning.
+void AioImageRequest::discard(ImageCtx *ictx, AioCompletion *c, uint64_t off,
+                              uint64_t len) {
+  AioImageDiscard(*ictx, c, off, len).send();
+}
+
+// Convenience entry point for a flush: builds a temporary AioImageFlush and
+// sends it before returning.
+void AioImageRequest::flush(ImageCtx *ictx, AioCompletion *c) {
+  AioImageFlush(*ictx, c).send();
+}
+
+// Common dispatch path for every request type: logs the request, takes a
+// reference on the completion, runs ictx_check() on the image context and, if
+// that succeeds, calls the subclass's execute_request() while holding
+// owner_lock for read.  A failed check fails the completion instead.
+void AioImageRequest::send() {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << get_request_type() << ": ictx=" << &m_image_ctx << ", "
+                 << "completion=" << m_aio_comp << dendl;
+
+  m_aio_comp->get();
+  const int check_result = ictx_check(&m_image_ctx);
+  if (check_result >= 0) {
+    RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+    execute_request();
+    return;
+  }
+
+  m_aio_comp->fail(cct, check_result);
+}
+
+
+// Read path: clips each requested image extent, maps it to object extents and
+// issues one AioObjectRead per object extent -- through the object cacher
+// when one is configured, otherwise by sending the request directly.
+void AioImageRead::execute_request() {
+ CephContext *cct = m_image_ctx.cct;
+
+ // trigger readahead only when a cache exists, readahead is enabled and the
+ // caller did not pass the FADVISE_RANDOM hint
+ if (m_image_ctx.object_cacher && m_image_ctx.readahead_max_bytes > 0 &&
+ !(m_op_flags & LIBRADOS_OP_FLAG_FADVISE_RANDOM)) {
+ readahead(&m_image_ctx, m_image_extents);
+ }
+
+ librados::snap_t snap_id;
+ map<object_t,vector<ObjectExtent> > object_extents;
+ // running total of clipped bytes; handed to Striper::file_to_extents as its
+ // last argument (presumably the buffer offset of the extent being mapped)
+ uint64_t buffer_ofs = 0;
+ {
+ // prevent image size from changing between computing clip and recording
+ // pending async operation
+ RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
+ snap_id = m_image_ctx.snap_id;
+
+ // map
+ for (vector<pair<uint64_t,uint64_t> >::const_iterator p =
+ m_image_extents.begin();
+ p != m_image_extents.end(); ++p) {
+ uint64_t len = p->second;
+ int r = clip_io(&m_image_ctx, p->first, &len);
+ if (r < 0) {
+ // clip failed: fail the completion; no op has been started yet
+ m_aio_comp->fail(cct, r);
+ return;
+ }
+ if (len == 0) {
+ // nothing left of this extent after clipping
+ continue;
+ }
+
+ Striper::file_to_extents(cct, m_image_ctx.format_string,
+ &m_image_ctx.layout, p->first, len, 0,
+ object_extents, buffer_ofs);
+ buffer_ofs += len;
+ }
+
+ // record the pending op while snap_lock is still held (see comment above)
+ m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_READ);
+ }
+
+ // hand the destination buffer / bufferlist and total length to the completion
+ m_aio_comp->read_buf = m_buf;
+ m_aio_comp->read_buf_len = buffer_ofs;
+ m_aio_comp->read_bl = m_pbl;
+
+ // one object read per object extent
+ for (map<object_t,vector<ObjectExtent> >::iterator p = object_extents.begin();
+ p != object_extents.end(); ++p) {
+ for (vector<ObjectExtent>::iterator q = p->second.begin();
+ q != p->second.end(); ++q) {
+ ldout(cct, 20) << " oid " << q->oid << " " << q->offset << "~"
+ << q->length << " from " << q->buffer_extents
+ << dendl;
+
+ C_AioRead *req_comp = new C_AioRead(cct, m_aio_comp);
+ AioObjectRead *req = new AioObjectRead(&m_image_ctx, q->oid.name,
+ q->objectno, q->offset, q->length,
+ q->buffer_extents, snap_id, true,
+ req_comp, m_op_flags);
+ req_comp->set_req(req);
+
+ if (m_image_ctx.object_cacher) {
+ // cached read: the cache fills req->data() and cache_comp is notified;
+ // req itself is not sent on this path
+ C_CacheRead *cache_comp = new C_CacheRead(&m_image_ctx, req);
+ m_image_ctx.aio_read_from_cache(q->oid, q->objectno, &req->data(),
+ q->length, q->offset,
+ cache_comp, m_op_flags);
+ } else {
+ req->send();
+ }
+ }
+ }
+
+ // no further object requests will be added; drop a completion reference
+ // (NOTE(review): presumably pairs with the get() in AioImageRequest::send())
+ m_aio_comp->finish_adding_requests(cct);
+ m_aio_comp->put();
+
+ m_image_ctx.perfcounter->inc(l_librbd_rd);
+ m_image_ctx.perfcounter->inc(l_librbd_rd_bytes, buffer_ofs);
+}
+
+// Write path: rejects writes to snapshots / read-only images, clips the
+// request, defers to the image watcher when the lock is supported but not
+// owned, and otherwise splits the buffer into per-object writes issued through
+// the object cacher or as AioObjectWrite requests.
+void AioImageWrite::execute_request() {
+ CephContext *cct = m_image_ctx.cct;
+
+ // held for read for the remainder of the function
+ RWLock::RLocker md_locker(m_image_ctx.md_lock);
+
+ uint64_t clip_len = m_len;
+ ::SnapContext snapc;
+ {
+ // prevent image size from changing between computing clip and recording
+ // pending async operation
+ RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
+ if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
+ // a snapshot is selected or the image is read-only
+ m_aio_comp->fail(cct, -EROFS);
+ return;
+ }
+
+ int r = clip_io(&m_image_ctx, m_off, &clip_len);
+ if (r < 0) {
+ m_aio_comp->fail(cct, r);
+ return;
+ }
+
+ // copy the snap context while snap_lock is held
+ snapc = m_image_ctx.snapc;
+ m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_WRITE);
+ }
+
+ if (m_image_ctx.image_watcher->is_lock_supported() &&
+ !m_image_ctx.image_watcher->is_lock_owner()) {
+ // lock supported but not owned: give the watcher a callback that re-issues
+ // this write (original, unclipped length) via AioImageRequest::write, with
+ // the completion filling the _1 placeholder, then bail out
+ m_image_ctx.image_watcher->request_lock(
+ boost::bind(&AioImageRequest::write, &m_image_ctx, _1, m_off, m_len,
+ m_buf, m_op_flags), m_aio_comp);
+ m_aio_comp->put();
+ return;
+ }
+
+ // map
+ vector<ObjectExtent> extents;
+ if (m_len > 0) {
+ Striper::file_to_extents(cct, m_image_ctx.format_string,
+ &m_image_ctx.layout, m_off, clip_len, 0, extents);
+ }
+
+ for (vector<ObjectExtent>::iterator p = extents.begin();
+ p != extents.end(); ++p) {
+ ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length
+ << " from " << p->buffer_extents << dendl;
+ // assemble extent
+ bufferlist bl;
+ for (vector<pair<uint64_t,uint64_t> >::iterator q =
+ p->buffer_extents.begin();
+ q != p->buffer_extents.end(); ++q) {
+ // each buffer extent is an (offset, length) slice of the caller's buffer
+ bl.append(m_buf + q->first, q->second);
+ }
+
+ C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
+ if (m_image_ctx.object_cacher) {
+ m_image_ctx.write_to_cache(p->oid, bl, p->length, p->offset, req_comp,
+ m_op_flags);
+ } else {
+ AioObjectWrite *req = new AioObjectWrite(&m_image_ctx, p->oid.name,
+ p->objectno, p->offset, bl,
+ snapc, req_comp);
+
+ req->set_op_flags(m_op_flags);
+ req->send();
+ }
+ }
+
+ // no further object requests will be added; drop a completion reference
+ // (NOTE(review): presumably pairs with the get() in AioImageRequest::send())
+ m_aio_comp->finish_adding_requests(cct);
+ m_aio_comp->put();
+
+ // byte counter uses the clipped length, not the requested length
+ m_image_ctx.perfcounter->inc(l_librbd_wr);
+ m_image_ctx.perfcounter->inc(l_librbd_wr_bytes, clip_len);
+}
+
+// Discard path: rejects discards on snapshots / read-only images, clips the
+// request, defers to the image watcher when the lock is supported but not
+// owned, and otherwise issues a remove, truncate or zero request per object
+// extent before discarding the same extents from the object cacher.
+void AioImageDiscard::execute_request() {
+ CephContext *cct = m_image_ctx.cct;
+
+ // held for read for the remainder of the function
+ RWLock::RLocker md_locker(m_image_ctx.md_lock);
+
+ uint64_t clip_len = m_len;
+ ::SnapContext snapc;
+ {
+ // prevent image size from changing between computing clip and recording
+ // pending async operation
+ RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
+ if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
+ // a snapshot is selected or the image is read-only
+ m_aio_comp->fail(cct, -EROFS);
+ return;
+ }
+
+ int r = clip_io(&m_image_ctx, m_off, &clip_len);
+ if (r < 0) {
+ m_aio_comp->fail(cct, r);
+ return;
+ }
+
+ // copy the snap context while snap_lock is held
+ snapc = m_image_ctx.snapc;
+ m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_DISCARD);
+ }
+
+ if (m_image_ctx.image_watcher->is_lock_supported() &&
+ !m_image_ctx.image_watcher->is_lock_owner()) {
+ // lock supported but not owned: give the watcher a callback that re-issues
+ // this discard (original, unclipped length) via AioImageRequest::discard,
+ // with the completion filling the _1 placeholder, then bail out
+ m_image_ctx.image_watcher->request_lock(
+ boost::bind(&AioImageRequest::discard, &m_image_ctx, _1, m_off, m_len),
+ m_aio_comp);
+ m_aio_comp->put();
+ return;
+ }
+
+ // map
+ vector<ObjectExtent> extents;
+ if (m_len > 0) {
+ Striper::file_to_extents(cct, m_image_ctx.format_string,
+ &m_image_ctx.layout, m_off, clip_len, 0, extents);
+ }
+
+ for (vector<ObjectExtent>::iterator p = extents.begin();
+ p != extents.end(); ++p) {
+ ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length
+ << " from " << p->buffer_extents << dendl;
+ C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
+ AioObjectRequest *req;
+
+ if (p->length == m_image_ctx.layout.fl_object_size) {
+ // extent covers the whole object: remove it
+ req = new AioObjectRemove(&m_image_ctx, p->oid.name, p->objectno, snapc,
+ req_comp);
+ } else if (p->offset + p->length == m_image_ctx.layout.fl_object_size) {
+ // extent runs to the end of the object: truncate at its start offset
+ req = new AioObjectTruncate(&m_image_ctx, p->oid.name, p->objectno,
+ p->offset, snapc, req_comp);
+ } else {
+ // interior extent: zero it, unless partial discards are configured off,
+ // in which case the unused completion context is freed and the extent
+ // is skipped
+ if(cct->_conf->rbd_skip_partial_discard) {
+ delete req_comp;
+ continue;
+ }
+ req = new AioObjectZero(&m_image_ctx, p->oid.name, p->objectno, p->offset,
+ p->length, snapc, req_comp);
+ }
+ req->send();
+ }
+
+ if (m_image_ctx.object_cacher) {
+ // drop the discarded extents from the cache as well (includes any extents
+ // skipped above, since the full extent list is passed)
+ Mutex::Locker l(m_image_ctx.cache_lock);
+ m_image_ctx.object_cacher->discard_set(m_image_ctx.object_set, extents);
+ }
+
+ // no further object requests will be added; drop a completion reference
+ // (NOTE(review): presumably pairs with the get() in AioImageRequest::send())
+ m_aio_comp->finish_adding_requests(cct);
+ m_aio_comp->put();
+
+ // byte counter uses the clipped length, not the requested length
+ m_image_ctx.perfcounter->inc(l_librbd_discard);
+ m_image_ctx.perfcounter->inc(l_librbd_discard_bytes, clip_len);
+}
+
+// Flush path: notifies the image context of the user-requested flush,
+// registers a callback with flush_async_operations(), then flushes either
+// the object cacher or, when no cache is configured, the data IoCtx.  Both
+// callbacks are C_AioRequest contexts attached to m_aio_comp.
+void AioImageFlush::execute_request() {
+  CephContext *cct = m_image_ctx.cct;
+
+  // librbd::aio_flush() notified the image context of every user flush
+  // before doing any flush work; keep doing so now that the logic lives here
+  // (runs with owner_lock held by AioImageRequest::send(), as before)
+  m_image_ctx.user_flushed();
+
+  // TODO race condition between registering op and submitting to cache
+  //      (might not be flushed -- backport needed)
+  C_AioRequest *flush_ctx = new C_AioRequest(cct, m_aio_comp);
+  m_image_ctx.flush_async_operations(flush_ctx);
+
+  m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_FLUSH);
+  C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
+  if (m_image_ctx.object_cacher != NULL) {
+    m_image_ctx.flush_cache_aio(req_comp);
+  } else {
+    // no cache: issue an async flush on the data IoCtx and release our
+    // handle on the rados completion once it has been submitted
+    librados::AioCompletion *rados_completion =
+      librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb);
+    m_image_ctx.data_ctx.aio_flush_async(rados_completion);
+    rados_completion->release();
+  }
+
+  // no further requests will be added; drop a completion reference
+  m_aio_comp->finish_adding_requests(cct);
+  m_aio_comp->put();
+
+  m_image_ctx.perfcounter->inc(l_librbd_aio_flush);
+}
+
+} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_AIO_IMAGE_REQUEST_H
+#define CEPH_LIBRBD_AIO_IMAGE_REQUEST_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include <utility>
+#include <vector>
+
+namespace librbd {
+
+class AioCompletion;
+class ImageCtx;
+
+// Base class for image-level asynchronous IO requests.  It stores the image
+// context and completion shared by every request type and declares the hooks
+// (is_write_op / execute_request / get_request_type) each subclass implements.
+class AioImageRequest {
+public:
+  AioImageRequest(ImageCtx &image_ctx, AioCompletion *aio_comp)
+    : m_image_ctx(image_ctx),
+      m_aio_comp(aio_comp) {
+  }
+  virtual ~AioImageRequest() {
+  }
+
+  // convenience entry points, one per request type
+  static void read(ImageCtx *ictx, AioCompletion *c,
+                   const std::vector<std::pair<uint64_t,uint64_t> > &extents,
+                   char *buf, bufferlist *pbl, int op_flags);
+  static void read(ImageCtx *ictx, AioCompletion *c, uint64_t off, size_t len,
+                   char *buf, bufferlist *pbl, int op_flags);
+  static void write(ImageCtx *ictx, AioCompletion *c, uint64_t off, size_t len,
+                    const char *buf, int op_flags);
+  static void discard(ImageCtx *ictx, AioCompletion *c, uint64_t off,
+                      uint64_t len);
+  static void flush(ImageCtx *ictx, AioCompletion *c);
+
+  // reports whether this request counts as a write operation
+  virtual bool is_write_op() const = 0;
+
+  // dispatch the request
+  void send();
+
+protected:
+  ImageCtx &m_image_ctx;
+  AioCompletion *m_aio_comp;
+
+  // request-specific work, supplied by each subclass
+  virtual void execute_request() = 0;
+  // short name identifying the request type (e.g. "aio_read")
+  virtual const char *get_request_type() const = 0;
+};
+
+// Read request over one or more (offset, length) image extents.  Data is
+// delivered to the raw buffer and/or bufferlist supplied by the caller.
+class AioImageRead : public AioImageRequest {
+public:
+  // single-extent form: stores (off, len) as the only image extent
+  AioImageRead(ImageCtx &image_ctx, AioCompletion *aio_comp, uint64_t off,
+               size_t len, char *buf, bufferlist *pbl, int op_flags)
+    : AioImageRequest(image_ctx, aio_comp),
+      m_image_extents(1, std::pair<uint64_t,uint64_t>(off, len)),
+      m_buf(buf),
+      m_pbl(pbl),
+      m_op_flags(op_flags) {
+  }
+
+  // multi-extent form: copies the caller's extent list
+  AioImageRead(ImageCtx &image_ctx, AioCompletion *aio_comp,
+               const std::vector<std::pair<uint64_t,uint64_t> > &image_extents,
+               char *buf, bufferlist *pbl, int op_flags)
+    : AioImageRequest(image_ctx, aio_comp),
+      m_image_extents(image_extents),
+      m_buf(buf),
+      m_pbl(pbl),
+      m_op_flags(op_flags) {
+  }
+
+  virtual bool is_write_op() const {
+    return false;
+  }
+
+protected:
+  virtual void execute_request();
+  virtual const char *get_request_type() const {
+    return "aio_read";
+  }
+
+private:
+  std::vector<std::pair<uint64_t,uint64_t> > m_image_extents;
+  char *m_buf;
+  bufferlist *m_pbl;
+  int m_op_flags;
+};
+
+// Write request for len bytes from buf at image offset off.  Only the buffer
+// pointer is stored here, not a copy of the data.
+class AioImageWrite : public AioImageRequest {
+public:
+  AioImageWrite(ImageCtx &image_ctx, AioCompletion *aio_comp, uint64_t off,
+                size_t len, const char *buf, int op_flags)
+    : AioImageRequest(image_ctx, aio_comp),
+      m_off(off),
+      m_len(len),
+      m_buf(buf),
+      m_op_flags(op_flags) {
+  }
+
+  virtual bool is_write_op() const {
+    return true;
+  }
+
+protected:
+  virtual void execute_request();
+  virtual const char *get_request_type() const {
+    return "aio_write";
+  }
+
+private:
+  uint64_t m_off;
+  uint64_t m_len;
+  const char *m_buf;
+  int m_op_flags;
+};
+
+// Discard request for the image range [off, off + len).  Reported as a write
+// operation.
+class AioImageDiscard : public AioImageRequest {
+public:
+  AioImageDiscard(ImageCtx &image_ctx, AioCompletion *aio_comp, uint64_t off,
+                  uint64_t len)
+    : AioImageRequest(image_ctx, aio_comp),
+      m_off(off),
+      m_len(len) {
+  }
+
+  virtual bool is_write_op() const {
+    return true;
+  }
+
+protected:
+  virtual void execute_request();
+  virtual const char *get_request_type() const {
+    return "aio_discard";
+  }
+
+private:
+  uint64_t m_off;
+  uint64_t m_len;
+};
+
+// Flush request.  Carries no state beyond the base class and is not reported
+// as a write operation.
+class AioImageFlush : public AioImageRequest {
+public:
+  AioImageFlush(ImageCtx &image_ctx, AioCompletion *aio_comp)
+    : AioImageRequest(image_ctx, aio_comp) {
+  }
+
+  virtual bool is_write_op() const {
+    return false;
+  }
+
+protected:
+  virtual void execute_request();
+  virtual const char *get_request_type() const {
+    return "aio_flush";
+  }
+};
+
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_AIO_IMAGE_REQUEST_H
#include "librbd/AioImageRequestWQ.h"
#include "librbd/AioCompletion.h"
+#include "librbd/AioImageRequest.h"
#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
namespace librbd {
-namespace {
-
-class C_AioReadWQ : public Context {
-public:
- C_AioReadWQ(ImageCtx *ictx, uint64_t off, size_t len, char *buf,
- bufferlist *pbl, AioCompletion *c, int op_flags)
- : m_ictx(ictx), m_off(off), m_len(len), m_buf(buf), m_pbl(pbl), m_comp(c),
- m_op_flags(op_flags) {
- }
-protected:
- virtual void finish(int r) {
- aio_read(m_ictx, m_off, m_len, m_buf, m_pbl, m_comp, m_op_flags);
- }
-private:
- ImageCtx *m_ictx;
- uint64_t m_off;
- uint64_t m_len;
- char *m_buf;
- bufferlist *m_pbl;
- AioCompletion *m_comp;
- int m_op_flags;
-};
-
-class C_AioWriteWQ : public Context {
-public:
- C_AioWriteWQ(ImageCtx *ictx, uint64_t off, size_t len, const char *buf,
- AioCompletion *c, int op_flags)
- : m_ictx(ictx), m_off(off), m_len(len), m_buf(buf), m_comp(c),
- m_op_flags(op_flags) {
- }
-protected:
- virtual void finish(int r) {
- aio_write(m_ictx, m_off, m_len, m_buf, m_comp, m_op_flags);
- }
-private:
- ImageCtx *m_ictx;
- uint64_t m_off;
- uint64_t m_len;
- const char *m_buf;
- AioCompletion *m_comp;
- int m_op_flags;
-};
-
-class C_AioDiscardWQ : public Context {
-public:
- C_AioDiscardWQ(ImageCtx *ictx, uint64_t off, uint64_t len, AioCompletion *c)
- : m_ictx(ictx), m_off(off), m_len(len), m_comp(c) {
- }
-protected:
- virtual void finish(int r) {
- aio_discard(m_ictx, m_off, m_len, m_comp);
- }
-private:
- ImageCtx *m_ictx;
- uint64_t m_off;
- uint64_t m_len;
- AioCompletion *m_comp;
-};
-
-class C_AioFlushWQ : public Context {
-public:
- C_AioFlushWQ(ImageCtx *ictx, AioCompletion *c)
- : m_ictx(ictx), m_comp(c) {
- }
-protected:
- virtual void finish(int r) {
- aio_flush(m_ictx, m_comp);
- }
-private:
- ImageCtx *m_ictx;
- AioCompletion *m_comp;
-};
-
-} // anonymous namespace
-
void AioImageRequestWQ::aio_read(ImageCtx *ictx, uint64_t off, size_t len,
char *buf, bufferlist *pbl, AioCompletion *c,
int op_flags) {
c->init_time(ictx, librbd::AIO_TYPE_READ);
if (ictx->non_blocking_aio) {
- queue(new C_AioReadWQ(ictx, off, len, buf, pbl, c, op_flags),
- Metadata(false, c));
+ queue(new AioImageRead(*ictx, c, off, len, buf, pbl, op_flags));
} else {
- librbd::aio_read(ictx, off, len, buf, pbl, c, op_flags);
+ AioImageRequest::read(ictx, c, off, len, buf, pbl, op_flags);
}
}
int op_flags) {
c->init_time(ictx, librbd::AIO_TYPE_WRITE);
if (ictx->non_blocking_aio) {
- queue(new C_AioWriteWQ(ictx, off, len, buf, c, op_flags),
- Metadata(true, c));
+ queue(new AioImageWrite(*ictx, c, off, len, buf, op_flags));
} else {
- librbd::aio_write(ictx, off, len, buf, c, op_flags);
+ AioImageRequest::write(ictx, c, off, len, buf, op_flags);
}
}
AioCompletion *c) {
c->init_time(ictx, librbd::AIO_TYPE_DISCARD);
if (ictx->non_blocking_aio) {
- queue(new C_AioDiscardWQ(ictx, off, len, c), Metadata(true, c));
+ queue(new AioImageDiscard(*ictx, c, off, len));
} else {
- librbd::aio_discard(ictx, off, len, c);
+ AioImageRequest::discard(ictx, c, off, len);
}
}
void AioImageRequestWQ::aio_flush(ImageCtx *ictx, AioCompletion *c) {
c->init_time(ictx, librbd::AIO_TYPE_FLUSH);
if (ictx->non_blocking_aio) {
- queue(new C_AioFlushWQ(ictx, c), Metadata(false, c));
+ queue(new AioImageFlush(*ictx, c));
} else {
- librbd::aio_flush(ictx, c);
+ AioImageRequest::flush(ictx, c);
}
}
bool AioImageRequestWQ::writes_empty() const {
Mutex::Locker locker(m_lock);
- for (ceph::unordered_map<Context *, Metadata>::const_iterator it =
- m_context_metadata.begin();
- it != m_context_metadata.end(); ++it) {
- if (it->second.write_op) {
- return false;
- }
- }
- return true;
+ // "empty" means no write requests remain queued -- the same sense as the
+ // removed loop, which returned false as soon as it found a queued write op
+ return (m_queued_writes == 0);
}
void *AioImageRequestWQ::_void_dequeue() {
- Context *peek_item = front();
+ AioImageRequest *peek_item = front();
if (peek_item == NULL) {
return NULL;
}
{
- Mutex::Locker locker(m_lock);
- ceph::unordered_map<Context *, Metadata>::iterator it =
- m_context_metadata.find(peek_item);
- assert(it != m_context_metadata.end());
-
- if (it->second.write_op) {
+ if (peek_item->is_write_op()) {
+ Mutex::Locker locker(m_lock);
if (m_writes_suspended) {
return NULL;
}
}
}
- Context *item = reinterpret_cast<Context *>(
- ThreadPool::PointerWQ<Context>::_void_dequeue());
+ AioImageRequest *item = reinterpret_cast<AioImageRequest *>(
+ ThreadPool::PointerWQ<AioImageRequest>::_void_dequeue());
assert(peek_item == item);
return item;
}
-void AioImageRequestWQ::process(Context *ctx) {
- Metadata metadata;
- {
- Mutex::Locker locker(m_lock);
- ceph::unordered_map<Context *, Metadata>::iterator it =
- m_context_metadata.find(ctx);
- assert(it != m_context_metadata.end());
-
- metadata = it->second;
- m_context_metadata.erase(it);
- }
-
- // TODO
- ctx->complete(0);
+void AioImageRequestWQ::process(AioImageRequest *req) {
+ req->send();
{
Mutex::Locker locker(m_lock);
- if (metadata.write_op) {
+ if (req->is_write_op()) {
+ assert(m_queued_writes > 0);
+ --m_queued_writes;
+
assert(m_in_progress_writes > 0);
if (--m_in_progress_writes == 0) {
m_cond.Signal();
}
}
}
+ delete req;
}
-void AioImageRequestWQ::queue(Context *ctx, const Metadata &metadata) {
+void AioImageRequestWQ::queue(AioImageRequest *req) {
{
Mutex::Locker locker(m_lock);
- m_context_metadata[ctx] = metadata;
+ if (req->is_write_op()) {
+ ++m_queued_writes;
+ }
}
- ThreadPool::PointerWQ<Context>::queue(ctx);
+ ThreadPool::PointerWQ<AioImageRequest>::queue(req);
}
} // namespace librbd
#include "common/WorkQueue.h"
#include "common/Mutex.h"
-#include "include/unordered_map.h"
namespace librbd {
class AioCompletion;
+class AioImageRequest;
class ImageCtx;
-class AioImageRequestWQ : protected ThreadPool::PointerWQ<Context> {
+class AioImageRequestWQ : protected ThreadPool::PointerWQ<AioImageRequest> {
public:
AioImageRequestWQ(const string &name, time_t ti, ThreadPool *tp)
- : ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
+ : ThreadPool::PointerWQ<AioImageRequest>(name, ti, 0, tp),
m_lock("AioImageRequestWQ::m_lock"), m_writes_suspended(false),
- m_in_progress_writes(0) {
+ m_in_progress_writes(0), m_queued_writes(0) {
}
void aio_read(ImageCtx *ictx, uint64_t off, size_t len, char *buf,
AioCompletion *c);
void aio_flush(ImageCtx *ictx, AioCompletion *c);
- using ThreadPool::PointerWQ<Context>::drain;
+ using ThreadPool::PointerWQ<AioImageRequest>::drain;
bool writes_empty() const;
inline bool writes_suspended() const {
signal();
}
protected:
- virtual void _clear() {
- ThreadPool::PointerWQ<Context>::_clear();
- m_context_metadata.clear();
- }
-
virtual void *_void_dequeue();
- virtual void process(Context *ctx);
+ virtual void process(AioImageRequest *req);
private:
- struct Metadata {
- bool write_op;
- AioCompletion *aio_comp;
-
- Metadata() : write_op(false), aio_comp(NULL) {}
- Metadata(bool _write_op, AioCompletion *_aio_comp)
- : write_op(_write_op), aio_comp(_aio_comp) {}
- };
-
mutable Mutex m_lock;
Cond m_cond;
bool m_writes_suspended;
uint32_t m_in_progress_writes;
- ceph::unordered_map<Context *, Metadata> m_context_metadata;
+ uint32_t m_queued_writes;
- void queue(Context *ctx, const Metadata &metadata);
+ void queue(AioImageRequest *req);
};
} // namespace librbd
#include "common/RWLock.h"
#include "librbd/AioCompletion.h"
+#include "librbd/AioImageRequest.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageWatcher.h"
#include "librbd/internal.h"
<< " parent completion " << m_parent_completion
<< " extents " << parent_extents
<< dendl;
- aio_read(m_ictx->parent, parent_extents, NULL, &m_read_data,
- m_parent_completion, 0);
+ AioImageRequest::read(m_ictx->parent, m_parent_completion, parent_extents,
+ NULL, &m_read_data, 0);
}
/** write **/
assert(m_image_ctx == NULL);
m_image_ctx = &image_ctx;
- ldout(m_image_ctx->cct, 20) << this << " " << __func__ << dendl;
+ ldout(m_image_ctx->cct, 20) << this << " " << __func__ << dendl;
Mutex::Locker l(m_image_ctx->async_ops_lock);
m_image_ctx->async_ops.push_front(&m_xlist_item);
}
void AsyncOperation::add_flush_context(Context *on_finish) {
assert(m_image_ctx->async_ops_lock.is_locked());
+ ldout(m_image_ctx->cct, 20) << this << " " << __func__ << ": "
+ << "flush=" << on_finish << dendl;
m_flush_contexts.push_back(on_finish);
-}
+}
} // namespace librbd
#include "common/Mutex.h"
#include "librbd/AioCompletion.h"
+#include "librbd/AioImageRequest.h"
#include "librbd/AioObjectRequest.h"
#include "librbd/AsyncObjectThrottle.h"
#include "librbd/CopyupRequest.h"
<< ", oid " << m_oid
<< ", extents " << m_image_extents
<< dendl;
- aio_read(m_ictx->parent, m_image_extents, NULL, &m_copyup_data, comp, 0);
+ AioImageRequest::read(m_ictx->parent, comp, m_image_extents, NULL,
+ &m_copyup_data, 0);
}
void CopyupRequest::queue_send()
librbd_internal_la_SOURCES = \
librbd/AioCompletion.cc \
+ librbd/AioImageRequest.cc \
librbd/AioImageRequestWQ.cc \
librbd/AioObjectRequest.cc \
librbd/AsyncFlattenRequest.cc \
noinst_HEADERS += \
librbd/AioCompletion.h \
+ librbd/AioImageRequest.h \
librbd/AioImageRequestWQ.h \
librbd/AioObjectRequest.h \
librbd/AsyncFlattenRequest.h \
#include "cls/rbd/cls_rbd_client.h"
#include "librbd/AioCompletion.h"
+#include "librbd/AioImageRequest.h"
#include "librbd/AioImageRequestWQ.h"
#include "librbd/AioObjectRequest.h"
#include "librbd/AsyncFlattenRequest.h"
Context *ctx = new C_CopyWrite(m_throttle, m_bl);
AioCompletion *comp = aio_create_completion_internal(ctx, rbd_ctx_cb);
- aio_write(m_dest, m_offset, m_bl->length(), m_bl->c_str(), comp, LIBRADOS_OP_FLAG_FADVISE_DONTNEED);
+ AioImageRequest::write(m_dest, comp, m_offset, m_bl->length(),
+ m_bl->c_str(), LIBRADOS_OP_FLAG_FADVISE_DONTNEED);
}
private:
SimpleThrottle *m_throttle;
bufferlist *bl = new bufferlist();
Context *ctx = new C_CopyRead(&throttle, dest, offset, bl);
AioCompletion *comp = aio_create_completion_internal(ctx, rbd_ctx_cb);
- aio_read(src, offset, len, NULL, bl, comp, fadvise_flags);
+ AioImageRequest::read(src, comp, offset, len, NULL, bl, fadvise_flags);
prog_ctx.update_progress(offset, src_size);
}
Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret);
AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb);
- aio_read(ictx, off, read_len, NULL, &bl, c, 0);
+ AioImageRequest::read(ictx, c, off, read_len, NULL, &bl, 0);
mylock.Lock();
while (!done)
Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret);
AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb);
- aio_read(ictx, image_extents, buf, pbl, c, op_flags);
+ AioImageRequest::read(ictx, c, image_extents, buf, pbl, op_flags);
mylock.Lock();
while (!done)
Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret);
AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb);
- aio_write(ictx, off, mylen, buf, c, op_flags);
+ AioImageRequest::write(ictx, c, off, mylen, buf, op_flags);
mylock.Lock();
while (!done)
Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret);
AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb);
- aio_discard(ictx, off, mylen, c);
+ AioImageRequest::discard(ictx, c, off, mylen);
mylock.Lock();
while (!done)
return 0;
}
- void aio_flush(ImageCtx *ictx, AioCompletion *c)
- {
- CephContext *cct = ictx->cct;
- ldout(cct, 20) << "aio_flush " << ictx << " completion " << c << dendl;
-
- c->get();
- int r = ictx_check(ictx);
- if (r < 0) {
- c->fail(cct, r);
- return;
- }
-
- RWLock::RLocker owner_locker(ictx->owner_lock);
- ictx->user_flushed();
-
- C_AioRequest *flush_ctx = new C_AioRequest(cct, c);
- ictx->flush_async_operations(flush_ctx);
-
- c->start_op(ictx, AIO_TYPE_FLUSH);
- C_AioRequest *req_comp = new C_AioRequest(cct, c);
- if (ictx->object_cacher) {
- ictx->flush_cache_aio(req_comp);
- } else {
- librados::AioCompletion *rados_completion =
- librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb);
- ictx->data_ctx.aio_flush_async(rados_completion);
- rados_completion->release();
- }
- c->finish_adding_requests(cct);
- c->put();
- ictx->perfcounter->inc(l_librbd_aio_flush);
- }
-
int flush(ImageCtx *ictx)
{
CephContext *cct = ictx->cct;
return r;
}
- void aio_write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf,
- AioCompletion *c, int op_flags)
- {
- CephContext *cct = ictx->cct;
- ldout(cct, 20) << "aio_write " << ictx << " off = " << off << " len = "
- << len << " buf = " << (void*)buf << dendl;
-
- c->get();
- int r = ictx_check(ictx);
- if (r < 0) {
- c->fail(cct, r);
- return;
- }
-
- RWLock::RLocker owner_locker(ictx->owner_lock);
- RWLock::RLocker md_locker(ictx->md_lock);
-
- uint64_t clip_len = len;
- ::SnapContext snapc;
- {
- // prevent image size from changing between computing clip and recording
- // pending async operation
- RWLock::RLocker snap_locker(ictx->snap_lock);
- if (ictx->snap_id != CEPH_NOSNAP || ictx->read_only) {
- c->fail(cct, -EROFS);
- return;
- }
-
- r = clip_io(ictx, off, &clip_len);
- if (r < 0) {
- c->fail(cct, r);
- return;
- }
-
- snapc = ictx->snapc;
- c->start_op(ictx, AIO_TYPE_WRITE);
- }
-
- if (ictx->image_watcher->is_lock_supported() &&
- !ictx->image_watcher->is_lock_owner()) {
- c->put();
- ictx->image_watcher->request_lock(
- boost::bind(&librbd::aio_write, ictx, off, len, buf, _1, op_flags), c);
- return;
- }
-
- // map
- vector<ObjectExtent> extents;
- if (len > 0) {
- Striper::file_to_extents(ictx->cct, ictx->format_string,
- &ictx->layout, off, clip_len, 0, extents);
- }
-
- for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); ++p) {
- ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length
- << " from " << p->buffer_extents << dendl;
- // assemble extent
- bufferlist bl;
- for (vector<pair<uint64_t,uint64_t> >::iterator q = p->buffer_extents.begin();
- q != p->buffer_extents.end();
- ++q) {
- bl.append(buf + q->first, q->second);
- }
-
- C_AioRequest *req_comp = new C_AioRequest(cct, c);
- if (ictx->object_cacher) {
- ictx->write_to_cache(p->oid, bl, p->length, p->offset, req_comp, op_flags);
- } else {
- AioObjectWrite *req = new AioObjectWrite(ictx, p->oid.name, p->objectno,
- p->offset, bl, snapc,
- req_comp);
-
- req->set_op_flags(op_flags);
- req->send();
- }
- }
-
- c->finish_adding_requests(ictx->cct);
- c->put();
-
- ictx->perfcounter->inc(l_librbd_wr);
- ictx->perfcounter->inc(l_librbd_wr_bytes, clip_len);
- }
-
int metadata_get(ImageCtx *ictx, const string &key, string *value)
{
CephContext *cct = ictx->cct;
return cls_client::metadata_list(&ictx->md_ctx, ictx->header_oid, start, max, pairs);
}
- void aio_discard(ImageCtx *ictx, uint64_t off, uint64_t len, AioCompletion *c)
- {
- CephContext *cct = ictx->cct;
- ldout(cct, 20) << "aio_discard " << ictx << " off = " << off << " len = "
- << len << dendl;
-
- c->get();
- int r = ictx_check(ictx);
- if (r < 0) {
- c->fail(cct, r);
- return;
- }
-
- RWLock::RLocker owner_locker(ictx->owner_lock);
- RWLock::RLocker md_locker(ictx->md_lock);
-
- uint64_t clip_len = len;
- ::SnapContext snapc;
- {
- // prevent image size from changing between computing clip and recording
- // pending async operation
- RWLock::RLocker snap_locker(ictx->snap_lock);
- if (ictx->snap_id != CEPH_NOSNAP || ictx->read_only) {
- c->fail(cct, -EROFS);
- return;
- }
-
- r = clip_io(ictx, off, &clip_len);
- if (r < 0) {
- c->fail(cct, r);
- return;
- }
-
- // TODO: check for snap
- snapc = ictx->snapc;
- c->start_op(ictx, AIO_TYPE_DISCARD);
- }
-
- if (ictx->image_watcher->is_lock_supported() &&
- !ictx->image_watcher->is_lock_owner()) {
- c->put();
- ictx->image_watcher->request_lock(
- boost::bind(&librbd::aio_discard, ictx, off, len, _1), c);
- return;
- }
-
- // map
- vector<ObjectExtent> extents;
- if (len > 0) {
- Striper::file_to_extents(ictx->cct, ictx->format_string,
- &ictx->layout, off, clip_len, 0, extents);
- }
-
- for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); ++p) {
- ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length
- << " from " << p->buffer_extents << dendl;
- C_AioRequest *req_comp = new C_AioRequest(cct, c);
- AioObjectRequest *req;
-
- if (p->length == ictx->layout.fl_object_size) {
- req = new AioObjectRemove(ictx, p->oid.name, p->objectno, snapc,
- req_comp);
- } else if (p->offset + p->length == ictx->layout.fl_object_size) {
- req = new AioObjectTruncate(ictx, p->oid.name, p->objectno, p->offset,
- snapc, req_comp);
- } else {
- if(ictx->cct->_conf->rbd_skip_partial_discard) {
- delete req_comp;
- continue;
- }
- req = new AioObjectZero(ictx, p->oid.name, p->objectno, p->offset,
- p->length, snapc, req_comp);
- }
-
- req->send();
- }
-
- if (ictx->object_cacher) {
- Mutex::Locker l(ictx->cache_lock);
- ictx->object_cacher->discard_set(ictx->object_set, extents);
- }
-
- c->finish_adding_requests(ictx->cct);
- c->put();
-
- ictx->perfcounter->inc(l_librbd_discard);
- ictx->perfcounter->inc(l_librbd_discard_bytes, clip_len);
- }
-
void rbd_req_cb(completion_t cb, void *arg)
{
AioObjectRequest *req = reinterpret_cast<AioObjectRequest *>(arg);
req->complete(comp->get_return_value());
}
- void aio_read(ImageCtx *ictx, uint64_t off, size_t len,
- char *buf, bufferlist *bl,
- AioCompletion *c, int op_flags)
- {
- vector<pair<uint64_t,uint64_t> > image_extents(1);
- image_extents[0] = make_pair(off, len);
- aio_read(ictx, image_extents, buf, bl, c, op_flags);
- }
-
struct C_RBD_Readahead : public Context {
ImageCtx *ictx;
object_t oid;
}
};
- static void readahead(ImageCtx *ictx,
- const vector<pair<uint64_t,uint64_t> >& image_extents)
+ void readahead(ImageCtx *ictx,
+ const vector<pair<uint64_t,uint64_t> >& image_extents)
{
uint64_t total_bytes = 0;
for (vector<pair<uint64_t,uint64_t> >::const_iterator p = image_extents.begin();
}
}
- void aio_read(ImageCtx *ictx, const vector<pair<uint64_t,uint64_t> >& image_extents,
- char *buf, bufferlist *pbl, AioCompletion *c, int op_flags)
- {
- CephContext *cct = ictx->cct;
- ldout(cct, 20) << "aio_read " << ictx << " completion " << c << " " << image_extents << dendl;
-
- c->get();
- int r = ictx_check(ictx);
- if (r < 0) {
- c->fail(cct, r);
- return;
- }
-
- RWLock::RLocker owner_locker(ictx->owner_lock);
-
- // readahead
- if (ictx->object_cacher && ictx->readahead_max_bytes > 0 &&
- !(op_flags & LIBRADOS_OP_FLAG_FADVISE_RANDOM)) {
- readahead(ictx, image_extents);
- }
-
- snap_t snap_id;
- map<object_t,vector<ObjectExtent> > object_extents;
- uint64_t buffer_ofs = 0;
- {
- // prevent image size from changing between computing clip and recording
- // pending async operation
- RWLock::RLocker snap_locker(ictx->snap_lock);
- snap_id = ictx->snap_id;
-
- // map
- for (vector<pair<uint64_t,uint64_t> >::const_iterator p =
- image_extents.begin();
- p != image_extents.end(); ++p) {
- uint64_t len = p->second;
- r = clip_io(ictx, p->first, &len);
- if (r < 0) {
- c->fail(cct, r);
- return;
- }
- if (len == 0) {
- continue;
- }
-
- Striper::file_to_extents(cct, ictx->format_string, &ictx->layout,
- p->first, len, 0, object_extents, buffer_ofs);
- buffer_ofs += len;
- }
- c->start_op(ictx, AIO_TYPE_READ);
- }
-
- c->read_buf = buf;
- c->read_buf_len = buffer_ofs;
- c->read_bl = pbl;
-
- for (map<object_t,vector<ObjectExtent> >::iterator p = object_extents.begin();
- p != object_extents.end(); ++p) {
- for (vector<ObjectExtent>::iterator q = p->second.begin();
- q != p->second.end(); ++q) {
- ldout(ictx->cct, 20) << " oid " << q->oid << " " << q->offset << "~"
- << q->length << " from " << q->buffer_extents
- << dendl;
-
- C_AioRead *req_comp = new C_AioRead(ictx->cct, c);
- AioObjectRead *req = new AioObjectRead(ictx, q->oid.name, q->objectno,
- q->offset, q->length,
- q->buffer_extents, snap_id, true,
- req_comp, op_flags);
- req_comp->set_req(req);
-
- if (ictx->object_cacher) {
- C_CacheRead *cache_comp = new C_CacheRead(ictx, req);
- ictx->aio_read_from_cache(q->oid, q->objectno, &req->data(),
- q->length, q->offset,
- cache_comp, op_flags);
- } else {
- req->send();
- }
- }
- }
-
- c->finish_adding_requests(cct);
- c->put();
-
- ictx->perfcounter->inc(l_librbd_rd);
- ictx->perfcounter->inc(l_librbd_rd_bytes, buffer_ofs);
- }
-
AioCompletion *aio_create_completion() {
AioCompletion *c = new AioCompletion();
return c;
uint64_t len, bool include_parent, bool whole_object,
int (*cb)(uint64_t, size_t, int, void *),
void *arg);
+ void readahead(ImageCtx *ictx,
+ const vector<pair<uint64_t,uint64_t> >& image_extents);
ssize_t read(ImageCtx *ictx, uint64_t off, size_t len, char *buf, int op_flags);
ssize_t read(ImageCtx *ictx, const vector<pair<uint64_t,uint64_t> >& image_extents,
char *buf, bufferlist *pbl, int op_flags);
int async_rebuild_object_map(ImageCtx *ictx, Context *ctx,
ProgressContext &prog_ctx);
- void aio_write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf,
- AioCompletion *c, int op_flags);
- void aio_discard(ImageCtx *ictx, uint64_t off, uint64_t len, AioCompletion *c);
- void aio_read(ImageCtx *ictx, uint64_t off, size_t len,
- char *buf, bufferlist *pbl, AioCompletion *c, int op_flags);
- void aio_read(ImageCtx *ictx, const vector<pair<uint64_t,uint64_t> >& image_extents,
- char *buf, bufferlist *pbl, AioCompletion *c, int op_flags);
- void aio_flush(ImageCtx *ictx, AioCompletion *c);
int flush(ImageCtx *ictx);
int _flush(ImageCtx *ictx);
int invalidate_cache(ImageCtx *ictx);