]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
src/tools: use the slice thing and make parallel
authormyoungwon oh <omwmw@sk.com>
Sun, 18 Nov 2018 04:36:05 +0000 (13:36 +0900)
committermyoungwon oh <omwmw@sk.com>
Sat, 26 Jan 2019 03:10:02 +0000 (12:10 +0900)
Signed-off-by: Myoungwon Oh <omwmw@sk.com>
src/tools/ceph_dedup_tool.cc

index 4a960e5e449006c895179c04280e6be416d5c6e1..da3ac1ae06f3b6d658ef0af2586df326445646e2 100644 (file)
@@ -45,7 +45,9 @@
 
 using namespace librados;
 unsigned default_op_size = 1 << 22;
+unsigned default_max_thread = 2;
 map< string, pair <uint64_t, uint64_t> > chunk_statistics; // < key, <count, chunk_size> >
+Mutex glock("chunk_statistics::Locker");
 
 void usage()
 {
@@ -92,6 +94,124 @@ static void print_dedup_estimate()
   cout << " result: " << total_size << " / " << dedup_size << " (total size / dedup size) " << std::endl;
 }
 
+class EstimateThread : public Thread 
+{
+  void* entry() override;
+  IoCtx io_ctx;
+  int n;
+  int m;
+  ObjectCursor begin;
+  ObjectCursor end;
+  string chunk_algo;
+  string fp_algo;
+  uint64_t chunk_size;
+  map< string, pair <uint64_t, uint64_t> > local_chunk_statistics; // < key, <count, chunk_size> >
+  typedef void (EstimateThread::*entry_func)();
+  entry_func func;
+
+  public:
+  EstimateThread(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end, 
+               string chunk_algo, string fp_algo, uint64_t chunk_size):
+    io_ctx(io_ctx), n(n), m(m), begin(begin), end(end), chunk_algo(chunk_algo), fp_algo(fp_algo),
+    chunk_size(chunk_size) {}
+
+  void set_estimate_dedup_ratio() {
+    func = &EstimateThread::estimate_dedup_ratio;
+  }
+  void estimate_dedup_ratio();
+
+};
+
+vector<std::unique_ptr<EstimateThread>> estimate_threads;
+
+void* EstimateThread::entry() {
+  if (func) {
+    (this->*func)();
+  }
+  return NULL;
+}
+
+void EstimateThread::estimate_dedup_ratio()
+{
+  ObjectCursor shard_start;
+  ObjectCursor shard_end;
+  unsigned op_size = default_op_size;
+  int ret;
+
+  io_ctx.object_list_slice(
+    begin,
+    end,
+    n,
+    m,
+    &shard_start,
+    &shard_end);
+
+  ObjectCursor c(shard_start);
+  while(c < shard_end)
+  {
+    std::vector<ObjectItem> result;
+    int r = io_ctx.object_list(c, shard_end, 12, {}, &result, &c);
+    if (r < 0 ){
+      cerr << "error object_list : " << cpp_strerror(r) << std::endl;
+      return;
+    }
+
+    for (const auto & i : result) {
+      const auto &oid = i.oid;
+      uint64_t offset = 0;
+      while (true) {
+       cout << __func__ << oid << std::endl;
+       bufferlist outdata;
+       ret = io_ctx.read(oid, outdata, op_size, offset);
+       if (ret <= 0) {
+         break;
+       }
+
+       if (chunk_algo == "fixed") {
+         if (fp_algo == "sha1") {
+           uint64_t c_offset = 0;
+           while (c_offset < outdata.length()) {
+             bufferlist chunk;
+             if (outdata.length() - c_offset > chunk_size) {
+               bufferptr bptr(chunk_size);
+               chunk.push_back(std::move(bptr));
+               chunk.copy_in(0, chunk_size, outdata.c_str());    
+             } else {
+               bufferptr bptr(outdata.length() - c_offset);
+               chunk.push_back(std::move(bptr));
+               chunk.copy_in(0, outdata.length() - c_offset, outdata.c_str());   
+             }
+             sha1_digest_t sha1_val = chunk.sha1();
+             string fp = sha1_val.to_str();
+             auto p = local_chunk_statistics.find(fp);
+             if (p != local_chunk_statistics.end()) {
+               uint64_t count = p->second.first;
+               count++;
+               local_chunk_statistics[fp] = make_pair(count, chunk.length());
+             } else {
+               local_chunk_statistics[fp] = make_pair(1, chunk.length());
+             }
+             c_offset = c_offset + chunk_size;
+           }
+         } else {
+           ceph_assert(0 == "no support fingerprint algorithm"); 
+         }
+       } else {
+         ceph_assert(0 == "no support chunk algorithm"); 
+       }
+       
+       if (outdata.length() < op_size) {
+         break;
+       }
+       offset += outdata.length();
+      }
+    }
+  }
+
+  Mutex::Locker l(glock);
+  chunk_statistics.insert(local_chunk_statistics.begin(), local_chunk_statistics.end());
+}
+
 int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
                          std::vector<const char*> &nargs)
 {
@@ -102,7 +222,7 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
   string fp_algo;
   string pool_name;
   uint64_t chunk_size = 0;
-  unsigned op_size = default_op_size;
+  unsigned max_thread = default_max_thread;
   int ret;
   std::map<std::string, std::string>::const_iterator i;
 
@@ -143,6 +263,13 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
     usage_exit();
   }
 
+  i = opts.find("max-thread");
+  if (i != opts.end()) {
+    if (rados_sistrtoll(i, &max_thread)) {
+      return -EINVAL;
+    }
+  } 
+
   i = opts.find("pgid");
   boost::optional<pg_t> pgid(i != opts.end(), pg_t());
 
@@ -169,64 +296,18 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
     goto out;
   }
 
-  {
-    try {
-      librados::NObjectIterator i = pgid ? io_ctx.nobjects_begin(pgid->ps()) : io_ctx.nobjects_begin();
-      librados::NObjectIterator i_end = io_ctx.nobjects_end();
-      for (; i != i_end; ++i) {
-       uint64_t offset = 0;
-       while (true) {
-         bufferlist outdata;
-         ret = io_ctx.read(i->get_oid(), outdata, op_size, offset);
-         if (ret <= 0) {
-           break;
-         }
+  for (unsigned i = 0; i < max_thread; i++) {
+    ObjectCursor begin = io_ctx.object_list_begin();
+    ObjectCursor end = io_ctx.object_list_end();
+    std::unique_ptr<EstimateThread> ptr (new EstimateThread(io_ctx, i, max_thread, begin, end,
+                                                           chunk_algo, fp_algo, chunk_size));
+    ptr->set_estimate_dedup_ratio();
+    ptr->create("estimate_thread");
+    estimate_threads.push_back(move(ptr));
+  }
 
-         if (chunk_algo == "fixed") {
-           if (fp_algo == "sha1") {
-               uint64_t c_offset = 0;
-               while (c_offset < outdata.length()) {
-                 bufferlist chunk;
-                 if (outdata.length() - c_offset > chunk_size) {
-                   bufferptr bptr(chunk_size);
-                   chunk.push_back(std::move(bptr));
-                   chunk.copy_in(0, chunk_size, outdata.c_str());        
-                 } else {
-                   bufferptr bptr(outdata.length() - c_offset);
-                   chunk.push_back(std::move(bptr));
-                   chunk.copy_in(0, outdata.length() - c_offset, outdata.c_str());       
-                 }
-                 sha1_digest_t sha1_val = chunk.sha1();
-                 string fp = sha1_val.to_str();
-                 auto p = chunk_statistics.find(fp);
-                 if (p != chunk_statistics.end()) {
-                   uint64_t count = p->second.first;
-                   count++;
-                   chunk_statistics[fp] = make_pair(count, chunk.length());
-                 } else {
-                   chunk_statistics[fp] = make_pair(1, chunk.length());
-                 }
-                 c_offset = c_offset + chunk_size;
-               }
-           } else {
-             ceph_assert(0 == "no support fingerprint algorithm"); 
-           }
-         } else {
-           ceph_assert(0 == "no support chunk algorithm"); 
-         }
-         
-         if (outdata.length() < op_size) {
-           break;
-         }
-         offset += outdata.length();
-       }
-      }
-    }
-    catch (const std::runtime_error& e) {
-      cerr << e.what() << std::endl;
-      ret = -1;
-      goto out;
-    }
+  for (auto &p : estimate_threads) {
+    p->join();
   }
 
   print_dedup_estimate();
@@ -439,6 +520,8 @@ int main(int argc, const char **argv)
       opts["chunk-pool"] = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--target-ref", (char*)NULL)) {
       opts["target-ref"] = val;
+    } else if (ceph_argparse_witharg(args, i, &val, "--max-thread", (char*)NULL)) {
+      opts["max-thread"] = val;
     } else {
       if (val[0] == '-')
         usage_exit();