#include "cls/user/cls_user_client.h"
#include "rgw_tools.h"
-
#include "rgw_coroutine.h"
+#include "rgw_compression.h"
#include "rgw_boost_asio_yield.h"
#undef fork // fails to compile RGWPeriod::fork() below
class RGWRadosPutObj : public RGWGetDataCB
{
+ CephContext* cct;
rgw_obj obj;
+ RGWPutObjDataProcessor *filter;
RGWPutObjProcessor_Atomic *processor;
RGWOpStateSingleOp *opstate;
void (*progress_cb)(off_t, void *);
void *progress_data;
bufferlist extra_data_bl;
uint64_t extra_data_len;
+ uint64_t data_len;
public:
- RGWRadosPutObj(RGWPutObjProcessor_Atomic *p, RGWOpStateSingleOp *_ops,
- void (*_progress_cb)(off_t, void *), void *_progress_data) : processor(p), opstate(_ops),
- progress_cb(_progress_cb),
- progress_data(_progress_data),
- extra_data_len(0) {}
+ RGWRadosPutObj(CephContext* cct,
+ RGWPutObjDataProcessor *filter,
+ RGWPutObjProcessor_Atomic *p,
+ RGWOpStateSingleOp *_ops,
+ void (*_progress_cb)(off_t, void *),
+ void *_progress_data) :
+ cct(cct),
+ filter(filter),
+ processor(p),
+ opstate(_ops),
+ progress_cb(_progress_cb),
+ progress_data(_progress_data),
+ extra_data_len(0),
+ data_len(0) {}
int handle_data(bufferlist& bl, off_t ofs, off_t len) {
if (progress_cb) {
progress_cb(ofs, progress_data);
return 0;
}
}
+ data_len += bl.length();
bool again = false;
bool need_opstate = true;
do {
- void *handle;
+ void *handle = NULL;
rgw_obj obj;
- int ret = processor->handle_data(bl, ofs, &handle, &obj, &again);
+ int ret = filter->handle_data(bl, ofs, &handle, &obj, &again);
if (ret < 0)
return ret;
*/
ret = opstate->renew_state();
if (ret < 0) {
- ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl;
- int r = processor->throttle_data(handle, obj, false);
+ ldout(cct, 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl;
+ int r = filter->throttle_data(handle, obj, false);
if (r < 0) {
- ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl;
+ ldout(cct, 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl;
}
/* could not renew state! might have been marked as cancelled */
return ret;
need_opstate = false;
}
- ret = processor->throttle_data(handle, obj, false);
+ ret = filter->throttle_data(handle, obj, false);
if (ret < 0)
return ret;
} while (again);
extra_data_len = len;
}
+ uint64_t get_data_len() {
+ return data_len;
+ }
+
int complete(string& etag, real_time *mtime, real_time set_mtime, map<string, bufferlist>& attrs, real_time delete_at) {
return processor->complete(etag, mtime, set_mtime, attrs, delete_at);
}
}
}
- RGWRadosPutObj cb(&processor, opstate, progress_cb, progress_data);
+ boost::optional<RGWPutObj_Compress> compressor;
+
+ RGWPutObjDataProcessor *filter = &processor;
+ bool compression_enabled = cct->_conf->rgw_compression_type != "none";
+ if (compression_enabled) {
+ compressor = boost::in_place(cct, filter);
+ filter = &*compressor;
+ }
+
+ RGWRadosPutObj cb(cct, filter, &processor, opstate, progress_cb, progress_data);
+
string etag;
map<string, string> req_headers;
real_time set_mtime;
JSONDecoder::decode_json("attrs", src_attrs, &jp);
+ src_attrs.erase(RGW_ATTR_COMPRESSION);
src_attrs.erase(RGW_ATTR_MANIFEST); // not interested in original object layout
if (source_zone.empty()) { /* need to preserve expiration if copy in the same zonegroup */
src_attrs.erase(RGW_ATTR_DELETE_AT);
}
}
}
+ if (compression_enabled && compressor->is_compressed()) {
+ bufferlist tmp;
+ RGWCompressionInfo cs_info;
+ cs_info.compression_type = cct->_conf->rgw_compression_type;
+ cs_info.orig_size = cb.get_data_len();
+ cs_info.blocks = move(compressor->get_compression_blocks());
+ ::encode(cs_info, tmp);
+ src_attrs[RGW_ATTR_COMPRESSION] = tmp;
+ }
}
if (src_mtime) {