void Cache::complete_commit(
Transaction &t,
- paddr_t final_block_start)
+ paddr_t final_block_start,
+ journal_seq_t seq)
{
if (t.root) {
remove_extent(root);
* and mutated exents.
*/
void complete_commit(
- Transaction &t, ///< [in, out] current transaction
- paddr_t final_block_start ///< [in] offset of initial block
+ Transaction &t, ///< [in, out] current transaction
+ paddr_t final_block_start, ///< [in] offset of initial block
+ journal_seq_t seq ///< [in] journal commit seq
);
/**
record_header_t header{
rsize.mdlength,
rsize.dlength,
- current_journal_seq,
0 /* checksum, TODO */,
record.deltas.size(),
record.extents.size()
return metadatabl;
}
-Journal::write_record_ertr::future<> Journal::write_record(
+Journal::write_record_ret Journal::write_record(
record_size_t rsize,
record_t &&record)
{
rsize.dlength);
return current_journal_segment->write(target, to_write).handle_error(
write_record_ertr::pass_further{},
- crimson::ct_error::assert_all{ "TODO" });
+ crimson::ct_error::assert_all{ "TODO" }).safe_then([this, target] {
+ return write_record_ret(
+ write_record_ertr::ready_future_marker{},
+ paddr_t{
+ current_journal_segment->get_segment_id(),
+ target});
+ });
}
Journal::record_size_t Journal::get_encoded_record_length(
current_journal_segment->get_write_capacity();
}
-paddr_t Journal::next_record_addr() const
-{
- paddr_t ret{current_journal_segment->get_segment_id(), written_to};
- logger().debug("next_record_addr: {}", ret);
- return ret;
-}
-
Journal::roll_journal_segment_ertr::future<>
Journal::roll_journal_segment()
{
namespace crimson::os::seastore {
-using journal_seq_t = uint64_t;
-static constexpr journal_seq_t NO_DELTAS =
- std::numeric_limits<journal_seq_t>::max();
-
/**
* Segment header
*
// Fixed portion
extent_len_t mdlength; // block aligned, length of metadata
extent_len_t dlength; // block aligned, length of data
- journal_seq_t seq; // current journal seqid
checksum_t full_checksum; // checksum for full record (TODO)
size_t deltas; // number of deltas
size_t extents; // number of extents
DENC_START(1, 1, p);
denc(v.mdlength, p);
denc(v.dlength, p);
- denc(v.seq, p);
denc(v.full_checksum, p);
denc(v.deltas, p);
denc(v.extents, p);
close_ertr::future<> close() { return close_ertr::now(); }
/**
- * write_record
+ * submit_record
*
- * @param write record and returns offset of first block
+ * @param write record and returns offset of first block and seq
*/
using submit_record_ertr = crimson::errorator<
crimson::ct_error::erange,
crimson::ct_error::input_output_error
>;
- using submit_record_ret = submit_record_ertr::future<paddr_t>;
+ using submit_record_ret = submit_record_ertr::future<
+ std::pair<paddr_t, journal_seq_t>
+ >;
submit_record_ret submit_record(record_t &&record) {
auto rsize = get_encoded_record_length(record);
auto total = rsize.mdlength + rsize.dlength;
: roll_journal_segment_ertr::now();
return roll.safe_then(
[this, rsize, record=std::move(record)]() mutable {
- auto ret = next_record_addr();
return write_record(rsize, std::move(record)
- ).safe_then([this, ret] {
- return ret.add_offset(block_size);
+ ).safe_then([this](auto addr) {
+ return std::make_pair(
+ addr.add_offset(block_size),
+ get_journal_seq(addr));
});
});
}
segment_off_t written_to = 0;
segment_id_t next_journal_segment_seq = NULL_SEG_ID;
- journal_seq_t current_journal_seq = 0;
+
+ journal_seq_t get_journal_seq(paddr_t addr) {
+ return journal_seq_t{current_journal_segment_seq, addr};
+ }
/// prepare segment for writes, writes out segment header
using initialize_segment_ertr = crimson::errorator<
/// do record write
using write_record_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
- write_record_ertr::future<> write_record(
+ using write_record_ret = write_record_ertr::future<paddr_t>;
+ write_record_ret write_record(
record_size_t rsize,
record_t &&record);
/// returns true iff current segment has insufficient space
bool needs_roll(segment_off_t length) const;
- /// returns next record addr
- paddr_t next_record_addr() const;
-
/// return ordered vector of segments to replay
using find_replay_segments_ertr = crimson::errorator<
crimson::ct_error::input_output_error
return out << ">";
}
+std::ostream &operator<<(std::ostream &out, const journal_seq_t &seq)
+{
+ return out << "journal_seq_t(segment_seq="
+ << seq.segment_seq << ", offset="
+ << seq.offset
+ << ")";
+}
+
std::ostream &operator<<(std::ostream &out, extent_types_t t)
{
switch (t) {
using objaddr_t = uint32_t;
constexpr objaddr_t OBJ_ADDR_MIN = std::numeric_limits<objaddr_t>::min();
+/* Monotonically increasing identifier for the location of a
+ * journal_record.
+ */
+struct journal_seq_t {
+ segment_seq_t segment_seq = 0;
+ paddr_t offset;
+
+ DENC(journal_seq_t, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.segment_seq, p);
+ denc(v.offset, p);
+ DENC_FINISH(p);
+ }
+};
+WRITE_CMP_OPERATORS_2(journal_seq_t, segment_seq, offset)
+WRITE_EQ_OPERATORS_2(journal_seq_t, segment_seq, offset)
+
+std::ostream &operator<<(std::ostream &out, const journal_seq_t &seq);
+
+static constexpr journal_seq_t NO_DELTAS = journal_seq_t{
+ NULL_SEG_SEQ,
+ P_ADDR_NULL
+};
+
// logical addr, see LBAManager, TransactionManager
using laddr_t = uint64_t;
constexpr laddr_t L_ADDR_MIN = std::numeric_limits<laddr_t>::min();
}
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::paddr_t)
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::journal_seq_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::delta_info_t)
logger().debug("TransactionManager::submit_transaction");
return journal.submit_record(std::move(*record)).safe_then(
- [this, t=std::move(t)](paddr_t addr) mutable {
- cache.complete_commit(*t, addr);
+ [this, t=std::move(t)](auto p) mutable {
+ auto [addr, journal_seq] = p;
+ cache.complete_commit(*t, addr, journal_seq);
lba_manager.complete_transaction(*t);
},
submit_transaction_ertr::pass_further{},
}
return journal.submit_record(std::move(*record)).safe_then(
- [this, t=std::move(t)](paddr_t addr) mutable {
- cache.complete_commit(*t, addr);
+ [this, t=std::move(t)](auto p) mutable {
+ auto [addr, seq] = p;
+ cache.complete_commit(*t, addr, seq);
lba_manager->complete_transaction(*t);
},
crimson::ct_error::all_same_way([](auto e) {
segment_manager::EphemeralSegmentManager segment_manager;
Cache cache;
paddr_t current{0, 0};
+ journal_seq_t seq;
cache_test_t()
: segment_manager(segment_manager::DEFAULT_TEST_EPHEMERAL),
true
).safe_then(
[this, prev, t=std::move(t)]() mutable {
- cache.complete_commit(*t, prev);
+ cache.complete_commit(*t, prev, seq /* TODO */);
return seastar::make_ready_future<std::optional<paddr_t>>(prev);
},
crimson::ct_error::all_same_way([](auto e) {
auto submit_record(T&&... _record) {
auto record{std::forward<T>(_record)...};
records.push_back(record);
- auto addr = journal->submit_record(std::move(record)).unsafe_get0();
+ auto [addr, _] = journal->submit_record(std::move(record)).unsafe_get0();
records.back().record_final_offset = addr;
return addr;
}