#define dout_subsys ceph_subsys_mon
 #define OSD_PG_CREATING_PREFIX "osd_pg_creating"
 
+void LastEpochClean::Lec::report(ps_t ps, epoch_t last_epoch_clean)
+{
+  if (epoch_by_pg.size() <= ps) {
+    epoch_by_pg.resize(ps + 1, 0);
+  }
+  if (epoch_by_pg[ps] >= last_epoch_clean) {
+    // stale lec
+    return;
+  }
+  epoch_by_pg[ps] = last_epoch_clean;
+  if (last_epoch_clean < floor) {
+    floor = last_epoch_clean;
+  }
+  if (ps != next_missing) {
+    return;
+  }
+  for (; next_missing < epoch_by_pg.size(); next_missing++) {
+    if (epoch_by_pg[next_missing] == 0) {
+      break;
+    }
+  }
+}
+
+void LastEpochClean::remove_pool(uint64_t pool)
+{
+  report_by_pool.erase(pool);
+}
+
+void LastEpochClean::report(const pg_t& pg, epoch_t last_epoch_clean)
+{
+  auto& lec = report_by_pool[pg.pool()];
+  return lec.report(pg.ps(), last_epoch_clean);
+}
+
+epoch_t LastEpochClean::get_lower_bound(const OSDMap& latest) const
+{
+  auto floor = latest.get_epoch();
+  for (auto& pool : latest.get_pools()) {
+    auto reported = report_by_pool.find(pool.first);
+    if (reported == report_by_pool.end()) {
+      return 0;
+    }
+    if (reported->second.next_missing < pool.second.get_pg_num()) {
+      return 0;
+    }
+    if (reported->second.floor < floor) {
+      floor = reported->second.floor;
+    }
+  }
+  return floor;
+}
+
+
 struct C_UpdateCreatingPGs : public Context {
   OSDMonitor *osdmon;
   utime_t start;
     }
     if (mon->monmap->get_required_features().contains_all(
           ceph::features::mon::FEATURE_LUMINOUS)) {
-      creating_pgs = update_pending_creatings(inc);
+      creating_pgs = update_pending_pgs(inc);
       for (const auto &osd_state : inc.new_state) {
        if (osd_state.second & CEPH_OSD_UP) {
          // could be marked up *or* down, but we're too lazy to check which
          last_osd_report.erase(osd_state.first);
        }
+       if (osd_state.second & CEPH_OSD_EXISTS) {
+         // could be created *or* destroyed, but we can safely drop it
+         osd_epochs.erase(osd_state.first);
+       }
       }
     }
   }
 }
 
 creating_pgs_t
-OSDMonitor::update_pending_creatings(const OSDMap::Incremental& inc)
+OSDMonitor::update_pending_pgs(const OSDMap::Incremental& inc)
 {
   creating_pgs_t pending_creatings;
   {
     auto last =
       pending_creatings.pgs.lower_bound(pg_t{0, removed_pool + 1});
     pending_creatings.pgs.erase(first, last);
+    last_epoch_clean.remove_pool(removed_pool);
   }
   scan_for_creating_pgs(osdmap.get_pools(),
                        inc.old_pools,
   // and pg creating, also!
   if (mon->monmap->get_required_features().contains_all(
        ceph::features::mon::FEATURE_LUMINOUS)) {
-    auto pending_creatings = update_pending_creatings(pending_inc);
+    auto pending_creatings = update_pending_pgs(pending_inc);
     if (!osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
       dout(7) << __func__ << " in the middle of upgrading, "
              << " trimming pending creating_pgs using pgmap" << dendl;
 
 version_t OSDMonitor::get_trim_to()
 {
+  epoch_t floor;
+
   if (mon->monmap->get_required_features().contains_all(
         ceph::features::mon::FEATURE_LUMINOUS)) {
-    std::lock_guard<std::mutex> l(creating_pgs_lock);
-    if (!creating_pgs.pgs.empty()) {
-      return 0;
-    }
-    if (!mon->pgmon()->is_readable()) {
-      return 0;
+    {
+      std::lock_guard<std::mutex> l(creating_pgs_lock);
+      if (!creating_pgs.pgs.empty()) {
+       return 0;
+      }
     }
+    floor = get_min_last_epoch_clean();
   } else {
     if (!mon->pgmon()->is_readable())
       return 0;
     if (mon->pgmon()->pg_map.creating_pgs.empty()) {
       return 0;
     }
+    floor = mon->pgmon()->pg_map.get_min_last_epoch_clean();
   }
   {
-    epoch_t floor = mon->pgmon()->pg_map.get_min_last_epoch_clean();
     dout(10) << " min_last_epoch_clean " << floor << dendl;
     if (g_conf->mon_osd_force_trim_to > 0 &&
        g_conf->mon_osd_force_trim_to < (int)get_last_committed()) {
   return 0;
 }
 
+epoch_t OSDMonitor::get_min_last_epoch_clean() const
+{
+  auto floor = last_epoch_clean.get_lower_bound(osdmap);
+  // also scan osd epochs
+  // don't trim past the oldest reported osd epoch
+  for (auto& osd_epoch : osd_epochs) {
+    if (osd_epoch.second < floor) {
+      floor = osd_epoch.second;
+    }
+  }
+  return floor;
+}
+
 void OSDMonitor::encode_trim_extra(MonitorDBStore::TransactionRef tx,
                                   version_t first)
 {
   }
 
   last_osd_report[from] = ceph_clock_now();
+  osd_epochs[from] = beacon->version;
+
+  for (const auto& pg : beacon->pgs) {
+    last_epoch_clean.report(pg, beacon->min_last_epoch_clean);
+  }
   return false;
 }
 
 
   }
 };
 
+
+class LastEpochClean {
+  struct Lec {
+    vector<epoch_t> epoch_by_pg;
+    ps_t next_missing = 0;
+    epoch_t floor = std::numeric_limits<epoch_t>::max();
+    void report(ps_t pg, epoch_t last_epoch_clean);
+  };
+  std::map<uint64_t, Lec> report_by_pool;
+public:
+  void report(const pg_t& pg, epoch_t last_epoch_clean);
+  void remove_pool(uint64_t pool);
+  epoch_t get_lower_bound(const OSDMap& latest) const;
+};
+
+
 class OSDMonitor : public PaxosService {
   CephContext *cct;
 
 
   // when we last received PG stats from each osd
   map<int,utime_t> last_osd_report;
+  // TODO: use last_osd_report to store the osd report epochs, once we don't
+  //       need to upgrade from pre-luminous releases.
+  map<int,epoch_t> osd_epochs;
+  LastEpochClean last_epoch_clean;
   bool preprocess_beacon(MonOpRequestRef op);
   bool prepare_beacon(MonOpRequestRef op);
+  epoch_t get_min_last_epoch_clean() const;
 
   friend class C_UpdateCreatingPGs;
   std::map<int, std::map<epoch_t, std::set<pg_t>>> creating_pgs_by_osd_epoch;
   creating_pgs_t creating_pgs;
   std::mutex creating_pgs_lock;
 
-  creating_pgs_t update_pending_creatings(const OSDMap::Incremental& inc);
+  creating_pgs_t update_pending_pgs(const OSDMap::Incremental& inc);
   void trim_creating_pgs(creating_pgs_t *creating_pgs, const PGMap& pgm);
   void scan_for_creating_pgs(const std::map<int64_t,pg_pool_t>& pools,
                             const std::set<int64_t>& removed_pools,