#define dout_subsys ceph_subsys_rgw
+#define PUBSUB_EVENTS_RETENTION_DEFAULT 7
+
/*
config:
"data_bucket_prefix": <prefix> # default: "pubsub-"
"data_oid_prefix": <prefix> #
+ "events_retention_days": <days> # default: 7
+
# non-dynamic config
"notifications": [
{
string data_bucket_prefix;
string data_oid_prefix;
+ int events_retention_days{0};
+
uint64_t sync_instance{0};
uint64_t max_id{0};
encode_json("user", user, f);
encode_json("data_bucket_prefix", data_bucket_prefix, f);
encode_json("data_oid_prefix", data_bucket_prefix, f);
+ encode_json("events_retention_days", events_retention_days, f);
encode_json("sync_instance", sync_instance, f);
encode_json("max_id", max_id, f);
{
user = rgw_user(config["tenant"], uid);
data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
data_oid_prefix = config["data_oid_prefix"];
+ events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
for (auto& c : config["notifications"].array()) {
PSNotificationConfig nc;
class InitCR;
InitCR *init_cr{nullptr};
+ class InitBucketLifecycleCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ PSConfigRef& conf;
+ LCRule rule;
+
+ int retention_days;
+
+ rgw_bucket_lifecycle_config_params lc_config;
+ public:
+ InitBucketLifecycleCR(RGWDataSyncEnv *_sync_env,
+ PSConfigRef& _conf,
+ RGWBucketInfo& _bucket_info,
+ map<string, bufferlist>& _bucket_attrs) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ conf(_conf) {
+ lc_config.bucket_info = _bucket_info;
+ lc_config.bucket_attrs = _bucket_attrs;
+ retention_days = conf->events_retention_days;
+ }
+
+ int operate() override {
+ reenter(this) {
+
+ rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days);
+
+ {
+ /* maybe we already have it configured? */
+ RGWLifecycleConfiguration old_config;
+ auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC);
+ if (aiter != lc_config.bucket_attrs.end()) {
+ bufferlist::const_iterator iter{&aiter->second};
+ try {
+ old_config.decode(iter);
+ } catch (const buffer::error& e) {
+ ldout(cct, 0) << __func__ << "(): decode life cycle config failed" << dendl;
+ }
+ }
+
+ auto old_rules = old_config.get_rule_map();
+ for (auto ori : old_rules) {
+ auto& old_rule = ori.second;
+
+ if (old_rule.get_prefix().empty() &&
+ old_rule.get_expiration().get_days() == retention_days &&
+ old_rule.is_enabled()) {
+ ldout(sync_env->cct, 20) << "no need to set lifecycle rule on bucketi, existing rule matches config" << dendl;
+ return set_cr_done();
+ }
+ }
+ }
+
+ lc_config.config.add_rule(&rule);
+ yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados,
+ sync_env->store,
+ lc_config));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
+ }
+ return 0;
+ }
+ };
class InitCR : public RGWSingletonCR<bool> {
RGWDataSyncEnv *sync_env;
PSSubscriptionRef sub;
<< get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
}
if (retcode == 0) {
- auto& result = sub->get_bucket_info_result;
- sub->bucket_info = &result->bucket_info;
+ {
+ auto& result = sub->get_bucket_info_result;
+ sub->bucket_info = &result->bucket_info;
+
+ int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket);
+ if (ret < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
+ return set_cr_error(ret);
+ }
+ }
- int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket);
- if (ret < 0) {
- ldout(sync_env->cct, 0) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
- return set_cr_error(ret);
+ yield call(new InitBucketLifecycleCR(sync_env, conf,
+ sub->get_bucket_info_result->bucket_info,
+ sub->get_bucket_info_result->attrs));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
+ return set_cr_error(retcode);
}
return set_cr_done();
}