]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: pubsub: trivial push notifications
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 3 Aug 2018 22:48:53 +0000 (15:48 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:43 +0000 (00:10 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync_module_pubsub.cc

index 0e21b60f3341e2cd2f07218b644f6de8b1a20fab..d58a8678c91249d8e7d61b714c892c5132189556 100644 (file)
@@ -6,6 +6,7 @@
 #include "rgw_sync_module_pubsub_rest.h"
 #include "rgw_rest_conn.h"
 #include "rgw_cr_rados.h"
+#include "rgw_cr_rest.h"
 #include "rgw_cr_tools.h"
 #include "rgw_op.h"
 #include "rgw_pubsub.h"
@@ -545,6 +546,11 @@ class PSSubscription;
 using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
 
 class PSSubscription {
+  class InitCR;
+  class StoreEventCR;
+  friend class InitCR;
+  friend class StoreEventCR;
+
   RGWDataSyncEnv *sync_env;
   PSEnvRef env;
   PSSubConfigRef sub_conf;
@@ -553,6 +559,11 @@ class PSSubscription {
   RGWDataAccessRef data_access;
   RGWDataAccess::BucketRef bucket;
 
+  struct push_endpoint_info {
+    shared_ptr<RGWRESTConn> conn;
+    string path;
+  } push;
+
   class InitCR;
   InitCR *init_cr{nullptr};
 
@@ -564,6 +575,7 @@ class PSSubscription {
     int retention_days;
 
     rgw_bucket_lifecycle_config_params lc_config;
+
   public:
     InitBucketLifecycleCR(RGWDataSyncEnv *_sync_env,
            PSConfigRef& _conf,
@@ -621,6 +633,7 @@ class PSSubscription {
       return 0;
     }
   };
+
   class InitCR : public RGWSingletonCR<bool> {
     RGWDataSyncEnv *sync_env;
     PSSubscriptionRef sub;
@@ -629,6 +642,36 @@ class PSSubscription {
     PSConfigRef& conf;
     PSSubConfigRef& sub_conf;
     int i;
+
+    bool split_endpoint(const string& push_endpoint, string *addr, string *path) {
+      if (push_endpoint.size() < 9) { /* http://x/ */
+        return false;
+      }
+      size_t pos = push_endpoint.find(':');
+      if (pos == string::npos || pos >= push_endpoint.size() - 1) {
+        return false;
+      }
+
+      string protocol = push_endpoint.substr(0, pos);
+      string s = push_endpoint.substr(pos + 1);
+
+      if (s.size() < 4) { /* //x/ */
+        return false;
+      }
+
+      size_t slash_pos = s.find('/', 2);
+      if (slash_pos == string::npos) {
+        return false;
+      }
+
+      pos += slash_pos;
+
+      *addr = push_endpoint.substr(0, pos + 1);
+      *path = push_endpoint.substr(pos + 1);
+
+      return true;
+    }
+
   public:
     InitCR(RGWDataSyncEnv *_sync_env,
            PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sync_env->cct),
@@ -671,6 +714,18 @@ class PSSubscription {
               ldout(sync_env->cct, 0) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
               return set_cr_error(retcode);
             }
+
+            if (!sub_conf->push_endpoint.empty()) {
+              string remote_id = string("pubsub:sub:") + sub->get_bucket_info_result->bucket_info.owner.to_str() + ":" + sub_conf->name;
+              string addr;
+              if (split_endpoint(sub_conf->push_endpoint, &addr, &sub->push.path)) {
+                list<string> endpoints{addr};
+                sub->push.conn = std::make_shared<RGWRESTConn>(sync_env->cct, sync_env->store, remote_id, endpoints);
+              } else {
+                ldout(sync_env->cct, 20) << "failed to split push endpoint: " << sub_conf->push_endpoint << dendl;
+              }
+            }
+
             return set_cr_done();
           }
 
@@ -698,9 +753,12 @@ class PSSubscription {
     }
   };
 
+  using PushCR = RGWPostRESTResourceCR<rgw_pubsub_event, int>;
+
   class StoreEventCR : public RGWCoroutine {
     RGWDataSyncEnv *sync_env;
     PSSubscriptionRef sub;
+    EventRef event;
     PSEvent pse;
     PSConfigRef& conf;
     PSSubConfigRef& sub_conf;
@@ -713,6 +771,7 @@ class PSSubscription {
                  EventRef& _event) : RGWCoroutine(_sync_env->cct),
                                      sync_env(_sync_env),
                                      sub(_sub),
+                                     event(_event),
                                      pse(_event),
                                      conf(sub->env->conf),
                                      sub_conf(sub->sub_conf) {
@@ -743,6 +802,19 @@ class PSSubscription {
           return set_cr_error(retcode);
         }
 
+        if (sub->push.conn) {
+          yield {
+            rgw_http_param_pair params[] = {
+              { nullptr, nullptr }
+            };
+
+            call(new PushCR(sync_env->cct, sub->push.conn.get(),
+                            sync_env->http_manager,
+                            sub->push.path,
+                            params, *event, nullptr));
+          }
+        }
+
         return set_cr_done();
       }
       return 0;