]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
compressor/zstd: add zstd compressor 13075/head
authorSage Weil <sage@redhat.com>
Thu, 19 Jan 2017 18:57:33 +0000 (12:57 -0600)
committerKefu Chai <kchai@redhat.com>
Wed, 25 Jan 2017 16:37:23 +0000 (00:37 +0800)
Build/link of zstd itself is maybe not ideal, but it works fine.

Signed-off-by: Sage Weil <sage@redhat.com>
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/compressor/CMakeLists.txt
src/compressor/Compressor.cc
src/compressor/Compressor.h
src/compressor/zstd/CMakeLists.txt [new file with mode: 0644]
src/compressor/zstd/CompressionPluginZstd.cc [new file with mode: 0644]
src/compressor/zstd/ZstdCompressor.h [new file with mode: 0644]
src/test/compressor/test_compression.cc

index 436ea3eb1df30f68ef338fcd740013e367b14dff..c4d3b82c91d8bde1ce3d1b52b9bb63724a516129 100644 (file)
@@ -10,14 +10,16 @@ set(compressor_plugin_dir ${CMAKE_INSTALL_PKGLIBDIR}/compressor)
 
 add_subdirectory(snappy)
 add_subdirectory(zlib)
+add_subdirectory(zstd)
 
 add_custom_target(compressor_plugins DEPENDS
     ceph_snappy
-    ceph_zlib)
+    ceph_zlib
+    ceph_zstd)
 
 if(WITH_EMBEDDED)
   include(MergeStaticLibraries)
   add_library(cephd_compressor_base STATIC ${compressor_srcs})
   set_target_properties(cephd_compressor_base PROPERTIES COMPILE_DEFINITIONS BUILDING_FOR_EMBEDDED)
-  merge_static_libraries(cephd_compressor cephd_compressor_base cephd_compressor_snappy cephd_compressor_zlib)
+  merge_static_libraries(cephd_compressor cephd_compressor_base cephd_compressor_snappy cephd_compressor_zlib cephd_compressor_zstd)
 endif()
index cc6aababb7769853e8fd6b09ccf02f22b5d3f0c5..6b49b788f838fe79ec3400e15dc5cba2b6c8339d 100644 (file)
 
 const char * Compressor::get_comp_alg_name(int a) {
   switch (a) {
-    case COMP_ALG_NONE: return "none";
-    case COMP_ALG_SNAPPY: return "snappy";
-    case COMP_ALG_ZLIB: return "zlib";
-    default: return "???";
+  case COMP_ALG_NONE: return "none";
+  case COMP_ALG_SNAPPY: return "snappy";
+  case COMP_ALG_ZLIB: return "zlib";
+  case COMP_ALG_ZSTD: return "zstd";
+  default: return "???";
   }
 }
 
@@ -31,6 +32,8 @@ boost::optional<Compressor::CompressionAlgorithm> Compressor::get_comp_alg_type(
     return COMP_ALG_SNAPPY;
   if (s == "zlib")
     return COMP_ALG_ZLIB;
+  if (s == "zstd")
+    return COMP_ALG_ZSTD;
   if (s == "")
     return COMP_ALG_NONE;
 
index 89687f2ddc9910de9f3673ea810deda4201ef9f8..8be13043d69db604bb4201d8e09baf368db4299d 100644 (file)
@@ -29,6 +29,7 @@ public:
     COMP_ALG_NONE = 0,
     COMP_ALG_SNAPPY = 1,
     COMP_ALG_ZLIB = 2,
+    COMP_ALG_ZSTD = 3,
     COMP_ALG_LAST      //the last value for range checks
   };
   // compression options
diff --git a/src/compressor/zstd/CMakeLists.txt b/src/compressor/zstd/CMakeLists.txt
new file mode 100644 (file)
index 0000000..0332834
--- /dev/null
@@ -0,0 +1,42 @@
+# zstd
+
+# libzstd - build it statically
+set(ZSTD_C_FLAGS -fPIC -Wno-unused-variable -O3)
+
+include(ExternalProject)
+ExternalProject_Add(zstd_ext
+  SOURCE_DIR ${CMAKE_SOURCE_DIR}/src/zstd/build/cmake
+  CMAKE_ARGS -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
+             -DCMAKE_C_FLAGS=${ZSTD_C_FLAGS}
+  BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/libzstd
+  BUILD_COMMAND $(MAKE) libzstd_static
+  INSTALL_COMMAND "true")
+
+# force zstd make to be called on each time
+ExternalProject_Add_Step(zstd_ext forcebuild
+  DEPENDEES configure
+  DEPENDERS build
+  COMMAND "true"
+  ALWAYS 1)
+
+add_library(zstd STATIC IMPORTED)
+set_property(TARGET zstd PROPERTY
+  IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/libzstd/lib/libzstd.a")
+add_dependencies(zstd zstd_ext)
+set(ZSTD_INCLUDE_DIR ${CMAKE_SOURCE_DIR}/src/zstd/lib)
+
+#
+set(zstd_sources
+  CompressionPluginZstd.cc
+)
+
+add_library(ceph_zstd SHARED ${zstd_sources})
+add_dependencies(ceph_zstd ${CMAKE_SOURCE_DIR}/src/ceph_ver.h)
+target_link_libraries(ceph_zstd zstd)
+set_target_properties(ceph_zstd PROPERTIES VERSION 2.0.0 SOVERSION 2)
+install(TARGETS ceph_zstd DESTINATION ${compressor_plugin_dir})
+
+if(WITH_EMBEDDED)
+  add_library(cephd_compressor_zstd STATIC ${zstd_sources})
+  set_target_properties(cephd_compressor_zstd PROPERTIES COMPILE_DEFINITIONS BUILDING_FOR_EMBEDDED)
+endif()
diff --git a/src/compressor/zstd/CompressionPluginZstd.cc b/src/compressor/zstd/CompressionPluginZstd.cc
new file mode 100644 (file)
index 0000000..2170dbd
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ *  This library is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public
+ *  License as published by the Free Software Foundation; either
+ *  version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#include <ostream>
+
+// -----------------------------------------------------------------------------
+#include "ceph_ver.h"
+#include "compressor/CompressionPlugin.h"
+#include "ZstdCompressor.h"
+// -----------------------------------------------------------------------------
+
+class CompressionPluginZstd : public CompressionPlugin {
+
+public:
+
+  explicit CompressionPluginZstd(CephContext* cct) : CompressionPlugin(cct)
+  {}
+
+  virtual int factory(CompressorRef *cs,
+                      std::ostream *ss)
+  {
+    if (compressor == 0) {
+      ZstdCompressor *interface = new ZstdCompressor();
+      compressor = CompressorRef(interface);
+    }
+    *cs = compressor;
+    return 0;
+  }
+};
+
+// -----------------------------------------------------------------------------
+
+const char *__ceph_plugin_version()
+{
+  return CEPH_GIT_NICE_VER;
+}
+
+// -----------------------------------------------------------------------------
+
+int __ceph_plugin_init(CephContext *cct,
+                       const std::string& type,
+                       const std::string& name)
+{
+  PluginRegistry *instance = cct->get_plugin_registry();
+
+  return instance->add(type, name, new CompressionPluginZstd(cct));
+}
diff --git a/src/compressor/zstd/ZstdCompressor.h b/src/compressor/zstd/ZstdCompressor.h
new file mode 100644 (file)
index 0000000..fd465cd
--- /dev/null
@@ -0,0 +1,99 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_ZSTDCOMPRESSOR_H
+#define CEPH_ZSTDCOMPRESSOR_H
+
+#include "zstd/lib/zstd.h"
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include "compressor/Compressor.h"
+
+#define COMPRESSION_LEVEL 5
+
+class ZstdCompressor : public Compressor {
+ public:
+  ZstdCompressor() : Compressor(COMP_ALG_ZSTD, "zstd") {}
+
+  int compress(const bufferlist &src, bufferlist &dst) override {
+    bufferptr outptr = buffer::create_page_aligned(
+      ZSTD_compressBound(src.length()));
+    ZSTD_outBuffer_s outbuf;
+    outbuf.dst = outptr.c_str();
+    outbuf.size = outptr.length();
+    outbuf.pos = 0;
+
+    ZSTD_CStream *s = ZSTD_createCStream();
+    ZSTD_initCStream(s, COMPRESSION_LEVEL);
+    auto p = src.begin();
+    size_t left = src.length();
+    while (left) {
+      assert(!p.end());
+      struct ZSTD_inBuffer_s inbuf;
+      inbuf.pos = 0;
+      inbuf.size = p.get_ptr_and_advance(left, (const char**)&inbuf.src);
+      ZSTD_compressStream(s, &outbuf, &inbuf);
+      left -= inbuf.size;
+    }
+    assert(p.end());
+    ZSTD_flushStream(s, &outbuf);
+    ZSTD_endStream(s, &outbuf);
+    ZSTD_freeCStream(s);
+
+    // prefix with decompressed length
+    ::encode((uint32_t)src.length(), dst);
+    dst.append(outptr, 0, outbuf.pos);
+    return 0;
+  }
+
+  int decompress(const bufferlist &src, bufferlist &dst) override {
+    bufferlist::iterator i = const_cast<bufferlist&>(src).begin();
+    return decompress(i, src.length(), dst);
+  }
+
+  int decompress(bufferlist::iterator &p,
+                size_t compressed_len,
+                bufferlist &dst) override {
+    if (compressed_len < 4) {
+      return -1;
+    }
+    compressed_len -= 4;
+    uint32_t dst_len;
+    ::decode(dst_len, p);
+
+    bufferptr dstptr(dst_len);
+    ZSTD_outBuffer_s outbuf;
+    outbuf.dst = dstptr.c_str();
+    outbuf.size = dstptr.length();
+    outbuf.pos = 0;
+    ZSTD_DStream *s = ZSTD_createDStream();
+    ZSTD_initDStream(s);
+    while (compressed_len > 0) {
+      if (p.end()) {
+       return -1;
+      }
+      ZSTD_inBuffer_s inbuf;
+      inbuf.pos = 0;
+      inbuf.size = p.get_ptr_and_advance(compressed_len, (const char**)&inbuf.src);
+      ZSTD_decompressStream(s, &outbuf, &inbuf);
+      compressed_len -= inbuf.size;
+    }
+    ZSTD_freeDStream(s);
+
+    dst.append(dstptr, 0, outbuf.pos);
+    return 0;
+  }
+};
+
+#endif
index 1b9782a0467236ff912f144c9509b447a1dd5217..11e68932ad2a20727529a5a0d58ca0517a600d63 100644 (file)
@@ -324,7 +324,8 @@ INSTANTIATE_TEST_CASE_P(
   ::testing::Values(
     "zlib/isal",
     "zlib/noisal",
-    "snappy"));
+    "snappy",
+    "zstd"));
 
 TEST(ZlibCompressor, zlib_isal_compatibility)
 {