]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: object expirer: move code around
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 8 Feb 2019 00:17:29 +0000 (16:17 -0800)
committerCasey Bodley <cbodley@redhat.com>
Mon, 29 Jul 2019 19:20:45 +0000 (15:20 -0400)
move code out of RGWRados, refactor a bit to use rados svc.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_object_expirer_core.cc
src/rgw/rgw_object_expirer_core.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/services/svc_rados.h

index 93d240f6e84e53b52e1d99d8f073370f4bf392b1..9a7447e95b5df4b764a023334392df41687d4fe3 100644 (file)
 #include "rgw_formats.h"
 #include "rgw_usage.h"
 #include "rgw_object_expirer_core.h"
+#include "rgw_zone.h"
 
+#include "services/svc_rados.h"
+#include "services/svc_zone.h"
 #include "services/svc_sys_obj.h"
 
 #include "cls/lock/cls_lock_client.h"
+#include "cls/timeindex/cls_timeindex_client.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
 static string objexp_lock_name = "gc_process";
 
+static string objexp_hint_get_shardname(int shard_num)
+{
+  char buf[64];
+  snprintf(buf, sizeof(buf), "obj_delete_at_hint.%010u", (unsigned)shard_num);
+  return buf;
+}
+
+static int objexp_key_shard(const rgw_obj_index_key& key, int num_shards)
+{
+  string obj_key = key.name + key.instance;
+  return rgw_bucket_shard_index(obj_key, num_shards);
+}
+
+static string objexp_hint_get_keyext(const string& tenant_name,
+                                     const string& bucket_name,
+                                     const string& bucket_id,
+                                     const rgw_obj_key& obj_key) {
+  return tenant_name + (tenant_name.empty() ? "" : ":") + bucket_name + ":" + bucket_id +
+    ":" + obj_key.name + ":" + obj_key.instance;
+}
+
+static void objexp_get_shard(int shard_num,
+                             string *shard)
+{
+  *shard = objexp_hint_get_shardname(shard_num);
+}
+
+static int objexp_hint_parse(CephContext *cct, cls_timeindex_entry &ti_entry,
+                             objexp_hint_entry *hint_entry)
+{
+  try {
+    auto iter = ti_entry.value.cbegin();
+    decode(*hint_entry, iter);
+  } catch (buffer::error& err) {
+    ldout(cct, 0) << "ERROR: couldn't decode avail_pools" << dendl;
+  }
+
+  return 0;
+}
+
+int RGWObjExpStore::objexp_hint_add(const ceph::real_time& delete_at,
+                              const string& tenant_name,
+                              const string& bucket_name,
+                              const string& bucket_id,
+                              const rgw_obj_index_key& obj_key)
+{
+  const string keyext = objexp_hint_get_keyext(tenant_name, bucket_name,
+          bucket_id, obj_key);
+  objexp_hint_entry he = {
+      .tenant = tenant_name,
+      .bucket_name = bucket_name,
+      .bucket_id = bucket_id,
+      .obj_key = obj_key,
+      .exp_time = delete_at };
+  bufferlist hebl;
+  encode(he, hebl);
+  librados::ObjectWriteOperation op;
+  cls_timeindex_add(op, utime_t(delete_at), keyext, hebl);
+
+  string shard_name = objexp_hint_get_shardname(objexp_key_shard(obj_key, cct->_conf->rgw_objexp_hints_num_shards));
+  auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_zone_params().log_pool, shard_name));
+  int r = obj.open();
+  if (r < 0) {
+    ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
+    return r;
+  }
+  return obj.operate(&op, null_yield);
+}
+
+int RGWObjExpStore::objexp_hint_list(const string& oid,
+                               const ceph::real_time& start_time,
+                               const ceph::real_time& end_time,
+                               const int max_entries,
+                               const string& marker,
+                               list<cls_timeindex_entry>& entries, /* out */
+                               string *out_marker,                 /* out */
+                               bool *truncated)                    /* out */
+{
+  librados::ObjectReadOperation op;
+  cls_timeindex_list(op, utime_t(start_time), utime_t(end_time), marker, max_entries, entries,
+        out_marker, truncated);
+
+  auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_zone_params().log_pool, oid));
+  int r = obj.open();
+  if (r < 0) {
+    ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
+    return r;
+  }
+  bufferlist obl;
+  int ret = obj.operate(&op, &obl, null_yield);
+
+  if ((ret < 0 ) && (ret != -ENOENT)) {
+    return ret;
+  }
+
+  if ((ret == -ENOENT) && truncated) {
+    *truncated = false;
+  }
+
+  return 0;
+}
+
+int RGWObjExpStore::objexp_hint_trim(const string& oid,
+                               const ceph::real_time& start_time,
+                               const ceph::real_time& end_time,
+                               const string& from_marker,
+                               const string& to_marker)
+{
+  auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_zone_params().log_pool, oid));
+  int r = obj.open();
+  if (r < 0) {
+    ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
+    return r;
+  }
+  auto& ref = obj.get_ref();
+  int ret = cls_timeindex_trim(ref.ioctx, ref.obj.oid, utime_t(start_time), utime_t(end_time),
+          from_marker, to_marker);
+  if ((ret < 0 ) && (ret != -ENOENT)) {
+    return ret;
+  }
+
+  return 0;
+}
+
 int RGWObjectExpirer::init_bucket_info(const string& tenant_name,
                                        const string& bucket_name,
                                        const string& bucket_id,
@@ -106,7 +234,7 @@ void RGWObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries,      /*
     ldout(store->ctx(), 15) << "got removal hint for: " << iter->key_ts.sec() \
         << " - " << iter->key_ext << dendl;
 
-    int ret = store->objexp_hint_parse(*iter, hint);
+    int ret = objexp_hint_parse(store->ctx(), *iter, &hint);
     if (ret < 0) {
       ldout(store->ctx(), 1) << "cannot parse removal hint for " << hint.obj_key << dendl;
       continue;
@@ -139,8 +267,8 @@ void RGWObjectExpirer::trim_chunk(const string& shard,
   real_time rt_from = from.to_real_time();
   real_time rt_to = to.to_real_time();
 
-  int ret = store->objexp_hint_trim(shard, rt_from, rt_to,
-                                    from_marker, to_marker);
+  int ret = exp_store.objexp_hint_trim(shard, rt_from, rt_to,
+                                       from_marker, to_marker);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR during trim: " << ret << dendl;
   }
@@ -180,9 +308,9 @@ bool RGWObjectExpirer::process_single_shard(const string& shard,
     real_time rt_start = round_start.to_real_time();
 
     list<cls_timeindex_entry> entries;
-    ret = store->objexp_hint_list(shard, rt_last, rt_start,
-                                  num_entries, marker, entries,
-                                  &out_marker, &truncated);
+    ret = exp_store.objexp_hint_list(shard, rt_last, rt_start,
+                                     num_entries, marker, entries,
+                                     &out_marker, &truncated);
     if (ret < 0) {
       ldout(cct, 10) << "cannot get removal hints from shard: " << shard
                      << dendl;
@@ -219,7 +347,7 @@ bool RGWObjectExpirer::inspect_all_shards(const utime_t& last_run,
 
   for (int i = 0; i < num_shards; i++) {
     string shard;
-    store->objexp_get_shard(i, shard);
+    objexp_get_shard(i, &shard);
 
     ldout(store->ctx(), 20) << "processing shard = " << shard << dendl;
 
index c3caff5cc517618bb7b285e0738a03cce5c48184..a9a6d1dea11e94296a2c237fb3d34a7c62b5861d 100644 (file)
 #include "rgw_formats.h"
 #include "rgw_usage.h"
 
+class RGWSI_RADOS;
+class RGWSI_Zone;
+
+class RGWObjExpStore {
+  CephContext *cct;
+  RGWSI_RADOS *rados_svc;
+  RGWSI_Zone *zone_svc;
+public:
+  RGWObjExpStore(CephContext *_cct, RGWSI_RADOS *_rados_svc, RGWSI_Zone *_zone_svc) : cct(_cct),
+                                                                                      rados_svc(_rados_svc),
+                                                                                      zone_svc(_zone_svc) {}
+
+  int objexp_hint_add(const ceph::real_time& delete_at,
+                      const string& tenant_name,
+                      const string& bucket_name,
+                      const string& bucket_id,
+                      const rgw_obj_index_key& obj_key);
+
+  int objexp_hint_list(const string& oid,
+                       const ceph::real_time& start_time,
+                       const ceph::real_time& end_time,
+                       const int max_entries,
+                       const string& marker,
+                       list<cls_timeindex_entry>& entries, /* out */
+                       string *out_marker,                 /* out */
+                       bool *truncated);                   /* out */
+
+  int objexp_hint_trim(const string& oid,
+                       const ceph::real_time& start_time,
+                       const ceph::real_time& end_time,
+                       const string& from_marker,
+                       const string& to_marker);
+};
+
 class RGWObjectExpirer {
 protected:
   RGWRados *store;
+  RGWObjExpStore exp_store;
 
   int init_bucket_info(const std::string& tenant_name,
                        const std::string& bucket_name,
@@ -69,12 +104,23 @@ protected:
 
 public:
   explicit RGWObjectExpirer(RGWRados *_store)
-    : store(_store), worker(NULL) {
+    : store(_store),
+      exp_store(_store->ctx(), _store->svc.rados, _store->svc.zone),
+      worker(NULL) {
   }
   ~RGWObjectExpirer() {
     stop_processor();
   }
 
+  int hint_add(const ceph::real_time& delete_at,
+               const string& tenant_name,
+               const string& bucket_name,
+               const string& bucket_id,
+               const rgw_obj_index_key& obj_key) {
+    return exp_store.objexp_hint_add(delete_at, tenant_name, bucket_name,
+                                     bucket_id, obj_key);
+  }
+
   int garbage_single_object(objexp_hint_entry& hint);
 
   void garbage_chunk(std::list<cls_timeindex_entry>& entries, /* in  */
index 14126ff3309b489c602ceb469b44d8a0332388f1..5aa04df32d3c9b5966e00114330e330d46120bef 100644 (file)
@@ -37,7 +37,6 @@
 #include "cls/refcount/cls_refcount_client.h"
 #include "cls/version/cls_version_client.h"
 #include "cls/log/cls_log_client.h"
-#include "cls/timeindex/cls_timeindex_client.h"
 #include "cls/lock/cls_lock_client.h"
 #include "cls/user/cls_user_client.h"
 #include "cls/otp/cls_otp_client.h"
@@ -2180,115 +2179,6 @@ int RGWRados::time_log_trim(const string& oid, const real_time& start_time, cons
   return r;
 }
 
-string RGWRados::objexp_hint_get_shardname(int shard_num)
-{
-  char buf[32];
-  snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
-
-  string objname("obj_delete_at_hint.");
-  return objname + buf;
-}
-
-int RGWRados::objexp_key_shard(const rgw_obj_index_key& key)
-{
-  string obj_key = key.name + key.instance;
-  int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
-  return rgw_bucket_shard_index(obj_key, num_shards);
-}
-
-static string objexp_hint_get_keyext(const string& tenant_name,
-                                     const string& bucket_name,
-                                     const string& bucket_id,
-                                     const rgw_obj_key& obj_key)
-{
-  return tenant_name + (tenant_name.empty() ? "" : ":") + bucket_name + ":" + bucket_id +
-      ":" + obj_key.name + ":" + obj_key.instance;
-}
-
-int RGWRados::objexp_hint_add(const ceph::real_time& delete_at,
-                              const string& tenant_name,
-                              const string& bucket_name,
-                              const string& bucket_id,
-                              const rgw_obj_index_key& obj_key)
-{
-  const string keyext = objexp_hint_get_keyext(tenant_name, bucket_name,
-          bucket_id, obj_key);
-  objexp_hint_entry he = {
-      .tenant = tenant_name,
-      .bucket_name = bucket_name,
-      .bucket_id = bucket_id,
-      .obj_key = obj_key,
-      .exp_time = delete_at };
-  bufferlist hebl;
-  encode(he, hebl);
-  ObjectWriteOperation op;
-  cls_timeindex_add(op, utime_t(delete_at), keyext, hebl);
-
-  string shard_name = objexp_hint_get_shardname(objexp_key_shard(obj_key));
-  return objexp_pool_ctx.operate(shard_name, &op);
-}
-
-void  RGWRados::objexp_get_shard(int shard_num,
-                                 string& shard)                       /* out */
-{
-  shard = objexp_hint_get_shardname(shard_num);
-}
-
-int RGWRados::objexp_hint_list(const string& oid,
-                               const ceph::real_time& start_time,
-                               const ceph::real_time& end_time,
-                               const int max_entries,
-                               const string& marker,
-                               list<cls_timeindex_entry>& entries, /* out */
-                               string *out_marker,                 /* out */
-                               bool *truncated)                    /* out */
-{
-  librados::ObjectReadOperation op;
-  cls_timeindex_list(op, utime_t(start_time), utime_t(end_time), marker, max_entries, entries,
-        out_marker, truncated);
-
-  bufferlist obl;
-  int ret = objexp_pool_ctx.operate(oid, &op, &obl);
-
-  if ((ret < 0 ) && (ret != -ENOENT)) {
-    return ret;
-  }
-
-  if ((ret == -ENOENT) && truncated) {
-    *truncated = false;
-  }
-
-  return 0;
-}
-
-int RGWRados::objexp_hint_parse(cls_timeindex_entry &ti_entry,  /* in */
-                                objexp_hint_entry& hint_entry)  /* out */
-{
-  try {
-    auto iter = ti_entry.value.cbegin();
-    decode(hint_entry, iter);
-  } catch (buffer::error& err) {
-    ldout(cct, 0) << "ERROR: couldn't decode avail_pools" << dendl;
-  }
-
-  return 0;
-}
-
-int RGWRados::objexp_hint_trim(const string& oid,
-                               const ceph::real_time& start_time,
-                               const ceph::real_time& end_time,
-                               const string& from_marker,
-                               const string& to_marker)
-{
-  int ret = cls_timeindex_trim(objexp_pool_ctx, oid, utime_t(start_time), utime_t(end_time),
-          from_marker, to_marker);
-  if ((ret < 0 ) && (ret != -ENOENT)) {
-    return ret;
-  }
-
-  return 0;
-}
-
 int RGWRados::lock_exclusive(const rgw_pool& pool, const string& oid, timespan& duration,
                              string& zone_id, string& owner_id) {
   librados::IoCtx io_ctx;
@@ -3755,8 +3645,8 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
     rgw_obj_index_key obj_key;
     obj.key.get_index_key(&obj_key);
 
-    r = store->objexp_hint_add(meta.delete_at,
-            obj.bucket.tenant, obj.bucket.name, obj.bucket.bucket_id, obj_key);
+    r = store->obj_expirer->hint_add(meta.delete_at, obj.bucket.tenant, obj.bucket.name,
+                                     obj.bucket.bucket_id, obj_key);
     if (r < 0) {
       ldout(store->ctx(), 0) << "ERROR: objexp_hint_add() returned r=" << r << ", object will not get removed" << dendl;
       /* ignoring error, nothing we can do at this point */
@@ -6256,7 +6146,7 @@ int RGWRados::set_attrs(void *ctx, const RGWBucketInfo& bucket_info, rgw_obj& sr
         rgw_obj_index_key obj_key;
         obj.key.get_index_key(&obj_key);
 
-        objexp_hint_add(ts, bucket.tenant, bucket.name, bucket.bucket_id, obj_key);
+        obj_expirer->hint_add(ts, bucket.tenant, bucket.name, bucket.bucket_id, obj_key);
       } catch (buffer::error& err) {
        ldout(cct, 0) << "ERROR: failed to decode " RGW_ATTR_DELETE_AT << " attr" << dendl;
       }
index 0c774dc247ac3206475f0edc05e155effeb16c89..e005c27ba461e93132ba6e8b17cc1e86aeb31bb5 100644 (file)
@@ -2272,31 +2272,6 @@ public:
                     const string& from_marker, const string& to_marker,
                     librados::AioCompletion *completion = nullptr);
 
-  string objexp_hint_get_shardname(int shard_num);
-  int objexp_key_shard(const rgw_obj_index_key& key);
-  void objexp_get_shard(int shard_num,
-                        string& shard);                       /* out */
-  int objexp_hint_add(const ceph::real_time& delete_at,
-                      const string& tenant_name,
-                      const string& bucket_name,
-                      const string& bucket_id,
-                      const rgw_obj_index_key& obj_key);
-  int objexp_hint_list(const string& oid,
-                       const ceph::real_time& start_time,
-                       const ceph::real_time& end_time,
-                       const int max_entries,
-                       const string& marker,
-                       list<cls_timeindex_entry>& entries, /* out */
-                       string *out_marker,                 /* out */
-                       bool *truncated);                   /* out */
-  int objexp_hint_parse(cls_timeindex_entry &ti_entry,
-                        objexp_hint_entry& hint_entry);    /* out */
-  int objexp_hint_trim(const string& oid,
-                       const ceph::real_time& start_time,
-                       const ceph::real_time& end_time,
-                       const string& from_marker = std::string(),
-                       const string& to_marker   = std::string());
-
   int lock_exclusive(const rgw_pool& pool, const string& oid, ceph::timespan& duration, string& zone_id, string& owner_id);
   int unlock(const rgw_pool& pool, const string& oid, string& zone_id, string& owner_id);
 
index ae9c2dcf91ba6f43eaf5b2c2743478e37a1fdf0f..c3ae766d3a6e6253ffaf2f9abaa69d7176131368 100644 (file)
@@ -90,6 +90,10 @@ public:
 
     rgw_rados_ref& get_ref() { return ref; }
     const rgw_rados_ref& get_ref() const { return ref; }
+
+    const rgw_raw_obj& get_raw_obj() const {
+      return ref.obj;
+    }
   };
 
   class Pool {
@@ -174,3 +178,7 @@ public:
   friend Pool;
   friend Pool::List;
 };
+
+inline ostream& operator<<(ostream& out, const RGWSI_RADOS::Obj& obj) {
+  return out << obj.get_raw_obj();
+}