From: Adam C. Emerson Date: Wed, 6 Jan 2021 08:40:50 +0000 (-0500) Subject: rgw: Lay groundwork for multigenerational datalog X-Git-Tag: v17.1.0~2399^2~14 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=27ca609755a2c0e8fd501be46bc20026aa33b93c;p=ceph.git rgw: Lay groundwork for multigenerational datalog Signed-off-by: Adam C. Emerson --- diff --git a/src/rgw/cls_fifo_legacy.cc b/src/rgw/cls_fifo_legacy.cc index f95b796152d3..3ddb2578d354 100644 --- a/src/rgw/cls_fifo_legacy.cc +++ b/src/rgw/cls_fifo_legacy.cc @@ -32,10 +32,6 @@ #include "cls/fifo/cls_fifo_types.h" #include "cls/fifo/cls_fifo_ops.h" -#include "librados/AioCompletionImpl.h" - -#include "rgw_tools.h" - #include "cls_fifo_legacy.h" namespace rgw::cls::fifo { @@ -382,67 +378,6 @@ struct partinfo_completion : public lr::ObjectOperationCompletion { } }; -template -struct Completion { -private: - lr::AioCompletion* _cur = nullptr; - lr::AioCompletion* _super; -public: - - using Ptr = std::unique_ptr; - - lr::AioCompletion* cur() const { - return _cur; - } - lr::AioCompletion* super() const { - return _super; - } - - Completion(lr::AioCompletion* super) : _super(super) { - super->pc->get(); - } - - ~Completion() { - if (_super) { - _super->pc->put(); - } - if (_cur) - _cur->release(); - _super = nullptr; - _cur = nullptr; - } - - // The only times that aio_operate can return an error are: - // 1. The completion contains a null pointer. This should just - // crash, and in our case it does. - // 2. An attempt is made to write to a snapshot. RGW doesn't use - // snapshots, so we don't care. - // - // So we will just assert that initiating an Aio operation succeeds - // and not worry about recovering. - static lr::AioCompletion* call(Ptr&& p) { - p->_cur = lr::Rados::aio_create_completion(static_cast(p.get()), - &cb); - auto c = p->_cur; - p.release(); - return c; - } - static void complete(Ptr&& p, int r) { - auto c = p->_super; - p->_super = nullptr; - c->pc->put(); - rgw_complete_aio_completion(c, r); - } - - static void cb(lr::completion_t, void* arg) { - auto t = static_cast(arg); - auto r = t->_cur->get_return_value(); - t->_cur->release(); - t->_cur = nullptr; - t->handle(Ptr(t), r); - } -}; - lr::ObjectReadOperation get_part_info(CephContext* cct, fifo::part_header* header, std::uint64_t tid, int* r = 0) diff --git a/src/rgw/cls_fifo_legacy.h b/src/rgw/cls_fifo_legacy.h index b6b5f04bb30a..307abbb19891 100644 --- a/src/rgw/cls_fifo_legacy.h +++ b/src/rgw/cls_fifo_legacy.h @@ -38,6 +38,10 @@ #include "cls/fifo/cls_fifo_types.h" #include "cls/fifo/cls_fifo_ops.h" +#include "librados/AioCompletionImpl.h" + +#include "rgw_tools.h" + namespace rgw::cls::fifo { namespace cb = ceph::buffer; namespace fifo = rados::cls::fifo; @@ -265,6 +269,67 @@ public: lr::AioCompletion* c //< AIO Completion ); }; + +template +struct Completion { +private: + lr::AioCompletion* _cur = nullptr; + lr::AioCompletion* _super; +public: + + using Ptr = std::unique_ptr; + + lr::AioCompletion* cur() const { + return _cur; + } + lr::AioCompletion* super() const { + return _super; + } + + Completion(lr::AioCompletion* super) : _super(super) { + super->pc->get(); + } + + ~Completion() { + if (_super) { + _super->pc->put(); + } + if (_cur) + _cur->release(); + _super = nullptr; + _cur = nullptr; + } + + // The only times that aio_operate can return an error are: + // 1. The completion contains a null pointer. This should just + // crash, and in our case it does. + // 2. An attempt is made to write to a snapshot. RGW doesn't use + // snapshots, so we don't care. + // + // So we will just assert that initiating an Aio operation succeeds + // and not worry about recovering. + static lr::AioCompletion* call(Ptr&& p) { + p->_cur = lr::Rados::aio_create_completion(static_cast(p.get()), + &cb); + auto c = p->_cur; + p.release(); + return c; + } + static void complete(Ptr&& p, int r) { + auto c = p->_super; + p->_super = nullptr; + rgw_complete_aio_completion(c, r); + } + + static void cb(lr::completion_t, void* arg) { + auto t = static_cast(arg); + auto r = t->_cur->get_return_value(); + t->_cur->release(); + t->_cur = nullptr; + t->handle(Ptr(t), r); + } +}; + } #endif // CEPH_RGW_CLS_FIFO_LEGACY_H diff --git a/src/rgw/rgw_datalog.cc b/src/rgw/rgw_datalog.cc index d2c985b29dbf..45c9d1ccaa18 100644 --- a/src/rgw/rgw_datalog.cc +++ b/src/rgw/rgw_datalog.cc @@ -383,10 +383,10 @@ int RGWDataChangesLog::start(const RGWZone* _zone, try { switch (*found) { case log_type::omap: - be = std::make_unique(ioctx, *this, 0, num_shards); + bes.set_zero(new RGWDataChangesOmap(ioctx, *this, 0, num_shards)); break; case log_type::fifo: - be = std::make_unique(ioctx, *this, 0, num_shards); + bes.set_zero(new RGWDataChangesFIFO(ioctx, *this, 0, num_shards)); break; } } catch (bs::system_error& e) { @@ -396,7 +396,6 @@ int RGWDataChangesLog::start(const RGWZone* _zone, return ceph::from_error_code(e.code()); } - ceph_assert(be); renew_thread = make_named_thread("rgw_dt_lg_renew", &RGWDataChangesLog::renew_run, this); return 0; @@ -426,6 +425,7 @@ int RGWDataChangesLog::renew_entries() l.unlock(); auto ut = real_clock::now(); + auto be = bes.head(); for (const auto& bs : entries) { auto index = choose_oid(bs); @@ -593,6 +593,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl; + auto be = bes.head(); ret = be->push(index, now, change.key, std::move(bl)); now = real_clock::now(); @@ -616,14 +617,44 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI return ret; } +int DataLogBackends::list(int shard, int max_entries, + std::vector& entries, + std::optional marker, + std::string* out_marker, bool* truncated) +{ + auto [gen_id, cursor] = cursorgeno(marker); + std::string out_cursor; + while (max_entries > 0) { + std::vector gentries; + std::unique_lock l(m); + auto i = lower_bound(gen_id); + if (i == end()) return 0; + auto be = i->second; + auto r = be->list(shard, max_entries, gentries, cursor, + &out_cursor, truncated); + if (r < 0) + return r; + + *out_marker = gencursor(gen_id, out_cursor); + for (auto& g : gentries) { + g.log_id = gencursor(gen_id, g.log_id); + } + max_entries -= gentries.size(); + std::move(gentries.begin(), gentries.end(), + std::back_inserter(entries)); + cursor = {}; + ++gen_id; + } + return 0; +} + int RGWDataChangesLog::list_entries(int shard, int max_entries, std::vector& entries, std::optional marker, std::string* out_marker, bool* truncated) { assert(shard < num_shards); - return be->list(shard, max_entries, entries, std::string(marker.value_or("")), - out_marker, truncated); + return bes.list(shard, max_entries, entries, marker, out_marker, truncated); } int RGWDataChangesLog::list_entries(int max_entries, @@ -654,20 +685,105 @@ int RGWDataChangesLog::list_entries(int max_entries, int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info) { assert(shard_id < num_shards); + auto be = bes.head(); return be->get_info(shard_id, info); } +int DataLogBackends::trim_entries(int shard_id, std::string_view marker) +{ + auto [target_gen, cursor] = cursorgen(marker); + std::unique_lock l(m); + const auto head_gen = (end() - 1)->second->gen_id; + const auto tail_gen = begin()->first; + if (target_gen < tail_gen) return 0; + auto r = 0; + for (auto i = lower_bound(0); + i != end() && i->first <= target_gen && i->first <= head_gen && r >= 0; + i = upper_bound(i->first)) { + auto be = i->second; + l.unlock(); + auto c = be->gen_id == target_gen ? cursor : be->max_marker(); + r = be->trim(shard_id, c); + l.lock(); + }; + return r; +} + int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker) { assert(shard_id < num_shards); - return be->trim(shard_id, marker); + return bes.trim_entries(shard_id, marker); +} + +class GenTrim : public rgw::cls::fifo::Completion { +public: + DataLogBackends* const bes; + const int shard_id; + const uint64_t target_gen; + const std::string cursor; + const uint64_t head_gen; + const uint64_t tail_gen; + boost::intrusive_ptr be; + + GenTrim(DataLogBackends* bes, int shard_id, uint64_t target_gen, std::string cursor, + uint64_t head_gen, uint64_t tail_gen, + boost::intrusive_ptr&& be, + lr::AioCompletion* super) + : Completion(super), bes(bes), shard_id(shard_id), target_gen(target_gen), + cursor(std::move(cursor)), head_gen(head_gen), tail_gen(tail_gen), + be(std::move(be)) {} + + void handle(Ptr&& p, int r) { + auto gen_id = be->gen_id; + be.reset(); + if (r < 0) { + complete(std::move(p), r); + return; + } + + { + std::unique_lock l(bes->m); + auto i = bes->upper_bound(gen_id); + if (i == bes->end() || i->first > target_gen || i->first > head_gen) { + l.unlock(); + complete(std::move(p), r); + return; + } + be = i->second; + } + auto c = be->gen_id == target_gen ? cursor : be->max_marker(); + r = be->trim(shard_id, c, call(std::move(p))); + } +}; + +void DataLogBackends::trim_entries(int shard_id, std::string_view marker, + librados::AioCompletion* c) +{ + auto [target_gen, cursor] = cursorgen(marker); + std::unique_lock l(m); + const auto head_gen = (end() - 1)->second->gen_id; + const auto tail_gen = begin()->first; + if (target_gen < tail_gen) { + l.unlock(); + rgw_complete_aio_completion(c, 0); + return; + } + auto be = lower_bound(0)->second; + l.unlock(); + auto p = be.get(); + auto gt = std::make_unique(this, shard_id, target_gen, + std::string(cursor), head_gen, tail_gen, + std::move(be), c); + + p->trim(shard_id, cursor, GenTrim::call(std::move(gt))); } int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker, librados::AioCompletion* c) { assert(shard_id < num_shards); - return be->trim(shard_id, marker, c); + bes.trim_entries(shard_id, marker, c); + return 0; } bool RGWDataChangesLog::going_down() const @@ -721,6 +837,7 @@ void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs) modified_shards[shard_id].insert(key); } -std::string_view RGWDataChangesLog::max_marker() const { - return be->max_marker(); +std::string RGWDataChangesLog::max_marker() const { + return gencursor(std::numeric_limits::max(), + "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); } diff --git a/src/rgw/rgw_datalog.h b/src/rgw/rgw_datalog.h index c84449e45bc7..c207b30aa1e5 100644 --- a/src/rgw/rgw_datalog.h +++ b/src/rgw/rgw_datalog.h @@ -13,6 +13,8 @@ #include #include +#include +#include #undef FMT_HEADER_ONLY #define FMT_HEADER_ONLY 1 @@ -119,12 +121,37 @@ class RGWDataChangesLog; class RGWDataChangesBE; +class DataLogBackends + : private bc::flat_map> { + friend class GenTrim; + + std::mutex m; +public: + + boost::intrusive_ptr head() { + std::unique_lock l(m); + auto i = end(); + --i; + return i->second; + } + int list(int shard, int max_entries, + std::vector& entries, + std::optional marker, + std::string* out_marker, bool* truncated); + int trim_entries(int shard_id, std::string_view marker); + void trim_entries(int shard_id, std::string_view marker, + librados::AioCompletion* c); + void set_zero(RGWDataChangesBE* be) { + emplace(0, be); + } +}; + class RGWDataChangesLog { CephContext *cct; librados::IoCtx ioctx; rgw::BucketChangeObserver *observer = nullptr; const RGWZone* zone; - std::unique_ptr be; + DataLogBackends bes; const int num_shards; std::string get_prefix() { @@ -213,16 +240,15 @@ public: bucket_filter = std::move(f); } // a marker that compares greater than any other - std::string_view max_marker() const; + std::string max_marker() const; std::string get_oid(uint64_t gen_id, int shard_id) const; }; -class RGWDataChangesBE { +class RGWDataChangesBE : public boost::intrusive_ref_counter { protected: librados::IoCtx& ioctx; CephContext* const cct; RGWDataChangesLog& datalog; - uint64_t gen_id; std::string get_oid(int shard_id) { return datalog.get_oid(gen_id, shard_id); @@ -231,6 +257,8 @@ public: using entries = std::variant, std::vector>; + const uint64_t gen_id; + RGWDataChangesBE(librados::IoCtx& ioctx, RGWDataChangesLog& datalog, uint64_t gen_id) diff --git a/src/rgw/rgw_log_backing.h b/src/rgw/rgw_log_backing.h index 242bf0e1c00a..55a3139d11e2 100644 --- a/src/rgw/rgw_log_backing.h +++ b/src/rgw/rgw_log_backing.h @@ -244,4 +244,13 @@ cursorgen(std::string_view cursor_) { return { *gen_id, cursor }; } +inline std::pair +cursorgeno(std::optional cursor) { + if (cursor) { + return cursorgen(*cursor); + } else { + return { 0, ""s }; + } +} + #endif