#include "cls/version/cls_version_client.h"
#include "cls/log/cls_log_client.h"
#include "cls/statelog/cls_statelog_client.h"
+#include "cls/timeindex/cls_timeindex_client.h"
#include "cls/lock/cls_lock_client.h"
#include "cls/user/cls_user_client.h"
return cls_log_trim(io_ctx, oid, start_time, end_time, from_marker, to_marker);
}
+/**
+ * Build the name of the time-index hint object that covers timestamp ts.
+ *
+ * The seconds of ts are shifted right by rgw_objexp_time_step_exp and the
+ * result is reduced modulo rgw_objexp_hints_num_shards; that shard number is
+ * appended, zero-padded to ten digits, to the "time_index_hint." prefix.
+ */
+string RGWRados::objexp_hint_get_shardname(const utime_t &ts)
+{
+  const time_t step_index = ts.sec() >> cct->_conf->rgw_objexp_time_step_exp;
+  const unsigned int shard_no = step_index % cct->_conf->rgw_objexp_hints_num_shards;
+
+  char suffix[32];
+  snprintf(suffix, sizeof(suffix), "%010u", shard_no);
+
+  return string("time_index_hint.") + suffix;
+}
+
+/**
+ * Compose the key extension for a hint entry:
+ * "<bucket_name>:<bucket_id>:<object name>:<object instance>".
+ */
+static string objexp_hint_get_keyext(const string& bucket_name,
+                                     const string& bucket_id,
+                                     const rgw_obj_key& obj_key)
+{
+  string keyext(bucket_name);
+  keyext.append(":").append(bucket_id);
+  keyext.append(":").append(obj_key.name);
+  keyext.append(":").append(obj_key.instance);
+  return keyext;
+}
+
+/**
+ * Store an expiration hint for an object in the time-index shard selected by
+ * its deletion time.
+ *
+ * @param delete_at   time used both as the time-index key and to pick the shard
+ * @param bucket_name bucket name recorded in the hint and in the key extension
+ * @param bucket_id   bucket id recorded in the hint and in the key extension
+ * @param obj_key     object key recorded in the hint and in the key extension
+ * @return 0 or a positive value from the write operation on success, a
+ *         negative error code if the log pool cannot be opened/created or
+ *         the write fails.
+ */
+int RGWRados::objexp_hint_add(const utime_t& delete_at,
+                              const string& bucket_name,
+                              const string& bucket_id,
+                              const rgw_obj_key& obj_key)
+{
+  librados::IoCtx io_ctx;
+
+  const char * const log_pool = zone.log_pool.name.c_str();
+  int r = rados->ioctx_create(log_pool, io_ctx);
+  if (r == -ENOENT) {
+    /* The log pool is missing: create it, then retry opening it once. */
+    rgw_bucket pool(log_pool);
+    r = create_pool(pool);
+    if (r < 0) {
+      return r;
+    }
+    r = rados->ioctx_create(log_pool, io_ctx);
+  }
+  if (r < 0) {
+    return r;
+  }
+
+  /* Plain member assignment instead of designated initializers, which are
+   * not standard C++ before C++20. */
+  objexp_hint_entry he;
+  he.bucket_name = bucket_name;
+  he.bucket_id = bucket_id;
+  he.obj_key = obj_key;
+  he.exp_time = delete_at;
+
+  bufferlist hebl;
+  ::encode(he, hebl);
+
+  const string keyext = objexp_hint_get_keyext(bucket_name, bucket_id, obj_key);
+  ObjectWriteOperation op;
+  cls_timeindex_add(op, delete_at, keyext, hebl);
+
+  const string shard_name = objexp_hint_get_shardname(delete_at);
+  return io_ctx.operate(shard_name, &op);
+}
+
+/**
+ * Pick the hint shard for the current position of an iteration over the time
+ * range [start_time, end_time] and advance the position by one time step.
+ *
+ * All three timestamps are converted to time-step indexes by shifting their
+ * seconds right by rgw_objexp_time_step_exp. Consecutive step indexes map to
+ * consecutive shards (modulo rgw_objexp_hints_num_shards), so at most
+ * num_shards steps have to be walked to touch every shard in the range.
+ *
+ * @param start_time beginning of the range
+ * @param end_time   end of the range
+ * @param marker     in/out: current position; a zero marker starts at
+ *                   start_time. Advanced by one step when truncated is true.
+ * @param shard      out: shard object name for the marker on entry
+ * @param truncated  out: true when another call is needed to reach the
+ *                   remaining shards
+ */
+void RGWRados::objexp_get_shard(const utime_t& start_time,
+                                const utime_t& end_time,
+                                utime_t &marker,                       /* in/out */
+                                string& shard,                         /* out */
+                                bool& truncated)                       /* out */
+{
+  if (marker.is_zero()) {
+    marker = start_time;
+  }
+
+  const uint32_t time_step_exp = cct->_conf->rgw_objexp_time_step_exp;
+  const uint32_t num_shards = cct->_conf->rgw_objexp_hints_num_shards;
+  /* Shift in time_t rather than int so large exponents do not overflow. */
+  const time_t time_step = static_cast<time_t>(1) << time_step_exp;
+
+  const time_t sts = start_time.sec() >> time_step_exp;
+  const time_t ets = end_time.sec() >> time_step_exp;
+  const time_t mts = marker.sec() >> time_step_exp;
+
+  /* sts/ets are already expressed in steps, so their difference is the
+   * number of steps to walk; it must not be divided by the step length
+   * again. After num_shards - 1 further steps every shard has been seen. */
+  const uint32_t periods = ets - sts;
+  const uint32_t iters = min(periods, num_shards - 1);
+
+  shard = objexp_hint_get_shardname(marker);
+
+  /* Compare step distances instead of shard numbers: a modulo comparison
+   * gives the wrong answer once the shard index wraps around num_shards. */
+  if (mts - sts < static_cast<time_t>(iters)) {
+    truncated = true;
+    marker += utime_t(time_step, 0);
+  } else {
+    truncated = false;
+  }
+}
+
+/**
+ * List time-index entries of hint object oid that fall between start_time
+ * and end_time.
+ *
+ * @param oid         hint shard object to read
+ * @param start_time  lower bound passed to cls_timeindex_list
+ * @param end_time    upper bound passed to cls_timeindex_list
+ * @param max_entries maximum number of entries requested
+ * @param marker      position to continue listing from
+ * @param entries     out: entries filled in by the list operation
+ * @param out_marker  out: marker filled in by the list operation
+ * @param truncated   out: truncation flag filled in by the list operation;
+ *                    forced to false when the object does not exist
+ * @return 0 on success or when the object is missing (-ENOENT), otherwise
+ *         the negative error code
+ */
+int RGWRados::objexp_hint_list(const string& oid,
+                               const utime_t& start_time,
+                               const utime_t& end_time,
+                               const int max_entries,
+                               const string& marker,
+                               list<cls_timeindex_entry>& entries, /* out */
+                               string *out_marker,                 /* out */
+                               bool *truncated)                    /* out */
+{
+  librados::IoCtx io_ctx;
+
+  const char * const log_pool = zone.log_pool.name.c_str();
+  const int open_ret = rados->ioctx_create(log_pool, io_ctx);
+  if (open_ret < 0) {
+    return open_ret;
+  }
+
+  librados::ObjectReadOperation op;
+  cls_timeindex_list(op, start_time, end_time, marker, max_entries, entries,
+                     out_marker, truncated);
+
+  bufferlist obl;
+  const int read_ret = io_ctx.operate(oid, &op, &obl);
+
+  if (read_ret == -ENOENT) {
+    /* A missing shard object is not an error; there is nothing more to list. */
+    if (truncated) {
+      *truncated = false;
+    }
+    return 0;
+  }
+
+  if (read_ret < 0) {
+    return read_ret;
+  }
+
+  return 0;
+}
+
+/**
+ * Decode the value of a time-index entry into an objexp_hint_entry.
+ *
+ * @param ti_entry   in: time-index entry whose value holds the encoded hint
+ * @param hint_entry out: decoded hint; contents are unspecified on failure
+ * @return 0 on success, -EIO if the value cannot be decoded
+ */
+int RGWRados::objexp_hint_parse(cls_timeindex_entry &ti_entry,  /* in */
+                                objexp_hint_entry& hint_entry)  /* out */
+{
+  try {
+    bufferlist::iterator iter = ti_entry.value.begin();
+    ::decode(hint_entry, iter);
+  } catch (const buffer::error& err) {
+    /* Report the failure to the caller instead of returning success with a
+     * partially decoded hint. */
+    ldout(cct, 0) << "ERROR: couldn't decode objexp_hint_entry: "
+                  << err.what() << dendl;
+    return -EIO;
+  }
+
+  return 0;
+}
+
+/**
+ * Trim time-index entries of hint object oid between start_time and
+ * end_time, optionally bounded by from_marker/to_marker.
+ *
+ * @return 0 on success or when the trim reports -ENOENT, otherwise the
+ *         negative error code from opening the log pool or from the trim
+ */
+int RGWRados::objexp_hint_trim(const string& oid,
+                               const utime_t& start_time,
+                               const utime_t& end_time,
+                               const string& from_marker,
+                               const string& to_marker)
+{
+  librados::IoCtx io_ctx;
+
+  const char * const log_pool = zone.log_pool.name.c_str();
+  const int open_ret = rados->ioctx_create(log_pool, io_ctx);
+  if (open_ret < 0) {
+    return open_ret;
+  }
+
+  const int trim_ret = cls_timeindex_trim(io_ctx, oid, start_time, end_time,
+                                          from_marker, to_marker);
+  if (trim_ret == -ENOENT) {
+    /* -ENOENT from the trim is treated as success. */
+    return 0;
+  }
+  if (trim_ret < 0) {
+    return trim_ret;
+  }
+
+  return 0;
+}
int RGWRados::lock_exclusive(rgw_bucket& pool, const string& oid, utime_t& duration,
string& zone_id, string& owner_id) {
uint64_t obj_size = state->size;
+ if (!params.expiration_time.is_zero()) {
+ bufferlist bl;
+ utime_t delete_at;
+
+ if (state->get_attr(RGW_ATTR_DELETE_AT, bl)) {
+ try {
+ bufferlist::iterator iter = bl.begin();
+ ::decode(delete_at, iter);
+ } catch (buffer::error& err) {
+ dout(5) << "ERROR: couldn't decode RGW_ATTR_DELETE_AT" << dendl;
+ }
+
+ if (params.expiration_time != delete_at) {
+ return -ERR_PRECONDITION_FAILED;
+ }
+ }
+ }
+
ObjectWriteOperation op;
r = target->prepare_atomic_modification(op, false, NULL, NULL, NULL, true);
return 0;
}
-int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, rgw_obj& obj,
- int versioning_status, uint16_t bilog_flags)
+int RGWRados::delete_obj(RGWObjectCtx& obj_ctx,
+ RGWBucketInfo& bucket_info,
+ rgw_obj& obj,
+ int versioning_status,
+ uint16_t bilog_flags,
+ const utime_t& expiration_time) /* new parameter: copied into del_op.params below */
{
RGWRados::Object del_target(this, bucket_info, obj_ctx, obj);
RGWRados::Object::Delete del_op(&del_target);
del_op.params.bucket_owner = bucket_info.owner;
del_op.params.versioning_status = versioning_status;
del_op.params.bilog_flags = bilog_flags;
+ del_op.params.expiration_time = expiration_time; /* NOTE(review): the delete path appears to compare a non-zero value against the stored RGW_ATTR_DELETE_AT and fail with -ERR_PRECONDITION_FAILED on mismatch -- confirm */
return del_op.delete_obj();
}
continue;
op.setxattr(name.c_str(), bl);
+
+ if (name.compare(RGW_ATTR_DELETE_AT) == 0) {
+ utime_t ts;
+ ::decode(ts, bl);
+
+ rgw_obj_key obj_key;
+ obj.get_index_key(&obj_key);
+
+ objexp_hint_add(ts, bucket.name, bucket.bucket_id, obj_key);
+ }
}
if (!op.size())
#include "cls/version/cls_version_types.h"
#include "cls/log/cls_log_types.h"
#include "cls/statelog/cls_statelog_types.h"
+#include "cls/timeindex/cls_timeindex_types.h"
#include "rgw_log.h"
#include "rgw_metadata.h"
#include "rgw_rest_conn.h"
};
WRITE_CLASS_ENCODER(RGWRegionMap)
+struct objexp_hint_entry { /* Payload of one object-expiration hint: objexp_hint_add()
+                            * encodes it as the value handed to cls_timeindex_add(), and
+                            * objexp_hint_parse() decodes it back from ti_entry.value. */
+ string bucket_name; /* set from objexp_hint_add()'s bucket_name argument */
+ string bucket_id; /* set from objexp_hint_add()'s bucket_id argument */
+ rgw_obj_key obj_key; /* key of the object the hint refers to */
+ utime_t exp_time; /* set to delete_at by objexp_hint_add() */
+
+ void encode(bufferlist& bl) const { /* serialize all four fields into bl */
+ ENCODE_START(1, 1, bl); /* NOTE(review): presumably struct version 1, compat version 1 -- confirm against the macro */
+ ::encode(bucket_name, bl);
+ ::encode(bucket_id, bl);
+ ::encode(obj_key, bl);
+ ::encode(exp_time, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) { /* read the fields back in the same order encode() writes them */
+ DECODE_START(1, bl);
+ ::decode(bucket_name, bl);
+ ::decode(bucket_id, bl);
+ ::decode(obj_key, bl);
+ ::decode(exp_time, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(objexp_hint_entry)
+
class RGWDataChangesLog;
class RGWReplicaLogger;
string marker_version_id;
uint32_t bilog_flags;
list<rgw_obj_key> *remove_objs;
+ utime_t expiration_time;
DeleteParams() : versioning_status(0), olh_epoch(0), bilog_flags(0), remove_objs(NULL) {}
} params;
int bucket_suspended(rgw_bucket& bucket, bool *suspended);
/** Delete an object.*/
- virtual int delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_owner, rgw_obj& src_obj,
- int versioning_status, uint16_t bilog_flags = 0);
+ virtual int delete_obj(RGWObjectCtx& obj_ctx,
+ RGWBucketInfo& bucket_owner,
+ rgw_obj& src_obj,
+ int versioning_status,
+ uint16_t bilog_flags = 0,
+ const utime_t& expiration_time = utime_t()); /* defaults to a default-constructed utime_t; NOTE(review): presumably zero, i.e. no expiration-time precondition -- confirm */
/* Delete a system object */
virtual int delete_system_obj(rgw_obj& src_obj, RGWObjVersionTracker *objv_tracker = NULL);
int time_log_info(const string& oid, cls_log_header *header);
int time_log_trim(const string& oid, const utime_t& start_time, const utime_t& end_time,
const string& from_marker, const string& to_marker);
+
+ string objexp_hint_get_shardname(const utime_t &ts); /* returns "time_index_hint." followed by a zero-padded shard number derived from ts */
+ void objexp_get_shard(const utime_t& start_time,
+ const utime_t& end_time,
+ utime_t &marker, /* in/out: a zero marker starts at start_time; advanced when truncated is set */
+ string& shard, /* out */
+ bool& truncated); /* out */
+ int objexp_hint_add(const utime_t& delete_at, /* also selects the shard the hint is written to */
+ const string& bucket_name,
+ const string& bucket_id,
+ const rgw_obj_key& obj_key);
+ int objexp_hint_list(const string& oid,
+ const utime_t& start_time,
+ const utime_t& end_time,
+ const int max_entries,
+ const string& marker,
+ list<cls_timeindex_entry>& entries, /* out */
+ string *out_marker, /* out */
+ bool *truncated); /* out; set to false when the object does not exist */
+ int objexp_hint_parse(cls_timeindex_entry &ti_entry, /* in */
+ objexp_hint_entry& hint_entry); /* out */
+ int objexp_hint_trim(const string& oid,
+ const utime_t& start_time,
+ const utime_t& end_time,
+ const string& from_marker = std::string(), /* empty by default */
+ const string& to_marker = std::string()); /* empty by default */
+
int lock_exclusive(rgw_bucket& pool, const string& oid, utime_t& duration, string& zone_id, string& owner_id);
int unlock(rgw_bucket& pool, const string& oid, string& zone_id, string& owner_id);