]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: add BidManager for sync fairness via watch/notify
authorCasey Bodley <cbodley@redhat.com>
Mon, 5 Dec 2022 21:43:44 +0000 (16:43 -0500)
committerShilpa Jagannath <smanjara@redhat.com>
Wed, 10 May 2023 15:24:21 +0000 (11:24 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/driver/rados/sync_fairness.cc [new file with mode: 0644]
src/rgw/driver/rados/sync_fairness.h [new file with mode: 0644]

index dc824d35504f9adc187983e353e7865c4a030d51..8f05f7bea93b79eeaf1b5237b8748ff262c6b7bb 100644 (file)
@@ -192,7 +192,8 @@ set(librgw_common_srcs
   driver/rados/rgw_trim_datalog.cc
   driver/rados/rgw_trim_mdlog.cc
   driver/rados/rgw_user.cc
-  driver/rados/rgw_zone.cc)
+  driver/rados/rgw_zone.cc
+  driver/rados/sync_fairness.cc)
 
 list(APPEND librgw_common_srcs
   driver/immutable_config/store.cc
diff --git a/src/rgw/driver/rados/sync_fairness.cc b/src/rgw/driver/rados/sync_fairness.cc
new file mode 100644 (file)
index 0000000..6406d92
--- /dev/null
@@ -0,0 +1,349 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2022 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#include <mutex>
+#include <random>
+#include <vector>
+#include <boost/container/flat_map.hpp>
+#include "include/encoding.h"
+#include "include/rados/librados.hpp"
+#include "rgw_sal_rados.h"
+#include "rgw_cr_rados.h"
+#include "sync_fairness.h"
+
+#include <boost/asio/yield.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+
+namespace rgw::sync_fairness {
+
+using bid_value = uint16_t;
+using bid_vector = std::vector<bid_value>; // bid per replication log shard
+
+using notifier_id = uint64_t;
+using bidder_map = boost::container::flat_map<notifier_id, bid_vector>;
+
+struct BidRequest {
+  bid_vector bids;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(bids, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator& p) {
+    DECODE_START(1, p);
+    decode(bids, p);
+    DECODE_FINISH(p);
+  }
+};
+WRITE_CLASS_ENCODER(BidRequest);
+
+struct BidResponse {
+  bid_vector bids;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(bids, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator& p) {
+    DECODE_START(1, p);
+    decode(bids, p);
+    DECODE_FINISH(p);
+  }
+};
+WRITE_CLASS_ENCODER(BidResponse);
+
+
+static void encode_notify_request(const bid_vector& bids, bufferlist& bl)
+{
+  BidRequest request;
+  request.bids = bids; // copy the vector
+  encode(request, bl);
+}
+
+static int apply_notify_responses(const bufferlist& bl, bidder_map& bidders)
+{
+  bc::flat_map<std::pair<uint64_t, uint64_t>, bufferlist> replies;
+  std::vector<std::pair<uint64_t, uint64_t>> timeouts;
+  try {
+    // decode notify responses
+    auto p = bl.cbegin();
+
+    using ceph::decode;
+    decode(replies, p);
+    decode(timeouts, p);
+
+    // add peers that replied
+    for (const auto& peer : replies) {
+      auto q = peer.second.cbegin();
+      BidResponse response;
+      decode(response, q);
+
+      uint64_t peer_id = peer.first.first;
+      bidders[peer_id] = std::move(response.bids);
+    }
+
+    // remove peers that timed out
+    for (const auto& peer : timeouts) {
+      uint64_t peer_id = peer.first;
+      bidders.erase(peer_id);
+    }
+  } catch (const buffer::error& e) {
+    return -EIO;
+  }
+  return 0;
+}
+
+
+// server interface to handle bid notifications from peers
+struct Server {
+  virtual ~Server() = default;
+
+  virtual void on_peer_bid(uint64_t peer_id, bid_vector peer_bids,
+                           bid_vector& my_bids) = 0;
+};
+
+
+// rados watcher for sync fairness notifications
+class Watcher : public librados::WatchCtx2 {
+  const DoutPrefixProvider* dpp;
+  sal::RadosStore* const store;
+  rgw_raw_obj obj;
+  Server* server;
+  rgw_rados_ref ref;
+  uint64_t handle = 0;
+
+ public:
+  Watcher(const DoutPrefixProvider* dpp, sal::RadosStore* store,
+          const rgw_raw_obj& obj, Server* server)
+    : dpp(dpp), store(store), obj(obj), server(server)
+  {}
+  ~Watcher()
+  {
+    stop();
+  }
+
+  int start()
+  {
+    int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
+    if (r < 0) {
+      return r;
+    }
+
+    // register a watch on the control object
+    r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
+    if (r == -ENOENT) {
+      constexpr bool exclusive = true;
+      r = ref.pool.ioctx().create(ref.obj.oid, exclusive);
+      if (r == -EEXIST || r == 0) {
+        r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
+      }
+    }
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << "Failed to watch " << ref.obj
+          << " with " << cpp_strerror(-r) << dendl;
+      ref.pool.ioctx().close();
+      return r;
+    }
+
+    ldpp_dout(dpp, 10) << "Watching " << ref.obj.oid << dendl;
+    return 0;
+  }
+
+  int restart()
+  {
+    int r = ref.pool.ioctx().unwatch2(handle);
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << "Failed to unwatch on " << ref.obj
+          << " with " << cpp_strerror(-r) << dendl;
+    }
+    r = ref.pool.ioctx().watch2(ref.obj.oid, &handle, this);
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << "Failed to restart watch on " << ref.obj
+          << " with " << cpp_strerror(-r) << dendl;
+      ref.pool.ioctx().close();
+    }
+    return r;
+  }
+
+  void stop()
+  {
+    if (handle) {
+      ref.pool.ioctx().unwatch2(handle);
+      ref.pool.ioctx().close();
+    }
+  }
+
+  // respond to bid notifications
+  void handle_notify(uint64_t notify_id, uint64_t cookie,
+                     uint64_t notifier_id, bufferlist& bl)
+  {
+    if (cookie != handle) {
+      return;
+    }
+
+    BidRequest request;
+    try {
+      auto p = bl.cbegin();
+      decode(request, p);
+    } catch (const buffer::error& e) {
+      ldpp_dout(dpp, -1) << "Failed to decode notification: " << e.what() << dendl;
+      return;
+    }
+
+    BidResponse response;
+    server->on_peer_bid(notifier_id, std::move(request.bids), response.bids);
+
+    bufferlist reply;
+    encode(response, reply);
+
+    ref.pool.ioctx().notify_ack(ref.obj.oid, notify_id, cookie, reply);
+  }
+
+  // reestablish the watch if it gets disconnected
+  void handle_error(uint64_t cookie, int err)
+  {
+    if (cookie != handle) {
+      return;
+    }
+    if (err == -ENOTCONN) {
+      ldpp_dout(dpp, 4) << "Disconnected watch on " << ref.obj << dendl;
+      restart();
+    }
+  }
+}; // Watcher
+
+
+class RadosBidManager;
+
+// RGWRadosNotifyCR wrapper coroutine
+class NotifyCR : public RGWCoroutine {
+  rgw::sal::RadosStore* store;
+  RadosBidManager* mgr;
+  rgw_raw_obj obj;
+  bufferlist request;
+  bufferlist response;
+ public:
+  NotifyCR(rgw::sal::RadosStore* store, RadosBidManager* mgr,
+           const rgw_raw_obj& obj, const bid_vector& my_bids)
+      : RGWCoroutine(store->ctx()), store(store), mgr(mgr), obj(obj)
+  {
+    encode_notify_request(my_bids, request);
+  }
+
+  int operate(const DoutPrefixProvider* dpp) override;
+};
+
+
+class RadosBidManager : public BidManager, public Server, public DoutPrefix {
+  sal::RadosStore* store;
+  rgw_raw_obj obj;
+  Watcher watcher;
+
+  std::mutex mutex;
+  bid_vector my_bids;
+  bidder_map all_bids;
+
+ public:
+  RadosBidManager(sal::RadosStore* store, const rgw_raw_obj& watch_obj,
+                 std::size_t num_shards)
+    : DoutPrefix(store->ctx(), dout_subsys, "sync fairness: "),
+      store(store), obj(watch_obj), watcher(this, store, watch_obj, this)
+  {
+    // fill my_bids with random values
+    std::random_device rd;
+    std::default_random_engine rng{rd()};
+    std::uniform_int_distribution<bid_value> dist;
+
+    my_bids.resize(num_shards);
+    std::generate(my_bids.begin(), my_bids.end(), [&] { return dist(rng); });
+  }
+
+  int start() override
+  {
+    return watcher.start();
+  }
+
+  void on_peer_bid(uint64_t peer_id, bid_vector peer_bids,
+                   bid_vector& my_bids) override
+  {
+    ldpp_dout(this, 10) << "received bids from peer " << peer_id << dendl;
+
+    auto lock = std::scoped_lock{mutex};
+    all_bids[peer_id] = std::move(peer_bids);
+    my_bids = this->my_bids;
+  }
+
+  bool is_highest_bidder(std::size_t index)
+  {
+    auto lock = std::scoped_lock{mutex};
+    const bid_value my_bid = my_bids.at(index); // may throw
+
+    for (const auto& peer_bids : all_bids) {
+      const bid_value peer_bid = peer_bids.second.at(index); // may throw
+      if (peer_bid > my_bid) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  RGWCoroutine* notify_cr()
+  {
+    auto lock = std::scoped_lock{mutex};
+    return new NotifyCR(store, this, obj, my_bids);
+  }
+
+  void notify_response(const bufferlist& bl)
+  {
+    ldpp_dout(this, 10) << "received notify response from peers" << dendl;
+
+    auto lock = std::scoped_lock{mutex};
+
+    // clear existing bids in case any peers went away. note that this may
+    // remove newer bids from peer notifications that raced with ours
+    all_bids.clear();
+
+    apply_notify_responses(bl, all_bids);
+  }
+};
+
+
+int NotifyCR::operate(const DoutPrefixProvider* dpp)
+{
+  static constexpr uint64_t timeout_ms = 15'000;
+  reenter(this) {
+    yield call(new RGWRadosNotifyCR(store, obj, request,
+                                    timeout_ms, &response));
+    if (retcode < 0) {
+      return set_cr_error(retcode);
+    }
+    mgr->notify_response(response);
+    return set_cr_done();
+  }
+  return 0;
+}
+
+
+auto create_rados_bid_manager(sal::RadosStore* store,
+                              const rgw_raw_obj& watch_obj,
+                              std::size_t num_shards)
+  -> std::unique_ptr<BidManager>
+{
+  return std::make_unique<RadosBidManager>(store, watch_obj, num_shards);
+}
+
+} // namespace rgw::sync_fairness
diff --git a/src/rgw/driver/rados/sync_fairness.h b/src/rgw/driver/rados/sync_fairness.h
new file mode 100644 (file)
index 0000000..4c301cb
--- /dev/null
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2022 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#pragma once
+
+#include <memory>
+
+namespace rgw::sal { class RadosStore; }
+struct rgw_raw_obj;
+class RGWCoroutine;
+
+/// watch/notify protocol to coordinate the sharing of sync locks
+///
+/// each gateway generates a set of random bids, and broadcasts them regularly
+/// to other active gateways. in response, the peer gateways send their own set
+/// of bids
+///
+/// sync will only lock and process log shards where it holds the highest bid
+namespace rgw::sync_fairness {
+
+class BidManager {
+ public:
+  virtual ~BidManager() {}
+
+  /// establish a watch, creating the control object if necessary
+  virtual int start() = 0;
+
+  /// returns true if we're the highest bidder on the given shard index
+  virtual bool is_highest_bidder(std::size_t index) = 0;
+
+  /// return a coroutine that broadcasts our current bids and records the
+  /// bids from other peers that respond
+  virtual RGWCoroutine* notify_cr() = 0;
+};
+
+// rados BidManager factory
+auto create_rados_bid_manager(sal::RadosStore* store,
+                              const rgw_raw_obj& watch_obj,
+                              std::size_t num_shards)
+  -> std::unique_ptr<BidManager>;
+
+} // namespace rgw::sync_fairness