From 2fbf80ff6ec6662ade4417b3eedd1761b9ddad3f Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 9 Jul 2015 16:27:41 -0700 Subject: [PATCH] rgw: more http aio stuff Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_http_client.cc | 60 +++++++++++++++++++++++++++++++------- src/rgw/rgw_http_client.h | 5 +++- src/rgw/rgw_rest_client.cc | 17 +++++++---- src/rgw/rgw_rest_client.h | 2 +- src/rgw/rgw_rest_conn.cc | 24 +++++++++++++-- src/rgw/rgw_rest_conn.h | 4 ++- src/rgw/rgw_sync.cc | 16 ++++++++-- 7 files changed, 105 insertions(+), 23 deletions(-) diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index 6043998eaf2b4..923dd8ca16431 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -173,10 +173,16 @@ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_re #if HAVE_CURL_MULTI_WAIT -static int do_curl_wait(CephContext *cct, CURLM *handle) +static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd) { int num_fds; - int ret = curl_multi_wait(handle, NULL, 0, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds); + struct curl_waitfd wait_fd; + + wait_fd.fd = signal_fd; + wait_fd.events = CURL_WAIT_POLLIN; + wait_fd.revents = 0; + + int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds); if (ret) { dout(0) << "ERROR: curl_multi_wait() returned " << ret << dendl; return -EIO; @@ -224,6 +230,7 @@ static int do_curl_wait(CephContext *cct, CURLM *handle) } #endif +#warning need to fix do_curl_wait() in second case void *RGWHTTPManager::ReqsThread::entry() { @@ -236,6 +243,8 @@ RGWHTTPManager::RGWHTTPManager(CephContext *_cct) : cct(_cct), is_threaded(false reqs_thread(NULL) { multi_handle = (void *)curl_multi_init(); + thread_pipe[0] = -1; + thread_pipe[1] = -1; } RGWHTTPManager::~RGWHTTPManager() { @@ -247,6 +256,7 @@ void RGWHTTPManager::register_request(rgw_http_req_data *req_data) { RWLock::WLocker rl(reqs_lock); req_data->id = num_reqs; +ldout(cct, 0) << __FILE__ << ":" << __LINE__ << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; reqs[num_reqs] = req_data; num_reqs++; } @@ -270,10 +280,10 @@ void RGWHTTPManager::finish_request(rgw_http_req_data *req_data) int RGWHTTPManager::link_request(rgw_http_req_data *req_data) { +ldout(cct, 0) << __FILE__ << ":" << __LINE__ << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle); if (mstatus) { dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl; - delete req_data; return -EIO; } return 0; @@ -293,12 +303,14 @@ void RGWHTTPManager::link_pending_requests() map::iterator iter = reqs.find(max_threaded_req); for (; iter != reqs.end(); ++iter) { - int r = link_request(iter->second); + rgw_http_req_data *req_data = iter->second; + int r = link_request(req_data); if (r < 0) { ldout(cct, 0) << "ERROR: failed to link http request" << dendl; #warning FIXME: need to send back error on request + delete req_data; } - max_threaded_req = iter->first; + max_threaded_req = iter->first + 1; } } @@ -315,9 +327,13 @@ int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const if (!is_threaded) { ret = link_request(req_data); - if (ret < 0) { - return ret; - } + } else { + ret = signal_thread(); + } + if (ret < 0) { + delete req_data; +#warning should drop reference here + return ret; } return 0; @@ -325,12 +341,14 @@ int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const int RGWHTTPManager::process_requests(bool wait_for_data, bool *done) { + assert(!is_threaded); + int still_running; int mstatus; do { if (wait_for_data) { - int ret = do_curl_wait(cct, (CURLM *)multi_handle); + int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1); if (ret < 0) { return ret; } @@ -380,11 +398,19 @@ int RGWHTTPManager::complete_requests() return ret; } -void RGWHTTPManager::set_threaded() +int RGWHTTPManager::set_threaded() { is_threaded = true; reqs_thread = new ReqsThread(this); reqs_thread->create(); + + int r = pipe(thread_pipe); + if (r < 0) { + r = -errno; + ldout(cct, 0) << "ERROR: pipe() returned errno=" << r << dendl; + return r; + } + return 0; } void RGWHTTPManager::stop() @@ -396,6 +422,18 @@ void RGWHTTPManager::stop() } } +int RGWHTTPManager::signal_thread() +{ + uint32_t buf = 0; + int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf)); + if (ret < 0) { + ret = -errno; + ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl; + return ret; + } + return 0; +} + void *RGWHTTPManager::reqs_thread_entry() { int still_running; @@ -404,7 +442,7 @@ void *RGWHTTPManager::reqs_thread_entry() ldout(cct, 0) << __func__ << ": start" << dendl; while (!going_down.read()) { - int ret = do_curl_wait(cct, (CURLM *)multi_handle); + int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]); if (ret < 0) { dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl; return NULL; diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index d9aa6cb9dbc8b..9b36dc722d81e 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -54,6 +54,7 @@ class RGWHTTPManager { map reqs; int64_t num_reqs; int64_t max_threaded_req; + int thread_pipe[2]; void register_request(rgw_http_req_data *req_data); void unregister_request(rgw_http_req_data *req_data); @@ -74,11 +75,13 @@ class RGWHTTPManager { void *reqs_thread_entry(); + int signal_thread(); + public: RGWHTTPManager(CephContext *_cct); ~RGWHTTPManager(); - void set_threaded(); + int set_threaded(); void stop(); int add_request(RGWHTTPClient *client, const char *method, const char *url); diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index 59aa3a444772d..0a2b814e79ec4 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -550,7 +550,7 @@ int RGWRESTStreamReadRequest::get_obj(RGWAccessKey& key, map& ex return get_resource(key, extra_headers, resource); } -int RGWRESTStreamReadRequest::get_resource(RGWAccessKey& key, map& extra_headers, const string& resource) +int RGWRESTStreamReadRequest::get_resource(RGWAccessKey& key, map& extra_headers, const string& resource, RGWHTTPManager *mgr) { string new_url = url; if (new_url[new_url.size() - 1] != '/') @@ -602,13 +602,20 @@ int RGWRESTStreamReadRequest::get_resource(RGWAccessKey& key, map(iter->first, iter->second)); } - int r = http_manager.add_request(this, new_info.method, new_url.c_str()); + RGWHTTPManager *pmanager = &http_manager; + if (mgr) { + pmanager = mgr; + } + + int r = pmanager->add_request(this, new_info.method, new_url.c_str()); if (r < 0) return r; - r = http_manager.complete_requests(); - if (r < 0) - return ret; + if (!mgr) { + r = pmanager->complete_requests(); + if (r < 0) + return ret; + } return 0; } diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index 62b1a887485db..a88f0f1e72312 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -93,7 +93,7 @@ public: chunk_ofs(0), ofs(0), http_manager(_cct) {} ~RGWRESTStreamReadRequest() {} int get_obj(RGWAccessKey& key, map& extra_headers, rgw_obj& obj); - int get_resource(RGWAccessKey& key, map& extra_headers, const string& resource); + int get_resource(RGWAccessKey& key, map& extra_headers, const string& resource, RGWHTTPManager *mgr = NULL); int complete(string& etag, time_t *mtime, map& attrs); void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; } diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc index 283882209680d..ed435995691b1 100644 --- a/src/rgw/rgw_rest_conn.cc +++ b/src/rgw/rgw_rest_conn.cc @@ -135,7 +135,8 @@ public: int RGWRESTConn::get_resource(const string& resource, list > *extra_params, map *extra_headers, - bufferlist& bl) + bufferlist& bl, + RGWHTTPManager *mgr) { string url; int ret = get_url(url); @@ -165,7 +166,7 @@ int RGWRESTConn::get_resource(const string& resource, } } - ret = req.get_resource(key, headers, resource); + ret = req.get_resource(key, headers, resource, mgr); if (ret < 0) { ldout(cct, 0) << __func__ << ": get_resource() resource=" << resource << " returned ret=" << ret << dendl; return ret; @@ -176,4 +177,23 @@ int RGWRESTConn::get_resource(const string& resource, return req.complete(etag, NULL, attrs); } +int RGWRESTConn::send_get_resource(const string& resource, const rgw_http_param_pair *pp, + bufferlist &bl, RGWHTTPManager *mgr) +{ + list > params; + + while (pp && pp->key) { + string k = pp->key; + string v = (pp->val ? pp->val : ""); + params.push_back(make_pair(k, v)); + ++pp; + } + + int ret = get_resource(resource, ¶ms, NULL, bl, mgr); + if (ret < 0) { + return ret; + } + + return 0; +} diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index 6c7a2bb9beeb0..997cca970e5c4 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -61,8 +61,10 @@ public: int get_resource(const string& resource, list > *extra_params, map* extra_headers, - bufferlist& bl); + bufferlist& bl, RGWHTTPManager *mgr = NULL); + int send_get_resource(const string& resource, const rgw_http_param_pair *pp, + bufferlist &bl, RGWHTTPManager *mgr); template int get_json_resource(const string& resource, list > *params, T& t); template diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 5635d8a7ae04d..1fcac5c1a6969 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -68,6 +68,13 @@ int RGWRemoteMetaLog::init() ut.shard_id = i; ts_to_shard[ut] = i; } + + ret = http_manager.set_threaded(); + if (ret < 0) { + ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl; + return ret; + } + return 0; } @@ -192,13 +199,18 @@ int RGWRemoteMetaLog::clone_shard(int shard_id, const string& marker, string *ne { marker_key, marker.c_str() }, { NULL, NULL } }; - rgw_mdlog_shard_data data; - int ret = conn->get_json_resource("/admin/log", pairs, data); + bufferlist bl; + int ret = conn->send_get_resource("/admin/log", pairs, bl, &http_manager); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to fetch mdlog data" << dendl; return ret; } +#warning removeme +sleep(7); + + rgw_mdlog_shard_data data; + ldout(store->ctx(), 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl; *truncated = ((int)data.entries.size() == max_entries); -- 2.39.5