From 0ce27b9f6b764b8fddb45e651f0ae4d3dfde47c4 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Mon, 23 Oct 2017 13:39:33 +0800 Subject: [PATCH] compressor: kill AsyncCompressor which is broken Signed-off-by: Haomai Wang --- src/compressor/AsyncCompressor.cc | 155 ---------------- src/compressor/AsyncCompressor.h | 133 -------------- src/compressor/CMakeLists.txt | 3 +- src/test/common/CMakeLists.txt | 8 - src/test/common/test_async_compressor.cc | 225 ----------------------- 5 files changed, 1 insertion(+), 523 deletions(-) delete mode 100644 src/compressor/AsyncCompressor.cc delete mode 100644 src/compressor/AsyncCompressor.h delete mode 100644 src/test/common/test_async_compressor.cc diff --git a/src/compressor/AsyncCompressor.cc b/src/compressor/AsyncCompressor.cc deleted file mode 100644 index d4a77686b534f..0000000000000 --- a/src/compressor/AsyncCompressor.cc +++ /dev/null @@ -1,155 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2015 Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "common/dout.h" -#include "common/errno.h" -#include "AsyncCompressor.h" - -#define dout_subsys ceph_subsys_compressor -#undef dout_prefix -#define dout_prefix *_dout << "compressor " - -AsyncCompressor::AsyncCompressor(CephContext *c): - compressor(Compressor::create(c, c->_conf->async_compressor_type)), cct(c), - compress_tp(cct, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"), - job_lock("AsyncCompressor::job_lock"), - compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) { -} - -void AsyncCompressor::init() -{ - ldout(cct, 10) << __func__ << dendl; - compress_tp.start(); -} - -void AsyncCompressor::terminate() -{ - ldout(cct, 10) << __func__ << dendl; - compress_tp.stop(); -} - -uint64_t AsyncCompressor::async_compress(bufferlist &data) -{ - uint64_t id = ++job_id; - pair::iterator, bool> it; - { - Mutex::Locker l(job_lock); - it = jobs.insert(make_pair(id, Job(id, true))); - it.first->second.data = data; - } - compress_wq.queue(&it.first->second); - ldout(cct, 10) << __func__ << " insert async compress job id=" << id << dendl; - return id; -} - -uint64_t AsyncCompressor::async_decompress(bufferlist &data) -{ - uint64_t id = ++job_id; - pair::iterator, bool> it; - { - Mutex::Locker l(job_lock); - it = jobs.insert(make_pair(id, Job(id, false))); - it.first->second.data = data; - } - compress_wq.queue(&it.first->second); - ldout(cct, 10) << __func__ << " insert async decompress job id=" << id << dendl; - return id; -} - -int AsyncCompressor::get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished) -{ - assert(finished); - Mutex::Locker l(job_lock); - unordered_map::iterator it = jobs.find(compress_id); - if (it == jobs.end() || !it->second.is_compress) { - ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl; - return -ENOENT; - } - - retry: - auto status = it->second.status.load(); - if (status == status_t::DONE) { - ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl; - *finished = true; - data.swap(it->second.data); - jobs.erase(it); - } else if (status == status_t::ERROR) { - ldout(cct, 20) << __func__ << " compressed data failed, job id=" << compress_id << dendl; - jobs.erase(it); - return -EIO; - } else if (blocking) { - auto expected = status_t::WAIT; - if (it->second.status.compare_exchange_strong(expected, status_t::DONE)) { - ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl; - if (compressor->compress(it->second.data, data)) { - ldout(cct, 1) << __func__ << " compress job id=" << compress_id << " failed!"<< dendl; - it->second.status = status_t::ERROR; - return -EIO; - } - *finished = true; - } else { - job_lock.Unlock(); - usleep(1000); - job_lock.Lock(); - goto retry; - } - } else { - ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished."<< dendl; - *finished = false; - } - return 0; -} - -int AsyncCompressor::get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished) -{ - assert(finished); - Mutex::Locker l(job_lock); - unordered_map::iterator it = jobs.find(decompress_id); - if (it == jobs.end() || it->second.is_compress) { - ldout(cct, 10) << __func__ << " missing to get decompress job id=" << decompress_id << dendl; - return -ENOENT; - } - retry: - auto status = it->second.status.load(); - if (status == status_t::DONE) { - ldout(cct, 20) << __func__ << " successfully getting decompressed data, job id=" << decompress_id << dendl; - *finished = true; - data.swap(it->second.data); - jobs.erase(it); - } else if (status == status_t::ERROR) { - ldout(cct, 20) << __func__ << " compressed data failed, job id=" << decompress_id << dendl; - jobs.erase(it); - return -EIO; - } else if (blocking) { - auto expected = status_t::WAIT; - if (it->second.status.compare_exchange_strong(expected, status_t::DONE)) { - ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, abort!"<< dendl; - if (compressor->decompress(it->second.data, data)) { - ldout(cct, 1) << __func__ << " decompress job id=" << decompress_id << " failed!"<< dendl; - it->second.status = status_t::ERROR; - return -EIO; - } - *finished = true; - } else { - job_lock.Unlock(); - usleep(1000); - job_lock.Lock(); - goto retry; - } - } else { - ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't finished."<< dendl; - *finished = false; - } - return 0; -} diff --git a/src/compressor/AsyncCompressor.h b/src/compressor/AsyncCompressor.h deleted file mode 100644 index 7ca8fad04d680..0000000000000 --- a/src/compressor/AsyncCompressor.h +++ /dev/null @@ -1,133 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2015 Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_ASYNCCOMPRESSOR_H -#define CEPH_ASYNCCOMPRESSOR_H - -#include -#include -#include - -#include "include/str_list.h" - -#include "Compressor.h" -#include "common/WorkQueue.h" - -class AsyncCompressor { - private: - CompressorRef compressor; - CephContext *cct; - std::atomic job_id { 0 }; - vector coreids; - ThreadPool compress_tp; - - enum class status_t { - WAIT, - WORKING, - DONE, - ERROR - }; - - struct Job { - uint64_t id; - std::atomic status { status_t::WAIT }; - bool is_compress; - bufferlist data; - Job(uint64_t i, bool compress): id(i), is_compress(compress) {} - Job(const Job &j): id(j.id), status(j.status.load()), is_compress(j.is_compress), data(j.data) {} - }; - Mutex job_lock; - // only when job.status == DONE && with job_lock holding, we can insert/erase element in jobs - // only when job.status == WAIT && with pool_lock holding, you can change its status and modify element's info later - unordered_map jobs; - - struct CompressWQ : public ThreadPool::WorkQueue { - typedef AsyncCompressor::Job Job; - AsyncCompressor *async_compressor; - deque job_queue; - - CompressWQ(AsyncCompressor *ac, time_t timeout, time_t suicide_timeout, ThreadPool *tp) - : ThreadPool::WorkQueue("AsyncCompressor::CompressWQ", timeout, suicide_timeout, tp), async_compressor(ac) {} - - bool _enqueue(Job *item) override { - job_queue.push_back(item); - return true; - } - void _dequeue(Job *item) override { - ceph_abort(); - } - bool _empty() override { - return job_queue.empty(); - } - Job* _dequeue() override { - if (job_queue.empty()) - return NULL; - Job *item = NULL; - while (!job_queue.empty()) { - item = job_queue.front(); - job_queue.pop_front(); - - auto expected = status_t::WAIT; - if (item->status.compare_exchange_strong(expected, status_t::WORKING)) { - break; - } else { - Mutex::Locker l(async_compressor->job_lock); - async_compressor->jobs.erase(item->id); - item = NULL; - } - } - return item; - } - void _process(Job *item, ThreadPool::TPHandle &) override { - assert(item->status == status_t::WORKING); - bufferlist out; - int r; - if (item->is_compress) - r = async_compressor->compressor->compress(item->data, out); - else - r = async_compressor->compressor->decompress(item->data, out); - if (!r) { - item->data.swap(out); - auto expected = status_t::WORKING; - assert(item->status.compare_exchange_strong(expected, status_t::DONE)); - } else { - item->status = status_t::ERROR; - } - } - void _process_finish(Job *item) override {} - void _clear() override {} - } compress_wq; - friend class CompressWQ; - void _compress(bufferlist &in, bufferlist &out); - void _decompress(bufferlist &in, bufferlist &out); - - public: - explicit AsyncCompressor(CephContext *c); - virtual ~AsyncCompressor() {} - - int get_cpuid(int id) { - if (coreids.empty()) - return -1; - return coreids[id % coreids.size()]; - } - - void init(); - void terminate(); - uint64_t async_compress(bufferlist &data); - uint64_t async_decompress(bufferlist &data); - int get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished); - int get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished); -}; - -#endif diff --git a/src/compressor/CMakeLists.txt b/src/compressor/CMakeLists.txt index 8e0e61cdb91f2..798268449bccd 100644 --- a/src/compressor/CMakeLists.txt +++ b/src/compressor/CMakeLists.txt @@ -1,7 +1,6 @@ set(compressor_srcs - Compressor.cc - AsyncCompressor.cc) + Compressor.cc) add_library(compressor_objs OBJECT ${compressor_srcs}) ## compressor plugins diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index c4b0eee39d2d4..0fffb43b11d04 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -185,14 +185,6 @@ add_executable(unittest_bit_vector add_ceph_unittest(unittest_bit_vector) target_link_libraries(unittest_bit_vector ceph-common) -# unittest_async_compressor -# the test is disabled, because async_compressor is not used anywhere yet. -add_executable(unittest_async_compressor - test_async_compressor.cc) -target_link_libraries(unittest_async_compressor global ${UNITTEST_LIBS}) -add_dependencies(unittest_async_compressor ceph_snappy) -set_target_properties(unittest_async_compressor PROPERTIES COMPILE_FLAGS ${UNITTEST_CXX_FLAGS}) - # unittest_interval_map add_executable(unittest_interval_map test_interval_map.cc diff --git a/src/test/common/test_async_compressor.cc b/src/test/common/test_async_compressor.cc deleted file mode 100644 index 61949ce494ce0..0000000000000 --- a/src/test/common/test_async_compressor.cc +++ /dev/null @@ -1,225 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2015 Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#include -#include -#include -#include -#include -#include "common/ceph_argparse.h" -#include "compressor/AsyncCompressor.h" -#include "global/global_init.h" - -typedef boost::mt11213b gen_type; - -class AsyncCompressorTest : public ::testing::Test { - public: - AsyncCompressor *async_compressor; - void SetUp() override { - cerr << __func__ << " start set up " << std::endl; - async_compressor = new AsyncCompressor(g_ceph_context); - async_compressor->init(); - } - void TearDown() override { - async_compressor->terminate(); - delete async_compressor; - } - - void generate_random_data(bufferlist &bl, uint64_t len = 0) { - static const char *base= "znvm,x12399zasdfjkl1209zxcvjlkasjdfljwqelrjzx,cvn,m123#*(@)"; - if (!len) { - boost::uniform_int<> kb(16, 4096); - gen_type rng(time(NULL)); - len = kb(rng) * 1024; - } - - while (bl.length() < len) - bl.append(base, sizeof(base)-1); - } -}; - -TEST_F(AsyncCompressorTest, SimpleTest) { - bufferlist compress_data, decompress_data, rawdata; - generate_random_data(rawdata, 1<<22); - bool finished; - uint64_t id = async_compressor->async_compress(rawdata); - ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished)); - ASSERT_TRUE(finished == true); - id = async_compressor->async_decompress(compress_data); - do { - ASSERT_EQ(0, async_compressor->get_decompress_data(id, decompress_data, false, &finished)); - } while (!finished); - ASSERT_TRUE(finished == true); - ASSERT_TRUE(rawdata.contents_equal(decompress_data)); - ASSERT_EQ(-ENOENT, async_compressor->get_decompress_data(id, decompress_data, true, &finished)); -} - -TEST_F(AsyncCompressorTest, GrubWaitTest) { - async_compressor->terminate(); - bufferlist compress_data, decompress_data, rawdata; - generate_random_data(rawdata, 1<<22); - bool finished; - uint64_t id = async_compressor->async_compress(rawdata); - ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished)); - ASSERT_TRUE(finished == true); - id = async_compressor->async_decompress(compress_data); - ASSERT_EQ(0, async_compressor->get_decompress_data(id, decompress_data, true, &finished)); - ASSERT_TRUE(finished == true); - ASSERT_TRUE(rawdata.contents_equal(decompress_data)); - async_compressor->init(); -} - -TEST_F(AsyncCompressorTest, DecompressInjectTest) { - bufferlist compress_data, decompress_data, rawdata; - generate_random_data(rawdata, 1<<22); - bool finished; - uint64_t id = async_compressor->async_compress(rawdata); - ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished)); - ASSERT_TRUE(finished == true); - char error[] = "asjdfkwejrljqwaelrj"; - memcpy(compress_data.c_str()+1024, error, sizeof(error)-1); - id = async_compressor->async_decompress(compress_data); - ASSERT_EQ(-EIO, async_compressor->get_decompress_data(id, decompress_data, true, &finished)); -} - -class SyntheticWorkload { - set > compress_jobs, decompress_jobs; - AsyncCompressor *async_compressor; - vector rand_data, compress_data; - gen_type rng; - static const uint64_t MAX_INFLIGHT = 128; - - public: - explicit SyntheticWorkload(AsyncCompressor *ac): async_compressor(ac), rng(time(NULL)) { - for (int i = 0; i < 100; i++) { - bufferlist bl; - boost::uniform_int<> u(4096, 1<<24); - uint64_t value_len = u(rng); - bufferptr bp(value_len); - bp.zero(); - for (uint64_t j = 0; j < value_len-sizeof(i); ) { - memcpy(bp.c_str()+j, &i, sizeof(i)); - j += 4096; - } - - bl.append(bp); - rand_data.push_back(bl); - compress_jobs.insert(make_pair(async_compressor->async_compress(rand_data[i]), i)); - if (!(i % 10)) cerr << "seeding compress data " << i << std::endl; - } - compress_data.resize(100); - reap(true); - } - void do_compress() { - boost::uniform_int<> u(0, rand_data.size()-1); - uint64_t index = u(rng); - compress_jobs.insert(make_pair(async_compressor->async_compress(rand_data[index]), index)); - } - void do_decompress() { - boost::uniform_int<> u(0, compress_data.size()-1); - uint64_t index = u(rng); - if (compress_data[index].length()) - decompress_jobs.insert(make_pair(async_compressor->async_decompress(compress_data[index]), index)); - } - void reap(bool blocking) { - bufferlist data; - bool finished; - set >::iterator prev; - uint64_t c_reap = 0, d_reap = 0; - do { - for (set >::iterator it = compress_jobs.begin(); - it != compress_jobs.end();) { - prev = it; - ++it; - ASSERT_EQ(0, async_compressor->get_compress_data(prev->first, data, blocking, &finished)); - if (finished) { - c_reap++; - if (compress_data[prev->second].length()) - ASSERT_TRUE(compress_data[prev->second].contents_equal(data)); - else - compress_data[prev->second].swap(data); - compress_jobs.erase(prev); - } - } - - for (set >::iterator it = decompress_jobs.begin(); - it != decompress_jobs.end();) { - prev = it; - ++it; - ASSERT_EQ(0, async_compressor->get_decompress_data(prev->first, data, blocking, &finished)); - if (finished) { - d_reap++; - ASSERT_TRUE(rand_data[prev->second].contents_equal(data)); - decompress_jobs.erase(prev); - } - } - usleep(1000 * 500); - } while (compress_jobs.size() + decompress_jobs.size() > MAX_INFLIGHT); - cerr << " reap compress jobs " << c_reap << " decompress jobs " << d_reap << std::endl; - } - void print_internal_state() { - cerr << "inlfight compress jobs: " << compress_jobs.size() - << " inflight decompress jobs: " << decompress_jobs.size() << std::endl; - } - bool empty() const { return compress_jobs.empty() && decompress_jobs.empty(); } -}; - -TEST_F(AsyncCompressorTest, SyntheticTest) { - SyntheticWorkload test_ac(async_compressor); - gen_type rng(time(NULL)); - boost::uniform_int<> true_false(0, 99); - int val; - for (int i = 0; i < 3000; ++i) { - if (!(i % 10)) { - cerr << "Op " << i << ": "; - test_ac.print_internal_state(); - } - val = true_false(rng); - if (val < 45) { - test_ac.do_compress(); - } else if (val < 95) { - test_ac.do_decompress(); - } else { - test_ac.reap(false); - } - } - while (!test_ac.empty()) { - test_ac.reap(false); - test_ac.print_internal_state(); - usleep(1000*500); - } -} - - -int main(int argc, char **argv) { - vector args; - argv_to_vec(argc, (const char **)argv, args); - - auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); - common_init_finish(g_ceph_context); - - const char* env = getenv("CEPH_LIB"); - string directory(env ? env : ".libs"); - g_conf->set_val("plugin_dir", directory, false); - - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} - -/* - * Local Variables: - * compile-command: "cd ../.. ; make -j4 unittest_async_compressor && valgrind --tool=memcheck ./unittest_async_compressor" - * End: - */ -- 2.39.5