]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: remove pusub subscription functionality
authorYuval Lifshitz <ylifshit@redhat.com>
Mon, 21 Nov 2022 11:52:26 +0000 (13:52 +0200)
committerYuval Lifshitz <ylifshit@redhat.com>
Wed, 30 Nov 2022 18:48:16 +0000 (18:48 +0000)
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_pubsub_push.cc
src/rgw/rgw_pubsub_push.h
src/rgw/store/rados/rgw_rest_pubsub.cc
src/rgw/store/rados/rgw_rest_pubsub_common.cc
src/rgw/store/rados/rgw_rest_pubsub_common.h
src/test/cli/radosgw-admin/help.t

index 796c5a707f6772a019a49f30153429f81f4e395f..81cb33c4479b62f5867ca3dd715294b41c4a6533 100644 (file)
@@ -308,13 +308,9 @@ void usage()
   cout << "  mfa remove                 delete MFA TOTP token\n";
   cout << "  mfa check                  check MFA TOTP token\n";
   cout << "  mfa resync                 re-sync MFA TOTP token\n";
-  cout << "  topic list                 list bucket notifications/pubsub topics\n";
-  cout << "  topic get                  get a bucket notifications/pubsub topic\n";
-  cout << "  topic rm                   remove a bucket notifications/pubsub topic\n";
-  cout << "  subscription get           get a pubsub subscription definition\n";
-  cout << "  subscription rm            remove a pubsub subscription\n";
-  cout << "  subscription pull          show events in a pubsub subscription\n";
-  cout << "  subscription ack           ack (remove) an events in a pubsub subscription\n";
+  cout << "  topic list                 list bucket notifications topics\n";
+  cout << "  topic get                  get a bucket notifications topic\n";
+  cout << "  topic rm                   remove a bucket notifications topic\n";
   cout << "  script put                 upload a lua script to a context\n";
   cout << "  script get                 get the lua script of a context\n";
   cout << "  script rm                  remove the lua scripts of a context\n";
@@ -480,10 +476,8 @@ void usage()
   cout << "   --totp-seconds            the time resolution that is being used for TOTP generation\n";
   cout << "   --totp-window             the number of TOTP tokens that are checked before and after the current token when validating token\n";
   cout << "   --totp-pin                the valid value of a TOTP token at a certain time\n";
-  cout << "\nBucket notifications/pubsub options:\n";
-  cout << "   --topic                   bucket notifications/pubsub topic name\n";
-  cout << "   --subscription            pubsub subscription name\n";
-  cout << "   --event-id                event id in a pubsub subscription\n";
+  cout << "\nBucket notifications options:\n";
+  cout << "   --topic                   bucket notifications topic name\n";
   cout << "\nScript options:\n";
   cout << "   --context                 context in which the script runs. one of: "+LUA_CONTEXT_LIST+"\n";
   cout << "   --package                 name of the lua package that should be added/removed to/from the allowlist\n";
@@ -831,13 +825,8 @@ enum class OPT {
   RESHARD_STALE_INSTANCES_LIST,
   RESHARD_STALE_INSTANCES_DELETE,
   PUBSUB_TOPICS_LIST,
-  // TODO add "subscription list" command
   PUBSUB_TOPIC_GET,
   PUBSUB_TOPIC_RM,
-  PUBSUB_SUB_GET,
-  PUBSUB_SUB_RM,
-  PUBSUB_SUB_PULL,
-  PUBSUB_EVENT_RM,
   SCRIPT_PUT,
   SCRIPT_GET,
   SCRIPT_RM,
@@ -1068,10 +1057,6 @@ static SimpleCmd::Commands all_cmds = {
   { "topic list", OPT::PUBSUB_TOPICS_LIST },
   { "topic get", OPT::PUBSUB_TOPIC_GET },
   { "topic rm", OPT::PUBSUB_TOPIC_RM },
-  { "subscription get", OPT::PUBSUB_SUB_GET },
-  { "subscription rm", OPT::PUBSUB_SUB_RM },
-  { "subscription pull", OPT::PUBSUB_SUB_PULL },
-  { "subscription ack", OPT::PUBSUB_EVENT_RM },
   { "script put", OPT::SCRIPT_PUT },
   { "script get", OPT::SCRIPT_GET },
   { "script rm", OPT::SCRIPT_RM },
@@ -4335,8 +4320,6 @@ int main(int argc, const char **argv)
                         OPT::RESHARD_STATUS,
                         OPT::PUBSUB_TOPICS_LIST,
                         OPT::PUBSUB_TOPIC_GET,
-                        OPT::PUBSUB_SUB_GET,
-                        OPT::PUBSUB_SUB_PULL,
                         OPT::SCRIPT_GET,
     };
 
@@ -10497,99 +10480,6 @@ next:
     }
   }
 
-  if (opt_cmd == OPT::PUBSUB_SUB_GET) {
-    if (store->get_zone()->get_tier_type() != "pubsub") {
-      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
-      return EINVAL;
-    }
-    if (sub_name.empty()) {
-      cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
-      return EINVAL;
-    }
-
-    RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(store), tenant);
-
-    rgw_pubsub_sub_config sub_conf;
-
-    auto sub = ps.get_sub(sub_name);
-    ret = sub->get_conf(&sub_conf);
-    if (ret < 0) {
-      cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
-    }
-    encode_json("sub", sub_conf, formatter.get());
-    formatter->flush(cout);
-  }
-
- if (opt_cmd == OPT::PUBSUB_SUB_RM) {
-    if (store->get_zone()->get_tier_type() != "pubsub") {
-      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
-      return EINVAL;
-    }
-    if (sub_name.empty()) {
-      cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
-      return EINVAL;
-    }
-
-    RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(store), tenant);
-
-    auto sub = ps.get_sub(sub_name);
-    ret = sub->unsubscribe(dpp(), topic_name, null_yield);
-    if (ret < 0) {
-      cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
-    }
-  }
-
- if (opt_cmd == OPT::PUBSUB_SUB_PULL) {
-    if (store->get_zone()->get_tier_type() != "pubsub") {
-      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
-      return EINVAL;
-    }
-    if (sub_name.empty()) {
-      cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
-      return EINVAL;
-    }
-
-    RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(store), tenant);
-
-    if (!max_entries_specified) {
-      max_entries = RGWPubSub::Sub::DEFAULT_MAX_EVENTS;
-    }
-    auto sub = ps.get_sub_with_events(sub_name);
-    ret = sub->list_events(dpp(), marker, max_entries);
-    if (ret < 0) {
-      cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
-    }
-    encode_json("result", *sub, formatter.get());
-    formatter->flush(cout);
- }
-
- if (opt_cmd == OPT::PUBSUB_EVENT_RM) {
-    if (store->get_zone()->get_tier_type() != "pubsub") {
-      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
-      return EINVAL;
-    }
-    if (sub_name.empty()) {
-      cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
-      return EINVAL;
-    }
-    if (event_id.empty()) {
-      cerr << "ERROR: event id was not provided (via --event-id)" << std::endl;
-      return EINVAL;
-    }
-
-    RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(store), tenant);
-
-    auto sub = ps.get_sub_with_events(sub_name);
-    ret = sub->remove_event(dpp(), event_id);
-    if (ret < 0) {
-      cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
-    }
-  }
-
   if (opt_cmd == OPT::SCRIPT_PUT) {
     if (!str_script_ctx) {
       cerr << "ERROR: context was not provided (via --context)" << std::endl;
index 012648ac3274a4f23e4037b8a355a1de115db815..bf0e2d46b4c242d9f4b0ce0adda1ac5c1f237bd5 100644 (file)
@@ -296,15 +296,6 @@ void rgw_pubsub_s3_event::dump(Formatter *f) const {
   encode_json("opaqueData", opaque_data, f);
 }
 
-void rgw_pubsub_event::dump(Formatter *f) const
-{
-  encode_json("id", id, f);
-  encode_json("event", event_name, f);
-  utime_t ut(timestamp);
-  encode_json("timestamp", ut, f);
-  encode_json("info", info, f);
-}
-
 void rgw_pubsub_topic::dump(Formatter *f) const
 {
   encode_json("user", user, f);
@@ -718,247 +709,6 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const string& name, o
   return 0;
 }
 
-int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
-{
-  int ret = ps->read(sub_meta_obj, result, objv_tracker);
-  if (ret < 0 && ret != -ENOENT) {
-    ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
-    return ret;
-  }
-  return 0;
-}
-
-int RGWPubSub::Sub::write_sub(const DoutPrefixProvider *dpp, 
-                              const rgw_pubsub_sub_config& sub_conf,
-                                 RGWObjVersionTracker *objv_tracker,
-                                 optional_yield y)
-{
-  int ret = ps->write(dpp, sub_meta_obj, sub_conf, objv_tracker, y);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
-    return ret;
-  }
-
-  return 0;
-}
-
-int RGWPubSub::Sub::remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker *objv_tracker,
-                                  optional_yield y)
-{
-  int ret = ps->remove(dpp, sub_meta_obj, objv_tracker, y);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl;
-    return ret;
-  }
-
-  return 0;
-}
-
-int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
-{
-  return read_sub(result, nullptr);
-}
-
-int RGWPubSub::Sub::subscribe(const DoutPrefixProvider *dpp, const string& topic, const rgw_pubsub_sub_dest& dest, optional_yield y, const std::string& s3_id)
-{
-  RGWObjVersionTracker objv_tracker;
-  rgw_pubsub_topics topics;
-
-  int ret = ps->read_topics(&topics, &objv_tracker);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
-    return ret != -ENOENT ? ret : -EINVAL;
-  }
-
-  auto iter = topics.topics.find(topic);
-  if (iter == topics.topics.end()) {
-    ldpp_dout(dpp, 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
-    return -EINVAL;
-  }
-
-  auto& t = iter->second;
-
-  rgw_pubsub_sub_config sub_conf;
-
-  sub_conf.user = rgw_user("", ps->tenant);
-  sub_conf.name = sub;
-  sub_conf.topic = topic;
-  sub_conf.dest = dest;
-  sub_conf.s3_id = s3_id;
-
-  t.subs.insert(sub);
-
-  ret = ps->write_topics(dpp, topics, &objv_tracker, y);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
-    return ret;
-  }
-
-  ret = write_sub(dpp, sub_conf, nullptr, y);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
-    return ret;
-  }
-  return 0;
-}
-
-int RGWPubSub::Sub::unsubscribe(const DoutPrefixProvider *dpp, const string& _topic, optional_yield y)
-{
-  string topic = _topic;
-  RGWObjVersionTracker sobjv_tracker;
-
-  if (topic.empty()) {
-    rgw_pubsub_sub_config sub_conf;
-    int ret = read_sub(&sub_conf, &sobjv_tracker);
-    if (ret < 0) {
-      ldpp_dout(dpp, 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
-      return ret;
-    }
-    topic = sub_conf.topic;
-  }
-
-  RGWObjVersionTracker objv_tracker;
-  rgw_pubsub_topics topics;
-
-  int ret = ps->read_topics(&topics, &objv_tracker);
-  if (ret < 0) {
-    // not an error - could be that topic was already deleted
-    ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
-  } else {
-    auto iter = topics.topics.find(topic);
-    if (iter != topics.topics.end()) {
-      auto& t = iter->second;
-
-      t.subs.erase(sub);
-
-      ret = ps->write_topics(dpp, topics, &objv_tracker, y);
-      if (ret < 0) {
-        ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
-        return ret;
-      }
-    }
-  }
-
-  ret = remove_sub(dpp, &sobjv_tracker, y);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to delete subscription info: ret=" << ret << dendl;
-    return ret;
-  }
-  return 0;
-}
-
-template<typename EventType>
-void RGWPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
-{
-  encode_json("next_marker", next_marker, f);
-  encode_json("is_truncated", is_truncated, f);
-
-  Formatter::ArraySection s(*f, EventType::json_type_plural);
-  for (auto& event : events) {
-    encode_json("", event, f);
-  }
-}
-
-template<typename EventType>
-int RGWPubSub::SubWithEvents<EventType>::list_events(const DoutPrefixProvider *dpp, const string& marker, int max_events)
-{
-  RGWRados *store = ps->store->getRados();
-  rgw_pubsub_sub_config sub_conf;
-  int ret = get_conf(&sub_conf);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
-    return ret;
-  }
-
-  RGWBucketInfo bucket_info;
-  string tenant;
-  ret = store->get_bucket_info(&store->svc, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr);
-  if (ret == -ENOENT) {
-    list.is_truncated = false;
-    return 0;
-  }
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
-    return ret;
-  }
-
-  RGWRados::Bucket target(store, bucket_info);
-  RGWRados::Bucket::List list_op(&target);
-
-  list_op.params.prefix = sub_conf.dest.oid_prefix;
-  list_op.params.marker = marker;
-
-  std::vector<rgw_bucket_dir_entry> objs;
-
-  ret = list_op.list_objects(dpp, max_events, &objs, nullptr, &list.is_truncated, null_yield);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
-    return ret;
-  }
-  if (list.is_truncated) {
-    list.next_marker = list_op.get_next_marker().name;
-  }
-
-  for (auto& obj : objs) {
-    bufferlist bl64;
-    bufferlist bl;
-    bl64.append(obj.meta.user_data);
-    try {
-      bl.decode_base64(bl64);
-    } catch (buffer::error& err) {
-      ldpp_dout(dpp, 1) << "ERROR: failed to event (not a valid base64)" << dendl;
-      continue;
-    }
-    EventType event;
-
-    auto iter = bl.cbegin();
-    try {
-      decode(event, iter);
-    } catch (buffer::error& err) {
-      ldpp_dout(dpp, 1) << "ERROR: failed to decode event" << dendl;
-      continue;
-    };
-
-    list.events.push_back(event);
-  }
-  return 0;
-}
-
-template<typename EventType>
-int RGWPubSub::SubWithEvents<EventType>::remove_event(const DoutPrefixProvider *dpp, const string& event_id)
-{
-  rgw::sal::RadosStore* store = ps->store;
-  rgw_pubsub_sub_config sub_conf;
-  int ret = get_conf(&sub_conf);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
-    return ret;
-  }
-
-  string tenant;
-  std::unique_ptr<rgw::sal::Bucket> bucket;
-  ret = store->get_bucket(dpp, nullptr, tenant, sub_conf.dest.bucket_name,
-                         &bucket, null_yield);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
-    return ret;
-  }
-  std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(
-                                                 sub_conf.dest.oid_prefix + event_id);
-  obj->set_atomic();
-
-  std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = obj->get_delete_op();
-
-  del_op->params.bucket_owner = bucket->get_info().owner;
-  del_op->params.versioning_status = bucket->get_info().versioning_status();
-
-  ret = del_op->delete_obj(dpp, null_yield);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl;
-  }
-  return 0;
-}
-
 void RGWPubSub::get_meta_obj(rgw_raw_obj *obj) const {
   *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, meta_oid());
 }
@@ -971,13 +721,3 @@ void RGWPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
   *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
 }
 
-template<typename EventType>
-void RGWPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
-  list.dump(f);
-}
-
-// explicit instantiation for the only two possible types
-// no need to move implementation to header
-template class RGWPubSub::SubWithEvents<rgw_pubsub_event>;
-template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_event>;
-
index 6335a5cb67aefc7322ac527c05ce312fe0f6ca01..c3b0b4d07750d5c4c3a7231c8bf35a7e99f12bec 100644 (file)
@@ -332,38 +332,6 @@ struct rgw_pubsub_s3_event {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_s3_event)
 
-struct rgw_pubsub_event {
-  constexpr static const char* const json_type_plural = "events";
-  std::string id;
-  std::string event_name;
-  std::string source;
-  ceph::real_time timestamp;
-  JSONFormattable info;
-
-  void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
-    encode(id, bl);
-    encode(event_name, bl);
-    encode(source, bl);
-    encode(timestamp, bl);
-    encode(info, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(id, bl);
-    decode(event_name, bl);
-    decode(source, bl);
-    decode(timestamp, bl);
-    decode(info, bl);
-    DECODE_FINISH(bl);
-  }
-
-  void dump(Formatter *f) const;
-};
-WRITE_CLASS_ENCODER(rgw_pubsub_event)
-
 // setting a unique ID for an event based on object hash and timestamp
 void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
 
@@ -675,81 +643,12 @@ public:
     int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y);
   };
 
-  // base class for subscription
-  class Sub {
-    friend class RGWPubSub;
-  protected:
-    RGWPubSub* const ps;
-    const std::string sub;
-    rgw_raw_obj sub_meta_obj;
-
-    int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker* objv_tracker);
-    int write_sub(const DoutPrefixProvider *dpp, const rgw_pubsub_sub_config& sub_conf,
-                 RGWObjVersionTracker* objv_tracker, optional_yield y);
-    int remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker, optional_yield y);
-  public:
-    Sub(RGWPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
-      ps->get_sub_meta_obj(sub, &sub_meta_obj);
-    }
-
-    virtual ~Sub() = default;
-
-    int subscribe(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw_pubsub_sub_dest& dest, optional_yield y,
-                 const std::string& s3_id="");
-    int unsubscribe(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y);
-    int get_conf(rgw_pubsub_sub_config* result);
-    
-    static const int DEFAULT_MAX_EVENTS = 100;
-    // followint virtual methods should only be called in derived
-    virtual int list_events(const DoutPrefixProvider *dpp, const std::string& marker, int max_events) {ceph_assert(false);}
-    virtual int remove_event(const DoutPrefixProvider *dpp, const std::string& event_id) {ceph_assert(false);}
-    virtual void dump(Formatter* f) const {ceph_assert(false);}
-  };
-
-  // subscription with templated list of events to support both S3 compliant and Ceph specific events
-  template<typename EventType>
-  class SubWithEvents : public Sub {
-  private:
-    struct list_events_result {
-      std::string next_marker;
-      bool is_truncated{false};
-      void dump(Formatter *f) const;
-      std::vector<EventType> events;
-    } list;
-
-  public:
-    SubWithEvents(RGWPubSub *_ps, const std::string& _sub) : Sub(_ps, _sub) {}
-
-    virtual ~SubWithEvents() = default;
-    
-    int list_events(const DoutPrefixProvider *dpp, const std::string& marker, int max_events) override;
-    int remove_event(const DoutPrefixProvider *dpp, const std::string& event_id) override;
-    void dump(Formatter* f) const override;
-  };
-
   using BucketRef = std::shared_ptr<Bucket>;
-  using SubRef = std::shared_ptr<Sub>;
 
   BucketRef get_bucket(const rgw_bucket& bucket) {
     return std::make_shared<Bucket>(this, bucket);
   }
 
-  SubRef get_sub(const std::string& sub) {
-    return std::make_shared<Sub>(this, sub);
-  }
-  
-  SubRef get_sub_with_events(const std::string& sub) {
-    auto tmpsub = Sub(this, sub);
-    rgw_pubsub_sub_config conf;
-    if (tmpsub.get_conf(&conf) < 0) {
-      return nullptr;
-    }
-    if (conf.s3_id.empty()) {
-      return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub);
-    }
-    return std::make_shared<SubWithEvents<rgw_pubsub_s3_event>>(this, sub);
-  }
-
   void get_meta_obj(rgw_raw_obj *obj) const;
   void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;
 
index 79fa736e9250421ff87bf85a4c5877759dbb6874..2f734c21df838addf385cfbdd500b759301e5a5f 100644 (file)
@@ -63,53 +63,6 @@ private:
   static const ack_level_t ACK_LEVEL_ANY = 0;
   static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
 
-  // PostCR implements async execution of RGWPostHTTPData via coroutine
-  class PostCR : public RGWPostHTTPData, public RGWSimpleCoroutine {
-  private:
-    RGWDataSyncEnv* const sync_env;
-    bufferlist read_bl;
-    const ack_level_t ack_level;
-
-  public:
-    PostCR(const std::string& _post_data,
-        RGWDataSyncEnv* _sync_env,
-        const std::string& endpoint,
-        ack_level_t _ack_level,
-        bool verify_ssl) :
-      RGWPostHTTPData(_sync_env->cct, "POST", endpoint, &read_bl, verify_ssl),
-      RGWSimpleCoroutine(_sync_env->cct), 
-      sync_env(_sync_env),
-      ack_level (_ack_level) {
-      // ctor also set the data to send
-      set_post_data(_post_data);
-      set_send_length(_post_data.length());
-    }
-
-    // send message to endpoint
-    int send_request(const DoutPrefixProvider *dpp) override {
-      init_new_io(this);
-      const auto rc = sync_env->http_manager->add_request(this);
-      if (rc < 0) {
-        return rc;
-      }
-      if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
-      return 0;
-    }
-
-    // wait for reply
-    int request_complete() override {
-      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
-      if (ack_level == ACK_LEVEL_ANY) {
-        return 0;
-      } else if (ack_level == ACK_LEVEL_NON_ERROR) {
-        // TODO check result code to be non-error
-      } else {
-        // TODO: check that result code == ack_level
-      }
-      return -1;
-    }
-  };
-
 public:
   RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) : 
     endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false)) 
@@ -129,14 +82,6 @@ public:
     }
   }
 
-  RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
-    return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
-  }
-
-  RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
-    return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
-  }
-
   int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
     bufferlist read_bl;
     RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
@@ -226,95 +171,6 @@ private:
     }
     throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
   }
-
-  // NoAckPublishCR implements async amqp publishing via coroutine
-  // This coroutine ends when it send the message and does not wait for an ack
-  class NoAckPublishCR : public RGWCoroutine {
-  private:
-    const std::string topic;
-    amqp::connection_ptr_t conn;
-    const std::string message;
-
-  public:
-    NoAckPublishCR(CephContext* cct,
-              const std::string& _topic,
-              amqp::connection_ptr_t& _conn,
-              const std::string& _message) :
-      RGWCoroutine(cct),
-      topic(_topic), conn(_conn), message(_message) {}
-
-    // send message to endpoint, without waiting for reply
-    int operate(const DoutPrefixProvider *dpp) override {
-      reenter(this) {
-        const auto rc = amqp::publish(conn, topic, message);
-        if (rc < 0) {
-          return set_cr_error(rc);
-        }
-        return set_cr_done();
-      }
-      return 0;
-    }
-  };
-
-  // AckPublishCR implements async amqp publishing via coroutine
-  // This coroutine ends when an ack is received from the borker 
-  // note that it does not wait for an ack fron the end client
-  class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
-  private:
-    const std::string topic;
-    amqp::connection_ptr_t conn;
-    const std::string message;
-
-  public:
-    AckPublishCR(CephContext* cct,
-              const std::string& _topic,
-              amqp::connection_ptr_t& _conn,
-              const std::string& _message) :
-      RGWCoroutine(cct),
-      topic(_topic), conn(_conn), message(_message) {}
-
-    // send message to endpoint, waiting for reply
-    int operate(const DoutPrefixProvider *dpp) override {
-      reenter(this) {
-        yield {
-          init_new_io(this);
-          const auto rc = amqp::publish_with_confirm(conn, 
-              topic,
-              message,
-              std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
-          if (rc < 0) {
-            // failed to publish, does not wait for reply
-            return set_cr_error(rc);
-          }
-          // mark as blocked on the amqp answer
-          if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
-          io_block();
-          return 0;
-        }
-        return set_cr_done();
-      }
-      return 0;
-    }
-
-    // callback invoked from the amqp manager thread when ack/nack is received
-    void request_complete(int status) {
-      ceph_assert(!is_done());
-      if (status != 0) {
-        // server replied with a nack
-        set_cr_error(status);
-      }
-      io_complete();
-      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
-    }
-   
-    // TODO: why are these mandatory in RGWIOProvider?
-    void set_io_user_info(void *_user_info) override {
-    }
-
-    void *get_io_user_info() override {
-      return nullptr;
-    }
-  };
   
 public:
   RGWPubSubAMQPEndpoint(const std::string& _endpoint,
@@ -332,24 +188,6 @@ public:
     }
   }
 
-  RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
-    ceph_assert(conn);
-    if (ack_level == ack_level_t::None) {
-      return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
-    } else {
-      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
-    }
-  }
-  
-  RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
-    ceph_assert(conn);
-    if (ack_level == ack_level_t::None) {
-      return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
-    } else {
-      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
-    }
-  }
-
   // this allows waiting untill "finish()" is called from a different thread
   // waiting could be blocking the waiting thread or yielding, depending
   // with compilation flag support and whether the optional_yield is set
@@ -438,7 +276,6 @@ static const std::string AMQP_1_0("1-0");
 static const std::string AMQP_SCHEMA("amqp");
 #endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
 
-
 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
 class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
 private:
@@ -465,95 +302,6 @@ private:
     throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
   }
 
-  // NoAckPublishCR implements async kafka publishing via coroutine
-  // This coroutine ends when it send the message and does not wait for an ack
-  class NoAckPublishCR : public RGWCoroutine {
-  private:
-    const std::string topic;
-    kafka::connection_ptr_t conn;
-    const std::string message;
-
-  public:
-    NoAckPublishCR(CephContext* cct,
-              const std::string& _topic,
-              kafka::connection_ptr_t& _conn,
-              const std::string& _message) :
-      RGWCoroutine(cct),
-      topic(_topic), conn(_conn), message(_message) {}
-
-    // send message to endpoint, without waiting for reply
-    int operate(const DoutPrefixProvider *dpp) override {
-      reenter(this) {
-        const auto rc = kafka::publish(conn, topic, message);
-        if (rc < 0) {
-          return set_cr_error(rc);
-        }
-        return set_cr_done();
-      }
-      return 0;
-    }
-  };
-
-  // AckPublishCR implements async kafka publishing via coroutine
-  // This coroutine ends when an ack is received from the borker 
-  // note that it does not wait for an ack fron the end client
-  class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
-  private:
-    const std::string topic;
-    kafka::connection_ptr_t conn;
-    const std::string message;
-
-  public:
-    AckPublishCR(CephContext* cct,
-              const std::string& _topic,
-              kafka::connection_ptr_t& _conn,
-              const std::string& _message) :
-      RGWCoroutine(cct),
-      topic(_topic), conn(_conn), message(_message) {}
-
-    // send message to endpoint, waiting for reply
-    int operate(const DoutPrefixProvider *dpp) override {
-      reenter(this) {
-        yield {
-          init_new_io(this);
-          const auto rc = kafka::publish_with_confirm(conn, 
-              topic,
-              message,
-              std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
-          if (rc < 0) {
-            // failed to publish, does not wait for reply
-            return set_cr_error(rc);
-          }
-          // mark as blocked on the kafka answer
-          if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
-          io_block();
-          return 0;
-        }
-        return set_cr_done();
-      }
-      return 0;
-    }
-
-    // callback invoked from the kafka manager thread when ack/nack is received
-    void request_complete(int status) {
-      ceph_assert(!is_done());
-      if (status != 0) {
-        // server replied with a nack
-        set_cr_error(status);
-      }
-      io_complete();
-      if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
-    }
-   
-    // TODO: why are these mandatory in RGWIOProvider?
-    void set_io_user_info(void *_user_info) override {
-    }
-
-    void *get_io_user_info() override {
-      return nullptr;
-    }
-  };
-
 public:
   RGWPubSubKafkaEndpoint(const std::string& _endpoint,
       const std::string& _topic,
@@ -568,24 +316,6 @@ public:
     }
   }
 
-  RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
-    ceph_assert(conn);
-    if (ack_level == ack_level_t::None) {
-      return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
-    } else {
-      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
-    }
-  }
-  
-  RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
-    ceph_assert(conn);
-    if (ack_level == ack_level_t::None) {
-      return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
-    } else {
-      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
-    }
-  }
-
   // this allows waiting untill "finish()" is called from a different thread
   // waiting could be blocking the waiting thread or yielding, depending
   // with compilation flag support and whether the optional_yield is set
index 6cb3db74b332a12bb4ab48b4f802e3021db4a951..17905937c035afea144191ce8f6c3f6c0544c0eb 100644 (file)
@@ -11,9 +11,7 @@
 
 // TODO the env should be used as a template parameter to differentiate the source that triggers the pushes
 class RGWDataSyncEnv;
-class RGWCoroutine;
 class RGWHTTPArgs;
-struct rgw_pubsub_event;
 struct rgw_pubsub_s3_event;
 
 // endpoint base class all endpoint  - types should derive from it
@@ -31,14 +29,6 @@ public:
   // may throw a configuration_error if creation fails
   static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContext *cct=nullptr);
  
-  // this method is used in order to send notification (Ceph specific) and wait for completion 
-  // in async manner via a coroutine when invoked in the data sync environment
-  virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) = 0;
-
-  // this method is used in order to send notification (S3 compliant) and wait for completion 
-  // in async manner via a coroutine when invoked in the data sync environment
-  virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) = 0;
-
   // this method is used in order to send notification (S3 compliant) and wait for completion 
   // in async manner via a coroutine when invoked in the frontend environment
   virtual int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) = 0;
index 239f509821052080e8bd2d585bf97ad2926a020d..5b216330bd415cf567f2762558ad415c4530b02e 100644 (file)
@@ -64,9 +64,6 @@ public:
     }
     
     // dest object only stores endpoint info
-    // bucket to store events/records will be set only when subscription is created
-    dest.bucket_name = "";
-    dest.oid_prefix = "";
     dest.arn_topic = topic_name;
     // the topic ARN will be sent in the reply
     const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, 
@@ -437,28 +434,7 @@ int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::strin
 int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) {
   // delete all notifications of on a bucket
   for (const auto& topic : bucket_topics.topics) {
-    // remove the auto generated subscription of the topic (if exist)
-    rgw_pubsub_topic_subs topic_subs;
-    int op_ret = ps.get_topic(topic.first, &topic_subs);
-    for (const auto& topic_sub_name : topic_subs.subs) {
-      auto sub = ps.get_sub(topic_sub_name);
-      rgw_pubsub_sub_config sub_conf;
-      op_ret = sub->get_conf(&sub_conf);
-      if (op_ret < 0) {
-        ldpp_dout(dpp, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl;
-        return op_ret;
-      }
-      if (!sub_conf.s3_id.empty()) {
-        // S3 notification, has autogenerated subscription
-        const auto& sub_topic_name = sub_conf.topic;
-        op_ret = sub->unsubscribe(dpp, sub_topic_name, y);
-        if (op_ret < 0) {
-          ldpp_dout(dpp, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl;
-          return op_ret;
-        }
-      }
-    }
-    op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
+    const auto op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
     if (op_ret < 0) {
       return op_ret;
     }
@@ -542,10 +518,6 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
   auto b = ps->get_bucket(bucket_info.bucket);
   ceph_assert(b);
 
-  std::string data_bucket_prefix = "";
-  std::string data_oid_prefix = "";
-  bool push_only = true;
-
   if(configurations.list.empty()) {
     // get all topics on a bucket
     rgw_pubsub_bucket_topics bucket_topics;
@@ -622,24 +594,6 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
       return;
     }
     ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
-  
-    if (!push_only) {
-      // generate the subscription with destination information from the original topic
-      rgw_pubsub_sub_dest dest = topic_info.dest;
-      dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
-      dest.oid_prefix = data_oid_prefix + notif_name + "/";
-      auto sub = ps->get_sub(notif_name);
-      op_ret = sub->subscribe(this, unique_topic_name, dest, y, notif_name);
-      if (op_ret < 0) {
-        ldpp_dout(this, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
-        // rollback generated notification (ignore return value)
-        b->remove_notification(this, unique_topic_name, y);
-        // rollback generated topic (ignore return value)
-        ps->remove_topic(this, unique_topic_name, y);
-        return;
-      }
-      ldpp_dout(this, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
-    }
   }
 }
 
@@ -690,14 +644,7 @@ void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) {
     // delete a specific notification
     const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
     if (unique_topic) {
-      // remove the auto generated subscription according to notification name (if exist)
       const auto unique_topic_name = unique_topic->get().topic.name;
-      auto sub = ps->get_sub(notif_name);
-      op_ret = sub->unsubscribe(this, unique_topic_name, y);
-      if (op_ret < 0 && op_ret != -ENOENT) {
-        ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
-        return;
-      }
       op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, *ps);
       return;
     }
index 15455ec3ab2680866c69eb9fd2c11a0bb79a5b0f..6410e5b341eb79d65a5b377873444a98a33401f5 100644 (file)
@@ -41,10 +41,6 @@ bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext
   return true;
 }
 
-bool subscription_has_endpoint_secret(const rgw_pubsub_sub_config& sub) {
-    return sub.dest.stored_secret;
-}
-
 bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) {
     return topic.topic.dest.stored_secret;
 }
@@ -121,92 +117,6 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) {
   ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
 }
 
-void RGWPSCreateSubOp::execute(optional_yield y) {
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
-  auto sub = ps->get_sub(sub_name);
-  op_ret = sub->subscribe(this, topic_name, dest, y);
-  if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  ldpp_dout(this, 20) << "successfully created subscription '" << sub_name << "'" << dendl;
-}
-
-void RGWPSGetSubOp::execute(optional_yield y) {
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
-  auto sub = ps->get_sub(sub_name);
-  op_ret = sub->get_conf(&result);
-  if (subscription_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
-    ldpp_dout(this, 1) << "subscription '" << sub_name << "' contain secret and cannot be sent over insecure transport" << dendl;
-    op_ret = -EPERM;
-    return;
-  }
-  if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to get subscription '" << sub_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  ldpp_dout(this, 20) << "successfully got subscription '" << sub_name << "'" << dendl;
-}
-
-void RGWPSDeleteSubOp::execute(optional_yield y) {
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
-  auto sub = ps->get_sub(sub_name);
-  op_ret = sub->unsubscribe(this, topic_name, y);
-  if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  ldpp_dout(this, 20) << "successfully removed subscription '" << sub_name << "'" << dendl;
-}
-
-void RGWPSAckSubEventOp::execute(optional_yield y) {
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
-  auto sub = ps->get_sub_with_events(sub_name);
-  op_ret = sub->remove_event(s, event_id);
-  if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  ldpp_dout(this, 20) << "successfully acked event on subscription '" << sub_name << "'" << dendl;
-}
-
-void RGWPSPullSubEventsOp::execute(optional_yield y) {
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
-  sub = ps->get_sub_with_events(sub_name);
-  if (!sub) {
-    op_ret = -ENOENT;
-    ldpp_dout(this, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl;
-    return;
-  }
-  op_ret = sub->list_events(s, marker, max_entries);
-  if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to get events from subscription '" << sub_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  ldpp_dout(this, 20) << "successfully got events from subscription '" << sub_name << "'" << dendl;
-}
-
-
 int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
   int ret = get_params();
   if (ret < 0) {
index a702cf342041441866671fac35d8a38208b5fa65..fe3126c8377c8871591a10ea6c9292301ae8efd1 100644 (file)
@@ -101,130 +101,6 @@ public:
   uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
 };
 
-// create a subscription
-class RGWPSCreateSubOp : public RGWDefaultResponseOp {
-protected:
-  std::string sub_name;
-  std::string topic_name;
-  std::optional<RGWPubSub> ps;
-  rgw_pubsub_sub_dest dest;
-  
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission(optional_yield) override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute(optional_yield y) override;
-
-  const char* name() const override { return "pubsub_subscription_create"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_CREATE; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-// get subscription information (including push-endpoint if exist)
-class RGWPSGetSubOp : public RGWOp {
-protected:
-  std::string sub_name;
-  std::optional<RGWPubSub> ps;
-  rgw_pubsub_sub_config result;
-  
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission(optional_yield) override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute(optional_yield y) override;
-
-  const char* name() const override { return "pubsub_subscription_get"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_GET; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
-// delete subscription
-class RGWPSDeleteSubOp : public RGWDefaultResponseOp {
-protected:
-  std::string sub_name;
-  std::string topic_name;
-  std::optional<RGWPubSub> ps;
-  
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission(optional_yield) override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute(optional_yield y) override;
-
-  const char* name() const override { return "pubsub_subscription_delete"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_DELETE; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-};
-
-// acking of an event
-class RGWPSAckSubEventOp : public RGWDefaultResponseOp {
-protected:
-  std::string sub_name;
-  std::string event_id;
-  std::optional<RGWPubSub> ps;
-  
-  virtual int get_params() = 0;
-
-public:
-  RGWPSAckSubEventOp() {}
-
-  int verify_permission(optional_yield) override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute(optional_yield y) override;
-
-  const char* name() const override { return "pubsub_subscription_ack"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_ACK; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-// fetching events from a subscription
-// dpending on whether the subscription was created via s3 compliant API or not
-// the matching events will be returned
-class RGWPSPullSubEventsOp : public RGWOp {
-protected:
-  int max_entries{0};
-  std::string sub_name;
-  std::string marker;
-  std::optional<RGWPubSub> ps;
-  RGWPubSub::SubRef sub; 
-
-  virtual int get_params() = 0;
-
-public:
-  RGWPSPullSubEventsOp() {}
-
-  int verify_permission(optional_yield) override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute(optional_yield y) override;
-
-  const char* name() const override { return "pubsub_subscription_pull"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_PULL; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
 // notification creation
 class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
 protected:
index bdff2abc4c2e83d49cb5d9eabebc23025aadb8d4..34953ad205456ef8b055e87902aa958d8d02ca36 100644 (file)
     mfa remove                 delete MFA TOTP token
     mfa check                  check MFA TOTP token
     mfa resync                 re-sync MFA TOTP token
-    topic list                 list bucket notifications/pubsub topics
-    topic get                  get a bucket notifications/pubsub topic
-    topic rm                   remove a bucket notifications/pubsub topic
-    subscription get           get a pubsub subscription definition
-    subscription rm            remove a pubsub subscription
-    subscription pull          show events in a pubsub subscription
-    subscription ack           ack (remove) an events in a pubsub subscription
+    topic list                 list bucket notifications topics
+    topic get                  get a bucket notifications topic
+    topic rm                   remove a bucket notifications topic
     script put                 upload a lua script to a context
     script get                 get the lua script of a context
     script rm                  remove the lua scripts of a context
      --totp-window             the number of TOTP tokens that are checked before and after the current token when validating token
      --totp-pin                the valid value of a TOTP token at a certain time
   
-  Bucket notifications/pubsub options:
-     --topic                   bucket notifications/pubsub topic name
-     --subscription            pubsub subscription name
-     --event-id                event id in a pubsub subscription
+  Bucket notifications options:
+     --topic                   bucket notifications topic name
   
   Script options:
      --context                 context in which the script runs. one of: prerequest, postrequest, background, getdata, putdata