]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/record_submitter: refactor JournalAllocator::write()
authorYingxin Cheng <yingxin.cheng@intel.com>
Tue, 16 Jul 2024 02:36:57 +0000 (10:36 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 5 Aug 2024 03:14:06 +0000 (11:14 +0800)
Decouple get_written_to() so it can be queried upon writes.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/journal/circular_journal_space.cc
src/crimson/os/seastore/journal/circular_journal_space.h
src/crimson/os/seastore/journal/record_submitter.cc
src/crimson/os/seastore/journal/record_submitter.h
src/crimson/os/seastore/journal/segment_allocator.cc
src/crimson/os/seastore/journal/segment_allocator.h

index a26b5a652d5e64c6f758ccd5dd1c9a3939a3a604..458042ee36a2c4c8c8aecdfeb26785ac2378b2c2 100644 (file)
@@ -49,7 +49,7 @@ CircularJournalSpace::roll_ertr::future<> CircularJournalSpace::roll() {
   return roll_ertr::now();
 }
 
-CircularJournalSpace::write_ret
+CircularJournalSpace::write_ertr::future<>
 CircularJournalSpace::write(ceph::bufferlist&& to_write) {
   LOG_PREFIX(CircularJournalSpace::write);
   assert(get_written_to().segment_seq != NULL_SEG_SEQ);
@@ -60,7 +60,6 @@ CircularJournalSpace::write(ceph::bufferlist&& to_write) {
   assert(encoded_size + get_rbm_addr(get_written_to())
         < get_journal_end());
 
-  journal_seq_t j_seq = get_written_to();
   auto target = get_rbm_addr(get_written_to());
   auto new_written_to = target + encoded_size;
   assert(new_written_to < get_journal_end());
@@ -69,21 +68,11 @@ CircularJournalSpace::write(ceph::bufferlist&& to_write) {
     get_device_id());
   set_written_to(
     journal_seq_t{get_written_to().segment_seq, paddr});
-  DEBUG("{}, target {}", to_write.length(), target);
+  DEBUG("length {}, commit target {}, used_size {}",
+        encoded_size, target, get_records_used_size());
 
-  auto write_result = write_result_t{
-    j_seq,
-    encoded_size
-  };
   return device_write_bl(target, to_write
-  ).safe_then([this, target,
-    length=encoded_size,
-    write_result,
-    FNAME] {
-    DEBUG("commit target {} used_size {} written length {}",
-          target, get_records_used_size(), length);
-    return write_result;
-  }).handle_error(
+  ).handle_error(
     write_ertr::pass_further{},
     crimson::ct_error::assert_all{ "Invalid error" }
   );
index 2e1cb7ae8df63c521b50b72389c68a0c522b4cde..920b5d78d30a8862cabe5a384b67b325cb66ecec 100644 (file)
@@ -46,7 +46,11 @@ class CircularJournalSpace : public JournalAllocator {
 
   roll_ertr::future<> roll() final;
 
-  write_ret write(ceph::bufferlist&& to_write) final;
+  journal_seq_t get_written_to() const final {
+    return written_to;
+  }
+
+  write_ertr::future<> write(ceph::bufferlist&& to_write) final;
 
   void update_modify_time(record_t& record) final {}
 
@@ -140,9 +144,6 @@ class CircularJournalSpace : public JournalAllocator {
    *
    */
 
-  journal_seq_t get_written_to() const {
-    return written_to;
-  }
   rbm_abs_addr get_rbm_addr(journal_seq_t seq) const {
     return convert_paddr_to_abs_addr(seq.offset);
   }
index d784c33cfc36837d2e3a1a8a9f178283501bf546..8d71cb994a32132912656f79d277d21f48c810f3 100644 (file)
@@ -311,11 +311,14 @@ RecordSubmitter::submit(
       journal_allocator.get_nonce());
     DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
           get_name(), sizes, get_committed_to(), num_outstanding_io);
+    write_result_t result{
+        journal_allocator.get_written_to(),
+        to_write.length()};
     return journal_allocator.write(std::move(to_write)
-    ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
+    ).safe_then([mdlength=sizes.get_mdlength(), result] {
       return record_locator_t{
-        write_result.start_seq.offset.add_offset(mdlength),
-        write_result
+        result.start_seq.offset.add_offset(mdlength),
+        result
       };
     }).finally([this] {
       decrement_io_with_flush();
@@ -533,11 +536,14 @@ void RecordSubmitter::flush_current_batch()
   // Note: rg is cleared
   DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
         get_name(), num, sizes, get_committed_to(), num_outstanding_io);
+  write_result_t result{
+      journal_allocator.get_written_to(),
+      to_write.length()};
   std::ignore = journal_allocator.write(std::move(to_write)
-  ).safe_then([this, p_batch, FNAME, num, sizes](auto write_result) {
+  ).safe_then([this, p_batch, FNAME, num, sizes, result] {
     TRACE("{} {} records, {}, write done with {}",
-          get_name(), num, sizes, write_result);
-    finish_submit_batch(p_batch, write_result);
+          get_name(), num, sizes, result);
+    finish_submit_batch(p_batch, result);
   }).handle_error(
     crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes](auto e) {
       ERROR("{} {} records, {}, got error {}",
index d69a5ac96f09421998aaf055f8a44cbd19c6a846..62047f6bdcd760cc1438a0bfe3d43bbba6804f93 100644 (file)
@@ -36,9 +36,10 @@ public:
 
   virtual segment_nonce_t get_nonce() const  = 0;
 
+  virtual journal_seq_t get_written_to() const = 0;
+
   using write_ertr = base_ertr;
-  using write_ret = write_ertr::future<write_result_t>;
-  virtual write_ret write(ceph::bufferlist&& to_write) = 0;
+  virtual write_ertr::future<> write(ceph::bufferlist&& to_write) = 0;
 
   virtual bool can_write() const = 0;
   
index 61e1be585c8eec4d62d46d8593c2ccf6ff110acd..adcea48cecc4744cd7be4a0556439f5ea20b64ff 100644 (file)
@@ -168,27 +168,32 @@ SegmentAllocator::roll()
   });
 }
 
-SegmentAllocator::write_ret
+journal_seq_t
+SegmentAllocator::get_written_to() const
+{
+  return journal_seq_t{
+    segment_provider.get_seg_info(
+      current_segment->get_segment_id()).seq,
+    paddr_t::make_seg_paddr(
+      current_segment->get_segment_id(),
+      written_to)
+  };
+}
+
+SegmentAllocator::write_ertr::future<>
 SegmentAllocator::write(ceph::bufferlist&& to_write)
 {
   LOG_PREFIX(SegmentAllocator::write);
   assert(can_write());
   auto write_length = to_write.length();
   auto write_start_offset = written_to;
-  auto write_start_seq = journal_seq_t{
-    segment_provider.get_seg_info(current_segment->get_segment_id()).seq,
-    paddr_t::make_seg_paddr(
-      current_segment->get_segment_id(), write_start_offset)
-  };
-  TRACE("{} {}~{}", print_name, write_start_seq, write_length);
+  if (unlikely(LOCAL_LOGGER.is_enabled(seastar::log_level::trace))) {
+    TRACE("{} {}~{}", print_name, get_written_to(), write_length);
+  }
   assert(write_length > 0);
   assert((write_length % get_block_size()) == 0);
   assert(!needs_roll(write_length));
 
-  auto write_result = write_result_t{
-    write_start_seq,
-    write_length
-  };
   written_to += write_length;
   segment_provider.update_segment_avail_bytes(
     type,
@@ -202,9 +207,7 @@ SegmentAllocator::write(ceph::bufferlist&& to_write)
     crimson::ct_error::assert_all{
       "Invalid error in SegmentAllocator::write"
     }
-  ).safe_then([write_result, cs=current_segment] {
-    return write_result;
-  });
+  ).finally([cs=current_segment] {});
 }
 
 SegmentAllocator::close_ertr::future<>
index 292c23070ba74e57925a080ae55b67d5af96b5f0..f86c7be048da9c79337b842489d7bcc6eedf27cd 100644 (file)
@@ -84,11 +84,13 @@ class SegmentAllocator : public JournalAllocator {
   // close the current segment and initialize next one
   roll_ertr::future<> roll() final;
 
+  journal_seq_t get_written_to() const final;
+
   // write the buffer, return the write result
   //
   // May be called concurrently, but writes may complete in any order.
   // If rolling/opening, no write is allowed.
-  write_ret write(ceph::bufferlist&& to_write) final;
+  write_ertr::future<> write(ceph::bufferlist&& to_write) final;
 
   using close_ertr = base_ertr;
   close_ertr::future<> close() final;