#define RGW_ATTR_OLH_ID_TAG RGW_ATTR_OLH_PREFIX "idtag"
#define RGW_ATTR_OLH_PENDING_PREFIX RGW_ATTR_OLH_PREFIX "pending."
+#define RGW_ATTR_COMPRESSION RGW_ATTR_PREFIX "compression"
+#define RGW_ATTR_COMPRESSION_ORIG_SIZE RGW_ATTR_PREFIX "orig_size"
+
/* RGW File Attributes */
#define RGW_ATTR_UNIX_KEY1 RGW_ATTR_PREFIX "unix-key1"
#define RGW_ATTR_UNIX1 RGW_ATTR_PREFIX "unix1"
#include "include/assert.h"
+#include "compressor/Compressor.h"
+
+
#define dout_subsys ceph_subsys_rgw
using namespace std;
gc_invalidate_time = start_time;
gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
}
-
+ // compression stuff
+ bool need_decompress = (attrs.find(RGW_ATTR_COMPRESSION) != attrs.end());
+ if (need_decompress) { // or it's the first part and flag is set
+ ldout(s->cct, 10) << "Compression for rgw is enabled, decompress part" << dendl;
+ string compression_type = attrs[RGW_ATTR_COMPRESSION].c_str();
+ CompressorRef compressor = Compressor::create(s->cct, compression_type);
+ if (!compressor.get()) {
+ // if compressor isn't available - error, because cannot return decompressed data?
+ lderr(s->cct) << "Cannot load compressor of type " << compression_type
+ << "for rgw, check rgw_compression_type config option" << dendl;
+ return -1;
+ } else {
+ bufferlist out_bl;
+ int cr = compressor->decompress(bl, out_bl);
+ if (cr != 0) {
+ lderr(s->cct) << "Compression failed with exit code " << cr << dendl;
+ return -1;
+ }
+ return send_response_data(out_bl, bl_ofs, out_bl.length());
+ }
+ }
+ // end of compression stuff
return send_response_data(bl, bl_ofs, bl_len);
}
hash.Final(m);
+ if (processor->is_compressed()) {
+ bufferlist tmp;
+ tmp.append(s->cct->_conf->rgw_compression_type.c_str(), s->cct->_conf->rgw_compression_type.length()+1);
+ attrs[RGW_ATTR_COMPRESSION] = tmp;
+ tmp.clear();
+ char sz [20];
+ snprintf(sz, sizeof(sz), "%lu", s->obj_size);
+ tmp.append(sz, strlen(sz)+1);
+ attrs[RGW_ATTR_COMPRESSION_ORIG_SIZE] = tmp;
+ }
+
+
buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
etag = calc_md5;
acls = ss.str();
}
+
+
int RGWPutACLs::verify_permission()
{
bool perm;
ldout(store->ctx(), 5) << "Cannot load compressor of type " << store->ctx()->_conf->rgw_compression_type
<< "for rgw, check rgw_compression_type config option" << dendl;
compressed = false;
+ in_bl.claim(bl);
} else {
- bufferlist out;
- int cr = compressor->compress(bl, out);
+ int cr = compressor->compress(bl, in_bl);
if (cr != 0) {
ldout(store->ctx(), 5) << "Compression failed with exit code " << cr << dendl;
compressed = false;
+ in_bl.claim(bl);
} else {
compressed = true;
- in_bl = out;
}
}
} else {
compressed = false;
- in_bl = bl;
+ in_bl.claim(bl);
}
// end of compression stuff
int ret = write_data(in_bl, write_ofs, phandle, pobj, exclusive);
if (ret >= 0) { /* we might return, need to clear bl as it was already sent */
in_bl.clear();
+ bl.clear();
}
return ret;
}
/* update quota cache */
store->quota_handler->update_stats(meta.owner, bucket, (orig_exists ? 0 : 1), size, orig_size);
-
return 0;
done_cancel:
return r;
}
+ int dec_size = 0;
if (params.attrs) {
*params.attrs = astate->attrset;
if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
for (iter = params.attrs->begin(); iter != params.attrs->end(); ++iter) {
- ldout(cct, 20) << "Read xattr: " << iter->first << dendl;
+ ldout(cct, 20) << "Read xattr: " << iter->first << "=" << iter->second << dendl;
}
}
+ if (params.attrs->find(RGW_ATTR_COMPRESSION) != params.attrs->end()) {
+ dec_size = atoi(params.attrs->at(RGW_ATTR_COMPRESSION_ORIG_SIZE).c_str());
+ }
}
/* Convert all times go GMT to make them compatible */
*pofs = ofs;
if (pend)
*pend = end;
- if (params.read_size)
- *params.read_size = (ofs <= end ? end + 1 - ofs : 0);
+ if (params.read_size) {
+ if (dec_size)
+ *params.read_size = dec_size;
+ else
+ *params.read_size = (ofs <= end ? end + 1 - ofs : 0);
+ }
if (params.obj_size)
*params.obj_size = astate->size;
if (params.lastmod)
CephContext *ctx();
bool is_canceled() { return canceled; }
+ bool is_compressed() { return compressed; }
}; /* RGWPutObjProcessor */
struct put_obj_aio_info {