From: Adam C. Emerson Date: Wed, 27 Jan 2021 01:07:45 +0000 (-0500) Subject: rgw: Actually pull logbacking_generations into datalog X-Git-Tag: v16.2.2~8^2~9^2~13 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=d91df95e800f86d95ece8a0d3c84a260a009a1b9;p=ceph.git rgw: Actually pull logbacking_generations into datalog Signed-off-by: Adam C. Emerson (cherry picked from commit eb0f8ffcc785146a1fb249f4531620787be216ba) Signed-off-by: Adam C. Emerson --- diff --git a/src/rgw/rgw_datalog.cc b/src/rgw/rgw_datalog.cc index 2b04d530d1c6f..c64b22d518a9f 100644 --- a/src/rgw/rgw_datalog.cc +++ b/src/rgw/rgw_datalog.cc @@ -178,8 +178,8 @@ public: lr::ObjectWriteOperation op; cls_log_trim(op, {}, {}, {}, std::string(marker)); auto r = rgw_rados_operate(ioctx, oids[index], &op, null_yield); - if (r == -ENOENT) r = 0; - if (r < 0) { + if (r == -ENOENT) r = -ENODATA; + if (r < 0 && r != -ENODATA) { lderr(cct) << __PRETTY_FUNCTION__ << ": failed to get info from " << oids[index] << cpp_strerror(-r) << dendl; @@ -191,7 +191,7 @@ public: lr::ObjectWriteOperation op; cls_log_trim(op, {}, {}, {}, std::string(marker)); auto r = ioctx.aio_operate(oids[index], c, &op, 0); - if (r == -ENOENT) r = 0; + if (r == -ENOENT) r = -ENODATA; if (r < 0) { lderr(cct) << __PRETTY_FUNCTION__ << ": failed to get info from " << oids[index] @@ -333,7 +333,7 @@ public: librados::AioCompletion* c) override { int r = 0; if (marker == rgw::cls::fifo::marker(0, 0).to_string()) { - rgw_complete_aio_completion(c, 0); + rgw_complete_aio_completion(c, -ENODATA); } else { fifos[index]->trim(marker, false, c); } @@ -352,6 +352,65 @@ RGWDataChangesLog::RGWDataChangesLog(CephContext* cct) prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size) {} +bs::error_code DataLogBackends::handle_init(entries_t e) noexcept { + std::unique_lock l(m); + + for (const auto& [gen_id, gen] : e) { + if (gen.empty) { + lderr(datalog.cct) + << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": ERROR: given empty generation: gen_id=" << gen_id << dendl; + } + if (count(gen_id) != 0) { + lderr(datalog.cct) + << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": ERROR: generation already exists: gen_id=" << gen_id << dendl; + } + try { + switch (gen.type) { + case log_type::omap: + emplace(gen_id, new RGWDataChangesOmap(ioctx, datalog, gen_id, shards)); + break; + case log_type::fifo: + emplace(gen_id, new RGWDataChangesFIFO(ioctx, datalog, gen_id, shards)); + break; + default: + lderr(datalog.cct) + << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": IMPOSSIBLE: invalid log type: gen_id=" << gen_id + << ", type" << gen.type << dendl; + return bs::error_code(EFAULT, bs::system_category()); + } + } catch (const bs::system_error& err) { + lderr(datalog.cct) + << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": error setting up backend: gen_id=" << gen_id + << ", err=" << err.what() << dendl; + return err.code(); + } + } + return {}; +} +bs::error_code DataLogBackends::handle_new_gens(entries_t e) noexcept { + return handle_init(std::move(e)); +} +bs::error_code DataLogBackends::handle_empty_to(uint64_t new_tail) noexcept { + std::unique_lock l(m); + auto i = cbegin(); + if (i->first < new_tail) { + return {}; + } + if (new_tail >= (cend() - 1)->first) { + lderr(datalog.cct) + << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": ERROR: attempt to trim head: new_tail=" << new_tail << dendl; + return bs::error_code(EFAULT, bs::system_category()); + } + erase(i, upper_bound(new_tail)); + return {}; +} + + int RGWDataChangesLog::start(const RGWZone* _zone, const RGWZoneParams& zoneparams, librados::Rados* lr) @@ -371,31 +430,21 @@ int RGWDataChangesLog::start(const RGWZone* _zone, return -r; } - auto found = log_backing_type(ioctx, *defbacking, num_shards, - [this](int i) { return get_oid(0, i); }, - null_yield); + auto besr = logback_generations::init( + ioctx, metadata_log_oid(), [this](uint64_t gen_id, int shard) { + return get_oid(gen_id, shard); + }, + num_shards, *defbacking, null_yield, *this); - if (!found) { - lderr(cct) << __PRETTY_FUNCTION__ - << ": Error when checking log type: " - << found.error().message() << dendl; - } - try { - switch (*found) { - case log_type::omap: - bes.set_zero(new RGWDataChangesOmap(ioctx, *this, 0, num_shards)); - break; - case log_type::fifo: - bes.set_zero(new RGWDataChangesFIFO(ioctx, *this, 0, num_shards)); - break; - } - } catch (bs::system_error& e) { + + if (!besr) { lderr(cct) << __PRETTY_FUNCTION__ - << ": Error when starting backend: " - << e.what() << dendl; - return ceph::from_error_code(e.code()); + << ": Error initializing backends: " + << besr.error().message() << dendl; + return ceph::from_error_code(besr.error()); } + bes = std::move(*besr); renew_thread = make_named_thread("rgw_dt_lg_renew", &RGWDataChangesLog::renew_run, this); return 0; @@ -425,7 +474,7 @@ int RGWDataChangesLog::renew_entries() l.unlock(); auto ut = real_clock::now(); - auto be = bes.head(); + auto be = bes->head(); for (const auto& bs : entries) { auto index = choose_oid(bs); @@ -592,7 +641,7 @@ int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl; - auto be = bes.head(); + auto be = bes->head(); ret = be->push(index, now, change.key, std::move(bl)); now = real_clock::now(); @@ -634,7 +683,9 @@ int DataLogBackends::list(int shard, int max_entries, if (r < 0) return r; - *out_marker = gencursor(gen_id, out_cursor); + if (out_marker && !out_cursor.empty()) { + *out_marker = gencursor(gen_id, out_cursor); + } for (auto& g : gentries) { g.log_id = gencursor(gen_id, g.log_id); } @@ -653,7 +704,7 @@ int RGWDataChangesLog::list_entries(int shard, int max_entries, std::string* out_marker, bool* truncated) { assert(shard < num_shards); - return bes.list(shard, max_entries, entries, marker, out_marker, truncated); + return bes->list(shard, max_entries, entries, marker, out_marker, truncated); } int RGWDataChangesLog::list_entries(int max_entries, @@ -684,8 +735,12 @@ int RGWDataChangesLog::list_entries(int max_entries, int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info) { assert(shard_id < num_shards); - auto be = bes.head(); - return be->get_info(shard_id, info); + auto be = bes->head(); + auto r = be->get_info(shard_id, info); + if (!info->marker.empty()) { + info->marker = gencursor(be->gen_id, info->marker); + } + return r; } int DataLogBackends::trim_entries(int shard_id, std::string_view marker) @@ -696,13 +751,13 @@ int DataLogBackends::trim_entries(int shard_id, std::string_view marker) const auto tail_gen = begin()->first; if (target_gen < tail_gen) return 0; auto r = 0; - for (auto i = lower_bound(0); - i != end() && i->first <= target_gen && i->first <= head_gen && r >= 0; - i = upper_bound(i->first)) { - auto be = i->second; + for (auto be = lower_bound(0)->second; + be->gen_id <= target_gen && be->gen_id <= head_gen && r >= 0; + be = upper_bound(be->gen_id)->second) { l.unlock(); auto c = be->gen_id == target_gen ? cursor : be->max_marker(); r = be->trim(shard_id, c); + if (r == -ENODATA && be->gen_id < target_gen) r = 0; l.lock(); }; return r; @@ -711,7 +766,7 @@ int DataLogBackends::trim_entries(int shard_id, std::string_view marker) int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker) { assert(shard_id < num_shards); - return bes.trim_entries(shard_id, marker); + return bes->trim_entries(shard_id, marker); } class GenTrim : public rgw::cls::fifo::Completion { @@ -735,6 +790,8 @@ public: void handle(Ptr&& p, int r) { auto gen_id = be->gen_id; be.reset(); + if (r == -ENOENT) r = -ENODATA; + if (r == -ENODATA && gen_id < target_gen) r = 0; if (r < 0) { complete(std::move(p), r); return; @@ -781,7 +838,7 @@ int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker, librados::AioCompletion* c) { assert(shard_id < num_shards); - bes.trim_entries(shard_id, marker, c); + bes->trim_entries(shard_id, marker, c); return 0; } diff --git a/src/rgw/rgw_datalog.h b/src/rgw/rgw_datalog.h index 0915bebde11cf..e9a768d546c00 100644 --- a/src/rgw/rgw_datalog.h +++ b/src/rgw/rgw_datalog.h @@ -36,6 +36,7 @@ #include "cls/log/cls_log_types.h" #include "rgw_basic_types.h" +#include "rgw_log_backing.h" #include "rgw_sync_policy.h" #include "rgw_zone.h" #include "rgw_trim_bilog.h" @@ -121,11 +122,22 @@ class RGWDataChangesLog; class RGWDataChangesBE; -class DataLogBackends - : private bc::flat_map> { +class DataLogBackends final + : public logback_generations, + private bc::flat_map> { + friend class logback_generations; friend class GenTrim; std::mutex m; + RGWDataChangesLog& datalog; + + DataLogBackends(librados::IoCtx& ioctx, + std::string oid, + fu2::unique_function&& get_oid, + int shards, RGWDataChangesLog& datalog) noexcept + : logback_generations(ioctx, oid, std::move(get_oid), + shards), datalog(datalog) {} public: boost::intrusive_ptr head() { @@ -144,20 +156,28 @@ public: void set_zero(RGWDataChangesBE* be) { emplace(0, be); } + + bs::error_code handle_init(entries_t e) noexcept override; + bs::error_code handle_new_gens(entries_t e) noexcept override; + bs::error_code handle_empty_to(uint64_t new_tail) noexcept override; }; class RGWDataChangesLog { + friend DataLogBackends; CephContext *cct; librados::IoCtx ioctx; rgw::BucketChangeObserver *observer = nullptr; const RGWZone* zone; - DataLogBackends bes; + std::unique_ptr bes; const int num_shards; std::string get_prefix() { auto prefix = cct->_conf->rgw_data_log_obj_prefix; return prefix.empty() ? prefix : "data_log"s; } + std::string metadata_log_oid() { + return get_prefix() + "generations_metadata"s; + } std::string prefix; ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock"); diff --git a/src/rgw/rgw_log_backing.h b/src/rgw/rgw_log_backing.h index 55a3139d11e2b..ef2583c35b204 100644 --- a/src/rgw/rgw_log_backing.h +++ b/src/rgw/rgw_log_backing.h @@ -135,6 +135,8 @@ private: protected: const int shards; +private: + uint64_t watchcookie = 0; obj_version version;