class Connection : public boost::intrusive_ref_counter<Connection,
boost::thread_unsafe_counter> {
protected:
- Messenger *const messenger;
entity_addr_t my_addr;
entity_addr_t peer_addr;
public:
- Connection(Messenger *messenger, const entity_addr_t& my_addr,
+ Connection(const entity_addr_t& my_addr,
const entity_addr_t& peer_addr)
- : messenger(messenger), my_addr(my_addr), peer_addr(peer_addr) {}
+ : my_addr(my_addr), peer_addr(peer_addr) {}
virtual ~Connection() {}
- Messenger* get_messenger() const { return messenger; }
-
+ virtual Messenger* get_messenger() const = 0;
const entity_addr_t& get_my_addr() const { return my_addr; }
const entity_addr_t& get_peer_addr() const { return peer_addr; }
virtual int get_peer_type() const = 0;
/// true if the handshake has completed and no errors have been encountered
virtual bool is_connected() = 0;
- /// complete a handshake from the client's perspective
- virtual seastar::future<> client_handshake(entity_type_t peer_type,
- entity_type_t host_type) = 0;
-
- /// complete a handshake from the server's perspective
- virtual seastar::future<> server_handshake() = 0;
-
- /// read a message from a connection that has completed its handshake
- virtual seastar::future<MessageRef> read_message() = 0;
-
/// send a message over a connection that has completed its handshake
virtual seastar::future<> send(MessageRef msg) = 0;
/// close the connection and cancel any any pending futures from read/send
virtual seastar::future<> close() = 0;
-
- /// move all messages in the sent list back into the queue
- virtual void requeue_sent() = 0;
-
- /// get all messages in the out queue
- virtual std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() = 0;
-
-public:
- enum class state_t {
- none,
- open,
- standby,
- closed,
- wait
- };
- /// the number of connections initiated in this session, increment when a
- /// new connection is established
- virtual uint32_t connect_seq() const = 0;
-
- /// the client side should connect us with a gseq. it will be reset with a
- /// the one of exsting connection if it's greater.
- virtual uint32_t peer_global_seq() const = 0;
-
- virtual seq_num_t rx_seq_num() const = 0;
-
- /// current state of connection
- virtual state_t get_state() const = 0;
- virtual bool is_server_side() const = 0;
- virtual bool is_lossy() const = 0;
};
} // namespace ceph::net
#include <boost/intrusive_ptr.hpp>
-#include "Errors.h"
#include "msg/msg_types.h"
#include "msg/Message.h"
using peer_type_t = int;
using auth_proto_t = int;
-class Message;
-using MessageRef = boost::intrusive_ptr<Message>;
-
namespace ceph::net {
using msgr_tag_t = uint8_t;
class Messenger;
} // namespace ceph::net
-
}
return ++global_seq;
}
- virtual ConnectionRef lookup_conn(const entity_addr_t&) = 0;
- virtual void unregister_conn(ConnectionRef) = 0;
// @returns a tuple of <is_valid, auth_reply, session_key>
virtual seastar::future<msgr_tag_t, /// tag for error, 0 if authorized
*
*/
+#include "SocketConnection.h"
+
#include <algorithm>
#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/net/packet.hh>
-#include "Config.h"
-#include "Messenger.h"
-#include "SocketConnection.h"
-
#include "include/msgr.h"
#include "include/random.h"
#include "auth/Auth.h"
#include "auth/AuthSessionHandler.h"
-#include "msg/Message.h"
#include "crimson/common/log.h"
+#include "Config.h"
+#include "Errors.h"
+#include "SocketMessenger.h"
using namespace ceph::net;
}
}
-SocketConnection::SocketConnection(Messenger *messenger,
+SocketConnection::SocketConnection(SocketMessenger& messenger,
const entity_addr_t& my_addr,
const entity_addr_t& peer_addr,
seastar::connected_socket&& fd)
- : Connection(messenger, my_addr, peer_addr),
+ : Connection(my_addr, peer_addr),
+ messenger(messenger),
socket(std::move(fd)),
in(socket.input()),
out(socket.output()),
send_ready.ignore_ready_future();
}
+ceph::net::Messenger*
+SocketConnection::get_messenger() const {
+ return &messenger;
+}
+
bool SocketConnection::is_connected()
{
return !send_ready.failed();
seastar::future<> SocketConnection::write_message(MessageRef msg)
{
msg->set_seq(++out_seq);
- msg->encode(features, get_messenger()->get_crc_flags());
+ msg->encode(features, messenger.get_crc_flags());
bufferlist bl;
bl.append(CEPH_MSGR_TAG_MSG);
auto& header = msg->get_header();
bl.append((const char*)&footer, sizeof(footer));
} else {
ceph_msg_footer_old old_footer;
- if (get_messenger()->get_crc_flags() & MSG_CRC_HEADER) {
+ if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
old_footer.front_crc = footer.front_crc;
old_footer.middle_crc = footer.middle_crc;
} else {
old_footer.front_crc = old_footer.middle_crc = 0;
}
- if (get_messenger()->get_crc_flags() & MSG_CRC_DATA) {
+ if (messenger.get_crc_flags() & MSG_CRC_DATA) {
old_footer.data_crc = footer.data_crc;
} else {
old_footer.data_crc = 0;
state = state_t::closed;
// unregister_conn() drops a reference, so hold another until completion
- auto cleanup = [conn = ConnectionRef(this)] {};
+ auto cleanup = [conn = SocketConnectionRef(this)] {};
- get_messenger()->unregister_conn(this);
+ messenger.unregister_conn(this);
// close_ready become valid only after state is state_t::closed
assert(!close_ready.valid());
return seastar::make_ready_future<msgr_tag_t, bufferlist>(
CEPH_MSGR_TAG_FEATURES, bufferlist{});
}
- return get_messenger()->verify_authorizer(get_peer_type(),
- h.connect.authorizer_protocol,
- authorizer);
+ return messenger.verify_authorizer(get_peer_type(),
+ h.connect.authorizer_protocol,
+ authorizer);
}).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
memset(&h.reply, 0, sizeof(h.reply));
if (tag) {
return send_connect_reply(tag, std::move(authorizer_reply));
}
- if (auto existing = get_messenger()->lookup_conn(peer_addr); existing) {
+ if (auto existing = messenger.lookup_conn(peer_addr); existing) {
return handle_connect_with_existing(existing, std::move(authorizer_reply));
} else if (h.connect.connect_seq > 0) {
return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION,
SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
bufferlist&& authorizer_reply)
{
- h.global_seq = get_messenger()->get_global_seq();
+ h.global_seq = messenger.get_global_seq();
h.reply.tag = tag;
h.reply.features = policy.features_supported;
h.reply.global_seq = h.global_seq;
}
seastar::future<>
-SocketConnection::handle_connect_with_existing(ConnectionRef existing, bufferlist&& authorizer_reply)
+SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply)
{
if (h.connect.global_seq < existing->peer_global_seq()) {
h.reply.global_seq = existing->peer_global_seq();
h.reply.connect_seq = existing->connect_seq() + 1;
return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
}
- } else if (get_peer_addr() < get_my_addr() ||
+ } else if (peer_addr < my_addr ||
existing->is_server_side()) {
// incoming wins
return replace_existing(existing, std::move(authorizer_reply));
}
}
-seastar::future<> SocketConnection::replace_existing(ConnectionRef existing,
+seastar::future<> SocketConnection::replace_existing(SocketConnectionRef existing,
bufferlist&& authorizer_reply,
bool is_reset_from_peer)
{
} else {
reply_tag = CEPH_MSGR_TAG_READY;
}
- get_messenger()->unregister_conn(existing);
+ messenger.unregister_conn(existing);
if (!existing->is_lossy()) {
// reset the in_seq if this is a hard reset from peer,
// otherwise we respect our original connection's value
}
h.got_bad_auth = true;
// try harder
- return get_messenger()->get_authorizer(h.peer_type, true)
+ return messenger.get_authorizer(h.peer_type, true)
.then([this](auto&& auth) {
h.authorizer = std::move(auth);
return seastar::now();
reset_session();
return seastar::now();
case CEPH_MSGR_TAG_RETRY_GLOBAL:
- h.global_seq = get_messenger()->get_global_seq(h.reply.global_seq);
+ h.global_seq = messenger.get_global_seq(h.reply.global_seq);
return seastar::now();
case CEPH_MSGR_TAG_RETRY_SESSION:
ceph_assert(h.reply.connect_seq > h.connect_seq);
// this is fyi, actually, server decides!
h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
- return get_messenger()->get_authorizer(peer_type, false)
+ return messenger.get_authorizer(peer_type, false)
.then([this](auto&& auth) {
h.authorizer = std::move(auth);
bufferlist bl;
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(my_addr, bl, 0);
- h.global_seq = get_messenger()->get_global_seq();
+ h.global_seq = messenger.get_global_seq();
return out.write(std::move(bl)).then([this] { return out.flush(); });
}).then([=] {
return seastar::do_until([=] { return state == state_t::open; },
seastar::future<> SocketConnection::fault()
{
if (policy.lossy) {
- get_messenger()->unregister_conn(this);
+ messenger.unregister_conn(this);
}
if (h.backoff.count()) {
h.backoff += h.backoff;
#include "Connection.h"
#include "crimson/thread/Throttle.h"
+class AuthAuthorizer;
class AuthSessionHandler;
namespace ceph::net {
+class SocketMessenger;
+class SocketConnection;
+using SocketConnectionRef = boost::intrusive_ptr<SocketConnection>;
+
class SocketConnection : public Connection {
+ SocketMessenger& messenger;
seastar::connected_socket socket;
seastar::input_stream<char> in;
seastar::output_stream<char> out;
+ enum class state_t {
+ none,
+ open,
+ standby,
+ closed,
+ wait
+ };
state_t state = state_t::none;
/// become valid only when state is state_t::closed
/// server side of handshake negotiation
seastar::future<> handle_connect();
- seastar::future<> handle_connect_with_existing(ConnectionRef existing,
+ seastar::future<> handle_connect_with_existing(SocketConnectionRef existing,
bufferlist&& authorizer_reply);
- seastar::future<> replace_existing(ConnectionRef existing,
+ seastar::future<> replace_existing(SocketConnectionRef existing,
bufferlist&& authorizer_reply,
bool is_reset_from_peer = false);
seastar::future<> send_connect_reply(ceph::net::msgr_tag_t tag,
seastar::future<> handle_keepalive2_ack();
bool require_auth_feature() const;
- int get_peer_type() const override {
- return h.connect.host_type;
- }
uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
/// client side of handshake negotiation
seastar::future<> connect(entity_type_t peer_type, entity_type_t host_type);
seastar::future<> fault();
public:
- SocketConnection(Messenger *messenger,
+ SocketConnection(SocketMessenger& messenger,
const entity_addr_t& my_addr,
const entity_addr_t& peer_addr,
seastar::connected_socket&& socket);
~SocketConnection();
- bool is_connected() override;
-
- seastar::future<> client_handshake(entity_type_t peer_type,
- entity_type_t host_type) override;
+ Messenger* get_messenger() const override;
- seastar::future<> server_handshake() override;
+ int get_peer_type() const override {
+ return h.connect.host_type;
+ }
- seastar::future<MessageRef> read_message() override;
+ bool is_connected() override;
seastar::future<> send(MessageRef msg) override;
seastar::future<> close() override;
- uint32_t connect_seq() const override {
+ public:
+ /// complete a handshake from the client's perspective
+ seastar::future<> client_handshake(entity_type_t peer_type,
+ entity_type_t host_type);
+
+ /// complete a handshake from the server's perspective
+ seastar::future<> server_handshake();
+
+ /// read a message from a connection that has completed its handshake
+ seastar::future<MessageRef> read_message();
+
+ /// the number of connections initiated in this session, increment when a
+ /// new connection is established
+ uint32_t connect_seq() const {
return h.connect_seq;
}
- uint32_t peer_global_seq() const override {
+
+ /// the client side should connect us with a gseq. it will be reset with
+ /// the one of exsting connection if it's greater.
+ uint32_t peer_global_seq() const {
return h.peer_global_seq;
}
seq_num_t rx_seq_num() const {
return in_seq;
}
- state_t get_state() const override {
+
+ /// current state of connection
+ state_t get_state() const {
return state;
}
- bool is_server_side() const override {
+ bool is_server_side() const {
return policy.server;
}
- bool is_lossy() const override {
+ bool is_lossy() const {
return policy.lossy;
}
-private:
- void requeue_sent() override;
- std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() override {
+ /// move all messages in the sent list back into the queue
+ void requeue_sent();
+
+ std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() {
return {out_seq, std::move(out_q)};
}
-
};
} // namespace ceph::net
*
*/
+#include "SocketMessenger.h"
+
#include <tuple>
+
#include "auth/Auth.h"
-#include "SocketMessenger.h"
-#include "SocketConnection.h"
+#include "Errors.h"
#include "Dispatcher.h"
-#include "msg/Message.h"
using namespace ceph::net;
listener = seastar::listen(address, lo);
}
-seastar::future<> SocketMessenger::dispatch(ConnectionRef conn)
+seastar::future<> SocketMessenger::dispatch(SocketConnectionRef conn)
{
auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
std::ignore = i;
entity_addr_t peer_addr;
peer_addr.set_type(entity_addr_t::TYPE_DEFAULT);
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
- ConnectionRef conn = new SocketConnection(this, get_myaddr(),
- peer_addr, std::move(socket));
+ SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(),
+ peer_addr, std::move(socket));
// initiate the handshake
return conn->server_handshake()
.then([=] {
}
return seastar::connect(addr.in4_addr())
.then([=] (seastar::connected_socket socket) {
- ConnectionRef conn = new SocketConnection(this, get_myaddr(), addr,
- std::move(socket));
+ SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), addr,
+ std::move(socket));
// complete the handshake before returning to the caller
return conn->client_handshake(peer_type, get_myname().type())
.then([=] {
// dispatch replies on this connection
dispatch(conn)
.handle_exception([] (std::exception_ptr eptr) {});
- return conn;
+ return ConnectionRef(conn);
});
});
}
policy_set.set_throttlers(peer_type, throttle, nullptr);
}
-ceph::net::ConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
+ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
{
if (auto found = connections.find(addr);
found != connections.end()) {
}
}
-void SocketMessenger::unregister_conn(ConnectionRef conn)
+void SocketMessenger::unregister_conn(SocketConnectionRef conn)
{
ceph_assert(conn);
auto found = connections.find(conn->get_peer_addr());
#include "msg/Policy.h"
#include "Messenger.h"
+#include "SocketConnection.h"
#include "crimson/thread/Throttle.h"
namespace ceph::net {
class SocketMessenger final : public Messenger {
std::optional<seastar::server_socket> listener;
Dispatcher *dispatcher = nullptr;
- std::map<entity_addr_t, ConnectionRef> connections;
+ std::map<entity_addr_t, SocketConnectionRef> connections;
using Throttle = ceph::thread::Throttle;
ceph::net::PolicySet<Throttle> policy_set;
seastar::gate pending_dispatch;
- seastar::future<> dispatch(ConnectionRef conn);
+ seastar::future<> dispatch(SocketConnectionRef conn);
seastar::future<> accept(seastar::connected_socket socket,
seastar::socket_address paddr);
entity_type_t peer_type) override;
seastar::future<> shutdown() override;
- void set_default_policy(const SocketPolicy& p);
- void set_policy(entity_type_t peer_type, const SocketPolicy& p);
- void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);
- ConnectionRef lookup_conn(const entity_addr_t& addr) override;
- void unregister_conn(ConnectionRef) override;
+
seastar::future<msgr_tag_t, bufferlist>
verify_authorizer(peer_type_t peer_type,
auth_proto_t protocol,
bufferlist& auth) override;
+
seastar::future<std::unique_ptr<AuthAuthorizer>>
get_authorizer(peer_type_t peer_type,
bool force_new) override;
+
+ public:
+ void set_default_policy(const SocketPolicy& p);
+ void set_policy(entity_type_t peer_type, const SocketPolicy& p);
+ void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);
+
+ SocketConnectionRef lookup_conn(const entity_addr_t& addr);
+ void unregister_conn(SocketConnectionRef);
};
} // namespace ceph::net