From: Yehuda Sadeh Date: Sat, 4 Nov 2017 03:26:31 +0000 (-0700) Subject: rgw: cr rest splice, both reads and writes are throttled X-Git-Tag: v13.1.0~270^2~62 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=4bcd842df14bd1ed8a090e8e96a468c419e83f33;p=ceph.git rgw: cr rest splice, both reads and writes are throttled Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc index da812a9bd5c37..cfb1241ffebb4 100644 --- a/src/rgw/rgw_cr_rest.cc +++ b/src/rgw/rgw_cr_rest.cc @@ -14,12 +14,17 @@ class RGWCRHTTPGetDataCB : public RGWHTTPStreamRWRequest::ReceiveCB { Mutex lock; RGWCoroutinesEnv *env; RGWCoroutine *cr; + RGWHTTPStreamRWRequest *req; rgw_io_id io_id; bufferlist data; bufferlist extra_data; bool got_all_extra_data{false}; + bool paused{false}; public: - RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, const rgw_io_id& _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {} + RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, RGWHTTPStreamRWRequest *_req) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), req(_req) { + io_id = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ |RGWHTTPClient::HTTPCLIENT_IO_CONTROL); + req->set_in_cb(this); + } int handle_data(bufferlist& bl, bool *pause) override { { @@ -40,25 +45,39 @@ public: data.append(bl); } -#define GET_DATA_WINDOW_SIZE 1 * 1024 * 1024 - if (data.length() >= GET_DATA_WINDOW_SIZE) { +#define GET_DATA_WINDOW_SIZE 2 * 1024 * 1024 + uint64_t data_len = data.length(); + if (data_len >= GET_DATA_WINDOW_SIZE) { env->manager->io_complete(cr, io_id); } + if (data_len >= 2 * GET_DATA_WINDOW_SIZE) { + *pause = true; + paused = true; + } return 0; } void claim_data(bufferlist *dest, uint64_t max) { - Mutex::Locker l(lock); + bool need_to_unpause = false; - if (data.length() == 0) { - return; - } + { + Mutex::Locker l(lock); - if (data.length() < max) { - max = data.length(); + if (data.length() == 0) { + return; + } + + if (data.length() < max) { + max = data.length(); + } + + data.splice(0, max, dest); + need_to_unpause = (paused && data.length() <= GET_DATA_WINDOW_SIZE); } - data.splice(0, max, dest); + if (need_to_unpause) { + req->unpause_receive(); + } } bufferlist& get_extra_data() { @@ -84,9 +103,7 @@ int RGWStreamReadHTTPResourceCRF::init() { env->stack->init_new_io(req); - in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ |RGWHTTPClient::HTTPCLIENT_IO_CONTROL)); - - req->set_in_cb(in_cb); + in_cb = new RGWCRHTTPGetDataCB(env, caller, req); int r = http_manager->add_request(req); if (r < 0) { @@ -215,7 +232,7 @@ int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data, bool *io_pending) /* it's ok to unlock here, even if io_complete() arrives before io_block(), it'll wakeup * correctly */ } - yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ | RGWHTTPClient::HTTPCLIENT_IO_CONTROL)); + yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE | RGWHTTPClient::HTTPCLIENT_IO_CONTROL)); } yield req->add_send_data(data); } diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index 39d585bd5b5cc..d4eadaac03dd0 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -55,28 +55,7 @@ struct rgw_http_req_data : public RefCountedObject { return ret; } - void set_state(int bitmask) { - Mutex::Locker l(lock); - CURLcode rc; - int bitmask; - switch (state) { - case SET_WRITE_PAUSED: - bitmask = CURLPAUSE_SEND; - break; - case SET_WRITE_RESUME: - bitmask = CURLPAUSE_CONT; - break; - default: - /* shouldn't really be here */ - return; - } - - rc = curl_easy_pause(**curl_handle, bitmask); - if (rc != CURLE_OK) { - dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl; - } - } - + void set_state(int bitmask); void finish(int r) { Mutex::Locker l(lock); @@ -313,7 +292,7 @@ size_t RGWHTTPClient::receive_http_data(void * const ptr, bool pause = false; - size_t skip_bytes = req_data->client->receive_pause_skip; + size_t& skip_bytes = req_data->client->receive_pause_skip; if (skip_bytes >= len) { skip_bytes -= len; @@ -334,7 +313,9 @@ size_t RGWHTTPClient::receive_http_data(void * const ptr, skip_bytes = 0; if (pause) { + dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl; skip_bytes = len; + req_data->read_paused = true; return CURL_WRITEFUNC_PAUSE; } @@ -983,8 +964,9 @@ int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetSt return -EINVAL; } - bool suggested_wr_paused; - bool suggested_rd_paused; + bool suggested_wr_paused = req_data->write_paused; + bool suggested_rd_paused = req_data->read_paused; + switch (state) { case SET_WRITE_PAUSED: suggested_wr_paused = true; diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index cdd33e01c5bd2..9c64feb19aa73 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -65,7 +65,6 @@ class RGWHTTPClient : public RGWIOProvider bufferlist send_bl; bufferlist::iterator send_iter; - size_t send_len; bool has_send_len; long http_status; size_t receive_pause_skip{0}; /* how many bytes to skip next time receive_data is called @@ -85,6 +84,8 @@ protected: string method; string url; + size_t send_len{0}; + param_vec_t headers; RGWHTTPManager *get_manager(); @@ -139,8 +140,7 @@ public: explicit RGWHTTPClient(CephContext *cct, const string& _method, const string& _url) - : send_len(0), - has_send_len(false), + : has_send_len(false), http_status(HTTP_STATUS_NOSTATUS), req_data(nullptr), verify_ssl(cct->_conf->rgw_verify_ssl), diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index c802e81bee4e2..9baf60aa079c3 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -752,8 +752,10 @@ int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len, bool *pause) size_t orig_len = len; if (cb) { - bufferptr bp((const char *)ptr, len); - in_data.append(bp); + in_data.append((const char *)ptr, len); + + size_t orig_in_data_len = in_data.length(); + int ret = cb->handle_data(in_data, pause); if (ret < 0) return ret; @@ -761,9 +763,13 @@ int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len, bool *pause) in_data.clear(); } else { /* partial read */ + assert(in_data.length() <= orig_in_data_len); len = ret; bufferlist bl; - in_data.splice(0, len, &bl); + size_t left_to_read = orig_in_data_len - len; + if (in_data.length() > left_to_read) { + in_data.splice(0, in_data.length() - left_to_read, &bl); + } } } ofs += len; @@ -775,6 +781,14 @@ void RGWHTTPStreamRWRequest::set_stream_write(bool s) { stream_writes = s; } +void RGWHTTPStreamRWRequest::unpause_receive() +{ + Mutex::Locker req_locker(get_req_lock()); + if (!read_paused) { + _set_read_paused(false); + } +} + void RGWHTTPStreamRWRequest::add_send_data(bufferlist& bl) { Mutex::Locker req_locker(get_req_lock()); @@ -805,7 +819,8 @@ int RGWHTTPStreamRWRequest::send_data(void *ptr, size_t len, bool *pause) Mutex::Locker wl(write_lock); if (outbl.length() == 0) { - if (stream_writes && !write_stream_complete) { + if ((stream_writes && !write_stream_complete) || + (write_ofs < send_len)) { *pause = true; } return 0; diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index 816c87474eddb..5d033ab41e5a7 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -130,6 +130,8 @@ public: void set_in_cb(ReceiveCB *_cb) { cb = _cb; } void set_write_drain_cb(RGWWriteDrainCB *_cb) { write_drain_cb = _cb; } + void unpause_receive(); + void add_send_data(bufferlist& bl); void set_stream_write(bool s);