rgw_cache.cc
rgw_client_io.cc
rgw_common.cc
+ rgw_compression.cc
rgw_cors.cc
rgw_cors_s3.cc
rgw_dencoder.cc
--- /dev/null
+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_compression.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+//------------RGWPutObj_Compress---------------
+
+int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again)
+{
+ bufferlist in_bl;
+
+ // Compress incoming data before passing it down the processor chain.
+ // An object is never mixed: once the first part is stored raw, all
+ // following parts are stored raw too (and vice versa).
+ if (bl.length() > 0) {
+ // compression stuff
+ if ((ofs > 0 && compressed) || // if previous part was compressed
+ (ofs == 0)) { // or it's the first part
+ ldout(cct, 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl;
+ CompressorRef compressor = Compressor::create(cct, cct->_conf->rgw_compression_type);
+ if (!compressor.get()) {
+ if (ofs > 0 && compressed) {
+ // earlier parts are already compressed; without the plugin we
+ // cannot produce a consistent object, so fail the upload
+ lderr(cct) << "Cannot load compressor of type " << cct->_conf->rgw_compression_type
+ << " for next part, compression process failed" << dendl;
+ return -EIO;
+ }
+ // first part and plugin unavailable: log and fall back to raw storage
+ ldout(cct, 5) << "Cannot load compressor of type " << cct->_conf->rgw_compression_type
+ << " for rgw, check rgw_compression_type config option" << dendl;
+ compressed = false;
+ in_bl.claim(bl);
+ } else {
+ int cr = compressor->compress(bl, in_bl);
+ if (cr < 0) {
+ if (ofs > 0 && compressed) {
+ // mid-object compression failure is unrecoverable
+ lderr(cct) << "Compression failed with exit code " << cr
+ << " for next part, compression process failed" << dendl;
+ return -EIO;
+ }
+ // first part failed to compress: store the object uncompressed
+ ldout(cct, 5) << "Compression failed with exit code " << cr << dendl;
+ compressed = false;
+ in_bl.claim(bl);
+ } else {
+ compressed = true;
+
+ // record the mapping for this block: old_ofs is the offset in the
+ // original data; new_ofs/len locate it in the compressed stream
+ // (needed later by RGWGetObj_Decompress for ranged reads)
+ compression_block newbl;
+ int bs = blocks.size();
+ newbl.old_ofs = ofs;
+ newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
+ newbl.len = in_bl.length();
+ blocks.push_back(newbl);
+ }
+ }
+ } else {
+ // previous part was stored raw; keep the rest of the object raw too
+ compressed = false;
+ in_bl.claim(bl);
+ }
+ // end of compression stuff
+ }
+
+ // an empty bl (flush) falls through with an empty in_bl
+ return next->handle_data(in_bl, ofs, phandle, pobj, again);
+}
+
+//----------------RGWGetObj_Decompress---------------------
+// Construct the read-side decompression filter. Loads the compressor
+// plugin named by cs_info->compression_type; a load failure is only
+// logged here — handle_data() returns -EIO when the plugin is missing.
+RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_,
+ RGWCompressionInfo* cs_info_,
+ bool partial_content_,
+ RGWGetDataCB* next): RGWGetObj_Filter(next),
+ cct(cct_),
+ cs_info(cs_info_),
+ partial_content(partial_content_),
+ q_ofs(0),
+ q_len(0),
+ first_data(true),
+ cur_ofs(0)
+{
+ compressor = Compressor::create(cct, cs_info->compression_type);
+ if (!compressor.get())
+ lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type
+ << " for rgw, check rgw_compression_type config option" << dendl;
+}
+
+// Receive a chunk of the compressed stream, decompress every complete
+// compression block it contains, and forward the plain data to the next
+// callback. Incomplete block tails are buffered in 'waiting' until the
+// next call. first_block/last_block and q_ofs/q_len must have been set
+// by fixup_range() before the first call.
+int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
+{
+ ldout(cct, 10) << "Compression for rgw is enabled, decompress part " << bl_len << dendl;
+
+ if (!compressor.get()) {
+ // plugin failed to load in the ctor; we cannot produce the
+ // decompressed data the client expects, so fail the read
+ lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type
+ << " for rgw, check rgw_compression_type config option" << dendl;
+ return -EIO;
+ }
+ bufferlist out_bl, in_bl;
+ // caller-supplied offsets refer to the compressed stream; the output
+ // offset is recomputed below, so bl_ofs is discarded here
+ bl_ofs = 0;
+ if (waiting.length() != 0) {
+ // prepend the block tail buffered by the previous call
+ in_bl.append(waiting);
+ in_bl.append(bl);
+ waiting.clear();
+ } else {
+ in_bl.claim(bl);
+ }
+ bl_len = in_bl.length();
+
+ // decompress every block fully contained in in_bl
+ while (first_block <= last_block) {
+ bufferlist tmp, tmp_out;
+ int ofs_in_bl = first_block->new_ofs - cur_ofs;
+ if (ofs_in_bl + (unsigned)first_block->len > bl_len) {
+ // not complete block, put it to waiting
+ int tail = bl_len - ofs_in_bl;
+ in_bl.copy(ofs_in_bl, tail, waiting);
+ // roll cur_ofs back so the offset math is right when the buffered
+ // tail is re-seen on the next call
+ cur_ofs -= tail;
+ break;
+ }
+ in_bl.copy(ofs_in_bl, first_block->len, tmp);
+ int cr = compressor->decompress(tmp, tmp_out);
+ if (cr < 0) {
+ lderr(cct) << "Compression failed with exit code " << cr << dendl;
+ return cr;
+ }
+ // last block of a range request: trim to the requested length
+ if (first_block == last_block && partial_content)
+ tmp_out.copy(0, q_len, out_bl);
+ else
+ out_bl.append(tmp_out);
+ first_block++;
+ }
+
+ // the first forwarded chunk of a range starts q_ofs bytes into its block
+ if (first_data && partial_content && out_bl.length() != 0)
+ bl_ofs = q_ofs;
+
+ if (first_data && out_bl.length() != 0)
+ first_data = false;
+
+ cur_ofs += bl_len;
+ return next->handle_data(out_bl, bl_ofs, out_bl.length() - bl_ofs);
+}
+
+// Translate a [ofs, end] range given in decompressed coordinates into
+// the compressed-stream byte range that must be fetched, and remember
+// the trim offsets (q_ofs/q_len) that handle_data() applies to the
+// first/last block. Also resets the per-request streaming state.
+// NOTE(review): both branches dereference block iterators, so this
+// assumes cs_info->blocks is non-empty — confirm callers guarantee it.
+void RGWGetObj_Decompress::fixup_range(off_t& ofs, off_t& end)
+{
+ if (partial_content) {
+ // if user set range, we need to calculate it in decompressed data
+ first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.begin();
+ if (cs_info->blocks.size() > 1) {
+ vector<compression_block>::iterator fb, lb;
+ // binary-search (by old_ofs) for the blocks covering the range
+ auto cmp_u = [] (off_t ofs, const compression_block& e) { return (unsigned)ofs < e.old_ofs; };
+ auto cmp_l = [] (const compression_block& e, off_t ofs) { return e.old_ofs < (unsigned)ofs; };
+ fb = upper_bound(cs_info->blocks.begin()+1,
+ cs_info->blocks.end(),
+ ofs,
+ cmp_u);
+ first_block = fb - 1;
+ lb = lower_bound(fb,
+ cs_info->blocks.end(),
+ end,
+ cmp_l);
+ last_block = lb - 1;
+ }
+ } else {
+ // full-object read: cover every block
+ first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.end() - 1;
+ }
+
+ // amount to trim from the first / keep of the last decompressed block
+ q_ofs = ofs - first_block->old_ofs;
+ q_len = end - last_block->old_ofs + 1;
+
+ // rewrite the range into compressed-stream coordinates for the fetch
+ ofs = first_block->new_ofs;
+ end = last_block->new_ofs + last_block->len;
+
+ first_data = true;
+ cur_ofs = ofs;
+ waiting.clear();
+
+ next->fixup_range(ofs, end);
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RGW_COMPRESSION_H
+#define CEPH_RGW_COMPRESSION_H
+
+#include <vector>
+
+#include "compressor/Compressor.h"
+#include "rgw_op.h"
+
+// Read-side filter: reassembles compression blocks from the compressed
+// on-disk stream and hands decompressed data to the next callback.
+class RGWGetObj_Decompress : public RGWGetObj_Filter
+{
+ CephContext* cct;
+ CompressorRef compressor; // plugin loaded in ctor; may be null on load failure
+ RGWCompressionInfo* cs_info; // block map + original size (not owned)
+ bool partial_content; // true for ranged (partial) GETs
+ vector<compression_block>::iterator first_block, last_block; // window set by fixup_range()
+ off_t q_ofs, q_len; // trim offset/length for first/last block of a range
+ bool first_data; // true until the first non-empty chunk is forwarded
+ uint64_t cur_ofs; // current offset within the compressed stream
+ bufferlist waiting; // buffered tail of an incomplete compression block
+public:
+ RGWGetObj_Decompress(CephContext* cct_,
+ RGWCompressionInfo* cs_info_,
+ bool partial_content_,
+ RGWGetDataCB* next);
+ virtual ~RGWGetObj_Decompress() {}
+
+ virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
+ virtual void fixup_range(off_t& ofs, off_t& end) override;
+
+};
+
+// Write-side filter: compresses object data before it reaches the next
+// RGWPutObjDataProcessor and records the block map for later reads.
+class RGWPutObj_Compress : public RGWPutObj_Filter
+{
+ CephContext* cct;
+ bool compressed; // true once data has actually been stored compressed
+ std::vector<compression_block> blocks; // original<->compressed offset map
+public:
+ // 'compressed' must start out false: is_compressed() can be queried
+ // (and handle_data() tests it) before any data has been processed —
+ // e.g. for a zero-length object — which would otherwise read an
+ // indeterminate value.
+ RGWPutObj_Compress(CephContext* cct_, RGWPutObjDataProcessor* next) : RGWPutObj_Filter(next),
+ cct(cct_),
+ compressed(false) {}
+ virtual ~RGWPutObj_Compress(){}
+ virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) override;
+
+ // whether the stored object data ended up compressed
+ bool is_compressed() { return compressed; }
+ // block map translating original offsets to compressed-stream offsets
+ vector<compression_block>& get_compression_blocks() { return blocks; }
+
+}; /* RGWPutObj_Compress */
+
+#endif /* CEPH_RGW_COMPRESSION_H */
#include <sstream>
#include <boost/algorithm/string/predicate.hpp>
+#include <boost/optional.hpp>
+#include <boost/utility/in_place_factory.hpp>
#include "common/Clock.h"
#include "common/armor.h"
#include "rgw_cors_s3.h"
#include "rgw_rest_conn.h"
#include "rgw_rest_s3.h"
-#include "rgw_lc.h"
-#include "rgw_lc_s3.h"
#include "rgw_client_io.h"
+#include "rgw_compression.h"
#include "cls/lock/cls_lock_client.h"
#include "cls/rgw/cls_rgw_client.h"
return true;
}
+// Adjust the bucket's accumulated decompressed size by obj_size and
+// persist it in the bucket instance attrs under RGW_ATTR_COMPRESSION.
+// obj_size is added with unsigned arithmetic: callers pass a negated
+// value (e.g. -deleted_size) to subtract via two's-complement wraparound
+// (see the declaration's "can be positive or negative" note).
+// Returns the result of put_bucket_instance_info().
+int RGWOp::update_compressed_bucket_size(uint64_t obj_size, RGWBucketCompressionInfo& bucket_size)
+{
+ bucket_size.orig_size += obj_size;
+ bufferlist bs;
+ ::encode(bucket_size, bs);
+ s->bucket_attrs[RGW_ATTR_COMPRESSION] = bs;
+ return store->put_bucket_instance_info(s->bucket_info, false, real_time(),
+ &s->bucket_attrs);
+}
+
int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
const RGWObjEnt& ent,
RGWAccessControlPolicy * const bucket_policy,
gc_invalidate_time = start_time;
gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
}
- // compression stuff
- if (need_decompress) {
- ldout(s->cct, 10) << "Compression for rgw is enabled, decompress part " << bl_len << dendl;
-
- CompressorRef compressor = Compressor::create(s->cct, cs_info.compression_type);
- if (!compressor.get()) {
- // if compressor isn't available - error, because cannot return decompressed data?
- lderr(s->cct) << "Cannot load compressor of type " << cs_info.compression_type
- << "for rgw, check rgw_compression_type config option" << dendl;
- return -EIO;
- } else {
- bufferlist out_bl, in_bl;
- bl_ofs = 0;
- if (waiting.length() != 0) {
- in_bl.append(waiting);
- in_bl.append(bl);
- waiting.clear();
- } else {
- in_bl.claim(bl);
- }
- bl_len = in_bl.length();
-
- while (first_block <= last_block) {
- bufferlist tmp, tmp_out;
- int ofs_in_bl = cs_info.blocks[first_block].new_ofs - cur_ofs;
- if (ofs_in_bl + cs_info.blocks[first_block].len > bl_len) {
- // not complete block, put it to waiting
- int tail = bl_len - ofs_in_bl;
- in_bl.copy(ofs_in_bl, tail, waiting);
- cur_ofs -= tail;
- break;
- }
- in_bl.copy(ofs_in_bl, cs_info.blocks[first_block].len, tmp);
- int cr = compressor->decompress(tmp, tmp_out);
- if (cr < 0) {
- lderr(s->cct) << "Compression failed with exit code " << cr << dendl;
- return cr;
- }
- if (first_block == last_block && partial_content)
- out_bl.append(tmp_out.c_str(), q_len);
- else
- out_bl.append(tmp_out);
- first_block++;
- }
-
- if (first_data && partial_content && out_bl.length() != 0)
- bl_ofs = q_ofs;
-
- if (first_data && out_bl.length() != 0)
- first_data = false;
-
- cur_ofs += bl_len;
- return send_response_data(out_bl, bl_ofs, out_bl.length() - bl_ofs);
- }
- }
- // end of compression stuff
return send_response_data(bl, bl_ofs, bl_len);
}
gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
RGWGetObj_CB cb(this);
- RGWGetDataCB* decrypt = nullptr;
+ RGWGetDataCB* filter = (RGWGetDataCB*)&cb;
+ boost::optional<RGWGetObj_Decompress> decompress;
map<string, bufferlist>::iterator attr_iter;
perfcounter->inc(l_rgw_get);
end = cs_info.orig_size - 1;
}
- if (ofs >= cs_info.orig_size) {
+ if ((unsigned)ofs >= cs_info.orig_size) {
lderr(s->cct) << "ERROR: begin of the bytes range more than object size (" << cs_info.orig_size
<< ")" << dendl;
op_ret = -ERANGE;
} else
new_ofs = ofs;
- if (end >= cs_info.orig_size) {
+ if ((unsigned)end >= cs_info.orig_size) {
ldout(s->cct, 5) << "WARNING: end of the bytes range more than object size (" << cs_info.orig_size
<< ")" << dendl;
new_end = cs_info.orig_size - 1;
goto done_err;
}
- if (partial_content) {
-
- // if user set range, we need to calculate it in decompressed data
- first_block = 0; last_block = 0;
- if (cs_info.blocks.size() > 1) {
- off_t i = 1;
- while (i < cs_info.blocks.size() && cs_info.blocks[i].old_ofs <= new_ofs) i++;
- first_block = i - 1;
- while (i < cs_info.blocks.size() && cs_info.blocks[i].old_ofs < new_end) i++;
- last_block = i - 1;
- }
- } else {
- first_block = 0; last_block = cs_info.blocks.size() - 1;
- }
-
- q_ofs = new_ofs - cs_info.blocks[first_block].old_ofs;
- q_len = new_end - cs_info.blocks[last_block].old_ofs + 1;
-
- new_ofs = cs_info.blocks[first_block].new_ofs;
- new_end = cs_info.blocks[last_block].new_ofs + cs_info.blocks[last_block].len;
-
- first_data = true;
- cur_ofs = new_ofs;
- waiting.clear();
+ decompress = boost::in_place(s->cct, &cs_info, partial_content, filter);
+ filter = &*decompress;
}
/* STAT ops don't need data, and do no i/o */
perfcounter->inc(l_rgw_get_b, new_end - new_ofs);
- op_ret = this->get_decrypt_filter(&decrypt, cb);
- if (op_ret < 0) {
- goto done_err;
- }
- if (decrypt != nullptr) {
- off_t tmp_ofs = ofs;
- off_t tmp_end = end;
- decrypt->fixup_range(tmp_ofs, tmp_end);
- op_ret = read_op.iterate(tmp_ofs, tmp_end, decrypt);
- if (op_ret >= 0)
- op_ret = decrypt->flush();
- delete decrypt;
- }
- else
- op_ret = read_op.iterate(ofs, end, &cb);
+ filter->fixup_range(new_ofs, new_end);
+ op_ret = read_op.iterate(new_ofs, new_end, filter);
+ if (op_ret >= 0)
+ op_ret = filter->flush();
perfcounter->tinc(l_rgw_get_lat,
(ceph_clock_now(s->cct) - start_time));
op_ret = -EINVAL;
}
}
- if (s->bucket_attrs.find(RGW_ATTR_COMPRESSION) != s->bucket_attrs.end()) {
- ::decode(bucket.size, s->bucket_attrs[RGW_ATTR_COMPRESSION]);
- }
+ RGWBucketCompressionInfo bcs;
+ int res = rgw_bucket_compression_info_from_attrset(s->bucket_attrs, bcs);
+ if (!res)
+ bucket.size = bcs.orig_size;
+ if (res < 0)
+ lderr(s->cct) << "Failed to read decompressed bucket size" << dendl;
}
int RGWListBucket::verify_permission()
info.size = s->obj_size;
info.modified = real_clock::now();
info.manifest = manifest;
- if (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end()) {
- bool tmp;
- RGWCompressionInfo cs_info;
- rgw_compression_info_from_attrset(attrs, tmp, cs_info);
- info.cs_info = cs_info;
+
+ bool compressed;
+ r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info);
+ if (r < 0) {
+ dout(1) << "cannot get compression info" << dendl;
+ return r;
}
+
::encode(info, bl);
string multipart_meta_obj = mp.get_meta();
return processor;
}
-void RGWPutObj::dispose_processor(RGWPutObjProcessor *processor)
+void RGWPutObj::dispose_processor(RGWPutObjDataProcessor *processor)
{
delete processor;
}
void RGWPutObj::execute()
{
RGWPutObjProcessor *processor = NULL;
+ RGWPutObjDataProcessor *filter = NULL;
char supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1];
char supplied_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
off_t fst;
off_t lst;
uint64_t bucket_size = 0;
+ RGWBucketCompressionInfo bucket_size;
+ int res;
+ bool compression_enabled;
+ boost::optional<RGWPutObj_Compress> compressor;
bool need_calc_md5 = (dlo_manifest == NULL) && (slo_info == NULL);
processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx), &multipart);
+ // no filters by default
+ filter = processor;
+
/* Handle object versioning of Swift API. */
if (! multipart) {
rgw_obj obj(s->bucket, s->object);
fst = copy_source_range_fst;
lst = copy_source_range_lst;
- processor->compression_enabled = s->cct->_conf->rgw_compression_type != "none";
+ compression_enabled = s->cct->_conf->rgw_compression_type != "none";
+ if (compression_enabled) {
+ compressor = boost::in_place(s->cct, filter);
+ filter = &*compressor;
+ }
do {
bufferlist data_in;
orig_data = data;
}
- op_ret = put_data_and_throttle(processor, data, ofs, need_to_wait);
+ op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait);
if (op_ret < 0) {
if (!need_to_wait || op_ret != -EEXIST) {
ldout(s->cct, 20) << "processor->thottle_data() returned ret="
dispose_processor(processor);
processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx), &multipart);
+ filter = processor;
+
string oid_rand;
char buf[33];
gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
goto done;
}
- op_ret = put_data_and_throttle(processor, data, ofs, false);
+ if (compression_enabled) {
+ compressor = boost::in_place(s->cct, filter);
+ filter = &*compressor;
+ }
+
+ op_ret = put_data_and_throttle(filter, data, ofs, false);
if (op_ret < 0) {
goto done;
}
hash.Final(m);
- if (processor->is_compressed()) {
+ if (compression_enabled && compressor->is_compressed()) {
bufferlist tmp;
RGWCompressionInfo cs_info;
cs_info.compression_type = s->cct->_conf->rgw_compression_type;
cs_info.orig_size = s->obj_size;
- cs_info.blocks = processor->get_compression_blocks();
+ cs_info.blocks = move(compressor->get_compression_blocks());
::encode(cs_info, tmp);
attrs[RGW_ATTR_COMPRESSION] = tmp;
}
- // add attr to bucket to know original size of data
- if (s->bucket_attrs.find(RGW_ATTR_COMPRESSION) != s->bucket_attrs.end())
- ::decode(bucket_size, s->bucket_attrs[RGW_ATTR_COMPRESSION]);
- bucket_size += s->obj_size;
- ::encode(bucket_size, bs);
- s->bucket_attrs[RGW_ATTR_COMPRESSION] = bs;
- op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
- &s->bucket_attrs);
+ // add attr to bucket to know original size of data
+ res = rgw_bucket_compression_info_from_attrset(s->bucket_attrs, bucket_size);
+ if (res < 0)
+ lderr(s->cct) << "ERROR: failed to read decompressed bucket size, cannot update stat" << dendl;
+ op_ret = update_compressed_bucket_size(s->obj_size, bucket_size);
if (op_ret < 0)
ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name
<< " returned err=" << op_ret << dendl;
return 0;
}
+// Create the put-object processor for POST object uploads. POST has no
+// multipart path, so an atomic processor is always used; caller owns
+// the result and releases it via dispose_processor().
+RGWPutObjProcessor *RGWPostObj::select_processor(RGWObjectCtx& obj_ctx)
+{
+ RGWPutObjProcessor *processor;
+
+ uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size;
+
+ processor = new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_info, s->bucket, s->object.name, part_size, s->req_id, s->bucket_info.versioning_enabled());
+
+ return processor;
+}
+
+// Release a processor obtained from select_processor().
+void RGWPostObj::dispose_processor(RGWPutObjDataProcessor *processor)
+{
+ delete processor;
+}
+
void RGWPostObj::pre_exec()
{
rgw_bucket_object_pre_exec(s);
void RGWPostObj::execute()
{
+ RGWPutObjDataProcessor *filter = NULL;
char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
MD5 hash;
buffer::list bl, aclbl;
int len = 0;
+ bool compression_enabled;
+ boost::optional<RGWPutObj_Compress> compressor;
// read in the data from the POST form
op_ret = get_params();
s->req_id,
s->bucket_info.versioning_enabled());
+ // no filters by default
+ filter = &processor;
+
op_ret = processor.prepare(store, nullptr);
- if (op_ret < 0) {
+ if (op_ret < 0)
return;
- }
- processor.compression_enabled = s->cct->_conf->rgw_compression_type != "none";
+ compression_enabled = s->cct->_conf->rgw_compression_type != "none";
+ if (compression_enabled) {
+ compressor = boost::in_place(s->cct, filter);
+ filter = &*compressor;
+ }
while (data_pending) {
bufferlist data;
break;
hash.Update((const byte *)data.c_str(), data.length());
- op_ret = put_data_and_throttle(&processor, data, ofs, &hash, false);
+ op_ret = put_data_and_throttle(filter, data, ofs, false);
ofs += len;
emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl));
}
- if (processor.is_compressed()) {
+ if (compression_enabled && compressor->is_compressed()) {
bufferlist tmp;
RGWCompressionInfo cs_info;
cs_info.compression_type = s->cct->_conf->rgw_compression_type;
cs_info.orig_size = s->obj_size;
- cs_info.blocks = processor.get_compression_blocks();
+ cs_info.blocks = move(compressor->get_compression_blocks());
::encode(cs_info, tmp);
emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp));
}
if (op_ret >= 0) {
delete_marker = del_op.result.delete_marker;
version_id = del_op.result.version_id;
- if (s->bucket_attrs.find(RGW_ATTR_COMPRESSION) != s->bucket_attrs.end()) {
- uint64_t bucket_size, deleted_size;
- ::decode(bucket_size, s->bucket_attrs[RGW_ATTR_COMPRESSION]);
- if (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end()) {
- bool tmp;
- RGWCompressionInfo cs_info;
- rgw_compression_info_from_attrset(attrs, tmp, cs_info);
- deleted_size = cs_info.orig_size;
- } else
+ RGWBucketCompressionInfo bucket_size;
+ int res = rgw_bucket_compression_info_from_attrset(s->bucket_attrs, bucket_size);
+ if (res < 0)
+ lderr(s->cct) << "ERROR: failed to read decompressed bucket size, cannot update stat" << dendl;
+ if (!res) {
+ uint64_t deleted_size;
+ bool tmp;
+ RGWCompressionInfo cs_info;
+ res = rgw_compression_info_from_attrset(attrs, tmp, cs_info);
+ if (res < 0) {
+ lderr(s->cct) << "ERROR: failed to decode compression info, cannot update stat" << dendl;
deleted_size = s->obj_size;
- bucket_size -= deleted_size;
- bufferlist bs;
- ::encode(bucket_size, bs);
- s->bucket_attrs[RGW_ATTR_COMPRESSION] = bs;
- op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
- &s->bucket_attrs);
+ } else {
+ if (tmp)
+ deleted_size = cs_info.orig_size;
+ else
+ deleted_size = s->obj_size;
+ }
+ if (update_compressed_bucket_size(-deleted_size, bucket_size) < 0)
+ lderr(s->cct) << "ERROR: failed to update bucket stat" << dendl;
}
}
}
int new_ofs; // offset in compression data for new part
if (cs_info.blocks.size() > 0)
- new_ofs = cs_info.blocks[cs_info.blocks.size() - 1].new_ofs + cs_info.blocks[cs_info.blocks.size() - 1].len;
+ new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len;
else
new_ofs = 0;
- for (off_t i=0; i < obj_part.cs_info.blocks.size(); ++i) {
+ for (const auto& block : obj_part.cs_info.blocks) {
compression_block cb;
- cb.old_ofs = obj_part.cs_info.blocks[i].old_ofs + cs_info.orig_size;
+ cb.old_ofs = block.old_ofs + cs_info.orig_size;
cb.new_ofs = new_ofs;
- cb.len = obj_part.cs_info.blocks[i].len;
+ cb.len = block.len;
cs_info.blocks.push_back(cb);
new_ofs = cb.new_ofs + cb.len;
}
}
map<string, bufferlist> attrset;
int y = get_obj_attrs(store, s, obj, attrset);
- if (!y && attrset.find(RGW_ATTR_COMPRESSION) != attrset.end()) {
+ map<string, bufferlist>::iterator cmp = attrset.find(RGW_ATTR_COMPRESSION);
+ if (!y && cmp != attrset.end()) {
RGWCompressionInfo cs_info;
- bufferlist::iterator bliter = attrset[RGW_ATTR_COMPRESSION].begin();
+ bufferlist::iterator bliter = cmp->second.begin();
try {
::decode(cs_info, bliter);
+ if (cs_info.compression_type != "none")
+ deleted_size += cs_info.orig_size;
+ else
+ deleted_size += obj_part.size;
} catch (buffer::error& err) {
ldout(s->cct, 5) << "Failed to get decompressed obj size" << dendl;
- }
- if (cs_info.compression_type != "none")
- deleted_size += cs_info.orig_size;
- else
deleted_size += obj_part.size;
+ }
} else
deleted_size += obj_part.size;
}
}
if (!op_ret) {
- if (s->bucket_attrs.find(RGW_ATTR_COMPRESSION) != s->bucket_attrs.end()) {
- uint64_t bucket_size;
- ::decode(bucket_size, s->bucket_attrs[RGW_ATTR_COMPRESSION]);
- bucket_size -= deleted_size;
- bufferlist bs;
- ::encode(bucket_size, bs);
- s->bucket_attrs[RGW_ATTR_COMPRESSION] = bs;
- op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
- &s->bucket_attrs);
+ RGWBucketCompressionInfo bucket_size;
+ int res = rgw_bucket_compression_info_from_attrset(s->bucket_attrs, bucket_size);
+ if (res < 0)
+ lderr(s->cct) << "ERROR: failed to read decompressed bucket size, cannot update stat" << dendl;
+ if (!res) {
+ if (update_compressed_bucket_size(-deleted_size, bucket_size) < 0)
+ lderr(s->cct) << "ERROR: failed to update bucket stat" << dendl;
}
}
#include <map>
#include <boost/optional.hpp>
+#include <boost/utility/in_place_factory.hpp>
#include "common/armor.h"
#include "common/mime.h"
}
int read_bucket_cors();
bool generate_cors_headers(string& origin, string& method, string& headers, string& exp_headers, unsigned *max_age);
+ // obj_size can be positive or negative
+ int update_compressed_bucket_size(uint64_t obj_size, RGWBucketCompressionInfo& bucket_size);
virtual int verify_params() { return 0; }
virtual bool prefetch_data() { return false; }
virtual RGWOpType get_type() { return RGW_OP_GET_OBJ; }
virtual uint32_t op_mask() { return RGW_OP_TYPE_READ; }
virtual bool need_object_expiration() { return false; }
- /**
- * calculates filter used to decrypt RGW objects data
- */
- virtual int get_decrypt_filter(RGWGetDataCB** filter, RGWGetDataCB& cb) {
- *filter = NULL;
- return 0;
- }
};
class RGWGetObj_CB : public RGWGetDataCB
class RGWGetObj_Filter : public RGWGetDataCB
{
protected:
- RGWGetDataCB& next;
+ RGWGetDataCB* next;
public:
- RGWGetObj_Filter(RGWGetDataCB& next): next(next) {}
+ RGWGetObj_Filter(RGWGetDataCB* next): next(next) {}
virtual ~RGWGetObj_Filter() {}
/**
* Passes data through filter.
* Filter can modify content of bl.
* When bl_len == 0 , it means 'flush
*/
- virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) {
- return next.handle_data(bl, bl_ofs, bl_len);
+ virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
+ return next->handle_data(bl, bl_ofs, bl_len);
}
/**
* Flushes any cached data. Used by RGWGetObjFilter.
* Return logic same as handle_data.
*/
- virtual int flush() {
- return next.flush();
+ virtual int flush() override {
+ return next->flush();
}
/**
* Allows filter to extend range required for successful filtering
*/
- virtual void fixup_range(off_t& bl_ofs, off_t& bl_len) {
- next.fixup_range(bl_ofs, bl_len);
+ virtual void fixup_range(off_t& ofs, off_t& end) override {
+ next->fixup_range(ofs, end);
}
};
}
virtual RGWPutObjProcessor *select_processor(RGWObjectCtx& obj_ctx, bool *is_multipart);
- void dispose_processor(RGWPutObjProcessor *processor);
+ void dispose_processor(RGWPutObjDataProcessor *processor);
int verify_permission();
void pre_exec();
class RGWPutObj_Filter : public RGWPutObjDataProcessor
{
protected:
- RGWPutObjDataProcessor& next;
+ RGWPutObjDataProcessor* next;
public:
- RGWPutObj_Filter(RGWPutObjDataProcessor& next) :
+ RGWPutObj_Filter(RGWPutObjDataProcessor* next) :
next(next){}
- virtual ~RGWPutObj_Filter(){}
- virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again) {
- return next.handle_data(bl, ofs, phandle, again);
+ virtual ~RGWPutObj_Filter() {}
+ virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) override {
+ return next->handle_data(bl, ofs, phandle, pobj, again);
}
- virtual int throttle_data(void *handle, bool need_to_wait) {
- return next.throttle_data(handle, need_to_wait);
+ virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) override {
+ return next->throttle_data(handle, obj, need_to_wait);
}
}; /* RGWPutObj_Filter */
#include <boost/format.hpp>
#include <boost/optional.hpp>
+#include <boost/utility/in_place_factory.hpp>
#include "common/ceph_json.h"
#include "common/utf8.h"
return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
}
-int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again)
+int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again)
{
*again = false;
*phandle = NULL;
- bufferlist in_bl;
-
- // compression stuff
- if ((ofs > 0 && compressed) || // if previous part was compressed
- (ofs == 0 && compression_enabled)) { // or it's the first part and flag is set
- ldout(store->ctx(), 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl;
- CompressorRef compressor = Compressor::create(store->ctx(), store->ctx()->_conf->rgw_compression_type);
- if (!compressor.get()) {
- if (ofs > 0 && compressed) {
- lderr(store->ctx()) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type
- << " for next part, compression process failed" << dendl;
- return -EIO;
- }
- // if compressor isn't available - just do not use it with log warning?
- ldout(store->ctx(), 5) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type
- << "for rgw, check rgw_compression_type config option" << dendl;
- compressed = false;
- in_bl.claim(bl);
- } else {
- int cr = compressor->compress(bl, in_bl);
- if (cr < 0) {
- if (ofs > 0 && compressed) {
- lderr(store->ctx()) << "Compression failed with exit code " << cr
- << " for next part, compression process failed" << dendl;
- return -EIO;
- }
- ldout(store->ctx(), 5) << "Compression failed with exit code " << cr << dendl;
- compressed = false;
- in_bl.claim(bl);
- } else {
- compressed = true;
-
- compression_block newbl;
- int bs = blocks.size();
- newbl.old_ofs = ofs;
- newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
- newbl.len = in_bl.length();
- blocks.push_back(newbl);
- }
- }
- } else {
- compressed = false;
- in_bl.claim(bl);
- }
- // end of compression stuff
-
if (extra_data_len) {
- size_t extra_len = in_bl.length();
+ size_t extra_len = bl.length();
if (extra_len > extra_data_len)
extra_len = extra_data_len;
bufferlist extra;
- in_bl.splice(0, extra_len, &extra);
+ bl.splice(0, extra_len, &extra);
extra_data_bl.append(extra);
extra_data_len -= extra_len;
- if (in_bl.length() == 0) {
+ if (bl.length() == 0) {
return 0;
}
}
uint64_t max_write_size = MIN(max_chunk_size, (uint64_t)next_part_ofs - data_ofs);
- pending_data_bl.claim_append(in_bl);
+ pending_data_bl.claim_append(bl);
if (pending_data_bl.length() < max_write_size)
return 0;
- pending_data_bl.splice(0, max_write_size, &in_bl);
+ pending_data_bl.splice(0, max_write_size, &bl);
/* do we have enough data pending accumulated that needs to be written? */
*again = (pending_data_bl.length() >= max_chunk_size);
if (!data_ofs && !immutable_head()) {
- first_chunk.claim(in_bl);
+ first_chunk.claim(bl);
obj_len = (uint64_t)first_chunk.length();
int r = prepare_next_part(obj_len);
if (r < 0) {
return 0;
}
off_t write_ofs = data_ofs;
- data_ofs = write_ofs + in_bl.length();
+ data_ofs = write_ofs + bl.length();
bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
we could be racing with another upload, to the same
object and cleanup can be messy */
- int ret = write_data(in_bl, write_ofs, phandle, pobj, exclusive);
+ int ret = write_data(bl, write_ofs, phandle, pobj, exclusive);
if (ret >= 0) { /* we might return, need to clear bl as it was already sent */
- in_bl.clear();
bl.clear();
}
return ret;
bufferlist attr;
map<string, bufferlist> attrset;
int y = store->raw_obj_stat(cs_obj, NULL, NULL, NULL, &attrset, NULL, NULL);
- if (!y && attrset.find(RGW_ATTR_COMPRESSION) != attrset.end()) {
+ map<string, bufferlist>::iterator cmp = attrset.find(RGW_ATTR_COMPRESSION);
+ if (!y && cmp != attrset.end()) {
RGWCompressionInfo cs_info;
- bufferlist::iterator bliter = attrset[RGW_ATTR_COMPRESSION].begin();
+ bufferlist::iterator bliter = cmp->second.begin();
try {
::decode(cs_info, bliter);
} catch (buffer::error& err) {
attrs.erase(RGW_ATTR_ID_TAG);
attrs.erase(RGW_ATTR_PG_VER);
attrs.erase(RGW_ATTR_SOURCE_ZONE);
- if (src_attrs.find(RGW_ATTR_COMPRESSION) != src_attrs.end())
- attrs[RGW_ATTR_COMPRESSION] = src_attrs[RGW_ATTR_COMPRESSION];
+ map<string, bufferlist>::iterator cmp = src_attrs.find(RGW_ATTR_COMPRESSION);
+ if (cmp != src_attrs.end())
+ attrs[RGW_ATTR_COMPRESSION] = cmp->second;
RGWObjManifest manifest;
RGWObjState *astate = NULL;
}
int rgw_compression_info_from_attrset(map<string, bufferlist>& attrs, bool& need_decompress, RGWCompressionInfo& cs_info) {
- if (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end()) {
- bufferlist::iterator bliter = attrs[RGW_ATTR_COMPRESSION].begin();
+ map<string, bufferlist>::iterator value = attrs.find(RGW_ATTR_COMPRESSION);
+ if (value != attrs.end()) {
+ bufferlist::iterator bliter = value->second.begin();
try {
::decode(cs_info, bliter);
} catch (buffer::error& err) {
}
}
-
+// Decode the bucket-level decompressed-size info from the attr set.
+// Returns 0 if the attr was found and decoded, -EIO if it is present
+// but cannot be decoded, and 1 if it is absent (cs_info untouched).
+int rgw_bucket_compression_info_from_attrset(map<string, bufferlist>& attrs, RGWBucketCompressionInfo& cs_info)
+{
+ map<string, bufferlist>::iterator cmp = attrs.find(RGW_ATTR_COMPRESSION);
+ if (cmp != attrs.end()) {
+ try {
+ ::decode(cs_info, cmp->second);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ } else {
+ return 1;
+ }
+ return 0;
+}
vector<compression_block> blocks;
RGWCompressionInfo() : compression_type("none"), orig_size(0) {}
+ RGWCompressionInfo(const RGWCompressionInfo& cs_info) : compression_type(cs_info.compression_type),
+ orig_size(cs_info.orig_size),
+ blocks(cs_info.blocks) {}
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
};
WRITE_CLASS_ENCODER(RGWCompressionInfo)
+// Per-bucket accounting of the total decompressed (original) size of
+// the bucket's objects, persisted in the bucket instance attrs under
+// RGW_ATTR_COMPRESSION.
+struct RGWBucketCompressionInfo {
+ uint64_t orig_size; // sum of original (pre-compression) object sizes
+
+ RGWBucketCompressionInfo() : orig_size(0) {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(orig_size, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(orig_size, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(RGWBucketCompressionInfo)
+
int rgw_compression_info_from_attrset(map<string, bufferlist>& attrs, bool& need_decompress, RGWCompressionInfo& cs_info);
+// return 0, if find and read; -EIO, if find and not read; 1 if not find
+int rgw_bucket_compression_info_from_attrset(map<string, bufferlist>& attrs, RGWBucketCompressionInfo& cs_info);
struct RGWOLHInfo {
rgw_obj target;
RGWRados *store;
RGWObjectCtx& obj_ctx;
bool is_complete;
- bool compressed;
RGWBucketInfo bucket_info;
bool canceled;
- vector<compression_block> blocks;
virtual int do_complete(string& etag, ceph::real_time *mtime, ceph::real_time set_mtime,
map<string, bufferlist>& attrs, ceph::real_time delete_at,
RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL),
obj_ctx(_obj_ctx),
is_complete(false),
- compressed(false),
bucket_info(_bi),
- canceled(false),
- compression_enabled(false) {}
+ canceled(false) {}
virtual ~RGWPutObjProcessor() {}
virtual int prepare(RGWRados *_store, string *oid_rand) {
store = _store;
return 0;
}
- //virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again);
- //virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) = 0;
virtual int complete(string& etag, ceph::real_time *mtime, ceph::real_time set_mtime,
map<string, bufferlist>& attrs, ceph::real_time delete_at,
const char *if_match = NULL, const char *if_nomatch = NULL);
- bool compression_enabled;
CephContext *ctx();
bool is_canceled() { return canceled; }
- bool is_compressed() { return compressed; }
- const vector<compression_block>& get_compression_blocks() { return blocks; }
}; /* RGWPutObjProcessor */
struct put_obj_aio_info {
const off_t bl_ofs,
const off_t bl_len)
{
- string content_type; int rr;
+ string content_type;
if (sent_header) {
goto send_data;
-Subproject commit c02b179490123a3212b0c0d23b69da13965d1552
+Subproject commit 9322c258084c6abdeefe00067f8b310a6e0d9a5a
CEPH_OUT_DIR=$VSTART_DEST/out
fi
-mkdir -p .libs/compressor
-for f in `ls -d compressor/*/`;
-do
- cp .libs/libceph_`basename $f`.so* .libs/compressor/;
-done
-
# for running out of the CMake build directory
if [ -e CMakeCache.txt ]; then
# Out of tree build, learn source location from CMakeCache.txt
-Subproject commit 44a6297b298e59ab7452defe859f21ed8371aa1c
+Subproject commit 1f40c6511fa8dd9d2e337ca8c9bc18b3e87663c9