crimson/net: define an alias of seastar::stop_iteration
[ceph.git] / src / crimson / net / SocketConnection.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2017 Red Hat, Inc
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #pragma once
16
17 #include <seastar/core/reactor.hh>
18 #include <seastar/core/shared_future.hh>
19
20 #include "msg/Policy.h"
21 #include "Connection.h"
22 #include "Socket.h"
23 #include "crimson/thread/Throttle.h"
24
25 class AuthAuthorizer;
26 class AuthSessionHandler;
27
28 namespace ceph::net {
29
30 using stop_t = seastar::stop_iteration;
31
32 class SocketMessenger;
33 class SocketConnection;
34 using SocketConnectionRef = boost::intrusive_ptr<SocketConnection>;
35
36 class SocketConnection : public Connection {
37   SocketMessenger& messenger;
38   std::optional<Socket> socket;
39
40   enum class state_t {
41     none,
42     accepting,
43     connecting,
44     open,
45     standby,
46     wait,
47     closing
48   };
49   state_t state = state_t::none;
50
51   /// become valid only when state is state_t::closing
52   seastar::shared_future<> close_ready;
53
54   /// state for handshake
55   struct Handshake {
56     ceph_msg_connect connect;
57     ceph_msg_connect_reply reply;
58     bool got_bad_auth = false;
59     std::unique_ptr<AuthAuthorizer> authorizer;
60     std::chrono::milliseconds backoff;
61     uint32_t connect_seq = 0;
62     uint32_t peer_global_seq = 0;
63     uint32_t global_seq;
64     seastar::promise<> promise;
65   } h;
66
67   /// server side of handshake negotiation
68   seastar::future<> repeat_handle_connect();
69   seastar::future<> handle_connect_with_existing(SocketConnectionRef existing,
70                                                  bufferlist&& authorizer_reply);
71   seastar::future<> replace_existing(SocketConnectionRef existing,
72                                      bufferlist&& authorizer_reply,
73                                      bool is_reset_from_peer = false);
74   seastar::future<> send_connect_reply(ceph::net::msgr_tag_t tag,
75                                        bufferlist&& authorizer_reply = {});
76   seastar::future<> send_connect_reply_ready(ceph::net::msgr_tag_t tag,
77                                              bufferlist&& authorizer_reply);
78
79   seastar::future<> handle_keepalive2();
80   seastar::future<> handle_keepalive2_ack();
81
82   bool require_auth_feature() const;
83   uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
84   /// client side of handshake negotiation
85   seastar::future<> repeat_connect();
86   seastar::future<> handle_connect_reply(ceph::net::msgr_tag_t tag);
87   void reset_session();
88
89   /// state for an incoming message
90   struct MessageReader {
91     ceph_msg_header header;
92     ceph_msg_footer footer;
93     bufferlist front;
94     bufferlist middle;
95     bufferlist data;
96   } m;
97
98   /// satisfied when a CEPH_MSGR_TAG_MSG is read, indicating that a message
99   /// header will follow
100   seastar::promise<> on_message;
101
102   seastar::future<> maybe_throttle();
103   void read_tags_until_next_message();
104   seastar::future<stop_t> handle_ack();
105
106   /// becomes available when handshake completes, and when all previous messages
107   /// have been sent to the output stream. send() chains new messages as
108   /// continuations to this future to act as a queue
109   seastar::future<> send_ready;
110
111   /// encode/write a message
112   seastar::future<> write_message(MessageRef msg);
113
114   ceph::net::Policy<ceph::thread::Throttle> policy;
115   uint64_t features;
116   void set_features(uint64_t new_features) {
117     features = new_features;
118   }
119
120   /// the seq num of the last transmitted message
121   seq_num_t out_seq = 0;
122   /// the seq num of the last received message
123   seq_num_t in_seq = 0;
124   /// update the seq num of last received message
125   /// @returns true if the @c seq is valid, and @c in_seq is updated,
126   ///          false otherwise.
127   bool update_rx_seq(seq_num_t seq);
128
129   seastar::future<MessageRef> do_read_message();
130
131   std::unique_ptr<AuthSessionHandler> session_security;
132
133   // messages to be resent after connection gets reset
134   std::queue<MessageRef> out_q;
135   // messages sent, but not yet acked by peer
136   std::queue<MessageRef> sent;
137   static void discard_up_to(std::queue<MessageRef>*, seq_num_t);
138
139   struct Keepalive {
140     struct {
141       const char tag = CEPH_MSGR_TAG_KEEPALIVE2;
142       ceph_timespec stamp;
143     } __attribute__((packed)) req;
144     struct {
145       const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
146       ceph_timespec stamp;
147     } __attribute__((packed)) ack;
148     ceph_timespec ack_stamp;
149   } k;
150
151   seastar::future<> fault();
152
153  public:
154   SocketConnection(SocketMessenger& messenger,
155                    const entity_addr_t& my_addr);
156   ~SocketConnection();
157
158   Messenger* get_messenger() const override;
159
160   int get_peer_type() const override {
161     return peer_type;
162   }
163
164   bool is_connected() override;
165
166   seastar::future<> send(MessageRef msg) override;
167
168   seastar::future<> keepalive() override;
169
170   seastar::future<> close() override;
171
172  public:
173   /// complete a handshake from the client's perspective
174   seastar::future<> start_connect(const entity_addr_t& peer_addr,
175                                   const entity_type_t& peer_type);
176
177   /// complete a handshake from the server's perspective
178   seastar::future<> start_accept(seastar::connected_socket&& socket,
179                                  const entity_addr_t& peer_addr);
180
181   /// read a message from a connection that has completed its handshake
182   seastar::future<MessageRef> read_message();
183
184   /// the number of connections initiated in this session, increment when a
185   /// new connection is established
186   uint32_t connect_seq() const {
187     return h.connect_seq;
188   }
189
190   /// the client side should connect us with a gseq. it will be reset with
191   /// the one of exsting connection if it's greater.
192   uint32_t peer_global_seq() const {
193     return h.peer_global_seq;
194   }
195   seq_num_t rx_seq_num() const {
196     return in_seq;
197   }
198
199   /// current state of connection
200   state_t get_state() const {
201     return state;
202   }
203   bool is_server_side() const {
204     return policy.server;
205   }
206   bool is_lossy() const {
207     return policy.lossy;
208   }
209
210   /// move all messages in the sent list back into the queue
211   void requeue_sent();
212
213   std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() {
214     return {out_seq, std::move(out_q)};
215   }
216 };
217
218 } // namespace ceph::net