crimson/os/seastore: consolidate seastore_journal logs with structured level and...  (44556/head)
author    Yingxin Cheng <yingxin.cheng@intel.com>
          Wed, 12 Jan 2022 05:42:11 +0000 (13:42 +0800)
committer Yingxin Cheng <yingxin.cheng@intel.com>
          Fri, 14 Jan 2022 15:06:43 +0000 (23:06 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/extent_reader.cc
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h
src/crimson/os/seastore/seastore_types.cc
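
The hunks below convert the seastore_journal logging sites to the subsystem/prefix macro convention (SET_SUBSYS, LOG_PREFIX, TRACE/DEBUG/INFO/ERROR) and split messages across levels as documented in the comment added at the top of journal.cc. The following is a rough, self-contained sketch of that convention, not crimson's real logging header: the macro definitions are hypothetical stand-ins, and printf formatting replaces the fmt-style "{}" placeholders the real macros accept. It shows the pattern used throughout the diff, where LOG_PREFIX() binds the enclosing function name to FNAME, lambdas that log capture FNAME explicitly, and messages belonging to one submission are keyed by an "H<handle-addr>" prefix.

// Hypothetical stand-ins for crimson's logging macros, kept only so the
// sketch compiles on its own; the real macros log through seastar/fmt.
#include <cstdio>

#define SET_SUBSYS(sys)    static const char *SUBSYS = #sys
#define LOG_PREFIX(fn)     const char *FNAME = #fn
#define LOG(lv, fmt_, ...) std::printf("%s %s %s: " fmt_ "\n", lv, SUBSYS, FNAME, ##__VA_ARGS__)
#define TRACE(...)         LOG("TRACE", __VA_ARGS__)
#define DEBUG(...)         LOG("DEBUG", __VA_ARGS__)
#define INFO(...)          LOG("INFO ", __VA_ARGS__)
#define ERROR(...)         LOG("ERROR", __VA_ARGS__)

SET_SUBSYS(seastore_journal);

struct OrderingHandle {};  // stand-in for crimson's OrderingHandle

void submit(OrderingHandle &handle, unsigned outstanding_io) {
  LOG_PREFIX(RecordSubmitter::submit);
  // Every message of one submission is keyed by "H<handle-addr>", matching
  // the "H{}" format documented at the top of journal.cc below.
  DEBUG("H%p start, outstanding_io=%u", (void*)&handle, outstanding_io);
  // Lambdas (continuations in the real code) capture FNAME explicitly.
  auto on_write_done = [FNAME, &handle] {
    TRACE("H%p write finish", (void*)&handle);
  };
  on_write_done();
  INFO("H%p submitted", (void*)&handle);
}

int main() {
  OrderingHandle h;
  submit(h, 1);
  return 0;
}

Running the sketch prints one line per call carrying the level, the subsystem, and the function prefix, which is the shape of the messages added in the hunks below.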

diff --git a/src/crimson/os/seastore/extent_reader.cc b/src/crimson/os/seastore/extent_reader.cc
index 9ae0fcebf989645393c3cb8b1464ebdce6a0ac7a..8b66ffffd316d96809db76fbc624dbee6ed68282 100644
--- a/src/crimson/os/seastore/extent_reader.cc
+++ b/src/crimson/os/seastore/extent_reader.cc
@@ -110,11 +110,13 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
   size_t budget,
   found_record_handler_t &handler)
 {
+  LOG_PREFIX(ExtentReader::scan_valid_records);
   auto& segment_manager =
     *segment_managers[cursor.get_segment_id().device_id()];
   if (cursor.get_segment_offset() == 0) {
     cursor.increment_seq(segment_manager.get_block_size());
   }
+  DEBUG("starting at {}, budget={}", cursor, budget);
   auto retref = std::make_unique<size_t>(0);
   auto &budget_used = *retref;
   return crimson::repeat(
@@ -122,26 +124,32 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
     -> scan_valid_records_ertr::future<seastar::stop_iteration> {
       return [=, &handler, &cursor, &budget_used] {
        if (!cursor.last_valid_header_found) {
-         LOG_PREFIX(ExtentReader::scan_valid_records);
          return read_validate_record_metadata(cursor.seq.offset, nonce
          ).safe_then([=, &cursor](auto md) {
-           DEBUG("read complete {}", cursor.seq);
            if (!md) {
-             DEBUG("found invalid header at {}, presumably at end", cursor.seq);
              cursor.last_valid_header_found = true;
+             if (cursor.is_complete()) {
+               INFO("complete at {}, invalid record group metadata",
+                     cursor);
+             } else {
+               DEBUG("found invalid record group metadata at {}, "
+                     "processing {} pending record groups",
+                     cursor.seq,
+                     cursor.pending_record_groups.size());
+             }
              return scan_valid_records_ertr::now();
            } else {
              auto& [header, md_bl] = *md;
-             auto new_committed_to = header.committed_to;
-             DEBUG("valid record read at {}, now committed at {}",
-                   cursor.seq, new_committed_to);
+             DEBUG("found valid {} at {}", header, cursor.seq);
              cursor.emplace_record_group(header, std::move(md_bl));
              return scan_valid_records_ertr::now();
            }
          }).safe_then([=, &cursor, &budget_used, &handler] {
+           DEBUG("processing committed record groups until {}, {} pending",
+                 cursor.last_committed,
+                 cursor.pending_record_groups.size());
            return crimson::repeat(
              [=, &budget_used, &cursor, &handler] {
-               DEBUG("valid record read, processing queue");
                if (cursor.pending_record_groups.empty()) {
                  /* This is only possible if the segment is empty.
                   * A record's last_commited must be prior to its own
@@ -169,8 +177,10 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
          assert(!cursor.pending_record_groups.empty());
          auto &next = cursor.pending_record_groups.front();
          return read_validate_data(next.offset, next.header
-         ).safe_then([this, &budget_used, &cursor, &handler](auto valid) {
+         ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) {
            if (!valid) {
+             INFO("complete at {}, invalid record group data at {}, {}",
+                  cursor, next.offset, next.header);
              cursor.pending_record_groups.clear();
              return scan_valid_records_ertr::now();
            }
@@ -179,6 +189,8 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
        }
       }().safe_then([=, &budget_used, &cursor] {
        if (cursor.is_complete() || budget_used >= budget) {
+         DEBUG("finish at {}, budget_used={}, budget={}",
+                cursor, budget_used, budget);
          return seastar::stop_iteration::yes;
        } else {
          return seastar::stop_iteration::no;
@@ -200,14 +212,14 @@ ExtentReader::read_validate_record_metadata(
   auto& seg_addr = start.as_seg_paddr();
   auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()];
   auto block_size = segment_manager.get_block_size();
-  if (seg_addr.get_segment_off() + block_size >
-      (int64_t)segment_manager.get_segment_size()) {
-    DEBUG("failed, reach segment end");
+  auto segment_size = static_cast<int64_t>(segment_manager.get_segment_size());
+  if (seg_addr.get_segment_off() + block_size > segment_size) {
+    DEBUG("failed -- record group header block {}~4096 > segment_size {}", start, segment_size);
     return read_validate_record_metadata_ret(
       read_validate_record_metadata_ertr::ready_future_marker{},
       std::nullopt);
   }
-  DEBUG("reading header block {}...", start);
+  TRACE("reading record group header block {}~4096", start);
   return segment_manager.read(start, block_size
   ).safe_then([=, &segment_manager](bufferptr bptr) mutable
               -> read_validate_record_metadata_ret {
@@ -228,9 +240,8 @@ ExtentReader::read_validate_record_metadata(
         header.dlength % block_size != 0 ||
         (header.committed_to != journal_seq_t() &&
          header.committed_to.offset.as_seg_paddr().get_segment_off() % block_size != 0) ||
-        (seg_addr.get_segment_off() + header.mdlength + header.dlength >
-         (int64_t)segment_manager.get_segment_size())) {
-      ERROR("failed, invalid header");
+        (seg_addr.get_segment_off() + header.mdlength + header.dlength > segment_size)) {
+      ERROR("failed, invalid record group header {}", start);
       return crimson::ct_error::input_output_error::make();
     }
     if (header.mdlength == block_size) {
@@ -239,12 +250,14 @@ ExtentReader::read_validate_record_metadata(
         std::make_pair(std::move(header), std::move(bl))
       );
     }
-    return segment_manager.read(
-      paddr_t::make_seg_paddr(
+
+    auto rest_start = paddr_t::make_seg_paddr(
         seg_addr.get_segment_id(),
         seg_addr.get_segment_off() + (segment_off_t)block_size
-      ),
-      header.mdlength - block_size
+    );
+    auto rest_len = header.mdlength - block_size;
+    TRACE("reading record group header rest {}~{}", rest_start, rest_len);
+    return segment_manager.read(rest_start, rest_len
     ).safe_then([header=std::move(header), bl=std::move(bl)
                 ](auto&& bptail) mutable {
       bl.push_back(bptail);
@@ -274,7 +287,7 @@ ExtentReader::read_validate_data(
   LOG_PREFIX(ExtentReader::read_validate_data);
   auto& segment_manager = *segment_managers[record_base.get_device_id()];
   auto data_addr = record_base.add_offset(header.mdlength);
-  DEBUG("reading data blocks {}+{}...", data_addr, header.dlength);
+  TRACE("reading record group data blocks {}~{}", data_addr, header.dlength);
   return segment_manager.read(
     data_addr,
     header.dlength
@@ -291,6 +304,7 @@ ExtentReader::consume_next_records(
   found_record_handler_t& handler,
   std::size_t& budget_used)
 {
+  LOG_PREFIX(ExtentReader::consume_next_records);
   auto& next = cursor.pending_record_groups.front();
   auto total_length = next.header.dlength + next.header.mdlength;
   budget_used += total_length;
@@ -304,12 +318,17 @@ ExtentReader::consume_next_records(
       static_cast<segment_off_t>(total_length)
     }
   };
+  DEBUG("processing {} at {}, budget_used={}",
+        next.header, locator, budget_used);
   return handler(
     locator,
     next.header,
     next.mdbuffer
-  ).safe_then([&cursor] {
+  ).safe_then([FNAME, &cursor] {
     cursor.pop_record_group();
+    if (cursor.is_complete()) {
+      INFO("complete at {}, no more record group", cursor);
+    }
   });
 }
 
diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc
index f24adb6890479cbd3a6c1dffac103d303cb41ada..ebcc366c3e950e0453ac43ea02629025e015beca 100644
--- a/src/crimson/os/seastore/journal.cc
+++ b/src/crimson/os/seastore/journal.cc
 #include "crimson/os/seastore/segment_cleaner.h"
 
 SET_SUBSYS(seastore_journal);
+/*
+ * format:
+ * - H<handle-addr> information
+ *
+ * levels:
+ * - INFO:  major initiation, closing, rolling and replay operations
+ * - DEBUG: INFO details, major submit operations
+ * - TRACE: DEBUG details
+ */
 
 namespace crimson::os::seastore {
 
@@ -44,13 +53,28 @@ Journal::Journal(
   register_metrics();
 }
 
+Journal::open_for_write_ret Journal::open_for_write()
+{
+  LOG_PREFIX(Journal::open_for_write);
+  INFO("device_id={}", journal_segment_manager.get_device_id());
+  return journal_segment_manager.open();
+}
+
+Journal::close_ertr::future<> Journal::close()
+{
+  LOG_PREFIX(Journal::close);
+  INFO("closing");
+  metrics.clear();
+  return journal_segment_manager.close();
+}
+
 Journal::prep_replay_segments_fut
 Journal::prep_replay_segments(
   std::vector<std::pair<segment_id_t, segment_header_t>> segments)
 {
   LOG_PREFIX(Journal::prep_replay_segments);
-  DEBUG("{} segments", segments.size());
   if (segments.empty()) {
+    ERROR("no journal segments for replay");
     return crimson::ct_error::input_output_error::make();
   }
   std::sort(
@@ -83,7 +107,6 @@ Journal::prep_replay_segments(
   auto journal_tail = segments.rbegin()->second.journal_tail;
   segment_provider->update_journal_tail_committed(journal_tail);
   auto replay_from = journal_tail.offset;
-  DEBUG("journal_tail={}", journal_tail);
   auto from = segments.begin();
   if (replay_from != P_ADDR_NULL) {
     from = std::find_if(
@@ -103,17 +126,20 @@ Journal::prep_replay_segments(
       from->first,
       journal_segment_manager.get_block_size());
   }
-  auto ret = replay_segments_t(segments.end() - from);
+
+  auto num_segments = segments.end() - from;
+  INFO("{} segments to replay, from {}",
+       num_segments, replay_from);
+  auto ret = replay_segments_t(num_segments);
   std::transform(
     from, segments.end(), ret.begin(),
-    [this, FNAME](const auto &p) {
+    [this](const auto &p) {
       auto ret = journal_seq_t{
        p.second.journal_segment_seq,
        paddr_t::make_seg_paddr(
          p.first,
          journal_segment_manager.get_block_size())
       };
-      DEBUG("replaying from {}", ret);
       return std::make_pair(ret, p.second);
     });
   ret[0].first.offset = replay_from;
@@ -129,7 +155,7 @@ Journal::replay_segment(
   delta_handler_t &handler)
 {
   LOG_PREFIX(Journal::replay_segment);
-  DEBUG("starting at {}", seq);
+  INFO("starting at {} -- {}", seq, header);
   return seastar::do_with(
     scan_valid_records_cursor(seq),
     ExtentReader::found_record_handler_t([=, &handler](
@@ -138,12 +164,12 @@ Journal::replay_segment(
       const bufferlist& mdbuf)
       -> ExtentReader::scan_valid_records_ertr::future<>
     {
-      DEBUG("decoding {} records", header.records);
       auto maybe_record_deltas_list = try_decode_deltas(
           header, mdbuf, locator.record_block_base);
       if (!maybe_record_deltas_list) {
         // This should be impossible, we did check the crc on the mdbuf
-        ERROR("unable to decode deltas for record {}", locator.record_block_base);
+        ERROR("unable to decode deltas for record {} at {}",
+              header, locator);
         return crimson::ct_error::input_output_error::make();
       }
 
@@ -161,13 +187,13 @@ Journal::replay_segment(
            FNAME,
            &handler](record_deltas_t& record_deltas)
         {
-          DEBUG("decoded {} deltas at block_base {}",
-                record_deltas.deltas.size(),
-                record_deltas.record_block_base);
           auto locator = record_locator_t{
             record_deltas.record_block_base,
             write_result
           };
+          DEBUG("processing {} deltas at block_base {}",
+                record_deltas.deltas.size(),
+                locator);
           return crimson::do_for_each(
             record_deltas.deltas,
             [locator,
@@ -216,6 +242,8 @@ Journal::replay_ret Journal::replay(
   std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
   delta_handler_t &&delta_handler)
 {
+  LOG_PREFIX(Journal::replay);
+  INFO("got {} segments", segment_headers.size());
   return seastar::do_with(
     std::move(delta_handler), replay_segments_t(),
     [this, segment_headers=std::move(segment_headers)]
@@ -223,8 +251,6 @@ Journal::replay_ret Journal::replay(
   {
     return prep_replay_segments(std::move(segment_headers)
     ).safe_then([this, &handler, &segments](auto replay_segs) mutable {
-      LOG_PREFIX(Journal::replay);
-      DEBUG("found {} segments", replay_segs.size());
       segments = std::move(replay_segs);
       return crimson::do_for_each(segments, [this, &handler](auto i) mutable {
         return replay_segment(i.first, i.second, handler);
@@ -235,6 +261,8 @@ Journal::replay_ret Journal::replay(
 
 void Journal::register_metrics()
 {
+  LOG_PREFIX(Journal::register_metrics);
+  DEBUG("");
   record_submitter.reset_stats();
   namespace sm = seastar::metrics;
   metrics.add_group(
@@ -300,9 +328,30 @@ Journal::JournalSegmentManager::JournalSegmentManager(
   reset();
 }
 
+Journal::JournalSegmentManager::open_ret
+Journal::JournalSegmentManager::open()
+{
+  return roll().safe_then([this] {
+    return get_current_write_seq();
+  });
+}
+
 Journal::JournalSegmentManager::close_ertr::future<>
 Journal::JournalSegmentManager::close()
 {
+  LOG_PREFIX(JournalSegmentManager::close);
+  if (current_journal_segment) {
+    INFO("segment_id={}, seq={}, "
+         "written_to={}, committed_to={}, nonce={}",
+         current_journal_segment->get_segment_id(),
+         get_segment_seq(),
+         written_to,
+         committed_to,
+         current_segment_nonce);
+  } else {
+    INFO("no current journal segment");
+  }
+
   return (
     current_journal_segment ?
     current_journal_segment->close() :
@@ -320,16 +369,26 @@ Journal::JournalSegmentManager::close()
 Journal::JournalSegmentManager::roll_ertr::future<>
 Journal::JournalSegmentManager::roll()
 {
+  LOG_PREFIX(JournalSegmentManager::roll);
   auto old_segment_id = current_journal_segment ?
     current_journal_segment->get_segment_id() :
     NULL_SEG_ID;
+  if (current_journal_segment) {
+    INFO("closing segment {}, seq={}, "
+         "written_to={}, committed_to={}, nonce={}",
+         old_segment_id,
+         get_segment_seq(),
+         written_to,
+         committed_to,
+         current_segment_nonce);
+  }
 
   return (
     current_journal_segment ?
     current_journal_segment->close() :
     Segment::close_ertr::now()
   ).safe_then([this] {
-    return segment_provider->get_segment(segment_manager.get_device_id());
+    return segment_provider->get_segment(get_device_id());
   }).safe_then([this](auto segment) {
     return segment_manager.open(segment);
   }).safe_then([this](auto sref) {
@@ -356,10 +415,7 @@ Journal::JournalSegmentManager::write(ceph::bufferlist to_write)
   LOG_PREFIX(JournalSegmentManager::write);
   auto write_length = to_write.length();
   auto write_start_seq = get_current_write_seq();
-  DEBUG("write_start {} => {}, length={}",
-        write_start_seq,
-        write_start_seq.offset.as_seg_paddr().get_segment_off() + write_length,
-        write_length);
+  TRACE("{}~{}", write_start_seq, write_length);
   assert(write_length > 0);
   assert((write_length % segment_manager.get_block_size()) == 0);
   assert(!needs_roll(write_length));
@@ -386,8 +442,7 @@ void Journal::JournalSegmentManager::mark_committed(
   const journal_seq_t& new_committed_to)
 {
   LOG_PREFIX(JournalSegmentManager::mark_committed);
-  DEBUG("committed_to {} => {}",
-        committed_to, new_committed_to);
+  TRACE("{} => {}", committed_to, new_committed_to);
   assert(committed_to == journal_seq_t() ||
          committed_to <= new_committed_to);
   committed_to = new_committed_to;
@@ -411,10 +466,7 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment)
     new_tail,
     current_segment_nonce,
     false};
-  DEBUG("segment_id {} journal_tail_target {}, header {}",
-        segment.get_segment_id(),
-        new_tail,
-        header);
+  INFO("writing {} ...", header);
   encode(header, bl);
 
   bufferptr bp(
@@ -436,17 +488,20 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment)
 Journal::RecordBatch::add_pending_ret
 Journal::RecordBatch::add_pending(
   record_t&& record,
+  OrderingHandle& handle,
   extent_len_t block_size)
 {
   LOG_PREFIX(RecordBatch::add_pending);
   auto new_size = get_encoded_length_after(record, block_size);
-  DEBUG("batches={}, write_size={}",
+  auto dlength_offset = pending.size.dlength;
+  TRACE("H{} batches={}, write_size={}, dlength_offset={} ...",
+        (void*)&handle,
         pending.get_size() + 1,
-        new_size.get_encoded_length());
+        new_size.get_encoded_length(),
+        dlength_offset);
   assert(state != state_t::SUBMITTING);
   assert(can_batch(record, block_size).value() == new_size);
 
-  auto dlength_offset = pending.size.dlength;
   pending.push_back(
       std::move(record), block_size);
   assert(pending.size == new_size);
@@ -459,9 +514,10 @@ Journal::RecordBatch::add_pending(
   state = state_t::PENDING;
 
   return io_promise->get_shared_future(
-  ).then([dlength_offset
+  ).then([dlength_offset, FNAME, &handle
          ](auto maybe_promise_result) -> add_pending_ret {
     if (!maybe_promise_result.has_value()) {
+      ERROR("H{} write failed", (void*)&handle);
       return crimson::ct_error::input_output_error::make();
     }
     auto write_result = maybe_promise_result->write_result;
@@ -470,6 +526,7 @@ Journal::RecordBatch::add_pending(
           maybe_promise_result->mdlength + dlength_offset),
       write_result
     };
+    TRACE("H{} write finish with {}", (void*)&handle, submit_result);
     return add_pending_ret(
       add_pending_ertr::ready_future_marker{},
       submit_result);
@@ -481,10 +538,6 @@ Journal::RecordBatch::encode_batch(
   const journal_seq_t& committed_to,
   segment_nonce_t segment_nonce)
 {
-  LOG_PREFIX(RecordBatch::encode_batch);
-  DEBUG("batches={}, committed_to={}",
-        pending.get_size(),
-        committed_to);
   assert(state == state_t::PENDING);
   assert(pending.get_size() > 0);
   assert(io_promise.has_value());
@@ -503,20 +556,13 @@ Journal::RecordBatch::encode_batch(
 void Journal::RecordBatch::set_result(
   maybe_result_t maybe_write_result)
 {
-  LOG_PREFIX(RecordBatch::set_result);
   maybe_promise_result_t result;
   if (maybe_write_result.has_value()) {
-    DEBUG("batches={}, write_start {} + {}",
-          submitting_size,
-          maybe_write_result->start_seq,
-          maybe_write_result->length);
     assert(maybe_write_result->length == submitting_length);
     result = promise_result_t{
       *maybe_write_result,
       submitting_mdlength
     };
-  } else {
-    ERROR("batches={}, write is failed!", submitting_size);
   }
   assert(state == state_t::SUBMITTING);
   assert(io_promise.has_value());
@@ -536,9 +582,8 @@ Journal::RecordBatch::submit_pending_fast(
   const journal_seq_t& committed_to,
   segment_nonce_t segment_nonce)
 {
-  LOG_PREFIX(RecordBatch::submit_pending_fast);
   auto new_size = get_encoded_length_after(record, block_size);
-  DEBUG("write_size={}", new_size.get_encoded_length());
+  std::ignore = new_size;
   assert(state == state_t::EMPTY);
   assert(can_batch(record, block_size).value() == new_size);
 
@@ -585,6 +630,7 @@ Journal::RecordSubmitter::submit(
   OrderingHandle& handle)
 {
   LOG_PREFIX(RecordSubmitter::submit);
+  DEBUG("H{} {} start ...", (void*)&handle, record);
   assert(write_pipeline);
   auto expected_size = record_group_size_t(
       record.size,
@@ -592,10 +638,8 @@ Journal::RecordSubmitter::submit(
   ).get_encoded_length();
   auto max_record_length = journal_segment_manager.get_max_write_length();
   if (expected_size > max_record_length) {
-    ERROR("record size {} exceeds max {}",
-          expected_size,
-          max_record_length
-    );
+    ERROR("H{} {} exceeds max record size {}",
+          (void*)&handle, record, max_record_length);
     return crimson::ct_error::erange::make();
   }
 
@@ -615,18 +659,33 @@ void Journal::RecordSubmitter::update_state()
   }
 }
 
+void Journal::RecordSubmitter::decrement_io_with_flush()
+{
+  LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
+  assert(num_outstanding_io > 0);
+  --num_outstanding_io;
+#ifndef NDEBUG
+  auto prv_state = state;
+#endif
+  update_state();
+
+  if (wait_submit_promise.has_value()) {
+    DEBUG("wait resolved");
+    assert(prv_state == state_t::FULL);
+    wait_submit_promise->set_value();
+    wait_submit_promise.reset();
+  }
+
+  if (!p_current_batch->is_empty()) {
+    TRACE("flush");
+    flush_current_batch();
+  }
+}
+
 void Journal::RecordSubmitter::account_submission(
   std::size_t num,
   const record_group_size_t& size)
 {
-  LOG_PREFIX(RecordSubmitter::account_submission);
-  DEBUG("Journal::RecordSubmitter: submitting {} records, "
-        "mdsize={}, dsize={}, fillness={}",
-        num,
-        size.get_raw_mdlength(),
-        size.dlength,
-        ((double)(size.get_raw_mdlength() + size.dlength) /
-        (size.get_mdlength() + size.dlength)));
   stats.record_group_padding_bytes +=
     (size.get_mdlength() - size.get_raw_mdlength());
   stats.record_group_metadata_bytes += size.get_raw_mdlength();
@@ -653,20 +712,23 @@ void Journal::RecordSubmitter::flush_current_batch()
 
   increment_io();
   auto num = p_batch->get_num_records();
+  auto committed_to = journal_segment_manager.get_committed_to();
   auto [to_write, sizes] = p_batch->encode_batch(
-    journal_segment_manager.get_committed_to(),
-    journal_segment_manager.get_nonce());
+    committed_to, journal_segment_manager.get_nonce());
+  DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...",
+        num, sizes, committed_to, num_outstanding_io);
   account_submission(num, sizes);
   std::ignore = journal_segment_manager.write(to_write
-  ).safe_then([this, p_batch](auto write_result) {
+  ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
+    TRACE("{} records, {}, write done with {}", num, sizes, write_result);
     finish_submit_batch(p_batch, write_result);
   }).handle_error(
-    crimson::ct_error::all_same_way([this, p_batch, FNAME](auto e) {
-      ERROR("got error {}", e);
+    crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+      ERROR("{} records, {}, got error {}", num, sizes, e);
       finish_submit_batch(p_batch, std::nullopt);
     })
-  ).handle_exception([this, p_batch, FNAME](auto e) {
-    ERROR("got exception {}", e);
+  ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+    ERROR("{} records, {}, got exception {}", num, sizes, e);
     finish_submit_batch(p_batch, std::nullopt);
   });
 }
@@ -677,19 +739,23 @@ Journal::RecordSubmitter::submit_pending(
   OrderingHandle& handle,
   bool flush)
 {
+  LOG_PREFIX(RecordSubmitter::submit_pending);
   assert(!p_current_batch->is_submitting());
   stats.record_batch_stats.increment(
       p_current_batch->get_num_records() + 1);
   bool do_flush = (flush || state == state_t::IDLE);
-  auto write_fut = [this, do_flush, record=std::move(record)]() mutable {
+  auto write_fut = [this, do_flush, FNAME, record=std::move(record), &handle]() mutable {
     if (do_flush && p_current_batch->is_empty()) {
       // fast path with direct write
       increment_io();
+      auto committed_to = journal_segment_manager.get_committed_to();
       auto [to_write, sizes] = p_current_batch->submit_pending_fast(
         std::move(record),
         journal_segment_manager.get_block_size(),
-        journal_segment_manager.get_committed_to(),
+        committed_to,
         journal_segment_manager.get_nonce());
+      DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...",
+            (void*)&handle, sizes, committed_to, num_outstanding_io);
       account_submission(1, sizes);
       return journal_segment_manager.write(to_write
       ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
@@ -703,9 +769,15 @@ Journal::RecordSubmitter::submit_pending(
     } else {
       // indirect write with or without the existing pending records
       auto write_fut = p_current_batch->add_pending(
-        std::move(record), journal_segment_manager.get_block_size());
+        std::move(record),
+        handle,
+        journal_segment_manager.get_block_size());
       if (do_flush) {
+        DEBUG("H{} added pending and flush", (void*)&handle);
         flush_current_batch();
+      } else {
+        DEBUG("H{} added with {} pending",
+              (void*)&handle, p_current_batch->get_num_records());
       }
       return write_fut;
     }
@@ -713,9 +785,10 @@ Journal::RecordSubmitter::submit_pending(
   return handle.enter(write_pipeline->device_submission
   ).then([write_fut=std::move(write_fut)]() mutable {
     return std::move(write_fut);
-  }).safe_then([this, &handle](auto submit_result) {
+  }).safe_then([this, FNAME, &handle](auto submit_result) {
     return handle.enter(write_pipeline->finalize
-    ).then([this, submit_result] {
+    ).then([this, FNAME, submit_result, &handle] {
+      DEBUG("H{} finish with {}", (void*)&handle, submit_result);
       journal_segment_manager.mark_committed(
           submit_result.write_result.get_end_seq());
       return submit_result;
@@ -728,6 +801,9 @@ Journal::RecordSubmitter::do_submit(
   record_t&& record,
   OrderingHandle& handle)
 {
+  LOG_PREFIX(RecordSubmitter::do_submit);
+  TRACE("H{} outstanding_io={}/{} ...",
+        (void*)&handle, num_outstanding_io, io_depth_limit);
   assert(!p_current_batch->is_submitting());
   if (state <= state_t::PENDING) {
     // can increment io depth
@@ -737,13 +813,17 @@ Journal::RecordSubmitter::do_submit(
     if (!maybe_new_size.has_value() ||
         (maybe_new_size->get_encoded_length() >
          journal_segment_manager.get_max_write_length())) {
+      TRACE("H{} flush", (void*)&handle);
       assert(p_current_batch->is_pending());
       flush_current_batch();
       return do_submit(std::move(record), handle);
     } else if (journal_segment_manager.needs_roll(
           maybe_new_size->get_encoded_length())) {
       if (p_current_batch->is_pending()) {
+        TRACE("H{} flush and roll", (void*)&handle);
         flush_current_batch();
+      } else {
+        TRACE("H{} roll", (void*)&handle);
       }
       return journal_segment_manager.roll(
       ).safe_then([this, record=std::move(record), &handle]() mutable {
@@ -768,6 +848,7 @@ Journal::RecordSubmitter::do_submit(
     if (!wait_submit_promise.has_value()) {
       wait_submit_promise = seastar::promise<>();
     }
+    DEBUG("H{} wait ...", (void*)&handle);
     return wait_submit_promise->get_future(
     ).then([this, record=std::move(record), &handle]() mutable {
       return do_submit(std::move(record), handle);
diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h
index f64472b963ea4ef537425957c841620e3076596f..c7be7cbab2bb780d24a1e6076e4130006eafbe17 100644
--- a/src/crimson/os/seastore/journal.h
+++ b/src/crimson/os/seastore/journal.h
@@ -63,9 +63,7 @@ public:
     crimson::ct_error::input_output_error
     >;
   using open_for_write_ret = open_for_write_ertr::future<journal_seq_t>;
-  open_for_write_ret open_for_write() {
-    return journal_segment_manager.open();
-  }
+  open_for_write_ret open_for_write();
 
   /**
    * close journal
@@ -74,10 +72,7 @@ public:
    */
   using close_ertr = crimson::errorator<
     crimson::ct_error::input_output_error>;
-  close_ertr::future<> close() {
-    metrics.clear();
-    return journal_segment_manager.close();
-  }
+  close_ertr::future<> close();
 
   /**
    * submit_record
@@ -130,6 +125,10 @@ private:
                      size_t(segment_manager.get_block_size()));
     }
 
+    device_id_t get_device_id() const {
+      return segment_manager.get_device_id();
+    }
+
     segment_off_t get_block_size() const {
       return segment_manager.get_block_size();
     }
@@ -156,11 +155,7 @@ private:
 
     using open_ertr = base_ertr;
     using open_ret = open_ertr::future<journal_seq_t>;
-    open_ret open() {
-      return roll().safe_then([this] {
-        return get_current_write_seq();
-      });
-    }
+    open_ret open();
 
     using close_ertr = base_ertr;
     close_ertr::future<> close();
@@ -286,6 +281,7 @@ private:
     using add_pending_ret = add_pending_ertr::future<record_locator_t>;
     add_pending_ret add_pending(
         record_t&&,
+        OrderingHandle&,
         extent_len_t block_size);
 
     // Encode the batched records for write.
@@ -401,24 +397,7 @@ private:
       update_state();
     }
 
-    void decrement_io_with_flush() {
-      assert(num_outstanding_io > 0);
-      --num_outstanding_io;
-#ifndef NDEBUG
-      auto prv_state = state;
-#endif
-      update_state();
-
-      if (wait_submit_promise.has_value()) {
-        assert(prv_state == state_t::FULL);
-        wait_submit_promise->set_value();
-        wait_submit_promise.reset();
-      }
-
-      if (!p_current_batch->is_empty()) {
-        flush_current_batch();
-      }
-    }
+    void decrement_io_with_flush();
 
     void pop_free_batch() {
       assert(p_current_batch == nullptr);
diff --git a/src/crimson/os/seastore/seastore_types.cc b/src/crimson/os/seastore/seastore_types.cc
index 5846fa8b585832efbd84e774be124465ff6ac6fe..8befb9f1eed4a31802dd37cb4cfa27c466b60770 100644
--- a/src/crimson/os/seastore/seastore_types.cc
+++ b/src/crimson/os/seastore/seastore_types.cc
@@ -427,8 +427,6 @@ try_decode_extent_infos(
 {
   auto maybe_headers = try_decode_record_headers(header, md_bl);
   if (!maybe_headers) {
-    journal_logger().debug(
-        "try_decode_extent_infos: failed, cannot decode record headers.");
     return std::nullopt;
   }
 
@@ -468,8 +466,6 @@ try_decode_deltas(
 {
   auto maybe_record_extent_infos = try_decode_extent_infos(header, md_bl);
   if (!maybe_record_extent_infos) {
-    journal_logger().debug(
-        "try_decode_deltas: failed, cannot decode extent_infos.");
     return std::nullopt;
   }