From 721a6bef9e0f853a9a5f75ec51c7e01a471334bf Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 21 Aug 2012 15:05:38 -0700 Subject: [PATCH] rgw: implement garbage collector Add a garbage collector thread that is responsible for cleaning up clutter. When removing an object, store info about the leftovers in a special gc map (via rgw objclass). Add new radosgw-admin commands to list objects in gc and to run the gc process manually. Also, gc processors can run in parallel; however, each will handle a single gc shard (synchronized using lock objclass). Signed-off-by: Yehuda Sadeh --- src/Makefile.am | 4 +- src/cls/rgw/cls_rgw.cc | 66 +++++--- src/cls/rgw/cls_rgw_types.cc | 185 +++++++++++++++++++++ src/rgw/rgw_admin.cc | 61 ++++++- src/rgw/rgw_cache.h | 1 + src/rgw/rgw_common.h | 2 + src/rgw/rgw_gc.cc | 303 +++++++++++++++++++++++++++++++++++ src/rgw/rgw_gc.h | 59 +++++++ src/rgw/rgw_main.cc | 2 +- src/rgw/rgw_op.cc | 14 ++ src/rgw/rgw_rados.cc | 226 +++++++++++++++++++++----- src/rgw/rgw_rados.h | 58 +++++-- 12 files changed, 897 insertions(+), 84 deletions(-) create mode 100644 src/cls/rgw/cls_rgw_types.cc create mode 100644 src/rgw/rgw_gc.cc create mode 100644 src/rgw/rgw_gc.h diff --git a/src/Makefile.am b/src/Makefile.am index cb0d262179905..faec81c0a1f8a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -315,13 +315,14 @@ librgw_a_SOURCES = \ rgw/rgw_formats.cc \ rgw/rgw_log.cc \ rgw/rgw_multi.cc \ + rgw/rgw_gc.cc \ rgw/rgw_env.cc librgw_a_CFLAGS = ${CRYPTO_CFLAGS} ${AM_CFLAGS} librgw_a_CXXFLAGS = ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS} noinst_LIBRARIES += librgw.a my_radosgw_ldadd = \ - libglobal.la librgw.a librados.la libcls_rgw_client.a -lcurl -lexpat \ + libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_lock_client.a -lcurl -lexpat \ $(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS) radosgw_SOURCES = \ @@ -1669,6 +1670,7 @@ noinst_HEADERS = \ rgw/rgw_formats.h\ rgw/rgw_log.h\ rgw/rgw_multi.h\ + rgw/rgw_gc.h\ rgw/rgw_op.h\ rgw/rgw_swift.h\ 
rgw/rgw_swift_auth.h\ diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index 9fba355f90883..12012a45614d1 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -672,16 +672,27 @@ int rgw_user_usage_log_trim(cls_method_context_t hctx, bufferlist *in, bufferlis return 0; } +/* + * We hold the garbage collection chain data under two different indexes: the first 'name' index + * keeps them under a unique tag that represents the chains, and a second 'time' index keeps + * them by their expiration timestamp + */ #define GC_OBJ_NAME_INDEX 0 #define GC_OBJ_TIME_INDEX 1 static string gc_index_prefixes[] = { "0_", "1_" }; +static void prepend_index_prefix(const string& src, int index, string *dest) +{ + *dest = gc_index_prefixes[index]; + dest->append(src); +} + static int gc_omap_get(cls_method_context_t hctx, int type, const string& key, cls_rgw_gc_obj_info *info) { - string index = gc_index_prefixes[type]; - index.append(key); + string index; + prepend_index_prefix(key, type, &index); bufferlist bl; int ret = cls_cxx_map_get_val(hctx, index, &bl); @@ -698,7 +709,7 @@ static int gc_omap_get(cls_method_context_t hctx, int type, const string& key, c return 0; } -static int gc_omap_set(cls_method_context_t hctx, int type, const string& key, cls_rgw_gc_obj_info *info) +static int gc_omap_set(cls_method_context_t hctx, int type, const string& key, const cls_rgw_gc_obj_info *info) { bufferlist bl; ::encode(*info, bl); @@ -726,13 +737,20 @@ static int gc_omap_remove(cls_method_context_t hctx, int type, const string& key return 0; } -void get_time_key(utime_t& ut, string& key) +static void get_time_key(utime_t& ut, string *key) { char buf[32]; - snprintf(buf, 32, "%011lld.%d", (long long)ut.sec(), ut.nsec()); - key = buf; + snprintf(buf, 32, "%011lld.%09d", (long long)ut.sec(), ut.nsec()); + *key = buf; } +static bool key_in_index(const string& key, int index_type) +{ + const string& prefix = gc_index_prefixes[index_type]; + return (key.compare(0, 
prefix.size(), prefix) == 0); +} + + static int gc_update_entry(cls_method_context_t hctx, uint32_t expiration_secs, cls_rgw_gc_obj_info& info) { @@ -740,7 +758,7 @@ static int gc_update_entry(cls_method_context_t hctx, uint32_t expiration_secs, int ret = gc_omap_get(hctx, GC_OBJ_NAME_INDEX, info.tag, &old_info); if (ret == 0) { string key; - get_time_key(old_info.time, key); + get_time_key(old_info.time, &key); ret = gc_omap_remove(hctx, GC_OBJ_TIME_INDEX, key); if (ret < 0 && ret != -ENOENT) { CLS_LOG(0, "ERROR: failed to remove key=%s\n", key.c_str()); @@ -754,7 +772,7 @@ static int gc_update_entry(cls_method_context_t hctx, uint32_t expiration_secs, return ret; string key; - get_time_key(info.time, key); + get_time_key(info.time, &key); ret = gc_omap_set(hctx, GC_OBJ_TIME_INDEX, key, &info); if (ret < 0) goto done_err; @@ -835,28 +853,23 @@ static int gc_iterate_entries(cls_method_context_t hctx, const string& marker, if (truncated) *truncated = false; - string& index_prefix = gc_index_prefixes[GC_OBJ_TIME_INDEX]; - string start_key; if (key_iter.empty()) { - start_key = index_prefix; - start_key.append(marker); + prepend_index_prefix(marker, GC_OBJ_TIME_INDEX, &start_key); } else { start_key = key_iter; } utime_t now = ceph_clock_now(g_ceph_context); string now_str; - get_time_key(now, now_str); - end_key = gc_index_prefixes[GC_OBJ_TIME_INDEX]; - end_key.append(now_str); + get_time_key(now, &now_str); + prepend_index_prefix(now_str, GC_OBJ_TIME_INDEX, &end_key); CLS_LOG(0, "gc_iterate_entries end_key=%s\n", end_key.c_str()); string filter; do { - #define GC_NUM_KEYS 32 int ret = cls_cxx_map_get_vals(hctx, start_key, filter, GC_NUM_KEYS, &keys); if (ret < 0) @@ -871,28 +884,29 @@ static int gc_iterate_entries(cls_method_context_t hctx, const string& marker, const string& key = iter->first; cls_rgw_gc_obj_info e; - CLS_LOG(0, "gc_iterate_entries key=%s\n", key.c_str()); + CLS_LOG(10, "gc_iterate_entries key=%s\n", key.c_str()); if (key.compare(end_key) >= 0) 
return 0; - if (key.compare(0, index_prefix.size(), index_prefix) != 0) - return 0; + if (!key_in_index(key, GC_OBJ_TIME_INDEX)) + return 0; ret = gc_record_decode(iter->second, e); if (ret < 0) return ret; - ret = cb(hctx, key, e, param); - if (ret < 0) - return ret; - - i++; - if (max_entries && (i > max_entries)) { + if (max_entries && (i >= max_entries)) { *truncated = true; key_iter = key; return 0; } + + ret = cb(hctx, key, e, param); + if (ret < 0) + return ret; + i++; + } iter--; start_key = iter->first; @@ -956,7 +970,7 @@ static int gc_remove(cls_method_context_t hctx, list& tags) return ret; string time_key; - get_time_key(info.time, time_key); + get_time_key(info.time, &time_key); ret = gc_omap_remove(hctx, GC_OBJ_TIME_INDEX, time_key); if (ret < 0 && ret != -ENOENT) return ret; diff --git a/src/cls/rgw/cls_rgw_types.cc b/src/cls/rgw/cls_rgw_types.cc new file mode 100644 index 0000000000000..1c40e02922e53 --- /dev/null +++ b/src/cls/rgw/cls_rgw_types.cc @@ -0,0 +1,185 @@ + +#include "cls/rgw/cls_rgw_types.h" +#include "common/Formatter.h" + + +void rgw_bucket_pending_info::generate_test_instances(list& o) +{ + rgw_bucket_pending_info *i = new rgw_bucket_pending_info; + i->state = CLS_RGW_STATE_COMPLETE; + i->op = CLS_RGW_OP_DEL; + o.push_back(i); + o.push_back(new rgw_bucket_pending_info); +} + +void rgw_bucket_pending_info::dump(Formatter *f) const +{ + f->dump_int("state", (int)state); + f->dump_stream("timestamp") << timestamp; + f->dump_int("op", (int)op); +} + +void rgw_bucket_dir_entry_meta::generate_test_instances(list& o) +{ + rgw_bucket_dir_entry_meta *m = new rgw_bucket_dir_entry_meta; + m->category = 1; + m->size = 100; + m->etag = "etag"; + m->owner = "owner"; + m->owner_display_name = "display name"; + m->tag = "tag"; + m->content_type = "content/type"; + o.push_back(m); + o.push_back(new rgw_bucket_dir_entry_meta); +} + +void rgw_bucket_dir_entry_meta::dump(Formatter *f) const +{ + f->dump_int("category", category); + 
f->dump_unsigned("size", size); + f->dump_stream("mtime") << mtime; + f->dump_string("etag", etag); + f->dump_string("owner", owner); + f->dump_string("owner_display_name", owner_display_name); + f->dump_string("tag", tag); + f->dump_string("content_type", content_type); +} + +void rgw_bucket_dir_entry::generate_test_instances(list& o) +{ + list l; + rgw_bucket_dir_entry_meta::generate_test_instances(l); + + list::iterator iter; + for (iter = l.begin(); iter != l.end(); ++iter) { + rgw_bucket_dir_entry_meta *m = *iter; + rgw_bucket_dir_entry *e = new rgw_bucket_dir_entry; + e->name = "name"; + e->epoch = 1234; + e->locator = "locator"; + e->exists = true; + e->meta = *m; + + o.push_back(e); + + delete m; + } + o.push_back(new rgw_bucket_dir_entry); +} + +void rgw_bucket_dir_entry::dump(Formatter *f) const +{ + f->dump_string("name", name); + f->dump_unsigned("epoch", epoch); + f->dump_string("locator", locator); + f->dump_int("exists", (int)exists); + f->open_object_section("meta"); + meta.dump(f); + f->close_section(); + + map::const_iterator iter = pending_map.begin(); + f->open_array_section("pending_map"); + for (; iter != pending_map.end(); ++iter) { + f->dump_string("tag", iter->first); + f->open_object_section("info"); + iter->second.dump(f); + f->close_section(); + } + f->close_section(); +} + +void rgw_bucket_category_stats::generate_test_instances(list& o) +{ + rgw_bucket_category_stats *s = new rgw_bucket_category_stats; + s->total_size = 1024; + s->total_size_rounded = 4096; + s->num_entries = 2; + o.push_back(s); + o.push_back(new rgw_bucket_category_stats); +} + +void rgw_bucket_category_stats::dump(Formatter *f) const +{ + f->dump_unsigned("total_size", total_size); + f->dump_unsigned("total_size_rounded", total_size_rounded); + f->dump_unsigned("num_entries", num_entries); +} + +void rgw_bucket_dir_header::generate_test_instances(list& o) +{ + list l; + list::iterator iter; + rgw_bucket_category_stats::generate_test_instances(l); + + uint8_t i; + 
for (i = 0, iter = l.begin(); iter != l.end(); ++iter, ++i) { + rgw_bucket_dir_header *h = new rgw_bucket_dir_header; + rgw_bucket_category_stats *s = *iter; + h->stats[i] = *s; + + o.push_back(h); + + delete s; + } + + o.push_back(new rgw_bucket_dir_header); +} + +void rgw_bucket_dir_header::dump(Formatter *f) const +{ + map::const_iterator iter = stats.begin(); + f->open_array_section("stats"); + for (; iter != stats.end(); ++iter) { + f->dump_int("category", (int)iter->first); + f->open_object_section("category_stats"); + iter->second.dump(f); + f->close_section(); + } + f->close_section(); +} + +void rgw_bucket_dir::generate_test_instances(list& o) +{ + list l; + list::iterator iter; + rgw_bucket_dir_header::generate_test_instances(l); + + uint8_t i; + for (i = 0, iter = l.begin(); iter != l.end(); ++iter, ++i) { + rgw_bucket_dir *d = new rgw_bucket_dir; + rgw_bucket_dir_header *h = *iter; + d->header = *h; + + list el; + list::iterator eiter; + for (eiter = el.begin(); eiter != el.end(); ++eiter) { + rgw_bucket_dir_entry *e = *eiter; + d->m[e->name] = *e; + + delete e; + } + + o.push_back(d); + + delete h; + } + + o.push_back(new rgw_bucket_dir); +} + +void rgw_bucket_dir::dump(Formatter *f) const +{ + f->open_object_section("header"); + header.dump(f); + f->close_section(); + map::const_iterator iter = m.begin(); + f->open_array_section("map"); + for (; iter != m.end(); ++iter) { + f->dump_string("obj", iter->first); + f->open_object_section("dir_entry"); + iter->second.dump(f); + f->close_section(); + } + f->close_section(); +} + diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index bfb74bba0b49d..7894b6429ba20 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -61,6 +61,8 @@ void _usage() cerr << " usage trim trim usage (by user, date range)\n"; cerr << " temp remove remove temporary objects that were created up to\n"; cerr << " specified date (and optional time)\n"; + cerr << " gc list dump expired garbage collection objects\n"; + 
cerr << " gc process manually process garbage\n"; cerr << "options:\n"; cerr << " --uid= user id\n"; cerr << " --auth-uid= librados uid\n"; @@ -143,6 +145,8 @@ enum { OPT_USAGE_TRIM, OPT_TEMP_REMOVE, OPT_OBJECT_RM, + OPT_GC_LIST, + OPT_GC_PROCESS, }; static uint32_t str_to_perm(const char *str) @@ -215,7 +219,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more) strcmp(cmd, "log") == 0 || strcmp(cmd, "usage") == 0 || strcmp(cmd, "object") == 0 || - strcmp(cmd, "temp") == 0) { + strcmp(cmd, "temp") == 0 || + strcmp(cmd, "gc") == 0) { *need_more = true; return 0; } @@ -291,6 +296,11 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more) } else if (strcmp(prev_cmd, "object") == 0) { if (strcmp(cmd, "rm") == 0) return OPT_OBJECT_RM; + } else if (strcmp(prev_cmd, "gc") == 0) { + if (strcmp(cmd, "list") == 0) + return OPT_GC_LIST; + if (strcmp(cmd, "process") == 0) + return OPT_GC_PROCESS; } return -EINVAL; @@ -857,7 +867,7 @@ int main(int argc, char **argv) opt_cmd == OPT_KEY_CREATE || opt_cmd == OPT_KEY_RM || opt_cmd == OPT_USER_RM); RGWStoreManager store_manager; - store = store_manager.init(g_ceph_context); + store = store_manager.init(g_ceph_context, false); if (!store) { cerr << "couldn't init storage provider" << std::endl; return 5; //EIO @@ -1648,5 +1658,52 @@ next: } } + if (opt_cmd == OPT_GC_LIST) { + int ret; + int index = 0; + string marker; + bool truncated; + formatter->open_array_section("entries"); + + do { + list result; + ret = rgwstore->list_gc_objs(&index, marker, 1000, result, &truncated); + if (ret < 0) { + cerr << "ERROR: failed to list objs: " << cpp_strerror(-ret) << std::endl; + return 1; + } + + + list::iterator iter; + for (iter = result.begin(); iter != result.end(); ++iter) { + cls_rgw_gc_obj_info& info = *iter; + formatter->open_object_section("chain_info"); + formatter->dump_string("tag", info.tag); + formatter->dump_stream("time") << info.time; + formatter->open_array_section("objs"); + 
list::iterator liter; + cls_rgw_obj_chain& chain = info.chain; + for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) { + cls_rgw_obj& obj = *liter; + formatter->dump_string("pool", obj.pool); + formatter->dump_string("oid", obj.oid); + formatter->dump_string("key", obj.key); + } + formatter->close_section(); // objs + formatter->close_section(); // obj_chain + formatter->flush(cout); + } + } while (truncated); + formatter->close_section(); + formatter->flush(cout); + } + + if (opt_cmd == OPT_GC_PROCESS) { + int ret = rgwstore->process_gc(); + if (ret < 0) { + cerr << "ERROR: gc processing returned error: " << cpp_strerror(-ret) << std::endl; + return 1; + } + } return 0; } diff --git a/src/rgw/rgw_cache.h b/src/rgw/rgw_cache.h index fa7630e17f8ef..80091892cbe3a 100644 --- a/src/rgw/rgw_cache.h +++ b/src/rgw/rgw_cache.h @@ -184,6 +184,7 @@ class RGWCache : public T void finalize() { T::finalize_watch(); + T::finalize(); } int distribute(const string& normal_name, rgw_obj& obj, ObjectCacheInfo& obj_info, int op); int watch_cb(int opcode, uint64_t ver, bufferlist& bl); diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index 22f69f45ac772..c46fc4d54a9df 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -39,6 +39,8 @@ using ceph::crypto::MD5; #define RGW_ROOT_BUCKET ".rgw" +#define RGW_GC_BUCKET ".rgw.gc" + #define RGW_CONTROL_BUCKET ".rgw.control" #define RGW_ATTR_PREFIX "user.rgw." 
diff --git a/src/rgw/rgw_gc.cc b/src/rgw/rgw_gc.cc new file mode 100644 index 0000000000000..b4f131f633623 --- /dev/null +++ b/src/rgw/rgw_gc.cc @@ -0,0 +1,303 @@ + + +#include "rgw_gc.h" +#include "include/rados/librados.hpp" +#include "cls/rgw/cls_rgw_client.h" +#include "cls/lock/cls_lock_client.h" +#include "auth/Crypto.h" + +#include + +#define dout_subsys ceph_subsys_rgw + +using namespace std; +using namespace librados; + +static string gc_oid_prefix = "gc"; +static string gc_index_lock_name = "gc_process"; + +void RGWGC::initialize(CephContext *_cct, RGWRados *_store) { + cct = _cct; + store = _store; + + max_objs = cct->_conf->rgw_gc_max_objs; + obj_names = new string[max_objs]; + + for (int i = 0; i < max_objs; i++) { + obj_names[i] = gc_oid_prefix; + char buf[32]; + snprintf(buf, 32, ".%d", i); + obj_names[i].append(buf); + } +} + +void RGWGC::finalize() +{ + /* obj_names is a single new[] allocation: release it exactly once */ + delete[] obj_names; + obj_names = NULL; +} + +int RGWGC::tag_index(const string& tag) +{ + return ceph_str_hash_linux(tag.c_str(), tag.size()) % max_objs; +} + +void RGWGC::add_chain(ObjectWriteOperation& op, cls_rgw_obj_chain& chain, const string& tag) +{ + cls_rgw_gc_obj_info info; + info.chain = chain; + info.tag = tag; + + cls_rgw_gc_set_entry(op, cct->_conf->rgw_gc_obj_min_wait, info); +} + +int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync) +{ + ObjectWriteOperation op; + add_chain(op, chain, tag); + + int i = tag_index(tag); + + if (sync) + return store->gc_operate(obj_names[i], &op); + + return store->gc_aio_operate(obj_names[i], &op); +} + +int RGWGC::defer_chain(const string& tag, bool sync) +{ + ObjectWriteOperation op; + cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag); + + int i = tag_index(tag); + + if (sync) + return store->gc_operate(obj_names[i], &op); + + return store->gc_aio_operate(obj_names[i], &op); +} + +int RGWGC::remove(int index, const std::list& tags) +{ + ObjectWriteOperation op; + cls_rgw_gc_remove(op, 
tags); + return store->gc_operate(obj_names[index], &op); +} + +int RGWGC::list(int *index, string& marker, uint32_t max, std::list& result, bool *truncated) +{ + result.clear(); + + for (; *index < cct->_conf->rgw_gc_max_objs && result.size() < max; (*index)++, marker.clear()) { + std::list entries; + int ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), entries, truncated); + if (ret == -ENOENT) + continue; + if (ret < 0) + return ret; + + std::list::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + result.push_back(*iter); + } + + if (*index == cct->_conf->rgw_gc_max_objs - 1) { + /* we cut short here, truncated will hold the correct value */ + return 0; + } + + if (result.size() == max) { + /* close approximation, it might be that the next of the objects don't hold + * anything, in this case truncated should have been false, but we can find + * that out on the next iteration + */ + *truncated = true; + return 0; + } + + } + *truncated = false; + + return 0; +} + +int RGWGC::process(int index, int max_secs) +{ + rados::cls::lock::Lock l(gc_index_lock_name); + utime_t end = ceph_clock_now(g_ceph_context); + std::list remove_tags; + + /* max_secs should be greater than zero. We don't want a zero max_secs + * to be translated as no timeout, since we'd then need to break the + * lock and that would require a manual intervention. In this case + * we can just wait it out. 
 */ + if (max_secs <= 0) + return -EAGAIN; + + end += max_secs; + utime_t time(max_secs, 0); + l.set_duration(time); + + int ret = l.lock_exclusive(store->gc_pool_ctx, obj_names[index]); + if (ret == -EBUSY) { /* already locked by another gc processor */ + dout(0) << "RGWGC::process() failed to acquire lock on " << obj_names[index] << dendl; + return 0; + } + if (ret < 0) + return ret; + + string marker; + bool truncated; + IoCtx *ctx = new IoCtx; /* reused across list batches, freed at 'done' */ + do { + int max = 100; + std::list entries; + ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max, entries, &truncated); + if (ret == -ENOENT) { + ret = 0; + goto done; + } + if (ret < 0) + goto done; + + string last_pool; + std::list::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + bool remove_tag; + cls_rgw_gc_obj_info& info = *iter; + std::list::iterator liter; + cls_rgw_obj_chain& chain = info.chain; + + utime_t now = ceph_clock_now(g_ceph_context); + if (now >= end) + goto done; + + remove_tag = true; + for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) { + cls_rgw_obj& obj = *liter; + + if (obj.pool != last_pool) { + /* a different pool needs a fresh context; dropping the + * old one here (and the last one at 'done') means no + * IoCtx is leaked, however many batches are listed */ + delete ctx; + ctx = new IoCtx; + ret = store->rados->ioctx_create(obj.pool.c_str(), *ctx); + if (ret < 0) { + dout(0) << "ERROR: failed to create ioctx pool=" << obj.pool << dendl; + continue; + } + last_pool = obj.pool; + } + + ctx->locator_set_key(obj.key); + dout(0) << "gc::process: removing " << obj.pool << ":" << obj.oid << dendl; + ret = ctx->remove(obj.oid); + if (ret == -ENOENT) + ret = 0; + if (ret < 0) { + remove_tag = false; + dout(0) << "failed to remove " << obj.pool << ":" << obj.oid << "@" << obj.key << dendl; + } + + if (going_down()) // leave early, even if tag isn't removed, it's ok + goto done; + } + if (remove_tag) { + remove_tags.push_back(info.tag); +#define MAX_REMOVE_CHUNK 16 + if (remove_tags.size() > MAX_REMOVE_CHUNK) { + remove(index, remove_tags); + remove_tags.clear(); + 
 } + } + } + } while (truncated); + +done: + if (remove_tags.size()) + remove(index, remove_tags); + l.unlock(store->gc_pool_ctx, obj_names[index]); + delete ctx; + return 0; +} + +int RGWGC::process() +{ + int max_objs = cct->_conf->rgw_gc_max_objs; + int max_secs = cct->_conf->rgw_gc_processor_max_time; + + unsigned start; + int ret = get_random_bytes((char *)&start, sizeof(start)); + if (ret < 0) + return ret; + + for (int i = 0; i < max_objs; i++) { + int index = (i + start) % max_objs; + ret = process(index, max_secs); + if (ret < 0) + return ret; + } + + return 0; +} + +bool RGWGC::going_down() +{ + return (down_flag.read() != 0); +} + +void RGWGC::start_processor() +{ + worker = new GCWorker(cct, this); + worker->create(); +} + +void RGWGC::stop_processor() +{ + down_flag.set(1); + if (worker) { + worker->stop(); + worker->join(); + } + delete worker; + worker = NULL; +} + +void *RGWGC::GCWorker::entry() { + do { + utime_t start = ceph_clock_now(cct); + dout(2) << "garbage collection: start" << dendl; + int r = gc->process(); + if (r < 0) { + dout(0) << "ERROR: garbage collection process() returned error r=" << r << dendl; + } + dout(2) << "garbage collection: stop" << dendl; + + if (gc->going_down()) + break; + + utime_t end = ceph_clock_now(cct); + end -= start; + int secs = cct->_conf->rgw_gc_processor_period; + + if (secs <= end.sec()) + continue; // next round + + secs -= end.sec(); + + lock.Lock(); + cond.WaitInterval(cct, lock, utime_t(secs, 0)); + lock.Unlock(); + } while (!gc->going_down()); + + return NULL; +} + +void RGWGC::GCWorker::stop() +{ + Mutex::Locker l(lock); + cond.Signal(); +} + diff --git a/src/rgw/rgw_gc.h b/src/rgw/rgw_gc.h new file mode 100644 index 0000000000000..db1753a100fe9 --- /dev/null +++ b/src/rgw/rgw_gc.h @@ -0,0 +1,59 @@ +#ifndef CEPH_RGW_GC_H +#define CEPH_RGW_GC_H + + +#include "include/types.h" +#include "include/atomic.h" +#include "include/rados/librados.hpp" +#include "common/Mutex.h" +#include "common/Cond.h" 
+#include "common/Thread.h" +#include "rgw_common.h" +#include "rgw_rados.h" +#include "cls/rgw/cls_rgw_types.h" + +class RGWGC { + CephContext *cct; + RGWRados *store; + int max_objs; + string *obj_names; + atomic_t down_flag; + + int tag_index(const string& tag); + + class GCWorker : public Thread { + CephContext *cct; + RGWGC *gc; + Mutex lock; + Cond cond; + + public: + GCWorker(CephContext *_cct, RGWGC *_gc) : cct(_cct), gc(_gc), lock("GCWorker") {} + void *entry(); + void stop(); + }; + + GCWorker *worker; +public: + RGWGC() : cct(NULL), store(NULL), max_objs(0), obj_names(NULL), worker(NULL) {} + + void add_chain(librados::ObjectWriteOperation& op, cls_rgw_obj_chain& chain, const string& tag); + int send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync); + int defer_chain(const string& tag, bool sync); + int remove(int index, const std::list& tags); + + void initialize(CephContext *_cct, RGWRados *_store); + void finalize(); + + int list(int *index, string& marker, uint32_t max, std::list& result, bool *truncated); + void list_init(int *index) { *index = 0; } + int process(int index, int process_max_secs); + int process(); + + bool going_down(); + void start_processor(); + void stop_processor(); +}; + + +#endif diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index 44a6f6c26a564..09ca073462093 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -420,7 +420,7 @@ int main(int argc, const char **argv) RGWStoreManager store_manager; int r = 0; - if (!store_manager.init(g_ceph_context)) { + if (!store_manager.init(g_ceph_context, true)) { derr << "Couldn't init storage provider (RADOS)" << dendl; r = EIO; } diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 35b5f5d79f695..ea17a1c5ca0f7 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -319,6 +319,9 @@ void RGWGetObj::execute() void *handle = NULL; utime_t start_time = s->time; bufferlist bl; + utime_t gc_invalidate_time = ceph_clock_now(s->cct); + gc_invalidate_time += 
(s->cct->_conf->rgw_gc_obj_min_wait / 2); + perfcounter->inc(l_rgw_get); @@ -356,6 +359,17 @@ void RGWGetObj::execute() send_response(bl); bl.clear(); start_time = ceph_clock_now(s->cct); + + if (ofs <= end) { + if (start_time > gc_invalidate_time) { + int r = rgwstore->defer_gc(s->obj_ctx, obj); + if (r < 0) { + dout(0) << "WARNING: could not defer gc entry for obj" << dendl; + } + gc_invalidate_time = start_time; + gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2); + } + } } return; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index c5578b272d76e..9b105b87fbd38 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -27,12 +27,12 @@ using namespace librados; #include "rgw_log.h" +#include "rgw_gc.h" + #define dout_subsys ceph_subsys_rgw using namespace std; -Rados *rados = NULL; - static RGWCache cached_rados_provider; static RGWRados rados_provider; @@ -68,7 +68,7 @@ public: } }; -RGWRados *RGWRados::init_storage_provider(CephContext *cct) +RGWRados *RGWRados::init_storage_provider(CephContext *cct, bool use_gc_thread) { int use_cache = cct->_conf->rgw_cache_enabled; store = NULL; @@ -78,7 +78,7 @@ RGWRados *RGWRados::init_storage_provider(CephContext *cct) store = &cached_rados_provider; } - if (store->initialize(cct) < 0) + if (store->initialize(cct, use_gc_thread) < 0) store = NULL; return store; @@ -93,6 +93,13 @@ void RGWRados::close_storage() store = NULL; } + +void RGWRados::finalize() +{ + if (use_gc_thread) + gc->stop_processor(); +} + /** * Initialize the RADOS instance and prepare to do other ops * Returns 0 on success, -ERR# on failure. 
@@ -117,6 +124,16 @@ int RGWRados::initialize() if (ret < 0) return ret; + ret = open_gc_pool_ctx(); + if (ret < 0) + return ret; + + gc = new RGWGC(); + gc->initialize(cct, this); + + if (use_gc_thread) + gc->start_processor(); + return ret; } @@ -158,6 +175,22 @@ int RGWRados::open_root_pool_ctx() return r; } +int RGWRados::open_gc_pool_ctx() +{ + int r = rados->ioctx_create(RGW_GC_BUCKET, gc_pool_ctx); + if (r == -ENOENT) { + r = rados->pool_create(RGW_GC_BUCKET); + if (r == -EEXIST) + r = 0; + if (r < 0) + return r; + + r = rados->ioctx_create(RGW_GC_BUCKET, gc_pool_ctx); + } + + return r; +} + int RGWRados::init_watch() { int r = rados->ioctx_create(RGW_CONTROL_BUCKET, control_pool_ctx); @@ -915,17 +948,16 @@ int RGWRados::put_obj_meta(void *ctx, rgw_obj& obj, uint64_t size, RGWObjState *state = NULL; if (!exclusive) { - r = prepare_atomic_for_write(rctx, obj, io_ctx, oid, op, &state); + r = prepare_atomic_for_write(rctx, obj, op, &state, true); if (r < 0) return r; + } else { + op.create(true); // exclusive create } - op.create(exclusive); - if (data) { /* if we want to overwrite the data, we also want to overwrite the xattrs, so just remove the object */ - op.remove(); op.write_full(*data); } @@ -1298,19 +1330,57 @@ int RGWRados::complete_atomic_overwrite(RGWRadosCtx *rctx, RGWObjState *state, r if (!state || !state->has_manifest) return 0; + cls_rgw_obj_chain chain; map::iterator iter; for (iter = state->manifest.objs.begin(); iter != state->manifest.objs.end(); ++iter) { rgw_obj& mobj = iter->second.loc; if (mobj == obj) continue; - int ret = rctx->notify_intent(mobj, DEL_OBJ); - if (ret < 0) { - ldout(cct, 0) << "WARNING: failed to log intent ret=" << ret << dendl; - } + string oid, key; + rgw_bucket bucket; + get_obj_bucket_and_oid_key(mobj, bucket, oid, key); + chain.push_obj(bucket.pool, oid, key); } - return 0; + + string tag = state->obj_tag.c_str(); + int ret = gc->send_chain(chain, tag, false); // do it async + + return ret; } +int 
RGWRados::defer_gc(void *ctx, rgw_obj& obj) +{ + RGWRadosCtx *rctx = (RGWRadosCtx *)ctx; + rgw_bucket bucket; + std::string oid, key; + get_obj_bucket_and_oid_key(obj, bucket, oid, key); + if (!rctx) + return 0; + + RGWObjState *state = NULL; + + int r = get_obj_state(rctx, obj, &state); + if (r < 0) + return r; + + if (!state->is_atomic) { + ldout(cct, 20) << "state for obj=" << obj << " is not atomic, not deferring gc operation" << dendl; + return -EINVAL; + } + + if (state->obj_tag.length() == 0) {// check for backward compatibility + ldout(cct, 20) << "state->obj_tag is empty, not deferring gc operation" << dendl; + return -EINVAL; + } + + string tag = state->obj_tag.c_str(); + + ldout(cct, 0) << "defer chain tag=" << tag << dendl; + + return gc->defer_chain(tag, false); +} + + /** * Delete an object. * bucket: name of the bucket storing the object @@ -1333,7 +1403,7 @@ int RGWRados::delete_obj_impl(void *ctx, rgw_obj& obj, bool sync) ObjectWriteOperation op; RGWObjState *state; - r = prepare_atomic_for_write(rctx, obj, io_ctx, oid, op, &state); + r = prepare_atomic_for_write(rctx, obj, op, &state, false); if (r < 0) return r; @@ -1392,7 +1462,39 @@ int RGWRados::delete_obj(void *ctx, rgw_obj& obj, bool sync) return r; } -int RGWRados::get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, RGWObjState **state) +static void generate_fake_tag(CephContext *cct, map& attrset, RGWObjManifest& manifest, bufferlist& manifest_bl, bufferlist& tag_bl) +{ + string tag; + + map::iterator mi = manifest.objs.begin(); + if (mi != manifest.objs.end()) { + if (manifest.objs.size() > 1) // first object usually points at the head, let's skip to a more unique part + ++mi; + tag = mi->second.loc.object; + tag.append("_"); + } + + unsigned char md5[CEPH_CRYPTO_MD5_DIGESTSIZE]; + char md5_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; + MD5 hash; + hash.Update((const byte *)manifest_bl.c_str(), manifest_bl.length()); + + map::iterator iter = 
attrset.find(RGW_ATTR_ETAG); + if (iter != attrset.end()) { + bufferlist& bl = iter->second; + hash.Update((const byte *)bl.c_str(), bl.length()); + } + + hash.Final(md5); + buf_to_hex(md5, CEPH_CRYPTO_MD5_DIGESTSIZE, md5_str); + tag.append(md5_str); + + ldout(cct, 10) << "generate_fake_tag new tag=" << tag << dendl; + + tag_bl.append(tag.c_str(), tag.size() + 1); +} + +int RGWRados::get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, RGWObjState **state) { RGWObjState *s = rctx->get_state(obj); ldout(cct, 20) << "get_obj_state: rctx=" << (void *)rctx << " obj=" << obj << " state=" << (void *)s << " s->prefetch_data=" << s->prefetch_data << dendl; @@ -1436,6 +1538,15 @@ int RGWRados::get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io for (mi = s->manifest.objs.begin(); mi != s->manifest.objs.end(); ++mi) { ldout(cct, 10) << "manifest: ofs=" << mi->first << " loc=" << mi->second.loc << dendl; } + + if (!s->obj_tag.length()) { + /* + * Uh oh, something's wrong, object with manifest should have tag. 
Let's + * create one out of the manifest, would be unique + */ + generate_fake_tag(cct, s->attrset, s->manifest, manifest_bl, s->obj_tag); + s->fake_tag = true; + } } if (s->obj_tag.length()) ldout(cct, 20) << "get_obj_state: setting s->obj_tag to " << s->obj_tag.c_str() << dendl; @@ -1475,7 +1586,7 @@ int RGWRados::get_attr(void *ctx, rgw_obj& obj, const char *name, bufferlist& de if (rctx) { RGWObjState *state; - r = get_obj_state(rctx, obj, io_ctx, actual_obj, &state); + r = get_obj_state(rctx, obj, &state); if (r < 0) return r; if (!state->exists) @@ -1492,13 +1603,13 @@ int RGWRados::get_attr(void *ctx, rgw_obj& obj, const char *name, bufferlist& de return 0; } -int RGWRados::append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, - string& actual_obj, ObjectOperation& op, RGWObjState **pstate) +int RGWRados::append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, + ObjectOperation& op, RGWObjState **pstate) { if (!rctx) return 0; - int r = get_obj_state(rctx, obj, io_ctx, actual_obj, pstate); + int r = get_obj_state(rctx, obj, pstate); if (r < 0) return r; @@ -1509,7 +1620,7 @@ int RGWRados::append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCt return 0; } - if (state->obj_tag.length() > 0) {// check for backward compatibility + if (state->obj_tag.length() > 0 && !state->fake_tag) {// check for backward compatibility op.cmpxattr(RGW_ATTR_ID_TAG, LIBRADOS_CMPXATTR_OP_EQ, state->obj_tag); } else { ldout(cct, 20) << "state->obj_tag is empty, not appending atomic test" << dendl; @@ -1517,19 +1628,26 @@ int RGWRados::append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCt return 0; } -int RGWRados::prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, - string& actual_obj, ObjectWriteOperation& op, RGWObjState **pstate) +int RGWRados::prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, + ObjectWriteOperation& op, RGWObjState **pstate, + bool reset_obj) { - int r = 
get_obj_state(rctx, obj, io_ctx, actual_obj, pstate); + int r = get_obj_state(rctx, obj, pstate); if (r < 0) return r; RGWObjState *state = *pstate; - bool need_guard = state->has_manifest || (state->obj_tag.length() != 0); + bool need_guard = (state->has_manifest || (state->obj_tag.length() != 0)) && (!state->fake_tag); if (!state->is_atomic) { ldout(cct, 20) << "prepare_atomic_for_write_impl: state is not atomic. state=" << (void *)state << dendl; + + if (reset_obj) { + op.create(false); + op.remove(); + } + return 0; } @@ -1579,10 +1697,15 @@ int RGWRados::prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, lib // FIXME: need to add FAIL_NOTEXIST_OK for racing deletion } + if (reset_obj) { + op.create(false); + op.remove(); + } + string tag; append_rand_alpha(cct, tag, tag, 32); bufferlist bl; - bl.append(tag); + bl.append(tag.c_str(), tag.size() + 1); op.setxattr(RGW_ATTR_ID_TAG, bl); @@ -1597,8 +1720,9 @@ int RGWRados::prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, lib return 0; } -int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, - string& actual_obj, ObjectWriteOperation& op, RGWObjState **pstate) +int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, + ObjectWriteOperation& op, RGWObjState **pstate, + bool reset_obj) { if (!rctx) { *pstate = NULL; @@ -1606,7 +1730,7 @@ int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados } int r; - r = prepare_atomic_for_write_impl(rctx, obj, io_ctx, actual_obj, op, pstate); + r = prepare_atomic_for_write_impl(rctx, obj, op, pstate, reset_obj); return r; } @@ -1638,16 +1762,16 @@ int RGWRados::set_attr(void *ctx, rgw_obj& obj, const char *name, bufferlist& bl if (r < 0) return r; - io_ctx.locator_set_key(key); - ObjectWriteOperation op; RGWObjState *state = NULL; - r = append_atomic_test(rctx, obj, io_ctx, actual_obj, op, &state); + r = append_atomic_test(rctx, obj, op, &state); if (r < 0) return r; 
op.setxattr(name, bl); + + io_ctx.locator_set_key(key); r = io_ctx.operate(actual_obj, &op); if (state && r >= 0) @@ -1693,7 +1817,7 @@ int RGWRados::set_attrs(void *ctx, rgw_obj& obj, ObjectWriteOperation op; RGWObjState *state = NULL; - r = append_atomic_test(rctx, obj, io_ctx, actual_obj, op, &state); + r = append_atomic_test(rctx, obj, op, &state); if (r < 0) return r; @@ -1803,7 +1927,7 @@ int RGWRados::prepare_get_obj(void *ctx, rgw_obj& obj, rctx = new_ctx; } - r = get_obj_state(rctx, obj, state->io_ctx, oid, &astate); + r = get_obj_state(rctx, obj, &astate); if (r < 0) goto done_err; @@ -2013,7 +2137,7 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj, } } RGWObjState *state; - r = prepare_atomic_for_write(rctx, dst_obj, io_ctx, dst_oid, op, &state); + r = prepare_atomic_for_write(rctx, dst_obj, op, &state, true); if (r < 0) return r; @@ -2131,7 +2255,7 @@ int RGWRados::get_obj(void *ctx, void **handle, rgw_obj& obj, rctx = new_ctx; } - int r = get_obj_state(rctx, obj, state->io_ctx, oid, &astate); + int r = get_obj_state(rctx, obj, &astate); if (r < 0) goto done_ret; @@ -2169,7 +2293,7 @@ int RGWRados::get_obj(void *ctx, void **handle, rgw_obj& obj, if (reading_from_head) { /* only when reading from the head object do we need to do the atomic test */ - r = append_atomic_test(rctx, read_obj, state->io_ctx, oid, op, &astate); + r = append_atomic_test(rctx, read_obj, op, &astate); if (r < 0) goto done_ret; } @@ -2235,7 +2359,7 @@ int RGWRados::read(void *ctx, rgw_obj& obj, off_t ofs, size_t size, bufferlist& ObjectReadOperation op; - r = append_atomic_test(rctx, obj, io_ctx, oid, op, &astate); + r = append_atomic_test(rctx, obj, op, &astate); if (r < 0) return r; @@ -2535,6 +2659,34 @@ int RGWRados::pool_iterate(RGWPoolIterCtx& ctx, uint32_t num, vector& return objs.size(); } +int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op) +{ + return gc_pool_ctx.operate(oid, op); +} + +int RGWRados::gc_aio_operate(string& oid, 
librados::ObjectWriteOperation *op) +{ + AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); + int r = gc_pool_ctx.aio_operate(oid, c, op); + c->release(); + return r; +} + +int RGWRados::gc_operate(string& oid, librados::ObjectReadOperation *op, bufferlist *pbl) +{ + return gc_pool_ctx.operate(oid, op, pbl); +} + +int RGWRados::list_gc_objs(int *index, string& marker, uint32_t max, std::list& result, bool *truncated) +{ + return gc->list(index, marker, max, result, truncated); +} + +int RGWRados::process_gc() +{ + return gc->process(); +} + int RGWRados::cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid) { bufferlist in; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index a63a531f435ee..50f07b893b46b 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -10,6 +10,7 @@ class RGWWatcher; class SafeTimer; class ACLOwner; +class RGWGC; static inline void prepend_bucket_marker(rgw_bucket& bucket, string& orig_oid, string& oid) { @@ -119,6 +120,7 @@ struct RGWObjState { uint64_t size; time_t mtime; bufferlist obj_tag; + bool fake_tag; RGWObjManifest manifest; bool has_manifest; string shadow_obj; @@ -127,7 +129,7 @@ struct RGWObjState { bool prefetch_data; map attrset; - RGWObjState() : is_atomic(false), has_attrs(0), exists(false), has_manifest(false), prefetch_data(false) {} + RGWObjState() : is_atomic(false), has_attrs(0), exists(false), fake_tag(false), has_manifest(false), prefetch_data(false) {} bool get_attr(string name, bufferlist& dest) { map::iterator iter = attrset.find(name); @@ -141,6 +143,7 @@ struct RGWObjState { void clear() { has_attrs = false; exists = false; + fake_tag = false; size = 0; mtime = 0; obj_tag.clear(); @@ -197,8 +200,11 @@ struct RGWPoolIterCtx { class RGWRados { + friend class RGWGC; + /** Open the pool used as root for this gateway */ int open_root_pool_ctx(); + int open_gc_pool_ctx(); int open_bucket_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx); @@ 
-223,6 +229,9 @@ class RGWRados } }; + RGWGC *gc; + bool use_gc_thread; + int num_watchers; RGWWatcher **watchers; @@ -233,13 +242,15 @@ class RGWRados Mutex bucket_id_lock; uint64_t max_bucket_id; - int get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, RGWObjState **state); - int append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, - string& actual_obj, librados::ObjectOperation& op, RGWObjState **state); - int prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, - string& actual_obj, librados::ObjectWriteOperation& op, RGWObjState **pstate); - int prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, - string& actual_obj, librados::ObjectWriteOperation& op, RGWObjState **pstate); + int get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, RGWObjState **state); + int append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, + librados::ObjectOperation& op, RGWObjState **state); + int prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, + librados::ObjectWriteOperation& op, RGWObjState **pstate, + bool reset_obj); + int prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, + librados::ObjectWriteOperation& op, RGWObjState **pstate, + bool reset_obj); void atomic_write_finish(RGWObjState *state, int r) { if (state && r == -ECANCELED) { @@ -279,25 +290,29 @@ class RGWRados protected: CephContext *cct; + librados::Rados *rados; + librados::IoCtx gc_pool_ctx; // .rgw.gc public: - RGWRados() : lock("rados_timer_lock"), timer(NULL), num_watchers(0), watchers(NULL), watch_handles(NULL), - bucket_id_lock("rados_bucket_id"), max_bucket_id(0) {} + RGWRados() : lock("rados_timer_lock"), timer(NULL), gc(NULL), use_gc_thread(false), + num_watchers(0), watchers(NULL), watch_handles(0), + bucket_id_lock("rados_bucket_id"), max_bucket_id(0), rados(NULL) {} virtual ~RGWRados() {} void tick(); CephContext *ctx() { return cct; } /** do all necessary setup 
of the storage device */ - int initialize(CephContext *_cct) { + int initialize(CephContext *_cct, bool _use_gc_thread) { cct = _cct; + use_gc_thread = _use_gc_thread; return initialize(); } /** Initialize the RADOS instance and prepare to do other ops */ virtual int initialize(); - virtual void finalize() {} + virtual void finalize(); - static RGWRados *init_storage_provider(CephContext *cct); + static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread); static void close_storage(); static RGWRados *store; @@ -585,6 +600,13 @@ public: /// clean up/process any temporary objects older than given date[/time] int remove_temp_objects(string date, string time); + int gc_operate(string& oid, librados::ObjectWriteOperation *op); + int gc_aio_operate(string& oid, librados::ObjectWriteOperation *op); + int gc_operate(string& oid, librados::ObjectReadOperation *op, bufferlist *pbl); + + int list_gc_objs(int *index, string& marker, uint32_t max, std::list& result, bool *truncated); + int process_gc(); + int defer_gc(void *ctx, rgw_obj& obj); private: int process_intent_log(rgw_bucket& bucket, string& oid, time_t epoch, int flags, bool purge); @@ -633,18 +655,20 @@ public: uint64_t instance_id(); uint64_t next_bucket_id(); + }; class RGWStoreManager { RGWRados *store; public: - RGWStoreManager() : store(NULL) {} + RGWStoreManager(): store(NULL) {} ~RGWStoreManager() { - if (store) + if (store) { RGWRados::close_storage(); + } } - RGWRados *init(CephContext *cct) { - store = RGWRados::init_storage_provider(cct); + RGWRados *init(CephContext *cct, bool use_gc_thread) { + store = RGWRados::init_storage_provider(cct, use_gc_thread); return store; } }; -- 2.39.5