]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add support for object expiration in rgw_rados.cc.
authorRadoslaw Zarzynski <rzarzynski@mirantis.com>
Thu, 21 May 2015 15:18:53 +0000 (17:18 +0200)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 27 Aug 2015 17:39:33 +0000 (10:39 -0700)
Fixes: #4099
Signed-off-by: Radoslaw Zarzynski <rzarzynski@mirantis.com>
src/common/config_opts.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index d30e15e3428a944daae1d99ff0e7deaa29284c02..f38ffd4864af9884dafcab81112179e26dcc35f4 100644 (file)
@@ -1085,6 +1085,9 @@ OPTION(rgw_multipart_part_upload_limit, OPT_INT, 10000) // parts limit in multip
 
 OPTION(rgw_olh_pending_timeout_sec, OPT_INT, 3600) // time until we retire a pending olh change
 
+OPTION(rgw_objexp_time_step_exp, OPT_U32, 12) // exponent value (2 is the base) for rounding the timestamps
+OPTION(rgw_objexp_hints_num_shards, OPT_U32, 127) // maximum number of parts in which the hint index is stored in
+
 OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter
 OPTION(throttler_perf_counter, OPT_BOOL, true) // enable/disable throttler perf counter
 
index c23a2db3ee7f086cbd548049838c081f20a3165e..19bc923b1986c111f050e1971854493516c104c4 100644 (file)
@@ -27,6 +27,7 @@
 #include "cls/version/cls_version_client.h"
 #include "cls/log/cls_log_client.h"
 #include "cls/statelog/cls_statelog_client.h"
+#include "cls/timeindex/cls_timeindex_client.h"
 #include "cls/lock/cls_lock_client.h"
 #include "cls/user/cls_user_client.h"
 
@@ -2313,6 +2314,168 @@ int RGWRados::time_log_trim(const string& oid, const utime_t& start_time, const
   return cls_log_trim(io_ctx, oid, start_time, end_time, from_marker, to_marker);
 }
 
+string RGWRados::objexp_hint_get_shardname(const utime_t &ts)
+{
+  const time_t roundedts = ts.sec() >> cct->_conf->rgw_objexp_time_step_exp;
+  const unsigned int shnum = roundedts % cct->_conf->rgw_objexp_hints_num_shards;
+
+  char buf[32];
+  snprintf(buf, sizeof(buf), "%010u", shnum);
+
+  string objname("time_index_hint.");
+  return objname + buf;
+}
+
+static string objexp_hint_get_keyext(const string& bucket_name,
+                                     const string& bucket_id,
+                                     const rgw_obj_key& obj_key)
+{
+  return bucket_name + ":" + bucket_id + ":" + obj_key.name + ":" + obj_key.instance;
+}
+
+int RGWRados::objexp_hint_add(const utime_t& delete_at,
+                              const string& bucket_name,
+                              const string& bucket_id,
+                              const rgw_obj_key& obj_key)
+{
+  librados::IoCtx io_ctx;
+
+  const char * const log_pool = zone.log_pool.name.c_str();
+  int r = rados->ioctx_create(log_pool, io_ctx);
+  if (r == -ENOENT) {
+    rgw_bucket pool(log_pool);
+    r = create_pool(pool);
+    if (r < 0) {
+      return r;
+    } else {
+      /* retry */
+      r = rados->ioctx_create(log_pool, io_ctx);
+    }
+  }
+  if (r < 0) {
+    return r;
+  }
+
+  const string keyext = objexp_hint_get_keyext(bucket_name,
+          bucket_id, obj_key);
+  objexp_hint_entry he = {
+      .bucket_name = bucket_name,
+      .bucket_id = bucket_id,
+      .obj_key = obj_key,
+      .exp_time = delete_at };
+  bufferlist hebl;
+  ::encode(he, hebl);
+  ObjectWriteOperation op;
+  cls_timeindex_add(op, delete_at, keyext, hebl);
+
+  string shard_name = objexp_hint_get_shardname(delete_at);
+  r = io_ctx.operate(shard_name, &op);
+  return r;
+}
+
+void  RGWRados::objexp_get_shard(const utime_t& start_time,
+                                 const utime_t& end_time,
+                                 utime_t &marker,                     /* in/out */
+                                 string& shard,                       /* out */
+                                 bool& truncated)                     /* out */
+{
+  if (marker.is_zero()) {
+    marker = start_time;
+  }
+
+  const uint32_t time_step_exp = cct->_conf->rgw_objexp_time_step_exp;
+  const uint32_t num_shards = cct->_conf->rgw_objexp_hints_num_shards;
+  const time_t time_step = 1 << time_step_exp;
+
+  const time_t sts = start_time.sec() >> time_step_exp;
+  const time_t ets = end_time.sec() >> time_step_exp;
+  const time_t mts = marker.sec() >> time_step_exp;
+
+  const uint32_t periods = (ets - sts) / time_step;
+  const uint32_t iters = min(periods, num_shards);
+
+  shard = objexp_hint_get_shardname(marker);
+
+  if (mts % num_shards < (sts + iters) % num_shards) {
+    truncated = true;
+    marker += utime_t(time_step, 0);
+  } else {
+    truncated = false;
+  }
+
+  return;
+}
+
+int RGWRados::objexp_hint_list(const string& oid,
+                               const utime_t& start_time,
+                               const utime_t& end_time,
+                               const int max_entries,
+                               const string& marker,
+                               list<cls_timeindex_entry>& entries, /* out */
+                               string *out_marker,                 /* out */
+                               bool *truncated)                    /* out */
+{
+  librados::IoCtx io_ctx;
+
+  const char * const log_pool = zone.log_pool.name.c_str();
+  int ret = rados->ioctx_create(log_pool, io_ctx);
+  if (ret < 0) {
+    return ret;
+  }
+
+  librados::ObjectReadOperation op;
+  cls_timeindex_list(op, start_time, end_time, marker, max_entries, entries,
+              out_marker, truncated);
+
+  bufferlist obl;
+  ret = io_ctx.operate(oid, &op, &obl);
+
+  if ((ret < 0 ) && (ret != -ENOENT)) {
+    return ret;
+  }
+
+  if (ret == -ENOENT && truncated) {
+    *truncated = false;
+  }
+
+  return 0;
+}
+
+int RGWRados::objexp_hint_parse(cls_timeindex_entry &ti_entry,  /* in */
+                                objexp_hint_entry& hint_entry)  /* out */
+{
+  try {
+    bufferlist::iterator iter = ti_entry.value.begin();
+    ::decode(hint_entry, iter);
+  } catch (buffer::error& err) {
+    ldout(cct, 0) << "ERROR: couldn't decode avail_pools" << dendl;
+  }
+
+  return 0;
+}
+
+int RGWRados::objexp_hint_trim(const string& oid,
+                               const utime_t& start_time,
+                               const utime_t& end_time,
+                               const string& from_marker,
+                               const string& to_marker)
+{
+  librados::IoCtx io_ctx;
+
+  const char * const log_pool = zone.log_pool.name.c_str();
+  int ret = rados->ioctx_create(log_pool, io_ctx);
+  if (ret < 0) {
+    return ret;
+  }
+
+  ret = cls_timeindex_trim(io_ctx, oid, start_time, end_time,
+          from_marker, to_marker);
+  if ((ret < 0 ) && (ret != -ENOENT)) {
+    return ret;
+  }
+
+  return 0;
+}
 
 int RGWRados::lock_exclusive(rgw_bucket& pool, const string& oid, utime_t& duration, 
                              string& zone_id, string& owner_id) {
@@ -4739,6 +4902,24 @@ int RGWRados::Object::Delete::delete_obj()
 
   uint64_t obj_size = state->size;
 
+  if (!params.expiration_time.is_zero()) {
+    bufferlist bl;
+    utime_t delete_at;
+
+    if (state->get_attr(RGW_ATTR_DELETE_AT, bl)) {
+      try {
+        bufferlist::iterator iter = bl.begin();
+        ::decode(delete_at, iter);
+      } catch (buffer::error& err) {
+        dout(5) << "ERROR: couldn't decode RGW_ATTR_DELETE_AT" << dendl;
+      }
+
+      if (params.expiration_time != delete_at) {
+        return -ERR_PRECONDITION_FAILED;
+      }
+    }
+  }
+
   ObjectWriteOperation op;
 
   r = target->prepare_atomic_modification(op, false, NULL, NULL, NULL, true);
@@ -4799,8 +4980,12 @@ int RGWRados::Object::Delete::delete_obj()
   return 0;
 }
 
-int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, rgw_obj& obj,
-                         int versioning_status, uint16_t bilog_flags)
+int RGWRados::delete_obj(RGWObjectCtx& obj_ctx,
+                         RGWBucketInfo& bucket_info,
+                         rgw_obj& obj,
+                         int versioning_status,
+                         uint16_t bilog_flags,
+                         const utime_t& expiration_time)
 {
   RGWRados::Object del_target(this, bucket_info, obj_ctx, obj);
   RGWRados::Object::Delete del_op(&del_target);
@@ -4808,6 +4993,7 @@ int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, rgw_
   del_op.params.bucket_owner = bucket_info.owner;
   del_op.params.versioning_status = versioning_status;
   del_op.params.bilog_flags = bilog_flags;
+  del_op.params.expiration_time = expiration_time;
 
   return del_op.delete_obj();
 }
@@ -5316,6 +5502,16 @@ int RGWRados::set_attrs(void *ctx, rgw_obj& obj,
       continue;
 
     op.setxattr(name.c_str(), bl);
+
+    if (name.compare(RGW_ATTR_DELETE_AT) == 0) {
+      utime_t ts;
+      ::decode(ts, bl);
+
+      rgw_obj_key obj_key;
+      obj.get_index_key(&obj_key);
+
+      objexp_hint_add(ts, bucket.name, bucket.bucket_id, obj_key);
+    }
   }
 
   if (!op.size())
index c69ca9c929911b271667f863397118b9804e1b4f..0ee187409d592329e8cb0e706ae1b56ca3b8acb9 100644 (file)
@@ -13,6 +13,7 @@
 #include "cls/version/cls_version_types.h"
 #include "cls/log/cls_log_types.h"
 #include "cls/statelog/cls_statelog_types.h"
+#include "cls/timeindex/cls_timeindex_types.h"
 #include "rgw_log.h"
 #include "rgw_metadata.h"
 #include "rgw_rest_conn.h"
@@ -993,6 +994,32 @@ struct RGWRegionMap {
 };
 WRITE_CLASS_ENCODER(RGWRegionMap)
 
+struct objexp_hint_entry {
+  string bucket_name;
+  string bucket_id;
+  rgw_obj_key obj_key;
+  utime_t exp_time;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(bucket_name, bl);
+    ::encode(bucket_id, bl);
+    ::encode(obj_key, bl);
+    ::encode(exp_time, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(bucket_name, bl);
+    ::decode(bucket_id, bl);
+    ::decode(obj_key, bl);
+    ::decode(exp_time, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(objexp_hint_entry)
+
 class RGWDataChangesLog;
 class RGWReplicaLogger;
   
@@ -1597,6 +1624,7 @@ public:
         string marker_version_id;
         uint32_t bilog_flags;
         list<rgw_obj_key> *remove_objs;
+        utime_t expiration_time;
 
         DeleteParams() : versioning_status(0), olh_epoch(0), bilog_flags(0), remove_objs(NULL) {}
       } params;
@@ -1862,8 +1890,12 @@ public:
   int bucket_suspended(rgw_bucket& bucket, bool *suspended);
 
   /** Delete an object.*/
-  virtual int delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_owner, rgw_obj& src_obj,
-                         int versioning_status, uint16_t bilog_flags = 0);
+  virtual int delete_obj(RGWObjectCtx& obj_ctx,
+                         RGWBucketInfo& bucket_owner,
+                         rgw_obj& src_obj,
+                         int versioning_status,
+                         uint16_t bilog_flags = 0,
+                         const utime_t& expiration_time = utime_t());
 
   /* Delete a system object */
   virtual int delete_system_obj(rgw_obj& src_obj, RGWObjVersionTracker *objv_tracker = NULL);
@@ -2071,6 +2103,33 @@ public:
   int time_log_info(const string& oid, cls_log_header *header);
   int time_log_trim(const string& oid, const utime_t& start_time, const utime_t& end_time,
                     const string& from_marker, const string& to_marker);
+
+  string objexp_hint_get_shardname(const utime_t &ts);
+  void objexp_get_shard(const utime_t& start_time,
+                        const utime_t& end_time,
+                        utime_t &marker,                     /* out */
+                        string& shard,                       /* out */
+                        bool& truncated);                    /* out */
+  int objexp_hint_add(const utime_t& delete_at,
+                      const string& bucket_name,
+                      const string& bucket_id,
+                      const rgw_obj_key& obj_key);
+  int objexp_hint_list(const string& oid,
+                       const utime_t& start_time,
+                       const utime_t& end_time,
+                       const int max_entries,
+                       const string& marker,
+                       list<cls_timeindex_entry>& entries, /* out */
+                       string *out_marker,                 /* out */
+                       bool *truncated);                   /* out */
+  int objexp_hint_parse(cls_timeindex_entry &ti_entry,
+                        objexp_hint_entry& hint_entry);    /* out */
+  int objexp_hint_trim(const string& oid,
+                       const utime_t& start_time,
+                       const utime_t& end_time,
+                       const string& from_marker = std::string(),
+                       const string& to_marker   = std::string());
+
   int lock_exclusive(rgw_bucket& pool, const string& oid, utime_t& duration, string& zone_id, string& owner_id);
   int unlock(rgw_bucket& pool, const string& oid, string& zone_id, string& owner_id);