From: Yehuda Sadeh Date: Wed, 4 Jul 2018 00:26:51 +0000 (-0700) Subject: rgw-admin: pubsub: pull sub, rm event X-Git-Tag: v14.1.0~616^2~46 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=2a9b315a71adc8a9c35f63e76140b94cfe71758f;p=ceph-ci.git rgw-admin: pubsub: pull sub, rm event Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 9286bee96b2..52ac87c61aa 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -538,6 +538,8 @@ enum { OPT_PUBSUB_SUB_GET, OPT_PUBSUB_SUB_CREATE, OPT_PUBSUB_SUB_RM, + OPT_PUBSUB_SUB_PULL, + OPT_PUBSUB_EVENT_RM, }; static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_cmd, bool *need_more) @@ -553,6 +555,7 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_ strcmp(cmd, "data") == 0 || strcmp(cmd, "datalog") == 0 || strcmp(cmd, "error") == 0 || + strcmp(cmd, "event") == 0 || strcmp(cmd, "gc") == 0 || strcmp(cmd, "global") == 0 || strcmp(cmd, "key") == 0 || @@ -1019,6 +1022,11 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_ return OPT_PUBSUB_SUB_CREATE; if (strcmp(cmd, "rm") == 0) return OPT_PUBSUB_SUB_RM; + if (strcmp(cmd, "pull") == 0) + return OPT_PUBSUB_SUB_PULL; + } else if (strcmp(prev_cmd, "event") == 0) { + if (strcmp(cmd, "rm") == 0) + return OPT_PUBSUB_EVENT_RM; } } return -EINVAL; @@ -2846,6 +2854,7 @@ int main(int argc, const char **argv) string sub_oid_prefix; string sub_dest_bucket; string sub_push_endpoint; + string event_id; for (std::vector::iterator i = args.begin(); i != args.end(); ) { if (ceph_argparse_double_dash(args, i)) { @@ -3177,6 +3186,8 @@ int main(int argc, const char **argv) sub_dest_bucket = val; } else if (ceph_argparse_witharg(args, i, &val, "--sub-push-endpoint", (char*)NULL)) { sub_push_endpoint = val; + } else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) { + event_id = val; } else if (strncmp(*i, "-", 1) == 0) { cerr << "ERROR: invalid flag " << *i << std::endl; return EINVAL; @@ -7968,5 +7979,62 @@ next: formatter->flush(cout); } + if (opt_cmd == OPT_PUBSUB_SUB_PULL) { + if (get_tier_type(store) != "pubsub") { + cerr << "ERROR: only pubsub tier type supports this command" << std::endl; + return EINVAL; + } + if (user_id.empty()) { + cerr << "ERROR: user id was not provided (via --uid)" << std::endl; + return EINVAL; + } + if (sub_name.empty()) { + cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl; + return EINVAL; + } + RGWUserInfo& user_info = user_op.get_user_info(); + RGWUserPubSub ups(store, user_info.user_id); + + RGWUserPubSub::list_events_result result; + + if (!max_entries_specified) { + max_entries = 100; + } + ret = ups.list_events(sub_name, marker, max_entries, &result); + if (ret < 0) { + cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + encode_json("result", result, formatter); + formatter->flush(cout); + } + + if (opt_cmd == OPT_PUBSUB_EVENT_RM) { + if (get_tier_type(store) != "pubsub") { + cerr << "ERROR: only pubsub tier type supports this command" << std::endl; + return EINVAL; + } + if (user_id.empty()) { + cerr << "ERROR: user id was not provided (via --uid)" << std::endl; + return EINVAL; + } + if (sub_name.empty()) { + cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl; + return EINVAL; + } + if (event_id.empty()) { + cerr << "ERROR: event id was not provided (via --event-id)" << std::endl; + return EINVAL; + } + RGWUserInfo& user_info = user_op.get_user_info(); + RGWUserPubSub ups(store, user_info.user_id); + + ret = ups.remove_event(sub_name, event_id); + if (ret < 0) { + cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + } + return 0; } diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 1d7bb6dd127..5c926ec9db8 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -1,3 +1,4 @@ +#include "rgw_b64.h" #include "rgw_rados.h" #include "rgw_pubsub.h" #include "rgw_tools.h" @@ -364,3 +365,112 @@ int RGWUserPubSub::remove_sub(const string& name, const string& _topic, const rg return 0; } +void RGWUserPubSub::list_events_result::dump(Formatter *f) const +{ + encode_json("next_marker", next_marker, f); + encode_json("is_truncated", is_truncated, f); + + Formatter::ArraySection s(*f, "events"); + for (auto& event : events) { + encode_json("event", event, f); + } +} + +int RGWUserPubSub::list_events(const string& sub_name, + const string& marker, int max_events, + list_events_result *result) +{ + rgw_pubsub_user_sub_config sub_conf; + int ret = get_sub(sub_name, &sub_conf); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl; + return ret; + } + + RGWBucketInfo bucket_info; + string tenant; + RGWObjectCtx obj_ctx(store); + ret = store->get_bucket_info(obj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; + return ret; + } + + RGWRados::Bucket target(store, bucket_info); + RGWRados::Bucket::List list_op(&target); + + list_op.params.prefix = sub_conf.dest.oid_prefix; + list_op.params.marker = marker; + + vector objs; + + ret = list_op.list_objects(max_events, &objs, nullptr, &result->is_truncated); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; + return ret; + } + if (result->is_truncated) { + result->next_marker = list_op.get_next_marker().name; + } + + for (auto& obj : objs) { + bufferlist bl64; + bufferlist bl; + bl64.append(obj.meta.user_data); + try { + bl.decode_base64(bl64); + } catch (buffer::error& err) { + ldout(store->ctx(), 0) << "ERROR: failed to event (not a valid base64)" << dendl; + continue; + } + rgw_pubsub_event event; + + auto iter = bl.cbegin(); + try { + decode(event, iter); + } catch (buffer::error& err) { + ldout(store->ctx(), 0) << "ERROR: failed to decode event" << dendl; + continue; + }; + + result->events.push_back(event); + } + return 0; +} + +int RGWUserPubSub::remove_event(const string& sub_name, const string& event_id) +{ + rgw_pubsub_user_sub_config sub_conf; + int ret = get_sub(sub_name, &sub_conf); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl; + return ret; + } + + RGWBucketInfo bucket_info; + string tenant; + RGWObjectCtx obj_ctx(store); + ret = store->get_bucket_info(obj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl; + return ret; + } + + rgw_bucket& bucket = bucket_info.bucket; + + rgw_obj obj(bucket, event_id); + + obj_ctx.obj.set_atomic(obj); + + RGWRados::Object del_target(store, bucket_info, obj_ctx, obj); + RGWRados::Object::Delete del_op(&del_target); + + del_op.params.bucket_owner = bucket_info.owner; + del_op.params.versioning_status = bucket_info.versioning_status(); + + ret = del_op.delete_obj(); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl; + } + return 0; +} diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 52cf5e870df..c6b8f350cb1 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -238,6 +238,17 @@ public: int get_sub(const string& name, rgw_pubsub_user_sub_config *result); int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest); int remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest); + + struct list_events_result { + string next_marker; + bool is_truncated{false}; + std::vector events; + + void dump(Formatter *f) const; + }; + + int list_events(const string& sub_name, const string& marker, int max_events, list_events_result *result); + int remove_event(const string& sub_name, const string& event_id); }; template diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index af087f4bdee..4de58cab62d 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -354,6 +354,10 @@ public: void format(bufferlist *bl) { bl->append(json_str("event", *event)); } + + void encode_event(bufferlist& bl) const { + encode(*event, bl); + } }; @@ -559,10 +563,12 @@ class PSSubscription { put_obj.key = rgw_obj_key(oid_prefix + pse.generate_message_id()); pse.format(&put_obj.data); - + { + bufferlist bl; + pse.encode_event(bl); bufferlist bl64; - put_obj.data.encode_base64(bl64); + bl.encode_base64(bl64); put_obj.user_data = bl64.to_str(); }