}
// io blocked on obc?
- if (obc->is_blocked()) {
+ if (obc->is_blocked() &&
+ (m->get_flags() & CEPH_OSD_FLAG_FLUSH) == 0) {
wait_for_blocked_object(obc->obs.oi.soid, op);
return;
}
ctx->obc = obc;
if (m->get_flags() & CEPH_OSD_FLAG_SKIPRWLOCKS) {
dout(20) << __func__ << ": skipping rw locks" << dendl;
+ } else if (m->get_flags() & CEPH_OSD_FLAG_FLUSH) {
+ dout(20) << __func__ << ": part of flush, will ignore write lock" << dendl;
+
+ // verify there is in fact a flush in progress
+ // FIXME: we could make this a stronger test.
+ map<hobject_t,FlushOpRef>::iterator p = flush_ops.find(obc->obs.oi.soid);
+ if (p == flush_ops.end()) {
+ dout(10) << __func__ << " no flush in progress, aborting" << dendl;
+ close_op_ctx(ctx);
+ osd->reply_op_error(op, -EINVAL);
+ return;
+ }
} else if (!get_rw_locks(ctx)) {
+ dout(20) << __func__ << " waiting for rw locks " << dendl;
op->mark_delayed("waiting for rw locks");
close_op_ctx(ctx);
return;
// non user-visible modifications
case CEPH_OSD_OP_WATCH:
case CEPH_OSD_OP_CACHE_EVICT:
+ case CEPH_OSD_OP_CACHE_FLUSH:
+ case CEPH_OSD_OP_CACHE_TRY_FLUSH:
case CEPH_OSD_OP_UNDIRTY:
break;
default:
}
break;
+ case CEPH_OSD_OP_CACHE_TRY_FLUSH:
+ ++ctx->num_write;
+ {
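+ // try-flush must come in with SKIPRWLOCKS (no rw lock held) so
+ // that it does not block reads of the object, in particular the
+ // base tier's copy-get, while the flush is in flight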
+ if (ctx->lock_to_release != OpContext::NONE) {
+ dout(10) << "cache-try-flush without SKIPRWLOCKS flag set" << dendl;
+ result = -EINVAL;
+ break;
+ }
+ if (pool.info.cache_mode == pg_pool_t::CACHEMODE_NONE) {
+ result = -EINVAL;
+ break;
+ }
+ if (!obs.exists) {
+ result = 0;
+ break;
+ }
+ if (oi.is_dirty()) {
+ result = start_flush(ctx, false);
+ } else {
+ result = 0;
+ }
+ }
+ break;
+
+ case CEPH_OSD_OP_CACHE_FLUSH:
+ ++ctx->num_write;
+ {
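+ // a full flush must take the rw locks like any other write so
+ // that racing writes are ordered behind it; SKIPRWLOCKS would
+ // defeat that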
+ if (ctx->lock_to_release == OpContext::NONE) {
+ dout(10) << "cache-flush with SKIPRWLOCKS flag set" << dendl;
+ result = -EINVAL;
+ break;
+ }
+ if (pool.info.cache_mode == pg_pool_t::CACHEMODE_NONE) {
+ result = -EINVAL;
+ break;
+ }
+ if (!obs.exists) {
+ result = 0;
+ break;
+ }
+ if (oi.is_dirty()) {
+ result = start_flush(ctx, true);
+ } else {
+ result = 0;
+ }
+ }
+ break;
+
case CEPH_OSD_OP_CACHE_EVICT:
++ctx->num_write;
{
}
+// ========================================================================
+// flush
+//
+// Flush a dirty object in the cache tier by writing it back to the
+// base tier. The sequence looks like:
+//
+// * send a copy-from operation to the base tier to copy the current
+// version of the object
+// * base tier will pull the object via (perhaps multiple) copy-get(s)
+// * on completion, we check if the object has been modified. if so,
+// just reply with -EBUSY.
+// * try to take a write lock so we can clear the dirty flag. if this
+// fails, wait and retry
+// * start a repop that clears the bit.
+//
+// If we have to wait, we will retry by coming back through the
+// start_flush method. We check if a flush is already in progress
+// and, if so, try to finish it by rechecking the version and trying
+// to clear the dirty bit.
+//
+// In order for the cache-try-flush (a write op) to not block the
+// copy-get from reading the object, the client *must* set the
+// SKIPRWLOCKS flag; a blocking cache-flush instead relies on the
+// CEPH_OSD_FLAG_FLUSH exemption above to let the copy-get through.
+//
+// NOTE: normally writes are strictly ordered for the client, but
+// flushes are special in that they can be reordered with respect to
+// other writes. In particular, we can't have a flush request block
+// an update to the cache pool object!
+
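+// For reference, the client-side sequence exercised by the tests in
+// test/librados/tier.cc below (a usage sketch, not new API surface):
+//
+//   ObjectWriteOperation op;
+//   op.cache_try_flush();
+//   cache_ioctx.aio_operate("foo", completion, &op,
+//                           librados::OPERATION_IGNORE_OVERLAY |
+//                           librados::OPERATION_SKIPRWLOCKS);
+//
+// A blocking flush uses op.cache_flush() and omits
+// OPERATION_SKIPRWLOCKS.
+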
+struct C_Flush : public Context {
+ ReplicatedPGRef pg;
+ hobject_t oid;
+ epoch_t last_peering_reset;
+ tid_t tid;
+ C_Flush(ReplicatedPG *p, hobject_t o, epoch_t lpr)
+ : pg(p), oid(o), last_peering_reset(lpr),
+ tid(0)
+ {}
+ void finish(int r) {
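+ // ignore completions that raced with a peering reset;
+ // cancel_flush_ops() has already cleaned up the FlushOp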
+ pg->lock();
+ if (last_peering_reset == pg->get_last_peering_reset()) {
+ pg->finish_flush(oid, tid, r);
+ }
+ pg->unlock();
+ }
+};
+
+int ReplicatedPG::start_flush(OpContext *ctx, bool blocking)
+{
+ const object_info_t& oi = ctx->obc->obs.oi;
+ const hobject_t& soid = oi.soid;
+ dout(10) << __func__ << " " << soid
+ << " v" << oi.version
+ << " uv" << oi.user_version
+ << " " << (blocking ? "blocking" : "non-blocking/best-effort")
+ << dendl;
+
+ if (blocking)
+ ctx->obc->start_block();
+
+ map<hobject_t,FlushOpRef>::iterator p = flush_ops.find(soid);
+ if (p != flush_ops.end()) {
+ FlushOpRef fop = p->second;
+ if (fop->ctx->op == ctx->op) {
+ // we couldn't take the write lock on a cache-try-flush before;
+ // now we are trying again for the lock.
+ close_op_ctx(fop->ctx); // clean up the previous ctx and use the new one.
+ fop->ctx = ctx;
+ return try_flush_mark_clean(fop);
+ }
+ if (fop->flushed_version == ctx->obc->obs.oi.user_version &&
+ (fop->blocking || !blocking)) {
+ // nonblocking can join anything
+ // blocking can only join a blocking flush
+ dout(20) << __func__ << " piggybacking on existing flush " << dendl;
+ fop->dup_ops.push_back(ctx->op);
+ return -EAGAIN; // clean up this ctx; op will retry later
+ }
+
+ // cancel the current flush: either its flushed_version no longer
+ // matches the object (so it will fail anyway), or we are blocking
+ // and the existing flush is nonblocking.
+ dout(20) << __func__ << " canceling previous flush; it will fail" << dendl;
+ if (fop->ctx->op)
+ osd->reply_op_error(fop->ctx->op, -EBUSY);
+ while (!fop->dup_ops.empty()) {
+ osd->reply_op_error(fop->dup_ops.front(), -EBUSY);
+ fop->dup_ops.pop_front();
+ }
+ cancel_flush(fop, false);
+ }
+
+ FlushOpRef fop(new FlushOp);
+ fop->ctx = ctx;
+ fop->flushed_version = oi.user_version;
+ fop->blocking = blocking;
+
+ ObjectOperation o;
+ if (oi.is_whiteout()) {
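+ // a whiteout means the object has been deleted in the cache tier;
+ // flushing it propagates the delete to the base tier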
+ fop->removal = true;
+ o.remove();
+ } else {
+ object_locator_t oloc(soid);
+ o.copy_from(soid.oid.name, soid.snap, oloc, oi.user_version,
+ CEPH_OSD_COPY_FROM_FLAG_FLUSH |
+ CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY |
+ CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE);
+ }
+ C_Flush *fin = new C_Flush(this, soid, get_last_peering_reset());
+ object_locator_t base_oloc(soid);
+ base_oloc.pool = pool.info.tier_of;
+ SnapContext snapc; // FIXME
+
+ osd->objecter_lock.Lock();
+ tid_t tid = osd->objecter->mutate(soid.oid, base_oloc, o, snapc, oi.mtime,
+ CEPH_OSD_FLAG_IGNORE_OVERLAY,
+ NULL,
+ new C_OnFinisher(fin,
+ &osd->objecter_finisher));
+ fin->tid = tid;
+ fop->objecter_tid = tid;
+ osd->objecter_lock.Unlock();
+
+ flush_ops[soid] = fop;
+ return -EINPROGRESS;
+}
+
+void ReplicatedPG::finish_flush(hobject_t oid, tid_t tid, int r)
+{
+ dout(10) << __func__ << " " << oid << " tid " << tid
+ << " " << cpp_strerror(r) << dendl;
+ map<hobject_t,FlushOpRef>::iterator p = flush_ops.find(oid);
+ if (p == flush_ops.end()) {
+ dout(10) << __func__ << " no flush_op found" << dendl;
+ return;
+ }
+ FlushOpRef fop = p->second;
+ if (tid != fop->objecter_tid) {
+ dout(10) << __func__ << " tid " << tid << " != fop " << fop
+ << " tid " << fop->objecter_tid << dendl;
+ return;
+ }
+ ObjectContextRef obc = fop->ctx->obc;
+ fop->objecter_tid = 0;
+
+ if (r < 0 && !(r == -ENOENT && fop->removal)) {
+ reply_ctx(fop->ctx, -EBUSY, obc->obs.oi.version,
+ obc->obs.oi.user_version);
+ if (!fop->dup_ops.empty()) {
+ dout(20) << __func__ << " requeueing dups" << dendl;
+ requeue_ops(fop->dup_ops);
+ }
+ flush_ops.erase(oid);
+ return;
+ }
+
+ r = try_flush_mark_clean(fop);
+ if (r == -EBUSY) {
+ reply_ctx(fop->ctx, -EBUSY, obc->obs.oi.version,
+ obc->obs.oi.user_version);
+ }
+}
+
+int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
+{
+ ObjectContextRef obc = fop->ctx->obc;
+ const hobject_t& oid = obc->obs.oi.soid;
+
+ if (fop->flushed_version != obc->obs.oi.user_version) {
+ dout(10) << __func__ << " flushed_version " << fop->flushed_version
+ << " != current " << obc->obs.oi.user_version
+ << dendl;
+ if (!fop->dup_ops.empty()) {
+ dout(20) << __func__ << " requeueing dups" << dendl;
+ requeue_ops(fop->dup_ops);
+ }
+ if (fop->blocking) {
+ obc->stop_block();
+ kick_object_context_blocked(obc);
+ }
+ flush_ops.erase(oid);
+ return -EBUSY;
+ }
+
+ // successfully flushed; can we clear the dirty bit?
+ if (!fop->blocking) {
+ // non-blocking: the op came in with SKIPRWLOCKS, so we hold no
+ // lock yet; try to take the write lock manually.
+ dout(20) << __func__ << " taking write lock" << dendl;
+ if (!obc->get_write(fop->ctx->op)) {
+ dout(10) << __func__ << " waiting on lock" << dendl;
+ return -EINPROGRESS; // will retry. this ctx is still alive!
+ }
+ } else {
+ dout(20) << __func__ << " already holding write lock: "
+ << obc->rwstate << dendl;
+ assert(obc->rwstate.state == ObjectContext::RWState::RWWRITE);
+ assert(fop->ctx->lock_to_release == OpContext::W_LOCK);
+
+ // let other writes continue
+ obc->stop_block();
+ kick_object_context_blocked(obc);
+ }
+
+ dout(10) << __func__ << " clearing DIRTY flag for " << oid << dendl;
+ tid_t rep_tid = osd->get_tid();
+ RepGather *repop = new_repop(fop->ctx, obc, rep_tid);
+ OpContext *ctx = fop->ctx;
+ if (!fop->blocking) {
+ ctx->lock_to_release = OpContext::W_LOCK; // we took it above
+ }
+ ctx->at_version = get_next_version();
+
+ ctx->new_obs = obc->obs;
+ ctx->new_obs.oi.clear_flag(object_info_t::FLAG_DIRTY);
+
+ finish_ctx(ctx);
+
+ if (!fop->dup_ops.empty()) {
+ dout(20) << __func__ << " queueing dups for " << ctx->at_version << dendl;
+ list<OpRequestRef>& ls = waiting_for_ondisk[ctx->at_version];
+ ls.splice(ls.end(), fop->dup_ops);
+ }
+
+ simple_repop_submit(repop);
+
+ flush_ops.erase(oid);
+ return -EINPROGRESS;
+}
+
+void ReplicatedPG::cancel_flush(FlushOpRef fop, bool requeue)
+{
+ dout(10) << __func__ << " " << fop->ctx->obc->obs.oi.soid << " tid "
+ << fop->objecter_tid << dendl;
+ if (fop->objecter_tid) {
+ Mutex::Locker l(osd->objecter_lock);
+ osd->objecter->op_cancel(fop->objecter_tid);
+ }
+ if (fop->ctx->op && requeue) {
+ requeue_op(fop->ctx->op);
+ requeue_ops(fop->dup_ops);
+ }
+ if (fop->blocking) {
+ fop->ctx->obc->stop_block();
+ kick_object_context_blocked(fop->ctx->obc);
+ }
+ flush_ops.erase(fop->ctx->obc->obs.oi.soid);
+ close_op_ctx(fop->ctx);
+}
+
+void ReplicatedPG::cancel_flush_ops(bool requeue)
+{
+ dout(10) << __func__ << dendl;
+ map<hobject_t,FlushOpRef>::iterator p = flush_ops.begin();
+ while (p != flush_ops.end()) {
+ cancel_flush((p++)->second, requeue);
+ }
+}
+
// ========================================================================
// rep op gather
void ReplicatedPG::remove_repop(RepGather *repop)
{
+ dout(20) << __func__ << " " << *repop << dendl;
release_op_ctx_locks(repop->ctx);
repop_map.erase(repop->rep_tid);
repop->put();
pg_log_entry_t::LOST_REVERT));
ObjectContextRef obc = object_contexts.lookup(soid);
if (obc) {
- dout(10) << "get_object_context " << obc << " " << soid << dendl;
+ dout(10) << "get_object_context " << obc << " " << soid
+ << " " << obc->rwstate << dendl;
} else {
// check disk
bufferlist bv;
register_snapset_context(obc->ssc);
populate_obc_watchers(obc);
- dout(10) << "get_object_context " << obc << " " << soid << " 0 -> 1 read " << obc->obs.oi << dendl;
+ dout(10) << "get_object_context " << obc << " " << soid
+ << " " << obc->rwstate
+ << " 0 -> 1 read " << obc->obs.oi << dendl;
}
return obc;
}
unreg_next_scrub();
cancel_copy_ops(false);
+ cancel_flush_ops(false);
apply_and_flush_repops(false);
context_registry_on_change();
context_registry_on_change();
cancel_copy_ops(is_primary());
+ cancel_flush_ops(is_primary());
// requeue object waiters
if (is_primary()) {
#include "global/global_init.h"
#include "common/ceph_argparse.h"
#include "common/common_init.h"
+#include "common/Cond.h"
#include "test/librados/test.h"
#include "json_spirit/json_spirit.h"
ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
}
+TEST(LibRadosTier, TryFlush) {
+ Rados cluster;
+ std::string base_pool_name = get_temp_pool_name();
+ std::string cache_pool_name = base_pool_name + "-cache";
+ ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+ ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+ IoCtx cache_ioctx;
+ ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+ IoCtx base_ioctx;
+ ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+ // configure cache
+ bufferlist inbl;
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+ "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+ "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+ "\", \"mode\": \"writeback\"}",
+ inbl, NULL, NULL));
+
+ // wait for maps to settle
+ cluster.wait_for_latest_osdmap();
+
+ // create object
+ {
+ bufferlist bl;
+ bl.append("hi there");
+ ObjectWriteOperation op;
+ op.write_full(bl);
+ ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+ }
+
+ // verify the object is present in the cache tier
+ {
+ ObjectIterator it = cache_ioctx.objects_begin();
+ ASSERT_TRUE(it != cache_ioctx.objects_end());
+ ASSERT_TRUE(it->first == string("foo"));
+ ++it;
+ ASSERT_TRUE(it == cache_ioctx.objects_end());
+ }
+
+ // verify the object is NOT present in the base tier
+ {
+ ObjectIterator it = base_ioctx.objects_begin();
+ ASSERT_TRUE(it == base_ioctx.objects_end());
+ }
+
+ // verify dirty
+ {
+ bool dirty = false;
+ int r = -1;
+ ObjectReadOperation op;
+ op.is_dirty(&dirty, &r);
+ ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
+ ASSERT_TRUE(dirty);
+ ASSERT_EQ(0, r);
+ }
+
+ // flush
+ {
+ ObjectWriteOperation op;
+ op.cache_try_flush();
+ librados::AioCompletion *completion = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion, &op,
+ librados::OPERATION_IGNORE_OVERLAY |
+ librados::OPERATION_SKIPRWLOCKS));
+ completion->wait_for_safe();
+ ASSERT_EQ(0, completion->get_return_value());
+ completion->release();
+ }
+
+ // verify clean
+ {
+ bool dirty = false;
+ int r = -1;
+ ObjectReadOperation op;
+ op.is_dirty(&dirty, &r);
+ ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
+ ASSERT_FALSE(dirty);
+ ASSERT_EQ(0, r);
+ }
+
+ // verify in base tier
+ {
+ ObjectIterator it = base_ioctx.objects_begin();
+ ASSERT_TRUE(it != base_ioctx.objects_end());
+ ASSERT_TRUE(it->first == string("foo"));
+ ++it;
+ ASSERT_TRUE(it == base_ioctx.objects_end());
+ }
+
+ // evict it
+ {
+ ObjectWriteOperation op;
+ op.cache_evict();
+ librados::AioCompletion *completion = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion, &op, librados::OPERATION_IGNORE_CACHE));
+ completion->wait_for_safe();
+ ASSERT_EQ(0, completion->get_return_value());
+ completion->release();
+ }
+
+ // verify no longer in cache tier
+ {
+ ObjectIterator it = cache_ioctx.objects_begin();
+ ASSERT_TRUE(it == cache_ioctx.objects_end());
+ }
+
+ // tear down tiers
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+ "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+ "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+
+ base_ioctx.close();
+ cache_ioctx.close();
+
+ cluster.pool_delete(cache_pool_name.c_str());
+ ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
+TEST(LibRadosTier, Flush) {
+ Rados cluster;
+ std::string base_pool_name = get_temp_pool_name();
+ std::string cache_pool_name = base_pool_name + "-cache";
+ ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+ ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+ IoCtx cache_ioctx;
+ ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+ IoCtx base_ioctx;
+ ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+ // configure cache
+ bufferlist inbl;
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+ "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+ "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+ "\", \"mode\": \"writeback\"}",
+ inbl, NULL, NULL));
+
+ // wait for maps to settle
+ cluster.wait_for_latest_osdmap();
+
+ uint64_t user_version = 0;
+
+ // create object
+ {
+ bufferlist bl;
+ bl.append("hi there");
+ ObjectWriteOperation op;
+ op.write_full(bl);
+ ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+ }
+
+ // verify the object is present in the cache tier
+ {
+ ObjectIterator it = cache_ioctx.objects_begin();
+ ASSERT_TRUE(it != cache_ioctx.objects_end());
+ ASSERT_TRUE(it->first == string("foo"));
+ ++it;
+ ASSERT_TRUE(it == cache_ioctx.objects_end());
+ }
+
+ // verify the object is NOT present in the base tier
+ {
+ ObjectIterator it = base_ioctx.objects_begin();
+ ASSERT_TRUE(it == base_ioctx.objects_end());
+ }
+
+ // verify dirty
+ {
+ bool dirty = false;
+ int r = -1;
+ ObjectReadOperation op;
+ op.is_dirty(&dirty, &r);
+ ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
+ ASSERT_TRUE(dirty);
+ ASSERT_EQ(0, r);
+ user_version = cache_ioctx.get_last_version();
+ }
+
+ // flush
+ {
+ ObjectWriteOperation op;
+ op.cache_flush();
+ librados::AioCompletion *completion = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion, &op,
+ librados::OPERATION_IGNORE_OVERLAY));
+ completion->wait_for_safe();
+ ASSERT_EQ(0, completion->get_return_value());
+ completion->release();
+ }
+
+ // verify clean
+ {
+ bool dirty = false;
+ int r = -1;
+ ObjectReadOperation op;
+ op.is_dirty(&dirty, &r);
+ ASSERT_EQ(0, cache_ioctx.operate("foo", &op, NULL));
+ ASSERT_FALSE(dirty);
+ ASSERT_EQ(0, r);
+ }
+
+ // verify in base tier
+ {
+ ObjectIterator it = base_ioctx.objects_begin();
+ ASSERT_TRUE(it != base_ioctx.objects_end());
+ ASSERT_TRUE(it->first == string("foo"));
+ ++it;
+ ASSERT_TRUE(it == base_ioctx.objects_end());
+ }
+
+ // evict it
+ {
+ ObjectWriteOperation op;
+ op.cache_evict();
+ librados::AioCompletion *completion = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion, &op, librados::OPERATION_IGNORE_CACHE));
+ completion->wait_for_safe();
+ ASSERT_EQ(0, completion->get_return_value());
+ completion->release();
+ }
+
+ // verify no longer in cache tier
+ {
+ ObjectIterator it = cache_ioctx.objects_begin();
+ ASSERT_TRUE(it == cache_ioctx.objects_end());
+ }
+
+ // read it again and verify the version is consistent
+ {
+ bufferlist bl;
+ ASSERT_EQ(1, cache_ioctx.read("foo", bl, 1, 0));
+ ASSERT_EQ(user_version, cache_ioctx.get_last_version());
+ }
+
+ // erase it
+ {
+ ObjectWriteOperation op;
+ op.remove();
+ ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+ }
+
+ // flush whiteout
+ {
+ ObjectWriteOperation op;
+ op.cache_flush();
+ librados::AioCompletion *completion = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion, &op,
+ librados::OPERATION_IGNORE_OVERLAY));
+ completion->wait_for_safe();
+ ASSERT_EQ(0, completion->get_return_value());
+ completion->release();
+ }
+
+ // evict
+ {
+ ObjectWriteOperation op;
+ op.cache_evict();
+ librados::AioCompletion *completion = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion, &op, librados::OPERATION_IGNORE_CACHE));
+ completion->wait_for_safe();
+ ASSERT_EQ(0, completion->get_return_value());
+ completion->release();
+ }
+
+ // verify no longer in cache tier
+ {
+ ObjectIterator it = cache_ioctx.objects_begin();
+ ASSERT_TRUE(it == cache_ioctx.objects_end());
+ }
+ // or base tier
+ {
+ ObjectIterator it = base_ioctx.objects_begin();
+ ASSERT_TRUE(it == base_ioctx.objects_end());
+ }
+
+ // tear down tiers
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+ "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+ "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+
+ base_ioctx.close();
+ cache_ioctx.close();
+
+ cluster.pool_delete(cache_pool_name.c_str());
+ ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
+TEST(LibRadosTier, FlushWriteRaces) {
+ Rados cluster;
+ std::string base_pool_name = get_temp_pool_name();
+ std::string cache_pool_name = base_pool_name + "-cache";
+ ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+ ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+ IoCtx cache_ioctx;
+ ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+ IoCtx base_ioctx;
+ ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+ // configure cache
+ bufferlist inbl;
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+ "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+ "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+ "\", \"mode\": \"writeback\"}",
+ inbl, NULL, NULL));
+
+ // wait for maps to settle
+ cluster.wait_for_latest_osdmap();
+
+ // create/dirty object
+ bufferlist bl;
+ bl.append("hi there");
+ {
+ ObjectWriteOperation op;
+ op.write_full(bl);
+ ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+ }
+
+ // flush + write
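+ // the blocking flush holds the write lock and blocks the object,
+ // so the racing write simply waits for it; both should succeed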
+ {
+ ObjectWriteOperation op;
+ op.cache_flush();
+ librados::AioCompletion *completion = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion, &op,
+ librados::OPERATION_IGNORE_OVERLAY));
+
+ ObjectWriteOperation op2;
+ op2.write_full(bl);
+ librados::AioCompletion *completion2 = cluster.aio_create_completion();
+ ASSERT_EQ(0, base_ioctx.aio_operate(
+ "foo", completion2, &op2, 0));
+
+ completion->wait_for_safe();
+ completion2->wait_for_safe();
+ ASSERT_EQ(0, completion->get_return_value());
+ ASSERT_EQ(0, completion2->get_return_value());
+ completion->release();
+ completion2->release();
+ }
+
+ // create/dirty object
+ {
+ bufferlist bl;
+ bl.append("hi there");
+ ObjectWriteOperation op;
+ op.write_full(bl);
+ ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+ }
+
+ // try-flush + write
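+ // the non-blocking try-flush holds no lock, so the racing write
+ // lands mid-flush; the version check in try_flush_mark_clean then
+ // fails the flush with -EBUSY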
+ {
+ ObjectWriteOperation op;
+ op.cache_try_flush();
+ librados::AioCompletion *completion = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion, &op,
+ librados::OPERATION_IGNORE_OVERLAY |
+ librados::OPERATION_SKIPRWLOCKS));
+
+ ObjectWriteOperation op2;
+ op2.write_full(bl);
+ librados::AioCompletion *completion2 = cluster.aio_create_completion();
+ ASSERT_EQ(0, base_ioctx.aio_operate(
+ "foo", completion2, &op2, 0));
+
+ completion->wait_for_safe();
+ completion2->wait_for_safe();
+ ASSERT_EQ(-EBUSY, completion->get_return_value());
+ ASSERT_EQ(0, completion2->get_return_value());
+ completion->release();
+ completion2->release();
+ }
+
+ // tear down tiers
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+ "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+ "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+
+ base_ioctx.close();
+ cache_ioctx.close();
+
+ cluster.pool_delete(cache_pool_name.c_str());
+ ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
+TEST(LibRadosTier, FlushTryFlushRaces) {
+ Rados cluster;
+ std::string base_pool_name = get_temp_pool_name();
+ std::string cache_pool_name = base_pool_name + "-cache";
+ ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+ ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+ IoCtx cache_ioctx;
+ ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+ IoCtx base_ioctx;
+ ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+ // configure cache
+ bufferlist inbl;
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+ "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+ "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+ "\", \"mode\": \"writeback\"}",
+ inbl, NULL, NULL));
+
+ // wait for maps to settle
+ cluster.wait_for_latest_osdmap();
+
+ // create/dirty object
+ {
+ bufferlist bl;
+ bl.append("hi there");
+ ObjectWriteOperation op;
+ op.write_full(bl);
+ ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+ }
+
+ // flush + flush
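+ // the second flush piggybacks on the one already in progress
+ // (see start_flush); both succeed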
+ {
+ ObjectWriteOperation op;
+ op.cache_flush();
+ librados::AioCompletion *completion = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion, &op,
+ librados::OPERATION_IGNORE_OVERLAY));
+
+ ObjectWriteOperation op2;
+ op2.cache_flush();
+ librados::AioCompletion *completion2 = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion2, &op2,
+ librados::OPERATION_IGNORE_OVERLAY));
+
+ completion->wait_for_safe();
+ completion2->wait_for_safe();
+ ASSERT_EQ(0, completion->get_return_value());
+ ASSERT_EQ(0, completion2->get_return_value());
+ completion->release();
+ completion2->release();
+ }
+
+ // create/dirty object
+ {
+ bufferlist bl;
+ bl.append("hi there");
+ ObjectWriteOperation op;
+ op.write_full(bl);
+ ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+ }
+
+ // flush + try-flush
+ {
+ ObjectWriteOperation op;
+ op.cache_flush();
+ librados::AioCompletion *completion = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion, &op,
+ librados::OPERATION_IGNORE_OVERLAY));
+
+ ObjectWriteOperation op2;
+ op2.cache_try_flush();
+ librados::AioCompletion *completion2 = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion2, &op2,
+ librados::OPERATION_IGNORE_OVERLAY |
+ librados::OPERATION_SKIPRWLOCKS));
+
+ completion->wait_for_safe();
+ completion2->wait_for_safe();
+ ASSERT_EQ(0, completion->get_return_value());
+ ASSERT_EQ(0, completion2->get_return_value());
+ completion->release();
+ completion2->release();
+ }
+
+ // create/dirty object
+ {
+ bufferlist bl;
+ bl.append("hi there");
+ ObjectWriteOperation op;
+ op.write_full(bl);
+ ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+ }
+
+ // try-flush + flush
+ // (flush will not piggyback on try-flush: it cancels the
+ // in-progress try-flush, which therefore returns -EBUSY)
+ {
+ ObjectWriteOperation op;
+ op.cache_try_flush();
+ librados::AioCompletion *completion = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion, &op,
+ librados::OPERATION_IGNORE_OVERLAY |
+ librados::OPERATION_SKIPRWLOCKS));
+
+ ObjectWriteOperation op2;
+ op2.cache_flush();
+ librados::AioCompletion *completion2 = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion2, &op2,
+ librados::OPERATION_IGNORE_OVERLAY));
+
+ completion->wait_for_safe();
+ completion2->wait_for_safe();
+ ASSERT_EQ(-EBUSY, completion->get_return_value());
+ ASSERT_EQ(0, completion2->get_return_value());
+ completion->release();
+ completion2->release();
+ }
+
+ // create/dirty object
+ {
+ bufferlist bl;
+ bl.append("hi there");
+ ObjectWriteOperation op;
+ op.write_full(bl);
+ ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+ }
+
+ // try-flush + try-flush
+ {
+ ObjectWriteOperation op;
+ op.cache_try_flush();
+ librados::AioCompletion *completion = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion, &op,
+ librados::OPERATION_IGNORE_OVERLAY |
+ librados::OPERATION_SKIPRWLOCKS));
+
+ ObjectWriteOperation op2;
+ op2.cache_try_flush();
+ librados::AioCompletion *completion2 = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion2, &op2,
+ librados::OPERATION_IGNORE_OVERLAY |
+ librados::OPERATION_SKIPRWLOCKS));
+
+ completion->wait_for_safe();
+ completion2->wait_for_safe();
+ ASSERT_EQ(0, completion->get_return_value());
+ ASSERT_EQ(0, completion2->get_return_value());
+ completion->release();
+ completion2->release();
+ }
+
+ // tear down tiers
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+ "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+ "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+
+ base_ioctx.close();
+ cache_ioctx.close();
+
+ cluster.pool_delete(cache_pool_name.c_str());
+ ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
+
+IoCtx *read_ioctx = 0;
+Mutex lock("FlushReadRaces::lock");
+Cond cond;
+int max_reads = 100;
+int num_reads = 0; // in progress
+
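+// Maintain a steady stream of stat reads on "foo" while a try-flush
+// is in flight: each completion either starts another read or, once
+// max_reads drops to 0, retires itself and signals the waiting test
+// thread.
+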
+void flush_read_race_cb(completion_t cb, void *arg);
+
+void start_flush_read()
+{
+ //cout << " starting read" << std::endl;
+ ObjectReadOperation op;
+ op.stat(NULL, NULL, NULL);
+ librados::AioCompletion *completion =
+ librados::Rados::aio_create_completion();
+ completion->set_complete_callback(0, flush_read_race_cb);
+ read_ioctx->aio_operate("foo", completion, &op, NULL);
+}
+
+void flush_read_race_cb(completion_t cb, void *arg)
+{
+ //cout << " finished read" << std::endl;
+ lock.Lock();
+ if (num_reads > max_reads) {
+ num_reads--;
+ cond.Signal();
+ } else {
+ start_flush_read();
+ }
+ // fixme: i'm leaking cb...
+ lock.Unlock();
+}
+
+TEST(LibRadosTier, TryFlushReadRace) {
+ Rados cluster;
+ std::string base_pool_name = get_temp_pool_name();
+ std::string cache_pool_name = base_pool_name + "-cache";
+ ASSERT_EQ("", create_one_pool_pp(base_pool_name, cluster));
+ ASSERT_EQ(0, cluster.pool_create(cache_pool_name.c_str()));
+ IoCtx cache_ioctx;
+ ASSERT_EQ(0, cluster.ioctx_create(cache_pool_name.c_str(), cache_ioctx));
+ IoCtx base_ioctx;
+ ASSERT_EQ(0, cluster.ioctx_create(base_pool_name.c_str(), base_ioctx));
+
+ // configure cache
+ bufferlist inbl;
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier add\", \"pool\": \"" + base_pool_name +
+ "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier set-overlay\", \"pool\": \"" + base_pool_name +
+ "\", \"overlaypool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier cache-mode\", \"pool\": \"" + cache_pool_name +
+ "\", \"mode\": \"writeback\"}",
+ inbl, NULL, NULL));
+
+ // wait for maps to settle
+ cluster.wait_for_latest_osdmap();
+
+ // create/dirty object
+ {
+ bufferlist bl;
+ bl.append("hi there");
+ bufferptr bp(4000000); // make it big!
+ bp.zero();
+ bl.append(bp);
+ ObjectWriteOperation op;
+ op.write_full(bl);
+ ASSERT_EQ(0, base_ioctx.operate("foo", &op));
+ }
+
+ // start a continuous stream of reads
+ read_ioctx = &base_ioctx;
+ lock.Lock();
+ for (int i = 0; i < max_reads; ++i) {
+ start_flush_read();
+ num_reads++;
+ }
+ lock.Unlock();
+
+ // try-flush
+ ObjectWriteOperation op;
+ op.cache_try_flush();
+ librados::AioCompletion *completion = cluster.aio_create_completion();
+ ASSERT_EQ(0, cache_ioctx.aio_operate(
+ "foo", completion, &op,
+ librados::OPERATION_IGNORE_OVERLAY |
+ librados::OPERATION_SKIPRWLOCKS));
+
+ completion->wait_for_safe();
+ ASSERT_EQ(0, completion->get_return_value());
+ completion->release();
+
+ // stop reads
+ lock.Lock();
+ max_reads = 0;
+ while (num_reads > 0)
+ cond.Wait(lock);
+ lock.Unlock();
+
+ // tear down tiers
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier remove-overlay\", \"pool\": \"" + base_pool_name +
+ "\"}",
+ inbl, NULL, NULL));
+ ASSERT_EQ(0, cluster.mon_command(
+ "{\"prefix\": \"osd tier remove\", \"pool\": \"" + base_pool_name +
+ "\", \"tierpool\": \"" + cache_pool_name + "\"}",
+ inbl, NULL, NULL));
+
+ base_ioctx.close();
+ cache_ioctx.close();
+
+ cluster.pool_delete(cache_pool_name.c_str());
+ ASSERT_EQ(0, destroy_one_pool_pp(base_pool_name, cluster));
+}
+
TEST(LibRadosTier, HitSetNone) {
Rados cluster;
std::string pool_name = get_temp_pool_name();