device(std::move(dev)),
max_object_size(
get_conf<uint64_t>("seastore_default_max_object_size")),
- is_test(is_test)
+ is_test(is_test),
+ throttler(
+ get_conf<uint64_t>("seastore_max_concurrent_transactions"))
{
register_metrics();
}
}
);
}
+
+ metrics.add_group(
+ "seastore",
+ {
+ sm::make_gauge(
+ "concurrent_transactions",
+ [this] {
+ return throttler.get_current();
+ },
+ sm::description("transactions that are running inside seastore")
+ )
+ }
+ );
}
seastar::future<> SeaStore::stop()
#include "include/uuid.h"
#include "os/Transaction.h"
+#include "crimson/common/throttle.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/os/futurized_store.h"
transaction_manager->create_transaction(src, tname)),
std::forward<F>(f),
[this, op_type](auto &ctx, auto &f) {
- return ctx.transaction->get_handle().take_collection_lock(
- static_cast<SeastoreCollection&>(*(ctx.ch)).ordering_lock
- ).then([&, this] {
+ return throttler.get(1).then([&ctx] {
+ return ctx.transaction->get_handle().take_collection_lock(
+ static_cast<SeastoreCollection&>(*(ctx.ch)).ordering_lock
+ );
+ }).then([&, this] {
return repeat_eagain([&, this] {
ctx.reset_preserve_handle(*transaction_manager);
return std::invoke(f, ctx);
}).then([this, op_type, &ctx] {
add_latency_sample(op_type,
std::chrono::steady_clock::now() - ctx.begin_timestamp);
+ }).finally([this] {
+ throttler.put();
});
}
);
CollectionManagerRef collection_manager;
OnodeManagerRef onode_manager;
+ common::Throttle throttler;
+
using tm_iertr = TransactionManager::base_iertr;
using tm_ret = tm_iertr::future<>;
tm_ret _do_transaction_step(