processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
&multipart);
op_ret = processor->prepare(get_store(), NULL);
+ if (op_ret < 0) {
+ ldout(s->cct, 20) << "processor->prepare() returned ret=" << op_ret
+ << dendl;
+ goto done;
+ }
+
+ filter = processor;
+
+ if (compression_type == "none")
+ compression_type = get_store()->get_zone_params().get_compression_type(
+ s->bucket_info.placement_rule);
+ if (compression_type != "none") {
+ plugin = Compressor::create(s->cct, compression_type);
+ if (! plugin) {
+ ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type "
+ << compression_type << dendl;
+ } else {
+ compressor.emplace(s->cct, plugin, filter);
+ filter = &*compressor;
+ }
+ }
done:
return op_ret;
orig_data = data;
}
hash.Update((const byte *)data.c_str(), data.length());
- op_ret = put_data_and_throttle(processor, data, ofs,
- need_to_wait);
+ op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait);
if (op_ret < 0) {
if (!need_to_wait || op_ret != -EEXIST) {
ldout(s->cct, 20) << "processor->thottle_data() returned ret="
dispose_processor(processor);
processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
&multipart);
+ filter = processor;
string oid_rand;
char buf[33];
goto done;
}
+ /* restore compression filter, if any */
+ if (compressor) {
+ compressor.emplace(s->cct, plugin, filter);
+ filter = &*compressor;
+ }
+
op_ret = put_data_and_throttle(processor, data, ofs, false);
if (op_ret < 0) {
goto done;
hash.Final(m);
+ if (compressor && compressor->is_compressed()) {
+ bufferlist tmp;
+ RGWCompressionInfo cs_info;
+ cs_info.compression_type = plugin->get_type_name();
+ cs_info.orig_size = s->obj_size;
+ cs_info.blocks = std::move(compressor->get_compression_blocks());
+ ::encode(cs_info, tmp);
+ attrs[RGW_ATTR_COMPRESSION] = tmp;
+ ldout(s->cct, 20) << "storing " << RGW_ATTR_COMPRESSION
+ << " with type=" << cs_info.compression_type
+ << ", orig_size=" << cs_info.orig_size
+ << ", blocks=" << cs_info.blocks.size() << dendl;
+ }
+
buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
etag = calc_md5;
#include "rgw_lib.h"
#include "rgw_ldap.h"
#include "rgw_token.h"
+#include "rgw_compression.h"
/* XXX
const std::string& bucket_name;
const std::string& obj_name;
RGWFileHandle* rgw_fh;
- RGWPutObjProcessor *processor;
+ RGWPutObjProcessor* processor;
+ RGWPutObjDataProcessor* filter;
+ boost::optional<RGWPutObj_Compress> compressor;
+ std::string compression_type = "none";
+ CompressorRef plugin;
buffer::list data;
uint64_t timer_id;
MD5 hash;
RGWWriteRequest(CephContext* _cct, RGWUserInfo *_user, RGWFileHandle* _fh,
const std::string& _bname, const std::string& _oname)
: RGWLibContinuedReq(_cct, _user), bucket_name(_bname), obj_name(_oname),
- rgw_fh(_fh), processor(nullptr), real_ofs(0), bytes_written(0),
- multipart(false), eio(false) {
+ rgw_fh(_fh), processor(nullptr), filter(nullptr), real_ofs(0),
+ bytes_written(0), multipart(false), eio(false) {
int ret = header_init();
if (ret == 0) {