From 239a53d3cbe857a464b0d2b264c3d13cff625d59 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 27 Jul 2018 16:23:13 -0700 Subject: [PATCH] rgw: pubsub: dest bucket registers with lifecycle So that events expire after X days Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_sync_module_pubsub.cc | 95 +++++++++++++++++++++++++++++-- 1 file changed, 89 insertions(+), 6 deletions(-) diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 1e82c3bae5d..3a0de9f570d 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -15,6 +15,8 @@ #define dout_subsys ceph_subsys_rgw +#define PUBSUB_EVENTS_RETENTION_DEFAULT 7 + /* config: @@ -25,6 +27,8 @@ config: "data_bucket_prefix": # default: "pubsub-" "data_oid_prefix": # + "events_retention_days": # default: 7 + # non-dynamic config "notifications": [ { @@ -175,6 +179,8 @@ struct PSConfig { string data_bucket_prefix; string data_oid_prefix; + int events_retention_days{0}; + uint64_t sync_instance{0}; uint64_t max_id{0}; @@ -188,6 +194,7 @@ struct PSConfig { encode_json("user", user, f); encode_json("data_bucket_prefix", data_bucket_prefix, f); encode_json("data_oid_prefix", data_bucket_prefix, f); + encode_json("events_retention_days", events_retention_days, f); encode_json("sync_instance", sync_instance, f); encode_json("max_id", max_id, f); { @@ -227,6 +234,7 @@ struct PSConfig { user = rgw_user(config["tenant"], uid); data_bucket_prefix = config["data_bucket_prefix"]("pubsub-"); data_oid_prefix = config["data_oid_prefix"]; + events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT); for (auto& c : config["notifications"].array()) { PSNotificationConfig nc; @@ -543,6 +551,71 @@ class PSSubscription { class InitCR; InitCR *init_cr{nullptr}; + class InitBucketLifecycleCR : public RGWCoroutine { + RGWDataSyncEnv *sync_env; + PSConfigRef& conf; + LCRule rule; + + int retention_days; + + rgw_bucket_lifecycle_config_params lc_config; + public: + InitBucketLifecycleCR(RGWDataSyncEnv *_sync_env, + PSConfigRef& _conf, + RGWBucketInfo& _bucket_info, + map& _bucket_attrs) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + conf(_conf) { + lc_config.bucket_info = _bucket_info; + lc_config.bucket_attrs = _bucket_attrs; + retention_days = conf->events_retention_days; + } + + int operate() override { + reenter(this) { + + rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days); + + { + /* maybe we already have it configured? */ + RGWLifecycleConfiguration old_config; + auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC); + if (aiter != lc_config.bucket_attrs.end()) { + bufferlist::const_iterator iter{&aiter->second}; + try { + old_config.decode(iter); + } catch (const buffer::error& e) { + ldout(cct, 0) << __func__ << "(): decode life cycle config failed" << dendl; + } + } + + auto old_rules = old_config.get_rule_map(); + for (auto ori : old_rules) { + auto& old_rule = ori.second; + + if (old_rule.get_prefix().empty() && + old_rule.get_expiration().get_days() == retention_days && + old_rule.is_enabled()) { + ldout(sync_env->cct, 20) << "no need to set lifecycle rule on bucketi, existing rule matches config" << dendl; + return set_cr_done(); + } + } + } + + lc_config.config.add_rule(&rule); + yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados, + sync_env->store, + lc_config)); + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl; + return set_cr_error(retcode); + } + + return set_cr_done(); + } + return 0; + } + }; class InitCR : public RGWSingletonCR { RGWDataSyncEnv *sync_env; PSSubscriptionRef sub; @@ -575,13 +648,23 @@ class PSSubscription { << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl; } if (retcode == 0) { - auto& result = sub->get_bucket_info_result; - sub->bucket_info = &result->bucket_info; + { + auto& result = sub->get_bucket_info_result; + sub->bucket_info = &result->bucket_info; + + int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket); + if (ret < 0) { + ldout(sync_env->cct, 0) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl; + return set_cr_error(ret); + } + } - int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket); - if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl; - return set_cr_error(ret); + yield call(new InitBucketLifecycleCR(sync_env, conf, + sub->get_bucket_info_result->bucket_info, + sub->get_bucket_info_result->attrs)); + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl; + return set_cr_error(retcode); } return set_cr_done(); } -- 2.39.5