return handle_keepalive2_ack()
.then([this] { return stop_t::no; });
case CEPH_MSGR_TAG_CLOSE:
- std::cout << "close" << std::endl;
+ logger().info("{} got tag close", *this);
break;
}
return seastar::make_ready_future<stop_t>(stop_t::no);
ceph_assert(state == state_t::connecting);
close_ready = pending_dispatch.close().finally(std::move(cleanup));
}
+ logger().debug("{} trigger closing, was {}", *this, static_cast<int>(state));
state = state_t::closing;
return close_ready.get_future();
}
return socket->read_exactly(sizeof(ceph_timespec))
.then([this] (auto buf) {
k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
- std::cout << "keepalive2 " << k.ack.stamp.tv_sec << std::endl;
+ logger().info("{} keepalive2 {}", *this, k.ack.stamp.tv_sec);
return socket->write_flush(make_static_packet(k.ack));
});
}
.then([this] (auto buf) {
auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
k.ack_stamp = *t;
- std::cout << "keepalive2 ack " << t->tv_sec << std::endl;
+ logger().info("{} keepalive2 ack {}", *this, t->tv_sec);
});
}
peer_addr = _peer_addr;
peer_type = _peer_type;
messenger.register_conn(this);
+ logger().debug("{} trigger connecting, was {}", *this, static_cast<int>(state));
state = state_t::connecting;
seastar::with_gate(pending_dispatch, [this] {
return seastar::connect(peer_addr.in4_addr())
execute_open();
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the connecting state
+ logger().warn("{} connecting fault: {}", *this, eptr);
close();
});
});
peer_addr = _peer_addr;
socket.emplace(std::move(fd));
messenger.accept_conn(this);
+ logger().debug("{} trigger accepting, was {}", *this, static_cast<int>(state));
state = state_t::accepting;
seastar::with_gate(pending_dispatch, [this] {
// encode/send server's handshake header
execute_open();
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the accepting state
+ logger().warn("{} accepting fault: {}", *this, eptr);
close();
});
});
void
SocketConnection::execute_open()
{
+ logger().debug("{} trigger open, was {}", *this, static_cast<int>(state));
state = state_t::open;
seastar::with_gate(pending_dispatch, [this] {
// start background processing of tags
} else {
throw e;
}
- }).handle_exception([] (std::exception_ptr eptr) {
+ }).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the open state
+ logger().warn("{} open fault: {}", *this, eptr);
});
});
}
}
return seastar::sleep(h.backoff);
}
+
+void SocketConnection::print(ostream& out) const {
+ messenger.print(out);
+ out << " >> " << peer_addr;
+}