}
/* update connection timestamp */
- recv.set(tmsg->timestamp);
+ recv = tmsg->timestamp;
Message *m = decode_message(msgr->cct, msgr->crcflags, header, footer,
payload, middle, data, this);
xcon->flush_out_queues(flags|CState::OP_FLAG_LOCKED);
- session_state.set(UP);
- startup_state.set(READY);
+ session_state = session_states::UP;
+ startup_state = session_startup_states::READY;
if (! (flags & CState::OP_FLAG_LOCKED))
pthread_spin_unlock(&xcon->sp);
int XioConnection::CState::state_discon()
{
- session_state.set(DISCONNECTED);
- startup_state.set(IDLE);
+ session_state = session_states::DISCONNECTED;
+ startup_state = session_startup_states::IDLE;
return 0;
}
if (! (flags & OP_FLAG_LOCKED))
pthread_spin_lock(&xcon->sp);
- session_state.set(FLOW_CONTROLLED);
+ session_state = session_states::FLOW_CONTROLLED;
if (! (flags & OP_FLAG_LOCKED))
pthread_spin_unlock(&xcon->sp);
pthread_spin_lock(&xcon->sp);
// advance to state FAIL, drop queued msgs, adjust LRU
- session_state.set(DISCONNECTED);
- startup_state.set(FAIL);
+ session_state = session_states::DISCONNECTED;
+ startup_state = session_startup_states::FAIL;
xcon->discard_out_queues(flags|OP_FLAG_LOCKED);
xcon->adjust_clru(flags|OP_FLAG_LOCKED|OP_FLAG_LRU);
#ifndef XIO_CONNECTION_H
#define XIO_CONNECTION_H
+#include <atomic>
+
#include <boost/intrusive/avl_set.hpp>
#include <boost/intrusive/list.hpp>
+
extern "C" {
#include "libxio.h"
}
+
#include "XioInSeq.h"
#include "XioSubmit.h"
#include "msg/Connection.h"
#include "msg/Messenger.h"
-#include "include/atomic.h"
#include "auth/AuthSessionHandler.h"
#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL)
public:
enum type { ACTIVE, PASSIVE };
- enum session_states {
+ enum class session_states : unsigned {
INIT = 0,
START,
UP,
BARRIER
};
- enum session_startup_states {
+ enum class session_startup_states : unsigned {
IDLE = 0,
CONNECTING,
ACCEPTING,
private:
XioConnection::type xio_conn_type;
XioPortal *portal;
- atomic_t connected;
+ std::atomic<bool> connected = { false };
entity_inst_t peer;
struct xio_session *session;
struct xio_connection *conn;
pthread_spinlock_t sp;
- atomic_t send;
- atomic_t recv;
+ std::atomic<int64_t> send = { 0 };
+ std::atomic<int64_t> recv = { 0 };
uint32_t n_reqs; // Accelio-initiated reqs in progress (!counting partials)
uint32_t magic;
uint32_t special_handling;
uint32_t reconnects;
uint32_t connect_seq, peer_global_seq;
uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
- atomic64_t out_seq; // atomic<uint32_t>
+ std::atomic<int64_t> out_seq = { 0 };
lifecycle() : state(lifecycle::INIT), reconnects(0), connect_seq(0),
- peer_global_seq(0), in_seq(0), out_seq_acked(0),
- out_seq(0) {}
+ peer_global_seq(0), in_seq(0), out_seq_acked(0)
+ {}
void set_in_seq(uint64_t seq) {
in_seq = seq;
}
uint64_t next_out_seq() {
- return out_seq.inc();
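+ // pre-increment on std::atomic atomically bumps the counter and returns the
+ // new value, matching the old atomic_t::inc() semantics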
+ return ++out_seq;
}
} state;
XioConnection *xcon;
uint32_t protocol_version;
- atomic_t session_state;
- atomic_t startup_state;
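+ // the connection state machine starts in INIT / IDLE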
+ std::atomic<session_states> session_state = { session_states::INIT };
+ std::atomic<session_startup_states> startup_state = { session_startup_states::IDLE };
uint32_t reconnects;
uint32_t connect_seq, global_seq, peer_global_seq;
uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
- std::atomic<uint64_t> out_seq;
+ std::atomic<uint64_t> out_seq = { 0 };
uint32_t flags;
peer_global_seq(0),
in_seq(0),
out_seq_acked(0),
- out_seq(0),
flags(FLAG_NONE) {}
uint64_t get_session_state() {
friend class XioSend;
int on_disconnect_event() {
- connected.set(false);
+ connected = false;
pthread_spin_lock(&sp);
discard_out_queues(CState::OP_FLAG_LOCKED);
pthread_spin_unlock(&sp);
void disconnect() {
if (is_connected()) {
- connected.set(false);
+ connected = false;
xio_disconnect(conn); // normal teardown will clean up conn
}
}
}
uint64_t next_seq() {
- return seq.inc();
+ return ++seq;
}
};
#define dout_prefix *_dout << "xio."
Mutex mtx("XioMessenger Package Lock");
-atomic_t initialized;
+std::atomic<bool> initialized = { false };
-atomic_t XioMessenger::nInstances;
+std::atomic<uint64_t> XioMessenger::nInstances = { 0 };
struct xio_mempool *xio_msgr_noreg_mpool;
} /* xio_uri_from_entity */
void XioInit::package_init(CephContext *cct) {
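+ /* 'initialized' is now std::atomic<bool>: the unlocked fast-path read is
+  * well-defined, and the re-check under mtx still guards against two
+  * threads racing through first-time initialization */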
- if (! initialized.read()) {
+ if (! initialized) {
mtx.Lock();
- if (! initialized.read()) {
+ if (! initialized) {
xio_init();
xio_msgr_ops.on_cancel_request = on_cancel_request;
/* mark initialized */
- initialized.set(1);
+ initialized = true;
}
mtx.Unlock();
}
uint64_t cflags, DispatchStrategy *ds)
: SimplePolicyMessenger(cct, name, mname, _nonce),
XioInit(cct),
- nsessions(0),
- shutdown_called(false),
portals(this, get_nportals(cflags), get_nconns_per_portal(cflags)),
dispatch_strategy(ds),
loop_con(new XioLoopbackConnection(this)),
dispatch_strategy->set_messenger(this);
/* update class instance count */
- nInstances.inc();
+ nInstances++;
loop_con->set_features(CEPH_FEATURES_ALL);
ldout(cct,2) << "Create msgr: " << this << " instance: "
- << nInstances.read() << " type: " << name.type_str()
+ << nInstances << " type: " << name.type_str()
<< " subtype: " << mname << " nportals: " << get_nportals(cflags)
<< " nconns_per_portal: " << get_nconns_per_portal(cflags)
<< dendl;
struct xio_new_session_req *req,
void *cb_user_context)
{
- if (shutdown_called.read()) {
+ if (shutdown_called) {
return xio_reject(
session, XIO_E_SESSION_REFUSED, NULL /* udata */, 0 /* udata len */);
}
int code = portals.accept(session, req, cb_user_context);
if (! code)
- nsessions.inc();
+ nsessions++;
return code;
} /* new_session */
xcona.user_context = xcon;
(void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);
- xcon->connected.set(true);
+ xcon->connected = true;
/* sentinel ref */
xcon->get(); /* xcon->nref == 1 */
xp_stats.dump("xio session dtor", reinterpret_cast<uint64_t>(session));
}
xio_session_destroy(session);
- if (nsessions.dec() == 0) {
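+ // pre-decrement yields the remaining session count; when the last session
+ // goes away, wake any shutdown() waiter blocked on sh_cond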
+ if (--nsessions == 0) {
Mutex::Locker lck(sh_mtx);
- if (nsessions.read() == 0)
+ if (nsessions == 0)
sh_cond.Signal();
}
break;
int XioMessenger::shutdown()
{
- shutdown_called.set(true);
+ shutdown_called = true;
conns_sp.lock();
XioConnection::ConnList::iterator iter;
iter = conns_list.begin();
(void) iter->disconnect(); // XXX mark down?
}
conns_sp.unlock();
- while(nsessions.read() > 0) {
+ while(nsessions > 0) {
Mutex::Locker lck(sh_mtx);
- if (nsessions.read() > 0)
+ if (nsessions > 0)
sh_cond.Wait(sh_mtx);
}
portals.shutdown();
ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
{
- if (shutdown_called.read())
+ if (shutdown_called)
return NULL;
const entity_inst_t& self_inst = get_myinst();
return NULL;
}
- nsessions.inc();
- xcon->connected.set(true);
+ nsessions++;
+ xcon->connected = true;
/* sentinel ref */
xcon->get(); /* xcon->nref == 1 */
m->tag = XIO_NOP_TAG_MARKDOWN;
m->set_completion_hook(pool_alloc_markdown_hook(xcon, m));
// stall new messages
- xcon->cstate.session_state.set(XioConnection::BARRIER);
+ xcon->cstate.session_state = XioConnection::session_states::BARRIER;
(void) _send_message_impl(m, xcon);
}
XioMessenger::~XioMessenger()
{
delete dispatch_strategy;
- nInstances.dec();
+ nInstances--;
} /* dtor */
#define XIO_MESSENGER_H
#include "msg/SimplePolicyMessenger.h"
+
+#include <atomic>
+
extern "C" {
#include "libxio.h"
}
+
#include "XioConnection.h"
#include "XioPortal.h"
#include "QueueStrategy.h"
-#include "include/atomic.h"
#include "common/Thread.h"
#include "common/Mutex.h"
#include "include/Spinlock.h"
class XioMessenger : public SimplePolicyMessenger, XioInit
{
private:
- static atomic_t nInstances;
- atomic_t nsessions;
- atomic_t shutdown_called;
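+ // the definition (and zero-initialization) of nInstances lives at namespace
+ // scope in the messenger .cc; only the declaration belongs here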
+ static std::atomic<uint64_t> nInstances;
+ std::atomic<uint64_t> nsessions = { 0 };
+ std::atomic<bool> shutdown_called = { false };
Spinlock conns_sp;
XioConnection::ConnList conns_list;
XioConnection::EntitySet conns_entity_map;
xcon->get();
}
- XioSend* get() { nrefs.inc(); return this; };
+ XioSend* get() { nrefs++; return this; };
void put(int n) {
- int refs = nrefs.sub(n);
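+ // operator-= returns the updated count (as atomic_t::sub() did), so
+ // refs == 0 means this call dropped the last reference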
+ int refs = nrefs -= n;
if (refs == 0) {
struct xio_reg_mem *mp = &this->mp_this;
this->~XioSend();
private:
xio_msg_ex req_0;
struct xio_reg_mem mp_this;
- atomic_t nrefs;
+ std::atomic<unsigned> nrefs = { 0 };
};
class XioCommand : public XioSend
XioConnection *xcon;
XioInSeq msg_seq;
XioPool rsp_pool;
- atomic_t nrefs;
+ std::atomic<unsigned> nrefs = { 1 };
bool cl_flag;
friend class XioConnection;
friend class XioMessenger;
xcon(_xcon->get()),
msg_seq(_msg_seq),
rsp_pool(xio_msgr_noreg_mpool),
- nrefs(1),
cl_flag(false),
mp_this(_mp)
{
int release_msgs();
XioDispatchHook* get() {
- nrefs.inc(); return this;
+ nrefs++; return this;
}
void put(int n = 1) {
- int refs = nrefs.sub(n);
+ int refs = nrefs -= n;
if (refs == 0) {
/* in Marcus' new system, refs reaches 0 twice: once in
* Message lifecycle, and again after xio_release_msg.
#ifndef XIO_POOL_H
#define XIO_POOL_H
+#include <atomic>
+#include <vector>
+#include <cstdlib>
+#include <cstring>
+#include <cstdint>
+
extern "C" {
-#include <stdlib.h>
-#include <string.h>
-#include <stdint.h>
#include "libxio.h"
}
-#include <vector>
-#include "include/atomic.h"
-#include "common/likely.h"
+#include "common/likely.h"
static inline int xpool_alloc(struct xio_mempool *pool, uint64_t size,
struct xio_reg_mem* mp);
static inline void xpool_free(uint64_t size, struct xio_reg_mem* mp);
-using ceph::atomic_t;
-
class XioPool
{
private:
NUM_SLABS,
};
- atomic_t ctr_set[NUM_SLABS];
-
- atomic_t msg_cnt; // send msgs
- atomic_t hook_cnt; // recv msgs
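+ // in-class initializers replace the zeroing constructor removed further down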
+ std::atomic<unsigned> ctr_set[NUM_SLABS] = {};
+ std::atomic<unsigned> msg_cnt = { 0 }; // send msgs
+ std::atomic<unsigned> hook_cnt = { 0 }; // recv msgs
public:
- XioPoolStats() : msg_cnt(0), hook_cnt(0) {
- for (int ix = 0; ix < NUM_SLABS; ++ix) {
- ctr_set[ix].set(0);
- }
- }
-
void dump(const char* tag, uint64_t serial);
void inc(uint64_t size) {
if (size <= 64) {
- (ctr_set[SLAB_64]).inc();
+ (ctr_set[SLAB_64])++;
return;
}
if (size <= 256) {
- (ctr_set[SLAB_256]).inc();
+ (ctr_set[SLAB_256])++;
return;
}
if (size <= 1024) {
- (ctr_set[SLAB_1024]).inc();
+ (ctr_set[SLAB_1024])++;
return;
}
if (size <= 8192) {
- (ctr_set[SLAB_PAGE]).inc();
+ (ctr_set[SLAB_PAGE])++;
return;
}
- (ctr_set[SLAB_MAX]).inc();
+ (ctr_set[SLAB_MAX])++;
}
void dec(uint64_t size) {
if (size <= 64) {
- (ctr_set[SLAB_64]).dec();
+ (ctr_set[SLAB_64])--;
return;
}
if (size <= 256) {
- (ctr_set[SLAB_256]).dec();
+ (ctr_set[SLAB_256])--;
return;
}
if (size <= 1024) {
- (ctr_set[SLAB_1024]).dec();
+ (ctr_set[SLAB_1024])--;
return;
}
if (size <= 8192) {
- (ctr_set[SLAB_PAGE]).dec();
+ (ctr_set[SLAB_PAGE])--;
return;
}
- (ctr_set[SLAB_MAX]).dec();
+ (ctr_set[SLAB_MAX])--;
}
- void inc_overflow() { ctr_set[SLAB_OVERFLOW].inc(); }
- void dec_overflow() { ctr_set[SLAB_OVERFLOW].dec(); }
+ void inc_overflow() { ctr_set[SLAB_OVERFLOW]++; }
+ void dec_overflow() { ctr_set[SLAB_OVERFLOW]--; }
void inc_msgcnt() {
if (unlikely(XioPool::trace_msgcnt)) {
- msg_cnt.inc();
+ msg_cnt++;
}
}
void dec_msgcnt() {
if (unlikely(XioPool::trace_msgcnt)) {
- msg_cnt.dec();
+ msg_cnt--;
}
}
void inc_hookcnt() {
if (unlikely(XioPool::trace_msgcnt)) {
- hook_cnt.inc();
+ hook_cnt++;
}
}
void dec_hookcnt() {
if (unlikely(XioPool::trace_msgcnt)) {
- hook_cnt.dec();
+ hook_cnt--;
}
}
};
int r = xio_mempool_alloc(pool, size, mp);
if (r == 0) {
if (unlikely(XioPool::trace_mempool))
xp_stats.inc(size);
return 0;
}
// fall back to malloc on errors
{
if (mp->length) {
if (unlikely(XioPool::trace_mempool))
xp_stats.dec(size);
xio_mempool_free(mp);
} else { // from malloc
if (unlikely(XioPool::trace_mempool))