validate.set_url(url);
- int ret = validate.process();
+ int ret = validate.process(null_yield);
if (ret < 0) {
throw ret;
}
validate.set_send_length(os.str().length());
/* send request */
- ret = validate.process();
+ ret = validate.process(null_yield);
if (ret < 0) {
ldpp_dout(dpp, 2) << "s3 keystone: token validation ERROR: "
<< token_body_bl.c_str() << dendl;
secret.set_verify_ssl(cct->_conf->rgw_keystone_verify_ssl);
/* send request */
- ret = secret.process();
+ ret = secret.process(null_yield);
if (ret < 0) {
ldpp_dout(dpp, 2) << "s3 keystone: secret fetching error: "
<< token_body_bl.c_str() << dendl;
{
if (req) {
req->cancel();
- req->wait();
+ req->wait(null_yield);
delete req;
}
}
{
if (req) {
req->cancel();
- req->wait();
+ req->wait(null_yield);
delete req;
}
}
virtual int wait_result() {
- return http_op->wait(result);
+ return http_op->wait(result, null_yield);
}
int request_complete() override {
{}
int wait_result() override {
- return http_op->wait(result);
+ return http_op->wait(result, null_yield);
}
};
int request_complete() override {
int ret;
if (result || err_result) {
- ret = http_op->wait(result, err_result);
+ ret = http_op->wait(result, null_yield, err_result);
} else {
bufferlist bl;
- ret = http_op->wait(&bl);
+ ret = http_op->wait(&bl, null_yield);
}
auto op = std::move(http_op); // release ref on return
if (ret < 0) {
int request_complete() override {
int ret;
bufferlist bl;
- ret = http_op->wait(&bl);
+ ret = http_op->wait(&bl, null_yield);
auto op = std::move(http_op); // release ref on return
if (ret < 0) {
error_stream << "http operation failed: " << op->to_str()
secret_req.append_header("Accept", "application/octet-stream");
secret_req.append_header("X-Auth-Token", barbican_token);
- res = secret_req.process();
+ res = secret_req.process(null_yield);
if (res < 0) {
return res;
}
return io_block(0);
}
yield {
- int ret = http_op->wait(shard_info);
+ int ret = http_op->wait(shard_info, null_yield);
if (ret < 0) {
return set_cr_error(ret);
}
}
yield {
timer.reset();
- int ret = http_op->wait(&response);
+ int ret = http_op->wait(&response, null_yield);
if (ret < 0) {
if (sync_env->counters && ret != -ENOENT) {
sync_env->counters->inc(sync_counters::l_poll_err);
}
int request_complete() override {
- int ret = http_op->wait(result);
+ int ret = http_op->wait(result, null_yield);
http_op->put();
if (ret < 0 && ret != -ENOENT) {
ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
#include "rgw_common.h"
#include "rgw_http_client.h"
#include "rgw_http_errors.h"
+#include "common/async/completion.h"
#include "common/RefCountedObj.h"
#include "rgw_coroutine.h"
Mutex lock;
Cond cond;
+ using Signature = void(boost::system::error_code);
+ using Completion = ceph::async::Completion<Signature>;
+ std::unique_ptr<Completion> completion;
+
rgw_http_req_data() : id(-1), lock("rgw_http_req_data::lock") {
memset(error_buf, 0, sizeof(error_buf));
}
- int wait() {
+ template <typename ExecutionContext, typename CompletionToken>
+ auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto& handler = init.completion_handler;
+ completion = Completion::create(ctx.get_executor(), std::move(handler));
+ return init.result.get();
+ }
+
+ int wait(optional_yield y) {
Mutex::Locker l(lock);
if (done) {
return ret;
}
+#ifdef HAVE_BOOST_CONTEXT
+ if (y) {
+ auto& context = y.get_io_context();
+ auto& yield = y.get_yield_context();
+ boost::system::error_code ec;
+ async_wait(context, yield[ec]);
+ return -ec.value();
+ }
+#endif
cond.Wait(lock);
return ret;
}
curl_handle = NULL;
h = NULL;
done = true;
- cond.Signal();
+ if (completion) {
+ boost::system::error_code ec(-ret, boost::system::system_category());
+ Completion::post(std::move(completion), ec);
+ } else {
+ cond.Signal();
+ }
}
bool _is_done() {
/*
* process a single simple one off request
*/
-int RGWHTTPClient::process()
+int RGWHTTPClient::process(optional_yield y)
{
- return RGWHTTP::process(this);
+ return RGWHTTP::process(this, y);
}
string RGWHTTPClient::to_str()
/*
* wait for async request to complete
*/
-int RGWHTTPClient::wait()
+int RGWHTTPClient::wait(optional_yield y)
{
- return req_data->wait();
+ return req_data->wait(y);
}
void RGWHTTPClient::cancel()
return 0;
}
-int RGWHTTP::process(RGWHTTPClient *req) {
+int RGWHTTP::process(RGWHTTPClient *req, optional_yield y) {
if (!req) {
return 0;
}
return r;
}
- return req->wait();
+ return req->wait(y);
}
#ifndef CEPH_RGW_HTTP_CLIENT_H
#define CEPH_RGW_HTTP_CLIENT_H
+#include "common/async/yield_context.h"
#include "common/RWLock.h"
#include "common/Cond.h"
#include "rgw_common.h"
verify_ssl = flag;
}
- int process();
+ int process(optional_yield y);
- int wait();
+ int wait(optional_yield y);
void cancel();
bool is_done();
{
public:
static int send(RGWHTTPClient *req);
- static int process(RGWHTTPClient *req);
+ static int process(RGWHTTPClient *req, optional_yield y);
};
#endif
token_req.set_url(token_url);
- const int ret = token_req.process();
+ const int ret = token_req.process(null_yield);
if (ret < 0) {
return ret;
}
token_req.set_url(token_url);
ldout(cct, 20) << "Requesting secret from barbican url=" << token_url << dendl;
- const int ret = token_req.process();
+ const int ret = token_req.process(null_yield);
if (ret < 0) {
ldout(cct, 20) << "Barbican process error:" << token_bl.c_str() << dendl;
return ret;
req.set_url(url);
req.set_send_length(0);
- int ret = req.process();
+ int ret = req.process(null_yield);
if (ret < 0) {
return ret;
}
req.set_send_length(ss.str().length());
/* send request */
- ret = req.process();
+ ret = req.process(null_yield);
if (ret < 0) {
ldpp_dout(op, 2) << "OPA process error:" << bl.c_str() << dendl;
return ret;
ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
headers.push_back(pair<string, string>("AUTHORIZATION", auth_hdr));
- int r = process();
+ int r = process(null_yield);
if (r < 0)
return r;
method = new_info.method;
url = new_url;
- int r = process();
+ int r = process(null_yield);
if (r < 0){
if (r == -EINVAL){
// curl_easy has errored, generally means the service is not available
map<string, string> *pattrs,
map<string, string> *pheaders)
{
- int ret = wait();
+ int ret = wait(null_yield);
if (ret < 0) {
return ret;
}
return req.get_http_status();
}
- int wait(bufferlist *pbl) {
- int ret = req.wait();
+ int wait(bufferlist *pbl, optional_yield y) {
+ int ret = req.wait(y);
if (ret < 0) {
return ret;
}
}
template <class T>
- int wait(T *dest);
+ int wait(T *dest, optional_yield y);
template <class T>
int fetch(T *dest);
}
template <class T>
-int RGWRESTReadResource::wait(T *dest)
+int RGWRESTReadResource::wait(T *dest, optional_yield y)
{
- int ret = req.wait();
+ int ret = req.wait(y);
if (ret < 0) {
return ret;
}
return req.get_http_status();
}
- int wait(bufferlist *pbl) {
- int ret = req.wait();
+ int wait(bufferlist *pbl, optional_yield y) {
+ int ret = req.wait(y);
*pbl = bl;
if (ret < 0) {
return ret;
}
template <class T, class E = int>
- int wait(T *dest, E *err_result = nullptr);
+ int wait(T *dest, optional_yield y, E *err_result = nullptr);
};
template <class T, class E>
}
template <class T, class E>
-int RGWRESTSendResource::wait(T *dest, E *err_result)
+int RGWRESTSendResource::wait(T *dest, optional_yield y, E *err_result)
{
- int ret = req.wait();
+ int ret = req.wait(y);
if (ret < 0) {
if (err_result) {
parse_decode_json(cct, *err_result, bl);
introspect_req.set_post_data(post_data);
introspect_req.set_send_length(post_data.length());
- int res = introspect_req.process();
+ int res = introspect_req.process(null_yield);
if (res < 0) {
ldpp_dout(dpp, 10) << "HTTP request res: " << res << dendl;
throw -EINVAL;
ldpp_dout(dpp, 10) << "rgw_swift_validate_token url=" << url_buf << dendl;
- int ret = validator.process();
+ int ret = validator.process(null_yield);
if (ret < 0) {
throw ret;
}
return io_block(0);
}
yield {
- int ret = http_op->wait(shard_info);
+ int ret = http_op->wait(shard_info, null_yield);
http_op->put();
if (ret < 0) {
return set_cr_error(ret);
}
int request_complete() override {
- int ret = http_op->wait(result);
+ int ret = http_op->wait(result, null_yield);
http_op->put();
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
return io_block(0);
}
yield {
- int ret = http_op->wait(pbl);
+ int ret = http_op->wait(pbl, null_yield);
http_op->put();
if (ret < 0) {
return set_cr_error(ret);
int RGWCloneMetaLogCoroutine::state_receive_rest_response()
{
- int ret = http_op->wait(&data);
+ int ret = http_op->wait(&data, null_yield);
if (ret < 0) {
error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
ldpp_dout(sync_env->dpp, 5) << "failed to wait for op, ret=" << ret << dendl;