From 1c1569b66084754511f2933d2247d132cf6018b6 Mon Sep 17 00:00:00 2001 From: Cory Snyder Date: Thu, 25 May 2023 16:59:04 +0000 Subject: [PATCH] rgw: add radosgw-admin bucket check olh/unlinked commands Adds commands to radosgw-admin for checking for and fixing leftover entries in the bucket index (and associated RADOS objects). Fixes: https://tracker.ceph.com/issues/62075 Signed-off-by: Cory Snyder (cherry picked from commit 9b2042a0750109f575b02c9c4b127dd941994662) Conflicts: src/rgw/driver/rados/rgw_bucket.cc src/rgw/driver/rados/rgw_bucket.h src/rgw/driver/rados/rgw_rados.cc src/rgw/rgw_admin.cc Cherry-pick notes: - conflicts due to rados files being moved into driver directory - conflicts due to rename of rados classes --- qa/suites/rgw/verify/tasks/bucket-check.yaml | 5 + qa/workunits/rgw/run-bucket-check.sh | 19 + qa/workunits/rgw/test_rgw_bucket_check.py | 209 +++++++++ qa/workunits/rgw/test_rgw_reshard.py | 2 +- src/rgw/rgw_admin.cc | 33 +- src/rgw/rgw_bucket.cc | 423 +++++++++++++++++++ src/rgw/rgw_bucket.h | 17 +- src/rgw/rgw_rados.cc | 12 +- src/test/cli/radosgw-admin/help.t | 11 +- 9 files changed, 719 insertions(+), 12 deletions(-) create mode 100644 qa/suites/rgw/verify/tasks/bucket-check.yaml create mode 100755 qa/workunits/rgw/run-bucket-check.sh create mode 100755 qa/workunits/rgw/test_rgw_bucket_check.py diff --git a/qa/suites/rgw/verify/tasks/bucket-check.yaml b/qa/suites/rgw/verify/tasks/bucket-check.yaml new file mode 100644 index 0000000000000..4955d41c6477b --- /dev/null +++ b/qa/suites/rgw/verify/tasks/bucket-check.yaml @@ -0,0 +1,5 @@ +tasks: +- workunit: + clients: + client.0: + - rgw/run-bucket-check.sh diff --git a/qa/workunits/rgw/run-bucket-check.sh b/qa/workunits/rgw/run-bucket-check.sh new file mode 100755 index 0000000000000..85e02db5eb715 --- /dev/null +++ b/qa/workunits/rgw/run-bucket-check.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +set -ex + +# assume working ceph environment (radosgw-admin in path) and rgw on localhost:80 +# localhost::443 for ssl + +mydir=`dirname $0` + +python3 -m venv $mydir +source $mydir/bin/activate +pip install pip --upgrade +pip install boto3 + +## run test +$mydir/bin/python3 $mydir/test_rgw_bucket_check.py + +deactivate +echo OK. + diff --git a/qa/workunits/rgw/test_rgw_bucket_check.py b/qa/workunits/rgw/test_rgw_bucket_check.py new file mode 100755 index 0000000000000..4b4f776e8488f --- /dev/null +++ b/qa/workunits/rgw/test_rgw_bucket_check.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python3 + +import logging as log +import json +import random +import botocore +from common import exec_cmd, create_user, boto_connect +from time import sleep +from botocore.config import Config + +""" +Tests behavior of radosgw-admin bucket check commands. +""" +# The test cases in this file have been annotated for inventory. +# To extract the inventory (in csv format) use the command: +# +# grep '^ *# TESTCASE' | sed 's/^ *# TESTCASE //' +# +# + +""" Constants """ +USER = 'check-tester' +DISPLAY_NAME = 'Check Testing' +ACCESS_KEY = 'OJODXSLNX4LUNHQG99PA' +SECRET_KEY = '3l6ffld34qaymfomuh832j94738aie2x4p2o8h6n' +BUCKET_NAME = 'check-bucket' + +def put_objects(bucket, key_list): + objs = [] + for key in key_list: + o = bucket.put_object(Key=key, Body=b"some_data") + objs.append((o.key, o.version_id)) + return objs + +def create_unlinked_objects(conn, bucket, key_list): + # creates an unlinked/unlistable object for each key in key_list + + object_versions = [] + try: + exec_cmd('ceph config set client rgw_debug_inject_set_olh_err 2') + exec_cmd('ceph config set client rgw_debug_inject_olh_cancel_modification_err true') + sleep(1) + for key in key_list: + tag = str(random.randint(0, 1_000_000)) + try: + bucket.put_object(Key=key, Body=b"some_data", Metadata = { + 'tag': tag, + }) + except Exception as e: + log.debug(e) + out = exec_cmd(f'radosgw-admin bi list --bucket {bucket.name} --object {key}') + instance_entries = filter( + lambda x: x['type'] == 'instance', + json.loads(out.replace(b'\x80', b'0x80'))) + found = False + for ie in instance_entries: + instance_id = ie['entry']['instance'] + ov = conn.ObjectVersion(bucket.name, key, instance_id).head() + if ov['Metadata'] and ov['Metadata']['tag'] == tag: + object_versions.append((key, instance_id)) + found = True + break + if not found: + raise Exception(f'failed to create unlinked object for key={key}') + finally: + exec_cmd('ceph config rm client rgw_debug_inject_set_olh_err') + exec_cmd('ceph config rm client rgw_debug_inject_olh_cancel_modification_err') + return object_versions + +def main(): + """ + execute bucket check commands + """ + create_user(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY) + + connection = boto_connect(ACCESS_KEY, SECRET_KEY, Config(retries = { + 'total_max_attempts': 1, + })) + + # pre-test cleanup + try: + bucket = connection.Bucket(BUCKET_NAME) + bucket.objects.all().delete() + bucket.object_versions.all().delete() + bucket.delete() + except botocore.exceptions.ClientError as e: + if not e.response['Error']['Code'] == 'NoSuchBucket': + raise + + bucket = connection.create_bucket(Bucket=BUCKET_NAME) + + null_version_keys = ['a', 'z'] + null_version_objs = put_objects(bucket, null_version_keys) + + connection.BucketVersioning(BUCKET_NAME).enable() + + ok_keys = ['a', 'b', 'c', 'd'] + unlinked_keys = ['c', 'd', 'e', 'f'] + ok_objs = put_objects(bucket, ok_keys) + + # TESTCASE 'bucket check unlinked does not report normal entries' + log.debug('TEST: bucket check unlinked does not report normal entries\n') + out = exec_cmd(f'radosgw-admin bucket check unlinked --bucket {BUCKET_NAME} --min-age-hours 0 --dump-keys') + json_out = json.loads(out) + assert len(json_out) == 0 + + unlinked_objs = create_unlinked_objects(connection, bucket, unlinked_keys) + + # TESTCASE 'bucket check unlinked finds unlistable entries' + log.debug('TEST: bucket check unlinked finds unlistable entries\n') + out = exec_cmd(f'radosgw-admin bucket check unlinked --bucket {BUCKET_NAME} --min-age-hours 0 --dump-keys') + json_out = json.loads(out) + assert len(json_out) == len(unlinked_keys) + + # TESTCASE 'unlinked entries are not listable' + log.debug('TEST: unlinked entries are not listable\n') + for ov in bucket.object_versions.all(): + assert (ov.key, ov.version_id) not in unlinked_objs, f'object "{ov.key}:{ov.version_id}" was found in bucket listing' + + # TESTCASE 'GET returns 404 for unlinked entry keys that have no other versions' + log.debug('TEST: GET returns 404 for unlinked entry keys that have no other versions\n') + noent_keys = set(unlinked_keys) - set(ok_keys) + for key in noent_keys: + try: + bucket.Object(key).get() + assert False, 'GET did not return 404 for key={key} with no prior successful PUT' + except botocore.exceptions.ClientError as e: + assert e.response['ResponseMetadata']['HTTPStatusCode'] == 404 + + # TESTCASE 'bucket check unlinked fixes unlistable entries' + log.debug('TEST: bucket check unlinked fixes unlistable entries\n') + out = exec_cmd(f'radosgw-admin bucket check unlinked --bucket {BUCKET_NAME} --fix --min-age-hours 0 --rgw-olh-pending-timeout-sec 0 --dump-keys') + json_out = json.loads(out) + assert len(json_out) == len(unlinked_keys) + for o in unlinked_objs: + try: + connection.ObjectVersion(bucket.name, o[0], o[1]).head() + assert False, f'head for unlistable object {o[0]}:{o[1]} succeeded after fix' + except botocore.exceptions.ClientError as e: + assert e.response['ResponseMetadata']['HTTPStatusCode'] == 404 + + # TESTCASE 'bucket check unlinked fix does not affect normal entries' + log.debug('TEST: bucket check unlinked does not affect normal entries\n') + all_listable = list(bucket.object_versions.all()) + assert len(all_listable) == len(ok_keys) + len(null_version_keys), 'some normal objects were not accounted for in object listing after unlinked fix' + for o in ok_objs: + assert o in map(lambda x: (x.key, x.version_id), all_listable), "normal object not listable after fix" + connection.ObjectVersion(bucket.name, o[0], o[1]).head() + + # TESTCASE 'bucket check unlinked does not find new unlistable entries after fix' + log.debug('TEST: bucket check unlinked does not find new unlistable entries after fix\n') + out = exec_cmd(f'radosgw-admin bucket check unlinked --bucket {BUCKET_NAME} --min-age-hours 0 --dump-keys') + json_out = json.loads(out) + assert len(json_out) == 0 + + # for this set of keys we can produce leftover OLH object/entries by + # deleting the normal object instance since we should already have a leftover + # pending xattr on the OLH object due to the errors associated with the + # prior unlinked entries that were created for the same keys + leftover_pending_xattr_keys = set(ok_keys).intersection(unlinked_keys) + objs_to_delete = filter(lambda x: x[0] in leftover_pending_xattr_keys, ok_objs) + + for o in objs_to_delete: + connection.ObjectVersion(bucket.name, o[0], o[1]).delete() + + for key in leftover_pending_xattr_keys: + out = exec_cmd(f'radosgw-admin bi list --bucket {BUCKET_NAME} --object {key}') + idx_entries = json.loads(out.replace(b'\x80', b'0x80')) + assert len(idx_entries) > 0, 'failed to create leftover OLH entries for key {key}' + + # TESTCASE 'bucket check olh finds leftover OLH entries' + log.debug('TEST: bucket check olh finds leftover OLH entries\n') + out = exec_cmd(f'radosgw-admin bucket check olh --bucket {BUCKET_NAME} --dump-keys') + json_out = json.loads(out) + assert len(json_out) == len(leftover_pending_xattr_keys) + + # TESTCASE 'bucket check olh fixes leftover OLH entries' + log.debug('TEST: bucket check olh fixes leftover OLH entries\n') + out = exec_cmd(f'radosgw-admin bucket check olh --bucket {BUCKET_NAME} --fix --rgw-olh-pending-timeout-sec 0 --dump-keys') + json_out = json.loads(out) + assert len(json_out) == len(leftover_pending_xattr_keys) + + for key in leftover_pending_xattr_keys: + out = exec_cmd(f'radosgw-admin bi list --bucket {BUCKET_NAME} --object {key}') + idx_entries = json.loads(out.replace(b'\x80', b'0x80')) + assert len(idx_entries) == 0, 'index entries still exist for key={key} after olh fix' + + # TESTCASE 'bucket check olh does not find new leftover OLH entries after fix' + log.debug('TEST: bucket check olh does not find new leftover OLH entries after fix\n') + out = exec_cmd(f'radosgw-admin bucket check olh --bucket {BUCKET_NAME} --dump-keys') + json_out = json.loads(out) + assert len(json_out) == 0 + + # TESTCASE 'bucket check fixes do not affect null version objects' + log.debug('TEST: verify that bucket check fixes do not affect null version objects\n') + for o in null_version_objs: + connection.ObjectVersion(bucket.name, o[0], 'null').head() + + all_versions = list(map(lambda x: (x.key, x.version_id), bucket.object_versions.all())) + for key in null_version_keys: + assert (key, 'null') in all_versions + + # Clean up + log.debug("Deleting bucket {}".format(BUCKET_NAME)) + bucket.object_versions.all().delete() + bucket.delete() + +main() +log.info("Completed bucket check tests") diff --git a/qa/workunits/rgw/test_rgw_reshard.py b/qa/workunits/rgw/test_rgw_reshard.py index 54fddb469e9f5..842f70da14b59 100755 --- a/qa/workunits/rgw/test_rgw_reshard.py +++ b/qa/workunits/rgw/test_rgw_reshard.py @@ -75,7 +75,7 @@ def main(): execute manual and dynamic resharding commands """ create_user(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY) - + connection = boto_connect(ACCESS_KEY, SECRET_KEY) # create a bucket diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 0c0bd34055534..862daea15c85a 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -6,6 +6,7 @@ #include #include +#include #include extern "C" { @@ -131,7 +132,9 @@ void usage() cout << " bucket unlink unlink bucket from specified user\n"; cout << " bucket stats returns bucket statistics\n"; cout << " bucket rm remove bucket\n"; - cout << " bucket check check bucket index\n"; + cout << " bucket check check bucket index by verifying size and object count stats\n"; + cout << " bucket check olh check for olh index entries and objects that are pending removal\n"; + cout << " bucket check unlinked check for object versions that are not visible in a bucket listing \n"; cout << " bucket chown link bucket to specified user and update its object ACLs\n"; cout << " bucket reshard reshard bucket\n"; cout << " bucket rewrite rewrite all objects in the specified bucket\n"; @@ -439,6 +442,10 @@ void usage() cout << " --context context in which the script runs. one of: preRequest, postRequest\n"; cout << " --package name of the lua package that should be added/removed to/from the allowlist\n"; cout << " --allow-compilation package is allowed to compile C code as part of its installation\n"; + cout << "\nBucket check olh/unlinked options:\n"; + cout << " --min-age-hours minimum age of unlinked objects to consider for bucket check unlinked (default: 1)\n"; + cout << " --dump-keys when specified, all keys identified as problematic are printed to stdout\n"; + cout << " --hide-progress when specified, per-shard progress details are not printed to stderr\n"; cout << "\nradoslist options:\n"; cout << " --rgw-obj-fs the field separator that will separate the rados\n"; cout << " object name from the rgw object name;\n"; @@ -606,6 +613,8 @@ enum class OPT { BUCKET_UNLINK, BUCKET_STATS, BUCKET_CHECK, + BUCKET_CHECK_OLH, + BUCKET_CHECK_UNLINKED, BUCKET_SYNC_CHECKPOINT, BUCKET_SYNC_INFO, BUCKET_SYNC_STATUS, @@ -812,6 +821,8 @@ static SimpleCmd::Commands all_cmds = { { "bucket unlink", OPT::BUCKET_UNLINK }, { "bucket stats", OPT::BUCKET_STATS }, { "bucket check", OPT::BUCKET_CHECK }, + { "bucket check olh", OPT::BUCKET_CHECK_OLH }, + { "bucket check unlinked", OPT::BUCKET_CHECK_UNLINKED }, { "bucket sync checkpoint", OPT::BUCKET_SYNC_CHECKPOINT }, { "bucket sync info", OPT::BUCKET_SYNC_INFO }, { "bucket sync status", OPT::BUCKET_SYNC_STATUS }, @@ -3161,6 +3172,9 @@ int main(int argc, const char **argv) bool num_shards_specified = false; std::optional bucket_index_max_shards; int max_concurrent_ios = 32; + ceph::timespan min_age = std::chrono::hours(1); + bool hide_progress = false; + bool dump_keys = false; uint64_t orphan_stale_secs = (24 * 3600); int detail = false; @@ -3380,6 +3394,8 @@ int main(int argc, const char **argv) cerr << "ERROR: failed to parse max concurrent ios: " << err << std::endl; return EINVAL; } + } else if (ceph_argparse_witharg(args, i, &val, "--min-age-hours", (char*)NULL)) { + min_age = std::chrono::hours(atoi(val.c_str())); } else if (ceph_argparse_witharg(args, i, &val, "--orphan-stale-secs", (char*)NULL)) { orphan_stale_secs = (uint64_t)strict_strtoll(val.c_str(), 10, &err); if (!err.empty()) { @@ -3456,6 +3472,10 @@ int main(int argc, const char **argv) // do nothing } else if (ceph_argparse_binary_flag(args, i, &inconsistent_index, NULL, "--inconsistent-index", (char*)NULL)) { // do nothing + } else if (ceph_argparse_flag(args, i, "--hide-progress", (char*)NULL)) { + hide_progress = true; + } else if (ceph_argparse_flag(args, i, "--dump-keys", (char*)NULL)) { + dump_keys = true; } else if (ceph_argparse_witharg(args, i, &val, "--caps", (char*)NULL)) { caps = val; } else if (ceph_argparse_witharg(args, i, &val, "-i", "--infile", (char*)NULL)) { @@ -5598,6 +5618,9 @@ int main(int argc, const char **argv) bucket_op.set_delete_children(delete_child_objects); bucket_op.set_fix_index(fix); bucket_op.set_max_aio(max_concurrent_ios); + bucket_op.set_min_age(min_age); + bucket_op.set_dump_keys(dump_keys); + bucket_op.set_hide_progress(hide_progress); // required to gather errors from operations std::string err_msg; @@ -7246,6 +7269,14 @@ next: } } + if (opt_cmd == OPT::BUCKET_CHECK_OLH) { + RGWBucketAdminOp::check_index_olh(store, bucket_op, f, dpp()); + } + + if (opt_cmd == OPT::BUCKET_CHECK_UNLINKED) { + RGWBucketAdminOp::check_index_unlinked(store, bucket_op, f, dpp()); + } + if (opt_cmd == OPT::BUCKET_RM) { if (!inconsistent_index) { RGWBucketAdminOp::remove_bucket(store, bucket_op, null_yield, dpp(), bypass_gc, true); diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 3d680514af112..f73e8c4489d17 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -58,6 +58,15 @@ #define BUCKET_TAG_TIMEOUT 30 +// these values are copied from cls/rgw/cls_rgw.cc +static const string BI_OLH_ENTRY_NS_START = "\x80" "1001_"; +static const string BI_INSTANCE_ENTRY_NS_START = "\x80" "1000_"; + +// number of characters that we should allow to be buffered by the formatter +// before flushing (used by index check methods with dump_keys=true) +static constexpr int FORMATTER_LEN_FLUSH_THRESHOLD = 4 * 1024 * 1024; + + // default number of entries to list with each bucket listing call // (use marker to bridge between calls) static constexpr size_t listing_max_entries = 1000; @@ -996,6 +1005,380 @@ int RGWBucket::check_object_index(const DoutPrefixProvider *dpp, return 0; } +/** + * Loops over all olh entries in a bucket shard and finds ones with + * exists=false and pending_removal=true. If the pending log is empty on + * these entries, they were left behind after the last remaining version of + * an object was deleted or after an incomplete upload. This was known to + * happen historically due to concurrency conflicts among requests referencing + * the same object key. If op_state.fix_index is true, we continue where the + * request left off by calling RGWRados::clear_olh. If the pending log is not + * empty, we attempt to apply it. + */ +static int check_index_olh(rgw::sal::RGWRadosStore* const rados_store, + rgw::sal::RGWRadosBucket* const bucket, + const DoutPrefixProvider *dpp, + RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher, + const int shard, + uint64_t* const count_out, + optional_yield y) +{ + string marker = BI_OLH_ENTRY_NS_START; + bool is_truncated = true; + list entries; + + RGWObjectCtx obj_ctx(rados_store); + RGWRados* store = rados_store->getRados(); + RGWRados::BucketShard bs(store); + + int ret = bs.init(dpp, bucket->get_info(), bucket->get_info().layout.current_index, shard); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR bs.init(bucket=" << bucket << "): " << cpp_strerror(-ret) << dendl; + return ret; + } + + *count_out = 0; + do { + entries.clear(); + ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl; + break; + } + list::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + rgw_cls_bi_entry& entry = *iter; + marker = entry.idx; + if (entry.type != BIIndexType::OLH) { + is_truncated = false; + break; + } + rgw_bucket_olh_entry olh_entry; + auto iiter = entry.data.cbegin(); + try { + decode(olh_entry, iiter); + } catch (buffer::error& err) { + ldpp_dout(dpp, -1) << "ERROR failed to decode olh entry for key: " << entry.idx << dendl; + continue; + } + if (olh_entry.exists || !olh_entry.pending_removal) { + continue; + } + if (op_state.will_fix_index()) { + rgw_obj obj(bucket->get_key(), olh_entry.key.name); + if (olh_entry.pending_log.empty()) { + ret = store->clear_olh(dpp, obj_ctx, obj, bucket->get_info(), olh_entry.tag, olh_entry.epoch, y); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR failed to clear olh for: " << olh_entry.key.name << " clear_olh(): " << cpp_strerror(-ret) << dendl; + continue; + } + } else { + std::unique_ptr object = bucket->get_object({olh_entry.key.name}); + RGWObjState *state; + ret = object->get_obj_state(dpp, &obj_ctx, *bucket, &state, y, false); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR failed to get state for: " << olh_entry.key.name << " get_obj_state(): " << cpp_strerror(-ret) << dendl; + continue; + } + ret = store->update_olh(dpp, obj_ctx, state, bucket->get_info(), obj); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR failed to update olh for: " << olh_entry.key.name << " update_olh(): " << cpp_strerror(-ret) << dendl; + continue; + } + } + } + if (op_state.dump_keys) { + flusher.get_formatter()->dump_string("", olh_entry.key.name); + if (flusher.get_formatter()->get_len() > FORMATTER_LEN_FLUSH_THRESHOLD) { + flusher.flush(); + } + } + *count_out += 1; + } + } while (is_truncated); + flusher.flush(); + return 0; +} + +/** + * Spawns separate coroutines to check each bucket shard for leftover + * olh entries (and remove them if op_state.fix_index is true). + */ +int RGWBucket::check_index_olh(rgw::sal::RGWRadosStore* const rados_store, + const DoutPrefixProvider *dpp, + RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher) +{ + if ((bucket_info.versioning_status() & BUCKET_VERSIONED) == 0) { + ldpp_dout(dpp, 0) << "WARNING: this command is only applicable to versioned buckets" << dendl; + return 0; + } + + Formatter* formatter = flusher.get_formatter(); + if (op_state.dump_keys) { + formatter->open_array_section(""); + } + + const int max_shards = (bucket_info.layout.current_index.layout.normal.num_shards > 0 ? bucket_info.layout.current_index.layout.normal.num_shards : 1); + std::string verb = op_state.will_fix_index() ? "removed" : "found"; + uint64_t count_out = 0; + + boost::asio::io_context context; + int next_shard = 0; + + const int max_aio = std::max(1, op_state.get_max_aio()); + rgw::sal::RGWRadosBucket rados_bucket(rados_store, bucket_info); + + for (int i=0; i= max_shards) { + return; + } + optional_yield y(context, yield); + uint64_t shard_count; + int r = ::check_index_olh(rados_store, &rados_bucket, dpp, op_state, flusher, shard, &shard_count, y); + if (r < 0) { + ldpp_dout(dpp, -1) << "NOTICE: error processing shard " << shard << + " check_index_olh(): " << r << dendl; + } + count_out += shard_count; + if (!op_state.hide_progress) { + ldpp_dout(dpp, 1) << "NOTICE: finished shard " << shard << " (" << shard_count << + " entries " << verb << ")" << dendl; + } + } + }); + } + try { + context.run(); + } catch (const std::system_error& e) { + return -e.code().value(); + } + if (!op_state.hide_progress) { + ldpp_dout(dpp, 1) << "NOTICE: finished all shards (" << count_out << + " entries " << verb << ")" << dendl; + } + if (op_state.dump_keys) { + formatter->close_section(); + flusher.flush(); + } + return 0; +} + +/** + * Indicates whether a versioned bucket instance entry is listable in the + * index. It does this by looping over all plain entries with prefix equal to + * the key name, and checking whether any have an instance ID matching the one + * on the specified key. The existence of an instance entry without a matching + * plain entry indicates that the object was uploaded successfully, but the + * request exited prior to linking the object into the index (via the creation + * of a plain entry). + */ +static int is_versioned_instance_listable(const DoutPrefixProvider *dpp, + RGWRados::BucketShard& bs, + const cls_rgw_obj_key& key, + bool& listable, + optional_yield y) +{ + const std::string empty_delim; + cls_rgw_obj_key marker; + rgw_cls_list_ret result; + listable = false; + + do { + librados::ObjectReadOperation op; + cls_rgw_bucket_list_op(op, marker, key.name, empty_delim, 1000, + true, &result); + bufferlist ibl; + int r = bs.bucket_obj.operate(dpp, &op, &ibl, y); + if (r < 0) { + return r; + } + + for (auto const& entry : result.dir.m) { + if (entry.second.key == key) { + listable = true; + return 0; + } + marker = entry.second.key; + } + } while (result.is_truncated); + return 0; +} + +/** + * Loops over all instance entries in a bucket shard and finds ones with + * versioned_epoch=0 and an mtime that is earlier than op_state.min_age + * relative to the current time. These entries represent objects that were + * uploaded successfully but were not successfully linked into the object + * index. As an extra precaution, we also verify that these entries are indeed + * non listable (have no corresponding plain entry in the index). We can assume + * that clients received an error response for the associated upload requests + * since the bucket index linking transaction did not complete. Therefore, if + * op_state.fix_index is true, we remove the object that is associated with the + * instance entry. + */ +static int check_index_unlinked(rgw::sal::RGWRadosStore* const rados_store, + rgw::sal::RGWRadosBucket* const bucket, + const DoutPrefixProvider *dpp, + RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher, + const int shard, + uint64_t* const count_out, + optional_yield y) +{ + string marker = BI_INSTANCE_ENTRY_NS_START; + bool is_truncated = true; + list entries; + + RGWObjectCtx obj_ctx(rados_store); + RGWRados* store = rados_store->getRados(); + RGWRados::BucketShard bs(store); + + int ret = bs.init(dpp, bucket->get_info(), bucket->get_info().layout.current_index, shard); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR bs.init(bucket=" << bucket << "): " << cpp_strerror(-ret) << dendl; + return ret; + } + + ceph::real_clock::time_point now = ceph::real_clock::now(); + ceph::real_clock::time_point not_after = now - op_state.min_age; + + *count_out = 0; + do { + entries.clear(); + ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl; + break; + } + list::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + rgw_cls_bi_entry& entry = *iter; + marker = entry.idx; + if (entry.type != BIIndexType::Instance) { + is_truncated = false; + break; + } + rgw_bucket_dir_entry dir_entry; + auto iiter = entry.data.cbegin(); + try { + decode(dir_entry, iiter); + } catch (buffer::error& err) { + ldpp_dout(dpp, -1) << "ERROR failed to decode instance entry for key: " << + entry.idx << dendl; + continue; + } + if (dir_entry.versioned_epoch != 0 || dir_entry.meta.mtime > not_after) { + continue; + } + bool listable; + ret = is_versioned_instance_listable(dpp, bs, dir_entry.key, listable, y); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR is_versioned_instance_listable(key='" << + dir_entry.key << "'): " << cpp_strerror(-ret) << dendl; + continue; + } + if (listable) { + continue; + } + if (op_state.will_fix_index()) { + rgw_obj_key key(dir_entry.key.name, dir_entry.key.instance); + ret = rgw_remove_object(dpp, rados_store, bucket->get_info(), bucket->get_key(), key); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR rgw_remove_obj(key='" << + dir_entry.key << "'): " << cpp_strerror(-ret) << dendl; + continue; + } + } + if (op_state.dump_keys) { + Formatter* const formatter = flusher.get_formatter(); + formatter->open_object_section("object_instance"); + formatter->dump_string("name", dir_entry.key.name); + formatter->dump_string("instance", dir_entry.key.instance); + formatter->close_section(); + if (formatter->get_len() > FORMATTER_LEN_FLUSH_THRESHOLD) { + flusher.flush(); + } + } + *count_out += 1; + } + } while (is_truncated); + flusher.flush(); + return 0; +} + +/** + * Spawns separate coroutines to check each bucket shard for unlinked + * instance entries (and remove them if op_state.fix_index is true). + */ +int RGWBucket::check_index_unlinked(rgw::sal::RGWRadosStore* const rados_store, + const DoutPrefixProvider *dpp, + RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher) +{ + if ((bucket_info.versioning_status() & BUCKET_VERSIONED) == 0) { + ldpp_dout(dpp, 0) << "WARNING: this command is only applicable to versioned buckets" << dendl; + return 0; + } + + Formatter* formatter = flusher.get_formatter(); + if (op_state.dump_keys) { + formatter->open_array_section(""); + } + + const int max_shards = (bucket_info.layout.current_index.layout.normal.num_shards > 0 ? bucket_info.layout.current_index.layout.normal.num_shards : 1); + std::string verb = op_state.will_fix_index() ? "removed" : "found"; + uint64_t count_out = 0; + + int max_aio = std::max(1, op_state.get_max_aio()); + int next_shard = 0; + boost::asio::io_context context; + rgw::sal::RGWRadosBucket rados_bucket(rados_store, bucket_info); + + for (int i=0; i= max_shards) { + return; + } + uint64_t shard_count; + optional_yield y {context, yield}; + int r = ::check_index_unlinked(rados_store, &rados_bucket, dpp, op_state, flusher, shard, &shard_count, y); + if (r < 0) { + ldpp_dout(dpp, -1) << "ERROR: error processing shard " << shard << + " check_index_unlinked(): " << r << dendl; + } + count_out += shard_count; + if (!op_state.hide_progress) { + ldpp_dout(dpp, 1) << "NOTICE: finished shard " << shard << " (" << shard_count << + " entries " << verb << ")" << dendl; + } + } + }); + } + try { + context.run(); + } catch (const std::system_error& e) { + return -e.code().value(); + } + + if (!op_state.hide_progress) { + ldpp_dout(dpp, 1) << "NOTICE: finished all shards (" << count_out << + " entries " << verb << ")" << dendl; + } + if (op_state.dump_keys) { + formatter->close_section(); + flusher.flush(); + } + return 0; +} int RGWBucket::check_index(const DoutPrefixProvider *dpp, RGWBucketAdminOpState& op_state, @@ -1231,6 +1614,46 @@ int RGWBucketAdminOp::chown(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpStat } +int RGWBucketAdminOp::check_index_olh(rgw::sal::RGWRadosStore* store, RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp) +{ + RGWBucket bucket; + int ret = bucket.init(store, op_state, null_yield, dpp); + if (ret < 0) { + ldpp_dout(dpp, -1) << "bucket.init(): " << ret << dendl; + return ret; + } + flusher.start(0); + ret = bucket.check_index_olh(store, dpp, op_state, flusher); + if (ret < 0) { + ldpp_dout(dpp, -1) << "check_index_olh(): " << ret << dendl; + return ret; + } + flusher.flush(); + return 0; +} + +int RGWBucketAdminOp::check_index_unlinked(rgw::sal::RGWRadosStore* store, + RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher, + const DoutPrefixProvider *dpp) +{ + flusher.start(0); + RGWBucket bucket; + int ret = bucket.init(store, op_state, null_yield, dpp); + if (ret < 0) { + ldpp_dout(dpp, -1) << "bucket.init(): " << ret << dendl; + return ret; + } + ret = bucket.check_index_unlinked(store, dpp, op_state, flusher); + if (ret < 0) { + ldpp_dout(dpp, -1) << "check_index_unlinked(): " << ret << dendl; + return ret; + } + flusher.flush(); + return 0; +} + int RGWBucketAdminOp::check_index(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, RGWFormatterFlusher& flusher, optional_yield y, const DoutPrefixProvider *dpp) { diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index f9c6aa1945a4e..09088e0d83ebf 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -261,7 +261,10 @@ struct RGWBucketAdminOpState { bool delete_child_objects; bool bucket_stored; bool sync_bucket; + bool dump_keys; + bool hide_progress; int max_aio = 0; + ceph::timespan min_age = std::chrono::hours::zero(); rgw_bucket bucket; @@ -271,8 +274,11 @@ struct RGWBucketAdminOpState { void set_check_objects(bool value) { check_objects = value; } void set_fix_index(bool value) { fix_index = value; } void set_delete_children(bool value) { delete_child_objects = value; } + void set_hide_progress(bool value) { hide_progress = value; } + void set_dump_keys(bool value) { dump_keys = value; } void set_max_aio(int value) { max_aio = value; } + void set_min_age(ceph::timespan value) { min_age = value; } void set_user_id(const rgw_user& user_id) { if (!user_id.empty()) @@ -326,7 +332,8 @@ struct RGWBucketAdminOpState { RGWBucketAdminOpState() : list_buckets(false), stat_buckets(false), check_objects(false), fix_index(false), delete_child_objects(false), - bucket_stored(false), sync_bucket(true) {} + bucket_stored(false), sync_bucket(true), + dump_keys(false), hide_progress(false) {} }; /* @@ -361,6 +368,10 @@ public: RGWFormatterFlusher& flusher, optional_yield y, std::string *err_msg = NULL); + int check_index_olh(rgw::sal::RGWRadosStore* rados_store, const DoutPrefixProvider *dpp, RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher); + int check_index_unlinked(rgw::sal::RGWRadosStore* rados_store, const DoutPrefixProvider *dpp, RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher); int check_index(const DoutPrefixProvider *dpp, RGWBucketAdminOpState& op_state, @@ -401,6 +412,10 @@ public: static int check_index(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, RGWFormatterFlusher& flusher, optional_yield y, const DoutPrefixProvider *dpp); + static int check_index_olh(rgw::sal::RGWRadosStore* store, RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp); + static int check_index_unlinked(rgw::sal::RGWRadosStore* store, RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp); static int remove_bucket(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, optional_yield y, const DoutPrefixProvider *dpp, bool bypass_gc = false, bool keep_index_consistent = true); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 4b1cb1c45e2a0..4004135747e4a 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -7366,12 +7366,12 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp, ldpp_dout(dpp, 0) << "ERROR: could not clear olh, r=" << r << dendl; return r; } - } - - r = bucket_index_trim_olh_log(dpp, bucket_info, state, obj, last_ver); - if (r < 0 && r != -ECANCELED) { - ldpp_dout(dpp, 0) << "ERROR: could not trim olh log, r=" << r << dendl; - return r; + } else { + r = bucket_index_trim_olh_log(dpp, bucket_info, state, obj, last_ver); + if (r < 0 && r != -ECANCELED) { + ldpp_dout(dpp, 0) << "ERROR: could not trim olh log, r=" << r << dendl; + return r; + } } return 0; diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index c63c63cb55e2c..5c9fb6a288663 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -25,7 +25,9 @@ bucket unlink unlink bucket from specified user bucket stats returns bucket statistics bucket rm remove bucket - bucket check check bucket index + bucket check check bucket index by verifying size and object count stats + bucket check olh check for olh index entries and objects that are pending removal + bucket check unlinked check for object versions that are not visible in a bucket listing bucket chown link bucket to specified user and update its object ACLs bucket reshard reshard bucket bucket rewrite rewrite all objects in the specified bucket @@ -341,6 +343,11 @@ --package name of the lua package that should be added/removed to/from the allowlist --allow-compilation package is allowed to compile C code as part of its installation + Bucket check olh/unlinked options: + --min-age-hours minimum age of unlinked objects to consider for bucket check unlinked (default: 1) + --dump-keys when specified, all keys identified as problematic are printed to stdout + --hide-progress when specified, per-shard progress details are not printed to stderr + radoslist options: --rgw-obj-fs the field separator that will separate the rados object name from the rgw object name; @@ -355,5 +362,3 @@ --setgroup GROUP set gid to group or gid --version show version and quit - - -- 2.39.5