osd->need_heartbeat_peer_update();
}
-void OSDService::pg_stat_queue_enqueue(PG *pg)
-{
- osd->pg_stat_queue_enqueue(pg);
-}
-
-void OSDService::pg_stat_queue_dequeue(PG *pg)
-{
- osd->pg_stat_queue_dequeue(pg);
-}
-
void OSDService::start_shutdown()
{
{
pg_map_lock("OSD::pg_map_lock"),
last_pg_create_epoch(0),
mon_report_lock("OSD::mon_report_lock"),
- stats_ack_timeout(cct->_conf->osd_mon_ack_timeout),
up_thru_wanted(0),
requested_full_first(0),
requested_full_last(0),
- pg_stat_queue_lock("OSD::pg_stat_queue_lock"),
- osd_stat_updated(false),
- pg_stat_tid(0), pg_stat_tid_flushed(0),
command_wq(
this,
cct->_conf->osd_command_thread_timeout,
if (r < 0)
goto out;
- /**
- * FIXME: this is a placeholder implementation that unconditionally
- * sends every is_primary PG's stats every time we're called, unlike
- * the existing mon PGStats mechanism that uses pg_stat_queue and acks.
- * This has equivalent cost to the existing worst case where all
- * PGs are busy and their stats are always enqueued for sending.
- */
+ // This implementation unconditionally sends every is_primary PG's
+ // stats every time we're called. This has equivalent cost to the
+ // previous implementation's worst case where all PGs are busy and
+ // their stats are always enqueued for sending.
mgrc.set_pgstats_cb([this](){
RWLock::RLocker l(map_lock);
p->second->osr->flush();
}
}
- clear_pg_stat_queue();
// drain op queue again (in case PGs requeued something)
op_shardedwq.drain();
}
- {
- Mutex::Locker l(pg_stat_queue_lock);
- assert(pg_stat_queue.empty());
- }
-
service.shutdown_reserver();
// Remove PGs
Mutex::Locker l(mon_report_lock);
// mon report?
- bool reset = false;
- bool report = false;
utime_t now = ceph_clock_now();
- pg_stat_queue_lock.Lock();
- double backoff = stats_ack_timeout / cct->_conf->osd_mon_ack_timeout;
- double adjusted_min = cct->_conf->osd_mon_report_interval_min * backoff;
- // note: we shouldn't adjust max because it must remain < the
- // mon's mon_osd_report_timeout (which defaults to 1.5x our
- // value).
- double max = cct->_conf->osd_mon_report_interval_max;
- if (!outstanding_pg_stats.empty() &&
- (now - stats_ack_timeout) > last_pg_stats_ack) {
- dout(1) << __func__ << " mon hasn't acked PGStats in "
- << now - last_pg_stats_ack
- << " seconds, reconnecting elsewhere" << dendl;
- reset = true;
- last_pg_stats_ack = now; // reset clock
- last_pg_stats_sent = utime_t();
- stats_ack_timeout =
- MAX(cct->_conf->osd_mon_ack_timeout,
- stats_ack_timeout * cct->_conf->osd_stats_ack_timeout_factor);
- outstanding_pg_stats.clear();
- }
- if (now - last_pg_stats_sent > max) {
- osd_stat_updated = true;
- report = true;
- } else if (service.need_fullness_update()) {
- report = true;
- } else if ((int)outstanding_pg_stats.size() >=
- cct->_conf->osd_mon_report_max_in_flight) {
- dout(20) << __func__ << " have max " << outstanding_pg_stats
- << " stats updates in flight" << dendl;
- } else {
- if (now - last_mon_report > adjusted_min) {
- dout(20) << __func__ << " stats backoff " << backoff
- << " adjusted_min " << adjusted_min << " - sending report"
- << dendl;
- osd_stat_updated = true;
- report = true;
- }
- }
- pg_stat_queue_lock.Unlock();
-
- if (reset) {
- monc->reopen_session();
- } else if (report) {
+ if (service.need_fullness_update() ||
+ now - last_mon_report > cct->_conf->osd_mon_report_interval_min) {
last_mon_report = now;
-
- // do any pending reports
send_full_update();
send_failures();
- if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) {
- send_pg_stats(now);
- }
}
map_lock.put_read();
}
service.send_pg_temp();
requeue_failures();
send_failures();
- if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) {
- send_pg_stats(now);
- }
map_lock.put_read();
if (is_active()) {
monc->send_mon_message(m);
}
-void OSD::send_pg_stats(const utime_t &now)
-{
- assert(map_lock.is_locked());
- assert(osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS);
- dout(20) << "send_pg_stats" << dendl;
-
- osd_stat_t cur_stat = service.get_osd_stat();
-
- cur_stat.os_perf_stat = store->get_cur_stats();
-
- pg_stat_queue_lock.Lock();
-
- if (osd_stat_updated || !pg_stat_queue.empty()) {
- last_pg_stats_sent = now;
- osd_stat_updated = false;
-
- dout(10) << "send_pg_stats - " << pg_stat_queue.size() << " pgs updated" << dendl;
-
- utime_t had_for(now);
- had_for -= had_map_since;
-
- MPGStats *m = new MPGStats(monc->get_fsid(), osdmap->get_epoch(), had_for);
-
- uint64_t tid = ++pg_stat_tid;
- m->set_tid(tid);
- m->osd_stat = cur_stat;
-
- xlist<PG*>::iterator p = pg_stat_queue.begin();
- while (!p.end()) {
- PG *pg = *p;
- ++p;
- if (!pg->is_primary()) { // we hold map_lock; role is stable.
- pg->stat_queue_item.remove_myself();
- pg->put("pg_stat_queue");
- continue;
- }
- pg->pg_stats_publish_lock.Lock();
- if (pg->pg_stats_publish_valid) {
- m->pg_stat[pg->info.pgid.pgid] = pg->pg_stats_publish;
- dout(25) << " sending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch << ":"
- << pg->pg_stats_publish.reported_seq << dendl;
- } else {
- dout(25) << " NOT sending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch << ":"
- << pg->pg_stats_publish.reported_seq << ", not valid" << dendl;
- }
- pg->pg_stats_publish_lock.Unlock();
- }
-
- if (last_pg_stats_ack == utime_t() || !outstanding_pg_stats.empty()) {
- last_pg_stats_ack = ceph_clock_now();
- }
- outstanding_pg_stats.insert(tid);
- dout(20) << __func__ << " updates pending: " << outstanding_pg_stats << dendl;
-
- monc->send_mon_message(m);
- }
-
- pg_stat_queue_lock.Unlock();
-}
-
-void OSD::handle_pg_stats_ack(MPGStatsAck *ack)
-{
- dout(10) << "handle_pg_stats_ack " << dendl;
-
- if (!require_mon_peer(ack)) {
- ack->put();
- return;
- }
-
- // NOTE: we may get replies from a previous mon even while
- // outstanding_pg_stats is empty if reconnecting races with replies
- // in flight.
-
- pg_stat_queue_lock.Lock();
-
- last_pg_stats_ack = ceph_clock_now();
-
- // decay timeout slowly (analogous to TCP)
- stats_ack_timeout =
- MAX(cct->_conf->osd_mon_ack_timeout,
- stats_ack_timeout * cct->_conf->osd_stats_ack_timeout_decay);
- dout(20) << __func__ << " timeout now " << stats_ack_timeout << dendl;
-
- const uint64_t ack_tid = ack->get_tid();
- if (ack_tid > pg_stat_tid_flushed) {
- pg_stat_tid_flushed = ack_tid;
- pg_stat_queue_cond.Signal();
- }
-
- xlist<PG*>::iterator p = pg_stat_queue.begin();
- while (!p.end()) {
- PG *pg = *p;
- PGRef _pg(pg);
- ++p;
-
- auto acked = ack->pg_stat.find(pg->info.pgid.pgid);
- if (acked != ack->pg_stat.end()) {
- pg->pg_stats_publish_lock.Lock();
- if (acked->second.first == pg->pg_stats_publish.reported_seq &&
- acked->second.second == pg->pg_stats_publish.reported_epoch) {
- dout(25) << " ack on " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch
- << ":" << pg->pg_stats_publish.reported_seq << dendl;
- pg->stat_queue_item.remove_myself();
- pg->put("pg_stat_queue");
- } else {
- dout(25) << " still pending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch
- << ":" << pg->pg_stats_publish.reported_seq << " > acked "
- << acked->second << dendl;
- }
- pg->pg_stats_publish_lock.Unlock();
- } else {
- dout(30) << " still pending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch
- << ":" << pg->pg_stats_publish.reported_seq << dendl;
- }
- }
-
- // if there are earlier pg-stats not yet acked,
- // this happens if they are not sent successfully.
- for (auto tid = outstanding_pg_stats.cbegin();
- tid != outstanding_pg_stats.cend(); ) {
- if (*tid <= ack_tid) {
- tid = outstanding_pg_stats.erase(tid);
- } else {
- break;
- }
- }
- dout(20) << __func__ << " still pending: " << outstanding_pg_stats << dendl;
-
- pg_stat_queue_lock.Unlock();
-
- ack->put();
-}
-
-void OSD::flush_pg_stats()
-{
- dout(10) << "flush_pg_stats" << dendl;
- osd_lock.Unlock();
- utime_t now = ceph_clock_now();
- map_lock.get_read();
- mon_report_lock.Lock();
- send_pg_stats(now);
- mon_report_lock.Unlock();
- map_lock.put_read();
-
-
- pg_stat_queue_lock.Lock();
- uint64_t tid = pg_stat_tid;
- dout(10) << "flush_pg_stats waiting for stats tid " << tid << " to flush" << dendl;
- while (tid > pg_stat_tid_flushed)
- pg_stat_queue_cond.Wait(pg_stat_queue_lock);
- dout(10) << "flush_pg_stats finished waiting for stats tid " << tid << " to flush" << dendl;
- pg_stat_queue_lock.Unlock();
-
- osd_lock.Lock();
-}
-
void OSD::send_beacon(const ceph::coarse_mono_clock::time_point& now)
{
const auto& monmap = monc->monmap;
}
else if (prefix == "flush_pg_stats") {
- if (osdmap->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
- mgrc.send_pgstats();
- ds << service.get_osd_stat_seq() << "\n";
- } else {
- flush_pg_stats();
- }
+ mgrc.send_pgstats();
+ ds << service.get_osd_stat_seq() << "\n";
}
else if (prefix == "heap") {
break;
// osd
- case MSG_PGSTATSACK:
- handle_pg_stats_ack(static_cast<MPGStatsAck*>(m));
- break;
-
case MSG_MON_COMMAND:
handle_command(static_cast<MMonCommand*>(m));
break;
do_restart = true;
}
}
- if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS &&
- newmap->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
- dout(10) << __func__ << " require_osd_release reached luminous in "
- << newmap->get_epoch() << dendl;
- clear_pg_stat_queue();
- clear_outstanding_pg_stats();
- }
osdmap = newmap;
epoch_t up_epoch;
void need_heartbeat_peer_update();
- void pg_stat_queue_enqueue(PG *pg);
- void pg_stat_queue_dequeue(PG *pg);
-
void init();
void final_init();
void start_shutdown();
// == monitor interaction ==
Mutex mon_report_lock;
utime_t last_mon_report;
- utime_t last_pg_stats_sent;
-
- /* if our monitor dies, we want to notice it and reconnect.
- * So we keep track of when it last acked our stat updates,
- * and if too much time passes (and we've been sending
- * more updates) then we can call it dead and reconnect
- * elsewhere.
- */
- utime_t last_pg_stats_ack;
- float stats_ack_timeout;
- set<uint64_t> outstanding_pg_stats; // how many stat updates haven't been acked yet
// -- boot --
void start_boot();
void send_failures();
void send_still_alive(epoch_t epoch, const entity_inst_t &i);
- // -- pg stats --
- Mutex pg_stat_queue_lock;
- Cond pg_stat_queue_cond;
- xlist<PG*> pg_stat_queue;
- bool osd_stat_updated;
- uint64_t pg_stat_tid, pg_stat_tid_flushed;
-
- void send_pg_stats(const utime_t &now);
- void handle_pg_stats_ack(class MPGStatsAck *ack);
- void flush_pg_stats();
-
ceph::coarse_mono_clock::time_point last_sent_beacon;
Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
epoch_t min_last_epoch_clean = 0;
std::vector<pg_t> min_last_epoch_clean_pgs;
void send_beacon(const ceph::coarse_mono_clock::time_point& now);
- void pg_stat_queue_enqueue(PG *pg) {
- pg_stat_queue_lock.Lock();
- if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) {
- pg->get("pg_stat_queue");
- pg_stat_queue.push_back(&pg->stat_queue_item);
- }
- osd_stat_updated = true;
- pg_stat_queue_lock.Unlock();
- }
- void pg_stat_queue_dequeue(PG *pg) {
- pg_stat_queue_lock.Lock();
- if (pg->stat_queue_item.remove_myself())
- pg->put("pg_stat_queue");
- pg_stat_queue_lock.Unlock();
- }
- void clear_pg_stat_queue() {
- pg_stat_queue_lock.Lock();
- while (!pg_stat_queue.empty()) {
- PG *pg = pg_stat_queue.front();
- pg_stat_queue.pop_front();
- pg->put("pg_stat_queue");
- }
- pg_stat_queue_lock.Unlock();
- }
- void clear_outstanding_pg_stats(){
- Mutex::Locker l(pg_stat_queue_lock);
- outstanding_pg_stats.clear();
- }
-
ceph_tid_t get_tid() {
return service.get_tid();
}