--- /dev/null
+tasks:
+- workunit:
+ clients:
+ client.0:
+ - rgw/run-bucket-check.sh
--- /dev/null
+#!/usr/bin/env bash
+set -ex
+
+# assumes a working ceph environment (radosgw-admin in PATH) and an rgw on
+# localhost:80 (localhost:443 for ssl)
+
+mydir=`dirname $0`
+
+python3 -m venv $mydir
+source $mydir/bin/activate
+pip install pip --upgrade
+pip install boto3
+
+## run test
+$mydir/bin/python3 $mydir/test_rgw_bucket_check.py
+
+deactivate
+echo OK.
+
--- /dev/null
+#!/usr/bin/env python3
+
+import logging as log
+import json
+import random
+import botocore
+from common import exec_cmd, create_user, boto_connect
+from time import sleep
+from botocore.config import Config
+
+"""
+Tests behavior of radosgw-admin bucket check commands.
+"""
+# The test cases in this file have been annotated for inventory.
+# To extract the inventory (in csv format) use the command:
+#
+# grep '^ *# TESTCASE' test_rgw_bucket_check.py | sed 's/^ *# TESTCASE //'
+#
+
+""" Constants """
+USER = 'check-tester'
+DISPLAY_NAME = 'Check Testing'
+ACCESS_KEY = 'OJODXSLNX4LUNHQG99PA'
+SECRET_KEY = '3l6ffld34qaymfomuh832j94738aie2x4p2o8h6n'
+BUCKET_NAME = 'check-bucket'
+
+def put_objects(bucket, key_list):
+ objs = []
+ for key in key_list:
+ o = bucket.put_object(Key=key, Body=b"some_data")
+ objs.append((o.key, o.version_id))
+ return objs
+
+def create_unlinked_objects(conn, bucket, key_list):
+ # creates an unlinked/unlistable object for each key in key_list
+
+ object_versions = []
+ try:
+ exec_cmd('ceph config set client rgw_debug_inject_set_olh_err 2')
+ exec_cmd('ceph config set client rgw_debug_inject_olh_cancel_modification_err true')
+ sleep(1)
+ for key in key_list:
+ tag = str(random.randint(0, 1_000_000))
+ try:
+ bucket.put_object(Key=key, Body=b"some_data", Metadata={
+ 'tag': tag,
+ })
+ except Exception as e:
+ log.debug(e)
+ out = exec_cmd(f'radosgw-admin bi list --bucket {bucket.name} --object {key}')
+ instance_entries = filter(
+ lambda x: x['type'] == 'instance',
+ json.loads(out.replace(b'\x80', b'0x80')))
+ found = False
+ for ie in instance_entries:
+ instance_id = ie['entry']['instance']
+ ov = conn.ObjectVersion(bucket.name, key, instance_id).head()
+ if ov['Metadata'] and ov['Metadata']['tag'] == tag:
+ object_versions.append((key, instance_id))
+ found = True
+ break
+ if not found:
+ raise Exception(f'failed to create unlinked object for key={key}')
+ finally:
+ exec_cmd('ceph config rm client rgw_debug_inject_set_olh_err')
+ exec_cmd('ceph config rm client rgw_debug_inject_olh_cancel_modification_err')
+ return object_versions
+
+def main():
+ """
+ execute bucket check commands
+ """
+ create_user(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY)
+
+ connection = boto_connect(ACCESS_KEY, SECRET_KEY, Config(retries={
+ 'total_max_attempts': 1,
+ }))
+
+ # pre-test cleanup
+ try:
+ bucket = connection.Bucket(BUCKET_NAME)
+ bucket.objects.all().delete()
+ bucket.object_versions.all().delete()
+ bucket.delete()
+ except botocore.exceptions.ClientError as e:
+ if not e.response['Error']['Code'] == 'NoSuchBucket':
+ raise
+
+ bucket = connection.create_bucket(Bucket=BUCKET_NAME)
+
+ null_version_keys = ['a', 'z']
+ null_version_objs = put_objects(bucket, null_version_keys)
+
+ connection.BucketVersioning(BUCKET_NAME).enable()
+
+ ok_keys = ['a', 'b', 'c', 'd']
+ unlinked_keys = ['c', 'd', 'e', 'f']
+ ok_objs = put_objects(bucket, ok_keys)
+
+ # TESTCASE 'bucket check unlinked does not report normal entries'
+ log.debug('TEST: bucket check unlinked does not report normal entries\n')
+ out = exec_cmd(f'radosgw-admin bucket check unlinked --bucket {BUCKET_NAME} --min-age-hours 0 --dump-keys')
+ json_out = json.loads(out)
+ assert len(json_out) == 0
+
+ unlinked_objs = create_unlinked_objects(connection, bucket, unlinked_keys)
+
+ # TESTCASE 'bucket check unlinked finds unlistable entries'
+ log.debug('TEST: bucket check unlinked finds unlistable entries\n')
+ out = exec_cmd(f'radosgw-admin bucket check unlinked --bucket {BUCKET_NAME} --min-age-hours 0 --dump-keys')
+ json_out = json.loads(out)
+ assert len(json_out) == len(unlinked_keys)
+
+ # TESTCASE 'unlinked entries are not listable'
+ log.debug('TEST: unlinked entries are not listable\n')
+ for ov in bucket.object_versions.all():
+ assert (ov.key, ov.version_id) not in unlinked_objs, f'object "{ov.key}:{ov.version_id}" was found in bucket listing'
+
+ # TESTCASE 'GET returns 404 for unlinked entry keys that have no other versions'
+ log.debug('TEST: GET returns 404 for unlinked entry keys that have no other versions\n')
+ noent_keys = set(unlinked_keys) - set(ok_keys)
+ for key in noent_keys:
+ try:
+ bucket.Object(key).get()
+ assert False, f'GET did not return 404 for key={key} with no prior successful PUT'
+ except botocore.exceptions.ClientError as e:
+ assert e.response['ResponseMetadata']['HTTPStatusCode'] == 404
+
+ # TESTCASE 'bucket check unlinked fixes unlistable entries'
+ log.debug('TEST: bucket check unlinked fixes unlistable entries\n')
+ out = exec_cmd(f'radosgw-admin bucket check unlinked --bucket {BUCKET_NAME} --fix --min-age-hours 0 --rgw-olh-pending-timeout-sec 0 --dump-keys')
+ json_out = json.loads(out)
+ assert len(json_out) == len(unlinked_keys)
+ for o in unlinked_objs:
+ try:
+ connection.ObjectVersion(bucket.name, o[0], o[1]).head()
+ assert False, f'head for unlistable object {o[0]}:{o[1]} succeeded after fix'
+ except botocore.exceptions.ClientError as e:
+ assert e.response['ResponseMetadata']['HTTPStatusCode'] == 404
+
+ # TESTCASE 'bucket check unlinked fix does not affect normal entries'
+ log.debug('TEST: bucket check unlinked fix does not affect normal entries\n')
+ all_listable = list(bucket.object_versions.all())
+ assert len(all_listable) == len(ok_keys) + len(null_version_keys), 'some normal objects were not accounted for in object listing after unlinked fix'
+ for o in ok_objs:
+ assert o in map(lambda x: (x.key, x.version_id), all_listable), "normal object not listable after fix"
+ connection.ObjectVersion(bucket.name, o[0], o[1]).head()
+
+ # TESTCASE 'bucket check unlinked does not find new unlistable entries after fix'
+ log.debug('TEST: bucket check unlinked does not find new unlistable entries after fix\n')
+ out = exec_cmd(f'radosgw-admin bucket check unlinked --bucket {BUCKET_NAME} --min-age-hours 0 --dump-keys')
+ json_out = json.loads(out)
+ assert len(json_out) == 0
+
+ # for this set of keys we can produce leftover OLH object/entries by
+ # deleting the normal object instance since we should already have a leftover
+ # pending xattr on the OLH object due to the errors associated with the
+ # prior unlinked entries that were created for the same keys
+ leftover_pending_xattr_keys = set(ok_keys).intersection(unlinked_keys)
+ objs_to_delete = filter(lambda x: x[0] in leftover_pending_xattr_keys, ok_objs)
+
+ for o in objs_to_delete:
+ connection.ObjectVersion(bucket.name, o[0], o[1]).delete()
+
+ for key in leftover_pending_xattr_keys:
+ out = exec_cmd(f'radosgw-admin bi list --bucket {BUCKET_NAME} --object {key}')
+ idx_entries = json.loads(out.replace(b'\x80', b'0x80'))
+ assert len(idx_entries) > 0, f'failed to create leftover OLH entries for key={key}'
+
+ # TESTCASE 'bucket check olh finds leftover OLH entries'
+ log.debug('TEST: bucket check olh finds leftover OLH entries\n')
+ out = exec_cmd(f'radosgw-admin bucket check olh --bucket {BUCKET_NAME} --dump-keys')
+ json_out = json.loads(out)
+ assert len(json_out) == len(leftover_pending_xattr_keys)
+
+ # TESTCASE 'bucket check olh fixes leftover OLH entries'
+ log.debug('TEST: bucket check olh fixes leftover OLH entries\n')
+ out = exec_cmd(f'radosgw-admin bucket check olh --bucket {BUCKET_NAME} --fix --rgw-olh-pending-timeout-sec 0 --dump-keys')
+ json_out = json.loads(out)
+ assert len(json_out) == len(leftover_pending_xattr_keys)
+
+ for key in leftover_pending_xattr_keys:
+ out = exec_cmd(f'radosgw-admin bi list --bucket {BUCKET_NAME} --object {key}')
+ idx_entries = json.loads(out.replace(b'\x80', b'0x80'))
+ assert len(idx_entries) == 0, f'index entries still exist for key={key} after olh fix'
+
+ # TESTCASE 'bucket check olh does not find new leftover OLH entries after fix'
+ log.debug('TEST: bucket check olh does not find new leftover OLH entries after fix\n')
+ out = exec_cmd(f'radosgw-admin bucket check olh --bucket {BUCKET_NAME} --dump-keys')
+ json_out = json.loads(out)
+ assert len(json_out) == 0
+
+ # TESTCASE 'bucket check fixes do not affect null version objects'
+ log.debug('TEST: verify that bucket check fixes do not affect null version objects\n')
+ for o in null_version_objs:
+ connection.ObjectVersion(bucket.name, o[0], 'null').head()
+
+ all_versions = list(map(lambda x: (x.key, x.version_id), bucket.object_versions.all()))
+ for key in null_version_keys:
+ assert (key, 'null') in all_versions
+
+ # Clean up
+ log.debug("Deleting bucket {}".format(BUCKET_NAME))
+ bucket.object_versions.all().delete()
+ bucket.delete()
+
+main()
+log.info("Completed bucket check tests")
execute manual and dynamic resharding commands
"""
create_user(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY)
-
+
connection = boto_connect(ACCESS_KEY, SECRET_KEY)
# create a bucket
#include <sstream>
#include <string>
+#include <boost/asio.hpp>
#include <boost/optional.hpp>
extern "C" {
cout << " bucket unlink unlink bucket from specified user\n";
cout << " bucket stats returns bucket statistics\n";
cout << " bucket rm remove bucket\n";
- cout << " bucket check check bucket index\n";
+ cout << " bucket check check bucket index by verifying size and object count stats\n";
+ cout << " bucket check olh check for olh index entries and objects that are pending removal\n";
+ cout << " bucket check unlinked check for object versions that are not visible in a bucket listing \n";
cout << " bucket chown link bucket to specified user and update its object ACLs\n";
cout << " bucket reshard reshard bucket\n";
cout << " bucket rewrite rewrite all objects in the specified bucket\n";
cout << " --context context in which the script runs. one of: preRequest, postRequest\n";
cout << " --package name of the lua package that should be added/removed to/from the allowlist\n";
cout << " --allow-compilation package is allowed to compile C code as part of its installation\n";
+ cout << "\nBucket check olh/unlinked options:\n";
+ cout << " --min-age-hours minimum age of unlinked objects to consider for bucket check unlinked (default: 1)\n";
+ cout << " --dump-keys when specified, all keys identified as problematic are printed to stdout\n";
+ cout << " --hide-progress when specified, per-shard progress details are not printed to stderr\n";
cout << "\nradoslist options:\n";
cout << " --rgw-obj-fs the field separator that will separate the rados\n";
cout << " object name from the rgw object name;\n";
BUCKET_UNLINK,
BUCKET_STATS,
BUCKET_CHECK,
+ BUCKET_CHECK_OLH,
+ BUCKET_CHECK_UNLINKED,
BUCKET_SYNC_CHECKPOINT,
BUCKET_SYNC_INFO,
BUCKET_SYNC_STATUS,
{ "bucket unlink", OPT::BUCKET_UNLINK },
{ "bucket stats", OPT::BUCKET_STATS },
{ "bucket check", OPT::BUCKET_CHECK },
+ { "bucket check olh", OPT::BUCKET_CHECK_OLH },
+ { "bucket check unlinked", OPT::BUCKET_CHECK_UNLINKED },
{ "bucket sync checkpoint", OPT::BUCKET_SYNC_CHECKPOINT },
{ "bucket sync info", OPT::BUCKET_SYNC_INFO },
{ "bucket sync status", OPT::BUCKET_SYNC_STATUS },
bool num_shards_specified = false;
std::optional<int> bucket_index_max_shards;
int max_concurrent_ios = 32;
+ ceph::timespan min_age = std::chrono::hours(1);
+ bool hide_progress = false;
+ bool dump_keys = false;
uint64_t orphan_stale_secs = (24 * 3600);
int detail = false;
cerr << "ERROR: failed to parse max concurrent ios: " << err << std::endl;
return EINVAL;
}
+ } else if (ceph_argparse_witharg(args, i, &val, "--min-age-hours", (char*)NULL)) {
+ min_age = std::chrono::hours(atoi(val.c_str()));
} else if (ceph_argparse_witharg(args, i, &val, "--orphan-stale-secs", (char*)NULL)) {
orphan_stale_secs = (uint64_t)strict_strtoll(val.c_str(), 10, &err);
if (!err.empty()) {
// do nothing
} else if (ceph_argparse_binary_flag(args, i, &inconsistent_index, NULL, "--inconsistent-index", (char*)NULL)) {
// do nothing
+ } else if (ceph_argparse_flag(args, i, "--hide-progress", (char*)NULL)) {
+ hide_progress = true;
+ } else if (ceph_argparse_flag(args, i, "--dump-keys", (char*)NULL)) {
+ dump_keys = true;
} else if (ceph_argparse_witharg(args, i, &val, "--caps", (char*)NULL)) {
caps = val;
} else if (ceph_argparse_witharg(args, i, &val, "-i", "--infile", (char*)NULL)) {
bucket_op.set_delete_children(delete_child_objects);
bucket_op.set_fix_index(fix);
bucket_op.set_max_aio(max_concurrent_ios);
+ bucket_op.set_min_age(min_age);
+ bucket_op.set_dump_keys(dump_keys);
+ bucket_op.set_hide_progress(hide_progress);
// required to gather errors from operations
std::string err_msg;
}
}
+ if (opt_cmd == OPT::BUCKET_CHECK_OLH) {
+ RGWBucketAdminOp::check_index_olh(store, bucket_op, f, dpp());
+ }
+
+ if (opt_cmd == OPT::BUCKET_CHECK_UNLINKED) {
+ RGWBucketAdminOp::check_index_unlinked(store, bucket_op, f, dpp());
+ }
+
if (opt_cmd == OPT::BUCKET_RM) {
if (!inconsistent_index) {
RGWBucketAdminOp::remove_bucket(store, bucket_op, null_yield, dpp(), bypass_gc, true);
#define BUCKET_TAG_TIMEOUT 30
+// these values are copied from cls/rgw/cls_rgw.cc
+static const string BI_OLH_ENTRY_NS_START = "\x80" "1001_";
+static const string BI_INSTANCE_ENTRY_NS_START = "\x80" "1000_";
+
+// number of characters that we should allow to be buffered by the formatter
+// before flushing (used by index check methods with dump_keys=true)
+static constexpr int FORMATTER_LEN_FLUSH_THRESHOLD = 4 * 1024 * 1024;
+
+
// default number of entries to list with each bucket listing call
// (use marker to bridge between calls)
static constexpr size_t listing_max_entries = 1000;
return 0;
}
+/**
+ * Loops over all olh entries in a bucket shard and finds ones with
+ * exists=false and pending_removal=true. If the pending log is empty on
+ * these entries, they were left behind after the last remaining version of
+ * an object was deleted or after an incomplete upload. This was known to
+ * happen historically due to concurrency conflicts among requests referencing
+ * the same object key. If op_state.fix_index is true, we continue where the
+ * request left off by calling RGWRados::clear_olh. If the pending log is not
+ * empty, we attempt to apply it.
+ */
+static int check_index_olh(rgw::sal::RGWRadosStore* const rados_store,
+ rgw::sal::RGWRadosBucket* const bucket,
+ const DoutPrefixProvider *dpp,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher,
+ const int shard,
+ uint64_t* const count_out,
+ optional_yield y)
+{
+ string marker = BI_OLH_ENTRY_NS_START;
+ bool is_truncated = true;
+ list<rgw_cls_bi_entry> entries;
+
+ RGWObjectCtx obj_ctx(rados_store);
+ RGWRados* store = rados_store->getRados();
+ RGWRados::BucketShard bs(store);
+
+ int ret = bs.init(dpp, bucket->get_info(), bucket->get_info().layout.current_index, shard);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR bs.init(bucket=" << bucket << "): " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ *count_out = 0;
+ do {
+ entries.clear();
+ ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl;
+ break;
+ }
+ list<rgw_cls_bi_entry>::iterator iter;
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {
+ rgw_cls_bi_entry& entry = *iter;
+ marker = entry.idx;
+ if (entry.type != BIIndexType::OLH) {
+ is_truncated = false;
+ break;
+ }
+ rgw_bucket_olh_entry olh_entry;
+ auto iiter = entry.data.cbegin();
+ try {
+ decode(olh_entry, iiter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, -1) << "ERROR failed to decode olh entry for key: " << entry.idx << dendl;
+ continue;
+ }
+ if (olh_entry.exists || !olh_entry.pending_removal) {
+ continue;
+ }
+ if (op_state.will_fix_index()) {
+ rgw_obj obj(bucket->get_key(), olh_entry.key.name);
+ if (olh_entry.pending_log.empty()) {
+ ret = store->clear_olh(dpp, obj_ctx, obj, bucket->get_info(), olh_entry.tag, olh_entry.epoch, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR failed to clear olh for: " << olh_entry.key.name << " clear_olh(): " << cpp_strerror(-ret) << dendl;
+ continue;
+ }
+ } else {
+ std::unique_ptr<rgw::sal::RGWObject> object = bucket->get_object({olh_entry.key.name});
+ RGWObjState *state;
+ ret = object->get_obj_state(dpp, &obj_ctx, *bucket, &state, y, false);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR failed to get state for: " << olh_entry.key.name << " get_obj_state(): " << cpp_strerror(-ret) << dendl;
+ continue;
+ }
+ ret = store->update_olh(dpp, obj_ctx, state, bucket->get_info(), obj);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR failed to update olh for: " << olh_entry.key.name << " update_olh(): " << cpp_strerror(-ret) << dendl;
+ continue;
+ }
+ }
+ }
+ if (op_state.dump_keys) {
+ flusher.get_formatter()->dump_string("", olh_entry.key.name);
+ if (flusher.get_formatter()->get_len() > FORMATTER_LEN_FLUSH_THRESHOLD) {
+ flusher.flush();
+ }
+ }
+ *count_out += 1;
+ }
+ } while (is_truncated);
+ flusher.flush();
+ return 0;
+}
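
`check_index_unlinked` further down walks the instance namespace with the same marker-driven scan. A minimal Python sketch of the pattern, with a toy stand-in for `RGWRados::bi_list` (all names and index keys hypothetical):

```python
def scan_namespace(bi_list, ns_start, want_type):
    # Start the listing at the namespace prefix, advance the marker past
    # every entry seen, and stop at the first entry of another type:
    # index entries of one type are contiguous under their prefix.
    marker = ns_start
    truncated = True
    while truncated:
        entries, truncated = bi_list(marker)
        for entry in entries:
            marker = entry['idx']
            if entry['type'] != want_type:
                truncated = False
                break
            yield entry

# Toy index standing in for one bucket shard.
INDEX = [{'idx': '1001_a', 'type': 'olh'},
         {'idx': '1001_b', 'type': 'olh'},
         {'idx': '1002_x', 'type': 'other'}]

def toy_bi_list(marker, page_size=2):
    rest = [e for e in INDEX if e['idx'] > marker]
    return rest[:page_size], len(rest) > page_size

for e in scan_namespace(toy_bi_list, '1001_', 'olh'):
    print(e['idx'])   # prints 1001_a, 1001_b
```
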
+
+/**
+ * Spawns separate coroutines to check each bucket shard for leftover
+ * olh entries (and remove them if op_state.fix_index is true).
+ */
+int RGWBucket::check_index_olh(rgw::sal::RGWRadosStore* const rados_store,
+ const DoutPrefixProvider *dpp,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher)
+{
+ if ((bucket_info.versioning_status() & BUCKET_VERSIONED) == 0) {
+ ldpp_dout(dpp, 0) << "WARNING: this command is only applicable to versioned buckets" << dendl;
+ return 0;
+ }
+
+ Formatter* formatter = flusher.get_formatter();
+ if (op_state.dump_keys) {
+ formatter->open_array_section("");
+ }
+
+ const int max_shards = (bucket_info.layout.current_index.layout.normal.num_shards > 0 ? bucket_info.layout.current_index.layout.normal.num_shards : 1);
+ std::string verb = op_state.will_fix_index() ? "removed" : "found";
+ uint64_t count_out = 0;
+
+ boost::asio::io_context context;
+ int next_shard = 0;
+
+ const int max_aio = std::max(1, op_state.get_max_aio());
+ rgw::sal::RGWRadosBucket rados_bucket(rados_store, bucket_info);
+
+ for (int i=0; i<max_aio; i++) {
+ spawn::spawn(context, [&](yield_context yield) {
+ while (true) {
+ int shard = next_shard;
+ next_shard += 1;
+ if (shard >= max_shards) {
+ return;
+ }
+ optional_yield y(context, yield);
+ uint64_t shard_count;
+ int r = ::check_index_olh(rados_store, &rados_bucket, dpp, op_state, flusher, shard, &shard_count, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << "NOTICE: error processing shard " << shard <<
+ " check_index_olh(): " << r << dendl;
+ }
+ count_out += shard_count;
+ if (!op_state.hide_progress) {
+ ldpp_dout(dpp, 1) << "NOTICE: finished shard " << shard << " (" << shard_count <<
+ " entries " << verb << ")" << dendl;
+ }
+ }
+ });
+ }
+ try {
+ context.run();
+ } catch (const std::system_error& e) {
+ return -e.code().value();
+ }
+ if (!op_state.hide_progress) {
+ ldpp_dout(dpp, 1) << "NOTICE: finished all shards (" << count_out <<
+ " entries " << verb << ")" << dendl;
+ }
+ if (op_state.dump_keys) {
+ formatter->close_section();
+ flusher.flush();
+ }
+ return 0;
+}
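
A minimal asyncio sketch of this shard fan-out (names hypothetical): `max_aio` workers pull shard numbers from a shared counter, mirroring the coroutines spawned above; because the workers interleave cooperatively, like stackful coroutines on a single-threaded `io_context`, the bare counter and running total need no locking.

```python
import asyncio

async def check_all_shards(check_shard, max_shards, max_aio):
    next_shard = 0
    total = 0
    async def worker():
        nonlocal next_shard, total
        while True:
            shard = next_shard   # no await between read and increment,
            next_shard += 1      # so claiming a shard is effectively atomic
            if shard >= max_shards:
                return
            total += await check_shard(shard)
    await asyncio.gather(*(worker() for _ in range(max_aio)))
    return total

async def fake_check_shard(shard):
    await asyncio.sleep(0)       # stand-in for the per-shard index scan
    return 1                     # pretend one entry was found

# 11 shards checked by 4 workers -> 11 entries total
print(asyncio.run(check_all_shards(fake_check_shard, max_shards=11, max_aio=4)))
```
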
+
+/**
+ * Indicates whether a versioned bucket instance entry is listable in the
+ * index. It does this by looping over all plain entries with prefix equal to
+ * the key name, and checking whether any have an instance ID matching the one
+ * on the specified key. The existence of an instance entry without a matching
+ * plain entry indicates that the object was uploaded successfully, but the
+ * request exited prior to linking the object into the index (via the creation
+ * of a plain entry).
+ */
+static int is_versioned_instance_listable(const DoutPrefixProvider *dpp,
+ RGWRados::BucketShard& bs,
+ const cls_rgw_obj_key& key,
+ bool& listable,
+ optional_yield y)
+{
+ const std::string empty_delim;
+ cls_rgw_obj_key marker;
+ rgw_cls_list_ret result;
+ listable = false;
+
+ do {
+ librados::ObjectReadOperation op;
+ cls_rgw_bucket_list_op(op, marker, key.name, empty_delim, 1000,
+ true, &result);
+ bufferlist ibl;
+ int r = bs.bucket_obj.operate(dpp, &op, &ibl, y);
+ if (r < 0) {
+ return r;
+ }
+
+ for (auto const& entry : result.dir.m) {
+ if (entry.second.key == key) {
+ listable = true;
+ return 0;
+ }
+ marker = entry.second.key;
+ }
+ } while (result.is_truncated);
+ return 0;
+}
+
+/**
+ * Loops over all instance entries in a bucket shard and finds ones with
+ * versioned_epoch=0 and an mtime that is earlier than op_state.min_age
+ * relative to the current time. These entries represent objects that were
+ * uploaded successfully but were not successfully linked into the object
+ * index. As an extra precaution, we also verify that these entries are indeed
+ * non listable (have no corresponding plain entry in the index). We can assume
+ * that clients received an error response for the associated upload requests
+ * since the bucket index linking transaction did not complete. Therefore, if
+ * op_state.fix_index is true, we remove the object that is associated with the
+ * instance entry.
+ */
+static int check_index_unlinked(rgw::sal::RGWRadosStore* const rados_store,
+ rgw::sal::RGWRadosBucket* const bucket,
+ const DoutPrefixProvider *dpp,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher,
+ const int shard,
+ uint64_t* const count_out,
+ optional_yield y)
+{
+ string marker = BI_INSTANCE_ENTRY_NS_START;
+ bool is_truncated = true;
+ list<rgw_cls_bi_entry> entries;
+
+ RGWObjectCtx obj_ctx(rados_store);
+ RGWRados* store = rados_store->getRados();
+ RGWRados::BucketShard bs(store);
+
+ int ret = bs.init(dpp, bucket->get_info(), bucket->get_info().layout.current_index, shard);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR bs.init(bucket=" << bucket << "): " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ ceph::real_clock::time_point now = ceph::real_clock::now();
+ ceph::real_clock::time_point not_after = now - op_state.min_age;
+
+ *count_out = 0;
+ do {
+ entries.clear();
+ ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl;
+ break;
+ }
+ list<rgw_cls_bi_entry>::iterator iter;
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {
+ rgw_cls_bi_entry& entry = *iter;
+ marker = entry.idx;
+ if (entry.type != BIIndexType::Instance) {
+ is_truncated = false;
+ break;
+ }
+ rgw_bucket_dir_entry dir_entry;
+ auto iiter = entry.data.cbegin();
+ try {
+ decode(dir_entry, iiter);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, -1) << "ERROR failed to decode instance entry for key: " <<
+ entry.idx << dendl;
+ continue;
+ }
+ if (dir_entry.versioned_epoch != 0 || dir_entry.meta.mtime > not_after) {
+ continue;
+ }
+ bool listable;
+ ret = is_versioned_instance_listable(dpp, bs, dir_entry.key, listable, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR is_versioned_instance_listable(key='" <<
+ dir_entry.key << "'): " << cpp_strerror(-ret) << dendl;
+ continue;
+ }
+ if (listable) {
+ continue;
+ }
+ if (op_state.will_fix_index()) {
+ rgw_obj_key key(dir_entry.key.name, dir_entry.key.instance);
+ ret = rgw_remove_object(dpp, rados_store, bucket->get_info(), bucket->get_key(), key);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR rgw_remove_obj(key='" <<
+ dir_entry.key << "'): " << cpp_strerror(-ret) << dendl;
+ continue;
+ }
+ }
+ if (op_state.dump_keys) {
+ Formatter* const formatter = flusher.get_formatter();
+ formatter->open_object_section("object_instance");
+ formatter->dump_string("name", dir_entry.key.name);
+ formatter->dump_string("instance", dir_entry.key.instance);
+ formatter->close_section();
+ if (formatter->get_len() > FORMATTER_LEN_FLUSH_THRESHOLD) {
+ flusher.flush();
+ }
+ }
+ *count_out += 1;
+ }
+ } while (is_truncated);
+ flusher.flush();
+ return 0;
+}
+
+/**
+ * Spawns separate coroutines to check each bucket shard for unlinked
+ * instance entries (and remove them if op_state.fix_index is true).
+ */
+int RGWBucket::check_index_unlinked(rgw::sal::RGWRadosStore* const rados_store,
+ const DoutPrefixProvider *dpp,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher)
+{
+ if ((bucket_info.versioning_status() & BUCKET_VERSIONED) == 0) {
+ ldpp_dout(dpp, 0) << "WARNING: this command is only applicable to versioned buckets" << dendl;
+ return 0;
+ }
+
+ Formatter* formatter = flusher.get_formatter();
+ if (op_state.dump_keys) {
+ formatter->open_array_section("");
+ }
+
+ const int max_shards = (bucket_info.layout.current_index.layout.normal.num_shards > 0 ? bucket_info.layout.current_index.layout.normal.num_shards : 1);
+ std::string verb = op_state.will_fix_index() ? "removed" : "found";
+ uint64_t count_out = 0;
+
+ int max_aio = std::max(1, op_state.get_max_aio());
+ int next_shard = 0;
+ boost::asio::io_context context;
+ rgw::sal::RGWRadosBucket rados_bucket(rados_store, bucket_info);
+
+ for (int i=0; i<max_aio; i++) {
+ spawn::spawn(context, [&](yield_context yield) {
+ while (true) {
+ int shard = next_shard;
+ next_shard += 1;
+ if (shard >= max_shards) {
+ return;
+ }
+ uint64_t shard_count;
+ optional_yield y {context, yield};
+ int r = ::check_index_unlinked(rados_store, &rados_bucket, dpp, op_state, flusher, shard, &shard_count, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: error processing shard " << shard <<
+ " check_index_unlinked(): " << r << dendl;
+ }
+ count_out += shard_count;
+ if (!op_state.hide_progress) {
+ ldpp_dout(dpp, 1) << "NOTICE: finished shard " << shard << " (" << shard_count <<
+ " entries " << verb << ")" << dendl;
+ }
+ }
+ });
+ }
+ try {
+ context.run();
+ } catch (const std::system_error& e) {
+ return -e.code().value();
+ }
+
+ if (!op_state.hide_progress) {
+ ldpp_dout(dpp, 1) << "NOTICE: finished all shards (" << count_out <<
+ " entries " << verb << ")" << dendl;
+ }
+ if (op_state.dump_keys) {
+ formatter->close_section();
+ flusher.flush();
+ }
+ return 0;
+}
int RGWBucket::check_index(const DoutPrefixProvider *dpp,
RGWBucketAdminOpState& op_state,
}
+int RGWBucketAdminOp::check_index_olh(rgw::sal::RGWRadosStore* store, RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp)
+{
+ RGWBucket bucket;
+ int ret = bucket.init(store, op_state, null_yield, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "bucket.init(): " << ret << dendl;
+ return ret;
+ }
+ flusher.start(0);
+ ret = bucket.check_index_olh(store, dpp, op_state, flusher);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "check_index_olh(): " << ret << dendl;
+ return ret;
+ }
+ flusher.flush();
+ return 0;
+}
+
+int RGWBucketAdminOp::check_index_unlinked(rgw::sal::RGWRadosStore* store,
+ RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher,
+ const DoutPrefixProvider *dpp)
+{
+ flusher.start(0);
+ RGWBucket bucket;
+ int ret = bucket.init(store, op_state, null_yield, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "bucket.init(): " << ret << dendl;
+ return ret;
+ }
+ ret = bucket.check_index_unlinked(store, dpp, op_state, flusher);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "check_index_unlinked(): " << ret << dendl;
+ return ret;
+ }
+ flusher.flush();
+ return 0;
+}
+
int RGWBucketAdminOp::check_index(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
RGWFormatterFlusher& flusher, optional_yield y, const DoutPrefixProvider *dpp)
{
bool delete_child_objects;
bool bucket_stored;
bool sync_bucket;
+ bool dump_keys;
+ bool hide_progress;
int max_aio = 0;
+ ceph::timespan min_age = std::chrono::hours::zero();
rgw_bucket bucket;
void set_check_objects(bool value) { check_objects = value; }
void set_fix_index(bool value) { fix_index = value; }
void set_delete_children(bool value) { delete_child_objects = value; }
+ void set_hide_progress(bool value) { hide_progress = value; }
+ void set_dump_keys(bool value) { dump_keys = value; }
void set_max_aio(int value) { max_aio = value; }
+ void set_min_age(ceph::timespan value) { min_age = value; }
void set_user_id(const rgw_user& user_id) {
if (!user_id.empty())
RGWBucketAdminOpState() : list_buckets(false), stat_buckets(false), check_objects(false),
fix_index(false), delete_child_objects(false),
- bucket_stored(false), sync_bucket(true) {}
+ bucket_stored(false), sync_bucket(true),
+ dump_keys(false), hide_progress(false) {}
};
/*
RGWFormatterFlusher& flusher,
optional_yield y,
std::string *err_msg = NULL);
+ int check_index_olh(rgw::sal::RGWRadosStore* rados_store, const DoutPrefixProvider *dpp, RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher);
+ int check_index_unlinked(rgw::sal::RGWRadosStore* rados_store, const DoutPrefixProvider *dpp, RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher);
int check_index(const DoutPrefixProvider *dpp,
RGWBucketAdminOpState& op_state,
static int check_index(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
RGWFormatterFlusher& flusher, optional_yield y, const DoutPrefixProvider *dpp);
+ static int check_index_olh(rgw::sal::RGWRadosStore* store, RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp);
+ static int check_index_unlinked(rgw::sal::RGWRadosStore* store, RGWBucketAdminOpState& op_state,
+ RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp);
static int remove_bucket(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, optional_yield y,
const DoutPrefixProvider *dpp, bool bypass_gc = false, bool keep_index_consistent = true);
ldpp_dout(dpp, 0) << "ERROR: could not clear olh, r=" << r << dendl;
return r;
}
- }
-
- r = bucket_index_trim_olh_log(dpp, bucket_info, state, obj, last_ver);
- if (r < 0 && r != -ECANCELED) {
- ldpp_dout(dpp, 0) << "ERROR: could not trim olh log, r=" << r << dendl;
- return r;
+ } else {
+ r = bucket_index_trim_olh_log(dpp, bucket_info, state, obj, last_ver);
+ if (r < 0 && r != -ECANCELED) {
+ ldpp_dout(dpp, 0) << "ERROR: could not trim olh log, r=" << r << dendl;
+ return r;
+ }
}
return 0;
bucket unlink unlink bucket from specified user
bucket stats returns bucket statistics
bucket rm remove bucket
- bucket check check bucket index
+ bucket check check bucket index by verifying size and object count stats
+ bucket check olh check for olh index entries and objects that are pending removal
+ bucket check unlinked check for object versions that are not visible in a bucket listing
bucket chown link bucket to specified user and update its object ACLs
bucket reshard reshard bucket
bucket rewrite rewrite all objects in the specified bucket
--package name of the lua package that should be added/removed to/from the allowlist
--allow-compilation package is allowed to compile C code as part of its installation
+ Bucket check olh/unlinked options:
+ --min-age-hours minimum age of unlinked objects to consider for bucket check unlinked (default: 1)
+ --dump-keys when specified, all keys identified as problematic are printed to stdout
+ --hide-progress when specified, per-shard progress details are not printed to stderr
+
radoslist options:
--rgw-obj-fs the field separator that will separate the rados
object name from the rgw object name;
--setgroup GROUP set gid to group or gid
--version show version and quit
-
-
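
Putting the new commands together, a hedged sketch of an operator workflow (bucket name hypothetical; `radosgw-admin` assumed in PATH, per the test wrapper above):

```python
#!/usr/bin/env python3
# Hypothetical operator workflow for the new subcommands; the invocations
# mirror those exercised by test_rgw_bucket_check.py above.
import json
import subprocess

def admin(args):
    return subprocess.check_output(['radosgw-admin'] + args.split())

bucket = 'check-bucket'  # hypothetical bucket name

# Dry runs: report problematic entries without modifying anything.
unlinked = json.loads(admin(f'bucket check unlinked --bucket {bucket} --dump-keys'))
olh = json.loads(admin(f'bucket check olh --bucket {bucket} --dump-keys'))
print(f'{len(unlinked)} unlinked entries, {len(olh)} leftover olh entries')

# Fix pass, once the dry-run output has been reviewed.
if unlinked:
    admin(f'bucket check unlinked --bucket {bucket} --fix')
if olh:
    admin(f'bucket check olh --bucket {bucket} --fix')
```
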