]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: implement garbage collector
authorYehuda Sadeh <yehuda@inktank.com>
Tue, 21 Aug 2012 22:05:38 +0000 (15:05 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Mon, 27 Aug 2012 19:41:19 +0000 (12:41 -0700)
Add a garbage collector thread that is responsible for clean
up of clutter. When removing an object, store info about the
leftovers in a special gc map (via rgw objclass). A new
radosgw-admin commands to list objects in gc, and to run the
gc process manually. Also, gc processors can run in parallel,
however, each will handle a single gc shard (synchronized
using lock objclass).

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
12 files changed:
src/Makefile.am
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_types.cc [new file with mode: 0644]
src/rgw/rgw_admin.cc
src/rgw/rgw_cache.h
src/rgw/rgw_common.h
src/rgw/rgw_gc.cc [new file with mode: 0644]
src/rgw/rgw_gc.h [new file with mode: 0644]
src/rgw/rgw_main.cc
src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index cb0d262179905a210cbc6653fc1c5fd78821b3cc..faec81c0a1f8a4d4307e1a00185686661a14189d 100644 (file)
@@ -315,13 +315,14 @@ librgw_a_SOURCES =  \
        rgw/rgw_formats.cc \
        rgw/rgw_log.cc \
        rgw/rgw_multi.cc \
+       rgw/rgw_gc.cc \
        rgw/rgw_env.cc
 librgw_a_CFLAGS = ${CRYPTO_CFLAGS} ${AM_CFLAGS}
 librgw_a_CXXFLAGS = ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS}
 noinst_LIBRARIES += librgw.a
 
 my_radosgw_ldadd = \
-       libglobal.la librgw.a librados.la libcls_rgw_client.a -lcurl -lexpat \
+       libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_lock_client.a -lcurl -lexpat \
        $(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS)
 
 radosgw_SOURCES = \
@@ -1669,6 +1670,7 @@ noinst_HEADERS = \
        rgw/rgw_formats.h\
        rgw/rgw_log.h\
        rgw/rgw_multi.h\
+       rgw/rgw_gc.h\
        rgw/rgw_op.h\
        rgw/rgw_swift.h\
        rgw/rgw_swift_auth.h\
index 9fba355f9088311d834f511722de59396fae1462..12012a45614d1c74ce84882c4e0b41a66a7aecb3 100644 (file)
@@ -672,16 +672,27 @@ int rgw_user_usage_log_trim(cls_method_context_t hctx, bufferlist *in, bufferlis
   return 0;
 }
 
+/*
+ * We hold the garbage collection chain data under two different indexes: the first 'name' index
+ * keeps them under a unique tag that represents the chains, and a second 'time' index keeps
+ * them by their expiration timestamp
+ */
 #define GC_OBJ_NAME_INDEX 0
 #define GC_OBJ_TIME_INDEX 1
 
 static string gc_index_prefixes[] = { "0_",
                                       "1_" };
 
+static void prepend_index_prefix(const string& src, int index, string *dest)
+{
+  *dest = gc_index_prefixes[index];
+  dest->append(src);
+}
+
 static int gc_omap_get(cls_method_context_t hctx, int type, const string& key, cls_rgw_gc_obj_info *info)
 {
-  string index = gc_index_prefixes[type];
-  index.append(key);
+  string index;
+  prepend_index_prefix(key, type, &index);
 
   bufferlist bl;
   int ret = cls_cxx_map_get_val(hctx, index, &bl);
@@ -698,7 +709,7 @@ static int gc_omap_get(cls_method_context_t hctx, int type, const string& key, c
   return 0;
 }
 
-static int gc_omap_set(cls_method_context_t hctx, int type, const string& key, cls_rgw_gc_obj_info *info)
+static int gc_omap_set(cls_method_context_t hctx, int type, const string& key, const cls_rgw_gc_obj_info *info)
 {
   bufferlist bl;
   ::encode(*info, bl);
@@ -726,13 +737,20 @@ static int gc_omap_remove(cls_method_context_t hctx, int type, const string& key
   return 0;
 }
 
-void get_time_key(utime_t& ut, string& key)
+static void get_time_key(utime_t& ut, string *key)
 {
   char buf[32];
-  snprintf(buf, 32, "%011lld.%d", (long long)ut.sec(), ut.nsec());
-  key = buf;
+  snprintf(buf, 32, "%011lld.%09d", (long long)ut.sec(), ut.nsec());
+  *key = buf;
 }
 
+static bool key_in_index(const string& key, int index_type)
+{
+  const string& prefix = gc_index_prefixes[index_type]; 
+  return (key.compare(0, prefix.size(), prefix) == 0);
+}
+
+
 static int gc_update_entry(cls_method_context_t hctx, uint32_t expiration_secs,
                            cls_rgw_gc_obj_info& info)
 {
@@ -740,7 +758,7 @@ static int gc_update_entry(cls_method_context_t hctx, uint32_t expiration_secs,
   int ret = gc_omap_get(hctx, GC_OBJ_NAME_INDEX, info.tag, &old_info);
   if (ret == 0) {
     string key;
-    get_time_key(old_info.time, key);
+    get_time_key(old_info.time, &key);
     ret = gc_omap_remove(hctx, GC_OBJ_TIME_INDEX, key);
     if (ret < 0 && ret != -ENOENT) {
       CLS_LOG(0, "ERROR: failed to remove key=%s\n", key.c_str());
@@ -754,7 +772,7 @@ static int gc_update_entry(cls_method_context_t hctx, uint32_t expiration_secs,
     return ret;
 
   string key;
-  get_time_key(info.time, key);
+  get_time_key(info.time, &key);
   ret = gc_omap_set(hctx, GC_OBJ_TIME_INDEX, key, &info);
   if (ret < 0)
     goto done_err;
@@ -835,28 +853,23 @@ static int gc_iterate_entries(cls_method_context_t hctx, const string& marker,
   if (truncated)
     *truncated = false;
 
-  string& index_prefix = gc_index_prefixes[GC_OBJ_TIME_INDEX];
-
   string start_key;
   if (key_iter.empty()) {
-    start_key = index_prefix;
-    start_key.append(marker);
+    prepend_index_prefix(marker, GC_OBJ_TIME_INDEX, &start_key);
   } else {
     start_key = key_iter;
   }
 
   utime_t now = ceph_clock_now(g_ceph_context);
   string now_str;
-  get_time_key(now, now_str);
-  end_key = gc_index_prefixes[GC_OBJ_TIME_INDEX];
-  end_key.append(now_str);
+  get_time_key(now, &now_str);
+  prepend_index_prefix(now_str, GC_OBJ_TIME_INDEX, &end_key);
 
   CLS_LOG(0, "gc_iterate_entries end_key=%s\n", end_key.c_str());
 
   string filter;
 
   do {
-    
 #define GC_NUM_KEYS 32
     int ret = cls_cxx_map_get_vals(hctx, start_key, filter, GC_NUM_KEYS, &keys);
     if (ret < 0)
@@ -871,28 +884,29 @@ static int gc_iterate_entries(cls_method_context_t hctx, const string& marker,
       const string& key = iter->first;
       cls_rgw_gc_obj_info e;
 
-      CLS_LOG(0, "gc_iterate_entries key=%s\n", key.c_str());
+      CLS_LOG(10, "gc_iterate_entries key=%s\n", key.c_str());
 
       if (key.compare(end_key) >= 0)
         return 0;
 
-      if (key.compare(0, index_prefix.size(), index_prefix) != 0)
-        return 0;
+      if (!key_in_index(key, GC_OBJ_TIME_INDEX))
+       return 0;
 
       ret = gc_record_decode(iter->second, e);
       if (ret < 0)
         return ret;
 
-      ret = cb(hctx, key, e, param);
-      if (ret < 0)
-        return ret;
-
-      i++;
-      if (max_entries && (i > max_entries)) {
+      if (max_entries && (i >= max_entries)) {
         *truncated = true;
         key_iter = key;
         return 0;
       }
+
+      ret = cb(hctx, key, e, param);
+      if (ret < 0)
+        return ret;
+      i++;
+
     }
     iter--;
     start_key = iter->first;
@@ -956,7 +970,7 @@ static int gc_remove(cls_method_context_t hctx, list<string>& tags)
       return ret;
 
     string time_key;
-    get_time_key(info.time, time_key);
+    get_time_key(info.time, &time_key);
     ret = gc_omap_remove(hctx, GC_OBJ_TIME_INDEX, time_key);
     if (ret < 0 && ret != -ENOENT)
       return ret;
diff --git a/src/cls/rgw/cls_rgw_types.cc b/src/cls/rgw/cls_rgw_types.cc
new file mode 100644 (file)
index 0000000..1c40e02
--- /dev/null
@@ -0,0 +1,185 @@
+
+#include "cls/rgw/cls_rgw_types.h"
+#include "common/Formatter.h"
+
+
+void rgw_bucket_pending_info::generate_test_instances(list<rgw_bucket_pending_info*>& o)
+{
+  rgw_bucket_pending_info *i = new rgw_bucket_pending_info;
+  i->state = CLS_RGW_STATE_COMPLETE;
+  i->op = CLS_RGW_OP_DEL;
+  o.push_back(i);
+  o.push_back(new rgw_bucket_pending_info);
+}
+
+void rgw_bucket_pending_info::dump(Formatter *f) const
+{
+  f->dump_int("state", (int)state);
+  f->dump_stream("timestamp") << timestamp;
+  f->dump_int("op", (int)op);
+}
+
+void rgw_bucket_dir_entry_meta::generate_test_instances(list<rgw_bucket_dir_entry_meta*>& o)
+{
+  rgw_bucket_dir_entry_meta *m = new rgw_bucket_dir_entry_meta;
+  m->category = 1;
+  m->size = 100;
+  m->etag = "etag";
+  m->owner = "owner";
+  m->owner_display_name = "display name";
+  m->tag = "tag";
+  m->content_type = "content/type";
+  o.push_back(m);
+  o.push_back(new rgw_bucket_dir_entry_meta);
+}
+
+void rgw_bucket_dir_entry_meta::dump(Formatter *f) const
+{
+  f->dump_int("category", category);
+  f->dump_unsigned("size", size);
+  f->dump_stream("mtime") << mtime;
+  f->dump_string("etag", etag);
+  f->dump_string("owner", owner);
+  f->dump_string("owner_display_name", owner_display_name);
+  f->dump_string("tag", tag);
+  f->dump_string("content_type", content_type);
+}
+
+void rgw_bucket_dir_entry::generate_test_instances(list<rgw_bucket_dir_entry*>& o)
+{
+  list<rgw_bucket_dir_entry_meta *> l;
+  rgw_bucket_dir_entry_meta::generate_test_instances(l);
+
+  list<rgw_bucket_dir_entry_meta *>::iterator iter;
+  for (iter = l.begin(); iter != l.end(); ++iter) {
+    rgw_bucket_dir_entry_meta *m = *iter;
+    rgw_bucket_dir_entry *e = new rgw_bucket_dir_entry;
+    e->name = "name";
+    e->epoch = 1234;
+    e->locator = "locator";
+    e->exists = true;
+    e->meta = *m;
+
+    o.push_back(e);
+
+    delete m;
+  }
+  o.push_back(new rgw_bucket_dir_entry);
+}
+
+void rgw_bucket_dir_entry::dump(Formatter *f) const
+{
+  f->dump_string("name", name);
+  f->dump_unsigned("epoch", epoch);
+  f->dump_string("locator", locator);
+  f->dump_int("exists", (int)exists);
+  f->open_object_section("meta");
+  meta.dump(f);
+  f->close_section();
+
+  map<string, struct rgw_bucket_pending_info>::const_iterator iter = pending_map.begin();
+  f->open_array_section("pending_map");
+  for (; iter != pending_map.end(); ++iter) {
+    f->dump_string("tag", iter->first);
+    f->open_object_section("info");
+    iter->second.dump(f);
+    f->close_section();
+  }
+  f->close_section();
+}
+
+void rgw_bucket_category_stats::generate_test_instances(list<rgw_bucket_category_stats*>& o)
+{
+  rgw_bucket_category_stats *s = new rgw_bucket_category_stats;
+  s->total_size = 1024;
+  s->total_size_rounded = 4096;
+  s->num_entries = 2;
+  o.push_back(s);
+  o.push_back(new rgw_bucket_category_stats);
+}
+
+void rgw_bucket_category_stats::dump(Formatter *f) const
+{
+  f->dump_unsigned("total_size", total_size);
+  f->dump_unsigned("total_size_rounded", total_size_rounded);
+  f->dump_unsigned("num_entries", num_entries);
+}
+
+void rgw_bucket_dir_header::generate_test_instances(list<rgw_bucket_dir_header*>& o)
+{
+  list<rgw_bucket_category_stats *> l;
+  list<rgw_bucket_category_stats *>::iterator iter;
+  rgw_bucket_category_stats::generate_test_instances(l);
+
+  uint8_t i;
+  for (i = 0, iter = l.begin(); iter != l.end(); ++iter, ++i) {
+    rgw_bucket_dir_header *h = new rgw_bucket_dir_header;
+    rgw_bucket_category_stats *s = *iter;
+    h->stats[i] = *s;
+
+    o.push_back(h);
+
+    delete s;
+  }
+
+  o.push_back(new rgw_bucket_dir_header);
+}
+
+void rgw_bucket_dir_header::dump(Formatter *f) const
+{
+  map<uint8_t, struct rgw_bucket_category_stats>::const_iterator iter = stats.begin();
+  f->open_array_section("stats");
+  for (; iter != stats.end(); ++iter) {
+    f->dump_int("category", (int)iter->first);
+    f->open_object_section("category_stats");
+    iter->second.dump(f);
+    f->close_section();
+  }
+  f->close_section();
+}
+
+void rgw_bucket_dir::generate_test_instances(list<rgw_bucket_dir*>& o)
+{
+  list<rgw_bucket_dir_header *> l;
+  list<rgw_bucket_dir_header *>::iterator iter;
+  rgw_bucket_dir_header::generate_test_instances(l);
+
+  uint8_t i;
+  for (i = 0, iter = l.begin(); iter != l.end(); ++iter, ++i) {
+    rgw_bucket_dir *d = new rgw_bucket_dir;
+    rgw_bucket_dir_header *h = *iter;
+    d->header = *h;
+
+    list<rgw_bucket_dir_entry *> el;
+    list<rgw_bucket_dir_entry *>::iterator eiter;
+    for (eiter = el.begin(); eiter != el.end(); ++eiter) {
+      rgw_bucket_dir_entry *e = *eiter;
+      d->m[e->name] = *e;
+
+      delete e;
+    }
+
+    o.push_back(d);
+
+    delete h;
+  }
+
+  o.push_back(new rgw_bucket_dir);
+}
+
+void rgw_bucket_dir::dump(Formatter *f) const
+{
+  f->open_object_section("header");
+  header.dump(f);
+  f->close_section();
+  map<string, struct rgw_bucket_dir_entry>::const_iterator iter = m.begin();
+  f->open_array_section("map");
+  for (; iter != m.end(); ++iter) {
+    f->dump_string("obj", iter->first);
+    f->open_object_section("dir_entry");
+    iter->second.dump(f);
+    f->close_section();
+  }
+  f->close_section();
+}
+
index bfb74bba0b49d73223242c926e9744561c4da464..7894b6429ba208ab18f701818d51982949779fd7 100644 (file)
@@ -61,6 +61,8 @@ void _usage()
   cerr << "  usage trim                 trim usage (by user, date range)\n";
   cerr << "  temp remove                remove temporary objects that were created up to\n";
   cerr << "                             specified date (and optional time)\n";
+  cerr << "  gc list                    dump expired garbage collection objects\n";
+  cerr << "  gc process                 manually process garbage\n";
   cerr << "options:\n";
   cerr << "   --uid=<id>                user id\n";
   cerr << "   --auth-uid=<auid>         librados uid\n";
@@ -143,6 +145,8 @@ enum {
   OPT_USAGE_TRIM,
   OPT_TEMP_REMOVE,
   OPT_OBJECT_RM,
+  OPT_GC_LIST,
+  OPT_GC_PROCESS,
 };
 
 static uint32_t str_to_perm(const char *str)
@@ -215,7 +219,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more)
       strcmp(cmd, "log") == 0 ||
       strcmp(cmd, "usage") == 0 ||
       strcmp(cmd, "object") == 0 ||
-      strcmp(cmd, "temp") == 0) {
+      strcmp(cmd, "temp") == 0 ||
+      strcmp(cmd, "gc") == 0) {
     *need_more = true;
     return 0;
   }
@@ -291,6 +296,11 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more)
   } else if (strcmp(prev_cmd, "object") == 0) {
     if (strcmp(cmd, "rm") == 0)
       return OPT_OBJECT_RM;
+  } else if (strcmp(prev_cmd, "gc") == 0) {
+    if (strcmp(cmd, "list") == 0)
+      return OPT_GC_LIST;
+    if (strcmp(cmd, "process") == 0)
+      return OPT_GC_PROCESS;
   }
 
   return -EINVAL;
@@ -857,7 +867,7 @@ int main(int argc, char **argv)
                     opt_cmd == OPT_KEY_CREATE || opt_cmd == OPT_KEY_RM || opt_cmd == OPT_USER_RM);
 
   RGWStoreManager store_manager;
-  store = store_manager.init(g_ceph_context);
+  store = store_manager.init(g_ceph_context, false);
   if (!store) {
     cerr << "couldn't init storage provider" << std::endl;
     return 5; //EIO
@@ -1648,5 +1658,52 @@ next:
     }
   }
 
+  if (opt_cmd == OPT_GC_LIST) {
+    int ret;
+    int index = 0;
+    string marker;
+    bool truncated;
+    formatter->open_array_section("entries");
+
+    do {
+      list<cls_rgw_gc_obj_info> result;
+      ret = rgwstore->list_gc_objs(&index, marker, 1000, result, &truncated);
+      if (ret < 0) {
+       cerr << "ERROR: failed to list objs: " << cpp_strerror(-ret) << std::endl;
+       return 1;
+      }
+
+
+      list<cls_rgw_gc_obj_info>::iterator iter;
+      for (iter = result.begin(); iter != result.end(); ++iter) {
+       cls_rgw_gc_obj_info& info = *iter;
+       formatter->open_object_section("chain_info");
+       formatter->dump_string("tag", info.tag);
+       formatter->dump_stream("time") << info.time;
+       formatter->open_array_section("objs");
+        list<cls_rgw_obj>::iterator liter;
+       cls_rgw_obj_chain& chain = info.chain;
+       for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
+         cls_rgw_obj& obj = *liter;
+         formatter->dump_string("pool", obj.pool);
+         formatter->dump_string("oid", obj.oid);
+         formatter->dump_string("key", obj.key);
+       }
+       formatter->close_section(); // objs
+       formatter->close_section(); // obj_chain
+       formatter->flush(cout);
+      }
+    } while (truncated);
+    formatter->close_section();
+    formatter->flush(cout);
+  }
+
+  if (opt_cmd == OPT_GC_PROCESS) {
+    int ret = rgwstore->process_gc();
+    if (ret < 0) {
+      cerr << "ERROR: gc processing returned error: " << cpp_strerror(-ret) << std::endl;
+      return 1;
+    }
+  }
   return 0;
 }
index fa7630e17f8ef77186b018b4600db9b4999c072e..80091892cbe3ad5a58c6a678ecaf211cc4af881a 100644 (file)
@@ -184,6 +184,7 @@ class RGWCache  : public T
 
   void finalize() {
     T::finalize_watch();
+    T::finalize();
   }
   int distribute(const string& normal_name, rgw_obj& obj, ObjectCacheInfo& obj_info, int op);
   int watch_cb(int opcode, uint64_t ver, bufferlist& bl);
index 22f69f45ac7726b9cc48a34461a157b901878a90..c46fc4d54a9dfc68ba902eefefe2f7ecffbc34bb 100644 (file)
@@ -39,6 +39,8 @@ using ceph::crypto::MD5;
 
 #define RGW_ROOT_BUCKET ".rgw"
 
+#define RGW_GC_BUCKET ".rgw.gc"
+
 #define RGW_CONTROL_BUCKET ".rgw.control"
 
 #define RGW_ATTR_PREFIX  "user.rgw."
diff --git a/src/rgw/rgw_gc.cc b/src/rgw/rgw_gc.cc
new file mode 100644 (file)
index 0000000..b4f131f
--- /dev/null
@@ -0,0 +1,303 @@
+
+
+#include "rgw_gc.h"
+#include "include/rados/librados.hpp"
+#include "cls/rgw/cls_rgw_client.h"
+#include "cls/lock/cls_lock_client.h"
+#include "auth/Crypto.h"
+
+#include <list>
+
+#define dout_subsys ceph_subsys_rgw
+
+using namespace std;
+using namespace librados;
+
+static string gc_oid_prefix = "gc";
+static string gc_index_lock_name = "gc_process";
+
+void RGWGC::initialize(CephContext *_cct, RGWRados *_store) {
+  cct = _cct;
+  store = _store;
+
+  max_objs = cct->_conf->rgw_gc_max_objs;
+  obj_names = new string[max_objs];
+
+  for (int i = 0; i < max_objs; i++) {
+    obj_names[i] = gc_oid_prefix;
+    char buf[32];
+    snprintf(buf, 32, ".%d", i);
+    obj_names[i].append(buf);
+  }
+}
+
+void RGWGC::finalize()
+{
+  for (int i = 0; i < max_objs; i++) {
+    delete[] obj_names;
+  }
+}
+
+int RGWGC::tag_index(const string& tag)
+{
+  return ceph_str_hash_linux(tag.c_str(), tag.size()) % max_objs;
+}
+
+void RGWGC::add_chain(ObjectWriteOperation& op, cls_rgw_obj_chain& chain, const string& tag)
+{
+  cls_rgw_gc_obj_info info;
+  info.chain = chain;
+  info.tag = tag;
+
+  cls_rgw_gc_set_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
+}
+
+int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync)
+{
+  ObjectWriteOperation op;
+  add_chain(op, chain, tag);
+
+  int i = tag_index(tag);
+
+  if (sync)
+    return store->gc_operate(obj_names[i], &op);
+
+  return store->gc_aio_operate(obj_names[i], &op);
+}
+
+int RGWGC::defer_chain(const string& tag, bool sync)
+{
+  ObjectWriteOperation op;
+  cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag);
+
+  int i = tag_index(tag);
+
+  if (sync)
+    return store->gc_operate(obj_names[i], &op);
+
+  return store->gc_aio_operate(obj_names[i], &op);
+}
+
+int RGWGC::remove(int index, const std::list<string>& tags)
+{
+  ObjectWriteOperation op;
+  cls_rgw_gc_remove(op, tags);
+  return store->gc_operate(obj_names[index], &op);
+}
+
+int RGWGC::list(int *index, string& marker, uint32_t max, std::list<cls_rgw_gc_obj_info>& result, bool *truncated)
+{
+  result.clear();
+
+  for (; *index < cct->_conf->rgw_gc_max_objs && result.size() < max; (*index)++, marker.clear()) {
+    std::list<cls_rgw_gc_obj_info> entries;
+    int ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), entries, truncated);
+    if (ret == -ENOENT)
+      continue;
+    if (ret < 0)
+      return ret;
+
+    std::list<cls_rgw_gc_obj_info>::iterator iter;
+    for (iter = entries.begin(); iter != entries.end(); ++iter) {
+      result.push_back(*iter);
+    }
+
+    if (*index == cct->_conf->rgw_gc_max_objs - 1) {
+      /* we cut short here, truncated will hold the correct value */
+      return 0;
+    }
+
+    if (result.size() == max) {
+      /* close approximation, it might be that the next of the objects don't hold
+       * anything, in this case truncated should have been false, but we can find
+       * that out on the next iteration
+       */
+      *truncated = true;
+      return 0;
+    }
+
+  }
+  *truncated = false;
+
+  return 0;
+}
+
+int RGWGC::process(int index, int max_secs)
+{
+  rados::cls::lock::Lock l(gc_index_lock_name);
+  utime_t end = ceph_clock_now(g_ceph_context);
+  std::list<string> remove_tags;
+
+  /* max_secs should be greater than zero. We don't want a zero max_secs
+   * to be translated as no timeout, since we'd then need to break the
+   * lock and that would require a manual intervention. In this case
+   * we can just wait it out. */
+  if (max_secs <= 0)
+    return -EAGAIN;
+
+  end += max_secs;
+  utime_t time(max_secs, 0);
+  l.set_duration(time);
+
+  int ret = l.lock_exclusive(store->gc_pool_ctx, obj_names[index]);
+  if (ret == -EBUSY) { /* already locked by another gc processor */
+    dout(0) << "RGWGC::process() failed to acquire lock on " << obj_names[index] << dendl;
+    return 0;
+  }
+  if (ret < 0)
+    return ret;
+
+  string marker;
+  bool truncated;
+  IoCtx *ctx = NULL;
+  do {
+    int max = 100;
+    std::list<cls_rgw_gc_obj_info> entries;
+    ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max, entries, &truncated);
+    if (ret == -ENOENT) {
+      ret = 0;
+      goto done;
+    }
+    if (ret < 0)
+      goto done;
+
+    string last_pool;
+    ctx = new IoCtx;
+    std::list<cls_rgw_gc_obj_info>::iterator iter;
+    for (iter = entries.begin(); iter != entries.end(); ++iter) {
+      bool remove_tag;
+      cls_rgw_gc_obj_info& info = *iter;
+      std::list<cls_rgw_obj>::iterator liter;
+      cls_rgw_obj_chain& chain = info.chain;
+
+      utime_t now = ceph_clock_now(g_ceph_context);
+      if (now >= end)
+        goto done;
+
+      remove_tag = true;
+      for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
+        cls_rgw_obj& obj = *liter;
+
+        if (obj.pool != last_pool) {
+          if (ctx) {
+            delete ctx;
+            ctx = new IoCtx;
+          }
+         ret = rgwstore->rados->ioctx_create(obj.pool.c_str(), *ctx);
+         if (ret < 0) {
+           dout(0) << "ERROR: failed to create ioctx pool=" << obj.pool << dendl;
+           continue;
+         }
+          last_pool = obj.pool;
+        }
+
+        ctx->locator_set_key(obj.key);
+       dout(0) << "gc::process: removing " << obj.pool << ":" << obj.oid << dendl;
+        ret = ctx->remove(obj.oid);
+       if (ret == -ENOENT)
+         ret = 0;
+        if (ret < 0) {
+          remove_tag = false;
+          dout(0) << "failed to remove " << obj.pool << ":" << obj.oid << "@" << obj.key << dendl;
+        }
+
+        if (going_down()) // leave early, even if tag isn't removed, it's ok
+          goto done;
+      }
+      if (remove_tag) {
+        remove_tags.push_back(info.tag);
+#define MAX_REMOVE_CHUNK 16
+        if (remove_tags.size() > MAX_REMOVE_CHUNK) {
+          remove(index, remove_tags);
+          remove_tags.clear();
+        }
+      }
+    }
+  } while (truncated);
+
+done:
+  if (remove_tags.size())
+    remove(index, remove_tags);
+  l.unlock(store->gc_pool_ctx, obj_names[index]);
+  delete ctx;
+  return 0;
+}
+
+int RGWGC::process()
+{
+  int max_objs = cct->_conf->rgw_gc_max_objs;
+  int max_secs = cct->_conf->rgw_gc_processor_max_time;
+
+  unsigned start;
+  int ret = get_random_bytes((char *)&start, sizeof(start));
+  if (ret < 0)
+    return ret;
+
+  for (int i = 0; i < max_objs; i++) {
+    int index = (i + start) % max_objs;
+    ret = process(index, max_secs);
+    if (ret < 0)
+      return ret;
+  }
+
+  return 0;
+}
+
+bool RGWGC::going_down()
+{
+  return (down_flag.read() != 0);
+}
+
+void RGWGC::start_processor()
+{
+  worker = new GCWorker(cct, this);
+  worker->create();
+}
+
+void RGWGC::stop_processor()
+{
+  down_flag.set(1);
+  if (worker) {
+    worker->stop();
+    worker->join();
+  }
+  delete worker;
+  worker = NULL;
+}
+
+void *RGWGC::GCWorker::entry() {
+  do {
+    utime_t start = ceph_clock_now(cct);
+    dout(2) << "garbage collection: start" << dendl;
+    int r = gc->process();
+    if (r < 0) {
+      dout(0) << "ERROR: garbage collection process() returned error r=" << r << dendl;
+    }
+    dout(2) << "garbage collection: stop" << dendl;
+
+    if (gc->going_down())
+      break;
+
+    utime_t end = ceph_clock_now(cct);
+    end -= start;
+    int secs = cct->_conf->rgw_gc_processor_period;
+
+    if (secs <= end.sec())
+      continue; // next round
+
+    secs -= end.sec();
+
+    lock.Lock();
+    cond.WaitInterval(cct, lock, utime_t(secs, 0));
+    lock.Unlock();
+  } while (!gc->going_down());
+
+  return NULL;
+}
+
+void RGWGC::GCWorker::stop()
+{
+  Mutex::Locker l(lock);
+  cond.Signal();
+}
+
diff --git a/src/rgw/rgw_gc.h b/src/rgw/rgw_gc.h
new file mode 100644 (file)
index 0000000..db1753a
--- /dev/null
@@ -0,0 +1,59 @@
+#ifndef CEPH_RGW_GC_H
+#define CEPH_RGW_GC_H
+
+
+#include "include/types.h"
+#include "include/atomic.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/Thread.h"
+#include "rgw_common.h"
+#include "rgw_rados.h"
+#include "cls/rgw/cls_rgw_types.h"
+
+class RGWGC {
+  CephContext *cct;
+  RGWRados *store;
+  int max_objs;
+  string *obj_names;
+  atomic_t down_flag;
+
+  int tag_index(const string& tag);
+
+  class GCWorker : public Thread {
+    CephContext *cct;
+    RGWGC *gc;
+    Mutex lock;
+    Cond cond;
+
+  public:
+    GCWorker(CephContext *_cct, RGWGC *_gc) : cct(_cct), gc(_gc), lock("GCWorker") {}
+    void *entry();
+    void stop();
+  };
+
+  GCWorker *worker;
+public:
+  RGWGC() : cct(NULL), store(NULL), max_objs(0), obj_names(NULL), worker(NULL) {}
+
+  void add_chain(librados::ObjectWriteOperation& op, cls_rgw_obj_chain& chain, const string& tag);
+  int send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync);
+  int defer_chain(const string& tag, bool sync);
+  int remove(int index, const std::list<string>& tags);
+
+  void initialize(CephContext *_cct, RGWRados *_store);
+  void finalize();
+
+  int list(int *index, string& marker, uint32_t max, std::list<cls_rgw_gc_obj_info>& result, bool *truncated);
+  void list_init(int *index) { *index = 0; }
+  int process(int index, int process_max_secs);
+  int process();
+
+  bool going_down();
+  void start_processor();
+  void stop_processor();
+};
+
+
+#endif
index 44a6f6c26a5646a8f963457758f123eebe072c6b..09ca073462093b570570a97cc0f6be6c8caeff07 100644 (file)
@@ -420,7 +420,7 @@ int main(int argc, const char **argv)
   RGWStoreManager store_manager;
 
   int r = 0;
-  if (!store_manager.init(g_ceph_context)) {
+  if (!store_manager.init(g_ceph_context, true)) {
     derr << "Couldn't init storage provider (RADOS)" << dendl;
     r = EIO;
   }
index 35b5f5d79f6958c61c59ca5402c1ec51b522ba28..ea17a1c5ca0f7ccad987e8bf204e05d851882376 100644 (file)
@@ -319,6 +319,9 @@ void RGWGetObj::execute()
   void *handle = NULL;
   utime_t start_time = s->time;
   bufferlist bl;
+  utime_t gc_invalidate_time = ceph_clock_now(s->cct);
+  gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
+
 
   perfcounter->inc(l_rgw_get);
 
@@ -356,6 +359,17 @@ void RGWGetObj::execute()
     send_response(bl);
     bl.clear();
     start_time = ceph_clock_now(s->cct);
+
+    if (ofs <= end) {
+      if (start_time > gc_invalidate_time) {
+       int r = rgwstore->defer_gc(s->obj_ctx, obj);
+       if (r < 0) {
+         dout(0) << "WARNING: could not defer gc entry for obj" << dendl;
+       }
+       gc_invalidate_time = start_time;
+        gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
+      }
+    }
   }
 
   return;
index c5578b272d76e1470d6d16181502817949b86897..9b105b87fbd3835d547ecc90b5057ecdb1717a9a 100644 (file)
@@ -27,12 +27,12 @@ using namespace librados;
 
 #include "rgw_log.h"
 
+#include "rgw_gc.h"
+
 #define dout_subsys ceph_subsys_rgw
 
 using namespace std;
 
-Rados *rados = NULL;
-
 static RGWCache<RGWRados> cached_rados_provider;
 static RGWRados rados_provider;
 
@@ -68,7 +68,7 @@ public:
   }
 };
 
-RGWRados *RGWRados::init_storage_provider(CephContext *cct)
+RGWRados *RGWRados::init_storage_provider(CephContext *cct, bool use_gc_thread)
 {
   int use_cache = cct->_conf->rgw_cache_enabled;
   store = NULL;
@@ -78,7 +78,7 @@ RGWRados *RGWRados::init_storage_provider(CephContext *cct)
     store = &cached_rados_provider;
   }
 
-  if (store->initialize(cct) < 0)
+  if (store->initialize(cct, use_gc_thread) < 0)
     store = NULL;
 
   return store;
@@ -93,6 +93,13 @@ void RGWRados::close_storage()
   store = NULL;
 }
 
+
+void RGWRados::finalize()
+{
+  if (use_gc_thread)
+    gc->stop_processor();
+}
+
 /** 
  * Initialize the RADOS instance and prepare to do other ops
  * Returns 0 on success, -ERR# on failure.
@@ -117,6 +124,16 @@ int RGWRados::initialize()
   if (ret < 0)
     return ret;
 
+  ret = open_gc_pool_ctx();
+  if (ret < 0)
+    return ret;
+
+  gc = new RGWGC();
+  gc->initialize(cct, this);
+
+  if (use_gc_thread)
+    gc->start_processor();
+
   return ret;
 }
 
@@ -158,6 +175,22 @@ int RGWRados::open_root_pool_ctx()
   return r;
 }
 
+int RGWRados::open_gc_pool_ctx()
+{
+  int r = rados->ioctx_create(RGW_GC_BUCKET, gc_pool_ctx);
+  if (r == -ENOENT) {
+    r = rados->pool_create(RGW_GC_BUCKET);
+    if (r == -EEXIST)
+      r = 0;
+    if (r < 0)
+      return r;
+
+    r = rados->ioctx_create(RGW_GC_BUCKET, gc_pool_ctx);
+  }
+
+  return r;
+}
+
 int RGWRados::init_watch()
 {
   int r = rados->ioctx_create(RGW_CONTROL_BUCKET, control_pool_ctx);
@@ -915,17 +948,16 @@ int RGWRados::put_obj_meta(void *ctx, rgw_obj& obj,  uint64_t size,
   RGWObjState *state = NULL;
 
   if (!exclusive) {
-    r = prepare_atomic_for_write(rctx, obj, io_ctx, oid, op, &state);
+    r = prepare_atomic_for_write(rctx, obj, op, &state, true);
     if (r < 0)
       return r;
+  } else {
+    op.create(true); // exclusive create
   }
 
-  op.create(exclusive);
-
   if (data) {
     /* if we want to overwrite the data, we also want to overwrite the
        xattrs, so just remove the object */
-    op.remove();
     op.write_full(*data);
   }
 
@@ -1298,19 +1330,57 @@ int RGWRados::complete_atomic_overwrite(RGWRadosCtx *rctx, RGWObjState *state, r
   if (!state || !state->has_manifest)
     return 0;
 
+  cls_rgw_obj_chain chain;
   map<uint64_t, RGWObjManifestPart>::iterator iter;
   for (iter = state->manifest.objs.begin(); iter != state->manifest.objs.end(); ++iter) {
     rgw_obj& mobj = iter->second.loc;
     if (mobj == obj)
       continue;
-    int ret = rctx->notify_intent(mobj, DEL_OBJ);
-    if (ret < 0) {
-      ldout(cct, 0) << "WARNING: failed to log intent ret=" << ret << dendl;
-    }
+    string oid, key;
+    rgw_bucket bucket;
+    get_obj_bucket_and_oid_key(mobj, bucket, oid, key);
+    chain.push_obj(bucket.pool, oid, key);
   }
-  return 0;
+
+  string tag = state->obj_tag.c_str();
+  int ret = gc->send_chain(chain, tag, false);  // do it async
+
+  return ret;
 }
 
+int RGWRados::defer_gc(void *ctx, rgw_obj& obj)
+{
+  RGWRadosCtx *rctx = (RGWRadosCtx *)ctx;
+  rgw_bucket bucket;
+  std::string oid, key;
+  get_obj_bucket_and_oid_key(obj, bucket, oid, key);
+  if (!rctx)
+    return 0;
+
+  RGWObjState *state = NULL;
+
+  int r = get_obj_state(rctx, obj, &state);
+  if (r < 0)
+    return r;
+
+  if (!state->is_atomic) {
+    ldout(cct, 20) << "state for obj=" << obj << " is not atomic, not deferring gc operation" << dendl;
+    return -EINVAL;
+  }
+
+  if (state->obj_tag.length() == 0) {// check for backward compatibility
+    ldout(cct, 20) << "state->obj_tag is empty, not deferring gc operation" << dendl;
+    return -EINVAL;
+  }
+
+  string tag = state->obj_tag.c_str();
+
+  ldout(cct, 0) << "defer chain tag=" << tag << dendl;
+
+  return gc->defer_chain(tag, false);
+}
+
+
 /**
  * Delete an object.
  * bucket: name of the bucket storing the object
@@ -1333,7 +1403,7 @@ int RGWRados::delete_obj_impl(void *ctx, rgw_obj& obj, bool sync)
   ObjectWriteOperation op;
 
   RGWObjState *state;
-  r = prepare_atomic_for_write(rctx, obj, io_ctx, oid, op, &state);
+  r = prepare_atomic_for_write(rctx, obj, op, &state, false);
   if (r < 0)
     return r;
 
@@ -1392,7 +1462,39 @@ int RGWRados::delete_obj(void *ctx, rgw_obj& obj, bool sync)
   return r;
 }
 
-int RGWRados::get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, RGWObjState **state)
+static void generate_fake_tag(CephContext *cct, map<string, bufferlist>& attrset, RGWObjManifest& manifest, bufferlist& manifest_bl, bufferlist& tag_bl)
+{
+  string tag;
+
+  map<uint64_t, RGWObjManifestPart>::iterator mi = manifest.objs.begin();
+  if (mi != manifest.objs.end()) {
+    if (manifest.objs.size() > 1) // first object usually points at the head, let's skip to a more unique part
+      ++mi;
+    tag = mi->second.loc.object;
+    tag.append("_");
+  }
+
+  unsigned char md5[CEPH_CRYPTO_MD5_DIGESTSIZE];
+  char md5_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
+  MD5 hash;
+  hash.Update((const byte *)manifest_bl.c_str(), manifest_bl.length());
+
+  map<string, bufferlist>::iterator iter = attrset.find(RGW_ATTR_ETAG);
+  if (iter != attrset.end()) {
+    bufferlist& bl = iter->second;
+    hash.Update((const byte *)bl.c_str(), bl.length());
+  }
+
+  hash.Final(md5);
+  buf_to_hex(md5, CEPH_CRYPTO_MD5_DIGESTSIZE, md5_str);
+  tag.append(md5_str);
+
+  ldout(cct, 10) << "generate_fake_tag new tag=" << tag << dendl;
+
+  tag_bl.append(tag.c_str(), tag.size() + 1);
+}
+
+int RGWRados::get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, RGWObjState **state)
 {
   RGWObjState *s = rctx->get_state(obj);
   ldout(cct, 20) << "get_obj_state: rctx=" << (void *)rctx << " obj=" << obj << " state=" << (void *)s << " s->prefetch_data=" << s->prefetch_data << dendl;
@@ -1436,6 +1538,15 @@ int RGWRados::get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io
     for (mi = s->manifest.objs.begin(); mi != s->manifest.objs.end(); ++mi) {
       ldout(cct, 10) << "manifest: ofs=" << mi->first << " loc=" << mi->second.loc << dendl;
     }
+
+    if (!s->obj_tag.length()) {
+      /*
+       * Uh oh, something's wrong, object with manifest should have tag. Let's
+       * create one out of the manifest, would be unique
+       */
+      generate_fake_tag(cct, s->attrset, s->manifest, manifest_bl, s->obj_tag);
+      s->fake_tag = true;
+    }
   }
   if (s->obj_tag.length())
     ldout(cct, 20) << "get_obj_state: setting s->obj_tag to " << s->obj_tag.c_str() << dendl;
@@ -1475,7 +1586,7 @@ int RGWRados::get_attr(void *ctx, rgw_obj& obj, const char *name, bufferlist& de
 
   if (rctx) {
     RGWObjState *state;
-    r = get_obj_state(rctx, obj, io_ctx, actual_obj, &state);
+    r = get_obj_state(rctx, obj, &state);
     if (r < 0)
       return r;
     if (!state->exists)
@@ -1492,13 +1603,13 @@ int RGWRados::get_attr(void *ctx, rgw_obj& obj, const char *name, bufferlist& de
   return 0;
 }
 
-int RGWRados::append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
-                            string& actual_obj, ObjectOperation& op, RGWObjState **pstate)
+int RGWRados::append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj,
+                            ObjectOperation& op, RGWObjState **pstate)
 {
   if (!rctx)
     return 0;
 
-  int r = get_obj_state(rctx, obj, io_ctx, actual_obj, pstate);
+  int r = get_obj_state(rctx, obj, pstate);
   if (r < 0)
     return r;
 
@@ -1509,7 +1620,7 @@ int RGWRados::append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCt
     return 0;
   }
 
-  if (state->obj_tag.length() > 0) {// check for backward compatibility
+  if (state->obj_tag.length() > 0 && !state->fake_tag) {// check for backward compatibility
     op.cmpxattr(RGW_ATTR_ID_TAG, LIBRADOS_CMPXATTR_OP_EQ, state->obj_tag);
   } else {
     ldout(cct, 20) << "state->obj_tag is empty, not appending atomic test" << dendl;
@@ -1517,19 +1628,26 @@ int RGWRados::append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCt
   return 0;
 }
 
-int RGWRados::prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
-                            string& actual_obj, ObjectWriteOperation& op, RGWObjState **pstate)
+int RGWRados::prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj,
+                            ObjectWriteOperation& op, RGWObjState **pstate,
+                           bool reset_obj)
 {
-  int r = get_obj_state(rctx, obj, io_ctx, actual_obj, pstate);
+  int r = get_obj_state(rctx, obj, pstate);
   if (r < 0)
     return r;
 
   RGWObjState *state = *pstate;
 
-  bool need_guard = state->has_manifest || (state->obj_tag.length() != 0);
+  bool need_guard = (state->has_manifest || (state->obj_tag.length() != 0)) && (!state->fake_tag);
 
   if (!state->is_atomic) {
     ldout(cct, 20) << "prepare_atomic_for_write_impl: state is not atomic. state=" << (void *)state << dendl;
+
+    if (reset_obj) {
+      op.create(false);
+      op.remove();
+    }
+
     return 0;
   }
 
@@ -1579,10 +1697,15 @@ int RGWRados::prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, lib
     // FIXME: need to add FAIL_NOTEXIST_OK for racing deletion
   }
 
+  if (reset_obj) {
+    op.create(false);
+    op.remove();
+  }
+
   string tag;
   append_rand_alpha(cct, tag, tag, 32);
   bufferlist bl;
-  bl.append(tag);
+  bl.append(tag.c_str(), tag.size() + 1);
 
   op.setxattr(RGW_ATTR_ID_TAG, bl);
 
@@ -1597,8 +1720,9 @@ int RGWRados::prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, lib
   return 0;
 }
 
-int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
-                            string& actual_obj, ObjectWriteOperation& op, RGWObjState **pstate)
+int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj,
+                            ObjectWriteOperation& op, RGWObjState **pstate,
+                           bool reset_obj)
 {
   if (!rctx) {
     *pstate = NULL;
@@ -1606,7 +1730,7 @@ int RGWRados::prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados
   }
 
   int r;
-  r = prepare_atomic_for_write_impl(rctx, obj, io_ctx, actual_obj, op, pstate);
+  r = prepare_atomic_for_write_impl(rctx, obj, op, pstate, reset_obj);
 
   return r;
 }
@@ -1638,16 +1762,16 @@ int RGWRados::set_attr(void *ctx, rgw_obj& obj, const char *name, bufferlist& bl
   if (r < 0)
     return r;
 
-  io_ctx.locator_set_key(key);
-
   ObjectWriteOperation op;
   RGWObjState *state = NULL;
 
-  r = append_atomic_test(rctx, obj, io_ctx, actual_obj, op, &state);
+  r = append_atomic_test(rctx, obj, op, &state);
   if (r < 0)
     return r;
 
   op.setxattr(name, bl);
+
+  io_ctx.locator_set_key(key);
   r = io_ctx.operate(actual_obj, &op);
 
   if (state && r >= 0)
@@ -1693,7 +1817,7 @@ int RGWRados::set_attrs(void *ctx, rgw_obj& obj,
   ObjectWriteOperation op;
   RGWObjState *state = NULL;
 
-  r = append_atomic_test(rctx, obj, io_ctx, actual_obj, op, &state);
+  r = append_atomic_test(rctx, obj, op, &state);
   if (r < 0)
     return r;
 
@@ -1803,7 +1927,7 @@ int RGWRados::prepare_get_obj(void *ctx, rgw_obj& obj,
     rctx = new_ctx;
   }
 
-  r = get_obj_state(rctx, obj, state->io_ctx, oid, &astate);
+  r = get_obj_state(rctx, obj, &astate);
   if (r < 0)
     goto done_err;
 
@@ -2013,7 +2137,7 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj,
     }
   }
   RGWObjState *state;
-  r = prepare_atomic_for_write(rctx, dst_obj, io_ctx, dst_oid, op, &state);
+  r = prepare_atomic_for_write(rctx, dst_obj, op, &state, true);
   if (r < 0)
     return r;
 
@@ -2131,7 +2255,7 @@ int RGWRados::get_obj(void *ctx, void **handle, rgw_obj& obj,
     rctx = new_ctx;
   }
 
-  int r = get_obj_state(rctx, obj, state->io_ctx, oid, &astate);
+  int r = get_obj_state(rctx, obj, &astate);
   if (r < 0)
     goto done_ret;
 
@@ -2169,7 +2293,7 @@ int RGWRados::get_obj(void *ctx, void **handle, rgw_obj& obj,
 
   if (reading_from_head) {
     /* only when reading from the head object do we need to do the atomic test */
-    r = append_atomic_test(rctx, read_obj, state->io_ctx, oid, op, &astate);
+    r = append_atomic_test(rctx, read_obj, op, &astate);
     if (r < 0)
       goto done_ret;
   }
@@ -2235,7 +2359,7 @@ int RGWRados::read(void *ctx, rgw_obj& obj, off_t ofs, size_t size, bufferlist&
 
   ObjectReadOperation op;
 
-  r = append_atomic_test(rctx, obj, io_ctx, oid, op, &astate);
+  r = append_atomic_test(rctx, obj, op, &astate);
   if (r < 0)
     return r;
 
@@ -2535,6 +2659,34 @@ int RGWRados::pool_iterate(RGWPoolIterCtx& ctx, uint32_t num, vector<RGWObjEnt>&
   return objs.size();
 }
 
+int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op)
+{
+  return gc_pool_ctx.operate(oid, op);
+}
+
+int RGWRados::gc_aio_operate(string& oid, librados::ObjectWriteOperation *op)
+{
+  AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+  int r = gc_pool_ctx.aio_operate(oid, c, op);
+  c->release();
+  return r;
+}
+
+int RGWRados::gc_operate(string& oid, librados::ObjectReadOperation *op, bufferlist *pbl)
+{
+  return gc_pool_ctx.operate(oid, op, pbl);
+}
+
+int RGWRados::list_gc_objs(int *index, string& marker, uint32_t max, std::list<cls_rgw_gc_obj_info>& result, bool *truncated)
+{
+  return gc->list(index, marker, max, result, truncated);
+}
+
+int RGWRados::process_gc()
+{
+  return gc->process();
+}
+
 int RGWRados::cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid)
 {
   bufferlist in;
index a63a531f435ee63617edd30956d1222badf8c0cf..50f07b893b46b34951b6cff92e126704e77d9904 100644 (file)
@@ -10,6 +10,7 @@
 class RGWWatcher;
 class SafeTimer;
 class ACLOwner;
+class RGWGC;
 
 static inline void prepend_bucket_marker(rgw_bucket& bucket, string& orig_oid, string& oid)
 {
@@ -119,6 +120,7 @@ struct RGWObjState {
   uint64_t size;
   time_t mtime;
   bufferlist obj_tag;
+  bool fake_tag;
   RGWObjManifest manifest;
   bool has_manifest;
   string shadow_obj;
@@ -127,7 +129,7 @@ struct RGWObjState {
   bool prefetch_data;
 
   map<string, bufferlist> attrset;
-  RGWObjState() : is_atomic(false), has_attrs(0), exists(false), has_manifest(false), prefetch_data(false) {}
+  RGWObjState() : is_atomic(false), has_attrs(0), exists(false), fake_tag(false), has_manifest(false), prefetch_data(false) {}
 
   bool get_attr(string name, bufferlist& dest) {
     map<string, bufferlist>::iterator iter = attrset.find(name);
@@ -141,6 +143,7 @@ struct RGWObjState {
   void clear() {
     has_attrs = false;
     exists = false;
+    fake_tag = false;
     size = 0;
     mtime = 0;
     obj_tag.clear();
@@ -197,8 +200,11 @@ struct RGWPoolIterCtx {
   
 class RGWRados
 {
+  friend class RGWGC;
+
   /** Open the pool used as root for this gateway */
   int open_root_pool_ctx();
+  int open_gc_pool_ctx();
 
   int open_bucket_ctx(rgw_bucket& bucket, librados::IoCtx&  io_ctx);
 
@@ -223,6 +229,9 @@ class RGWRados
     }
   };
 
+  RGWGC *gc;
+  bool use_gc_thread;
+
 
   int num_watchers;
   RGWWatcher **watchers;
@@ -233,13 +242,15 @@ class RGWRados
   Mutex bucket_id_lock;
   uint64_t max_bucket_id;
 
-  int get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx, string& actual_obj, RGWObjState **state);
-  int append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
-                         string& actual_obj, librados::ObjectOperation& op, RGWObjState **state);
-  int prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
-                         string& actual_obj, librados::ObjectWriteOperation& op, RGWObjState **pstate);
-  int prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj, librados::IoCtx& io_ctx,
-                         string& actual_obj, librados::ObjectWriteOperation& op, RGWObjState **pstate);
+  int get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, RGWObjState **state);
+  int append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj,
+                         librados::ObjectOperation& op, RGWObjState **state);
+  int prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj,
+                         librados::ObjectWriteOperation& op, RGWObjState **pstate,
+                        bool reset_obj);
+  int prepare_atomic_for_write(RGWRadosCtx *rctx, rgw_obj& obj,
+                         librados::ObjectWriteOperation& op, RGWObjState **pstate,
+                        bool reset_obj);
 
   void atomic_write_finish(RGWObjState *state, int r) {
     if (state && r == -ECANCELED) {
@@ -279,25 +290,29 @@ class RGWRados
 
 protected:
   CephContext *cct;
+  librados::Rados *rados;
+  librados::IoCtx gc_pool_ctx;        // .rgw.gc
 
 public:
-  RGWRados() : lock("rados_timer_lock"), timer(NULL), num_watchers(0), watchers(NULL), watch_handles(NULL),
-               bucket_id_lock("rados_bucket_id"), max_bucket_id(0) {}
+  RGWRados() : lock("rados_timer_lock"), timer(NULL), gc(NULL), use_gc_thread(false),
+               num_watchers(0), watchers(NULL), watch_handles(0),
+               bucket_id_lock("rados_bucket_id"), max_bucket_id(0), rados(NULL) {}
   virtual ~RGWRados() {}
 
   void tick();
 
   CephContext *ctx() { return cct; }
   /** do all necessary setup of the storage device */
-  int initialize(CephContext *_cct) {
+  int initialize(CephContext *_cct, bool _use_gc_thread) {
     cct = _cct;
+    use_gc_thread = _use_gc_thread;
     return initialize();
   }
   /** Initialize the RADOS instance and prepare to do other ops */
   virtual int initialize();
-  virtual void finalize() {}
+  virtual void finalize();
 
-  static RGWRados *init_storage_provider(CephContext *cct);
+  static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread);
   static void close_storage();
   static RGWRados *store;
 
@@ -585,6 +600,13 @@ public:
   /// clean up/process any temporary objects older than given date[/time]
   int remove_temp_objects(string date, string time);
 
+  int gc_operate(string& oid, librados::ObjectWriteOperation *op);
+  int gc_aio_operate(string& oid, librados::ObjectWriteOperation *op);
+  int gc_operate(string& oid, librados::ObjectReadOperation *op, bufferlist *pbl);
+
+  int list_gc_objs(int *index, string& marker, uint32_t max, std::list<cls_rgw_gc_obj_info>& result, bool *truncated);
+  int process_gc();
+  int defer_gc(void *ctx, rgw_obj& obj);
  private:
   int process_intent_log(rgw_bucket& bucket, string& oid,
                         time_t epoch, int flags, bool purge);
@@ -633,18 +655,20 @@ public:
 
   uint64_t instance_id();
   uint64_t next_bucket_id();
+
 };
 
 class RGWStoreManager {
   RGWRados *store;
 public:
-  RGWStoreManager() : store(NULL) {}
+  RGWStoreManager(): store(NULL) {}
   ~RGWStoreManager() {
-    if (store)
+    if (store) {
       RGWRados::close_storage();
+    }
   }
-  RGWRados *init(CephContext *cct) {
-    store = RGWRados::init_storage_provider(cct);
+  RGWRados *init(CephContext *cct, bool use_gc_thread) {
+    store = RGWRados::init_storage_provider(cct, use_gc_thread);
     return store;
   }
 };