move code out of RGWRados, refactor a bit to use rados svc.
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
#include "rgw_formats.h"
#include "rgw_usage.h"
#include "rgw_object_expirer_core.h"
+#include "rgw_zone.h"
+#include "services/svc_rados.h"
+#include "services/svc_zone.h"
#include "services/svc_sys_obj.h"
#include "cls/lock/cls_lock_client.h"
+#include "cls/timeindex/cls_timeindex_client.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
static string objexp_lock_name = "gc_process";
+static string objexp_hint_get_shardname(int shard_num)
+{
+ char buf[64];
+ snprintf(buf, sizeof(buf), "obj_delete_at_hint.%010u", (unsigned)shard_num);
+ return buf;
+}
+
+static int objexp_key_shard(const rgw_obj_index_key& key, int num_shards)
+{
+ string obj_key = key.name + key.instance;
+ return rgw_bucket_shard_index(obj_key, num_shards);
+}
+
+static string objexp_hint_get_keyext(const string& tenant_name,
+ const string& bucket_name,
+ const string& bucket_id,
+ const rgw_obj_key& obj_key) {
+ return tenant_name + (tenant_name.empty() ? "" : ":") + bucket_name + ":" + bucket_id +
+ ":" + obj_key.name + ":" + obj_key.instance;
+}
+
+static void objexp_get_shard(int shard_num,
+ string *shard)
+{
+ *shard = objexp_hint_get_shardname(shard_num);
+}
+
+static int objexp_hint_parse(CephContext *cct, cls_timeindex_entry &ti_entry,
+ objexp_hint_entry *hint_entry)
+{
+ try {
+ auto iter = ti_entry.value.cbegin();
+ decode(*hint_entry, iter);
+ } catch (buffer::error& err) {
+ ldout(cct, 0) << "ERROR: couldn't decode avail_pools" << dendl;
+ }
+
+ return 0;
+}
+
+int RGWObjExpStore::objexp_hint_add(const ceph::real_time& delete_at,
+ const string& tenant_name,
+ const string& bucket_name,
+ const string& bucket_id,
+ const rgw_obj_index_key& obj_key)
+{
+ const string keyext = objexp_hint_get_keyext(tenant_name, bucket_name,
+ bucket_id, obj_key);
+ objexp_hint_entry he = {
+ .tenant = tenant_name,
+ .bucket_name = bucket_name,
+ .bucket_id = bucket_id,
+ .obj_key = obj_key,
+ .exp_time = delete_at };
+ bufferlist hebl;
+ encode(he, hebl);
+ librados::ObjectWriteOperation op;
+ cls_timeindex_add(op, utime_t(delete_at), keyext, hebl);
+
+ string shard_name = objexp_hint_get_shardname(objexp_key_shard(obj_key, cct->_conf->rgw_objexp_hints_num_shards));
+ auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_zone_params().log_pool, shard_name));
+ int r = obj.open();
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
+ return r;
+ }
+ return obj.operate(&op, null_yield);
+}
+
+int RGWObjExpStore::objexp_hint_list(const string& oid,
+ const ceph::real_time& start_time,
+ const ceph::real_time& end_time,
+ const int max_entries,
+ const string& marker,
+ list<cls_timeindex_entry>& entries, /* out */
+ string *out_marker, /* out */
+ bool *truncated) /* out */
+{
+ librados::ObjectReadOperation op;
+ cls_timeindex_list(op, utime_t(start_time), utime_t(end_time), marker, max_entries, entries,
+ out_marker, truncated);
+
+ auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_zone_params().log_pool, oid));
+ int r = obj.open();
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
+ return r;
+ }
+ bufferlist obl;
+ int ret = obj.operate(&op, &obl, null_yield);
+
+ if ((ret < 0 ) && (ret != -ENOENT)) {
+ return ret;
+ }
+
+ if ((ret == -ENOENT) && truncated) {
+ *truncated = false;
+ }
+
+ return 0;
+}
+
+int RGWObjExpStore::objexp_hint_trim(const string& oid,
+ const ceph::real_time& start_time,
+ const ceph::real_time& end_time,
+ const string& from_marker,
+ const string& to_marker)
+{
+ auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_zone_params().log_pool, oid));
+ int r = obj.open();
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
+ return r;
+ }
+ auto& ref = obj.get_ref();
+ int ret = cls_timeindex_trim(ref.ioctx, ref.obj.oid, utime_t(start_time), utime_t(end_time),
+ from_marker, to_marker);
+ if ((ret < 0 ) && (ret != -ENOENT)) {
+ return ret;
+ }
+
+ return 0;
+}
+
int RGWObjectExpirer::init_bucket_info(const string& tenant_name,
const string& bucket_name,
const string& bucket_id,
ldout(store->ctx(), 15) << "got removal hint for: " << iter->key_ts.sec() \
<< " - " << iter->key_ext << dendl;
- int ret = store->objexp_hint_parse(*iter, hint);
+ int ret = objexp_hint_parse(store->ctx(), *iter, &hint);
if (ret < 0) {
ldout(store->ctx(), 1) << "cannot parse removal hint for " << hint.obj_key << dendl;
continue;
real_time rt_from = from.to_real_time();
real_time rt_to = to.to_real_time();
- int ret = store->objexp_hint_trim(shard, rt_from, rt_to,
- from_marker, to_marker);
+ int ret = exp_store.objexp_hint_trim(shard, rt_from, rt_to,
+ from_marker, to_marker);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR during trim: " << ret << dendl;
}
real_time rt_start = round_start.to_real_time();
list<cls_timeindex_entry> entries;
- ret = store->objexp_hint_list(shard, rt_last, rt_start,
- num_entries, marker, entries,
- &out_marker, &truncated);
+ ret = exp_store.objexp_hint_list(shard, rt_last, rt_start,
+ num_entries, marker, entries,
+ &out_marker, &truncated);
if (ret < 0) {
ldout(cct, 10) << "cannot get removal hints from shard: " << shard
<< dendl;
for (int i = 0; i < num_shards; i++) {
string shard;
- store->objexp_get_shard(i, shard);
+ objexp_get_shard(i, &shard);
ldout(store->ctx(), 20) << "processing shard = " << shard << dendl;
#include "rgw_formats.h"
#include "rgw_usage.h"
+class RGWSI_RADOS;
+class RGWSI_Zone;
+
+class RGWObjExpStore {
+ CephContext *cct;
+ RGWSI_RADOS *rados_svc;
+ RGWSI_Zone *zone_svc;
+public:
+ RGWObjExpStore(CephContext *_cct, RGWSI_RADOS *_rados_svc, RGWSI_Zone *_zone_svc) : cct(_cct),
+ rados_svc(_rados_svc),
+ zone_svc(_zone_svc) {}
+
+ int objexp_hint_add(const ceph::real_time& delete_at,
+ const string& tenant_name,
+ const string& bucket_name,
+ const string& bucket_id,
+ const rgw_obj_index_key& obj_key);
+
+ int objexp_hint_list(const string& oid,
+ const ceph::real_time& start_time,
+ const ceph::real_time& end_time,
+ const int max_entries,
+ const string& marker,
+ list<cls_timeindex_entry>& entries, /* out */
+ string *out_marker, /* out */
+ bool *truncated); /* out */
+
+ int objexp_hint_trim(const string& oid,
+ const ceph::real_time& start_time,
+ const ceph::real_time& end_time,
+ const string& from_marker,
+ const string& to_marker);
+};
+
class RGWObjectExpirer {
protected:
RGWRados *store;
+ RGWObjExpStore exp_store;
int init_bucket_info(const std::string& tenant_name,
const std::string& bucket_name,
public:
explicit RGWObjectExpirer(RGWRados *_store)
- : store(_store), worker(NULL) {
+ : store(_store),
+ exp_store(_store->ctx(), _store->svc.rados, _store->svc.zone),
+ worker(NULL) {
}
~RGWObjectExpirer() {
stop_processor();
}
+ int hint_add(const ceph::real_time& delete_at,
+ const string& tenant_name,
+ const string& bucket_name,
+ const string& bucket_id,
+ const rgw_obj_index_key& obj_key) {
+ return exp_store.objexp_hint_add(delete_at, tenant_name, bucket_name,
+ bucket_id, obj_key);
+ }
+
int garbage_single_object(objexp_hint_entry& hint);
void garbage_chunk(std::list<cls_timeindex_entry>& entries, /* in */
#include "cls/refcount/cls_refcount_client.h"
#include "cls/version/cls_version_client.h"
#include "cls/log/cls_log_client.h"
-#include "cls/timeindex/cls_timeindex_client.h"
#include "cls/lock/cls_lock_client.h"
#include "cls/user/cls_user_client.h"
#include "cls/otp/cls_otp_client.h"
return r;
}
-string RGWRados::objexp_hint_get_shardname(int shard_num)
-{
- char buf[32];
- snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
-
- string objname("obj_delete_at_hint.");
- return objname + buf;
-}
-
-int RGWRados::objexp_key_shard(const rgw_obj_index_key& key)
-{
- string obj_key = key.name + key.instance;
- int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
- return rgw_bucket_shard_index(obj_key, num_shards);
-}
-
-static string objexp_hint_get_keyext(const string& tenant_name,
- const string& bucket_name,
- const string& bucket_id,
- const rgw_obj_key& obj_key)
-{
- return tenant_name + (tenant_name.empty() ? "" : ":") + bucket_name + ":" + bucket_id +
- ":" + obj_key.name + ":" + obj_key.instance;
-}
-
-int RGWRados::objexp_hint_add(const ceph::real_time& delete_at,
- const string& tenant_name,
- const string& bucket_name,
- const string& bucket_id,
- const rgw_obj_index_key& obj_key)
-{
- const string keyext = objexp_hint_get_keyext(tenant_name, bucket_name,
- bucket_id, obj_key);
- objexp_hint_entry he = {
- .tenant = tenant_name,
- .bucket_name = bucket_name,
- .bucket_id = bucket_id,
- .obj_key = obj_key,
- .exp_time = delete_at };
- bufferlist hebl;
- encode(he, hebl);
- ObjectWriteOperation op;
- cls_timeindex_add(op, utime_t(delete_at), keyext, hebl);
-
- string shard_name = objexp_hint_get_shardname(objexp_key_shard(obj_key));
- return objexp_pool_ctx.operate(shard_name, &op);
-}
-
-void RGWRados::objexp_get_shard(int shard_num,
- string& shard) /* out */
-{
- shard = objexp_hint_get_shardname(shard_num);
-}
-
-int RGWRados::objexp_hint_list(const string& oid,
- const ceph::real_time& start_time,
- const ceph::real_time& end_time,
- const int max_entries,
- const string& marker,
- list<cls_timeindex_entry>& entries, /* out */
- string *out_marker, /* out */
- bool *truncated) /* out */
-{
- librados::ObjectReadOperation op;
- cls_timeindex_list(op, utime_t(start_time), utime_t(end_time), marker, max_entries, entries,
- out_marker, truncated);
-
- bufferlist obl;
- int ret = objexp_pool_ctx.operate(oid, &op, &obl);
-
- if ((ret < 0 ) && (ret != -ENOENT)) {
- return ret;
- }
-
- if ((ret == -ENOENT) && truncated) {
- *truncated = false;
- }
-
- return 0;
-}
-
-int RGWRados::objexp_hint_parse(cls_timeindex_entry &ti_entry, /* in */
- objexp_hint_entry& hint_entry) /* out */
-{
- try {
- auto iter = ti_entry.value.cbegin();
- decode(hint_entry, iter);
- } catch (buffer::error& err) {
- ldout(cct, 0) << "ERROR: couldn't decode avail_pools" << dendl;
- }
-
- return 0;
-}
-
-int RGWRados::objexp_hint_trim(const string& oid,
- const ceph::real_time& start_time,
- const ceph::real_time& end_time,
- const string& from_marker,
- const string& to_marker)
-{
- int ret = cls_timeindex_trim(objexp_pool_ctx, oid, utime_t(start_time), utime_t(end_time),
- from_marker, to_marker);
- if ((ret < 0 ) && (ret != -ENOENT)) {
- return ret;
- }
-
- return 0;
-}
-
int RGWRados::lock_exclusive(const rgw_pool& pool, const string& oid, timespan& duration,
string& zone_id, string& owner_id) {
librados::IoCtx io_ctx;
rgw_obj_index_key obj_key;
obj.key.get_index_key(&obj_key);
- r = store->objexp_hint_add(meta.delete_at,
- obj.bucket.tenant, obj.bucket.name, obj.bucket.bucket_id, obj_key);
+ r = store->obj_expirer->hint_add(meta.delete_at, obj.bucket.tenant, obj.bucket.name,
+ obj.bucket.bucket_id, obj_key);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: objexp_hint_add() returned r=" << r << ", object will not get removed" << dendl;
/* ignoring error, nothing we can do at this point */
rgw_obj_index_key obj_key;
obj.key.get_index_key(&obj_key);
- objexp_hint_add(ts, bucket.tenant, bucket.name, bucket.bucket_id, obj_key);
+ obj_expirer->hint_add(ts, bucket.tenant, bucket.name, bucket.bucket_id, obj_key);
} catch (buffer::error& err) {
ldout(cct, 0) << "ERROR: failed to decode " RGW_ATTR_DELETE_AT << " attr" << dendl;
}
const string& from_marker, const string& to_marker,
librados::AioCompletion *completion = nullptr);
- string objexp_hint_get_shardname(int shard_num);
- int objexp_key_shard(const rgw_obj_index_key& key);
- void objexp_get_shard(int shard_num,
- string& shard); /* out */
- int objexp_hint_add(const ceph::real_time& delete_at,
- const string& tenant_name,
- const string& bucket_name,
- const string& bucket_id,
- const rgw_obj_index_key& obj_key);
- int objexp_hint_list(const string& oid,
- const ceph::real_time& start_time,
- const ceph::real_time& end_time,
- const int max_entries,
- const string& marker,
- list<cls_timeindex_entry>& entries, /* out */
- string *out_marker, /* out */
- bool *truncated); /* out */
- int objexp_hint_parse(cls_timeindex_entry &ti_entry,
- objexp_hint_entry& hint_entry); /* out */
- int objexp_hint_trim(const string& oid,
- const ceph::real_time& start_time,
- const ceph::real_time& end_time,
- const string& from_marker = std::string(),
- const string& to_marker = std::string());
-
int lock_exclusive(const rgw_pool& pool, const string& oid, ceph::timespan& duration, string& zone_id, string& owner_id);
int unlock(const rgw_pool& pool, const string& oid, string& zone_id, string& owner_id);
rgw_rados_ref& get_ref() { return ref; }
const rgw_rados_ref& get_ref() const { return ref; }
+
+ const rgw_raw_obj& get_raw_obj() const {
+ return ref.obj;
+ }
};
class Pool {
friend Pool;
friend Pool::List;
};
+
+inline ostream& operator<<(ostream& out, const RGWSI_RADOS::Obj& obj) {
+ return out << obj.get_raw_obj();
+}