head.dirty_tail =
journal_seq_t{0,
convert_abs_addr_to_paddr(
- get_start_addr(),
+ get_records_start(),
device->get_device_id())};
head.alloc_tail = head.dirty_tail;
encode(head, bl);
assert(written_to.segment_seq != NULL_SEG_SEQ);
auto r_size = record_group_size_t(record.size, get_block_size());
auto encoded_size = r_size.get_encoded_length();
- if (encoded_size > get_available_size()) {
+ if (encoded_size > get_records_available_size()) {
ERROR("record size {}, but available size {}",
- encoded_size, get_available_size());
+ encoded_size, get_records_available_size());
return crimson::ct_error::erange::make();
}
if (encoded_size + get_rbm_addr(get_written_to()) > get_journal_end()) {
DEBUG("roll");
paddr_t paddr = convert_abs_addr_to_paddr(
- get_start_addr(),
+ get_records_start(),
get_device_id());
set_written_to(
journal_seq_t{++written_to.segment_seq, paddr});
- if (encoded_size > get_available_size()) {
+ if (encoded_size > get_records_available_size()) {
ERROR("rolled, record size {}, but available size {}",
- encoded_size, get_available_size());
+ encoded_size, get_records_available_size());
return crimson::ct_error::erange::make();
}
}
assert(new_written_to == get_journal_end());
DEBUG("roll");
paddr_t paddr = convert_abs_addr_to_paddr(
- get_start_addr(),
+ get_records_start(),
get_device_id());
set_written_to(
journal_seq_t{++written_to.segment_seq, paddr});
r_size,
FNAME] {
DEBUG("commit target {} used_size {} written length {}",
- target, get_used_size(), length);
+ target, get_records_used_size(), length);
paddr_t paddr = convert_abs_addr_to_paddr(
target + r_size.get_mdlength(),
return replay_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
} else {
- cursor_addr = get_start_addr();
+ cursor_addr = get_records_start();
++expected_seq;
is_rolled = true;
return replay_ertr::make_ready_future<
cursor_addr += bl.length();
if (cursor_addr >= get_journal_end()) {
assert(cursor_addr == get_journal_end());
- cursor_addr = get_start_addr();
+ cursor_addr = get_records_start();
++expected_seq;
is_rolled = true;
}
*
*/
- size_t get_used_size() const {
- auto rbm_written_to = get_rbm_addr(get_written_to());
- auto rbm_tail = get_rbm_addr(get_dirty_tail());
- return rbm_written_to >= rbm_tail ?
- rbm_written_to - rbm_tail :
- rbm_written_to + get_total_size() + get_block_size()
- - rbm_tail;
- }
- size_t get_total_size() const {
- assert(device);
- return device->get_journal_size() - get_block_size();
- }
- rbm_abs_addr get_start_addr() const {
- assert(device);
- return device->get_journal_start() + get_block_size();
- }
- size_t get_available_size() const {
- return get_total_size() - get_used_size();
- }
-
seastar::future<> update_journal_tail(
journal_seq_t dirty,
journal_seq_t alloc) {
}
void set_written_to(journal_seq_t seq) {
rbm_abs_addr addr = convert_paddr_to_abs_addr(seq.offset);
+ // written_to must always point inside the records area:
+ // [get_records_start(), get_journal_end()) — the header block is excluded
- assert(addr >= get_start_addr());
+ assert(addr >= get_records_start());
assert(addr < get_journal_end());
written_to = seq;
}
assert(device);
return device->get_block_size();
}
+
+ /*
+ Size-related interfaces
+ +---------------------------------------------------------+
+ | header | record | record | record | record | ... |
+ +---------------------------------------------------------+
+ ^ ^ ^
+ | | |
+ get_journal_start | get_journal_end
+ get_records_start
+ <-- get_records_total_size + block_size -->
+ <--------------- get_journal_size ------------------------>
+ */
+
+ // Bytes of the records area currently occupied: circular distance from
+ // the dirty tail up to written_to, wrapping over the records span
+ // (get_records_total_size() + one block) when written_to < tail.
+ size_t get_records_used_size() const {
+ auto rbm_written_to = get_rbm_addr(get_written_to());
+ auto rbm_tail = get_rbm_addr(get_dirty_tail());
+ return rbm_written_to >= rbm_tail ?
+ rbm_written_to - rbm_tail :
+ rbm_written_to + get_records_total_size() + get_block_size()
+ - rbm_tail;
+ }
+ // Capacity usable for records; the journal size minus the header block
+ // and one block reserved to denote the end.
+ size_t get_records_total_size() const {
+ assert(device);
+ // a block is for header and a block is reserved to denote the end
+ return device->get_journal_size() - (2 * get_block_size());
+ }
+ // First absolute address where records may be written — one block past
+ // the journal start, which holds the header.
+ rbm_abs_addr get_records_start() const {
+ assert(device);
+ return device->get_journal_start() + get_block_size();
+ }
+ // Free space remaining for new records.
+ size_t get_records_available_size() const {
+ return get_records_total_size() - get_records_used_size();
+ }
rbm_abs_addr get_journal_end() const {
- return get_start_addr() + get_total_size() + get_block_size(); // journal size + header length
+ assert(device);
+ // exclusive upper bound of the journal region on the device
+ return device->get_journal_start() + device->get_journal_size();
}
seastar::future<> finish_commit(transaction_type_t type) final;
+
private:
cbj_header_t header;
JournalTrimmer &trimmer;
bool initialized = false;
// start address where the newest record will be written
- // should be in range [get_start_addr(), get_journal_end())
+ // should be in range [get_records_start(), get_journal_end())
// written_to.segment_seq is circulation seq to track
// the sequence to written records
journal_seq_t written_to;
return seastar::now();
}).unsafe_get0();
}
+ // test helper: forwards to the journal's renamed size interface
- auto get_available_size() {
- return cbj->get_available_size();
+ auto get_records_available_size() {
+ return cbj->get_records_available_size();
}
+ // test helper: forwards to the journal's renamed size interface
- auto get_total_size() {
- return cbj->get_total_size();
+ auto get_records_total_size() {
+ return cbj->get_records_total_size();
}
auto get_block_size() {
return device->get_block_size();
auto get_journal_tail() {
return cbj->get_dirty_tail();
}
+ // test helper: forwards to the journal's renamed size interface
- auto get_used_size() {
- return cbj->get_used_size();
+ auto get_records_used_size() {
+ return cbj->get_records_used_size();
}
void update_journal_tail(rbm_abs_addr addr, uint32_t len) {
paddr_t paddr =
auto record_total_size = r_size.get_encoded_length();
submit_record(std::move(rec));
- while (record_total_size <= get_available_size()) {
+ while (record_total_size <= get_records_available_size()) {
submit_record(
record_t {
{ generate_extent(1), generate_extent(2) },
});
}
- uint64_t avail = get_available_size();
+ uint64_t avail = get_records_available_size();
update_journal_tail(entries.back().addr, record_total_size);
- ASSERT_EQ(get_total_size(),
- get_available_size());
+ ASSERT_EQ(get_records_total_size(),
+ get_records_available_size());
// will be appended at the begining of log
submit_record(
{ generate_delta(20), generate_delta(21) }
});
- while (record_total_size <= get_available_size()) {
+ while (record_total_size <= get_records_available_size()) {
submit_record(
record_t {
{ generate_extent(1), generate_extent(2) },
{ generate_delta(20), generate_delta(21) }
});
}
- ASSERT_EQ(avail, get_available_size());
+ ASSERT_EQ(avail, get_records_available_size());
});
}
auto r_size = record_group_size_t(rec.size, block_size);
auto record_total_size = r_size.get_encoded_length();
submit_record(std::move(rec));
- while (record_total_size <= get_available_size()) {
+ while (record_total_size <= get_records_available_size()) {
submit_record(
record_t {
{ generate_extent(1), generate_extent(2) },
});
}
- uint64_t avail = get_available_size();
+ uint64_t avail = get_records_available_size();
update_journal_tail(entries.front().addr, record_total_size);
entries.erase(entries.begin());
- ASSERT_EQ(avail + record_total_size, get_available_size());
- avail = get_available_size();
+ ASSERT_EQ(avail + record_total_size, get_records_available_size());
+ avail = get_records_available_size();
// will be appended at the begining of WAL
submit_record(
record_t {
{ generate_extent(1), generate_extent(2) },
{ generate_delta(20), generate_delta(21) }
});
- ASSERT_EQ(avail - record_total_size, get_available_size());
+ ASSERT_EQ(avail - record_total_size, get_records_available_size());
replay_and_check();
});
}
auto r_size = record_group_size_t(rec.size, block_size);
auto record_total_size = r_size.get_encoded_length();
submit_record(std::move(rec));
- while (record_total_size <= get_available_size()) {
+ while (record_total_size <= get_records_available_size()) {
submit_record(
record_t {
{ generate_extent(1), generate_extent(2) },
});
}
// will be appended at the begining of WAL
- uint64_t avail = get_available_size();
+ uint64_t avail = get_records_available_size();
update_journal_tail(entries.front().addr, record_total_size);
entries.erase(entries.begin());
- ASSERT_EQ(avail + record_total_size, get_available_size());
- avail = get_available_size();
+ ASSERT_EQ(avail + record_total_size, get_records_available_size());
+ avail = get_records_available_size();
submit_record(
record_t {
{ generate_extent(1), generate_extent(2) },
{ generate_delta(20), generate_delta(21) }
});
- ASSERT_EQ(avail - record_total_size, get_available_size());
+ ASSERT_EQ(avail - record_total_size, get_records_available_size());
cbj->close().unsafe_get0();
replay();
});
auto r_size = record_group_size_t(rec.size, block_size);
auto record_total_size = r_size.get_encoded_length();
submit_record(std::move(rec));
- while (record_total_size <= get_available_size()) {
+ while (record_total_size <= get_records_available_size()) {
submit_record(
record_t {
{ generate_extent(1), generate_extent(2) },
});
}
auto old_written_to = get_written_to();
- auto old_used_size = get_used_size();
+ auto old_used_size = get_records_used_size();
set_written_to(
journal_seq_t{0,
convert_abs_addr_to_paddr(
- cbj->get_start_addr(),
+ cbj->get_records_start(),
cbj->get_device_id())});
cbj->close().unsafe_get0();
replay();
ASSERT_EQ(old_written_to, get_written_to());
ASSERT_EQ(old_used_size,
- get_used_size());
+ get_records_used_size());
});
}