From b7102750a72cfca682051e9bacf591693d9b70d5 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 31 Oct 2017 09:44:15 -0700 Subject: [PATCH] rgw: cr rest splice, work towards write throttling Need to throttle writes, so that we don't just accumulate all data read from source endpoint in memory, in the case where the write endpoint is too slow. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_coroutine.cc | 1 + src/rgw/rgw_cr_rest.cc | 59 +++++++++++++++++++++++++++++--------- src/rgw/rgw_cr_rest.h | 21 ++++++++++++-- src/rgw/rgw_rest_client.cc | 43 ++++++++++++++++++--------- src/rgw/rgw_rest_client.h | 11 +++++++ 5 files changed, 105 insertions(+), 30 deletions(-) diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index ecc30fad808..cbdeecd84cd 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -62,6 +62,7 @@ void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_ cns.erase(cn); } +#warning shouldn't have more than one entry in complete_reqs per io_id complete_reqs.push_back(io_completion{io_id, user_info}); cond.Signal(); } diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc index f489c239ad2..3d5fae456da 100644 --- a/src/rgw/rgw_cr_rest.cc +++ b/src/rgw/rgw_cr_rest.cc @@ -86,7 +86,7 @@ int RGWStreamReadHTTPResourceCRF::init() { env->stack->init_new_io(req); - in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ)); + in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ |RGWHTTPClient::HTTPCLIENT_IO_CONTROL)); req->set_in_cb(in_cb); @@ -102,6 +102,8 @@ int RGWStreamWriteHTTPResourceCRF::send() { env->stack->init_new_io(req); + req->set_write_drain_cb(&write_drain_notify_cb); + int r = http_manager->add_request(req); if (r < 0) { return r; @@ -164,7 +166,7 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool */ continue; } - if (!req->is_done()) { + if (!req->is_done() || out->length() >= 
max_size) { yield; } } @@ -185,11 +187,38 @@ void RGWStreamWriteHTTPResourceCRF::send_ready(const rgw_rest_obj& rest_obj) } } -int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data) +#define PENDING_WRITES_WINDOW (1 * 1024 * 1024) + +void RGWStreamWriteHTTPResourceCRF::write_drain_notify(uint64_t pending_size) +{ + lock_guard l(blocked_lock); + if (is_blocked && (pending_size < PENDING_WRITES_WINDOW / 2)) { + env->manager->io_complete(caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE | RGWHTTPClient::HTTPCLIENT_IO_CONTROL)); + is_blocked = false; + } +} + +void RGWStreamWriteHTTPResourceCRF::WriteDrainNotify::notify(uint64_t pending_size) +{ + crf->write_drain_notify(pending_size); +} + +int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data, bool *io_pending) { -#warning write need to throttle and block reenter(&write_state) { while (!req->is_done()) { + *io_pending = false; + if (req->get_pending_send_size() >= PENDING_WRITES_WINDOW) { + *io_pending = true; + { + lock_guard l(blocked_lock); + is_blocked = true; + + /* it's ok to unlock here, even if io_complete() arrives before io_block(), it'll wakeup + * correctly */ + } + yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE | RGWHTTPClient::HTTPCLIENT_IO_CONTROL)); /* block on the same io id that write_drain_notify() completes */ + } yield req->add_send_data(data); } } @@ -275,18 +304,20 @@ int RGWStreamSpliceCR::operate() { total_read += bl.length(); - yield { - ldout(cct, 20) << "writing " << bl.length() << " bytes" << dendl; - ret = out_crf->write(bl); - if (ret < 0) { - return set_cr_error(ret); + do { + yield { + ldout(cct, 20) << "writing " << bl.length() << " bytes" << dendl; + ret = out_crf->write(bl, &need_retry); + if (ret < 0) { + return set_cr_error(ret); + } } - } - if (retcode < 0) { - ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl; - return set_cr_error(ret); - } + if (retcode < 0) { + ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl; + return 
set_cr_error(retcode); /* ret is >= 0 on this path; the failure is in retcode */ + } + } while (need_retry); } while (true); do { diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index c8bf4763ae5..66bb663ecf9 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -2,6 +2,7 @@ #define CEPH_RGW_CR_REST_H #include +#include <mutex> #include "include/assert.h" // boost header clobbers our assert.h #include "rgw_coroutine.h" @@ -349,7 +350,7 @@ public: virtual int init() = 0; virtual void send_ready(const rgw_rest_obj& rest_obj) = 0; virtual int send() = 0; - virtual int write(bufferlist& data) = 0; /* reentrant */ + virtual int write(bufferlist& data, bool *need_retry) = 0; /* reentrant */ virtual int drain_writes(bool *need_retry) = 0; /* reentrant */ }; @@ -421,6 +422,11 @@ protected: RGWCoroutine *caller; RGWHTTPManager *http_manager; + using lock_guard = std::lock_guard<std::mutex>; + + std::mutex blocked_lock; + bool is_blocked{false}; /* guarded by blocked_lock; read by write_drain_notify(), so it must start out defined */ + RGWHTTPStreamRWRequest *req{nullptr}; struct multipart_info { @@ -430,13 +436,21 @@ protected: uint64_t part_size; } multipart; + class WriteDrainNotify : public RGWWriteDrainCB { + RGWStreamWriteHTTPResourceCRF *crf; + public: + explicit WriteDrainNotify(RGWStreamWriteHTTPResourceCRF *_crf) : crf(_crf) {} + void notify(uint64_t pending_size) override; + } write_drain_notify_cb; + public: RGWStreamWriteHTTPResourceCRF(CephContext *_cct, RGWCoroutinesEnv *_env, RGWCoroutine *_caller, RGWHTTPManager *_http_manager) : env(_env), caller(_caller), - http_manager(_http_manager) {} + http_manager(_http_manager), + write_drain_notify_cb(this) {} virtual ~RGWStreamWriteHTTPResourceCRF() {} int init() override { @@ -444,7 +458,8 @@ public: } void send_ready(const rgw_rest_obj& rest_obj) override; int send() override; - int write(bufferlist& data) override; /* reentrant */ + int write(bufferlist& data, bool *need_retry) override; /* reentrant */ + void write_drain_notify(uint64_t pending_size); int drain_writes(bool *need_retry) override; /* reentrant */ virtual void handle_headers(const std::map& headers) {} diff 
--git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index 9a791866622..ca63e09fb1d 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -774,6 +774,12 @@ void RGWHTTPStreamRWRequest::add_send_data(bufferlist& bl) _set_write_paused(false); } +uint64_t RGWHTTPStreamRWRequest::get_pending_send_size() +{ + Mutex::Locker wl(write_lock); + return outbl.length(); /* bytes still queued in outbl, sampled under write_lock */ +} + void RGWHTTPStreamRWRequest::finish_write() { Mutex::Locker req_locker(get_req_lock()); @@ -784,23 +790,34 @@ void RGWHTTPStreamRWRequest::finish_write() int RGWHTTPStreamRWRequest::send_data(void *ptr, size_t len, bool *pause) { - Mutex::Locker wl(write_lock); + uint64_t out_len; + uint64_t send_size; + { + Mutex::Locker wl(write_lock); - if (outbl.length() == 0) { - if (stream_writes && !write_stream_complete) { - *pause = true; + if (outbl.length() == 0) { + if (stream_writes && !write_stream_complete) { + *pause = true; + } + return 0; } - return 0; - } - len = std::min(len, (size_t)outbl.length()); + len = std::min(len, (size_t)outbl.length()); + + bufferlist bl; + outbl.splice(0, len, &bl); + send_size = bl.length(); + if (send_size > 0) { + memcpy(ptr, bl.c_str(), send_size); + write_ofs += send_size; + } - bufferlist bl; - outbl.splice(0, len, &bl); - uint64_t send_size = bl.length(); - if (send_size > 0) { - memcpy(ptr, bl.c_str(), send_size); - write_ofs += send_size; + out_len = outbl.length(); /* what remains queued after this chunk was spliced off */ + } + /* don't need to be under write_lock here, avoid deadlocks in case notify callback + * needs to lock */ + if (write_drain_cb) { + write_drain_cb->notify(out_len); } return send_size; } diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index 277263bc972..964aa0dff7c 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -69,11 +69,19 @@ public: int forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl); }; +class RGWWriteDrainCB { +public: + RGWWriteDrainCB() = default; + virtual 
~RGWWriteDrainCB() = default; + virtual void notify(uint64_t pending_size) = 0; /* send_data() passes the number of bytes left in its send queue */ +}; + class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest { Mutex lock; Mutex write_lock; RGWGetDataCB *cb{nullptr}; + RGWWriteDrainCB *write_drain_cb{nullptr}; /* null until set_write_drain_cb(); send_data() skips the notification when null */ bufferlist outbl; bufferlist in_data; size_t chunk_ofs{0}; @@ -103,11 +111,14 @@ public: } void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; } + void set_write_drain_cb(RGWWriteDrainCB *_cb) { write_drain_cb = _cb; } void add_send_data(bufferlist& bl); void set_stream_write(bool s); + uint64_t get_pending_send_size(); + /* finish streaming writes */ void finish_write(); }; -- 2.39.5