From 8f0919e366c789092c6c9cbd98665d9ef0d4f69d Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 17 Mar 2015 23:00:46 +0800 Subject: [PATCH] Compressor: Add compressor infrastructure for ceph AsyncCompressor is a stucture which implement the nonblock compress method, callers could compress this and then get the resulted data later. Of course, callers could directly wait for the compressor completed. Compression algorithm usually uses lots of helper data to speed compress, so here AsyncCompressor uses dedicated threads to help cache-affinity. Second, We want to make compression parallel with normal io logic. For example, any component receive data may not need to care the actual data and need to do some things with metadata. We could let metadata process parallel with data compression. Signed-off-by: Haomai Wang --- src/Makefile-env.am | 1 + src/Makefile.am | 1 + src/common/config_opts.h | 12 ++ src/compressor/AsyncCompressor.cc | 200 ++++++++++++++++++++++ src/compressor/AsyncCompressor.h | 126 ++++++++++++++ src/compressor/Compressor.cc | 25 +++ src/compressor/Compressor.h | 30 ++++ src/compressor/Makefile.am | 11 ++ src/compressor/SnappyCompressor.h | 79 +++++++++ src/test/Makefile.am | 5 + src/test/common/test_async_compressor.cc | 204 +++++++++++++++++++++++ 11 files changed, 694 insertions(+) create mode 100644 src/compressor/AsyncCompressor.cc create mode 100644 src/compressor/AsyncCompressor.h create mode 100644 src/compressor/Compressor.cc create mode 100644 src/compressor/Compressor.h create mode 100644 src/compressor/Makefile.am create mode 100644 src/compressor/SnappyCompressor.h create mode 100644 src/test/common/test_async_compressor.cc diff --git a/src/Makefile-env.am b/src/Makefile-env.am index e176596589800..e9d0404f0f1f4 100644 --- a/src/Makefile-env.am +++ b/src/Makefile-env.am @@ -141,6 +141,7 @@ LIBPERFGLUE = libperfglue.la LIBAUTH = libauth.la LIBMSG = libmsg.la LIBCRUSH = libcrush.la +LIBCOMPRESSOR = libcompressor.la -lsnappy LIBJSON_SPIRIT = libjson_spirit.la LIBLOG = liblog.la LIBOS = libos.la diff --git a/src/Makefile.am b/src/Makefile.am index daf1a8aa88f95..b882d7a0c149e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -39,6 +39,7 @@ include rbd_replay/Makefile.am include test/Makefile.am include tools/Makefile.am include Makefile-rocksdb.am +include compressor/Makefile.am # shell scripts diff --git a/src/common/config_opts.h b/src/common/config_opts.h index ebe1cf397cf46..04691b94b4664 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -78,6 +78,17 @@ OPTION(xio_mp_max_hint, OPT_INT, 4096) // max size-hint chunks OPTION(xio_portal_threads, OPT_INT, 2) // xio portal threads per messenger OPTION(xio_transport_type, OPT_STR, "rdma") // xio transport type: {rdma or tcp} +OPTION(async_compressor_enabled, OPT_BOOL, false) +OPTION(async_compressor_type, OPT_STR, "snappy") +OPTION(async_compressor_threads, OPT_INT, 2) +OPTION(async_compressor_set_affinity, OPT_BOOL, true) +// example: ms_async_affinity_cores = 0,1 +// The number of coreset is expected to equal to ms_async_op_threads, otherwise +// extra op threads will loop ms_async_affinity_cores again. +OPTION(async_compressor_affinity_cores, OPT_STR, "") +OPTION(async_compressor_thread_timeout, OPT_INT, 5) +OPTION(async_compressor_thread_suicide_timeout, OPT_INT, 30) + DEFAULT_SUBSYS(0, 5) SUBSYS(lockdep, 0, 1) SUBSYS(context, 0, 1) @@ -122,6 +133,7 @@ SUBSYS(asok, 1, 5) SUBSYS(throttle, 1, 1) SUBSYS(refs, 0, 0) SUBSYS(xio, 1, 5) +SUBSYS(compressor, 1, 5) OPTION(key, OPT_STR, "") OPTION(keyfile, OPT_STR, "") diff --git a/src/compressor/AsyncCompressor.cc b/src/compressor/AsyncCompressor.cc new file mode 100644 index 0000000000000..d9b52b06ac5a4 --- /dev/null +++ b/src/compressor/AsyncCompressor.cc @@ -0,0 +1,200 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/dout.h" +#include "common/errno.h" +#include "AsyncCompressor.h" + +#define dout_subsys ceph_subsys_compressor +#undef dout_prefix +#define dout_prefix *_dout << "compressor " + +//void AsyncCompressor::_compress(bufferlist &in, bufferlist &out) +//{ +// uint64_t length = 0; +// size_t res_len; +// uint64_t left_pbrs = in.buffers().size(); +// compressor->max_compress_size(in.length(), &res_len); +// ldout(cct, 20) << __func__ << " data length=" << in.length() << " got max compressed size is " << res_len << dendl; +// bufferptr ptr(res_len); +// list::const_iterator pb = in.buffers().begin(); +// while (left_pbrs--) { +// if (compressor->compress(pb->c_str(), pb->length(), ptr.c_str()+length, &res_len)) +// assert(0); +// ldout(cct, 20) << __func__ << " pb length=" << pb->length() << " compress size is " << res_len << dendl; +// out.append(ptr, length, length+res_len); +// length += res_len; +// pb++; +// } +// ldout(cct, 20) << __func__ << " total compressed length is " << length << dendl; +//} +// +//void AsyncCompressor::_decompress(bufferlist &in, bufferlist &out) +//{ +// int i = 0; +// uint64_t length = 0; +// size_t res_len; +// bufferptr ptr; +// vector lens; +// list::const_iterator pb = in.buffers().begin(); +// uint64_t left_pbrs = in.buffers().size(); +// while (left_pbrs--) { +// if (compressor->max_uncompress_size(pb->c_str(), pb->length(), &res_len)) +// assert(0); +// length += res_len; +// lens.push_back(res_len); +// pb++; +// } +// pb = in.buffers().begin(); +// left_pbrs = in.buffers().size(); +// ptr = bufferptr(length); +// length = 0; +// while (left_pbrs--) { +// res_len = lens[i++]; +// if (compressor->decompress(pb->c_str(), pb->length(), ptr.c_str()+length, &res_len)) +// assert(0); +// ldout(cct, 20) << __func__ << " pb compressed length=" << pb->length() << " actually got decompressed size is " << res_len << dendl; +// out.append(ptr, length, length+res_len); +// length += res_len; +// pb++; +// } +// ldout(cct, 20) << __func__ << " total decompressed length is " << length << dendl; +//} + +AsyncCompressor::AsyncCompressor(CephContext *c): + compressor(Compressor::create(c->_conf->async_compressor_type)), cct(c), + job_id(0), + compress_tp(g_ceph_context, "AsyncCompressor::compressor_tp", cct->_conf->async_compressor_threads, "async_compressor_threads"), + job_lock("AsyncCompressor::job_lock"), + compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) { + vector corestrs; + get_str_vec(cct->_conf->async_compressor_affinity_cores, corestrs); + for (vector::iterator it = corestrs.begin(); + it != corestrs.end(); ++it) { + string err; + int coreid = strict_strtol(it->c_str(), 10, &err); + if (err == "") + coreids.push_back(coreid); + else + lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->async_compressor_affinity_cores << dendl; + } +} + +void AsyncCompressor::init() +{ + ldout(cct, 10) << __func__ << dendl; + compress_tp.start(); +} + +void AsyncCompressor::terminate() +{ + ldout(cct, 10) << __func__ << dendl; + compress_tp.stop(); +} + +uint64_t AsyncCompressor::async_compress(bufferlist &data) +{ + uint64_t id = job_id.inc(); + pair::iterator, bool> it; + { + Mutex::Locker l(job_lock); + it = jobs.insert(make_pair(id, Job(id, true))); + it.first->second.data = data; + } + compress_wq.queue(&it.first->second); + ldout(cct, 10) << __func__ << " insert async compress job id=" << id << dendl; + return id; +} + +uint64_t AsyncCompressor::async_decompress(bufferlist &data) +{ + uint64_t id = job_id.inc(); + pair::iterator, bool> it; + { + Mutex::Locker l(job_lock); + it = jobs.insert(make_pair(id, Job(id, false))); + it.first->second.data = data; + } + compress_wq.queue(&it.first->second); + ldout(cct, 10) << __func__ << " insert async decompress job id=" << id << dendl; + return id; +} + +int AsyncCompressor::get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished) +{ + assert(finished); + Mutex::Locker l(job_lock); + unordered_map::iterator it = jobs.find(compress_id); + if (it == jobs.end() || !it->second.is_compress) { + ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl; + return -ENOENT; + } + + retry: + if (it->second.status.read() == DONE) { + ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl; + *finished = true; + data.swap(it->second.data); + jobs.erase(it); + } else if (blocking) { + if (it->second.status.cas(WAIT, DONE)) { + ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl; + compressor->compress(it->second.data, data); + *finished = true; + } else { + job_lock.Unlock(); + usleep(1000); + job_lock.Lock(); + goto retry; + } + } else { + ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished."<< dendl; + *finished = false; + } + return 0; +} + +int AsyncCompressor::get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished) +{ + assert(finished); + Mutex::Locker l(job_lock); + unordered_map::iterator it = jobs.find(decompress_id); + if (it == jobs.end() || it->second.is_compress) { + ldout(cct, 10) << __func__ << " missing to get decompress job id=" << decompress_id << dendl; + return -ENOENT; + } + + retry: + if (it->second.status.read() == DONE) { + ldout(cct, 20) << __func__ << " successfully getting decompressed data, job id=" << decompress_id << dendl; + *finished = true; + data.swap(it->second.data); + jobs.erase(it); + } else if (blocking) { + if (it->second.status.cas(WAIT, DONE)) { + ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, abort!"<< dendl; + compressor->decompress(it->second.data, data); + *finished = true; + } else { + job_lock.Unlock(); + usleep(1000); + job_lock.Lock(); + goto retry; + } + } else { + ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't finished."<< dendl; + *finished = false; + } + return 0; +} diff --git a/src/compressor/AsyncCompressor.h b/src/compressor/AsyncCompressor.h new file mode 100644 index 0000000000000..d34cbe6bf0907 --- /dev/null +++ b/src/compressor/AsyncCompressor.h @@ -0,0 +1,126 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ASYNCCOMPRESSOR_H +#define CEPH_ASYNCCOMPRESSOR_H + +#include + +#include "include/atomic.h" +#include "include/str_list.h" +#include "Compressor.h" +#include "common/WorkQueue.h" + +class AsyncCompressor; + +class AsyncCompressor { + private: + Compressor *compressor; + CephContext *cct; + atomic_t job_id; + vector coreids; + ThreadPool compress_tp; + + enum { + WAIT, + WORKING, + DONE + } status; + struct Job { + uint64_t id; + atomic_t status; + bool is_compress; + bufferlist data; + Job(uint64_t i, bool compress): id(i), status(WAIT), is_compress(compress) {} + Job(const Job &j): id(j.id), status(j.status.read()), is_compress(j.is_compress), data(j.data) {} + }; + Mutex job_lock; + // only when job.status == DONE && with job_lock holding, we can insert/erase element in jobs + // only when job.status == WAIT && with pool_lock holding, you can change its status and modify element's info later + unordered_map jobs; + + struct CompressWQ : public ThreadPool::WorkQueue { + typedef AsyncCompressor::Job Job; + AsyncCompressor *async_compressor; + deque job_queue; + + CompressWQ(AsyncCompressor *ac, time_t timeout, time_t suicide_timeout, ThreadPool *tp) + : ThreadPool::WorkQueue("AsyncCompressor::CompressWQ", timeout, suicide_timeout, tp), async_compressor(ac) {} + + bool _enqueue(Job *item) { + job_queue.push_back(item); + return true; + } + void _dequeue(Job *item) { + assert(0); + } + bool _empty() { + return job_queue.empty(); + } + Job* _dequeue() { + if (job_queue.empty()) + return NULL; + Job *item = NULL; + while (!job_queue.empty()) { + item = job_queue.front(); + job_queue.pop_front(); + if (item->status.cas(WAIT, WORKING)) { + break; + } else { + Mutex::Locker (async_compressor->job_lock); + assert(item->status.read() == DONE); + async_compressor->jobs.erase(item->id); + item = NULL; + } + } + return item; + } + void _process(Job *item, ThreadPool::TPHandle &handle) { + assert(item->status.read() == WORKING); + bufferlist out; + if (item->is_compress) + async_compressor->compressor->compress(item->data, out); + else + async_compressor->compressor->decompress(item->data, out); + item->data.swap(out); + } + void _process_finish(Job *item) { + assert(item->status.read() == WORKING); + item->status.set(DONE); + } + void _clear() {} + } compress_wq; + friend class CompressWQ; + void _compress(bufferlist &in, bufferlist &out); + void _decompress(bufferlist &in, bufferlist &out); + + public: + AsyncCompressor(CephContext *c); + virtual ~AsyncCompressor() {} + + int get_cpuid(int id) { + if (coreids.empty()) + return -1; + return coreids[id % coreids.size()]; + } + + void init(); + void terminate(); + uint64_t async_compress(bufferlist &data); + uint64_t async_decompress(bufferlist &data); + int get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished); + int get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished); +}; + +#endif diff --git a/src/compressor/Compressor.cc b/src/compressor/Compressor.cc new file mode 100644 index 0000000000000..0d11e748d41d7 --- /dev/null +++ b/src/compressor/Compressor.cc @@ -0,0 +1,25 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "Compressor.h" +#include "SnappyCompressor.h" + + +Compressor* Compressor::create(const string &type) +{ + if (type == "snappy") + return new SnappyCompressor(); + + assert(0); +} diff --git a/src/compressor/Compressor.h b/src/compressor/Compressor.h new file mode 100644 index 0000000000000..3eb71aa5c45f8 --- /dev/null +++ b/src/compressor/Compressor.h @@ -0,0 +1,30 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMPRESSOR_H +#define CEPH_COMPRESSOR_H + +#include "include/int_types.h" +#include "include/Context.h" + +class Compressor { + public: + virtual ~Compressor() {} + virtual int compress(bufferlist &in, bufferlist &out) = 0; + virtual int decompress(bufferlist &in, bufferlist &out) = 0; + + static Compressor *create(const string &type); +}; + +#endif diff --git a/src/compressor/Makefile.am b/src/compressor/Makefile.am new file mode 100644 index 0000000000000..bd2a2d7d1744f --- /dev/null +++ b/src/compressor/Makefile.am @@ -0,0 +1,11 @@ +libcompressor_la_SOURCES = \ + compressor/Compressor.cc \ + compressor/AsyncCompressor.cc +noinst_LTLIBRARIES += libcompressor.la + +libcompressor_la_LIBADD = $(LIBCOMMON) + +noinst_HEADERS += \ + compressor/Compressor.h \ + compressor/AsyncCompressor.h \ + compressor/SnappyCompressor.h diff --git a/src/compressor/SnappyCompressor.h b/src/compressor/SnappyCompressor.h new file mode 100644 index 0000000000000..8dc3497c45f27 --- /dev/null +++ b/src/compressor/SnappyCompressor.h @@ -0,0 +1,79 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_SNAPPYCOMPRESSOR_H +#define CEPH_SNAPPYCOMPRESSOR_H + +#include +#include +#include "include/buffer.h" +#include "Compressor.h" + +class BufferlistSource : public snappy::Source { + list::const_iterator pb; + size_t pb_off; + size_t left; + + public: + BufferlistSource(bufferlist &data): pb(data.buffers().begin()), pb_off(0), left(data.length()) {} + virtual ~BufferlistSource() {} + virtual size_t Available() const { return left; } + virtual const char* Peek(size_t* len) { + if (left) { + *len = pb->length() - pb_off; + return pb->c_str() + pb_off; + } else { + *len = 0; + return NULL; + } + } + virtual void Skip(size_t n) { + if (n + pb_off == pb->length()) { + pb++; + pb_off = 0; + } else { + pb_off += n; + } + left -= n; + } +}; + +class SnappyCompressor : public Compressor { + public: + virtual ~SnappyCompressor() {} + virtual int compress(bufferlist &src, bufferlist &dst) { + BufferlistSource source(src); + bufferptr ptr(snappy::MaxCompressedLength(src.length())); + snappy::UncheckedByteArraySink sink(ptr.c_str()); + snappy::Compress(&source, &sink); + dst.append(ptr, 0, sink.CurrentDestination()-ptr.c_str()); + return 0; + } + virtual int decompress(bufferlist &src, bufferlist &dst) { + BufferlistSource source(src); + size_t res_len = 0; + // Trick, decompress only need first 32bits buffer + list::const_iterator pb = src.buffers().begin(); + if (!snappy::GetUncompressedLength(src.get_contiguous(0, 8), 8, &res_len)) + return -1; + bufferptr ptr(res_len); + if (snappy::RawUncompress(&source, ptr.c_str())) { + dst.append(ptr); + return 0; + } + return -1; + } +}; + +#endif diff --git a/src/test/Makefile.am b/src/test/Makefile.am index 9d56f055b9869..64e3f3b9d6d4b 100644 --- a/src/test/Makefile.am +++ b/src/test/Makefile.am @@ -414,6 +414,11 @@ unittest_subprocess_LDADD = $(LIBCOMMON) $(UNITTEST_LDADD) unittest_subprocess_CXXFLAGS = $(UNITTEST_CXXFLAGS) check_PROGRAMS += unittest_subprocess +unittest_async_compressor_SOURCES = test/common/test_async_compressor.cc +unittest_async_compressor_CXXFLAGS = $(UNITTEST_CXXFLAGS) +unittest_async_compressor_LDADD = $(UNITTEST_LDADD) $(CEPH_GLOBAL) $(LIBCOMPRESSOR) +check_PROGRAMS += unittest_async_compressor + check_SCRIPTS += test/pybind/test_ceph_argparse.py check_SCRIPTS += test/pybind/test_ceph_daemon.py diff --git a/src/test/common/test_async_compressor.cc b/src/test/common/test_async_compressor.cc new file mode 100644 index 0000000000000..7988868419d4b --- /dev/null +++ b/src/test/common/test_async_compressor.cc @@ -0,0 +1,204 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include +#include +#include +#include +#include +#include "common/ceph_argparse.h" +#include "compressor/AsyncCompressor.h" +#include "global/global_init.h" + +typedef boost::mt11213b gen_type; + +class AsyncCompressorTest : public ::testing::Test { + public: + AsyncCompressor *async_compressor; + virtual void SetUp() { + cerr << __func__ << " start set up " << std::endl; + async_compressor = new AsyncCompressor(g_ceph_context); + async_compressor->init(); + } + virtual void TearDown() { + async_compressor->terminate(); + delete async_compressor; + } + + void generate_random_data(bufferlist &bl, uint64_t len = 0) { + static const char *base= "znvm,x12399zasdfjkl1209zxcvjlkasjdfljwqelrjzx,cvn,m123#*(@)"; + if (!len) { + boost::uniform_int<> kb(16, 4096); + gen_type rng(time(NULL)); + len = kb(rng) * 1024; + } + + while (bl.length() < len) + bl.append(base, sizeof(base)-1); + } +}; + +TEST_F(AsyncCompressorTest, SimpleTest) { + bufferlist compress_data, decompress_data, rawdata; + generate_random_data(rawdata, 1<<22); + bool finished; + uint64_t id = async_compressor->async_compress(rawdata); + ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished)); + ASSERT_TRUE(finished == true); + id = async_compressor->async_decompress(compress_data); + do { + ASSERT_EQ(0, async_compressor->get_decompress_data(id, decompress_data, false, &finished)); + } while (!finished); + ASSERT_TRUE(finished == true); + ASSERT_TRUE(rawdata.contents_equal(decompress_data)); + ASSERT_EQ(-ENOENT, async_compressor->get_decompress_data(id, decompress_data, true, &finished)); +} + +TEST_F(AsyncCompressorTest, GrubWaitTest) { + async_compressor->terminate(); + bufferlist compress_data, decompress_data, rawdata; + generate_random_data(rawdata, 1<<22); + bool finished; + uint64_t id = async_compressor->async_compress(rawdata); + ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished)); + ASSERT_TRUE(finished == true); + id = async_compressor->async_decompress(compress_data); + ASSERT_EQ(0, async_compressor->get_decompress_data(id, decompress_data, true, &finished)); + ASSERT_TRUE(finished == true); + ASSERT_TRUE(rawdata.contents_equal(decompress_data)); + async_compressor->init(); +} + +class SyntheticWorkload { + set > compress_jobs, decompress_jobs; + AsyncCompressor *async_compressor; + vector rand_data, compress_data; + gen_type rng; + + public: + SyntheticWorkload(AsyncCompressor *ac): async_compressor(ac), rng(time(NULL)) { + for (int i = 0; i < 100; i++) { + bufferlist bl; + boost::uniform_int<> u(4096, 1<<24); + uint64_t value_len = u(rng); + bufferptr bp(value_len); + bp.zero(); + for (uint64_t j = 0; j < value_len-sizeof(i); ) { + memcpy(bp.c_str()+j, &i, sizeof(i)); + j += 4096; + } + + bl.append(bp); + rand_data.push_back(bl); + compress_jobs.insert(make_pair(async_compressor->async_compress(rand_data[i]), i)); + if (!(i % 10)) cerr << "seeding compress data " << i << std::endl; + } + compress_data.resize(100); + reap(true); + } + void do_compress() { + boost::uniform_int<> u(0, rand_data.size()-1); + uint64_t index = u(rng); + compress_jobs.insert(make_pair(async_compressor->async_compress(rand_data[index]), index)); + } + void do_decompress() { + boost::uniform_int<> u(0, compress_data.size()-1); + uint64_t index = u(rng); + if (compress_data[index].length()) + decompress_jobs.insert(make_pair(async_compressor->async_decompress(compress_data[index]), index)); + } + void reap(bool blocking) { + bufferlist data; + bool finished; + set >::iterator prev; + uint64_t c_reap = 0, d_reap = 0; + for (set >::iterator it = compress_jobs.begin(); + it != compress_jobs.end();) { + prev = it; + it++; + async_compressor->get_compress_data(prev->first, data, blocking, &finished); + if (finished) { + c_reap++; + if (compress_data[prev->second].length()) + ASSERT_TRUE(compress_data[prev->second].contents_equal(data)); + else + compress_data[prev->second].swap(data); + compress_jobs.erase(prev); + } + } + + for (set >::iterator it = decompress_jobs.begin(); + it != decompress_jobs.end();) { + prev = it; + it++; + async_compressor->get_decompress_data(prev->first, data, blocking, &finished); + if (finished) { + d_reap++; + ASSERT_TRUE(rand_data[prev->second].contents_equal(data)); + decompress_jobs.erase(prev); + } + } + cerr << " reap compress jobs " << c_reap << " decompress jobs " << d_reap << std::endl; + } + void print_internal_state() { + cerr << "inlfight compress jobs: " << compress_jobs.size() + << " inflight decompress jobs: " << decompress_jobs.size() << std::endl; + } + bool empty() const { return compress_jobs.empty() && decompress_jobs.empty(); } +}; + +TEST_F(AsyncCompressorTest, SyntheticTest) { + SyntheticWorkload test_ac(async_compressor); + gen_type rng(time(NULL)); + boost::uniform_int<> true_false(0, 99); + int val; + for (int i = 0; i < 10000; ++i) { + if (!(i % 10)) { + cerr << "Op " << i << ": "; + test_ac.print_internal_state(); + } + val = true_false(rng); + if (val < 45) { + test_ac.do_compress(); + } else if (val < 95) { + test_ac.do_decompress(); + } else { + test_ac.reap(false); + } + } + while (!test_ac.empty()) { + test_ac.reap(false); + test_ac.print_internal_state(); + usleep(1000*500); + } +} + + +int main(int argc, char **argv) { + vector args; + argv_to_vec(argc, (const char **)argv, args); + + global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); + common_init_finish(g_ceph_context); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +/* + * Local Variables: + * compile-command: "cd ../.. ; make -j4 unittest_async_compressor && valgrind --tool=memcheck ./unittest_async_compressor" + * End: + */ -- 2.39.5