From cd7aeba9f9977927c6755a2369b335e1469a3c1d Mon Sep 17 00:00:00 2001
From: Jesse Williamson
Date: Fri, 28 Apr 2017 04:52:46 -0700
Subject: [PATCH] xio: migrate atomic_t to std::atomic<>

Signed-off-by: Jesse Williamson
---
 src/msg/xio/XioConnection.cc | 16 ++++-----
 src/msg/xio/XioConnection.h  | 36 ++++++++++----------
 src/msg/xio/XioMessenger.cc  | 42 +++++++++++------------
 src/msg/xio/XioMessenger.h   | 11 ++++---
 src/msg/xio/XioMsg.h         | 13 ++++----
 src/msg/xio/XioPool.h        | 64 ++++++++++++++++--------------------
 6 files changed, 88 insertions(+), 94 deletions(-)

diff --git a/src/msg/xio/XioConnection.cc b/src/msg/xio/XioConnection.cc
index 107a489a44e7..36b946de08fb 100644
--- a/src/msg/xio/XioConnection.cc
+++ b/src/msg/xio/XioConnection.cc
@@ -433,7 +433,7 @@ int XioConnection::handle_data_msg(struct xio_session *session,
   }
 
   /* update connection timestamp */
-  recv.set(tmsg->timestamp);
+  recv = tmsg->timestamp;
 
   Message *m = decode_message(msgr->cct, msgr->crcflags, header, footer,
                               payload, middle, data, this);
@@ -786,8 +786,8 @@ int XioConnection::CState::state_up_ready(uint32_t flags)
 
   xcon->flush_out_queues(flags|CState::OP_FLAG_LOCKED);
 
-  session_state.set(UP);
-  startup_state.set(READY);
+  session_state = session_states::UP;
+  startup_state = session_startup_states::READY;
 
   if (! (flags & CState::OP_FLAG_LOCKED))
     pthread_spin_unlock(&xcon->sp);
@@ -797,8 +797,8 @@ int XioConnection::CState::state_up_ready(uint32_t flags)
 
 int XioConnection::CState::state_discon()
 {
-  session_state.set(DISCONNECTED);
-  startup_state.set(IDLE);
+  session_state = session_states::DISCONNECTED;
+  startup_state = session_startup_states::IDLE;
 
   return 0;
 }
@@ -808,7 +808,7 @@ int XioConnection::CState::state_flow_controlled(uint32_t flags)
   if (! (flags & OP_FLAG_LOCKED))
     pthread_spin_lock(&xcon->sp);
 
-  session_state.set(FLOW_CONTROLLED);
+  session_state = session_states::FLOW_CONTROLLED;
 
   if (! (flags & OP_FLAG_LOCKED))
     pthread_spin_unlock(&xcon->sp);
@@ -822,8 +822,8 @@ int XioConnection::CState::state_fail(Message* m, uint32_t flags)
     pthread_spin_lock(&xcon->sp);
 
   // advance to state FAIL, drop queued, msgs, adjust LRU
-  session_state.set(DISCONNECTED);
-  startup_state.set(FAIL);
+  session_state = session_states::DISCONNECTED;
+  startup_state = session_startup_states::FAIL;
 
   xcon->discard_out_queues(flags|OP_FLAG_LOCKED);
   xcon->adjust_clru(flags|OP_FLAG_LOCKED|OP_FLAG_LRU);
diff --git a/src/msg/xio/XioConnection.h b/src/msg/xio/XioConnection.h
index cb1d735b10c7..1ed88b995d8e 100644
--- a/src/msg/xio/XioConnection.h
+++ b/src/msg/xio/XioConnection.h
@@ -16,16 +16,19 @@
 #ifndef XIO_CONNECTION_H
 #define XIO_CONNECTION_H
 
+#include <atomic>
+
 #include <boost/intrusive/avl_set.hpp>
 #include <boost/intrusive/list.hpp>
+
 extern "C" {
 #include "libxio.h"
 }
+
 #include "XioInSeq.h"
 #include "XioSubmit.h"
 #include "msg/Connection.h"
 #include "msg/Messenger.h"
-#include "include/atomic.h"
 #include "auth/AuthSessionHandler.h"
 
 #define XIO_ALL_FEATURES (CEPH_FEATURES_ALL)
@@ -44,7 +47,7 @@ class XioConnection : public Connection
 public:
   enum type { ACTIVE, PASSIVE };
 
-  enum session_states {
+  enum class session_states : unsigned {
     INIT = 0,
     START,
     UP,
@@ -54,7 +57,7 @@ public:
     BARRIER
   };
 
-  enum session_startup_states {
+  enum class session_startup_states : unsigned {
     IDLE = 0,
     CONNECTING,
     ACCEPTING,
@@ -65,13 +68,13 @@
 private:
   XioConnection::type xio_conn_type;
   XioPortal *portal;
-  atomic_t connected;
+  std::atomic<bool> connected = { false };
   entity_inst_t peer;
   struct xio_session *session;
   struct xio_connection *conn;
   pthread_spinlock_t sp;
-  atomic_t send;
-  atomic_t recv;
+  std::atomic<uint64_t> send = { 0 };
+  std::atomic<uint64_t> recv = { 0 };
   uint32_t n_reqs; // Accelio-initiated reqs in progress (!counting partials)
   uint32_t magic;
   uint32_t special_handling;
@@ -94,18 +97,18 @@ private:
     uint32_t reconnects;
    uint32_t connect_seq, peer_global_seq;
     uint64_t in_seq, out_seq_acked; // atomic, got receipt
-    atomic64_t out_seq; // atomic
+    std::atomic<uint64_t> out_seq = { 0 };
 
     lifecycle() : state(lifecycle::INIT), reconnects(0), connect_seq(0),
-                  peer_global_seq(0), in_seq(0), out_seq_acked(0),
-                  out_seq(0) {}
+                  peer_global_seq(0), in_seq(0), out_seq_acked(0)
+    {}
 
     void set_in_seq(uint64_t seq) {
       in_seq = seq;
     }
 
     uint64_t next_out_seq() {
-      return out_seq.inc();
+      return ++out_seq;
     }
 
   } state;
@@ -134,13 +137,13 @@ private:
     XioConnection *xcon;
     uint32_t protocol_version;
 
-    atomic_t session_state;
-    atomic_t startup_state;
+    std::atomic<session_states> session_state = { session_states::INIT };
+    std::atomic<session_startup_states> startup_state = { session_startup_states::IDLE };
 
     uint32_t reconnects;
     uint32_t connect_seq, global_seq, peer_global_seq;
     uint64_t in_seq, out_seq_acked; // atomic, got receipt
-    std::atomic<uint64_t> out_seq;
+    std::atomic<uint64_t> out_seq = { 0 };
 
     uint32_t flags;
 
@@ -157,7 +160,6 @@ private:
                   peer_global_seq(0),
                   in_seq(0),
                   out_seq_acked(0),
-                  out_seq(0),
                   flags(FLAG_NONE) {}
 
     uint64_t get_session_state() {
@@ -239,7 +241,7 @@ private:
   friend class XioSend;
 
   int on_disconnect_event() {
-    connected.set(false);
+    connected = false;
    pthread_spin_lock(&sp);
     discard_out_queues(CState::OP_FLAG_LOCKED);
     pthread_spin_unlock(&sp);
@@ -303,7 +305,7 @@ public:
 
   void disconnect() {
     if (is_connected()) {
-      connected.set(false);
+      connected = false;
       xio_disconnect(conn); // normal teardown will clean up conn
     }
   }
@@ -364,7 +366,7 @@ public:
   }
 
   uint64_t next_seq() {
-    return seq.inc();
+    return ++seq;
   }
 };
diff --git a/src/msg/xio/XioMessenger.cc b/src/msg/xio/XioMessenger.cc
index a63f5ffb3d69..6bf4d52c726d 100644
--- a/src/msg/xio/XioMessenger.cc
+++ b/src/msg/xio/XioMessenger.cc
@@ -30,9 +30,9 @@
 #define dout_prefix *_dout << "xio."
 
 Mutex mtx("XioMessenger Package Lock");
-atomic_t initialized;
+std::atomic<bool> initialized = { false };
 
-atomic_t XioMessenger::nInstances;
+std::atomic<uint64_t> XioMessenger::nInstances = { 0 };
 
 struct xio_mempool *xio_msgr_noreg_mpool;
@@ -235,10 +235,10 @@ static string xio_uri_from_entity(const string &type,
 } /* xio_uri_from_entity */
 
 void XioInit::package_init(CephContext *cct) {
-  if (! initialized.read()) {
+  if (! initialized) {
 
     mtx.Lock();
 
-    if (! initialized.read()) {
+    if (! initialized) {
 
       xio_init();
@@ -334,7 +334,7 @@ void XioInit::package_init(CephContext *cct) {
       xio_msgr_ops.on_cancel_request = on_cancel_request;
 
       /* mark initialized */
-      initialized.set(1);
+      initialized = true;
     }
     mtx.Unlock();
   }
@@ -352,8 +352,6 @@ XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
                            uint64_t cflags, DispatchStrategy *ds)
   : SimplePolicyMessenger(cct, name, mname, _nonce),
     XioInit(cct),
-    nsessions(0),
-    shutdown_called(false),
     portals(this, get_nportals(cflags), get_nconns_per_portal(cflags)),
     dispatch_strategy(ds),
     loop_con(new XioLoopbackConnection(this)),
@@ -374,12 +372,12 @@ XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
   dispatch_strategy->set_messenger(this);
 
   /* update class instance count */
-  nInstances.inc();
+  nInstances++;
 
   loop_con->set_features(CEPH_FEATURES_ALL);
 
   ldout(cct,2) << "Create msgr: " << this << " instance: "
-    << nInstances.read() << " type: " << name.type_str()
+    << nInstances << " type: " << name.type_str()
     << " subtype: " << mname << " nportals: " << get_nportals(cflags)
     << " nconns_per_portal: " << get_nconns_per_portal(cflags)
     << dendl;
@@ -447,13 +445,13 @@ int XioMessenger::new_session(struct xio_session *session,
                               struct xio_new_session_req *req,
                               void *cb_user_context)
 {
-  if (shutdown_called.read()) {
+  if (shutdown_called) {
     return xio_reject(
       session, XIO_E_SESSION_REFUSED, NULL /* udata */, 0 /* udata len */);
   }
   int code = portals.accept(session, req, cb_user_context);
   if (! code)
-    nsessions.inc();
+    nsessions++;
   return code;
 } /* new_session */
@@ -518,7 +516,7 @@ int XioMessenger::session_event(struct xio_session *session,
     xcona.user_context = xcon;
     (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);
 
-    xcon->connected.set(true);
+    xcon->connected = true;
 
     /* sentinel ref */
     xcon->get(); /* xcon->nref == 1 */
@@ -566,9 +564,9 @@ int XioMessenger::session_event(struct xio_session *session,
       xp_stats.dump("xio session dtor", reinterpret_cast<uint64_t>(session));
     }
     xio_session_destroy(session);
-    if (nsessions.dec() == 0) {
+    if (--nsessions == 0) {
       Mutex::Locker lck(sh_mtx);
-      if (nsessions.read() == 0)
+      if (nsessions == 0)
         sh_cond.Signal();
     }
     break;
@@ -943,7 +941,7 @@ assert(req->out.pdata_iov.nents || !nbuffers);
 
 int XioMessenger::shutdown()
 {
-  shutdown_called.set(true);
+  shutdown_called = true;
   conns_sp.lock();
   XioConnection::ConnList::iterator iter;
   iter = conns_list.begin();
@@ -951,9 +949,9 @@ int XioMessenger::shutdown()
     (void) iter->disconnect(); // XXX mark down?
   }
   conns_sp.unlock();
-  while(nsessions.read() > 0) {
+  while(nsessions > 0) {
     Mutex::Locker lck(sh_mtx);
-    if (nsessions.read() > 0)
+    if (nsessions > 0)
       sh_cond.Wait(sh_mtx);
   }
   portals.shutdown();
@@ -965,7 +963,7 @@ int XioMessenger::shutdown()
 
 ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
 {
-  if (shutdown_called.read())
+  if (shutdown_called)
     return NULL;
 
   const entity_inst_t& self_inst = get_myinst();
@@ -1020,8 +1018,8 @@ ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
       return NULL;
     }
 
-    nsessions.inc();
-    xcon->connected.set(true);
+    nsessions++;
+    xcon->connected = true;
 
     /* sentinel ref */
     xcon->get(); /* xcon->nref == 1 */
@@ -1115,7 +1113,7 @@ void XioMessenger::mark_down_on_empty(Connection* con)
   m->tag = XIO_NOP_TAG_MARKDOWN;
   m->set_completion_hook(pool_alloc_markdown_hook(xcon, m));
   // stall new messages
-  xcon->cstate.session_state.set(XioConnection::BARRIER);
+  xcon->cstate.session_state = XioConnection::session_states::BARRIER;
   (void) _send_message_impl(m, xcon);
 }
@@ -1135,5 +1133,5 @@ void XioMessenger::try_insert(XioConnection *xcon)
 XioMessenger::~XioMessenger()
 {
   delete dispatch_strategy;
-  nInstances.dec();
+  nInstances--;
 } /* dtor */
diff --git a/src/msg/xio/XioMessenger.h b/src/msg/xio/XioMessenger.h
index 0bfdd6e78017..9a81fb2473a1 100644
--- a/src/msg/xio/XioMessenger.h
+++ b/src/msg/xio/XioMessenger.h
@@ -17,13 +17,16 @@
 #define XIO_MESSENGER_H
 
 #include "msg/SimplePolicyMessenger.h"
+
+#include <atomic>
+
 extern "C" {
 #include "libxio.h"
 }
+
 #include "XioConnection.h"
 #include "XioPortal.h"
 #include "QueueStrategy.h"
-#include "include/atomic.h"
 #include "common/Thread.h"
 #include "common/Mutex.h"
 #include "include/Spinlock.h"
@@ -41,9 +44,9 @@ protected:
 class XioMessenger : public SimplePolicyMessenger, XioInit
 {
 private:
-  static atomic_t nInstances;
-  atomic_t nsessions;
-  atomic_t shutdown_called;
+  static std::atomic<uint64_t> nInstances;
+  std::atomic<uint64_t> nsessions = { 0 };
+  std::atomic<bool> shutdown_called = { false };
   Spinlock conns_sp;
   XioConnection::ConnList conns_list;
   XioConnection::EntitySet conns_entity_map;
diff --git a/src/msg/xio/XioMsg.h b/src/msg/xio/XioMsg.h
index 73d210df2da0..f85950ebc402 100644
--- a/src/msg/xio/XioMsg.h
+++ b/src/msg/xio/XioMsg.h
@@ -201,10 +201,10 @@ public:
       xcon->get();
   }
 
-  XioSend* get() { nrefs.inc(); return this; };
+  XioSend* get() { nrefs++; return this; };
 
   void put(int n) {
-    int refs = nrefs.sub(n);
+    int refs = nrefs -= n;
     if (refs == 0) {
       struct xio_reg_mem *mp = &this->mp_this;
       this->~XioSend();
@@ -228,7 +228,7 @@ public:
 private:
   xio_msg_ex req_0;
   struct xio_reg_mem mp_this;
-  atomic_t nrefs;
+  std::atomic<unsigned> nrefs = { 0 };
 };
 
 class XioCommand : public XioSend
@@ -316,7 +316,7 @@ private:
   XioConnection *xcon;
   XioInSeq msg_seq;
   XioPool rsp_pool;
-  atomic_t nrefs;
+  std::atomic<unsigned> nrefs = { 1 };
   bool cl_flag;
   friend class XioConnection;
   friend class XioMessenger;
@@ -329,7 +329,6 @@ public:
                   xcon(_xcon->get()),
                   msg_seq(_msg_seq),
                   rsp_pool(xio_msgr_noreg_mpool),
-                  nrefs(1),
                   cl_flag(false),
                   mp_this(_mp)
   {
@@ -348,11 +347,11 @@ public:
   int release_msgs();
 
   XioDispatchHook* get() {
-    nrefs.inc(); return this;
+    nrefs++; return this;
   }
 
   void put(int n = 1) {
-    int refs = nrefs.sub(n);
+    int refs = nrefs -= n;
     if (refs == 0) {
       /* in Marcus' new system, refs reaches 0 twice: once in
        * Message lifecycle, and again after xio_release_msg.
diff --git a/src/msg/xio/XioPool.h b/src/msg/xio/XioPool.h
index f7c950fb434d..6084ce856824 100644
--- a/src/msg/xio/XioPool.h
+++ b/src/msg/xio/XioPool.h
@@ -14,23 +14,22 @@
 #ifndef XIO_POOL_H
 #define XIO_POOL_H
 
+#include <atomic>
+#include <vector>
+#include <cstdlib>
+#include <cstring>
+#include <cstdint>
+
 extern "C" {
-#include <stdlib.h>
-#include <string.h>
-#include <stdint.h>
 #include "libxio.h"
 }
 
-#include <vector>
-#include "include/atomic.h"
-#include "common/likely.h"
+#include "common/likely.h"
 
 static inline int xpool_alloc(struct xio_mempool *pool, uint64_t size,
                               struct xio_reg_mem* mp);
 static inline void xpool_free(uint64_t size, struct xio_reg_mem* mp);
 
-using ceph::atomic_t;
-
 class XioPool
 {
 private:
@@ -95,84 +94,81 @@ private:
     NUM_SLABS,
   };
 
-  atomic_t ctr_set[NUM_SLABS];
-
-  atomic_t msg_cnt; // send msgs
-  atomic_t hook_cnt; // recv msgs
+  std::atomic<unsigned> ctr_set[NUM_SLABS] = {};
+  std::atomic<unsigned> msg_cnt = { 0 };  // send msgs
+  std::atomic<unsigned> hook_cnt = { 0 }; // recv msgs
 
 public:
-  XioPoolStats() : msg_cnt(0), hook_cnt(0) {
-    for (int ix = 0; ix < NUM_SLABS; ++ix) {
-      ctr_set[ix].set(0);
-    }
-  }
-
   void dump(const char* tag, uint64_t serial);
 
   void inc(uint64_t size) {
     if (size <= 64) {
-      (ctr_set[SLAB_64]).inc();
+      (ctr_set[SLAB_64])++;
       return;
     }
     if (size <= 256) {
-      (ctr_set[SLAB_256]).inc();
+      (ctr_set[SLAB_256])++;
       return;
     }
     if (size <= 1024) {
-      (ctr_set[SLAB_1024]).inc();
+      (ctr_set[SLAB_1024])++;
       return;
     }
     if (size <= 8192) {
-      (ctr_set[SLAB_PAGE]).inc();
+      (ctr_set[SLAB_PAGE])++;
       return;
     }
-    (ctr_set[SLAB_MAX]).inc();
+    (ctr_set[SLAB_MAX])++;
   }
 
   void dec(uint64_t size) {
     if (size <= 64) {
-      (ctr_set[SLAB_64]).dec();
+      (ctr_set[SLAB_64])--;
       return;
     }
     if (size <= 256) {
-      (ctr_set[SLAB_256]).dec();
+      (ctr_set[SLAB_256])--;
       return;
     }
     if (size <= 1024) {
-      (ctr_set[SLAB_1024]).dec();
+      (ctr_set[SLAB_1024])--;
      return;
     }
     if (size <= 8192) {
-      (ctr_set[SLAB_PAGE]).dec();
+      (ctr_set[SLAB_PAGE])--;
       return;
     }
-    (ctr_set[SLAB_MAX]).dec();
+    (ctr_set[SLAB_MAX])--;
   }
 
-  void inc_overflow() { ctr_set[SLAB_OVERFLOW].inc(); }
-  void dec_overflow() { ctr_set[SLAB_OVERFLOW].dec(); }
+  void inc_overflow() { ctr_set[SLAB_OVERFLOW]++; }
+  void dec_overflow() { ctr_set[SLAB_OVERFLOW]--; }
 
   void inc_msgcnt() {
     if (unlikely(XioPool::trace_msgcnt)) {
-      msg_cnt.inc();
+      msg_cnt++;
     }
   }
 
   void dec_msgcnt() {
     if (unlikely(XioPool::trace_msgcnt)) {
-      msg_cnt.dec();
+      msg_cnt--;
     }
   }
 
   void inc_hookcnt() {
     if (unlikely(XioPool::trace_msgcnt)) {
-      hook_cnt.inc();
+      hook_cnt++;
     }
   }
 
   void dec_hookcnt() {
     if (unlikely(XioPool::trace_msgcnt)) {
-      hook_cnt.dec();
+      hook_cnt--;
     }
   }
+
+  // operator forms of inc()/dec(), used by xpool_alloc()/xpool_free() below
+  XioPoolStats& operator+=(uint64_t size) { inc(size); return *this; }
+  XioPoolStats& operator-=(uint64_t size) { dec(size); return *this; }
 };
@@ -186,7 +182,7 @@ static inline int xpool_alloc(struct xio_mempool *pool, uint64_t size,
   int r = xio_mempool_alloc(pool, size, mp);
   if (r == 0) {
     if (unlikely(XioPool::trace_mempool))
-      xp_stats.inc(size);
+      xp_stats += size;
     return 0;
   }
   // fall back to malloc on errors
@@ -202,7 +198,7 @@ static inline void xpool_free(uint64_t size, struct xio_reg_mem* mp)
 {
   if (mp->length) {
     if (unlikely(XioPool::trace_mempool))
-      xp_stats.dec(size);
+      xp_stats -= size;
     xio_mempool_free(mp);
   } else { // from malloc
     if (unlikely(XioPool::trace_mempool))
-- 
2.47.3
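
P.S. A note for reviewers on why the operator forms above are the faithful
translations: the conversions in this patch assume that the old ceph::atomic_t
methods inc(), dec(), and sub() returned the *new* value (that is what makes
`if (nsessions.dec() == 0)` become `if (--nsessions == 0)`), and std::atomic's
++, --, and -= operators do the same, whereas fetch_add()/fetch_sub() return
the *old* value. A minimal standalone sketch of the mapping; the variable
names here are illustrative only, not from the tree:

    #include <atomic>
    #include <cstdint>

    int main() {
      // old: atomic_t refs; refs.set(1);
      std::atomic<uint64_t> refs = { 1 };

      // old: refs.inc() returned the incremented value -> pre-increment
      uint64_t after_inc = ++refs;         // 2

      // old: refs.sub(2) returned the decremented value -> compound -=
      uint64_t after_sub = (refs -= 2);    // 0

      // old: refs.read() -> load() (or the implicit conversion to uint64_t)
      uint64_t now = refs.load();

      return (after_inc == 2 && after_sub == 0 && now == 0) ? 0 : 1;
    }

The operator forms also default to std::memory_order_seq_cst, the strongest
ordering std::atomic offers, so none of the converted call sites end up with
weaker ordering than the old wrapper provided.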