]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
xio: initial mark_* and queueing/flow control
authorMatt Benjamin <matt@cohortfs.com>
Mon, 15 Dec 2014 19:53:09 +0000 (14:53 -0500)
committerMatt Benjamin <matt@cohortfs.com>
Wed, 14 Jan 2015 21:44:11 +0000 (16:44 -0500)
This change implements explicit support for Accelio sender-side
flow control, which requires queuing messages for later delivery
when the connection is ready to send.

This requirement to queue messages for later delivery, and related
connection state logic, is substantially shared with new session
reset behavior, so we've pulled a subset of that logic forward.

Again due to shared implementation logic, this change also adds
implementations of mark_down(), mark_down_all(), mark_disposable(),
and related methods from Messenger, which were required to be
implemented after Hammer.

Add XioSubmit.h.

For now, start at state UP, READY.

When considering if a flow-controlled connection can be unblocked,
consider only the computed queue depth.  Re-activate and flush the
connection iff the computed queue depth <= 1/2 of the queue
high-water mark.

Placeholder added for byte-throttled case.
Fix lock flags abuse (found by Casey).

Discard deferred and unsent messages on unplanned disconnect.
The change causes discard_input_queue() to be called in Accelio's
on_disconnect_event() handler, as well as on mark_down().

xio: Change new established connection's state to up and ready
     Change the new established passive connection's state to up and
     ready then flush all pending msgs in input_queue

Signed-off-by: Matt Benjamin <matt@cohortfs.com>
Signed-off-by: Vu Pham <vu@mellanox.com>
Signed-off-by: Matt Benjamin <matt@cohortfs.com>
12 files changed:
src/common/buffer.cc
src/include/buffer.h
src/messages/MNop.h [new file with mode: 0644]
src/msg/Message.h
src/msg/xio/XioConnection.cc
src/msg/xio/XioConnection.h
src/msg/xio/XioMessenger.cc
src/msg/xio/XioMessenger.h
src/msg/xio/XioMsg.cc
src/msg/xio/XioMsg.h
src/msg/xio/XioPortal.h
src/msg/xio/XioSubmit.h [new file with mode: 0644]

index 18d95df18c7e7ecd5d7d35d5499396e0bcee275b..88656e836bd29fc8d51f94e6a30024e8bbf0e3d4 100644 (file)
@@ -535,9 +535,9 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
 #if defined(HAVE_XIO)
   class buffer::xio_msg_buffer : public buffer::raw {
   private:
-    XioCompletionHook* m_hook;
+    XioDispatchHook* m_hook;
   public:
-    xio_msg_buffer(XioCompletionHook* _m_hook, const char *d,
+    xio_msg_buffer(XioDispatchHook* _m_hook, const char *d,
        unsigned l) :
       raw((char*)d, l), m_hook(_m_hook->get()) {}
 
@@ -576,7 +576,7 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
   }
 
   buffer::raw* buffer::create_msg(
-      unsigned len, char *buf, XioCompletionHook *m_hook) {
+      unsigned len, char *buf, XioDispatchHook* m_hook) {
     XioPool& pool = m_hook->get_pool();
     buffer::raw* bp =
       static_cast<buffer::raw*>(pool.alloc(sizeof(xio_msg_buffer)));
index 5b5eae4b5426b51e31080aa6504638a7fdee5d8b..d243d6e0535dd60b6e11123aef6b4fbf3913cd45 100644 (file)
@@ -60,7 +60,7 @@
 
 #if defined(HAVE_XIO)
 struct xio_mempool_obj;
-class XioCompletionHook;
+class XioDispatchHook;
 #endif
 
 namespace ceph {
@@ -160,7 +160,7 @@ public:
   static raw* create_unshareable(unsigned len);
 
 #if defined(HAVE_XIO)
-  static raw* create_msg(unsigned len, char *buf, XioCompletionHook *m_hook);
+  static raw* create_msg(unsigned len, char *buf, XioDispatchHook *m_hook);
 #endif
 
   /*
diff --git a/src/messages/MNop.h b/src/messages/MNop.h
new file mode 100644 (file)
index 0000000..f820abb
--- /dev/null
@@ -0,0 +1,54 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_NOP_H
+#define CEPH_MSG_NOP_H
+
+#include "msg/Message.h"
+#include "msg/msg_types.h"
+
+/*
+ * A message with no (remote) effect.
+ */
+class MNop : public Message {
+public:
+  static const int HEAD_VERSION = 1;
+  static const int COMPAT_VERSION = 1;
+
+  __u32 tag; // ignored tag value
+
+  MNop()
+    : Message(MSG_NOP, HEAD_VERSION, COMPAT_VERSION)
+    {}
+
+  ~MNop() {}
+
+  void encode_payload(uint64_t _features) {
+    ::encode(tag, payload);
+  }
+
+  void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(tag, p);
+  }
+
+  const char *get_type_name() const { return "MNop"; }
+
+  void print(ostream& out) const {
+    out << get_type_name() << " ";
+  }
+}; /* MNop */
+
+#endif /* CEPH_MSG_NOP_H */
index 7243d0cf731877fe2068591676fd17ad73b0c365..963c9446a6ca9968db1703f1a61d2d231c257758 100644 (file)
 // Xio Testing
 #define MSG_DATA_PING            0x602
 
+// Xio intends to define messages 0x603..0x606
+
+// Special
+#define MSG_NOP                   0x607
+
 // ======================================================
 
 // abstract Message class
index b0a0ce5252625ef4c53c53aa4b4e5ef27011cbc5..8b7e28b2ef41dfc58f3775026da4a42076dfa8e8 100644 (file)
@@ -89,7 +89,8 @@ XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type,
   magic(m->get_magic()),
   scount(0),
   send_ctr(0),
-  in_seq()
+  in_seq(),
+  cstate(this)
 {
   pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);
   if (xio_conn_type == XioConnection::ACTIVE)
@@ -185,16 +186,16 @@ int XioConnection::passive_setup()
 
 #define uint_to_timeval(tv, s) ((tv).tv_sec = (s), (tv).tv_usec = 0)
 
-static inline XioCompletionHook* pool_alloc_xio_completion_hook(
+static inline XioDispatchHook* pool_alloc_xio_dispatch_hook(
   XioConnection *xcon, Message *m, XioInSeq& msg_seq)
 {
   struct xio_mempool_obj mp_mem;
   int e = xpool_alloc(xio_msgr_noreg_mpool,
-                     sizeof(XioCompletionHook), &mp_mem);
+                     sizeof(XioDispatchHook), &mp_mem);
   if (!!e)
     return NULL;
-  XioCompletionHook *xhook = (XioCompletionHook*) mp_mem.addr;
-  new (xhook) XioCompletionHook(xcon, m, msg_seq, mp_mem);
+  XioDispatchHook *xhook = (XioDispatchHook*) mp_mem.addr;
+  new (xhook) XioDispatchHook(xcon, m, msg_seq, mp_mem);
   return xhook;
 }
 
@@ -237,8 +238,8 @@ int XioConnection::on_msg_req(struct xio_session *session,
   }
 
   XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());
-  XioCompletionHook *m_hook =
-    pool_alloc_xio_completion_hook(this, NULL /* msg */, in_seq);
+  XioDispatchHook *m_hook =
+    pool_alloc_xio_dispatch_hook(this, NULL /* msg */, in_seq);
   XioInSeq& msg_seq = m_hook->msg_seq;
   in_seq.clear();
 
@@ -448,6 +449,16 @@ int XioConnection::on_ow_msg_send_complete(struct xio_session *session,
     " seq: " << xmsg->m->get_seq() << dendl;
 
   --send_ctr; /* atomic, because portal thread */
+
+  /* unblock flow-controlled connections, avoid oscillation */
+  if (unlikely(cstate.session_state.read() ==
+              XioConnection::FLOW_CONTROLLED)) {
+    if ((send_ctr <= uint32_t(xio_qdepth_low_mark())) &&
+       (1 /* XXX memory <= memory low-water mark */))  {
+      cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+    }
+  }
+
   xmsg->put();
 
   return 0;
@@ -467,6 +478,98 @@ void XioConnection::msg_release_fail(struct xio_msg *msg, int code)
     " (" << xio_strerror(code) << ")" << dendl;
 } /* msg_release_fail */
 
+int XioConnection::flush_input_queue(uint32_t flags) {
+  XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    pthread_spin_lock(&sp);
+
+  // send deferred 1 (direct backpresssure)
+  if (outgoing.requeue.size() > 0)
+    portal->requeue(this, outgoing.requeue);
+
+  // send deferred 2 (sent while deferred)
+  int ix, q_size = outgoing.mqueue.size();
+  for (ix = 0; ix < q_size; ++ix) {
+    Message::Queue::iterator q_iter = outgoing.mqueue.begin();
+    Message* m = &(*q_iter);
+    outgoing.mqueue.erase(q_iter);
+    msgr->_send_message_impl(m, this);
+  }
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    pthread_spin_unlock(&sp);
+  return 0;
+}
+
+int XioConnection::discard_input_queue(uint32_t flags)
+{
+  Message::Queue disc_q;
+  XioSubmit::Queue deferred_q;
+
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    pthread_spin_lock(&sp);
+
+  /* the two send queues contain different objects:
+   * - anything on the mqueue is a Message
+   * - anything on the requeue is an XioMsg
+   */
+  Message::Queue::const_iterator i1 = disc_q.end();
+  disc_q.splice(i1, outgoing.mqueue);
+
+  XioSubmit::Queue::const_iterator i2 = deferred_q.end();
+  deferred_q.splice(i2, outgoing.requeue);
+
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    pthread_spin_unlock(&sp);
+
+  // mqueue
+  int ix, q_size =  disc_q.size();
+  for (ix = 0; ix < q_size; ++ix) {
+    Message::Queue::iterator q_iter = disc_q.begin();
+    Message* m = &(*q_iter);
+    disc_q.erase(q_iter);
+    m->put();
+  }
+
+  // requeue
+  q_size =  deferred_q.size();
+  for (ix = 0; ix < q_size; ++ix) {
+    XioSubmit::Queue::iterator q_iter = deferred_q.begin();
+    XioSubmit* xs = &(*q_iter);
+    assert(xs->type == XioSubmit::OUTGOING_MSG);
+    XioMsg* xmsg = static_cast<XioMsg*>(xs);
+    deferred_q.erase(q_iter);
+    // release once for each chained xio_msg
+    for (ix = 0; ix < int(xmsg->hdr.msg_cnt); ++ix)
+      xmsg->put();
+  }
+
+  return 0;
+}
+
+int XioConnection::adjust_clru(uint32_t flags)
+{
+  if (flags & CState::OP_FLAG_LOCKED)
+    pthread_spin_unlock(&sp);
+
+  XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
+  msgr->conns_sp.lock();
+  pthread_spin_lock(&sp);
+
+  if (cstate.flags & CState::FLAG_MAPPED) {
+    XioConnection::ConnList::iterator citer =
+      XioConnection::ConnList::s_iterator_to(*this);
+    msgr->conns_list.erase(citer);
+    msgr->conns_list.push_front(*this); // LRU
+  }
+
+  msgr->conns_sp.unlock();
+
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    pthread_spin_unlock(&sp);
+
+  return 0;
+}
+
 int XioConnection::on_msg_error(struct xio_session *session,
                                enum xio_status error,
                                struct xio_msg  *msg,
@@ -480,6 +583,116 @@ int XioConnection::on_msg_error(struct xio_session *session,
   return 0;
 } /* on_msg_error */
 
+void XioConnection::mark_down()
+{
+  _mark_down(XioConnection::CState::OP_FLAG_NONE);
+}
+
+int XioConnection::_mark_down(uint32_t flags)
+{
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    pthread_spin_lock(&sp);
+
+  // per interface comment, we only stage a remote reset if the
+  // current policy required it
+  if (cstate.policy.resetcheck)
+    cstate.flags |= CState::FLAG_RESET;
+
+  // Accelio disconnect
+  xio_disconnect(conn);
+
+  /* XXX this will almost certainly be called again from
+   * on_disconnect_event() */
+  discard_input_queue(flags|CState::OP_FLAG_LOCKED);
+
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    pthread_spin_unlock(&sp);
+
+  return 0;
+}
+
+void XioConnection::mark_disposable()
+{
+  _mark_disposable(XioConnection::CState::OP_FLAG_NONE);
+}
+
+int XioConnection::_mark_disposable(uint32_t flags)
+{
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    pthread_spin_lock(&sp);
+
+  cstate.policy.lossy = true;
+
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    pthread_spin_unlock(&sp);
+
+  return 0;
+}
+
+int XioConnection::CState::state_up_ready(uint32_t flags)
+{
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    pthread_spin_lock(&xcon->sp);
+
+  xcon->flush_input_queue(flags|CState::OP_FLAG_LOCKED);
+
+  session_state.set(UP);
+  startup_state.set(READY);
+
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    pthread_spin_unlock(&xcon->sp);
+
+  return (0);
+}
+
+int XioConnection::CState::state_discon()
+{
+  session_state.set(DISCONNECTED);
+  startup_state.set(IDLE);
+
+  return 0;
+}
+
+int XioConnection::CState::state_flow_controlled(uint32_t flags) {
+  dout(11) << __func__ << " ENTER " << dendl;
+
+  if (! (flags & OP_FLAG_LOCKED))
+    pthread_spin_lock(&xcon->sp);
+
+  session_state.set(FLOW_CONTROLLED);
+
+  if (! (flags & OP_FLAG_LOCKED))
+    pthread_spin_unlock(&xcon->sp);
+
+  return (0);
+}
+
+int XioConnection::CState::state_fail(Message* m, uint32_t flags)
+{
+  if (! (flags & OP_FLAG_LOCKED))
+    pthread_spin_lock(&xcon->sp);
+
+  // advance to state FAIL, drop queued, msgs, adjust LRU
+  session_state.set(DISCONNECTED);
+  startup_state.set(FAIL);
+
+  xcon->discard_input_queue(flags|OP_FLAG_LOCKED);
+  xcon->adjust_clru(flags|OP_FLAG_LOCKED|OP_FLAG_LRU);
+
+  // Accelio disconnect
+  xio_disconnect(xcon->conn);
+
+  if (! (flags & OP_FLAG_LOCKED))
+    pthread_spin_unlock(&xcon->sp);
+
+  // notify ULP
+  XioMessenger* msgr = static_cast<XioMessenger*>(xcon->get_messenger());
+  msgr->ms_deliver_handle_reset(xcon);
+  m->put();
+
+  return 0;
+}
+
 
 int XioLoopbackConnection::send_message(Message *m)
 {
index 2cd9d3499dbe44a2d6cb8f882de31c9d6b61d72d..21b19d098041d0a9fd97c46045cbe3c83c36aea6 100644 (file)
@@ -22,13 +22,17 @@ extern "C" {
 #include "libxio.h"
 }
 #include "XioInSeq.h"
+#include "XioSubmit.h"
 #include "msg/Connection.h"
 #include "msg/Messenger.h"
 #include "include/atomic.h"
+#include "auth/AuthSessionHandler.h"
 
 #define XIO_ALL_FEATURES (CEPH_FEATURES_ALL & \
                          ~CEPH_FEATURE_MSGR_KEEPALIVE2)
 
+#define XIO_NOP_TAG_MARKDOWN 0x0001
+
 namespace bi = boost::intrusive;
 
 class XioPortal;
@@ -40,6 +44,24 @@ class XioConnection : public Connection
 public:
   enum type { ACTIVE, PASSIVE };
 
+  enum session_states {
+    INIT = 0,
+    START,
+    UP,
+    FLOW_CONTROLLED,
+    DISCONNECTED,
+    DELETED,
+    BARRIER
+  };
+
+  enum session_startup_states {
+    IDLE = 0,
+    CONNECTING,
+    ACCEPTING,
+    READY,
+    FAIL
+  };
+
 private:
   XioConnection::type xio_conn_type;
   XioPortal *portal;
@@ -56,6 +78,7 @@ private:
   uint64_t scount;
   uint32_t send_ctr;
   int q_high_mark;
+  int q_low_mark;
 
   struct lifecycle {
     // different from Pipe states?
@@ -81,13 +104,92 @@ private:
 
     uint32_t next_out_seq() {
       return out_seq.inc();
-    };
+    }
 
   } state;
 
   /* batching */
   XioInSeq in_seq;
 
+  class CState
+  {
+  public:
+    static const int FLAG_NONE = 0x0000;
+    static const int FLAG_BAD_AUTH = 0x0001;
+    static const int FLAG_MAPPED = 0x0002;
+    static const int FLAG_RESET = 0x0004;
+
+    static const int OP_FLAG_NONE = 0x0000;
+    static const int OP_FLAG_LOCKED = 0x0001;
+    static const int OP_FLAG_LRU = 0x0002;
+
+    uint64_t features;
+    Messenger::Policy policy;
+
+    CryptoKey session_key;
+    ceph::shared_ptr<AuthSessionHandler> session_security;
+    AuthAuthorizer *authorizer;
+    XioConnection *xcon;
+    uint32_t protocol_version;
+
+    atomic_t session_state;
+    atomic_t startup_state;
+
+    uint32_t reconnects;
+    uint32_t connect_seq, global_seq, peer_global_seq;
+    uint32_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
+    atomic_t out_seq; // atomic<uint32_t>
+
+    uint32_t flags;
+
+    CState(XioConnection* _xcon)
+      : xcon(_xcon),
+       protocol_version(0),
+       session_state(INIT),
+       startup_state(IDLE),
+       in_seq(0),
+       out_seq(0),
+       flags(FLAG_NONE) {}
+
+    uint64_t get_session_state() {
+      return session_state.read();
+    }
+
+    uint64_t get_startup_state() {
+      return startup_state.read();
+    }
+
+    void set_in_seq(uint32_t seq) {
+      in_seq = seq;
+    }
+
+    uint32_t next_out_seq() {
+      return out_seq.inc();
+    };
+
+    // state machine
+    int init_state();
+    int next_state(Message* m);
+#if 0 // future (session startup)
+    int msg_connect(MConnect *m);
+    int msg_connect_reply(MConnectReply *m);
+    int msg_connect_reply(MConnectAuthReply *m);
+    int msg_connect_auth(MConnectAuth *m);
+    int msg_connect_auth_reply(MConnectAuthReply *m);
+#endif
+    int state_up_ready(uint32_t flags);
+    int state_flow_controlled(uint32_t flags);
+    int state_discon();
+    int state_fail(Message* m, uint32_t flags);
+
+  } cstate; /* CState */
+
+  // message submission queue
+  struct SendQ {
+    Message::Queue mqueue; // deferred
+    XioSubmit::Queue requeue;
+  } outgoing;
+
   // conns_entity_map comparison functor
   struct EntityComp
   {
@@ -118,12 +220,14 @@ private:
 
   friend class XioPortal;
   friend class XioMessenger;
-  friend class XioCompletionHook;
+  friend class XioDispatchHook;
+  friend class XioMarkDownHook;
   friend class XioMsg;
 
   int on_disconnect_event() {
     connected.set(false);
     pthread_spin_lock(&sp);
+    discard_input_queue(CState::OP_FLAG_LOCKED);
     if (!conn)
       this->put();
     pthread_spin_unlock(&sp);
@@ -144,6 +248,10 @@ private:
     return q_high_mark;
   }
 
+  int xio_qdepth_low_mark() {
+    return q_low_mark;
+  }
+
 public:
   XioConnection(XioMessenger *m, XioConnection::type _type,
                const entity_inst_t& peer);
@@ -157,8 +265,10 @@ public:
 
   int send_message(Message *m);
   void send_keepalive() {}
-  void mark_down() {}
-  void mark_disposable() {}
+  virtual void mark_down();
+  int _mark_down(uint32_t flags);
+  virtual void mark_disposable();
+  int _mark_disposable(uint32_t flags);
 
   const entity_inst_t& get_peer() const { return peer; }
 
@@ -196,16 +306,15 @@ public:
 
   int on_msg_req(struct xio_session *session, struct xio_msg *req,
                 int more_in_batch, void *cb_user_context);
-
   int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg,
                              void *conn_user_context);
-
   int on_msg_error(struct xio_session *session, enum xio_status error,
                   struct xio_msg  *msg, void *conn_user_context);
-
   void msg_send_fail(XioMsg *xmsg, int code);
-
   void msg_release_fail(struct xio_msg *msg, int code);
+  int flush_input_queue(uint32_t flags);
+  int discard_input_queue(uint32_t flags);
+  int adjust_clru(uint32_t flags);
 };
 
 typedef boost::intrusive_ptr<XioConnection> XioConnectionRef;
index f9f5799c0eccfbba8a38b23727d1805876e2a9f3..562e7e710c8fbab0a5d0d7679b359afc5c81c7b2 100644 (file)
@@ -22,6 +22,7 @@
 #include "XioMsg.h"
 #include "XioMessenger.h"
 #include "common/address_helper.h"
+#include "messages/MNop.h"
 
 #define dout_subsys ceph_subsys_xio
 
@@ -441,8 +442,11 @@ int XioMessenger::session_event(struct xio_session *session,
      * it's peer address */
     conns_sp.unlock();
 
-    ldout(cct,4) << "new connection session " << session
-      << " xcon " << xcon << dendl;
+    /* XXXX pre-merge of session startup negotiation ONLY! */
+    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+
+    ldout(cct,2) << "new connection session " << session
+                << " xcon " << xcon << dendl;
   }
   break;
   case XIO_SESSION_CONNECTION_ERROR_EVENT:
@@ -714,14 +718,6 @@ static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
 
 int XioMessenger::_send_message(Message *m, Connection *con)
 {
-
-  static uint32_t nreqs;
-  if (unlikely(XioPool::trace_mempool)) {
-    if (unlikely((++nreqs % 65536) == 0)) {
-      xp_stats.dump(__func__, nreqs);
-    }
-  }
-
   if (con == &loop_con) {
     m->set_connection(con);
     m->set_src(get_myinst().name);
@@ -732,11 +728,32 @@ int XioMessenger::_send_message(Message *m, Connection *con)
   }
 
   XioConnection *xcon = static_cast<XioConnection*>(con);
-  if (! xcon->is_connected())
-    return ENOTCONN;
 
+  /* If con is not in READY state, we have to enforce policy */
+  if (xcon->cstate.session_state.read() != XioConnection::UP) {
+    pthread_spin_lock(&xcon->sp);
+    if (xcon->cstate.session_state.read() != XioConnection::UP) {
+      xcon->outgoing.mqueue.push_back(*m);
+      pthread_spin_unlock(&xcon->sp);
+      return 0;
+    }
+    pthread_spin_unlock(&xcon->sp);
+  }
+
+  return _send_message_impl(m, xcon);
+} /* send_message(Message* m, Connection *con) */
+
+int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
+{
   int code = 0;
 
+  static uint32_t nreqs;
+  if (unlikely(XioPool::trace_mempool)) {
+    if (unlikely((++nreqs % 65536) == 0)) {
+      xp_stats.dump(__func__, nreqs);
+    }
+  }
+
   m->set_seq(xcon->state.next_out_seq());
   m->set_magic(magic); // trace flags and special handling
 
@@ -938,6 +955,9 @@ ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
     conns_entity_map.insert(*xcon);
     conns_sp.unlock();
 
+    /* XXXX pre-merge of session startup negotiation ONLY! */
+    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+
     return xcon->get(); /* nref +1 */
   }
 } /* get_connection */
@@ -947,6 +967,63 @@ ConnectionRef XioMessenger::get_loopback_connection()
   return (loop_con.get());
 } /* get_loopback_connection */
 
+void XioMessenger::mark_down(const entity_addr_t& addr)
+{
+  entity_inst_t inst(entity_name_t(), addr);
+  Spinlock::Locker lckr(conns_sp);
+  XioConnection::EntitySet::iterator conn_iter =
+    conns_entity_map.find(inst, XioConnection::EntityComp());
+  if (conn_iter != conns_entity_map.end()) {
+      (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
+    }
+} /* mark_down(const entity_addr_t& */
+
+void XioMessenger::mark_down(Connection* con)
+{
+  XioConnection *xcon = static_cast<XioConnection*>(con);
+  xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
+} /* mark_down(Connection*) */
+
+void XioMessenger::mark_down_all()
+{
+  Spinlock::Locker lckr(conns_sp);
+  XioConnection::EntitySet::iterator conn_iter;
+  for (conn_iter = conns_entity_map.begin(); conn_iter !=
+        conns_entity_map.begin(); ++conn_iter) {
+    (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
+  }
+} /* mark_down_all */
+
+static inline XioMarkDownHook* pool_alloc_markdown_hook(
+  XioConnection *xcon, Message *m)
+{
+  struct xio_mempool_obj mp_mem;
+  int e = xio_mempool_alloc(xio_msgr_noreg_mpool,
+                           sizeof(XioMarkDownHook), &mp_mem);
+  if (!!e)
+    return NULL;
+  XioMarkDownHook *hook = static_cast<XioMarkDownHook*>(mp_mem.addr);
+  new (hook) XioMarkDownHook(xcon, m, mp_mem);
+  return hook;
+}
+
+void XioMessenger::mark_down_on_empty(Connection* con)
+{
+  XioConnection *xcon = static_cast<XioConnection*>(con);
+  MNop* m = new MNop();
+  m->tag = XIO_NOP_TAG_MARKDOWN;
+  m->set_completion_hook(pool_alloc_markdown_hook(xcon, m));
+  // stall new messages
+  xcon->cstate.session_state.set(XioConnection::BARRIER);
+  (void) _send_message_impl(m, xcon);
+}
+
+void XioMessenger::mark_disposable(Connection *con)
+{
+  XioConnection *xcon = static_cast<XioConnection*>(con);
+  xcon->_mark_disposable(XioConnection::CState::OP_FLAG_NONE);
+}
+
 void XioMessenger::try_insert(XioConnection *xcon)
 {
   Spinlock::Locker lckr(conns_sp);
index df4ea6a392023e019c7e4a4fa33da4d563aa548c..8cec6d9c63aeba04707c3779139522047a733064 100644 (file)
@@ -44,6 +44,8 @@ private:
   Mutex sh_mtx;
   Cond sh_cond;
 
+  friend class XioConnection;
+
 public:
   XioMessenger(CephContext *cct, entity_name_t name,
               string mname, uint64_t nonce,
@@ -60,6 +62,7 @@ public:
 
   int _send_message(Message *m, const entity_inst_t &dest);
   int _send_message(Message *m, Connection *con);
+  int _send_message_impl(Message *m, XioConnection *xcon);
 
   uint32_t get_magic() { return magic; }
   void set_magic(int _magic) { magic = _magic; }
@@ -120,20 +123,11 @@ public:
   virtual int send_keepalive(Connection *con)
     { return EINVAL; }
 
-  virtual void mark_down(const entity_addr_t& a)
-    { }
-
-  virtual void mark_down(Connection *con)
-    { }
-
-  virtual void mark_down_on_empty(Connection *con)
-    { }
-
-  virtual void mark_disposable(Connection *con)
-    { }
-
-  virtual void mark_down_all()
-    { }
+  virtual void mark_down(const entity_addr_t& a);
+  virtual void mark_down(Connection *con);
+  virtual void mark_down_all();
+  virtual void mark_down_on_empty(Connection *con);
+  virtual void mark_disposable(Connection *con);
 
   void ds_dispatch(Message *m)
     { dispatch_strategy->ds_dispatch(m); }
index e2e56e6c5e4905e523419e4b94bcab0b1066ef7e..362d8bebc35a87a65b13b8ec9b670e73756949ab 100644 (file)
@@ -18,7 +18,7 @@
 #include "XioMsg.h"
 
 
-int XioCompletionHook::release_msgs()
+int XioDispatchHook::release_msgs()
 {
   XioRsp *xrsp;
   int r = msg_seq.size();
index 77469aed845ac5dee4ced63b598df38c366c718e..96bd47e46b554a8896f5c66e68fdbd8f9338b96f 100644 (file)
@@ -22,6 +22,7 @@ extern "C" {
 #include "libxio.h"
 }
 #include "XioConnection.h"
+#include "XioSubmit.h"
 #include "msg/msg_types.h"
 #include "XioPool.h"
 
@@ -135,29 +136,6 @@ public:
 
 WRITE_CLASS_ENCODER(XioMsgHdr);
 
-struct XioSubmit
-{
-public:
-  enum submit_type
-  {
-    OUTGOING_MSG,
-    INCOMING_MSG_RELEASE
-  };
-  enum submit_type type;
-  bi::list_member_hook<> submit_list;
-  XioConnection *xcon;
-
-  XioSubmit(enum submit_type _type, XioConnection *_xcon) :
-    type(_type), xcon(_xcon)
-    {}
-
-  typedef bi::list< XioSubmit,
-                   bi::member_hook< XioSubmit,
-                                    bi::list_member_hook<>,
-                                    &XioSubmit::submit_list >
-                   > Queue;
-};
-
 extern struct xio_mempool *xio_msgr_noreg_mpool;
 
 #define XIO_MSGR_IOVLEN 16
@@ -292,7 +270,7 @@ public:
     }
 };
 
-class XioCompletionHook : public Message::CompletionHook
+class XioDispatchHook : public Message::CompletionHook
 {
 private:
   XioConnection *xcon;
@@ -305,7 +283,7 @@ private:
 public:
   struct xio_mempool_obj mp_this;
 
-  XioCompletionHook(XioConnection *_xcon, Message *_m, XioInSeq& _msg_seq,
+  XioDispatchHook(XioConnection *_xcon, Message *_m, XioInSeq& _msg_seq,
                    struct xio_mempool_obj& _mp) :
     CompletionHook(_m),
     xcon(_xcon->get()),
@@ -329,7 +307,7 @@ public:
 
   int release_msgs();
 
-  XioCompletionHook* get() {
+  XioDispatchHook* get() {
     nrefs.inc(); return this;
   }
 
@@ -342,8 +320,8 @@ public:
       if (!cl_flag && release_msgs())
        return;
       struct xio_mempool_obj *mp = &this->mp_this;
-      this->~XioCompletionHook();
-      xpool_free(sizeof(XioCompletionHook), mp);
+      this->~XioDispatchHook();
+      xpool_free(sizeof(XioDispatchHook), mp);
     }
   }
 
@@ -358,18 +336,48 @@ public:
     this->finish(-1);
   }
 
-  ~XioCompletionHook() {
+  ~XioDispatchHook() {
     --xcon->n_reqs; // atomicity by portal thread
     xpool_dec_hookcnt();
     xcon->put();
   }
 };
 
+/* A sender-side CompletionHook that relies on the on_msg_delivered
+ * to complete a pending mark down. */
+class XioMarkDownHook : public Message::CompletionHook
+{
+private:
+  XioConnection* xcon;
+
+public:
+  struct xio_mempool_obj mp_this;
+
+  XioMarkDownHook(
+    XioConnection* _xcon, Message *_m, struct xio_mempool_obj& _mp) :
+    CompletionHook(_m), xcon(_xcon->get()), mp_this(_mp)
+    { }
+
+  virtual void claim(int r) {}
+
+  virtual void finish(int r) {
+    xcon->put();
+    struct xio_mempool_obj *mp = &this->mp_this;
+    this->~XioMarkDownHook();
+    xio_mempool_free(mp);
+  }
+
+  virtual void complete(int r) {
+    xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
+    finish(r);
+  }
+};
+
 struct XioRsp : public XioSubmit
 {
-  XioCompletionHook *xhook;
+  XioDispatchHook *xhook;
 public:
-  XioRsp(XioConnection *_xcon, XioCompletionHook *_xhook)
+  XioRsp(XioConnection *_xcon, XioDispatchHook *_xhook)
     : XioSubmit(XioSubmit::INCOMING_MSG_RELEASE, _xcon /* not xcon! */),
       xhook(_xhook->get()) {
       // submit queue ref
@@ -380,7 +388,7 @@ public:
     return xhook->get_seq().dequeue();
   }
 
-  XioCompletionHook *get_xhook() { return xhook; }
+  XioDispatchHook* get_xhook() { return xhook; }
 
   void finalize() {
     xcon->put();
index 0090e6227e213d06629c4c81949a65694bcc4e73..5932d56dc08758a5f431b6d47005f4395eee39d6 100644 (file)
@@ -77,7 +77,18 @@ private:
        pthread_spin_unlock(&lane->sp);
       }
 
-    void deq(XioSubmit::Queue &send_q)
+    void enq(XioConnection *xcon, XioSubmit::Queue& requeue_q)
+      {
+       int size = requeue_q.size();
+       Lane* lane = get_lane(xcon);
+       pthread_spin_lock(&lane->sp);
+       XioSubmit::Queue::const_iterator i1 = lane->q.end();
+       lane->q.splice(i1, requeue_q);
+       lane->size += size;
+       pthread_spin_unlock(&lane->sp);
+      }
+
+    void deq(XioSubmit::Queue& send_q)
       {
        int ix;
        Lane* lane;
@@ -178,6 +189,38 @@ public:
       };
     }
 
+  void requeue(XioConnection* xcon, XioSubmit::Queue& send_q) {
+    submit_q.enq(xcon, send_q);
+  }
+
+  void requeue_all_xcon(XioMsg* xmsg,
+                       XioConnection* xcon,
+                       XioSubmit::Queue::iterator& q_iter,
+                       XioSubmit::Queue& send_q) {
+    // XXX gather all already-dequeued outgoing messages for xcon
+    // and push them in FIFO order to front of the input queue,
+    // having first marked the connection as flow-controlled
+    XioSubmit::Queue requeue_q;
+    XioSubmit *xs;
+    requeue_q.push_back(*xmsg);
+    ++q_iter;
+    while (q_iter != send_q.end()) {
+      xs = &(*q_iter);
+      // skip retires and anything for other connections
+      if ((xs->type != XioSubmit::OUTGOING_MSG) ||
+         (xs->xcon != xcon))
+       continue;
+      xmsg = static_cast<XioMsg*>(xs);
+      q_iter = send_q.erase(q_iter);
+      requeue_q.push_back(*xmsg);
+    }
+    pthread_spin_lock(&xcon->sp);
+    XioSubmit::Queue::const_iterator i1 = xcon->outgoing.requeue.begin();
+    xcon->outgoing.requeue.splice(i1, requeue_q);
+    xcon->cstate.state_flow_controlled(XioConnection::CState::OP_FLAG_LOCKED);
+    pthread_spin_unlock(&xcon->sp);
+  }
+
   void *entry()
     {
       int size, code = 0;
@@ -191,55 +234,70 @@ public:
 
       do {
        submit_q.deq(send_q);
-       size = send_q.size();
 
        /* shutdown() barrier */
        pthread_spin_lock(&sp);
 
+      restart:
+       size = send_q.size();
+
        if (_shutdown) {
+         // XXX XioMsg queues for flow-controlled connections may require
+         // cleanup
          drained = true;
        }
 
        if (size > 0) {
-          q_iter = send_q.begin();
-          while (q_iter != send_q.end()) {
-            xs = &(*q_iter);
-            xcon = xs->xcon;
-            xmsg = static_cast<XioMsg*>(xs);
-
-            /* guard Accelio send queue */
-            xio_qdepth_high = xcon->xio_qdepth_high_mark();
-            if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) > xio_qdepth_high)) {
-              ++q_iter;
-              continue;
-            }
-
-            q_iter = send_q.erase(q_iter);
-
-            switch (xs->type) {
-            case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
+         q_iter = send_q.begin();
+         while (q_iter != send_q.end()) {
+           xs = &(*q_iter);
+           xcon = xs->xcon;
+           xmsg = static_cast<XioMsg*>(xs);
+
+           /* guard Accelio send queue */
+           xio_qdepth_high = xcon->xio_qdepth_high_mark();
+           if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) >
+                        xio_qdepth_high)) {
+             requeue_all_xcon(xmsg, xcon, q_iter, send_q);
+             goto restart;
+           }
+
+           q_iter = send_q.erase(q_iter);
+
+           switch (xs->type) {
+           case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
              if (unlikely(!xs->xcon->conn))
-                code = ENOTCONN;
+               code = ENOTCONN;
              else {
-                msg = &xmsg->req_0.msg;
-                code = xio_send_msg(xcon->conn, msg);
-                /* header trace moved here to capture xio serial# */
-                if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
-                  print_xio_msg_hdr(msgr->cct, "xio_send_msg", xmsg->hdr, msg);
-                  print_ceph_msg(msgr->cct, "xio_send_msg", xmsg->m);
-                }
+               msg = &xmsg->req_0.msg;
+               code = xio_send_msg(xcon->conn, msg);
+               /* header trace moved here to capture xio serial# */
+               if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
+                 print_xio_msg_hdr(msgr->cct, "xio_send_msg", xmsg->hdr, msg);
+                 print_ceph_msg(msgr->cct, "xio_send_msg", xmsg->m);
+               }
              }
              if (unlikely(code)) {
-                xs->xcon->msg_send_fail(xmsg, code);
+               switch (code) {
+               case XIO_E_TX_QUEUE_OVERFLOW:
+               {
+                 requeue_all_xcon(xmsg, xcon, q_iter, send_q);
+                 goto restart;
+               }
+                 break;
+               default:
+                 xs->xcon->msg_send_fail(xmsg, code);
+                 break;
+               };
              } else {
-                xs->xcon->send.set(msg->timestamp); // need atomic?
-                xcon->send_ctr += xmsg->hdr.msg_cnt; // only inc if cb promised
-              }
-              break;
-            default:
-              /* INCOMING_MSG_RELEASE */
-              release_xio_rsp(static_cast<XioRsp*>(xs));
-              break;
+               xs->xcon->send.set(msg->timestamp); // need atomic?
+               xcon->send_ctr += xmsg->hdr.msg_cnt; // only inc if cb promised
+             }
+             break;
+           default:
+             /* INCOMING_MSG_RELEASE */
+             release_xio_rsp(static_cast<XioRsp*>(xs));
+             break;
            }
          }
        }
diff --git a/src/msg/xio/XioSubmit.h b/src/msg/xio/XioSubmit.h
new file mode 100644 (file)
index 0000000..a227ed2
--- /dev/null
@@ -0,0 +1,55 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef XIO_SUBMIT_H
+#define XIO_SUBMIT_H
+
+#include <boost/intrusive/list.hpp>
+#include "msg/SimplePolicyMessenger.h"
+extern "C" {
+#include "libxio.h"
+}
+#include "XioConnection.h"
+#include "msg/msg_types.h"
+#include "XioPool.h"
+
+namespace bi = boost::intrusive;
+
+class XioConnection;
+
+struct XioSubmit
+{
+public:
+  enum submit_type
+  {
+    OUTGOING_MSG,
+    INCOMING_MSG_RELEASE
+  };
+  enum submit_type type;
+  bi::list_member_hook<> submit_list;
+  XioConnection *xcon;
+
+  XioSubmit(enum submit_type _type, XioConnection *_xcon) :
+    type(_type), xcon(_xcon)
+    {}
+
+  typedef bi::list< XioSubmit,
+                   bi::member_hook< XioSubmit,
+                                    bi::list_member_hook<>,
+                                    &XioSubmit::submit_list >
+                   > Queue;
+};
+
+#endif /* XIO_SUBMIT_H */