From: Samuel Just <sjust@redhat.com>
Date: Tue, 26 Apr 2016 01:47:41 +0000 (-0700)
Subject: OSD: allow recovery pausing and deferral
X-Git-Tag: v11.0.0~465^2~2
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2c50e230c1aa32b14cdc2e91ed70e6e48147343c;p=ceph.git

OSD: allow recovery pausing and deferral

Signed-off-by: Samuel Just <sjust@redhat.com>
---

diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 2da949a2edd8..5aaded0be7fd 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -745,8 +745,8 @@ OPTION(osd_default_data_pool_replay_window, OPT_INT, 45)
 OPTION(osd_preserve_trimmed_log, OPT_BOOL, false)
 OPTION(osd_auto_mark_unfound_lost, OPT_BOOL, false)
 OPTION(osd_recovery_delay_start, OPT_FLOAT, 0)
-OPTION(osd_recovery_max_active, OPT_INT, 3)
-OPTION(osd_recovery_max_single_start, OPT_INT, 1)
+OPTION(osd_recovery_max_active, OPT_U64, 3)
+OPTION(osd_recovery_max_single_start, OPT_U64, 1)
 OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20)  // max size of push chunk
 OPTION(osd_copyfrom_max_chunk, OPT_U64, 8<<20)  // max size of a COPYFROM chunk
 OPTION(osd_push_per_object_cost, OPT_U64, 1000)  // push cost per object
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index c57a776e8daa..7d057509e7ae 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -172,8 +172,7 @@ void PGQueueable::RunVis::operator()(const PGScrub &op) {
 }
 
 void PGQueueable::RunVis::operator()(const PGRecovery &op) {
-  /// TODO: need to handle paused recovery
-  return osd->do_recovery(pg.get(), op.epoch_queued, handle);
+  return osd->do_recovery(pg.get(), op.epoch_queued, op.reserved_pushes, handle);
 }
 
 //Initial features in new superblock.
@@ -258,6 +257,10 @@ OSDService::OSDService(OSD *osd) :
   remote_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
                   cct->_conf->osd_min_recovery_priority),
   pg_temp_lock("OSDService::pg_temp_lock"),
+  recovery_lock("OSDService::recovery_lock"),
+  recovery_ops_active(0),
+  recovery_ops_reserved(0),
+  recovery_paused(false),
   map_cache_lock("OSDService::map_cache_lock"),
   map_cache(cct, cct->_conf->osd_map_cache_size),
   map_bl_cache(cct->_conf->osd_map_cache_size),
@@ -499,6 +502,9 @@ void OSDService::init()
 
   agent_timer.init();
   agent_thread.create("osd_srv_agent");
+
+  if (cct->_conf->osd_recovery_delay_start)
+    defer_recovery(cct->_conf->osd_recovery_delay_start);
 }
 
 void OSDService::final_init()
@@ -1654,7 +1660,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
                  cct->_conf->osd_op_num_threads_per_shard * cct->_conf->osd_op_num_shards),
   disk_tp(cct, "OSD::disk_tp", "tp_osd_disk", cct->_conf->osd_disk_threads, "osd_disk_threads"),
   command_tp(cct, "OSD::command_tp", "tp_osd_cmd", 1),
-  paused_recovery(false),
   session_waiting_lock("OSD::session_waiting_lock"),
   heartbeat_lock("OSD::heartbeat_lock"),
   heartbeat_stop(false), heartbeat_update_lock("OSD::heartbeat_update_lock"),
@@ -1697,8 +1702,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
     cct->_conf->osd_command_thread_timeout,
     cct->_conf->osd_command_thread_suicide_timeout,
     &command_tp),
-  recovery_lock("OSD::recovery_lock"),
-  recovery_ops_active(0),
   replay_queue_lock("OSD::replay_queue_lock"),
   remove_wq(
     store,
@@ -4344,6 +4347,8 @@ void OSD::tick()
 
   check_ops_in_flight();
 
+  service.kick_recovery_queue();
+
   tick_timer.add_event_after(OSD_TICK_INTERVAL, new C_Tick(this));
 }
 
@@ -5750,9 +5755,6 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
     cct->_conf->apply_changes(NULL);
     ss << "kicking recovery queue. set osd_recovery_delay_start "
       << "to " << cct->_conf->osd_recovery_delay_start;
-    defer_recovery_until = ceph_clock_now(cct);
-    defer_recovery_until += cct->_conf->osd_recovery_delay_start;
-    /// TODO
   }
 
   else if (prefix == "cpu_profiler") {
@@ -7355,16 +7357,14 @@ void OSD::activate_map()
 
   // norecover?
   if (osdmap->test_flag(CEPH_OSDMAP_NORECOVER)) {
-    if (!paused_recovery) {
+    if (!service.recovery_is_paused()) {
       dout(1) << "pausing recovery (NORECOVER flag set)" << dendl;
-      paused_recovery = true;
-      /// TODO
+      service.pause_recovery();
     }
   } else {
-    if (paused_recovery) {
-      dout(1) << "resuming recovery (NORECOVER flag cleared)" << dendl;
-      /// TODO
-      paused_recovery = false;
+    if (service.recovery_is_paused()) {
+      dout(1) << "unpausing recovery (NORECOVER flag cleared)" << dendl;
+      service.unpause_recovery();
     }
   }
 
@@ -8344,23 +8344,52 @@ void OSD::check_replay_queue()
   }
 }
 
-bool OSD::_recover_now()
+void OSDService::_maybe_queue_recovery() {
+  assert(recovery_lock.is_locked_by_me());
+  uint64_t available_pushes;
+  while (!awaiting_throttle.empty() &&
+         _recover_now(&available_pushes)) {
+    uint64_t to_start = MIN(
+      available_pushes,
+      cct->_conf->osd_recovery_max_single_start);
+    _queue_for_recovery(awaiting_throttle.front(), to_start);
+    awaiting_throttle.pop_front();
+    recovery_ops_reserved += to_start;
+  }
+}
+
+bool OSDService::_recover_now(uint64_t *available_pushes)
 {
-  if (recovery_ops_active >= cct->_conf->osd_recovery_max_active) {
+  uint64_t max = cct->_conf->osd_recovery_max_active;
+  if (max <= recovery_ops_active + recovery_ops_reserved) {
     dout(15) << "_recover_now active " << recovery_ops_active
-             << " >= max " << cct->_conf->osd_recovery_max_active << dendl;
+             << " + reserved " << recovery_ops_reserved
+             << " >= max " << max << dendl;
+    if (available_pushes)
+      *available_pushes = 0;
     return false;
   }
+
+  if (available_pushes)
+    *available_pushes = max - recovery_ops_active - recovery_ops_reserved;
+
   if (ceph_clock_now(cct) < defer_recovery_until) {
     dout(15) << "_recover_now defer until " << defer_recovery_until << dendl;
     return false;
   }
 
+  if (recovery_paused) {
+    dout(15) << "_recover_now paused" << dendl;
+    return false;
+  }
   return true;
 }
 
-void OSD::do_recovery(PG *pg, epoch_t queued, ThreadPool::TPHandle &handle)
+void OSD::do_recovery(
+  PG *pg, epoch_t queued, uint64_t reserved_pushes,
+  ThreadPool::TPHandle &handle)
 {
+  uint64_t started = 0;
   if (g_conf->osd_recovery_sleep > 0) {
     handle.suspend_tp_timeout();
     utime_t t;
@@ -8370,43 +8399,26 @@ void OSD::do_recovery(PG *pg, epoch_t queued, ThreadPool::TPHandle &handle)
     dout(20) << __func__ << " slept for " << t << dendl;
   }
 
-  // see how many we should try to start. note that this is a bit racy.
-  recovery_lock.Lock();
-  int max = MIN(cct->_conf->osd_recovery_max_active - recovery_ops_active,
-                cct->_conf->osd_recovery_max_single_start);
-  if (max > 0) {
-    dout(10) << "do_recovery can start " << max << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active
-             << " rops)" << dendl;
-    recovery_ops_active += max;  // take them now, return them if we don't use them.
-  } else {
-    dout(10) << "do_recovery can start 0 (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active
-             << " rops)" << dendl;
-  }
-  recovery_lock.Unlock();
-
-  if (max <= 0) {
-    dout(10) << "do_recovery raced and failed to start anything; requeuing " << *pg << dendl;
-    service.queue_for_recovery(pg, true);
-    return;
-  } else {
-    pg->lock_suspend_timeout(handle);
-    if (pg->pg_has_reset_since(queued) ||
-        pg->deleting || !(pg->is_peered() && pg->is_primary())) {
-      pg->unlock();
+  {
+    if (pg->pg_has_reset_since(queued)) {
       goto out;
     }
+    assert(!pg->deleting);
+    assert(pg->is_peered() && pg->is_primary());
+    assert(pg->recovery_queued);
     pg->recovery_queued = false;
-
-    dout(10) << "do_recovery starting " << max << " " << *pg << dendl;
+
+    dout(10) << "do_recovery starting " << reserved_pushes << " " << *pg << dendl;
 #ifdef DEBUG_RECOVERY_OIDS
-    dout(20) << " active was " << recovery_oids[pg->info.pgid] << dendl;
+    dout(20) << " active was " << service.recovery_oids[pg->info.pgid] << dendl;
 #endif
 
-    int started = 0;
-    bool more = pg->start_recovery_ops(max, handle, &started);
-    dout(10) << "do_recovery started " << started << "/" << max << " on " << *pg << dendl;
+    bool more = pg->start_recovery_ops(reserved_pushes, handle, &started);
+    dout(10) << "do_recovery started " << started << "/" << reserved_pushes
+             << " on " << *pg << dendl;
 
     // If no recovery op is started, don't bother to manipulate the RecoveryCtx
     if (!started && (more || !pg->have_unfound())) {
       goto out;
@@ -8433,26 +8445,21 @@
     pg->write_if_dirty(*rctx.transaction);
     OSDMapRef curmap = pg->get_osdmap();
-    pg->unlock();
     dispatch_context(rctx, pg, curmap);
   }
 
  out:
-  recovery_lock.Lock();
-  if (max > 0) {
-    assert(recovery_ops_active >= max);
-    recovery_ops_active -= max;
-  }
-  recovery_lock.Unlock();
+  assert(started <= reserved_pushes);
+  service.release_reserved_pushes(reserved_pushes);
 }
 
-void OSD::start_recovery_op(PG *pg, const hobject_t& soid)
+void OSDService::start_recovery_op(PG *pg, const hobject_t& soid)
 {
-  recovery_lock.Lock();
+  Mutex::Locker l(recovery_lock);
   dout(10) << "start_recovery_op " << *pg << " " << soid
-           << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
+           << " (" << recovery_ops_active << "/"
+           << cct->_conf->osd_recovery_max_active << " rops)"
            << dendl;
-  assert(recovery_ops_active >= 0);
   recovery_ops_active++;
 
 #ifdef DEBUG_RECOVERY_OIDS
@@ -8460,21 +8467,19 @@ void OSD::start_recovery_op(PG *pg, const hobject_t& soid)
   assert(recovery_oids[pg->info.pgid].count(soid) == 0);
   recovery_oids[pg->info.pgid].insert(soid);
 #endif
-
-  recovery_lock.Unlock();
 }
 
-void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
+void OSDService::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
 {
-  recovery_lock.Lock();
+  Mutex::Locker l(recovery_lock);
   dout(10) << "finish_recovery_op " << *pg << " " << soid
           << " dequeue=" << dequeue
           << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
           << dendl;
 
   // adjust count
+  assert(recovery_ops_active > 0);
   recovery_ops_active--;
-  assert(recovery_ops_active >= 0);
 
 #ifdef DEBUG_RECOVERY_OIDS
   dout(20) << " active oids was " << recovery_oids[pg->info.pgid] << dendl;
@@ -8482,7 +8487,7 @@ void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
   recovery_oids[pg->info.pgid].erase(soid);
 #endif
 
-  recovery_lock.Unlock();
+  _maybe_queue_recovery();
 }
 
 // =========================================================
@@ -9018,6 +9023,7 @@ const char** OSD::get_tracked_conf_keys() const
     "clog_to_graylog_port",
     "host",
     "fsid",
+    "osd_recovery_delay_start",
     NULL
   };
   return KEYS;
@@ -9067,6 +9073,7 @@ void OSD::handle_conf_change(const struct md_config_t *conf,
       changed.count("fsid")) {
     update_log_config();
   }
+
 #ifdef HAVE_LIBFUSE
   if (changed.count("osd_objectstore_fuse")) {
     if (store) {
@@ -9074,6 +9081,12 @@ void OSD::handle_conf_change(const struct md_config_t *conf,
     }
   }
 #endif
+
+  if (changed.count("osd_recovery_delay_start")) {
+    service.defer_recovery(cct->_conf->osd_recovery_delay_start);
+    service.kick_recovery_queue();
+  }
+
   check_config();
 }
 
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 7b5d025382d0..4c3d29755587 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -347,9 +347,12 @@ struct PGSnapTrim {
 
 struct PGRecovery {
   epoch_t epoch_queued;
-  PGRecovery(epoch_t e) : epoch_queued(e) {}
+  uint64_t reserved_pushes;
+  PGRecovery(epoch_t e, uint64_t reserved_pushes)
+    : epoch_queued(e), reserved_pushes(reserved_pushes) {}
   ostream &operator<<(ostream &rhs) {
-    return rhs << "PGRecovery";
+    return rhs << "PGRecovery(epoch=" << epoch_queued
+               << ", reserved_pushes: " << reserved_pushes << ")";
   }
 };
 
@@ -404,6 +407,10 @@ public:
     const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
     return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
   }
+  uint64_t get_reserved_pushes() const {
+    const PGRecovery *op = boost::get<PGRecovery>(&qvariant);
+    return op ? op->reserved_pushes : 0;
+  }
   void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
     RunVis v(osd, pg, handle);
     boost::apply_visitor(v, qvariant);
@@ -883,21 +890,6 @@ public:
   void send_pg_temp();
 
   void queue_for_peering(PG *pg);
-  void queue_for_recovery(PG *pg, bool front = false) {
-    pair<PGRef, PGQueueable> to_queue = make_pair(
-      pg,
-      PGQueueable(
-        PGRecovery(pg->get_osdmap()->get_epoch()),
-        cct->_conf->osd_recovery_cost,
-        cct->_conf->osd_recovery_priority,
-        ceph_clock_now(cct),
-        entity_inst_t()));
-    if (front) {
-      op_wq.queue_front(to_queue);
-    } else {
-      op_wq.queue(to_queue);
-    }
-  }
   void queue_for_snap_trim(PG *pg) {
     op_wq.queue(
       make_pair(
@@ -921,6 +913,86 @@ public:
          entity_inst_t())));
   }
 
+  // -- pg recovery and associated throttling --
+  Mutex recovery_lock;
+  list<pair<epoch_t, PGRef> > awaiting_throttle;
+
+  utime_t defer_recovery_until;
+  uint64_t recovery_ops_active;
+  uint64_t recovery_ops_reserved;
+  bool recovery_paused;
+#ifdef DEBUG_RECOVERY_OIDS
+  map<spg_t, set<hobject_t> > recovery_oids;
+#endif
+  void start_recovery_op(PG *pg, const hobject_t& soid);
+  void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
+  bool _recover_now(uint64_t *available_pushes);
+  void _maybe_queue_recovery();
+  void release_reserved_pushes(uint64_t pushes) {
+    Mutex::Locker l(recovery_lock);
+    assert(recovery_ops_reserved >= pushes);
+    recovery_ops_reserved -= pushes;
+    _maybe_queue_recovery();
+  }
+  void defer_recovery(float defer_for) {
+    defer_recovery_until = ceph_clock_now(cct);
+    defer_recovery_until += defer_for;
+  }
+  void pause_recovery() {
+    Mutex::Locker l(recovery_lock);
+    recovery_paused = true;
+  }
+  bool recovery_is_paused() {
+    Mutex::Locker l(recovery_lock);
+    return recovery_paused;
+  }
+  void unpause_recovery() {
+    Mutex::Locker l(recovery_lock);
+    recovery_paused = false;
+    _maybe_queue_recovery();
+  }
+  void kick_recovery_queue() {
+    Mutex::Locker l(recovery_lock);
+    _maybe_queue_recovery();
+  }
+  void clear_queued_recovery(PG *pg) {
+    Mutex::Locker l(recovery_lock);
+    for (list<pair<epoch_t, PGRef> >::iterator i =
+           awaiting_throttle.begin();
+         i != awaiting_throttle.end();
+      ) {
+      if (i->second.get() == pg) {
+        awaiting_throttle.erase(i++);
+        return;
+      } else {
+        ++i;
+      }
+    }
+  }
+  // replay / delayed pg activation
+  void queue_for_recovery(PG *pg, bool front = false) {
+    Mutex::Locker l(recovery_lock);
+    if (front) {
+      awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
+    } else {
+      awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
+    }
+    _maybe_queue_recovery();
+  }
+
+  void _queue_for_recovery(
+    pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
+    assert(recovery_lock.is_locked_by_me());
+    pair<PGRef, PGQueueable> to_queue = make_pair(
+      p.second,
+      PGQueueable(
+        PGRecovery(p.first, reserved_pushes),
+        cct->_conf->osd_recovery_cost,
+        cct->_conf->osd_recovery_priority,
+        ceph_clock_now(cct),
+        entity_inst_t()));
+    op_wq.queue(to_queue);
+  }
+
   // osd map cache (past osd maps)
   Mutex map_cache_lock;
   SharedLRU<epoch_t, const OSDMap> map_cache;
@@ -1310,8 +1382,6 @@ private:
   ThreadPool disk_tp;
   ThreadPool command_tp;
 
-  bool paused_recovery;
-
   void set_disk_tp_priority();
   void get_latest_osdmap();
 
@@ -1772,7 +1842,11 @@ private:
     struct Pred {
      PG *pg;
      list<OpRequestRef> *out_ops;
-      void accumulate(PGQueueable &op) {
+      uint64_t reserved_pushes_to_free;
+      Pred(PG *pg, list<OpRequestRef> *out_ops = 0)
+        : pg(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
+      void accumulate(const PGQueueable &op) {
+        reserved_pushes_to_free += op.get_reserved_pushes();
        if (out_ops) {
          boost::optional<OpRequestRef> mop = op.maybe_get_op();
          if (mop)
@@ -1787,6 +1861,9 @@
          return false;
        }
       }
+      uint64_t get_reserved_pushes_to_free() const {
+        return reserved_pushes_to_free;
+      }
     };
 
     void dequeue(PG *pg) {
@@ -1818,6 +1895,7 @@ private:
       }
 
       sdata->sdata_op_ordering_lock.Unlock();
+      osd->service.release_reserved_pushes(f.get_reserved_pushes_to_free());
     }
 
     bool is_shard_empty(uint32_t thread_index) {
@@ -2248,19 +2326,9 @@ protected:
   void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
 
   // -- pg recovery --
-  Mutex recovery_lock;
-  utime_t defer_recovery_until;
-  int recovery_ops_active;
-#ifdef DEBUG_RECOVERY_OIDS
-  map<spg_t, set<hobject_t> > recovery_oids;
-#endif
-
-  void start_recovery_op(PG *pg, const hobject_t& soid);
-  void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
-  void do_recovery(PG *pg, epoch_t epoch_queued, ThreadPool::TPHandle &handle);
-  bool _recover_now();
+  void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
+                   ThreadPool::TPHandle &handle);
 
-  // replay / delayed pg activation
   Mutex replay_queue_lock;
   list< pair<spg_t, utime_t> > replay_queue;
 
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index a8c78164d6c2..03c4df954203 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -2221,8 +2221,7 @@ void PG::start_recovery_op(const hobject_t& soid)
   assert(recovering_oids.count(soid) == 0);
   recovering_oids.insert(soid);
 #endif
-  // TODOSAM: osd->osd-> not good
-  osd->osd->start_recovery_op(this, soid);
+  osd->start_recovery_op(this, soid);
 }
 
 void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
@@ -2238,10 +2237,10 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
   assert(recovering_oids.count(soid));
   recovering_oids.erase(soid);
 #endif
-  // TODOSAM: osd->osd-> not good
-  osd->osd->finish_recovery_op(this, soid, dequeue);
+  osd->finish_recovery_op(this, soid, dequeue);
 
   if (!dequeue) {
+    queue_recovery();
   }
 }
 
diff --git a/src/osd/PG.h b/src/osd/PG.h
index f86bcfb5aede..568f5ea442bd 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -1099,8 +1099,9 @@ public:
    * @returns true if any useful work was accomplished; false otherwise
    */
   virtual bool start_recovery_ops(
-    int max, ThreadPool::TPHandle &handle,
-    int *ops_begun) = 0;
+    uint64_t max,
+    ThreadPool::TPHandle &handle,
+    uint64_t *ops_begun) = 0;
 
   void purge_strays();
 
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 5a1f037b2e0d..1dcbec021e24 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -10123,6 +10123,11 @@
     hit_set_clear();
   }
 
+  if (recovery_queued) {
+    recovery_queued = false;
+    osd->clear_queued_recovery(this);
+  }
+
   // requeue everything in the reverse order they should be
   // reexamined.
   requeue_ops(waiting_for_peered);
@@ -10370,10 +10375,11 @@ void PG::MissingLoc::check_recovery_sources(const OSDMapRef osdmap)
 }
 
 bool ReplicatedPG::start_recovery_ops(
-  int max, ThreadPool::TPHandle &handle,
-  int *ops_started)
+  uint64_t max,
+  ThreadPool::TPHandle &handle,
+  uint64_t *ops_started)
 {
-  int& started = *ops_started;
+  uint64_t& started = *ops_started;
   started = 0;
   bool work_in_progress = false;
   assert(is_primary());
@@ -10516,7 +10522,7 @@ bool ReplicatedPG::start_recovery_ops(
  * do one recovery op.
  * return true if done, false if nothing left to do.
  */
-int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
+uint64_t ReplicatedPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handle)
 {
   assert(is_primary());
 
@@ -10529,7 +10535,7 @@ uint64_t ReplicatedPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handle)
 
   // look at log!
   pg_log_entry_t *latest = 0;
-  int started = 0;
+  unsigned started = 0;
   int skipped = 0;
 
   PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
@@ -10734,10 +10740,10 @@ int ReplicatedPG::prep_object_replica_pushes(
   return 1;
 }
 
-int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
+uint64_t ReplicatedPG::recover_replicas(uint64_t max, ThreadPool::TPHandle &handle)
 {
   dout(10) << __func__ << "(" << max << ")" << dendl;
-  int started = 0;
+  uint64_t started = 0;
 
   PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
 
@@ -10876,8 +10882,8 @@ bool ReplicatedPG::all_peer_done() const
  * io created objects since the last scan. For this reason, we call
  * update_range() again before continuing backfill.
  */
-int ReplicatedPG::recover_backfill(
-  int max,
+uint64_t ReplicatedPG::recover_backfill(
+  uint64_t max,
   ThreadPool::TPHandle &handle, bool *work_started)
 {
   dout(10) << "recover_backfill (" << max << ")"
@@ -10938,7 +10944,7 @@ uint64_t ReplicatedPG::recover_backfill(
   backfill_info.begin = last_backfill_started;
   update_range(&backfill_info, handle);
 
-  int ops = 0;
+  unsigned ops = 0;
   vector<boost::tuple<hobject_t, eversion_t,
                      ObjectContextRef, vector<pg_shard_t> > > to_push;
   vector<boost::tuple<hobject_t, eversion_t, pg_shard_t> > to_remove;
 
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index d0cb85535eeb..fa70a505ee2a 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -1240,18 +1240,19 @@ protected:
   void _clear_recovery_state();
 
   bool start_recovery_ops(
-    int max, ThreadPool::TPHandle &handle, int *started);
+    uint64_t max,
+    ThreadPool::TPHandle &handle, uint64_t *started);
 
-  int recover_primary(int max, ThreadPool::TPHandle &handle);
-  int recover_replicas(int max, ThreadPool::TPHandle &handle);
+  uint64_t recover_primary(uint64_t max, ThreadPool::TPHandle &handle);
+  uint64_t recover_replicas(uint64_t max, ThreadPool::TPHandle &handle);
   hobject_t earliest_peer_backfill() const;
   bool all_peer_done() const;
   /**
    * @param work_started will be set to true if recover_backfill got anywhere
    * @returns the number of operations started
    */
-  int recover_backfill(int max, ThreadPool::TPHandle &handle,
-                       bool *work_started);
+  uint64_t recover_backfill(uint64_t max, ThreadPool::TPHandle &handle,
+                            bool *work_started);
 
   /**
    * scan a (hash) range of objects in the current pg