]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw-admin: pubsub: pull sub, rm event
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 4 Jul 2018 00:26:51 +0000 (17:26 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_sync_module_pubsub.cc

index 9286bee96b29d5a2986ea7dc005c87c48fefe9de..52ac87c61aaf8acbad227b34954306a09d20f59a 100644 (file)
@@ -538,6 +538,8 @@ enum {
   OPT_PUBSUB_SUB_GET,
   OPT_PUBSUB_SUB_CREATE,
   OPT_PUBSUB_SUB_RM,
+  OPT_PUBSUB_SUB_PULL,
+  OPT_PUBSUB_EVENT_RM,
 };
 
 static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_cmd, bool *need_more)
@@ -553,6 +555,7 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
       strcmp(cmd, "data") == 0 ||
       strcmp(cmd, "datalog") == 0 ||
       strcmp(cmd, "error") == 0 ||
+      strcmp(cmd, "event") == 0 ||
       strcmp(cmd, "gc") == 0 ||
       strcmp(cmd, "global") == 0 ||
       strcmp(cmd, "key") == 0 ||
@@ -1019,6 +1022,11 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
         return OPT_PUBSUB_SUB_CREATE;
       if (strcmp(cmd, "rm") == 0)
         return OPT_PUBSUB_SUB_RM;
+      if (strcmp(cmd, "pull") == 0)
+        return OPT_PUBSUB_SUB_PULL;
+    } else if (strcmp(prev_cmd, "event") == 0) {
+      if (strcmp(cmd, "rm") == 0)
+        return OPT_PUBSUB_EVENT_RM;
     }
   }
   return -EINVAL;
@@ -2846,6 +2854,7 @@ int main(int argc, const char **argv)
   string sub_oid_prefix;
   string sub_dest_bucket;
   string sub_push_endpoint;
+  string event_id;
 
   for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
     if (ceph_argparse_double_dash(args, i)) {
@@ -3177,6 +3186,8 @@ int main(int argc, const char **argv)
       sub_dest_bucket = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--sub-push-endpoint", (char*)NULL)) {
       sub_push_endpoint = val;
+    } else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) {
+      event_id = val;
     } else if (strncmp(*i, "-", 1) == 0) {
       cerr << "ERROR: invalid flag " << *i << std::endl;
       return EINVAL;
@@ -7968,5 +7979,62 @@ next:
     formatter->flush(cout);
   }
 
+ if (opt_cmd == OPT_PUBSUB_SUB_PULL) {
+    if (get_tier_type(store) != "pubsub") {
+      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+      return EINVAL;
+    }
+    if (user_id.empty()) {
+      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+      return EINVAL;
+    }
+    if (sub_name.empty()) {
+      cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
+      return EINVAL;
+    }
+    RGWUserInfo& user_info = user_op.get_user_info();
+    RGWUserPubSub ups(store, user_info.user_id);
+
+    RGWUserPubSub::list_events_result result;
+
+    if (!max_entries_specified) {
+      max_entries = 100;
+    }
+    ret = ups.list_events(sub_name, marker, max_entries, &result);
+    if (ret < 0) {
+      cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+    encode_json("result", result, formatter);
+    formatter->flush(cout);
+ }
+
+ if (opt_cmd == OPT_PUBSUB_EVENT_RM) {
+    if (get_tier_type(store) != "pubsub") {
+      cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+      return EINVAL;
+    }
+    if (user_id.empty()) {
+      cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+      return EINVAL;
+    }
+    if (sub_name.empty()) {
+      cerr << "ERROR: subscription name was not provided (via --sub-name)" << std::endl;
+      return EINVAL;
+    }
+    if (event_id.empty()) {
+      cerr << "ERROR: event id was not provided (via --event-id)" << std::endl;
+      return EINVAL;
+    }
+    RGWUserInfo& user_info = user_op.get_user_info();
+    RGWUserPubSub ups(store, user_info.user_id);
+
+    ret = ups.remove_event(sub_name, event_id);
+    if (ret < 0) {
+      cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+  }
+
   return 0;
 }
index 1d7bb6dd1272b8a33e688cbf686864a8d13c8eca..5c926ec9db803de850a1d0980e447c0d1ef45ed6 100644 (file)
@@ -1,3 +1,4 @@
+#include "rgw_b64.h"
 #include "rgw_rados.h"
 #include "rgw_pubsub.h"
 #include "rgw_tools.h"
@@ -364,3 +365,112 @@ int RGWUserPubSub::remove_sub(const string& name, const string& _topic, const rg
   return 0;
 }
 
+void RGWUserPubSub::list_events_result::dump(Formatter *f) const
+{
+  encode_json("next_marker", next_marker, f);
+  encode_json("is_truncated", is_truncated, f);
+
+  Formatter::ArraySection s(*f, "events");
+  for (auto& event : events) {
+    encode_json("event", event, f);
+  }
+}
+
+int RGWUserPubSub::list_events(const string& sub_name,
+                               const string& marker, int max_events,
+                               list_events_result *result)
+{
+  rgw_pubsub_user_sub_config sub_conf;
+  int ret = get_sub(sub_name, &sub_conf);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl;
+    return ret;
+  }
+
+  RGWBucketInfo bucket_info;
+  string tenant;
+  RGWObjectCtx obj_ctx(store);
+  ret = store->get_bucket_info(obj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
+    return ret;
+  }
+
+  RGWRados::Bucket target(store, bucket_info);
+  RGWRados::Bucket::List list_op(&target);
+
+  list_op.params.prefix = sub_conf.dest.oid_prefix;
+  list_op.params.marker = marker;
+
+  vector<rgw_bucket_dir_entry> objs;
+
+  ret = list_op.list_objects(max_events, &objs, nullptr, &result->is_truncated);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to list bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
+    return ret;
+  }
+  if (result->is_truncated) {
+    result->next_marker = list_op.get_next_marker().name;
+  }
+
+  for (auto& obj : objs) {
+    bufferlist bl64;
+    bufferlist bl;
+    bl64.append(obj.meta.user_data);
+    try {
+      bl.decode_base64(bl64);
+    } catch (buffer::error& err) {
+      ldout(store->ctx(), 0) << "ERROR: failed to event (not a valid base64)" << dendl;
+      continue;
+    }
+    rgw_pubsub_event event;
+
+    auto iter = bl.cbegin();
+    try {
+      decode(event, iter);
+    } catch (buffer::error& err) {
+      ldout(store->ctx(), 0) << "ERROR: failed to decode event" << dendl;
+      continue;
+    };
+
+    result->events.push_back(event);
+  }
+  return 0;
+}
+
+int RGWUserPubSub::remove_event(const string& sub_name, const string& event_id)
+{
+  rgw_pubsub_user_sub_config sub_conf;
+  int ret = get_sub(sub_name, &sub_conf);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl;
+    return ret;
+  }
+
+  RGWBucketInfo bucket_info;
+  string tenant;
+  RGWObjectCtx obj_ctx(store);
+  ret = store->get_bucket_info(obj_ctx, tenant, sub_conf.dest.bucket_name, bucket_info, nullptr, nullptr);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to read bucket info for events bucket: bucket=" << sub_conf.dest.bucket_name << " ret=" << ret << dendl;
+    return ret;
+  }
+
+  rgw_bucket& bucket = bucket_info.bucket;
+
+  rgw_obj obj(bucket, event_id);
+
+  obj_ctx.obj.set_atomic(obj);
+
+  RGWRados::Object del_target(store, bucket_info, obj_ctx, obj);
+  RGWRados::Object::Delete del_op(&del_target);
+
+  del_op.params.bucket_owner = bucket_info.owner;
+  del_op.params.versioning_status = bucket_info.versioning_status();
+
+  ret = del_op.delete_obj();
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to remove event (obj=" << obj << "): ret=" << ret << dendl;
+  }
+  return 0;
+}
index 52cf5e870df2253c13f66d5bb26a9f20dd2d8a3d..c6b8f350cb136ae3ae55692c980586472dd6b2e1 100644 (file)
@@ -238,6 +238,17 @@ public:
   int get_sub(const string& name, rgw_pubsub_user_sub_config *result);
   int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
   int remove_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
+
+  struct list_events_result {
+    string next_marker;
+    bool is_truncated{false};
+    std::vector<rgw_pubsub_event> events;
+
+    void dump(Formatter *f) const;
+  };
+
+  int list_events(const string& sub_name, const string& marker, int max_events, list_events_result *result);
+  int remove_event(const string& sub_name, const string& event_id);
 };
 
 template <class T>
index af087f4bdee3810ca10eb4af8a6a7e4d1db6132f..4de58cab62d0ab8d31b9a5ff6847612e7bb97b71 100644 (file)
@@ -354,6 +354,10 @@ public:
   void format(bufferlist *bl) {
     bl->append(json_str("event", *event));
   }
+
+  void encode_event(bufferlist& bl) const {
+    encode(*event, bl);
+  }
         
 };
 
@@ -559,10 +563,12 @@ class PSSubscription {
         put_obj.key = rgw_obj_key(oid_prefix + pse.generate_message_id());
 
         pse.format(&put_obj.data);
-
+       
         {
+          bufferlist bl;
+          pse.encode_event(bl);
           bufferlist bl64;
-          put_obj.data.encode_base64(bl64);
+          bl.encode_base64(bl64);
           put_obj.user_data = bl64.to_str();
         }