]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: move rgw_sync_log_trim.* to rgw_trim_bilog.* 27579/head
authorCasey Bodley <cbodley@redhat.com>
Sat, 13 Apr 2019 17:04:34 +0000 (13:04 -0400)
committerCasey Bodley <cbodley@redhat.com>
Thu, 18 Apr 2019 17:16:17 +0000 (13:16 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_admin.cc
src/rgw/rgw_data_sync.cc
src/rgw/rgw_rados.h
src/rgw/rgw_sync_log_trim.cc [deleted file]
src/rgw/rgw_sync_log_trim.h [deleted file]
src/rgw/rgw_trim_bilog.cc [new file with mode: 0644]
src/rgw/rgw_trim_bilog.h [new file with mode: 0644]

index cff1ead0ebfef322737cd4175bf42930e6a1b2f8..dac869f824fb2f824693546cc3115a5c805306d6 100644 (file)
@@ -72,8 +72,8 @@ set(librgw_common_srcs
   rgw_sync_module_pubsub.cc
   rgw_pubsub_push.cc
   rgw_sync_module_pubsub_rest.cc
-  rgw_sync_log_trim.cc
   rgw_sync_trace.cc
+  rgw_trim_bilog.cc
   rgw_trim_datalog.cc
   rgw_trim_mdlog.cc
   rgw_period_history.cc
index de75af45e1e40ea5c49780985a1b3d42e96ee1fd..8ef49038aa50490c64f7f4249856521e2f768ef5 100644 (file)
@@ -44,9 +44,9 @@ extern "C" {
 #include "rgw_usage.h"
 #include "rgw_orphan.h"
 #include "rgw_sync.h"
+#include "rgw_trim_bilog.h"
 #include "rgw_trim_datalog.h"
 #include "rgw_trim_mdlog.h"
-#include "rgw_sync_log_trim.h"
 #include "rgw_data_sync.h"
 #include "rgw_rest_conn.h"
 #include "rgw_realm_watcher.h"
index 33e69c965941001e33f241dd3ba6ece687a88bb1..ea9e53d5e9c058dfbe08589275c66a4e6b34cfdc 100644 (file)
@@ -23,7 +23,6 @@
 #include "rgw_metadata.h"
 #include "rgw_sync_counters.h"
 #include "rgw_sync_module.h"
-#include "rgw_sync_log_trim.h"
 
 #include "cls/lock/cls_lock_client.h"
 
index 2c35c7f397b14fdaec0a464e1b2f32619a32b9d7..d713dd58eddd1aeff814fbda3ff6241872092e23 100644 (file)
@@ -25,7 +25,7 @@
 #include "rgw_meta_sync_status.h"
 #include "rgw_period_puller.h"
 #include "rgw_sync_module.h"
-#include "rgw_sync_log_trim.h"
+#include "rgw_trim_bilog.h"
 #include "rgw_service.h"
 
 #include "services/svc_rados.h"
diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc
deleted file mode 100644 (file)
index 53aa8e1..0000000
+++ /dev/null
@@ -1,1095 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2017 Red Hat, Inc
- *
- * Author: Casey Bodley <cbodley@redhat.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- */
-
-#include <mutex>
-#include <boost/circular_buffer.hpp>
-#include <boost/container/flat_map.hpp>
-
-#include "include/scope_guard.h"
-#include "common/bounded_key_counter.h"
-#include "common/errno.h"
-#include "rgw_sync_log_trim.h"
-#include "rgw_cr_rados.h"
-#include "rgw_cr_rest.h"
-#include "rgw_data_sync.h"
-#include "rgw_metadata.h"
-#include "rgw_rados.h"
-#include "rgw_zone.h"
-#include "rgw_sync.h"
-
-#include "services/svc_zone.h"
-
-#include <boost/asio/yield.hpp>
-#include "include/ceph_assert.h"
-
-#define dout_subsys ceph_subsys_rgw
-
-#undef dout_prefix
-#define dout_prefix (*_dout << "trim: ")
-
-using rgw::BucketTrimConfig;
-using BucketChangeCounter = BoundedKeyCounter<std::string, int>;
-
-const std::string rgw::BucketTrimStatus::oid = "bilog.trim";
-using rgw::BucketTrimStatus;
-
-
-// watch/notify api for gateways to coordinate about which buckets to trim
-enum TrimNotifyType {
-  NotifyTrimCounters = 0,
-  NotifyTrimComplete,
-};
-WRITE_RAW_ENCODER(TrimNotifyType);
-
-struct TrimNotifyHandler {
-  virtual ~TrimNotifyHandler() = default;
-
-  virtual void handle(bufferlist::const_iterator& input, bufferlist& output) = 0;
-};
-
-/// api to share the bucket trim counters between gateways in the same zone.
-/// each gateway will process different datalog shards, so the gateway that runs
-/// the trim process needs to accumulate their counters
-struct TrimCounters {
-  /// counter for a single bucket
-  struct BucketCounter {
-    std::string bucket; //< bucket instance metadata key
-    int count{0};
-
-    BucketCounter() = default;
-    BucketCounter(const std::string& bucket, int count)
-      : bucket(bucket), count(count) {}
-
-    void encode(bufferlist& bl) const;
-    void decode(bufferlist::const_iterator& p);
-  };
-  using Vector = std::vector<BucketCounter>;
-
-  /// request bucket trim counters from peer gateways
-  struct Request {
-    uint16_t max_buckets; //< maximum number of bucket counters to return
-
-    void encode(bufferlist& bl) const;
-    void decode(bufferlist::const_iterator& p);
-  };
-
-  /// return the current bucket trim counters
-  struct Response {
-    Vector bucket_counters;
-
-    void encode(bufferlist& bl) const;
-    void decode(bufferlist::const_iterator& p);
-  };
-
-  /// server interface to query the hottest buckets
-  struct Server {
-    virtual ~Server() = default;
-
-    virtual void get_bucket_counters(int count, Vector& counters) = 0;
-    virtual void reset_bucket_counters() = 0;
-  };
-
-  /// notify handler
-  class Handler : public TrimNotifyHandler {
-    Server *const server;
-   public:
-    explicit Handler(Server *server) : server(server) {}
-
-    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
-  };
-};
-std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs)
-{
-  return out << rhs.bucket << ":" << rhs.count;
-}
-
-void TrimCounters::BucketCounter::encode(bufferlist& bl) const
-{
-  using ceph::encode;
-  // no versioning to save space
-  encode(bucket, bl);
-  encode(count, bl);
-}
-void TrimCounters::BucketCounter::decode(bufferlist::const_iterator& p)
-{
-  using ceph::decode;
-  decode(bucket, p);
-  decode(count, p);
-}
-WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);
-
-void TrimCounters::Request::encode(bufferlist& bl) const
-{
-  ENCODE_START(1, 1, bl);
-  encode(max_buckets, bl);
-  ENCODE_FINISH(bl);
-}
-void TrimCounters::Request::decode(bufferlist::const_iterator& p)
-{
-  DECODE_START(1, p);
-  decode(max_buckets, p);
-  DECODE_FINISH(p);
-}
-WRITE_CLASS_ENCODER(TrimCounters::Request);
-
-void TrimCounters::Response::encode(bufferlist& bl) const
-{
-  ENCODE_START(1, 1, bl);
-  encode(bucket_counters, bl);
-  ENCODE_FINISH(bl);
-}
-void TrimCounters::Response::decode(bufferlist::const_iterator& p)
-{
-  DECODE_START(1, p);
-  decode(bucket_counters, p);
-  DECODE_FINISH(p);
-}
-WRITE_CLASS_ENCODER(TrimCounters::Response);
-
-void TrimCounters::Handler::handle(bufferlist::const_iterator& input,
-                                   bufferlist& output)
-{
-  Request request;
-  decode(request, input);
-  auto count = std::min<uint16_t>(request.max_buckets, 128);
-
-  Response response;
-  server->get_bucket_counters(count, response.bucket_counters);
-  encode(response, output);
-}
-
-/// api to notify peer gateways that trim has completed and their bucket change
-/// counters can be reset
-struct TrimComplete {
-  struct Request {
-    void encode(bufferlist& bl) const;
-    void decode(bufferlist::const_iterator& p);
-  };
-  struct Response {
-    void encode(bufferlist& bl) const;
-    void decode(bufferlist::const_iterator& p);
-  };
-
-  /// server interface to reset bucket counters
-  using Server = TrimCounters::Server;
-
-  /// notify handler
-  class Handler : public TrimNotifyHandler {
-    Server *const server;
-   public:
-    explicit Handler(Server *server) : server(server) {}
-
-    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
-  };
-};
-
-void TrimComplete::Request::encode(bufferlist& bl) const
-{
-  ENCODE_START(1, 1, bl);
-  ENCODE_FINISH(bl);
-}
-void TrimComplete::Request::decode(bufferlist::const_iterator& p)
-{
-  DECODE_START(1, p);
-  DECODE_FINISH(p);
-}
-WRITE_CLASS_ENCODER(TrimComplete::Request);
-
-void TrimComplete::Response::encode(bufferlist& bl) const
-{
-  ENCODE_START(1, 1, bl);
-  ENCODE_FINISH(bl);
-}
-void TrimComplete::Response::decode(bufferlist::const_iterator& p)
-{
-  DECODE_START(1, p);
-  DECODE_FINISH(p);
-}
-WRITE_CLASS_ENCODER(TrimComplete::Response);
-
-void TrimComplete::Handler::handle(bufferlist::const_iterator& input,
-                                   bufferlist& output)
-{
-  Request request;
-  decode(request, input);
-
-  server->reset_bucket_counters();
-
-  Response response;
-  encode(response, output);
-}
-
-
-/// rados watcher for bucket trim notifications
-class BucketTrimWatcher : public librados::WatchCtx2 {
-  RGWRados *const store;
-  const rgw_raw_obj& obj;
-  rgw_rados_ref ref;
-  uint64_t handle{0};
-
-  using HandlerPtr = std::unique_ptr<TrimNotifyHandler>;
-  boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;
-
- public:
-  BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj,
-                    TrimCounters::Server *counters)
-    : store(store), obj(obj) {
-    handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
-    handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters));
-  }
-
-  ~BucketTrimWatcher() {
-    stop();
-  }
-
-  int start() {
-    int r = store->get_raw_obj_ref(obj, &ref);
-    if (r < 0) {
-      return r;
-    }
-
-    // register a watch on the realm's control object
-    r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
-    if (r == -ENOENT) {
-      constexpr bool exclusive = true;
-      r = ref.ioctx.create(ref.obj.oid, exclusive);
-      if (r == -EEXIST || r == 0) {
-        r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
-      }
-    }
-    if (r < 0) {
-      lderr(store->ctx()) << "Failed to watch " << ref.obj
-          << " with " << cpp_strerror(-r) << dendl;
-      ref.ioctx.close();
-      return r;
-    }
-
-    ldout(store->ctx(), 10) << "Watching " << ref.obj.oid << dendl;
-    return 0;
-  }
-
-  int restart() {
-    int r = ref.ioctx.unwatch2(handle);
-    if (r < 0) {
-      lderr(store->ctx()) << "Failed to unwatch on " << ref.obj
-          << " with " << cpp_strerror(-r) << dendl;
-    }
-    r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
-    if (r < 0) {
-      lderr(store->ctx()) << "Failed to restart watch on " << ref.obj
-          << " with " << cpp_strerror(-r) << dendl;
-      ref.ioctx.close();
-    }
-    return r;
-  }
-
-  void stop() {
-    if (handle) {
-      ref.ioctx.unwatch2(handle);
-      ref.ioctx.close();
-    }
-  }
-
-  /// respond to bucket trim notifications
-  void handle_notify(uint64_t notify_id, uint64_t cookie,
-                     uint64_t notifier_id, bufferlist& bl) override {
-    if (cookie != handle) {
-      return;
-    }
-    bufferlist reply;
-    try {
-      auto p = bl.cbegin();
-      TrimNotifyType type;
-      decode(type, p);
-
-      auto handler = handlers.find(type);
-      if (handler != handlers.end()) {
-        handler->second->handle(p, reply);
-      } else {
-        lderr(store->ctx()) << "no handler for notify type " << type << dendl;
-      }
-    } catch (const buffer::error& e) {
-      lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
-    }
-    ref.ioctx.notify_ack(ref.obj.oid, notify_id, cookie, reply);
-  }
-
-  /// reestablish the watch if it gets disconnected
-  void handle_error(uint64_t cookie, int err) override {
-    if (cookie != handle) {
-      return;
-    }
-    if (err == -ENOTCONN) {
-      ldout(store->ctx(), 4) << "Disconnected watch on " << ref.obj << dendl;
-      restart();
-    }
-  }
-};
-
-
-/// Interface to communicate with the trim manager about completed operations
-struct BucketTrimObserver {
-  virtual ~BucketTrimObserver() = default;
-
-  virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0;
-  virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0;
-};
-
-/// populate the status with the minimum stable marker of each shard
-template <typename Iter>
-int take_min_status(CephContext *cct, Iter first, Iter last,
-                    std::vector<std::string> *status)
-{
-  status->clear();
-  std::optional<uint64_t> num_shards;
-  for (auto peer = first; peer != last; ++peer) {
-    const size_t peer_shards = peer->size();
-    if (!num_shards) {
-      num_shards = peer_shards;
-      status->resize(*num_shards);
-    } else if (*num_shards != peer_shards) {
-      // all peers must agree on the number of shards
-      return -EINVAL;
-    }
-    auto m = status->begin();
-    for (auto& shard : *peer) {
-      auto& marker = *m++;
-      // only consider incremental sync markers
-      if (shard.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
-        continue;
-      }
-      // always take the first marker, or any later marker that's smaller
-      if (peer == first || marker > shard.inc_marker.position) {
-        marker = std::move(shard.inc_marker.position);
-      }
-    }
-  }
-  return 0;
-}
-
-/// trim each bilog shard to the given marker, while limiting the number of
-/// concurrent requests
-class BucketTrimShardCollectCR : public RGWShardCollectCR {
-  static constexpr int MAX_CONCURRENT_SHARDS = 16;
-  RGWRados *const store;
-  const RGWBucketInfo& bucket_info;
-  const std::vector<std::string>& markers; //< shard markers to trim
-  size_t i{0}; //< index of current shard marker
- public:
-  BucketTrimShardCollectCR(RGWRados *store, const RGWBucketInfo& bucket_info,
-                           const std::vector<std::string>& markers)
-    : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
-      store(store), bucket_info(bucket_info), markers(markers)
-  {}
-  bool spawn_next() override;
-};
-
-bool BucketTrimShardCollectCR::spawn_next()
-{
-  while (i < markers.size()) {
-    const auto& marker = markers[i];
-    const auto shard_id = i++;
-
-    // skip empty markers
-    if (!marker.empty()) {
-      ldout(cct, 10) << "trimming bilog shard " << shard_id
-          << " of " << bucket_info.bucket << " at marker " << marker << dendl;
-      spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id,
-                                    std::string{}, marker),
-            false);
-      return true;
-    }
-  }
-  return false;
-}
-
-/// trim the bilog of all of the given bucket instance's shards
-class BucketTrimInstanceCR : public RGWCoroutine {
-  RGWRados *const store;
-  RGWHTTPManager *const http;
-  BucketTrimObserver *const observer;
-  std::string bucket_instance;
-  const std::string& zone_id; //< my zone id
-  RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices
-  int child_ret = 0;
-
-  using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
-  std::vector<StatusShards> peer_status; //< sync status for each peer
-  std::vector<std::string> min_markers; //< min marker per shard
-
- public:
-  BucketTrimInstanceCR(RGWRados *store, RGWHTTPManager *http,
-                       BucketTrimObserver *observer,
-                       const std::string& bucket_instance)
-    : RGWCoroutine(store->ctx()), store(store),
-      http(http), observer(observer),
-      bucket_instance(bucket_instance),
-      zone_id(store->svc.zone->get_zone().id),
-      peer_status(store->svc.zone->get_zone_data_notify_to_map().size())
-  {}
-
-  int operate() override;
-};
-
-int BucketTrimInstanceCR::operate()
-{
-  reenter(this) {
-    ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;
-
-    // query peers for sync status
-    set_status("fetching sync status from peers");
-    yield {
-      // query data sync status from each sync peer
-      rgw_http_param_pair params[] = {
-        { "type", "bucket-index" },
-        { "status", nullptr },
-        { "bucket", bucket_instance.c_str() },
-        { "source-zone", zone_id.c_str() },
-        { nullptr, nullptr }
-      };
-
-      auto p = peer_status.begin();
-      for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) {
-        using StatusCR = RGWReadRESTResourceCR<StatusShards>;
-        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
-              false);
-        ++p;
-      }
-      // in parallel, read the local bucket instance info
-      spawn(new RGWGetBucketInstanceInfoCR(store->get_async_rados(), store,
-                                           bucket_instance, &bucket_info),
-            false);
-    }
-    // wait for a response from each peer. all must respond to attempt trim
-    while (num_spawned()) {
-      yield wait_for_child();
-      collect(&child_ret, nullptr);
-      if (child_ret < 0) {
-        drain_all();
-        return set_cr_error(child_ret);
-      }
-    }
-
-    // determine the minimum marker for each shard
-    retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
-                              &min_markers);
-    if (retcode < 0) {
-      ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl;
-      return set_cr_error(retcode);
-    }
-
-    // trim shards with a ShardCollectCR
-    ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket
-       << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
-    set_status("trimming bilog shards");
-    yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers));
-    // ENODATA just means there were no keys to trim
-    if (retcode == -ENODATA) {
-      retcode = 0;
-    }
-    if (retcode < 0) {
-      ldout(cct, 4) << "failed to trim bilog shards: "
-          << cpp_strerror(retcode) << dendl;
-      return set_cr_error(retcode);
-    }
-
-    observer->on_bucket_trimmed(std::move(bucket_instance));
-    return set_cr_done();
-  }
-  return 0;
-}
-
-/// trim each bucket instance while limiting the number of concurrent operations
-class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
-  RGWRados *const store;
-  RGWHTTPManager *const http;
-  BucketTrimObserver *const observer;
-  std::vector<std::string>::const_iterator bucket;
-  std::vector<std::string>::const_iterator end;
- public:
-  BucketTrimInstanceCollectCR(RGWRados *store, RGWHTTPManager *http,
-                              BucketTrimObserver *observer,
-                              const std::vector<std::string>& buckets,
-                              int max_concurrent)
-    : RGWShardCollectCR(store->ctx(), max_concurrent),
-      store(store), http(http), observer(observer),
-      bucket(buckets.begin()), end(buckets.end())
-  {}
-  bool spawn_next() override;
-};
-
-bool BucketTrimInstanceCollectCR::spawn_next()
-{
-  if (bucket == end) {
-    return false;
-  }
-  spawn(new BucketTrimInstanceCR(store, http, observer, *bucket), false);
-  ++bucket;
-  return true;
-}
-
-/// correlate the replies from each peer gateway into the given counter
-int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter)
-{
-  counter.clear();
-
-  try {
-    // decode notify responses
-    auto p = bl.cbegin();
-    std::map<std::pair<uint64_t, uint64_t>, bufferlist> replies;
-    std::set<std::pair<uint64_t, uint64_t>> timeouts;
-    decode(replies, p);
-    decode(timeouts, p);
-
-    for (auto& peer : replies) {
-      auto q = peer.second.cbegin();
-      TrimCounters::Response response;
-      decode(response, q);
-      for (const auto& b : response.bucket_counters) {
-        counter.insert(b.bucket, b.count);
-      }
-    }
-  } catch (const buffer::error& e) {
-    return -EIO;
-  }
-  return 0;
-}
-
-/// metadata callback has the signature bool(string&& key, string&& marker)
-using MetadataListCallback = std::function<bool(std::string&&, std::string&&)>;
-
-/// lists metadata keys, passing each to a callback until it returns false.
-/// on reaching the end, it will restart at the beginning and list up to the
-/// initial marker
-class AsyncMetadataList : public RGWAsyncRadosRequest {
-  CephContext *const cct;
-  RGWMetadataManager *const mgr;
-  const std::string section;
-  const std::string start_marker;
-  MetadataListCallback callback;
-
-  int _send_request() override;
- public:
-  AsyncMetadataList(CephContext *cct, RGWCoroutine *caller,
-                    RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr,
-                    const std::string& section, const std::string& start_marker,
-                    const MetadataListCallback& callback)
-    : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr),
-      section(section), start_marker(start_marker), callback(callback)
-  {}
-};
-
-int AsyncMetadataList::_send_request()
-{
-  void* handle = nullptr;
-  std::list<std::string> keys;
-  bool truncated{false};
-  std::string marker;
-
-  // start a listing at the given marker
-  int r = mgr->list_keys_init(section, start_marker, &handle);
-  if (r == -EINVAL) {
-    // restart with empty marker below
-  } else if (r < 0) {
-    ldout(cct, 10) << "failed to init metadata listing: "
-        << cpp_strerror(r) << dendl;
-    return r;
-  } else {
-    ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl;
-
-    // release the handle when scope exits
-    auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
-
-    do {
-      // get the next key and marker
-      r = mgr->list_keys_next(handle, 1, keys, &truncated);
-      if (r < 0) {
-        ldout(cct, 10) << "failed to list metadata: "
-            << cpp_strerror(r) << dendl;
-        return r;
-      }
-      marker = mgr->get_marker(handle);
-
-      if (!keys.empty()) {
-        ceph_assert(keys.size() == 1);
-        auto& key = keys.front();
-        if (!callback(std::move(key), std::move(marker))) {
-          return 0;
-        }
-      }
-    } while (truncated);
-
-    if (start_marker.empty()) {
-      // already listed all keys
-      return 0;
-    }
-  }
-
-  // restart the listing from the beginning (empty marker)
-  handle = nullptr;
-
-  r = mgr->list_keys_init(section, "", &handle);
-  if (r < 0) {
-    ldout(cct, 10) << "failed to restart metadata listing: "
-        << cpp_strerror(r) << dendl;
-    return r;
-  }
-  ldout(cct, 20) << "restarting metadata listing" << dendl;
-
-  // release the handle when scope exits
-  auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
-  do {
-    // get the next key and marker
-    r = mgr->list_keys_next(handle, 1, keys, &truncated);
-    if (r < 0) {
-      ldout(cct, 10) << "failed to list metadata: "
-          << cpp_strerror(r) << dendl;
-      return r;
-    }
-    marker = mgr->get_marker(handle);
-
-    if (!keys.empty()) {
-      ceph_assert(keys.size() == 1);
-      auto& key = keys.front();
-      // stop at original marker
-      if (marker >= start_marker) {
-        return 0;
-      }
-      if (!callback(std::move(key), std::move(marker))) {
-        return 0;
-      }
-    }
-  } while (truncated);
-
-  return 0;
-}
-
-/// coroutine wrapper for AsyncMetadataList
-class MetadataListCR : public RGWSimpleCoroutine {
-  RGWAsyncRadosProcessor *const async_rados;
-  RGWMetadataManager *const mgr;
-  const std::string& section;
-  const std::string& start_marker;
-  MetadataListCallback callback;
-  RGWAsyncRadosRequest *req{nullptr};
- public:
-  MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados,
-                 RGWMetadataManager *mgr, const std::string& section,
-                 const std::string& start_marker,
-                 const MetadataListCallback& callback)
-    : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr),
-      section(section), start_marker(start_marker), callback(callback)
-  {}
-  ~MetadataListCR() override {
-    request_cleanup();
-  }
-
-  int send_request() override {
-    req = new AsyncMetadataList(cct, this, stack->create_completion_notifier(),
-                                mgr, section, start_marker, callback);
-    async_rados->queue(req);
-    return 0;
-  }
-  int request_complete() override {
-    return req->get_ret_status();
-  }
-  void request_cleanup() override {
-    if (req) {
-      req->finish();
-      req = nullptr;
-    }
-  }
-};
-
-class BucketTrimCR : public RGWCoroutine {
-  RGWRados *const store;
-  RGWHTTPManager *const http;
-  const BucketTrimConfig& config;
-  BucketTrimObserver *const observer;
-  const rgw_raw_obj& obj;
-  ceph::mono_time start_time;
-  bufferlist notify_replies;
-  BucketChangeCounter counter;
-  std::vector<std::string> buckets; //< buckets selected for trim
-  BucketTrimStatus status;
-  RGWObjVersionTracker objv; //< version tracker for trim status object
-  std::string last_cold_marker; //< position for next trim marker
-
-  static const std::string section; //< metadata section for bucket instances
- public:
-  BucketTrimCR(RGWRados *store, RGWHTTPManager *http,
-               const BucketTrimConfig& config, BucketTrimObserver *observer,
-               const rgw_raw_obj& obj)
-    : RGWCoroutine(store->ctx()), store(store), http(http), config(config),
-      observer(observer), obj(obj), counter(config.counter_size)
-  {}
-
-  int operate() override;
-};
-
-const std::string BucketTrimCR::section{"bucket.instance"};
-
-int BucketTrimCR::operate()
-{
-  reenter(this) {
-    start_time = ceph::mono_clock::now();
-
-    if (config.buckets_per_interval) {
-      // query watch/notify for hot buckets
-      ldout(cct, 10) << "fetching active bucket counters" << dendl;
-      set_status("fetching active bucket counters");
-      yield {
-        // request the top bucket counters from each peer gateway
-        const TrimNotifyType type = NotifyTrimCounters;
-        TrimCounters::Request request{32};
-        bufferlist bl;
-        encode(type, bl);
-        encode(request, bl);
-        call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
-                                  &notify_replies));
-      }
-      if (retcode < 0) {
-        ldout(cct, 10) << "failed to fetch peer bucket counters" << dendl;
-        return set_cr_error(retcode);
-      }
-
-      // select the hottest buckets for trim
-      retcode = accumulate_peer_counters(notify_replies, counter);
-      if (retcode < 0) {
-        ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl;
-        return set_cr_error(retcode);
-      }
-      buckets.reserve(config.buckets_per_interval);
-
-      const int max_count = config.buckets_per_interval -
-                            config.min_cold_buckets_per_interval;
-      counter.get_highest(max_count,
-        [this] (const std::string& bucket, int count) {
-          buckets.push_back(bucket);
-        });
-    }
-
-    if (buckets.size() < config.buckets_per_interval) {
-      // read BucketTrimStatus for marker position
-      set_status("reading trim status");
-      using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
-      yield call(new ReadStatus(store->get_async_rados(), store->svc.sysobj, obj,
-                                &status, true, &objv));
-      if (retcode < 0) {
-        ldout(cct, 10) << "failed to read bilog trim status: "
-            << cpp_strerror(retcode) << dendl;
-        return set_cr_error(retcode);
-      }
-      if (status.marker == "MAX") {
-        status.marker.clear(); // restart at the beginning
-      }
-      ldout(cct, 10) << "listing cold buckets from marker="
-          << status.marker << dendl;
-
-      set_status("listing cold buckets for trim");
-      yield {
-        // capture a reference so 'this' remains valid in the callback
-        auto ref = boost::intrusive_ptr<RGWCoroutine>{this};
-        // list cold buckets to consider for trim
-        auto cb = [this, ref] (std::string&& bucket, std::string&& marker) {
-          // filter out keys that we trimmed recently
-          if (observer->trimmed_recently(bucket)) {
-            return true;
-          }
-          // filter out active buckets that we've already selected
-          auto i = std::find(buckets.begin(), buckets.end(), bucket);
-          if (i != buckets.end()) {
-            return true;
-          }
-          buckets.emplace_back(std::move(bucket));
-          // remember the last cold bucket spawned to update the status marker
-          last_cold_marker = std::move(marker);
-          // return true if there's room for more
-          return buckets.size() < config.buckets_per_interval;
-        };
-
-        call(new MetadataListCR(cct, store->get_async_rados(), store->meta_mgr,
-                                section, status.marker, cb));
-      }
-      if (retcode < 0) {
-        ldout(cct, 4) << "failed to list bucket instance metadata: "
-            << cpp_strerror(retcode) << dendl;
-        return set_cr_error(retcode);
-      }
-    }
-
-    // trim bucket instances with limited concurrency
-    set_status("trimming buckets");
-    ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
-    yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets,
-                                               config.concurrent_buckets));
-    // ignore errors from individual buckets
-
-    // write updated trim status
-    if (!last_cold_marker.empty() && status.marker != last_cold_marker) {
-      set_status("writing updated trim status");
-      status.marker = std::move(last_cold_marker);
-      ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl;
-      using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
-      yield call(new WriteStatus(store->get_async_rados(), store->svc.sysobj, obj,
-                                 status, &objv));
-      if (retcode < 0) {
-        ldout(cct, 4) << "failed to write updated trim status: "
-            << cpp_strerror(retcode) << dendl;
-        return set_cr_error(retcode);
-      }
-    }
-
-    // notify peers that trim completed
-    set_status("trim completed");
-    yield {
-      const TrimNotifyType type = NotifyTrimComplete;
-      TrimComplete::Request request;
-      bufferlist bl;
-      encode(type, bl);
-      encode(request, bl);
-      call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
-                                nullptr));
-    }
-    if (retcode < 0) {
-      ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
-      return set_cr_error(retcode);
-    }
-
-    ldout(cct, 4) << "bucket index log processing completed in "
-        << ceph::mono_clock::now() - start_time << dendl;
-    return set_cr_done();
-  }
-  return 0;
-}
-
-class BucketTrimPollCR : public RGWCoroutine {
-  RGWRados *const store;
-  RGWHTTPManager *const http;
-  const BucketTrimConfig& config;
-  BucketTrimObserver *const observer;
-  const rgw_raw_obj& obj;
-  const std::string name{"trim"}; //< lock name
-  const std::string cookie;
-
- public:
-  BucketTrimPollCR(RGWRados *store, RGWHTTPManager *http,
-                   const BucketTrimConfig& config,
-                   BucketTrimObserver *observer, const rgw_raw_obj& obj)
-    : RGWCoroutine(store->ctx()), store(store), http(http),
-      config(config), observer(observer), obj(obj),
-      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
-  {}
-
-  int operate() override;
-};
-
-int BucketTrimPollCR::operate()
-{
-  reenter(this) {
-    for (;;) {
-      set_status("sleeping");
-      wait(utime_t{static_cast<time_t>(config.trim_interval_sec), 0});
-
-      // prevent others from trimming for our entire wait interval
-      set_status("acquiring trim lock");
-      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
-                                          obj, name, cookie,
-                                          config.trim_interval_sec));
-      if (retcode < 0) {
-        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
-        continue;
-      }
-
-      set_status("trimming");
-      yield call(new BucketTrimCR(store, http, config, observer, obj));
-      if (retcode < 0) {
-        // on errors, unlock so other gateways can try
-        set_status("unlocking");
-        yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
-                                              obj, name, cookie));
-      }
-    }
-  }
-  return 0;
-}
-
-/// tracks a bounded list of events with timestamps. old events can be expired,
-/// and recent events can be searched by key. expiration depends on events being
-/// inserted in temporal order
-template <typename T, typename Clock = ceph::coarse_mono_clock>
-class RecentEventList {
- public:
-  using clock_type = Clock;
-  using time_point = typename clock_type::time_point;
-
-  RecentEventList(size_t max_size, const ceph::timespan& max_duration)
-    : events(max_size), max_duration(max_duration)
-  {}
-
-  /// insert an event at the given point in time. this time must be at least as
-  /// recent as the last inserted event
-  void insert(T&& value, const time_point& now) {
-    // ceph_assert(events.empty() || now >= events.back().time)
-    events.push_back(Event{std::move(value), now});
-  }
-
-  /// performs a linear search for an event matching the given key, whose type
-  /// U can be any that provides operator==(U, T)
-  template <typename U>
-  bool lookup(const U& key) const {
-    for (const auto& event : events) {
-      if (key == event.value) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /// remove events that are no longer recent compared to the given point in time
-  void expire_old(const time_point& now) {
-    const auto expired_before = now - max_duration;
-    while (!events.empty() && events.front().time < expired_before) {
-      events.pop_front();
-    }
-  }
-
- private:
-  struct Event {
-    T value;
-    time_point time;
-  };
-  boost::circular_buffer<Event> events;
-  const ceph::timespan max_duration;
-};
-
-namespace rgw {
-
-// read bucket trim configuration from ceph context
-void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config)
-{
-  const auto& conf = cct->_conf;
-
-  config.trim_interval_sec =
-      conf.get_val<int64_t>("rgw_sync_log_trim_interval");
-  config.counter_size = 512;
-  config.buckets_per_interval =
-      conf.get_val<int64_t>("rgw_sync_log_trim_max_buckets");
-  config.min_cold_buckets_per_interval =
-      conf.get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
-  config.concurrent_buckets =
-      conf.get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
-  config.notify_timeout_ms = 10000;
-  config.recent_size = 128;
-  config.recent_duration = std::chrono::hours(2);
-}
-
-class BucketTrimManager::Impl : public TrimCounters::Server,
-                                public BucketTrimObserver {
- public:
-  RGWRados *const store;
-  const BucketTrimConfig config;
-
-  const rgw_raw_obj status_obj;
-
-  /// count frequency of bucket instance entries in the data changes log
-  BucketChangeCounter counter;
-
-  using RecentlyTrimmedBucketList = RecentEventList<std::string>;
-  using clock_type = RecentlyTrimmedBucketList::clock_type;
-  /// track recently trimmed buckets to focus trim activity elsewhere
-  RecentlyTrimmedBucketList trimmed;
-
-  /// serve the bucket trim watch/notify api
-  BucketTrimWatcher watcher;
-
-  /// protect data shared between data sync, trim, and watch/notify threads
-  std::mutex mutex;
-
-  Impl(RGWRados *store, const BucketTrimConfig& config)
-    : store(store), config(config),
-      status_obj(store->svc.zone->get_zone_params().log_pool, BucketTrimStatus::oid),
-      counter(config.counter_size),
-      trimmed(config.recent_size, config.recent_duration),
-      watcher(store, status_obj, this)
-  {}
-
-  /// TrimCounters::Server interface for watch/notify api
-  void get_bucket_counters(int count, TrimCounters::Vector& buckets) {
-    buckets.reserve(count);
-    std::lock_guard<std::mutex> lock(mutex);
-    counter.get_highest(count, [&buckets] (const std::string& key, int count) {
-                          buckets.emplace_back(key, count);
-                        });
-    ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
-  }
-
-  void reset_bucket_counters() override {
-    ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
-    std::lock_guard<std::mutex> lock(mutex);
-    counter.clear();
-    trimmed.expire_old(clock_type::now());
-  }
-
-  /// BucketTrimObserver interface to remember successfully-trimmed buckets
-  void on_bucket_trimmed(std::string&& bucket_instance) override {
-    ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
-    std::lock_guard<std::mutex> lock(mutex);
-    trimmed.insert(std::move(bucket_instance), clock_type::now());
-  }
-
-  bool trimmed_recently(const boost::string_view& bucket_instance) override {
-    std::lock_guard<std::mutex> lock(mutex);
-    return trimmed.lookup(bucket_instance);
-  }
-};
-
-BucketTrimManager::BucketTrimManager(RGWRados *store,
-                                     const BucketTrimConfig& config)
-  : impl(new Impl(store, config))
-{
-}
-BucketTrimManager::~BucketTrimManager() = default;
-
-int BucketTrimManager::init()
-{
-  return impl->watcher.start();
-}
-
-void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket)
-{
-  std::lock_guard<std::mutex> lock(impl->mutex);
-  // filter recently trimmed bucket instances out of bucket change counter
-  if (impl->trimmed.lookup(bucket)) {
-    return;
-  }
-  impl->counter.insert(bucket.to_string());
-}
-
-RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http)
-{
-  return new BucketTrimPollCR(impl->store, http, impl->config,
-                              impl.get(), impl->status_obj);
-}
-
-RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http)
-{
-  // return the trim coroutine without any polling
-  return new BucketTrimCR(impl->store, http, impl->config,
-                          impl.get(), impl->status_obj);
-}
-
-} // namespace rgw
diff --git a/src/rgw/rgw_sync_log_trim.h b/src/rgw/rgw_sync_log_trim.h
deleted file mode 100644 (file)
index 13d1f63..0000000
+++ /dev/null
@@ -1,110 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2017 Red Hat, Inc
- *
- * Author: Casey Bodley <cbodley@redhat.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- */
-
-#ifndef RGW_SYNC_LOG_TRIM_H
-#define RGW_SYNC_LOG_TRIM_H
-
-#include <memory>
-#include <boost/utility/string_view.hpp>
-#include "include/encoding.h"
-#include "common/ceph_time.h"
-
-class CephContext;
-class RGWCoroutine;
-class RGWHTTPManager;
-class RGWRados;
-
-namespace rgw {
-
-/// Interface to inform the trim process about which buckets are most active
-struct BucketChangeObserver {
-  virtual ~BucketChangeObserver() = default;
-
-  virtual void on_bucket_changed(const boost::string_view& bucket_instance) = 0;
-};
-
-/// Configuration for BucketTrimManager
-struct BucketTrimConfig {
-  /// time interval in seconds between bucket trim attempts
-  uint32_t trim_interval_sec{0};
-  /// maximum number of buckets to track with BucketChangeObserver
-  size_t counter_size{0};
-  /// maximum number of buckets to process each trim interval
-  uint32_t buckets_per_interval{0};
-  /// minimum number of buckets to choose from the global bucket instance list
-  uint32_t min_cold_buckets_per_interval{0};
-  /// maximum number of buckets to process in parallel
-  uint32_t concurrent_buckets{0};
-  /// timeout in ms for bucket trim notify replies
-  uint64_t notify_timeout_ms{0};
-  /// maximum number of recently trimmed buckets to remember (should be small
-  /// enough for a linear search)
-  size_t recent_size{0};
-  /// maximum duration to consider a trim as 'recent' (should be some multiple
-  /// of the trim interval, at least)
-  ceph::timespan recent_duration{0};
-};
-
-/// fill out the BucketTrimConfig from the ceph context
-void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config);
-
-/// Determines the buckets on which to focus trim activity, using two sources of
-/// input: the frequency of entries read from the data changes log, and a global
-/// listing of the bucket.instance metadata. This allows us to trim active
-/// buckets quickly, while also ensuring that all buckets will eventually trim
-class BucketTrimManager : public BucketChangeObserver {
-  class Impl;
-  std::unique_ptr<Impl> impl;
- public:
-  BucketTrimManager(RGWRados *store, const BucketTrimConfig& config);
-  ~BucketTrimManager();
-
-  int init();
-
-  /// increment a counter for the given bucket instance
-  void on_bucket_changed(const boost::string_view& bucket_instance) override;
-
-  /// create a coroutine to run the bucket trim process every trim interval
-  RGWCoroutine* create_bucket_trim_cr(RGWHTTPManager *http);
-
-  /// create a coroutine to trim buckets directly via radosgw-admin
-  RGWCoroutine* create_admin_bucket_trim_cr(RGWHTTPManager *http);
-};
-
-/// provides persistent storage for the trim manager's current position in the
-/// list of bucket instance metadata
-struct BucketTrimStatus {
-  std::string marker; //< metadata key of current bucket instance
-
-  void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
-    encode(marker, bl);
-    ENCODE_FINISH(bl);
-  }
-  void decode(bufferlist::const_iterator& p) {
-    DECODE_START(1, p);
-    decode(marker, p);
-    DECODE_FINISH(p);
-  }
-
-  static const std::string oid;
-};
-
-} // namespace rgw
-
-WRITE_CLASS_ENCODER(rgw::BucketTrimStatus);
-
-#endif // RGW_SYNC_LOG_TRIM_H
diff --git a/src/rgw/rgw_trim_bilog.cc b/src/rgw/rgw_trim_bilog.cc
new file mode 100644 (file)
index 0000000..fd89278
--- /dev/null
@@ -0,0 +1,1095 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * Author: Casey Bodley <cbodley@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#include <mutex>
+#include <boost/circular_buffer.hpp>
+#include <boost/container/flat_map.hpp>
+
+#include "include/scope_guard.h"
+#include "common/bounded_key_counter.h"
+#include "common/errno.h"
+#include "rgw_trim_bilog.h"
+#include "rgw_cr_rados.h"
+#include "rgw_cr_rest.h"
+#include "rgw_data_sync.h"
+#include "rgw_metadata.h"
+#include "rgw_rados.h"
+#include "rgw_zone.h"
+#include "rgw_sync.h"
+
+#include "services/svc_zone.h"
+
+#include <boost/asio/yield.hpp>
+#include "include/ceph_assert.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+#undef dout_prefix
+#define dout_prefix (*_dout << "trim: ")
+
+using rgw::BucketTrimConfig;
+using BucketChangeCounter = BoundedKeyCounter<std::string, int>;
+
+const std::string rgw::BucketTrimStatus::oid = "bilog.trim";
+using rgw::BucketTrimStatus;
+
+
+// watch/notify api for gateways to coordinate about which buckets to trim
+enum TrimNotifyType {
+  NotifyTrimCounters = 0,
+  NotifyTrimComplete,
+};
+WRITE_RAW_ENCODER(TrimNotifyType);
+
+struct TrimNotifyHandler {
+  virtual ~TrimNotifyHandler() = default;
+
+  virtual void handle(bufferlist::const_iterator& input, bufferlist& output) = 0;
+};
+
+/// api to share the bucket trim counters between gateways in the same zone.
+/// each gateway will process different datalog shards, so the gateway that runs
+/// the trim process needs to accumulate their counters
+struct TrimCounters {
+  /// counter for a single bucket
+  struct BucketCounter {
+    std::string bucket; //< bucket instance metadata key
+    int count{0};
+
+    BucketCounter() = default;
+    BucketCounter(const std::string& bucket, int count)
+      : bucket(bucket), count(count) {}
+
+    void encode(bufferlist& bl) const;
+    void decode(bufferlist::const_iterator& p);
+  };
+  using Vector = std::vector<BucketCounter>;
+
+  /// request bucket trim counters from peer gateways
+  struct Request {
+    uint16_t max_buckets; //< maximum number of bucket counters to return
+
+    void encode(bufferlist& bl) const;
+    void decode(bufferlist::const_iterator& p);
+  };
+
+  /// return the current bucket trim counters
+  struct Response {
+    Vector bucket_counters;
+
+    void encode(bufferlist& bl) const;
+    void decode(bufferlist::const_iterator& p);
+  };
+
+  /// server interface to query the hottest buckets
+  struct Server {
+    virtual ~Server() = default;
+
+    virtual void get_bucket_counters(int count, Vector& counters) = 0;
+    virtual void reset_bucket_counters() = 0;
+  };
+
+  /// notify handler
+  class Handler : public TrimNotifyHandler {
+    Server *const server;
+   public:
+    explicit Handler(Server *server) : server(server) {}
+
+    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
+  };
+};
+std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs)
+{
+  return out << rhs.bucket << ":" << rhs.count;
+}
+
+void TrimCounters::BucketCounter::encode(bufferlist& bl) const
+{
+  using ceph::encode;
+  // no versioning to save space
+  encode(bucket, bl);
+  encode(count, bl);
+}
+void TrimCounters::BucketCounter::decode(bufferlist::const_iterator& p)
+{
+  using ceph::decode;
+  decode(bucket, p);
+  decode(count, p);
+}
+WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);
+
+void TrimCounters::Request::encode(bufferlist& bl) const
+{
+  ENCODE_START(1, 1, bl);
+  encode(max_buckets, bl);
+  ENCODE_FINISH(bl);
+}
+void TrimCounters::Request::decode(bufferlist::const_iterator& p)
+{
+  DECODE_START(1, p);
+  decode(max_buckets, p);
+  DECODE_FINISH(p);
+}
+WRITE_CLASS_ENCODER(TrimCounters::Request);
+
+void TrimCounters::Response::encode(bufferlist& bl) const
+{
+  ENCODE_START(1, 1, bl);
+  encode(bucket_counters, bl);
+  ENCODE_FINISH(bl);
+}
+void TrimCounters::Response::decode(bufferlist::const_iterator& p)
+{
+  DECODE_START(1, p);
+  decode(bucket_counters, p);
+  DECODE_FINISH(p);
+}
+WRITE_CLASS_ENCODER(TrimCounters::Response);
+
+void TrimCounters::Handler::handle(bufferlist::const_iterator& input,
+                                   bufferlist& output)
+{
+  Request request;
+  decode(request, input);
+  auto count = std::min<uint16_t>(request.max_buckets, 128);
+
+  Response response;
+  server->get_bucket_counters(count, response.bucket_counters);
+  encode(response, output);
+}
+
+/// api to notify peer gateways that trim has completed and their bucket change
+/// counters can be reset
+struct TrimComplete {
+  struct Request {
+    void encode(bufferlist& bl) const;
+    void decode(bufferlist::const_iterator& p);
+  };
+  struct Response {
+    void encode(bufferlist& bl) const;
+    void decode(bufferlist::const_iterator& p);
+  };
+
+  /// server interface to reset bucket counters
+  using Server = TrimCounters::Server;
+
+  /// notify handler
+  class Handler : public TrimNotifyHandler {
+    Server *const server;
+   public:
+    explicit Handler(Server *server) : server(server) {}
+
+    void handle(bufferlist::const_iterator& input, bufferlist& output) override;
+  };
+};
+
+void TrimComplete::Request::encode(bufferlist& bl) const
+{
+  ENCODE_START(1, 1, bl);
+  ENCODE_FINISH(bl);
+}
+void TrimComplete::Request::decode(bufferlist::const_iterator& p)
+{
+  DECODE_START(1, p);
+  DECODE_FINISH(p);
+}
+WRITE_CLASS_ENCODER(TrimComplete::Request);
+
+void TrimComplete::Response::encode(bufferlist& bl) const
+{
+  ENCODE_START(1, 1, bl);
+  ENCODE_FINISH(bl);
+}
+void TrimComplete::Response::decode(bufferlist::const_iterator& p)
+{
+  DECODE_START(1, p);
+  DECODE_FINISH(p);
+}
+WRITE_CLASS_ENCODER(TrimComplete::Response);
+
+void TrimComplete::Handler::handle(bufferlist::const_iterator& input,
+                                   bufferlist& output)
+{
+  Request request;
+  decode(request, input);
+
+  server->reset_bucket_counters();
+
+  Response response;
+  encode(response, output);
+}
+
+
+/// rados watcher for bucket trim notifications
+class BucketTrimWatcher : public librados::WatchCtx2 {
+  RGWRados *const store;
+  const rgw_raw_obj& obj;
+  rgw_rados_ref ref;
+  uint64_t handle{0};
+
+  using HandlerPtr = std::unique_ptr<TrimNotifyHandler>;
+  boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;
+
+ public:
+  BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj,
+                    TrimCounters::Server *counters)
+    : store(store), obj(obj) {
+    handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
+    handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters));
+  }
+
+  ~BucketTrimWatcher() {
+    stop();
+  }
+
+  int start() {
+    int r = store->get_raw_obj_ref(obj, &ref);
+    if (r < 0) {
+      return r;
+    }
+
+    // register a watch on the realm's control object
+    r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
+    if (r == -ENOENT) {
+      constexpr bool exclusive = true;
+      r = ref.ioctx.create(ref.obj.oid, exclusive);
+      if (r == -EEXIST || r == 0) {
+        r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
+      }
+    }
+    if (r < 0) {
+      lderr(store->ctx()) << "Failed to watch " << ref.obj
+          << " with " << cpp_strerror(-r) << dendl;
+      ref.ioctx.close();
+      return r;
+    }
+
+    ldout(store->ctx(), 10) << "Watching " << ref.obj.oid << dendl;
+    return 0;
+  }
+
+  int restart() {
+    int r = ref.ioctx.unwatch2(handle);
+    if (r < 0) {
+      lderr(store->ctx()) << "Failed to unwatch on " << ref.obj
+          << " with " << cpp_strerror(-r) << dendl;
+    }
+    r = ref.ioctx.watch2(ref.obj.oid, &handle, this);
+    if (r < 0) {
+      lderr(store->ctx()) << "Failed to restart watch on " << ref.obj
+          << " with " << cpp_strerror(-r) << dendl;
+      ref.ioctx.close();
+    }
+    return r;
+  }
+
+  void stop() {
+    if (handle) {
+      ref.ioctx.unwatch2(handle);
+      ref.ioctx.close();
+    }
+  }
+
+  /// respond to bucket trim notifications
+  void handle_notify(uint64_t notify_id, uint64_t cookie,
+                     uint64_t notifier_id, bufferlist& bl) override {
+    if (cookie != handle) {
+      return;
+    }
+    bufferlist reply;
+    try {
+      auto p = bl.cbegin();
+      TrimNotifyType type;
+      decode(type, p);
+
+      auto handler = handlers.find(type);
+      if (handler != handlers.end()) {
+        handler->second->handle(p, reply);
+      } else {
+        lderr(store->ctx()) << "no handler for notify type " << type << dendl;
+      }
+    } catch (const buffer::error& e) {
+      lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
+    }
+    ref.ioctx.notify_ack(ref.obj.oid, notify_id, cookie, reply);
+  }
+
+  /// reestablish the watch if it gets disconnected
+  void handle_error(uint64_t cookie, int err) override {
+    if (cookie != handle) {
+      return;
+    }
+    if (err == -ENOTCONN) {
+      ldout(store->ctx(), 4) << "Disconnected watch on " << ref.obj << dendl;
+      restart();
+    }
+  }
+};
+
+
+/// Interface to communicate with the trim manager about completed operations
+struct BucketTrimObserver {
+  virtual ~BucketTrimObserver() = default;
+
+  virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0;
+  virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0;
+};
+
+/// populate the status with the minimum stable marker of each shard
+template <typename Iter>
+int take_min_status(CephContext *cct, Iter first, Iter last,
+                    std::vector<std::string> *status)
+{
+  status->clear();
+  std::optional<uint64_t> num_shards;
+  for (auto peer = first; peer != last; ++peer) {
+    const size_t peer_shards = peer->size();
+    if (!num_shards) {
+      num_shards = peer_shards;
+      status->resize(*num_shards);
+    } else if (*num_shards != peer_shards) {
+      // all peers must agree on the number of shards
+      return -EINVAL;
+    }
+    auto m = status->begin();
+    for (auto& shard : *peer) {
+      auto& marker = *m++;
+      // only consider incremental sync markers
+      if (shard.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
+        continue;
+      }
+      // always take the first marker, or any later marker that's smaller
+      if (peer == first || marker > shard.inc_marker.position) {
+        marker = std::move(shard.inc_marker.position);
+      }
+    }
+  }
+  return 0;
+}
+
+/// trim each bilog shard to the given marker, while limiting the number of
+/// concurrent requests
+class BucketTrimShardCollectCR : public RGWShardCollectCR {
+  static constexpr int MAX_CONCURRENT_SHARDS = 16;
+  RGWRados *const store;
+  const RGWBucketInfo& bucket_info;
+  const std::vector<std::string>& markers; //< shard markers to trim
+  size_t i{0}; //< index of current shard marker
+ public:
+  BucketTrimShardCollectCR(RGWRados *store, const RGWBucketInfo& bucket_info,
+                           const std::vector<std::string>& markers)
+    : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
+      store(store), bucket_info(bucket_info), markers(markers)
+  {}
+  bool spawn_next() override;
+};
+
+bool BucketTrimShardCollectCR::spawn_next()
+{
+  while (i < markers.size()) {
+    const auto& marker = markers[i];
+    const auto shard_id = i++;
+
+    // skip empty markers
+    if (!marker.empty()) {
+      ldout(cct, 10) << "trimming bilog shard " << shard_id
+          << " of " << bucket_info.bucket << " at marker " << marker << dendl;
+      spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id,
+                                    std::string{}, marker),
+            false);
+      return true;
+    }
+  }
+  return false;
+}
+
+/// trim the bilog of all of the given bucket instance's shards
+class BucketTrimInstanceCR : public RGWCoroutine {
+  RGWRados *const store;
+  RGWHTTPManager *const http;
+  BucketTrimObserver *const observer;
+  std::string bucket_instance;
+  const std::string& zone_id; //< my zone id
+  RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices
+  int child_ret = 0;
+
+  using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
+  std::vector<StatusShards> peer_status; //< sync status for each peer
+  std::vector<std::string> min_markers; //< min marker per shard
+
+ public:
+  BucketTrimInstanceCR(RGWRados *store, RGWHTTPManager *http,
+                       BucketTrimObserver *observer,
+                       const std::string& bucket_instance)
+    : RGWCoroutine(store->ctx()), store(store),
+      http(http), observer(observer),
+      bucket_instance(bucket_instance),
+      zone_id(store->svc.zone->get_zone().id),
+      peer_status(store->svc.zone->get_zone_data_notify_to_map().size())
+  {}
+
+  int operate() override;
+};
+
+int BucketTrimInstanceCR::operate()
+{
+  reenter(this) {
+    ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;
+
+    // query peers for sync status
+    set_status("fetching sync status from peers");
+    yield {
+      // query data sync status from each sync peer
+      rgw_http_param_pair params[] = {
+        { "type", "bucket-index" },
+        { "status", nullptr },
+        { "bucket", bucket_instance.c_str() },
+        { "source-zone", zone_id.c_str() },
+        { nullptr, nullptr }
+      };
+
+      auto p = peer_status.begin();
+      for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) {
+        using StatusCR = RGWReadRESTResourceCR<StatusShards>;
+        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
+              false);
+        ++p;
+      }
+      // in parallel, read the local bucket instance info
+      spawn(new RGWGetBucketInstanceInfoCR(store->get_async_rados(), store,
+                                           bucket_instance, &bucket_info),
+            false);
+    }
+    // wait for a response from each peer. all must respond to attempt trim
+    while (num_spawned()) {
+      yield wait_for_child();
+      collect(&child_ret, nullptr);
+      if (child_ret < 0) {
+        drain_all();
+        return set_cr_error(child_ret);
+      }
+    }
+
+    // determine the minimum marker for each shard
+    retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
+                              &min_markers);
+    if (retcode < 0) {
+      ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl;
+      return set_cr_error(retcode);
+    }
+
+    // trim shards with a ShardCollectCR
+    ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket
+       << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
+    set_status("trimming bilog shards");
+    yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers));
+    // ENODATA just means there were no keys to trim
+    if (retcode == -ENODATA) {
+      retcode = 0;
+    }
+    if (retcode < 0) {
+      ldout(cct, 4) << "failed to trim bilog shards: "
+          << cpp_strerror(retcode) << dendl;
+      return set_cr_error(retcode);
+    }
+
+    observer->on_bucket_trimmed(std::move(bucket_instance));
+    return set_cr_done();
+  }
+  return 0;
+}
+
+/// trim each bucket instance while limiting the number of concurrent operations
+class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
+  RGWRados *const store;
+  RGWHTTPManager *const http;
+  BucketTrimObserver *const observer;
+  std::vector<std::string>::const_iterator bucket;
+  std::vector<std::string>::const_iterator end;
+ public:
+  BucketTrimInstanceCollectCR(RGWRados *store, RGWHTTPManager *http,
+                              BucketTrimObserver *observer,
+                              const std::vector<std::string>& buckets,
+                              int max_concurrent)
+    : RGWShardCollectCR(store->ctx(), max_concurrent),
+      store(store), http(http), observer(observer),
+      bucket(buckets.begin()), end(buckets.end())
+  {}
+  bool spawn_next() override;
+};
+
+bool BucketTrimInstanceCollectCR::spawn_next()
+{
+  if (bucket == end) {
+    return false;
+  }
+  spawn(new BucketTrimInstanceCR(store, http, observer, *bucket), false);
+  ++bucket;
+  return true;
+}
+
+/// correlate the replies from each peer gateway into the given counter
+int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter)
+{
+  counter.clear();
+
+  try {
+    // decode notify responses
+    auto p = bl.cbegin();
+    std::map<std::pair<uint64_t, uint64_t>, bufferlist> replies;
+    std::set<std::pair<uint64_t, uint64_t>> timeouts;
+    decode(replies, p);
+    decode(timeouts, p);
+
+    for (auto& peer : replies) {
+      auto q = peer.second.cbegin();
+      TrimCounters::Response response;
+      decode(response, q);
+      for (const auto& b : response.bucket_counters) {
+        counter.insert(b.bucket, b.count);
+      }
+    }
+  } catch (const buffer::error& e) {
+    return -EIO;
+  }
+  return 0;
+}
+
+/// metadata callback has the signature bool(string&& key, string&& marker)
+using MetadataListCallback = std::function<bool(std::string&&, std::string&&)>;
+
+/// lists metadata keys, passing each to a callback until it returns false.
+/// on reaching the end, it will restart at the beginning and list up to the
+/// initial marker
+class AsyncMetadataList : public RGWAsyncRadosRequest {
+  CephContext *const cct;
+  RGWMetadataManager *const mgr;
+  const std::string section;
+  const std::string start_marker;
+  MetadataListCallback callback;
+
+  int _send_request() override;
+ public:
+  AsyncMetadataList(CephContext *cct, RGWCoroutine *caller,
+                    RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr,
+                    const std::string& section, const std::string& start_marker,
+                    const MetadataListCallback& callback)
+    : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr),
+      section(section), start_marker(start_marker), callback(callback)
+  {}
+};
+
+int AsyncMetadataList::_send_request()
+{
+  void* handle = nullptr;
+  std::list<std::string> keys;
+  bool truncated{false};
+  std::string marker;
+
+  // start a listing at the given marker
+  int r = mgr->list_keys_init(section, start_marker, &handle);
+  if (r == -EINVAL) {
+    // restart with empty marker below
+  } else if (r < 0) {
+    ldout(cct, 10) << "failed to init metadata listing: "
+        << cpp_strerror(r) << dendl;
+    return r;
+  } else {
+    ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl;
+
+    // release the handle when scope exits
+    auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
+
+    do {
+      // get the next key and marker
+      r = mgr->list_keys_next(handle, 1, keys, &truncated);
+      if (r < 0) {
+        ldout(cct, 10) << "failed to list metadata: "
+            << cpp_strerror(r) << dendl;
+        return r;
+      }
+      marker = mgr->get_marker(handle);
+
+      if (!keys.empty()) {
+        ceph_assert(keys.size() == 1);
+        auto& key = keys.front();
+        if (!callback(std::move(key), std::move(marker))) {
+          return 0;
+        }
+      }
+    } while (truncated);
+
+    if (start_marker.empty()) {
+      // already listed all keys
+      return 0;
+    }
+  }
+
+  // restart the listing from the beginning (empty marker)
+  handle = nullptr;
+
+  r = mgr->list_keys_init(section, "", &handle);
+  if (r < 0) {
+    ldout(cct, 10) << "failed to restart metadata listing: "
+        << cpp_strerror(r) << dendl;
+    return r;
+  }
+  ldout(cct, 20) << "restarting metadata listing" << dendl;
+
+  // release the handle when scope exits
+  auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); });
+  do {
+    // get the next key and marker
+    r = mgr->list_keys_next(handle, 1, keys, &truncated);
+    if (r < 0) {
+      ldout(cct, 10) << "failed to list metadata: "
+          << cpp_strerror(r) << dendl;
+      return r;
+    }
+    marker = mgr->get_marker(handle);
+
+    if (!keys.empty()) {
+      ceph_assert(keys.size() == 1);
+      auto& key = keys.front();
+      // stop at original marker
+      if (marker >= start_marker) {
+        return 0;
+      }
+      if (!callback(std::move(key), std::move(marker))) {
+        return 0;
+      }
+    }
+  } while (truncated);
+
+  return 0;
+}
+
+/// coroutine wrapper for AsyncMetadataList
+class MetadataListCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *const async_rados;
+  RGWMetadataManager *const mgr;
+  const std::string& section;
+  const std::string& start_marker;
+  MetadataListCallback callback;
+  RGWAsyncRadosRequest *req{nullptr};
+ public:
+  MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados,
+                 RGWMetadataManager *mgr, const std::string& section,
+                 const std::string& start_marker,
+                 const MetadataListCallback& callback)
+    : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr),
+      section(section), start_marker(start_marker), callback(callback)
+  {}
+  ~MetadataListCR() override {
+    request_cleanup();
+  }
+
+  int send_request() override {
+    req = new AsyncMetadataList(cct, this, stack->create_completion_notifier(),
+                                mgr, section, start_marker, callback);
+    async_rados->queue(req);
+    return 0;
+  }
+  int request_complete() override {
+    return req->get_ret_status();
+  }
+  void request_cleanup() override {
+    if (req) {
+      req->finish();
+      req = nullptr;
+    }
+  }
+};
+
+class BucketTrimCR : public RGWCoroutine {
+  RGWRados *const store;
+  RGWHTTPManager *const http;
+  const BucketTrimConfig& config;
+  BucketTrimObserver *const observer;
+  const rgw_raw_obj& obj;
+  ceph::mono_time start_time;
+  bufferlist notify_replies;
+  BucketChangeCounter counter;
+  std::vector<std::string> buckets; //< buckets selected for trim
+  BucketTrimStatus status;
+  RGWObjVersionTracker objv; //< version tracker for trim status object
+  std::string last_cold_marker; //< position for next trim marker
+
+  static const std::string section; //< metadata section for bucket instances
+ public:
+  BucketTrimCR(RGWRados *store, RGWHTTPManager *http,
+               const BucketTrimConfig& config, BucketTrimObserver *observer,
+               const rgw_raw_obj& obj)
+    : RGWCoroutine(store->ctx()), store(store), http(http), config(config),
+      observer(observer), obj(obj), counter(config.counter_size)
+  {}
+
+  int operate() override;
+};
+
+const std::string BucketTrimCR::section{"bucket.instance"};
+
+int BucketTrimCR::operate()
+{
+  reenter(this) {
+    start_time = ceph::mono_clock::now();
+
+    if (config.buckets_per_interval) {
+      // query watch/notify for hot buckets
+      ldout(cct, 10) << "fetching active bucket counters" << dendl;
+      set_status("fetching active bucket counters");
+      yield {
+        // request the top bucket counters from each peer gateway
+        const TrimNotifyType type = NotifyTrimCounters;
+        TrimCounters::Request request{32};
+        bufferlist bl;
+        encode(type, bl);
+        encode(request, bl);
+        call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
+                                  &notify_replies));
+      }
+      if (retcode < 0) {
+        ldout(cct, 10) << "failed to fetch peer bucket counters" << dendl;
+        return set_cr_error(retcode);
+      }
+
+      // select the hottest buckets for trim
+      retcode = accumulate_peer_counters(notify_replies, counter);
+      if (retcode < 0) {
+        ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl;
+        return set_cr_error(retcode);
+      }
+      buckets.reserve(config.buckets_per_interval);
+
+      const int max_count = config.buckets_per_interval -
+                            config.min_cold_buckets_per_interval;
+      counter.get_highest(max_count,
+        [this] (const std::string& bucket, int count) {
+          buckets.push_back(bucket);
+        });
+    }
+
+    if (buckets.size() < config.buckets_per_interval) {
+      // read BucketTrimStatus for marker position
+      set_status("reading trim status");
+      using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
+      yield call(new ReadStatus(store->get_async_rados(), store->svc.sysobj, obj,
+                                &status, true, &objv));
+      if (retcode < 0) {
+        ldout(cct, 10) << "failed to read bilog trim status: "
+            << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+      if (status.marker == "MAX") {
+        status.marker.clear(); // restart at the beginning
+      }
+      ldout(cct, 10) << "listing cold buckets from marker="
+          << status.marker << dendl;
+
+      set_status("listing cold buckets for trim");
+      yield {
+        // capture a reference so 'this' remains valid in the callback
+        auto ref = boost::intrusive_ptr<RGWCoroutine>{this};
+        // list cold buckets to consider for trim
+        auto cb = [this, ref] (std::string&& bucket, std::string&& marker) {
+          // filter out keys that we trimmed recently
+          if (observer->trimmed_recently(bucket)) {
+            return true;
+          }
+          // filter out active buckets that we've already selected
+          auto i = std::find(buckets.begin(), buckets.end(), bucket);
+          if (i != buckets.end()) {
+            return true;
+          }
+          buckets.emplace_back(std::move(bucket));
+          // remember the last cold bucket spawned to update the status marker
+          last_cold_marker = std::move(marker);
+          // return true if there's room for more
+          return buckets.size() < config.buckets_per_interval;
+        };
+
+        call(new MetadataListCR(cct, store->get_async_rados(), store->meta_mgr,
+                                section, status.marker, cb));
+      }
+      if (retcode < 0) {
+        ldout(cct, 4) << "failed to list bucket instance metadata: "
+            << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+    }
+
+    // trim bucket instances with limited concurrency
+    set_status("trimming buckets");
+    ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
+    yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets,
+                                               config.concurrent_buckets));
+    // ignore errors from individual buckets
+
+    // write updated trim status
+    if (!last_cold_marker.empty() && status.marker != last_cold_marker) {
+      set_status("writing updated trim status");
+      status.marker = std::move(last_cold_marker);
+      ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl;
+      using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
+      yield call(new WriteStatus(store->get_async_rados(), store->svc.sysobj, obj,
+                                 status, &objv));
+      if (retcode < 0) {
+        ldout(cct, 4) << "failed to write updated trim status: "
+            << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+    }
+
+    // notify peers that trim completed
+    set_status("trim completed");
+    yield {
+      const TrimNotifyType type = NotifyTrimComplete;
+      TrimComplete::Request request;
+      bufferlist bl;
+      encode(type, bl);
+      encode(request, bl);
+      call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
+                                nullptr));
+    }
+    if (retcode < 0) {
+      ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
+      return set_cr_error(retcode);
+    }
+
+    ldout(cct, 4) << "bucket index log processing completed in "
+        << ceph::mono_clock::now() - start_time << dendl;
+    return set_cr_done();
+  }
+  return 0;
+}
+
+class BucketTrimPollCR : public RGWCoroutine {
+  RGWRados *const store;
+  RGWHTTPManager *const http;
+  const BucketTrimConfig& config;
+  BucketTrimObserver *const observer;
+  const rgw_raw_obj& obj;
+  const std::string name{"trim"}; //< lock name
+  const std::string cookie;
+
+ public:
+  BucketTrimPollCR(RGWRados *store, RGWHTTPManager *http,
+                   const BucketTrimConfig& config,
+                   BucketTrimObserver *observer, const rgw_raw_obj& obj)
+    : RGWCoroutine(store->ctx()), store(store), http(http),
+      config(config), observer(observer), obj(obj),
+      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
+  {}
+
+  int operate() override;
+};
+
+int BucketTrimPollCR::operate()
+{
+  reenter(this) {
+    for (;;) {
+      set_status("sleeping");
+      wait(utime_t{static_cast<time_t>(config.trim_interval_sec), 0});
+
+      // prevent others from trimming for our entire wait interval
+      set_status("acquiring trim lock");
+      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
+                                          obj, name, cookie,
+                                          config.trim_interval_sec));
+      if (retcode < 0) {
+        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
+        continue;
+      }
+
+      set_status("trimming");
+      yield call(new BucketTrimCR(store, http, config, observer, obj));
+      if (retcode < 0) {
+        // on errors, unlock so other gateways can try
+        set_status("unlocking");
+        yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
+                                              obj, name, cookie));
+      }
+    }
+  }
+  return 0;
+}
+
+/// tracks a bounded list of events with timestamps. old events can be expired,
+/// and recent events can be searched by key. expiration depends on events being
+/// inserted in temporal order
+template <typename T, typename Clock = ceph::coarse_mono_clock>
+class RecentEventList {
+ public:
+  using clock_type = Clock;
+  using time_point = typename clock_type::time_point;
+
+  RecentEventList(size_t max_size, const ceph::timespan& max_duration)
+    : events(max_size), max_duration(max_duration)
+  {}
+
+  /// insert an event at the given point in time. this time must be at least as
+  /// recent as the last inserted event
+  void insert(T&& value, const time_point& now) {
+    // ceph_assert(events.empty() || now >= events.back().time)
+    events.push_back(Event{std::move(value), now});
+  }
+
+  /// performs a linear search for an event matching the given key, whose type
+  /// U can be any that provides operator==(U, T)
+  template <typename U>
+  bool lookup(const U& key) const {
+    for (const auto& event : events) {
+      if (key == event.value) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /// remove events that are no longer recent compared to the given point in time
+  void expire_old(const time_point& now) {
+    const auto expired_before = now - max_duration;
+    while (!events.empty() && events.front().time < expired_before) {
+      events.pop_front();
+    }
+  }
+
+ private:
+  struct Event {
+    T value;
+    time_point time;
+  };
+  boost::circular_buffer<Event> events;
+  const ceph::timespan max_duration;
+};
+
+namespace rgw {
+
+// read bucket trim configuration from ceph context
+void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config)
+{
+  const auto& conf = cct->_conf;
+
+  config.trim_interval_sec =
+      conf.get_val<int64_t>("rgw_sync_log_trim_interval");
+  config.counter_size = 512;
+  config.buckets_per_interval =
+      conf.get_val<int64_t>("rgw_sync_log_trim_max_buckets");
+  config.min_cold_buckets_per_interval =
+      conf.get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
+  config.concurrent_buckets =
+      conf.get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
+  config.notify_timeout_ms = 10000;
+  config.recent_size = 128;
+  config.recent_duration = std::chrono::hours(2);
+}
+
+class BucketTrimManager::Impl : public TrimCounters::Server,
+                                public BucketTrimObserver {
+ public:
+  RGWRados *const store;
+  const BucketTrimConfig config;
+
+  const rgw_raw_obj status_obj;
+
+  /// count frequency of bucket instance entries in the data changes log
+  BucketChangeCounter counter;
+
+  using RecentlyTrimmedBucketList = RecentEventList<std::string>;
+  using clock_type = RecentlyTrimmedBucketList::clock_type;
+  /// track recently trimmed buckets to focus trim activity elsewhere
+  RecentlyTrimmedBucketList trimmed;
+
+  /// serve the bucket trim watch/notify api
+  BucketTrimWatcher watcher;
+
+  /// protect data shared between data sync, trim, and watch/notify threads
+  std::mutex mutex;
+
+  Impl(RGWRados *store, const BucketTrimConfig& config)
+    : store(store), config(config),
+      status_obj(store->svc.zone->get_zone_params().log_pool, BucketTrimStatus::oid),
+      counter(config.counter_size),
+      trimmed(config.recent_size, config.recent_duration),
+      watcher(store, status_obj, this)
+  {}
+
+  /// TrimCounters::Server interface for watch/notify api
+  void get_bucket_counters(int count, TrimCounters::Vector& buckets) {
+    buckets.reserve(count);
+    std::lock_guard<std::mutex> lock(mutex);
+    counter.get_highest(count, [&buckets] (const std::string& key, int count) {
+                          buckets.emplace_back(key, count);
+                        });
+    ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
+  }
+
+  void reset_bucket_counters() override {
+    ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
+    std::lock_guard<std::mutex> lock(mutex);
+    counter.clear();
+    trimmed.expire_old(clock_type::now());
+  }
+
+  /// BucketTrimObserver interface to remember successfully-trimmed buckets
+  void on_bucket_trimmed(std::string&& bucket_instance) override {
+    ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
+    std::lock_guard<std::mutex> lock(mutex);
+    trimmed.insert(std::move(bucket_instance), clock_type::now());
+  }
+
+  bool trimmed_recently(const boost::string_view& bucket_instance) override {
+    std::lock_guard<std::mutex> lock(mutex);
+    return trimmed.lookup(bucket_instance);
+  }
+};
+
+BucketTrimManager::BucketTrimManager(RGWRados *store,
+                                     const BucketTrimConfig& config)
+  : impl(new Impl(store, config))
+{
+}
+BucketTrimManager::~BucketTrimManager() = default;
+
+int BucketTrimManager::init()
+{
+  return impl->watcher.start();
+}
+
+void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket)
+{
+  std::lock_guard<std::mutex> lock(impl->mutex);
+  // filter recently trimmed bucket instances out of bucket change counter
+  if (impl->trimmed.lookup(bucket)) {
+    return;
+  }
+  impl->counter.insert(bucket.to_string());
+}
+
+RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http)
+{
+  return new BucketTrimPollCR(impl->store, http, impl->config,
+                              impl.get(), impl->status_obj);
+}
+
+RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http)
+{
+  // return the trim coroutine without any polling
+  return new BucketTrimCR(impl->store, http, impl->config,
+                          impl.get(), impl->status_obj);
+}
+
+} // namespace rgw
diff --git a/src/rgw/rgw_trim_bilog.h b/src/rgw/rgw_trim_bilog.h
new file mode 100644 (file)
index 0000000..13d1f63
--- /dev/null
@@ -0,0 +1,110 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * Author: Casey Bodley <cbodley@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef RGW_SYNC_LOG_TRIM_H
+#define RGW_SYNC_LOG_TRIM_H
+
+#include <memory>
+#include <boost/utility/string_view.hpp>
+#include "include/encoding.h"
+#include "common/ceph_time.h"
+
+class CephContext;
+class RGWCoroutine;
+class RGWHTTPManager;
+class RGWRados;
+
+namespace rgw {
+
+/// Interface to inform the trim process about which buckets are most active
+struct BucketChangeObserver {
+  virtual ~BucketChangeObserver() = default;
+
+  virtual void on_bucket_changed(const boost::string_view& bucket_instance) = 0;
+};
+
+/// Configuration for BucketTrimManager
+struct BucketTrimConfig {
+  /// time interval in seconds between bucket trim attempts
+  uint32_t trim_interval_sec{0};
+  /// maximum number of buckets to track with BucketChangeObserver
+  size_t counter_size{0};
+  /// maximum number of buckets to process each trim interval
+  uint32_t buckets_per_interval{0};
+  /// minimum number of buckets to choose from the global bucket instance list
+  uint32_t min_cold_buckets_per_interval{0};
+  /// maximum number of buckets to process in parallel
+  uint32_t concurrent_buckets{0};
+  /// timeout in ms for bucket trim notify replies
+  uint64_t notify_timeout_ms{0};
+  /// maximum number of recently trimmed buckets to remember (should be small
+  /// enough for a linear search)
+  size_t recent_size{0};
+  /// maximum duration to consider a trim as 'recent' (should be some multiple
+  /// of the trim interval, at least)
+  ceph::timespan recent_duration{0};
+};
+
+/// fill out the BucketTrimConfig from the ceph context
+void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config);
+
+/// Determines the buckets on which to focus trim activity, using two sources of
+/// input: the frequency of entries read from the data changes log, and a global
+/// listing of the bucket.instance metadata. This allows us to trim active
+/// buckets quickly, while also ensuring that all buckets will eventually trim
+class BucketTrimManager : public BucketChangeObserver {
+  class Impl;
+  std::unique_ptr<Impl> impl;
+ public:
+  BucketTrimManager(RGWRados *store, const BucketTrimConfig& config);
+  ~BucketTrimManager();
+
+  int init();
+
+  /// increment a counter for the given bucket instance
+  void on_bucket_changed(const boost::string_view& bucket_instance) override;
+
+  /// create a coroutine to run the bucket trim process every trim interval
+  RGWCoroutine* create_bucket_trim_cr(RGWHTTPManager *http);
+
+  /// create a coroutine to trim buckets directly via radosgw-admin
+  RGWCoroutine* create_admin_bucket_trim_cr(RGWHTTPManager *http);
+};
+
+/// provides persistent storage for the trim manager's current position in the
+/// list of bucket instance metadata
+struct BucketTrimStatus {
+  std::string marker; //< metadata key of current bucket instance
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(marker, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator& p) {
+    DECODE_START(1, p);
+    decode(marker, p);
+    DECODE_FINISH(p);
+  }
+
+  static const std::string oid;
+};
+
+} // namespace rgw
+
+WRITE_CLASS_ENCODER(rgw::BucketTrimStatus);
+
+#endif // RGW_SYNC_LOG_TRIM_H