1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2017 Red Hat, Inc
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include <seastar/core/reactor.hh>
18 #include <seastar/core/shared_future.hh>
20 #include "msg/Policy.h"
21 #include "Connection.h"
23 #include "crimson/thread/Throttle.h"
/// forward declaration; in this excerpt it is only used through
/// std::unique_ptr<AuthSessionHandler>
26 class AuthSessionHandler;
/// shorthand for seastar::stop_iteration (the value type of handle_ack()'s future)
30 using stop_t = seastar::stop_iteration;
/// forward declarations needed by the members and the alias that follow
32 class SocketMessenger;
33 class SocketConnection;
/// boost::intrusive_ptr handle to a SocketConnection
34 using SocketConnectionRef = boost::intrusive_ptr<SocketConnection>;
/// A Connection that keeps a reference to its SocketMessenger and an
/// optional Socket.
36 class SocketConnection : public Connection {
/// the messenger this connection belongs to (held by reference, not owned)
37 SocketMessenger& messenger;
/// socket of this connection, if any (empty while the optional has no value)
38 std::optional<Socket> socket;
/// current connection state, starts out as state_t::none
/// (NOTE(review): the state_t definition is not visible in this excerpt)
49 state_t state = state_t::none;
51 /// becomes valid only when state is state_t::closing
52 seastar::shared_future<> close_ready;
54 /// state for handshake
/// NOTE(review): the accessors connect_seq()/peer_global_seq() read these
/// fields as h.connect_seq / h.peer_global_seq, so they presumably live in a
/// struct instance named `h` whose opening and closing lines are not visible
/// in this excerpt -- confirm against the full header.
/// connect message of the handshake
56 ceph_msg_connect connect;
/// connect-reply message of the handshake
57 ceph_msg_connect_reply reply;
/// starts out false; NOTE(review): presumably set once an authorizer has
/// been rejected -- confirm in the implementation
58 bool got_bad_auth = false;
/// owned authorizer (may be null)
59 std::unique_ptr<AuthAuthorizer> authorizer;
/// backoff duration in milliseconds; NOTE(review): presumably the delay
/// between connect retries -- confirm in the implementation
60 std::chrono::milliseconds backoff;
/// starts at 0; exposed through connect_seq()
61 uint32_t connect_seq = 0;
/// starts at 0; exposed through peer_global_seq()
62 uint32_t peer_global_seq = 0;
/// NOTE(review): what this promise signals is not visible in this excerpt
64 seastar::promise<> promise;
67 /// server side of handshake negotiation
68 seastar::future<> repeat_handle_connect();
/// NOTE(review): judging by the name, handles a connect request when a
/// connection @c existing is already known -- confirm in the implementation
/// @param existing reference-counted handle to the other connection
/// @param authorizer_reply taken by rvalue reference
69 seastar::future<> handle_connect_with_existing(SocketConnectionRef existing,
70 bufferlist&& authorizer_reply);
/// NOTE(review): judging by the name, takes over from @c existing --
/// confirm in the implementation
/// @param is_reset_from_peer defaults to false
71 seastar::future<> replace_existing(SocketConnectionRef existing,
72 bufferlist&& authorizer_reply,
73 bool is_reset_from_peer = false);
/// send a connect reply carrying @c tag
/// @param authorizer_reply defaults to an empty bufferlist
74 seastar::future<> send_connect_reply(ceph::net::msgr_tag_t tag,
75 bufferlist&& authorizer_reply = {});
/// same parameters as send_connect_reply(), but @c authorizer_reply has no
/// default; NOTE(review): presumably the reply that completes the handshake
76 seastar::future<> send_connect_reply_ready(ceph::net::msgr_tag_t tag,
77 bufferlist&& authorizer_reply);
/// handlers for keepalive2 and keepalive2-ack
/// (NOTE(review): presumably dispatched on the CEPH_MSGR_TAG_KEEPALIVE2 and
/// CEPH_MSGR_TAG_KEEPALIVE2_ACK tags -- confirm in the implementation)
79 seastar::future<> handle_keepalive2();
80 seastar::future<> handle_keepalive2_ack();
/// const query; does not modify the connection
82 bool require_auth_feature() const;
/// protocol version for @c peer_type
/// NOTE(review): `connec` looks like a truncated `connect`; left unchanged
83 uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
84 /// client side of handshake negotiation
85 seastar::future<> repeat_connect();
/// handle a connect reply identified by @c tag
86 seastar::future<> handle_connect_reply(ceph::net::msgr_tag_t tag);
89 /// state for an incoming message
/// NOTE(review): the remaining members and the closing brace of this struct
/// are not visible in this excerpt
90 struct MessageReader {
/// header and footer of the message
91 ceph_msg_header header;
92 ceph_msg_footer footer;
98 /// satisfied when a CEPH_MSGR_TAG_MSG is read, indicating that a message
99 /// header will follow
100 seastar::promise<> on_message;
/// NOTE(review): presumably waits on the throttle carried by `policy` --
/// confirm in the implementation
102 seastar::future<> maybe_throttle();
/// returns void, so it is not awaited by its caller; NOTE(review): presumably
/// the reader that satisfies on_message -- confirm in the implementation
103 void read_tags_until_next_message();
/// resolves to a stop_t (seastar::stop_iteration)
104 seastar::future<stop_t> handle_ack();
106 /// becomes available when handshake completes, and when all previous messages
107 /// have been sent to the output stream. send() chains new messages as
108 /// continuations to this future to act as a queue
109 seastar::future<> send_ready;
111 /// encode/write a message
112 seastar::future<> write_message(MessageRef msg);
/// messenger policy, parameterised on ceph::thread::Throttle;
/// is_server_side() reads policy.server
114 ceph::net::Policy<ceph::thread::Throttle> policy;
/// overwrite `features` with @c new_features
/// (NOTE(review): `features` is declared outside this excerpt, and the
/// closing brace of this function is not visible here)
116 void set_features(uint64_t new_features) {
117 features = new_features;
120 /// the seq num of the last transmitted message
121 seq_num_t out_seq = 0;
122 /// the seq num of the last received message
123 seq_num_t in_seq = 0;
124 /// update the seq num of last received message
125 /// @returns true if the @c seq is valid, and @c in_seq is updated,
/// NOTE(review): the continuation of the @returns sentence above is not
/// visible in this excerpt
127 bool update_rx_seq(seq_num_t seq);
/// resolves to the message that was read
129 seastar::future<MessageRef> do_read_message();
/// owned session handler (may be null)
131 std::unique_ptr<AuthSessionHandler> session_security;
133 // messages to be resent after connection gets reset
134 std::queue<MessageRef> out_q;
135 // messages sent, but not yet acked by peer
136 std::queue<MessageRef> sent;
/// static helper operating on the queue passed by pointer
/// (NOTE(review): presumably drops queued messages whose seq is not greater
/// than the given seq_num_t -- confirm in the implementation)
137 static void discard_up_to(std::queue<MessageRef>*, seq_num_t);
/// NOTE(review): the opening lines and remaining members of the two packed
/// structs below are not visible in this excerpt
/// tag constant of the keepalive request record
141 const char tag = CEPH_MSGR_TAG_KEEPALIVE2;
/// `req` is declared packed, i.e. laid out without padding
143 } __attribute__((packed)) req;
/// tag constant of the keepalive ack record
145 const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
/// `ack` is declared packed, i.e. laid out without padding
147 } __attribute__((packed)) ack;
/// NOTE(review): presumably the timestamp carried by the most recent
/// keepalive ack -- confirm in the implementation
148 ceph_timespec ack_stamp;
/// NOTE(review): presumably the error path taken when the connection fails
/// -- confirm in the implementation
151 seastar::future<> fault();
/// @param messenger bound to the `messenger` reference member (presumably;
/// the constructor body is defined elsewhere)
/// @param my_addr address passed by const reference
154 SocketConnection(SocketMessenger& messenger,
155 const entity_addr_t& my_addr);
/// the members below are declared `override`, i.e. they implement
/// base-class virtuals
158 Messenger* get_messenger() const override;
/// NOTE(review): the body of this inline function is not visible in this excerpt
160 int get_peer_type() const override {
164 bool is_connected() override;
/// queue @c msg for sending; see the send_ready member for how sends are chained
166 seastar::future<> send(MessageRef msg) override;
168 seastar::future<> keepalive() override;
/// see close_ready, which becomes valid in state_t::closing
170 seastar::future<> close() override;
173 /// complete a handshake from the client's perspective
/// @param peer_addr address of the peer
/// @param peer_type entity type of the peer
174 seastar::future<> start_connect(const entity_addr_t& peer_addr,
175 const entity_type_t& peer_type);
177 /// complete a handshake from the server's perspective
/// @param socket connected socket, taken by rvalue reference
/// @param peer_addr address of the peer
178 seastar::future<> start_accept(seastar::connected_socket&& socket,
179 const entity_addr_t& peer_addr);
181 /// read a message from a connection that has completed its handshake
182 seastar::future<MessageRef> read_message();
/// NOTE(review): the closing braces of the inline accessors below, and some
/// of their bodies, are not visible in this excerpt
184 /// the number of connections initiated in this session, incremented when a
185 /// new connection is established
186 uint32_t connect_seq() const {
187 return h.connect_seq;
190 /// the client side should connect us with a gseq. it will be reset with
191 /// the one of the existing connection if it's greater.
192 uint32_t peer_global_seq() const {
193 return h.peer_global_seq;
/// NOTE(review): body not visible here; presumably returns in_seq
195 seq_num_t rx_seq_num() const {
199 /// current state of connection
200 state_t get_state() const {
/// returns policy.server
203 bool is_server_side() const {
204 return policy.server;
/// NOTE(review): body not visible here; presumably reads a lossy flag of `policy`
206 bool is_lossy() const {
210 /// move all messages in the sent list back into the queue
/// NOTE(review): the comment above appears to document a declaration that is
/// missing from this excerpt; it does not describe get_out_queue(), which
/// never touches `sent`.
/// returns out_seq together with the pending queue; out_q is moved from, so
/// the member is left in a moved-from state afterwards
213 std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() {
214 return {out_seq, std::move(out_q)};
218 } // namespace ceph::net