#if defined(HAVE_XIO)
class buffer::xio_msg_buffer : public buffer::raw {
private:
- XioCompletionHook* m_hook;
+ XioDispatchHook* m_hook;
public:
- xio_msg_buffer(XioCompletionHook* _m_hook, const char *d,
+ xio_msg_buffer(XioDispatchHook* _m_hook, const char *d,
unsigned l) :
raw((char*)d, l), m_hook(_m_hook->get()) {}
}
buffer::raw* buffer::create_msg(
- unsigned len, char *buf, XioCompletionHook *m_hook) {
+ unsigned len, char *buf, XioDispatchHook* m_hook) {
XioPool& pool = m_hook->get_pool();
buffer::raw* bp =
static_cast<buffer::raw*>(pool.alloc(sizeof(xio_msg_buffer)));
#if defined(HAVE_XIO)
struct xio_mempool_obj;
-class XioCompletionHook;
+class XioDispatchHook;
#endif
namespace ceph {
static raw* create_unshareable(unsigned len);
#if defined(HAVE_XIO)
- static raw* create_msg(unsigned len, char *buf, XioCompletionHook *m_hook);
+ static raw* create_msg(unsigned len, char *buf, XioDispatchHook *m_hook);
#endif
/*
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_NOP_H
+#define CEPH_MSG_NOP_H
+
+#include "msg/Message.h"
+#include "msg/msg_types.h"
+
+/*
+ * A message with no (remote) effect.
+ *
+ * The payload is a single 32-bit tag; this class encodes and decodes
+ * it but does not interpret it.
+ */
+class MNop : public Message {
+public:
+  static const int HEAD_VERSION = 1;
+  static const int COMPAT_VERSION = 1;
+
+  __u32 tag; // ignored tag value
+
+  // tag starts at 0 so that encode_payload() never serializes an
+  // indeterminate value when the sender does not assign one
+  MNop()
+    : Message(MSG_NOP, HEAD_VERSION, COMPAT_VERSION),
+      tag(0)
+  {}
+
+  ~MNop() {}
+
+  // append the tag to the message payload (_features is not consulted)
+  void encode_payload(uint64_t _features) {
+    ::encode(tag, payload);
+  }
+
+  // read the tag back from the beginning of the payload
+  void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(tag, p);
+  }
+
+  const char *get_type_name() const { return "MNop"; }
+
+  // prints the type name followed by a single space
+  void print(ostream& out) const {
+    out << get_type_name() << " ";
+  }
+}; /* MNop */
+
+#endif /* CEPH_MSG_NOP_H */
// Xio Testing
#define MSG_DATA_PING 0x602
+// Xio intends to define messages 0x603..0x606
+
+// Special
+#define MSG_NOP 0x607
+
// ======================================================
// abstract Message class
magic(m->get_magic()),
scount(0),
send_ctr(0),
- in_seq()
+ in_seq(),
+ cstate(this)
{
pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);
if (xio_conn_type == XioConnection::ACTIVE)
#define uint_to_timeval(tv, s) ((tv).tv_sec = (s), (tv).tv_usec = 0)
-static inline XioCompletionHook* pool_alloc_xio_completion_hook(
+static inline XioDispatchHook* pool_alloc_xio_dispatch_hook(
XioConnection *xcon, Message *m, XioInSeq& msg_seq)
{
struct xio_mempool_obj mp_mem;
int e = xpool_alloc(xio_msgr_noreg_mpool,
- sizeof(XioCompletionHook), &mp_mem);
+ sizeof(XioDispatchHook), &mp_mem);
if (!!e)
return NULL;
- XioCompletionHook *xhook = (XioCompletionHook*) mp_mem.addr;
- new (xhook) XioCompletionHook(xcon, m, msg_seq, mp_mem);
+ XioDispatchHook *xhook = (XioDispatchHook*) mp_mem.addr;
+ new (xhook) XioDispatchHook(xcon, m, msg_seq, mp_mem);
return xhook;
}
}
XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());
- XioCompletionHook *m_hook =
- pool_alloc_xio_completion_hook(this, NULL /* msg */, in_seq);
+ XioDispatchHook *m_hook =
+ pool_alloc_xio_dispatch_hook(this, NULL /* msg */, in_seq);
XioInSeq& msg_seq = m_hook->msg_seq;
in_seq.clear();
" seq: " << xmsg->m->get_seq() << dendl;
--send_ctr; /* atomic, because portal thread */
+
+ /* unblock flow-controlled connections, avoid oscillation */
+ if (unlikely(cstate.session_state.read() ==
+ XioConnection::FLOW_CONTROLLED)) {
+ if ((send_ctr <= uint32_t(xio_qdepth_low_mark())) &&
+ (1 /* XXX memory <= memory low-water mark */)) {
+ cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+ }
+ }
+
xmsg->put();
return 0;
" (" << xio_strerror(code) << ")" << dendl;
} /* msg_release_fail */
+/*
+ * Resubmit everything that was deferred on this connection.
+ *
+ * The connection spinlock is taken here unless the caller passes
+ * CState::OP_FLAG_LOCKED in flags.  Always returns 0.
+ */
+int XioConnection::flush_input_queue(uint32_t flags) {
+  XioMessenger* xmsgr = static_cast<XioMessenger*>(get_messenger());
+  const bool take_lock = !(flags & CState::OP_FLAG_LOCKED);
+
+  if (take_lock)
+    pthread_spin_lock(&sp);
+
+  // send deferred 1 (direct backpressure): hand the XioSubmit entries
+  // back to the portal
+  if (outgoing.requeue.size() > 0)
+    portal->requeue(this, outgoing.requeue);
+
+  // send deferred 2 (sent while deferred): only the Messages present
+  // on entry are drained
+  for (int remaining = outgoing.mqueue.size(); remaining > 0; --remaining) {
+    Message::Queue::iterator head = outgoing.mqueue.begin();
+    Message* deferred = &(*head);
+    outgoing.mqueue.erase(head);
+    xmsgr->_send_message_impl(deferred, this);
+  }
+
+  if (take_lock)
+    pthread_spin_unlock(&sp);
+
+  return 0;
+}
+
+/*
+ * Drop everything queued for sending on this connection.
+ *
+ * Both send queues are moved onto local lists while the connection
+ * spinlock is held (it is taken here unless CState::OP_FLAG_LOCKED is
+ * set in flags); the references are then released from the local
+ * lists.  Always returns 0.
+ */
+int XioConnection::discard_input_queue(uint32_t flags)
+{
+  Message::Queue disc_q;
+  XioSubmit::Queue deferred_q;
+
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    pthread_spin_lock(&sp);
+
+  /* the two send queues contain different objects:
+   *  - anything on the mqueue is a Message
+   *  - anything on the requeue is an XioMsg
+   */
+  Message::Queue::const_iterator i1 = disc_q.end();
+  disc_q.splice(i1, outgoing.mqueue);
+
+  XioSubmit::Queue::const_iterator i2 = deferred_q.end();
+  deferred_q.splice(i2, outgoing.requeue);
+
+  if (! (flags & CState::OP_FLAG_LOCKED))
+    pthread_spin_unlock(&sp);
+
+  // mqueue: one reference per deferred Message
+  while (! disc_q.empty()) {
+    Message::Queue::iterator q_iter = disc_q.begin();
+    Message* m = &(*q_iter);
+    disc_q.erase(q_iter);
+    m->put();
+  }
+
+  // requeue
+  while (! deferred_q.empty()) {
+    XioSubmit::Queue::iterator q_iter = deferred_q.begin();
+    XioSubmit* xs = &(*q_iter);
+    assert(xs->type == XioSubmit::OUTGOING_MSG);
+    XioMsg* xmsg = static_cast<XioMsg*>(xs);
+    deferred_q.erase(q_iter);
+    // release once for each chained xio_msg.  The count is read up
+    // front because xmsg must not be dereferenced after what may be
+    // its last put(), and the release loop has its own index so it
+    // cannot disturb the iteration over deferred_q.
+    const int msg_cnt = int(xmsg->hdr.msg_cnt);
+    for (int ref = 0; ref < msg_cnt; ++ref)
+      xmsg->put();
+  }
+
+  return 0;
+}
+
+// Move this connection to the front of the messenger's conns_list
+// when CState::FLAG_MAPPED is set in cstate.flags.
+//
+// flags: only CState::OP_FLAG_LOCKED is examined here.  When it is set
+// the caller holds sp on entry and holds it again on return, but sp is
+// released in between.  Always returns 0.
+int XioConnection::adjust_clru(uint32_t flags)
+{
+ // drop the caller's hold on sp so that conns_sp can be taken before
+ // sp is (re)acquired below
+ if (flags & CState::OP_FLAG_LOCKED)
+ pthread_spin_unlock(&sp);
+
+ XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
+ msgr->conns_sp.lock();
+ pthread_spin_lock(&sp);
+
+ if (cstate.flags & CState::FLAG_MAPPED) {
+ XioConnection::ConnList::iterator citer =
+ XioConnection::ConnList::s_iterator_to(*this);
+ msgr->conns_list.erase(citer);
+ msgr->conns_list.push_front(*this); // LRU
+ }
+
+ msgr->conns_sp.unlock();
+
+ // sp stays held for a caller that entered with it held
+ if (! (flags & CState::OP_FLAG_LOCKED))
+ pthread_spin_unlock(&sp);
+
+ return 0;
+}
+
int XioConnection::on_msg_error(struct xio_session *session,
enum xio_status error,
struct xio_msg *msg,
return 0;
} /* on_msg_error */
+// Mark this connection down; the connection lock is not held by the
+// caller, so no operation flags are passed along.
+void XioConnection::mark_down()
+{
+  const uint32_t no_flags = XioConnection::CState::OP_FLAG_NONE;
+  (void) _mark_down(no_flags);
+}
+
+/*
+ * Disconnect the Accelio connection and throw away queued output.
+ *
+ * The connection spinlock is taken here unless flags carries
+ * CState::OP_FLAG_LOCKED.  Always returns 0.
+ */
+int XioConnection::_mark_down(uint32_t flags)
+{
+  const bool own_lock = !(flags & CState::OP_FLAG_LOCKED);
+
+  if (own_lock)
+    pthread_spin_lock(&sp);
+
+  // per interface comment, a remote reset is staged only when the
+  // current policy requires it
+  if (cstate.policy.resetcheck) {
+    cstate.flags |= CState::FLAG_RESET;
+  }
+
+  // Accelio disconnect
+  xio_disconnect(conn);
+
+  /* XXX this will almost certainly be called again from
+   * on_disconnect_event() */
+  const uint32_t locked_flags = flags | CState::OP_FLAG_LOCKED;
+  discard_input_queue(locked_flags);
+
+  if (own_lock)
+    pthread_spin_unlock(&sp);
+
+  return 0;
+}
+
+// Mark this connection disposable; the connection lock is not held by
+// the caller, so no operation flags are passed along.
+void XioConnection::mark_disposable()
+{
+  const uint32_t no_flags = XioConnection::CState::OP_FLAG_NONE;
+  (void) _mark_disposable(no_flags);
+}
+
+/*
+ * Switch the connection policy to lossy.  The connection spinlock is
+ * taken here unless flags carries CState::OP_FLAG_LOCKED.  Always
+ * returns 0.
+ */
+int XioConnection::_mark_disposable(uint32_t flags)
+{
+  const bool own_lock = !(flags & CState::OP_FLAG_LOCKED);
+
+  if (own_lock)
+    pthread_spin_lock(&sp);
+
+  cstate.policy.lossy = true;
+
+  if (own_lock)
+    pthread_spin_unlock(&sp);
+
+  return 0;
+}
+
+/*
+ * Flush the connection's deferred output, then record the session as
+ * UP and startup as READY.  The connection spinlock is taken here
+ * unless flags carries OP_FLAG_LOCKED.  Always returns 0.
+ */
+int XioConnection::CState::state_up_ready(uint32_t flags)
+{
+  const bool own_lock = !(flags & CState::OP_FLAG_LOCKED);
+
+  if (own_lock)
+    pthread_spin_lock(&xcon->sp);
+
+  // the flush runs with the lock held, whoever acquired it
+  const uint32_t locked_flags = flags | CState::OP_FLAG_LOCKED;
+  xcon->flush_input_queue(locked_flags);
+
+  session_state.set(UP);
+  startup_state.set(READY);
+
+  if (own_lock)
+    pthread_spin_unlock(&xcon->sp);
+
+  return 0;
+}
+
+// Record the session as DISCONNECTED and the startup state as IDLE.
+// No lock is taken here.  Always returns 0.
+int XioConnection::CState::state_discon()
+{
+ session_state.set(DISCONNECTED);
+ startup_state.set(IDLE);
+
+ return 0;
+}
+
+/*
+ * Record the session as FLOW_CONTROLLED.  The connection spinlock is
+ * taken here unless flags carries OP_FLAG_LOCKED.  Always returns 0.
+ */
+int XioConnection::CState::state_flow_controlled(uint32_t flags) {
+  dout(11) << __func__ << " ENTER " << dendl;
+
+  const bool own_lock = !(flags & OP_FLAG_LOCKED);
+
+  if (own_lock)
+    pthread_spin_lock(&xcon->sp);
+
+  session_state.set(FLOW_CONTROLLED);
+
+  if (own_lock)
+    pthread_spin_unlock(&xcon->sp);
+
+  return 0;
+}
+
+/*
+ * Fail the session: record DISCONNECTED/FAIL, drop queued output,
+ * adjust the connection LRU, disconnect, then deliver a reset to the
+ * messenger and release the caller's reference on m.
+ *
+ * The connection spinlock is taken here unless flags carries
+ * OP_FLAG_LOCKED.  Always returns 0.
+ */
+int XioConnection::CState::state_fail(Message* m, uint32_t flags)
+{
+  const bool own_lock = !(flags & OP_FLAG_LOCKED);
+  const uint32_t locked_flags = flags | OP_FLAG_LOCKED;
+
+  if (own_lock)
+    pthread_spin_lock(&xcon->sp);
+
+  // advance to state FAIL, drop queued msgs, adjust LRU
+  session_state.set(DISCONNECTED);
+  startup_state.set(FAIL);
+
+  xcon->discard_input_queue(locked_flags);
+  xcon->adjust_clru(locked_flags | OP_FLAG_LRU);
+
+  // Accelio disconnect
+  xio_disconnect(xcon->conn);
+
+  if (own_lock)
+    pthread_spin_unlock(&xcon->sp);
+
+  // notify ULP
+  XioMessenger* xmsgr = static_cast<XioMessenger*>(xcon->get_messenger());
+  xmsgr->ms_deliver_handle_reset(xcon);
+  m->put();
+
+  return 0;
+}
+
int XioLoopbackConnection::send_message(Message *m)
{
#include "libxio.h"
}
#include "XioInSeq.h"
+#include "XioSubmit.h"
#include "msg/Connection.h"
#include "msg/Messenger.h"
#include "include/atomic.h"
+#include "auth/AuthSessionHandler.h"
#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL & \
~CEPH_FEATURE_MSGR_KEEPALIVE2)
+#define XIO_NOP_TAG_MARKDOWN 0x0001
+
namespace bi = boost::intrusive;
class XioPortal;
public:
enum type { ACTIVE, PASSIVE };
+ // Values stored in CState::session_state.  CState is constructed
+ // with INIT; UP, FLOW_CONTROLLED and DISCONNECTED are set by the
+ // CState::state_*() transitions.
+ // NOTE(review): the intended meaning of START, DELETED and BARRIER
+ // is not documented here -- confirm against the users of this enum.
+ enum session_states {
+ INIT = 0,
+ START,
+ UP,
+ FLOW_CONTROLLED,
+ DISCONNECTED,
+ DELETED,
+ BARRIER
+ };
+
+ // Values stored in CState::startup_state.  CState is constructed
+ // with IDLE; READY and FAIL are set by the CState::state_*()
+ // transitions.
+ // NOTE(review): CONNECTING and ACCEPTING presumably belong to the
+ // future session startup negotiation -- confirm.
+ enum session_startup_states {
+ IDLE = 0,
+ CONNECTING,
+ ACCEPTING,
+ READY,
+ FAIL
+ };
+
private:
XioConnection::type xio_conn_type;
XioPortal *portal;
uint64_t scount;
uint32_t send_ctr;
int q_high_mark;
+ int q_low_mark;
struct lifecycle {
// different from Pipe states?
uint32_t next_out_seq() {
return out_seq.inc();
- };
+ }
} state;
/* batching */
XioInSeq in_seq;
+  /*
+   * Per-connection session state: negotiated parameters, sequence
+   * counters and the session/startup state values, plus the state
+   * transition operations that act on the owning XioConnection.
+   */
+  class CState
+  {
+  public:
+    // bits stored in the flags member
+    static const int FLAG_NONE = 0x0000;
+    static const int FLAG_BAD_AUTH = 0x0001;
+    static const int FLAG_MAPPED = 0x0002;
+    static const int FLAG_RESET = 0x0004;
+
+    // bits passed as the flags argument of connection operations;
+    // OP_FLAG_LOCKED means the caller already holds xcon->sp
+    static const int OP_FLAG_NONE = 0x0000;
+    static const int OP_FLAG_LOCKED = 0x0001;
+    static const int OP_FLAG_LRU = 0x0002;
+
+    uint64_t features;
+    Messenger::Policy policy;
+
+    CryptoKey session_key;
+    ceph::shared_ptr<AuthSessionHandler> session_security;
+    AuthAuthorizer *authorizer;
+    XioConnection *xcon; // owning connection
+    uint32_t protocol_version;
+
+    atomic_t session_state; // one of XioConnection::session_states
+    atomic_t startup_state; // one of XioConnection::session_startup_states
+
+    uint32_t reconnects;
+    uint32_t connect_seq, global_seq, peer_global_seq;
+    uint32_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
+    atomic_t out_seq; // atomic<uint32_t>
+
+    uint32_t flags;
+
+    // Every scalar member is given a defined starting value so that
+    // nothing indeterminate is read before negotiation fills it in.
+    // Initializers are listed in declaration order.
+    CState(XioConnection* _xcon)
+      : features(0),
+	authorizer(NULL),
+	xcon(_xcon),
+	protocol_version(0),
+	session_state(INIT),
+	startup_state(IDLE),
+	reconnects(0),
+	connect_seq(0),
+	global_seq(0),
+	peer_global_seq(0),
+	in_seq(0),
+	out_seq_acked(0),
+	out_seq(0),
+	flags(FLAG_NONE) {}
+
+    uint64_t get_session_state() {
+      return session_state.read();
+    }
+
+    uint64_t get_startup_state() {
+      return startup_state.read();
+    }
+
+    void set_in_seq(uint32_t seq) {
+      in_seq = seq;
+    }
+
+    // returns the result of incrementing out_seq
+    uint32_t next_out_seq() {
+      return out_seq.inc();
+    }
+
+    // state machine
+    int init_state();
+    int next_state(Message* m);
+#if 0 // future (session startup)
+    int msg_connect(MConnect *m);
+    int msg_connect_reply(MConnectReply *m);
+    int msg_connect_reply(MConnectAuthReply *m);
+    int msg_connect_auth(MConnectAuth *m);
+    int msg_connect_auth_reply(MConnectAuthReply *m);
+#endif
+    int state_up_ready(uint32_t flags);
+    int state_flow_controlled(uint32_t flags);
+    int state_discon();
+    int state_fail(Message* m, uint32_t flags);
+
+  } cstate; /* CState */
+
+ // message submission queue
+ // The two lists hold different object types: mqueue holds Messages,
+ // requeue holds XioSubmit entries.
+ struct SendQ {
+ Message::Queue mqueue; // deferred
+ XioSubmit::Queue requeue;
+ } outgoing;
+
// conns_entity_map comparison functor
struct EntityComp
{
friend class XioPortal;
friend class XioMessenger;
- friend class XioCompletionHook;
+ friend class XioDispatchHook;
+ friend class XioMarkDownHook;
friend class XioMsg;
int on_disconnect_event() {
connected.set(false);
pthread_spin_lock(&sp);
+ discard_input_queue(CState::OP_FLAG_LOCKED);
if (!conn)
this->put();
pthread_spin_unlock(&sp);
return q_high_mark;
}
+ // Returns q_low_mark, the low-water counterpart of
+ // xio_qdepth_high_mark().
+ int xio_qdepth_low_mark() {
+ return q_low_mark;
+ }
+
public:
XioConnection(XioMessenger *m, XioConnection::type _type,
const entity_inst_t& peer);
int send_message(Message *m);
void send_keepalive() {}
- void mark_down() {}
- void mark_disposable() {}
+ virtual void mark_down();
+ int _mark_down(uint32_t flags);
+ virtual void mark_disposable();
+ int _mark_disposable(uint32_t flags);
const entity_inst_t& get_peer() const { return peer; }
int on_msg_req(struct xio_session *session, struct xio_msg *req,
int more_in_batch, void *cb_user_context);
-
int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg,
void *conn_user_context);
-
int on_msg_error(struct xio_session *session, enum xio_status error,
struct xio_msg *msg, void *conn_user_context);
-
void msg_send_fail(XioMsg *xmsg, int code);
-
void msg_release_fail(struct xio_msg *msg, int code);
+ int flush_input_queue(uint32_t flags);
+ int discard_input_queue(uint32_t flags);
+ int adjust_clru(uint32_t flags);
};
typedef boost::intrusive_ptr<XioConnection> XioConnectionRef;
#include "XioMsg.h"
#include "XioMessenger.h"
#include "common/address_helper.h"
+#include "messages/MNop.h"
#define dout_subsys ceph_subsys_xio
* it's peer address */
conns_sp.unlock();
- ldout(cct,4) << "new connection session " << session
- << " xcon " << xcon << dendl;
+ /* XXXX pre-merge of session startup negotiation ONLY! */
+ xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+
+ ldout(cct,2) << "new connection session " << session
+ << " xcon " << xcon << dendl;
}
break;
case XIO_SESSION_CONNECTION_ERROR_EVENT:
int XioMessenger::_send_message(Message *m, Connection *con)
{
-
- static uint32_t nreqs;
- if (unlikely(XioPool::trace_mempool)) {
- if (unlikely((++nreqs % 65536) == 0)) {
- xp_stats.dump(__func__, nreqs);
- }
- }
-
if (con == &loop_con) {
m->set_connection(con);
m->set_src(get_myinst().name);
}
XioConnection *xcon = static_cast<XioConnection*>(con);
- if (! xcon->is_connected())
- return ENOTCONN;
+ /* If con is not in READY state, we have to enforce policy */
+ if (xcon->cstate.session_state.read() != XioConnection::UP) {
+ pthread_spin_lock(&xcon->sp);
+ if (xcon->cstate.session_state.read() != XioConnection::UP) {
+ xcon->outgoing.mqueue.push_back(*m);
+ pthread_spin_unlock(&xcon->sp);
+ return 0;
+ }
+ pthread_spin_unlock(&xcon->sp);
+ }
+
+ return _send_message_impl(m, xcon);
+} /* send_message(Message* m, Connection *con) */
+
+int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
+{
int code = 0;
+ static uint32_t nreqs;
+ if (unlikely(XioPool::trace_mempool)) {
+ if (unlikely((++nreqs % 65536) == 0)) {
+ xp_stats.dump(__func__, nreqs);
+ }
+ }
+
m->set_seq(xcon->state.next_out_seq());
m->set_magic(magic); // trace flags and special handling
conns_entity_map.insert(*xcon);
conns_sp.unlock();
+ /* XXXX pre-merge of session startup negotiation ONLY! */
+ xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+
return xcon->get(); /* nref +1 */
}
} /* get_connection */
return (loop_con.get());
} /* get_loopback_connection */
+/*
+ * Mark down the connection registered for addr, if there is one.
+ * conns_sp is held for the duration of the lookup and the mark down.
+ */
+void XioMessenger::mark_down(const entity_addr_t& addr)
+{
+  Spinlock::Locker lckr(conns_sp);
+
+  // the lookup key pairs a default-constructed name with addr
+  entity_inst_t key(entity_name_t(), addr);
+  XioConnection::EntitySet::iterator found =
+    conns_entity_map.find(key, XioConnection::EntityComp());
+  if (found == conns_entity_map.end())
+    return;
+
+  found->_mark_down(XioConnection::CState::OP_FLAG_NONE);
+} /* mark_down(const entity_addr_t& */
+
+// Mark down the given connection, which is taken to be an
+// XioConnection.
+void XioMessenger::mark_down(Connection* con)
+{
+  const uint32_t no_flags = XioConnection::CState::OP_FLAG_NONE;
+  static_cast<XioConnection*>(con)->_mark_down(no_flags);
+} /* mark_down(Connection*) */
+
+/*
+ * Mark down every connection in conns_entity_map.  conns_sp is held
+ * for the whole walk.
+ */
+void XioMessenger::mark_down_all()
+{
+  Spinlock::Locker lckr(conns_sp);
+  XioConnection::EntitySet::iterator conn_iter;
+  // iterate up to end(); comparing against begin() would terminate
+  // before the first element and mark nothing down
+  for (conn_iter = conns_entity_map.begin(); conn_iter !=
+	 conns_entity_map.end(); ++conn_iter) {
+    (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
+  }
+} /* mark_down_all */
+
+/*
+ * Allocate storage for an XioMarkDownHook from xio_msgr_noreg_mpool
+ * and construct the hook in it.  Returns NULL when the pool
+ * allocation reports a nonzero status.
+ */
+static inline XioMarkDownHook* pool_alloc_markdown_hook(
+  XioConnection *xcon, Message *m)
+{
+  struct xio_mempool_obj mp_mem;
+  if (xio_mempool_alloc(xio_msgr_noreg_mpool,
+			sizeof(XioMarkDownHook), &mp_mem) != 0)
+    return NULL;
+
+  // the hook keeps a copy of mp_mem so it can return its own storage
+  void *slot = mp_mem.addr;
+  return new (slot) XioMarkDownHook(xcon, m, mp_mem);
+}
+
+/*
+ * Send a tagged MNop on the connection with an XioMarkDownHook
+ * attached, after moving the session to BARRIER.
+ */
+void XioMessenger::mark_down_on_empty(Connection* con)
+{
+  XioConnection *xcon = static_cast<XioConnection*>(con);
+
+  MNop* nop = new MNop();
+  nop->tag = XIO_NOP_TAG_MARKDOWN;
+
+  // NOTE(review): pool_alloc_markdown_hook() returns NULL when the
+  // pool allocation fails; that case is passed through unchecked
+  XioMarkDownHook* hook = pool_alloc_markdown_hook(xcon, nop);
+  nop->set_completion_hook(hook);
+
+  // stall new messages
+  xcon->cstate.session_state.set(XioConnection::BARRIER);
+  (void) _send_message_impl(nop, xcon);
+}
+
+// Mark the given connection disposable; it is taken to be an
+// XioConnection.
+void XioMessenger::mark_disposable(Connection *con)
+{
+  const uint32_t no_flags = XioConnection::CState::OP_FLAG_NONE;
+  static_cast<XioConnection*>(con)->_mark_disposable(no_flags);
+}
+
void XioMessenger::try_insert(XioConnection *xcon)
{
Spinlock::Locker lckr(conns_sp);
Mutex sh_mtx;
Cond sh_cond;
+ friend class XioConnection;
+
public:
XioMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t nonce,
int _send_message(Message *m, const entity_inst_t &dest);
int _send_message(Message *m, Connection *con);
+ int _send_message_impl(Message *m, XioConnection *xcon);
uint32_t get_magic() { return magic; }
void set_magic(int _magic) { magic = _magic; }
virtual int send_keepalive(Connection *con)
{ return EINVAL; }
- virtual void mark_down(const entity_addr_t& a)
- { }
-
- virtual void mark_down(Connection *con)
- { }
-
- virtual void mark_down_on_empty(Connection *con)
- { }
-
- virtual void mark_disposable(Connection *con)
- { }
-
- virtual void mark_down_all()
- { }
+ virtual void mark_down(const entity_addr_t& a);
+ virtual void mark_down(Connection *con);
+ virtual void mark_down_all();
+ virtual void mark_down_on_empty(Connection *con);
+ virtual void mark_disposable(Connection *con);
void ds_dispatch(Message *m)
{ dispatch_strategy->ds_dispatch(m); }
#include "XioMsg.h"
-int XioCompletionHook::release_msgs()
+int XioDispatchHook::release_msgs()
{
XioRsp *xrsp;
int r = msg_seq.size();
#include "libxio.h"
}
#include "XioConnection.h"
+#include "XioSubmit.h"
#include "msg/msg_types.h"
#include "XioPool.h"
WRITE_CLASS_ENCODER(XioMsgHdr);
-struct XioSubmit
-{
-public:
- enum submit_type
- {
- OUTGOING_MSG,
- INCOMING_MSG_RELEASE
- };
- enum submit_type type;
- bi::list_member_hook<> submit_list;
- XioConnection *xcon;
-
- XioSubmit(enum submit_type _type, XioConnection *_xcon) :
- type(_type), xcon(_xcon)
- {}
-
- typedef bi::list< XioSubmit,
- bi::member_hook< XioSubmit,
- bi::list_member_hook<>,
- &XioSubmit::submit_list >
- > Queue;
-};
-
extern struct xio_mempool *xio_msgr_noreg_mpool;
#define XIO_MSGR_IOVLEN 16
}
};
-class XioCompletionHook : public Message::CompletionHook
+class XioDispatchHook : public Message::CompletionHook
{
private:
XioConnection *xcon;
public:
struct xio_mempool_obj mp_this;
- XioCompletionHook(XioConnection *_xcon, Message *_m, XioInSeq& _msg_seq,
+ XioDispatchHook(XioConnection *_xcon, Message *_m, XioInSeq& _msg_seq,
struct xio_mempool_obj& _mp) :
CompletionHook(_m),
xcon(_xcon->get()),
int release_msgs();
- XioCompletionHook* get() {
+ XioDispatchHook* get() {
nrefs.inc(); return this;
}
if (!cl_flag && release_msgs())
return;
struct xio_mempool_obj *mp = &this->mp_this;
- this->~XioCompletionHook();
- xpool_free(sizeof(XioCompletionHook), mp);
+ this->~XioDispatchHook();
+ xpool_free(sizeof(XioDispatchHook), mp);
}
}
this->finish(-1);
}
- ~XioCompletionHook() {
+ ~XioDispatchHook() {
--xcon->n_reqs; // atomicity by portal thread
xpool_dec_hookcnt();
xcon->put();
}
};
+/* A sender-side CompletionHook that relies on the on_msg_delivered
+ * to complete a pending mark down.
+ *
+ * The hook holds a reference on the connection for its lifetime and
+ * carries the pool descriptor (mp_this) used to release its storage. */
+class XioMarkDownHook : public Message::CompletionHook
+{
+private:
+  XioConnection* xcon;
+
+public:
+  struct xio_mempool_obj mp_this;
+
+  XioMarkDownHook(
+    XioConnection* _xcon, Message *_m, struct xio_mempool_obj& _mp) :
+    CompletionHook(_m), xcon(_xcon->get()), mp_this(_mp)
+  { }
+
+  virtual void claim(int r) {}
+
+  // Drop the connection reference, destroy the hook and return its
+  // storage to the pool.  The object must not be used afterwards.
+  virtual void finish(int r) {
+    xcon->put();
+    // mp_this is a member of the object being destroyed, so the
+    // descriptor is copied out first and the copy is handed to the
+    // pool instead of a pointer into the destroyed object
+    struct xio_mempool_obj mp = this->mp_this;
+    this->~XioMarkDownHook();
+    xio_mempool_free(&mp);
+  }
+
+  // Mark the connection down, then finish (which destroys this hook).
+  virtual void complete(int r) {
+    xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
+    finish(r);
+  }
+};
+
struct XioRsp : public XioSubmit
{
- XioCompletionHook *xhook;
+ XioDispatchHook *xhook;
public:
- XioRsp(XioConnection *_xcon, XioCompletionHook *_xhook)
+ XioRsp(XioConnection *_xcon, XioDispatchHook *_xhook)
: XioSubmit(XioSubmit::INCOMING_MSG_RELEASE, _xcon /* not xcon! */),
xhook(_xhook->get()) {
// submit queue ref
return xhook->get_seq().dequeue();
}
- XioCompletionHook *get_xhook() { return xhook; }
+ XioDispatchHook* get_xhook() { return xhook; }
void finalize() {
xcon->put();
pthread_spin_unlock(&lane->sp);
}
- void deq(XioSubmit::Queue &send_q)
+  // Append every entry of requeue_q to the tail of the lane selected
+  // for xcon, under that lane's spinlock.
+  void enq(XioConnection *xcon, XioSubmit::Queue& requeue_q)
+  {
+    // the count is taken before the splice, while requeue_q still
+    // holds the entries
+    const int moved = requeue_q.size();
+    Lane* target = get_lane(xcon);
+
+    pthread_spin_lock(&target->sp);
+    XioSubmit::Queue::const_iterator tail = target->q.end();
+    target->q.splice(tail, requeue_q);
+    target->size += moved;
+    pthread_spin_unlock(&target->sp);
+  }
+
+ void deq(XioSubmit::Queue& send_q)
{
int ix;
Lane* lane;
};
}
+ // Hand the entries of send_q back to the submit queue on behalf of
+ // xcon (forwards to submit_q.enq()).
+ void requeue(XioConnection* xcon, XioSubmit::Queue& send_q) {
+ submit_q.enq(xcon, send_q);
+ }
+
+  /*
+   * Pull xmsg and every later outgoing message for xcon out of send_q,
+   * put them at the front of xcon's requeue list in their original
+   * order, and mark the connection flow-controlled.
+   *
+   * xmsg may still be linked in send_q or may already have been erased
+   * by the caller; both cases are handled.  On return q_iter equals
+   * send_q.end().
+   */
+  void requeue_all_xcon(XioMsg* xmsg,
+			XioConnection* xcon,
+			XioSubmit::Queue::iterator& q_iter,
+			XioSubmit::Queue& send_q) {
+    // XXX gather all already-dequeued outgoing messages for xcon
+    // and push them in FIFO order to front of the input queue,
+    // having first marked the connection as flow-controlled
+    XioSubmit::Queue requeue_q;
+    XioSubmit *xs;
+
+    // an entry must be unlinked from send_q before it can join another
+    // list; erase() also yields the position following xmsg.  When the
+    // caller has already erased xmsg, q_iter is already at that
+    // position and must not be advanced again.
+    if (xmsg->submit_list.is_linked())
+      q_iter = send_q.erase(send_q.iterator_to(*xmsg));
+    requeue_q.push_back(*xmsg);
+
+    while (q_iter != send_q.end()) {
+      xs = &(*q_iter);
+      // skip retires and anything for other connections
+      if ((xs->type != XioSubmit::OUTGOING_MSG) ||
+	  (xs->xcon != xcon)) {
+	++q_iter; // step past the entry, otherwise the loop never ends
+	continue;
+      }
+      xmsg = static_cast<XioMsg*>(xs);
+      q_iter = send_q.erase(q_iter);
+      requeue_q.push_back(*xmsg);
+    }
+    pthread_spin_lock(&xcon->sp);
+    XioSubmit::Queue::const_iterator i1 = xcon->outgoing.requeue.begin();
+    xcon->outgoing.requeue.splice(i1, requeue_q);
+    xcon->cstate.state_flow_controlled(XioConnection::CState::OP_FLAG_LOCKED);
+    pthread_spin_unlock(&xcon->sp);
+  }
+
void *entry()
{
int size, code = 0;
do {
submit_q.deq(send_q);
- size = send_q.size();
/* shutdown() barrier */
pthread_spin_lock(&sp);
+ restart:
+ size = send_q.size();
+
if (_shutdown) {
+ // XXX XioMsg queues for flow-controlled connections may require
+ // cleanup
drained = true;
}
if (size > 0) {
- q_iter = send_q.begin();
- while (q_iter != send_q.end()) {
- xs = &(*q_iter);
- xcon = xs->xcon;
- xmsg = static_cast<XioMsg*>(xs);
-
- /* guard Accelio send queue */
- xio_qdepth_high = xcon->xio_qdepth_high_mark();
- if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) > xio_qdepth_high)) {
- ++q_iter;
- continue;
- }
-
- q_iter = send_q.erase(q_iter);
-
- switch (xs->type) {
- case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
+ q_iter = send_q.begin();
+ while (q_iter != send_q.end()) {
+ xs = &(*q_iter);
+ xcon = xs->xcon;
+ xmsg = static_cast<XioMsg*>(xs);
+
+ /* guard Accelio send queue */
+ xio_qdepth_high = xcon->xio_qdepth_high_mark();
+ if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) >
+ xio_qdepth_high)) {
+ requeue_all_xcon(xmsg, xcon, q_iter, send_q);
+ goto restart;
+ }
+
+ q_iter = send_q.erase(q_iter);
+
+ switch (xs->type) {
+ case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
if (unlikely(!xs->xcon->conn))
- code = ENOTCONN;
+ code = ENOTCONN;
else {
- msg = &xmsg->req_0.msg;
- code = xio_send_msg(xcon->conn, msg);
- /* header trace moved here to capture xio serial# */
- if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
- print_xio_msg_hdr(msgr->cct, "xio_send_msg", xmsg->hdr, msg);
- print_ceph_msg(msgr->cct, "xio_send_msg", xmsg->m);
- }
+ msg = &xmsg->req_0.msg;
+ code = xio_send_msg(xcon->conn, msg);
+ /* header trace moved here to capture xio serial# */
+ if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
+ print_xio_msg_hdr(msgr->cct, "xio_send_msg", xmsg->hdr, msg);
+ print_ceph_msg(msgr->cct, "xio_send_msg", xmsg->m);
+ }
}
if (unlikely(code)) {
- xs->xcon->msg_send_fail(xmsg, code);
+ switch (code) {
+ case XIO_E_TX_QUEUE_OVERFLOW:
+ {
+ requeue_all_xcon(xmsg, xcon, q_iter, send_q);
+ goto restart;
+ }
+ break;
+ default:
+ xs->xcon->msg_send_fail(xmsg, code);
+ break;
+ };
} else {
- xs->xcon->send.set(msg->timestamp); // need atomic?
- xcon->send_ctr += xmsg->hdr.msg_cnt; // only inc if cb promised
- }
- break;
- default:
- /* INCOMING_MSG_RELEASE */
- release_xio_rsp(static_cast<XioRsp*>(xs));
- break;
+ xs->xcon->send.set(msg->timestamp); // need atomic?
+ xcon->send_ctr += xmsg->hdr.msg_cnt; // only inc if cb promised
+ }
+ break;
+ default:
+ /* INCOMING_MSG_RELEASE */
+ release_xio_rsp(static_cast<XioRsp*>(xs));
+ break;
}
}
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIO_SUBMIT_H
+#define XIO_SUBMIT_H
+
+#include <boost/intrusive/list.hpp>
+#include "msg/SimplePolicyMessenger.h"
+extern "C" {
+#include "libxio.h"
+}
+#include "XioConnection.h"
+#include "msg/msg_types.h"
+#include "XioPool.h"
+
+namespace bi = boost::intrusive;
+
+class XioConnection;
+
+/*
+ * Base for entries on a submit queue.  Each entry records its kind,
+ * the connection it belongs to, and the intrusive hook that links it
+ * into an XioSubmit::Queue.
+ */
+struct XioSubmit
+{
+  enum submit_type
+  {
+    OUTGOING_MSG,
+    INCOMING_MSG_RELEASE
+  };
+
+  submit_type type;
+  bi::list_member_hook<> submit_list; // linkage for Queue
+  XioConnection *xcon;
+
+  XioSubmit(submit_type _type, XioConnection *_xcon)
+    : type(_type),
+      xcon(_xcon)
+  {}
+
+  // intrusive list of XioSubmit linked through submit_list
+  typedef bi::list< XioSubmit,
+		    bi::member_hook< XioSubmit,
+				     bi::list_member_hook<>,
+				     &XioSubmit::submit_list >
+		    > Queue;
+};
+
+#endif /* XIO_SUBMIT_H */