From aa4fd878ad682b3e8740aa5b2a48f8c7c06a0382 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 28 Aug 2017 05:56:19 -0700 Subject: [PATCH] rgw: http client, simplify interfaces work towards removal of duplicate synchronous api Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_http_client.cc | 183 ++++++++---------------------- src/rgw/rgw_http_client.h | 5 - src/rgw/rgw_rados.cc | 2 +- src/rgw/rgw_rest_client.cc | 126 ++++---------------- src/rgw/rgw_rest_client.h | 39 +++---- src/rgw/rgw_rest_conn.cc | 21 ++-- src/rgw/rgw_rest_conn.h | 4 +- src/test/rgw/test_http_manager.cc | 4 +- 8 files changed, 100 insertions(+), 284 deletions(-) diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index 32ca8613329..7535633d2ec 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -23,8 +23,12 @@ RGWHTTPManager *rgw_http_manager; +struct RGWCurlHandle; + +static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle); + struct rgw_http_req_data : public RefCountedObject { - CURL *easy_handle{nullptr}; + RGWCurlHandle *curl_handle{nullptr}; curl_slist *h{nullptr}; uint64_t id; int ret{0}; @@ -64,7 +68,7 @@ struct rgw_http_req_data : public RefCountedObject { /* shouldn't really be here */ return; } - rc = curl_easy_pause(easy_handle, bitmask); + rc = curl_easy_pause(**curl_handle, bitmask); if (rc != CURLE_OK) { dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl; } @@ -74,13 +78,13 @@ struct rgw_http_req_data : public RefCountedObject { void finish(int r) { Mutex::Locker l(lock); ret = r; - if (easy_handle) - curl_easy_cleanup(easy_handle); + if (curl_handle) + do_curl_easy_cleanup(curl_handle); if (h) curl_slist_free_all(h); - easy_handle = NULL; + curl_handle = NULL; h = NULL; done = true; cond.Signal(); @@ -99,6 +103,8 @@ struct rgw_http_req_data : public RefCountedObject { Mutex::Locker l(lock); return mgr; } + + CURL *get_easy_handle() const; }; struct RGWCurlHandle { @@ -112,6 +118,16 @@ struct RGWCurlHandle { } }; +void rgw_http_req_data::set_state(int bitmask) { + /* no need to lock here, moreover curl_easy_pause() might trigger + * the data receive callback :/ + */ + CURLcode rc = curl_easy_pause(**curl_handle, bitmask); + if (rc != CURLE_OK) { + dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl; + } +} + #define MAXIDLE 5 class RGWCurlHandles : public Thread { public: @@ -214,7 +230,23 @@ void RGWCurlHandles::flush_curl_handles() saved_curl.shrink_to_fit(); } +CURL *rgw_http_req_data::get_easy_handle() const +{ + return **curl_handle; +} + static RGWCurlHandles *handles; + +static RGWCurlHandle *do_curl_easy_init() +{ + return handles->get_curl_handle(); +} + +static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle) +{ + handles->release_curl_handle(curl_handle); +} + // XXX make this part of the token cache? (but that's swift-only; // and this especially needs to integrates with s3...) @@ -422,59 +454,11 @@ static bool is_upload_request(const string& method) } /* - * process a single simple one off request, not going through RGWHTTPManager. Not using - * req_data. + * process a single simple one off request */ int RGWHTTPClient::process() { - int ret = 0; - CURL *curl_handle; - - char error_buf[CURL_ERROR_SIZE]; - - auto ca = handles->get_curl_handle(); - curl_handle = **ca; - - dout(20) << "sending request to " << url << dendl; - - curl_slist *h = headers_to_slist(headers); - - curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method.c_str()); - curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L); - curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L); - curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header); - curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this); - curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data); - curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this); - curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf); - if (h) { - curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h); - } - curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data); - curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this); - if (is_upload_request(method)) { - curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L); - } - if (has_send_len) { - curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len); - } - if (!verify_ssl) { - curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L); - curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L); - dout(20) << "ssl verification is set to off" << dendl; - } - - CURLcode status = curl_easy_perform(curl_handle); - if (status) { - dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl; - ret = -EINVAL; - } - curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status); - handles->release_curl_handle(ca); - curl_slist_free_all(h); - - return ret; + return RGWHTTP::process(this); } string RGWHTTPClient::to_str() @@ -502,11 +486,9 @@ int RGWHTTPClient::init_request(rgw_http_req_data *_req_data, bool send_data_hin _req_data->get(); req_data = _req_data; - CURL *easy_handle; - - easy_handle = curl_easy_init(); + req_data->curl_handle = do_curl_easy_init(); - req_data->easy_handle = easy_handle; + CURL *easy_handle = req_data->get_easy_handle(); dout(20) << "sending request to " << url << dendl; @@ -806,7 +788,7 @@ void RGWHTTPManager::register_request(rgw_http_req_data *req_data) req_data->registered = true; reqs[num_reqs] = req_data; num_reqs++; - ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; + ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl; } void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data) @@ -815,7 +797,7 @@ void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data) req_data->get(); req_data->registered = false; unregistered_reqs.push_back(req_data); - ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; + ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl; } void RGWHTTPManager::complete_request(rgw_http_req_data *req_data) @@ -862,8 +844,8 @@ void RGWHTTPManager::_set_req_state(set_state& ss) */ int RGWHTTPManager::link_request(rgw_http_req_data *req_data) { - ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; - CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle); + ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl; + CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle()); if (mstatus) { dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl; return -EIO; @@ -877,8 +859,8 @@ int RGWHTTPManager::link_request(rgw_http_req_data *req_data) */ void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data) { - if (req_data->easy_handle) { - curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle); + if (req_data->curl_handle) { + curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle()); } if (!req_data->is_done()) { _finish_request(req_data, -ECANCELED); @@ -1031,77 +1013,6 @@ int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetSt return 0; } -/* - * the synchronous, non-threaded request processing method. - */ -int RGWHTTPManager::process_requests(bool wait_for_data, bool *done) -{ - assert(!is_threaded); - - int still_running; - int mstatus; - - do { - if (wait_for_data) { - int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1); - if (ret < 0) { - return ret; - } - } - - mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running); - switch (mstatus) { - case CURLM_OK: - case CURLM_CALL_MULTI_PERFORM: - break; - default: - dout(20) << "curl_multi_perform returned: " << mstatus << dendl; - return -EINVAL; - } - int msgs_left; - CURLMsg *msg; - while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) { - if (msg->msg == CURLMSG_DONE) { - CURL *e = msg->easy_handle; - rgw_http_req_data *req_data; - curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data); - - long http_status; - curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status); - - int status = rgw_http_error_to_errno(http_status); - int result = msg->data.result; - finish_request(req_data, status); - switch (result) { - case CURLE_OK: - break; - default: - dout(20) << "ERROR: msg->data.result=" << result << dendl; - return -EIO; - } - } - } - } while (mstatus == CURLM_CALL_MULTI_PERFORM); - - *done = (still_running == 0); - - return 0; -} - -/* - * the synchronous, non-threaded request processing completion method. - */ -int RGWHTTPManager::complete_requests() -{ - bool done = false; - int ret; - do { - ret = process_requests(true, &done); - } while (!done && !ret); - - return ret; -} - int RGWHTTPManager::set_threaded() { int r = pipe(thread_pipe); diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index fbe5d3626c0..ac230bfb2a6 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -313,11 +313,6 @@ public: int add_request(RGWHTTPClient *client, bool send_data_hint = false); int remove_request(RGWHTTPClient *client); int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state); - - /* only for non threaded case */ - int process_requests(bool wait_for_data, bool *done); - - int complete_requests(); }; class RGWHTTP diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 77697cf58d3..fa334ef906a 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -8016,7 +8016,7 @@ int RGWRados::copy_obj_to_remote_dest(RGWObjState *astate, { string etag; - RGWRESTStreamWriteRequest *out_stream_req; + RGWRESTStreamS3PutObj *out_stream_req; int ret = rest_master_conn->put_obj_init(user_id, dest_obj, astate->size, src_attrs, &out_stream_req); if (ret < 0) { diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index 6c2d6c5dda5..af3ce2667bc 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -316,9 +316,9 @@ int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, siz } class RGWRESTStreamOutCB : public RGWGetDataCB { - RGWRESTStreamWriteRequest *req; + RGWRESTStreamS3PutObj *req; public: - explicit RGWRESTStreamOutCB(RGWRESTStreamWriteRequest *_req) : req(_req) {} + explicit RGWRESTStreamOutCB(RGWRESTStreamS3PutObj *_req) : req(_req) {} int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; /* callback for object iteration when sending data */ }; @@ -326,34 +326,21 @@ int RGWRESTStreamOutCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) { dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl; if (!bl_ofs && bl_len == bl.length()) { - return req->add_output_data(bl); + req->add_send_data(bl); + return 0; } bufferptr bp(bl.c_str() + bl_ofs, bl_len); bufferlist new_bl; new_bl.push_back(bp); - return req->add_output_data(new_bl); + req->add_send_data(new_bl); + return 0; } -RGWRESTStreamWriteRequest::~RGWRESTStreamWriteRequest() +RGWRESTStreamS3PutObj::~RGWRESTStreamS3PutObj() { - delete cb; -} - -int RGWRESTStreamWriteRequest::add_output_data(bufferlist& bl) -{ - lock.Lock(); - if (status < 0) { - int ret = status; - lock.Unlock(); - return ret; - } - pending_send.push_back(bl); - lock.Unlock(); - - bool done; - return http_manager.process_requests(false, &done); + delete out_cb; } static void grants_by_type_add_one_grant(map& grants_by_type, int perm, ACLGrant& grant) @@ -426,7 +413,7 @@ static void add_grants_headers(map& grants, RGWEnv& env, map& attrs) +int RGWRESTStreamS3PutObj::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map& attrs) { string resource = obj.bucket.name + "/" + obj.get_oid(); string new_url = url; @@ -493,66 +480,20 @@ int RGWRESTStreamWriteRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uin headers.emplace_back(kv); } - cb = new RGWRESTStreamOutCB(this); + out_cb = new RGWRESTStreamOutCB(this); set_send_length(obj_size); method = new_info.method; url = new_url; - int r = http_manager.add_request(this); + int r = RGWHTTP::send(this); if (r < 0) return r; return 0; } -int RGWRESTStreamWriteRequest::send_data(void *ptr, size_t len) -{ - uint64_t sent = 0; - - dout(20) << "RGWRESTStreamWriteRequest::send_data()" << dendl; - lock.Lock(); - if (pending_send.empty() || status < 0) { - lock.Unlock(); - return status; - } - - list::iterator iter = pending_send.begin(); - while (iter != pending_send.end() && len > 0) { - bufferlist& bl = *iter; - - list::iterator next_iter = iter; - ++next_iter; - lock.Unlock(); - - uint64_t send_len = min(len, (size_t)bl.length()); - - memcpy(ptr, bl.c_str(), send_len); - - ptr = (char *)ptr + send_len; - len -= send_len; - sent += send_len; - - lock.Lock(); - - bufferlist new_bl; - if (bl.length() > send_len) { - bufferptr bp(bl.c_str() + send_len, bl.length() - send_len); - new_bl.append(bp); - } - pending_send.pop_front(); /* need to do this after we copy data from bl */ - if (new_bl.length()) { - pending_send.push_front(new_bl); - } - iter = next_iter; - } - lock.Unlock(); - - return sent; -} - - void set_str_from_headers(map& out_headers, const string& header_name, string& str) { map::iterator iter = out_headers.find(header_name); @@ -594,26 +535,6 @@ static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time * return 0; } -int RGWRESTStreamWriteRequest::complete(string& etag, real_time *mtime) -{ - int ret = http_manager.complete_requests(); - if (ret < 0) - return ret; - - set_str_from_headers(out_headers, "ETAG", etag); - - if (mtime) { - string mtime_str; - set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str); - - ret = parse_rgwx_mtime(cct, mtime_str, mtime); - if (ret < 0) { - return ret; - } - } - return status; -} - int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr) { string urlsafe_bucket, urlsafe_object; @@ -621,11 +542,11 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map& url_encode(obj.key.name, urlsafe_object); string resource = urlsafe_bucket + "/" + urlsafe_object; - return send_request(&key, extra_headers, resource, nullptr, mgr); + return send_request(&key, extra_headers, resource, mgr); } int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map& extra_headers, const string& resource, - bufferlist *send_data, RGWHTTPManager *mgr) + RGWHTTPManager *mgr, bufferlist *send_data) { string new_url = url; if (new_url[new_url.size() - 1] != '/') @@ -688,11 +609,6 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map& send_data_hint = true; } - RGWHTTPManager *pmanager = &http_manager; - if (mgr) { - pmanager = mgr; - } - // Not sure if this is the place to set a send_size, curl otherwise sets // chunked option and doesn't send content length anymore uint64_t send_size = (size_t)(outbl.length() - write_ofs); @@ -705,21 +621,23 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map& method = new_info.method; url = new_url; - int r = pmanager->add_request(this, send_data_hint); - if (r < 0) - return r; - if (!mgr) { - r = pmanager->complete_requests(); - if (r < 0) - return r; + return RGWHTTP::send(this); } + int r = mgr->add_request(this, send_data_hint); + if (r < 0) + return r; + return 0; } int RGWRESTStreamRWRequest::complete_request(string& etag, real_time *mtime, uint64_t *psize, map& attrs) { + int ret = wait(); + if (ret < 0) { + return ret; + } set_str_from_headers(out_headers, "ETAG", etag); if (status >= 0) { if (mtime) { diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index 9c474534e9c..51a7ec4310b 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -67,25 +67,6 @@ public: }; -class RGWRESTStreamWriteRequest : public RGWRESTSimpleRequest { - Mutex lock; - list pending_send; - RGWGetDataCB *cb; - RGWHTTPManager http_manager; -public: - int add_output_data(bufferlist& bl); - int send_data(void *ptr, size_t len) override; - - RGWRESTStreamWriteRequest(CephContext *_cct, const string& _method, const string& _url, param_vec_t *_headers, - param_vec_t *_params) : RGWRESTSimpleRequest(_cct, _method, _url, _headers, _params), - lock("RGWRESTStreamWriteRequest"), cb(NULL), http_manager(_cct) {} - ~RGWRESTStreamWriteRequest() override; - int put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map& attrs); - int complete(string& etag, real_time *mtime); - - RGWGetDataCB *get_out_cb() { return cb; } -}; - class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest { Mutex lock; Mutex write_lock; @@ -125,15 +106,13 @@ public: }; class RGWRESTStreamRWRequest : public RGWHTTPStreamRWRequest { - RGWHTTPManager http_manager; public: RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb, - param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params), - http_manager(_cct) { + param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params) { } virtual ~RGWRESTStreamRWRequest() override {} - int send_request(RGWAccessKey& key, map& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr = NULL); - int send_request(RGWAccessKey *key, map& extra_headers, const string& resource, bufferlist *send_data = NULL /* optional input data */, RGWHTTPManager *mgr = NULL); + int send_request(RGWAccessKey& key, map& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr); + int send_request(RGWAccessKey *key, map& extra_headers, const string& resource, RGWHTTPManager *mgr, bufferlist *send_data = nullptr /* optional input data */); int complete_request(string& etag, real_time *mtime, uint64_t *psize, map& attrs); }; @@ -149,5 +128,17 @@ public: param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, "HEAD", _url, _cb, _headers, _params) {} }; +class RGWRESTStreamS3PutObj : public RGWRESTStreamRWRequest { + RGWGetDataCB *out_cb; +public: + RGWRESTStreamS3PutObj(CephContext *_cct, const string& _method, const string& _url, param_vec_t *_headers, + param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, _method, _url, nullptr, _headers, _params), + out_cb(NULL) {} + ~RGWRESTStreamS3PutObj() override; + int put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map& attrs); + + RGWGetDataCB *get_out_cb() { return out_cb; } +}; + #endif diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc index 0970a252e13..8b168f9a66d 100644 --- a/src/rgw/rgw_rest_conn.cc +++ b/src/rgw/rgw_rest_conn.cc @@ -114,7 +114,7 @@ int RGWRESTConn::forward(const rgw_user& uid, req_info& info, obj_version *objv, } int RGWRESTConn::put_obj_init(const rgw_user& uid, rgw_obj& obj, uint64_t obj_size, - map& attrs, RGWRESTStreamWriteRequest **req) + map& attrs, RGWRESTStreamS3PutObj **req) { string url; int ret = get_url(url); @@ -123,7 +123,7 @@ int RGWRESTConn::put_obj_init(const rgw_user& uid, rgw_obj& obj, uint64_t obj_si param_vec_t params; populate_params(params, &uid, self_zone_group); - RGWRESTStreamWriteRequest *wr = new RGWRESTStreamWriteRequest(cct, "PUT", url, NULL, ¶ms); + RGWRESTStreamS3PutObj *wr = new RGWRESTStreamS3PutObj(cct, "PUT", url, NULL, ¶ms); ret = wr->put_obj_init(key, obj, obj_size, attrs); if (ret < 0) { delete wr; @@ -133,9 +133,10 @@ int RGWRESTConn::put_obj_init(const rgw_user& uid, rgw_obj& obj, uint64_t obj_si return 0; } -int RGWRESTConn::complete_request(RGWRESTStreamWriteRequest *req, string& etag, real_time *mtime) +int RGWRESTConn::complete_request(RGWRESTStreamS3PutObj *req, string& etag, real_time *mtime) { - int ret = req->complete(etag, mtime); + map attrs; + int ret = req->complete_request(etag, mtime, nullptr, attrs); delete req; return ret; @@ -221,7 +222,7 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw set_header(mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER"); } - int r = (*req)->send_request(key, extra_headers, obj); + int r = (*req)->send_request(key, extra_headers, obj, nullptr); if (r < 0) { delete *req; *req = nullptr; @@ -268,7 +269,7 @@ int RGWRESTConn::get_resource(const string& resource, headers.insert(extra_headers->begin(), extra_headers->end()); } - ret = req.send_request(&key, headers, resource, send_data, mgr); + ret = req.send_request(&key, headers, resource, mgr, send_data); if (ret < 0) { ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl; return ret; @@ -315,7 +316,7 @@ void RGWRESTReadResource::init_common(param_vec_t *extra_headers) int RGWRESTReadResource::read() { - int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr); + int ret = req.send_request(&conn->get_key(), headers, resource, mgr); if (ret < 0) { ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl; return ret; @@ -328,7 +329,7 @@ int RGWRESTReadResource::read() int RGWRESTReadResource::aio_read() { - int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr); + int ret = req.send_request(&conn->get_key(), headers, resource, mgr); if (ret < 0) { ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl; return ret; @@ -376,7 +377,7 @@ void RGWRESTSendResource::init_common(param_vec_t *extra_headers) int RGWRESTSendResource::send(bufferlist& outbl) { req.set_outbl(outbl); - int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr); + int ret = req.send_request(&conn->get_key(), headers, resource, mgr); if (ret < 0) { ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl; return ret; @@ -390,7 +391,7 @@ int RGWRESTSendResource::send(bufferlist& outbl) int RGWRESTSendResource::aio_send(bufferlist& outbl) { req.set_outbl(outbl); - int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr); + int ret = req.send_request(&conn->get_key(), headers, resource, mgr); if (ret < 0) { ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl; return ret; diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index e0d448777b5..7d7556f1689 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -89,8 +89,8 @@ public: /* async request */ int put_obj_init(const rgw_user& uid, rgw_obj& obj, uint64_t obj_size, - map& attrs, RGWRESTStreamWriteRequest **req); - int complete_request(RGWRESTStreamWriteRequest *req, string& etag, ceph::real_time *mtime); + map& attrs, RGWRESTStreamS3PutObj **req); + int complete_request(RGWRESTStreamS3PutObj *req, string& etag, ceph::real_time *mtime); int get_obj(const rgw_user& uid, req_info *info /* optional */, rgw_obj& obj, const ceph::real_time *mod_ptr, const ceph::real_time *unmod_ptr, diff --git a/src/test/rgw/test_http_manager.cc b/src/test/rgw/test_http_manager.cc index bfbcad72999..73626a7d9cf 100644 --- a/src/test/rgw/test_http_manager.cc +++ b/src/test/rgw/test_http_manager.cc @@ -38,8 +38,8 @@ TEST(HTTPManager, SignalThread) constexpr size_t num_requests = max_requests + 1; for (size_t i = 0; i < num_requests; i++) { - RGWHTTPClient client{cct}; - http.add_request(&client, "PUT", "http://127.0.0.1:80"); + RGWHTTPClient client{cct, "PUT", "http://127.0.0.1:80"}; + http.add_request(&client); } } -- 2.39.5