From b2143cded0e971361cdb089db19a6f69ce5b74dd Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 3 Nov 2017 15:57:56 -0700 Subject: [PATCH] rgw: rest_client: work towards throttling of http read requests Adjust the interfaces to provide the ability for the read callback to pause the reads. While doing that, define a new class interface for this instead of RGWGetDataCB. This had a butterfly effect that required modifications to the obj read filters, but the end result is a bit cleaner. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_compression.cc | 2 +- src/rgw/rgw_compression.h | 2 +- src/rgw/rgw_cr_rest.cc | 16 ++++---- src/rgw/rgw_crypt.cc | 2 +- src/rgw/rgw_crypt.h | 2 +- src/rgw/rgw_http_client.cc | 76 +++++++++---------------------------- src/rgw/rgw_http_client.h | 26 ++----------- src/rgw/rgw_op.cc | 12 +++--- src/rgw/rgw_op.h | 78 +++++++++++++++++++++----------------- src/rgw/rgw_rados.cc | 14 ++++--- src/rgw/rgw_rados.h | 20 +--------- src/rgw/rgw_rest_client.cc | 21 +++++++--- src/rgw/rgw_rest_client.h | 33 ++++++++++++---- src/rgw/rgw_rest_conn.cc | 2 +- src/rgw/rgw_rest_conn.h | 10 ++--- src/rgw/rgw_rest_s3.cc | 6 +-- src/rgw/rgw_rest_s3.h | 8 ++-- 17 files changed, 146 insertions(+), 184 deletions(-) diff --git a/src/rgw/rgw_compression.cc b/src/rgw/rgw_compression.cc index 9e3c57be93505..3ed47492c9f6e 100644 --- a/src/rgw/rgw_compression.cc +++ b/src/rgw/rgw_compression.cc @@ -53,7 +53,7 @@ int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, r RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_, RGWCompressionInfo* cs_info_, bool partial_content_, - RGWGetDataCB* next): RGWGetObj_Filter(next), + RGWGetObj_Filter* next): RGWGetObj_Filter(next), cct(cct_), cs_info(cs_info_), partial_content(partial_content_), diff --git a/src/rgw/rgw_compression.h b/src/rgw/rgw_compression.h index 721c510f02ab5..b95f91954a297 100644 --- a/src/rgw/rgw_compression.h +++ b/src/rgw/rgw_compression.h @@ -23,7 +23,7 @@ public: RGWGetObj_Decompress(CephContext* cct_, RGWCompressionInfo* cs_info_, bool partial_content_, - RGWGetDataCB* next); + RGWGetObj_Filter* next); ~RGWGetObj_Decompress() override {} int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc index 3d5fae456da86..da812a9bd5c37 100644 --- a/src/rgw/rgw_cr_rest.cc +++ b/src/rgw/rgw_cr_rest.cc @@ -10,7 +10,7 @@ #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw -class RGWCRHTTPGetDataCB : public RGWGetDataCB { +class RGWCRHTTPGetDataCB : public RGWHTTPStreamRWRequest::ReceiveCB { Mutex lock; RGWCoroutinesEnv *env; RGWCoroutine *cr; @@ -21,12 +21,14 @@ class RGWCRHTTPGetDataCB : public RGWGetDataCB { public: RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, const rgw_io_id& _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {} - int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override { + int handle_data(bufferlist& bl, bool *pause) override { { + uint64_t bl_len = bl.length(); + Mutex::Locker l(lock); if (!got_all_extra_data) { - off_t max = extra_data_len - extra_data.length(); + uint64_t max = extra_data_len - extra_data.length(); if (max > bl_len) { max = bl_len; } @@ -35,15 +37,11 @@ public: got_all_extra_data = extra_data.length() == extra_data_len; } - if (bl_len == bl.length()) { - data.append(bl); - } else { - bl.splice(0, bl_len, &data); - } + data.append(bl); } #define GET_DATA_WINDOW_SIZE 1 * 1024 * 1024 - if (bl.length() >= GET_DATA_WINDOW_SIZE) { + if (data.length() >= GET_DATA_WINDOW_SIZE) { env->manager->io_complete(cr, io_id); } return 0; diff --git a/src/rgw/rgw_crypt.cc b/src/rgw/rgw_crypt.cc index f8d6f1095e415..03dcf6df49dfb 100644 --- a/src/rgw/rgw_crypt.cc +++ b/src/rgw/rgw_crypt.cc @@ -516,7 +516,7 @@ bool AES_256_ECB_encrypt(CephContext* cct, RGWGetObj_BlockDecrypt::RGWGetObj_BlockDecrypt(CephContext* cct, - RGWGetDataCB* next, + RGWGetObj_Filter* next, std::unique_ptr crypt): RGWGetObj_Filter(next), cct(cct), diff --git a/src/rgw/rgw_crypt.h b/src/rgw/rgw_crypt.h index 77f07f8f2fe2e..db88d835ec056 100644 --- a/src/rgw/rgw_crypt.h +++ b/src/rgw/rgw_crypt.h @@ -96,7 +96,7 @@ class RGWGetObj_BlockDecrypt : public RGWGetObj_Filter { std::vector parts_len; /**< size of parts of multipart object, parsed from manifest */ public: RGWGetObj_BlockDecrypt(CephContext* cct, - RGWGetDataCB* next, + RGWGetObj_Filter* next, std::unique_ptr crypt); virtual ~RGWGetObj_BlockDecrypt(); diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index 757c6a5c7bffb..fc884668dd126 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -275,63 +275,6 @@ void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type) } } -/* - * the simple set of callbacks will be called on RGWHTTPClient::process() - */ -/* Static methods - callbacks for libcurl. */ -size_t RGWHTTPClient::simple_receive_http_header(void * const ptr, - const size_t size, - const size_t nmemb, - void * const _info) -{ - RGWHTTPClient *client = static_cast(_info); - const size_t len = size * nmemb; - int ret = client->receive_header(ptr, size * nmemb); - if (ret < 0) { - dout(0) << "WARNING: client->receive_header() returned ret=" - << ret << dendl; - } - - return len; -} - -size_t RGWHTTPClient::simple_receive_http_data(void * const ptr, - const size_t size, - const size_t nmemb, - void * const _info) -{ - RGWHTTPClient *client = static_cast(_info); - const size_t len = size * nmemb; - int ret = client->receive_data(ptr, size * nmemb); - if (ret < 0) { - dout(0) << "WARNING: client->receive_data() returned ret=" - << ret << dendl; - } - - return len; -} - -size_t RGWHTTPClient::simple_send_http_data(void * const ptr, - const size_t size, - const size_t nmemb, - void * const _info) -{ - RGWHTTPClient *client = static_cast(_info); - bool pause = false; - int ret = client->send_data(ptr, size * nmemb, &pause); - if (ret < 0) { - dout(0) << "WARNING: client->send_data() returned ret=" - << ret << dendl; - } - - if (ret == 0 && - pause) { - return CURL_READFUNC_PAUSE; - } - - return ret; -} - /* * the following set of callbacks will be called either on RGWHTTPManager::process(), * or via the RGWHTTPManager async processing. @@ -366,17 +309,34 @@ size_t RGWHTTPClient::receive_http_data(void * const ptr, rgw_http_req_data *req_data = static_cast(_info); size_t len = size * nmemb; + bool pause = false; + + size_t skip_bytes = req_data->client->receive_pause_skip; + + if (skip_bytes >= len) { + skip_bytes -= len; + return len; + } + Mutex::Locker l(req_data->lock); if (!req_data->registered) { return len; } - int ret = req_data->client->receive_data(ptr, size * nmemb); + int ret = req_data->client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause); if (ret < 0) { dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl; } + skip_bytes = 0; + + if (pause) { + skip_bytes = len; + } else { + skip_bytes = 0; + } + return len; } diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index ad1301531fa5a..e276d4d6fe074 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -68,6 +68,8 @@ class RGWHTTPClient : public RGWIOProvider size_t send_len; bool has_send_len; long http_status; + size_t receive_pause_skip{0}; /* how many bytes to skip next time receive_data is called + due to being paused */ void *user_info{nullptr}; @@ -93,7 +95,7 @@ protected: virtual int receive_header(void *ptr, size_t len) { return 0; } - virtual int receive_data(void *ptr, size_t len) { + virtual int receive_data(void *ptr, size_t len, bool *pause) { return 0; } virtual int send_data(void *ptr, size_t len, bool *pause) { @@ -104,28 +106,16 @@ protected: } /* Callbacks for libcurl. */ - static size_t simple_receive_http_header(void *ptr, - size_t size, - size_t nmemb, - void *_info); static size_t receive_http_header(void *ptr, size_t size, size_t nmemb, void *_info); - static size_t simple_receive_http_data(void *ptr, - size_t size, - size_t nmemb, - void *_info); static size_t receive_http_data(void *ptr, size_t size, size_t nmemb, void *_info); - static size_t simple_send_http_data(void *ptr, - size_t size, - size_t nmemb, - void *_info); static size_t send_http_data(void *ptr, size_t size, size_t nmemb, @@ -231,14 +221,6 @@ public: protected: int receive_header(void *ptr, size_t len) override; - int receive_data(void *ptr, size_t len) override { - return 0; - } - - int send_data(void *ptr, size_t len) override { - return 0; - } - private: const std::set relevant_headers; std::map found_headers; @@ -280,7 +262,7 @@ public: protected: int send_data(void* ptr, size_t len) override; - int receive_data(void *ptr, size_t len) override { + int receive_data(void *ptr, size_t len, bool *pause) override { read_bl->append((char *)ptr, len); return 0; } diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 4028079fa0779..bf06110bb8d4c 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -1135,7 +1135,7 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket, { ldout(s->cct, 20) << "user manifest obj=" << ent.key.name << "[" << ent.key.instance << "]" << dendl; RGWGetObj_CB cb(this); - RGWGetDataCB* filter = &cb; + RGWGetObj_Filter* filter = &cb; boost::optional decompress; int64_t cur_ofs = start_ofs; @@ -1719,9 +1719,9 @@ void RGWGetObj::execute() int64_t ofs_x, end_x; RGWGetObj_CB cb(this); - RGWGetDataCB* filter = (RGWGetDataCB*)&cb; + RGWGetObj_Filter* filter = (RGWGetObj_Filter *)&cb; boost::optional decompress; - std::unique_ptr decrypt; + std::unique_ptr decrypt; map::iterator attr_iter; perfcounter->inc(l_rgw_get); @@ -3309,7 +3309,7 @@ void RGWPutObj::pre_exec() rgw_bucket_object_pre_exec(s); } -class RGWPutObj_CB : public RGWGetDataCB +class RGWPutObj_CB : public RGWGetObj_Filter { RGWPutObj *op; public: @@ -3334,9 +3334,9 @@ int RGWPutObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len) int RGWPutObj::get_data(const off_t fst, const off_t lst, bufferlist& bl) { RGWPutObj_CB cb(this); - RGWGetDataCB* filter = &cb; + RGWGetObj_Filter* filter = &cb; boost::optional decompress; - std::unique_ptr decrypt; + std::unique_ptr decrypt; RGWCompressionInfo cs_info; map attrs; map::iterator attr_iter; diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 952fe36d1643b..875004fb391d5 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -180,6 +180,46 @@ public: virtual int error_handler(int err_no, string *error_content); }; +class RGWGetObj_Filter : public RGWGetDataCB +{ +protected: + RGWGetObj_Filter *next{nullptr}; +public: + RGWGetObj_Filter() {} + RGWGetObj_Filter(RGWGetObj_Filter *next): next(next) {} + ~RGWGetObj_Filter() override {} + /** + * Passes data through filter. + * Filter can modify content of bl. + * When bl_len == 0 , it means 'flush + */ + int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override { + if (next) { + return next->handle_data(bl, bl_ofs, bl_len); + } + return 0; + } + /** + * Flushes any cached data. Used by RGWGetObjFilter. + * Return logic same as handle_data. + */ + virtual int flush() { + if (next) { + return next->flush(); + } + return 0; + } + /** + * Allows filter to extend range required for successful filtering + */ + virtual int fixup_range(off_t& ofs, off_t& end) { + if (next) { + return next->fixup_range(ofs, end); + } + return 0; + } +}; + class RGWGetObj : public RGWOp { protected: seed torrent; // get torrent @@ -281,13 +321,13 @@ public: /** * calculates filter used to decrypt RGW objects data */ - virtual int get_decrypt_filter(std::unique_ptr* filter, RGWGetDataCB* cb, bufferlist* manifest_bl) { + virtual int get_decrypt_filter(std::unique_ptr* filter, RGWGetObj_Filter* cb, bufferlist* manifest_bl) { *filter = nullptr; return 0; } }; -class RGWGetObj_CB : public RGWGetDataCB +class RGWGetObj_CB : public RGWGetObj_Filter { RGWGetObj *op; public: @@ -299,36 +339,6 @@ public: } }; -class RGWGetObj_Filter : public RGWGetDataCB -{ -protected: - RGWGetDataCB* next; -public: - RGWGetObj_Filter(RGWGetDataCB* next): next(next) {} - ~RGWGetObj_Filter() override {} - /** - * Passes data through filter. - * Filter can modify content of bl. - * When bl_len == 0 , it means 'flush - */ - int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override { - return next->handle_data(bl, bl_ofs, bl_len); - } - /** - * Flushes any cached data. Used by RGWGetObjFilter. - * Return logic same as handle_data. - */ - int flush() override { - return next->flush(); - } - /** - * Allows filter to extend range required for successful filtering - */ - int fixup_range(off_t& ofs, off_t& end) override { - return next->fixup_range(ofs, end); - } -}; - class RGWGetObjTags : public RGWOp { protected: bufferlist tags_bl; @@ -1041,8 +1051,8 @@ public: void execute() override; /* this is for cases when copying data from other object */ - virtual int get_decrypt_filter(std::unique_ptr* filter, - RGWGetDataCB* cb, + virtual int get_decrypt_filter(std::unique_ptr* filter, + RGWGetObj_Filter* cb, map& attrs, bufferlist* manifest_bl) { *filter = nullptr; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 43908c85f2ba4..63438c0b87fe2 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -7346,7 +7346,7 @@ bool RGWRados::aio_completed(void *handle) return c->is_safe(); } -class RGWRadosPutObj : public RGWGetDataCB +class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB { CephContext* cct; rgw_obj obj; @@ -7361,6 +7361,7 @@ class RGWRadosPutObj : public RGWGetDataCB uint64_t extra_data_left; uint64_t data_len; map src_attrs; + off_t ofs{0}; public: RGWRadosPutObj(CephContext* cct, CompressorRef& plugin, @@ -7402,7 +7403,7 @@ public: return 0; } - int handle_data(bufferlist& bl, off_t ofs, off_t len) override { + int handle_data(bufferlist& bl, bool *pause) override { if (progress_cb) { progress_cb(ofs, progress_data); } @@ -7444,6 +7445,8 @@ public: if (ret < 0) return ret; + ofs += size; + if (need_opstate && opstate) { /* need to update opstate repository with new state. This is ratelimited, so we're not * really doing it every time @@ -7475,7 +7478,7 @@ public: void set_extra_data_len(uint64_t len) override { extra_data_left = len; - RGWGetDataCB::set_extra_data_len(len); + RGWHTTPStreamRWRequest::ReceiveCB::set_extra_data_len(len); } uint64_t get_data_len() { @@ -7625,11 +7628,12 @@ inline ostream& operator<<(ostream& out, const obj_time_weight &o) { return out; } -class RGWGetExtraDataCB : public RGWGetDataCB { +class RGWGetExtraDataCB : public RGWHTTPStreamRWRequest::ReceiveCB { bufferlist extra_data; public: RGWGetExtraDataCB() {} - int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override { + int handle_data(bufferlist& bl, bool *pause) override { + int bl_len = (int)bl.length(); if (extra_data.length() < extra_data_len) { off_t max = extra_data_len - extra_data.length(); if (max > bl_len) { diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 8601600e39469..003c09e6ab1d1 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -296,28 +296,10 @@ struct RGWUsageIter { }; class RGWGetDataCB { -protected: - uint64_t extra_data_len; public: virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) = 0; - RGWGetDataCB() : extra_data_len(0) {} + RGWGetDataCB() {} virtual ~RGWGetDataCB() {} - virtual void set_extra_data_len(uint64_t len) { - extra_data_len = len; - } - /** - * Flushes any cached data. Used by RGWGetObjFilter. - * Return logic same as handle_data. - */ - virtual int flush() { - return 0; - } - /** - * Allows to extend fetch range of RGW object. Used by RGWGetObjFilter. - */ - virtual int fixup_range(off_t& bl_ofs, off_t& bl_end) { - return 0; - } }; class RGWAccessListFilter { diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index ca63e09fb1d4b..c802e81bee4e2 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -162,7 +162,7 @@ int RGWHTTPSimpleRequest::send_data(void *ptr, size_t len) return len; } -int RGWHTTPSimpleRequest::receive_data(void *ptr, size_t len) +int RGWHTTPSimpleRequest::receive_data(void *ptr, size_t len, bool *pause) { size_t cp_len, left_len; @@ -747,18 +747,27 @@ int RGWHTTPStreamRWRequest::handle_header(const string& name, const string& val) return 0; } -int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len) +int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len, bool *pause) { + size_t orig_len = len; + if (cb) { bufferptr bp((const char *)ptr, len); - bufferlist bl; - bl.append(bp); - int ret = cb->handle_data(bl, ofs, len); + in_data.append(bp); + int ret = cb->handle_data(in_data, pause); if (ret < 0) return ret; + if (ret == 0) { + in_data.clear(); + } else { + /* partial read */ + len = ret; + bufferlist bl; + in_data.splice(0, len, &bl); + } } ofs += len; - return len; + return orig_len; } void RGWHTTPStreamRWRequest::set_stream_write(bool s) { diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index 964aa0dff7c3f..816c87474eddb 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -49,7 +49,7 @@ public: } int receive_header(void *ptr, size_t len) override; - int receive_data(void *ptr, size_t len) override; + int receive_data(void *ptr, size_t len, bool *pause) override; int send_data(void *ptr, size_t len) override; bufferlist& get_response() { return response; } @@ -78,15 +78,20 @@ public: class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest { +public: + class ReceiveCB; + +private: Mutex lock; Mutex write_lock; - RGWGetDataCB *cb{nullptr}; + ReceiveCB *cb{nullptr}; RGWWriteDrainCB *write_drain_cb{nullptr}; bufferlist outbl; bufferlist in_data; size_t chunk_ofs{0}; size_t ofs{0}; uint64_t write_ofs{0}; + bool read_paused{false}; bool send_paused{false}; bool stream_writes{false}; bool write_stream_complete{false}; @@ -94,13 +99,25 @@ protected: int handle_header(const string& name, const string& val) override; public: int send_data(void *ptr, size_t len, bool *pause) override; - int receive_data(void *ptr, size_t len) override; + int receive_data(void *ptr, size_t len, bool *pause) override; + + class ReceiveCB { + protected: + uint64_t extra_data_len{0}; + public: + ReceiveCB() = default; + virtual ~ReceiveCB() = default; + virtual int handle_data(bufferlist& bl, bool *pause = nullptr) = 0; + virtual void set_extra_data_len(uint64_t len) { + extra_data_len = len; + } + }; RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params), lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock") { } - RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb, + RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params), lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock"), cb(_cb) { } @@ -110,7 +127,7 @@ public: outbl.swap(_outbl); } - void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; } + void set_in_cb(ReceiveCB *_cb) { cb = _cb; } void set_write_drain_cb(RGWWriteDrainCB *_cb) { write_drain_cb = _cb; } void add_send_data(bufferlist& bl); @@ -126,7 +143,7 @@ public: class RGWRESTStreamRWRequest : public RGWHTTPStreamRWRequest { bool send_data_hint{false}; public: - RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb, + RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWHTTPStreamRWRequest::ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params) { } virtual ~RGWRESTStreamRWRequest() override {} @@ -145,13 +162,13 @@ public: class RGWRESTStreamReadRequest : public RGWRESTStreamRWRequest { public: - RGWRESTStreamReadRequest(CephContext *_cct, const string& _url, RGWGetDataCB *_cb, param_vec_t *_headers, + RGWRESTStreamReadRequest(CephContext *_cct, const string& _url, ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, "GET", _url, _cb, _headers, _params) {} }; class RGWRESTStreamHeadRequest : public RGWRESTStreamRWRequest { public: - RGWRESTStreamHeadRequest(CephContext *_cct, const string& _url, RGWGetDataCB *_cb, param_vec_t *_headers, + RGWRESTStreamHeadRequest(CephContext *_cct, const string& _url, ReceiveCB *_cb, param_vec_t *_headers, param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, "HEAD", _url, _cb, _headers, _params) {} }; diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc index 0bfce1af30355..14586bcd452f4 100644 --- a/src/rgw/rgw_rest_conn.cc +++ b/src/rgw/rgw_rest_conn.cc @@ -198,7 +198,7 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, con uint32_t mod_zone_id, uint64_t mod_pg_ver, bool prepend_metadata, bool get_op, bool rgwx_stat, bool sync_manifest, bool skip_decrypt, - bool send, RGWGetDataCB *cb, RGWRESTStreamRWRequest **req) + bool send, RGWHTTPStreamRWRequest::ReceiveCB *cb, RGWRESTStreamRWRequest **req) { get_obj_params params; params.uid = uid; diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index 537077bb83f75..e4c9d3dbb1fb4 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -118,7 +118,7 @@ public: bool sync_manifest{false}; bool skip_decrypt{true}; - RGWGetDataCB *cb{nullptr}; + RGWHTTPStreamRWRequest::ReceiveCB *cb{nullptr}; bool range_is_set{false}; uint64_t range_start{0}; @@ -131,7 +131,7 @@ public: const ceph::real_time *mod_ptr, const ceph::real_time *unmod_ptr, uint32_t mod_zone_id, uint64_t mod_pg_ver, bool prepend_metadata, bool get_op, bool rgwx_stat, bool sync_manifest, - bool skip_decrypt, bool send, RGWGetDataCB *cb, RGWRESTStreamRWRequest **req); + bool skip_decrypt, bool send, RGWHTTPStreamRWRequest::ReceiveCB *cb, RGWRESTStreamRWRequest **req); int complete_request(RGWRESTStreamRWRequest *req, string& etag, ceph::real_time *mtime, uint64_t *psize, map& attrs); int get_resource(const string& resource, @@ -180,13 +180,13 @@ int RGWRESTConn::get_json_resource(const string& resource, const rgw_http_param return get_json_resource(resource, ¶ms, t); } -class RGWStreamIntoBufferlist : public RGWGetDataCB { +class RGWStreamIntoBufferlist : public RGWHTTPStreamRWRequest::ReceiveCB { bufferlist& bl; public: RGWStreamIntoBufferlist(bufferlist& _bl) : bl(_bl) {} - int handle_data(bufferlist& inbl, off_t bl_ofs, off_t bl_len) override { + int handle_data(bufferlist& inbl, bool *pause) override { bl.claim_append(inbl); - return bl_len; + return inbl.length(); } }; diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 7d327b19a0583..ef67fcbbbcf56 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -339,7 +339,7 @@ send_data: return 0; } -int RGWGetObj_ObjStore_S3::get_decrypt_filter(std::unique_ptr *filter, RGWGetDataCB* cb, bufferlist* manifest_bl) +int RGWGetObj_ObjStore_S3::get_decrypt_filter(std::unique_ptr *filter, RGWGetObj_Filter* cb, bufferlist* manifest_bl) { if (skip_decrypt) { // bypass decryption for multisite sync requests return 0; @@ -1466,8 +1466,8 @@ static inline void set_attr(map& attrs, const char* key, con } int RGWPutObj_ObjStore_S3::get_decrypt_filter( - std::unique_ptr* filter, - RGWGetDataCB* cb, + std::unique_ptr* filter, + RGWGetObj_Filter* cb, map& attrs, bufferlist* manifest_bl) { diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h index 3b15afa0b8db5..9e5ef4316aeb0 100644 --- a/src/rgw/rgw_rest_s3.h +++ b/src/rgw/rgw_rest_s3.h @@ -50,8 +50,8 @@ public: int send_response_data_error() override; int send_response_data(bufferlist& bl, off_t ofs, off_t len) override; void set_custom_http_response(int http_ret) { custom_http_ret = http_ret; } - int get_decrypt_filter(std::unique_ptr* filter, - RGWGetDataCB* cb, + int get_decrypt_filter(std::unique_ptr* filter, + RGWGetObj_Filter* cb, bufferlist* manifest_bl) override; }; @@ -215,8 +215,8 @@ public: int get_encrypt_filter(std::unique_ptr* filter, RGWPutObjDataProcessor* cb) override; - int get_decrypt_filter(std::unique_ptr* filter, - RGWGetDataCB* cb, + int get_decrypt_filter(std::unique_ptr* filter, + RGWGetObj_Filter* cb, map& attrs, bufferlist* manifest_bl) override; }; -- 2.39.5