From: Jesse Williamson Date: Tue, 23 May 2017 12:16:07 +0000 (-0700) Subject: compressor: migrate atomic_t to X-Git-Tag: ses5-milestone6~8^2~11^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7b874e11add7fda5053dfa13992e47f6563f398e;p=ceph.git compressor: migrate atomic_t to compare_exchange_strong() is used because on architectures without CaS instructions, compare_exchange_weak() may fail. Signed-off-by: Jesse Williamson --- diff --git a/src/compressor/AsyncCompressor.cc b/src/compressor/AsyncCompressor.cc index 7a9071a7189a..d4a77686b534 100644 --- a/src/compressor/AsyncCompressor.cc +++ b/src/compressor/AsyncCompressor.cc @@ -22,7 +22,6 @@ AsyncCompressor::AsyncCompressor(CephContext *c): compressor(Compressor::create(c, c->_conf->async_compressor_type)), cct(c), - job_id(0), compress_tp(cct, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"), job_lock("AsyncCompressor::job_lock"), compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) { @@ -42,7 +41,7 @@ void AsyncCompressor::terminate() uint64_t AsyncCompressor::async_compress(bufferlist &data) { - uint64_t id = job_id.inc(); + uint64_t id = ++job_id; pair::iterator, bool> it; { Mutex::Locker l(job_lock); @@ -56,7 +55,7 @@ uint64_t AsyncCompressor::async_compress(bufferlist &data) uint64_t AsyncCompressor::async_decompress(bufferlist &data) { - uint64_t id = job_id.inc(); + uint64_t id = ++job_id; pair::iterator, bool> it; { Mutex::Locker l(job_lock); @@ -77,25 +76,25 @@ int AsyncCompressor::get_compress_data(uint64_t compress_id, bufferlist &data, b ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl; return -ENOENT; } - int status; retry: - status = it->second.status.read(); - if (status == DONE) { + auto status = it->second.status.load(); + if (status == status_t::DONE) { ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl; *finished = true; data.swap(it->second.data); jobs.erase(it); - } else if (status == ERROR) { + } else if (status == status_t::ERROR) { ldout(cct, 20) << __func__ << " compressed data failed, job id=" << compress_id << dendl; jobs.erase(it); return -EIO; } else if (blocking) { - if (it->second.status.compare_and_swap(WAIT, DONE)) { + auto expected = status_t::WAIT; + if (it->second.status.compare_exchange_strong(expected, status_t::DONE)) { ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl; if (compressor->compress(it->second.data, data)) { ldout(cct, 1) << __func__ << " compress job id=" << compress_id << " failed!"<< dendl; - it->second.status.set(ERROR); + it->second.status = status_t::ERROR; return -EIO; } *finished = true; @@ -121,25 +120,24 @@ int AsyncCompressor::get_decompress_data(uint64_t decompress_id, bufferlist &dat ldout(cct, 10) << __func__ << " missing to get decompress job id=" << decompress_id << dendl; return -ENOENT; } - int status; - retry: - status = it->second.status.read(); - if (status == DONE) { + auto status = it->second.status.load(); + if (status == status_t::DONE) { ldout(cct, 20) << __func__ << " successfully getting decompressed data, job id=" << decompress_id << dendl; *finished = true; data.swap(it->second.data); jobs.erase(it); - } else if (status == ERROR) { + } else if (status == status_t::ERROR) { ldout(cct, 20) << __func__ << " compressed data failed, job id=" << decompress_id << dendl; jobs.erase(it); return -EIO; } else if (blocking) { - if (it->second.status.compare_and_swap(WAIT, DONE)) { + auto expected = status_t::WAIT; + if (it->second.status.compare_exchange_strong(expected, status_t::DONE)) { ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, abort!"<< dendl; if (compressor->decompress(it->second.data, data)) { ldout(cct, 1) << __func__ << " decompress job id=" << decompress_id << " failed!"<< dendl; - it->second.status.set(ERROR); + it->second.status = status_t::ERROR; return -EIO; } *finished = true; diff --git a/src/compressor/AsyncCompressor.h b/src/compressor/AsyncCompressor.h index 37f70f98eb28..7ca8fad04d68 100644 --- a/src/compressor/AsyncCompressor.h +++ b/src/compressor/AsyncCompressor.h @@ -17,7 +17,10 @@ #include #include -#include "include/atomic.h" +#include + +#include "include/str_list.h" + #include "Compressor.h" #include "common/WorkQueue.h" @@ -25,23 +28,24 @@ class AsyncCompressor { private: CompressorRef compressor; CephContext *cct; - atomic_t job_id; + std::atomic job_id { 0 }; vector coreids; ThreadPool compress_tp; - enum { + enum class status_t { WAIT, WORKING, DONE, ERROR - } status; + }; + struct Job { uint64_t id; - atomic_t status; + std::atomic status { status_t::WAIT }; bool is_compress; bufferlist data; - Job(uint64_t i, bool compress): id(i), status(WAIT), is_compress(compress) {} - Job(const Job &j): id(j.id), status(j.status.read()), is_compress(j.is_compress), data(j.data) {} + Job(uint64_t i, bool compress): id(i), is_compress(compress) {} + Job(const Job &j): id(j.id), status(j.status.load()), is_compress(j.is_compress), data(j.data) {} }; Mutex job_lock; // only when job.status == DONE && with job_lock holding, we can insert/erase element in jobs @@ -73,7 +77,9 @@ class AsyncCompressor { while (!job_queue.empty()) { item = job_queue.front(); job_queue.pop_front(); - if (item->status.compare_and_swap(WAIT, WORKING)) { + + auto expected = status_t::WAIT; + if (item->status.compare_exchange_strong(expected, status_t::WORKING)) { break; } else { Mutex::Locker l(async_compressor->job_lock); @@ -84,7 +90,7 @@ class AsyncCompressor { return item; } void _process(Job *item, ThreadPool::TPHandle &) override { - assert(item->status.read() == WORKING); + assert(item->status == status_t::WORKING); bufferlist out; int r; if (item->is_compress) @@ -93,9 +99,10 @@ class AsyncCompressor { r = async_compressor->compressor->decompress(item->data, out); if (!r) { item->data.swap(out); - assert(item->status.compare_and_swap(WORKING, DONE)); + auto expected = status_t::WORKING; + assert(item->status.compare_exchange_strong(expected, status_t::DONE)); } else { - item->status.set(ERROR); + item->status = status_t::ERROR; } } void _process_finish(Job *item) override {}