});
}
-seastar::future<> Client::ms_dispatch(ceph::net::ConnectionRef conn,
+seastar::future<> Client::ms_dispatch(ceph::net::Connection* conn,
MessageRef m)
{
switch(m->get_type()) {
});
}
-seastar::future<> Client::handle_mgr_map(ceph::net::ConnectionRef,
+seastar::future<> Client::handle_mgr_map(ceph::net::Connection*,
Ref<MMgrMap> m)
{
mgrmap = m->get_map();
}
}
-seastar::future<> Client::handle_mgr_conf(ceph::net::ConnectionRef conn,
+seastar::future<> Client::handle_mgr_conf(ceph::net::Connection* conn,
Ref<MMgrConfigure> m)
{
logger().info("{} {}", __func__, *m);
seastar::future<> start();
seastar::future<> stop();
private:
- seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn,
+ seastar::future<> ms_dispatch(ceph::net::Connection* conn,
Ref<Message> m) override;
seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override;
- seastar::future<> handle_mgr_map(ceph::net::ConnectionRef conn,
+ seastar::future<> handle_mgr_map(ceph::net::Connection* conn,
Ref<MMgrMap> m);
- seastar::future<> handle_mgr_conf(ceph::net::ConnectionRef conn,
+ seastar::future<> handle_mgr_conf(ceph::net::Connection* conn,
Ref<MMgrConfigure> m);
seastar::future<> reconnect();
void report();
}
seastar::future<>
-Client::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m)
+Client::ms_dispatch(ceph::net::Connection* conn, MessageRef m)
{
// we only care about these message types
switch (m->get_type()) {
}
}
-seastar::future<> Client::handle_monmap(ceph::net::ConnectionRef conn,
+seastar::future<> Client::handle_monmap(ceph::net::Connection* conn,
Ref<MMonMap> m)
{
monmap.decode(m->monmapbl);
}
}
-seastar::future<> Client::handle_auth_reply(ceph::net::ConnectionRef conn,
+seastar::future<> Client::handle_auth_reply(ceph::net::Connection* conn,
Ref<MAuthReply> m)
{
logger().info("mon {} => {} returns {}: {}",
private:
void tick();
- seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn,
+ seastar::future<> ms_dispatch(ceph::net::Connection* conn,
MessageRef m) override;
seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override;
AuthAuthorizer* ms_get_authorizer(peer_type_t peer) const override;
- seastar::future<> handle_monmap(ceph::net::ConnectionRef conn,
+ seastar::future<> handle_monmap(ceph::net::Connection* conn,
Ref<MMonMap> m);
- seastar::future<> handle_auth_reply(ceph::net::ConnectionRef conn,
+ seastar::future<> handle_auth_reply(ceph::net::Connection* conn,
Ref<MAuthReply> m);
seastar::future<> handle_subscribe_ack(Ref<MMonSubscribeAck> m);
seastar::future<> handle_get_version_reply(Ref<MMonGetVersionReply> m);
public:
virtual ~Dispatcher() {}
- virtual seastar::future<> ms_dispatch(ConnectionRef conn, MessageRef m) {
+ virtual seastar::future<> ms_dispatch(Connection* conn, MessageRef m) {
return seastar::make_ready_future<>();
}
seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
logger().debug("{} <= {}@{} === {}", messenger,
msg->get_source(), conn.peer_addr, *msg);
- return dispatcher.ms_dispatch(
- seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this()),
- std::move(msg))
+ return dispatcher.ms_dispatch(&conn, std::move(msg))
.handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
ceph_assert(false);
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
- return dispatcher.ms_dispatch(
- seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this()),
- std::move(msg))
- .handle_exception([this] (std::exception_ptr eptr) {
+ return dispatcher.ms_dispatch(&conn, std::move(msg))
+ .handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
ceph_assert(false);
});
seastar::future<>
-ChainedDispatchers::ms_dispatch(ceph::net::ConnectionRef conn,
+ChainedDispatchers::ms_dispatch(ceph::net::Connection* conn,
MessageRef m) {
return seastar::do_for_each(dispatchers, [conn, m](Dispatcher* dispatcher) {
return dispatcher->ms_dispatch(conn, m);
void push_back(Dispatcher* dispatcher) {
dispatchers.push_back(dispatcher);
}
- seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) override;
+ seastar::future<> ms_dispatch(ceph::net::Connection* conn, MessageRef m) override;
seastar::future<> ms_handle_accept(ceph::net::ConnectionRef conn) override;
seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override;
seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override;
});
}
-seastar::future<> Heartbeat::ms_dispatch(ceph::net::ConnectionRef conn,
+seastar::future<> Heartbeat::ms_dispatch(ceph::net::Connection* conn,
MessageRef m)
{
switch (m->get_type()) {
});
}
-seastar::future<> Heartbeat::handle_osd_ping(ceph::net::ConnectionRef conn,
+seastar::future<> Heartbeat::handle_osd_ping(ceph::net::Connection* conn,
Ref<MOSDPing> m)
{
switch (m->op) {
}
}
-seastar::future<> Heartbeat::handle_ping(ceph::net::ConnectionRef conn,
+seastar::future<> Heartbeat::handle_ping(ceph::net::Connection* conn,
Ref<MOSDPing> m)
{
auto min_message = static_cast<uint32_t>(
return conn->send(reply);
}
-seastar::future<> Heartbeat::handle_reply(ceph::net::ConnectionRef conn,
+seastar::future<> Heartbeat::handle_reply(ceph::net::Connection* conn,
Ref<MOSDPing> m)
{
const osd_id_t from = m->get_source().num();
const entity_addrvec_t& get_back_addrs() const;
// Dispatcher methods
- seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn,
+ seastar::future<> ms_dispatch(ceph::net::Connection* conn,
MessageRef m) override;
seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override;
AuthAuthorizer* ms_get_authorizer(peer_type_t peer) const override;
private:
- seastar::future<> handle_osd_ping(ceph::net::ConnectionRef conn,
+ seastar::future<> handle_osd_ping(ceph::net::Connection* conn,
Ref<MOSDPing> m);
- seastar::future<> handle_ping(ceph::net::ConnectionRef conn,
+ seastar::future<> handle_ping(ceph::net::Connection* conn,
Ref<MOSDPing> m);
- seastar::future<> handle_reply(ceph::net::ConnectionRef conn,
+ seastar::future<> handle_reply(ceph::net::Connection* conn,
Ref<MOSDPing> m);
seastar::future<> handle_you_died();
seastar::future<> stop() {
return seastar::make_ready_future<>();
}
- seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ seastar::future<> ms_dispatch(ceph::net::Connection* c,
MessageRef m) override {
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
active_session->connected_time = mono_clock::now();
return seastar::now();
}
- seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ seastar::future<> ms_dispatch(ceph::net::Connection* c,
MessageRef m) override {
// server replies with MOSDOp to generate server-side write workload
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
struct ServerDispatcher : ceph::net::Dispatcher {
unsigned count = 0;
seastar::condition_variable on_reply;
- seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ seastar::future<> ms_dispatch(ceph::net::Connection* c,
MessageRef m) override {
std::cout << "server got ping " << *m << std::endl;
// reply with a pong
struct ClientDispatcher : ceph::net::Dispatcher {
unsigned count = 0;
seastar::condition_variable on_reply;
- seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ seastar::future<> ms_dispatch(ceph::net::Connection* c,
MessageRef m) override {
std::cout << "client got pong " << *m << std::endl;
++count;
seastar::future<> stop() {
return seastar::make_ready_future<>();
}
- seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ seastar::future<> ms_dispatch(ceph::net::Connection* c,
MessageRef m) override {
if (verbose) {
logger().info("server got {}", *m);
std::bernoulli_distribution keepalive_dist;
ceph::net::Messenger *msgr = nullptr;
std::map<ceph::net::Connection*, seastar::promise<>> pending_conns;
- std::map<ceph::net::ConnectionRef, PingSessionRef> sessions;
+ std::map<ceph::net::Connection*, PingSessionRef> sessions;
MessageRef msg_ping{new MPing(), false};
ceph::auth::DummyAuthClientServer dummy_auth;
: rounds(rounds),
keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}
- PingSessionRef find_session(ceph::net::ConnectionRef c) {
+ PingSessionRef find_session(ceph::net::Connection* c) {
auto found = sessions.find(c);
if (found == sessions.end()) {
ceph_assert(false);
seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
logger().info("{}: connected to {}", *conn, conn->get_peer_addr());
auto session = seastar::make_shared<PingSession>();
- auto [i, added] = sessions.emplace(conn, session);
+ auto [i, added] = sessions.emplace(conn.get(), session);
std::ignore = i;
ceph_assert(added);
session->connected_time = mono_clock::now();
return seastar::now();
}
- seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ seastar::future<> ms_dispatch(ceph::net::Connection* c,
MessageRef m) override {
auto session = find_session(c);
++(session->count);
}
if (session->count == rounds) {
- logger().info("{}: finished receiving {} pongs", *c.get(), session->count);
+ logger().info("{}: finished receiving {} pongs", *c, session->count);
session->finish_time = mono_clock::now();
- return container().invoke_on_all([conn = c.get()](auto &client) {
- auto found = client.pending_conns.find(conn);
+ return container().invoke_on_all([c](auto &client) {
+ auto found = client.pending_conns.find(c);
ceph_assert(found != client.pending_conns.end());
found->second.set_value();
});
}
}).finally([this, conn, start_time] {
return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) {
- auto session = client.find_session((*conn)->shared_from_this());
+ auto session = client.find_session(&**conn);
std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
logger().info("{}: handshake {}, pingpong {}",
seastar::promise<> on_done; // satisfied when first dispatch unblocks
ceph::auth::DummyAuthClientServer dummy_auth;
- seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ seastar::future<> ms_dispatch(ceph::net::Connection* c,
MessageRef m) override {
switch (++count) {
case 1: