OPTION(osd_preserve_trimmed_log, OPT_BOOL, false)
OPTION(osd_auto_mark_unfound_lost, OPT_BOOL, false)
OPTION(osd_recovery_delay_start, OPT_FLOAT, 0)
-OPTION(osd_recovery_max_active, OPT_INT, 3)
-OPTION(osd_recovery_max_single_start, OPT_INT, 1)
+OPTION(osd_recovery_max_active, OPT_U64, 3)
+OPTION(osd_recovery_max_single_start, OPT_U64, 1)
OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20) // max size of push chunk
OPTION(osd_copyfrom_max_chunk, OPT_U64, 8<<20) // max size of a COPYFROM chunk
OPTION(osd_push_per_object_cost, OPT_U64, 1000) // push cost per object
}
void PGQueueable::RunVis::operator()(const PGRecovery &op) {
- /// TODO: need to handle paused recovery
- return osd->do_recovery(pg.get(), op.epoch_queued, handle);
+ return osd->do_recovery(pg.get(), op.epoch_queued, op.reserved_pushes, handle);
}
//Initial features in new superblock.
remote_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
cct->_conf->osd_min_recovery_priority),
pg_temp_lock("OSDService::pg_temp_lock"),
+ recovery_lock("OSDService::recovery_lock"),
+ recovery_ops_active(0),
+ recovery_ops_reserved(0),
+ recovery_paused(false),
map_cache_lock("OSDService::map_cache_lock"),
map_cache(cct, cct->_conf->osd_map_cache_size),
map_bl_cache(cct->_conf->osd_map_cache_size),
agent_timer.init();
agent_thread.create("osd_srv_agent");
+
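+ // honor osd_recovery_delay_start: defer recovery for the configured interval after startup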
+ if (cct->_conf->osd_recovery_delay_start)
+ defer_recovery(cct->_conf->osd_recovery_delay_start);
}
void OSDService::final_init()
cct->_conf->osd_op_num_threads_per_shard * cct->_conf->osd_op_num_shards),
disk_tp(cct, "OSD::disk_tp", "tp_osd_disk", cct->_conf->osd_disk_threads, "osd_disk_threads"),
command_tp(cct, "OSD::command_tp", "tp_osd_cmd", 1),
- paused_recovery(false),
session_waiting_lock("OSD::session_waiting_lock"),
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false), heartbeat_update_lock("OSD::heartbeat_update_lock"),
cct->_conf->osd_command_thread_timeout,
cct->_conf->osd_command_thread_suicide_timeout,
&command_tp),
- recovery_lock("OSD::recovery_lock"),
- recovery_ops_active(0),
replay_queue_lock("OSD::replay_queue_lock"),
remove_wq(
store,
check_ops_in_flight();
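+ // poke the recovery throttle every tick so deferred or newly unblocked recovery gets requeued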
+ service.kick_recovery_queue();
+
tick_timer.add_event_after(OSD_TICK_INTERVAL, new C_Tick(this));
}
cct->_conf->apply_changes(NULL);
ss << "kicking recovery queue. set osd_recovery_delay_start "
<< "to " << cct->_conf->osd_recovery_delay_start;
- defer_recovery_until = ceph_clock_now(cct);
- defer_recovery_until += cct->_conf->osd_recovery_delay_start;
- /// TODO
}
else if (prefix == "cpu_profiler") {
// norecover?
if (osdmap->test_flag(CEPH_OSDMAP_NORECOVER)) {
- if (!paused_recovery) {
+ if (!service.recovery_is_paused()) {
dout(1) << "pausing recovery (NORECOVER flag set)" << dendl;
- paused_recovery = true;
- /// TODO
+ service.pause_recovery();
}
} else {
- if (paused_recovery) {
- dout(1) << "resuming recovery (NORECOVER flag cleared)" << dendl;
- /// TODO
- paused_recovery = false;
+ if (service.recovery_is_paused()) {
+ dout(1) << "unpausing recovery (NORECOVER flag set)" << dendl;
+ service.unpause_recovery();
}
}
}
}
-bool OSD::_recover_now()
+void OSDService::_maybe_queue_recovery() {
+ assert(recovery_lock.is_locked_by_me());
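+ // drain awaiting_throttle while the push budget allows; each PG gets at most osd_recovery_max_single_start reserved pushes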
+ uint64_t available_pushes;
+ while (!awaiting_throttle.empty() &&
+ _recover_now(&available_pushes)) {
+ uint64_t to_start = MIN(
+ available_pushes,
+ cct->_conf->osd_recovery_max_single_start);
+ _queue_for_recovery(awaiting_throttle.front(), to_start);
+ awaiting_throttle.pop_front();
+ recovery_ops_reserved += to_start;
+ }
+}
+
+bool OSDService::_recover_now(uint64_t *available_pushes)
{
- if (recovery_ops_active >= cct->_conf->osd_recovery_max_active) {
+ uint64_t max = cct->_conf->osd_recovery_max_active;
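+ // count both active ops and pushes already reserved for queued PGs against the limit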
+ if (max <= recovery_ops_active + recovery_ops_reserved) {
dout(15) << "_recover_now active " << recovery_ops_active
- << " >= max " << cct->_conf->osd_recovery_max_active << dendl;
+ << " + reserved " << recovery_ops_reserved
+ << " >= max " << max << dendl;
+ if (available_pushes)
+ *available_pushes = 0;
return false;
}
+
+ if (available_pushes)
+ *available_pushes = max - recovery_ops_active - recovery_ops_reserved;
+
if (ceph_clock_now(cct) < defer_recovery_until) {
dout(15) << "_recover_now defer until " << defer_recovery_until << dendl;
return false;
}
+ if (recovery_paused) {
+ dout(15) << "_recover_now paused" << dendl;
+ return false;
+ }
return true;
}
-void OSD::do_recovery(PG *pg, epoch_t queued, ThreadPool::TPHandle &handle)
+void OSD::do_recovery(
+ PG *pg, epoch_t queued, uint64_t reserved_pushes,
+ ThreadPool::TPHandle &handle)
{
+ uint64_t started = 0;
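+ // reserved_pushes were charged to the recovery throttle when this PG was queued; they are released at 'out' below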
if (g_conf->osd_recovery_sleep > 0) {
handle.suspend_tp_timeout();
utime_t t;
dout(20) << __func__ << " slept for " << t << dendl;
}
- // see how many we should try to start. note that this is a bit racy.
- recovery_lock.Lock();
- int max = MIN(cct->_conf->osd_recovery_max_active - recovery_ops_active,
- cct->_conf->osd_recovery_max_single_start);
- if (max > 0) {
- dout(10) << "do_recovery can start " << max << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active
- << " rops)" << dendl;
- recovery_ops_active += max; // take them now, return them if we don't use them.
- } else {
- dout(10) << "do_recovery can start 0 (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active
- << " rops)" << dendl;
- }
- recovery_lock.Unlock();
-
- if (max <= 0) {
- dout(10) << "do_recovery raced and failed to start anything; requeuing " << *pg << dendl;
- service.queue_for_recovery(pg, true);
- return;
- } else {
- pg->lock_suspend_timeout(handle);
- if (pg->pg_has_reset_since(queued) ||
- pg->deleting || !(pg->is_peered() && pg->is_primary())) {
- pg->unlock();
+ {
+ if (pg->pg_has_reset_since(queued)) {
goto out;
}
+ assert(!pg->deleting);
+ assert(pg->is_peered() && pg->is_primary());
+
assert(pg->recovery_queued);
pg->recovery_queued = false;
-
- dout(10) << "do_recovery starting " << max << " " << *pg << dendl;
+
+ dout(10) << "do_recovery starting " << reserved_pushes << " " << *pg << dendl;
#ifdef DEBUG_RECOVERY_OIDS
- dout(20) << " active was " << recovery_oids[pg->info.pgid] << dendl;
+ dout(20) << " active was " << service.recovery_oids[pg->info.pgid] << dendl;
#endif
- int started = 0;
- bool more = pg->start_recovery_ops(max, handle, &started);
- dout(10) << "do_recovery started " << started << "/" << max << " on " << *pg << dendl;
+ bool more = pg->start_recovery_ops(reserved_pushes, handle, &started);
+ dout(10) << "do_recovery started " << started << "/" << reserved_pushes
+ << " on " << *pg << dendl;
+
// If no recovery op is started, don't bother to manipulate the RecoveryCtx
if (!started && (more || !pg->have_unfound())) {
goto out;
pg->write_if_dirty(*rctx.transaction);
OSDMapRef curmap = pg->get_osdmap();
- pg->unlock();
dispatch_context(rctx, pg, curmap);
}
out:
- recovery_lock.Lock();
- if (max > 0) {
- assert(recovery_ops_active >= max);
- recovery_ops_active -= max;
- }
- recovery_lock.Unlock();
+ assert(started <= reserved_pushes);
+ service.release_reserved_pushes(reserved_pushes);
}
-void OSD::start_recovery_op(PG *pg, const hobject_t& soid)
+void OSDService::start_recovery_op(PG *pg, const hobject_t& soid)
{
- recovery_lock.Lock();
+ Mutex::Locker l(recovery_lock);
dout(10) << "start_recovery_op " << *pg << " " << soid
- << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
+ << " (" << recovery_ops_active << "/"
+ << cct->_conf->osd_recovery_max_active << " rops)"
<< dendl;
- assert(recovery_ops_active >= 0);
recovery_ops_active++;
#ifdef DEBUG_RECOVERY_OIDS
assert(recovery_oids[pg->info.pgid].count(soid) == 0);
recovery_oids[pg->info.pgid].insert(soid);
#endif
-
- recovery_lock.Unlock();
}
-void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
+void OSDService::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
{
- recovery_lock.Lock();
+ Mutex::Locker l(recovery_lock);
dout(10) << "finish_recovery_op " << *pg << " " << soid
<< " dequeue=" << dequeue
<< " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
<< dendl;
// adjust count
+ assert(recovery_ops_active > 0);
recovery_ops_active--;
- assert(recovery_ops_active >= 0);
#ifdef DEBUG_RECOVERY_OIDS
dout(20) << " active oids was " << recovery_oids[pg->info.pgid] << dendl;
recovery_oids[pg->info.pgid].erase(soid);
#endif
- recovery_lock.Unlock();
+ _maybe_queue_recovery();
}
// =========================================================
"clog_to_graylog_port",
"host",
"fsid",
+ "osd_recovery_delay_start",
NULL
};
return KEYS;
changed.count("fsid")) {
update_log_config();
}
+
#ifdef HAVE_LIBFUSE
if (changed.count("osd_objectstore_fuse")) {
if (store) {
}
}
#endif
+
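+ // re-arm the deferral with the new value and poke the throttle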
+ if (changed.count("osd_recovery_delay_start")) {
+ service.defer_recovery(cct->_conf->osd_recovery_delay_start);
+ service.kick_recovery_queue();
+ }
+
check_config();
}
struct PGRecovery {
epoch_t epoch_queued;
- PGRecovery(epoch_t e) : epoch_queued(e) {}
+ uint64_t reserved_pushes;
+ PGRecovery(epoch_t e, uint64_t reserved_pushes)
+ : epoch_queued(e), reserved_pushes(reserved_pushes) {}
ostream &operator<<(ostream &rhs) {
- return rhs << "PGRecovery";
+ return rhs << "PGRecovery(epoch=" << epoch_queued
+ << ", reserved_pushes: " << reserved_pushes << ")";
}
};
const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
}
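+ // number of pushes reserved for a PGRecovery item; 0 for any other queue item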
+ uint64_t get_reserved_pushes() const {
+ const PGRecovery *op = boost::get<PGRecovery>(&qvariant);
+ return op ? op->reserved_pushes : 0;
+ }
void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
RunVis v(osd, pg, handle);
boost::apply_visitor(v, qvariant);
void send_pg_temp();
void queue_for_peering(PG *pg);
- void queue_for_recovery(PG *pg, bool front = false) {
- pair<PGRef, PGQueueable> to_queue = make_pair(
- pg,
- PGQueueable(
- PGRecovery(pg->get_osdmap()->get_epoch()),
- cct->_conf->osd_recovery_cost,
- cct->_conf->osd_recovery_priority,
- ceph_clock_now(cct),
- entity_inst_t()));
- if (front) {
- op_wq.queue_front(to_queue);
- } else {
- op_wq.queue(to_queue);
- }
- }
void queue_for_snap_trim(PG *pg) {
op_wq.queue(
make_pair(
entity_inst_t())));
}
+ // -- pg recovery and associated throttling --
+ Mutex recovery_lock;
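+ // PGs waiting for recovery push slots, as (epoch queued, pg) pairs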
+ list<pair<epoch_t, PGRef> > awaiting_throttle;
+
+ utime_t defer_recovery_until;
+ uint64_t recovery_ops_active;
+ uint64_t recovery_ops_reserved;
+ bool recovery_paused;
+#ifdef DEBUG_RECOVERY_OIDS
+ map<spg_t, set<hobject_t, hobject_t::BitwiseComparator> > recovery_oids;
+#endif
+ void start_recovery_op(PG *pg, const hobject_t& soid);
+ void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
+ bool _recover_now(uint64_t *available_pushes);
+ void _maybe_queue_recovery();
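+ // return pushes to the throttle when a dequeued recovery item is finished or discarded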
+ void release_reserved_pushes(uint64_t pushes) {
+ Mutex::Locker l(recovery_lock);
+ assert(recovery_ops_reserved >= pushes);
+ recovery_ops_reserved -= pushes;
+ _maybe_queue_recovery();
+ }
+ void defer_recovery(float defer_for) {
+ Mutex::Locker l(recovery_lock);
+ defer_recovery_until = ceph_clock_now(cct);
+ defer_recovery_until += defer_for;
+ }
+ void pause_recovery() {
+ Mutex::Locker l(recovery_lock);
+ recovery_paused = true;
+ }
+ bool recovery_is_paused() {
+ Mutex::Locker l(recovery_lock);
+ return recovery_paused;
+ }
+ void unpause_recovery() {
+ Mutex::Locker l(recovery_lock);
+ recovery_paused = false;
+ _maybe_queue_recovery();
+ }
+ void kick_recovery_queue() {
+ Mutex::Locker l(recovery_lock);
+ _maybe_queue_recovery();
+ }
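+ // remove any awaiting_throttle entry for this pg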
+ void clear_queued_recovery(PG *pg) {
+ Mutex::Locker l(recovery_lock);
+ for (list<pair<epoch_t, PGRef> >::iterator i = awaiting_throttle.begin();
+ i != awaiting_throttle.end();
+ ) {
+ if (i->second.get() == pg) {
+ awaiting_throttle.erase(i++);
+ return;
+ } else {
+ ++i;
+ }
+ }
+ }
+ // replay / delayed pg activation
+ void queue_for_recovery(PG *pg, bool front = false) {
+ Mutex::Locker l(recovery_lock);
+ if (front) {
+ awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
+ } else {
+ awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
+ }
+ _maybe_queue_recovery();
+ }
+
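+ // caller must hold recovery_lock and is responsible for accounting reserved_pushes in recovery_ops_reserved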
+ void _queue_for_recovery(
+ pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
+ assert(recovery_lock.is_locked_by_me());
+ pair<PGRef, PGQueueable> to_queue = make_pair(
+ p.second,
+ PGQueueable(
+ PGRecovery(p.first, reserved_pushes),
+ cct->_conf->osd_recovery_cost,
+ cct->_conf->osd_recovery_priority,
+ ceph_clock_now(cct),
+ entity_inst_t()));
+ op_wq.queue(to_queue);
+ }
+
// osd map cache (past osd maps)
Mutex map_cache_lock;
SharedLRU<epoch_t, const OSDMap> map_cache;
ThreadPool disk_tp;
ThreadPool command_tp;
- bool paused_recovery;
-
void set_disk_tp_priority();
void get_latest_osdmap();
struct Pred {
PG *pg;
list<OpRequestRef> *out_ops;
- void accumulate(PGQueueable &op) {
+ uint64_t reserved_pushes_to_free;
+ Pred(PG *pg, list<OpRequestRef> *out_ops = 0)
+ : pg(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
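+ // tally reserved pushes on any PGRecovery items dropped here so the caller can return them to the throttle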
+ void accumulate(const PGQueueable &op) {
+ reserved_pushes_to_free += op.get_reserved_pushes();
if (out_ops) {
boost::optional<OpRequestRef> mop = op.maybe_get_op();
if (mop)
return false;
}
}
+ uint64_t get_reserved_pushes_to_free() const {
+ return reserved_pushes_to_free;
+ }
};
void dequeue(PG *pg) {
}
sdata->sdata_op_ordering_lock.Unlock();
+ osd->service.release_reserved_pushes(f.get_reserved_pushes_to_free());
}
bool is_shard_empty(uint32_t thread_index) {
void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
// -- pg recovery --
- Mutex recovery_lock;
- utime_t defer_recovery_until;
- int recovery_ops_active;
-#ifdef DEBUG_RECOVERY_OIDS
- map<spg_t, set<hobject_t, hobject_t::BitwiseComparator> > recovery_oids;
-#endif
-
- void start_recovery_op(PG *pg, const hobject_t& soid);
- void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
- void do_recovery(PG *pg, epoch_t epoch_queued, ThreadPool::TPHandle &handle);
- bool _recover_now();
+ void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t reserved_pushes,
+ ThreadPool::TPHandle &handle);
- // replay / delayed pg activation
Mutex replay_queue_lock;
list< pair<spg_t, utime_t > > replay_queue;
assert(recovering_oids.count(soid) == 0);
recovering_oids.insert(soid);
#endif
- // TODOSAM: osd->osd-> not good
- osd->osd->start_recovery_op(this, soid);
+ osd->start_recovery_op(this, soid);
}
void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
assert(recovering_oids.count(soid));
recovering_oids.erase(soid);
#endif
- // TODOSAM: osd->osd-> not good
- osd->osd->finish_recovery_op(this, soid, dequeue);
+ osd->finish_recovery_op(this, soid, dequeue);
if (!dequeue) {
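+ // not dequeueing; requeue this pg through the throttled recovery queue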
+ queue_recovery();
}
}
* @returns true if any useful work was accomplished; false otherwise
*/
virtual bool start_recovery_ops(
- int max, ThreadPool::TPHandle &handle,
- int *ops_begun) = 0;
+ uint64_t max,
+ ThreadPool::TPHandle &handle,
+ uint64_t *ops_begun) = 0;
void purge_strays();
hit_set_clear();
}
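+ // we are no longer queued; drop any entry still waiting on the recovery throttle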
+ if (recovery_queued) {
+ recovery_queued = false;
+ osd->clear_queued_recovery(this);
+ }
+
// requeue everything in the reverse order they should be
// reexamined.
requeue_ops(waiting_for_peered);
bool ReplicatedPG::start_recovery_ops(
- int max, ThreadPool::TPHandle &handle,
- int *ops_started)
+ uint64_t max,
+ ThreadPool::TPHandle &handle,
+ uint64_t *ops_started)
{
- int& started = *ops_started;
+ uint64_t& started = *ops_started;
started = 0;
bool work_in_progress = false;
assert(is_primary());
* do one recovery op.
* return true if done, false if nothing left to do.
*/
-int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
+uint64_t ReplicatedPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handle)
{
assert(is_primary());
// look at log!
pg_log_entry_t *latest = 0;
- int started = 0;
+ uint64_t started = 0;
int skipped = 0;
PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
return 1;
}
-int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
+uint64_t ReplicatedPG::recover_replicas(uint64_t max, ThreadPool::TPHandle &handle)
{
dout(10) << __func__ << "(" << max << ")" << dendl;
- int started = 0;
+ uint64_t started = 0;
PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
* io created objects since the last scan. For this reason, we call
* update_range() again before continuing backfill.
*/
-int ReplicatedPG::recover_backfill(
- int max,
+uint64_t ReplicatedPG::recover_backfill(
+ uint64_t max,
ThreadPool::TPHandle &handle, bool *work_started)
{
dout(10) << "recover_backfill (" << max << ")"
backfill_info.begin = last_backfill_started;
update_range(&backfill_info, handle);
- int ops = 0;
+ uint64_t ops = 0;
vector<boost::tuple<hobject_t, eversion_t,
ObjectContextRef, vector<pg_shard_t> > > to_push;
vector<boost::tuple<hobject_t, eversion_t, pg_shard_t> > to_remove;
void _clear_recovery_state();
bool start_recovery_ops(
- int max, ThreadPool::TPHandle &handle, int *started);
+ uint64_t max,
+ ThreadPool::TPHandle &handle, uint64_t *started);
- int recover_primary(int max, ThreadPool::TPHandle &handle);
- int recover_replicas(int max, ThreadPool::TPHandle &handle);
+ uint64_t recover_primary(uint64_t max, ThreadPool::TPHandle &handle);
+ uint64_t recover_replicas(uint64_t max, ThreadPool::TPHandle &handle);
hobject_t earliest_peer_backfill() const;
bool all_peer_done() const;
/**
* @param work_started will be set to true if recover_backfill got anywhere
* @returns the number of operations started
*/
- int recover_backfill(int max, ThreadPool::TPHandle &handle,
- bool *work_started);
+ uint64_t recover_backfill(uint64_t max, ThreadPool::TPHandle &handle,
+ bool *work_started);
/**
* scan a (hash) range of objects in the current pg