]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: replace simple_repop_.* with simple_opc_.*
authorSamuel Just <sjust@redhat.com>
Wed, 11 Nov 2015 20:40:16 +0000 (12:40 -0800)
committerSamuel Just <sjust@redhat.com>
Thu, 25 Feb 2016 18:56:40 +0000 (10:56 -0800)
This way, we don't expose the RepGather structure
up into the users (which pretty much exclusively
use the repop->ctx member anyway).  This will pave
the way to removing RepGather::ctx.

Part of this involves generalizing the repop callback
members (queue_snap_trimmer and on_applied) to
on_applied, on_committed, on_success, and on_finish
on both OpContext and RepGather.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 6f5a91dac124e69033bb362b1b9b575ef950a079..d383058997fae8c82129722e159e8040ca8e8543 100644 (file)
@@ -3231,7 +3231,7 @@ void ReplicatedPG::do_backfill(OpRequestRef op)
   }
 }
 
-ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
+ReplicatedPG::OpContextUPtr ReplicatedPG::trim_object(const hobject_t &coid)
 {
   // load clone info
   bufferlist bl;
@@ -3298,12 +3298,12 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
     }
   }
 
-  RepGather *repop = simple_repop_create(obc);
-  OpContext *ctx = repop->ctx;
+  OpContextUPtr ctx = simple_opc_create(obc);
   ctx->snapset_obc = snapset_obc;
   ctx->lock_to_release = OpContext::W_LOCK;
   ctx->release_snapset_obc = true;
   ctx->at_version = get_next_version();
+
   PGBackend::PGTransaction *t = ctx->op_t;
  
   if (new_snaps.empty()) {
@@ -3386,7 +3386,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
     coi.version = ctx->at_version;
     bl.clear();
     ::encode(coi, bl);
-    setattr_maybe_cache(ctx->obc, ctx, t, OI_ATTR, bl);
+    setattr_maybe_cache(ctx->obc, ctx.get(), t, OI_ATTR, bl);
 
     ctx->log.push_back(
       pg_log_entry_t(
@@ -3470,7 +3470,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
     bl.clear();
     ::encode(ctx->snapset_obc->obs.oi, bl);
     attrs[OI_ATTR].claim(bl);
-    setattrs_maybe_cache(ctx->snapset_obc, ctx, t, attrs);
+    setattrs_maybe_cache(ctx->snapset_obc, ctx.get(), t, attrs);
 
     if (pool.info.require_rollback()) {
       set<string> changing;
@@ -3482,7 +3482,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
     }
   }
 
-  return repop;
+  return ctx;
 }
 
 void ReplicatedPG::snap_trimmer(epoch_t queued)
@@ -7145,12 +7145,12 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
       dout(20) << __func__ << " using temp " << cop->results.temp_oid << dendl;
     }
     ObjectContextRef tempobc = get_object_context(cop->results.temp_oid, true);
-    RepGather *repop = simple_repop_create(tempobc);
+    OpContextUPtr ctx = simple_opc_create(tempobc);
     if (cop->temp_cursor.is_initial()) {
-      repop->ctx->new_temp_oid = cop->results.temp_oid;
+      ctx->new_temp_oid = cop->results.temp_oid;
     }
-    _write_copy_chunk(cop, repop->ctx->op_t);
-    simple_repop_submit(repop);
+    _write_copy_chunk(cop, ctx->op_t);
+    simple_opc_submit(std::move(ctx));
     dout(10) << __func__ << " fetching more" << dendl;
     _copy_some(cobc, cop);
     return;
@@ -7453,9 +7453,9 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results,
     dout(10) << __func__ << " abort; will clean up partial work" << dendl;
     ObjectContextRef tempobc = get_object_context(results->temp_oid, false);
     assert(tempobc);
-    RepGather *repop = simple_repop_create(tempobc);
-    repop->ctx->op_t->remove(results->temp_oid);
-    simple_repop_submit(repop);
+    OpContextUPtr ctx = simple_opc_create(tempobc);
+    ctx->op_t->remove(results->temp_oid);
+    simple_opc_submit(std::move(ctx));
     results->started_temp_obj = false;
   }
 
@@ -7467,8 +7467,8 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results,
     hobject_t head(soid.get_head());
     ObjectContextRef obc = get_object_context(head, false);
     assert(obc);
-    RepGather *repop = simple_repop_create(obc);
-    OpContext *tctx = repop->ctx;
+
+    OpContextUPtr tctx = simple_opc_create(obc);
     tctx->at_version = get_next_version();
     filter_snapc(tctx->new_snapset.snaps);
     vector<snapid_t> new_clones;
@@ -7489,9 +7489,9 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results,
     tctx->lock_to_release = OpContext::W_LOCK;
     dout(20) << __func__ << " took lock on obc, " << obc->rwstate << dendl;
 
-    finish_ctx(tctx, pg_log_entry_t::PROMOTE);
+    finish_ctx(tctx.get(), pg_log_entry_t::PROMOTE);
 
-    simple_repop_submit(repop);
+    simple_opc_submit(std::move(tctx));
     return;
   }
 
@@ -7519,8 +7519,7 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results,
     return;
   }
 
-  RepGather *repop = simple_repop_create(obc);
-  OpContext *tctx = repop->ctx;
+  OpContextUPtr tctx =  simple_opc_create(obc);
   tctx->at_version = get_next_version();
 
   ++tctx->delta_stats.num_objects;
@@ -7588,9 +7587,9 @@ void ReplicatedPG::finish_promote(int r, CopyResults *results,
   tctx->lock_to_release = OpContext::W_LOCK;
   dout(20) << __func__ << " took lock on obc, " << obc->rwstate << dendl;
 
-  finish_ctx(tctx, pg_log_entry_t::PROMOTE);
+  finish_ctx(tctx.get(), pg_log_entry_t::PROMOTE);
 
-  simple_repop_submit(repop);
+  simple_opc_submit(std::move(tctx));
 
   osd->logger->inc(l_osd_tier_promote);
 
@@ -8020,8 +8019,7 @@ int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
   }
 
   dout(10) << __func__ << " clearing DIRTY flag for " << oid << dendl;
-  RepGather *repop = simple_repop_create(fop->obc);
-  OpContext *ctx = repop->ctx;
+  OpContextUPtr ctx = simple_opc_create(fop->obc);
 
   ctx->on_finish = fop->on_flush;
   fop->on_flush = NULL;
@@ -8033,7 +8031,7 @@ int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
   ctx->new_obs.oi.clear_flag(object_info_t::FLAG_DIRTY);
   --ctx->delta_stats.num_objects_dirty;
 
-  finish_ctx(ctx, pg_log_entry_t::CLEAN);
+  finish_ctx(ctx.get(), pg_log_entry_t::CLEAN);
 
   osd->logger->inc(l_osd_tier_clean);
 
@@ -8046,7 +8044,7 @@ int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
     requeue_ops(ls);
   }
 
-  simple_repop_submit(repop);
+  simple_opc_submit(std::move(ctx));
 
   flush_ops.erase(oid);
 
@@ -8124,10 +8122,6 @@ void ReplicatedPG::repop_all_applied(RepGather *repop)
   repop->all_applied = true;
   if (!repop->rep_aborted) {
     eval_repop(repop);
-    if (repop->on_applied) {
-     repop->on_applied->complete(0);
-     repop->on_applied = NULL;
-    }
   }
 }
 
@@ -8208,6 +8202,12 @@ void ReplicatedPG::eval_repop(RepGather *repop)
 
   // ondisk?
   if (repop->all_committed) {
+    for (auto p = repop->on_committed.begin();
+        p != repop->on_committed.end();
+        repop->on_committed.erase(p++)) {
+      (*p)();
+    }
+
     if (repop->ctx->op && !repop->log_op_stat) {
       log_op_stats(repop->ctx);
       repop->log_op_stat = true;
@@ -8253,6 +8253,12 @@ void ReplicatedPG::eval_repop(RepGather *repop)
 
   // applied?
   if (repop->all_applied) {
+    dout(10) << " applied: " << *repop << " " << dendl;
+    for (auto p = repop->on_applied.begin();
+        p != repop->on_applied.end();
+        repop->on_applied.erase(p++)) {
+      (*p)();
+    }
 
     // send dup acks, in order
     if (waiting_for_ack.count(repop->v)) {
@@ -8306,9 +8312,10 @@ void ReplicatedPG::eval_repop(RepGather *repop)
 
     calc_min_last_complete_ondisk();
 
-    // kick snap_trimmer if necessary
-    if (repop->queue_snap_trimmer) {
-      queue_snap_trim();
+    for (auto p = repop->on_success.begin();
+        p != repop->on_success.end();
+        repop->on_success.erase(p++)) {
+      (*p)();
     }
 
     dout(10) << " removing " << *repop << dendl;
@@ -8397,8 +8404,9 @@ void ReplicatedPG::issue_repop(RepGather *repop)
   repop->ctx->op_t = NULL;
 }
 
-ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRef obc,
-                                                ceph_tid_t rep_tid)
+ReplicatedPG::RepGather *ReplicatedPG::new_repop(
+  OpContext *ctx, ObjectContextRef obc,
+  ceph_tid_t rep_tid)
 {
   if (ctx->op)
     dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->get_req() << dendl;
@@ -8426,6 +8434,13 @@ void ReplicatedPG::remove_repop(RepGather *repop)
     dout(20) << " clone_obc " << *repop->ctx->clone_obc << dendl;
   if (repop->ctx->snapset_obc)
     dout(20) << " snapset_obc " << *repop->ctx->snapset_obc << dendl;
+
+  for (auto p = repop->on_finish.begin();
+       p != repop->on_finish.end();
+       repop->on_finish.erase(p++)) {
+    (*p)();
+  }
+
   release_op_ctx_locks(repop->ctx);
   repop->ctx->finish(0);  // FIXME: return value here is sloppy
   repop->put();
@@ -8433,21 +8448,21 @@ void ReplicatedPG::remove_repop(RepGather *repop)
   osd->logger->dec(l_osd_op_wip);
 }
 
-ReplicatedPG::RepGather *ReplicatedPG::simple_repop_create(ObjectContextRef obc)
+ReplicatedPG::OpContextUPtr ReplicatedPG::simple_opc_create(ObjectContextRef obc)
 {
   dout(20) << __func__ << " " << obc->obs.oi.soid << dendl;
   vector<OSDOp> ops;
   ceph_tid_t rep_tid = osd->get_tid();
   osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
-  OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, obc, this);
+  OpContextUPtr ctx(new OpContext(OpRequestRef(), reqid, ops, obc, this));
   ctx->op_t = pgbackend->get_transaction();
   ctx->mtime = ceph_clock_now(g_ceph_context);
-  RepGather *repop = new_repop(ctx, obc, rep_tid);
-  return repop;
+  return ctx;
 }
 
-void ReplicatedPG::simple_repop_submit(RepGather *repop)
+void ReplicatedPG::simple_opc_submit(OpContextUPtr ctx)
 {
+  RepGather *repop = new_repop(ctx.get(), ctx->obc, ctx->reqid.tid);
   dout(20) << __func__ << " " << repop << dendl;
   issue_repop(repop);
   eval_repop(repop);
@@ -8575,8 +8590,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
     return;
   }
 
-  RepGather *repop = simple_repop_create(obc);
-  OpContext *ctx = repop->ctx;
+  OpContextUPtr ctx = simple_opc_create(obc);
   ctx->at_version = get_next_version();
 
   object_info_t& oi = ctx->new_obs.oi;
@@ -8594,11 +8608,11 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
                                    0,
                                    osd_reqid_t(), ctx->mtime));
 
-  oi.prior_version = repop->obc->obs.oi.version;
+  oi.prior_version = obc->obs.oi.version;
   oi.version = ctx->at_version;
   bufferlist bl;
   ::encode(oi, bl);
-  setattr_maybe_cache(obc, repop->ctx, t, OI_ATTR, bl);
+  setattr_maybe_cache(obc, ctx.get(), t, OI_ATTR, bl);
 
   if (pool.info.require_rollback()) {
     map<string, boost::optional<bufferlist> > to_set;
@@ -8609,9 +8623,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
   }
 
   // no ctx->delta_stats
-
-  // obc ref swallowed by repop!
-  simple_repop_submit(repop);
+  simple_opc_submit(std::move(ctx));
 
   // apply new object state.
   ctx->obc->obs = ctx->new_obs;
@@ -9569,10 +9581,9 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
     repop_queue.pop_front();
     dout(10) << " canceling repop tid " << repop->rep_tid << dendl;
     repop->rep_aborted = true;
-    if (repop->on_applied) {
-      delete repop->on_applied;
-      repop->on_applied = NULL;
-    }
+    repop->on_applied.clear();
+    repop->on_committed.clear();
+    repop->on_success.clear();
 
     if (requeue) {
       if (repop->ctx->op) {
@@ -11164,15 +11175,14 @@ void ReplicatedPG::hit_set_remove_all()
     ObjectContextRef obc = get_object_context(oid, false);
     assert(obc);
 
-    RepGather *repop = simple_repop_create(obc);
-    OpContext *ctx = repop->ctx;
+    OpContextUPtr ctx = simple_opc_create(obc);
     ctx->at_version = get_next_version();
     ctx->updated_hset_history = info.hit_set;
     utime_t now = ceph_clock_now(cct);
     ctx->mtime = now;
-    hit_set_trim(repop, 0);
-    apply_ctx_stats(ctx);
-    simple_repop_submit(repop);
+    hit_set_trim(ctx, 0);
+    apply_ctx_stats(ctx.get());
+    simple_opc_submit(std::move(ctx));
   }
 
   info.hit_set = pg_hit_set_history_t();
@@ -11252,15 +11262,6 @@ bool ReplicatedPG::hit_set_apply_log()
   return true;
 }
 
-struct C_HitSetFlushing : public Context {
-  ReplicatedPGRef pg;
-  time_t hit_set_name;
-  C_HitSetFlushing(ReplicatedPG *p, time_t n) : pg(p), hit_set_name(n) { }
-  void finish(int r) {
-    pg->hit_set_flushing.erase(hit_set_name);
-  }
-};
-
 void ReplicatedPG::hit_set_persist()
 {
   dout(10) << __func__  << dendl;
@@ -11268,7 +11269,6 @@ void ReplicatedPG::hit_set_persist()
   unsigned max = pool.info.hit_set_count;
 
   utime_t now = ceph_clock_now(cct);
-  RepGather *repop;
   hobject_t oid;
   time_t flush_time = 0;
 
@@ -11337,10 +11337,15 @@ void ReplicatedPG::hit_set_persist()
   flush_time = new_hset.begin;
 
   ObjectContextRef obc = get_object_context(oid, true);
-  repop = simple_repop_create(obc);
-  if (flush_time != 0)
-    repop->on_applied = new C_HitSetFlushing(this, flush_time);
-  OpContext *ctx = repop->ctx;
+  OpContextUPtr ctx = simple_opc_create(obc);
+  if (flush_time != 0) {
+    ReplicatedPGRef pg(this);
+    ctx->register_on_applied(
+      [pg, flush_time]() {
+       pg->hit_set_flushing.erase(flush_time);
+      });
+  }
+
   ctx->at_version = get_next_version();
   ctx->updated_hset_history = info.hit_set;
   pg_hit_set_history_t &updated_hit_set_hist = *(ctx->updated_hset_history);
@@ -11377,7 +11382,7 @@ void ReplicatedPG::hit_set_persist()
   map <string, bufferlist> attrs;
   attrs[OI_ATTR].claim(boi);
   attrs[SS_ATTR].claim(bss);
-  setattrs_maybe_cache(ctx->obc, ctx, ctx->op_t, attrs);
+  setattrs_maybe_cache(ctx->obc, ctx.get(), ctx->op_t, attrs);
   ctx->log.push_back(
     pg_log_entry_t(
       pg_log_entry_t::MODIFY,
@@ -11394,17 +11399,17 @@ void ReplicatedPG::hit_set_persist()
     ctx->log.back().mod_desc.mark_unrollbackable();
   }
 
-  hit_set_trim(repop, max);
+  hit_set_trim(ctx, max);
 
-  apply_ctx_stats(ctx);
-  simple_repop_submit(repop);
+  apply_ctx_stats(ctx.get());
+  simple_opc_submit(std::move(ctx));
 }
 
-void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max)
+void ReplicatedPG::hit_set_trim(OpContextUPtr &ctx, unsigned max)
 {
-  assert(repop->ctx->updated_hset_history);
+  assert(ctx->updated_hset_history);
   pg_hit_set_history_t &updated_hit_set_hist =
-    *(repop->ctx->updated_hset_history);
+    *(ctx->updated_hset_history);
   for (unsigned num = updated_hit_set_hist.history.size(); num > max; --num) {
     list<pg_hit_set_info_t>::iterator p = updated_hit_set_hist.history.begin();
     assert(p != updated_hit_set_hist.history.end());
@@ -11413,34 +11418,34 @@ void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max)
     assert(!is_degraded_or_backfilling_object(oid));
 
     dout(20) << __func__ << " removing " << oid << dendl;
-    ++repop->ctx->at_version.version;
-    repop->ctx->log.push_back(
+    ++ctx->at_version.version;
+    ctx->log.push_back(
         pg_log_entry_t(pg_log_entry_t::DELETE,
                       oid,
-                      repop->ctx->at_version,
+                      ctx->at_version,
                       p->version,
                       0,
                       osd_reqid_t(),
-                      repop->ctx->mtime));
+                      ctx->mtime));
     if (pool.info.require_rollback()) {
-      if (repop->ctx->log.back().mod_desc.rmobject(
-         repop->ctx->at_version.version)) {
-       repop->ctx->op_t->stash(oid, repop->ctx->at_version.version);
+      if (ctx->log.back().mod_desc.rmobject(
+         ctx->at_version.version)) {
+       ctx->op_t->stash(oid, ctx->at_version.version);
       } else {
-       repop->ctx->op_t->remove(oid);
+       ctx->op_t->remove(oid);
       }
     } else {
-      repop->ctx->op_t->remove(oid);
-      repop->ctx->log.back().mod_desc.mark_unrollbackable();
+      ctx->op_t->remove(oid);
+      ctx->log.back().mod_desc.mark_unrollbackable();
     }
     updated_hit_set_hist.history.pop_front();
 
     ObjectContextRef obc = get_object_context(oid, false);
     assert(obc);
-    --repop->ctx->delta_stats.num_objects;
-    --repop->ctx->delta_stats.num_objects_hit_set_archive;
-    repop->ctx->delta_stats.num_bytes -= obc->obs.oi.size;
-    repop->ctx->delta_stats.num_bytes_hit_set_archive -= obc->obs.oi.size;
+    --ctx->delta_stats.num_objects;
+    --ctx->delta_stats.num_objects_hit_set_archive;
+    ctx->delta_stats.num_bytes -= obc->obs.oi.size;
+    ctx->delta_stats.num_bytes_hit_set_archive -= obc->obs.oi.size;
   }
 }
 
@@ -11868,14 +11873,13 @@ bool ReplicatedPG::agent_maybe_evict(ObjectContextRef& obc, bool after_flush)
   }
 
   dout(10) << __func__ << " evicting " << obc->obs.oi << dendl;
-  RepGather *repop = simple_repop_create(obc);
-  OpContext *ctx = repop->ctx;
+  OpContextUPtr ctx = simple_opc_create(obc);
   Context *on_evict = new C_AgentEvictStartStop(this);
   ctx->on_finish = on_evict;
   ctx->lock_to_release = OpContext::W_LOCK;
   ctx->at_version = get_next_version();
   assert(ctx->new_obs.exists);
-  int r = _delete_oid(ctx, true);
+  int r = _delete_oid(ctx.get(), true);
   if (obc->obs.oi.is_omap())
     ctx->delta_stats.num_objects_omap--;
   ctx->delta_stats.num_evict++;
@@ -11883,8 +11887,8 @@ bool ReplicatedPG::agent_maybe_evict(ObjectContextRef& obc, bool after_flush)
   if (obc->obs.oi.is_dirty())
     --ctx->delta_stats.num_objects_dirty;
   assert(r == 0);
-  finish_ctx(ctx, pg_log_entry_t::DELETE, false);
-  simple_repop_submit(repop);
+  finish_ctx(ctx.get(), pg_log_entry_t::DELETE, false);
+  simple_opc_submit(std::move(ctx));
   osd->logger->inc(l_osd_tier_evict);
   osd->logger->inc(l_osd_agent_evict);
   return true;
@@ -12583,15 +12587,14 @@ void ReplicatedPG::_scrub(
     dout(10) << __func__ << " recording digests for " << p->first << dendl;
     ObjectContextRef obc = get_object_context(p->first, false);
     assert(obc);
-    RepGather *repop = simple_repop_create(obc);
-    OpContext *ctx = repop->ctx;
+    OpContextUPtr ctx = simple_opc_create(obc);
     ctx->at_version = get_next_version();
     ctx->mtime = utime_t();      // do not update mtime
     ctx->new_obs.oi.set_data_digest(p->second.first);
     ctx->new_obs.oi.set_omap_digest(p->second.second);
-    finish_ctx(ctx, pg_log_entry_t::MODIFY, true, true);
+    finish_ctx(ctx.get(), pg_log_entry_t::MODIFY, true, true);
     ctx->on_finish = new C_ScrubDigestUpdated(this);
-    simple_repop_submit(repop);
+    simple_opc_submit(std::move(ctx));
     ++scrubber.num_digest_updates_pending;
   }
 
@@ -12674,10 +12677,7 @@ void ReplicatedPG::_scrub_finish()
 
 ReplicatedPG::SnapTrimmer::~SnapTrimmer()
 {
-  while (!repops.empty()) {
-    (*repops.begin())->put();
-    repops.erase(repops.begin());
-  }
+  in_flight.clear();
 }
 
 void ReplicatedPG::SnapTrimmer::log_enter(const char *state_name)
@@ -12746,36 +12746,19 @@ ReplicatedPG::TrimmingObjects::TrimmingObjects(my_context ctx)
 void ReplicatedPG::TrimmingObjects::exit()
 {
   context< SnapTrimmer >().log_exit(state_name, enter_time);
-  // Clean up repops in case of reset
-  set<RepGather *> &repops = context<SnapTrimmer>().repops;
-  for (set<RepGather *>::iterator i = repops.begin();
-       i != repops.end();
-       repops.erase(i++)) {
-    (*i)->put();
-  }
+  context<SnapTrimmer>().in_flight.clear();
 }
 
 boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&)
 {
   dout(10) << "TrimmingObjects react" << dendl;
-  ReplicatedPG *pg = context< SnapTrimmer >().pg;
+  ReplicatedPGRef pg = context< SnapTrimmer >().pg;
   snapid_t snap_to_trim = context<SnapTrimmer>().snap_to_trim;
-  set<RepGather *> &repops = context<SnapTrimmer>().repops;
+  auto &in_flight = context<SnapTrimmer>().in_flight;
 
   dout(10) << "TrimmingObjects: trimming snap " << snap_to_trim << dendl;
 
-  for (set<RepGather *>::iterator i = repops.begin();
-       i != repops.end(); 
-       ) {
-    if ((*i)->all_applied && (*i)->all_committed) {
-      (*i)->put();
-      repops.erase(i++);
-    } else {
-      ++i;
-    }
-  }
-
-  while (repops.size() < g_conf->osd_pg_max_concurrent_snap_trims) {
+  while (in_flight.size() < g_conf->osd_pg_max_concurrent_snap_trims) {
     // Get next
     hobject_t old_pos = pos;
     int r = pg->snap_mapper.get_next_object_to_trim(snap_to_trim, &pos);
@@ -12790,20 +12773,25 @@ boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&)
     }
 
     dout(10) << "TrimmingObjects react trimming " << pos << dendl;
-    RepGather *repop = pg->trim_object(pos);
-    if (!repop) {
+    OpContextUPtr ctx = pg->trim_object(pos);
+    if (!ctx) {
       dout(10) << __func__ << " could not get write lock on obj "
               << pos << dendl;
       pos = old_pos;
       return discard_event();
     }
-    assert(repop);
-    repop->queue_snap_trimmer = true;
+    assert(ctx);
+    hobject_t to_remove = pos;
+    ctx->register_on_success(
+      [pg, to_remove, &in_flight]() {
+       in_flight.erase(to_remove);
+       pg->queue_snap_trim();
+      });
 
-    pg->apply_ctx_stats(repop->ctx);
+    pg->apply_ctx_stats(ctx.get());
 
-    repops.insert(repop->get());
-    pg->simple_repop_submit(repop);
+    in_flight.insert(pos);
+    pg->simple_opc_submit(std::move(ctx));
   }
   return discard_event();
 }
@@ -12819,30 +12807,16 @@ ReplicatedPG::WaitingOnReplicas::WaitingOnReplicas(my_context ctx)
 void ReplicatedPG::WaitingOnReplicas::exit()
 {
   context< SnapTrimmer >().log_exit(state_name, enter_time);
-
-  // Clean up repops in case of reset
-  set<RepGather *> &repops = context<SnapTrimmer>().repops;
-  for (set<RepGather *>::iterator i = repops.begin();
-       i != repops.end();
-       repops.erase(i++)) {
-    (*i)->put();
-  }
+  context<SnapTrimmer>().in_flight.clear();
 }
 
 boost::statechart::result ReplicatedPG::WaitingOnReplicas::react(const SnapTrim&)
 {
-  // Have all the repops applied?
+  // Have all the trims finished?
   dout(10) << "Waiting on Replicas react" << dendl;
   ReplicatedPG *pg = context< SnapTrimmer >().pg;
-  set<RepGather *> &repops = context<SnapTrimmer>().repops;
-  for (set<RepGather *>::iterator i = repops.begin();
-       i != repops.end();
-       repops.erase(i++)) {
-    if (!(*i)->all_applied || !(*i)->all_committed) {
-      return discard_event();
-    } else {
-      (*i)->put();
-    }
+  if (!context<SnapTrimmer>().in_flight.empty()) {
+    return discard_event();
   }
 
   snapid_t &sn = context<SnapTrimmer>().snap_to_trim;
index f0211168f12c33d8bbf7b3a5e2540057a39bc823..0f0537493496161cb8b1c88909513746d152f8f2 100644 (file)
@@ -557,6 +557,29 @@ public:
     // pending xattr updates
     map<ObjectContextRef,
        map<string, boost::optional<bufferlist> > > pending_attrs;
+
+    list<std::function<void()>> on_applied;
+    list<std::function<void()>> on_committed;
+    list<std::function<void()>> on_finish;
+    list<std::function<void()>> on_success;
+    template <typename F>
+    void register_on_finish(F &&f) {
+      on_finish.emplace_back(std::move(f));
+    }
+    template <typename F>
+    void register_on_success(F &&f) {
+      on_finish.emplace_back(std::move(f));
+    }
+    template <typename F>
+    void register_on_applied(F &&f) {
+      on_applied.emplace_back(std::move(f));
+    }
+    template <typename F>
+    void register_on_commit(F &&f) {
+      on_committed.emplace_back(std::move(f));
+    }
+
+
     void apply_pending_attrs() {
       for (map<ObjectContextRef,
             map<string, boost::optional<bufferlist> > >::iterator i =
@@ -675,6 +698,7 @@ public:
       }
     }
   };
+  using OpContextUPtr = std::unique_ptr<OpContext>;
   friend struct OpContext;
 
   /*
@@ -705,9 +729,10 @@ public:
     
     eversion_t          pg_local_last_complete;
 
-    bool queue_snap_trimmer;
-
-    Context *on_applied;
+    list<std::function<void()>> on_applied;
+    list<std::function<void()>> on_committed;
+    list<std::function<void()>> on_success;
+    list<std::function<void()>> on_finish;
     bool log_op_stat;
     
     RepGather(OpContext *c, ObjectContextRef pi, ceph_tid_t rt,
@@ -721,8 +746,10 @@ public:
       //sent_nvram(false),
       sent_disk(false),
       pg_local_last_complete(lc),
-      queue_snap_trimmer(false),
-      on_applied(NULL),
+      on_applied(std::move(c->on_applied)),
+      on_committed(std::move(c->on_committed)),
+      on_success(std::move(c->on_success)),
+      on_finish(std::move(c->on_finish)),
       log_op_stat(false) { }
 
     RepGather *get() {
@@ -733,7 +760,7 @@ public:
       assert(nref > 0);
       if (--nref == 0) {
        delete ctx; // must already be unlocked
-       assert(on_applied == NULL);
+       assert(on_applied.empty());
        delete this;
        //generic_dout(0) << "deleting " << this << dendl;
       }
@@ -795,6 +822,11 @@ protected:
     release_op_ctx_locks(ctx);
     delete ctx->op_t;
     ctx->op_t = NULL;
+    for (auto p = ctx->on_finish.begin();
+        p != ctx->on_finish.end();
+        ctx->on_finish.erase(p++)) {
+      (*p)();
+    }
     ctx->finish(r);
     delete ctx;
   }
@@ -896,11 +928,14 @@ protected:
   void repop_all_committed(RepGather *repop);
   void eval_repop(RepGather*);
   void issue_repop(RepGather *repop);
-  RepGather *new_repop(OpContext *ctx, ObjectContextRef obc, ceph_tid_t rep_tid);
+  RepGather *new_repop(
+    OpContext *ctx,
+    ObjectContextRef obc,
+    ceph_tid_t rep_tid);
   void remove_repop(RepGather *repop);
 
-  RepGather *simple_repop_create(ObjectContextRef obc);
-  void simple_repop_submit(RepGather *repop);
+  OpContextUPtr simple_opc_create(ObjectContextRef obc);
+  void simple_opc_submit(OpContextUPtr ctx);
 
   // hot/cold tracking
   HitSetRef hit_set;        ///< currently accumulating HitSet
@@ -913,7 +948,7 @@ protected:
   void hit_set_create();    ///< create a new HitSet
   void hit_set_persist();   ///< persist hit info
   bool hit_set_apply_log(); ///< apply log entries to update in-memory HitSet
-  void hit_set_trim(RepGather *repop, unsigned max); ///< discard old HitSets
+  void hit_set_trim(OpContextUPtr &ctx, unsigned max); ///< discard old HitSets
   void hit_set_in_memory_trim(uint32_t max_in_memory); ///< discard old in memory HitSets
   void hit_set_remove_all();
 
@@ -1474,7 +1509,7 @@ public:
     ThreadPool::TPHandle &handle);
   void do_backfill(OpRequestRef op);
 
-  RepGather *trim_object(const hobject_t &coid);
+  OpContextUPtr trim_object(const hobject_t &coid);
   void snap_trimmer(epoch_t e);
   int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);
 
@@ -1544,7 +1579,7 @@ private:
   };
   struct SnapTrimmer : public boost::statechart::state_machine< SnapTrimmer, NotTrimming > {
     ReplicatedPG *pg;
-    set<RepGather *> repops;
+    set<hobject_t, hobject_t::BitwiseComparator> in_flight;
     snapid_t snap_to_trim;
     bool need_share_pg_info;
     explicit SnapTrimmer(ReplicatedPG *pg) : pg(pg), need_share_pg_info(false) {}