void Cache::complete_commit(
Transaction &t,
paddr_t final_block_start,
- journal_seq_t seq)
+ journal_seq_t seq,
+ SegmentCleaner *cleaner)
{
if (t.root) {
remove_extent(root);
i->state = CachedExtent::extent_state_t::CLEAN;
logger().debug("complete_commit: fresh {}", *i);
add_extent(i);
+ if (cleaner) {
+ cleaner->mark_space_used(
+ i->get_paddr(),
+ i->get_length());
+ }
}
// Add new copy of mutated blocks, set_io_wait to block until written
}
}
+ if (cleaner) {
+ for (auto &i: t.retired_set) {
+ cleaner->mark_space_free(
+ i->get_paddr(),
+ i->get_length());
+ }
+ }
+
for (auto &i: t.mutated_block_list) {
i->complete_io();
}
void complete_commit(
Transaction &t, ///< [in, out] current transaction
paddr_t final_block_start, ///< [in] offset of initial block
- journal_seq_t seq ///< [in] journal commit seq
+ journal_seq_t seq, ///< [in] journal commit seq
+ SegmentCleaner *cleaner=nullptr ///< [out] optional segment stat listener
);
/**
current_journal_segment->close() :
Segment::close_ertr::now()).safe_then(
[this, old_segment_id] {
- // TODO: pretty sure this needs to be atomic in some sense with
- // making use of the new segment, maybe this bit needs to take
- // the first transaction of the new segment? Or the segment
- // header should include deltas?
- if (old_segment_id != NULL_SEG_ID) {
- segment_provider->put_segment(old_segment_id);
- }
return segment_provider->get_segment();
}).safe_then([this](auto segment) {
return segment_manager.open(segment);
current_journal_segment = sref;
written_to = 0;
return initialize_segment(*current_journal_segment);
+ }).safe_then([=](auto seq) {
+ if (old_segment_id != NULL_SEG_ID) {
+ segment_provider->close_segment(old_segment_id);
+ }
+ segment_provider->set_journal_segment(
+ current_journal_segment->get_segment_id(),
+ seq);
+ return seq;
}).handle_error(
roll_journal_segment_ertr::pass_further{},
crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); })
return lt.second.journal_segment_seq <
rt.second.journal_segment_seq;
});
+
current_journal_segment_seq =
segments.rbegin()->second.journal_segment_seq + 1;
+ std::for_each(
+ segments.begin(),
+ segments.end(),
+ [this](auto &seg) {
+ segment_provider->init_mark_segment_closed(
+ seg.first,
+ seg.second.journal_segment_seq);
+ });
auto journal_tail = segments.rbegin()->second.journal_tail;
segment_provider->update_journal_tail_committed(journal_tail);
using get_segment_ret = get_segment_ertr::future<segment_id_t>;
virtual get_segment_ret get_segment() = 0;
- /* TODO: we'll want to use this to propogate information about segment contents */
- virtual void put_segment(segment_id_t segment) = 0;
+ virtual void close_segment(segment_id_t) {}
+
+ virtual void set_journal_segment(
+ segment_id_t segment,
+ segment_seq_t seq) {}
virtual journal_seq_t get_journal_tail_target() const = 0;
virtual void update_journal_tail_committed(journal_seq_t tail_committed) = 0;
+ virtual void init_mark_segment_closed(
+ segment_id_t segment, segment_seq_t seq) {}
+
virtual ~JournalSegmentProvider() {}
};
namespace crimson::os::seastore {
+bool SpaceTrackerSimple::equals(const SpaceTrackerI &_other) const
+{
+ const auto &other = static_cast<const SpaceTrackerSimple&>(_other);
+
+ if (other.live_bytes_by_segment.size() != live_bytes_by_segment.size()) {
+ logger().error("{}: different segment counts, bug in test");
+ assert(0 == "segment counts should match");
+ return false;
+ }
+
+ bool all_match = true;
+ for (segment_id_t i = 0; i < live_bytes_by_segment.size(); ++i) {
+ if (other.live_bytes_by_segment[i] != live_bytes_by_segment[i]) {
+ all_match = false;
+ logger().debug(
+ "{}: segment_id {} live bytes mismatch *this: {}, other: {}",
+ __func__,
+ i,
+ live_bytes_by_segment[i],
+ other.live_bytes_by_segment[i]);
+ }
+ }
+ return all_match;
+}
+
+int64_t SpaceTrackerDetailed::SegmentMap::allocate(
+ segment_id_t segment,
+ segment_off_t offset,
+ extent_len_t len,
+ const extent_len_t block_size)
+{
+ assert(offset % block_size == 0);
+ assert(len % block_size == 0);
+
+ const auto b = (offset / block_size);
+ const auto e = (offset + len) / block_size;
+
+ bool error = false;
+ for (auto i = b; i < e; ++i) {
+ if (bitmap[i]) {
+ if (!error) {
+ logger().error(
+ "SegmentMap::allocate found allocated in {}, {} ~ {}",
+ segment,
+ offset,
+ len);
+ error = true;
+ }
+ logger().debug(
+ "SegmentMap::allocate block {} allocated",
+ i * block_size);
+ }
+ bitmap[i] = true;
+ }
+ return update_usage(block_size);
+}
+
+int64_t SpaceTrackerDetailed::SegmentMap::release(
+ segment_id_t segment,
+ segment_off_t offset,
+ extent_len_t len,
+ const extent_len_t block_size)
+{
+ assert(offset % block_size == 0);
+ assert(len % block_size == 0);
+
+ const auto b = (offset / block_size);
+ const auto e = (offset + len) / block_size;
+
+ bool error = false;
+ for (auto i = b; i < e; ++i) {
+ if (!bitmap[i]) {
+ if (!error) {
+ logger().error(
+ "SegmentMap::release found unallocated in {}, {} ~ {}",
+ segment,
+ offset,
+ len);
+ error = true;
+ }
+ logger().debug(
+ "SegmentMap::release block {} unallocated",
+ i * block_size);
+ }
+ bitmap[i] = false;
+ }
+ return update_usage(-(int64_t)block_size);
+}
+
+bool SpaceTrackerDetailed::equals(const SpaceTrackerI &_other) const
+{
+ const auto &other = static_cast<const SpaceTrackerDetailed&>(_other);
+
+ if (other.segment_usage.size() != segment_usage.size()) {
+ logger().error("{}: different segment counts, bug in test");
+ assert(0 == "segment counts should match");
+ return false;
+ }
+
+ bool all_match = true;
+ for (segment_id_t i = 0; i < segment_usage.size(); ++i) {
+ if (other.segment_usage[i].get_usage() != segment_usage[i].get_usage()) {
+ all_match = false;
+ logger().error(
+ "{}: segment_id {} live bytes mismatch *this: {}, other: {}",
+ __func__,
+ i,
+ segment_usage[i].get_usage(),
+ other.segment_usage[i].get_usage());
+ }
+ }
+ return all_match;
+}
+
+void SpaceTrackerDetailed::SegmentMap::dump_usage(extent_len_t block_size) const
+{
+ for (unsigned i = 0; i < bitmap.size(); ++i) {
+ if (bitmap[i]) {
+ logger().debug(" {} still live", i * block_size);
+ }
+ }
+}
+
+void SpaceTrackerDetailed::dump_usage(segment_id_t id) const
+{
+ logger().debug("SpaceTrackerDetailed::dump_usage {}", id);
+ segment_usage[id].dump_usage(block_size);
+}
+
SegmentCleaner::get_segment_ret SegmentCleaner::get_segment()
{
- // TODO
+ for (size_t i = 0; i < segments.size(); ++i) {
+ if (segments[i].is_empty()) {
+ mark_open(i);
+ logger().debug("{}: returning segment {}", __func__, i);
+ return get_segment_ret(
+ get_segment_ertr::ready_future_marker{},
+ i);
+ }
+ }
+ assert(0 == "out of space handling todo");
return get_segment_ret(
get_segment_ertr::ready_future_marker{},
- next++);
+ 0);
}
void SegmentCleaner::update_journal_tail_target(journal_seq_t target)
}
}
-void SegmentCleaner::put_segment(segment_id_t segment)
+void SegmentCleaner::close_segment(segment_id_t segment)
{
- return;
+ mark_closed(segment);
}
SegmentCleaner::do_immediate_work_ret SegmentCleaner::do_immediate_work(
});
}
+SegmentCleaner::do_deferred_work_ret SegmentCleaner::do_deferred_work(
+ Transaction &t)
+{
+ return do_deferred_work_ret(
+ do_deferred_work_ertr::ready_future_marker{},
+ ceph::timespan());
+}
+
}
namespace crimson::os::seastore {
class Transaction;
+struct segment_info_t {
+ Segment::segment_state_t state = Segment::segment_state_t::EMPTY;
+
+ // Will be non-null for any segments in the current journal
+ segment_seq_t journal_segment_seq = NULL_SEG_SEQ;
+
+ bool is_empty() const {
+ return state == Segment::segment_state_t::EMPTY;
+ }
+
+ bool is_closed() const {
+ return state == Segment::segment_state_t::CLOSED;
+ }
+
+ bool is_open() const {
+ return state == Segment::segment_state_t::OPEN;
+ }
+};
+
+class SpaceTrackerI {
+public:
+ virtual int64_t allocate(
+ segment_id_t segment,
+ segment_off_t offset,
+ extent_len_t len) = 0;
+
+ virtual int64_t release(
+ segment_id_t segment,
+ segment_off_t offset,
+ extent_len_t len) = 0;
+
+ virtual int64_t get_usage(
+ segment_id_t segment) const = 0;
+
+ virtual bool equals(const SpaceTrackerI &other) const = 0;
+
+ virtual std::unique_ptr<SpaceTrackerI> make_empty() const = 0;
+
+ virtual void dump_usage(segment_id_t) const = 0;
+
+ virtual void reset() = 0;
+
+ virtual ~SpaceTrackerI() = default;
+};
+using SpaceTrackerIRef = std::unique_ptr<SpaceTrackerI>;
+
+class SpaceTrackerSimple : public SpaceTrackerI {
+ // Tracks live space for each segment
+ std::vector<int64_t> live_bytes_by_segment;
+
+ int64_t update_usage(segment_id_t segment, int64_t delta) {
+ assert(segment < live_bytes_by_segment.size());
+ live_bytes_by_segment[segment] += delta;
+ assert(live_bytes_by_segment[segment] >= 0);
+ return live_bytes_by_segment[segment];
+ }
+public:
+ SpaceTrackerSimple(size_t num_segments)
+ : live_bytes_by_segment(num_segments, 0) {}
+
+ int64_t allocate(
+ segment_id_t segment,
+ segment_off_t offset,
+ extent_len_t len) final {
+ return update_usage(segment, len);
+ }
+
+ int64_t release(
+ segment_id_t segment,
+ segment_off_t offset,
+ extent_len_t len) final {
+ return update_usage(segment, -len);
+ }
+
+ int64_t get_usage(segment_id_t segment) const final {
+ assert(segment < live_bytes_by_segment.size());
+ return live_bytes_by_segment[segment];
+ }
+
+ void dump_usage(segment_id_t) const final {}
+
+ void reset() final {
+ for (auto &i: live_bytes_by_segment)
+ i = 0;
+ }
+
+ SpaceTrackerIRef make_empty() const final {
+ return SpaceTrackerIRef(
+ new SpaceTrackerSimple(live_bytes_by_segment.size()));
+ }
+
+ bool equals(const SpaceTrackerI &other) const;
+};
+
+class SpaceTrackerDetailed : public SpaceTrackerI {
+ class SegmentMap {
+ int64_t used = 0;
+ std::vector<bool> bitmap;
+
+ public:
+ SegmentMap(size_t blocks) : bitmap(blocks, false) {}
+
+ int64_t update_usage(int64_t delta) {
+ used += delta;
+ return used;
+ }
+
+ int64_t allocate(
+ segment_id_t segment,
+ segment_off_t offset,
+ extent_len_t len,
+ const extent_len_t block_size);
+
+ int64_t release(
+ segment_id_t segment,
+ segment_off_t offset,
+ extent_len_t len,
+ const extent_len_t block_size);
+
+ int64_t get_usage() const {
+ return used;
+ }
+
+ void dump_usage(extent_len_t block_size) const;
+
+ void reset() {
+ used = 0;
+ for (auto &&i: bitmap) {
+ i = false;
+ }
+ }
+ };
+ const size_t block_size;
+ const size_t segment_size;
+
+ // Tracks live space for each segment
+ std::vector<SegmentMap> segment_usage;
+
+public:
+ SpaceTrackerDetailed(size_t num_segments, size_t segment_size, size_t block_size)
+ : block_size(block_size),
+ segment_size(segment_size),
+ segment_usage(num_segments, segment_size / block_size) {}
+
+ int64_t allocate(
+ segment_id_t segment,
+ segment_off_t offset,
+ extent_len_t len) final {
+ assert(segment < segment_usage.size());
+ return segment_usage[segment].allocate(segment, offset, len, block_size);
+ }
+
+ int64_t release(
+ segment_id_t segment,
+ segment_off_t offset,
+ extent_len_t len) final {
+ assert(segment < segment_usage.size());
+ return segment_usage[segment].release(segment, offset, len, block_size);
+ }
+
+ int64_t get_usage(segment_id_t segment) const final {
+ assert(segment < segment_usage.size());
+ return segment_usage[segment].get_usage();
+ }
+
+ void dump_usage(segment_id_t seg) const final;
+
+ void reset() final {
+ for (auto &i: segment_usage)
+ i.reset();
+ }
+
+ SpaceTrackerIRef make_empty() const final {
+ return SpaceTrackerIRef(
+ new SpaceTrackerDetailed(
+ segment_usage.size(),
+ segment_size,
+ block_size));
+ }
+
+ bool equals(const SpaceTrackerI &other) const;
+};
+
+
class SegmentCleaner : public JournalSegmentProvider {
public:
/// Config
struct config_t {
size_t num_segments = 0;
size_t segment_size = 0;
+ size_t block_size = 0;
size_t target_journal_segments = 0;
size_t max_journal_segments = 0;
return config_t{
manager.get_num_segments(),
static_cast<size_t>(manager.get_segment_size()),
+ (size_t)manager.get_block_size(),
2,
4};
}
};
private:
- segment_id_t next = 0;
const config_t config;
+ SpaceTrackerIRef space_tracker;
+ std::vector<segment_info_t> segments;
+ size_t empty_segments;
+ int64_t used_bytes = 0;
+ bool init_complete = false;
+
journal_seq_t journal_tail_target;
journal_seq_t journal_tail_committed;
journal_seq_t journal_head;
ExtentCallbackInterface *ecb = nullptr;
public:
- SegmentCleaner(config_t config)
- : config(config) {}
+ SegmentCleaner(config_t config, bool detailed = false)
+ : config(config),
+ space_tracker(
+ detailed ?
+ (SpaceTrackerI*)new SpaceTrackerDetailed(
+ config.num_segments,
+ config.segment_size,
+ config.block_size) :
+ (SpaceTrackerI*)new SpaceTrackerSimple(
+ config.num_segments)),
+ segments(config.num_segments),
+ empty_segments(config.num_segments) {}
get_segment_ret get_segment() final;
- // hack for testing until we get real space handling
- void set_next(segment_id_t _next) {
- next = _next;
- }
- segment_id_t get_next() const {
- return next;
- }
-
+ void close_segment(segment_id_t segment) final;
- void put_segment(segment_id_t segment) final;
+ void set_journal_segment(
+ segment_id_t segment, segment_seq_t seq) final {
+ assert(segment < segments.size());
+ segments[segment].journal_segment_seq = seq;
+ assert(segments[segment].is_open());
+ }
journal_seq_t get_journal_tail_target() const final {
return journal_tail_target;
journal_head = head;
}
+ void init_mark_segment_closed(segment_id_t segment, segment_seq_t seq) final {
+ crimson::get_logger(ceph_subsys_filestore).debug(
+ "SegmentCleaner::init_mark_segment_closed: segment {}, seq {}",
+ segment,
+ seq);
+ mark_closed(segment);
+ segments[segment].journal_segment_seq = seq;
+ }
+
+ void mark_space_used(
+ paddr_t addr,
+ extent_len_t len,
+ bool init_scan = false) {
+ assert(addr.segment < segments.size());
+
+ if (!init_scan && !init_complete)
+ return;
+
+ if (!init_scan) {
+ assert(segments[addr.segment].state == Segment::segment_state_t::OPEN);
+ }
+
+ used_bytes += len;
+ auto ret = space_tracker->allocate(
+ addr.segment,
+ addr.offset,
+ len);
+ assert(ret > 0);
+ }
+
+ void mark_space_free(
+ paddr_t addr,
+ extent_len_t len) {
+ if (!init_complete)
+ return;
+
+ used_bytes -= len;
+ assert(addr.segment < segments.size());
+
+ auto ret = space_tracker->release(
+ addr.segment,
+ addr.offset,
+ len);
+ assert(ret >= 0);
+ }
+
+ SpaceTrackerIRef get_empty_space_tracker() const {
+ return space_tracker->make_empty();
+ }
+
+ void complete_init() { init_complete = true; }
+
void set_extent_callback(ExtentCallbackInterface *cb) {
ecb = cb;
}
+ bool debug_check_space(const SpaceTrackerI &tracker) {
+ return space_tracker->equals(tracker);
+ }
+
/**
* do_immediate_work
*
Transaction &t);
private:
+
+ // journal status helpers
+
journal_seq_t get_dirty_tail() const {
auto ret = journal_head;
ret.segment_seq -= std::min(
config.max_journal_segments);
return ret;
}
+ void mark_closed(segment_id_t segment) {
+ assert(segments.size() > segment);
+ if (init_complete) {
+ assert(segments[segment].is_open());
+ } else {
+ assert(segments[segment].is_empty());
+ assert(empty_segments > 0);
+ --empty_segments;
+ }
+ crimson::get_logger(ceph_subsys_filestore).debug(
+ "mark_closed: empty_segments: {}",
+ empty_segments);
+ segments[segment].state = Segment::segment_state_t::CLOSED;
+ }
+
+ void mark_empty(segment_id_t segment) {
+ assert(segments.size() > segment);
+ assert(segments[segment].is_closed());
+ assert(segments.size() > empty_segments);
+ ++empty_segments;
+ if (space_tracker->get_usage(segment) != 0) {
+ space_tracker->dump_usage(segment);
+ assert(space_tracker->get_usage(segment) == 0);
+ }
+ segments[segment].state = Segment::segment_state_t::EMPTY;
+ }
+
+ void mark_open(segment_id_t segment) {
+ assert(segments.size() > segment);
+ assert(segments[segment].is_empty());
+ assert(empty_segments > 0);
+ --empty_segments;
+ segments[segment].state = Segment::segment_state_t::OPEN;
+ }
};
}
return submit_transaction(std::move(t));
});
});
+ }).safe_then([this] {
+ return seastar::do_with(
+ make_weak_transaction(),
+ [this](auto &t) {
+ assert(segment_cleaner.debug_check_space(
+ *segment_cleaner.get_empty_space_tracker()));
+ return lba_manager.scan_mapped_space(
+ *t,
+ [this](paddr_t addr, extent_len_t len) {
+ logger().debug("TransactionManager::mount: marking {}~{} used",
+ addr,
+ len);
+ segment_cleaner.mark_space_used(
+ addr,
+ len ,
+ /* init_scan = */ true);
+ });
+ });
+ }).safe_then([this] {
+ segment_cleaner.complete_init();
}).handle_error(
mount_ertr::pass_further{},
crimson::ct_error::all_same_way([] {
[this, t=std::move(t)](auto p) mutable {
auto [addr, journal_seq] = p;
segment_cleaner.set_journal_head(journal_seq);
- cache.complete_commit(*t, addr, journal_seq);
+ cache.complete_commit(*t, addr, journal_seq, &segment_cleaner);
lba_manager.complete_transaction(*t);
},
submit_transaction_ertr::pass_further{},
next++);
}
- void put_segment(segment_id_t segment) final {
- return;
- }
-
journal_seq_t get_journal_tail_target() const final { return journal_seq_t{}; }
void update_journal_tail_committed(journal_seq_t committed) final {}
next++);
}
- void put_segment(segment_id_t segment) final {
- return;
- }
-
journal_seq_t get_journal_tail_target() const final { return journal_seq_t{}; }
void update_journal_tail_committed(journal_seq_t paddr) final {}
void init() {
segment_cleaner = std::make_unique<SegmentCleaner>(
SegmentCleaner::config_t::default_from_segment_manager(
- *segment_manager));
+ *segment_manager),
+ true);
journal = std::make_unique<Journal>(*segment_manager);
cache = std::make_unique<Cache>(*segment_manager);
lba_manager = lba_manager::create_lba_manager(*segment_manager, *cache);
get_random_contents());
}
+ bool check_usage() {
+ auto t = create_weak_transaction();
+ SpaceTrackerIRef tracker(segment_cleaner->get_empty_space_tracker());
+ lba_manager->scan_mapped_space(
+ *t.t,
+ [&tracker](auto offset, auto len) {
+ tracker->allocate(
+ offset.segment,
+ offset.offset,
+ len);
+ }).unsafe_get0();
+ return segment_cleaner->debug_check_space(*tracker);
+ }
+
void replay() {
+ logger().debug("{}: begin", __func__);
+ EXPECT_TRUE(check_usage());
tm->close().unsafe_get();
destroy();
static_cast<segment_manager::EphemeralSegmentManager*>(&*segment_manager)->remount();
init();
tm->mount().unsafe_get();
+ logger().debug("{}: end", __func__);
+ }
+
+ void check() {
+ check_mappings();
+ check_usage();
}
void check_mappings() {
'a');
ASSERT_EQ(ADDR, extent->get_laddr());
check_mappings(t);
- check_mappings();
+ check();
submit_transaction(std::move(t));
- check_mappings();
+ check();
}
});
}
'a');
ASSERT_EQ(ADDR, extent->get_laddr());
check_mappings(t);
- check_mappings();
+ check();
submit_transaction(std::move(t));
- check_mappings();
+ check();
}
+ ASSERT_TRUE(check_usage());
replay();
{
auto t = create_transaction();
SIZE);
auto mut = mutate_extent(t, ext);
check_mappings(t);
- check_mappings();
+ check();
submit_transaction(std::move(t));
- check_mappings();
+ check();
}
+ ASSERT_TRUE(check_usage());
replay();
- check_mappings();
+ check();
});
}
'a');
submit_transaction(std::move(t));
- check_mappings();
+ check();
}
replay();
- check_mappings();
+ check();
});
}
-
TEST_F(transaction_manager_test_t, inc_dec_ref)
{
constexpr laddr_t SIZE = 4096;
'a');
ASSERT_EQ(ADDR, extent->get_laddr());
check_mappings(t);
- check_mappings();
+ check();
submit_transaction(std::move(t));
- check_mappings();
+ check();
}
replay();
{
auto t = create_transaction();
inc_ref(t, ADDR);
check_mappings(t);
- check_mappings();
+ check();
submit_transaction(std::move(t));
- check_mappings();
+ check();
}
{
auto t = create_transaction();
dec_ref(t, ADDR);
check_mappings(t);
- check_mappings();
+ check();
submit_transaction(std::move(t));
- check_mappings();
+ check();
}
replay();
{
auto t = create_transaction();
dec_ref(t, ADDR);
check_mappings(t);
- check_mappings();
+ check();
submit_transaction(std::move(t));
- check_mappings();
+ check();
}
});
}
ASSERT_EQ(i * SIZE, extent->get_laddr());
submit_transaction(std::move(t));
}
- check_mappings();
+ check();
});
}
submit_transaction(std::move(t));
}
- for (unsigned i = 0; i < 5; ++i) {
- for (unsigned j = 0; j < 50; ++j) {
+ for (unsigned i = 0; i < 4; ++i) {
+ for (unsigned j = 0; j < 65; ++j) {
auto t = create_transaction();
for (unsigned k = 0; k < 2; ++k) {
auto ext = get_extent(
}
replay();
logger().debug("random_writes: checking");
- check_mappings();
+ check();
logger().debug("random_writes: done replaying/checking");
}
});