]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: initial work of orphan detection tool implementation
authorYehuda Sadeh <yehuda@redhat.com>
Sat, 25 Apr 2015 16:37:53 +0000 (09:37 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 29 Jun 2015 22:09:01 +0000 (15:09 -0700)
So far doesn't do much, iterate through all objects in a specific pool
data, store it in a sharded index.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/CMakeLists.txt
src/rgw/Makefile.am
src/rgw/rgw_admin.cc
src/rgw/rgw_orphan.cc [new file with mode: 0644]
src/rgw/rgw_orphan.h [new file with mode: 0644]
src/rgw/rgw_rados.h

index ce8ebfb3ffbc1c45ae6b31e4b1337ee1d04cceb9..58f52c627e9dd9ce4221accf3f00ef6f1a68b0ae 100644 (file)
@@ -779,7 +779,8 @@ if(${WITH_RADOSGW})
     rgw/rgw_main.cc)
 
   set(radosgw_admin_srcs
-    rgw/rgw_admin.cc)
+    rgw/rgw_admin.cc
+    rgw/rgw_orphan.cc)
 
   add_executable(radosgw ${radosgw_srcs} $<TARGET_OBJECTS:heap_profiler_objs>)
   target_link_libraries(radosgw rgw_a librados
index 316ae7620ce537a753eeb917b072d7d26db5938e..7620d73b053d1caab4ca22dd6d77a4a9e29c8c99 100644 (file)
@@ -100,7 +100,7 @@ radosgw_CFLAGS = -I$(srcdir)/civetweb/include
 radosgw_LDADD = $(LIBRGW) $(LIBCIVETWEB) $(LIBRGW_DEPS) $(RESOLV_LIBS) $(CEPH_GLOBAL)
 bin_PROGRAMS += radosgw
 
-radosgw_admin_SOURCES = rgw/rgw_admin.cc
+radosgw_admin_SOURCES = rgw/rgw_admin.cc rgw/rgw_orphan.cc
 radosgw_admin_LDADD = $(LIBRGW) $(LIBRGW_DEPS) $(CEPH_GLOBAL)
 bin_PROGRAMS += radosgw-admin
 
@@ -141,6 +141,7 @@ noinst_HEADERS += \
        rgw/rgw_metadata.h \
        rgw/rgw_multi_del.h \
        rgw/rgw_op.h \
+       rgw/rgw_orphan.h \
        rgw/rgw_http_client.h \
        rgw/rgw_swift.h \
        rgw/rgw_swift_auth.h \
index b6fdd1ccca627ebd422ff2faf2a8de12d75b555d..80f1d6a0ae579cb1fee9833dcf6ef614b9d99996 100644 (file)
@@ -31,6 +31,7 @@ using namespace std;
 #include "rgw_formats.h"
 #include "rgw_usage.h"
 #include "rgw_replica_log.h"
+#include "rgw_orphan.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -232,6 +233,7 @@ enum {
   OPT_QUOTA_DISABLE,
   OPT_GC_LIST,
   OPT_GC_PROCESS,
+  OPT_ORPHANS_FIND,
   OPT_REGION_GET,
   OPT_REGION_LIST,
   OPT_REGION_SET,
@@ -281,6 +283,7 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more)
       strcmp(cmd, "object") == 0 ||
       strcmp(cmd, "olh") == 0 ||
       strcmp(cmd, "opstate") == 0 ||
+      strcmp(cmd, "orphans") == 0 || 
       strcmp(cmd, "pool") == 0 ||
       strcmp(cmd, "pools") == 0 ||
       strcmp(cmd, "quota") == 0 ||
@@ -441,6 +444,9 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more)
       return OPT_GC_LIST;
     if (strcmp(cmd, "process") == 0)
       return OPT_GC_PROCESS;
+  } else if (strcmp(prev_cmd, "orphans") == 0) {
+    if (strcmp(cmd, "find") == 0)
+      return OPT_ORPHANS_FIND;
   } else if (strcmp(prev_cmd, "metadata") == 0) {
     if (strcmp(cmd, "get") == 0)
       return OPT_METADATA_GET;
@@ -1024,6 +1030,7 @@ int do_check_object_locator(const string& bucket_name, bool fix, bool remove_bad
   return 0;
 }
 
+
 int main(int argc, char **argv) 
 {
   vector<const char*> args;
@@ -1105,6 +1112,10 @@ int main(int argc, char **argv)
 
   BIIndexType bi_index_type = PlainIdx;
 
+  string job_id;
+  int init_search = false;
+  int num_shards = 0;
+
   std::string val;
   std::ostringstream errs;
   string err;
@@ -1154,6 +1165,8 @@ int main(int argc, char **argv)
         cerr << "bad key type: " << key_type_str << std::endl;
         return usage();
       }
+    } else if (ceph_argparse_witharg(args, i, &val, "--job-id", (char*)NULL)) {
+      job_id = val;
     } else if (ceph_argparse_binary_flag(args, i, &gen_access_key, NULL, "--gen-access-key", (char*)NULL)) {
       // do nothing
     } else if (ceph_argparse_binary_flag(args, i, &gen_secret_key, NULL, "--gen-secret", (char*)NULL)) {
@@ -1203,6 +1216,8 @@ int main(int argc, char **argv)
       start_date = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--end-date", "--end-time", (char*)NULL)) {
       end_date = val;
+    } else if (ceph_argparse_witharg(args, i, &val, "--num-shards", (char*)NULL)) {
+      num_shards = atoi(val.c_str());
     } else if (ceph_argparse_witharg(args, i, &val, "--shard-id", (char*)NULL)) {
       shard_id = atoi(val.c_str());
       specified_shard_id = true;
@@ -1257,6 +1272,8 @@ int main(int argc, char **argv)
      // do nothing
     } else if (ceph_argparse_binary_flag(args, i, &include_all, NULL, "--include-all", (char*)NULL)) {
      // do nothing
+    } else if (ceph_argparse_binary_flag(args, i, &init_search, NULL, "--init-search", (char*)NULL)) {
+     // do nothing
     } else if (ceph_argparse_witharg(args, i, &val, "--caps", (char*)NULL)) {
       caps = val;
     } else if (ceph_argparse_witharg(args, i, &val, "-i", "--infile", (char*)NULL)) {
@@ -2530,6 +2547,35 @@ next:
     }
   }
 
+  if (opt_cmd == OPT_ORPHANS_FIND) {
+    RGWOrphanSearch search(store);
+
+    if (job_id.empty()) {
+      cerr << "ERROR: --job-id not specified" << std::endl;
+      return EINVAL;
+    }
+    RGWOrphanSearchInfo info, *pinfo = NULL;
+    if (init_search) {
+      if (pool_name.empty()) {
+        cerr << "ERROR: --pool-name not specified" << std::endl;
+        return EINVAL;
+      }
+      info.pool = pool_name;
+      info.job_name = job_id;
+      info.num_shards = num_shards;
+      pinfo = &info;
+    }
+
+    int ret = search.init(job_id, pinfo);
+    if (ret < 0) {
+      return -ret;
+    }
+    ret = search.run();
+    if (ret < 0) {
+      return -ret;
+    }
+  }
+
   if (opt_cmd == OPT_USER_CHECK) {
     check_bad_user_bucket_mapping(store, user_id, fix);
   }
diff --git a/src/rgw/rgw_orphan.cc b/src/rgw/rgw_orphan.cc
new file mode 100644 (file)
index 0000000..42e05d1
--- /dev/null
@@ -0,0 +1,273 @@
+
+
+#include <string>
+
+using namespace std;
+
+#include "common/config.h"
+#include "common/Formatter.h"
+#include "common/errno.h"
+
+#include "rgw_rados.h"
+#include "rgw_orphan.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+#define DEFAULT_NUM_SHARDS 10
+
+int RGWOrphanStore::read_job(const string& job_name, RGWOrphanSearchState& state)
+{
+  set<string> keys;
+  map<string, bufferlist> vals;
+  keys.insert(job_name);
+  int r = ioctx.omap_get_vals_by_keys(oid, keys, &vals);
+  if (r < 0) {
+    return r;
+  }
+
+  map<string, bufferlist>::iterator iter = vals.find(job_name);
+  if (iter == vals.end()) {
+    return -ENOENT;
+  }
+
+  try {
+    bufferlist& bl = iter->second;
+    ::decode(state, bl);
+  } catch (buffer::error& err) {
+    lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
+    return -EIO;
+  }
+
+  return 0;
+}
+
+int RGWOrphanStore::write_job(const string& job_name, const RGWOrphanSearchState& state)
+{
+  map<string, bufferlist> vals;
+  bufferlist bl;
+  ::encode(state, bl);
+  vals[job_name] = bl;
+  int r = ioctx.omap_set(oid, vals);
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+int RGWOrphanStore::init()
+{
+  const char *log_pool = store->get_zone_params().log_pool.name.c_str();
+  librados::Rados *rados = store->get_rados();
+  int r = rados->ioctx_create(log_pool, ioctx);
+  if (r < 0) {
+    cerr << "ERROR: failed to open log pool ret=" << r << std::endl;
+    return r;
+  }
+
+
+
+  return 0;
+}
+
+int RGWOrphanStore::store_entries(const string& oid, map<string, bufferlist> entries)
+{
+  librados::ObjectWriteOperation op;
+  op.omap_set(entries);
+  cout << "storing " << entries.size() << " entries at " << oid << std::endl;
+  int ret = ioctx.operate(oid, &op);
+  if (ret < 0) {
+    cerr << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << std::endl;
+  }
+  
+  return 0;
+}
+
+int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info) {
+  int r = orphan_store.init();
+  if (r < 0) {
+    return r;
+  }
+
+  if (info) {
+    search_info = *info;
+    search_info.job_name = job_name;
+    search_info.num_shards = (info->num_shards ? info->num_shards : DEFAULT_NUM_SHARDS);
+    search_state = ORPHAN_SEARCH_INIT;
+    r = save_state();
+    if (r < 0) {
+      lderr(store->ctx()) << "ERROR: failed to write state ret=" << r << dendl;
+      return r;
+    }
+  } else {
+    RGWOrphanSearchState state;
+    r = orphan_store.read_job(job_name, state);
+    if (r < 0) {
+      lderr(store->ctx()) << "ERROR: failed to read state ret=" << r << dendl;
+      return r;
+    }
+
+    search_info = state.info;
+    search_state = state.state;
+  }
+
+  log_objs_prefix = RGW_ORPHAN_LOG_PREFIX + string(".");
+  log_objs_prefix += job_name;
+
+  for (int i = 0; i < search_info.num_shards; i++) {
+    char buf[128];
+
+    snprintf(buf, sizeof(buf), "%s.%d", log_objs_prefix.c_str(), i);
+    orphan_objs_log[i] = buf;
+  }
+  return 0;
+}
+
+int RGWOrphanSearch::log_oids(map<int, list<string> >& oids)
+{
+  map<int, list<string> >::iterator miter = oids.begin();
+
+  list<log_iter_info> liters; /* a list of iterator pairs for begin and end */
+
+  for (; miter != oids.end(); ++miter) {
+    log_iter_info info;
+    info.oid = orphan_objs_log[miter->first];
+    info.cur = miter->second.begin();
+    info.end = miter->second.end();
+    liters.push_back(info);
+  }
+
+  list<log_iter_info>::iterator list_iter;
+  while (!liters.empty()) {
+     list_iter = liters.begin();
+
+     while (list_iter != liters.end()) {
+       log_iter_info& cur_info = *list_iter;
+
+       list<string>::iterator& cur = cur_info.cur;
+       list<string>::iterator& end = cur_info.end;
+
+       map<string, bufferlist> entries;
+#define MAX_OMAP_SET_ENTRIES 100
+       for (int j = 0; cur != end && j != MAX_OMAP_SET_ENTRIES; ++cur, ++j) {
+         ldout(store->ctx(), 20) << "adding obj: " << *cur << dendl;
+         entries[*cur] = bufferlist();
+       }
+
+       int ret = orphan_store.store_entries(cur_info.oid, entries);
+       if (ret < 0) {
+         return ret;
+       }
+       list<log_iter_info>::iterator tmp = list_iter;
+       ++list_iter;
+       if (cur == end) {
+         liters.erase(tmp);
+       }
+     }
+  }
+  return 0;
+}
+
+int RGWOrphanSearch::build_all_oids_index()
+{
+  librados::Rados *rados = store->get_rados();
+
+  librados::IoCtx ioctx;
+
+  int ret = rados->ioctx_create(search_info.pool.c_str(), ioctx);
+  if (ret < 0) {
+    lderr(store->ctx()) << __func__ << ": ioctx_create() returned ret=" << ret << dendl;
+    return ret;
+  }
+
+  ioctx.set_namespace(librados::all_nspaces);
+  librados::NObjectIterator i = ioctx.nobjects_begin();
+  librados::NObjectIterator i_end = ioctx.nobjects_end();
+
+  map<int, list<string> > oids;
+
+  int count = 0;
+
+  cout << "logging all objects in the pool" << std::endl;
+
+  for (; i != i_end; ++i) {
+    string nspace = i->get_nspace();
+    string oid = i->get_oid();
+    string locator = i->get_locator();
+
+    string name = oid;
+    if (locator.size())
+      name += " (@" + locator + ")";  
+
+    ssize_t pos = oid.find('_');
+    if (pos < 0) {
+      cerr << "ERROR: object does not have a bucket marker: " << oid << std::endl;
+    }
+    string obj_marker = oid.substr(0, pos);
+
+    int shard = orphan_shard(oid);
+    oids[shard].push_back(oid);
+
+#define COUNT_BEFORE_FLUSH 1000
+    if (++count >= COUNT_BEFORE_FLUSH) {
+      ret = log_oids(oids);
+      if (ret < 0) {
+        cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
+        return ret;
+      }
+      count = 0;
+      oids.clear();
+    }
+  }
+  ret = log_oids(oids);
+  if (ret < 0) {
+    cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
+    return ret;
+  }
+  
+  return 0;
+}
+
+int RGWOrphanSearch::run()
+{
+  int r;
+
+  switch (search_state) {
+    
+    case ORPHAN_SEARCH_INIT:
+      ldout(store->ctx(), 0) << __func__ << "(): initializing state" << dendl;
+      search_state = ORPHAN_SEARCH_LSPOOL;
+      r = save_state();
+      if (r < 0) {
+        lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
+        return r;
+      }
+      // fall through
+    case ORPHAN_SEARCH_LSPOOL:
+      ldout(store->ctx(), 0) << __func__ << "(): listing all objects in pool" << dendl;
+      r = build_all_oids_index();
+      if (r < 0) {
+        lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returnr ret=" << r << dendl;
+        return r;
+      }
+
+      search_state = ORPHAN_SEARCH_LSBUCKETS;
+      r = save_state();
+      if (r < 0) {
+        lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
+        return r;
+      }
+      // fall through
+
+    case ORPHAN_SEARCH_LSBUCKETS:
+    case ORPHAN_SEARCH_DONE:
+      break;
+
+    default:
+      assert(0);
+  };
+
+  return 0;
+}
+
+
diff --git a/src/rgw/rgw_orphan.h b/src/rgw/rgw_orphan.h
new file mode 100644 (file)
index 0000000..82de635
--- /dev/null
@@ -0,0 +1,159 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef CEPH_RGW_ORPHAN_H
+#define CEPH_RGW_ORPHAN_H
+
+#include "common/config.h"
+#include "common/Formatter.h"
+#include "common/errno.h"
+
+#include "rgw_rados.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+#define RGW_ORPHAN_INDEX_OID "orphan.index"
+#define RGW_ORPHAN_LOG_PREFIX "orphan.scan"
+
+
+enum OrphanSearchState {
+  ORPHAN_SEARCH_UNKNOWN = 0,
+  ORPHAN_SEARCH_INIT = 1,
+  ORPHAN_SEARCH_LSPOOL = 2,
+  ORPHAN_SEARCH_LSBUCKETS = 3,
+  ORPHAN_SEARCH_DONE = 4,
+};
+
+
+struct RGWOrphanSearchInfo {
+  string job_name;
+  string pool;
+  uint16_t num_shards;
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(job_name, bl);
+    ::encode(pool, bl);
+    ::encode(num_shards, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(job_name, bl);
+    ::decode(pool, bl);
+    ::decode(num_shards, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(RGWOrphanSearchInfo)
+
+struct RGWOrphanSearchState {
+  RGWOrphanSearchInfo info;
+  OrphanSearchState state;
+  bufferlist state_info;
+
+  RGWOrphanSearchState() : state(ORPHAN_SEARCH_UNKNOWN) {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(info, bl);
+    ::encode((int)state, bl);
+    ::encode(state_info, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(info, bl);
+    int s;
+    ::decode(s, bl);
+    state = (OrphanSearchState)s;
+    ::decode(state_info, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(RGWOrphanSearchState)
+
+class RGWOrphanStore {
+  RGWRados *store;
+  librados::IoCtx ioctx;
+
+  string oid;
+
+public:
+  RGWOrphanStore(RGWRados *_store) : store(_store) {
+    oid = RGW_ORPHAN_INDEX_OID;
+  }
+
+  int init();
+
+  int read_job(const string& job_name, RGWOrphanSearchState& state);
+  int write_job(const string& job_name, const RGWOrphanSearchState& state);
+
+
+  int store_entries(const string& oid, map<string, bufferlist> entries);
+};
+
+
+class RGWOrphanSearch {
+  RGWRados *store;
+  librados::IoCtx log_ioctx;
+
+  RGWOrphanStore orphan_store;
+
+  RGWOrphanSearchInfo search_info;
+  OrphanSearchState search_state;
+
+  map<int, string> orphan_objs_log;
+
+  string log_objs_prefix;
+
+  struct log_iter_info {
+    string oid;
+    list<string>::iterator cur;
+    list<string>::iterator end;
+  };
+
+  int log_oids(map<int, list<string> >& oids);
+
+#define RGW_ORPHANSEARCH_HASH_PRIME 7877
+  int orphan_shard(const string& str) {
+    return ceph_str_hash_linux(str.c_str(), str.size()) % RGW_ORPHANSEARCH_HASH_PRIME % search_info.num_shards;
+  }
+
+public:
+  RGWOrphanSearch(RGWRados *_store) : store(_store), orphan_store(store) {}
+
+  int save_state() {
+    RGWOrphanSearchState state;
+    state.info = search_info;
+    state.state = search_state;
+    return orphan_store.write_job(search_info.job_name, state);
+  }
+
+  int init(const string& job_name, RGWOrphanSearchInfo *info);
+
+  int create(const string& job_name, int num_shards);
+
+  int build_all_oids_index();
+  int run();
+};
+
+
+
+#endif
index f164559515db3fabc412aebbd2934dd0a93bc7ad..b487c034b9234e0d77a53d9f3f538361207c5dca 100644 (file)
@@ -1271,6 +1271,8 @@ public:
                rest_master_conn(NULL),
                meta_mgr(NULL), data_log(NULL) {}
 
+  librados::Rados *get_rados() { return rados; }
+
   uint64_t get_new_req_id() {
     return max_req_id.inc();
   }
@@ -1295,6 +1297,8 @@ public:
   map<string, RGWRESTConn *> zone_conn_map;
   map<string, RGWRESTConn *> region_conn_map;
 
+  RGWZoneParams& get_zone_params() { return zone; }
+
   RGWMetadataManager *meta_mgr;
 
   RGWDataChangesLog *data_log;