ConfigProxy::ConfigProxy(const EntityName& name, std::string_view cluster)
{
- if (seastar::engine().cpu_id() != 0) {
+ if (seastar::this_shard_id() != 0) {
return;
}
// set the initial value on CPU#0
};
Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag)
- : sid{seastar::engine().cpu_id()},
+ : sid{seastar::this_shard_id()},
socket(std::move(_socket)),
in(socket.input()),
// the default buffer size 8192 is too small that may impact our write
FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete;
seastar::future<> listen(entity_addr_t addr) {
- assert(seastar::engine().cpu_id() == cpu);
+ assert(seastar::this_shard_id() == cpu);
logger().trace("FixedCPUServerSocket::listen({})...", addr);
return container().invoke_on_all([addr] (auto& ss) {
ss.addr = addr;
// seastar::future<>(SocketRef, entity_addr_t)
template <typename Func>
seastar::future<> accept(Func&& fn_accept) {
- assert(seastar::engine().cpu_id() == cpu);
+ assert(seastar::this_shard_id() == cpu);
logger().trace("FixedCPUServerSocket({})::accept()...", addr);
return container().invoke_on_all(
[fn_accept = std::move(fn_accept)] (auto& ss) mutable {
[&ss, fn_accept = std::move(fn_accept)]
(seastar::accept_result accept_result) mutable {
// assert seastar::listen_options::set_fixed_cpu() works
- assert(seastar::engine().cpu_id() == ss.cpu);
+ assert(seastar::this_shard_id() == ss.cpu);
auto [socket, paddr] = std::move(accept_result);
entity_addr_t peer_addr;
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
}
seastar::future<> shutdown() {
- assert(seastar::engine().cpu_id() == cpu);
+ assert(seastar::this_shard_id() == cpu);
logger().trace("FixedCPUServerSocket({})::shutdown()...", addr);
return container().invoke_on_all([] (auto& ss) {
if (ss.listener) {
}
seastar::future<> destroy() {
- assert(seastar::engine().cpu_id() == cpu);
+ assert(seastar::this_shard_id() == cpu);
return shutdown().then([this] {
// we should only construct/stop shards on #0
return container().invoke_on(0, [] (auto& ss) {
}
static seastar::future<FixedCPUServerSocket*> create() {
- auto cpu = seastar::engine().cpu_id();
+ auto cpu = seastar::this_shard_id();
// we should only construct/stop shards on #0
return seastar::smp::submit_to(0, [cpu] {
auto service = std::make_unique<sharded_service_t>();
bool SocketConnection::is_connected() const
{
- assert(seastar::engine().cpu_id() == shard_id());
+ assert(seastar::this_shard_id() == shard_id());
return protocol->is_connected();
}
#ifdef UNIT_TESTS_BUILT
bool SocketConnection::is_closed() const
{
- assert(seastar::engine().cpu_id() == shard_id());
+ assert(seastar::this_shard_id() == shard_id());
return protocol->is_closed();
}
bool SocketConnection::is_closed_clean() const
{
- assert(seastar::engine().cpu_id() == shard_id());
+ assert(seastar::this_shard_id() == shard_id());
return protocol->is_closed_clean;
}
seastar::future<> SocketConnection::send(MessageRef msg)
{
- assert(seastar::engine().cpu_id() == shard_id());
+ assert(seastar::this_shard_id() == shard_id());
return protocol->send(std::move(msg));
}
seastar::future<> SocketConnection::keepalive()
{
- assert(seastar::engine().cpu_id() == shard_id());
+ assert(seastar::this_shard_id() == shard_id());
return protocol->keepalive();
}
void SocketConnection::mark_down()
{
- assert(seastar::engine().cpu_id() == shard_id());
+ assert(seastar::this_shard_id() == shard_id());
protocol->close(false);
}
const std::string& logic_name,
uint32_t nonce)
: Messenger{myname},
- master_sid{seastar::engine().cpu_id()},
+ master_sid{seastar::this_shard_id()},
logic_name{logic_name},
nonce{nonce}
{}
seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
{
- assert(seastar::engine().cpu_id() == master_sid);
+ assert(seastar::this_shard_id() == master_sid);
auto my_addrs = addrs;
for (auto& addr : my_addrs.v) {
addr.nonce = nonce;
seastar::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs)
{
- assert(seastar::engine().cpu_id() == master_sid);
+ assert(seastar::this_shard_id() == master_sid);
ceph_assert(addrs.front().get_family() == AF_INET);
return set_myaddrs(addrs).then([this] {
if (!listener) {
}
seastar::future<> SocketMessenger::start(Dispatcher *disp) {
- assert(seastar::engine().cpu_id() == master_sid);
+ assert(seastar::this_shard_id() == master_sid);
dispatcher = disp;
if (listener) {
ceph_assert(get_myaddr().get_port() > 0);
return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) {
- assert(seastar::engine().cpu_id() == master_sid);
+ assert(seastar::this_shard_id() == master_sid);
SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
*this, *dispatcher, get_myaddr().is_msgr2());
conn->start_accept(std::move(socket), peer_addr);
crimson::net::ConnectionRef
SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name)
{
- assert(seastar::engine().cpu_id() == master_sid);
+ assert(seastar::this_shard_id() == master_sid);
// make sure we connect to a valid peer_addr
ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2());
seastar::future<> SocketMessenger::shutdown()
{
- assert(seastar::engine().cpu_id() == master_sid);
+ assert(seastar::this_shard_id() == master_sid);
return seastar::futurize_invoke([this] {
if (listener) {
auto d_listener = listener;
seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn)
{
- assert(seastar::engine().cpu_id() == master_sid);
+ assert(seastar::this_shard_id() == master_sid);
if (!need_addr) {
if ((!get_myaddr().is_any() &&
get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
const entity_name_t& peer_name) override;
// can only wait once
seastar::future<> wait() override {
- assert(seastar::engine().cpu_id() == master_sid);
+ assert(seastar::this_shard_id() == master_sid);
return shutdown_promise.get_future();
}
void register_conn(SocketConnectionRef);
void unregister_conn(SocketConnectionRef);
seastar::shard_id shard_id() const {
- assert(seastar::engine().cpu_id() == master_sid);
+ assert(seastar::this_shard_id() == master_sid);
return master_sid;
}
};
g_ceph_context = cct.get();
cct->_conf.set_config_values(values);
store = std::make_unique<BlueStore>(cct.get(), path);
- tp = std::make_unique<crimson::thread::ThreadPool>(1, 128, seastar::engine().cpu_id() + 10);
+ tp = std::make_unique<crimson::thread::ThreadPool>(1, 128, seastar::this_shard_id() + 10);
}
seastar::future<> AlienStore::start()
ceph::os::Transaction&& txn)
{
logger().debug("{}", __func__);
- auto id = seastar::engine().cpu_id();
+ auto id = seastar::this_shard_id();
auto done = seastar::promise<>();
return seastar::do_with(
std::move(txn),
}).then([] {
return crimson::common::sharded_conf().invoke_on_all([](Config& config) {
return config.set_val(test_uint_option,
- std::to_string(seastar::engine().cpu_id()));
+ std::to_string(seastar::this_shard_id()));
});
}).then([] {
auto expected = crimson::common::local_conf().get_val<uint64_t>(test_uint_option);
static seastar::future<> test_perfcounters(){
return crimson::common::sharded_perf_coll().start().then([] {
return crimson::common::sharded_perf_coll().invoke_on_all([] (auto& s){
- std::string name =fmt::format("seastar-osd::shard-{}",seastar::engine().cpu_id());
+ std::string name =fmt::format("seastar-osd::shard-{}",seastar::this_shard_id());
PerfCountersBuilder plb(NULL, name, PERFTEST_FIRST,PERFTEST_LAST);
plb.add_u64_counter(PERFTEST_INDEX, "perftest_count", "count perftest");
auto perf_logger = plb.create_perf_counters();
// cb_client() on CPU#0, cb_server() on CPU#1
template <typename FuncC, typename FuncS>
static future<> dispatch_sockets(FuncC&& cb_client, FuncS&& cb_server) {
- assert(engine().cpu_id() == 0u);
+ assert(seastar::this_shard_id() == 0u);
auto owner = std::make_unique<SocketFactory>();
auto psf = owner.get();
return seastar::smp::submit_to(1u, [psf] {
bufferlist msg_data;
Server(unsigned msg_len)
- : msgr_sid{seastar::engine().cpu_id()},
+ : msgr_sid{seastar::this_shard_id()},
msg_len{msg_len} {
lname = "server#";
lname += std::to_string(msgr_sid);
seastar::promise<> stopped_send_promise;
Client(unsigned jobs, unsigned msg_len, unsigned depth)
- : sid{seastar::engine().cpu_id()},
+ : sid{seastar::this_shard_id()},
jobs{jobs},
msg_len{msg_len},
nr_depth{depth/jobs},
// should start messenger at this shard?
bool is_active() {
- ceph_assert(seastar::engine().cpu_id() == sid);
+ ceph_assert(seastar::this_shard_id() == sid);
return sid != 0 && sid <= jobs;
}
private:
seastar::future<> send_msg(crimson::net::Connection* conn) {
- ceph_assert(seastar::engine().cpu_id() == sid);
+ ceph_assert(seastar::this_shard_id() == sid);
return depth.wait(1).then([this, conn] {
const static pg_t pgid;
const static object_locator_t oloc;
}
void do_dispatch_messages(crimson::net::Connection* conn) {
- ceph_assert(seastar::engine().cpu_id() == sid);
+ ceph_assert(seastar::this_shard_id() == sid);
ceph_assert(sent_count == 0);
conn_stats.start_time = mono_clock::now();
// forwarded to stopped_send_promise