From 18bee31a7eef898febd2d37f01e43d511db4fe73 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 9 Jul 2015 14:33:26 -0700 Subject: [PATCH] rgw: threaded http manager groundwork Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_http_client.cc | 152 ++++++++++++++++++++++++++++++------- src/rgw/rgw_http_client.h | 41 ++++++++-- 2 files changed, 160 insertions(+), 33 deletions(-) diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index df43f905c6e51..6043998eaf2b4 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -117,13 +117,13 @@ int RGWHTTPClient::process(const char *method, const char *url) return ret; } -struct multi_req_data { +struct rgw_http_req_data { CURL *easy_handle; curl_slist *h; uint64_t id; - multi_req_data() : easy_handle(NULL), h(NULL), id((uint64_t)-1) {} - ~multi_req_data() { + rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1) {} + ~rgw_http_req_data() { if (easy_handle) curl_easy_cleanup(easy_handle); @@ -132,10 +132,9 @@ struct multi_req_data { } }; -int RGWHTTPClient::init_request(const char *method, const char *url, void *handle) +int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *req_data) { CURL *easy_handle; - multi_req_data *req_data = static_cast(handle); char error_buf[CURL_ERROR_SIZE]; @@ -226,7 +225,16 @@ static int do_curl_wait(CephContext *cct, CURLM *handle) #endif -RGWHTTPManager::RGWHTTPManager(CephContext *_cct) : cct(_cct), reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0) { +void *RGWHTTPManager::ReqsThread::entry() +{ + manager->reqs_thread_entry(); + return NULL; +} + +RGWHTTPManager::RGWHTTPManager(CephContext *_cct) : cct(_cct), is_threaded(false), + reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0), + reqs_thread(NULL) +{ multi_handle = (void *)curl_multi_init(); } @@ -235,55 +243,83 @@ RGWHTTPManager::~RGWHTTPManager() { curl_multi_cleanup((CURLM *)multi_handle); } -void RGWHTTPManager::register_request(void *handle) +void RGWHTTPManager::register_request(rgw_http_req_data *req_data) { - multi_req_data *req_data = static_cast(handle); - RWLock::WLocker rl(reqs_lock); req_data->id = num_reqs; reqs[num_reqs] = req_data; num_reqs++; } -void RGWHTTPManager::unregister_request(void *handle) +void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data) { - multi_req_data *req_data = static_cast(handle); - RWLock::WLocker rl(reqs_lock); - map::iterator iter = reqs.find(req_data->id); + map::iterator iter = reqs.find(req_data->id); if (iter == reqs.end()) { return; } reqs.erase(iter); } -void RGWHTTPManager::finish_request(void *handle) +void RGWHTTPManager::finish_request(rgw_http_req_data *req_data) { - multi_req_data *req_data = static_cast(handle); - unregister_request(handle); + unregister_request(req_data); delete req_data; } -int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url) +int RGWHTTPManager::link_request(rgw_http_req_data *req_data) { - multi_req_data *req_data = new multi_req_data; - void *handle = (void *)req_data; - - int ret = client->init_request(method, url, handle); - if (ret < 0) { - return ret; - } - CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle); if (mstatus) { dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl; delete req_data; return -EIO; } + return 0; +} + +void RGWHTTPManager::link_pending_requests() +{ + reqs_lock.get_read(); + if (max_threaded_req == num_reqs) { + reqs_lock.unlock(); + return; + } + reqs_lock.unlock(); + + RWLock::WLocker wl(reqs_lock); + + map::iterator iter = reqs.find(max_threaded_req); + + for (; iter != reqs.end(); ++iter) { + int r = link_request(iter->second); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to link http request" << dendl; +#warning FIXME: need to send back error on request + } + max_threaded_req = iter->first; + } +} + +int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url) +{ + rgw_http_req_data *req_data = new rgw_http_req_data; + + int ret = client->init_request(method, url, req_data); + if (ret < 0) { + return ret; + } register_request(req_data); + if (!is_threaded) { + ret = link_request(req_data); + if (ret < 0) { + return ret; + } + } + return 0; } @@ -314,7 +350,7 @@ int RGWHTTPManager::process_requests(bool wait_for_data, bool *done) while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) { if (msg->msg == CURLMSG_DONE) { CURL *e = msg->easy_handle; - multi_req_data *req_data; + rgw_http_req_data *req_data; curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data); finish_request(req_data); switch (msg->data.result) { @@ -344,4 +380,68 @@ int RGWHTTPManager::complete_requests() return ret; } +void RGWHTTPManager::set_threaded() +{ + is_threaded = true; + reqs_thread = new ReqsThread(this); + reqs_thread->create(); +} + +void RGWHTTPManager::stop() +{ + if (is_threaded) { + going_down.set(1); + reqs_thread->join(); + delete reqs_thread; + } +} + +void *RGWHTTPManager::reqs_thread_entry() +{ + int still_running; + int mstatus; + + ldout(cct, 0) << __func__ << ": start" << dendl; + + while (!going_down.read()) { + int ret = do_curl_wait(cct, (CURLM *)multi_handle); + if (ret < 0) { + dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl; + return NULL; + } + + link_pending_requests(); + + mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running); + switch (mstatus) { + case CURLM_OK: + case CURLM_CALL_MULTI_PERFORM: + break; + default: + dout(10) << "curl_multi_perform returned: " << mstatus << dendl; + break; + } + int msgs_left; + CURLMsg *msg; + while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) { + if (msg->msg == CURLMSG_DONE) { + CURL *e = msg->easy_handle; + rgw_http_req_data *req_data; + curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data); + finish_request(req_data); + switch (msg->data.result) { + case CURLE_OK: + break; + default: + dout(20) << "ERROR: msg->data.result=" << msg->data.result << dendl; + break; + } + } + } + } + + + return 0; +} + diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index 5ba32189d23eb..d9aa6cb9dbc8b 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -5,8 +5,11 @@ #define CEPH_RGW_HTTP_CLIENT_H #include "common/RWLock.h" +#include "include/atomic.h" #include "rgw_common.h" +struct rgw_http_req_data; + class RGWHTTPClient { friend class RGWHTTPManager; @@ -19,7 +22,7 @@ protected: CephContext *cct; list > headers; - int init_request(const char *method, const char *url, void *handle); + int init_request(const char *method, const char *url, rgw_http_req_data *req_data); public: virtual ~RGWHTTPClient() {} explicit RGWHTTPClient(CephContext *_cct): send_len (0), has_send_len(false), cct(_cct) {} @@ -44,21 +47,45 @@ public: class RGWHTTPManager { CephContext *cct; void *multi_handle; - - void register_request(void *handle); - void unregister_request(void *handle); - void finish_request(void *handle); + bool is_threaded; + atomic_t going_down; RWLock reqs_lock; - map reqs; - uint64_t num_reqs; + map reqs; + int64_t num_reqs; + int64_t max_threaded_req; + + void register_request(rgw_http_req_data *req_data); + void unregister_request(rgw_http_req_data *req_data); + void finish_request(rgw_http_req_data *req_data); + int link_request(rgw_http_req_data *req_data); + + void link_pending_requests(); + + class ReqsThread : public Thread { + RGWHTTPManager *manager; + + public: + ReqsThread(RGWHTTPManager *_m) : manager(_m) {} + void *entry(); + }; + + ReqsThread *reqs_thread; + + void *reqs_thread_entry(); + public: RGWHTTPManager(CephContext *_cct); ~RGWHTTPManager(); + void set_threaded(); + void stop(); + int add_request(RGWHTTPClient *client, const char *method, const char *url); + /* only for non threaded case */ int process_requests(bool wait_for_data, bool *done); + int complete_requests(); }; -- 2.39.5