]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/ReplicatedPG: implement cache-flush, cache-try-flush
authorSage Weil <sage@inktank.com>
Fri, 25 Oct 2013 05:30:50 +0000 (22:30 -0700)
committerSage Weil <sage@inktank.com>
Fri, 20 Dec 2013 00:39:58 +0000 (16:39 -0800)
Implement a rados operation that will flush a dirty object in the cache
tier by writing it back to the base tier.

Signed-off-by: Sage Weil <sage@inktank.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/test/librados/tier.cc

index 9a154cb07e9a3238e40933dcbda6d7667c2c447c..091d97690b6d68bb2e71a271f2cee924bca2ab7e 100644 (file)
@@ -1056,7 +1056,8 @@ void ReplicatedPG::do_op(OpRequestRef op)
   }
 
   // io blocked on obc?
-  if (obc->is_blocked()) {
+  if (obc->is_blocked() &&
+      (m->get_flags() & CEPH_OSD_FLAG_FLUSH) == 0) {
     wait_for_blocked_object(obc->obs.oi.soid, op);
     return;
   }
@@ -1192,7 +1193,20 @@ void ReplicatedPG::do_op(OpRequestRef op)
   ctx->obc = obc;
   if (m->get_flags() & CEPH_OSD_FLAG_SKIPRWLOCKS) {
     dout(20) << __func__ << ": skipping rw locks" << dendl;
+  } else if (m->get_flags() & CEPH_OSD_FLAG_FLUSH) {
+    dout(20) << __func__ << ": part of flush, will ignore write lock" << dendl;
+
+    // verify there is in fact a flush in progress
+    // FIXME: we could make this a stronger test.
+    map<hobject_t,FlushOpRef>::iterator p = flush_ops.find(obc->obs.oi.soid);
+    if (p == flush_ops.end()) {
+      dout(10) << __func__ << " no flush in progress, aborting" << dendl;
+      close_op_ctx(ctx);
+      osd->reply_op_error(op, -EINVAL);
+      return;
+    }
   } else if (!get_rw_locks(ctx)) {
+    dout(20) << __func__ << " waiting for rw locks " << dendl;
     op->mark_delayed("waiting for rw locks");
     close_op_ctx(ctx);
     return;
@@ -2519,6 +2533,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       // non user-visible modifications
     case CEPH_OSD_OP_WATCH:
     case CEPH_OSD_OP_CACHE_EVICT:
+    case CEPH_OSD_OP_CACHE_FLUSH:
+    case CEPH_OSD_OP_CACHE_TRY_FLUSH:
     case CEPH_OSD_OP_UNDIRTY:
       break;
     default:
@@ -2797,6 +2813,54 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       break;
 
+    case CEPH_OSD_OP_CACHE_TRY_FLUSH:
+      ++ctx->num_write;
+      {
+       if (ctx->lock_to_release != OpContext::NONE) {
+         dout(10) << "cache-try-flush without SKIPRWLOCKS flag set" << dendl;
+         result = -EINVAL;
+         break;
+       }
+       if (pool.info.cache_mode == pg_pool_t::CACHEMODE_NONE) {
+         result = -EINVAL;
+         break;
+       }
+       if (!obs.exists) {
+         result = 0;
+         break;
+       }
+       if (oi.is_dirty()) {
+         result = start_flush(ctx, false);
+       } else {
+         result = 0;
+       }
+      }
+      break;
+
+    case CEPH_OSD_OP_CACHE_FLUSH:
+      ++ctx->num_write;
+      {
+       if (ctx->lock_to_release == OpContext::NONE) {
+         dout(10) << "cache-flush with SKIPRWLOCKS flag set" << dendl;
+         result = -EINVAL;
+         break;
+       }
+       if (pool.info.cache_mode == pg_pool_t::CACHEMODE_NONE) {
+         result = -EINVAL;
+         break;
+       }
+       if (!obs.exists) {
+         result = 0;
+         break;
+       }
+       if (oi.is_dirty()) {
+         result = start_flush(ctx, true);
+       } else {
+         result = 0;
+       }
+      }
+      break;
+
     case CEPH_OSD_OP_CACHE_EVICT:
       ++ctx->num_write;
       {
@@ -4793,6 +4857,264 @@ void ReplicatedPG::cancel_copy_ops(bool requeue)
 }
 
 
+// ========================================================================
+// flush
+//
+// Flush a dirty object in the cache tier by writing it back to the
+// base tier.  The sequence looks like:
+//
+//  * send a copy-from operation to the base tier to copy the current
+//    version of the object
+//  * base tier will pull the object via (perhaps multiple) copy-get(s)
+//  * on completion, we check if the object has been modified.  if so,
+//    just reply with -EAGAIN.
+//  * try to take a write lock so we can clear the dirty flag.  if this
+//    fails, wait and retry
+//  * start a repop that clears the bit.
+//
+// If we have to wait, we will retry by coming back through the
+// start_flush method.  We check if a flush is already in progress
+// and, if so, try to finish it by rechecking the version and trying
+// to clear the dirty bit.
+//
+// In order for the cache-flush (a write op) to not block the copy-get
+// from reading the object, the client *must* set the SKIPRWLOCKS
+// flag.
+//
+// NOTE: normally writes are strictly ordered for the client, but
+// flushes are special in that they can be reordered with respect to
+// other writes.  In particular, we can't have a flush request block
+// an update to the cache pool object!
+
+struct C_Flush : public Context {
+  ReplicatedPGRef pg;
+  hobject_t oid;
+  epoch_t last_peering_reset;
+  tid_t tid;
+  C_Flush(ReplicatedPG *p, hobject_t o, epoch_t lpr)
+    : pg(p), oid(o), last_peering_reset(lpr),
+      tid(0)
+  {}
+  void finish(int r) {
+    pg->lock();
+    if (last_peering_reset == pg->get_last_peering_reset()) {
+      pg->finish_flush(oid, tid, r);
+    }
+    pg->unlock();
+  }
+};
+
+int ReplicatedPG::start_flush(OpContext *ctx, bool blocking)
+{
+  const object_info_t& oi = ctx->obc->obs.oi;
+  const hobject_t& soid = oi.soid;
+  dout(10) << __func__ << " " << soid
+          << " v" << oi.version
+          << " uv" << oi.user_version
+          << " " << (blocking ? "blocking" : "non-blocking/best-effort")
+          << dendl;
+
+  if (blocking)
+    ctx->obc->start_block();
+
+  map<hobject_t,FlushOpRef>::iterator p = flush_ops.find(soid);
+  if (p != flush_ops.end()) {
+    FlushOpRef fop = p->second;
+    if (fop->ctx->op == ctx->op) {
+      // we couldn't take the write lock on a cache-try-flush before;
+      // now we are trying again for the lock.
+      close_op_ctx(fop->ctx);  // clean up the previous ctx and use the new one.
+      fop->ctx = ctx;
+      return try_flush_mark_clean(fop);
+    }
+    if (fop->flushed_version == ctx->obc->obs.oi.user_version &&
+       (fop->blocking || !blocking)) {
+      // nonblocking can join anything
+      // blocking can only join a blocking flush
+      dout(20) << __func__ << " piggybacking on existing flush " << dendl;
+      fop->dup_ops.push_back(ctx->op);
+      return -EAGAIN;   // clean up this ctx; op will retry later
+    }
+
+    // cancel current flush since it will fail anyway, or because we
+    // are blocking and the existing flush is nonblocking.
+    dout(20) << __func__ << " canceling previous flush; it will fail" << dendl;
+    if (fop->ctx->op)
+      osd->reply_op_error(fop->ctx->op, -EBUSY);
+    while (!fop->dup_ops.empty()) {
+      osd->reply_op_error(fop->dup_ops.front(), -EBUSY);
+      fop->dup_ops.pop_front();
+    }
+    cancel_flush(fop, false);
+  }
+
+  FlushOpRef fop(new FlushOp);
+  fop->ctx = ctx;
+  fop->flushed_version = oi.user_version;
+  fop->blocking = blocking;
+
+  ObjectOperation o;
+  if (oi.is_whiteout()) {
+    fop->removal = true;
+    o.remove();
+  } else {
+    object_locator_t oloc(soid);
+    o.copy_from(soid.oid.name, soid.snap, oloc, oi.user_version,
+               CEPH_OSD_COPY_FROM_FLAG_FLUSH |
+               CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY |
+               CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE);
+  }
+  C_Flush *fin = new C_Flush(this, soid, get_last_peering_reset());
+  object_locator_t base_oloc(soid);
+  base_oloc.pool = pool.info.tier_of;
+  SnapContext snapc;  // FIXME
+
+  osd->objecter_lock.Lock();
+  tid_t tid = osd->objecter->mutate(soid.oid, base_oloc, o, snapc, oi.mtime,
+                                   CEPH_OSD_FLAG_IGNORE_OVERLAY,
+                                   NULL,
+                                   new C_OnFinisher(fin,
+                                                    &osd->objecter_finisher));
+  fin->tid = tid;
+  fop->objecter_tid = tid;
+  osd->objecter_lock.Unlock();
+
+  flush_ops[soid] = fop;
+  return -EINPROGRESS;
+}
+
+void ReplicatedPG::finish_flush(hobject_t oid, tid_t tid, int r)
+{
+  dout(10) << __func__ << " " << oid << " tid " << tid
+          << " " << cpp_strerror(r) << dendl;
+  map<hobject_t,FlushOpRef>::iterator p = flush_ops.find(oid);
+  if (p == flush_ops.end()) {
+    dout(10) << __func__ << " no flush_op found" << dendl;
+    return;
+  }
+  FlushOpRef fop = p->second;
+  if (tid != fop->objecter_tid) {
+    dout(10) << __func__ << " tid " << tid << " != fop " << fop
+            << " tid " << fop->objecter_tid << dendl;
+    return;
+  }
+  ObjectContextRef obc = fop->ctx->obc;
+  fop->objecter_tid = 0;
+
+  if (r < 0 && !(r == -ENOENT && fop->removal)) {
+    reply_ctx(fop->ctx, -EBUSY, obc->obs.oi.version,
+             obc->obs.oi.user_version);
+    if (!fop->dup_ops.empty()) {
+      dout(20) << __func__ << " requeueing dups" << dendl;
+      requeue_ops(fop->dup_ops);
+    }
+    flush_ops.erase(oid);
+    return;
+  }
+
+  r = try_flush_mark_clean(fop);
+  if (r == -EBUSY) {
+    reply_ctx(fop->ctx, -EBUSY, obc->obs.oi.version,
+             obc->obs.oi.user_version);
+  }
+}
+
+int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
+{
+  ObjectContextRef obc = fop->ctx->obc;
+  const hobject_t& oid = obc->obs.oi.soid;
+
+  if (fop->flushed_version != obc->obs.oi.user_version) {
+    dout(10) << __func__ << " flushed_version " << fop->flushed_version
+            << " != current " << obc->obs.oi.user_version
+            << dendl;
+    if (!fop->dup_ops.empty()) {
+      dout(20) << __func__ << " requeueing dups" << dendl;
+      requeue_ops(fop->dup_ops);
+    }
+    if (fop->blocking) {
+      obc->stop_block();
+      kick_object_context_blocked(obc);
+    }
+    flush_ops.erase(oid);
+    return -EBUSY;
+  }
+
+  // successfully flushed; can we clear the dirty bit?
+  if (!fop->blocking) {
+    // non-blocking: try to take the lock manually, since we don't
+    // have a ctx yet.
+    dout(20) << __func__ << " taking write lock" << dendl;
+    if (!obc->get_write(fop->ctx->op)) {
+      dout(10) << __func__ << " waiting on lock" << dendl;
+      return -EINPROGRESS;    // will retry.   this ctx is still alive!
+    }
+  } else {
+    dout(20) << __func__ << " already holding write lock: "
+            << obc->rwstate << dendl;
+    assert(obc->rwstate.state == ObjectContext::RWState::RWWRITE);
+    assert(fop->ctx->lock_to_release == OpContext::W_LOCK);
+
+    // let other writes continue
+    obc->stop_block();
+    kick_object_context_blocked(obc);
+  }
+
+  dout(10) << __func__ << " clearing DIRTY flag for " << oid << dendl;
+  tid_t rep_tid = osd->get_tid();
+  RepGather *repop = new_repop(fop->ctx, obc, rep_tid);
+  OpContext *ctx = fop->ctx;
+  if (!fop->blocking) {
+    ctx->lock_to_release = OpContext::W_LOCK;  // we took it above
+  }
+  ctx->at_version = get_next_version();
+
+  ctx->new_obs = obc->obs;
+  ctx->new_obs.oi.clear_flag(object_info_t::FLAG_DIRTY);
+
+  finish_ctx(ctx);
+
+  if (!fop->dup_ops.empty()) {
+    dout(20) << __func__ << " queueing dups for " << ctx->at_version << dendl;
+    list<OpRequestRef>& ls = waiting_for_ondisk[ctx->at_version];
+    ls.splice(ls.end(), fop->dup_ops);
+  }
+
+  simple_repop_submit(repop);
+
+  flush_ops.erase(oid);
+  return -EINPROGRESS;
+}
+
+void ReplicatedPG::cancel_flush(FlushOpRef fop, bool requeue)
+{
+  dout(10) << __func__ << " " << fop->ctx->obc->obs.oi.soid << " tid "
+          << fop->objecter_tid << dendl;
+  if (fop->objecter_tid) {
+    Mutex::Locker l(osd->objecter_lock);
+    osd->objecter->op_cancel(fop->objecter_tid);
+  }
+  if (fop->ctx->op && requeue) {
+    requeue_op(fop->ctx->op);
+    requeue_ops(fop->dup_ops);
+  }
+  if (fop->blocking) {
+    fop->ctx->obc->stop_block();
+    kick_object_context_blocked(fop->ctx->obc);
+  }
+  flush_ops.erase(fop->ctx->obc->obs.oi.soid);
+  close_op_ctx(fop->ctx);
+}
+
+void ReplicatedPG::cancel_flush_ops(bool requeue)
+{
+  dout(10) << __func__ << dendl;
+  map<hobject_t,FlushOpRef>::iterator p = flush_ops.begin();
+  while (p != flush_ops.end()) {
+    cancel_flush((p++)->second, requeue);
+  }
+}
+
 // ========================================================================
 // rep op gather
 
@@ -5190,6 +5512,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRe
  
 void ReplicatedPG::remove_repop(RepGather *repop)
 {
+  dout(20) << __func__ << " " << *repop << dendl;
   release_op_ctx_locks(repop->ctx);
   repop_map.erase(repop->rep_tid);
   repop->put();
@@ -5458,7 +5781,8 @@ ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid,
       pg_log_entry_t::LOST_REVERT));
   ObjectContextRef obc = object_contexts.lookup(soid);
   if (obc) {
-    dout(10) << "get_object_context " << obc << " " << soid << dendl;
+    dout(10) << "get_object_context " << obc << " " << soid
+            << " " << obc->rwstate << dendl;
   } else {
     // check disk
     bufferlist bv;
@@ -5496,7 +5820,9 @@ ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid,
     register_snapset_context(obc->ssc);
 
     populate_obc_watchers(obc);
-    dout(10) << "get_object_context " << obc << " " << soid << " 0 -> 1 read " << obc->obs.oi << dendl;
+    dout(10) << "get_object_context " << obc << " " << soid
+            << " " << obc->rwstate
+            << " 0 -> 1 read " << obc->obs.oi << dendl;
   }
   return obc;
 }
@@ -7573,6 +7899,7 @@ void ReplicatedPG::on_shutdown()
 
   unreg_next_scrub();
   cancel_copy_ops(false);
+  cancel_flush_ops(false);
   apply_and_flush_repops(false);
   context_registry_on_change();
 
@@ -7617,6 +7944,7 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
   context_registry_on_change();
 
   cancel_copy_ops(is_primary());
+  cancel_flush_ops(is_primary());
 
   // requeue object waiters
   if (is_primary()) {
index f751e05b3f3e4e83eaa420ebc43ce9c194d4ab3c..a718b6888a6246d8f5e46acfd7503b98688cf7a6 100644 (file)
@@ -234,6 +234,21 @@ public:
   };
   friend class PromoteCallback;
 
+  struct FlushOp {
+    OpContext *ctx;             ///< the parent OpContext
+    list<OpRequestRef> dup_ops; ///< dup flush requests
+    version_t flushed_version;  ///< user version we are flushing
+    tid_t objecter_tid;         ///< copy-from request tid
+    int rval;                   ///< copy-from result
+    bool blocking;              ///< whether we are blocking updates
+    bool removal;               ///< we are removing the backend object
+
+    FlushOp()
+      : ctx(NULL), objecter_tid(0), rval(0),
+       blocking(false), removal(false) {}
+  };
+  typedef boost::shared_ptr<FlushOp> FlushOpRef;
+
   boost::scoped_ptr<PGBackend> pgbackend;
   PGBackend *get_pgbackend() {
     return pgbackend.get();
@@ -1003,6 +1018,17 @@ protected:
 
   friend class C_Copyfrom;
 
+  // -- flush --
+  map<hobject_t, FlushOpRef> flush_ops;
+
+  int start_flush(OpContext *ctx, bool blocking);
+  void finish_flush(hobject_t oid, tid_t tid, int r);
+  int try_flush_mark_clean(FlushOpRef fop);
+  void cancel_flush(FlushOpRef fop, bool requeue);
+  void cancel_flush_ops(bool requeue);
+
+  friend class C_Flush;
+
   // -- scrub --
   virtual void _scrub(ScrubMap& map);
   virtual void _scrub_clear_state();
@@ -1180,6 +1206,8 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
       << " wfack=" << repop.waitfor_ack
     //<< " wfnvram=" << repop.waitfor_nvram
       << " wfdisk=" << repop.waitfor_disk;
+  if (repop.ctx->lock_to_release != ReplicatedPG::OpContext::NONE)
+    out << " lock=" << (int)repop.ctx->lock_to_release;
   if (repop.ctx->op)
     out << " op=" << *(repop.ctx->op->get_req());
   out << ")";
index baabe99d96a6d3464f11b2266341768a68d80349..4b4eea15f126eb1e0ce011c3a91facb402487c97 100644 (file)
@@ -13,6 +13,7 @@
 #include "global/global_init.h"
 #include "common/ceph_argparse.h"
 #include "common/common_init.h"
+#include "common/Cond.h"
 #include "test/librados/test.h"
 #include "json_spirit/json_spirit.h"
 
@@ -454,6 +455,739 @@ TEST(LibRadosTier, Evict) {
   ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
 }
 
+TEST(LibRadosTier, TryFlush) {
+  Rados cluster;
+  std::string base_pool_name = get_temp_pool_name();
+  std::string cache_pool_name = base_pool_name + "-cache";
+  ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+  ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+  IoCtx cache_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+  IoCtx base_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+  // configure cache
+  bufferlist inbl;
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+    "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+    "\", \"mode\": \"writeback\"}",
+    inbl, NULL, NULL));
+
+  // wait for maps to settle
+  cluster.wait_for_latest_osdmap();
+
+  // create object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // verify the object is present in the cache tier
+  {
+    ObjectIterator it = cache_ioctx.objects_begin();
+    ASSERT_TRUE(it != cache_ioctx.objects_end());
+    ASSERT_TRUE(it->first == string("foo"));
+    ++it;
+    ASSERT_TRUE(it == cache_ioctx.objects_end());
+  }
+
+  // verify the object is NOT present in the base tier
+  {
+    ObjectIterator it = base_ioctx.objects_begin();
+    ASSERT_TRUE(it == base_ioctx.objects_end());
+  }
+
+  // verify dirty
+  {
+    bool dirty = false;
+    int r = -1;
+    ObjectReadOperation op;
+    op.is_dirty(&dirty, &r);
+    ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
+    ASSERT_TRUE(dirty);
+    ASSERT_EQ(0, r);
+  }
+
+  // flush
+  {
+    ObjectWriteOperation op;
+    op.cache_try_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY |
+      librados::OPERATION_SKIPRWLOCKS));
+    completion->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    completion->release();
+  }
+
+  // verify clean
+  {
+    bool dirty = false;
+    int r = -1;
+    ObjectReadOperation op;
+    op.is_dirty(&dirty, &r);
+    ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
+    ASSERT_FALSE(dirty);
+    ASSERT_EQ(0, r);
+  }
+
+  // verify in base tier
+  {
+    ObjectIterator it = base_ioctx.objects_begin();
+    ASSERT_TRUE(it != base_ioctx.objects_end());
+    ASSERT_TRUE(it->first == string("foo"));
+    ++it;
+    ASSERT_TRUE(it == base_ioctx.objects_end());
+  }
+
+  // evict it
+  {
+    ObjectWriteOperation op;
+    op.cache_evict();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op, librados::OPERATION_IGNORE_CACHE));
+    completion->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    completion->release();
+  }
+
+  // verify no longer in cache tier
+  {
+    ObjectIterator it = cache_ioctx.objects_begin();
+    ASSERT_TRUE(it == cache_ioctx.objects_end());
+  }
+
+  // tear down tiers
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+    "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+
+  base_ioctx.close();
+  cache_ioctx.close();
+
+  cluster.pool_delete(cache_pool_name.c_str());
+  ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
+TEST(LibRadosTier, Flush) {
+  Rados cluster;
+  std::string base_pool_name = get_temp_pool_name();
+  std::string cache_pool_name = base_pool_name + "-cache";
+  ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+  ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+  IoCtx cache_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+  IoCtx base_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+  // configure cache
+  bufferlist inbl;
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+    "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+    "\", \"mode\": \"writeback\"}",
+    inbl, NULL, NULL));
+
+  // wait for maps to settle
+  cluster.wait_for_latest_osdmap();
+
+  uint64_t user_version = 0;
+
+  // create object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // verify the object is present in the cache tier
+  {
+    ObjectIterator it = cache_ioctx.objects_begin();
+    ASSERT_TRUE(it != cache_ioctx.objects_end());
+    ASSERT_TRUE(it->first == string("foo"));
+    ++it;
+    ASSERT_TRUE(it == cache_ioctx.objects_end());
+  }
+
+  // verify the object is NOT present in the base tier
+  {
+    ObjectIterator it = base_ioctx.objects_begin();
+    ASSERT_TRUE(it == base_ioctx.objects_end());
+  }
+
+  // verify dirty
+  {
+    bool dirty = false;
+    int r = -1;
+    ObjectReadOperation op;
+    op.is_dirty(&dirty, &r);
+    ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
+    ASSERT_TRUE(dirty);
+    ASSERT_EQ(0, r);
+    user_version = cache_ioctx.get_last_version();
+  }
+
+  // flush
+  {
+    ObjectWriteOperation op;
+    op.cache_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY));
+    completion->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    completion->release();
+  }
+
+  // verify clean
+  {
+    bool dirty = false;
+    int r = -1;
+    ObjectReadOperation op;
+    op.is_dirty(&dirty, &r);
+    ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
+    ASSERT_FALSE(dirty);
+    ASSERT_EQ(0, r);
+  }
+
+  // verify in base tier
+  {
+    ObjectIterator it = base_ioctx.objects_begin();
+    ASSERT_TRUE(it != base_ioctx.objects_end());
+    ASSERT_TRUE(it->first == string("foo"));
+    ++it;
+    ASSERT_TRUE(it == base_ioctx.objects_end());
+  }
+
+  // evict it
+  {
+    ObjectWriteOperation op;
+    op.cache_evict();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op, librados::OPERATION_IGNORE_CACHE));
+    completion->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    completion->release();
+  }
+
+  // verify no longer in cache tier
+  {
+    ObjectIterator it = cache_ioctx.objects_begin();
+    ASSERT_TRUE(it == cache_ioctx.objects_end());
+  }
+
+  // read it again and verify the version is consistent
+  {
+    bufferlist bl;
+    ASSERT_EQ(1, cache_ioctx.read("foo", bl, 1, 0));
+    ASSERT_EQ(user_version, cache_ioctx.get_last_version());
+  }
+
+  // erase it
+  {
+    ObjectWriteOperation op;
+    op.remove();
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // flush whiteout
+  {
+    ObjectWriteOperation op;
+    op.cache_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY));
+    completion->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    completion->release();
+  }
+
+  // evict
+  {
+    ObjectWriteOperation op;
+    op.cache_evict();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op, librados::OPERATION_IGNORE_CACHE));
+    completion->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    completion->release();
+  }
+
+  // verify no longer in cache tier
+  {
+    ObjectIterator it = cache_ioctx.objects_begin();
+    ASSERT_TRUE(it == cache_ioctx.objects_end());
+  }
+  // or base tier
+  {
+    ObjectIterator it = base_ioctx.objects_begin();
+    ASSERT_TRUE(it == base_ioctx.objects_end());
+  }
+
+  // tear down tiers
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+    "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+
+  base_ioctx.close();
+  cache_ioctx.close();
+
+  cluster.pool_delete(cache_pool_name.c_str());
+  ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
+TEST(LibRadosTier, FlushWriteRaces) {
+  Rados cluster;
+  std::string base_pool_name = get_temp_pool_name();
+  std::string cache_pool_name = base_pool_name + "-cache";
+  ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+  ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+  IoCtx cache_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+  IoCtx base_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+  // configure cache
+  bufferlist inbl;
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+    "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+    "\", \"mode\": \"writeback\"}",
+    inbl, NULL, NULL));
+
+  // wait for maps to settle
+  cluster.wait_for_latest_osdmap();
+
+  // create/dirty object
+  bufferlist bl;
+  bl.append("hi there");
+  {
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // flush + write
+  {
+    ObjectWriteOperation op;
+    op.cache_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY));
+
+    ObjectWriteOperation op2;
+    op2.write_full(bl);
+    librados::AioCompletion *completion2 = cluster.aio_create_completion();
+    ASSERT_EQ(0, base_ioctx.aio_operate(
+      "foo", completion2, &op2, 0));
+
+    completion->wait_for_safe();
+    completion2->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    ASSERT_EQ(0, completion2->get_return_value());
+    completion->release();
+    completion2->release();
+  }
+
+  // create/dirty object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // try-flush + write
+  {
+    ObjectWriteOperation op;
+    op.cache_try_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY |
+      librados::OPERATION_SKIPRWLOCKS));
+
+    ObjectWriteOperation op2;
+    op2.write_full(bl);
+    librados::AioCompletion *completion2 = cluster.aio_create_completion();
+    ASSERT_EQ(0, base_ioctx.aio_operate(
+      "foo", completion2, &op2, 0));
+
+    completion->wait_for_safe();
+    completion2->wait_for_safe();
+    ASSERT_EQ(-EBUSY, completion->get_return_value());
+    ASSERT_EQ(0, completion2->get_return_value());
+    completion->release();
+    completion2->release();
+  }
+
+  // tear down tiers
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+    "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+
+  base_ioctx.close();
+  cache_ioctx.close();
+
+  cluster.pool_delete(cache_pool_name.c_str());
+  ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
+TEST(LibRadosTier, FlushTryFlushRaces) {
+  Rados cluster;
+  std::string base_pool_name = get_temp_pool_name();
+  std::string cache_pool_name = base_pool_name + "-cache";
+  ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+  ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+  IoCtx cache_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+  IoCtx base_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+  // configure cache
+  bufferlist inbl;
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+    "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+    "\", \"mode\": \"writeback\"}",
+    inbl, NULL, NULL));
+
+  // wait for maps to settle
+  cluster.wait_for_latest_osdmap();
+
+  // create/dirty object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // flush + flush
+  {
+    ObjectWriteOperation op;
+    op.cache_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY));
+
+    ObjectWriteOperation op2;
+    op2.cache_flush();
+    librados::AioCompletion *completion2 = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion2, &op2,
+      librados::OPERATION_IGNORE_OVERLAY));
+
+    completion->wait_for_safe();
+    completion2->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    ASSERT_EQ(0, completion2->get_return_value());
+    completion->release();
+    completion2->release();
+  }
+
+  // create/dirty object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // flush + try-flush
+  {
+    ObjectWriteOperation op;
+    op.cache_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY));
+
+    ObjectWriteOperation op2;
+    op2.cache_try_flush();
+    librados::AioCompletion *completion2 = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion2, &op2,
+      librados::OPERATION_IGNORE_OVERLAY |
+      librados::OPERATION_SKIPRWLOCKS));
+
+    completion->wait_for_safe();
+    completion2->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    ASSERT_EQ(0, completion2->get_return_value());
+    completion->release();
+    completion2->release();
+  }
+
+  // create/dirty object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // try-flush + flush
+  //  (flush will not piggyback on try-flush)
+  {
+    ObjectWriteOperation op;
+    op.cache_try_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY |
+      librados::OPERATION_SKIPRWLOCKS));
+
+    ObjectWriteOperation op2;
+    op2.cache_flush();
+    librados::AioCompletion *completion2 = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion2, &op2,
+      librados::OPERATION_IGNORE_OVERLAY));
+
+    completion->wait_for_safe();
+    completion2->wait_for_safe();
+    ASSERT_EQ(-EBUSY, completion->get_return_value());
+    ASSERT_EQ(0, completion2->get_return_value());
+    completion->release();
+    completion2->release();
+  }
+
+  // create/dirty object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // try-flush + try-flush
+  {
+    ObjectWriteOperation op;
+    op.cache_try_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY |
+      librados::OPERATION_SKIPRWLOCKS));
+
+    ObjectWriteOperation op2;
+    op2.cache_try_flush();
+    librados::AioCompletion *completion2 = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion2, &op2,
+      librados::OPERATION_IGNORE_OVERLAY |
+      librados::OPERATION_SKIPRWLOCKS));
+
+    completion->wait_for_safe();
+    completion2->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    ASSERT_EQ(0, completion2->get_return_value());
+    completion->release();
+    completion2->release();
+  }
+
+  // tear down tiers
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+    "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+
+  base_ioctx.close();
+  cache_ioctx.close();
+
+  cluster.pool_delete(cache_pool_name.c_str());
+  ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
+
+IoCtx *read_ioctx = 0;
+Mutex lock("FlushReadRaces::lock");
+Cond cond;
+int max_reads = 100;
+int num_reads = 0; // in progress
+
+void flush_read_race_cb(completion_t cb, void *arg);
+
+void start_flush_read()
+{
+  //cout << " starting read" << std::endl;
+  ObjectReadOperation op;
+  op.stat(NULL, NULL, NULL);
+  librados::AioCompletion *completion =
+    librados::Rados::aio_create_completion();
+  completion->set_complete_callback(0, flush_read_race_cb);
+  read_ioctx->aio_operate("foo", completion, &op, NULL);
+}
+
+void flush_read_race_cb(completion_t cb, void *arg)
+{
+  //cout << " finished read" << std::endl;
+  lock.Lock();
+  if (num_reads > max_reads) {
+    num_reads--;
+    cond.Signal();
+  } else {
+    start_flush_read();
+  }
+  // fixme: i'm leaking cb...
+  lock.Unlock();
+}
+
+TEST(LibRadosTier, TryFlushReadRace) {
+  Rados cluster;
+  std::string base_pool_name = get_temp_pool_name();
+  std::string cache_pool_name = base_pool_name + "-cache";
+  ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+  ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+  IoCtx cache_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+  IoCtx base_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+  // configure cache
+  bufferlist inbl;
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+    "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+    "\", \"mode\": \"writeback\"}",
+    inbl, NULL, NULL));
+
+  // wait for maps to settle
+  cluster.wait_for_latest_osdmap();
+
+  // create/dirty object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    bufferptr bp(4000000);  // make it big!
+    bp.zero();
+    bl.append(bp);
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // start a continuous stream of reads
+  read_ioctx = &base_ioctx;
+  lock.Lock();
+  for (int i = 0; i < max_reads; ++i) {
+    start_flush_read();
+    num_reads++;
+  }
+  lock.Unlock();
+
+  // try-flush
+  ObjectWriteOperation op;
+  op.cache_try_flush();
+  librados::AioCompletion *completion = cluster.aio_create_completion();
+  ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY |
+      librados::OPERATION_SKIPRWLOCKS));
+
+  completion->wait_for_safe();
+  ASSERT_EQ(0, completion->get_return_value());
+  completion->release();
+
+  // stop reads
+  lock.Lock();
+  max_reads = 0;
+  while (num_reads > 0)
+    cond.Wait(lock);
+  lock.Unlock();
+
+  // tear down tiers
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+    "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+
+  base_ioctx.close();
+  cache_ioctx.close();
+
+  cluster.pool_delete(cache_pool_name.c_str());
+  ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
 TEST(LibRadosTier, HitSetNone) {
   Rados cluster;
   std::string pool_name = get_temp_pool_name();