extents.erase(*ref);
}
+void Cache::retire_extent(CachedExtentRef ref)
+{
+ logger().debug("retire_extent: {}", *ref);
+ assert(ref->is_valid()); // is_valid() is false for INVALID and RETIRED, so this also rejects retiring an extent twice
+ /*
+ * Move ref into state RETIRED and hand it to retired_extent_gate, whose
+ * add_extent() takes its own reference and keeps the extent linked until
+ * prune() releases it.
+ *
+ * retired_at starts out as JOURNAL_SEQ_MAX: prune() only releases extents
+ * whose retired_at is strictly below its bound, and that bound defaults to
+ * JOURNAL_SEQ_MAX, so the placeholder keeps the extent pinned. The
+ * t.retired_set loop elsewhere in this file later stores the commit seq
+ * (presumably for these same extents -- confirm against the caller).
+ */
+ remove_from_dirty(ref);
+ ref->dirty_from_or_retired_at = JOURNAL_SEQ_MAX; // placeholder, see above
+ retired_extent_gate.add_extent(*ref);
+ ref->state = CachedExtent::extent_state_t::RETIRED; // get_retired_at() asserts this state; add_extent() does not call it
+}
+
void Cache::replace_extent(CachedExtentRef next, CachedExtentRef prev)
{
assert(next->get_paddr() == prev->get_paddr());
}
i->state = CachedExtent::extent_state_t::DIRTY;
if (i->version == 1 || i->get_type() == extent_types_t::ROOT) {
- i->dirty_from = seq;
+ i->dirty_from_or_retired_at = seq;
}
}
for (auto &i: t.mutated_block_list) {
i->complete_io();
}
+
+ last_commit = seq;
+ for (auto &i: t.retired_set) {
+ logger().debug("try_construct_record: retiring {}", *i);
+ i->dirty_from_or_retired_at = last_commit;
+ }
+ retired_extent_gate.prune();
}
void Cache::init() {
logger().debug("replay_delta: found root delta");
remove_extent(root);
root->apply_delta_and_adjust_crc(record_base, delta.bl);
- root->dirty_from = journal_seq;
+ root->dirty_from_or_retired_at = journal_seq;
add_extent(root);
return replay_delta_ertr::now();
} else {
assert(extent->last_committed_crc == delta.final_crc);
if (extent->version == 0) {
- extent->dirty_from = journal_seq;
+ extent->dirty_from_or_retired_at = journal_seq;
}
extent->version++;
mark_dirty(extent);
// during write, contents match disk, version == 0
DIRTY, // Same as CLEAN, but contents do not match disk,
// version > 0
+ RETIRED, // In ExtentIndex while in retired_extent_gate
INVALID // Part of no ExtentIndex set
} state = extent_state_t::INVALID;
friend std::ostream &operator<<(std::ostream &, extent_state_t);
// Points at current version while in state MUTATION_PENDING
CachedExtentRef prior_instance;
- /**
- * dirty_from
- *
- * When dirty, indiciates the oldest journal entry which mutates
- * this extent.
- */
- journal_seq_t dirty_from;
-
public:
/**
* duplicate_for_write
out << "CachedExtent(addr=" << this
<< ", type=" << get_type()
<< ", version=" << version
- << ", dirty_from=" << dirty_from
+ << ", dirty_from_or_retired_at=" << dirty_from_or_retired_at
<< ", paddr=" << get_paddr()
<< ", state=" << state
<< ", last_committed_crc=" << last_committed_crc
/// Returns true if extent has not been superceded or retired
bool is_valid() const {
- return state != extent_state_t::INVALID;
+ return state != extent_state_t::INVALID && state != extent_state_t::RETIRED; // RETIRED now counts as not valid, same as INVALID
+ }
+
+ /// True iff extent is in state RETIRED (still in ExtentIndex while in retired_extent_gate)
+ bool is_retired() const {
+ return state == extent_state_t::RETIRED;
}
/// Returns true if extent or prior_instance has been invalidated
return !is_valid() || (prior_instance && !prior_instance->is_valid());
}
- /**
- * get_dirty_from
- *
- * Return journal location of oldest relevant delta.
- */
- auto get_dirty_from() const { return dirty_from; }
+ /// Return journal seq of the oldest delta mutating this extent; asserts (ceph_assert) that the extent is dirty
+ auto get_dirty_from() const {
+ ceph_assert(is_dirty()); // the shared field only means dirty_from while dirty
+ return dirty_from_or_retired_at;
+ }
+ /// Return journal seq of the commit at which this extent was retired; asserts (ceph_assert) that the extent is RETIRED
+ auto get_retired_at() const {
+ ceph_assert(is_retired()); // the shared field only means retired_at while RETIRED
+ return dirty_from_or_retired_at; // still JOURNAL_SEQ_MAX (set by Cache::retire_extent) until a commit seq is stored
+ }
/**
* get_paddr
using list = boost::intrusive::list<
CachedExtent,
primary_ref_list_member_options>;
+ friend class retired_extent_gate_t;
+
+ /**
+ * dirty_from_or_retired_at
+ *
+ * Encodes ordering token for primary_ref_list -- dirty_from when
+ * dirty or retired_at if retired.
+ *
+ * Read it through get_dirty_from() or get_retired_at(); each asserts
+ * that the extent is in the matching state. Cache::retire_extent()
+ * stores JOURNAL_SEQ_MAX here as the initial retired_at value.
+ * retired_extent_gate_t::retired_extents is a CachedExtent::list, so
+ * retired extents are linked through the same list type declared above.
+ */
+ journal_seq_t dirty_from_or_retired_at;
/// Actual data contents
ceph::bufferptr ptr;
CachedExtent(ceph::bufferptr &&ptr) : ptr(std::move(ptr)) {}
CachedExtent(const CachedExtent &other)
: state(other.state),
- dirty_from(other.dirty_from),
+ dirty_from_or_retired_at(other.dirty_from_or_retired_at), // copy carries over other's ordering token unchanged
ptr(other.ptr.c_str(), other.ptr.length()),
version(other.version),
poffset(other.poffset) {}
struct share_buffer_t {};
CachedExtent(const CachedExtent &other, share_buffer_t) :
state(other.state),
- dirty_from(other.dirty_from),
+ dirty_from_or_retired_at(other.dirty_from_or_retired_at), // same as the plain copy constructor: token copied unchanged
ptr(other.ptr),
version(other.version),
poffset(other.poffset) {}
std::ostream &operator<<(std::ostream &out, const lba_pin_list_t &rhs);
+/**
+ * retired_extent_gate_t
+ *
+ * We need to keep each retired extent in memory until all transactions
+ * that could still reference it have completed. live_tokens tracks the
+ * set of tokens (which will be embedded in Transactions) still live
+ * in order of the commit after which each was created. retired_extents
+ * lists retired extents ordered by the commit at which they were
+ * retired.
+ *
+ * Both lists are only ever appended to (push_back) and prune() only
+ * inspects their fronts; nothing in this class sorts them, so the
+ * ordering described above depends on callers adding tokens and
+ * extents in order.
+ *
+ * Every extent in retired_extents holds one reference taken by
+ * add_extent() and dropped by prune().
+ *
+ * NOTE(review): token_t declares a destructor that unlinks through
+ * parent, but its copy operations are not deleted here. A copy of a
+ * registered token would carry parent without being linked into
+ * live_tokens -- confirm tokens are never copied.
+ *
+ * NOTE(review): no destructor is visible in this class that releases
+ * references still held in retired_extents -- confirm how remaining
+ * entries are dropped at teardown.
+ */
+class retired_extent_gate_t {
+public:
+ class token_t {
+ friend class retired_extent_gate_t;
+ retired_extent_gate_t *parent = nullptr; // gate this token is registered with; set in add_token(), cleared in ~token_t()
+ journal_seq_t created_after; // while this is the front token, prune() keeps extents retired at or after this seq
+ // Intrusive hook, and the list type (registry) that links tokens through it.
+ boost::intrusive::list_member_hook<> list_hook;
+ using list_hook_options = boost::intrusive::member_hook<
+ token_t,
+ boost::intrusive::list_member_hook<>,
+ &token_t::list_hook>;
+ using registry = boost::intrusive::list<
+ token_t,
+ list_hook_options>;
+ public:
+ token_t(journal_seq_t created_after) : created_after(created_after) {}
+ ~token_t();
+ };
+
+ void prune() { // release leading retired extents whose retired_at is strictly below the bound
+ journal_seq_t prune_to = live_tokens.empty() ? // bound: front token's created_after, or JOURNAL_SEQ_MAX with no live tokens
+ JOURNAL_SEQ_MAX : live_tokens.front().created_after;
+ while (!retired_extents.empty() &&
+ prune_to > retired_extents.front().get_retired_at()) { // strict: an extent retired exactly at the bound is kept
+ auto ext = &retired_extents.front(); // take the address before unlinking
+ retired_extents.pop_front();
+ intrusive_ptr_release(ext); // drops the reference taken in add_extent()
+ }
+ }
+ // Register t with this gate, appended at the back of live_tokens.
+ void add_token(token_t &t) {
+ t.parent = this;
+ live_tokens.push_back(t);
+ }
+ // Take a reference on extent and append it to retired_extents; prune() releases it.
+ void add_extent(CachedExtent &extent) {
+ intrusive_ptr_add_ref(&extent);
+ retired_extents.push_back(extent);
+ }
+
+private:
+ token_t::registry live_tokens; // front() is the earliest-registered token still live
+ CachedExtent::list retired_extents; // front() is the earliest-added extent not yet released
+};
+
+inline retired_extent_gate_t::token_t::~token_t() { // unregister from the owning gate, if any, then let it prune
+ if (parent) { // parent is only set by add_token(); an unregistered token has nothing to undo
+ parent->live_tokens.erase(
+ parent->live_tokens.s_iterator_to(*this));
+ parent->prune(); // prune() recomputes its bound from the remaining tokens
+ parent = nullptr;
+ }
+}
/**
* LogicalCachedExtent
// Identifies segment location on disk, see SegmentManager,
using segment_id_t = uint32_t;
+constexpr segment_id_t MAX_SEG_ID = // largest segment_id_t value; NULL_SEG_ID is this value minus one
+ std::numeric_limits<segment_id_t>::max();
constexpr segment_id_t NULL_SEG_ID =
std::numeric_limits<segment_id_t>::max() - 1;
/* Used to denote relative paddr_t */
using segment_off_t = int32_t;
constexpr segment_off_t NULL_SEG_OFF =
std::numeric_limits<segment_off_t>::max();
+constexpr segment_off_t MAX_SEG_OFF = // NOTE(review): same value as NULL_SEG_OFF -- confirm the overlap is intended
+ std::numeric_limits<segment_off_t>::max();
std::ostream &offset_to_stream(std::ostream &, const segment_off_t &t);
using segment_seq_t = uint32_t;
static constexpr segment_seq_t NULL_SEG_SEQ =
std::numeric_limits<segment_seq_t>::max();
+static constexpr segment_seq_t MAX_SEG_SEQ = // NOTE(review): same value as NULL_SEG_SEQ -- confirm the overlap is intended
+ std::numeric_limits<segment_seq_t>::max();
// Offset of delta within a record
using record_delta_idx_t = uint32_t;
WRITE_EQ_OPERATORS_2(paddr_t, segment, offset)
constexpr paddr_t P_ADDR_NULL = paddr_t{};
constexpr paddr_t P_ADDR_MIN = paddr_t{0, 0};
+constexpr paddr_t P_ADDR_MAX = paddr_t{ // counterpart of P_ADDR_MIN, built from the two MAX_* constants
+ MAX_SEG_ID,
+ MAX_SEG_OFF
+};
constexpr paddr_t make_record_relative_paddr(segment_off_t off) {
return paddr_t{RECORD_REL_SEG_ID, off};
}
0,
paddr_t{0, 0}
};
+constexpr journal_seq_t JOURNAL_SEQ_MAX{ // default bound in retired_extent_gate_t::prune() and initial retired_at in Cache::retire_extent()
+ MAX_SEG_SEQ,
+ P_ADDR_MAX
+};
std::ostream &operator<<(std::ostream &out, const journal_seq_t &seq);