]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
compressor: migrate atomic_t to <atomic>
authorJesse Williamson <jwilliamson@suse.de>
Tue, 23 May 2017 12:16:07 +0000 (05:16 -0700)
committerJesse Williamson <jwilliamson@suse.de>
Sun, 28 May 2017 15:48:15 +0000 (08:48 -0700)
compare_exchange_strong() is used because on architectures without CaS instructions,
compare_exchange_weak() may fail.

Signed-off-by: Jesse Williamson <jwilliamson@suse.de>
src/compressor/AsyncCompressor.cc
src/compressor/AsyncCompressor.h

index 7a9071a7189a2ad312314240a7bf43fce8d947fd..d4a77686b534f365897c1f4e07a65e10b0686ca5 100644 (file)
@@ -22,7 +22,6 @@
 
 AsyncCompressor::AsyncCompressor(CephContext *c):
   compressor(Compressor::create(c, c->_conf->async_compressor_type)), cct(c),
-  job_id(0),
   compress_tp(cct, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"),
   job_lock("AsyncCompressor::job_lock"),
   compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) {
@@ -42,7 +41,7 @@ void AsyncCompressor::terminate()
 
 uint64_t AsyncCompressor::async_compress(bufferlist &data)
 {
-  uint64_t id = job_id.inc();
+  uint64_t id = ++job_id;
   pair<unordered_map<uint64_t, Job>::iterator, bool> it;
   {
     Mutex::Locker l(job_lock);
@@ -56,7 +55,7 @@ uint64_t AsyncCompressor::async_compress(bufferlist &data)
 
 uint64_t AsyncCompressor::async_decompress(bufferlist &data)
 {
-  uint64_t id = job_id.inc();
+  uint64_t id = ++job_id;
   pair<unordered_map<uint64_t, Job>::iterator, bool> it;
   {
     Mutex::Locker l(job_lock);
@@ -77,25 +76,25 @@ int AsyncCompressor::get_compress_data(uint64_t compress_id, bufferlist &data, b
     ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl;
     return -ENOENT;
   }
-  int status;
 
  retry:
-  status = it->second.status.read();
-  if (status == DONE) {
+  auto status = it->second.status.load();
+  if (status == status_t::DONE) {
     ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl;
     *finished = true;
     data.swap(it->second.data);
     jobs.erase(it);
-  } else if (status == ERROR) {
+  } else if (status == status_t::ERROR) {
     ldout(cct, 20) << __func__ << " compressed data failed, job id=" << compress_id << dendl;
     jobs.erase(it);
     return -EIO;
   } else if (blocking) {
-    if (it->second.status.compare_and_swap(WAIT, DONE)) {
+    auto expected = status_t::WAIT;
+    if (it->second.status.compare_exchange_strong(expected, status_t::DONE)) {
       ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl;
       if (compressor->compress(it->second.data, data)) {
         ldout(cct, 1) << __func__ << " compress job id=" << compress_id << " failed!"<< dendl;
-        it->second.status.set(ERROR);
+        it->second.status = status_t::ERROR;
         return -EIO;
       }
       *finished = true;
@@ -121,25 +120,24 @@ int AsyncCompressor::get_decompress_data(uint64_t decompress_id, bufferlist &dat
     ldout(cct, 10) << __func__ << " missing to get decompress job id=" << decompress_id << dendl;
     return -ENOENT;
   }
-  int status;
-
  retry:
-  status = it->second.status.read();
-  if (status == DONE) {
+  auto status = it->second.status.load();
+  if (status == status_t::DONE) {
     ldout(cct, 20) << __func__ << " successfully getting decompressed data, job id=" << decompress_id << dendl;
     *finished = true;
     data.swap(it->second.data);
     jobs.erase(it);
-  } else if (status == ERROR) {
+  } else if (status == status_t::ERROR) {
     ldout(cct, 20) << __func__ << " compressed data failed, job id=" << decompress_id << dendl;
     jobs.erase(it);
     return -EIO;
   } else if (blocking) {
-    if (it->second.status.compare_and_swap(WAIT, DONE)) {
+    auto expected = status_t::WAIT;
+    if (it->second.status.compare_exchange_strong(expected, status_t::DONE)) {
       ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, abort!"<< dendl;
       if (compressor->decompress(it->second.data, data)) {
         ldout(cct, 1) << __func__ << " decompress job id=" << decompress_id << " failed!"<< dendl;
-        it->second.status.set(ERROR);
+        it->second.status = status_t::ERROR;
         return -EIO;
       }
       *finished = true;
index 37f70f98eb280c0d21a319fda902624c6dae9fc8..7ca8fad04d6804886e9c74c3570b9811f796d607 100644 (file)
 
 #include <deque>
 #include <vector>
-#include "include/atomic.h"
+#include <atomic>
+
+#include "include/str_list.h"
+
 #include "Compressor.h"
 #include "common/WorkQueue.h"
 
@@ -25,23 +28,24 @@ class AsyncCompressor {
  private:
   CompressorRef compressor;
   CephContext *cct;
-  atomic_t job_id;
+  std::atomic<uint64_t> job_id { 0 };
   vector<int> coreids;
   ThreadPool compress_tp;
 
-  enum {
+  enum class status_t {
     WAIT,
     WORKING,
     DONE,
     ERROR
-  } status;
+  };
+
   struct Job {
     uint64_t id;
-    atomic_t status;
+    std::atomic<status_t> status { status_t::WAIT };
     bool is_compress;
     bufferlist data;
-    Job(uint64_t i, bool compress): id(i), status(WAIT), is_compress(compress) {}
-    Job(const Job &j): id(j.id), status(j.status.read()), is_compress(j.is_compress), data(j.data) {}
+    Job(uint64_t i, bool compress): id(i), is_compress(compress) {}
+    Job(const Job &j): id(j.id), status(j.status.load()), is_compress(j.is_compress), data(j.data) {}
   };
   Mutex job_lock;
   // only when job.status == DONE && with job_lock holding, we can insert/erase element in jobs
@@ -73,7 +77,9 @@ class AsyncCompressor {
       while (!job_queue.empty()) {
         item = job_queue.front();
         job_queue.pop_front();
-        if (item->status.compare_and_swap(WAIT, WORKING)) {
+
+        auto expected = status_t::WAIT;
+        if (item->status.compare_exchange_strong(expected, status_t::WORKING)) {
           break;
         } else {
           Mutex::Locker l(async_compressor->job_lock);
@@ -84,7 +90,7 @@ class AsyncCompressor {
       return item;
     }
     void _process(Job *item, ThreadPool::TPHandle &) override {
-      assert(item->status.read() == WORKING);
+      assert(item->status == status_t::WORKING);
       bufferlist out;
       int r;
       if (item->is_compress)
@@ -93,9 +99,10 @@ class AsyncCompressor {
         r = async_compressor->compressor->decompress(item->data, out);
       if (!r) {
         item->data.swap(out);
-        assert(item->status.compare_and_swap(WORKING, DONE));
+        auto expected = status_t::WORKING;
+        assert(item->status.compare_exchange_strong(expected, status_t::DONE));
       } else {
-        item->status.set(ERROR);
+        item->status = status_t::ERROR;
       }
     }
     void _process_finish(Job *item) override {}