]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Added compression handling when syncing.
authorAdam Kupczyk <akupczyk@mirantis.com>
Wed, 14 Sep 2016 15:52:22 +0000 (17:52 +0200)
committerAdam Kupczyk <akupczyk@mirantis.com>
Wed, 2 Nov 2016 10:35:23 +0000 (11:35 +0100)
Signed-off-by: Adam Kupczyk <akupczyk@mirantis.com>
src/rgw/rgw_rados.cc

index 67eaa76cd886531a4e80c4cab0f46b8d5263fbd8..cb39809ad6e7cf6f21e1da0fa66a298fab62faa2 100644 (file)
@@ -43,8 +43,8 @@
 #include "cls/user/cls_user_client.h"
 
 #include "rgw_tools.h"
-
 #include "rgw_coroutine.h"
+#include "rgw_compression.h"
 
 #include "rgw_boost_asio_yield.h"
 #undef fork // fails to compile RGWPeriod::fork() below
@@ -6640,19 +6640,31 @@ bool RGWRados::aio_completed(void *handle)
 
 class RGWRadosPutObj : public RGWGetDataCB
 {
+  CephContext* cct;
   rgw_obj obj;
+  RGWPutObjDataProcessor *filter;
   RGWPutObjProcessor_Atomic *processor;
   RGWOpStateSingleOp *opstate;
   void (*progress_cb)(off_t, void *);
   void *progress_data;
   bufferlist extra_data_bl;
   uint64_t extra_data_len;
+  uint64_t data_len;
 public:
-  RGWRadosPutObj(RGWPutObjProcessor_Atomic *p, RGWOpStateSingleOp *_ops,
-                 void (*_progress_cb)(off_t, void *), void *_progress_data) : processor(p), opstate(_ops),
-                                                                       progress_cb(_progress_cb),
-                                                                       progress_data(_progress_data),
-                                                                       extra_data_len(0) {}
+  RGWRadosPutObj(CephContext* cct,
+                 RGWPutObjDataProcessor *filter,
+                 RGWPutObjProcessor_Atomic *p,
+                 RGWOpStateSingleOp *_ops,
+                 void (*_progress_cb)(off_t, void *),
+                 void *_progress_data) :
+                       cct(cct),
+                       filter(filter),
+                       processor(p),
+                       opstate(_ops),
+                       progress_cb(_progress_cb),
+                       progress_data(_progress_data),
+                       extra_data_len(0),
+                       data_len(0) {}
   int handle_data(bufferlist& bl, off_t ofs, off_t len) {
     if (progress_cb) {
       progress_cb(ofs, progress_data);
@@ -6671,14 +6683,15 @@ public:
         return 0;
       }
     }
+    data_len += bl.length();
     bool again = false;
 
     bool need_opstate = true;
 
     do {
-      void *handle;
+      void *handle = NULL;
       rgw_obj obj;
-      int ret = processor->handle_data(bl, ofs, &handle, &obj, &again);
+      int ret = filter->handle_data(bl, ofs, &handle, &obj, &again);
       if (ret < 0)
         return ret;
 
@@ -6688,10 +6701,10 @@ public:
          */
         ret = opstate->renew_state();
         if (ret < 0) {
-          ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl;
-          int r = processor->throttle_data(handle, obj, false);
+          ldout(cct, 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl;
+          int r = filter->throttle_data(handle, obj, false);
           if (r < 0) {
-            ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl;
+            ldout(cct, 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl;
           }
           /* could not renew state! might have been marked as cancelled */
           return ret;
@@ -6700,7 +6713,7 @@ public:
         need_opstate = false;
       }
 
-      ret = processor->throttle_data(handle, obj, false);
+      ret = filter->throttle_data(handle, obj, false);
       if (ret < 0)
         return ret;
     } while (again);
@@ -6714,6 +6727,10 @@ public:
     extra_data_len = len;
   }
 
+  uint64_t get_data_len() {
+    return data_len;
+  }
+
   int complete(string& etag, real_time *mtime, real_time set_mtime, map<string, bufferlist>& attrs, real_time delete_at) {
     return processor->complete(etag, mtime, set_mtime, attrs, delete_at);
   }
@@ -7064,7 +7081,17 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
     }
   }
 
-  RGWRadosPutObj cb(&processor, opstate, progress_cb, progress_data);
+  boost::optional<RGWPutObj_Compress> compressor;
+
+  RGWPutObjDataProcessor *filter = &processor;
+  bool compression_enabled = cct->_conf->rgw_compression_type != "none";
+  if (compression_enabled) {
+    compressor = boost::in_place(cct, filter);
+    filter = &*compressor;
+  }
+
+  RGWRadosPutObj cb(cct, filter, &processor, opstate, progress_cb, progress_data);
+
   string etag;
   map<string, string> req_headers;
   real_time set_mtime;
@@ -7112,6 +7139,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
 
       JSONDecoder::decode_json("attrs", src_attrs, &jp);
 
+      src_attrs.erase(RGW_ATTR_COMPRESSION);
       src_attrs.erase(RGW_ATTR_MANIFEST); // not interested in original object layout
       if (source_zone.empty()) { /* need to preserve expiration if copy in the same zonegroup */
         src_attrs.erase(RGW_ATTR_DELETE_AT);
@@ -7126,6 +7154,15 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
        }
       }
     }
+    if (compression_enabled && compressor->is_compressed()) {
+      bufferlist tmp;
+      RGWCompressionInfo cs_info;
+      cs_info.compression_type = cct->_conf->rgw_compression_type;
+      cs_info.orig_size = cb.get_data_len();
+      cs_info.blocks = move(compressor->get_compression_blocks());
+      ::encode(cs_info, tmp);
+      src_attrs[RGW_ATTR_COMPRESSION] = tmp;
+    }
   }
 
   if (src_mtime) {