]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: move io reservation machinery into PeeringState
authorsjust@redhat.com <sjust@redhat.com>
Mon, 29 Apr 2019 20:53:24 +0000 (13:53 -0700)
committersjust@redhat.com <sjust@redhat.com>
Wed, 1 May 2019 18:22:15 +0000 (11:22 -0700)
This patch recasts the reservation and backoff interfaces
on PeeringListener in terms of events to queue rather than
explicit callbacks (PG handles implementing via callbacks).

Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/PG.cc
src/osd/PG.h
src/osd/PeeringState.cc
src/osd/PeeringState.h
src/osd/PrimaryLogPG.cc

index 4109fafbf1955e3fc280bdd7fe89c6a5bc62faa9..c41420b35cd07a7179d6ff901e0f4473ee0de7f1 100644 (file)
@@ -253,7 +253,6 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   recovery_ops_active(0),
   need_up_thru(false),
   heartbeat_peer_lock("PG::heartbeat_peer_lock"),
-  backfill_reserved(false),
   backfill_reserving(false),
   pg_stats_publish_lock("PG::pg_stats_publish_lock"),
   pg_stats_publish_valid(false),
@@ -2001,162 +2000,6 @@ void PG::try_mark_clean()
   requeue_ops(waiting_for_clean_to_primary_repair);
 }
 
-bool PG::set_force_recovery(bool b)
-{
-  bool did = false;
-  if (b) {
-    if (!(state & PG_STATE_FORCED_RECOVERY) &&
-       (state & (PG_STATE_DEGRADED |
-                 PG_STATE_RECOVERY_WAIT |
-                 PG_STATE_RECOVERING))) {
-      dout(20) << __func__ << " set" << dendl;
-      state_set(PG_STATE_FORCED_RECOVERY);
-      publish_stats_to_osd();
-      did = true;
-    }
-  } else if (state & PG_STATE_FORCED_RECOVERY) {
-    dout(20) << __func__ << " clear" << dendl;
-    state_clear(PG_STATE_FORCED_RECOVERY);
-    publish_stats_to_osd();
-    did = true;
-  }
-  if (did) {
-    dout(20) << __func__ << " state " << recovery_state.get_current_state()
-            << dendl;
-    osd->local_reserver.update_priority(info.pgid, get_recovery_priority());
-  }
-  return did;
-}
-
-bool PG::set_force_backfill(bool b)
-{
-  bool did = false;
-  if (b) {
-    if (!(state & PG_STATE_FORCED_BACKFILL) &&
-       (state & (PG_STATE_DEGRADED |
-                 PG_STATE_BACKFILL_WAIT |
-                 PG_STATE_BACKFILLING))) {
-      dout(10) << __func__ << " set" << dendl;
-      state_set(PG_STATE_FORCED_BACKFILL);
-      publish_stats_to_osd();
-      did = true;
-    }
-  } else if (state & PG_STATE_FORCED_BACKFILL) {
-    dout(10) << __func__ << " clear" << dendl;
-    state_clear(PG_STATE_FORCED_BACKFILL);
-    publish_stats_to_osd();
-    did = true;
-  }
-  if (did) {
-    dout(20) << __func__ << " state " << recovery_state.get_current_state()
-            << dendl;
-    osd->local_reserver.update_priority(info.pgid, get_backfill_priority());
-  }
-  return did;
-}
-
-int PG::clamp_recovery_priority(int priority, int pool_recovery_priority, int max)
-{
-  static_assert(OSD_RECOVERY_PRIORITY_MIN < OSD_RECOVERY_PRIORITY_MAX, "Invalid priority range");
-  static_assert(OSD_RECOVERY_PRIORITY_MIN >= 0, "Priority range must match unsigned type");
-
-  ceph_assert(max <= OSD_RECOVERY_PRIORITY_MAX);
-
-  // User can't set this too high anymore, but might be a legacy value
-  if (pool_recovery_priority > OSD_POOL_PRIORITY_MAX)
-    pool_recovery_priority = OSD_POOL_PRIORITY_MAX;
-  if (pool_recovery_priority < OSD_POOL_PRIORITY_MIN)
-    pool_recovery_priority = OSD_POOL_PRIORITY_MIN;
-  // Shift range from min to max to 0 to max - min
-  pool_recovery_priority += (0 - OSD_POOL_PRIORITY_MIN);
-  ceph_assert(pool_recovery_priority >= 0 && pool_recovery_priority <= (OSD_POOL_PRIORITY_MAX - OSD_POOL_PRIORITY_MIN));
-
-  priority += pool_recovery_priority;
-
-  // Clamp to valid range
-  if (priority > max) {
-    return max;
-  } else if (priority < OSD_RECOVERY_PRIORITY_MIN) {
-    return OSD_RECOVERY_PRIORITY_MIN;
-  } else {
-    return priority;
-  }
-}
-
-unsigned PG::get_recovery_priority()
-{
-  // a higher value -> a higher priority
-  int ret = OSD_RECOVERY_PRIORITY_BASE;
-  int base = ret;
-
-  if (state & PG_STATE_FORCED_RECOVERY) {
-    ret = OSD_RECOVERY_PRIORITY_FORCED;
-  } else {
-    // XXX: This priority boost isn't so much about inactive, but about data-at-risk
-    if (is_degraded() && info.stats.avail_no_missing.size() < pool.info.min_size) {
-      base = OSD_RECOVERY_INACTIVE_PRIORITY_BASE;
-      // inactive: no. of replicas < min_size, highest priority since it blocks IO
-      ret = base + (pool.info.min_size - info.stats.avail_no_missing.size());
-    }
-
-    int64_t pool_recovery_priority = 0;
-    pool.info.opts.get(pool_opts_t::RECOVERY_PRIORITY, &pool_recovery_priority);
-
-    ret = clamp_recovery_priority(ret, pool_recovery_priority, max_prio_map[base]);
-  }
-  dout(20) << __func__ << " recovery priority is " << ret << dendl;
-  return static_cast<unsigned>(ret);
-}
-
-unsigned PG::get_backfill_priority()
-{
-  // a higher value -> a higher priority
-  int ret = OSD_BACKFILL_PRIORITY_BASE;
-  int base = ret;
-
-  if (state & PG_STATE_FORCED_BACKFILL) {
-    ret = OSD_BACKFILL_PRIORITY_FORCED;
-  } else {
-    if (acting.size() < pool.info.min_size) {
-      base = OSD_BACKFILL_INACTIVE_PRIORITY_BASE;
-      // inactive: no. of replicas < min_size, highest priority since it blocks IO
-      ret = base + (pool.info.min_size - acting.size());
-
-    } else if (is_undersized()) {
-      // undersized: OSD_BACKFILL_DEGRADED_PRIORITY_BASE + num missing replicas
-      ceph_assert(pool.info.size > actingset.size());
-      base = OSD_BACKFILL_DEGRADED_PRIORITY_BASE;
-      ret = base + (pool.info.size - actingset.size());
-
-    } else if (is_degraded()) {
-      // degraded: baseline degraded
-      base = ret = OSD_BACKFILL_DEGRADED_PRIORITY_BASE;
-    }
-
-    // Adjust with pool's recovery priority
-    int64_t pool_recovery_priority = 0;
-    pool.info.opts.get(pool_opts_t::RECOVERY_PRIORITY, &pool_recovery_priority);
-
-    ret = clamp_recovery_priority(ret, pool_recovery_priority, max_prio_map[base]);
-  }
-
-  dout(20) << __func__ << " backfill priority is " << ret << dendl;
-  return static_cast<unsigned>(ret);
-}
-
-unsigned PG::get_delete_priority()
-{
-  auto state = get_osdmap()->get_state(osd->whoami);
-  if (state & (CEPH_OSD_BACKFILLFULL |
-               CEPH_OSD_FULL)) {
-    return OSD_DELETE_PRIORITY_FULL;
-  } else if (state & CEPH_OSD_NEARFULL) {
-    return OSD_DELETE_PRIORITY_FULLISH;
-  } else {
-    return OSD_DELETE_PRIORITY_NORMAL;
-  }
-}
-
 Context *PG::finish_recovery()
 {
   dout(10) << "finish_recovery" << dendl;
@@ -4087,6 +3930,60 @@ LogChannel &PG::get_clog() {
   return *(osd->clog);
 }
 
+void PG::schedule_event_after(
+  PGPeeringEventRef event,
+  float delay) {
+  std::lock_guard lock(osd->recovery_request_lock);
+  osd->recovery_request_timer.add_event_after(
+    delay,
+    new QueuePeeringEvt(
+      this,
+      std::move(event)));
+}
+
+void PG::request_local_background_io_reservation(
+  unsigned priority,
+  PGPeeringEventRef on_grant,
+  PGPeeringEventRef on_preempt) {
+  osd->local_reserver.request_reservation(
+    pg_id,
+    on_grant ? new QueuePeeringEvt(
+      this, on_grant) : nullptr,
+    priority,
+    on_preempt ? new QueuePeeringEvt(
+      this, on_preempt) : nullptr);
+}
+
+void PG::update_local_background_io_priority(
+  unsigned priority) {
+  osd->local_reserver.update_priority(
+    pg_id,
+    priority);
+}
+
+void PG::cancel_local_background_io_reservation() {
+  osd->local_reserver.cancel_reservation(
+    pg_id);
+}
+
+void PG::request_remote_recovery_reservation(
+  unsigned priority,
+  PGPeeringEventRef on_grant,
+  PGPeeringEventRef on_preempt) {
+  osd->remote_reserver.request_reservation(
+    pg_id,
+    on_grant ? new QueuePeeringEvt(
+      this, on_grant) : nullptr,
+    priority,
+    on_preempt ? new QueuePeeringEvt(
+      this, on_preempt) : nullptr);
+}
+
+void PG::cancel_remote_recovery_reservation() {
+  osd->remote_reserver.cancel_reservation(
+    pg_id);
+}
+
 void PG::do_replica_scrub_map(OpRequestRef op)
 {
   const MOSDRepScrubMap *m = static_cast<const MOSDRepScrubMap*>(op->get_req());
@@ -4243,26 +4140,6 @@ void PG::reject_reservation()
     get_osdmap_epoch());
 }
 
-void PG::schedule_backfill_retry(float delay)
-{
-  std::lock_guard lock(osd->recovery_request_lock);
-  osd->recovery_request_timer.add_event_after(
-    delay,
-    new QueuePeeringEvt<PeeringState::PeeringState::PeeringState::RequestBackfill>(
-      this, get_osdmap_epoch(),
-      PeeringState::RequestBackfill()));
-}
-
-void PG::schedule_recovery_retry(float delay)
-{
-  std::lock_guard lock(osd->recovery_request_lock);
-  osd->recovery_request_timer.add_event_after(
-    delay,
-    new QueuePeeringEvt<PeeringState::PeeringState::DoRecovery>(
-      this, get_osdmap_epoch(),
-      PeeringState::DoRecovery()));
-}
-
 void PG::clear_scrub_reserved()
 {
   scrubber.reserved_peers.clear();
@@ -5757,7 +5634,7 @@ void PG::start_flush_on_transaction(ObjectStore::Transaction *t)
 bool PG::try_flush_or_schedule_async()
 {
   
-  Context *c = new QueuePeeringEvt<PeeringState::PeeringState::IntervalFlush>(
+  Context *c = new QueuePeeringEvt(
     this, get_osdmap_epoch(), PeeringState::IntervalFlush());
   if (!ch->flush_commit(c)) {
     return false;
index b58c4f18f8c5da156f9ba1f8665beb110eb45dee..e29903cad31da0bf5ca628d8bfc88e9db438e132 100644 (file)
@@ -430,11 +430,27 @@ public:
   epoch_t oldest_stored_osdmap() override;
   LogChannel &get_clog() override;
 
+  void schedule_event_after(
+    PGPeeringEventRef event,
+    float delay) override;
+  void request_local_background_io_reservation(
+    unsigned priority,
+    PGPeeringEventRef on_grant,
+    PGPeeringEventRef on_preempt) override;
+  void update_local_background_io_priority(
+    unsigned priority) override;
+  void cancel_local_background_io_reservation() override;
+
+  void request_remote_recovery_reservation(
+    unsigned priority,
+    PGPeeringEventRef on_grant,
+    PGPeeringEventRef on_preempt) override;
+  void cancel_remote_recovery_reservation() override;
+
+
   bool is_forced_recovery_or_backfill() const {
-    return get_state() & (PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL);
+    return recovery_state.is_forced_recovery_or_backfill();
   }
-  bool set_force_recovery(bool b);
-  bool set_force_backfill(bool b);
 
   void queue_peering_event(PGPeeringEventRef evt);
   void do_peering_event(PGPeeringEventRef evt, PeeringCtx *rcx);
@@ -738,7 +754,6 @@ public:
 protected:
   BackfillInterval backfill_info;
   map<pg_shard_t, BackfillInterval> peer_backfill_info;
-  bool backfill_reserved;
   bool backfill_reserving;
 
   // The primary's num_bytes and local num_bytes for this pg, only valid
@@ -945,15 +960,6 @@ protected:
   bool needs_recovery() const;
   bool needs_backfill() const;
 
-  /// clip calculated priority to reasonable range
-  int clamp_recovery_priority(int prio, int pool_recovery_prio, int max);
-  /// get log recovery reservation priority
-  unsigned get_recovery_priority();
-  /// get backfill reservation priority
-  unsigned get_backfill_priority();
-  /// get priority for pg deletion
-  unsigned get_delete_priority();
-
   void try_mark_clean();  ///< mark an active pg clean
 
   PastIntervals::PriorSet build_prior();
@@ -1427,25 +1433,23 @@ protected:
   void handle_scrub_reserve_release(OpRequestRef op);
 
   void reject_reservation();
-  void schedule_backfill_retry(float retry);
-  void schedule_recovery_retry(float retry);
 
   // -- recovery state --
 
-  template <class EVT>
   struct QueuePeeringEvt : Context {
     PGRef pg;
-    epoch_t epoch;
-    EVT evt;
+    PGPeeringEventRef evt;
+
+    template <class EVT>
     QueuePeeringEvt(PG *pg, epoch_t epoch, EVT evt) :
-      pg(pg), epoch(epoch), evt(evt) {}
+      pg(pg), evt(std::make_shared<PGPeeringEvent>(epoch, epoch, evt)) {}
+
+    QueuePeeringEvt(PG *pg, PGPeeringEventRef evt) :
+      pg(pg), evt(std::move(evt)) {}
+
     void finish(int r) override {
       pg->lock();
-      pg->queue_peering_event(PGPeeringEventRef(
-                               new PGPeeringEvent(
-                                 epoch,
-                                 epoch,
-                                 evt)));
+      pg->queue_peering_event(std::move(evt));
       pg->unlock();
     }
   };
index 7a94ac477cf5c0c22ae4e41c2d3f99837c57543b..0571ffb8d737ce97d370c055f8a26f69d47a7854 100644 (file)
@@ -794,6 +794,162 @@ void PeeringState::check_past_interval_bounds() const
   }
 }
 
+int PeeringState::clamp_recovery_priority(int priority, int pool_recovery_priority, int max)
+{
+  static_assert(OSD_RECOVERY_PRIORITY_MIN < OSD_RECOVERY_PRIORITY_MAX, "Invalid priority range");
+  static_assert(OSD_RECOVERY_PRIORITY_MIN >= 0, "Priority range must match unsigned type");
+
+  ceph_assert(max <= OSD_RECOVERY_PRIORITY_MAX);
+
+  // User can't set this too high anymore, but might be a legacy value
+  if (pool_recovery_priority > OSD_POOL_PRIORITY_MAX)
+    pool_recovery_priority = OSD_POOL_PRIORITY_MAX;
+  if (pool_recovery_priority < OSD_POOL_PRIORITY_MIN)
+    pool_recovery_priority = OSD_POOL_PRIORITY_MIN;
+  // Shift range from min to max to 0 to max - min
+  pool_recovery_priority += (0 - OSD_POOL_PRIORITY_MIN);
+  ceph_assert(pool_recovery_priority >= 0 && pool_recovery_priority <= (OSD_POOL_PRIORITY_MAX - OSD_POOL_PRIORITY_MIN));
+
+  priority += pool_recovery_priority;
+
+  // Clamp to valid range
+  if (priority > max) {
+    return max;
+  } else if (priority < OSD_RECOVERY_PRIORITY_MIN) {
+    return OSD_RECOVERY_PRIORITY_MIN;
+  } else {
+    return priority;
+  }
+}
+
+unsigned PeeringState::get_recovery_priority()
+{
+  // a higher value -> a higher priority
+  int ret = OSD_RECOVERY_PRIORITY_BASE;
+  int base = ret;
+
+  if (state & PG_STATE_FORCED_RECOVERY) {
+    ret = OSD_RECOVERY_PRIORITY_FORCED;
+  } else {
+    // XXX: This priority boost isn't so much about inactive, but about data-at-risk
+    if (is_degraded() && info.stats.avail_no_missing.size() < pool.info.min_size) {
+      base = OSD_RECOVERY_INACTIVE_PRIORITY_BASE;
+      // inactive: no. of replicas < min_size, highest priority since it blocks IO
+      ret = base + (pool.info.min_size - info.stats.avail_no_missing.size());
+    }
+
+    int64_t pool_recovery_priority = 0;
+    pool.info.opts.get(pool_opts_t::RECOVERY_PRIORITY, &pool_recovery_priority);
+
+    ret = clamp_recovery_priority(ret, pool_recovery_priority, max_prio_map[base]);
+  }
+  psdout(20) << __func__ << " recovery priority is " << ret << dendl;
+  return static_cast<unsigned>(ret);
+}
+
+unsigned PeeringState::get_backfill_priority()
+{
+  // a higher value -> a higher priority
+  int ret = OSD_BACKFILL_PRIORITY_BASE;
+  int base = ret;
+
+  if (state & PG_STATE_FORCED_BACKFILL) {
+    ret = OSD_BACKFILL_PRIORITY_FORCED;
+  } else {
+    if (acting.size() < pool.info.min_size) {
+      base = OSD_BACKFILL_INACTIVE_PRIORITY_BASE;
+      // inactive: no. of replicas < min_size, highest priority since it blocks IO
+      ret = base + (pool.info.min_size - acting.size());
+
+    } else if (is_undersized()) {
+      // undersized: OSD_BACKFILL_DEGRADED_PRIORITY_BASE + num missing replicas
+      ceph_assert(pool.info.size > actingset.size());
+      base = OSD_BACKFILL_DEGRADED_PRIORITY_BASE;
+      ret = base + (pool.info.size - actingset.size());
+
+    } else if (is_degraded()) {
+      // degraded: baseline degraded
+      base = ret = OSD_BACKFILL_DEGRADED_PRIORITY_BASE;
+    }
+
+    // Adjust with pool's recovery priority
+    int64_t pool_recovery_priority = 0;
+    pool.info.opts.get(pool_opts_t::RECOVERY_PRIORITY, &pool_recovery_priority);
+
+    ret = clamp_recovery_priority(ret, pool_recovery_priority, max_prio_map[base]);
+  }
+
+  psdout(20) << __func__ << " backfill priority is " << ret << dendl;
+  return static_cast<unsigned>(ret);
+}
+
+unsigned PeeringState::get_delete_priority()
+{
+  auto state = get_osdmap()->get_state(pg_whoami.osd);
+  if (state & (CEPH_OSD_BACKFILLFULL |
+               CEPH_OSD_FULL)) {
+    return OSD_DELETE_PRIORITY_FULL;
+  } else if (state & CEPH_OSD_NEARFULL) {
+    return OSD_DELETE_PRIORITY_FULLISH;
+  } else {
+    return OSD_DELETE_PRIORITY_NORMAL;
+  }
+}
+
+bool PeeringState::set_force_recovery(bool b)
+{
+  bool did = false;
+  if (b) {
+    if (!(state & PG_STATE_FORCED_RECOVERY) &&
+       (state & (PG_STATE_DEGRADED |
+                 PG_STATE_RECOVERY_WAIT |
+                 PG_STATE_RECOVERING))) {
+      psdout(20) << __func__ << " set" << dendl;
+      state_set(PG_STATE_FORCED_RECOVERY);
+      pl->publish_stats_to_osd();
+      did = true;
+    }
+  } else if (state & PG_STATE_FORCED_RECOVERY) {
+    psdout(20) << __func__ << " clear" << dendl;
+    state_clear(PG_STATE_FORCED_RECOVERY);
+    pl->publish_stats_to_osd();
+    did = true;
+  }
+  if (did) {
+    psdout(20) << __func__ << " state " << get_current_state()
+            << dendl;
+    pl->update_local_background_io_priority(get_recovery_priority());
+  }
+  return did;
+}
+
+bool PeeringState::set_force_backfill(bool b)
+{
+  bool did = false;
+  if (b) {
+    if (!(state & PG_STATE_FORCED_BACKFILL) &&
+       (state & (PG_STATE_DEGRADED |
+                 PG_STATE_BACKFILL_WAIT |
+                 PG_STATE_BACKFILLING))) {
+      psdout(10) << __func__ << " set" << dendl;
+      state_set(PG_STATE_FORCED_BACKFILL);
+      pl->publish_stats_to_osd();
+      did = true;
+    }
+  } else if (state & PG_STATE_FORCED_BACKFILL) {
+    psdout(10) << __func__ << " clear" << dendl;
+    state_clear(PG_STATE_FORCED_BACKFILL);
+    pl->publish_stats_to_osd();
+    did = true;
+  }
+  if (did) {
+    psdout(20) << __func__ << " state " << get_current_state()
+            << dendl;
+    pl->update_local_background_io_priority(get_backfill_priority());
+  }
+  return did;
+}
+
 /*------------ Peering State Machine----------------*/
 #undef dout_prefix
 #define dout_prefix (context< PeeringMachine >().dpp->gen_prefix(*_dout) \
@@ -1064,16 +1220,16 @@ boost::statechart::result PeeringState::Primary::react(const ActMap&)
 boost::statechart::result PeeringState::Primary::react(
   const SetForceRecovery&)
 {
-  PG *pg = context< PeeringMachine >().pg;
-  pg->set_force_recovery(true);
+  PeeringState *ps = context< PeeringMachine >().state;
+  ps->set_force_recovery(true);
   return discard_event();
 }
 
 boost::statechart::result PeeringState::Primary::react(
   const UnsetForceRecovery&)
 {
-  PG *pg = context< PeeringMachine >().pg;
-  pg->set_force_recovery(false);
+  PeeringState *ps = context< PeeringMachine >().state;
+  ps->set_force_recovery(false);
   return discard_event();
 }
 
@@ -1092,16 +1248,16 @@ boost::statechart::result PeeringState::Primary::react(
 boost::statechart::result PeeringState::Primary::react(
   const SetForceBackfill&)
 {
-  PG *pg = context< PeeringMachine >().pg;
-  pg->set_force_backfill(true);
+  PeeringState *ps = context< PeeringMachine >().state;
+  ps->set_force_backfill(true);
   return discard_event();
 }
 
 boost::statechart::result PeeringState::Primary::react(
   const UnsetForceBackfill&)
 {
-  PG *pg = context< PeeringMachine >().pg;
-  pg->set_force_backfill(false);
+  PeeringState *ps = context< PeeringMachine >().state;
+  ps->set_force_backfill(false);
   return discard_event();
 }
 
@@ -1220,33 +1376,33 @@ PeeringState::Backfilling::Backfilling(my_context ctx)
     NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active/Backfilling")
 {
   context< PeeringMachine >().log_enter(state_name);
+  PeeringState *ps = context< PeeringMachine >().state;
+  PeeringListener *pl = context< PeeringMachine >().pl;
   PG *pg = context< PeeringMachine >().pg;
-  pg->backfill_reserved = true;
+  ps->backfill_reserved = true;
   pg->queue_recovery();
-  pg->state_clear(PG_STATE_BACKFILL_TOOFULL);
-  pg->state_clear(PG_STATE_BACKFILL_WAIT);
-  pg->state_set(PG_STATE_BACKFILLING);
-  pg->publish_stats_to_osd();
+  ps->state_clear(PG_STATE_BACKFILL_TOOFULL);
+  ps->state_clear(PG_STATE_BACKFILL_WAIT);
+  ps->state_set(PG_STATE_BACKFILLING);
+  pl->publish_stats_to_osd();
 }
 
 void PeeringState::Backfilling::backfill_release_reservations()
 {
-  PG *pg = context< PeeringMachine >().pg;
-  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
-  for (set<pg_shard_t>::iterator it = pg->backfill_targets.begin();
-       it != pg->backfill_targets.end();
+  PeeringState *ps = context< PeeringMachine >().state;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  pl->cancel_local_background_io_reservation();
+  for (set<pg_shard_t>::iterator it = ps->backfill_targets.begin();
+       it != ps->backfill_targets.end();
        ++it) {
-    ceph_assert(*it != pg->pg_whoami);
-    ConnectionRef con = pg->osd->get_con_osd_cluster(
-      it->osd, pg->get_osdmap_epoch());
-    if (con) {
-      pg->osd->send_message_osd_cluster(
-        new MBackfillReserve(
-         MBackfillReserve::RELEASE,
-         spg_t(context< PeeringMachine >().spgid.pgid, it->shard),
-         pg->get_osdmap_epoch()),
-       con.get());
-    }
+    ceph_assert(*it != ps->pg_whoami);
+    pl->send_cluster_message(
+      it->osd,
+      new MBackfillReserve(
+       MBackfillReserve::RELEASE,
+       spg_t(ps->info.pgid.pgid, it->shard),
+       ps->get_osdmap_epoch()),
+      ps->get_osdmap_epoch());
   }
 }
 
@@ -1270,12 +1426,19 @@ PeeringState::Backfilling::react(const Backfilled &c)
 boost::statechart::result
 PeeringState::Backfilling::react(const DeferBackfill &c)
 {
-  PG *pg = context< PeeringMachine >().pg;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  PeeringState *ps = context< PeeringMachine >().state;
   psdout(10) << "defer backfill, retry delay " << c.delay << dendl;
-  pg->state_set(PG_STATE_BACKFILL_WAIT);
-  pg->state_clear(PG_STATE_BACKFILLING);
+  ps->state_set(PG_STATE_BACKFILL_WAIT);
+  ps->state_clear(PG_STATE_BACKFILLING);
   cancel_backfill();
-  pg->schedule_backfill_retry(c.delay);
+
+  pl->schedule_event_after(
+    std::make_shared<PGPeeringEvent>(
+      ps->get_osdmap_epoch(),
+      ps->get_osdmap_epoch(),
+      RequestBackfill()),
+    c.delay);
   return transit<NotBackfilling>();
 }
 
@@ -1293,11 +1456,19 @@ PeeringState::Backfilling::react(const UnfoundBackfill &c)
 boost::statechart::result
 PeeringState::Backfilling::react(const RemoteReservationRevokedTooFull &)
 {
-  PG *pg = context< PeeringMachine >().pg;
-  pg->state_set(PG_STATE_BACKFILL_TOOFULL);
-  pg->state_clear(PG_STATE_BACKFILLING);
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  PeeringState *ps = context< PeeringMachine >().state;
+  ps->state_set(PG_STATE_BACKFILL_TOOFULL);
+  ps->state_clear(PG_STATE_BACKFILLING);
   cancel_backfill();
-  pg->schedule_backfill_retry(pg->cct->_conf->osd_backfill_retry_interval);
+
+  pl->schedule_event_after(
+    std::make_shared<PGPeeringEvent>(
+      ps->get_osdmap_epoch(),
+      ps->get_osdmap_epoch(),
+      RequestBackfill()),
+    ps->cct->_conf->osd_backfill_retry_interval);
+
   return transit<NotBackfilling>();
 }
 
@@ -1319,11 +1490,12 @@ void PeeringState::Backfilling::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
   PeeringListener *pl = context< PeeringMachine >().pl;
+  PeeringState *ps = context< PeeringMachine >().state;
   PG *pg = context< PeeringMachine >().pg;
-  pg->backfill_reserved = false;
+  ps->backfill_reserved = false;
   pg->backfill_reserving = false;
-  pg->state_clear(PG_STATE_BACKFILLING);
-  pg->state_clear(PG_STATE_FORCED_BACKFILL | PG_STATE_FORCED_RECOVERY);
+  ps->state_clear(PG_STATE_BACKFILLING);
+  ps->state_clear(PG_STATE_FORCED_BACKFILL | PG_STATE_FORCED_RECOVERY);
   utime_t dur = ceph_clock_now() - enter_time;
   pl->get_peering_perf().tinc(rs_backfilling_latency, dur);
 }
@@ -1336,38 +1508,38 @@ PeeringState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_context
     backfill_osd_it(context< Active >().remote_shards_to_reserve_backfill.begin())
 {
   context< PeeringMachine >().log_enter(state_name);
-  PG *pg = context< PeeringMachine >().pg;
-  pg->state_set(PG_STATE_BACKFILL_WAIT);
-  pg->publish_stats_to_osd();
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  PeeringState *ps = context< PeeringMachine >().state;
+  ps->state_set(PG_STATE_BACKFILL_WAIT);
+  pl->publish_stats_to_osd();
   post_event(RemoteBackfillReserved());
 }
 
 boost::statechart::result
 PeeringState::WaitRemoteBackfillReserved::react(const RemoteBackfillReserved &evt)
 {
-  PG *pg = context< PeeringMachine >().pg;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  PeeringState *ps = context< PeeringMachine >().state;
 
-  int64_t num_bytes = pg->info.stats.stats.sum.num_bytes;
+  int64_t num_bytes = ps->info.stats.stats.sum.num_bytes;
   psdout(10) << __func__ << " num_bytes " << num_bytes << dendl;
-  if (backfill_osd_it != context< Active >().remote_shards_to_reserve_backfill.end()) {
-    //The primary never backfills itself
-    ceph_assert(*backfill_osd_it != pg->pg_whoami);
-    ConnectionRef con = pg->osd->get_con_osd_cluster(
-      backfill_osd_it->osd, pg->get_osdmap_epoch());
-    if (con) {
-      pg->osd->send_message_osd_cluster(
-        new MBackfillReserve(
+  if (backfill_osd_it !=
+      context< Active >().remote_shards_to_reserve_backfill.end()) {
+    // The primary never backfills itself
+    ceph_assert(*backfill_osd_it != ps->pg_whoami);
+    pl->send_cluster_message(
+      backfill_osd_it->osd,
+      new MBackfillReserve(
        MBackfillReserve::REQUEST,
        spg_t(context< PeeringMachine >().spgid.pgid, backfill_osd_it->shard),
-       pg->get_osdmap_epoch(),
-       pg->get_backfill_priority(),
+       ps->get_osdmap_epoch(),
+       ps->get_backfill_priority(),
         num_bytes,
-        pg->peer_bytes[*backfill_osd_it]),
-      con.get());
-    }
+        ps->peer_bytes[*backfill_osd_it]),
+      ps->get_osdmap_epoch());
     ++backfill_osd_it;
   } else {
-    pg->peer_bytes.clear();
+    ps->peer_bytes.clear();
     post_event(AllBackfillsReserved());
   }
   return discard_event();
@@ -1383,8 +1555,9 @@ void PeeringState::WaitRemoteBackfillReserved::exit()
 
 void PeeringState::WaitRemoteBackfillReserved::retry()
 {
-  PG *pg = context< PeeringMachine >().pg;
-  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+  PeeringState *ps = context< PeeringMachine >().state;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  pl->cancel_local_background_io_reservation();
 
   // Send CANCEL to all previously acquired reservations
   set<pg_shard_t>::const_iterator it, begin, end;
@@ -1392,25 +1565,27 @@ void PeeringState::WaitRemoteBackfillReserved::retry()
   end = context< Active >().remote_shards_to_reserve_backfill.end();
   ceph_assert(begin != end);
   for (it = begin; it != backfill_osd_it; ++it) {
-    //The primary never backfills itself
-    ceph_assert(*it != pg->pg_whoami);
-    ConnectionRef con = pg->osd->get_con_osd_cluster(
-      it->osd, pg->get_osdmap_epoch());
-    if (con) {
-      pg->osd->send_message_osd_cluster(
-        new MBackfillReserve(
+    // The primary never backfills itself
+    ceph_assert(*it != ps->pg_whoami);
+    pl->send_cluster_message(
+      it->osd,
+      new MBackfillReserve(
        MBackfillReserve::RELEASE,
        spg_t(context< PeeringMachine >().spgid.pgid, it->shard),
-       pg->get_osdmap_epoch()),
-      con.get());
-    }
+       ps->get_osdmap_epoch()),
+      ps->get_osdmap_epoch());
   }
 
-  pg->state_clear(PG_STATE_BACKFILL_WAIT);
-  pg->state_set(PG_STATE_BACKFILL_TOOFULL);
-  pg->publish_stats_to_osd();
+  ps->state_clear(PG_STATE_BACKFILL_WAIT);
+  ps->state_set(PG_STATE_BACKFILL_TOOFULL);
+  pl->publish_stats_to_osd();
 
-  pg->schedule_backfill_retry(pg->cct->_conf->osd_backfill_retry_interval);
+  pl->schedule_event_after(
+    std::make_shared<PGPeeringEvent>(
+      ps->get_osdmap_epoch(),
+      ps->get_osdmap_epoch(),
+      RequestBackfill()),
+    ps->cct->_conf->osd_backfill_retry_interval);
 }
 
 boost::statechart::result
@@ -1433,18 +1608,20 @@ PeeringState::WaitLocalBackfillReserved::WaitLocalBackfillReserved(my_context ct
     NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active/WaitLocalBackfillReserved")
 {
   context< PeeringMachine >().log_enter(state_name);
-  PG *pg = context< PeeringMachine >().pg;
-  pg->state_set(PG_STATE_BACKFILL_WAIT);
-  pg->osd->local_reserver.request_reservation(
-    pg->info.pgid,
-    new PG::QueuePeeringEvt<LocalBackfillReserved>(
-      pg, pg->get_osdmap_epoch(),
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  PeeringState *ps = context< PeeringMachine >().state;
+  ps->state_set(PG_STATE_BACKFILL_WAIT);
+  pl->request_local_background_io_reservation(
+    ps->get_backfill_priority(),
+    std::make_shared<PGPeeringEvent>(
+      ps->get_osdmap_epoch(),
+      ps->get_osdmap_epoch(),
       LocalBackfillReserved()),
-    pg->get_backfill_priority(),
-    new PG::QueuePeeringEvt<DeferBackfill>(
-      pg, pg->get_osdmap_epoch(),
+    std::make_shared<PGPeeringEvent>(
+      ps->get_osdmap_epoch(),
+      ps->get_osdmap_epoch(),
       DeferBackfill(0.0)));
-  pg->publish_stats_to_osd();
+  pl->publish_stats_to_osd();
 }
 
 void PeeringState::WaitLocalBackfillReserved::exit()
@@ -1544,14 +1721,15 @@ PeeringState::RepWaitRecoveryReserved::RepWaitRecoveryReserved(my_context ctx)
 boost::statechart::result
 PeeringState::RepWaitRecoveryReserved::react(const RemoteRecoveryReserved &evt)
 {
-  PG *pg = context< PeeringMachine >().pg;
-  pg->osd->send_message_osd_cluster(
-    pg->primary.osd,
+  PeeringState *ps = context< PeeringMachine >().state;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  pl->send_cluster_message(
+    ps->primary.osd,
     new MRecoveryReserve(
       MRecoveryReserve::GRANT,
-      spg_t(context< PeeringMachine >().spgid.pgid, pg->primary.shard),
-      pg->get_osdmap_epoch()),
-    pg->get_osdmap_epoch());
+      spg_t(ps->info.pgid.pgid, ps->primary.shard),
+      ps->get_osdmap_epoch()),
+    ps->get_osdmap_epoch());
   return transit<RepRecovering>();
 }
 
@@ -1561,7 +1739,8 @@ PeeringState::RepWaitRecoveryReserved::react(
 {
   PG *pg = context< PeeringMachine >().pg;
   pg->clear_reserved_num_bytes();
-  pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  pl->cancel_remote_recovery_reservation();
   return transit<RepNotRecovering>();
 }
 
@@ -1592,6 +1771,7 @@ static int64_t pending_backfill(CephContext *cct, int64_t bf_bytes, int64_t loca
 boost::statechart::result
 PeeringState::RepNotRecovering::react(const RequestBackfillPrio &evt)
 {
+  PeeringListener *pl = context< PeeringMachine >().pl;
   PG *pg = context< PeeringMachine >().pg;
   // Use tentative_bacfill_full() to make sure enough
   // space is available to handle target bytes from primary.
@@ -1643,7 +1823,6 @@ PeeringState::RepNotRecovering::react(const RequestBackfillPrio &evt)
                       << dendl;
     post_event(RejectRemoteReservation());
   } else {
-    Context *preempt = nullptr;
     // Don't reserve space if skipped reservation check, this is used
     // to test the other backfill full check AND in case a corruption
     // of num_bytes requires ignoring that value and trying the
@@ -1654,18 +1833,21 @@ PeeringState::RepNotRecovering::react(const RequestBackfillPrio &evt)
       pg->clear_reserved_num_bytes();
     // Use un-ec-adjusted bytes for stats.
     pg->info.stats.stats.sum.num_bytes = evt.local_num_bytes;
+
+    PGPeeringEventRef preempt;
     if (HAVE_FEATURE(pg->upacting_features, RECOVERY_RESERVATION_2)) {
       // older peers will interpret preemption as TOOFULL
-      preempt = new PG::QueuePeeringEvt<RemoteBackfillPreempted>(
-       pg, pg->get_osdmap_epoch(),
+      preempt = std::make_shared<PGPeeringEvent>(
+       pl->get_osdmap_epoch(),
+       pl->get_osdmap_epoch(),
        RemoteBackfillPreempted());
     }
-    pg->osd->remote_reserver.request_reservation(
-      pg->info.pgid,
-      new PG::QueuePeeringEvt<RemoteBackfillReserved>(
-        pg, pg->get_osdmap_epoch(),
-        RemoteBackfillReserved()),
+    pl->request_remote_recovery_reservation(
       evt.priority,
+      std::make_shared<PGPeeringEvent>(
+       pl->get_osdmap_epoch(),
+       pl->get_osdmap_epoch(),
+        RemoteBackfillReserved()),
       preempt);
   }
   return transit<RepWaitBackfillReserved>();
@@ -1674,26 +1856,28 @@ PeeringState::RepNotRecovering::react(const RequestBackfillPrio &evt)
 boost::statechart::result
 PeeringState::RepNotRecovering::react(const RequestRecoveryPrio &evt)
 {
-  PG *pg = context< PeeringMachine >().pg;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  PeeringState *ps = context< PeeringMachine >().state;
 
   // fall back to a local reckoning of priority of primary doesn't pass one
   // (pre-mimic compat)
-  int prio = evt.priority ? evt.priority : pg->get_recovery_priority();
+  int prio = evt.priority ? evt.priority : ps->get_recovery_priority();
 
-  Context *preempt = nullptr;
-  if (HAVE_FEATURE(pg->upacting_features, RECOVERY_RESERVATION_2)) {
+  PGPeeringEventRef preempt;
+  if (HAVE_FEATURE(ps->upacting_features, RECOVERY_RESERVATION_2)) {
     // older peers can't handle this
-    preempt = new PG::QueuePeeringEvt<RemoteRecoveryPreempted>(
-      pg, pg->get_osdmap_epoch(),
+    preempt = std::make_shared<PGPeeringEvent>(
+      ps->get_osdmap_epoch(),
+      ps->get_osdmap_epoch(),
       RemoteRecoveryPreempted());
   }
 
-  pg->osd->remote_reserver.request_reservation(
-    pg->info.pgid,
-    new PG::QueuePeeringEvt<RemoteRecoveryReserved>(
-      pg, pg->get_osdmap_epoch(),
-      RemoteRecoveryReserved()),
+  pl->request_remote_recovery_reservation(
     prio,
+    std::make_shared<PGPeeringEvent>(
+      ps->get_osdmap_epoch(),
+      ps->get_osdmap_epoch(),
+      RemoteRecoveryReserved()),
     preempt);
   return transit<RepWaitRecoveryReserved>();
 }
@@ -1709,15 +1893,15 @@ void PeeringState::RepWaitBackfillReserved::exit()
 boost::statechart::result
 PeeringState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &evt)
 {
-  PG *pg = context< PeeringMachine >().pg;
-
-  pg->osd->send_message_osd_cluster(
-      pg->primary.osd,
+  PeeringState *ps = context< PeeringMachine >().state;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  pl->send_cluster_message(
+      ps->primary.osd,
       new MBackfillReserve(
        MBackfillReserve::GRANT,
-       spg_t(context< PeeringMachine >().spgid.pgid, pg->primary.shard),
-       pg->get_osdmap_epoch()),
-      pg->get_osdmap_epoch());
+       spg_t(ps->info.pgid.pgid, ps->primary.shard),
+       ps->get_osdmap_epoch()),
+      ps->get_osdmap_epoch());
   return transit<RepRecovering>();
 }
 
@@ -1737,7 +1921,8 @@ PeeringState::RepWaitBackfillReserved::react(
 {
   PG *pg = context< PeeringMachine >().pg;
   pg->clear_reserved_num_bytes();
-  pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  pl->cancel_remote_recovery_reservation();
   return transit<RepNotRecovering>();
 }
 
@@ -1747,7 +1932,8 @@ PeeringState::RepWaitBackfillReserved::react(
 {
   PG *pg = context< PeeringMachine >().pg;
   pg->clear_reserved_num_bytes();
-  pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  pl->cancel_remote_recovery_reservation();
   return transit<RepNotRecovering>();
 }
 
@@ -1763,14 +1949,16 @@ boost::statechart::result
 PeeringState::RepRecovering::react(const RemoteRecoveryPreempted &)
 {
   PG *pg = context< PeeringMachine >().pg;
+  PeeringState *ps = context< PeeringMachine >().state;
+  PeeringListener *pl = context< PeeringMachine >().pl;
   pg->clear_reserved_num_bytes();
-  pg->osd->send_message_osd_cluster(
-    pg->primary.osd,
+  pl->send_cluster_message(
+    ps->primary.osd,
     new MRecoveryReserve(
       MRecoveryReserve::REVOKE,
-      spg_t(context< PeeringMachine >().spgid.pgid, pg->primary.shard),
-      pg->get_osdmap_epoch()),
-    pg->get_osdmap_epoch());
+      spg_t(ps->info.pgid.pgid, ps->primary.shard),
+      ps->get_osdmap_epoch()),
+    ps->get_osdmap_epoch());
   return discard_event();
 }
 
@@ -1778,14 +1966,16 @@ boost::statechart::result
 PeeringState::RepRecovering::react(const BackfillTooFull &)
 {
   PG *pg = context< PeeringMachine >().pg;
+  PeeringState *ps = context< PeeringMachine >().state;
+  PeeringListener *pl = context< PeeringMachine >().pl;
   pg->clear_reserved_num_bytes();
-  pg->osd->send_message_osd_cluster(
-    pg->primary.osd,
+  pl->send_cluster_message(
+    ps->primary.osd,
     new MBackfillReserve(
       MBackfillReserve::TOOFULL,
-      spg_t(context< PeeringMachine >().spgid.pgid, pg->primary.shard),
-      pg->get_osdmap_epoch()),
-    pg->get_osdmap_epoch());
+      spg_t(ps->info.pgid.pgid, ps->primary.shard),
+      ps->get_osdmap_epoch()),
+    ps->get_osdmap_epoch());
   return discard_event();
 }
 
@@ -1793,24 +1983,26 @@ boost::statechart::result
 PeeringState::RepRecovering::react(const RemoteBackfillPreempted &)
 {
   PG *pg = context< PeeringMachine >().pg;
+  PeeringState *ps = context< PeeringMachine >().state;
+  PeeringListener *pl = context< PeeringMachine >().pl;
   pg->clear_reserved_num_bytes();
-  pg->osd->send_message_osd_cluster(
-    pg->primary.osd,
+  pl->send_cluster_message(
+    ps->primary.osd,
     new MBackfillReserve(
       MBackfillReserve::REVOKE,
-      spg_t(context< PeeringMachine >().spgid.pgid, pg->primary.shard),
-      pg->get_osdmap_epoch()),
-    pg->get_osdmap_epoch());
+      spg_t(ps->info.pgid.pgid, ps->primary.shard),
+      ps->get_osdmap_epoch()),
+    ps->get_osdmap_epoch());
   return discard_event();
 }
 
 void PeeringState::RepRecovering::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
-  PeeringListener *pl = context< PeeringMachine >().pl;
   PG *pg = context< PeeringMachine >().pg;
   pg->clear_reserved_num_bytes();
-  pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  pl->cancel_remote_recovery_reservation();
   utime_t dur = ceph_clock_now() - enter_time;
   pl->get_peering_perf().tinc(rs_reprecovering_latency, dur);
 }
@@ -1836,35 +2028,43 @@ PeeringState::WaitLocalRecoveryReserved::WaitLocalRecoveryReserved(my_context ct
     NamedState(context< PeeringMachine >().state_history, "Started/Primary/Active/WaitLocalRecoveryReserved")
 {
   context< PeeringMachine >().log_enter(state_name);
-  PG *pg = context< PeeringMachine >().pg;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  PeeringState *ps = context< PeeringMachine >().state;
 
   // Make sure all nodes that part of the recovery aren't full
-  if (!pg->cct->_conf->osd_debug_skip_full_check_in_recovery &&
-      pg->osd->check_osdmap_full(pg->acting_recovery_backfill)) {
+  if (!ps->cct->_conf->osd_debug_skip_full_check_in_recovery &&
+      ps->pg->osd->check_osdmap_full(ps->acting_recovery_backfill)) {
     post_event(RecoveryTooFull());
     return;
   }
 
-  pg->state_clear(PG_STATE_RECOVERY_TOOFULL);
-  pg->state_set(PG_STATE_RECOVERY_WAIT);
-  pg->osd->local_reserver.request_reservation(
-    pg->info.pgid,
-    new PG::QueuePeeringEvt<LocalRecoveryReserved>(
-      pg, pg->get_osdmap_epoch(),
+  ps->state_clear(PG_STATE_RECOVERY_TOOFULL);
+  ps->state_set(PG_STATE_RECOVERY_WAIT);
+  pl->request_local_background_io_reservation(
+    ps->get_recovery_priority(),
+    std::make_shared<PGPeeringEvent>(
+      ps->get_osdmap_epoch(),
+      ps->get_osdmap_epoch(),
       LocalRecoveryReserved()),
-    pg->get_recovery_priority(),
-    new PG::QueuePeeringEvt<DeferRecovery>(
-      pg, pg->get_osdmap_epoch(),
+    std::make_shared<PGPeeringEvent>(
+      ps->get_osdmap_epoch(),
+      ps->get_osdmap_epoch(),
       DeferRecovery(0.0)));
-  pg->publish_stats_to_osd();
+  pl->publish_stats_to_osd();
 }
 
 boost::statechart::result
 PeeringState::WaitLocalRecoveryReserved::react(const RecoveryTooFull &evt)
 {
-  PG *pg = context< PeeringMachine >().pg;
-  pg->state_set(PG_STATE_RECOVERY_TOOFULL);
-  pg->schedule_recovery_retry(pg->cct->_conf->osd_recovery_retry_interval);
+  PeeringState *ps = context< PeeringMachine >().state;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  ps->state_set(PG_STATE_RECOVERY_TOOFULL);
+  pl->schedule_event_after(
+    std::make_shared<PGPeeringEvent>(
+      ps->get_osdmap_epoch(),
+      ps->get_osdmap_epoch(),
+      DoRecovery()),
+    ps->cct->_conf->osd_recovery_retry_interval);
   return transit<NotRecovering>();
 }
 
@@ -1887,21 +2087,21 @@ PeeringState::WaitRemoteRecoveryReserved::WaitRemoteRecoveryReserved(my_context
 
 boost::statechart::result
 PeeringState::WaitRemoteRecoveryReserved::react(const RemoteRecoveryReserved &evt) {
-  PG *pg = context< PeeringMachine >().pg;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  PeeringState *ps = context< PeeringMachine >().state;
 
-  if (remote_recovery_reservation_it != context< Active >().remote_shards_to_reserve_recovery.end()) {
-    ceph_assert(*remote_recovery_reservation_it != pg->pg_whoami);
-    ConnectionRef con = pg->osd->get_con_osd_cluster(
-      remote_recovery_reservation_it->osd, pg->get_osdmap_epoch());
-    if (con) {
-      pg->osd->send_message_osd_cluster(
-        new MRecoveryReserve(
-         MRecoveryReserve::REQUEST,
-         spg_t(context< PeeringMachine >().spgid.pgid, remote_recovery_reservation_it->shard),
-         pg->get_osdmap_epoch(),
-         pg->get_recovery_priority()),
-       con.get());
-    }
+  if (remote_recovery_reservation_it !=
+      context< Active >().remote_shards_to_reserve_recovery.end()) {
+    ceph_assert(*remote_recovery_reservation_it != ps->pg_whoami);
+    pl->send_cluster_message(
+      remote_recovery_reservation_it->osd,
+      new MRecoveryReserve(
+       MRecoveryReserve::REQUEST,
+       spg_t(context< PeeringMachine >().spgid.pgid,
+             remote_recovery_reservation_it->shard),
+       ps->get_osdmap_epoch(),
+       ps->get_recovery_priority()),
+      ps->get_osdmap_epoch());
     ++remote_recovery_reservation_it;
   } else {
     post_event(AllRemotesReserved());
@@ -1934,76 +2134,85 @@ PeeringState::Recovering::Recovering(my_context ctx)
 
 void PeeringState::Recovering::release_reservations(bool cancel)
 {
-  PG *pg = context< PeeringMachine >().pg;
-  ceph_assert(cancel || !pg->pg_log.get_missing().have_missing());
+  PeeringState *ps = context< PeeringMachine >().state;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  ceph_assert(cancel || !ps->pg_log.get_missing().have_missing());
 
   // release remote reservations
   for (set<pg_shard_t>::const_iterator i =
         context< Active >().remote_shards_to_reserve_recovery.begin();
         i != context< Active >().remote_shards_to_reserve_recovery.end();
         ++i) {
-    if (*i == pg->pg_whoami) // skip myself
+    if (*i == ps->pg_whoami) // skip myself
       continue;
-    ConnectionRef con = pg->osd->get_con_osd_cluster(
-      i->osd, pg->get_osdmap_epoch());
-    if (con) {
-      pg->osd->send_message_osd_cluster(
-        new MRecoveryReserve(
-         MRecoveryReserve::RELEASE,
-         spg_t(context< PeeringMachine >().spgid.pgid, i->shard),
-         pg->get_osdmap_epoch()),
-       con.get());
-    }
+    pl->send_cluster_message(
+      i->osd,
+      new MRecoveryReserve(
+       MRecoveryReserve::RELEASE,
+       spg_t(ps->info.pgid.pgid, i->shard),
+       ps->get_osdmap_epoch()),
+      ps->get_osdmap_epoch());
   }
 }
 
 boost::statechart::result
 PeeringState::Recovering::react(const AllReplicasRecovered &evt)
 {
-  PG *pg = context< PeeringMachine >().pg;
-  pg->state_clear(PG_STATE_FORCED_RECOVERY);
+  PeeringState *ps = context< PeeringMachine >().state;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  ps->state_clear(PG_STATE_FORCED_RECOVERY);
   release_reservations();
-  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+  pl->cancel_local_background_io_reservation();
   return transit<Recovered>();
 }
 
 boost::statechart::result
 PeeringState::Recovering::react(const RequestBackfill &evt)
 {
-  PG *pg = context< PeeringMachine >().pg;
-  pg->state_clear(PG_STATE_FORCED_RECOVERY);
+  PeeringState *ps = context< PeeringMachine >().state;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+
   release_reservations();
-  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+
+  ps->state_clear(PG_STATE_FORCED_RECOVERY);
+  pl->cancel_local_background_io_reservation();
+  pl->publish_stats_to_osd();
   // XXX: Is this needed?
-  pg->publish_stats_to_osd();
   return transit<WaitLocalBackfillReserved>();
 }
 
 boost::statechart::result
 PeeringState::Recovering::react(const DeferRecovery &evt)
 {
-  PG *pg = context< PeeringMachine >().pg;
-  if (!pg->state_test(PG_STATE_RECOVERING)) {
+  PeeringState *ps = context< PeeringMachine >().state;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  if (!ps->state_test(PG_STATE_RECOVERING)) {
     // we may have finished recovery and have an AllReplicasRecovered
     // event queued to move us to the next state.
     psdout(10) << "got defer recovery but not recovering" << dendl;
     return discard_event();
   }
   psdout(10) << "defer recovery, retry delay " << evt.delay << dendl;
-  pg->state_set(PG_STATE_RECOVERY_WAIT);
-  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+  ps->state_set(PG_STATE_RECOVERY_WAIT);
+  pl->cancel_local_background_io_reservation();
   release_reservations(true);
-  pg->schedule_recovery_retry(evt.delay);
+  pl->schedule_event_after(
+    std::make_shared<PGPeeringEvent>(
+      ps->get_osdmap_epoch(),
+      ps->get_osdmap_epoch(),
+      DoRecovery()),
+    evt.delay);
   return transit<NotRecovering>();
 }
 
 boost::statechart::result
 PeeringState::Recovering::react(const UnfoundRecovery &evt)
 {
-  PG *pg = context< PeeringMachine >().pg;
+  PeeringState *ps = context< PeeringMachine >().state;
+  PeeringListener *pl = context< PeeringMachine >().pl;
   psdout(10) << "recovery has unfound, can't continue" << dendl;
-  pg->state_set(PG_STATE_RECOVERY_UNFOUND);
-  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+  ps->state_set(PG_STATE_RECOVERY_UNFOUND);
+  pl->cancel_local_background_io_reservation();
   release_reservations(true);
   return transit<NotRecovering>();
 }
@@ -2123,8 +2332,7 @@ PeeringState::Active::Active(my_context ctx)
   PeeringState *ps = context< PeeringMachine >().state;
   PG *pg = context< PeeringMachine >().pg;
 
-  ceph_assert(!pg->backfill_reserving);
-  ceph_assert(!pg->backfill_reserved);
+  ceph_assert(!ps->backfill_reserved);
   ceph_assert(pg->is_primary());
   psdout(10) << "In Active, about to call activate" << dendl;
   ps->start_flush(context< PeeringMachine >().get_cur_transaction());
@@ -2514,19 +2722,20 @@ void PeeringState::Active::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
   PeeringListener *pl = context< PeeringMachine >().pl;
+  PeeringState *ps = context< PeeringMachine >().state;
   PG *pg = context< PeeringMachine >().pg;
-  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+  pl->cancel_local_background_io_reservation();
 
   pg->blocked_by.clear();
-  pg->backfill_reserved = false;
+  ps->backfill_reserved = false;
   pg->backfill_reserving = false;
-  pg->state_clear(PG_STATE_ACTIVATING);
-  pg->state_clear(PG_STATE_DEGRADED);
-  pg->state_clear(PG_STATE_UNDERSIZED);
-  pg->state_clear(PG_STATE_BACKFILL_TOOFULL);
-  pg->state_clear(PG_STATE_BACKFILL_WAIT);
-  pg->state_clear(PG_STATE_RECOVERY_WAIT);
-  pg->state_clear(PG_STATE_RECOVERY_TOOFULL);
+  ps->state_clear(PG_STATE_ACTIVATING);
+  ps->state_clear(PG_STATE_DEGRADED);
+  ps->state_clear(PG_STATE_UNDERSIZED);
+  ps->state_clear(PG_STATE_BACKFILL_TOOFULL);
+  ps->state_clear(PG_STATE_BACKFILL_WAIT);
+  ps->state_clear(PG_STATE_RECOVERY_WAIT);
+  ps->state_clear(PG_STATE_RECOVERY_TOOFULL);
   utime_t dur = ceph_clock_now() - enter_time;
   pl->get_peering_perf().tinc(rs_active_latency, dur);
   pg->agent_stop();
@@ -2620,10 +2829,10 @@ boost::statechart::result PeeringState::ReplicaActive::react(const QueryState& q
 void PeeringState::ReplicaActive::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
-  PeeringListener *pl = context< PeeringMachine >().pl;
   PG *pg = context< PeeringMachine >().pg;
   pg->clear_reserved_num_bytes();
-  pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  pl->cancel_remote_recovery_reservation();
   utime_t dur = ceph_clock_now() - enter_time;
   pl->get_peering_perf().tinc(rs_replicaactive_latency, dur);
 }
@@ -2746,7 +2955,8 @@ void PeeringState::ToDelete::exit()
   // note: on a successful removal, this path doesn't execute. see
   // _delete_some().
   pg->osd->logger->dec(l_osd_pg_removing);
-  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  pl->cancel_local_background_io_reservation();
 }
 
 /*----WaitDeleteReserved----*/
@@ -2756,27 +2966,30 @@ PeeringState::WaitDeleteReserved::WaitDeleteReserved(my_context ctx)
               "Started/ToDelete/WaitDeleteReseved")
 {
   context< PeeringMachine >().log_enter(state_name);
-  PG *pg = context< PeeringMachine >().pg;
-  context<ToDelete>().priority = pg->get_delete_priority();
-  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
-  pg->osd->local_reserver.request_reservation(
-    pg->info.pgid,
-    new PG::QueuePeeringEvt<DeleteReserved>(
-      pg, pg->get_osdmap_epoch(),
-      DeleteReserved()),
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  PeeringState *ps = context< PeeringMachine >().state;
+  context< ToDelete >().priority = ps->get_delete_priority();
+
+  pl->cancel_local_background_io_reservation();
+  pl->request_local_background_io_reservation(
     context<ToDelete>().priority,
-    new PG::QueuePeeringEvt<DeleteInterrupted>(
-      pg, pg->get_osdmap_epoch(),
+    std::make_shared<PGPeeringEvent>(
+      ps->get_osdmap_epoch(),
+      ps->get_osdmap_epoch(),
+      DeleteReserved()),
+    std::make_shared<PGPeeringEvent>(
+      ps->get_osdmap_epoch(),
+      ps->get_osdmap_epoch(),
       DeleteInterrupted()));
 }
 
 boost::statechart::result PeeringState::ToDelete::react(
   const ActMap& evt)
 {
-  PG *pg = context< PeeringMachine >().pg;
-  if (pg->get_delete_priority() != priority) {
-    ldout(pg->cct,10) << __func__ << " delete priority changed, resetting"
-                     << dendl;
+  PeeringState *ps = context< PeeringMachine >().state;
+  if (ps->get_delete_priority() != priority) {
+    psdout(10) << __func__ << " delete priority changed, resetting"
+                  << dendl;
     return transit<ToDelete>();
   }
   return discard_event();
@@ -2811,9 +3024,10 @@ boost::statechart::result PeeringState::Deleting::react(
 void PeeringState::Deleting::exit()
 {
   context< PeeringMachine >().log_exit(state_name, enter_time);
-  PG *pg = context< PeeringMachine >().pg;
-  pg->deleting = false;
-  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+  PeeringState *ps = context< PeeringMachine >().state;
+  ps->deleting = false;
+  PeeringListener *pl = context< PeeringMachine >().pl;
+  pl->cancel_local_background_io_reservation();
 }
 
 /*--------GetInfo---------*/
index bf8dd01a0188edecc3de2c7486d532abf57270d0..90ea77500e5565cbf1b533ad9c353c524688fe72 100644 (file)
@@ -77,6 +77,25 @@ public:
       ObjectStore::Transaction *t) = 0;
     virtual void on_flushed() = 0;
 
+    // Recovery
+    virtual void schedule_event_after(
+      PGPeeringEventRef event,
+      float delay) = 0;
+    virtual void request_local_background_io_reservation(
+      unsigned priority,
+      PGPeeringEventRef on_grant,
+      PGPeeringEventRef on_preempt) = 0;
+    virtual void update_local_background_io_priority(
+      unsigned priority) = 0;
+    virtual void cancel_local_background_io_reservation() = 0;
+
+    virtual void request_remote_recovery_reservation(
+      unsigned priority,
+      PGPeeringEventRef on_grant,
+      PGPeeringEventRef on_preempt) = 0;
+    virtual void cancel_remote_recovery_reservation() = 0;
+
+
     virtual PerfCounters &get_peering_perf() = 0;
 
     virtual void clear_ready_to_merge() = 0;
@@ -1161,6 +1180,9 @@ public:
 
   MissingLoc missing_loc; ///< information about missing objects
 
+  bool backfill_reserved = false;
+  bool backfill_reserving = false;
+
   void update_osdmap_ref(OSDMapRef newmap) {
     osdmap_ref = std::move(newmap);
   }
@@ -1196,6 +1218,17 @@ public:
     int new_acting_primary);
   void clear_primary_state();
   void check_past_interval_bounds() const;
+  bool set_force_recovery(bool b);
+  bool set_force_backfill(bool b);
+
+  /// clip calculated priority to reasonable range
+  int clamp_recovery_priority(int prio, int pool_recovery_prio, int max);
+  /// get log recovery reservation priority
+  unsigned get_recovery_priority();
+  /// get backfill reservation priority
+  unsigned get_backfill_priority();
+  /// get priority for pg deletion
+  unsigned get_delete_priority();
 
 public:
   PeeringState(
@@ -1370,6 +1403,18 @@ public:
   bool is_repair() const { return state_test(PG_STATE_REPAIR); }
   bool is_empty() const { return info.last_update == eversion_t(0,0); }
 
+  bool is_forced_recovery_or_backfill() const {
+    return get_state() & (PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL);
+  }
+
+  bool is_backfill_reserved() const {
+    return backfill_reserved;
+  }
+
+  bool is_backfill_reserving() const {
+    return backfill_reserving;
+  }
+
   // Flush control interface
 private:
   void start_flush(ObjectStore::Transaction *t) {
index 647e2f321d69029b03297e2793e1a1755b5ef233..9f46fa3c1ffcbbac59c06ce40181ff520cbf47f5 100644 (file)
@@ -12460,7 +12460,7 @@ bool PrimaryLogPG::start_recovery_ops(
               !is_degraded())  {
       dout(10) << "deferring backfill due to NOREBALANCE" << dendl;
       deferred_backfill = true;
-    } else if (!backfill_reserved) {
+    } else if (!recovery_state.is_backfill_reserved()) {
       dout(10) << "deferring backfill due to !backfill_reserved" << dendl;
       if (!backfill_reserving) {
        dout(10) << "queueing RequestBackfill" << dendl;