%endif
%endif
%bcond_without crimson
+%bcond_with crimson_test_workload
%if 0%{?suse_version} || 0%{?openEuler}
%bcond_with jaeger
%else
%if 0%{with crimson}
-DWITH_CRIMSON:BOOL=ON \
-DWITH_JAEGER:BOOL=OFF \
+%endif
+%if 0%{with crimson_test_workload}
+ -DENABLE_CRIMSON_TEST_WORKLOAD:BOOL=ON \
%endif
-DWITH_GRAFANA:BOOL=ON \
%if %{with sccache}
desc: CPU cores on which POSIX threads alienized to seastar will run in cpuset(7) format
flags:
- startup
+- name: crimson_test_workload
+ type: bool
+ level: dev
+ desc: enable test workload
+ default: false
# Seastore options
level: dev
desc: select write through policy when data length is greater than this value.
default: 512_K
+- name: seastore_test_workload_write_through_probability
+ type: float
+ level: dev
+ desc: the probability of writing the incoming data to the slow devices
+ default: 0.5
+- name: seastore_test_workload_2Q_promote_probability
+ type: float
+ level: dev
+ desc: the probability of promoting extents to the faster devices
+ default: 0.5
+- name: seastore_test_workload_force_prcess_background_tasks_period
+ type: uint
+ level: dev
+ desc: the period, in seconds, between forced runs of the background tasks
+ default: 5
set_target_properties(crimson-seastore PROPERTIES
JOB_POOL_COMPILE heavy_compile_job_pool)
+
+option(ENABLE_CRIMSON_TEST_WORKLOAD "enable crimson test workload" OFF)
+if ("${CMAKE_BUILD_TYPE}" STREQUAL "Debug") # Debug builds always switch the option on; FORCE overwrites any cached value
+ set(ENABLE_CRIMSON_TEST_WORKLOAD ON CACHE BOOL "enable crimson test workload" FORCE)
+endif()
+if(ENABLE_CRIMSON_TEST_WORKLOAD) # PUBLIC: the macro is also seen by targets that link crimson-seastore
+ target_compile_definitions(crimson-seastore PUBLIC CRIMSON_TEST_WORKLOAD)
+endif()
return static_cast<std::size_t>(ret);
}
-seastar::future<> JournalTrimmerImpl::trim() {
+seastar::future<> JournalTrimmerImpl::trim(bool force) {
return seastar::when_all(
- [this] {
- if (should_trim_alloc()) {
+ [this, force] {
+ if (force || should_trim_alloc()) {
return trim_alloc(
).handle_error(
crimson::ct_error::assert_all{
return seastar::now();
}
},
- [this] {
- if (should_start_trim_dirty()) {
+ [this, force] {
+ if (force || should_start_trim_dirty()) {
return trim_dirty(
).handle_error(
crimson::ct_error::assert_all{
Transaction &t,
CachedExtentRef extent) = 0;
+#ifdef CRIMSON_TEST_WORKLOAD
+ virtual promote_extent_ret promote_extents_from_disk( // test-workload-only hook; TransactionManager implements it by scanning backrefs starting at paddr
+ Transaction &t,
+ paddr_t paddr) = 0;
+#endif
+
/**
* demote_region
*
reserved_usage -= usage;
}
- seastar::future<> trim();
+ seastar::future<> trim(bool force);
static JournalTrimmerImplRef create(
store_index_t store_index,
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 sts=2 expandtab
+#include "crimson/common/coroutine.h"
#include "crimson/os/seastore/backref/btree_backref_manager.h"
SET_SUBSYS(seastore_backref);
});
}
+#ifdef CRIMSON_TEST_WORKLOAD
+/**
+ * scan_device
+ *
+ * Walks the backref tree starting at btree.lower_bound(c, paddr) and passes
+ * every mapping located on paddr's device to f.
+ *
+ * A mapping is skipped when the cache holds a backref entry for its key that
+ * either carries L_ADDR_NULL (logged as removed) or a laddr different from
+ * the one stored in the tree (logged as changed).
+ *
+ * The walk ends at the end of the tree, at the first non-skipped key whose
+ * device id is greater than paddr's, or as soon as f yields
+ * seastar::stop_iteration::yes.
+ */
+BtreeBackrefManager::scan_device_ret
+BtreeBackrefManager::scan_device(
+  Transaction &t,
+  paddr_t paddr,
+  scan_device_func_t &f)
+{
+  LOG_PREFIX(BtreeBackrefManager::scan_device);
+  auto c = get_context(t);
+  auto croot = co_await cache.get_root(t);
+  auto btree = BackrefBtree(croot);
+  const auto target_device = paddr.get_device_id();
+  auto iter = co_await btree.lower_bound(c, paddr);
+  while (!iter.is_end()) {
+    auto key = iter.get_key();
+    auto cached = cache.get_cached_backref_entry(key);
+    if (cached) {
+      assert(cached->paddr == key);
+    }
+    if (cached && cached->laddr == L_ADDR_NULL) {
+      // the cache says this mapping is gone; do not report it
+      INFOT("{} is removed", t, cached->paddr);
+    } else if (key.get_device_id() == target_device) {
+      auto val = iter.get_val();
+      if (cached && cached->laddr != val.laddr) {
+        // the cached entry supersedes the tree; do not report the stale one
+        INFOT("{} changed from {} to {}",
+              t, cached->paddr, val.laddr, cached->laddr);
+      } else {
+        INFOT("scanned {}, {}", t, key, val.laddr);
+        auto verdict = co_await f(key, val.len, val.type, val.laddr);
+        if (verdict == seastar::stop_iteration::yes) {
+          break;
+        }
+      }
+    } else if (key.get_device_id() > target_device) {
+      // walked past the requested device
+      break;
+    }
+    // single advance point shared by every non-terminating path
+    iter = co_await iter.next(c);
+  }
+  co_return;
+}
+#endif
+
base_iertr::future<> _init_cached_extent(
op_context_t c,
const CachedExtentRef &e,
Transaction &t,
scan_mapped_space_func_t &&f) final;
+#ifdef CRIMSON_TEST_WORKLOAD
+ scan_device_ret scan_device( // test-workload-only override of BackrefManager::scan_device
+ Transaction &t,
+ paddr_t paddr,
+ scan_device_func_t &f) final;
+#endif
+
init_cached_extent_ret init_cached_extent(
Transaction &t,
CachedExtentRef e) final;
Transaction &t,
scan_mapped_space_func_t &&f) = 0;
+#ifdef CRIMSON_TEST_WORKLOAD
+ using scan_device_ret = base_iertr::future<>;
+ using scan_device_func_t = std::function< // per-mapping callback; BtreeBackrefManager ends its scan once it yields stop_iteration::yes
+ base_iertr::future<seastar::stop_iteration>(
+ paddr_t, extent_len_t, extent_types_t, laddr_t)>;
+ virtual scan_device_ret scan_device( // only declared in test-workload builds; f is invoked for scanned backref mappings
+ Transaction &t,
+ paddr_t start,
+ scan_device_func_t &f) = 0;
+#endif
+
virtual ~BackrefManager() {}
};
ExtentPlacementManager &epm,
store_index_t store_index)
: epm(epm),
+#ifdef CRIMSON_TEST_WORKLOAD
+ force_backref(crimson::common::get_conf<bool>("crimson_test_workload")),
+#endif
pinboard(create_extent_pinboard(
crimson::common::get_conf<Option::size_t>(
"seastore_cachepin_size_pershard"),
}
bool can_drop_backref() const {
+#ifdef CRIMSON_TEST_WORKLOAD
+ return epm.is_pure_rbm() && !force_backref; // force_backref holds the crimson_test_workload option read in the constructor
+#else
return epm.is_pure_rbm();
+#endif
}
void update_read_ratio(Transaction &t) {
return res;
}
+ /**
+  * get_cached_backref_entry
+  *
+  * Looks up addr in backref_entry_mset and returns a copy of the last
+  * entry (in the multiset's iteration order) whose paddr equals addr.
+  * Returns std::nullopt when no entry with that paddr is cached.
+  */
+ std::optional<backref_entry_t> get_cached_backref_entry(paddr_t addr) {
+   const auto end = backref_entry_mset.end();
+   auto it = backref_entry_mset.lower_bound(
+     addr,
+     backref_entry_t::cmp_t());
+   if (it == end || it->paddr != addr) {
+     return std::nullopt;
+   }
+   // Advance to the last entry of the run sharing this paddr. The end
+   // check must come before any dereference: the run may extend to the
+   // very last element of the multiset.
+   auto last = it;
+   for (++it; it != end && it->paddr == addr; ++it) {
+     last = it;
+   }
+   return *last;
+ }
+
const backref_entry_mset_t& get_backref_entry_mset() {
return backref_entry_mset;
}
transaction_id_t next_id = 0;
+#ifdef CRIMSON_TEST_WORKLOAD
+ const bool force_backref = false;
+#endif
+
/**
* dirty
*
prior.last_committed_crc = extent.last_committed_crc;
prior.dirty_from = extent.dirty_from;
prior.length = extent.length;
- prior.loaded_length = extent.loaded_length;
- prior.buffer_space = std::move(extent.buffer_space);
+ // XXX: at present, zero loaded_length extents here
+ // must have been created by promoting/demoting them.
+ if (likely(extent.loaded_length != 0)) {
+ assert(prior.loaded_length == extent.loaded_length);
+ prior.buffer_space = std::move(extent.buffer_space);
+ }
// XXX: We can go ahead and change the prior's version because
// transactions don't hold a local view of the version field,
// unlike FixedKVLeafNode::modifications
}
size_t get_promotion_size() const {
+#ifdef CRIMSON_TEST_WORKLOAD
+ return current_contents >= promotion_size // test-workload builds report the larger of current_contents and promotion_size
+ ? current_contents
+ : promotion_size;
+#else
return current_contents;
+#endif
}
void set_background_callback(BackgroundListener *l) {
}
bool should_run_promote() const {
- return enabled() && current_contents >= promotion_size;
+ return enabled() &&
+ (current_contents >= promotion_size
+#ifdef CRIMSON_TEST_WORKLOAD
+ || (crimson::common::get_conf<bool>("crimson_test_workload")
+ && current_contents != 0) // with the test workload enabled, any non-zero current_contents is enough
+#endif
+ );
}
std::size_t get_promoted_size() const {
std::size_t promote_size = 0;
std::list<CachedExtentRef> extents;
DEBUGT("start promote", t);
+#ifdef CRIMSON_TEST_WORKLOAD
+ if (current_contents < promotion_size
+ && crimson::common::get_conf<bool>("crimson_test_workload")) {
+ auto id = epm.get_cold_device_id();
+ paddr_t start = P_ADDR_NULL;
+ if (device_id_to_paddr_type(id) == paddr_types_t::SEGMENT) {
+ start = paddr_t::make_seg_paddr(id, 0, 0);
+ } else {
+ start = paddr_t::make_blk_paddr(id, 0);
+ }
+ co_await ecb->promote_extents_from_disk(t, start);
+ } else {
+#endif
for (auto &extent : list) {
DEBUGT("promote {} to the hot tier", t, extent);
ceph_assert(extent.is_stable_clean());
co_await trans_intr::make_interruptible(extent->wait_io());
co_await ecb->promote_extent(t, extent);
}
+#ifdef CRIMSON_TEST_WORKLOAD
+ }
+#endif
// existing extents in lru will be retired after transaction submitted
co_await ecb->submit_transaction_direct(t);
promoted_count += extents.size();
// to the warm out queue.
continue;
}
+#ifdef CRIMSON_TEST_WORKLOAD
+ // Test workload: hand a promotable extent straight to the promoter with
+ // the configured probability. std::rand() % 100 / 100.0 only takes the
+ // values 0.00 .. 0.99, so the comparison has to be strict: with "<=" the
+ // branch fires one percentage point too often and a probability of 0
+ // could never switch it off.
+ if (promoter.should_promote_extent(*extent)
+     && crimson::common::get_conf<bool>("crimson_test_workload")
+     && (double(std::rand() % 100) / 100.0) < crimson::common::get_conf<double>(
+          "seastore_test_workload_2Q_promote_probability")) {
+   promoter.add_extent(*extent);
+   continue;
+ }
+#endif
auto lext = extent->cast<LogicalCachedExtent>();
auto laddr = lext->get_laddr();
auto end = extent->get_last_touch_end();
{
assert(is_running());
while (is_running()) {
- if (background_should_run()) {
+ if (background_should_run()
+#ifdef CRIMSON_TEST_WORKLOAD
+ || force_run_background()
+#endif
+ ) {
log_state("run(background)");
co_await do_background_cycle();
+#ifdef CRIMSON_TEST_WORKLOAD
+ if (test_workload && force_process_state != ForceProcessState::STOP) {
+ last_process_state = force_process_state;
+ force_process_state = ForceProcessState::STOP;
+ set_next_arm_timepoint();
+ }
+#endif
} else {
log_state("run(block)");
assert(!blocking_background);
}
}
+ bool force_trim = false;
+ bool should_abort_cleaner_usage = true;
+#ifdef CRIMSON_TEST_WORKLOAD
+ if (test_workload && force_process_state == ForceProcessState::TRIM) {
+ if (!proceed_trim) {
+ should_abort_cleaner_usage = false;
+ }
+ proceed_trim = true;
+ force_trim = true;
+ }
+#endif
+
if (proceed_trim) {
DEBUG("started trimming...");
- return trimmer->trim(
- ).finally([this, trim_usage, FNAME] {
+ return trimmer->trim(force_trim
+ ).finally([this, trim_usage, should_abort_cleaner_usage, FNAME] {
DEBUG("finished trimming");
- abort_cleaner_usage(trim_usage, {true, true});
+ if (should_abort_cleaner_usage) {
+ abort_cleaner_usage(trim_usage, {true, true});
+ }
});
} else {
assert(!proceed_trim);
proceed_demote = true;
}
+ bool abort_cold_cleaner_usage = true;
+#ifdef CRIMSON_TEST_WORKLOAD
+ if (test_workload && force_process_state == ForceProcessState::CLEAN) {
+ if (!proceed_clean_main) {
+ abort_cold_cleaner_usage = false;
+ }
+ proceed_clean_main = main_cleaner->can_clean_space();
+ if (has_cold_tier()) {
+ proceed_clean_cold = cold_cleaner->can_clean_space();
+ }
+ if (logical_bucket) {
+ proceed_demote = logical_bucket->could_demote();
+ }
+ }
+#endif
+
if (!proceed_clean_main && !proceed_clean_cold && !proceed_demote) {
ceph_abort_msg("no background process will start");
}
return seastar::when_all(
- [this, FNAME, proceed_clean_main,
+ [this, FNAME, proceed_clean_main, abort_cold_cleaner_usage,
should_clean_main_for_trim, main_cold_usage] {
if (!proceed_clean_main) {
return seastar::now();
crimson::ct_error::assert_all{
"do_background_cycle encountered invalid error in main clean_space"
}
- ).finally([this, main_cold_usage, FNAME] {
+ ).finally([this, main_cold_usage, abort_cold_cleaner_usage, FNAME] {
DEBUG("finished clean main");
- abort_cold_usage(main_cold_usage, true);
+ if (abort_cold_cleaner_usage) {
+ abort_cold_usage(main_cold_usage, true);
+ }
});
},
[this, FNAME, proceed_clean_cold,
}
write_policy_t get_write_policy(extent_types_t type, extent_len_t length) const {
- if (has_cold_tier() && length >= write_through_size && is_data_type(type)) {
- return write_policy_t::WRITE_THROUGH;
+ // Data extents use WRITE_THROUGH when a cold tier exists and the length
+ // reaches write_through_size; everything else uses WRITE_BACK.
+ if (has_cold_tier() && is_data_type(type)) {
+ if (length >= write_through_size
+#ifdef CRIMSON_TEST_WORKLOAD
+ // Test workload: additionally pick WRITE_THROUGH at random.
+ // std::rand() % 100 / 100.0 only takes the values 0.00 .. 0.99, so the
+ // comparison is strict; "<=" would fire one percentage point too often
+ // and a configured probability of 0 could never disable it.
+ || (crimson::common::get_conf<bool>("crimson_test_workload")
+ && (double(std::rand() % 100) / 100.0) < crimson::common::get_conf<double>(
+ "seastore_test_workload_write_through_probability"))
+#endif
+ ) {
+ return write_policy_t::WRITE_THROUGH;
+ }
}
return write_policy_t::WRITE_BACK;
}
return hot_tier_generations - 1;
}
+#ifdef CRIMSON_TEST_WORKLOAD
+ device_id_t get_cold_device_id() const {
+ return background_process.get_cold_device_id(); // plain forward to background_process
+ }
+#endif
+
private:
rewrite_gen_t adjust_generation(
data_category_t category,
get_conf<Option::size_t>("seastore_logical_bucket_capacity"),
get_conf<Option::size_t>("seastore_logical_bucket_proceed_size_per_cycle"));
}
+#ifdef CRIMSON_TEST_WORKLOAD
+ LOG_PREFIX(BackgroundProcess::init);
+ test_workload = crimson::common::get_conf<bool>("crimson_test_workload");
+ force_process_half_life = crimson::common::get_conf<uint64_t>(
+ "seastore_test_workload_force_prcess_background_tasks_period");
+ force_background_timer.set_callback([this] { wake_half_life(); });
+ SUBINFO(seastore_epm, "crimson test workload supported, enabled: {}", test_workload);
+ if (test_workload) {
+ set_next_arm_timepoint();
+ }
+#endif
}
LogicalBucket *get_logical_bucket() {
}
}
+
+#ifdef CRIMSON_TEST_WORKLOAD
+ /// Returns the first device id reported by the cold cleaner.
+ /// Callers must only use this when a cold tier exists.
+ device_id_t get_cold_device_id() const {
+   assert(has_cold_tier());
+   const auto &ids = cold_cleaner->get_device_ids();
+   // dereferencing begin() of an empty container is undefined behaviour
+   assert(!ids.empty());
+   return *ids.begin();
+ }
+#endif
+
seastar::future<> reserve_projected_usage(io_usage_t usage);
void release_projected_usage(const io_usage_t &usage) {
// to make sure the condition is consistent.
bool background_should_run() {
assert(is_ready());
+#ifdef CRIMSON_TEST_WORKLOAD
+ if (test_workload) {
+ force_fast_evict(); // test workload: skip the regular mode update and force FAST eviction (no-op without a cold tier)
+ } else {
+ maybe_update_eviction_mode();
+ }
+#else
maybe_update_eviction_mode();
+#endif
return main_cleaner_should_run()
|| cold_cleaner_should_run()
|| trimmer->should_trim();
}
}
+#ifdef CRIMSON_TEST_WORKLOAD
+ /// Test-workload helper: asks eviction_state to force its FAST mode.
+ /// Does nothing when there is no cold tier.
+ void force_fast_evict() {
+   if (!has_cold_tier()) {
+     return;
+   }
+   eviction_state.force_fast_evict();
+ }
+#endif
+
struct eviction_state_t {
enum class eviction_mode_t {
STOP, // generation greater than or equal to MIN_COLD_GENERATION
eviction_mode = eviction_mode_t::FAST;
}
}
+
+#ifdef CRIMSON_TEST_WORKLOAD
+ void force_fast_evict() {
+ eviction_mode = eviction_mode_t::FAST; // unconditional switch to FAST
+ }
+#endif
};
seastar::future<> do_background_cycle();
state_t state = state_t::STOP;
eviction_state_t eviction_state;
+#ifdef CRIMSON_TEST_WORKLOAD
+ /// Background task that the test workload forces; STOP means none.
+ enum class ForceProcessState {
+   STOP,
+   TRIM,
+   CLEAN,
+ };
+ /// Value of the crimson_test_workload option, loaded during init.
+ bool test_workload = false;
+ /// Task selected by the timer callback and not yet handled.
+ ForceProcessState force_process_state = ForceProcessState::STOP;
+ /// Task handled by the most recent forced cycle; drives the alternation
+ /// in wake_half_life().
+ ForceProcessState last_process_state = ForceProcessState::STOP;
+ seastar::timer<seastar::steady_clock_type> force_background_timer;
+ /// Timer delay in seconds, loaded during init from the
+ /// seastore_test_workload_force_prcess_background_tasks_period option.
+ /// Given an initializer so it never holds an indeterminate value, and
+ /// kept unsigned to match the uint option (no implicit narrowing).
+ uint64_t force_process_half_life = 0;
+
+ /// Arms the timer to fire once, force_process_half_life seconds from now.
+ void set_next_arm_timepoint() {
+   assert(test_workload);
+   force_background_timer.arm(std::chrono::seconds(force_process_half_life));
+ }
+
+ /// Timer callback: selects the task to force, alternating between TRIM and
+ /// CLEAN (TRIM comes first, since last_process_state starts as STOP), then
+ /// wakes the background loop.
+ void wake_half_life() {
+   assert(test_workload);
+   if (last_process_state == ForceProcessState::TRIM) {
+     force_process_state = ForceProcessState::CLEAN;
+   } else {
+     force_process_state = ForceProcessState::TRIM;
+   }
+
+   do_wake_background();
+ }
+
+ /// True when the test workload is enabled, a forced task is pending and
+ /// the logical bucket exists and reports that it could demote.
+ bool force_run_background() const {
+   return test_workload && force_process_state != ForceProcessState::STOP
+     && (logical_bucket && logical_bucket->could_demote());
+ }
+#endif
friend class ::transaction_manager_test_t;
};
co_await rdir.close();
}
+#ifdef CRIMSON_TEST_WORKLOAD
+ if (sds.empty() && crimson::common::get_conf<bool>("crimson_test_workload")) {
+ // lbc test workload enabled while no secondary devices indicated, create one
+ std::string path = fmt::format("{}/block.1", root);
+ co_await seastar::make_directory(path);
+ DeviceRef sec_dev = co_await Device::make_device(path, dtype, btype);
+ auto p_sec_dev = sec_dev.get();
+ secondaries.emplace_back(std::move(sec_dev));
+ co_await p_sec_dev->start(store_shard_nums);
+ magic_t magic = (magic_t)std::rand();
+ device_id_t id = 0x1;
+ sds.emplace(id, device_spec_t{magic, dtype, btype, id});
+ co_await p_sec_dev->mkfs(
+ device_config_t::create_secondary(new_osd_fsid, id, dtype, btype, magic)
+ ).handle_error(crimson::ct_error::assert_all{"not possible"});
+ co_await set_secondaries();
+ }
+#endif
device_id_t id = 0;
device_type_t d_type = device->get_device_type();
backend_type_t b_type = device->get_backend_type();
t, mapping, std::move(promoted_extents));
}
+#ifdef CRIMSON_TEST_WORKLOAD
+/**
+ * promote_extents_from_disk
+ *
+ * Test-workload helper: scans the backref mappings starting at paddr and
+ * promotes OBJECT_DATA_BLOCK extents via promote_extent().  Mappings that
+ * already have a shadow paddr, and extents that are stable-dirty, are left
+ * alone.  The scan is told to stop once the promoted length reaches the
+ * seastore_cache_promotion_size option.
+ */
+TransactionManager::promote_extent_ret
+TransactionManager::promote_extents_from_disk(
+  Transaction &t,
+  paddr_t paddr)
+{
+  std::size_t promoted_bytes = 0;
+  BackrefManager::scan_device_func_t on_mapping =
+    [this, &t, &promoted_bytes](
+      paddr_t, extent_len_t length, extent_types_t type, laddr_t laddr)
+    -> base_iertr::future<seastar::stop_iteration> {
+    // only object data is considered
+    if (type != extent_types_t::OBJECT_DATA_BLOCK) {
+      co_return seastar::stop_iteration::no;
+    }
+    auto cursor = co_await lba_manager->get_cursor(t, laddr
+    ).handle_error_interruptible(
+      crimson::ct_error::enoent::assert_failure(),
+      crimson::ct_error::pass_further_all{}
+    );
+    assert(cursor->is_direct());
+    if (cursor->has_shadow_paddr()) {
+      // the extent is already promoted to the hot tier
+      co_return seastar::stop_iteration::no;
+    }
+    auto extent = co_await read_cursor_by_type(t, std::move(cursor), type);
+    if (extent->is_stable_dirty()) {
+      // dirty extents shouldn't be promoted as is in
+      // the real world
+      co_return seastar::stop_iteration::no;
+    }
+    co_await promote_extent(t, extent.get());
+    promoted_bytes += length;
+    const bool quota_reached = promoted_bytes >= crimson::common::get_conf<
+      Option::size_t>("seastore_cache_promotion_size");
+    co_return quota_reached
+      ? seastar::stop_iteration::yes
+      : seastar::stop_iteration::no;
+  };
+  co_await backref_manager->scan_device(t, paddr, on_mapping);
+}
+#endif
+
TransactionManager::rewrite_extents_ret TransactionManager::rewrite_extents(
Transaction &t,
std::vector<CachedExtentRef> &extents,
store_index,
*backref_manager, trimmer_config,
backend_type, roll_start, roll_size,
- !pure_rbm_backend);
+ !pure_rbm_backend
+#ifdef CRIMSON_TEST_WORKLOAD
+ || crimson::common::get_conf<bool>("crimson_test_workload")
+#endif
+ );
AsyncCleanerRef cleaner;
JournalRef journal;
using ExtentCallbackInterface::promote_extent_ret;
promote_extent_ret promote_extent(
Transaction &t,
- CachedExtentRef extent);
+ CachedExtentRef extent) final;
+
+#ifdef CRIMSON_TEST_WORKLOAD
+ promote_extent_ret promote_extents_from_disk( // test-workload only; the backref scan starts at paddr
+ Transaction &t,
+ paddr_t paddr) final;
+#endif
using ExtentCallbackInterface::demote_region_res_t;
using ExtentCallbackInterface::demote_region_ret;
if (run_clean) {
return epm->run_background_work_until_halt();
} else {
- return epm->background_process.trimmer->trim();
+ return epm->background_process.trimmer->trim(false);
}
}).handle_error(
crimson::ct_error::assert_all{