From 2cadc416ca64ec05cd3496f5862f46e9dacb258b Mon Sep 17 00:00:00 2001 From: Ved-vampir Date: Tue, 10 Nov 2015 18:20:02 +0300 Subject: [PATCH] Compressor: compressor code extention: plugin system added Signed-off-by: Alyona Kiseleva --- src/Makefile-env.am | 2 +- src/Makefile.am | 2 +- src/common/Makefile.am | 1 + src/common/config_opts.h | 4 ++ src/compressor/AsyncCompressor.cc | 4 +- src/compressor/AsyncCompressor.h | 2 +- src/compressor/CMakeLists.txt | 10 ++++ src/compressor/CompressionPlugin.h | 45 +++++++++++++++ src/compressor/Compressor.cc | 20 +++++-- src/compressor/Compressor.h | 8 ++- src/compressor/Makefile.am | 15 ++++- src/compressor/snappy/CMakeLists.txt | 13 +++++ .../snappy/CompressionPluginSnappy.cc | 57 +++++++++++++++++++ src/compressor/snappy/Makefile.am | 22 +++++++ .../{ => snappy}/SnappyCompressor.h | 3 +- src/vstart.sh | 15 +++++ 16 files changed, 207 insertions(+), 16 deletions(-) create mode 100644 src/compressor/CMakeLists.txt create mode 100644 src/compressor/CompressionPlugin.h create mode 100644 src/compressor/snappy/CMakeLists.txt create mode 100644 src/compressor/snappy/CompressionPluginSnappy.cc create mode 100644 src/compressor/snappy/Makefile.am rename src/compressor/{ => snappy}/SnappyCompressor.h (95%) diff --git a/src/Makefile-env.am b/src/Makefile-env.am index 2501b9b640e05..7dd39673228c7 100644 --- a/src/Makefile-env.am +++ b/src/Makefile-env.am @@ -201,7 +201,7 @@ LIBPERFGLUE = libperfglue.la LIBAUTH = libauth.la LIBMSG = libmsg.la LIBCRUSH = libcrush.la -LIBCOMPRESSOR = libcompressor.la -lsnappy +LIBCOMPRESSOR = libcompressor.la LIBJSON_SPIRIT = libjson_spirit.la LIBKV = libkv.a LIBLOG = liblog.la diff --git a/src/Makefile.am b/src/Makefile.am index cd24915322b9f..1b651c157fe05 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -25,6 +25,7 @@ include mds/Makefile.am include os/Makefile.am include osd/Makefile.am include erasure-code/Makefile.am +include compressor/Makefile.am include osdc/Makefile.am include client/Makefile.am include global/Makefile.am @@ -46,7 +47,6 @@ include rbd_replay/Makefile.am include test/Makefile.am include tools/Makefile.am include Makefile-rocksdb.am -include compressor/Makefile.am include tracing/Makefile.am include pybind/Makefile.am diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 4dfe988a57fbb..5ed2d3b049f2b 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -157,6 +157,7 @@ noinst_HEADERS += \ # important; libmsg before libauth! LIBCOMMON_DEPS += \ $(LIBERASURE_CODE) \ + $(LIBCOMPRESSOR) \ $(LIBMSG) $(LIBAUTH) \ $(LIBCRUSH) $(LIBJSON_SPIRIT) $(LIBLOG) $(LIBARCH) \ $(BOOST_RANDOM_LIBS) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 66ff3e38fb5f5..8542162fb09b3 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -39,6 +39,7 @@ OPTION(restapi_log_level, OPT_STR, "") // default set by Python code OPTION(restapi_base_url, OPT_STR, "") // " OPTION(fatal_signal_handlers, OPT_BOOL, true) OPTION(erasure_code_dir, OPT_STR, CEPH_PKGLIBDIR"/erasure-code") // default location for erasure-code plugins +OPTION(compression_dir, OPT_STR, CEPH_PKGLIBDIR"/compressor") // default location for compression plugins OPTION(log_file, OPT_STR, "/var/log/ceph/$cluster-$name.log") // default changed by common_preinit() OPTION(log_max_new, OPT_INT, 1000) // default changed by common_preinit() @@ -575,6 +576,9 @@ OPTION(osd_pool_default_size, OPT_INT, 3) OPTION(osd_pool_default_min_size, OPT_INT, 0) // 0 means no specific default; ceph will use size-size/2 OPTION(osd_pool_default_pg_num, OPT_INT, 8) // number of PGs for new pools. Configure in global or mon section of ceph.conf OPTION(osd_pool_default_pgp_num, OPT_INT, 8) // number of PGs for placement purposes. Should be equal to pg_num +OPTION(osd_compression_plugins, OPT_STR, + "snappy" + ) // list of compression plugins OPTION(osd_pool_default_erasure_code_profile, OPT_STR, "plugin=jerasure " diff --git a/src/compressor/AsyncCompressor.cc b/src/compressor/AsyncCompressor.cc index cdd666701e2a7..7a9071a7189a2 100644 --- a/src/compressor/AsyncCompressor.cc +++ b/src/compressor/AsyncCompressor.cc @@ -21,9 +21,9 @@ #define dout_prefix *_dout << "compressor " AsyncCompressor::AsyncCompressor(CephContext *c): - compressor(Compressor::create(c->_conf->async_compressor_type)), cct(c), + compressor(Compressor::create(c, c->_conf->async_compressor_type)), cct(c), job_id(0), - compress_tp(g_ceph_context, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"), + compress_tp(cct, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"), job_lock("AsyncCompressor::job_lock"), compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) { } diff --git a/src/compressor/AsyncCompressor.h b/src/compressor/AsyncCompressor.h index 15af92b4e3f61..cec2e961d4e3e 100644 --- a/src/compressor/AsyncCompressor.h +++ b/src/compressor/AsyncCompressor.h @@ -25,7 +25,7 @@ class AsyncCompressor { private: - Compressor *compressor; + CompressorRef compressor; CephContext *cct; atomic_t job_id; vector coreids; diff --git a/src/compressor/CMakeLists.txt b/src/compressor/CMakeLists.txt new file mode 100644 index 0000000000000..8b41d2302690c --- /dev/null +++ b/src/compressor/CMakeLists.txt @@ -0,0 +1,10 @@ +## compressor plugins + +set(compressorlibdir ${LIBRARY_OUTPUT_PATH}/compressor) + +add_subdirectory(snappy) + +add_library(compressor_objs OBJECT Compressor.cc) + +add_custom_target(compressor_plugins DEPENDS + ceph_snappy) diff --git a/src/compressor/CompressionPlugin.h b/src/compressor/CompressionPlugin.h new file mode 100644 index 0000000000000..d699d007fd685 --- /dev/null +++ b/src/compressor/CompressionPlugin.h @@ -0,0 +1,45 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph distributed storage system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva + * + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#ifndef COMPRESSION_PLUGIN_H +#define COMPRESSION_PLUGIN_H + +#include "common/Mutex.h" +#include "common/PluginRegistry.h" +#include "Compressor.h" + +namespace ceph { + + class CompressionPlugin : public Plugin { + public: + CompressorRef compressor; + + CompressionPlugin(CephContext *cct) : Plugin(cct), + compressor(0) + {} + + virtual ~CompressionPlugin() {} + + virtual int factory(CompressorRef *cs, + ostream *ss) = 0; + + virtual const char* name() {return "CompressionPlugin";} + }; + +} + +#endif diff --git a/src/compressor/Compressor.cc b/src/compressor/Compressor.cc index 0d11e748d41d7..2ef817c57cbdb 100644 --- a/src/compressor/Compressor.cc +++ b/src/compressor/Compressor.cc @@ -13,13 +13,21 @@ */ #include "Compressor.h" -#include "SnappyCompressor.h" +#include "CompressionPlugin.h" -Compressor* Compressor::create(const string &type) +CompressorRef Compressor::create(CephContext *cct, const string &type) { - if (type == "snappy") - return new SnappyCompressor(); - - assert(0); + CompressorRef cs_impl = NULL; + stringstream ss; + PluginRegistry *reg = cct->get_plugin_registry(); + CompressionPlugin *factory = dynamic_cast(reg->get_with_load("compressor", type)); + if (factory == NULL) { + lderr(cct) << __func__ << " cannot load compressor of type " << type << dendl; + return NULL; + } + int err = factory->factory(&cs_impl, &ss); + if (err) + lderr(cct) << __func__ << " factory return error " << err << dendl; + return cs_impl; } diff --git a/src/compressor/Compressor.h b/src/compressor/Compressor.h index 3eb71aa5c45f8..c891bbef2ede4 100644 --- a/src/compressor/Compressor.h +++ b/src/compressor/Compressor.h @@ -18,13 +18,19 @@ #include "include/int_types.h" #include "include/Context.h" +class Compressor; +typedef shared_ptr CompressorRef; + + class Compressor { public: virtual ~Compressor() {} virtual int compress(bufferlist &in, bufferlist &out) = 0; virtual int decompress(bufferlist &in, bufferlist &out) = 0; - static Compressor *create(const string &type); + static CompressorRef create(CephContext *cct, const string &type); }; + + #endif diff --git a/src/compressor/Makefile.am b/src/compressor/Makefile.am index bd2a2d7d1744f..47a069fd0417b 100644 --- a/src/compressor/Makefile.am +++ b/src/compressor/Makefile.am @@ -1,11 +1,20 @@ +compressorlibdir = $(pkglibdir)/compressor +compressorlib_LTLIBRARIES = + +include compressor/snappy/Makefile.am + libcompressor_la_SOURCES = \ compressor/Compressor.cc \ compressor/AsyncCompressor.cc +compressor/CompressionPlugin.cc: ./ceph_ver.h +libcompressor_la_DEPENDENCIES = $(compressorlib_LTLIBRARIES) +if LINUX +libcompressor_la_LIBADD = -ldl +endif # LINUX noinst_LTLIBRARIES += libcompressor.la -libcompressor_la_LIBADD = $(LIBCOMMON) - noinst_HEADERS += \ compressor/Compressor.h \ compressor/AsyncCompressor.h \ - compressor/SnappyCompressor.h + compressor/CompressionPlugin.h + diff --git a/src/compressor/snappy/CMakeLists.txt b/src/compressor/snappy/CMakeLists.txt new file mode 100644 index 0000000000000..62e8df8be4a55 --- /dev/null +++ b/src/compressor/snappy/CMakeLists.txt @@ -0,0 +1,13 @@ +# snappy + +set(snappy_sources + CompressionPluginSnappy.cc + $ +) + +add_library(ceph_snappy SHARED ${snappy_sources}) +add_dependencies(ceph_snappy ${CMAKE_SOURCE_DIR}/src/ceph_ver.h) +target_link_libraries(ceph_snappy ${EXTRALIBS}) +set_target_properties(ceph_snappy PROPERTIES VERSION 2.14.0 SOVERSION 2) +set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -lsnappy") +install(TARGETS ceph_snappy DESTINATION lib/compressor) diff --git a/src/compressor/snappy/CompressionPluginSnappy.cc b/src/compressor/snappy/CompressionPluginSnappy.cc new file mode 100644 index 0000000000000..2030d56bf251b --- /dev/null +++ b/src/compressor/snappy/CompressionPluginSnappy.cc @@ -0,0 +1,57 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + + +// ----------------------------------------------------------------------------- +#include "ceph_ver.h" +#include "compressor/CompressionPlugin.h" +#include "SnappyCompressor.h" +// ----------------------------------------------------------------------------- + +class CompressionPluginSnappy : public CompressionPlugin { + +public: + + CompressionPluginSnappy(CephContext* cct) : CompressionPlugin(cct) + {} + + virtual int factory(CompressorRef *cs, + ostream *ss) + { + if (compressor == 0) { + SnappyCompressor *interface = new SnappyCompressor(); + compressor = CompressorRef(interface); + } + *cs = compressor; + return 0; + } +}; + +// ----------------------------------------------------------------------------- + +const char *__ceph_plugin_version() +{ + return CEPH_GIT_NICE_VER; +} + +// ----------------------------------------------------------------------------- + +int __ceph_plugin_init(CephContext *cct, + const std::string& type, + const std::string& name) +{ + PluginRegistry *instance = cct->get_plugin_registry(); + + return instance->add(type, name, new CompressionPluginSnappy(cct)); +} diff --git a/src/compressor/snappy/Makefile.am b/src/compressor/snappy/Makefile.am new file mode 100644 index 0000000000000..49d3db0532980 --- /dev/null +++ b/src/compressor/snappy/Makefile.am @@ -0,0 +1,22 @@ +# snappy plugin +noinst_HEADERS += \ + compressor/snappy/SnappyCompressor.h + +snappy_sources = \ + compressor/Compressor.cc \ + compressor/snappy/CompressionPluginSnappy.cc + +compressor/snappy/CompressionPluginSnappy.cc: ./ceph_ver.h + +libceph_snappy_la_SOURCES = ${snappy_sources} +libceph_snappy_la_CFLAGS = ${AM_CFLAGS} \ + -I$(srcdir)/compressor/snappy/snappy-1.2.8 +libceph_snappy_la_CXXFLAGS= ${AM_CXXFLAGS} \ + -I$(srcdir)/compressor/snappy/snappy-1.2.8 +libceph_snappy_la_LIBADD = $(LIBCRUSH) $(PTHREAD_LIBS) $(EXTRALIBS) +libceph_snappy_la_LDFLAGS = ${AM_LDFLAGS} -lsnappy -version-info 2:0:0 +if LINUX +libceph_snappy_la_LDFLAGS += -export-symbols-regex '.*__compressor_.*' +endif + +compressorlib_LTLIBRARIES += libceph_snappy.la diff --git a/src/compressor/SnappyCompressor.h b/src/compressor/snappy/SnappyCompressor.h similarity index 95% rename from src/compressor/SnappyCompressor.h rename to src/compressor/snappy/SnappyCompressor.h index ba58b4624ee79..40ec4509a3ac2 100644 --- a/src/compressor/SnappyCompressor.h +++ b/src/compressor/snappy/SnappyCompressor.h @@ -18,7 +18,7 @@ #include #include #include "include/buffer.h" -#include "Compressor.h" +#include "compressor/Compressor.h" class BufferlistSource : public snappy::Source { list::const_iterator pb; @@ -52,6 +52,7 @@ class BufferlistSource : public snappy::Source { class SnappyCompressor : public Compressor { public: virtual ~SnappyCompressor() {} + virtual const char* get_method_name() { return "snappy"; } virtual int compress(bufferlist &src, bufferlist &dst) { BufferlistSource source(src); bufferptr ptr(snappy::MaxCompressedLength(src.length())); diff --git a/src/vstart.sh b/src/vstart.sh index bfbfc39bb8df3..e5154792cff8a 100755 --- a/src/vstart.sh +++ b/src/vstart.sh @@ -33,17 +33,31 @@ if [ -e CMakeCache.txt ]; then ln -sf ../${file} ec_plugins/`basename $file` done [ -z "$EC_PATH" ] && EC_PATH=./ec_plugins + # check for compression plugins + mkdir -p .libs/compressor + for file in ./src/compressor/*/libcs_*.so*; + do + ln -sf ../${file} .libs/compressor/`basename $file` + done +else + mkdir -p .libs/compressor + for f in `ls -d compressor/*/`; + do + cp .libs/libceph_`basename $f`.so* .libs/compressor/; + done fi if [ -z "$CEPH_BUILD_ROOT" ]; then [ -z "$CEPH_BIN" ] && CEPH_BIN=. [ -z "$CEPH_LIB" ] && CEPH_LIB=.libs [ -z $EC_PATH ] && EC_PATH=$CEPH_LIB + [ -z $CS_PATH ] && CS_PATH=$CEPH_LIB [ -z $OBJCLASS_PATH ] && OBJCLASS_PATH=$CEPH_LIB else [ -z $CEPH_BIN ] && CEPH_BIN=$CEPH_BUILD_ROOT/bin [ -z $CEPH_LIB ] && CEPH_LIB=$CEPH_BUILD_ROOT/lib [ -z $EC_PATH ] && EC_PATH=$CEPH_LIB/erasure-code + [ -z $CS_PATH ] && CS_PATH=$CEPH_LIB/compressor [ -z $OBJCLASS_PATH ] && OBJCLASS_PATH=$CEPH_LIB/rados-classes fi @@ -433,6 +447,7 @@ if [ "$start_mon" -eq 1 ]; then mon data avail warn = 10 mon data avail crit = 1 erasure code dir = $EC_PATH + plugin dir = $CS_PATH osd pool default erasure code profile = plugin=jerasure technique=reed_sol_van k=2 m=1 ruleset-failure-domain=osd rgw frontends = fastcgi, civetweb port=$CEPH_RGW_PORT rgw dns name = localhost -- 2.39.5