OSD: allow recovery pausing and deferral
author Samuel Just <sjust@redhat.com>
Tue, 26 Apr 2016 01:47:41 +0000 (18:47 -0700)
committer Samuel Just <sjust@redhat.com>
Wed, 18 May 2016 19:58:47 +0000 (12:58 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/common/config_opts.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 2da949a2edd8437e8aa57d0d009bcc58622312f9..5aaded0be7fd10959d1fc5834723211f28d79474 100644 (file)
@@ -745,8 +745,8 @@ OPTION(osd_default_data_pool_replay_window, OPT_INT, 45)
 OPTION(osd_preserve_trimmed_log, OPT_BOOL, false)
 OPTION(osd_auto_mark_unfound_lost, OPT_BOOL, false)
 OPTION(osd_recovery_delay_start, OPT_FLOAT, 0)
-OPTION(osd_recovery_max_active, OPT_INT, 3)
-OPTION(osd_recovery_max_single_start, OPT_INT, 1)
+OPTION(osd_recovery_max_active, OPT_U64, 3)
+OPTION(osd_recovery_max_single_start, OPT_U64, 1)
 OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20)  // max size of push chunk
 OPTION(osd_copyfrom_max_chunk, OPT_U64, 8<<20)   // max size of a COPYFROM chunk
 OPTION(osd_push_per_object_cost, OPT_U64, 1000)  // push cost per object
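
Taken together, these two options drive the new throttle: osd_recovery_max_active caps the OSD-wide total of active plus reserved recovery pushes, and osd_recovery_max_single_start caps how many one PG may reserve per wakeup; switching them to OPT_U64 matches the unsigned counters introduced below. A minimal standalone sketch of the admission arithmetic (illustrative names, not Ceph API):

#include <algorithm>
#include <cassert>
#include <cstdint>

// Illustrative stand-ins for the two options above.
static const uint64_t recovery_max_active = 3;       // osd_recovery_max_active
static const uint64_t recovery_max_single_start = 1; // osd_recovery_max_single_start

// How many pushes the next PG in line may reserve, given current load;
// the same MIN() appears in _maybe_queue_recovery() below.
uint64_t pushes_to_start(uint64_t active, uint64_t reserved) {
  if (active + reserved >= recovery_max_active)
    return 0;  // throttle full: the PG stays on awaiting_throttle
  return std::min(recovery_max_active - active - reserved,
                  recovery_max_single_start);
}

int main() {
  assert(pushes_to_start(0, 0) == 1);  // idle OSD: one start per PG wakeup
  assert(pushes_to_start(2, 1) == 0);  // 2 active + 1 reserved == max: wait
  assert(pushes_to_start(2, 0) == 1);  // one slot left
  return 0;
}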
index c57a776e8daa69b25a2d8e15648e40de0a456642..7d057509e7aec3539212dd9a03249763a826fac9 100644 (file)
@@ -172,8 +172,7 @@ void PGQueueable::RunVis::operator()(const PGScrub &op) {
 }
 
 void PGQueueable::RunVis::operator()(const PGRecovery &op) {
-  /// TODO: need to handle paused recovery
-  return osd->do_recovery(pg.get(), op.epoch_queued, handle);
+  return osd->do_recovery(pg.get(), op.epoch_queued, op.reserved_pushes, handle);
 }
 
 //Initial features in new superblock.
@@ -258,6 +257,10 @@ OSDService::OSDService(OSD *osd) :
   remote_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
                  cct->_conf->osd_min_recovery_priority),
   pg_temp_lock("OSDService::pg_temp_lock"),
+  recovery_lock("OSDService::recovery_lock"),
+  recovery_ops_active(0),
+  recovery_ops_reserved(0),
+  recovery_paused(false),
   map_cache_lock("OSDService::map_cache_lock"),
   map_cache(cct, cct->_conf->osd_map_cache_size),
   map_bl_cache(cct->_conf->osd_map_cache_size),
@@ -499,6 +502,9 @@ void OSDService::init()
   agent_timer.init();
 
   agent_thread.create("osd_srv_agent");
+
+  if (cct->_conf->osd_recovery_delay_start)
+    defer_recovery(cct->_conf->osd_recovery_delay_start);
 }
 
 void OSDService::final_init()
@@ -1654,7 +1660,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
     cct->_conf->osd_op_num_threads_per_shard * cct->_conf->osd_op_num_shards),
   disk_tp(cct, "OSD::disk_tp", "tp_osd_disk", cct->_conf->osd_disk_threads, "osd_disk_threads"),
   command_tp(cct, "OSD::command_tp", "tp_osd_cmd",  1),
-  paused_recovery(false),
   session_waiting_lock("OSD::session_waiting_lock"),
   heartbeat_lock("OSD::heartbeat_lock"),
   heartbeat_stop(false), heartbeat_update_lock("OSD::heartbeat_update_lock"),
@@ -1697,8 +1702,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
     cct->_conf->osd_command_thread_timeout,
     cct->_conf->osd_command_thread_suicide_timeout,
     &command_tp),
-  recovery_lock("OSD::recovery_lock"),
-  recovery_ops_active(0),
   replay_queue_lock("OSD::replay_queue_lock"),
   remove_wq(
     store,
@@ -4344,6 +4347,8 @@ void OSD::tick()
 
   check_ops_in_flight();
 
+  service.kick_recovery_queue();
+
   tick_timer.add_event_after(OSD_TICK_INTERVAL, new C_Tick(this));
 }
 
@@ -5750,9 +5755,6 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
     cct->_conf->apply_changes(NULL);
     ss << "kicking recovery queue. set osd_recovery_delay_start "
        << "to " << cct->_conf->osd_recovery_delay_start;
-    defer_recovery_until = ceph_clock_now(cct);
-    defer_recovery_until += cct->_conf->osd_recovery_delay_start;
-    /// TODO
   }
 
   else if (prefix == "cpu_profiler") {
@@ -7355,16 +7357,14 @@ void OSD::activate_map()
 
   // norecover?
   if (osdmap->test_flag(CEPH_OSDMAP_NORECOVER)) {
-    if (!paused_recovery) {
+    if (!service.recovery_is_paused()) {
       dout(1) << "pausing recovery (NORECOVER flag set)" << dendl;
-      paused_recovery = true;
-      /// TODO
+      service.pause_recovery();
     }
   } else {
-    if (paused_recovery) {
-      dout(1) << "resuming recovery (NORECOVER flag cleared)" << dendl;
-      /// TODO
-      paused_recovery = false;
+    if (service.recovery_is_paused()) {
+      dout(1) << "unpausing recovery (NORECOVER flag cleared)" << dendl;
+      service.unpause_recovery();
     }
   }
 
@@ -8344,23 +8344,52 @@ void OSD::check_replay_queue()
   }
 }
 
-bool OSD::_recover_now()
+void OSDService::_maybe_queue_recovery() {
+  assert(recovery_lock.is_locked_by_me());
+  uint64_t available_pushes;
+  while (!awaiting_throttle.empty() &&
+        _recover_now(&available_pushes)) {
+    uint64_t to_start = MIN(
+      available_pushes,
+      cct->_conf->osd_recovery_max_single_start);
+    _queue_for_recovery(awaiting_throttle.front(), to_start);
+    awaiting_throttle.pop_front();
+    recovery_ops_reserved += to_start;
+  }
+}
+
+bool OSDService::_recover_now(uint64_t *available_pushes)
 {
-  if (recovery_ops_active >= cct->_conf->osd_recovery_max_active) {
+  uint64_t max = cct->_conf->osd_recovery_max_active;
+  if (max <= recovery_ops_active + recovery_ops_reserved) {
     dout(15) << "_recover_now active " << recovery_ops_active
-            << " >= max " << cct->_conf->osd_recovery_max_active << dendl;
+            << " + reserved " << recovery_ops_reserved
+            << " >= max " << max << dendl;
+    if (available_pushes)
+      *available_pushes = 0;
     return false;
   }
+
+  if (available_pushes)
+    *available_pushes = max - recovery_ops_active - recovery_ops_reserved;
+
   if (ceph_clock_now(cct) < defer_recovery_until) {
     dout(15) << "_recover_now defer until " << defer_recovery_until << dendl;
     return false;
   }
 
+  if (recovery_paused) {
+    dout(15) << "_recover_now paused" << dendl;
+    return false;
+  }
   return true;
 }
 
-void OSD::do_recovery(PG *pg, epoch_t queued, ThreadPool::TPHandle &handle)
+void OSD::do_recovery(
+  PG *pg, epoch_t queued, uint64_t reserved_pushes,
+  ThreadPool::TPHandle &handle)
 {
+  uint64_t started = 0;
   if (g_conf->osd_recovery_sleep > 0) {
     handle.suspend_tp_timeout();
     utime_t t;
@@ -8370,43 +8399,26 @@ void OSD::do_recovery(PG *pg, epoch_t queued, ThreadPool::TPHandle &handle)
     dout(20) << __func__ << " slept for " << t << dendl;
   }
 
-  // see how many we should try to start.  note that this is a bit racy.
-  recovery_lock.Lock();
-  int max = MIN(cct->_conf->osd_recovery_max_active - recovery_ops_active,
-      cct->_conf->osd_recovery_max_single_start);
-  if (max > 0) {
-    dout(10) << "do_recovery can start " << max << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active
-            << " rops)" << dendl;
-    recovery_ops_active += max;  // take them now, return them if we don't use them.
-  } else {
-    dout(10) << "do_recovery can start 0 (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active
-            << " rops)" << dendl;
-  }
-  recovery_lock.Unlock();
-
-  if (max <= 0) {
-    dout(10) << "do_recovery raced and failed to start anything; requeuing " << *pg << dendl;
-    service.queue_for_recovery(pg, true);
-    return;
-  } else {
-    pg->lock_suspend_timeout(handle);
-    if (pg->pg_has_reset_since(queued) ||
-       pg->deleting || !(pg->is_peered() && pg->is_primary())) {
-      pg->unlock();
+  {
+    if (pg->pg_has_reset_since(queued)) {
       goto out;
     }
 
+    assert(!pg->deleting);
+    assert(pg->is_peered() && pg->is_primary());
+
     assert(pg->recovery_queued);
     pg->recovery_queued = false;
-    
-    dout(10) << "do_recovery starting " << max << " " << *pg << dendl;
+
+    dout(10) << "do_recovery starting " << reserved_pushes << " " << *pg << dendl;
 #ifdef DEBUG_RECOVERY_OIDS
-    dout(20) << "  active was " << recovery_oids[pg->info.pgid] << dendl;
+    dout(20) << "  active was " << service.recovery_oids[pg->info.pgid] << dendl;
 #endif
 
-    int started = 0;
-    bool more = pg->start_recovery_ops(max, handle, &started);
-    dout(10) << "do_recovery started " << started << "/" << max << " on " << *pg << dendl;
+    bool more = pg->start_recovery_ops(reserved_pushes, handle, &started);
+    dout(10) << "do_recovery started " << started << "/" << reserved_pushes 
+            << " on " << *pg << dendl;
+
     // If no recovery op is started, don't bother to manipulate the RecoveryCtx
     if (!started && (more || !pg->have_unfound())) {
       goto out;
@@ -8433,26 +8445,21 @@ void OSD::do_recovery(PG *pg, epoch_t queued, ThreadPool::TPHandle &handle)
 
     pg->write_if_dirty(*rctx.transaction);
     OSDMapRef curmap = pg->get_osdmap();
-    pg->unlock();
     dispatch_context(rctx, pg, curmap);
   }
 
  out:
-  recovery_lock.Lock();
-  if (max > 0) {
-    assert(recovery_ops_active >= max);
-    recovery_ops_active -= max;
-  }
-  recovery_lock.Unlock();
+  assert(started <= reserved_pushes);
+  service.release_reserved_pushes(reserved_pushes);
 }
 
-void OSD::start_recovery_op(PG *pg, const hobject_t& soid)
+void OSDService::start_recovery_op(PG *pg, const hobject_t& soid)
 {
-  recovery_lock.Lock();
+  Mutex::Locker l(recovery_lock);
   dout(10) << "start_recovery_op " << *pg << " " << soid
-          << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
+          << " (" << recovery_ops_active << "/"
+          << cct->_conf->osd_recovery_max_active << " rops)"
           << dendl;
-  assert(recovery_ops_active >= 0);
   recovery_ops_active++;
 
 #ifdef DEBUG_RECOVERY_OIDS
@@ -8460,21 +8467,19 @@ void OSD::start_recovery_op(PG *pg, const hobject_t& soid)
   assert(recovery_oids[pg->info.pgid].count(soid) == 0);
   recovery_oids[pg->info.pgid].insert(soid);
 #endif
-
-  recovery_lock.Unlock();
 }
 
-void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
+void OSDService::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
 {
-  recovery_lock.Lock();
+  Mutex::Locker l(recovery_lock);
   dout(10) << "finish_recovery_op " << *pg << " " << soid
           << " dequeue=" << dequeue
           << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
           << dendl;
 
   // adjust count
+  assert(recovery_ops_active > 0);
   recovery_ops_active--;
-  assert(recovery_ops_active >= 0);
 
 #ifdef DEBUG_RECOVERY_OIDS
   dout(20) << "  active oids was " << recovery_oids[pg->info.pgid] << dendl;
@@ -8482,7 +8487,7 @@ void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
   recovery_oids[pg->info.pgid].erase(soid);
 #endif
 
-  recovery_lock.Unlock();
+  _maybe_queue_recovery();
 }
 
 // =========================================================
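
Two assert changes in the hunks above follow directly from the counters going unsigned: start_recovery_op drops assert(recovery_ops_active >= 0), and finish_recovery_op moves its check above the decrement, because ">= 0" is vacuously true for a uint64_t and an underflow would silently wrap rather than trip the old post-decrement assert. A two-line illustration (hypothetical helpers):

#include <cassert>
#include <cstdint>

uint64_t ops_active = 0;

void finish_op_old_style() {
  ops_active--;             // underflow: wraps to 2^64 - 1
  assert(ops_active >= 0);  // always true for unsigned; the bug sails through
}

void finish_op_new_style() {
  assert(ops_active > 0);   // catches the imbalance before it can wrap
  ops_active--;
}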
@@ -9018,6 +9023,7 @@ const char** OSD::get_tracked_conf_keys() const
     "clog_to_graylog_port",
     "host",
     "fsid",
+    "osd_recovery_delay_start",
     NULL
   };
   return KEYS;
@@ -9067,6 +9073,7 @@ void OSD::handle_conf_change(const struct md_config_t *conf,
       changed.count("fsid")) {
     update_log_config();
   }
+
 #ifdef HAVE_LIBFUSE
   if (changed.count("osd_objectstore_fuse")) {
     if (store) {
@@ -9074,6 +9081,12 @@ void OSD::handle_conf_change(const struct md_config_t *conf,
     }
   }
 #endif
+
+  if (changed.count("osd_recovery_delay_start")) {
+    service.defer_recovery(cct->_conf->osd_recovery_delay_start);
+    service.kick_recovery_queue();
+  }
+
   check_config();
 }
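
The OSD.cc side of this commit amounts to a small admission-control loop: PGs wanting recovery wait on awaiting_throttle, _maybe_queue_recovery moves the head of that list into the op queue with a push reservation, and do_recovery hands the reservation back via release_reserved_pushes, which re-runs the loop. A self-contained sketch of that shape (simplified types, time-based deferral elided; not the Ceph classes):

#include <algorithm>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>

// Simplified mirror of the OSDService throttle added in this commit;
// a PG is reduced to a callback that receives its push reservation.
class RecoveryThrottle {
  std::mutex lock;
  std::deque<std::function<void(uint64_t)>> awaiting;  // awaiting_throttle
  uint64_t active = 0;    // recovery_ops_active (start/finish_recovery_op)
  uint64_t reserved = 0;  // recovery_ops_reserved
  bool paused = false;
  const uint64_t max_active = 3;        // osd_recovery_max_active
  const uint64_t max_single_start = 1;  // osd_recovery_max_single_start

  // Caller must hold lock; mirrors _maybe_queue_recovery().
  void maybe_queue() {
    while (!awaiting.empty() && !paused &&
           active + reserved < max_active) {
      uint64_t to_start = std::min(max_active - active - reserved,
                                   max_single_start);
      auto pg = std::move(awaiting.front());
      awaiting.pop_front();
      reserved += to_start;
      pg(to_start);  // real code queues a PGRecovery item on op_wq instead
    }
  }

public:
  void queue_for_recovery(std::function<void(uint64_t)> pg) {
    std::lock_guard<std::mutex> l(lock);
    awaiting.push_back(std::move(pg));
    maybe_queue();
  }
  // do_recovery() ends by returning its reservation, which may admit the
  // next waiting PG -- the same shape as release_reserved_pushes().
  void release_reserved_pushes(uint64_t pushes) {
    std::lock_guard<std::mutex> l(lock);
    reserved -= pushes;
    maybe_queue();
  }
  void pause()   { std::lock_guard<std::mutex> l(lock); paused = true; }
  void unpause() { std::lock_guard<std::mutex> l(lock); paused = false; maybe_queue(); }
};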
 
index 7b5d025382d0ed261559e94a0ab22c47edd585aa..4c3d29755587e708b3aecb35b164e1155d703e7c 100644 (file)
@@ -347,9 +347,12 @@ struct PGSnapTrim {
 
 struct PGRecovery {
   epoch_t epoch_queued;
-  PGRecovery(epoch_t e) : epoch_queued(e) {}
+  uint64_t reserved_pushes;
+  PGRecovery(epoch_t e, uint64_t reserved_pushes)
+    : epoch_queued(e), reserved_pushes(reserved_pushes) {}
   ostream &operator<<(ostream &rhs) {
-    return rhs << "PGRecovery";
+    return rhs << "PGRecovery(epoch=" << epoch_queued
+              << ", reserved_pushes: " << reserved_pushes << ")";
   }
 };
 
@@ -404,6 +407,10 @@ public:
     const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
     return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
   }
+  uint64_t get_reserved_pushes() const {
+    const PGRecovery *op = boost::get<PGRecovery>(&qvariant);
+    return op ? op->reserved_pushes : 0;
+  }
   void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
     RunVis v(osd, pg, handle);
     boost::apply_visitor(v, qvariant);
@@ -883,21 +890,6 @@ public:
   void send_pg_temp();
 
   void queue_for_peering(PG *pg);
-  void queue_for_recovery(PG *pg, bool front = false) {
-    pair<PGRef, PGQueueable> to_queue = make_pair(
-      pg,
-      PGQueueable(
-       PGRecovery(pg->get_osdmap()->get_epoch()),
-       cct->_conf->osd_recovery_cost,
-       cct->_conf->osd_recovery_priority,
-       ceph_clock_now(cct),
-       entity_inst_t()));
-    if (front) {
-      op_wq.queue_front(to_queue);
-    } else {
-      op_wq.queue(to_queue);
-    }
-  }
   void queue_for_snap_trim(PG *pg) {
     op_wq.queue(
       make_pair(
@@ -921,6 +913,86 @@ public:
          entity_inst_t())));
   }
 
+  // -- pg recovery and associated throttling --
+  Mutex recovery_lock;
+  list<pair<epoch_t, PGRef> > awaiting_throttle;
+
+  utime_t defer_recovery_until;
+  uint64_t recovery_ops_active;
+  uint64_t recovery_ops_reserved;
+  bool recovery_paused;
+#ifdef DEBUG_RECOVERY_OIDS
+  map<spg_t, set<hobject_t, hobject_t::BitwiseComparator> > recovery_oids;
+#endif
+  void start_recovery_op(PG *pg, const hobject_t& soid);
+  void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
+  bool _recover_now(uint64_t *available_pushes);
+  void _maybe_queue_recovery();
+  void release_reserved_pushes(uint64_t pushes) {
+    Mutex::Locker l(recovery_lock);
+    assert(recovery_ops_reserved >= pushes);
+    recovery_ops_reserved -= pushes;
+    _maybe_queue_recovery();
+  }
+  void defer_recovery(float defer_for) {
+    defer_recovery_until = ceph_clock_now(cct);
+    defer_recovery_until += defer_for;
+  }
+  void pause_recovery() {
+    Mutex::Locker l(recovery_lock);
+    recovery_paused = true;
+  }
+  bool recovery_is_paused() {
+    Mutex::Locker l(recovery_lock);
+    return recovery_paused;
+  }
+  void unpause_recovery() {
+    Mutex::Locker l(recovery_lock);
+    recovery_paused = false;
+    _maybe_queue_recovery();
+  }
+  void kick_recovery_queue() {
+    Mutex::Locker l(recovery_lock);
+    _maybe_queue_recovery();
+  }
+  void clear_queued_recovery(PG *pg) {
+    Mutex::Locker l(recovery_lock);
+    for (list<pair<epoch_t, PGRef> >::iterator i = awaiting_throttle.begin();
+        i != awaiting_throttle.end();
+      ) {
+      if (i->second.get() == pg) {
+       awaiting_throttle.erase(i++);
+       return;
+      } else {
+       ++i;
+      }
+    }
+  }
+  // delayed pg activation
+  void queue_for_recovery(PG *pg, bool front = false) {
+    Mutex::Locker l(recovery_lock);
+    if (front) {
+      awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
+    } else {
+      awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
+    }
+    _maybe_queue_recovery();
+  }
+
+  void _queue_for_recovery(
+    pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
+    assert(recovery_lock.is_locked_by_me());
+    pair<PGRef, PGQueueable> to_queue = make_pair(
+      p.second,
+      PGQueueable(
+       PGRecovery(p.first, reserved_pushes),
+       cct->_conf->osd_recovery_cost,
+       cct->_conf->osd_recovery_priority,
+       ceph_clock_now(cct),
+       entity_inst_t()));
+    op_wq.queue(to_queue);
+  }
+
   // osd map cache (past osd maps)
   Mutex map_cache_lock;
   SharedLRU<epoch_t, const OSDMap> map_cache;
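
clear_queued_recovery above uses the erase-while-iterating idiom for node-based containers: the post-increment hands erase() the old iterator while the loop variable has already moved past the doomed node. Distilled (generic helper, not Ceph code):

#include <list>

// Erase every element matching `pred` from a std::list without ever
// touching an invalidated iterator (a pre-C++20 std::erase_if).
template <typename T, typename Pred>
void erase_if_list(std::list<T>& l, Pred pred) {
  for (typename std::list<T>::iterator i = l.begin(); i != l.end(); ) {
    if (pred(*i))
      l.erase(i++);  // i advances first; erase() then frees the old node
    else
      ++i;
  }
}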
@@ -1310,8 +1382,6 @@ private:
   ThreadPool disk_tp;
   ThreadPool command_tp;
 
-  bool paused_recovery;
-
   void set_disk_tp_priority();
   void get_latest_osdmap();
 
@@ -1772,7 +1842,11 @@ private:
     struct Pred {
       PG *pg;
       list<OpRequestRef> *out_ops;
-      void accumulate(PGQueueable &op) {
+      uint64_t reserved_pushes_to_free;
+      Pred(PG *pg, list<OpRequestRef> *out_ops = 0)
+       : pg(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
+      void accumulate(const PGQueueable &op) {
+       reserved_pushes_to_free += op.get_reserved_pushes();
        if (out_ops) {
          boost::optional<OpRequestRef> mop = op.maybe_get_op();
          if (mop)
@@ -1787,6 +1861,9 @@ private:
          return false;
        }
       }
+      uint64_t get_reserved_pushes_to_free() const {
+       return reserved_pushes_to_free;
+      }
     };
 
     void dequeue(PG *pg) {
@@ -1818,6 +1895,7 @@ private:
       }
 
       sdata->sdata_op_ordering_lock.Unlock();
+      osd->service.release_reserved_pushes(f.get_reserved_pushes_to_free());
     }
  
     bool is_shard_empty(uint32_t thread_index) {
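
The Pred change above matters for throttle accounting: when a whole PG is dequeued, any PGRecovery items dropped from the shard queue still hold reservations, so the filter tallies get_reserved_pushes() across everything it removes and the caller refunds the sum in one release_reserved_pushes() call. The accumulate-while-filtering pattern, distilled (illustrative types):

#include <cstdint>
#include <list>

struct QueueItem {
  int pg_id;
  uint64_t reserved_pushes;  // 0 for non-recovery queue items
};

// Drop every item belonging to `pg`; return the reservations to refund.
uint64_t dequeue_pg(std::list<QueueItem>& q, int pg) {
  uint64_t to_free = 0;  // mirrors Pred::reserved_pushes_to_free
  for (std::list<QueueItem>::iterator i = q.begin(); i != q.end(); ) {
    if (i->pg_id == pg) {
      to_free += i->reserved_pushes;  // Pred::accumulate()
      q.erase(i++);
    } else {
      ++i;
    }
  }
  return to_free;  // caller: release_reserved_pushes(to_free)
}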
@@ -2248,19 +2326,9 @@ protected:
   void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
 
   // -- pg recovery --
-  Mutex recovery_lock;
-  utime_t defer_recovery_until;
-  int recovery_ops_active;
-#ifdef DEBUG_RECOVERY_OIDS
-  map<spg_t, set<hobject_t, hobject_t::BitwiseComparator> > recovery_oids;
-#endif
-
-  void start_recovery_op(PG *pg, const hobject_t& soid);
-  void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
-  void do_recovery(PG *pg, epoch_t epoch_queued, ThreadPool::TPHandle &handle);
-  bool _recover_now();
+  void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t reserved_pushes,
+                  ThreadPool::TPHandle &handle);
 
-  // replay / delayed pg activation
   Mutex replay_queue_lock;
   list< pair<spg_t, utime_t > > replay_queue;
   
index a8c78164d6c2f02121763b1462f7be3183b5ac27..03c4df954203ae867c1c02e322c8dd95eb561dcd 100644 (file)
@@ -2221,8 +2221,7 @@ void PG::start_recovery_op(const hobject_t& soid)
   assert(recovering_oids.count(soid) == 0);
   recovering_oids.insert(soid);
 #endif
-  // TODOSAM: osd->osd-> not good
-  osd->osd->start_recovery_op(this, soid);
+  osd->start_recovery_op(this, soid);
 }
 
 void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
@@ -2238,10 +2237,10 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
   assert(recovering_oids.count(soid));
   recovering_oids.erase(soid);
 #endif
-  // TODOSAM: osd->osd-> not good
-  osd->osd->finish_recovery_op(this, soid, dequeue);
+  osd->finish_recovery_op(this, soid, dequeue);
 
   if (!dequeue) {
+    queue_recovery();
   }
 }
 
index f86bcfb5aede9ed9951b9420eccbe329954bc0df..568f5ea442bd14b7c718798e7a40efd9b5b43a81 100644 (file)
@@ -1099,8 +1099,9 @@ public:
    * @returns true if any useful work was accomplished; false otherwise
    */
   virtual bool start_recovery_ops(
-    int max, ThreadPool::TPHandle &handle,
-    int *ops_begun) = 0;
+    uint64_t max,
+    ThreadPool::TPHandle &handle,
+    uint64_t *ops_begun) = 0;
 
   void purge_strays();
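
The int-to-uint64_t widening across this interface goes with the new reservation scheme: the old do_recovery computed its budget racily as a difference of signed counters ("note that this is a bit racy") and had to treat a non-positive result as "raced, requeue"; with the reservation carved out under recovery_lock and passed in, a negative budget is unrepresentable and the callee can simply assert(started <= reserved_pushes). A sketch of the retired hazard (illustrative numbers):

#include <algorithm>
#include <cassert>

int main() {
  // Old scheme: budget derived from signed counters outside any
  // single critical section.
  int max_active = 3, ops_active = 5;  // counters can overshoot between checks
  int max = std::min(max_active - ops_active, 1 /* max_single_start */);
  assert(max == -2);  // caller had to special-case max <= 0 and requeue

  // New scheme: 0 < reserved_pushes <= max_active by construction.
  return 0;
}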
 
index 5a1f037b2e0dcda696ae91b05bee91dc9bc26468..1dcbec021e241c03e4a2645c67486d5418b6577f 100644 (file)
@@ -10123,6 +10123,11 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
     hit_set_clear();
   }
 
+  if (recovery_queued) {
+    recovery_queued = false;
+    osd->clear_queued_recovery(this);
+  }
+
   // requeue everything in the reverse order they should be
   // reexamined.
   requeue_ops(waiting_for_peered);
@@ -10370,10 +10375,11 @@ void PG::MissingLoc::check_recovery_sources(const OSDMapRef osdmap)
   
 
 bool ReplicatedPG::start_recovery_ops(
-  int max, ThreadPool::TPHandle &handle,
-  int *ops_started)
+  uint64_t max,
+  ThreadPool::TPHandle &handle,
+  uint64_t *ops_started)
 {
-  int& started = *ops_started;
+  uint64_t& started = *ops_started;
   started = 0;
   bool work_in_progress = false;
   assert(is_primary());
@@ -10516,7 +10522,7 @@ bool ReplicatedPG::start_recovery_ops(
  * do one recovery op.
  * return true if done, false if nothing left to do.
  */
-int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
+uint64_t ReplicatedPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handle)
 {
   assert(is_primary());
 
@@ -10529,7 +10535,7 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
 
   // look at log!
   pg_log_entry_t *latest = 0;
-  int started = 0;
+  unsigned started = 0;
   int skipped = 0;
 
   PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
@@ -10734,10 +10740,10 @@ int ReplicatedPG::prep_object_replica_pushes(
   return 1;
 }
 
-int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
+uint64_t ReplicatedPG::recover_replicas(uint64_t max, ThreadPool::TPHandle &handle)
 {
   dout(10) << __func__ << "(" << max << ")" << dendl;
-  int started = 0;
+  uint64_t started = 0;
 
   PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
 
@@ -10876,8 +10882,8 @@ bool ReplicatedPG::all_peer_done() const
  * io created objects since the last scan.  For this reason, we call
  * update_range() again before continuing backfill.
  */
-int ReplicatedPG::recover_backfill(
-  int max,
+uint64_t ReplicatedPG::recover_backfill(
+  uint64_t max,
   ThreadPool::TPHandle &handle, bool *work_started)
 {
   dout(10) << "recover_backfill (" << max << ")"
@@ -10938,7 +10944,7 @@ int ReplicatedPG::recover_backfill(
   backfill_info.begin = last_backfill_started;
   update_range(&backfill_info, handle);
 
-  int ops = 0;
+  unsigned ops = 0;
   vector<boost::tuple<hobject_t, eversion_t,
                       ObjectContextRef, vector<pg_shard_t> > > to_push;
   vector<boost::tuple<hobject_t, eversion_t, pg_shard_t> > to_remove;
index d0cb85535eebc680783b9c52a3db021399e9c6a4..fa70a505ee2aefc97a713ec1b4f233ac81752b96 100644 (file)
@@ -1240,18 +1240,19 @@ protected:
   void _clear_recovery_state();
 
   bool start_recovery_ops(
-    int max, ThreadPool::TPHandle &handle, int *started);
+    uint64_t max,
+    ThreadPool::TPHandle &handle, uint64_t *started);
 
-  int recover_primary(int max, ThreadPool::TPHandle &handle);
-  int recover_replicas(int max, ThreadPool::TPHandle &handle);
+  uint64_t recover_primary(uint64_t max, ThreadPool::TPHandle &handle);
+  uint64_t recover_replicas(uint64_t max, ThreadPool::TPHandle &handle);
   hobject_t earliest_peer_backfill() const;
   bool all_peer_done() const;
   /**
    * @param work_started will be set to true if recover_backfill got anywhere
    * @returns the number of operations started
    */
-  int recover_backfill(int max, ThreadPool::TPHandle &handle,
-                       bool *work_started);
+  uint64_t recover_backfill(uint64_t max, ThreadPool::TPHandle &handle,
+                           bool *work_started);
 
   /**
    * scan a (hash) range of objects in the current pg