From abe4b1378cc0e23e0da9c76c034c6d592b81b105 Mon Sep 17 00:00:00 2001 From: Ji Chen Date: Mon, 13 Jun 2016 10:32:34 +0800 Subject: [PATCH] RGW:lifecycle feature[rebase] As same as amazon S3 interface,"PUT Bucket lifecycle" and "DELETE Bucket lifecycle" have been implemented, "GET Bucket lifecycle" not realized yet as S3cmd has not realize it also. The feature`s main point is to remove expire file per day. Files transfer from hot layer to cold layer is not supported. ToDo:Maybe to transfer from replicate pool to EC pool or from ssd to sata pool will be valuable. Now put all buckets which should do lifecycle into shard objects in .rgw.lc pool. lifecycle config file format: sample-rule enable 1 Signed-off-by: Ji Chen --- src/cls/rgw/cls_rgw.cc | 166 ++++++++- src/cls/rgw/cls_rgw_client.cc | 102 ++++++ src/cls/rgw/cls_rgw_client.h | 16 + src/cls/rgw/cls_rgw_ops.h | 158 +++++++++ src/cls/rgw/cls_rgw_types.h | 28 +- src/common/config_opts.h | 7 + src/rgw/CMakeLists.txt | 2 + src/rgw/Makefile.am | 4 + src/rgw/librgw.cc | 1 + src/rgw/rgw_admin.cc | 98 ++++-- src/rgw/rgw_common.cc | 1 + src/rgw/rgw_common.h | 1 + src/rgw/rgw_lc.cc | 546 +++++++++++++++++++++++++++++ src/rgw/rgw_lc.h | 227 ++++++++++++ src/rgw/rgw_lc_s3.cc | 112 ++++++ src/rgw/rgw_lc_s3.h | 104 ++++++ src/rgw/rgw_main.cc | 2 +- src/rgw/rgw_object_expirer.cc | 2 +- src/rgw/rgw_op.cc | 183 ++++++++++ src/rgw/rgw_op.h | 52 +++ src/rgw/rgw_rados.cc | 47 ++- src/rgw/rgw_rados.h | 26 +- src/rgw/rgw_realm_reloader.cc | 1 + src/rgw/rgw_rest.cc | 27 +- src/rgw/rgw_rest.h | 15 + src/rgw/rgw_rest_s3.cc | 25 ++ src/rgw/rgw_rest_s3.h | 19 + src/test/cli/radosgw-admin/help.t | 2 + src/test/test_rgw_admin_opstate.cc | 2 +- 29 files changed, 1933 insertions(+), 43 deletions(-) create mode 100644 src/rgw/rgw_lc.cc create mode 100644 src/rgw/rgw_lc.h create mode 100644 src/rgw/rgw_lc_s3.cc create mode 100644 src/rgw/rgw_lc_s3.h diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index 46c2a3f9b49fc..161c78c971242 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -50,6 +50,12 @@ cls_method_handle_t h_rgw_user_usage_log_trim; cls_method_handle_t h_rgw_gc_set_entry; cls_method_handle_t h_rgw_gc_list; cls_method_handle_t h_rgw_gc_remove; +cls_method_handle_t h_rgw_lc_set_entry; +cls_method_handle_t h_rgw_lc_rm_entry; +cls_method_handle_t h_rgw_lc_get_next_entry; +cls_method_handle_t h_rgw_lc_put_head; +cls_method_handle_t h_rgw_lc_get_head; +cls_method_handle_t h_rgw_lc_list_entries; #define ROUND_BLOCK_SIZE 4096 @@ -2818,7 +2824,7 @@ static int usage_log_read_cb(cls_method_context_t hctx, const string& key, rgw_u rgw_user_bucket ub(puser->to_str(), entry.bucket); rgw_usage_log_entry& le = (*usage)[ub]; le.aggregate(entry); - + return 0; } @@ -2962,7 +2968,7 @@ static int gc_omap_remove(cls_method_context_t hctx, int type, const string& key static bool key_in_index(const string& key, int index_type) { - const string& prefix = gc_index_prefixes[index_type]; + const string& prefix = gc_index_prefixes[index_type]; return (key.compare(0, prefix.size(), prefix) == 0); } @@ -3221,6 +3227,154 @@ static int rgw_cls_gc_remove(cls_method_context_t hctx, bufferlist *in, bufferli return gc_remove(hctx, op.tags); } +static int rgw_cls_lc_set_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_rgw_lc_set_entry_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_lc_set_entry(): failed to decode entry\n"); + return -EINVAL; 
+ } + + bufferlist bl; + ::encode(op.entry, bl); + + int ret = cls_cxx_map_set_val(hctx, op.entry.first, &bl); + return ret; +} + +static int rgw_cls_lc_rm_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_rgw_lc_rm_entry_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_lc_rm_entry(): failed to decode entry\n"); + return -EINVAL; + } + + bufferlist bl; + ::encode(op.entry, bl); + + int ret = cls_cxx_map_remove_key(hctx, op.entry.first); + return ret; +} + +static int rgw_cls_lc_get_next_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + cls_rgw_lc_get_next_entry_ret op_ret; + cls_rgw_lc_get_next_entry_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_lc_rm_entry(): failed to decode entry\n"); + return -EINVAL; + } + + map vals; + string filter_prefix; + int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, 1, &vals); + if (ret < 0) + return ret; + map::iterator it; + pair entry; + if (!vals.empty()) { + it=vals.begin(); + in_iter = it->second.begin(); + try { + ::decode(entry, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_lc_get_next_entry(): failed to decode entry\n"); + return -EIO; + } + } + op_ret.entry = entry; + ::encode(op_ret, *out); + return 0; +} + +static int rgw_cls_lc_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + cls_rgw_lc_list_entries_op op; + bufferlist::iterator in_iter = in->begin(); + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_lc_rm_entry(): failed to decode entry\n"); + return -EINVAL; + } + cls_rgw_lc_list_entries_ret op_ret; + bufferlist::iterator iter; + map vals; + string filter_prefix; + int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, op.max_entries, &vals); + if (ret < 0) + return ret; + map::iterator it; + pair entry; + for (it = vals.begin(); it != vals.end(); it++) { + iter = it->second.begin(); + try { + ::decode(entry, iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_lc_list_entries(): failed to decode entry\n"); + return -EIO; + } + op_ret.entries.insert(entry); + } + ::encode(op_ret, *out); + return 0; +} + +static int rgw_cls_lc_put_head(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_rgw_lc_put_head_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: rgw_cls_lc_set_entry(): failed to decode entry\n"); + return -EINVAL; + } + + bufferlist bl; + ::encode(op.head, bl); + int ret = cls_cxx_map_write_header(hctx,&bl); + return ret; +} + +static int rgw_cls_lc_get_head(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist bl; + int ret = cls_cxx_map_read_header(hctx, &bl); + if (ret < 0) + return ret; + cls_rgw_lc_obj_head head; + if (bl.length() != 0) { + bufferlist::iterator iter = bl.begin(); + try { + ::decode(head, iter); + } catch (buffer::error& err) { + CLS_LOG(0, "ERROR: rgw_cls_lc_get_head(): failed to decode entry %s\n",err.what()); + return -EINVAL; + } + } else { + head.start_date = 0; + head.marker.clear(); + } + cls_rgw_lc_get_head_ret op_ret; + op_ret.head = head; + ::encode(op_ret, *out); + return 0; +} + void __cls_init() { CLS_LOG(1, "Loaded rgw class!"); @@ -3265,6 +3419,14 @@ void __cls_init() 
cls_register_cxx_method(h_class, "gc_list", CLS_METHOD_RD, rgw_cls_gc_list, &h_rgw_gc_list); cls_register_cxx_method(h_class, "gc_remove", CLS_METHOD_RD | CLS_METHOD_WR, rgw_cls_gc_remove, &h_rgw_gc_remove); + /* lifecycle bucket list */ + cls_register_cxx_method(h_class, "lc_set_entry", CLS_METHOD_RD | CLS_METHOD_WR, rgw_cls_lc_set_entry, &h_rgw_lc_set_entry); + cls_register_cxx_method(h_class, "lc_rm_entry", CLS_METHOD_RD | CLS_METHOD_WR, rgw_cls_lc_rm_entry, &h_rgw_lc_rm_entry); + cls_register_cxx_method(h_class, "lc_get_next_entry", CLS_METHOD_RD, rgw_cls_lc_get_next_entry, &h_rgw_lc_get_next_entry); + cls_register_cxx_method(h_class, "lc_put_head", CLS_METHOD_RD| CLS_METHOD_WR, rgw_cls_lc_put_head, &h_rgw_lc_put_head); + cls_register_cxx_method(h_class, "lc_get_head", CLS_METHOD_RD, rgw_cls_lc_get_head, &h_rgw_lc_get_head); + cls_register_cxx_method(h_class, "lc_list_entries", CLS_METHOD_RD, rgw_cls_lc_list_entries, &h_rgw_lc_list_entries); + return; } diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 1cf1156385bb2..41647845691dd 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -650,3 +650,105 @@ void cls_rgw_gc_remove(librados::ObjectWriteOperation& op, const list& t ::encode(call, in); op.exec("rgw", "gc_remove", in); } + +int cls_rgw_lc_get_head(IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head) +{ + bufferlist in, out; + int r = io_ctx.exec(oid, "rgw", "lc_get_head", in, out); + if (r < 0) + return r; + + cls_rgw_lc_get_head_ret ret; + try { + bufferlist::iterator iter = out.begin(); + ::decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + head = ret.head; + + return r; +} + +int cls_rgw_lc_put_head(IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head) +{ + bufferlist in, out; + cls_rgw_lc_put_head_op call; + call.head = head; + ::encode(call, in); + int r = io_ctx.exec(oid, "rgw", "lc_put_head", in, out); + return r; +} + +int cls_rgw_lc_get_next_entry(IoCtx& io_ctx, string& oid, string& marker, pair& entry) +{ + bufferlist in, out; + cls_rgw_lc_get_next_entry_op call; + call.marker = marker; + ::encode(call, in); + int r = io_ctx.exec(oid, "rgw", "lc_get_next_entry", in, out); + if (r < 0) + return r; + + cls_rgw_lc_get_next_entry_ret ret; + try { + bufferlist::iterator iter = out.begin(); + ::decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + entry = ret.entry; + + return r; +} + +int cls_rgw_lc_rm_entry(IoCtx& io_ctx, string& oid, pair& entry) +{ + bufferlist in, out; + cls_rgw_lc_rm_entry_op call; + call.entry = entry; + ::encode(call, in); + int r = io_ctx.exec(oid, "rgw", "lc_rm_entry", in, out); + return r; +} + +int cls_rgw_lc_set_entry(IoCtx& io_ctx, string& oid, pair& entry) +{ + bufferlist in, out; + cls_rgw_lc_rm_entry_op call; + call.entry = entry; + ::encode(call, in); + int r = io_ctx.exec(oid, "rgw", "lc_set_entry", in, out); + return r; +} + +int cls_rgw_lc_list(IoCtx& io_ctx, string& oid, + const string& marker, + uint32_t max_entries, + map& entries) +{ + bufferlist in, out; + cls_rgw_lc_list_entries_op op; + + entries.clear(); + + op.marker = marker; + op.max_entries = max_entries; + + ::encode(op, in); + + int r = io_ctx.exec(oid, "rgw", "lc_list_entries", in, out); + if (r < 0) + return r; + + cls_rgw_lc_list_entries_ret ret; + try { + bufferlist::iterator iter = out.begin(); + ::decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + entries.insert(ret.entries.begin(),ret.entries.end()); + + return r; +} diff --git 
a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 1b02a5eabf4e6..4c68385f5a8ff 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -474,4 +474,20 @@ int cls_rgw_gc_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32 void cls_rgw_gc_remove(librados::ObjectWriteOperation& op, const list& tags); +/* lifecycle */ +int cls_rgw_lc_get_head(librados::IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head); +int cls_rgw_lc_put_head(librados::IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head); +int cls_rgw_lc_get_next_entry(librados::IoCtx& io_ctx, string& oid, string& marker, pair& entry); +int cls_rgw_lc_rm_entry(librados::IoCtx& io_ctx, string& oid, pair& entry); +int cls_rgw_lc_set_entry(librados::IoCtx& io_ctx, string& oid, pair& entry); +int cls_rgw_lc_list(librados::IoCtx& io_ctx, string& oid, + const string& marker, + uint32_t max_entries, + map& entries); + + + + + + #endif diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h index 15a638a392339..4aed26b4f5eb6 100644 --- a/src/cls/rgw/cls_rgw_ops.h +++ b/src/cls/rgw/cls_rgw_ops.h @@ -937,4 +937,162 @@ struct cls_rgw_bi_log_list_ret { }; WRITE_CLASS_ENCODER(cls_rgw_bi_log_list_ret) +struct cls_rgw_lc_get_next_entry_op { + string marker; + cls_rgw_lc_get_next_entry_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(marker, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(marker, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_get_next_entry_op) + +struct cls_rgw_lc_get_next_entry_ret { + pair entry; + + cls_rgw_lc_get_next_entry_ret() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(entry, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(entry, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_get_next_entry_ret) + +struct cls_rgw_lc_rm_entry_op { + pair entry; + cls_rgw_lc_rm_entry_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(entry, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(entry, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_rm_entry_op) + +struct cls_rgw_lc_set_entry_op { + pair entry; + cls_rgw_lc_set_entry_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(entry, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(entry, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_set_entry_op) + +struct cls_rgw_lc_put_head_op { + cls_rgw_lc_obj_head head; + + + cls_rgw_lc_put_head_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(head, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(head, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_put_head_op) + +struct cls_rgw_lc_get_head_ret { + cls_rgw_lc_obj_head head; + + cls_rgw_lc_get_head_ret() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(head, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(head, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_get_head_ret) + +struct cls_rgw_lc_list_entries_op { + string marker; + uint32_t max_entries; + + 
cls_rgw_lc_list_entries_op() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(marker, bl); + ::encode(max_entries, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(marker, bl); + ::decode(max_entries, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_list_entries_op) + +struct cls_rgw_lc_list_entries_ret { + map entries; + + cls_rgw_lc_list_entries_ret() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(entries, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(entries, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_list_entries_ret) + #endif /* CEPH_CLS_RGW_OPS_H */ diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h index cf143ce92cc09..7527db7c7276d 100644 --- a/src/cls/rgw/cls_rgw_types.h +++ b/src/cls/rgw/cls_rgw_types.h @@ -532,7 +532,7 @@ struct rgw_bi_log_entry { void dump(Formatter *f) const; void decode_json(JSONObj *obj); static void generate_test_instances(list& o); - + bool is_versioned() { return ((bilog_flags & RGW_BILOG_FLAG_VERSIONED_OP) != 0); } @@ -794,7 +794,7 @@ struct rgw_user_bucket { return true; else if (!comp) return bucket.compare(ub2.bucket) < 0; - + return false; } }; @@ -930,4 +930,28 @@ struct cls_rgw_gc_obj_info }; WRITE_CLASS_ENCODER(cls_rgw_gc_obj_info) +struct cls_rgw_lc_obj_head +{ + time_t start_date; + string marker; + + cls_rgw_lc_obj_head() {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(start_date, bl); + ::encode(marker, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(start_date, bl); + ::decode(marker, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_obj_head) + #endif diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 39406bc183bec..bcba3e432c0a7 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -1282,6 +1282,8 @@ OPTION(rgw_bucket_index_max_aio, OPT_U32, 8) */ OPTION(rgw_enable_quota_threads, OPT_BOOL, true) OPTION(rgw_enable_gc_threads, OPT_BOOL, true) +OPTION(rgw_enable_lc_threads, OPT_BOOL, true) + OPTION(rgw_data, OPT_STR, "/var/lib/ceph/radosgw/$cluster-$id") OPTION(rgw_enable_apis, OPT_STR, "s3, s3website, swift, swift_auth, admin") @@ -1293,6 +1295,11 @@ OPTION(rgw_port, OPT_STR, "") // port to listen, format as "8080" "5000", if no OPTION(rgw_dns_name, OPT_STR, "") // hostname suffix on buckets OPTION(rgw_dns_s3website_name, OPT_STR, "") // hostname suffix on buckets for s3-website endpoint OPTION(rgw_content_length_compat, OPT_BOOL, false) // Check both HTTP_CONTENT_LENGTH and CONTENT_LENGTH in fcgi env +OPTION(rgw_lifecycle_enabled, OPT_BOOL, true) //rgw lifecycle enabled +OPTION(rgw_lifecycle_thread, OPT_INT, 1) //start lifecycle thread number per radosgw +OPTION(rgw_lifecycle_work_time, OPT_STR, "00:00-06:00") //job process lc at 00:00-06:00s +OPTION(rgw_lc_lock_max_time, OPT_INT, 60) // total run time for a single gc processor work +OPTION(rgw_lc_max_objs, OPT_INT, 32) OPTION(rgw_script_uri, OPT_STR, "") // alternative value for SCRIPT_URI if not set in request OPTION(rgw_request_uri, OPT_STR, "") // alternative value for REQUEST_URI if not set in request OPTION(rgw_swift_url, OPT_STR, "") // the swift url, being published by the internal swift auth diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index fa66eb4ba5303..6c449233dc5ad 100644 
--- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -41,6 +41,8 @@ set(rgw_a_srcs rgw_ldap.cc rgw_loadgen.cc rgw_log.cc + rgw_lc.cc + rgw_lc_s3.cc rgw_metadata.cc rgw_multi.cc rgw_multi_del.cc diff --git a/src/rgw/Makefile.am b/src/rgw/Makefile.am index 96c9c1a7bc3fd..90606ad08d6ef 100644 --- a/src/rgw/Makefile.am +++ b/src/rgw/Makefile.am @@ -28,6 +28,8 @@ librgw_la_SOURCES = \ rgw/rgw_auth.cc \ rgw/rgw_coroutine.cc \ rgw/rgw_cr_rados.cc \ + rgw/rgw_lc.cc \ + rgw/rgw_lc_s3.cc \ rgw/rgw_tools.cc \ rgw/rgw_basic_types.cc \ rgw/rgw_bucket.cc \ @@ -194,6 +196,8 @@ noinst_HEADERS += \ rgw/rgw_auth.h \ rgw/rgw_auth_decoimpl.h \ rgw/rgw_b64.h \ + rgw/rgw_lc.h \ + rgw/rgw_lc_s3.h \ rgw/rgw_client_io.h \ rgw/rgw_coroutine.h \ rgw/rgw_cr_rados.h \ diff --git a/src/rgw/librgw.cc b/src/rgw/librgw.cc index cded711ff1dea..1b15a7014c622 100644 --- a/src/rgw/librgw.cc +++ b/src/rgw/librgw.cc @@ -457,6 +457,7 @@ namespace rgw { store = RGWStoreManager::get_storage(g_ceph_context, g_conf->rgw_enable_gc_threads, + g_conf->rgw_enable_lc_threads, g_conf->rgw_enable_quota_threads, g_conf->rgw_run_sync_thread); diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index addf36c579ab0..4e851d257ee86 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -26,7 +26,8 @@ #include "rgw_rados.h" #include "rgw_acl.h" #include "rgw_acl_s3.h" - +#include "rgw_lc.h" +#include "rgw_log.h" #include "rgw_formats.h" #include "rgw_usage.h" #include "rgw_replica_log.h" @@ -45,7 +46,7 @@ using namespace std; static RGWRados *store = NULL; -void _usage() +void _usage() { cout << "usage: radosgw-admin [options...]" << std::endl; cout << "commands:\n"; @@ -131,6 +132,8 @@ void _usage() cout << " gc list dump expired garbage collection objects (specify\n"; cout << " --include-all to list all entries, including unexpired)\n"; cout << " gc process manually process garbage\n"; + cout << " lc list list all bucket lifecycle progress\n"; + cout << " lc process manually process lifecycle\n"; cout << " metadata get get metadata info\n"; cout << " metadata put put metadata info\n"; cout << " metadata rm remove metadata info\n"; @@ -310,6 +313,8 @@ enum { OPT_QUOTA_DISABLE, OPT_GC_LIST, OPT_GC_PROCESS, + OPT_LC_LIST, + OPT_LC_PROCESS, OPT_ORPHANS_FIND, OPT_ORPHANS_FINISH, OPT_ORPHANS_LIST_JOBS, @@ -326,7 +331,7 @@ enum { OPT_ZONEGROUPMAP_GET, OPT_ZONEGROUPMAP_SET, OPT_ZONEGROUPMAP_UPDATE, - OPT_ZONE_CREATE, + OPT_ZONE_CREATE, OPT_ZONE_DELETE, OPT_ZONE_GET, OPT_ZONE_MODIFY, @@ -399,16 +404,17 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_ strcmp(cmd, "data") == 0 || strcmp(cmd, "datalog") == 0 || strcmp(cmd, "error") == 0 || - strcmp(cmd, "gc") == 0 || + strcmp(cmd, "gc") == 0 || strcmp(cmd, "key") == 0 || strcmp(cmd, "log") == 0 || + strcmp(cmd, "lc") == 0 || strcmp(cmd, "mdlog") == 0 || strcmp(cmd, "metadata") == 0 || strcmp(cmd, "object") == 0 || strcmp(cmd, "objects") == 0 || strcmp(cmd, "olh") == 0 || strcmp(cmd, "opstate") == 0 || - strcmp(cmd, "orphans") == 0 || + strcmp(cmd, "orphans") == 0 || strcmp(cmd, "period") == 0 || strcmp(cmd, "pool") == 0 || strcmp(cmd, "pools") == 0 || @@ -660,6 +666,11 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_ return OPT_GC_LIST; if (strcmp(cmd, "process") == 0) return OPT_GC_PROCESS; + } else if (strcmp(prev_cmd, "lc") == 0) { + if (strcmp(cmd, "list") == 0) + return OPT_LC_LIST; + if (strcmp(cmd, "process") == 0) + return OPT_LC_PROCESS; } else if (strcmp(prev_cmd, "orphans") == 0) { if (strcmp(cmd, "find") 
== 0) return OPT_ORPHANS_FIND; @@ -854,7 +865,7 @@ int bucket_stats(rgw_bucket& bucket, int shard_id, Formatter *formatter) formatter->dump_string("bucket", bucket.name); formatter->dump_string("pool", bucket.data_pool); formatter->dump_string("index_pool", bucket.index_pool); - + formatter->dump_string("id", bucket.bucket_id); formatter->dump_string("marker", bucket.marker); ::encode_json("owner", bucket_info.owner, formatter); @@ -957,7 +968,7 @@ static int read_decode_json(const string& infile, T& t) } return 0; } - + template static int read_decode_json(const string& infile, T& t, K *k) { @@ -1196,7 +1207,7 @@ int check_obj_locator_underscore(RGWBucketInfo& bucket_info, rgw_obj& obj, rgw_o f->dump_string("oid", oid); f->dump_string("locator", locator); - + RGWObjectCtx obj_ctx(store); RGWRados::Object op_target(store, bucket_info, obj_ctx, obj); @@ -1294,7 +1305,7 @@ int do_check_object_locator(const string& tenant_name, const string& bucket_name list_op.params.ns = ns; list_op.params.enforce_ns = true; list_op.params.list_versions = true; - + f->open_array_section("check_objects"); do { ret = list_op.list_objects(max_entries - count, &result, &common_prefixes, &truncated); @@ -1311,7 +1322,7 @@ int do_check_object_locator(const string& tenant_name, const string& bucket_name if (key.name[0] == '_') { ret = check_obj_locator_underscore(bucket_info, obj, key, fix, remove_bad, f); - + if (ret >= 0) { ret = check_obj_tail_locator_underscore(bucket_info, obj, key, fix, f); } @@ -1672,7 +1683,7 @@ static void get_md_sync_status(list& status) } status.push_back(status_str); - + uint64_t full_total = 0; uint64_t full_complete = 0; @@ -1811,7 +1822,7 @@ static void get_data_sync_status(const string& source_zone, list& status } push_ss(ss, status, tab) << status_str; - + uint64_t full_total = 0; uint64_t full_complete = 0; @@ -1959,7 +1970,7 @@ static void sync_status(Formatter *formatter) tab_dump("data sync", width, data_status); } -int main(int argc, char **argv) +int main(int argc, char **argv) { vector args; argv_to_vec(argc, (const char **)argv, args); @@ -2437,7 +2448,7 @@ int main(int argc, char **argv) bool raw_period_pull = opt_cmd == OPT_PERIOD_PULL && remote.empty() && !url.empty(); bool raw_storage_op = (opt_cmd == OPT_ZONEGROUP_ADD || opt_cmd == OPT_ZONEGROUP_CREATE || opt_cmd == OPT_ZONEGROUP_DELETE || - opt_cmd == OPT_ZONEGROUP_GET || opt_cmd == OPT_ZONEGROUP_LIST || + opt_cmd == OPT_ZONEGROUP_GET || opt_cmd == OPT_ZONEGROUP_LIST || opt_cmd == OPT_ZONEGROUP_SET || opt_cmd == OPT_ZONEGROUP_DEFAULT || opt_cmd == OPT_ZONEGROUP_RENAME || opt_cmd == OPT_ZONEGROUP_MODIFY || opt_cmd == OPT_ZONEGROUP_REMOVE || @@ -2459,7 +2470,7 @@ int main(int argc, char **argv) if (raw_storage_op) { store = RGWStoreManager::get_raw_storage(g_ceph_context); } else { - store = RGWStoreManager::get_storage(g_ceph_context, false, false, false); + store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false); } if (!store) { cerr << "couldn't init storage provider" << std::endl; @@ -2720,7 +2731,7 @@ int main(int argc, char **argv) if (ret < 0) { cerr << "list periods failed: " << cpp_strerror(-ret) << std::endl; return -ret; - } + } formatter->open_object_section("realm_periods_list"); encode_json("current_period", period_id, formatter); encode_json("periods", periods, formatter); @@ -3203,7 +3214,7 @@ int main(int argc, char **argv) cerr << "failed to read zonegroupmap info: " << cpp_strerror(ret); return ret; } - + encode_json("zonegroup-map", zonegroupmap, formatter); 
formatter->flush(cout); } @@ -3977,7 +3988,7 @@ int main(int argc, char **argv) list_op.params.ns = ns; list_op.params.enforce_ns = false; list_op.params.list_versions = true; - + do { ret = list_op.list_objects(max_entries - count, &result, &common_prefixes, &truncated); if (ret < 0) { @@ -4088,7 +4099,7 @@ int main(int argc, char **argv) formatter->open_object_section("log"); struct rgw_log_entry entry; - + // peek at first entry to get bucket metadata r = store->log_show_next(h, &entry); if (r < 0) { @@ -4155,7 +4166,7 @@ next: } } } - + if (opt_cmd == OPT_POOL_ADD) { if (pool_name.empty()) { cerr << "need to specify pool to add!" << std::endl; @@ -4203,7 +4214,7 @@ next: uint64_t end_epoch = (uint64_t)-1; int ret; - + if (!start_date.empty()) { ret = utime_t::parse_date(start_date, &start_epoch, NULL); if (ret < 0) { @@ -4260,7 +4271,7 @@ next: if (ret < 0) { cerr << "ERROR: read_usage() returned ret=" << ret << std::endl; return 1; - } + } } if (opt_cmd == OPT_OLH_GET || opt_cmd == OPT_OLH_READLOG) { @@ -4704,6 +4715,39 @@ next: } } + if (opt_cmd == OPT_LC_LIST) { + formatter->open_array_section("life cycle progress"); + map bucket_lc_map; + string marker; +#define MAX_LC_LIST_ENTRIES 100 + do { + int ret = store->list_lc_progress(marker, max_entries, &bucket_lc_map); + if (ret < 0) { + cerr << "ERROR: failed to list objs: " << cpp_strerror(-ret) << std::endl; + return 1; + } + map::iterator iter; + for (iter = bucket_lc_map.begin(); iter != bucket_lc_map.end(); ++iter) { + formatter->open_object_section("bucket_lc_info"); + formatter->dump_string("bucket", iter->first); + string lc_status = LC_STATUS[iter->second]; + formatter->dump_string("status", lc_status); + formatter->close_section(); // objs + formatter->flush(cout); + marker = iter->first; + } + } while (!bucket_lc_map.empty()); + } + + + if (opt_cmd == OPT_LC_PROCESS) { + int ret = store->process_lc(); + if (ret < 0) { + cerr << "ERROR: lc processing returned error: " << cpp_strerror(-ret) << std::endl; + return 1; + } + } + if (opt_cmd == OPT_ORPHANS_FIND) { RGWOrphanSearch search(store, max_concurrent_ios, orphan_stale_secs); @@ -4913,7 +4957,7 @@ next: list entries; - meta_log->init_list_entries(i, start_time.to_real_time(), end_time.to_real_time(), marker, &handle); + meta_log->init_list_entries(i, start_time.to_real_time(), end_time.to_real_time(), marker, &handle); bool truncated; do { int ret = meta_log->list_entries(handle, 1000, entries, NULL, &truncated); @@ -4934,7 +4978,7 @@ next: if (specified_shard_id) break; } - + formatter->close_section(); formatter->flush(cout); @@ -4964,7 +5008,7 @@ next: if (specified_shard_id) break; } - + formatter->close_section(); formatter->flush(cout); @@ -5454,7 +5498,7 @@ next: formatter->close_section(); formatter->flush(cout); } - + if (opt_cmd == OPT_DATALOG_STATUS) { RGWDataChangesLog *log = store->data_log; int i = (specified_shard_id ? 
shard_id : 0); @@ -5475,7 +5519,7 @@ next: formatter->close_section(); formatter->flush(cout); } - + if (opt_cmd == OPT_DATALOG_TRIM) { utime_t start_time, end_time; diff --git a/src/rgw/rgw_common.cc b/src/rgw/rgw_common.cc index 26d5aa7739002..f483fd12aa89a 100644 --- a/src/rgw/rgw_common.cc +++ b/src/rgw/rgw_common.cc @@ -746,6 +746,7 @@ void RGWHTTPArgs::append(const string& name, const string& val) (name.compare("location") == 0) || (name.compare("logging") == 0) || (name.compare("usage") == 0) || + (name.compare("lifecycle") == 0) || (name.compare("delete") == 0) || (name.compare("uploads") == 0) || (name.compare("partNumber") == 0) || diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index 792f1ecbc2cc3..8f551cdaa3025 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -61,6 +61,7 @@ using ceph::crypto::MD5; #define RGW_SYS_PARAM_PREFIX "rgwx-" #define RGW_ATTR_ACL RGW_ATTR_PREFIX "acl" +#define RGW_ATTR_LC RGW_ATTR_PREFIX "lc" #define RGW_ATTR_CORS RGW_ATTR_PREFIX "cors" #define RGW_ATTR_ETAG RGW_ATTR_PREFIX "etag" #define RGW_ATTR_BUCKETS RGW_ATTR_PREFIX "buckets" diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc new file mode 100644 index 0000000000000..3d110e829febb --- /dev/null +++ b/src/rgw/rgw_lc.cc @@ -0,0 +1,546 @@ +#include +#include +#include + +#include "include/types.h" + +#include "common/Formatter.h" +#include +#include "auth/Crypto.h" +#include "include/rados/librados.hpp" +#include "cls/rgw/cls_rgw_client.h" +#include "cls/refcount/cls_refcount_client.h" +#include "cls/lock/cls_lock_client.h" +#include +#include "rgw_common.h" +#include "rgw_bucket.h" +#include "rgw_lc.h" +#include "rgw_lc_s3.h" + + + +#define dout_subsys ceph_subsys_rgw + +const char* LC_STATUS[] = { + "UNINITIAL", + "PROCESSING", + "FAILED", + "COMPLETE" +}; + +using namespace std; +using namespace librados; +void RGWLifecycleConfiguration::add_rule(LCRule *rule) +{ + string id; + rule->get_id(id); // not that this will return false for groups, but that's ok, we won't search groups + rule_map.insert(pair(id, *rule)); + _add_rule(rule); +} + +void RGWLifecycleConfiguration::_add_rule(LCRule *rule) +{ + string prefix; + LCExpiration expiration; + int days; + if (!rule->get_prefix(prefix)) { + ldout(cct, 5) << "ERROR: rule->get_prefix() failed" << dendl; + } + if (!rule->get_expiration(expiration)) { + ldout(cct, 5) << "ERROR: rule->get_expiration() failed" << dendl; + } + if (!expiration.get_days(&days)) { + ldout(cct, 5) << "ERROR: expiration->get_days() failed" << dendl; + } + prefix_map[prefix] = days; +} + +void *RGWLC::LCWorker::entry() { + do { + utime_t start = ceph_clock_now(cct); + if (should_work(start)) { + dout(5) << "life cycle: start" << dendl; + int r = lc->process(); + if (r < 0) { + dout(0) << "ERROR: do life cycle process() returned error r=" << r << dendl; + } + dout(5) << "life cycle: stop" << dendl; + } + if (lc->going_down()) + break; + + utime_t end = ceph_clock_now(cct); + int secs = shedule_next_start_time(end); + time_t next_time = end + secs; + char buf[30]; + char *nt = ctime_r(&next_time, buf); + dout(5) << "shedule life cycle next start time: " << nt <going_down()); + + return NULL; +} + +void RGWLC::initialize(CephContext *_cct, RGWRados *_store) { + cct = _cct; + store = _store; + max_objs = cct->_conf->rgw_lc_max_objs; + if (max_objs > HASH_PRIME) + max_objs = HASH_PRIME; + + obj_names = new string[max_objs]; + + for (int i = 0; i < max_objs; i++) { + obj_names[i] = lc_oid_prefix; + char buf[32]; + snprintf(buf, 32, ".%d", i); + 
obj_names[i].append(buf); + } +} + +void RGWLC::finalize() +{ + delete[] obj_names; +} + +bool RGWLC::if_already_run_today(time_t& start_date) +{ + struct tm bdt; + time_t begin_of_day; + utime_t now = ceph_clock_now(cct); + localtime_r(&start_date, &bdt); + bdt.tm_hour = 0; + bdt.tm_min = 0; + bdt.tm_sec = 0; + begin_of_day = mktime(&bdt); + if (now - begin_of_day < 24*60*60) + return true; + else + return false; +} + +static std::vector &split(const std::string &s, char delim, std::vector &elems) { + std::stringstream ss(s); + std::string item; + while (std::getline(ss, item, delim)) { + elems.push_back(item); + } + return elems; +} + +static std::vector split(const std::string &s, char delim) { + std::vector elems; + split(s, delim, elems); + return elems; +} + +int RGWLC::bucket_lc_prepare(int index) +{ + map entries; + + string marker; + +#define MAX_LC_LIST_ENTRIES 100 + do { + int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries); + if (ret < 0) + return ret; + map::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + pair entry(iter->first, lc_uninitial); + ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names[index] << dendl; + break; + } + marker = iter->first; + } + } while (!entries.empty()); + + return 0; +} + +int RGWLC::bucket_lc_process(string& shard_id) +{ + RGWLifecycleConfiguration config(cct); + RGWBucketInfo bucket_info; + map bucket_attrs; + string prefix, delimiter, marker, next_marker, no_ns, end_marker, list_versions; + bool is_truncated; + bool default_config = false; + int default_days = 0; + vector objs; + RGWObjectCtx obj_ctx(store); + map common_prefixes; + vector result; + result = split(shard_id, ':'); + string bucket_tenant = result[0]; + string bucket_name = result[1]; + string bucket_id = result[2]; + int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs); + if (ret < 0) { + ldout(cct, 0) << "LC:get_bucket_info failed" << bucket_name <::iterator aiter = bucket_attrs.find(RGW_ATTR_LC); + if (aiter == bucket_attrs.end()) + return 0; + + bufferlist::iterator iter(&aiter->second); + try { + config.decode(iter); + } catch (const buffer::error& e) { + ldout(cct, 0) << __func__ << "decode life cycle config failed" << dendl; + return -1; + } + + map& prefix_map = config.get_prefix_map(); + for(map::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); prefix_iter++) { + if (prefix_iter->first.empty()) { + default_config = true; + default_days = prefix_iter->second; + continue; + } + } + + if (default_config) { + do { + + objs.clear(); + list_op.params.marker = list_op.get_next_marker(); + ret = list_op.list_objects(1000, &objs, &common_prefixes, &is_truncated); + if (ret < 0) { + if (ret == -ENOENT) + return 0; + ldout(cct, 0) << "ERROR: store->list_objects():" <::iterator obj_iter; + int pos = 0; + utime_t now = ceph_clock_now(cct); + for (obj_iter = objs.begin(); obj_iter != objs.end(); obj_iter++) { + bool prefix_match = false; + int match_days = 0; + map& prefix_map = config.get_prefix_map(); + + for(map::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); prefix_iter++) { + if (prefix_iter->first.empty()) { + continue; + } + pos = (*obj_iter).key.name.find(prefix_iter->first, 0); + if (pos != 0) { + continue; + } + prefix_match = true; + match_days = prefix_iter->second; + break; 
+ } + int days = 0; + if (prefix_match) { + days = match_days; + } else if (default_config) { + days = default_days; + } else { + continue; + } + if (now - ceph::real_clock::to_time_t((*obj_iter).mtime) >= days*24*60*60) { + RGWObjectCtx rctx(store); + rgw_obj obj(bucket_info.bucket, (*obj_iter).key.name); + RGWObjState *state; + int ret = store->get_obj_state(&rctx, obj, &state, false); + if (ret < 0) { + return ret; + } + if (state->mtime != (*obj_iter).mtime) //Check mtime again to avoid delete a recently update object as much as possible + continue; + ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, (*obj_iter).key); + if (ret < 0) { + ldout(cct, 0) << "ERROR: rgw_remove_object " << dendl; + } else { + ldout(cct, 10) << "DELETED:" << bucket_name << ":" << (*obj_iter).key.name <::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); prefix_iter++) { + if (prefix_iter->first.empty()) { + continue; + } + list_op.params.prefix = prefix_iter->first; + + do { + + objs.clear(); + list_op.params.marker = list_op.get_next_marker(); + ret = list_op.list_objects(1000, &objs, &common_prefixes, &is_truncated); + + if (ret < 0) { + if (ret == (-ENOENT)) + return 0; + ldout(cct, 0) << "ERROR: store->list_objects():" <::iterator obj_iter; + int days = prefix_iter->second; + utime_t now = ceph_clock_now(cct); + + for (obj_iter = objs.begin(); obj_iter != objs.end(); obj_iter++) { + if (now - ceph::real_clock::to_time_t((*obj_iter).mtime) >= days*24*60*60) { + RGWObjectCtx rctx(store); + rgw_obj obj(bucket_info.bucket, (*obj_iter).key.name); + RGWObjState *state; + int ret = store->get_obj_state(&rctx, obj, &state, false); + if (ret < 0) { + return ret; + } + if (state->mtime != (*obj_iter).mtime)//Check mtime again to avoid delete a recently update object as much as possible + continue; + ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, (*obj_iter).key); + if (ret < 0) { + ldout(cct, 0) << "ERROR: rgw_remove_object " << dendl; + } else { + ldout(cct, 10) << "DELETED:" << bucket_name << ":" << (*obj_iter).key.name << dendl; + } + } + } + } while (is_truncated); + } + } + + return ret; +} + +int RGWLC::bucket_lc_post(int index, int max_lock_sec, cls_rgw_lc_obj_head& head, + pair& entry, int& result) +{ + rados::cls::lock::Lock l(lc_index_lock_name); + do { + int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]); + if (ret == -EBUSY) { /* already locked by another lc processor */ + dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl; + sleep(10); + continue; + } + if (ret < 0) + return 0; + dout(20) << "RGWLC::bucket_lc_post() get lock" << obj_names[index] << dendl; + if (result == -ENOENT) { + ret = cls_rgw_lc_rm_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names[index] << dendl; + goto clean; + } + } else if (result < 0) { + entry.second = lc_failed; + } else { + entry.second = lc_complete; + } + + ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to set entry " << obj_names[index] << dendl; + } +clean: + l.unlock(&store->lc_pool_ctx, obj_names[index]); + dout(20) << "RGWLC::bucket_lc_post() unlock" << obj_names[index] << dendl; + return 0; + } while (true); +} + +int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map *progress_map) +{ + int index = 0; + progress_map->clear(); + for(; index 
entries; + int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, max_entries, entries); + if (ret < 0) + return ret; + map::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + progress_map->insert(*iter); + } + } + return 0; +} + +int RGWLC::process() +{ + int max_secs = cct->_conf->rgw_lc_lock_max_time; + + unsigned start; + int ret = get_random_bytes((char *)&start, sizeof(start)); + if (ret < 0) + return ret; + + for (int i = 0; i < max_objs; i++) { + int index = (i + start) % max_objs; + ret = process(index, max_secs); + if (ret < 0) + return ret; + } + + return 0; +} + +int RGWLC::process(int index, int max_lock_secs) +{ + rados::cls::lock::Lock l(lc_index_lock_name); + do { + utime_t now = ceph_clock_now(g_ceph_context); + pair entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS + if (max_lock_secs <= 0) + return -EAGAIN; + + utime_t time(max_lock_secs, 0); + l.set_duration(time); + + int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]); + if (ret == -EBUSY) { /* already locked by another lc processor */ + dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl; + sleep(10); + continue; + } + if (ret < 0) + return 0; + + string marker; + cls_rgw_lc_obj_head head; + ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to get obj head " << obj_names[index] << ret << dendl; + goto exit; + } + + if(!if_already_run_today(head.start_date)) { + head.start_date = now; + head.marker.clear(); + ret = bucket_lc_prepare(index); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to update lc object " << obj_names[index] << ret << dendl; + goto exit; + } + } + + ret = cls_rgw_lc_get_next_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to get obj entry " << obj_names[index] << dendl; + goto exit; + } + + if (entry.first.empty()) + goto exit; + + entry.second = lc_processing; + ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to set obj entry " << obj_names[index] << entry.first << entry.second << dendl; + goto exit; + } + + head.marker = entry.first; + ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index], head); + if (ret < 0) { + dout(0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl; + goto exit; + } + l.unlock(&store->lc_pool_ctx, obj_names[index]); + ret = bucket_lc_process(entry.first); + ret = bucket_lc_post(index, max_lock_secs, head, entry, ret); + return 0; +exit: + l.unlock(&store->lc_pool_ctx, obj_names[index]); + return 0; + + }while(1); + +} + +void RGWLC::start_processor() +{ + worker = new LCWorker(cct, this); + worker->create("lifecycle_thread"); +} + +void RGWLC::stop_processor() +{ + if (worker) { + worker->stop(); + worker->join(); + } + delete worker; + worker = NULL; +} + +void RGWLC::LCWorker::stop() +{ + Mutex::Locker l(lock); + cond.Signal(); +} + +bool RGWLC::going_down() +{ + return false; +} + +bool RGWLC::LCWorker::should_work(utime_t& now) +{ + int start_hour; + int start_minite; + int end_hour; + int end_minite; + string worktime = cct->_conf->rgw_lifecycle_work_time; + sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minite, &end_hour, &end_minite); + struct tm bdt; + time_t tt = now.sec(); + localtime_r(&tt, &bdt); + if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minite)|| 
+ (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minite)) { + return true; + } else { + return false; + } + +} + +int RGWLC::LCWorker::shedule_next_start_time(utime_t& now) +{ + int start_hour; + int start_minite; + int end_hour; + int end_minite; + string worktime = cct->_conf->rgw_lifecycle_work_time; + sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minite, &end_hour, &end_minite); + struct tm bdt; + time_t tt = now.sec(); + time_t nt; + localtime_r(&tt, &bdt); + bdt.tm_hour = start_hour; + bdt.tm_min = start_minite; + bdt.tm_sec = 0; + nt = mktime(&bdt); + return (nt+24*60*60 - tt); +} + diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h new file mode 100644 index 0000000000000..cdc5ff2f6badf --- /dev/null +++ b/src/rgw/rgw_lc.h @@ -0,0 +1,227 @@ +#ifndef CEPH_RGW_LC_H +#define CEPH_RGW_LC_H + +#include +#include +#include +#include + +#include "common/debug.h" + +#include "include/types.h" +#include "include/atomic.h" +#include "include/rados/librados.hpp" +#include "common/Mutex.h" +#include "common/Cond.h" +#include "common/Thread.h" +#include "rgw_common.h" +#include "rgw_rados.h" +#include "cls/rgw/cls_rgw_types.h" + +using namespace std; +#define HASH_PRIME 7877 +static string lc_oid_prefix = "lc"; +static string lc_index_lock_name = "lc_process"; + +extern const char* LC_STATUS[]; + +typedef enum { + lc_uninitial = 0, + lc_processing, + lc_failed, + lc_complete, +}LC_BUCKET_STATUS; + +class LCExpiration +{ +protected: + string days; +public: + LCExpiration() {} + ~LCExpiration() {} + + void encode(bufferlist& bl) const { + ENCODE_START(2, 2, bl); + ::encode(days, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& bl) { + DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl); + ::decode(days, bl); + DECODE_FINISH(bl); + } + void dump(Formatter *f) const; +// static void generate_test_instances(list& o); + void set_days(const string& _days) { days = _days; } + bool get_days(int* _days) {*_days = atoi(days.c_str()); return true; } +}; +WRITE_CLASS_ENCODER(LCExpiration) + +class LCRule +{ +protected: + string id; + string prefix; + string status; + LCExpiration expiration; + +public: + + LCRule(){}; + ~LCRule(){}; + + bool get_id(string& _id) { + _id = id; + return true; + } + + bool get_status(string& _status) { + _status = status; + return true; + } + + bool get_prefix(string& _prefix) { + _prefix = prefix; + return true; + } + + bool get_expiration(LCExpiration& _expriation) { + _expriation = expiration; + return true; + } + + void set_id(string*_id) { + id = *_id; + } + + void set_prefix(string*_prefix) { + prefix = *_prefix; + } + + void set_status(string*_status) { + status = *_status; + } + + void set_expiration(LCExpiration*_expiration) { + expiration = *_expiration; + } + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(id, bl); + ::encode(prefix, bl); + ::encode(status, bl); + ::encode(expiration, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& bl) { + DECODE_START_LEGACY_COMPAT_LEN(1, 1, 1, bl); + ::decode(id, bl); + ::decode(prefix, bl); + ::decode(status, bl); + ::decode(expiration, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(LCRule) + +class RGWLifecycleConfiguration +{ +protected: + CephContext *cct; + map prefix_map; + multimap rule_map; + void _add_rule(LCRule *rule); +public: + RGWLifecycleConfiguration(CephContext *_cct) : cct(_cct) {} + RGWLifecycleConfiguration() : cct(NULL) {} + + void set_ctx(CephContext *ctx) { + cct = ctx; + } + + virtual ~RGWLifecycleConfiguration() {} + +// int 
get_perm(string& id, int perm_mask); +// int get_group_perm(ACLGroupTypeEnum group, int perm_mask); + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(rule_map, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& bl) { + DECODE_START_LEGACY_COMPAT_LEN(1, 1, 1, bl); + ::decode(rule_map, bl); + multimap::iterator iter; + for (iter = rule_map.begin(); iter != rule_map.end(); ++iter) { + LCRule& rule = iter->second; + _add_rule(&rule); + } + DECODE_FINISH(bl); + } + void dump(Formatter *f) const; +// static void generate_test_instances(list& o); + + void add_rule(LCRule* rule); + + multimap& get_rule_map() { return rule_map; } + map& get_prefix_map() { return prefix_map; } +/* + void create_default(string id, string name) { + ACLGrant grant; + grant.set_canon(id, name, RGW_PERM_FULL_CONTROL); + add_grant(&grant); + } +*/ +}; +WRITE_CLASS_ENCODER(RGWLifecycleConfiguration) + +class RGWLC { + CephContext *cct; + RGWRados *store; + int max_objs; + string *obj_names; + + class LCWorker : public Thread { + CephContext *cct; + RGWLC *lc; + Mutex lock; + Cond cond; + + public: + LCWorker(CephContext *_cct, RGWLC *_lc) : cct(_cct), lc(_lc), lock("LCWorker") {} + void *entry(); + void stop(); + bool should_work(utime_t& now); + int shedule_next_start_time(utime_t& now); + }; + + public: + LCWorker *worker; +public: + RGWLC() : cct(NULL), store(NULL), worker(NULL) {} + ~RGWLC() { + stop_processor(); + finalize(); + } + + void initialize(CephContext *_cct, RGWRados *_store); + void finalize(); + + int process(); + int process(int index, int max_secs); + bool if_already_run_today(time_t& start_date); + int list_lc_progress(const string& marker, uint32_t max_entries, map *progress_map); + int bucket_lc_prepare(int index); + int bucket_lc_process(string& shard_id); + int bucket_lc_post(int index, int max_lock_sec, cls_rgw_lc_obj_head& head, + pair& entry, int& result); + bool going_down(); + void start_processor(); + void stop_processor(); +}; + + + +#endif diff --git a/src/rgw/rgw_lc_s3.cc b/src/rgw/rgw_lc_s3.cc new file mode 100644 index 0000000000000..cee710bb9a746 --- /dev/null +++ b/src/rgw/rgw_lc_s3.cc @@ -0,0 +1,112 @@ +#include + +#include +#include + +#include "include/types.h" + +#include "rgw_lc_s3.h" + + +#define dout_subsys ceph_subsys_rgw + +using namespace std; + +bool LCExpiration_S3::xml_end(const char * el) { + LCDays_S3 *lc_days = static_cast(find_first("Days")); + + // ID is mandatory + if (!lc_days) + return false; + days = lc_days->get_data(); + return true; +} + +bool RGWLifecycleConfiguration_S3::xml_end(const char *el) { + XMLObjIter iter = find("Rule"); + LCRule_S3 *rule = static_cast(iter.get_next()); + while (rule) { + add_rule(rule); + rule = static_cast(iter.get_next()); + } + return true; +} + +bool LCRule_S3::xml_end(const char *el) { + LCID_S3 *lc_id; + LCPrefix_S3 *lc_prefix; + LCStatus_S3 *lc_status; + LCExpiration_S3 *lc_expiration; + + id.clear(); + prefix.clear(); + status.clear(); + + lc_id = static_cast(find_first("ID")); + if (!lc_id) + return false; + id = lc_id->get_data(); + + lc_prefix = static_cast(find_first("Prefix")); + if (!lc_prefix) + return false; + prefix = lc_prefix->get_data(); + + lc_status = static_cast(find_first("Status")); + if (!lc_status) + return false; + status = lc_status->get_data(); + + lc_expiration = static_cast(find_first("Expiration")); + if (!lc_expiration) + return false; + expiration = *lc_expiration; + + return true; +} + +void LCRule_S3::to_xml(CephContext *cct, ostream& out) { + 
LCExpiration_S3& expir = static_cast(expiration); + out << "" ; + out << "" << id << ""; + out << "" << prefix << ""; + out << "" << status << ""; + expir.to_xml(out); + out << ""; +} + +int RGWLifecycleConfiguration_S3::rebuild(RGWRados *store, RGWLifecycleConfiguration& dest) +{ + multimap::iterator iter; + for (iter = rule_map.begin(); iter != rule_map.end(); ++iter) { + LCRule& src_rule = iter->second; + bool rule_ok = true; + + if (rule_ok) { + dest.add_rule(&src_rule); + } + } + + return 0; +} + +XMLObj *RGWLCXMLParser_S3::alloc_obj(const char *el) +{ + XMLObj * obj = NULL; + if (strcmp(el, "LifecycleConfiguration") == 0) { + obj = new RGWLifecycleConfiguration_S3(cct); + } else if (strcmp(el, "Rule") == 0) { + obj = new LCRule_S3(); + } else if (strcmp(el, "ID") == 0) { + obj = new LCID_S3(); + } else if (strcmp(el, "Prefix") == 0) { + obj = new LCPrefix_S3(); + } else if (strcmp(el, "Status") == 0) { + obj = new LCStatus_S3(); + } else if (strcmp(el, "Expiration") == 0) { + obj = new LCExpiration_S3(); + } else if (strcmp(el, "Days") == 0) { + obj = new LCDays_S3(); + } + return obj; +} diff --git a/src/rgw/rgw_lc_s3.h b/src/rgw/rgw_lc_s3.h new file mode 100644 index 0000000000000..1de47d5fb60fb --- /dev/null +++ b/src/rgw/rgw_lc_s3.h @@ -0,0 +1,104 @@ +#ifndef CEPH_RGW_LC_S3_H +#define CEPH_RGW_LC_S3_H + +#include +#include +#include +#include + +#include + +#include "include/str_list.h" +#include "rgw_lc.h" +#include "rgw_xml.h" + + + +using namespace std; + +class LCRule_S3 : public LCRule, public XMLObj +{ +public: + LCRule_S3() {} + ~LCRule_S3() {} + + void to_xml(CephContext *cct, ostream& out); + bool xml_end(const char *el); + bool xml_start(const char *el, const char **attr); +}; + +class LCID_S3 : public XMLObj +{ +public: + LCID_S3() {} + ~LCID_S3() {} + string& to_str() { return data; } +}; + +class LCPrefix_S3 : public XMLObj +{ +public: + LCPrefix_S3() {} + ~LCPrefix_S3() {} + string& to_str() { return data; } +}; + +class LCStatus_S3 : public XMLObj +{ +public: + LCStatus_S3() {} + ~LCStatus_S3() {} + string& to_str() { return data; } +}; + +class LCDays_S3 : public XMLObj +{ +public: + LCDays_S3() {} + ~LCDays_S3() {} + string& to_str() { return data; } +}; + +class LCExpiration_S3 : public LCExpiration, public XMLObj +{ +public: + LCExpiration_S3() {} + ~LCExpiration_S3() {} + + bool xml_end(const char *el); + void to_xml(ostream& out) { + out << "" << "" << days << ""<< ""; + } +}; + +class RGWLCXMLParser_S3 : public RGWXMLParser +{ + CephContext *cct; + + XMLObj *alloc_obj(const char *el); +public: + RGWLCXMLParser_S3(CephContext *_cct) : cct(_cct) {} +}; + +class RGWLifecycleConfiguration_S3 : public RGWLifecycleConfiguration, public XMLObj +{ +public: + RGWLifecycleConfiguration_S3(CephContext *_cct) : RGWLifecycleConfiguration(_cct) {} + ~RGWLifecycleConfiguration_S3() {} + + bool xml_end(const char *el); + + void to_xml(ostream& out) { + out << ""; + multimap::iterator iter; + for (iter = rule_map.begin(); iter != rule_map.end(); ++iter) { + LCRule_S3& rule = static_cast(iter->second); + rule.to_xml(cct, out); + } + out << ""; + } + int rebuild(RGWRados *store, RGWLifecycleConfiguration& dest); +}; + + +#endif diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index 85c044ae27536..323a5e7a01d3f 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -302,7 +302,7 @@ int main(int argc, const char **argv) FCGX_Init(); RGWRados *store = RGWStoreManager::get_storage(g_ceph_context, - g_conf->rgw_enable_gc_threads, g_conf->rgw_enable_quota_threads, + 
g_conf->rgw_enable_gc_threads, g_conf->rgw_enable_lc_threads, g_conf->rgw_enable_quota_threads, g_conf->rgw_run_sync_thread); if (!store) { mutex.Lock(); diff --git a/src/rgw/rgw_object_expirer.cc b/src/rgw/rgw_object_expirer.cc index f044db7f875e2..97a17bd3e280a 100644 --- a/src/rgw/rgw_object_expirer.cc +++ b/src/rgw/rgw_object_expirer.cc @@ -78,7 +78,7 @@ int main(const int argc, const char **argv) common_init_finish(g_ceph_context); - store = RGWStoreManager::get_storage(g_ceph_context, false, false, false); + store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false); if (!store) { std::cerr << "couldn't init storage provider" << std::endl; return EIO; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 6ae7627021cda..6bd66a03e8650 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -27,15 +27,22 @@ #include "rgw_cors_s3.h" #include "rgw_rest_conn.h" #include "rgw_rest_s3.h" +#include "rgw_lc.h" +#include "rgw_lc_s3.h" #include "rgw_client_io.h" +#include "cls/lock/cls_lock_client.h" +#include "cls/rgw/cls_rgw_client.h" + #include "include/assert.h" #define dout_subsys ceph_subsys_rgw using namespace std; +using namespace librados; using ceph::crypto::MD5; + static string mp_ns = RGW_OBJ_NS_MULTIPART; static string shadow_ns = RGW_OBJ_NS_SHADOW; @@ -3662,11 +3669,41 @@ int RGWPutACLs::verify_permission() return 0; } +int RGWPutLC::verify_permission() +{ + bool perm; + perm = verify_bucket_permission(s, RGW_PERM_WRITE_ACP); + if (!perm) + return -EACCES; + + return 0; +} + +int RGWDeleteLC::verify_permission() +{ + bool perm; + perm = verify_bucket_permission(s, RGW_PERM_WRITE_ACP); + if (!perm) + return -EACCES; + + return 0; +} + void RGWPutACLs::pre_exec() { rgw_bucket_object_pre_exec(s); } +void RGWPutLC::pre_exec() +{ + rgw_bucket_object_pre_exec(s); +} + +void RGWDeleteLC::pre_exec() +{ + rgw_bucket_object_pre_exec(s); +} + void RGWPutACLs::execute() { bufferlist bl; @@ -3758,6 +3795,152 @@ void RGWPutACLs::execute() } } +static void get_lc_oid(struct req_state *s, string& oid) +{ + string shard_id = s->bucket.name + ':' +s->bucket.bucket_id; + int max_objs = (s->cct->_conf->rgw_lc_max_objs > HASH_PRIME)?HASH_PRIME:s->cct->_conf->rgw_lc_max_objs; + int index = ceph_str_hash_linux(shard_id.c_str(), shard_id.size()) % HASH_PRIME % max_objs; + oid = lc_oid_prefix; + char buf[32]; + snprintf(buf, 32, ".%d", index); + oid.append(buf); + return; +} +void RGWPutLC::execute() +{ + bufferlist bl; + + RGWLifecycleConfiguration_S3 *config = NULL; + RGWLCXMLParser_S3 parser(s->cct); + RGWLifecycleConfiguration_S3 new_config(s->cct); + ret = 0; + + if (!parser.init()) { + ret = -EINVAL; + return; + } + + ret = get_params(); + if (ret < 0) + return; + + ldout(s->cct, 15) << "read len=" << len << " data=" << (data ? 
data : "") << dendl; + + if (!parser.parse(data, len, 1)) { + ret = -EACCES; + return; + } + config = static_cast(parser.find_first("LifecycleConfiguration")); + if (!config) { + ret = -EINVAL; + return; + } + + if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) { + ldout(s->cct, 15) << "Old LifecycleConfiguration"; + config->to_xml(*_dout); + *_dout << dendl; + } + + ret = config->rebuild(store, new_config); + if (ret < 0) + return; + + if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 15)) { + ldout(s->cct, 15) << "New LifecycleConfiguration:"; + new_config.to_xml(*_dout); + *_dout << dendl; + } + + new_config.encode(bl); + map attrs; + attrs = s->bucket_attrs; + attrs[RGW_ATTR_LC] = bl; + ret =rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker); + if (ret < 0) + return; + string shard_id = s->bucket.tenant + ':' + s->bucket.name + ':' + s->bucket.bucket_id; + string oid; + get_lc_oid(s, oid); + pair entry(shard_id, lc_uninitial); + int max_lock_secs = s->cct->_conf->rgw_lc_lock_max_time; + rados::cls::lock::Lock l(lc_index_lock_name); + utime_t time(max_lock_secs, 0); + l.set_duration(time); + librados::IoCtx *ctx = store->get_lc_pool_ctx(); + do { + ret = l.lock_exclusive(ctx, oid); + if (ret == -EBUSY) { + dout(0) << "RGWLC::RGWPutLC() failed to acquire lock on, sleep 5, try again" << oid << dendl; + sleep(5); + continue; + } + if (ret < 0) { + dout(0) << "RGWLC::RGWPutLC() failed to acquire lock " << oid << ret << dendl; + break; + } + ret = cls_rgw_lc_set_entry(*ctx, oid, entry); + if (ret < 0) { + dout(0) << "RGWLC::RGWPutLC() failed to set entry " << oid << ret << dendl; + } + break; + }while(1); + l.unlock(ctx, oid); + return; +} + +void RGWDeleteLC::execute() +{ + bufferlist bl; + map orig_attrs, attrs; + map::iterator iter; + rgw_obj obj; + store->get_bucket_instance_obj(s->bucket, obj); + store->set_atomic(s->obj_ctx, obj); + ret = get_system_obj_attrs(store, s, obj, orig_attrs, NULL, &s->bucket_info.objv_tracker); + if (op_ret < 0) + return; + + for (iter = orig_attrs.begin(); iter != orig_attrs.end(); ++iter) { + const string& name = iter->first; + dout(10) << "DeleteLC : attr: " << name << dendl; + if (name.compare(0, (sizeof(RGW_ATTR_LC) - 1), RGW_ATTR_LC) != 0) { + if (attrs.find(name) == attrs.end()) { + attrs[name] = iter->second; + } + } + } + ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs, &s->bucket_info.objv_tracker); + string shard_id = s->bucket.name + ':' +s->bucket.bucket_id; + pair entry(shard_id, lc_uninitial); + string oid; + get_lc_oid(s, oid); + int max_lock_secs = s->cct->_conf->rgw_lc_lock_max_time; + librados::IoCtx *ctx = store->get_lc_pool_ctx(); + rados::cls::lock::Lock l(lc_index_lock_name); + utime_t time(max_lock_secs, 0); + l.set_duration(time); + do { + ret = l.lock_exclusive(ctx, oid); + if (ret == -EBUSY) { + dout(0) << "RGWLC::RGWPutLC() failed to acquire lock on, sleep 5, try again" << oid << dendl; + sleep(5); + continue; + } + if (ret < 0) { + dout(0) << "RGWLC::RGWPutLC() failed to acquire lock " << oid << ret << dendl; + break; + } + ret = cls_rgw_lc_rm_entry(*ctx, oid, entry); + if (ret < 0) { + dout(0) << "RGWLC::RGWPutLC() failed to set entry " << oid << ret << dendl; + } + break; + }while(1); + l.unlock(ctx, oid); + return; +} + int RGWGetCORS::verify_permission() { if (false == s->auth_identity->is_owner_of(s->bucket_owner.get_id())) { diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index eeb633fd4e7b0..1fbb34832431e 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ 
 int RGWGetCORS::verify_permission()
 {
   if (false == s->auth_identity->is_owner_of(s->bucket_owner.get_id())) {
diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h
index eeb633fd4e7b0..1fbb34832431e 100644
--- a/src/rgw/rgw_op.h
+++ b/src/rgw/rgw_op.h
@@ -1005,6 +1005,58 @@ public:
   virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; }
 };
 
+class RGWPutLC : public RGWOp {
+protected:
+  int ret;
+  size_t len;
+  char *data;
+
+public:
+  RGWPutLC() {
+    ret = 0;
+    len = 0;
+    data = NULL;
+  }
+  virtual ~RGWPutLC() {
+    free(data);
+  }
+
+  int verify_permission();
+  void pre_exec();
+  void execute();
+
+// virtual int get_policy_from_state(RGWRados *store, struct req_state *s, stringstream& ss) { return 0; }
+  virtual int get_params() = 0;
+  virtual void send_response() = 0;
+  virtual const string name() { return "put_lifecycle"; }
+  virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; }
+};
+
+class RGWDeleteLC : public RGWOp {
+protected:
+  int ret;
+  size_t len;
+  char *data;
+
+public:
+  RGWDeleteLC() {
+    ret = 0;
+    len = 0;
+    data = NULL;
+  }
+  virtual ~RGWDeleteLC() {
+    free(data);
+  }
+
+  int verify_permission();
+  void pre_exec();
+  void execute();
+
+  virtual void send_response() = 0;
+  virtual const string name() { return "delete_lifecycle"; }
+  virtual uint32_t op_mask() { return RGW_OP_TYPE_WRITE; }
+};
+
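
The two new classes follow the usual RGWOp shape: the REST layer drives verify_permission(), pre_exec() and execute(), and the protocol-specific subclass supplies get_params() and send_response(). The sketch below shows that template-method flow in isolation; Op, run_op() and DeleteLCOp are illustrative stand-ins, not the real RGW dispatcher.

    // Illustrative template-method flow; simplified stand-ins for RGWOp and
    // the RGW request loop.
    #include <iostream>

    struct Op {
      virtual ~Op() = default;
      virtual int verify_permission() = 0;   // e.g. bucket WRITE_ACP check
      virtual void pre_exec() {}
      virtual void execute() = 0;            // protocol-independent work
      virtual void send_response() = 0;      // S3/Swift-specific formatting
    };

    static void run_op(Op& op)
    {
      if (op.verify_permission() < 0) {
        op.send_response();                  // report EACCES-style failures
        return;
      }
      op.pre_exec();
      op.execute();
      op.send_response();
    }

    struct DeleteLCOp : Op {
      int ret = 0;
      int verify_permission() override { return 0; }
      void execute() override { /* drop RGW_ATTR_LC, dequeue the shard entry */ }
      void send_response() override { std::cout << "status " << (ret ? 500 : 204) << "\n"; }
    };

    int main() { DeleteLCOp op; run_op(op); }
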
 class RGWGetCORS : public RGWOp {
 protected:
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index b38ff7cdb0e39..0abfa6306e830 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -22,6 +22,8 @@
 #include "rgw_cache.h"
 #include "rgw_acl.h"
 #include "rgw_acl_s3.h" /* for dumping s3policy in debug log */
+#include "rgw_lc.h"
+#include "rgw_lc_s3.h"
 #include "rgw_metadata.h"
 #include "rgw_bucket.h"
 #include "rgw_rest_conn.h"
@@ -58,6 +60,8 @@ using namespace librados;
 #include "rgw_log.h"
 #include "rgw_gc.h"
+#include "rgw_lc.h"
+
 #include "rgw_object_expirer_core.h"
 #include "rgw_sync.h"
 #include "rgw_data_sync.h"
@@ -1486,6 +1490,7 @@ int RGWZoneParams::fix_pool_names()
   metadata_heap = fix_zone_pool_name(pool_names, name, ".rgw.meta", metadata_heap.name);
   control_pool = fix_zone_pool_name(pool_names, name, ".rgw.control", control_pool.name);
   gc_pool = fix_zone_pool_name(pool_names, name ,".rgw.gc", gc_pool.name);
+  lc_pool = fix_zone_pool_name(pool_names, name ,".rgw.lc", lc_pool.name);
   log_pool = fix_zone_pool_name(pool_names, name, ".rgw.log", log_pool.name);
   intent_log_pool = fix_zone_pool_name(pool_names, name, ".rgw.intent-log", intent_log_pool.name);
   usage_log_pool = fix_zone_pool_name(pool_names, name, ".rgw.usage", usage_log_pool.name);
@@ -3733,6 +3738,10 @@ int RGWRados::init_complete()
   if (ret < 0)
     return ret;
 
+  ret = open_lc_pool_ctx();
+  if (ret < 0)
+    return ret;
+
   ret = open_objexp_pool_ctx();
   if (ret < 0)
     return ret;
@@ -3797,6 +3806,12 @@ int RGWRados::init_complete()
   data_notifier = new RGWDataNotifier(this);
   data_notifier->start();
 
+  lc = new RGWLC();
+  lc->initialize(cct, this);
+
+  if (use_lc_thread)
+    lc->start_processor();
+
   quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
 
   bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ?
                              cct->_conf->rgw_override_bucket_index_max_shards :
@@ -3983,6 +3998,24 @@ int RGWRados::open_gc_pool_ctx()
   return r;
 }
 
+int RGWRados::open_lc_pool_ctx()
+{
+  const char *lc_pool = get_zone_params().lc_pool.name.c_str();
+  librados::Rados *rad = get_rados_handle();
+  int r = rad->ioctx_create(lc_pool, lc_pool_ctx);
+  if (r == -ENOENT) {
+    r = rad->pool_create(lc_pool);
+    if (r == -EEXIST)
+      r = 0;
+    if (r < 0)
+      return r;
+
+    r = rad->ioctx_create(lc_pool, lc_pool_ctx);
+  }
+
+  return r;
+}
+
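
open_lc_pool_ctx() follows the same pattern as the other pool openers: try to open an IoCtx on the .rgw.lc pool, create the pool if it does not exist yet, tolerate a racing creator, and open again. A standalone librados sketch of that pattern is below, assuming a reachable cluster and a client.admin keyring; it is illustrative, not the RGWRados code path.

    // Sketch of "open ioctx, create the pool on ENOENT, then reopen",
    // written against plain librados with minimal error handling.
    #include <rados/librados.hpp>
    #include <cerrno>
    #include <iostream>

    static int open_or_create_pool(librados::Rados& cluster,
                                   const char *pool, librados::IoCtx& ioctx)
    {
      int r = cluster.ioctx_create(pool, ioctx);
      if (r == -ENOENT) {                 // pool missing: create it, tolerate races
        r = cluster.pool_create(pool);
        if (r == -EEXIST)
          r = 0;                          // someone else created it first
        if (r < 0)
          return r;
        r = cluster.ioctx_create(pool, ioctx);
      }
      return r;
    }

    int main()
    {
      librados::Rados cluster;
      if (cluster.init("admin") < 0 || cluster.conf_read_file(nullptr) < 0 ||
          cluster.connect() < 0) {
        std::cerr << "cannot reach cluster" << std::endl;
        return 1;
      }
      librados::IoCtx ioctx;
      int r = open_or_create_pool(cluster, ".rgw.lc", ioctx);
      std::cout << "open_or_create_pool: " << r << std::endl;
      cluster.shutdown();
      return r < 0 ? 1 : 0;
    }
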
 int RGWRados::open_objexp_pool_ctx()
 {
   const char * const pool_name = get_zone_params().log_pool.name.c_str();
@@ -11232,6 +11265,16 @@ int RGWRados::process_gc()
   return gc->process();
 }
 
+int RGWRados::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map)
+{
+  return lc->list_lc_progress(marker, max_entries, progress_map);
+}
+
+int RGWRados::process_lc()
+{
+  return lc->process();
+}
+
 int RGWRados::process_expire_objects()
 {
   obj_expirer->inspect_all_shards(utime_t(), ceph_clock_now(cct));
@@ -12254,7 +12297,7 @@ uint64_t RGWRados::next_bucket_id()
   return ++max_bucket_id;
 }
 
-RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads, bool run_sync_thread)
+RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread)
 {
   int use_cache = cct->_conf->rgw_cache_enabled;
   RGWRados *store = NULL;
@@ -12264,7 +12307,7 @@ RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_t
     store = new RGWCache<RGWRados>;
   }
 
-  if (store->initialize(cct, use_gc_thread, quota_threads, run_sync_thread) < 0) {
+  if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread) < 0) {
     delete store;
     return NULL;
   }
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index 5b8d2ec502786..4af138f04836e 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -29,6 +29,7 @@ class ACLOwner;
 class RGWGC;
 class RGWMetaNotifier;
 class RGWDataNotifier;
+class RGWLC;
 class RGWObjectExpirer;
 class RGWMetaSyncProcessorThread;
 class RGWDataSyncProcessorThread;
@@ -856,6 +857,7 @@ struct RGWZoneParams : RGWSystemMetaObj {
   rgw_bucket metadata_heap;
   rgw_bucket control_pool;
   rgw_bucket gc_pool;
+  rgw_bucket lc_pool;
   rgw_bucket log_pool;
   rgw_bucket intent_log_pool;
   rgw_bucket usage_log_pool;
@@ -897,6 +899,7 @@ struct RGWZoneParams : RGWSystemMetaObj {
     ::encode(domain_root, bl);
     ::encode(control_pool, bl);
     ::encode(gc_pool, bl);
+    ::encode(lc_pool, bl);
     ::encode(log_pool, bl);
     ::encode(intent_log_pool, bl);
     ::encode(usage_log_pool, bl);
@@ -917,6 +920,7 @@
     ::decode(domain_root, bl);
     ::decode(control_pool, bl);
     ::decode(gc_pool, bl);
+    ::decode(lc_pool, bl);
     ::decode(log_pool, bl);
     ::decode(intent_log_pool, bl);
     ::decode(usage_log_pool, bl);
@@ -1718,6 +1722,7 @@ class RGWRados
   friend class RGWGC;
   friend class RGWMetaNotifier;
   friend class RGWDataNotifier;
+  friend class RGWLC;
   friend class RGWObjectExpirer;
   friend class RGWMetaSyncProcessorThread;
   friend class RGWDataSyncProcessorThread;
@@ -1727,6 +1732,7 @@ class RGWRados
   /** Open the pool used as root for this gateway */
   int open_root_pool_ctx();
   int open_gc_pool_ctx();
+  int open_lc_pool_ctx();
   int open_objexp_pool_ctx();
 
   int open_pool_ctx(const string& pool, librados::IoCtx& io_ctx);
@@ -1764,8 +1770,10 @@ class RGWRados
   };
 
   RGWGC *gc;
+  RGWLC *lc;
   RGWObjectExpirer *obj_expirer;
 
   bool use_gc_thread;
+  bool use_lc_thread;
   bool quota_threads;
   bool run_sync_thread;
@@ -1826,6 +1834,7 @@ protected:
   tombstone_cache_t *obj_tombstone_cache;
 
   librados::IoCtx gc_pool_ctx;        // .rgw.gc
+  librados::IoCtx lc_pool_ctx;        // .rgw.lc
   librados::IoCtx objexp_pool_ctx;
 
   bool pools_initialized;
@@ -1848,7 +1857,7 @@ protected:
   RGWPeriod current_period;
 public:
   RGWRados() : max_req_id(0), lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
-               gc(NULL), obj_expirer(NULL), use_gc_thread(false), quota_threads(false),
+               gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false),
                run_sync_thread(false), async_rados(nullptr), meta_notifier(NULL),
                data_notifier(NULL), meta_sync_processor_thread(NULL),
                meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"),
@@ -1872,6 +1881,9 @@ public:
     return max_req_id.inc();
   }
 
+  librados::IoCtx* get_lc_pool_ctx() {
+    return &lc_pool_ctx;
+  }
   void set_context(CephContext *_cct) {
     cct = _cct;
   }
@@ -1996,9 +2008,10 @@ public:
   CephContext *ctx() { return cct; }
   /** do all necessary setup of the storage device */
-  int initialize(CephContext *_cct, bool _use_gc_thread, bool _quota_threads, bool _run_sync_thread) {
+  int initialize(CephContext *_cct, bool _use_gc_thread, bool _use_lc_thread, bool _quota_threads, bool _run_sync_thread) {
     set_context(_cct);
     use_gc_thread = _use_gc_thread;
+    use_lc_thread = _use_lc_thread;
     quota_threads = _quota_threads;
     run_sync_thread = _run_sync_thread;
     return initialize();
@@ -2862,6 +2875,9 @@ public:
   int process_expire_objects();
   int defer_gc(void *ctx, rgw_obj& obj);
 
+  int process_lc();
+  int list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map);
+
   int bucket_check_index(rgw_bucket& bucket, map *existing_stats, map *calculated_stats);
@@ -3023,15 +3039,15 @@ public:
 class RGWStoreManager {
 public:
   RGWStoreManager() {}
-  static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool quota_threads, bool run_sync_thread) {
-    RGWRados *store = init_storage_provider(cct, use_gc_thread, quota_threads, run_sync_thread);
+  static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread) {
+    RGWRados *store = init_storage_provider(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread);
     return store;
   }
   static RGWRados *get_raw_storage(CephContext *cct) {
     RGWRados *store = init_raw_storage_provider(cct);
     return store;
   }
-  static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads, bool run_sync_thread);
+  static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread);
   static RGWRados *init_raw_storage_provider(CephContext *cct);
   static void close_storage(RGWRados *store);
diff --git a/src/rgw/rgw_realm_reloader.cc b/src/rgw/rgw_realm_reloader.cc
index 8f38e987ea7da..4edf347742b85 100644
--- a/src/rgw/rgw_realm_reloader.cc
+++ b/src/rgw/rgw_realm_reloader.cc
@@ -100,6 +100,7 @@ void RGWRealmReloader::reload()
   // recreate and initialize a new store
   store = RGWStoreManager::get_storage(cct,
                                        cct->_conf->rgw_enable_gc_threads,
+                                       cct->_conf->rgw_enable_lc_threads,
                                        cct->_conf->rgw_enable_quota_threads,
                                        cct->_conf->rgw_run_sync_thread);
diff --git a/src/rgw/rgw_rest.cc b/src/rgw/rgw_rest.cc
index e485d99f40ce8..77e67a768e0df 100644
--- a/src/rgw/rgw_rest.cc
+++ b/src/rgw/rgw_rest.cc
@@ -1149,8 +1149,31 @@ int RGWPutACLs_ObjStore::get_params()
   return op_ret;
 }
 
-static int read_all_chunked_input(req_state *s, char **pdata, int *plen,
-                                  int max_read)
+int RGWPutLC_ObjStore::get_params()
+{
+  size_t cl = 0;
+  if (s->length)
+    cl = atoll(s->length);
+  if (cl) {
+    data = (char *)malloc(cl + 1);
+    if (!data) {
+      ret = -ENOMEM;
+      return ret;
+    }
+    int read_len;
+    int r = STREAM_IO(s)->read(data, cl, &read_len, s->aws4_auth_needs_complete);
+    len = read_len;
+    if (r < 0)
+      return r;
+    data[len] = '\0';
+  } else {
+    len = 0;
+  }
+
+  return ret;
+}
+
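
RGWPutLC_ObjStore::get_params() reads the lifecycle XML into a NUL-terminated buffer sized from Content-Length; chunked uploads would fall through to read_all_chunked_input() below. A self-contained sketch of the fixed-length branch follows; read_fixed_body and read_fn are hypothetical names standing in for the frontend's STREAM_IO(s)->read() call, not part of RGW.

    // Illustrative fixed Content-Length body read with a malloc'd,
    // NUL-terminated buffer; caller frees *data.
    #include <cerrno>
    #include <cstdlib>
    #include <cstring>
    #include <functional>
    #include <string>

    static int read_fixed_body(const char *content_length,
                               const std::function<int(char*, size_t)>& read_fn,
                               char **data, size_t *len)
    {
      size_t cl = content_length ? std::atoll(content_length) : 0;
      *data = nullptr;
      *len = 0;
      if (!cl)
        return 0;                       // empty body is not an error here
      char *buf = static_cast<char *>(std::malloc(cl + 1));
      if (!buf)
        return -ENOMEM;
      int r = read_fn(buf, cl);         // bytes actually read, or negative error
      if (r < 0) {
        std::free(buf);
        return r;
      }
      buf[r] = '\0';                    // keep the XML parser happy
      *data = buf;
      *len = static_cast<size_t>(r);
      return 0;
    }

    int main()
    {
      char *data = nullptr;
      size_t len = 0;
      const std::string body = "<LifecycleConfiguration/>";
      int r = read_fixed_body(std::to_string(body.size()).c_str(),
                              [&](char *dst, size_t n) {
                                std::memcpy(dst, body.data(), n);
                                return static_cast<int>(n);
                              },
                              &data, &len);
      std::free(data);
      return r < 0 ? 1 : 0;
    }
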
+static int read_all_chunked_input(req_state *s, char **pdata, int *plen, int max_read)
 {
 #define READ_CHUNK 4096
 #define MAX_READ_CHUNK (128 * 1024)
diff --git a/src/rgw/rgw_rest.h b/src/rgw/rgw_rest.h
index 3e23945fb2d84..831b214df754c 100644
--- a/src/rgw/rgw_rest.h
+++ b/src/rgw/rgw_rest.h
@@ -278,6 +278,21 @@ public:
   virtual int get_params();
 };
 
+class RGWPutLC_ObjStore : public RGWPutLC {
+public:
+  RGWPutLC_ObjStore() {}
+  ~RGWPutLC_ObjStore() {}
+
+  int get_params();
+};
+
+class RGWDeleteLC_ObjStore : public RGWDeleteLC {
+public:
+  RGWDeleteLC_ObjStore() {}
+  ~RGWDeleteLC_ObjStore() {}
+
+};
+
 class RGWGetCORS_ObjStore : public RGWGetCORS {
 public:
   RGWGetCORS_ObjStore() {}
diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc
index 5acba5e08255b..679d423568dfc 100644
--- a/src/rgw/rgw_rest_s3.cc
+++ b/src/rgw/rgw_rest_s3.cc
@@ -2260,6 +2260,27 @@ void RGWPutACLs_ObjStore_S3::send_response()
   dump_start(s);
 }
 
+void RGWPutLC_ObjStore_S3::send_response()
+{
+  if (ret)
+    set_req_state_err(s, ret);
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+}
+
+void RGWDeleteLC_ObjStore_S3::send_response()
+{
+  if (ret == 0)
+    ret = STATUS_NO_CONTENT;
+  if (ret) {
+    set_req_state_err(s, ret);
+  }
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+}
+
 void RGWGetCORS_ObjStore_S3::send_response()
 {
   if (op_ret) {
@@ -2844,6 +2865,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_put()
     return new RGWPutCORS_ObjStore_S3;
   } else if (is_request_payment_op()) {
     return new RGWSetRequestPayment_ObjStore_S3;
+  } else if (is_lc_op()) {
+    return new RGWPutLC_ObjStore_S3;
   }
   return new RGWCreateBucket_ObjStore_S3;
 }
@@ -2852,6 +2875,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_delete()
 {
   if (is_cors_op()) {
     return new RGWDeleteCORS_ObjStore_S3;
+  } else if (is_lc_op()) {
+    return new RGWDeleteLC_ObjStore_S3;
   }
 
   if (s->info.args.sub_resource_exists("website")) {
diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h
index cecc14c97656d..43c254e3825c6 100644
--- a/src/rgw/rgw_rest_s3.h
+++ b/src/rgw/rgw_rest_s3.h
@@ -257,6 +257,22 @@ public:
   int get_params();
 };
 
+class RGWPutLC_ObjStore_S3 : public RGWPutLC_ObjStore {
+public:
+  RGWPutLC_ObjStore_S3() {}
+  ~RGWPutLC_ObjStore_S3() {}
+
+  void send_response();
+};
+
+class RGWDeleteLC_ObjStore_S3 : public RGWDeleteLC_ObjStore {
+public:
+  RGWDeleteLC_ObjStore_S3() {}
+  ~RGWDeleteLC_ObjStore_S3() {}
+
+  void send_response();
+};
+
 class RGWGetCORS_ObjStore_S3 : public RGWGetCORS_ObjStore {
 public:
   RGWGetCORS_ObjStore_S3() {}
@@ -497,6 +513,9 @@ protected:
   bool is_cors_op() {
     return s->info.args.exists("cors");
   }
+  bool is_lc_op() {
+    return s->info.args.exists("lifecycle");
+  }
   bool is_obj_update_op() {
     return is_acl_op() || is_cors_op();
   }
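
Dispatch works the same way as the existing cors subresource: is_lc_op() checks for a lifecycle query argument, and op_put()/op_delete() route the request to the new LC ops before falling back to the bucket defaults. A toy sketch of that routing is below; Request and pick_bucket_op() are illustrative placeholders for RGWHandler_REST_Bucket_S3, not its actual interface.

    // Illustrative subresource dispatch keyed on the parsed query string.
    #include <iostream>
    #include <map>
    #include <string>

    struct Request {
      std::string method;                          // "PUT", "DELETE", ...
      std::map<std::string, std::string> args;     // parsed query string
      bool has(const std::string& k) const { return args.count(k) != 0; }
    };

    static std::string pick_bucket_op(const Request& req)
    {
      if (req.method == "PUT") {
        if (req.has("cors"))      return "PutCORS";
        if (req.has("lifecycle")) return "PutLC";       // ?lifecycle subresource
        return "CreateBucket";
      }
      if (req.method == "DELETE") {
        if (req.has("cors"))      return "DeleteCORS";
        if (req.has("lifecycle")) return "DeleteLC";
        return "DeleteBucket";
      }
      return "Unknown";
    }

    int main()
    {
      Request r{"PUT", {{"lifecycle", ""}}};
      std::cout << pick_bucket_op(r) << "\n";          // prints "PutLC"
    }
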
diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t
index 217ac6ec03bd6..3a18587f876ac 100644
--- a/src/test/cli/radosgw-admin/help.t
+++ b/src/test/cli/radosgw-admin/help.t
@@ -83,6 +83,8 @@
     gc list                    dump expired garbage collection objects (specify
                                --include-all to list all entries, including unexpired)
     gc process                 manually process garbage
+    lc list                    list all bucket lifecycle progress
+    lc process                 manually process lifecycle
     metadata get               get metadata info
     metadata put               put metadata info
     metadata rm                remove metadata info
diff --git a/src/test/test_rgw_admin_opstate.cc b/src/test/test_rgw_admin_opstate.cc
index 3499aa46abd19..451d6df22f2ec 100644
--- a/src/test/test_rgw_admin_opstate.cc
+++ b/src/test/test_rgw_admin_opstate.cc
@@ -807,7 +807,7 @@ int main(int argc, char *argv[]){
   global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
   common_init_finish(g_ceph_context);
 
-  store = RGWStoreManager::get_storage(g_ceph_context, false, false, false);
+  store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false);
   g_test = new admin_log::test_helper();
   finisher = new Finisher(g_ceph_context);
 #ifdef GTEST
-- 
2.39.5