#include "rgw_sync_module_pubsub_rest.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
+#include "rgw_cr_rest.h"
#include "rgw_cr_tools.h"
#include "rgw_op.h"
#include "rgw_pubsub.h"
using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
class PSSubscription {
+ class InitCR;
+ class StoreEventCR;
+ friend class InitCR;
+ friend class StoreEventCR;
+
RGWDataSyncEnv *sync_env;
PSEnvRef env;
PSSubConfigRef sub_conf;
RGWDataAccessRef data_access;
RGWDataAccess::BucketRef bucket;
+ struct push_endpoint_info {
+ shared_ptr<RGWRESTConn> conn;
+ string path;
+ } push;
+
class InitCR;
InitCR *init_cr{nullptr};
int retention_days;
rgw_bucket_lifecycle_config_params lc_config;
+
public:
InitBucketLifecycleCR(RGWDataSyncEnv *_sync_env,
PSConfigRef& _conf,
return 0;
}
};
+
class InitCR : public RGWSingletonCR<bool> {
RGWDataSyncEnv *sync_env;
PSSubscriptionRef sub;
PSConfigRef& conf;
PSSubConfigRef& sub_conf;
int i;
+
+ bool split_endpoint(const string& push_endpoint, string *addr, string *path) {
+ if (push_endpoint.size() < 9) { /* http://x/ */
+ return false;
+ }
+ size_t pos = push_endpoint.find(':');
+ if (pos == string::npos || pos >= push_endpoint.size() - 1) {
+ return false;
+ }
+
+ string protocol = push_endpoint.substr(0, pos);
+ string s = push_endpoint.substr(pos + 1);
+
+ if (s.size() < 4) { /* //x/ */
+ return false;
+ }
+
+ size_t slash_pos = s.find('/', 2);
+ if (slash_pos == string::npos) {
+ return false;
+ }
+
+ pos += slash_pos;
+
+ *addr = push_endpoint.substr(0, pos + 1);
+ *path = push_endpoint.substr(pos + 1);
+
+ return true;
+ }
+
public:
InitCR(RGWDataSyncEnv *_sync_env,
PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sync_env->cct),
ldout(sync_env->cct, 0) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
return set_cr_error(retcode);
}
+
+ if (!sub_conf->push_endpoint.empty()) {
+ string remote_id = string("pubsub:sub:") + sub->get_bucket_info_result->bucket_info.owner.to_str() + ":" + sub_conf->name;
+ string addr;
+ if (split_endpoint(sub_conf->push_endpoint, &addr, &sub->push.path)) {
+ list<string> endpoints{addr};
+ sub->push.conn = std::make_shared<RGWRESTConn>(sync_env->cct, sync_env->store, remote_id, endpoints);
+ } else {
+ ldout(sync_env->cct, 20) << "failed to split push endpoint: " << sub_conf->push_endpoint << dendl;
+ }
+ }
+
return set_cr_done();
}
}
};
+ using PushCR = RGWPostRESTResourceCR<rgw_pubsub_event, int>;
+
class StoreEventCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
PSSubscriptionRef sub;
+ EventRef event;
PSEvent pse;
PSConfigRef& conf;
PSSubConfigRef& sub_conf;
EventRef& _event) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
sub(_sub),
+ event(_event),
pse(_event),
conf(sub->env->conf),
sub_conf(sub->sub_conf) {
return set_cr_error(retcode);
}
+ if (sub->push.conn) {
+ yield {
+ rgw_http_param_pair params[] = {
+ { nullptr, nullptr }
+ };
+
+ call(new PushCR(sync_env->cct, sub->push.conn.get(),
+ sync_env->http_manager,
+ sub->push.path,
+ params, *event, nullptr));
+ }
+ }
+
return set_cr_done();
}
return 0;