]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add compression to RGWPutObj
authorVed-vampir <akiselyova@mirantis.com>
Tue, 16 Feb 2016 14:57:06 +0000 (17:57 +0300)
committerAdam Kupczyk <akupczyk@mirantis.com>
Wed, 2 Nov 2016 10:31:09 +0000 (11:31 +0100)
Signed-off-by: Alyona Kiseleva <akiselyova@mirantis.com>
src/common/config_opts.h
src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest.cc

index 2cf5080d91444d5bf3ea2ccb2fb6a55fb42f23bf..33486b51c61bfc6a124b34ea1bf5631fe2a12280 100644 (file)
@@ -1524,6 +1524,8 @@ OPTION(mon_mgr_beacon_grace, OPT_INT, 30)  // How long to wait to failover
 OPTION(rgw_list_bucket_min_readahead, OPT_INT, 1000) // minimum number of entries to read from rados for bucket listing
 
 OPTION(rgw_rest_getusage_op_compat, OPT_BOOL, false) // dump description of total stats for s3 GetUsage API
+OPTION(rgw_compression_enabled, OPT_BOOL, true) // to use compression on rgw level
+OPTION(rgw_compression_type, OPT_STR, "zlib") // type of compressor
 
 OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter
 OPTION(throttler_perf_counter, OPT_BOOL, true) // enable/disable throttler perf counter
index b166e08f93a1cefcdbb41a7b55f1ed98052d3301..17409aaa06427a1cb87e8842007abef84f77d7e3 100644 (file)
@@ -2881,6 +2881,10 @@ void RGWPutObj::execute()
       len = data.length();
     }
 
+    if (need_calc_md5) {
+      hash.Update((const byte *)data.c_str(), data.length());
+    }
+
     /* save data for producing torrent data */
     torrent.save_data(data_in);
 
@@ -2977,9 +2981,6 @@ void RGWPutObj::execute()
     goto done;
   }
 
-  if (need_calc_md5) {
-    processor->complete_hash(&hash);
-  }
   hash.Final(m);
 
   buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
index 4b42cdcbdadfe175b012c3b0a4583fd76b6f878e..8744894ac34f7853bcb9c4e92fcc7c4acb96c6cc 100644 (file)
@@ -70,6 +70,8 @@ using namespace librados;
 #include "rgw_data_sync.h"
 #include "rgw_realm_watcher.h"
 
+#include "compressor/Compressor.h"
+
 #define dout_subsys ceph_subsys_rgw
 
 using namespace std;
@@ -2364,38 +2366,66 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash,
   *again = false;
 
   *phandle = NULL;
+
+  bufferlist in_bl;
+
+  // compression stuff
+  if ((ofs > 0 && compressed) ||                                // if previous part was compressed
+      (ofs == 0 && store->ctx()->_conf->rgw_compression_enabled)) {   // or it's the first part and flag is set
+    ldout(store->ctx(), 10) << "Compression for rgw is enabled, compress part" << dendl;
+    CompressorRef compressor = Compressor::create(store->ctx(), store->ctx()->_conf->rgw_compression_type);
+    if (!compressor.get()) {
+      // if compressor isn't available - just do not use it with log warning?
+      ldout(store->ctx(), 5) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type 
+                       << "for rgw, check rgw_compression_type config option" << dendl;
+      compressed = false;
+    } else {
+      bufferlist out;
+      int cr = compressor->compress(bl, out);
+      if (cr != 0) {
+        ldout(store->ctx(), 5) << "Compression failed with exit code " << cr << dendl;
+        compressed = false;
+      } else {
+        compressed = true;
+        in_bl = out;
+      }
+    }
+  } else {
+    compressed = false;
+    in_bl = bl;
+  }
+  // end of compression stuff
+
+
   if (extra_data_len) {
-    size_t extra_len = bl.length();
+    size_t extra_len = in_bl.length();
     if (extra_len > extra_data_len)
       extra_len = extra_data_len;
 
     bufferlist extra;
-    bl.splice(0, extra_len, &extra);
+    in_bl.splice(0, extra_len, &extra);
     extra_data_bl.append(extra);
 
     extra_data_len -= extra_len;
-    if (bl.length() == 0) {
+    if (in_bl.length() == 0) {
       return 0;
     }
   }
 
   uint64_t max_write_size = MIN(max_chunk_size, (uint64_t)next_part_ofs - data_ofs);
 
-  pending_data_bl.claim_append(bl);
+  pending_data_bl.claim_append(in_bl);
   if (pending_data_bl.length() < max_write_size)
     return 0;
 
-  pending_data_bl.splice(0, max_write_size, &bl);
+  pending_data_bl.splice(0, max_write_size, &in_bl);
 
   /* do we have enough data pending accumulated that needs to be written? */
   *again = (pending_data_bl.length() >= max_chunk_size);
 
   if (!data_ofs && !immutable_head()) {
-    first_chunk.claim(bl);
+    first_chunk.claim(in_bl);
     obj_len = (uint64_t)first_chunk.length();
-    if (hash) {
-      hash->Update((const byte *)first_chunk.c_str(), obj_len);
-    }
     int r = prepare_next_part(obj_len);
     if (r < 0) {
       return r;
@@ -2404,16 +2434,13 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash,
     return 0;
   }
   off_t write_ofs = data_ofs;
-  data_ofs = write_ofs + bl.length();
+  data_ofs = write_ofs + in_bl.length();
   bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
                                                         we could be racing with another upload, to the same
                                                         object and cleanup can be messy */
-  int ret = write_data(bl, write_ofs, phandle, pobj, exclusive);
+  int ret = write_data(in_bl, write_ofs, phandle, pobj, exclusive);
   if (ret >= 0) { /* we might return, need to clear bl as it was already sent */
-    if (hash) {
-      hash->Update((const byte *)bl.c_str(), bl.length());
-    }
-    bl.clear();
+    in_bl.clear();
   }
   return ret;
 }
index 463118c63fe5727dcecd994649b7cd7efad7474c..617c28ec27ae9051ff046d10aac9099c56272a88 100644 (file)
@@ -3240,6 +3240,7 @@ protected:
   RGWRados *store;
   RGWObjectCtx& obj_ctx;
   bool is_complete;
+  bool compressed;
   RGWBucketInfo bucket_info;
   bool canceled;
 
@@ -3248,7 +3249,7 @@ protected:
                           const char *if_match = NULL, const char *if_nomatch = NULL) = 0;
 
 public:
-  RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), bucket_info(_bi), canceled(false) {}
+  RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), compressed(false), bucket_info(_bi), canceled(false) {}
   virtual ~RGWPutObjProcessor() {}
   virtual int prepare(RGWRados *_store, string *oid_rand) {
     store = _store;
index 860aed7ded086cca152954261760c8d70e9c6365..b02fde2b896cdd216ad987b137c07e89a99b401e 100644 (file)
@@ -23,6 +23,8 @@
 #include "rgw_client_io.h"
 #include "rgw_resolve.h"
 
+#include "compressor/Compressor.h"
+
 #include <numeric>
 
 #define dout_subsys ceph_subsys_rgw