cct(cct),
    inc_osd_cache(g_conf->mon_osd_cache_size),
    full_osd_cache(g_conf->mon_osd_cache_size),
+   has_osdmap_manifest(false),
    last_attempted_minwait_time(utime_t()),
    mapper(mn->cct, &mn->cpu_tp)
 {}
 
 void OSDMonitor::update_from_paxos(bool *need_bootstrap)
 {
+  // we really don't care if the version has been updated, because we may
+  // have trimmed without having increased the last committed; yet, we may
+  // need to update the in-memory manifest.
+  load_osdmap_manifest();
+
   version_t version = get_last_committed();
   if (version == osdmap.epoch)
     return;
   dout(10) << "encode_pending e " << pending_inc.epoch
           << dendl;
 
+  if (do_prune(t)) {
+    dout(1) << __func__ << " osdmap full prune encoded e"
+            << pending_inc.epoch << dendl;
+  }
+
   // finalize up pending_inc
   pending_inc.modified = ceph_clock_now();
 
   bufferlist bl;
   get_version_full(first, bl);
   put_version_full(tx, first, bl);
+
+  if (has_osdmap_manifest &&
+      first > osdmap_manifest.get_first_pinned()) {
+    _prune_update_trimmed(tx, first);
+  }
 }
 
+
+/* full osdmap prune
+ *
+ * for more information, please refer to doc/dev/mon-osdmap-prune.rst
+ */
+
+void OSDMonitor::load_osdmap_manifest()
+{
+  bool store_has_manifest =
+    mon->store->exists(get_service_name(), "osdmap_manifest");
+
+  if (!store_has_manifest) {
+    if (!has_osdmap_manifest) {
+      return;
+    }
+
+    dout(20) << __func__
+             << " dropping osdmap manifest from memory." << dendl;
+    osdmap_manifest = osdmap_manifest_t();
+    has_osdmap_manifest = false;
+    return;
+  }
+
+  dout(20) << __func__
+           << " osdmap manifest detected in store; reload." << dendl;
+
+  bufferlist manifest_bl;
+  int r = get_value("osdmap_manifest", manifest_bl);
+  if (r < 0) {
+    derr << __func__ << " unable to read osdmap version manifest" << dendl;
+    ceph_assert(0 == "error reading manifest");
+  }
+  osdmap_manifest.decode(manifest_bl);
+  has_osdmap_manifest = true;
+
+  dout(10) << __func__ << " store osdmap manifest pinned ("
+           << osdmap_manifest.get_first_pinned()
+           << " .. "
+           << osdmap_manifest.get_last_pinned()
+           << ")"
+           << dendl;
+}
+
+bool OSDMonitor::should_prune() const
+{
+  version_t first = get_first_committed();
+  version_t last = get_last_committed();
+  version_t min_osdmap_epochs =
+    g_conf->get_val<int64_t>("mon_min_osdmap_epochs");
+  version_t prune_min =
+    g_conf->get_val<uint64_t>("mon_osdmap_full_prune_min");
+  version_t prune_interval =
+    g_conf->get_val<uint64_t>("mon_osdmap_full_prune_interval");
+  version_t last_pinned = osdmap_manifest.get_last_pinned();
+  version_t last_to_pin = last - min_osdmap_epochs;
+
+  // Make it or break it constraints.
+  //
+  // If any of these conditions fails, we will not prune, regardless of
+  // whether we have an on-disk manifest with an on-going pruning state.
+  //
+  if ((last - first) <= min_osdmap_epochs) {
+    // between the first and last committed epochs, we don't have
+    // enough epochs to trim, much less to prune.
+    dout(10) << __func__
+             << " currently holding only " << (last - first)
+             << " epochs (min osdmap epochs: " << min_osdmap_epochs
+             << "); do not prune."
+             << dendl;
+    return false;
+
+  } else if ((last_to_pin - first) < prune_min) {
+    // between the first committed epoch and the last epoch we would prune,
+    // we simply don't have enough versions over the minimum to prune maps.
+    dout(10) << __func__
+             << " could only prune " << (last_to_pin - first)
+             << " epochs (" << first << ".." << last_to_pin << "), which"
+                " is less than the required minimum (" << prune_min << ")"
+             << dendl;
+    return false;
+
+  } else if (has_osdmap_manifest && last_pinned >= last_to_pin) {
+    dout(10) << __func__
+             << " we have pruned as far as we can; do not prune."
+             << dendl;
+    return false;
+
+  } else if (last_pinned + prune_interval > last_to_pin) {
+    dout(10) << __func__
+             << " not enough epochs to form an interval (last pinned: "
+             << last_pinned << ", last to pin: "
+             << last_to_pin << ", interval: " << prune_interval << ")"
+             << dendl;
+    return false;
+  }
+
+  dout(15) << __func__
+           << " should prune (" << last_pinned << ".." << last_to_pin << ")"
+           << " lc (" << first << ".." << last << ")"
+           << dendl;
+  return true;
+}
+
+void OSDMonitor::_prune_update_trimmed(
+    MonitorDBStore::TransactionRef tx,
+    version_t first)
+{
+  dout(10) << __func__
+           << " first " << first
+           << " last_pinned " << osdmap_manifest.get_last_pinned()
+           << " last_pinned " << osdmap_manifest.get_last_pinned()
+           << dendl;
+
+  if (!osdmap_manifest.is_pinned(first)) {
+    osdmap_manifest.pin(first);
+  }
+
+  set<version_t>::iterator p_end = osdmap_manifest.pinned.find(first);
+  set<version_t>::iterator p = osdmap_manifest.pinned.begin();
+  osdmap_manifest.pinned.erase(p, p_end);
+  ceph_assert(osdmap_manifest.get_first_pinned() == first);
+
+  if (osdmap_manifest.get_last_pinned() == first+1 ||
+      osdmap_manifest.pinned.size() == 1) {
+    // we reached the end of the line, as pinned maps go; clean up our
+    // manifest, and let `should_prune()` decide whether we should prune
+    // again.
+    tx->erase(get_service_name(), "osdmap_manifest");
+    return;
+  }
+
+  bufferlist bl;
+  osdmap_manifest.encode(bl);
+  tx->put(get_service_name(), "osdmap_manifest", bl);
+}
+
+void OSDMonitor::prune_init()
+{
+  dout(1) << __func__ << dendl;
+
+  version_t pin_first;
+
+  if (!has_osdmap_manifest) {
+    // we must have never pruned, OR if we pruned the state must no longer
+    // be relevant (i.e., the state must have been removed alongside with
+    // the trim that *must* have removed past the last pinned map in a
+    // previous prune).
+    ceph_assert(osdmap_manifest.pinned.empty());
+    ceph_assert(!mon->store->exists(get_service_name(), "osdmap_manifest"));
+    pin_first = get_first_committed();
+
+  } else {
+    // we must have pruned in the past AND its state is still relevant
+    // (i.e., even if we trimmed, we still hold pinned maps in the manifest,
+    // and thus we still hold a manifest in the store).
+    ceph_assert(!osdmap_manifest.pinned.empty());
+    ceph_assert(osdmap_manifest.get_first_pinned() == get_first_committed());
+    ceph_assert(osdmap_manifest.get_last_pinned() < get_last_committed());
+
+    dout(10) << __func__
+             << " first_pinned " << osdmap_manifest.get_first_pinned()
+             << " last_pinned " << osdmap_manifest.get_last_pinned()
+             << dendl;
+
+    pin_first = osdmap_manifest.get_last_pinned();
+  }
+
+  osdmap_manifest.pin(pin_first);
+}
+
+bool OSDMonitor::_prune_sanitize_options() const
+{
+  uint64_t prune_interval =
+    g_conf->get_val<uint64_t>("mon_osdmap_full_prune_interval");
+  uint64_t prune_min =
+    g_conf->get_val<uint64_t>("mon_osdmap_full_prune_min");
+  uint64_t txsize =
+    g_conf->get_val<uint64_t>("mon_osdmap_full_prune_txsize");
+
+  bool r = true;
+
+  if (prune_interval == 0) {
+    derr << __func__
+         << " prune is enabled BUT prune interval is zero; abort."
+         << dendl;
+    r = false;
+  } else if (prune_interval == 1) {
+    derr << __func__
+         << " prune interval is equal to one, which essentially means"
+            " no pruning; abort."
+         << dendl;
+    r = false;
+  }
+  if (prune_min == 0) {
+    derr << __func__
+         << " prune is enabled BUT prune min is zero; abort."
+         << dendl;
+    r = false;
+  }
+  if (prune_interval > prune_min) {
+    derr << __func__
+         << " impossible to ascertain proper prune interval because"
+         << " it is greater than the minimum prune epochs"
+         << " (min: " << prune_min << ", interval: " << prune_interval << ")"
+         << dendl;
+    r = false;
+  }
+
+  if (txsize <= prune_interval) {
+    derr << __func__
+         << "'mon_osdmap_full_prune_txsize' (" << txsize
+         << ") <= 'mon_osdmap_full_prune_interval' (" << prune_interval
+         << "); abort." << dendl;
+    r = false;
+  }
+  return r;
+}
+
+bool OSDMonitor::is_prune_enabled() const {
+  return g_conf->get_val<bool>("mon_osdmap_full_prune_enabled");
+}
+
+bool OSDMonitor::is_prune_supported() const {
+  return mon->get_required_mon_features().contains_any(
+      ceph::features::mon::FEATURE_OSDMAP_PRUNE);
+}
+
+/** do_prune
+ *
+ * @returns true if has side-effects; false otherwise.
+ */
+bool OSDMonitor::do_prune(MonitorDBStore::TransactionRef tx)
+{
+  bool enabled = is_prune_enabled();
+
+  dout(1) << __func__ << " osdmap full prune "
+          << ( enabled ? "enabled" : "disabled")
+          << dendl;
+
+  if (!enabled || !_prune_sanitize_options() || !should_prune()) {
+    return false;
+  }
+
+  // we are beyond the minimum prune versions, we need to remove maps because
+  // otherwise the store will grow unbounded and we may end up having issues
+  // with available disk space or store hangs.
+
+  // we will not pin all versions. We will leave a buffer number of versions.
+  // this allows us the monitor to trim maps without caring too much about
+  // pinned maps, and then allow us to use another ceph-mon without these
+  // capabilities, without having to repair the store.
+
+  version_t first = get_first_committed();
+  version_t last = get_last_committed();
+
+  version_t last_to_pin = last - g_conf->mon_min_osdmap_epochs;
+  version_t last_pinned = osdmap_manifest.get_last_pinned();
+  uint64_t prune_interval =
+    g_conf->get_val<uint64_t>("mon_osdmap_full_prune_interval");
+  uint64_t txsize =
+    g_conf->get_val<uint64_t>("mon_osdmap_full_prune_txsize");
+
+  prune_init();
+
+  // we need to get rid of some osdmaps
+
+  dout(5) << __func__
+          << " lc (" << first << " .. " << last << ")"
+          << " last_pinned " << last_pinned
+          << " interval " << prune_interval
+          << " last_to_pin " << last_to_pin
+          << dendl;
+
+  // We will be erasing maps as we go.
+  //
+  // We will erase all maps between `last_pinned` and the `next_to_pin`.
+  //
+  // If `next_to_pin` happens to be greater than `last_to_pin`, then
+  // we stop pruning. We could prune the maps between `next_to_pin` and
+  // `last_to_pin`, but by not doing it we end up with neater pruned
+  // intervals, aligned with `prune_interval`. Besides, this should not be a
+  // problem as long as `prune_interval` is set to a sane value, instead of
+  // hundreds or thousands of maps.
+
+  auto map_exists = [this](version_t v) {
+    string k = mon->store->combine_strings("full", v);
+    return mon->store->exists(get_service_name(), k);
+  };
+
+  // 'interval' represents the number of maps from the last pinned
+  // i.e., if we pinned version 1 and have an interval of 10, we're pinning
+  // version 11 next; all intermediate versions will be removed.
+  //
+  // 'txsize' represents the maximum number of versions we'll be removing in
+  // this iteration. If 'txsize' is large enough to perform multiple passes
+  // pinning and removing maps, we will do so; if not, we'll do at least one
+  // pass. We are quite relaxed about honouring 'txsize', but we'll always
+  // ensure that we never go *over* the maximum.
+
+  // e.g., if we pin 1 and 11, we're removing versions [2..10]; i.e., 9 maps.
+  uint64_t removal_interval = prune_interval - 1;
+
+  if (txsize < removal_interval) {
+    dout(5) << __func__
+           << " setting txsize to removal interval size ("
+           << removal_interval << " versions"
+           << dendl;
+    txsize = removal_interval;
+  }
+  ceph_assert(removal_interval > 0);
+
+  uint64_t num_pruned = 0;
+  while (num_pruned + removal_interval <= txsize) { 
+    last_pinned = osdmap_manifest.get_last_pinned();
+
+    if (last_pinned + prune_interval > last_to_pin) {
+      break;
+    }
+    ceph_assert(last_pinned < last_to_pin);
+
+    version_t next_pinned = last_pinned + prune_interval;
+    ceph_assert(next_pinned <= last_to_pin);
+    osdmap_manifest.pin(next_pinned);
+
+    dout(20) << __func__
+            << " last_pinned " << last_pinned
+            << " next_pinned " << next_pinned
+            << " num_pruned " << num_pruned
+            << " removal interval (" << (last_pinned+1)
+            << ".." << (next_pinned-1) << ")"
+            << " txsize " << txsize << dendl;
+
+    ceph_assert(map_exists(last_pinned));
+    ceph_assert(map_exists(next_pinned));
+
+    for (version_t v = last_pinned+1; v < next_pinned; ++v) {
+      ceph_assert(!osdmap_manifest.is_pinned(v));
+
+      dout(20) << __func__ << "   pruning full osdmap e" << v << dendl;
+      string full_key = mon->store->combine_strings("full", v);
+      tx->erase(get_service_name(), full_key);
+      ++num_pruned;
+    }
+  }
+
+  ceph_assert(num_pruned > 0);
+
+  bufferlist bl;
+  osdmap_manifest.encode(bl);
+  tx->put(get_service_name(), "osdmap_manifest", bl);
+
+  return true;
+}
+
+
 // -------------
 
 bool OSDMonitor::preprocess_query(MonOpRequestRef op)
     return ret;
 }
 
+int OSDMonitor::get_inc(version_t ver, OSDMap::Incremental& inc)
+{
+  bufferlist inc_bl;
+  int err = get_version(ver, inc_bl);
+  ceph_assert(err == 0);
+  ceph_assert(inc_bl.length());
+
+  bufferlist::iterator p = inc_bl.begin();
+  inc.decode(p);
+  dout(10) << __func__ << "     "
+           << " epoch " << inc.epoch
+           << " inc_crc " << inc.inc_crc
+           << " full_crc " << inc.full_crc
+           << " encode_features " << inc.encode_features << dendl;
+  return 0;
+}
+
+int OSDMonitor::get_full_from_pinned_map(version_t ver, bufferlist& bl)
+{
+  dout(10) << __func__ << " ver " << ver << dendl;
+
+  version_t closest_pinned = osdmap_manifest.get_lower_closest_pinned(ver);
+  if (closest_pinned == 0) {
+    return -ENOENT;
+  }
+  if (closest_pinned > ver) {
+    dout(0) << __func__ << " pinned: " << osdmap_manifest.pinned << dendl;
+  }
+  ceph_assert(closest_pinned <= ver);
+
+  dout(10) << __func__ << " closest pinned ver " << closest_pinned << dendl;
+
+  // get osdmap incremental maps and apply on top of this one.
+  bufferlist osdm_bl;
+  bool has_cached_osdmap = false;
+  for (version_t v = ver-1; v >= closest_pinned; --v) {
+    if (full_osd_cache.lookup(v, &osdm_bl)) {
+      dout(10) << __func__ << " found map in cache ver " << v << dendl;
+      closest_pinned = v;
+      has_cached_osdmap = true;
+      break;
+    }
+  }
+
+  if (!has_cached_osdmap) {
+    int err = PaxosService::get_version_full(closest_pinned, osdm_bl);
+    if (err != 0) {
+      derr << __func__ << " closest pinned map ver " << closest_pinned
+           << " not available! error: " << cpp_strerror(err) << dendl;
+    }
+    ceph_assert(err == 0);
+  }
+
+  ceph_assert(osdm_bl.length());
+
+  OSDMap osdm;
+  osdm.decode(osdm_bl);
+
+  dout(10) << __func__ << " loaded osdmap epoch " << closest_pinned
+           << " e" << osdm.epoch
+           << " crc " << osdm.get_crc()
+           << " -- applying incremental maps." << dendl;
+
+  uint64_t encode_features = 0;
+  for (version_t v = closest_pinned + 1; v <= ver; ++v) {
+    dout(20) << __func__ << "    applying inc epoch " << v << dendl;
+
+    OSDMap::Incremental inc;
+    int err = get_inc(v, inc);
+    ceph_assert(err == 0);
+
+    encode_features = inc.encode_features;
+
+    err = osdm.apply_incremental(inc);
+    ceph_assert(err == 0);
+
+    // this block performs paranoid checks on map retrieval
+    if (g_conf->get_val<bool>("mon_debug_extra_checks") &&
+        inc.full_crc != 0) {
+
+      uint64_t f = encode_features;
+      if (!f) {
+        f = (mon->quorum_con_features ? mon->quorum_con_features : -1);
+      }
+
+      // encode osdmap to force calculating crcs
+      bufferlist tbl;
+      osdm.encode(tbl, f | CEPH_FEATURE_RESERVED);
+      // decode osdmap to compare crcs with what's expected by incremental
+      OSDMap tosdm;
+      tosdm.decode(tbl);
+
+      if (tosdm.get_crc() != inc.full_crc) {
+        derr << __func__
+             << "    osdmap crc mismatch! (osdmap crc " << tosdm.get_crc()
+             << ", expected " << inc.full_crc << ")" << dendl;
+        ceph_assert(0 == "osdmap crc mismatch");
+      }
+    }
+
+    // note: we cannot add the recently computed map to the cache, as is,
+    // because we have not encoded the map into a bl.
+  }
+
+  if (!encode_features) {
+    dout(10) << __func__
+             << " last incremental map didn't have features;"
+             << " defaulting to quorum's or all" << dendl;
+    encode_features =
+      (mon->quorum_con_features ? mon->quorum_con_features : -1);
+  }
+  osdm.encode(bl, encode_features | CEPH_FEATURE_RESERVED);
+
+  return 0;
+}
+
 int OSDMonitor::get_version_full(version_t ver, bufferlist& bl)
 {
     if (full_osd_cache.lookup(ver, &bl)) {
       return 0;
     }
     int ret = PaxosService::get_version_full(ver, bl);
-    if (!ret) {
-      full_osd_cache.add(ver, bl);
+    if (ret == -ENOENT) {
+      // build map?
+      ret = get_full_from_pinned_map(ver, bl);
     }
-    return ret;
+    if (ret != 0) {
+      return ret;
+    }
+
+    full_osd_cache.add(ver, bl);
+    return 0;
 }
 
 epoch_t OSDMonitor::blacklist(const entity_addr_t& a, utime_t until)
 
       required:   [none]
   
   AVAILABLE FEATURES:
-      supported:  [kraken(1),luminous(2),mimic(4)]
-      persistent: [kraken(1),luminous(2),mimic(4)]
+      supported:  [kraken(1),luminous(2),mimic(4),osdmap-prune(8)]
+      persistent: [kraken(1),luminous(2),mimic(4),osdmap-prune(8)]
   MONMAP FEATURES:
       persistent: [none]
       optional:   [none]
       required:   [none]
   
   AVAILABLE FEATURES:
-      supported:  [kraken(1),luminous(2),mimic(4)]
-      persistent: [kraken(1),luminous(2),mimic(4)]
+      supported:  [kraken(1),luminous(2),mimic(4),osdmap-prune(8)]
+      persistent: [kraken(1),luminous(2),mimic(4),osdmap-prune(8)]
   monmap:persistent:[none]
   monmap:optional:[none]
   monmap:required:[none]
-  available:supported:[kraken(1),luminous(2),mimic(4)]
-  available:persistent:[kraken(1),luminous(2),mimic(4)]
+  available:supported:[kraken(1),luminous(2),mimic(4),osdmap-prune(8)]
+  available:persistent:[kraken(1),luminous(2),mimic(4),osdmap-prune(8)]
 
   $ monmaptool --feature-set foo /tmp/test.monmap.1234
   unknown features name 'foo' or unable to parse value: Expected option value to be integer, got 'foo'
       required:   [kraken(1),unknown(16),unknown(32)]
   
   AVAILABLE FEATURES:
-      supported:  [kraken(1),luminous(2),mimic(4)]
-      persistent: [kraken(1),luminous(2),mimic(4)]
+      supported:  [kraken(1),luminous(2),mimic(4),osdmap-prune(8)]
+      persistent: [kraken(1),luminous(2),mimic(4),osdmap-prune(8)]
 
   $ monmaptool --feature-unset 32 --optional --feature-list /tmp/test.monmap.1234
   monmaptool: monmap file /tmp/test.monmap.1234
       required:   [kraken(1),unknown(16),unknown(32)]
   
   AVAILABLE FEATURES:
-      supported:  [kraken(1),luminous(2),mimic(4)]
-      persistent: [kraken(1),luminous(2),mimic(4)]
+      supported:  [kraken(1),luminous(2),mimic(4),osdmap-prune(8)]
+      persistent: [kraken(1),luminous(2),mimic(4),osdmap-prune(8)]
   monmaptool: writing epoch 0 to /tmp/test.monmap.1234 (1 monitors)
 
   $ monmaptool --feature-unset 32 --persistent --feature-unset 16 --optional --feature-list /tmp/test.monmap.1234
       required:   [kraken(1)]
   
   AVAILABLE FEATURES:
-      supported:  [kraken(1),luminous(2),mimic(4)]
-      persistent: [kraken(1),luminous(2),mimic(4)]
+      supported:  [kraken(1),luminous(2),mimic(4),osdmap-prune(8)]
+      persistent: [kraken(1),luminous(2),mimic(4),osdmap-prune(8)]
   monmaptool: writing epoch 0 to /tmp/test.monmap.1234 (1 monitors)
 
   $ monmaptool --feature-unset kraken --feature-list /tmp/test.monmap.1234
       required:   [none]
   
   AVAILABLE FEATURES:
-      supported:  [kraken(1),luminous(2),mimic(4)]
-      persistent: [kraken(1),luminous(2),mimic(4)]
+      supported:  [kraken(1),luminous(2),mimic(4),osdmap-prune(8)]
+      persistent: [kraken(1),luminous(2),mimic(4),osdmap-prune(8)]
   monmaptool: writing epoch 0 to /tmp/test.monmap.1234 (1 monitors)
 
   $ rm /tmp/test.monmap.1234