cmake_minimum_required(VERSION 2.8.11)
project(Ceph)
-set(VERSION "0.88")
+set(VERSION "0.90")
-# Tweak policies (this one disables "missing" dependency warning)
-cmake_policy(SET CMP0046 OLD)
+if (NOT (CMAKE_MAJOR_VERSION LESS 3))
+ # Tweak policies (this one disables "missing" dependency warning)
+ cmake_policy(SET CMP0046 OLD)
+endif(NOT (CMAKE_MAJOR_VERSION LESS 3))
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/modules/")
${PROJECT_BINARY_DIR}/src/include
${OFED_PREFIX}/include
${LEVELDB_PREFIX}/include
+ ${PROJECT_SOURCE_DIR}/src
)
link_directories(
OUTPUT_VARIABLE PYTHON_INSTDIR
OUTPUT_STRIP_TRAILING_WHITESPACE)
-include_directories(".")
-
if(${WITH_TCMALLOC})
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-builtin-malloc -fno-builtin-calloc -fno-builtin-realloc -fno-builtin-free")
set(TCMALLOC_LIBS tcmalloc)
common/ceph_json.cc
common/ipaddr.cc
common/pick_address.cc
+ common/address_helper.cc
common/linux_version.c
osdc/Striper.cc
osdc/Objecter.cc
return CEPH_AUTH_NONE;
}
int handle_request(bufferlist::iterator& indata, bufferlist& result_bl, uint64_t& global_id, AuthCapsInfo& caps, uint64_t *auid = NULL) {
- assert(0); // shouldn't get called
return 0;
}
void build_cephx_response_header(int request_type, int status, bufferlist& bl) { }
--- /dev/null
+/*
+ * address_helper.cc
+ *
+ * Created on: Oct 27, 2013
+ * Author: matt
+ */
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+
+#include <iostream>
+#include <string>
+
+using namespace std;
+
+#include "common/config.h"
+#include "boost/regex.hpp"
+
+#include "common/address_helper.h"
+
+#include <arpa/inet.h>
+
+// decode strings like "tcp://<host>:<port>"
+int entity_addr_from_url(entity_addr_t *addr /* out */, const char *url)
+{
+ using namespace boost;
+ using std::endl;
+
+ struct addrinfo hints;
+ struct addrinfo *res;
+
+ regex expr("(tcp|rdma)://([^:]*):([\\d]+)");
+ cmatch m;
+
+ if (regex_match(url, m, expr)) {
+ int error;
+ string host(m[2].first, m[2].second);
+ string port(m[3].first, m[3].second);
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = PF_UNSPEC;
+ error = getaddrinfo(host.c_str(), NULL, &hints, &res);
+ if (! error) {
+ struct sockaddr_in *sin;
+ struct sockaddr_in6 *sin6;
+ addr->addr.ss_family = res->ai_family;
+ switch(res->ai_family) {
+ case AF_INET:
+ sin = (struct sockaddr_in *) res->ai_addr;
+ memcpy(&addr->addr4.sin_addr, &sin->sin_addr,
+ sizeof(sin->sin_addr));
+ break;
+ case AF_INET6:
+ sin6 = (struct sockaddr_in6 *) res->ai_addr;
+ memcpy(&addr->addr6.sin6_addr, &sin6->sin6_addr,
+ sizeof(sin6->sin6_addr));
+ break;
+ default:
+ break;
+ };
+      addr->set_port(std::atoi(port.c_str()));
+      freeaddrinfo(res); /* release getaddrinfo() results */
+      return 0;
+ }
+ }
+
+ return 1;
+}
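+
+// illustrative use (hypothetical address shown): on success the helper fills
+// in family, address bytes and port, e.g.
+//   entity_addr_t a;
+//   if (entity_addr_from_url(&a, "tcp://127.0.0.1:6789") == 0)
+//     ; // a now describes 127.0.0.1:6789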
+
+int entity_addr_from_sockaddr(entity_addr_t *addr /* out */,
+ const struct sockaddr *sa)
+{
+ struct sockaddr_in *sin;
+ struct sockaddr_in6 *sin6;
+
+ if (! sa)
+ return 0;
+
+ addr->addr.ss_family = sa->sa_family;
+ switch(sa->sa_family) {
+ case AF_INET:
+ sin = (struct sockaddr_in *) sa;
+ memcpy(&addr->addr4.sin_addr, &sin->sin_addr,
+ sizeof(sin->sin_addr));
+ addr->addr4.sin_port = sin->sin_port;
+ break;
+ case AF_INET6:
+ sin6 = (struct sockaddr_in6 *) sa;
+ memcpy(&addr->addr6.sin6_addr, &sin6->sin6_addr,
+ sizeof(sin6->sin6_addr));
+ addr->addr6.sin6_port = sin6->sin6_port;
+ break;
+ default:
+ break;
+ };
+
+ return 1;
+}
+
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef ADDRESS_HELPER_H_
+#define ADDRESS_HELPER_H_
+
+#include "msg/msg_types.h"
+
+int entity_addr_from_url(entity_addr_t *addr /* out */, const char *url);
+int entity_addr_from_sockaddr(entity_addr_t *addr /* out */,
+ const struct sockaddr *sa);
+
+#endif /* ADDRESS_HELPER_H_ */
OPTION(enable_experimental_unrecoverable_data_corrupting_features, OPT_STR, "")
+OPTION(xio_trace_mempool, OPT_BOOL, false) // mempool allocation counters
+OPTION(xio_trace_msgcnt, OPT_BOOL, false) // incoming/outgoing msg counters
+OPTION(xio_trace_xcon, OPT_BOOL, false) // Xio message encode/decode trace
+OPTION(xio_queue_depth, OPT_INT, 512) // depth of Accelio msg queue
+OPTION(xio_mp_min, OPT_INT, 128) // default min mempool size
+OPTION(xio_mp_max_64, OPT_INT, 65536) // max 64-byte chunks (buffer is 40)
+OPTION(xio_mp_max_256, OPT_INT, 8192) // max 256-byte chunks
+OPTION(xio_mp_max_1k, OPT_INT, 8192) // max 1K chunks
+OPTION(xio_mp_max_page, OPT_INT, 4096) // max page-size chunks
+OPTION(xio_mp_max_hint, OPT_INT, 4096) // max size-hint chunks
+OPTION(xio_portal_threads, OPT_INT, 2) // xio portal threads per messenger
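+// the xio_* options above can be tuned in ceph.conf; illustrative snippet:
+//   [global]
+//   xio_portal_threads = 4
+//   xio_queue_depth = 256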
+
DEFAULT_SUBSYS(0, 5)
SUBSYS(lockdep, 0, 1)
SUBSYS(context, 0, 1)
SUBSYS(asok, 1, 5)
SUBSYS(throttle, 1, 1)
SUBSYS(refs, 0, 0)
+SUBSYS(xio, 1, 5)
OPTION(key, OPT_STR, "")
OPTION(keyfile, OPT_STR, "")
#ifndef CEPH_MDATAPING_H
#define CEPH_MDATAPING_H
-#if defined(HAVE_XIO)
-
#include "msg/Message.h"
#include "messages/MPing.h"
+#if defined(HAVE_XIO)
extern "C" {
#include "libxio.h"
}
+#endif /* HAVE_XIO */
-typedef void (*mdata_hook_func)(struct xio_mempool_obj *mp);
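+// without Accelio, provide an empty stand-in so the mdata hook signature
+// below still compiles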
+#if !defined(HAVE_XIO)
+struct xio_mempool_obj {};
+#endif
+typedef void (*mdata_hook_func)(struct xio_mempool_obj *mp);
class MDataPing : public Message {
}
};
-#endif /* HAVE_XIO */
#endif /* CEPH_MDATAPING_H */
#include "msg/simple/SimpleMessenger.h"
#include "msg/async/AsyncMessenger.h"
+#ifdef HAVE_XIO
+#include "msg/xio/XioMessenger.h"
+#endif
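+// Selecting the Accelio transport is gated behind the experimental feature
+// list; an illustrative ceph.conf snippet (option names per config_opts.h):
+//   ms_type = xio
+//   enable_experimental_unrecoverable_data_corrupting_features = ms-type-xio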
Messenger *Messenger::create(CephContext *cct, const string &type,
- entity_name_t name, string lname,
- uint64_t nonce)
+ entity_name_t name, string lname,
+ uint64_t nonce)
{
int r = -1;
if (type == "random")
- r = rand() % 2;
+ r = rand() % 2; // random does not include xio
if (r == 0 || type == "simple")
return new SimpleMessenger(cct, name, lname, nonce);
else if ((r == 1 || type == "async") &&
cct->check_experimental_feature_enabled("ms-type-async"))
return new AsyncMessenger(cct, name, lname, nonce);
+#ifdef HAVE_XIO
+ else if ((type == "xio") &&
+ cct->check_experimental_feature_enabled("ms-type-xio"))
+ return new XioMessenger(cct, name, lname, nonce);
+#endif
lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
return NULL;
}
int default_send_priority;
/// set to true once the Messenger has started, and set to false on shutdown
bool started;
+ uint32_t magic;
public:
/**
Messenger(CephContext *cct_, entity_name_t w)
: my_inst(),
default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false),
- cct(cct_),
+ magic(0), cct(cct_),
crcflags(get_default_crc_flags(cct->_conf))
{
my_inst.name = w;
* set messenger's instance
*/
void set_myinst(entity_inst_t i) { my_inst = i; }
+
+ uint32_t get_magic() { return magic; }
+ void set_magic(int _magic) { magic = _magic; }
+
/**
* Retrieve the Messenger's address.
*
/**
* set messenger's address
*/
- void set_myaddr(const entity_addr_t& a) { my_inst.addr = a; }
+ virtual void set_myaddr(const entity_addr_t& a) { my_inst.addr = a; }
public:
/**
* Retrieve the Messenger's name.
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef DISPATCH_STRATEGY_H
+#define DISPATCH_STRATEGY_H
+
+#include "msg/Message.h"
+
+class Messenger;
+
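+// A DispatchStrategy decouples delivery from the transport: the messenger
+// hands each decoded Message to ds_dispatch(), and a concrete strategy
+// decides whether to run dispatch inline or queue it to worker threads
+// (see FastStrategy and QueueStrategy).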
+class DispatchStrategy
+{
+protected:
+ Messenger *msgr;
+public:
+  DispatchStrategy() : msgr(NULL) {}
+ Messenger *get_messenger() { return msgr; }
+ void set_messenger(Messenger *_msgr) { msgr = _msgr; }
+ virtual void ds_dispatch(Message *m) = 0;
+ virtual void shutdown() = 0;
+ virtual void start() = 0;
+ virtual void wait() = 0;
+ virtual ~DispatchStrategy() {}
+};
+
+#endif /* DISPATCH_STRATEGY_H */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef FAST_STRATEGY_H
+#define FAST_STRATEGY_H
+#include "DispatchStrategy.h"
+
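+// FastStrategy dispatches in the caller's context: fast-dispatchable
+// messages go straight to ms_fast_dispatch(), everything else to the
+// ordinary dispatcher, with no queueing and no extra threads.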
+class FastStrategy : public DispatchStrategy {
+public:
+ FastStrategy() {}
+ virtual void ds_dispatch(Message *m) {
+ msgr->ms_fast_preprocess(m);
+ if (msgr->ms_can_fast_dispatch(m))
+ msgr->ms_fast_dispatch(m);
+ else
+ msgr->ms_deliver_dispatch(m);
+ }
+ virtual void shutdown() {}
+ virtual void start() {}
+ virtual void wait() {}
+ virtual ~FastStrategy() {}
+};
+#endif /* FAST_STRATEGY_H */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include "QueueStrategy.h"
+#define dout_subsys ceph_subsys_ms
+#include "common/debug.h"
+
+QueueStrategy::QueueStrategy(int _n_threads)
+ : lock("QueueStrategy::lock"),
+ n_threads(_n_threads),
+ stop(false),
+ mqueue(),
+ disp_threads()
+{
+}
+
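+// Fast-dispatchable messages are delivered inline; anything else is queued
+// and an idle dispatcher thread, if present, is signalled to pick it up.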
+void QueueStrategy::ds_dispatch(Message *m) {
+ msgr->ms_fast_preprocess(m);
+ if (msgr->ms_can_fast_dispatch(m)) {
+ msgr->ms_fast_dispatch(m);
+ return;
+ }
+ lock.Lock();
+ mqueue.push_back(*m);
+  if (! disp_threads.empty()) {
+    QSThread *thrd = &disp_threads.front();
+    disp_threads.pop_front();
+    thrd->cond.Signal();
+  }
+ lock.Unlock();
+}
+
+void QueueStrategy::entry(QSThread *thrd)
+{
+ Message *m = NULL;
+ for (;;) {
+ lock.Lock();
+ for (;;) {
+ if (! mqueue.empty()) {
+ m = &(mqueue.front());
+ mqueue.pop_front();
+ break;
+ }
+ m = NULL;
+ if (stop)
+ break;
+ disp_threads.push_front(*thrd);
+ thrd->cond.Wait(lock);
+ }
+ lock.Unlock();
+ if (stop) {
+ if (!m) break;
+ m->put();
+ continue;
+ }
+ get_messenger()->ms_deliver_dispatch(m);
+ }
+}
+
+void QueueStrategy::shutdown()
+{
+ QSThread *thrd;
+ lock.Lock();
+ stop = true;
+ while (disp_threads.size()) {
+ thrd = &(disp_threads.front());
+ disp_threads.pop_front();
+ thrd->cond.Signal();
+ }
+ lock.Unlock();
+}
+
+void QueueStrategy::wait()
+{
+ QSThread *thrd;
+ lock.Lock();
+ assert(stop);
+ while (disp_threads.size()) {
+ thrd = &(disp_threads.front());
+ disp_threads.pop_front();
+ lock.Unlock();
+
+ // join outside of lock
+ thrd->join();
+
+ lock.Lock();
+ }
+ lock.Unlock();
+}
+
+void QueueStrategy::start()
+{
+ QSThread *thrd;
+ assert(!stop);
+ lock.Lock();
+ for (int ix = 0; ix < n_threads; ++ix) {
+ thrd = new QSThread(this);
+ thrd->create();
+ }
+ lock.Unlock();
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef QUEUE_STRATEGY_H
+#define QUEUE_STRATEGY_H
+
+#include <boost/intrusive/list.hpp>
+#include "DispatchStrategy.h"
+#include "msg/Messenger.h"
+
+namespace bi = boost::intrusive;
+
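+// QueueStrategy runs a small pool of dispatcher threads (QSThread); messages
+// that cannot be fast-dispatched are placed on an intrusive queue and handed
+// to whichever thread is idle.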
+class QueueStrategy : public DispatchStrategy {
+ Mutex lock;
+ int n_threads;
+ bool stop;
+
+ Message::Queue mqueue;
+
+ class QSThread : public Thread {
+ public:
+ bi::list_member_hook<> thread_q;
+ QueueStrategy *dq;
+ Cond cond;
+ QSThread(QueueStrategy *dq) : thread_q(), dq(dq), cond() {}
+ void* entry() {
+ dq->entry(this);
+ delete(this);
+ return NULL;
+ }
+
+ typedef bi::list< QSThread,
+ bi::member_hook< QSThread,
+ bi::list_member_hook<>,
+ &QSThread::thread_q > > Queue;
+ };
+
+ QSThread::Queue disp_threads;
+
+public:
+ QueueStrategy(int n_threads);
+ virtual void ds_dispatch(Message *m);
+ virtual void shutdown();
+ virtual void start();
+ virtual void wait();
+ void entry(QSThread *thrd);
+ virtual ~QueueStrategy() {}
+};
+#endif /* QUEUE_STRATEGY_H */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "XioMsg.h"
+#include "XioConnection.h"
+#include "XioMessenger.h"
+#include "messages/MDataPing.h"
+
+#include "auth/none/AuthNoneProtocol.h" // XXX
+
+#include "include/assert.h"
+#include "common/dout.h"
+
+extern struct xio_mempool *xio_msgr_mpool;
+extern struct xio_mempool *xio_msgr_noreg_mpool;
+
+#define dout_subsys ceph_subsys_xio
+
+void print_xio_msg_hdr(CephContext *cct, const char *tag,
+ const XioMsgHdr &hdr, const struct xio_msg *msg)
+{
+ if (msg) {
+ ldout(cct,4) << tag <<
+ " xio msg:" <<
+ " sn: " << msg->sn <<
+ " timestamp: " << msg->timestamp <<
+ dendl;
+ }
+
+ ldout(cct,4) << tag <<
+ " ceph header: " <<
+ " seq: " << hdr.hdr->seq <<
+ " tid: " << hdr.hdr->tid <<
+ " type: " << hdr.hdr->type <<
+ " prio: " << hdr.hdr->priority <<
+ " name type: " << (int) hdr.hdr->src.type <<
+ " name num: " << (int) hdr.hdr->src.num <<
+ " version: " << hdr.hdr->version <<
+ " compat_version: " << hdr.hdr->compat_version <<
+ " front_len: " << hdr.hdr->front_len <<
+ " middle_len: " << hdr.hdr->middle_len <<
+ " data_len: " << hdr.hdr->data_len <<
+ " xio header: " <<
+ " msg_cnt: " << hdr.msg_cnt <<
+ dendl;
+
+ ldout(cct,4) << tag <<
+ " ceph footer: " <<
+ " front_crc: " << hdr.ftr->front_crc <<
+ " middle_crc: " << hdr.ftr->middle_crc <<
+ " data_crc: " << hdr.ftr->data_crc <<
+ " sig: " << hdr.ftr->sig <<
+ " flags: " << (uint32_t) hdr.ftr->flags <<
+ dendl;
+}
+
+void print_ceph_msg(CephContext *cct, const char *tag, Message *m)
+{
+  if (m->get_magic() & (MSG_MAGIC_XIO | MSG_MAGIC_TRACE_DTOR)) {
+ ceph_msg_header& header = m->get_header();
+ ldout(cct,4) << tag << " header version " << header.version <<
+ " compat version " << header.compat_version <<
+ dendl;
+ }
+}
+
+XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type,
+ const entity_inst_t& _peer) :
+ Connection(m->cct, m),
+ xio_conn_type(_type),
+ portal(m->default_portal()),
+ connected(false),
+ peer(_peer),
+ session(NULL),
+ conn(NULL),
+ magic(m->get_magic()),
+ scount(0),
+ send_ctr(0),
+ in_seq()
+{
+ pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);
+ if (xio_conn_type == XioConnection::ACTIVE)
+ peer_addr = peer.addr;
+ peer_type = peer.name.type();
+ set_peer_addr(peer.addr);
+
+ /* XXXX fake features, aieee! */
+ set_features(XIO_ALL_FEATURES);
+}
+
+int XioConnection::send_message(Message *m)
+{
+ XioMessenger *ms = static_cast<XioMessenger*>(get_messenger());
+ return ms->_send_message(m, this);
+}
+
+int XioConnection::passive_setup()
+{
+ /* XXX passive setup is a placeholder for (potentially active-side
+ initiated) feature and auth* negotiation */
+ static bufferlist authorizer_reply; /* static because fake */
+ static CryptoKey session_key; /* ditto */
+ bool authorizer_valid;
+
+ XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());
+
+ // fake an auth buffer
+ EntityName name;
+ name.set_type(peer.name.type());
+
+ AuthNoneAuthorizer auth;
+ auth.build_authorizer(name, peer.name.num());
+
+ /* XXX fake authorizer! */
+ msgr->ms_deliver_verify_authorizer(
+ this, peer_type, CEPH_AUTH_NONE,
+ auth.bl,
+ authorizer_reply,
+ authorizer_valid,
+ session_key);
+
+ /* notify hook */
+ msgr->ms_deliver_handle_accept(this);
+
+ /* try to insert in conns_entity_map */
+ msgr->try_insert(this);
+ return (0);
+}
+
+#define uint_to_timeval(tv, s) ((tv).tv_sec = (s), (tv).tv_usec = 0)
+
+static inline XioCompletionHook* pool_alloc_xio_completion_hook(
+ XioConnection *xcon, Message *m, XioInSeq& msg_seq)
+{
+ struct xio_mempool_obj mp_mem;
+ int e = xpool_alloc(xio_msgr_noreg_mpool,
+ sizeof(XioCompletionHook), &mp_mem);
+ if (!!e)
+ return NULL;
+ XioCompletionHook *xhook = (XioCompletionHook*) mp_mem.addr;
+ new (xhook) XioCompletionHook(xcon, m, msg_seq, mp_mem);
+ return xhook;
+}
+
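+/* Reassemble one Ceph message from a sequence of Accelio requests: the
+ * first xio_msg carries an XioMsgHdr whose msg_cnt says how many xio_msgs
+ * make up the message; fragments accumulate in in_seq until the count is
+ * reached, then front/middle/data are rebuilt, decoded and dispatched. */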
+int XioConnection::on_msg_req(struct xio_session *session,
+ struct xio_msg *req,
+ int more_in_batch,
+ void *cb_user_context)
+{
+ struct xio_msg *treq = req;
+
+ /* XXX Accelio guarantees message ordering at
+ * xio_session */
+
+ if (! in_seq.p()) {
+ if (!treq->in.header.iov_len) {
+ derr << __func__ << " empty header: packet out of sequence?" << dendl;
+ xio_release_msg(req);
+ return 0;
+ }
+ XioMsgCnt msg_cnt(
+ buffer::create_static(treq->in.header.iov_len,
+ (char*) treq->in.header.iov_base));
+ ldout(msgr->cct,10) << __func__ << " receive req " << "treq " << treq
+ << " msg_cnt " << msg_cnt.msg_cnt
+ << " iov_base " << treq->in.header.iov_base
+ << " iov_len " << (int) treq->in.header.iov_len
+ << " nents " << treq->in.pdata_iov.nents
+ << " conn " << conn << " sess " << session
+ << " sn " << treq->sn << dendl;
+ assert(session == this->session);
+ in_seq.set_count(msg_cnt.msg_cnt);
+ } else {
+ /* XXX major sequence error */
+ assert(! treq->in.header.iov_len);
+ }
+
+ in_seq.append(req);
+ if (in_seq.count() > 0) {
+ return 0;
+ }
+
+ XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());
+ XioCompletionHook *m_hook =
+ pool_alloc_xio_completion_hook(this, NULL /* msg */, in_seq);
+ XioInSeq& msg_seq = m_hook->msg_seq;
+ in_seq.clear();
+
+ ceph_msg_header header;
+ ceph_msg_footer footer;
+ buffer::list payload, middle, data;
+
+ struct timeval t1, t2;
+
+ ldout(msgr->cct,4) << __func__ << " " << "msg_seq.size()=" << msg_seq.size() <<
+ dendl;
+
+ struct xio_msg* msg_iter = msg_seq.begin();
+ treq = msg_iter;
+ XioMsgHdr hdr(header, footer,
+ buffer::create_static(treq->in.header.iov_len,
+ (char*) treq->in.header.iov_base));
+
+ uint_to_timeval(t1, treq->timestamp);
+
+ if (magic & (MSG_MAGIC_TRACE_XCON)) {
+ if (hdr.hdr->type == 43) {
+ print_xio_msg_hdr(msgr->cct, "on_msg_req", hdr, NULL);
+ }
+ }
+
+ unsigned int ix, blen, iov_len;
+ struct xio_iovec_ex *msg_iov, *iovs;
+ uint32_t take_len, left_len = 0;
+ char *left_base = NULL;
+
+ ix = 0;
+ blen = header.front_len;
+
+ while (blen && (msg_iter != msg_seq.end())) {
+ treq = msg_iter;
+ iov_len = vmsg_sglist_nents(&treq->in);
+ iovs = vmsg_sglist(&treq->in);
+ for (; blen && (ix < iov_len); ++ix) {
+ msg_iov = &iovs[ix];
+
+ /* XXX need to detect any buffer which needs to be
+ * split due to coalescing of a segment (front, middle,
+ * data) boundary */
+
+ take_len = MIN(blen, msg_iov->iov_len);
+ payload.append(
+ buffer::create_msg(
+ take_len, (char*) msg_iov->iov_base, m_hook));
+ blen -= take_len;
+ if (! blen) {
+ left_len = msg_iov->iov_len - take_len;
+ if (left_len) {
+ left_base = ((char*) msg_iov->iov_base) + take_len;
+ }
+ }
+ }
+ /* XXX as above, if a buffer is split, then we needed to track
+ * the new start (carry) and not advance */
+ if (ix == iov_len) {
+ msg_seq.next(&msg_iter);
+ ix = 0;
+ }
+ }
+
+ if (magic & (MSG_MAGIC_TRACE_XCON)) {
+ if (hdr.hdr->type == 43) {
+ ldout(msgr->cct,4) << "front (payload) dump:";
+ payload.hexdump( *_dout );
+ *_dout << dendl;
+ }
+ }
+
+ blen = header.middle_len;
+
+ if (blen && left_len) {
+ middle.append(
+ buffer::create_msg(left_len, left_base, m_hook));
+ left_len = 0;
+ }
+
+ while (blen && (msg_iter != msg_seq.end())) {
+ treq = msg_iter;
+ iov_len = vmsg_sglist_nents(&treq->in);
+ iovs = vmsg_sglist(&treq->in);
+ for (; blen && (ix < iov_len); ++ix) {
+ msg_iov = &iovs[ix];
+ take_len = MIN(blen, msg_iov->iov_len);
+ middle.append(
+ buffer::create_msg(
+ take_len, (char*) msg_iov->iov_base, m_hook));
+ blen -= take_len;
+ if (! blen) {
+ left_len = msg_iov->iov_len - take_len;
+ if (left_len) {
+ left_base = ((char*) msg_iov->iov_base) + take_len;
+ }
+ }
+ }
+ if (ix == iov_len) {
+ msg_seq.next(&msg_iter);
+ ix = 0;
+ }
+ }
+
+ blen = header.data_len;
+
+ if (blen && left_len) {
+ data.append(
+ buffer::create_msg(left_len, left_base, m_hook));
+ left_len = 0;
+ }
+
+ while (blen && (msg_iter != msg_seq.end())) {
+ treq = msg_iter;
+ iov_len = vmsg_sglist_nents(&treq->in);
+ iovs = vmsg_sglist(&treq->in);
+ for (; blen && (ix < iov_len); ++ix) {
+ msg_iov = &iovs[ix];
+ data.append(
+ buffer::create_msg(
+ msg_iov->iov_len, (char*) msg_iov->iov_base, m_hook));
+ blen -= msg_iov->iov_len;
+ }
+ if (ix == iov_len) {
+ msg_seq.next(&msg_iter);
+ ix = 0;
+ }
+ }
+
+ uint_to_timeval(t2, treq->timestamp);
+
+ /* update connection timestamp */
+ recv.set(treq->timestamp);
+
+ Message *m =
+ decode_message(msgr->cct, msgr->crcflags, header, footer, payload, middle,
+ data);
+
+ if (m) {
+ /* completion */
+ m->set_connection(this);
+
+ /* reply hook */
+ m_hook->set_message(m);
+ m->set_completion_hook(m_hook);
+
+ /* trace flag */
+ m->set_magic(magic);
+
+ /* update timestamps */
+ m->set_recv_stamp(t1);
+ m->set_recv_complete_stamp(t2);
+ m->set_seq(header.seq);
+
+ /* MP-SAFE */
+ state.set_in_seq(header.seq);
+
+ /* XXXX validate peer type */
+ if (peer_type != (int) hdr.peer_type) { /* XXX isn't peer_type -1? */
+ peer_type = hdr.peer_type;
+ peer_addr = hdr.addr;
+ peer.addr = peer_addr;
+ peer.name = hdr.hdr->src;
+ if (xio_conn_type == XioConnection::PASSIVE) {
+ /* XXX kick off feature/authn/authz negotiation
+ * nb: very possibly the active side should initiate this, but
+ * for now, call a passive hook so OSD and friends can create
+ * sessions without actually negotiating
+ */
+ passive_setup();
+ }
+ }
+
+ if (magic & (MSG_MAGIC_TRACE_XCON)) {
+ ldout(msgr->cct,4) << "decode m is " << m->get_type() << dendl;
+ }
+
+ /* dispatch it */
+ msgr->ds_dispatch(m);
+ } else {
+ /* responds for undecoded messages and frees hook */
+ ldout(msgr->cct,4) << "decode m failed" << dendl;
+ m_hook->on_err_finalize(this);
+ }
+
+ return 0;
+}
+
+int XioConnection::on_ow_msg_send_complete(struct xio_session *session,
+ struct xio_msg *req,
+ void *conn_user_context)
+{
+ /* requester send complete (one-way) */
+ uint64_t rc = ++scount;
+
+ XioMsg* xmsg = static_cast<XioMsg*>(req->user_context);
+ if (unlikely(magic & MSG_MAGIC_TRACE_CTR)) {
+ if (unlikely((rc % 1000000) == 0)) {
+ std::cout << "xio finished " << rc << " " << time(0) << std::endl;
+ }
+ } /* trace ctr */
+
+ ldout(msgr->cct,11) << "on_msg_delivered xcon: " << xmsg->xcon <<
+ " session: " << session << " msg: " << req << " sn: " << req->sn <<
+ " type: " << xmsg->m->get_type() << " tid: " << xmsg->m->get_tid() <<
+ " seq: " << xmsg->m->get_seq() << dendl;
+
+ --send_ctr; /* atomic, because portal thread */
+ xmsg->put();
+
+ return 0;
+} /* on_msg_delivered */
+
+void XioConnection::msg_send_fail(XioMsg *xmsg, int code)
+{
+ ldout(msgr->cct,4) << "xio_send_msg FAILED " << &xmsg->req_0.msg << " code=" << code <<
+ " (" << xio_strerror(code) << ")" << dendl;
+ /* return refs taken for each xio_msg */
+ xmsg->put_msg_refs();
+} /* msg_send_fail */
+
+void XioConnection::msg_release_fail(struct xio_msg *msg, int code)
+{
+  ldout(msgr->cct,4) << "xio_release_msg FAILED " << msg << " code=" << code <<
+ " (" << xio_strerror(code) << ")" << dendl;
+} /* msg_release_fail */
+
+int XioConnection::on_msg_error(struct xio_session *session,
+ enum xio_status error,
+ struct xio_msg *msg,
+ void *conn_user_context)
+{
+ XioMsg *xmsg = static_cast<XioMsg*>(msg->user_context);
+ if (xmsg)
+ xmsg->put();
+
+ --send_ctr; /* atomic, because portal thread */
+ return 0;
+} /* on_msg_error */
+
+
+int XioLoopbackConnection::send_message(Message *m)
+{
+ XioMessenger *ms = static_cast<XioMessenger*>(get_messenger());
+ m->set_connection(this);
+ m->set_seq(next_seq());
+ m->set_src(ms->get_myinst().name);
+ ms->ds_dispatch(m);
+ return 0;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIO_CONNECTION_H
+#define XIO_CONNECTION_H
+
+#include <boost/intrusive/avl_set.hpp>
+#include <boost/intrusive/list.hpp>
+extern "C" {
+#include "libxio.h"
+}
+#include "XioInSeq.h"
+#include "msg/Connection.h"
+#include "msg/Messenger.h"
+#include "include/atomic.h"
+
+#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL & \
+ ~CEPH_FEATURE_MSGR_KEEPALIVE2)
+
+namespace bi = boost::intrusive;
+
+class XioPortal;
+class XioMessenger;
+class XioMsg;
+
+class XioConnection : public Connection
+{
+public:
+ enum type { ACTIVE, PASSIVE };
+
+private:
+ XioConnection::type xio_conn_type;
+ XioPortal *portal;
+ atomic_t connected;
+ entity_inst_t peer;
+ struct xio_session *session;
+ struct xio_connection *conn;
+ pthread_spinlock_t sp;
+ atomic_t send;
+ atomic_t recv;
+ uint32_t n_reqs; // Accelio-initiated reqs in progress (!counting partials)
+ uint32_t magic;
+ uint32_t special_handling;
+ uint64_t scount;
+ uint32_t send_ctr;
+
+ struct lifecycle {
+ // different from Pipe states?
+ enum lf_state {
+ INIT,
+ LOCAL_DISCON,
+ REMOTE_DISCON,
+ RECONNECTING,
+ UP,
+ DEAD } state;
+
+ /* XXX */
+ uint32_t reconnects;
+ uint32_t connect_seq, peer_global_seq;
+ uint32_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
+ atomic_t out_seq; // atomic<uint32_t>
+
+ lifecycle() : state(lifecycle::INIT), in_seq(0), out_seq(0) {}
+
+ void set_in_seq(uint32_t seq) {
+ in_seq = seq;
+ }
+
+ uint32_t next_out_seq() {
+ return out_seq.inc();
+ };
+
+ } state;
+
+ /* batching */
+ XioInSeq in_seq;
+
+ // conns_entity_map comparison functor
+ struct EntityComp
+ {
+ // for internal ordering
+ bool operator()(const XioConnection &lhs, const XioConnection &rhs) const
+ { return lhs.get_peer() < rhs.get_peer(); }
+
+ // for external search by entity_inst_t(peer)
+ bool operator()(const entity_inst_t &peer, const XioConnection &c) const
+ { return peer < c.get_peer(); }
+
+ bool operator()(const XioConnection &c, const entity_inst_t &peer) const
+ { return c.get_peer() < peer; }
+ };
+
+ bi::list_member_hook<> conns_hook;
+ bi::avl_set_member_hook<> conns_entity_map_hook;
+
+ typedef bi::list< XioConnection,
+ bi::member_hook<XioConnection, bi::list_member_hook<>,
+ &XioConnection::conns_hook > > ConnList;
+
+ typedef bi::member_hook<XioConnection, bi::avl_set_member_hook<>,
+ &XioConnection::conns_entity_map_hook> EntityHook;
+
+ typedef bi::avl_set< XioConnection, EntityHook,
+ bi::compare<EntityComp> > EntitySet;
+
+ friend class XioPortal;
+ friend class XioMessenger;
+ friend class XioCompletionHook;
+ friend class XioMsg;
+
+ int on_disconnect_event() {
+ connected.set(false);
+ pthread_spin_lock(&sp);
+ if (!conn)
+ this->put();
+ pthread_spin_unlock(&sp);
+ return 0;
+ }
+
+ int on_teardown_event() {
+ pthread_spin_lock(&sp);
+ if (conn)
+ xio_connection_destroy(conn);
+ conn = NULL;
+ pthread_spin_unlock(&sp);
+ this->put();
+ return 0;
+ }
+
+ int xio_queue_depth() {
+ return msgr->cct->_conf->xio_queue_depth;
+ }
+
+public:
+ XioConnection(XioMessenger *m, XioConnection::type _type,
+ const entity_inst_t& peer);
+
+ ~XioConnection() {
+ if (conn)
+ xio_connection_destroy(conn);
+ }
+
+ bool is_connected() { return connected.read(); }
+
+ int send_message(Message *m);
+ void send_keepalive() {}
+ void mark_down() {}
+ void mark_disposable() {}
+
+ const entity_inst_t& get_peer() const { return peer; }
+
+ XioConnection* get() {
+#if 0
+ int refs = nref.read();
+ cout << "XioConnection::get " << this << " " << refs << std::endl;
+#endif
+ RefCountedObject::get();
+ return this;
+ }
+
+ void put() {
+ RefCountedObject::put();
+#if 0
+ int refs = nref.read();
+ cout << "XioConnection::put " << this << " " << refs << std::endl;
+#endif
+ }
+
+ void disconnect() {
+ if (is_connected()) {
+ connected.set(false);
+ xio_disconnect(conn); // normal teardown will clean up conn
+ }
+ }
+
+ uint32_t get_magic() { return magic; }
+ void set_magic(int _magic) { magic = _magic; }
+ uint32_t get_special_handling() { return special_handling; }
+ void set_special_handling(int n) { special_handling = n; }
+ uint64_t get_scount() { return scount; }
+
+ int passive_setup(); /* XXX */
+
+ int on_msg_req(struct xio_session *session, struct xio_msg *req,
+ int more_in_batch, void *cb_user_context);
+
+ int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg,
+ void *conn_user_context);
+
+ int on_msg_error(struct xio_session *session, enum xio_status error,
+ struct xio_msg *msg, void *conn_user_context);
+
+ void msg_send_fail(XioMsg *xmsg, int code);
+
+ void msg_release_fail(struct xio_msg *msg, int code);
+};
+
+typedef boost::intrusive_ptr<XioConnection> XioConnectionRef;
+
+class XioLoopbackConnection : public Connection
+{
+private:
+ atomic_t seq;
+public:
+ XioLoopbackConnection(Messenger *m) : Connection(m->cct, m), seq(0)
+ {
+ const entity_inst_t& m_inst = m->get_myinst();
+ peer_addr = m_inst.addr;
+ peer_type = m_inst.name.type();
+ set_features(XIO_ALL_FEATURES); /* XXXX set to ours */
+ }
+
+ XioLoopbackConnection* get() {
+ return static_cast<XioLoopbackConnection*>(RefCountedObject::get());
+ }
+
+ virtual bool is_connected() { return true; }
+
+ int send_message(Message *m);
+ void send_keepalive() {}
+ void mark_down() {}
+ void mark_disposable() {}
+
+ uint32_t get_seq() {
+ return seq.read();
+ }
+
+ uint32_t next_seq() {
+ return seq.inc();
+ }
+};
+
+typedef boost::intrusive_ptr<XioLoopbackConnection> LoopbackConnectionRef;
+
+#endif /* XIO_CONNECTION_H */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIO_IN_SEQ_H
+#define XIO_IN_SEQ_H
+
+#include <boost/intrusive/list.hpp>
+#include "msg/SimplePolicyMessenger.h"
+extern "C" {
+#include "libxio.h"
+}
+
+/* For inbound messages (Accelio-owned) ONLY, use the message's
+ * user_context as an SLIST */
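+/* set_count() records how many xio_msgs are expected; append() links each
+ * arriving msg through its user_context and decrements the count, so a
+ * count of zero means the sequence is complete. */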
+class XioInSeq {
+private:
+ int cnt;
+ int sz;
+ struct xio_msg* head;
+ struct xio_msg* tail;
+
+public:
+ XioInSeq() : cnt(0), sz(0), head(NULL), tail(NULL) {}
+ XioInSeq(const XioInSeq& seq) {
+ cnt = seq.cnt;
+ sz = seq.sz;
+ head = seq.head;
+ tail = seq.tail;
+ }
+
+ int count() { return cnt; }
+
+ int size() { return sz; }
+
+ bool p() { return !!head; }
+
+ void set_count(int _cnt) { cnt = _cnt; }
+
+ void append(struct xio_msg* msg) {
+ msg->user_context = NULL;
+ if (!head) {
+ head = tail = msg;
+ } else {
+ tail->user_context = msg;
+ tail = msg;
+ }
+ ++sz;
+ --cnt;
+ }
+
+ struct xio_msg* begin() { return head; }
+
+ struct xio_msg* end() { return NULL; }
+
+ void next(struct xio_msg** msg) {
+ *msg = static_cast<struct xio_msg *>((*msg)->user_context);
+ }
+
+ struct xio_msg* dequeue() {
+ struct xio_msg* msgs = head;
+ clear();
+ return msgs;
+ }
+
+ void clear() {
+ head = tail = NULL;
+ cnt = 0;
+ sz = 0;
+ }
+};
+
+#endif /* XIO_IN_SEQ_H */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <arpa/inet.h>
+#include <boost/lexical_cast.hpp>
+#include <set>
+#include <stdlib.h>
+#include <memory>
+
+#include "XioMsg.h"
+#include "XioMessenger.h"
+#include "common/address_helper.h"
+
+#define dout_subsys ceph_subsys_xio
+
+Mutex mtx("XioMessenger Package Lock");
+atomic_t initialized;
+
+atomic_t XioMessenger::nInstances;
+
+struct xio_mempool *xio_msgr_noreg_mpool;
+
+static struct xio_session_ops xio_msgr_ops;
+
+/* Accelio API callouts */
+
+/* string table */
+static const char *xio_session_event_types[] =
+{ "XIO_SESSION_REJECT_EVENT",
+ "XIO_SESSION_TEARDOWN_EVENT",
+ "XIO_SESSION_NEW_CONNECTION_EVENT",
+ "XIO_SESSION_CONNECTION_ESTABLISHED_EVENT",
+ "XIO_SESSION_CONNECTION_TEARDOWN_EVENT",
+ "XIO_SESSION_CONNECTION_CLOSED_EVENT",
+ "XIO_SESSION_CONNECTION_DISCONNECTED_EVENT",
+ "XIO_SESSION_CONNECTION_REFUSED_EVENT",
+ "XIO_SESSION_CONNECTION_ERROR_EVENT",
+ "XIO_SESSION_ERROR_EVENT"
+};
+
+namespace xio_log
+{
+typedef pair<const char*, int> level_pair;
+static const level_pair LEVELS[] = {
+ make_pair("fatal", 0),
+ make_pair("error", 0),
+ make_pair("warn", 1),
+ make_pair("info", 5),
+ make_pair("debug", 10),
+ make_pair("trace", 20)
+};
+
+// maintain our own global context, we can't rely on g_ceph_context
+// for things like librados
+static CephContext *context;
+
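+// map the ceph debug gate for the xio subsystem onto Accelio's numeric log
+// level: walk the table from "fatal" upward and stop at the first level the
+// dout mechanism would suppress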
+int get_level()
+{
+ int level = 0;
+  for (size_t i = 0; i < sizeof(LEVELS)/sizeof(LEVELS[0]); i++) {
+ if (!ldlog_p1(context, dout_subsys, LEVELS[i].second))
+ break;
+ level++;
+ }
+ return level;
+}
+
+void log_dout(const char *file, unsigned line,
+ const char *function, unsigned level,
+ const char *fmt, ...)
+{
+ char buffer[2048];
+ va_list args;
+ va_start(args, fmt);
+ int n = vsnprintf(buffer, sizeof(buffer), fmt, args);
+ va_end(args);
+
+ if (n > 0) {
+ const char *short_file = strrchr(file, '/');
+ short_file = (short_file == NULL) ? file : short_file + 1;
+
+ const level_pair &lvl = LEVELS[level];
+ ldout(context, lvl.second) << '[' << lvl.first << "] "
+ << short_file << ':' << line << ' '
+ << function << " - " << buffer << dendl;
+ }
+}
+}
+
+static int on_session_event(struct xio_session *session,
+ struct xio_session_event_data *event_data,
+ void *cb_user_context)
+{
+ XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
+ CephContext *cct = msgr->cct;
+
+ ldout(cct,4) << "session event: " << xio_session_event_str(event_data->event)
+ << ". reason: " << xio_strerror(event_data->reason) << dendl;
+
+ return msgr->session_event(session, event_data, cb_user_context);
+}
+
+static int on_new_session(struct xio_session *session,
+ struct xio_new_session_req *req,
+ void *cb_user_context)
+{
+ XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
+ CephContext *cct = msgr->cct;
+
+ ldout(cct,4) << "new session " << session
+ << " user_context " << cb_user_context << dendl;
+
+ return (msgr->new_session(session, req, cb_user_context));
+}
+
+static int on_msg(struct xio_session *session,
+ struct xio_msg *req,
+ int more_in_batch,
+ void *cb_user_context)
+{
+ XioConnection* xcon __attribute__((unused)) =
+ static_cast<XioConnection*>(cb_user_context);
+ CephContext *cct = xcon->get_messenger()->cct;
+
+ ldout(cct,25) << "on_msg session " << session << " xcon " << xcon << dendl;
+
+ static uint32_t nreqs;
+ if (unlikely(XioPool::trace_mempool)) {
+ if (unlikely((++nreqs % 65536) == 0)) {
+ xp_stats.dump(__func__, nreqs);
+ }
+ }
+
+ return xcon->on_msg_req(session, req, more_in_batch,
+ cb_user_context);
+}
+
+static int on_ow_msg_send_complete(struct xio_session *session,
+ struct xio_msg *msg,
+ void *conn_user_context)
+{
+ XioConnection *xcon =
+ static_cast<XioConnection*>(conn_user_context);
+ CephContext *cct = xcon->get_messenger()->cct;
+
+ ldout(cct,25) << "msg delivered session: " << session
+ << " msg: " << msg << " conn_user_context "
+ << conn_user_context << dendl;
+
+ return xcon->on_ow_msg_send_complete(session, msg, conn_user_context);
+}
+
+static int on_msg_error(struct xio_session *session,
+ enum xio_status error,
+ enum xio_msg_direction dir,
+ struct xio_msg *msg,
+ void *conn_user_context)
+{
+ /* XIO promises to flush back undelivered messages */
+ XioConnection *xcon =
+ static_cast<XioConnection*>(conn_user_context);
+ CephContext *cct = xcon->get_messenger()->cct;
+
+ ldout(cct,4) << "msg error session: " << session
+ << " error: " << xio_strerror(error) << " msg: " << msg
+ << " conn_user_context " << conn_user_context << dendl;
+
+ return xcon->on_msg_error(session, error, msg, conn_user_context);
+}
+
+static int on_cancel(struct xio_session *session,
+ struct xio_msg *msg,
+ enum xio_status result,
+ void *conn_user_context)
+{
+ XioConnection* xcon __attribute__((unused)) =
+ static_cast<XioConnection*>(conn_user_context);
+ CephContext *cct = xcon->get_messenger()->cct;
+
+ ldout(cct,25) << "on cancel: session: " << session << " msg: " << msg
+ << " conn_user_context " << conn_user_context << dendl;
+
+ return 0;
+}
+
+static int on_cancel_request(struct xio_session *session,
+ struct xio_msg *msg,
+ void *conn_user_context)
+{
+ XioConnection* xcon __attribute__((unused)) =
+ static_cast<XioConnection*>(conn_user_context);
+ CephContext *cct = xcon->get_messenger()->cct;
+
+ ldout(cct,25) << "on cancel request: session: " << session << " msg: " << msg
+ << " conn_user_context " << conn_user_context << dendl;
+
+ return 0;
+}
+
+/* free functions */
+static string xio_uri_from_entity(const entity_addr_t& addr, bool want_port)
+{
+ const char *host = NULL;
+ char addr_buf[129];
+
+ switch(addr.addr.ss_family) {
+ case AF_INET:
+ host = inet_ntop(AF_INET, &addr.addr4.sin_addr, addr_buf,
+ INET_ADDRSTRLEN);
+ break;
+ case AF_INET6:
+ host = inet_ntop(AF_INET6, &addr.addr6.sin6_addr, addr_buf,
+ INET6_ADDRSTRLEN);
+ break;
+ default:
+ abort();
+ break;
+ };
+
+ /* The following can only succeed if the host is rdma-capable */
+ string xio_uri = "rdma://";
+ xio_uri += host;
+ if (want_port) {
+ xio_uri += ":";
+ xio_uri += boost::lexical_cast<std::string>(addr.get_port());
+ }
+
+ return xio_uri;
+} /* xio_uri_from_entity */
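+// e.g. an IPv4 peer at 10.0.0.1 port 6800 yields "rdma://10.0.0.1" and,
+// with want_port, "rdma://10.0.0.1:6800" (illustrative address)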
+
+/* XioMessenger */
+XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
+ string mname, uint64_t nonce,
+ DispatchStrategy *ds)
+ : SimplePolicyMessenger(cct, name, mname, nonce),
+ nsessions(0),
+ shutdown_called(false),
+ portals(this, cct->_conf->xio_portal_threads),
+ dispatch_strategy(ds),
+ loop_con(this),
+ special_handling(0),
+ sh_mtx("XioMessenger session mutex"),
+ sh_cond()
+{
+
+ if (cct->_conf->xio_trace_xcon)
+ magic |= MSG_MAGIC_TRACE_XCON;
+
+ XioPool::trace_mempool = (cct->_conf->xio_trace_mempool);
+ XioPool::trace_msgcnt = (cct->_conf->xio_trace_msgcnt);
+
+ /* package init */
+ if (! initialized.read()) {
+
+ mtx.Lock();
+ if (! initialized.read()) {
+
+ xio_init();
+
+ // claim a reference to the first context we see
+ xio_log::context = cct->get();
+
+ int xopt;
+ xopt = xio_log::get_level();
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_LEVEL,
+ &xopt, sizeof(xopt));
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_FN,
+ (const void*)xio_log::log_dout, sizeof(xio_log_fn));
+
+ xopt = 1;
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_DISABLE_HUGETBL,
+ &xopt, sizeof(xopt));
+
+ xopt = XIO_MSGR_IOVLEN;
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_IN_IOVLEN,
+ &xopt, sizeof(xopt));
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN,
+ &xopt, sizeof(xopt));
+
+ xopt = cct->_conf->xio_queue_depth; // defaults to 512
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_MSGS,
+ &xopt, sizeof(xopt));
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_MSGS,
+ &xopt, sizeof(xopt));
+
+ /* and set threshold for buffer callouts */
+ xopt = 16384;
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_DATA,
+ &xopt, sizeof(xopt));
+ xopt = 216;
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_HEADER,
+ &xopt, sizeof(xopt));
+
+      /* and an unregistered (non-RDMA) mempool */
+#define XMSG_MEMPOOL_QUANTUM 4096
+
+ xio_msgr_noreg_mpool =
+ xio_mempool_create(-1 /* nodeid */,
+ XIO_MEMPOOL_FLAG_REGULAR_PAGES_ALLOC);
+
+ (void) xio_mempool_add_allocator(xio_msgr_noreg_mpool, 64,
+ cct->_conf->xio_mp_min,
+ cct->_conf->xio_mp_max_64,
+ XMSG_MEMPOOL_QUANTUM);
+ (void) xio_mempool_add_allocator(xio_msgr_noreg_mpool, 256,
+ cct->_conf->xio_mp_min,
+ cct->_conf->xio_mp_max_256,
+ XMSG_MEMPOOL_QUANTUM);
+ (void) xio_mempool_add_allocator(xio_msgr_noreg_mpool, 1024,
+ cct->_conf->xio_mp_min,
+ cct->_conf->xio_mp_max_1k,
+ XMSG_MEMPOOL_QUANTUM);
+ (void) xio_mempool_add_allocator(xio_msgr_noreg_mpool, getpagesize(),
+ cct->_conf->xio_mp_min,
+ cct->_conf->xio_mp_max_page,
+ XMSG_MEMPOOL_QUANTUM);
+
+ /* initialize ops singleton */
+ xio_msgr_ops.on_session_event = on_session_event;
+ xio_msgr_ops.on_new_session = on_new_session;
+ xio_msgr_ops.on_session_established = NULL;
+ xio_msgr_ops.on_msg = on_msg;
+ xio_msgr_ops.on_ow_msg_send_complete = on_ow_msg_send_complete;
+ xio_msgr_ops.on_msg_error = on_msg_error;
+ xio_msgr_ops.on_cancel = on_cancel;
+ xio_msgr_ops.on_cancel_request = on_cancel_request;
+
+ /* mark initialized */
+ initialized.set(1);
+ }
+ mtx.Unlock();
+ }
+
+ dispatch_strategy->set_messenger(this);
+
+ /* update class instance count */
+ nInstances.inc();
+
+} /* ctor */
+
+int XioMessenger::pool_hint(uint32_t dsize) {
+ if (dsize > 1024*1024)
+ return 0;
+
+ /* if dsize is already present, returns -EEXIST */
+ return xio_mempool_add_allocator(xio_msgr_noreg_mpool, dsize, 0,
+ cct->_conf->xio_mp_max_hint,
+ XMSG_MEMPOOL_QUANTUM);
+}
+
+int XioMessenger::new_session(struct xio_session *session,
+ struct xio_new_session_req *req,
+ void *cb_user_context)
+{
+ if (shutdown_called.read()) {
+ return xio_reject(
+ session, XIO_E_SESSION_REFUSED, NULL /* udata */, 0 /* udata len */);
+ }
+ int code = portals.accept(session, req, cb_user_context);
+ if (! code)
+ nsessions.inc();
+ return code;
+} /* new_session */
+
+int XioMessenger::session_event(struct xio_session *session,
+ struct xio_session_event_data *event_data,
+ void *cb_user_context)
+{
+ XioConnection *xcon;
+
+ switch (event_data->event) {
+ case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT:
+ xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+
+ ldout(cct,4) << "connection established " << event_data->conn
+ << " session " << session << " xcon " << xcon << dendl;
+
+ /* notify hook */
+ this->ms_deliver_handle_connect(xcon);
+ break;
+
+ case XIO_SESSION_NEW_CONNECTION_EVENT:
+ {
+ struct xio_connection *conn = event_data->conn;
+ struct xio_connection_attr xcona;
+ entity_inst_t s_inst;
+
+ (void) xio_query_connection(conn, &xcona,
+ XIO_CONNECTION_ATTR_CTX|
+ XIO_CONNECTION_ATTR_PEER_ADDR);
+ /* XXX assumes RDMA */
+ (void) entity_addr_from_sockaddr(&s_inst.addr,
+ (struct sockaddr *) &xcona.peer_addr);
+
+ xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst);
+ xcon->session = session;
+
+ struct xio_context_attr xctxa;
+ (void) xio_query_context(xcona.ctx, &xctxa, XIO_CONTEXT_ATTR_USER_CTX);
+
+ xcon->conn = conn;
+ xcon->portal = static_cast<XioPortal*>(xctxa.user_context);
+ assert(xcon->portal);
+
+ xcona.user_context = xcon;
+ (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);
+
+ xcon->connected.set(true);
+
+ /* sentinel ref */
+ xcon->get(); /* xcon->nref == 1 */
+ conns_sp.lock();
+ conns_list.push_back(*xcon);
+      /* XXX we can't put xcon in conns_entity_map because we don't yet know
+       * its peer address */
+ conns_sp.unlock();
+
+ ldout(cct,4) << "new connection session " << session
+ << " xcon " << xcon << dendl;
+ }
+ break;
+ case XIO_SESSION_CONNECTION_ERROR_EVENT:
+ ldout(cct,4) << xio_session_event_types[event_data->event]
+ << " user_context " << event_data->conn_user_context << dendl;
+ /* informational (Eyal)*/
+ break;
+ case XIO_SESSION_CONNECTION_CLOSED_EVENT: /* orderly discon */
+ case XIO_SESSION_CONNECTION_DISCONNECTED_EVENT: /* unexpected discon */
+ case XIO_SESSION_CONNECTION_REFUSED_EVENT:
+ ldout(cct,2) << xio_session_event_types[event_data->event]
+ << " user_context " << event_data->conn_user_context << dendl;
+ xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+ if (likely(!!xcon)) {
+ Spinlock::Locker lckr(conns_sp);
+ XioConnection::EntitySet::iterator conn_iter =
+ conns_entity_map.find(xcon->peer, XioConnection::EntityComp());
+ if (conn_iter != conns_entity_map.end()) {
+ XioConnection *xcon2 = &(*conn_iter);
+ if (xcon == xcon2) {
+ conns_entity_map.erase(conn_iter);
+ }
+ }
+ /* now find xcon on conns_list, erase, and release sentinel ref */
+ XioConnection::ConnList::iterator citer =
+ XioConnection::ConnList::s_iterator_to(*xcon);
+ /* XXX check if citer on conn_list? */
+ conns_list.erase(citer);
+ xcon->on_disconnect_event();
+ }
+ break;
+ case XIO_SESSION_CONNECTION_TEARDOWN_EVENT:
+ ldout(cct,2) << xio_session_event_types[event_data->event]
+ << " user_context " << event_data->conn_user_context << dendl;
+ xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+ xcon->on_teardown_event();
+ break;
+ case XIO_SESSION_TEARDOWN_EVENT:
+ ldout(cct,2) << "xio_session_teardown " << session << dendl;
+ if (unlikely(XioPool::trace_mempool)) {
+ xp_stats.dump("xio session dtor", reinterpret_cast<uint64_t>(session));
+ }
+ xio_session_destroy(session);
+ if (nsessions.dec() == 0) {
+ Mutex::Locker lck(sh_mtx);
+ if (nsessions.read() == 0)
+ sh_cond.Signal();
+ }
+ break;
+ default:
+ break;
+ };
+
+ return 0;
+}
+
+enum bl_type
+{
+ BUFFER_PAYLOAD,
+ BUFFER_MIDDLE,
+ BUFFER_DATA
+};
+
+#define MAX_XIO_BUF_SIZE 1044480
+
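+/* Walk a bufferlist and count the sglist entries it will occupy, advancing
+ * msg_off/req_off/req_size exactly as xio_place_buffers() later will, so the
+ * caller can size the XioMsg request array before placing data. */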
+static inline int
+xio_count_buffers(buffer::list& bl, int& req_size, int& msg_off, int& req_off)
+{
+
+ const std::list<buffer::ptr>& buffers = bl.buffers();
+ list<bufferptr>::const_iterator pb;
+ size_t size, off, count;
+ int result;
+ int first = 1;
+
+ off = size = 0;
+ result = 0;
+ for (;;) {
+ if (off >= size) {
+ if (first) pb = buffers.begin(); else ++pb;
+ if (pb == buffers.end()) {
+ break;
+ }
+ off = 0;
+ size = pb->length();
+ first = 0;
+ }
+ count = size - off;
+ if (!count) continue;
+ if (req_size + count > MAX_XIO_BUF_SIZE) {
+ count = MAX_XIO_BUF_SIZE - req_size;
+ }
+
+ ++result;
+
+ /* advance iov and perhaps request */
+
+ off += count;
+ req_size += count;
+ ++msg_off;
+ if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
+ ++req_off;
+ msg_off = 0;
+ req_size = 0;
+ }
+ }
+
+ return result;
+}
+
+static inline void
+xio_place_buffers(buffer::list& bl, XioMsg *xmsg, struct xio_msg*& req,
+ struct xio_iovec_ex*& msg_iov, int& req_size,
+ int ex_cnt, int& msg_off, int& req_off, bl_type type)
+{
+
+ const std::list<buffer::ptr>& buffers = bl.buffers();
+ list<bufferptr>::const_iterator pb;
+ struct xio_iovec_ex* iov;
+ size_t size, off, count;
+ const char *data = NULL;
+ int first = 1;
+
+ off = size = 0;
+ for (;;) {
+ if (off >= size) {
+ if (first) pb = buffers.begin(); else ++pb;
+ if (pb == buffers.end()) {
+ break;
+ }
+ off = 0;
+ size = pb->length();
+ data = pb->c_str(); // is c_str() efficient?
+ first = 0;
+ }
+ count = size - off;
+ if (!count) continue;
+ if (req_size + count > MAX_XIO_BUF_SIZE) {
+ count = MAX_XIO_BUF_SIZE - req_size;
+ }
+
+ /* assign buffer */
+ iov = &msg_iov[msg_off];
+ iov->iov_base = (void *) (&data[off]);
+ iov->iov_len = count;
+
+ switch (type) {
+ case BUFFER_DATA:
+ //break;
+ default:
+ {
+ struct xio_mempool_obj *mp = get_xio_mp(*pb);
+ iov->mr = (mp) ? mp->mr : NULL;
+ }
+ break;
+ }
+
+ /* advance iov(s) */
+
+ off += count;
+ req_size += count;
+ ++msg_off;
+
+ /* next request if necessary */
+
+ if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
+ /* finish this request */
+ req->out.pdata_iov.nents = msg_off;
+ /* advance to next, and write in it if it's not the last one. */
+ if (++req_off >= ex_cnt) {
+ req = 0; /* poison. trap if we try to use it. */
+ msg_iov = NULL;
+ } else {
+ req = &xmsg->req_arr[req_off].msg;
+ msg_iov = req->out.pdata_iov.sglist;
+ }
+ msg_off = 0;
+ req_size = 0;
+ }
+ }
+}
+
+int XioMessenger::bind(const entity_addr_t& addr)
+{
+  struct entity_addr_t _addr = addr; /* must outlive the if-block below */
+  const entity_addr_t *a = &addr;
+  if (a->is_blank_ip()) {
+    a = &_addr;
+ std::vector <std::string> my_sections;
+ g_conf->get_my_sections(my_sections);
+ std::string rdma_local_str;
+ if (g_conf->get_val_from_conf_file(my_sections, "rdma local",
+ rdma_local_str, true) == 0) {
+ struct entity_addr_t local_rdma_addr;
+ local_rdma_addr = *a;
+ const char *ep;
+ if (!local_rdma_addr.parse(rdma_local_str.c_str(), &ep)) {
+ derr << "ERROR: Cannot parse rdma local: " << rdma_local_str << dendl;
+ return -EINVAL;
+ }
+ if (*ep) {
+        derr << "WARNING: 'rdma local' trailing garbage ignored: '" << ep << "'" << dendl;
+ }
+ int p = _addr.get_port();
+ _addr.set_sockaddr(reinterpret_cast<struct sockaddr *>(
+ &local_rdma_addr.ss_addr()));
+ _addr.set_port(p);
+ } else {
+      derr << "WARNING: need 'rdma local' config for remote use!" << dendl;
+ }
+ }
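+  /* illustrative ceph.conf snippet for the 'rdma local' lookup above
+   * (section and address are examples only):
+   *   [osd.0]
+   *   rdma local = 192.168.1.10
+   */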
+
+ entity_addr_t shift_addr = *a;
+
+ string base_uri = xio_uri_from_entity(shift_addr, false /* want_port */);
+ ldout(cct,4) << "XioMessenger " << this << " bind: xio_uri "
+ << base_uri << ':' << shift_addr.get_port() << dendl;
+
+ uint16_t port0;
+ int r = portals.bind(&xio_msgr_ops, base_uri, shift_addr.get_port(), &port0);
+ if (r == 0) {
+ shift_addr.set_port(port0);
+ set_myaddr(shift_addr);
+ }
+ return r;
+} /* bind */
+
+int XioMessenger::rebind(const set<int>& avoid_ports)
+{
+ ldout(cct,4) << "XioMessenger " << this << " rebind attempt" << dendl;
+ return 0;
+} /* rebind */
+
+int XioMessenger::start()
+{
+ portals.start();
+ dispatch_strategy->start();
+ started = true;
+ return 0;
+}
+
+void XioMessenger::wait()
+{
+ portals.join();
+ dispatch_strategy->wait();
+} /* wait */
+
+int XioMessenger::_send_message(Message *m, const entity_inst_t& dest)
+{
+ ConnectionRef conn = get_connection(dest);
+ if (conn)
+ return _send_message(m, &(*conn));
+ else
+ return EINVAL;
+} /* send_message(Message *, const entity_inst_t&) */
+
+static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
+ int ex_cnt)
+{
+ struct xio_mempool_obj mp_mem;
+ int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioMsg), &mp_mem);
+ if (!!e)
+ return NULL;
+ XioMsg *xmsg = (XioMsg*) mp_mem.addr;
+ assert(!!xmsg);
+ new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt);
+ return xmsg;
+}
+
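+/* Encode the message, count how many sglist entries / extra xio_msgs the
+ * payload, middle and data sections will need, place the buffers, then chain
+ * any additional xio_msgs behind req_0 and enqueue the batch on the portal. */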
+int XioMessenger::_send_message(Message *m, Connection *con)
+{
+
+ static uint32_t nreqs;
+ if (unlikely(XioPool::trace_mempool)) {
+ if (unlikely((++nreqs % 65536) == 0)) {
+ xp_stats.dump(__func__, nreqs);
+ }
+ }
+
+ if (con == &loop_con) {
+ m->set_connection(con);
+ m->set_src(get_myinst().name);
+ XioLoopbackConnection *xlcon = static_cast<XioLoopbackConnection*>(con);
+ m->set_seq(xlcon->next_seq());
+ ds_dispatch(m);
+ return 0;
+ }
+
+ XioConnection *xcon = static_cast<XioConnection*>(con);
+ if (! xcon->is_connected())
+ return ENOTCONN;
+
+ int code = 0;
+
+ m->set_seq(xcon->state.next_out_seq());
+ m->set_magic(magic); // trace flags and special handling
+
+ m->encode(xcon->get_features(), this->crcflags);
+
+ buffer::list &payload = m->get_payload();
+ buffer::list &middle = m->get_middle();
+ buffer::list &data = m->get_data();
+
+ int msg_off = 0;
+ int req_off = 0;
+ int req_size = 0;
+ int nbuffers =
+ xio_count_buffers(payload, req_size, msg_off, req_off) +
+ xio_count_buffers(middle, req_size, msg_off, req_off) +
+ xio_count_buffers(data, req_size, msg_off, req_off);
+
+ int ex_cnt = req_off;
+ if (msg_off == 0 && ex_cnt > 0) {
+ // no buffers for last msg
+ ldout(cct,10) << "msg_off 0, ex_cnt " << ex_cnt << " -> " << ex_cnt-1 << dendl;
+ ex_cnt--;
+ }
+
+ /* get an XioMsg frame */
+ XioMsg *xmsg = pool_alloc_xio_msg(m, xcon, ex_cnt);
+ if (! xmsg) {
+ /* could happen if Accelio has been shutdown */
+ return ENOMEM;
+ }
+
+ ldout(cct,4) << __func__ << " " << m << " new XioMsg " << xmsg
+ << " req_0 " << &xmsg->req_0.msg << " msg type " << m->get_type()
+ << " features: " << xcon->get_features()
+ << " conn " << xcon->conn << " sess " << xcon->session << dendl;
+
+ if (magic & (MSG_MAGIC_XIO)) {
+
+ /* XXXX verify */
+ switch (m->get_type()) {
+ case 43:
+ // case 15:
+ ldout(cct,4) << __func__ << "stop 43 " << m->get_type() << " " << *m << dendl;
+ buffer::list &payload = m->get_payload();
+ ldout(cct,4) << __func__ << "payload dump:" << dendl;
+ payload.hexdump(cout);
+ }
+ }
+
+ struct xio_msg *req = &xmsg->req_0.msg;
+ struct xio_iovec_ex *msg_iov = req->out.pdata_iov.sglist;
+
+ if (magic & (MSG_MAGIC_XIO)) {
+ ldout(cct,4) << "payload: " << payload.buffers().size() <<
+ " middle: " << middle.buffers().size() <<
+ " data: " << data.buffers().size() <<
+ dendl;
+ }
+
+ if (unlikely(ex_cnt > 0)) {
+ ldout(cct,4) << __func__ << " buffer cnt > XIO_MSGR_IOVLEN (" <<
+ ((XIO_MSGR_IOVLEN-1) + nbuffers) << ")" << dendl;
+ }
+
+ /* do the invariant part */
+ msg_off = 0;
+ req_off = -1; /* most often, not used */
+ req_size = 0;
+
+ xio_place_buffers(payload, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+ req_off, BUFFER_PAYLOAD);
+
+ xio_place_buffers(middle, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+ req_off, BUFFER_MIDDLE);
+
+ xio_place_buffers(data, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+ req_off, BUFFER_DATA);
+ ldout(cct,10) << "ex_cnt " << ex_cnt << ", req_off " << req_off
+ << ", msg_cnt " << xmsg->hdr.msg_cnt << dendl;
+
+ /* finalize request */
+ if (msg_off)
+ req->out.pdata_iov.nents = msg_off;
+
+ /* fixup first msg */
+ req = &xmsg->req_0.msg;
+
+ const std::list<buffer::ptr>& header = xmsg->hdr.get_bl().buffers();
+ assert(header.size() == 1); /* XXX */
+ list<bufferptr>::const_iterator pb = header.begin();
+ req->out.header.iov_base = (char*) pb->c_str();
+ req->out.header.iov_len = pb->length();
+
+ /* deliver via xio, preserve ordering */
+ if (xmsg->hdr.msg_cnt > 1) {
+ struct xio_msg *head = &xmsg->req_0.msg;
+ struct xio_msg *tail = head;
+ for (req_off = 0; ((unsigned) req_off) < xmsg->hdr.msg_cnt-1; ++req_off) {
+ req = &xmsg->req_arr[req_off].msg;
+      assert(!req->in.pdata_iov.nents);
+      assert(req->out.pdata_iov.nents || !nbuffers);
+ tail->next = req;
+ tail = req;
+ }
+ tail->next = NULL;
+ }
+ xcon->portal->enqueue_for_send(xcon, xmsg);
+
+ return code;
+} /* send_message(Message *, Connection *) */
+
+int XioMessenger::shutdown()
+{
+ shutdown_called.set(true);
+ conns_sp.lock();
+ XioConnection::ConnList::iterator iter;
+ iter = conns_list.begin();
+ for (iter = conns_list.begin(); iter != conns_list.end(); ++iter) {
+ (void) iter->disconnect(); // XXX mark down?
+ }
+ conns_sp.unlock();
+ while(nsessions.read() > 0) {
+ Mutex::Locker lck(sh_mtx);
+ if (nsessions.read() > 0)
+ sh_cond.Wait(sh_mtx);
+ }
+ portals.shutdown();
+ dispatch_strategy->shutdown();
+ started = false;
+ return 0;
+} /* shutdown */
+
+ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
+{
+ if (shutdown_called.read())
+ return NULL;
+
+ const entity_inst_t& self_inst = get_myinst();
+ if ((&dest == &self_inst) ||
+ (dest == self_inst)) {
+ return get_loopback_connection();
+ }
+
+ conns_sp.lock();
+ XioConnection::EntitySet::iterator conn_iter =
+ conns_entity_map.find(dest, XioConnection::EntityComp());
+ if (conn_iter != conns_entity_map.end()) {
+ ConnectionRef cref = &(*conn_iter);
+ conns_sp.unlock();
+ return cref;
+ }
+ else {
+ conns_sp.unlock();
+ string xio_uri = xio_uri_from_entity(dest.addr, true /* want_port */);
+
+ ldout(cct,4) << "XioMessenger " << this << " get_connection: xio_uri "
+ << xio_uri << dendl;
+
+ /* XXX client session creation parameters */
+ struct xio_session_params params = {
+ .type = XIO_SESSION_CLIENT,
+ .initial_sn = 0,
+ .ses_ops = &xio_msgr_ops,
+ .user_context = this,
+ .private_data = NULL,
+ .private_data_len = 0,
+ .uri = (char *)xio_uri.c_str()
+ };
+
+ XioConnection *xcon = new XioConnection(this, XioConnection::ACTIVE,
+ dest);
+
+    xcon->session = xio_session_create(&params);
+ if (! xcon->session) {
+ delete xcon;
+ return NULL;
+ }
+ nsessions.inc();
+
+ /* this should cause callbacks with user context of conn, but
+ * we can always set it explicitly */
+ struct xio_connection_params xcp = {
+ .session = xcon->session,
+ .ctx = this->portals.get_portal0()->ctx,
+ .conn_idx = 0, /* XXX auto_count */
+ .enable_tos = 0,
+ .tos = 0,
+ .pad = 0,
+ .out_addr = NULL,
+ .conn_user_context = xcon
+ };
+ xcon->conn = xio_connect(&xcp);
+ xcon->connected.set(true);
+
+ /* sentinel ref */
+ xcon->get(); /* xcon->nref == 1 */
+ conns_sp.lock();
+ conns_list.push_back(*xcon);
+ conns_entity_map.insert(*xcon);
+ conns_sp.unlock();
+
+ return xcon->get(); /* nref +1 */
+ }
+} /* get_connection */
+
+ConnectionRef XioMessenger::get_loopback_connection()
+{
+ return (loop_con.get());
+} /* get_loopback_connection */
+
+void XioMessenger::try_insert(XioConnection *xcon)
+{
+ Spinlock::Locker lckr(conns_sp);
+ /* already resident in conns_list */
+ conns_entity_map.insert(*xcon);
+}
+
+XioMessenger::~XioMessenger()
+{
+ delete dispatch_strategy;
+ nInstances.dec();
+} /* dtor */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIO_MESSENGER_H
+#define XIO_MESSENGER_H
+
+#include "msg/SimplePolicyMessenger.h"
+extern "C" {
+#include "libxio.h"
+}
+#include "XioConnection.h"
+#include "XioPortal.h"
+#include "QueueStrategy.h"
+#include "include/atomic.h"
+#include "common/Thread.h"
+#include "common/Mutex.h"
+#include "include/Spinlock.h"
+
+class XioMessenger : public SimplePolicyMessenger
+{
+private:
+ static atomic_t nInstances;
+ atomic_t nsessions;
+ atomic_t shutdown_called;
+ Spinlock conns_sp;
+ XioConnection::ConnList conns_list;
+ XioConnection::EntitySet conns_entity_map;
+ XioPortals portals;
+ DispatchStrategy* dispatch_strategy;
+ XioLoopbackConnection loop_con;
+ uint32_t special_handling;
+ Mutex sh_mtx;
+ Cond sh_cond;
+
+public:
+ XioMessenger(CephContext *cct, entity_name_t name,
+ string mname, uint64_t nonce,
+ DispatchStrategy* ds = new QueueStrategy(1));
+
+ virtual ~XioMessenger();
+
+ XioPortal* default_portal() { return portals.get_portal0(); }
+
+ virtual void set_myaddr(const entity_addr_t& a) {
+ Messenger::set_myaddr(a);
+ loop_con.set_peer_addr(a);
+ }
+
+ int _send_message(Message *m, const entity_inst_t &dest);
+ int _send_message(Message *m, Connection *con);
+
+ uint32_t get_magic() { return magic; }
+ void set_magic(int _magic) { magic = _magic; }
+ uint32_t get_special_handling() { return special_handling; }
+ void set_special_handling(int n) { special_handling = n; }
+ int pool_hint(uint32_t size);
+ void try_insert(XioConnection *xcon);
+
+ /* xio hooks */
+ int new_session(struct xio_session *session,
+ struct xio_new_session_req *req,
+ void *cb_user_context);
+
+ int session_event(struct xio_session *session,
+ struct xio_session_event_data *event_data,
+ void *cb_user_context);
+
+ /* Messenger interface */
+ virtual void set_addr_unknowns(entity_addr_t &addr)
+ { } /* XXX applicable? */
+
+ virtual int get_dispatch_queue_len()
+ { return 0; } /* XXX bogus? */
+
+ virtual double get_dispatch_queue_max_age(utime_t now)
+ { return 0; } /* XXX bogus? */
+
+ virtual void set_cluster_protocol(int p)
+ { }
+
+ virtual int bind(const entity_addr_t& addr);
+
+ virtual int rebind(const set<int>& avoid_ports);
+
+ virtual int start();
+
+ virtual void wait();
+
+ virtual int shutdown();
+
+ virtual int send_message(Message *m, const entity_inst_t &dest) {
+ return _send_message(m, dest);
+ }
+
+ virtual int lazy_send_message(Message *m, const entity_inst_t& dest)
+ { return EINVAL; }
+
+ virtual int lazy_send_message(Message *m, Connection *con)
+ { return EINVAL; }
+
+ virtual ConnectionRef get_connection(const entity_inst_t& dest);
+
+ virtual ConnectionRef get_loopback_connection();
+
+ virtual int send_keepalive(const entity_inst_t& dest)
+ { return EINVAL; }
+
+ virtual int send_keepalive(Connection *con)
+ { return EINVAL; }
+
+ virtual void mark_down(const entity_addr_t& a)
+ { }
+
+ virtual void mark_down(Connection *con)
+ { }
+
+ virtual void mark_down_on_empty(Connection *con)
+ { }
+
+ virtual void mark_disposable(Connection *con)
+ { }
+
+ virtual void mark_down_all()
+ { }
+
+ void ds_dispatch(Message *m)
+ { dispatch_strategy->ds_dispatch(m); }
+
+protected:
+ virtual void ready()
+ { }
+
+public:
+};
+
+#endif /* XIO_MESSENGER_H */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "XioMessenger.h"
+#include "XioConnection.h"
+#include "XioMsg.h"
+
+
+int XioCompletionHook::release_msgs()
+{
+ XioRsp *xrsp;
+ int r = msg_seq.size();
+ cl_flag = true;
+
+ /* queue for release */
+ xrsp = (XioRsp *) rsp_pool.alloc(sizeof(XioRsp));
+ new (xrsp) XioRsp(xcon, this);
+
+ /* merge with portal traffic */
+ xcon->portal->enqueue_for_send(xcon, xrsp);
+
+ assert(r);
+ return r;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIO_MSG_H
+#define XIO_MSG_H
+
+#include <boost/intrusive/list.hpp>
+#include "msg/SimplePolicyMessenger.h"
+extern "C" {
+#include "libxio.h"
+}
+#include "XioConnection.h"
+#include "msg/msg_types.h"
+#include "XioPool.h"
+
+namespace bi = boost::intrusive;
+
+class XioMsgCnt
+{
+public:
+ __le32 msg_cnt;
+ buffer::list bl;
+public:
+ XioMsgCnt(buffer::ptr p)
+ {
+ bl.append(p);
+ buffer::list::iterator bl_iter = bl.begin();
+ ::decode(msg_cnt, bl_iter);
+ }
+};
+
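+/* on-wire header for an XioMessenger message: the xio msg_cnt, peer type
+ * and address, plus the fields of ceph_msg_header and ceph_msg_footer,
+ * encoded into a bufferlist that is carried in the header iov of the
+ * first xio_msg */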
+class XioMsgHdr
+{
+public:
+ __le32 msg_cnt;
+ __le32 peer_type;
+ entity_addr_t addr; /* XXX hack! */
+ ceph_msg_header* hdr;
+ ceph_msg_footer* ftr;
+ buffer::list bl;
+public:
+ XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer& _ftr)
+ : msg_cnt(0), hdr(&_hdr), ftr(&_ftr)
+ { }
+
+ XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer &_ftr, buffer::ptr p)
+ : hdr(&_hdr), ftr(&_ftr)
+ {
+ bl.append(p);
+ buffer::list::iterator bl_iter = bl.begin();
+ decode(bl_iter);
+ }
+
+ const buffer::list& get_bl() { encode(bl); return bl; };
+
+ inline void encode_hdr(buffer::list& bl) const {
+ ::encode(msg_cnt, bl);
+ ::encode(peer_type, bl);
+ ::encode(addr, bl);
+ ::encode(hdr->seq, bl);
+ ::encode(hdr->tid, bl);
+ ::encode(hdr->type, bl);
+ ::encode(hdr->priority, bl);
+ ::encode(hdr->version, bl);
+ ::encode(hdr->front_len, bl);
+ ::encode(hdr->middle_len, bl);
+ ::encode(hdr->data_len, bl);
+ ::encode(hdr->data_off, bl);
+ ::encode(hdr->src.type, bl);
+ ::encode(hdr->src.num, bl);
+ ::encode(hdr->compat_version, bl);
+ ::encode(hdr->crc, bl);
+ }
+
+ inline void encode_ftr(buffer::list& bl) const {
+ ::encode(ftr->front_crc, bl);
+ ::encode(ftr->middle_crc, bl);
+ ::encode(ftr->data_crc, bl);
+ ::encode(ftr->sig, bl);
+ ::encode(ftr->flags, bl);
+ }
+
+ inline void encode(buffer::list& bl) const {
+ encode_hdr(bl);
+ encode_ftr(bl);
+ }
+
+ inline void decode_hdr(buffer::list::iterator& bl) {
+ ::decode(msg_cnt, bl);
+ ::decode(peer_type, bl);
+ ::decode(addr, bl);
+ ::decode(hdr->seq, bl);
+ ::decode(hdr->tid, bl);
+ ::decode(hdr->type, bl);
+ ::decode(hdr->priority, bl);
+ ::decode(hdr->version, bl);
+ ::decode(hdr->front_len, bl);
+ ::decode(hdr->middle_len, bl);
+ ::decode(hdr->data_len, bl);
+ ::decode(hdr->data_off, bl);
+ ::decode(hdr->src.type, bl);
+ ::decode(hdr->src.num, bl);
+ ::decode(hdr->compat_version, bl);
+ ::decode(hdr->crc, bl);
+ }
+
+ inline void decode_ftr(buffer::list::iterator& bl) {
+ ::decode(ftr->front_crc, bl);
+ ::decode(ftr->middle_crc, bl);
+ ::decode(ftr->data_crc, bl);
+ ::decode(ftr->sig, bl);
+ ::decode(ftr->flags, bl);
+ }
+
+ inline void decode(buffer::list::iterator& bl) {
+ decode_hdr(bl);
+ decode_ftr(bl);
+ }
+
+ virtual ~XioMsgHdr()
+ {}
+};
+
+WRITE_CLASS_ENCODER(XioMsgHdr);
+
+struct XioSubmit
+{
+public:
+ enum submit_type
+ {
+ OUTGOING_MSG,
+ INCOMING_MSG_RELEASE
+ };
+ enum submit_type type;
+ bi::list_member_hook<> submit_list;
+ XioConnection *xcon;
+
+ XioSubmit(enum submit_type _type, XioConnection *_xcon) :
+ type(_type), xcon(_xcon)
+ {}
+
+ typedef bi::list< XioSubmit,
+ bi::member_hook< XioSubmit,
+ bi::list_member_hook<>,
+ &XioSubmit::submit_list >
+ > Queue;
+};
+
+extern struct xio_mempool *xio_msgr_noreg_mpool;
+
+#define XIO_MSGR_IOVLEN 16
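+/* each xio_msg carries at most XIO_MSGR_IOVLEN sgl entries, so a large
+ * Ceph message is split across req_0 plus additional trailer messages */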
+
+struct xio_msg_ex
+{
+ struct xio_msg msg;
+ struct xio_iovec_ex iovs[XIO_MSGR_IOVLEN];
+
+ xio_msg_ex(void* user_context) {
+ // go in structure order
+ msg.in.header.iov_len = 0;
+ msg.in.header.iov_base = NULL; /* XXX Accelio requires this currently */
+
+ msg.in.sgl_type = XIO_SGL_TYPE_IOV_PTR;
+ msg.in.pdata_iov.max_nents = XIO_MSGR_IOVLEN;
+ msg.in.pdata_iov.nents = 0;
+ msg.in.pdata_iov.sglist = iovs;
+
+ // minimal zero "out" side
+ msg.out.header.iov_len = 0;
+ msg.out.header.iov_base = NULL; /* XXX Accelio requires this currently,
+ * against spec */
+ // out (some members adjusted later)
+ msg.out.sgl_type = XIO_SGL_TYPE_IOV_PTR;
+ msg.out.pdata_iov.max_nents = XIO_MSGR_IOVLEN;
+ msg.out.pdata_iov.nents = 0;
+ msg.out.pdata_iov.sglist = iovs;
+
+ // minimal initialize an "out" msg
+ msg.request = NULL;
+ msg.type = XIO_MSG_TYPE_ONE_WAY;
+ // for now, we DO NEED receipts for every msg
+ msg.flags = 0;
+ msg.user_context = user_context;
+ msg.next = NULL;
+ // minimal zero "in" side
+ }
+};
+
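+/* an outgoing Ceph message in flight: wraps the Message, its encoded
+ * header, the first xio_msg (req_0) and any trailer messages (req_arr);
+ * nrefs holds one reference per constituent xio_msg */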
+struct XioMsg : public XioSubmit
+{
+public:
+ Message* m;
+ XioMsgHdr hdr;
+ xio_msg_ex req_0;
+ xio_msg_ex* req_arr;
+ struct xio_mempool_obj mp_this;
+ atomic_t nrefs;
+
+public:
+ XioMsg(Message *_m, XioConnection *_xcon, struct xio_mempool_obj& _mp,
+ int _ex_cnt) :
+ XioSubmit(XioSubmit::OUTGOING_MSG, _xcon),
+ m(_m), hdr(m->get_header(), m->get_footer()),
+ req_0(this), req_arr(NULL), mp_this(_mp), nrefs(_ex_cnt+1)
+ {
+ const entity_inst_t &inst = xcon->get_messenger()->get_myinst();
+ hdr.peer_type = inst.name.type();
+ hdr.addr = xcon->get_messenger()->get_myaddr();
+ hdr.hdr->src.type = inst.name.type();
+ hdr.hdr->src.num = inst.name.num();
+ hdr.msg_cnt = _ex_cnt+1;
+
+ if (unlikely(_ex_cnt > 0)) {
+ alloc_trailers(_ex_cnt);
+ }
+
+ xpool_inc_msgcnt();
+
+ // submit queue ref
+ xcon->get();
+ }
+
+ XioMsg* get() { nrefs.inc(); return this; };
+
+ void put(int n) {
+ int refs = nrefs.sub(n);
+ if (refs == 0) {
+ struct xio_mempool_obj *mp = &this->mp_this;
+ this->~XioMsg();
+ xpool_free(sizeof(XioMsg), mp);
+ }
+ }
+
+ void put() {
+ put(1);
+ }
+
+ void put_msg_refs() {
+ put(hdr.msg_cnt);
+ }
+
+ void alloc_trailers(int cnt) {
+ req_arr = (xio_msg_ex*) malloc(cnt * sizeof(xio_msg_ex));
+ for (int ix = 0; ix < cnt; ++ix) {
+ xio_msg_ex* xreq = &(req_arr[ix]);
+ new (xreq) xio_msg_ex(this);
+ }
+ }
+
+ Message *get_message() { return m; }
+
+ ~XioMsg()
+ {
+ if (unlikely(!!req_arr)) {
+ for (unsigned int ix = 0; ix < hdr.msg_cnt-1; ++ix) {
+ xio_msg_ex* xreq = &(req_arr[ix]);
+ xreq->~xio_msg_ex();
+ }
+ free(req_arr);
+ }
+
+ /* testing only! server's ready, resubmit request (not reached on
+ * PASSIVE/server side) */
+ if (unlikely(m->get_magic() & MSG_MAGIC_REDUPE)) {
+ if (likely(xcon->is_connected())) {
+ xcon->send_message(m);
+ } else {
+ /* dispose it */
+ m->put();
+ }
+ } else {
+ /* the normal case: done with message */
+ m->put();
+ }
+
+ xpool_dec_msgcnt();
+
+ /* submit queue ref */
+ xcon->put();
+ }
+};
+
+class XioCompletionHook : public Message::CompletionHook
+{
+private:
+ XioConnection *xcon;
+ XioInSeq msg_seq;
+ XioPool rsp_pool;
+ atomic_t nrefs;
+ bool cl_flag;
+ friend class XioConnection;
+ friend class XioMessenger;
+public:
+ struct xio_mempool_obj mp_this;
+
+ XioCompletionHook(XioConnection *_xcon, Message *_m, XioInSeq& _msg_seq,
+ struct xio_mempool_obj& _mp) :
+ CompletionHook(_m),
+ xcon(_xcon->get()),
+ msg_seq(_msg_seq),
+ rsp_pool(xio_msgr_noreg_mpool),
+ nrefs(1),
+ cl_flag(false),
+ mp_this(_mp)
+ {
+ ++xcon->n_reqs; // atomicity by portal thread
+ xpool_inc_hookcnt();
+ }
+
+ virtual void finish(int r) {
+ this->put();
+ }
+
+ virtual void complete(int r) {
+ finish(r);
+ }
+
+ int release_msgs();
+
+ XioCompletionHook* get() {
+ nrefs.inc(); return this;
+ }
+
+ void put(int n = 1) {
+ int refs = nrefs.sub(n);
+ if (refs == 0) {
+ /* in Marcus' new system, refs reaches 0 twice: once in
+ * Message lifecycle, and again after xio_release_msg.
+ */
+ if (!cl_flag && release_msgs())
+ return;
+ struct xio_mempool_obj *mp = &this->mp_this;
+ this->~XioCompletionHook();
+ xpool_free(sizeof(XioCompletionHook), mp);
+ }
+ }
+
+ XioInSeq& get_seq() { return msg_seq; }
+
+ XioPool& get_pool() { return rsp_pool; }
+
+ void on_err_finalize(XioConnection *xcon) {
+ /* can't decode message; even with one-way must free
+ * xio_msg structures, and then xiopool
+ */
+ this->finish(-1);
+ }
+
+ ~XioCompletionHook() {
+ --xcon->n_reqs; // atomicity by portal thread
+ xpool_dec_hookcnt();
+ xcon->put();
+ }
+};
+
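+/* a release request queued back to the portal, presumably so that
+ * xio_release_msg() for a received message sequence runs on the portal
+ * thread that owns the xio context */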
+struct XioRsp : public XioSubmit
+{
+ XioCompletionHook *xhook;
+public:
+ XioRsp(XioConnection *_xcon, XioCompletionHook *_xhook)
+ : XioSubmit(XioSubmit::INCOMING_MSG_RELEASE, _xcon /* not xcon! */),
+ xhook(_xhook->get()) {
+ // submit queue ref
+ xcon->get();
+ };
+
+ struct xio_msg* dequeue() {
+ return xhook->get_seq().dequeue();
+ }
+
+ XioCompletionHook *get_xhook() { return xhook; }
+
+ void finalize() {
+ xcon->put();
+ xhook->put();
+ }
+};
+
+void print_xio_msg_hdr(CephContext *cct, const char *tag,
+ const XioMsgHdr &hdr, const struct xio_msg *msg);
+void print_ceph_msg(CephContext *cct, const char *tag, Message *m);
+
+#endif /* XIO_MSG_H */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "XioPool.h"
+
+XioPoolStats xp_stats;
+
+bool XioPool::trace_mempool = false;
+bool XioPool::trace_msgcnt = false;
+
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef XIO_POOL_H
+#define XIO_POOL_H
+
+extern "C" {
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include "libxio.h"
+}
+#include <iostream>
+#include <vector>
+#include "include/atomic.h"
+#include "common/likely.h"
+
+
+static inline int xpool_alloc(struct xio_mempool *pool, uint64_t size,
+ struct xio_mempool_obj* mp);
+static inline void xpool_free(uint64_t size, struct xio_mempool_obj* mp);
+
+using ceph::atomic_t;
+
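+/* a small arena built on an Accelio mempool: alloc() carves pieces and
+ * chains them on 'first'; every piece is returned to the mempool when
+ * the XioPool is destroyed */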
+class XioPool
+{
+private:
+ struct xio_mempool *handle;
+
+public:
+ static bool trace_mempool;
+ static bool trace_msgcnt;
+ static const int MB = 8;
+
+ struct xio_piece {
+ struct xio_mempool_obj mp[1];
+ struct xio_piece *next;
+ int s;
+ char payload[MB];
+ } *first;
+
+ XioPool(struct xio_mempool *_handle) :
+ handle(_handle), first(0)
+ {
+ }
+ ~XioPool()
+ {
+ struct xio_piece *p;
+ while ((p = first)) {
+ first = p->next;
+ if (unlikely(trace_mempool)) {
+ memset(p->payload, 0xcf, p->s); // guard bytes
+ }
+ xpool_free(sizeof(struct xio_piece)+(p->s)-MB, p->mp);
+ }
+ }
+ void *alloc(size_t _s)
+ {
+ void *r;
+ struct xio_mempool_obj mp[1];
+ struct xio_piece *x;
+ int e = xpool_alloc(handle, (sizeof(struct xio_piece)-MB) + _s, mp);
+ if (e) {
+ r = 0;
+ } else {
+ x = reinterpret_cast<struct xio_piece *>(mp->addr);
+ *x->mp = *mp;
+ x->next = first;
+ x->s = _s;
+ first = x;
+ r = x->payload;
+ }
+ return r;
+ }
+};
+
+class XioPoolStats {
+private:
+ enum pool_sizes {
+ SLAB_64 = 0,
+ SLAB_256,
+ SLAB_1024,
+ SLAB_PAGE,
+ SLAB_MAX
+ };
+
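+ /* object counters bucketed by slab size (64B/256B/1KB/page/larger),
+ * plus counters of outgoing (msg) and incoming (hook) message objects;
+ * updated only when the corresponding tracing flag is enabled */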
+ atomic_t ctr_set[SLAB_MAX+1];
+
+ atomic_t msg_cnt; // send msgs
+ atomic_t hook_cnt; // recv msgs
+
+public:
+ XioPoolStats() : msg_cnt(0), hook_cnt(0) {
+ for (int ix = 0; ix <= SLAB_MAX; ++ix) {
+ ctr_set[ix].set(0);
+ }
+ }
+
+ void dump(const char* tag, uint64_t serial) {
+ std::cout
+ << tag << " #" << serial << ": "
+ << "pool objs: "
+ << "64: " << ctr_set[SLAB_64].read() << " "
+ << "256: " << ctr_set[SLAB_256].read() << " "
+ << "1024: " << ctr_set[SLAB_1024].read() << " "
+ << "page: " << ctr_set[SLAB_PAGE].read() << " "
+ << "max: " << ctr_set[SLAB_MAX].read() << " "
+ << std::endl;
+ std::cout
+ << tag << " #" << serial << ": "
+ << " msg objs: "
+ << "in: " << hook_cnt.read() << " "
+ << "out: " << msg_cnt.read() << " "
+ << std::endl;
+ }
+
+ void inc(uint64_t size) {
+ if (size <= 64) {
+ (ctr_set[SLAB_64]).inc();
+ return;
+ }
+ if (size <= 256) {
+ (ctr_set[SLAB_256]).inc();
+ return;
+ }
+ if (size <= 1024) {
+ (ctr_set[SLAB_1024]).inc();
+ return;
+ }
+ if (size <= 8192) {
+ (ctr_set[SLAB_PAGE]).inc();
+ return;
+ }
+ (ctr_set[SLAB_MAX]).inc();
+ }
+
+ void dec(uint64_t size) {
+ if (size <= 64) {
+ (ctr_set[SLAB_64]).dec();
+ return;
+ }
+ if (size <= 256) {
+ (ctr_set[SLAB_256]).dec();
+ return;
+ }
+ if (size <= 1024) {
+ (ctr_set[SLAB_1024]).dec();
+ return;
+ }
+ if (size <= 8192) {
+ (ctr_set[SLAB_PAGE]).dec();
+ return;
+ }
+ (ctr_set[SLAB_MAX]).dec();
+ }
+
+ void inc_msgcnt() {
+ if (unlikely(XioPool::trace_msgcnt)) {
+ msg_cnt.inc();
+ }
+ }
+
+ void dec_msgcnt() {
+ if (unlikely(XioPool::trace_msgcnt)) {
+ msg_cnt.dec();
+ }
+ }
+
+ void inc_hookcnt() {
+ if (unlikely(XioPool::trace_msgcnt)) {
+ hook_cnt.inc();
+ }
+ }
+
+ void dec_hookcnt() {
+ if (unlikely(XioPool::trace_msgcnt)) {
+ hook_cnt.dec();
+ }
+ }
+};
+
+extern XioPoolStats xp_stats;
+
+static inline int xpool_alloc(struct xio_mempool *pool, uint64_t size,
+ struct xio_mempool_obj* mp)
+{
+ if (unlikely(XioPool::trace_mempool))
+ xp_stats.inc(size);
+ return xio_mempool_alloc(pool, size, mp);
+}
+
+static inline void xpool_free(uint64_t size, struct xio_mempool_obj* mp)
+{
+ if (unlikely(XioPool::trace_mempool))
+ xp_stats.dec(size);
+ xio_mempool_free(mp);
+}
+
+#define xpool_inc_msgcnt() \
+ do { xp_stats.inc_msgcnt(); } while (0)
+
+#define xpool_dec_msgcnt() \
+ do { xp_stats.dec_msgcnt(); } while (0)
+
+#define xpool_inc_hookcnt() \
+ do { xp_stats.inc_hookcnt(); } while (0)
+
+#define xpool_dec_hookcnt() \
+ do { xp_stats.dec_hookcnt(); } while (0)
+
+#endif /* XIO_POOL_H */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "XioPortal.h"
+#include <stdio.h>
+
+#define dout_subsys ceph_subsys_xio
+
+int XioPortal::bind(struct xio_session_ops *ops, const string &base_uri,
+ uint16_t port, uint16_t *assigned_port)
+{
+ // format uri
+ char buf[40];
+ xio_uri = base_uri;
+ xio_uri += ":";
+ sprintf(buf, "%d", port);
+ xio_uri += buf;
+
+ uint16_t assigned;
+ server = xio_bind(ctx, ops, xio_uri.c_str(), &assigned, 0, msgr);
+ if (server == NULL)
+ return xio_errno();
+
+ // update uri if port changed
+ if (port != assigned) {
+ xio_uri = base_uri;
+ xio_uri += ":";
+ sprintf(buf, "%d", assigned);
+ xio_uri += buf;
+ }
+
+ portal_id = const_cast<char*>(xio_uri.c_str());
+ if (assigned_port)
+ *assigned_port = assigned;
+ ldout(msgr->cct,20) << "xio_bind: portal " << xio_uri
+ << " returned server " << server << dendl;
+ return 0;
+}
+
+int XioPortals::bind(struct xio_session_ops *ops, const string& base_uri,
+ uint16_t port, uint16_t *port0)
+{
+ /* a server needs at least 1 portal */
+ if (n < 1)
+ return -EINVAL;
+ Messenger *msgr = portals[0]->msgr;
+ portals.resize(n);
+
+ uint16_t port_min = msgr->cct->_conf->ms_bind_port_min;
+ const uint16_t port_max = msgr->cct->_conf->ms_bind_port_max;
+
+ /* bind the portals */
+ for (size_t i = 0; i < portals.size(); i++) {
+ if (!portals[i])
+ portals[i] = new XioPortal(msgr);
+
+ uint16_t result_port;
+ if (port != 0) {
+ // bind directly to the given port
+ int r = portals[i]->bind(ops, base_uri, port, &result_port);
+ if (r != 0)
+ return -r;
+ } else {
+ int r = EADDRINUSE;
+ // try ports within the configured range
+ for (; port_min <= port_max; port_min++) {
+ r = portals[i]->bind(ops, base_uri, port_min, &result_port);
+ if (r == 0)
+ break;
+ }
+ if (r != 0) {
+ lderr(msgr->cct) << "portal.bind unable to bind to " << base_uri
+ << " on any port in range " << msgr->cct->_conf->ms_bind_port_min
+ << "-" << port_max << ": " << xio_strerror(r) << dendl;
+ return -r;
+ }
+ }
+
+ ldout(msgr->cct,5) << "xp::bind: portal " << i << " bind OK: "
+ << portals[i]->xio_uri << dendl;
+
+ if (i == 0 && port0 != NULL)
+ *port0 = result_port;
+ port = 0; // use port 0 for all subsequent portals
+ }
+
+ return 0;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIO_PORTAL_H
+#define XIO_PORTAL_H
+
+extern "C" {
+#include "libxio.h"
+}
+#include "XioInSeq.h"
+#include <boost/lexical_cast.hpp>
+#include "msg/SimplePolicyMessenger.h"
+#include "XioConnection.h"
+#include "XioMsg.h"
+
+#include "include/assert.h"
+#include "common/dout.h"
+
+#ifndef CACHE_LINE_SIZE
+#define CACHE_LINE_SIZE 64 /* XXX arch-specific define */
+#endif
+#define CACHE_PAD(_n) char __pad ## _n [CACHE_LINE_SIZE]
+
+class XioPortal : public Thread
+{
+private:
+
+ struct SubmitQueue
+ {
+ const static int nlanes = 7;
+
+ struct Lane
+ {
+ uint32_t size;
+ XioMsg::Queue q;
+ pthread_spinlock_t sp;
+ CACHE_PAD(0);
+ };
+
+ Lane qlane[nlanes];
+
+ SubmitQueue()
+ {
+ int ix;
+ Lane* lane;
+
+ for (ix = 0; ix < nlanes; ++ix) {
+ lane = &qlane[ix];
+ pthread_spin_init(&lane->sp, PTHREAD_PROCESS_PRIVATE);
+ lane->size = 0;
+ }
+ }
+
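+ /* submissions are hashed to a lane by connection pointer, so sends for
+ * a given connection stay ordered within one lane while different
+ * connections spread across spinlocks */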
+ inline Lane* get_lane(XioConnection *xcon)
+ {
+ return &qlane[((uint64_t) xcon) % nlanes];
+ }
+
+ void enq(XioConnection *xcon, XioSubmit* xs)
+ {
+ Lane* lane = get_lane(xcon);
+ pthread_spin_lock(&lane->sp);
+ lane->q.push_back(*xs);
+ ++(lane->size);
+ pthread_spin_unlock(&lane->sp);
+ }
+
+ void deq(XioSubmit::Queue &send_q)
+ {
+ int ix;
+ Lane* lane;
+
+ for (ix = 0; ix < nlanes; ++ix) {
+ lane = &qlane[ix];
+ pthread_spin_lock(&lane->sp);
+ if (lane->size > 0) {
+ XioSubmit::Queue::const_iterator i1 = send_q.end();
+ send_q.splice(i1, lane->q);
+ lane->size = 0;
+ }
+ pthread_spin_unlock(&lane->sp);
+ }
+ }
+
+ };
+
+ Messenger *msgr;
+ struct xio_context *ctx;
+ struct xio_server *server;
+ SubmitQueue submit_q;
+ pthread_spinlock_t sp;
+ pthread_mutex_t mtx;
+ void *ev_loop;
+ string xio_uri;
+ char *portal_id;
+ bool _shutdown;
+ bool drained;
+ uint32_t magic;
+ uint32_t special_handling;
+
+ friend class XioPortals;
+ friend class XioMessenger;
+
+public:
+ XioPortal(Messenger *_msgr) :
+ msgr(_msgr), ctx(NULL), server(NULL), submit_q(), ev_loop(NULL),
+ xio_uri(""), portal_id(NULL), _shutdown(false), drained(false),
+ magic(0),
+ special_handling(0)
+ {
+ pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);
+ pthread_mutex_init(&mtx, NULL);
+
+ /* a portal is an xio_context and event loop */
+ ctx = xio_context_create(NULL, 0 /* poll timeout */, -1 /* cpu hint */);
+
+ /* associate this XioPortal object with the xio_context handle */
+ struct xio_context_attr xca;
+ xca.user_context = this;
+ xio_modify_context(ctx, &xca, XIO_CONTEXT_ATTR_USER_CTX);
+
+ if (magic & (MSG_MAGIC_XIO)) {
+ printf("XioPortal %p created ev_loop %p ctx %p\n",
+ this, ev_loop, ctx);
+ }
+ }
+
+ int bind(struct xio_session_ops *ops, const string &base_uri,
+ uint16_t port, uint16_t *assigned_port);
+
+ inline void release_xio_rsp(XioRsp* xrsp) {
+ struct xio_msg *msg = xrsp->dequeue();
+ struct xio_msg *next_msg = NULL;
+ while (msg) {
+ next_msg = static_cast<struct xio_msg *>(msg->user_context);
+ int code = xio_release_msg(msg);
+ if (unlikely(code)) {
+ /* very unlikely, so log it */
+ xrsp->xcon->msg_release_fail(msg, code);
+ }
+ msg = next_msg;
+ }
+ xrsp->finalize(); /* unconditional finalize */
+ }
+
+ void enqueue_for_send(XioConnection *xcon, XioSubmit *xs)
+ {
+ if (! _shutdown) {
+ submit_q.enq(xcon, xs);
+ xio_context_stop_loop(ctx);
+ return;
+ }
+
+ /* dispose xs */
+ switch(xs->type) {
+ case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
+ {
+ XioMsg* xmsg = static_cast<XioMsg*>(xs);
+ xs->xcon->msg_send_fail(xmsg, -EINVAL);
+ }
+ break;
+ default:
+ /* INCOMING_MSG_RELEASE */
+ release_xio_rsp(static_cast<XioRsp*>(xs));
+ break;
+ };
+ }
+
+ void *entry()
+ {
+ int size, code = 0;
+ uint32_t xio_qdepth;
+ XioSubmit::Queue send_q;
+ XioSubmit::Queue::iterator q_iter;
+ struct xio_msg *msg = NULL;
+ XioConnection *xcon;
+ XioSubmit *xs;
+ XioMsg *xmsg;
+
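+ /* portal thread main loop: drain the submit queue, send outgoing
+ * messages (subject to the Accelio queue-depth guard) or release
+ * received ones, then run the xio event loop; repeat until shutdown
+ * is requested and the queue has drained */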
+ do {
+ submit_q.deq(send_q);
+ size = send_q.size();
+
+ /* shutdown() barrier */
+ pthread_spin_lock(&sp);
+
+ if (_shutdown) {
+ drained = true;
+ }
+
+ if (size > 0) {
+ q_iter = send_q.begin();
+ while (q_iter != send_q.end()) {
+ xs = &(*q_iter);
+ xcon = xs->xcon;
+ xmsg = (xs->type == XioSubmit::OUTGOING_MSG) ?
+ static_cast<XioMsg*>(xs) : NULL;
+
+ /* guard Accelio send queue (outgoing messages only; release
+ * submissions are not XioMsg objects) */
+ if (xmsg) {
+ xio_qdepth = xcon->xio_queue_depth();
+ if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) > xio_qdepth)) {
+ ++q_iter;
+ continue;
+ }
+ }
+
+ q_iter = send_q.erase(q_iter);
+
+ switch (xs->type) {
+ case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
+ if (unlikely(!xs->xcon->conn))
+ code = ENOTCONN;
+ else {
+ msg = &xmsg->req_0.msg;
+ code = xio_send_msg(xcon->conn, msg);
+ /* header trace moved here to capture xio serial# */
+ if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
+ print_xio_msg_hdr(msgr->cct, "xio_send_msg", xmsg->hdr, msg);
+ print_ceph_msg(msgr->cct, "xio_send_msg", xmsg->m);
+ }
+ }
+ if (unlikely(code)) {
+ xs->xcon->msg_send_fail(xmsg, code);
+ } else {
+ xs->xcon->send.set(msg->timestamp); // need atomic?
+ xcon->send_ctr += xmsg->hdr.msg_cnt; // only inc if cb promised
+ }
+ break;
+ default:
+ /* INCOMING_MSG_RELEASE */
+ release_xio_rsp(static_cast<XioRsp*>(xs));
+ break;
+ }
+ }
+ }
+
+ pthread_spin_unlock(&sp);
+ xio_context_run_loop(ctx, 300);
+
+ } while ((!_shutdown) || (!drained));
+
+ /* shutting down */
+ if (server) {
+ xio_unbind(server);
+ }
+ xio_context_destroy(ctx);
+ return NULL;
+ }
+
+ void shutdown()
+ {
+ pthread_spin_lock(&sp);
+ xio_context_stop_loop(ctx);
+ _shutdown = true;
+ pthread_spin_unlock(&sp);
+ }
+};
+
+class XioPortals
+{
+private:
+ vector<XioPortal*> portals;
+ char **p_vec;
+ int n;
+
+public:
+ XioPortals(Messenger *msgr, int _n) : p_vec(NULL), n(_n)
+ {
+ /* portal0 */
+ portals.push_back(new XioPortal(msgr));
+
+ /* additional portals allocated on bind() */
+ }
+
+ vector<XioPortal*>& get() { return portals; }
+
+ const char **get_vec()
+ {
+ return (const char **) p_vec;
+ }
+
+ int get_portals_len()
+ {
+ return n;
+ }
+
+ XioPortal* get_portal0()
+ {
+ return portals[0];
+ }
+
+ int bind(struct xio_session_ops *ops, const string& base_uri,
+ uint16_t port, uint16_t *port0);
+
+ int accept(struct xio_session *session,
+ struct xio_new_session_req *req,
+ void *cb_user_context)
+ {
+ const char **portals_vec = get_vec();
+ int portals_len = get_portals_len()-1;
+
+ return xio_accept(session,
+ portals_vec,
+ portals_len,
+ NULL, 0);
+ }
+
+ void start()
+ {
+ XioPortal *portal;
+ int p_ix, nportals = portals.size();
+
+ /* portal_0 is the new-session handler, portal_1+ terminate
+ * active sessions */
+
+ p_vec = new char*[(nportals-1)];
+ for (p_ix = 1; p_ix < nportals; ++p_ix) {
+ portal = portals[p_ix];
+ /* shift left */
+ p_vec[(p_ix-1)] = (char*) /* portal->xio_uri.c_str() */
+ portal->portal_id;
+ }
+
+ for (p_ix = 0; p_ix < nportals; ++p_ix) {
+ portal = portals[p_ix];
+ portal->create();
+ }
+ }
+
+ void shutdown()
+ {
+ XioPortal *portal;
+ int nportals = portals.size();
+ for (int p_ix = 0; p_ix < nportals; ++p_ix) {
+ portal = portals[p_ix];
+ portal->shutdown();
+ }
+ }
+
+ void join()
+ {
+ XioPortal *portal;
+ int nportals = portals.size();
+ for (int p_ix = 0; p_ix < nportals; ++p_ix) {
+ portal = portals[p_ix];
+ portal->join();
+ }
+ }
+
+ ~XioPortals()
+ {
+ int nportals = portals.size();
+ for (int ix = 0; ix < nportals; ++ix) {
+ delete(portals[ix]);
+ }
+ portals.clear();
+ if (p_vec) {
+ delete[] p_vec;
+ }
+ }
+};
+
+#endif /* XIO_PORTAL_H */
// cons/des
OSD::OSD(CephContext *cct_, ObjectStore *store_,
- int id, Messenger *internal_messenger, Messenger *external_messenger,
+ int id,
+ Messenger *internal_messenger,
+ Messenger *external_messenger,
Messenger *hb_clientm,
Messenger *hb_front_serverm,
Messenger *hb_back_serverm,
objecter_messenger->shutdown();
hb_front_server_messenger->shutdown();
hb_back_server_messenger->shutdown();
+
peering_wq.clear();
+
return r;
}
${TCMALLOC_LIBS}
${CMAKE_DL_LIBS}
)
+
+add_executable(simple_server
+ messenger/simple_server.cc
+ messenger/simple_dispatcher.cc
+ $<TARGET_OBJECTS:heap_profiler_objs>
+ )
+target_link_libraries(simple_server
+ os global common boost_regex
+ ${EXTRALIBS}
+ ${TCMALLOC_LIBS}
+ ${CMAKE_DL_LIBS}
+ )
+
+add_executable(simple_client
+ messenger/simple_client.cc
+ messenger/simple_dispatcher.cc
+ $<TARGET_OBJECTS:heap_profiler_objs>
+ )
+target_link_libraries(simple_client
+ os global common boost_regex
+ ${EXTRALIBS}
+ ${TCMALLOC_LIBS}
+ ${CMAKE_DL_LIBS}
+ )
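+
+# Example invocation (for reference only; the flags are those parsed by
+# the test sources): start "simple_server --addr 127.0.0.1 --port 1234",
+# then run "simple_client --addr 127.0.0.1 --port 1234 --msgs 50" to
+# echo pings between the two processes.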
+
+if(HAVE_XIO)
+ add_executable(xio_server
+ messenger/xio_server.cc
+ messenger/xio_dispatcher.cc
+ $<TARGET_OBJECTS:heap_profiler_objs>
+ )
+ target_link_libraries(xio_server
+ os global common boost_regex
+ ${Xio_LIBRARY} ibverbs rdmacm pthread rt
+ ${EXTRALIBS}
+ ${TCMALLOC_LIBS}
+ ${CMAKE_DL_LIBS}
+ )
+
+ add_executable(xio_client
+ messenger/xio_client.cc
+ messenger/xio_dispatcher.cc
+ $<TARGET_OBJECTS:heap_profiler_objs>
+ )
+ target_link_libraries(xio_client
+ os global common boost_regex
+ ${Xio_LIBRARY} ibverbs rdmacm pthread rt
+ ${EXTRALIBS}
+ ${TCMALLOC_LIBS}
+ ${CMAKE_DL_LIBS}
+ )
+endif(HAVE_XIO)
include test/erasure-code/Makefile.am
+include test/messenger/Makefile.am
## Unknown/other tests
--- /dev/null
+
+simple_server_SOURCES = \
+ test/messenger/simple_server.cc \
+ test/messenger/simple_dispatcher.cc
+
+simple_server_CFLAGS = ${AM_CFLAGS}
+simple_server_CXXFLAGS = ${AM_CXXFLAGS}
+
+simple_server_LDADD = \
+ $(LIBOS) $(CEPH_GLOBAL) $(LIBCOMMON) $(EXTRALIBS)
+
+if LINUX
+ simple_server_LDADD += -ldl
+endif
+
+simple_client_SOURCES = \
+ test/messenger/simple_client.cc \
+ test/messenger/simple_dispatcher.cc
+
+simple_client_CFLAGS = ${AM_CFLAGS}
+simple_client_CXXFLAGS = ${AM_CXXFLAGS}
+
+simple_client_LDADD = \
+ $(LIBOS) $(CEPH_GLOBAL) $(LIBCOMMON) $(EXTRALIBS)
+
+if LINUX
+ simple_client_LDADD += -ldl
+endif
+
+xio_server_SOURCES = \
+ test/messenger/xio_server.cc \
+ test/messenger/xio_dispatcher.cc
+
+xio_server_CFLAGS = ${AM_CFLAGS}
+xio_server_CXXFLAGS = ${AM_CXXFLAGS}
+
+xio_server_LDADD = \
+ $(LIBOS) $(CEPH_GLOBAL) $(LIBCOMMON) $(EXTRALIBS)
+
+if LINUX
+ xio_server_LDADD += -ldl
+endif
+
+xio_client_SOURCES = \
+ test/messenger/xio_client.cc \
+ test/messenger/xio_dispatcher.cc
+
+xio_client_CFLAGS = ${AM_CFLAGS}
+xio_client_CXXFLAGS = ${AM_CXXFLAGS}
+
+xio_client_LDADD = \
+ $(LIBOS) $(CEPH_GLOBAL) $(LIBCOMMON) $(EXTRALIBS)
+
+if LINUX
+ xio_client_LDADD += -ldl
+endif
+
+noinst_PROGRAMS += \
+ simple_server \
+ simple_client \
+ xio_server \
+ xio_client
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef MESSAGE_HELPER_H
+#define MESSAGE_HELPER_H
+
+#include "msg/msg_types.h"
+#include "messages/MDataPing.h"
+#if defined(HAVE_XIO)
+#include "msg/xio/XioMessenger.h"
+#endif
+
+static inline Message* new_ping_monstyle(const char *tag, int mult)
+{
+ Message *m = new MPing();
+ Formatter *f = new JSONFormatter(true);
+
+ string str = "one giant step for ";
+
+ f->open_object_section(tag);
+ for (int ix = 0; ix < mult; ++ix) {
+ f->dump_string(tag, str);
+ }
+ f->close_section();
+
+ bufferlist bl;
+ stringstream ss;
+
+ f->flush(ss);
+ ::encode(ss.str(), bl);
+ m->set_payload(bl);
+
+ return m;
+}
+
+#if defined(HAVE_XIO)
+extern struct xio_mempool *xio_msgr_mpool;
+
+static inline void xio_hook_func(struct xio_mempool_obj *mp)
+{
+ xio_mempool_free(mp);
+}
+
+static inline Message* new_ping_with_data(const char *tag, uint32_t size)
+{
+ static uint32_t counter;
+
+ MDataPing *m = new MDataPing();
+ m->counter = counter++;
+ m->tag = tag;
+
+ bufferlist bl;
+ void *p;
+
+ struct xio_mempool_obj *mp = m->get_mp();
+ int e = xio_mempool_alloc(xio_msgr_mpool, size, mp);
+ assert(e == 0);
+ p = mp->addr;
+ m->set_rdma_hook(xio_hook_func);
+
+ strcpy((char*) p, tag);
+ uint32_t* t = (uint32_t* ) (((char*) p) + size - 32);
+ *t = counter;
+
+ bl.append(buffer::create_static(size, (char*) p));
+ m->set_data(bl);
+
+ return static_cast<Message*>(m);
+}
+#endif
+
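+/* build an MDataPing whose data payload is split into nfrags segments;
+ * each segment begins with the tag string and carries the counter and
+ * fragment index near its end (segments of 1KB or more are page-aligned) */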
+static inline Message* new_simple_ping_with_data(const char *tag,
+ uint32_t size,
+ uint32_t nfrags)
+{
+ static size_t pagesize = sysconf(_SC_PAGESIZE);
+ static uint32_t counter;
+ uint32_t segsize;
+ int do_page_alignment;
+
+ MDataPing *m = new MDataPing();
+ m->counter = counter++;
+ m->tag = tag;
+
+ bufferlist bl;
+ void *p;
+
+ segsize = (size+nfrags-1)/nfrags;
+ segsize = (segsize + 7) & ~7;
+ if (segsize < 32) segsize = 32;
+
+ do_page_alignment = segsize >= 1024;
+ if (do_page_alignment)
+ segsize = (segsize + pagesize - 1) & ~(pagesize - 1);
+ m->free_data = true;
+ for (uint32_t i = 0; i < nfrags; ++i) {
+ if (do_page_alignment) {
+ if (posix_memalign(&p, pagesize, segsize))
+ p = NULL;
+ } else {
+ p = malloc(segsize);
+ }
+ assert(p); /* test helper: fail loudly rather than write through NULL */
+
+ strcpy((char*) p, tag);
+ uint32_t* t = (uint32_t* ) (((char*) p) + segsize - 32);
+ *t = counter;
+ t[1] = i;
+
+ bl.append(buffer::create_static(segsize, (char*) p));
+ }
+ m->set_data(bl);
+
+ return static_cast<Message*>(m);
+}
+
+static inline Message* new_simple_ping_with_data(const char *tag,
+ uint32_t size)
+{
+ return new_simple_ping_with_data(tag, size, 1);
+}
+
+
+#endif /* MESSAGE_HELPER_H */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/types.h>
+
+#include <iostream>
+#include <string>
+
+using namespace std;
+
+#include "common/config.h"
+#include "msg/msg_types.h"
+#include "msg/Messenger.h"
+#include "messages/MPing.h"
+#include "common/Timer.h"
+#include "common/ceph_argparse.h"
+#include "global/global_init.h"
+#include "perfglue/heap_profiler.h"
+#include "common/address_helper.h"
+#include "message_helper.h"
+#include "simple_dispatcher.h"
+
+#define dout_subsys ceph_subsys_simple_client
+
+void usage(ostream& out)
+{
+ out << "usage: simple_client [options]\n"
+"options:\n"
+" --addr X\n"
+" --port X\n"
+" --msgs X\n"
+" --dsize X\n"
+ ;
+}
+
+
+int main(int argc, const char **argv)
+{
+ vector<const char*> args;
+ Messenger* messenger;
+ SimpleDispatcher *dispatcher;
+ std::vector<const char*>::iterator arg_iter;
+ std::string val;
+ entity_addr_t dest_addr;
+ ConnectionRef conn;
+ int r = 0;
+
+ std::string addr = "localhost";
+ std::string port = "1234";
+
+ int n_msgs = 50;
+ int n_dsize = 0;
+
+ struct timespec ts;
+ ts.tv_sec = 1;
+ ts.tv_nsec = 0;
+
+ argv_to_vec(argc, argv, args);
+ env_to_vec(args);
+
+ global_init(NULL, args, CEPH_ENTITY_TYPE_ANY, CODE_ENVIRONMENT_UTILITY,
+ 0);
+
+ for (arg_iter = args.begin(); arg_iter != args.end();) {
+ if (ceph_argparse_witharg(args, arg_iter, &val, "--addr",
+ (char*) NULL)) {
+ addr = val;
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--port",
+ (char*) NULL)) {
+ port = val;
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--msgs",
+ (char*) NULL)) {
+ n_msgs = atoi(val.c_str());
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--dsize",
+ (char*) NULL)) {
+ n_dsize = atoi(val.c_str());
+ } else {
+ ++arg_iter;
+ }
+ }
+
+ if (!args.empty()) {
+ cerr << "What is this? -- " << args[0] << std::endl;
+ usage(cerr);
+ exit(1);
+ }
+
+ cout << "simple_client starting " <<
+ "dest addr " << addr << " " <<
+ "dest port " << port << " " <<
+ "initial msgs (pipe depth) " << n_msgs << " " <<
+ "data buffer size " << n_dsize << std::endl;
+
+ messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
+ entity_name_t::MON(-1),
+ "client",
+ getpid());
+
+ // enable timing prints
+ messenger->set_magic(MSG_MAGIC_TRACE_CTR);
+ messenger->set_default_policy(Messenger::Policy::lossy_client(0, 0));
+
+ string dest_str = "tcp://";
+ dest_str += addr;
+ dest_str += ":";
+ dest_str += port;
+ entity_addr_from_url(&dest_addr, dest_str.c_str());
+ entity_inst_t dest_server(entity_name_t::MON(-1), dest_addr);
+
+ dispatcher = new SimpleDispatcher(messenger);
+ messenger->add_dispatcher_head(dispatcher);
+
+ dispatcher->set_active(); // this side is the pinger
+
+ r = messenger->start();
+ if (r < 0)
+ goto out;
+
+ conn = messenger->get_connection(dest_server);
+
+ // do stuff
+ time_t t1, t2;
+
+ t1 = time(NULL);
+
+ int msg_ix;
+ Message *m;
+ for (msg_ix = 0; msg_ix < n_msgs; ++msg_ix) {
+ /* add a data payload if asked */
+ if (! n_dsize) {
+ m = new MPing();
+ } else {
+ m = new_simple_ping_with_data("simple_client", n_dsize);
+ }
+ conn->send_message(m);
+ }
+
+ // do stuff
+ while (conn->is_connected()) {
+ nanosleep(&ts, NULL);
+ }
+
+ t2 = time(NULL);
+ cout << "Processed " << dispatcher->get_dcount() + n_msgs
+ << " round-trip messages in " << t2-t1 << "s"
+ << std::endl;
+out:
+ return r;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "simple_dispatcher.h"
+#include "messages/MPing.h"
+#include "messages/MDataPing.h"
+
+SimpleDispatcher::SimpleDispatcher(Messenger *msgr) :
+ Dispatcher(msgr->cct),
+ active(false),
+ messenger(msgr),
+ dcount(0)
+{
+ // nothing
+}
+
+SimpleDispatcher::~SimpleDispatcher() {
+ // nothing
+}
+
+bool SimpleDispatcher::ms_dispatch(Message *m)
+{
+ ConnectionRef conn;
+ uint64_t dc = 0;
+
+ dc = dcount++;
+
+ ConnectionRef con = m->get_connection();
+ Messenger* msgr = con->get_messenger();
+
+ switch (m->get_type()) {
+ case CEPH_MSG_PING:
+ break;
+ case MSG_DATA_PING:
+ {
+ MDataPing* mdp __attribute__((unused)) = static_cast<MDataPing*>(m);
+ //cout << "MDataPing " << mdp->tag << " " << mdp->counter << std::endl;
+ //mdp->get_data().hexdump(cout);
+ /* echoed back by the unconditional send below */
+ }
+ break;
+ default:
+ abort();
+ }
+
+ if (unlikely(msgr->get_magic() & MSG_MAGIC_TRACE_CTR)) {
+ if (unlikely((dc % 65536) == 0)) {
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME_COARSE, &ts);
+ std::cout << "ping " << dc << " nanos: " <<
+ ts.tv_nsec + (ts.tv_sec * 1000000000) << std::endl;
+ }
+ } /* trace ctr */
+
+
+ con->send_message(m);
+
+ //m->put();
+
+ return true;
+}
+
+bool SimpleDispatcher::ms_handle_reset(Connection *con)
+{
+ return true;
+}
+
+void SimpleDispatcher::ms_handle_remote_reset(Connection *con)
+{
+ // nothing
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef SIMPLEDISPATCHER_H_
+#define SIMPLEDISPATCHER_H_
+
+#include "msg/Dispatcher.h"
+#include "msg/Messenger.h"
+
+class SimpleDispatcher: public Dispatcher {
+private:
+ bool active;
+ Messenger *messenger;
+ uint64_t dcount;
+public:
+ SimpleDispatcher(Messenger *msgr);
+ virtual ~SimpleDispatcher();
+
+ uint64_t get_dcount() { return dcount; }
+
+ void set_active() {
+ active = true;
+ };
+
+ // how i receive messages
+ virtual bool ms_dispatch(Message *m);
+
+ /**
+ * This function will be called whenever a new Connection is made to the
+ * Messenger.
+ *
+ * @param con The new Connection which has been established. You are not
+ * granted a reference to it -- take one if you need one!
+ */
+ virtual void ms_handle_connect(Connection *con) { };
+
+ /**
+ * Callback indicating we have accepted an incoming connection.
+ *
+ * @param con The (new or existing) Connection associated with the session
+ */
+ virtual void ms_handle_accept(Connection *con) { };
+
+ /*
+ * this indicates that the ordered+reliable delivery semantics have
+ * been violated. Messages may have been lost due to a fault
+ * in the network connection.
+ * Only called on lossy Connections or those you've
+ * designated mark_down_on_empty().
+ *
+ * @param con The Connection which broke. You are not granted
+ * a reference to it.
+ */
+ virtual bool ms_handle_reset(Connection *con);
+
+ /**
+ * This indicates that the ordered+reliable delivery semantics
+ * have been violated because the remote somehow reset.
+ * It implies that incoming messages were dropped, and
+ * probably some of our previous outgoing messages were too.
+ *
+ * @param con The Connection which broke. You are not granted
+ * a reference to it.
+ */
+ virtual void ms_handle_remote_reset(Connection *con);
+
+ /**
+ * @defgroup Authentication
+ * @{
+ */
+ /**
+ * Retrieve the AuthAuthorizer for the given peer type. It might not
+ * provide one if it knows there is no AuthAuthorizer for that type.
+ *
+ * @param dest_type The peer type we want the authorizer for.
+ * @param a Double pointer to an AuthAuthorizer. The Dispatcher will fill
+ * in *a with the correct AuthAuthorizer, if it can. Make sure that you have
+ * set *a to NULL before calling in.
+ * @param force_new Force the Dispatcher to wait for a new set of keys before
+ * returning the authorizer.
+ *
+ * @return True if this function call properly filled in *a, false otherwise.
+ */
+ virtual bool ms_get_authorizer(int dest_type, AuthAuthorizer **a,
+ bool force_new) { return false; };
+
+ /**
+ * Verify the authorizer for a new incoming Connection.
+ *
+ * @param con The new incoming Connection
+ * @param peer_type The type of the endpoint which initiated this Connection
+ * @param protocol The ID of the protocol in use (at time of writing, cephx
+ * or none)
+ * @param authorizer The authorization string supplied by the remote
+ * @param authorizer_reply Output param: The string we should send back to
+ * the remote to authorize ourselves. Only filled in if isvalid
+ * @param isvalid Output param: True if authorizer is valid, false otherwise
+ *
+ * @return True if we were able to prove or disprove correctness of
+ * authorizer, false otherwise.
+ */
+ virtual bool ms_verify_authorizer(Connection *con, int peer_type,
+ int protocol, bufferlist& authorizer,
+ bufferlist& authorizer_reply,
+ bool& isvalid, CryptoKey& session_key) {
+ /* always succeed */
+ isvalid = true;
+ return true;
+ };
+
+};
+
+#endif /* SIMPLEDISPATCHER_H_ */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/types.h>
+
+#include <iostream>
+#include <string>
+
+using namespace std;
+
+#include "common/config.h"
+#include "msg/Messenger.h"
+#include "common/Timer.h"
+#include "common/ceph_argparse.h"
+#include "global/global_init.h"
+#include "global/signal_handler.h"
+#include "perfglue/heap_profiler.h"
+#include "common/address_helper.h"
+#include "simple_dispatcher.h"
+
+#define dout_subsys ceph_subsys_simple_server
+
+
+int main(int argc, const char **argv)
+{
+ vector<const char*> args;
+ Messenger *messenger;
+ Dispatcher *dispatcher;
+ std::vector<const char*>::iterator arg_iter;
+ std::string val;
+ entity_addr_t bind_addr;
+ int r = 0;
+
+ using std::endl;
+
+ std::string addr = "localhost";
+ std::string port = "1234";
+
+ cout << "Simple Server starting..." << endl;
+
+ argv_to_vec(argc, argv, args);
+ env_to_vec(args);
+
+ global_init(NULL, args, CEPH_ENTITY_TYPE_ANY, CODE_ENVIRONMENT_DAEMON,
+ 0);
+
+ for (arg_iter = args.begin(); arg_iter != args.end();) {
+ if (ceph_argparse_witharg(args, arg_iter, &val, "--addr",
+ (char*) NULL)) {
+ addr = val;
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--port",
+ (char*) NULL)) {
+ port = val;
+ } else {
+ ++arg_iter;
+ }
+ }
+
+ string dest_str = "tcp://";
+ dest_str += addr;
+ dest_str += ":";
+ dest_str += port;
+ entity_addr_from_url(&bind_addr, dest_str.c_str());
+
+ messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
+ entity_name_t::MON(-1),
+ "simple_server",
+ 0 /* nonce */);
+ // enable timing prints
+ messenger->set_magic(MSG_MAGIC_TRACE_CTR);
+ messenger->set_default_policy(
+ Messenger::Policy::stateless_server(CEPH_FEATURES_ALL, 0));
+
+ r = messenger->bind(bind_addr);
+ if (r < 0)
+ goto out;
+
+ // Set up crypto, daemonize, etc.
+ //global_init_daemonize(g_ceph_context, 0);
+ common_init_finish(g_ceph_context);
+
+ dispatcher = new SimpleDispatcher(messenger);
+
+ messenger->add_dispatcher_head(dispatcher); // should reach ready()
+ messenger->start();
+ messenger->wait(); // can't be called until ready()
+
+ // done
+ delete messenger;
+
+out:
+ cout << "Simple Server exit" << endl;
+ return r;
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/types.h>
+
+#include <iostream>
+#include <string>
+
+using namespace std;
+
+#include "common/config.h"
+#include "msg/msg_types.h"
+#include "msg/xio/XioMessenger.h"
+#include "msg/xio/FastStrategy.h"
+#include "msg/xio/QueueStrategy.h"
+#include "msg/xio/XioMsg.h"
+#include "messages/MPing.h"
+#include "common/Timer.h"
+#include "common/ceph_argparse.h"
+#include "global/global_init.h"
+#include "perfglue/heap_profiler.h"
+#include "common/address_helper.h"
+#include "message_helper.h"
+#include "xio_dispatcher.h"
+#include "msg/xio/XioConnection.h"
+
+#define dout_subsys ceph_subsys_xio_client
+
+void usage(ostream& out)
+{
+ out << "usage: xio_client [options]\n"
+"options:\n"
+" --addr X\n"
+" --port X\n"
+" --msgs X\n"
+" --dsize X\n"
+" --nfrags X\n"
+" --dfast\n"
+ ;
+}
+
+int main(int argc, const char **argv)
+{
+ vector<const char*> args;
+ Messenger* messenger;
+ XioDispatcher *dispatcher;
+ std::vector<const char*>::iterator arg_iter;
+ std::string val;
+ entity_addr_t dest_addr;
+ ConnectionRef conn;
+ int r = 0;
+
+ std::string addr = "localhost";
+ std::string port = "1234";
+ int n_msgs = 50;
+ int n_dsize = 0;
+ int n_nfrags = 1;
+ bool dfast = false;
+
+ struct timespec ts;
+ ts.tv_sec = 5;
+ ts.tv_nsec = 0;
+
+ argv_to_vec(argc, argv, args);
+ env_to_vec(args);
+
+ global_init(NULL, args,
+ CEPH_ENTITY_TYPE_ANY, CODE_ENVIRONMENT_UTILITY, 0);
+
+ for (arg_iter = args.begin(); arg_iter != args.end();) {
+ if (ceph_argparse_witharg(args, arg_iter, &val, "--addr",
+ (char*) NULL)) {
+ addr = val;
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--port",
+ (char*) NULL)) {
+ port = val;
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--msgs",
+ (char*) NULL)) {
+ n_msgs = atoi(val.c_str());
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--dsize",
+ (char*) NULL)) {
+ n_dsize = atoi(val.c_str());
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--nfrags",
+ (char*) NULL)) {
+ n_nfrags = atoi(val.c_str());
+ } else if (ceph_argparse_flag(args, arg_iter, "--dfast",
+ (char*) NULL)) {
+ dfast = true;
+ } else {
+ ++arg_iter;
+ }
+ }
+
+ if (!args.empty()) {
+ cerr << "What is this? -- " << args[0] << std::endl;
+ usage(cerr);
+ exit(1);
+ }
+
+ DispatchStrategy* dstrategy;
+ if (dfast)
+ dstrategy = new FastStrategy();
+ else
+ dstrategy = new QueueStrategy(2);
+
+ messenger = new XioMessenger(g_ceph_context,
+ entity_name_t::MON(-1),
+ "xio_client",
+ 0 /* nonce */,
+ dstrategy);
+
+ // enable timing prints
+ static_cast<XioMessenger*>(messenger)->set_magic(
+ MSG_MAGIC_REDUPE /* resubmit messages on delivery (REQUIRED) */ |
+ MSG_MAGIC_TRACE_CTR /* timing prints */);
+
+ // ensure we have a pool of sizeof(payload data)
+ if (n_dsize)
+ (void) static_cast<XioMessenger*>(messenger)->pool_hint(n_dsize);
+
+ messenger->set_default_policy(Messenger::Policy::lossy_client(0, 0));
+
+ string dest_str = "tcp://";
+ dest_str += addr;
+ dest_str += ":";
+ dest_str += port;
+ entity_addr_from_url(&dest_addr, dest_str.c_str());
+ entity_inst_t dest_server(entity_name_t::MON(-1), dest_addr);
+
+ dispatcher = new XioDispatcher(messenger);
+ messenger->add_dispatcher_head(dispatcher);
+
+ dispatcher->set_active(); // this side is the pinger
+
+ r = messenger->start();
+ if (r < 0)
+ goto out;
+
+ conn = messenger->get_connection(dest_server);
+
+ // do stuff
+ time_t t1, t2;
+ t1 = time(NULL);
+
+ int msg_ix;
+ for (msg_ix = 0; msg_ix < n_msgs; ++msg_ix) {
+ /* add a data payload if asked */
+ if (! n_dsize) {
+ conn->send_message(new MPing());
+ } else {
+ conn->send_message(new_simple_ping_with_data("xio_client", n_dsize, n_nfrags));
+ }
+ }
+
+ // do stuff
+ while (conn->is_connected()) {
+ nanosleep(&ts, NULL);
+ }
+
+ t2 = time(NULL);
+ cout << "Processed "
+ << static_cast<XioConnection*>(conn->get())->get_scount()
+ << " one-way messages in " << t2-t1 << "s"
+ << std::endl;
+
+ conn->put();
+
+ // wait a bit for cleanup to finalize
+ ts.tv_sec = 5;
+ nanosleep(&ts, NULL);
+
+ messenger->shutdown();
+
+out:
+ return r;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "xio_dispatcher.h"
+#include "messages/MPing.h"
+#include "messages/MDataPing.h"
+
+XioDispatcher::XioDispatcher(Messenger *msgr) :
+ Dispatcher(msgr->cct),
+ active(false),
+ messenger(msgr),
+ dcount(0)
+{
+ // nothing
+}
+
+XioDispatcher::~XioDispatcher() {
+ // nothing
+}
+
+bool XioDispatcher::ms_dispatch(Message *m)
+{
+ ConnectionRef conn;
+ uint64_t dc = 0;
+
+ dc = dcount++;
+
+ switch (m->get_type()) {
+ case CEPH_MSG_PING:
+ break;
+ case MSG_DATA_PING:
+ {
+ MDataPing* mdp __attribute__((unused)) = static_cast<MDataPing*>(m);
+ //cout << "MDataPing " << mdp->tag << " " << mdp->counter << std::endl;
+ //mdp->get_data().hexdump(cout);
+ }
+ break;
+ default:
+ abort();
+ }
+
+ if (unlikely(m->get_magic() & MSG_MAGIC_TRACE_CTR)) {
+ if (unlikely((dc % 65536) == 0)) {
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME_COARSE, &ts);
+ std::cout << "ping " << dc << " nanos: " <<
+ ts.tv_nsec + (ts.tv_sec * 1000000000) << std::endl;
+ }
+ } /* trace ctr */
+
+ m->put();
+
+ return true;
+}
+
+bool XioDispatcher::ms_handle_reset(Connection *con)
+{
+ return true;
+}
+
+void XioDispatcher::ms_handle_remote_reset(Connection *con)
+{
+ // nothing
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef XIODISPATCHER_H_
+#define XIODISPATCHER_H_
+
+#include "msg/Dispatcher.h"
+#include "msg/Messenger.h"
+
+class XioDispatcher: public Dispatcher {
+private:
+ bool active;
+ Messenger *messenger;
+ uint64_t dcount;
+public:
+ XioDispatcher(Messenger *msgr);
+ virtual ~XioDispatcher();
+
+ uint64_t get_dcount() { return dcount; }
+
+ void set_active() {
+ active = true;
+ };
+
+ // how i receive messages
+ virtual bool ms_dispatch(Message *m);
+
+ /**
+ * This function will be called whenever a new Connection is made to the
+ * Messenger.
+ *
+ * @param con The new Connection which has been established. You are not
+ * granted a reference to it -- take one if you need one!
+ */
+ virtual void ms_handle_connect(Connection *con) { };
+
+ /**
+ * Callback indicating we have accepted an incoming connection.
+ *
+ * @param con The (new or existing) Connection associated with the session
+ */
+ virtual void ms_handle_accept(Connection *con) { };
+
+ /*
+ * this indicates that the ordered+reliable delivery semantics have
+ * been violated. Messages may have been lost due to a fault
+ * in the network connection.
+ * Only called on lossy Connections or those you've
+ * designated mark_down_on_empty().
+ *
+ * @param con The Connection which broke. You are not granted
+ * a reference to it.
+ */
+ virtual bool ms_handle_reset(Connection *con);
+
+ /**
+ * This indicates that the ordered+reliable delivery semantics
+ * have been violated because the remote somehow reset.
+ * It implies that incoming messages were dropped, and
+ * probably some of our previous outgoing messages were too.
+ *
+ * @param con The Connection which broke. You are not granted
+ * a reference to it.
+ */
+ virtual void ms_handle_remote_reset(Connection *con);
+
+ /**
+ * @defgroup Authentication
+ * @{
+ */
+ /**
+ * Retrieve the AuthAuthorizer for the given peer type. It might not
+ * provide one if it knows there is no AuthAuthorizer for that type.
+ *
+ * @param dest_type The peer type we want the authorizer for.
+ * @param a Double pointer to an AuthAuthorizer. The Dispatcher will fill
+ * in *a with the correct AuthAuthorizer, if it can. Make sure that you have
+ * set *a to NULL before calling in.
+ * @param force_new Force the Dispatcher to wait for a new set of keys before
+ * returning the authorizer.
+ *
+ * @return True if this function call properly filled in *a, false otherwise.
+ */
+ virtual bool ms_get_authorizer(int dest_type, AuthAuthorizer **a,
+ bool force_new) { return false; };
+
+ /**
+ * Verify the authorizer for a new incoming Connection.
+ *
+ * @param con The new incoming Connection
+ * @param peer_type The type of the endpoint which initiated this Connection
+ * @param protocol The ID of the protocol in use (at time of writing, cephx
+ * or none)
+ * @param authorizer The authorization string supplied by the remote
+ * @param authorizer_reply Output param: The string we should send back to
+ * the remote to authorize ourselves. Only filled in if isvalid
+ * @param isvalid Output param: True if authorizer is valid, false otherwise
+ *
+ * @return True if we were able to prove or disprove correctness of
+ * authorizer, false otherwise.
+ */
+ virtual bool ms_verify_authorizer(Connection *con, int peer_type,
+ int protocol, bufferlist& authorizer,
+ bufferlist& authorizer_reply,
+ bool& isvalid, CryptoKey& session_key) {
+ /* always succeed */
+ isvalid = true;
+ return true;
+ };
+
+ /** @} */
+};
+
+#endif /* XIODISPATCHER_H_ */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/types.h>
+
+#include <iostream>
+#include <string>
+
+using namespace std;
+
+#include "common/config.h"
+#include "msg/xio/XioMessenger.h"
+#include "msg/xio/FastStrategy.h"
+#include "msg/xio/QueueStrategy.h"
+#include "common/Timer.h"
+#include "common/ceph_argparse.h"
+#include "global/global_init.h"
+#include "global/signal_handler.h"
+#include "perfglue/heap_profiler.h"
+#include "common/address_helper.h"
+#include "xio_dispatcher.h"
+
+#define dout_subsys ceph_subsys_simple_server
+
+
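+/*
+ * Stand-alone Xio ping server (sketch of intended usage; the binary
+ * name is illustrative):
+ *
+ *   xio_server --addr <bind address> --port <port> [--dfast]
+ *
+ * --dfast selects FastStrategy (direct dispatch); the default is
+ * QueueStrategy with two dispatch threads.
+ */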
+int main(int argc, const char **argv)
+{
+ vector<const char*> args;
+ Messenger *messenger;
+ Dispatcher *dispatcher;
+ std::vector<const char*>::iterator arg_iter;
+ std::string val;
+ entity_addr_t bind_addr;
+ int r = 0;
+
+ using std::endl;
+
+ std::string addr = "localhost";
+ std::string port = "1234";
+ bool dfast = false;
+
+ cout << "Xio Server starting..." << endl;
+
+ argv_to_vec(argc, argv, args);
+ env_to_vec(args);
+
+ global_init(NULL, args, CEPH_ENTITY_TYPE_ANY, CODE_ENVIRONMENT_DAEMON,
+ 0);
+
+ for (arg_iter = args.begin(); arg_iter != args.end();) {
+ if (ceph_argparse_witharg(args, arg_iter, &val, "--addr",
+ (char*) NULL)) {
+ addr = val;
+ } else if (ceph_argparse_witharg(args, arg_iter, &val, "--port",
+ (char*) NULL)) {
+ port = val;
+ } else if (ceph_argparse_flag(args, arg_iter, "--dfast",
+ (char*) NULL)) {
+ dfast = true;
+ } else {
+ ++arg_iter;
+ }
+ }
+
+ string dest_str = "tcp://";
+ dest_str += addr;
+ dest_str += ":";
+ dest_str += port;
+ r = entity_addr_from_url(&bind_addr, dest_str.c_str());
+ if (r != 0) {
+ cout << "failed to parse address " << dest_str << endl;
+ goto out;
+ }
+
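+ // Choose how incoming messages are handed to the Dispatcher:
+ // FastStrategy dispatches directly, QueueStrategy(2) queues them to
+ // two dispatch threads.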
+ DispatchStrategy* dstrategy;
+ if (dfast)
+ dstrategy = new FastStrategy();
+ else
+ dstrategy = new QueueStrategy(2);
+
+ messenger = new XioMessenger(g_ceph_context,
+ entity_name_t::MON(-1),
+ "xio_server",
+ 0 /* nonce */,
+ dstrategy);
+
+ static_cast<XioMessenger*>(messenger)->set_magic(
+ MSG_MAGIC_REDUPE /* resubmit messages on delivery (REQUIRED) */ |
+ MSG_MAGIC_TRACE_CTR /* timing prints */);
+
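+ // Accept clients with a lossy, stateless server policy.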
+ messenger->set_default_policy(
+ Messenger::Policy::stateless_server(CEPH_FEATURES_ALL, 0));
+
+ r = messenger->bind(bind_addr);
+ if (r < 0)
+ goto out;
+
+ // Finish global initialization (daemonizing is left disabled here).
+ //global_init_daemonize(g_ceph_context, 0);
+ common_init_finish(g_ceph_context);
+
+ dispatcher = new XioDispatcher(messenger);
+
+ messenger->add_dispatcher_head(dispatcher); // should reach ready()
+ messenger->start();
+ messenger->wait(); // can't be called until ready()
+
+ // done
+ delete dispatcher;
+ delete messenger;
+
+out:
+ cout << "Simple Server exit" << endl;
+ return r;
+}
+