From: Radoslaw Zarzynski
Date: Thu, 21 May 2015 15:18:53 +0000 (+0200)
Subject: rgw: add support for object expiration in rgw_rados.cc.
X-Git-Tag: v9.1.0~229^2~20
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0d792c9dcc1fae8029f07a59e63dedb34eafcc6d;p=ceph.git

rgw: add support for object expiration in rgw_rados.cc.

Fixes: #4099
Signed-off-by: Radoslaw Zarzynski
---

diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index d30e15e3428..f38ffd4864a 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -1085,6 +1085,9 @@ OPTION(rgw_multipart_part_upload_limit, OPT_INT, 10000) // parts limit in multip
 
 OPTION(rgw_olh_pending_timeout_sec, OPT_INT, 3600) // time until we retire a pending olh change
 
+OPTION(rgw_objexp_time_step_exp, OPT_U32, 12) // exponent value (2 is the base) for rounding the timestamps
+OPTION(rgw_objexp_hints_num_shards, OPT_U32, 127) // maximum number of parts in which the hint index is stored in
+
 OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter
 OPTION(throttler_perf_counter, OPT_BOOL, true) // enable/disable throttler perf counter
 
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index c23a2db3ee7..19bc923b198 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -27,6 +27,7 @@
 #include "cls/version/cls_version_client.h"
 #include "cls/log/cls_log_client.h"
 #include "cls/statelog/cls_statelog_client.h"
+#include "cls/timeindex/cls_timeindex_client.h"
 #include "cls/lock/cls_lock_client.h"
 #include "cls/user/cls_user_client.h"
 
@@ -2313,6 +2314,220 @@ int RGWRados::time_log_trim(const string& oid, const utime_t& start_time, const
   return cls_log_trim(io_ctx, oid, start_time, end_time, from_marker, to_marker);
 }
 
+/*
+ * Map a timestamp to the name of the hint shard object that covers it.
+ *
+ * The seconds of `ts` are right-shifted by rgw_objexp_time_step_exp and
+ * reduced modulo rgw_objexp_hints_num_shards; the shard number is appended,
+ * zero-padded to ten digits, to the "time_index_hint." prefix.
+ *
+ * NOTE(review): rgw_objexp_hints_num_shards == 0 would divide by zero here;
+ * presumably the option is validated elsewhere -- confirm.
+ */
+string RGWRados::objexp_hint_get_shardname(const utime_t &ts)
+{
+  const time_t roundedts = ts.sec() >> cct->_conf->rgw_objexp_time_step_exp;
+  const unsigned int shnum = roundedts % cct->_conf->rgw_objexp_hints_num_shards;
+
+  char buf[32];
+  snprintf(buf, sizeof(buf), "%010u", shnum);
+
+  string objname("time_index_hint.");
+  return objname + buf;
+}
+
+/* Build the hint key extension "<bucket_name>:<bucket_id>:<name>:<instance>". */
+static string objexp_hint_get_keyext(const string& bucket_name,
+                                     const string& bucket_id,
+                                     const rgw_obj_key& obj_key)
+{
+  return bucket_name + ":" + bucket_id + ":" + obj_key.name + ":" + obj_key.instance;
+}
+
+/*
+ * Record a removal hint for an object in the shard selected by `delete_at`.
+ *
+ * The hint (bucket name, bucket id, object key, expiration time) is encoded
+ * and queued through cls_timeindex_add(). The zone's log pool is created on
+ * demand when opening it fails with -ENOENT.
+ *
+ * Returns the result of the RADOS write, or the negative error from opening
+ * or creating the pool.
+ */
+int RGWRados::objexp_hint_add(const utime_t& delete_at,
+                              const string& bucket_name,
+                              const string& bucket_id,
+                              const rgw_obj_key& obj_key)
+{
+  librados::IoCtx io_ctx;
+
+  const char * const log_pool = zone.log_pool.name.c_str();
+  int r = rados->ioctx_create(log_pool, io_ctx);
+  if (r == -ENOENT) {
+    rgw_bucket pool(log_pool);
+    r = create_pool(pool);
+    if (r < 0) {
+      return r;
+    } else {
+      /* retry */
+      r = rados->ioctx_create(log_pool, io_ctx);
+    }
+  }
+  if (r < 0) {
+    return r;
+  }
+
+  const string keyext = objexp_hint_get_keyext(bucket_name,
+          bucket_id, obj_key);
+  objexp_hint_entry he = {
+      .bucket_name = bucket_name,
+      .bucket_id = bucket_id,
+      .obj_key = obj_key,
+      .exp_time = delete_at };
+  bufferlist hebl;
+  ::encode(he, hebl);
+  ObjectWriteOperation op;
+  cls_timeindex_add(op, delete_at, keyext, hebl);
+
+  string shard_name = objexp_hint_get_shardname(delete_at);
+  r = io_ctx.operate(shard_name, &op);
+  return r;
+}
+
+/*
+ * Pick the shard to process for the current `marker` and advance the marker.
+ *
+ * A zero `marker` is first reset to `start_time`. `shard` receives the shard
+ * name for the marker; when `truncated` is set to true the marker has been
+ * moved forward by one time step (2^rgw_objexp_time_step_exp seconds).
+ *
+ * NOTE(review): `sts` and `ets` are already expressed in time steps, yet
+ * `periods` divides their difference by `time_step` again, and the modulo
+ * comparison below does not account for wrap-around -- confirm the intent.
+ */
+void RGWRados::objexp_get_shard(const utime_t& start_time,
+                                const utime_t& end_time,
+                                utime_t &marker,                 /* in/out */
+                                string& shard,                   /* out */
+                                bool& truncated)                 /* out */
+{
+  if (marker.is_zero()) {
+    marker = start_time;
+  }
+
+  const uint32_t time_step_exp = cct->_conf->rgw_objexp_time_step_exp;
+  const uint32_t num_shards = cct->_conf->rgw_objexp_hints_num_shards;
+  const time_t time_step = 1 << time_step_exp;
+
+  const time_t sts = start_time.sec() >> time_step_exp;
+  const time_t ets = end_time.sec() >> time_step_exp;
+  const time_t mts = marker.sec() >> time_step_exp;
+
+  const uint32_t periods = (ets - sts) / time_step;
+  const uint32_t iters = min(periods, num_shards);
+
+  shard = objexp_hint_get_shardname(marker);
+
+  if (mts % num_shards < (sts + iters) % num_shards) {
+    truncated = true;
+    marker += utime_t(time_step, 0);
+  } else {
+    truncated = false;
+  }
+
+  return;
+}
+
+/*
+ * List hint entries stored in shard object `oid` via cls_timeindex_list().
+ *
+ * A missing shard object (-ENOENT) is not an error: 0 is returned and
+ * `*truncated`, when provided, is cleared. Other negative codes are returned
+ * unchanged.
+ */
+int RGWRados::objexp_hint_list(const string& oid,
+                               const utime_t& start_time,
+                               const utime_t& end_time,
+                               const int max_entries,
+                               const string& marker,
+                               list<cls_timeindex_entry>& entries, /* out */
+                               string *out_marker,                 /* out */
+                               bool *truncated)                    /* out */
+{
+  librados::IoCtx io_ctx;
+
+  const char * const log_pool = zone.log_pool.name.c_str();
+  int ret = rados->ioctx_create(log_pool, io_ctx);
+  if (ret < 0) {
+    return ret;
+  }
+
+  librados::ObjectReadOperation op;
+  cls_timeindex_list(op, start_time, end_time, marker, max_entries, entries,
+        out_marker, truncated);
+
+  bufferlist obl;
+  ret = io_ctx.operate(oid, &op, &obl);
+
+  if ((ret < 0 ) && (ret != -ENOENT)) {
+    return ret;
+  }
+
+  if (ret == -ENOENT && truncated) {
+    *truncated = false;
+  }
+
+  return 0;
+}
+
+/*
+ * Decode the value of a timeindex entry into `hint_entry`.
+ *
+ * Returns 0 on success and -EIO when the payload cannot be decoded, so that
+ * callers do not act on a hint that was never filled in.
+ */
+int RGWRados::objexp_hint_parse(cls_timeindex_entry &ti_entry,  /* in */
+                                objexp_hint_entry& hint_entry)  /* out */
+{
+  try {
+    bufferlist::iterator iter = ti_entry.value.begin();
+    ::decode(hint_entry, iter);
+  } catch (buffer::error& err) {
+    ldout(cct, 0) << "ERROR: couldn't decode objexp_hint_entry" << dendl;
+    return -EIO;
+  }
+
+  return 0;
+}
+
+/*
+ * Trim entries of shard object `oid` through cls_timeindex_trim().
+ *
+ * -ENOENT from the trim call is mapped to 0; other negative codes are
+ * returned unchanged.
+ */
+int RGWRados::objexp_hint_trim(const string& oid,
+                               const utime_t& start_time,
+                               const utime_t& end_time,
+                               const string& from_marker,
+                               const string& to_marker)
+{
+  librados::IoCtx io_ctx;
+
+  const char * const log_pool = zone.log_pool.name.c_str();
+  int ret = rados->ioctx_create(log_pool, io_ctx);
+  if (ret < 0) {
+    return ret;
+  }
+
+  ret = cls_timeindex_trim(io_ctx, oid, start_time, end_time,
+          from_marker, to_marker);
+  if ((ret < 0 ) && (ret != -ENOENT)) {
+    return ret;
+  }
+
+  return 0;
+}
 
 int RGWRados::lock_exclusive(rgw_bucket& pool, const string& oid, utime_t& duration,
                              string& zone_id, string& owner_id) {
@@ -4739,6 +4902,24 @@ int RGWRados::Object::Delete::delete_obj()
uint64_t obj_size = state->size; + if (!params.expiration_time.is_zero()) { + bufferlist bl; + utime_t delete_at; + + if (state->get_attr(RGW_ATTR_DELETE_AT, bl)) { + try { + bufferlist::iterator iter = bl.begin(); + ::decode(delete_at, iter); + } catch (buffer::error& err) { + dout(5) << "ERROR: couldn't decode RGW_ATTR_DELETE_AT" << dendl; + } + + if (params.expiration_time != delete_at) { + return -ERR_PRECONDITION_FAILED; + } + } + } + ObjectWriteOperation op; r = target->prepare_atomic_modification(op, false, NULL, NULL, NULL, true); @@ -4799,8 +4980,12 @@ int RGWRados::Object::Delete::delete_obj() return 0; } -int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, rgw_obj& obj, - int versioning_status, uint16_t bilog_flags) +int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, + RGWBucketInfo& bucket_info, + rgw_obj& obj, + int versioning_status, + uint16_t bilog_flags, + const utime_t& expiration_time) { RGWRados::Object del_target(this, bucket_info, obj_ctx, obj); RGWRados::Object::Delete del_op(&del_target); @@ -4808,6 +4993,7 @@ int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, rgw_ del_op.params.bucket_owner = bucket_info.owner; del_op.params.versioning_status = versioning_status; del_op.params.bilog_flags = bilog_flags; + del_op.params.expiration_time = expiration_time; return del_op.delete_obj(); } @@ -5316,6 +5502,16 @@ int RGWRados::set_attrs(void *ctx, rgw_obj& obj, continue; op.setxattr(name.c_str(), bl); + + if (name.compare(RGW_ATTR_DELETE_AT) == 0) { + utime_t ts; + ::decode(ts, bl); + + rgw_obj_key obj_key; + obj.get_index_key(&obj_key); + + objexp_hint_add(ts, bucket.name, bucket.bucket_id, obj_key); + } } if (!op.size()) diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index c69ca9c9299..0ee187409d5 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -13,6 +13,7 @@ #include "cls/version/cls_version_types.h" #include "cls/log/cls_log_types.h" #include "cls/statelog/cls_statelog_types.h" 
+#include "cls/timeindex/cls_timeindex_types.h"
 #include "rgw_log.h"
 #include "rgw_metadata.h"
 #include "rgw_rest_conn.h"
@@ -993,6 +994,37 @@ struct RGWRegionMap {
 };
 WRITE_CLASS_ENCODER(RGWRegionMap)
 
+/*
+ * Removal hint carried as the value of a timeindex entry: names the object
+ * (bucket name, bucket id, object key) together with its expiration time.
+ * Encoded and decoded with struct version 1, fields in declaration order.
+ */
+struct objexp_hint_entry {
+  string bucket_name;
+  string bucket_id;
+  rgw_obj_key obj_key;
+  utime_t exp_time;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(bucket_name, bl);
+    ::encode(bucket_id, bl);
+    ::encode(obj_key, bl);
+    ::encode(exp_time, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(bucket_name, bl);
+    ::decode(bucket_id, bl);
+    ::decode(obj_key, bl);
+    ::decode(exp_time, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(objexp_hint_entry)
+
 class RGWDataChangesLog;
 class RGWReplicaLogger;
 
@@ -1597,6 +1629,7 @@ public:
         string marker_version_id;
         uint32_t bilog_flags;
         list<rgw_obj_key> *remove_objs;
+        utime_t expiration_time;
 
         DeleteParams() : versioning_status(0), olh_epoch(0), bilog_flags(0), remove_objs(NULL) {}
       } params;
@@ -1862,8 +1895,12 @@ public:
   int bucket_suspended(rgw_bucket& bucket, bool *suspended);
 
   /** Delete an object.*/
-  virtual int delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_owner, rgw_obj& src_obj,
-                         int versioning_status, uint16_t bilog_flags = 0);
+  virtual int delete_obj(RGWObjectCtx& obj_ctx,
+                         RGWBucketInfo& bucket_owner,
+                         rgw_obj& src_obj,
+                         int versioning_status,
+                         uint16_t bilog_flags = 0,
+                         const utime_t& expiration_time = utime_t());
 
   /* Delete a system object */
   virtual int delete_system_obj(rgw_obj& src_obj, RGWObjVersionTracker *objv_tracker = NULL);
@@ -2071,6 +2108,38 @@ public:
   int time_log_info(const string& oid, cls_log_header *header);
   int time_log_trim(const string& oid, const utime_t& start_time, const utime_t& end_time,
                     const string& from_marker, const string& to_marker);
+
+  /*
+   * Object expiration hints. Each hint is stored in a shard object whose
+   * name is derived from the expiration timestamp (see
+   * objexp_hint_get_shardname()).
+   */
+  string objexp_hint_get_shardname(const utime_t &ts);
+  void objexp_get_shard(const utime_t& start_time,
+                        const utime_t& end_time,
+                        utime_t &marker,                    /* in/out */
+                        string& shard,                      /* out */
+                        bool& truncated);                   /* out */
+  int objexp_hint_add(const utime_t& delete_at,
+                      const string& bucket_name,
+                      const string& bucket_id,
+                      const rgw_obj_key& obj_key);
+  int objexp_hint_list(const string& oid,
+                       const utime_t& start_time,
+                       const utime_t& end_time,
+                       const int max_entries,
+                       const string& marker,
+                       list<cls_timeindex_entry>& entries, /* out */
+                       string *out_marker,                 /* out */
+                       bool *truncated);                   /* out */
+  int objexp_hint_parse(cls_timeindex_entry &ti_entry,
+                        objexp_hint_entry& hint_entry);    /* out */
+  int objexp_hint_trim(const string& oid,
+                       const utime_t& start_time,
+                       const utime_t& end_time,
+                       const string& from_marker = std::string(),
+                       const string& to_marker = std::string());
+
   int lock_exclusive(rgw_bucket& pool, const string& oid, utime_t& duration, string& zone_id, string& owner_id);
   int unlock(rgw_bucket& pool, const string& oid, string& zone_id, string& owner_id);
 