From: Ved-vampir Date: Tue, 10 Nov 2015 15:20:02 +0000 (+0300) Subject: Compressor: compressor code extention: plugin system added X-Git-Tag: v10.0.3~37^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2cadc416ca64ec05cd3496f5862f46e9dacb258b;p=ceph.git Compressor: compressor code extention: plugin system added Signed-off-by: Alyona Kiseleva --- diff --git a/src/Makefile-env.am b/src/Makefile-env.am index 2501b9b640e0..7dd39673228c 100644 --- a/src/Makefile-env.am +++ b/src/Makefile-env.am @@ -201,7 +201,7 @@ LIBPERFGLUE = libperfglue.la LIBAUTH = libauth.la LIBMSG = libmsg.la LIBCRUSH = libcrush.la -LIBCOMPRESSOR = libcompressor.la -lsnappy +LIBCOMPRESSOR = libcompressor.la LIBJSON_SPIRIT = libjson_spirit.la LIBKV = libkv.a LIBLOG = liblog.la diff --git a/src/Makefile.am b/src/Makefile.am index cd24915322b9..1b651c157fe0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -25,6 +25,7 @@ include mds/Makefile.am include os/Makefile.am include osd/Makefile.am include erasure-code/Makefile.am +include compressor/Makefile.am include osdc/Makefile.am include client/Makefile.am include global/Makefile.am @@ -46,7 +47,6 @@ include rbd_replay/Makefile.am include test/Makefile.am include tools/Makefile.am include Makefile-rocksdb.am -include compressor/Makefile.am include tracing/Makefile.am include pybind/Makefile.am diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 4dfe988a57fb..5ed2d3b049f2 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -157,6 +157,7 @@ noinst_HEADERS += \ # important; libmsg before libauth! LIBCOMMON_DEPS += \ $(LIBERASURE_CODE) \ + $(LIBCOMPRESSOR) \ $(LIBMSG) $(LIBAUTH) \ $(LIBCRUSH) $(LIBJSON_SPIRIT) $(LIBLOG) $(LIBARCH) \ $(BOOST_RANDOM_LIBS) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 66ff3e38fb5f..8542162fb09b 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -39,6 +39,7 @@ OPTION(restapi_log_level, OPT_STR, "") // default set by Python code OPTION(restapi_base_url, OPT_STR, "") // " OPTION(fatal_signal_handlers, OPT_BOOL, true) OPTION(erasure_code_dir, OPT_STR, CEPH_PKGLIBDIR"/erasure-code") // default location for erasure-code plugins +OPTION(compression_dir, OPT_STR, CEPH_PKGLIBDIR"/compressor") // default location for compression plugins OPTION(log_file, OPT_STR, "/var/log/ceph/$cluster-$name.log") // default changed by common_preinit() OPTION(log_max_new, OPT_INT, 1000) // default changed by common_preinit() @@ -575,6 +576,9 @@ OPTION(osd_pool_default_size, OPT_INT, 3) OPTION(osd_pool_default_min_size, OPT_INT, 0) // 0 means no specific default; ceph will use size-size/2 OPTION(osd_pool_default_pg_num, OPT_INT, 8) // number of PGs for new pools. Configure in global or mon section of ceph.conf OPTION(osd_pool_default_pgp_num, OPT_INT, 8) // number of PGs for placement purposes. Should be equal to pg_num +OPTION(osd_compression_plugins, OPT_STR, + "snappy" + ) // list of compression plugins OPTION(osd_pool_default_erasure_code_profile, OPT_STR, "plugin=jerasure " diff --git a/src/compressor/AsyncCompressor.cc b/src/compressor/AsyncCompressor.cc index cdd666701e2a..7a9071a7189a 100644 --- a/src/compressor/AsyncCompressor.cc +++ b/src/compressor/AsyncCompressor.cc @@ -21,9 +21,9 @@ #define dout_prefix *_dout << "compressor " AsyncCompressor::AsyncCompressor(CephContext *c): - compressor(Compressor::create(c->_conf->async_compressor_type)), cct(c), + compressor(Compressor::create(c, c->_conf->async_compressor_type)), cct(c), job_id(0), - compress_tp(g_ceph_context, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"), + compress_tp(cct, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"), job_lock("AsyncCompressor::job_lock"), compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) { } diff --git a/src/compressor/AsyncCompressor.h b/src/compressor/AsyncCompressor.h index 15af92b4e3f6..cec2e961d4e3 100644 --- a/src/compressor/AsyncCompressor.h +++ b/src/compressor/AsyncCompressor.h @@ -25,7 +25,7 @@ class AsyncCompressor { private: - Compressor *compressor; + CompressorRef compressor; CephContext *cct; atomic_t job_id; vector coreids; diff --git a/src/compressor/CMakeLists.txt b/src/compressor/CMakeLists.txt new file mode 100644 index 000000000000..8b41d2302690 --- /dev/null +++ b/src/compressor/CMakeLists.txt @@ -0,0 +1,10 @@ +## compressor plugins + +set(compressorlibdir ${LIBRARY_OUTPUT_PATH}/compressor) + +add_subdirectory(snappy) + +add_library(compressor_objs OBJECT Compressor.cc) + +add_custom_target(compressor_plugins DEPENDS + ceph_snappy) diff --git a/src/compressor/CompressionPlugin.h b/src/compressor/CompressionPlugin.h new file mode 100644 index 000000000000..d699d007fd68 --- /dev/null +++ b/src/compressor/CompressionPlugin.h @@ -0,0 +1,45 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph distributed storage system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva + * + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#ifndef COMPRESSION_PLUGIN_H +#define COMPRESSION_PLUGIN_H + +#include "common/Mutex.h" +#include "common/PluginRegistry.h" +#include "Compressor.h" + +namespace ceph { + + class CompressionPlugin : public Plugin { + public: + CompressorRef compressor; + + CompressionPlugin(CephContext *cct) : Plugin(cct), + compressor(0) + {} + + virtual ~CompressionPlugin() {} + + virtual int factory(CompressorRef *cs, + ostream *ss) = 0; + + virtual const char* name() {return "CompressionPlugin";} + }; + +} + +#endif diff --git a/src/compressor/Compressor.cc b/src/compressor/Compressor.cc index 0d11e748d41d..2ef817c57cbd 100644 --- a/src/compressor/Compressor.cc +++ b/src/compressor/Compressor.cc @@ -13,13 +13,21 @@ */ #include "Compressor.h" -#include "SnappyCompressor.h" +#include "CompressionPlugin.h" -Compressor* Compressor::create(const string &type) +CompressorRef Compressor::create(CephContext *cct, const string &type) { - if (type == "snappy") - return new SnappyCompressor(); - - assert(0); + CompressorRef cs_impl = NULL; + stringstream ss; + PluginRegistry *reg = cct->get_plugin_registry(); + CompressionPlugin *factory = dynamic_cast(reg->get_with_load("compressor", type)); + if (factory == NULL) { + lderr(cct) << __func__ << " cannot load compressor of type " << type << dendl; + return NULL; + } + int err = factory->factory(&cs_impl, &ss); + if (err) + lderr(cct) << __func__ << " factory return error " << err << dendl; + return cs_impl; } diff --git a/src/compressor/Compressor.h b/src/compressor/Compressor.h index 3eb71aa5c45f..c891bbef2ede 100644 --- a/src/compressor/Compressor.h +++ b/src/compressor/Compressor.h @@ -18,13 +18,19 @@ #include "include/int_types.h" #include "include/Context.h" +class Compressor; +typedef shared_ptr CompressorRef; + + class Compressor { public: virtual ~Compressor() {} virtual int compress(bufferlist &in, bufferlist &out) = 0; virtual int decompress(bufferlist &in, bufferlist &out) = 0; - static Compressor *create(const string &type); + static CompressorRef create(CephContext *cct, const string &type); }; + + #endif diff --git a/src/compressor/Makefile.am b/src/compressor/Makefile.am index bd2a2d7d1744..47a069fd0417 100644 --- a/src/compressor/Makefile.am +++ b/src/compressor/Makefile.am @@ -1,11 +1,20 @@ +compressorlibdir = $(pkglibdir)/compressor +compressorlib_LTLIBRARIES = + +include compressor/snappy/Makefile.am + libcompressor_la_SOURCES = \ compressor/Compressor.cc \ compressor/AsyncCompressor.cc +compressor/CompressionPlugin.cc: ./ceph_ver.h +libcompressor_la_DEPENDENCIES = $(compressorlib_LTLIBRARIES) +if LINUX +libcompressor_la_LIBADD = -ldl +endif # LINUX noinst_LTLIBRARIES += libcompressor.la -libcompressor_la_LIBADD = $(LIBCOMMON) - noinst_HEADERS += \ compressor/Compressor.h \ compressor/AsyncCompressor.h \ - compressor/SnappyCompressor.h + compressor/CompressionPlugin.h + diff --git a/src/compressor/SnappyCompressor.h b/src/compressor/SnappyCompressor.h deleted file mode 100644 index ba58b4624ee7..000000000000 --- a/src/compressor/SnappyCompressor.h +++ /dev/null @@ -1,78 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2015 Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_SNAPPYCOMPRESSOR_H -#define CEPH_SNAPPYCOMPRESSOR_H - -#include -#include -#include "include/buffer.h" -#include "Compressor.h" - -class BufferlistSource : public snappy::Source { - list::const_iterator pb; - size_t pb_off; - size_t left; - - public: - BufferlistSource(bufferlist &data): pb(data.buffers().begin()), pb_off(0), left(data.length()) {} - virtual ~BufferlistSource() {} - virtual size_t Available() const { return left; } - virtual const char* Peek(size_t* len) { - if (left) { - *len = pb->length() - pb_off; - return pb->c_str() + pb_off; - } else { - *len = 0; - return NULL; - } - } - virtual void Skip(size_t n) { - if (n + pb_off == pb->length()) { - ++pb; - pb_off = 0; - } else { - pb_off += n; - } - left -= n; - } -}; - -class SnappyCompressor : public Compressor { - public: - virtual ~SnappyCompressor() {} - virtual int compress(bufferlist &src, bufferlist &dst) { - BufferlistSource source(src); - bufferptr ptr(snappy::MaxCompressedLength(src.length())); - snappy::UncheckedByteArraySink sink(ptr.c_str()); - snappy::Compress(&source, &sink); - dst.append(ptr, 0, sink.CurrentDestination()-ptr.c_str()); - return 0; - } - virtual int decompress(bufferlist &src, bufferlist &dst) { - BufferlistSource source(src); - size_t res_len = 0; - // Trick, decompress only need first 32bits buffer - if (!snappy::GetUncompressedLength(src.get_contiguous(0, 8), 8, &res_len)) - return -1; - bufferptr ptr(res_len); - if (snappy::RawUncompress(&source, ptr.c_str())) { - dst.append(ptr); - return 0; - } - return -1; - } -}; - -#endif diff --git a/src/compressor/snappy/CMakeLists.txt b/src/compressor/snappy/CMakeLists.txt new file mode 100644 index 000000000000..62e8df8be4a5 --- /dev/null +++ b/src/compressor/snappy/CMakeLists.txt @@ -0,0 +1,13 @@ +# snappy + +set(snappy_sources + CompressionPluginSnappy.cc + $ +) + +add_library(ceph_snappy SHARED ${snappy_sources}) +add_dependencies(ceph_snappy ${CMAKE_SOURCE_DIR}/src/ceph_ver.h) +target_link_libraries(ceph_snappy ${EXTRALIBS}) +set_target_properties(ceph_snappy PROPERTIES VERSION 2.14.0 SOVERSION 2) +set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -lsnappy") +install(TARGETS ceph_snappy DESTINATION lib/compressor) diff --git a/src/compressor/snappy/CompressionPluginSnappy.cc b/src/compressor/snappy/CompressionPluginSnappy.cc new file mode 100644 index 000000000000..2030d56bf251 --- /dev/null +++ b/src/compressor/snappy/CompressionPluginSnappy.cc @@ -0,0 +1,57 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + + +// ----------------------------------------------------------------------------- +#include "ceph_ver.h" +#include "compressor/CompressionPlugin.h" +#include "SnappyCompressor.h" +// ----------------------------------------------------------------------------- + +class CompressionPluginSnappy : public CompressionPlugin { + +public: + + CompressionPluginSnappy(CephContext* cct) : CompressionPlugin(cct) + {} + + virtual int factory(CompressorRef *cs, + ostream *ss) + { + if (compressor == 0) { + SnappyCompressor *interface = new SnappyCompressor(); + compressor = CompressorRef(interface); + } + *cs = compressor; + return 0; + } +}; + +// ----------------------------------------------------------------------------- + +const char *__ceph_plugin_version() +{ + return CEPH_GIT_NICE_VER; +} + +// ----------------------------------------------------------------------------- + +int __ceph_plugin_init(CephContext *cct, + const std::string& type, + const std::string& name) +{ + PluginRegistry *instance = cct->get_plugin_registry(); + + return instance->add(type, name, new CompressionPluginSnappy(cct)); +} diff --git a/src/compressor/snappy/Makefile.am b/src/compressor/snappy/Makefile.am new file mode 100644 index 000000000000..49d3db053298 --- /dev/null +++ b/src/compressor/snappy/Makefile.am @@ -0,0 +1,22 @@ +# snappy plugin +noinst_HEADERS += \ + compressor/snappy/SnappyCompressor.h + +snappy_sources = \ + compressor/Compressor.cc \ + compressor/snappy/CompressionPluginSnappy.cc + +compressor/snappy/CompressionPluginSnappy.cc: ./ceph_ver.h + +libceph_snappy_la_SOURCES = ${snappy_sources} +libceph_snappy_la_CFLAGS = ${AM_CFLAGS} \ + -I$(srcdir)/compressor/snappy/snappy-1.2.8 +libceph_snappy_la_CXXFLAGS= ${AM_CXXFLAGS} \ + -I$(srcdir)/compressor/snappy/snappy-1.2.8 +libceph_snappy_la_LIBADD = $(LIBCRUSH) $(PTHREAD_LIBS) $(EXTRALIBS) +libceph_snappy_la_LDFLAGS = ${AM_LDFLAGS} -lsnappy -version-info 2:0:0 +if LINUX +libceph_snappy_la_LDFLAGS += -export-symbols-regex '.*__compressor_.*' +endif + +compressorlib_LTLIBRARIES += libceph_snappy.la diff --git a/src/compressor/snappy/SnappyCompressor.h b/src/compressor/snappy/SnappyCompressor.h new file mode 100644 index 000000000000..40ec4509a3ac --- /dev/null +++ b/src/compressor/snappy/SnappyCompressor.h @@ -0,0 +1,79 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_SNAPPYCOMPRESSOR_H +#define CEPH_SNAPPYCOMPRESSOR_H + +#include +#include +#include "include/buffer.h" +#include "compressor/Compressor.h" + +class BufferlistSource : public snappy::Source { + list::const_iterator pb; + size_t pb_off; + size_t left; + + public: + BufferlistSource(bufferlist &data): pb(data.buffers().begin()), pb_off(0), left(data.length()) {} + virtual ~BufferlistSource() {} + virtual size_t Available() const { return left; } + virtual const char* Peek(size_t* len) { + if (left) { + *len = pb->length() - pb_off; + return pb->c_str() + pb_off; + } else { + *len = 0; + return NULL; + } + } + virtual void Skip(size_t n) { + if (n + pb_off == pb->length()) { + ++pb; + pb_off = 0; + } else { + pb_off += n; + } + left -= n; + } +}; + +class SnappyCompressor : public Compressor { + public: + virtual ~SnappyCompressor() {} + virtual const char* get_method_name() { return "snappy"; } + virtual int compress(bufferlist &src, bufferlist &dst) { + BufferlistSource source(src); + bufferptr ptr(snappy::MaxCompressedLength(src.length())); + snappy::UncheckedByteArraySink sink(ptr.c_str()); + snappy::Compress(&source, &sink); + dst.append(ptr, 0, sink.CurrentDestination()-ptr.c_str()); + return 0; + } + virtual int decompress(bufferlist &src, bufferlist &dst) { + BufferlistSource source(src); + size_t res_len = 0; + // Trick, decompress only need first 32bits buffer + if (!snappy::GetUncompressedLength(src.get_contiguous(0, 8), 8, &res_len)) + return -1; + bufferptr ptr(res_len); + if (snappy::RawUncompress(&source, ptr.c_str())) { + dst.append(ptr); + return 0; + } + return -1; + } +}; + +#endif diff --git a/src/vstart.sh b/src/vstart.sh index bfbfc39bb8df..e5154792cff8 100755 --- a/src/vstart.sh +++ b/src/vstart.sh @@ -33,17 +33,31 @@ if [ -e CMakeCache.txt ]; then ln -sf ../${file} ec_plugins/`basename $file` done [ -z "$EC_PATH" ] && EC_PATH=./ec_plugins + # check for compression plugins + mkdir -p .libs/compressor + for file in ./src/compressor/*/libcs_*.so*; + do + ln -sf ../${file} .libs/compressor/`basename $file` + done +else + mkdir -p .libs/compressor + for f in `ls -d compressor/*/`; + do + cp .libs/libceph_`basename $f`.so* .libs/compressor/; + done fi if [ -z "$CEPH_BUILD_ROOT" ]; then [ -z "$CEPH_BIN" ] && CEPH_BIN=. [ -z "$CEPH_LIB" ] && CEPH_LIB=.libs [ -z $EC_PATH ] && EC_PATH=$CEPH_LIB + [ -z $CS_PATH ] && CS_PATH=$CEPH_LIB [ -z $OBJCLASS_PATH ] && OBJCLASS_PATH=$CEPH_LIB else [ -z $CEPH_BIN ] && CEPH_BIN=$CEPH_BUILD_ROOT/bin [ -z $CEPH_LIB ] && CEPH_LIB=$CEPH_BUILD_ROOT/lib [ -z $EC_PATH ] && EC_PATH=$CEPH_LIB/erasure-code + [ -z $CS_PATH ] && CS_PATH=$CEPH_LIB/compressor [ -z $OBJCLASS_PATH ] && OBJCLASS_PATH=$CEPH_LIB/rados-classes fi @@ -433,6 +447,7 @@ if [ "$start_mon" -eq 1 ]; then mon data avail warn = 10 mon data avail crit = 1 erasure code dir = $EC_PATH + plugin dir = $CS_PATH osd pool default erasure code profile = plugin=jerasure technique=reed_sol_van k=2 m=1 ruleset-failure-domain=osd rgw frontends = fastcgi, civetweb port=$CEPH_RGW_PORT rgw dns name = localhost