rgw/rgw_formats.cc \
rgw/rgw_log.cc \
rgw/rgw_multi.cc \
+ rgw/rgw_gc.cc \
rgw/rgw_env.cc
librgw_a_CFLAGS = ${CRYPTO_CFLAGS} ${AM_CFLAGS}
librgw_a_CXXFLAGS = ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS}
noinst_LIBRARIES += librgw.a
my_radosgw_ldadd = \
- libglobal.la librgw.a librados.la libcls_rgw_client.a -lcurl -lexpat \
+ libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_lock_client.a -lcurl -lexpat \
$(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS)
radosgw_SOURCES = \
rgw/rgw_formats.h\
rgw/rgw_log.h\
rgw/rgw_multi.h\
+ rgw/rgw_gc.h\
rgw/rgw_op.h\
rgw/rgw_swift.h\
rgw/rgw_swift_auth.h\
return 0;
}
+/*
+ * We hold the garbage collection chain data under two different indexes: the first 'name' index
+ * keeps them under a unique tag that represents the chains, and a second 'time' index keeps
+ * them by their expiration timestamp
+ */
#define GC_OBJ_NAME_INDEX 0
#define GC_OBJ_TIME_INDEX 1
static string gc_index_prefixes[] = { "0_",
"1_" };
+static void prepend_index_prefix(const string& src, int index, string *dest)
+{
+ *dest = gc_index_prefixes[index];
+ dest->append(src);
+}
+
static int gc_omap_get(cls_method_context_t hctx, int type, const string& key, cls_rgw_gc_obj_info *info)
{
- string index = gc_index_prefixes[type];
- index.append(key);
+ string index;
+ prepend_index_prefix(key, type, &index);
bufferlist bl;
int ret = cls_cxx_map_get_val(hctx, index, &bl);
return 0;
}
-static int gc_omap_set(cls_method_context_t hctx, int type, const string& key, cls_rgw_gc_obj_info *info)
+static int gc_omap_set(cls_method_context_t hctx, int type, const string& key, const cls_rgw_gc_obj_info *info)
{
bufferlist bl;
::encode(*info, bl);
return 0;
}
-void get_time_key(utime_t& ut, string& key)
+static void get_time_key(utime_t& ut, string *key)
{
char buf[32];
- snprintf(buf, 32, "%011lld.%d", (long long)ut.sec(), ut.nsec());
- key = buf;
+ snprintf(buf, 32, "%011lld.%09d", (long long)ut.sec(), ut.nsec());
+ *key = buf;
}
+static bool key_in_index(const string& key, int index_type)
+{
+ const string& prefix = gc_index_prefixes[index_type];
+ return (key.compare(0, prefix.size(), prefix) == 0);
+}
+
+
static int gc_update_entry(cls_method_context_t hctx, uint32_t expiration_secs,
cls_rgw_gc_obj_info& info)
{
int ret = gc_omap_get(hctx, GC_OBJ_NAME_INDEX, info.tag, &old_info);
if (ret == 0) {
string key;
- get_time_key(old_info.time, key);
+ get_time_key(old_info.time, &key);
ret = gc_omap_remove(hctx, GC_OBJ_TIME_INDEX, key);
if (ret < 0 && ret != -ENOENT) {
CLS_LOG(0, "ERROR: failed to remove key=%s\n", key.c_str());
return ret;
string key;
- get_time_key(info.time, key);
+ get_time_key(info.time, &key);
ret = gc_omap_set(hctx, GC_OBJ_TIME_INDEX, key, &info);
if (ret < 0)
goto done_err;
if (truncated)
*truncated = false;
- string& index_prefix = gc_index_prefixes[GC_OBJ_TIME_INDEX];
-
string start_key;
if (key_iter.empty()) {
- start_key = index_prefix;
- start_key.append(marker);
+ prepend_index_prefix(marker, GC_OBJ_TIME_INDEX, &start_key);
} else {
start_key = key_iter;
}
utime_t now = ceph_clock_now(g_ceph_context);
string now_str;
- get_time_key(now, now_str);
- end_key = gc_index_prefixes[GC_OBJ_TIME_INDEX];
- end_key.append(now_str);
+ get_time_key(now, &now_str);
+ prepend_index_prefix(now_str, GC_OBJ_TIME_INDEX, &end_key);
CLS_LOG(0, "gc_iterate_entries end_key=%s\n", end_key.c_str());
string filter;
do {
-
#define GC_NUM_KEYS 32
int ret = cls_cxx_map_get_vals(hctx, start_key, filter, GC_NUM_KEYS, &keys);
if (ret < 0)
const string& key = iter->first;
cls_rgw_gc_obj_info e;
- CLS_LOG(0, "gc_iterate_entries key=%s\n", key.c_str());
+ CLS_LOG(10, "gc_iterate_entries key=%s\n", key.c_str());
if (key.compare(end_key) >= 0)
return 0;
- if (key.compare(0, index_prefix.size(), index_prefix) != 0)
- return 0;
+ if (!key_in_index(key, GC_OBJ_TIME_INDEX))
+ return 0;
ret = gc_record_decode(iter->second, e);
if (ret < 0)
return ret;
- ret = cb(hctx, key, e, param);
- if (ret < 0)
- return ret;
-
- i++;
- if (max_entries && (i > max_entries)) {
+ if (max_entries && (i >= max_entries)) {
*truncated = true;
key_iter = key;
return 0;
}
+
+ ret = cb(hctx, key, e, param);
+ if (ret < 0)
+ return ret;
+ i++;
+
}
iter--;
start_key = iter->first;
return ret;
string time_key;
- get_time_key(info.time, time_key);
+ get_time_key(info.time, &time_key);
ret = gc_omap_remove(hctx, GC_OBJ_TIME_INDEX, time_key);
if (ret < 0 && ret != -ENOENT)
return ret;
--- /dev/null
+
+#include "cls/rgw/cls_rgw_types.h"
+#include "common/Formatter.h"
+
+
+void rgw_bucket_pending_info::generate_test_instances(list<rgw_bucket_pending_info*>& o)
+{
+ rgw_bucket_pending_info *i = new rgw_bucket_pending_info;
+ i->state = CLS_RGW_STATE_COMPLETE;
+ i->op = CLS_RGW_OP_DEL;
+ o.push_back(i);
+ o.push_back(new rgw_bucket_pending_info);
+}
+
+void rgw_bucket_pending_info::dump(Formatter *f) const
+{
+ f->dump_int("state", (int)state);
+ f->dump_stream("timestamp") << timestamp;
+ f->dump_int("op", (int)op);
+}
+
+void rgw_bucket_dir_entry_meta::generate_test_instances(list<rgw_bucket_dir_entry_meta*>& o)
+{
+ rgw_bucket_dir_entry_meta *m = new rgw_bucket_dir_entry_meta;
+ m->category = 1;
+ m->size = 100;
+ m->etag = "etag";
+ m->owner = "owner";
+ m->owner_display_name = "display name";
+ m->tag = "tag";
+ m->content_type = "content/type";
+ o.push_back(m);
+ o.push_back(new rgw_bucket_dir_entry_meta);
+}
+
+void rgw_bucket_dir_entry_meta::dump(Formatter *f) const
+{
+ f->dump_int("category", category);
+ f->dump_unsigned("size", size);
+ f->dump_stream("mtime") << mtime;
+ f->dump_string("etag", etag);
+ f->dump_string("owner", owner);
+ f->dump_string("owner_display_name", owner_display_name);
+ f->dump_string("tag", tag);
+ f->dump_string("content_type", content_type);
+}
+
+void rgw_bucket_dir_entry::generate_test_instances(list<rgw_bucket_dir_entry*>& o)
+{
+ list<rgw_bucket_dir_entry_meta *> l;
+ rgw_bucket_dir_entry_meta::generate_test_instances(l);
+
+ list<rgw_bucket_dir_entry_meta *>::iterator iter;
+ for (iter = l.begin(); iter != l.end(); ++iter) {
+ rgw_bucket_dir_entry_meta *m = *iter;
+ rgw_bucket_dir_entry *e = new rgw_bucket_dir_entry;
+ e->name = "name";
+ e->epoch = 1234;
+ e->locator = "locator";
+ e->exists = true;
+ e->meta = *m;
+
+ o.push_back(e);
+
+ delete m;
+ }
+ o.push_back(new rgw_bucket_dir_entry);
+}
+
+void rgw_bucket_dir_entry::dump(Formatter *f) const
+{
+ f->dump_string("name", name);
+ f->dump_unsigned("epoch", epoch);
+ f->dump_string("locator", locator);
+ f->dump_int("exists", (int)exists);
+ f->open_object_section("meta");
+ meta.dump(f);
+ f->close_section();
+
+ map<string, struct rgw_bucket_pending_info>::const_iterator iter = pending_map.begin();
+ f->open_array_section("pending_map");
+ for (; iter != pending_map.end(); ++iter) {
+ f->dump_string("tag", iter->first);
+ f->open_object_section("info");
+ iter->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+}
+
+void rgw_bucket_category_stats::generate_test_instances(list<rgw_bucket_category_stats*>& o)
+{
+ rgw_bucket_category_stats *s = new rgw_bucket_category_stats;
+ s->total_size = 1024;
+ s->total_size_rounded = 4096;
+ s->num_entries = 2;
+ o.push_back(s);
+ o.push_back(new rgw_bucket_category_stats);
+}
+
+void rgw_bucket_category_stats::dump(Formatter *f) const
+{
+ f->dump_unsigned("total_size", total_size);
+ f->dump_unsigned("total_size_rounded", total_size_rounded);
+ f->dump_unsigned("num_entries", num_entries);
+}
+
+void rgw_bucket_dir_header::generate_test_instances(list<rgw_bucket_dir_header*>& o)
+{
+ list<rgw_bucket_category_stats *> l;
+ list<rgw_bucket_category_stats *>::iterator iter;
+ rgw_bucket_category_stats::generate_test_instances(l);
+
+ uint8_t i;
+ for (i = 0, iter = l.begin(); iter != l.end(); ++iter, ++i) {
+ rgw_bucket_dir_header *h = new rgw_bucket_dir_header;
+ rgw_bucket_category_stats *s = *iter;
+ h->stats[i] = *s;
+
+ o.push_back(h);
+
+ delete s;
+ }
+
+ o.push_back(new rgw_bucket_dir_header);
+}
+
+void rgw_bucket_dir_header::dump(Formatter *f) const
+{
+ map<uint8_t, struct rgw_bucket_category_stats>::const_iterator iter = stats.begin();
+ f->open_array_section("stats");
+ for (; iter != stats.end(); ++iter) {
+ f->dump_int("category", (int)iter->first);
+ f->open_object_section("category_stats");
+ iter->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+}
+
+void rgw_bucket_dir::generate_test_instances(list<rgw_bucket_dir*>& o)
+{
+ list<rgw_bucket_dir_header *> l;
+ list<rgw_bucket_dir_header *>::iterator iter;
+ rgw_bucket_dir_header::generate_test_instances(l);
+
+ uint8_t i;
+ for (i = 0, iter = l.begin(); iter != l.end(); ++iter, ++i) {
+ rgw_bucket_dir *d = new rgw_bucket_dir;
+ rgw_bucket_dir_header *h = *iter;
+ d->header = *h;
+
+ list<rgw_bucket_dir_entry *> el;
+ list<rgw_bucket_dir_entry *>::iterator eiter;
+ for (eiter = el.begin(); eiter != el.end(); ++eiter) {
+ rgw_bucket_dir_entry *e = *eiter;
+ d->m[e->name] = *e;
+
+ delete e;
+ }
+
+ o.push_back(d);
+
+ delete h;
+ }
+
+ o.push_back(new rgw_bucket_dir);
+}
+
+void rgw_bucket_dir::dump(Formatter *f) const
+{
+ f->open_object_section("header");
+ header.dump(f);
+ f->close_section();
+ map<string, struct rgw_bucket_dir_entry>::const_iterator iter = m.begin();
+ f->open_array_section("map");
+ for (; iter != m.end(); ++iter) {
+ f->dump_string("obj", iter->first);
+ f->open_object_section("dir_entry");
+ iter->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+}
+
cerr << " usage trim trim usage (by user, date range)\n";
cerr << " temp remove remove temporary objects that were created up to\n";
cerr << " specified date (and optional time)\n";
+ cerr << " gc list dump expired garbage collection objects\n";
+ cerr << " gc process manually process garbage\n";
cerr << "options:\n";
cerr << " --uid=<id> user id\n";
cerr << " --auth-uid=<auid> librados uid\n";
OPT_USAGE_TRIM,
OPT_TEMP_REMOVE,
OPT_OBJECT_RM,
+ OPT_GC_LIST,
+ OPT_GC_PROCESS,
};
static uint32_t str_to_perm(const char *str)
strcmp(cmd, "log") == 0 ||
strcmp(cmd, "usage") == 0 ||
strcmp(cmd, "object") == 0 ||
- strcmp(cmd, "temp") == 0) {
+ strcmp(cmd, "temp") == 0 ||
+ strcmp(cmd, "gc") == 0) {
*need_more = true;
return 0;
}
} else if (strcmp(prev_cmd, "object") == 0) {
if (strcmp(cmd, "rm") == 0)
return OPT_OBJECT_RM;
+ } else if (strcmp(prev_cmd, "gc") == 0) {
+ if (strcmp(cmd, "list") == 0)
+ return OPT_GC_LIST;
+ if (strcmp(cmd, "process") == 0)
+ return OPT_GC_PROCESS;
}
return -EINVAL;
opt_cmd == OPT_KEY_CREATE || opt_cmd == OPT_KEY_RM || opt_cmd == OPT_USER_RM);
RGWStoreManager store_manager;
- store = store_manager.init(g_ceph_context);
+ store = store_manager.init(g_ceph_context, false);
if (!store) {
cerr << "couldn't init storage provider" << std::endl;
return 5; //EIO
}
}
+ if (opt_cmd == OPT_GC_LIST) {
+ int ret;
+ int index = 0;
+ string marker;
+ bool truncated;
+ formatter->open_array_section("entries");
+
+ do {
+ list<cls_rgw_gc_obj_info> result;
+ ret = rgwstore->list_gc_objs(&index, marker, 1000, result, &truncated);
+ if (ret < 0) {
+ cerr << "ERROR: failed to list objs: " << cpp_strerror(-ret) << std::endl;
+ return 1;
+ }
+
+
+ list<cls_rgw_gc_obj_info>::iterator iter;
+ for (iter = result.begin(); iter != result.end(); ++iter) {
+ cls_rgw_gc_obj_info& info = *iter;
+ formatter->open_object_section("chain_info");
+ formatter->dump_string("tag", info.tag);
+ formatter->dump_stream("time") << info.time;
+ formatter->open_array_section("objs");
+ list<cls_rgw_obj>::iterator liter;
+ cls_rgw_obj_chain& chain = info.chain;
+ for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
+ cls_rgw_obj& obj = *liter;
+ formatter->dump_string("pool", obj.pool);
+ formatter->dump_string("oid", obj.oid);
+ formatter->dump_string("key", obj.key);
+ }
+ formatter->close_section(); // objs
+ formatter->close_section(); // obj_chain
+ formatter->flush(cout);
+ }
+ } while (truncated);
+ formatter->close_section();
+ formatter->flush(cout);
+ }
+
+ if (opt_cmd == OPT_GC_PROCESS) {
+ int ret = rgwstore->process_gc();
+ if (ret < 0) {
+ cerr << "ERROR: gc processing returned error: " << cpp_strerror(-ret) << std::endl;
+ return 1;
+ }
+ }
return 0;
}
void finalize() {
T::finalize_watch();
+ T::finalize();
}
int distribute(const string& normal_name, rgw_obj& obj, ObjectCacheInfo& obj_info, int op);
int watch_cb(int opcode, uint64_t ver, bufferlist& bl);
#define RGW_ROOT_BUCKET ".rgw"
+#define RGW_GC_BUCKET ".rgw.gc"
+
#define RGW_CONTROL_BUCKET ".rgw.control"
#define RGW_ATTR_PREFIX "user.rgw."
--- /dev/null
+
+
+#include "rgw_gc.h"
+#include "include/rados/librados.hpp"
+#include "cls/rgw/cls_rgw_client.h"
+#include "cls/lock/cls_lock_client.h"
+#include "auth/Crypto.h"
+
+#include <list>
+
+#define dout_subsys ceph_subsys_rgw
+
+using namespace std;
+using namespace librados;
+
+static string gc_oid_prefix = "gc";
+static string gc_index_lock_name = "gc_process";
+
+void RGWGC::initialize(CephContext *_cct, RGWRados *_store) {
+ cct = _cct;
+ store = _store;
+
+ max_objs = cct->_conf->rgw_gc_max_objs;
+ obj_names = new string[max_objs];
+
+ for (int i = 0; i < max_objs; i++) {
+ obj_names[i] = gc_oid_prefix;
+ char buf[32];
+ snprintf(buf, 32, ".%d", i);
+ obj_names[i].append(buf);
+ }
+}
+
+void RGWGC::finalize()
+{
+ for (int i = 0; i < max_objs; i++) {
+ delete[] obj_names;
+ }
+}
+
+int RGWGC::tag_index(const string& tag)
+{
+ return ceph_str_hash_linux(tag.c_str(), tag.size()) % max_objs;
+}
+
+void RGWGC::add_chain(ObjectWriteOperation& op, cls_rgw_obj_chain& chain, const string& tag)
+{
+ cls_rgw_gc_obj_info info;
+ info.chain = chain;
+ info.tag = tag;
+
+ cls_rgw_gc_set_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
+}
+
+int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync)
+{
+ ObjectWriteOperation op;
+ add_chain(op, chain, tag);
+
+ int i = tag_index(tag);
+
+ if (sync)
+ return store->gc_operate(obj_names[i], &op);
+
+ return store->gc_aio_operate(obj_names[i], &op);
+}
+
+int RGWGC::defer_chain(const string& tag, bool sync)
+{
+ ObjectWriteOperation op;
+ cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag);
+
+ int i = tag_index(tag);
+
+ if (sync)
+ return store->gc_operate(obj_names[i], &op);
+
+ return store->gc_aio_operate(obj_names[i], &op);
+}
+
+int RGWGC::remove(int index, const std::list<string>& tags)
+{
+ ObjectWriteOperation op;
+ cls_rgw_gc_remove(op, tags);
+ return store->gc_operate(obj_names[index], &op);
+}
+
+int RGWGC::list(int *index, string& marker, uint32_t max, std::list<cls_rgw_gc_obj_info>& result, bool *truncated)
+{
+ result.clear();
+
+ for (; *index < cct->_conf->rgw_gc_max_objs && result.size() < max; (*index)++, marker.clear()) {
+ std::list<cls_rgw_gc_obj_info> entries;
+ int ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), entries, truncated);
+ if (ret == -ENOENT)
+ continue;
+ if (ret < 0)
+ return ret;
+
+ std::list<cls_rgw_gc_obj_info>::iterator iter;
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {
+ result.push_back(*iter);
+ }
+
+ if (*index == cct->_conf->rgw_gc_max_objs - 1) {
+ /* we cut short here, truncated will hold the correct value */
+ return 0;
+ }
+
+ if (result.size() == max) {
+ /* close approximation, it might be that the next of the objects don't hold
+ * anything, in this case truncated should have been false, but we can find
+ * that out on the next iteration
+ */
+ *truncated = true;
+ return 0;
+ }
+
+ }
+ *truncated = false;
+
+ return 0;
+}
+
+int RGWGC::process(int index, int max_secs)
+{
+ rados::cls::lock::Lock l(gc_index_lock_name);
+ utime_t end = ceph_clock_now(g_ceph_context);
+ std::list<string> remove_tags;
+
+ /* max_secs should be greater than zero. We don't want a zero max_secs
+ * to be translated as no timeout, since we'd then need to break the
+ * lock and that would require a manual intervention. In this case
+ * we can just wait it out. */
+ if (max_secs <= 0)
+ return -EAGAIN;
+
+ end += max_secs;
+ utime_t time(max_secs, 0);
+ l.set_duration(time);
+
+ int ret = l.lock_exclusive(store->gc_pool_ctx, obj_names[index]);
+ if (ret == -EBUSY) { /* already locked by another gc processor */
+ dout(0) << "RGWGC::process() failed to acquire lock on " << obj_names[index] << dendl;
+ return 0;
+ }
+ if (ret < 0)
+ return ret;
+
+ string marker;
+ bool truncated;
+ IoCtx *ctx = NULL;
+ do {
+ int max = 100;
+ std::list<cls_rgw_gc_obj_info> entries;
+ ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max, entries, &truncated);
+ if (ret == -ENOENT) {
+ ret = 0;
+ goto done;
+ }
+ if (ret < 0)
+ goto done;
+
+ string last_pool;
+ ctx = new IoCtx;
+ std::list<cls_rgw_gc_obj_info>::iterator iter;
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {
+ bool remove_tag;
+ cls_rgw_gc_obj_info& info = *iter;
+ std::list<cls_rgw_obj>::iterator liter;
+ cls_rgw_obj_chain& chain = info.chain;
+
+ utime_t now = ceph_clock_now(g_ceph_context);
+ if (now >= end)
+ goto done;
+
+ remove_tag = true;
+ for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
+ cls_rgw_obj& obj = *liter;
+
+ if (obj.pool != last_pool) {
+ if (ctx) {
+ delete ctx;
+ ctx = new IoCtx;
+ }
+ ret = rgwstore->rados->ioctx_create(obj.pool.c_str(), *ctx);
+ if (ret < 0) {
+ dout(0) << "ERROR: failed to create ioctx pool=" << obj.pool << dendl;
+ continue;
+ }
+ last_pool = obj.pool;
+ }
+
+ ctx->locator_set_key(obj.key);
+ dout(0) << "gc::process: removing " << obj.pool << ":" << obj.oid << dendl;
+ ret = ctx->remove(obj.oid);
+ if (ret == -ENOENT)
+ ret = 0;
+ if (ret < 0) {
+ remove_tag = false;
+ dout(0) << "failed to remove " << obj.pool << ":" << obj.oid << "@" << obj.key << dendl;
+ }
+
+ if (going_down()) // leave early, even if tag isn't removed, it's ok
+ goto done;
+ }
+ if (remove_tag) {
+ remove_tags.push_back(info.tag);
+#define MAX_REMOVE_CHUNK 16
+ if (remove_tags.size() > MAX_REMOVE_CHUNK) {
+ remove(index, remove_tags);
+ remove_tags.clear();
+ }
+ }
+ }
+ } while (truncated);
+
+done:
+ if (remove_tags.size())
+ remove(index, remove_tags);
+ l.unlock(store->gc_pool_ctx, obj_names[index]);
+ delete ctx;
+ return 0;
+}
+
+int RGWGC::process()
+{
+ int max_objs = cct->_conf->rgw_gc_max_objs;
+ int max_secs = cct->_conf->rgw_gc_processor_max_time;
+
+ unsigned start;
+ int ret = get_random_bytes((char *)&start, sizeof(start));
+ if (ret < 0)
+ return ret;
+
+ for (int i = 0; i < max_objs; i++) {
+ int index = (i + start) % max_objs;
+ ret = process(index, max_secs);
+ if (ret < 0)
+ return ret;
+ }
+
+ return 0;
+}
+
+bool RGWGC::going_down()
+{
+ return (down_flag.read() != 0);
+}
+
+void RGWGC::start_processor()
+{
+ worker = new GCWorker(cct, this);
+ worker->create();
+}
+
+void RGWGC::stop_processor()
+{
+ down_flag.set(1);
+ if (worker) {
+ worker->stop();
+ worker->join();
+ }
+ delete worker;
+ worker = NULL;
+}
+
+void *RGWGC::GCWorker::entry() {
+ do {
+ utime_t start = ceph_clock_now(cct);
+ dout(2) << "garbage collection: start" << dendl;
+ int r = gc->process();
+ if (r < 0) {
+ dout(0) << "ERROR: garbage collection process() returned error r=" << r << dendl;
+ }
+ dout(2) << "garbage collection: stop" << dendl;
+
+ if (gc->going_down())
+ break;
+
+ utime_t end = ceph_clock_now(cct);
+ end -= start;
+ int secs = cct->_conf->rgw_gc_processor_period;
+
+ if (secs <= end.sec())
+ continue; // next round
+
+ secs -= end.sec();
+
+ lock.Lock();
+ cond.WaitInterval(cct, lock, utime_t(secs, 0));
+ lock.Unlock();
+ } while (!gc->going_down());
+
+ return NULL;
+}
+
+void RGWGC::GCWorker::stop()
+{
+ Mutex::Locker l(lock);
+ cond.Signal();
+}
+
--- /dev/null
+#ifndef CEPH_RGW_GC_H
+#define CEPH_RGW_GC_H
+
+
+#include "include/types.h"
+#include "include/atomic.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/Thread.h"
+#include "rgw_common.h"
+#include "rgw_rados.h"
+#include "cls/rgw/cls_rgw_types.h"
+
+class RGWGC {
+ CephContext *cct;
+ RGWRados *store;
+ int max_objs;
+ string *obj_names;
+ atomic_t down_flag;
+
+ int tag_index(const string& tag);
+
+ class GCWorker : public Thread {
+ CephContext *cct;
+ RGWGC *gc;
+ Mutex lock;
+ Cond cond;
+
+ public:
+ GCWorker(CephContext *_cct, RGWGC *_gc) : cct(_cct), gc(_gc), lock("GCWorker") {}
+ void *entry();
+ void stop();
+ };
+
+ GCWorker *worker;
+public:
+ RGWGC() : cct(NULL), store(NULL), max_objs(0), obj_names(NULL), worker(NULL) {}
+
+ void add_chain(librados::ObjectWriteOperation& op, cls_rgw_obj_chain& chain, const string& tag);
+ int send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync);
+ int defer_chain(const string& tag, bool sync);
+ int remove(int index, const std::list<string>& tags);
+
+ void initialize(CephContext *_cct, RGWRados *_store);
+ void finalize();
+
+ int list(int *index, string& marker, uint32_t max, std::list<cls_rgw_gc_obj_info>& result, bool *truncated);
+ void list_init(int *index) { *index = 0; }
+ int process(int index, int process_max_secs);
+ int process();
+
+ bool going_down();
+ void start_processor();
+ void stop_processor();
+};
+
+
+#endif
RGWStoreManager store_manager;
int r = 0;
- if (!store_manager.init(g_ceph_context)) {
+ if (!store_manager.init(g_ceph_context, true)) {
derr << "Couldn't init storage provider (RADOS)" << dendl;
r = EIO;
}
void *handle = NULL;
utime_t start_time = s->time;
bufferlist bl;
+ utime_t gc_invalidate_time = ceph_clock_now(s->cct);
+ gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
+
perfcounter->inc(l_rgw_get);
send_response(bl);
bl.clear();
start_time = ceph_clock_now(s->cct);
+
+ if (ofs <= end) {
+ if (start_time > gc_invalidate_time) {
+ int r = rgwstore->defer_gc(s->obj_ctx, obj);
+ if (r < 0) {
+ dout(0) << "WARNING: could not defer gc entry for obj" << dendl;
+ }
+ gc_invalidate_time = start_time;
+ gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
+ }
+ }
}
return;
#include "rgw_log.h"
+#include "rgw_gc.h"
+
#define dout_subsys ceph_subsys_rgw
using namespace std;
-Rados *rados = NULL;
-
static RGWCache<RGWRados> cached_rados_provider;
static RGWRados rados_provider;
}
};
-RGWRados *RGWRados::init_storage_provider(CephContext *cct)
+RGWRados *RGWRados::init_storage_provider(CephContext *cct, bool use_gc_thread)
{
int use_cache = cct->_conf->rgw_cache_enabled;
store = NULL;
store = &cached_rados_provider;
}
- if (store->initialize(cct) < 0)
+ if (store->initialize(cct, use_gc_thread) < 0)
store = NULL;
return store;
store = NULL;
}
+
+void RGWRados::finalize()
+{
+ if (use_gc_thread)
+ gc->stop_processor();
+}
+
/**
* Initialize the RADOS instance and prepare to do other ops
* Returns 0 on success, -ERR# on failure.
if (ret < 0)
return ret;
+ ret = open_gc_pool_ctx();
+ if (ret < 0)
+ return ret;
+
+ gc = new RGWGC();
+ gc->initialize(cct, this);
+
+ if (use_gc_thread)
+ gc->start_processor();
+
return ret;
}
return r;
}
+int RGWRados::open_gc_pool_ctx()
+{
+ int r = rados->ioctx_create(RGW_GC_BUCKET, gc_pool_ctx);
+ if (r == -ENOENT) {
+ r = rados->pool_create(RGW_GC_BUCKET);
+ if (r == -EEXIST)
+ r = 0;
+ if (r < 0)
+ return r;
+
+ r = rados->ioctx_create(RGW_GC_BUCKET, gc_pool_ctx);
+ }
+
+ return r;
+}
+
int RGWRados::init_watch()
{
int r = rados->ioctx_create(RGW_CONTROL_BUCKET, control_pool_ctx);
RGWObjState *state = NULL;
if (!exclusive) {
- r = prepare_atomic_for_write(rctx, obj, io_ctx, oid, op, &state);
+ r = prepare_atomic_for_write(rctx, obj, op, &state, true);
if (r < 0)
return r;
+ } else {
+ op.create(true); // exclusive create
}
- op.create(exclusive);
-
if (data) {
/* if we want to overwrite the data, we also want to overwrite the
xattrs, so just remove the object */
- op.remove();
op.write_full(*data);
}
if (!state || !state->has_manifest)
return 0;
+ cls_rgw_obj_chain chain;
map<uint64_t, RGWObjManifestPart>::iterator iter;
for (iter = state->manifest.objs.begin(); iter != state->manifest.objs.end(); ++iter) {
rgw_obj& mobj = iter->second.loc;
if (mobj == obj)
continue;
- int ret = rctx->notify_intent(mobj, DEL_OBJ);
- if (ret < 0) {
- ldout(cct, 0) << "WARNING: failed to log intent ret=" << ret << dendl;
- }
+ string oid, key;
+ rgw_bucket bucket;
+ get_obj_bucket_and_oid_key(mobj, bucket, oid, key);
+ chain.push_obj(bucket.pool, oid, key);
}
- return 0;
+
+ string tag = state->obj_tag.c_str();
+ int ret = gc->send_chain(chain, tag, false); // do it async
+
+ return ret;
}
+int RGWRados::defer_gc(void *ctx, rgw_obj& obj)
+{
+ RGWRadosCtx *rctx = (RGWRadosCtx *)ctx;
+ rgw_bucket bucket;
+ std::string oid, key;
+ get_obj_bucket_and_oid_key(obj, bucket, oid, key);
+ if (!rctx)
+ return 0;
+
+ RGWObjState *state = NULL;
+
+ int r = get_obj_state(rctx, obj, &state);
+ if (r < 0)
+ return r;
+
+ if (!state->is_atomic) {
+ ldout(cct, 20) << "state for obj=" << obj << " is not atomic, not deferring gc operation" << dendl;
+ return -EINVAL;
+ }
+
+ if (state->obj_tag.length() == 0) {// check for backward compatibility
+ ldout(cct, 20) << "state->obj_tag is empty, not deferring gc operation" << dendl;
+ return -EINVAL;
+ }
+
+ string tag = state->obj_tag.c_str();
+
+ ldout(cct, 0) << "defer chain tag=" << tag << dendl;
+
+ return gc->defer_chain(tag, false);
+}
+
+
/**
* Delete an object.
* bucket: name of the bucket storing the object
ObjectWriteOperation op;
RGWObjState *state;
- r = prepare_atomic_for_write(rctx, obj, io_ctx, oid, op, &state);
+ r = prepare_atomic_for_write(rctx, obj, op, &state, false);
if (r < 0)
return r;
return r;
}
-int RGWRados::get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, RGWObjState **state)
+static void generate_fake_tag(CephContext *cct, map<string, bufferlist>& attrset, RGWObjManifest& manifest, bufferlist& manifest_bl, bufferlist& tag_bl)
+{
+ string tag;
+
+ map<uint64_t, RGWObjManifestPart>::iterator mi = manifest.objs.begin();
+ if (mi != manifest.objs.end()) {
+ if (manifest.objs.size() > 1) // first object usually points at the head, let's skip to a more unique part
+ ++mi;
+ tag = mi->second.loc.object;
+ tag.append("_");
+ }
+
+ unsigned char md5[CEPH_CRYPTO_MD5_DIGESTSIZE];
+ char md5_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
+ MD5 hash;
+ hash.Update((const byte *)manifest_bl.c_str(), manifest_bl.length());
+
+ map<string, bufferlist>::iterator iter = attrset.find(RGW_ATTR_ETAG);
+ if (iter != attrset.end()) {
+ bufferlist& bl = iter->second;
+ hash.Update((const byte *)bl.c_str(), bl.length());
+ }
+
+ hash.Final(md5);
+ buf_to_hex(md5, CEPH_CRYPTO_MD5_DIGESTSIZE, md5_str);
+ tag.append(md5_str);
+
+ ldout(cct, 10) << "generate_fake_tag new tag=" << tag << dendl;
+
+ tag_bl.append(tag.c_str(), tag.size() + 1);
+}
+
+int RGWRados::get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, RGWObjState **state)
{
RGWObjState *s = rctx->get_state(obj);
ldout(cct, 20) << "get_obj_state: rctx=" << (void *)rctx << " obj=" << obj << " state=" << (void *)s << " s->prefetch_data=" << s->prefetch_data << dendl;
for (mi = s->manifest.objs.begin(); mi != s->manifest.objs.end(); ++mi) {
ldout(cct, 10) << "manifest: ofs=" << mi->first << " loc=" << mi->second.loc << dendl;
}
+
+ if (!s->obj_tag.length()) {
+ /*
+ * Uh oh, something's wrong, object with manifest should have tag. Let's
+ * create one out of the manifest, would be unique
+ */
+ generate_fake_tag(cct, s->attrset, s->manifest, manifest_bl, s->obj_tag);
+ s->fake_tag = true;
+ }
}
if (s->obj_tag.length())
ldout(cct, 20) << "get_obj_state: setting s->obj_tag to " << s->obj_tag.c_str() << dendl;
if (rctx) {
RGWObjState *state;
- r = get_obj_state(rctx, obj, io_ctx, actual_obj, &state);
+ r = get_obj_state(rctx, obj, &state);
if (r < 0)
return r;
if (!state->exists)
return 0;
}
-int RGWRados::append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
- string& actual_obj, ObjectOperation& op, RGWObjState **pstate)
+int RGWRados::append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj,
+ ObjectOperation& op, RGWObjState **pstate)
{
if (!rctx)
return 0;
- int r = get_obj_state(rctx, obj, io_ctx, actual_obj, pstate);
+ int r = get_obj_state(rctx, obj, pstate);
if (r < 0)
return r;
return 0;
}
- if (state->obj_tag.length() > 0) {// check for backward compatibility
+ if (state->obj_tag.length() > 0 && !state->fake_tag) {// check for backward compatibility
op.cmpxattr(RGW_ATTR_ID_TAG, LIBRADOS_CMPXATTR_OP_EQ, state->obj_tag);
} else {
ldout(cct, 20) << "state->obj_tag is empty, not appending atomic test" << dendl;
return 0;
}
-int RGWRados::prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
- string& actual_obj, ObjectWriteOperation& op, RGWObjState **pstate)
+int RGWRados::prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj,
+ ObjectWriteOperation& op, RGWObjState **pstate,
+ bool reset_obj)
{
- int r = get_obj_state(rctx, obj, io_ctx, actual_obj, pstate);
+ int r = get_obj_state(rctx, obj, pstate);
if (r < 0)
return r;
RGWObjState *state = *pstate;
- bool need_guard = state->has_manifest || (state->obj_tag.length() != 0);
+ bool need_guard = (state->has_manifest || (state->obj_tag.length() != 0)) && (!state->fake_tag);
if (!state->is_atomic) {
ldout(cct, 20) << "prepare_atomic_for_write_impl: state is not atomic. state=" << (void *)state << dendl;
+
+ if (reset_obj) {
+ op.create(false);
+ op.remove();
+ }
+
return 0;
}
// FIXME: need to add FAIL_NOTEXIST_OK for racing deletion
}
+ if (reset_obj) {
+ op.create(false);
+ op.remove();
+ }
+
string tag;
append_rand_alpha(cct, tag, tag, 32);
bufferlist bl;
- bl.append(tag);
+ bl.append(tag.c_str(), tag.size() + 1);
op.setxattr(RGW_ATTR_ID_TAG, bl);
return 0;
}
-int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
- string& actual_obj, ObjectWriteOperation& op, RGWObjState **pstate)
+int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj,
+ ObjectWriteOperation& op, RGWObjState **pstate,
+ bool reset_obj)
{
if (!rctx) {
*pstate = NULL;
}
int r;
- r = prepare_atomic_for_write_impl(rctx, obj, io_ctx, actual_obj, op, pstate);
+ r = prepare_atomic_for_write_impl(rctx, obj, op, pstate, reset_obj);
return r;
}
if (r < 0)
return r;
- io_ctx.locator_set_key(key);
-
ObjectWriteOperation op;
RGWObjState *state = NULL;
- r = append_atomic_test(rctx, obj, io_ctx, actual_obj, op, &state);
+ r = append_atomic_test(rctx, obj, op, &state);
if (r < 0)
return r;
op.setxattr(name, bl);
+
+ io_ctx.locator_set_key(key);
r = io_ctx.operate(actual_obj, &op);
if (state && r >= 0)
ObjectWriteOperation op;
RGWObjState *state = NULL;
- r = append_atomic_test(rctx, obj, io_ctx, actual_obj, op, &state);
+ r = append_atomic_test(rctx, obj, op, &state);
if (r < 0)
return r;
rctx = new_ctx;
}
- r = get_obj_state(rctx, obj, state->io_ctx, oid, &astate);
+ r = get_obj_state(rctx, obj, &astate);
if (r < 0)
goto done_err;
}
}
RGWObjState *state;
- r = prepare_atomic_for_write(rctx, dst_obj, io_ctx, dst_oid, op, &state);
+ r = prepare_atomic_for_write(rctx, dst_obj, op, &state, true);
if (r < 0)
return r;
rctx = new_ctx;
}
- int r = get_obj_state(rctx, obj, state->io_ctx, oid, &astate);
+ int r = get_obj_state(rctx, obj, &astate);
if (r < 0)
goto done_ret;
if (reading_from_head) {
/* only when reading from the head object do we need to do the atomic test */
- r = append_atomic_test(rctx, read_obj, state->io_ctx, oid, op, &astate);
+ r = append_atomic_test(rctx, read_obj, op, &astate);
if (r < 0)
goto done_ret;
}
ObjectReadOperation op;
- r = append_atomic_test(rctx, obj, io_ctx, oid, op, &astate);
+ r = append_atomic_test(rctx, obj, op, &astate);
if (r < 0)
return r;
return objs.size();
}
+int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op)
+{
+ return gc_pool_ctx.operate(oid, op);
+}
+
+int RGWRados::gc_aio_operate(string& oid, librados::ObjectWriteOperation *op)
+{
+ AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+ int r = gc_pool_ctx.aio_operate(oid, c, op);
+ c->release();
+ return r;
+}
+
+int RGWRados::gc_operate(string& oid, librados::ObjectReadOperation *op, bufferlist *pbl)
+{
+ return gc_pool_ctx.operate(oid, op, pbl);
+}
+
+int RGWRados::list_gc_objs(int *index, string& marker, uint32_t max, std::list<cls_rgw_gc_obj_info>& result, bool *truncated)
+{
+ return gc->list(index, marker, max, result, truncated);
+}
+
+int RGWRados::process_gc()
+{
+ return gc->process();
+}
+
int RGWRados::cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid)
{
bufferlist in;
class RGWWatcher;
class SafeTimer;
class ACLOwner;
+class RGWGC;
static inline void prepend_bucket_marker(rgw_bucket& bucket, string& orig_oid, string& oid)
{
uint64_t size;
time_t mtime;
bufferlist obj_tag;
+ bool fake_tag;
RGWObjManifest manifest;
bool has_manifest;
string shadow_obj;
bool prefetch_data;
map<string, bufferlist> attrset;
- RGWObjState() : is_atomic(false), has_attrs(0), exists(false), has_manifest(false), prefetch_data(false) {}
+ RGWObjState() : is_atomic(false), has_attrs(0), exists(false), fake_tag(false), has_manifest(false), prefetch_data(false) {}
bool get_attr(string name, bufferlist& dest) {
map<string, bufferlist>::iterator iter = attrset.find(name);
void clear() {
has_attrs = false;
exists = false;
+ fake_tag = false;
size = 0;
mtime = 0;
obj_tag.clear();
class RGWRados
{
+ friend class RGWGC;
+
/** Open the pool used as root for this gateway */
int open_root_pool_ctx();
+ int open_gc_pool_ctx();
int open_bucket_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx);
}
};
+ RGWGC *gc;
+ bool use_gc_thread;
+
int num_watchers;
RGWWatcher **watchers;
Mutex bucket_id_lock;
uint64_t max_bucket_id;
- int get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, RGWObjState **state);
- int append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
- string& actual_obj, librados::ObjectOperation& op, RGWObjState **state);
- int prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
- string& actual_obj, librados::ObjectWriteOperation& op, RGWObjState **pstate);
- int prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
- string& actual_obj, librados::ObjectWriteOperation& op, RGWObjState **pstate);
+ int get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, RGWObjState **state);
+ int append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj,
+ librados::ObjectOperation& op, RGWObjState **state);
+ int prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj,
+ librados::ObjectWriteOperation& op, RGWObjState **pstate,
+ bool reset_obj);
+ int prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj,
+ librados::ObjectWriteOperation& op, RGWObjState **pstate,
+ bool reset_obj);
void atomic_write_finish(RGWObjState *state, int r) {
if (state && r == -ECANCELED) {
protected:
CephContext *cct;
+ librados::Rados *rados;
+ librados::IoCtx gc_pool_ctx; // .rgw.gc
public:
- RGWRados() : lock("rados_timer_lock"), timer(NULL), num_watchers(0), watchers(NULL), watch_handles(NULL),
- bucket_id_lock("rados_bucket_id"), max_bucket_id(0) {}
+ RGWRados() : lock("rados_timer_lock"), timer(NULL), gc(NULL), use_gc_thread(false),
+ num_watchers(0), watchers(NULL), watch_handles(0),
+ bucket_id_lock("rados_bucket_id"), max_bucket_id(0), rados(NULL) {}
virtual ~RGWRados() {}
void tick();
CephContext *ctx() { return cct; }
/** do all necessary setup of the storage device */
- int initialize(CephContext *_cct) {
+ int initialize(CephContext *_cct, bool _use_gc_thread) {
cct = _cct;
+ use_gc_thread = _use_gc_thread;
return initialize();
}
/** Initialize the RADOS instance and prepare to do other ops */
virtual int initialize();
- virtual void finalize() {}
+ virtual void finalize();
- static RGWRados *init_storage_provider(CephContext *cct);
+ static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread);
static void close_storage();
static RGWRados *store;
/// clean up/process any temporary objects older than given date[/time]
int remove_temp_objects(string date, string time);
+ int gc_operate(string& oid, librados::ObjectWriteOperation *op);
+ int gc_aio_operate(string& oid, librados::ObjectWriteOperation *op);
+ int gc_operate(string& oid, librados::ObjectReadOperation *op, bufferlist *pbl);
+
+ int list_gc_objs(int *index, string& marker, uint32_t max, std::list<cls_rgw_gc_obj_info>& result, bool *truncated);
+ int process_gc();
+ int defer_gc(void *ctx, rgw_obj& obj);
private:
int process_intent_log(rgw_bucket& bucket, string& oid,
time_t epoch, int flags, bool purge);
uint64_t instance_id();
uint64_t next_bucket_id();
+
};
class RGWStoreManager {
RGWRados *store;
public:
- RGWStoreManager() : store(NULL) {}
+ RGWStoreManager(): store(NULL) {}
~RGWStoreManager() {
- if (store)
+ if (store) {
RGWRados::close_storage();
+ }
}
- RGWRados *init(CephContext *cct) {
- store = RGWRados::init_storage_provider(cct);
+ RGWRados *init(CephContext *cct, bool use_gc_thread) {
+ store = RGWRados::init_storage_provider(cct, use_gc_thread);
return store;
}
};