#include "rgw_http_client_curl.h"
#include "rgw_zone.h"
#include "rgw_pubsub.h"
+#include "rgw_sync_module_pubsub.h"
#include "services/svc_sync_modules.h"
return EINVAL;
}
- auto& tier_config = get_tier_config(store);
-
rgw_pubsub_sub_dest dest_config;
dest_config.bucket_name = sub_dest_bucket;
dest_config.oid_prefix = sub_oid_prefix;
dest_config.push_endpoint = sub_push_endpoint;
+ auto psmodule = static_cast<RGWPSSyncModuleInstance *>(store->get_sync_module().get());
+ auto conf = psmodule->get_effective_conf();
+
if (dest_config.bucket_name.empty()) {
- dest_config.bucket_name = string(tier_config["data_bucket_prefix"]) + user_info.user_id.to_str() + "-" + topic.topic.name;
+ dest_config.bucket_name = string(conf["data_bucket_prefix"]) + user_info.user_id.to_str() + "-" + topic.topic.name;
}
if (dest_config.oid_prefix.empty()) {
- dest_config.oid_prefix = tier_config["data_oid_prefix"];
+ dest_config.oid_prefix = conf["data_oid_prefix"];
}
auto sub = ups.get_sub(sub_name);
ret = sub->subscribe(topic_name, dest_config);
return ret;
}
-
int RGWRados::register_to_service_map(const string& daemon_type, const map<string, string>& meta)
{
map<string,string> metadata = meta;
{
int ret;
- if (run_sync_thread) {
- auto& zone_public_config = svc.zone->get_zone();
- ret = svc.sync_modules->get_manager()->create_instance(cct, zone_public_config.tier_type, svc.zone->get_zone_params().tier_config, &sync_module);
- if (ret < 0) {
- lderr(cct) << "ERROR: failed to init sync module instance, ret=" << ret << dendl;
- if (ret == -ENOENT) {
- lderr(cct) << "ERROR: " << zone_public_config.tier_type
- << " sync module does not exist. valid sync modules: "
- << svc.sync_modules->get_manager()->get_registered_module_names()
- << dendl;
- }
- return ret;
+ /*
+ * create sync module instance even if we don't run sync thread, might need it for radosgw-admin
+ */
+ auto& zone_public_config = svc.zone->get_zone();
+ ret = svc.sync_modules->get_manager()->create_instance(cct, zone_public_config.tier_type, svc.zone->get_zone_params().tier_config, &sync_module);
+ if (ret < 0) {
+ lderr(cct) << "ERROR: failed to init sync module instance, ret=" << ret << dendl;
+ if (ret == -ENOENT) {
+ lderr(cct) << "ERROR: " << zone_public_config.tier_type
+ << " sync module does not exist. valid sync modules: "
+ << svc.sync_modules->get_manager()->get_registered_module_names()
+ << dendl;
}
+ return ret;
}
period_puller.reset(new RGWPeriodPuller(this));
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, DELETE_MARKER_CREATE);
}
+
+ PSConfigRef& get_conf() { return conf; }
};
RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
{
data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
+ string jconf = json_str("conf", *data_handler->get_conf());
+ JSONParser p;
+ if (!p.parse(jconf.c_str(), jconf.size())) {
+ ldout(cct, 0) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
+ effective_conf = config;
+ } else {
+ effective_conf.decode_json(&p);
+ }
}
RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
class RGWPSSyncModuleInstance : public RGWSyncModuleInstance {
std::unique_ptr<RGWPSDataSyncModule> data_handler;
+ JSONFormattable effective_conf;
public:
RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config);
RGWDataSyncModule *get_data_handler() override;
bool supports_user_writes() override {
return true;
}
+ const JSONFormattable& get_effective_conf() {
+ return effective_conf;
+ }
};
#endif