]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: add decompression & cmpressor attrs save
authorVed-vampir <akiselyova@mirantis.com>
Fri, 26 Feb 2016 14:11:30 +0000 (17:11 +0300)
committerAdam Kupczyk <akupczyk@mirantis.com>
Wed, 2 Nov 2016 10:31:09 +0000 (11:31 +0100)
Signed-off-by: Alyona Kiseleva <akiselyova@mirantis.com>
src/rgw/rgw_common.h
src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 1c486028fd506dd655fb6299aa2544e9bdd17764..e9c77e71b61877858f8e3f01d8a29f8edb761599 100644 (file)
@@ -108,6 +108,9 @@ using ceph::crypto::MD5;
 #define RGW_ATTR_OLH_ID_TAG     RGW_ATTR_OLH_PREFIX "idtag"
 #define RGW_ATTR_OLH_PENDING_PREFIX RGW_ATTR_OLH_PREFIX "pending."
 
+#define RGW_ATTR_COMPRESSION    RGW_ATTR_PREFIX "compression"
+#define RGW_ATTR_COMPRESSION_ORIG_SIZE    RGW_ATTR_PREFIX "orig_size"
+
 /* RGW File Attributes */
 #define RGW_ATTR_UNIX_KEY1      RGW_ATTR_PREFIX "unix-key1"
 #define RGW_ATTR_UNIX1          RGW_ATTR_PREFIX "unix1"
index 17409aaa06427a1cb87e8842007abef84f77d7e3..982c25a0f5b0aa5303befb5fc5636968acb607fd 100644 (file)
@@ -39,6 +39,9 @@
 
 #include "include/assert.h"
 
+#include "compressor/Compressor.h"
+
+
 #define dout_subsys ceph_subsys_rgw
 
 using namespace std;
@@ -1262,7 +1265,28 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
     gc_invalidate_time = start_time;
     gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
   }
-
+  // compression stuff
+  bool need_decompress = (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end());
+  if (need_decompress) {   // or it's the first part and flag is set
+    ldout(s->cct, 10) << "Compression for rgw is enabled, decompress part" << dendl;
+    string compression_type = attrs[RGW_ATTR_COMPRESSION].c_str();
+    CompressorRef compressor = Compressor::create(s->cct, compression_type);
+    if (!compressor.get()) {
+      // if compressor isn't available - error, because cannot return decompressed data?
+      lderr(s->cct) << "Cannot load compressor of type " << compression_type 
+                       << "for rgw, check rgw_compression_type config option" << dendl;
+      return -1;
+    } else {
+      bufferlist out_bl;
+      int cr = compressor->decompress(bl, out_bl);
+      if (cr != 0) {
+        lderr(s->cct) << "Compression failed with exit code " << cr << dendl;
+        return -1;
+      }
+      return send_response_data(out_bl, bl_ofs, out_bl.length());
+    }
+  }
+  // end of compression stuff
   return send_response_data(bl, bl_ofs, bl_len);
 }
 
@@ -2983,6 +3007,18 @@ void RGWPutObj::execute()
 
   hash.Final(m);
 
+  if (processor->is_compressed()) {
+    bufferlist tmp;
+    tmp.append(s->cct->_conf->rgw_compression_type.c_str(), s->cct->_conf->rgw_compression_type.length()+1);
+    attrs[RGW_ATTR_COMPRESSION] = tmp;
+    tmp.clear();
+    char sz [20];
+    snprintf(sz, sizeof(sz), "%lu", s->obj_size);
+    tmp.append(sz, strlen(sz)+1);
+    attrs[RGW_ATTR_COMPRESSION_ORIG_SIZE] = tmp;
+  }
+
+
   buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
 
   etag = calc_md5;
@@ -3864,6 +3900,8 @@ void RGWGetACLs::execute()
   acls = ss.str();
 }
 
+
+
 int RGWPutACLs::verify_permission()
 {
   bool perm;
index 8744894ac34f7853bcb9c4e92fcc7c4acb96c6cc..fb277cba58dde51a36c914256711e5d5e6b839f5 100644 (file)
@@ -2379,20 +2379,20 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash,
       ldout(store->ctx(), 5) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type 
                        << "for rgw, check rgw_compression_type config option" << dendl;
       compressed = false;
+      in_bl.claim(bl);
     } else {
-      bufferlist out;
-      int cr = compressor->compress(bl, out);
+      int cr = compressor->compress(bl, in_bl);
       if (cr != 0) {
         ldout(store->ctx(), 5) << "Compression failed with exit code " << cr << dendl;
         compressed = false;
+        in_bl.claim(bl);
       } else {
         compressed = true;
-        in_bl = out;
       }
     }
   } else {
     compressed = false;
-    in_bl = bl;
+    in_bl.claim(bl);
   }
   // end of compression stuff
 
@@ -2441,6 +2441,7 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash,
   int ret = write_data(in_bl, write_ofs, phandle, pobj, exclusive);
   if (ret >= 0) { /* we might return, need to clear bl as it was already sent */
     in_bl.clear();
+    bl.clear();
   }
   return ret;
 }
@@ -6465,7 +6466,6 @@ int RGWRados::Object::Write::write_meta(uint64_t size,
 
   /* update quota cache */
   store->quota_handler->update_stats(meta.owner, bucket, (orig_exists ? 0 : 1), size, orig_size);
-
   return 0;
 
 done_cancel:
@@ -9052,13 +9052,17 @@ int RGWRados::Object::Read::prepare(int64_t *pofs, int64_t *pend)
     return r;
   }
 
+  int dec_size = 0;
   if (params.attrs) {
     *params.attrs = astate->attrset;
     if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
       for (iter = params.attrs->begin(); iter != params.attrs->end(); ++iter) {
-        ldout(cct, 20) << "Read xattr: " << iter->first << dendl;
+        ldout(cct, 20) << "Read xattr: " << iter->first << "=" << iter->second << dendl;
       }
     }
+    if (params.attrs->find(RGW_ATTR_COMPRESSION) != params.attrs->end()) {
+      dec_size = atoi(params.attrs->at(RGW_ATTR_COMPRESSION_ORIG_SIZE).c_str());
+    }
   }
 
   /* Convert all times go GMT to make them compatible */
@@ -9135,8 +9139,12 @@ int RGWRados::Object::Read::prepare(int64_t *pofs, int64_t *pend)
     *pofs = ofs;
   if (pend)
     *pend = end;
-  if (params.read_size)
-    *params.read_size = (ofs <= end ? end + 1 - ofs : 0);
+  if (params.read_size) {
+    if (dec_size)
+      *params.read_size = dec_size;
+    else
+      *params.read_size = (ofs <= end ? end + 1 - ofs : 0);
+  }
   if (params.obj_size)
     *params.obj_size = astate->size;
   if (params.lastmod)
index 617c28ec27ae9051ff046d10aac9099c56272a88..0c1ace4243586fb9d0d896dc40b5cd271fb0eb75 100644 (file)
@@ -3267,6 +3267,7 @@ public:
   CephContext *ctx();
 
   bool is_canceled() { return canceled; }
+  bool is_compressed() { return compressed; }
 }; /* RGWPutObjProcessor */
 
 struct put_obj_aio_info {