From: Ved-vampir Date: Tue, 16 Feb 2016 14:57:06 +0000 (+0300) Subject: rgw: add compression to RGWPutObj X-Git-Tag: v11.1.0~429^2~39 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=11f8d97949256e25e83e8682842117115df60c98;p=ceph.git rgw: add compression to RGWPutObj Signed-off-by: Alyona Kiseleva --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 2cf5080d9144..33486b51c61b 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -1524,6 +1524,8 @@ OPTION(mon_mgr_beacon_grace, OPT_INT, 30) // How long to wait to failover OPTION(rgw_list_bucket_min_readahead, OPT_INT, 1000) // minimum number of entries to read from rados for bucket listing OPTION(rgw_rest_getusage_op_compat, OPT_BOOL, false) // dump description of total stats for s3 GetUsage API +OPTION(rgw_compression_enabled, OPT_BOOL, true) // to use compression on rgw level +OPTION(rgw_compression_type, OPT_STR, "zlib") // type of compressor OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter OPTION(throttler_perf_counter, OPT_BOOL, true) // enable/disable throttler perf counter diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index b166e08f93a1..17409aaa0642 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -2881,6 +2881,10 @@ void RGWPutObj::execute() len = data.length(); } + if (need_calc_md5) { + hash.Update((const byte *)data.c_str(), data.length()); + } + /* save data for producing torrent data */ torrent.save_data(data_in); @@ -2977,9 +2981,6 @@ void RGWPutObj::execute() goto done; } - if (need_calc_md5) { - processor->complete_hash(&hash); - } hash.Final(m); buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 4b42cdcbdadf..8744894ac34f 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -70,6 +70,8 @@ using namespace librados; #include "rgw_data_sync.h" #include "rgw_realm_watcher.h" +#include "compressor/Compressor.h" + #define dout_subsys ceph_subsys_rgw using namespace std; @@ -2364,38 +2366,66 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, *again = false; *phandle = NULL; + + bufferlist in_bl; + + // compression stuff + if ((ofs > 0 && compressed) || // if previous part was compressed + (ofs == 0 && store->ctx()->_conf->rgw_compression_enabled)) { // or it's the first part and flag is set + ldout(store->ctx(), 10) << "Compression for rgw is enabled, compress part" << dendl; + CompressorRef compressor = Compressor::create(store->ctx(), store->ctx()->_conf->rgw_compression_type); + if (!compressor.get()) { + // if compressor isn't available - just do not use it with log warning? + ldout(store->ctx(), 5) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type + << "for rgw, check rgw_compression_type config option" << dendl; + compressed = false; + } else { + bufferlist out; + int cr = compressor->compress(bl, out); + if (cr != 0) { + ldout(store->ctx(), 5) << "Compression failed with exit code " << cr << dendl; + compressed = false; + } else { + compressed = true; + in_bl = out; + } + } + } else { + compressed = false; + in_bl = bl; + } + // end of compression stuff + + if (extra_data_len) { - size_t extra_len = bl.length(); + size_t extra_len = in_bl.length(); if (extra_len > extra_data_len) extra_len = extra_data_len; bufferlist extra; - bl.splice(0, extra_len, &extra); + in_bl.splice(0, extra_len, &extra); extra_data_bl.append(extra); extra_data_len -= extra_len; - if (bl.length() == 0) { + if (in_bl.length() == 0) { return 0; } } uint64_t max_write_size = MIN(max_chunk_size, (uint64_t)next_part_ofs - data_ofs); - pending_data_bl.claim_append(bl); + pending_data_bl.claim_append(in_bl); if (pending_data_bl.length() < max_write_size) return 0; - pending_data_bl.splice(0, max_write_size, &bl); + pending_data_bl.splice(0, max_write_size, &in_bl); /* do we have enough data pending accumulated that needs to be written? */ *again = (pending_data_bl.length() >= max_chunk_size); if (!data_ofs && !immutable_head()) { - first_chunk.claim(bl); + first_chunk.claim(in_bl); obj_len = (uint64_t)first_chunk.length(); - if (hash) { - hash->Update((const byte *)first_chunk.c_str(), obj_len); - } int r = prepare_next_part(obj_len); if (r < 0) { return r; @@ -2404,16 +2434,13 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, return 0; } off_t write_ofs = data_ofs; - data_ofs = write_ofs + bl.length(); + data_ofs = write_ofs + in_bl.length(); bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there we could be racing with another upload, to the same object and cleanup can be messy */ - int ret = write_data(bl, write_ofs, phandle, pobj, exclusive); + int ret = write_data(in_bl, write_ofs, phandle, pobj, exclusive); if (ret >= 0) { /* we might return, need to clear bl as it was already sent */ - if (hash) { - hash->Update((const byte *)bl.c_str(), bl.length()); - } - bl.clear(); + in_bl.clear(); } return ret; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 463118c63fe5..617c28ec27ae 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -3240,6 +3240,7 @@ protected: RGWRados *store; RGWObjectCtx& obj_ctx; bool is_complete; + bool compressed; RGWBucketInfo bucket_info; bool canceled; @@ -3248,7 +3249,7 @@ protected: const char *if_match = NULL, const char *if_nomatch = NULL) = 0; public: - RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), bucket_info(_bi), canceled(false) {} + RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), compressed(false), bucket_info(_bi), canceled(false) {} virtual ~RGWPutObjProcessor() {} virtual int prepare(RGWRados *_store, string *oid_rand) { store = _store; diff --git a/src/rgw/rgw_rest.cc b/src/rgw/rgw_rest.cc index 860aed7ded08..b02fde2b896c 100644 --- a/src/rgw/rgw_rest.cc +++ b/src/rgw/rgw_rest.cc @@ -23,6 +23,8 @@ #include "rgw_client_io.h" #include "rgw_resolve.h" +#include "compressor/Compressor.h" + #include #define dout_subsys ceph_subsys_rgw