From af522e06f9df183205bb5e4736b1c80aa5724aa7 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Fri, 5 Jun 2020 19:28:29 -0400 Subject: [PATCH] rgw: Fix async caller into Datalog Signed-off-by: Adam C. Emerson --- src/rgw/rgw_bucket.cc | 6 ++++- src/rgw/rgw_bucket.h | 14 ++++++------ src/rgw/rgw_cr_rados.h | 1 - src/rgw/rgw_trim_datalog.cc | 45 +++++++++++++++++++++++++++++++++---- src/rgw/rgw_trim_datalog.h | 3 +++ 5 files changed, 56 insertions(+), 13 deletions(-) diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 32f299efbc164..100c2b94e69fa 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -2367,7 +2367,7 @@ int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker, {}, std::string(marker), c, null_yield); } -bool RGWDataChangesLog::going_down() +bool RGWDataChangesLog::going_down() const { return down_flag; } @@ -2426,6 +2426,10 @@ void RGWDataChangesLog::read_clear_modified(map > &modified) modified_shards.clear(); } +std::string_view RGWDataChangesLog::max_marker() const { + return "99999999"sv; +} + void RGWBucketCompleteInfo::dump(Formatter *f) const { encode_json("bucket_info", info, f); encode_json("attrs", attrs, f); diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 127643187cecc..597707e46677e 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -536,6 +536,10 @@ class RGWDataChangesLog { std::thread renew_thread; std::function bucket_filter; + int choose_oid(const rgw_bucket_shard& bs); + bool going_down() const; + bool filter_bucket(const rgw_bucket& bucket, optional_yield y) const; + int renew_entries(); public: @@ -545,11 +549,8 @@ public: void init(RGWSI_Cls *cls_svc); int start(const RGWZone* _zone); - int choose_oid(const rgw_bucket_shard& bs); - std::string get_oid(int shard_id) const; int add_entry(const RGWBucketInfo& bucket_info, int shard_id); int get_log_shard_id(rgw_bucket& bucket, int shard_id); - int renew_entries(); int list_entries(int shard, int max_entries, std::vector& entries, std::optional marker, @@ -572,13 +573,12 @@ public: this->observer = observer; } - bool going_down(); - void set_bucket_filter(decltype(bucket_filter)&& f) { bucket_filter = std::move(f); } - - bool filter_bucket(const rgw_bucket& bucket, optional_yield y) const; + // a marker that compares greater than any other + std::string_view max_marker() const; + std::string get_oid(int shard_id) const; }; struct rgw_ep_info { diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index 0e8a94154f0e5..0f837d2007fb3 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -1342,7 +1342,6 @@ class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR { CephContext *cct; std::string *last_trim_marker; public: - // a marker that compares greater than any timestamp-based index static constexpr const char* max_marker = "99999999"; RGWSyncLogTrimCR(rgw::sal::RGWRadosStore *store, const std::string& oid, diff --git a/src/rgw/rgw_trim_datalog.cc b/src/rgw/rgw_trim_datalog.cc index f5ab685e8147a..26d2cb258964d 100644 --- a/src/rgw/rgw_trim_datalog.cc +++ b/src/rgw/rgw_trim_datalog.cc @@ -22,6 +22,43 @@ namespace { +class DatalogTrimImplCR : public RGWSimpleCoroutine { + rgw::sal::RGWRadosStore *store; + boost::intrusive_ptr cn; + int shard; + std::string marker; + std::string* last_trim_marker; + + public: + DatalogTrimImplCR(rgw::sal::RGWRadosStore* store, int shard, + const std::string& marker, std::string* last_trim_marker) + : RGWSimpleCoroutine(store->ctx()), store(store), shard(shard), + marker(marker), last_trim_marker(last_trim_marker) { + set_description() << "Datalog trim shard=" << shard + << " marker=" << marker; + } + + int send_request() override { + set_status() << "sending request"; + cn = stack->create_completion_notifier(); + return store->svc()->datalog_rados->trim_entries(shard, marker, + cn->completion()); + } + int request_complete() override { + int r = cn->completion()->get_return_value(); + set_status() << "request complete; ret=" << r; + if (r != -ENODATA) { + return r; + } + // nothing left to trim, update last_trim_marker + if (*last_trim_marker < marker && + marker != store->svc()->datalog_rados->max_marker()) { + *last_trim_marker = marker; + } + return 0; + } +}; + /// return the marker that it's safe to trim up to const std::string& get_stable_marker(const rgw_data_sync_marker& m) { @@ -51,7 +88,7 @@ void take_min_markers(IterIn first, IterIn last, IterOut dest) } // anonymous namespace class DataLogTrimCR : public RGWCoroutine { - using TrimCR = RGWSyncLogTrimCR; + using TrimCR = DatalogTrimImplCR; rgw::sal::RGWRadosStore *store; RGWHTTPManager *http; const int num_shards; @@ -68,7 +105,8 @@ class DataLogTrimCR : public RGWCoroutine { num_shards(num_shards), zone_id(store->svc()->zone->get_zone().id), peer_status(store->svc()->zone->get_zone_data_notify_to_map().size()), - min_shard_markers(num_shards, TrimCR::max_marker), + min_shard_markers(num_shards, + std::string(store->svc()->datalog_rados->max_marker())), last_trim(last_trim) {} @@ -127,8 +165,7 @@ int DataLogTrimCR::operate() ldout(cct, 10) << "trimming log shard " << i << " at marker=" << m << " last_trim=" << last_trim[i] << dendl; - spawn(new TrimCR(store, store->svc()->datalog_rados->get_oid(i), - m, &last_trim[i]), + spawn(new TrimCR(store, i, m, &last_trim[i]), true); } } diff --git a/src/rgw/rgw_trim_datalog.h b/src/rgw/rgw_trim_datalog.h index cf64a4c6c9d49..ffdd2b38450c9 100644 --- a/src/rgw/rgw_trim_datalog.h +++ b/src/rgw/rgw_trim_datalog.h @@ -3,6 +3,9 @@ #pragma once +#include +#include + class RGWCoroutine; class RGWRados; class RGWHTTPManager; -- 2.39.5