]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
LifeCycle feature 6331/head
authorJi Chen <insomnia@139.com>
Wed, 21 Oct 2015 02:10:39 +0000 (10:10 +0800)
committerJi Chen <insomnia@139.com>
Tue, 8 Dec 2015 03:22:26 +0000 (11:22 +0800)
As same as amazon S3 interface,"PUT Bucket lifecycle" and
"DELETE Bucket lifecycle" have been implemented,
"GET Bucket lifecycle" not realized yet as S3cmd has not
realize it also.
The feature`s main point is to remove expire file per day.
Files transfer from hot layer to cold layer is not supported.
ToDo:Maybe to transfer from replicate pool to EC pool or
from ssd to sata pool will be valuable.

Now put all buckets which should do lifecycle into shard
objects in .rgw.lc pool.

lifecycle config file format:
<LifecycleConfiguration>
    <Rule>
        <ID>sample-rule</ID>
        <Prefix></Prefix>
        <Status>enable</Status>
        <Expiration>
           <Days>1</Days>
        </Expiration>
    </Rule>
</LifecycleConfiguration>

Signed-off-by: Ji Chen <insomnia@139.com>
27 files changed:
src/CMakeLists.txt
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/cls/rgw/cls_rgw_ops.h
src/cls/rgw/cls_rgw_types.h
src/common/config_opts.h
src/rgw/Makefile.am
src/rgw/rgw_admin.cc
src/rgw/rgw_common.cc
src/rgw/rgw_common.h
src/rgw/rgw_lc.cc [new file with mode: 0644]
src/rgw/rgw_lc.h [new file with mode: 0644]
src/rgw/rgw_lc_s3.cc [new file with mode: 0644]
src/rgw/rgw_lc_s3.h [new file with mode: 0644]
src/rgw/rgw_main.cc
src/rgw/rgw_object_expirer.cc
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest.cc
src/rgw/rgw_rest.h
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h
src/test/cli/radosgw-admin/help.t
src/test/test_rgw_admin_opstate.cc

index 0d90ee231c280162344cccf724d9057d01447c8c..554b015bd02f1a03f22e66d722e56b7565e45741 100644 (file)
@@ -1007,6 +1007,8 @@ if(${WITH_RADOSGW})
     rgw/rgw_acl.cc
     rgw/rgw_acl_s3.cc
     rgw/rgw_acl_swift.cc
+    rgw/rgw_lc.cc
+    rgw/rgw_lc_s3.cc
     rgw/rgw_client_io.cc
     rgw/rgw_fcgi.cc
     rgw/rgw_xml.cc
index b4892fde9d671f1d9f4ec7e8bf0618eeea6477e2..488dad929fa64a0dccf67dc9b4c7b0be437993a3 100644 (file)
@@ -48,6 +48,12 @@ cls_method_handle_t h_rgw_user_usage_log_trim;
 cls_method_handle_t h_rgw_gc_set_entry;
 cls_method_handle_t h_rgw_gc_list;
 cls_method_handle_t h_rgw_gc_remove;
+cls_method_handle_t h_rgw_lc_set_entry;
+cls_method_handle_t h_rgw_lc_rm_entry;
+cls_method_handle_t h_rgw_lc_get_entry;
+cls_method_handle_t h_rgw_lc_put_head;
+cls_method_handle_t h_rgw_lc_get_head;
+cls_method_handle_t h_rgw_lc_list_entry;
 
 
 #define ROUND_BLOCK_SIZE 4096
@@ -3054,6 +3060,142 @@ static int rgw_cls_gc_remove(cls_method_context_t hctx, bufferlist *in, bufferli
   return gc_remove(hctx, op.tags);
 }
 
+static int rgw_cls_lc_set_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  bufferlist::iterator in_iter = in->begin();
+
+  cls_rgw_lc_set_entry_op op;
+  try {
+    ::decode(op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: rgw_cls_lc_set_entry(): failed to decode entry\n");
+    return -EINVAL;
+  }
+  
+  bufferlist bl;
+  ::encode(op.entry, bl);
+
+  int ret = cls_cxx_map_set_val(hctx, op.entry.first, &bl);
+  return ret;
+}
+
+static int rgw_cls_lc_rm_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  bufferlist::iterator in_iter = in->begin();
+
+  cls_rgw_lc_rm_entry_op op;
+  try {
+    ::decode(op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: rgw_cls_lc_rm_entry(): failed to decode entry\n");
+    return -EINVAL;
+  }
+  
+  bufferlist bl;
+  ::encode(op.entry, bl);
+
+  int ret = cls_cxx_map_remove_key(hctx, op.entry.first);
+  return ret;
+}
+
+static int rgw_cls_lc_get_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  bufferlist::iterator in_iter = in->begin();
+  cls_rgw_lc_get_entry_ret op_ret;
+  cls_rgw_lc_get_entry_op op;
+  try {
+    ::decode(op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: rgw_cls_lc_rm_entry(): failed to decode entry\n");
+    return -EINVAL;
+  }
+  
+  map<string, bufferlist> vals;
+  string filter_prefix;
+  int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, 1, &vals);
+  if (ret < 0)
+    return ret;
+  map<string, bufferlist>::iterator it;
+  pair<string, int> entry;  
+  if (!vals.empty()) {
+    it=vals.begin();
+    in_iter = it->second.begin();
+    try {
+      ::decode(entry, in_iter);
+    } catch (buffer::error& err) {
+      CLS_LOG(1, "ERROR: rgw_cls_lc_get_entry(): failed to decode entry\n");
+      return -EINVAL;
+    }
+  }
+  op_ret.entry = entry;
+  ::encode(op_ret, *out);
+  return 0;
+}
+
+static int rgw_cls_lc_list_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  cls_rgw_lc_list_entry_ret op_ret; 
+  bufferlist::iterator iter;
+  map<string, bufferlist> vals;
+  int ret = cls_cxx_map_get_all_vals(hctx, &vals);
+  if (ret < 0)
+    return ret;
+  map<string, bufferlist>::iterator it;
+  pair<string, int> entry;
+  for (it = vals.begin(); it != vals.end(); it++) {
+    iter = it->second.begin();
+    try {
+    ::decode(entry, iter);
+    } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: rgw_cls_lc_list_entry(): failed to decode entry\n");
+    return -EINVAL;
+   }
+   op_ret.entries.insert(entry);
+  }
+  ::encode(op_ret, *out);
+  return 0;
+}
+
+static int rgw_cls_lc_put_head(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  bufferlist::iterator in_iter = in->begin();
+
+  cls_rgw_lc_put_head_op op;
+  try {
+    ::decode(op, in_iter);
+  } catch (buffer::error& err) {
+    CLS_LOG(1, "ERROR: rgw_cls_lc_set_entry(): failed to decode entry\n");
+    return -EINVAL;
+  }
+  
+  bufferlist bl;
+  ::encode(op.head, bl);
+  int ret = cls_cxx_map_write_header(hctx,&bl);
+  return ret;
+}
+
+static int rgw_cls_lc_get_head(cls_method_context_t hctx, bufferlist *in,  bufferlist *out)
+{
+  bufferlist bl;
+  int ret = cls_cxx_map_read_header(hctx, &bl);
+  if (ret < 0)
+    return ret;
+  cls_rgw_lc_obj_head head;
+  if (bl.length() != 0) {   
+    bufferlist::iterator iter = bl.begin();
+    try {
+      ::decode(head, iter);
+    } catch (buffer::error& err) {
+      CLS_LOG(0, "ERROR: rgw_cls_lc_get_head(): failed to decode entry %s\n",err.what());
+      return -EINVAL;
+    }
+  }
+  cls_rgw_lc_get_head_ret op_ret;
+  op_ret.head = head;
+  ::encode(op_ret, *out);
+  return 0;
+}
+
 void __cls_init()
 {
   CLS_LOG(1, "Loaded rgw class!");
@@ -3096,6 +3238,14 @@ void __cls_init()
   cls_register_cxx_method(h_class, "gc_list", CLS_METHOD_RD, rgw_cls_gc_list, &h_rgw_gc_list);
   cls_register_cxx_method(h_class, "gc_remove", CLS_METHOD_RD | CLS_METHOD_WR, rgw_cls_gc_remove, &h_rgw_gc_remove);
 
+  /* lifecycle bucket list */
+  cls_register_cxx_method(h_class, "lc_set_entry", CLS_METHOD_RD | CLS_METHOD_WR, rgw_cls_lc_set_entry, &h_rgw_lc_set_entry);
+  cls_register_cxx_method(h_class, "lc_rm_entry", CLS_METHOD_RD | CLS_METHOD_WR, rgw_cls_lc_rm_entry, &h_rgw_lc_rm_entry);
+  cls_register_cxx_method(h_class, "lc_get_entry", CLS_METHOD_RD, rgw_cls_lc_get_entry, &h_rgw_lc_get_entry);
+  cls_register_cxx_method(h_class, "lc_put_head", CLS_METHOD_RD| CLS_METHOD_WR, rgw_cls_lc_put_head, &h_rgw_lc_put_head);
+  cls_register_cxx_method(h_class, "lc_get_head", CLS_METHOD_RD, rgw_cls_lc_get_head, &h_rgw_lc_get_head);
+  cls_register_cxx_method(h_class, "lc_list_entry", CLS_METHOD_RD, rgw_cls_lc_list_entry, &h_rgw_lc_list_entry);
+
   return;
 }
 
index e6ac56b822c3601ba731fdbaa7e50982ec9632e6..8fcbe4db262047059925317f31c3235bd8453592 100644 (file)
@@ -627,3 +627,93 @@ void cls_rgw_gc_remove(librados::ObjectWriteOperation& op, const list<string>& t
   ::encode(call, in);
   op.exec("rgw", "gc_remove", in);
 }
+
+int cls_rgw_lc_get_head(IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head)
+{
+  bufferlist in, out;
+  int r = io_ctx.exec(oid, "rgw", "lc_get_head", in, out);
+  if (r < 0)
+    return r;
+
+  cls_rgw_lc_get_head_ret ret;
+  try {
+    bufferlist::iterator iter = out.begin();
+    ::decode(ret, iter);
+  } catch (buffer::error& err) {
+    return -EIO;
+  }
+  head = ret.head;
+
+ return r;
+}
+
+int cls_rgw_lc_put_head(IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head)
+{
+  bufferlist in, out;
+  cls_rgw_lc_put_head_op call;
+  call.head = head;
+  ::encode(call, in);
+  int r = io_ctx.exec(oid, "rgw", "lc_put_head", in, out);
+  return r;
+}
+
+int cls_rgw_lc_get_entry(IoCtx& io_ctx, string& oid, string& marker, pair<string, int>& entry)
+{
+  bufferlist in, out;
+  cls_rgw_lc_get_entry_op call;
+  call.marker = marker;
+  ::encode(call, in);
+  int r = io_ctx.exec(oid, "rgw", "lc_get_entry", in, out);
+  if (r < 0)
+    return r;
+
+  cls_rgw_lc_get_entry_ret ret;
+  try {
+    bufferlist::iterator iter = out.begin();
+    ::decode(ret, iter);
+  } catch (buffer::error& err) {
+    return -EIO;
+  }
+  entry = ret.entry;
+
+ return r;
+}
+
+int cls_rgw_lc_rm_entry(IoCtx& io_ctx, string& oid, pair<string, int>& entry)
+{
+  bufferlist in, out;
+  cls_rgw_lc_rm_entry_op call;
+  call.entry = entry;
+  ::encode(call, in);
+  int r = io_ctx.exec(oid, "rgw", "lc_rm_entry", in, out);
+ return r;
+}
+
+int cls_rgw_lc_set_entry(IoCtx& io_ctx, string& oid, pair<string, int>& entry)
+{
+   bufferlist in, out;
+   cls_rgw_lc_rm_entry_op call;
+   call.entry = entry;
+   ::encode(call, in);
+   int r = io_ctx.exec(oid, "rgw", "lc_set_entry", in, out);
+  return r;
+}
+
+int cls_rgw_lc_list(IoCtx& io_ctx, string& oid, map<string, int>& entries)
+{
+  bufferlist in, out;
+  int r = io_ctx.exec(oid, "rgw", "lc_list_entry", in, out);
+  if (r < 0)
+    return r;
+
+  cls_rgw_lc_list_entry_ret ret;
+  try {
+    bufferlist::iterator iter = out.begin();
+    ::decode(ret, iter);
+  } catch (buffer::error& err) {
+    return -EIO;
+  }
+  entries.insert(ret.entries.begin(),ret.entries.end());
+
+ return r;
+}
index 37c856fa07f636ae06b603b4be3977531a4ba159..f5a8833acd75fb3a0b1c699311962361805cab74 100644 (file)
@@ -471,4 +471,17 @@ int cls_rgw_gc_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32
 
 void cls_rgw_gc_remove(librados::ObjectWriteOperation& op, const list<string>& tags);
 
+/* lifecycle */
+int cls_rgw_lc_get_head(librados::IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head);
+int cls_rgw_lc_put_head(librados::IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head);
+int cls_rgw_lc_get_entry(librados::IoCtx& io_ctx, string& oid, string& marker, pair<string, int>& entry);
+int cls_rgw_lc_rm_entry(librados::IoCtx& io_ctx, string& oid, pair<string, int>& entry);
+int cls_rgw_lc_set_entry(librados::IoCtx& io_ctx, string& oid, pair<string, int>& entry);
+int cls_rgw_lc_list(librados::IoCtx& io_ctx, string& oid, map<string, int>& entries);
+
+
+
+
+
+
 #endif
index 0a0686fbccb16cb92082540ed7021a4ad1242532..ce8f6f1053d670070c30566fb686c7e0fbaa81f1 100644 (file)
@@ -861,5 +861,139 @@ struct cls_rgw_bi_log_list_ret {
 };
 WRITE_CLASS_ENCODER(cls_rgw_bi_log_list_ret)
 
+struct cls_rgw_lc_get_entry_op {
+  string marker;
+  cls_rgw_lc_get_entry_op() {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(marker, bl);   
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(marker, bl);   
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_get_entry_op)
+
+struct cls_rgw_lc_get_entry_ret {
+  pair<string, int> entry;
+
+  cls_rgw_lc_get_entry_ret() {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(entry, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(entry, bl);
+    DECODE_FINISH(bl);
+  }
+
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_get_entry_ret)
+
+struct cls_rgw_lc_rm_entry_op {
+  pair<string, int> entry;
+  cls_rgw_lc_rm_entry_op() {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(entry, bl);   
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(entry, bl);   
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_rm_entry_op)
+
+struct cls_rgw_lc_set_entry_op {
+  pair<string, int> entry;
+  cls_rgw_lc_set_entry_op() {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(entry, bl);   
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(entry, bl);   
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_set_entry_op)
+
+struct cls_rgw_lc_put_head_op {
+  cls_rgw_lc_obj_head head;
+
+
+  cls_rgw_lc_put_head_op() {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(head, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(head, bl);
+    DECODE_FINISH(bl);
+  }
+  
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_put_head_op)
+
+struct cls_rgw_lc_get_head_ret {
+  cls_rgw_lc_obj_head head;
+
+  cls_rgw_lc_get_head_ret() {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(head, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(head, bl);
+    DECODE_FINISH(bl);
+  }
+  
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_get_head_ret)
+
+struct cls_rgw_lc_list_entry_ret {
+  map<string, int> entries;
+
+  cls_rgw_lc_list_entry_ret() {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(entries, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(entries, bl);
+    DECODE_FINISH(bl);
+  }
+  
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_list_entry_ret)
 
 #endif
index dfa9286c0008e2e970d68b498b0537e429b1d63f..812cc38cb09b8986ec60c3cd57c4003f60dc7653 100644 (file)
@@ -891,4 +891,28 @@ struct cls_rgw_gc_obj_info
 };
 WRITE_CLASS_ENCODER(cls_rgw_gc_obj_info)
 
+struct cls_rgw_lc_obj_head
+{ 
+  time_t start_date;
+  string marker;
+
+  cls_rgw_lc_obj_head() {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(start_date, bl);
+    ::encode(marker, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(start_date, bl);
+    ::decode(marker, bl);
+    DECODE_FINISH(bl);
+  }
+  
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_obj_head)
+
 #endif
index 1a7fe61ef8b697405ca403a67c09f80b3de742ea..2a72f5b5d890c7ca76970fc8f5bfb825265f7c5f 100644 (file)
@@ -1059,6 +1059,8 @@ OPTION(rgw_bucket_index_max_aio, OPT_U32, 8)
  */
 OPTION(rgw_enable_quota_threads, OPT_BOOL, true)
 OPTION(rgw_enable_gc_threads, OPT_BOOL, true)
+OPTION(rgw_enable_lc_threads, OPT_BOOL, true)
+
 
 OPTION(rgw_data, OPT_STR, "/var/lib/ceph/radosgw/$cluster-$id")
 OPTION(rgw_enable_apis, OPT_STR, "s3, swift, swift_auth, admin")
@@ -1069,6 +1071,11 @@ OPTION(rgw_host, OPT_STR, "")  // host for radosgw, can be an IP, default is 0.0
 OPTION(rgw_port, OPT_STR, "")  // port to listen, format as "8080" "5000", if not specified, rgw will not run external fcgi
 OPTION(rgw_dns_name, OPT_STR, "")
 OPTION(rgw_content_length_compat, OPT_BOOL, false) // Check both HTTP_CONTENT_LENGTH and CONTENT_LENGTH in fcgi env
+OPTION(rgw_lifecycle_enabled, OPT_BOOL, true) //rgw lifecycle enabled
+OPTION(rgw_lifecycle_thread, OPT_INT, 1) //start lifecycle thread number per radosgw
+OPTION(rgw_lifecycle_work_time, OPT_STR, "00:00-06:00") //job process lc  at 00:00-06:00s
+OPTION(rgw_lc_lock_max_time, OPT_INT, 60)  // total run time for a single gc processor work
+OPTION(rgw_lc_max_objs, OPT_INT, 32)
 OPTION(rgw_script_uri, OPT_STR, "") // alternative value for SCRIPT_URI if not set in request
 OPTION(rgw_request_uri, OPT_STR,  "") // alternative value for REQUEST_URI if not set in request
 OPTION(rgw_swift_url, OPT_STR, "")             // the swift url, being published by the internal swift auth
index 98cd4b00484fa20cdb2fbf114025d0800d7d8074..c3f835b80a582e51ba8266ac7752d071cd43ae29 100644 (file)
@@ -17,6 +17,8 @@ librgw_la_SOURCES =  \
        rgw/rgw_acl.cc \
        rgw/rgw_acl_s3.cc \
        rgw/rgw_acl_swift.cc \
+       rgw/rgw_lc.cc \
+       rgw/rgw_lc_s3.cc \
        rgw/rgw_client_io.cc \
        rgw/rgw_fcgi.cc \
        rgw/rgw_xml.cc \
@@ -128,6 +130,8 @@ noinst_HEADERS += \
        rgw/rgw_acl.h \
        rgw/rgw_acl_s3.h \
        rgw/rgw_acl_swift.h \
+       rgw/rgw_lc.h \
+       rgw/rgw_lc_s3.h \
        rgw/rgw_client_io.h \
        rgw/rgw_fcgi.h \
        rgw/rgw_xml.h \
index 4eee6edb027214015ab5b7807d30d9d8a37502ab..b904bdc029304f285aa06a90cf7943006c789050 100644 (file)
@@ -27,6 +27,7 @@ using namespace std;
 #include "rgw_rados.h"
 #include "rgw_acl.h"
 #include "rgw_acl_s3.h"
+#include "rgw_lc.h"
 #include "rgw_log.h"
 #include "rgw_formats.h"
 #include "rgw_usage.h"
@@ -95,6 +96,8 @@ void _usage()
   cout << "  gc list                    dump expired garbage collection objects (specify\n";
   cout << "                             --include-all to list all entries, including unexpired)\n";
   cout << "  gc process                 manually process garbage\n";
+  cout << "  lc list                    list all bucket lifecycle progress\n";
+  cout << "  lc process                 manually process lifecycle\n";
   cout << "  metadata get               get metadata info\n";
   cout << "  metadata put               put metadata info\n";
   cout << "  metadata rm                remove metadata info\n";
@@ -239,6 +242,8 @@ enum {
   OPT_QUOTA_DISABLE,
   OPT_GC_LIST,
   OPT_GC_PROCESS,
+  OPT_LC_LIST,
+  OPT_LC_PROCESS,
   OPT_ORPHANS_FIND,
   OPT_ORPHANS_FINISH,
   OPT_REGION_GET,
@@ -285,6 +290,7 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more)
       strcmp(cmd, "gc") == 0 || 
       strcmp(cmd, "key") == 0 ||
       strcmp(cmd, "log") == 0 ||
+      strcmp(cmd, "lc") == 0 ||
       strcmp(cmd, "mdlog") == 0 ||
       strcmp(cmd, "metadata") == 0 ||
       strcmp(cmd, "object") == 0 ||
@@ -455,6 +461,11 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more)
       return OPT_GC_LIST;
     if (strcmp(cmd, "process") == 0)
       return OPT_GC_PROCESS;
+  } else if (strcmp(prev_cmd, "lc") == 0) {
+    if (strcmp(cmd, "list") == 0)
+      return OPT_LC_LIST;
+    if (strcmp(cmd, "process") == 0)
+      return OPT_LC_PROCESS;
   } else if (strcmp(prev_cmd, "orphans") == 0) {
     if (strcmp(cmd, "find") == 0)
       return OPT_ORPHANS_FIND;
@@ -1477,7 +1488,7 @@ int main(int argc, char **argv)
   if (raw_storage_op) {
     store = RGWStoreManager::get_raw_storage(g_ceph_context);
   } else {
-    store = RGWStoreManager::get_storage(g_ceph_context, false, false);
+    store = RGWStoreManager::get_storage(g_ceph_context, false, false, false);
   }
   if (!store) {
     cerr << "couldn't init storage provider" << std::endl;
@@ -2655,6 +2666,33 @@ next:
     }
   }
 
+  if (opt_cmd == OPT_LC_LIST) {     
+    formatter->open_array_section("life cycle progress");
+    map<string, int> bucket_lc_map;
+    int ret = store->list_lc_progress(bucket_lc_map);
+    if (ret < 0) {
+      cerr << "ERROR: failed to list objs: " << cpp_strerror(-ret) << std::endl;
+      return 1;
+    }
+    map<string, int>::iterator iter;
+    for (iter = bucket_lc_map.begin(); iter != bucket_lc_map.end(); ++iter) {
+      formatter->open_object_section("bucket_lc_info");
+      formatter->dump_string("bucket", iter->first);
+      string lc_status = LC_STATUS[iter->second];
+      formatter->dump_string("status", lc_status);
+      formatter->close_section(); // objs          
+      formatter->flush(cout);
+    }
+  }
+  
+  if (opt_cmd == OPT_LC_PROCESS) {
+    int ret = store->process_lc();
+    if (ret < 0) {
+      cerr << "ERROR: lc processing returned error: " << cpp_strerror(-ret) << std::endl;
+      return 1;
+    }
+  }
+
   if (opt_cmd == OPT_ORPHANS_FIND) {
     RGWOrphanSearch search(store, max_concurrent_ios, orphan_stale_secs);
 
index bb8ecf384e0ed592c5bce1f6eb3618b3b797c2f5..6a050e9857ed25830bc55baa9d3e2feb40d32c1f 100644 (file)
@@ -602,6 +602,7 @@ int RGWHTTPArgs::parse()
       }
 
       if ((name.compare("acl") == 0) ||
+          (name.compare("lifecycle") == 0) ||
           (name.compare("cors") == 0) ||
           (name.compare("location") == 0) ||
           (name.compare("logging") == 0) ||
index bff070a06ae4c62208f5fbaf528270450ce33e1d..e3ddf4e4af1a5b56f94f0556fb2bebd06b178ba9 100644 (file)
@@ -56,6 +56,7 @@ using ceph::crypto::MD5;
 #define RGW_SYS_PARAM_PREFIX "rgwx-"
 
 #define RGW_ATTR_ACL           RGW_ATTR_PREFIX "acl"
+#define RGW_ATTR_LC            RGW_ATTR_PREFIX "lc"
 #define RGW_ATTR_CORS          RGW_ATTR_PREFIX "cors"
 #define RGW_ATTR_ETAG          RGW_ATTR_PREFIX "etag"
 #define RGW_ATTR_BUCKETS       RGW_ATTR_PREFIX "buckets"
diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc
new file mode 100644 (file)
index 0000000..99d79a9
--- /dev/null
@@ -0,0 +1,535 @@
+#include <string.h>\r
+#include <iostream>\r
+#include <map>\r
+\r
+#include "include/types.h"\r
+\r
+#include "common/Formatter.h"\r
+#include <common/errno.h>\r
+#include "auth/Crypto.h"\r
+#include "include/rados/librados.hpp"\r
+#include "cls/rgw/cls_rgw_client.h"\r
+#include "cls/refcount/cls_refcount_client.h"\r
+#include "cls/lock/cls_lock_client.h"\r
+#include <common/dout.h>\r
+#include "rgw_common.h"\r
+#include "rgw_bucket.h"\r
+#include "rgw_lc.h"\r
+#include "rgw_lc_s3.h"\r
+\r
+\r
+\r
+#define dout_subsys ceph_subsys_rgw\r
+\r
+const char* LC_STATUS[] = {\r
+      "UNINITIAL",\r
+      "PROCESSING",\r
+      "FAILED",\r
+      "COMPLETE"\r
+};\r
+\r
+using namespace std;\r
+using namespace librados;\r
+void RGWLifecycleConfiguration::add_rule(LCRule *rule)\r
+{\r
+  string id;\r
+  rule->get_id(id); // not that this will return false for groups, but that's ok, we won't search groups\r
+  rule_map.insert(pair<string, LCRule>(id, *rule));\r
+  _add_rule(rule);\r
+}\r
+\r
+void RGWLifecycleConfiguration::_add_rule(LCRule *rule)\r
+{\r
+  string prefix;\r
+  LCExpiration expiration;\r
+  int days;\r
+  if (!rule->get_prefix(prefix)) {\r
+    ldout(cct, 5) << "ERROR: rule->get_prefix() failed" << dendl;\r
+  }\r
+  if (!rule->get_expiration(expiration)) {\r
+    ldout(cct, 5) << "ERROR: rule->get_expiration() failed" << dendl;\r
+  }\r
+  if (!expiration.get_days(&days)) {\r
+    ldout(cct, 5) << "ERROR: expiration->get_days() failed" << dendl;\r
+  }\r
+  prefix_map[prefix] = days;\r
+}\r
+\r
+void *RGWLC::LCWorker::entry() {\r
+  do {\r
+    utime_t start = ceph_clock_now(cct);\r
+    if (should_work(start)) {\r
+      dout(5) << "life cycle: start" << dendl;\r
+      int r = lc->process();\r
+      if (r < 0) {\r
+        dout(0) << "ERROR: do life cycle process() returned error r=" << r << dendl;\r
+      }\r
+      dout(5) << "life cycle: stop" << dendl;\r
+    }\r
+    if (lc->going_down())\r
+      break;\r
+\r
+    utime_t end = ceph_clock_now(cct);   \r
+    int secs = shedule_next_start_time(end);\r
+    time_t next_time = end + secs;\r
+    char buf[30];\r
+    char *nt = ctime_r(&next_time, buf);\r
+    dout(5) << "shedule life cycle next start time: " << nt <<dendl;\r
+\r
+    lock.Lock();\r
+    cond.WaitInterval(cct, lock, utime_t(secs, 0));\r
+    lock.Unlock();\r
+  } while (!lc->going_down());\r
+\r
+  return NULL;\r
+}\r
+\r
+void RGWLC::initialize(CephContext *_cct, RGWRados *_store) {\r
+  cct = _cct;\r
+  store = _store;\r
+  max_objs = cct->_conf->rgw_lc_max_objs;\r
+  if (max_objs > HASH_PRIME)
+    max_objs = HASH_PRIME;
+
+  obj_names = new string[max_objs];
+
+  for (int i = 0; i < max_objs; i++) {
+    obj_names[i] = lc_oid_prefix;\r
+    char buf[32];
+    snprintf(buf, 32, ".%d", i);
+    obj_names[i].append(buf);
+  }\r
+}\r
+\r
+void RGWLC::finalize()\r
+{\r
+  delete[] obj_names;\r
+}\r
+\r
+bool RGWLC::if_already_run_today(time_t& start_date)\r
+{\r
+  struct tm bdt;\r
+  time_t begin_of_day;\r
+  utime_t now = ceph_clock_now(cct);\r
+  localtime_r(&start_date, &bdt);\r
+  bdt.tm_hour = 0;\r
+  bdt.tm_min = 0;\r
+  bdt.tm_sec = 0;\r
+  begin_of_day = mktime(&bdt);\r
+  if (now - begin_of_day < 24*60*60)\r
+    return true;\r
+  else   \r
+    return false;\r
+}\r
+\r
+static std::vector<std::string> &split(const std::string &s, char delim, std::vector<std::string> &elems) {\r
+  std::stringstream ss(s);\r
+  std::string item;\r
+  while (std::getline(ss, item, delim)) {\r
+      elems.push_back(item);\r
+  }\r
+  return elems;\r
+}\r
+\r
+static std::vector<std::string> split(const std::string &s, char delim) {\r
+  std::vector<std::string> elems;\r
+  split(s, delim, elems);\r
+  return elems;\r
+}\r
+\r
+int RGWLC::bucket_lc_prepare(int index)\r
+{\r
+  map<string, int > entries;\r
+  int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], entries);\r
+  if (ret < 0)\r
+      return ret;\r
+  map<string, int>::iterator iter;\r
+  for (iter = entries.begin(); iter != entries.end(); ++iter) {\r
+    pair<string, int > entry(iter->first, lc_uninitial);\r
+    ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index],  entry);\r
+    if (ret < 0) {\r
+    dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names[index] << dendl;\r
+    break;\r
+    }\r
+  }\r
+  return ret;\r
+}\r
+int RGWLC::bucket_lc_process(string& shard_id)\r
+{\r
+  RGWLifecycleConfiguration  config(cct);\r
+  RGWBucketInfo bucket_info;\r
+  map<string, bufferlist> bucket_attrs;\r
+  string prefix, delimiter, marker, next_marker, no_ns, end_marker, list_versions;\r
+  bool is_truncated;\r
+  bool default_config = false;\r
+  int default_days = 0;\r
+  vector<RGWObjEnt> objs; \r
+  RGWObjectCtx obj_ctx(store);\r
+  map<string, bool> common_prefixes;\r
+  vector<std::string> result;\r
+  result = split(shard_id, ':');\r
+  string bucket_tenant = result[0];\r
+  string bucket_name = result[1];\r
+  string bucket_id = result[2];\r
+  int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs);\r
+  if (ret < 0) {\r
+    ldout(cct, 0) << "LC:get_bucket_info failed" << bucket_name <<dendl;\r
+    return ret;\r
+  }\r
+\r
+  ret = bucket_info.bucket.bucket_id.compare(bucket_id) ;\r
+  if (ret !=0) {\r
+    ldout(cct, 0) << "LC:old bucket id find, should be delete" << bucket_name <<dendl;\r
+    return -ENOENT;\r
+  }\r
+  \r
+  RGWRados::Bucket target(store, bucket_info.bucket);\r
+  RGWRados::Bucket::List list_op(&target);\r
+\r
+  list_op.params.prefix = prefix;\r
+  list_op.params.delim = delimiter;\r
+  list_op.params.marker = marker;\r
+  list_op.params.end_marker = end_marker;\r
+  list_op.params.list_versions = false;\r
+   \r
+  map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);\r
+  if (aiter == bucket_attrs.end())\r
+    return 0;\r
+  \r
+  bufferlist::iterator iter(&aiter->second);\r
+  try {\r
+      config.decode(iter);\r
+    } catch (const buffer::error& e) {\r
+      ldout(cct, 0) << __func__ <<  "decode life cycle config failed" << dendl;\r
+      return -1;\r
+    }\r
+\r
+  map<string, int>& prefix_map = config.get_prefix_map();\r
+  for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();  prefix_iter++) {\r
+    if (prefix_iter->first.empty()) {\r
+      default_config = true;\r
+      default_days = prefix_iter->second;\r
+      continue;\r
+    }\r
+  }\r
+  if (default_config) { \r
+    do {\r
+      \r
+      objs.clear();\r
+      list_op.params.marker = list_op.get_next_marker();\r
+      ret = list_op.list_objects(1000, &objs, &common_prefixes, &is_truncated);    \r
+      if (ret < 0) {\r
+        if (ret == -ENOENT)\r
+          return 0;\r
+        ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;\r
+        return ret;\r
+      }\r
+\r
+      vector<RGWObjEnt>::iterator obj_iter;\r
+      int pos = 0;\r
+      utime_t now = ceph_clock_now(cct);\r
+      for (obj_iter = objs.begin(); obj_iter != objs.end(); obj_iter++) {\r
+        bool prefix_match = false;\r
+        int match_days = 0;        \r
+        map<string, int>& prefix_map = config.get_prefix_map();\r
+        \r
+        for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();  prefix_iter++) {\r
+          if (prefix_iter->first.empty()) {\r
+            continue;\r
+          }\r
+          pos = (*obj_iter).key.name.find(prefix_iter->first, 0);\r
+          if (pos != 0) {\r
+            continue;\r
+          }\r
+          prefix_match = true;\r
+          match_days = prefix_iter->second;\r
+          break;\r
+        }\r
+        int days = 0;      \r
+        if (prefix_match) {\r
+          days = match_days;\r
+        } else if (default_config) {\r
+          days = default_days;\r
+        } else {\r
+          continue;\r
+        }        \r
+        if (now - (*obj_iter).mtime >= days*24*60*60) {\r
+          RGWObjectCtx rctx(store);\r
+          rgw_obj obj(bucket_info.bucket, (*obj_iter).key.name);\r
+          RGWObjState *state;\r
+          int ret = store->get_obj_state(&rctx, obj, &state, NULL, false);\r
+          if (ret < 0) {\r
+            return ret;\r
+          }\r
+          if (state->mtime != (*obj_iter).mtime) //Check mtime again to avoid delete a recently update object as much as possible\r
+            continue;\r
+          ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, (*obj_iter).key);\r
+          if (ret < 0) {\r
+            ldout(cct, 0) << "ERROR: rgw_remove_object " << dendl;\r
+          } else {\r
+            ldout(cct, 10) << "DELETED:" << bucket_name << ":" << (*obj_iter).key.name <<dendl;\r
+          }\r
+        }\r
+      }    \r
+    }while (is_truncated);\r
+  }else {\r
+    for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();  prefix_iter++) {\r
+      if (prefix_iter->first.empty()) {\r
+        continue;\r
+      }\r
+      list_op.params.prefix = prefix_iter->first;\r
+      \r
+      do {\r
+        \r
+        objs.clear();\r
+        list_op.params.marker = list_op.get_next_marker();          \r
+        ret = list_op.list_objects(1000, &objs, &common_prefixes, &is_truncated); \r
+        \r
+        if (ret < 0) {\r
+          if (ret == (-ENOENT))\r
+            return 0;\r
+          ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;\r
+          return ret;\r
+        }\r
+        \r
+        vector<RGWObjEnt>::iterator obj_iter;\r
+        int days = prefix_iter->second;           \r
+        utime_t now = ceph_clock_now(cct);\r
+        \r
+        for (obj_iter = objs.begin(); obj_iter != objs.end(); obj_iter++) {                  \r
+          if (now - (*obj_iter).mtime >= days*24*60*60) {\r
+            RGWObjectCtx rctx(store);\r
+            rgw_obj obj(bucket_info.bucket, (*obj_iter).key.name);\r
+            RGWObjState *state;\r
+            int ret = store->get_obj_state(&rctx, obj, &state, NULL, false); \r
+            if (ret < 0) {\r
+              return ret;\r
+            }\r
+            if (state->mtime != (*obj_iter).mtime)//Check mtime again to avoid delete a recently update object as much as possible\r
+              continue;\r
+            ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, (*obj_iter).key);\r
+            if (ret < 0) {\r
+              ldout(cct, 0) << "ERROR: rgw_remove_object " << dendl;\r
+            } else {\r
+              ldout(cct, 10) << "DELETED:" << bucket_name << ":" << (*obj_iter).key.name << dendl;\r
+            }\r
+          }\r
+        }\r
+      } while (is_truncated);\r
+    }     \r
+  }\r
+\r
+  return ret;\r
+}\r
+\r
+int RGWLC::bucket_lc_post(int index, int max_lock_sec, cls_rgw_lc_obj_head& head, \r
+                                                              pair<string, int >& entry, int& result)\r
+{\r
+  rados::cls::lock::Lock l(lc_index_lock_name);\r
+  do {\r
+    int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);\r
+    if (ret == -EBUSY) { /* already locked by another lc processor */\r
+      dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;\r
+      sleep(10);\r
+      continue;\r
+    }\r
+    if (ret < 0)\r
+      return 0;\r
+    dout(20) << "RGWLC::bucket_lc_post()  get lock" << obj_names[index] << dendl;\r
+    if (result ==  -ENOENT) {\r
+      ret = cls_rgw_lc_rm_entry(store->lc_pool_ctx, obj_names[index],  entry);\r
+      if (ret < 0) {\r
+        dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names[index] << dendl;\r
+        goto clean;\r
+      }\r
+    } else if (result < 0) {\r
+      entry.second = lc_failed;      \r
+    } else {\r
+      entry.second = lc_complete;   \r
+    }\r
+    \r
+    ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index],  entry);\r
+    if (ret < 0) {\r
+      dout(0) << "RGWLC::process() failed to set entry " << obj_names[index] << dendl;\r
+    }\r
+clean:    \r
+    l.unlock(&store->lc_pool_ctx, obj_names[index]);\r
+    dout(20) << "RGWLC::bucket_lc_post()  unlock" << obj_names[index] << dendl;\r
+    return 0;\r
+  }while(1);\r
+}\r
+\r
+int RGWLC::list_lc_progress(map<string, int>& progress_map)\r
+{\r
+  int index = 0;\r
+  for(; index <max_objs; index++) {\r
+    map<string, int > entries;\r
+    int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], entries);\r
+    if (ret < 0)
+      return ret;\r
+    map<string, int>::iterator iter;\r
+    for (iter = entries.begin(); iter != entries.end(); ++iter) {
+      progress_map.insert(*iter);\r
+    }\r
+  }\r
+  return 0;\r
+}\r
+\r
+int RGWLC::process()\r
+{\r
+  int max_secs = cct->_conf->rgw_lc_lock_max_time;\r
+\r
+  unsigned start;\r
+  int ret = get_random_bytes((char *)&start, sizeof(start));\r
+  if (ret < 0)\r
+    return ret;\r
+\r
+  for (int i = 0; i < max_objs; i++) {\r
+    int index = (i + start) % max_objs;\r
+    ret = process(index, max_secs);\r
+    if (ret < 0)\r
+      return ret;\r
+  }\r
+\r
+  return 0;\r
+}\r
+\r
+int RGWLC::process(int index, int max_lock_secs)\r
+{\r
+  rados::cls::lock::Lock l(lc_index_lock_name);\r
+  do {\r
+    utime_t now = ceph_clock_now(g_ceph_context);\r
+    pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS\r
+    if (max_lock_secs <= 0)\r
+      return -EAGAIN;\r
+      \r
+    utime_t time(max_lock_secs, 0);\r
+    l.set_duration(time);\r
+\r
+    int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);\r
+    if (ret == -EBUSY) { /* already locked by another lc processor */\r
+      dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;\r
+      sleep(10);\r
+      continue;\r
+    }\r
+    if (ret < 0)\r
+      return 0;\r
+\r
+    string marker;\r
+    cls_rgw_lc_obj_head head;\r
+    ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head);\r
+    if (ret < 0) {\r
+      dout(0) << "RGWLC::process() failed to get obj head " << obj_names[index] << ret << dendl;\r
+      goto exit;\r
+    }\r
+\r
+    if(!if_already_run_today(head.start_date)) {\r
+      head.start_date = now;\r
+      head.marker.clear();\r
+      ret = bucket_lc_prepare(index);\r
+      if (ret < 0) {\r
+      dout(0) << "RGWLC::process() failed to update lc object " << obj_names[index] << ret << dendl;\r
+      goto exit;\r
+      }\r
+    }\r
+\r
+    ret = cls_rgw_lc_get_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry);\r
+    if (ret < 0) {\r
+      dout(0) << "RGWLC::process() failed to get obj entry " << obj_names[index] << dendl;\r
+      goto exit;\r
+    }\r
+\r
+    if (entry.first.empty())\r
+      goto exit;\r
+\r
+    entry.second = lc_processing;\r
+    ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index],  entry);\r
+    if (ret < 0) {\r
+      dout(0) << "RGWLC::process() failed to set obj entry " << obj_names[index] << entry.first << entry.second << dendl;\r
+      goto exit;\r
+    }\r
+\r
+    head.marker = entry.first;\r
+    ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index],  head);\r
+    if (ret < 0) {\r
+      dout(0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;\r
+      goto exit;\r
+    }\r
+    l.unlock(&store->lc_pool_ctx, obj_names[index]);\r
+    ret = bucket_lc_process(entry.first);\r
+    ret = bucket_lc_post(index, max_lock_secs, head, entry, ret);   \r
+    continue;\r
+exit:    \r
+    l.unlock(&store->lc_pool_ctx, obj_names[index]);\r
+    return 0;\r
+    \r
+  }while(1);\r
+      \r
+}\r
+\r
+void RGWLC::start_processor()\r
+{\r
+  worker = new LCWorker(cct, this);\r
+  worker->create();\r
+}\r
+\r
+void RGWLC::stop_processor()\r
+{\r
+  if (worker) {\r
+    worker->stop();\r
+    worker->join();\r
+  }\r
+  delete worker;\r
+  worker = NULL;\r
+}\r
+\r
+void RGWLC::LCWorker::stop()\r
+{\r
+  Mutex::Locker l(lock);\r
+  cond.Signal();\r
+}\r
+\r
+bool RGWLC::going_down()\r
+{\r
+  return false;\r
+}\r
+\r
+bool RGWLC::LCWorker::should_work(utime_t& now)\r
+{\r
+  int start_hour;\r
+  int start_minite;\r
+  int end_hour;\r
+  int end_minite;\r
+  string worktime = cct->_conf->rgw_lifecycle_work_time;\r
+  sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minite, &end_hour, &end_minite);\r
+  struct tm bdt;\r
+  time_t tt = now.sec();\r
+  localtime_r(&tt, &bdt);\r
+  if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minite)||\r
+      (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minite)) {\r
+    return true;\r
+  } else {\r
+    return false;\r
+  }\r
+\r
+}\r
+\r
+int RGWLC::LCWorker::shedule_next_start_time(utime_t& now)\r
+{\r
+  int start_hour;\r
+  int start_minite;\r
+  int end_hour;\r
+  int end_minite;\r
+  string worktime = cct->_conf->rgw_lifecycle_work_time;\r
+  sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minite, &end_hour, &end_minite);\r
+  struct tm bdt;\r
+  time_t tt = now.sec();\r
+  time_t nt;\r
+  localtime_r(&tt, &bdt);\r
+  bdt.tm_hour = start_hour;\r
+  bdt.tm_min = start_minite;\r
+  bdt.tm_sec = 0;\r
+  nt = mktime(&bdt);\r
+  return (nt+24*60*60 - tt);\r
+}\r
+\r
diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h
new file mode 100644 (file)
index 0000000..7d468b1
--- /dev/null
@@ -0,0 +1,227 @@
+#ifndef CEPH_RGW_LC_H\r
+#define CEPH_RGW_LC_H\r
+\r
+#include <map>\r
+#include <string>\r
+#include <iostream>\r
+#include <include/types.h>\r
+\r
+#include "common/debug.h"\r
+\r
+#include "include/types.h"\r
+#include "include/atomic.h"\r
+#include "include/rados/librados.hpp"\r
+#include "common/Mutex.h"\r
+#include "common/Cond.h"\r
+#include "common/Thread.h"\r
+#include "rgw_common.h"\r
+#include "rgw_rados.h"\r
+#include "cls/rgw/cls_rgw_types.h"\r
+\r
+using namespace std;\r
+#define HASH_PRIME 7877\r
+static string lc_oid_prefix = "lc";\r
+static string lc_index_lock_name = "lc_process";\r
+\r
+extern const char* LC_STATUS[];\r
+\r
+typedef enum {\r
+  lc_uninitial = 0,\r
+  lc_processing,\r
+  lc_failed,\r
+  lc_complete,\r
+}LC_BUCKET_STATUS;\r
+\r
+class LCExpiration\r
+{\r
+protected:\r
+  string days;\r
+public:\r
+  LCExpiration() {}\r
+  ~LCExpiration() {}\r
+\r
+  void encode(bufferlist& bl) const {\r
+    ENCODE_START(2, 2, bl);\r
+    ::encode(days, bl);\r
+    ENCODE_FINISH(bl);\r
+  }\r
+  void decode(bufferlist::iterator& bl) {\r
+    DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);\r
+    ::decode(days, bl);\r
+    DECODE_FINISH(bl);\r
+  }\r
+  void dump(Formatter *f) const;\r
+//  static void generate_test_instances(list<ACLOwner*>& o);\r
+  void set_days(const string& _days) { days = _days; }\r
+  bool get_days(int* _days) {*_days = atoi(days.c_str()); return true; }\r
+};\r
+WRITE_CLASS_ENCODER(LCExpiration)\r
+\r
+class LCRule\r
+{\r
+protected:\r
+  string id;\r
+  string prefix;\r
+  string status;\r
+  LCExpiration expiration;\r
+\r
+public:\r
+\r
+  LCRule(){};\r
+  ~LCRule(){};\r
+\r
+  bool get_id(string& _id) {\r
+      _id = id;\r
+      return true;\r
+  }\r
+\r
+  bool get_status(string& _status) {\r
+      _status = status;\r
+      return true;\r
+  }\r
+  \r
+  bool get_prefix(string& _prefix) {\r
+      _prefix = prefix;\r
+      return true;\r
+  }\r
+\r
+  bool get_expiration(LCExpiration& _expriation) {\r
+    _expriation = expiration;\r
+    return true;\r
+  }\r
+\r
+  void set_id(string*_id) {\r
+    id = *_id;\r
+  }\r
+\r
+  void set_prefix(string*_prefix) {\r
+    prefix = *_prefix;\r
+  }\r
+\r
+  void set_status(string*_status) {\r
+    status = *_status;\r
+  }\r
+\r
+  void set_expiration(LCExpiration*_expiration) {\r
+    expiration = *_expiration;\r
+  }\r
+  \r
+  void encode(bufferlist& bl) const {\r
+     ENCODE_START(1, 1, bl);\r
+     ::encode(id, bl);\r
+     ::encode(prefix, bl);\r
+     ::encode(status, bl);\r
+     ::encode(expiration, bl);\r
+     ENCODE_FINISH(bl);\r
+   }\r
+   void decode(bufferlist::iterator& bl) {\r
+     DECODE_START_LEGACY_COMPAT_LEN(1, 1, 1, bl);\r
+     ::decode(id, bl);\r
+     ::decode(prefix, bl);\r
+     ::decode(status, bl);\r
+     ::decode(expiration, bl);\r
+     DECODE_FINISH(bl);\r
+   }\r
+\r
+};\r
+WRITE_CLASS_ENCODER(LCRule)\r
+\r
+class RGWLifecycleConfiguration\r
+{\r
+protected:\r
+  CephContext *cct;\r
+  map<string, int> prefix_map;\r
+  multimap<string, LCRule> rule_map;\r
+  void _add_rule(LCRule *rule);\r
+public:\r
+  RGWLifecycleConfiguration(CephContext *_cct) : cct(_cct) {}\r
+  RGWLifecycleConfiguration() : cct(NULL) {}\r
+\r
+  void set_ctx(CephContext *ctx) {\r
+    cct = ctx;\r
+  }\r
+\r
+  virtual ~RGWLifecycleConfiguration() {}\r
+\r
+//  int get_perm(string& id, int perm_mask);\r
+//  int get_group_perm(ACLGroupTypeEnum group, int perm_mask);\r
+  void encode(bufferlist& bl) const {\r
+    ENCODE_START(1, 1, bl);\r
+    ::encode(rule_map, bl);\r
+    ENCODE_FINISH(bl);\r
+  }\r
+  void decode(bufferlist::iterator& bl) {\r
+    DECODE_START_LEGACY_COMPAT_LEN(1, 1, 1, bl);\r
+    ::decode(rule_map, bl);\r
+    multimap<string, LCRule>::iterator iter;\r
+    for (iter = rule_map.begin(); iter != rule_map.end(); ++iter) {\r
+      LCRule& rule = iter->second;\r
+      _add_rule(&rule);\r
+    }\r
+    DECODE_FINISH(bl);\r
+  }\r
+  void dump(Formatter *f) const;\r
+//  static void generate_test_instances(list<RGWAccessControlList*>& o);\r
+\r
+  void add_rule(LCRule* rule);\r
+\r
+  multimap<string, LCRule>& get_rule_map() { return rule_map; }\r
+  map<string, int>& get_prefix_map() { return prefix_map; }\r
+/*\r
+  void create_default(string id, string name) {\r
+    ACLGrant grant;\r
+    grant.set_canon(id, name, RGW_PERM_FULL_CONTROL);\r
+    add_grant(&grant);\r
+  }\r
+*/\r
+};\r
+WRITE_CLASS_ENCODER(RGWLifecycleConfiguration)\r
+\r
+class RGWLC {\r
+  CephContext *cct;\r
+  RGWRados *store;\r
+  int max_objs;\r
+  string *obj_names;\r
+\r
+  class LCWorker : public Thread {\r
+    CephContext *cct;\r
+    RGWLC *lc;\r
+    Mutex lock;\r
+    Cond cond;\r
+\r
+  public:\r
+    LCWorker(CephContext *_cct, RGWLC *_lc) : cct(_cct), lc(_lc), lock("LCWorker") {}\r
+    void *entry();\r
+    void stop();\r
+    bool should_work(utime_t& now);\r
+    int shedule_next_start_time(utime_t& now);\r
+  };\r
+  \r
+  public:\r
+  LCWorker *worker;\r
+public:\r
+  RGWLC() : cct(NULL), store(NULL), worker(NULL) {}\r
+  ~RGWLC() {\r
+    stop_processor();\r
+    finalize();\r
+  }\r
+\r
+  void initialize(CephContext *_cct, RGWRados *_store);\r
+  void finalize();\r
+\r
+  int process();\r
+  int process(int index, int max_secs);\r
+  bool if_already_run_today(time_t& start_date);\r
+  int list_lc_progress(map<string, int>& progress_map);\r
+  int bucket_lc_prepare(int index);\r
+  int bucket_lc_process(string& shard_id);\r
+  int bucket_lc_post(int index, int max_lock_sec, cls_rgw_lc_obj_head& head, \r
+                                                              pair<string, int >& entry, int& result);\r
+  bool going_down();\r
+  void start_processor();\r
+  void stop_processor();\r
+};\r
+\r
+\r
+\r
+#endif\r
diff --git a/src/rgw/rgw_lc_s3.cc b/src/rgw/rgw_lc_s3.cc
new file mode 100644 (file)
index 0000000..0ef0113
--- /dev/null
@@ -0,0 +1,112 @@
+#include <string.h>\r
+\r
+#include <iostream>\r
+#include <map>\r
+\r
+#include "include/types.h"\r
+\r
+#include "rgw_lc_s3.h"\r
+\r
+\r
+#define dout_subsys ceph_subsys_rgw\r
+\r
+using namespace std;\r
+\r
+bool LCExpiration_S3::xml_end(const char * el) {\r
+  LCDays_S3 *lc_days = static_cast<LCDays_S3 *>(find_first("Days"));\r
+
+  // ID is mandatory
+  if (!lc_days)\r
+    return false;
+  days = lc_days->get_data();\r
+  return true;
+}\r
+\r
+bool RGWLifecycleConfiguration_S3::xml_end(const char *el) {\r
+  XMLObjIter iter = find("Rule");\r
+  LCRule_S3 *rule = static_cast<LCRule_S3 *>(iter.get_next());\r
+  while (rule) {\r
+    add_rule(rule);\r
+    rule = static_cast<LCRule_S3 *>(iter.get_next());\r
+  }\r
+  return true;\r
+}\r
+\r
+bool LCRule_S3::xml_end(const char *el) {\r
+  LCID_S3 *lc_id;\r
+  LCPrefix_S3 *lc_prefix;\r
+  LCStatus_S3 *lc_status;\r
+  LCExpiration_S3 *lc_expiration;\r
+\r
+  id.clear();\r
+  prefix.clear();\r
+  status.clear();\r
+  \r
+  lc_id = static_cast<LCID_S3 *>(find_first("ID"));\r
+  if (!lc_id)\r
+    return false;\r
+  id = lc_id->get_data();\r
+\r
+  lc_prefix = static_cast<LCPrefix_S3 *>(find_first("Prefix"));\r
+  if (!lc_prefix)\r
+    return false;\r
+  prefix = lc_prefix->get_data();\r
+\r
+  lc_status = static_cast<LCStatus_S3 *>(find_first("Status"));\r
+  if (!lc_status)\r
+    return false;\r
+  status = lc_status->get_data();\r
+  \r
+  lc_expiration = static_cast<LCExpiration_S3 *>(find_first("Expiration"));\r
+  if (!lc_expiration)\r
+    return false;\r
+  expiration = *lc_expiration;\r
+\r
+  return true;\r
+}\r
+\r
+void LCRule_S3::to_xml(CephContext *cct, ostream& out) {\r
+  LCExpiration_S3& expir = static_cast<LCExpiration_S3&>(expiration);\r
+  out << "<Rule>" ;\r
+  out << "<ID>" << id << "</ID>";\r
+  out << "<Prefix>" << prefix << "</Prefix>";\r
+  out << "<Status>" << status << "</Status>";\r
+  expir.to_xml(out);\r
+  out << "</Rule>";\r
+}\r
+\r
+int RGWLifecycleConfiguration_S3::rebuild(RGWRados *store, RGWLifecycleConfiguration& dest)\r
+{\r
+  multimap<string, LCRule>::iterator iter;\r
+  for (iter = rule_map.begin(); iter != rule_map.end(); ++iter) {\r
+    LCRule& src_rule = iter->second;\r
+    bool rule_ok = true;\r
+\r
+    if (rule_ok) {\r
+      dest.add_rule(&src_rule);\r
+    }\r
+  }\r
+\r
+  return 0; \r
+}\r
+\r
+XMLObj *RGWLCXMLParser_S3::alloc_obj(const char *el)\r
+{\r
+  XMLObj * obj = NULL;\r
+  if (strcmp(el, "LifecycleConfiguration") == 0) {\r
+    obj = new RGWLifecycleConfiguration_S3(cct);\r
+  } else if (strcmp(el, "Rule") == 0) {\r
+    obj = new LCRule_S3();\r
+  } else if (strcmp(el, "ID") == 0) {\r
+    obj = new LCID_S3();\r
+  } else if (strcmp(el, "Prefix") == 0) {\r
+    obj = new LCPrefix_S3();\r
+  } else if (strcmp(el, "Status") == 0) {\r
+    obj = new LCStatus_S3();\r
+  } else if (strcmp(el, "Expiration") == 0) {\r
+    obj = new LCExpiration_S3();\r
+  } else if (strcmp(el, "Days") == 0) {\r
+    obj = new LCDays_S3();\r
+  }\r
+  return obj;\r
+}\r
diff --git a/src/rgw/rgw_lc_s3.h b/src/rgw/rgw_lc_s3.h
new file mode 100644 (file)
index 0000000..79e56ec
--- /dev/null
@@ -0,0 +1,104 @@
+#ifndef CEPH_RGW_LC_S3_H\r
+#define CEPH_RGW_LC_S3_H\r
+\r
+#include <map>\r
+#include <string>\r
+#include <iostream>\r
+#include <include/types.h>\r
+\r
+#include <expat.h>\r
+\r
+#include "include/str_list.h"\r
+#include "rgw_lc.h"\r
+#include "rgw_xml.h"\r
+\r
+\r
+\r
+using namespace std;\r
+\r
+class LCRule_S3 : public LCRule, public XMLObj\r
+{\r
+public:\r
+  LCRule_S3() {}\r
+  ~LCRule_S3() {}\r
+\r
+  void to_xml(CephContext *cct, ostream& out);\r
+  bool xml_end(const char *el);\r
+  bool xml_start(const char *el, const char **attr);\r
+};\r
+\r
+class LCID_S3 : public XMLObj\r
+{\r
+public:\r
+  LCID_S3() {}\r
+  ~LCID_S3() {}\r
+  string& to_str() { return data; }\r
+};\r
+\r
+class LCPrefix_S3 : public XMLObj\r
+{\r
+public:\r
+  LCPrefix_S3() {}\r
+  ~LCPrefix_S3() {}\r
+  string& to_str() { return data; }\r
+};\r
+\r
+class LCStatus_S3 : public XMLObj\r
+{\r
+public:\r
+  LCStatus_S3() {}\r
+  ~LCStatus_S3() {}\r
+  string& to_str() { return data; }\r
+};\r
+\r
+class LCDays_S3 : public XMLObj\r
+{\r
+public:\r
+  LCDays_S3() {}\r
+  ~LCDays_S3() {}\r
+  string& to_str() { return data; }\r
+};\r
+\r
+class LCExpiration_S3 : public LCExpiration, public XMLObj\r
+{\r
+public:\r
+  LCExpiration_S3() {}\r
+  ~LCExpiration_S3() {}\r
+\r
+  bool xml_end(const char *el);\r
+  void to_xml(ostream& out) {\r
+    out << "<Expiration>" << "<Days>" << days << "</Days>"<< "</Expiration>";\r
+  }\r
+};\r
+\r
+class RGWLCXMLParser_S3 : public RGWXMLParser\r
+{\r
+  CephContext *cct;\r
+\r
+  XMLObj *alloc_obj(const char *el);\r
+public:\r
+  RGWLCXMLParser_S3(CephContext *_cct) : cct(_cct) {}\r
+};\r
+\r
+class RGWLifecycleConfiguration_S3 : public RGWLifecycleConfiguration, public XMLObj\r
+{\r
+public:\r
+  RGWLifecycleConfiguration_S3(CephContext *_cct) : RGWLifecycleConfiguration(_cct) {}\r
+  ~RGWLifecycleConfiguration_S3() {}\r
+\r
+  bool xml_end(const char *el);\r
+\r
+  void to_xml(ostream& out) {\r
+    out << "<LifecycleConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">";\r
+    multimap<string, LCRule>::iterator iter;\r
+    for (iter = rule_map.begin(); iter != rule_map.end(); ++iter) {\r
+      LCRule_S3& rule = static_cast<LCRule_S3&>(iter->second);\r
+      rule.to_xml(cct, out);\r
+    }\r
+    out << "</LifecycleConfiguration>";\r
+  }\r
+  int rebuild(RGWRados *store, RGWLifecycleConfiguration& dest);\r
+};\r
+\r
+\r
+#endif\r
index 2246b6a40ea11b598e09a422e8dde714b57afd1a..1c8a38aa1462f287ff9b4e97659f46542d5f7346 100644 (file)
@@ -1072,7 +1072,7 @@ int main(int argc, const char **argv)
 
   int r = 0;
   RGWRados *store = RGWStoreManager::get_storage(g_ceph_context,
-      g_conf->rgw_enable_gc_threads, g_conf->rgw_enable_quota_threads);
+      g_conf->rgw_enable_gc_threads, g_conf->rgw_enable_lc_threads, g_conf->rgw_enable_quota_threads);
   if (!store) {
     mutex.Lock();
     init_timer.cancel_all_events();
index dcbfbc1d3a43ad3b7e258726fc1e490561a4bcc6..09350be3bb2bbc75159a878c1415c12e239ad810 100644 (file)
@@ -78,7 +78,7 @@ int main(const int argc, const char **argv)
 
   common_init_finish(g_ceph_context);
 
-  store = RGWStoreManager::get_storage(g_ceph_context, false, false);
+  store = RGWStoreManager::get_storage(g_ceph_context, false, false, false);
   if (!store) {
     std::cerr << "couldn't init storage provider" << std::endl;
     return EIO;
index 815879d231b42beed3fab5bd8b21857fd4ff2d6a..d0d77cc3fc28a65f4907b871d450ab4ad8f4937c 100644 (file)
 #include "rgw_multi_del.h"
 #include "rgw_cors.h"
 #include "rgw_cors_s3.h"
-
+#include "rgw_lc.h"
+#include "rgw_lc_s3.h"
 #include "rgw_client_io.h"
+#include "cls/lock/cls_lock_client.h"
+#include "cls/rgw/cls_rgw_client.h"
+
 
 #define dout_subsys ceph_subsys_rgw
 
 using namespace std;
+using namespace librados;
 using ceph::crypto::MD5;
 
+
 static string mp_ns = RGW_OBJ_NS_MULTIPART;
 static string shadow_ns = RGW_OBJ_NS_SHADOW;
 
@@ -2748,11 +2754,43 @@ int RGWPutACLs::verify_permission()
   return 0;
 }
 
+int RGWPutLC::verify_permission()
+{
+  bool perm;
+  ldout(s->cct, 0) << "ccc" <<s->bucket_acl << dendl;
+  perm = s->bucket_acl->verify_permission(s->user.user_id, RGW_PERM_WRITE_ACP, RGW_PERM_WRITE_ACP);
+  if (!perm)
+    return -EACCES;
+
+  return 0;
+}
+
+int RGWDeleteLC::verify_permission()
+{
+  bool perm;
+  ldout(s->cct, 0) << "ccc" <<s->bucket_acl << dendl;
+  perm = s->bucket_acl->verify_permission(s->user.user_id, RGW_PERM_WRITE_ACP, RGW_PERM_WRITE_ACP);
+  if (!perm)
+    return -EACCES;
+
+  return 0;
+}
+
 void RGWPutACLs::pre_exec()
 {
   rgw_bucket_object_pre_exec(s);
 }
 
+void RGWPutLC::pre_exec()
+{
+  rgw_bucket_object_pre_exec(s);
+}
+
+void RGWDeleteLC::pre_exec()
+{
+  rgw_bucket_object_pre_exec(s);
+}
+
 void RGWPutACLs::execute()
 {
   bufferlist bl;
@@ -2847,6 +2885,135 @@ void RGWPutACLs::execute()
   }
 }
 
+static void get_lc_oid(struct req_state *s, string& oid)
+{
+  string shard_id = s->bucket.name + ':' +s->bucket.bucket_id;
+  int max_objs = (s->cct->_conf->rgw_lc_max_objs > HASH_PRIME)?HASH_PRIME:s->cct->_conf->rgw_lc_max_objs;
+  int index = ceph_str_hash_linux(shard_id.c_str(), shard_id.size()) % HASH_PRIME % max_objs;
+  oid = lc_oid_prefix;
+  char buf[32];
+  snprintf(buf, 32, ".%d", index);
+  oid.append(buf);
+  return;
+}
+void RGWPutLC::execute()
+{
+  bufferlist bl;
+  
+  RGWLifecycleConfiguration_S3 *config = NULL;
+  RGWLCXMLParser_S3 parser(s->cct);
+  RGWLifecycleConfiguration_S3 new_config(s->cct);
+  ret = 0;
+
+  if (!parser.init()) {
+    ret = -EINVAL;
+    return;
+  }
+
+  ret = get_params();
+  if (ret < 0)
+    return;
+
+  ldout(s->cct, 15) << "read len=" << len << " data=" << (data ? data : "") << dendl;
+
+  if (!parser.parse(data, len, 1)) {
+    ret = -EACCES;
+    return;
+  }
+  config = static_cast<RGWLifecycleConfiguration_S3 *>(parser.find_first("LifecycleConfiguration"));
+  if (!config) {
+    ret = -EINVAL;
+    return;
+  }
+
+  if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) {
+    ldout(s->cct, 15) << "Old LifecycleConfiguration";
+    config->to_xml(*_dout);
+    *_dout << dendl;
+  }
+
+  ret = config->rebuild(store, new_config);
+  if (ret < 0)
+    return;
+
+  if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) {
+    ldout(s->cct, 15) << "New LifecycleConfiguration:";
+    new_config.to_xml(*_dout);
+    *_dout << dendl;
+  }
+  
+  new_config.encode(bl);
+  map<string, bufferlist> attrs;
+  attrs[RGW_ATTR_LC] = bl;
+  ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, NULL, NULL);
+  if (ret < 0)
+    return;
+  string shard_id = s->bucket.tenant + ':' + s->bucket.name + ':' + s->bucket.bucket_id;  
+  string oid; 
+  get_lc_oid(s, oid);
+  pair<string, int> entry(shard_id, lc_uninitial);
+  int max_lock_secs = s->cct->_conf->rgw_lc_lock_max_time;
+  rados::cls::lock::Lock l(lc_index_lock_name); 
+  utime_t time(max_lock_secs, 0);
+  l.set_duration(time);
+  librados::IoCtx *ctx = store->get_lc_pool_ctx();
+  do {
+    ret = l.lock_exclusive(ctx, oid);
+    if (ret == -EBUSY) {
+      dout(0) << "RGWLC::RGWPutLC() failed to acquire lock on, sleep 5, try again" << oid << dendl;
+      sleep(5);
+      continue;
+    }
+    if (ret < 0) {
+      dout(0) << "RGWLC::RGWPutLC() failed to acquire lock " << oid << ret << dendl;
+      break;
+    }
+    ret = cls_rgw_lc_set_entry(*ctx, oid, entry);
+    if (ret < 0) {
+      dout(0) << "RGWLC::RGWPutLC() failed to set entry " << oid << ret << dendl;     
+    }
+    break;
+  }while(1);
+  l.unlock(ctx, oid);
+  return;
+}
+
+void RGWDeleteLC::execute()
+{
+  bufferlist bl;
+  map<string, bufferlist> attrs, rmattrs;
+  rmattrs[RGW_ATTR_LC] = bl;
+  ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &rmattrs, NULL);
+  string shard_id = s->bucket.name + ':' +s->bucket.bucket_id;
+  pair<string, int> entry(shard_id, lc_uninitial);
+  string oid; 
+  get_lc_oid(s, oid);
+  int max_lock_secs = s->cct->_conf->rgw_lc_lock_max_time;
+  librados::IoCtx *ctx = store->get_lc_pool_ctx();
+  rados::cls::lock::Lock l(lc_index_lock_name);
+  utime_t time(max_lock_secs, 0);
+  l.set_duration(time);
+  do {
+    ret = l.lock_exclusive(ctx, oid);
+    if (ret == -EBUSY) {
+      dout(0) << "RGWLC::RGWPutLC() failed to acquire lock on, sleep 5, try again" << oid << dendl;
+      sleep(5);
+      continue;
+    }
+    if (ret < 0) {
+      dout(0) << "RGWLC::RGWPutLC() failed to acquire lock " << oid << ret << dendl;
+      break;
+    }
+    ret = cls_rgw_lc_rm_entry(*ctx, oid, entry);
+    if (ret < 0) {
+      dout(0) << "RGWLC::RGWPutLC() failed to set entry " << oid << ret << dendl;     
+    }
+    break;
+  }while(1);
+  l.unlock(ctx, oid);
+  return;
+}
+
 int RGWGetCORS::verify_permission()
 {
   if (s->user.user_id.compare(s->bucket_owner.get_id()) != 0)
index 0252ff8a65d2f882586f5010d7f5f45a58e2ab21..de18804f01b990d8d4d45a2e4178788034c9c78c 100644 (file)
@@ -763,6 +763,58 @@ public:
   virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; }
 };
 
+class RGWPutLC : public RGWOp {
+protected:
+  int ret;
+  size_t len;
+  char *data;
+
+public:
+  RGWPutLC() {
+    ret = 0;
+    len = 0;
+    data = NULL;
+  }
+  virtual ~RGWPutLC() {
+    free(data);
+  }
+
+  int verify_permission();
+  void pre_exec();
+  void execute();
+
+//  virtual int get_policy_from_state(RGWRados *store, struct req_state *s, stringstream& ss) { return 0; }
+  virtual int get_params() = 0;
+  virtual void send_response() = 0;
+  virtual const string name() { return "put_lifecycle"; }
+  virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; }
+};
+
+class RGWDeleteLC : public RGWOp {
+protected:
+  int ret;
+  size_t len;
+  char *data;
+
+public:
+  RGWDeleteLC() {
+    ret = 0;
+    len = 0;
+    data = NULL;
+  }
+  virtual ~RGWDeleteLC() {
+    free(data);
+  }
+
+  int verify_permission();
+  void pre_exec();
+  void execute();
+
+  virtual void send_response() = 0;
+  virtual const string name() { return "delete_lifecycle"; }
+  virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; }
+};
+
 class RGWGetCORS : public RGWOp {
 protected:
   int ret;
index 3e38e9effefe4d4ef447560c10e4cf25a1ea5aa6..b5f23c94a1bdb3ae84c7eb4cd7885ed1dae721ac 100644 (file)
@@ -17,6 +17,8 @@
 #include "rgw_cache.h"
 #include "rgw_acl.h"
 #include "rgw_acl_s3.h" /* for dumping s3policy in debug log */
+#include "rgw_lc.h"
+#include "rgw_lc_s3.h"
 #include "rgw_metadata.h"
 #include "rgw_bucket.h"
 
@@ -48,6 +50,8 @@ using namespace librados;
 #include "rgw_log.h"
 
 #include "rgw_gc.h"
+#include "rgw_lc.h"
+
 #include "rgw_object_expirer_core.h"
 
 #define dout_subsys ceph_subsys_rgw
@@ -311,6 +315,7 @@ void RGWZoneParams::init_default(RGWRados *store)
   domain_root = ".rgw";
   control_pool = ".rgw.control";
   gc_pool = ".rgw.gc";
+  lc_pool = ".rgw.lc";
   log_pool = ".log";
   intent_log_pool = ".intent-log";
   usage_log_pool = ".usage";
@@ -1645,6 +1650,10 @@ int RGWRados::init_complete()
   if (ret < 0)
     return ret;
 
+  ret = open_lc_pool_ctx();
+  if (ret < 0)
+    return ret;
+
   ret = open_objexp_pool_ctx();
   if (ret < 0)
     return ret;
@@ -1661,6 +1670,12 @@ int RGWRados::init_complete()
     obj_expirer->start_processor();
   }
 
+  lc = new RGWLC();
+  lc->initialize(cct, this);
+  
+  if (use_lc_thread)
+    lc->start_processor();
+  
   quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
 
   bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards :
@@ -1794,6 +1809,24 @@ int RGWRados::open_gc_pool_ctx()
   return r;
 }
 
+int RGWRados::open_lc_pool_ctx()
+{
+  const char *lc_pool = zone.lc_pool.name.c_str();
+  librados::Rados *rad = get_rados_handle();
+  int r = rad->ioctx_create(lc_pool, lc_pool_ctx);
+  if (r == -ENOENT) {
+    r = rad->pool_create(lc_pool);
+    if (r == -EEXIST)
+      r = 0;
+    if (r < 0)
+      return r;
+
+    r = rad->ioctx_create(lc_pool, lc_pool_ctx);
+  }
+
+  return r;
+}
+
 int RGWRados::open_objexp_pool_ctx()
 {
   const char * const pool_name = zone.log_pool.name.c_str();
@@ -8204,6 +8237,16 @@ int RGWRados::process_gc()
   return gc->process();
 }
 
+int RGWRados::list_lc_progress(map<string, int>& progress_map)
+{
+  return lc->list_lc_progress(progress_map);
+}
+
+int RGWRados::process_lc()
+{
+  return lc->process();
+}
+
 int RGWRados::process_expire_objects()
 {
   obj_expirer->inspect_all_shards(utime_t(), ceph_clock_now(cct));
@@ -9218,7 +9261,7 @@ uint64_t RGWRados::next_bucket_id()
   return ++max_bucket_id;
 }
 
-RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads)
+RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads)
 {
   int use_cache = cct->_conf->rgw_cache_enabled;
   RGWRados *store = NULL;
@@ -9228,7 +9271,7 @@ RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_t
     store = new RGWCache<RGWRados>; 
   }
 
-  if (store->initialize(cct, use_gc_thread, quota_threads) < 0) {
+  if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads) < 0) {
     delete store;
     return NULL;
   }
index 67adc60e4c73bf12ee9e6dd4017c9ddd0a7df9d8..7f38645311ef3a12b2994767e4a580ac2ef4249c 100644 (file)
@@ -22,6 +22,7 @@ class RGWWatcher;
 class SafeTimer;
 class ACLOwner;
 class RGWGC;
+class RGWLC;
 class RGWObjectExpirer;
 
 /* flags for put_obj_meta() */
@@ -732,6 +733,7 @@ struct RGWZoneParams {
   rgw_bucket domain_root;
   rgw_bucket control_pool;
   rgw_bucket gc_pool;
+  rgw_bucket lc_pool;
   rgw_bucket log_pool;
   rgw_bucket intent_log_pool;
   rgw_bucket usage_log_pool;
@@ -761,6 +763,7 @@ struct RGWZoneParams {
     ::encode(domain_root, bl);
     ::encode(control_pool, bl);
     ::encode(gc_pool, bl);
+    ::encode(lc_pool, bl);
     ::encode(log_pool, bl);
     ::encode(intent_log_pool, bl);
     ::encode(usage_log_pool, bl);
@@ -779,6 +782,7 @@ struct RGWZoneParams {
     ::decode(domain_root, bl);
     ::decode(control_pool, bl);
     ::decode(gc_pool, bl);
+    ::decode(lc_pool, bl);
     ::decode(log_pool, bl);
     ::decode(intent_log_pool, bl);
     ::decode(usage_log_pool, bl);
@@ -1207,6 +1211,7 @@ class Finisher;
 class RGWRados
 {
   friend class RGWGC;
+  friend class RGWLC;
   friend class RGWObjectExpirer;
   friend class RGWStateLog;
   friend class RGWReplicaLogger;
@@ -1214,6 +1219,7 @@ class RGWRados
   /** Open the pool used as root for this gateway */
   int open_root_pool_ctx();
   int open_gc_pool_ctx();
+  int open_lc_pool_ctx();
   int open_objexp_pool_ctx();
 
   int open_bucket_pool_ctx(const string& pool, librados::IoCtx&  io_ctx);
@@ -1251,8 +1257,10 @@ class RGWRados
   };
 
   RGWGC *gc;
+  RGWLC *lc;
   RGWObjectExpirer *obj_expirer;
   bool use_gc_thread;
+  bool use_lc_thread;
   bool quota_threads;
 
   int num_watchers;
@@ -1294,6 +1302,7 @@ protected:
   std::map<pthread_t, int> rados_map;
 
   librados::IoCtx gc_pool_ctx;        // .rgw.gc
+  librados::IoCtx lc_pool_ctx;        // .rgw.lc
   librados::IoCtx objexp_pool_ctx;
 
   bool pools_initialized;
@@ -1308,7 +1317,7 @@ protected:
 
 public:
   RGWRados() : max_req_id(0), lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
-               gc(NULL), obj_expirer(NULL), use_gc_thread(false), quota_threads(false),
+               gc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false),
                num_watchers(0), watchers(NULL),
                watch_initialized(false),
                bucket_id_lock("rados_bucket_id"),
@@ -1326,6 +1335,9 @@ public:
     return max_req_id.inc();
   }
 
+  librados::IoCtx* get_lc_pool_ctx() {
+    return &lc_pool_ctx;
+  }
   void set_context(CephContext *_cct) {
     cct = _cct;
   }
@@ -1379,9 +1391,10 @@ public:
 
   CephContext *ctx() { return cct; }
   /** do all necessary setup of the storage device */
-  int initialize(CephContext *_cct, bool _use_gc_thread, bool _quota_threads) {
+  int initialize(CephContext *_cct, bool _use_gc_thread, bool _use_lc_thread, bool _quota_threads) {
     set_context(_cct);
     use_gc_thread = _use_gc_thread;
+    use_lc_thread = _use_lc_thread;
     quota_threads = _quota_threads;
     return initialize();
   }
@@ -2165,6 +2178,9 @@ public:
   int process_expire_objects();
   int defer_gc(void *ctx, rgw_obj& obj);
 
+  int process_lc();
+  int list_lc_progress(map<string, int>& progress_map);
+  
   int bucket_check_index(rgw_bucket& bucket,
                          map<RGWObjCategory, RGWStorageStats> *existing_stats,
                          map<RGWObjCategory, RGWStorageStats> *calculated_stats);
@@ -2322,15 +2338,15 @@ public:
 class RGWStoreManager {
 public:
   RGWStoreManager() {}
-  static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool quota_threads) {
-    RGWRados *store = init_storage_provider(cct, use_gc_thread, quota_threads);
+  static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads) {
+    RGWRados *store = init_storage_provider(cct, use_gc_thread, use_lc_thread, quota_threads);
     return store;
   }
   static RGWRados *get_raw_storage(CephContext *cct) {
     RGWRados *store = init_raw_storage_provider(cct);
     return store;
   }
-  static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads);
+  static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads);
   static RGWRados *init_raw_storage_provider(CephContext *cct);
   static void close_storage(RGWRados *store);
 
index 767ad1ecfc3d7b0b2f665c6786bede4bb21a1c9a..e20128d1aad5a5fa16ebd33ceea0e5ef78fcc4f3 100644 (file)
@@ -930,6 +930,30 @@ int RGWPutACLs_ObjStore::get_params()
   return ret;
 }
 
+int RGWPutLC_ObjStore::get_params()
+{
+  size_t cl = 0;
+  if (s->length)
+    cl = atoll(s->length);
+  if (cl) {
+    data = (char *)malloc(cl + 1);
+    if (!data) {
+       ret = -ENOMEM;
+       return ret;
+    }
+    int read_len;
+    int r = s->cio->read(data, cl, &read_len);
+    len = read_len;
+    if (r < 0)
+      return r;
+    data[len] = '\0';
+  } else {
+    len = 0;
+  }
+
+  return ret;
+}
+
 static int read_all_chunked_input(req_state *s, char **pdata, int *plen, int max_read)
 {
 #define READ_CHUNK 4096
index b9355e6457b01cb6a3b20393bb341fae6f622805..4e8d81f11185514c66b765c31a0bffd8850c6d14 100644 (file)
@@ -210,6 +210,21 @@ public:
   int get_params();
 };
 
+class RGWPutLC_ObjStore : public RGWPutLC {
+public:
+  RGWPutLC_ObjStore() {}
+  ~RGWPutLC_ObjStore() {}
+
+  int get_params();
+};
+
+class RGWDeleteLC_ObjStore : public RGWDeleteLC {
+public:
+  RGWDeleteLC_ObjStore() {}
+  ~RGWDeleteLC_ObjStore() {}
+
+};
+
 class RGWGetCORS_ObjStore : public RGWGetCORS {
 public:
   RGWGetCORS_ObjStore() {}
index 8c00e192c03fdd3fd41ff9a4a0ee8dbf29cdf772..49eaeebbcf290e29ae75606a481d190e3afafe43 100644 (file)
@@ -1612,6 +1612,27 @@ void RGWPutACLs_ObjStore_S3::send_response()
   dump_start(s);
 }
 
+void RGWPutLC_ObjStore_S3::send_response()
+{
+  if (ret)
+    set_req_state_err(s, ret);
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+}
+
+void RGWDeleteLC_ObjStore_S3::send_response()
+{
+  if (ret == 0)
+      ret = STATUS_NO_CONTENT;
+  if (ret) {   
+    set_req_state_err(s, ret);
+  }
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+}
+
 void RGWGetCORS_ObjStore_S3::send_response()
 {
   if (ret) {
@@ -2145,6 +2166,8 @@ RGWOp *RGWHandler_ObjStore_Bucket_S3::op_put()
     return new RGWPutCORS_ObjStore_S3;
   } else if (is_request_payment_op()) {
     return new RGWSetRequestPayment_ObjStore_S3;
+  } else if(is_lc_op()) {
+    return new RGWPutLC_ObjStore_S3;
   }
   return new RGWCreateBucket_ObjStore_S3;
 }
@@ -2153,6 +2176,8 @@ RGWOp *RGWHandler_ObjStore_Bucket_S3::op_delete()
 {
   if (is_cors_op()) {
     return new RGWDeleteCORS_ObjStore_S3;
+  } else if(is_lc_op()) {
+    return new RGWDeleteLC_ObjStore_S3;
   }
   return new RGWDeleteBucket_ObjStore_S3;
 }
index 1c2d5290f3e90d4442f49f16a259db37def9ed77..231b3c909517a6f5583883a221c27228d6c0cb27 100644 (file)
@@ -202,6 +202,22 @@ public:
   void send_response();
 };
 
+class RGWPutLC_ObjStore_S3 : public RGWPutLC_ObjStore {
+public:
+  RGWPutLC_ObjStore_S3() {}
+  ~RGWPutLC_ObjStore_S3() {}
+  
+ void send_response();
+};
+
+class RGWDeleteLC_ObjStore_S3 : public RGWDeleteLC_ObjStore {
+public:
+  RGWDeleteLC_ObjStore_S3() {}
+  ~RGWDeleteLC_ObjStore_S3() {}
+  
+ void send_response();
+};
+
 class RGWGetCORS_ObjStore_S3 : public RGWGetCORS_ObjStore {
 public:
   RGWGetCORS_ObjStore_S3() {}
@@ -415,6 +431,9 @@ protected:
   bool is_cors_op() {
       return s->info.args.exists("cors");
   }
+  bool is_lc_op() {
+      return s->info.args.exists("lifecycle");
+  }
   bool is_obj_update_op() {
     return is_acl_op() || is_cors_op();
   }
index 0b0f9005c007572af81aca597fafeeb733da29dc..fcb2c2fabaa5c2fa30637a6b15f55b19993f461b 100644 (file)
@@ -52,6 +52,8 @@
     gc list                    dump expired garbage collection objects (specify
                                --include-all to list all entries, including unexpired)
     gc process                 manually process garbage
+    lc list                    list all bucket lifecycle progress
+    lc process                 manually process lifecycle
     metadata get               get metadata info
     metadata put               put metadata info
     metadata rm                remove metadata info
index 26568578c98aa4bf573e5a32d7dd4c66466f1a33..3499aa46abd192978cbee672380d81bedc66efef 100644 (file)
@@ -807,7 +807,7 @@ int main(int argc, char *argv[]){
 
   global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
   common_init_finish(g_ceph_context);
-  store = RGWStoreManager::get_storage(g_ceph_context, false, false);
+  store = RGWStoreManager::get_storage(g_ceph_context, false, false, false);
   g_test = new admin_log::test_helper();
   finisher = new Finisher(g_ceph_context);
 #ifdef GTEST