]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
common: migrate atomic_t to <atomic>
authorJesse Williamson <jwilliamson@suse.de>
Tue, 23 May 2017 11:40:38 +0000 (04:40 -0700)
committerJesse Williamson <jwilliamson@suse.de>
Thu, 1 Jun 2017 22:36:27 +0000 (15:36 -0700)
Signed-off-by: Jesse Williamson <jwilliamson@suse.de>
23 files changed:
src/common/Finisher.cc
src/common/Finisher.h
src/common/HeartbeatMap.cc
src/common/HeartbeatMap.h
src/common/OutputDataSocket.cc
src/common/QueueRing.h
src/common/RWLock.h
src/common/RefCountedObj.h
src/common/Throttle.cc
src/common/Throttle.h
src/common/TrackedOp.cc
src/common/TrackedOp.h
src/common/WorkQueue.cc
src/common/WorkQueue.h
src/common/admin_socket.cc
src/common/buffer.cc
src/common/perf_counters.cc
src/common/perf_counters.h
src/common/perf_histogram.h
src/common/shared_cache.hpp
src/kv/LevelDBStore.cc
src/kv/LevelDBStore.h
src/perf_histogram.h [new file with mode: 0644]

index 5be9b04bb8836789a0c345293dfe9edd223b5d8a..3cd2e9133ecb753d004c08e0fdca9c45adc8129a 100644 (file)
@@ -3,6 +3,11 @@
 
 #include "Finisher.h"
 
+#include "common/debug.h"
+
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
 #define dout_subsys ceph_subsys_finisher
 #undef dout_prefix
 #define dout_prefix *_dout << "finisher(" << this << ") "
index effb3c4760cf5491d70e8498e479312f6d01f8ac..e1a4519c71cd33eb23faa1b5a03b3bc145ea7e2c 100644 (file)
@@ -15,6 +15,7 @@
 #ifndef CEPH_FINISHER_H
 #define CEPH_FINISHER_H
 
+#include "common/Mutex.h"
 #include "common/Cond.h"
 #include "common/perf_counters.h"
 
index f9a3d7cce6a99b4d926762370e6572c837154a70..ae1f8e8faae78a66eda32f5cb083ddd730ff1d85 100644 (file)
@@ -69,13 +69,13 @@ bool HeartbeatMap::_check(const heartbeat_handle_d *h, const char *who, time_t n
   bool healthy = true;
   time_t was;
 
-  was = h->timeout.read();
+  was = h->timeout;
   if (was && was < now) {
     ldout(m_cct, 1) << who << " '" << h->name << "'"
                    << " had timed out after " << h->grace << dendl;
     healthy = false;
   }
-  was = h->suicide_timeout.read();
+  was = h->suicide_timeout;
   if (was && was < now) {
     ldout(m_cct, 1) << who << " '" << h->name << "'"
                    << " had suicide timed out after " << h->suicide_grace << dendl;
@@ -93,13 +93,13 @@ void HeartbeatMap::reset_timeout(heartbeat_handle_d *h, time_t grace, time_t sui
   time_t now = time(NULL);
   _check(h, "reset_timeout", now);
 
-  h->timeout.set(now + grace);
+  h->timeout = now + grace;
   h->grace = grace;
 
   if (suicide_grace)
-    h->suicide_timeout.set(now + suicide_grace);
+    h->suicide_timeout = now + suicide_grace;
   else
-    h->suicide_timeout.set(0);
+    h->suicide_timeout = 0;
   h->suicide_grace = suicide_grace;
 }
 
@@ -108,8 +108,8 @@ void HeartbeatMap::clear_timeout(heartbeat_handle_d *h)
   ldout(m_cct, 20) << "clear_timeout '" << h->name << "'" << dendl;
   time_t now = time(NULL);
   _check(h, "clear_timeout", now);
-  h->timeout.set(0);
-  h->suicide_timeout.set(0);
+  h->timeout = 0;
+  h->suicide_timeout = 0;
 }
 
 bool HeartbeatMap::is_healthy()
@@ -142,8 +142,8 @@ bool HeartbeatMap::is_healthy()
   }
   m_rwlock.put_read();
 
-  m_unhealthy_workers.set(unhealthy);
-  m_total_workers.set(total);
+  m_unhealthy_workers = unhealthy;
+  m_total_workers = total;
 
   ldout(m_cct, 20) << "is_healthy = " << (healthy ? "healthy" : "NOT HEALTHY")
     << ", total workers: " << total << ", number of unhealthy: " << unhealthy << dendl;
@@ -152,12 +152,12 @@ bool HeartbeatMap::is_healthy()
 
 int HeartbeatMap::get_unhealthy_workers() const
 {
-  return m_unhealthy_workers.read();
+  return m_unhealthy_workers;
 }
 
 int HeartbeatMap::get_total_workers() const
 {
-  return m_total_workers.read();
+  return m_total_workers;
 }
 
 void HeartbeatMap::check_touch_file()
index 542d692e93a248009e87e7ec57d270b64765ef51..4e9b314667f4d53e5641373f870c1722f92d5ce7 100644 (file)
 #define CEPH_HEARTBEATMAP_H
 
 #include <list>
+#include <atomic>
+#include <string>
+
+#include <pthread.h>
 
-#include "include/atomic.h"
 #include "RWLock.h"
 
 class CephContext;
@@ -38,7 +41,7 @@ namespace ceph {
 struct heartbeat_handle_d {
   const std::string name;
   pthread_t thread_id;
-  atomic_t timeout, suicide_timeout;
+  std::atomic<unsigned> timeout = { 0 }, suicide_timeout = { 0 };
   time_t grace, suicide_grace;
   std::list<heartbeat_handle_d*>::iterator list_item;
 
@@ -78,8 +81,8 @@ class HeartbeatMap {
   RWLock m_rwlock;
   time_t m_inject_unhealthy_until;
   std::list<heartbeat_handle_d*> m_workers;
-  atomic_t m_unhealthy_workers;
-  atomic_t m_total_workers;
+  std::atomic<unsigned> m_unhealthy_workers = { 0 };
+  std::atomic<unsigned> m_total_workers = { 0 };
 
   bool _check(const heartbeat_handle_d *h, const char *who, time_t now);
 };
index e9c85bd040482c56dbc2d762971193d3f2ea2d95..f61c9c4453a2e7b43d25e7d8dd67fd5a1980b226 100644 (file)
@@ -21,6 +21,9 @@
 #include <poll.h>
 #include <sys/un.h>
 
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
 #define dout_subsys ceph_subsys_asok
 #undef dout_prefix
 #define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
index 830f80f8442e0e9934790f96cfcfd61f7d7af245..42582d24f4462c508ae357e8f9256aeda2d0540f 100644 (file)
@@ -1,12 +1,12 @@
 #ifndef QUEUE_RING_H
 #define QUEUE_RING_H
 
-#include <list>
-#include <vector>
 #include "common/Mutex.h"
 #include "common/Cond.h"
 
-
+#include <list>
+#include <atomic>
+#include <vector>
 
 template <class T>
 class QueueRing {
@@ -43,18 +43,20 @@ class QueueRing {
 
   std::vector<QueueBucket> buckets;
   int num_buckets;
-  atomic_t cur_read_bucket;
-  atomic_t cur_write_bucket;
+
+  std::atomic<int64_t> cur_read_bucket = { 0 };
+  std::atomic<int64_t> cur_write_bucket = { 0 };
+
 public:
   QueueRing(int n) : buckets(n), num_buckets(n) {
   }
 
   void enqueue(const T& entry) {
-    buckets[cur_write_bucket.inc() % num_buckets].enqueue(entry);
+    buckets[++cur_write_bucket % num_buckets].enqueue(entry);
   };
 
   void dequeue(T *entry) {
-    buckets[cur_read_bucket.inc() % num_buckets].dequeue(entry);
+    buckets[++cur_read_bucket % num_buckets].dequeue(entry);
   }
 };
 
index befc5a53ce3c592364237999d5bbd76948f723b3..fd8a2665ef18f657d4c9b4d2939e5ea908068cbe 100644 (file)
 #include <string>
 #include <include/assert.h>
 #include "lockdep.h"
-#include "include/atomic.h"
 #include "common/valgrind.h"
 
+#include <atomic>
+
 class RWLock final
 {
   mutable pthread_rwlock_t L;
   std::string name;
   mutable int id;
-  mutable atomic_t nrlock, nwlock;
+  mutable std::atomic<unsigned> nrlock = { 0 }, nwlock = { 0 };
   bool track, lockdep;
 
   std::string unique_name(const char* name) const;
@@ -39,7 +40,7 @@ public:
   const RWLock& operator=(const RWLock& other) = delete;
 
   RWLock(const std::string &n, bool track_lock=true, bool ld=true, bool prioritize_write=false)
-    : name(n), id(-1), nrlock(0), nwlock(0), track(track_lock),
+    : name(n), id(-1), track(track_lock),
       lockdep(ld) {
 #if defined(PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
     if (prioritize_write) {
@@ -65,12 +66,12 @@ public:
 
   bool is_locked() const {
     assert(track);
-    return (nrlock.read() > 0) || (nwlock.read() > 0);
+    return (nrlock > 0) || (nwlock > 0);
   }
 
   bool is_wlocked() const {
     assert(track);
-    return (nwlock.read() > 0);
+    return (nwlock > 0);
   }
   ~RWLock() {
     // The following check is racy but we are about to destroy
@@ -85,11 +86,11 @@ public:
 
   void unlock(bool lockdep=true) const {
     if (track) {
-      if (nwlock.read() > 0) {
-        nwlock.dec();
+      if (nwlock > 0) {
+        nwlock--;
       } else {
-        assert(nrlock.read() > 0);
-        nrlock.dec();
+        assert(nrlock > 0);
+        nrlock--;
       }
     }
     if (lockdep && this->lockdep && g_lockdep)
@@ -105,12 +106,12 @@ public:
     assert(r == 0);
     if (lockdep && g_lockdep) id = lockdep_locked(name.c_str(), id);
     if (track)
-      nrlock.inc();
+      nrlock++;
   }
   bool try_get_read() const {
     if (pthread_rwlock_tryrdlock(&L) == 0) {
       if (track)
-         nrlock.inc();
+         nrlock++;
       if (lockdep && g_lockdep) id = lockdep_locked(name.c_str(), id);
       return true;
     }
@@ -129,7 +130,7 @@ public:
     if (lockdep && this->lockdep && g_lockdep)
       id = lockdep_locked(name.c_str(), id);
     if (track)
-      nwlock.inc();
+      nwlock++;
 
   }
   bool try_get_write(bool lockdep=true) {
@@ -137,7 +138,7 @@ public:
       if (lockdep && this->lockdep && g_lockdep)
        id = lockdep_locked(name.c_str(), id);
       if (track)
-         nwlock.inc();
+         nwlock++;
       return true;
     }
     return false;
index 43d5c171396fee956fac35c648a0de094f95da0b..9c0dad1d302f8574e1ec017622ce616b547f503a 100644 (file)
@@ -17,6 +17,7 @@
  
 #include "common/Mutex.h"
 #include "common/Cond.h"
+#include "common/ceph_context.h"
 #include "common/valgrind.h"
 
 // re-include our assert to clobber the system one; fix dout:
 
 struct RefCountedObject {
 private:
-  mutable atomic_t nref;
+  mutable std::atomic<uint64_t> nref;
   CephContext *cct;
 public:
   RefCountedObject(CephContext *c = NULL, int n=1) : nref(n), cct(c) {}
   virtual ~RefCountedObject() {
-    assert(nref.read() == 0);
+    assert(nref == 0);
   }
   
   const RefCountedObject *get() const {
-    int v = nref.inc();
+    int v = ++nref;
     if (cct)
       lsubdout(cct, refs, 1) << "RefCountedObject::get " << this << " "
                             << (v - 1) << " -> " << v
@@ -41,7 +42,7 @@ public:
     return this;
   }
   RefCountedObject *get() {
-    int v = nref.inc();
+    int v = ++nref;
     if (cct)
       lsubdout(cct, refs, 1) << "RefCountedObject::get " << this << " "
                             << (v - 1) << " -> " << v
@@ -50,7 +51,7 @@ public:
   }
   void put() const {
     CephContext *local_cct = cct;
-    int v = nref.dec();
+    int v = --nref;
     if (v == 0) {
       ANNOTATE_HAPPENS_AFTER(&nref);
       ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&nref);
@@ -68,7 +69,7 @@ public:
   }
 
   uint64_t get_nref() const {
-    return nref.read();
+    return nref;
   }
 };
 
@@ -118,10 +119,10 @@ struct RefCountedCond : public RefCountedObject {
  *    
  */
 struct RefCountedWaitObject {
-  atomic_t nref;
+  std::atomic<uint64_t> nref = { 1 };
   RefCountedCond *c;
 
-  RefCountedWaitObject() : nref(1) {
+  RefCountedWaitObject() {
     c = new RefCountedCond;
   }
   virtual ~RefCountedWaitObject() {
@@ -129,7 +130,7 @@ struct RefCountedWaitObject {
   }
 
   RefCountedWaitObject *get() {
-    nref.inc();
+    nref++;
     return this;
   }
 
@@ -137,7 +138,7 @@ struct RefCountedWaitObject {
     bool ret = false;
     RefCountedCond *cond = c;
     cond->get();
-    if (nref.dec() == 0) {
+    if (--nref == 0) {
       cond->done();
       delete this;
       ret = true;
@@ -150,7 +151,7 @@ struct RefCountedWaitObject {
     RefCountedCond *cond = c;
 
     cond->get();
-    if (nref.dec() == 0) {
+    if (--nref == 0) {
       cond->done();
       delete this;
     } else {
index ca5701d8f7324416637e10fed50ca40db87ad861..7ddf4883ba8c43b4997262187758985b84a9534d 100644 (file)
@@ -4,6 +4,9 @@
 #include "common/Throttle.h"
 #include "common/perf_counters.h"
 
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
 #define dout_subsys ceph_subsys_throttle
 
 #undef dout_prefix
@@ -54,7 +57,7 @@ Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_
 
     logger = b.create_perf_counters();
     cct->get_perfcounters_collection()->add(logger);
-    logger->set(l_throttle_max, max.read());
+    logger->set(l_throttle_max, max);
   }
 }
 
@@ -78,13 +81,13 @@ Throttle::~Throttle()
 void Throttle::_reset_max(int64_t m)
 {
   assert(lock.is_locked());
-  if ((int64_t)max.read() == m)
+  if (static_cast<int64_t>(max) == m)
     return;
   if (!cond.empty())
     cond.front()->SignalOne();
   if (logger)
     logger->set(l_throttle_max, m);
-  max.set((size_t)m);
+  max = m;
 }
 
 bool Throttle::_wait(int64_t c)
@@ -121,7 +124,7 @@ bool Throttle::_wait(int64_t c)
 
 bool Throttle::wait(int64_t m)
 {
-  if (0 == max.read() && 0 == m) {
+  if (0 == max && 0 == m) {
     return false;
   }
 
@@ -136,31 +139,31 @@ bool Throttle::wait(int64_t m)
 
 int64_t Throttle::take(int64_t c)
 {
-  if (0 == max.read()) {
+  if (0 == max) {
     return 0;
   }
   assert(c >= 0);
   ldout(cct, 10) << "take " << c << dendl;
   {
     Mutex::Locker l(lock);
-    count.add(c);
+    count += c;
   }
   if (logger) {
     logger->inc(l_throttle_take);
     logger->inc(l_throttle_take_sum, c);
-    logger->set(l_throttle_val, count.read());
+    logger->set(l_throttle_val, count);
   }
-  return count.read();
+  return count;
 }
 
 bool Throttle::get(int64_t c, int64_t m)
 {
-  if (0 == max.read() && 0 == m) {
+  if (0 == max && 0 == m) {
     return false;
   }
 
   assert(c >= 0);
-  ldout(cct, 10) << "get " << c << " (" << count.read() << " -> " << (count.read() + c) << ")" << dendl;
+  ldout(cct, 10) << "get " << c << " (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
   if (logger) {
     logger->inc(l_throttle_get_started);
   }
@@ -172,12 +175,12 @@ bool Throttle::get(int64_t c, int64_t m)
       _reset_max(m);
     }
     waited = _wait(c);
-    count.add(c);
+    count += c;
   }
   if (logger) {
     logger->inc(l_throttle_get);
     logger->inc(l_throttle_get_sum, c);
-    logger->set(l_throttle_val, count.read());
+    logger->set(l_throttle_val, count);
   }
   return waited;
 }
@@ -187,7 +190,7 @@ bool Throttle::get(int64_t c, int64_t m)
  */
 bool Throttle::get_or_fail(int64_t c)
 {
-  if (0 == max.read()) {
+  if (0 == max) {
     return true;
   }
 
@@ -200,13 +203,13 @@ bool Throttle::get_or_fail(int64_t c)
     }
     return false;
   } else {
-    ldout(cct, 10) << "get_or_fail " << c << " success (" << count.read() << " -> " << (count.read() + c) << ")" << dendl;
-    count.add(c);
+    ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
+    count += c;
     if (logger) {
       logger->inc(l_throttle_get_or_fail_success);
       logger->inc(l_throttle_get);
       logger->inc(l_throttle_get_sum, c);
-      logger->set(l_throttle_val, count.read());
+      logger->set(l_throttle_val, count);
     }
     return true;
   }
@@ -214,25 +217,25 @@ bool Throttle::get_or_fail(int64_t c)
 
 int64_t Throttle::put(int64_t c)
 {
-  if (0 == max.read()) {
+  if (0 == max) {
     return 0;
   }
 
   assert(c >= 0);
-  ldout(cct, 10) << "put " << c << " (" << count.read() << " -> " << (count.read()-c) << ")" << dendl;
+  ldout(cct, 10) << "put " << c << " (" << count.load() << " -> " << (count.load()-c) << ")" << dendl;
   Mutex::Locker l(lock);
   if (c) {
     if (!cond.empty())
       cond.front()->SignalOne();
-    assert(((int64_t)count.read()) >= c); //if count goes negative, we failed somewhere!
-    count.sub(c);
+    assert(static_cast<int64_t>(count) >= c); // if count goes negative, we failed somewhere!
+    count -= c;
     if (logger) {
       logger->inc(l_throttle_put);
       logger->inc(l_throttle_put_sum, c);
-      logger->set(l_throttle_val, count.read());
+      logger->set(l_throttle_val, count);
     }
   }
-  return count.read();
+  return count;
 }
 
 void Throttle::reset()
@@ -240,7 +243,7 @@ void Throttle::reset()
   Mutex::Locker l(lock);
   if (!cond.empty())
     cond.front()->SignalOne();
-  count.set(0);
+  count = 0;
   if (logger) {
     logger->set(l_throttle_val, 0);
   }
index f6bcd078b45bde35a48423d95b740e13972c05ad..9f166cd48b979ccafa56b057240a842ea67a611a 100644 (file)
@@ -4,9 +4,16 @@
 #ifndef CEPH_THROTTLE_H
 #define CEPH_THROTTLE_H
 
-#include "Cond.h"
+#include <map>
+#include <list>
+#include <chrono>
+#include <atomic>
+#include <iostream>
 #include <condition_variable>
 
+#include "Cond.h"
+#include "include/Context.h"
+
 class CephContext;
 class PerfCounters;
 
@@ -22,7 +29,7 @@ class Throttle {
   CephContext *cct;
   const std::string name;
   PerfCounters *logger;
-  ceph::atomic_t count, max;
+  std::atomic<unsigned> count = { 0 }, max = { 0 };
   Mutex lock;
   list<Cond*> cond;
   const bool use_perf;
@@ -34,8 +41,8 @@ public:
 private:
   void _reset_max(int64_t m);
   bool _should_wait(int64_t c) const {
-    int64_t m = max.read();
-    int64_t cur = count.read();
+    int64_t m = max;
+    int64_t cur = count;
     return
       m &&
       ((c <= m && cur + c > m) || // normally stay under max
@@ -50,20 +57,20 @@ public:
    * @returns the number of taken slots
    */
   int64_t get_current() const {
-    return count.read();
+    return count;
   }
 
   /**
    * get the max number of slots
    * @returns the max number of slots
    */
-  int64_t get_max() const { return max.read(); }
+  int64_t get_max() const { return max; }
 
   /**
    * return true if past midpoint
    */
   bool past_midpoint() const {
-    return count.read() >= max.read() / 2;
+    return count >= max / 2;
   }
 
   /**
index 3181fe1fc22f33c9e6412e5c7476c5bbe8f0c6e1..ecaa196afd989a4dc395110964fe7de38e0bb612 100644 (file)
@@ -239,7 +239,7 @@ bool OpTracker::register_inflight_op(TrackedOp *i)
   if (!tracking_enabled)
     return false;
 
-  uint64_t current_seq = seq.inc();
+  uint64_t current_seq = ++seq;
   uint32_t shard_index = current_seq % num_optracker_shards;
   ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
   assert(NULL != sdata);
index 621831da9175c3c83a580d9940bad86a78096faf..40f38a61a5c2d68090da35c3a2faf5247273a887 100644 (file)
@@ -62,7 +62,7 @@ public:
 struct ShardedTrackingData;
 class OpTracker {
   friend class OpHistory;
-  atomic64_t seq;
+  std::atomic<int64_t> seq = { 0 };
   vector<ShardedTrackingData*> sharded_in_flight_list;
   uint32_t num_optracker_shards;
   OpHistory history;
index b077b813cc3f26c85e303fed0056cb9b2c814e7b..1d26723128a873ccf56a2871afa7c89c735fca45 100644 (file)
@@ -288,9 +288,6 @@ ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
   lockname(name + "::lock"),
   shardedpool_lock(lockname.c_str()),
   num_threads(pnum_threads),
-  stop_threads(0),
-  pause_threads(0),
-  drain_threads(0),
   num_paused(0),
   num_drained(0),
   wq(NULL) {}
@@ -306,28 +303,28 @@ void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
   ss << name << " thread " << name;
   heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
 
-  while (!stop_threads.read()) {
-    if(pause_threads.read()) {
+  while (!stop_threads) {
+    if (pause_threads) {
       shardedpool_lock.Lock();
       ++num_paused;
       wait_cond.Signal();
-      while(pause_threads.read()) {
+      while (pause_threads) {
        cct->get_heartbeat_map()->reset_timeout(
-        hb,
-        wq->timeout_interval, wq->suicide_interval);
+               hb,
+               wq->timeout_interval, wq->suicide_interval);
        shardedpool_cond.WaitInterval(shardedpool_lock,
-        utime_t(
+          utime_t(
           cct->_conf->threadpool_empty_queue_max_wait, 0));
       }
       --num_paused;
       shardedpool_lock.Unlock();
     }
-    if (drain_threads.read()) {
+    if (drain_threads) {
       shardedpool_lock.Lock();
       if (wq->is_shard_empty(thread_index)) {
         ++num_drained;
         wait_cond.Signal();
-        while (drain_threads.read()) {
+        while (drain_threads) {
          cct->get_heartbeat_map()->reset_timeout(
            hb,
            wq->timeout_interval, wq->suicide_interval);
@@ -380,7 +377,7 @@ void ShardedThreadPool::start()
 void ShardedThreadPool::stop()
 {
   ldout(cct,10) << "stop" << dendl;
-  stop_threads.set(1);
+  stop_threads = true;
   assert(wq != NULL);
   wq->return_waiting_threads();
   for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
@@ -397,7 +394,7 @@ void ShardedThreadPool::pause()
 {
   ldout(cct,10) << "pause" << dendl;
   shardedpool_lock.Lock();
-  pause_threads.set(1);
+  pause_threads = true;
   assert(wq != NULL);
   wq->return_waiting_threads();
   while (num_threads != num_paused){
@@ -411,7 +408,7 @@ void ShardedThreadPool::pause_new()
 {
   ldout(cct,10) << "pause_new" << dendl;
   shardedpool_lock.Lock();
-  pause_threads.set(1);
+  pause_threads = true;
   assert(wq != NULL);
   wq->return_waiting_threads();
   shardedpool_lock.Unlock();
@@ -422,7 +419,7 @@ void ShardedThreadPool::unpause()
 {
   ldout(cct,10) << "unpause" << dendl;
   shardedpool_lock.Lock();
-  pause_threads.set(0);
+  pause_threads = false;
   shardedpool_cond.Signal();
   shardedpool_lock.Unlock();
   ldout(cct,10) << "unpaused" << dendl;
@@ -432,13 +429,13 @@ void ShardedThreadPool::drain()
 {
   ldout(cct,10) << "drain" << dendl;
   shardedpool_lock.Lock();
-  drain_threads.set(1);
+  drain_threads = true;
   assert(wq != NULL);
   wq->return_waiting_threads();
   while (num_threads != num_drained) {
     wait_cond.Wait(shardedpool_lock);
   }
-  drain_threads.set(0);
+  drain_threads = false;
   shardedpool_cond.Signal();
   shardedpool_lock.Unlock();
   ldout(cct,10) << "drained" << dendl;
index 33c6c780d352492128268cbee6fbd576a94d29a1..c817e74ec0920201ef88199ff42750b284823f17 100644 (file)
@@ -19,6 +19,8 @@
 #include "include/unordered_map.h"
 #include "common/HeartbeatMap.h"
 
+#include <atomic>
+
 class CephContext;
 
 /// Pool of threads that share work submitted to multiple work queues.
@@ -620,9 +622,11 @@ class ShardedThreadPool {
   Cond shardedpool_cond;
   Cond wait_cond;
   uint32_t num_threads;
-  atomic_t stop_threads;
-  atomic_t pause_threads;
-  atomic_t drain_threads;
+
+  std::atomic<bool> stop_threads = { false };
+  std::atomic<bool> pause_threads = { false };
+  std::atomic<bool> drain_threads = { false };
+
   uint32_t num_paused;
   uint32_t num_drained;
 
index f2419c4160e76a03e51788de00badf19316c4e1f..2bb767edcff7026e8a94f7bf6e967de9dd2282ed 100644 (file)
@@ -23,6 +23,9 @@
 #include <poll.h>
 #include <sys/un.h>
 
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
 #define dout_subsys ceph_subsys_asok
 #undef dout_prefix
 #define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
index 5ff28c41c6adef537dd7d422dd200838d67b5e94..f0f33e344bb0c44f0a922db6873b5d06b01eb233 100644 (file)
  * 
  */
 
+#include <atomic>
+#include <errno.h>
+#include <limits.h>
+
+#include <sys/uio.h>
+
 #include "include/compat.h"
 #include "include/mempool.h"
 #include "armor.h"
@@ -31,6 +37,8 @@
 #include "msg/xio/XioMsg.h"
 #endif
 
+using namespace ceph;
+
 #define CEPH_BUFFER_ALLOC_UNIT  (MIN(CEPH_PAGE_SIZE, 4096))
 #define CEPH_BUFFER_APPEND_SIZE (CEPH_BUFFER_ALLOC_UNIT - sizeof(raw_combined))
 
@@ -43,71 +51,73 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
 # define bendl std::endl; }
 #endif
 
-  static atomic_t buffer_total_alloc;
-  static atomic64_t buffer_history_alloc_bytes;
-  static atomic64_t buffer_history_alloc_num;
+  static std::atomic<uint64_t> buffer_total_alloc { 0 };
+  static std::atomic<uint64_t> buffer_history_alloc_bytes { 0 };
+  static std::atomic<uint64_t> buffer_history_alloc_num { 0 };
+
   const bool buffer_track_alloc = get_env_bool("CEPH_BUFFER_TRACK");
 
   namespace {
   void inc_total_alloc(unsigned len) {
     if (buffer_track_alloc)
-      buffer_total_alloc.add(len);
+      buffer_total_alloc += len;
   }
 
   void dec_total_alloc(unsigned len) {
     if (buffer_track_alloc)
-      buffer_total_alloc.sub(len);
+      buffer_total_alloc -= len;
   }
 
   void inc_history_alloc(uint64_t len) {
     if (buffer_track_alloc) {
-      buffer_history_alloc_bytes.add(len);
-      buffer_history_alloc_num.inc();
+      buffer_history_alloc_bytes += len;
+      buffer_history_alloc_num++;
     }
   }
-  }
-
+  } // namespace
 
   int buffer::get_total_alloc() {
-    return buffer_total_alloc.read();
+    return buffer_total_alloc;
   }
   uint64_t buffer::get_history_alloc_bytes() {
-    return buffer_history_alloc_bytes.read();
+    return buffer_history_alloc_bytes;
   }
   uint64_t buffer::get_history_alloc_num() {
-    return buffer_history_alloc_num.read();
+    return buffer_history_alloc_num;
   }
 
-  static atomic_t buffer_cached_crc;
-  static atomic_t buffer_cached_crc_adjusted;
-  static atomic_t buffer_missed_crc;
+  static std::atomic<unsigned> buffer_cached_crc { 0 };
+  static std::atomic<unsigned> buffer_cached_crc_adjusted { 0 };
+  static std::atomic<unsigned> buffer_missed_crc { 0 };
+
   static bool buffer_track_crc = get_env_bool("CEPH_BUFFER_TRACK");
 
   void buffer::track_cached_crc(bool b) {
     buffer_track_crc = b;
   }
   int buffer::get_cached_crc() {
-    return buffer_cached_crc.read();
+    return buffer_cached_crc;
   }
   int buffer::get_cached_crc_adjusted() {
-    return buffer_cached_crc_adjusted.read();
+    return buffer_cached_crc_adjusted;
   }
 
   int buffer::get_missed_crc() {
-    return buffer_missed_crc.read();
+    return buffer_missed_crc;
   }
 
-  static atomic_t buffer_c_str_accesses;
+  static std::atomic<unsigned> buffer_c_str_accesses { 0 };
+
   static bool buffer_track_c_str = get_env_bool("CEPH_BUFFER_TRACK");
 
   void buffer::track_c_str(bool b) {
     buffer_track_c_str = b;
   }
   int buffer::get_c_str_accesses() {
-    return buffer_c_str_accesses.read();
+    return buffer_c_str_accesses;
   }
 
-  static atomic_t buffer_max_pipe_size;
+  static std::atomic<unsigned> buffer_max_pipe_size { 0 };
   int update_max_pipe_size() {
 #ifdef CEPH_HAVE_SETPIPE_SZ
     char buf[32];
@@ -124,18 +134,18 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
     size_t size = strict_strtol(buf, 10, &err);
     if (!err.empty())
       return -EIO;
-    buffer_max_pipe_size.set(size);
+    buffer_max_pipe_size = size;
 #endif
     return 0;
   }
 
   size_t get_max_pipe_size() {
 #ifdef CEPH_HAVE_SETPIPE_SZ
-    size_t size = buffer_max_pipe_size.read();
+    size_t size = buffer_max_pipe_size;
     if (size)
       return size;
     if (update_max_pipe_size() == 0)
-      return buffer_max_pipe_size.read();
+      return buffer_max_pipe_size;
 #endif
     // this is the max size hardcoded in linux before 2.6.35
     return 65536;
@@ -160,7 +170,7 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
   public:
     char *data;
     unsigned len;
-    atomic_t nref;
+    std::atomic<unsigned> nref { 0 };
     int mempool = mempool::mempool_buffer_anon;
 
     mutable std::atomic_flag crc_spinlock = ATOMIC_FLAG_INIT;
@@ -821,25 +831,25 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
 
   buffer::ptr::ptr(raw *r) : _raw(r), _off(0), _len(r->len)   // no lock needed; this is an unref raw.
   {
-    r->nref.inc();
+    r->nref++;
     bdout << "ptr " << this << " get " << _raw << bendl;
   }
   buffer::ptr::ptr(unsigned l) : _off(0), _len(l)
   {
     _raw = create(l);
-    _raw->nref.inc();
+    _raw->nref++;
     bdout << "ptr " << this << " get " << _raw << bendl;
   }
   buffer::ptr::ptr(const char *d, unsigned l) : _off(0), _len(l)    // ditto.
   {
     _raw = copy(d, l);
-    _raw->nref.inc();
+    _raw->nref++;
     bdout << "ptr " << this << " get " << _raw << bendl;
   }
   buffer::ptr::ptr(const ptr& p) : _raw(p._raw), _off(p._off), _len(p._len)
   {
     if (_raw) {
-      _raw->nref.inc();
+      _raw->nref++;
       bdout << "ptr " << this << " get " << _raw << bendl;
     }
   }
@@ -853,13 +863,13 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
   {
     assert(o+l <= p._len);
     assert(_raw);
-    _raw->nref.inc();
+    _raw->nref++;
     bdout << "ptr " << this << " get " << _raw << bendl;
   }
   buffer::ptr& buffer::ptr::operator= (const ptr& p)
   {
     if (p._raw) {
-      p._raw->nref.inc();
+      p._raw->nref++;
       bdout << "ptr " << this << " get " << _raw << bendl;
     }
     buffer::raw *raw = p._raw; 
@@ -898,8 +908,8 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
     if (_raw && !_raw->is_shareable()) {
       buffer::raw *tr = _raw;
       _raw = tr->clone();
-      _raw->nref.set(1);
-      if (unlikely(tr->nref.dec() == 0)) {
+      _raw->nref = 1;
+      if (unlikely(--tr->nref == 0)) {
         ANNOTATE_HAPPENS_AFTER(&tr->nref);
         ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&tr->nref);
         delete tr;
@@ -927,7 +937,7 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
   {
     if (_raw) {
       bdout << "ptr " << this << " release " << _raw << bendl;
-      if (_raw->nref.dec() == 0) {
+      if (--_raw->nref == 0) {
        //cout << "hosing raw " << (void*)_raw << " len " << _raw->len << std::endl;
         ANNOTATE_HAPPENS_AFTER(&_raw->nref);
         ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&_raw->nref);
@@ -944,25 +954,25 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
   const char *buffer::ptr::c_str() const {
     assert(_raw);
     if (buffer_track_c_str)
-      buffer_c_str_accesses.inc();
+      buffer_c_str_accesses++;
     return _raw->get_data() + _off;
   }
   char *buffer::ptr::c_str() {
     assert(_raw);
     if (buffer_track_c_str)
-      buffer_c_str_accesses.inc();
+      buffer_c_str_accesses++;
     return _raw->get_data() + _off;
   }
   const char *buffer::ptr::end_c_str() const {
     assert(_raw);
     if (buffer_track_c_str)
-      buffer_c_str_accesses.inc();
+      buffer_c_str_accesses++;
     return _raw->get_data() + _off + _len;
   }
   char *buffer::ptr::end_c_str() {
     assert(_raw);
     if (buffer_track_c_str)
-      buffer_c_str_accesses.inc();
+      buffer_c_str_accesses++;
     return _raw->get_data() + _off + _len;
   }
 
@@ -988,7 +998,7 @@ static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
 
   const char *buffer::ptr::raw_c_str() const { assert(_raw); return _raw->data; }
   unsigned buffer::ptr::raw_length() const { assert(_raw); return _raw->len; }
-  int buffer::ptr::raw_nref() const { assert(_raw); return _raw->nref.read(); }
+  int buffer::ptr::raw_nref() const { assert(_raw); return _raw->nref; }
 
   void buffer::ptr::copy_out(unsigned o, unsigned l, char *dest) const {
     assert(_raw);
@@ -2456,7 +2466,7 @@ __u32 buffer::list::crc32c(__u32 crc) const
          // got it already
          crc = ccrc.second;
          if (buffer_track_crc)
-           buffer_cached_crc.inc();
+           buffer_cached_crc++;
        } else {
          /* If we have cached crc32c(buf, v) for initial value v,
           * we can convert this to a different initial value v' by:
@@ -2468,11 +2478,11 @@ __u32 buffer::list::crc32c(__u32 crc) const
           */
          crc = ccrc.second ^ ceph_crc32c(ccrc.first ^ crc, NULL, it->length());
          if (buffer_track_crc)
-           buffer_cached_crc_adjusted.inc();
+           buffer_cached_crc_adjusted++;
        }
       } else {
        if (buffer_track_crc)
-         buffer_missed_crc.inc();
+         buffer_missed_crc++;
        uint32_t base = crc;
        crc = ceph_crc32c(crc, (unsigned char*)it->c_str(), it->length());
        r->set_crc(ofs, make_pair(base, crc));
@@ -2597,7 +2607,7 @@ buffer::list buffer::list::static_from_string(string& s) {
 }
 
 std::ostream& buffer::operator<<(std::ostream& out, const buffer::raw &r) {
-  return out << "buffer::raw(" << (void*)r.data << " len " << r.len << " nref " << r.nref.read() << ")";
+  return out << "buffer::raw(" << (void*)r.data << " len " << r.len << " nref " << r.nref.load() << ")";
 }
 
 std::ostream& buffer::operator<<(std::ostream& out, const buffer::ptr& bp) {
index f5969172b81d11d3ad880a5a55940e24a5a4ca47..3d462c72f552368618dc0291e51cd1f61f3d94ca 100644 (file)
@@ -173,11 +173,11 @@ void PerfCounters::inc(int idx, uint64_t amt)
   if (!(data.type & PERFCOUNTER_U64))
     return;
   if (data.type & PERFCOUNTER_LONGRUNAVG) {
-    data.avgcount.inc();
-    data.u64.add(amt);
-    data.avgcount2.inc();
+    data.avgcount++;
+    data.u64 += amt;
+    data.avgcount2++;
   } else {
-    data.u64.add(amt);
+    data.u64 += amt;
   }
 }
 
@@ -192,7 +192,7 @@ void PerfCounters::dec(int idx, uint64_t amt)
   assert(!(data.type & PERFCOUNTER_LONGRUNAVG));
   if (!(data.type & PERFCOUNTER_U64))
     return;
-  data.u64.sub(amt);
+  data.u64 -= amt;
 }
 
 void PerfCounters::set(int idx, uint64_t amt)
@@ -209,11 +209,11 @@ void PerfCounters::set(int idx, uint64_t amt)
   ANNOTATE_BENIGN_RACE_SIZED(&data.u64, sizeof(data.u64),
                              "perf counter atomic");
   if (data.type & PERFCOUNTER_LONGRUNAVG) {
-    data.avgcount.inc();
-    data.u64.set(amt);
-    data.avgcount2.inc();
+    data.avgcount++;
+    data.u64 = amt;
+    data.avgcount2++;
   } else {
-    data.u64.set(amt);
+    data.u64 = amt;
   }
 }
 
@@ -227,7 +227,7 @@ uint64_t PerfCounters::get(int idx) const
   const perf_counter_data_any_d& data(m_data[idx - m_lower_bound - 1]);
   if (!(data.type & PERFCOUNTER_U64))
     return 0;
-  return data.u64.read();
+  return data.u64;
 }
 
 void PerfCounters::tinc(int idx, utime_t amt, uint32_t avgcount)
@@ -241,11 +241,11 @@ void PerfCounters::tinc(int idx, utime_t amt, uint32_t avgcount)
   if (!(data.type & PERFCOUNTER_TIME))
     return;
   if (data.type & PERFCOUNTER_LONGRUNAVG) {
-    data.avgcount.add(avgcount);
-    data.u64.add(amt.to_nsec());
-    data.avgcount2.add(avgcount);
+    data.avgcount++;
+    data.u64 += amt.to_nsec();
+    data.avgcount2++;
   } else {
-    data.u64.add(amt.to_nsec());
+    data.u64 += amt.to_nsec();
   }
 }
 
@@ -260,11 +260,11 @@ void PerfCounters::tinc(int idx, ceph::timespan amt, uint32_t avgcount)
   if (!(data.type & PERFCOUNTER_TIME))
     return;
   if (data.type & PERFCOUNTER_LONGRUNAVG) {
-    data.avgcount.add(avgcount);
-    data.u64.add(amt.count());
-    data.avgcount2.add(avgcount);
+    data.avgcount++;
+    data.u64 += amt.count();
+    data.avgcount2++;
   } else {
-    data.u64.add(amt.count());
+    data.u64 += amt.count();
   }
 }
 
@@ -278,7 +278,7 @@ void PerfCounters::tset(int idx, utime_t amt)
   perf_counter_data_any_d& data(m_data[idx - m_lower_bound - 1]);
   if (!(data.type & PERFCOUNTER_TIME))
     return;
-  data.u64.set(amt.to_nsec());
+  data.u64 = amt.to_nsec();
   if (data.type & PERFCOUNTER_LONGRUNAVG)
     ceph_abort();
 }
@@ -293,7 +293,7 @@ utime_t PerfCounters::tget(int idx) const
   const perf_counter_data_any_d& data(m_data[idx - m_lower_bound - 1]);
   if (!(data.type & PERFCOUNTER_TIME))
     return utime_t();
-  uint64_t v = data.u64.read();
+  uint64_t v = data.u64;
   return utime_t(v / 1000000000ull, v % 1000000000ull);
 }
 
@@ -426,7 +426,7 @@ void PerfCounters::dump_formatted_generic(Formatter *f, bool schema,
         d->histogram->dump_formatted(f);
         f->close_section();
       } else {
-       uint64_t v = d->u64.read();
+       uint64_t v = d->u64;
        if (d->type & PERFCOUNTER_U64) {
          f->dump_unsigned(d->name, v);
        } else if (d->type & PERFCOUNTER_TIME) {
index ea78f10dcaaee4c7f4335ef8f43b406e5d0f2c5a..5eaa59fb699afbea32d557716443b3cc78a69b4b 100644 (file)
 #ifndef CEPH_COMMON_PERF_COUNTERS_H
 #define CEPH_COMMON_PERF_COUNTERS_H
 
+#include <string>
+#include <vector>
+#include <memory>
+#include <atomic>
+#include <cstdint>
+
 #include "common/perf_histogram.h"
 #include "include/utime.h"
 #include "common/Mutex.h"
+#include "common/ceph_time.h"
 
 class CephContext;
 class PerfCountersBuilder;
@@ -69,21 +76,18 @@ public:
       : name(NULL),
         description(NULL),
         nick(NULL),
-       type(PERFCOUNTER_NONE),
-       u64(0),
-       avgcount(0),
-       avgcount2(0)
+           type(PERFCOUNTER_NONE)
     {}
     perf_counter_data_any_d(const perf_counter_data_any_d& other)
       : name(other.name),
         description(other.description),
         nick(other.nick),
        type(other.type),
-       u64(other.u64.read()) {
+       u64(other.u64.load()) {
       pair<uint64_t,uint64_t> a = other.read_avg();
-      u64.set(a.first);
-      avgcount.set(a.second);
-      avgcount2.set(a.second);
+      u64 = a.first;
+      avgcount = a.second;
+      avgcount2 = a.second;
       if (other.histogram) {
         histogram.reset(new PerfHistogram<>(*other.histogram));
       }
@@ -94,17 +98,17 @@ public:
     const char *nick;
     int prio = 0;
     enum perfcounter_type_d type;
-    atomic64_t u64;
-    atomic64_t avgcount;
-    atomic64_t avgcount2;
+    std::atomic<uint64_t> u64 = { 0 };
+    std::atomic<uint64_t> avgcount = { 0 };
+    std::atomic<uint64_t> avgcount2 = { 0 };
     std::unique_ptr<PerfHistogram<>> histogram;
 
     void reset()
     {
       if (type != PERFCOUNTER_U64) {
-       u64.set(0);
-       avgcount.set(0);
-       avgcount2.set(0);
+           u64 = 0;
+           avgcount = 0;
+           avgcount2 = 0;
       }
       if (histogram) {
         histogram->reset();
@@ -117,9 +121,9 @@ public:
     pair<uint64_t,uint64_t> read_avg() const {
       uint64_t sum, count;
       do {
-       count = avgcount2.read();
-       sum = u64.read();
-      } while (avgcount.read() != count);
+       count = avgcount;
+       sum = u64;
+      } while (avgcount2 != count);
       return make_pair(sum, count);
     }
   };
index ee726d394e495ed02f4ff48360bc73d14256285b..aa5713d0b6e032a67487812d991c0f21b2f28d7d 100644 (file)
 #ifndef CEPH_COMMON_PERF_HISTOGRAM_H
 #define CEPH_COMMON_PERF_HISTOGRAM_H
 
-#include "common/Formatter.h"
-#include "include/atomic.h"
-
 #include <array>
+#include <atomic>
 #include <memory>
+#include <cassert>
+
+#include "common/Formatter.h"
+#include "include/int_types.h"
 
 class PerfHistogramCommon {
 public:
@@ -84,16 +86,16 @@ public:
       m_axes_config[i++] = ac;
     }
 
-    m_rawData.reset(new atomic64_t[get_raw_size()]);
+    m_rawData.reset(new std::atomic<uint64_t>[get_raw_size()] {});
   }
 
   /// Copy from other histogram object
   PerfHistogram(const PerfHistogram &other)
       : m_axes_config(other.m_axes_config) {
     int64_t size = get_raw_size();
-    m_rawData.reset(new atomic64_t[size]);
+    m_rawData.reset(new std::atomic<uint64_t>[size] {});
     for (int64_t i = 0; i < size; i++) {
-      m_rawData[i].set(other.m_rawData[i].read());
+      m_rawData[i] = other.m_rawData[i].load();
     }
   }
 
@@ -101,7 +103,7 @@ public:
   void reset() {
     auto size = get_raw_size();
     for (auto i = size; --i >= 0;) {
-      m_rawData[i].set(0);
+      m_rawData[i] = 0;
     }
   }
 
@@ -109,21 +111,21 @@ public:
   template <typename... T>
   void inc(T... axis) {
     auto index = get_raw_index_for_value(axis...);
-    m_rawData[index].add(1);
+    m_rawData[index]++;
   }
 
   /// Increase counter for given axis buckets by one
   template <typename... T>
   void inc_bucket(T... bucket) {
     auto index = get_raw_index_for_bucket(bucket...);
-    m_rawData[index].add(1);
+    m_rawData[index]++;
   }
 
   /// Read value from given bucket
   template <typename... T>
   uint64_t read_bucket(T... bucket) const {
     auto index = get_raw_index_for_bucket(bucket...);
-    return m_rawData[index].read();
+    return m_rawData[index];
   }
 
   /// Dump data to a Formatter object
@@ -142,7 +144,7 @@ public:
 protected:
   /// Raw data stored as linear space, internal indexes are calculated on
   /// demand.
-  std::unique_ptr<atomic64_t[]> m_rawData;
+  std::unique_ptr<std::atomic<uint64_t>[]> m_rawData;
 
   /// Configuration of axes
   std::array<axis_config_d, DIM> m_axes_config;
@@ -207,7 +209,7 @@ protected:
   void visit_values(FDE onDimensionEnter, FV onValue, FDL onDimensionLeave,
                     int level = 0, int startIndex = 0) const {
     if (level == DIM) {
-      onValue(m_rawData[startIndex].read());
+      onValue(m_rawData[startIndex]);
       return;
     }
 
index 6098e57b956f188c5d544717c9a8f43fbde8cf06..3332bd4f00ac0d171ddf7f15780727a78375edcd 100644 (file)
@@ -21,6 +21,9 @@
 #include "common/Cond.h"
 #include "include/unordered_map.h"
 
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
 template <class K, class V, class C = std::less<K>, class H = std::hash<K> >
 class SharedLRU {
   CephContext *cct;
index 53840319cf50adca583da51b8f30d68e94127840..450bd59e38135601989ded4feec7bfbfbf91406a 100644 (file)
@@ -5,12 +5,18 @@
 #include <set>
 #include <map>
 #include <string>
-#include "include/memory.h"
-#include <errno.h>
+#include <cerrno>
+
 using std::string;
+
+#include "include/memory.h"
+
 #include "common/debug.h"
 #include "common/perf_counters.h"
 
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
 #define dout_context cct
 #define dout_subsys ceph_subsys_leveldb
 #undef dout_prefix
index 7ce87ecc4812091583ecd8e976c1eee829763810..be344ff18e6836a6719314162d0d583b3c5b97e6 100644 (file)
@@ -29,6 +29,9 @@
 
 #include "common/ceph_context.h"
 
+// reinclude our assert to clobber the system one
+# include "include/assert.h"
+
 class PerfCounters;
 
 enum {
diff --git a/src/perf_histogram.h b/src/perf_histogram.h
new file mode 100644 (file)
index 0000000..06ebaf8
--- /dev/null
@@ -0,0 +1,227 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 OVH
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMMON_PERF_HISTOGRAM_H
+#define CEPH_COMMON_PERF_HISTOGRAM_H
+
+#include "common/Formatter.h"
+#include "include/int_types.h"
+
+#include <array>
+#include <atomic>
+#include <memory>
+#include <cassert>
+
+class PerfHistogramCommon {
+public:
+  enum scale_type_d : uint8_t {
+    SCALE_LINEAR = 1,
+    SCALE_LOG2 = 2,
+  };
+
+  struct axis_config_d {
+    const char *m_name = nullptr;
+    scale_type_d m_scale_type = SCALE_LINEAR;
+    int64_t m_min = 0;
+    int64_t m_quant_size = 0;
+    int32_t m_buckets = 0;
+    axis_config_d() = default;
+    axis_config_d(const char* name,
+                 scale_type_d scale_type,
+                 int64_t min,
+                 int64_t quant_size,
+                 int32_t buckets)
+      : m_name(name),
+       m_scale_type(scale_type),
+       m_min(min),
+       m_quant_size(quant_size),
+       m_buckets(buckets)
+    {}
+  };
+
+protected:
+  /// Dump configuration of one axis to a formatter
+  static void dump_formatted_axis(ceph::Formatter *f, const axis_config_d &ac);
+
+  /// Quantize given value and convert to bucket number on given axis
+  static int64_t get_bucket_for_axis(int64_t value, const axis_config_d &ac);
+
+  /// Calculate inclusive ranges of axis values for each bucket on that axis
+  static std::vector<std::pair<int64_t, int64_t>> get_axis_bucket_ranges(
+      const axis_config_d &ac);
+};
+
+/// PerfHistogram does trace a histogram of input values. It's an extended
+/// version of a standard histogram which does trace characteristics of a single
+/// one value only. In this implementation, values can be traced in multiple
+/// dimensions - i.e. we can create a histogram of input request size (first
+/// dimension) and processing latency (second dimension). Creating standard
+/// histogram out of such multidimensional one is trivial and requires summing
+/// values across dimensions we're not interested in.
+template <int DIM = 2>
+class PerfHistogram : public PerfHistogramCommon {
+public:
+  /// Initialize new histogram object
+  PerfHistogram(std::initializer_list<axis_config_d> axes_config) {
+    assert(axes_config.size() == DIM &&
+           "Invalid number of axis configuration objects");
+
+    int i = 0;
+    for (const auto &ac : axes_config) {
+      assert(ac.m_buckets > 0 && "Must have at least one bucket on axis");
+      assert(ac.m_quant_size > 0 &&
+             "Quantization unit must be non-zero positive integer value");
+
+      m_axes_config[i++] = ac;
+    }
+
+    m_rawData.reset(new std::atomic<uint64_t>[get_raw_size()]);
+  }
+
+  /// Copy from other histogram object
+  PerfHistogram(const PerfHistogram &other)
+      : m_axes_config(other.m_axes_config) {
+    int64_t size = get_raw_size();
+    m_rawData.reset(new std::atomic<uint64_t>[size]);
+    for (int64_t i = 0; i < size; i++) {
+      m_rawData[i] = other.m_rawData[i];
+    }
+  }
+
+  /// Set all histogram values to 0
+  void reset() {
+    auto size = get_raw_size();
+    for (auto i = size; --i >= 0;) {
+      m_rawData[i] = 0;
+    }
+  }
+
+  /// Increase counter for given axis values by one
+  template <typename... T>
+  void inc(T... axis) {
+    auto index = get_raw_index_for_value(axis...);
+    m_rawData[index] += 1;
+  }
+
+  /// Increase counter for given axis buckets by one
+  template <typename... T>
+  void inc_bucket(T... bucket) {
+    auto index = get_raw_index_for_bucket(bucket...);
+    m_rawData[index] += 1;
+  }
+
+  /// Read value from given bucket
+  template <typename... T>
+  uint64_t read_bucket(T... bucket) const {
+    auto index = get_raw_index_for_bucket(bucket...);
+    return m_rawData[index];
+  }
+
+  /// Dump data to a Formatter object
+  void dump_formatted(ceph::Formatter *f) const {
+    // Dump axes configuration
+    f->open_array_section("axes");
+    for (auto &ac : m_axes_config) {
+      dump_formatted_axis(f, ac);
+    }
+    f->close_section();
+
+    // Dump histogram values
+    dump_formatted_values(f);
+  }
+
+protected:
+  /// Raw data stored as linear space, internal indexes are calculated on
+  /// demand.
+  std::unique_ptr<std::atomic<uint64_t>[]> m_rawData;
+
+  /// Configuration of axes
+  std::array<axis_config_d, DIM> m_axes_config;
+
+  /// Dump histogram counters to a formatter
+  void dump_formatted_values(ceph::Formatter *f) const {
+    visit_values([f](int) { f->open_array_section("values"); },
+                 [f](int64_t value) { f->dump_unsigned("value", value); },
+                 [f](int) { f->close_section(); });
+  }
+
+  /// Get number of all histogram counters
+  int64_t get_raw_size() {
+    int64_t ret = 1;
+    for (const auto &ac : m_axes_config) {
+      ret *= ac.m_buckets;
+    }
+    return ret;
+  }
+
+  /// Calculate m_rawData index from axis values
+  template <typename... T>
+  int64_t get_raw_index_for_value(T... axes) const {
+    static_assert(sizeof...(T) == DIM, "Incorrect number of arguments");
+    return get_raw_index_internal<0>(get_bucket_for_axis, 0, axes...);
+  }
+
+  /// Calculate m_rawData index from axis bucket numbers
+  template <typename... T>
+  int64_t get_raw_index_for_bucket(T... buckets) const {
+    static_assert(sizeof...(T) == DIM, "Incorrect number of arguments");
+    return get_raw_index_internal<0>(
+        [](int64_t bucket, const axis_config_d &ac) {
+          assert(bucket >= 0 && "Bucket index can not be negative");
+          assert(bucket < ac.m_buckets && "Bucket index too large");
+          return bucket;
+        },
+        0, buckets...);
+  }
+
+  template <int level = 0, typename F, typename... T>
+  int64_t get_raw_index_internal(F bucket_evaluator, int64_t startIndex,
+                                 int64_t value, T... tail) const {
+    static_assert(level + 1 + sizeof...(T) == DIM,
+                  "Internal consistency check");
+    auto &ac = m_axes_config[level];
+    auto bucket = bucket_evaluator(value, ac);
+    return get_raw_index_internal<level + 1>(
+        bucket_evaluator, ac.m_buckets * startIndex + bucket, tail...);
+  }
+
+  template <int level, typename F>
+  int64_t get_raw_index_internal(F, int64_t startIndex) const {
+    static_assert(level == DIM, "Internal consistency check");
+    return startIndex;
+  }
+
+  /// Visit all histogram counters, call onDimensionEnter / onDimensionLeave
+  /// when starting / finishing traversal
+  /// on given axis, call onValue when dumping raw histogram counter value.
+  template <typename FDE, typename FV, typename FDL>
+  void visit_values(FDE onDimensionEnter, FV onValue, FDL onDimensionLeave,
+                    int level = 0, int startIndex = 0) const {
+    if (level == DIM) {
+      onValue(m_rawData[startIndex]);
+      return;
+    }
+
+    onDimensionEnter(level);
+    auto &ac = m_axes_config[level];
+    startIndex *= ac.m_buckets;
+    for (int32_t i = 0; i < ac.m_buckets; ++i, ++startIndex) {
+      visit_values(onDimensionEnter, onValue, onDimensionLeave, level + 1,
+                   startIndex);
+    }
+    onDimensionLeave(level);
+  }
+};
+
+#endif