From 387068547a4c95b27845c8670fbeb9fba0d6f577 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 15 Aug 2017 08:36:19 -0700 Subject: [PATCH] rgw: http client, streaming writes api Add a new class that enables streaming writes, that is -- doesn't require having all the data before the start of the send. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_http_client.cc | 133 +++++++++++++++++++++++++++++++++---- src/rgw/rgw_http_client.h | 26 ++++++++ src/rgw/rgw_rest_client.cc | 21 +++++- src/rgw/rgw_rest_client.h | 16 +++-- 4 files changed, 173 insertions(+), 23 deletions(-) diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index f96b7d5c51cfc..68a426c41a632 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -22,23 +22,22 @@ #define dout_subsys ceph_subsys_rgw struct rgw_http_req_data : public RefCountedObject { - CURL *easy_handle; - curl_slist *h; + CURL *easy_handle{nullptr}; + curl_slist *h{nullptr}; uint64_t id; - int ret; + int ret{0}; std::atomic done = { false }; - RGWHTTPClient *client; - void *user_info; - bool registered; - RGWHTTPManager *mgr; + RGWHTTPClient *client{nullptr}; + void *user_info{nullptr}; + bool registered{false}; + RGWHTTPManager *mgr{nullptr}; char error_buf[CURL_ERROR_SIZE]; + bool write_paused{false}; Mutex lock; Cond cond; - rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0), - client(nullptr), user_info(nullptr), registered(false), - mgr(NULL), lock("rgw_http_req_data::lock") { + rgw_http_req_data() : id(-1), lock("rgw_http_req_data::lock") { memset(error_buf, 0, sizeof(error_buf)); } @@ -48,6 +47,27 @@ struct rgw_http_req_data : public RefCountedObject { return ret; } + void set_state(RGWHTTPRequestSetState state) { + Mutex::Locker l(lock); + CURLcode rc; + int bitmask; + switch (state) { + case SET_WRITE_PAUSED: + bitmask = CURLPAUSE_SEND; + break; + case SET_WRITE_RESUME: + bitmask = CURLPAUSE_CONT; + break; + default: + /* shouldn't really be here */ + return; + } + rc = 
curl_easy_pause(easy_handle, bitmask); + if (rc != CURLE_OK) { + dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl; + } + } + void finish(int r) { Mutex::Locker l(lock); @@ -250,12 +270,18 @@ size_t RGWHTTPClient::simple_send_http_data(void * const ptr, void * const _info) { RGWHTTPClient *client = static_cast(_info); - int ret = client->send_data(ptr, size * nmemb); + bool pause = false; + int ret = client->send_data(ptr, size * nmemb, &pause); if (ret < 0) { dout(0) << "WARNING: client->send_data() returned ret=" << ret << dendl; } + if (ret == 0 && + pause) { + return CURL_READFUNC_PAUSE; + } + return ret; } @@ -320,14 +346,42 @@ size_t RGWHTTPClient::send_http_data(void * const ptr, return 0; } - int ret = req_data->client->send_data(ptr, size * nmemb); + bool pause; + + int ret = req_data->client->send_data(ptr, size * nmemb, &pause); if (ret < 0) { dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl; } + if (ret == 0 && + pause) { + req_data->write_paused = true; + return CURL_READFUNC_PAUSE; + } + return ret; } +Mutex& RGWHTTPClient::get_req_lock() +{ + return req_data->lock; +} + +void RGWHTTPClient::_set_write_paused(bool pause) +{ + assert(req_data->lock.is_locked()); + + RGWHTTPManager *mgr = req_data->mgr; + if (pause == req_data->write_paused) { + return; + } + if (pause) { + mgr->set_request_state(this, SET_WRITE_PAUSED); + } else { + mgr->set_request_state(this, SET_WRITE_RESUME); + } +} + static curl_slist *headers_to_slist(param_vec_t& headers) { curl_slist *h = NULL; @@ -512,7 +566,7 @@ int RGWHTTPClient::wait() RGWHTTPClient::~RGWHTTPClient() { if (req_data) { - RGWHTTPManager *http_manager = req_data->get_manager(); + RGWHTTPManager *http_manager = req_data->mgr; if (http_manager) { http_manager->remove_request(this); } @@ -806,6 +860,10 @@ void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret) _complete_request(req_data); } +void RGWHTTPManager::_set_req_state(set_state& ss) +{ + 
ss.req->set_state(ss.state); /* applies the queued state through rgw_http_req_data::set_state() */ +} /* * hook request to the curl multi handle */ @@ -843,7 +901,9 @@ void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data) void RGWHTTPManager::manage_pending_requests() { reqs_lock.get_read(); - if (max_threaded_req == num_reqs && unregistered_reqs.empty()) { + if (max_threaded_req == num_reqs && + unregistered_reqs.empty() && + reqs_change_state.empty()) { /* queued pause/resume changes also count as pending work */ reqs_lock.unlock(); return; } @@ -875,6 +935,13 @@ void RGWHTTPManager::manage_pending_requests() } } + if (!reqs_change_state.empty()) { /* apply every queued pause/resume change, then empty the queue */ + for (auto siter : reqs_change_state) { + _set_req_state(siter); + } + reqs_change_state.clear(); + } + for (auto piter : remove_reqs) { rgw_http_req_data *req_data = piter.first; int r = piter.second; @@ -933,6 +1000,44 @@ int RGWHTTPManager::remove_request(RGWHTTPClient *client) return 0; } +int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state) /* queues a write pause or resume for the request owned by client; returns 0 on success or when already in that state, -EINVAL when not threaded, -EIO for an unknown state, or the negative result of signal_thread() */ +{ + rgw_http_req_data *req_data = client->get_req_data(); + + assert(req_data->lock.is_locked()); /* the caller must already hold the request lock */ + + /* can only do that if threaded */ + if (!is_threaded) { + return -EINVAL; + } + + bool suggested_paused; + switch (state) { + case SET_WRITE_PAUSED: + suggested_paused = true; + break; + case SET_WRITE_RESUME: + suggested_paused = false; + break; + default: + /* shouldn't really be here */ + return -EIO; + } + if (suggested_paused == req_data->write_paused) { /* already in the requested state: nothing to queue */ + return 0; + } + + req_data->write_paused = suggested_paused; /* updated here, before the change is actually applied to the curl handle */ + + reqs_change_state.push_back(set_state(req_data, state)); /* applied later by manage_pending_requests(); NOTE(review): reqs_lock is not taken in this function while that method reads and clears the list -- confirm the locking */ + int ret = signal_thread(); /* presumably wakes the manager thread so the queue is processed; signal_thread() is defined outside this patch -- confirm */ + if (ret < 0) { + return ret; + } + + return 0; +} + /* * the synchronous, non-threaded request processing method. 
*/ diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index 53bb2905c79db..40c4bf15a084e 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -15,6 +15,7 @@ using param_pair_t = pair; using param_vec_t = vector; struct rgw_http_req_data; +class RGWHTTPManager; class RGWHTTPClient { @@ -40,6 +41,8 @@ protected: CephContext *cct; param_vec_t headers; + RGWHTTPManager *get_manager(); /* NOTE(review): declared here, but no definition appears in this patch -- confirm it is defined elsewhere */ + int init_request(const char *method, const char *url, rgw_http_req_data *req_data, @@ -51,6 +54,9 @@ protected: virtual int receive_data(void *ptr, size_t len) { return 0; } + virtual int send_data(void *ptr, size_t len, bool *pause) { /* default forwards to the two-argument overload and never writes *pause */ + return send_data(ptr, len); + } virtual int send_data(void *ptr, size_t len) { return 0; } @@ -82,6 +88,11 @@ protected: size_t size, size_t nmemb, void *_info); + + Mutex& get_req_lock(); /* returns the lock of the underlying request data (req_data->lock) */ + + /* needs to be called under req_lock() */ + void _set_write_paused(bool pause); /* asks the manager to pause (true) or resume (false) sending; does nothing when already in that state */ public: static const long HTTP_STATUS_NOSTATUS = 0; static const long HTTP_STATUS_UNAUTHORIZED = 401; @@ -216,7 +227,19 @@ typedef RGWHTTPTransceiver RGWPostHTTPData; class RGWCompletionManager; +enum RGWHTTPRequestSetState { + SET_NOP = 0, /* not handled explicitly: both switches in this patch send it to their default case */ + SET_WRITE_PAUSED = 1, /* mapped to CURLPAUSE_SEND by rgw_http_req_data::set_state() */ + SET_WRITE_RESUME = 2, /* mapped to CURLPAUSE_CONT by rgw_http_req_data::set_state() */ +}; + class RGWHTTPManager { + struct set_state { /* one queued state change: the target request and the state to apply */ + rgw_http_req_data *req; /* NOTE(review): raw pointer, no reference is taken when queued -- confirm the request cannot be freed before the queue is drained */ + RGWHTTPRequestSetState state; + + set_state(rgw_http_req_data *_req, RGWHTTPRequestSetState _state) : req(_req), state(_state) {} + }; CephContext *cct; RGWCompletionManager *completion_mgr; void *multi_handle; @@ -227,6 +250,7 @@ class RGWHTTPManager { RWLock reqs_lock; map reqs; list unregistered_reqs; + list reqs_change_state; /* filled by set_request_state(), drained by manage_pending_requests() */ map complete_reqs; int64_t num_reqs; int64_t max_threaded_req; @@ -240,6 +264,7 @@ class RGWHTTPManager { void unlink_request(rgw_http_req_data *req_data); void finish_request(rgw_http_req_data *req_data, int r); void _finish_request(rgw_http_req_data *req_data, int r); + void _set_req_state(set_state& ss); /* applies one queued change by calling ss.req->set_state(ss.state) */ int link_request(rgw_http_req_data *req_data); void 
manage_pending_requests(); @@ -268,6 +293,7 @@ public: int add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint = false); int remove_request(RGWHTTPClient *client); + int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state); /* threaded mode only: the implementation returns -EINVAL otherwise */ /* only for non threaded case */ int process_requests(bool wait_for_data, bool *done); diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index 92111725e56c6..20e614adf61a8 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -786,15 +786,30 @@ int RGWRESTStreamRWRequest::receive_data(void *ptr, size_t len) return len; } -int RGWRESTStreamRWRequest::send_data(void *ptr, size_t len) +void RGWRESTStreamRWRequest::add_send_data(bufferlist& bl) /* queues bl for sending (claim_append into outbl) and requests a write resume */ { + Mutex::Locker req_locker(get_req_lock()); /* taken first: _set_write_paused() asserts that the request lock is held */ + Mutex::Locker wl(write_lock); + outbl.claim_append(bl); + _set_write_paused(false); +} + +int RGWRESTStreamRWRequest::send_data(void *ptr, size_t len, bool *pause) /* copies at most len queued bytes into ptr and returns the count; with nothing queued it sets *pause and returns 0 */ +{ + Mutex::Locker wl(write_lock); + if (outbl.length() == 0) { + *pause = true; /* nothing queued: send_http_data() turns ret == 0 with pause set into CURL_READFUNC_PAUSE */ return 0; } - uint64_t send_size = min(len, (size_t)(outbl.length() - write_ofs)); + len = std::min(len, (size_t)outbl.length()); + + bufferlist bl; + outbl.splice(0, len, &bl); /* presumably removes the first len bytes from outbl and hands them to bl (bufferlist::splice) -- confirm */ + uint64_t send_size = bl.length(); if (send_size > 0) { - memcpy(ptr, outbl.c_str() + write_ofs, send_size); + memcpy(ptr, bl.c_str(), send_size); write_ofs += send_size; } return send_size; diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index 09393f8ea4af8..bf64a03f39ad3 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -84,24 +84,26 @@ public: class RGWRESTStreamRWRequest : public RGWRESTSimpleRequest { Mutex lock; + Mutex write_lock; /* held by add_send_data() and send_data() while they touch outbl */ RGWGetDataCB *cb; bufferlist outbl; bufferlist in_data; - size_t chunk_ofs; - size_t ofs; + size_t chunk_ofs{0}; + size_t ofs{0}; RGWHTTPManager http_manager; const char *method; - uint64_t write_ofs; + uint64_t write_ofs{0}; + bool send_paused{false}; /* NOTE(review): not referenced anywhere else in this patch -- confirm it is needed */ protected: int 
handle_header(const string& name, const string& val) override; public: - int send_data(void *ptr, size_t len) override; + int send_data(void *ptr, size_t len, bool *pause) override; /* sets *pause and returns 0 when no data is queued; otherwise returns the number of bytes copied */ int receive_data(void *ptr, size_t len) override; RGWRESTStreamRWRequest(CephContext *_cct, const char *_method, const string& _url, RGWGetDataCB *_cb, param_vec_t *_headers, param_vec_t *_params) : RGWRESTSimpleRequest(_cct, _url, _headers, _params), - lock("RGWRESTStreamReadRequest"), cb(_cb), - chunk_ofs(0), ofs(0), http_manager(_cct), method(_method), write_ofs(0) { + lock("RGWRESTStreamRWRequest"), write_lock("RGWRESTStreamRWRequest::write_lock"), cb(_cb), + http_manager(_cct), method(_method) { /* chunk_ofs, ofs and write_ofs are now set by in-class initializers */ } virtual ~RGWRESTStreamRWRequest() override {} int send_request(RGWAccessKey& key, map& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr = NULL); @@ -113,6 +115,8 @@ public: } void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; } + + void add_send_data(bufferlist& bl); /* queues bl for sending and requests a resume of a paused write */ }; class RGWRESTStreamReadRequest : public RGWRESTStreamRWRequest { -- 2.39.5