LIBAUTH = libauth.la
LIBMSG = libmsg.la
LIBCRUSH = libcrush.la
+LIBCOMPRESSOR = libcompressor.la -lsnappy
LIBJSON_SPIRIT = libjson_spirit.la
LIBLOG = liblog.la
LIBOS = libos.la
include test/Makefile.am
include tools/Makefile.am
include Makefile-rocksdb.am
+include compressor/Makefile.am
# shell scripts
OPTION(xio_portal_threads, OPT_INT, 2) // xio portal threads per messenger
OPTION(xio_transport_type, OPT_STR, "rdma") // xio transport type: {rdma or tcp}
+OPTION(async_compressor_enabled, OPT_BOOL, false)
+OPTION(async_compressor_type, OPT_STR, "snappy")
+OPTION(async_compressor_threads, OPT_INT, 2)
+OPTION(async_compressor_set_affinity, OPT_BOOL, true)
+// example: async_compressor_affinity_cores = 0,1
+// The number of cores listed is expected to equal async_compressor_threads;
+// otherwise the extra threads wrap around and reuse the listed cores.
+OPTION(async_compressor_affinity_cores, OPT_STR, "")
+OPTION(async_compressor_thread_timeout, OPT_INT, 5)
+OPTION(async_compressor_thread_suicide_timeout, OPT_INT, 30)
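+// A hypothetical ceph.conf fragment using the options above (the values shown
+// are only illustrative, not recommendations):
+//
+//   [global]
+//     async_compressor_enabled = true
+//     async_compressor_type = snappy
+//     async_compressor_threads = 2
+//     async_compressor_set_affinity = true
+//     async_compressor_affinity_cores = 0,1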
+
DEFAULT_SUBSYS(0, 5)
SUBSYS(lockdep, 0, 1)
SUBSYS(context, 0, 1)
SUBSYS(throttle, 1, 1)
SUBSYS(refs, 0, 0)
SUBSYS(xio, 1, 5)
+SUBSYS(compressor, 1, 5)
OPTION(key, OPT_STR, "")
OPTION(keyfile, OPT_STR, "")
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "common/dout.h"
+#include "common/errno.h"
+#include "AsyncCompressor.h"
+
+#define dout_subsys ceph_subsys_compressor
+#undef dout_prefix
+#define dout_prefix *_dout << "compressor "
+
+//void AsyncCompressor::_compress(bufferlist &in, bufferlist &out)
+//{
+// uint64_t length = 0;
+// size_t res_len;
+// uint64_t left_pbrs = in.buffers().size();
+// compressor->max_compress_size(in.length(), &res_len);
+// ldout(cct, 20) << __func__ << " data length=" << in.length() << " got max compressed size is " << res_len << dendl;
+// bufferptr ptr(res_len);
+// list<bufferptr>::const_iterator pb = in.buffers().begin();
+// while (left_pbrs--) {
+// if (compressor->compress(pb->c_str(), pb->length(), ptr.c_str()+length, &res_len))
+// assert(0);
+// ldout(cct, 20) << __func__ << " pb length=" << pb->length() << " compress size is " << res_len << dendl;
+// out.append(ptr, length, length+res_len);
+// length += res_len;
+// pb++;
+// }
+// ldout(cct, 20) << __func__ << " total compressed length is " << length << dendl;
+//}
+//
+//void AsyncCompressor::_decompress(bufferlist &in, bufferlist &out)
+//{
+// int i = 0;
+// uint64_t length = 0;
+// size_t res_len;
+// bufferptr ptr;
+// vector<uint64_t> lens;
+// list<bufferptr>::const_iterator pb = in.buffers().begin();
+// uint64_t left_pbrs = in.buffers().size();
+// while (left_pbrs--) {
+// if (compressor->max_uncompress_size(pb->c_str(), pb->length(), &res_len))
+// assert(0);
+// length += res_len;
+// lens.push_back(res_len);
+// pb++;
+// }
+// pb = in.buffers().begin();
+// left_pbrs = in.buffers().size();
+// ptr = bufferptr(length);
+// length = 0;
+// while (left_pbrs--) {
+// res_len = lens[i++];
+// if (compressor->decompress(pb->c_str(), pb->length(), ptr.c_str()+length, &res_len))
+// assert(0);
+// ldout(cct, 20) << __func__ << " pb compressed length=" << pb->length() << " actually got decompressed size is " << res_len << dendl;
+// out.append(ptr, length, length+res_len);
+// length += res_len;
+// pb++;
+// }
+// ldout(cct, 20) << __func__ << " total decompressed length is " << length << dendl;
+//}
+
+AsyncCompressor::AsyncCompressor(CephContext *c):
+ compressor(Compressor::create(c->_conf->async_compressor_type)), cct(c),
+ job_id(0),
+  compress_tp(cct, "AsyncCompressor::compressor_tp", cct->_conf->async_compressor_threads, "async_compressor_threads"),
+ job_lock("AsyncCompressor::job_lock"),
+ compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) {
+ vector<string> corestrs;
+ get_str_vec(cct->_conf->async_compressor_affinity_cores, corestrs);
+ for (vector<string>::iterator it = corestrs.begin();
+ it != corestrs.end(); ++it) {
+ string err;
+ int coreid = strict_strtol(it->c_str(), 10, &err);
+ if (err == "")
+ coreids.push_back(coreid);
+ else
+ lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->async_compressor_affinity_cores << dendl;
+ }
+}
+
+void AsyncCompressor::init()
+{
+ ldout(cct, 10) << __func__ << dendl;
+ compress_tp.start();
+}
+
+void AsyncCompressor::terminate()
+{
+ ldout(cct, 10) << __func__ << dendl;
+ compress_tp.stop();
+}
+
+uint64_t AsyncCompressor::async_compress(bufferlist &data)
+{
+ uint64_t id = job_id.inc();
+ pair<unordered_map<uint64_t, Job>::iterator, bool> it;
+ {
+ Mutex::Locker l(job_lock);
+ it = jobs.insert(make_pair(id, Job(id, true)));
+ it.first->second.data = data;
+ }
+ compress_wq.queue(&it.first->second);
+ ldout(cct, 10) << __func__ << " insert async compress job id=" << id << dendl;
+ return id;
+}
+
+uint64_t AsyncCompressor::async_decompress(bufferlist &data)
+{
+ uint64_t id = job_id.inc();
+ pair<unordered_map<uint64_t, Job>::iterator, bool> it;
+ {
+ Mutex::Locker l(job_lock);
+ it = jobs.insert(make_pair(id, Job(id, false)));
+ it.first->second.data = data;
+ }
+ compress_wq.queue(&it.first->second);
+ ldout(cct, 10) << __func__ << " insert async decompress job id=" << id << dendl;
+ return id;
+}
+
+int AsyncCompressor::get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished)
+{
+ assert(finished);
+ Mutex::Locker l(job_lock);
+ unordered_map<uint64_t, Job>::iterator it = jobs.find(compress_id);
+ if (it == jobs.end() || !it->second.is_compress) {
+ ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl;
+ return -ENOENT;
+ }
+
+ retry:
+ if (it->second.status.read() == DONE) {
+ ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl;
+ *finished = true;
+ data.swap(it->second.data);
+ jobs.erase(it);
+ } else if (blocking) {
+    if (it->second.status.cas(WAIT, DONE)) {
+      // the worker hasn't picked this job up yet: compress it inline here and
+      // let the workqueue drop the DONE entry when it eventually dequeues it
+      ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't started, compressing inline" << dendl;
+      compressor->compress(it->second.data, data);
+      *finished = true;
+    } else {
+      // the job is being processed right now; wait briefly and check again
+      job_lock.Unlock();
+      usleep(1000);
+      job_lock.Lock();
+      // re-lookup: concurrent inserts may rehash the map and invalidate "it"
+      it = jobs.find(compress_id);
+      assert(it != jobs.end());
+      goto retry;
+    }
+ } else {
+ ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished."<< dendl;
+ *finished = false;
+ }
+ return 0;
+}
+
+int AsyncCompressor::get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished)
+{
+ assert(finished);
+ Mutex::Locker l(job_lock);
+ unordered_map<uint64_t, Job>::iterator it = jobs.find(decompress_id);
+ if (it == jobs.end() || it->second.is_compress) {
+ ldout(cct, 10) << __func__ << " missing to get decompress job id=" << decompress_id << dendl;
+ return -ENOENT;
+ }
+
+ retry:
+ if (it->second.status.read() == DONE) {
+ ldout(cct, 20) << __func__ << " successfully getting decompressed data, job id=" << decompress_id << dendl;
+ *finished = true;
+ data.swap(it->second.data);
+ jobs.erase(it);
+ } else if (blocking) {
+    if (it->second.status.cas(WAIT, DONE)) {
+      // the worker hasn't picked this job up yet: decompress it inline here and
+      // let the workqueue drop the DONE entry when it eventually dequeues it
+      ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, decompressing inline" << dendl;
+      compressor->decompress(it->second.data, data);
+      *finished = true;
+    } else {
+      // the job is being processed right now; wait briefly and check again
+      job_lock.Unlock();
+      usleep(1000);
+      job_lock.Lock();
+      // re-lookup: concurrent inserts may rehash the map and invalidate "it"
+      it = jobs.find(decompress_id);
+      assert(it != jobs.end());
+      goto retry;
+    }
+ } else {
+ ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't finished."<< dendl;
+ *finished = false;
+ }
+ return 0;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_ASYNCCOMPRESSOR_H
+#define CEPH_ASYNCCOMPRESSOR_H
+
+#include <deque>
+
+#include "include/atomic.h"
+#include "include/str_list.h"
+#include "Compressor.h"
+#include "common/WorkQueue.h"
+
+
+class AsyncCompressor {
+ private:
+ Compressor *compressor;
+ CephContext *cct;
+ atomic_t job_id;
+ vector<int> coreids;
+ ThreadPool compress_tp;
+
+ enum {
+ WAIT,
+ WORKING,
+ DONE
+  };
+ struct Job {
+ uint64_t id;
+ atomic_t status;
+ bool is_compress;
+ bufferlist data;
+ Job(uint64_t i, bool compress): id(i), status(WAIT), is_compress(compress) {}
+ Job(const Job &j): id(j.id), status(j.status.read()), is_compress(j.is_compress), data(j.data) {}
+ };
+ Mutex job_lock;
+  // Elements may only be inserted into or erased from "jobs" while holding job_lock,
+  // and an element may only be erased once its status is DONE.
+  // A job's status (and then its data) may only be changed while the job is still in
+  // WAIT and the pool lock is held.
+ unordered_map<uint64_t, Job> jobs;
+
+ struct CompressWQ : public ThreadPool::WorkQueue<Job> {
+ typedef AsyncCompressor::Job Job;
+ AsyncCompressor *async_compressor;
+ deque<Job*> job_queue;
+
+ CompressWQ(AsyncCompressor *ac, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+ : ThreadPool::WorkQueue<Job>("AsyncCompressor::CompressWQ", timeout, suicide_timeout, tp), async_compressor(ac) {}
+
+ bool _enqueue(Job *item) {
+ job_queue.push_back(item);
+ return true;
+ }
+ void _dequeue(Job *item) {
+ assert(0);
+ }
+ bool _empty() {
+ return job_queue.empty();
+ }
+ Job* _dequeue() {
+ if (job_queue.empty())
+ return NULL;
+ Job *item = NULL;
+ while (!job_queue.empty()) {
+ item = job_queue.front();
+ job_queue.pop_front();
+ if (item->status.cas(WAIT, WORKING)) {
+ break;
+ } else {
+          Mutex::Locker l(async_compressor->job_lock);
+ assert(item->status.read() == DONE);
+ async_compressor->jobs.erase(item->id);
+ item = NULL;
+ }
+ }
+ return item;
+ }
+ void _process(Job *item, ThreadPool::TPHandle &handle) {
+ assert(item->status.read() == WORKING);
+ bufferlist out;
+ if (item->is_compress)
+ async_compressor->compressor->compress(item->data, out);
+ else
+ async_compressor->compressor->decompress(item->data, out);
+ item->data.swap(out);
+ }
+ void _process_finish(Job *item) {
+ assert(item->status.read() == WORKING);
+ item->status.set(DONE);
+ }
+ void _clear() {}
+ } compress_wq;
+ friend class CompressWQ;
+ void _compress(bufferlist &in, bufferlist &out);
+ void _decompress(bufferlist &in, bufferlist &out);
+
+ public:
+ AsyncCompressor(CephContext *c);
+ virtual ~AsyncCompressor() {}
+
+ int get_cpuid(int id) {
+ if (coreids.empty())
+ return -1;
+ return coreids[id % coreids.size()];
+ }
+
+ void init();
+ void terminate();
+ uint64_t async_compress(bufferlist &data);
+ uint64_t async_decompress(bufferlist &data);
+ int get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished);
+ int get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished);
+};
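+
+// A minimal usage sketch, mirroring the unit test (assumes an initialized
+// CephContext, e.g. g_ceph_context after global_init):
+//
+//   AsyncCompressor ac(g_ceph_context);
+//   ac.init();
+//   bufferlist in, out;
+//   in.append("some data");
+//   uint64_t id = ac.async_compress(in);
+//   bool finished;
+//   // blocking=true compresses inline if no worker picked the job up yet
+//   int r = ac.get_compress_data(id, out, true, &finished);
+//   assert(r == 0 && finished);
+//   ac.terminate();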
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Compressor.h"
+#include "SnappyCompressor.h"
+
+
+Compressor* Compressor::create(const string &type)
+{
+ if (type == "snappy")
+ return new SnappyCompressor();
+
+  assert(0);
+  return NULL;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMPRESSOR_H
+#define CEPH_COMPRESSOR_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+
+class Compressor {
+ public:
+ virtual ~Compressor() {}
+ virtual int compress(bufferlist &in, bufferlist &out) = 0;
+ virtual int decompress(bufferlist &in, bufferlist &out) = 0;
+
+ static Compressor *create(const string &type);
+};
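+
+// Factory usage sketch; only "snappy" is implemented by this change, and
+// Compressor::create() currently asserts on any other type:
+//
+//   Compressor *c = Compressor::create("snappy");
+//   bufferlist in, out;
+//   in.append("payload");
+//   int r = c->compress(in, out);   // 0 on success
+//   delete c;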
+
+#endif
--- /dev/null
+libcompressor_la_SOURCES = \
+ compressor/Compressor.cc \
+ compressor/AsyncCompressor.cc
+noinst_LTLIBRARIES += libcompressor.la
+
+libcompressor_la_LIBADD = $(LIBCOMMON)
+
+noinst_HEADERS += \
+ compressor/Compressor.h \
+ compressor/AsyncCompressor.h \
+ compressor/SnappyCompressor.h
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_SNAPPYCOMPRESSOR_H
+#define CEPH_SNAPPYCOMPRESSOR_H
+
+#include <snappy.h>
+#include <snappy-sinksource.h>
+#include "include/buffer.h"
+#include "Compressor.h"
+
+class BufferlistSource : public snappy::Source {
+ list<bufferptr>::const_iterator pb;
+ size_t pb_off;
+ size_t left;
+
+ public:
+ BufferlistSource(bufferlist &data): pb(data.buffers().begin()), pb_off(0), left(data.length()) {}
+ virtual ~BufferlistSource() {}
+ virtual size_t Available() const { return left; }
+ virtual const char* Peek(size_t* len) {
+ if (left) {
+ *len = pb->length() - pb_off;
+ return pb->c_str() + pb_off;
+ } else {
+ *len = 0;
+ return NULL;
+ }
+ }
+ virtual void Skip(size_t n) {
+ if (n + pb_off == pb->length()) {
+ pb++;
+ pb_off = 0;
+ } else {
+ pb_off += n;
+ }
+ left -= n;
+ }
+};
+
+class SnappyCompressor : public Compressor {
+ public:
+ virtual ~SnappyCompressor() {}
+ virtual int compress(bufferlist &src, bufferlist &dst) {
+ BufferlistSource source(src);
+ bufferptr ptr(snappy::MaxCompressedLength(src.length()));
+ snappy::UncheckedByteArraySink sink(ptr.c_str());
+ snappy::Compress(&source, &sink);
+ dst.append(ptr, 0, sink.CurrentDestination()-ptr.c_str());
+ return 0;
+ }
+ virtual int decompress(bufferlist &src, bufferlist &dst) {
+ BufferlistSource source(src);
+ size_t res_len = 0;
+    // GetUncompressedLength only needs the leading length varint (at most 5
+    // bytes for a 32-bit length), so the first 8 bytes are more than enough
+    if (!snappy::GetUncompressedLength(src.get_contiguous(0, 8), 8, &res_len))
+ return -1;
+ bufferptr ptr(res_len);
+ if (snappy::RawUncompress(&source, ptr.c_str())) {
+ dst.append(ptr);
+ return 0;
+ }
+ return -1;
+ }
+};
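+
+// Round-trip sketch (illustrative only): compress then decompress should
+// reproduce the original contents.
+//
+//   SnappyCompressor sc;
+//   bufferlist in, compressed, out;
+//   in.append("the quick brown fox jumps over the lazy dog, repeatedly");
+//   assert(sc.compress(in, compressed) == 0);
+//   assert(sc.decompress(compressed, out) == 0);
+//   assert(in.contents_equal(out));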
+
+#endif
unittest_subprocess_CXXFLAGS = $(UNITTEST_CXXFLAGS)
check_PROGRAMS += unittest_subprocess
+unittest_async_compressor_SOURCES = test/common/test_async_compressor.cc
+unittest_async_compressor_CXXFLAGS = $(UNITTEST_CXXFLAGS)
+unittest_async_compressor_LDADD = $(UNITTEST_LDADD) $(CEPH_GLOBAL) $(LIBCOMPRESSOR)
+check_PROGRAMS += unittest_async_compressor
+
check_SCRIPTS += test/pybind/test_ceph_argparse.py
check_SCRIPTS += test/pybind/test_ceph_daemon.py
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include <time.h>
+#include <boost/random/mersenne_twister.hpp>
+#include <boost/random/uniform_int.hpp>
+#include <boost/random/binomial_distribution.hpp>
+#include <gtest/gtest.h>
+#include "common/ceph_argparse.h"
+#include "compressor/AsyncCompressor.h"
+#include "global/global_init.h"
+
+typedef boost::mt11213b gen_type;
+
+class AsyncCompressorTest : public ::testing::Test {
+ public:
+ AsyncCompressor *async_compressor;
+ virtual void SetUp() {
+ cerr << __func__ << " start set up " << std::endl;
+ async_compressor = new AsyncCompressor(g_ceph_context);
+ async_compressor->init();
+ }
+ virtual void TearDown() {
+ async_compressor->terminate();
+ delete async_compressor;
+ }
+
+ void generate_random_data(bufferlist &bl, uint64_t len = 0) {
+    static const char base[] = "znvm,x12399zasdfjkl1209zxcvjlkasjdfljwqelrjzx,cvn,m123#*(@)";
+ if (!len) {
+ boost::uniform_int<> kb(16, 4096);
+ gen_type rng(time(NULL));
+ len = kb(rng) * 1024;
+ }
+
+ while (bl.length() < len)
+ bl.append(base, sizeof(base)-1);
+ }
+};
+
+TEST_F(AsyncCompressorTest, SimpleTest) {
+ bufferlist compress_data, decompress_data, rawdata;
+ generate_random_data(rawdata, 1<<22);
+ bool finished;
+ uint64_t id = async_compressor->async_compress(rawdata);
+ ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished));
+ ASSERT_TRUE(finished == true);
+ id = async_compressor->async_decompress(compress_data);
+ do {
+ ASSERT_EQ(0, async_compressor->get_decompress_data(id, decompress_data, false, &finished));
+ } while (!finished);
+ ASSERT_TRUE(finished == true);
+ ASSERT_TRUE(rawdata.contents_equal(decompress_data));
+ ASSERT_EQ(-ENOENT, async_compressor->get_decompress_data(id, decompress_data, true, &finished));
+}
+
+TEST_F(AsyncCompressorTest, GrabWaitTest) {
+ async_compressor->terminate();
+ bufferlist compress_data, decompress_data, rawdata;
+ generate_random_data(rawdata, 1<<22);
+ bool finished;
+ uint64_t id = async_compressor->async_compress(rawdata);
+ ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished));
+ ASSERT_TRUE(finished == true);
+ id = async_compressor->async_decompress(compress_data);
+ ASSERT_EQ(0, async_compressor->get_decompress_data(id, decompress_data, true, &finished));
+ ASSERT_TRUE(finished == true);
+ ASSERT_TRUE(rawdata.contents_equal(decompress_data));
+ async_compressor->init();
+}
+
+class SyntheticWorkload {
+ set<pair<uint64_t, uint64_t> > compress_jobs, decompress_jobs;
+ AsyncCompressor *async_compressor;
+ vector<bufferlist> rand_data, compress_data;
+ gen_type rng;
+
+ public:
+ SyntheticWorkload(AsyncCompressor *ac): async_compressor(ac), rng(time(NULL)) {
+ for (int i = 0; i < 100; i++) {
+ bufferlist bl;
+ boost::uniform_int<> u(4096, 1<<24);
+ uint64_t value_len = u(rng);
+ bufferptr bp(value_len);
+ bp.zero();
+ for (uint64_t j = 0; j < value_len-sizeof(i); ) {
+ memcpy(bp.c_str()+j, &i, sizeof(i));
+ j += 4096;
+ }
+
+ bl.append(bp);
+ rand_data.push_back(bl);
+ compress_jobs.insert(make_pair(async_compressor->async_compress(rand_data[i]), i));
+ if (!(i % 10)) cerr << "seeding compress data " << i << std::endl;
+ }
+ compress_data.resize(100);
+ reap(true);
+ }
+ void do_compress() {
+ boost::uniform_int<> u(0, rand_data.size()-1);
+ uint64_t index = u(rng);
+ compress_jobs.insert(make_pair(async_compressor->async_compress(rand_data[index]), index));
+ }
+ void do_decompress() {
+ boost::uniform_int<> u(0, compress_data.size()-1);
+ uint64_t index = u(rng);
+ if (compress_data[index].length())
+ decompress_jobs.insert(make_pair(async_compressor->async_decompress(compress_data[index]), index));
+ }
+ void reap(bool blocking) {
+ bufferlist data;
+ bool finished;
+ set<pair<uint64_t, uint64_t> >::iterator prev;
+ uint64_t c_reap = 0, d_reap = 0;
+ for (set<pair<uint64_t, uint64_t> >::iterator it = compress_jobs.begin();
+ it != compress_jobs.end();) {
+ prev = it;
+ it++;
+ async_compressor->get_compress_data(prev->first, data, blocking, &finished);
+ if (finished) {
+ c_reap++;
+ if (compress_data[prev->second].length())
+ ASSERT_TRUE(compress_data[prev->second].contents_equal(data));
+ else
+ compress_data[prev->second].swap(data);
+ compress_jobs.erase(prev);
+ }
+ }
+
+ for (set<pair<uint64_t, uint64_t> >::iterator it = decompress_jobs.begin();
+ it != decompress_jobs.end();) {
+ prev = it;
+ it++;
+ async_compressor->get_decompress_data(prev->first, data, blocking, &finished);
+ if (finished) {
+ d_reap++;
+ ASSERT_TRUE(rand_data[prev->second].contents_equal(data));
+ decompress_jobs.erase(prev);
+ }
+ }
+ cerr << " reap compress jobs " << c_reap << " decompress jobs " << d_reap << std::endl;
+ }
+ void print_internal_state() {
+ cerr << "inlfight compress jobs: " << compress_jobs.size()
+ << " inflight decompress jobs: " << decompress_jobs.size() << std::endl;
+ }
+ bool empty() const { return compress_jobs.empty() && decompress_jobs.empty(); }
+};
+
+TEST_F(AsyncCompressorTest, SyntheticTest) {
+ SyntheticWorkload test_ac(async_compressor);
+ gen_type rng(time(NULL));
+ boost::uniform_int<> true_false(0, 99);
+ int val;
+ for (int i = 0; i < 10000; ++i) {
+ if (!(i % 10)) {
+ cerr << "Op " << i << ": ";
+ test_ac.print_internal_state();
+ }
+ val = true_false(rng);
+ if (val < 45) {
+ test_ac.do_compress();
+ } else if (val < 95) {
+ test_ac.do_decompress();
+ } else {
+ test_ac.reap(false);
+ }
+ }
+ while (!test_ac.empty()) {
+ test_ac.reap(false);
+ test_ac.print_internal_state();
+ usleep(1000*500);
+ }
+}
+
+
+int main(int argc, char **argv) {
+ vector<const char*> args;
+ argv_to_vec(argc, (const char **)argv, args);
+
+ global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+ common_init_finish(g_ceph_context);
+
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+/*
+ * Local Variables:
+ * compile-command: "cd ../.. ; make -j4 unittest_async_compressor && valgrind --tool=memcheck ./unittest_async_compressor"
+ * End:
+ */