]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Compressor: Add compressor infrastructure for ceph
authorHaomai Wang <haomaiwang@gmail.com>
Tue, 17 Mar 2015 15:00:46 +0000 (23:00 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Wed, 1 Jul 2015 13:11:57 +0000 (21:11 +0800)
AsyncCompressor is a structure which implements a non-blocking compress method:
callers can submit data for compression and then fetch the resulting data later.
Of course, callers can also wait directly for the compressor to complete.

Compression algorithms usually use lots of helper data to speed up compression,
so AsyncCompressor uses dedicated threads to help cache affinity. Second,
we want to run compression in parallel with the normal I/O logic. For example,
a component that receives data may not need to look at the actual data but still
has to do some work on the metadata. We can let metadata processing run in
parallel with data compression.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/Makefile-env.am
src/Makefile.am
src/common/config_opts.h
src/compressor/AsyncCompressor.cc [new file with mode: 0644]
src/compressor/AsyncCompressor.h [new file with mode: 0644]
src/compressor/Compressor.cc [new file with mode: 0644]
src/compressor/Compressor.h [new file with mode: 0644]
src/compressor/Makefile.am [new file with mode: 0644]
src/compressor/SnappyCompressor.h [new file with mode: 0644]
src/test/Makefile.am
src/test/common/test_async_compressor.cc [new file with mode: 0644]

index e176596589800345e03d3d9e602d0c393e81a7f6..e9d0404f0f1f4cd7ab8345e3434e662bb29f7a39 100644 (file)
@@ -141,6 +141,7 @@ LIBPERFGLUE = libperfglue.la
 LIBAUTH = libauth.la
 LIBMSG = libmsg.la
 LIBCRUSH = libcrush.la
+LIBCOMPRESSOR = libcompressor.la -lsnappy
 LIBJSON_SPIRIT = libjson_spirit.la
 LIBLOG = liblog.la
 LIBOS = libos.la
index daf1a8aa88f95a868be282f509a2ea0034c53b38..b882d7a0c149e5b538696fa9880f391e9675c7e6 100644 (file)
@@ -39,6 +39,7 @@ include rbd_replay/Makefile.am
 include test/Makefile.am
 include tools/Makefile.am
 include Makefile-rocksdb.am
+include compressor/Makefile.am
 
 
 # shell scripts
index ebe1cf397cf46c93c97372fc9d6e0715f41bb2d6..04691b94b4664de12c47f865f3673ec1f5b0986f 100644 (file)
@@ -78,6 +78,17 @@ OPTION(xio_mp_max_hint, OPT_INT, 4096) // max size-hint chunks
 OPTION(xio_portal_threads, OPT_INT, 2) // xio portal threads per messenger
 OPTION(xio_transport_type, OPT_STR, "rdma") // xio transport type: {rdma or tcp}
 
+OPTION(async_compressor_enabled, OPT_BOOL, false)
+OPTION(async_compressor_type, OPT_STR, "snappy")
+OPTION(async_compressor_threads, OPT_INT, 2)
+OPTION(async_compressor_set_affinity, OPT_BOOL, true)
+// example: async_compressor_affinity_cores = 0,1
+// The number of cores listed is expected to equal async_compressor_threads;
+// otherwise extra compressor threads loop over async_compressor_affinity_cores again.
+OPTION(async_compressor_affinity_cores, OPT_STR, "")
+OPTION(async_compressor_thread_timeout, OPT_INT, 5)
+OPTION(async_compressor_thread_suicide_timeout, OPT_INT, 30)
+
 DEFAULT_SUBSYS(0, 5)
 SUBSYS(lockdep, 0, 1)
 SUBSYS(context, 0, 1)
@@ -122,6 +133,7 @@ SUBSYS(asok, 1, 5)
 SUBSYS(throttle, 1, 1)
 SUBSYS(refs, 0, 0)
 SUBSYS(xio, 1, 5)
+SUBSYS(compressor, 1, 5)
 
 OPTION(key, OPT_STR, "")
 OPTION(keyfile, OPT_STR, "")
diff --git a/src/compressor/AsyncCompressor.cc b/src/compressor/AsyncCompressor.cc
new file mode 100644 (file)
index 0000000..d9b52b0
--- /dev/null
@@ -0,0 +1,200 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "common/dout.h"
+#include "common/errno.h"
+#include "AsyncCompressor.h"
+
+#define dout_subsys ceph_subsys_compressor
+#undef dout_prefix
+#define dout_prefix *_dout << "compressor "
+
+//void AsyncCompressor::_compress(bufferlist &in, bufferlist &out)
+//{
+//  uint64_t length = 0;
+//  size_t res_len;
+//  uint64_t left_pbrs = in.buffers().size();
+//  compressor->max_compress_size(in.length(), &res_len);
+//  ldout(cct, 20) << __func__ << " data length=" << in.length() << " got max compressed size is " << res_len << dendl;
+//  bufferptr ptr(res_len);
+//  list<bufferptr>::const_iterator pb = in.buffers().begin();
+//  while (left_pbrs--) {
+//    if (compressor->compress(pb->c_str(), pb->length(), ptr.c_str()+length, &res_len))
+//      assert(0);
+//    ldout(cct, 20) << __func__ << " pb length=" << pb->length() << " compress size is " << res_len << dendl;
+//    out.append(ptr, length, length+res_len);
+//    length += res_len;
+//    pb++;
+//  }
+//  ldout(cct, 20) << __func__ << " total compressed length is " << length << dendl;
+//}
+//
+//void AsyncCompressor::_decompress(bufferlist &in, bufferlist &out)
+//{
+//  int i = 0;
+//  uint64_t length = 0;
+//  size_t res_len;
+//  bufferptr ptr;
+//  vector<uint64_t> lens;
+//  list<bufferptr>::const_iterator pb = in.buffers().begin();
+//  uint64_t left_pbrs = in.buffers().size();
+//  while (left_pbrs--) {
+//    if (compressor->max_uncompress_size(pb->c_str(), pb->length(), &res_len))
+//      assert(0);
+//    length += res_len;
+//    lens.push_back(res_len);
+//    pb++;
+//  }
+//  pb = in.buffers().begin();
+//  left_pbrs = in.buffers().size();
+//  ptr = bufferptr(length);
+//  length = 0;
+//  while (left_pbrs--) {
+//    res_len = lens[i++];
+//    if (compressor->decompress(pb->c_str(), pb->length(), ptr.c_str()+length, &res_len))
+//      assert(0);
+//    ldout(cct, 20) << __func__ << " pb compressed length=" << pb->length() << " actually got decompressed size is " << res_len << dendl;
+//    out.append(ptr, length, length+res_len);
+//    length += res_len;
+//    pb++;
+//  }
+//  ldout(cct, 20) << __func__ << " total decompressed length is " << length << dendl;
+//}
+
+AsyncCompressor::AsyncCompressor(CephContext *c):
+  compressor(Compressor::create(c->_conf->async_compressor_type)), cct(c),
+  job_id(0),
+  compress_tp(g_ceph_context, "AsyncCompressor::compressor_tp", cct->_conf->async_compressor_threads, "async_compressor_threads"),
+  job_lock("AsyncCompressor::job_lock"),
+  compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) {
+  vector<string> corestrs;
+  get_str_vec(cct->_conf->async_compressor_affinity_cores, corestrs);
+  for (vector<string>::iterator it = corestrs.begin();
+       it != corestrs.end(); ++it) {
+    string err;
+    int coreid = strict_strtol(it->c_str(), 10, &err);
+    if (err == "")
+      coreids.push_back(coreid);
+    else
+      lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->async_compressor_affinity_cores << dendl;
+  }
+}
+
+// Start the worker thread pool so queued jobs begin to be processed.
+void AsyncCompressor::init()
+{
+  ldout(cct, 10) << __func__ << dendl;
+  compress_tp.start();
+}
+
+// Stop the worker thread pool. Jobs can still be queued afterwards, but they
+// are only completed by a blocking get_*_data() call or after init() is
+// called again.
+void AsyncCompressor::terminate()
+{
+  ldout(cct, 10) << __func__ << dendl;
+  compress_tp.stop();
+}
+
+// Register a compression job for `data` and hand it to the work queue.
+// Returns the job id to pass to get_compress_data().
+uint64_t AsyncCompressor::async_compress(bufferlist &data)
+{
+  uint64_t id = job_id.inc();
+  Job *job = NULL;
+  {
+    Mutex::Locker l(job_lock);
+    pair<unordered_map<uint64_t, Job>::iterator, bool> it =
+      jobs.insert(make_pair(id, Job(id, true)));
+    it.first->second.data = data;
+    // Capture the element's address while the lock is held: a concurrent
+    // insert may rehash the map and invalidate the iterator, whereas
+    // pointers to unordered_map elements stay valid across a rehash.
+    job = &it.first->second;
+  }
+  compress_wq.queue(job);
+  ldout(cct, 10) << __func__ << " insert async compress job id=" << id << dendl;
+  return id;
+}
+
+// Register a decompression job for `data` and hand it to the work queue.
+// Returns the job id to pass to get_decompress_data().
+uint64_t AsyncCompressor::async_decompress(bufferlist &data)
+{
+  uint64_t id = job_id.inc();
+  Job *job = NULL;
+  {
+    Mutex::Locker l(job_lock);
+    pair<unordered_map<uint64_t, Job>::iterator, bool> it =
+      jobs.insert(make_pair(id, Job(id, false)));
+    it.first->second.data = data;
+    // Capture the element's address while the lock is held: a concurrent
+    // insert may rehash the map and invalidate the iterator, whereas
+    // pointers to unordered_map elements stay valid across a rehash.
+    job = &it.first->second;
+  }
+  compress_wq.queue(job);
+  ldout(cct, 10) << __func__ << " insert async decompress job id=" << id << dendl;
+  return id;
+}
+
+int AsyncCompressor::get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished)
+{
+  assert(finished);
+  Mutex::Locker l(job_lock);
+  unordered_map<uint64_t, Job>::iterator it = jobs.find(compress_id);
+  if (it == jobs.end() || !it->second.is_compress) {
+    ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl;
+    return -ENOENT;
+  }
+
+ retry:
+  if (it->second.status.read() == DONE) {
+    ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl;
+    *finished = true;
+    data.swap(it->second.data);
+    jobs.erase(it);
+  } else if (blocking) {
+    if (it->second.status.cas(WAIT, DONE)) {
+      ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl;
+      compressor->compress(it->second.data, data);
+      *finished = true;
+    } else {
+      job_lock.Unlock();
+      usleep(1000);
+      job_lock.Lock();
+      goto retry;
+    }
+  } else {
+    ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished."<< dendl;
+    *finished = false;
+  }
+  return 0;
+}
+
+int AsyncCompressor::get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished)
+{
+  assert(finished);
+  Mutex::Locker l(job_lock);
+  unordered_map<uint64_t, Job>::iterator it = jobs.find(decompress_id);
+  if (it == jobs.end() || it->second.is_compress) {
+    ldout(cct, 10) << __func__ << " missing to get decompress job id=" << decompress_id << dendl;
+    return -ENOENT;
+  }
+
+ retry:
+  if (it->second.status.read() == DONE) {
+    ldout(cct, 20) << __func__ << " successfully getting decompressed data, job id=" << decompress_id << dendl;
+    *finished = true;
+    data.swap(it->second.data);
+    jobs.erase(it);
+  } else if (blocking) {
+    if (it->second.status.cas(WAIT, DONE)) {
+      ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, abort!"<< dendl;
+      compressor->decompress(it->second.data, data);
+      *finished = true;
+    } else {
+      job_lock.Unlock();
+      usleep(1000);
+      job_lock.Lock();
+      goto retry;
+    }
+  } else {
+    ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't finished."<< dendl;
+    *finished = false;
+  }
+  return 0;
+}
diff --git a/src/compressor/AsyncCompressor.h b/src/compressor/AsyncCompressor.h
new file mode 100644 (file)
index 0000000..d34cbe6
--- /dev/null
@@ -0,0 +1,126 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_ASYNCCOMPRESSOR_H
+#define CEPH_ASYNCCOMPRESSOR_H
+
+#include <deque>
+
+#include "include/atomic.h"
+#include "include/str_list.h"
+#include "Compressor.h"
+#include "common/WorkQueue.h"
+
+class AsyncCompressor;
+
+class AsyncCompressor {
+ private:
+  Compressor *compressor;
+  CephContext *cct;
+  atomic_t job_id;
+  vector<int> coreids;
+  ThreadPool compress_tp;
+
+  enum {
+    WAIT,
+    WORKING,
+    DONE
+  } status;
+  struct Job {
+    uint64_t id;
+    atomic_t status;
+    bool is_compress;
+    bufferlist data;
+    Job(uint64_t i, bool compress): id(i), status(WAIT), is_compress(compress) {}
+    Job(const Job &j): id(j.id), status(j.status.read()), is_compress(j.is_compress), data(j.data) {}
+  };
+  Mutex job_lock;
+  // only when job.status == DONE && with job_lock holding, we can insert/erase element in jobs
+  // only when job.status == WAIT && with pool_lock holding, you can change its status and modify element's info later
+  unordered_map<uint64_t, Job> jobs;
+
+  struct CompressWQ : public ThreadPool::WorkQueue<Job> {
+    typedef AsyncCompressor::Job Job;
+    AsyncCompressor *async_compressor;
+    deque<Job*> job_queue;
+
+    CompressWQ(AsyncCompressor *ac, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+      : ThreadPool::WorkQueue<Job>("AsyncCompressor::CompressWQ", timeout, suicide_timeout, tp), async_compressor(ac) {}
+
+    bool _enqueue(Job *item) {
+      job_queue.push_back(item);
+      return true;
+    }
+    void _dequeue(Job *item) {
+      assert(0);
+    }
+    bool _empty() {
+      return job_queue.empty();
+    }
+    Job* _dequeue() {
+      if (job_queue.empty())
+        return NULL;
+      Job *item = NULL;
+      while (!job_queue.empty()) {
+        item = job_queue.front();
+        job_queue.pop_front();
+        if (item->status.cas(WAIT, WORKING)) {
+          break;
+        } else {
+          Mutex::Locker (async_compressor->job_lock);
+          assert(item->status.read() == DONE);
+          async_compressor->jobs.erase(item->id);
+          item = NULL;
+        }
+      }
+      return item;
+    }
+    void _process(Job *item, ThreadPool::TPHandle &handle) {
+      assert(item->status.read() == WORKING);
+      bufferlist out;
+      if (item->is_compress)
+        async_compressor->compressor->compress(item->data, out);
+      else
+        async_compressor->compressor->decompress(item->data, out);
+      item->data.swap(out);
+    }
+    void _process_finish(Job *item) {
+      assert(item->status.read() == WORKING);
+      item->status.set(DONE);
+    }
+    void _clear() {}
+  } compress_wq;
+  friend class CompressWQ;
+  void _compress(bufferlist &in, bufferlist &out);
+  void _decompress(bufferlist &in, bufferlist &out);
+
+ public:
+  AsyncCompressor(CephContext *c);
+  virtual ~AsyncCompressor() {}
+
+  int get_cpuid(int id) {
+    if (coreids.empty())
+      return -1;
+    return coreids[id % coreids.size()];
+  }
+
+  void init();
+  void terminate();
+  uint64_t async_compress(bufferlist &data);
+  uint64_t async_decompress(bufferlist &data);
+  int get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished);
+  int get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished);
+};
+
+#endif
diff --git a/src/compressor/Compressor.cc b/src/compressor/Compressor.cc
new file mode 100644 (file)
index 0000000..0d11e74
--- /dev/null
@@ -0,0 +1,25 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "Compressor.h"
+#include "SnappyCompressor.h"
+
+
+// Factory: return a newly allocated compressor for `type`; the caller owns
+// the returned object. Only "snappy" is known; any other type trips the
+// assert.
+Compressor* Compressor::create(const string &type)
+{
+  if (type == "snappy")
+    return new SnappyCompressor();
+
+  assert(0);
+  // Not reached while asserts are active; keeps the function well-defined
+  // (no falling off the end of a non-void function) if they are compiled out.
+  return NULL;
+}
diff --git a/src/compressor/Compressor.h b/src/compressor/Compressor.h
new file mode 100644 (file)
index 0000000..3eb71aa
--- /dev/null
@@ -0,0 +1,30 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMPRESSOR_H
+#define CEPH_COMPRESSOR_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+
+// Abstract interface of a compression algorithm working on bufferlists.
+class Compressor {
+ public:
+  virtual ~Compressor() {}
+  // Compress the contents of `in` into `out`; returns an int status code.
+  virtual int compress(bufferlist &in, bufferlist &out) = 0;
+  // Decompress the contents of `in` into `out`; returns an int status code.
+  virtual int decompress(bufferlist &in, bufferlist &out) = 0;
+
+  // Factory selecting an implementation by name (e.g. "snappy").
+  static Compressor *create(const string &type);
+};
+
+#endif
diff --git a/src/compressor/Makefile.am b/src/compressor/Makefile.am
new file mode 100644 (file)
index 0000000..bd2a2d7
--- /dev/null
@@ -0,0 +1,11 @@
+libcompressor_la_SOURCES = \
+       compressor/Compressor.cc \
+       compressor/AsyncCompressor.cc
+noinst_LTLIBRARIES += libcompressor.la
+
+libcompressor_la_LIBADD = $(LIBCOMMON)
+
+noinst_HEADERS += \
+       compressor/Compressor.h \
+       compressor/AsyncCompressor.h \
+       compressor/SnappyCompressor.h
diff --git a/src/compressor/SnappyCompressor.h b/src/compressor/SnappyCompressor.h
new file mode 100644 (file)
index 0000000..8dc3497
--- /dev/null
@@ -0,0 +1,79 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_SNAPPYCOMPRESSOR_H
+#define CEPH_SNAPPYCOMPRESSOR_H
+
+#include <snappy.h>
+#include <snappy-sinksource.h>
+#include "include/buffer.h"
+#include "Compressor.h"
+
+class BufferlistSource : public snappy::Source {
+  list<bufferptr>::const_iterator pb;
+  size_t pb_off;
+  size_t left;
+
+ public:
+  BufferlistSource(bufferlist &data): pb(data.buffers().begin()), pb_off(0), left(data.length()) {}
+  virtual ~BufferlistSource() {}
+  virtual size_t Available() const { return left; }
+  virtual const char* Peek(size_t* len) {
+    if (left) {
+      *len = pb->length() - pb_off;
+      return pb->c_str() + pb_off;
+    } else {
+      *len = 0;
+      return NULL;
+    }
+  }
+  virtual void Skip(size_t n) {
+    if (n + pb_off == pb->length()) {
+      pb++;
+      pb_off = 0;
+    } else {
+      pb_off += n;
+    }
+    left -= n;
+  }
+};
+
+class SnappyCompressor : public Compressor {
+ public:
+  virtual ~SnappyCompressor() {}
+  virtual int compress(bufferlist &src, bufferlist &dst) {
+    BufferlistSource source(src);
+    bufferptr ptr(snappy::MaxCompressedLength(src.length()));
+    snappy::UncheckedByteArraySink sink(ptr.c_str());
+    snappy::Compress(&source, &sink);
+    dst.append(ptr, 0, sink.CurrentDestination()-ptr.c_str());
+    return 0;
+  }
+  virtual int decompress(bufferlist &src, bufferlist &dst) {
+    BufferlistSource source(src);
+    size_t res_len = 0;
+    // Trick, decompress only need first 32bits buffer
+    list<bufferptr>::const_iterator pb = src.buffers().begin();
+    if (!snappy::GetUncompressedLength(src.get_contiguous(0, 8), 8, &res_len))
+      return -1;
+    bufferptr ptr(res_len);
+    if (snappy::RawUncompress(&source, ptr.c_str())) {
+      dst.append(ptr);
+      return 0;
+    }
+    return -1;
+  }
+};
+
+#endif
index 9d56f055b9869cdb39ba22a9e561a85e5a9ffe46..64e3f3b9d6d4b5d2802c0b9aa5691e20d94da84d 100644 (file)
@@ -414,6 +414,11 @@ unittest_subprocess_LDADD = $(LIBCOMMON) $(UNITTEST_LDADD)
 unittest_subprocess_CXXFLAGS = $(UNITTEST_CXXFLAGS)
 check_PROGRAMS += unittest_subprocess
 
+unittest_async_compressor_SOURCES = test/common/test_async_compressor.cc
+unittest_async_compressor_CXXFLAGS = $(UNITTEST_CXXFLAGS)
+unittest_async_compressor_LDADD = $(UNITTEST_LDADD) $(CEPH_GLOBAL) $(LIBCOMPRESSOR)
+check_PROGRAMS += unittest_async_compressor
+
 check_SCRIPTS += test/pybind/test_ceph_argparse.py
 check_SCRIPTS += test/pybind/test_ceph_daemon.py
 
diff --git a/src/test/common/test_async_compressor.cc b/src/test/common/test_async_compressor.cc
new file mode 100644 (file)
index 0000000..7988868
--- /dev/null
@@ -0,0 +1,204 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+
+#include <time.h>
+#include <boost/random/mersenne_twister.hpp>
+#include <boost/random/uniform_int.hpp>
+#include <boost/random/binomial_distribution.hpp>
+#include <gtest/gtest.h>
+#include "common/ceph_argparse.h"
+#include "compressor/AsyncCompressor.h"
+#include "global/global_init.h"
+
+typedef boost::mt11213b gen_type;
+
+class AsyncCompressorTest : public ::testing::Test {
+ public:
+  AsyncCompressor *async_compressor;
+  virtual void SetUp() {
+    cerr << __func__ << " start set up " << std::endl;
+    async_compressor = new AsyncCompressor(g_ceph_context);
+    async_compressor->init();
+  }
+  virtual void TearDown() {
+    async_compressor->terminate();
+    delete async_compressor;
+  }
+
+  void generate_random_data(bufferlist &bl, uint64_t len = 0) {
+    static const char *base= "znvm,x12399zasdfjkl1209zxcvjlkasjdfljwqelrjzx,cvn,m123#*(@)";
+    if (!len) {
+      boost::uniform_int<> kb(16, 4096);
+      gen_type rng(time(NULL));
+      len = kb(rng) * 1024;
+    }
+
+    while (bl.length() < len)
+      bl.append(base, sizeof(base)-1);
+  }
+};
+
+// Round trip of 4 MiB: compress with a blocking fetch, decompress while
+// polling with non-blocking fetches, compare with the input, and check that
+// a job id is no longer known once its result has been collected.
+TEST_F(AsyncCompressorTest, SimpleTest) {
+  bufferlist compress_data, decompress_data, rawdata;
+  generate_random_data(rawdata, 1<<22);
+  bool finished;
+  uint64_t id = async_compressor->async_compress(rawdata);
+  ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished));
+  ASSERT_TRUE(finished == true);
+  id = async_compressor->async_decompress(compress_data);
+  do {
+    ASSERT_EQ(0, async_compressor->get_decompress_data(id, decompress_data, false, &finished));
+  } while (!finished);
+  ASSERT_TRUE(finished == true);
+  ASSERT_TRUE(rawdata.contents_equal(decompress_data));
+  ASSERT_EQ(-ENOENT, async_compressor->get_decompress_data(id, decompress_data, true, &finished));
+}
+
+// Same round trip with the worker pool stopped: no worker can claim the
+// jobs, so the blocking fetches have to do the work inline. The pool is
+// started again at the end, before TearDown() terminates it.
+TEST_F(AsyncCompressorTest, GrubWaitTest) {
+  async_compressor->terminate();
+  bufferlist compress_data, decompress_data, rawdata;
+  generate_random_data(rawdata, 1<<22);
+  bool finished;
+  uint64_t id = async_compressor->async_compress(rawdata);
+  ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished));
+  ASSERT_TRUE(finished == true);
+  id = async_compressor->async_decompress(compress_data);
+  ASSERT_EQ(0, async_compressor->get_decompress_data(id, decompress_data, true, &finished));
+  ASSERT_TRUE(finished == true);
+  ASSERT_TRUE(rawdata.contents_equal(decompress_data));
+  async_compressor->init();
+}
+
+class SyntheticWorkload {
+  set<pair<uint64_t, uint64_t> > compress_jobs, decompress_jobs;
+  AsyncCompressor *async_compressor;
+  vector<bufferlist> rand_data, compress_data;
+  gen_type rng;
+
+ public:
+  SyntheticWorkload(AsyncCompressor *ac): async_compressor(ac), rng(time(NULL)) {
+    for (int i = 0; i < 100; i++) {
+      bufferlist bl;
+      boost::uniform_int<> u(4096, 1<<24);
+      uint64_t value_len = u(rng);
+      bufferptr bp(value_len);
+      bp.zero();
+      for (uint64_t j = 0; j < value_len-sizeof(i); ) {
+        memcpy(bp.c_str()+j, &i, sizeof(i));
+        j += 4096;
+      }
+
+      bl.append(bp);
+      rand_data.push_back(bl);
+      compress_jobs.insert(make_pair(async_compressor->async_compress(rand_data[i]), i));
+      if (!(i % 10)) cerr << "seeding compress data " << i << std::endl;
+    }
+    compress_data.resize(100);
+    reap(true);
+  }
+  void do_compress() {
+    boost::uniform_int<> u(0, rand_data.size()-1);
+    uint64_t index = u(rng);
+    compress_jobs.insert(make_pair(async_compressor->async_compress(rand_data[index]), index));
+  }
+  void do_decompress() {
+    boost::uniform_int<> u(0, compress_data.size()-1);
+    uint64_t index = u(rng);
+    if (compress_data[index].length())
+      decompress_jobs.insert(make_pair(async_compressor->async_decompress(compress_data[index]), index));
+  }
+  void reap(bool blocking) {
+    bufferlist data;
+    bool finished;
+    set<pair<uint64_t, uint64_t> >::iterator prev;
+    uint64_t c_reap = 0, d_reap = 0;
+    for (set<pair<uint64_t, uint64_t> >::iterator it = compress_jobs.begin();
+         it != compress_jobs.end();) {
+      prev = it;
+      it++;
+      async_compressor->get_compress_data(prev->first, data, blocking, &finished);
+      if (finished) {
+        c_reap++;
+        if (compress_data[prev->second].length())
+          ASSERT_TRUE(compress_data[prev->second].contents_equal(data));
+        else
+          compress_data[prev->second].swap(data);
+        compress_jobs.erase(prev);
+      }
+    }
+
+    for (set<pair<uint64_t, uint64_t> >::iterator it = decompress_jobs.begin();
+         it != decompress_jobs.end();) {
+      prev = it;
+      it++;
+      async_compressor->get_decompress_data(prev->first, data, blocking, &finished);
+      if (finished) {
+        d_reap++;
+        ASSERT_TRUE(rand_data[prev->second].contents_equal(data));
+        decompress_jobs.erase(prev);
+      }
+    }
+    cerr << " reap compress jobs " << c_reap << " decompress jobs " << d_reap << std::endl;
+  }
+  void print_internal_state() {
+    cerr << "inlfight compress jobs: " << compress_jobs.size()
+         << " inflight decompress jobs: " << decompress_jobs.size() << std::endl;
+  }
+  bool empty() const { return compress_jobs.empty() && decompress_jobs.empty(); }
+};
+
+// Run 10000 random operations (45% compress, 50% decompress, 5% non-blocking
+// reap) against the workload, then keep reaping until nothing is in flight.
+TEST_F(AsyncCompressorTest, SyntheticTest) {
+  SyntheticWorkload test_ac(async_compressor);
+  gen_type rng(time(NULL));
+  boost::uniform_int<> true_false(0, 99);
+  int val;
+  for (int i = 0; i < 10000; ++i) {
+    if (!(i % 10)) {
+      cerr << "Op " << i << ": ";
+      test_ac.print_internal_state();
+    }
+    val = true_false(rng);
+    if (val < 45) {
+      test_ac.do_compress();
+    } else if (val < 95) {
+      test_ac.do_decompress();
+    } else {
+      test_ac.reap(false);
+    }
+  }
+  // drain: poll every 500ms until all jobs have been collected
+  while (!test_ac.empty()) {
+    test_ac.reap(false);
+    test_ac.print_internal_state();
+    usleep(1000*500);
+  }
+}
+
+
+// Test entry point: initialise the global Ceph context from the command
+// line (the fixture uses g_ceph_context), then run all gtest cases.
+int main(int argc, char **argv) {
+  vector<const char*> args;
+  argv_to_vec(argc, (const char **)argv, args);
+
+  global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+  common_init_finish(g_ceph_context);
+
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
+
+/*
+ * Local Variables:
+ * compile-command: "cd ../.. ; make -j4 unittest_async_compressor && valgrind --tool=memcheck ./unittest_async_compressor"
+ * End:
+ */