From 988462a80108a0040731375bdb71453b00ccb54f Mon Sep 17 00:00:00 2001 From: Ved-vampir Date: Fri, 26 Feb 2016 17:11:30 +0300 Subject: [PATCH] rgw: add decompression & cmpressor attrs save Signed-off-by: Alyona Kiseleva --- src/rgw/rgw_common.h | 3 +++ src/rgw/rgw_op.cc | 40 +++++++++++++++++++++++++++++++++++++++- src/rgw/rgw_rados.cc | 24 ++++++++++++++++-------- src/rgw/rgw_rados.h | 1 + 4 files changed, 59 insertions(+), 9 deletions(-) diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index 1c486028fd5..e9c77e71b61 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -108,6 +108,9 @@ using ceph::crypto::MD5; #define RGW_ATTR_OLH_ID_TAG RGW_ATTR_OLH_PREFIX "idtag" #define RGW_ATTR_OLH_PENDING_PREFIX RGW_ATTR_OLH_PREFIX "pending." +#define RGW_ATTR_COMPRESSION RGW_ATTR_PREFIX "compression" +#define RGW_ATTR_COMPRESSION_ORIG_SIZE RGW_ATTR_PREFIX "orig_size" + /* RGW File Attributes */ #define RGW_ATTR_UNIX_KEY1 RGW_ATTR_PREFIX "unix-key1" #define RGW_ATTR_UNIX1 RGW_ATTR_PREFIX "unix1" diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 17409aaa064..982c25a0f5b 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -39,6 +39,9 @@ #include "include/assert.h" +#include "compressor/Compressor.h" + + #define dout_subsys ceph_subsys_rgw using namespace std; @@ -1262,7 +1265,28 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len) gc_invalidate_time = start_time; gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2); } - + // compression stuff + bool need_decompress = (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end()); + if (need_decompress) { // or it's the first part and flag is set + ldout(s->cct, 10) << "Compression for rgw is enabled, decompress part" << dendl; + string compression_type = attrs[RGW_ATTR_COMPRESSION].c_str(); + CompressorRef compressor = Compressor::create(s->cct, compression_type); + if (!compressor.get()) { + // if compressor isn't available - error, because cannot return decompressed data? + lderr(s->cct) << "Cannot load compressor of type " << compression_type + << "for rgw, check rgw_compression_type config option" << dendl; + return -1; + } else { + bufferlist out_bl; + int cr = compressor->decompress(bl, out_bl); + if (cr != 0) { + lderr(s->cct) << "Compression failed with exit code " << cr << dendl; + return -1; + } + return send_response_data(out_bl, bl_ofs, out_bl.length()); + } + } + // end of compression stuff return send_response_data(bl, bl_ofs, bl_len); } @@ -2983,6 +3007,18 @@ void RGWPutObj::execute() hash.Final(m); + if (processor->is_compressed()) { + bufferlist tmp; + tmp.append(s->cct->_conf->rgw_compression_type.c_str(), s->cct->_conf->rgw_compression_type.length()+1); + attrs[RGW_ATTR_COMPRESSION] = tmp; + tmp.clear(); + char sz [20]; + snprintf(sz, sizeof(sz), "%lu", s->obj_size); + tmp.append(sz, strlen(sz)+1); + attrs[RGW_ATTR_COMPRESSION_ORIG_SIZE] = tmp; + } + + buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5); etag = calc_md5; @@ -3864,6 +3900,8 @@ void RGWGetACLs::execute() acls = ss.str(); } + + int RGWPutACLs::verify_permission() { bool perm; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 8744894ac34..fb277cba58d 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2379,20 +2379,20 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, ldout(store->ctx(), 5) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type << "for rgw, check rgw_compression_type config option" << dendl; compressed = false; + in_bl.claim(bl); } else { - bufferlist out; - int cr = compressor->compress(bl, out); + int cr = compressor->compress(bl, in_bl); if (cr != 0) { ldout(store->ctx(), 5) << "Compression failed with exit code " << cr << dendl; compressed = false; + in_bl.claim(bl); } else { compressed = true; - in_bl = out; } } } else { compressed = false; - in_bl = bl; + in_bl.claim(bl); } // end of compression stuff @@ -2441,6 +2441,7 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, int ret = write_data(in_bl, write_ofs, phandle, pobj, exclusive); if (ret >= 0) { /* we might return, need to clear bl as it was already sent */ in_bl.clear(); + bl.clear(); } return ret; } @@ -6465,7 +6466,6 @@ int RGWRados::Object::Write::write_meta(uint64_t size, /* update quota cache */ store->quota_handler->update_stats(meta.owner, bucket, (orig_exists ? 0 : 1), size, orig_size); - return 0; done_cancel: @@ -9052,13 +9052,17 @@ int RGWRados::Object::Read::prepare(int64_t *pofs, int64_t *pend) return r; } + int dec_size = 0; if (params.attrs) { *params.attrs = astate->attrset; if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) { for (iter = params.attrs->begin(); iter != params.attrs->end(); ++iter) { - ldout(cct, 20) << "Read xattr: " << iter->first << dendl; + ldout(cct, 20) << "Read xattr: " << iter->first << "=" << iter->second << dendl; } } + if (params.attrs->find(RGW_ATTR_COMPRESSION) != params.attrs->end()) { + dec_size = atoi(params.attrs->at(RGW_ATTR_COMPRESSION_ORIG_SIZE).c_str()); + } } /* Convert all times go GMT to make them compatible */ @@ -9135,8 +9139,12 @@ int RGWRados::Object::Read::prepare(int64_t *pofs, int64_t *pend) *pofs = ofs; if (pend) *pend = end; - if (params.read_size) - *params.read_size = (ofs <= end ? end + 1 - ofs : 0); + if (params.read_size) { + if (dec_size) + *params.read_size = dec_size; + else + *params.read_size = (ofs <= end ? end + 1 - ofs : 0); + } if (params.obj_size) *params.obj_size = astate->size; if (params.lastmod) diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 617c28ec27a..0c1ace42435 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -3267,6 +3267,7 @@ public: CephContext *ctx(); bool is_canceled() { return canceled; } + bool is_compressed() { return compressed; } }; /* RGWPutObjProcessor */ struct put_obj_aio_info { -- 2.39.5