return io_block(0);
}
yield {
- op_ret = http_op->wait(shard_info, null_yield);
+ op_ret = http_op->wait(dpp, shard_info, null_yield);
http_op->put();
}
}
yield {
timer.reset();
- op_ret = http_op->wait(&response, null_yield);
+ op_ret = http_op->wait(dpp, &response, null_yield);
http_op->put();
}
}
int request_complete() override {
- int ret = http_op->wait(result, null_yield);
+ int ret = http_op->wait(sync_env->dpp, result, null_yield);
http_op->put();
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(sync_env->dpp, 5) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
}
/* fetch headers */
- ret = tier_ctx.conn.complete_request(in_req, nullptr, nullptr, nullptr, nullptr, &headers, null_yield);
+ ret = tier_ctx.conn.complete_request(tier_ctx.dpp, in_req, nullptr, nullptr, nullptr, nullptr, &headers, null_yield);
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(tier_ctx.dpp, 20) << "ERROR: " << __func__ << "(): conn.complete_request() returned ret=" << ret << dendl;
return ret;
}
int RGWLCCloudStreamPut::complete_request() {
- int ret = conn.complete_request(out_req, etag, &obj_properties.mtime, null_yield);
- return ret;
+ return conn.complete_request(dpp, out_req, etag, &obj_properties.mtime, null_yield);
}
/* Read local copy and write to Cloud endpoint */
<< " retry_number: "
<< entry_persistency_tracker.retires_num
<< " current time: " << time_now << dendl;
- const auto ret = push_endpoint->send(event_entry.event, yield);
+ const auto ret = push_endpoint->send(this, event_entry.event, yield);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker
<< " failed. error: " << ret
dpp->get_cct());
ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: "
<< topic.cfg.dest.push_endpoint << dendl;
- const auto ret = push_endpoint->send(event_entry.event, res.yield);
+ const auto ret = push_endpoint->send(dpp, event_entry.event, res.yield);
if (ret < 0) {
ldpp_dout(dpp, 1)
<< "ERROR: failed to push sync notification event with error: "
}
}
- int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
+ int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event,
+ optional_yield y) override {
std::shared_lock lock(s_http_manager_mutex);
if (!s_http_manager) {
ldout(cct, 1) << "ERROR: send failed. http endpoint manager not running" << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
auto rc = s_http_manager->add_request(&request);
if (rc == 0) {
- rc = request.wait(y);
+ rc = request.wait(dpp, y);
}
if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
// TODO: use read_bl to process return code and handle according to ack level
mutable std::condition_variable cond;
public:
- int wait(optional_yield y) {
+ int wait(const DoutPrefixProvider* dpp, optional_yield y) {
std::unique_lock l{lock};
if (done) {
return ret;
}
}
- int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
+ int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event, optional_yield y) override {
if (ack_level == ack_level_t::None) {
return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
} else {
// failed to publish, does not wait for reply
return rc;
}
- return w->wait(y);
+ return w->wait(dpp, y);
}
}
}
}
- int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
+ int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event,
+ optional_yield y) override {
if (ack_level == ack_level_t::None) {
return kafka::publish(conn_id, topic, json_format_pubsub_event(event));
} else {
// failed to publish, does not wait for reply
return rc;
}
- return w->wait(y);
+ return w->wait(dpp, y);
}
}
#include "include/common_fwd.h"
#include "common/async/yield_context.h"
+class DoutPrefixProvider;
class RGWHTTPArgs;
struct rgw_pubsub_s3_event;
// this method is used in order to send notification and wait for completion
// in async manner via a coroutine when invoked in the frontend environment
- virtual int send(const rgw_pubsub_s3_event& event, optional_yield y) = 0;
+ virtual int send(const DoutPrefixProvider* dpp,
+ const rgw_pubsub_s3_event& event,
+ optional_yield y) = 0;
// present as string
virtual std::string to_str() const = 0;
return ret;
}
- ret = conn->complete_request(in_stream_req, nullptr, &set_mtime, psize,
+ ret = conn->complete_request(dpp, in_stream_req, nullptr, &set_mtime, psize,
nullptr, pheaders, y);
if (ret < 0) {
if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
goto set_err_state;
}
- ret = conn->complete_request(in_stream_req, &etag, &set_mtime,
+ ret = conn->complete_request(rctx.dpp, in_stream_req, &etag, &set_mtime,
&accounted_size, nullptr, nullptr, rctx.y);
if (ret < 0) {
if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
return ret;
}
- ret = rest_master_conn->complete_request(out_stream_req, etag, mtime, y);
+ ret = rest_master_conn->complete_request(dpp, out_stream_req, etag, mtime, y);
if (ret < 0) {
if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
ldpp_dout(dpp, 20) << __func__ << "(): failed to put_obj_async_init. retries=" << tries << dendl;
return io_block(0);
}
yield {
- op_ret = http_op->wait(shard_info, null_yield);
+ op_ret = http_op->wait(dpp, shard_info, null_yield);
http_op->put();
}
}
int request_complete() override {
- int ret = http_op->wait(result, null_yield);
+ int ret = http_op->wait(sync_env->dpp, result, null_yield);
http_op->put();
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(sync_env->dpp, 5) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
return io_block(0);
}
yield {
- op_ret = http_op->wait(pbl, null_yield);
+ op_ret = http_op->wait(dpp, pbl, null_yield);
http_op->put();
}
int RGWCloneMetaLogCoroutine::state_receive_rest_response()
{
- op_ret = http_op->wait(&data, null_yield);
+ op_ret = http_op->wait(sync_env->dpp, &data, null_yield);
if (op_ret < 0 && op_ret != -EIO) {
error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
ldpp_dout(sync_env->dpp, 5) << "failed to wait for op, ret=" << op_ret << dendl;
validate.set_url(url);
- ret = validate.process(y);
+ ret = validate.process(dpp, y);
/* NULL terminate for debug output. */
token_body_bl.append(static_cast<char>(0));
validate.set_send_length(os.str().length());
/* send request */
- ret = validate.process(y);
+ ret = validate.process(dpp, y);
/* if the supplied signature is wrong, we will get 401 from Keystone */
if (validate.get_http_status() ==
secret.set_verify_ssl(cct->_conf->rgw_keystone_verify_ssl);
/* send request */
- ret = secret.process(y);
+ ret = secret.process(dpp, y);
/* if the supplied access key isn't found, we will get 404 from Keystone */
if (secret.get_http_status() ==
{
if (req) {
req->cancel();
- req->wait(null_yield);
+ auto dpp = NoDoutPrefix{cct, ceph_subsys_rgw};
+ req->wait(&dpp, null_yield);
delete req;
}
}
{
if (req) {
req->cancel();
- req->wait(null_yield);
+ auto dpp = NoDoutPrefix{cct, ceph_subsys_rgw};
+ req->wait(&dpp, null_yield);
delete req;
}
}
- virtual int wait_result() {
- return http_op->wait(result, null_yield);
+ virtual int wait_result(const DoutPrefixProvider* dpp) {
+ return http_op->wait(dpp, result, null_yield);
}
int request_complete() override {
- int ret;
-
- ret = wait_result();
+ auto dpp = NoDoutPrefix{cct, ceph_subsys_rgw};
+ int ret = wait_result(&dpp);
auto op = std::move(http_op); // release ref on return
if (ret < 0) {
: RGWReadRawRESTResourceCR(_cct, _conn, _http_manager, _path, params, hdrs), result(_result)
{}
- int wait_result() override {
- return http_op->wait(result, null_yield);
+ int wait_result(const DoutPrefixProvider* dpp) override {
+ return http_op->wait(dpp, result, null_yield);
}
};
}
int request_complete() override {
+ auto dpp = NoDoutPrefix{cct, ceph_subsys_rgw};
int ret;
if (result || err_result) {
- ret = http_op->wait(result, null_yield, err_result);
+ ret = http_op->wait(&dpp, result, null_yield, err_result);
} else {
bufferlist bl;
- ret = http_op->wait(&bl, null_yield);
+ ret = http_op->wait(&dpp, &bl, null_yield);
}
auto op = std::move(http_op); // release ref on return
if (ret < 0) {
}
int request_complete() override {
- int ret;
+ auto dpp = NoDoutPrefix{cct, ceph_subsys_rgw};
bufferlist bl;
- ret = http_op->wait(&bl, null_yield);
+ int ret = http_op->wait(&dpp, &bl, null_yield);
auto op = std::move(http_op); // release ref on return
if (ret < 0) {
error_stream << "http operation failed: " << op->to_str()
class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF {
protected:
+ CephContext *cct;
RGWCoroutinesEnv *env;
RGWCoroutine *caller;
RGWHTTPManager *http_manager;
RGWStreamWriteHTTPResourceCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
RGWCoroutine *_caller,
- RGWHTTPManager *_http_manager) : env(_env),
- caller(_caller),
- http_manager(_http_manager),
- write_drain_notify_cb(this) {}
+ RGWHTTPManager *_http_manager)
+ : cct(_cct),
+ env(_env),
+ caller(_caller),
+ http_manager(_http_manager),
+ write_drain_notify_cb(this)
+ {}
virtual ~RGWStreamWriteHTTPResourceCRF();
int init() override {
}, token, ex);
}
- int wait(optional_yield y) {
+ int wait(const DoutPrefixProvider* dpp, optional_yield y) {
std::unique_lock l{lock};
if (done) {
return ret;
/*
* process a single simple one off request
*/
-int RGWHTTPClient::process(optional_yield y)
+int RGWHTTPClient::process(const DoutPrefixProvider* dpp, optional_yield y)
{
- return RGWHTTP::process(this, y);
+ return RGWHTTP::process(dpp, this, y);
}
string RGWHTTPClient::to_str()
/*
* wait for async request to complete
*/
-int RGWHTTPClient::wait(optional_yield y)
+int RGWHTTPClient::wait(const DoutPrefixProvider* dpp, optional_yield y)
{
- return req_data->wait(y);
+ return req_data->wait(dpp, y);
}
void RGWHTTPClient::cancel()
return 0;
}
-int RGWHTTP::process(RGWHTTPClient *req, optional_yield y) {
+int RGWHTTP::process(const DoutPrefixProvider* dpp, RGWHTTPClient *req, optional_yield y) {
if (!req) {
return 0;
}
return r;
}
- return req->wait(y);
+ return req->wait(dpp, y);
}
req_timeout = timeout;
}
- int process(optional_yield y);
+ int process(const DoutPrefixProvider* dpp, optional_yield y);
- int wait(optional_yield y);
+ int wait(const DoutPrefixProvider* dpp, optional_yield y);
void cancel();
bool is_done();
{
public:
static int send(RGWHTTPClient *req);
- static int process(RGWHTTPClient *req, optional_yield y);
+ static int process(const DoutPrefixProvider* dpp, RGWHTTPClient *req,
+ optional_yield y);
};
token_req.set_url(token_url);
- const int ret = token_req.process(y);
+ const int ret = token_req.process(dpp, y);
/* Detect rejection earlier than during the token parsing step. */
if (token_req.get_http_status() ==
token_req.set_url(token_url);
ldpp_dout(dpp, 20) << "Requesting secret from barbican url=" << token_url << dendl;
- const int ret = token_req.process(y);
+ const int ret = token_req.process(dpp, y);
if (ret < 0) {
ldpp_dout(dpp, 20) << "Barbican process error:" << token_bl.c_str() << dendl;
return ret;
secret_req.set_client_key(kctx.ssl_clientkey());
}
- res = secret_req.process(y);
+ res = secret_req.process(dpp, y);
// map 401 to EACCES instead of EPERM
if (secret_req.get_http_status() ==
protected:
KmipGetTheKey(CephContext *cct) : cct(cct) {}
KmipGetTheKey& keyid_to_keyname(std::string_view key_id);
- KmipGetTheKey& get_uniqueid_for_keyname(optional_yield y);
- int get_key_for_uniqueid(optional_yield y, std::string &);
+ KmipGetTheKey& get_uniqueid_for_keyname(const DoutPrefixProvider* dpp, optional_yield y);
+ int get_key_for_uniqueid(const DoutPrefixProvider* dpp, optional_yield y, std::string &);
friend KmipSecretEngine;
};
}
KmipGetTheKey&
-KmipGetTheKey::get_uniqueid_for_keyname(optional_yield y)
+KmipGetTheKey::get_uniqueid_for_keyname(const DoutPrefixProvider* dpp,
+ optional_yield y)
{
RGWKMIPTransceiver secret_req(cct, RGWKMIPTransceiver::LOCATE);
}
int
-KmipGetTheKey::get_key_for_uniqueid(optional_yield y, std::string& actual_key)
+KmipGetTheKey::get_key_for_uniqueid(const DoutPrefixProvider* dpp,
+ optional_yield y, std::string& actual_key)
{
if (failed) return ret;
RGWKMIPTransceiver secret_req(cct, RGWKMIPTransceiver::GET);
int r;
r = KmipGetTheKey{cct}
.keyid_to_keyname(key_id)
- .get_uniqueid_for_keyname(y)
- .get_key_for_uniqueid(y, actual_key);
+ .get_uniqueid_for_keyname(dpp, y)
+ .get_key_for_uniqueid(dpp, y, actual_key);
return r;
}
};
secret_req.append_header("Accept", "application/octet-stream");
secret_req.append_header("X-Auth-Token", barbican_token);
- res = secret_req.process(y);
+ res = secret_req.process(dpp, y);
// map 401 to EACCES instead of EPERM
if (secret_req.get_http_status() ==
RGWHTTPTransceiver::HTTP_STATUS_UNAUTHORIZED) {
req.set_send_length(ss.str().length());
/* send request */
- ret = req.process(null_yield);
+ ret = req.process(op, s->yield);
if (ret < 0) {
ldpp_dout(op, 2) << "OPA process error:" << bl.c_str() << dendl;
return ret;
method = new_info.method;
url = new_url;
- int r = process(y);
+ int r = process(dpp, y);
if (r < 0){
if (r == -EINVAL){
// curl_easy has errored, generally means the service is not available
return RGWHTTPStreamRWRequest::send(mgr);
}
-int RGWHTTPStreamRWRequest::complete_request(optional_yield y,
+int RGWHTTPStreamRWRequest::complete_request(const DoutPrefixProvider* dpp,
+ optional_yield y,
string *etag,
real_time *mtime,
uint64_t *psize,
map<string, string> *pattrs,
map<string, string> *pheaders)
{
- int ret = wait(y);
+ int ret = wait(dpp, y);
if (ret < 0) {
return ret;
}
virtual int send(RGWHTTPManager *mgr);
- int complete_request(optional_yield y,
+ int complete_request(const DoutPrefixProvider* dpp, optional_yield y,
std::string *etag = nullptr,
real_time *mtime = nullptr,
uint64_t *psize = nullptr,
return 0;
}
-int RGWRESTConn::complete_request(RGWRESTStreamS3PutObj *req, string& etag,
+int RGWRESTConn::complete_request(const DoutPrefixProvider* dpp,
+ RGWRESTStreamS3PutObj *req, string& etag,
real_time *mtime, optional_yield y)
{
- int ret = req->complete_request(y, &etag, mtime);
+ int ret = req->complete_request(dpp, y, &etag, mtime);
if (ret == -EIO) {
ldout(cct, 5) << __func__ << ": complete_request() returned ret=" << ret << dendl;
set_url_unconnectable(req->get_url_orig());
return r;
}
-int RGWRESTConn::complete_request(RGWRESTStreamRWRequest *req,
+int RGWRESTConn::complete_request(const DoutPrefixProvider* dpp,
+ RGWRESTStreamRWRequest *req,
string *etag,
real_time *mtime,
uint64_t *psize,
map<string, string> *pheaders,
optional_yield y)
{
- int ret = req->complete_request(y, etag, mtime, psize, pattrs, pheaders);
+ int ret = req->complete_request(dpp, y, etag, mtime, psize, pattrs, pheaders);
if (ret == -EIO) {
ldout(cct, 5) << __func__ << ": complete_request() returned ret=" << ret << dendl;
set_url_unconnectable(req->get_url_orig());
return ret;
}
- ret = req.complete_request(y);
+ ret = req.complete_request(dpp, y);
if (ret == -EIO) {
set_url_unconnectable(url);
if (tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
return ret;
}
- ret = req.complete_request(y);
+ ret = req.complete_request(dpp, y);
if (ret == -EIO) {
set_url_unconnectable(url);
if (tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
return ret;
}
- ret = req.complete_request(y);
+ ret = req.complete_request(dpp, y);
if (ret == -EIO) {
conn->set_url_unconnectable(req.get_url_orig());
ldpp_dout(dpp, 20) << __func__ << ": complete_request() returned ret=" << ret << dendl;
return ret;
}
- ret = req.complete_request(y);
+ ret = req.complete_request(dpp, y);
if (ret == -EIO) {
conn->set_url_unconnectable(req.get_url_orig());
ldpp_dout(dpp, 20) << __func__ << ": complete_request() returned ret=" << ret << dendl;
int put_obj_send_init(const rgw_obj& obj, const rgw_http_param_pair *extra_params, RGWRESTStreamS3PutObj **req);
int put_obj_async_init(const DoutPrefixProvider *dpp, const rgw_owner& uid, const rgw_obj& obj,
std::map<std::string, bufferlist>& attrs, RGWRESTStreamS3PutObj **req);
- int complete_request(RGWRESTStreamS3PutObj *req, std::string& etag,
+ int complete_request(const DoutPrefixProvider* dpp,
+ RGWRESTStreamS3PutObj *req, std::string& etag,
ceph::real_time *mtime, optional_yield y);
struct get_obj_params {
bool prepend_metadata, bool get_op, bool rgwx_stat, bool sync_manifest,
bool skip_decrypt, rgw_zone_set_entry *dst_zone_trace, bool sync_cloudtiered,
bool send, RGWHTTPStreamRWRequest::ReceiveCB *cb, RGWRESTStreamRWRequest **req);
- int complete_request(RGWRESTStreamRWRequest *req,
+ int complete_request(const DoutPrefixProvider* dpp,
+ RGWRESTStreamRWRequest *req,
std::string *etag,
ceph::real_time *mtime,
uint64_t *psize,
return req.get_http_status();
}
- int wait(bufferlist *pbl, optional_yield y) {
- int ret = req.wait(y);
+ int wait(const DoutPrefixProvider* dpp, bufferlist *pbl, optional_yield y) {
+ int ret = req.wait(dpp, y);
if (ret < 0) {
if (ret == -EIO) {
conn->set_url_unconnectable(req.get_url_orig());
}
template <class T>
- int wait(T *dest, optional_yield y);
+ int wait(const DoutPrefixProvider* dpp, T *dest, optional_yield y);
template <class T>
int fetch(const DoutPrefixProvider *dpp, T *dest, optional_yield y);
}
template <class T>
-int RGWRESTReadResource::wait(T *dest, optional_yield y)
+int RGWRESTReadResource::wait(const DoutPrefixProvider* dpp, T *dest,
+ optional_yield y)
{
- int ret = req.wait(y);
+ int ret = req.wait(dpp, y);
if (ret < 0) {
if (ret == -EIO) {
conn->set_url_unconnectable(req.get_url_orig());
}
template <class E = int>
- int wait(bufferlist *pbl, optional_yield y, E *err_result = nullptr) {
- int ret = req.wait(y);
+ int wait(const DoutPrefixProvider* dpp, bufferlist *pbl,
+ optional_yield y, E *err_result = nullptr) {
+ int ret = req.wait(dpp, y);
*pbl = bl;
if (ret == -EIO) {
}
template <class T, class E = int>
- int wait(T *dest, optional_yield y, E *err_result = nullptr);
+ int wait(const DoutPrefixProvider* dpp, T *dest,
+ optional_yield y, E *err_result = nullptr);
};
template <class T, class E>
-int RGWRESTSendResource::wait(T *dest, optional_yield y, E *err_result)
+int RGWRESTSendResource::wait(const DoutPrefixProvider* dpp, T *dest,
+ optional_yield y, E *err_result)
{
- int ret = req.wait(y);
+ int ret = req.wait(dpp, y);
if (ret == -EIO) {
conn->set_url_unconnectable(req.get_url_orig());
}
//Headers
openidc_req.append_header("Content-Type", "application/x-www-form-urlencoded");
- int res = openidc_req.process(y);
+ int res = openidc_req.process(dpp, y);
if (res < 0) {
ldpp_dout(dpp, 10) << "HTTP request res: " << res << dendl;
throw -EINVAL;
//Headers
cert_req.append_header("Content-Type", "application/x-www-form-urlencoded");
- int res = cert_req.process(y);
+ int res = cert_req.process(dpp, y);
if (res < 0) {
ldpp_dout(dpp, 10) << "HTTP request res: " << res << dendl;
throw -EINVAL;
ldpp_dout(dpp, 10) << "rgw_swift_validate_token url=" << url_buf << dendl;
- int ret = validator.process(y);
+ int ret = validator.process(dpp, y);
if (ret < 0) {
throw ret;
}
const auto url = std::string{"http://127.0.0.1:"} + std::to_string(acceptor.local_endpoint().port());
RGWHTTPClient client{g_ceph_context, "GET", url};
- EXPECT_EQ(-EAGAIN, RGWHTTP::process(&client, null_yield));
+ const auto dpp = NoDoutPrefix{g_ceph_context, ceph_subsys_rgw};
+ EXPECT_EQ(-EAGAIN, RGWHTTP::process(&dpp, &client, null_yield));
server.join();
}
const auto url = std::string{"http://127.0.0.1:"} + std::to_string(acceptor.local_endpoint().port());
RGWHTTPClient client{g_ceph_context, "HEAD", url};
- EXPECT_EQ(0, RGWHTTP::process(&client, null_yield));
+ const auto dpp = NoDoutPrefix{g_ceph_context, ceph_subsys_rgw};
+ EXPECT_EQ(0, RGWHTTP::process(&dpp, &client, null_yield));
server.join();
}