cct);
ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
" for entry: " << entry.marker << dendl;
- const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
+ const auto ret = push_endpoint->send(event_entry.event, optional_yield(io_context, yield));
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker
<< " failed. error: " << ret
dpp->get_cct());
ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: "
<< topic.cfg.dest.push_endpoint << dendl;
- const auto ret = push_endpoint->send_to_completion_async(
- dpp->get_cct(), event_entry.event, res.yield);
+ const auto ret = push_endpoint->send(event_entry.event, res.yield);
if (ret < 0) {
ldpp_dout(dpp, 1)
<< "ERROR: failed to push sync notification event with error: "
#include <string>
#include <sstream>
#include <algorithm>
-#include "include/buffer_fwd.h"
#include "common/Formatter.h"
#include "common/iso_8601.h"
#include "common/async/completion.h"
class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
private:
+ CephContext* const cct;
const std::string endpoint;
typedef unsigned ack_level_t;
ack_level_t ack_level; // TODO: not used for now
static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
public:
- RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) :
- endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false))
+ RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args, CephContext* _cct) :
+ cct(_cct), endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false))
{
bool exists;
const auto& str_ack_level = args.get("http-ack-level", &exists);
}
}
- int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
+ int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
bufferlist read_bl;
RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
const auto post_data = json_format_pubsub_event(event);
Broker,
Routable
};
- CephContext* const cct;
const std::string endpoint;
const std::string topic;
const std::string exchange;
public:
RGWPubSubAMQPEndpoint(const std::string& _endpoint,
const std::string& _topic,
- const RGWHTTPArgs& args,
- CephContext* _cct) :
- cct(_cct),
+ const RGWHTTPArgs& args) :
endpoint(_endpoint),
topic(_topic),
exchange(get_exchange(args)),
}
}
- int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
+ int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
if (ack_level == ack_level_t::None) {
return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
} else {
None,
Broker,
};
- CephContext* const cct;
const std::string topic;
const ack_level_t ack_level;
std::string conn_name;
public:
RGWPubSubKafkaEndpoint(const std::string& _endpoint,
const std::string& _topic,
- const RGWHTTPArgs& args,
- CephContext* _cct) :
- cct(_cct),
+ const RGWHTTPArgs& args) :
topic(_topic),
ack_level(get_ack_level(args)) {
if (!kafka::connect(conn_name, _endpoint,
}
}
- int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
+ int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
if (ack_level == ack_level_t::None) {
return kafka::publish(conn_name, topic, json_format_pubsub_event(event));
} else {
CephContext* cct) {
const auto& schema = get_schema(endpoint);
if (schema == WEBHOOK_SCHEMA) {
- return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
+ return std::make_unique<RGWPubSubHTTPEndpoint>(endpoint, args, cct);
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
} else if (schema == AMQP_SCHEMA) {
bool exists;
version = AMQP_0_9_1;
}
if (version == AMQP_0_9_1) {
- return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
+ return std::make_unique<RGWPubSubAMQPEndpoint>(endpoint, topic, args);
} else if (version == AMQP_1_0) {
throw configuration_error("AMQP: v1.0 not supported");
return nullptr;
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
} else if (schema == KAFKA_SCHEMA) {
- return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct));
+ return std::make_unique<RGWPubSubKafkaEndpoint>(endpoint, topic, args);
#endif
}
#include <string>
#include <memory>
#include <stdexcept>
-#include "include/buffer_fwd.h"
#include "include/common_fwd.h"
#include "common/async/yield_context.h"
-// TODO the env should be used as a template parameter to differentiate the source that triggers the pushes
-class RGWDataSyncEnv;
class RGWHTTPArgs;
struct rgw_pubsub_s3_event;
// factory method for the actual notification endpoint
// derived class specific arguments are passed in http args format
// may throw a configuration_error if creation fails
- static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContext *cct=nullptr);
+ static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContext* cct=nullptr);
- // this method is used in order to send notification (S3 compliant) and wait for completion
+ // this method is used in order to send notification and wait for completion
// in async manner via a coroutine when invoked in the frontend environment
- virtual int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) = 0;
+ virtual int send(const rgw_pubsub_s3_event& event, optional_yield y) = 0;
// present as string
- virtual std::string to_str() const { return ""; }
+ virtual std::string to_str() const = 0;
virtual ~RGWPubSubEndpoint() = default;