]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notifications: cleanup includes and unused parameters
authorYuval Lifshitz <ylifshit@ibm.com>
Mon, 15 Apr 2024 17:31:30 +0000 (17:31 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Sun, 12 May 2024 10:16:20 +0000 (10:16 +0000)
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_pubsub_push.cc
src/rgw/driver/rados/rgw_pubsub_push.h

index 4f0ae5af54a347efd589b1bc53b815b1e35bf6d0..43481629bdf4960105662cac1e400ab2c5fe027b 100644 (file)
@@ -289,7 +289,7 @@ private:
           cct);
       ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
         " for entry: " << entry.marker << dendl;
-      const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
+      const auto ret = push_endpoint->send(event_entry.event, optional_yield(io_context, yield));
       if (ret < 0) {
         ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker
                            << " failed. error: " << ret
@@ -1203,8 +1203,7 @@ int publish_commit(rgw::sal::Object* obj,
          dpp->get_cct());
         ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: "
                               << topic.cfg.dest.push_endpoint << dendl;
-        const auto ret = push_endpoint->send_to_completion_async(
-         dpp->get_cct(), event_entry.event, res.yield);
+        const auto ret = push_endpoint->send(event_entry.event, res.yield);
         if (ret < 0) {
           ldpp_dout(dpp, 1)
               << "ERROR: failed to push sync notification event with error: "
index 3a1fed164381bd47562b09a6c10b52c1e2f17d7d..d2c0824c90cbb0262517eb69cd9565d14529574c 100644 (file)
@@ -5,7 +5,6 @@
 #include <string>
 #include <sstream>
 #include <algorithm>
-#include "include/buffer_fwd.h"
 #include "common/Formatter.h"
 #include "common/iso_8601.h"
 #include "common/async/completion.h"
@@ -55,6 +54,7 @@ bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_val
 
 class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
 private:
+  CephContext* const cct;
   const std::string endpoint;
   typedef unsigned ack_level_t;
   ack_level_t ack_level; // TODO: not used for now
@@ -64,8 +64,8 @@ private:
   static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
 
 public:
-  RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) : 
-    endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false)) 
+  RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args, CephContext* _cct) : 
+    cct(_cct), endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false)) 
   {
     bool exists;
     const auto& str_ack_level = args.get("http-ack-level", &exists);
@@ -82,7 +82,7 @@ public:
     }
   }
 
-  int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
+  int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
     bufferlist read_bl;
     RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
     const auto post_data = json_format_pubsub_event(event);
@@ -172,7 +172,6 @@ private:
     Broker,
     Routable
   };
-  CephContext* const cct;
   const std::string endpoint;
   const std::string topic;
   const std::string exchange;
@@ -224,9 +223,7 @@ private:
 public:
   RGWPubSubAMQPEndpoint(const std::string& _endpoint,
       const std::string& _topic,
-      const RGWHTTPArgs& args,
-      CephContext* _cct) : 
-        cct(_cct),
+      const RGWHTTPArgs& args) : 
         endpoint(_endpoint), 
         topic(_topic),
         exchange(get_exchange(args)),
@@ -236,7 +233,7 @@ public:
     }
   }
 
-  int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
+  int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
     if (ack_level == ack_level_t::None) {
       return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
     } else {
@@ -276,7 +273,6 @@ private:
     None,
     Broker,
   };
-  CephContext* const cct;
   const std::string topic;
   const ack_level_t ack_level;
   std::string conn_name;
@@ -298,9 +294,7 @@ private:
 public:
   RGWPubSubKafkaEndpoint(const std::string& _endpoint,
       const std::string& _topic,
-      const RGWHTTPArgs& args,
-      CephContext* _cct) : 
-        cct(_cct),
+      const RGWHTTPArgs& args) : 
         topic(_topic),
         ack_level(get_ack_level(args)) {
     if (!kafka::connect(conn_name, _endpoint,
@@ -314,7 +308,7 @@ public:
     }
   }
 
-  int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
+  int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
     if (ack_level == ack_level_t::None) {
       return kafka::publish(conn_name, topic, json_format_pubsub_event(event));
     } else {
@@ -376,7 +370,7 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
     CephContext* cct) {
   const auto& schema = get_schema(endpoint);
   if (schema == WEBHOOK_SCHEMA) {
-    return Ptr(new RGWPubSubHTTPEndpoint(endpoint, args));
+    return std::make_unique<RGWPubSubHTTPEndpoint>(endpoint, args, cct);
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
   } else if (schema == AMQP_SCHEMA) {
     bool exists;
@@ -385,7 +379,7 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
       version = AMQP_0_9_1;
     }
     if (version == AMQP_0_9_1) {
-      return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
+      return std::make_unique<RGWPubSubAMQPEndpoint>(endpoint, topic, args);
     } else if (version == AMQP_1_0) {
       throw configuration_error("AMQP: v1.0 not supported");
       return nullptr;
@@ -396,7 +390,7 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
 #endif
 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
   } else if (schema == KAFKA_SCHEMA) {
-      return Ptr(new RGWPubSubKafkaEndpoint(endpoint, topic, args, cct));
+      return std::make_unique<RGWPubSubKafkaEndpoint>(endpoint, topic, args);
 #endif
   }
 
index 17905937c035afea144191ce8f6c3f6c0544c0eb..040be3dd428911b4913046dc80e800ae93823ea7 100644 (file)
@@ -5,12 +5,9 @@
 #include <string>
 #include <memory>
 #include <stdexcept>
-#include "include/buffer_fwd.h"
 #include "include/common_fwd.h"
 #include "common/async/yield_context.h"
 
-// TODO the env should be used as a template parameter to differentiate the source that triggers the pushes
-class RGWDataSyncEnv;
 class RGWHTTPArgs;
 struct rgw_pubsub_s3_event;
 
@@ -27,14 +24,14 @@ public:
   // factory method for the actual notification endpoint
   // derived class specific arguments are passed in http args format
   // may throw a configuration_error if creation fails
-  static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContext *cct=nullptr);
+  static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContextcct=nullptr);
  
-  // this method is used in order to send notification (S3 compliant) and wait for completion 
+  // this method is used in order to send notification and wait for completion 
   // in async manner via a coroutine when invoked in the frontend environment
-  virtual int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) = 0;
+  virtual int send(const rgw_pubsub_s3_event& event, optional_yield y) = 0;
 
   // present as string
-  virtual std::string to_str() const { return ""; }
+  virtual std::string to_str() const = 0;
   
   virtual ~RGWPubSubEndpoint() = default;