From: Yehuda Sadeh Date: Fri, 3 Aug 2018 22:48:53 +0000 (-0700) Subject: rgw: pubsub: trivial push notifications X-Git-Tag: v14.1.0~616^2~18 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=31b352fcfb6be5353f873c1139d6554bcb06087a;p=ceph-ci.git rgw: pubsub: trivial push notifications Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 0e21b60f334..d58a8678c91 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -6,6 +6,7 @@ #include "rgw_sync_module_pubsub_rest.h" #include "rgw_rest_conn.h" #include "rgw_cr_rados.h" +#include "rgw_cr_rest.h" #include "rgw_cr_tools.h" #include "rgw_op.h" #include "rgw_pubsub.h" @@ -545,6 +546,11 @@ class PSSubscription; using PSSubscriptionRef = std::shared_ptr; class PSSubscription { + class InitCR; + class StoreEventCR; + friend class InitCR; + friend class StoreEventCR; + RGWDataSyncEnv *sync_env; PSEnvRef env; PSSubConfigRef sub_conf; @@ -553,6 +559,11 @@ class PSSubscription { RGWDataAccessRef data_access; RGWDataAccess::BucketRef bucket; + struct push_endpoint_info { + shared_ptr conn; + string path; + } push; + class InitCR; InitCR *init_cr{nullptr}; @@ -564,6 +575,7 @@ class PSSubscription { int retention_days; rgw_bucket_lifecycle_config_params lc_config; + public: InitBucketLifecycleCR(RGWDataSyncEnv *_sync_env, PSConfigRef& _conf, @@ -621,6 +633,7 @@ class PSSubscription { return 0; } }; + class InitCR : public RGWSingletonCR { RGWDataSyncEnv *sync_env; PSSubscriptionRef sub; @@ -629,6 +642,36 @@ class PSSubscription { PSConfigRef& conf; PSSubConfigRef& sub_conf; int i; + + bool split_endpoint(const string& push_endpoint, string *addr, string *path) { + if (push_endpoint.size() < 9) { /* http://x/ */ + return false; + } + size_t pos = push_endpoint.find(':'); + if (pos == string::npos || pos >= push_endpoint.size() - 1) { + return false; + } + + string protocol = push_endpoint.substr(0, pos); + string s = push_endpoint.substr(pos + 1); + + if (s.size() < 4) { /* //x/ */ + return false; + } + + size_t slash_pos = s.find('/', 2); + if (slash_pos == string::npos) { + return false; + } + + pos += slash_pos; + + *addr = push_endpoint.substr(0, pos + 1); + *path = push_endpoint.substr(pos + 1); + + return true; + } + public: InitCR(RGWDataSyncEnv *_sync_env, PSSubscriptionRef& _sub) : RGWSingletonCR(_sync_env->cct), @@ -671,6 +714,18 @@ class PSSubscription { ldout(sync_env->cct, 0) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl; return set_cr_error(retcode); } + + if (!sub_conf->push_endpoint.empty()) { + string remote_id = string("pubsub:sub:") + sub->get_bucket_info_result->bucket_info.owner.to_str() + ":" + sub_conf->name; + string addr; + if (split_endpoint(sub_conf->push_endpoint, &addr, &sub->push.path)) { + list endpoints{addr}; + sub->push.conn = std::make_shared(sync_env->cct, sync_env->store, remote_id, endpoints); + } else { + ldout(sync_env->cct, 20) << "failed to split push endpoint: " << sub_conf->push_endpoint << dendl; + } + } + return set_cr_done(); } @@ -698,9 +753,12 @@ class PSSubscription { } }; + using PushCR = RGWPostRESTResourceCR; + class StoreEventCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; PSSubscriptionRef sub; + EventRef event; PSEvent pse; PSConfigRef& conf; PSSubConfigRef& sub_conf; @@ -713,6 +771,7 @@ class PSSubscription { EventRef& _event) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sub(_sub), + event(_event), pse(_event), conf(sub->env->conf), sub_conf(sub->sub_conf) { @@ -743,6 +802,19 @@ class PSSubscription { return set_cr_error(retcode); } + if (sub->push.conn) { + yield { + rgw_http_param_pair params[] = { + { nullptr, nullptr } + }; + + call(new PushCR(sync_env->cct, sub->push.conn.get(), + sync_env->http_manager, + sub->push.path, + params, *event, nullptr)); + } + } + return set_cr_done(); } return 0;