osd/: add backfill reservations
author Samuel Just <sam.just@inktank.com>
Thu, 6 Sep 2012 22:11:57 +0000 (15:11 -0700)
committer Samuel Just <sam.just@inktank.com>
Tue, 25 Sep 2012 18:43:47 +0000 (11:43 -0700)
Previously, a new osd would be bombarded by backfills from many osds
simultaneously, resulting in excessively high load.  Instead, we want to
limit the number of backfills going into and out of a single osd.

To that end, each OSDService now has two AsyncReserver instances: one
for backfills going out from the osd (local_reserver) and one for
backfills coming into the osd (remote_reserver).  For a primary to
initiate a backfill, it must first obtain a reservation from its own
local_reserver.  Then, it must obtain a reservation from the backfill
target's remote_reserver via an MBackfillReserve message.  This process
is managed by substates of Active and ReplicaActive (see the changes in
PG.h).  The reservations are dropped either on the Backfilled event
(queued on the primary before calling recovery_complete, and on the
replica on receipt of the BackfillComplete progress message), or upon
leaving Active or ReplicaActive.
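
In a toy, single-process model the handshake looks like this (a minimal
sketch: LimitedReserver and the plain std::function callbacks stand in
for AsyncReserver and the Context/Finisher machinery, and both reservers
live in one process here, whereas the real local_reserver and
remote_reserver sit on different osds):

    #include <deque>
    #include <functional>
    #include <iostream>
    #include <set>
    #include <string>
    #include <utility>

    // Toy stand-in for AsyncReserver: FIFO queue of waiters, at most
    // max_allowed reservations granted at a time.
    class LimitedReserver {
      unsigned max_allowed;
      std::deque<std::pair<std::string, std::function<void()>>> queue;
      std::set<std::string> in_progress;

      void do_queues() {
        while (in_progress.size() < max_allowed && !queue.empty()) {
          auto next = queue.front();
          queue.pop_front();
          in_progress.insert(next.first);
          next.second();                  // grant: run the waiter's callback
        }
      }

    public:
      explicit LimitedReserver(unsigned max) : max_allowed(max) {}

      void request(const std::string& key, std::function<void()> on_reserved) {
        queue.emplace_back(key, std::move(on_reserved));
        do_queues();
      }

      void cancel(const std::string& key) {
        in_progress.erase(key);           // free the slot for the next waiter
        do_queues();
      }
    };

    int main() {
      // One pool per direction, both sized by the same limit
      // (osd_max_backfills in the patch).
      LimitedReserver local(5), remote(5);
      const std::string pgid = "1.a";

      // Primary side: local slot first, then the backfill target's slot
      // (MBackfillReserve REQUEST/GRANT in the real code), then backfill.
      local.request(pgid, [&] {
        remote.request(pgid, [&] {
          std::cout << "both reservations held; backfilling " << pgid << "\n";
          // On Backfilled (or when leaving Active/ReplicaActive) each side
          // drops its reservation so queued PGs can proceed.
          remote.cancel(pgid);
          local.cancel(pgid);
        });
      });
      return 0;
    }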

It's important that we always grab the local reservation before the
remote reservation in order to prevent a circular dependency.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/Makefile.am
src/common/AsyncReserver.h [new file with mode: 0644]
src/common/config_opts.h
src/messages/MBackfillReserve.h [new file with mode: 0644]
src/msg/Message.cc
src/msg/Message.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc

index 56fc8f7d4478f9fc85da42618e4a137e8f920167..81a887d110fda415709122e4ffd5d419abc7ef0b 100644 (file)
@@ -1378,6 +1378,7 @@ noinst_HEADERS = \
        common/secret.h\
        common/strtol.h\
        common/static_assert.h\
+       common/AsyncReserver.h\
        crush/CrushCompiler.h\
        crush/CrushTester.h\
         crush/CrushWrapper.h\
@@ -1611,6 +1612,7 @@ noinst_HEADERS = \
         messages/MOSDPGQuery.h\
         messages/MOSDPGRemove.h\
        messages/MOSDPGScan.h\
+        messages/MBackfillReserve.h\
         messages/MOSDPGTemp.h\
         messages/MOSDPGTrim.h\
         messages/MOSDPing.h\
diff --git a/src/common/AsyncReserver.h b/src/common/AsyncReserver.h
new file mode 100644 (file)
index 0000000..755d11a
--- /dev/null
@@ -0,0 +1,95 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef ASYNC_RESERVER_H
+#define ASYNC_RESERVER_H
+
+#include <map>
+#include <utility>
+#include <list>
+
+#include "common/Mutex.h"
+#include "common/Finisher.h"
+
+/**
+ * Manages a configurable number of asynchronous reservations.
+ */
+template <typename T>
+class AsyncReserver {
+  Finisher *f;
+  const unsigned max_allowed;
+  Mutex lock;
+
+  list<pair<T, Context*> > queue;
+  map<T, typename list<pair<T, Context*> >::iterator > queue_pointers;
+  set<T> in_progress;
+
+  void do_queues() {
+    while (in_progress.size() < max_allowed &&
+           queue.size()) {
+      pair<T, Context*> p = queue.front();
+      queue_pointers.erase(p.first);
+      queue.pop_front();
+      f->queue(p.second);
+      in_progress.insert(p.first);
+    }
+  }
+public:
+  AsyncReserver(
+    Finisher *f,
+    unsigned max_allowed)
+    : f(f), max_allowed(max_allowed), lock("AsyncReserver::lock") {}
+
+  /**
+   * Requests a reservation
+   *
+   * Note, on_reserved may be called following cancel_reservation.  Thus,
+   * the callback must be safe in that case.  Callback will be called
+   * with no locks held.  cancel_reservation must be called to release the
+   * reservation slot.
+   */
+  void request_reservation(
+    T item,                   ///< [in] reservation key
+    Context *on_reserved      ///< [in] callback to be called on reservation
+    ) {
+    Mutex::Locker l(lock);
+    assert(!queue_pointers.count(item) &&
+          !in_progress.count(item));
+    queue.push_back(make_pair(item, on_reserved));
+    queue_pointers.insert(make_pair(item, --queue.end()));
+    do_queues();
+  }
+
+  /**
+   * Cancels reservation
+   *
+   * Frees the reservation under key for use.
+   * Note, after cancel_reservation, the reservation_callback may or
+   * may not still be called. 
+   */
+  void cancel_reservation(
+    T item                   ///< [in] key for reservation to cancel
+    ) {
+    Mutex::Locker l(lock);
+    if (queue_pointers.count(item)) {
+      queue.erase(queue_pointers[item]);
+      queue_pointers.erase(item);
+    } else {
+      in_progress.erase(item);
+    }
+    do_queues();
+  }
+};
+
+#endif
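
A hedged sketch of how a caller drives the API above (C_LogGrant and
example() are hypothetical names, not part of this patch; it presumes the
Context/Finisher/pg_t types from the tree):

    // Illustrative only: not code from this patch.
    #include "common/AsyncReserver.h"
    #include "common/Finisher.h"
    #include "common/ceph_context.h"
    #include "include/Context.h"
    #include "osd/osd_types.h"

    struct C_LogGrant : public Context {
      pg_t pgid;
      explicit C_LogGrant(pg_t p) : pgid(p) {}
      void finish(int r) {
        // Runs on the Finisher thread once a slot is granted.  The real
        // patch queues a *BackfillReserved peering event here (see
        // C_QueuePeeringEvt in PG.cc below).
      }
    };

    void example(CephContext *cct, pg_t pgid, unsigned max_backfills) {
      Finisher finisher(cct);
      finisher.start();

      // One slot pool; the OSD keeps two (local_reserver/remote_reserver).
      AsyncReserver<pg_t> reserver(&finisher, max_backfills);
      reserver.request_reservation(pgid, new C_LogGrant(pgid));

      // ... backfill proceeds while the reservation is held ...

      // cancel_reservation releases a granted slot or drops a still-queued
      // request; it must always be called, even after the grant callback ran.
      reserver.cancel_reservation(pgid);
      finisher.stop();
    }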
index 8e959f775ca061fb8a1fb20cf5bc8db81ca346c6..7143361326c9920563e0dafc41bc14a646cb1574 100644 (file)
@@ -264,6 +264,9 @@ OPTION(osd_auto_upgrade_tmap, OPT_BOOL, true)
 // If true, TMAPPUT sets uses_tmap DEBUGGING ONLY
 OPTION(osd_tmapput_sets_uses_tmap, OPT_BOOL, false)
 
+// Maximum number of backfills to or from a single osd
+OPTION(osd_max_backfills, OPT_U64, 5)
+
 OPTION(osd_uuid, OPT_UUID, uuid_d())
 OPTION(osd_data, OPT_STR, "/var/lib/ceph/osd/$cluster-$id")
 OPTION(osd_journal, OPT_STR, "/var/lib/ceph/osd/$cluster-$id/journal")
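
Operationally, the new limit can be tuned like any other OSD option; a
hedged ceph.conf fragment (the value 2 is only illustrative):

    [osd]
        osd max backfills = 2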
diff --git a/src/messages/MBackfillReserve.h b/src/messages/MBackfillReserve.h
new file mode 100644 (file)
index 0000000..23308c2
--- /dev/null
@@ -0,0 +1,73 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MBACKFILL_H
+#define CEPH_MBACKFILL_H
+
+#include "msg/Message.h"
+
+class MBackfillReserve : public Message {
+  static const int HEAD_VERSION = 1;
+  static const int COMPAT_VERSION = 1;
+public:
+  pg_t pgid;
+  epoch_t query_epoch;
+  enum {
+    REQUEST = 0,
+    GRANT = 1
+  };
+  int type;
+  MBackfillReserve(int type,
+                     pg_t pgid,
+                     epoch_t query_epoch)
+    : Message(MSG_OSD_BACKFILL_RESERVE, HEAD_VERSION, COMPAT_VERSION),
+      pgid(pgid), query_epoch(query_epoch),
+      type(type) {}
+
+  MBackfillReserve() :
+    Message(MSG_OSD_BACKFILL_RESERVE, HEAD_VERSION, COMPAT_VERSION) {}
+
+  const char *get_type_name() const {
+    return "MBackfillReserve";
+  }
+
+  void print(ostream& out) const {
+    out << "MBackfillReserve ";
+    switch (type) {
+    case REQUEST:
+      out << "REQUEST ";
+      break;
+    case GRANT:
+      out << "GRANT "; 
+      break;
+    }
+    out << " pgid: " << pgid << ", query_epoch: " << query_epoch;
+    return;
+  }
+
+  void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(pgid, p);
+    ::decode(query_epoch, p);
+    ::decode(type, p);
+  }
+
+  void encode_payload(uint64_t features) {
+    ::encode(pgid, payload);
+    ::encode(query_epoch, payload);
+    ::encode(type, payload);
+  }
+};
+
+#endif
index 0c5a185665392bde09112b3c843cb39b81972e2a..1417ae0c375c1ba5018f44a240eb733152ab9240 100644 (file)
@@ -44,6 +44,7 @@ using namespace std;
 
 #include "messages/MCommand.h"
 #include "messages/MCommandReply.h"
+#include "messages/MBackfillReserve.h"
 
 #include "messages/MRoute.h"
 #include "messages/MForward.h"
@@ -323,6 +324,9 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot
   case MSG_COMMAND_REPLY:
     m = new MCommandReply;
     break;
+  case MSG_OSD_BACKFILL_RESERVE:
+    m = new MBackfillReserve;
+    break;
 
   case MSG_ROUTE:
     m = new MRoute;
index 7b4b0c57bf9ac8c72c312356e031a074197fb80c..560ca0ca06d828b1c47dc38af913bf5890acee2d 100644 (file)
@@ -93,6 +93,8 @@
 #define MSG_COMMAND            97
 #define MSG_COMMAND_REPLY      98
 
+#define MSG_OSD_BACKFILL_RESERVE 99
+
 // *** MDS ***
 
 #define MSG_MDS_BEACON             100  // to monitor
index c6adf0d0b493cf5d0be41c5de4c08c512014674e..606b5c3cc46ae97e10bbbe66350f166b6ff109e3 100644 (file)
@@ -71,6 +71,7 @@
 #include "messages/MOSDPGScan.h"
 #include "messages/MOSDPGBackfill.h"
 #include "messages/MOSDPGMissing.h"
+#include "messages/MBackfillReserve.h"
 
 #include "messages/MOSDAlive.h"
 
@@ -163,6 +164,9 @@ OSDService::OSDService(OSD *osd) :
   watch(NULL),
   last_tid(0),
   tid_lock("OSDService::tid_lock"),
+  reserver_finisher(g_ceph_context),
+  local_reserver(&reserver_finisher, g_conf->osd_max_backfills),
+  remote_reserver(&reserver_finisher, g_conf->osd_max_backfills),
   pg_temp_lock("OSDService::pg_temp_lock"),
   map_cache_lock("OSDService::map_lock"),
   map_cache(g_conf->osd_map_cache_size),
@@ -187,6 +191,7 @@ void OSDService::pg_stat_queue_dequeue(PG *pg)
 
 void OSDService::shutdown()
 {
+  reserver_finisher.stop();
   watch_lock.Lock();
   watch_timer.shutdown();
   watch_lock.Unlock();
@@ -196,6 +201,7 @@ void OSDService::shutdown()
 
 void OSDService::init()
 {
+  reserver_finisher.start();
   watch_timer.init();
   watch = new Watch();
 }
@@ -3147,6 +3153,10 @@ void OSD::dispatch_op(OpRequestRef op)
     handle_pg_backfill(op);
     break;
 
+  case MSG_OSD_BACKFILL_RESERVE:
+    handle_pg_backfill_reserve(op);
+    break;
+
     // client ops
   case CEPH_MSG_OSD_OP:
     handle_op(op);
@@ -4863,6 +4873,42 @@ void OSD::handle_pg_backfill(OpRequestRef op)
   pg->unlock();
 }
 
+void OSD::handle_pg_backfill_reserve(OpRequestRef op)
+{
+  MBackfillReserve *m = static_cast<MBackfillReserve*>(op->request);
+  assert(m->get_header().type == MSG_OSD_BACKFILL_RESERVE);
+
+  if (!require_osd_peer(op))
+    return;
+  if (!require_same_or_newer_map(op, m->query_epoch))
+    return;
+
+  PG *pg = 0;
+  if (!_have_pg(m->pgid))
+    return;
+
+  pg = _lookup_lock_pg(m->pgid);
+  assert(pg);
+
+  if (m->type == MBackfillReserve::REQUEST) {
+    pg->queue_peering_event(
+      PG::CephPeeringEvtRef(
+       new PG::CephPeeringEvt(
+         m->query_epoch,
+         m->query_epoch,
+         PG::RequestBackfill())));
+  } else if (m->type == MBackfillReserve::GRANT) {
+    pg->queue_peering_event(
+      PG::CephPeeringEvtRef(
+       new PG::CephPeeringEvt(
+         m->query_epoch,
+         m->query_epoch,
+         PG::RemoteBackfillReserved())));
+  } else {
+    assert(0);
+  }
+  pg->unlock();
+}
 
 /** PGQuery
  * from primary to replica | stray
index 25daf495f8114193ee5e0a2609cda62a5fa096fe..1409158d05457fce36e1e5ea3a735498af0e75cf 100644 (file)
@@ -26,6 +26,7 @@
 #include "common/Timer.h"
 #include "common/WorkQueue.h"
 #include "common/LogClient.h"
+#include "common/AsyncReserver.h"
 
 #include "os/ObjectStore.h"
 #include "OSDCap.h"
@@ -255,6 +256,11 @@ public:
     return t;
   }
 
+  // -- backfill_reservation --
+  Finisher reserver_finisher;
+  AsyncReserver<pg_t> local_reserver;
+  AsyncReserver<pg_t> remote_reserver;
+
   // -- pg_temp --
   Mutex pg_temp_lock;
   map<pg_t, vector<int> > pg_temp_wanted;
@@ -904,6 +910,7 @@ protected:
   void handle_pg_scan(OpRequestRef op);
 
   void handle_pg_backfill(OpRequestRef op);
+  void handle_pg_backfill_reserve(OpRequestRef op);
 
   void handle_pg_remove(OpRequestRef op);
   void _remove_pg(PG *pg);
index 204c309825211ae849a3264b33fd22e18a48d378..11dd94d71881c7b4f6f65732e78f8261f9a5b340 100644 (file)
@@ -27,6 +27,7 @@
 #include "messages/MOSDPGTrim.h"
 #include "messages/MOSDPGScan.h"
 #include "messages/MOSDPGBackfill.h"
+#include "messages/MBackfillReserve.h"
 
 #include "messages/MOSDSubOp.h"
 #include "messages/MOSDSubOpReply.h"
@@ -74,6 +75,8 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   last_peering_reset(0),
   heartbeat_peer_lock("PG::heartbeat_peer_lock"),
   backfill_target(-1),
+  backfill_reserved(0),
+  backfill_reserving(0),
   pg_stats_lock("PG::pg_stats_lock"),
   pg_stats_valid(false),
   osr(osd->osr_registry.lookup_or_create(p, (stringify(p)))),
@@ -5068,6 +5071,158 @@ void PG::RecoveryState::Peering::exit()
   pg->clear_probe_targets();
 }
 
+/*------Backfilling-------*/
+PG::RecoveryState::Backfilling::Backfilling(my_context ctx)
+  : my_base(ctx)
+{
+  state_name = "Started/Primary/Active/Backfilling";
+  context< RecoveryMachine >().log_enter(state_name);
+  PG *pg = context< RecoveryMachine >().pg;
+  pg->backfill_reserved = true;
+  pg->osd->queue_for_recovery(pg);
+}
+
+void PG::RecoveryState::Backfilling::exit()
+{
+  context< RecoveryMachine >().log_exit(state_name, enter_time);
+  PG *pg = context< RecoveryMachine >().pg;
+  pg->backfill_reserved = false;
+  pg->backfill_reserving = false;
+  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
+}
+
+/*--WaitRemoteBackfillReserved--*/
+
+PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_context ctx)
+  : my_base(ctx)
+{
+  state_name = "Started/Primary/Active/WaitRemoteBackfillReserved";
+  context< RecoveryMachine >().log_enter(state_name);
+  PG *pg = context< RecoveryMachine >().pg;
+  pg->osd->cluster_messenger->send_message(
+    new MBackfillReserve(
+      MBackfillReserve::REQUEST,
+      pg->info.pgid,
+      pg->get_osdmap()->get_epoch()),
+    pg->get_osdmap()->get_cluster_inst(pg->backfill_target));
+}
+
+void PG::RecoveryState::WaitRemoteBackfillReserved::exit()
+{
+  context< RecoveryMachine >().log_exit(state_name, enter_time);
+}
+
+
+/*--WaitLocalBackfillReserved--*/
+template <class EVT>
+struct C_QueuePeeringEvt : Context {
+  boost::intrusive_ptr<PG> pg;
+  epoch_t epoch;
+  EVT evt;
+  C_QueuePeeringEvt(PG *pg, epoch_t epoch, EVT evt) :
+    pg(pg), epoch(epoch), evt(evt) {}
+  void finish(int r) {
+    pg->lock();
+    pg->queue_peering_event(PG::CephPeeringEvtRef(
+       new PG::CephPeeringEvt(
+         epoch,
+         epoch,
+         evt)));
+    pg->unlock();
+  }
+};
+
+PG::RecoveryState::WaitLocalBackfillReserved::WaitLocalBackfillReserved(my_context ctx)
+  : my_base(ctx)
+{
+  state_name = "Started/Primary/Active/WaitLocalBackfillReserved";
+  context< RecoveryMachine >().log_enter(state_name);
+  PG *pg = context< RecoveryMachine >().pg;
+  pg->osd->local_reserver.request_reservation(
+    pg->info.pgid,
+    new C_QueuePeeringEvt<LocalBackfillReserved>(
+      pg, pg->get_osdmap()->get_epoch(),
+      LocalBackfillReserved()));
+}
+
+void PG::RecoveryState::WaitLocalBackfillReserved::exit()
+{
+  context< RecoveryMachine >().log_exit(state_name, enter_time);
+}
+
+/*----NotBackfilling------*/
+PG::RecoveryState::NotBackfilling::NotBackfilling(my_context ctx)
+  : my_base(ctx)
+{
+  state_name = "Started/Primary/Active/NotBackfilling";
+  context< RecoveryMachine >().log_enter(state_name);
+}
+
+void PG::RecoveryState::NotBackfilling::exit()
+{
+  context< RecoveryMachine >().log_exit(state_name, enter_time);
+}
+
+/*---RepNotBackfilling----*/
+PG::RecoveryState::RepNotBackfilling::RepNotBackfilling(my_context ctx)
+  : my_base(ctx)
+{
+  state_name = "Started/Primary/Active/RepNotBackfilling";
+  context< RecoveryMachine >().log_enter(state_name);
+}
+
+void PG::RecoveryState::RepNotBackfilling::exit()
+{
+  context< RecoveryMachine >().log_exit(state_name, enter_time);
+}
+
+/*-RepWaitBackfillReserved*/
+PG::RecoveryState::RepWaitBackfillReserved::RepWaitBackfillReserved(my_context ctx)
+  : my_base(ctx)
+{
+  state_name = "Started/Primary/Active/RepWaitBackfillReserved";
+  context< RecoveryMachine >().log_enter(state_name);
+  PG *pg = context< RecoveryMachine >().pg;
+  pg->osd->remote_reserver.request_reservation(
+    pg->info.pgid,
+    new C_QueuePeeringEvt<RemoteBackfillReserved>(
+      pg, pg->get_osdmap()->get_epoch(),
+      RemoteBackfillReserved()));
+}
+
+void PG::RecoveryState::RepWaitBackfillReserved::exit()
+{
+  context< RecoveryMachine >().log_exit(state_name, enter_time);
+}
+
+boost::statechart::result
+PG::RecoveryState::RepWaitBackfillReserved::react(const RemoteBackfillReserved &evt)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+  pg->osd->cluster_messenger->send_message(
+    new MBackfillReserve(
+      MBackfillReserve::GRANT,
+      pg->info.pgid,
+      pg->get_osdmap()->get_epoch()),
+    pg->get_osdmap()->get_cluster_inst(pg->acting[0]));
+  return transit<RepBackfilling>();
+}
+
+/*---RepBackfilling-------*/
+PG::RecoveryState::RepBackfilling::RepBackfilling(my_context ctx)
+  : my_base(ctx)
+{
+  state_name = "Started/Primary/Active/RepBackfilling";
+  context< RecoveryMachine >().log_enter(state_name);
+}
+
+void PG::RecoveryState::RepBackfilling::exit()
+{
+  context< RecoveryMachine >().log_exit(state_name, enter_time);
+  PG *pg = context< RecoveryMachine >().pg;
+  pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
+}
+
 /*---------Active---------*/
 PG::RecoveryState::Active::Active(my_context ctx)
   : my_base(ctx)
@@ -5076,6 +5231,8 @@ PG::RecoveryState::Active::Active(my_context ctx)
   context< RecoveryMachine >().log_enter(state_name);
 
   PG *pg = context< RecoveryMachine >().pg;
+  assert(!pg->backfill_reserving);
+  assert(!pg->backfill_reserved);
   assert(pg->is_primary());
   dout(10) << "In Active, about to call activate" << dendl;
   pg->activate(*context< RecoveryMachine >().get_cur_transaction(),
@@ -5299,7 +5456,10 @@ void PG::RecoveryState::Active::exit()
 {
   context< RecoveryMachine >().log_exit(state_name, enter_time);
   PG *pg = context< RecoveryMachine >().pg;
+  pg->osd->local_reserver.cancel_reservation(pg->info.pgid);
 
+  pg->backfill_reserved = false;
+  pg->backfill_reserving = false;
   pg->state_clear(PG_STATE_DEGRADED);
   pg->state_clear(PG_STATE_BACKFILL);
   pg->state_clear(PG_STATE_REPLAY);
@@ -5387,6 +5547,8 @@ boost::statechart::result PG::RecoveryState::ReplicaActive::react(const QuerySta
 void PG::RecoveryState::ReplicaActive::exit()
 {
   context< RecoveryMachine >().log_exit(state_name, enter_time);
+  PG *pg = context< RecoveryMachine >().pg;
+  pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
 }
 
 /*-------Stray---*/
index 25b2f49c7ccfd5b3fc97699bf00ce0cfea51ad1f..bf3b2afab940e029cd7dee4afd2197c74010261b 100644 (file)
@@ -612,6 +612,8 @@ protected:
   BackfillInterval backfill_info;
   BackfillInterval peer_backfill_info;
   int backfill_target;
+  bool backfill_reserved;
+  bool backfill_reserving;
 
   friend class OSD;
 
@@ -1041,6 +1043,10 @@ public:
   TrivialEvent(CheckRepops)
   TrivialEvent(NullEvt)
   TrivialEvent(FlushedEvt)
+  TrivialEvent(Backfilled)
+  TrivialEvent(LocalBackfillReserved)
+  TrivialEvent(RemoteBackfillReserved)
+  TrivialEvent(RequestBackfill)
 
   /* Encapsulates PG recovery process */
   class RecoveryState {
@@ -1275,7 +1281,8 @@ public:
       boost::statechart::result react(const AdvMap &advmap);
     };
 
-    struct Active : boost::statechart::state< Active, Primary >, NamedState {
+    struct NotBackfilling;
+    struct Active : boost::statechart::state< Active, Primary, NotBackfilling >, NamedState {
       Active(my_context ctx);
       void exit();
 
@@ -1286,7 +1293,8 @@ public:
        boost::statechart::custom_reaction< MInfoRec >,
        boost::statechart::custom_reaction< MNotifyRec >,
        boost::statechart::custom_reaction< MLogRec >,
-       boost::statechart::custom_reaction< RecoveryComplete >
+       boost::statechart::custom_reaction< RecoveryComplete >,
+       boost::statechart::custom_reaction< Backfilled >
        > reactions;
       boost::statechart::result react(const QueryState& q);
       boost::statechart::result react(const ActMap&);
@@ -1294,10 +1302,46 @@ public:
       boost::statechart::result react(const MInfoRec& infoevt);
       boost::statechart::result react(const MNotifyRec& notevt);
       boost::statechart::result react(const MLogRec& logevt);
+      boost::statechart::result react(const Backfilled&) {
+       return discard_event();
+      }
       boost::statechart::result react(const RecoveryComplete&);
     };
 
-    struct ReplicaActive : boost::statechart::state< ReplicaActive, Started >, NamedState {
+    struct Backfilling : boost::statechart::state< Backfilling, Active >, NamedState {
+      typedef boost::mpl::list<
+       boost::statechart::transition< Backfilled, NotBackfilling >
+       > reactions;
+      Backfilling(my_context ctx);
+      void exit();
+    };
+
+    struct WaitRemoteBackfillReserved : boost::statechart::state< WaitRemoteBackfillReserved, Active >, NamedState {
+      typedef boost::mpl::list<
+       boost::statechart::transition< RemoteBackfillReserved, Backfilling >
+       > reactions;
+      WaitRemoteBackfillReserved(my_context ctx);
+      void exit();
+    };
+
+    struct WaitLocalBackfillReserved : boost::statechart::state< WaitLocalBackfillReserved, Active >, NamedState {
+      typedef boost::mpl::list<
+       boost::statechart::transition< LocalBackfillReserved, WaitRemoteBackfillReserved >
+       > reactions;
+      WaitLocalBackfillReserved(my_context ctx);
+      void exit();
+    };
+
+    struct NotBackfilling : boost::statechart::state< NotBackfilling, Active>, NamedState {
+      typedef boost::mpl::list<
+       boost::statechart::transition< RequestBackfill, WaitLocalBackfillReserved>
+       > reactions;
+      NotBackfilling(my_context ctx);
+      void exit();
+    };
+
+    struct RepNotBackfilling;
+    struct ReplicaActive : boost::statechart::state< ReplicaActive, Started, RepNotBackfilling >, NamedState {
       ReplicaActive(my_context ctx);
       void exit();
 
@@ -1317,6 +1361,31 @@ public:
       boost::statechart::result react(const Activate&);
     };
 
+    struct RepBackfilling : boost::statechart::state< RepBackfilling, ReplicaActive >, NamedState {
+      typedef boost::mpl::list<
+       boost::statechart::transition< Backfilled, RepNotBackfilling >
+       > reactions;
+      RepBackfilling(my_context ctx);
+      void exit();
+    };
+
+    struct RepWaitBackfillReserved : boost::statechart::state< RepWaitBackfillReserved, ReplicaActive >, NamedState {
+      typedef boost::mpl::list<
+       boost::statechart::custom_reaction< RemoteBackfillReserved >
+       > reactions;
+      RepWaitBackfillReserved(my_context ctx);
+      void exit();
+      boost::statechart::result react(const RemoteBackfillReserved &evt);
+    };
+
+    struct RepNotBackfilling : boost::statechart::state< RepNotBackfilling, ReplicaActive>, NamedState {
+      typedef boost::mpl::list<
+       boost::statechart::transition< RequestBackfill, RepWaitBackfillReserved >
+       > reactions;
+      RepNotBackfilling(my_context ctx);
+      void exit();
+    };
+
     struct Stray : boost::statechart::state< Stray, Started >, NamedState {
       map<int, pair<pg_query_t, epoch_t> > pending_queries;
 
index e3326cae7ea8e8e6775796c13f5b48e984c5eb8f..287a682c8cd662394e7cc00d3af716ee1a72cac1 100644 (file)
@@ -1221,6 +1221,12 @@ void ReplicatedPG::do_backfill(OpRequestRef op)
                                                 get_osdmap()->get_epoch(), m->query_epoch,
                                                 info.pgid);
       osd->cluster_messenger->send_message(reply, m->get_connection());
+      queue_peering_event(
+       CephPeeringEvtRef(
+         new CephPeeringEvt(
+           get_osdmap()->get_epoch(),
+           get_osdmap()->get_epoch(),
+           Backfilled())));
     }
     // fall-thru
 
@@ -6065,6 +6071,19 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx)
     if (get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL)) {
       dout(10) << "deferring backfill due to NOBACKFILL" << dendl;
       deferred_backfill = true;
+    } else if (!backfill_reserved) {
+      dout(10) << "deferring backfill due to !backfill_reserved" << dendl;
+      if (!backfill_reserving) {
+       dout(10) << "queueing RequestBackfill" << dendl;
+       backfill_reserving = true;
+       queue_peering_event(
+         CephPeeringEvtRef(
+           new CephPeeringEvt(
+             get_osdmap()->get_epoch(),
+             get_osdmap()->get_epoch(),
+             RequestBackfill())));
+      }
+      deferred_backfill = true;
     } else {
       started += recover_backfill(max - started);
     }
@@ -6091,6 +6110,12 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx)
     return started;
   }
 
+  queue_peering_event(
+    CephPeeringEvtRef(
+      new CephPeeringEvt(
+       get_osdmap()->get_epoch(),
+       get_osdmap()->get_epoch(),
+       Backfilled())));
   handle_recovery_complete(prctx);
 
   return 0;