head.device_id = config.device_id;
encode(head, bl);
header = head;
- written_to = head.journal_tail;
+ set_written_to(head.journal_tail);
DEBUG(
"initialize header block in CircularBoundedJournal, length {}",
bl.length());
});
}
-CircularBoundedJournal::write_ertr::future<> CircularBoundedJournal::append_record(
- ceph::bufferlist bl,
- rbm_abs_addr addr)
-{
- LOG_PREFIX(CircularBoundedJournal::append_record);
- std::vector<std::pair<rbm_abs_addr, bufferlist>> writes;
- if (addr + bl.length() <= get_journal_end()) {
- writes.push_back(std::make_pair(addr, bl));
- } else {
- // write remaining data---in this case,
- // data is splited into two parts before due to the end of CircularBoundedJournal.
- // the following code is to write the second part
- bufferlist first_write, next_write;
- first_write.substr_of(bl, 0, get_journal_end() - addr);
- writes.push_back(std::make_pair(addr, first_write));
- next_write.substr_of(
- bl, first_write.length(), bl.length() - first_write.length());
- writes.push_back(std::make_pair(get_start_addr(), next_write));
- }
-
- return seastar::do_with(
- std::move(bl),
- [this, writes=move(writes), FNAME](auto& bl) mutable
- {
- DEBUG("original bl length {}", bl.length());
- return write_ertr::parallel_for_each(
- writes,
- [this, FNAME](auto& p) mutable
- {
- DEBUG(
- "append_record: offset {}, length {}",
- p.first,
- p.second.length());
- return device_write_bl(p.first, p.second
- ).handle_error(
- write_ertr::pass_further{},
- crimson::ct_error::assert_all{ "Invalid error device->write" }
- ).safe_then([]() {
- return write_ertr::now();
- });
- });
- });
-}
-
CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
record_t &&record,
OrderingHandle &handle)
assert(write_pipeline);
auto r_size = record_group_size_t(record.size, get_block_size());
auto encoded_size = r_size.get_encoded_length();
- if (get_written_to() +
- ceph::encoded_sizeof_bounded<record_group_header_t>() > get_journal_end()) {
- // not enough space between written_to and the end of journal,
- // so that set written_to to the beginning of cbjournal to append
- // the record at the start address of cbjournal
- // | cbjournal |
- // v v
- // written_to <-> the end of journal
- set_written_to(get_start_addr());
- }
if (encoded_size > get_available_size()) {
ERROR(
"CircularBoundedJournal::submit_record: record size {}, but available size {}",
);
return crimson::ct_error::erange::make();
}
+ if (encoded_size + get_written_to() > get_journal_end()) {
+ DEBUG("roll");
+ set_written_to(get_start_addr());
+ if (encoded_size > get_available_size()) {
+ ERROR("rolled, record size {}, but available size {}",
+ encoded_size, get_available_size());
+ return crimson::ct_error::erange::make();
+ }
+ }
journal_seq_t j_seq {
cur_segment_seq++,
std::move(record), device->get_block_size(),
j_seq, 0);
auto target = get_written_to();
- if (get_written_to() + to_write.length() >= get_journal_end()) {
- set_written_to(get_start_addr() +
- (to_write.length() - (get_journal_end() - get_written_to())));
+ auto new_written_to = target + to_write.length();
+ if (new_written_to >= get_journal_end()) {
+ assert(new_written_to == get_journal_end());
+ DEBUG("roll");
+ set_written_to(get_start_addr());
} else {
- set_written_to(get_written_to() + to_write.length());
+ set_written_to(new_written_to);
}
DEBUG(
"submit_record: mdlength {}, dlength {}, target {}",
j_seq,
(seastore_off_t)to_write.length()
};
- auto write_fut = append_record(to_write, target);
+ auto write_fut = device_write_bl(target, to_write);
return handle.enter(write_pipeline->device_submission
).then([write_fut = std::move(write_fut)]() mutable {
- return std::move(write_fut
- ).handle_error(
- write_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in CircularBoundedJournal::append_record"
- }
- );
+ return std::move(write_fut);
}).safe_then([this, &handle] {
return handle.enter(write_pipeline->finalize);
}).safe_then([this, target,
* read records from last applied record prior to written_to, and replay
*/
LOG_PREFIX(CircularBoundedJournal::replay);
- auto fut = open_device_read_header();
- return fut.safe_then([this, FNAME, delta_handler=std::move(delta_handler)] (auto addr) {
+ return open_device_read_header(
+ ).safe_then([this, FNAME, delta_handler=std::move(delta_handler)] (auto) {
+ set_written_to(get_journal_tail());
return seastar::do_with(
+ bool(false),
rbm_abs_addr(get_journal_tail()),
std::move(delta_handler),
segment_seq_t(),
- [this, FNAME](auto &cursor_addr, auto &d_handler, auto &last_seq) {
+ [this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &last_seq) {
return crimson::repeat(
- [this, &cursor_addr, &d_handler, &last_seq, FNAME]() mutable
+ [this, &is_rolled, &cursor_addr, &d_handler, &last_seq, FNAME]() mutable
-> replay_ertr::future<seastar::stop_iteration> {
- paddr_t cursor_paddr = convert_abs_addr_to_paddr(
+ paddr_t record_paddr = convert_abs_addr_to_paddr(
cursor_addr,
header.device_id);
- return read_record(cursor_paddr
- ).safe_then([this, &cursor_addr, &d_handler, &last_seq, FNAME](auto ret) {
+ return read_record(record_paddr, last_seq
+ ).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &last_seq, FNAME](auto ret)
+ -> replay_ertr::future<seastar::stop_iteration> {
if (!ret.has_value()) {
- DEBUG("no more records");
- return replay_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::yes);
+ if (is_rolled) {
+ DEBUG("no more records, stop replaying");
+ return replay_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::yes);
+ } else {
+ cursor_addr = get_start_addr();
+ is_rolled = true;
+ return replay_ertr::make_ready_future<
+ seastar::stop_iteration>(seastar::stop_iteration::no);
+ }
}
auto [r_header, bl] = *ret;
- if (last_seq > r_header.committed_to.segment_seq) {
- DEBUG("found invalide record. stop replaying");
- return replay_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::yes);
- }
bufferlist mdbuf;
mdbuf.substr_of(bl, 0, r_header.mdlength);
paddr_t record_block_base = paddr_t::make_blk_paddr(
auto maybe_record_deltas_list = try_decode_deltas(
r_header, mdbuf, record_block_base);
if (!maybe_record_deltas_list) {
- DEBUG("unable to decode deltas for record {} at {}",
- r_header, record_block_base);
- return replay_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::yes);
+ // This should be impossible: the crc on the mdbuf has already been checked
+ ERROR("unable to decode deltas for record {} at {}",
+ r_header, record_block_base);
+ return crimson::ct_error::input_output_error::make();
}
- DEBUG(" record_group_header_t: {}, cursor_addr: {} ",
- r_header, cursor_addr);
+ DEBUG("{} at {}", r_header, cursor_addr);
auto write_result = write_result_t{
r_header.committed_to,
(seastore_off_t)bl.length()
};
cur_segment_seq = r_header.committed_to.segment_seq + 1;
cursor_addr += bl.length();
+ if (cursor_addr >= get_journal_end()) {
+ assert(cursor_addr == get_journal_end());
+ cursor_addr = get_start_addr();
+ is_rolled = true;
+ }
set_written_to(cursor_addr);
last_seq = r_header.committed_to.segment_seq;
return seastar::do_with(
std::move(*maybe_record_deltas_list),
[write_result,
- this,
&d_handler,
- &cursor_addr,
FNAME](auto& record_deltas_list) {
return crimson::do_for_each(
record_deltas_list,
locator.write_result.start_seq,
modify_time);
});
- }).safe_then([this, &cursor_addr]() {
- if (cursor_addr >= get_journal_end()) {
- cursor_addr = (cursor_addr - get_journal_end()) + get_start_addr();
- }
- if (get_written_to() +
- ceph::encoded_sizeof_bounded<record_group_header_t>() > get_journal_end()) {
- cursor_addr = get_start_addr();
- }
+ }).safe_then([]() {
return replay_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::no);
});
CircularBoundedJournal::return_record(record_group_header_t& header, bufferlist bl)
{
LOG_PREFIX(CircularBoundedJournal::return_record);
+ DEBUG("record size {}", bl.length());
+ assert(bl.length() == header.mdlength + header.dlength);
bufferlist md_bl, data_bl;
- md_bl.substr_of(bl, 0, get_block_size());
+ md_bl.substr_of(bl, 0, header.mdlength);
data_bl.substr_of(bl, header.mdlength, header.dlength);
if (validate_records_metadata(md_bl) &&
validate_records_data(header, data_bl)) {
}
}
-CircularBoundedJournal::read_record_ret CircularBoundedJournal::read_record(paddr_t off)
+CircularBoundedJournal::read_record_ret
+CircularBoundedJournal::read_record(paddr_t off, segment_seq_t last_seq)
{
LOG_PREFIX(CircularBoundedJournal::read_record);
- rbm_abs_addr offset = convert_paddr_to_abs_addr(
- off);
- rbm_abs_addr addr = offset;
+ rbm_abs_addr addr = convert_paddr_to_abs_addr(off);
auto read_length = get_block_size();
- if (addr + get_block_size() > get_journal_end()) {
- addr = get_start_addr();
- read_length = get_journal_end() - offset;
- }
+ assert(addr + read_length <= get_journal_end());
DEBUG("read_record: reading record from abs addr {} read length {}",
addr, read_length);
auto bptr = bufferptr(ceph::buffer::create_page_aligned(read_length));
- bptr.zero();
return device->read(addr, bptr
- ).safe_then([this, addr, read_length, bptr, FNAME]() mutable
+ ).safe_then([this, addr, bptr, last_seq, FNAME]() mutable
-> read_record_ret {
record_group_header_t h;
bufferlist bl;
read_record_ertr::ready_future_marker{},
std::nullopt);
}
- /*
- * | journal |
- * | record 1 header | | record 1 data
- * record 1 data (remaining) |
- *
- * <---- 1 block ----><--
- * -- 2 block --->
- *
- * If record has logner than read_length and its data is located across
- * the end of journal and the begining of journal, we need three reads
- * ---reads of header, other remaining data before the end, and
- * the other remaining data from the begining.
- *
- */
- if (h.mdlength + h.dlength > read_length) {
- rbm_abs_addr next_read_addr = addr + read_length;
- auto next_read = h.mdlength + h.dlength - read_length;
- DEBUG(" next_read_addr {}, next_read_length {} ",
- next_read_addr, next_read);
- if (get_journal_end() < next_read_addr + next_read) {
- // In this case, need two more reads.
- // The first is to read remain bytes to the end of cbjournal
- // The second is to read the data at the begining of cbjournal
- next_read = get_journal_end() - (addr + read_length);
- }
- DEBUG("read_entry: additional reading addr {} length {}",
- next_read_addr,
- next_read);
- auto next_bptr = bufferptr(ceph::buffer::create_page_aligned(next_read));
- next_bptr.zero();
- return device->read(
- next_read_addr,
- next_bptr
- ).safe_then([this, h=h, next_bptr=std::move(next_bptr), bl=std::move(bl),
- FNAME]() mutable {
- bl.append(next_bptr);
- if (h.mdlength + h.dlength == bl.length()) {
- DEBUG("read_record: record length {} done", bl.length());
- return return_record(h, bl);
- }
- // need one more read
- auto next_read_addr = get_start_addr();
- auto last_bptr = bufferptr(ceph::buffer::create_page_aligned(
- h.mdlength + h.dlength - bl.length()));
- DEBUG("read_record: last additional reading addr {} length {}",
- next_read_addr,
- h.mdlength + h.dlength - bl.length());
- return device->read(
- next_read_addr,
- last_bptr
- ).safe_then([this, h=h, last_bptr=std::move(last_bptr),
- bl=std::move(bl), FNAME]() mutable {
- bl.append(last_bptr);
- DEBUG("read_record: complte size {}", bl.length());
- return return_record(h, bl);
- });
+ if (h.mdlength < get_block_size() ||
+ h.mdlength % get_block_size() != 0 ||
+ h.dlength % get_block_size() != 0 ||
+ addr + h.mdlength + h.dlength > get_journal_end() ||
+ h.committed_to.segment_seq == NULL_SEG_SEQ ||
+ h.committed_to.segment_seq < last_seq) {
+ return read_record_ret(
+ read_record_ertr::ready_future_marker{},
+ std::nullopt);
+ }
+ auto record_size = h.mdlength + h.dlength;
+ if (record_size > get_block_size()) {
+ auto next_addr = addr + get_block_size();
+ auto next_length = record_size - get_block_size();
+ auto next_bptr = bufferptr(ceph::buffer::create_page_aligned(next_length));
+ DEBUG("reading record part 2 from abs addr {} read length {}",
+ next_addr, next_length);
+ return device->read(next_addr, next_bptr
+ ).safe_then([this, h, next_bptr=std::move(next_bptr), bl=std::move(bl)]() mutable {
+ bl.append(next_bptr);
+ return return_record(h, bl);
});
} else {
- DEBUG("read_record: complte size {}", bl.length());
+ assert(record_size == get_block_size());
return return_record(h, bl);
}
});