#undef dout_prefix
#define dout_prefix *_dout << "compressor "
-//void AsyncCompressor::_compress(bufferlist &in, bufferlist &out)
-//{
-// uint64_t length = 0;
-// size_t res_len;
-// uint64_t left_pbrs = in.buffers().size();
-// compressor->max_compress_size(in.length(), &res_len);
-// ldout(cct, 20) << __func__ << " data length=" << in.length() << " got max compressed size is " << res_len << dendl;
-// bufferptr ptr(res_len);
-// list<bufferptr>::const_iterator pb = in.buffers().begin();
-// while (left_pbrs--) {
-// if (compressor->compress(pb->c_str(), pb->length(), ptr.c_str()+length, &res_len))
-// assert(0);
-// ldout(cct, 20) << __func__ << " pb length=" << pb->length() << " compress size is " << res_len << dendl;
-// out.append(ptr, length, length+res_len);
-// length += res_len;
-// pb++;
-// }
-// ldout(cct, 20) << __func__ << " total compressed length is " << length << dendl;
-//}
-//
-//void AsyncCompressor::_decompress(bufferlist &in, bufferlist &out)
-//{
-// int i = 0;
-// uint64_t length = 0;
-// size_t res_len;
-// bufferptr ptr;
-// vector<uint64_t> lens;
-// list<bufferptr>::const_iterator pb = in.buffers().begin();
-// uint64_t left_pbrs = in.buffers().size();
-// while (left_pbrs--) {
-// if (compressor->max_uncompress_size(pb->c_str(), pb->length(), &res_len))
-// assert(0);
-// length += res_len;
-// lens.push_back(res_len);
-// pb++;
-// }
-// pb = in.buffers().begin();
-// left_pbrs = in.buffers().size();
-// ptr = bufferptr(length);
-// length = 0;
-// while (left_pbrs--) {
-// res_len = lens[i++];
-// if (compressor->decompress(pb->c_str(), pb->length(), ptr.c_str()+length, &res_len))
-// assert(0);
-// ldout(cct, 20) << __func__ << " pb compressed length=" << pb->length() << " actually got decompressed size is " << res_len << dendl;
-// out.append(ptr, length, length+res_len);
-// length += res_len;
-// pb++;
-// }
-// ldout(cct, 20) << __func__ << " total decompressed length is " << length << dendl;
-//}
-
AsyncCompressor::AsyncCompressor(CephContext *c):
compressor(Compressor::create(c->_conf->async_compressor_type)), cct(c),
job_id(0),
jobs.erase(it);
return -EIO;
} else if (blocking) {
- if (it->second.status.cas(WAIT, DONE)) {
+ if (it->second.status.compare_and_swap(WAIT, DONE)) {
ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl;
if (compressor->compress(it->second.data, data)) {
ldout(cct, 1) << __func__ << " compress job id=" << compress_id << " failed!"<< dendl;
jobs.erase(it);
return -EIO;
} else if (blocking) {
- if (it->second.status.cas(WAIT, DONE)) {
+ if (it->second.status.compare_and_swap(WAIT, DONE)) {
ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, abort!"<< dendl;
if (compressor->decompress(it->second.data, data)) {
ldout(cct, 1) << __func__ << " decompress job id=" << decompress_id << " failed!"<< dendl;
#include "Compressor.h"
#include "common/WorkQueue.h"
-class AsyncCompressor;
class AsyncCompressor {
private:
while (!job_queue.empty()) {
item = job_queue.front();
job_queue.pop_front();
- if (item->status.cas(WAIT, WORKING)) {
+ if (item->status.compare_and_swap(WAIT, WORKING)) {
break;
} else {
Mutex::Locker (async_compressor->job_lock);
r = async_compressor->compressor->decompress(item->data, out);
if (!r) {
item->data.swap(out);
- assert(item->status.cas(WORKING, DONE));
+ assert(item->status.compare_and_swap(WORKING, DONE));
} else {
item->status.set(ERROR);
}