#if defined(WITH_RADOSGW_BEAST_FRONTEND)
#include "rgw_asio_frontend.h"
#endif /* WITH_RADOSGW_BEAST_FRONTEND */
-
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
-#include "rgw_amqp.h"
-#endif /* WITH_RADOSGW_AMQP_ENDPOINT */
-
#include "rgw_dmclock_scheduler_ctx.h"
#include "services/svc_zone.h"
FCGX_Init();
#endif
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
- rgw::amqp::init(cct.get());
-#endif
-
RGWRados *store =
RGWStoreManager::get_storage(g_ceph_context,
g_conf()->rgw_enable_gc_threads,
rgw_http_client_cleanup();
rgw::curl::cleanup_curl();
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
- rgw::amqp::shutdown();
-#endif
-
rgw_perf_stop(g_ceph_context);
dout(1) << "final shutdown" << dendl;
WRITE_CLASS_ENCODER(rgw_pubsub_event)
struct rgw_pubsub_sub_dest {
- rgw_pubsub_sub_dest() : bucket_name(""), oid_prefix(""), push_endpoint(""), push_endpoint_args("") {}
std::string bucket_name;
std::string oid_prefix;
std::string push_endpoint;
#include "rgw_pubsub.h"
#include "rgw_pubsub_push.h"
#include "rgw_perf_counters.h"
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+#include "rgw_amqp.h"
+#endif
#include <boost/algorithm/hex.hpp>
#include <boost/asio/yield.hpp>
} else {
effective_conf.decode_json(&p);
}
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+ if (!rgw::amqp::init(cct)) {
+ ldout(cct, 1) << "ERROR: failed to initialize AMQP server in pubsub sync module" << dendl;
+ }
+#endif
+}
+
+RGWPSSyncModuleInstance::~RGWPSSyncModuleInstance() {
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+ rgw::amqp::shutdown();
+#endif
}
RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
JSONFormattable effective_conf;
public:
RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config);
+ ~RGWPSSyncModuleInstance();
RGWDataSyncModule *get_data_handler() override;
RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override;
bool supports_user_writes() override {
}
if (sub_conf.s3_id.empty()) {
if (must_delete) {
- ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
op_ret = -ENOENT;
+ ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
}
return;
}
return;
}
if (sub_conf.s3_id.empty()) {
- ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
op_ret = -ENOENT;
+ ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
return;
}
rgw_pubsub_bucket_topics bucket_topics;
}
const auto topic_it = bucket_topics.topics.find(sub_conf.topic);
if (topic_it == bucket_topics.topics.end()) {
- ldout(s->cct, 1) << "notification does not have topic information, ret=" << op_ret << dendl;
op_ret = -ENOENT;
+ ldout(s->cct, 1) << "notification does not have topic information, ret=" << op_ret << dendl;
return;
}
add_notification_to_list(sub_conf, topic_it->second.events, topic_it->second.topic.arn);