git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
src/osd: Clean up PeeringCtx ownership semantics and pointer/ref usage
author: Samuel Just <sjust@redhat.com>
Fri, 3 May 2019 18:48:06 +0000 (11:48 -0700)
committer: Samuel Just <sjust@redhat.com>
Fri, 10 May 2019 00:22:26 +0000 (17:22 -0700)
PeeringCtx was a bit of a mess in terms of tracking down where the
resources were cleaned up.  This patch:
- Reworks PeeringCtx to directly own its maps/transaction
- Introduces PeeringCtxWrapper to deal with the message buffering
  internally
- Cleans up users to avoid passing pointers through the PeeringState
  interface unnecessarily.

This'll allow the PeeringCtx destructor to do its job vastly
simplifying usage in crimson.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/PeeringState.cc
src/osd/PeeringState.h
src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h

index 597d41f0de2264358fef0d85cf56e626e7ede42c..3aca91296570b7d9ce841c5409b23e3f6b383e76 100644 (file)
@@ -4083,8 +4083,8 @@ PGRef OSD::handle_pg_create_info(const OSDMapRef& osdmap,
                 << "the pool allows ec overwrites but is not stored in "
                 << "bluestore, so deep scrubbing will not detect bitrot";
   }
-  PG::_create(*rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));
-  PG::_init(*rctx.transaction, pgid, pp);
+  PG::_create(rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));
+  PG::_init(rctx.transaction, pgid, pp);
 
   int role = startmap->calc_pg_role(whoami, acting, acting.size());
   if (!pp->is_replicated() && role != pgid.shard) {
@@ -4121,8 +4121,8 @@ PGRef OSD::handle_pg_create_info(const OSDMapRef& osdmap,
     pg->set_dynamic_perf_stats_queries(m_perf_queries);
   }
 
-  pg->handle_initialize(&rctx);
-  pg->handle_activate_map(&rctx);
+  pg->handle_initialize(rctx);
+  pg->handle_activate_map(rctx);
 
   dispatch_context(rctx, pg.get(), osdmap, nullptr);
 
@@ -8016,7 +8016,7 @@ void OSD::_finish_splits(set<PGRef>& pgs)
     pg->lock();
     dout(10) << __func__ << " " << *pg << dendl;
     epoch_t e = pg->get_osdmap_epoch();
-    pg->handle_initialize(&rctx);
+    pg->handle_initialize(rctx);
     pg->queue_null(e, e);
     dispatch_context_transaction(rctx, pg);
     pg->unlock();
@@ -8044,7 +8044,7 @@ bool OSD::advance_pg(
   epoch_t osd_epoch,
   PG *pg,
   ThreadPool::TPHandle &handle,
-  PG::PeeringCtx *rctx)
+  PG::PeeringCtx &rctx)
 {
   if (osd_epoch <= pg->get_osdmap_epoch()) {
     return true;
@@ -8083,7 +8083,7 @@ bool OSD::advance_pg(
                  << " is merge source, target is " << parent
                   << dendl;
          pg->write_if_dirty(rctx);
-         dispatch_context_transaction(*rctx, pg, &handle);
+         dispatch_context_transaction(rctx, pg, &handle);
          pg->ch->flush();
          pg->on_shutdown();
          OSDShard *sdata = pg->osd_shard;
@@ -8201,7 +8201,7 @@ bool OSD::advance_pg(
   ret = true;
  out:
   if (!new_pgs.empty()) {
-    rctx->transaction->register_on_applied(new C_FinishSplits(this, new_pgs));
+    rctx.transaction.register_on_applied(new C_FinishSplits(this, new_pgs));
   }
   return ret;
 }
@@ -8480,7 +8480,7 @@ void OSD::split_pgs(
   const set<spg_t> &childpgids, set<PGRef> *out_pgs,
   OSDMapRef curmap,
   OSDMapRef nextmap,
-  PG::PeeringCtx *rctx)
+  PG::PeeringCtx &rctx)
 {
   unsigned pg_num = nextmap->get_pg_num(parent->pg_id.pool());
   parent->update_snap_mapper_bits(parent->get_pgid().get_split_bits(pg_num));
@@ -8514,17 +8514,17 @@ void OSD::split_pgs(
       split_bits,
       i->ps(),
       &child->get_pool().info,
-      rctx->transaction);
+      rctx.transaction);
     parent->split_into(
       i->pgid,
       child,
       split_bits);
 
-    child->finish_split_stats(*stat_iter, rctx->transaction);
+    child->finish_split_stats(*stat_iter, rctx.transaction);
     child->unlock();
   }
   ceph_assert(stat_iter != updated_stats.end());
-  parent->finish_split_stats(*stat_iter, rctx->transaction);
+  parent->finish_split_stats(*stat_iter, rctx.transaction);
 }
 
 /*
@@ -8630,27 +8630,18 @@ void OSD::handle_pg_create(OpRequestRef op)
 
 PG::PeeringCtx OSD::create_context()
 {
-  ObjectStore::Transaction *t = new ObjectStore::Transaction;
-  map<int, map<spg_t,pg_query_t> > *query_map =
-    new map<int, map<spg_t, pg_query_t> >;
-  map<int,vector<pair<pg_notify_t, PastIntervals> > > *notify_list =
-    new map<int, vector<pair<pg_notify_t, PastIntervals> > >;
-  map<int,vector<pair<pg_notify_t, PastIntervals> > > *info_map =
-    new map<int,vector<pair<pg_notify_t, PastIntervals> > >;
-  PG::PeeringCtx rctx(query_map, info_map, notify_list, t);
-  return rctx;
+  return PG::PeeringCtx();
 }
 
 void OSD::dispatch_context_transaction(PG::PeeringCtx &ctx, PG *pg,
                                        ThreadPool::TPHandle *handle)
 {
-  if (!ctx.transaction->empty() || ctx.transaction->has_contexts()) {
+  if (!ctx.transaction.empty() || ctx.transaction.has_contexts()) {
     int tr = store->queue_transaction(
       pg->ch,
-      std::move(*ctx.transaction), TrackedOpRef(), handle);
+      std::move(ctx.transaction), TrackedOpRef(), handle);
     ceph_assert(tr == 0);
-    delete (ctx.transaction);
-    ctx.transaction = new ObjectStore::Transaction;
+    ctx.reset_transaction();
   }
 }
 
@@ -8662,32 +8653,19 @@ void OSD::dispatch_context(PG::PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
   } else if (!is_active()) {
     dout(20) << __func__ << " not active" << dendl;
   } else {
-    do_notifies(*ctx.notify_list, curmap);
-    do_queries(*ctx.query_map, curmap);
-    do_infos(*ctx.info_map, curmap);
+    do_notifies(ctx.notify_list, curmap);
+    do_queries(ctx.query_map, curmap);
+    do_infos(ctx.info_map, curmap);
   }
-  if ((!ctx.transaction->empty() || ctx.transaction->has_contexts()) && pg) {
+  if ((!ctx.transaction.empty() || ctx.transaction.has_contexts()) && pg) {
     int tr = store->queue_transaction(
       pg->ch,
-      std::move(*ctx.transaction), TrackedOpRef(),
+      std::move(ctx.transaction), TrackedOpRef(),
       handle);
     ceph_assert(tr == 0);
   }
-  delete ctx.notify_list;
-  delete ctx.query_map;
-  delete ctx.info_map;
-  delete ctx.transaction;
 }
 
-void OSD::discard_context(PG::PeeringCtx& ctx)
-{
-  delete ctx.notify_list;
-  delete ctx.query_map;
-  delete ctx.info_map;
-  delete ctx.transaction;
-}
-
-
 /** do_notifies
  * Send an MOSDPGNotify to a primary, with a list of PGs that I have
  * content for, and they are primary for.
@@ -9155,7 +9133,7 @@ void OSD::do_recovery(
     if (do_unfound) {
       PG::PeeringCtx rctx = create_context();
       rctx.handle = &handle;
-      pg->find_unfound(queued, &rctx);
+      pg->find_unfound(queued, rctx);
       dispatch_context(rctx, pg, pg->get_osdmap());
     }
   }
@@ -9340,11 +9318,9 @@ void OSD::dequeue_peering_evt(
       derr << __func__ << " unrecognized pg-less event " << evt->get_desc() << dendl;
       ceph_abort();
     }
-  } else if (advance_pg(curmap->get_epoch(), pg, handle, &rctx)) {
-    pg->do_peering_event(evt, &rctx);
+  } else if (advance_pg(curmap->get_epoch(), pg, handle, rctx)) {
+    pg->do_peering_event(evt, rctx);
     if (pg->is_deleted()) {
-      // do not dispatch rctx; the final _delete_some already did it.
-      discard_context(rctx);
       pg->unlock();
       return;
     }
index 3856ab27196c278451f28f2734125b120240e81b..cb6f4b87bf93aa85612b5d40617ee985470af6a5 100644 (file)
@@ -1700,7 +1700,7 @@ protected:
     epoch_t advance_to,
     PG *pg,
     ThreadPool::TPHandle &handle,
-    PG::PeeringCtx *rctx);
+    PG::PeeringCtx &rctx);
   void consume_map();
   void activate_map();
 
@@ -1793,7 +1793,7 @@ protected:
     const set<spg_t> &childpgids, set<PGRef> *out_pgs,
     OSDMapRef curmap,
     OSDMapRef nextmap,
-    PG::PeeringCtx *rctx);
+    PG::PeeringCtx &rctx);
   void _finish_splits(set<PGRef>& pgs);
 
   // == monitor interaction ==
index 61ef4e4e4a3ed8fa5d1bb242ce3e71af42f07baf..d6565eb5057209b14fef809cc75f1960abc0ade4 100644 (file)
@@ -553,12 +553,12 @@ void PG::start_split_stats(const set<spg_t>& childpgs, vector<object_stat_sum_t>
   recovery_state.start_split_stats(childpgs, out);
 }
 
-void PG::finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction *t)
+void PG::finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction &t)
 {
   recovery_state.finish_split_stats(stats, t);
 }
 
-void PG::merge_from(map<spg_t,PGRef>& sources, PeeringCtx *rctx,
+void PG::merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx,
                    unsigned split_bits,
                    const pg_merge_meta_t& last_pg_merge_meta)
 {
@@ -573,14 +573,14 @@ void PG::merge_from(map<spg_t,PGRef>& sources, PeeringCtx *rctx,
   for (auto& i : sources) {
     auto& source = i.second;
     // wipe out source's pgmeta
-    rctx->transaction->remove(source->coll, source->pgmeta_oid);
+    rctx.transaction.remove(source->coll, source->pgmeta_oid);
 
     // merge (and destroy source collection)
-    rctx->transaction->merge_collection(source->coll, coll, split_bits);
+    rctx.transaction.merge_collection(source->coll, coll, split_bits);
   }
 
   // merge_collection does this, but maybe all of our sources were missing.
-  rctx->transaction->collection_set_bits(coll, split_bits);
+  rctx.transaction.collection_set_bits(coll, split_bits);
 
   snap_mapper.update_bits(split_bits);
 }
@@ -857,7 +857,7 @@ void PG::init(
   const pg_history_t& history,
   const PastIntervals& pi,
   bool backfill,
-  ObjectStore::Transaction *t)
+  ObjectStore::Transaction &t)
 {
   recovery_state.init(
     role, newup, new_up_primary, newacting,
@@ -1235,13 +1235,12 @@ void PG::read_state(ObjectStore *store)
       recovery_state.set_role(-1);
   }
 
-  PG::PeeringCtx rctx(0, 0, 0, new ObjectStore::Transaction);
-  handle_initialize(&rctx);
+  PG::PeeringCtx rctx;
+  handle_initialize(rctx);
   // note: we don't activate here because we know the OSD will advance maps
   // during boot.
-  write_if_dirty(*rctx.transaction);
-  store->queue_transaction(ch, std::move(*rctx.transaction));
-  delete rctx.transaction;
+  write_if_dirty(rctx.transaction);
+  store->queue_transaction(ch, std::move(rctx.transaction));
 }
 
 void PG::update_snap_map(
@@ -3372,13 +3371,13 @@ struct FlushState {
 };
 typedef std::shared_ptr<FlushState> FlushStateRef;
 
-void PG::start_flush_on_transaction(ObjectStore::Transaction *t)
+void PG::start_flush_on_transaction(ObjectStore::Transaction &t)
 {
   // flush in progress ops
   FlushStateRef flush_trigger (std::make_shared<FlushState>(
                                this, get_osdmap_epoch()));
-  t->register_on_applied(new ContainerContext<FlushStateRef>(flush_trigger));
-  t->register_on_commit(new ContainerContext<FlushStateRef>(flush_trigger));
+  t.register_on_applied(new ContainerContext<FlushStateRef>(flush_trigger));
+  t.register_on_commit(new ContainerContext<FlushStateRef>(flush_trigger));
 }
 
 bool PG::try_flush_or_schedule_async()
@@ -3617,18 +3616,18 @@ bool PG::can_discard_request(OpRequestRef& op)
   return true;
 }
 
-void PG::do_peering_event(PGPeeringEventRef evt, PeeringCtx *rctx)
+void PG::do_peering_event(PGPeeringEventRef evt, PeeringCtx &rctx)
 {
   dout(10) << __func__ << ": " << evt->get_desc() << dendl;
   ceph_assert(have_same_or_newer_map(evt->get_epoch_sent()));
   if (old_peering_evt(evt)) {
     dout(10) << "discard old " << evt->get_desc() << dendl;
   } else {
-    recovery_state.handle_event(evt, rctx);
+    recovery_state.handle_event(evt, &rctx);
   }
   // write_if_dirty regardless of path above to ensure we capture any work
   // done by OSD::advance_pg().
-  write_if_dirty(*rctx->transaction);
+  write_if_dirty(rctx.transaction);
 }
 
 void PG::queue_peering_event(PGPeeringEventRef evt)
@@ -3647,7 +3646,7 @@ void PG::queue_null(epoch_t msg_epoch,
                                         NullEvt())));
 }
 
-void PG::find_unfound(epoch_t queued, PeeringCtx *rctx)
+void PG::find_unfound(epoch_t queued, PeeringCtx &rctx)
 {
   /*
     * if we couldn't start any recovery ops and things are still
@@ -3655,8 +3654,8 @@ void PG::find_unfound(epoch_t queued, PeeringCtx *rctx)
     * It may be that our initial locations were bad and we errored
     * out while trying to pull.
     */
-  recovery_state.discover_all_missing(*rctx->query_map);
-  if (rctx->query_map->empty()) {
+  recovery_state.discover_all_missing(rctx.query_map);
+  if (rctx.query_map.empty()) {
     string action;
     if (state_test(PG_STATE_BACKFILLING)) {
       auto evt = PGPeeringEventRef(
@@ -3688,7 +3687,7 @@ void PG::handle_advance_map(
   OSDMapRef osdmap, OSDMapRef lastmap,
   vector<int>& newup, int up_primary,
   vector<int>& newacting, int acting_primary,
-  PeeringCtx *rctx)
+  PeeringCtx &rctx)
 {
   dout(10) << __func__ << ": " << osdmap->get_epoch() << dendl;
   osd_shard->update_pg_epoch(pg_slot, osdmap->get_epoch());
@@ -3702,7 +3701,7 @@ void PG::handle_advance_map(
     rctx);
 }
 
-void PG::handle_activate_map(PeeringCtx *rctx)
+void PG::handle_activate_map(PeeringCtx &rctx)
 {
   dout(10) << __func__ << ": " << get_osdmap()->get_epoch()
           << dendl;
@@ -3711,11 +3710,11 @@ void PG::handle_activate_map(PeeringCtx *rctx)
   requeue_map_waiters();
 }
 
-void PG::handle_initialize(PeeringCtx *rctx)
+void PG::handle_initialize(PeeringCtx &rctx)
 {
   dout(10) << __func__ << dendl;
   PeeringState::Initialize evt;
-  recovery_state.handle_event(evt, rctx);
+  recovery_state.handle_event(evt, &rctx);
 }
 
 void PG::handle_query_state(Formatter *f)
@@ -3766,7 +3765,7 @@ void PG::C_DeleteMore::complete(int r) {
   delete this;
 }
 
-void PG::do_delete_work(ObjectStore::Transaction *t)
+void PG::do_delete_work(ObjectStore::Transaction &t)
 {
   dout(10) << __func__ << dendl;
 
@@ -3812,7 +3811,7 @@ void PG::do_delete_work(ObjectStore::Transaction *t)
     &next);
   dout(20) << __func__ << " " << olist << dendl;
 
-  OSDriver::OSTransaction _t(osdriver.get_transaction(t));
+  OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
   int64_t num = 0;
   for (auto& oid : olist) {
     if (oid == pgmeta_oid) {
@@ -3826,13 +3825,13 @@ void PG::do_delete_work(ObjectStore::Transaction *t)
     if (r != 0 && r != -ENOENT) {
       ceph_abort();
     }
-    t->remove(coll, oid);
+    t.remove(coll, oid);
     ++num;
   }
   if (num) {
     dout(20) << __func__ << " deleting " << num << " objects" << dendl;
     Context *fin = new C_DeleteMore(this, get_osdmap_epoch());
-    t->register_on_commit(fin);
+    t.register_on_commit(fin);
   } else {
     dout(20) << __func__ << " finished" << dendl;
     if (cct->_conf->osd_inject_failure_on_pg_removal) {
@@ -3843,21 +3842,21 @@ void PG::do_delete_work(ObjectStore::Transaction *t)
     // are the SnapMapper ContainerContexts.
     {
       PGRef pgref(this);
-      PGLog::clear_info_log(info.pgid, t);
-      t->remove_collection(coll);
-      t->register_on_commit(new ContainerContext<PGRef>(pgref));
-      t->register_on_applied(new ContainerContext<PGRef>(pgref));
-      osd->store->queue_transaction(ch, std::move(*t));
+      PGLog::clear_info_log(info.pgid, &t);
+      t.remove_collection(coll);
+      t.register_on_commit(new ContainerContext<PGRef>(pgref));
+      t.register_on_applied(new ContainerContext<PGRef>(pgref));
+      osd->store->queue_transaction(ch, std::move(t));
     }
     ch->flush();
 
     if (!osd->try_finish_pg_delete(this, pool.info.get_pg_num())) {
       dout(1) << __func__ << " raced with merge, reinstantiating" << dendl;
       ch = osd->store->create_new_collection(coll);
-      _create(*t,
+      _create(t,
              info.pgid,
              info.pgid.get_split_bits(pool.info.get_pg_num()));
-      _init(*t, info.pgid, &pool.info);
+      _init(t, info.pgid, &pool.info);
       recovery_state.reset_last_persisted();
     } else {
       recovery_state.set_delete_complete();
index f2e651e7bd35aae100da852b637805ae2a0c4588..d6119ca70f591a7649c49b68191cc15fd51340e2 100644 (file)
@@ -346,7 +346,7 @@ public:
     const pg_history_t& history,
     const PastIntervals& pim,
     bool backfill,
-    ObjectStore::Transaction *t);
+    ObjectStore::Transaction &t);
 
   /// read existing pg state off disk
   void read_state(ObjectStore *store);
@@ -375,12 +375,13 @@ public:
     int split_bits,
     int seed,
     const pg_pool_t *pool,
-    ObjectStore::Transaction *t) = 0;
+    ObjectStore::Transaction &t) = 0;
   void split_into(pg_t child_pgid, PG *child, unsigned split_bits);
-  void merge_from(map<spg_t,PGRef>& sources, PeeringCtx *rctx,
+  void merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx,
                  unsigned split_bits,
                  const pg_merge_meta_t& last_pg_merge_meta);
-  void finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction *t);
+  void finish_split_stats(const object_stat_sum_t& stats,
+                         ObjectStore::Transaction &t);
 
   void scrub(epoch_t queued, ThreadPool::TPHandle &handle);
 
@@ -410,9 +411,9 @@ public:
   void clear_primary_state() override;
 
   epoch_t oldest_stored_osdmap() override;
-  OstreamTemp &get_clog_error() override;
-  OstreamTemp &get_clog_info() override;
-  OstreamTemp &get_clog_debug() override;
+  OstreamTemp get_clog_error() override;
+  OstreamTemp get_clog_info() override;
+  OstreamTemp get_clog_debug() override;
 
   void schedule_event_after(
     PGPeeringEventRef event,
@@ -467,11 +468,11 @@ public:
   }
 
   PGLog::LogEntryHandlerRef get_log_handler(
-    ObjectStore::Transaction *t) override {
-    return std::make_unique<PG::PGLogEntryHandler>(this, t);
+    ObjectStore::Transaction &t) override {
+    return std::make_unique<PG::PGLogEntryHandler>(this, &t);
   }
 
-  void do_delete_work(ObjectStore::Transaction *t) override;
+  void do_delete_work(ObjectStore::Transaction &t) override;
 
   void clear_ready_to_merge() override;
   void set_not_ready_to_merge_target(pg_t pgid, pg_t src) override;
@@ -484,16 +485,16 @@ public:
   void rebuild_missing_set_with_deletes(PGLog &pglog) override;
 
   void queue_peering_event(PGPeeringEventRef evt);
-  void do_peering_event(PGPeeringEventRef evt, PeeringCtx *rcx);
+  void do_peering_event(PGPeeringEventRef evt, PeeringCtx &rcx);
   void queue_null(epoch_t msg_epoch, epoch_t query_epoch);
   void queue_flushed(epoch_t started_at);
   void handle_advance_map(
     OSDMapRef osdmap, OSDMapRef lastmap,
     vector<int>& newup, int up_primary,
     vector<int>& newacting, int acting_primary,
-    PeeringCtx *rctx);
-  void handle_activate_map(PeeringCtx *rctx);
-  void handle_initialize(PeeringCtx *rctx);
+    PeeringCtx &rctx);
+  void handle_activate_map(PeeringCtx &rctx);
+  void handle_initialize(PeeringCtx &rxcx);
   void handle_query_state(Formatter *f);
 
   /**
@@ -506,7 +507,7 @@ public:
     uint64_t *ops_begun) = 0;
 
   // more work after the above, but with a PeeringCtx
-  void find_unfound(epoch_t queued, PeeringCtx *rctx);
+  void find_unfound(epoch_t queued, PeeringCtx &rctx);
 
   virtual void get_watchers(std::list<obj_watch_item_t> *ls) = 0;
 
@@ -1432,8 +1433,8 @@ public:
     bool try_fast_info,
     PerfCounters *logger = nullptr);
 
-  void write_if_dirty(PeeringCtx *rctx) {
-    write_if_dirty(*rctx->transaction);
+  void write_if_dirty(PeeringCtx &rctx) {
+    write_if_dirty(rctx.transaction);
   }
 protected:
   void write_if_dirty(ObjectStore::Transaction& t) {
@@ -1476,7 +1477,7 @@ protected:
 
   bool try_flush_or_schedule_async() override;
   void start_flush_on_transaction(
-    ObjectStore::Transaction *t) override;
+    ObjectStore::Transaction &t) override;
 
   void update_history(const pg_history_t& history) {
     recovery_state.update_history(history);
index 23020c953267603b22ea91659a9bdaec73a2df6c..75f56677a67d24cf456426d81cc7042cc3906c69 100644 (file)
@@ -101,8 +101,7 @@ void PGPool::update(CephContext *cct, OSDMapRef map)
 void PeeringState::PeeringMachine::send_query(
   pg_shard_t to, const pg_query_t &query) {
   ceph_assert(state->rctx);
-  ceph_assert(state->rctx->query_map);
-  (*state->rctx->query_map)[to.osd][
+  state->rctx->query_map[to.osd][
     spg_t(context< PeeringMachine >().spgid.pgid, to.shard)] = query;
 }
 
@@ -143,9 +142,9 @@ void PeeringState::start_handle(PeeringCtx *new_ctx) {
   orig_ctx = new_ctx;
   if (new_ctx) {
     if (messages_pending_flush) {
-      rctx = PeeringCtx(*messages_pending_flush, *new_ctx);
+      rctx.emplace(*messages_pending_flush, *new_ctx);
     } else {
-      rctx = *new_ctx;
+      rctx.emplace(*new_ctx);
     }
     rctx->start_time = ceph_clock_now();
   }
@@ -156,7 +155,7 @@ void PeeringState::begin_block_outgoing() {
   ceph_assert(orig_ctx);
   ceph_assert(rctx);
   messages_pending_flush = BufferedRecoveryMessages();
-  rctx = PeeringCtx(*messages_pending_flush, *orig_ctx);
+  rctx.emplace(*messages_pending_flush, *orig_ctx);
 }
 
 void PeeringState::clear_blocked_outgoing() {
@@ -170,8 +169,8 @@ void PeeringState::end_block_outgoing() {
   ceph_assert(orig_ctx);
   ceph_assert(rctx);
 
-  rctx = PeeringCtx(*orig_ctx);
-  rctx->accept_buffered_messages(*messages_pending_flush);
+  orig_ctx->accept_buffered_messages(*messages_pending_flush);
+  rctx.emplace(*orig_ctx);
   messages_pending_flush = boost::optional<BufferedRecoveryMessages>();
 }
 
@@ -182,7 +181,7 @@ void PeeringState::end_handle() {
   }
 
   machine.event_count++;
-  rctx = boost::optional<PeeringCtx>();
+  rctx = std::nullopt;
   orig_ctx = NULL;
 }
 
@@ -389,7 +388,7 @@ void PeeringState::advance_map(
   OSDMapRef osdmap, OSDMapRef lastmap,
   vector<int>& newup, int up_primary,
   vector<int>& newacting, int acting_primary,
-  PeeringCtx *rctx)
+  PeeringCtx &rctx)
 {
   ceph_assert(lastmap->get_epoch() == osdmap_ref->get_epoch());
   ceph_assert(lastmap == osdmap_ref);
@@ -404,18 +403,18 @@ void PeeringState::advance_map(
   AdvMap evt(
     osdmap, lastmap, newup, up_primary,
     newacting, acting_primary);
-  handle_event(evt, rctx);
+  handle_event(evt, &rctx);
   if (pool.info.last_change == osdmap_ref->get_epoch()) {
     pl->on_pool_change();
   }
   last_require_osd_release = osdmap->require_osd_release;
 }
 
-void PeeringState::activate_map(PeeringCtx *rctx)
+void PeeringState::activate_map(PeeringCtx &rctx)
 {
   psdout(10) << __func__ << dendl;
   ActMap evt;
-  handle_event(evt, rctx);
+  handle_event(evt, &rctx);
   if (osdmap_ref->get_epoch() - last_persisted_osdmap >
     cct->_conf->osd_pg_epoch_persisted_max_stale) {
     psdout(20) << __func__ << ": Dirtying info: last_persisted is "
@@ -427,7 +426,7 @@ void PeeringState::activate_map(PeeringCtx *rctx)
              << last_persisted_osdmap
              << " while current is " << osdmap_ref->get_epoch() << dendl;
   }
-  write_if_dirty(*rctx->transaction);
+  write_if_dirty(rctx.transaction);
 
   if (get_osdmap()->check_new_blacklist_entries()) {
     pl->check_blacklisted_watchers();
@@ -521,7 +520,7 @@ void PeeringState::start_peering_interval(
   const OSDMapRef lastmap,
   const vector<int>& newup, int new_up_primary,
   const vector<int>& newacting, int new_acting_primary,
-  ObjectStore::Transaction *t)
+  ObjectStore::Transaction &t)
 {
   const OSDMapRef osdmap = get_osdmap();
 
@@ -1960,11 +1959,11 @@ void PeeringState::log_weirdness()
 bool PeeringState::search_for_missing(
   const pg_info_t &oinfo, const pg_missing_t &omissing,
   pg_shard_t from,
-  PeeringCtx *ctx)
+  PeeringCtxWrapper &ctx)
 {
   uint64_t num_unfound_before = missing_loc.num_unfound();
   bool found_missing = missing_loc.add_source_info(
-    from, oinfo, omissing, ctx->handle);
+    from, oinfo, omissing, ctx.handle);
   if (found_missing && num_unfound_before != missing_loc.num_unfound())
     pl->publish_stats_to_osd();
   // avoid doing this if the peer is empty.  This is abit of paranoia
@@ -1975,7 +1974,7 @@ bool PeeringState::search_for_missing(
       oinfo.last_update != eversion_t()) {
     pg_info_t tinfo(oinfo);
     tinfo.pgid.shard = pg_whoami.shard;
-    (*(ctx->info_map))[from.osd].emplace_back(
+    ctx.info_map[from.osd].emplace_back(
       pg_notify_t(
        from.shard, pg_whoami.shard,
        get_osdmap_epoch(),
@@ -2082,7 +2081,7 @@ void PeeringState::activate(
   map<int,
   vector<
   pair<pg_notify_t, PastIntervals> > > *activator_map,
-  PeeringCtx *ctx)
+  PeeringCtxWrapper &ctx)
 {
   ceph_assert(!is_peered());
 
@@ -2180,7 +2179,6 @@ void PeeringState::activate(
       info.purged_snaps.swap(purged);
     }
 
-    ceph_assert(ctx);
     // start up replicas
 
     ceph_assert(!acting_recovery_backfill.empty());
@@ -2362,10 +2360,10 @@ void PeeringState::activate(
       // and covers vast majority of the use cases, like one OSD/host is down for
       // a while for hardware repairing
       if (complete_shards.size() + 1 == acting_recovery_backfill.size()) {
-        missing_loc.add_batch_sources_info(complete_shards, ctx->handle);
+        missing_loc.add_batch_sources_info(complete_shards, ctx.handle);
       } else {
         missing_loc.add_source_info(pg_whoami, info, pg_log.get_missing(),
-                                   ctx->handle);
+                                   ctx.handle);
         for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
             i != acting_recovery_backfill.end();
             ++i) {
@@ -2377,7 +2375,7 @@ void PeeringState::activate(
            *i,
            peer_info[*i],
            peer_missing[*i],
-            ctx->handle);
+            ctx.handle);
         }
       }
       for (map<pg_shard_t, pg_missing_t>::iterator i = peer_missing.begin();
@@ -2410,7 +2408,7 @@ void PeeringState::activate(
     pl->on_activate(std::move(to_trim));
   }
   if (acting.size() >= pool.info.min_size) {
-    PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(&t)};
+    PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
     pg_log.roll_forward(rollbacker.get());
   }
 }
@@ -2448,7 +2446,7 @@ void PeeringState::merge_log(
   ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog,
   pg_shard_t from)
 {
-  PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(&t)};
+  PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
   pg_log.merge_log(
     oinfo, olog, from, info, rollbacker.get(), dirty_info, dirty_big_info);
 }
@@ -2456,7 +2454,7 @@ void PeeringState::merge_log(
 void PeeringState::rewind_divergent_log(
   ObjectStore::Transaction& t, eversion_t newhead)
 {
-  PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(&t)};
+  PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
   pg_log.rewind_divergent_log(
     newhead, info, rollbacker.get(), dirty_info, dirty_big_info);
 }
@@ -2594,13 +2592,13 @@ void PeeringState::fulfill_log(
   pl->send_cluster_message(from.osd, mlog, get_osdmap_epoch(), true);
 }
 
-void PeeringState::fulfill_query(const MQuery& query, PeeringCtx *rctx)
+void PeeringState::fulfill_query(const MQuery& query, PeeringCtxWrapper &rctx)
 {
   if (query.query.type == pg_query_t::INFO) {
     pair<pg_shard_t, pg_info_t> notify_info;
     update_history(query.query.history);
     fulfill_info(query.from, query.query, notify_info);
-    rctx->send_notify(
+    rctx.send_notify(
       notify_info.first,
       pg_notify_t(
        notify_info.first.shard, pg_whoami.shard,
@@ -2740,7 +2738,7 @@ void PeeringState::split_into(
 
 void PeeringState::merge_from(
   map<spg_t,PeeringState *>& sources,
-  PeeringCtx *rctx,
+  PeeringCtx &rctx,
   unsigned split_bits,
   const pg_merge_meta_t& last_pg_merge_meta)
 {
@@ -2766,7 +2764,7 @@ void PeeringState::merge_from(
     }
   }
 
-  PGLog::LogEntryHandlerRef handler{pl->get_log_handler(rctx->transaction)};
+  PGLog::LogEntryHandlerRef handler{pl->get_log_handler(rctx.transaction)};
   pg_log.roll_forward(handler.get());
 
   info.last_complete = info.last_update;  // to fake out trim()
@@ -2805,7 +2803,7 @@ void PeeringState::merge_from(
 
     // prepare log
     PGLog::LogEntryHandlerRef handler{
-      source->pl->get_log_handler(rctx->transaction)};
+      source->pl->get_log_handler(rctx.transaction)};
     source->pg_log.roll_forward(handler.get());
     source->info.last_complete = source->info.last_update;  // to fake out trim()
     source->pg_log.reset_recovery_pointers();
@@ -2903,10 +2901,10 @@ void PeeringState::start_split_stats(
 }
 
 void PeeringState::finish_split_stats(
-  const object_stat_sum_t& stats, ObjectStore::Transaction *t)
+  const object_stat_sum_t& stats, ObjectStore::Transaction &t)
 {
   info.stats.stats.sum = stats;
-  write_if_dirty(*t);
+  write_if_dirty(t);
 }
 
 void PeeringState::update_blocked_by()
@@ -3332,7 +3330,7 @@ void PeeringState::init(
   const pg_history_t& history,
   const PastIntervals& pi,
   bool backfill,
-  ObjectStore::Transaction *t)
+  ObjectStore::Transaction &t)
 {
   psdout(10) << "init role " << role << " up "
             << newup << " acting " << newacting
@@ -3367,7 +3365,7 @@ void PeeringState::init(
 
   dirty_info = true;
   dirty_big_info = true;
-  write_if_dirty(*t);
+  write_if_dirty(t);
 }
 
 void PeeringState::dump_peering_state(Formatter *f)
@@ -3444,7 +3442,7 @@ bool PeeringState::append_log_entries_update_missing(
   ceph_assert(!entries.empty());
   ceph_assert(entries.begin()->version > info.last_update);
 
-  PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(&t)};
+  PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
   bool invalidate_stats =
     pg_log.append_new_log_entries(
       info.last_backfill,
@@ -3570,7 +3568,7 @@ void PeeringState::append_log(
   }
   psdout(10) << "append_log " << pg_log.get_log() << " " << logv << dendl;
 
-  PGLog::LogEntryHandlerRef handler{pl->get_log_handler(&t)};
+  PGLog::LogEntryHandlerRef handler{pl->get_log_handler(t)};
   if (!transaction_applied) {
      /* We must be a backfill or async recovery peer, so it's ok if we apply
       * out-of-turn since we won't be considered when
@@ -3629,7 +3627,7 @@ void PeeringState::recover_got(
      * write in question must be fully committed, so it's not valid
      * to roll it back anyway (and we'll be rolled forward shortly
      * anyway) */
-    PGLog::LogEntryHandlerRef handler{pl->get_log_handler(&t)};
+    PGLog::LogEntryHandlerRef handler{pl->get_log_handler(t)};
     pg_log.roll_forward_to(v, handler.get());
   }
 
@@ -5227,7 +5225,7 @@ PeeringState::Clean::Clean(my_context ctx)
 
   ps->try_mark_clean();
 
-  context< PeeringMachine >().get_cur_transaction()->register_on_commit(
+  context< PeeringMachine >().get_cur_transaction().register_on_commit(
     pl->on_clean());
 }
 
@@ -5280,10 +5278,10 @@ PeeringState::Active::Active(my_context ctx)
   ceph_assert(ps->is_primary());
   psdout(10) << "In Active, about to call activate" << dendl;
   ps->start_flush(context< PeeringMachine >().get_cur_transaction());
-  ps->activate(*context< PeeringMachine >().get_cur_transaction(),
+  ps->activate(context< PeeringMachine >().get_cur_transaction(),
               ps->get_osdmap_epoch(),
-              *context< PeeringMachine >().get_query_map(),
-              context< PeeringMachine >().get_info_map(),
+              context< PeeringMachine >().get_query_map(),
+              &context< PeeringMachine >().get_info_map(),
               context< PeeringMachine >().get_recovery_ctx());
 
   // everyone has to commit/ack before we are truly active
@@ -5370,7 +5368,7 @@ boost::statechart::result PeeringState::Active::react(const ActMap&)
 
   if (ps->have_unfound()) {
     // object may have become unfound
-    ps->discover_all_missing(*context< PeeringMachine >().get_query_map());
+    ps->discover_all_missing(context< PeeringMachine >().get_query_map());
   }
 
   uint64_t unfound = ps->missing_loc.num_unfound();
@@ -5409,7 +5407,7 @@ boost::statechart::result PeeringState::Active::react(const MNotifyRec& notevt)
     ps->proc_replica_info(
       notevt.from, notevt.notify.info, notevt.notify.epoch_sent);
     if (ps->have_unfound() || (ps->is_degraded() && ps->might_have_unfound.count(notevt.from))) {
-      ps->discover_all_missing(*context< PeeringMachine >().get_query_map());
+      ps->discover_all_missing(context< PeeringMachine >().get_query_map());
     }
   }
   return discard_event();
@@ -5649,9 +5647,12 @@ boost::statechart::result PeeringState::ReplicaActive::react(
   DECLARE_LOCALS
   psdout(10) << "In ReplicaActive, about to call activate" << dendl;
   map<int, map<spg_t, pg_query_t> > query_map;
-  ps->activate(*context< PeeringMachine >().get_cur_transaction(),
-              actevt.activation_epoch,
-              query_map, NULL, NULL);
+  ps->activate(
+    context< PeeringMachine >().get_cur_transaction(),
+    actevt.activation_epoch,
+    query_map,
+    NULL,
+    context< PeeringMachine >().get_recovery_ctx());
   psdout(10) << "Activate Finished" << dendl;
   return discard_event();
 }
@@ -5690,7 +5691,7 @@ boost::statechart::result PeeringState::ReplicaActive::react(
 boost::statechart::result PeeringState::ReplicaActive::react(const MInfoRec& infoevt)
 {
   DECLARE_LOCALS
-  ps->proc_primary_info(*context<PeeringMachine>().get_cur_transaction(),
+  ps->proc_primary_info(context<PeeringMachine>().get_cur_transaction(),
                        infoevt.info);
   return discard_event();
 }
@@ -5699,8 +5700,8 @@ boost::statechart::result PeeringState::ReplicaActive::react(const MLogRec& loge
 {
   DECLARE_LOCALS
   psdout(10) << "received log from " << logevt.from << dendl;
-  ObjectStore::Transaction *t = context<PeeringMachine>().get_cur_transaction();
-  ps->merge_log(*t, logevt.msg->info, logevt.msg->log, logevt.from);
+  ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
+  ps->merge_log(t, logevt.msg->info, logevt.msg->log, logevt.from);
   ceph_assert(ps->pg_log.get_head() == ps->info.last_update);
 
   return discard_event();
@@ -5786,7 +5787,7 @@ boost::statechart::result PeeringState::Stray::react(const MLogRec& logevt)
   MOSDPGLog *msg = logevt.msg.get();
   psdout(10) << "got info+log from osd." << logevt.from << " " << msg->info << " " << msg->log << dendl;
 
-  ObjectStore::Transaction *t = context<PeeringMachine>().get_cur_transaction();
+  ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
   if (msg->info.last_backfill == hobject_t()) {
     // restart backfill
     ps->info = msg->info;
@@ -5799,7 +5800,7 @@ boost::statechart::result PeeringState::Stray::react(const MLogRec& logevt)
 
     ps->pg_log.reset_backfill();
   } else {
-    ps->merge_log(*t, msg->info, msg->log, logevt.from);
+    ps->merge_log(t, msg->info, msg->log, logevt.from);
   }
 
   ceph_assert(ps->pg_log.get_head() == ps->info.last_update);
@@ -5815,8 +5816,8 @@ boost::statechart::result PeeringState::Stray::react(const MInfoRec& infoevt)
 
   if (ps->info.last_update > infoevt.info.last_update) {
     // rewind divergent log entries
-    ObjectStore::Transaction *t = context<PeeringMachine>().get_cur_transaction();
-    ps->rewind_divergent_log(*t, infoevt.info.last_update);
+    ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
+    ps->rewind_divergent_log(t, infoevt.info.last_update);
     ps->info.stats = infoevt.info.stats;
     ps->info.hit_set = infoevt.info.hit_set;
   }
@@ -5929,7 +5930,7 @@ PeeringState::Deleting::Deleting(my_context ctx)
   context< PeeringMachine >().log_enter(state_name);
   DECLARE_LOCALS
   ps->deleting = true;
-  ObjectStore::Transaction *t = context<PeeringMachine>().get_cur_transaction();
+  ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
 
   // clear log
   PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
@@ -6210,7 +6211,7 @@ boost::statechart::result PeeringState::GetLog::react(const GotLog&)
   psdout(10) << "leaving GetLog" << dendl;
   if (msg) {
     psdout(10) << "processing master log" << dendl;
-    ps->proc_master_log(*context<PeeringMachine>().get_cur_transaction(),
+    ps->proc_master_log(context<PeeringMachine>().get_cur_transaction(),
                        msg->info, msg->log, msg->missing,
                        auth_log_shard);
   }
index 55d9a554fc54283decc8688d1ceda3cd3172f8db..c11dfa71be29954f2dfa05c2ee0816f9bd051c28 100644 (file)
@@ -94,7 +94,7 @@ public:
     virtual bool try_flush_or_schedule_async() = 0;
     /// Arranges for a commit on t to call on_flushed() once flushed.
     virtual void start_flush_on_transaction(
-      ObjectStore::Transaction *t) = 0;
+      ObjectStore::Transaction &t) = 0;
     /// Notification that all outstanding flushes for interval have completed
     virtual void on_flushed() = 0;
 
@@ -166,7 +166,7 @@ public:
     // =================== Event notification ====================
     virtual void on_pool_change() = 0;
     virtual void on_role_change() = 0;
-    virtual void on_change(ObjectStore::Transaction *t) = 0;
+    virtual void on_change(ObjectStore::Transaction &t) = 0;
     virtual void on_activate(interval_set<snapid_t> to_trim) = 0;
     virtual void on_activate_complete() = 0;
     virtual void on_new_interval() = 0;
@@ -176,9 +176,9 @@ public:
 
     // ====================== PG deletion =======================
     /// Notification of removal complete, t must be populated to complete removal
-    virtual void on_removal(ObjectStore::Transaction *t) = 0;
+    virtual void on_removal(ObjectStore::Transaction &t) = 0;
     /// Perform incremental removal work
-    virtual void do_delete_work(ObjectStore::Transaction *t) = 0;
+    virtual void do_delete_work(ObjectStore::Transaction &t) = 0;
 
     // ======================= PG Merge =========================
     virtual void clear_ready_to_merge() = 0;
@@ -205,7 +205,7 @@ public:
     // ================== Peering log events ====================
     /// Get handler for rolling forward/back log entries
     virtual PGLog::LogEntryHandlerRef get_log_handler(
-      ObjectStore::Transaction *t) = 0;
+      ObjectStore::Transaction &t) = 0;
 
     // ============ On disk representation changes ==============
     virtual void rebuild_missing_set_with_deletes(PGLog &pglog) = 0;
@@ -234,38 +234,23 @@ public:
   };
 
   struct PeeringCtx {
-    utime_t start_time;
-    map<int, map<spg_t, pg_query_t> > *query_map;
-    map<int, vector<pair<pg_notify_t, PastIntervals> > > *info_map;
-    map<int, vector<pair<pg_notify_t, PastIntervals> > > *notify_list;
-    ObjectStore::Transaction *transaction;
-    HBHandle* handle;
-    PeeringCtx(map<int, map<spg_t, pg_query_t> > *query_map,
-               map<int,
-                   vector<pair<pg_notify_t, PastIntervals> > > *info_map,
-               map<int,
-                   vector<pair<pg_notify_t, PastIntervals> > > *notify_list,
-               ObjectStore::Transaction *transaction)
-      : query_map(query_map), info_map(info_map),
-       notify_list(notify_list),
-       transaction(transaction),
-        handle(NULL) {}
-
-    PeeringCtx(BufferedRecoveryMessages &buf, PeeringCtx &rctx)
-      : query_map(&(buf.query_map)),
-       info_map(&(buf.info_map)),
-       notify_list(&(buf.notify_list)),
-       transaction(rctx.transaction),
-        handle(rctx.handle) {}
+    map<int, map<spg_t, pg_query_t> > query_map;
+    map<int, vector<pair<pg_notify_t, PastIntervals> > > info_map;
+    map<int, vector<pair<pg_notify_t, PastIntervals> > > notify_list;
+    ObjectStore::Transaction transaction;
+    HBHandle* handle = nullptr;
+
+    PeeringCtx() = default;
+
+    void reset_transaction() {
+      transaction = ObjectStore::Transaction();
+    }
 
     void accept_buffered_messages(BufferedRecoveryMessages &m) {
-      ceph_assert(query_map);
-      ceph_assert(info_map);
-      ceph_assert(notify_list);
       for (map<int, map<spg_t, pg_query_t> >::iterator i = m.query_map.begin();
           i != m.query_map.end();
           ++i) {
-       map<spg_t, pg_query_t> &omap = (*query_map)[i->first];
+       map<spg_t, pg_query_t> &omap = query_map[i->first];
        for (map<spg_t, pg_query_t>::iterator j = i->second.begin();
             j != i->second.end();
             ++j) {
@@ -277,7 +262,7 @@ public:
           i != m.info_map.end();
           ++i) {
        vector<pair<pg_notify_t, PastIntervals> > &ovec =
-         (*info_map)[i->first];
+         info_map[i->first];
        ovec.reserve(ovec.size() + i->second.size());
        ovec.insert(ovec.end(), i->second.begin(), i->second.end());
       }
@@ -286,18 +271,48 @@ public:
           i != m.notify_list.end();
           ++i) {
        vector<pair<pg_notify_t, PastIntervals> > &ovec =
-         (*notify_list)[i->first];
+         notify_list[i->first];
        ovec.reserve(ovec.size() + i->second.size());
        ovec.insert(ovec.end(), i->second.begin(), i->second.end());
       }
     }
+  };
+
+private:
+  /**
+   * Wraps PeeringCtx to hide the difference between buffering messages to
+   * be sent after flush or immediately.
+   */
+  struct PeeringCtxWrapper {
+    utime_t start_time;
+    map<int, map<spg_t, pg_query_t> > &query_map;
+    map<int, vector<pair<pg_notify_t, PastIntervals> > > &info_map;
+    map<int, vector<pair<pg_notify_t, PastIntervals> > > &notify_list;
+    ObjectStore::Transaction &transaction;
+    HBHandle * const handle = nullptr;
+
+    PeeringCtxWrapper(PeeringCtx &wrapped) :
+      query_map(wrapped.query_map),
+      info_map(wrapped.info_map),
+      notify_list(wrapped.notify_list),
+      transaction(wrapped.transaction),
+      handle(wrapped.handle) {}
+
+    PeeringCtxWrapper(BufferedRecoveryMessages &buf, PeeringCtx &wrapped)
+      : query_map(buf.query_map),
+       info_map(buf.info_map),
+       notify_list(buf.notify_list),
+       transaction(wrapped.transaction),
+        handle(wrapped.handle) {}
+
+    PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default;
 
     void send_notify(pg_shard_t to,
                     const pg_notify_t &info, const PastIntervals &pi) {
-      ceph_assert(notify_list);
-      (*notify_list)[to.osd].emplace_back(info, pi);
+      notify_list[to.osd].emplace_back(info, pi);
     }
   };
+public:
 
   struct QueryState : boost::statechart::event< QueryState > {
     Formatter *f;
@@ -458,27 +473,27 @@ public:
       event_count(0) {}
 
     /* Accessor functions for state methods */
-    ObjectStore::Transaction* get_cur_transaction() {
+    ObjectStore::Transaction& get_cur_transaction() {
       ceph_assert(state->rctx);
-      ceph_assert(state->rctx->transaction);
       return state->rctx->transaction;
     }
 
     void send_query(pg_shard_t to, const pg_query_t &query);
 
-    map<int, map<spg_t, pg_query_t> > *get_query_map() {
+    map<int, map<spg_t, pg_query_t> > &get_query_map() {
       ceph_assert(state->rctx);
-      ceph_assert(state->rctx->query_map);
       return state->rctx->query_map;
     }
 
-    map<int, vector<pair<pg_notify_t, PastIntervals> > > *get_info_map() {
+    map<int, vector<pair<pg_notify_t, PastIntervals> > > &get_info_map() {
       ceph_assert(state->rctx);
-      ceph_assert(state->rctx->info_map);
       return state->rctx->info_map;
     }
 
-    PeeringCtx *get_recovery_ctx() { return &*(state->rctx); }
+    PeeringCtxWrapper &get_recovery_ctx() {
+      assert(state->rctx);
+      return *(state->rctx);
+    }
 
     void send_notify(pg_shard_t to,
               const pg_notify_t &info, const PastIntervals &pi) {
@@ -1204,7 +1219,7 @@ public:
    * the message lists for messages_pending_flush while blocking messages
    * or into orig_ctx otherwise
    */
-  boost::optional<PeeringCtx> rctx;
+  std::optional<PeeringCtxWrapper> rctx;
 
   /**
    * OSDMap state
@@ -1330,7 +1345,7 @@ public:
     const OSDMapRef lastmap,
     const vector<int>& newup, int up_primary,
     const vector<int>& newacting, int acting_primary,
-    ObjectStore::Transaction *t);
+    ObjectStore::Transaction &t);
   void on_new_interval();
   void clear_recovery_state();
   void clear_primary_state();
@@ -1403,7 +1418,7 @@ public:
   bool search_for_missing(
     const pg_info_t &oinfo, const pg_missing_t &omissing,
     pg_shard_t fromosd,
-    PeeringCtx*);
+    PeeringCtxWrapper &rctx);
   void build_might_have_unfound();
   void log_weirdness();
   void activate(
@@ -1411,7 +1426,7 @@ public:
     epoch_t activation_epoch,
     map<int, map<spg_t,pg_query_t> >& query_map,
     map<int, vector<pair<pg_notify_t, PastIntervals> > > *activator_map,
-    PeeringCtx *ctx);
+    PeeringCtxWrapper &ctx);
 
   void rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead);
   void merge_log(
@@ -1449,7 +1464,7 @@ public:
     pair<pg_shard_t, pg_info_t> &notify_info);
   void fulfill_log(
     pg_shard_t from, const pg_query_t &query, epoch_t query_epoch);
-  void fulfill_query(const MQuery& q, PeeringCtx *rctx);
+  void fulfill_query(const MQuery& q, PeeringCtxWrapper &rctx);
 
   void try_mark_clean();
 
@@ -1473,7 +1488,7 @@ public:
 
   /// Process evt
   void handle_event(const boost::statechart::event_base &evt,
-             PeeringCtx *rctx) {
+                   PeeringCtx *rctx) {
     start_handle(rctx);
     machine.process_event(evt);
     end_handle();
@@ -1481,7 +1496,7 @@ public:
 
   /// Process evt
   void handle_event(PGPeeringEventRef evt,
-             PeeringCtx *rctx) {
+                   PeeringCtx *rctx) {
     start_handle(rctx);
     machine.process_event(evt->get_event());
     end_handle();
@@ -1495,7 +1510,7 @@ public:
     const pg_history_t& history,
     const PastIntervals& pi,
     bool backfill,
-    ObjectStore::Transaction *t);
+    ObjectStore::Transaction &t);
 
   /// Init pg instance from disk state
   template <typename F>
@@ -1538,7 +1553,7 @@ public:
 
   /// Update new child with stats
   void finish_split_stats(
-    const object_stat_sum_t& stats, ObjectStore::Transaction *t);
+    const object_stat_sum_t& stats, ObjectStore::Transaction &t);
 
   /// Split state for child_pgid into *child
   void split_into(
@@ -1547,7 +1562,7 @@ public:
   /// Merge state from sources
   void merge_from(
     map<spg_t,PeeringState *>& sources,
-    PeeringCtx *rctx,
+    PeeringCtx &rctx,
     unsigned split_bits,
     const pg_merge_meta_t& last_pg_merge_meta);
 
@@ -1771,12 +1786,12 @@ public:
     int up_primary,         ///< [in] new up primary
     vector<int>& newacting, ///< [in] new acting
     int acting_primary,     ///< [in] new acting primary
-    PeeringCtx *rctx        ///< [out] recovery context
+    PeeringCtx &rctx        ///< [out] recovery context
     );
 
   /// Activates most recently updated map
   void activate_map(
-    PeeringCtx *rctx        ///< [out] recovery context
+    PeeringCtx &rctx        ///< [out] recovery context
     );
 
   /// resets last_persisted_osdmap
@@ -2095,7 +2110,7 @@ private:
    * complete_flush is called once for each start_flush call as
    * required by start_flush_on_transaction).
    */
-  void start_flush(ObjectStore::Transaction *t) {
+  void start_flush(ObjectStore::Transaction &t) {
     flushes_in_progress++;
     pl->start_flush_on_transaction(t);
   }
index f0be0cea4f732d3434255fd9c19329fc89f3cf13..a95b5a4610a99966f4932169ea8428f53bfa72ab 100644 (file)
@@ -11792,13 +11792,13 @@ void PrimaryLogPG::on_flushed()
   }
 }
 
-void PrimaryLogPG::on_removal(ObjectStore::Transaction *t)
+void PrimaryLogPG::on_removal(ObjectStore::Transaction &t)
 {
   dout(10) << __func__ << dendl;
 
   on_shutdown();
 
-  t->register_on_commit(new C_DeleteMore(this, get_osdmap_epoch()));
+  t.register_on_commit(new C_DeleteMore(this, get_osdmap_epoch()));
 }
 
 void PrimaryLogPG::clear_async_reads()
@@ -11928,7 +11928,7 @@ void PrimaryLogPG::on_activate_complete()
   agent_setup();
 }
 
-void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
+void PrimaryLogPG::on_change(ObjectStore::Transaction &t)
 {
   dout(10) << __func__ << dendl;
 
@@ -12019,8 +12019,8 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
   // registered watches.
   context_registry_on_change();
 
-  pgbackend->on_change_cleanup(t);
-  scrubber.cleanup_store(t);
+  pgbackend->on_change_cleanup(&t);
+  scrubber.cleanup_store(&t);
   pgbackend->on_change();
 
   // clear snap_trimmer state
index e91937e33b92eddda6178320faae0ed9edbe7116..88fc59ec5d808cb2da92acbe1aa7c910fb8a75e7 100644 (file)
@@ -1547,15 +1547,15 @@ public:
     int split_bits,
     int seed,
     const pg_pool_t *pool,
-    ObjectStore::Transaction *t) override {
+    ObjectStore::Transaction &t) override {
     coll_t target = coll_t(child);
-    PG::_create(*t, child, split_bits);
-    t->split_collection(
+    PG::_create(t, child, split_bits);
+    t.split_collection(
       coll,
       split_bits,
       seed,
       target);
-    PG::_init(*t, child, pool);
+    PG::_init(t, child, pool);
   }
 private:
 
@@ -1888,10 +1888,10 @@ public:
   void plpg_on_role_change() override;
   void plpg_on_pool_change() override;
   void clear_async_reads();
-  void on_change(ObjectStore::Transaction *t) override;
+  void on_change(ObjectStore::Transaction &t) override;
   void on_activate_complete() override;
   void on_flushed() override;
-  void on_removal(ObjectStore::Transaction *t) override;
+  void on_removal(ObjectStore::Transaction &t) override;
   void on_shutdown() override;
   bool check_failsafe_full() override;
   bool maybe_preempt_replica_scrub(const hobject_t& oid) override {