Decouple get_written_to() from write() so the write position can be queried at submission time.
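
With get_written_to() exposed on the allocator interface, the submitter can build
the write_result_t itself before issuing the write, and write() only needs to
signal completion. A minimal sketch of the resulting caller pattern, assuming the
write_result_t{start_seq, length} shape used below (the continuation body is
illustrative, not code from this patch):

    write_result_t result{
      journal_allocator.get_written_to(),  // journal position before this write
      to_write.length()};                  // encoded length being written
    return journal_allocator.write(std::move(to_write)
    ).safe_then([result] {
      // write() no longer carries a result; reuse the one computed above
      return result;
    });
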
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
return roll_ertr::now();
}
-CircularJournalSpace::write_ret
+CircularJournalSpace::write_ertr::future<>
CircularJournalSpace::write(ceph::bufferlist&& to_write) {
LOG_PREFIX(CircularJournalSpace::write);
assert(get_written_to().segment_seq != NULL_SEG_SEQ);
assert(encoded_size + get_rbm_addr(get_written_to())
< get_journal_end());
- journal_seq_t j_seq = get_written_to();
auto target = get_rbm_addr(get_written_to());
auto new_written_to = target + encoded_size;
assert(new_written_to < get_journal_end());
get_device_id());
set_written_to(
journal_seq_t{get_written_to().segment_seq, paddr});
- DEBUG("{}, target {}", to_write.length(), target);
+ DEBUG("length {}, commit target {}, used_size {}",
+ encoded_size, target, get_records_used_size());
- auto write_result = write_result_t{
- j_seq,
- encoded_size
- };
return device_write_bl(target, to_write
- ).safe_then([this, target,
- length=encoded_size,
- write_result,
- FNAME] {
- DEBUG("commit target {} used_size {} written length {}",
- target, get_records_used_size(), length);
- return write_result;
- }).handle_error(
+ ).handle_error(
write_ertr::pass_further{},
crimson::ct_error::assert_all{ "Invalid error" }
);
roll_ertr::future<> roll() final;
- write_ret write(ceph::bufferlist&& to_write) final;
+ journal_seq_t get_written_to() const final {
+ return written_to;
+ }
+
+ write_ertr::future<> write(ceph::bufferlist&& to_write) final;
void update_modify_time(record_t& record) final {}
*
*/
- journal_seq_t get_written_to() const {
- return written_to;
- }
rbm_abs_addr get_rbm_addr(journal_seq_t seq) const {
return convert_paddr_to_abs_addr(seq.offset);
}
journal_allocator.get_nonce());
DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
get_name(), sizes, get_committed_to(), num_outstanding_io);
+ write_result_t result{
+ journal_allocator.get_written_to(),
+ to_write.length()};
return journal_allocator.write(std::move(to_write)
- ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
+ ).safe_then([mdlength=sizes.get_mdlength(), result] {
return record_locator_t{
- write_result.start_seq.offset.add_offset(mdlength),
- write_result
+ result.start_seq.offset.add_offset(mdlength),
+ result
};
}).finally([this] {
decrement_io_with_flush();
// Note: rg is cleared
DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
get_name(), num, sizes, get_committed_to(), num_outstanding_io);
+ write_result_t result{
+ journal_allocator.get_written_to(),
+ to_write.length()};
std::ignore = journal_allocator.write(std::move(to_write)
- ).safe_then([this, p_batch, FNAME, num, sizes](auto write_result) {
+ ).safe_then([this, p_batch, FNAME, num, sizes, result] {
TRACE("{} {} records, {}, write done with {}",
- get_name(), num, sizes, write_result);
- finish_submit_batch(p_batch, write_result);
+ get_name(), num, sizes, result);
+ finish_submit_batch(p_batch, result);
}).handle_error(
crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes](auto e) {
ERROR("{} {} records, {}, got error {}",
virtual segment_nonce_t get_nonce() const = 0;
+ virtual journal_seq_t get_written_to() const = 0;
+
using write_ertr = base_ertr;
- using write_ret = write_ertr::future<write_result_t>;
- virtual write_ret write(ceph::bufferlist&& to_write) = 0;
+ virtual write_ertr::future<> write(ceph::bufferlist&& to_write) = 0;
virtual bool can_write() const = 0;
});
}
-SegmentAllocator::write_ret
+journal_seq_t
+SegmentAllocator::get_written_to() const
+{
+ return journal_seq_t{
+ segment_provider.get_seg_info(
+ current_segment->get_segment_id()).seq,
+ paddr_t::make_seg_paddr(
+ current_segment->get_segment_id(),
+ written_to)
+ };
+}
+
+SegmentAllocator::write_ertr::future<>
SegmentAllocator::write(ceph::bufferlist&& to_write)
{
LOG_PREFIX(SegmentAllocator::write);
assert(can_write());
auto write_length = to_write.length();
auto write_start_offset = written_to;
- auto write_start_seq = journal_seq_t{
- segment_provider.get_seg_info(current_segment->get_segment_id()).seq,
- paddr_t::make_seg_paddr(
- current_segment->get_segment_id(), write_start_offset)
- };
- TRACE("{} {}~{}", print_name, write_start_seq, write_length);
+ if (unlikely(LOCAL_LOGGER.is_enabled(seastar::log_level::trace))) {
+ TRACE("{} {}~{}", print_name, get_written_to(), write_length);
+ }
assert(write_length > 0);
assert((write_length % get_block_size()) == 0);
assert(!needs_roll(write_length));
- auto write_result = write_result_t{
- write_start_seq,
- write_length
- };
written_to += write_length;
segment_provider.update_segment_avail_bytes(
type,
crimson::ct_error::assert_all{
"Invalid error in SegmentAllocator::write"
}
- ).safe_then([write_result, cs=current_segment] {
- return write_result;
- });
+ ).finally([cs=current_segment] {});
}
SegmentAllocator::close_ertr::future<>
// close the current segment and initialize next one
roll_ertr::future<> roll() final;
+ journal_seq_t get_written_to() const final;
+
- // write the buffer, return the write result
+ // write the buffer; callers obtain the start position via get_written_to()
//
// May be called concurrently, but writes may complete in any order.
// If rolling/opening, no write is allowed.
- write_ret write(ceph::bufferlist&& to_write) final;
+ write_ertr::future<> write(ceph::bufferlist&& to_write) final;
using close_ertr = base_ertr;
close_ertr::future<> close() final;