]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Compressor: compressor code extention: plugin system added
authorVed-vampir <akiselyova@mirantis.com>
Tue, 10 Nov 2015 15:20:02 +0000 (18:20 +0300)
committerSage Weil <sage@redhat.com>
Fri, 15 Jan 2016 00:04:11 +0000 (19:04 -0500)
Signed-off-by: Alyona Kiseleva <akiselyova@mirantis.com>
17 files changed:
src/Makefile-env.am
src/Makefile.am
src/common/Makefile.am
src/common/config_opts.h
src/compressor/AsyncCompressor.cc
src/compressor/AsyncCompressor.h
src/compressor/CMakeLists.txt [new file with mode: 0644]
src/compressor/CompressionPlugin.h [new file with mode: 0644]
src/compressor/Compressor.cc
src/compressor/Compressor.h
src/compressor/Makefile.am
src/compressor/SnappyCompressor.h [deleted file]
src/compressor/snappy/CMakeLists.txt [new file with mode: 0644]
src/compressor/snappy/CompressionPluginSnappy.cc [new file with mode: 0644]
src/compressor/snappy/Makefile.am [new file with mode: 0644]
src/compressor/snappy/SnappyCompressor.h [new file with mode: 0644]
src/vstart.sh

index 2501b9b640e05d50d21c41a5ab0a49e7ccb51549..7dd39673228c79d3bd7c7d2ed0556a0602946345 100644 (file)
@@ -201,7 +201,7 @@ LIBPERFGLUE = libperfglue.la
 LIBAUTH = libauth.la
 LIBMSG = libmsg.la
 LIBCRUSH = libcrush.la
-LIBCOMPRESSOR = libcompressor.la -lsnappy
+LIBCOMPRESSOR = libcompressor.la
 LIBJSON_SPIRIT = libjson_spirit.la
 LIBKV = libkv.a
 LIBLOG = liblog.la
index cd24915322b9ff6c53ac9251c18813de7f3eabcf..1b651c157fe05edb3d68cf7a7a24d404971a495f 100644 (file)
@@ -25,6 +25,7 @@ include mds/Makefile.am
 include os/Makefile.am
 include osd/Makefile.am
 include erasure-code/Makefile.am
+include compressor/Makefile.am
 include osdc/Makefile.am
 include client/Makefile.am
 include global/Makefile.am
@@ -46,7 +47,6 @@ include rbd_replay/Makefile.am
 include test/Makefile.am
 include tools/Makefile.am
 include Makefile-rocksdb.am
-include compressor/Makefile.am
 include tracing/Makefile.am
 include pybind/Makefile.am
 
index 4dfe988a57fbb537ec7950d0c86b03df706586a2..5ed2d3b049f2be8744f66569f1090e3b175458b2 100644 (file)
@@ -157,6 +157,7 @@ noinst_HEADERS += \
 # important; libmsg before libauth!
 LIBCOMMON_DEPS += \
        $(LIBERASURE_CODE) \
+       $(LIBCOMPRESSOR) \
        $(LIBMSG) $(LIBAUTH) \
        $(LIBCRUSH) $(LIBJSON_SPIRIT) $(LIBLOG) $(LIBARCH) \
        $(BOOST_RANDOM_LIBS)
index 66ff3e38fb5f52c6bda471fc9dbe6b802e4db97f..8542162fb09b3c72e4b5b6d6c0e94f5bf4ffd6ed 100644 (file)
@@ -39,6 +39,7 @@ OPTION(restapi_log_level, OPT_STR, "")        // default set by Python code
 OPTION(restapi_base_url, OPT_STR, "")  // "
 OPTION(fatal_signal_handlers, OPT_BOOL, true)
 OPTION(erasure_code_dir, OPT_STR, CEPH_PKGLIBDIR"/erasure-code") // default location for erasure-code plugins
+OPTION(compression_dir, OPT_STR, CEPH_PKGLIBDIR"/compressor") // default location for compression plugins
 
 OPTION(log_file, OPT_STR, "/var/log/ceph/$cluster-$name.log") // default changed by common_preinit()
 OPTION(log_max_new, OPT_INT, 1000) // default changed by common_preinit()
@@ -575,6 +576,9 @@ OPTION(osd_pool_default_size, OPT_INT, 3)
 OPTION(osd_pool_default_min_size, OPT_INT, 0)  // 0 means no specific default; ceph will use size-size/2
 OPTION(osd_pool_default_pg_num, OPT_INT, 8) // number of PGs for new pools. Configure in global or mon section of ceph.conf
 OPTION(osd_pool_default_pgp_num, OPT_INT, 8) // number of PGs for placement purposes. Should be equal to pg_num
+OPTION(osd_compression_plugins, OPT_STR,
+       "snappy"
+       ) // list of compression plugins
 OPTION(osd_pool_default_erasure_code_profile,
        OPT_STR,
        "plugin=jerasure "
index cdd666701e2a7d59ca7585f7c5fad2c949b65133..7a9071a7189a2ad312314240a7bf43fce8d947fd 100644 (file)
@@ -21,9 +21,9 @@
 #define dout_prefix *_dout << "compressor "
 
 AsyncCompressor::AsyncCompressor(CephContext *c):
-  compressor(Compressor::create(c->_conf->async_compressor_type)), cct(c),
+  compressor(Compressor::create(c, c->_conf->async_compressor_type)), cct(c),
   job_id(0),
-  compress_tp(g_ceph_context, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"),
+  compress_tp(cct, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"),
   job_lock("AsyncCompressor::job_lock"),
   compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) {
 }
index 15af92b4e3f6127787cb8e8d889a9b94ddb65539..cec2e961d4e3e4a5f59b9e8a9f052138803bdef8 100644 (file)
@@ -25,7 +25,7 @@
 
 class AsyncCompressor {
  private:
-  Compressor *compressor;
+  CompressorRef compressor;
   CephContext *cct;
   atomic_t job_id;
   vector<int> coreids;
diff --git a/src/compressor/CMakeLists.txt b/src/compressor/CMakeLists.txt
new file mode 100644 (file)
index 0000000..8b41d23
--- /dev/null
@@ -0,0 +1,10 @@
+## compressor plugins
+
+set(compressorlibdir ${LIBRARY_OUTPUT_PATH}/compressor)
+
+add_subdirectory(snappy)
+
+add_library(compressor_objs OBJECT Compressor.cc)
+
+add_custom_target(compressor_plugins DEPENDS
+    ceph_snappy)
diff --git a/src/compressor/CompressionPlugin.h b/src/compressor/CompressionPlugin.h
new file mode 100644 (file)
index 0000000..d699d00
--- /dev/null
@@ -0,0 +1,45 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph distributed storage system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ *
+ *  This library is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public
+ *  License as published by the Free Software Foundation; either
+ *  version 2.1 of the License, or (at your option) any later version.
+ * 
+ */
+
+#ifndef COMPRESSION_PLUGIN_H
+#define COMPRESSION_PLUGIN_H
+
+#include "common/Mutex.h"
+#include "common/PluginRegistry.h"
+#include "Compressor.h"
+
+namespace ceph {
+
+  class CompressionPlugin :  public Plugin {
+  public:
+    CompressorRef compressor;
+
+    CompressionPlugin(CephContext *cct) : Plugin(cct),
+                                          compressor(0) 
+    {}
+    
+    virtual ~CompressionPlugin() {}
+
+    virtual int factory(CompressorRef *cs,
+                                         ostream *ss) = 0;
+
+    virtual const char* name() {return "CompressionPlugin";}
+  };
+
+}
+
+#endif
index 0d11e748d41d7825884fec8c7dbd19dbaf16ec94..2ef817c57cbdb7409333e677c2f9aa9d44989fc1 100644 (file)
  */
 
 #include "Compressor.h"
-#include "SnappyCompressor.h"
+#include "CompressionPlugin.h"
 
 
-Compressor* Compressor::create(const string &type)
+CompressorRef Compressor::create(CephContext *cct, const string &type)
 {
-  if (type == "snappy")
-    return new SnappyCompressor();
-
-  assert(0);
+  CompressorRef cs_impl = NULL;
+  stringstream ss;
+  PluginRegistry *reg = cct->get_plugin_registry();
+  CompressionPlugin *factory = dynamic_cast<CompressionPlugin*>(reg->get_with_load("compressor", type));
+  if (factory == NULL) {
+    lderr(cct) << __func__ << " cannot load compressor of type " << type << dendl;
+    return NULL;
+  }
+  int err = factory->factory(&cs_impl, &ss);
+  if (err)
+    lderr(cct) << __func__ << " factory return error " << err << dendl;
+  return cs_impl;
 }
index 3eb71aa5c45f8fdde77766a24eff7edcabff52d2..c891bbef2ede4cf3afce175f823aa6ff2fabe5eb 100644 (file)
 #include "include/int_types.h"
 #include "include/Context.h"
 
+class Compressor;
+typedef shared_ptr<Compressor> CompressorRef;
+
+
 class Compressor {
  public:
   virtual ~Compressor() {}
   virtual int compress(bufferlist &in, bufferlist &out) = 0;
   virtual int decompress(bufferlist &in, bufferlist &out) = 0;
 
-  static Compressor *create(const string &type);
+  static CompressorRef create(CephContext *cct, const string &type);
 };
 
+
+
 #endif
index bd2a2d7d1744f756bf429155b54cecb58f14b8a8..47a069fd0417bf1c1ae39be19e7a8db3f0040fe2 100644 (file)
@@ -1,11 +1,20 @@
+compressorlibdir = $(pkglibdir)/compressor
+compressorlib_LTLIBRARIES =  
+
+include compressor/snappy/Makefile.am
+
 libcompressor_la_SOURCES = \
        compressor/Compressor.cc \
        compressor/AsyncCompressor.cc
+compressor/CompressionPlugin.cc: ./ceph_ver.h
+libcompressor_la_DEPENDENCIES = $(compressorlib_LTLIBRARIES)
+if LINUX
+libcompressor_la_LIBADD = -ldl
+endif # LINUX
 noinst_LTLIBRARIES += libcompressor.la
 
-libcompressor_la_LIBADD = $(LIBCOMMON)
-
 noinst_HEADERS += \
        compressor/Compressor.h \
        compressor/AsyncCompressor.h \
-       compressor/SnappyCompressor.h
+    compressor/CompressionPlugin.h
+
diff --git a/src/compressor/SnappyCompressor.h b/src/compressor/SnappyCompressor.h
deleted file mode 100644 (file)
index ba58b46..0000000
+++ /dev/null
@@ -1,78 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- *
- */
-
-#ifndef CEPH_SNAPPYCOMPRESSOR_H
-#define CEPH_SNAPPYCOMPRESSOR_H
-
-#include <snappy.h>
-#include <snappy-sinksource.h>
-#include "include/buffer.h"
-#include "Compressor.h"
-
-class BufferlistSource : public snappy::Source {
-  list<bufferptr>::const_iterator pb;
-  size_t pb_off;
-  size_t left;
-
- public:
-  BufferlistSource(bufferlist &data): pb(data.buffers().begin()), pb_off(0), left(data.length()) {}
-  virtual ~BufferlistSource() {}
-  virtual size_t Available() const { return left; }
-  virtual const char* Peek(size_t* len) {
-    if (left) {
-      *len = pb->length() - pb_off;
-      return pb->c_str() + pb_off;
-    } else {
-      *len = 0;
-      return NULL;
-    }
-  }
-  virtual void Skip(size_t n) {
-    if (n + pb_off == pb->length()) {
-      ++pb;
-      pb_off = 0;
-    } else {
-      pb_off += n;
-    }
-    left -= n;
-  }
-};
-
-class SnappyCompressor : public Compressor {
- public:
-  virtual ~SnappyCompressor() {}
-  virtual int compress(bufferlist &src, bufferlist &dst) {
-    BufferlistSource source(src);
-    bufferptr ptr(snappy::MaxCompressedLength(src.length()));
-    snappy::UncheckedByteArraySink sink(ptr.c_str());
-    snappy::Compress(&source, &sink);
-    dst.append(ptr, 0, sink.CurrentDestination()-ptr.c_str());
-    return 0;
-  }
-  virtual int decompress(bufferlist &src, bufferlist &dst) {
-    BufferlistSource source(src);
-    size_t res_len = 0;
-    // Trick, decompress only need first 32bits buffer
-    if (!snappy::GetUncompressedLength(src.get_contiguous(0, 8), 8, &res_len))
-      return -1;
-    bufferptr ptr(res_len);
-    if (snappy::RawUncompress(&source, ptr.c_str())) {
-      dst.append(ptr);
-      return 0;
-    }
-    return -1;
-  }
-};
-
-#endif
diff --git a/src/compressor/snappy/CMakeLists.txt b/src/compressor/snappy/CMakeLists.txt
new file mode 100644 (file)
index 0000000..62e8df8
--- /dev/null
@@ -0,0 +1,13 @@
+# snappy
+
+set(snappy_sources
+  CompressionPluginSnappy.cc
+  $<TARGET_OBJECTS:compressor_objs>
+)
+
+add_library(ceph_snappy SHARED ${snappy_sources})
+add_dependencies(ceph_snappy ${CMAKE_SOURCE_DIR}/src/ceph_ver.h)
+target_link_libraries(ceph_snappy ${EXTRALIBS})
+set_target_properties(ceph_snappy PROPERTIES VERSION 2.14.0 SOVERSION 2)
+set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -lsnappy")
+install(TARGETS ceph_snappy DESTINATION lib/compressor)
diff --git a/src/compressor/snappy/CompressionPluginSnappy.cc b/src/compressor/snappy/CompressionPluginSnappy.cc
new file mode 100644 (file)
index 0000000..2030d56
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ *  This library is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public
+ *  License as published by the Free Software Foundation; either
+ *  version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+
+// -----------------------------------------------------------------------------
+#include "ceph_ver.h"
+#include "compressor/CompressionPlugin.h"
+#include "SnappyCompressor.h"
+// -----------------------------------------------------------------------------
+
+class CompressionPluginSnappy : public CompressionPlugin {
+
+public:
+
+  CompressionPluginSnappy(CephContext* cct) : CompressionPlugin(cct)
+  {}
+
+  virtual int factory(CompressorRef *cs,
+                      ostream *ss)
+  {
+    if (compressor == 0) {
+      SnappyCompressor *interface = new SnappyCompressor();
+      compressor = CompressorRef(interface);
+    }
+    *cs = compressor;
+    return 0;
+  }
+};
+
+// -----------------------------------------------------------------------------
+
+const char *__ceph_plugin_version()
+{
+  return CEPH_GIT_NICE_VER;
+}
+
+// -----------------------------------------------------------------------------
+
+int __ceph_plugin_init(CephContext *cct,
+                       const std::string& type,
+                       const std::string& name)
+{
+  PluginRegistry *instance = cct->get_plugin_registry();
+
+  return instance->add(type, name, new CompressionPluginSnappy(cct));
+}
diff --git a/src/compressor/snappy/Makefile.am b/src/compressor/snappy/Makefile.am
new file mode 100644 (file)
index 0000000..49d3db0
--- /dev/null
@@ -0,0 +1,22 @@
+# snappy plugin
+noinst_HEADERS += \
+  compressor/snappy/SnappyCompressor.h
+
+snappy_sources = \
+  compressor/Compressor.cc \
+  compressor/snappy/CompressionPluginSnappy.cc
+
+compressor/snappy/CompressionPluginSnappy.cc: ./ceph_ver.h
+
+libceph_snappy_la_SOURCES = ${snappy_sources}
+libceph_snappy_la_CFLAGS = ${AM_CFLAGS}  \
+  -I$(srcdir)/compressor/snappy/snappy-1.2.8
+libceph_snappy_la_CXXFLAGS= ${AM_CXXFLAGS} \
+  -I$(srcdir)/compressor/snappy/snappy-1.2.8
+libceph_snappy_la_LIBADD = $(LIBCRUSH) $(PTHREAD_LIBS) $(EXTRALIBS)
+libceph_snappy_la_LDFLAGS = ${AM_LDFLAGS} -lsnappy -version-info 2:0:0
+if LINUX
+libceph_snappy_la_LDFLAGS += -export-symbols-regex '.*__compressor_.*'
+endif
+
+compressorlib_LTLIBRARIES += libceph_snappy.la
diff --git a/src/compressor/snappy/SnappyCompressor.h b/src/compressor/snappy/SnappyCompressor.h
new file mode 100644 (file)
index 0000000..40ec450
--- /dev/null
@@ -0,0 +1,79 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_SNAPPYCOMPRESSOR_H
+#define CEPH_SNAPPYCOMPRESSOR_H
+
+#include <snappy.h>
+#include <snappy-sinksource.h>
+#include "include/buffer.h"
+#include "compressor/Compressor.h"
+
+class BufferlistSource : public snappy::Source {
+  list<bufferptr>::const_iterator pb;
+  size_t pb_off;
+  size_t left;
+
+ public:
+  BufferlistSource(bufferlist &data): pb(data.buffers().begin()), pb_off(0), left(data.length()) {}
+  virtual ~BufferlistSource() {}
+  virtual size_t Available() const { return left; }
+  virtual const char* Peek(size_t* len) {
+    if (left) {
+      *len = pb->length() - pb_off;
+      return pb->c_str() + pb_off;
+    } else {
+      *len = 0;
+      return NULL;
+    }
+  }
+  virtual void Skip(size_t n) {
+    if (n + pb_off == pb->length()) {
+      ++pb;
+      pb_off = 0;
+    } else {
+      pb_off += n;
+    }
+    left -= n;
+  }
+};
+
+class SnappyCompressor : public Compressor {
+ public:
+  virtual ~SnappyCompressor() {}
+  virtual const char* get_method_name() { return "snappy"; }
+  virtual int compress(bufferlist &src, bufferlist &dst) {
+    BufferlistSource source(src);
+    bufferptr ptr(snappy::MaxCompressedLength(src.length()));
+    snappy::UncheckedByteArraySink sink(ptr.c_str());
+    snappy::Compress(&source, &sink);
+    dst.append(ptr, 0, sink.CurrentDestination()-ptr.c_str());
+    return 0;
+  }
+  virtual int decompress(bufferlist &src, bufferlist &dst) {
+    BufferlistSource source(src);
+    size_t res_len = 0;
+    // Trick, decompress only need first 32bits buffer
+    if (!snappy::GetUncompressedLength(src.get_contiguous(0, 8), 8, &res_len))
+      return -1;
+    bufferptr ptr(res_len);
+    if (snappy::RawUncompress(&source, ptr.c_str())) {
+      dst.append(ptr);
+      return 0;
+    }
+    return -1;
+  }
+};
+
+#endif
index bfbfc39bb8df3dddfd0cc17dd006a48cd87058eb..e5154792cff8a1b6080dc2da781911bf4a6c80d2 100755 (executable)
@@ -33,17 +33,31 @@ if [ -e CMakeCache.txt ]; then
     ln -sf ../${file} ec_plugins/`basename $file`
   done
   [ -z "$EC_PATH" ] && EC_PATH=./ec_plugins
+  # check for compression plugins
+  mkdir -p .libs/compressor
+  for file in ./src/compressor/*/libcs_*.so*;
+  do
+    ln -sf ../${file} .libs/compressor/`basename $file`
+  done
+else
+    mkdir -p .libs/compressor
+    for f in `ls -d compressor/*/`; 
+    do 
+        cp .libs/libceph_`basename $f`.so* .libs/compressor/;
+    done
 fi
 
 if [ -z "$CEPH_BUILD_ROOT" ]; then
         [ -z "$CEPH_BIN" ] && CEPH_BIN=.
         [ -z "$CEPH_LIB" ] && CEPH_LIB=.libs
         [ -z $EC_PATH ] && EC_PATH=$CEPH_LIB
+        [ -z $CS_PATH ] && CS_PATH=$CEPH_LIB
         [ -z $OBJCLASS_PATH ] && OBJCLASS_PATH=$CEPH_LIB
 else
         [ -z $CEPH_BIN ] && CEPH_BIN=$CEPH_BUILD_ROOT/bin
         [ -z $CEPH_LIB ] && CEPH_LIB=$CEPH_BUILD_ROOT/lib
         [ -z $EC_PATH ] && EC_PATH=$CEPH_LIB/erasure-code
+        [ -z $CS_PATH ] && CS_PATH=$CEPH_LIB/compressor
         [ -z $OBJCLASS_PATH ] && OBJCLASS_PATH=$CEPH_LIB/rados-classes
 fi
 
@@ -433,6 +447,7 @@ if [ "$start_mon" -eq 1 ]; then
         mon data avail warn = 10
         mon data avail crit = 1
         erasure code dir = $EC_PATH
+        plugin dir = $CS_PATH
         osd pool default erasure code profile = plugin=jerasure technique=reed_sol_van k=2 m=1 ruleset-failure-domain=osd
         rgw frontends = fastcgi, civetweb port=$CEPH_RGW_PORT
         rgw dns name = localhost