From cb9a09b4447332e386407f0738a20b5f34806d48 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Mon, 15 Apr 2024 17:31:30 +0000 Subject: [PATCH] rgw/notifications: cleanup includes and unused parameters Signed-off-by: Yuval Lifshitz --- src/rgw/driver/rados/rgw_notify.cc | 5 ++--- src/rgw/driver/rados/rgw_pubsub_push.cc | 28 ++++++++++--------------- src/rgw/driver/rados/rgw_pubsub_push.h | 11 ++++------ 3 files changed, 17 insertions(+), 27 deletions(-) diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index 4f0ae5af54a34..43481629bdf49 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -289,7 +289,7 @@ private: cct); ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint << " for entry: " << entry.marker << dendl; - const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield)); + const auto ret = push_endpoint->send(event_entry.event, optional_yield(io_context, yield)); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker << " failed. error: " << ret @@ -1203,8 +1203,7 @@ int publish_commit(rgw::sal::Object* obj, dpp->get_cct()); ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: " << topic.cfg.dest.push_endpoint << dendl; - const auto ret = push_endpoint->send_to_completion_async( - dpp->get_cct(), event_entry.event, res.yield); + const auto ret = push_endpoint->send(event_entry.event, res.yield); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to push sync notification event with error: " diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc index 3a1fed164381b..d2c0824c90cbb 100644 --- a/src/rgw/driver/rados/rgw_pubsub_push.cc +++ b/src/rgw/driver/rados/rgw_pubsub_push.cc @@ -5,7 +5,6 @@ #include #include #include -#include "include/buffer_fwd.h" #include "common/Formatter.h" #include "common/iso_8601.h" #include "common/async/completion.h" @@ -55,6 +54,7 @@ bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_val class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint { private: + CephContext* const cct; const std::string endpoint; typedef unsigned ack_level_t; ack_level_t ack_level; // TODO: not used for now @@ -64,8 +64,8 @@ private: static const ack_level_t ACK_LEVEL_NON_ERROR = 1; public: - RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) : - endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false)) + RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args, CephContext* _cct) : + cct(_cct), endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false)) { bool exists; const auto& str_ack_level = args.get("http-ack-level", &exists); @@ -82,7 +82,7 @@ public: } } - int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { + int send(const rgw_pubsub_s3_event& event, optional_yield y) override { bufferlist read_bl; RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl); const auto post_data = json_format_pubsub_event(event); @@ -172,7 +172,6 @@ private: Broker, Routable }; - CephContext* const cct; const std::string endpoint; const std::string topic; const std::string exchange; @@ -224,9 +223,7 @@ private: public: RGWPubSubAMQPEndpoint(const std::string& _endpoint, const std::string& _topic, - const RGWHTTPArgs& args, - CephContext* _cct) : - cct(_cct), + const RGWHTTPArgs& args) : endpoint(_endpoint), topic(_topic), exchange(get_exchange(args)), @@ -236,7 +233,7 @@ public: } } - int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { + int send(const rgw_pubsub_s3_event& event, optional_yield y) override { if (ack_level == ack_level_t::None) { return amqp::publish(conn_id, topic, json_format_pubsub_event(event)); } else { @@ -276,7 +273,6 @@ private: None, Broker, }; - CephContext* const cct; const std::string topic; const ack_level_t ack_level; std::string conn_name; @@ -298,9 +294,7 @@ private: public: RGWPubSubKafkaEndpoint(const std::string& _endpoint, const std::string& _topic, - const RGWHTTPArgs& args, - CephContext* _cct) : - cct(_cct), + const RGWHTTPArgs& args) : topic(_topic), ack_level(get_ack_level(args)) { if (!kafka::connect(conn_name, _endpoint, @@ -314,7 +308,7 @@ public: } } - int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { + int send(const rgw_pubsub_s3_event& event, optional_yield y) override { if (ack_level == ack_level_t::None) { return kafka::publish(conn_name, topic, json_format_pubsub_event(event)); } else { @@ -376,7 +370,7 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint, CephContext* cct) { const auto& schema = get_schema(endpoint); if (schema == WEBHOOK_SCHEMA) { - return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args)); + return std::make_unique(endpoint, args, cct); #ifdef WITH_RADOSGW_AMQP_ENDPOINT } else if (schema == AMQP_SCHEMA) { bool exists; @@ -385,7 +379,7 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint, version = AMQP_0_9_1; } if (version == AMQP_0_9_1) { - return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct)); + return std::make_unique(endpoint, topic, args); } else if (version == AMQP_1_0) { throw configuration_error("AMQP: v1.0 not supported"); return nullptr; @@ -396,7 +390,7 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint, #endif #ifdef WITH_RADOSGW_KAFKA_ENDPOINT } else if (schema == KAFKA_SCHEMA) { - return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct)); + return std::make_unique(endpoint, topic, args); #endif } diff --git a/src/rgw/driver/rados/rgw_pubsub_push.h b/src/rgw/driver/rados/rgw_pubsub_push.h index 17905937c035a..040be3dd42891 100644 --- a/src/rgw/driver/rados/rgw_pubsub_push.h +++ b/src/rgw/driver/rados/rgw_pubsub_push.h @@ -5,12 +5,9 @@ #include #include #include -#include "include/buffer_fwd.h" #include "include/common_fwd.h" #include "common/async/yield_context.h" -// TODO the env should be used as a template parameter to differentiate the source that triggers the pushes -class RGWDataSyncEnv; class RGWHTTPArgs; struct rgw_pubsub_s3_event; @@ -27,14 +24,14 @@ public: // factory method for the actual notification endpoint // derived class specific arguments are passed in http args format // may throw a configuration_error if creation fails - static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContext *cct=nullptr); + static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContext* cct=nullptr); - // this method is used in order to send notification (S3 compliant) and wait for completion + // this method is used in order to send notification and wait for completion // in async manner via a coroutine when invoked in the frontend environment - virtual int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) = 0; + virtual int send(const rgw_pubsub_s3_event& event, optional_yield y) = 0; // present as string - virtual std::string to_str() const { return ""; } + virtual std::string to_str() const = 0; virtual ~RGWPubSubEndpoint() = default; -- 2.39.5