#include "Finisher.h"
+#include "common/debug.h"
+
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
#define dout_subsys ceph_subsys_finisher
#undef dout_prefix
#define dout_prefix *_dout << "finisher(" << this << ") "
#ifndef CEPH_FINISHER_H
#define CEPH_FINISHER_H
+#include "common/Mutex.h"
#include "common/Cond.h"
#include "common/perf_counters.h"
bool healthy = true;
time_t was;
- was = h->timeout.read();
+ was = h->timeout;
if (was && was < now) {
ldout(m_cct, 1) << who << " '" << h->name << "'"
<< " had timed out after " << h->grace << dendl;
healthy = false;
}
- was = h->suicide_timeout.read();
+ was = h->suicide_timeout;
if (was && was < now) {
ldout(m_cct, 1) << who << " '" << h->name << "'"
<< " had suicide timed out after " << h->suicide_grace << dendl;
time_t now = time(NULL);
_check(h, "reset_timeout", now);
- h->timeout.set(now + grace);
+ h->timeout = now + grace;
h->grace = grace;
if (suicide_grace)
- h->suicide_timeout.set(now + suicide_grace);
+ h->suicide_timeout = now + suicide_grace;
else
- h->suicide_timeout.set(0);
+ h->suicide_timeout = 0;
h->suicide_grace = suicide_grace;
}
ldout(m_cct, 20) << "clear_timeout '" << h->name << "'" << dendl;
time_t now = time(NULL);
_check(h, "clear_timeout", now);
- h->timeout.set(0);
- h->suicide_timeout.set(0);
+ h->timeout = 0;
+ h->suicide_timeout = 0;
}
bool HeartbeatMap::is_healthy()
}
m_rwlock.put_read();
- m_unhealthy_workers.set(unhealthy);
- m_total_workers.set(total);
+ m_unhealthy_workers = unhealthy;
+ m_total_workers = total;
ldout(m_cct, 20) << "is_healthy = " << (healthy ? "healthy" : "NOT HEALTHY")
<< ", total workers: " << total << ", number of unhealthy: " << unhealthy << dendl;
int HeartbeatMap::get_unhealthy_workers() const
{
- return m_unhealthy_workers.read();
+ return m_unhealthy_workers;
}
int HeartbeatMap::get_total_workers() const
{
- return m_total_workers.read();
+ return m_total_workers;
}
void HeartbeatMap::check_touch_file()
#define CEPH_HEARTBEATMAP_H
#include <list>
+#include <atomic>
+#include <string>
+
+#include <pthread.h>
-#include "include/atomic.h"
#include "RWLock.h"
class CephContext;
struct heartbeat_handle_d {
const std::string name;
pthread_t thread_id;
- atomic_t timeout, suicide_timeout;
+ std::atomic<unsigned> timeout = { 0 }, suicide_timeout = { 0 };
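+  // (std::atomic operations default to memory_order_seq_cst, so the plain
+  // loads/stores replacing atomic_t's read()/set() are no weaker than before)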
time_t grace, suicide_grace;
std::list<heartbeat_handle_d*>::iterator list_item;
RWLock m_rwlock;
time_t m_inject_unhealthy_until;
std::list<heartbeat_handle_d*> m_workers;
- atomic_t m_unhealthy_workers;
- atomic_t m_total_workers;
+ std::atomic<unsigned> m_unhealthy_workers = { 0 };
+ std::atomic<unsigned> m_total_workers = { 0 };
bool _check(const heartbeat_handle_d *h, const char *who, time_t now);
};
#include <poll.h>
#include <sys/un.h>
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
#define dout_subsys ceph_subsys_asok
#undef dout_prefix
#define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
#ifndef QUEUE_RING_H
#define QUEUE_RING_H
-#include <list>
-#include <vector>
#include "common/Mutex.h"
#include "common/Cond.h"
-
+#include <list>
+#include <atomic>
+#include <vector>
template <class T>
class QueueRing {
std::vector<QueueBucket> buckets;
int num_buckets;
- atomic_t cur_read_bucket;
- atomic_t cur_write_bucket;
+
+ std::atomic<int64_t> cur_read_bucket = { 0 };
+ std::atomic<int64_t> cur_write_bucket = { 0 };
+
public:
QueueRing(int n) : buckets(n), num_buckets(n) {
}
void enqueue(const T& entry) {
- buckets[cur_write_bucket.inc() % num_buckets].enqueue(entry);
+ buckets[++cur_write_bucket % num_buckets].enqueue(entry);
};
void dequeue(T *entry) {
- buckets[cur_read_bucket.inc() % num_buckets].dequeue(entry);
+ buckets[++cur_read_bucket % num_buckets].dequeue(entry);
}
};
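+
+// Illustrative use (sketch only, not part of this change): QueueRing spreads
+// lock contention across N independently locked buckets; producers and
+// consumers advance their own atomic cursors, so the k-th enqueue and the
+// k-th dequeue always meet in the same bucket:
+//
+//   QueueRing<int> qr(4);  // 4 buckets
+//   qr.enqueue(7);         // ++cur_write_bucket == 1 -> buckets[1]
+//   int v;
+//   qr.dequeue(&v);        // ++cur_read_bucket  == 1 -> buckets[1], v == 7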
#include <string>
#include <include/assert.h>
#include "lockdep.h"
-#include "include/atomic.h"
#include "common/valgrind.h"
+#include <atomic>
+
class RWLock final
{
mutable pthread_rwlock_t L;
std::string name;
mutable int id;
- mutable atomic_t nrlock, nwlock;
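+  // lock-state counters backing the is_locked()/is_wlocked() assertions;
+  // atomic because multiple concurrent readers update nrlock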
+ mutable std::atomic<unsigned> nrlock = { 0 }, nwlock = { 0 };
bool track, lockdep;
std::string unique_name(const char* name) const;
const RWLock& operator=(const RWLock& other) = delete;
RWLock(const std::string &n, bool track_lock=true, bool ld=true, bool prioritize_write=false)
- : name(n), id(-1), nrlock(0), nwlock(0), track(track_lock),
+ : name(n), id(-1), track(track_lock),
lockdep(ld) {
#if defined(PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
if (prioritize_write) {
bool is_locked() const {
assert(track);
- return (nrlock.read() > 0) || (nwlock.read() > 0);
+ return (nrlock > 0) || (nwlock > 0);
}
bool is_wlocked() const {
assert(track);
- return (nwlock.read() > 0);
+ return (nwlock > 0);
}
~RWLock() {
// The following check is racy but we are about to destroy
void unlock(bool lockdep=true) const {
if (track) {
- if (nwlock.read() > 0) {
- nwlock.dec();
+ if (nwlock > 0) {
+ nwlock--;
} else {
- assert(nrlock.read() > 0);
- nrlock.dec();
+ assert(nrlock > 0);
+ nrlock--;
}
}
if (lockdep && this->lockdep && g_lockdep)
assert(r == 0);
if (lockdep && g_lockdep) id = lockdep_locked(name.c_str(), id);
if (track)
- nrlock.inc();
+ nrlock++;
}
bool try_get_read() const {
if (pthread_rwlock_tryrdlock(&L) == 0) {
if (track)
- nrlock.inc();
+ nrlock++;
if (lockdep && g_lockdep) id = lockdep_locked(name.c_str(), id);
return true;
}
if (lockdep && this->lockdep && g_lockdep)
id = lockdep_locked(name.c_str(), id);
if (track)
- nwlock.inc();
+ nwlock++;
}
bool try_get_write(bool lockdep=true) {
if (lockdep && this->lockdep && g_lockdep)
id = lockdep_locked(name.c_str(), id);
if (track)
- nwlock.inc();
+ nwlock++;
return true;
}
return false;
#include "common/Mutex.h"
#include "common/Cond.h"
+#include "common/ceph_context.h"
#include "common/valgrind.h"
// re-include our assert to clobber the system one; fix dout:
struct RefCountedObject {
private:
- mutable atomic_t nref;
+ mutable std::atomic<uint64_t> nref;
CephContext *cct;
public:
RefCountedObject(CephContext *c = NULL, int n=1) : nref(n), cct(c) {}
virtual ~RefCountedObject() {
- assert(nref.read() == 0);
+ assert(nref == 0);
}
const RefCountedObject *get() const {
- int v = nref.inc();
+ int v = ++nref;
if (cct)
lsubdout(cct, refs, 1) << "RefCountedObject::get " << this << " "
<< (v - 1) << " -> " << v
return this;
}
RefCountedObject *get() {
- int v = nref.inc();
+ int v = ++nref;
if (cct)
lsubdout(cct, refs, 1) << "RefCountedObject::get " << this << " "
<< (v - 1) << " -> " << v
}
void put() const {
CephContext *local_cct = cct;
- int v = nref.dec();
+ int v = --nref;
if (v == 0) {
ANNOTATE_HAPPENS_AFTER(&nref);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&nref);
}
uint64_t get_nref() const {
- return nref.read();
+ return nref;
}
};
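+
+// Illustrative use (sketch only, not part of this change):
+//
+//   struct Foo : RefCountedObject {
+//     explicit Foo(CephContext *c) : RefCountedObject(c) {}
+//   };
+//   Foo *f = new Foo(cct);  // starts with nref == 1
+//   f->get();               // nref == 2
+//   f->put();               // nref == 1
+//   f->put();               // nref == 0, put() deletes the object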
*
*/
struct RefCountedWaitObject {
- atomic_t nref;
+ std::atomic<uint64_t> nref = { 1 };
RefCountedCond *c;
- RefCountedWaitObject() : nref(1) {
+ RefCountedWaitObject() {
c = new RefCountedCond;
}
virtual ~RefCountedWaitObject() {
}
RefCountedWaitObject *get() {
- nref.inc();
+ nref++;
return this;
}
bool ret = false;
RefCountedCond *cond = c;
cond->get();
- if (nref.dec() == 0) {
+ if (--nref == 0) {
cond->done();
delete this;
ret = true;
RefCountedCond *cond = c;
cond->get();
- if (nref.dec() == 0) {
+ if (--nref == 0) {
cond->done();
delete this;
} else {
#include "common/Throttle.h"
#include "common/perf_counters.h"
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
#define dout_subsys ceph_subsys_throttle
#undef dout_prefix
logger = b.create_perf_counters();
cct->get_perfcounters_collection()->add(logger);
- logger->set(l_throttle_max, max.read());
+ logger->set(l_throttle_max, max);
}
}
void Throttle::_reset_max(int64_t m)
{
assert(lock.is_locked());
- if ((int64_t)max.read() == m)
+ if (static_cast<int64_t>(max) == m)
return;
if (!cond.empty())
cond.front()->SignalOne();
if (logger)
logger->set(l_throttle_max, m);
- max.set((size_t)m);
+ max = m;
}
bool Throttle::_wait(int64_t c)
bool Throttle::wait(int64_t m)
{
- if (0 == max.read() && 0 == m) {
+ if (0 == max && 0 == m) {
return false;
}
int64_t Throttle::take(int64_t c)
{
- if (0 == max.read()) {
+ if (0 == max) {
return 0;
}
assert(c >= 0);
ldout(cct, 10) << "take " << c << dendl;
{
Mutex::Locker l(lock);
- count.add(c);
+ count += c;
}
if (logger) {
logger->inc(l_throttle_take);
logger->inc(l_throttle_take_sum, c);
- logger->set(l_throttle_val, count.read());
+ logger->set(l_throttle_val, count);
}
- return count.read();
+ return count;
}
bool Throttle::get(int64_t c, int64_t m)
{
- if (0 == max.read() && 0 == m) {
+ if (0 == max && 0 == m) {
return false;
}
assert(c >= 0);
- ldout(cct, 10) << "get " << c << " (" << count.read() << " -> " << (count.read() + c) << ")" << dendl;
+ ldout(cct, 10) << "get " << c << " (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
if (logger) {
logger->inc(l_throttle_get_started);
}
_reset_max(m);
}
waited = _wait(c);
- count.add(c);
+ count += c;
}
if (logger) {
logger->inc(l_throttle_get);
logger->inc(l_throttle_get_sum, c);
- logger->set(l_throttle_val, count.read());
+ logger->set(l_throttle_val, count);
}
return waited;
}
*/
bool Throttle::get_or_fail(int64_t c)
{
- if (0 == max.read()) {
+ if (0 == max) {
return true;
}
}
return false;
} else {
- ldout(cct, 10) << "get_or_fail " << c << " success (" << count.read() << " -> " << (count.read() + c) << ")" << dendl;
- count.add(c);
+ ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
+ count += c;
if (logger) {
logger->inc(l_throttle_get_or_fail_success);
logger->inc(l_throttle_get);
logger->inc(l_throttle_get_sum, c);
- logger->set(l_throttle_val, count.read());
+ logger->set(l_throttle_val, count);
}
return true;
}
int64_t Throttle::put(int64_t c)
{
- if (0 == max.read()) {
+ if (0 == max) {
return 0;
}
assert(c >= 0);
- ldout(cct, 10) << "put " << c << " (" << count.read() << " -> " << (count.read()-c) << ")" << dendl;
+ ldout(cct, 10) << "put " << c << " (" << count.load() << " -> " << (count.load()-c) << ")" << dendl;
Mutex::Locker l(lock);
if (c) {
if (!cond.empty())
cond.front()->SignalOne();
- assert(((int64_t)count.read()) >= c); //if count goes negative, we failed somewhere!
- count.sub(c);
+ assert(static_cast<int64_t>(count) >= c); // if count goes negative, we failed somewhere!
+ count -= c;
if (logger) {
logger->inc(l_throttle_put);
logger->inc(l_throttle_put_sum, c);
- logger->set(l_throttle_val, count.read());
+ logger->set(l_throttle_val, count);
}
}
- return count.read();
+ return count;
}
void Throttle::reset()
Mutex::Locker l(lock);
if (!cond.empty())
cond.front()->SignalOne();
- count.set(0);
+ count = 0;
if (logger) {
logger->set(l_throttle_val, 0);
}
#ifndef CEPH_THROTTLE_H
#define CEPH_THROTTLE_H
-#include "Cond.h"
+#include <map>
+#include <list>
+#include <chrono>
+#include <atomic>
+#include <iostream>
#include <condition_variable>
+#include "Cond.h"
+#include "include/Context.h"
+
class CephContext;
class PerfCounters;
CephContext *cct;
const std::string name;
PerfCounters *logger;
- ceph::atomic_t count, max;
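+  // signed 64-bit to match Throttle's int64_t API (cf. _reset_max() and
+  // put()'s underflow assert); avoids unsigned wrap in the comparisons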
+  std::atomic<int64_t> count = { 0 }, max = { 0 };
Mutex lock;
list<Cond*> cond;
const bool use_perf;
private:
void _reset_max(int64_t m);
bool _should_wait(int64_t c) const {
- int64_t m = max.read();
- int64_t cur = count.read();
+ int64_t m = max;
+ int64_t cur = count;
return
m &&
((c <= m && cur + c > m) || // normally stay under max
* @returns the number of taken slots
*/
int64_t get_current() const {
- return count.read();
+ return count;
}
/**
* get the max number of slots
* @returns the max number of slots
*/
- int64_t get_max() const { return max.read(); }
+ int64_t get_max() const { return max; }
/**
* return true if past midpoint
*/
bool past_midpoint() const {
- return count.read() >= max.read() / 2;
+ return count >= max / 2;
}
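+
+  // Illustrative use (sketch only, not part of this change; assumes the
+  // usual Throttle(cct, name, max) constructor):
+  //
+  //   Throttle t(cct, "inflight-bytes", 1 << 20);  // cap at 1 MiB in flight
+  //   t.get(4096);   // blocks while 4096 more would exceed the cap
+  //   /* ...bounded work... */
+  //   t.put(4096);   // release; wakes the front waiter, if any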
/**
if (!tracking_enabled)
return false;
- uint64_t current_seq = seq.inc();
+ uint64_t current_seq = ++seq;
uint32_t shard_index = current_seq % num_optracker_shards;
ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
assert(NULL != sdata);
struct ShardedTrackingData;
class OpTracker {
friend class OpHistory;
- atomic64_t seq;
+  std::atomic<uint64_t> seq = { 0 };
vector<ShardedTrackingData*> sharded_in_flight_list;
uint32_t num_optracker_shards;
OpHistory history;
lockname(name + "::lock"),
shardedpool_lock(lockname.c_str()),
num_threads(pnum_threads),
- stop_threads(0),
- pause_threads(0),
- drain_threads(0),
num_paused(0),
num_drained(0),
wq(NULL) {}
ss << name << " thread " << name;
heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
- while (!stop_threads.read()) {
- if(pause_threads.read()) {
+ while (!stop_threads) {
+ if (pause_threads) {
shardedpool_lock.Lock();
++num_paused;
wait_cond.Signal();
- while(pause_threads.read()) {
+ while (pause_threads) {
cct->get_heartbeat_map()->reset_timeout(
- hb,
- wq->timeout_interval, wq->suicide_interval);
+ hb,
+ wq->timeout_interval, wq->suicide_interval);
shardedpool_cond.WaitInterval(shardedpool_lock,
- utime_t(
+ utime_t(
cct->_conf->threadpool_empty_queue_max_wait, 0));
}
--num_paused;
shardedpool_lock.Unlock();
}
- if (drain_threads.read()) {
+ if (drain_threads) {
shardedpool_lock.Lock();
if (wq->is_shard_empty(thread_index)) {
++num_drained;
wait_cond.Signal();
- while (drain_threads.read()) {
+ while (drain_threads) {
cct->get_heartbeat_map()->reset_timeout(
hb,
wq->timeout_interval, wq->suicide_interval);
void ShardedThreadPool::stop()
{
ldout(cct,10) << "stop" << dendl;
- stop_threads.set(1);
+ stop_threads = true;
assert(wq != NULL);
wq->return_waiting_threads();
for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
{
ldout(cct,10) << "pause" << dendl;
shardedpool_lock.Lock();
- pause_threads.set(1);
+ pause_threads = true;
assert(wq != NULL);
wq->return_waiting_threads();
while (num_threads != num_paused){
{
ldout(cct,10) << "pause_new" << dendl;
shardedpool_lock.Lock();
- pause_threads.set(1);
+ pause_threads = true;
assert(wq != NULL);
wq->return_waiting_threads();
shardedpool_lock.Unlock();
{
ldout(cct,10) << "unpause" << dendl;
shardedpool_lock.Lock();
- pause_threads.set(0);
+ pause_threads = false;
shardedpool_cond.Signal();
shardedpool_lock.Unlock();
ldout(cct,10) << "unpaused" << dendl;
{
ldout(cct,10) << "drain" << dendl;
shardedpool_lock.Lock();
- drain_threads.set(1);
+ drain_threads = true;
assert(wq != NULL);
wq->return_waiting_threads();
while (num_threads != num_drained) {
wait_cond.Wait(shardedpool_lock);
}
- drain_threads.set(0);
+ drain_threads = false;
shardedpool_cond.Signal();
shardedpool_lock.Unlock();
ldout(cct,10) << "drained" << dendl;
#include "include/unordered_map.h"
#include "common/HeartbeatMap.h"
+#include <atomic>
+
class CephContext;
/// Pool of threads that share work submitted to multiple work queues.
Cond shardedpool_cond;
Cond wait_cond;
uint32_t num_threads;
- atomic_t stop_threads;
- atomic_t pause_threads;
- atomic_t drain_threads;
+
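+  // lifecycle flags polled by the worker threads' main loop; the default
+  // seq_cst semantics of std::atomic<bool> are sufficient here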
+ std::atomic<bool> stop_threads = { false };
+ std::atomic<bool> pause_threads = { false };
+ std::atomic<bool> drain_threads = { false };
+
uint32_t num_paused;
uint32_t num_drained;
#include <poll.h>
#include <sys/un.h>
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
#define dout_subsys ceph_subsys_asok
#undef dout_prefix
#define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
*
*/
+#include <atomic>
+#include <errno.h>
+#include <limits.h>
+
+#include <sys/uio.h>
+
#include "include/compat.h"
#include "include/mempool.h"
#include "armor.h"
#include "msg/xio/XioMsg.h"
#endif
+using namespace ceph;
+
#define CEPH_BUFFER_ALLOC_UNIT (MIN(CEPH_PAGE_SIZE, 4096))
#define CEPH_BUFFER_APPEND_SIZE (CEPH_BUFFER_ALLOC_UNIT - sizeof(raw_combined))
# define bendl std::endl; }
#endif
- static atomic_t buffer_total_alloc;
- static atomic64_t buffer_history_alloc_bytes;
- static atomic64_t buffer_history_alloc_num;
+ static std::atomic<uint64_t> buffer_total_alloc { 0 };
+ static std::atomic<uint64_t> buffer_history_alloc_bytes { 0 };
+ static std::atomic<uint64_t> buffer_history_alloc_num { 0 };
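+  // debug accounting, updated only when CEPH_BUFFER_TRACK is set (see
+  // buffer_track_alloc below), so the common path skips the atomic ops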
+
const bool buffer_track_alloc = get_env_bool("CEPH_BUFFER_TRACK");
namespace {
void inc_total_alloc(unsigned len) {
if (buffer_track_alloc)
- buffer_total_alloc.add(len);
+ buffer_total_alloc += len;
}
void dec_total_alloc(unsigned len) {
if (buffer_track_alloc)
- buffer_total_alloc.sub(len);
+ buffer_total_alloc -= len;
}
void inc_history_alloc(uint64_t len) {
if (buffer_track_alloc) {
- buffer_history_alloc_bytes.add(len);
- buffer_history_alloc_num.inc();
+ buffer_history_alloc_bytes += len;
+ buffer_history_alloc_num++;
}
}
- }
-
+ } // namespace
int buffer::get_total_alloc() {
- return buffer_total_alloc.read();
+ return buffer_total_alloc;
}
uint64_t buffer::get_history_alloc_bytes() {
- return buffer_history_alloc_bytes.read();
+ return buffer_history_alloc_bytes;
}
uint64_t buffer::get_history_alloc_num() {
- return buffer_history_alloc_num.read();
+ return buffer_history_alloc_num;
}
- static atomic_t buffer_cached_crc;
- static atomic_t buffer_cached_crc_adjusted;
- static atomic_t buffer_missed_crc;
+ static std::atomic<unsigned> buffer_cached_crc { 0 };
+ static std::atomic<unsigned> buffer_cached_crc_adjusted { 0 };
+ static std::atomic<unsigned> buffer_missed_crc { 0 };
+
static bool buffer_track_crc = get_env_bool("CEPH_BUFFER_TRACK");
void buffer::track_cached_crc(bool b) {
buffer_track_crc = b;
}
int buffer::get_cached_crc() {
- return buffer_cached_crc.read();
+ return buffer_cached_crc;
}
int buffer::get_cached_crc_adjusted() {
- return buffer_cached_crc_adjusted.read();
+ return buffer_cached_crc_adjusted;
}
int buffer::get_missed_crc() {
- return buffer_missed_crc.read();
+ return buffer_missed_crc;
}
- static atomic_t buffer_c_str_accesses;
+ static std::atomic<unsigned> buffer_c_str_accesses { 0 };
+
static bool buffer_track_c_str = get_env_bool("CEPH_BUFFER_TRACK");
void buffer::track_c_str(bool b) {
buffer_track_c_str = b;
}
int buffer::get_c_str_accesses() {
- return buffer_c_str_accesses.read();
+ return buffer_c_str_accesses;
}
- static atomic_t buffer_max_pipe_size;
+ static std::atomic<unsigned> buffer_max_pipe_size { 0 };
int update_max_pipe_size() {
#ifdef CEPH_HAVE_SETPIPE_SZ
char buf[32];
size_t size = strict_strtol(buf, 10, &err);
if (!err.empty())
return -EIO;
- buffer_max_pipe_size.set(size);
+ buffer_max_pipe_size = size;
#endif
return 0;
}
size_t get_max_pipe_size() {
#ifdef CEPH_HAVE_SETPIPE_SZ
- size_t size = buffer_max_pipe_size.read();
+ size_t size = buffer_max_pipe_size;
if (size)
return size;
if (update_max_pipe_size() == 0)
- return buffer_max_pipe_size.read();
+ return buffer_max_pipe_size;
#endif
// this is the max size hardcoded in linux before 2.6.35
return 65536;
public:
char *data;
unsigned len;
- atomic_t nref;
+ std::atomic<unsigned> nref { 0 };
int mempool = mempool::mempool_buffer_anon;
mutable std::atomic_flag crc_spinlock = ATOMIC_FLAG_INIT;
buffer::ptr::ptr(raw *r) : _raw(r), _off(0), _len(r->len) // no lock needed; this is an unref raw.
{
- r->nref.inc();
+ r->nref++;
bdout << "ptr " << this << " get " << _raw << bendl;
}
buffer::ptr::ptr(unsigned l) : _off(0), _len(l)
{
_raw = create(l);
- _raw->nref.inc();
+ _raw->nref++;
bdout << "ptr " << this << " get " << _raw << bendl;
}
buffer::ptr::ptr(const char *d, unsigned l) : _off(0), _len(l) // ditto.
{
_raw = copy(d, l);
- _raw->nref.inc();
+ _raw->nref++;
bdout << "ptr " << this << " get " << _raw << bendl;
}
buffer::ptr::ptr(const ptr& p) : _raw(p._raw), _off(p._off), _len(p._len)
{
if (_raw) {
- _raw->nref.inc();
+ _raw->nref++;
bdout << "ptr " << this << " get " << _raw << bendl;
}
}
{
assert(o+l <= p._len);
assert(_raw);
- _raw->nref.inc();
+ _raw->nref++;
bdout << "ptr " << this << " get " << _raw << bendl;
}
buffer::ptr& buffer::ptr::operator= (const ptr& p)
{
if (p._raw) {
- p._raw->nref.inc();
+ p._raw->nref++;
bdout << "ptr " << this << " get " << _raw << bendl;
}
buffer::raw *raw = p._raw;
if (_raw && !_raw->is_shareable()) {
buffer::raw *tr = _raw;
_raw = tr->clone();
- _raw->nref.set(1);
- if (unlikely(tr->nref.dec() == 0)) {
+ _raw->nref = 1;
+ if (unlikely(--tr->nref == 0)) {
ANNOTATE_HAPPENS_AFTER(&tr->nref);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&tr->nref);
delete tr;
{
if (_raw) {
bdout << "ptr " << this << " release " << _raw << bendl;
- if (_raw->nref.dec() == 0) {
+ if (--_raw->nref == 0) {
//cout << "hosing raw " << (void*)_raw << " len " << _raw->len << std::endl;
ANNOTATE_HAPPENS_AFTER(&_raw->nref);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&_raw->nref);
const char *buffer::ptr::c_str() const {
assert(_raw);
if (buffer_track_c_str)
- buffer_c_str_accesses.inc();
+ buffer_c_str_accesses++;
return _raw->get_data() + _off;
}
char *buffer::ptr::c_str() {
assert(_raw);
if (buffer_track_c_str)
- buffer_c_str_accesses.inc();
+ buffer_c_str_accesses++;
return _raw->get_data() + _off;
}
const char *buffer::ptr::end_c_str() const {
assert(_raw);
if (buffer_track_c_str)
- buffer_c_str_accesses.inc();
+ buffer_c_str_accesses++;
return _raw->get_data() + _off + _len;
}
char *buffer::ptr::end_c_str() {
assert(_raw);
if (buffer_track_c_str)
- buffer_c_str_accesses.inc();
+ buffer_c_str_accesses++;
return _raw->get_data() + _off + _len;
}
const char *buffer::ptr::raw_c_str() const { assert(_raw); return _raw->data; }
unsigned buffer::ptr::raw_length() const { assert(_raw); return _raw->len; }
- int buffer::ptr::raw_nref() const { assert(_raw); return _raw->nref.read(); }
+ int buffer::ptr::raw_nref() const { assert(_raw); return _raw->nref; }
void buffer::ptr::copy_out(unsigned o, unsigned l, char *dest) const {
assert(_raw);
// got it already
crc = ccrc.second;
if (buffer_track_crc)
- buffer_cached_crc.inc();
+ buffer_cached_crc++;
} else {
/* If we have cached crc32c(buf, v) for initial value v,
* we can convert this to a different initial value v' by:
*/
crc = ccrc.second ^ ceph_crc32c(ccrc.first ^ crc, NULL, it->length());
if (buffer_track_crc)
- buffer_cached_crc_adjusted.inc();
+ buffer_cached_crc_adjusted++;
}
} else {
if (buffer_track_crc)
- buffer_missed_crc.inc();
+ buffer_missed_crc++;
uint32_t base = crc;
crc = ceph_crc32c(crc, (unsigned char*)it->c_str(), it->length());
r->set_crc(ofs, make_pair(base, crc));
}
std::ostream& buffer::operator<<(std::ostream& out, const buffer::raw &r) {
- return out << "buffer::raw(" << (void*)r.data << " len " << r.len << " nref " << r.nref.read() << ")";
+ return out << "buffer::raw(" << (void*)r.data << " len " << r.len << " nref " << r.nref.load() << ")";
}
std::ostream& buffer::operator<<(std::ostream& out, const buffer::ptr& bp) {
if (!(data.type & PERFCOUNTER_U64))
return;
if (data.type & PERFCOUNTER_LONGRUNAVG) {
- data.avgcount.inc();
- data.u64.add(amt);
- data.avgcount2.inc();
+ data.avgcount++;
+ data.u64 += amt;
+ data.avgcount2++;
} else {
- data.u64.add(amt);
+ data.u64 += amt;
}
}
assert(!(data.type & PERFCOUNTER_LONGRUNAVG));
if (!(data.type & PERFCOUNTER_U64))
return;
- data.u64.sub(amt);
+ data.u64 -= amt;
}
void PerfCounters::set(int idx, uint64_t amt)
ANNOTATE_BENIGN_RACE_SIZED(&data.u64, sizeof(data.u64),
"perf counter atomic");
if (data.type & PERFCOUNTER_LONGRUNAVG) {
- data.avgcount.inc();
- data.u64.set(amt);
- data.avgcount2.inc();
+ data.avgcount++;
+ data.u64 = amt;
+ data.avgcount2++;
} else {
- data.u64.set(amt);
+ data.u64 = amt;
}
}
const perf_counter_data_any_d& data(m_data[idx - m_lower_bound - 1]);
if (!(data.type & PERFCOUNTER_U64))
return 0;
- return data.u64.read();
+ return data.u64;
}
void PerfCounters::tinc(int idx, utime_t amt, uint32_t avgcount)
if (!(data.type & PERFCOUNTER_TIME))
return;
if (data.type & PERFCOUNTER_LONGRUNAVG) {
- data.avgcount.add(avgcount);
- data.u64.add(amt.to_nsec());
- data.avgcount2.add(avgcount);
+    data.avgcount += avgcount;
+    data.u64 += amt.to_nsec();
+    data.avgcount2 += avgcount;
} else {
- data.u64.add(amt.to_nsec());
+ data.u64 += amt.to_nsec();
}
}
if (!(data.type & PERFCOUNTER_TIME))
return;
if (data.type & PERFCOUNTER_LONGRUNAVG) {
- data.avgcount.add(avgcount);
- data.u64.add(amt.count());
- data.avgcount2.add(avgcount);
+    data.avgcount += avgcount;
+    data.u64 += amt.count();
+    data.avgcount2 += avgcount;
} else {
- data.u64.add(amt.count());
+ data.u64 += amt.count();
}
}
perf_counter_data_any_d& data(m_data[idx - m_lower_bound - 1]);
if (!(data.type & PERFCOUNTER_TIME))
return;
- data.u64.set(amt.to_nsec());
+ data.u64 = amt.to_nsec();
if (data.type & PERFCOUNTER_LONGRUNAVG)
ceph_abort();
}
const perf_counter_data_any_d& data(m_data[idx - m_lower_bound - 1]);
if (!(data.type & PERFCOUNTER_TIME))
return utime_t();
- uint64_t v = data.u64.read();
+ uint64_t v = data.u64;
return utime_t(v / 1000000000ull, v % 1000000000ull);
}
d->histogram->dump_formatted(f);
f->close_section();
} else {
- uint64_t v = d->u64.read();
+ uint64_t v = d->u64;
if (d->type & PERFCOUNTER_U64) {
f->dump_unsigned(d->name, v);
} else if (d->type & PERFCOUNTER_TIME) {
#ifndef CEPH_COMMON_PERF_COUNTERS_H
#define CEPH_COMMON_PERF_COUNTERS_H
+#include <string>
+#include <vector>
+#include <memory>
+#include <atomic>
+#include <cstdint>
+
#include "common/perf_histogram.h"
#include "include/utime.h"
#include "common/Mutex.h"
+#include "common/ceph_time.h"
class CephContext;
class PerfCountersBuilder;
: name(NULL),
description(NULL),
nick(NULL),
- type(PERFCOUNTER_NONE),
- u64(0),
- avgcount(0),
- avgcount2(0)
+ type(PERFCOUNTER_NONE)
{}
perf_counter_data_any_d(const perf_counter_data_any_d& other)
: name(other.name),
description(other.description),
nick(other.nick),
type(other.type),
- u64(other.u64.read()) {
+ u64(other.u64.load()) {
pair<uint64_t,uint64_t> a = other.read_avg();
- u64.set(a.first);
- avgcount.set(a.second);
- avgcount2.set(a.second);
+ u64 = a.first;
+ avgcount = a.second;
+ avgcount2 = a.second;
if (other.histogram) {
histogram.reset(new PerfHistogram<>(*other.histogram));
}
const char *nick;
int prio = 0;
enum perfcounter_type_d type;
- atomic64_t u64;
- atomic64_t avgcount;
- atomic64_t avgcount2;
+ std::atomic<uint64_t> u64 = { 0 };
+ std::atomic<uint64_t> avgcount = { 0 };
+ std::atomic<uint64_t> avgcount2 = { 0 };
std::unique_ptr<PerfHistogram<>> histogram;
void reset()
{
if (type != PERFCOUNTER_U64) {
- u64.set(0);
- avgcount.set(0);
- avgcount2.set(0);
+ u64 = 0;
+ avgcount = 0;
+ avgcount2 = 0;
}
if (histogram) {
histogram->reset();
pair<uint64_t,uint64_t> read_avg() const {
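+    // writers (inc()/tinc()) bump avgcount, then u64, then avgcount2; reading
+    // avgcount2 first and re-checking avgcount afterwards therefore yields a
+    // consistent (sum, count) pair without a lock (a seqlock-style read)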
uint64_t sum, count;
do {
- count = avgcount2.read();
- sum = u64.read();
- } while (avgcount.read() != count);
+      count = avgcount2;
+      sum = u64;
+    } while (avgcount != count);
return make_pair(sum, count);
}
};
#ifndef CEPH_COMMON_PERF_HISTOGRAM_H
#define CEPH_COMMON_PERF_HISTOGRAM_H
-#include "common/Formatter.h"
-#include "include/atomic.h"
-
#include <array>
+#include <atomic>
#include <memory>
+#include <cassert>
+
+#include "common/Formatter.h"
+#include "include/int_types.h"
class PerfHistogramCommon {
public:
m_axes_config[i++] = ac;
}
- m_rawData.reset(new atomic64_t[get_raw_size()]);
+ m_rawData.reset(new std::atomic<uint64_t>[get_raw_size()] {});
}
/// Copy from other histogram object
PerfHistogram(const PerfHistogram &other)
: m_axes_config(other.m_axes_config) {
int64_t size = get_raw_size();
- m_rawData.reset(new atomic64_t[size]);
+ m_rawData.reset(new std::atomic<uint64_t>[size] {});
for (int64_t i = 0; i < size; i++) {
- m_rawData[i].set(other.m_rawData[i].read());
+ m_rawData[i] = other.m_rawData[i].load();
}
}
void reset() {
auto size = get_raw_size();
for (auto i = size; --i >= 0;) {
- m_rawData[i].set(0);
+ m_rawData[i] = 0;
}
}
template <typename... T>
void inc(T... axis) {
auto index = get_raw_index_for_value(axis...);
- m_rawData[index].add(1);
+ m_rawData[index]++;
}
/// Increase counter for given axis buckets by one
template <typename... T>
void inc_bucket(T... bucket) {
auto index = get_raw_index_for_bucket(bucket...);
- m_rawData[index].add(1);
+ m_rawData[index]++;
}
/// Read value from given bucket
template <typename... T>
uint64_t read_bucket(T... bucket) const {
auto index = get_raw_index_for_bucket(bucket...);
- return m_rawData[index].read();
+ return m_rawData[index];
}
/// Dump data to a Formatter object
protected:
/// Raw data stored as linear space, internal indexes are calculated on
/// demand.
- std::unique_ptr<atomic64_t[]> m_rawData;
+ std::unique_ptr<std::atomic<uint64_t>[]> m_rawData;
/// Configuration of axes
std::array<axis_config_d, DIM> m_axes_config;
void visit_values(FDE onDimensionEnter, FV onValue, FDL onDimensionLeave,
int level = 0, int startIndex = 0) const {
if (level == DIM) {
- onValue(m_rawData[startIndex].read());
+ onValue(m_rawData[startIndex]);
return;
}
#include "common/Cond.h"
#include "include/unordered_map.h"
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
template <class K, class V, class C = std::less<K>, class H = std::hash<K> >
class SharedLRU {
CephContext *cct;
#include <set>
#include <map>
#include <string>
-#include "include/memory.h"
-#include <errno.h>
+#include <cerrno>
+
using std::string;
+
+#include "include/memory.h"
+
#include "common/debug.h"
#include "common/perf_counters.h"
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
#define dout_context cct
#define dout_subsys ceph_subsys_leveldb
#undef dout_prefix
#include "common/ceph_context.h"
+// re-include our assert to clobber the system one
+#include "include/assert.h"
+
class PerfCounters;
enum {
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 OVH
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMMON_PERF_HISTOGRAM_H
+#define CEPH_COMMON_PERF_HISTOGRAM_H
+
+#include "common/Formatter.h"
+#include "include/int_types.h"
+
+#include <array>
+#include <atomic>
+#include <memory>
+#include <cassert>
+
+class PerfHistogramCommon {
+public:
+ enum scale_type_d : uint8_t {
+ SCALE_LINEAR = 1,
+ SCALE_LOG2 = 2,
+ };
+
+ struct axis_config_d {
+ const char *m_name = nullptr;
+ scale_type_d m_scale_type = SCALE_LINEAR;
+ int64_t m_min = 0;
+ int64_t m_quant_size = 0;
+ int32_t m_buckets = 0;
+ axis_config_d() = default;
+ axis_config_d(const char* name,
+ scale_type_d scale_type,
+ int64_t min,
+ int64_t quant_size,
+ int32_t buckets)
+ : m_name(name),
+ m_scale_type(scale_type),
+ m_min(min),
+ m_quant_size(quant_size),
+ m_buckets(buckets)
+ {}
+ };
+
+protected:
+ /// Dump configuration of one axis to a formatter
+ static void dump_formatted_axis(ceph::Formatter *f, const axis_config_d &ac);
+
+ /// Quantize given value and convert to bucket number on given axis
+ static int64_t get_bucket_for_axis(int64_t value, const axis_config_d &ac);
+
+ /// Calculate inclusive ranges of axis values for each bucket on that axis
+ static std::vector<std::pair<int64_t, int64_t>> get_axis_bucket_ranges(
+ const axis_config_d &ac);
+};
+
+/// PerfHistogram traces a histogram of input values. It is an extended
+/// version of a standard histogram, which traces the characteristics of only
+/// a single value. In this implementation, values can be traced in multiple
+/// dimensions - i.e. we can create a histogram of input request size (first
+/// dimension) and processing latency (second dimension). Creating a standard
+/// histogram out of such a multidimensional one is trivial and requires
+/// summing values across the dimensions we're not interested in.
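+///
+/// Illustrative 2-D configuration (sketch only; axis names are hypothetical):
+///
+///   PerfHistogram<2> h{
+///     {"size",    PerfHistogramCommon::SCALE_LOG2,   0, 512, 10},
+///     {"latency", PerfHistogramCommon::SCALE_LINEAR, 0, 100, 10}};
+///   h.inc(4096, 250);  // bumps the bucket for (size=4096, latency=250)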
+template <int DIM = 2>
+class PerfHistogram : public PerfHistogramCommon {
+public:
+ /// Initialize new histogram object
+ PerfHistogram(std::initializer_list<axis_config_d> axes_config) {
+ assert(axes_config.size() == DIM &&
+ "Invalid number of axis configuration objects");
+
+ int i = 0;
+ for (const auto &ac : axes_config) {
+ assert(ac.m_buckets > 0 && "Must have at least one bucket on axis");
+ assert(ac.m_quant_size > 0 &&
+ "Quantization unit must be non-zero positive integer value");
+
+ m_axes_config[i++] = ac;
+ }
+
+    m_rawData.reset(new std::atomic<uint64_t>[get_raw_size()] {});
+ }
+
+ /// Copy from other histogram object
+ PerfHistogram(const PerfHistogram &other)
+ : m_axes_config(other.m_axes_config) {
+ int64_t size = get_raw_size();
+    m_rawData.reset(new std::atomic<uint64_t>[size] {});
+ for (int64_t i = 0; i < size; i++) {
+      m_rawData[i] = other.m_rawData[i].load();
+ }
+ }
+
+ /// Set all histogram values to 0
+ void reset() {
+ auto size = get_raw_size();
+ for (auto i = size; --i >= 0;) {
+ m_rawData[i] = 0;
+ }
+ }
+
+ /// Increase counter for given axis values by one
+ template <typename... T>
+ void inc(T... axis) {
+ auto index = get_raw_index_for_value(axis...);
+ m_rawData[index] += 1;
+ }
+
+ /// Increase counter for given axis buckets by one
+ template <typename... T>
+ void inc_bucket(T... bucket) {
+ auto index = get_raw_index_for_bucket(bucket...);
+ m_rawData[index] += 1;
+ }
+
+ /// Read value from given bucket
+ template <typename... T>
+ uint64_t read_bucket(T... bucket) const {
+ auto index = get_raw_index_for_bucket(bucket...);
+ return m_rawData[index];
+ }
+
+ /// Dump data to a Formatter object
+ void dump_formatted(ceph::Formatter *f) const {
+ // Dump axes configuration
+ f->open_array_section("axes");
+ for (auto &ac : m_axes_config) {
+ dump_formatted_axis(f, ac);
+ }
+ f->close_section();
+
+ // Dump histogram values
+ dump_formatted_values(f);
+ }
+
+protected:
+ /// Raw data stored as linear space, internal indexes are calculated on
+ /// demand.
+ std::unique_ptr<std::atomic<uint64_t>[]> m_rawData;
+
+ /// Configuration of axes
+ std::array<axis_config_d, DIM> m_axes_config;
+
+ /// Dump histogram counters to a formatter
+ void dump_formatted_values(ceph::Formatter *f) const {
+ visit_values([f](int) { f->open_array_section("values"); },
+ [f](int64_t value) { f->dump_unsigned("value", value); },
+ [f](int) { f->close_section(); });
+ }
+
+ /// Get number of all histogram counters
+ int64_t get_raw_size() {
+ int64_t ret = 1;
+ for (const auto &ac : m_axes_config) {
+ ret *= ac.m_buckets;
+ }
+ return ret;
+ }
+
+ /// Calculate m_rawData index from axis values
+ template <typename... T>
+ int64_t get_raw_index_for_value(T... axes) const {
+ static_assert(sizeof...(T) == DIM, "Incorrect number of arguments");
+ return get_raw_index_internal<0>(get_bucket_for_axis, 0, axes...);
+ }
+
+ /// Calculate m_rawData index from axis bucket numbers
+ template <typename... T>
+ int64_t get_raw_index_for_bucket(T... buckets) const {
+ static_assert(sizeof...(T) == DIM, "Incorrect number of arguments");
+ return get_raw_index_internal<0>(
+ [](int64_t bucket, const axis_config_d &ac) {
+ assert(bucket >= 0 && "Bucket index can not be negative");
+ assert(bucket < ac.m_buckets && "Bucket index too large");
+ return bucket;
+ },
+ 0, buckets...);
+ }
+
+ template <int level = 0, typename F, typename... T>
+ int64_t get_raw_index_internal(F bucket_evaluator, int64_t startIndex,
+ int64_t value, T... tail) const {
+ static_assert(level + 1 + sizeof...(T) == DIM,
+ "Internal consistency check");
+ auto &ac = m_axes_config[level];
+ auto bucket = bucket_evaluator(value, ac);
+ return get_raw_index_internal<level + 1>(
+ bucket_evaluator, ac.m_buckets * startIndex + bucket, tail...);
+ }
+
+ template <int level, typename F>
+ int64_t get_raw_index_internal(F, int64_t startIndex) const {
+ static_assert(level == DIM, "Internal consistency check");
+ return startIndex;
+ }
+
+ /// Visit all histogram counters, call onDimensionEnter / onDimensionLeave
+ /// when starting / finishing traversal
+ /// on given axis, call onValue when dumping raw histogram counter value.
+ template <typename FDE, typename FV, typename FDL>
+ void visit_values(FDE onDimensionEnter, FV onValue, FDL onDimensionLeave,
+ int level = 0, int startIndex = 0) const {
+ if (level == DIM) {
+ onValue(m_rawData[startIndex]);
+ return;
+ }
+
+ onDimensionEnter(level);
+ auto &ac = m_axes_config[level];
+ startIndex *= ac.m_buckets;
+ for (int32_t i = 0; i < ac.m_buckets; ++i, ++startIndex) {
+ visit_values(onDimensionEnter, onValue, onDimensionLeave, level + 1,
+ startIndex);
+ }
+ onDimensionLeave(level);
+ }
+};
+
+#endif