]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: allow writeback caching
authorJosh Durgin <josh.durgin@dreamhost.com>
Mon, 9 Apr 2012 20:06:40 +0000 (13:06 -0700)
committerSage Weil <sage.weil@dreamhost.com>
Sat, 14 Apr 2012 03:46:34 +0000 (20:46 -0700)
This uses the existing infrastructure of ObjectCacher for
buffer management and expiry.

Signed-off-by: Josh Durgin <josh.durgin@dreamhost.com>
src/Makefile.am
src/common/config_opts.h
src/librbd.cc
src/librbd/LibrbdWriteback.cc [new file with mode: 0644]
src/librbd/LibrbdWriteback.h [new file with mode: 0644]

index 04a0c6f01afaa83429c8a801b0f0d7c6c81787a7..e9e0d708938251e9b33533c26a8d48aa878474a7 100644 (file)
@@ -319,10 +319,13 @@ lib_LTLIBRARIES += librgw.la
 endif
 
 # librbd
-librbd_la_SOURCES = librbd.cc
+librbd_la_SOURCES = \
+       librbd.cc \
+       librbd/LibrbdWriteback.cc \
+       osdc/ObjectCacher.cc
 librbd_la_CFLAGS = ${AM_CFLAGS}
 librbd_la_CXXFLAGS = ${AM_CXXFLAGS}
-librbd_la_LIBADD = librados.la 
+librbd_la_LIBADD = librados.la
 librbd_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 \
        -export-symbols-regex '^rbd_.*' $(PTHREAD_LIBS) $(EXTRALIBS) 
 lib_LTLIBRARIES += librbd.la
@@ -1321,6 +1324,7 @@ noinst_HEADERS = \
        librados/IoCtxImpl.h\
        librados/PoolAsyncCompletionImpl.h\
        librados/RadosClient.h\
+       librbd/LibrbdWriteback.h\
        logrotate.conf\
        json_spirit/json_spirit.h\
        json_spirit/json_spirit_error_position.h\
index 4824710d4f46d235b68a7f7dfcc642ae360798c9..889b8391d629fb6407d1e0fda709a621b9514380 100644 (file)
@@ -363,6 +363,7 @@ OPTION(journal_queue_max_bytes, OPT_INT, 100 << 20)
 OPTION(journal_align_min_size, OPT_INT, 64 << 10)  // align data payloads >= this.
 OPTION(journal_replay_from, OPT_INT, 0)
 OPTION(journal_zero_on_create, OPT_BOOL, false)
+OPTION(rbd_cache_enabled, OPT_BOOL, false) // whether to enable writeback caching
 OPTION(rgw_cache_enabled, OPT_BOOL, true)   // rgw cache enabled
 OPTION(rgw_cache_lru_size, OPT_INT, 10000)   // num of entries in rgw cache
 OPTION(rgw_socket_path, OPT_STR, "")   // path to unix domain socket, if not specified, rgw will not run as external fcgi
index 7e3264251ad6b7304a46b44ecdcf1cf9bdd05b92..5da704e1cd5d344ec14f2ad6cc0ffef740e8be6c 100644 (file)
  *
  */
 
+#include <errno.h>
+#include <inttypes.h>
+
 #include "common/Cond.h"
 #include "common/dout.h"
 #include "common/errno.h"
 #include "common/snap_types.h"
+#include "include/Context.h"
 #include "include/rbd/librbd.hpp"
+#include "osdc/ObjectCacher.h"
 
-#include <errno.h>
-#include <inttypes.h>
+#include "librbd/LibrbdWriteback.h"
 
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
@@ -46,7 +50,7 @@ namespace librbd {
 
   struct AioCompletion;
 
-  struct AioBlockCompletion {
+  struct AioBlockCompletion : Context {
     CephContext *cct;
     struct AioCompletion *completion;
     uint64_t ofs;
@@ -59,7 +63,8 @@ namespace librbd {
                       uint64_t _ofs, size_t _len, char *_buf)
       : cct(cct_), completion(aio_completion),
        ofs(_ofs), len(_len), buf(_buf) {}
-    void complete(ssize_t r);
+    virtual ~AioBlockCompletion() {}
+    virtual void finish(int r);
   };
 
   struct ImageCtx {
@@ -76,7 +81,11 @@ namespace librbd {
     bool needs_refresh;
     Mutex refresh_lock;
     Mutex lock; // protects access to snapshot and header information
+    Mutex cache_lock; // used as client_lock for the ObjectCacher
 
+    ObjectCacher *object_cacher;
+    LibrbdWriteback *writeback_handler;
+    ObjectCacher::ObjectSet *object_set;
 
     ImageCtx(std::string imgname, IoCtx& p)
       : cct((CephContext*)p.cct()), snapid(CEPH_NOSNAP),
@@ -84,12 +93,35 @@ namespace librbd {
        needs_refresh(true),
        refresh_lock("librbd::ImageCtx::refresh_lock"),
        lock("librbd::ImageCtx::lock"),
+       cache_lock("librbd::ImageCtx::cache_lock"),
+       object_cacher(NULL), writeback_handler(NULL), object_set(NULL)
     {
       md_ctx.dup(p);
       data_ctx.dup(p);
+      if (cct->_conf->rbd_cache_enabled) {
+       Mutex::Locker l(cache_lock);
+       ldout(cct, 20) << "enabling writback caching..." << dendl;
+       writeback_handler = new LibrbdWriteback(data_ctx, cache_lock);
+       object_cacher = new ObjectCacher(cct, *writeback_handler, cache_lock,
+                                        NULL, NULL);
+       object_set = new ObjectCacher::ObjectSet(NULL, data_ctx.get_id(), 0);
+       object_cacher->start();
+      }
     }
 
     ~ImageCtx() {
+      if (object_cacher) {
+       delete object_cacher;
+       object_cacher = NULL;
+      }
+      if (writeback_handler) {
+       delete writeback_handler;
+       writeback_handler = NULL;
+      }
+      if (object_set) {
+       delete object_set;
+       object_set = NULL;
+      }
     }
 
     int snap_set(std::string snap_name)
@@ -152,6 +184,92 @@ namespace librbd {
       }
     }
 
+    void aio_read_from_cache(object_t o, bufferlist *bl, size_t len,
+                            uint64_t off, Context *onfinish) {
+      lock.Lock();
+      ObjectCacher::OSDRead *rd = object_cacher->prepare_read(snapid, bl, 0);
+      lock.Unlock();
+      ObjectExtent extent(o, off, len);
+      extent.oloc.pool = data_ctx.get_id();
+      extent.buffer_extents[0] = len;
+      rd->extents.push_back(extent);
+      cache_lock.Lock();
+      int r = object_cacher->readx(rd, object_set, onfinish);
+      cache_lock.Unlock();
+      if (r > 0)
+       onfinish->complete(r);
+    }
+
+    void write_to_cache(object_t o, bufferlist& bl, size_t len, uint64_t off) {
+      lock.Lock();
+      ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(snapc, bl,
+                                                               utime_t(), 0);
+      lock.Unlock();
+      ObjectExtent extent(o, off, len);
+      extent.oloc.pool = data_ctx.get_id();
+      extent.buffer_extents[0] = len;
+      wr->extents.push_back(extent);
+      {
+       Mutex::Locker l(cache_lock);
+       object_cacher->wait_for_write(len, cache_lock);
+       object_cacher->writex(wr, object_set);
+      }
+    }
+
+    int read_from_cache(object_t o, bufferlist *bl, size_t len, uint64_t off) {
+      int r;
+      Mutex mylock("librbd::ImageCtx::read_from_cache");
+      Cond cond;
+      bool done;
+      Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
+      aio_read_from_cache(o, bl, len, off, onfinish);
+      mylock.Lock();
+      while (!done)
+       cond.Wait(mylock);
+      mylock.Unlock();
+      return r;
+    }
+
+    void flush_cache() {
+      int r;
+      Mutex mylock("librbd::ImageCtx::flush_cache");
+      Cond cond;
+      bool done;
+      Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
+      cache_lock.Lock();
+      bool already_flushed = object_cacher->commit_set(object_set, onfinish);
+      cache_lock.Unlock();
+      if (!already_flushed) {
+       mylock.Lock();
+       while (!done) {
+         ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
+         cond.Wait(mylock);
+       }
+       mylock.Unlock();
+       ldout(cct, 20) << "finished flushing cache" << dendl;
+      }
+    }
+
+    void shutdown_cache() {
+      lock.Lock();
+      invalidate_cache();
+      lock.Unlock();
+      object_cacher->stop();
+    }
+
+    void invalidate_cache() {
+      assert(lock.is_locked());
+      if (!object_cacher)
+       return;
+      cache_lock.Lock();
+      object_cacher->release_set(object_set);
+      cache_lock.Unlock();
+      flush_cache();
+      cache_lock.Lock();
+      bool unclean = object_cacher->release_set(object_set);
+      cache_lock.Unlock();
+      assert(!unclean);
+    }
   };
 
   class WatchCtx : public librados::WatchCtx {
@@ -874,6 +992,11 @@ int resize(ImageCtx *ictx, uint64_t size, ProgressContext& prog_ctx)
     return r;
 
   Mutex::Locker l(ictx->lock);
+  if (size < ictx->header.image_size && ictx->object_cacher) {
+    // need to invalidate since we're deleting objects, and
+    // ObjectCacher doesn't track non-existent objects
+    ictx->invalidate_cache();
+  }
   resize_helper(ictx, size, prog_ctx);
 
   ldout(cct, 2) << "done." << dendl;
@@ -1085,6 +1208,11 @@ int snap_rollback(ImageCtx *ictx, const char *snap_name, ProgressContext& prog_c
     return -ENOENT;
   }
 
+  // need to flush any pending writes before resizing and rolling back -
+  // writes might create new snapshots. Rolling back will replace
+  // the current version, so we have to invalidate that too.
+  ictx->invalidate_cache();
+
   uint64_t new_size = ictx->get_image_size();
   ictx->get_snap_size(snap_name, &new_size);
   ldout(cct, 2) << "resizing to snapshot size..." << dendl;
@@ -1195,7 +1323,8 @@ int open_image(IoCtx& io_ctx, ImageCtx *ictx, const char *name, const char *snap
   CephContext *cct = (CephContext *)io_ctx.cct();
   string sn = snap_name ? snap_name : "NULL";
   ldout(cct, 20) << "open_image " << &io_ctx << " ictx =  " << ictx
-          << " name =  " << name << " snap_name = " << (snap_name ? snap_name : "NULL") << dendl;
+                << " name =  " << name << " snap_name = "
+                << (snap_name ? snap_name : "NULL") << dendl;
 
   ictx->lock.Lock();
   int r = ictx_refresh(ictx, snap_name);
@@ -1215,7 +1344,10 @@ int open_image(IoCtx& io_ctx, ImageCtx *ictx, const char *name, const char *snap
 void close_image(ImageCtx *ictx)
 {
   ldout(ictx->cct, 20) << "close_image " << ictx << dendl;
-  flush(ictx);
+  if (ictx->object_cacher)
+    ictx->shutdown_cache(); // implicitly flushes
+  else
+    flush(ictx);
   ictx->lock.Lock();
   ictx->wctx->invalidate();
   ictx->md_ctx.unwatch(ictx->md_oid(), ictx->wctx->cookie);
@@ -1255,15 +1387,26 @@ int64_t read_iterate(ImageCtx *ictx, uint64_t off, size_t len,
     ictx->lock.Unlock();
     uint64_t read_len = min(block_size - block_ofs, left);
 
-    map<uint64_t, uint64_t> m;
-    r = ictx->data_ctx.sparse_read(oid, m, bl, read_len, block_ofs);
-    if (r < 0 && r == -ENOENT)
-      r = 0;
-    if (r < 0) {
-      return r;
-    }
+    if (ictx->object_cacher) {
+      r = ictx->read_from_cache(oid, &bl, read_len, block_ofs);
+      if (r < 0 && r != -ENOENT)
+       return r;
 
-    r = handle_sparse_read(ictx->cct, bl, block_ofs, m, total_read, read_len, cb, arg);
+      if (r == -ENOENT)
+       r = cb(total_read, read_len, NULL, arg);
+      else
+       r = cb(total_read, read_len, bl.c_str(), arg);
+    } else {
+      map<uint64_t, uint64_t> m;
+      r = ictx->data_ctx.sparse_read(oid, m, bl, read_len, block_ofs);
+      if (r < 0 && r == -ENOENT)
+       r = 0;
+      if (r < 0) {
+       return r;
+      }
+
+      r = handle_sparse_read(ictx->cct, bl, block_ofs, m, total_read, read_len, cb, arg);
+    }
     if (r < 0) {
       return r;
     }
@@ -1324,11 +1467,15 @@ ssize_t write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf)
     ictx->lock.Unlock();
     uint64_t write_len = min(block_size - block_ofs, left);
     bl.append(buf + total_write, write_len);
-    r = ictx->data_ctx.write(oid, bl, write_len, block_ofs);
-    if (r < 0)
-      return r;
-    if ((uint64_t)r != write_len)
-      return -EIO;
+    if (ictx->object_cacher) {
+      ictx->write_to_cache(oid, bl, write_len, block_ofs);
+    } else {
+      r = ictx->data_ctx.write(oid, bl, write_len, block_ofs);
+      if (r < 0)
+       return r;
+      if ((uint64_t)r != write_len)
+       return -EIO;
+    }
     total_write += write_len;
     left -= write_len;
   }
@@ -1396,9 +1543,9 @@ ssize_t handle_sparse_read(CephContext *cct,
   return buf_len;
 }
 
-void AioBlockCompletion::complete(ssize_t r)
+void AioBlockCompletion::finish(int r)
 {
-  ldout(cct, 10) << "AioBlockCompletion::complete()" << dendl;
+  ldout(cct, 10) << "AioBlockCompletion::finish()" << dendl;
   if ((r >= 0 || r == -ENOENT) && buf) { // this was a sparse_read operation
     ldout(cct, 10) << "ofs=" << ofs << " len=" << len << dendl;
     r = handle_sparse_read(cct, data_bl, ofs, m, 0, len, simple_read_cb, buf);
@@ -1429,7 +1576,7 @@ void AioCompletion::complete_block(AioBlockCompletion *block_completion, ssize_t
 void rados_cb(rados_completion_t c, void *arg)
 {
   AioBlockCompletion *block_completion = (AioBlockCompletion *)arg;
-  block_completion->complete(rados_aio_get_return_value(c));
+  block_completion->finish(rados_aio_get_return_value(c));
   delete block_completion;
 }
 
@@ -1454,7 +1601,12 @@ int flush(ImageCtx *ictx)
     return r;
 
   // flush any outstanding writes
-  r = ictx->data_ctx.aio_flush();
+  if (ictx->object_cacher) {
+    ictx->flush_cache();
+    r = 0;
+  } else {
+    r = ictx->data_ctx.aio_flush();
+  }
 
   if (r)
     ldout(cct, 10) << "aio_flush " << ictx << " r = " << r << dendl;
@@ -1489,27 +1641,30 @@ int aio_write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf,
 
   c->get();
   for (uint64_t i = start_block; i <= end_block; i++) {
-    AioBlockCompletion *block_completion = new AioBlockCompletion(cct, c, off, len, NULL);
-    c->add_block_completion(block_completion);
-
     ictx->lock.Lock();
     string oid = get_block_oid(ictx->header, i);
     uint64_t block_ofs = get_block_ofs(ictx->header, off + total_write);
     ictx->lock.Unlock();
-    librados::AioCompletion *rados_completion =
-      Rados::aio_create_completion(block_completion, NULL, rados_cb);
 
     uint64_t write_len = min(block_size - block_ofs, left);
     bufferlist bl;
     bl.append(buf + total_write, write_len);
-    r = ictx->data_ctx.aio_write(oid, rados_completion, bl, write_len, block_ofs);
-    rados_completion->release();
-    if (r < 0)
-      goto done;
+    if (ictx->object_cacher) {
+      // may block
+      ictx->write_to_cache(oid, bl, write_len, block_ofs);
+    } else {
+      AioBlockCompletion *block_completion = new AioBlockCompletion(cct, c, off, len, NULL);
+      c->add_block_completion(block_completion);
+      librados::AioCompletion *rados_completion =
+       Rados::aio_create_completion(block_completion, NULL, rados_cb);
+      r = ictx->data_ctx.aio_write(oid, rados_completion, bl, write_len, block_ofs);
+      rados_completion->release();
+      if (r < 0)
+       goto done;
+    }
     total_write += write_len;
     left -= write_len;
   }
-  r = 0;
 done:
   c->finish_adding_completions();
   c->put();
@@ -1521,7 +1676,7 @@ done:
 void rados_aio_sparse_read_cb(rados_completion_t c, void *arg)
 {
   AioBlockCompletion *block_completion = (AioBlockCompletion *)arg;
-  block_completion->complete(rados_aio_get_return_value(c));
+  block_completion->finish(rados_aio_get_return_value(c));
   delete block_completion;
 }
 
@@ -1564,18 +1719,25 @@ int aio_read(ImageCtx *ictx, uint64_t off, size_t len,
        new AioBlockCompletion(ictx->cct, c, block_ofs, read_len, buf + total_read);
     c->add_block_completion(block_completion);
 
-    librados::AioCompletion *rados_completion =
-      Rados::aio_create_completion(block_completion, rados_aio_sparse_read_cb, NULL);
-    r = ictx->data_ctx.aio_sparse_read(oid, rados_completion,
-                                      &block_completion->m, &block_completion->data_bl,
-                                      read_len, block_ofs);
-    rados_completion->release();
-    if (r < 0 && r == -ENOENT)
-      r = 0;
-    if (r < 0) {
-      ret = r;
-      goto done;
+    if (ictx->object_cacher) {
+      block_completion->m[block_ofs] = read_len;
+      ictx->aio_read_from_cache(oid, &block_completion->data_bl,
+                               read_len, block_ofs, block_completion);
+    } else {
+      librados::AioCompletion *rados_completion =
+       Rados::aio_create_completion(block_completion, rados_aio_sparse_read_cb, NULL);
+      r = ictx->data_ctx.aio_sparse_read(oid, rados_completion,
+                                        &block_completion->m, &block_completion->data_bl,
+                                        read_len, block_ofs);
+      rados_completion->release();
+      if (r < 0 && r == -ENOENT)
+       r = 0;
+      if (r < 0) {
+       ret = r;
+       goto done;
+      }
     }
+
     total_read += read_len;
     left -= read_len;
   }
diff --git a/src/librbd/LibrbdWriteback.cc b/src/librbd/LibrbdWriteback.cc
new file mode 100644 (file)
index 0000000..e7d8100
--- /dev/null
@@ -0,0 +1,83 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_context.h"
+#include "common/dout.h"
+#include "common/Mutex.h"
+#include "include/rados/librados.h"
+
+#include "LibrbdWriteback.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbdwriteback: "
+
+// If we change the librados api to use an overrideable class for callbacks
+// (like it does with watch/notify) this will be much nicer
+struct CallbackArgs {
+  CephContext *cct;
+  Context *ctx;
+  Mutex *lock;
+  CallbackArgs(CephContext *cct, Context *c, Mutex *l) :
+    cct(cct), ctx(c), lock(l) {}
+};
+
+static void librbd_writeback_librados_aio_cb(rados_completion_t c, void *arg)
+{
+  CallbackArgs *args = reinterpret_cast<CallbackArgs *>(arg);
+  ldout(args->cct, 20) << "aio_cb completing " << dendl;
+  {
+    Mutex::Locker l(*args->lock);
+    args->ctx->complete(rados_aio_get_return_value(c));
+  }
+  rados_aio_release(c);
+  ldout(args->cct, 20) << "aio_cb finished" << dendl;
+  delete args;
+}
+
+LibrbdWriteback::LibrbdWriteback(const librados::IoCtx& io, Mutex& lock)
+  : m_tid(0), m_lock(lock)
+{
+  m_ioctx.dup(io);
+}
+
+tid_t LibrbdWriteback::read(const object_t& oid,
+                           const object_locator_t& oloc,
+                           uint64_t off, uint64_t len, snapid_t snapid,
+                           bufferlist *pbl, uint64_t trunc_size,
+                           __u32 trunc_seq, Context *onfinish)
+{
+  CallbackArgs *args = new CallbackArgs((CephContext *)m_ioctx.cct(),
+                                       onfinish, &m_lock);
+  librados::AioCompletion *rados_cb =
+    librados::Rados::aio_create_completion(args, librbd_writeback_librados_aio_cb, NULL);
+
+  m_ioctx.snap_set_read(snapid.val);
+  m_ioctx.aio_read(oid.name, rados_cb, pbl, len, off);
+  return ++m_tid;
+}
+
+tid_t LibrbdWriteback::write(const object_t& oid,
+                            const object_locator_t& oloc,
+                            uint64_t off, uint64_t len,
+                            const SnapContext& snapc,
+                            const bufferlist &bl, utime_t mtime,
+                            uint64_t trunc_size, __u32 trunc_seq,
+                            Context *oncommit)
+{
+  CallbackArgs *args = new CallbackArgs((CephContext *)m_ioctx.cct(),
+                                       oncommit, &m_lock);
+  librados::AioCompletion *rados_cb =
+    librados::Rados::aio_create_completion(args, NULL, librbd_writeback_librados_aio_cb);
+  // TODO: find a way to make this less stupid
+  vector<librados::snap_t> snaps;
+  for (vector<snapid_t>::const_iterator it = snapc.snaps.begin();
+       it != snapc.snaps.end(); ++it) {
+    snaps.push_back(it->val);
+  }
+
+  m_ioctx.snap_set_read(CEPH_NOSNAP);
+  m_ioctx.selfmanaged_snap_set_write_ctx(snapc.seq.val, snaps);
+  m_ioctx.aio_write(oid.name, rados_cb, bl, len, off);
+  return ++m_tid;
+}
diff --git a/src/librbd/LibrbdWriteback.h b/src/librbd/LibrbdWriteback.h
new file mode 100644 (file)
index 0000000..e06a7d2
--- /dev/null
@@ -0,0 +1,35 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef CEPH_OSDC_LIBRBDWRITEBACKHANDLER_H
+#define CEPH_OSDC_LIBRBDWRITEBACKHANDLER_H
+
+#include "include/Context.h"
+#include "include/types.h"
+#include "include/rados/librados.hpp"
+#include "osd/osd_types.h"
+#include "osdc/WritebackHandler.h"
+
+class LibrbdWriteback : public WritebackHandler {
+ public:
+  LibrbdWriteback(const librados::IoCtx& io, Mutex& lock);
+  virtual ~LibrbdWriteback() {}
+
+  // Note that oloc, trunc_size, and trunc_seq are ignored
+  virtual tid_t read(const object_t& oid, const object_locator_t& oloc,
+                    uint64_t off, uint64_t len, snapid_t snapid,
+                    bufferlist *pbl, uint64_t trunc_size,  __u32 trunc_seq,
+                    Context *onfinish);
+
+  // Note that oloc, trunc_size, and trunc_seq are ignored
+  virtual tid_t write(const object_t& oid, const object_locator_t& oloc,
+                     uint64_t off, uint64_t len, const SnapContext& snapc,
+                     const bufferlist &bl, utime_t mtime, uint64_t trunc_size,
+                     __u32 trunc_seq, Context *oncommit);
+
+ private:
+  int m_tid;
+  Mutex& m_lock;
+  librados::IoCtx m_ioctx;
+};
+
+#endif