From: Haomai Wang Date: Fri, 20 Mar 2015 04:14:02 +0000 (+0800) Subject: Compressor: add decompress failed codes X-Git-Tag: v9.1.0~481^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8e48ba16c7e96f5164910267c2162052bbfed702;p=ceph.git Compressor: add decompress failed codes Signed-off-by: Haomai Wang --- diff --git a/src/compressor/AsyncCompressor.cc b/src/compressor/AsyncCompressor.cc index d9b52b06ac5a..e2ca2fe48bbf 100644 --- a/src/compressor/AsyncCompressor.cc +++ b/src/compressor/AsyncCompressor.cc @@ -140,17 +140,27 @@ int AsyncCompressor::get_compress_data(uint64_t compress_id, bufferlist &data, b ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl; return -ENOENT; } + int status; retry: - if (it->second.status.read() == DONE) { + status = it->second.status.read(); + if (status == DONE) { ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl; *finished = true; data.swap(it->second.data); jobs.erase(it); + } else if (status == ERROR) { + ldout(cct, 20) << __func__ << " compressed data failed, job id=" << compress_id << dendl; + jobs.erase(it); + return -EIO; } else if (blocking) { if (it->second.status.cas(WAIT, DONE)) { ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl; - compressor->compress(it->second.data, data); + if (compressor->compress(it->second.data, data)) { + ldout(cct, 1) << __func__ << " compress job id=" << compress_id << " failed!"<< dendl; + it->second.status.set(ERROR); + return -EIO; + } *finished = true; } else { job_lock.Unlock(); @@ -174,17 +184,27 @@ int AsyncCompressor::get_decompress_data(uint64_t decompress_id, bufferlist &dat ldout(cct, 10) << __func__ << " missing to get decompress job id=" << decompress_id << dendl; return -ENOENT; } + int status; retry: - if (it->second.status.read() == DONE) { + status = it->second.status.read(); + if (status == DONE) { 
ldout(cct, 20) << __func__ << " successfully getting decompressed data, job id=" << decompress_id << dendl; *finished = true; data.swap(it->second.data); jobs.erase(it); + } else if (status == ERROR) { + ldout(cct, 20) << __func__ << " decompressed data failed, job id=" << decompress_id << dendl; + jobs.erase(it); + return -EIO; + } else if (blocking) { if (it->second.status.cas(WAIT, DONE)) { ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, abort!"<< dendl; - compressor->decompress(it->second.data, data); + if (compressor->decompress(it->second.data, data)) { + ldout(cct, 1) << __func__ << " decompress job id=" << decompress_id << " failed!"<< dendl; + it->second.status.set(ERROR); + return -EIO; + } *finished = true; } else { job_lock.Unlock(); diff --git a/src/compressor/AsyncCompressor.h b/src/compressor/AsyncCompressor.h index d34cbe6bf090..659f506be573 100644 --- a/src/compressor/AsyncCompressor.h +++ b/src/compressor/AsyncCompressor.h @@ -35,7 +35,8 @@ class AsyncCompressor { enum { WAIT, WORKING, - DONE + DONE, + ERROR } status; struct Job { uint64_t id; @@ -79,7 +80,6 @@ class AsyncCompressor { break; } else { Mutex::Locker (async_compressor->job_lock); - assert(item->status.read() == DONE); async_compressor->jobs.erase(item->id); item = NULL; } @@ -89,16 +89,19 @@ class AsyncCompressor { void _process(Job *item, ThreadPool::TPHandle &handle) { assert(item->status.read() == WORKING); bufferlist out; + int r; if (item->is_compress) - async_compressor->compressor->compress(item->data, out); + r = async_compressor->compressor->compress(item->data, out); else - async_compressor->compressor->decompress(item->data, out); - item->data.swap(out); - } - void _process_finish(Job *item) { - assert(item->status.read() == WORKING); - item->status.set(DONE); + r = async_compressor->compressor->decompress(item->data, out); + if (!r) { + item->data.swap(out); + assert(item->status.cas(WORKING, DONE)); + } else { 
item->status.set(ERROR); + } } + void _process_finish(Job *item) {} void _clear() {} } compress_wq; friend class CompressWQ; diff --git a/src/compressor/SnappyCompressor.h b/src/compressor/SnappyCompressor.h index 8dc3497c45f2..f5d86d5a2420 100644 --- a/src/compressor/SnappyCompressor.h +++ b/src/compressor/SnappyCompressor.h @@ -64,7 +64,6 @@ class SnappyCompressor : public Compressor { BufferlistSource source(src); size_t res_len = 0; // Trick, decompress only need first 32bits buffer - list::const_iterator pb = src.buffers().begin(); if (!snappy::GetUncompressedLength(src.get_contiguous(0, 8), 8, &res_len)) return -1; bufferptr ptr(res_len); diff --git a/src/test/common/test_async_compressor.cc b/src/test/common/test_async_compressor.cc index 7988868419d4..aca448c0a00c 100644 --- a/src/test/common/test_async_compressor.cc +++ b/src/test/common/test_async_compressor.cc @@ -81,11 +81,25 @@ TEST_F(AsyncCompressorTest, GrubWaitTest) { async_compressor->init(); } +TEST_F(AsyncCompressorTest, DecompressInjectTest) { + bufferlist compress_data, decompress_data, rawdata; + generate_random_data(rawdata, 1<<22); + bool finished; + uint64_t id = async_compressor->async_compress(rawdata); + ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished)); + ASSERT_TRUE(finished == true); + char error[] = "asjdfkwejrljqwaelrj"; + memcpy(compress_data.c_str()+1024, error, sizeof(error)-1); + id = async_compressor->async_decompress(compress_data); + ASSERT_EQ(-EIO, async_compressor->get_decompress_data(id, decompress_data, true, &finished)); +} + class SyntheticWorkload { set > compress_jobs, decompress_jobs; AsyncCompressor *async_compressor; vector rand_data, compress_data; gen_type rng; + static const uint64_t MAX_INFLIGHT = 128; public: SyntheticWorkload(AsyncCompressor *ac): async_compressor(ac), rng(time(NULL)) { @@ -124,32 +138,35 @@ class SyntheticWorkload { bool finished; set >::iterator prev; uint64_t c_reap = 0, d_reap = 0; - for (set 
>::iterator it = compress_jobs.begin(); - it != compress_jobs.end();) { - prev = it; - it++; - async_compressor->get_compress_data(prev->first, data, blocking, &finished); - if (finished) { - c_reap++; - if (compress_data[prev->second].length()) - ASSERT_TRUE(compress_data[prev->second].contents_equal(data)); - else - compress_data[prev->second].swap(data); - compress_jobs.erase(prev); + do { + for (set >::iterator it = compress_jobs.begin(); + it != compress_jobs.end();) { + prev = it; + it++; + ASSERT_EQ(0, async_compressor->get_compress_data(prev->first, data, blocking, &finished)); + if (finished) { + c_reap++; + if (compress_data[prev->second].length()) + ASSERT_TRUE(compress_data[prev->second].contents_equal(data)); + else + compress_data[prev->second].swap(data); + compress_jobs.erase(prev); + } } - } - for (set >::iterator it = decompress_jobs.begin(); - it != decompress_jobs.end();) { - prev = it; - it++; - async_compressor->get_decompress_data(prev->first, data, blocking, &finished); - if (finished) { - d_reap++; - ASSERT_TRUE(rand_data[prev->second].contents_equal(data)); - decompress_jobs.erase(prev); + for (set >::iterator it = decompress_jobs.begin(); + it != decompress_jobs.end();) { + prev = it; + it++; + ASSERT_EQ(0, async_compressor->get_decompress_data(prev->first, data, blocking, &finished)); + if (finished) { + d_reap++; + ASSERT_TRUE(rand_data[prev->second].contents_equal(data)); + decompress_jobs.erase(prev); + } } - } + usleep(1000 * 500); + } while (compress_jobs.size() + decompress_jobs.size() > MAX_INFLIGHT); cerr << " reap compress jobs " << c_reap << " decompress jobs " << d_reap << std::endl; } void print_internal_state() { @@ -164,7 +181,7 @@ TEST_F(AsyncCompressorTest, SyntheticTest) { gen_type rng(time(NULL)); boost::uniform_int<> true_false(0, 99); int val; - for (int i = 0; i < 10000; ++i) { + for (int i = 0; i < 3000; ++i) { if (!(i % 10)) { cerr << "Op " << i << ": "; test_ac.print_internal_state();