OPTION(rgw_list_bucket_min_readahead, OPT_INT, 1000) // minimum number of entries to read from rados for bucket listing
OPTION(rgw_rest_getusage_op_compat, OPT_BOOL, false) // dump description of total stats for s3 GetUsage API
+OPTION(rgw_compression_enabled, OPT_BOOL, true) // to use compression on rgw level
+OPTION(rgw_compression_type, OPT_STR, "zlib") // type of compressor
OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter
OPTION(throttler_perf_counter, OPT_BOOL, true) // enable/disable throttler perf counter
#include "rgw_data_sync.h"
#include "rgw_realm_watcher.h"
+#include "compressor/Compressor.h"
+
#define dout_subsys ceph_subsys_rgw
using namespace std;
*again = false;
*phandle = NULL;
+
+ bufferlist in_bl;
+
+ // compression stuff
+ if ((ofs > 0 && compressed) || // if previous part was compressed
+ (ofs == 0 && store->ctx()->_conf->rgw_compression_enabled)) { // or it's the first part and flag is set
+ ldout(store->ctx(), 10) << "Compression for rgw is enabled, compress part" << dendl;
+ CompressorRef compressor = Compressor::create(store->ctx(), store->ctx()->_conf->rgw_compression_type);
+ if (!compressor.get()) {
+ // if compressor isn't available - just do not use it with log warning?
+ ldout(store->ctx(), 5) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type
+ << "for rgw, check rgw_compression_type config option" << dendl;
+ compressed = false;
+ } else {
+ bufferlist out;
+ int cr = compressor->compress(bl, out);
+ if (cr != 0) {
+ ldout(store->ctx(), 5) << "Compression failed with exit code " << cr << dendl;
+ compressed = false;
+ } else {
+ compressed = true;
+ in_bl = out;
+ }
+ }
+ } else {
+ compressed = false;
+ in_bl = bl;
+ }
+ // end of compression stuff
+
+
if (extra_data_len) {
- size_t extra_len = bl.length();
+ size_t extra_len = in_bl.length();
if (extra_len > extra_data_len)
extra_len = extra_data_len;
bufferlist extra;
- bl.splice(0, extra_len, &extra);
+ in_bl.splice(0, extra_len, &extra);
extra_data_bl.append(extra);
extra_data_len -= extra_len;
- if (bl.length() == 0) {
+ if (in_bl.length() == 0) {
return 0;
}
}
uint64_t max_write_size = MIN(max_chunk_size, (uint64_t)next_part_ofs - data_ofs);
- pending_data_bl.claim_append(bl);
+ pending_data_bl.claim_append(in_bl);
if (pending_data_bl.length() < max_write_size)
return 0;
- pending_data_bl.splice(0, max_write_size, &bl);
+ pending_data_bl.splice(0, max_write_size, &in_bl);
/* do we have enough data pending accumulated that needs to be written? */
*again = (pending_data_bl.length() >= max_chunk_size);
if (!data_ofs && !immutable_head()) {
- first_chunk.claim(bl);
+ first_chunk.claim(in_bl);
obj_len = (uint64_t)first_chunk.length();
- if (hash) {
- hash->Update((const byte *)first_chunk.c_str(), obj_len);
- }
int r = prepare_next_part(obj_len);
if (r < 0) {
return r;
return 0;
}
off_t write_ofs = data_ofs;
- data_ofs = write_ofs + bl.length();
+ data_ofs = write_ofs + in_bl.length();
bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
we could be racing with another upload, to the same
object and cleanup can be messy */
- int ret = write_data(bl, write_ofs, phandle, pobj, exclusive);
+ int ret = write_data(in_bl, write_ofs, phandle, pobj, exclusive);
if (ret >= 0) { /* we might return, need to clear bl as it was already sent */
- if (hash) {
- hash->Update((const byte *)bl.c_str(), bl.length());
- }
- bl.clear();
+ in_bl.clear();
}
return ret;
}
RGWRados *store;
RGWObjectCtx& obj_ctx;
bool is_complete;
+ bool compressed;
RGWBucketInfo bucket_info;
bool canceled;
const char *if_match = NULL, const char *if_nomatch = NULL) = 0;
public:
- RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), bucket_info(_bi), canceled(false) {}
+ RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), compressed(false), bucket_info(_bi), canceled(false) {}
virtual ~RGWPutObjProcessor() {}
virtual int prepare(RGWRados *_store, string *oid_rand) {
store = _store;