From: Sage Weil Date: Thu, 24 Aug 2017 18:46:27 +0000 (-0400) Subject: osd: remove mon pg stat reports and queueing machinery X-Git-Tag: v13.0.1~1107^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=528736bed3a6adffa37d0bad6a0e1bedb55db931;p=ceph.git osd: remove mon pg stat reports and queueing machinery We now unconditionally send all pg stats to the mgr at regular intervals. Drop all of the monitor machinery! Signed-off-by: Sage Weil --- diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index f2c5125f47c6..5ce130879197 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -490,16 +490,6 @@ void OSDService::need_heartbeat_peer_update() osd->need_heartbeat_peer_update(); } -void OSDService::pg_stat_queue_enqueue(PG *pg) -{ - osd->pg_stat_queue_enqueue(pg); -} - -void OSDService::pg_stat_queue_dequeue(PG *pg) -{ - osd->pg_stat_queue_dequeue(pg); -} - void OSDService::start_shutdown() { { @@ -1943,13 +1933,9 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_, pg_map_lock("OSD::pg_map_lock"), last_pg_create_epoch(0), mon_report_lock("OSD::mon_report_lock"), - stats_ack_timeout(cct->_conf->osd_mon_ack_timeout), up_thru_wanted(0), requested_full_first(0), requested_full_last(0), - pg_stat_queue_lock("OSD::pg_stat_queue_lock"), - osd_stat_updated(false), - pg_stat_tid(0), pg_stat_tid_flushed(0), command_wq( this, cct->_conf->osd_command_thread_timeout, @@ -2553,13 +2539,10 @@ int OSD::init() if (r < 0) goto out; - /** - * FIXME: this is a placeholder implementation that unconditionally - * sends every is_primary PG's stats every time we're called, unlike - * the existing mon PGStats mechanism that uses pg_stat_queue and acks. - * This has equivalent cost to the existing worst case where all - * PGs are busy and their stats are always enqueued for sending. - */ + // This implementation unconditionally sends every is_primary PG's + // stats every time we're called. 
This has equivalent cost to the + // previous implementation's worst case where all PGs are busy and + // their stats are always enqueued for sending. mgrc.set_pgstats_cb([this](){ RWLock::RLocker l(map_lock); @@ -3261,7 +3244,6 @@ int OSD::shutdown() p->second->osr->flush(); } } - clear_pg_stat_queue(); // drain op queue again (in case PGs requeued something) op_shardedwq.drain(); @@ -3362,11 +3344,6 @@ int OSD::shutdown() } - { - Mutex::Locker l(pg_stat_queue_lock); - assert(pg_stat_queue.empty()); - } - service.shutdown_reserver(); // Remove PGs @@ -5104,60 +5081,12 @@ void OSD::tick_without_osd_lock() Mutex::Locker l(mon_report_lock); // mon report? - bool reset = false; - bool report = false; utime_t now = ceph_clock_now(); - pg_stat_queue_lock.Lock(); - double backoff = stats_ack_timeout / cct->_conf->osd_mon_ack_timeout; - double adjusted_min = cct->_conf->osd_mon_report_interval_min * backoff; - // note: we shouldn't adjust max because it must remain < the - // mon's mon_osd_report_timeout (which defaults to 1.5x our - // value). 
- double max = cct->_conf->osd_mon_report_interval_max; - if (!outstanding_pg_stats.empty() && - (now - stats_ack_timeout) > last_pg_stats_ack) { - dout(1) << __func__ << " mon hasn't acked PGStats in " - << now - last_pg_stats_ack - << " seconds, reconnecting elsewhere" << dendl; - reset = true; - last_pg_stats_ack = now; // reset clock - last_pg_stats_sent = utime_t(); - stats_ack_timeout = - MAX(cct->_conf->osd_mon_ack_timeout, - stats_ack_timeout * cct->_conf->osd_stats_ack_timeout_factor); - outstanding_pg_stats.clear(); - } - if (now - last_pg_stats_sent > max) { - osd_stat_updated = true; - report = true; - } else if (service.need_fullness_update()) { - report = true; - } else if ((int)outstanding_pg_stats.size() >= - cct->_conf->osd_mon_report_max_in_flight) { - dout(20) << __func__ << " have max " << outstanding_pg_stats - << " stats updates in flight" << dendl; - } else { - if (now - last_mon_report > adjusted_min) { - dout(20) << __func__ << " stats backoff " << backoff - << " adjusted_min " << adjusted_min << " - sending report" - << dendl; - osd_stat_updated = true; - report = true; - } - } - pg_stat_queue_lock.Unlock(); - - if (reset) { - monc->reopen_session(); - } else if (report) { + if (service.need_fullness_update() || + now - last_mon_report > cct->_conf->osd_mon_report_interval_min) { last_mon_report = now; - - // do any pending reports send_full_update(); send_failures(); - if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) { - send_pg_stats(now); - } } map_lock.put_read(); } @@ -5561,9 +5490,6 @@ void OSD::ms_handle_connect(Connection *con) service.send_pg_temp(); requeue_failures(); send_failures(); - if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) { - send_pg_stats(now); - } map_lock.put_read(); if (is_active()) { @@ -6013,162 +5939,6 @@ void OSD::send_still_alive(epoch_t epoch, const entity_inst_t &i) monc->send_mon_message(m); } -void OSD::send_pg_stats(const utime_t &now) -{ - assert(map_lock.is_locked()); - 
assert(osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS); - dout(20) << "send_pg_stats" << dendl; - - osd_stat_t cur_stat = service.get_osd_stat(); - - cur_stat.os_perf_stat = store->get_cur_stats(); - - pg_stat_queue_lock.Lock(); - - if (osd_stat_updated || !pg_stat_queue.empty()) { - last_pg_stats_sent = now; - osd_stat_updated = false; - - dout(10) << "send_pg_stats - " << pg_stat_queue.size() << " pgs updated" << dendl; - - utime_t had_for(now); - had_for -= had_map_since; - - MPGStats *m = new MPGStats(monc->get_fsid(), osdmap->get_epoch(), had_for); - - uint64_t tid = ++pg_stat_tid; - m->set_tid(tid); - m->osd_stat = cur_stat; - - xlist<PG*>::iterator p = pg_stat_queue.begin(); - while (!p.end()) { - PG *pg = *p; - ++p; - if (!pg->is_primary()) { // we hold map_lock; role is stable. - pg->stat_queue_item.remove_myself(); - pg->put("pg_stat_queue"); - continue; - } - pg->pg_stats_publish_lock.Lock(); - if (pg->pg_stats_publish_valid) { - m->pg_stat[pg->info.pgid.pgid] = pg->pg_stats_publish; - dout(25) << " sending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch << ":" - << pg->pg_stats_publish.reported_seq << dendl; - } else { - dout(25) << " NOT sending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch << ":" - << pg->pg_stats_publish.reported_seq << ", not valid" << dendl; - } - pg->pg_stats_publish_lock.Unlock(); - } - - if (last_pg_stats_ack == utime_t() || !outstanding_pg_stats.empty()) { - last_pg_stats_ack = ceph_clock_now(); - } - outstanding_pg_stats.insert(tid); - dout(20) << __func__ << " updates pending: " << outstanding_pg_stats << dendl; - - monc->send_mon_message(m); - } - - pg_stat_queue_lock.Unlock(); -} - -void OSD::handle_pg_stats_ack(MPGStatsAck *ack) -{ - dout(10) << "handle_pg_stats_ack " << dendl; - - if (!require_mon_peer(ack)) { - ack->put(); - return; - } - - // NOTE: we may get replies from a previous mon even while - // outstanding_pg_stats is empty if reconnecting races with replies - // in flight. 
- - pg_stat_queue_lock.Lock(); - - last_pg_stats_ack = ceph_clock_now(); - - // decay timeout slowly (analogous to TCP) - stats_ack_timeout = - MAX(cct->_conf->osd_mon_ack_timeout, - stats_ack_timeout * cct->_conf->osd_stats_ack_timeout_decay); - dout(20) << __func__ << " timeout now " << stats_ack_timeout << dendl; - - const uint64_t ack_tid = ack->get_tid(); - if (ack_tid > pg_stat_tid_flushed) { - pg_stat_tid_flushed = ack_tid; - pg_stat_queue_cond.Signal(); - } - - xlist<PG*>::iterator p = pg_stat_queue.begin(); - while (!p.end()) { - PG *pg = *p; - PGRef _pg(pg); - ++p; - - auto acked = ack->pg_stat.find(pg->info.pgid.pgid); - if (acked != ack->pg_stat.end()) { - pg->pg_stats_publish_lock.Lock(); - if (acked->second.first == pg->pg_stats_publish.reported_seq && - acked->second.second == pg->pg_stats_publish.reported_epoch) { - dout(25) << " ack on " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch - << ":" << pg->pg_stats_publish.reported_seq << dendl; - pg->stat_queue_item.remove_myself(); - pg->put("pg_stat_queue"); - } else { - dout(25) << " still pending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch - << ":" << pg->pg_stats_publish.reported_seq << " > acked " - << acked->second << dendl; - } - pg->pg_stats_publish_lock.Unlock(); - } else { - dout(30) << " still pending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch - << ":" << pg->pg_stats_publish.reported_seq << dendl; - } - } - - // if there are earlier pg-stats not yet acked, - // this happens if they are not sent successfully. 
- for (auto tid = outstanding_pg_stats.cbegin(); - tid != outstanding_pg_stats.cend(); ) { - if (*tid <= ack_tid) { - tid = outstanding_pg_stats.erase(tid); - } else { - break; - } - } - dout(20) << __func__ << " still pending: " << outstanding_pg_stats << dendl; - - pg_stat_queue_lock.Unlock(); - - ack->put(); -} - -void OSD::flush_pg_stats() -{ - dout(10) << "flush_pg_stats" << dendl; - osd_lock.Unlock(); - utime_t now = ceph_clock_now(); - map_lock.get_read(); - mon_report_lock.Lock(); - send_pg_stats(now); - mon_report_lock.Unlock(); - map_lock.put_read(); - - - pg_stat_queue_lock.Lock(); - uint64_t tid = pg_stat_tid; - dout(10) << "flush_pg_stats waiting for stats tid " << tid << " to flush" << dendl; - while (tid > pg_stat_tid_flushed) - pg_stat_queue_cond.Wait(pg_stat_queue_lock); - dout(10) << "flush_pg_stats finished waiting for stats tid " << tid << " to flush" << dendl; - pg_stat_queue_lock.Unlock(); - - osd_lock.Lock(); -} - void OSD::send_beacon(const ceph::coarse_mono_clock::time_point& now) { const auto& monmap = monc->monmap; @@ -6647,12 +6417,8 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector& cmd, buffe } else if (prefix == "flush_pg_stats") { - if (osdmap->require_osd_release >= CEPH_RELEASE_LUMINOUS) { - mgrc.send_pgstats(); - ds << service.get_osd_stat_seq() << "\n"; - } else { - flush_pg_stats(); - } + mgrc.send_pgstats(); + ds << service.get_osd_stat_seq() << "\n"; } else if (prefix == "heap") { @@ -7136,10 +6902,6 @@ void OSD::_dispatch(Message *m) break; // osd - case MSG_PGSTATSACK: - handle_pg_stats_ack(static_cast(m)); - break; - case MSG_MON_COMMAND: handle_command(static_cast(m)); break; @@ -7791,13 +7553,6 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m) do_restart = true; } } - if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS && - newmap->require_osd_release >= CEPH_RELEASE_LUMINOUS) { - dout(10) << __func__ << " require_osd_release reached luminous in " - << newmap->get_epoch() << 
dendl; - clear_pg_stat_queue(); - clear_outstanding_pg_stats(); - } osdmap = newmap; epoch_t up_epoch; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index c8deea53090d..9a1cbb671b53 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1006,9 +1006,6 @@ public: void need_heartbeat_peer_update(); - void pg_stat_queue_enqueue(PG *pg); - void pg_stat_queue_dequeue(PG *pg); - void init(); void final_init(); void start_shutdown(); @@ -2029,17 +2026,6 @@ protected: // == monitor interaction == Mutex mon_report_lock; utime_t last_mon_report; - utime_t last_pg_stats_sent; - - /* if our monitor dies, we want to notice it and reconnect. - * So we keep track of when it last acked our stat updates, - * and if too much time passes (and we've been sending - * more updates) then we can call it dead and reconnect - * elsewhere. - */ - utime_t last_pg_stats_ack; - float stats_ack_timeout; - set outstanding_pg_stats; // how many stat updates haven't been acked yet // -- boot -- void start_boot(); @@ -2082,17 +2068,6 @@ protected: void send_failures(); void send_still_alive(epoch_t epoch, const entity_inst_t &i); - // -- pg stats -- - Mutex pg_stat_queue_lock; - Cond pg_stat_queue_cond; - xlist pg_stat_queue; - bool osd_stat_updated; - uint64_t pg_stat_tid, pg_stat_tid_flushed; - - void send_pg_stats(const utime_t &now); - void handle_pg_stats_ack(class MPGStatsAck *ack); - void flush_pg_stats(); - ceph::coarse_mono_clock::time_point last_sent_beacon; Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"}; epoch_t min_last_epoch_clean = 0; @@ -2100,35 +2075,6 @@ protected: std::vector min_last_epoch_clean_pgs; void send_beacon(const ceph::coarse_mono_clock::time_point& now); - void pg_stat_queue_enqueue(PG *pg) { - pg_stat_queue_lock.Lock(); - if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) { - pg->get("pg_stat_queue"); - pg_stat_queue.push_back(&pg->stat_queue_item); - } - osd_stat_updated = true; - pg_stat_queue_lock.Unlock(); - } - void pg_stat_queue_dequeue(PG 
*pg) { - pg_stat_queue_lock.Lock(); - if (pg->stat_queue_item.remove_myself()) - pg->put("pg_stat_queue"); - pg_stat_queue_lock.Unlock(); - } - void clear_pg_stat_queue() { - pg_stat_queue_lock.Lock(); - while (!pg_stat_queue.empty()) { - PG *pg = pg_stat_queue.front(); - pg_stat_queue.pop_front(); - pg->put("pg_stat_queue"); - } - pg_stat_queue_lock.Unlock(); - } - void clear_outstanding_pg_stats(){ - Mutex::Locker l(pg_stat_queue_lock); - outstanding_pg_stats.clear(); - } - ceph_tid_t get_tid() { return service.get_tid(); } diff --git a/src/osd/PG.cc b/src/osd/PG.cc index b9f62f415d03..0b5839a07322 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -2715,7 +2715,6 @@ void PG::publish_stats_to_osd() _update_calc_stats(); _update_blocked_by(); - bool publish = false; pg_stat_t pre_publish = info.stats; pre_publish.stats.add(unstable_stats); utime_t cutoff = now; @@ -2743,10 +2742,6 @@ void PG::publish_stats_to_osd() if ((info.stats.state & PG_STATE_UNDERSIZED) == 0) info.stats.last_fullsized = now; - // do not send pgstat to mon anymore once we are luminous, since mgr takes - // care of this by sending MMonMgrReport to mon. 
- publish = - osd->osd->get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS; pg_stats_publish_valid = true; pg_stats_publish = pre_publish; @@ -2754,9 +2749,6 @@ void PG::publish_stats_to_osd() << ":" << pg_stats_publish.reported_seq << dendl; } pg_stats_publish_lock.Unlock(); - - if (publish) - osd->pg_stat_queue_enqueue(this); } void PG::clear_publish_stats() @@ -2765,8 +2757,6 @@ void PG::clear_publish_stats() pg_stats_publish_lock.Lock(); pg_stats_publish_valid = false; pg_stats_publish_lock.Unlock(); - - osd->pg_stat_queue_dequeue(this); } /** diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 5b93ff9826ce..1e0cf132e50a 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -10833,7 +10833,6 @@ void PrimaryLogPG::on_shutdown() dout(10) << "on_shutdown" << dendl; // remove from queues - osd->pg_stat_queue_dequeue(this); osd->peering_wq.dequeue(this); // handles queue races