From: Casey Bodley Date: Mon, 5 Dec 2022 21:43:44 +0000 (-0500) Subject: rgw: add BidManager for sync fairness via watch/notify X-Git-Tag: v19.0.0~1234^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=40c01798944a624255a8d53ad6bcc269073c7220;p=ceph.git rgw: add BidManager for sync fairness via watch/notify Signed-off-by: Casey Bodley --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index dc824d35504f..8f05f7bea93b 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -192,7 +192,8 @@ set(librgw_common_srcs driver/rados/rgw_trim_datalog.cc driver/rados/rgw_trim_mdlog.cc driver/rados/rgw_user.cc - driver/rados/rgw_zone.cc) + driver/rados/rgw_zone.cc + driver/rados/sync_fairness.cc) list(APPEND librgw_common_srcs driver/immutable_config/store.cc diff --git a/src/rgw/driver/rados/sync_fairness.cc b/src/rgw/driver/rados/sync_fairness.cc new file mode 100644 index 000000000000..6406d924012d --- /dev/null +++ b/src/rgw/driver/rados/sync_fairness.cc @@ -0,0 +1,349 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2022 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include +#include +#include +#include +#include "include/encoding.h" +#include "include/rados/librados.hpp" +#include "rgw_sal_rados.h" +#include "rgw_cr_rados.h" +#include "sync_fairness.h" + +#include + +#define dout_subsys ceph_subsys_rgw + +namespace rgw::sync_fairness { + +using bid_value = uint16_t; +using bid_vector = std::vector; // bid per replication log shard + +using notifier_id = uint64_t; +using bidder_map = boost::container::flat_map; + +struct BidRequest { + bid_vector bids; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(bids, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator& p) { + DECODE_START(1, p); + decode(bids, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(BidRequest); + +struct BidResponse { + bid_vector bids; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(bids, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator& p) { + DECODE_START(1, p); + decode(bids, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(BidResponse); + + +static void encode_notify_request(const bid_vector& bids, bufferlist& bl) +{ + BidRequest request; + request.bids = bids; // copy the vector + encode(request, bl); +} + +static int apply_notify_responses(const bufferlist& bl, bidder_map& bidders) +{ + bc::flat_map, bufferlist> replies; + std::vector> timeouts; + try { + // decode notify responses + auto p = bl.cbegin(); + + using ceph::decode; + decode(replies, p); + decode(timeouts, p); + + // add peers that replied + for (const auto& peer : replies) { + auto q = peer.second.cbegin(); + BidResponse response; + decode(response, q); + + uint64_t peer_id = peer.first.first; + bidders[peer_id] = std::move(response.bids); + } + + // remove peers that timed out + for (const auto& peer : timeouts) { + uint64_t peer_id = peer.first; + bidders.erase(peer_id); + } + } catch (const buffer::error& e) { + return -EIO; + } + return 0; +} + + +// server interface to handle bid notifications from peers +struct Server { + virtual ~Server() = default; + + virtual void on_peer_bid(uint64_t peer_id, bid_vector peer_bids, + bid_vector& my_bids) = 0; +}; + + +// rados watcher for sync fairness notifications +class Watcher : public librados::WatchCtx2 { + const DoutPrefixProvider* dpp; + sal::RadosStore* const store; + rgw_raw_obj obj; + Server* server; + rgw_rados_ref ref; + uint64_t handle = 0; + + public: + Watcher(const DoutPrefixProvider* dpp, sal::RadosStore* store, + const rgw_raw_obj& obj, Server* server) + : dpp(dpp), store(store), obj(obj), server(server) + {} + ~Watcher() + { + stop(); + } + + int start() + { + int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref); + if (r < 0) { + return r; + } + + // register a watch on the control object + r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this); + if (r == -ENOENT) { + constexpr bool exclusive = true; + r = ref.pool.ioctx().create(ref.obj.oid, exclusive); + if (r == -EEXIST || r == 0) { + r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this); + } + } + if (r < 0) { + ldpp_dout(dpp, -1) << "Failed to watch " << ref.obj + << " with " << cpp_strerror(-r) << dendl; + ref.pool.ioctx().close(); + return r; + } + + ldpp_dout(dpp, 10) << "Watching " << ref.obj.oid << dendl; + return 0; + } + + int restart() + { + int r = ref.pool.ioctx().unwatch2(handle); + if (r < 0) { + ldpp_dout(dpp, -1) << "Failed to unwatch on " << ref.obj + << " with " << cpp_strerror(-r) << dendl; + } + r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this); + if (r < 0) { + ldpp_dout(dpp, -1) << "Failed to restart watch on " << ref.obj + << " with " << cpp_strerror(-r) << dendl; + ref.pool.ioctx().close(); + } + return r; + } + + void stop() + { + if (handle) { + ref.pool.ioctx().unwatch2(handle); + ref.pool.ioctx().close(); + } + } + + // respond to bid notifications + void handle_notify(uint64_t notify_id, uint64_t cookie, + uint64_t notifier_id, bufferlist& bl) + { + if (cookie != handle) { + return; + } + + BidRequest request; + try { + auto p = bl.cbegin(); + decode(request, p); + } catch (const buffer::error& e) { + ldpp_dout(dpp, -1) << "Failed to decode notification: " << e.what() << dendl; + return; + } + + BidResponse response; + server->on_peer_bid(notifier_id, std::move(request.bids), response.bids); + + bufferlist reply; + encode(response, reply); + + ref.pool.ioctx().notify_ack(ref.obj.oid, notify_id, cookie, reply); + } + + // reestablish the watch if it gets disconnected + void handle_error(uint64_t cookie, int err) + { + if (cookie != handle) { + return; + } + if (err == -ENOTCONN) { + ldpp_dout(dpp, 4) << "Disconnected watch on " << ref.obj << dendl; + restart(); + } + } +}; // Watcher + + +class RadosBidManager; + +// RGWRadosNotifyCR wrapper coroutine +class NotifyCR : public RGWCoroutine { + rgw::sal::RadosStore* store; + RadosBidManager* mgr; + rgw_raw_obj obj; + bufferlist request; + bufferlist response; + public: + NotifyCR(rgw::sal::RadosStore* store, RadosBidManager* mgr, + const rgw_raw_obj& obj, const bid_vector& my_bids) + : RGWCoroutine(store->ctx()), store(store), mgr(mgr), obj(obj) + { + encode_notify_request(my_bids, request); + } + + int operate(const DoutPrefixProvider* dpp) override; +}; + + +class RadosBidManager : public BidManager, public Server, public DoutPrefix { + sal::RadosStore* store; + rgw_raw_obj obj; + Watcher watcher; + + std::mutex mutex; + bid_vector my_bids; + bidder_map all_bids; + + public: + RadosBidManager(sal::RadosStore* store, const rgw_raw_obj& watch_obj, + std::size_t num_shards) + : DoutPrefix(store->ctx(), dout_subsys, "sync fairness: "), + store(store), obj(watch_obj), watcher(this, store, watch_obj, this) + { + // fill my_bids with random values + std::random_device rd; + std::default_random_engine rng{rd()}; + std::uniform_int_distribution dist; + + my_bids.resize(num_shards); + std::generate(my_bids.begin(), my_bids.end(), [&] { return dist(rng); }); + } + + int start() override + { + return watcher.start(); + } + + void on_peer_bid(uint64_t peer_id, bid_vector peer_bids, + bid_vector& my_bids) override + { + ldpp_dout(this, 10) << "received bids from peer " << peer_id << dendl; + + auto lock = std::scoped_lock{mutex}; + all_bids[peer_id] = std::move(peer_bids); + my_bids = this->my_bids; + } + + bool is_highest_bidder(std::size_t index) + { + auto lock = std::scoped_lock{mutex}; + const bid_value my_bid = my_bids.at(index); // may throw + + for (const auto& peer_bids : all_bids) { + const bid_value peer_bid = peer_bids.second.at(index); // may throw + if (peer_bid > my_bid) { + return false; + } + } + return true; + } + + RGWCoroutine* notify_cr() + { + auto lock = std::scoped_lock{mutex}; + return new NotifyCR(store, this, obj, my_bids); + } + + void notify_response(const bufferlist& bl) + { + ldpp_dout(this, 10) << "received notify response from peers" << dendl; + + auto lock = std::scoped_lock{mutex}; + + // clear existing bids in case any peers went away. note that this may + // remove newer bids from peer notifications that raced with ours + all_bids.clear(); + + apply_notify_responses(bl, all_bids); + } +}; + + +int NotifyCR::operate(const DoutPrefixProvider* dpp) +{ + static constexpr uint64_t timeout_ms = 15'000; + reenter(this) { + yield call(new RGWRadosNotifyCR(store, obj, request, + timeout_ms, &response)); + if (retcode < 0) { + return set_cr_error(retcode); + } + mgr->notify_response(response); + return set_cr_done(); + } + return 0; +} + + +auto create_rados_bid_manager(sal::RadosStore* store, + const rgw_raw_obj& watch_obj, + std::size_t num_shards) + -> std::unique_ptr +{ + return std::make_unique(store, watch_obj, num_shards); +} + +} // namespace rgw::sync_fairness diff --git a/src/rgw/driver/rados/sync_fairness.h b/src/rgw/driver/rados/sync_fairness.h new file mode 100644 index 000000000000..4c301cbdf294 --- /dev/null +++ b/src/rgw/driver/rados/sync_fairness.h @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2022 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#pragma once + +#include + +namespace rgw::sal { class RadosStore; } +struct rgw_raw_obj; +class RGWCoroutine; + +/// watch/notify protocol to coordinate the sharing of sync locks +/// +/// each gateway generates a set of random bids, and broadcasts them regularly +/// to other active gateways. in response, the peer gateways send their own set +/// of bids +/// +/// sync will only lock and process log shards where it holds the highest bid +namespace rgw::sync_fairness { + +class BidManager { + public: + virtual ~BidManager() {} + + /// establish a watch, creating the control object if necessary + virtual int start() = 0; + + /// returns true if we're the highest bidder on the given shard index + virtual bool is_highest_bidder(std::size_t index) = 0; + + /// return a coroutine that broadcasts our current bids and records the + /// bids from other peers that respond + virtual RGWCoroutine* notify_cr() = 0; +}; + +// rados BidManager factory +auto create_rados_bid_manager(sal::RadosStore* store, + const rgw_raw_obj& watch_obj, + std::size_t num_shards) + -> std::unique_ptr; + +} // namespace rgw::sync_fairness