git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Datalog with selectable backends, FIFO and Omap
author Adam C. Emerson <aemerson@redhat.com>
Wed, 10 Jun 2020 16:14:36 +0000 (12:14 -0400)
committer Adam C. Emerson <aemerson@redhat.com>
Wed, 9 Sep 2020 02:09:40 +0000 (22:09 -0400)
By default, use whatever is present (and use FIFO if nothing is
present.)

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
20 files changed:
src/cls/log/cls_log_types.h
src/common/options.cc
src/rgw/CMakeLists.txt
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_datalog.cc [new file with mode: 0644]
src/rgw/rgw_datalog.h [new file with mode: 0644]
src/rgw/rgw_json_enc.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rest_log.h
src/rgw/rgw_service.cc
src/rgw/rgw_sync_policy.h
src/rgw/rgw_trim_datalog.cc
src/rgw/services/svc_bi_rados.cc
src/rgw/services/svc_bi_rados.h
src/rgw/services/svc_rados.h
src/test/test_rgw_admin_log.cc

index b5ab8f3cb1320d34e3c1ebaa367c5153bf7d3722..c5c00766d8156cf14bcdd9da1ba486d23c18db9e 100644 (file)
@@ -61,6 +61,10 @@ struct cls_log_header {
     DECODE_FINISH(bl);
   }
 };
+inline bool operator ==(const cls_log_header& lhs, const cls_log_header& rhs) { // Equality for cls_log_header.
+  return (lhs.max_marker == rhs.max_marker && // equal only when both max_marker ...
+         lhs.max_time == rhs.max_time); // ... and max_time match
+}
 WRITE_CLASS_ENCODER(cls_log_header)
 
 
index b94a4715174eef93815919d856f25646577b53cd..847f979ef729e0f041b610cf6716063190966618 100644 (file)
@@ -7125,6 +7125,17 @@ std::vector<Option> get_rgw_options() {
     .set_description("mclock limit for metadata requests")
     .add_see_also("rgw_dmclock_metadata_res")
     .add_see_also("rgw_dmclock_metadata_wgt"),
+
+   Option("rgw_data_log_backing", Option::TYPE_STR, Option::LEVEL_ADVANCED)
+    .set_default("auto")
+    .set_enum_allowed( { "auto", "fifo", "omap" } )
+    .set_description("Backing store for the RGW data sync log")
+    .set_long_description(
+        "Whether to use the older OMAP backing store or the high performance "
+       "FIFO based backing store. Auto uses whatever already exists "
+       "but will default to FIFO if there isn't an existing log. Either of "
+       "the explicit options will cause startup to fail if the other log is "
+       "still around."),
   });
 }
 
index 5d4ab489d103a52a5c9c787e47aebf1a53b6b310..62de67f4f7fae32f8a6188747d0e10ce653ed31e 100644 (file)
@@ -153,6 +153,7 @@ set(librgw_common_srcs
   rgw_kms.cc
   rgw_url.cc
   rgw_oidc_provider
+  rgw_datalog.cc
   cls_fifo_legacy.cc)
 
 if(WITH_RADOSGW_AMQP_ENDPOINT)
@@ -315,7 +316,7 @@ install(TARGETS radosgw DESTINATION ${CMAKE_INSTALL_LIBDIR})
 add_executable(radosgwd radosgw.cc)
 target_link_libraries(radosgwd radosgw librados
   cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
-  cls_log_client cls_timeindex_client
+  cls_log_client cls_timeindex_client neorados_cls_fifo
   cls_version_client cls_user_client
   global
   ${FCGI_LIBRARY} ${LIB_RESOLV}
@@ -330,7 +331,7 @@ set(radosgw_admin_srcs
 add_executable(radosgw-admin ${radosgw_admin_srcs})
 target_link_libraries(radosgw-admin ${rgw_libs} librados
   cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
-  cls_log_client cls_timeindex_client
+  cls_log_client cls_timeindex_client neorados_cls_fifo
   cls_version_client cls_user_client
   global ${FCGI_LIBRARY} ${LIB_RESOLV}
   ${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${BLKID_LIBRARIES})
@@ -341,7 +342,7 @@ set(radosgw_es_srcs
 add_executable(radosgw-es ${radosgw_es_srcs})
 target_link_libraries(radosgw-es ${rgw_libs} librados
   cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
-  cls_log_client cls_timeindex_client
+  cls_log_client cls_timeindex_client neorados_cls_fifo
   cls_version_client cls_user_client
   global ${FCGI_LIBRARY} ${LIB_RESOLV}
   ${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${BLKID_LIBRARIES})
@@ -359,7 +360,7 @@ set(radosgw_object_expirer_srcs
 add_executable(radosgw-object-expirer ${radosgw_object_expirer_srcs})
 target_link_libraries(radosgw-object-expirer ${rgw_libs} librados
   cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
-  cls_log_client cls_timeindex_client
+  cls_log_client cls_timeindex_client neorados_cls_fifo
   cls_version_client cls_user_client
   global ${FCGI_LIBRARY} ${LIB_RESOLV}
   ${CURL_LIBRARIES} ${EXPAT_LIBRARIES})
@@ -379,6 +380,7 @@ target_link_libraries(rgw
   cls_refcount_client
   cls_log_client
   cls_timeindex_client
+  neorados_cls_fifo
   cls_version_client
   cls_user_client
   global
index 73008201e838990969fa0b02536b93ac7a98e969..490b225ca3b33ae44a23788622f57ffdce6728c8 100644 (file)
@@ -39,6 +39,7 @@ extern "C" {
 #include "rgw_rados.h"
 #include "rgw_acl.h"
 #include "rgw_acl_s3.h"
+#include "rgw_datalog.h"
 #include "rgw_lc.h"
 #include "rgw_log.h"
 #include "rgw_formats.h"
index 100c2b94e69fa1f99b22bd2d1acae2a6d94ffab9..6b5e492a31523ceabefe35b90416e9ad95243746 100644 (file)
@@ -1,23 +1,19 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab ft=cpp
 
-#include <errno.h>
-
-#include <string>
+#include <cerrno>
 #include <map>
 #include <sstream>
+#include <string>
 #include <string_view>
 
 #include <boost/format.hpp>
 
-#undef FMT_HEADER_ONLY
-#define FMT_HEADER_ONLY 1
-#include "fmt/format.h"
-
 #include "common/errno.h"
 #include "common/ceph_json.h"
 #include "include/scope_guard.h"
 
+#include "rgw_datalog.h"
 #include "rgw_rados.h"
 #include "rgw_zone.h"
 #include "rgw_acl.h"
@@ -1976,460 +1972,6 @@ int RGWBucketAdminOp::fix_obj_expiry(rgw::sal::RGWRadosStore *store,
   return fix_bucket_obj_expiry(store, admin_bucket.get_bucket_info(), flusher, dry_run);
 }
 
-void rgw_data_change::dump(Formatter *f) const
-{
-  string type;
-  switch (entity_type) {
-    case ENTITY_TYPE_BUCKET:
-      type = "bucket";
-      break;
-    default:
-      type = "unknown";
-  }
-  encode_json("entity_type", type, f);
-  encode_json("key", key, f);
-  utime_t ut(timestamp);
-  encode_json("timestamp", ut, f);
-}
-
-void rgw_data_change::decode_json(JSONObj *obj) {
-  string s;
-  JSONDecoder::decode_json("entity_type", s, obj);
-  if (s == "bucket") {
-    entity_type = ENTITY_TYPE_BUCKET;
-  } else {
-    entity_type = ENTITY_TYPE_UNKNOWN;
-  }
-  JSONDecoder::decode_json("key", key, obj);
-  utime_t ut;
-  JSONDecoder::decode_json("timestamp", ut, obj);
-  timestamp = ut.to_real_time();
-}
-
-void rgw_data_change_log_entry::dump(Formatter *f) const
-{
-  encode_json("log_id", log_id, f);
-  utime_t ut(log_timestamp);
-  encode_json("log_timestamp", ut, f);
-  encode_json("entry", entry, f);
-}
-
-void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
-  JSONDecoder::decode_json("log_id", log_id, obj);
-  utime_t ut;
-  JSONDecoder::decode_json("log_timestamp", ut, obj);
-  log_timestamp = ut.to_real_time();
-  JSONDecoder::decode_json("entry", entry, obj);
-}
-
-
-RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
-  : cct(cct),
-    num_shards(cct->_conf->rgw_data_log_num_shards),
-    changes(cct->_conf->rgw_data_log_changes_size) {}
-
-void RGWDataChangesLog::init(RGWSI_Cls *cls_svc)
-{
-  svc.cls = cls_svc;
-  assert(svc.cls);
-
-  oids = new string[num_shards];
-  for (int i = 0; i < num_shards; i++) {
-    oids[i] = get_oid(i);
-  }
-}
-
-int RGWDataChangesLog::start(const RGWZone* _zone)
-{
-  zone = _zone;
-  assert(zone);
-  renew_thread = make_named_thread("rgw_dt_lg_renew",
-                                  &RGWDataChangesLog::renew_run, this);
-  return 0;
-}
-
-int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
-    const string& name = bs.bucket.name;
-    int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
-    uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
-
-    return (int)r;
-}
-
-int RGWDataChangesLog::renew_entries()
-{
-  if (!zone->log_data)
-    return 0;
-
-  /* we can't keep the bucket name as part of the cls_log_entry, and we need
-   * it later, so we keep two lists under the map */
-  map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
-
-  lock.lock();
-  map<rgw_bucket_shard, bool> entries;
-  entries.swap(cur_cycle);
-  lock.unlock();
-
-  map<rgw_bucket_shard, bool>::iterator iter;
-  string section;
-  real_time ut = real_clock::now();
-  for (iter = entries.begin(); iter != entries.end(); ++iter) {
-    const rgw_bucket_shard& bs = iter->first;
-
-    int index = choose_oid(bs);
-
-    cls_log_entry entry;
-
-    rgw_data_change change;
-    bufferlist bl;
-    change.entity_type = ENTITY_TYPE_BUCKET;
-    change.key = bs.get_key();
-    change.timestamp = ut;
-    encode(change, bl);
-
-    svc.cls->timelog.prepare_entry(entry, ut, section, change.key, bl);
-
-    m[index].first.push_back(bs);
-    m[index].second.emplace_back(std::move(entry));
-  }
-
-  map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
-  for (miter = m.begin(); miter != m.end(); ++miter) {
-    list<cls_log_entry>& entries = miter->second.second;
-
-    real_time now = real_clock::now();
-
-    int ret = svc.cls->timelog.add(oids[miter->first], entries, nullptr, true, null_yield);
-    if (ret < 0) {
-      /* we don't really need to have a special handling for failed cases here,
-       * as this is just an optimization. */
-      lderr(cct) << "ERROR: svc.cls->timelog.add() returned " << ret << dendl;
-      return ret;
-    }
-
-    real_time expiration = now;
-    expiration += make_timespan(cct->_conf->rgw_data_log_window);
-
-    list<rgw_bucket_shard>& buckets = miter->second.first;
-    list<rgw_bucket_shard>::iterator liter;
-    for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
-      update_renewed(*liter, expiration);
-    }
-  }
-
-  return 0;
-}
-
-void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
-{
-  ceph_assert(ceph_mutex_is_locked(lock));
-  if (!changes.find(bs, status)) {
-    status = ChangeStatusPtr(new ChangeStatus);
-    changes.add(bs, status);
-  }
-}
-
-void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
-{
-  std::lock_guard l{lock};
-  cur_cycle[bs] = true;
-}
-
-void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, real_time& expiration)
-{
-  std::lock_guard l{lock};
-  ChangeStatusPtr status;
-  _get_change(bs, status);
-
-  ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
-  status->cur_expiration = expiration;
-}
-
-int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
-  rgw_bucket_shard bs(bucket, shard_id);
-
-  return choose_oid(bs);
-}
-
-bool RGWDataChangesLog::filter_bucket(const rgw_bucket& bucket, optional_yield y) const
-{
-  if (!bucket_filter) {
-    return true;
-  }
-
-  return bucket_filter(bucket, y);
-}
-
-std::string RGWDataChangesLog::get_oid(int i) const {
-  std::string_view prefix = cct->_conf->rgw_data_log_obj_prefix;
-  if (prefix.empty()) {
-    prefix = "data_log"sv;
-  }
-  return fmt::format("{}.{}", prefix, i);
-}
-
-
-int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
-  auto& bucket = bucket_info.bucket;
-
-  if (!filter_bucket(bucket, null_yield)) {
-    return 0;
-  }
-
-  if (observer) {
-    observer->on_bucket_changed(bucket.get_key());
-  }
-
-  rgw_bucket_shard bs(bucket, shard_id);
-
-  int index = choose_oid(bs);
-  mark_modified(index, bs);
-
-  lock.lock();
-
-  ChangeStatusPtr status;
-  _get_change(bs, status);
-
-  lock.unlock();
-
-  real_time now = real_clock::now();
-
-  status->lock.lock();
-
-  ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name
-                << " shard_id=" << shard_id << " now=" << now
-                << " cur_expiration=" << status->cur_expiration << dendl;
-
-  if (now < status->cur_expiration) {
-    /* no need to send, recently completed */
-    status->lock.unlock();
-
-    register_renew(bs);
-    return 0;
-  }
-
-  RefCountedCond *cond;
-
-  if (status->pending) {
-    cond = status->cond;
-
-    ceph_assert(cond);
-
-    status->cond->get();
-    status->lock.unlock();
-
-    int ret = cond->wait();
-    cond->put();
-    if (!ret) {
-      register_renew(bs);
-    }
-    return ret;
-  }
-
-  status->cond = new RefCountedCond;
-  status->pending = true;
-
-  string& oid = oids[index];
-  real_time expiration;
-
-  int ret;
-
-  do {
-    status->cur_sent = now;
-
-    expiration = now;
-    expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
-
-    status->lock.unlock();
-
-    bufferlist bl;
-    rgw_data_change change;
-    change.entity_type = ENTITY_TYPE_BUCKET;
-    change.key = bs.get_key();
-    change.timestamp = now;
-    encode(change, bl);
-    string section;
-
-    ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
-
-    ret = svc.cls->timelog.add(oid, now, section, change.key, bl, null_yield);
-
-    now = real_clock::now();
-
-    status->lock.lock();
-
-  } while (!ret && real_clock::now() > expiration);
-
-  cond = status->cond;
-
-  status->pending = false;
-  status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */
-  status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
-  status->cond = NULL;
-  status->lock.unlock();
-
-  cond->done(ret);
-  cond->put();
-
-  return ret;
-}
-
-int RGWDataChangesLog::list_entries(int shard, int max_entries,
-                                   std::vector<rgw_data_change_log_entry>& entries,
-                                   std::optional<std::string_view> marker,
-                                   std::string* out_marker, bool* truncated)
-{
-  assert(shard < num_shards);
-  std::list<cls_log_entry> log_entries;
-
-  int ret = svc.cls->timelog.list(oids[shard], {}, {},
-                                 max_entries, log_entries,
-                                 std::string(marker.value_or("")),
-                                 out_marker, truncated, null_yield);
-  if (ret < 0)
-    return ret;
-
-  for (auto iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
-    rgw_data_change_log_entry log_entry;
-    log_entry.log_id = iter->id;
-    real_time rt = iter->timestamp.to_real_time();
-    log_entry.log_timestamp = rt;
-    auto liter = iter->data.cbegin();
-    try {
-      decode(log_entry.entry, liter);
-    } catch (buffer::error& err) {
-      lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl;
-      return -EIO;
-    }
-    entries.push_back(log_entry);
-  }
-
-  return 0;
-}
-
-int RGWDataChangesLog::list_entries(int max_entries,
-                                   std::vector<rgw_data_change_log_entry>& entries,
-                                   LogMarker& marker, bool *ptruncated)
-{
-  bool truncated;
-  entries.clear();
-
-  for (; marker.shard < num_shards && (int)entries.size() < max_entries;
-       marker.shard++, marker.marker.reset()) {
-    int ret = list_entries(marker.shard, max_entries - entries.size(),
-                          entries, marker.marker, NULL, &truncated);
-    if (ret == -ENOENT) {
-      continue;
-    }
-    if (ret < 0) {
-      return ret;
-    }
-    if (truncated) {
-      *ptruncated = true;
-      return 0;
-    }
-  }
-
-  *ptruncated = (marker.shard < num_shards);
-
-  return 0;
-}
-
-int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
-{
-  assert(shard_id < num_shards);
-  string oid = oids[shard_id];
-
-  cls_log_header header;
-
-  int ret = svc.cls->timelog.info(oid, &header, null_yield);
-  if ((ret < 0) && (ret != -ENOENT))
-    return ret;
-
-  info->marker = header.max_marker;
-  info->last_update = header.max_time.to_real_time();
-
-  return 0;
-}
-
-int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker)
-{
-  assert(shard_id < num_shards);
-  return svc.cls->timelog.trim(oids[shard_id], {}, {},
-                               {}, std::string(marker), nullptr, null_yield);
-}
-
-int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker,
-                                   librados::AioCompletion* c)
-{
-  assert(shard_id < num_shards);
-  return svc.cls->timelog.trim(oids[shard_id], {}, {},
-                              {}, std::string(marker), c, null_yield);
-}
-
-bool RGWDataChangesLog::going_down() const
-{
-  return down_flag;
-}
-
-RGWDataChangesLog::~RGWDataChangesLog() {
-  down_flag = true;
-  if (renew_thread.joinable()) {
-    renew_stop();
-    renew_thread.join();
-  }
-  delete[] oids;
-}
-
-void RGWDataChangesLog::renew_run() {
-  for (;;) {
-    dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
-    int r = renew_entries();
-    if (r < 0) {
-      dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
-    }
-
-    if (going_down())
-      break;
-
-    int interval = cct->_conf->rgw_data_log_window * 3 / 4;
-    std::unique_lock locker{renew_lock};
-    renew_cond.wait_for(locker, std::chrono::seconds(interval));
-  }
-}
-
-void RGWDataChangesLog::renew_stop()
-{
-  std::lock_guard l{renew_lock};
-  renew_cond.notify_all();
-}
-
-void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
-{
-  auto key = bs.get_key();
-  {
-    std::shared_lock rl{modified_lock}; // read lock to check for existence
-    auto shard = modified_shards.find(shard_id);
-    if (shard != modified_shards.end() && shard->second.count(key)) {
-      return;
-    }
-  }
-
-  std::unique_lock wl{modified_lock}; // write lock for insertion
-  modified_shards[shard_id].insert(key);
-}
-
-void RGWDataChangesLog::read_clear_modified(map<int, set<string> > &modified)
-{
-  std::unique_lock wl{modified_lock};
-  modified.swap(modified_shards);
-  modified_shards.clear();
-}
-
-std::string_view RGWDataChangesLog::max_marker() const {
-  return "99999999"sv;
-}
-
 void RGWBucketCompleteInfo::dump(Formatter *f) const {
   encode_json("bucket_info", info, f);
   encode_json("attrs", attrs, f);
index 597707e46677e2fef3a0e1e3254bee51ed1a0f01..cb93aefde74abd9138d9559db2c8e536955271a0 100644 (file)
@@ -6,6 +6,10 @@
 
 #include <string>
 #include <memory>
+#include <variant>
+
+#include <boost/container/flat_map.hpp>
+#include <boost/container/flat_set.hpp>
 
 #include "include/types.h"
 #include "rgw_common.h"
@@ -23,7 +27,6 @@
 #include "services/svc_bucket_types.h"
 #include "services/svc_bucket_sync.h"
 
-
 // define as static when RGWBucket implementation completes
 extern void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id);
 
@@ -33,7 +36,7 @@ class RGWBucketInstanceMetadataHandler;
 class RGWUserCtl;
 class RGWBucketCtl;
 class RGWZone;
-
+struct RGWZoneParams;
 namespace rgw { namespace sal {
   class RGWRadosStore;
   class RGWBucketList;
@@ -410,177 +413,6 @@ public:
   static int sync_bucket(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, string *err_msg = NULL);
 };
 
-
-enum DataLogEntityType {
-  ENTITY_TYPE_UNKNOWN = 0,
-  ENTITY_TYPE_BUCKET = 1,
-};
-
-struct rgw_data_change {
-  DataLogEntityType entity_type;
-  std::string key;
-  ceph::real_time timestamp;
-
-  void encode(ceph::buffer::list& bl) const {
-    ENCODE_START(1, 1, bl);
-    auto t = std::uint8_t(entity_type);
-    encode(t, bl);
-    encode(key, bl);
-    encode(timestamp, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(bufferlist::const_iterator& bl) {
-     DECODE_START(1, bl);
-     std::uint8_t t;
-     decode(t, bl);
-     entity_type = DataLogEntityType(t);
-     decode(key, bl);
-     decode(timestamp, bl);
-     DECODE_FINISH(bl);
-  }
-
-  void dump(ceph::Formatter* f) const;
-  void decode_json(JSONObj* obj);
-};
-WRITE_CLASS_ENCODER(rgw_data_change)
-
-struct rgw_data_change_log_entry {
-  std::string log_id;
-  ceph::real_time log_timestamp;
-  rgw_data_change entry;
-
-  void encode(ceph::buffer::list& bl) const {
-    ENCODE_START(1, 1, bl);
-    encode(log_id, bl);
-    encode(log_timestamp, bl);
-    encode(entry, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(ceph::buffer::list::const_iterator& bl) {
-     DECODE_START(1, bl);
-     decode(log_id, bl);
-     decode(log_timestamp, bl);
-     decode(entry, bl);
-     DECODE_FINISH(bl);
-  }
-
-  void dump(ceph::Formatter* f) const;
-  void decode_json(JSONObj* obj);
-};
-WRITE_CLASS_ENCODER(rgw_data_change_log_entry)
-
-struct RGWDataChangesLogInfo {
-  std::string marker;
-  ceph::real_time last_update;
-
-  void dump(ceph::Formatter* f) const;
-  void decode_json(JSONObj* obj);
-};
-
-namespace rgw {
-struct BucketChangeObserver;
-}
-
-struct RGWDataChangesLogMarker {
-  int shard = 0;
-  std::optional<std::string> marker;
-
-  RGWDataChangesLogMarker() = default;
-};
-
-class RGWDataChangesLog {
-  CephContext *cct;
-  rgw::BucketChangeObserver *observer = nullptr;
-  const RGWZone* zone;
-
-  struct Svc {
-    RGWSI_Cls *cls{nullptr};
-  } svc;
-
-  int num_shards;
-  std::string* oids;
-
-  ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock");
-  ceph::shared_mutex modified_lock =
-    ceph::make_shared_mutex("RGWDataChangesLog::modified_lock");
-  std::map<int, set<string> > modified_shards;
-
-  std::atomic<bool> down_flag = { false };
-
-  struct ChangeStatus {
-    std::shared_ptr<const rgw_sync_policy_info> sync_policy;
-    ceph::real_time cur_expiration;
-    ceph::real_time cur_sent;
-    bool pending = false;
-    RefCountedCond *cond = nullptr;
-    ceph::mutex lock =
-      ceph::make_mutex("RGWDataChangesLog::ChangeStatus");
-  };
-
-  using ChangeStatusPtr = std::shared_ptr<ChangeStatus>;
-
-  lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
-
-  std::map<rgw_bucket_shard, bool> cur_cycle;
-
-  void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
-  void register_renew(rgw_bucket_shard& bs);
-  void update_renewed(rgw_bucket_shard& bs, ceph::real_time& expiration);
-
-  ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock");
-  ceph::condition_variable renew_cond;
-  void renew_run();
-  void renew_stop();
-  std::thread renew_thread;
-
-  std::function<bool(const rgw_bucket& bucket, optional_yield y)> bucket_filter;
-  int choose_oid(const rgw_bucket_shard& bs);
-  bool going_down() const;
-  bool filter_bucket(const rgw_bucket& bucket, optional_yield y) const;
-  int renew_entries();
-
-public:
-
-  RGWDataChangesLog(CephContext* cct);
-  ~RGWDataChangesLog();
-
-  void init(RGWSI_Cls *cls_svc);
-  int start(const RGWZone* _zone);
-
-  int add_entry(const RGWBucketInfo& bucket_info, int shard_id);
-  int get_log_shard_id(rgw_bucket& bucket, int shard_id);
-  int list_entries(int shard, int max_entries,
-                  std::vector<rgw_data_change_log_entry>& entries,
-                  std::optional<std::string_view> marker,
-                  std::string* out_marker, bool* truncated);
-  int trim_entries(int shard_id, std::string_view marker);
-  int trim_entries(int shard_id, std::string_view marker,
-                  librados::AioCompletion* c); // :(
-  int get_info(int shard_id, RGWDataChangesLogInfo *info);
-
-  using LogMarker = RGWDataChangesLogMarker;
-
-  int list_entries(int max_entries,
-                  std::vector<rgw_data_change_log_entry>& entries,
-                  LogMarker& marker, bool* ptruncated);
-
-  void mark_modified(int shard_id, const rgw_bucket_shard& bs);
-  void read_clear_modified(map<int, set<string> > &modified);
-
-  void set_observer(rgw::BucketChangeObserver *observer) {
-    this->observer = observer;
-  }
-
-  void set_bucket_filter(decltype(bucket_filter)&& f) {
-    bucket_filter = std::move(f);
-  }
-  // a marker that compares greater than any other
-  std::string_view max_marker() const;
-  std::string get_oid(int shard_id) const;
-};
-
 struct rgw_ep_info {
   RGWBucketEntryPoint &ep;
   map<std::string, buffer::list>& attrs;
index 94000fd5a1801e90f31d9b9cea242eb2d089d6c8..fd1208d5bef58fba51c458659760f07cdd99a476 100644 (file)
@@ -20,6 +20,7 @@
 #include "rgw_bucket.h"
 #include "rgw_bucket_sync.h"
 #include "rgw_bucket_sync_cache.h"
+#include "rgw_datalog.h"
 #include "rgw_metadata.h"
 #include "rgw_sync_counters.h"
 #include "rgw_sync_error_repo.h"
index ec8e648cd51f50b884f98ece8be10604f983a2a1..393c31a927727cefbfbc07fd34df323380e48a32 100644 (file)
@@ -13,6 +13,7 @@
 #include "rgw_http_client.h"
 #include "rgw_sal.h"
 
+#include "rgw_datalog.h"
 #include "rgw_sync_module.h"
 #include "rgw_sync_trace.h"
 #include "rgw_sync_policy.h"
@@ -348,7 +349,6 @@ struct RGWDataSyncCtx {
 };
 
 class RGWRados;
-class RGWDataChangesLogInfo;
 
 class RGWRemoteDataLog : public RGWCoroutinesManager {
   const DoutPrefixProvider *dpp;
diff --git a/src/rgw/rgw_datalog.cc b/src/rgw/rgw_datalog.cc
new file mode 100644 (file)
index 0000000..33edaf7
--- /dev/null
@@ -0,0 +1,902 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <vector>
+
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/error_code.h"
+
+#include "common/async/blocked_completion.h"
+#include "common/async/librados_completion.h"
+
+#include "cls/fifo/cls_fifo_types.h"
+
+#include "cls_fifo_legacy.h"
+#include "rgw_datalog.h"
+#include "rgw_tools.h"
+
+#define dout_context g_ceph_context
+static constexpr auto dout_subsys = ceph_subsys_rgw;
+
+namespace bs = boost::system;
+
+// Emit this change through `f`: the entity type as a string, followed by the
+// key and the timestamp.
+void rgw_data_change::dump(ceph::Formatter *f) const
+{
+  // Buckets are the only entity type with a name; anything else is "unknown".
+  std::string type_name =
+    (entity_type == ENTITY_TYPE_BUCKET) ? "bucket" : "unknown";
+  encode_json("entity_type", type_name, f);
+  encode_json("key", key, f);
+  utime_t stamp(timestamp);
+  encode_json("timestamp", stamp, f);
+}
+
+// Fill this change from the JSON object `obj`. Any "entity_type" other than
+// "bucket" becomes ENTITY_TYPE_UNKNOWN.
+void rgw_data_change::decode_json(JSONObj *obj) {
+  std::string type_name;
+  JSONDecoder::decode_json("entity_type", type_name, obj);
+  entity_type = (type_name == "bucket") ? ENTITY_TYPE_BUCKET
+                                        : ENTITY_TYPE_UNKNOWN;
+  JSONDecoder::decode_json("key", key, obj);
+  utime_t stamp;
+  JSONDecoder::decode_json("timestamp", stamp, obj);
+  timestamp = stamp.to_real_time();
+}
+
+// Emit the log id, the log timestamp and the wrapped change through `f`.
+void rgw_data_change_log_entry::dump(Formatter *f) const
+{
+  utime_t stamp(log_timestamp);
+  encode_json("log_id", log_id, f);
+  encode_json("log_timestamp", stamp, f);
+  encode_json("entry", entry, f);
+}
+
+// Fill the log id, the log timestamp and the wrapped change from `obj`.
+void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
+  utime_t stamp;
+  JSONDecoder::decode_json("log_id", log_id, obj);
+  JSONDecoder::decode_json("log_timestamp", stamp, obj);
+  log_timestamp = stamp.to_real_time();
+  JSONDecoder::decode_json("entry", entry, obj);
+}
+
+// Remove every datalog shard object (named by get_oid(cct, i)) from
+// `log_pool`.
+//
+// A missing pool or a missing shard object is not an error. Every shard is
+// attempted even when an earlier one fails; the first failure, if any, is
+// returned so that a partial removal is not reported as success.
+//
+// Returns 0 on success or a negative error code.
+int RGWDataChangesBE::remove(CephContext* cct, librados::Rados* rados,
+                             const rgw_pool& log_pool)
+{
+  auto num_shards = cct->_conf->rgw_data_log_num_shards;
+  librados::IoCtx ioctx;
+  auto r = rgw_init_ioctx(rados, log_pool.name, ioctx,
+                          false, false);
+  if (r == -ENOENT) {
+    // No pool, hence nothing to remove.
+    return 0;
+  }
+  if (r < 0) {
+    lderr(cct) << __PRETTY_FUNCTION__
+               << ": rgw_init_ioctx failed: " << log_pool.name
+               << ": " << cpp_strerror(-r) << dendl;
+    return r;
+  }
+  int first_error = 0;
+  for (auto i = 0; i < num_shards; ++i) {
+    auto oid = get_oid(cct, i);
+    librados::ObjectWriteOperation op;
+    op.remove();
+    // Named differently from the outer `r` to avoid shadowing it.
+    auto rr = rgw_rados_operate(ioctx, oid, &op, null_yield);
+    if (rr < 0 && rr != -ENOENT) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": remove failed: " << log_pool.name << "/" << oid
+                 << ": " << cpp_strerror(-rr) << dendl;
+      if (first_error == 0) {
+        first_error = rr;
+      }
+    }
+  }
+  return first_error;
+}
+
+
+// Datalog backend that stores each shard in one object accessed through the
+// cls timelog service (`RGWSI_Cls::timelog`).
+class RGWDataChangesOmap final : public RGWDataChangesBE {
+  using centries = std::list<cls_log_entry>;
+  RGWSI_Cls& cls;
+  // Object id of every shard, indexed by shard number.
+  std::vector<std::string> oids;
+public:
+  // Precompute the object id of each configured shard.
+  RGWDataChangesOmap(CephContext* cct, RGWSI_Cls& cls)
+    : RGWDataChangesBE(cct), cls(cls) {
+    auto num_shards = cct->_conf->rgw_data_log_num_shards;
+    oids.reserve(num_shards);
+    for (auto i = 0; i < num_shards; ++i) {
+      oids.push_back(get_oid(i));
+    }
+  }
+  ~RGWDataChangesOmap() override = default;
+
+  // Probe the shard objects for an existing timelog.
+  //
+  // *exists is set when any shard has a header different from a
+  // default-constructed one; *has_entries is set when listing such a shard
+  // yields at least one entry. Returns 0 on success or a negative error code.
+  static int exists(CephContext* cct, RGWSI_Cls& cls, bool* exists,
+                    bool* has_entries) {
+    auto num_shards = cct->_conf->rgw_data_log_num_shards;
+    std::string out_marker;
+    bool truncated = false;
+    std::list<cls_log_entry> log_entries;
+    const cls_log_header empty_info;
+    *exists = false;
+    *has_entries = false;
+    for (auto i = 0; i < num_shards; ++i) {
+      cls_log_header info;
+      auto oid = get_oid(cct, i);
+      auto r = cls.timelog.info(oid, &info, null_yield);
+      if (r < 0 && r != -ENOENT) {
+        lderr(cct) << __PRETTY_FUNCTION__
+                   << ": failed to get info " << oid << ": " << cpp_strerror(-r)
+                   << dendl;
+        return r;
+      } else if ((r == -ENOENT) || (info == empty_info)) {
+        continue;
+      }
+      *exists = true;
+      r = cls.timelog.list(oid, {}, {}, 100, log_entries, "", &out_marker,
+                           &truncated, null_yield);
+      if (r < 0) {
+        lderr(cct) << __PRETTY_FUNCTION__
+                   << ": failed to list " << oid << ": " << cpp_strerror(-r)
+                   << dendl;
+        return r;
+      } else if (!log_entries.empty()) {
+        *has_entries = true;
+        break; // No reason to continue, once we have both existence
+               // AND non-emptiness
+      }
+    }
+    return 0;
+  }
+
+  // Append one prepared timelog entry to `out`. If `out` currently holds the
+  // other alternative it must be empty and is switched to this backend's
+  // container type.
+  void prepare(ceph::real_time ut, const std::string& key,
+               ceph::buffer::list&& entry, entries& out) override {
+    if (!std::holds_alternative<centries>(out)) {
+      ceph_assert(std::visit([](const auto& v) { return std::empty(v); }, out));
+      out = centries();
+    }
+
+    cls_log_entry e;
+    cls.timelog.prepare_entry(e, ut, {}, key, entry);
+    std::get<centries>(out).push_back(std::move(e));
+  }
+  // Push a batch built by prepare() to shard `index`.
+  int push(int index, entries&& items) override {
+    auto r = cls.timelog.add(oids[index], std::get<centries>(items),
+                             nullptr, true, null_yield);
+    if (r < 0) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": failed to push to " << oids[index]
+                 << ": " << cpp_strerror(-r) << dendl;
+    }
+    return r;
+  }
+  // Push a single entry to shard `index`.
+  int push(int index, ceph::real_time now,
+           const std::string& key,
+           ceph::buffer::list&& bl) override {
+    auto r = cls.timelog.add(oids[index], now, {}, key, bl, null_yield);
+    if (r < 0) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": failed to push to " << oids[index]
+                 << ": " << cpp_strerror(-r) << dendl;
+    }
+    return r;
+  }
+  // List up to `max_entries` entries of shard `index` starting at `marker`
+  // (an absent marker is passed on as the empty string) and append the
+  // decoded entries to `entries`. Returns -EIO when an entry fails to decode.
+  int list(int index, int max_entries,
+           std::vector<rgw_data_change_log_entry>& entries,
+           std::optional<std::string_view> marker,
+           std::string* out_marker, bool* truncated) override {
+    std::list<cls_log_entry> log_entries;
+    auto r = cls.timelog.list(oids[index], {}, {},
+                              max_entries, log_entries,
+                              std::string(marker.value_or("")),
+                              out_marker, truncated, null_yield);
+    if (r < 0) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": failed to list " << oids[index]
+                 << ": " << cpp_strerror(-r) << dendl;
+      return r;
+    }
+    for (const auto& raw : log_entries) {
+      rgw_data_change_log_entry log_entry;
+      log_entry.log_id = raw.id;
+      log_entry.log_timestamp = raw.timestamp.to_real_time();
+      auto liter = raw.data.cbegin();
+      try {
+        decode(log_entry.entry, liter);
+      } catch (const ceph::buffer::error& err) {
+        lderr(cct) << __PRETTY_FUNCTION__
+                   << ": failed to decode data changes log entry: "
+                   << err.what() << dendl;
+        return -EIO;
+      }
+      entries.push_back(std::move(log_entry));
+    }
+    return 0;
+  }
+  // Read the header of shard `index` into `info`. A missing object is
+  // treated as success and yields the values of a default-constructed header.
+  int get_info(int index, RGWDataChangesLogInfo *info) override {
+    cls_log_header header;
+    auto r = cls.timelog.info(oids[index], &header, null_yield);
+    if (r == -ENOENT) r = 0;
+    if (r < 0) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": failed to get info from " << oids[index]
+                 << ": " << cpp_strerror(-r) << dendl;
+    } else {
+      info->marker = header.max_marker;
+      info->last_update = header.max_time.to_real_time();
+    }
+    return r;
+  }
+  // Trim shard `index` up to `marker`. A missing object is not an error.
+  int trim(int index, std::string_view marker) override {
+    auto r = cls.timelog.trim(oids[index], {}, {},
+                              {}, std::string(marker), nullptr,
+                              null_yield);
+
+    if (r == -ENOENT) r = 0;
+    if (r < 0) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": failed to trim " << oids[index]
+                 << ": " << cpp_strerror(-r) << dendl;
+    }
+    return r;
+  }
+  // Same as above, but hands the completion `c` to the timelog trim call.
+  int trim(int index, std::string_view marker,
+           librados::AioCompletion* c) override {
+    auto r = cls.timelog.trim(oids[index], {}, {},
+                              {}, std::string(marker), c, null_yield);
+
+    if (r == -ENOENT) r = 0;
+    if (r < 0) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": failed to trim " << oids[index]
+                 << ": " << cpp_strerror(-r) << dendl;
+    }
+    return r;
+  }
+  // Marker string this backend reports as its maximum.
+  std::string_view max_marker() const override {
+    return "99999999"sv;
+  }
+};
+
+// Datalog backend that stores each shard in one `rgw::cls::fifo::FIFO`.
+class RGWDataChangesFIFO final : public RGWDataChangesBE {
+  using centries = std::vector<ceph::buffer::list>;
+  // One FIFO handle per shard, indexed by shard number.
+  std::vector<std::unique_ptr<rgw::cls::fifo::FIFO>> fifos;
+public:
+  // Create (via FIFO::create) a handle for every configured shard.
+  // Throws boost::system::system_error if the pool or any FIFO cannot be set
+  // up.
+  RGWDataChangesFIFO(CephContext* cct, librados::Rados* rados,
+                     const rgw_pool& log_pool)
+    : RGWDataChangesBE(cct) {
+    librados::IoCtx ioctx;
+    auto shards = cct->_conf->rgw_data_log_num_shards;
+    auto r = rgw_init_ioctx(rados, log_pool.name, ioctx,
+                            true, false);
+    if (r < 0) {
+      throw bs::system_error(ceph::to_error_code(r));
+    }
+    fifos.resize(shards);
+    for (auto i = 0; i < shards; ++i) {
+      r = rgw::cls::fifo::FIFO::create(ioctx, get_oid(i),
+                                       &fifos[i], null_yield);
+      if (r < 0) {
+        throw bs::system_error(ceph::to_error_code(r));
+      }
+    }
+    ceph_assert(fifos.size() == unsigned(shards));
+    ceph_assert(std::none_of(fifos.cbegin(), fifos.cend(),
+                             [](const auto& p) {
+                               return p == nullptr;
+                             }));
+  }
+  ~RGWDataChangesFIFO() override = default;
+
+  // Probe the shard objects for an existing FIFO.
+  //
+  // *exists is set when any shard can be opened as a FIFO; *has_entries is
+  // set when listing such a shard yields at least one entry. Both outputs
+  // are written on every successful return, including when the pool does not
+  // exist. Returns 0 on success or a negative error code.
+  static int exists(CephContext* cct, librados::Rados* rados,
+                    const rgw_pool& log_pool, bool* exists, bool* has_entries) {
+    auto num_shards = cct->_conf->rgw_data_log_num_shards;
+    // Initialise the outputs before any early return.
+    *exists = false;
+    *has_entries = false;
+    librados::IoCtx ioctx;
+    auto r = rgw_init_ioctx(rados, log_pool.name, ioctx,
+                            false, false);
+    if (r == -ENOENT) {
+      return 0;
+    }
+    if (r < 0) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": rgw_init_ioctx failed: " << log_pool.name
+                 << ": " << cpp_strerror(-r) << dendl;
+      return r;
+    }
+    for (auto i = 0; i < num_shards; ++i) {
+      std::unique_ptr<rgw::cls::fifo::FIFO> fifo;
+      auto oid = get_oid(cct, i);
+      std::vector<rgw::cls::fifo::list_entry> log_entries;
+      bool more = false;
+      r = rgw::cls::fifo::FIFO::open(ioctx, oid,
+                                     &fifo, null_yield);
+      if (r == -ENOENT || r == -ENODATA) {
+        continue;
+      } else if (r < 0) {
+        lderr(cct) << __PRETTY_FUNCTION__
+                   << ": unable to open FIFO: " << log_pool << "/" << oid
+                   << ": " << cpp_strerror(-r) << dendl;
+        return r;
+      }
+      *exists = true;
+      r = fifo->list(1, std::nullopt, &log_entries, &more,
+                     null_yield);
+      if (r < 0) {
+        lderr(cct) << __PRETTY_FUNCTION__
+                   << ": unable to list entries: " << log_pool << "/" << oid
+                   << ": " << cpp_strerror(-r) << dendl;
+        // Report the failure instead of claiming the probe succeeded.
+        return r;
+      } else if (!log_entries.empty()) {
+        *has_entries = true;
+        break;
+      }
+    }
+    return 0;
+  }
+  // Append the raw buffer to `out`. If `out` currently holds the other
+  // alternative it must be empty and is switched to this backend's container
+  // type. The time and key arguments are unused by this backend.
+  void prepare(ceph::real_time, const std::string&,
+               ceph::buffer::list&& entry, entries& out) override {
+    if (!std::holds_alternative<centries>(out)) {
+      ceph_assert(std::visit([](auto& v) { return std::empty(v); }, out));
+      out = centries();
+    }
+    std::get<centries>(out).push_back(std::move(entry));
+  }
+  // Push a batch built by prepare() to shard `index`.
+  int push(int index, entries&& items) override {
+    auto r = fifos[index]->push(std::get<centries>(items), null_yield);
+    if (r < 0) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": unable to push to FIFO: " << get_oid(index)
+                 << ": " << cpp_strerror(-r) << dendl;
+    }
+    return r;
+  }
+  // Push a single buffer to shard `index`.
+  int push(int index, ceph::real_time,
+           const std::string&,
+           ceph::buffer::list&& bl) override {
+    auto r = fifos[index]->push(std::move(bl), null_yield);
+    if (r < 0) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": unable to push to FIFO: " << get_oid(index)
+                 << ": " << cpp_strerror(-r) << dendl;
+    }
+    return r;
+  }
+  // List up to `max_entries` entries of shard `index` starting at `marker`
+  // and append the decoded entries to `entries`. `*truncated` and
+  // `*out_marker` are written only when the respective pointer is non-null
+  // (and, for the marker, when something was listed). Returns -EIO when an
+  // entry fails to decode.
+  int list(int index, int max_entries,
+           std::vector<rgw_data_change_log_entry>& entries,
+           std::optional<std::string_view> marker,
+           std::string* out_marker, bool* truncated) override {
+    std::vector<rgw::cls::fifo::list_entry> log_entries;
+    bool more = false;
+    auto r = fifos[index]->list(max_entries, marker, &log_entries, &more,
+                                null_yield);
+    if (r < 0) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": unable to list FIFO: " << get_oid(index)
+                 << ": " << cpp_strerror(-r) << dendl;
+      return r;
+    }
+    for (const auto& entry : log_entries) {
+      rgw_data_change_log_entry log_entry;
+      log_entry.log_id = entry.marker;
+      log_entry.log_timestamp = entry.mtime;
+      auto liter = entry.data.cbegin();
+      try {
+        decode(log_entry.entry, liter);
+      } catch (const buffer::error& err) {
+        lderr(cct) << __PRETTY_FUNCTION__
+                   << ": failed to decode data changes log entry: "
+                   << err.what() << dendl;
+        return -EIO;
+      }
+      entries.push_back(std::move(log_entry));
+    }
+    if (truncated)
+      *truncated = more;
+    if (out_marker && !log_entries.empty()) {
+      *out_marker = log_entries.back().marker;
+    }
+    return 0;
+  }
+  // Fill `info` from the FIFO metadata of shard `index`: a default marker and
+  // zero time when the head part number is negative, otherwise the head
+  // part's last offset and max time.
+  int get_info(int index, RGWDataChangesLogInfo *info) override {
+    auto& fifo = fifos[index];
+    auto r = fifo->read_meta(null_yield);
+    if (r < 0) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": unable to get FIFO metadata: " << get_oid(index)
+                 << ": " << cpp_strerror(-r) << dendl;
+      return r;
+    }
+    auto m = fifo->meta();
+    auto p = m.head_part_num;
+    if (p < 0) {
+      info->marker = rgw::cls::fifo::marker{}.to_string();
+      info->last_update = ceph::real_clock::zero();
+      return 0;
+    }
+    rgw::cls::fifo::part_info h;
+    r = fifo->get_part_info(p, &h, null_yield);
+    if (r < 0) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": unable to get part info: " << get_oid(index) << "/" << p
+                 << ": " << cpp_strerror(-r) << dendl;
+      return r;
+    }
+    info->marker = rgw::cls::fifo::marker{p, h.last_ofs}.to_string();
+    info->last_update = h.max_time;
+    return 0;
+  }
+  // Trim shard `index` up to `marker`.
+  int trim(int index, std::string_view marker) override {
+    auto r = fifos[index]->trim(marker, null_yield);
+    if (r < 0) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": unable to trim FIFO: " << get_oid(index)
+                 << ": " << cpp_strerror(-r) << dendl;
+    }
+    return r;
+  }
+  // Trim shard `index` up to `marker`, signalling completion through `c`.
+  //
+  // When `marker` equals marker(0, 0) no FIFO operation is issued; instead
+  // the completion is finished by hand with result 0 by driving the
+  // internals behind `c->pc` directly.
+  int trim(int index, std::string_view marker,
+           librados::AioCompletion* c) override {
+    int r = 0;
+    if (marker == rgw::cls::fifo::marker(0, 0).to_string()) {
+      auto pc = c->pc;
+      pc->get();
+      pc->lock.lock();
+      pc->rval = 0;
+      pc->complete = true;
+      pc->lock.unlock();
+      auto cb_complete = pc->callback_complete;
+      auto cb_complete_arg = pc->callback_complete_arg;
+      if (cb_complete)
+        cb_complete(pc, cb_complete_arg);
+
+      auto cb_safe = pc->callback_safe;
+      auto cb_safe_arg = pc->callback_safe_arg;
+      if (cb_safe)
+        cb_safe(pc, cb_safe_arg);
+
+      pc->lock.lock();
+      pc->callback_complete = nullptr;
+      pc->callback_safe = nullptr;
+      pc->cond.notify_all();
+      pc->put_unlock();
+    } else {
+      r = fifos[index]->trim(marker, c);
+      if (r < 0) {
+        lderr(cct) << __PRETTY_FUNCTION__
+                   << ": unable to trim FIFO: " << get_oid(index)
+                   << ": " << cpp_strerror(-r) << dendl;
+      }
+    }
+    return r;
+  }
+  // Marker string this backend reports as its maximum; computed once and
+  // kept in a function-local static so the returned view stays valid.
+  std::string_view max_marker() const override {
+    static const std::string mm =
+      rgw::cls::fifo::marker::max().to_string();
+    return std::string_view(mm);
+  }
+};
+
+// Construct a log that has not been started yet: the shard count and the
+// size handed to `changes` are read from configuration. No backend is
+// created here.
+RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
+  : cct(cct),
+    num_shards(cct->_conf->rgw_data_log_num_shards),
+    changes(cct->_conf->rgw_data_log_changes_size) {}
+
+// Choose and instantiate the datalog backend, then start the renew thread.
+//
+// The `rgw_data_log_backing` option ("auto", "fifo" or "omap") is checked
+// against what already exists in the log pool:
+//  - a conflicting backend that holds entries is a hard error (-EINVAL);
+//  - conflicting but empty backends are removed first;
+//  - "auto" uses Omap when it already exists and FIFO otherwise.
+//
+// A failure while probing either backend aborts startup, because the choice
+// below would otherwise rest on unreliable existence flags.
+//
+// Returns 0 on success or a negative error code.
+int RGWDataChangesLog::start(const RGWZone* _zone,
+                             const RGWZoneParams& zoneparams,
+                             RGWSI_Cls *cls, librados::Rados* lr)
+{
+  zone = _zone;
+  // ceph_assert rather than assert, matching the other checks in this
+  // function.
+  ceph_assert(zone);
+  auto backing = cct->_conf.get_val<std::string>("rgw_data_log_backing");
+  // Should be guaranteed by `set_enum_allowed`
+  ceph_assert(backing == "auto" || backing == "fifo" || backing == "omap");
+  auto log_pool = zoneparams.log_pool;
+  bool omapexists = false, omaphasentries = false;
+  auto r = RGWDataChangesOmap::exists(cct, *cls, &omapexists, &omaphasentries);
+  if (r < 0) {
+    lderr(cct) << __PRETTY_FUNCTION__
+               << ": Error when checking for existing Omap datalog backend: "
+               << cpp_strerror(-r) << dendl;
+    return r;
+  }
+  bool fifoexists = false, fifohasentries = false;
+  r = RGWDataChangesFIFO::exists(cct, lr, log_pool, &fifoexists, &fifohasentries);
+  if (r < 0) {
+    lderr(cct) << __PRETTY_FUNCTION__
+               << ": Error when checking for existing FIFO datalog backend: "
+               << cpp_strerror(-r) << dendl;
+    return r;
+  }
+  bool has_entries = omaphasentries || fifohasentries;
+  bool remove = false;
+
+  if (omapexists && fifoexists) {
+    if (has_entries) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": Both Omap and FIFO backends exist, cannot continue."
+                 << dendl;
+      return -EINVAL;
+    }
+    ldout(cct, 0)
+      << __PRETTY_FUNCTION__
+      << ": Both Omap and FIFO backends exist, but are empty. Will remove."
+      << dendl;
+    remove = true;
+  }
+  if (backing == "omap" && fifoexists) {
+    if (has_entries) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": Omap requested, but FIFO backend exists, cannot continue."
+                 << dendl;
+      return -EINVAL;
+    }
+    ldout(cct, 0) << __PRETTY_FUNCTION__
+                  << ": Omap requested, FIFO exists, but is empty. Deleting."
+                  << dendl;
+    remove = true;
+  }
+  if (backing == "fifo" && omapexists) {
+    if (has_entries) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": FIFO requested, but Omap backend exists, cannot continue."
+                 << dendl;
+      return -EINVAL;
+    }
+    ldout(cct, 0) << __PRETTY_FUNCTION__
+                  << ": FIFO requested, Omap exists, but is empty. Deleting."
+                  << dendl;
+    remove = true;
+  }
+
+  if (remove) {
+    r = RGWDataChangesBE::remove(cct, lr, log_pool);
+    if (r < 0) {
+      lderr(cct) << __PRETTY_FUNCTION__
+                 << ": remove failed, cannot continue."
+                 << dendl;
+      return r;
+    }
+    omapexists = false;
+    fifoexists = false;
+  }
+
+  try {
+    if (backing == "omap" || (backing == "auto" && omapexists)) {
+      be = std::make_unique<RGWDataChangesOmap>(cct, *cls);
+    } else {
+      // Either "fifo" was requested, or "auto" with no existing Omap log.
+      be = std::make_unique<RGWDataChangesFIFO>(cct, lr, log_pool);
+    }
+  } catch (bs::system_error& e) {
+    lderr(cct) << __PRETTY_FUNCTION__
+               << ": Error when starting backend: "
+               << e.what() << dendl;
+    return ceph::from_error_code(e.code());
+  }
+
+  ceph_assert(be);
+  renew_thread = make_named_thread("rgw_dt_lg_renew",
+                                   &RGWDataChangesLog::renew_run, this);
+  return 0;
+}
+
+// Map a bucket shard to a datalog shard index: hash of the bucket name plus
+// the (non-negative) bucket shard id, modulo the number of datalog shards.
+int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
+  const auto& bucket_name = bs.bucket.name;
+  const auto hash = ceph_str_hash_linux(bucket_name.data(),
+                                        bucket_name.size());
+  // A negative shard id contributes nothing to the hash.
+  const auto shift = (bs.shard_id > 0 ? bs.shard_id : 0);
+  return static_cast<int>((hash + shift) % num_shards);
+}
+
+// Re-log every bucket shard registered during the current cycle, one batch
+// per datalog shard, and extend the cached expiration of each shard that was
+// written. Does nothing when the zone does not log data. Stops at, and
+// returns, the first push error.
+int RGWDataChangesLog::renew_entries()
+{
+  if (!zone->log_data) {
+    return 0;
+  }
+
+  /* we can't keep the bucket name as part of the cls_log_entry, and we need
+   * it later, so we keep two lists under the map */
+  bc::flat_map<int, std::pair<std::vector<rgw_bucket_shard>,
+                              RGWDataChangesBE::entries>> by_shard;
+
+  // Take over the current cycle's set so the lock is held only for the swap.
+  decltype(cur_cycle) pending;
+  {
+    std::unique_lock l(lock);
+    pending.swap(cur_cycle);
+  }
+
+  auto ut = real_clock::now();
+  for (const auto& bs : pending) {
+    rgw_data_change change;
+    change.entity_type = ENTITY_TYPE_BUCKET;
+    change.key = bs.get_key();
+    change.timestamp = ut;
+    bufferlist bl;
+    encode(change, bl);
+
+    auto& slot = by_shard[choose_oid(bs)];
+    slot.first.push_back(bs);
+    be->prepare(ut, change.key, std::move(bl), slot.second);
+  }
+
+  for (auto& [index, p] : by_shard) {
+    auto& [buckets, batch] = p;
+
+    // Sampled before the push: the window is counted from when it started.
+    auto now = real_clock::now();
+
+    auto ret = be->push(index, std::move(batch));
+    if (ret < 0) {
+      /* we don't really need to have a special handling for failed cases here,
+       * as this is just an optimization. */
+      lderr(cct) << "ERROR: svc.cls->timelog.add() returned " << ret << dendl;
+      return ret;
+    }
+
+    auto expiration = now;
+    expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
+    for (auto& bs : buckets) {
+      update_renewed(bs, expiration);
+    }
+  }
+
+  return 0;
+}
+
+// Look up the status record for `bs` in `changes`, creating and registering
+// a fresh one when none is cached. The caller must hold `lock`.
+void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
+                                    ChangeStatusPtr& status)
+{
+  ceph_assert(ceph_mutex_is_locked(lock));
+  if (changes.find(bs, status)) {
+    return;
+  }
+  status = std::make_shared<ChangeStatus>();
+  changes.add(bs, status);
+}
+
+// Add `bs` to the set of bucket shards handled by the next renew_entries().
+void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs)
+{
+  std::lock_guard guard{lock};
+  cur_cycle.insert(bs);
+}
+
+// Store `expiration` as the current expiration in the status record of `bs`,
+// creating the record if needed.
+void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
+                                       real_time expiration)
+{
+  std::lock_guard guard{lock};
+  ChangeStatusPtr entry;
+  _get_change(bs, entry);
+
+  ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name="
+                 << bs.bucket.name << " shard_id=" << bs.shard_id
+                 << " expiration=" << expiration << dendl;
+  entry->cur_expiration = expiration;
+}
+
+// Datalog shard index for shard `shard_id` of `bucket`; see choose_oid().
+int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
+  return choose_oid(rgw_bucket_shard(bucket, shard_id));
+}
+
+// True when no bucket filter is installed, otherwise whatever the installed
+// filter returns for `bucket`.
+bool RGWDataChangesLog::filter_bucket(const rgw_bucket& bucket,
+                                      optional_yield y) const
+{
+  if (bucket_filter) {
+    return bucket_filter(bucket, y);
+  }
+  return true;
+}
+
+// Object id of datalog shard `i`, as named by the active backend.
+std::string RGWDataChangesLog::get_oid(int i) const {
+  auto oid = be->get_oid(i);
+  return oid;
+}
+
+// Record that shard `shard_id` of the given bucket changed.
+//
+// Returns 0 without writing when the bucket filter rejects the bucket, or
+// when the cached expiration for this bucket shard has not passed yet (the
+// shard is then only registered for the next renew cycle). If another caller
+// is already pushing an entry for the same bucket shard, waits for it and
+// returns its result. Otherwise pushes an entry to the backend and returns
+// the result of that push.
+int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
+  auto& bucket = bucket_info.bucket;
+
+  if (!filter_bucket(bucket, null_yield)) {
+    return 0;
+  }
+
+  if (observer) {
+    observer->on_bucket_changed(bucket.get_key());
+  }
+
+  rgw_bucket_shard bs(bucket, shard_id);
+
+  int index = choose_oid(bs);
+  mark_modified(index, bs);
+
+  // `lock` only protects the lookup/creation of the status record; the
+  // record itself is guarded by its own lock below.
+  std::unique_lock l(lock);
+
+  ChangeStatusPtr status;
+  _get_change(bs, status);
+  l.unlock();
+
+  auto now = real_clock::now();
+
+  std::unique_lock sl(status->lock);
+
+  ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name
+                << " shard_id=" << shard_id << " now=" << now
+                << " cur_expiration=" << status->cur_expiration << dendl;
+
+  if (now < status->cur_expiration) {
+    /* no need to send, recently completed */
+    sl.unlock();
+    register_renew(bs);
+    return 0;
+  }
+
+  RefCountedCond* cond;
+
+  if (status->pending) {
+    // Someone else is already pushing for this bucket shard: take a
+    // reference on their condition, drop the lock and wait for the outcome.
+    cond = status->cond;
+
+    ceph_assert(cond);
+
+    status->cond->get();
+    sl.unlock();
+
+    int ret = cond->wait();
+    cond->put();
+    if (!ret) {
+      register_renew(bs);
+    }
+    return ret;
+  }
+
+  // This caller becomes the one doing the push; later callers will find
+  // `pending` set and wait on `cond`.
+  status->cond = new RefCountedCond;
+  status->pending = true;
+
+  ceph::real_time expiration;
+
+  int ret;
+
+  // Push again for as long as the push succeeds but the current time has
+  // already passed the expiration computed before it.
+  do {
+    status->cur_sent = now;
+
+    expiration = now;
+    expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
+
+    // The status lock is not held across the backend call.
+    sl.unlock();
+
+    ceph::buffer::list bl;
+    rgw_data_change change;
+    change.entity_type = ENTITY_TYPE_BUCKET;
+    change.key = bs.get_key();
+    change.timestamp = now;
+    encode(change, bl);
+
+    ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
+
+    ret = be->push(index, now, change.key, std::move(bl));
+
+    now = real_clock::now();
+
+    sl.lock();
+
+  } while (!ret && real_clock::now() > expiration);
+
+  cond = status->cond;
+
+  status->pending = false;
+  /* time of when operation started, not completed */
+  status->cur_expiration = status->cur_sent;
+  status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
+  status->cond = nullptr;
+  sl.unlock();
+
+  // Hand the result to the callers waiting on the condition, then drop this
+  // function's reference to it.
+  cond->done(ret);
+  cond->put();
+
+  return ret;
+}
+
+// List entries of a single datalog shard through the active backend. An
+// absent `marker` is forwarded as the empty string.
+int RGWDataChangesLog::list_entries(int shard, int max_entries,
+                                    std::vector<rgw_data_change_log_entry>& entries,
+                                    std::optional<std::string_view> marker,
+                                    std::string* out_marker, bool* truncated)
+{
+  assert(shard < num_shards);
+  const std::string start(marker.value_or(""));
+  return be->list(shard, max_entries, entries, start,
+                  out_marker, truncated);
+}
+
+// List up to `max_entries` entries across shards, starting at the shard and
+// marker held in `marker`.
+//
+// `entries` is cleared first. Shards are visited in order; moving on to the
+// next shard increments `marker.shard` and resets `marker.marker`. A shard
+// that reports -ENOENT is skipped. `*ptruncated` is set to true when a shard
+// reported more entries or when not all shards were visited.
+//
+// Returns 0 on success or the first other negative error code.
+int RGWDataChangesLog::list_entries(int max_entries,
+                                    std::vector<rgw_data_change_log_entry>& entries,
+                                    LogMarker& marker, bool *ptruncated)
+{
+  // Initialised so the check below never reads an indeterminate value should
+  // a backend return success without writing it.
+  bool truncated = false;
+  entries.clear();
+  for (; marker.shard < num_shards && int(entries.size()) < max_entries;
+       marker.shard++, marker.marker.reset()) {
+    int ret = list_entries(marker.shard, max_entries - entries.size(),
+                           entries, marker.marker, nullptr, &truncated);
+    if (ret == -ENOENT) {
+      continue;
+    }
+    if (ret < 0) {
+      return ret;
+    }
+    if (truncated) {
+      *ptruncated = true;
+      return 0;
+    }
+  }
+  *ptruncated = (marker.shard < num_shards);
+  return 0;
+}
+
+// Fetch marker and last-update information for one shard from the backend.
+int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
+{
+  assert(shard_id < num_shards);
+  const int r = be->get_info(shard_id, info);
+  return r;
+}
+
+// Trim one shard up to `marker` through the backend.
+int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker)
+{
+  assert(shard_id < num_shards);
+  const int r = be->trim(shard_id, marker);
+  return r;
+}
+
+// Trim one shard up to `marker` through the backend, passing it the
+// completion `c`.
+int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker,
+                                    librados::AioCompletion* c)
+{
+  assert(shard_id < num_shards);
+  const int r = be->trim(shard_id, marker, c);
+  return r;
+}
+
+// True once the destructor has set the shutdown flag.
+bool RGWDataChangesLog::going_down() const
+{
+  return down_flag.load();
+}
+
+// Set the shutdown flag and, if the renew thread was started, wake it and
+// wait for it to exit.
+RGWDataChangesLog::~RGWDataChangesLog() {
+  down_flag = true;
+  if (!renew_thread.joinable()) {
+    return;
+  }
+  renew_stop();
+  renew_thread.join();
+}
+
+// Body of the renew thread: call renew_entries(), then sleep for three
+// quarters of rgw_data_log_window (or until woken), until shutdown is
+// requested.
+//
+// The shutdown flag is tested while holding `renew_lock`. The destructor sets
+// the flag and then calls renew_stop(), which takes the same lock before
+// notifying. So the flag is either seen here, or the notification arrives
+// only after this thread has started waiting. Testing the flag before taking
+// the lock could lose that notification and stall shutdown for a full
+// interval.
+void RGWDataChangesLog::renew_run() {
+  for (;;) {
+    dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
+    int r = renew_entries();
+    if (r < 0) {
+      dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
+    }
+
+    int interval = cct->_conf->rgw_data_log_window * 3 / 4;
+    std::unique_lock locker{renew_lock};
+    if (going_down())
+      break;
+    renew_cond.wait_for(locker, std::chrono::seconds(interval));
+  }
+}
+
+// Wake the renew thread if it is sleeping between cycles.
+void RGWDataChangesLog::renew_stop()
+{
+  std::scoped_lock guard{renew_lock};
+  renew_cond.notify_all();
+}
+
+// Remember that the key of `bs` was modified under datalog shard `shard_id`.
+// The common case, where the key is already recorded, takes only the shared
+// lock.
+void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
+{
+  auto key = bs.get_key();
+
+  // Existence check under the read lock.
+  const auto already_recorded = [&] {
+    std::shared_lock rl{modified_lock};
+    auto it = modified_shards.find(shard_id);
+    return it != modified_shards.end() && it->second.count(key);
+  };
+  if (already_recorded()) {
+    return;
+  }
+
+  // Insertion under the write lock.
+  std::unique_lock wl{modified_lock};
+  modified_shards[shard_id].insert(key);
+}
+
+// Maximum marker as reported by the active backend.
+std::string_view RGWDataChangesLog::max_marker() const {
+  const std::string_view mm = be->max_marker();
+  return mm;
+}
diff --git a/src/rgw/rgw_datalog.h b/src/rgw/rgw_datalog.h
new file mode 100644 (file)
index 0000000..5440b3d
--- /dev/null
@@ -0,0 +1,260 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#ifndef CEPH_RGW_DATALOG_H
+#define CEPH_RGW_DATALOG_H
+
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <variant>
+#include <vector>
+
+#include <boost/container/flat_map.hpp>
+
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include <fmt/format.h>
+
+#include "include/buffer.h"
+#include "include/encoding.h"
+
+#include "include/rados/librados.hpp"
+
+#include "common/ceph_context.h"
+#include "common/ceph_json.h"
+#include "common/ceph_time.h"
+#include "common/Formatter.h"
+#include "common/lru_map.h"
+#include "common/RefCountedObj.h"
+
+#include "cls/log/cls_log_types.h"
+
+#include "rgw_basic_types.h"
+#include "rgw_sync_policy.h"
+#include "rgw_zone.h"
+#include "rgw_trim_bilog.h"
+
+#include "services/svc_cls.h"
+
+namespace bc = boost::container;
+
+// Kind of entity a datalog change refers to. The numeric values are what
+// rgw_data_change::encode writes (as a uint8_t).
+enum DataLogEntityType {
+  ENTITY_TYPE_UNKNOWN = 0,
+  ENTITY_TYPE_BUCKET = 1,
+};
+
+// A single recorded change: which entity changed, identified by `key`, and
+// when.
+struct rgw_data_change {
+  // Initialised so that dumping or encoding a default-constructed object
+  // never reads an indeterminate value.
+  DataLogEntityType entity_type = ENTITY_TYPE_UNKNOWN;
+  std::string key;
+  ceph::real_time timestamp;
+
+  // Versioned encoding; the entity type is written as a single byte.
+  void encode(ceph::buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    auto t = std::uint8_t(entity_type);
+    encode(t, bl);
+    encode(key, bl);
+    encode(timestamp, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  // Inverse of encode(); fields are read in the order they were written.
+  void decode(bufferlist::const_iterator& bl) {
+     DECODE_START(1, bl);
+     std::uint8_t t;
+     decode(t, bl);
+     entity_type = DataLogEntityType(t);
+     decode(key, bl);
+     decode(timestamp, bl);
+     DECODE_FINISH(bl);
+  }
+
+  void dump(ceph::Formatter* f) const;
+  void decode_json(JSONObj* obj);
+};
+WRITE_CLASS_ENCODER(rgw_data_change)
+
+// A change as read back from the log: the backend-assigned id and timestamp
+// together with the decoded change itself.
+struct rgw_data_change_log_entry {
+  std::string log_id;
+  ceph::real_time log_timestamp;
+  rgw_data_change entry;
+
+  // Versioned encoding of the three members, in declaration order.
+  void encode(ceph::buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(log_id, bl);
+    encode(log_timestamp, bl);
+    encode(entry, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  // Inverse of encode(); fields are read in the order they were written.
+  void decode(ceph::buffer::list::const_iterator& bl) {
+     DECODE_START(1, bl);
+     decode(log_id, bl);
+     decode(log_timestamp, bl);
+     decode(entry, bl);
+     DECODE_FINISH(bl);
+  }
+
+  void dump(ceph::Formatter* f) const;
+  void decode_json(JSONObj* obj);
+};
+WRITE_CLASS_ENCODER(rgw_data_change_log_entry)
+
+// Per-shard summary filled in by the backends' get_info(): a marker string
+// and a last-update time.
+struct RGWDataChangesLogInfo {
+  std::string marker;
+  ceph::real_time last_update;
+
+  void dump(ceph::Formatter* f) const;
+  void decode_json(JSONObj* obj);
+};
+
+// Position for listing across all shards: the shard to read next and an
+// optional marker within that shard.
+struct RGWDataChangesLogMarker {
+  int shard = 0;
+  std::optional<std::string> marker;
+
+  RGWDataChangesLogMarker() = default;
+};
+
+// Abstract interface of a datalog backend. A backend stores the entries of
+// every shard in objects named "<prefix>.<shard>".
+class RGWDataChangesBE {
+protected:
+  CephContext* const cct;
+private:
+  // Object name prefix captured at construction time.
+  std::string prefix;
+  // Configured object prefix, or "data_log" when the option is empty.
+  static std::string_view get_prefix(CephContext* cct) {
+    std::string_view prefix = cct->_conf->rgw_data_log_obj_prefix;
+    if (prefix.empty()) {
+      prefix = "data_log"sv;
+    }
+    return prefix;
+  }
+public:
+  // Batch container handed between prepare() and push(); each backend uses
+  // exactly one of the alternatives.
+  using entries = std::variant<std::list<cls_log_entry>,
+                               std::vector<ceph::buffer::list>>;
+
+  explicit RGWDataChangesBE(CephContext* const cct)
+    : cct(cct), prefix(get_prefix(cct)) {}
+  virtual ~RGWDataChangesBE() = default;
+
+  // Object id of shard `i`, computed from the current configuration.
+  static std::string get_oid(CephContext* cct, int i) {
+    return fmt::format("{}.{}", get_prefix(cct), i);
+  }
+  // Object id of shard `i`, using the prefix captured at construction.
+  // Const, since it only reads `prefix`.
+  std::string get_oid(int i) const {
+    return fmt::format("{}.{}", prefix, i);
+  }
+  // Remove the shard objects from `log_pool`.
+  static int remove(CephContext* cct, librados::Rados* rados,
+                    const rgw_pool& log_pool);
+
+
+  // Append one entry to the batch `out`.
+  virtual void prepare(ceph::real_time now,
+                       const std::string& key,
+                       ceph::buffer::list&& entry,
+                       entries& out) = 0;
+  // Write a batch built by prepare() to shard `index`.
+  virtual int push(int index, entries&& items) = 0;
+  // Write a single entry to shard `index`.
+  virtual int push(int index, ceph::real_time now,
+                   const std::string& key,
+                   ceph::buffer::list&& bl) = 0;
+  // Append up to `max_entries` entries of `shard`, starting at `marker`, to
+  // `entries`.
+  virtual int list(int shard, int max_entries,
+                   std::vector<rgw_data_change_log_entry>& entries,
+                   std::optional<std::string_view> marker,
+                   std::string* out_marker, bool* truncated) = 0;
+  // Fill `info` for shard `index`.
+  virtual int get_info(int index, RGWDataChangesLogInfo *info) = 0;
+  // Trim shard `index` up to `marker`.
+  virtual int trim(int index, std::string_view marker) = 0;
+  // Trim shard `index` up to `marker`, using the completion `c`.
+  virtual int trim(int index, std::string_view marker,
+                   librados::AioCompletion* c) = 0;
+  // Marker string the backend reports as its maximum.
+  virtual std::string_view max_marker() const = 0;
+};
+
+// Front end of the RGW data changes log. Owns the selected backing store
+// (`be`) and exposes per-shard add/list/trim/info operations plus the set of
+// recently modified shards consumed through read_clear_modified().
+class RGWDataChangesLog {
+  CephContext *cct;
+  rgw::BucketChangeObserver *observer = nullptr;
+  const RGWZone* zone;
+  // Backing store that performs the actual log I/O.
+  std::unique_ptr<RGWDataChangesBE> be;
+
+  const int num_shards;
+
+  ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock");
+  // Held exclusively by read_clear_modified() while it drains
+  // modified_shards.
+  ceph::shared_mutex modified_lock =
+    ceph::make_shared_mutex("RGWDataChangesLog::modified_lock");
+  // Shard id -> set of keys recorded for that shard.
+  bc::flat_map<int, bc::flat_set<std::string>> modified_shards;
+
+  std::atomic<bool> down_flag{false};
+
+  // Per-bucket-shard bookkeeping kept in the `changes` LRU map.
+  struct ChangeStatus {
+    std::shared_ptr<const rgw_sync_policy_info> sync_policy;
+    ceph::real_time cur_expiration;
+    ceph::real_time cur_sent;
+    bool pending = false;
+    RefCountedCond* cond = nullptr;
+    ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::ChangeStatus");
+  };
+
+  using ChangeStatusPtr = std::shared_ptr<ChangeStatus>;
+
+  lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
+
+  bc::flat_set<rgw_bucket_shard> cur_cycle;
+
+  void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
+  void register_renew(const rgw_bucket_shard& bs);
+  void update_renewed(const rgw_bucket_shard& bs, ceph::real_time expiration);
+
+  // Renew thread and the mutex/condition variable declared for it.
+  ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock");
+  ceph::condition_variable renew_cond;
+  void renew_run();
+  void renew_stop();
+  std::thread renew_thread;
+
+  // Optional predicate installed through set_bucket_filter().
+  std::function<bool(const rgw_bucket& bucket, optional_yield y)> bucket_filter;
+  int choose_oid(const rgw_bucket_shard& bs);
+  bool going_down() const;
+  bool filter_bucket(const rgw_bucket& bucket, optional_yield y) const;
+  int renew_entries();
+
+public:
+
+  RGWDataChangesLog(CephContext* cct);
+  ~RGWDataChangesLog();
+
+  int start(const RGWZone* _zone, const RGWZoneParams& zoneparams,
+            RGWSI_Cls *cls_svc, librados::Rados* lr);
+
+  int add_entry(const RGWBucketInfo& bucket_info, int shard_id);
+  int get_log_shard_id(rgw_bucket& bucket, int shard_id);
+  // Single-shard listing.
+  int list_entries(int shard, int max_entries,
+                   std::vector<rgw_data_change_log_entry>& entries,
+                   std::optional<std::string_view> marker,
+                   std::string* out_marker, bool* truncated);
+  int trim_entries(int shard_id, std::string_view marker);
+  int trim_entries(int shard_id, std::string_view marker,
+                   librados::AioCompletion* c); // :(
+  int get_info(int shard_id, RGWDataChangesLogInfo *info);
+
+  using LogMarker = RGWDataChangesLogMarker;
+
+  // Listing driven by a LogMarker (shard index + optional marker) instead of
+  // an explicit shard argument.
+  int list_entries(int max_entries,
+                   std::vector<rgw_data_change_log_entry>& entries,
+                   LogMarker& marker, bool* ptruncated);
+
+  void mark_modified(int shard_id, const rgw_bucket_shard& bs);
+
+  // Hands the accumulated modified_shards map to the caller and leaves the
+  // member empty. The exchange happens under an exclusive hold of
+  // modified_lock.
+  auto read_clear_modified() {
+    decltype(modified_shards) drained;
+    {
+      std::unique_lock wl{modified_lock};
+      // Swapping with a freshly constructed (empty) map both extracts the
+      // current contents and resets the member.
+      drained.swap(modified_shards);
+    }
+    return drained;
+  }
+
+  void set_observer(rgw::BucketChangeObserver *observer) {
+    this->observer = observer;
+  }
+
+  void set_bucket_filter(decltype(bucket_filter)&& f) {
+    bucket_filter = std::move(f);
+  }
+  // a marker that compares greater than any other
+  std::string_view max_marker() const;
+  std::string get_oid(int shard_id) const;
+};
+
+#endif
index 7d1f8d03762843cccc13386e33fc3d1f1bfcc22f..7fc619186af48985b6ddbc226d7a3f45c8f058c7 100644 (file)
@@ -8,6 +8,7 @@
 #include "rgw_acl_s3.h"
 #include "rgw_cache.h"
 #include "rgw_bucket.h"
+#include "rgw_datalog.h"
 #include "rgw_keystone.h"
 #include "rgw_basic_types.h"
 #include "rgw_op.h"
index edd506f3a7dea311a232aae7a71b390e75ac6ee6..2be91a7266a4ed22e878358b6c0fb5da8a6010a2 100644 (file)
@@ -31,6 +31,7 @@
 #include "rgw_rest_conn.h"
 #include "rgw_cr_rados.h"
 #include "rgw_cr_rest.h"
+#include "rgw_datalog.h"
 #include "rgw_putobj_processor.h"
 
 #include "cls/rgw/cls_rgw_ops.h"
@@ -327,7 +328,8 @@ public:
     http_manager.start();
   }
 
-  int notify_all(map<rgw_zone_id, RGWRESTConn *>& conn_map, map<int, set<string> >& shards) {
+  int notify_all(map<rgw_zone_id, RGWRESTConn *>& conn_map,
+                bc::flat_map<int, bc::flat_set<string> >& shards) {
     rgw_http_param_pair pairs[] = { { "type", "data" },
                                     { "notify", NULL },
                                     { "source-zone", store->svc.zone->get_zone_params().get_id().c_str() },
@@ -337,7 +339,7 @@ public:
     for (auto iter = conn_map.begin(); iter != conn_map.end(); ++iter) {
       RGWRESTConn *conn = iter->second;
       RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), this);
-      stack->call(new RGWPostRESTResourceCR<map<int, set<string> >, int>(store->ctx(), conn, &http_manager, "/admin/log", pairs, shards, NULL));
+      stack->call(new RGWPostRESTResourceCR<bc::flat_map<int, bc::flat_set<string> >, int>(store->ctx(), conn, &http_manager, "/admin/log", pairs, shards, NULL));
 
       stacks.push_back(stack);
     }
@@ -459,16 +461,15 @@ int RGWDataNotifier::process()
     return 0;
   }
 
-  map<int, set<string> > shards;
-
-  data_log->read_clear_modified(shards);
+  auto shards = data_log->read_clear_modified();
 
   if (shards.empty()) {
     return 0;
   }
 
-  for (map<int, set<string> >::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
-    ldout(cct, 20) << __func__ << "(): notifying datalog change, shard_id=" << iter->first << ": " << iter->second << dendl;
+  for (const auto& [shard_id, keys] : shards) {
+    ldout(cct, 20) << __func__ << "(): notifying datalog change, shard_id="
+                  << shard_id << ": " << keys << dendl;
   }
 
   notify_mgr.notify_all(store->svc.zone->get_zone_data_notify_to_map(), shards);
index 2642c95fc040e9b9debe1244097b74fe22d20bb2..39c3846ea0395a3b080982886f181fed13275d2e 100644 (file)
@@ -15,6 +15,7 @@
 
 #pragma once
 
+#include "rgw_datalog.h"
 #include "rgw_rest.h"
 #include "rgw_rest_s3.h"
 #include "rgw_metadata.h"
index 6980b0403071cefb4be77c6b7a99cea281a9e917..fd98711ca6fa59b988d14695538760a11a6cafc4 100644 (file)
 
 #include "common/errno.h"
 
-#include "rgw_metadata.h"
-#include "rgw_user.h"
 #include "rgw_bucket.h"
+#include "rgw_datalog.h"
+#include "rgw_metadata.h"
 #include "rgw_otp.h"
+#include "rgw_user.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -89,7 +90,6 @@ int RGWServices_Def::init(CephContext *cct,
                          bucket_sobj.get());
   cls->init(zone.get(), rados.get());
   config_key_rados->init(rados.get());
-  datalog_rados->init(cls.get());
   mdlog->init(rados.get(), zone.get(), sysobj.get(), cls.get());
   meta->init(sysobj.get(), mdlog.get(), meta_bes);
   meta_be_sobj->init(sysobj.get(), mdlog.get());
@@ -140,9 +140,11 @@ int RGWServices_Def::init(CephContext *cct,
       return r;
     }
 
-    r = datalog_rados->start(&zone->get_zone());
+    r = datalog_rados->start(&zone->get_zone(),
+                            zone->get_zone_params(), cls.get(),
+                            rados->get_rados_handle());
     if (r < 0) {
-      ldout(cct, 0) << "ERROR: failed to start datalog service (" << cpp_strerror(-r) << dendl;
+      ldout(cct, 0) << "ERROR: failed to start datalog_rados service (" << cpp_strerror(-r) << dendl;
       return r;
     }
 
index 0826c05c147e24a7b4d913ce7605e955e0edc28d..5231ab3f4dc9157953851d5883d2d6ed48c47c91 100644 (file)
@@ -16,6 +16,7 @@
 #pragma once
 
 #include "rgw_basic_types.h"
+#include "rgw_tag.h"
 
 
 struct rgw_sync_symmetric_group {
index 26d2cb258964d9da6d8b9960cde9a920b771a742..be84c136ec06e96bf5c7735c27927d30ed5c3258 100644 (file)
@@ -4,9 +4,12 @@
 #include <vector>
 #include <string>
 
+#include "common/errno.h"
+
 #include "rgw_trim_datalog.h"
 #include "rgw_cr_rados.h"
 #include "rgw_cr_rest.h"
+#include "rgw_datalog.h"
 #include "rgw_data_sync.h"
 #include "rgw_zone.h"
 #include "rgw_bucket.h"
@@ -46,6 +49,10 @@ class DatalogTrimImplCR : public RGWSimpleCoroutine {
   }
   int request_complete() override {
     int r = cn->completion()->get_return_value();
+    ldout(cct, 0) << __PRETTY_FUNCTION__ << "(): trim of shard=" << shard
+                 << " marker=" << marker << "failed with r=" << r
+                 << ", " << cpp_strerror(r) << dendl;
+
     set_status() << "request complete; ret=" << r;
     if (r != -ENODATA) {
       return r;
index 6a154f7a4d1b9ee07116da1f28f3b15c1aff8945..4e58438d50293cc6e72a5c1eb8dc02b02754770c 100644 (file)
@@ -7,6 +7,7 @@
 
 #include "rgw/rgw_bucket.h"
 #include "rgw/rgw_zone.h"
+#include "rgw/rgw_datalog.h"
 
 #include "cls/rgw/cls_rgw_client.h"
 
index 932affc74e7ed85db947c3df808838fe9d05c6b5..b25f7449275127fd80e28ed807ece740cf70c669 100644 (file)
@@ -16,6 +16,7 @@
 
 #pragma once
 
+#include "rgw/rgw_datalog.h"
 #include "rgw/rgw_service.h"
 #include "rgw/rgw_tools.h"
 
@@ -25,7 +26,6 @@
 struct rgw_bucket_dir_header;
 
 class RGWSI_BILog_RADOS;
-class RGWDataChangesLog;
 
 #define RGW_NO_SHARD -1
 
index 1d0941501f82f4c98bedb34e67aac99964806268..88a2b473a716dc85a3a4fcea0a12f26006f35f68 100644 (file)
@@ -51,7 +51,6 @@ public:
   };
 
 private:
-  librados::Rados* get_rados_handle();
   int open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx,
                     const OpenParams& params = {});
   int pool_iterate(librados::IoCtx& ioctx,
@@ -63,6 +62,7 @@ private:
 public:
   RGWSI_RADOS(CephContext *cct);
   ~RGWSI_RADOS();
+  librados::Rados* get_rados_handle();
 
   void init() {}
   void shutdown() override;
index cb24bdda802ce1412b324cde6de3f3aaac395b10..2238c8097bb82860289b7655fc7d2f25df40882b 100644 (file)
@@ -33,6 +33,7 @@ extern "C"{
 #include "common/Finisher.h"
 #include "global/global_init.h"
 #include "rgw/rgw_common.h"
+#include "rgw/rgw_datalog.h"
 #include "rgw/rgw_mdlog.h"
 #include "rgw/rgw_bucket.h"
 #include "rgw/rgw_rados.h"