cout << " mfa remove delete MFA TOTP token\n";
cout << " mfa check check MFA TOTP token\n";
cout << " mfa resync re-sync MFA TOTP token\n";
- cout << " topic list list bucket notifications/pubsub topics\n";
- cout << " topic get get a bucket notifications/pubsub topic\n";
- cout << " topic rm remove a bucket notifications/pubsub topic\n";
- cout << " subscription get get a pubsub subscription definition\n";
- cout << " subscription rm remove a pubsub subscription\n";
- cout << " subscription pull show events in a pubsub subscription\n";
- cout << " subscription ack ack (remove) an events in a pubsub subscription\n";
+ cout << " topic list list bucket notifications topics\n";
+ cout << " topic get get a bucket notifications topic\n";
+ cout << " topic rm remove a bucket notifications topic\n";
cout << " script put upload a lua script to a context\n";
cout << " script get get the lua script of a context\n";
cout << " script rm remove the lua scripts of a context\n";
cout << " --totp-seconds the time resolution that is being used for TOTP generation\n";
cout << " --totp-window the number of TOTP tokens that are checked before and after the current token when validating token\n";
cout << " --totp-pin the valid value of a TOTP token at a certain time\n";
- cout << "\nBucket notifications/pubsub options:\n";
- cout << " --topic bucket notifications/pubsub topic name\n";
- cout << " --subscription pubsub subscription name\n";
- cout << " --event-id event id in a pubsub subscription\n";
+ cout << "\nBucket notifications options:\n";
+ cout << " --topic bucket notifications topic name\n";
cout << "\nScript options:\n";
cout << " --context context in which the script runs. one of: "+LUA_CONTEXT_LIST+"\n";
cout << " --package name of the lua package that should be added/removed to/from the allowlist\n";
RESHARD_STALE_INSTANCES_LIST,
RESHARD_STALE_INSTANCES_DELETE,
PUBSUB_TOPICS_LIST,
- // TODO add "subscription list" command
PUBSUB_TOPIC_GET,
PUBSUB_TOPIC_RM,
- PUBSUB_SUB_GET,
- PUBSUB_SUB_RM,
- PUBSUB_SUB_PULL,
- PUBSUB_EVENT_RM,
SCRIPT_PUT,
SCRIPT_GET,
SCRIPT_RM,
{ "topic list", OPT::PUBSUB_TOPICS_LIST },
{ "topic get", OPT::PUBSUB_TOPIC_GET },
{ "topic rm", OPT::PUBSUB_TOPIC_RM },
- { "subscription get", OPT::PUBSUB_SUB_GET },
- { "subscription rm", OPT::PUBSUB_SUB_RM },
- { "subscription pull", OPT::PUBSUB_SUB_PULL },
- { "subscription ack", OPT::PUBSUB_EVENT_RM },
{ "script put", OPT::SCRIPT_PUT },
{ "script get", OPT::SCRIPT_GET },
{ "script rm", OPT::SCRIPT_RM },
OPT::RESHARD_STATUS,
OPT::PUBSUB_TOPICS_LIST,
OPT::PUBSUB_TOPIC_GET,
- OPT::PUBSUB_SUB_GET,
- OPT::PUBSUB_SUB_PULL,
OPT::SCRIPT_GET,
};
}
}
- if (opt_cmd == OPT::PUBSUB_SUB_GET) {
- if (store->get_zone()->get_tier_type() != "pubsub") {
- cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
- return EINVAL;
- }
- if (sub_name.empty()) {
- cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
- return EINVAL;
- }
-
- RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(store), tenant);
-
- rgw_pubsub_sub_config sub_conf;
-
- auto sub = ps.get_sub(sub_name);
- ret = sub->get_conf(&sub_conf);
- if (ret < 0) {
- cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
- encode_json("sub", sub_conf, formatter.get());
- formatter->flush(cout);
- }
-
- if (opt_cmd == OPT::PUBSUB_SUB_RM) {
- if (store->get_zone()->get_tier_type() != "pubsub") {
- cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
- return EINVAL;
- }
- if (sub_name.empty()) {
- cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
- return EINVAL;
- }
-
- RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(store), tenant);
-
- auto sub = ps.get_sub(sub_name);
- ret = sub->unsubscribe(dpp(), topic_name, null_yield);
- if (ret < 0) {
- cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
- }
-
- if (opt_cmd == OPT::PUBSUB_SUB_PULL) {
- if (store->get_zone()->get_tier_type() != "pubsub") {
- cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
- return EINVAL;
- }
- if (sub_name.empty()) {
- cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
- return EINVAL;
- }
-
- RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(store), tenant);
-
- if (!max_entries_specified) {
- max_entries = RGWPubSub::Sub::DEFAULT_MAX_EVENTS;
- }
- auto sub = ps.get_sub_with_events(sub_name);
- ret = sub->list_events(dpp(), marker, max_entries);
- if (ret < 0) {
- cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
- encode_json("result", *sub, formatter.get());
- formatter->flush(cout);
- }
-
- if (opt_cmd == OPT::PUBSUB_EVENT_RM) {
- if (store->get_zone()->get_tier_type() != "pubsub") {
- cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
- return EINVAL;
- }
- if (sub_name.empty()) {
- cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
- return EINVAL;
- }
- if (event_id.empty()) {
- cerr << "ERROR: event id was not provided (via --event-id)" << std::endl;
- return EINVAL;
- }
-
- RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(store), tenant);
-
- auto sub = ps.get_sub_with_events(sub_name);
- ret = sub->remove_event(dpp(), event_id);
- if (ret < 0) {
- cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
- }
-
if (opt_cmd == OPT::SCRIPT_PUT) {
if (!str_script_ctx) {
cerr << "ERROR: context was not provided (via --context)" << std::endl;
encode_json("opaqueData", opaque_data, f);
}
-void rgw_pubsub_event::dump(Formatter *f) const
-{
- encode_json("id", id, f);
- encode_json("event", event_name, f);
- utime_t ut(timestamp);
- encode_json("timestamp", ut, f);
- encode_json("info", info, f);
-}
-
void rgw_pubsub_topic::dump(Formatter *f) const
{
encode_json("user", user, f);
return 0;
}
-int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
-{
- int ret = ps->read(sub_meta_obj, result, objv_tracker);
- if (ret < 0 && ret != -ENOENT) {
- ldout(ps->store->ctx(), 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
- return ret;
- }
- return 0;
-}
-
-int RGWPubSub::Sub::write_sub(const DoutPrefixProvider *dpp,
- const rgw_pubsub_sub_config& sub_conf,
- RGWObjVersionTracker *objv_tracker,
- optional_yield y)
-{
- int ret = ps->write(dpp, sub_meta_obj, sub_conf, objv_tracker, y);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
- return ret;
- }
-
- return 0;
-}
-
-int RGWPubSub::Sub::remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker *objv_tracker,
- optional_yield y)
-{
- int ret = ps->remove(dpp, sub_meta_obj, objv_tracker, y);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to remove subscription info: ret=" << ret << dendl;
- return ret;
- }
-
- return 0;
-}
-
-int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
-{
- return read_sub(result, nullptr);
-}
-
-int RGWPubSub::Sub::subscribe(const DoutPrefixProvider *dpp, const string& topic, const rgw_pubsub_sub_dest& dest, optional_yield y, const std::string& s3_id)
-{
- RGWObjVersionTracker objv_tracker;
- rgw_pubsub_topics topics;
-
- int ret = ps->read_topics(&topics, &objv_tracker);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
- return ret != -ENOENT ? ret : -EINVAL;
- }
-
- auto iter = topics.topics.find(topic);
- if (iter == topics.topics.end()) {
- ldpp_dout(dpp, 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
- return -EINVAL;
- }
-
- auto& t = iter->second;
-
- rgw_pubsub_sub_config sub_conf;
-
- sub_conf.user = rgw_user("", ps->tenant);
- sub_conf.name = sub;
- sub_conf.topic = topic;
- sub_conf.dest = dest;
- sub_conf.s3_id = s3_id;
-
- t.subs.insert(sub);
-
- ret = ps->write_topics(dpp, topics, &objv_tracker, y);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
- return ret;
- }
-
- ret = write_sub(dpp, sub_conf, nullptr, y);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
- return ret;
- }
- return 0;
-}
-
-int RGWPubSub::Sub::unsubscribe(const DoutPrefixProvider *dpp, const string& _topic, optional_yield y)
-{
- string topic = _topic;
- RGWObjVersionTracker sobjv_tracker;
-
- if (topic.empty()) {
- rgw_pubsub_sub_config sub_conf;
- int ret = read_sub(&sub_conf, &sobjv_tracker);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
- return ret;
- }
- topic = sub_conf.topic;
- }
-
- RGWObjVersionTracker objv_tracker;
- rgw_pubsub_topics topics;
-
- int ret = ps->read_topics(&topics, &objv_tracker);
- if (ret < 0) {
- // not an error - could be that topic was already deleted
- ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
- } else {
- auto iter = topics.topics.find(topic);
- if (iter != topics.topics.end()) {
- auto& t = iter->second;
-
- t.subs.erase(sub);
-
- ret = ps->write_topics(dpp, topics, &objv_tracker, y);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
- return ret;
- }
- }
- }
-
- ret = remove_sub(dpp, &sobjv_tracker, y);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to delete subscription info: ret=" << ret << dendl;
- return ret;
- }
- return 0;
-}
-
-template<typename EventType>
-void RGWPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
-{
- encode_json("next_marker", next_marker, f);
- encode_json("is_truncated", is_truncated, f);
-
- Formatter::ArraySection s(*f, EventType::json_type_plural);
- for (auto& event : events) {
- encode_json("", event, f);
- }
-}
-
-template<typename EventType>
-int RGWPubSub::SubWithEvents<EventType>::list_events(const DoutPrefixProvider *dpp, const string& marker, int max_events)
-{
- RGWRados *store = ps->store->getRados();
- rgw_pubsub_sub_config sub_conf;
- int ret = get_conf(&sub_conf);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
- return ret;
- }
-
- RGWBucketInfo bucket_info;
- string tenant;
- ret = store->get_bucket_info(&store->svc, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, null_yield, nullptr);
- if (ret == -ENOENT) {
- list.is_truncated = false;
- return 0;
- }
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
- return ret;
- }
-
- RGWRados::Bucket target(store, bucket_info);
- RGWRados::Bucket::List list_op(&target);
-
- list_op.params.prefix = sub_conf.dest.oid_prefix;
- list_op.params.marker = marker;
-
- std::vector<rgw_bucket_dir_entry> objs;
-
- ret = list_op.list_objects(dpp, max_events, &objs, nullptr, &list.is_truncated, null_yield);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
- return ret;
- }
- if (list.is_truncated) {
- list.next_marker = list_op.get_next_marker().name;
- }
-
- for (auto& obj : objs) {
- bufferlist bl64;
- bufferlist bl;
- bl64.append(obj.meta.user_data);
- try {
- bl.decode_base64(bl64);
- } catch (buffer::error& err) {
- ldpp_dout(dpp, 1) << "ERROR: failed to event (not a valid base64)" << dendl;
- continue;
- }
- EventType event;
-
- auto iter = bl.cbegin();
- try {
- decode(event, iter);
- } catch (buffer::error& err) {
- ldpp_dout(dpp, 1) << "ERROR: failed to decode event" << dendl;
- continue;
- };
-
- list.events.push_back(event);
- }
- return 0;
-}
-
-template<typename EventType>
-int RGWPubSub::SubWithEvents<EventType>::remove_event(const DoutPrefixProvider *dpp, const string& event_id)
-{
- rgw::sal::RadosStore* store = ps->store;
- rgw_pubsub_sub_config sub_conf;
- int ret = get_conf(&sub_conf);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to read sub config: ret=" << ret << dendl;
- return ret;
- }
-
- string tenant;
- std::unique_ptr<rgw::sal::Bucket> bucket;
- ret = store->get_bucket(dpp, nullptr, tenant, sub_conf.dest.bucket_name,
- &bucket, null_yield);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
- return ret;
- }
- std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(
- sub_conf.dest.oid_prefix + event_id);
- obj->set_atomic();
-
- std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = obj->get_delete_op();
-
- del_op->params.bucket_owner = bucket->get_info().owner;
- del_op->params.versioning_status = bucket->get_info().versioning_status();
-
- ret = del_op->delete_obj(dpp, null_yield);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl;
- }
- return 0;
-}
-
void RGWPubSub::get_meta_obj(rgw_raw_obj *obj) const {
*obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, meta_oid());
}
*obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
}
-template<typename EventType>
-void RGWPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
- list.dump(f);
-}
-
-// explicit instantiation for the only two possible types
-// no need to move implementation to header
-template class RGWPubSub::SubWithEvents<rgw_pubsub_event>;
-template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_event>;
-
};
WRITE_CLASS_ENCODER(rgw_pubsub_s3_event)
-struct rgw_pubsub_event {
- constexpr static const char* const json_type_plural = "events";
- std::string id;
- std::string event_name;
- std::string source;
- ceph::real_time timestamp;
- JSONFormattable info;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(id, bl);
- encode(event_name, bl);
- encode(source, bl);
- encode(timestamp, bl);
- encode(info, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(id, bl);
- decode(event_name, bl);
- decode(source, bl);
- decode(timestamp, bl);
- decode(info, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
-};
-WRITE_CLASS_ENCODER(rgw_pubsub_event)
-
// setting a unique ID for an event based on object hash and timestamp
void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y);
};
- // base class for subscription
- class Sub {
- friend class RGWPubSub;
- protected:
- RGWPubSub* const ps;
- const std::string sub;
- rgw_raw_obj sub_meta_obj;
-
- int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker* objv_tracker);
- int write_sub(const DoutPrefixProvider *dpp, const rgw_pubsub_sub_config& sub_conf,
- RGWObjVersionTracker* objv_tracker, optional_yield y);
- int remove_sub(const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker, optional_yield y);
- public:
- Sub(RGWPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
- ps->get_sub_meta_obj(sub, &sub_meta_obj);
- }
-
- virtual ~Sub() = default;
-
- int subscribe(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw_pubsub_sub_dest& dest, optional_yield y,
- const std::string& s3_id="");
- int unsubscribe(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y);
- int get_conf(rgw_pubsub_sub_config* result);
-
- static const int DEFAULT_MAX_EVENTS = 100;
- // followint virtual methods should only be called in derived
- virtual int list_events(const DoutPrefixProvider *dpp, const std::string& marker, int max_events) {ceph_assert(false);}
- virtual int remove_event(const DoutPrefixProvider *dpp, const std::string& event_id) {ceph_assert(false);}
- virtual void dump(Formatter* f) const {ceph_assert(false);}
- };
-
- // subscription with templated list of events to support both S3 compliant and Ceph specific events
- template<typename EventType>
- class SubWithEvents : public Sub {
- private:
- struct list_events_result {
- std::string next_marker;
- bool is_truncated{false};
- void dump(Formatter *f) const;
- std::vector<EventType> events;
- } list;
-
- public:
- SubWithEvents(RGWPubSub *_ps, const std::string& _sub) : Sub(_ps, _sub) {}
-
- virtual ~SubWithEvents() = default;
-
- int list_events(const DoutPrefixProvider *dpp, const std::string& marker, int max_events) override;
- int remove_event(const DoutPrefixProvider *dpp, const std::string& event_id) override;
- void dump(Formatter* f) const override;
- };
-
using BucketRef = std::shared_ptr<Bucket>;
- using SubRef = std::shared_ptr<Sub>;
BucketRef get_bucket(const rgw_bucket& bucket) {
return std::make_shared<Bucket>(this, bucket);
}
- SubRef get_sub(const std::string& sub) {
- return std::make_shared<Sub>(this, sub);
- }
-
- SubRef get_sub_with_events(const std::string& sub) {
- auto tmpsub = Sub(this, sub);
- rgw_pubsub_sub_config conf;
- if (tmpsub.get_conf(&conf) < 0) {
- return nullptr;
- }
- if (conf.s3_id.empty()) {
- return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub);
- }
- return std::make_shared<SubWithEvents<rgw_pubsub_s3_event>>(this, sub);
- }
-
void get_meta_obj(rgw_raw_obj *obj) const;
void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;
static const ack_level_t ACK_LEVEL_ANY = 0;
static const ack_level_t ACK_LEVEL_NON_ERROR = 1;
- // PostCR implements async execution of RGWPostHTTPData via coroutine
- class PostCR : public RGWPostHTTPData, public RGWSimpleCoroutine {
- private:
- RGWDataSyncEnv* const sync_env;
- bufferlist read_bl;
- const ack_level_t ack_level;
-
- public:
- PostCR(const std::string& _post_data,
- RGWDataSyncEnv* _sync_env,
- const std::string& endpoint,
- ack_level_t _ack_level,
- bool verify_ssl) :
- RGWPostHTTPData(_sync_env->cct, "POST", endpoint, &read_bl, verify_ssl),
- RGWSimpleCoroutine(_sync_env->cct),
- sync_env(_sync_env),
- ack_level (_ack_level) {
- // ctor also set the data to send
- set_post_data(_post_data);
- set_send_length(_post_data.length());
- }
-
- // send message to endpoint
- int send_request(const DoutPrefixProvider *dpp) override {
- init_new_io(this);
- const auto rc = sync_env->http_manager->add_request(this);
- if (rc < 0) {
- return rc;
- }
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
- return 0;
- }
-
- // wait for reply
- int request_complete() override {
- if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
- if (ack_level == ACK_LEVEL_ANY) {
- return 0;
- } else if (ack_level == ACK_LEVEL_NON_ERROR) {
- // TODO check result code to be non-error
- } else {
- // TODO: check that result code == ack_level
- }
- return -1;
- }
- };
-
public:
RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) :
endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false))
}
}
- RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
- return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
- }
-
- RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
- return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
- }
-
int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
bufferlist read_bl;
RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
}
throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
}
-
- // NoAckPublishCR implements async amqp publishing via coroutine
- // This coroutine ends when it send the message and does not wait for an ack
- class NoAckPublishCR : public RGWCoroutine {
- private:
- const std::string topic;
- amqp::connection_ptr_t conn;
- const std::string message;
-
- public:
- NoAckPublishCR(CephContext* cct,
- const std::string& _topic,
- amqp::connection_ptr_t& _conn,
- const std::string& _message) :
- RGWCoroutine(cct),
- topic(_topic), conn(_conn), message(_message) {}
-
- // send message to endpoint, without waiting for reply
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- const auto rc = amqp::publish(conn, topic, message);
- if (rc < 0) {
- return set_cr_error(rc);
- }
- return set_cr_done();
- }
- return 0;
- }
- };
-
- // AckPublishCR implements async amqp publishing via coroutine
- // This coroutine ends when an ack is received from the borker
- // note that it does not wait for an ack fron the end client
- class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
- private:
- const std::string topic;
- amqp::connection_ptr_t conn;
- const std::string message;
-
- public:
- AckPublishCR(CephContext* cct,
- const std::string& _topic,
- amqp::connection_ptr_t& _conn,
- const std::string& _message) :
- RGWCoroutine(cct),
- topic(_topic), conn(_conn), message(_message) {}
-
- // send message to endpoint, waiting for reply
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- yield {
- init_new_io(this);
- const auto rc = amqp::publish_with_confirm(conn,
- topic,
- message,
- std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
- if (rc < 0) {
- // failed to publish, does not wait for reply
- return set_cr_error(rc);
- }
- // mark as blocked on the amqp answer
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
- io_block();
- return 0;
- }
- return set_cr_done();
- }
- return 0;
- }
-
- // callback invoked from the amqp manager thread when ack/nack is received
- void request_complete(int status) {
- ceph_assert(!is_done());
- if (status != 0) {
- // server replied with a nack
- set_cr_error(status);
- }
- io_complete();
- if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
- }
-
- // TODO: why are these mandatory in RGWIOProvider?
- void set_io_user_info(void *_user_info) override {
- }
-
- void *get_io_user_info() override {
- return nullptr;
- }
- };
public:
RGWPubSubAMQPEndpoint(const std::string& _endpoint,
}
}
- RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
- ceph_assert(conn);
- if (ack_level == ack_level_t::None) {
- return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
- } else {
- return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
- }
- }
-
- RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
- ceph_assert(conn);
- if (ack_level == ack_level_t::None) {
- return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
- } else {
- return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
- }
- }
-
// this allows waiting untill "finish()" is called from a different thread
// waiting could be blocking the waiting thread or yielding, depending
// with compilation flag support and whether the optional_yield is set
static const std::string AMQP_SCHEMA("amqp");
#endif // ifdef WITH_RADOSGW_AMQP_ENDPOINT
-
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
private:
throw configuration_error("Kafka: invalid kafka-ack-level: " + str_ack_level);
}
- // NoAckPublishCR implements async kafka publishing via coroutine
- // This coroutine ends when it send the message and does not wait for an ack
- class NoAckPublishCR : public RGWCoroutine {
- private:
- const std::string topic;
- kafka::connection_ptr_t conn;
- const std::string message;
-
- public:
- NoAckPublishCR(CephContext* cct,
- const std::string& _topic,
- kafka::connection_ptr_t& _conn,
- const std::string& _message) :
- RGWCoroutine(cct),
- topic(_topic), conn(_conn), message(_message) {}
-
- // send message to endpoint, without waiting for reply
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- const auto rc = kafka::publish(conn, topic, message);
- if (rc < 0) {
- return set_cr_error(rc);
- }
- return set_cr_done();
- }
- return 0;
- }
- };
-
- // AckPublishCR implements async kafka publishing via coroutine
- // This coroutine ends when an ack is received from the borker
- // note that it does not wait for an ack fron the end client
- class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
- private:
- const std::string topic;
- kafka::connection_ptr_t conn;
- const std::string message;
-
- public:
- AckPublishCR(CephContext* cct,
- const std::string& _topic,
- kafka::connection_ptr_t& _conn,
- const std::string& _message) :
- RGWCoroutine(cct),
- topic(_topic), conn(_conn), message(_message) {}
-
- // send message to endpoint, waiting for reply
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- yield {
- init_new_io(this);
- const auto rc = kafka::publish_with_confirm(conn,
- topic,
- message,
- std::bind(&AckPublishCR::request_complete, this, std::placeholders::_1));
- if (rc < 0) {
- // failed to publish, does not wait for reply
- return set_cr_error(rc);
- }
- // mark as blocked on the kafka answer
- if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
- io_block();
- return 0;
- }
- return set_cr_done();
- }
- return 0;
- }
-
- // callback invoked from the kafka manager thread when ack/nack is received
- void request_complete(int status) {
- ceph_assert(!is_done());
- if (status != 0) {
- // server replied with a nack
- set_cr_error(status);
- }
- io_complete();
- if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
- }
-
- // TODO: why are these mandatory in RGWIOProvider?
- void set_io_user_info(void *_user_info) override {
- }
-
- void *get_io_user_info() override {
- return nullptr;
- }
- };
-
public:
RGWPubSubKafkaEndpoint(const std::string& _endpoint,
const std::string& _topic,
}
}
- RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
- ceph_assert(conn);
- if (ack_level == ack_level_t::None) {
- return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
- } else {
- return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
- }
- }
-
- RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
- ceph_assert(conn);
- if (ack_level == ack_level_t::None) {
- return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
- } else {
- return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
- }
- }
-
// this allows waiting untill "finish()" is called from a different thread
// waiting could be blocking the waiting thread or yielding, depending
// with compilation flag support and whether the optional_yield is set
// TODO the env should be used as a template parameter to differentiate the source that triggers the pushes
class RGWDataSyncEnv;
-class RGWCoroutine;
class RGWHTTPArgs;
-struct rgw_pubsub_event;
struct rgw_pubsub_s3_event;
// endpoint base class all endpoint - types should derive from it
// may throw a configuration_error if creation fails
static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContext *cct=nullptr);
- // this method is used in order to send notification (Ceph specific) and wait for completion
- // in async manner via a coroutine when invoked in the data sync environment
- virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) = 0;
-
- // this method is used in order to send notification (S3 compliant) and wait for completion
- // in async manner via a coroutine when invoked in the data sync environment
- virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) = 0;
-
// this method is used in order to send notification (S3 compliant) and wait for completion
// in async manner via a coroutine when invoked in the frontend environment
virtual int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) = 0;
}
// dest object only stores endpoint info
- // bucket to store events/records will be set only when subscription is created
- dest.bucket_name = "";
- dest.oid_prefix = "";
dest.arn_topic = topic_name;
// the topic ARN will be sent in the reply
const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::BucketRef& b, optional_yield y, RGWPubSub& ps) {
// delete all notifications of on a bucket
for (const auto& topic : bucket_topics.topics) {
- // remove the auto generated subscription of the topic (if exist)
- rgw_pubsub_topic_subs topic_subs;
- int op_ret = ps.get_topic(topic.first, &topic_subs);
- for (const auto& topic_sub_name : topic_subs.subs) {
- auto sub = ps.get_sub(topic_sub_name);
- rgw_pubsub_sub_config sub_conf;
- op_ret = sub->get_conf(&sub_conf);
- if (op_ret < 0) {
- ldpp_dout(dpp, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl;
- return op_ret;
- }
- if (!sub_conf.s3_id.empty()) {
- // S3 notification, has autogenerated subscription
- const auto& sub_topic_name = sub_conf.topic;
- op_ret = sub->unsubscribe(dpp, sub_topic_name, y);
- if (op_ret < 0) {
- ldpp_dout(dpp, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl;
- return op_ret;
- }
- }
- }
- op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
+ const auto op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
if (op_ret < 0) {
return op_ret;
}
auto b = ps->get_bucket(bucket_info.bucket);
ceph_assert(b);
- std::string data_bucket_prefix = "";
- std::string data_oid_prefix = "";
- bool push_only = true;
-
if(configurations.list.empty()) {
// get all topics on a bucket
rgw_pubsub_bucket_topics bucket_topics;
return;
}
ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
-
- if (!push_only) {
- // generate the subscription with destination information from the original topic
- rgw_pubsub_sub_dest dest = topic_info.dest;
- dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
- dest.oid_prefix = data_oid_prefix + notif_name + "/";
- auto sub = ps->get_sub(notif_name);
- op_ret = sub->subscribe(this, unique_topic_name, dest, y, notif_name);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
- // rollback generated notification (ignore return value)
- b->remove_notification(this, unique_topic_name, y);
- // rollback generated topic (ignore return value)
- ps->remove_topic(this, unique_topic_name, y);
- return;
- }
- ldpp_dout(this, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
- }
}
}
// delete a specific notification
const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
if (unique_topic) {
- // remove the auto generated subscription according to notification name (if exist)
const auto unique_topic_name = unique_topic->get().topic.name;
- auto sub = ps->get_sub(notif_name);
- op_ret = sub->unsubscribe(this, unique_topic_name, y);
- if (op_ret < 0 && op_ret != -ENOENT) {
- ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
- return;
- }
op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, *ps);
return;
}
return true;
}
-bool subscription_has_endpoint_secret(const rgw_pubsub_sub_config& sub) {
- return sub.dest.stored_secret;
-}
-
bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) {
return topic.topic.dest.stored_secret;
}
ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
}
-void RGWPSCreateSubOp::execute(optional_yield y) {
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
- auto sub = ps->get_sub(sub_name);
- op_ret = sub->subscribe(this, topic_name, dest, y);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldpp_dout(this, 20) << "successfully created subscription '" << sub_name << "'" << dendl;
-}
-
-void RGWPSGetSubOp::execute(optional_yield y) {
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
- auto sub = ps->get_sub(sub_name);
- op_ret = sub->get_conf(&result);
- if (subscription_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
- ldpp_dout(this, 1) << "subscription '" << sub_name << "' contain secret and cannot be sent over insecure transport" << dendl;
- op_ret = -EPERM;
- return;
- }
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get subscription '" << sub_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldpp_dout(this, 20) << "successfully got subscription '" << sub_name << "'" << dendl;
-}
-
-void RGWPSDeleteSubOp::execute(optional_yield y) {
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
- auto sub = ps->get_sub(sub_name);
- op_ret = sub->unsubscribe(this, topic_name, y);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldpp_dout(this, 20) << "successfully removed subscription '" << sub_name << "'" << dendl;
-}
-
-void RGWPSAckSubEventOp::execute(optional_yield y) {
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
- auto sub = ps->get_sub_with_events(sub_name);
- op_ret = sub->remove_event(s, event_id);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldpp_dout(this, 20) << "successfully acked event on subscription '" << sub_name << "'" << dendl;
-}
-
-void RGWPSPullSubEventsOp::execute(optional_yield y) {
- op_ret = get_params();
- if (op_ret < 0) {
- return;
- }
- ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
- sub = ps->get_sub_with_events(sub_name);
- if (!sub) {
- op_ret = -ENOENT;
- ldpp_dout(this, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl;
- return;
- }
- op_ret = sub->list_events(s, marker, max_entries);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "failed to get events from subscription '" << sub_name << "', ret=" << op_ret << dendl;
- return;
- }
- ldpp_dout(this, 20) << "successfully got events from subscription '" << sub_name << "'" << dendl;
-}
-
-
int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
int ret = get_params();
if (ret < 0) {
uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
};
-// create a subscription
-class RGWPSCreateSubOp : public RGWDefaultResponseOp {
-protected:
- std::string sub_name;
- std::string topic_name;
- std::optional<RGWPubSub> ps;
- rgw_pubsub_sub_dest dest;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission(optional_yield) override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute(optional_yield y) override;
-
- const char* name() const override { return "pubsub_subscription_create"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_CREATE; }
- uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-// get subscription information (including push-endpoint if exist)
-class RGWPSGetSubOp : public RGWOp {
-protected:
- std::string sub_name;
- std::optional<RGWPubSub> ps;
- rgw_pubsub_sub_config result;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission(optional_yield) override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute(optional_yield y) override;
-
- const char* name() const override { return "pubsub_subscription_get"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_GET; }
- uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
-// delete subscription
-class RGWPSDeleteSubOp : public RGWDefaultResponseOp {
-protected:
- std::string sub_name;
- std::string topic_name;
- std::optional<RGWPubSub> ps;
-
- virtual int get_params() = 0;
-
-public:
- int verify_permission(optional_yield) override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute(optional_yield y) override;
-
- const char* name() const override { return "pubsub_subscription_delete"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_DELETE; }
- uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-};
-
-// acking of an event
-class RGWPSAckSubEventOp : public RGWDefaultResponseOp {
-protected:
- std::string sub_name;
- std::string event_id;
- std::optional<RGWPubSub> ps;
-
- virtual int get_params() = 0;
-
-public:
- RGWPSAckSubEventOp() {}
-
- int verify_permission(optional_yield) override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute(optional_yield y) override;
-
- const char* name() const override { return "pubsub_subscription_ack"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_ACK; }
- uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-// fetching events from a subscription
-// dpending on whether the subscription was created via s3 compliant API or not
-// the matching events will be returned
-class RGWPSPullSubEventsOp : public RGWOp {
-protected:
- int max_entries{0};
- std::string sub_name;
- std::string marker;
- std::optional<RGWPubSub> ps;
- RGWPubSub::SubRef sub;
-
- virtual int get_params() = 0;
-
-public:
- RGWPSPullSubEventsOp() {}
-
- int verify_permission(optional_yield) override {
- return 0;
- }
- void pre_exec() override {
- rgw_bucket_object_pre_exec(s);
- }
- void execute(optional_yield y) override;
-
- const char* name() const override { return "pubsub_subscription_pull"; }
- RGWOpType get_type() override { return RGW_OP_PUBSUB_SUB_PULL; }
- uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
// notification creation
class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
protected:
mfa remove delete MFA TOTP token
mfa check check MFA TOTP token
mfa resync re-sync MFA TOTP token
- topic list list bucket notifications/pubsub topics
- topic get get a bucket notifications/pubsub topic
- topic rm remove a bucket notifications/pubsub topic
- subscription get get a pubsub subscription definition
- subscription rm remove a pubsub subscription
- subscription pull show events in a pubsub subscription
- subscription ack ack (remove) an events in a pubsub subscription
+ topic list list bucket notifications topics
+ topic get get a bucket notifications topic
+ topic rm remove a bucket notifications topic
script put upload a lua script to a context
script get get the lua script of a context
script rm remove the lua scripts of a context
--totp-window the number of TOTP tokens that are checked before and after the current token when validating token
--totp-pin the valid value of a TOTP token at a certain time
- Bucket notifications/pubsub options:
- --topic bucket notifications/pubsub topic name
- --subscription pubsub subscription name
- --event-id event id in a pubsub subscription
+ Bucket notifications options:
+ --topic bucket notifications topic name
Script options:
--context context in which the script runs. one of: prerequest, postrequest, background, getdata, putdata