rgw/rgw_main.cc)
set(radosgw_admin_srcs
- rgw/rgw_admin.cc)
+ rgw/rgw_admin.cc
+ rgw/rgw_orphan.cc)
add_executable(radosgw ${radosgw_srcs} $<TARGET_OBJECTS:heap_profiler_objs>)
target_link_libraries(radosgw rgw_a librados
radosgw_LDADD = $(LIBRGW) $(LIBCIVETWEB) $(LIBRGW_DEPS) $(RESOLV_LIBS) $(CEPH_GLOBAL)
bin_PROGRAMS += radosgw
-radosgw_admin_SOURCES = rgw/rgw_admin.cc
+radosgw_admin_SOURCES = rgw/rgw_admin.cc rgw/rgw_orphan.cc
radosgw_admin_LDADD = $(LIBRGW) $(LIBRGW_DEPS) $(CEPH_GLOBAL)
bin_PROGRAMS += radosgw-admin
rgw/rgw_metadata.h \
rgw/rgw_multi_del.h \
rgw/rgw_op.h \
+ rgw/rgw_orphan.h \
rgw/rgw_http_client.h \
rgw/rgw_swift.h \
rgw/rgw_swift_auth.h \
#include "rgw_formats.h"
#include "rgw_usage.h"
#include "rgw_replica_log.h"
+#include "rgw_orphan.h"
#define dout_subsys ceph_subsys_rgw
OPT_QUOTA_DISABLE,
OPT_GC_LIST,
OPT_GC_PROCESS,
+ OPT_ORPHANS_FIND,
OPT_REGION_GET,
OPT_REGION_LIST,
OPT_REGION_SET,
strcmp(cmd, "object") == 0 ||
strcmp(cmd, "olh") == 0 ||
strcmp(cmd, "opstate") == 0 ||
+ strcmp(cmd, "orphans") == 0 ||
strcmp(cmd, "pool") == 0 ||
strcmp(cmd, "pools") == 0 ||
strcmp(cmd, "quota") == 0 ||
return OPT_GC_LIST;
if (strcmp(cmd, "process") == 0)
return OPT_GC_PROCESS;
+ } else if (strcmp(prev_cmd, "orphans") == 0) {
+ if (strcmp(cmd, "find") == 0)
+ return OPT_ORPHANS_FIND;
} else if (strcmp(prev_cmd, "metadata") == 0) {
if (strcmp(cmd, "get") == 0)
return OPT_METADATA_GET;
return 0;
}
+
int main(int argc, char **argv)
{
vector<const char*> args;
BIIndexType bi_index_type = PlainIdx;
+ string job_id;
+ int init_search = false;
+ int num_shards = 0;
+
std::string val;
std::ostringstream errs;
string err;
cerr << "bad key type: " << key_type_str << std::endl;
return usage();
}
+ } else if (ceph_argparse_witharg(args, i, &val, "--job-id", (char*)NULL)) {
+ job_id = val;
} else if (ceph_argparse_binary_flag(args, i, &gen_access_key, NULL, "--gen-access-key", (char*)NULL)) {
// do nothing
} else if (ceph_argparse_binary_flag(args, i, &gen_secret_key, NULL, "--gen-secret", (char*)NULL)) {
start_date = val;
} else if (ceph_argparse_witharg(args, i, &val, "--end-date", "--end-time", (char*)NULL)) {
end_date = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--num-shards", (char*)NULL)) {
+ num_shards = atoi(val.c_str());
} else if (ceph_argparse_witharg(args, i, &val, "--shard-id", (char*)NULL)) {
shard_id = atoi(val.c_str());
specified_shard_id = true;
// do nothing
} else if (ceph_argparse_binary_flag(args, i, &include_all, NULL, "--include-all", (char*)NULL)) {
// do nothing
+ } else if (ceph_argparse_binary_flag(args, i, &init_search, NULL, "--init-search", (char*)NULL)) {
+ // do nothing
} else if (ceph_argparse_witharg(args, i, &val, "--caps", (char*)NULL)) {
caps = val;
} else if (ceph_argparse_witharg(args, i, &val, "-i", "--infile", (char*)NULL)) {
}
}
+ if (opt_cmd == OPT_ORPHANS_FIND) {
+ RGWOrphanSearch search(store);
+
+ if (job_id.empty()) {
+ cerr << "ERROR: --job-id not specified" << std::endl;
+ return EINVAL;
+ }
+ RGWOrphanSearchInfo info, *pinfo = NULL;
+ if (init_search) {
+ if (pool_name.empty()) {
+ cerr << "ERROR: --pool-name not specified" << std::endl;
+ return EINVAL;
+ }
+ info.pool = pool_name;
+ info.job_name = job_id;
+ info.num_shards = num_shards;
+ pinfo = &info;
+ }
+
+ int ret = search.init(job_id, pinfo);
+ if (ret < 0) {
+ return -ret;
+ }
+ ret = search.run();
+ if (ret < 0) {
+ return -ret;
+ }
+ }
+
if (opt_cmd == OPT_USER_CHECK) {
check_bad_user_bucket_mapping(store, user_id, fix);
}
--- /dev/null
+
+
+#include <string>
+
+using namespace std;
+
+#include "common/config.h"
+#include "common/Formatter.h"
+#include "common/errno.h"
+
+#include "rgw_rados.h"
+#include "rgw_orphan.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+#define DEFAULT_NUM_SHARDS 10
+
+int RGWOrphanStore::read_job(const string& job_name, RGWOrphanSearchState& state)
+{
+ set<string> keys;
+ map<string, bufferlist> vals;
+ keys.insert(job_name);
+ int r = ioctx.omap_get_vals_by_keys(oid, keys, &vals);
+ if (r < 0) {
+ return r;
+ }
+
+ map<string, bufferlist>::iterator iter = vals.find(job_name);
+ if (iter == vals.end()) {
+ return -ENOENT;
+ }
+
+ try {
+ bufferlist& bl = iter->second;
+ ::decode(state, bl);
+ } catch (buffer::error& err) {
+ lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
+ return -EIO;
+ }
+
+ return 0;
+}
+
+int RGWOrphanStore::write_job(const string& job_name, const RGWOrphanSearchState& state)
+{
+ map<string, bufferlist> vals;
+ bufferlist bl;
+ ::encode(state, bl);
+ vals[job_name] = bl;
+ int r = ioctx.omap_set(oid, vals);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
+int RGWOrphanStore::init()
+{
+ const char *log_pool = store->get_zone_params().log_pool.name.c_str();
+ librados::Rados *rados = store->get_rados();
+ int r = rados->ioctx_create(log_pool, ioctx);
+ if (r < 0) {
+ cerr << "ERROR: failed to open log pool ret=" << r << std::endl;
+ return r;
+ }
+
+
+
+ return 0;
+}
+
+int RGWOrphanStore::store_entries(const string& oid, map<string, bufferlist> entries)
+{
+ librados::ObjectWriteOperation op;
+ op.omap_set(entries);
+ cout << "storing " << entries.size() << " entries at " << oid << std::endl;
+ int ret = ioctx.operate(oid, &op);
+ if (ret < 0) {
+ cerr << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << std::endl;
+ }
+
+ return 0;
+}
+
+int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info) {
+ int r = orphan_store.init();
+ if (r < 0) {
+ return r;
+ }
+
+ if (info) {
+ search_info = *info;
+ search_info.job_name = job_name;
+ search_info.num_shards = (info->num_shards ? info->num_shards : DEFAULT_NUM_SHARDS);
+ search_state = ORPHAN_SEARCH_INIT;
+ r = save_state();
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed to write state ret=" << r << dendl;
+ return r;
+ }
+ } else {
+ RGWOrphanSearchState state;
+ r = orphan_store.read_job(job_name, state);
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed to read state ret=" << r << dendl;
+ return r;
+ }
+
+ search_info = state.info;
+ search_state = state.state;
+ }
+
+ log_objs_prefix = RGW_ORPHAN_LOG_PREFIX + string(".");
+ log_objs_prefix += job_name;
+
+ for (int i = 0; i < search_info.num_shards; i++) {
+ char buf[128];
+
+ snprintf(buf, sizeof(buf), "%s.%d", log_objs_prefix.c_str(), i);
+ orphan_objs_log[i] = buf;
+ }
+ return 0;
+}
+
+int RGWOrphanSearch::log_oids(map<int, list<string> >& oids)
+{
+ map<int, list<string> >::iterator miter = oids.begin();
+
+ list<log_iter_info> liters; /* a list of iterator pairs for begin and end */
+
+ for (; miter != oids.end(); ++miter) {
+ log_iter_info info;
+ info.oid = orphan_objs_log[miter->first];
+ info.cur = miter->second.begin();
+ info.end = miter->second.end();
+ liters.push_back(info);
+ }
+
+ list<log_iter_info>::iterator list_iter;
+ while (!liters.empty()) {
+ list_iter = liters.begin();
+
+ while (list_iter != liters.end()) {
+ log_iter_info& cur_info = *list_iter;
+
+ list<string>::iterator& cur = cur_info.cur;
+ list<string>::iterator& end = cur_info.end;
+
+ map<string, bufferlist> entries;
+#define MAX_OMAP_SET_ENTRIES 100
+ for (int j = 0; cur != end && j != MAX_OMAP_SET_ENTRIES; ++cur, ++j) {
+ ldout(store->ctx(), 20) << "adding obj: " << *cur << dendl;
+ entries[*cur] = bufferlist();
+ }
+
+ int ret = orphan_store.store_entries(cur_info.oid, entries);
+ if (ret < 0) {
+ return ret;
+ }
+ list<log_iter_info>::iterator tmp = list_iter;
+ ++list_iter;
+ if (cur == end) {
+ liters.erase(tmp);
+ }
+ }
+ }
+ return 0;
+}
+
+int RGWOrphanSearch::build_all_oids_index()
+{
+ librados::Rados *rados = store->get_rados();
+
+ librados::IoCtx ioctx;
+
+ int ret = rados->ioctx_create(search_info.pool.c_str(), ioctx);
+ if (ret < 0) {
+ lderr(store->ctx()) << __func__ << ": ioctx_create() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ ioctx.set_namespace(librados::all_nspaces);
+ librados::NObjectIterator i = ioctx.nobjects_begin();
+ librados::NObjectIterator i_end = ioctx.nobjects_end();
+
+ map<int, list<string> > oids;
+
+ int count = 0;
+
+ cout << "logging all objects in the pool" << std::endl;
+
+ for (; i != i_end; ++i) {
+ string nspace = i->get_nspace();
+ string oid = i->get_oid();
+ string locator = i->get_locator();
+
+ string name = oid;
+ if (locator.size())
+ name += " (@" + locator + ")";
+
+ ssize_t pos = oid.find('_');
+ if (pos < 0) {
+ cerr << "ERROR: object does not have a bucket marker: " << oid << std::endl;
+ }
+ string obj_marker = oid.substr(0, pos);
+
+ int shard = orphan_shard(oid);
+ oids[shard].push_back(oid);
+
+#define COUNT_BEFORE_FLUSH 1000
+ if (++count >= COUNT_BEFORE_FLUSH) {
+ ret = log_oids(oids);
+ if (ret < 0) {
+ cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
+ return ret;
+ }
+ count = 0;
+ oids.clear();
+ }
+ }
+ ret = log_oids(oids);
+ if (ret < 0) {
+ cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
+ return ret;
+ }
+
+ return 0;
+}
+
+int RGWOrphanSearch::run()
+{
+ int r;
+
+ switch (search_state) {
+
+ case ORPHAN_SEARCH_INIT:
+ ldout(store->ctx(), 0) << __func__ << "(): initializing state" << dendl;
+ search_state = ORPHAN_SEARCH_LSPOOL;
+ r = save_state();
+ if (r < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
+ return r;
+ }
+ // fall through
+ case ORPHAN_SEARCH_LSPOOL:
+ ldout(store->ctx(), 0) << __func__ << "(): listing all objects in pool" << dendl;
+ r = build_all_oids_index();
+ if (r < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returnr ret=" << r << dendl;
+ return r;
+ }
+
+ search_state = ORPHAN_SEARCH_LSBUCKETS;
+ r = save_state();
+ if (r < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
+ return r;
+ }
+ // fall through
+
+ case ORPHAN_SEARCH_LSBUCKETS:
+ case ORPHAN_SEARCH_DONE:
+ break;
+
+ default:
+ assert(0);
+ };
+
+ return 0;
+}
+
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_RGW_ORPHAN_H
+#define CEPH_RGW_ORPHAN_H
+
+#include "common/config.h"
+#include "common/Formatter.h"
+#include "common/errno.h"
+
+#include "rgw_rados.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+#define RGW_ORPHAN_INDEX_OID "orphan.index"
+#define RGW_ORPHAN_LOG_PREFIX "orphan.scan"
+
+
+enum OrphanSearchState {
+ ORPHAN_SEARCH_UNKNOWN = 0,
+ ORPHAN_SEARCH_INIT = 1,
+ ORPHAN_SEARCH_LSPOOL = 2,
+ ORPHAN_SEARCH_LSBUCKETS = 3,
+ ORPHAN_SEARCH_DONE = 4,
+};
+
+
+struct RGWOrphanSearchInfo {
+ string job_name;
+ string pool;
+ uint16_t num_shards;
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(job_name, bl);
+ ::encode(pool, bl);
+ ::encode(num_shards, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(job_name, bl);
+ ::decode(pool, bl);
+ ::decode(num_shards, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(RGWOrphanSearchInfo)
+
+struct RGWOrphanSearchState {
+ RGWOrphanSearchInfo info;
+ OrphanSearchState state;
+ bufferlist state_info;
+
+ RGWOrphanSearchState() : state(ORPHAN_SEARCH_UNKNOWN) {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(info, bl);
+ ::encode((int)state, bl);
+ ::encode(state_info, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(info, bl);
+ int s;
+ ::decode(s, bl);
+ state = (OrphanSearchState)s;
+ ::decode(state_info, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(RGWOrphanSearchState)
+
+class RGWOrphanStore {
+ RGWRados *store;
+ librados::IoCtx ioctx;
+
+ string oid;
+
+public:
+ RGWOrphanStore(RGWRados *_store) : store(_store) {
+ oid = RGW_ORPHAN_INDEX_OID;
+ }
+
+ int init();
+
+ int read_job(const string& job_name, RGWOrphanSearchState& state);
+ int write_job(const string& job_name, const RGWOrphanSearchState& state);
+
+
+ int store_entries(const string& oid, map<string, bufferlist> entries);
+};
+
+
+class RGWOrphanSearch {
+ RGWRados *store;
+ librados::IoCtx log_ioctx;
+
+ RGWOrphanStore orphan_store;
+
+ RGWOrphanSearchInfo search_info;
+ OrphanSearchState search_state;
+
+ map<int, string> orphan_objs_log;
+
+ string log_objs_prefix;
+
+ struct log_iter_info {
+ string oid;
+ list<string>::iterator cur;
+ list<string>::iterator end;
+ };
+
+ int log_oids(map<int, list<string> >& oids);
+
+#define RGW_ORPHANSEARCH_HASH_PRIME 7877
+ int orphan_shard(const string& str) {
+ return ceph_str_hash_linux(str.c_str(), str.size()) % RGW_ORPHANSEARCH_HASH_PRIME % search_info.num_shards;
+ }
+
+public:
+ RGWOrphanSearch(RGWRados *_store) : store(_store), orphan_store(store) {}
+
+ int save_state() {
+ RGWOrphanSearchState state;
+ state.info = search_info;
+ state.state = search_state;
+ return orphan_store.write_job(search_info.job_name, state);
+ }
+
+ int init(const string& job_name, RGWOrphanSearchInfo *info);
+
+ int create(const string& job_name, int num_shards);
+
+ int build_all_oids_index();
+ int run();
+};
+
+
+
+#endif
rest_master_conn(NULL),
meta_mgr(NULL), data_log(NULL) {}
+ librados::Rados *get_rados() { return rados; }
+
uint64_t get_new_req_id() {
return max_req_id.inc();
}
map<string, RGWRESTConn *> zone_conn_map;
map<string, RGWRESTConn *> region_conn_map;
+ RGWZoneParams& get_zone_params() { return zone; }
+
RGWMetadataManager *meta_mgr;
RGWDataChangesLog *data_log;