]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: remove mon pg stat reports and queueing machinery
authorSage Weil <sage@redhat.com>
Thu, 24 Aug 2017 18:46:27 +0000 (14:46 -0400)
committerSage Weil <sage@redhat.com>
Tue, 29 Aug 2017 03:10:32 +0000 (23:10 -0400)
We now unconditionally send all pg stats to the mgr at regular
intervals.  Drop all of the monitor machinery!

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PrimaryLogPG.cc

index f2c5125f47c6addf6235367255de36f5b4c89528..5ce130879197c9d615264202a95eddd29ecee147 100644 (file)
@@ -490,16 +490,6 @@ void OSDService::need_heartbeat_peer_update()
   osd->need_heartbeat_peer_update();
 }
 
-void OSDService::pg_stat_queue_enqueue(PG *pg)
-{
-  osd->pg_stat_queue_enqueue(pg);
-}
-
-void OSDService::pg_stat_queue_dequeue(PG *pg)
-{
-  osd->pg_stat_queue_dequeue(pg);
-}
-
 void OSDService::start_shutdown()
 {
   {
@@ -1943,13 +1933,9 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   pg_map_lock("OSD::pg_map_lock"),
   last_pg_create_epoch(0),
   mon_report_lock("OSD::mon_report_lock"),
-  stats_ack_timeout(cct->_conf->osd_mon_ack_timeout),
   up_thru_wanted(0),
   requested_full_first(0),
   requested_full_last(0),
-  pg_stat_queue_lock("OSD::pg_stat_queue_lock"),
-  osd_stat_updated(false),
-  pg_stat_tid(0), pg_stat_tid_flushed(0),
   command_wq(
     this,
     cct->_conf->osd_command_thread_timeout,
@@ -2553,13 +2539,10 @@ int OSD::init()
   if (r < 0)
     goto out;
 
-  /**
-   * FIXME: this is a placeholder implementation that unconditionally
-   * sends every is_primary PG's stats every time we're called, unlike
-   * the existing mon PGStats mechanism that uses pg_stat_queue and acks.
-   * This has equivalent cost to the existing worst case where all
-   * PGs are busy and their stats are always enqueued for sending.
-   */
+  // This implementation unconditionally sends every is_primary PG's
+  // stats every time we're called.  This has equivalent cost to the
+  // previous implementation's worst case where all PGs are busy and
+  // their stats are always enqueued for sending.
   mgrc.set_pgstats_cb([this](){
       RWLock::RLocker l(map_lock);
       
@@ -3261,7 +3244,6 @@ int OSD::shutdown()
       p->second->osr->flush();
     }
   }
-  clear_pg_stat_queue();
 
   // drain op queue again (in case PGs requeued something)
   op_shardedwq.drain();
@@ -3362,11 +3344,6 @@ int OSD::shutdown()
   }
 
 
-  {
-    Mutex::Locker l(pg_stat_queue_lock);
-    assert(pg_stat_queue.empty());
-  }
-
   service.shutdown_reserver();
 
   // Remove PGs
@@ -5104,60 +5081,12 @@ void OSD::tick_without_osd_lock()
     Mutex::Locker l(mon_report_lock);
 
     // mon report?
-    bool reset = false;
-    bool report = false;
     utime_t now = ceph_clock_now();
-    pg_stat_queue_lock.Lock();
-    double backoff = stats_ack_timeout / cct->_conf->osd_mon_ack_timeout;
-    double adjusted_min = cct->_conf->osd_mon_report_interval_min * backoff;
-    // note: we shouldn't adjust max because it must remain < the
-    // mon's mon_osd_report_timeout (which defaults to 1.5x our
-    // value).
-    double max = cct->_conf->osd_mon_report_interval_max;
-    if (!outstanding_pg_stats.empty() &&
-       (now - stats_ack_timeout) > last_pg_stats_ack) {
-      dout(1) << __func__ << " mon hasn't acked PGStats in "
-             << now - last_pg_stats_ack
-             << " seconds, reconnecting elsewhere" << dendl;
-      reset = true;
-      last_pg_stats_ack = now;  // reset clock
-      last_pg_stats_sent = utime_t();
-      stats_ack_timeout =
-       MAX(cct->_conf->osd_mon_ack_timeout,
-           stats_ack_timeout * cct->_conf->osd_stats_ack_timeout_factor);
-      outstanding_pg_stats.clear();
-    }
-    if (now - last_pg_stats_sent > max) {
-      osd_stat_updated = true;
-      report = true;
-    } else if (service.need_fullness_update()) {
-      report = true;
-    } else if ((int)outstanding_pg_stats.size() >=
-              cct->_conf->osd_mon_report_max_in_flight) {
-      dout(20) << __func__ << " have max " << outstanding_pg_stats
-              << " stats updates in flight" << dendl;
-    } else {
-      if (now - last_mon_report > adjusted_min) {
-       dout(20) << __func__ << " stats backoff " << backoff
-                << " adjusted_min " << adjusted_min << " - sending report"
-                << dendl;
-        osd_stat_updated = true;
-       report = true;
-      }
-    }
-    pg_stat_queue_lock.Unlock();
-
-    if (reset) {
-      monc->reopen_session();
-    } else if (report) {
+    if (service.need_fullness_update() ||
+       now - last_mon_report > cct->_conf->osd_mon_report_interval_min) {
       last_mon_report = now;
-
-      // do any pending reports
       send_full_update();
       send_failures();
-      if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) {
-       send_pg_stats(now);
-      }
     }
     map_lock.put_read();
   }
@@ -5561,9 +5490,6 @@ void OSD::ms_handle_connect(Connection *con)
       service.send_pg_temp();
       requeue_failures();
       send_failures();
-      if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) {
-       send_pg_stats(now);
-      }
 
       map_lock.put_read();
       if (is_active()) {
@@ -6013,162 +5939,6 @@ void OSD::send_still_alive(epoch_t epoch, const entity_inst_t &i)
   monc->send_mon_message(m);
 }
 
-void OSD::send_pg_stats(const utime_t &now)
-{
-  assert(map_lock.is_locked());
-  assert(osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS);
-  dout(20) << "send_pg_stats" << dendl;
-
-  osd_stat_t cur_stat = service.get_osd_stat();
-
-  cur_stat.os_perf_stat = store->get_cur_stats();
-
-  pg_stat_queue_lock.Lock();
-
-  if (osd_stat_updated || !pg_stat_queue.empty()) {
-    last_pg_stats_sent = now;
-    osd_stat_updated = false;
-
-    dout(10) << "send_pg_stats - " << pg_stat_queue.size() << " pgs updated" << dendl;
-
-    utime_t had_for(now);
-    had_for -= had_map_since;
-
-    MPGStats *m = new MPGStats(monc->get_fsid(), osdmap->get_epoch(), had_for);
-
-    uint64_t tid = ++pg_stat_tid;
-    m->set_tid(tid);
-    m->osd_stat = cur_stat;
-
-    xlist<PG*>::iterator p = pg_stat_queue.begin();
-    while (!p.end()) {
-      PG *pg = *p;
-      ++p;
-      if (!pg->is_primary()) {  // we hold map_lock; role is stable.
-       pg->stat_queue_item.remove_myself();
-       pg->put("pg_stat_queue");
-       continue;
-      }
-      pg->pg_stats_publish_lock.Lock();
-      if (pg->pg_stats_publish_valid) {
-       m->pg_stat[pg->info.pgid.pgid] = pg->pg_stats_publish;
-       dout(25) << " sending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch << ":"
-                << pg->pg_stats_publish.reported_seq << dendl;
-      } else {
-       dout(25) << " NOT sending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch << ":"
-                << pg->pg_stats_publish.reported_seq << ", not valid" << dendl;
-      }
-      pg->pg_stats_publish_lock.Unlock();
-    }
-
-    if (last_pg_stats_ack == utime_t() || !outstanding_pg_stats.empty()) {
-      last_pg_stats_ack = ceph_clock_now();
-    }
-    outstanding_pg_stats.insert(tid);
-    dout(20) << __func__ << "  updates pending: " << outstanding_pg_stats << dendl;
-
-    monc->send_mon_message(m);
-  }
-
-  pg_stat_queue_lock.Unlock();
-}
-
-void OSD::handle_pg_stats_ack(MPGStatsAck *ack)
-{
-  dout(10) << "handle_pg_stats_ack " << dendl;
-
-  if (!require_mon_peer(ack)) {
-    ack->put();
-    return;
-  }
-
-  // NOTE: we may get replies from a previous mon even while
-  // outstanding_pg_stats is empty if reconnecting races with replies
-  // in flight.
-
-  pg_stat_queue_lock.Lock();
-
-  last_pg_stats_ack = ceph_clock_now();
-
-  // decay timeout slowly (analogous to TCP)
-  stats_ack_timeout =
-    MAX(cct->_conf->osd_mon_ack_timeout,
-       stats_ack_timeout * cct->_conf->osd_stats_ack_timeout_decay);
-  dout(20) << __func__ << "  timeout now " << stats_ack_timeout << dendl;
-
-  const uint64_t ack_tid = ack->get_tid();
-  if (ack_tid > pg_stat_tid_flushed) {
-    pg_stat_tid_flushed = ack_tid;
-    pg_stat_queue_cond.Signal();
-  }
-
-  xlist<PG*>::iterator p = pg_stat_queue.begin();
-  while (!p.end()) {
-    PG *pg = *p;
-    PGRef _pg(pg);
-    ++p;
-
-    auto acked = ack->pg_stat.find(pg->info.pgid.pgid);
-    if (acked != ack->pg_stat.end()) {
-      pg->pg_stats_publish_lock.Lock();
-      if (acked->second.first == pg->pg_stats_publish.reported_seq &&
-         acked->second.second == pg->pg_stats_publish.reported_epoch) {
-       dout(25) << " ack on " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch
-                << ":" << pg->pg_stats_publish.reported_seq << dendl;
-       pg->stat_queue_item.remove_myself();
-       pg->put("pg_stat_queue");
-      } else {
-       dout(25) << " still pending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch
-                << ":" << pg->pg_stats_publish.reported_seq << " > acked "
-                << acked->second << dendl;
-      }
-      pg->pg_stats_publish_lock.Unlock();
-    } else {
-      dout(30) << " still pending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch
-              << ":" << pg->pg_stats_publish.reported_seq << dendl;
-    }
-  }
-
-  // if there are earlier pg-stats not yet acked, 
-  // this happens if they are not sent successfully.
-  for (auto tid = outstanding_pg_stats.cbegin();
-        tid != outstanding_pg_stats.cend(); ) {
-    if (*tid <= ack_tid) {
-      tid = outstanding_pg_stats.erase(tid);
-    } else {
-      break;
-    }
-  }
-  dout(20) << __func__ << "  still pending: " << outstanding_pg_stats << dendl;
-
-  pg_stat_queue_lock.Unlock();
-
-  ack->put();
-}
-
-void OSD::flush_pg_stats()
-{
-  dout(10) << "flush_pg_stats" << dendl;
-  osd_lock.Unlock();
-  utime_t now = ceph_clock_now();
-  map_lock.get_read();
-  mon_report_lock.Lock();
-  send_pg_stats(now);
-  mon_report_lock.Unlock();
-  map_lock.put_read();
-
-
-  pg_stat_queue_lock.Lock();
-  uint64_t tid = pg_stat_tid;
-  dout(10) << "flush_pg_stats waiting for stats tid " << tid << " to flush" << dendl;
-  while (tid > pg_stat_tid_flushed)
-    pg_stat_queue_cond.Wait(pg_stat_queue_lock);
-  dout(10) << "flush_pg_stats finished waiting for stats tid " << tid << " to flush" << dendl;
-  pg_stat_queue_lock.Unlock();
-
-  osd_lock.Lock();
-}
-
 void OSD::send_beacon(const ceph::coarse_mono_clock::time_point& now)
 {
   const auto& monmap = monc->monmap;
@@ -6647,12 +6417,8 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
   }
 
   else if (prefix == "flush_pg_stats") {
-    if (osdmap->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
-      mgrc.send_pgstats();
-      ds << service.get_osd_stat_seq() << "\n";
-    } else {
-      flush_pg_stats();
-    }
+    mgrc.send_pgstats();
+    ds << service.get_osd_stat_seq() << "\n";
   }
 
   else if (prefix == "heap") {
@@ -7136,10 +6902,6 @@ void OSD::_dispatch(Message *m)
     break;
 
     // osd
-  case MSG_PGSTATSACK:
-    handle_pg_stats_ack(static_cast<MPGStatsAck*>(m));
-    break;
-
   case MSG_MON_COMMAND:
     handle_command(static_cast<MMonCommand*>(m));
     break;
@@ -7791,13 +7553,6 @@ void OSD::_committed_osd_maps(epoch_t first, epoch_t last, MOSDMap *m)
        do_restart = true;
       }
     }
-    if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS &&
-       newmap->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
-      dout(10) << __func__ << " require_osd_release reached luminous in "
-              << newmap->get_epoch() << dendl;
-      clear_pg_stat_queue();
-      clear_outstanding_pg_stats();
-    }
 
     osdmap = newmap;
     epoch_t up_epoch;
index c8deea53090da738e8b2a57d9282a1388aba0d9c..9a1cbb671b537ae27443226e9b1336b98290be9e 100644 (file)
@@ -1006,9 +1006,6 @@ public:
 
   void need_heartbeat_peer_update();
 
-  void pg_stat_queue_enqueue(PG *pg);
-  void pg_stat_queue_dequeue(PG *pg);
-
   void init();
   void final_init();  
   void start_shutdown();
@@ -2029,17 +2026,6 @@ protected:
   // == monitor interaction ==
   Mutex mon_report_lock;
   utime_t last_mon_report;
-  utime_t last_pg_stats_sent;
-
-  /* if our monitor dies, we want to notice it and reconnect.
-   *  So we keep track of when it last acked our stat updates,
-   *  and if too much time passes (and we've been sending
-   *  more updates) then we can call it dead and reconnect
-   *  elsewhere.
-   */
-  utime_t last_pg_stats_ack;
-  float stats_ack_timeout;
-  set<uint64_t> outstanding_pg_stats; // how many stat updates haven't been acked yet
 
   // -- boot --
   void start_boot();
@@ -2082,17 +2068,6 @@ protected:
   void send_failures();
   void send_still_alive(epoch_t epoch, const entity_inst_t &i);
 
-  // -- pg stats --
-  Mutex pg_stat_queue_lock;
-  Cond pg_stat_queue_cond;
-  xlist<PG*> pg_stat_queue;
-  bool osd_stat_updated;
-  uint64_t pg_stat_tid, pg_stat_tid_flushed;
-
-  void send_pg_stats(const utime_t &now);
-  void handle_pg_stats_ack(class MPGStatsAck *ack);
-  void flush_pg_stats();
-
   ceph::coarse_mono_clock::time_point last_sent_beacon;
   Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
   epoch_t min_last_epoch_clean = 0;
@@ -2100,35 +2075,6 @@ protected:
   std::vector<pg_t> min_last_epoch_clean_pgs;
   void send_beacon(const ceph::coarse_mono_clock::time_point& now);
 
-  void pg_stat_queue_enqueue(PG *pg) {
-    pg_stat_queue_lock.Lock();
-    if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) {
-      pg->get("pg_stat_queue");
-      pg_stat_queue.push_back(&pg->stat_queue_item);
-    }
-    osd_stat_updated = true;
-    pg_stat_queue_lock.Unlock();
-  }
-  void pg_stat_queue_dequeue(PG *pg) {
-    pg_stat_queue_lock.Lock();
-    if (pg->stat_queue_item.remove_myself())
-      pg->put("pg_stat_queue");
-    pg_stat_queue_lock.Unlock();
-  }
-  void clear_pg_stat_queue() {
-    pg_stat_queue_lock.Lock();
-    while (!pg_stat_queue.empty()) {
-      PG *pg = pg_stat_queue.front();
-      pg_stat_queue.pop_front();
-      pg->put("pg_stat_queue");
-    }
-    pg_stat_queue_lock.Unlock();
-  }
-  void clear_outstanding_pg_stats(){
-    Mutex::Locker l(pg_stat_queue_lock);
-    outstanding_pg_stats.clear();
-  }
-
   ceph_tid_t get_tid() {
     return service.get_tid();
   }
index b9f62f415d033be5305195bf4787f9e0c22fedc0..0b5839a0732245afd2dd8fcca0444185308ee78f 100644 (file)
@@ -2715,7 +2715,6 @@ void PG::publish_stats_to_osd()
   _update_calc_stats();
   _update_blocked_by();
 
-  bool publish = false;
   pg_stat_t pre_publish = info.stats;
   pre_publish.stats.add(unstable_stats);
   utime_t cutoff = now;
@@ -2743,10 +2742,6 @@ void PG::publish_stats_to_osd()
     if ((info.stats.state & PG_STATE_UNDERSIZED) == 0)
       info.stats.last_fullsized = now;
 
-    // do not send pgstat to mon anymore once we are luminous, since mgr takes
-    // care of this by sending MMonMgrReport to mon.
-    publish =
-      osd->osd->get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
     pg_stats_publish_valid = true;
     pg_stats_publish = pre_publish;
 
@@ -2754,9 +2749,6 @@ void PG::publish_stats_to_osd()
             << ":" << pg_stats_publish.reported_seq << dendl;
   }
   pg_stats_publish_lock.Unlock();
-
-  if (publish)
-    osd->pg_stat_queue_enqueue(this);
 }
 
 void PG::clear_publish_stats()
@@ -2765,8 +2757,6 @@ void PG::clear_publish_stats()
   pg_stats_publish_lock.Lock();
   pg_stats_publish_valid = false;
   pg_stats_publish_lock.Unlock();
-
-  osd->pg_stat_queue_dequeue(this);
 }
 
 /**
index 5b93ff9826ce93eba412d045ae88f57b5016e71d..1e0cf132e50a700a39f14154babb62d0ec5e8a5b 100644 (file)
@@ -10833,7 +10833,6 @@ void PrimaryLogPG::on_shutdown()
   dout(10) << "on_shutdown" << dendl;
 
   // remove from queues
-  osd->pg_stat_queue_dequeue(this);
   osd->peering_wq.dequeue(this);
 
   // handles queue races