]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw_file: enable compression filter (stores)
authorMatt Benjamin <mbenjamin@redhat.com>
Thu, 4 May 2017 19:47:39 +0000 (15:47 -0400)
committerMatt Benjamin <mbenjamin@redhat.com>
Tue, 11 Jul 2017 09:14:30 +0000 (05:14 -0400)
Previously NFS clients could access compressed data, but the
RGWWriteDataRequest lacked glue to attach an inbound
compression filter.

Fixes: http://tracker.ceph.com/issues/20462
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/rgw/rgw_file.cc
src/rgw/rgw_file.h

index ca8a5c4b4b879af7e85b6252691382cea0881232..89d53f7dfb92e2eae7a4ae1ed9f5585de232c01d 100644 (file)
@@ -1286,6 +1286,27 @@ namespace rgw {
     processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
                                 &multipart);
     op_ret = processor->prepare(get_store(), NULL);
+    if (op_ret < 0) {
+      ldout(s->cct, 20) << "processor->prepare() returned ret=" << op_ret
+                       << dendl;
+      goto done;
+    }
+
+    filter = processor;
+
+    if (compression_type == "none")
+      compression_type = get_store()->get_zone_params().get_compression_type(
+       s->bucket_info.placement_rule);
+    if (compression_type != "none") {
+      plugin = Compressor::create(s->cct, compression_type);
+    if (! plugin) {
+      ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type "
+                       << compression_type << dendl;
+    } else {
+      compressor.emplace(s->cct, plugin, filter);
+      filter = &*compressor;
+    }
+  }
 
   done:
     return op_ret;
@@ -1313,8 +1334,7 @@ namespace rgw {
       orig_data = data;
     }
     hash.Update((const byte *)data.c_str(), data.length());
-    op_ret = put_data_and_throttle(processor, data, ofs,
-                                  need_to_wait);
+    op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait);
     if (op_ret < 0) {
       if (!need_to_wait || op_ret != -EEXIST) {
        ldout(s->cct, 20) << "processor->thottle_data() returned ret="
@@ -1331,6 +1351,7 @@ namespace rgw {
       dispose_processor(processor);
       processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
                                   &multipart);
+      filter = processor;
 
       string oid_rand;
       char buf[33];
@@ -1344,6 +1365,12 @@ namespace rgw {
        goto done;
       }
 
+      /* restore compression filter, if any */
+      if (compressor) {
+       compressor.emplace(s->cct, plugin, filter);
+       filter = &*compressor;
+      }
+
       op_ret = put_data_and_throttle(processor, data, ofs, false);
       if (op_ret < 0) {
        goto done;
@@ -1384,6 +1411,20 @@ namespace rgw {
 
     hash.Final(m);
 
+    if (compressor && compressor->is_compressed()) {
+      bufferlist tmp;
+      RGWCompressionInfo cs_info;
+      cs_info.compression_type = plugin->get_type_name();
+      cs_info.orig_size = s->obj_size;
+      cs_info.blocks = std::move(compressor->get_compression_blocks());
+      ::encode(cs_info, tmp);
+      attrs[RGW_ATTR_COMPRESSION] = tmp;
+      ldout(s->cct, 20) << "storing " << RGW_ATTR_COMPRESSION
+                       << " with type=" << cs_info.compression_type
+                       << ", orig_size=" << cs_info.orig_size
+                       << ", blocks=" << cs_info.blocks.size() << dendl;
+    }
+
     buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
     etag = calc_md5;
 
index b93b592840930771d518461fa5bff4c013b0c04e..9a07d8340bbe1610266f4be2f884b68a987bd0b2 100644 (file)
@@ -34,6 +34,7 @@
 #include "rgw_lib.h"
 #include "rgw_ldap.h"
 #include "rgw_token.h"
+#include "rgw_compression.h"
 
 
 /* XXX
@@ -2186,7 +2187,11 @@ public:
   const std::string& bucket_name;
   const std::string& obj_name;
   RGWFileHandle* rgw_fh;
-  RGWPutObjProcessor *processor;
+  RGWPutObjProcessor* processor;
+  RGWPutObjDataProcessor* filter;
+  boost::optional<RGWPutObj_Compress> compressor;
+  std::string compression_type = "none";
+  CompressorRef plugin;
   buffer::list data;
   uint64_t timer_id;
   MD5 hash;
@@ -2198,8 +2203,8 @@ public:
   RGWWriteRequest(CephContext* _cct, RGWUserInfo *_user, RGWFileHandle* _fh,
                  const std::string& _bname, const std::string& _oname)
     : RGWLibContinuedReq(_cct, _user), bucket_name(_bname), obj_name(_oname),
-      rgw_fh(_fh), processor(nullptr), real_ofs(0), bytes_written(0),
-      multipart(false), eio(false) {
+      rgw_fh(_fh), processor(nullptr), filter(nullptr), real_ofs(0),
+      bytes_written(0), multipart(false), eio(false) {
 
     int ret = header_init();
     if (ret == 0) {