ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl;
return -ENOENT;
}
+ int status;
retry:
- if (it->second.status.read() == DONE) {
+ status = it->second.status.read();
+ if (status == DONE) {
ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl;
*finished = true;
data.swap(it->second.data);
jobs.erase(it);
+ } else if (status == ERROR) {
+ ldout(cct, 20) << __func__ << " compressed data failed, job id=" << compress_id << dendl;
+ jobs.erase(it);
+ return -EIO;
} else if (blocking) {
if (it->second.status.cas(WAIT, DONE)) {
ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl;
- compressor->compress(it->second.data, data);
+ if (compressor->compress(it->second.data, data)) {
+ ldout(cct, 1) << __func__ << " compress job id=" << compress_id << " failed!"<< dendl;
+ it->second.status.set(ERROR);
+ return -EIO;
+ }
*finished = true;
} else {
job_lock.Unlock();
ldout(cct, 10) << __func__ << " missing to get decompress job id=" << decompress_id << dendl;
return -ENOENT;
}
+ int status;
retry:
- if (it->second.status.read() == DONE) {
+ status = it->second.status.read();
+ if (status == DONE) {
ldout(cct, 20) << __func__ << " successfully getting decompressed data, job id=" << decompress_id << dendl;
*finished = true;
data.swap(it->second.data);
jobs.erase(it);
+ } else if (status == ERROR) {
+ ldout(cct, 20) << __func__ << " decompressed data failed, job id=" << decompress_id << dendl;
+ jobs.erase(it);
+ return -EIO;
} else if (blocking) {
if (it->second.status.cas(WAIT, DONE)) {
ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, abort!"<< dendl;
- compressor->decompress(it->second.data, data);
+ if (compressor->decompress(it->second.data, data)) {
+ ldout(cct, 1) << __func__ << " decompress job id=" << decompress_id << " failed!"<< dendl;
+ it->second.status.set(ERROR);
+ return -EIO;
+ }
*finished = true;
} else {
job_lock.Unlock();
enum {
WAIT,
WORKING,
- DONE
+ DONE,
+ ERROR
} status;
struct Job {
uint64_t id;
break;
} else {
Mutex::Locker (async_compressor->job_lock);
- assert(item->status.read() == DONE);
async_compressor->jobs.erase(item->id);
item = NULL;
}
void _process(Job *item, ThreadPool::TPHandle &handle) {
assert(item->status.read() == WORKING);
bufferlist out;
+ int r;
if (item->is_compress)
- async_compressor->compressor->compress(item->data, out);
+ r = async_compressor->compressor->compress(item->data, out);
else
- async_compressor->compressor->decompress(item->data, out);
- item->data.swap(out);
- }
- void _process_finish(Job *item) {
- assert(item->status.read() == WORKING);
- item->status.set(DONE);
+ r = async_compressor->compressor->decompress(item->data, out);
+ if (!r) {
+ item->data.swap(out);
+ int succeed = item->status.cas(WORKING, DONE);
+ assert(succeed);
+ } else {
+ item->status.set(ERROR);
+ }
}
+ void _process_finish(Job *item) {}
void _clear() {}
} compress_wq;
friend class CompressWQ;
async_compressor->init();
}
+TEST_F(AsyncCompressorTest, DecompressInjectTest) {
+ bufferlist compress_data, decompress_data, rawdata;
+ generate_random_data(rawdata, 1<<22);
+ bool finished;
+ uint64_t id = async_compressor->async_compress(rawdata);
+ ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished));
+ ASSERT_TRUE(finished == true);
+ char error[] = "asjdfkwejrljqwaelrj";
+ memcpy(compress_data.c_str()+1024, error, sizeof(error)-1);
+ id = async_compressor->async_decompress(compress_data);
+ ASSERT_EQ(-EIO, async_compressor->get_decompress_data(id, decompress_data, true, &finished));
+}
+
class SyntheticWorkload {
set<pair<uint64_t, uint64_t> > compress_jobs, decompress_jobs;
AsyncCompressor *async_compressor;
vector<bufferlist> rand_data, compress_data;
gen_type rng;
+ static const uint64_t MAX_INFLIGHT = 128;
public:
SyntheticWorkload(AsyncCompressor *ac): async_compressor(ac), rng(time(NULL)) {
bool finished;
set<pair<uint64_t, uint64_t> >::iterator prev;
uint64_t c_reap = 0, d_reap = 0;
- for (set<pair<uint64_t, uint64_t> >::iterator it = compress_jobs.begin();
- it != compress_jobs.end();) {
- prev = it;
- it++;
- async_compressor->get_compress_data(prev->first, data, blocking, &finished);
- if (finished) {
- c_reap++;
- if (compress_data[prev->second].length())
- ASSERT_TRUE(compress_data[prev->second].contents_equal(data));
- else
- compress_data[prev->second].swap(data);
- compress_jobs.erase(prev);
+ do {
+ for (set<pair<uint64_t, uint64_t> >::iterator it = compress_jobs.begin();
+ it != compress_jobs.end();) {
+ prev = it;
+ it++;
+ ASSERT_EQ(0, async_compressor->get_compress_data(prev->first, data, blocking, &finished));
+ if (finished) {
+ c_reap++;
+ if (compress_data[prev->second].length())
+ ASSERT_TRUE(compress_data[prev->second].contents_equal(data));
+ else
+ compress_data[prev->second].swap(data);
+ compress_jobs.erase(prev);
+ }
}
- }
- for (set<pair<uint64_t, uint64_t> >::iterator it = decompress_jobs.begin();
- it != decompress_jobs.end();) {
- prev = it;
- it++;
- async_compressor->get_decompress_data(prev->first, data, blocking, &finished);
- if (finished) {
- d_reap++;
- ASSERT_TRUE(rand_data[prev->second].contents_equal(data));
- decompress_jobs.erase(prev);
+ for (set<pair<uint64_t, uint64_t> >::iterator it = decompress_jobs.begin();
+ it != decompress_jobs.end();) {
+ prev = it;
+ it++;
+ ASSERT_EQ(0, async_compressor->get_decompress_data(prev->first, data, blocking, &finished));
+ if (finished) {
+ d_reap++;
+ ASSERT_TRUE(rand_data[prev->second].contents_equal(data));
+ decompress_jobs.erase(prev);
+ }
}
- }
+ usleep(1000 * 500);
+ } while (compress_jobs.size() + decompress_jobs.size() > MAX_INFLIGHT);
cerr << " reap compress jobs " << c_reap << " decompress jobs " << d_reap << std::endl;
}
void print_internal_state() {
gen_type rng(time(NULL));
boost::uniform_int<> true_false(0, 99);
int val;
- for (int i = 0; i < 10000; ++i) {
+ for (int i = 0; i < 3000; ++i) {
if (!(i % 10)) {
cerr << "Op " << i << ": ";
test_ac.print_internal_state();