]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
src/tools: use the slice thing and make parallel (chunk_scrub)
authormyoungwon oh <omwmw@sk.com>
Sun, 18 Nov 2018 07:01:39 +0000 (16:01 +0900)
committermyoungwon oh <omwmw@sk.com>
Sat, 26 Jan 2019 03:10:44 +0000 (12:10 +0900)
Signed-off-by: Myoungwon Oh <omwmw@sk.com>
src/tools/ceph_dedup_tool.cc

index da3ac1ae06f3b6d658ef0af2586df326445646e2..2b4189083abd050956d7c2a8916d5dc1061d0d31 100644 (file)
@@ -105,20 +105,25 @@ class EstimateThread : public Thread
   string chunk_algo;
   string fp_algo;
   uint64_t chunk_size;
+  IoCtx chunk_io_ctx;
   map< string, pair <uint64_t, uint64_t> > local_chunk_statistics; // < key, <count, chunk_size> >
   typedef void (EstimateThread::*entry_func)();
   entry_func func;
 
   public:
   EstimateThread(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end, 
-               string chunk_algo, string fp_algo, uint64_t chunk_size):
+               string chunk_algo, string fp_algo, uint64_t chunk_size, IoCtx chunk_io_ctx):
     io_ctx(io_ctx), n(n), m(m), begin(begin), end(end), chunk_algo(chunk_algo), fp_algo(fp_algo),
-    chunk_size(chunk_size) {}
+    chunk_size(chunk_size), chunk_io_ctx(chunk_io_ctx) {}
 
   void set_estimate_dedup_ratio() {
     func = &EstimateThread::estimate_dedup_ratio;
   }
+  void set_chunk_scrub_common() {
+    func = &EstimateThread::chunk_scrub_common;
+  }
   void estimate_dedup_ratio();
+  void chunk_scrub_common();
 
 };
 
@@ -160,7 +165,6 @@ void EstimateThread::estimate_dedup_ratio()
       const auto &oid = i.oid;
       uint64_t offset = 0;
       while (true) {
-       cout << __func__ << oid << std::endl;
        bufferlist outdata;
        ret = io_ctx.read(oid, outdata, op_size, offset);
        if (ret <= 0) {
@@ -212,6 +216,58 @@ void EstimateThread::estimate_dedup_ratio()
   chunk_statistics.insert(local_chunk_statistics.begin(), local_chunk_statistics.end());
 }
 
+void EstimateThread::chunk_scrub_common()
+{
+  ObjectCursor shard_start;
+  ObjectCursor shard_end;
+  int ret;
+
+  chunk_io_ctx.object_list_slice(
+    begin,
+    end,
+    n,
+    m,
+    &shard_start,
+    &shard_end);
+
+  ObjectCursor c(shard_start);
+  while(c < shard_end)
+  {
+    std::vector<ObjectItem> result;
+    int r = chunk_io_ctx.object_list(c, shard_end, 12, {}, &result, &c);
+    if (r < 0 ){
+      cerr << "error object_list : " << cpp_strerror(r) << std::endl;
+      return;
+    }
+
+    for (const auto & i : result) {
+      auto oid = i.oid;
+      set<hobject_t> refs;
+      set<hobject_t> real_refs;
+      ret = cls_chunk_refcount_read(chunk_io_ctx, oid, &refs);
+      if (ret < 0) {
+       continue;
+      }
+
+      for (auto pp : refs) {
+       ret = cls_chunk_has_chunk(io_ctx, pp.oid.name, oid);
+       if (ret != -ENOENT) {
+         real_refs.insert(pp);
+       } 
+      }
+
+      if (refs.size() != real_refs.size()) {
+       ObjectWriteOperation op;
+       cls_chunk_refcount_set(op, real_refs);
+       ret = chunk_io_ctx.operate(oid, &op);
+       if (ret < 0) {
+         continue;
+       }
+      }
+    }
+  }
+}
+
 int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
                          std::vector<const char*> &nargs)
 {
@@ -300,7 +356,7 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
     ObjectCursor begin = io_ctx.object_list_begin();
     ObjectCursor end = io_ctx.object_list_end();
     std::unique_ptr<EstimateThread> ptr (new EstimateThread(io_ctx, i, max_thread, begin, end,
-                                                           chunk_algo, fp_algo, chunk_size));
+                                                           chunk_algo, fp_algo, chunk_size, IoCtx()));
     ptr->set_estimate_dedup_ratio();
     ptr->create("estimate_thread");
     estimate_threads.push_back(move(ptr));
@@ -323,6 +379,7 @@ int chunk_scrub_common(const std::map < std::string, std::string > &opts,
   std::string object_name, target_object_name;
   string pool_name, chunk_pool_name, op_name;
   int ret;
+  unsigned max_thread = default_max_thread;
   std::map<std::string, std::string>::const_iterator i;
 
   i = opts.find("pool");
@@ -344,6 +401,12 @@ int chunk_scrub_common(const std::map < std::string, std::string > &opts,
   } else {
     usage_exit();
   }
+  i = opts.find("max-thread");
+  if (i != opts.end()) {
+    if (rados_sistrtoll(i, &max_thread)) {
+      return -EINVAL;
+    }
+  } 
   i = opts.find("pgid");
   boost::optional<pg_t> pgid(i != opts.end(), pg_t());
 
@@ -426,7 +489,6 @@ int chunk_scrub_common(const std::map < std::string, std::string > &opts,
     } else {
       usage_exit();
     }
-    cout << __func__ << " " << __LINE__ << std::endl;
     set<hobject_t> refs;
     cout << " refs: " << std::endl;
     ret = cls_chunk_refcount_read(chunk_io_ctx, object_name, &refs);
@@ -437,41 +499,18 @@ int chunk_scrub_common(const std::map < std::string, std::string > &opts,
     return ret;
   }
   
-  {
-    try {
-      librados::NObjectIterator i = pgid ? chunk_io_ctx.nobjects_begin(pgid->ps()) : chunk_io_ctx.nobjects_begin();
-      librados::NObjectIterator i_end = chunk_io_ctx.nobjects_end();
-      for (; i != i_end; ++i) {
-       set<hobject_t> refs;
-       set<hobject_t> real_refs;
-       string oid = i->get_oid();
-       ret = cls_chunk_refcount_read(chunk_io_ctx, oid, &refs);
-       if (ret < 0) {
-         continue;
-       }
-
-       for (auto pp : refs) {
-         ret = cls_chunk_has_chunk(io_ctx, pp.oid.name, oid);
-         if (ret != -ENOENT) {
-           real_refs.insert(pp);
-         } 
-       }
+  for (unsigned i = 0; i < max_thread; i++) {
+    ObjectCursor begin = io_ctx.object_list_begin();
+    ObjectCursor end = io_ctx.object_list_end();
+    std::unique_ptr<EstimateThread> ptr (new EstimateThread(io_ctx, i, max_thread, begin, end,
+                                                           "", "", 0, chunk_io_ctx));
+    ptr->set_chunk_scrub_common();
+    ptr->create("estimate_thread");
+    estimate_threads.push_back(move(ptr));
+  }
 
-       if (refs.size() != real_refs.size()) {
-         ObjectWriteOperation op;
-         cls_chunk_refcount_set(op, real_refs);
-         ret = chunk_io_ctx.operate(oid, &op);
-         if (ret < 0) {
-           continue;
-         }
-       }
-      }
-    }
-    catch (const std::runtime_error& e) {
-      cerr << e.what() << std::endl;
-      ret = -1;
-      goto out;
-    }
+  for (auto &p : estimate_threads) {
+    p->join();
   }
 
 out: