rgw/pubsub: push notifications from ops
Author:     Yuval Lifshitz <yuvalif@yahoo.com>
AuthorDate: Mon, 12 Aug 2019 16:48:15 +0000 (19:48 +0300)
Commit:     Yuval Lifshitz <yuvalif@yahoo.com>
CommitDate: Tue, 10 Sep 2019 15:54:05 +0000 (18:54 +0300)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
24 files changed:
src/mrgw.sh
src/rgw/CMakeLists.txt
src/rgw/rgw_admin.cc
src/rgw/rgw_amqp.cc
src/rgw/rgw_common.h
src/rgw/rgw_main.cc
src/rgw/rgw_notify.cc [new file with mode: 0644]
src/rgw/rgw_notify.h [new file with mode: 0644]
src/rgw/rgw_notify_event_type.cc [new file with mode: 0644]
src/rgw/rgw_notify_event_type.h [new file with mode: 0644]
src/rgw/rgw_op.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_pubsub_push.cc
src/rgw/rgw_pubsub_push.h
src/rgw/rgw_rest_pubsub.cc
src/rgw/rgw_sync_module_pubsub.cc
src/rgw/rgw_sync_module_pubsub_rest.cc
src/test/rgw/amqp_mock.cc
src/test/rgw/amqp_mock.h
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/rgw_multi/zone_ps.py
src/test/rgw/test_multi.md
src/test/rgw/test_rgw_amqp.cc
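
For reference, a minimal S3-style configuration that exercises the new path (PUT /<bucket>?notification) might look as follows; the Id/Topic/Event element names match the parsing in rgw_pubsub_s3_notification::decode_xml below, while the ARN value and the TopicConfiguration wrapper are illustrative assumptions:

    <NotificationConfiguration>
      <TopicConfiguration>
        <Id>mynotif</Id>
        <Topic>arn:aws:sns:default::mytopic</Topic>
        <Event>s3:ObjectCreated:*</Event>
        <Event>s3:ObjectRemoved:Delete</Event>
      </TopicConfiguration>
    </NotificationConfiguration>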

index c27ba25bcab118d5e5599926d2ecfc1f75ac0aa8..972d8099c166e0fe41b3c4d91e23285ac7d93c5e 100755 (executable)
@@ -20,6 +20,10 @@ vstart_path=`dirname $0`
 name=$1
 port=$2
 
+if [ ! -z "$RGW_FRONTEND_THREADS" ]; then
+    set_frontend_threads="num_threads=$RGW_FRONTEND_THREADS"
+fi
+
 shift 2
 
 run_root=$script_root/run/$name
@@ -33,6 +37,6 @@ $vstart_path/mrun $name ceph -c $run_root/ceph.conf \
        -k $run_root/keyring auth get-or-create client.rgw.$port mon \
        'allow rw' osd 'allow rwx' mgr 'allow rw' >> $run_root/keyring
 
-$vstart_path/mrun $name radosgw --rgw-frontends="$rgw_frontend port=$port" \
+$vstart_path/mrun $name radosgw --rgw-frontends="$rgw_frontend port=$port $set_frontend_threads" \
        -n client.rgw.$port --pid-file=$pidfile \
        --admin-socket=$asokfile "$@" --log-file=$logfile
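
With the change above, a larger frontend thread pool can be requested at startup; for example (instance name, port and debug level are illustrative):

    RGW_FRONTEND_THREADS=600 ./mrgw.sh clone1 8001 --debug-rgw=20

When the variable is unset, $set_frontend_threads stays empty and the frontend option line is unchanged.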
index 57901ce3f6b050061fc9ff46d9e0df7a7c91ae19..fbcf151f635703aca6bcde50fc499e3d1b4a1452 100644 (file)
@@ -87,6 +87,8 @@ set(librgw_common_srcs
   rgw_sync_module_log.cc
   rgw_sync_module_pubsub.cc
   rgw_pubsub_push.cc
+  rgw_notify.cc
+  rgw_notify_event_type.cc
   rgw_sync_module_pubsub_rest.cc
   rgw_sync_trace.cc
   rgw_trim_bilog.cc
index abb874340f3319102424789e68f30cb203a55631..503993cc2306f66752c3fab387bd6be151ac2d90 100644 (file)
@@ -63,6 +63,7 @@ extern "C" {
 #include "services/svc_datalog_rados.h"
 #include "services/svc_mdlog.h"
 #include "services/svc_meta_be_otp.h"
+#include "services/svc_zone.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
@@ -2912,7 +2913,7 @@ int main(int argc, const char **argv)
   string sub_dest_bucket;
   string sub_push_endpoint;
   string event_id;
-  set<string, ltstr_nocase> event_types;
+  rgw::notify::EventTypeList event_types;
 
   for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
     if (ceph_argparse_double_dash(args, i)) {
@@ -3255,7 +3256,7 @@ int main(int argc, const char **argv)
     } else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) {
       event_id = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--event-type", "--event-types", (char*)NULL)) {
-      get_str_set(val, ",", event_types);
+      rgw::notify::from_string_list(val, event_types);
     } else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) {
       // do nothing
     } else if (strncmp(*i, "-", 1) == 0) {
index 60ae621df4ba46e78a220ac4a16fba9e6f3c30bd..c013ca88202a79373c4b66265e16c87bffb15a5f 100644 (file)
@@ -3,7 +3,6 @@
 
 #include "include/compat.h"
 #include "rgw_amqp.h"
-#include <atomic>
 #include <amqp.h>
 #include <amqp_tcp_socket.h>
 #include <amqp_framing.h>
@@ -73,6 +72,10 @@ struct connection_id_t {
   };
 };
 
+std::string to_string(const connection_id_t& id) {
+    return id.host+":"+"/"+id.vhost;
+}
+
 // connection_t state cleaner
 // could be used for automatic cleanup when getting out of scope
 class ConnectionCleaner {
@@ -145,9 +148,11 @@ struct connection_t {
     amqp_bytes_free(reply_to_queue);
     reply_to_queue = amqp_empty_bytes;
     // fire all remaining callbacks
-    std::for_each(callbacks.begin(), callbacks.end(), [s](auto& cb_tag) {
-        cb_tag.cb(s);
+    std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
+        cb_tag.cb(status);
+        ldout(cct, 20) << "AMQP destroy: invoking callback with tag=" << cb_tag.tag << dendl;
       });
+    callbacks.clear();
     delivery_tag = 1;
   }
 
@@ -593,17 +598,17 @@ private:
     if (rc == AMQP_STATUS_OK) {
       auto const q_len = conn->callbacks.size();
       if (q_len < max_inflight) {
+        ldout(conn->cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
         conn->callbacks.emplace_back(conn->delivery_tag++, message->cb);
-        ldout(conn->cct, 20) << "AMQP publish (" << reinterpret_cast<unsigned char*>(&message->cb) << "): OK. Queue has: " << q_len << " callbacks" << dendl;
       } else {
         // immediately invoke callback with error
-        ldout(conn->cct, 1) << "AMQP publish (" << reinterpret_cast<unsigned char*>(&message->cb) << "): failed with error: callback queue full" << dendl;
+        ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
         message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT);
       }
     } else {
       // an error occurred, close connection
      // it will be retried by the main loop
-      ldout(conn->cct, 1) << "AMQP publish (" << reinterpret_cast<unsigned char*>(&message->cb) << "): failed with error: " << status_to_string(rc) << dendl;
+      ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
       conn->destroy(rc);
       // immediately invoke callback with error
       message->cb(rc);
@@ -657,11 +662,13 @@ private:
           info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
           info.user = const_cast<char*>(conn->user.c_str());
           info.password = const_cast<char*>(conn->password.c_str());
-          ldout(conn->cct, 10) << "AMQP run: retry connection" << dendl;
+          ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
           if (create_connection(conn, info)->is_ok() == false) {
-            ldout(conn->cct, 1) << "AMQP run: connection retry failed" << dendl;
+            ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed" << dendl;
             // TODO: add error counter for failed retries
             // TODO: add exponential backoff for retries
+          } else {
+            ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successful" << dendl;
           }
           INCREMENT_AND_CONTINUE(conn_it);
         }
@@ -681,7 +688,7 @@ private:
         if (rc != AMQP_STATUS_OK) {
           // an error occurred, close connection
          // it will be retried by the main loop
-          ldout(conn->cct, 1) << "AMQP run: connection read error" << dendl;
+          ldout(conn->cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
           conn->destroy(rc);
           INCREMENT_AND_CONTINUE(conn_it);
         }
@@ -740,24 +747,26 @@ private:
 
         const auto& callbacks_end = conn->callbacks.end();
         const auto& callbacks_begin = conn->callbacks.begin();
-        const auto it = std::find(callbacks_begin, callbacks_end, tag);
-        if (it != callbacks_end) {
+        const auto tag_it = std::find(callbacks_begin, callbacks_end, tag);
+        if (tag_it != callbacks_end) {
           if (multiple) {
             // n/ack all up to (and including) the tag
-            ldout(conn->cct, 20) << "AMQP run: multiple n/acks received" << dendl;
-            for (auto rit = it; rit >= callbacks_begin; --rit) {
-              rit->cb(result);
-              conn->callbacks.erase(rit);
+            ldout(conn->cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
+            auto it = callbacks_begin;
+            while (it != conn->callbacks.end() && it->tag <= tag) {
+              ldout(conn->cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
+              it->cb(result);
+              it = conn->callbacks.erase(it);
             }
           } else {
             // n/ack a specific tag
-            ldout(conn->cct, 20) << "AMQP run: n/acks received" << dendl;
-            it->cb(result);
-            conn->callbacks.erase(it);
+            ldout(conn->cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
+            tag_it->cb(result);
+            conn->callbacks.erase(tag_it);
           }
         } else {
           // TODO add counter for acks with no callback
-          ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received" << dendl;
+          ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
         }
         // just increment the iterator
         ++conn_it;
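
As a hedged sketch (not part of this patch) of how the manager above is driven: rgw::amqp::init() is assumed to have been called, and the broker URL, exchange and topic are placeholders:

    auto conn = rgw::amqp::connect("amqp://guest:guest@localhost:5672", "ex1");
    const auto rc = rgw::amqp::publish_with_confirm(conn, "mytopic", "payload",
        [](int status) {
          // fires from the manager's run loop on broker ack/nack, or with an
          // error status if the connection is destroyed first
        });
    // a negative rc means the message was not queued for delivery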
index a9ac0cf494f385f374bc5e940c2ccd661458538e..cdc85016318cb1d1f928f933c32042eaf8956b85 100644 (file)
@@ -291,36 +291,38 @@ struct rgw_err {
   std::string message;
 };
 
-
-
 /* Helper class used for RGWHTTPArgs parsing */
 class NameVal
 {
-   string str;
-   string name;
-   string val;
+   const std::string str;
+   std::string name;
+   std::string val;
  public:
-    explicit NameVal(string nv) : str(nv) {}
+    explicit NameVal(const std::string& nv) : str(nv) {}
 
     int parse();
 
-    string& get_name() { return name; }
-    string& get_val() { return val; }
+    std::string& get_name() { return name; }
+    std::string& get_val() { return val; }
 };
 
 /** Stores the XML arguments associated with the HTTP request in req_state*/
 class RGWHTTPArgs {
-  string str, empty_str;
-  map<string, string> val_map;
-  map<string, string> sys_val_map;
-  map<string, string> sub_resources;
-  bool has_resp_modifier;
-  bool admin_subresource_added;
+  std::string str, empty_str;
+  std::map<std::string, std::string> val_map;
+  std::map<std::string, std::string> sys_val_map;
+  std::map<std::string, std::string> sub_resources;
+  bool has_resp_modifier = false;
+  bool admin_subresource_added = false;
  public:
-  RGWHTTPArgs() : has_resp_modifier(false), admin_subresource_added(false) {}
+  RGWHTTPArgs() = default;
+  explicit RGWHTTPArgs(const std::string& s) {
+      set(s);
+      parse();
+  }
 
   /** Set the arguments; as received */
-  void set(string s) {
+  void set(const std::string& s) {
     has_resp_modifier = false;
     val_map.clear();
     sub_resources.clear();
@@ -328,18 +330,18 @@ class RGWHTTPArgs {
   }
   /** parse the received arguments */
   int parse();
-  void append(const string& name, const string& val);
+  void append(const std::string& name, const std::string& val);
   /** Get the value for a specific argument parameter */
-  const string& get(const string& name, bool *exists = NULL) const;
+  const std::string& get(const std::string& name, bool *exists = NULL) const;
   boost::optional<const std::string&>
   get_optional(const std::string& name) const;
-  int get_bool(const string& name, bool *val, bool *exists);
+  int get_bool(const std::string& name, bool *val, bool *exists);
   int get_bool(const char *name, bool *val, bool *exists);
   void get_bool(const char *name, bool *val, bool def_val);
   int get_int(const char *name, int *val, int def_val);
 
   /** Get the value for specific system argument parameter */
-  std::string sys_get(const string& name, bool *exists = nullptr) const;
+  std::string sys_get(const std::string& name, bool *exists = nullptr) const;
 
   /** see if a parameter is contained in this RGWHTTPArgs */
   bool exists(const char *name) const {
@@ -348,7 +350,7 @@ class RGWHTTPArgs {
   bool sub_resource_exists(const char *name) const {
     return (sub_resources.find(name) != std::end(sub_resources));
   }
-  map<string, string>& get_params() {
+  std::map<std::string, std::string>& get_params() {
     return val_map;
   }
   const std::map<std::string, std::string>& get_sub_resources() const {
@@ -361,12 +363,12 @@ class RGWHTTPArgs {
     return has_resp_modifier;
   }
   void set_system() { /* make all system params visible */
-    map<string, string>::iterator iter;
+    std::map<std::string, std::string>::iterator iter;
     for (iter = sys_val_map.begin(); iter != sys_val_map.end(); ++iter) {
       val_map[iter->first] = iter->second;
     }
   }
-  const string& get_str() {
+  const std::string& get_str() {
     return str;
   }
 }; // RGWHTTPArgs
index bc0363bdf0795d5adf12424e6d5e91df773dc98c..d8afd86cdf1150111a40b75eb8e117ba7ea1258d 100644 (file)
@@ -37,6 +37,9 @@
 #include "rgw_frontend.h"
 #include "rgw_http_client_curl.h"
 #include "rgw_perf_counters.h"
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+#include "rgw_amqp.h"
+#endif
 #if defined(WITH_RADOSGW_BEAST_FRONTEND)
 #include "rgw_asio_frontend.h"
 #endif /* WITH_RADOSGW_BEAST_FRONTEND */
@@ -367,6 +370,14 @@ int main(int argc, const char **argv)
     }
   }
 
+  if (pubsub_enabled) {
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+    if (!rgw::amqp::init(cct.get())) {
+        dout(1) << "ERROR: failed to initialize AMQP manager" << dendl;
+    }
+#endif
+  }
+
   if (apis_map.count("swift") > 0) {
     RGWRESTMgr_SWIFT* const swift_resource = new RGWRESTMgr_SWIFT;
 
@@ -593,6 +604,9 @@ int main(int argc, const char **argv)
   rgw_http_client_cleanup();
   rgw::curl::cleanup_curl();
   g_conf().remove_observer(&implicit_tenant_context);
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+  rgw::amqp::shutdown();
+#endif
 
   rgw_perf_stop(g_ceph_context);
 
diff --git a/src/rgw/rgw_notify.cc b/src/rgw/rgw_notify.cc
new file mode 100644 (file)
index 0000000..944fda8
--- /dev/null
+++ b/src/rgw/rgw_notify.cc
@@ -0,0 +1,128 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_notify.h"
+#include <memory>
+#include <boost/algorithm/hex.hpp>
+#include "rgw_pubsub.h"
+#include "rgw_pubsub_push.h"
+#include "rgw_perf_counters.h"
+#include "common/dout.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+namespace rgw::notify {
+
+// populate record from request
+void populate_record_from_request(const req_state *s, 
+        const ceph::real_time& mtime, 
+        const std::string& etag, 
+        EventType event_type,
+        rgw_pubsub_s3_record& record) { 
+  record.eventVersion = "2.1";
+  record.eventSource = "aws:s3";
+  record.eventTime = mtime;
+  record.eventName = to_string(event_type);
+  record.userIdentity = s->user->user_id.id;    // user that triggered the change
+  record.sourceIPAddress = "";                  // IP address of client that triggered the change: TODO
+  record.x_amz_request_id = s->req_id;          // request ID of the original change
+  record.x_amz_id_2 = s->host_id;               // RGW on which the change was made
+  record.s3SchemaVersion = "1.0";
+  // configurationId is filled from subscription configuration
+  record.bucket_name = s->bucket_name;
+  record.bucket_ownerIdentity = s->bucket_owner.get_id().id;
+  record.bucket_arn = to_string(rgw::ARN(s->bucket));
+  record.object_key = s->object.name;
+  record.object_size = s->obj_size;
+  record.object_etag = etag;
+  record.object_versionId = s->object.instance;
+  // use the timestamp as a per-key sequence id (hex encoded)
+  const utime_t ts(real_clock::now());
+  boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), 
+          std::back_inserter(record.object_sequencer));
+  // event ID is rgw extension (not in the S3 spec), used for acking the event
+  // same format is used in both S3 compliant and Ceph specific events
+  // not used in case of push-only mode
+  record.id = "";
+  record.bucket_id = s->bucket.bucket_id;
+  // pass meta data
+  record.x_meta_map = s->info.x_meta_map;
+}
+
+bool filter(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event_type) {
+    // if event list exists, and none of the events in the list matches the event type, filter the message
+    if (filter.events.size() && std::find(filter.events.begin(), filter.events.end(), event_type) == filter.events.end()) {
+        return true;
+    }
+    // TODO: add filter by compliant conf: object name, prefix, suffix
+    // TODO: add extra filtering criteria: object size, ToD, metadata, ...
+    return false;
+}
+
+int publish(const req_state* s, 
+        const ceph::real_time& mtime, 
+        const std::string& etag, 
+        EventType event_type,
+        rgw::sal::RGWRadosStore* store) {
+    RGWUserPubSub ps_user(store, s->user->user_id);
+    RGWUserPubSub::Bucket ps_bucket(&ps_user, s->bucket);
+    rgw_pubsub_bucket_topics bucket_topics;
+    auto rc = ps_bucket.get_topics(&bucket_topics);
+    if (rc < 0) {
+        // failed to fetch bucket topics
+        return rc;
+    }
+    rgw_pubsub_s3_record record;
+    populate_record_from_request(s, mtime, etag, event_type, record);
+    bool event_handled = false;
+    bool event_should_be_handled = false;
+    for (const auto& bucket_topic : bucket_topics.topics) {
+        const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
+        const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
+        if (filter(topic_filter, s, event_type)) {
+            // topic does not apply to req_state
+            continue;
+        }
+        event_should_be_handled = true;
+        try {
+            // TODO add endpoint LRU cache
+            const auto push_endpoint = RGWPubSubEndpoint::create(topic_cfg.dest.push_endpoint, 
+                    topic_cfg.dest.arn_topic,
+                    RGWHTTPArgs(topic_cfg.dest.push_endpoint_args), 
+                    s->cct);
+            const std::string push_endpoint_str = push_endpoint->to_str();
+            ldout(s->cct, 20) << "push endpoint created: " << push_endpoint_str << dendl;
+            auto rc = push_endpoint->send_to_completion_async(s->cct, record, s->yield);
+            if (rc < 0) {
+                // bail out on first error
+                // TODO: add conf for bail out policy
+                ldout(s->cct, 1) << "push to endpoint " << push_endpoint_str << " failed, with error: " << rc << dendl;
+                if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+                return rc;
+            }
+            if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
+            ldout(s->cct, 20) << "successful push to endpoint " << push_endpoint_str << dendl;
+            event_handled = true;
+        } catch (const RGWPubSubEndpoint::configuration_error& e) {
+            ldout(s->cct, 1) << "ERROR: failed to create push endpoint: " 
+                << topic_cfg.dest.push_endpoint << " due to: " << e.what() << dendl;
+            if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+            return -EINVAL;
+        }
+    }
+
+    if (event_should_be_handled) {
+        // not counting events with no notifications or events that are filtered
+        // counting a single event, regardless of the number of notifications it sends
+        if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
+        if (!event_handled) {
+            // all notifications for this event failed
+            if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost);
+        }
+    }
+
+    return 0;
+}
+
+}
+
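
A hedged illustration of the filter() rule above (the req_state argument is not yet used by the filtering logic, so a null pointer stands in for it here):

    rgw_pubsub_topic_filter f;
    // empty event list: nothing is filtered out
    assert(!filter(f, nullptr, rgw::notify::ObjectCreatedPut));
    // non-matching event list: the message is filtered out
    f.events.push_back(rgw::notify::ObjectRemoved);
    assert(filter(f, nullptr, rgw::notify::ObjectCreatedPut));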
diff --git a/src/rgw/rgw_notify.h b/src/rgw/rgw_notify.h
new file mode 100644 (file)
index 0000000..4711c30
--- /dev/null
+++ b/src/rgw/rgw_notify.h
@@ -0,0 +1,28 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <string>
+#include "common/ceph_time.h"
+#include "rgw_notify_event_type.h"
+
+// forward declarations
+class CephContext;
+namespace rgw::sal {
+    class RGWRadosStore;
+}
+class RGWRados;
+class req_state;
+
+namespace rgw::notify {
+
+// publish notification
+int publish(const req_state* s, 
+        const ceph::real_time& mtime, 
+        const std::string& etag, 
+        EventType event_type,
+        rgw::sal::RGWRadosStore* store);
+
+}
+
diff --git a/src/rgw/rgw_notify_event_type.cc b/src/rgw/rgw_notify_event_type.cc
new file mode 100644 (file)
index 0000000..a95d348
--- /dev/null
+++ b/src/rgw/rgw_notify_event_type.cc
@@ -0,0 +1,77 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_notify_event_type.h"
+#include "include/str_list.h"
+
+namespace rgw::notify {
+
+  std::string to_string(EventType t) {
+    switch (t) {
+      case ObjectCreated:
+        return "s3:ObjectCreated:*";
+      case ObjectCreatedPut:
+        return "s3:ObjectCreated:Put";
+      case ObjectCreatedPost:
+        return "s3:ObjectCreated:Post";
+      case ObjectCreatedCopy:
+        return "s3:ObjectCreated:Copy";
+      case ObjectRemoved:
+        return "s3:ObjectRemoved:*";
+      case ObjectRemovedDelete:
+        return "s3:ObjectRemoved:Delete";
+      case ObjectRemovedDeleteMarkerCreated:
+        return "s3:ObjectRemoved:DeleteMarkerCreated";
+      case UnknownEvent:
+        return "s3:UnknownEvet";
+    }
+    return "s3:UnknownEvent";
+  }
+
+  std::string to_ceph_string(EventType t) {
+    switch (t) {
+      case ObjectCreated:
+      case ObjectCreatedPut:
+      case ObjectCreatedPost:
+      case ObjectCreatedCopy:
+        return "OBJECT_CREATE";
+      case ObjectRemoved:
+      case ObjectRemovedDelete:
+        return "OBJECT_DELETE";
+      case ObjectRemovedDeleteMarkerCreated:
+        return "DELETE_MARKER_CREATE";
+      case UnknownEvent:
+        return "UNKNOWN_EVENT";
+    }
+    return "UNKNOWN_EVENT";
+  }
+
+  EventType from_string(const std::string& s) {
+    if (s == "s3:ObjectCreated:*" || s == "OBJECT_CREATE")
+        return ObjectCreated;
+    if (s == "s3:ObjectCreated:Put")
+        return ObjectCreatedPut;
+    if (s == "s3:ObjectCreated:Post")
+        return ObjectCreatedPost;
+    if (s == "s3:ObjectCreated:Copy")
+        return ObjectCreatedCopy;
+    if (s == "s3:ObjectRemoved:*" || s == "OBJECT_DELETE")
+        return ObjectRemoved;
+    if (s == "s3:ObjectRemoved:Delete")
+        return ObjectRemovedDelete;
+    if (s == "s3:ObjectRemoved:DeleteMarkerCreated" || s == "DELETE_MARKER_CREATE")
+        return ObjectRemovedDeleteMarkerCreated;
+    return UnknownEvent;
+  }
+
+bool operator==(EventType lhs, EventType rhs) {
+  return lhs & rhs;
+}
+
+void from_string_list(const std::string& string_list, EventTypeList& event_list) {
+  event_list.clear();
+  ceph::for_each_substr(string_list, ",", [&event_list] (auto token) {
+    event_list.push_back(rgw::notify::from_string(std::string(token.begin(), token.end())));
+  });
+}
+}
diff --git a/src/rgw/rgw_notify_event_type.h b/src/rgw/rgw_notify_event_type.h
new file mode 100644 (file)
index 0000000..2ac2d1f
--- /dev/null
+++ b/src/rgw/rgw_notify_event_type.h
@@ -0,0 +1,34 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+#include <string>
+#include <vector>
+
+namespace rgw::notify {
+  enum EventType {
+    ObjectCreated       = 0xF,
+    ObjectCreatedPut    = 0x1,
+    ObjectCreatedPost   = 0x2,
+    ObjectCreatedCopy   = 0x4,
+    ObjectRemoved       = 0xF0,
+    ObjectRemovedDelete = 0x10,
+    ObjectRemovedDeleteMarkerCreated = 0x20,
+    UnknownEvent = 0x100
+  };
+
+  using EventTypeList = std::vector<EventType>;
+
+  // two event types are considered equal if their bits intersect
+  bool operator==(EventType lhs, EventType rhs);
+
+  std::string to_string(EventType t);
+
+  std::string to_ceph_string(EventType t);
+
+  EventType from_string(const std::string& s);
+  // create a vector of event types from comma separated list of event types
+  void from_string_list(const std::string& string_list, EventTypeList& event_list);
+}
+
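
The bit layout above makes a specific event compare equal to its wildcard, which is what the filter matching in rgw_notify.cc relies on. A short illustration, grounded in the code in this patch:

    using namespace rgw::notify;
    static_assert(ObjectCreatedPut & ObjectCreated, "0x1 intersects 0xF");
    // operator== uses bit intersection:
    //   ObjectCreatedPut == ObjectCreated   -> true  (0x1 & 0xF)
    //   ObjectCreatedPut == ObjectRemoved   -> false (0x1 & 0xF0)

    EventTypeList events;
    from_string_list("s3:ObjectCreated:*,s3:ObjectRemoved:Delete", events);
    // events now holds {ObjectCreated, ObjectRemovedDelete}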
index 68a6e32489269c76595131e5e0029bb3906dca87..238b8ba571f6dca58ea1d7b9061f04d2c062e9a6 100644 (file)
@@ -48,6 +48,8 @@
 #include "rgw_putobj_processor.h"
 #include "rgw_crypt.h"
 #include "rgw_perf_counters.h"
+#include "rgw_notify.h"
+#include "rgw_notify_event_type.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_quota.h"
@@ -4047,6 +4049,15 @@ void RGWPutObj::execute()
       return;
     }
   }
+
+  // send request to notification manager
+  const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedPut, store);
+  if (ret < 0) {
+    ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+    // TODO: add a conf option to make the send a blocking call and reply with an error if sending failed
+    // this should be a global conf (probably returning a different handler)
+    // so that the configured values don't need to be read before each operation
+  }
 }
 
 int RGWPostObj::verify_permission()
@@ -4267,6 +4278,14 @@ void RGWPostObj::execute()
       return;
     }
   } while (is_next_file_to_upload());
+  // send request to notification manager
+  const auto ret = rgw::notify::publish(s, real_time(), etag, rgw::notify::ObjectCreatedPost, store);
+  if (ret < 0) {
+    ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+    // TODO: add a conf option to make the send a blocking call and reply with an error if sending failed
+    // this should be a global conf (probably returning a different handler)
+    // so that the configured values don't need to be read before each operation
+  }
 }
 
 
@@ -4809,6 +4828,15 @@ void RGWDeleteObj::execute()
   } else {
     op_ret = -EINVAL;
   }
+  // TODO: add etag calculation
+  std::string etag;
+  const auto ret = rgw::notify::publish(s, real_time(), etag, rgw::notify::ObjectRemovedDelete, store);
+  if (ret < 0) {
+    ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+    // TODO: add a conf option to make the send a blocking call and reply with an error if sending failed
+    // this should be a global conf (probably returning a different handler)
+    // so that the configured values don't need to be read before each operation
+  }
 }
 
 bool RGWCopyObj::parse_copy_location(const boost::string_view& url_src,
@@ -5113,6 +5141,15 @@ void RGWCopyObj::execute()
                           copy_obj_progress_cb, (void *)this, 
                            this,
                            s->yield);
+  
+  // TODO: use s3:ObjectCreated:Copy
+  const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedCopy, store);
+  if (ret < 0) {
+    ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+    // TODO: add a conf option to make the send a blocking call and reply with an error if sending failed
+    // this should be a global conf (probably returning a different handler)
+    // so that the configured values don't need to be read before each operation
+  }
 }
 
 int RGWGetACLs::verify_permission()
index 389a3c84e91ede1e158c822bcecc3f62c7073710..4e80011388fb07f13cd6d5ee5a4ea94a13491b07 100644 (file)
@@ -1,6 +1,7 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab ft=cpp
 
+#include "services/svc_zone.h"
 #include "rgw_b64.h"
 #include "rgw_sal.h"
 #include "rgw_pubsub.h"
 
 #define dout_subsys ceph_subsys_rgw
 
+void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj)
+{
+  l.clear();
+
+  XMLObjIter iter = obj->find(name);
+  XMLObj *o;
+
+  while ((o = iter.get_next())) {
+    std::string val;
+    decode_xml_obj(val, o);
+    l.push_back(rgw::notify::from_string(val));
+  }
+}
+
 bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) {
   const auto throw_if_missing = true;
   RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing);
@@ -21,8 +36,8 @@ bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) {
   do_decode_xml_obj(events, "Event", obj);
   if (events.empty()) {
     // if no events are provided, we assume all events
-    events.push_back("s3:ObjectCreated:*");
-    events.push_back("s3:ObjectRemoved:*");
+    events.push_back(rgw::notify::ObjectCreated);
+    events.push_back(rgw::notify::ObjectRemoved);
   }
   return true;
 }
@@ -31,7 +46,7 @@ void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const {
   ::encode_xml("Id", id, f);
   ::encode_xml("Topic", topic_arn.c_str(), f);
   for (const auto& event : events) {
-    ::encode_xml("Event", event, f);
+    ::encode_xml("Event", rgw::notify::to_string(event), f);
   }
 }
 
@@ -53,14 +68,7 @@ void rgw_pubsub_s3_record::dump(Formatter *f) const {
   encode_json("awsRegion", awsRegion, f);
   utime_t ut(eventTime);
   encode_json("eventTime", ut, f);
-  if (eventName == "OBJECT_CREATE") {
-    encode_json("eventName", "ObjectCreated", f);
-  }
-  else if (eventName == "OBJECT_DELETE") {
-    encode_json("eventName", "ObjectRemoved", f);
-  } else {
-    encode_json("eventName", "UNKNOWN_EVENT", f);
-  }
+  encode_json("eventName", eventName, f);
   {
     Formatter::ObjectSection s(*f, "userIdentity");
     encode_json("principalId", userIdentity, f);
@@ -86,6 +94,7 @@ void rgw_pubsub_s3_record::dump(Formatter *f) const {
             encode_json("principalId", bucket_ownerIdentity, f);
         }
         encode_json("arn", bucket_arn, f);
+        encode_json("id", bucket_id, f);
     }
     {
         Formatter::ObjectSection sub_s(*f, "object");
@@ -94,6 +103,7 @@ void rgw_pubsub_s3_record::dump(Formatter *f) const {
         encode_json("etag", object_etag, f);
         encode_json("versionId", object_versionId, f);
         encode_json("sequencer", object_sequencer, f);
+        encode_json("metadata", x_meta_map, f);
     }
   }
   encode_json("eventId", id, f);
@@ -102,7 +112,7 @@ void rgw_pubsub_s3_record::dump(Formatter *f) const {
 void rgw_pubsub_event::dump(Formatter *f) const
 {
   encode_json("id", id, f);
-  encode_json("event", event, f);
+  encode_json("event", event_name, f);
   utime_t ut(timestamp);
   encode_json("timestamp", ut, f);
   encode_json("info", info, f);
@@ -124,6 +134,15 @@ void rgw_pubsub_topic::dump_xml(Formatter *f) const
   encode_xml("TopicArn", arn, f);
 }
 
+void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f)
+{
+  f->open_array_section(name);
+  for (auto iter = l.cbegin(); iter != l.cend(); ++iter) {
+    f->dump_string("obj", rgw::notify::to_ceph_string(*iter));
+  }
+  f->close_section();
+}
+
 void rgw_pubsub_topic_filter::dump(Formatter *f) const
 {
   encode_json("topic", topic, f);
@@ -165,14 +184,14 @@ void rgw_pubsub_sub_dest::dump(Formatter *f) const
   encode_json("oid_prefix", oid_prefix, f);
   encode_json("push_endpoint", push_endpoint, f);
   encode_json("push_endpoint_args", push_endpoint_args, f);
-  encode_json("arn_topic", arn_topic, f);
+  encode_json("push_endpoint_topic", arn_topic, f);
 }
 
 void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const
 {
   encode_xml("EndpointAddress", push_endpoint, f);
   encode_xml("EndpointArgs", push_endpoint_args, f);
-  encode_xml("TopicArn", arn_topic, f);
+  encode_xml("EndpointTopic", arn_topic, f);
 }
 
 void rgw_pubsub_sub_config::dump(Formatter *f) const
@@ -184,23 +203,11 @@ void rgw_pubsub_sub_config::dump(Formatter *f) const
   encode_json("s3_id", s3_id, f);
 }
 
-RGWUserPubSub::RGWUserPubSub(rgw::sal::RGWRadosStore *_store, const rgw_user& _user) : store(_store),
-                                                                        user(_user),
-                                                                        obj_ctx(store->svc()->sysobj->init_obj_ctx())
-{
-  get_user_meta_obj(&user_meta_obj);
-}
-
-void RGWUserPubSub::get_user_meta_obj(rgw_raw_obj *obj) const {
-  *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, user_meta_oid());
-}
-
-void RGWUserPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
-  *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
-}
-
-void RGWUserPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
-  *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
+RGWUserPubSub::RGWUserPubSub(rgw::sal::RGWRadosStore* _store, const rgw_user& _user) : 
+                            store(_store),
+                            user(_user),
+                            obj_ctx(store->svc()->sysobj->init_obj_ctx()) {
+    get_user_meta_obj(&user_meta_obj);
 }
 
 int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
@@ -216,8 +223,8 @@ int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tra
 int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker)
 {
   int ret = read(user_meta_obj, result, objv_tracker);
-  if (ret < 0 && ret != -ENOENT) {
-    ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+  if (ret < 0) {
+    ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
     return ret;
   }
   return 0;
@@ -283,7 +290,26 @@ int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
   return 0;
 }
 
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const EventTypeList& events)
+int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
+{
+  rgw_pubsub_user_topics topics;
+  int ret = get_user_topics(&topics);
+  if (ret < 0) {
+    ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+    return ret;
+  }
+
+  auto iter = topics.topics.find(name);
+  if (iter == topics.topics.end()) {
+    ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
+    return -ENOENT;
+  }
+
+  *result = iter->second.topic;
+  return 0;
+}
+
+int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, const std::string& notif_name)
 {
   rgw_pubsub_topic_subs user_topic_info;
   rgw::sal::RGWRadosStore *store = ps->store;
@@ -310,6 +336,7 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const E
   auto& topic_filter = bucket_topics.topics[topic_name];
   topic_filter.topic = user_topic_info.topic;
   topic_filter.events = events;
+  topic_filter.s3_id = notif_name;
 
   ret = write_topics(bucket_topics, &objv_tracker);
   if (ret < 0) {
@@ -363,10 +390,11 @@ int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& d
 
   int ret = read_user_topics(&topics, &objv_tracker);
   if (ret < 0 && ret != -ENOENT) {
+    // -ENOENT is not an error here: if no topics exist yet, one is created below
     ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret;
   }
-
   rgw_pubsub_topic_subs& new_topic = topics.topics[name];
   new_topic.topic.user = user;
   new_topic.topic.name = name;
@@ -391,6 +419,10 @@ int RGWUserPubSub::remove_topic(const string& name)
   if (ret < 0 && ret != -ENOENT) {
     ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret;
+  } else if (ret == -ENOENT) {
+      // it's not an error if no topics exist - removal is just a no-op
+      ldout(store->ctx(), 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl;
+      return 0;
   }
 
   topics.topics.erase(name);
@@ -450,13 +482,13 @@ int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest
   int ret = ps->read_user_topics(&topics, &user_objv_tracker);
   if (ret < 0) {
     ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
-    return ret;
+    return ret != -ENOENT ? ret : -EINVAL;
   }
 
   auto iter = topics.topics.find(topic);
   if (iter == topics.topics.end()) {
     ldout(store->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
-    return -ENOENT;
+    return -EINVAL;
   }
 
   auto& t = iter->second;
@@ -506,10 +538,9 @@ int RGWUserPubSub::Sub::unsubscribe(const string& _topic)
 
   int ret = ps->read_user_topics(&topics, &objv_tracker);
   if (ret < 0) {
-    ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
-  }
-
-  if (ret >= 0) {
+    // not an error - could be that topic was already deleted
+    ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
+  } else {
     auto iter = topics.topics.find(topic);
     if (iter != topics.topics.end()) {
       auto& t = iter->second;
@@ -650,6 +681,18 @@ int RGWUserPubSub::SubWithEvents<EventType>::remove_event(const string& event_id
   return 0;
 }
 
+void RGWUserPubSub::get_user_meta_obj(rgw_raw_obj *obj) const {
+  *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, user_meta_oid());
+}
+
+void RGWUserPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
+  *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
+}
+
+void RGWUserPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
+  *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
+}
+
 template<typename EventType>
 void RGWUserPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
   list.dump(f);
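
For illustration, the encode_json() overload added above serializes an EventTypeList as a JSON array of ceph-style names, so a filter on {ObjectCreated, ObjectRemovedDelete} dumps roughly as:

    "events": [
        "OBJECT_CREATE",
        "OBJECT_DELETE"
    ]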
index 7d4760adef42da7abcb9fe30842f8bf148bc0d28..bf29cd15e873366990141a872ea6c471cf57fd8e 100644 (file)
@@ -4,11 +4,12 @@
 #ifndef CEPH_RGW_PUBSUB_H
 #define CEPH_RGW_PUBSUB_H
 
+#include "rgw_sal.h"
+#include "services/svc_sys_obj.h"
 #include "rgw_tools.h"
 #include "rgw_zone.h"
 #include "rgw_rados.h"
-#include "services/svc_sys_obj.h"
-#include "services/svc_zone.h"
+#include "rgw_notify_event_type.h"
 
 class XMLObj;
 
@@ -35,7 +36,7 @@ struct rgw_pubsub_s3_notification {
   // notification id
   std::string id;
   // types of events
-  std::list<std::string> events;
+  rgw::notify::EventTypeList events;
   // topic ARN
   std::string topic_arn;
 
@@ -78,13 +79,15 @@ struct rgw_pubsub_s3_notifications {
           "principalId":""
         },
         "arn":""
+        "id": ""
       },
       "object":{
         "key":"",
         "size": ,
         "eTag":"",
         "versionId":"",
-        "sequencer": ""
+        "sequencer": "",
+        "metadata": ""
       }
     },
     "eventId":"",
@@ -137,9 +140,13 @@ struct rgw_pubsub_s3_record {
   // used to store a globally unique identifier of the event
   // that could be used for acking
   std::string id;
+  // this is an rgw extension holding the internal bucket id
+  std::string bucket_id;
+  // meta data
+  std::map<std::string, std::string> x_meta_map;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(eventVersion, bl);
     encode(eventSource, bl);
     encode(awsRegion, bl);
@@ -160,11 +167,13 @@ struct rgw_pubsub_s3_record {
     encode(object_versionId, bl);
     encode(object_sequencer, bl);
     encode(id, bl);
+    encode(bucket_id, bl);
+    encode(x_meta_map, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
     decode(eventVersion, bl);
     decode(eventSource, bl);
     decode(awsRegion, bl);
@@ -185,6 +194,10 @@ struct rgw_pubsub_s3_record {
     decode(object_versionId, bl);
     decode(object_sequencer, bl);
     decode(id, bl);
+    if (struct_v >= 2) {
+        decode(bucket_id, bl);
+        decode(x_meta_map, bl);
+    }
     DECODE_FINISH(bl);
   }
 
@@ -195,16 +208,16 @@ WRITE_CLASS_ENCODER(rgw_pubsub_s3_record)
 struct rgw_pubsub_event {
   constexpr static const char* const json_type_single = "event";
   constexpr static const char* const json_type_plural = "events";
-  string id;
-  string event;
-  string source;
+  std::string id;
+  std::string event_name;
+  std::string source;
   ceph::real_time timestamp;
   JSONFormattable info;
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     encode(id, bl);
-    encode(event, bl);
+    encode(event_name, bl);
     encode(source, bl);
     encode(timestamp, bl);
     encode(info, bl);
@@ -214,7 +227,7 @@ struct rgw_pubsub_event {
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
     decode(id, bl);
-    decode(event, bl);
+    decode(event_name, bl);
     decode(source, bl);
     decode(timestamp, bl);
     decode(info, bl);
@@ -261,7 +274,6 @@ struct rgw_pubsub_sub_dest {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
 
-
 struct rgw_pubsub_sub_config {
   rgw_user user;
   std::string name;
@@ -356,23 +368,34 @@ struct rgw_pubsub_topic_subs {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
 
-typedef std::set<std::string, ltstr_nocase> EventTypeList;
-
 struct rgw_pubsub_topic_filter {
   rgw_pubsub_topic topic;
-  EventTypeList events;
+  rgw::notify::EventTypeList events;
+  std::string s3_id;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(topic, bl);
-    encode(events, bl);
+    // events are stored as a vector of strings
+    std::vector<std::string> tmp_events;
+    const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string;
+    std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter);
+    encode(tmp_events, bl);
+    encode(s3_id, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
     decode(topic, bl);
-    decode(events, bl);
+    // events are stored as a vector of strings
+    events.clear();
+    std::vector<std::string> tmp_events;
+    decode(tmp_events, bl);
+    std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string);
+    if (struct_v >= 2) {
+      decode(s3_id, bl);
+    }
     DECODE_FINISH(bl);
   }
 
@@ -381,7 +404,7 @@ struct rgw_pubsub_topic_filter {
 WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter)
 
 struct rgw_pubsub_bucket_topics {
-  std::map<string, rgw_pubsub_topic_filter> topics;
+  std::map<std::string, rgw_pubsub_topic_filter> topics;
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
@@ -431,15 +454,15 @@ class RGWUserPubSub
 
   rgw_raw_obj user_meta_obj;
 
-  string user_meta_oid() const {
+  std::string user_meta_oid() const {
     return pubsub_user_oid_prefix + user.to_str();
   }
 
-  string bucket_meta_oid(const rgw_bucket& bucket) const {
+  std::string bucket_meta_oid(const rgw_bucket& bucket) const {
     return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
   }
 
-  string sub_meta_oid(const string& name) const {
+  std::string sub_meta_oid(const string& name) const {
     return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
   }
 
@@ -480,10 +503,11 @@ public:
     // return 0 on success or if no topic was associated with the bucket, error code otherwise
     int get_topics(rgw_pubsub_bucket_topics *result);
     // adds a topic + filter (event list) to a bucket
+    // assigning a notification name is optional (needed for S3 compatible notifications)
     // if the topic already exists on the bucket, the filter event list may be updated
     // return -ENOENT if the topic does not exist
     // return 0 on success, error code otherwise
-    int create_notification(const string& topic_name, const EventTypeList& events);
+    int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, const std::string& notif_name="");
     // remove a topic and filter from bucket
     // if the topic does not exist on the bucket it is a no-op (considered success)
     // return -ENOENT if the topic does not exist
@@ -503,7 +527,7 @@ public:
     int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker);
     int remove_sub(RGWObjVersionTracker *objv_tracker);
   public:
-    Sub(RGWUserPubSub *_ps, const string& _sub) : ps(_ps), sub(_sub) {
+    Sub(RGWUserPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
       ps->get_sub_meta_obj(sub, &sub_meta_obj);
     }
 
@@ -566,15 +590,20 @@ public:
 
   void get_user_meta_obj(rgw_raw_obj *obj) const;
   void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;
+
   void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const;
 
   // get all topics defined for the user and populate them into "result"
   // return 0 on success or if no topics exist, error code otherwise
   int get_user_topics(rgw_pubsub_user_topics *result);
-  // get a topic by its name and populate it into "result"
+  // get a topic with its subscriptions by its name and populate it into "result"
  // return -ENOENT if the topic does not exist
   // return 0 on success, error code otherwise
   int get_topic(const string& name, rgw_pubsub_topic_subs *result);
+  // get a topic (without its subscriptions) by its name and populate it into "result"
+  // return -ENOENT if the topic does not exist
+  // return 0 on success, error code otherwise
+  int get_topic(const string& name, rgw_pubsub_topic *result);
   // create a topic with a name only
   // if the topic already exists it is a no-op (considered success)
   // return 0 on success, error code otherwise
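
A hedged sketch of the dual encoding in rgw_pubsub_topic_filter above: s3_id doubles as the scheme selector, so the same event list is persisted under different spellings depending on how the notification was created:

    rgw_pubsub_topic_filter f;
    f.events = {rgw::notify::ObjectCreated};
    f.s3_id = "";         // ceph-specific notification: stored as "OBJECT_CREATE"
    f.s3_id = "mynotif";  // S3-compliant notification:  stored as "s3:ObjectCreated:*"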
index b1e0430e23ced200ad44308d8faebe64e91464e7..dd825e0570b3599d7a7cc09f74795b38fd0c5cff 100644 (file)
@@ -7,6 +7,7 @@
 #include <algorithm>
 #include "include/buffer_fwd.h"
 #include "common/Formatter.h"
+#include "common/async/completion.h"
 #include "rgw_common.h"
 #include "rgw_data_sync.h"
 #include "rgw_pubsub.h"
@@ -101,7 +102,7 @@ public:
     } else {
       ack_level = std::atoi(str_ack_level.c_str());
       if (ack_level < 100 || ack_level >= 600) {
-        throw configuration_error("HTTP/S: invalid http-ack-level " + str_ack_level);
+        throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
       }
     }
 
@@ -125,6 +126,19 @@ public:
     return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl);
   }
 
+  int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+    bufferlist read_bl;
+    RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
+    const auto post_data = json_format_pubsub_event(record);
+    request.set_post_data(post_data);
+    request.set_send_length(post_data.length());
+    if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+    const auto rc = RGWHTTP::process(&request, y);
+    if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+    // TODO: use read_bl to process return code and handle according to ack level
+    return rc;
+  }
+
   std::string to_str() const override {
     std::string str("HTTP/S Endpoint");
     str += "\nURI: " + endpoint;
@@ -143,6 +157,7 @@ private:
     ACK_LEVEL_BROKER,
     ACK_LEVEL_ROUTEABLE
   };
+  CephContext* const cct;
   const std::string endpoint;
   const std::string topic;
   amqp::connection_ptr_t conn;
@@ -162,17 +177,16 @@ private:
   // This coroutine ends when it send the message and does not wait for an ack
   class NoAckPublishCR : public RGWCoroutine {
   private:
-    RGWDataSyncEnv* const sync_env;
     const std::string topic;
     amqp::connection_ptr_t conn;
     const std::string message;
 
   public:
-    NoAckPublishCR(RGWDataSyncEnv* _sync_env,
+    NoAckPublishCR(CephContext* cct,
               const std::string& _topic,
               amqp::connection_ptr_t& _conn,
               const std::string& _message) :
-      RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+      RGWCoroutine(cct),
       topic(_topic), conn(_conn), message(_message) {}
 
     // send message to endpoint, without waiting for reply
@@ -193,19 +207,18 @@ private:
   // note that it does not wait for an ack fron the end client
   class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
   private:
-    RGWDataSyncEnv* const sync_env;
     const std::string topic;
     amqp::connection_ptr_t conn;
     const std::string message;
     const ack_level_t ack_level; // TODO not used for now
 
   public:
-    AckPublishCR(RGWDataSyncEnv* _sync_env,
+    AckPublishCR(CephContext* cct,
               const std::string& _topic,
               amqp::connection_ptr_t& _conn,
               const std::string& _message,
               ack_level_t _ack_level) :
-      RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+      RGWCoroutine(cct),
       topic(_topic), conn(_conn), message(_message), ack_level(_ack_level) {}
 
     // send message to endpoint, waiting for reply
@@ -255,10 +268,14 @@ public:
   RGWPubSubAMQPEndpoint(const std::string& _endpoint,
       const std::string& _topic,
       const RGWHTTPArgs& args,
-      CephContext* cct) : 
+      CephContext* _cct) : 
+        cct(_cct),
         endpoint(_endpoint), 
         topic(_topic), 
         conn(amqp::connect(endpoint, get_exchange(args))) {
+    if (!conn) { 
+      throw configuration_error("AMQP: failed to create connection to: " + endpoint);
+    }
     bool exists;
     // get ack level
     str_ack_level = args.get("amqp-ack-level", &exists);
@@ -270,27 +287,105 @@ public:
     } else if (str_ack_level == "routable") {
       ack_level = ACK_LEVEL_ROUTEABLE;
     } else {
-      throw configuration_error("HTTP: invalid amqp-ack-level " + str_ack_level);
+      throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
     }
   }
 
   RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
+    ceph_assert(conn);
     if (ack_level == ACK_LEVEL_NONE) {
-      return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(event));
+      return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
     } else {
       // TODO: currently broker and routable are the same - this will require different flags
       // but the same mechanism
-      return new AckPublishCR(env, topic, conn, json_format_pubsub_event(event), ack_level);
+      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event), ack_level);
     }
   }
   
   RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+    ceph_assert(conn);
     if (ack_level == ACK_LEVEL_NONE) {
-      return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(record));
+      return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
     } else {
       // TODO: currently broker and routable are the same - this will require different flags
       // but the same mechanism
-      return new AckPublishCR(env, topic, conn, json_format_pubsub_event(record), ack_level);
+      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record), ack_level);
+    }
+  }
+
+  // allows waiting until "finish()" is called from a different thread;
+  // waiting either blocks the calling thread or yields, depending
+  // on compilation flag support and on whether an optional_yield is set
+  class Waiter {
+    using Signature = void(boost::system::error_code);
+    using Completion = ceph::async::Completion<Signature>;
+    std::unique_ptr<Completion> completion = nullptr;
+    int ret;
+
+    mutable std::atomic<bool> done = false;
+    mutable std::mutex lock;
+    mutable std::condition_variable cond;
+
+    template <typename ExecutionContext, typename CompletionToken>
+    auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
+      boost::asio::async_completion<CompletionToken, Signature> init(token);
+      auto& handler = init.completion_handler;
+      {
+        std::unique_lock l{lock};
+        completion = Completion::create(ctx.get_executor(), std::move(handler));
+      }
+      return init.result.get();
+    }
+
+  public:
+    int wait(optional_yield y) {
+      if (done) {
+        return ret;
+      }
+#ifdef HAVE_BOOST_CONTEXT
+      if (y) {
+        auto& io_ctx = y.get_io_context();
+        auto& yield_ctx = y.get_yield_context();
+        boost::system::error_code ec;
+        async_wait(io_ctx, yield_ctx[ec]);
+        return -ec.value();
+      }
+#endif
+      std::unique_lock l(lock);
+      cond.wait(l, [this]{return (done==true);});
+      return ret;
+    }
+
+    void finish(int r) {
+      std::unique_lock l{lock};
+      ret = r;
+      done = true;
+      if (completion) {
+        boost::system::error_code ec(-ret, boost::system::system_category());
+        Completion::post(std::move(completion), ec);
+      } else {
+        cond.notify_all();
+      }
+    }
+  };
+
+  int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+    ceph_assert(conn);
+    if (ack_level == ACK_LEVEL_NONE) {
+      return amqp::publish(conn, topic, json_format_pubsub_event(record));
+    } else {
+      // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
+      // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
+      auto w = std::unique_ptr<Waiter>(new Waiter);
+      const auto rc = amqp::publish_with_confirm(conn, 
+        topic,
+        json_format_pubsub_event(record),
+        std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+      if (rc < 0) {
+        // failed to publish, does not wait for reply
+        return rc;
+      }
+      return w->wait(y);
     }
   }
 
@@ -348,14 +443,14 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
     if (version == AMQP_0_9_1) {
       return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
     } else if (version == AMQP_1_0) {
-      throw configuration_error("amqp v1.0 not supported");
+      throw configuration_error("AMQP: v1.0 not supported");
       return nullptr;
     } else {
-      throw configuration_error("unknown amqp version " + version);
+      throw configuration_error("AMQP: unknown version: " + version);
       return nullptr;
     }
   } else if (schema == "amqps") {
-    throw configuration_error("amqps not supported");
+    throw configuration_error("AMQP: ssl not supported");
     return nullptr;
 #endif
   }
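
The blocking branch of Waiter::wait() above is a plain condition-variable handshake. A minimal, self-contained sketch of just that branch (names are illustrative; the real class adds the boost::asio completion path for optional_yield):

    #include <cassert>
    #include <condition_variable>
    #include <mutex>
    #include <thread>

    struct MiniWaiter {
      int ret = 0;
      bool done = false;
      std::mutex lock;
      std::condition_variable cond;

      int wait() {
        std::unique_lock<std::mutex> l(lock);
        cond.wait(l, [this] { return done; });
        return ret;
      }

      void finish(int r) {
        {
          std::lock_guard<std::mutex> l(lock);
          ret = r;
          done = true;
        }
        cond.notify_all();
      }
    };

    int main() {
      MiniWaiter w;
      std::thread publisher([&w] { w.finish(0); });  // stands in for the AMQP ack callback
      assert(w.wait() == 0);                         // blocks until finish() runs
      publisher.join();
      return 0;
    }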
index 622be3520bf1ce040b9e0f3d3683e3ad5479e84c..54f46f69c252dff6016cf12ef718cde5151e48ac 100644 (file)
@@ -6,6 +6,7 @@
 #include <memory>
 #include <stdexcept>
 #include "include/buffer_fwd.h"
+#include "common/async/yield_context.h"
 
 // TODO the env should be used as a template parameter to differentiate the source that triggers the pushes
 class RGWDataSyncEnv;
@@ -31,13 +32,17 @@ public:
   static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContext *cct=nullptr);
  
   // this method is used in order to send notification (Ceph specific) and wait for completion 
-  // in async manner via a coroutine
+  // in an async manner via a coroutine when invoked in the data sync environment
   virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) = 0;
 
   // this method is used in order to send notification (S3 compliant) and wait for completion 
-  // in async manner via a coroutine
+  // in an async manner via a coroutine when invoked in the data sync environment
   virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) = 0;
 
+  // this method is used in order to send notification (S3 compliant) and wait for completion 
+  // in an async manner, potentially yielding, when invoked in the frontend environment
+  virtual int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) = 0;
+
   // present as string
   virtual std::string to_str() const { return ""; }
   
index 1ff5099d85456f50bfd73915c8b1fe8cb9b6aea5..4a2e365fd2d742f05db397707d681b95742728f1 100644 (file)
@@ -3,6 +3,7 @@
 
 #include <algorithm>
 #include <boost/tokenizer.hpp>
+#include <optional>
 #include "rgw_rest_pubsub_common.h"
 #include "rgw_rest_pubsub.h"
 #include "rgw_pubsub_push.h"
@@ -13,6 +14,7 @@
 #include "rgw_rest_s3.h"
 #include "rgw_arn.h"
 #include "rgw_auth_s3.h"
+#include "services/svc_zone.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
@@ -333,27 +335,8 @@ int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp) {
   return RGW_Auth_S3::authorize(dpp, store, auth_registry, s);
 }
 
-namespace {
-// conversion functions between S3 and GCP style event names
-std::string s3_to_gcp_event(const std::string& event) {
-  if (event == "s3:ObjectCreated:*") {
-    return "OBJECT_CREATE";
-  }
-  if (event == "s3:ObjectRemoved:*") {
-    return "OBJECT_DELETE";
-  }
-  return "UNKNOWN_EVENT";
-}
-std::string gcp_to_s3_event(const std::string& event) {
-  if (event == "OBJECT_CREATE") {
-    return "s3:ObjectCreated:";
-  }
-  if (event == "OBJECT_DELETE") {
-    return "s3:ObjectRemoved:";
-  }
-  return "UNKNOWN_EVENT";
-}
 
+namespace {
 // return a unique topic by prefixing with the notification name: <notification>_<topic>
 std::string topic_to_unique(const std::string& topic, const std::string& notification) {
   return notification + "_" + topic;
@@ -366,6 +349,14 @@ std::string topic_to_unique(const std::string& topic, const std::string& notific
   }
   return unique_topic.substr(notification.length() + 1);
 }
+
+// from list of bucket topics, find the one that was auto-generated by a notification
+auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std::string& notif_name) {
+    auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), [&](const auto& val) { return notif_name == val.second.s3_id; });
+    return it != bucket_topics.topics.end() ?
+        std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
+        std::nullopt;
+}
 }
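
Auto-generated topics embed the owning notification in their name, and
find_unique_topic() recovers the pairing by matching the stored s3_id. A small
Python sketch of the naming scheme and its inverse (helper names are
hypothetical):

    def topic_to_unique(topic, notification):
        # auto-generated topics are named <notification>_<topic>
        return notification + '_' + topic

    def unique_to_topic(unique_topic, notification):
        # strip the notification prefix; an empty result means the name
        # is not in unique form
        prefix = notification + '_'
        if not unique_topic.startswith(prefix):
            return ''
        return unique_topic[len(prefix):]

    assert unique_to_topic(topic_to_unique('mytopic', 'notif1'), 'notif1') == 'mytopic'
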
 
 // command (S3 compliant): PUT /<bucket name>?notification
@@ -444,31 +435,40 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
   ceph_assert(b);
   std::string data_bucket_prefix = "";
   std::string data_oid_prefix = "";
+  bool push_only = true;
   if (store->getRados()->get_sync_module()) {
     const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->getRados()->get_sync_module().get());
     if (psmodule) {
         const auto& conf = psmodule->get_effective_conf();
         data_bucket_prefix = conf["data_bucket_prefix"];
         data_oid_prefix = conf["data_oid_prefix"];
+        // TODO: allow "push-only" on PS zone as well
+        push_only = false;
     }
   }
 
   for (const auto& c : configurations.list) {
-    const auto& sub_name = c.id;
-    if (sub_name.empty()) {
+    const auto& notif_name = c.id;
+    if (notif_name.empty()) {
       ldout(s->cct, 1) << "missing notification id" << dendl;
       op_ret = -EINVAL;
       return;
     }
     if (c.topic_arn.empty()) {
-      ldout(s->cct, 1) << "missing topic ARN" << dendl;
+      ldout(s->cct, 1) << "missing topic ARN in notification: '" << notif_name << "'" << dendl;
       op_ret = -EINVAL;
       return;
     }
 
     const auto arn = rgw::ARN::parse(c.topic_arn);
     if (!arn || arn->resource.empty()) {
-      ldout(s->cct, 1) << "topic ARN has invalid format:" << c.topic_arn << dendl;
+      ldout(s->cct, 1) << "topic ARN has invalid format: '" << c.topic_arn << "' in notification: '" << notif_name << "'" << dendl;
+      op_ret = -EINVAL;
+      return;
+    }
+
+    if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) {
+      ldout(s->cct, 1) << "unknown event type in notification: '" << notif_name << "'" << dendl;
       op_ret = -EINVAL;
       return;
     }
@@ -476,7 +476,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
     const auto topic_name = arn->resource;
 
     // get topic information. destination information is stored in the topic
-    rgw_pubsub_topic_subs topic_info;  
+    rgw_pubsub_topic topic_info;  
     op_ret = ups->get_topic(topic_name, &topic_info);
     if (op_ret < 0) {
       ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
@@ -488,53 +488,57 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
     // create unique topic name. this has 2 reasons:
     // (1) topics cannot be shared between different S3 notifications because they hold the filter information
     // (2) make topic cleanup easier when the notification is removed
-    const auto unique_topic_name = topic_to_unique(topic_name, sub_name);
-    // generate the internal topic, no need to store destination info in thr unique topic
+    const auto unique_topic_name = topic_to_unique(topic_name, notif_name);
+    // generate the internal topic. destination is stored here for the "push-only" case
+    // when no subscription exists
     // ARN is cached to make the "GET" method faster
-    op_ret = ups->create_topic(unique_topic_name, rgw_pubsub_sub_dest(), topic_info.topic.arn);
+    op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn);
     if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to auto-generate topic '" << unique_topic_name << 
+      ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name << 
         "', ret=" << op_ret << dendl;
       return;
     }
+    ldout(s->cct, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
     // generate the notification
-    EventTypeList events;
-    std::transform(c.events.begin(), c.events.end(), std::inserter(events, events.begin()), s3_to_gcp_event);
-    op_ret = b->create_notification(unique_topic_name, events);
+    op_ret = b->create_notification(unique_topic_name, c.events, notif_name);
     if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to auto-generate notification on topic '" << unique_topic_name <<
+      ldout(s->cct, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
         "', ret=" << op_ret << dendl;
       // rollback generated topic (ignore return value)
       ups->remove_topic(unique_topic_name);
       return;
     }
-   
-    // generate the subscription with destination information from the original topic
-    rgw_pubsub_sub_dest dest = topic_info.topic.dest;
-    dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
-    dest.oid_prefix = data_oid_prefix + sub_name + "/";
-    auto sub = ups->get_sub(sub_name);
-    op_ret = sub->subscribe(unique_topic_name, dest, sub_name);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to auto-generate subscription '" << sub_name << "', ret=" << op_ret << dendl;
-      // rollback generated notification (ignore return value)
-      b->remove_notification(unique_topic_name);
-      // rollback generated topic (ignore return value)
-      ups->remove_topic(unique_topic_name);
-      return;
+    ldout(s->cct, 20) << "successfully auto-generated notification for unique topic'" << unique_topic_name << "'" << dendl;
+  
+    if (!push_only) {
+      // generate the subscription with destination information from the original topic
+      rgw_pubsub_sub_dest dest = topic_info.dest;
+      dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
+      dest.oid_prefix = data_oid_prefix + notif_name + "/";
+      auto sub = ups->get_sub(notif_name);
+      op_ret = sub->subscribe(unique_topic_name, dest, notif_name);
+      if (op_ret < 0) {
+        ldout(s->cct, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
+        // rollback generated notification (ignore return value)
+        b->remove_notification(unique_topic_name);
+        // rollback generated topic (ignore return value)
+        ups->remove_topic(unique_topic_name);
+        return;
+      }
+      ldout(s->cct, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
     }
-    ldout(s->cct, 20) << "successfully auto-generated subscription '" << sub_name << "'" << dendl;
   }
 }
 
 // command (extension to S3): DELETE /bucket?notification[=<notification-id>]
 class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp {
 private:
-  std::string sub_name;
+  std::string notif_name;
 
   int get_params() override {
     bool exists;
-    sub_name = s->info.args.get("notification", &exists);
+    notif_name = s->info.args.get("notification", &exists);
     if (!exists) {
       ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
       return -EINVAL;
@@ -547,36 +551,15 @@ private:
     return 0;
   }
 
-  void delete_notification(const std::string& _sub_name, const RGWUserPubSub::BucketRef& b, bool must_delete) {
-    auto sub = ups->get_sub(_sub_name);
-    rgw_pubsub_sub_config sub_conf;
-    op_ret = sub->get_conf(&sub_conf);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
-      return;
-    }
-    if (sub_conf.s3_id.empty()) {
-      if (must_delete) {
-        op_ret = -ENOENT;
-        ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
-      }
-      return;
-    }
-    const auto& sub_topic_name = sub_conf.topic;
-    op_ret = sub->unsubscribe(sub_topic_name);
+  void remove_notification_by_topic(const std::string& topic_name, const RGWUserPubSub::BucketRef& b) {
+    op_ret = b->remove_notification(topic_name);
     if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to remove auto-generated subscription, ret=" << op_ret << dendl;
-      return;
-    }
-    op_ret = b->remove_notification(sub_topic_name);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to remove auto-generated notification, ret=" << op_ret << dendl;
+      ldout(s->cct, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
     }
-    op_ret = ups->remove_topic(sub_topic_name);
+    op_ret = ups->remove_topic(topic_name);
     if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to remove auto-generated topic, ret=" << op_ret << dendl;
+      ldout(s->cct, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
     }
-    return;
   }
 
 public:
@@ -594,36 +577,70 @@ void RGWPSDeleteNotif_ObjStore_S3::execute() {
   auto b = ups->get_bucket(bucket_info.bucket);
   ceph_assert(b);
 
-  if (!sub_name.empty()) {
+  // get all topics on a bucket
+  rgw_pubsub_bucket_topics bucket_topics;
+  op_ret = b->get_topics(&bucket_topics);
+  if (op_ret < 0) {
+    ldout(s->cct, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
+    return;
+  }
+
+  if (!notif_name.empty()) {
     // delete a specific notification
-    delete_notification(sub_name, b, true);
+    const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
+    if (unique_topic) {
+      // remove the auto-generated subscription matching the notification name (if it exists)
+      const auto unique_topic_name = unique_topic->get().topic.name;
+      auto sub = ups->get_sub(notif_name);
+      op_ret = sub->unsubscribe(unique_topic_name);
+      if (op_ret < 0 && op_ret != -ENOENT) {
+        ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
+        return;
+      }
+      remove_notification_by_topic(unique_topic_name, b);
+      return;
+    }
+    // notification to be removed is not found - considered success
+    ldout(s->cct, 20) << "notification '" << notif_name << "' already removed" << dendl;
     return;
   }
 
-  // delete all notifications on a bucket
-  rgw_pubsub_bucket_topics bucket_topics;
-  b->get_topics(&bucket_topics);
-  // loop through all topics of the bucket
+  // delete all notifications on a bucket
   for (const auto& topic : bucket_topics.topics) {
-    // for each topic get all subscriptions
+    // remove the auto-generated subscription of the topic (if it exists)
     rgw_pubsub_topic_subs topic_subs;
-    ups->get_topic(topic.first, &topic_subs);
-    // loop through all subscriptions
+    op_ret = ups->get_topic(topic.first, &topic_subs);
     for (const auto& topic_sub_name : topic_subs.subs) {
-      delete_notification(topic_sub_name, b, false);
+      auto sub = ups->get_sub(topic_sub_name);
+      rgw_pubsub_sub_config sub_conf;
+      op_ret = sub->get_conf(&sub_conf);
+      if (op_ret < 0) {
+        ldout(s->cct, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl;
+        return;
+      }
+      if (!sub_conf.s3_id.empty()) {
+        // S3 notification, has autogenerated subscription
+        const auto& sub_topic_name = sub_conf.topic;
+        op_ret = sub->unsubscribe(sub_topic_name);
+        if (op_ret < 0) {
+          ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl;
+          return;
+        }
+      }
     }
+    remove_notification_by_topic(topic.first, b);
   }
 }
 
 // command (S3 compliant): GET /bucket?notification[=<notification-id>]
 class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
 private:
-  std::string sub_name;
+  std::string notif_name;
   rgw_pubsub_s3_notifications notifications;
 
   int get_params() override {
     bool exists;
-    sub_name = s->info.args.get("notification", &exists);
+    notif_name = s->info.args.get("notification", &exists);
     if (!exists) {
       ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
       return -EINVAL;
@@ -636,9 +653,7 @@ private:
     return 0;
   }
 
-  void add_notification_to_list(const rgw_pubsub_sub_config& sub_conf, 
-      const EventTypeList& events,
-      const std::string& topic_arn);  
+  void add_notification_to_list(const rgw_pubsub_topic_filter& topic_filter);
 
 public:
   void execute() override;
@@ -658,13 +673,11 @@ public:
   const char* name() const override { return "pubsub_notifications_get_s3"; }
 };
 
-void RGWPSListNotifs_ObjStore_S3::add_notification_to_list(const rgw_pubsub_sub_config& sub_conf, 
-    const EventTypeList& events, 
-    const std::string& topic_arn) { 
+void RGWPSListNotifs_ObjStore_S3::add_notification_to_list(const rgw_pubsub_topic_filter& topic_filter) {
     rgw_pubsub_s3_notification notification;
-    notification.id = sub_conf.s3_id;
-    notification.topic_arn = topic_arn,
-    std::transform(events.begin(), events.end(), std::back_inserter(notification.events), gcp_to_s3_event);
+    notification.id = topic_filter.s3_id;
+    notification.topic_arn = topic_filter.topic.arn;
+    notification.events = topic_filter.events;
     notifications.list.push_back(notification);
 }
 
@@ -672,61 +685,32 @@ void RGWPSListNotifs_ObjStore_S3::execute() {
   ups.emplace(store, s->owner.get_id());
   auto b = ups->get_bucket(bucket_info.bucket);
   ceph_assert(b);
-  if (!sub_name.empty()) {
+  
+  // get all topics on a bucket
+  rgw_pubsub_bucket_topics bucket_topics;
+  op_ret = b->get_topics(&bucket_topics);
+  if (op_ret < 0) {
+    ldout(s->cct, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
+    return;
+  }
+  if (!notif_name.empty()) {
     // get info of a specific notification
-    auto sub = ups->get_sub(sub_name);
-    rgw_pubsub_sub_config sub_conf;
-    op_ret = sub->get_conf(&sub_conf);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
-      return;
-    }
-    if (sub_conf.s3_id.empty()) {
-      op_ret = -ENOENT;
-      ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
+    const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
+    if (unique_topic) {
+      add_notification_to_list(unique_topic->get());
       return;
     }
-    rgw_pubsub_bucket_topics bucket_topics;
-    op_ret = b->get_topics(&bucket_topics);
-    if (op_ret < 0) {
-      ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
-      return;
-    }
-    const auto topic_it = bucket_topics.topics.find(sub_conf.topic);
-    if (topic_it == bucket_topics.topics.end()) {
-      op_ret = -ENOENT;
-      ldout(s->cct, 1) << "notification does not have topic information, ret=" << op_ret << dendl;
-      return;
-    }
-    add_notification_to_list(sub_conf, topic_it->second.events, topic_it->second.topic.arn);
+    op_ret = -ENOENT;
+    ldout(s->cct, 1) << "failed to get notification info for '" << notif_name << "', ret=" << op_ret << dendl;
     return;
   }
-  // get info on all s3 notifications of the bucket
-  rgw_pubsub_bucket_topics bucket_topics;
-  b->get_topics(&bucket_topics);
   // loop through all topics of the bucket
   for (const auto& topic : bucket_topics.topics) {
-    // for each topic get all subscriptions
-    rgw_pubsub_topic_subs topic_subs;
-    ups->get_topic(topic.first, &topic_subs);
-    const auto& events = topic.second.events;
-    const auto& topic_arn = topic.second.topic.arn;
-    // loop through all subscriptions
-    for (const auto& topic_sub_name : topic_subs.subs) {
-      // get info of a specific notification
-      auto sub = ups->get_sub(topic_sub_name);
-      rgw_pubsub_sub_config sub_conf;
-      op_ret = sub->get_conf(&sub_conf);
-      if (op_ret < 0) {
-        ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
-        return;
-      }
-      if (sub_conf.s3_id.empty()) {
+    if (topic.second.s3_id.empty()) {
         // not an s3 notification
         continue;
-      }
-      add_notification_to_list(sub_conf, events, topic_arn);
     }
+    add_notification_to_list(topic.second);
   }
 }
 
index 440d085094c1cadcf36a02012285106abd35cd11..d565a902a237503fa39142baf444e3b06ba982d0 100644 (file)
@@ -1,6 +1,7 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab ft=cpp
 
+#include "services/svc_zone.h"
 #include "rgw_common.h"
 #include "rgw_coroutine.h"
 #include "rgw_sync_module.h"
@@ -14,6 +15,7 @@
 #include "rgw_op.h"
 #include "rgw_pubsub.h"
 #include "rgw_pubsub_push.h"
+#include "rgw_notify_event_type.h"
 #include "rgw_perf_counters.h"
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
 #include "rgw_amqp.h"
@@ -327,32 +329,6 @@ struct PSConfig {
   }
 };
 
-enum RGWPubSubEventType {
-  UNKNOWN_EVENT        = 0,
-  OBJECT_CREATE        = 1,
-  OBJECT_DELETE        = 2,
-  DELETE_MARKER_CREATE = 3,
-};
-
-#define EVENT_NAME_OBJECT_CREATE               "OBJECT_CREATE"
-#define EVENT_NAME_OBJECT_DELETE               "OBJECT_DELETE"
-#define EVENT_NAME_OBJECT_DELETE_MARKER_CREATE "DELETE_MARKER_CREATE"
-#define EVENT_NAME_UNKNOWN                    "UNKNOWN_EVENT"
-
-static const char *get_event_name(const RGWPubSubEventType& val)
-{
-  switch (val) {
-    case OBJECT_CREATE:
-      return EVENT_NAME_OBJECT_CREATE;
-    case OBJECT_DELETE:
-      return EVENT_NAME_OBJECT_DELETE;
-    case DELETE_MARKER_CREATE:
-      return EVENT_NAME_OBJECT_DELETE_MARKER_CREATE;
-    default:
-      return "EVENT_NAME_UNKNOWN";
-  };
-}
-
 using PSConfigRef = std::shared_ptr<PSConfig>;
 template<typename EventType>
 using EventRef = std::shared_ptr<EventType>;
@@ -420,12 +396,12 @@ static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
                        const rgw_obj_key& key,
                        const ceph::real_time& mtime,
                        const std::vector<std::pair<std::string, std::string> > *attrs,
-                       const string& event_name,
+                       rgw::notify::EventType event_type,
                        EventRef<rgw_pubsub_event> *event) {
   *event = std::make_shared<rgw_pubsub_event>();
 
   EventRef<rgw_pubsub_event>& e = *event;
-  e->event = event_name;
+  e->event_name = rgw::notify::to_ceph_string(event_type);
   e->source = bucket.name + "/" + key.name;
   e->timestamp = real_clock::now();
 
@@ -442,7 +418,7 @@ static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket,
                        const rgw_obj_key& key,
                        const ceph::real_time& mtime,
                        const std::vector<std::pair<std::string, std::string> > *attrs,
-                       const string& event_name,
+                       rgw::notify::EventType event_type,
                        EventRef<rgw_pubsub_s3_record> *record) {
   *record = std::make_shared<rgw_pubsub_s3_record>();
 
@@ -450,18 +426,19 @@ static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket,
   r->eventVersion = "2.1";
   r->eventSource = "aws:s3";
   r->eventTime = mtime;
-  r->eventName = event_name;
-  r->userIdentity = "";         // user that triggered the change: not supported yet
-  r->sourceIPAddress = "";      // IP address of client that triggered the change: not supported yet
-  r->x_amz_request_id = "";     // request ID of the original change: not supported yet
-  r->x_amz_id_2 = "";           // RGW on which the change was made: not supported yet
+  r->eventName = rgw::notify::to_string(event_type);
+  r->userIdentity = "";         // user that triggered the change: not supported in sync module
+  r->sourceIPAddress = "";      // IP address of client that triggered the change: not supported in sync module
+  r->x_amz_request_id = "";     // request ID of the original change: not supported in sync module
+  r->x_amz_id_2 = "";           // RGW on which the change was made: not supported in sync module
   r->s3SchemaVersion = "1.0";
   // configurationId is filled from subscription configuration
   r->bucket_name = bucket.name;
   r->bucket_ownerIdentity = owner.to_str();
   r->bucket_arn = to_string(rgw::ARN(bucket));
+  r->bucket_id = bucket.bucket_id; // rgw extension
   r->object_key = key.name;
-  r->object_size = 0;           // not supported yet
+  r->object_size = 0;           // not supported in sync module
   objstore_event oevent(bucket, key, mtime, attrs);
   r->object_etag = oevent.get_hash();
   r->object_versionId = key.instance;
@@ -1106,7 +1083,7 @@ class RGWPSFindBucketTopicsCR : public RGWCoroutine {
   rgw_user owner;
   rgw_bucket bucket;
   rgw_obj_key key;
-  string event_name;
+  rgw::notify::EventType event_type;
 
   RGWUserPubSub ups;
 
@@ -1121,14 +1098,14 @@ public:
                       const rgw_user& _owner,
                       const rgw_bucket& _bucket,
                       const rgw_obj_key& _key,
-                      const string& _event_name,
+                      rgw::notify::EventType _event_type,
                       TopicsRef *_topics) : RGWCoroutine(_sync_env->cct),
                                                           sync_env(_sync_env),
                                                           env(_env),
                                                           owner(_owner),
                                                           bucket(_bucket),
                                                           key(_key),
-                                                          event_name(_event_name),
+                                                          event_type(_event_type),
                                                           ups(_sync_env->store, owner),
                                                           topics(_topics) {
     *topics = std::make_shared<vector<PSTopicConfigRef> >();
@@ -1167,11 +1144,12 @@ public:
       for (auto& titer : bucket_topics.topics) {
         auto& topic_filter = titer.second;
         auto& info = topic_filter.topic;
+        // if event list is defined but event does not match any in the list, we skip to the next one
         if (!topic_filter.events.empty() &&
-            topic_filter.events.find(event_name) == topic_filter.events.end()) {
+            std::find(topic_filter.events.begin(), topic_filter.events.end(), event_type) == topic_filter.events.end()) {
           continue;
         }
-        shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
+        std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
         tc->name = info.name;
         tc->subs = user_topics.topics[info.name].subs;
         (*topics)->push_back(tc);
@@ -1365,11 +1343,11 @@ public:
         make_event_ref(sync_env->cct,
                        bucket_info.bucket, key,
                        mtime, &attrs,
-                       EVENT_NAME_OBJECT_CREATE, &event);
+                       rgw::notify::ObjectCreated, &event);
         make_s3_record_ref(sync_env->cct,
                        bucket_info.bucket, bucket_info.owner, key,
                        mtime, &attrs,
-                       EVENT_NAME_OBJECT_CREATE, &record);
+                       rgw::notify::ObjectCreated, &record);
       }
 
       yield call(new RGWPSHandleObjEventCR(sync_env, env, bucket_info.owner, event, record, topics));
@@ -1427,7 +1405,7 @@ public:
     reenter(this) {
       yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner,
                                              bucket_info.bucket, key,
-                                             EVENT_NAME_OBJECT_CREATE,
+                                             rgw::notify::ObjectCreated,
                                              &topics));
       if (retcode < 0) {
         ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
@@ -1455,7 +1433,7 @@ class RGWPSGenericObjEventCBCR : public RGWCoroutine {
   rgw_bucket bucket;
   rgw_obj_key key;
   ceph::real_time mtime;
-  string event_name;
+  rgw::notify::EventType event_type;
   EventRef<rgw_pubsub_event> event;
   EventRef<rgw_pubsub_s3_record> record;
   TopicsRef topics;
@@ -1463,18 +1441,18 @@ public:
   RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env,
                            PSEnvRef _env,
                            RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
-                           RGWPubSubEventType _event_type) : RGWCoroutine(_sync_env->cct),
+                           rgw::notify::EventType _event_type) : RGWCoroutine(_sync_env->cct),
                                                              sync_env(_sync_env),
                                                              env(_env),
                                                              owner(_bucket_info.owner),
                                                              bucket(_bucket_info.bucket),
                                                              key(_key),
-                                                             mtime(_mtime), event_name(get_event_name(_event_type)) {}
+                                                             mtime(_mtime), event_type(_event_type) {}
   int operate() override {
     reenter(this) {
       ldout(sync_env->cct, 20) << ": remove remote obj: z=" << sync_env->source_zone
                                << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
-      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_name, &topics));
+      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_type, &topics));
       if (retcode < 0) {
         ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
         return set_cr_error(retcode);
@@ -1489,11 +1467,11 @@ public:
       make_event_ref(sync_env->cct,
                      bucket, key,
                      mtime, nullptr,
-                     event_name, &event);
+                     event_type, &event);
       make_s3_record_ref(sync_env->cct,
                      bucket, owner, key,
                      mtime, nullptr,
-                     event_name, &record);
+                     event_type, &record);
       yield call(new RGWPSHandleObjEventCR(sync_env, env, owner, event, record, topics));
       if (retcode < 0) {
         return set_cr_error(retcode);
@@ -1537,14 +1515,14 @@ public:
       rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
     ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << 
           " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
-    return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, OBJECT_DELETE);
+    return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDelete);
   }
 
   RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, 
       rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
     ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << 
           " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
-    return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, DELETE_MARKER_CREATE);
+    return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
   }
 
   PSConfigRef& get_conf() { return conf; }
@@ -1563,7 +1541,7 @@ RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFor
   }
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
   if (!rgw::amqp::init(cct)) {
-    ldout(cct, 1) << "ERROR: failed to initialize AMQP server in pubsub sync module" << dendl;
+    ldout(cct, 1) << "ERROR: failed to initialize AMQP manager in pubsub sync module" << dendl;
   }
 #endif
 }
index e968360e3a92a3fac5ff7a7bbeb46b51d7ae4080..b1f250313814d0fc448f73d26ee9b2159976cf27 100644 (file)
@@ -12,6 +12,8 @@
 #include "rgw_rest.h"
 #include "rgw_rest_s3.h"
 #include "rgw_arn.h"
+#include "rgw_zone.h"
+#include "services/svc_zone.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
@@ -333,7 +335,7 @@ int notif_bucket_path(const string& path, std::string& bucket_name) {
 class RGWPSCreateNotif_ObjStore : public RGWPSCreateNotifOp {
 private:
   std::string topic_name;
-  std::set<std::string, ltstr_nocase> events;
+  rgw::notify::EventTypeList events;
 
   int get_params() override {
     bool exists;
@@ -343,9 +345,17 @@ private:
       return -EINVAL;
     }
 
-    string events_str = s->info.args.get("events", &exists);
+    std::string events_str = s->info.args.get("events", &exists);
     if (exists) {
-      get_str_set(events_str, ",", events);
+      rgw::notify::from_string_list(events_str, events);
+      if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
+        ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl;
+        return -EINVAL;
+      }
+    } else {
+      // if no events are provided, we assume all events
+      events.push_back(rgw::notify::ObjectCreated);
+      events.push_back(rgw::notify::ObjectRemoved);
     }
     return notif_bucket_path(s->object.name, bucket_name);
   }
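
With the change above, omitting the 'events' argument defaults the notification
to both supported categories, while an unrecognized name fails the request with
-EINVAL. A Python sketch of the same parsing rule (the name mapping shown is
illustrative, not the full set handled by rgw::notify::from_string_list):

    KNOWN_EVENTS = {
        'OBJECT_CREATE': 'ObjectCreated',
        'OBJECT_DELETE': 'ObjectRemoved',
    }

    def parse_event_list(events_str):
        if not events_str:
            # no events provided: assume all supported categories
            return ['ObjectCreated', 'ObjectRemoved']
        events = []
        for name in events_str.split(','):
            if name not in KNOWN_EVENTS:
                # mirrors the -EINVAL path taken on rgw::notify::UnknownEvent
                raise ValueError('invalid event type in list: ' + events_str)
            events.append(KNOWN_EVENTS[name])
        return events
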
index b16095de1c7ef92b504eaf399ef616bbe2f25615..e37151e48fc7264cb1c02bd5d2793e64c2b46a29 100644 (file)
@@ -39,6 +39,19 @@ void set_valid_user(const std::string& user, const std::string& password) {
   VALID_PASSWORD = password;
 }
 
+std::atomic<unsigned> g_tag_skip = 0;
+std::atomic<int> g_multiple = 0;
+
+void set_multiple(unsigned tag_skip) {
+    g_multiple = 1;
+    g_tag_skip = tag_skip;
+}
+
+void reset_multiple() {
+    g_multiple = 0;
+    g_tag_skip = 0;
+}
+
 bool FAIL_NEXT_WRITE(false);
 bool FAIL_NEXT_READ(false);
 bool REPLY_ACK(true);
@@ -250,28 +263,49 @@ int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, amqp_frame_t *
     // "wait" for queue
     usleep(tv->tv_sec*1000000+tv->tv_usec);
     // read from queue
-    if (REPLY_ACK) {
-      if (state->ack_list.pop(state->ack)) {
-        decoded_frame->frame_type = AMQP_FRAME_METHOD;
+    if (g_multiple) {
+      // pop multiples and reply once at the end
+      for (auto i = 0U; i < g_tag_skip; ++i) {
+        if (REPLY_ACK && !state->ack_list.pop(state->ack)) {
+          // queue is empty
+          return AMQP_STATUS_TIMEOUT;
+        } else if (!REPLY_ACK && !state->nack_list.pop(state->nack)) {
+          // queue is empty
+          return AMQP_STATUS_TIMEOUT;
+        }
+      }
+      if (REPLY_ACK) {
+        state->ack.multiple = g_multiple;
         decoded_frame->payload.method.id = AMQP_BASIC_ACK_METHOD;
         decoded_frame->payload.method.decoded = &state->ack;
-        state->reply.reply_type = AMQP_RESPONSE_NORMAL;
-        return AMQP_STATUS_OK;
       } else {
-        // queue is empty
-        return AMQP_STATUS_TIMEOUT;
-      }
-    } else {
-      if (state->nack_list.pop(state->nack)) {
-        decoded_frame->frame_type = AMQP_FRAME_METHOD;
+        state->nack.multiple = g_multiple;
         decoded_frame->payload.method.id = AMQP_BASIC_NACK_METHOD;
         decoded_frame->payload.method.decoded = &state->nack;
-        state->reply.reply_type = AMQP_RESPONSE_NORMAL;
-        return AMQP_STATUS_OK;
-      } else {
-        // queue is empty
-        return AMQP_STATUS_TIMEOUT;
       }
+      decoded_frame->frame_type = AMQP_FRAME_METHOD;
+      state->reply.reply_type = AMQP_RESPONSE_NORMAL;
+      reset_multiple();
+      return AMQP_STATUS_OK;
+    }
+    // pop replies one by one
+    if (REPLY_ACK && state->ack_list.pop(state->ack)) {
+      state->ack.multiple = g_multiple;
+      decoded_frame->frame_type = AMQP_FRAME_METHOD;
+      decoded_frame->payload.method.id = AMQP_BASIC_ACK_METHOD;
+      decoded_frame->payload.method.decoded = &state->ack;
+      state->reply.reply_type = AMQP_RESPONSE_NORMAL;
+      return AMQP_STATUS_OK;
+    } else if (!REPLY_ACK && state->nack_list.pop(state->nack)) {
+      state->nack.multiple = g_multiple;
+      decoded_frame->frame_type = AMQP_FRAME_METHOD;
+      decoded_frame->payload.method.id = AMQP_BASIC_NACK_METHOD;
+      decoded_frame->payload.method.decoded = &state->nack;
+      state->reply.reply_type = AMQP_RESPONSE_NORMAL;
+      return AMQP_STATUS_OK;
+    } else {
+      // queue is empty
+      return AMQP_STATUS_TIMEOUT;
     }
   }
   return AMQP_STATUS_CONNECTION_CLOSED;
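
The mock now exercises AMQP's 'multiple' flag: a single ack/nack frame confirms
every outstanding delivery tag up to and including the tag it carries. A
standalone Python sketch of that semantics:

    def apply_confirm(pending_tags, reply_tag, multiple):
        # return the set of delivery tags confirmed by one broker reply;
        # with multiple=True one reply covers all tags <= reply_tag
        if multiple:
            return set(t for t in pending_tags if t <= reply_tag)
        return set([reply_tag]) if reply_tag in pending_tags else set()

    # one ack with multiple=True confirms tags 1..3 at once
    assert apply_confirm({1, 2, 3, 4}, 3, True) == {1, 2, 3}
    assert apply_confirm({1, 2, 3, 4}, 3, False) == {3}
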
index ac36e49000ae57067c9743eaad262458bada8c53..94fdfdddcc1c85e8f43f1692fbe586ccb07d3175 100644 (file)
@@ -9,6 +9,8 @@ void set_valid_port(int port);
 void set_valid_host(const std::string& host);
 void set_valid_vhost(const std::string& vhost);
 void set_valid_user(const std::string& user, const std::string& password);
+void set_multiple(unsigned tag_skip);
+void reset_multiple();
   
 extern bool FAIL_NEXT_WRITE;        // default "false"
 extern bool FAIL_NEXT_READ;         // default "false"
index 5762ff741fd78f962f5629fc70dcde90b1594b7d..5918efb6f7f14052d54f1adc9a6468108b324c1f 100644 (file)
@@ -8,6 +8,7 @@ import threading
 import subprocess
 import socket
 import time
+import os
 from .tests import get_realm, \
     ZonegroupConns, \
     zonegroup_meta_checkpoint, \
@@ -27,12 +28,19 @@ from nose.tools import assert_not_equal, assert_equal
 # configure logging for the tests module
 log = logging.getLogger(__name__)
 
-skip_push_tests = True
+skip_push_tests = False
 
 ####################################
 # utility functions for pubsub tests
 ####################################
 
+def set_contents_from_string(key, content):
+    try:
+        key.set_contents_from_string(content)
+    except Exception as e:
+        print 'Error: ' + str(e) 
+
+
 # HTTP endpoint functions
 # multithreaded streaming server, based on: https://stackoverflow.com/questions/46210672/
 
@@ -92,17 +100,19 @@ class HTTPServerThread(threading.Thread):
     def get_events(self):
         return self.httpd.events
 
+    def reset_events(self):
+        self.httpd.events = []
+
 
 class StreamingHTTPServer:
     """multi-threaded http server class also holding list of events received into the handler
     each thread has its own server, and all servers share the same socket"""
-    def __init__(self, host, port, num_workers=20):
+    def __init__(self, host, port, num_workers=100):
         addr = (host, port)
         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         self.sock.bind(addr)
-        # maximum of 10 connection backlog on the listener
-        self.sock.listen(20)
+        self.sock.listen(num_workers)
         self.workers = [HTTPServerThread(i, self.sock, addr) for i in range(num_workers)]
 
     def verify_s3_events(self, keys, exact_match=False, deletions=False):
@@ -110,6 +120,7 @@ class StreamingHTTPServer:
         events = []
         for worker in self.workers:
             events += worker.get_events()
+            worker.reset_events()
         verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
 
     def verify_events(self, keys, exact_match=False, deletions=False):
@@ -117,6 +128,7 @@ class StreamingHTTPServer:
         events = []
         for worker in self.workers:
             events += worker.get_events()
+            worker.reset_events()
         verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
 
     def close(self):
@@ -262,10 +274,10 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa
         for record in records:
             if record['s3']['bucket']['name'] == key.bucket.name and \
                 record['s3']['object']['key'] == key.name:
-                if deletions and record['eventName'] == 'ObjectRemoved':
+                if deletions and 'ObjectRemoved' in record['eventName']:
                     key_found = True
                     break
-                elif not deletions and record['eventName'] == 'ObjectCreated':
+                elif not deletions and 'ObjectCreated' in record['eventName']:
                     key_found = True
                     break
         if not key_found:
@@ -329,9 +341,10 @@ def clean_rabbitmq(proc): #, data_dir, log_dir)
     #    log.info('rabbitmq directories already removed')
 
 
-def init_env():
+def init_env(require_ps=True):
     """initialize the environment"""
-    check_ps_configured()
+    if require_ps:
+        check_ps_configured()
 
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
@@ -349,7 +362,8 @@ def init_env():
             zones.append(conn)
 
     assert_not_equal(len(zones), 0)
-    assert_not_equal(len(ps_zones), 0)
+    if require_ps:
+        assert_not_equal(len(ps_zones), 0)
     return zones, ps_zones
 
 
@@ -592,45 +606,445 @@ def test_ps_s3_notification():
     # delete the bucket
     zones[0].delete_bucket(bucket_name)
 
+def test_ps_s3_topic_on_master():
+    """ test s3 notification set/get/delete on master """
+    zones, _  = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    bucket_name = gen_bucket_name()
+    topic_name = bucket_name + TOPIC_SUFFIX
+   
+    # create s3 topics
+    endpoint_address = 'amqp://127.0.0.1:7001'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+    topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf1.set_config()
+    assert_equal(topic_arn,
+                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_1')
+
+    endpoint_address = 'http://127.0.0.1:9001'
+    endpoint_args = 'push-endpoint='+endpoint_address
+    topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf2.set_config()
+    assert_equal(topic_arn,
+                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_2')
+    endpoint_address = 'http://127.0.0.1:9002'
+    endpoint_args = 'push-endpoint='+endpoint_address
+    topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf3.set_config()
+    assert_equal(topic_arn,
+                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3')
+
+    try:
+        # get topic 3
+        result, status = topic_conf3.get_config()
+        assert_equal(status, 200)
+        assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
+        assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
+        # Note that endpoint args may be ordered differently in the result
+
+        # delete topic 1
+        result = topic_conf1.del_config()
+        # TODO: should be 200OK
+        # assert_equal(status, 200)
+
+        # try to get a deleted topic (1)
+        _, status = topic_conf1.get_config()
+        assert_equal(status, 404)
+
+        # get the remaining 2 topics
+        result = topic_conf1.get_list()
+        assert_equal(len(result['Topics']), 2)
+        
+        # delete topics
+        result = topic_conf2.del_config()
+        # TODO: should be 200OK
+        # assert_equal(status, 200)
+        result = topic_conf3.del_config()
+        # TODO: should be 200OK
+        # assert_equal(status, 200)
+
+        # get topic list, make sure it is empty
+        result = topic_conf1.get_list()
+        assert_equal(len(result['Topics']), 0)
+    except AssertionError as e:
+        # topics are stored at user level, so cleanup is needed
+        # to prevent failures in subsequent runs
+        topic_conf1.del_config()
+        topic_conf2.del_config()
+        topic_conf3.del_config()
+        raise e
+
 
 def test_ps_s3_notification_on_master():
     """ test s3 notification set/get/delete on master """
-    zones, _  = init_env()
+    zones, _  = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     bucket_name = gen_bucket_name()
-    # create bucket on the first of the rados zones
-    zones[0].create_bucket(bucket_name)
+    # create bucket
+    bucket = zones[0].create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
     # create s3 topic
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name)
+    endpoint_address = 'amqp://127.0.0.1:7001'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name,
+    topic_conf_list = [{'Id': notification_name+'_1',
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
+                       },
+                       {'Id': notification_name+'_2',
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectRemoved:*']
+                       },
+                       {'Id': notification_name+'_3',
+                        'TopicArn': topic_arn,
+                        'Events': []
                        }]
     s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
+    _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
     # get notifications on a bucket
+    response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
+    assert_equal(status/100, 2)
+    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
+
+    # delete specific notifications
+    _, status = s3_notification_conf.del_config(notification=notification_name+'_1')
+    assert_equal(status/100, 2)
+
+    # get the remaining 2 notifications on a bucket
     response, status = s3_notification_conf.get_config()
     assert_equal(status/100, 2)
-    assert_equal(len(response['TopicConfigurations']), 1)
+    assert_equal(len(response['TopicConfigurations']), 2)
     assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
+    assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)
 
-    # delete specific notifications
-    _, status = s3_notification_conf.del_config(notification=notification_name)
+    # delete remaining notifications
+    _, status = s3_notification_conf.del_config()
     assert_equal(status/100, 2)
 
+    # make sure that the notifications are now deleted
+    _, status = s3_notification_conf.get_config()
+
     # cleanup
     topic_conf.del_config()
     # delete the bucket
     zones[0].delete_bucket(bucket_name)
 
 
+def test_ps_s3_notification_errors_on_master():
+    """ test s3 notification set/get/delete on master """
+    zones, _  = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    bucket_name = gen_bucket_name()
+    # create bucket
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+    # create s3 topic
+    endpoint_address = 'amqp://127.0.0.1:7001'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+
+    # create s3 notification with invalid event name
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:Kaboom']
+                       }]
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    try:
+        result, status = s3_notification_conf.set_config()
+    except Exception as error:
+        print str(error) + ' - is expected'
+    else:
+        assert False, 'invalid event name is expected to fail'
+
+    # create s3 notification with missing name
+    topic_conf_list = [{'Id': '',
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:Put']
+                       }]
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    try:
+        _, _ = s3_notification_conf.set_config()
+    except Exception as error:
+        print str(error) + ' - is expected'
+    else:
+        assert False, 'missing notification name is expected to fail'
+
+    # create s3 notification with invalid topic ARN
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': 'kaboom',
+                        'Events': ['s3:ObjectCreated:Put']
+                       }]
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    try:
+        _, _ = s3_notification_conf.set_config()
+    except Exception as error:
+        print str(error) + ' - is expected'
+    else:
+        assert False, 'invalid ARN is expected to fail'
+
+    # create s3 notification with wrong bucket
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:Put']
+                       }]
+    s3_notification_conf = PSNotificationS3(zones[0].conn, 'kaboom', topic_conf_list)
+    try:
+        _, _ = s3_notification_conf.set_config()
+    except Exception as error:
+        print str(error) + ' - is expected'
+    else:
+        assert False, 'unknown bucket is expected to fail'
+
+    # cleanup
+    topic_conf.del_config()
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+
+
+def test_object_timing():
+    return SkipTest("only used in manual testing")
+    zones, _  = init_env(require_ps=False)
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    # create objects in the bucket (async)
+    print 'creating objects...'
+    number_of_objects = 1000
+    client_threads = []
+    start_time = time.time()
+    content = str(bytearray(os.urandom(1024*1024)))
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+
+    time_diff = time.time() - start_time
+    print 'average time for object creation: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+    
+    print 'total number of objects: ' + str(len(list(bucket.list())))
+
+    print 'deleting objects...'
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+    
+    time_diff = time.time() - start_time
+    print 'average time for object deletion: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+    
+    # cleanup
+    zones[0].delete_bucket(bucket_name)
+
+
+def test_ps_s3_notification_push_amqp_on_master():
+    """ test pushing amqp s3 notification on master """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    proc = init_rabbitmq()
+    if proc is None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
+    zones, _  = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
+    topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'
+
+    # start amqp receivers
+    exchange = 'ex1'
+    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
+    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2)
+    task1.start()
+    task2.start()
+
+    # create two s3 topics
+    endpoint_address = 'amqp://' + hostname
+    # with acks from broker
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+    topic_conf1 = PSTopicS3(zones[0].conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn1 = topic_conf1.set_config()
+    # without acks from broker
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=none'
+    topic_conf2 = PSTopicS3(zones[0].conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn2 = topic_conf2.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
+                         'Events': []
+                       },
+                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
+                         'Events': ['s3:ObjectCreated:*']
+                       }]
+
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket (async)
+    number_of_objects = 100
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = 'bar'
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+
+    time_diff = time.time() - start_time
+    print 'average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+    print 'wait for 5sec for the messages...'
+    time.sleep(5)
+
+    # check amqp receiver
+    keys = list(bucket.list())
+    print 'total number of objects: ' + str(len(keys))
+    receiver1.verify_s3_events(keys, exact_match=True)
+    receiver2.verify_s3_events(keys, exact_match=True)
+    
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+    
+    time_diff = time.time() - start_time
+    print 'average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+    print 'wait for 5sec for the messages...'
+    time.sleep(5)
+    
+    # check amqp receiver 1 for deletions
+    receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
+    # check amqp receiver 2 has no deletions
+    try:
+        receiver2.verify_s3_events(keys, exact_match=False, deletions=True)
+    except:
+        pass
+    else:
+        err = 'amqp receiver 2 should have no deletions'
+        assert False, err
+
+
+    # cleanup
+    stop_amqp_receiver(receiver1, task1)
+    stop_amqp_receiver(receiver2, task2)
+    s3_notification_conf.del_config()
+    topic_conf1.del_config()
+    topic_conf2.del_config()
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
+
+def test_ps_s3_notification_push_http_on_master():
+    """ test pushing http s3 notification on master """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    zones, _  = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    number_of_objects = 10
+    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    endpoint_args = 'push-endpoint='+endpoint_address
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': []
+                       }]
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    client_threads = []
+    start_time = time.time()
+    content = 'bar'
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+
+    time_diff = time.time() - start_time
+    print 'average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+    print 'wait for 5sec for the messages...'
+    time.sleep(5)
+    
+    # check http receiver
+    keys = list(bucket.list())
+    print 'total number of objects: ' + str(len(keys))
+    http_server.verify_s3_events(keys, exact_match=True)
+    
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+    
+    time_diff = time.time() - start_time
+    print 'average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+    print 'wait for 5sec for the messages...'
+    time.sleep(5)
+    
+    # check http receiver
+    http_server.verify_s3_events(keys, exact_match=True, deletions=True)
+    
+    # cleanup
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    http_server.close()
+
+
 def test_ps_topic():
     """ test set/get/delete of topic """
     _, ps_zones = init_env()
@@ -661,33 +1075,6 @@ def test_ps_topic():
     assert_equal(parsed_result['Code'], 'NoSuchKey')
 
 
-def test_ps_s3_topic():
-    """ test set/get/delete of s3 topic """
-    zones, _ = init_env()
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    bucket_name = gen_bucket_name()
-    topic_name = bucket_name+TOPIC_SUFFIX
-
-    # create topic
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name)
-    topic_arn = topic_conf.set_config()
-    assert_equal(topic_arn,
-                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
-
-    # list topics
-    result = topic_conf.get_list()
-    assert len(result['Topics']) > 0
-
-    # get topic
-    result, status  = topic_conf.get_config()
-    assert_equal(status/100, 2)
-    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
-
-    # delete topic
-    topic_conf.del_config()
-
-
 def test_ps_topic_with_endpoint():
     """ test set topic with endpoint"""
     _, ps_zones = init_env()
@@ -712,33 +1099,6 @@ def test_ps_topic_with_endpoint():
     topic_conf.del_config()
 
 
-def test_ps_s3_topic_with_endpoint():
-    """ test set/get/delete of s3 topic with endpoint """
-    zones, _ = init_env()
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    bucket_name = gen_bucket_name()
-    topic_name = bucket_name+TOPIC_SUFFIX
-
-    # create topic
-    endpoint_address = 'amqp://127.0.0.1:7001'
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args)
-    topic_arn = topic_conf.set_config()
-    assert_equal(topic_arn,
-                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
-
-    # get topic
-    result, status  = topic_conf.get_config()
-    assert_equal(status/100, 2)
-    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
-    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
-    # Note that endpoint args may be ordered differently in the result
-
-    # delete topic
-    topic_conf.del_config()
-
-
 def test_ps_notification():
     """ test set/get/delete of notification """
     zones, ps_zones = init_env()
@@ -1206,6 +1566,76 @@ def test_ps_creation_triggers():
     zones[0].delete_bucket(bucket_name)
 
 
+def test_ps_s3_creation_triggers_on_master():
+    """ test object creation s3 notifications in using put/copy/post on master"""
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    proc = init_rabbitmq()
+    if proc is None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
+    zones, _ = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # start amqp receiver
+    exchange = 'ex1'
+    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    task.start()
+
+    # create s3 topic
+    endpoint_address = 'amqp://' + hostname
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy', 's3:ObjectCreated:Post']
+                       }]
+
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket using PUT
+    key = bucket.new_key('put')
+    key.set_contents_from_string('bar')
+    # create objects in the bucket using COPY
+    bucket.copy_key('copy', bucket.name, key.name)
+    # create objects in the bucket using multi-part upload
+    # note: the file object must be kept open and rewound, since closing a
+    # TemporaryFile deletes it and would leave an empty part to upload
+    fp = tempfile.TemporaryFile(mode='w+')
+    fp.write('bar')
+    fp.flush()
+    fp.seek(0)
+    uploader = bucket.initiate_multipart_upload('multipart')
+    uploader.upload_part_from_file(fp, 1)
+    uploader.complete_upload()
+    fp.close()
+
+    print 'wait for 5sec for the messages...'
+    time.sleep(5)
+
+    # check amqp receiver
+    keys = list(bucket.list())
+    receiver.verify_s3_events(keys, exact_match=True)
+
+    # cleanup
+    stop_amqp_receiver(receiver, task)
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    for key in bucket.list():
+        key.delete()
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
+
 def test_ps_versioned_deletion():
     """ test notification of deletion markers """
     zones, ps_zones = init_env()
@@ -1221,8 +1651,10 @@ def test_ps_versioned_deletion():
     # wait for sync
     zone_meta_checkpoint(ps_zones[0].zone)
     # create notifications
+    # TODO use 'DELETE_MARKER_CREATE'
+    event_type = 'OBJECT_DELETE'
     notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
-                                       topic_name, "OBJECT_DELETE")
+                                       topic_name, event_type)
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription
@@ -1249,7 +1681,7 @@ def test_ps_versioned_deletion():
     parsed_result = json.loads(result)
     for event in parsed_result['events']:
         log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
-        assert_equal(str(event['event']), 'OBJECT_DELETE')
+        assert_equal(str(event['event']), event_type)
 
     # TODO: verify we have exactly 2 events
     assert len(parsed_result['events']) >= 2
@@ -1270,6 +1702,136 @@ def test_ps_versioned_deletion():
     topic_conf.del_config()
 
 
+def test_ps_s3_metadata_on_master():
+    """ test s3 notification of metadata on master """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    proc = init_rabbitmq()
+    if proc is None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
+    zones, _ = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # start amqp receiver
+    exchange = 'ex1'
+    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    task.start()
+
+    # create s3 topic
+    endpoint_address = 'amqp://' + hostname
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*']
+                       }]
+
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    key = bucket.new_key('foo')
+    key.set_metadata('meta1', 'This is my metadata value')
+    key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
+    keys = list(bucket.list())
+    print 'wait for 5sec for the messages...'
+    time.sleep(5)
+    # check amqp receiver
+    receiver.verify_s3_events(keys, exact_match=True)
+
+    # cleanup
+    stop_amqp_receiver(receiver, task)
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    for key in bucket.list():
+        key.delete()
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
+
+def test_ps_s3_versioned_deletion_on_master():
+    """ test s3 notification of deletion markers on master """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    proc = init_rabbitmq()
+    if proc is None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
+    zones, _ = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    bucket.configure_versioning(True)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # start amqp receiver
+    exchange = 'ex1'
+    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    task.start()
+
+    # create s3 topic
+    endpoint_address = 'amqp://' + hostname
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    # TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectRemoved:Delete', 's3:ObjectCreated:Put']
+                       }]
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    key = bucket.new_key('foo')
+    key.set_contents_from_string('bar')
+    v1 = key.version_id
+    key.set_contents_from_string('kaboom')
+    v2 = key.version_id
+    keys = list(bucket.list())
+    
+    print 'wait for 5sec for the messages...'
+    time.sleep(5)
+    
+    # check amqp receiver
+    # Note: should not do exact match in case of versioned objects
+    receiver.verify_s3_events(keys, exact_match=False)
+    # delete the specific object versions (explicit version deletes, so no delete markers are created)
+    bucket.delete_key(key.name, version_id=v2)
+    bucket.delete_key(key.name, version_id=v1)
+
+    print 'wait for 5sec for the messages...'
+    time.sleep(5)
+
+    # check amqp receiver
+    # Note: should not do exact match in case of versioned objects
+    receiver.verify_s3_events(keys, exact_match=False, deletions=True)
+
+    # cleanup
+    stop_amqp_receiver(receiver, task)
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
+
 def test_ps_push_http():
     """ test pushing to http endpoint """
     if skip_push_tests:
@@ -1426,7 +1988,7 @@ def test_ps_push_amqp():
     # create subscription
     sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
                               topic_name, endpoint='amqp://'+hostname,
-                              endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=none')
+                              endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=broker')
     _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
     # create objects in the bucket
@@ -1946,7 +2508,7 @@ def test_ps_s3_multiple_topics_notification():
     keys = list(bucket.list())
     # TODO: use exact match
     verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
-    http_server.verify_s3_events(keys, exact_match=True)
+    http_server.verify_s3_events(keys, exact_match=False)
 
     # cleanup
     stop_amqp_receiver(receiver, amqp_task)
index 71737aaea36ec5efc8f83924d0f85a5d411d948d..d2aa5d346cebb2cf3ef90d8e17817ea7d9763a8b 100644 (file)
@@ -189,7 +189,8 @@ class PSTopicS3:
 
     def del_config(self):
         """delete topic"""
-        self.client.delete_topic(TopicArn=self.topic_arn)
+        result = self.client.delete_topic(TopicArn=self.topic_arn)
+        return result['ResponseMetadata']['HTTPStatusCode']
     
     def get_list(self):
         """list all topics"""
index 18a2f428aa58af390db5d2ba4ef78fca396149f5..6acba7d7323b4e0f4c820ee0ccc2206021758bb0 100644 (file)
@@ -14,9 +14,18 @@ Since we use the same entry point file for all tests, running specific tests is
 ```
 $ nosetests test_multi.py:<specific_test_name>
 ```
+To run multiple tests matched by a wildcard string, use the following format:
+```
+$ nosetests test_multi.py -m "<wildcard string>"
+```
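+For example, to run only the pubsub push tests against the master zone (assuming their names follow the `test_ps_s3_*_on_master` pattern used here), the wildcard string would be a regular expression such as:
+```
+$ nosetests test_multi.py -m 'test_ps_s3_.*_on_master'
+```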
 Note that the test to run does not have to be inside the `test_multi.py` file.
 Note that different options for running specific and multiple tests exist in the [nose documentation](https://nose.readthedocs.io/en/latest/usage.html#options), as well as other options to control the execution of the tests.
 ## Configuration
+### Environment Variables
+The following RGW environment variables are taken into consideration when running the tests:
+ - `RGW_FRONTEND`: used to change the frontend to 'civetweb' or 'beast' (the default)
+ - `RGW_VALGRIND`: used to run radosgw under valgrind, e.g. `RGW_VALGRIND=yes`
+Other environment variables used by vstart.sh to configure components other than RGW (e.g. `MON`, `OSD`, `MGR`, `MDS`) may be used as well, as shown in the example below.
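+A hypothetical invocation combining these variables (the non-RGW values are only illustrative) could look like:
+```
+$ RGW_FRONTEND=civetweb RGW_VALGRIND=yes MON=1 OSD=1 nosetests test_multi.py:test_ps_topic
+```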
 The configuration file for the run has 3 sections:
 ### Default
 This section holds the following parameters:
@@ -42,6 +51,8 @@ This section holds the following parameters:
 *TODO*
 ### Cloud
 *TODO*
+### PubSub
+*TODO*
 ## Writing Tests
 New tests should be added into the `/path/to/ceph/src/test/rgw/rgw_multi` subdirectory.
 - Base classes are in: `/path/to/ceph/src/test/rgw/rgw_multi/multisite.py`
index 89086d3ec8270e5937042221e9beb39f25f6d1b0..e7147ed1fc83eee557ac97df68de883c851edfa1 100644 (file)
@@ -185,6 +185,8 @@ TEST_F(TestAMQP, MaxConnections)
 
 std::atomic<bool> callback_invoked = false;
 
+std::atomic<int> callbacks_invoked = 0;
+
 // note: because these callbacks are shared among different "publish" calls
 // they should be used on different connections
 
@@ -198,6 +200,25 @@ void my_callback_expect_nack(int rc) {
   callback_invoked = true;
 }
 
+void my_callback_expect_multiple_acks(int rc) {
+  EXPECT_EQ(0, rc);
+  ++callbacks_invoked;
+}
+
+// a callback object that manages its own lifetime:
+// it can only be created on the heap, and deletes itself after being invoked once
+class dynamic_callback_wrapper {
+    dynamic_callback_wrapper() = default;
+public:
+    static dynamic_callback_wrapper* create() {
+        return new dynamic_callback_wrapper;
+    }
+    void callback(int rc) {
+      EXPECT_EQ(0, rc);
+      ++callbacks_invoked;
+      delete this;
+    }
+};
+
 TEST_F(TestAMQP, ReceiveAck)
 {
   callback_invoked = false;
@@ -213,6 +234,63 @@ TEST_F(TestAMQP, ReceiveAck)
   amqp_mock::set_valid_host("localhost");
 }
 
+TEST_F(TestAMQP, ReceiveMultipleAck)
+{
+  callbacks_invoked = 0;
+  const std::string host("localhost1");
+  amqp_mock::set_valid_host(host);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+  EXPECT_TRUE(conn);
+  const auto NUMBER_OF_CALLS = 100;
+  for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
+    auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
+    EXPECT_EQ(rc, 0);
+  }
+  std::this_thread::sleep_for(wait_time);
+  EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
+  callbacks_invoked = 0;
+  amqp_mock::set_valid_host("localhost");
+}
+
+TEST_F(TestAMQP, ReceiveAckForMultiple)
+{
+  callbacks_invoked = 0;
+  const std::string host("localhost1");
+  amqp_mock::set_valid_host(host);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+  EXPECT_TRUE(conn);
+  amqp_mock::set_multiple(59);
+  const auto NUMBER_OF_CALLS = 100;
+  for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
+    auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
+    EXPECT_EQ(rc, 0);
+  }
+  std::this_thread::sleep_for(wait_time);
+  EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
+  callbacks_invoked = 0;
+  amqp_mock::set_valid_host("localhost");
+}
+
+TEST_F(TestAMQP, DynamicCallback)
+{
+  callbacks_invoked = 0;
+  const std::string host("localhost1");
+  amqp_mock::set_valid_host(host);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+  EXPECT_TRUE(conn);
+  amqp_mock::set_multiple(59);
+  const auto NUMBER_OF_CALLS = 100;
+  for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
+    auto rc = publish_with_confirm(conn, "topic", "message",
+            std::bind(&dynamic_callback_wrapper::callback, dynamic_callback_wrapper::create(), std::placeholders::_1));
+    EXPECT_EQ(rc, 0);
+  }
+  std::this_thread::sleep_for(wait_time);
+  EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
+  callbacks_invoked = 0;
+  amqp_mock::set_valid_host("localhost");
+}
+
 TEST_F(TestAMQP, ReceiveNack)
 {
   callback_invoked = false;
@@ -337,3 +415,57 @@ TEST_F(TestAMQP, RetryFailWrite)
   amqp_mock::set_valid_host("localhost");
 }
 
+int fail_after = -1;
+int recover_after = -1;
+bool expect_zero_rc = true;
+
+void my_callback_triggering_failure(int rc) {
+  if (expect_zero_rc) {
+      EXPECT_EQ(rc, 0);
+  } else {
+      EXPECT_NE(rc, 0);
+  }
+  ++callbacks_invoked;
+  if (fail_after == callbacks_invoked) {
+    amqp_mock::FAIL_NEXT_READ = true;
+    expect_zero_rc = false;
+  }
+  if (recover_after == callbacks_invoked) {
+    amqp_mock::FAIL_NEXT_READ = false;
+  }
+}
+
+TEST_F(TestAMQP, AcksWithReconnect)
+{
+  callbacks_invoked = 0;
+  const std::string host("localhost1");
+  amqp_mock::set_valid_host(host);
+  amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+  EXPECT_TRUE(conn);
+  amqp_mock::set_multiple(59);
+  // failure will take effect after max(59, 70) callbacks
+  fail_after = 70;
+  // all callbacks are flushed during the failure, so recovery will take effect after max(90, 100) callbacks
+  recover_after = 90;
+  const auto NUMBER_OF_CALLS = 100;
+  for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
+    auto rc = publish_with_confirm(conn, "topic", "message", my_callback_triggering_failure);
+    EXPECT_EQ(rc, 0);
+  }
+  // connection fails before multiple acks
+  std::this_thread::sleep_for(wait_time);
+  EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
+  // publish more messages
+  expect_zero_rc = true;
+  for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
+    auto rc = publish_with_confirm(conn, "topic", "message", my_callback_triggering_failure);
+    EXPECT_EQ(rc, 0);
+  }
+  std::this_thread::sleep_for(wait_time);
+  EXPECT_EQ(callbacks_invoked, 2*NUMBER_OF_CALLS);
+  callbacks_invoked = 0;
+  amqp_mock::set_valid_host("localhost");
+  fail_after = -1;
+}
+