/// send a message over a connection that has completed its handshake
virtual seastar::future<> send(MessageRef msg) = 0;
+ /// send a keepalive message over a connection that has completed its
+ /// handshake
+ virtual seastar::future<> keepalive() = 0;
+
/// close the connection and cancel any any pending futures from read/send
virtual seastar::future<> close() = 0;
#include <algorithm>
#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>
+#include <seastar/net/packet.hh>
#include "Config.h"
#include "Messenger.h"
return f.get_future();
}
+seastar::future<> SocketConnection::keepalive()
+{
+ seastar::shared_future<> f = send_ready.then([this] {
+ k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
+ ceph::coarse_real_clock::now());
+ seastar::net::packet msg{reinterpret_cast<const char*>(&k.req),
+ sizeof(k.req)};
+ return out.write(std::move(msg));
+ }).then([this] {
+ return out.flush();
+ });
+ send_ready = f.get_future();
+ return f.get_future();
+}
+
seastar::future<> SocketConnection::close()
{
// unregister_conn() drops a reference, so hold another until completion
{
return in.read_exactly(sizeof(ceph_timespec))
.then([this] (auto buf) {
- auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
- k.reply_stamp = *t;
- std::cout << "keepalive2 " << t->tv_sec << std::endl;
- char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
- return out.write(reinterpret_cast<const char*>(&tag), sizeof(tag));
- }).then([this] {
- out.write(reinterpret_cast<const char*>(&k.reply_stamp),
- sizeof(k.reply_stamp));
+ k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
+ std::cout << "keepalive2 " << k.ack.stamp.tv_sec << std::endl;
+ seastar::net::packet msg{reinterpret_cast<const char*>(&k.ack),
+ sizeof(k.ack)};
+ return out.write(std::move(msg));
}).then([this] {
return out.flush();
});
static void discard_up_to(std::queue<MessageRef>*, seq_num_t);
struct Keepalive {
- ceph_timespec reply_stamp;
+ struct {
+ const char tag = CEPH_MSGR_TAG_KEEPALIVE2;
+ ceph_timespec stamp;
+ } __attribute__((packed)) req;
+ struct {
+ const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
+ ceph_timespec stamp;
+ } __attribute__((packed)) ack;
ceph_timespec ack_stamp;
} k;
seastar::future<> send(MessageRef msg) override;
+ seastar::future<> keepalive() override;
+
seastar::future<> close() override;
uint32_t connect_seq() const override {