From 66c1e52b4616a85c5e842abc228c70715c0e5971 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 16 Jul 2015 16:43:25 -0700 Subject: [PATCH] rgw: can now wait on async rest requests In addition to waiting on a specific request, added a completion manager that we can wait on so that we get which request is ready. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_http_client.cc | 55 ++++++++++++++++++++++++++++++-------- src/rgw/rgw_http_client.h | 34 ++++++++++++++++++++--- src/rgw/rgw_rest_conn.h | 7 +++++ 3 files changed, 82 insertions(+), 14 deletions(-) diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index 615548a4c63dc..39e53c0f07c15 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -294,7 +294,36 @@ void *RGWHTTPManager::ReqsThread::entry() return NULL; } -RGWHTTPManager::RGWHTTPManager(CephContext *_cct) : cct(_cct), is_threaded(false), +void RGWCompletionManager::complete(void *user_info) +{ + Mutex::Locker l(lock); + complete_reqs.push_back(user_info); + cond.Signal(); +} + +int RGWCompletionManager::get_next(void **user_info) +{ + Mutex::Locker l(lock); + while (complete_reqs.empty()) { + cond.Wait(lock); + if (going_down.read() != 0) { + return -ECANCELED; + } + } + *user_info = complete_reqs.front(); + complete_reqs.pop_front(); + return 0; +} + +void RGWCompletionManager::go_down() +{ + Mutex::Locker l(lock); + going_down.set(1); + cond.Signal(); +} + +RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct), + completion_mgr(_cm), is_threaded(false), reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0), reqs_thread(NULL) { @@ -329,8 +358,10 @@ void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data) if (iter != reqs.end()) { reqs.erase(iter); } - complete_reqs[req_data->id] = req_data; -dout(0) << __FILE__ << ":" << __LINE__ << ": _complete_request() id=" << req_data->id << " complete_reqs.size()=" << complete_reqs.size() << dendl; + if (completion_mgr) { + completion_mgr->complete(req_data->client->get_user_info()); + } + req_data->put(); } void RGWHTTPManager::remove_request(uint64_t id) @@ -340,11 +371,6 @@ void RGWHTTPManager::remove_request(uint64_t id) if (iter != reqs.end()) { reqs.erase(iter); } - iter = complete_reqs.find(id); - if (iter == reqs.end()) { - complete_reqs.erase(iter); - } -dout(0) << __FILE__ << ":" << __LINE__ << ": remove_request() id=" << id << dendl; } void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret) @@ -400,7 +426,7 @@ int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const int ret = client->init_request(method, url, req_data); if (ret < 0) { - delete req_data; + req_data->put(); return ret; } @@ -458,7 +484,6 @@ int RGWHTTPManager::process_requests(bool wait_for_data, bool *done) int status = rgw_http_error_to_errno(http_status); finish_request(req_data, status); - req_data->put(); switch (msg->data.result) { case CURLE_OK: break; @@ -560,7 +585,6 @@ void *RGWHTTPManager::reqs_thread_entry() int status = rgw_http_error_to_errno(http_status); finish_request(req_data, status); - req_data->put(); switch (msg->data.result) { case CURLE_OK: break; @@ -571,6 +595,15 @@ void *RGWHTTPManager::reqs_thread_entry() } } } + + map::iterator iter = reqs.begin(); + for (; iter != reqs.end(); ++iter) { + finish_request(iter->second, -ECANCELED); + } + + if (completion_mgr) { + completion_mgr->go_down(); + } return 0; diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index 95c5d21bea598..90ca69ef4901f 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -5,6 +5,7 @@ #define CEPH_RGW_HTTP_CLIENT_H #include "common/RWLock.h" +#include "common/Cond.h" #include "include/atomic.h" #include "rgw_common.h" @@ -21,15 +22,25 @@ class RGWHTTPClient rgw_http_req_data *req_data; + void *user_info; + protected: CephContext *cct; list > headers; int init_request(const char *method, const char *url, rgw_http_req_data *req_data); public: - explicit RGWHTTPClient(CephContext *_cct): send_len (0), has_send_len(false), req_data(NULL), cct(_cct) {} + explicit RGWHTTPClient(CephContext *_cct): send_len (0), has_send_len(false), req_data(NULL), user_info(NULL), cct(_cct) {} virtual ~RGWHTTPClient(); + void set_user_info(void *info) { + user_info = info; + } + + void *get_user_info() { + return user_info; + } + void append_header(const string& name, const string& val) { headers.push_back(pair(name, val)); } @@ -50,15 +61,32 @@ public: rgw_http_req_data *get_req_data() { return req_data; } }; +class RGWCompletionManager { + list complete_reqs; + + Mutex lock; + Cond cond; + + atomic_t going_down; + +public: + RGWCompletionManager() : lock("RGWCompletionManager::lock") {} + + void complete(void *user_info); + int get_next(void **user_info); + + void go_down(); +}; + class RGWHTTPManager { CephContext *cct; + RGWCompletionManager *completion_mgr; void *multi_handle; bool is_threaded; atomic_t going_down; RWLock reqs_lock; map reqs; - map complete_reqs; int64_t num_reqs; int64_t max_threaded_req; int thread_pipe[2]; @@ -87,7 +115,7 @@ class RGWHTTPManager { int signal_thread(); public: - RGWHTTPManager(CephContext *_cct); + RGWHTTPManager(CephContext *_cct, RGWCompletionManager *completion_mgr = NULL); ~RGWHTTPManager(); int set_threaded(); diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index 6f7ce32398f73..2606c1a452340 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -143,6 +143,13 @@ public: list > *extra_headers, RGWHTTPManager *_mgr); + void set_user_info(void *user_info) { + req.set_user_info(user_info); + } + void *get_user_info() { + return req.get_user_info(); + } + template int decode_resource(T *dest); -- 2.39.5