]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add radosgw-admin bucket check olh/unlinked commands
authorCory Snyder <csnyder@1111systems.com>
Thu, 25 May 2023 16:59:04 +0000 (16:59 +0000)
committerCory Snyder <csnyder@1111systems.com>
Wed, 4 Oct 2023 08:51:38 +0000 (08:51 +0000)
Adds commands to radosgw-admin for checking for and fixing leftover
entries in the bucket index (and associated RADOS objects).

Fixes: https://tracker.ceph.com/issues/62075
Signed-off-by: Cory Snyder <csnyder@1111systems.com>
(cherry picked from commit 9b2042a0750109f575b02c9c4b127dd941994662)

Conflicts:
src/rgw/driver/rados/rgw_bucket.cc
src/rgw/driver/rados/rgw_bucket.h
src/rgw/driver/rados/rgw_rados.cc
src/rgw/rgw_admin.cc

Cherry-pick notes:
- conflicts due to rados files being moved into driver directory
- conflicts due to rename of rados classes

qa/suites/rgw/verify/tasks/bucket-check.yaml [new file with mode: 0644]
qa/workunits/rgw/run-bucket-check.sh [new file with mode: 0755]
qa/workunits/rgw/test_rgw_bucket_check.py [new file with mode: 0755]
qa/workunits/rgw/test_rgw_reshard.py
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_rados.cc
src/test/cli/radosgw-admin/help.t

diff --git a/qa/suites/rgw/verify/tasks/bucket-check.yaml b/qa/suites/rgw/verify/tasks/bucket-check.yaml
new file mode 100644 (file)
index 0000000..4955d41
--- /dev/null
@@ -0,0 +1,5 @@
+tasks:
+- workunit:
+    clients:
+      client.0:
+        - rgw/run-bucket-check.sh
diff --git a/qa/workunits/rgw/run-bucket-check.sh b/qa/workunits/rgw/run-bucket-check.sh
new file mode 100755 (executable)
index 0000000..85e02db
--- /dev/null
@@ -0,0 +1,19 @@
+#!/usr/bin/env bash
+set -ex
+
+# assume working ceph environment (radosgw-admin in path) and rgw on localhost:80
+# localhost::443 for ssl
+
+mydir=`dirname $0`
+
+python3 -m venv $mydir
+source $mydir/bin/activate
+pip install pip --upgrade
+pip install boto3
+
+## run test
+$mydir/bin/python3 $mydir/test_rgw_bucket_check.py
+
+deactivate
+echo OK.
+
diff --git a/qa/workunits/rgw/test_rgw_bucket_check.py b/qa/workunits/rgw/test_rgw_bucket_check.py
new file mode 100755 (executable)
index 0000000..4b4f776
--- /dev/null
@@ -0,0 +1,209 @@
+#!/usr/bin/env python3
+
+import logging as log
+import json
+import random
+import botocore
+from common import exec_cmd, create_user, boto_connect
+from time import sleep
+from botocore.config import Config
+
+"""
+Tests behavior of radosgw-admin bucket check commands. 
+"""
+# The test cases in this file have been annotated for inventory.
+# To extract the inventory (in csv format) use the command:
+#
+#   grep '^ *# TESTCASE' | sed 's/^ *# TESTCASE //'
+#
+#
+
+""" Constants """
+USER = 'check-tester'
+DISPLAY_NAME = 'Check Testing'
+ACCESS_KEY = 'OJODXSLNX4LUNHQG99PA'
+SECRET_KEY = '3l6ffld34qaymfomuh832j94738aie2x4p2o8h6n'
+BUCKET_NAME = 'check-bucket'
+
+def put_objects(bucket, key_list):
+    objs = []
+    for key in key_list:
+        o = bucket.put_object(Key=key, Body=b"some_data")
+        objs.append((o.key, o.version_id))
+    return objs
+
+def create_unlinked_objects(conn, bucket, key_list):
+    # creates an unlinked/unlistable object for each key in key_list
+    
+    object_versions = []
+    try:
+        exec_cmd('ceph config set client rgw_debug_inject_set_olh_err 2')
+        exec_cmd('ceph config set client rgw_debug_inject_olh_cancel_modification_err true')
+        sleep(1)
+        for key in key_list:
+            tag = str(random.randint(0, 1_000_000))
+            try:
+                bucket.put_object(Key=key, Body=b"some_data", Metadata = {
+                    'tag': tag,
+                })
+            except Exception as e:
+                log.debug(e)
+            out = exec_cmd(f'radosgw-admin bi list --bucket {bucket.name} --object {key}')
+            instance_entries = filter(
+                lambda x: x['type'] == 'instance',
+                json.loads(out.replace(b'\x80', b'0x80')))
+            found = False
+            for ie in instance_entries:
+                instance_id = ie['entry']['instance']
+                ov = conn.ObjectVersion(bucket.name, key, instance_id).head()
+                if ov['Metadata'] and ov['Metadata']['tag'] == tag:
+                    object_versions.append((key, instance_id))
+                    found = True
+                    break
+            if not found:
+                raise Exception(f'failed to create unlinked object for key={key}')
+    finally:
+        exec_cmd('ceph config rm client rgw_debug_inject_set_olh_err')
+        exec_cmd('ceph config rm client rgw_debug_inject_olh_cancel_modification_err')
+    return object_versions
+
+def main():
+    """
+    execute bucket check commands
+    """
+    create_user(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY)
+
+    connection = boto_connect(ACCESS_KEY, SECRET_KEY, Config(retries = {
+        'total_max_attempts': 1,
+    }))
+
+    # pre-test cleanup
+    try:
+        bucket = connection.Bucket(BUCKET_NAME)
+        bucket.objects.all().delete()
+        bucket.object_versions.all().delete()
+        bucket.delete()
+    except botocore.exceptions.ClientError as e:
+        if not e.response['Error']['Code'] == 'NoSuchBucket':
+            raise
+
+    bucket = connection.create_bucket(Bucket=BUCKET_NAME)
+
+    null_version_keys = ['a', 'z']
+    null_version_objs = put_objects(bucket, null_version_keys)
+
+    connection.BucketVersioning(BUCKET_NAME).enable()
+
+    ok_keys = ['a', 'b', 'c', 'd']
+    unlinked_keys = ['c', 'd', 'e', 'f']
+    ok_objs = put_objects(bucket, ok_keys)
+
+    # TESTCASE 'bucket check unlinked does not report normal entries'
+    log.debug('TEST: bucket check unlinked does not report normal entries\n')
+    out = exec_cmd(f'radosgw-admin bucket check unlinked --bucket {BUCKET_NAME} --min-age-hours 0 --dump-keys')
+    json_out = json.loads(out)
+    assert len(json_out) == 0
+
+    unlinked_objs = create_unlinked_objects(connection, bucket, unlinked_keys)
+    
+    # TESTCASE 'bucket check unlinked finds unlistable entries'
+    log.debug('TEST: bucket check unlinked finds unlistable entries\n')
+    out = exec_cmd(f'radosgw-admin bucket check unlinked --bucket {BUCKET_NAME} --min-age-hours 0 --dump-keys')
+    json_out = json.loads(out)
+    assert len(json_out) == len(unlinked_keys)
+
+    # TESTCASE 'unlinked entries are not listable'
+    log.debug('TEST: unlinked entries are not listable\n')
+    for ov in bucket.object_versions.all():
+        assert (ov.key, ov.version_id) not in unlinked_objs, f'object "{ov.key}:{ov.version_id}" was found in bucket listing'
+
+    # TESTCASE 'GET returns 404 for unlinked entry keys that have no other versions'
+    log.debug('TEST: GET returns 404 for unlinked entry keys that have no other versions\n')
+    noent_keys = set(unlinked_keys) - set(ok_keys)
+    for key in noent_keys:
+        try:
+            bucket.Object(key).get()
+            assert False, 'GET did not return 404 for key={key} with no prior successful PUT'
+        except botocore.exceptions.ClientError as e:
+            assert e.response['ResponseMetadata']['HTTPStatusCode'] == 404
+            
+    # TESTCASE 'bucket check unlinked fixes unlistable entries'
+    log.debug('TEST: bucket check unlinked fixes unlistable entries\n')
+    out = exec_cmd(f'radosgw-admin bucket check unlinked --bucket {BUCKET_NAME} --fix --min-age-hours 0 --rgw-olh-pending-timeout-sec 0 --dump-keys')
+    json_out = json.loads(out)
+    assert len(json_out) == len(unlinked_keys)
+    for o in unlinked_objs:
+        try:
+            connection.ObjectVersion(bucket.name, o[0], o[1]).head()
+            assert False, f'head for unlistable object {o[0]}:{o[1]} succeeded after fix'
+        except botocore.exceptions.ClientError as e:
+            assert e.response['ResponseMetadata']['HTTPStatusCode'] == 404
+
+    # TESTCASE 'bucket check unlinked fix does not affect normal entries'
+    log.debug('TEST: bucket check unlinked does not affect normal entries\n')
+    all_listable = list(bucket.object_versions.all())
+    assert len(all_listable) == len(ok_keys) + len(null_version_keys), 'some normal objects were not accounted for in object listing after unlinked fix'
+    for o in ok_objs:
+        assert o in map(lambda x: (x.key, x.version_id), all_listable), "normal object not listable after fix"
+        connection.ObjectVersion(bucket.name, o[0], o[1]).head()
+
+    # TESTCASE 'bucket check unlinked does not find new unlistable entries after fix'
+    log.debug('TEST: bucket check unlinked does not find new unlistable entries after fix\n')
+    out = exec_cmd(f'radosgw-admin bucket check unlinked --bucket {BUCKET_NAME} --min-age-hours 0 --dump-keys')
+    json_out = json.loads(out)
+    assert len(json_out) == 0
+    
+    # for this set of keys we can produce leftover OLH object/entries by
+    # deleting the normal object instance since we should already have a leftover
+    # pending xattr on the OLH object due to the errors associated with the 
+    # prior unlinked entries that were created for the same keys 
+    leftover_pending_xattr_keys = set(ok_keys).intersection(unlinked_keys)
+    objs_to_delete = filter(lambda x: x[0] in leftover_pending_xattr_keys, ok_objs)
+        
+    for o in objs_to_delete:
+        connection.ObjectVersion(bucket.name, o[0], o[1]).delete()
+
+    for key in leftover_pending_xattr_keys:
+        out = exec_cmd(f'radosgw-admin bi list --bucket {BUCKET_NAME} --object {key}')
+        idx_entries = json.loads(out.replace(b'\x80', b'0x80'))
+        assert len(idx_entries) > 0, 'failed to create leftover OLH entries for key {key}'
+        
+    # TESTCASE 'bucket check olh finds leftover OLH entries'
+    log.debug('TEST: bucket check olh finds leftover OLH entries\n')
+    out = exec_cmd(f'radosgw-admin bucket check olh --bucket {BUCKET_NAME} --dump-keys')
+    json_out = json.loads(out)
+    assert len(json_out) == len(leftover_pending_xattr_keys)
+
+    # TESTCASE 'bucket check olh fixes leftover OLH entries'
+    log.debug('TEST: bucket check olh fixes leftover OLH entries\n')
+    out = exec_cmd(f'radosgw-admin bucket check olh --bucket {BUCKET_NAME} --fix --rgw-olh-pending-timeout-sec 0 --dump-keys')
+    json_out = json.loads(out)
+    assert len(json_out) == len(leftover_pending_xattr_keys)
+    
+    for key in leftover_pending_xattr_keys:
+        out = exec_cmd(f'radosgw-admin bi list --bucket {BUCKET_NAME} --object {key}')
+        idx_entries = json.loads(out.replace(b'\x80', b'0x80'))
+        assert len(idx_entries) == 0, 'index entries still exist for key={key} after olh fix'
+
+    # TESTCASE 'bucket check olh does not find new leftover OLH entries after fix'
+    log.debug('TEST: bucket check olh does not find new leftover OLH entries after fix\n')
+    out = exec_cmd(f'radosgw-admin bucket check olh --bucket {BUCKET_NAME} --dump-keys')
+    json_out = json.loads(out)
+    assert len(json_out) == 0
+
+    # TESTCASE 'bucket check fixes do not affect null version objects'
+    log.debug('TEST: verify that bucket check fixes do not affect null version objects\n')
+    for o in null_version_objs:
+        connection.ObjectVersion(bucket.name, o[0], 'null').head()
+        
+    all_versions = list(map(lambda x: (x.key, x.version_id), bucket.object_versions.all()))
+    for key in null_version_keys:
+        assert (key, 'null') in all_versions
+
+    # Clean up
+    log.debug("Deleting bucket {}".format(BUCKET_NAME))
+    bucket.object_versions.all().delete()
+    bucket.delete()
+
+main()
+log.info("Completed bucket check tests")
index 54fddb469e9f5a4718b79260d5fc3517613abaad..842f70da14b59d813d8677c565b81b22258e068f 100755 (executable)
@@ -75,7 +75,7 @@ def main():
     execute manual and dynamic resharding commands
     """
     create_user(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY)
-    
+
     connection = boto_connect(ACCESS_KEY, SECRET_KEY)
 
     # create a bucket
index 0c0bd340555347fd1cfd3bd6788b6a1c11dedb82..862daea15c85a0a72ee4f24d6addcfa5cc10d94f 100644 (file)
@@ -6,6 +6,7 @@
 #include <sstream>
 #include <string>
 
+#include <boost/asio.hpp>
 #include <boost/optional.hpp>
 
 extern "C" {
@@ -131,7 +132,9 @@ void usage()
   cout << "  bucket unlink              unlink bucket from specified user\n";
   cout << "  bucket stats               returns bucket statistics\n";
   cout << "  bucket rm                  remove bucket\n";
-  cout << "  bucket check               check bucket index\n";
+  cout << "  bucket check               check bucket index by verifying size and object count stats\n";
+  cout << "  bucket check olh           check for olh index entries and objects that are pending removal\n";
+  cout << "  bucket check unlinked      check for object versions that are not visible in a bucket listing \n";
   cout << "  bucket chown               link bucket to specified user and update its object ACLs\n";
   cout << "  bucket reshard             reshard bucket\n";
   cout << "  bucket rewrite             rewrite all objects in the specified bucket\n";
@@ -439,6 +442,10 @@ void usage()
   cout << "   --context                 context in which the script runs. one of: preRequest, postRequest\n";
   cout << "   --package                 name of the lua package that should be added/removed to/from the allowlist\n";
   cout << "   --allow-compilation       package is allowed to compile C code as part of its installation\n";
+  cout << "\nBucket check olh/unlinked options:\n";
+  cout << "   --min-age-hours           minimum age of unlinked objects to consider for bucket check unlinked (default: 1)\n";
+  cout << "   --dump-keys               when specified, all keys identified as problematic are printed to stdout\n";
+  cout << "   --hide-progress           when specified, per-shard progress details are not printed to stderr\n";
   cout << "\nradoslist options:\n";
   cout << "   --rgw-obj-fs              the field separator that will separate the rados\n";
   cout << "                             object name from the rgw object name;\n";
@@ -606,6 +613,8 @@ enum class OPT {
   BUCKET_UNLINK,
   BUCKET_STATS,
   BUCKET_CHECK,
+  BUCKET_CHECK_OLH,
+  BUCKET_CHECK_UNLINKED,
   BUCKET_SYNC_CHECKPOINT,
   BUCKET_SYNC_INFO,
   BUCKET_SYNC_STATUS,
@@ -812,6 +821,8 @@ static SimpleCmd::Commands all_cmds = {
   { "bucket unlink", OPT::BUCKET_UNLINK },
   { "bucket stats", OPT::BUCKET_STATS },
   { "bucket check", OPT::BUCKET_CHECK },
+  { "bucket check olh", OPT::BUCKET_CHECK_OLH },
+  { "bucket check unlinked", OPT::BUCKET_CHECK_UNLINKED },
   { "bucket sync checkpoint", OPT::BUCKET_SYNC_CHECKPOINT },
   { "bucket sync info", OPT::BUCKET_SYNC_INFO },
   { "bucket sync status", OPT::BUCKET_SYNC_STATUS },
@@ -3161,6 +3172,9 @@ int main(int argc, const char **argv)
   bool num_shards_specified = false;
   std::optional<int> bucket_index_max_shards;
   int max_concurrent_ios = 32;
+  ceph::timespan min_age = std::chrono::hours(1);
+  bool hide_progress = false;
+  bool dump_keys = false;
   uint64_t orphan_stale_secs = (24 * 3600);
   int detail = false;
 
@@ -3380,6 +3394,8 @@ int main(int argc, const char **argv)
         cerr << "ERROR: failed to parse max concurrent ios: " << err << std::endl;
         return EINVAL;
       }
+    } else if (ceph_argparse_witharg(args, i, &val, "--min-age-hours", (char*)NULL)) {
+      min_age = std::chrono::hours(atoi(val.c_str()));
     } else if (ceph_argparse_witharg(args, i, &val, "--orphan-stale-secs", (char*)NULL)) {
       orphan_stale_secs = (uint64_t)strict_strtoll(val.c_str(), 10, &err);
       if (!err.empty()) {
@@ -3456,6 +3472,10 @@ int main(int argc, const char **argv)
      // do nothing
     } else if (ceph_argparse_binary_flag(args, i, &inconsistent_index, NULL, "--inconsistent-index", (char*)NULL)) {
      // do nothing
+    } else if (ceph_argparse_flag(args, i, "--hide-progress", (char*)NULL)) {
+      hide_progress = true;
+    } else if (ceph_argparse_flag(args, i, "--dump-keys", (char*)NULL)) {
+      dump_keys = true;
     } else if (ceph_argparse_witharg(args, i, &val, "--caps", (char*)NULL)) {
       caps = val;
     } else if (ceph_argparse_witharg(args, i, &val, "-i", "--infile", (char*)NULL)) {
@@ -5598,6 +5618,9 @@ int main(int argc, const char **argv)
   bucket_op.set_delete_children(delete_child_objects);
   bucket_op.set_fix_index(fix);
   bucket_op.set_max_aio(max_concurrent_ios);
+  bucket_op.set_min_age(min_age);
+  bucket_op.set_dump_keys(dump_keys);
+  bucket_op.set_hide_progress(hide_progress);
 
   // required to gather errors from operations
   std::string err_msg;
@@ -7246,6 +7269,14 @@ next:
     }
   }
 
+  if (opt_cmd == OPT::BUCKET_CHECK_OLH) {
+    RGWBucketAdminOp::check_index_olh(store, bucket_op, f, dpp());
+  }
+
+  if (opt_cmd == OPT::BUCKET_CHECK_UNLINKED) {
+    RGWBucketAdminOp::check_index_unlinked(store, bucket_op, f, dpp());
+  }
+
   if (opt_cmd == OPT::BUCKET_RM) {
     if (!inconsistent_index) {
       RGWBucketAdminOp::remove_bucket(store, bucket_op, null_yield, dpp(), bypass_gc, true);
index 3d680514af112830d40e4ccbdf9aa92421086319..f73e8c4489d174891c56b1dcf0dabd5e76308485 100644 (file)
 
 #define BUCKET_TAG_TIMEOUT 30
 
+// these values are copied from cls/rgw/cls_rgw.cc
+static const string BI_OLH_ENTRY_NS_START = "\x80" "1001_";
+static const string BI_INSTANCE_ENTRY_NS_START = "\x80" "1000_";
+
+// number of characters that we should allow to be buffered by the formatter
+// before flushing (used by index check methods with dump_keys=true)
+static constexpr int FORMATTER_LEN_FLUSH_THRESHOLD = 4 * 1024 * 1024;
+
+
 // default number of entries to list with each bucket listing call
 // (use marker to bridge between calls)
 static constexpr size_t listing_max_entries = 1000;
@@ -996,6 +1005,380 @@ int RGWBucket::check_object_index(const DoutPrefixProvider *dpp,
   return 0;
 }
 
+/**
+ * Loops over all olh entries in a bucket shard and finds ones with
+ * exists=false and pending_removal=true. If the pending log is empty on
+ * these entries, they were left behind after the last remaining version of
+ * an object was deleted or after an incomplete upload. This was known to
+ * happen historically due to concurrency conflicts among requests referencing
+ * the same object key. If op_state.fix_index is true, we continue where the
+ * request left off by calling RGWRados::clear_olh. If the pending log is not
+ * empty, we attempt to apply it.
+ */
+static int check_index_olh(rgw::sal::RGWRadosStore* const rados_store,
+                           rgw::sal::RGWRadosBucket* const bucket,
+                           const DoutPrefixProvider *dpp,
+                           RGWBucketAdminOpState& op_state,
+                           RGWFormatterFlusher& flusher,
+                           const int shard, 
+                           uint64_t* const count_out,
+                           optional_yield y)
+{
+  string marker = BI_OLH_ENTRY_NS_START;
+  bool is_truncated = true;
+  list<rgw_cls_bi_entry> entries;
+
+  RGWObjectCtx obj_ctx(rados_store);
+  RGWRados* store = rados_store->getRados();
+  RGWRados::BucketShard bs(store);
+
+  int ret = bs.init(dpp, bucket->get_info(), bucket->get_info().layout.current_index, shard);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "ERROR bs.init(bucket=" << bucket << "): " << cpp_strerror(-ret) << dendl;
+    return ret;
+  }
+
+  *count_out = 0;
+  do {
+    entries.clear();
+    ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated);
+    if (ret < 0) {
+      ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl;
+      break;
+    }
+    list<rgw_cls_bi_entry>::iterator iter;
+    for (iter = entries.begin(); iter != entries.end(); ++iter) {
+      rgw_cls_bi_entry& entry = *iter;
+      marker = entry.idx;
+      if (entry.type != BIIndexType::OLH) {
+        is_truncated = false;
+        break;
+      }
+      rgw_bucket_olh_entry olh_entry;
+      auto iiter = entry.data.cbegin();
+      try {
+        decode(olh_entry, iiter);
+      } catch (buffer::error& err) {
+        ldpp_dout(dpp, -1) << "ERROR failed to decode olh entry for key: " << entry.idx << dendl;
+        continue;
+      }
+      if (olh_entry.exists || !olh_entry.pending_removal) {
+        continue;
+      }
+      if (op_state.will_fix_index()) {
+        rgw_obj obj(bucket->get_key(), olh_entry.key.name);
+        if (olh_entry.pending_log.empty()) {
+          ret = store->clear_olh(dpp, obj_ctx, obj, bucket->get_info(), olh_entry.tag, olh_entry.epoch, y);
+          if (ret < 0) {
+            ldpp_dout(dpp, -1) << "ERROR failed to clear olh for: " << olh_entry.key.name << " clear_olh(): " << cpp_strerror(-ret) << dendl;
+            continue;
+          }
+        } else {
+          std::unique_ptr<rgw::sal::RGWObject> object = bucket->get_object({olh_entry.key.name});
+          RGWObjState *state;
+          ret = object->get_obj_state(dpp, &obj_ctx, *bucket, &state, y, false);
+          if (ret < 0) {
+            ldpp_dout(dpp, -1) << "ERROR failed to get state for: " << olh_entry.key.name << " get_obj_state(): " << cpp_strerror(-ret) << dendl;
+            continue;
+          }
+          ret = store->update_olh(dpp, obj_ctx, state, bucket->get_info(), obj);
+          if (ret < 0) {
+            ldpp_dout(dpp, -1) << "ERROR failed to update olh for: " << olh_entry.key.name << " update_olh(): " << cpp_strerror(-ret) << dendl;
+            continue;
+          }
+        }
+      }
+      if (op_state.dump_keys) {
+        flusher.get_formatter()->dump_string("", olh_entry.key.name);
+        if (flusher.get_formatter()->get_len() > FORMATTER_LEN_FLUSH_THRESHOLD) {
+          flusher.flush();
+        }
+      }
+      *count_out += 1;
+    }
+  } while (is_truncated);
+  flusher.flush();
+  return 0;
+}
+
+/**
+ * Spawns separate coroutines to check each bucket shard for leftover
+ * olh entries (and remove them if op_state.fix_index is true).
+ */
+int RGWBucket::check_index_olh(rgw::sal::RGWRadosStore* const rados_store,
+                               const DoutPrefixProvider *dpp,
+                               RGWBucketAdminOpState& op_state,
+                               RGWFormatterFlusher& flusher)
+{
+  if ((bucket_info.versioning_status() & BUCKET_VERSIONED) == 0) {
+    ldpp_dout(dpp, 0) << "WARNING: this command is only applicable to versioned buckets" << dendl;
+    return 0;
+  }
+
+  Formatter* formatter = flusher.get_formatter();
+  if (op_state.dump_keys) {
+    formatter->open_array_section("");
+  }
+
+  const int max_shards = (bucket_info.layout.current_index.layout.normal.num_shards > 0 ? bucket_info.layout.current_index.layout.normal.num_shards : 1);
+  std::string verb = op_state.will_fix_index() ? "removed" : "found";
+  uint64_t count_out = 0;
+
+  boost::asio::io_context context;
+  int next_shard = 0;
+
+  const int max_aio = std::max(1, op_state.get_max_aio());
+  rgw::sal::RGWRadosBucket rados_bucket(rados_store, bucket_info);
+
+  for (int i=0; i<max_aio; i++) {
+    spawn::spawn(context, [&](yield_context yield) {
+      while (true) {
+        int shard = next_shard;
+        next_shard += 1;
+        if (shard >= max_shards) {
+          return;
+        }
+        optional_yield y(context, yield);
+        uint64_t shard_count;
+        int r = ::check_index_olh(rados_store, &rados_bucket, dpp, op_state, flusher, shard, &shard_count, y);
+        if (r < 0) {
+          ldpp_dout(dpp, -1) << "NOTICE: error processing shard " << shard << 
+            " check_index_olh(): " << r << dendl;
+        }
+        count_out += shard_count;
+        if (!op_state.hide_progress) {
+          ldpp_dout(dpp, 1) << "NOTICE: finished shard " << shard << " (" << shard_count <<
+            " entries " << verb << ")" << dendl;
+        }
+      }
+    });
+  }
+  try {
+    context.run();
+  } catch (const std::system_error& e) {
+    return -e.code().value();
+  }
+  if (!op_state.hide_progress) {
+    ldpp_dout(dpp, 1) << "NOTICE: finished all shards (" << count_out <<
+      " entries " << verb << ")" << dendl;
+  }
+  if (op_state.dump_keys) {
+    formatter->close_section();
+    flusher.flush();
+  }
+  return 0;
+}
+
+/**
+ * Indicates whether a versioned bucket instance entry is listable in the
+ * index. It does this by looping over all plain entries with prefix equal to
+ * the key name, and checking whether any have an instance ID matching the one
+ * on the specified key. The existence of an instance entry without a matching
+ * plain entry indicates that the object was uploaded successfully, but the
+ * request exited prior to linking the object into the index (via the creation
+ * of a plain entry).
+ */
+static int is_versioned_instance_listable(const DoutPrefixProvider *dpp,
+                                          RGWRados::BucketShard& bs,
+                                          const cls_rgw_obj_key& key,
+                                          bool& listable,
+                                          optional_yield y)
+{
+  const std::string empty_delim;
+  cls_rgw_obj_key marker;
+  rgw_cls_list_ret result;
+  listable = false;
+
+  do {
+    librados::ObjectReadOperation op;
+    cls_rgw_bucket_list_op(op, marker, key.name, empty_delim, 1000,
+                           true, &result);
+    bufferlist ibl;
+    int r = bs.bucket_obj.operate(dpp, &op, &ibl, y);
+    if (r < 0) {
+      return r;
+    }
+    
+    for (auto const& entry : result.dir.m) {
+      if (entry.second.key == key) {
+        listable = true;
+        return 0;
+      }
+      marker = entry.second.key;
+    }
+  } while (result.is_truncated);
+  return 0;
+}
+
+/**
+ * Loops over all instance entries in a bucket shard and finds ones with
+ * versioned_epoch=0 and an mtime that is earlier than op_state.min_age
+ * relative to the current time. These entries represent objects that were
+ * uploaded successfully but were not successfully linked into the object
+ * index. As an extra precaution, we also verify that these entries are indeed
+ * non listable (have no corresponding plain entry in the index). We can assume
+ * that clients received an error response for the associated upload requests
+ * since the bucket index linking transaction did not complete. Therefore, if
+ * op_state.fix_index is true, we remove the object that is associated with the
+ * instance entry.
+ */
+static int check_index_unlinked(rgw::sal::RGWRadosStore* const rados_store,
+                                rgw::sal::RGWRadosBucket* const bucket,
+                                const DoutPrefixProvider *dpp,
+                                RGWBucketAdminOpState& op_state,
+                                RGWFormatterFlusher& flusher,
+                                const int shard, 
+                                uint64_t* const count_out,
+                                optional_yield y)
+{
+  string marker = BI_INSTANCE_ENTRY_NS_START;
+  bool is_truncated = true;
+  list<rgw_cls_bi_entry> entries;
+
+  RGWObjectCtx obj_ctx(rados_store);
+  RGWRados* store = rados_store->getRados();
+  RGWRados::BucketShard bs(store);
+
+  int ret = bs.init(dpp, bucket->get_info(), bucket->get_info().layout.current_index, shard);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "ERROR bs.init(bucket=" << bucket << "): " << cpp_strerror(-ret) << dendl;
+    return ret;
+  }
+
+  ceph::real_clock::time_point now = ceph::real_clock::now();
+  ceph::real_clock::time_point not_after = now - op_state.min_age;
+
+  *count_out = 0;
+  do {
+    entries.clear();
+    ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated);
+    if (ret < 0) {
+      ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl;
+      break;
+    }
+    list<rgw_cls_bi_entry>::iterator iter;
+    for (iter = entries.begin(); iter != entries.end(); ++iter) {
+      rgw_cls_bi_entry& entry = *iter;
+      marker = entry.idx;
+      if (entry.type != BIIndexType::Instance) {
+        is_truncated = false;
+        break;
+      }
+      rgw_bucket_dir_entry dir_entry;
+      auto iiter = entry.data.cbegin();
+      try {
+        decode(dir_entry, iiter);
+      } catch (buffer::error& err) {
+        ldpp_dout(dpp, -1) << "ERROR failed to decode instance entry for key: " <<
+          entry.idx << dendl;
+        continue;
+      }
+      if (dir_entry.versioned_epoch != 0 || dir_entry.meta.mtime > not_after) {
+        continue;
+      }
+      bool listable;
+      ret = is_versioned_instance_listable(dpp, bs, dir_entry.key, listable, y);
+      if (ret < 0) {
+        ldpp_dout(dpp, -1) << "ERROR is_versioned_instance_listable(key='" <<
+          dir_entry.key << "'): " << cpp_strerror(-ret) << dendl;
+        continue;
+      }
+      if (listable) {
+        continue;
+      }
+      if (op_state.will_fix_index()) {
+        rgw_obj_key key(dir_entry.key.name, dir_entry.key.instance);
+        ret = rgw_remove_object(dpp, rados_store, bucket->get_info(), bucket->get_key(), key);
+        if (ret < 0) {
+          ldpp_dout(dpp, -1) << "ERROR rgw_remove_obj(key='" <<
+            dir_entry.key << "'): " << cpp_strerror(-ret) << dendl;
+          continue;
+        }
+      }
+      if (op_state.dump_keys) {
+        Formatter* const formatter = flusher.get_formatter();
+        formatter->open_object_section("object_instance");
+        formatter->dump_string("name", dir_entry.key.name);
+        formatter->dump_string("instance", dir_entry.key.instance);
+        formatter->close_section();
+        if (formatter->get_len() > FORMATTER_LEN_FLUSH_THRESHOLD) {
+          flusher.flush();
+        }
+      }
+      *count_out += 1;
+    }
+  } while (is_truncated);
+  flusher.flush();
+  return 0;
+}
+
+/**
+ * Spawns separate coroutines to check each bucket shard for unlinked
+ * instance entries (and remove them if op_state.fix_index is true).
+ */
+int RGWBucket::check_index_unlinked(rgw::sal::RGWRadosStore* const rados_store,
+                                    const DoutPrefixProvider *dpp,
+                                    RGWBucketAdminOpState& op_state,
+                                    RGWFormatterFlusher& flusher)
+{
+  if ((bucket_info.versioning_status() & BUCKET_VERSIONED) == 0) {
+    ldpp_dout(dpp, 0) << "WARNING: this command is only applicable to versioned buckets" << dendl;
+    return 0;
+  }
+
+  Formatter* formatter = flusher.get_formatter();
+  if (op_state.dump_keys) {
+    formatter->open_array_section("");
+  }
+
+  const int max_shards = (bucket_info.layout.current_index.layout.normal.num_shards > 0 ? bucket_info.layout.current_index.layout.normal.num_shards : 1);
+  std::string verb = op_state.will_fix_index() ? "removed" : "found";
+  uint64_t count_out = 0;
+
+  int max_aio = std::max(1, op_state.get_max_aio());
+  int next_shard = 0;
+  boost::asio::io_context context;
+  rgw::sal::RGWRadosBucket rados_bucket(rados_store, bucket_info);
+
+  for (int i=0; i<max_aio; i++) {
+    spawn::spawn(context, [&](yield_context yield) {
+      while (true) {
+        int shard = next_shard;
+        next_shard += 1;
+        if (shard >= max_shards) {
+          return;
+        }
+        uint64_t shard_count;
+        optional_yield y {context, yield};
+        int r = ::check_index_unlinked(rados_store, &rados_bucket, dpp, op_state, flusher, shard, &shard_count, y);
+        if (r < 0) {
+          ldpp_dout(dpp, -1) << "ERROR: error processing shard " << shard << 
+            " check_index_unlinked(): " << r << dendl;
+        }
+        count_out += shard_count;
+        if (!op_state.hide_progress) {
+          ldpp_dout(dpp, 1) << "NOTICE: finished shard " << shard << " (" << shard_count <<
+            " entries " << verb << ")" << dendl;
+        }
+      }
+    });
+  }
+  try {
+    context.run();
+  } catch (const std::system_error& e) {
+    return -e.code().value();
+  }
+
+  if (!op_state.hide_progress) {
+    ldpp_dout(dpp, 1) << "NOTICE: finished all shards (" << count_out <<
+      " entries " << verb << ")" << dendl;
+  }
+  if (op_state.dump_keys) {
+    formatter->close_section();
+    flusher.flush();
+  }
+  return 0;
+}
 
 int RGWBucket::check_index(const DoutPrefixProvider *dpp,
         RGWBucketAdminOpState& op_state,
@@ -1231,6 +1614,46 @@ int RGWBucketAdminOp::chown(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpStat
 
 }
 
+int RGWBucketAdminOp::check_index_olh(rgw::sal::RGWRadosStore* store, RGWBucketAdminOpState& op_state,
+                  RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp)
+{
+  RGWBucket bucket;
+  int ret = bucket.init(store, op_state, null_yield, dpp);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "bucket.init(): " << ret << dendl;
+    return ret;
+  }
+  flusher.start(0);
+  ret = bucket.check_index_olh(store, dpp, op_state, flusher);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "check_index_olh(): " << ret << dendl;
+    return ret;
+  }
+  flusher.flush();
+  return 0;
+}
+
+int RGWBucketAdminOp::check_index_unlinked(rgw::sal::RGWRadosStore* store,
+                                           RGWBucketAdminOpState& op_state,
+                                           RGWFormatterFlusher& flusher,
+                                           const DoutPrefixProvider *dpp)
+{
+  flusher.start(0);
+  RGWBucket bucket;
+  int ret = bucket.init(store, op_state, null_yield, dpp);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "bucket.init(): " << ret << dendl;
+    return ret;
+  }
+  ret = bucket.check_index_unlinked(store, dpp, op_state, flusher);
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "check_index_unlinked(): " << ret << dendl;
+    return ret;
+  }
+  flusher.flush();
+  return 0;
+}
+
 int RGWBucketAdminOp::check_index(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
                   RGWFormatterFlusher& flusher, optional_yield y, const DoutPrefixProvider *dpp)
 {
index f9c6aa1945a4e1dc039a519159397509517cea6e..09088e0d83ebf5c90595027b6153392dbaa3dd5a 100644 (file)
@@ -261,7 +261,10 @@ struct RGWBucketAdminOpState {
   bool delete_child_objects;
   bool bucket_stored;
   bool sync_bucket;
+  bool dump_keys;
+  bool hide_progress;
   int max_aio = 0;
+  ceph::timespan min_age = std::chrono::hours::zero();
 
   rgw_bucket bucket;
 
@@ -271,8 +274,11 @@ struct RGWBucketAdminOpState {
   void set_check_objects(bool value) { check_objects = value; }
   void set_fix_index(bool value) { fix_index = value; }
   void set_delete_children(bool value) { delete_child_objects = value; }
+  void set_hide_progress(bool value) { hide_progress = value; }
+  void set_dump_keys(bool value) { dump_keys = value; }
 
   void set_max_aio(int value) { max_aio = value; }
+  void set_min_age(ceph::timespan value) { min_age = value; }
 
   void set_user_id(const rgw_user& user_id) {
     if (!user_id.empty())
@@ -326,7 +332,8 @@ struct RGWBucketAdminOpState {
 
   RGWBucketAdminOpState() : list_buckets(false), stat_buckets(false), check_objects(false), 
                             fix_index(false), delete_child_objects(false),
-                            bucket_stored(false), sync_bucket(true)  {}
+                            bucket_stored(false), sync_bucket(true),
+                            dump_keys(false), hide_progress(false)  {}
 };
 
 /*
@@ -361,6 +368,10 @@ public:
                          RGWFormatterFlusher& flusher,
                          optional_yield y,
                          std::string *err_msg = NULL);
+  int check_index_olh(rgw::sal::RGWRadosStore* rados_store, const DoutPrefixProvider *dpp, RGWBucketAdminOpState& op_state,
+                      RGWFormatterFlusher& flusher);
+  int check_index_unlinked(rgw::sal::RGWRadosStore* rados_store, const DoutPrefixProvider *dpp, RGWBucketAdminOpState& op_state,
+                           RGWFormatterFlusher& flusher);
 
   int check_index(const DoutPrefixProvider *dpp,
           RGWBucketAdminOpState& op_state,
@@ -401,6 +412,10 @@ public:
 
   static int check_index(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
                   RGWFormatterFlusher& flusher, optional_yield y, const DoutPrefixProvider *dpp);
+  static int check_index_olh(rgw::sal::RGWRadosStore* store, RGWBucketAdminOpState& op_state,
+                             RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp);
+  static int check_index_unlinked(rgw::sal::RGWRadosStore* store, RGWBucketAdminOpState& op_state,
+                                  RGWFormatterFlusher& flusher, const DoutPrefixProvider *dpp);
 
   static int remove_bucket(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, optional_yield y,
       const DoutPrefixProvider *dpp, bool bypass_gc = false, bool keep_index_consistent = true);
index 4b1cb1c45e2a00c65ab943da5378d676e983fcc9..4004135747e4a77d68c28b08ed9c354bba1ee577 100644 (file)
@@ -7366,12 +7366,12 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp,
       ldpp_dout(dpp, 0) << "ERROR: could not clear olh, r=" << r << dendl;
       return r;
     }
-  }
-
-  r = bucket_index_trim_olh_log(dpp, bucket_info, state, obj, last_ver);
-  if (r < 0 && r != -ECANCELED) {
-    ldpp_dout(dpp, 0) << "ERROR: could not trim olh log, r=" << r << dendl;
-    return r;
+  } else {
+    r = bucket_index_trim_olh_log(dpp, bucket_info, state, obj, last_ver);
+    if (r < 0 && r != -ECANCELED) {
+      ldpp_dout(dpp, 0) << "ERROR: could not trim olh log, r=" << r << dendl;
+      return r;
+    }
   }
 
   return 0;
index c63c63cb55e2ca62f8080f889fd7d933aa3f8e9d..5c9fb6a2886633e8cd4bf697159acd9f5ca5f2fa 100644 (file)
@@ -25,7 +25,9 @@
     bucket unlink              unlink bucket from specified user
     bucket stats               returns bucket statistics
     bucket rm                  remove bucket
-    bucket check               check bucket index
+    bucket check               check bucket index by verifying size and object count stats
+    bucket check olh           check for olh index entries and objects that are pending removal
+    bucket check unlinked      check for object versions that are not visible in a bucket listing 
     bucket chown               link bucket to specified user and update its object ACLs
     bucket reshard             reshard bucket
     bucket rewrite             rewrite all objects in the specified bucket
      --package                 name of the lua package that should be added/removed to/from the allowlist
      --allow-compilation       package is allowed to compile C code as part of its installation
   
+  Bucket check olh/unlinked options:
+     --min-age-hours           minimum age of unlinked objects to consider for bucket check unlinked (default: 1)
+     --dump-keys               when specified, all keys identified as problematic are printed to stdout
+     --hide-progress           when specified, per-shard progress details are not printed to stderr
+  
   radoslist options:
      --rgw-obj-fs              the field separator that will separate the rados
                                object name from the rgw object name;
     --setgroup GROUP  set gid to group or gid
     --version         show version and quit
   
-
-