rgw/rgw_acl.cc
rgw/rgw_acl_s3.cc
rgw/rgw_acl_swift.cc
+ rgw/rgw_lc.cc
+ rgw/rgw_lc_s3.cc
rgw/rgw_client_io.cc
rgw/rgw_fcgi.cc
rgw/rgw_xml.cc
cls_method_handle_t h_rgw_gc_set_entry;
cls_method_handle_t h_rgw_gc_list;
cls_method_handle_t h_rgw_gc_remove;
+cls_method_handle_t h_rgw_lc_set_entry;
+cls_method_handle_t h_rgw_lc_rm_entry;
+cls_method_handle_t h_rgw_lc_get_entry;
+cls_method_handle_t h_rgw_lc_put_head;
+cls_method_handle_t h_rgw_lc_get_head;
+cls_method_handle_t h_rgw_lc_list_entry;
#define ROUND_BLOCK_SIZE 4096
return gc_remove(hctx, op.tags);
}
+static int rgw_cls_lc_set_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ bufferlist::iterator in_iter = in->begin();
+
+ cls_rgw_lc_set_entry_op op;
+ try {
+ ::decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: rgw_cls_lc_set_entry(): failed to decode entry\n");
+ return -EINVAL;
+ }
+
+ bufferlist bl;
+ ::encode(op.entry, bl);
+
+ int ret = cls_cxx_map_set_val(hctx, op.entry.first, &bl);
+ return ret;
+}
+
+static int rgw_cls_lc_rm_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ bufferlist::iterator in_iter = in->begin();
+
+ cls_rgw_lc_rm_entry_op op;
+ try {
+ ::decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: rgw_cls_lc_rm_entry(): failed to decode entry\n");
+ return -EINVAL;
+ }
+
+ bufferlist bl;
+ ::encode(op.entry, bl);
+
+ int ret = cls_cxx_map_remove_key(hctx, op.entry.first);
+ return ret;
+}
+
+static int rgw_cls_lc_get_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ bufferlist::iterator in_iter = in->begin();
+ cls_rgw_lc_get_entry_ret op_ret;
+ cls_rgw_lc_get_entry_op op;
+ try {
+ ::decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: rgw_cls_lc_rm_entry(): failed to decode entry\n");
+ return -EINVAL;
+ }
+
+ map<string, bufferlist> vals;
+ string filter_prefix;
+ int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, 1, &vals);
+ if (ret < 0)
+ return ret;
+ map<string, bufferlist>::iterator it;
+ pair<string, int> entry;
+ if (!vals.empty()) {
+ it=vals.begin();
+ in_iter = it->second.begin();
+ try {
+ ::decode(entry, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: rgw_cls_lc_get_entry(): failed to decode entry\n");
+ return -EINVAL;
+ }
+ }
+ op_ret.entry = entry;
+ ::encode(op_ret, *out);
+ return 0;
+}
+
+static int rgw_cls_lc_list_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ cls_rgw_lc_list_entry_ret op_ret;
+ bufferlist::iterator iter;
+ map<string, bufferlist> vals;
+ int ret = cls_cxx_map_get_all_vals(hctx, &vals);
+ if (ret < 0)
+ return ret;
+ map<string, bufferlist>::iterator it;
+ pair<string, int> entry;
+ for (it = vals.begin(); it != vals.end(); it++) {
+ iter = it->second.begin();
+ try {
+ ::decode(entry, iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: rgw_cls_lc_list_entry(): failed to decode entry\n");
+ return -EINVAL;
+ }
+ op_ret.entries.insert(entry);
+ }
+ ::encode(op_ret, *out);
+ return 0;
+}
+
+static int rgw_cls_lc_put_head(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ bufferlist::iterator in_iter = in->begin();
+
+ cls_rgw_lc_put_head_op op;
+ try {
+ ::decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(1, "ERROR: rgw_cls_lc_set_entry(): failed to decode entry\n");
+ return -EINVAL;
+ }
+
+ bufferlist bl;
+ ::encode(op.head, bl);
+ int ret = cls_cxx_map_write_header(hctx,&bl);
+ return ret;
+}
+
+static int rgw_cls_lc_get_head(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ bufferlist bl;
+ int ret = cls_cxx_map_read_header(hctx, &bl);
+ if (ret < 0)
+ return ret;
+ cls_rgw_lc_obj_head head;
+ if (bl.length() != 0) {
+ bufferlist::iterator iter = bl.begin();
+ try {
+ ::decode(head, iter);
+ } catch (buffer::error& err) {
+ CLS_LOG(0, "ERROR: rgw_cls_lc_get_head(): failed to decode entry %s\n",err.what());
+ return -EINVAL;
+ }
+ }
+ cls_rgw_lc_get_head_ret op_ret;
+ op_ret.head = head;
+ ::encode(op_ret, *out);
+ return 0;
+}
+
void __cls_init()
{
CLS_LOG(1, "Loaded rgw class!");
cls_register_cxx_method(h_class, "gc_list", CLS_METHOD_RD, rgw_cls_gc_list, &h_rgw_gc_list);
cls_register_cxx_method(h_class, "gc_remove", CLS_METHOD_RD | CLS_METHOD_WR, rgw_cls_gc_remove, &h_rgw_gc_remove);
+ /* lifecycle bucket list */
+ cls_register_cxx_method(h_class, "lc_set_entry", CLS_METHOD_RD | CLS_METHOD_WR, rgw_cls_lc_set_entry, &h_rgw_lc_set_entry);
+ cls_register_cxx_method(h_class, "lc_rm_entry", CLS_METHOD_RD | CLS_METHOD_WR, rgw_cls_lc_rm_entry, &h_rgw_lc_rm_entry);
+ cls_register_cxx_method(h_class, "lc_get_entry", CLS_METHOD_RD, rgw_cls_lc_get_entry, &h_rgw_lc_get_entry);
+ cls_register_cxx_method(h_class, "lc_put_head", CLS_METHOD_RD| CLS_METHOD_WR, rgw_cls_lc_put_head, &h_rgw_lc_put_head);
+ cls_register_cxx_method(h_class, "lc_get_head", CLS_METHOD_RD, rgw_cls_lc_get_head, &h_rgw_lc_get_head);
+ cls_register_cxx_method(h_class, "lc_list_entry", CLS_METHOD_RD, rgw_cls_lc_list_entry, &h_rgw_lc_list_entry);
+
return;
}
::encode(call, in);
op.exec("rgw", "gc_remove", in);
}
+
+int cls_rgw_lc_get_head(IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head)
+{
+ bufferlist in, out;
+ int r = io_ctx.exec(oid, "rgw", "lc_get_head", in, out);
+ if (r < 0)
+ return r;
+
+ cls_rgw_lc_get_head_ret ret;
+ try {
+ bufferlist::iterator iter = out.begin();
+ ::decode(ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ head = ret.head;
+
+ return r;
+}
+
+int cls_rgw_lc_put_head(IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head)
+{
+ bufferlist in, out;
+ cls_rgw_lc_put_head_op call;
+ call.head = head;
+ ::encode(call, in);
+ int r = io_ctx.exec(oid, "rgw", "lc_put_head", in, out);
+ return r;
+}
+
+int cls_rgw_lc_get_entry(IoCtx& io_ctx, string& oid, string& marker, pair<string, int>& entry)
+{
+ bufferlist in, out;
+ cls_rgw_lc_get_entry_op call;
+ call.marker = marker;
+ ::encode(call, in);
+ int r = io_ctx.exec(oid, "rgw", "lc_get_entry", in, out);
+ if (r < 0)
+ return r;
+
+ cls_rgw_lc_get_entry_ret ret;
+ try {
+ bufferlist::iterator iter = out.begin();
+ ::decode(ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ entry = ret.entry;
+
+ return r;
+}
+
+int cls_rgw_lc_rm_entry(IoCtx& io_ctx, string& oid, pair<string, int>& entry)
+{
+ bufferlist in, out;
+ cls_rgw_lc_rm_entry_op call;
+ call.entry = entry;
+ ::encode(call, in);
+ int r = io_ctx.exec(oid, "rgw", "lc_rm_entry", in, out);
+ return r;
+}
+
+int cls_rgw_lc_set_entry(IoCtx& io_ctx, string& oid, pair<string, int>& entry)
+{
+ bufferlist in, out;
+ cls_rgw_lc_rm_entry_op call;
+ call.entry = entry;
+ ::encode(call, in);
+ int r = io_ctx.exec(oid, "rgw", "lc_set_entry", in, out);
+ return r;
+}
+
+int cls_rgw_lc_list(IoCtx& io_ctx, string& oid, map<string, int>& entries)
+{
+ bufferlist in, out;
+ int r = io_ctx.exec(oid, "rgw", "lc_list_entry", in, out);
+ if (r < 0)
+ return r;
+
+ cls_rgw_lc_list_entry_ret ret;
+ try {
+ bufferlist::iterator iter = out.begin();
+ ::decode(ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ entries.insert(ret.entries.begin(),ret.entries.end());
+
+ return r;
+}
void cls_rgw_gc_remove(librados::ObjectWriteOperation& op, const list<string>& tags);
+/* lifecycle */
+int cls_rgw_lc_get_head(librados::IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head);
+int cls_rgw_lc_put_head(librados::IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head);
+int cls_rgw_lc_get_entry(librados::IoCtx& io_ctx, string& oid, string& marker, pair<string, int>& entry);
+int cls_rgw_lc_rm_entry(librados::IoCtx& io_ctx, string& oid, pair<string, int>& entry);
+int cls_rgw_lc_set_entry(librados::IoCtx& io_ctx, string& oid, pair<string, int>& entry);
+int cls_rgw_lc_list(librados::IoCtx& io_ctx, string& oid, map<string, int>& entries);
+
+
+
+
+
+
#endif
};
WRITE_CLASS_ENCODER(cls_rgw_bi_log_list_ret)
+struct cls_rgw_lc_get_entry_op {
+ string marker;
+ cls_rgw_lc_get_entry_op() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(marker, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(marker, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_get_entry_op)
+
+struct cls_rgw_lc_get_entry_ret {
+ pair<string, int> entry;
+
+ cls_rgw_lc_get_entry_ret() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(entry, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(entry, bl);
+ DECODE_FINISH(bl);
+ }
+
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_get_entry_ret)
+
+struct cls_rgw_lc_rm_entry_op {
+ pair<string, int> entry;
+ cls_rgw_lc_rm_entry_op() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(entry, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(entry, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_rm_entry_op)
+
+struct cls_rgw_lc_set_entry_op {
+ pair<string, int> entry;
+ cls_rgw_lc_set_entry_op() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(entry, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(entry, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_set_entry_op)
+
+struct cls_rgw_lc_put_head_op {
+ cls_rgw_lc_obj_head head;
+
+
+ cls_rgw_lc_put_head_op() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(head, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(head, bl);
+ DECODE_FINISH(bl);
+ }
+
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_put_head_op)
+
+struct cls_rgw_lc_get_head_ret {
+ cls_rgw_lc_obj_head head;
+
+ cls_rgw_lc_get_head_ret() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(head, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(head, bl);
+ DECODE_FINISH(bl);
+ }
+
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_get_head_ret)
+
+struct cls_rgw_lc_list_entry_ret {
+ map<string, int> entries;
+
+ cls_rgw_lc_list_entry_ret() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(entries, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(entries, bl);
+ DECODE_FINISH(bl);
+ }
+
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_list_entry_ret)
#endif
};
WRITE_CLASS_ENCODER(cls_rgw_gc_obj_info)
+struct cls_rgw_lc_obj_head
+{
+ time_t start_date;
+ string marker;
+
+ cls_rgw_lc_obj_head() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(start_date, bl);
+ ::encode(marker, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(start_date, bl);
+ ::decode(marker, bl);
+ DECODE_FINISH(bl);
+ }
+
+};
+WRITE_CLASS_ENCODER(cls_rgw_lc_obj_head)
+
#endif
*/
OPTION(rgw_enable_quota_threads, OPT_BOOL, true)
OPTION(rgw_enable_gc_threads, OPT_BOOL, true)
+OPTION(rgw_enable_lc_threads, OPT_BOOL, true)
+
OPTION(rgw_data, OPT_STR, "/var/lib/ceph/radosgw/$cluster-$id")
OPTION(rgw_enable_apis, OPT_STR, "s3, swift, swift_auth, admin")
OPTION(rgw_port, OPT_STR, "") // port to listen, format as "8080" "5000", if not specified, rgw will not run external fcgi
OPTION(rgw_dns_name, OPT_STR, "")
OPTION(rgw_content_length_compat, OPT_BOOL, false) // Check both HTTP_CONTENT_LENGTH and CONTENT_LENGTH in fcgi env
+OPTION(rgw_lifecycle_enabled, OPT_BOOL, true) //rgw lifecycle enabled
+OPTION(rgw_lifecycle_thread, OPT_INT, 1) //start lifecycle thread number per radosgw
+OPTION(rgw_lifecycle_work_time, OPT_STR, "00:00-06:00") //job process lc at 00:00-06:00s
+OPTION(rgw_lc_lock_max_time, OPT_INT, 60) // total run time for a single gc processor work
+OPTION(rgw_lc_max_objs, OPT_INT, 32)
OPTION(rgw_script_uri, OPT_STR, "") // alternative value for SCRIPT_URI if not set in request
OPTION(rgw_request_uri, OPT_STR, "") // alternative value for REQUEST_URI if not set in request
OPTION(rgw_swift_url, OPT_STR, "") // the swift url, being published by the internal swift auth
rgw/rgw_acl.cc \
rgw/rgw_acl_s3.cc \
rgw/rgw_acl_swift.cc \
+ rgw/rgw_lc.cc \
+ rgw/rgw_lc_s3.cc \
rgw/rgw_client_io.cc \
rgw/rgw_fcgi.cc \
rgw/rgw_xml.cc \
rgw/rgw_acl.h \
rgw/rgw_acl_s3.h \
rgw/rgw_acl_swift.h \
+ rgw/rgw_lc.h \
+ rgw/rgw_lc_s3.h \
rgw/rgw_client_io.h \
rgw/rgw_fcgi.h \
rgw/rgw_xml.h \
#include "rgw_rados.h"
#include "rgw_acl.h"
#include "rgw_acl_s3.h"
+#include "rgw_lc.h"
#include "rgw_log.h"
#include "rgw_formats.h"
#include "rgw_usage.h"
cout << " gc list dump expired garbage collection objects (specify\n";
cout << " --include-all to list all entries, including unexpired)\n";
cout << " gc process manually process garbage\n";
+ cout << " lc list list all bucket lifecycle progress\n";
+ cout << " lc process manually process lifecycle\n";
cout << " metadata get get metadata info\n";
cout << " metadata put put metadata info\n";
cout << " metadata rm remove metadata info\n";
OPT_QUOTA_DISABLE,
OPT_GC_LIST,
OPT_GC_PROCESS,
+ OPT_LC_LIST,
+ OPT_LC_PROCESS,
OPT_ORPHANS_FIND,
OPT_ORPHANS_FINISH,
OPT_REGION_GET,
strcmp(cmd, "gc") == 0 ||
strcmp(cmd, "key") == 0 ||
strcmp(cmd, "log") == 0 ||
+ strcmp(cmd, "lc") == 0 ||
strcmp(cmd, "mdlog") == 0 ||
strcmp(cmd, "metadata") == 0 ||
strcmp(cmd, "object") == 0 ||
return OPT_GC_LIST;
if (strcmp(cmd, "process") == 0)
return OPT_GC_PROCESS;
+ } else if (strcmp(prev_cmd, "lc") == 0) {
+ if (strcmp(cmd, "list") == 0)
+ return OPT_LC_LIST;
+ if (strcmp(cmd, "process") == 0)
+ return OPT_LC_PROCESS;
} else if (strcmp(prev_cmd, "orphans") == 0) {
if (strcmp(cmd, "find") == 0)
return OPT_ORPHANS_FIND;
if (raw_storage_op) {
store = RGWStoreManager::get_raw_storage(g_ceph_context);
} else {
- store = RGWStoreManager::get_storage(g_ceph_context, false, false);
+ store = RGWStoreManager::get_storage(g_ceph_context, false, false, false);
}
if (!store) {
cerr << "couldn't init storage provider" << std::endl;
}
}
+ if (opt_cmd == OPT_LC_LIST) {
+ formatter->open_array_section("life cycle progress");
+ map<string, int> bucket_lc_map;
+ int ret = store->list_lc_progress(bucket_lc_map);
+ if (ret < 0) {
+ cerr << "ERROR: failed to list objs: " << cpp_strerror(-ret) << std::endl;
+ return 1;
+ }
+ map<string, int>::iterator iter;
+ for (iter = bucket_lc_map.begin(); iter != bucket_lc_map.end(); ++iter) {
+ formatter->open_object_section("bucket_lc_info");
+ formatter->dump_string("bucket", iter->first);
+ string lc_status = LC_STATUS[iter->second];
+ formatter->dump_string("status", lc_status);
+ formatter->close_section(); // objs
+ formatter->flush(cout);
+ }
+ }
+
+ if (opt_cmd == OPT_LC_PROCESS) {
+ int ret = store->process_lc();
+ if (ret < 0) {
+ cerr << "ERROR: lc processing returned error: " << cpp_strerror(-ret) << std::endl;
+ return 1;
+ }
+ }
+
if (opt_cmd == OPT_ORPHANS_FIND) {
RGWOrphanSearch search(store, max_concurrent_ios, orphan_stale_secs);
}
if ((name.compare("acl") == 0) ||
+ (name.compare("lifecycle") == 0) ||
(name.compare("cors") == 0) ||
(name.compare("location") == 0) ||
(name.compare("logging") == 0) ||
#define RGW_SYS_PARAM_PREFIX "rgwx-"
#define RGW_ATTR_ACL RGW_ATTR_PREFIX "acl"
+#define RGW_ATTR_LC RGW_ATTR_PREFIX "lc"
#define RGW_ATTR_CORS RGW_ATTR_PREFIX "cors"
#define RGW_ATTR_ETAG RGW_ATTR_PREFIX "etag"
#define RGW_ATTR_BUCKETS RGW_ATTR_PREFIX "buckets"
--- /dev/null
+#include <string.h>\r
+#include <iostream>\r
+#include <map>\r
+\r
+#include "include/types.h"\r
+\r
+#include "common/Formatter.h"\r
+#include <common/errno.h>\r
+#include "auth/Crypto.h"\r
+#include "include/rados/librados.hpp"\r
+#include "cls/rgw/cls_rgw_client.h"\r
+#include "cls/refcount/cls_refcount_client.h"\r
+#include "cls/lock/cls_lock_client.h"\r
+#include <common/dout.h>\r
+#include "rgw_common.h"\r
+#include "rgw_bucket.h"\r
+#include "rgw_lc.h"\r
+#include "rgw_lc_s3.h"\r
+\r
+\r
+\r
+#define dout_subsys ceph_subsys_rgw\r
+\r
+const char* LC_STATUS[] = {\r
+ "UNINITIAL",\r
+ "PROCESSING",\r
+ "FAILED",\r
+ "COMPLETE"\r
+};\r
+\r
+using namespace std;\r
+using namespace librados;\r
+void RGWLifecycleConfiguration::add_rule(LCRule *rule)\r
+{\r
+ string id;\r
+ rule->get_id(id); // not that this will return false for groups, but that's ok, we won't search groups\r
+ rule_map.insert(pair<string, LCRule>(id, *rule));\r
+ _add_rule(rule);\r
+}\r
+\r
+void RGWLifecycleConfiguration::_add_rule(LCRule *rule)\r
+{\r
+ string prefix;\r
+ LCExpiration expiration;\r
+ int days;\r
+ if (!rule->get_prefix(prefix)) {\r
+ ldout(cct, 5) << "ERROR: rule->get_prefix() failed" << dendl;\r
+ }\r
+ if (!rule->get_expiration(expiration)) {\r
+ ldout(cct, 5) << "ERROR: rule->get_expiration() failed" << dendl;\r
+ }\r
+ if (!expiration.get_days(&days)) {\r
+ ldout(cct, 5) << "ERROR: expiration->get_days() failed" << dendl;\r
+ }\r
+ prefix_map[prefix] = days;\r
+}\r
+\r
+void *RGWLC::LCWorker::entry() {\r
+ do {\r
+ utime_t start = ceph_clock_now(cct);\r
+ if (should_work(start)) {\r
+ dout(5) << "life cycle: start" << dendl;\r
+ int r = lc->process();\r
+ if (r < 0) {\r
+ dout(0) << "ERROR: do life cycle process() returned error r=" << r << dendl;\r
+ }\r
+ dout(5) << "life cycle: stop" << dendl;\r
+ }\r
+ if (lc->going_down())\r
+ break;\r
+\r
+ utime_t end = ceph_clock_now(cct); \r
+ int secs = shedule_next_start_time(end);\r
+ time_t next_time = end + secs;\r
+ char buf[30];\r
+ char *nt = ctime_r(&next_time, buf);\r
+ dout(5) << "shedule life cycle next start time: " << nt <<dendl;\r
+\r
+ lock.Lock();\r
+ cond.WaitInterval(cct, lock, utime_t(secs, 0));\r
+ lock.Unlock();\r
+ } while (!lc->going_down());\r
+\r
+ return NULL;\r
+}\r
+\r
+void RGWLC::initialize(CephContext *_cct, RGWRados *_store) {\r
+ cct = _cct;\r
+ store = _store;\r
+ max_objs = cct->_conf->rgw_lc_max_objs;\r
+ if (max_objs > HASH_PRIME)
+ max_objs = HASH_PRIME;
+
+ obj_names = new string[max_objs];
+
+ for (int i = 0; i < max_objs; i++) {
+ obj_names[i] = lc_oid_prefix;\r
+ char buf[32];
+ snprintf(buf, 32, ".%d", i);
+ obj_names[i].append(buf);
+ }\r
+}\r
+\r
+void RGWLC::finalize()\r
+{\r
+ delete[] obj_names;\r
+}\r
+\r
+bool RGWLC::if_already_run_today(time_t& start_date)\r
+{\r
+ struct tm bdt;\r
+ time_t begin_of_day;\r
+ utime_t now = ceph_clock_now(cct);\r
+ localtime_r(&start_date, &bdt);\r
+ bdt.tm_hour = 0;\r
+ bdt.tm_min = 0;\r
+ bdt.tm_sec = 0;\r
+ begin_of_day = mktime(&bdt);\r
+ if (now - begin_of_day < 24*60*60)\r
+ return true;\r
+ else \r
+ return false;\r
+}\r
+\r
+static std::vector<std::string> &split(const std::string &s, char delim, std::vector<std::string> &elems) {\r
+ std::stringstream ss(s);\r
+ std::string item;\r
+ while (std::getline(ss, item, delim)) {\r
+ elems.push_back(item);\r
+ }\r
+ return elems;\r
+}\r
+\r
+static std::vector<std::string> split(const std::string &s, char delim) {\r
+ std::vector<std::string> elems;\r
+ split(s, delim, elems);\r
+ return elems;\r
+}\r
+\r
+int RGWLC::bucket_lc_prepare(int index)\r
+{\r
+ map<string, int > entries;\r
+ int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], entries);\r
+ if (ret < 0)\r
+ return ret;\r
+ map<string, int>::iterator iter;\r
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {\r
+ pair<string, int > entry(iter->first, lc_uninitial);\r
+ ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);\r
+ if (ret < 0) {\r
+ dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names[index] << dendl;\r
+ break;\r
+ }\r
+ }\r
+ return ret;\r
+}\r
+int RGWLC::bucket_lc_process(string& shard_id)\r
+{\r
+ RGWLifecycleConfiguration config(cct);\r
+ RGWBucketInfo bucket_info;\r
+ map<string, bufferlist> bucket_attrs;\r
+ string prefix, delimiter, marker, next_marker, no_ns, end_marker, list_versions;\r
+ bool is_truncated;\r
+ bool default_config = false;\r
+ int default_days = 0;\r
+ vector<RGWObjEnt> objs; \r
+ RGWObjectCtx obj_ctx(store);\r
+ map<string, bool> common_prefixes;\r
+ vector<std::string> result;\r
+ result = split(shard_id, ':');\r
+ string bucket_tenant = result[0];\r
+ string bucket_name = result[1];\r
+ string bucket_id = result[2];\r
+ int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs);\r
+ if (ret < 0) {\r
+ ldout(cct, 0) << "LC:get_bucket_info failed" << bucket_name <<dendl;\r
+ return ret;\r
+ }\r
+\r
+ ret = bucket_info.bucket.bucket_id.compare(bucket_id) ;\r
+ if (ret !=0) {\r
+ ldout(cct, 0) << "LC:old bucket id find, should be delete" << bucket_name <<dendl;\r
+ return -ENOENT;\r
+ }\r
+ \r
+ RGWRados::Bucket target(store, bucket_info.bucket);\r
+ RGWRados::Bucket::List list_op(&target);\r
+\r
+ list_op.params.prefix = prefix;\r
+ list_op.params.delim = delimiter;\r
+ list_op.params.marker = marker;\r
+ list_op.params.end_marker = end_marker;\r
+ list_op.params.list_versions = false;\r
+ \r
+ map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);\r
+ if (aiter == bucket_attrs.end())\r
+ return 0;\r
+ \r
+ bufferlist::iterator iter(&aiter->second);\r
+ try {\r
+ config.decode(iter);\r
+ } catch (const buffer::error& e) {\r
+ ldout(cct, 0) << __func__ << "decode life cycle config failed" << dendl;\r
+ return -1;\r
+ }\r
+\r
+ map<string, int>& prefix_map = config.get_prefix_map();\r
+ for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); prefix_iter++) {\r
+ if (prefix_iter->first.empty()) {\r
+ default_config = true;\r
+ default_days = prefix_iter->second;\r
+ continue;\r
+ }\r
+ }\r
+ if (default_config) { \r
+ do {\r
+ \r
+ objs.clear();\r
+ list_op.params.marker = list_op.get_next_marker();\r
+ ret = list_op.list_objects(1000, &objs, &common_prefixes, &is_truncated); \r
+ if (ret < 0) {\r
+ if (ret == -ENOENT)\r
+ return 0;\r
+ ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;\r
+ return ret;\r
+ }\r
+\r
+ vector<RGWObjEnt>::iterator obj_iter;\r
+ int pos = 0;\r
+ utime_t now = ceph_clock_now(cct);\r
+ for (obj_iter = objs.begin(); obj_iter != objs.end(); obj_iter++) {\r
+ bool prefix_match = false;\r
+ int match_days = 0; \r
+ map<string, int>& prefix_map = config.get_prefix_map();\r
+ \r
+ for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); prefix_iter++) {\r
+ if (prefix_iter->first.empty()) {\r
+ continue;\r
+ }\r
+ pos = (*obj_iter).key.name.find(prefix_iter->first, 0);\r
+ if (pos != 0) {\r
+ continue;\r
+ }\r
+ prefix_match = true;\r
+ match_days = prefix_iter->second;\r
+ break;\r
+ }\r
+ int days = 0; \r
+ if (prefix_match) {\r
+ days = match_days;\r
+ } else if (default_config) {\r
+ days = default_days;\r
+ } else {\r
+ continue;\r
+ } \r
+ if (now - (*obj_iter).mtime >= days*24*60*60) {\r
+ RGWObjectCtx rctx(store);\r
+ rgw_obj obj(bucket_info.bucket, (*obj_iter).key.name);\r
+ RGWObjState *state;\r
+ int ret = store->get_obj_state(&rctx, obj, &state, NULL, false);\r
+ if (ret < 0) {\r
+ return ret;\r
+ }\r
+ if (state->mtime != (*obj_iter).mtime) //Check mtime again to avoid delete a recently update object as much as possible\r
+ continue;\r
+ ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, (*obj_iter).key);\r
+ if (ret < 0) {\r
+ ldout(cct, 0) << "ERROR: rgw_remove_object " << dendl;\r
+ } else {\r
+ ldout(cct, 10) << "DELETED:" << bucket_name << ":" << (*obj_iter).key.name <<dendl;\r
+ }\r
+ }\r
+ } \r
+ }while (is_truncated);\r
+ }else {\r
+ for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); prefix_iter++) {\r
+ if (prefix_iter->first.empty()) {\r
+ continue;\r
+ }\r
+ list_op.params.prefix = prefix_iter->first;\r
+ \r
+ do {\r
+ \r
+ objs.clear();\r
+ list_op.params.marker = list_op.get_next_marker(); \r
+ ret = list_op.list_objects(1000, &objs, &common_prefixes, &is_truncated); \r
+ \r
+ if (ret < 0) {\r
+ if (ret == (-ENOENT))\r
+ return 0;\r
+ ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;\r
+ return ret;\r
+ }\r
+ \r
+ vector<RGWObjEnt>::iterator obj_iter;\r
+ int days = prefix_iter->second; \r
+ utime_t now = ceph_clock_now(cct);\r
+ \r
+ for (obj_iter = objs.begin(); obj_iter != objs.end(); obj_iter++) { \r
+ if (now - (*obj_iter).mtime >= days*24*60*60) {\r
+ RGWObjectCtx rctx(store);\r
+ rgw_obj obj(bucket_info.bucket, (*obj_iter).key.name);\r
+ RGWObjState *state;\r
+ int ret = store->get_obj_state(&rctx, obj, &state, NULL, false); \r
+ if (ret < 0) {\r
+ return ret;\r
+ }\r
+ if (state->mtime != (*obj_iter).mtime)//Check mtime again to avoid delete a recently update object as much as possible\r
+ continue;\r
+ ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, (*obj_iter).key);\r
+ if (ret < 0) {\r
+ ldout(cct, 0) << "ERROR: rgw_remove_object " << dendl;\r
+ } else {\r
+ ldout(cct, 10) << "DELETED:" << bucket_name << ":" << (*obj_iter).key.name << dendl;\r
+ }\r
+ }\r
+ }\r
+ } while (is_truncated);\r
+ } \r
+ }\r
+\r
+ return ret;\r
+}\r
+\r
+int RGWLC::bucket_lc_post(int index, int max_lock_sec, cls_rgw_lc_obj_head& head, \r
+ pair<string, int >& entry, int& result)\r
+{\r
+ rados::cls::lock::Lock l(lc_index_lock_name);\r
+ do {\r
+ int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);\r
+ if (ret == -EBUSY) { /* already locked by another lc processor */\r
+ dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;\r
+ sleep(10);\r
+ continue;\r
+ }\r
+ if (ret < 0)\r
+ return 0;\r
+ dout(20) << "RGWLC::bucket_lc_post() get lock" << obj_names[index] << dendl;\r
+ if (result == -ENOENT) {\r
+ ret = cls_rgw_lc_rm_entry(store->lc_pool_ctx, obj_names[index], entry);\r
+ if (ret < 0) {\r
+ dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names[index] << dendl;\r
+ goto clean;\r
+ }\r
+ } else if (result < 0) {\r
+ entry.second = lc_failed; \r
+ } else {\r
+ entry.second = lc_complete; \r
+ }\r
+ \r
+ ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);\r
+ if (ret < 0) {\r
+ dout(0) << "RGWLC::process() failed to set entry " << obj_names[index] << dendl;\r
+ }\r
+clean: \r
+ l.unlock(&store->lc_pool_ctx, obj_names[index]);\r
+ dout(20) << "RGWLC::bucket_lc_post() unlock" << obj_names[index] << dendl;\r
+ return 0;\r
+ }while(1);\r
+}\r
+\r
+int RGWLC::list_lc_progress(map<string, int>& progress_map)\r
+{\r
+ int index = 0;\r
+ for(; index <max_objs; index++) {\r
+ map<string, int > entries;\r
+ int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], entries);\r
+ if (ret < 0)
+ return ret;\r
+ map<string, int>::iterator iter;\r
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {
+ progress_map.insert(*iter);\r
+ }\r
+ }\r
+ return 0;\r
+}\r
+\r
+int RGWLC::process()\r
+{\r
+ int max_secs = cct->_conf->rgw_lc_lock_max_time;\r
+\r
+ unsigned start;\r
+ int ret = get_random_bytes((char *)&start, sizeof(start));\r
+ if (ret < 0)\r
+ return ret;\r
+\r
+ for (int i = 0; i < max_objs; i++) {\r
+ int index = (i + start) % max_objs;\r
+ ret = process(index, max_secs);\r
+ if (ret < 0)\r
+ return ret;\r
+ }\r
+\r
+ return 0;\r
+}\r
+\r
+int RGWLC::process(int index, int max_lock_secs)\r
+{\r
+ rados::cls::lock::Lock l(lc_index_lock_name);\r
+ do {\r
+ utime_t now = ceph_clock_now(g_ceph_context);\r
+ pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS\r
+ if (max_lock_secs <= 0)\r
+ return -EAGAIN;\r
+ \r
+ utime_t time(max_lock_secs, 0);\r
+ l.set_duration(time);\r
+\r
+ int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);\r
+ if (ret == -EBUSY) { /* already locked by another lc processor */\r
+ dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;\r
+ sleep(10);\r
+ continue;\r
+ }\r
+ if (ret < 0)\r
+ return 0;\r
+\r
+ string marker;\r
+ cls_rgw_lc_obj_head head;\r
+ ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head);\r
+ if (ret < 0) {\r
+ dout(0) << "RGWLC::process() failed to get obj head " << obj_names[index] << ret << dendl;\r
+ goto exit;\r
+ }\r
+\r
+ if(!if_already_run_today(head.start_date)) {\r
+ head.start_date = now;\r
+ head.marker.clear();\r
+ ret = bucket_lc_prepare(index);\r
+ if (ret < 0) {\r
+ dout(0) << "RGWLC::process() failed to update lc object " << obj_names[index] << ret << dendl;\r
+ goto exit;\r
+ }\r
+ }\r
+\r
+ ret = cls_rgw_lc_get_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry);\r
+ if (ret < 0) {\r
+ dout(0) << "RGWLC::process() failed to get obj entry " << obj_names[index] << dendl;\r
+ goto exit;\r
+ }\r
+\r
+ if (entry.first.empty())\r
+ goto exit;\r
+\r
+ entry.second = lc_processing;\r
+ ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);\r
+ if (ret < 0) {\r
+ dout(0) << "RGWLC::process() failed to set obj entry " << obj_names[index] << entry.first << entry.second << dendl;\r
+ goto exit;\r
+ }\r
+\r
+ head.marker = entry.first;\r
+ ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index], head);\r
+ if (ret < 0) {\r
+ dout(0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;\r
+ goto exit;\r
+ }\r
+ l.unlock(&store->lc_pool_ctx, obj_names[index]);\r
+ ret = bucket_lc_process(entry.first);\r
+ ret = bucket_lc_post(index, max_lock_secs, head, entry, ret); \r
+ continue;\r
+exit: \r
+ l.unlock(&store->lc_pool_ctx, obj_names[index]);\r
+ return 0;\r
+ \r
+ }while(1);\r
+ \r
+}\r
+\r
+void RGWLC::start_processor()\r
+{\r
+ worker = new LCWorker(cct, this);\r
+ worker->create();\r
+}\r
+\r
+void RGWLC::stop_processor()\r
+{\r
+ if (worker) {\r
+ worker->stop();\r
+ worker->join();\r
+ }\r
+ delete worker;\r
+ worker = NULL;\r
+}\r
+\r
+void RGWLC::LCWorker::stop()\r
+{\r
+ Mutex::Locker l(lock);\r
+ cond.Signal();\r
+}\r
+\r
+bool RGWLC::going_down()\r
+{\r
+ return false;\r
+}\r
+\r
+bool RGWLC::LCWorker::should_work(utime_t& now)\r
+{\r
+ int start_hour;\r
+ int start_minite;\r
+ int end_hour;\r
+ int end_minite;\r
+ string worktime = cct->_conf->rgw_lifecycle_work_time;\r
+ sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minite, &end_hour, &end_minite);\r
+ struct tm bdt;\r
+ time_t tt = now.sec();\r
+ localtime_r(&tt, &bdt);\r
+ if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minite)||\r
+ (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minite)) {\r
+ return true;\r
+ } else {\r
+ return false;\r
+ }\r
+\r
+}\r
+\r
+int RGWLC::LCWorker::shedule_next_start_time(utime_t& now)\r
+{\r
+ int start_hour;\r
+ int start_minite;\r
+ int end_hour;\r
+ int end_minite;\r
+ string worktime = cct->_conf->rgw_lifecycle_work_time;\r
+ sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minite, &end_hour, &end_minite);\r
+ struct tm bdt;\r
+ time_t tt = now.sec();\r
+ time_t nt;\r
+ localtime_r(&tt, &bdt);\r
+ bdt.tm_hour = start_hour;\r
+ bdt.tm_min = start_minite;\r
+ bdt.tm_sec = 0;\r
+ nt = mktime(&bdt);\r
+ return (nt+24*60*60 - tt);\r
+}\r
+\r
--- /dev/null
+#ifndef CEPH_RGW_LC_H\r
+#define CEPH_RGW_LC_H\r
+\r
+#include <map>\r
+#include <string>\r
+#include <iostream>\r
+#include <include/types.h>\r
+\r
+#include "common/debug.h"\r
+\r
+#include "include/types.h"\r
+#include "include/atomic.h"\r
+#include "include/rados/librados.hpp"\r
+#include "common/Mutex.h"\r
+#include "common/Cond.h"\r
+#include "common/Thread.h"\r
+#include "rgw_common.h"\r
+#include "rgw_rados.h"\r
+#include "cls/rgw/cls_rgw_types.h"\r
+\r
+using namespace std;\r
+#define HASH_PRIME 7877\r
+static string lc_oid_prefix = "lc";\r
+static string lc_index_lock_name = "lc_process";\r
+\r
+extern const char* LC_STATUS[];\r
+\r
+typedef enum {\r
+ lc_uninitial = 0,\r
+ lc_processing,\r
+ lc_failed,\r
+ lc_complete,\r
+}LC_BUCKET_STATUS;\r
+\r
+class LCExpiration\r
+{\r
+protected:\r
+ string days;\r
+public:\r
+ LCExpiration() {}\r
+ ~LCExpiration() {}\r
+\r
+ void encode(bufferlist& bl) const {\r
+ ENCODE_START(2, 2, bl);\r
+ ::encode(days, bl);\r
+ ENCODE_FINISH(bl);\r
+ }\r
+ void decode(bufferlist::iterator& bl) {\r
+ DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);\r
+ ::decode(days, bl);\r
+ DECODE_FINISH(bl);\r
+ }\r
+ void dump(Formatter *f) const;\r
+// static void generate_test_instances(list<ACLOwner*>& o);\r
+ void set_days(const string& _days) { days = _days; }\r
+ bool get_days(int* _days) {*_days = atoi(days.c_str()); return true; }\r
+};\r
+WRITE_CLASS_ENCODER(LCExpiration)\r
+\r
+class LCRule\r
+{\r
+protected:\r
+ string id;\r
+ string prefix;\r
+ string status;\r
+ LCExpiration expiration;\r
+\r
+public:\r
+\r
+ LCRule(){};\r
+ ~LCRule(){};\r
+\r
+ bool get_id(string& _id) {\r
+ _id = id;\r
+ return true;\r
+ }\r
+\r
+ bool get_status(string& _status) {\r
+ _status = status;\r
+ return true;\r
+ }\r
+ \r
+ bool get_prefix(string& _prefix) {\r
+ _prefix = prefix;\r
+ return true;\r
+ }\r
+\r
+ bool get_expiration(LCExpiration& _expriation) {\r
+ _expriation = expiration;\r
+ return true;\r
+ }\r
+\r
+ void set_id(string*_id) {\r
+ id = *_id;\r
+ }\r
+\r
+ void set_prefix(string*_prefix) {\r
+ prefix = *_prefix;\r
+ }\r
+\r
+ void set_status(string*_status) {\r
+ status = *_status;\r
+ }\r
+\r
+ void set_expiration(LCExpiration*_expiration) {\r
+ expiration = *_expiration;\r
+ }\r
+ \r
+ void encode(bufferlist& bl) const {\r
+ ENCODE_START(1, 1, bl);\r
+ ::encode(id, bl);\r
+ ::encode(prefix, bl);\r
+ ::encode(status, bl);\r
+ ::encode(expiration, bl);\r
+ ENCODE_FINISH(bl);\r
+ }\r
+ void decode(bufferlist::iterator& bl) {\r
+ DECODE_START_LEGACY_COMPAT_LEN(1, 1, 1, bl);\r
+ ::decode(id, bl);\r
+ ::decode(prefix, bl);\r
+ ::decode(status, bl);\r
+ ::decode(expiration, bl);\r
+ DECODE_FINISH(bl);\r
+ }\r
+\r
+};\r
+WRITE_CLASS_ENCODER(LCRule)\r
+\r
+class RGWLifecycleConfiguration\r
+{\r
+protected:\r
+ CephContext *cct;\r
+ map<string, int> prefix_map;\r
+ multimap<string, LCRule> rule_map;\r
+ void _add_rule(LCRule *rule);\r
+public:\r
+ RGWLifecycleConfiguration(CephContext *_cct) : cct(_cct) {}\r
+ RGWLifecycleConfiguration() : cct(NULL) {}\r
+\r
+ void set_ctx(CephContext *ctx) {\r
+ cct = ctx;\r
+ }\r
+\r
+ virtual ~RGWLifecycleConfiguration() {}\r
+\r
+// int get_perm(string& id, int perm_mask);\r
+// int get_group_perm(ACLGroupTypeEnum group, int perm_mask);\r
+ void encode(bufferlist& bl) const {\r
+ ENCODE_START(1, 1, bl);\r
+ ::encode(rule_map, bl);\r
+ ENCODE_FINISH(bl);\r
+ }\r
+ void decode(bufferlist::iterator& bl) {\r
+ DECODE_START_LEGACY_COMPAT_LEN(1, 1, 1, bl);\r
+ ::decode(rule_map, bl);\r
+ multimap<string, LCRule>::iterator iter;\r
+ for (iter = rule_map.begin(); iter != rule_map.end(); ++iter) {\r
+ LCRule& rule = iter->second;\r
+ _add_rule(&rule);\r
+ }\r
+ DECODE_FINISH(bl);\r
+ }\r
+ void dump(Formatter *f) const;\r
+// static void generate_test_instances(list<RGWAccessControlList*>& o);\r
+\r
+ void add_rule(LCRule* rule);\r
+\r
+ multimap<string, LCRule>& get_rule_map() { return rule_map; }\r
+ map<string, int>& get_prefix_map() { return prefix_map; }\r
+/*\r
+ void create_default(string id, string name) {\r
+ ACLGrant grant;\r
+ grant.set_canon(id, name, RGW_PERM_FULL_CONTROL);\r
+ add_grant(&grant);\r
+ }\r
+*/\r
+};\r
+WRITE_CLASS_ENCODER(RGWLifecycleConfiguration)\r
+\r
+class RGWLC {\r
+ CephContext *cct;\r
+ RGWRados *store;\r
+ int max_objs;\r
+ string *obj_names;\r
+\r
+ class LCWorker : public Thread {\r
+ CephContext *cct;\r
+ RGWLC *lc;\r
+ Mutex lock;\r
+ Cond cond;\r
+\r
+ public:\r
+ LCWorker(CephContext *_cct, RGWLC *_lc) : cct(_cct), lc(_lc), lock("LCWorker") {}\r
+ void *entry();\r
+ void stop();\r
+ bool should_work(utime_t& now);\r
+ int shedule_next_start_time(utime_t& now);\r
+ };\r
+ \r
+ public:\r
+ LCWorker *worker;\r
+public:\r
+ RGWLC() : cct(NULL), store(NULL), worker(NULL) {}\r
+ ~RGWLC() {\r
+ stop_processor();\r
+ finalize();\r
+ }\r
+\r
+ void initialize(CephContext *_cct, RGWRados *_store);\r
+ void finalize();\r
+\r
+ int process();\r
+ int process(int index, int max_secs);\r
+ bool if_already_run_today(time_t& start_date);\r
+ int list_lc_progress(map<string, int>& progress_map);\r
+ int bucket_lc_prepare(int index);\r
+ int bucket_lc_process(string& shard_id);\r
+ int bucket_lc_post(int index, int max_lock_sec, cls_rgw_lc_obj_head& head, \r
+ pair<string, int >& entry, int& result);\r
+ bool going_down();\r
+ void start_processor();\r
+ void stop_processor();\r
+};\r
+\r
+\r
+\r
+#endif\r
--- /dev/null
+#include <string.h>\r
+\r
+#include <iostream>\r
+#include <map>\r
+\r
+#include "include/types.h"\r
+\r
+#include "rgw_lc_s3.h"\r
+\r
+\r
+#define dout_subsys ceph_subsys_rgw\r
+\r
+using namespace std;\r
+\r
+bool LCExpiration_S3::xml_end(const char * el) {\r
+ LCDays_S3 *lc_days = static_cast<LCDays_S3 *>(find_first("Days"));\r
+
+ // ID is mandatory
+ if (!lc_days)\r
+ return false;
+ days = lc_days->get_data();\r
+ return true;
+}\r
+\r
+bool RGWLifecycleConfiguration_S3::xml_end(const char *el) {\r
+ XMLObjIter iter = find("Rule");\r
+ LCRule_S3 *rule = static_cast<LCRule_S3 *>(iter.get_next());\r
+ while (rule) {\r
+ add_rule(rule);\r
+ rule = static_cast<LCRule_S3 *>(iter.get_next());\r
+ }\r
+ return true;\r
+}\r
+\r
+bool LCRule_S3::xml_end(const char *el) {\r
+ LCID_S3 *lc_id;\r
+ LCPrefix_S3 *lc_prefix;\r
+ LCStatus_S3 *lc_status;\r
+ LCExpiration_S3 *lc_expiration;\r
+\r
+ id.clear();\r
+ prefix.clear();\r
+ status.clear();\r
+ \r
+ lc_id = static_cast<LCID_S3 *>(find_first("ID"));\r
+ if (!lc_id)\r
+ return false;\r
+ id = lc_id->get_data();\r
+\r
+ lc_prefix = static_cast<LCPrefix_S3 *>(find_first("Prefix"));\r
+ if (!lc_prefix)\r
+ return false;\r
+ prefix = lc_prefix->get_data();\r
+\r
+ lc_status = static_cast<LCStatus_S3 *>(find_first("Status"));\r
+ if (!lc_status)\r
+ return false;\r
+ status = lc_status->get_data();\r
+ \r
+ lc_expiration = static_cast<LCExpiration_S3 *>(find_first("Expiration"));\r
+ if (!lc_expiration)\r
+ return false;\r
+ expiration = *lc_expiration;\r
+\r
+ return true;\r
+}\r
+\r
+void LCRule_S3::to_xml(CephContext *cct, ostream& out) {\r
+ LCExpiration_S3& expir = static_cast<LCExpiration_S3&>(expiration);\r
+ out << "<Rule>" ;\r
+ out << "<ID>" << id << "</ID>";\r
+ out << "<Prefix>" << prefix << "</Prefix>";\r
+ out << "<Status>" << status << "</Status>";\r
+ expir.to_xml(out);\r
+ out << "</Rule>";\r
+}\r
+\r
+int RGWLifecycleConfiguration_S3::rebuild(RGWRados *store, RGWLifecycleConfiguration& dest)\r
+{\r
+ multimap<string, LCRule>::iterator iter;\r
+ for (iter = rule_map.begin(); iter != rule_map.end(); ++iter) {\r
+ LCRule& src_rule = iter->second;\r
+ bool rule_ok = true;\r
+\r
+ if (rule_ok) {\r
+ dest.add_rule(&src_rule);\r
+ }\r
+ }\r
+\r
+ return 0; \r
+}\r
+\r
+XMLObj *RGWLCXMLParser_S3::alloc_obj(const char *el)\r
+{\r
+ XMLObj * obj = NULL;\r
+ if (strcmp(el, "LifecycleConfiguration") == 0) {\r
+ obj = new RGWLifecycleConfiguration_S3(cct);\r
+ } else if (strcmp(el, "Rule") == 0) {\r
+ obj = new LCRule_S3();\r
+ } else if (strcmp(el, "ID") == 0) {\r
+ obj = new LCID_S3();\r
+ } else if (strcmp(el, "Prefix") == 0) {\r
+ obj = new LCPrefix_S3();\r
+ } else if (strcmp(el, "Status") == 0) {\r
+ obj = new LCStatus_S3();\r
+ } else if (strcmp(el, "Expiration") == 0) {\r
+ obj = new LCExpiration_S3();\r
+ } else if (strcmp(el, "Days") == 0) {\r
+ obj = new LCDays_S3();\r
+ }\r
+ return obj;\r
+}\r
--- /dev/null
+#ifndef CEPH_RGW_LC_S3_H\r
+#define CEPH_RGW_LC_S3_H\r
+\r
+#include <map>\r
+#include <string>\r
+#include <iostream>\r
+#include <include/types.h>\r
+\r
+#include <expat.h>\r
+\r
+#include "include/str_list.h"\r
+#include "rgw_lc.h"\r
+#include "rgw_xml.h"\r
+\r
+\r
+\r
+using namespace std;\r
+\r
+class LCRule_S3 : public LCRule, public XMLObj\r
+{\r
+public:\r
+ LCRule_S3() {}\r
+ ~LCRule_S3() {}\r
+\r
+ void to_xml(CephContext *cct, ostream& out);\r
+ bool xml_end(const char *el);\r
+ bool xml_start(const char *el, const char **attr);\r
+};\r
+\r
+class LCID_S3 : public XMLObj\r
+{\r
+public:\r
+ LCID_S3() {}\r
+ ~LCID_S3() {}\r
+ string& to_str() { return data; }\r
+};\r
+\r
+class LCPrefix_S3 : public XMLObj\r
+{\r
+public:\r
+ LCPrefix_S3() {}\r
+ ~LCPrefix_S3() {}\r
+ string& to_str() { return data; }\r
+};\r
+\r
+class LCStatus_S3 : public XMLObj\r
+{\r
+public:\r
+ LCStatus_S3() {}\r
+ ~LCStatus_S3() {}\r
+ string& to_str() { return data; }\r
+};\r
+\r
+class LCDays_S3 : public XMLObj\r
+{\r
+public:\r
+ LCDays_S3() {}\r
+ ~LCDays_S3() {}\r
+ string& to_str() { return data; }\r
+};\r
+\r
+class LCExpiration_S3 : public LCExpiration, public XMLObj\r
+{\r
+public:\r
+ LCExpiration_S3() {}\r
+ ~LCExpiration_S3() {}\r
+\r
+ bool xml_end(const char *el);\r
+ void to_xml(ostream& out) {\r
+ out << "<Expiration>" << "<Days>" << days << "</Days>"<< "</Expiration>";\r
+ }\r
+};\r
+\r
+class RGWLCXMLParser_S3 : public RGWXMLParser\r
+{\r
+ CephContext *cct;\r
+\r
+ XMLObj *alloc_obj(const char *el);\r
+public:\r
+ RGWLCXMLParser_S3(CephContext *_cct) : cct(_cct) {}\r
+};\r
+\r
+class RGWLifecycleConfiguration_S3 : public RGWLifecycleConfiguration, public XMLObj\r
+{\r
+public:\r
+ RGWLifecycleConfiguration_S3(CephContext *_cct) : RGWLifecycleConfiguration(_cct) {}\r
+ ~RGWLifecycleConfiguration_S3() {}\r
+\r
+ bool xml_end(const char *el);\r
+\r
+ void to_xml(ostream& out) {\r
+ out << "<LifecycleConfiguration xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">";\r
+ multimap<string, LCRule>::iterator iter;\r
+ for (iter = rule_map.begin(); iter != rule_map.end(); ++iter) {\r
+ LCRule_S3& rule = static_cast<LCRule_S3&>(iter->second);\r
+ rule.to_xml(cct, out);\r
+ }\r
+ out << "</LifecycleConfiguration>";\r
+ }\r
+ int rebuild(RGWRados *store, RGWLifecycleConfiguration& dest);\r
+};\r
+\r
+\r
+#endif\r
int r = 0;
RGWRados *store = RGWStoreManager::get_storage(g_ceph_context,
- g_conf->rgw_enable_gc_threads, g_conf->rgw_enable_quota_threads);
+ g_conf->rgw_enable_gc_threads, g_conf->rgw_enable_lc_threads, g_conf->rgw_enable_quota_threads);
if (!store) {
mutex.Lock();
init_timer.cancel_all_events();
common_init_finish(g_ceph_context);
- store = RGWStoreManager::get_storage(g_ceph_context, false, false);
+ store = RGWStoreManager::get_storage(g_ceph_context, false, false, false);
if (!store) {
std::cerr << "couldn't init storage provider" << std::endl;
return EIO;
#include "rgw_multi_del.h"
#include "rgw_cors.h"
#include "rgw_cors_s3.h"
-
+#include "rgw_lc.h"
+#include "rgw_lc_s3.h"
#include "rgw_client_io.h"
+#include "cls/lock/cls_lock_client.h"
+#include "cls/rgw/cls_rgw_client.h"
+
#define dout_subsys ceph_subsys_rgw
using namespace std;
+using namespace librados;
using ceph::crypto::MD5;
+
static string mp_ns = RGW_OBJ_NS_MULTIPART;
static string shadow_ns = RGW_OBJ_NS_SHADOW;
return 0;
}
+int RGWPutLC::verify_permission()
+{
+ bool perm;
+ ldout(s->cct, 0) << "ccc" <<s->bucket_acl << dendl;
+ perm = s->bucket_acl->verify_permission(s->user.user_id, RGW_PERM_WRITE_ACP, RGW_PERM_WRITE_ACP);
+ if (!perm)
+ return -EACCES;
+
+ return 0;
+}
+
+int RGWDeleteLC::verify_permission()
+{
+ bool perm;
+ ldout(s->cct, 0) << "ccc" <<s->bucket_acl << dendl;
+ perm = s->bucket_acl->verify_permission(s->user.user_id, RGW_PERM_WRITE_ACP, RGW_PERM_WRITE_ACP);
+ if (!perm)
+ return -EACCES;
+
+ return 0;
+}
+
void RGWPutACLs::pre_exec()
{
rgw_bucket_object_pre_exec(s);
}
+void RGWPutLC::pre_exec()
+{
+ rgw_bucket_object_pre_exec(s);
+}
+
+void RGWDeleteLC::pre_exec()
+{
+ rgw_bucket_object_pre_exec(s);
+}
+
void RGWPutACLs::execute()
{
bufferlist bl;
}
}
+static void get_lc_oid(struct req_state *s, string& oid)
+{
+ string shard_id = s->bucket.name + ':' +s->bucket.bucket_id;
+ int max_objs = (s->cct->_conf->rgw_lc_max_objs > HASH_PRIME)?HASH_PRIME:s->cct->_conf->rgw_lc_max_objs;
+ int index = ceph_str_hash_linux(shard_id.c_str(), shard_id.size()) % HASH_PRIME % max_objs;
+ oid = lc_oid_prefix;
+ char buf[32];
+ snprintf(buf, 32, ".%d", index);
+ oid.append(buf);
+ return;
+}
+void RGWPutLC::execute()
+{
+ bufferlist bl;
+
+ RGWLifecycleConfiguration_S3 *config = NULL;
+ RGWLCXMLParser_S3 parser(s->cct);
+ RGWLifecycleConfiguration_S3 new_config(s->cct);
+ ret = 0;
+
+ if (!parser.init()) {
+ ret = -EINVAL;
+ return;
+ }
+
+ ret = get_params();
+ if (ret < 0)
+ return;
+
+ ldout(s->cct, 15) << "read len=" << len << " data=" << (data ? data : "") << dendl;
+
+ if (!parser.parse(data, len, 1)) {
+ ret = -EACCES;
+ return;
+ }
+ config = static_cast<RGWLifecycleConfiguration_S3 *>(parser.find_first("LifecycleConfiguration"));
+ if (!config) {
+ ret = -EINVAL;
+ return;
+ }
+
+ if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) {
+ ldout(s->cct, 15) << "Old LifecycleConfiguration";
+ config->to_xml(*_dout);
+ *_dout << dendl;
+ }
+
+ ret = config->rebuild(store, new_config);
+ if (ret < 0)
+ return;
+
+ if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) {
+ ldout(s->cct, 15) << "New LifecycleConfiguration:";
+ new_config.to_xml(*_dout);
+ *_dout << dendl;
+ }
+
+ new_config.encode(bl);
+ map<string, bufferlist> attrs;
+ attrs[RGW_ATTR_LC] = bl;
+ ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, NULL, NULL);
+ if (ret < 0)
+ return;
+ string shard_id = s->bucket.tenant + ':' + s->bucket.name + ':' + s->bucket.bucket_id;
+ string oid;
+ get_lc_oid(s, oid);
+ pair<string, int> entry(shard_id, lc_uninitial);
+ int max_lock_secs = s->cct->_conf->rgw_lc_lock_max_time;
+ rados::cls::lock::Lock l(lc_index_lock_name);
+ utime_t time(max_lock_secs, 0);
+ l.set_duration(time);
+ librados::IoCtx *ctx = store->get_lc_pool_ctx();
+ do {
+ ret = l.lock_exclusive(ctx, oid);
+ if (ret == -EBUSY) {
+ dout(0) << "RGWLC::RGWPutLC() failed to acquire lock on, sleep 5, try again" << oid << dendl;
+ sleep(5);
+ continue;
+ }
+ if (ret < 0) {
+ dout(0) << "RGWLC::RGWPutLC() failed to acquire lock " << oid << ret << dendl;
+ break;
+ }
+ ret = cls_rgw_lc_set_entry(*ctx, oid, entry);
+ if (ret < 0) {
+ dout(0) << "RGWLC::RGWPutLC() failed to set entry " << oid << ret << dendl;
+ }
+ break;
+ }while(1);
+ l.unlock(ctx, oid);
+ return;
+}
+
+void RGWDeleteLC::execute()
+{
+ bufferlist bl;
+ map<string, bufferlist> attrs, rmattrs;
+ rmattrs[RGW_ATTR_LC] = bl;
+ ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &rmattrs, NULL);
+ string shard_id = s->bucket.name + ':' +s->bucket.bucket_id;
+ pair<string, int> entry(shard_id, lc_uninitial);
+ string oid;
+ get_lc_oid(s, oid);
+ int max_lock_secs = s->cct->_conf->rgw_lc_lock_max_time;
+ librados::IoCtx *ctx = store->get_lc_pool_ctx();
+ rados::cls::lock::Lock l(lc_index_lock_name);
+ utime_t time(max_lock_secs, 0);
+ l.set_duration(time);
+ do {
+ ret = l.lock_exclusive(ctx, oid);
+ if (ret == -EBUSY) {
+ dout(0) << "RGWLC::RGWPutLC() failed to acquire lock on, sleep 5, try again" << oid << dendl;
+ sleep(5);
+ continue;
+ }
+ if (ret < 0) {
+ dout(0) << "RGWLC::RGWPutLC() failed to acquire lock " << oid << ret << dendl;
+ break;
+ }
+ ret = cls_rgw_lc_rm_entry(*ctx, oid, entry);
+ if (ret < 0) {
+ dout(0) << "RGWLC::RGWPutLC() failed to set entry " << oid << ret << dendl;
+ }
+ break;
+ }while(1);
+ l.unlock(ctx, oid);
+ return;
+}
+
int RGWGetCORS::verify_permission()
{
if (s->user.user_id.compare(s->bucket_owner.get_id()) != 0)
virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; }
};
+class RGWPutLC : public RGWOp {
+protected:
+ int ret;
+ size_t len;
+ char *data;
+
+public:
+ RGWPutLC() {
+ ret = 0;
+ len = 0;
+ data = NULL;
+ }
+ virtual ~RGWPutLC() {
+ free(data);
+ }
+
+ int verify_permission();
+ void pre_exec();
+ void execute();
+
+// virtual int get_policy_from_state(RGWRados *store, struct req_state *s, stringstream& ss) { return 0; }
+ virtual int get_params() = 0;
+ virtual void send_response() = 0;
+ virtual const string name() { return "put_lifecycle"; }
+ virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; }
+};
+
+class RGWDeleteLC : public RGWOp {
+protected:
+ int ret;
+ size_t len;
+ char *data;
+
+public:
+ RGWDeleteLC() {
+ ret = 0;
+ len = 0;
+ data = NULL;
+ }
+ virtual ~RGWDeleteLC() {
+ free(data);
+ }
+
+ int verify_permission();
+ void pre_exec();
+ void execute();
+
+ virtual void send_response() = 0;
+ virtual const string name() { return "delete_lifecycle"; }
+ virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; }
+};
+
class RGWGetCORS : public RGWOp {
protected:
int ret;
#include "rgw_cache.h"
#include "rgw_acl.h"
#include "rgw_acl_s3.h" /* for dumping s3policy in debug log */
+#include "rgw_lc.h"
+#include "rgw_lc_s3.h"
#include "rgw_metadata.h"
#include "rgw_bucket.h"
#include "rgw_log.h"
#include "rgw_gc.h"
+#include "rgw_lc.h"
+
#include "rgw_object_expirer_core.h"
#define dout_subsys ceph_subsys_rgw
domain_root = ".rgw";
control_pool = ".rgw.control";
gc_pool = ".rgw.gc";
+ lc_pool = ".rgw.lc";
log_pool = ".log";
intent_log_pool = ".intent-log";
usage_log_pool = ".usage";
if (ret < 0)
return ret;
+ ret = open_lc_pool_ctx();
+ if (ret < 0)
+ return ret;
+
ret = open_objexp_pool_ctx();
if (ret < 0)
return ret;
obj_expirer->start_processor();
}
+ lc = new RGWLC();
+ lc->initialize(cct, this);
+
+ if (use_lc_thread)
+ lc->start_processor();
+
quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards :
return r;
}
+int RGWRados::open_lc_pool_ctx()
+{
+ const char *lc_pool = zone.lc_pool.name.c_str();
+ librados::Rados *rad = get_rados_handle();
+ int r = rad->ioctx_create(lc_pool, lc_pool_ctx);
+ if (r == -ENOENT) {
+ r = rad->pool_create(lc_pool);
+ if (r == -EEXIST)
+ r = 0;
+ if (r < 0)
+ return r;
+
+ r = rad->ioctx_create(lc_pool, lc_pool_ctx);
+ }
+
+ return r;
+}
+
int RGWRados::open_objexp_pool_ctx()
{
const char * const pool_name = zone.log_pool.name.c_str();
return gc->process();
}
+int RGWRados::list_lc_progress(map<string, int>& progress_map)
+{
+ return lc->list_lc_progress(progress_map);
+}
+
+int RGWRados::process_lc()
+{
+ return lc->process();
+}
+
int RGWRados::process_expire_objects()
{
obj_expirer->inspect_all_shards(utime_t(), ceph_clock_now(cct));
return ++max_bucket_id;
}
-RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads)
+RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads)
{
int use_cache = cct->_conf->rgw_cache_enabled;
RGWRados *store = NULL;
store = new RGWCache<RGWRados>;
}
- if (store->initialize(cct, use_gc_thread, quota_threads) < 0) {
+ if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads) < 0) {
delete store;
return NULL;
}
class SafeTimer;
class ACLOwner;
class RGWGC;
+class RGWLC;
class RGWObjectExpirer;
/* flags for put_obj_meta() */
rgw_bucket domain_root;
rgw_bucket control_pool;
rgw_bucket gc_pool;
+ rgw_bucket lc_pool;
rgw_bucket log_pool;
rgw_bucket intent_log_pool;
rgw_bucket usage_log_pool;
::encode(domain_root, bl);
::encode(control_pool, bl);
::encode(gc_pool, bl);
+ ::encode(lc_pool, bl);
::encode(log_pool, bl);
::encode(intent_log_pool, bl);
::encode(usage_log_pool, bl);
::decode(domain_root, bl);
::decode(control_pool, bl);
::decode(gc_pool, bl);
+ ::decode(lc_pool, bl);
::decode(log_pool, bl);
::decode(intent_log_pool, bl);
::decode(usage_log_pool, bl);
class RGWRados
{
friend class RGWGC;
+ friend class RGWLC;
friend class RGWObjectExpirer;
friend class RGWStateLog;
friend class RGWReplicaLogger;
/** Open the pool used as root for this gateway */
int open_root_pool_ctx();
int open_gc_pool_ctx();
+ int open_lc_pool_ctx();
int open_objexp_pool_ctx();
int open_bucket_pool_ctx(const string& pool, librados::IoCtx& io_ctx);
};
RGWGC *gc;
+ RGWLC *lc;
RGWObjectExpirer *obj_expirer;
bool use_gc_thread;
+ bool use_lc_thread;
bool quota_threads;
int num_watchers;
std::map<pthread_t, int> rados_map;
librados::IoCtx gc_pool_ctx; // .rgw.gc
+ librados::IoCtx lc_pool_ctx; // .rgw.lc
librados::IoCtx objexp_pool_ctx;
bool pools_initialized;
public:
RGWRados() : max_req_id(0), lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
- gc(NULL), obj_expirer(NULL), use_gc_thread(false), quota_threads(false),
+ gc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false),
num_watchers(0), watchers(NULL),
watch_initialized(false),
bucket_id_lock("rados_bucket_id"),
return max_req_id.inc();
}
+ librados::IoCtx* get_lc_pool_ctx() {
+ return &lc_pool_ctx;
+ }
void set_context(CephContext *_cct) {
cct = _cct;
}
CephContext *ctx() { return cct; }
/** do all necessary setup of the storage device */
- int initialize(CephContext *_cct, bool _use_gc_thread, bool _quota_threads) {
+ int initialize(CephContext *_cct, bool _use_gc_thread, bool _use_lc_thread, bool _quota_threads) {
set_context(_cct);
use_gc_thread = _use_gc_thread;
+ use_lc_thread = _use_lc_thread;
quota_threads = _quota_threads;
return initialize();
}
int process_expire_objects();
int defer_gc(void *ctx, rgw_obj& obj);
+ int process_lc();
+ int list_lc_progress(map<string, int>& progress_map);
+
int bucket_check_index(rgw_bucket& bucket,
map<RGWObjCategory, RGWStorageStats> *existing_stats,
map<RGWObjCategory, RGWStorageStats> *calculated_stats);
class RGWStoreManager {
public:
RGWStoreManager() {}
- static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool quota_threads) {
- RGWRados *store = init_storage_provider(cct, use_gc_thread, quota_threads);
+ static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads) {
+ RGWRados *store = init_storage_provider(cct, use_gc_thread, use_lc_thread, quota_threads);
return store;
}
static RGWRados *get_raw_storage(CephContext *cct) {
RGWRados *store = init_raw_storage_provider(cct);
return store;
}
- static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads);
+ static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads);
static RGWRados *init_raw_storage_provider(CephContext *cct);
static void close_storage(RGWRados *store);
return ret;
}
+int RGWPutLC_ObjStore::get_params()
+{
+ size_t cl = 0;
+ if (s->length)
+ cl = atoll(s->length);
+ if (cl) {
+ data = (char *)malloc(cl + 1);
+ if (!data) {
+ ret = -ENOMEM;
+ return ret;
+ }
+ int read_len;
+ int r = s->cio->read(data, cl, &read_len);
+ len = read_len;
+ if (r < 0)
+ return r;
+ data[len] = '\0';
+ } else {
+ len = 0;
+ }
+
+ return ret;
+}
+
static int read_all_chunked_input(req_state *s, char **pdata, int *plen, int max_read)
{
#define READ_CHUNK 4096
int get_params();
};
+class RGWPutLC_ObjStore : public RGWPutLC {
+public:
+ RGWPutLC_ObjStore() {}
+ ~RGWPutLC_ObjStore() {}
+
+ int get_params();
+};
+
+class RGWDeleteLC_ObjStore : public RGWDeleteLC {
+public:
+ RGWDeleteLC_ObjStore() {}
+ ~RGWDeleteLC_ObjStore() {}
+
+};
+
class RGWGetCORS_ObjStore : public RGWGetCORS {
public:
RGWGetCORS_ObjStore() {}
dump_start(s);
}
+void RGWPutLC_ObjStore_S3::send_response()
+{
+ if (ret)
+ set_req_state_err(s, ret);
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+ dump_start(s);
+}
+
+void RGWDeleteLC_ObjStore_S3::send_response()
+{
+ if (ret == 0)
+ ret = STATUS_NO_CONTENT;
+ if (ret) {
+ set_req_state_err(s, ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+ dump_start(s);
+}
+
void RGWGetCORS_ObjStore_S3::send_response()
{
if (ret) {
return new RGWPutCORS_ObjStore_S3;
} else if (is_request_payment_op()) {
return new RGWSetRequestPayment_ObjStore_S3;
+ } else if(is_lc_op()) {
+ return new RGWPutLC_ObjStore_S3;
}
return new RGWCreateBucket_ObjStore_S3;
}
{
if (is_cors_op()) {
return new RGWDeleteCORS_ObjStore_S3;
+ } else if(is_lc_op()) {
+ return new RGWDeleteLC_ObjStore_S3;
}
return new RGWDeleteBucket_ObjStore_S3;
}
void send_response();
};
+class RGWPutLC_ObjStore_S3 : public RGWPutLC_ObjStore {
+public:
+ RGWPutLC_ObjStore_S3() {}
+ ~RGWPutLC_ObjStore_S3() {}
+
+ void send_response();
+};
+
+class RGWDeleteLC_ObjStore_S3 : public RGWDeleteLC_ObjStore {
+public:
+ RGWDeleteLC_ObjStore_S3() {}
+ ~RGWDeleteLC_ObjStore_S3() {}
+
+ void send_response();
+};
+
class RGWGetCORS_ObjStore_S3 : public RGWGetCORS_ObjStore {
public:
RGWGetCORS_ObjStore_S3() {}
bool is_cors_op() {
return s->info.args.exists("cors");
}
+ bool is_lc_op() {
+ return s->info.args.exists("lifecycle");
+ }
bool is_obj_update_op() {
return is_acl_op() || is_cors_op();
}
gc list dump expired garbage collection objects (specify
--include-all to list all entries, including unexpired)
gc process manually process garbage
+ lc list list all bucket lifecycle progress
+ lc process manually process lifecycle
metadata get get metadata info
metadata put put metadata info
metadata rm remove metadata info
global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
common_init_finish(g_ceph_context);
- store = RGWStoreManager::get_storage(g_ceph_context, false, false);
+ store = RGWStoreManager::get_storage(g_ceph_context, false, false, false);
g_test = new admin_log::test_helper();
finisher = new Finisher(g_ceph_context);
#ifdef GTEST