From: Sage Weil
Date: Fri, 25 Oct 2013 05:30:50 +0000 (-0700)
Subject: osd/ReplicatedPG: implement cache-flush, cache-try-flush
X-Git-Tag: v0.75~45^2~29
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ad3b46665f743765aad9c1b4aa056be893669d7f;p=ceph.git

osd/ReplicatedPG: implement cache-flush, cache-try-flush

Implement a rados operation that will flush a dirty object in the cache
tier by writing it back to the base tier.

Signed-off-by: Sage Weil
---
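For orientation, here is how a client issues the two new ops; a minimal
sketch condensed from the tests added to src/test/librados/tier.cc below
(it assumes an open librados::Rados "cluster" and an IoCtx "cache_ioctx"
on the cache pool).  cache_try_flush must be sent with SKIPRWLOCKS so it
does not block concurrent access to the object; cache_flush must not:

    // best-effort flush: fails with -EBUSY if the object is updated mid-flush
    {
      librados::ObjectWriteOperation op;
      op.cache_try_flush();
      librados::AioCompletion *c = cluster.aio_create_completion();
      cache_ioctx.aio_operate("foo", c, &op,
                              librados::OPERATION_IGNORE_OVERLAY |
                              librados::OPERATION_SKIPRWLOCKS);
      c->wait_for_safe();
      int r = c->get_return_value();  // 0, or -EBUSY on a race
      c->release();
    }

    // blocking flush: holds off writers until the object is flushed
    {
      librados::ObjectWriteOperation op;
      op.cache_flush();
      librados::AioCompletion *c = cluster.aio_create_completion();
      cache_ioctx.aio_operate("foo", c, &op,
                              librados::OPERATION_IGNORE_OVERLAY);
      c->wait_for_safe();
      int r = c->get_return_value();  // 0 on success
      c->release();
    }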
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 9a154cb07e9a..091d97690b6d 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -1056,7 +1056,8 @@ void ReplicatedPG::do_op(OpRequestRef op)
   }
 
   // io blocked on obc?
-  if (obc->is_blocked()) {
+  if (obc->is_blocked() &&
+      (m->get_flags() & CEPH_OSD_FLAG_FLUSH) == 0) {
     wait_for_blocked_object(obc->obs.oi.soid, op);
     return;
   }
@@ -1192,7 +1193,20 @@ void ReplicatedPG::do_op(OpRequestRef op)
   ctx->obc = obc;
   if (m->get_flags() & CEPH_OSD_FLAG_SKIPRWLOCKS) {
     dout(20) << __func__ << ": skipping rw locks" << dendl;
+  } else if (m->get_flags() & CEPH_OSD_FLAG_FLUSH) {
+    dout(20) << __func__ << ": part of flush, will ignore write lock" << dendl;
+
+    // verify there is in fact a flush in progress
+    // FIXME: we could make this a stronger test.
+    map<hobject_t, FlushOpRef>::iterator p = flush_ops.find(obc->obs.oi.soid);
+    if (p == flush_ops.end()) {
+      dout(10) << __func__ << " no flush in progress, aborting" << dendl;
+      close_op_ctx(ctx);
+      osd->reply_op_error(op, -EINVAL);
+      return;
+    }
   } else if (!get_rw_locks(ctx)) {
+    dout(20) << __func__ << " waiting for rw locks " << dendl;
     op->mark_delayed("waiting for rw locks");
     close_op_ctx(ctx);
     return;
@@ -2519,6 +2533,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       // non user-visible modifications
     case CEPH_OSD_OP_WATCH:
     case CEPH_OSD_OP_CACHE_EVICT:
+    case CEPH_OSD_OP_CACHE_FLUSH:
+    case CEPH_OSD_OP_CACHE_TRY_FLUSH:
     case CEPH_OSD_OP_UNDIRTY:
       break;
     default:
@@ -2797,6 +2813,54 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       break;
 
+    case CEPH_OSD_OP_CACHE_TRY_FLUSH:
+      ++ctx->num_write;
+      {
+        if (ctx->lock_to_release != OpContext::NONE) {
+          dout(10) << "cache-try-flush without SKIPRWLOCKS flag set" << dendl;
+          result = -EINVAL;
+          break;
+        }
+        if (pool.info.cache_mode == pg_pool_t::CACHEMODE_NONE) {
+          result = -EINVAL;
+          break;
+        }
+        if (!obs.exists) {
+          result = 0;
+          break;
+        }
+        if (oi.is_dirty()) {
+          result = start_flush(ctx, false);
+        } else {
+          result = 0;
+        }
+      }
+      break;
+
+    case CEPH_OSD_OP_CACHE_FLUSH:
+      ++ctx->num_write;
+      {
+        if (ctx->lock_to_release == OpContext::NONE) {
+          dout(10) << "cache-flush with SKIPRWLOCKS flag set" << dendl;
+          result = -EINVAL;
+          break;
+        }
+        if (pool.info.cache_mode == pg_pool_t::CACHEMODE_NONE) {
+          result = -EINVAL;
+          break;
+        }
+        if (!obs.exists) {
+          result = 0;
+          break;
+        }
+        if (oi.is_dirty()) {
+          result = start_flush(ctx, true);
+        } else {
+          result = 0;
+        }
+      }
+      break;
+
     case CEPH_OSD_OP_CACHE_EVICT:
       ++ctx->num_write;
       {
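The two -EINVAL guards above encode the client-side contract:
lock_to_release is only set when get_rw_locks() took a lock, and
get_rw_locks() is skipped when the client sent SKIPRWLOCKS.  In summary
(flag names as used by the librados tests below):

    // for a write op that reaches do_osd_ops():
    //   CEPH_OSD_OP_CACHE_TRY_FLUSH  rejected if a rw lock was taken,
    //                                i.e. OPERATION_SKIPRWLOCKS was not sent
    //   CEPH_OSD_OP_CACHE_FLUSH      rejected if no rw lock was taken,
    //                                i.e. OPERATION_SKIPRWLOCKS was sent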
@@ -4793,6 +4857,264 @@ void ReplicatedPG::cancel_copy_ops(bool requeue)
 }
 
+// ========================================================================
+// flush
+//
+// Flush a dirty object in the cache tier by writing it back to the
+// base tier.  The sequence looks like:
+//
+//  * send a copy-from operation to the base tier to copy the current
+//    version of the object
+//  * base tier will pull the object via (perhaps multiple) copy-get(s)
+//  * on completion, we check if the object has been modified.  if so,
+//    just reply with -EBUSY.
+//  * try to take a write lock so we can clear the dirty flag.  if this
+//    fails, wait and retry
+//  * start a repop that clears the bit.
+//
+// If we have to wait, we will retry by coming back through the
+// start_flush method.  We check if a flush is already in progress
+// and, if so, try to finish it by rechecking the version and trying
+// to clear the dirty bit.
+//
+// In order for the cache-flush (a write op) to not block the copy-get
+// from reading the object, the client *must* set the SKIPRWLOCKS
+// flag.
+//
+// NOTE: normally writes are strictly ordered for the client, but
+// flushes are special in that they can be reordered with respect to
+// other writes.  In particular, we can't have a flush request block
+// an update to the cache pool object!
+
+struct C_Flush : public Context {
+  ReplicatedPGRef pg;
+  hobject_t oid;
+  epoch_t last_peering_reset;
+  tid_t tid;
+  C_Flush(ReplicatedPG *p, hobject_t o, epoch_t lpr)
+    : pg(p), oid(o), last_peering_reset(lpr),
+      tid(0)
+  {}
+  void finish(int r) {
+    pg->lock();
+    if (last_peering_reset == pg->get_last_peering_reset()) {
+      pg->finish_flush(oid, tid, r);
+    }
+    pg->unlock();
+  }
+};
+
+int ReplicatedPG::start_flush(OpContext *ctx, bool blocking)
+{
+  const object_info_t& oi = ctx->obc->obs.oi;
+  const hobject_t& soid = oi.soid;
+  dout(10) << __func__ << " " << soid
+           << " v" << oi.version
+           << " uv" << oi.user_version
+           << " " << (blocking ? "blocking" : "non-blocking/best-effort")
+           << dendl;
+
+  if (blocking)
+    ctx->obc->start_block();
+
+  map<hobject_t, FlushOpRef>::iterator p = flush_ops.find(soid);
+  if (p != flush_ops.end()) {
+    FlushOpRef fop = p->second;
+    if (fop->ctx->op == ctx->op) {
+      // we couldn't take the write lock on a cache-try-flush before;
+      // now we are trying again for the lock.
+      close_op_ctx(fop->ctx);  // clean up the previous ctx and use the new one.
+      fop->ctx = ctx;
+      return try_flush_mark_clean(fop);
+    }
+    if (fop->flushed_version == ctx->obc->obs.oi.user_version &&
+        (fop->blocking || !blocking)) {
+      // nonblocking can join anything
+      // blocking can only join a blocking flush
+      dout(20) << __func__ << " piggybacking on existing flush " << dendl;
+      fop->dup_ops.push_back(ctx->op);
+      return -EAGAIN;   // clean up this ctx; op will retry later
+    }
+
+    // cancel current flush since it will fail anyway, or because we
+    // are blocking and the existing flush is nonblocking.
+    dout(20) << __func__ << " canceling previous flush; it will fail" << dendl;
+    if (fop->ctx->op)
+      osd->reply_op_error(fop->ctx->op, -EBUSY);
+    while (!fop->dup_ops.empty()) {
+      osd->reply_op_error(fop->dup_ops.front(), -EBUSY);
+      fop->dup_ops.pop_front();
+    }
+    cancel_flush(fop, false);
+  }
+
+  FlushOpRef fop(new FlushOp);
+  fop->ctx = ctx;
+  fop->flushed_version = oi.user_version;
+  fop->blocking = blocking;
+
+  ObjectOperation o;
+  if (oi.is_whiteout()) {
+    fop->removal = true;
+    o.remove();
+  } else {
+    object_locator_t oloc(soid);
+    o.copy_from(soid.oid.name, soid.snap, oloc, oi.user_version,
+                CEPH_OSD_COPY_FROM_FLAG_FLUSH |
+                CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY |
+                CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE);
+  }
+  C_Flush *fin = new C_Flush(this, soid, get_last_peering_reset());
+  object_locator_t base_oloc(soid);
+  base_oloc.pool = pool.info.tier_of;
+  SnapContext snapc;  // FIXME
+
+  osd->objecter_lock.Lock();
+  tid_t tid = osd->objecter->mutate(soid.oid, base_oloc, o, snapc, oi.mtime,
+                                    CEPH_OSD_FLAG_IGNORE_OVERLAY,
+                                    NULL,
+                                    new C_OnFinisher(fin,
+                                                     &osd->objecter_finisher));
+  fin->tid = tid;
+  fop->objecter_tid = tid;
+  osd->objecter_lock.Unlock();
+
+  flush_ops[soid] = fop;
+  return -EINPROGRESS;
+}
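The return codes of start_flush() carry the control flow for everything
that follows; a summary of the contract as visible in this patch:

    // start_flush() results:
    //   -EINPROGRESS  flush registered in flush_ops and the copy-from
    //                 (or remove, for a whiteout) was sent to the base
    //                 tier; the reply is generated later via
    //                 C_Flush -> finish_flush()
    //   -EAGAIN       piggybacked on a compatible in-progress flush;
    //                 this ctx is released and the op is answered (or
    //                 requeued) when that flush completes (see dup_ops)
    //   -EBUSY        via try_flush_mark_clean() on the retry path:
    //                 the object was modified after the flush started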
+
+void ReplicatedPG::finish_flush(hobject_t oid, tid_t tid, int r)
+{
+  dout(10) << __func__ << " " << oid << " tid " << tid
+           << " " << cpp_strerror(r) << dendl;
+  map<hobject_t, FlushOpRef>::iterator p = flush_ops.find(oid);
+  if (p == flush_ops.end()) {
+    dout(10) << __func__ << " no flush_op found" << dendl;
+    return;
+  }
+  FlushOpRef fop = p->second;
+  if (tid != fop->objecter_tid) {
+    dout(10) << __func__ << " tid " << tid << " != fop " << fop
+             << " tid " << fop->objecter_tid << dendl;
+    return;
+  }
+  ObjectContextRef obc = fop->ctx->obc;
+  fop->objecter_tid = 0;
+
+  if (r < 0 && !(r == -ENOENT && fop->removal)) {
+    reply_ctx(fop->ctx, -EBUSY, obc->obs.oi.version,
+              obc->obs.oi.user_version);
+    if (!fop->dup_ops.empty()) {
+      dout(20) << __func__ << " requeueing dups" << dendl;
+      requeue_ops(fop->dup_ops);
+    }
+    flush_ops.erase(oid);
+    return;
+  }
+
+  r = try_flush_mark_clean(fop);
+  if (r == -EBUSY) {
+    reply_ctx(fop->ctx, -EBUSY, obc->obs.oi.version,
+              obc->obs.oi.user_version);
+  }
+}
+
+int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
+{
+  ObjectContextRef obc = fop->ctx->obc;
+  const hobject_t& oid = obc->obs.oi.soid;
+
+  if (fop->flushed_version != obc->obs.oi.user_version) {
+    dout(10) << __func__ << " flushed_version " << fop->flushed_version
+             << " != current " << obc->obs.oi.user_version
+             << dendl;
+    if (!fop->dup_ops.empty()) {
+      dout(20) << __func__ << " requeueing dups" << dendl;
+      requeue_ops(fop->dup_ops);
+    }
+    if (fop->blocking) {
+      obc->stop_block();
+      kick_object_context_blocked(obc);
+    }
+    flush_ops.erase(oid);
+    return -EBUSY;
+  }
+
+  // successfully flushed; can we clear the dirty bit?
+  if (!fop->blocking) {
+    // non-blocking: try to take the lock manually, since we don't
+    // have a ctx yet.
+    dout(20) << __func__ << " taking write lock" << dendl;
+    if (!obc->get_write(fop->ctx->op)) {
+      dout(10) << __func__ << " waiting on lock" << dendl;
+      return -EINPROGRESS;    // will retry.  this ctx is still alive!
+    }
+  } else {
+    dout(20) << __func__ << " already holding write lock: "
+             << obc->rwstate << dendl;
+    assert(obc->rwstate.state == ObjectContext::RWState::RWWRITE);
+    assert(fop->ctx->lock_to_release == OpContext::W_LOCK);
+
+    // let other writes continue
+    obc->stop_block();
+    kick_object_context_blocked(obc);
+  }
+
+  dout(10) << __func__ << " clearing DIRTY flag for " << oid << dendl;
+  tid_t rep_tid = osd->get_tid();
+  RepGather *repop = new_repop(fop->ctx, obc, rep_tid);
+  OpContext *ctx = fop->ctx;
+  if (!fop->blocking) {
+    ctx->lock_to_release = OpContext::W_LOCK;  // we took it above
+  }
+  ctx->at_version = get_next_version();
+
+  ctx->new_obs = obc->obs;
+  ctx->new_obs.oi.clear_flag(object_info_t::FLAG_DIRTY);
+
+  finish_ctx(ctx);
+
+  if (!fop->dup_ops.empty()) {
+    dout(20) << __func__ << " queueing dups for " << ctx->at_version << dendl;
+    list<OpRequestRef>& ls = waiting_for_ondisk[ctx->at_version];
+    ls.splice(ls.end(), fop->dup_ops);
+  }
+
+  simple_repop_submit(repop);
+
+  flush_ops.erase(oid);
+  return -EINPROGRESS;
+}
+
+void ReplicatedPG::cancel_flush(FlushOpRef fop, bool requeue)
+{
+  dout(10) << __func__ << " " << fop->ctx->obc->obs.oi.soid << " tid "
+           << fop->objecter_tid << dendl;
+  if (fop->objecter_tid) {
+    Mutex::Locker l(osd->objecter_lock);
+    osd->objecter->op_cancel(fop->objecter_tid);
+  }
+  if (fop->ctx->op && requeue) {
+    requeue_op(fop->ctx->op);
+    requeue_ops(fop->dup_ops);
+  }
+  if (fop->blocking) {
+    fop->ctx->obc->stop_block();
+    kick_object_context_blocked(fop->ctx->obc);
+  }
+  flush_ops.erase(fop->ctx->obc->obs.oi.soid);
+  close_op_ctx(fop->ctx);
+}
+
+void ReplicatedPG::cancel_flush_ops(bool requeue)
+{
+  dout(10) << __func__ << dendl;
+  map<hobject_t, FlushOpRef>::iterator p = flush_ops.begin();
+  while (p != flush_ops.end()) {
+    cancel_flush((p++)->second, requeue);
+  }
+}
+
 // ========================================================================
 // rep op gather
 
@@ -5190,6 +5512,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRe
 
 void ReplicatedPG::remove_repop(RepGather *repop)
 {
+  dout(20) << __func__ << " " << *repop << dendl;
   release_op_ctx_locks(repop->ctx);
   repop_map.erase(repop->rep_tid);
   repop->put();
@@ -5458,7 +5781,8 @@ ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid,
                                pg_log_entry_t::LOST_REVERT));
   ObjectContextRef obc = object_contexts.lookup(soid);
   if (obc) {
-    dout(10) << "get_object_context " << obc << " " << soid << dendl;
+    dout(10) << "get_object_context " << obc << " " << soid
+             << " " << obc->rwstate << dendl;
   } else {
     // check disk
     bufferlist bv;
@@ -5496,7 +5820,9 @@ ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid,
     register_snapset_context(obc->ssc);
 
     populate_obc_watchers(obc);
-    dout(10) << "get_object_context " << obc << " " << soid << " 0 -> 1 read " << obc->obs.oi << dendl;
+    dout(10) << "get_object_context " << obc << " " << soid
+             << " " << obc->rwstate
+             << " 0 -> 1 read " << obc->obs.oi << dendl;
   }
   return obc;
 }
@@ -7573,6 +7899,7 @@ void ReplicatedPG::on_shutdown()
   unreg_next_scrub();
 
   cancel_copy_ops(false);
+  cancel_flush_ops(false);
   apply_and_flush_repops(false);
   context_registry_on_change();
 
@@ -7617,6 +7944,7 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
   context_registry_on_change();
 
   cancel_copy_ops(is_primary());
+  cancel_flush_ops(is_primary());
 
   // requeue object waiters
   if (is_primary()) {
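Taken together, the new methods form a small per-object state machine; a
sketch of the FlushOp lifecycle as implemented above:

    // do_osd_ops(CACHE_FLUSH / CACHE_TRY_FLUSH)
    //   -> start_flush()           register flush_ops[soid]; send copy-from
    //                              (or remove, for whiteouts) to the base
    //                              pool (pool.info.tier_of)
    //   -> finish_flush()          objecter completion; ignores stale tids
    //   -> try_flush_mark_clean()  recheck user_version, take or confirm
    //                              the write lock, then submit a repop that
    //                              clears FLAG_DIRTY and erase flush_ops[soid]
    //
    // cancel_flush()/cancel_flush_ops() are the teardown path used by
    // on_change() and on_shutdown(): cancel the objecter op, optionally
    // requeue the waiting ops, and unblock the object context.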
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index f751e05b3f3e..a718b6888a62 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -234,6 +234,21 @@ public:
   };
   friend class PromoteCallback;
 
+  struct FlushOp {
+    OpContext *ctx;              ///< the parent OpContext
+    list<OpRequestRef> dup_ops;  ///< dup flush requests
+    version_t flushed_version;   ///< user version we are flushing
+    tid_t objecter_tid;          ///< copy-from request tid
+    int rval;                    ///< copy-from result
+    bool blocking;               ///< whether we are blocking updates
+    bool removal;                ///< we are removing the backend object
+
+    FlushOp()
+      : ctx(NULL), objecter_tid(0), rval(0),
+        blocking(false), removal(false) {}
+  };
+  typedef boost::shared_ptr<FlushOp> FlushOpRef;
+
   boost::scoped_ptr<PGBackend> pgbackend;
   PGBackend *get_pgbackend() {
     return pgbackend.get();
@@ -1003,6 +1018,17 @@ protected:
 
   friend class C_Copyfrom;
 
+  // -- flush --
+  map<hobject_t, FlushOpRef> flush_ops;
+
+  int start_flush(OpContext *ctx, bool blocking);
+  void finish_flush(hobject_t oid, tid_t tid, int r);
+  int try_flush_mark_clean(FlushOpRef fop);
+  void cancel_flush(FlushOpRef fop, bool requeue);
+  void cancel_flush_ops(bool requeue);
+
+  friend class C_Flush;
+
   // -- scrub --
   virtual void _scrub(ScrubMap& map);
   virtual void _scrub_clear_state();
@@ -1180,6 +1206,8 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
       << " wfack=" << repop.waitfor_ack
     //<< " wfnvram=" << repop.waitfor_nvram
       << " wfdisk=" << repop.waitfor_disk;
+  if (repop.ctx->lock_to_release != ReplicatedPG::OpContext::NONE)
+    out << " lock=" << (int)repop.ctx->lock_to_release;
   if (repop.ctx->op)
     out << " op=" << *(repop.ctx->op->get_req());
   out << ")";
diff --git a/src/test/librados/tier.cc b/src/test/librados/tier.cc
index baabe99d96a6..4b4eea15f126 100644
--- a/src/test/librados/tier.cc
+++ b/src/test/librados/tier.cc
@@ -13,6 +13,7 @@
 #include "global/global_init.h"
 #include "common/ceph_argparse.h"
 #include "common/common_init.h"
+#include "common/Cond.h"
 #include "test/librados/test.h"
 
 #include "json_spirit/json_spirit.h"
@@ -454,6 +455,739 @@ TEST(LibRadosTier, Evict) {
   ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
 }
 
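Each of the new tests below stands up the same two-pool writeback tier
before exercising flush; for reference, the repeated boilerplate could be
factored as follows (a hypothetical helper, not part of this commit):

    static void setup_writeback_tier(librados::Rados& cluster,
                                     const std::string& base_pool_name,
                                     const std::string& cache_pool_name)
    {
      bufferlist inbl;
      ASSERT_EQ(0, cluster.mon_command(
        "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
        "\", \"tierpool\": \"" + cache_pool_name + "\"}",
        inbl, NULL, NULL));
      ASSERT_EQ(0, cluster.mon_command(
        "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
        "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
        inbl, NULL, NULL));
      ASSERT_EQ(0, cluster.mon_command(
        "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
        "\", \"mode\": \"writeback\"}",
        inbl, NULL, NULL));
      cluster.wait_for_latest_osdmap();  // wait for maps to settle
    }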
+TEST(LibRadosTier, TryFlush) {
+  Rados cluster;
+  std::string base_pool_name = get_temp_pool_name();
+  std::string cache_pool_name = base_pool_name + "-cache";
+  ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+  ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+  IoCtx cache_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+  IoCtx base_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+  // configure cache
+  bufferlist inbl;
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+    "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+    "\", \"mode\": \"writeback\"}",
+    inbl, NULL, NULL));
+
+  // wait for maps to settle
+  cluster.wait_for_latest_osdmap();
+
+  // create object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // verify the object is present in the cache tier
+  {
+    ObjectIterator it = cache_ioctx.objects_begin();
+    ASSERT_TRUE(it != cache_ioctx.objects_end());
+    ASSERT_TRUE(it->first == string("foo"));
+    ++it;
+    ASSERT_TRUE(it == cache_ioctx.objects_end());
+  }
+
+  // verify the object is NOT present in the base tier
+  {
+    ObjectIterator it = base_ioctx.objects_begin();
+    ASSERT_TRUE(it == base_ioctx.objects_end());
+  }
+
+  // verify dirty
+  {
+    bool dirty = false;
+    int r = -1;
+    ObjectReadOperation op;
+    op.is_dirty(&dirty, &r);
+    ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
+    ASSERT_TRUE(dirty);
+    ASSERT_EQ(0, r);
+  }
+
+  // flush
+  {
+    ObjectWriteOperation op;
+    op.cache_try_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY |
+      librados::OPERATION_SKIPRWLOCKS));
+    completion->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    completion->release();
+  }
+
+  // verify clean
+  {
+    bool dirty = false;
+    int r = -1;
+    ObjectReadOperation op;
+    op.is_dirty(&dirty, &r);
+    ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
+    ASSERT_FALSE(dirty);
+    ASSERT_EQ(0, r);
+  }
+
+  // verify in base tier
+  {
+    ObjectIterator it = base_ioctx.objects_begin();
+    ASSERT_TRUE(it != base_ioctx.objects_end());
+    ASSERT_TRUE(it->first == string("foo"));
+    ++it;
+    ASSERT_TRUE(it == base_ioctx.objects_end());
+  }
+
+  // evict it
+  {
+    ObjectWriteOperation op;
+    op.cache_evict();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op, librados::OPERATION_IGNORE_CACHE));
+    completion->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    completion->release();
+  }
+
+  // verify no longer in cache tier
+  {
+    ObjectIterator it = cache_ioctx.objects_begin();
+    ASSERT_TRUE(it == cache_ioctx.objects_end());
+  }
+
+  // tear down tiers
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+    "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+
+  base_ioctx.close();
+  cache_ioctx.close();
+
+  cluster.pool_delete(cache_pool_name.c_str());
+  ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
+TEST(LibRadosTier, Flush) {
+  Rados cluster;
+  std::string base_pool_name = get_temp_pool_name();
+  std::string cache_pool_name = base_pool_name + "-cache";
+  ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+  ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+  IoCtx cache_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+  IoCtx base_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+  // configure cache
+  bufferlist inbl;
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+    "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+    "\", \"mode\": \"writeback\"}",
+    inbl, NULL, NULL));
+
+  // wait for maps to settle
+  cluster.wait_for_latest_osdmap();
+
+  uint64_t user_version = 0;
+
+  // create object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // verify the object is present in the cache tier
+  {
+    ObjectIterator it = cache_ioctx.objects_begin();
+    ASSERT_TRUE(it != cache_ioctx.objects_end());
+    ASSERT_TRUE(it->first == string("foo"));
+    ++it;
+    ASSERT_TRUE(it == cache_ioctx.objects_end());
+  }
+
+  // verify the object is NOT present in the base tier
+  {
+    ObjectIterator it = base_ioctx.objects_begin();
+    ASSERT_TRUE(it == base_ioctx.objects_end());
+  }
+
+  // verify dirty
+  {
+    bool dirty = false;
+    int r = -1;
+    ObjectReadOperation op;
+    op.is_dirty(&dirty, &r);
+    ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
+    ASSERT_TRUE(dirty);
+    ASSERT_EQ(0, r);
+    user_version = cache_ioctx.get_last_version();
+  }
+
+  // flush
+  {
+    ObjectWriteOperation op;
+    op.cache_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY));
+    completion->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    completion->release();
+  }
+
+  // verify clean
+  {
+    bool dirty = false;
+    int r = -1;
+    ObjectReadOperation op;
+    op.is_dirty(&dirty, &r);
+    ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
+    ASSERT_FALSE(dirty);
+    ASSERT_EQ(0, r);
+  }
+
+  // verify in base tier
+  {
+    ObjectIterator it = base_ioctx.objects_begin();
+    ASSERT_TRUE(it != base_ioctx.objects_end());
+    ASSERT_TRUE(it->first == string("foo"));
+    ++it;
+    ASSERT_TRUE(it == base_ioctx.objects_end());
+  }
+
+  // evict it
+  {
+    ObjectWriteOperation op;
+    op.cache_evict();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op, librados::OPERATION_IGNORE_CACHE));
+    completion->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    completion->release();
+  }
+
+  // verify no longer in cache tier
+  {
+    ObjectIterator it = cache_ioctx.objects_begin();
+    ASSERT_TRUE(it == cache_ioctx.objects_end());
+  }
+
+  // read it again and verify the version is consistent
+  {
+    bufferlist bl;
+    ASSERT_EQ(1, cache_ioctx.read("foo", bl, 1, 0));
+    ASSERT_EQ(user_version, cache_ioctx.get_last_version());
+  }
+
+  // erase it
+  {
+    ObjectWriteOperation op;
+    op.remove();
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // flush whiteout
+  {
+    ObjectWriteOperation op;
+    op.cache_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY));
+    completion->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    completion->release();
+  }
+
+  // evict
+  {
+    ObjectWriteOperation op;
+    op.cache_evict();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op, librados::OPERATION_IGNORE_CACHE));
+    completion->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    completion->release();
+  }
+
+  // verify no longer in cache tier
+  {
+    ObjectIterator it = cache_ioctx.objects_begin();
+    ASSERT_TRUE(it == cache_ioctx.objects_end());
+  }
+  // or base tier
+  {
+    ObjectIterator it = base_ioctx.objects_begin();
+    ASSERT_TRUE(it == base_ioctx.objects_end());
+  }
+
+  // tear down tiers
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+    "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+
+  base_ioctx.close();
+  cache_ioctx.close();
+
+  cluster.pool_delete(cache_pool_name.c_str());
+  ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
+TEST(LibRadosTier, FlushWriteRaces) {
+  Rados cluster;
+  std::string base_pool_name = get_temp_pool_name();
+  std::string cache_pool_name = base_pool_name + "-cache";
+  ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+  ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+  IoCtx cache_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+  IoCtx base_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+  // configure cache
+  bufferlist inbl;
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+    "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+    "\", \"mode\": \"writeback\"}",
+    inbl, NULL, NULL));
+
+  // wait for maps to settle
+  cluster.wait_for_latest_osdmap();
+
+  // create/dirty object
+  bufferlist bl;
+  bl.append("hi there");
+  {
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // flush + write
+  {
+    ObjectWriteOperation op;
+    op.cache_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY));
+
+    ObjectWriteOperation op2;
+    op2.write_full(bl);
+    librados::AioCompletion *completion2 = cluster.aio_create_completion();
+    ASSERT_EQ(0, base_ioctx.aio_operate(
+      "foo", completion2, &op2, 0));
+
+    completion->wait_for_safe();
+    completion2->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    ASSERT_EQ(0, completion2->get_return_value());
+    completion->release();
+    completion2->release();
+  }
+
+  // create/dirty object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // try-flush + write
+  {
+    ObjectWriteOperation op;
+    op.cache_try_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY |
+      librados::OPERATION_SKIPRWLOCKS));
+
+    ObjectWriteOperation op2;
+    op2.write_full(bl);
+    librados::AioCompletion *completion2 = cluster.aio_create_completion();
+    ASSERT_EQ(0, base_ioctx.aio_operate(
+      "foo", completion2, &op2, 0));
+
+    completion->wait_for_safe();
+    completion2->wait_for_safe();
+    ASSERT_EQ(-EBUSY, completion->get_return_value());
+    ASSERT_EQ(0, completion2->get_return_value());
+    completion->release();
+    completion2->release();
+  }
+
+  // tear down tiers
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+    "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+
+  base_ioctx.close();
+  cache_ioctx.close();
+
+  cluster.pool_delete(cache_pool_name.c_str());
+  ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
+TEST(LibRadosTier, FlushTryFlushRaces) {
+  Rados cluster;
+  std::string base_pool_name = get_temp_pool_name();
+  std::string cache_pool_name = base_pool_name + "-cache";
+  ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+  ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+  IoCtx cache_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+  IoCtx base_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+  // configure cache
+  bufferlist inbl;
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+    "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+    "\", \"mode\": \"writeback\"}",
+    inbl, NULL, NULL));
+
+  // wait for maps to settle
+  cluster.wait_for_latest_osdmap();
+
+  // create/dirty object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // flush + flush
+  {
+    ObjectWriteOperation op;
+    op.cache_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY));
+
+    ObjectWriteOperation op2;
+    op2.cache_flush();
+    librados::AioCompletion *completion2 = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion2, &op2,
+      librados::OPERATION_IGNORE_OVERLAY));
+
+    completion->wait_for_safe();
+    completion2->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    ASSERT_EQ(0, completion2->get_return_value());
+    completion->release();
+    completion2->release();
+  }
+
+  // create/dirty object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // flush + try-flush
+  {
+    ObjectWriteOperation op;
+    op.cache_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY));
+
+    ObjectWriteOperation op2;
+    op2.cache_try_flush();
+    librados::AioCompletion *completion2 = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion2, &op2,
+      librados::OPERATION_IGNORE_OVERLAY |
+      librados::OPERATION_SKIPRWLOCKS));
+
+    completion->wait_for_safe();
+    completion2->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    ASSERT_EQ(0, completion2->get_return_value());
+    completion->release();
+    completion2->release();
+  }
+
+  // create/dirty object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // try-flush + flush
+  //  (flush will not piggyback on try-flush)
+  {
+    ObjectWriteOperation op;
+    op.cache_try_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY |
+      librados::OPERATION_SKIPRWLOCKS));
+
+    ObjectWriteOperation op2;
+    op2.cache_flush();
+    librados::AioCompletion *completion2 = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion2, &op2,
+      librados::OPERATION_IGNORE_OVERLAY));
+
+    completion->wait_for_safe();
+    completion2->wait_for_safe();
+    ASSERT_EQ(-EBUSY, completion->get_return_value());
+    ASSERT_EQ(0, completion2->get_return_value());
+    completion->release();
+    completion2->release();
+  }
+
+  // create/dirty object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // try-flush + try-flush
+  {
+    ObjectWriteOperation op;
+    op.cache_try_flush();
+    librados::AioCompletion *completion = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion, &op,
+      librados::OPERATION_IGNORE_OVERLAY |
+      librados::OPERATION_SKIPRWLOCKS));
+
+    ObjectWriteOperation op2;
+    op2.cache_try_flush();
+    librados::AioCompletion *completion2 = cluster.aio_create_completion();
+    ASSERT_EQ(0, cache_ioctx.aio_operate(
+      "foo", completion2, &op2,
+      librados::OPERATION_IGNORE_OVERLAY |
+      librados::OPERATION_SKIPRWLOCKS));
+
+    completion->wait_for_safe();
+    completion2->wait_for_safe();
+    ASSERT_EQ(0, completion->get_return_value());
+    ASSERT_EQ(0, completion2->get_return_value());
+    completion->release();
+    completion2->release();
+  }
+
+  // tear down tiers
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+    "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+
+  base_ioctx.close();
+  cache_ioctx.close();
+
+  cluster.pool_delete(cache_pool_name.c_str());
+  ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
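The race matrix pinned down by FlushWriteRaces and FlushTryFlushRaces
above, exactly as asserted:

    //   racing ops               first result   second result
    //   flush     + write        0              0
    //   try-flush + write        -EBUSY         0
    //   flush     + flush        0              0   (second piggybacks)
    //   flush     + try-flush    0              0   (try-flush piggybacks)
    //   try-flush + flush        -EBUSY         0   (flush won't piggyback)
    //   try-flush + try-flush    0              0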
+
+IoCtx *read_ioctx = 0;
+Mutex lock("FlushReadRaces::lock");
+Cond cond;
+int max_reads = 100;
+int num_reads = 0;  // in progress
+
+void flush_read_race_cb(completion_t cb, void *arg);
+
+void start_flush_read()
+{
+  //cout << " starting read" << std::endl;
+  ObjectReadOperation op;
+  op.stat(NULL, NULL, NULL);
+  librados::AioCompletion *completion =
+    librados::Rados::aio_create_completion();
+  completion->set_complete_callback(0, flush_read_race_cb);
+  read_ioctx->aio_operate("foo", completion, &op, NULL);
+}
+
+void flush_read_race_cb(completion_t cb, void *arg)
+{
+  //cout << " finished read" << std::endl;
+  lock.Lock();
+  if (num_reads > max_reads) {
+    num_reads--;
+    cond.Signal();
+  } else {
+    start_flush_read();
+  }
+  // fixme: i'm leaking cb...
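+  // note: num_reads counts in-flight reads; once the test body sets
+  // max_reads = 0, each completing read decrements and signals instead
+  // of re-issuing, letting the test drain the reads to zero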
+  lock.Unlock();
+}
+
+TEST(LibRadosTier, TryFlushReadRace) {
+  Rados cluster;
+  std::string base_pool_name = get_temp_pool_name();
+  std::string cache_pool_name = base_pool_name + "-cache";
+  ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+  ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+  IoCtx cache_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+  IoCtx base_ioctx;
+  ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+  // configure cache
+  bufferlist inbl;
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+    "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+    "\", \"mode\": \"writeback\"}",
+    inbl, NULL, NULL));
+
+  // wait for maps to settle
+  cluster.wait_for_latest_osdmap();
+
+  // create/dirty object
+  {
+    bufferlist bl;
+    bl.append("hi there");
+    bufferptr bp(4000000);  // make it big!
+    bp.zero();
+    bl.append(bp);
+    ObjectWriteOperation op;
+    op.write_full(bl);
+    ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+  }
+
+  // start a continuous stream of reads
+  read_ioctx = &base_ioctx;
+  lock.Lock();
+  for (int i = 0; i < max_reads; ++i) {
+    start_flush_read();
+    num_reads++;
+  }
+  lock.Unlock();
+
+  // try-flush
+  ObjectWriteOperation op;
+  op.cache_try_flush();
+  librados::AioCompletion *completion = cluster.aio_create_completion();
+  ASSERT_EQ(0, cache_ioctx.aio_operate(
+    "foo", completion, &op,
+    librados::OPERATION_IGNORE_OVERLAY |
+    librados::OPERATION_SKIPRWLOCKS));
+
+  completion->wait_for_safe();
+  ASSERT_EQ(0, completion->get_return_value());
+  completion->release();
+
+  // stop reads
+  lock.Lock();
+  max_reads = 0;
+  while (num_reads > 0)
+    cond.Wait(lock);
+  lock.Unlock();
+
+  // tear down tiers
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+    "\"}",
+    inbl, NULL, NULL));
+  ASSERT_EQ(0, cluster.mon_command(
+    "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+    "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+    inbl, NULL, NULL));
+
+  base_ioctx.close();
+  cache_ioctx.close();
+
+  cluster.pool_delete(cache_pool_name.c_str());
+  ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
 TEST(LibRadosTier, HitSetNone) {
   Rados cluster;
   std::string pool_name = get_temp_pool_name();