From 744ded1928dc2776c1ffe287c49bbe3c092025ce Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 7 Feb 2019 16:17:29 -0800 Subject: [PATCH] rgw: object expirer: move code around move code out of RGWRados, refactor a bit to use rados svc. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_object_expirer_core.cc | 142 +++++++++++++++++++++++++++-- src/rgw/rgw_object_expirer_core.h | 48 +++++++++- src/rgw/rgw_rados.cc | 116 +---------------------- src/rgw/rgw_rados.h | 25 ----- src/rgw/services/svc_rados.h | 8 ++ 5 files changed, 193 insertions(+), 146 deletions(-) diff --git a/src/rgw/rgw_object_expirer_core.cc b/src/rgw/rgw_object_expirer_core.cc index 93d240f6e84..9a7447e95b5 100644 --- a/src/rgw/rgw_object_expirer_core.cc +++ b/src/rgw/rgw_object_expirer_core.cc @@ -30,16 +30,144 @@ #include "rgw_formats.h" #include "rgw_usage.h" #include "rgw_object_expirer_core.h" +#include "rgw_zone.h" +#include "services/svc_rados.h" +#include "services/svc_zone.h" #include "services/svc_sys_obj.h" #include "cls/lock/cls_lock_client.h" +#include "cls/timeindex/cls_timeindex_client.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw static string objexp_lock_name = "gc_process"; +static string objexp_hint_get_shardname(int shard_num) +{ + char buf[64]; + snprintf(buf, sizeof(buf), "obj_delete_at_hint.%010u", (unsigned)shard_num); + return buf; +} + +static int objexp_key_shard(const rgw_obj_index_key& key, int num_shards) +{ + string obj_key = key.name + key.instance; + return rgw_bucket_shard_index(obj_key, num_shards); +} + +static string objexp_hint_get_keyext(const string& tenant_name, + const string& bucket_name, + const string& bucket_id, + const rgw_obj_key& obj_key) { + return tenant_name + (tenant_name.empty() ? "" : ":") + bucket_name + ":" + bucket_id + + ":" + obj_key.name + ":" + obj_key.instance; +} + +static void objexp_get_shard(int shard_num, + string *shard) +{ + *shard = objexp_hint_get_shardname(shard_num); +} + +static int objexp_hint_parse(CephContext *cct, cls_timeindex_entry &ti_entry, + objexp_hint_entry *hint_entry) +{ + try { + auto iter = ti_entry.value.cbegin(); + decode(*hint_entry, iter); + } catch (buffer::error& err) { + ldout(cct, 0) << "ERROR: couldn't decode avail_pools" << dendl; + } + + return 0; +} + +int RGWObjExpStore::objexp_hint_add(const ceph::real_time& delete_at, + const string& tenant_name, + const string& bucket_name, + const string& bucket_id, + const rgw_obj_index_key& obj_key) +{ + const string keyext = objexp_hint_get_keyext(tenant_name, bucket_name, + bucket_id, obj_key); + objexp_hint_entry he = { + .tenant = tenant_name, + .bucket_name = bucket_name, + .bucket_id = bucket_id, + .obj_key = obj_key, + .exp_time = delete_at }; + bufferlist hebl; + encode(he, hebl); + librados::ObjectWriteOperation op; + cls_timeindex_add(op, utime_t(delete_at), keyext, hebl); + + string shard_name = objexp_hint_get_shardname(objexp_key_shard(obj_key, cct->_conf->rgw_objexp_hints_num_shards)); + auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_zone_params().log_pool, shard_name)); + int r = obj.open(); + if (r < 0) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl; + return r; + } + return obj.operate(&op, null_yield); +} + +int RGWObjExpStore::objexp_hint_list(const string& oid, + const ceph::real_time& start_time, + const ceph::real_time& end_time, + const int max_entries, + const string& marker, + list& entries, /* out */ + string *out_marker, /* out */ + bool *truncated) /* out */ +{ + librados::ObjectReadOperation op; + cls_timeindex_list(op, utime_t(start_time), utime_t(end_time), marker, max_entries, entries, + out_marker, truncated); + + auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_zone_params().log_pool, oid)); + int r = obj.open(); + if (r < 0) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl; + return r; + } + bufferlist obl; + int ret = obj.operate(&op, &obl, null_yield); + + if ((ret < 0 ) && (ret != -ENOENT)) { + return ret; + } + + if ((ret == -ENOENT) && truncated) { + *truncated = false; + } + + return 0; +} + +int RGWObjExpStore::objexp_hint_trim(const string& oid, + const ceph::real_time& start_time, + const ceph::real_time& end_time, + const string& from_marker, + const string& to_marker) +{ + auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_zone_params().log_pool, oid)); + int r = obj.open(); + if (r < 0) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl; + return r; + } + auto& ref = obj.get_ref(); + int ret = cls_timeindex_trim(ref.ioctx, ref.obj.oid, utime_t(start_time), utime_t(end_time), + from_marker, to_marker); + if ((ret < 0 ) && (ret != -ENOENT)) { + return ret; + } + + return 0; +} + int RGWObjectExpirer::init_bucket_info(const string& tenant_name, const string& bucket_name, const string& bucket_id, @@ -106,7 +234,7 @@ void RGWObjectExpirer::garbage_chunk(list& entries, /* ldout(store->ctx(), 15) << "got removal hint for: " << iter->key_ts.sec() \ << " - " << iter->key_ext << dendl; - int ret = store->objexp_hint_parse(*iter, hint); + int ret = objexp_hint_parse(store->ctx(), *iter, &hint); if (ret < 0) { ldout(store->ctx(), 1) << "cannot parse removal hint for " << hint.obj_key << dendl; continue; @@ -139,8 +267,8 @@ void RGWObjectExpirer::trim_chunk(const string& shard, real_time rt_from = from.to_real_time(); real_time rt_to = to.to_real_time(); - int ret = store->objexp_hint_trim(shard, rt_from, rt_to, - from_marker, to_marker); + int ret = exp_store.objexp_hint_trim(shard, rt_from, rt_to, + from_marker, to_marker); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR during trim: " << ret << dendl; } @@ -180,9 +308,9 @@ bool RGWObjectExpirer::process_single_shard(const string& shard, real_time rt_start = round_start.to_real_time(); list entries; - ret = store->objexp_hint_list(shard, rt_last, rt_start, - num_entries, marker, entries, - &out_marker, &truncated); + ret = exp_store.objexp_hint_list(shard, rt_last, rt_start, + num_entries, marker, entries, + &out_marker, &truncated); if (ret < 0) { ldout(cct, 10) << "cannot get removal hints from shard: " << shard << dendl; @@ -219,7 +347,7 @@ bool RGWObjectExpirer::inspect_all_shards(const utime_t& last_run, for (int i = 0; i < num_shards; i++) { string shard; - store->objexp_get_shard(i, shard); + objexp_get_shard(i, &shard); ldout(store->ctx(), 20) << "processing shard = " << shard << dendl; diff --git a/src/rgw/rgw_object_expirer_core.h b/src/rgw/rgw_object_expirer_core.h index c3caff5cc51..a9a6d1dea11 100644 --- a/src/rgw/rgw_object_expirer_core.h +++ b/src/rgw/rgw_object_expirer_core.h @@ -37,9 +37,44 @@ #include "rgw_formats.h" #include "rgw_usage.h" +class RGWSI_RADOS; +class RGWSI_Zone; + +class RGWObjExpStore { + CephContext *cct; + RGWSI_RADOS *rados_svc; + RGWSI_Zone *zone_svc; +public: + RGWObjExpStore(CephContext *_cct, RGWSI_RADOS *_rados_svc, RGWSI_Zone *_zone_svc) : cct(_cct), + rados_svc(_rados_svc), + zone_svc(_zone_svc) {} + + int objexp_hint_add(const ceph::real_time& delete_at, + const string& tenant_name, + const string& bucket_name, + const string& bucket_id, + const rgw_obj_index_key& obj_key); + + int objexp_hint_list(const string& oid, + const ceph::real_time& start_time, + const ceph::real_time& end_time, + const int max_entries, + const string& marker, + list& entries, /* out */ + string *out_marker, /* out */ + bool *truncated); /* out */ + + int objexp_hint_trim(const string& oid, + const ceph::real_time& start_time, + const ceph::real_time& end_time, + const string& from_marker, + const string& to_marker); +}; + class RGWObjectExpirer { protected: RGWRados *store; + RGWObjExpStore exp_store; int init_bucket_info(const std::string& tenant_name, const std::string& bucket_name, @@ -69,12 +104,23 @@ protected: public: explicit RGWObjectExpirer(RGWRados *_store) - : store(_store), worker(NULL) { + : store(_store), + exp_store(_store->ctx(), _store->svc.rados, _store->svc.zone), + worker(NULL) { } ~RGWObjectExpirer() { stop_processor(); } + int hint_add(const ceph::real_time& delete_at, + const string& tenant_name, + const string& bucket_name, + const string& bucket_id, + const rgw_obj_index_key& obj_key) { + return exp_store.objexp_hint_add(delete_at, tenant_name, bucket_name, + bucket_id, obj_key); + } + int garbage_single_object(objexp_hint_entry& hint); void garbage_chunk(std::list& entries, /* in */ diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 14126ff3309..5aa04df32d3 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -37,7 +37,6 @@ #include "cls/refcount/cls_refcount_client.h" #include "cls/version/cls_version_client.h" #include "cls/log/cls_log_client.h" -#include "cls/timeindex/cls_timeindex_client.h" #include "cls/lock/cls_lock_client.h" #include "cls/user/cls_user_client.h" #include "cls/otp/cls_otp_client.h" @@ -2180,115 +2179,6 @@ int RGWRados::time_log_trim(const string& oid, const real_time& start_time, cons return r; } -string RGWRados::objexp_hint_get_shardname(int shard_num) -{ - char buf[32]; - snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num); - - string objname("obj_delete_at_hint."); - return objname + buf; -} - -int RGWRados::objexp_key_shard(const rgw_obj_index_key& key) -{ - string obj_key = key.name + key.instance; - int num_shards = cct->_conf->rgw_objexp_hints_num_shards; - return rgw_bucket_shard_index(obj_key, num_shards); -} - -static string objexp_hint_get_keyext(const string& tenant_name, - const string& bucket_name, - const string& bucket_id, - const rgw_obj_key& obj_key) -{ - return tenant_name + (tenant_name.empty() ? "" : ":") + bucket_name + ":" + bucket_id + - ":" + obj_key.name + ":" + obj_key.instance; -} - -int RGWRados::objexp_hint_add(const ceph::real_time& delete_at, - const string& tenant_name, - const string& bucket_name, - const string& bucket_id, - const rgw_obj_index_key& obj_key) -{ - const string keyext = objexp_hint_get_keyext(tenant_name, bucket_name, - bucket_id, obj_key); - objexp_hint_entry he = { - .tenant = tenant_name, - .bucket_name = bucket_name, - .bucket_id = bucket_id, - .obj_key = obj_key, - .exp_time = delete_at }; - bufferlist hebl; - encode(he, hebl); - ObjectWriteOperation op; - cls_timeindex_add(op, utime_t(delete_at), keyext, hebl); - - string shard_name = objexp_hint_get_shardname(objexp_key_shard(obj_key)); - return objexp_pool_ctx.operate(shard_name, &op); -} - -void RGWRados::objexp_get_shard(int shard_num, - string& shard) /* out */ -{ - shard = objexp_hint_get_shardname(shard_num); -} - -int RGWRados::objexp_hint_list(const string& oid, - const ceph::real_time& start_time, - const ceph::real_time& end_time, - const int max_entries, - const string& marker, - list& entries, /* out */ - string *out_marker, /* out */ - bool *truncated) /* out */ -{ - librados::ObjectReadOperation op; - cls_timeindex_list(op, utime_t(start_time), utime_t(end_time), marker, max_entries, entries, - out_marker, truncated); - - bufferlist obl; - int ret = objexp_pool_ctx.operate(oid, &op, &obl); - - if ((ret < 0 ) && (ret != -ENOENT)) { - return ret; - } - - if ((ret == -ENOENT) && truncated) { - *truncated = false; - } - - return 0; -} - -int RGWRados::objexp_hint_parse(cls_timeindex_entry &ti_entry, /* in */ - objexp_hint_entry& hint_entry) /* out */ -{ - try { - auto iter = ti_entry.value.cbegin(); - decode(hint_entry, iter); - } catch (buffer::error& err) { - ldout(cct, 0) << "ERROR: couldn't decode avail_pools" << dendl; - } - - return 0; -} - -int RGWRados::objexp_hint_trim(const string& oid, - const ceph::real_time& start_time, - const ceph::real_time& end_time, - const string& from_marker, - const string& to_marker) -{ - int ret = cls_timeindex_trim(objexp_pool_ctx, oid, utime_t(start_time), utime_t(end_time), - from_marker, to_marker); - if ((ret < 0 ) && (ret != -ENOENT)) { - return ret; - } - - return 0; -} - int RGWRados::lock_exclusive(const rgw_pool& pool, const string& oid, timespan& duration, string& zone_id, string& owner_id) { librados::IoCtx io_ctx; @@ -3755,8 +3645,8 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si rgw_obj_index_key obj_key; obj.key.get_index_key(&obj_key); - r = store->objexp_hint_add(meta.delete_at, - obj.bucket.tenant, obj.bucket.name, obj.bucket.bucket_id, obj_key); + r = store->obj_expirer->hint_add(meta.delete_at, obj.bucket.tenant, obj.bucket.name, + obj.bucket.bucket_id, obj_key); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: objexp_hint_add() returned r=" << r << ", object will not get removed" << dendl; /* ignoring error, nothing we can do at this point */ @@ -6256,7 +6146,7 @@ int RGWRados::set_attrs(void *ctx, const RGWBucketInfo& bucket_info, rgw_obj& sr rgw_obj_index_key obj_key; obj.key.get_index_key(&obj_key); - objexp_hint_add(ts, bucket.tenant, bucket.name, bucket.bucket_id, obj_key); + obj_expirer->hint_add(ts, bucket.tenant, bucket.name, bucket.bucket_id, obj_key); } catch (buffer::error& err) { ldout(cct, 0) << "ERROR: failed to decode " RGW_ATTR_DELETE_AT << " attr" << dendl; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 0c774dc247a..e005c27ba46 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -2272,31 +2272,6 @@ public: const string& from_marker, const string& to_marker, librados::AioCompletion *completion = nullptr); - string objexp_hint_get_shardname(int shard_num); - int objexp_key_shard(const rgw_obj_index_key& key); - void objexp_get_shard(int shard_num, - string& shard); /* out */ - int objexp_hint_add(const ceph::real_time& delete_at, - const string& tenant_name, - const string& bucket_name, - const string& bucket_id, - const rgw_obj_index_key& obj_key); - int objexp_hint_list(const string& oid, - const ceph::real_time& start_time, - const ceph::real_time& end_time, - const int max_entries, - const string& marker, - list& entries, /* out */ - string *out_marker, /* out */ - bool *truncated); /* out */ - int objexp_hint_parse(cls_timeindex_entry &ti_entry, - objexp_hint_entry& hint_entry); /* out */ - int objexp_hint_trim(const string& oid, - const ceph::real_time& start_time, - const ceph::real_time& end_time, - const string& from_marker = std::string(), - const string& to_marker = std::string()); - int lock_exclusive(const rgw_pool& pool, const string& oid, ceph::timespan& duration, string& zone_id, string& owner_id); int unlock(const rgw_pool& pool, const string& oid, string& zone_id, string& owner_id); diff --git a/src/rgw/services/svc_rados.h b/src/rgw/services/svc_rados.h index ae9c2dcf91b..c3ae766d3a6 100644 --- a/src/rgw/services/svc_rados.h +++ b/src/rgw/services/svc_rados.h @@ -90,6 +90,10 @@ public: rgw_rados_ref& get_ref() { return ref; } const rgw_rados_ref& get_ref() const { return ref; } + + const rgw_raw_obj& get_raw_obj() const { + return ref.obj; + } }; class Pool { @@ -174,3 +178,7 @@ public: friend Pool; friend Pool::List; }; + +inline ostream& operator<<(ostream& out, const RGWSI_RADOS::Obj& obj) { + return out << obj.get_raw_obj(); +} -- 2.39.5