}
Cache::replay_delta_ret
-Cache::replay_delta(paddr_t record_base, const delta_info_t &delta)
+Cache::replay_delta(
+ journal_seq_t journal_seq,
+ paddr_t record_base,
+ const delta_info_t &delta)
{
if (delta.type == extent_types_t::ROOT) {
logger().debug("replay_delta: found root delta");
root->apply_delta_and_adjust_crc(record_base, delta.bl);
+ root->dirty_from = journal_seq;
return replay_delta_ertr::now();
} else {
return get_extent_by_type(
delta.type,
delta.paddr,
delta.laddr,
- delta.length).safe_then([this, record_base, delta](auto extent) {
+ delta.length).safe_then([=](auto extent) {
logger().debug(
"replay_delta: replaying {} on {}",
*extent,
rt.second.journal_segment_seq;
});
- auto replay_from = segments.rbegin()->second.journal_tail.offset;
+ auto journal_tail = segments.rbegin()->second.journal_tail;
+ segment_provider->update_journal_tail_committed(journal_tail);
+ auto replay_from = journal_tail.offset;
auto from = segments.begin();
if (replay_from != P_ADDR_NULL) {
from = std::find_if(
} else {
replay_from = paddr_t{from->first, (segment_off_t)block_size};
}
- auto ret = std::vector<paddr_t>(segments.end() - from);
+ auto ret = std::vector<journal_seq_t>(segments.end() - from);
std::transform(
from, segments.end(), ret.begin(),
[this](const auto &p) {
- return paddr_t{p.first, (segment_off_t)block_size};
+ return journal_seq_t{
+ p.second.journal_segment_seq,
+ paddr_t{p.first, (segment_off_t)block_size}};
});
- ret[0] = replay_from;
+ ret[0].offset = replay_from;
return find_replay_segments_fut(
find_replay_segments_ertr::ready_future_marker{},
std::move(ret));
{
auto bliter = bl.cbegin();
bliter += ceph::encoded_sizeof_bounded<record_header_t>();
+ logger().debug("{}: decoding {} deltas", __func__, header.deltas);
std::vector<delta_info_t> deltas(header.deltas);
for (auto &&i : deltas) {
try {
Journal::replay_ertr::future<>
Journal::replay_segment(
- paddr_t start,
+ journal_seq_t seq,
delta_handler_t &delta_handler)
{
- logger().debug("replay_segment: starting at {}", start);
+ logger().debug("replay_segment: starting at {}", seq);
return seastar::do_with(
- std::move(start),
- [this, &delta_handler](auto ¤t) {
+ paddr_t(seq.offset),
+ [=, &delta_handler](paddr_t ¤t) {
return crimson::do_until(
- [this, ¤t, &delta_handler]() -> replay_ertr::future<bool> {
+ [=, ¤t, &delta_handler]() -> replay_ertr::future<bool> {
return read_record_metadata(current).safe_then
- ([this, ¤t, &delta_handler](auto p)
+ ([=, ¤t, &delta_handler](auto p)
-> replay_ertr::future<bool> {
if (!p.has_value()) {
return replay_ertr::make_ready_future<bool>(true);
return seastar::do_with(
std::move(*deltas),
- [this, &delta_handler, record_start](auto &deltas) {
+ [=, &delta_handler](auto &deltas) {
return crimson::do_for_each(
deltas,
- [this, &delta_handler, record_start](auto &info) {
+ [=, &delta_handler](auto &info) {
return delta_handler(
+ journal_seq_t{
+ seq.segment_seq,
+ record_start},
record_start.add_offset(block_size),
info);
});
Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler)
{
return seastar::do_with(
- std::move(delta_handler), std::vector<paddr_t>(),
+ std::move(delta_handler), std::vector<journal_seq_t>(),
[this](auto&& handler, auto&& segments) mutable -> replay_ret {
return find_replay_segments().safe_then(
[this, &handler, &segments](auto replay_segs) {
using replay_ertr = SegmentManager::read_ertr;
using replay_ret = replay_ertr::future<>;
using delta_handler_t = std::function<
- replay_ret(paddr_t record_start, const delta_info_t&)>;
+ replay_ret(journal_seq_t seq,
+ paddr_t record_block_base,
+ const delta_info_t&)>;
replay_ret replay(delta_handler_t &&delta_handler);
private:
using find_replay_segments_ertr = crimson::errorator<
crimson::ct_error::input_output_error
>;
- using find_replay_segments_fut =
- find_replay_segments_ertr::future<std::vector<paddr_t>>;
+ using find_replay_segments_fut = find_replay_segments_ertr::future<
+ std::vector<journal_seq_t>>;
find_replay_segments_fut find_replay_segments();
/// read record metadata for record starting at start
/// replays records starting at start through end of segment
replay_ertr::future<>
replay_segment(
- paddr_t start, ///< [in] starting addr
+ journal_seq_t start, ///< [in] starting addr, seq
delta_handler_t &delta_handler ///< [in] processes deltas in order
);
};