#include "common/Cond.h"
 
 #include "events/ESubtreeMap.h"
+#include "events/ESegment.h"
 
 #include "common/config.h"
 #include "common/errno.h"
   debug_subtrees = g_conf().get_val<bool>("mds_debug_subtrees");
   events_per_segment = g_conf().get_val<uint64_t>("mds_log_events_per_segment");
   pause = g_conf().get_val<bool>("mds_log_pause");
+  major_segment_event_ratio = g_conf().get_val<uint64_t>("mds_log_major_segment_event_ratio");
   max_segments = g_conf().get_val<uint64_t>("mds_log_max_segments");
   max_events = g_conf().get_val<int64_t>("mds_log_max_events");
 }
   plb.add_u64(l_mdl_evexd, "evexd", "Current expired events");
   plb.add_u64(l_mdl_segexg, "segexg", "Expiring segments");
   plb.add_u64(l_mdl_segexd, "segexd", "Current expired segments");
+  plb.add_u64(l_mdl_segmjr, "segmjr", "Major Segments");
   plb.add_u64_counter(l_mdl_replayed, "replayed", "Events replayed",
                      "repl", PerfCountersBuilder::PRIO_INTERESTING);
   plb.add_time_avg(l_mdl_jlat, "jlat", "Journaler flush latency");
 
 // -------------------------------------------------
 
+LogSegment* MDLog::_start_new_segment(SegmentBoundary* sb)
+{
+  ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
+
+  auto ls = new LogSegment(event_seq);
+  segments[event_seq] = ls;
+  logger->inc(l_mdl_segadd);
+  logger->set(l_mdl_seg, segments.size());
+  sb->set_seq(event_seq);
+
+  // Adjust to next stray dir
+  mds->mdcache->advance_stray();
+  return ls;
+}
+
 void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase* c)
 {
   dout(20) << __func__ << " " << *le << dendl;
   ceph_assert(!mds_is_shutting_down);
 
   event_seq++;
+  events_since_last_major_segment++;
+
+  if (auto sb = dynamic_cast<SegmentBoundary*>(le); sb) {
+    auto ls = _start_new_segment(sb);
+    if (sb->is_major_segment_boundary()) {
+      major_segments.insert(ls->seq);
+      logger->set(l_mdl_segmjr, major_segments.size());
+      events_since_last_major_segment = 0;
+    }
+  }
 
   EMetaBlob *metablob = le->get_metablob();
   if (metablob) {
   unflushed++;
 }
 
-void MDLog::_segment_upkeep(LogEvent* le)
+void MDLog::_segment_upkeep()
 {
   ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
   ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
   uint64_t period = journaler->get_layout_period();
   auto ls = get_current_segment();
   // start a new segment?
-  if (le->get_type() == EVENT_SUBTREEMAP ||
-      (le->get_type() == EVENT_IMPORTFINISH && mds->is_resolve())) {
-    // avoid infinite loop when ESubtreeMap is very large.
-    // do not insert ESubtreeMap among EImportFinish events that finish
-    // disambiguate imports. Because the ESubtreeMap reflects the subtree
-    // state when all EImportFinish events are replayed.
+  if (events_since_last_major_segment > events_per_segment*major_segment_event_ratio) {
+    dout(10) << __func__ << ": starting new major segment, current " << *ls << dendl;
+    auto sle = mds->mdcache->create_subtree_map();
+    _submit_entry(sle, NULL);
   } else if (ls->end/period != ls->offset/period || ls->num_events >= events_per_segment) {
-    dout(10) << "submit_entry also starting new segment: last = "
-            << ls->seq  << "/" << ls->offset << ", event seq = " << event_seq << dendl;
-    _start_new_segment();
-  } else if (debug_subtrees && le->get_type() != EVENT_SUBTREEMAP_TEST) {
+    dout(10) << __func__ << ": starting new segment, current " << *ls << dendl;
+    auto sb = new ESegment();
+    _submit_entry(sb, nullptr);
+  } else if (debug_subtrees && ls->num_events > 1) {
     // debug: journal this every time to catch subtree replay bugs.
     // use a different event id so it doesn't get interpreted as a
     // LogSegment boundary on replay.
     dout(10) << __func__ << ": creating test subtree map" << dendl;
-    LogEvent *sle = mds->mdcache->create_subtree_map();
+    auto sle = mds->mdcache->create_subtree_map();
     sle->set_type(EVENT_SUBTREEMAP_TEST);
     _submit_entry(sle, NULL);
   }
       uint64_t write_pos = journaler->get_write_pos();
 
       le->set_start_off(write_pos);
-      if (le->get_type() == EVENT_SUBTREEMAP)
+      if (dynamic_cast<SegmentBoundary*>(le)) {
        ls->offset = write_pos;
+      }
 
       dout(5) << "_submit_thread " << write_pos << "~" << bl.length()
              << " : " << *le << dendl;
   }
 }
 
-
-// -----------------------------
-// segments
-
-void MDLog::_start_new_segment()
-{
-  _prepare_new_segment();
-  _journal_segment_subtree_map(NULL);
-}
-
-void MDLog::_prepare_new_segment()
-{
-  ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
-
-  uint64_t seq = event_seq + 1;
-  dout(7) << __func__ << " seq " << seq << dendl;
-
-  segments[seq] = new LogSegment(seq);
-
-  logger->inc(l_mdl_segadd);
-  logger->set(l_mdl_seg, segments.size());
-
-  // Adjust to next stray dir
-  mds->mdcache->advance_stray();
-}
-
-void MDLog::_journal_segment_subtree_map(MDSContext *onsync)
-{
-  ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
-
-  dout(7) << __func__ << dendl;
-  ESubtreeMap *sle = mds->mdcache->create_subtree_map();
-  sle->event_seq = get_last_segment_seq();
-
-  _submit_entry(sle, new C_MDL_Flushed(this, onsync));
-}
-
 class C_OFT_Committed : public MDSInternalContext {
   MDLog *mdlog;
   uint64_t seq;
 {
   ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
 
-  uint64_t oft_committed_seq = mds->mdcache->open_file_table.get_committed_log_seq();
+  uint64_t const oft_committed_seq = mds->mdcache->open_file_table.get_committed_log_seq();
 
   // trim expired segments?
   bool trimmed = false;
-  while (!segments.empty()) {
-    LogSegment *ls = segments.begin()->second;
+  uint64_t end = 0;
+  for (auto it = segments.begin(); it != segments.end(); ++it) {
+    auto& [seq, ls] = *it;
+    dout(20) << __func__ << ": examining seq=" << seq << " ls=" << *ls << dendl;
+
+    if (auto msit = major_segments.find(seq); msit != major_segments.end() && end > 0) {
+      dout(10) << __func__ << ": expiring up to this major segment " << seq << dendl;
+      uint64_t expire_pos = 0;
+      for (auto& [seq2, ls2] : segments) {
+        if (seq <= seq2) {
+          break;
+        }
+        dout(20) << __func__ << ": expiring " << *ls2 << dendl;
+        expired_events -= ls2->num_events;
+        expired_segments.erase(ls2);
+        if (pre_segments_size > 0)
+          pre_segments_size--;
+        num_events -= ls2->num_events;
+        logger->inc(l_mdl_evtrm, ls2->num_events);
+        logger->inc(l_mdl_segtrm);
+        expire_pos = ls2->end;
+        delete ls2;
+      }
+      segments.erase(segments.begin(), it);
+      logger->set(l_mdl_seg, segments.size());
+      major_segments.erase(major_segments.begin(), msit);
+      logger->set(l_mdl_segmjr, major_segments.size());
+
+      auto jexpire_pos = journaler->get_expire_pos();
+      if (jexpire_pos < expire_pos) {
+        journaler->set_expire_pos(expire_pos);
+        logger->set(l_mdl_expos, expire_pos);
+      } else {
+        logger->set(l_mdl_expos, jexpire_pos);
+      }
+      trimmed = true;
+    }
+
     if (!expired_segments.count(ls)) {
-      dout(10) << "_trim_expired_segments waiting for " << ls->seq << "/" << ls->offset
-              << " to expire" << dendl;
+      dout(10) << __func__ << " waiting for expiry " << *ls << dendl;
       break;
     }
 
     if (!mds_is_shutting_down && ls->seq >= oft_committed_seq) {
-      dout(10) << "_trim_expired_segments open file table committedseq " << oft_committed_seq
+      dout(10) << __func__ << " defer expire for open file table committedseq " << oft_committed_seq
               << " <= " << ls->seq << "/" << ls->offset << dendl;
       break;
     }
     
-    dout(10) << "_trim_expired_segments trimming expired "
-            << ls->seq << "/0x" << std::hex << ls->offset << std::dec << dendl;
-    expired_events -= ls->num_events;
-    expired_segments.erase(ls);
-    if (pre_segments_size > 0)
-      pre_segments_size--;
-    num_events -= ls->num_events;
-      
-    // this was the oldest segment, adjust expire pos
-    if (journaler->get_expire_pos() < ls->end) {
-      journaler->set_expire_pos(ls->end);
-      logger->set(l_mdl_expos, ls->end);
-    } else {
-      logger->set(l_mdl_expos, ls->offset);
-    }
-    
-    logger->inc(l_mdl_segtrm);
-    logger->inc(l_mdl_evtrm, ls->num_events);
-    
-    segments.erase(ls->seq);
-    delete ls;
-    trimmed = true;
+    end = seq;
+    dout(10) << __func__ << ": maybe expiring " << *ls << dendl;
   }
 
   submit_mutex.unlock();
 
   logger->set(l_mdl_ev, num_events);
   logger->set(l_mdl_evexd, expired_events);
-  logger->set(l_mdl_seg, segments.size());
   logger->set(l_mdl_segexd, expired_segments.size());
 }
 
     if (le) {
       bool modified = false;
 
-      if (le->get_type() == EVENT_SUBTREEMAP ||
-          le->get_type() == EVENT_RESETJOURNAL) {
-        auto sle = dynamic_cast<ESubtreeMap*>(le.get());
-        if (sle == NULL || sle->event_seq == 0) {
+      if (auto sb = dynamic_cast<SegmentBoundary*>(le.get()); sb) {
+        if (sb->get_seq() == 0) {
           // A non-explicit event seq: the effective sequence number 
           // of this segment is it's position in the old journal and
           // the new effective sequence number will be its position
           || le->get_type() == EVENT_SUBTREEMAP_TEST) {
         auto& sle = dynamic_cast<ESubtreeMap&>(*le);
         dout(20) << __func__ << " zeroing expire_pos in subtreemap event at "
-          << le_pos << " seq=" << sle.event_seq << dendl;
+          << le_pos << " seq=" << sle.get_seq() << dendl;
         sle.expire_pos = 0;
         modified = true;
       }
         ceph_abort();  // Should be unreachable because damaged() calls
                     // respawn()
       }
-
     }
     le->set_start_off(pos);
 
-    // new segment?
-    if (le->get_type() == EVENT_SUBTREEMAP ||
-       le->get_type() == EVENT_RESETJOURNAL) {
-      auto sle = dynamic_cast<ESubtreeMap*>(le.get());
-      if (sle && sle->event_seq > 0)
-       event_seq = sle->event_seq;
-      else
-       event_seq = pos;
+    // have we seen an import map yet?
+    if (segments.empty() && !dynamic_cast<ESubtreeMap*>(le.get())) {
+      dout(1) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
+              << " " << le->get_stamp() << " -- waiting for ESubtreeMap.  (skipping " << *le << ")" << dendl;
+      continue;
+    }
+
+    events_since_last_major_segment++;
+    if (auto sb = dynamic_cast<SegmentBoundary*>(le.get()); sb) {
+      auto seq = sb->get_seq();
+      if (seq > 0) {
+        event_seq = seq;
+      } else {
+        event_seq = pos;
+      }
       segments[event_seq] = new LogSegment(event_seq, pos);
       logger->set(l_mdl_seg, segments.size());
+      if (sb->is_major_segment_boundary()) {
+        major_segments.insert(event_seq);
+        logger->set(l_mdl_segmjr, major_segments.size());
+        events_since_last_major_segment = 0;
+      }
     } else {
       event_seq++;
     }
 
-    // have we seen an import map yet?
-    if (segments.empty()) {
-      dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos() 
-              << " " << le->get_stamp() << " -- waiting for subtree_map.  (skipping " << *le << ")" << dendl;
-    } else {
-      dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos() 
-              << " " << le->get_stamp() << ": " << *le << dendl;
-      le->_segment = get_current_segment();    // replay may need this
-      le->_segment->num_events++;
-      le->_segment->end = journaler->get_read_pos();
-      num_events++;
-      logger->set(l_mdl_ev, num_events);
-
-      {
-        std::lock_guard l(mds->mds_lock);
-        if (mds->is_daemon_stopping()) {
-          return;
-        }
-        logger->inc(l_mdl_replayed);
-        le->replay(mds);
+    dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
+             << " " << le->get_stamp() << ": " << *le << dendl;
+    le->_segment = get_current_segment();    // replay may need this
+    le->_segment->num_events++;
+    le->_segment->end = journaler->get_read_pos();
+    num_events++;
+    logger->set(l_mdl_ev, num_events);
+
+    {
+      std::lock_guard l(mds->mds_lock);
+      if (mds->is_daemon_stopping()) {
+        return;
       }
+      logger->inc(l_mdl_replayed);
+      le->replay(mds);
     }
 
     logger->set(l_mdl_rdpos, pos);
   if (changed.count("mds_log_events_per_segment")) {
     events_per_segment = g_conf().get_val<uint64_t>("mds_log_events_per_segment");
   }
+  if (changed.count("mds_log_major_segment_event_ratio")) {
+    major_segment_event_ratio = g_conf().get_val<uint64_t>("mds_log_major_segment_event_ratio");
+  }
   if (changed.count("mds_log_max_events")) {
     max_events = g_conf().get_val<int64_t>("mds_log_max_events");
   }
 
   l_mdl_segex,
   l_mdl_segtrm,
   l_mdl_seg,
+  l_mdl_segmjr,
   l_mdl_segexg,
   l_mdl_segexd,
   l_mdl_expos,
 
 #include "LogSegment.h"
 #include "MDSMap.h"
+#include "SegmentBoundary.h"
 
 #include <list>
 #include <map>
   void create_logger();
   void set_write_iohint(unsigned iohint_flags);
 
-  void start_new_segment() {
-    std::lock_guard l(submit_mutex);
-    _start_new_segment();
-  }
-  void prepare_new_segment() {
-    std::lock_guard l(submit_mutex);
-    _prepare_new_segment();
-  }
-  void journal_segment_subtree_map(MDSContext *onsync=NULL) {
-    {
-      std::lock_guard l{submit_mutex};
-      _journal_segment_subtree_map(onsync);
-    }
-    if (onsync)
-      flush();
-  }
-
   LogSegment *peek_current_segment() {
     return segments.empty() ? NULL : segments.rbegin()->second;
   }
   Journaler *get_journaler() { return journaler; }
   bool empty() const { return segments.empty(); }
 
+  uint64_t get_last_major_segment_seq() const {
+    ceph_assert(!major_segments.empty());
+    return *major_segments.rbegin();
+  }
   uint64_t get_last_segment_seq() const {
     ceph_assert(!segments.empty());
     return segments.rbegin()->first;
   void submit_entry(LogEvent *e, MDSLogContextBase* c = 0) {
     std::lock_guard l(submit_mutex);
     _submit_entry(e, c);
-    _segment_upkeep(e);
+    _segment_upkeep();
     submit_cond.notify_all();
   }
 
 
   // -- segments --
   std::map<uint64_t,LogSegment*> segments;
-  std::set<LogSegment*> expiring_segments;
-  std::set<LogSegment*> expired_segments;
   std::size_t pre_segments_size = 0;            // the num of segments when the mds finished replay-journal, to calc the num of segments growing
-  uint64_t event_seq = 0;
+  LogSegment::seq_t event_seq = 0;
   uint64_t expiring_events = 0;
   uint64_t expired_events = 0;
 
   friend class C_MDL_Flushed;
   friend class C_OFT_Committed;
 
-  // -- segments --
-  void _start_new_segment();
-  void _prepare_new_segment();
-  void _segment_upkeep(LogEvent* le);
-  void _journal_segment_subtree_map(MDSContext *onsync);
-  void _submit_entry(LogEvent *e, MDSLogContextBase *c);
-
   void try_to_commit_open_file_table(uint64_t last_seq);
+  LogSegment* _start_new_segment(SegmentBoundary* sb);
+  void _segment_upkeep();
+  void _submit_entry(LogEvent* e, MDSLogContextBase* c);
 
   void try_expire(LogSegment *ls, int op_prio);
   void _maybe_expired(LogSegment *ls, int op_prio);
 
   bool debug_subtrees;
   uint64_t events_per_segment;
+  uint64_t major_segment_event_ratio;
   int64_t max_events;
   uint64_t max_segments;
   bool pause;
+
+  std::set<uint64_t> major_segments;
+  std::set<LogSegment*> expired_segments;
+  std::set<LogSegment*> expiring_segments;
+  uint64_t events_since_last_major_segment = 0;
 };
 #endif