LIBAUTH = libauth.la
LIBMSG = libmsg.la
LIBCRUSH = libcrush.la
-LIBCOMPRESSOR = libcompressor.la -lsnappy
+LIBCOMPRESSOR = libcompressor.la
LIBJSON_SPIRIT = libjson_spirit.la
LIBKV = libkv.a
LIBLOG = liblog.la
include os/Makefile.am
include osd/Makefile.am
include erasure-code/Makefile.am
+include compressor/Makefile.am
include osdc/Makefile.am
include client/Makefile.am
include global/Makefile.am
include test/Makefile.am
include tools/Makefile.am
include Makefile-rocksdb.am
-include compressor/Makefile.am
include tracing/Makefile.am
include pybind/Makefile.am
# important; libmsg before libauth!
LIBCOMMON_DEPS += \
$(LIBERASURE_CODE) \
+ $(LIBCOMPRESSOR) \
$(LIBMSG) $(LIBAUTH) \
$(LIBCRUSH) $(LIBJSON_SPIRIT) $(LIBLOG) $(LIBARCH) \
$(BOOST_RANDOM_LIBS)
OPTION(restapi_base_url, OPT_STR, "") // "
OPTION(fatal_signal_handlers, OPT_BOOL, true)
OPTION(erasure_code_dir, OPT_STR, CEPH_PKGLIBDIR"/erasure-code") // default location for erasure-code plugins
+OPTION(compression_dir, OPT_STR, CEPH_PKGLIBDIR"/compressor") // default location for compression plugins
OPTION(log_file, OPT_STR, "/var/log/ceph/$cluster-$name.log") // default changed by common_preinit()
OPTION(log_max_new, OPT_INT, 1000) // default changed by common_preinit()
OPTION(osd_pool_default_min_size, OPT_INT, 0) // 0 means no specific default; ceph will use size-size/2
OPTION(osd_pool_default_pg_num, OPT_INT, 8) // number of PGs for new pools. Configure in global or mon section of ceph.conf
OPTION(osd_pool_default_pgp_num, OPT_INT, 8) // number of PGs for placement purposes. Should be equal to pg_num
+OPTION(osd_compression_plugins, OPT_STR,
+ "snappy"
+ ) // list of compression plugins
OPTION(osd_pool_default_erasure_code_profile,
OPT_STR,
"plugin=jerasure "
#define dout_prefix *_dout << "compressor "
AsyncCompressor::AsyncCompressor(CephContext *c):
- compressor(Compressor::create(c->_conf->async_compressor_type)), cct(c),
+ compressor(Compressor::create(c, c->_conf->async_compressor_type)), cct(c),
job_id(0),
- compress_tp(g_ceph_context, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"),
+ compress_tp(cct, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"),
job_lock("AsyncCompressor::job_lock"),
compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) {
}
class AsyncCompressor {
private:
- Compressor *compressor;
+ CompressorRef compressor;
CephContext *cct;
atomic_t job_id;
vector<int> coreids;
--- /dev/null
+## compressor plugins
+
+set(compressorlibdir ${LIBRARY_OUTPUT_PATH}/compressor)
+
+add_subdirectory(snappy)
+
+add_library(compressor_objs OBJECT Compressor.cc)
+
+add_custom_target(compressor_plugins DEPENDS
+ ceph_snappy)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph distributed storage system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#ifndef COMPRESSION_PLUGIN_H
+#define COMPRESSION_PLUGIN_H
+
+#include "common/Mutex.h"
+#include "common/PluginRegistry.h"
+#include "Compressor.h"
+
+namespace ceph {
+
+ class CompressionPlugin : public Plugin {
+ public:
+ CompressorRef compressor;
+
+ CompressionPlugin(CephContext *cct) : Plugin(cct),
+ compressor(0)
+ {}
+
+ virtual ~CompressionPlugin() {}
+
+ virtual int factory(CompressorRef *cs,
+ ostream *ss) = 0;
+
+ virtual const char* name() {return "CompressionPlugin";}
+ };
+
+}
+
+#endif
*/
#include "Compressor.h"
-#include "SnappyCompressor.h"
+#include "CompressionPlugin.h"
-Compressor* Compressor::create(const string &type)
+CompressorRef Compressor::create(CephContext *cct, const string &type)
{
- if (type == "snappy")
- return new SnappyCompressor();
-
- assert(0);
+ CompressorRef cs_impl = NULL;
+ stringstream ss;
+ PluginRegistry *reg = cct->get_plugin_registry();
+ CompressionPlugin *factory = dynamic_cast<CompressionPlugin*>(reg->get_with_load("compressor", type));
+ if (factory == NULL) {
+ lderr(cct) << __func__ << " cannot load compressor of type " << type << dendl;
+ return NULL;
+ }
+ int err = factory->factory(&cs_impl, &ss);
+ if (err)
+ lderr(cct) << __func__ << " factory return error " << err << dendl;
+ return cs_impl;
}
#include "include/int_types.h"
#include "include/Context.h"
+class Compressor;
+typedef shared_ptr<Compressor> CompressorRef;
+
+
class Compressor {
public:
virtual ~Compressor() {}
virtual int compress(bufferlist &in, bufferlist &out) = 0;
virtual int decompress(bufferlist &in, bufferlist &out) = 0;
- static Compressor *create(const string &type);
+ static CompressorRef create(CephContext *cct, const string &type);
};
+
+
#endif
+compressorlibdir = $(pkglibdir)/compressor
+compressorlib_LTLIBRARIES =
+
+include compressor/snappy/Makefile.am
+
libcompressor_la_SOURCES = \
compressor/Compressor.cc \
compressor/AsyncCompressor.cc
+compressor/CompressionPlugin.cc: ./ceph_ver.h
+libcompressor_la_DEPENDENCIES = $(compressorlib_LTLIBRARIES)
+if LINUX
+libcompressor_la_LIBADD = -ldl
+endif # LINUX
noinst_LTLIBRARIES += libcompressor.la
-libcompressor_la_LIBADD = $(LIBCOMMON)
-
noinst_HEADERS += \
compressor/Compressor.h \
compressor/AsyncCompressor.h \
- compressor/SnappyCompressor.h
+ compressor/CompressionPlugin.h
+
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef CEPH_SNAPPYCOMPRESSOR_H
-#define CEPH_SNAPPYCOMPRESSOR_H
-
-#include <snappy.h>
-#include <snappy-sinksource.h>
-#include "include/buffer.h"
-#include "Compressor.h"
-
-class BufferlistSource : public snappy::Source {
- list<bufferptr>::const_iterator pb;
- size_t pb_off;
- size_t left;
-
- public:
- BufferlistSource(bufferlist &data): pb(data.buffers().begin()), pb_off(0), left(data.length()) {}
- virtual ~BufferlistSource() {}
- virtual size_t Available() const { return left; }
- virtual const char* Peek(size_t* len) {
- if (left) {
- *len = pb->length() - pb_off;
- return pb->c_str() + pb_off;
- } else {
- *len = 0;
- return NULL;
- }
- }
- virtual void Skip(size_t n) {
- if (n + pb_off == pb->length()) {
- ++pb;
- pb_off = 0;
- } else {
- pb_off += n;
- }
- left -= n;
- }
-};
-
-class SnappyCompressor : public Compressor {
- public:
- virtual ~SnappyCompressor() {}
- virtual int compress(bufferlist &src, bufferlist &dst) {
- BufferlistSource source(src);
- bufferptr ptr(snappy::MaxCompressedLength(src.length()));
- snappy::UncheckedByteArraySink sink(ptr.c_str());
- snappy::Compress(&source, &sink);
- dst.append(ptr, 0, sink.CurrentDestination()-ptr.c_str());
- return 0;
- }
- virtual int decompress(bufferlist &src, bufferlist &dst) {
- BufferlistSource source(src);
- size_t res_len = 0;
- // Trick, decompress only need first 32bits buffer
- if (!snappy::GetUncompressedLength(src.get_contiguous(0, 8), 8, &res_len))
- return -1;
- bufferptr ptr(res_len);
- if (snappy::RawUncompress(&source, ptr.c_str())) {
- dst.append(ptr);
- return 0;
- }
- return -1;
- }
-};
-
-#endif
--- /dev/null
+# snappy
+
+set(snappy_sources
+ CompressionPluginSnappy.cc
+ $<TARGET_OBJECTS:compressor_objs>
+)
+
+add_library(ceph_snappy SHARED ${snappy_sources})
+add_dependencies(ceph_snappy ${CMAKE_SOURCE_DIR}/src/ceph_ver.h)
+target_link_libraries(ceph_snappy ${EXTRALIBS})
+set_target_properties(ceph_snappy PROPERTIES VERSION 2.14.0 SOVERSION 2)
+set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -lsnappy")
+install(TARGETS ceph_snappy DESTINATION lib/compressor)
--- /dev/null
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+
+// -----------------------------------------------------------------------------
+#include "ceph_ver.h"
+#include "compressor/CompressionPlugin.h"
+#include "SnappyCompressor.h"
+// -----------------------------------------------------------------------------
+
+class CompressionPluginSnappy : public CompressionPlugin {
+
+public:
+
+ CompressionPluginSnappy(CephContext* cct) : CompressionPlugin(cct)
+ {}
+
+ virtual int factory(CompressorRef *cs,
+ ostream *ss)
+ {
+ if (compressor == 0) {
+ SnappyCompressor *interface = new SnappyCompressor();
+ compressor = CompressorRef(interface);
+ }
+ *cs = compressor;
+ return 0;
+ }
+};
+
+// -----------------------------------------------------------------------------
+
+const char *__ceph_plugin_version()
+{
+ return CEPH_GIT_NICE_VER;
+}
+
+// -----------------------------------------------------------------------------
+
+int __ceph_plugin_init(CephContext *cct,
+ const std::string& type,
+ const std::string& name)
+{
+ PluginRegistry *instance = cct->get_plugin_registry();
+
+ return instance->add(type, name, new CompressionPluginSnappy(cct));
+}
--- /dev/null
+# snappy plugin
+noinst_HEADERS += \
+ compressor/snappy/SnappyCompressor.h
+
+snappy_sources = \
+ compressor/Compressor.cc \
+ compressor/snappy/CompressionPluginSnappy.cc
+
+compressor/snappy/CompressionPluginSnappy.cc: ./ceph_ver.h
+
+libceph_snappy_la_SOURCES = ${snappy_sources}
+libceph_snappy_la_CFLAGS = ${AM_CFLAGS} \
+ -I$(srcdir)/compressor/snappy/snappy-1.2.8
+libceph_snappy_la_CXXFLAGS= ${AM_CXXFLAGS} \
+ -I$(srcdir)/compressor/snappy/snappy-1.2.8
+libceph_snappy_la_LIBADD = $(LIBCRUSH) $(PTHREAD_LIBS) $(EXTRALIBS)
+libceph_snappy_la_LDFLAGS = ${AM_LDFLAGS} -lsnappy -version-info 2:0:0
+if LINUX
+libceph_snappy_la_LDFLAGS += -export-symbols-regex '.*__compressor_.*'
+endif
+
+compressorlib_LTLIBRARIES += libceph_snappy.la
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_SNAPPYCOMPRESSOR_H
+#define CEPH_SNAPPYCOMPRESSOR_H
+
+#include <snappy.h>
+#include <snappy-sinksource.h>
+#include "include/buffer.h"
+#include "compressor/Compressor.h"
+
+class BufferlistSource : public snappy::Source {
+ list<bufferptr>::const_iterator pb;
+ size_t pb_off;
+ size_t left;
+
+ public:
+ BufferlistSource(bufferlist &data): pb(data.buffers().begin()), pb_off(0), left(data.length()) {}
+ virtual ~BufferlistSource() {}
+ virtual size_t Available() const { return left; }
+ virtual const char* Peek(size_t* len) {
+ if (left) {
+ *len = pb->length() - pb_off;
+ return pb->c_str() + pb_off;
+ } else {
+ *len = 0;
+ return NULL;
+ }
+ }
+ virtual void Skip(size_t n) {
+ if (n + pb_off == pb->length()) {
+ ++pb;
+ pb_off = 0;
+ } else {
+ pb_off += n;
+ }
+ left -= n;
+ }
+};
+
+class SnappyCompressor : public Compressor {
+ public:
+ virtual ~SnappyCompressor() {}
+ virtual const char* get_method_name() { return "snappy"; }
+ virtual int compress(bufferlist &src, bufferlist &dst) {
+ BufferlistSource source(src);
+ bufferptr ptr(snappy::MaxCompressedLength(src.length()));
+ snappy::UncheckedByteArraySink sink(ptr.c_str());
+ snappy::Compress(&source, &sink);
+ dst.append(ptr, 0, sink.CurrentDestination()-ptr.c_str());
+ return 0;
+ }
+ virtual int decompress(bufferlist &src, bufferlist &dst) {
+ BufferlistSource source(src);
+ size_t res_len = 0;
+ // Trick, decompress only need first 32bits buffer
+ if (!snappy::GetUncompressedLength(src.get_contiguous(0, 8), 8, &res_len))
+ return -1;
+ bufferptr ptr(res_len);
+ if (snappy::RawUncompress(&source, ptr.c_str())) {
+ dst.append(ptr);
+ return 0;
+ }
+ return -1;
+ }
+};
+
+#endif
ln -sf ../${file} ec_plugins/`basename $file`
done
[ -z "$EC_PATH" ] && EC_PATH=./ec_plugins
+ # check for compression plugins
+ mkdir -p .libs/compressor
+ for file in ./src/compressor/*/libcs_*.so*;
+ do
+ ln -sf ../${file} .libs/compressor/`basename $file`
+ done
+else
+ mkdir -p .libs/compressor
+ for f in `ls -d compressor/*/`;
+ do
+ cp .libs/libceph_`basename $f`.so* .libs/compressor/;
+ done
fi
if [ -z "$CEPH_BUILD_ROOT" ]; then
[ -z "$CEPH_BIN" ] && CEPH_BIN=.
[ -z "$CEPH_LIB" ] && CEPH_LIB=.libs
[ -z $EC_PATH ] && EC_PATH=$CEPH_LIB
+ [ -z $CS_PATH ] && CS_PATH=$CEPH_LIB
[ -z $OBJCLASS_PATH ] && OBJCLASS_PATH=$CEPH_LIB
else
[ -z $CEPH_BIN ] && CEPH_BIN=$CEPH_BUILD_ROOT/bin
[ -z $CEPH_LIB ] && CEPH_LIB=$CEPH_BUILD_ROOT/lib
[ -z $EC_PATH ] && EC_PATH=$CEPH_LIB/erasure-code
+ [ -z $CS_PATH ] && CS_PATH=$CEPH_LIB/compressor
[ -z $OBJCLASS_PATH ] && OBJCLASS_PATH=$CEPH_LIB/rados-classes
fi
mon data avail warn = 10
mon data avail crit = 1
erasure code dir = $EC_PATH
+ plugin dir = $CS_PATH
osd pool default erasure code profile = plugin=jerasure technique=reed_sol_van k=2 m=1 ruleset-failure-domain=osd
rgw frontends = fastcgi, civetweb port=$CEPH_RGW_PORT
rgw dns name = localhost