AsyncCompressor::AsyncCompressor(CephContext *c):
compressor(Compressor::create(c, c->_conf->async_compressor_type)), cct(c),
- job_id(0),
compress_tp(cct, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"),
job_lock("AsyncCompressor::job_lock"),
compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) {
uint64_t AsyncCompressor::async_compress(bufferlist &data)
{
- uint64_t id = job_id.inc();
+ uint64_t id = ++job_id;
pair<unordered_map<uint64_t, Job>::iterator, bool> it;
{
Mutex::Locker l(job_lock);
uint64_t AsyncCompressor::async_decompress(bufferlist &data)
{
- uint64_t id = job_id.inc();
+ uint64_t id = ++job_id;
pair<unordered_map<uint64_t, Job>::iterator, bool> it;
{
Mutex::Locker l(job_lock);
ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl;
return -ENOENT;
}
- int status;
retry:
- status = it->second.status.read();
- if (status == DONE) {
+ auto status = it->second.status.load();
+ if (status == status_t::DONE) {
ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl;
*finished = true;
data.swap(it->second.data);
jobs.erase(it);
- } else if (status == ERROR) {
+ } else if (status == status_t::ERROR) {
ldout(cct, 20) << __func__ << " compressed data failed, job id=" << compress_id << dendl;
jobs.erase(it);
return -EIO;
} else if (blocking) {
- if (it->second.status.compare_and_swap(WAIT, DONE)) {
+ auto expected = status_t::WAIT;
+ if (it->second.status.compare_exchange_strong(expected, status_t::DONE)) {
ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl;
if (compressor->compress(it->second.data, data)) {
ldout(cct, 1) << __func__ << " compress job id=" << compress_id << " failed!"<< dendl;
- it->second.status.set(ERROR);
+ it->second.status = status_t::ERROR;
return -EIO;
}
*finished = true;
ldout(cct, 10) << __func__ << " missing to get decompress job id=" << decompress_id << dendl;
return -ENOENT;
}
- int status;
-
retry:
- status = it->second.status.read();
- if (status == DONE) {
+ auto status = it->second.status.load();
+ if (status == status_t::DONE) {
ldout(cct, 20) << __func__ << " successfully getting decompressed data, job id=" << decompress_id << dendl;
*finished = true;
data.swap(it->second.data);
jobs.erase(it);
- } else if (status == ERROR) {
+ } else if (status == status_t::ERROR) {
ldout(cct, 20) << __func__ << " compressed data failed, job id=" << decompress_id << dendl;
jobs.erase(it);
return -EIO;
} else if (blocking) {
- if (it->second.status.compare_and_swap(WAIT, DONE)) {
+ auto expected = status_t::WAIT;
+ if (it->second.status.compare_exchange_strong(expected, status_t::DONE)) {
ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, abort!"<< dendl;
if (compressor->decompress(it->second.data, data)) {
ldout(cct, 1) << __func__ << " decompress job id=" << decompress_id << " failed!"<< dendl;
- it->second.status.set(ERROR);
+ it->second.status = status_t::ERROR;
return -EIO;
}
*finished = true;
#include <deque>
#include <vector>
-#include "include/atomic.h"
+#include <atomic>
+
+#include "include/str_list.h"
+
#include "Compressor.h"
#include "common/WorkQueue.h"
private:
CompressorRef compressor;
CephContext *cct;
- atomic_t job_id;
+ std::atomic<uint64_t> job_id { 0 };
vector<int> coreids;
ThreadPool compress_tp;
- enum {
+ enum class status_t {
WAIT,
WORKING,
DONE,
ERROR
- } status;
+ };
+
struct Job {
uint64_t id;
- atomic_t status;
+ std::atomic<status_t> status { status_t::WAIT };
bool is_compress;
bufferlist data;
- Job(uint64_t i, bool compress): id(i), status(WAIT), is_compress(compress) {}
- Job(const Job &j): id(j.id), status(j.status.read()), is_compress(j.is_compress), data(j.data) {}
+ Job(uint64_t i, bool compress): id(i), is_compress(compress) {}
+ Job(const Job &j): id(j.id), status(j.status.load()), is_compress(j.is_compress), data(j.data) {}
};
Mutex job_lock;
// only when job.status == DONE && with job_lock holding, we can insert/erase element in jobs
while (!job_queue.empty()) {
item = job_queue.front();
job_queue.pop_front();
- if (item->status.compare_and_swap(WAIT, WORKING)) {
+
+ auto expected = status_t::WAIT;
+ if (item->status.compare_exchange_strong(expected, status_t::WORKING)) {
break;
} else {
Mutex::Locker l(async_compressor->job_lock);
return item;
}
void _process(Job *item, ThreadPool::TPHandle &) override {
  // The dequeue path must have CAS'd the job WAIT -> WORKING before we run it.
  assert(item->status == status_t::WORKING);
  bufferlist out;
  int r;
  // Route by job type: the original fell through to decompress() for
  // compress jobs and left `r` uninitialized for decompress jobs.
  if (item->is_compress)
    r = async_compressor->compressor->compress(item->data, out);
  else
    r = async_compressor->compressor->decompress(item->data, out);
  if (!r) {
    item->data.swap(out);
    // Publish the result. Keep the CAS outside assert(): a side effect
    // inside assert() would be compiled away under NDEBUG.
    auto expected = status_t::WORKING;
    bool ok = item->status.compare_exchange_strong(expected, status_t::DONE);
    assert(ok);
    (void)ok;   // silence unused-variable warning in NDEBUG builds
  } else {
    item->status = status_t::ERROR;
  }
}
// Nothing to do after _process(): finished jobs appear to be claimed by the
// data getters rather than by the work queue — confirm against callers.
void _process_finish(Job *item) override {}