RGWHTTPManager *rgw_http_manager;
+struct RGWCurlHandle;
+
+static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle);
+
struct rgw_http_req_data : public RefCountedObject {
- CURL *easy_handle{nullptr};
+ RGWCurlHandle *curl_handle{nullptr};
curl_slist *h{nullptr};
uint64_t id;
int ret{0};
/* shouldn't really be here */
return;
}
- rc = curl_easy_pause(easy_handle, bitmask);
+ rc = curl_easy_pause(**curl_handle, bitmask);
if (rc != CURLE_OK) {
dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
}
void finish(int r) {
Mutex::Locker l(lock);
ret = r;
- if (easy_handle)
- curl_easy_cleanup(easy_handle);
+ if (curl_handle)
+ do_curl_easy_cleanup(curl_handle);
if (h)
curl_slist_free_all(h);
- easy_handle = NULL;
+ curl_handle = NULL;
h = NULL;
done = true;
cond.Signal();
Mutex::Locker l(lock);
return mgr;
}
+
+ CURL *get_easy_handle() const;
};
struct RGWCurlHandle {
}
};
+void rgw_http_req_data::set_state(int bitmask) {
+ /* no need to lock here, moreover curl_easy_pause() might trigger
+ * the data receive callback :/
+ */
+ CURLcode rc = curl_easy_pause(**curl_handle, bitmask);
+ if (rc != CURLE_OK) {
+ dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
+ }
+}
+
#define MAXIDLE 5
class RGWCurlHandles : public Thread {
public:
saved_curl.shrink_to_fit();
}
+CURL *rgw_http_req_data::get_easy_handle() const
+{
+ return **curl_handle;
+}
+
static RGWCurlHandles *handles;
+
+static RGWCurlHandle *do_curl_easy_init()
+{
+ return handles->get_curl_handle();
+}
+
+static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle)
+{
+ handles->release_curl_handle(curl_handle);
+}
+
// XXX make this part of the token cache? (but that's swift-only;
// and this especially needs to integrates with s3...)
}
/*
- * process a single simple one off request, not going through RGWHTTPManager. Not using
- * req_data.
+ * process a single simple one off request
*/
int RGWHTTPClient::process()
{
- int ret = 0;
- CURL *curl_handle;
-
- char error_buf[CURL_ERROR_SIZE];
-
- auto ca = handles->get_curl_handle();
- curl_handle = **ca;
-
- dout(20) << "sending request to " << url << dendl;
-
- curl_slist *h = headers_to_slist(headers);
-
- curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
- curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
- curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
- curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L);
- curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header);
- curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this);
- curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data);
- curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this);
- curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
- if (h) {
- curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h);
- }
- curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data);
- curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this);
- if (is_upload_request(method)) {
- curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L);
- }
- if (has_send_len) {
- curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len);
- }
- if (!verify_ssl) {
- curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
- curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L);
- dout(20) << "ssl verification is set to off" << dendl;
- }
-
- CURLcode status = curl_easy_perform(curl_handle);
- if (status) {
- dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl;
- ret = -EINVAL;
- }
- curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status);
- handles->release_curl_handle(ca);
- curl_slist_free_all(h);
-
- return ret;
+ return RGWHTTP::process(this);
}
string RGWHTTPClient::to_str()
_req_data->get();
req_data = _req_data;
- CURL *easy_handle;
-
- easy_handle = curl_easy_init();
+ req_data->curl_handle = do_curl_easy_init();
- req_data->easy_handle = easy_handle;
+ CURL *easy_handle = req_data->get_easy_handle();
dout(20) << "sending request to " << url << dendl;
req_data->registered = true;
reqs[num_reqs] = req_data;
num_reqs++;
- ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
+ ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
}
void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
req_data->get();
req_data->registered = false;
unregistered_reqs.push_back(req_data);
- ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
+ ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
}
void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
*/
int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
{
- ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
- CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle);
+ ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
+ CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
if (mstatus) {
dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
return -EIO;
*/
void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
{
- if (req_data->easy_handle) {
- curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle);
+ if (req_data->curl_handle) {
+ curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
}
if (!req_data->is_done()) {
_finish_request(req_data, -ECANCELED);
return 0;
}
-/*
- * the synchronous, non-threaded request processing method.
- */
-int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
-{
- assert(!is_threaded);
-
- int still_running;
- int mstatus;
-
- do {
- if (wait_for_data) {
- int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1);
- if (ret < 0) {
- return ret;
- }
- }
-
- mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
- switch (mstatus) {
- case CURLM_OK:
- case CURLM_CALL_MULTI_PERFORM:
- break;
- default:
- dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
- return -EINVAL;
- }
- int msgs_left;
- CURLMsg *msg;
- while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
- if (msg->msg == CURLMSG_DONE) {
- CURL *e = msg->easy_handle;
- rgw_http_req_data *req_data;
- curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
-
- long http_status;
- curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
-
- int status = rgw_http_error_to_errno(http_status);
- int result = msg->data.result;
- finish_request(req_data, status);
- switch (result) {
- case CURLE_OK:
- break;
- default:
- dout(20) << "ERROR: msg->data.result=" << result << dendl;
- return -EIO;
- }
- }
- }
- } while (mstatus == CURLM_CALL_MULTI_PERFORM);
-
- *done = (still_running == 0);
-
- return 0;
-}
-
-/*
- * the synchronous, non-threaded request processing completion method.
- */
-int RGWHTTPManager::complete_requests()
-{
- bool done = false;
- int ret;
- do {
- ret = process_requests(true, &done);
- } while (!done && !ret);
-
- return ret;
-}
-
int RGWHTTPManager::set_threaded()
{
int r = pipe(thread_pipe);
int add_request(RGWHTTPClient *client, bool send_data_hint = false);
int remove_request(RGWHTTPClient *client);
int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
-
- /* only for non threaded case */
- int process_requests(bool wait_for_data, bool *done);
-
- int complete_requests();
};
class RGWHTTP
{
string etag;
- RGWRESTStreamWriteRequest *out_stream_req;
+ RGWRESTStreamS3PutObj *out_stream_req;
int ret = rest_master_conn->put_obj_init(user_id, dest_obj, astate->size, src_attrs, &out_stream_req);
if (ret < 0) {
}
class RGWRESTStreamOutCB : public RGWGetDataCB {
- RGWRESTStreamWriteRequest *req;
+ RGWRESTStreamS3PutObj *req;
public:
- explicit RGWRESTStreamOutCB(RGWRESTStreamWriteRequest *_req) : req(_req) {}
+ explicit RGWRESTStreamOutCB(RGWRESTStreamS3PutObj *_req) : req(_req) {}
int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; /* callback for object iteration when sending data */
};
{
dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl;
if (!bl_ofs && bl_len == bl.length()) {
- return req->add_output_data(bl);
+ req->add_send_data(bl);
+ return 0;
}
bufferptr bp(bl.c_str() + bl_ofs, bl_len);
bufferlist new_bl;
new_bl.push_back(bp);
- return req->add_output_data(new_bl);
+ req->add_send_data(new_bl);
+ return 0;
}
-RGWRESTStreamWriteRequest::~RGWRESTStreamWriteRequest()
+RGWRESTStreamS3PutObj::~RGWRESTStreamS3PutObj()
{
- delete cb;
-}
-
-int RGWRESTStreamWriteRequest::add_output_data(bufferlist& bl)
-{
- lock.Lock();
- if (status < 0) {
- int ret = status;
- lock.Unlock();
- return ret;
- }
- pending_send.push_back(bl);
- lock.Unlock();
-
- bool done;
- return http_manager.process_requests(false, &done);
+ delete out_cb;
}
static void grants_by_type_add_one_grant(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
}
}
-int RGWRESTStreamWriteRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs)
+int RGWRESTStreamS3PutObj::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs)
{
string resource = obj.bucket.name + "/" + obj.get_oid();
string new_url = url;
headers.emplace_back(kv);
}
- cb = new RGWRESTStreamOutCB(this);
+ out_cb = new RGWRESTStreamOutCB(this);
set_send_length(obj_size);
method = new_info.method;
url = new_url;
- int r = http_manager.add_request(this);
+ int r = RGWHTTP::send(this);
if (r < 0)
return r;
return 0;
}
-int RGWRESTStreamWriteRequest::send_data(void *ptr, size_t len)
-{
- uint64_t sent = 0;
-
- dout(20) << "RGWRESTStreamWriteRequest::send_data()" << dendl;
- lock.Lock();
- if (pending_send.empty() || status < 0) {
- lock.Unlock();
- return status;
- }
-
- list<bufferlist>::iterator iter = pending_send.begin();
- while (iter != pending_send.end() && len > 0) {
- bufferlist& bl = *iter;
-
- list<bufferlist>::iterator next_iter = iter;
- ++next_iter;
- lock.Unlock();
-
- uint64_t send_len = min(len, (size_t)bl.length());
-
- memcpy(ptr, bl.c_str(), send_len);
-
- ptr = (char *)ptr + send_len;
- len -= send_len;
- sent += send_len;
-
- lock.Lock();
-
- bufferlist new_bl;
- if (bl.length() > send_len) {
- bufferptr bp(bl.c_str() + send_len, bl.length() - send_len);
- new_bl.append(bp);
- }
- pending_send.pop_front(); /* need to do this after we copy data from bl */
- if (new_bl.length()) {
- pending_send.push_front(new_bl);
- }
- iter = next_iter;
- }
- lock.Unlock();
-
- return sent;
-}
-
-
void set_str_from_headers(map<string, string>& out_headers, const string& header_name, string& str)
{
map<string, string>::iterator iter = out_headers.find(header_name);
return 0;
}
-int RGWRESTStreamWriteRequest::complete(string& etag, real_time *mtime)
-{
- int ret = http_manager.complete_requests();
- if (ret < 0)
- return ret;
-
- set_str_from_headers(out_headers, "ETAG", etag);
-
- if (mtime) {
- string mtime_str;
- set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
-
- ret = parse_rgwx_mtime(cct, mtime_str, mtime);
- if (ret < 0) {
- return ret;
- }
- }
- return status;
-}
-
int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
{
string urlsafe_bucket, urlsafe_object;
url_encode(obj.key.name, urlsafe_object);
string resource = urlsafe_bucket + "/" + urlsafe_object;
- return send_request(&key, extra_headers, resource, nullptr, mgr);
+ return send_request(&key, extra_headers, resource, mgr);
}
int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
- bufferlist *send_data, RGWHTTPManager *mgr)
+ RGWHTTPManager *mgr, bufferlist *send_data)
{
string new_url = url;
if (new_url[new_url.size() - 1] != '/')
send_data_hint = true;
}
- RGWHTTPManager *pmanager = &http_manager;
- if (mgr) {
- pmanager = mgr;
- }
-
// Not sure if this is the place to set a send_size, curl otherwise sets
// chunked option and doesn't send content length anymore
uint64_t send_size = (size_t)(outbl.length() - write_ofs);
method = new_info.method;
url = new_url;
- int r = pmanager->add_request(this, send_data_hint);
- if (r < 0)
- return r;
-
if (!mgr) {
- r = pmanager->complete_requests();
- if (r < 0)
- return r;
+ return RGWHTTP::send(this);
}
+ int r = mgr->add_request(this, send_data_hint);
+ if (r < 0)
+ return r;
+
return 0;
}
int RGWRESTStreamRWRequest::complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs)
{
+ int ret = wait();
+ if (ret < 0) {
+ return ret;
+ }
set_str_from_headers(out_headers, "ETAG", etag);
if (status >= 0) {
if (mtime) {
};
-class RGWRESTStreamWriteRequest : public RGWRESTSimpleRequest {
- Mutex lock;
- list<bufferlist> pending_send;
- RGWGetDataCB *cb;
- RGWHTTPManager http_manager;
-public:
- int add_output_data(bufferlist& bl);
- int send_data(void *ptr, size_t len) override;
-
- RGWRESTStreamWriteRequest(CephContext *_cct, const string& _method, const string& _url, param_vec_t *_headers,
- param_vec_t *_params) : RGWRESTSimpleRequest(_cct, _method, _url, _headers, _params),
- lock("RGWRESTStreamWriteRequest"), cb(NULL), http_manager(_cct) {}
- ~RGWRESTStreamWriteRequest() override;
- int put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs);
- int complete(string& etag, real_time *mtime);
-
- RGWGetDataCB *get_out_cb() { return cb; }
-};
-
class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest {
Mutex lock;
Mutex write_lock;
};
class RGWRESTStreamRWRequest : public RGWHTTPStreamRWRequest {
- RGWHTTPManager http_manager;
public:
RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb,
- param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params),
- http_manager(_cct) {
+ param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params) {
}
virtual ~RGWRESTStreamRWRequest() override {}
- int send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr = NULL);
- int send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, bufferlist *send_data = NULL /* optional input data */, RGWHTTPManager *mgr = NULL);
+ int send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr);
+ int send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, RGWHTTPManager *mgr, bufferlist *send_data = nullptr /* optional input data */);
int complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs);
};
param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, "HEAD", _url, _cb, _headers, _params) {}
};
+class RGWRESTStreamS3PutObj : public RGWRESTStreamRWRequest {
+ RGWGetDataCB *out_cb;
+public:
+ RGWRESTStreamS3PutObj(CephContext *_cct, const string& _method, const string& _url, param_vec_t *_headers,
+ param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, _method, _url, nullptr, _headers, _params),
+ out_cb(NULL) {}
+ ~RGWRESTStreamS3PutObj() override;
+ int put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs);
+
+ RGWGetDataCB *get_out_cb() { return out_cb; }
+};
+
#endif
}
int RGWRESTConn::put_obj_init(const rgw_user& uid, rgw_obj& obj, uint64_t obj_size,
- map<string, bufferlist>& attrs, RGWRESTStreamWriteRequest **req)
+ map<string, bufferlist>& attrs, RGWRESTStreamS3PutObj **req)
{
string url;
int ret = get_url(url);
param_vec_t params;
populate_params(params, &uid, self_zone_group);
- RGWRESTStreamWriteRequest *wr = new RGWRESTStreamWriteRequest(cct, "PUT", url, NULL, ¶ms);
+ RGWRESTStreamS3PutObj *wr = new RGWRESTStreamS3PutObj(cct, "PUT", url, NULL, ¶ms);
ret = wr->put_obj_init(key, obj, obj_size, attrs);
if (ret < 0) {
delete wr;
return 0;
}
-int RGWRESTConn::complete_request(RGWRESTStreamWriteRequest *req, string& etag, real_time *mtime)
+int RGWRESTConn::complete_request(RGWRESTStreamS3PutObj *req, string& etag, real_time *mtime)
{
- int ret = req->complete(etag, mtime);
+ map<string, string> attrs;
+ int ret = req->complete_request(etag, mtime, nullptr, attrs);
delete req;
return ret;
set_header(mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER");
}
- int r = (*req)->send_request(key, extra_headers, obj);
+ int r = (*req)->send_request(key, extra_headers, obj, nullptr);
if (r < 0) {
delete *req;
*req = nullptr;
headers.insert(extra_headers->begin(), extra_headers->end());
}
- ret = req.send_request(&key, headers, resource, send_data, mgr);
+ ret = req.send_request(&key, headers, resource, mgr, send_data);
if (ret < 0) {
ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
return ret;
int RGWRESTReadResource::read()
{
- int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr);
+ int ret = req.send_request(&conn->get_key(), headers, resource, mgr);
if (ret < 0) {
ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
return ret;
int RGWRESTReadResource::aio_read()
{
- int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr);
+ int ret = req.send_request(&conn->get_key(), headers, resource, mgr);
if (ret < 0) {
ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
return ret;
int RGWRESTSendResource::send(bufferlist& outbl)
{
req.set_outbl(outbl);
- int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr);
+ int ret = req.send_request(&conn->get_key(), headers, resource, mgr);
if (ret < 0) {
ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
return ret;
int RGWRESTSendResource::aio_send(bufferlist& outbl)
{
req.set_outbl(outbl);
- int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr);
+ int ret = req.send_request(&conn->get_key(), headers, resource, mgr);
if (ret < 0) {
ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
return ret;
/* async request */
int put_obj_init(const rgw_user& uid, rgw_obj& obj, uint64_t obj_size,
- map<string, bufferlist>& attrs, RGWRESTStreamWriteRequest **req);
- int complete_request(RGWRESTStreamWriteRequest *req, string& etag, ceph::real_time *mtime);
+ map<string, bufferlist>& attrs, RGWRESTStreamS3PutObj **req);
+ int complete_request(RGWRESTStreamS3PutObj *req, string& etag, ceph::real_time *mtime);
int get_obj(const rgw_user& uid, req_info *info /* optional */, rgw_obj& obj,
const ceph::real_time *mod_ptr, const ceph::real_time *unmod_ptr,
constexpr size_t num_requests = max_requests + 1;
for (size_t i = 0; i < num_requests; i++) {
- RGWHTTPClient client{cct};
- http.add_request(&client, "PUT", "http://127.0.0.1:80");
+ RGWHTTPClient client{cct, "PUT", "http://127.0.0.1:80"};
+ http.add_request(&client);
}
}