*
*/
+#include <errno.h>
+#include <inttypes.h>
+
#include "common/Cond.h"
#include "common/dout.h"
#include "common/errno.h"
#include "common/snap_types.h"
+#include "include/Context.h"
#include "include/rbd/librbd.hpp"
+#include "osdc/ObjectCacher.h"
-#include <errno.h>
-#include <inttypes.h>
+#include "librbd/LibrbdWriteback.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
struct AioCompletion;
- struct AioBlockCompletion {
+ struct AioBlockCompletion : Context {
CephContext *cct;
struct AioCompletion *completion;
uint64_t ofs;
uint64_t _ofs, size_t _len, char *_buf)
: cct(cct_), completion(aio_completion),
ofs(_ofs), len(_len), buf(_buf) {}
- void complete(ssize_t r);
+ virtual ~AioBlockCompletion() {}
+ virtual void finish(int r);
};
struct ImageCtx {
bool needs_refresh;
Mutex refresh_lock;
Mutex lock; // protects access to snapshot and header information
+ Mutex cache_lock; // used as client_lock for the ObjectCacher
+ ObjectCacher *object_cacher;
+ LibrbdWriteback *writeback_handler;
+ ObjectCacher::ObjectSet *object_set;
ImageCtx(std::string imgname, IoCtx& p)
: cct((CephContext*)p.cct()), snapid(CEPH_NOSNAP),
needs_refresh(true),
refresh_lock("librbd::ImageCtx::refresh_lock"),
lock("librbd::ImageCtx::lock"),
+ cache_lock("librbd::ImageCtx::cache_lock"),
+ object_cacher(NULL), writeback_handler(NULL), object_set(NULL)
{
md_ctx.dup(p);
data_ctx.dup(p);
+ if (cct->_conf->rbd_cache_enabled) {
+ Mutex::Locker l(cache_lock);
+ ldout(cct, 20) << "enabling writeback caching..." << dendl;
+ writeback_handler = new LibrbdWriteback(data_ctx, cache_lock);
+ object_cacher = new ObjectCacher(cct, *writeback_handler, cache_lock,
+ NULL, NULL);
+ object_set = new ObjectCacher::ObjectSet(NULL, data_ctx.get_id(), 0);
+ object_cacher->start();
+ }
}
~ImageCtx() {
+ if (object_cacher) {
+ delete object_cacher;
+ object_cacher = NULL;
+ }
+ if (writeback_handler) {
+ delete writeback_handler;
+ writeback_handler = NULL;
+ }
+ if (object_set) {
+ delete object_set;
+ object_set = NULL;
+ }
}
int snap_set(std::string snap_name)
}
}
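+
+ // Read a single object extent through the ObjectCacher. A positive
+ // return from readx() means the request was satisfied entirely from
+ // cache and onfinish was not queued, so it is completed here;
+ // otherwise the cacher completes onfinish when the data arrives.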
+ void aio_read_from_cache(object_t o, bufferlist *bl, size_t len,
+ uint64_t off, Context *onfinish) {
+ lock.Lock();
+ ObjectCacher::OSDRead *rd = object_cacher->prepare_read(snapid, bl, 0);
+ lock.Unlock();
+ ObjectExtent extent(o, off, len);
+ extent.oloc.pool = data_ctx.get_id();
+ extent.buffer_extents[0] = len;
+ rd->extents.push_back(extent);
+ cache_lock.Lock();
+ int r = object_cacher->readx(rd, object_set, onfinish);
+ cache_lock.Unlock();
+ if (r > 0)
+ onfinish->complete(r);
+ }
+
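+ // Buffer a single object extent write in the ObjectCacher.
+ // wait_for_write() may block until the cache has room for the dirty
+ // data; writeback to RADOS then happens asynchronously through
+ // LibrbdWriteback.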
+ void write_to_cache(object_t o, bufferlist& bl, size_t len, uint64_t off) {
+ lock.Lock();
+ ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(snapc, bl,
+ utime_t(), 0);
+ lock.Unlock();
+ ObjectExtent extent(o, off, len);
+ extent.oloc.pool = data_ctx.get_id();
+ extent.buffer_extents[0] = len;
+ wr->extents.push_back(extent);
+ {
+ Mutex::Locker l(cache_lock);
+ object_cacher->wait_for_write(len, cache_lock);
+ object_cacher->writex(wr, object_set);
+ }
+ }
+
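+ // Synchronous wrapper around aio_read_from_cache(): waits on a
+ // condition variable until the read completes and returns its result.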
+ int read_from_cache(object_t o, bufferlist *bl, size_t len, uint64_t off) {
+ int r;
+ Mutex mylock("librbd::ImageCtx::read_from_cache");
+ Cond cond;
+ bool done = false;
+ Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
+ aio_read_from_cache(o, bl, len, off, onfinish);
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+ return r;
+ }
+
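+ // Block until all dirty data for this image has been written back to
+ // RADOS; commit_set() returns true if everything was already clean.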
+ void flush_cache() {
+ int r;
+ Mutex mylock("librbd::ImageCtx::flush_cache");
+ Cond cond;
+ bool done = false;
+ Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
+ cache_lock.Lock();
+ bool already_flushed = object_cacher->commit_set(object_set, onfinish);
+ cache_lock.Unlock();
+ if (!already_flushed) {
+ mylock.Lock();
+ while (!done) {
+ ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
+ cond.Wait(mylock);
+ }
+ mylock.Unlock();
+ ldout(cct, 20) << "finished flushing cache" << dendl;
+ }
+ }
+
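+ // Flush and drop all cached data, then stop the ObjectCacher's
+ // flusher thread.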
+ void shutdown_cache() {
+ lock.Lock();
+ invalidate_cache();
+ lock.Unlock();
+ object_cacher->stop();
+ }
+
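+ // Drop clean buffers, flush dirty ones, and verify nothing remains in
+ // the cache. The caller must hold lock.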
+ void invalidate_cache() {
+ assert(lock.is_locked());
+ if (!object_cacher)
+ return;
+ cache_lock.Lock();
+ object_cacher->release_set(object_set);
+ cache_lock.Unlock();
+ flush_cache();
+ cache_lock.Lock();
+ bool unclean = object_cacher->release_set(object_set);
+ cache_lock.Unlock();
+ assert(!unclean);
+ }
};
class WatchCtx : public librados::WatchCtx {
return r;
Mutex::Locker l(ictx->lock);
+ if (size < ictx->header.image_size && ictx->object_cacher) {
+ // need to invalidate since we're deleting objects, and
+ // ObjectCacher doesn't track non-existent objects
+ ictx->invalidate_cache();
+ }
resize_helper(ictx, size, prog_ctx);
ldout(cct, 2) << "done." << dendl;
return -ENOENT;
}
+ // need to flush any pending writes before resizing and rolling back -
+ // writes might create new snapshots. Rolling back will replace
+ // the current version, so we have to invalidate that too.
+ ictx->invalidate_cache();
+
uint64_t new_size = ictx->get_image_size();
ictx->get_snap_size(snap_name, &new_size);
ldout(cct, 2) << "resizing to snapshot size..." << dendl;
CephContext *cct = (CephContext *)io_ctx.cct();
string sn = snap_name ? snap_name : "NULL";
ldout(cct, 20) << "open_image " << &io_ctx << " ictx = " << ictx
- << " name = " << name << " snap_name = " << (snap_name ? snap_name : "NULL") << dendl;
+ << " name = " << name << " snap_name = "
+ << (snap_name ? snap_name : "NULL") << dendl;
ictx->lock.Lock();
int r = ictx_refresh(ictx, snap_name);
void close_image(ImageCtx *ictx)
{
ldout(ictx->cct, 20) << "close_image " << ictx << dendl;
- flush(ictx);
+ if (ictx->object_cacher)
+ ictx->shutdown_cache(); // implicitly flushes
+ else
+ flush(ictx);
ictx->lock.Lock();
ictx->wctx->invalidate();
ictx->md_ctx.unwatch(ictx->md_oid(), ictx->wctx->cookie);
ictx->lock.Unlock();
uint64_t read_len = min(block_size - block_ofs, left);
- map<uint64_t, uint64_t> m;
- r = ictx->data_ctx.sparse_read(oid, m, bl, read_len, block_ofs);
- if (r < 0 && r == -ENOENT)
- r = 0;
- if (r < 0) {
- return r;
- }
+ if (ictx->object_cacher) {
+ r = ictx->read_from_cache(oid, &bl, read_len, block_ofs);
+ if (r < 0 && r != -ENOENT)
+ return r;
- r = handle_sparse_read(ictx->cct, bl, block_ofs, m, total_read, read_len, cb, arg);
+ if (r == -ENOENT)
+ r = cb(total_read, read_len, NULL, arg);
+ else
+ r = cb(total_read, read_len, bl.c_str(), arg);
+ } else {
+ map<uint64_t, uint64_t> m;
+ r = ictx->data_ctx.sparse_read(oid, m, bl, read_len, block_ofs);
+ if (r < 0 && r == -ENOENT)
+ r = 0;
+ if (r < 0) {
+ return r;
+ }
+
+ r = handle_sparse_read(ictx->cct, bl, block_ofs, m, total_read, read_len, cb, arg);
+ }
if (r < 0) {
return r;
}
ictx->lock.Unlock();
uint64_t write_len = min(block_size - block_ofs, left);
bl.append(buf + total_write, write_len);
- r = ictx->data_ctx.write(oid, bl, write_len, block_ofs);
- if (r < 0)
- return r;
- if ((uint64_t)r != write_len)
- return -EIO;
+ if (ictx->object_cacher) {
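+ // may block while the cache makes room for the dirty data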
+ ictx->write_to_cache(oid, bl, write_len, block_ofs);
+ } else {
+ r = ictx->data_ctx.write(oid, bl, write_len, block_ofs);
+ if (r < 0)
+ return r;
+ if ((uint64_t)r != write_len)
+ return -EIO;
+ }
total_write += write_len;
left -= write_len;
}
return buf_len;
}
-void AioBlockCompletion::complete(ssize_t r)
+void AioBlockCompletion::finish(int r)
{
- ldout(cct, 10) << "AioBlockCompletion::complete()" << dendl;
+ ldout(cct, 10) << "AioBlockCompletion::finish()" << dendl;
if ((r >= 0 || r == -ENOENT) && buf) { // this was a sparse_read operation
ldout(cct, 10) << "ofs=" << ofs << " len=" << len << dendl;
r = handle_sparse_read(cct, data_bl, ofs, m, 0, len, simple_read_cb, buf);
void rados_cb(rados_completion_t c, void *arg)
{
AioBlockCompletion *block_completion = (AioBlockCompletion *)arg;
- block_completion->complete(rados_aio_get_return_value(c));
+ block_completion->finish(rados_aio_get_return_value(c));
delete block_completion;
}
return r;
// flush any outstanding writes
- r = ictx->data_ctx.aio_flush();
+ if (ictx->object_cacher) {
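+ // with caching enabled the flush is synchronous: flush_cache()
+ // returns only after all dirty data has been written back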
+ ictx->flush_cache();
+ r = 0;
+ } else {
+ r = ictx->data_ctx.aio_flush();
+ }
if (r)
ldout(cct, 10) << "aio_flush " << ictx << " r = " << r << dendl;
c->get();
for (uint64_t i = start_block; i <= end_block; i++) {
- AioBlockCompletion *block_completion = new AioBlockCompletion(cct, c, off, len, NULL);
- c->add_block_completion(block_completion);
-
ictx->lock.Lock();
string oid = get_block_oid(ictx->header, i);
uint64_t block_ofs = get_block_ofs(ictx->header, off + total_write);
ictx->lock.Unlock();
- librados::AioCompletion *rados_completion =
- Rados::aio_create_completion(block_completion, NULL, rados_cb);
uint64_t write_len = min(block_size - block_ofs, left);
bufferlist bl;
bl.append(buf + total_write, write_len);
- r = ictx->data_ctx.aio_write(oid, rados_completion, bl, write_len, block_ofs);
- rados_completion->release();
- if (r < 0)
- goto done;
+ if (ictx->object_cacher) {
+ // may block
+ ictx->write_to_cache(oid, bl, write_len, block_ofs);
+ } else {
+ AioBlockCompletion *block_completion = new AioBlockCompletion(cct, c, off, len, NULL);
+ c->add_block_completion(block_completion);
+ librados::AioCompletion *rados_completion =
+ Rados::aio_create_completion(block_completion, NULL, rados_cb);
+ r = ictx->data_ctx.aio_write(oid, rados_completion, bl, write_len, block_ofs);
+ rados_completion->release();
+ if (r < 0)
+ goto done;
+ }
total_write += write_len;
left -= write_len;
}
- r = 0;
done:
c->finish_adding_completions();
c->put();
void rados_aio_sparse_read_cb(rados_completion_t c, void *arg)
{
AioBlockCompletion *block_completion = (AioBlockCompletion *)arg;
- block_completion->complete(rados_aio_get_return_value(c));
+ block_completion->finish(rados_aio_get_return_value(c));
delete block_completion;
}
new AioBlockCompletion(ictx->cct, c, block_ofs, read_len, buf + total_read);
c->add_block_completion(block_completion);
- librados::AioCompletion *rados_completion =
- Rados::aio_create_completion(block_completion, rados_aio_sparse_read_cb, NULL);
- r = ictx->data_ctx.aio_sparse_read(oid, rados_completion,
- &block_completion->m, &block_completion->data_bl,
- read_len, block_ofs);
- rados_completion->release();
- if (r < 0 && r == -ENOENT)
- r = 0;
- if (r < 0) {
- ret = r;
- goto done;
+ if (ictx->object_cacher) {
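+ // the cache returns the full extent, so record it as entirely
+ // present in the sparse-read map before issuing the read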
+ block_completion->m[block_ofs] = read_len;
+ ictx->aio_read_from_cache(oid, &block_completion->data_bl,
+ read_len, block_ofs, block_completion);
+ } else {
+ librados::AioCompletion *rados_completion =
+ Rados::aio_create_completion(block_completion, rados_aio_sparse_read_cb, NULL);
+ r = ictx->data_ctx.aio_sparse_read(oid, rados_completion,
+ &block_completion->m, &block_completion->data_bl,
+ read_len, block_ofs);
+ rados_completion->release();
+ if (r < 0 && r == -ENOENT)
+ r = 0;
+ if (r < 0) {
+ ret = r;
+ goto done;
+ }
}
+
total_read += read_len;
left -= read_len;
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_context.h"
+#include "common/dout.h"
+#include "common/Mutex.h"
+#include "include/rados/librados.h"
+
+#include "LibrbdWriteback.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbdwriteback: "
+
+// If we change the librados api to use an overrideable class for callbacks
+// (like it does with watch/notify) this will be much nicer
+struct CallbackArgs {
+ CephContext *cct;
+ Context *ctx;
+ Mutex *lock;
+ CallbackArgs(CephContext *cct, Context *c, Mutex *l) :
+ cct(cct), ctx(c), lock(l) {}
+};
+
+static void librbd_writeback_librados_aio_cb(rados_completion_t c, void *arg)
+{
+ CallbackArgs *args = reinterpret_cast<CallbackArgs *>(arg);
+ ldout(args->cct, 20) << "aio_cb completing " << dendl;
+ {
+ Mutex::Locker l(*args->lock);
+ args->ctx->complete(rados_aio_get_return_value(c));
+ }
+ rados_aio_release(c);
+ ldout(args->cct, 20) << "aio_cb finished" << dendl;
+ delete args;
+}
+
+LibrbdWriteback::LibrbdWriteback(const librados::IoCtx& io, Mutex& lock)
+ : m_tid(0), m_lock(lock)
+{
+ m_ioctx.dup(io);
+}
+
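+// Read an extent of an object directly from RADOS on behalf of the
+// ObjectCacher (cache-miss path); onfinish is completed from the
+// librados callback.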
+tid_t LibrbdWriteback::read(const object_t& oid,
+ const object_locator_t& oloc,
+ uint64_t off, uint64_t len, snapid_t snapid,
+ bufferlist *pbl, uint64_t trunc_size,
+ __u32 trunc_seq, Context *onfinish)
+{
+ CallbackArgs *args = new CallbackArgs((CephContext *)m_ioctx.cct(),
+ onfinish, &m_lock);
+ librados::AioCompletion *rados_cb =
+ librados::Rados::aio_create_completion(args, librbd_writeback_librados_aio_cb, NULL);
+
+ m_ioctx.snap_set_read(snapid.val);
+ m_ioctx.aio_read(oid.name, rados_cb, pbl, len, off);
+ return ++m_tid;
+}
+
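+// Write back a dirty extent to RADOS on behalf of the ObjectCacher;
+// oncommit fires from the safe (on-disk) librados callback.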
+tid_t LibrbdWriteback::write(const object_t& oid,
+ const object_locator_t& oloc,
+ uint64_t off, uint64_t len,
+ const SnapContext& snapc,
+ const bufferlist &bl, utime_t mtime,
+ uint64_t trunc_size, __u32 trunc_seq,
+ Context *oncommit)
+{
+ CallbackArgs *args = new CallbackArgs((CephContext *)m_ioctx.cct(),
+ oncommit, &m_lock);
+ librados::AioCompletion *rados_cb =
+ librados::Rados::aio_create_completion(args, NULL, librbd_writeback_librados_aio_cb);
+ // TODO: find a way to make this less stupid
+ vector<librados::snap_t> snaps;
+ for (vector<snapid_t>::const_iterator it = snapc.snaps.begin();
+ it != snapc.snaps.end(); ++it) {
+ snaps.push_back(it->val);
+ }
+
+ m_ioctx.snap_set_read(CEPH_NOSNAP);
+ m_ioctx.selfmanaged_snap_set_write_ctx(snapc.seq.val, snaps);
+ m_ioctx.aio_write(oid.name, rados_cb, bl, len, off);
+ return ++m_tid;
+}