From 6fd1910516acdd1076aa68960f91b6049ade69ab Mon Sep 17 00:00:00 2001 From: Adam Kupczyk Date: Wed, 14 Sep 2016 17:52:22 +0200 Subject: [PATCH] Added compression handling when syncing. Signed-off-by: Adam Kupczyk --- src/rgw/rgw_rados.cc | 63 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 50 insertions(+), 13 deletions(-) diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 67eaa76cd8865..cb39809ad6e7c 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -43,8 +43,8 @@ #include "cls/user/cls_user_client.h" #include "rgw_tools.h" - #include "rgw_coroutine.h" +#include "rgw_compression.h" #include "rgw_boost_asio_yield.h" #undef fork // fails to compile RGWPeriod::fork() below @@ -6640,19 +6640,31 @@ bool RGWRados::aio_completed(void *handle) class RGWRadosPutObj : public RGWGetDataCB { + CephContext* cct; rgw_obj obj; + RGWPutObjDataProcessor *filter; RGWPutObjProcessor_Atomic *processor; RGWOpStateSingleOp *opstate; void (*progress_cb)(off_t, void *); void *progress_data; bufferlist extra_data_bl; uint64_t extra_data_len; + uint64_t data_len; public: - RGWRadosPutObj(RGWPutObjProcessor_Atomic *p, RGWOpStateSingleOp *_ops, - void (*_progress_cb)(off_t, void *), void *_progress_data) : processor(p), opstate(_ops), - progress_cb(_progress_cb), - progress_data(_progress_data), - extra_data_len(0) {} + RGWRadosPutObj(CephContext* cct, + RGWPutObjDataProcessor *filter, + RGWPutObjProcessor_Atomic *p, + RGWOpStateSingleOp *_ops, + void (*_progress_cb)(off_t, void *), + void *_progress_data) : + cct(cct), + filter(filter), + processor(p), + opstate(_ops), + progress_cb(_progress_cb), + progress_data(_progress_data), + extra_data_len(0), + data_len(0) {} int handle_data(bufferlist& bl, off_t ofs, off_t len) { if (progress_cb) { progress_cb(ofs, progress_data); @@ -6671,14 +6683,15 @@ public: return 0; } } + data_len += bl.length(); bool again = false; bool need_opstate = true; do { - void *handle; + void *handle = NULL; rgw_obj obj; - int ret = processor->handle_data(bl, ofs, &handle, &obj, &again); + int ret = filter->handle_data(bl, ofs, &handle, &obj, &again); if (ret < 0) return ret; @@ -6688,10 +6701,10 @@ public: */ ret = opstate->renew_state(); if (ret < 0) { - ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl; - int r = processor->throttle_data(handle, obj, false); + ldout(cct, 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl; + int r = filter->throttle_data(handle, obj, false); if (r < 0) { - ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl; + ldout(cct, 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl; } /* could not renew state! might have been marked as cancelled */ return ret; @@ -6700,7 +6713,7 @@ public: need_opstate = false; } - ret = processor->throttle_data(handle, obj, false); + ret = filter->throttle_data(handle, obj, false); if (ret < 0) return ret; } while (again); @@ -6714,6 +6727,10 @@ public: extra_data_len = len; } + uint64_t get_data_len() { + return data_len; + } + int complete(string& etag, real_time *mtime, real_time set_mtime, map& attrs, real_time delete_at) { return processor->complete(etag, mtime, set_mtime, attrs, delete_at); } @@ -7064,7 +7081,17 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, } } - RGWRadosPutObj cb(&processor, opstate, progress_cb, progress_data); + boost::optional compressor; + + RGWPutObjDataProcessor *filter = &processor; + bool compression_enabled = cct->_conf->rgw_compression_type != "none"; + if (compression_enabled) { + compressor = boost::in_place(cct, filter); + filter = &*compressor; + } + + RGWRadosPutObj cb(cct, filter, &processor, opstate, progress_cb, progress_data); + string etag; map req_headers; real_time set_mtime; @@ -7112,6 +7139,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, JSONDecoder::decode_json("attrs", src_attrs, &jp); + src_attrs.erase(RGW_ATTR_COMPRESSION); src_attrs.erase(RGW_ATTR_MANIFEST); // not interested in original object layout if (source_zone.empty()) { /* need to preserve expiration if copy in the same zonegroup */ src_attrs.erase(RGW_ATTR_DELETE_AT); @@ -7126,6 +7154,15 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, } } } + if (compression_enabled && compressor->is_compressed()) { + bufferlist tmp; + RGWCompressionInfo cs_info; + cs_info.compression_type = cct->_conf->rgw_compression_type; + cs_info.orig_size = cb.get_data_len(); + cs_info.blocks = move(compressor->get_compression_blocks()); + ::encode(cs_info, tmp); + src_attrs[RGW_ATTR_COMPRESSION] = tmp; + } } if (src_mtime) { -- 2.39.5