return ms->_send_message(m, this);
}
+void XioConnection::send_keepalive_or_ack(bool ack, const utime_t *tp)
+{
+ /* If con is not in READY state, we need to queue the request */
+ if (cstate.session_state.read() != XioConnection::UP) {
+ pthread_spin_lock(&sp);
+ if (cstate.session_state.read() != XioConnection::UP) {
+ if (ack) {
+ outgoing.ack = true;
+ outgoing.ack_time = *tp;
+ }
+ else {
+ outgoing.keepalive = true;
+ }
+ pthread_spin_unlock(&sp);
+ return;
+ }
+ pthread_spin_unlock(&sp);
+ }
+
+ send_keepalive_or_ack_internal(ack, tp);
+}
+
+void XioConnection::send_keepalive_or_ack_internal(bool ack, const utime_t *tp)
+{
+ XioCommand *xcmd = pool_alloc_xio_command(this);
+ if (! xcmd) {
+ /* could happen if Accelio has been shutdown */
+ return;
+ }
+
+ struct ceph_timespec ts;
+ if (ack) {
+ assert(tp);
+ tp->encode_timeval(&ts);
+ xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
+ xcmd->get_bl_ref().append((char*)&ts, sizeof(ts));
+ } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+ utime_t t = ceph_clock_now(msgr->cct);
+ t.encode_timeval(&ts);
+ xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2);
+ xcmd->get_bl_ref().append((char*)&ts, sizeof(ts));
+ } else {
+ xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE);
+ }
+
+ const std::list<buffer::ptr>& header = xcmd->get_bl_ref().buffers();
+ assert(header.size() == 1); /* accelio header must be without scatter gather */
+ list<bufferptr>::const_iterator pb = header.begin();
+ assert(pb->length() < XioMsgHdr::get_max_encoded_length());
+ struct xio_msg * msg = xcmd->get_xio_msg();
+ msg->out.header.iov_base = (char*) pb->c_str();
+ msg->out.header.iov_len = pb->length();
+
+ ldout(msgr->cct,8) << __func__ << " sending command with tag " << (int)(*(char*)msg->out.header.iov_base)
+ << " len " << msg->out.header.iov_len << dendl;
+
+ portal->enqueue_for_send(this, xcmd);
+}
+
+
int XioConnection::passive_setup()
{
/* XXX passive setup is a placeholder for (potentially active-side
ldout(msgr->cct,8) << __func__ << " receive msg with iov_len "
<< (int) msg->in.header.iov_len << " tag " << (int)tag << dendl;
+ //header_len_without_tag is only meaningful in case we have tag
+ size_t header_len_without_tag = msg->in.header.iov_len - sizeof(tag);
+
switch(tag) {
case CEPH_MSGR_TAG_MSG:
ldout(msgr->cct, 20) << __func__ << " got data message" << dendl;
return handle_data_msg(session, msg, more_in_batch, cb_user_context);
+ case CEPH_MSGR_TAG_KEEPALIVE:
+ ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl;
+ set_last_keepalive(ceph_clock_now(nullptr));
+ break;
+
+ case CEPH_MSGR_TAG_KEEPALIVE2:
+ if (header_len_without_tag < sizeof(ceph_timespec)) {
+ lderr(msgr->cct) << __func__ << " too few data for KEEPALIVE2: got " << header_len_without_tag <<
+ " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl;
+ }
+ else {
+ ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag));
+ utime_t kp_t = utime_t(*t);
+ ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2 with timestamp" << kp_t << dendl;
+ send_keepalive_or_ack(true, &kp_t);
+ set_last_keepalive(ceph_clock_now(nullptr));
+ }
+
+ break;
+
+ case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
+ if (header_len_without_tag < sizeof(ceph_timespec)) {
+ lderr(msgr->cct) << __func__ << " too few data for KEEPALIVE2_ACK: got " << header_len_without_tag <<
+ " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl;
+ }
+ else {
+ ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag));
+ utime_t kp_t(*t);
+ ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2_ACK with timestamp" << kp_t << dendl;
+ set_last_keepalive_ack(kp_t);
+ }
+ break;
+
default:
- assert(! "unrecognized message tag");
+ lderr(msgr->cct) << __func__ << " unsupported message tag " << (int) tag << dendl;
+ assert(! "unsupported message tag");
}
+ xio_release_msg(msg);
return 0;
}
if (! (flags & CState::OP_FLAG_LOCKED))
pthread_spin_lock(&sp);
+ if (outgoing.keepalive) {
+ outgoing.keepalive = false;
+ send_keepalive_or_ack_internal();
+ }
+
+ if (outgoing.ack) {
+ outgoing.ack = false;
+ send_keepalive_or_ack_internal(true, &outgoing.ack_time);
+ }
+
// send deferred 1 (direct backpresssure)
if (outgoing.requeue.size() > 0)
portal->requeue(this, outgoing.requeue);
XioSubmit::Queue::const_iterator i2 = deferred_q.end();
deferred_q.splice(i2, outgoing.requeue);
+ outgoing.keepalive = outgoing.ack = false;
+
if (! (flags & CState::OP_FLAG_LOCKED))
pthread_spin_unlock(&sp);
ms->ds_dispatch(m);
return 0;
}
+
+void XioLoopbackConnection::send_keepalive()
+{
+ utime_t t = ceph_clock_now(nullptr);
+ set_last_keepalive(t);
+ set_last_keepalive_ack(t);
+}
#include "include/atomic.h"
#include "auth/AuthSessionHandler.h"
-#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL & \
- ~CEPH_FEATURE_MSGR_KEEPALIVE2)
+#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL)
+
#define XIO_NOP_TAG_MARKDOWN 0x0001
// message submission queue
struct SendQ {
+ bool keepalive;
+ bool ack;
+ utime_t ack_time;
Message::Queue mqueue; // deferred
XioSubmit::Queue requeue;
+
+ SendQ():keepalive(false), ack(false){}
} outgoing;
// conns_entity_map comparison functor
bool is_connected() override { return connected.read(); }
int send_message(Message *m) override;
- void send_keepalive() override {}
+ void send_keepalive() override {send_keepalive_or_ack();}
+ void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr);
void mark_down() override;
int _mark_down(uint32_t flags);
void mark_disposable() override;
int flush_out_queues(uint32_t flags);
int discard_input_queue(uint32_t flags);
int adjust_clru(uint32_t flags);
+private:
+ void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr);
};
typedef boost::intrusive_ptr<XioConnection> XioConnectionRef;
bool is_connected() override { return true; }
int send_message(Message *m) override;
- void send_keepalive() override {}
+ void send_keepalive() override;
void mark_down() override {}
void mark_disposable() override {}