From: Yehuda Sadeh Date: Tue, 31 Oct 2017 16:44:15 +0000 (-0700) Subject: rgw: cr rest splice, work towards write throttling X-Git-Tag: v13.1.0~270^2~65 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b7102750a72cfca682051e9bacf591693d9b70d5;p=ceph.git rgw: cr rest splice, work towards write throttling Need to throttle writes, so that we don't just accumulate all data read from source endpoint in memory, in the case where the write endpoint is too slow. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index ecc30fad8082..cbdeecd84cdd 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -62,6 +62,7 @@ void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_ cns.erase(cn); } +#warning shouldn't have more than one entry in complete_reqs per io_id complete_reqs.push_back(io_completion{io_id, user_info}); cond.Signal(); } diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc index f489c239ad27..3d5fae456da8 100644 --- a/src/rgw/rgw_cr_rest.cc +++ b/src/rgw/rgw_cr_rest.cc @@ -86,7 +86,7 @@ int RGWStreamReadHTTPResourceCRF::init() { env->stack->init_new_io(req); - in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ)); + in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ |RGWHTTPClient::HTTPCLIENT_IO_CONTROL)); req->set_in_cb(in_cb); @@ -102,6 +102,8 @@ int RGWStreamWriteHTTPResourceCRF::send() { env->stack->init_new_io(req); + req->set_write_drain_cb(&write_drain_notify_cb); + int r = http_manager->add_request(req); if (r < 0) { return r; @@ -164,7 +166,7 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool */ continue; } - if (!req->is_done()) { + if (!req->is_done() || out->length() >= max_size) { yield; } } @@ -185,11 +187,38 @@ void RGWStreamWriteHTTPResourceCRF::send_ready(const rgw_rest_obj& rest_obj) } } -int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data) +#define PENDING_WRITES_WINDOW (1 * 1024 * 1024) + +void RGWStreamWriteHTTPResourceCRF::write_drain_notify(uint64_t pending_size) +{ + lock_guard l(blocked_lock); + if (is_blocked && (pending_size < PENDING_WRITES_WINDOW / 2)) { + env->manager->io_complete(caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE | RGWHTTPClient::HTTPCLIENT_IO_CONTROL)); + is_blocked = false; + } +} + +void RGWStreamWriteHTTPResourceCRF::WriteDrainNotify::notify(uint64_t pending_size) +{ + crf->write_drain_notify(pending_size); +} + +int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data, bool *io_pending) { -#warning write need to throttle and block reenter(&write_state) { while (!req->is_done()) { + *io_pending = false; + if (req->get_pending_send_size() >= PENDING_WRITES_WINDOW) { + *io_pending = true; + { + lock_guard l(blocked_lock); + is_blocked = true; + + /* it's ok to unlock here, even if io_complete() arrives before io_block(), it'll wakeup + * correctly */ + } + yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ | RGWHTTPClient::HTTPCLIENT_IO_CONTROL)); + } yield req->add_send_data(data); } } @@ -275,18 +304,20 @@ int RGWStreamSpliceCR::operate() { total_read += bl.length(); - yield { - ldout(cct, 20) << "writing " << bl.length() << " bytes" << dendl; - ret = out_crf->write(bl); - if (ret < 0) { - return set_cr_error(ret); + do { + yield { + ldout(cct, 20) << "writing " << bl.length() << " bytes" << dendl; + ret = out_crf->write(bl, &need_retry); + if (ret < 0) { + return set_cr_error(ret); + } } - } - if (retcode < 0) { - ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl; - return set_cr_error(ret); - } + if (retcode < 0) { + ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl; + return set_cr_error(ret); + } + } while (need_retry); } while (true); do { diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index c8bf4763ae5e..66bb663ecf9d 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -2,6 +2,7 @@ #define CEPH_RGW_CR_REST_H #include +#include #include "include/assert.h" // boost header clobbers our assert.h #include "rgw_coroutine.h" @@ -349,7 +350,7 @@ public: virtual int init() = 0; virtual void send_ready(const rgw_rest_obj& rest_obj) = 0; virtual int send() = 0; - virtual int write(bufferlist& data) = 0; /* reentrant */ + virtual int write(bufferlist& data, bool *need_retry) = 0; /* reentrant */ virtual int drain_writes(bool *need_retry) = 0; /* reentrant */ }; @@ -421,6 +422,11 @@ protected: RGWCoroutine *caller; RGWHTTPManager *http_manager; + using lock_guard = std::lock_guard; + + std::mutex blocked_lock; + bool is_blocked; + RGWHTTPStreamRWRequest *req{nullptr}; struct multipart_info { @@ -430,13 +436,21 @@ protected: uint64_t part_size; } multipart; + class WriteDrainNotify : public RGWWriteDrainCB { + RGWStreamWriteHTTPResourceCRF *crf; + public: + WriteDrainNotify(RGWStreamWriteHTTPResourceCRF *_crf) : crf(_crf) {} + void notify(uint64_t pending_size) override; + } write_drain_notify_cb; + public: RGWStreamWriteHTTPResourceCRF(CephContext *_cct, RGWCoroutinesEnv *_env, RGWCoroutine *_caller, RGWHTTPManager *_http_manager) : env(_env), caller(_caller), - http_manager(_http_manager) {} + http_manager(_http_manager), + write_drain_notify_cb(this) {} virtual ~RGWStreamWriteHTTPResourceCRF() {} int init() override { @@ -444,7 +458,8 @@ public: } void send_ready(const rgw_rest_obj& rest_obj) override; int send() override; - int write(bufferlist& data) override; /* reentrant */ + int write(bufferlist& data, bool *need_retry) override; /* reentrant */ + void write_drain_notify(uint64_t pending_size); int drain_writes(bool *need_retry) override; /* reentrant */ virtual void handle_headers(const std::map& headers) {} diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index 9a791866622b..ca63e09fb1d4 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -774,6 +774,12 @@ void RGWHTTPStreamRWRequest::add_send_data(bufferlist& bl) _set_write_paused(false); } +uint64_t RGWHTTPStreamRWRequest::get_pending_send_size() +{ + Mutex::Locker wl(write_lock); + return outbl.length(); +} + void RGWHTTPStreamRWRequest::finish_write() { Mutex::Locker req_locker(get_req_lock()); @@ -784,23 +790,34 @@ void RGWHTTPStreamRWRequest::finish_write() int RGWHTTPStreamRWRequest::send_data(void *ptr, size_t len, bool *pause) { - Mutex::Locker wl(write_lock); + uint64_t out_len; + uint64_t send_size; + { + Mutex::Locker wl(write_lock); - if (outbl.length() == 0) { - if (stream_writes && !write_stream_complete) { - *pause = true; + if (outbl.length() == 0) { + if (stream_writes && !write_stream_complete) { + *pause = true; + } + return 0; } - return 0; - } - len = std::min(len, (size_t)outbl.length()); + len = std::min(len, (size_t)outbl.length()); + + bufferlist bl; + outbl.splice(0, len, &bl); + send_size = bl.length(); + if (send_size > 0) { + memcpy(ptr, bl.c_str(), send_size); + write_ofs += send_size; + } - bufferlist bl; - outbl.splice(0, len, &bl); - uint64_t send_size = bl.length(); - if (send_size > 0) { - memcpy(ptr, bl.c_str(), send_size); - write_ofs += send_size; + out_len = outbl.length(); + } + /* don't need to be under write_lock here, avoid deadlocks in case notify callback + * needs to lock */ + if (write_drain_cb) { + write_drain_cb->notify(out_len); } return send_size; } diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index 277263bc9723..964aa0dff7c3 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -69,11 +69,19 @@ public: int forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl); }; +class RGWWriteDrainCB { +public: + RGWWriteDrainCB() = default; + virtual ~RGWWriteDrainCB() = default; + virtual void notify(uint64_t pending_size) = 0; +}; + class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest { Mutex lock; Mutex write_lock; RGWGetDataCB *cb{nullptr}; + RGWWriteDrainCB *write_drain_cb{nullptr}; bufferlist outbl; bufferlist in_data; size_t chunk_ofs{0}; @@ -103,11 +111,14 @@ public: } void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; } + void set_write_drain_cb(RGWWriteDrainCB *_cb) { write_drain_cb = _cb; } void add_send_data(bufferlist& bl); void set_stream_write(bool s); + uint64_t get_pending_send_size(); + /* finish streaming writes */ void finish_write(); };