t.write_set.clear();
record_t record;
+ auto commit_time = seastar::lowres_system_clock::now();
+ record.commit_time = commit_time.time_since_epoch().count();
+ record.commit_type = (t.get_src() == Transaction::src_t::MUTATE)
+ ? record_commit_type_t::MODIFY
+ : record_commit_type_t::REWRITE;
// Add new copy of mutated blocks, set_io_wait to block until written
record.deltas.reserve(t.mutated_block_list.size());
i->prepare_write();
i->set_io_wait();
+ i->set_last_modified(commit_time);
assert(i->get_version() > 0);
auto final_crc = i->get_crc32c();
if (i->get_type() == extent_types_t::ROOT) {
ceph_assert(0 == "ROOT never gets written as a fresh block");
}
+ if (t.get_src() == Transaction::src_t::MUTATE) {
+ i->set_last_modified(commit_time);
+ } else {
+ assert(t.get_src() >= Transaction::src_t::CLEANER_TRIM);
+ i->set_last_rewritten(commit_time);
+ }
+
assert(bl.length() == i->get_length());
record.push_back(extent_t{
i->get_type(),
: (is_lba_node(i->get_type())
? i->cast<lba_manager::btree::LBANode>()->get_node_meta().begin
: L_ADDR_NULL),
- std::move(bl)
+ std::move(bl),
+ i->get_last_modified().time_since_epoch().count()
});
}
if (cleaner) {
cleaner->mark_space_used(
i->get_paddr(),
- i->get_length());
+ i->get_length(),
+ (t.get_src() == Transaction::src_t::MUTATE)
+ ? i->last_modified
+ : seastar::lowres_system_clock::time_point(),
+ (t.get_src() >= Transaction::src_t::CLEANER_TRIM)
+ ? i->last_rewritten
+ : seastar::lowres_system_clock::time_point());
}
}
});
Cache::replay_delta(
journal_seq_t journal_seq,
paddr_t record_base,
- const delta_info_t &delta)
+ const delta_info_t &delta,
+ seastar::lowres_system_clock::time_point& last_modified)
{
LOG_PREFIX(Cache::replay_delta);
if (delta.type == extent_types_t::ROOT) {
root->state = CachedExtent::extent_state_t::DIRTY;
DEBUG("replayed root delta at {} {}, add extent -- {}, root={}",
journal_seq, record_base, delta, *root);
+ root->set_last_modified(last_modified);
add_extent(root);
return replay_delta_ertr::now();
} else {
assert(extent->last_committed_crc == delta.prev_crc);
extent->apply_delta_and_adjust_crc(record_base, delta.bl);
+ extent->set_last_modified(last_modified);
assert(extent->last_committed_crc == delta.final_crc);
extent->version++;
replay_delta_ret replay_delta(
journal_seq_t seq,
paddr_t record_block_base,
- const delta_info_t &delta);
+ const delta_info_t &delta,
+ seastar::lowres_system_clock::time_point& last_modified);
/**
* init_cached_extents
// Points at current version while in state MUTATION_PENDING
CachedExtentRef prior_instance;
+ // time of the last modification
+ seastar::lowres_system_clock::time_point last_modified;
+
+ // time of the last rewrite
+ seastar::lowres_system_clock::time_point last_rewritten;
public:
+
+ void set_last_modified(seastar::lowres_system_clock::duration d) {
+ last_modified = seastar::lowres_system_clock::time_point(d);
+ }
+
+ void set_last_modified(seastar::lowres_system_clock::time_point t) {
+ last_modified = t;
+ }
+
+ seastar::lowres_system_clock::time_point get_last_modified() const {
+ return last_modified;
+ }
+
+ void set_last_rewritten(seastar::lowres_system_clock::duration d) {
+ last_rewritten = seastar::lowres_system_clock::time_point(d);
+ }
+
+ void set_last_rewritten(seastar::lowres_system_clock::time_point t) {
+ last_rewritten = t;
+ }
+
+ seastar::lowres_system_clock::time_point get_last_rewritten() const {
+ return last_rewritten;
+ }
/**
* duplicate_for_write
*
<< ", type=" << get_type()
<< ", version=" << version
<< ", dirty_from_or_retired_at=" << dirty_from_or_retired_at
+ << ", last_modified=" << last_modified.time_since_epoch()
+ << ", last_rewritten=" << last_rewritten.time_since_epoch()
<< ", paddr=" << get_paddr()
<< ", length=" << get_length()
<< ", state=" << state
}
assert(segment_allocator.can_write());
- ool_record_t record(segment_allocator.get_block_size());
+ ool_record_t record(
+ segment_allocator.get_block_size(),
+ (t.get_src() == Transaction::src_t::MUTATE)
+ ? record_commit_type_t::MODIFY
+ : record_commit_type_t::REWRITE);
for (auto it = extents.begin(); it != extents.end();) {
auto& extent = *it;
auto wouldbe_length = record.get_wouldbe_encoded_record_length(extent);
public:
OolExtent(LogicalCachedExtentRef& lextent)
: lextent(lextent) {}
+
void set_ool_paddr(paddr_t addr) {
ool_offset = addr;
}
};
public:
- ool_record_t(size_t block_size) : block_size(block_size) {}
+ ool_record_t(
+ size_t block_size,
+ record_commit_type_t commit_type)
+ : block_size(block_size),
+ commit_type(commit_type) {}
record_group_size_t get_encoded_record_length() {
assert(extents.size() == record.extents.size());
return record_group_size_t(record.size, block_size);
segment_nonce_t nonce) {
assert(extents.size() == record.extents.size());
assert(!record.deltas.size());
+ auto commit_time = seastar::lowres_system_clock::now();
+ record.commit_time = commit_time.time_since_epoch().count();
+ record.commit_type = commit_type;
auto record_group = record_group_t(std::move(record), block_size);
seastore_off_t extent_offset = base + record_group.size.get_mdlength();
for (auto& extent : extents) {
extent.set_ool_paddr(
paddr_t::make_seg_paddr(segment, extent_offset));
+ if (commit_type == record_commit_type_t::MODIFY) {
+ extent.get_lextent()->set_last_modified(commit_time);
+ } else {
+ assert(commit_type == record_commit_type_t::REWRITE);
+ extent.get_lextent()->set_last_rewritten(commit_time);
+ }
extent_offset += extent.get_bptr().length();
}
assert(extent_offset ==
record.push_back(extent_t{
extent->get_type(),
extent->get_laddr(),
- std::move(bl)});
+ std::move(bl),
+ extent->get_last_modified().time_since_epoch().count()});
}
std::vector<OolExtent>& get_extents() {
return extents;
record_t record;
size_t block_size;
seastore_off_t base = MAX_SEG_OFF;
+ record_commit_type_t commit_type =
+ record_commit_type_t::NONE;
};
/**
namespace crimson::os::seastore {
+ExtentReader::read_segment_tail_ret
+ExtentReader::read_segment_tail(segment_id_t segment)
+{
+ auto& segment_manager = *segment_managers[segment.device_id()];
+ return segment_manager.read(
+ paddr_t::make_seg_paddr(
+ segment,
+ segment_manager.get_segment_size() -
+ segment_manager.get_rounded_tail_length()),
+ segment_manager.get_rounded_tail_length()
+ ).handle_error(
+ read_segment_header_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in ExtentReader::read_segment_tail"
+ }
+ ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_tail_ret {
+ LOG_PREFIX(ExtentReader::read_segment_tail);
+ DEBUG("segment {} bptr size {}", segment, bptr.length());
+
+ segment_tail_t tail;
+ bufferlist bl;
+ bl.push_back(bptr);
+
+ DEBUG("segment {} block crc {}",
+ segment,
+ bl.begin().crc32c(segment_manager.get_block_size(), 0));
+
+ auto bp = bl.cbegin();
+ try {
+ decode(tail, bp);
+ } catch (ceph::buffer::error &e) {
+ DEBUG("segment {} unable to decode tail, skipping -- {}",
+ segment, e);
+ return crimson::ct_error::enodata::make();
+ }
+ DEBUG("segment {} tail {}", segment, tail);
+ return read_segment_tail_ret(
+ read_segment_tail_ertr::ready_future_marker{},
+ tail);
+ });
+}
+
ExtentReader::read_segment_header_ret
ExtentReader::read_segment_header(segment_id_t segment)
{
for (auto& r: *maybe_record_extent_infos) {
DEBUG("decoded {} extents", r.extent_infos.size());
for (const auto &i : r.extent_infos) {
- extents->emplace_back(extent_offset, i);
+ extents->emplace_back(
+ extent_offset,
+ std::pair<commit_info_t, extent_info_t>(
+ {r.header.commit_time,
+ r.header.commit_type},
+ i));
auto& seg_addr = extent_offset.as_seg_paddr();
seg_addr.set_segment_off(
seg_addr.get_segment_off() + i.len);
segment_header_t>;
read_segment_header_ret read_segment_header(segment_id_t segment);
+ using read_segment_tail_ertr = read_segment_header_ertr;
+ using read_segment_tail_ret = read_segment_tail_ertr::future<
+ segment_tail_t>;
+ read_segment_tail_ret read_segment_tail(segment_id_t segment);
+
+ struct commit_info_t {
+ mod_time_point_t commit_time;
+ record_commit_type_t commit_type;
+ };
+
/**
* scan_extents
*
*/
using scan_extents_cursor = scan_valid_records_cursor;
using scan_extents_ertr = read_ertr::extend<crimson::ct_error::enodata>;
- using scan_extents_ret_bare = std::list<std::pair<paddr_t, extent_info_t>>;
+ using scan_extents_ret_bare =
+ std::list<std::pair<paddr_t, std::pair<commit_info_t, extent_info_t>>>;
using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
scan_extents_ret scan_extents(
scan_extents_cursor &cursor,
using replay_ret = replay_ertr::future<>;
using delta_handler_t = std::function<
replay_ret(const record_locator_t&,
- const delta_info_t&)>;
+ const delta_info_t&,
+ seastar::lowres_system_clock::time_point last_modified)>;
virtual replay_ret replay(
delta_handler_t &&delta_handler) = 0;
cur_segment_seq,
close_segment_id,
cur_journal_tail,
- current_segment_nonce};
+ current_segment_nonce,
+ segment_provider.get_last_modified(
+ close_segment_id).time_since_epoch().count(),
+ segment_provider.get_last_rewritten(
+ close_segment_id).time_since_epoch().count()};
ceph::bufferlist bl;
encode(tail, bl);
[locator,
this,
FNAME,
- &handler](delta_info_t& delta)
+ &handler](auto &p)
{
+ auto& commit_time = p.first;
+ auto& delta = p.second;
/* The journal may validly contain deltas for extents in
* since released segments. We can detect those cases by
* checking whether the segment in question currently has a
return replay_ertr::now();
}
}
- return handler(locator, delta);
+ return handler(
+ locator,
+ delta,
+ seastar::lowres_system_clock::time_point(
+ seastar::lowres_system_clock::duration(commit_time)));
});
});
});
lba_extent.get_length(),
nlba_extent->get_bptr().c_str());
nlba_extent->pin.set_range(nlba_extent->get_node_meta());
+ nlba_extent->set_last_modified(lba_extent.get_last_modified());
/* This is a bit underhanded. Any relative addrs here must necessarily
* be record relative as we are rewriting a dirty extent. Thus, we
<< ")";
}
+std::ostream &operator<<(std::ostream &out, const segment_tail_t &tail)
+{
+ return out << "segment_tail_t("
+ << "segment_seq=" << tail.journal_segment_seq
+ << ", segment_id=" << tail.physical_segment_id
+ << ", journal_tail=" << tail.journal_tail
+ << ", segment_nonce=" << tail.segment_nonce
+ << ", last_modified=" << tail.last_modified
+ << ", last_rewritten=" << tail.last_rewritten
+ << ")";
+}
+
extent_len_t record_size_t::get_raw_mdlength() const
{
// empty record is allowed to submit
record_header_t rheader{
(extent_len_t)r.deltas.size(),
(extent_len_t)r.extents.size(),
+ r.commit_time,
+ r.commit_type
};
encode(rheader, bl);
}
return success;
}
-namespace {
-
std::optional<std::vector<record_header_t>>
try_decode_record_headers(
const record_group_header_t& header,
return record_headers;
}
-}
-
std::optional<std::vector<record_extent_infos_t> >
try_decode_extent_infos(
const record_group_header_t& header,
result_iter->deltas.resize(r.header.deltas);
for (auto& i: result_iter->deltas) {
try {
- decode(i, bliter);
+ decode(i.second, bliter);
+ i.first = r.header.commit_time;
} catch (ceph::buffer::error &e) {
journal_logger().debug(
"try_decode_deltas: failed, "
#include <vector>
#include <boost/core/ignore_unused.hpp>
+#include <seastar/core/lowres_clock.hh>
+
#include "include/byteorder.h"
#include "include/denc.h"
#include "include/buffer.h"
type == extent_types_t::LADDR_LEAF;
}
+std::ostream &operator<<(std::ostream &out, extent_types_t t);
+
+enum class record_commit_type_t : uint8_t {
+ NONE,
+ MODIFY,
+ REWRITE
+};
+
+// type for extent modification time, milliseconds since the epoch
+using mod_time_point_t = int64_t;
+
/* description of a new physical extent */
struct extent_t {
extent_types_t type; ///< type of extent
laddr_t addr; ///< laddr of extent (L_ADDR_NULL for non-logical)
ceph::bufferlist bl; ///< payload, bl.length() == length, aligned
+ mod_time_point_t last_modified;
};
using extent_version_t = uint32_t;
extent_types_t type = extent_types_t::NONE;
laddr_t addr = L_ADDR_NULL;
extent_len_t len = 0;
+ mod_time_point_t last_modified;
extent_info_t() = default;
extent_info_t(const extent_t &et)
- : type(et.type), addr(et.addr), len(et.bl.length()) {}
+ : type(et.type), addr(et.addr),
+ len(et.bl.length()),
+ last_modified(et.last_modified)
+ {}
DENC(extent_info_t, v, p) {
DENC_START(1, 1, p);
denc(v.type, p);
denc(v.addr, p);
denc(v.len, p);
+ denc(v.last_modified, p);
DENC_FINISH(p);
}
};
};
std::ostream &operator<<(std::ostream &out, const segment_header_t &header);
-using segment_tail_t = segment_header_t;
+struct segment_tail_t {
+ segment_seq_t journal_segment_seq;
+ segment_id_t physical_segment_id; // debugging
+
+ journal_seq_t journal_tail;
+ segment_nonce_t segment_nonce;
+ mod_time_point_t last_modified;
+ mod_time_point_t last_rewritten;
+
+ DENC(segment_tail_t, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.journal_segment_seq, p);
+ denc(v.physical_segment_id, p);
+ denc(v.journal_tail, p);
+ denc(v.segment_nonce, p);
+ denc(v.last_modified, p);
+ denc(v.last_rewritten, p);
+ DENC_FINISH(p);
+ }
+};
+std::ostream &operator<<(std::ostream &out, const segment_tail_t &tail);
struct record_size_t {
extent_len_t plain_mdlength = 0; // mdlength without the record header
std::vector<extent_t> extents;
std::vector<delta_info_t> deltas;
record_size_t size;
+ mod_time_point_t commit_time;
+ record_commit_type_t commit_type;
record_t() = default;
record_t(std::vector<extent_t>&& _extents,
struct record_header_t {
uint32_t deltas; // number of deltas
uint32_t extents; // number of extents
-
+ mod_time_point_t commit_time = 0;
+ record_commit_type_t commit_type;
DENC(record_header_t, v, p) {
DENC_START(1, 1, p);
denc(v.deltas, p);
denc(v.extents, p);
+ denc(v.commit_time, p);
+ denc(v.commit_type, p);
DENC_FINISH(p);
}
};
try_decode_extent_infos(
const record_group_header_t& header,
const ceph::bufferlist& md_bl);
+std::optional<std::vector<record_header_t>>
+try_decode_record_headers(
+ const record_group_header_t& header,
+ const ceph::bufferlist& md_bl);
struct record_deltas_t {
paddr_t record_block_base;
- std::vector<delta_info_t> deltas;
+ // the mod time here can only be modification time, not rewritten time
+ std::vector<std::pair<mod_time_point_t, delta_info_t>> deltas;
};
std::optional<std::vector<record_deltas_t> >
try_decode_deltas(
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::extent_info_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::segment_header_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::rbm_alloc_delta_t)
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::segment_tail_t)
template<>
struct denc_traits<crimson::os::seastore::device_type_t> {
return trans_intr::do_for_each(
extents,
[this, &t](auto &extent) {
- auto &[addr, info] = extent;
+ auto &addr = extent.first;
+ auto commit_time = extent.second.first.commit_time;
+ auto commit_type = extent.second.first.commit_type;
+ auto &info = extent.second.second;
logger().debug(
"SegmentCleaner::gc_reclaim_space: checking extent {}",
info);
addr,
info.addr,
info.len
- ).si_then([addr=addr, &t, this](CachedExtentRef ext) {
+ ).si_then([&info, commit_type, commit_time, addr=addr, &t, this]
+ (CachedExtentRef ext) {
if (!ext) {
logger().debug(
"SegmentCleaner::gc_reclaim_space: addr {} dead, skipping",
"SegmentCleaner::gc_reclaim_space: addr {} alive, gc'ing {}",
addr,
*ext);
+ assert(commit_time);
+ assert(info.last_modified);
+ assert(commit_type == record_commit_type_t::MODIFY
+ || commit_type == record_commit_type_t::REWRITE);
+ if (ext->get_last_modified() ==
+ seastar::lowres_system_clock::time_point()) {
+ assert(ext->get_last_rewritten() ==
+ seastar::lowres_system_clock::time_point());
+ ext->set_last_modified(
+ seastar::lowres_system_clock::duration(
+ info.last_modified));
+ }
+ if (commit_type == record_commit_type_t::REWRITE
+ && ext->get_last_rewritten() ==
+ seastar::lowres_system_clock::time_point()) {
+ ext->set_last_rewritten(
+ seastar::lowres_system_clock::duration(
+ commit_time));
+ }
+
+ assert(
+ (commit_type == record_commit_type_t::MODIFY
+ && commit_time <=
+ ext->get_last_modified().time_since_epoch().count())
+ || (commit_type == record_commit_type_t::REWRITE
+ && commit_time ==
+ ext->get_last_rewritten().time_since_epoch().count()));
+
return ecb->rewrite_extent(
t,
ext);
register_metrics();
logger().debug("SegmentCleaner::mount: {} segments", segments.size());
- return crimson::do_for_each(
- segments.begin(),
- segments.end(),
- [this](auto& it) {
- auto segment_id = it.first;
- return scanner->read_segment_header(
- segment_id
- ).safe_then([segment_id, this](auto header) {
- logger().debug(
- "ExtentReader::mount: segment_id={} -- {}",
- segment_id, header);
- auto s_type = header.get_type();
- if (s_type == segment_type_t::NULL_SEG) {
- logger().error(
- "ExtentReader::mount: got null segment, segment_id={} -- {}",
+ return seastar::do_with(
+ std::vector<std::pair<segment_id_t, segment_header_t>>(),
+ [this](auto& segment_set) {
+ return crimson::do_for_each(
+ segments.begin(),
+ segments.end(),
+ [this, &segment_set](auto& it) {
+ auto segment_id = it.first;
+ return scanner->read_segment_header(
+ segment_id
+ ).safe_then([segment_id, this, &segment_set](auto header) {
+ logger().debug(
+ "ExtentReader::mount: segment_id={} -- {}",
segment_id, header);
- ceph_abort();
- }
- init_mark_segment_closed(
- segment_id,
- header.journal_segment_seq);
- }).handle_error(
- crimson::ct_error::enoent::handle([](auto) {
- return mount_ertr::now();
- }),
- crimson::ct_error::enodata::handle([](auto) {
- return mount_ertr::now();
+ auto s_type = header.get_type();
+ if (s_type == segment_type_t::NULL_SEG) {
+ logger().error(
+ "ExtentReader::mount: got null segment, segment_id={} -- {}",
+ segment_id, header);
+ ceph_abort();
+ }
+ return scanner->read_segment_tail(
+ segment_id
+ ).safe_then([this, segment_id, &segment_set, header](auto tail)
+ -> scan_extents_ertr::future<> {
+ if (tail.segment_nonce != header.segment_nonce) {
+ return scan_nonfull_segment(header, segment_set, segment_id);
+ }
+ seastar::lowres_system_clock::time_point last_modified(
+ seastar::lowres_system_clock::duration(tail.last_modified));
+ seastar::lowres_system_clock::time_point last_rewritten(
+ seastar::lowres_system_clock::duration(tail.last_rewritten));
+ if (segments[segment_id].last_modified < last_modified) {
+ segments[segment_id].last_modified = last_modified;
+ }
+ if (segments[segment_id].last_rewritten < last_rewritten) {
+ segments[segment_id].last_rewritten = last_rewritten;
+ }
+ init_mark_segment_closed(
+ segment_id,
+ header.journal_segment_seq);
+ return seastar::now();
+ }).handle_error(
+ crimson::ct_error::enodata::handle(
+ [this, header, segment_id, &segment_set](auto) {
+ return scan_nonfull_segment(header, segment_set, segment_id);
+ }),
+ crimson::ct_error::pass_further_all{}
+ );
+ }).handle_error(
+ crimson::ct_error::enoent::handle([](auto) {
+ return mount_ertr::now();
+ }),
+ crimson::ct_error::enodata::handle([](auto) {
+ return mount_ertr::now();
+ }),
+ crimson::ct_error::input_output_error::pass_further{},
+ crimson::ct_error::assert_all{"unexpected error"}
+ );
+ });
+ });
+}
+
+SegmentCleaner::scan_extents_ret SegmentCleaner::scan_nonfull_segment(
+ const segment_header_t& header,
+ scan_extents_ret_bare& segment_set,
+ segment_id_t segment_id)
+{
+ if (header.get_type() == segment_type_t::OOL) {
+ logger().info(
+ "ExtentReader::init_segments: out-of-line segment {}",
+ segment_id);
+ return seastar::do_with(
+ scan_valid_records_cursor({
+ segments[segment_id].journal_segment_seq,
+ paddr_t::make_seg_paddr(segment_id, 0)}),
+ [this, segment_id, header](auto& cursor) {
+ return seastar::do_with(
+ ExtentReader::found_record_handler_t([this, segment_id](
+ record_locator_t locator,
+ const record_group_header_t& header,
+ const bufferlist& mdbuf
+ ) mutable -> ExtentReader::scan_valid_records_ertr::future<> {
+ LOG_PREFIX(SegmentCleaner::scan_nonfull_segment);
+ DEBUG("decodeing {} records", header.records);
+ auto maybe_headers = try_decode_record_headers(header, mdbuf);
+ if (!maybe_headers) {
+ ERROR("unable to decode record headers for record group {}",
+ locator.record_block_base);
+ return crimson::ct_error::input_output_error::make();
+ }
+
+ for (auto& header : *maybe_headers) {
+ mod_time_point_t ctime = header.commit_time;
+ auto commit_type = header.commit_type;
+ if (!ctime) {
+ ERROR("Scanner::init_segments: extent {} 0 commit_time",
+ ctime);
+ ceph_abort("0 commit_time");
+ }
+ seastar::lowres_system_clock::time_point commit_time{
+ seastar::lowres_system_clock::duration(ctime)};
+ assert(commit_type == record_commit_type_t::MODIFY
+ || commit_type == record_commit_type_t::REWRITE);
+ if (commit_type == record_commit_type_t::MODIFY
+ && this->segments[segment_id].last_modified < commit_time) {
+ this->segments[segment_id].last_modified = commit_time;
+ }
+ if (commit_type == record_commit_type_t::REWRITE
+ && this->segments[segment_id].last_rewritten < commit_time) {
+ this->segments[segment_id].last_rewritten = commit_time;
+ }
+ }
+ return seastar::now();
}),
- crimson::ct_error::input_output_error::pass_further{}
+ [&cursor, header, segment_id, this](auto& handler) {
+ return scanner->scan_valid_records(
+ cursor,
+ header.segment_nonce,
+ segments[segment_id.device_id()]->segment_size,
+ handler);
+ }
);
+ }).safe_then([this, segment_id, header](auto) {
+ init_mark_segment_closed(
+ segment_id,
+ header.journal_segment_seq);
+ return seastar::now();
});
+ } else if (header.get_type() == segment_type_t::JOURNAL) {
+ logger().info(
+ "ExtentReader::init_segments: journal segment {}",
+ segment_id);
+ segment_set.emplace_back(std::make_pair(segment_id, std::move(header)));
+ } else {
+ ceph_abort("unexpected segment type");
+ }
+ init_mark_segment_closed(
+ segment_id,
+ header.journal_segment_seq);
+ return seastar::now();
}
}
// Will be non-null for any segments in the current journal
segment_seq_t journal_segment_seq = NULL_SEG_SEQ;
+ seastar::lowres_system_clock::time_point last_modified;
+ seastar::lowres_system_clock::time_point last_rewritten;
+
segment_type_t get_type() const {
return segment_seq_to_type(journal_segment_seq);
}
virtual segment_seq_t get_seq(segment_id_t id) { return 0; }
+ virtual seastar::lowres_system_clock::time_point get_last_modified(
+ segment_id_t id) const = 0;
+
+ virtual seastar::lowres_system_clock::time_point get_last_rewritten(
+ segment_id_t id) const = 0;
+
virtual void update_segment_avail_bytes(paddr_t offset) = 0;
virtual ~SegmentProvider() {}
void mark_space_used(
paddr_t addr,
extent_len_t len,
+ seastar::lowres_system_clock::time_point last_modified
+ = seastar::lowres_system_clock::time_point(),
+ seastar::lowres_system_clock::time_point last_rewritten
+ = seastar::lowres_system_clock::time_point(),
bool init_scan = false) {
auto& seg_addr = addr.as_seg_paddr();
assert(seg_addr.get_segment_id().device_id() ==
auto new_usage = space_tracker->calc_utilization(seg_addr.get_segment_id());
adjust_segment_util(old_usage, new_usage);
+ // use the last extent's last modified time for the calculation of the projected
+ // time the segments' live extents are to stay unmodified; this is an approximation
+ // of the sprite lfs' segment "age".
+
+ if (last_modified > segments[seg_addr.get_segment_id()].last_modified)
+ segments[seg_addr.get_segment_id()].last_modified = last_modified;
+
+ if (last_rewritten > segments[seg_addr.get_segment_id()].last_rewritten)
+ segments[seg_addr.get_segment_id()].last_rewritten = last_rewritten;
+
gc_process.maybe_wake_on_space_used();
assert(ret > 0);
}
+ seastar::lowres_system_clock::time_point get_last_modified(
+ segment_id_t id) const final {
+ return segments[id].last_modified;
+ }
+
+ seastar::lowres_system_clock::time_point get_last_rewritten(
+ segment_id_t id) const final {
+ return segments[id].last_rewritten;
+ }
+
void mark_space_free(
paddr_t addr,
extent_len_t len) {
}
}
+ using scan_extents_ret_bare =
+ std::vector<std::pair<segment_id_t, segment_header_t>>;
+ using scan_extents_ertr = ExtentReader::scan_extents_ertr;
+ using scan_extents_ret = scan_extents_ertr::future<>;
+ scan_extents_ret scan_nonfull_segment(
+ const segment_header_t& header,
+ scan_extents_ret_bare& segment_set,
+ segment_id_t segment_id);
+
/**
* gc_should_reclaim_space
*
scanner.get_segment_managers()
).safe_then([this] {
return journal->replay(
- [this](const auto &offsets, const auto &e) {
+ [this](const auto &offsets, const auto &e, auto last_modified) {
auto start_seq = offsets.write_result.start_seq;
segment_cleaner->update_journal_tail_target(
cache->get_oldest_dirty_from().value_or(start_seq));
return cache->replay_delta(
start_seq,
offsets.record_block_base,
- e);
+ e,
+ last_modified);
});
}).safe_then([this] {
return journal->open_for_write();
segment_cleaner->mark_space_used(
addr,
len ,
+ seastar::lowres_system_clock::time_point(),
+ seastar::lowres_system_clock::time_point(),
/* init_scan = */ true);
}
});
nlextent->get_bptr().c_str());
nlextent->set_laddr(lextent->get_laddr());
nlextent->set_pin(lextent->get_pin().duplicate());
+ nlextent->last_modified = lextent->last_modified;
DEBUGT("rewriting extent -- {} to {}", t, *lextent, *nlextent);
btree_test_base() = default;
+ seastar::lowres_system_clock::time_point get_last_modified(
+ segment_id_t id) const final {
+ return seastar::lowres_system_clock::time_point();
+ }
+
+ seastar::lowres_system_clock::time_point get_last_rewritten(
+ segment_id_t id) const final {
+ return seastar::lowres_system_clock::time_point();
+ }
void update_segment_avail_bytes(paddr_t offset) final {}
segment_id_t get_segment(device_id_t id, segment_seq_t seq) final {
journal_test_t() = default;
+ seastar::lowres_system_clock::time_point get_last_modified(
+ segment_id_t id) const final {
+ return seastar::lowres_system_clock::time_point();
+ }
+
+ seastar::lowres_system_clock::time_point get_last_rewritten(
+ segment_id_t id) const final {
+ return seastar::lowres_system_clock::time_point();
+ }
+
void update_segment_avail_bytes(paddr_t offset) final {}
segment_id_t get_segment(device_id_t id, segment_seq_t seq) final {
replay(
[&advance,
&delta_checker]
- (const auto &offsets, const auto &di) mutable {
+ (const auto &offsets, const auto &di, auto t) mutable {
if (!delta_checker) {
EXPECT_FALSE("No Deltas Left");
}
char contents = distribution(generator);
bufferlist bl;
bl.append(buffer::ptr(buffer::create(blocks * block_size, contents)));
- return extent_t{extent_types_t::TEST_BLOCK, L_ADDR_NULL, bl};
+ return extent_t{
+ extent_types_t::TEST_BLOCK,
+ L_ADDR_NULL,
+ bl};
}
delta_info_t generate_delta(size_t bytes) {