OPT_PUBSUB_SUB_GET,
OPT_PUBSUB_SUB_CREATE,
OPT_PUBSUB_SUB_RM,
+ OPT_PUBSUB_SUB_PULL,
+ OPT_PUBSUB_EVENT_RM,
};
static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_cmd, bool *need_more)
strcmp(cmd, "data") == 0 ||
strcmp(cmd, "datalog") == 0 ||
strcmp(cmd, "error") == 0 ||
+ strcmp(cmd, "event") == 0 ||
strcmp(cmd, "gc") == 0 ||
strcmp(cmd, "global") == 0 ||
strcmp(cmd, "key") == 0 ||
return OPT_PUBSUB_SUB_CREATE;
if (strcmp(cmd, "rm") == 0)
return OPT_PUBSUB_SUB_RM;
+ if (strcmp(cmd, "pull") == 0)
+ return OPT_PUBSUB_SUB_PULL;
+ } else if (strcmp(prev_cmd, "event") == 0) {
+ if (strcmp(cmd, "rm") == 0)
+ return OPT_PUBSUB_EVENT_RM;
}
}
return -EINVAL;
string sub_oid_prefix;
string sub_dest_bucket;
string sub_push_endpoint;
+ string event_id;
for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
if (ceph_argparse_double_dash(args, i)) {
sub_dest_bucket = val;
} else if (ceph_argparse_witharg(args, i, &val, "--sub-push-endpoint", (char*)NULL)) {
sub_push_endpoint = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) {
+ event_id = val;
} else if (strncmp(*i, "-", 1) == 0) {
cerr << "ERROR: invalid flag " << *i << std::endl;
return EINVAL;
formatter->flush(cout);
}
+ if (opt_cmd == OPT_PUBSUB_SUB_PULL) {
+ if (get_tier_type(store) != "pubsub") {
+ cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+ return EINVAL;
+ }
+ if (user_id.empty()) {
+ cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+ return EINVAL;
+ }
+ if (sub_name.empty()) {
+ cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
+ return EINVAL;
+ }
+ RGWUserInfo& user_info = user_op.get_user_info();
+ RGWUserPubSub ups(store, user_info.user_id);
+
+ RGWUserPubSub::list_events_result result;
+
+ if (!max_entries_specified) {
+ max_entries = 100;
+ }
+ ret = ups.list_events(sub_name, marker, max_entries, &result);
+ if (ret < 0) {
+ cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ encode_json("result", result, formatter);
+ formatter->flush(cout);
+ }
+
+ if (opt_cmd == OPT_PUBSUB_EVENT_RM) {
+ if (get_tier_type(store) != "pubsub") {
+ cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+ return EINVAL;
+ }
+ if (user_id.empty()) {
+ cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+ return EINVAL;
+ }
+ if (sub_name.empty()) {
+ cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
+ return EINVAL;
+ }
+ if (event_id.empty()) {
+ cerr << "ERROR: event id was not provided (via --event-id)" << std::endl;
+ return EINVAL;
+ }
+ RGWUserInfo& user_info = user_op.get_user_info();
+ RGWUserPubSub ups(store, user_info.user_id);
+
+ ret = ups.remove_event(sub_name, event_id);
+ if (ret < 0) {
+ cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ }
+
return 0;
}
+#include "rgw_b64.h"
#include "rgw_rados.h"
#include "rgw_pubsub.h"
#include "rgw_tools.h"
return 0;
}
+void RGWUserPubSub::list_events_result::dump(Formatter *f) const
+{
+ encode_json("next_marker", next_marker, f);
+ encode_json("is_truncated", is_truncated, f);
+
+ Formatter::ArraySection s(*f, "events");
+ for (auto& event : events) {
+ encode_json("event", event, f);
+ }
+}
+
+int RGWUserPubSub::list_events(const string& sub_name,
+ const string& marker, int max_events,
+ list_events_result *result)
+{
+ rgw_pubsub_user_sub_config sub_conf;
+ int ret = get_sub(sub_name, &sub_conf);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl;
+ return ret;
+ }
+
+ RGWBucketInfo bucket_info;
+ string tenant;
+ RGWObjectCtx obj_ctx(store);
+ ret = store->get_bucket_info(obj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
+ return ret;
+ }
+
+ RGWRados::Bucket target(store, bucket_info);
+ RGWRados::Bucket::List list_op(&target);
+
+ list_op.params.prefix = sub_conf.dest.oid_prefix;
+ list_op.params.marker = marker;
+
+ vector<rgw_bucket_dir_entry> objs;
+
+ ret = list_op.list_objects(max_events, &objs, nullptr, &result->is_truncated);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
+ return ret;
+ }
+ if (result->is_truncated) {
+ result->next_marker = list_op.get_next_marker().name;
+ }
+
+ for (auto& obj : objs) {
+ bufferlist bl64;
+ bufferlist bl;
+ bl64.append(obj.meta.user_data);
+ try {
+ bl.decode_base64(bl64);
+ } catch (buffer::error& err) {
+ ldout(store->ctx(), 0) << "ERROR: failed to event (not a valid base64)" << dendl;
+ continue;
+ }
+ rgw_pubsub_event event;
+
+ auto iter = bl.cbegin();
+ try {
+ decode(event, iter);
+ } catch (buffer::error& err) {
+ ldout(store->ctx(), 0) << "ERROR: failed to decode event" << dendl;
+ continue;
+ };
+
+ result->events.push_back(event);
+ }
+ return 0;
+}
+
+int RGWUserPubSub::remove_event(const string& sub_name, const string& event_id)
+{
+ rgw_pubsub_user_sub_config sub_conf;
+ int ret = get_sub(sub_name, &sub_conf);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl;
+ return ret;
+ }
+
+ RGWBucketInfo bucket_info;
+ string tenant;
+ RGWObjectCtx obj_ctx(store);
+ ret = store->get_bucket_info(obj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
+ return ret;
+ }
+
+ rgw_bucket& bucket = bucket_info.bucket;
+
+ rgw_obj obj(bucket, event_id);
+
+ obj_ctx.obj.set_atomic(obj);
+
+ RGWRados::Object del_target(store, bucket_info, obj_ctx, obj);
+ RGWRados::Object::Delete del_op(&del_target);
+
+ del_op.params.bucket_owner = bucket_info.owner;
+ del_op.params.versioning_status = bucket_info.versioning_status();
+
+ ret = del_op.delete_obj();
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl;
+ }
+ return 0;
+}
int get_sub(const string& name, rgw_pubsub_user_sub_config *result);
int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
int remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
+
+ struct list_events_result {
+ string next_marker;
+ bool is_truncated{false};
+ std::vector<rgw_pubsub_event> events;
+
+ void dump(Formatter *f) const;
+ };
+
+ int list_events(const string& sub_name, const string& marker, int max_events, list_events_result *result);
+ int remove_event(const string& sub_name, const string& event_id);
};
template <class T>
void format(bufferlist *bl) {
bl->append(json_str("event", *event));
}
+
+ void encode_event(bufferlist& bl) const {
+ encode(*event, bl);
+ }
};
put_obj.key = rgw_obj_key(oid_prefix + pse.generate_message_id());
pse.format(&put_obj.data);
-
+
{
+ bufferlist bl;
+ pse.encode_event(bl);
bufferlist bl64;
- put_obj.data.encode_base64(bl64);
+ bl.encode_base64(bl64);
put_obj.user_data = bl64.to_str();
}