]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: pubsub: dest bucket registers with lifecycle
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 27 Jul 2018 23:23:13 +0000 (16:23 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:43 +0000 (00:10 -0800)
So that events expire after X days

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync_module_pubsub.cc

index 1e82c3bae5d99f6188b77aad7886e39cfc0d1c86..3a0de9f570d246edf7aa0e27084e06daf16e2b8d 100644 (file)
@@ -15,6 +15,8 @@
 #define dout_subsys ceph_subsys_rgw
 
 
+#define PUBSUB_EVENTS_RETENTION_DEFAULT 7
+
 /*
 
 config:
@@ -25,6 +27,8 @@ config:
    "data_bucket_prefix": <prefix>  # default: "pubsub-"
    "data_oid_prefix": <prefix>     #
 
+   "events_retention_days": <days> # default: 7
+
     # non-dynamic config
     "notifications": [
         {
@@ -175,6 +179,8 @@ struct PSConfig {
   string data_bucket_prefix;
   string data_oid_prefix;
 
+  int events_retention_days{0};
+
   uint64_t sync_instance{0};
   uint64_t max_id{0};
 
@@ -188,6 +194,7 @@ struct PSConfig {
     encode_json("user", user, f);
     encode_json("data_bucket_prefix", data_bucket_prefix, f);
     encode_json("data_oid_prefix", data_bucket_prefix, f);
+    encode_json("events_retention_days", events_retention_days, f);
     encode_json("sync_instance", sync_instance, f);
     encode_json("max_id", max_id, f);
     {
@@ -227,6 +234,7 @@ struct PSConfig {
     user = rgw_user(config["tenant"], uid);
     data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
     data_oid_prefix = config["data_oid_prefix"];
+    events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
 
     for (auto& c : config["notifications"].array()) {
       PSNotificationConfig nc;
@@ -543,6 +551,71 @@ class PSSubscription {
   class InitCR;
   InitCR *init_cr{nullptr};
 
+  class InitBucketLifecycleCR : public RGWCoroutine {
+    RGWDataSyncEnv *sync_env;
+    PSConfigRef& conf;
+    LCRule rule;
+
+    int retention_days;
+
+    rgw_bucket_lifecycle_config_params lc_config;
+  public:
+    InitBucketLifecycleCR(RGWDataSyncEnv *_sync_env,
+           PSConfigRef& _conf,
+           RGWBucketInfo& _bucket_info,
+           map<string, bufferlist>& _bucket_attrs) : RGWCoroutine(_sync_env->cct),
+                                                     sync_env(_sync_env),
+                                                     conf(_conf) {
+      lc_config.bucket_info = _bucket_info;
+      lc_config.bucket_attrs = _bucket_attrs;
+      retention_days = conf->events_retention_days;
+    }
+
+    int operate() override {
+      reenter(this) {
+
+        rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days);
+
+        {
+          /* maybe we already have it configured? */
+          RGWLifecycleConfiguration old_config;
+          auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC);
+          if (aiter != lc_config.bucket_attrs.end()) {
+            bufferlist::const_iterator iter{&aiter->second};
+            try {
+              old_config.decode(iter);
+            } catch (const buffer::error& e) {
+              ldout(cct, 0) << __func__ <<  "(): decode life cycle config failed" << dendl;
+            }
+          }
+
+          auto old_rules = old_config.get_rule_map();
+          for (auto ori : old_rules) {
+            auto& old_rule = ori.second;
+
+            if (old_rule.get_prefix().empty() && 
+                old_rule.get_expiration().get_days() == retention_days &&
+                old_rule.is_enabled()) {
+              ldout(sync_env->cct, 20) << "no need to set lifecycle rule on bucketi, existing rule matches config" << dendl;
+              return set_cr_done();
+            }
+          }
+        }
+
+        lc_config.config.add_rule(&rule);
+        yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados,
+                                                  sync_env->store,
+                                                  lc_config));
+        if (retcode < 0) {
+          ldout(sync_env->cct, 0) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
+          return set_cr_error(retcode);
+        }
+
+        return set_cr_done();
+      }
+      return 0;
+    }
+  };
   class InitCR : public RGWSingletonCR<bool> {
     RGWDataSyncEnv *sync_env;
     PSSubscriptionRef sub;
@@ -575,13 +648,23 @@ class PSSubscription {
               << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
           }
           if (retcode == 0) {
-            auto& result = sub->get_bucket_info_result;
-            sub->bucket_info = &result->bucket_info;
+            {
+              auto& result = sub->get_bucket_info_result;
+              sub->bucket_info = &result->bucket_info;
+
+              int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket);
+              if (ret < 0) {
+                ldout(sync_env->cct, 0) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
+                return set_cr_error(ret);
+              }
+            }
 
-            int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket);
-            if (ret < 0) {
-              ldout(sync_env->cct, 0) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
-              return set_cr_error(ret);
+            yield call(new InitBucketLifecycleCR(sync_env, conf,
+                                                 sub->get_bucket_info_result->bucket_info,
+                                                 sub->get_bucket_info_result->attrs));
+            if (retcode < 0) {
+              ldout(sync_env->cct, 0) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
+              return set_cr_error(retcode);
             }
             return set_cr_done();
           }