return ret;
}
-record_t Cache::prepare_record(Transaction &t)
+record_t Cache::prepare_record(
+ Transaction &t,
+ SegmentProvider *cleaner)
{
LOG_PREFIX(Cache::prepare_record);
SUBTRACET(seastore_t, "enter", t);
0,
0,
t.root->get_version() - 1,
+ MAX_SEG_SEQ,
std::move(delta_bl)
});
} else {
final_crc,
(seastore_off_t)i->get_length(),
i->get_version() - 1,
+ cleaner
+ ? cleaner->get_seq(i->get_paddr().as_seg_paddr().get_segment_id())
+ : MAX_SEG_SEQ,
std::move(delta_bl)
});
i->last_committed_crc = final_crc;
return;
}
- TRACE("replay extent delta at {} {} ... -- {}, prv_extent={}",
+ DEBUG("replay extent delta at {} {} ... -- {}, prv_extent={}",
journal_seq, record_base, delta, *extent);
assert(extent->version == delta.pversion);
i != dirty.end() && bytes_so_far < max_bytes;
++i) {
auto dirty_from = i->get_dirty_from();
+ if (!(dirty_from != JOURNAL_SEQ_NULL &&
+ dirty_from != JOURNAL_SEQ_MAX &&
+ dirty_from != NO_DELTAS))
+ ERRORT("{}", *i);
ceph_assert(dirty_from != JOURNAL_SEQ_NULL &&
dirty_from != JOURNAL_SEQ_MAX &&
dirty_from != NO_DELTAS);
* Construct the record for Journal from transaction.
*/
record_t prepare_record(
- Transaction &t ///< [in, out] current transaction
+ Transaction &t, ///< [in, out] current transaction
+ SegmentProvider *cleaner
);
/**
SegmentedAllocator::SegmentedAllocator(
SegmentProvider& sp,
- SegmentManager& sm)
- : cold_writer{"COLD", sp, sm},
- rewrite_writer{"REWRITE", sp, sm}
-{
-}
+ SegmentManager& sm,
+ SegmentSeqAllocator &ssa)
+ : cold_writer{"COLD", sp, sm, ssa},
+ rewrite_writer{"REWRITE", sp, sm, ssa}
+{}
SegmentedAllocator::Writer::Writer(
std::string name,
SegmentProvider& sp,
- SegmentManager& sm)
- : segment_allocator(name, segment_type_t::OOL, sp, sm),
+ SegmentManager& sm,
+ SegmentSeqAllocator &ssa)
+ : segment_allocator(name, segment_type_t::OOL, sp, sm, ssa),
record_submitter(crimson::common::get_conf<uint64_t>(
"seastore_journal_iodepth_limit"),
crimson::common::get_conf<uint64_t>(
return record_submitter.submit(std::move(record)
).safe_then([this, FNAME, &t, extents=std::move(extents)
](record_locator_t ret) mutable {
- assert(ret.write_result.start_seq.segment_seq == OOL_SEG_SEQ);
DEBUGT("{} finish with {} and {} extents",
t, segment_allocator.get_name(),
ret, extents.size());
class SegmentedAllocator : public ExtentAllocator {
class Writer : public ExtentOolWriter {
public:
- Writer(std::string name, SegmentProvider& sp, SegmentManager& sm);
+ Writer(std::string name,
+ SegmentProvider& sp,
+ SegmentManager& sm,
+ SegmentSeqAllocator &ssa);
Writer(Writer &&) = default;
open_ertr::future<> open() final {
public:
SegmentedAllocator(
SegmentProvider& sp,
- SegmentManager& sm);
+ SegmentManager& sm,
+ SegmentSeqAllocator &ssa);
Writer &get_writer(placement_hint_t hint) {
assert(hint >= placement_hint_t::COLD);
#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/os/seastore/segment_seq_allocator.h"
namespace crimson::os::seastore {
virtual replay_ret replay(
delta_handler_t &&delta_handler) = 0;
+ virtual SegmentSeqAllocator& get_segment_seq_allocator() = 0;
+
virtual ~Journal() {}
};
using JournalRef = std::unique_ptr<Journal>;
std::string name,
segment_type_t type,
SegmentProvider &sp,
- SegmentManager &sm)
+ SegmentManager &sm,
+ SegmentSeqAllocator &ssa)
: name{name},
type{type},
segment_provider{sp},
- segment_manager{sm}
+ segment_manager{sm},
+ segment_seq_allocator(ssa)
{
ceph_assert(type != segment_type_t::NULL_SEG);
std::ostringstream oss;
reset();
}
-void SegmentAllocator::set_next_segment_seq(segment_seq_t seq)
-{
- LOG_PREFIX(SegmentAllocator::set_next_segment_seq);
- INFO("{} next_segment_seq={}",
- print_name, segment_seq_printer_t{seq});
- assert(type == segment_seq_to_type(seq));
- next_segment_seq = seq;
-}
-
SegmentAllocator::open_ret
SegmentAllocator::do_open()
{
LOG_PREFIX(SegmentAllocator::do_open);
ceph_assert(!current_segment);
- segment_seq_t new_segment_seq = get_new_segment_seq_and_increment();
+ segment_seq_t new_segment_seq =
+ segment_seq_allocator.get_and_inc_next_segment_seq();
+ auto meta = segment_manager.get_meta();
+ current_segment_nonce = ceph_crc32c(
+ new_segment_seq,
+ reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
+ sizeof(meta.seastore_id.uuid));
auto new_segment_id = segment_provider.get_segment(
- get_device_id(), new_segment_seq);
+ get_device_id(), new_segment_seq, type);
return segment_manager.open(new_segment_id
).handle_error(
open_ertr::pass_further{},
new_segment_seq,
segment_id,
new_journal_tail,
- current_segment_nonce};
+ current_segment_nonce,
+ type};
INFO("{} writing header to new segment ... -- {}",
print_name, header);
}
DEBUG("{} rolled new segment id={}",
print_name, current_segment->get_segment_id());
- ceph_assert(new_journal_seq.segment_seq == get_current_segment_seq());
+ ceph_assert(new_journal_seq.segment_seq ==
+ segment_provider.get_seq(current_segment->get_segment_id()));
return new_journal_seq;
});
});
auto write_length = to_write.length();
auto write_start_offset = written_to;
auto write_start_seq = journal_seq_t{
- get_current_segment_seq(),
+ segment_provider.get_seq(current_segment->get_segment_id()),
paddr_t::make_seg_paddr(
current_segment->get_segment_id(), write_start_offset)
};
// Note: make sure no one can access the current segment once closing
auto seg_to_close = std::move(current_segment);
auto close_segment_id = seg_to_close->get_segment_id();
- INFO("{} close segment id={}, seq={}, written_to={}, nonce={}",
- print_name,
- close_segment_id,
- segment_seq_printer_t{get_current_segment_seq()},
- written_to,
- current_segment_nonce);
if (is_rolling) {
segment_provider.close_segment(close_segment_id);
}
+ segment_seq_t cur_segment_seq =
+ segment_provider.get_seq(seg_to_close->get_segment_id());
journal_seq_t cur_journal_tail;
if (type == segment_type_t::JOURNAL) {
cur_journal_tail = segment_provider.get_journal_tail_target();
cur_journal_tail = NO_DELTAS;
}
auto tail = segment_tail_t{
- get_current_segment_seq(),
+ segment_provider.get_seq(close_segment_id),
close_segment_id,
cur_journal_tail,
current_segment_nonce,
+ type,
segment_provider.get_last_modified(
close_segment_id).time_since_epoch().count(),
segment_provider.get_last_rewritten(
close_segment_id).time_since_epoch().count()};
ceph::bufferlist bl;
encode(tail, bl);
+ INFO("{} close segment id={}, seq={}, written_to={}, nonce={}, journal_tail={}",
+ print_name,
+ close_segment_id,
+ cur_segment_seq,
+ written_to,
+ current_segment_nonce,
+ tail.journal_tail);
bufferptr bp(
ceph::buffer::create_page_aligned(
#include "crimson/common/errorator.h"
#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/segment_seq_allocator.h"
namespace crimson::os::seastore {
class SegmentProvider;
SegmentAllocator(std::string name,
segment_type_t type,
SegmentProvider &sp,
- SegmentManager &sm);
+ SegmentManager &sm,
+ SegmentSeqAllocator &ssa);
const std::string& get_name() const {
return print_name;
return written_to;
}
- void set_next_segment_seq(segment_seq_t);
-
// returns true iff the current segment has insufficient space
bool needs_roll(std::size_t length) const {
assert(can_write());
current_segment.reset();
written_to = 0;
- // segment type related special handling
- reset_segment_seq();
current_segment_nonce = 0;
}
using close_segment_ertr = base_ertr;
close_segment_ertr::future<> close_segment(bool is_rolling);
- /*
- * segment type related special handling
- */
-
- void reset_segment_seq() {
- if (type == segment_type_t::JOURNAL) {
- next_segment_seq = 0;
- } else { // OOL
- next_segment_seq = OOL_SEG_SEQ;
- }
- }
-
- segment_seq_t get_current_segment_seq() const {
- segment_seq_t ret;
- if (type == segment_type_t::JOURNAL) {
- assert(next_segment_seq != 0);
- ret = next_segment_seq - 1;
- } else { // OOL
- ret = next_segment_seq;
- }
- assert(segment_seq_to_type(ret) == type);
- return ret;
- }
-
- segment_seq_t get_new_segment_seq_and_increment() {
- segment_seq_t new_segment_seq;
- if (type == segment_type_t::JOURNAL) {
- new_segment_seq = next_segment_seq++;
- auto meta = segment_manager.get_meta();
- current_segment_nonce = ceph_crc32c(
- new_segment_seq,
- reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
- sizeof(meta.seastore_id.uuid));
- } else { // OOL
- new_segment_seq = next_segment_seq;
- assert(current_segment_nonce == 0);
- }
- assert(new_segment_seq == get_current_segment_seq());
- ceph_assert(segment_seq_to_type(new_segment_seq) == type);
- return new_segment_seq;
- }
-
const std::string name;
// device id is not available during construction,
// so generate the print_name later.
SegmentManager &segment_manager;
SegmentRef current_segment;
seastore_off_t written_to;
-
- // segment type related special handling
- segment_seq_t next_segment_seq;
+ SegmentSeqAllocator &segment_seq_allocator;
segment_nonce_t current_segment_nonce;
//3. journal tail written to both segment_header_t and segment_tail_t
};
ExtentReader &scanner,
SegmentProvider &segment_provider)
: segment_provider(segment_provider),
+ segment_seq_allocator(new SegmentSeqAllocator),
journal_segment_allocator("JOURNAL",
segment_type_t::JOURNAL,
segment_provider,
- segment_manager),
+ segment_manager,
+ *segment_seq_allocator),
record_submitter(crimson::common::get_conf<uint64_t>(
"seastore_journal_iodepth_limit"),
crimson::common::get_conf<uint64_t>(
rt.second.journal_segment_seq;
});
- journal_segment_allocator.set_next_segment_seq(
+ segment_seq_allocator->set_next_segment_seq(
segments.rbegin()->second.journal_segment_seq + 1);
std::for_each(
segments.begin(),
INFO("starting at {} -- {}", seq, header);
return seastar::do_with(
scan_valid_records_cursor(seq),
- ExtentReader::found_record_handler_t([=, &handler](
+ ExtentReader::found_record_handler_t(
+ [s_type=header.type, &handler, this](
record_locator_t locator,
const record_group_header_t& header,
const bufferlist& mdbuf)
-> ExtentReader::scan_valid_records_ertr::future<>
{
+ LOG_PREFIX(Journal::replay_segment);
auto maybe_record_deltas_list = try_decode_deltas(
header, mdbuf, locator.record_block_base);
if (!maybe_record_deltas_list) {
return seastar::do_with(
std::move(*maybe_record_deltas_list),
[write_result=locator.write_result,
+ s_type,
this,
FNAME,
&handler](auto& record_deltas_list)
return crimson::do_for_each(
record_deltas_list,
[write_result,
+ s_type,
this,
FNAME,
&handler](record_deltas_t& record_deltas)
return crimson::do_for_each(
record_deltas.deltas,
[locator,
+ s_type,
this,
FNAME,
&handler](auto &p)
if (delta.paddr != P_ADDR_NULL) {
auto& seg_addr = delta.paddr.as_seg_paddr();
auto delta_paddr_segment_seq = segment_provider.get_seq(seg_addr.get_segment_id());
- auto delta_paddr_segment_type = segment_seq_to_type(delta_paddr_segment_seq);
- auto locator_segment_seq = locator.write_result.start_seq.segment_seq;
- if (delta_paddr_segment_type == segment_type_t::NULL_SEG ||
- (delta_paddr_segment_type == segment_type_t::JOURNAL &&
- delta_paddr_segment_seq > locator_segment_seq)) {
+ if (s_type == segment_type_t::NULL_SEG ||
+ (delta_paddr_segment_seq != delta.ext_seq)) {
SUBDEBUG(seastore_cache,
- "delta is obsolete, delta_paddr_segment_seq={}, locator_segment_seq={} -- {}",
+ "delta is obsolete, delta_paddr_segment_seq={}, -- {}",
segment_seq_printer_t{delta_paddr_segment_seq},
- segment_seq_printer_t{locator_segment_seq},
delta);
+ assert(delta_paddr_segment_seq > delta.ext_seq);
return replay_ertr::now();
}
}
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/osd/exceptions.h"
#include "segment_allocator.h"
+#include "crimson/os/seastore/segment_seq_allocator.h"
namespace crimson::os::seastore::journal {
-
/**
* Manages stream of atomically written records to a SegmentManager.
*/
write_pipeline = _write_pipeline;
}
+ SegmentSeqAllocator& get_segment_seq_allocator() final {
+ return *segment_seq_allocator;
+ }
private:
submit_record_ret do_submit_record(
record_t &&record,
);
SegmentProvider& segment_provider;
+ SegmentSeqAllocatorRef segment_seq_allocator;
SegmentAllocator journal_segment_allocator;
RecordSubmitter record_submitter;
ExtentReader& scanner;
}
}
-segment_type_t segment_seq_to_type(segment_seq_t seq)
-{
- if (seq <= MAX_VALID_SEG_SEQ) {
- return segment_type_t::JOURNAL;
- } else if (seq == OOL_SEG_SEQ) {
- return segment_type_t::OOL;
- } else {
- assert(seq == NULL_SEG_SEQ);
- return segment_type_t::NULL_SEG;
- }
-}
-
std::ostream& operator<<(std::ostream& out, segment_seq_printer_t seq)
{
- auto type = segment_seq_to_type(seq.seq);
- switch(type) {
- case segment_type_t::JOURNAL:
+ if (seq.seq == NULL_SEG_SEQ) {
+ return out << "NULL_SEG_SEQ";
+ } else {
+ assert(seq.seq <= MAX_VALID_SEG_SEQ);
return out << seq.seq;
- default:
- return out << type;
}
}
<< ", final_crc: " << delta.final_crc
<< ", length: " << delta.length
<< ", pversion: " << delta.pversion
+ << ", ext_seq: " << delta.ext_seq
<< ")";
}
<< ", segment_id=" << header.physical_segment_id
<< ", journal_tail=" << header.journal_tail
<< ", segment_nonce=" << header.segment_nonce
+ << ", type=" << header.type
<< ")";
}
static constexpr segment_seq_t MAX_SEG_SEQ =
std::numeric_limits<segment_seq_t>::max();
static constexpr segment_seq_t NULL_SEG_SEQ = MAX_SEG_SEQ;
-static constexpr segment_seq_t OOL_SEG_SEQ = MAX_SEG_SEQ - 1;
static constexpr segment_seq_t MAX_VALID_SEG_SEQ = MAX_SEG_SEQ - 2;
enum class segment_type_t {
std::ostream& operator<<(std::ostream& out, segment_type_t t);
-segment_type_t segment_seq_to_type(segment_seq_t seq);
-
struct segment_seq_printer_t {
segment_seq_t seq;
};
return {segment_seq, offset.add_offset(o)};
}
- segment_type_t get_type() const {
- return segment_seq_to_type(segment_seq);
- }
-
DENC(journal_seq_t, v, p) {
DENC_START(1, 1, p);
denc(v.segment_seq, p);
uint32_t final_crc = 0;
seastore_off_t length = NULL_SEG_OFF; ///< extent length
extent_version_t pversion; ///< prior version
+ segment_seq_t ext_seq; ///< seq of the extent's segment
ceph::bufferlist bl; ///< payload
DENC(delta_info_t, v, p) {
denc(v.final_crc, p);
denc(v.length, p);
denc(v.pversion, p);
+ denc(v.ext_seq, p);
denc(v.bl, p);
DENC_FINISH(p);
}
final_crc == rhs.final_crc &&
length == rhs.length &&
pversion == rhs.pversion &&
+ ext_seq == rhs.ext_seq &&
bl == rhs.bl
);
}
journal_seq_t journal_tail;
segment_nonce_t segment_nonce;
+ segment_type_t type;
+
segment_type_t get_type() const {
- return segment_seq_to_type(journal_segment_seq);
+ return type;
}
DENC(segment_header_t, v, p) {
denc(v.physical_segment_id, p);
denc(v.journal_tail, p);
denc(v.segment_nonce, p);
+ denc(v.type, p);
DENC_FINISH(p);
}
};
journal_seq_t journal_tail;
segment_nonce_t segment_nonce;
+
+ segment_type_t type;
+
mod_time_point_t last_modified;
mod_time_point_t last_rewritten;
+ segment_type_t get_type() const {
+ return type;
+ }
+
DENC(segment_tail_t, v, p) {
DENC_START(1, 1, p);
denc(v.journal_segment_seq, p);
denc(v.physical_segment_id, p);
denc(v.journal_tail, p);
denc(v.segment_nonce, p);
+ denc(v.type, p);
denc(v.last_modified, p);
denc(v.last_rewritten, p);
DENC_FINISH(p);
reinterpret_cast<char*>(&o));
}
};
+
+template<>
+struct denc_traits<crimson::os::seastore::segment_type_t> {
+ static constexpr bool supported = true;
+ static constexpr bool featured = false;
+ static constexpr bool bounded = true;
+ static constexpr bool need_contiguous = false;
+
+ static void bound_encode(
+ const crimson::os::seastore::segment_type_t &o,
+ size_t& p,
+ uint64_t f=0) {
+ p += sizeof(crimson::os::seastore::segment_type_t);
+ }
+ template<class It>
+ static std::enable_if_t<!is_const_iterator_v<It>>
+ encode(
+ const crimson::os::seastore::segment_type_t &o,
+ It& p,
+ uint64_t f=0) {
+ get_pos_add<crimson::os::seastore::segment_type_t>(p) = o;
+ }
+ template<class It>
+ static std::enable_if_t<is_const_iterator_v<It>>
+ decode(
+ crimson::os::seastore::segment_type_t& o,
+ It& p,
+ uint64_t f=0) {
+ o = get_pos_add<crimson::os::seastore::segment_type_t>(p);
+ }
+ static void decode(
+ crimson::os::seastore::segment_type_t& o,
+ ceph::buffer::list::const_iterator &p) {
+ p.copy(sizeof(crimson::os::seastore::segment_type_t),
+ reinterpret_cast<char*>(&o));
+ }
+};
void segment_info_set_t::segment_info_t::set_open(segment_seq_t seq) {
assert(state == Segment::segment_state_t::EMPTY);
- assert(segment_seq_to_type(seq) != segment_type_t::NULL_SEG);
+ assert(seq != NULL_SEG_SEQ);
state = Segment::segment_state_t::OPEN;
journal_segment_seq = seq;
}
}
segment_id_t SegmentCleaner::get_segment(
- device_id_t device_id, segment_seq_t seq)
+ device_id_t device_id,
+ segment_seq_t seq,
+ segment_type_t type)
{
LOG_PREFIX(SegmentCleaner::get_segment);
- assert(segment_seq_to_type(seq) != segment_type_t::NULL_SEG);
+ assert(seq != NULL_SEG_SEQ);
for (auto it = segments.device_begin(device_id);
it != segments.device_end(device_id);
++it) {
auto& segment_info = it->second;
if (segment_info.is_empty()) {
DEBUG("returning segment {} {}", seg_id, segment_seq_printer_t{seq});
- mark_open(seg_id, seq);
+ mark_open(seg_id, seq, type);
return seg_id;
}
}
void SegmentCleaner::close_segment(segment_id_t segment)
{
- ceph_assert(segment_seq_to_type(segments[segment].journal_segment_seq) !=
- segment_type_t::NULL_SEG);
+ ceph_assert(segments[segment].journal_segment_seq != NULL_SEG_SEQ);
mark_closed(segment);
}
}
init_mark_segment_closed(
segment_id,
- header.journal_segment_seq);
+ header.journal_segment_seq,
+ header.type);
return seastar::now();
}).handle_error(
crimson::ct_error::enodata::handle(
}).safe_then([this, segment_id, header](auto) {
init_mark_segment_closed(
segment_id,
- header.journal_segment_seq);
+ header.journal_segment_seq,
+ header.type);
return seastar::now();
});
} else if (header.get_type() == segment_type_t::JOURNAL) {
}
init_mark_segment_closed(
segment_id,
- header.journal_segment_seq);
+ header.journal_segment_seq,
+ header.type);
return seastar::now();
}
// Will be non-null for any segments in the current journal
segment_seq_t journal_segment_seq = NULL_SEG_SEQ;
+ segment_type_t type = segment_type_t::NULL_SEG;
+
seastar::lowres_system_clock::time_point last_modified;
seastar::lowres_system_clock::time_point last_rewritten;
segment_type_t get_type() const {
- return segment_seq_to_type(journal_segment_seq);
+ return type;
}
void set_open(segment_seq_t);
class SegmentProvider {
public:
virtual segment_id_t get_segment(
- device_id_t id, segment_seq_t seq) = 0;
+ device_id_t id, segment_seq_t seq, segment_type_t type) = 0;
virtual void close_segment(segment_id_t) {}
mount_ret mount(device_id_t pdevice_id, std::vector<SegmentManager*>& sms);
segment_id_t get_segment(
- device_id_t id, segment_seq_t seq) final;
+ device_id_t id, segment_seq_t seq, segment_type_t type) final;
void close_segment(segment_id_t segment) final;
void init_mark_segment_closed(
segment_id_t segment,
- segment_seq_t seq) {
+ segment_seq_t seq,
+ segment_type_t s_type) {
crimson::get_logger(ceph_subsys_seastore_cleaner).debug(
"SegmentCleaner::init_mark_segment_closed: segment {}, seq {}",
segment,
segment_seq_printer_t{seq});
- ceph_assert(segment_seq_to_type(seq) != segment_type_t::NULL_SEG);
mark_closed(segment);
segments[segment].journal_segment_seq = seq;
- auto s_type = segments[segment].get_type();
assert(s_type != segment_type_t::NULL_SEG);
+ segments[segment].type = s_type;
if (s_type == segment_type_t::JOURNAL) {
assert(journal_device_id == segment.device_id());
segments.new_journal_segment();
maybe_wake_gc_blocked_io();
}
- void mark_open(segment_id_t segment, segment_seq_t seq) {
+ void mark_open(segment_id_t segment, segment_seq_t seq, segment_type_t s_type) {
assert(segment.device_id() ==
segments[segment.device_id()]->device_id);
assert(segment.device_segment_id() <
segments.segment_opened(segment);
auto& segment_info = segments[segment];
segment_info.set_open(seq);
+ segment_info.type = s_type;
- auto s_type = segment_info.get_type();
ceph_assert(s_type != segment_type_t::NULL_SEG);
if (s_type == segment_type_t::JOURNAL) {
segments.new_journal_segment();
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/os/seastore/logging.h"
+#include "crimson/os/seastore/seastore_types.h"
+
+namespace crimson::os::seastore::journal {
+class SegmentedJournal;
+}
+
+namespace crimson::os::seastore {
+
+class SegmentSeqAllocator {
+public:
+ segment_seq_t get_and_inc_next_segment_seq() {
+ return next_segment_seq++;
+ }
+private:
+ void set_next_segment_seq(segment_seq_t seq) {
+ LOG_PREFIX(SegmentSeqAllocator::set_next_segment_seq);
+ SUBINFO(seastore_journal, "next_segment_seq={}", segment_seq_printer_t{seq});
+ next_segment_seq = seq;
+ }
+ segment_seq_t next_segment_seq = 0;
+ friend class journal::SegmentedJournal;
+};
+
+using SegmentSeqAllocatorRef =
+ std::unique_ptr<SegmentSeqAllocator>;
+
+};
return tref.get_handle().enter(write_pipeline.prepare);
}).si_then([this, FNAME, &tref]() mutable
-> submit_transaction_iertr::future<> {
- auto record = cache->prepare_record(tref);
+ auto record = cache->prepare_record(tref, segment_cleaner.get());
tref.get_handle().maybe_release_collection_lock();
device_type_t::SEGMENTED,
std::make_unique<SegmentedAllocator>(
*segment_cleaner,
- *sm));
+ *sm,
+ journal->get_segment_seq_allocator()));
}
~TransactionManager();
btree_test_base() = default;
+ std::map<segment_id_t, segment_seq_t> segment_seqs;
+
+
+
seastar::lowres_system_clock::time_point get_last_modified(
segment_id_t id) const final {
return seastar::lowres_system_clock::time_point();
}
void update_segment_avail_bytes(paddr_t offset) final {}
- segment_id_t get_segment(device_id_t id, segment_seq_t seq) final {
+ segment_id_t get_segment(
+ device_id_t id,
+ segment_seq_t seq,
+ segment_type_t) final
+ {
auto ret = next;
next = segment_id_t{
next.device_id(),
next.device_segment_id() + 1};
+ segment_seqs[ret] = seq;
return ret;
}
+ segment_seq_t get_seq(segment_id_t id) {
+ return segment_seqs[id];
+ }
+
journal_seq_t get_journal_tail_target() const final { return journal_seq_t{}; }
void update_journal_tail_committed(journal_seq_t committed) final {}
virtual void complete_commit(Transaction &t) {}
seastar::future<> submit_transaction(TransactionRef t)
{
- auto record = cache->prepare_record(*t);
+ auto record = cache->prepare_record(*t, this);
return journal->submit_record(std::move(record), t->get_handle()).safe_then(
[this, t=std::move(t)](auto submit_result) mutable {
cache->complete_commit(
seastar::future<paddr_t> submit_transaction(
TransactionRef t) {
- auto record = cache->prepare_record(*t);
+ auto record = cache->prepare_record(*t, nullptr);
bufferlist bl;
for (auto &&block : record.extents) {
segment_id_t next;
+ std::map<segment_id_t, segment_seq_t> segment_seqs;
+
journal_test_t() = default;
seastar::lowres_system_clock::time_point get_last_modified(
void update_segment_avail_bytes(paddr_t offset) final {}
- segment_id_t get_segment(device_id_t id, segment_seq_t seq) final {
+ segment_id_t get_segment(
+ device_id_t id,
+ segment_seq_t seq,
+ segment_type_t) final
+ {
auto ret = next;
next = segment_id_t{
next.device_id(),
next.device_segment_id() + 1};
+ segment_seqs[ret] = seq;
return ret;
}
+ segment_seq_t get_seq(segment_id_t id) {
+ return segment_seqs[id];
+ }
+
journal_seq_t get_journal_tail_target() const final { return journal_seq_t{}; }
void update_journal_tail_committed(journal_seq_t paddr) final {}
0, 0,
block_size,
1,
+ MAX_SEG_SEQ,
bl
};
}