From 9adf66ba8571e1a019ce6e507d27057693f0ac21 Mon Sep 17 00:00:00 2001 From: Matt Benjamin Date: Wed, 1 Apr 2020 19:21:27 -0400 Subject: [PATCH] rgwlc: make rgwlc entries extensible and extend Add generation/run tracking to LC entries. Define the entry as a versioned structure, adapt accordingly. N.B., has extra debug prints from later commit. Signed-off-by: Matt Benjamin (cherry picked from commit 394750597656d4f3ab7b8220af7046753117d39b) Conflicts: src/cls/rgw/cls_rgw.cc src/cls/rgw/cls_rgw_client.h src/cls/rgw/cls_rgw_ops.h - adapt for Adam Emerson post-Octopus refactoring --- src/cls/rgw/cls_rgw.cc | 38 +++++++----- src/cls/rgw/cls_rgw_client.cc | 19 ++++-- src/cls/rgw/cls_rgw_client.h | 10 ++-- src/cls/rgw/cls_rgw_ops.h | 91 ++++++++++++++++++++-------- src/cls/rgw/cls_rgw_types.h | 31 ++++++++++ src/rgw/rgw_admin.cc | 24 +++++--- src/rgw/rgw_lc.cc | 110 +++++++++++++++++++++------------- src/rgw/rgw_lc.h | 7 ++- src/rgw/rgw_rados.cc | 3 +- src/rgw/rgw_rados.h | 3 +- 10 files changed, 230 insertions(+), 106 deletions(-) diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index bdc3f78433246..bd08ed0c72d8c 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -3660,7 +3660,7 @@ static int rgw_cls_lc_get_entry(cls_method_context_t hctx, bufferlist *in, buffe return -EINVAL; } - rgw_lc_entry_t lc_entry; + cls_rgw_lc_entry lc_entry; int ret = read_omap_entry(hctx, op.marker, &lc_entry); if (ret < 0) return ret; @@ -3686,7 +3686,7 @@ static int rgw_cls_lc_set_entry(cls_method_context_t hctx, bufferlist *in, buffe bufferlist bl; encode(op.entry, bl); - int ret = cls_cxx_map_set_val(hctx, op.entry.first, &bl); + int ret = cls_cxx_map_set_val(hctx, op.entry.bucket, &bl); return ret; } @@ -3702,7 +3702,7 @@ static int rgw_cls_lc_rm_entry(cls_method_context_t hctx, bufferlist *in, buffer return -EINVAL; } - int ret = cls_cxx_map_remove_key(hctx, op.entry.first); + int ret = cls_cxx_map_remove_key(hctx, op.entry.bucket); return ret; } @@ -3725,7 +3725,7 @@ static int rgw_cls_lc_get_next_entry(cls_method_context_t hctx, bufferlist *in, if (ret < 0) return ret; map::iterator it; - pair entry; + cls_rgw_lc_entry entry; if (!vals.empty()) { it=vals.begin(); in_iter = it->second.begin(); @@ -3741,7 +3741,8 @@ static int rgw_cls_lc_get_next_entry(cls_method_context_t hctx, bufferlist *in, return 0; } -static int rgw_cls_lc_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +static int rgw_cls_lc_list_entries(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { cls_rgw_lc_list_entries_op op; auto in_iter = in->cbegin(); @@ -3752,24 +3753,33 @@ static int rgw_cls_lc_list_entries(cls_method_context_t hctx, bufferlist *in, bu return -EINVAL; } - cls_rgw_lc_list_entries_ret op_ret; + cls_rgw_lc_list_entries_ret op_ret(op.compat_v); bufferlist::const_iterator iter; map vals; string filter_prefix; - int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, op.max_entries, &vals, &op_ret.is_truncated); + int ret = cls_cxx_map_get_vals(hctx, op.marker, filter_prefix, op.max_entries, + &vals, &op_ret.is_truncated); if (ret < 0) return ret; map::iterator it; - pair entry; - for (it = vals.begin(); it != vals.end(); ++it) { + for (auto it = vals.begin(); it != vals.end(); ++it) { + cls_rgw_lc_entry entry; iter = it->second.cbegin(); try { - decode(entry, iter); + decode(entry, iter); } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: rgw_cls_lc_list_entries(): failed to decode entry\n"); - return -EIO; - } - op_ret.entries.insert(entry); + /* try backward compat */ + pair oe; + try { + decode(oe, iter); + entry = {oe.first, 0 /* start */, uint32_t(oe.second)}; + } catch(buffer::error& err) { + CLS_LOG( + 1, "ERROR: rgw_cls_lc_list_entries(): failed to decode entry\n"); + } + return -EIO; + } + op_ret.entries.push_back(entry); } encode(op_ret, *out); return 0; diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 65e59047ebada..270f9aa2e2bdb 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -839,7 +839,8 @@ int cls_rgw_lc_put_head(IoCtx& io_ctx, const string& oid, cls_rgw_lc_obj_head& h return r; } -int cls_rgw_lc_get_next_entry(IoCtx& io_ctx, const string& oid, string& marker, pair& entry) +int cls_rgw_lc_get_next_entry(IoCtx& io_ctx, const string& oid, string& marker, + cls_rgw_lc_entry& entry) { bufferlist in, out; cls_rgw_lc_get_next_entry_op call; @@ -861,7 +862,8 @@ int cls_rgw_lc_get_next_entry(IoCtx& io_ctx, const string& oid, string& marker, return r; } -int cls_rgw_lc_rm_entry(IoCtx& io_ctx, const string& oid, const pair& entry) +int cls_rgw_lc_rm_entry(IoCtx& io_ctx, const string& oid, + const cls_rgw_lc_entry& entry) { bufferlist in, out; cls_rgw_lc_rm_entry_op call; @@ -871,7 +873,8 @@ int cls_rgw_lc_rm_entry(IoCtx& io_ctx, const string& oid, const pair& entry) +int cls_rgw_lc_set_entry(IoCtx& io_ctx, const string& oid, + const cls_rgw_lc_entry& entry) { bufferlist in, out; cls_rgw_lc_set_entry_op call; @@ -881,7 +884,8 @@ int cls_rgw_lc_set_entry(IoCtx& io_ctx, const string& oid, const pair& entries) + vector& entries) { bufferlist in, out; cls_rgw_lc_list_entries_op op; @@ -930,8 +934,11 @@ int cls_rgw_lc_list(IoCtx& io_ctx, const string& oid, } catch (buffer::error& err) { return -EIO; } - entries.insert(ret.entries.begin(),ret.entries.end()); + std::sort(std::begin(ret.entries), std::end(ret.entries), + [](const cls_rgw_lc_entry& a, const cls_rgw_lc_entry& b) + { return a.bucket < b.bucket; }); + entries = std::move(ret.entries); return r; } diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index bd13710e86686..c68703c6958f7 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -607,14 +607,14 @@ int cls_rgw_gc_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32 #ifndef CLS_CLIENT_HIDE_IOCTX int cls_rgw_lc_get_head(librados::IoCtx& io_ctx, const string& oid, cls_rgw_lc_obj_head& head); int cls_rgw_lc_put_head(librados::IoCtx& io_ctx, const string& oid, cls_rgw_lc_obj_head& head); -int cls_rgw_lc_get_next_entry(librados::IoCtx& io_ctx, const string& oid, string& marker, pair& entry); -int cls_rgw_lc_rm_entry(librados::IoCtx& io_ctx, const string& oid, const pair& entry); -int cls_rgw_lc_set_entry(librados::IoCtx& io_ctx, const string& oid, const pair& entry); -int cls_rgw_lc_get_entry(librados::IoCtx& io_ctx, const string& oid, const std::string& marker, rgw_lc_entry_t& entry); +int cls_rgw_lc_get_next_entry(librados::IoCtx& io_ctx, const string& oid, string& marker, cls_rgw_lc_entry& entry); +int cls_rgw_lc_rm_entry(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_lc_entry& entry); +int cls_rgw_lc_set_entry(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_lc_entry& entry); +int cls_rgw_lc_get_entry(librados::IoCtx& io_ctx, const string& oid, const std::string& marker, cls_rgw_lc_entry& entry); int cls_rgw_lc_list(librados::IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max_entries, - map& entries); + vector& entries); #endif /* resharding */ diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h index d752118b2fb19..785c862dd109d 100644 --- a/src/cls/rgw/cls_rgw_ops.h +++ b/src/cls/rgw/cls_rgw_ops.h @@ -1030,21 +1030,26 @@ struct cls_rgw_lc_get_next_entry_op { }; WRITE_CLASS_ENCODER(cls_rgw_lc_get_next_entry_op) -using rgw_lc_entry_t = std::pair; - struct cls_rgw_lc_get_next_entry_ret { - rgw_lc_entry_t entry; + cls_rgw_lc_entry entry; + cls_rgw_lc_get_next_entry_ret() {} void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 2, bl); encode(entry, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(entry, bl); + DECODE_START(2, bl); + if (struct_v < 1) { + std::pair oe; + decode(oe, bl); + entry = {oe.first, 0 /* start */, uint32_t(oe.second)}; + } else { + decode(entry, bl); + } DECODE_FINISH(bl); } @@ -1071,9 +1076,11 @@ struct cls_rgw_lc_get_entry_op { WRITE_CLASS_ENCODER(cls_rgw_lc_get_entry_op) struct cls_rgw_lc_get_entry_ret { - rgw_lc_entry_t entry; + cls_rgw_lc_entry entry; + cls_rgw_lc_get_entry_ret() {} - cls_rgw_lc_get_entry_ret(rgw_lc_entry_t&& _entry) : entry(std::move(_entry)) {} + cls_rgw_lc_get_entry_ret(cls_rgw_lc_entry&& _entry) + : entry(std::move(_entry)) {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); @@ -1090,38 +1097,49 @@ struct cls_rgw_lc_get_entry_ret { }; WRITE_CLASS_ENCODER(cls_rgw_lc_get_entry_ret) - struct cls_rgw_lc_rm_entry_op { - rgw_lc_entry_t entry; + cls_rgw_lc_entry entry; cls_rgw_lc_rm_entry_op() {} void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 2, bl); encode(entry, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(entry, bl); + DECODE_START(2, bl); + if (struct_v < 1) { + std::pair oe; + decode(oe, bl); + entry = {oe.first, 0 /* start */, uint32_t(oe.second)}; + } else { + decode(entry, bl); + } DECODE_FINISH(bl); } }; WRITE_CLASS_ENCODER(cls_rgw_lc_rm_entry_op) struct cls_rgw_lc_set_entry_op { - rgw_lc_entry_t entry; + cls_rgw_lc_entry entry; cls_rgw_lc_set_entry_op() {} void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 2, bl); encode(entry, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(entry, bl); + DECODE_START(2, bl); + if (struct_v < 1) { + std::pair oe; + decode(oe, bl); + entry = {oe.first, 0 /* start */, uint32_t(oe.second)}; + } else { + decode(entry, bl); + } DECODE_FINISH(bl); } }; @@ -1171,18 +1189,20 @@ WRITE_CLASS_ENCODER(cls_rgw_lc_get_head_ret) struct cls_rgw_lc_list_entries_op { string marker; uint32_t max_entries = 0; + uint8_t compat_v{0}; cls_rgw_lc_list_entries_op() {} void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); encode(marker, bl); encode(max_entries, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); + compat_v = struct_v; decode(marker, bl); decode(max_entries, bl); DECODE_FINISH(bl); @@ -1192,27 +1212,46 @@ struct cls_rgw_lc_list_entries_op { WRITE_CLASS_ENCODER(cls_rgw_lc_list_entries_op) struct cls_rgw_lc_list_entries_ret { - map entries; + vector entries; bool is_truncated{false}; + uint8_t compat_v; - cls_rgw_lc_list_entries_ret() {} +cls_rgw_lc_list_entries_ret(uint8_t compat_v = 3) + : compat_v(compat_v) {} void encode(bufferlist& bl) const { - ENCODE_START(2, 1, bl); - encode(entries, bl); + ENCODE_START(compat_v, 1, bl); + if (compat_v <= 2) { + map oes; + std::for_each(entries.begin(), entries.end(), + [&oes](const cls_rgw_lc_entry& elt) + {oes.insert({elt.bucket, elt.status});}); + encode(oes, bl); + } else { + encode(entries, bl); + } encode(is_truncated, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(2, bl); - decode(entries, bl); + DECODE_START(3, bl); + compat_v = struct_v; + if (struct_v <= 2) { + map oes; + decode(oes, bl); + std::for_each(oes.begin(), oes.end(), + [this](const std::pair& oe) + {entries.push_back({oe.first, 0 /* start */, + uint32_t(oe.second)});}); + } else { + decode(entries, bl); + } if (struct_v >= 2) { decode(is_truncated, bl); } DECODE_FINISH(bl); } - }; WRITE_CLASS_ENCODER(cls_rgw_lc_list_entries_ret) diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h index 0bd197ae856ad..620811dbc4a25 100644 --- a/src/cls/rgw/cls_rgw_types.h +++ b/src/cls/rgw/cls_rgw_types.h @@ -1214,6 +1214,37 @@ struct cls_rgw_lc_obj_head }; WRITE_CLASS_ENCODER(cls_rgw_lc_obj_head) +struct cls_rgw_lc_entry { + std::string bucket; + uint64_t start_time; // if in_progress + uint32_t status; + + cls_rgw_lc_entry() + : start_time(0), status(0) {} + + cls_rgw_lc_entry(const cls_rgw_lc_entry& rhs) = default; + + cls_rgw_lc_entry(const std::string& b, uint64_t t, uint32_t s) + : bucket(b), start_time(t), status(s) {}; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(bucket, bl); + encode(start_time, bl); + encode(status, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(bucket, bl); + decode(start_time, bl); + decode(status, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_rgw_lc_entry); + struct cls_rgw_reshard_entry { ceph::real_time time; diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index d4f3d73442da9..84239157c7cfb 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -7224,27 +7224,35 @@ next: if (opt_cmd == OPT::LC_LIST) { formatter->open_array_section("lifecycle_list"); - map bucket_lc_map; + vector bucket_lc_map; string marker; #define MAX_LC_LIST_ENTRIES 100 if (max_entries < 0) { max_entries = MAX_LC_LIST_ENTRIES; } do { - int ret = store->getRados()->list_lc_progress(marker, max_entries, &bucket_lc_map); + int ret = store->getRados()->list_lc_progress(marker, max_entries, + bucket_lc_map); if (ret < 0) { - cerr << "ERROR: failed to list objs: " << cpp_strerror(-ret) << std::endl; + cerr << "ERROR: failed to list objs: " << cpp_strerror(-ret) + << std::endl; return 1; } - map::iterator iter; - for (iter = bucket_lc_map.begin(); iter != bucket_lc_map.end(); ++iter) { + for (const auto& entry : bucket_lc_map) { formatter->open_object_section("bucket_lc_info"); - formatter->dump_string("bucket", iter->first); - string lc_status = LC_STATUS[iter->second]; + formatter->dump_string("bucket", entry.bucket); + char exp_buf[100]; + time_t t{time_t(entry.start_time)}; + if (std::strftime( + exp_buf, sizeof(exp_buf), + "%a, %d %b %Y %T %Z", std::gmtime(&t))) { + formatter->dump_string("started", exp_buf); + } + string lc_status = LC_STATUS[entry.status]; formatter->dump_string("status", lc_status); formatter->close_section(); // objs formatter->flush(cout); - marker = iter->first; + marker = entry.bucket; } } while (!bucket_lc_map.empty()); diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index dea19cbe33a7c..92840ab31e199 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -26,6 +26,7 @@ #include "rgw_zone.h" #include "rgw_string.h" #include "rgw_multi.h" +#include "rgw_sal.h" // this seems safe to use, at least for now--arguably, we should // prefer header-only fmt, in general @@ -288,8 +289,7 @@ bool RGWLC::if_already_run_today(time_t& start_date) int RGWLC::bucket_lc_prepare(int index, LCWorker* worker) { - map entries; - + vector entries; string marker; #define MAX_LC_LIST_ENTRIES 100 @@ -298,20 +298,22 @@ int RGWLC::bucket_lc_prepare(int index, LCWorker* worker) marker, MAX_LC_LIST_ENTRIES, entries); if (ret < 0) return ret; - map::iterator iter; - for (iter = entries.begin(); iter != entries.end(); ++iter) { - pair entry(iter->first, lc_uninitial); + + for (auto& entry : entries) { + entry.start_time = ceph_clock_now(); + entry.status = lc_uninitial; // lc_uninitial? really? ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, - obj_names[index], entry); + obj_names[index], entry); if (ret < 0) { - ldpp_dout(this, 0) << "RGWLC::bucket_lc_prepare() failed to set entry on " - << obj_names[index] << dendl; + ldpp_dout(this, 0) + << "RGWLC::bucket_lc_prepare() failed to set entry on " + << obj_names[index] << dendl; return ret; } } - if (!entries.empty()) { - marker = std::move(entries.rbegin()->first); + if (! entries.empty()) { + marker = std::move(entries.back().bucket); } } while (!entries.empty()); @@ -1334,7 +1336,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker) } int RGWLC::bucket_lc_post(int index, int max_lock_sec, - pair& entry, int& result, + cls_rgw_lc_entry& entry, int& result, LCWorker* worker) { utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0); @@ -1343,6 +1345,10 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec, l.set_cookie(cookie); l.set_duration(lock_duration); + dout(5) << "RGWLC::bucket_lc_post(): POST " << entry + << " index: " << index << " worker ix: " << worker->ix + << dendl; + do { int ret = l.lock_exclusive( &store->getRados()->lc_pool_ctx, obj_names[index]); @@ -1364,9 +1370,9 @@ int RGWLC::bucket_lc_post(int index, int max_lock_sec, } goto clean; } else if (result < 0) { - entry.second = lc_failed; + entry.status = lc_failed; } else { - entry.second = lc_complete; + entry.status = lc_complete; } ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, @@ -1383,15 +1389,13 @@ clean: } int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, - map* progress_map) + vector& progress_map) { int index = 0; - progress_map->clear(); for(; index entries; int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker, - max_entries, entries); + max_entries, progress_map); if (ret < 0) { if (ret == -ENOENT) { ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object=" @@ -1401,10 +1405,6 @@ int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, return ret; } } - map::iterator iter; - for (iter = entries.begin(); iter != entries.end(); ++iter) { - progress_map->insert(*iter); - } } return 0; } @@ -1441,7 +1441,8 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker) rados::cls::lock::Lock l(lc_index_lock_name); do { utime_t now = ceph_clock_now(); - pair entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS + //string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS + cls_rgw_lc_entry entry; if (max_lock_secs <= 0) return -EAGAIN; @@ -1468,6 +1469,18 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker) goto exit; } + if (! (cct->_conf->rgw_lc_lock_max_time == 9969)) { + ret = cls_rgw_lc_get_entry(store->getRados()->lc_pool_ctx, + obj_names[index], head.marker, entry); + if ((entry.status == lc_processing) && + (true /* XXXX expired epoch! */)) { + dout(5) << "RGWLC::process(): ACTIVE entry: " << entry + << " index: " << index << " worker ix: " << worker->ix + << dendl; + goto exit; + } + } + if(!if_already_run_today(head.start_date)) { head.start_date = now; head.marker.clear(); @@ -1490,32 +1503,38 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker) } /* termination condition (eof) */ - if (entry.first.empty()) + if (entry.bucket.empty()) goto exit; - entry.second = lc_processing; + ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry + << " index: " << index << " worker ix: " << worker->ix + << dendl; + + entry.status = lc_processing; ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, - obj_names[index], entry); + obj_names[index], entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry " - << obj_names[index] - << " (" << entry.first << "," - << entry.second << ")" - << dendl; + << obj_names[index] << entry.bucket << entry.status << dendl; goto exit; } - head.marker = entry.first; - ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx, obj_names[index], - head); + head.marker = entry.bucket; + ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx, + obj_names[index], head); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to put head " << obj_names[index] - << dendl; + << dendl; goto exit; } + + ldpp_dout(this, 5) << "RGWLC::process(): START entry 2: " << entry + << " index: " << index << " worker ix: " << worker->ix + << dendl; + l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]); - ret = bucket_lc_process(entry.first, worker); + ret = bucket_lc_process(entry.bucket, worker); bucket_lc_post(index, max_lock_secs, entry, ret, worker); } while(1); @@ -1655,9 +1674,9 @@ static std::string get_lc_shard_name(const rgw_bucket& bucket){ } template -static int guard_lc_modify( - rgw::sal::RGWRadosStore* store, const rgw_bucket& bucket, - const string& cookie, const F& f) { +static int guard_lc_modify(rgw::sal::RGWRadosStore* store, + const rgw_bucket& bucket, const string& cookie, + const F& f) { CephContext *cct = store->ctx(); string shard_id = get_lc_shard_name(bucket); @@ -1665,7 +1684,10 @@ static int guard_lc_modify( string oid; get_lc_oid(cct, shard_id, &oid); - pair entry(shard_id, lc_uninitial); + /* XXX it makes sense to take shard_id for a bucket_id? */ + cls_rgw_lc_entry entry; + entry.bucket = shard_id; + entry.status = lc_uninitial; int max_lock_secs = cct->_conf->rgw_lc_lock_max_time; rados::cls::lock::Lock l(lc_index_lock_name); @@ -1718,9 +1740,10 @@ int RGWLC::set_bucket_config(RGWBucketInfo& bucket_info, rgw_bucket& bucket = bucket_info.bucket; + ret = guard_lc_modify(store, bucket, cookie, [&](librados::IoCtx *ctx, const string& oid, - const pair& entry) { + const cls_rgw_lc_entry& entry) { return cls_rgw_lc_set_entry(*ctx, oid, entry); }); @@ -1747,7 +1770,7 @@ int RGWLC::remove_bucket_config(RGWBucketInfo& bucket_info, ret = guard_lc_modify(store, bucket, cookie, [&](librados::IoCtx *ctx, const string& oid, - const pair& entry) { + const cls_rgw_lc_entry& entry) { return cls_rgw_lc_rm_entry(*ctx, oid, entry); }); @@ -1775,7 +1798,7 @@ int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store, std::string lc_oid; get_lc_oid(store->ctx(), shard_name, &lc_oid); - rgw_lc_entry_t entry; + cls_rgw_lc_entry entry; // There are multiple cases we need to encounter here // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update @@ -1799,8 +1822,9 @@ int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store, ret = guard_lc_modify( store, bucket_info.bucket, cookie, - [&lc_pool_ctx, &lc_oid](librados::IoCtx *ctx, const string& oid, - const pair& entry) { + [&lc_pool_ctx, &lc_oid](librados::IoCtx* ctx, + const string& oid, + const cls_rgw_lc_entry& entry) { return cls_rgw_lc_set_entry(*lc_pool_ctx, lc_oid, entry); }); diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h index 57f02a63e81f6..793e6f90b7b87 100644 --- a/src/rgw/rgw_lc.h +++ b/src/rgw/rgw_lc.h @@ -467,6 +467,7 @@ public: const DoutPrefixProvider *dpp; CephContext *cct; RGWLC *lc; + int ix; ceph::mutex lock = ceph::make_mutex("LCWorker"); ceph::condition_variable cond; WorkPool* workpool{nullptr}; @@ -497,10 +498,12 @@ public: int process(LCWorker* worker); int process(int index, int max_secs, LCWorker* worker); bool if_already_run_today(time_t& start_date); - int list_lc_progress(const string& marker, uint32_t max_entries, map *progress_map); + int list_lc_progress(const string& marker, uint32_t max_entries, + vector&); int bucket_lc_prepare(int index, LCWorker* worker); int bucket_lc_process(string& shard_id, LCWorker* worker); - int bucket_lc_post(int index, int max_lock_sec, pair& entry, int& result, LCWorker* worker); + int bucket_lc_post(int index, int max_lock_sec, + cls_rgw_lc_entry& entry, int& result, LCWorker* worker); bool going_down(); void start_processor(); void stop_processor(); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 44527c9e0fa35..1623c12f9efc4 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -8035,7 +8035,8 @@ int RGWRados::process_gc(bool expired_only) return gc->process(expired_only); } -int RGWRados::list_lc_progress(const string& marker, uint32_t max_entries, map *progress_map) +int RGWRados::list_lc_progress(const string& marker, uint32_t max_entries, + vector& progress_map) { return lc->list_lc_progress(marker, max_entries, progress_map); } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index e19b8d44811e6..6ac58fc305792 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1444,7 +1444,8 @@ public: int defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj, optional_yield y); int process_lc(); - int list_lc_progress(const string& marker, uint32_t max_entries, map *progress_map); + int list_lc_progress(const string& marker, uint32_t max_entries, + vector& progress_map); int bucket_check_index(RGWBucketInfo& bucket_info, map *existing_stats, -- 2.39.5