]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
neorados: Create new library
authorAdam C. Emerson <aemerson@redhat.com>
Wed, 14 Aug 2019 00:36:49 +0000 (20:36 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Fri, 15 May 2020 14:55:10 +0000 (10:55 -0400)
This library is UNSTABLE and I reserve the right to change its
interface at any time for any reason.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
23 files changed:
src/CMakeLists.txt
src/common/config.cc
src/common/entity_name.cc
src/common/entity_name.h
src/common/snap_types.h
src/include/RADOS/RADOS_Decodable.hpp [deleted file]
src/include/neorados/RADOS.hpp [new file with mode: 0644]
src/include/neorados/RADOS_Decodable.hpp [new file with mode: 0644]
src/include/neorados/buffer_fwd.h [new symlink]
src/include/neorados/completion.h [new symlink]
src/include/object.h
src/mon/ConfigMap.cc
src/mon/ConfigMap.h
src/mount/conf.cc
src/neorados/CMakeLists.txt [new file with mode: 0644]
src/neorados/RADOS.cc [new file with mode: 0644]
src/neorados/RADOSImpl.cc [new file with mode: 0644]
src/neorados/RADOSImpl.h [new file with mode: 0644]
src/osdc/Objecter.cc
src/osdc/Objecter.h
src/test/CMakeLists.txt
src/test/neorados/CMakeLists.txt [new file with mode: 0644]
src/test/neorados/start_stop.cc [new file with mode: 0644]

index afedbfdd9da2f7d751a42cd502e9e99f29954d86..6d829e0542d1368aded8b1a15c3e0c84d562e422 100644 (file)
@@ -490,6 +490,7 @@ option(WITH_LIBRADOSSTRIPER "build with libradosstriper support" ON)
 
 add_subdirectory(include)
 add_subdirectory(librados)
+add_subdirectory(neorados)
 
 if(WITH_LIBRADOSSTRIPER)
   add_subdirectory(libradosstriper)
index 792cf5fa68cb4d6059a2efe262ea340c22abe745..a8cba940384b8f5ad7c080caefd73653378f3099 100644 (file)
@@ -1319,7 +1319,7 @@ void md_config_t::_get_my_sections(const ConfigValues& values,
 {
   sections.push_back(values.name.to_str());
 
-  sections.push_back(values.name.get_type_name());
+  sections.push_back(values.name.get_type_name().data());
 
   sections.push_back("global");
 }
index 37d02bd94df46f974c1552020827c714b2b1d73c..2eb24829a1c86adf0d3ca95137a4f7df805377e1 100644 (file)
@@ -42,22 +42,22 @@ to_cstr() const
 }
 
 bool EntityName::
-from_str(const string& s)
+from_str(std::string_view s)
 {
   size_t pos = s.find('.');
 
   if (pos == string::npos)
     return false;
-  string type_ = s.substr(0, pos);
-  string id_ = s.substr(pos + 1);
+
+  auto type_ = s.substr(0, pos);
+  auto id_ = s.substr(pos + 1);
   if (set(type_, id_))
     return false;
   return true;
 }
 
 void EntityName::
-set(uint32_t type_, const std::string &id_)
+set(uint32_t type_, std::string_view id_)
 {
   type = type_;
   id = id_;
@@ -72,9 +72,9 @@ set(uint32_t type_, const std::string &id_)
 }
 
 int EntityName::
-set(const std::string &type_, const std::string &id_)
+set(std::string_view type_, std::string_view id_)
 {
-  uint32_t t = str_to_ceph_entity_type(type_.c_str());
+  uint32_t t = str_to_ceph_entity_type(type_);
   if (t == CEPH_ENTITY_TYPE_ANY)
     return -EINVAL;
   set(t, id_);
@@ -88,13 +88,13 @@ set_type(uint32_t type_)
 }
 
 int EntityName::
-set_type(const char *type_)
+set_type(std::string_view type_)
 {
   return set(type_, id);
 }
 
 void EntityName::
-set_id(const std::string &id_)
+set_id(std::string_view id_)
 {
   set(type, id_);
 }
@@ -106,13 +106,13 @@ void EntityName::set_name(entity_name_t n)
   set(n.type(), s);
 }
 
-const char* EntityName::
+std::string_view EntityName::
 get_type_str() const
 {
   return ceph_entity_type_name(type);
 }
 
-const char *EntityName::
+std::string_view EntityName::
 get_type_name() const
 {
   return ceph_entity_type_name(type);
index 1dd56f66d740af8d4c690dbec1aef26e9a01eb98..886c4b4946f8ed94bc29e435388b6e1942b7ffac 100644 (file)
@@ -15,6 +15,8 @@
 #ifndef CEPH_COMMON_ENTITY_NAME_H
 #define CEPH_COMMON_ENTITY_NAME_H
 
+#include <string_view>
+
 #include <ifaddrs.h>
 
 #include "msg/msg_types.h"
@@ -42,15 +44,15 @@ struct EntityName
 
   const std::string& to_str() const;
   const char *to_cstr() const;
-  bool from_str(const std::string& s);
-  void set(uint32_t type_, const std::string &id_);
-  int set(const std::string &type_, const std::string &id_);
+  bool from_str(std::string_view s);
+  void set(uint32_t type_, std::string_view id_);
+  int set(std::string_view type_, std::string_view id_);
   void set_type(uint32_t type_);
-  int set_type(const char *type);
-  void set_id(const std::string &id_);
+  int set_type(std::string_view type);
+  void set_id(std::string_view id_);
   void set_name(entity_name_t n);
 
-  const char* get_type_str() const;
+  std::string_view get_type_str() const;
 
   uint32_t get_type() const { return type; }
   bool is_osd() const { return get_type() == CEPH_ENTITY_TYPE_OSD; }
@@ -59,7 +61,7 @@ struct EntityName
   bool is_client() const { return get_type() == CEPH_ENTITY_TYPE_CLIENT; }
   bool is_mon() const { return get_type() == CEPH_ENTITY_TYPE_MON; }
 
-  const char * get_type_name() const;
+  std::string_view get_type_name() const;
   const std::string &get_id() const;
   bool has_default_id() const;
 
index 175183227d1437360617fafd1e9d8b8e967046f0..958aea339a183ec92cb95d14985d433ed908f012 100644 (file)
@@ -53,7 +53,7 @@ struct SnapContext {
     seq = 0;
     snaps.clear();
   }
-  bool empty() { return seq == 0; }
+  bool empty() const { return seq == 0; }
 
   void encode(ceph::buffer::list& bl) const {
     using ceph::encode;
diff --git a/src/include/RADOS/RADOS_Decodable.hpp b/src/include/RADOS/RADOS_Decodable.hpp
deleted file mode 100644 (file)
index 023ebb8..0000000
+++ /dev/null
@@ -1,107 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2020 Red Hat <contact@redhat.com>
- * Author: Adam C. Emerson
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#ifndef RADOS_DECODABLE_HPP
-#define RADOS_DECODABLE_HPP
-
-#include <cstdint>
-#include <cstdlib>
-#include <string>
-#include <iostream>
-#include <tuple>
-#include <utility>
-#include <vector>
-
-namespace RADOS {
-struct Entry {
-  std::string nspace;
-  std::string oid;
-  std::string locator;
-
-  Entry() {}
-  Entry(std::string nspace, std::string oid, std::string locator) :
-    nspace(std::move(nspace)), oid(std::move(oid)), locator(locator) {}
-};
-inline bool operator ==(const Entry& l, const Entry r) {
-  return std::tie(l.nspace, l.oid, l.locator) ==
-    std::tie(r.nspace, r.oid, r.locator);
-}
-inline bool operator !=(const Entry& l, const Entry r) {
-  return std::tie(l.nspace, l.oid, l.locator) !=
-    std::tie(r.nspace, r.oid, r.locator);
-}
-inline bool operator <(const Entry& l, const Entry r) {
-  return std::tie(l.nspace, l.oid, l.locator) <
-    std::tie(r.nspace, r.oid, r.locator);
-}
-inline bool operator <=(const Entry& l, const Entry r) {
-  return std::tie(l.nspace, l.oid, l.locator) <=
-    std::tie(r.nspace, r.oid, r.locator);
-}
-inline bool operator >=(const Entry& l, const Entry r) {
-  return std::tie(l.nspace, l.oid, l.locator) >=
-    std::tie(r.nspace, r.oid, r.locator);
-}
-inline bool operator >(const Entry& l, const Entry r) {
-  return std::tie(l.nspace, l.oid, l.locator) >
-    std::tie(r.nspace, r.oid, r.locator);
-}
-
-inline std::ostream& operator <<(std::ostream& out, const Entry& entry) {
-  if (!entry.nspace.empty())
-    out << entry.nspace << '/';
-  out << entry.oid;
-  if (!entry.locator.empty())
-    out << '@' << entry.locator;
-  return out;
-}
-
-struct CloneInfo {
-  uint64_t cloneid = 0;
-  std::vector<uint64_t> snaps; // ascending
-  std::vector<std::pair<uint64_t, uint64_t>> overlap;// with next newest
-  uint64_t size = 0;
-  CloneInfo() = default;
-};
-
-struct SnapSet {
-  std::vector<CloneInfo> clones; // ascending
-  std::uint64_t seq = 0;   // newest snapid seen by the object
-  SnapSet() = default;
-};
-
-struct ObjWatcher {
-  /// Address of the Watcher
-  std::string addr;
-  /// Watcher ID
-  std::int64_t watcher_id;
-  /// Cookie
-  std::uint64_t cookie;
-  /// Timeout in Seconds
-  std::uint32_t timeout_seconds;
-};
-}
-
-namespace std {
-template<>
-struct hash<::RADOS::Entry> {
-  std::size_t operator ()(::RADOS::Entry e) const {
-    hash<std::string> h;
-    return (h(e.nspace) << 2) ^ (h(e.oid) << 1) ^ h(e.locator);
-  }
-};
-}
-
-#endif // RADOS_DECODABLE_HPP
diff --git a/src/include/neorados/RADOS.hpp b/src/include/neorados/RADOS.hpp
new file mode 100644 (file)
index 0000000..288268b
--- /dev/null
@@ -0,0 +1,1119 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat <contact@redhat.com>
+ * Author: Adam C. Emerson
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef NEORADOS_RADOS_HPP
+#define NEORADOS_RADOS_HPP
+
+#include <cstddef>
+#include <memory>
+#include <tuple>
+#include <string>
+#include <string_view>
+#include <type_traits>
+#include <variant>
+
+#include <boost/asio.hpp>
+
+#include <boost/container/flat_map.hpp>
+#include <boost/container/flat_set.hpp>
+#include <boost/uuid/uuid.hpp>
+
+#include <boost/system/error_code.hpp>
+
+// Will be in C++20!
+
+#include "include/expected.hpp"
+
+// Had better be in C++20. Why is this not in Boost?
+
+#include "include/function2.hpp"
+
+// Things broken out so we can decode them in Objecter.
+
+#include "include/neorados/RADOS_Decodable.hpp"
+
+// Needed for type erasure and template support. We can't really avoid
+// it.
+
+#include "common/async/completion.h"
+
+// These are needed for RGW, but in general as a 'shiny new interface'
+// we should try to use forward declarations and provide standard alternatives.
+
+#include "include/common_fwd.h"
+
+#include "include/buffer.h"
+
+#include "common/ceph_time.h"
+
+namespace neorados {
+using namespace std::literals;
+
+class Object;
+class IOContext;
+}
+namespace std {
+template<>
+struct hash<neorados::Object>;
+template<>
+struct hash<neorados::IOContext>;
+}
+
+namespace neorados {
+namespace detail {
+class RADOS;
+}
+
+class RADOS;
+
+// Exists mostly so that repeated operations on the same object don't
+// have to pay for the string copy to construct an object_t.
+
+class Object final {
+  friend RADOS;
+  friend std::hash<Object>;
+
+public:
+  Object();
+  Object(const char* s);
+  Object(std::string_view s);
+  Object(std::string&& s);
+  Object(const std::string& s);
+  ~Object();
+
+  Object(const Object& o);
+  Object& operator =(const Object& o);
+
+  Object(Object&& o);
+  Object& operator =(Object&& o);
+
+  operator std::string_view() const;
+
+  friend std::ostream& operator <<(std::ostream& m, const Object& o);
+  friend bool operator <(const Object& lhs, const Object& rhs);
+  friend bool operator <=(const Object& lhs, const Object& rhs);
+  friend bool operator >=(const Object& lhs, const Object& rhs);
+  friend bool operator >(const Object& lhs, const Object& rhs);
+
+  friend bool operator ==(const Object& lhs, const Object& rhs);
+  friend bool operator !=(const Object& lhs, const Object& rhs);
+
+private:
+
+  static constexpr std::size_t impl_size = 4 * 8;
+  std::aligned_storage_t<impl_size> impl;
+};
+
+// Not the same as the librados::IoCtx, but it does gather together
+// some of the same metadata. Since we're likely to do multiple
+// operations in the same pool or namespace, it doesn't make sense to
+// redo a bunch of lookups and string copies.
+
+class IOContext final {
+  friend RADOS;
+  friend std::hash<IOContext>;
+
+public:
+
+  IOContext();
+  explicit IOContext(std::int64_t pool);
+  IOContext(std::int64_t _pool, std::string_view _ns);
+  IOContext(std::int64_t _pool, std::string&& _ns);
+  ~IOContext();
+
+  IOContext(const IOContext& rhs);
+  IOContext& operator =(const IOContext& rhs);
+
+  IOContext(IOContext&& rhs);
+  IOContext& operator =(IOContext&& rhs);
+
+  std::int64_t pool() const;
+  void pool(std::int64_t _pool);
+
+  std::string_view ns() const;
+  void ns(std::string_view _ns);
+  void ns(std::string&& _ns);
+
+  std::optional<std::string_view> key() const;
+  void key(std::string_view _key);
+  void key(std::string&& _key);
+  void clear_key();
+
+  std::optional<std::int64_t> hash() const;
+  void hash(std::int64_t _hash);
+  void clear_hash();
+
+  std::optional<std::uint64_t> read_snap() const;
+  void read_snap(std::optional<std::uint64_t> _snapid);
+
+  // I can't actually move-construct here since snapid_t is its own
+  // separate class type, not an alias.
+  std::optional<
+    std::pair<std::uint64_t,
+             std::vector<std::uint64_t>>> write_snap_context() const;
+  void write_snap_context(std::optional<
+                         std::pair<std::uint64_t,
+                                     std::vector<std::uint64_t>>> snapc);
+
+  friend std::ostream& operator <<(std::ostream& m, const IOContext& o);
+  friend bool operator <(const IOContext& lhs, const IOContext& rhs);
+  friend bool operator <=(const IOContext& lhs, const IOContext& rhs);
+  friend bool operator >=(const IOContext& lhs, const IOContext& rhs);
+  friend bool operator >(const IOContext& lhs, const IOContext& rhs);
+
+  friend bool operator ==(const IOContext& lhs, const IOContext& rhs);
+  friend bool operator !=(const IOContext& lhs, const IOContext& rhs);
+
+private:
+
+  static constexpr std::size_t impl_size = 16 * 8;
+  std::aligned_storage_t<impl_size> impl;
+};
+
+inline constexpr std::string_view all_nspaces("\001"sv);
+
+enum class cmpxattr_op : std::uint8_t {
+  eq  = 1,
+  ne  = 2,
+  gt  = 3,
+  gte = 4,
+  lt  = 5,
+  lte = 6
+};
+
+namespace alloc_hint {
+enum alloc_hint_t {
+  sequential_write = 1,
+  random_write = 2,
+  sequential_read = 4,
+  random_read = 8,
+  append_only = 16,
+  immutable = 32,
+  shortlived = 64,
+  longlived = 128,
+  compressible = 256,
+  incompressible = 512
+};
+}
+
+class Op {
+  friend RADOS;
+
+public:
+
+  Op(const Op&) = delete;
+  Op& operator =(const Op&) = delete;
+  Op(Op&&);
+  Op& operator =(Op&&);
+  ~Op();
+
+  void set_excl();
+  void set_failok();
+  void set_fadvise_random();
+  void set_fadvise_sequential();
+  void set_fadvise_willneed();
+  void set_fadvise_dontneed();
+  void set_fadvise_nocache();
+
+  void cmpext(uint64_t off, ceph::buffer::list&& cmp_bl, std::size_t* s);
+  void cmpxattr(std::string_view name, cmpxattr_op op,
+               const ceph::buffer::list& val);
+  void cmpxattr(std::string_view name, cmpxattr_op op, std::uint64_t val);
+  void assert_version(uint64_t ver);
+  void assert_exists();
+  void cmp_omap(const boost::container::flat_map<
+                 std::string,
+                 std::pair<ceph::buffer::list, int>>& assertions);
+
+  void exec(std::string_view cls, std::string_view method,
+           const ceph::buffer::list& inbl,
+           ceph::buffer::list* out,
+           boost::system::error_code* ec = nullptr);
+  void exec(std::string_view cls, std::string_view method,
+           const ceph::buffer::list& inbl,
+           fu2::unique_function<void(boost::system::error_code,
+                                     const ceph::buffer::list&) &&> f);
+  void exec(std::string_view cls, std::string_view method,
+           const ceph::buffer::list& inbl,
+           fu2::unique_function<void(boost::system::error_code, int,
+                                     const ceph::buffer::list&) &&> f);
+  void exec(std::string_view cls, std::string_view method,
+           const ceph::buffer::list& inbl,
+           boost::system::error_code* ec = nullptr);
+
+
+  // Flags that apply to all ops in the operation vector
+  void balance_reads();
+  void localize_reads();
+  void order_reads_writes();
+  void ignore_cache();
+  void skiprwlocks();
+  void ignore_overlay();
+  void full_try();
+  void full_force();
+  void ignore_redirect();
+  void ordersnap();
+  void returnvec();
+
+  std::size_t size() const;
+  using Signature = void(boost::system::error_code);
+  using Completion = ceph::async::Completion<Signature>;
+
+  friend std::ostream& operator <<(std::ostream& m, const Op& o);
+protected:
+  Op();
+  static constexpr std::size_t impl_size = 85 * 8;
+  std::aligned_storage_t<impl_size> impl;
+};
+
+// This class is /not/ thread-safe. If you want you can wrap it in
+// something that locks it.
+
+class ReadOp final : public Op {
+  friend RADOS;
+
+public:
+
+  ReadOp() = default;
+  ReadOp(const ReadOp&) = delete;
+  ReadOp(ReadOp&&) = default;
+
+  ReadOp& operator =(const ReadOp&) = delete;
+  ReadOp& operator =(ReadOp&&) = default;
+
+  void read(size_t off, uint64_t len, ceph::buffer::list* out,
+           boost::system::error_code* ec = nullptr);
+  void get_xattr(std::string_view name, ceph::buffer::list* out,
+                boost::system::error_code* ec = nullptr);
+  void get_omap_header(ceph::buffer::list*,
+                      boost::system::error_code* ec = nullptr);
+
+  void sparse_read(uint64_t off, uint64_t len,
+                  ceph::buffer::list* out,
+                  std::vector<std::pair<std::uint64_t, std::uint64_t>>* extents,
+                  boost::system::error_code* ec = nullptr);
+
+  void stat(std::uint64_t* size, ceph::real_time* mtime,
+           boost::system::error_code* ec = nullptr);
+
+  void get_omap_keys(std::optional<std::string_view> start_after,
+                    std::uint64_t max_return,
+                    boost::container::flat_set<std::string>* keys,
+                    bool* truncated,
+                    boost::system::error_code* ec = nullptr);
+
+
+  void get_xattrs(boost::container::flat_map<std::string,
+                                            ceph::buffer::list>* kv,
+                    boost::system::error_code* ec = nullptr);
+
+  void get_omap_vals(std::optional<std::string_view> start_after,
+                    std::optional<std::string_view> filter_prefix,
+                    uint64_t max_return,
+                    boost::container::flat_map<std::string,
+                                               ceph::buffer::list>* kv,
+                    bool* truncated,
+                    boost::system::error_code* ec = nullptr);
+
+
+  void get_omap_vals_by_keys(const boost::container::flat_set<std::string>& keys,
+                            boost::container::flat_map<std::string,
+                                                       ceph::buffer::list>* kv,
+                            boost::system::error_code* ec = nullptr);
+
+  void list_watchers(std::vector<struct ObjWatcher>* watchers,
+                    boost::system::error_code* ec = nullptr);
+
+  void list_snaps(struct SnapSet* snaps,
+                 boost::system::error_code* ec = nullptr);
+};
+
+class WriteOp final : public Op {
+  friend RADOS;
+public:
+
+  WriteOp() = default;
+  WriteOp(const WriteOp&) = delete;
+  WriteOp(WriteOp&&) = default;
+
+  WriteOp& operator =(const WriteOp&) = delete;
+  WriteOp& operator =(WriteOp&&) = default;
+
+  void set_mtime(ceph::real_time t);
+  void create(bool exclusive);
+  void write(uint64_t off, ceph::buffer::list&& bl);
+  void write_full(ceph::buffer::list&& bl);
+  void writesame(std::uint64_t off, std::uint64_t write_len,
+                ceph::buffer::list&& bl);
+  void append(ceph::buffer::list&& bl);
+  void remove();
+  void truncate(uint64_t off);
+  void zero(uint64_t off, uint64_t len);
+  void rmxattr(std::string_view name);
+  void setxattr(std::string_view name,
+               ceph::buffer::list&& bl);
+  void rollback(uint64_t snapid);
+  void set_omap(const boost::container::flat_map<std::string,
+                                                ceph::buffer::list>& map);
+  void set_omap_header(ceph::buffer::list&& bl);
+  void clear_omap();
+  void rm_omap_keys(const boost::container::flat_set<std::string>& to_rm);
+  void set_alloc_hint(uint64_t expected_object_size,
+                     uint64_t expected_write_size,
+                     alloc_hint::alloc_hint_t flags);
+};
+
+
+struct FSStats {
+  uint64_t kb;
+  uint64_t kb_used;
+  uint64_t kb_avail;
+  uint64_t num_objects;
+};
+
+// From librados.h, maybe move into a common file. But I want to see
+// if we need/want to amend/add/remove anything first.
+struct PoolStats {
+  /// space used in bytes
+  uint64_t num_bytes;
+  /// space used in KB
+  uint64_t num_kb;
+  /// number of objects in the pool
+  uint64_t num_objects;
+  /// number of clones of objects
+  uint64_t num_object_clones;
+  /// num_objects * num_replicas
+  uint64_t num_object_copies;
+  /// number of objects missing on primary
+  uint64_t num_objects_missing_on_primary;
+  /// number of objects found on no OSDs
+  uint64_t num_objects_unfound;
+  /// number of objects replicated fewer times than they should be
+  /// (but found on at least one OSD)
+  uint64_t num_objects_degraded;
+  /// number of objects read
+  uint64_t num_rd;
+  /// objects read in KB
+  uint64_t num_rd_kb;
+  /// number of objects written
+  uint64_t num_wr;
+  /// objects written in KB
+  uint64_t num_wr_kb;
+  /// bytes originally provided by user
+  uint64_t num_user_bytes;
+  /// bytes passed compression
+  uint64_t compressed_bytes_orig;
+  /// bytes resulted after compression
+  uint64_t compressed_bytes;
+  /// bytes allocated at storage
+  uint64_t compressed_bytes_alloc;
+};
+
+// Placement group, for PG commands
+struct PG {
+  uint64_t pool;
+  uint32_t seed;
+};
+
+class Cursor final {
+public:
+  static Cursor begin();
+  static Cursor end();
+
+  Cursor();
+  Cursor(const Cursor&);
+  Cursor& operator =(const Cursor&);
+  Cursor(Cursor&&);
+  Cursor& operator =(Cursor&&);
+  ~Cursor();
+
+  friend bool operator ==(const Cursor& lhs,
+                         const Cursor& rhs);
+  friend bool operator !=(const Cursor& lhs,
+                         const Cursor& rhs);
+  friend bool operator <(const Cursor& lhs,
+                        const Cursor& rhs);
+  friend bool operator <=(const Cursor& lhs,
+                         const Cursor& rhs);
+  friend bool operator >=(const Cursor& lhs,
+                         const Cursor& rhs);
+  friend bool operator >(const Cursor& lhs,
+                        const Cursor& rhs);
+
+  std::string to_str() const;
+  static std::optional<Cursor> from_str(const std::string& s);
+
+private:
+  struct end_magic_t {};
+  Cursor(end_magic_t);
+  Cursor(void*);
+  friend RADOS;
+  static constexpr std::size_t impl_size = 16 * 8;
+  std::aligned_storage_t<impl_size> impl;
+};
+
+class RADOS final
+{
+public:
+  static constexpr std::tuple<uint32_t, uint32_t, uint32_t> version() {
+    return {0, 0, 1};
+  }
+
+  using BuildSig = void(boost::system::error_code, RADOS);
+  using BuildComp = ceph::async::Completion<BuildSig>;
+  class Builder {
+    std::optional<std::string> conf_files;
+    std::optional<std::string> cluster;
+    std::optional<std::string> name;
+    std::vector<std::pair<std::string, std::string>> configs;
+    bool no_default_conf = false;
+    bool no_mon_conf = false;
+
+  public:
+    Builder() = default;
+    Builder& add_conf_file(std::string_view v);
+    Builder& set_cluster(std::string_view c) {
+      cluster = std::string(c);
+      return *this;
+    }
+    Builder& set_name(std::string_view n) {
+      name = std::string(n);
+      return *this;
+    }
+    Builder& set_no_default_conf() {
+      no_default_conf = true;
+      return *this;
+    }
+    Builder& set_no_mon_conf() {
+      no_mon_conf = true;
+      return *this;
+    }
+    Builder& set_conf_option(std::string_view opt, std::string_view val) {
+      configs.emplace_back(std::string(opt), std::string(val));
+      return *this;
+    }
+
+    template<typename CompletionToken>
+    auto build(boost::asio::io_context& ioctx, CompletionToken&& token) {
+      boost::asio::async_completion<CompletionToken, BuildSig> init(token);
+      build(ioctx,
+           BuildComp::create(ioctx.get_executor(),
+                             std::move(init.completion_handler)));
+      return init.result.get();
+    }
+
+  private:
+    void build(boost::asio::io_context& ioctx,
+              std::unique_ptr<BuildComp> c);
+  };
+
+
+  template<typename CompletionToken>
+  static auto make_with_cct(CephContext* cct,
+                           boost::asio::io_context& ioctx,
+                           CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, BuildSig> init(token);
+    make_with_cct(cct, ioctx,
+                 BuildComp::create(ioctx.get_executor(),
+                                   std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  RADOS(const RADOS&) = delete;
+  RADOS& operator =(const RADOS&) = delete;
+
+  RADOS(RADOS&&);
+  RADOS& operator =(RADOS&&);
+
+  ~RADOS();
+
+  CephContext* cct();
+
+  using executor_type = boost::asio::io_context::executor_type;
+  executor_type get_executor() const;
+  boost::asio::io_context& get_io_context();
+
+  template<typename CompletionToken>
+  auto execute(const Object& o, const IOContext& ioc, ReadOp&& op,
+              ceph::buffer::list* bl,
+              CompletionToken&& token, uint64_t* objver = nullptr) {
+    boost::asio::async_completion<CompletionToken, Op::Signature> init(token);
+    execute(o, ioc, std::move(op), bl,
+           ReadOp::Completion::create(get_executor(),
+                                      std::move(init.completion_handler)),
+           objver);
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto execute(const Object& o, const IOContext& ioc, WriteOp&& op,
+              CompletionToken&& token, uint64_t* objver = nullptr) {
+    boost::asio::async_completion<CompletionToken, Op::Signature> init(token);
+    execute(o, ioc, std::move(op),
+           Op::Completion::create(get_executor(),
+                                  std::move(init.completion_handler)),
+           objver);
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto execute(const Object& o, std::int64_t pool,
+              ReadOp&& op,
+              ceph::buffer::list* bl,
+              CompletionToken&& token,
+              std::optional<std::string_view> ns = {},
+              std::optional<std::string_view> key = {},
+              uint64_t* objver = nullptr) {
+    boost::asio::async_completion<CompletionToken, Op::Signature> init(token);
+    execute(o, pool, std::move(op), bl,
+           ReadOp::Completion::create(get_executor(),
+                                      std::move(init.completion_handler)),
+           ns, key, objver);
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto execute(const Object& o, std::int64_t pool, WriteOp&& op,
+              CompletionToken&& token,
+              std::optional<std::string_view> ns = {},
+              std::optional<std::string_view> key = {},
+              uint64_t* objver = nullptr) {
+    boost::asio::async_completion<CompletionToken, Op::Signature> init(token);
+    execute(o, pool, std::move(op),
+           Op::Completion::create(get_executor(),
+                                  std::move(init.completion_handler)),
+           ns, key, objver);
+    return init.result.get();
+  }
+
+  boost::uuids::uuid get_fsid() const noexcept;
+
+  using LookupPoolSig = void(boost::system::error_code,
+                            std::int64_t);
+  using LookupPoolComp = ceph::async::Completion<LookupPoolSig>;
+  template<typename CompletionToken>
+  auto lookup_pool(std::string_view name,
+                  CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, LookupPoolSig> init(token);
+    lookup_pool(name,
+               LookupPoolComp::create(get_executor(),
+                                      std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  std::optional<uint64_t> get_pool_alignment(int64_t pool_id);
+
+  using LSPoolsSig = void(std::vector<std::pair<std::int64_t, std::string>>);
+  using LSPoolsComp = ceph::async::Completion<LSPoolsSig>;
+  template<typename CompletionToken>
+  auto list_pools(CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, LSPoolsSig> init(token);
+    list_pools(LSPoolsComp::create(get_executor(),
+                                  std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+
+
+  using SimpleOpSig = void(boost::system::error_code);
+  using SimpleOpComp = ceph::async::Completion<SimpleOpSig>;
+  template<typename CompletionToken>
+  auto create_pool_snap(int64_t pool, std::string_view snapName,
+                       CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, SimpleOpSig> init(token);
+    create_pool_snap(pool, snapName,
+                    SimpleOpComp::create(get_executor(),
+                                         std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  using SMSnapSig = void(boost::system::error_code, std::uint64_t);
+  using SMSnapComp = ceph::async::Completion<SMSnapSig>;
+  template<typename CompletionToken>
+  auto allocate_selfmanaged_snap(int64_t pool,
+                                CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, SMSnapSig> init(token);
+    allocate_selfmanaged_snap(pool,
+                             SMSnapComp::create(
+                               get_executor(),
+                               std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto delete_pool_snap(int64_t pool, std::string_view snapName,
+                       CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, SimpleOpSig> init(token);
+    delete_pool_snap(pool, snapName,
+                    SimpleOpComp::create(get_executor(),
+                                         std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto delete_selfmanaged_snap(int64_t pool, std::string_view snapName,
+                              CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, SimpleOpSig> init(token);
+    delete_selfmanaged_snap(pool, snapName,
+                           SimpleOpComp::create(
+                             get_executor(),
+                             std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto create_pool(std::string_view name, std::optional<int> crush_rule,
+                  CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, SimpleOpSig> init(token);
+    create_pool(name, crush_rule,
+               SimpleOpComp::create(get_executor(),
+                                    std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto delete_pool(std::string_view name,
+                  CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, SimpleOpSig> init(token);
+    delete_pool(name,
+               SimpleOpComp::create(get_executor(),
+                                    std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto delete_pool(int64_t pool,
+                  CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, SimpleOpSig> init(token);
+    delete_pool(pool,
+               SimpleOpComp::create(get_executor(),
+                                    std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  using PoolStatSig = void(boost::system::error_code,
+                          boost::container::flat_map<std::string,
+                                                     PoolStats>, bool);
+  using PoolStatComp = ceph::async::Completion<PoolStatSig>;
+  template<typename CompletionToken>
+  auto stat_pools(const std::vector<std::string>& pools,
+                 CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, PoolStatSig> init(token);
+    stat_pools(pools,
+              PoolStatComp::create(get_executor(),
+                                   std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  using StatFSSig = void(boost::system::error_code,
+                        FSStats);
+  using StatFSComp = ceph::async::Completion<StatFSSig>;
+  template<typename CompletionToken>
+  auto statfs(std::optional<int64_t> pool,
+             CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, StatFSSig> init(token);
+    ceph_statfs(pool, StatFSComp::create(get_executor(),
+                                        std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  using WatchCB = fu2::unique_function<void(boost::system::error_code,
+                                           uint64_t notify_id,
+                                           uint64_t cookie,
+                                           uint64_t notifier_id,
+                                           ceph::buffer::list&& bl)>;
+
+  using WatchSig = void(boost::system::error_code ec,
+                       uint64_t cookie);
+  using WatchComp = ceph::async::Completion<WatchSig>;
+  template<typename CompletionToken>
+  auto watch(const Object& o, const IOContext& ioc,
+            std::optional<std::chrono::seconds> timeout,
+            WatchCB&& cb, CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, WatchSig> init(token);
+    watch(o, ioc, timeout, std::move(cb),
+         WatchComp::create(get_executor(),
+                           std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto watch(const Object& o, std::int64_t pool,
+            std::optional<std::chrono::seconds> timeout,
+            WatchCB&& cb, CompletionToken&& token,
+            std::optional<std::string_view> ns = {},
+            std::optional<std::string_view> key = {}) {
+    boost::asio::async_completion<CompletionToken, WatchSig> init(token);
+    watch(o, pool, timeout, std::move(cb),
+         WatchComp::create(get_executor(),
+                           std::move(init.completion_handler)),
+         ns, key);
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto notify_ack(const Object& o,
+                 const IOContext& ioc,
+                 uint64_t notify_id,
+                 uint64_t cookie,
+                 ceph::buffer::list&& bl,
+                 CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, SimpleOpSig> init(token);
+    notify_ack(o, ioc, notify_id, cookie, std::move(bl),
+              SimpleOpComp::create(get_executor(),
+                                   std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto notify_ack(const Object& o,
+                 std::int64_t pool,
+                 uint64_t notify_id,
+                 uint64_t cookie,
+                 ceph::buffer::list&& bl,
+                 CompletionToken&& token,
+                 std::optional<std::string_view> ns = {},
+                 std::optional<std::string_view> key = {}) {
+    boost::asio::async_completion<CompletionToken, WatchSig> init(token);
+    notify_ack(o, pool, notify_id, cookie, std::move(bl),
+              SimpleOpComp::create(get_executor(),
+                                   std::move(init.completion_handler)),
+              ns, key);
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto unwatch(uint64_t cookie, const IOContext& ioc,
+              CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, SimpleOpSig> init(token);
+    unwatch(cookie, ioc,
+           SimpleOpComp::create(get_executor(),
+                                std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto unwatch(uint64_t cookie, std::int64_t pool,
+              CompletionToken&& token,
+              std::optional<std::string_view> ns = {},
+              std::optional<std::string_view> key = {}) {
+    boost::asio::async_completion<CompletionToken, SimpleOpSig> init(token);
+    unwatch(cookie, pool,
+           SimpleOpComp::create(get_executor(),
+                                std::move(init.completion_handler)),
+           ns, key);
+    return init.result.get();
+  }
+
+  // This is one of those places where having to force everything into
+  // a .cc file is really infuriating. If we had modules, that would
+  // let us separate out the implementation details without
+  // sacrificing all the benefits of templates.
+  using VoidOpSig = void();
+  using VoidOpComp = ceph::async::Completion<VoidOpSig>;
+  template<typename CompletionToken>
+  auto flush_watch(CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, VoidOpSig> init(token);
+    flush_watch(VoidOpComp::create(get_executor(),
+                                  std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  using NotifySig = void(boost::system::error_code, ceph::buffer::list);
+  using NotifyComp = ceph::async::Completion<NotifySig>;
+  template<typename CompletionToken>
+  auto notify(const Object& oid, const IOContext& ioc, ceph::buffer::list&& bl,
+             std::optional<std::chrono::milliseconds> timeout,
+             CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, NotifySig> init(token);
+    notify(oid, ioc, std::move(bl), timeout,
+          NotifyComp::create(get_executor(),
+                             std::move(init.completion_handler)));
+
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto notify(const Object& oid, std::int64_t pool, ceph::buffer::list&& bl,
+             std::optional<std::chrono::milliseconds> timeout,
+             CompletionToken&& token,
+             std::optional<std::string_view> ns = {},
+             std::optional<std::string_view> key = {}) {
+    boost::asio::async_completion<CompletionToken, NotifySig> init(token);
+    notify(oid, pool, bl, timeout,
+          NotifyComp::create(get_executor(),
+                             std::move(init.completion_handler)),
+          ns, key);
+
+    return init.result.get();
+  }
+
+  // The versions with pointers are fine for coroutines, but
+  // extraordinarily unappealing for callback-oriented programming.
+  using EnumerateSig = void(boost::system::error_code,
+                           std::vector<Entry>,
+                           Cursor);
+  using EnumerateComp = ceph::async::Completion<EnumerateSig>;
+  template<typename CompletionToken>
+  auto enumerate_objects(const IOContext& ioc, const Cursor& begin,
+                        const Cursor& end, const std::uint32_t max,
+                        const ceph::buffer::list& filter,
+                        CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, EnumerateSig> init(token);
+    enumerate_objects(ioc, begin, end, max, filter,
+                     EnumerateComp::create(get_executor(),
+                                           std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto enumerate_objects(std::int64_t pool, const Cursor& begin,
+                        const Cursor& end, const std::uint32_t max,
+                        const ceph::buffer::list& filter,
+                        CompletionToken&& token,
+                        std::optional<std::string_view> ns = {},
+                        std::optional<std::string_view> key = {}) {
+    boost::asio::async_completion<CompletionToken, EnumerateSig> init(token);
+    enumerate_objects(pool, begin, end, max, filter,
+                     EnumerateComp::create(get_executor(),
+                                           std::move(init.completion_handler)),
+                     ns, key);
+    return init.result.get();
+  }
+
+  using CommandSig = void(boost::system::error_code,
+                         std::string, ceph::buffer::list);
+  using CommandComp = ceph::async::Completion<CommandSig>;
+  template<typename CompletionToken>
+  auto osd_command(int osd, std::vector<std::string>&& cmd,
+                  ceph::buffer::list&& in, CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
+    osd_command(osd, std::move(cmd), std::move(in),
+               CommandComp::create(get_executor(),
+                                     std::move(init.completion_handler)));
+    return init.result.get();
+  }
+  template<typename CompletionToken>
+  auto pg_command(PG pg, std::vector<std::string>&& cmd,
+                 ceph::buffer::list&& in, CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
+    pg_command(pg, std::move(cmd), std::move(in),
+              CommandComp::create(get_executor(),
+                                     std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto mon_command(std::vector<std::string> command,
+                  const ceph::buffer::list& bl,
+                  std::string* outs, ceph::buffer::list* outbl,
+                  CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, SimpleOpSig> init(token);
+    mon_command(command, bl, outs, outbl,
+               SimpleOpComp::create(get_executor(),
+                                    std::move(init.completion_handler)));
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto enable_application(std::string_view pool, std::string_view app_name,
+                         bool force, CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, SimpleOpSig> init(token);
+    enable_application(pool, app_name, force,
+                      SimpleOpComp::create(get_executor(),
+                                           std::move(init.completion_handler)));
+    return init.result.get();
+  }
+  uint64_t instance_id() const;
+
+private:
+
+  RADOS();
+
+  friend Builder;
+
+  RADOS(std::unique_ptr<detail::RADOS> impl);
+  static void make_with_cct(CephContext* cct,
+                           boost::asio::io_context& ioctx,
+                   std::unique_ptr<BuildComp> c);
+
+  void execute(const Object& o, const IOContext& ioc, ReadOp&& op,
+              ceph::buffer::list* bl, std::unique_ptr<Op::Completion> c,
+              uint64_t* objver);
+
+  void execute(const Object& o, const IOContext& ioc, WriteOp&& op,
+              std::unique_ptr<Op::Completion> c, uint64_t* objver);
+
+  void execute(const Object& o, std::int64_t pool, ReadOp&& op,
+              ceph::buffer::list* bl, std::unique_ptr<Op::Completion> c,
+              std::optional<std::string_view> ns,
+              std::optional<std::string_view> key,
+              uint64_t* objver);
+
+  void execute(const Object& o, std::int64_t pool, WriteOp&& op,
+              std::unique_ptr<Op::Completion> c,
+              std::optional<std::string_view> ns,
+              std::optional<std::string_view> key,
+              uint64_t* objver);
+
+  void lookup_pool(std::string_view name, std::unique_ptr<LookupPoolComp> c);
+  void list_pools(std::unique_ptr<LSPoolsComp> c);
+  void create_pool_snap(int64_t pool, std::string_view snapName,
+                       std::unique_ptr<SimpleOpComp> c);
+  void allocate_selfmanaged_snap(int64_t pool, std::unique_ptr<SMSnapComp> c);
+  void delete_pool_snap(int64_t pool, std::string_view snapName,
+                       std::unique_ptr<SimpleOpComp> c);
+  void delete_selfmanaged_snap(int64_t pool, std::uint64_t snap,
+                              std::unique_ptr<SimpleOpComp> c);
+  void create_pool(std::string_view name, std::optional<int> crush_rule,
+                  std::unique_ptr<SimpleOpComp> c);
+  void delete_pool(std::string_view name,
+                  std::unique_ptr<SimpleOpComp> c);
+  void delete_pool(int64_t pool,
+                  std::unique_ptr<SimpleOpComp> c);
+  void stat_pools(const std::vector<std::string>& pools,
+                 std::unique_ptr<PoolStatComp> c);
+  void stat_fs(std::optional<std::int64_t> pool,
+              std::unique_ptr<StatFSComp> c);
+
+  void watch(const Object& o, const IOContext& ioc,
+            std::optional<std::chrono::seconds> timeout,
+            WatchCB&& cb, std::unique_ptr<WatchComp> c);
+  void watch(const Object& o, std::int64_t pool,
+            std::optional<std::chrono::seconds> timeout,
+            WatchCB&& cb, std::unique_ptr<WatchComp> c,
+            std::optional<std::string_view> ns,
+            std::optional<std::string_view> key);
+  tl::expected<ceph::timespan, boost::system::error_code>
+  watch_check(uint64_t cookie);
+  void notify_ack(const Object& o,
+                 const IOContext& _ioc,
+                 uint64_t notify_id,
+                 uint64_t cookie,
+                 ceph::buffer::list&& bl,
+                 std::unique_ptr<SimpleOpComp>);
+  void notify_ack(const Object& o,
+                 std::int64_t pool,
+                 uint64_t notify_id,
+                 uint64_t cookie,
+                 ceph::buffer::list&& bl,
+                 std::unique_ptr<SimpleOpComp>,
+                 std::optional<std::string_view> ns,
+                 std::optional<std::string_view> key);
+  void unwatch(uint64_t cookie, const IOContext& ioc,
+              std::unique_ptr<SimpleOpComp>);
+  void unwatch(uint64_t cookie, std::int64_t pool,
+              std::unique_ptr<SimpleOpComp>,
+              std::optional<std::string_view> ns,
+              std::optional<std::string_view> key);
+  void notify(const Object& oid, const IOContext& ioctx,
+             ceph::buffer::list&& bl,
+             std::optional<std::chrono::milliseconds> timeout,
+             std::unique_ptr<NotifyComp> c);
+  void notify(const Object& oid, std::int64_t pool,
+             ceph::buffer::list&& bl,
+             std::optional<std::chrono::milliseconds> timeout,
+             std::unique_ptr<NotifyComp> c,
+             std::optional<std::string_view> ns,
+             std::optional<std::string_view> key);
+  void flush_watch(std::unique_ptr<VoidOpComp>);
+
+  void enumerate_objects(const IOContext& ioc, const Cursor& begin,
+                        const Cursor& end, const std::uint32_t max,
+                        const ceph::buffer::list& filter,
+                        std::vector<Entry>* ls,
+                        Cursor* cursor,
+                        std::unique_ptr<SimpleOpComp> c);
+  void enumerate_objects(std::int64_t pool, const Cursor& begin,
+                        const Cursor& end, const std::uint32_t max,
+                        const ceph::buffer::list& filter,
+                        std::vector<Entry>* ls,
+                        Cursor* cursor,
+                        std::unique_ptr<SimpleOpComp> c,
+                        std::optional<std::string_view> ns,
+                        std::optional<std::string_view> key);
+  void enumerate_objects(const IOContext& ioc, const Cursor& begin,
+                        const Cursor& end, const std::uint32_t max,
+                        const ceph::buffer::list& filter,
+                        std::unique_ptr<EnumerateComp> c);
+  void enumerate_objects(std::int64_t pool, const Cursor& begin,
+                        const Cursor& end, const std::uint32_t max,
+                        const ceph::buffer::list& filter,
+                        std::unique_ptr<EnumerateComp> c,
+                        std::optional<std::string_view> ns,
+                        std::optional<std::string_view> key);
+  void osd_command(int osd, std::vector<std::string>&& cmd,
+                  ceph::buffer::list&& in, std::unique_ptr<CommandComp> c);
+  void pg_command(PG pg, std::vector<std::string>&& cmd,
+                 ceph::buffer::list&& in, std::unique_ptr<CommandComp> c);
+
+  void mon_command(std::vector<std::string> command,
+                  const ceph::buffer::list& bl,
+                  std::string* outs, ceph::buffer::list* outbl,
+                  std::unique_ptr<SimpleOpComp> c);
+
+  void enable_application(std::string_view pool, std::string_view app_name,
+                         bool force, std::unique_ptr<SimpleOpComp> c);
+
+
+  // Since detail::RADOS has immovable things inside it, hold a
+  // unique_ptr to it so we can be moved.
+  std::unique_ptr<detail::RADOS> impl;
+};
+
+enum class errc {
+  pool_dne = 1,
+  invalid_snapcontext
+};
+
+const boost::system::error_category& error_category() noexcept;
+}
+
+namespace boost::system {
+template<>
+struct is_error_code_enum<::neorados::errc> {
+  static const bool value = true;
+};
+
+template<>
+struct is_error_condition_enum<::neorados::errc> {
+  static const bool value = false;
+};
+}
+
+namespace neorados {
+//  explicit conversion:
+inline boost::system::error_code make_error_code(errc e) noexcept {
+  return { static_cast<int>(e), error_category() };
+}
+
+// implicit conversion:
+inline boost::system::error_condition make_error_condition(errc e) noexcept {
+  return { static_cast<int>(e), error_category() };
+}
+}
+
+namespace std {
+template<>
+struct hash<neorados::Object> {
+  size_t operator ()(const neorados::Object& r) const;
+};
+template<>
+struct hash<neorados::IOContext> {
+  size_t operator ()(const neorados::IOContext& r) const;
+};
+} // namespace std
+
+#endif // NEORADOS_RADOS_HPP
diff --git a/src/include/neorados/RADOS_Decodable.hpp b/src/include/neorados/RADOS_Decodable.hpp
new file mode 100644 (file)
index 0000000..9654a84
--- /dev/null
@@ -0,0 +1,107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat <contact@redhat.com>
+ * Author: Adam C. Emerson
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef NEORADOS_RADOS_DECODABLE_HPP
+#define NEORADOS_RADOS_DECODABLE_HPP
+
+#include <cstdint>
+#include <cstdlib>
+#include <string>
+#include <iostream>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+namespace neorados {
+struct Entry {
+  std::string nspace;
+  std::string oid;
+  std::string locator;
+
+  Entry() {}
+  Entry(std::string nspace, std::string oid, std::string locator) :
+    nspace(std::move(nspace)), oid(std::move(oid)), locator(locator) {}
+};
+inline bool operator ==(const Entry& l, const Entry r) {
+  return std::tie(l.nspace, l.oid, l.locator) ==
+    std::tie(r.nspace, r.oid, r.locator);
+}
+inline bool operator !=(const Entry& l, const Entry r) {
+  return std::tie(l.nspace, l.oid, l.locator) !=
+    std::tie(r.nspace, r.oid, r.locator);
+}
+inline bool operator <(const Entry& l, const Entry r) {
+  return std::tie(l.nspace, l.oid, l.locator) <
+    std::tie(r.nspace, r.oid, r.locator);
+}
+inline bool operator <=(const Entry& l, const Entry r) {
+  return std::tie(l.nspace, l.oid, l.locator) <=
+    std::tie(r.nspace, r.oid, r.locator);
+}
+inline bool operator >=(const Entry& l, const Entry r) {
+  return std::tie(l.nspace, l.oid, l.locator) >=
+    std::tie(r.nspace, r.oid, r.locator);
+}
+inline bool operator >(const Entry& l, const Entry r) {
+  return std::tie(l.nspace, l.oid, l.locator) >
+    std::tie(r.nspace, r.oid, r.locator);
+}
+
+inline std::ostream& operator <<(std::ostream& out, const Entry& entry) {
+  if (!entry.nspace.empty())
+    out << entry.nspace << '/';
+  out << entry.oid;
+  if (!entry.locator.empty())
+    out << '@' << entry.locator;
+  return out;
+}
+
+struct CloneInfo {
+  uint64_t cloneid = 0;
+  std::vector<uint64_t> snaps; // ascending
+  std::vector<std::pair<uint64_t, uint64_t>> overlap;// with next newest
+  uint64_t size = 0;
+  CloneInfo() = default;
+};
+
+struct SnapSet {
+  std::vector<CloneInfo> clones; // ascending
+  std::uint64_t seq = 0;   // newest snapid seen by the object
+  SnapSet() = default;
+};
+
+struct ObjWatcher {
+  /// Address of the Watcher
+  std::string addr;
+  /// Watcher ID
+  std::int64_t watcher_id;
+  /// Cookie
+  std::uint64_t cookie;
+  /// Timeout in Seconds
+  std::uint32_t timeout_seconds;
+};
+}
+
+namespace std {
+template<>
+struct hash<::neorados::Entry> {
+  std::size_t operator ()(::neorados::Entry e) const {
+    hash<std::string> h;
+    return (h(e.nspace) << 2) ^ (h(e.oid) << 1) ^ h(e.locator);
+  }
+};
+}
+
+#endif // RADOS_DECODABLE_HPP
diff --git a/src/include/neorados/buffer_fwd.h b/src/include/neorados/buffer_fwd.h
new file mode 120000 (symlink)
index 0000000..bd1f6f1
--- /dev/null
@@ -0,0 +1 @@
+../buffer_fwd.h
\ No newline at end of file
diff --git a/src/include/neorados/completion.h b/src/include/neorados/completion.h
new file mode 120000 (symlink)
index 0000000..100678f
--- /dev/null
@@ -0,0 +1 @@
+../../common/async/completion.h
\ No newline at end of file
index b0f21bc455806804bcf50e51873f8aa88929abc2..96951e74de79b881028348d86c05b72d747d15ef 100644 (file)
 
 #include <cstdint>
 #include <cstdio>
-#include <iosfwd>
 #include <iomanip>
+#include <iosfwd>
 #include <string>
-
+#include <string>
+#include <string_view>
 
 #include "include/rados.h"
 #include "include/unordered_map.h"
@@ -40,6 +41,8 @@ struct object_t {
   object_t(const char *s) : name(s) {}
   // cppcheck-suppress noExplicitConstructor
   object_t(const std::string& s) : name(s) {}
+  object_t(std::string&& s) : name(std::move(s)) {}
+  object_t(std::string_view s) : name(s) {}
 
   void swap(object_t& o) {
     name.swap(o.name);
index bf823cc57a27ce253bd8cd15fa867f549d555555..7fb13841df4230871fde8c0550d73de7a6ee0c11 100644 (file)
@@ -144,7 +144,7 @@ ConfigMap::generate_entity_map(
   vector<pair<string,Section*>> sections = { make_pair("global", &global) };
   auto p = by_type.find(name.get_type_name());
   if (p != by_type.end()) {
-    sections.push_back(make_pair(name.get_type_name(), &p->second));
+    sections.emplace_back(name.get_type_name(), &p->second);
   }
   vector<std::string> name_bits;
   boost::split(name_bits, name.to_str(), [](char c){ return c == '.'; });
index bac00955f9a9a1906c8be81693c7b1a7b05c92cb..71b725a40af0439ac29fca67621d134e16615026 100644 (file)
@@ -97,8 +97,8 @@ struct Section {
 
 struct ConfigMap {
   Section global;
-  std::map<std::string,Section> by_type;
-  std::map<std::string,Section> by_id;
+  std::map<std::string,Section, std::less<>> by_type;
+  std::map<std::string,Section, std::less<>> by_id;
 
   Section *find_section(const std::string& name) {
     if (name == "global") {
index ea7103f26096a71a321c83e5a7db3770f5ab6478..7957683175ee75a4bae6686dd77e73bd6bdb0022 100644 (file)
@@ -9,12 +9,15 @@
 #include "common/async/context_pool.h"
 #include "common/ceph_context.h"
 #include "common/ceph_argparse.h"
-#include "global/global_init.h"
 #include "common/config.h"
+#include "global/global_init.h"
+
 #include "auth/KeyRing.h"
-#include "mount.ceph.h"
 #include "mon/MonClient.h"
 
+#include "mount.ceph.h"
+
+
 extern "C" void mount_ceph_get_config_info(const char *config_file,
                                           const char *name,
                                           struct ceph_config_info *cci)
diff --git a/src/neorados/CMakeLists.txt b/src/neorados/CMakeLists.txt
new file mode 100644 (file)
index 0000000..3e56edf
--- /dev/null
@@ -0,0 +1,37 @@
+add_library(neorados_objs OBJECT
+  RADOSImpl.cc)
+add_library(neorados_api_obj OBJECT
+  RADOS.cc)
+
+add_library(libneorados STATIC
+  $<TARGET_OBJECTS:neorados_api_obj>
+  $<TARGET_OBJECTS:neorados_objs>)
+target_link_libraries(libneorados PRIVATE
+  osdc ceph-common cls_lock_client fmt::fmt
+  ${BLKID_LIBRARIES} ${CRYPTO_LIBS} ${EXTRALIBS})
+
+# if(ENABLE_SHARED)
+#   add_library(libneorados ${CEPH_SHARED}
+#     $<TARGET_OBJECTS:neorados_api_obj>
+#     $<TARGET_OBJECTS:neorados_objs>
+#     $<TARGET_OBJECTS:common_buffer_obj>)
+#   set_target_properties(libneorados PROPERTIES
+#     OUTPUT_NAME RADOS
+#     VERSION 0.0.1
+#     SOVERSION 1
+#     CXX_VISIBILITY_PRESET hidden
+#     VISIBILITY_INLINES_HIDDEN ON)
+#   if(NOT APPLE)
+#     set_property(TARGET libneorados APPEND_STRING PROPERTY
+#       LINK_FLAGS " -Wl,--exclude-libs,ALL")
+#   endif()
+# else(ENABLE_SHARED)
+#   add_library(libneorados STATIC
+#     $<TARGET_OBJECTS:neorados_api_obj>
+#     $<TARGET_OBJECTS:neorados_objs>)
+# endif(ENABLE_SHARED)
+# target_link_libraries(libneorados PRIVATE
+#   osdc ceph-common cls_lock_client
+#   ${BLKID_LIBRARIES} ${CRYPTO_LIBS} ${EXTRALIBS})
+# target_link_libraries(libneorados ${rados_libs})
+# install(TARGETS libneorados DESTINATION ${CMAKE_INSTALL_LIBDIR})
diff --git a/src/neorados/RADOS.cc b/src/neorados/RADOS.cc
new file mode 100644 (file)
index 0000000..bfc09ae
--- /dev/null
@@ -0,0 +1,1658 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat <contact@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#define BOOST_BIND_NO_PLACEHOLDERS
+
+#include <optional>
+#include <string_view>
+
+#include <boost/intrusive_ptr.hpp>
+
+#include <fmt/format.h>
+
+#include "include/ceph_fs.h"
+
+#include "common/ceph_context.h"
+#include "common/ceph_argparse.h"
+#include "common/common_init.h"
+#include "common/hobject.h"
+
+#include "global/global_init.h"
+
+#include "osd/osd_types.h"
+#include "osdc/error_code.h"
+
+#include "neorados/RADOSImpl.h"
+#include "include/neorados/RADOS.hpp"
+
+namespace bc = boost::container;
+namespace bs = boost::system;
+namespace ca = ceph::async;
+namespace cb = ceph::buffer;
+
+namespace neorados {
+// Object
+
+Object::Object() {
+  static_assert(impl_size >= sizeof(object_t));
+  new (&impl) object_t();
+}
+
+Object::Object(const char* s) {
+  static_assert(impl_size >= sizeof(object_t));
+  new (&impl) object_t(s);
+}
+
+Object::Object(std::string_view s) {
+  static_assert(impl_size >= sizeof(object_t));
+  new (&impl) object_t(s);
+}
+
+Object::Object(std::string&& s) {
+  static_assert(impl_size >= sizeof(object_t));
+  new (&impl) object_t(std::move(s));
+}
+
+Object::Object(const std::string& s) {
+  static_assert(impl_size >= sizeof(object_t));
+  new (&impl) object_t(s);
+}
+
+Object::~Object() {
+  reinterpret_cast<object_t*>(&impl)->~object_t();
+}
+
+Object::Object(const Object& o) {
+  static_assert(impl_size >= sizeof(object_t));
+  new (&impl) object_t(*reinterpret_cast<const object_t*>(&o.impl));
+}
+Object& Object::operator =(const Object& o) {
+  *reinterpret_cast<object_t*>(&impl) =
+    *reinterpret_cast<const object_t*>(&o.impl);
+  return *this;
+}
+Object::Object(Object&& o) {
+  static_assert(impl_size >= sizeof(object_t));
+  new (&impl) object_t(std::move(*reinterpret_cast<object_t*>(&o.impl)));
+}
+Object& Object::operator =(Object&& o) {
+  *reinterpret_cast<object_t*>(&impl) =
+    std::move(*reinterpret_cast<object_t*>(&o.impl));
+  return *this;
+}
+
+Object::operator std::string_view() const {
+  return std::string_view(reinterpret_cast<const object_t*>(&impl)->name);
+}
+
+bool operator <(const Object& lhs, const Object& rhs) {
+  return (*reinterpret_cast<const object_t*>(&lhs.impl) <
+         *reinterpret_cast<const object_t*>(&rhs.impl));
+}
+bool operator <=(const Object& lhs, const Object& rhs) {
+  return (*reinterpret_cast<const object_t*>(&lhs.impl) <=
+         *reinterpret_cast<const object_t*>(&rhs.impl));
+}
+bool operator >=(const Object& lhs, const Object& rhs) {
+  return (*reinterpret_cast<const object_t*>(&lhs.impl) >=
+         *reinterpret_cast<const object_t*>(&rhs.impl));
+}
+bool operator >(const Object& lhs, const Object& rhs) {
+  return (*reinterpret_cast<const object_t*>(&lhs.impl) >
+         *reinterpret_cast<const object_t*>(&rhs.impl));
+}
+
+bool operator ==(const Object& lhs, const Object& rhs) {
+  return (*reinterpret_cast<const object_t*>(&lhs.impl) ==
+         *reinterpret_cast<const object_t*>(&rhs.impl));
+}
+bool operator !=(const Object& lhs, const Object& rhs) {
+  return (*reinterpret_cast<const object_t*>(&lhs.impl) !=
+         *reinterpret_cast<const object_t*>(&rhs.impl));
+}
+
+std::ostream& operator <<(std::ostream& m, const Object& o) {
+  return (m << *reinterpret_cast<const object_t*>(&o.impl));
+}
+
+// IOContext
+
+struct IOContextImpl {
+  object_locator_t oloc;
+  snapid_t snap_seq = CEPH_NOSNAP;
+  SnapContext snapc;
+};
+
+IOContext::IOContext() {
+  static_assert(impl_size >= sizeof(IOContextImpl));
+  new (&impl) IOContextImpl();
+}
+
+IOContext::IOContext(std::int64_t _pool) : IOContext() {
+  pool(_pool);
+}
+
+IOContext::IOContext(std::int64_t _pool, std::string_view _ns)
+  : IOContext() {
+  pool(_pool);
+  ns(_ns);
+}
+
+IOContext::IOContext(std::int64_t _pool, std::string&& _ns)
+  : IOContext() {
+  pool(_pool);
+  ns(std::move(_ns));
+}
+
+IOContext::~IOContext() {
+  reinterpret_cast<IOContextImpl*>(&impl)->~IOContextImpl();
+}
+
+IOContext::IOContext(const IOContext& rhs) {
+  static_assert(impl_size >= sizeof(IOContextImpl));
+  new (&impl) IOContextImpl(*reinterpret_cast<const IOContextImpl*>(&rhs.impl));
+}
+
+IOContext& IOContext::operator =(const IOContext& rhs) {
+  *reinterpret_cast<IOContextImpl*>(&impl) =
+    *reinterpret_cast<const IOContextImpl*>(&rhs.impl);
+  return *this;
+}
+
+IOContext::IOContext(IOContext&& rhs) {
+  static_assert(impl_size >= sizeof(IOContextImpl));
+  new (&impl) IOContextImpl(
+    std::move(*reinterpret_cast<IOContextImpl*>(&rhs.impl)));
+}
+
+IOContext& IOContext::operator =(IOContext&& rhs) {
+  *reinterpret_cast<IOContextImpl*>(&impl) =
+    std::move(*reinterpret_cast<IOContextImpl*>(&rhs.impl));
+  return *this;
+}
+
+std::int64_t IOContext::pool() const {
+  return reinterpret_cast<const IOContextImpl*>(&impl)->oloc.pool;
+}
+
+void IOContext::pool(std::int64_t _pool) {
+  reinterpret_cast<IOContextImpl*>(&impl)->oloc.pool = _pool;
+}
+
+std::string_view IOContext::ns() const {
+  return reinterpret_cast<const IOContextImpl*>(&impl)->oloc.nspace;
+}
+
+void IOContext::ns(std::string_view _ns) {
+  reinterpret_cast<IOContextImpl*>(&impl)->oloc.nspace = _ns;
+}
+
+void IOContext::ns(std::string&& _ns) {
+  reinterpret_cast<IOContextImpl*>(&impl)->oloc.nspace = std::move(_ns);
+}
+
+std::optional<std::string_view> IOContext::key() const {
+  auto& oloc = reinterpret_cast<const IOContextImpl*>(&impl)->oloc;
+  if (oloc.key.empty())
+    return std::nullopt;
+  else
+    return std::string_view(oloc.key);
+}
+
+void IOContext::key(std::string_view _key) {
+  auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc;
+  oloc.hash = -1;
+  oloc.key = _key;
+}
+
+void IOContext::key(std::string&&_key) {
+  auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc;
+  oloc.hash = -1;
+  oloc.key = std::move(_key);
+}
+
+void IOContext::clear_key() {
+  auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc;
+  oloc.hash = -1;
+  oloc.key.clear();
+}
+
+std::optional<std::int64_t> IOContext::hash() const {
+  auto& oloc = reinterpret_cast<const IOContextImpl*>(&impl)->oloc;
+  if (oloc.hash < 0)
+    return std::nullopt;
+  else
+    return oloc.hash;
+}
+
+void IOContext::hash(std::int64_t _hash) {
+  auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc;
+  oloc.hash = _hash;
+  oloc.key.clear();
+}
+
+void IOContext::clear_hash() {
+  auto& oloc = reinterpret_cast<IOContextImpl*>(&impl)->oloc;
+  oloc.hash = -1;
+  oloc.key.clear();
+}
+
+
+std::optional<std::uint64_t> IOContext::read_snap() const {
+  auto& snap_seq = reinterpret_cast<const IOContextImpl*>(&impl)->snap_seq;
+  if (snap_seq == CEPH_NOSNAP)
+    return std::nullopt;
+  else
+    return snap_seq;
+}
+void IOContext::read_snap(std::optional<std::uint64_t> _snapid) {
+  auto& snap_seq = reinterpret_cast<IOContextImpl*>(&impl)->snap_seq;
+  snap_seq = _snapid.value_or(CEPH_NOSNAP);
+}
+
+std::optional<
+  std::pair<std::uint64_t,
+           std::vector<std::uint64_t>>> IOContext::write_snap_context() const {
+  auto& snapc = reinterpret_cast<const IOContextImpl*>(&impl)->snapc;
+  if (snapc.empty()) {
+    return std::nullopt;
+  } else {
+    std::vector<uint64_t> v(snapc.snaps.begin(), snapc.snaps.end());
+    return std::make_optional(std::make_pair(uint64_t(snapc.seq), v));
+  }
+}
+
+void IOContext::write_snap_context(
+  std::optional<std::pair<std::uint64_t, std::vector<std::uint64_t>>> _snapc) {
+  auto& snapc = reinterpret_cast<IOContextImpl*>(&impl)->snapc;
+  if (!_snapc) {
+    snapc.clear();
+  } else {
+    SnapContext n(_snapc->first, { _snapc->second.begin(), _snapc->second.end()});
+    if (!n.is_valid()) {
+      throw bs::system_error(EINVAL,
+                            bs::system_category(),
+                            "Invalid snap context.");
+
+    } else {
+      snapc = n;
+    }
+  }
+}
+
+bool operator <(const IOContext& lhs, const IOContext& rhs) {
+  const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl);
+  const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl);
+
+  return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) <
+         std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key));
+}
+
+bool operator <=(const IOContext& lhs, const IOContext& rhs) {
+  const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl);
+  const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl);
+
+  return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) <=
+         std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key));
+}
+
+bool operator >=(const IOContext& lhs, const IOContext& rhs) {
+  const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl);
+  const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl);
+
+  return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) >=
+         std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key));
+}
+
+bool operator >(const IOContext& lhs, const IOContext& rhs) {
+  const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl);
+  const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl);
+
+  return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) >
+         std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key));
+}
+
+bool operator ==(const IOContext& lhs, const IOContext& rhs) {
+  const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl);
+  const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl);
+
+  return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) ==
+         std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key));
+}
+
+bool operator !=(const IOContext& lhs, const IOContext& rhs) {
+  const auto l = reinterpret_cast<const IOContextImpl*>(&lhs.impl);
+  const auto r = reinterpret_cast<const IOContextImpl*>(&rhs.impl);
+
+  return (std::tie(l->oloc.pool, l->oloc.nspace, l->oloc.key) !=
+         std::tie(r->oloc.pool, r->oloc.nspace, r->oloc.key));
+}
+
+std::ostream& operator <<(std::ostream& m, const IOContext& o) {
+  const auto l = reinterpret_cast<const IOContextImpl*>(&o.impl);
+  return (m << l->oloc.pool << ":" << l->oloc.nspace << ":" << l->oloc.key);
+}
+
+
+// Op
+
+struct OpImpl {
+  ObjectOperation op;
+  std::optional<ceph::real_time> mtime;
+
+  OpImpl() = default;
+
+  OpImpl(const OpImpl& rhs) = delete;
+  OpImpl(OpImpl&& rhs) = default;
+
+  OpImpl& operator =(const OpImpl& rhs) = delete;
+  OpImpl& operator =(OpImpl&& rhs) = default;
+};
+
+Op::Op() {
+  static_assert(Op::impl_size >= sizeof(OpImpl));
+  new (&impl) OpImpl;
+}
+
+Op::Op(Op&& rhs) {
+  new (&impl) OpImpl(std::move(*reinterpret_cast<OpImpl*>(&rhs.impl)));
+}
+Op& Op::operator =(Op&& rhs) {
+  reinterpret_cast<OpImpl*>(&impl)->~OpImpl();
+  new (&impl) OpImpl(std::move(*reinterpret_cast<OpImpl*>(&rhs.impl)));
+  return *this;
+}
+Op::~Op() {
+  reinterpret_cast<OpImpl*>(&impl)->~OpImpl();
+}
+
+void Op::set_excl() {
+  reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(CEPH_OSD_OP_FLAG_EXCL);
+}
+void Op::set_failok() {
+  reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(
+    CEPH_OSD_OP_FLAG_FAILOK);
+}
+void Op::set_fadvise_random() {
+  reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(
+    CEPH_OSD_OP_FLAG_FADVISE_RANDOM);
+}
+void Op::set_fadvise_sequential() {
+  reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(
+    CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+}
+void Op::set_fadvise_willneed() {
+  reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(
+    CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+}
+void Op::set_fadvise_dontneed() {
+  reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(
+    CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+}
+void Op::set_fadvise_nocache() {
+  reinterpret_cast<OpImpl*>(&impl)->op.set_last_op_flags(
+    CEPH_OSD_OP_FLAG_FADVISE_NOCACHE);
+}
+
+void Op::cmpext(uint64_t off, bufferlist&& cmp_bl, std::size_t* s) {
+  reinterpret_cast<OpImpl*>(&impl)->op.cmpext(off, std::move(cmp_bl), nullptr,
+                                             s);
+}
+void Op::cmpxattr(std::string_view name, cmpxattr_op op, const bufferlist& val) {
+  reinterpret_cast<OpImpl*>(&impl)->
+    op.cmpxattr(name, std::uint8_t(op), CEPH_OSD_CMPXATTR_MODE_STRING, val);
+}
+void Op::cmpxattr(std::string_view name, cmpxattr_op op, std::uint64_t val) {
+  bufferlist bl;
+  encode(val, bl);
+  reinterpret_cast<OpImpl*>(&impl)->
+    op.cmpxattr(name, std::uint8_t(op), CEPH_OSD_CMPXATTR_MODE_U64, bl);
+}
+
+void Op::assert_version(uint64_t ver) {
+  reinterpret_cast<OpImpl*>(&impl)->op.assert_version(ver);
+}
+void Op::assert_exists() {
+  reinterpret_cast<OpImpl*>(&impl)->op.stat(
+    nullptr,
+    static_cast<ceph::real_time*>(nullptr),
+    static_cast<bs::error_code*>(nullptr));
+}
+void Op::cmp_omap(const bc::flat_map<
+                 std::string, std::pair<cb::list,
+                 int>>& assertions) {
+  reinterpret_cast<OpImpl*>(&impl)->op.omap_cmp(assertions, nullptr);
+}
+
+void Op::exec(std::string_view cls, std::string_view method,
+             const bufferlist& inbl,
+             cb::list* out,
+             bs::error_code* ec) {
+  reinterpret_cast<OpImpl*>(&impl)->op.call(cls, method, inbl, ec, out);
+}
+
+void Op::exec(std::string_view cls, std::string_view method,
+             const bufferlist& inbl,
+             fu2::unique_function<void(bs::error_code,
+                                       const cb::list&) &&> f) {
+  reinterpret_cast<OpImpl*>(&impl)->op.call(cls, method, inbl, std::move(f));
+}
+
+void Op::exec(std::string_view cls, std::string_view method,
+             const bufferlist& inbl,
+             fu2::unique_function<void(bs::error_code, int,
+                                       const cb::list&) &&> f) {
+  reinterpret_cast<OpImpl*>(&impl)->op.call(cls, method, inbl, std::move(f));
+}
+
+void Op::exec(std::string_view cls, std::string_view method,
+             const bufferlist& inbl, bs::error_code* ec) {
+  reinterpret_cast<OpImpl*>(&impl)->op.call(cls, method, inbl, ec);
+}
+
+void Op::balance_reads() {
+  reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_BALANCE_READS;
+}
+void Op::localize_reads() {
+  reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_LOCALIZE_READS;
+}
+void Op::order_reads_writes() {
+  reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_RWORDERED;
+}
+void Op::ignore_cache() {
+  reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_IGNORE_CACHE;
+}
+void Op::skiprwlocks() {
+  reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_SKIPRWLOCKS;
+}
+void Op::ignore_overlay() {
+  reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
+}
+void Op::full_try() {
+  reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_FULL_TRY;
+}
+void Op::full_force() {
+  reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_FULL_FORCE;
+}
+void Op::ignore_redirect() {
+  reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_IGNORE_REDIRECT;
+}
+void Op::ordersnap() {
+  reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_ORDERSNAP;
+}
+void Op::returnvec() {
+  reinterpret_cast<OpImpl*>(&impl)->op.flags |= CEPH_OSD_FLAG_RETURNVEC;
+}
+
+std::size_t Op::size() const {
+  return reinterpret_cast<const OpImpl*>(&impl)->op.size();
+}
+
+std::ostream& operator <<(std::ostream& m, const Op& o) {
+  return m << reinterpret_cast<const OpImpl*>(&o.impl)->op;
+}
+
+
+// ---
+
+// ReadOp / WriteOp
+
+void ReadOp::read(size_t off, uint64_t len, cb::list* out,
+                 bs::error_code* ec) {
+  reinterpret_cast<OpImpl*>(&impl)->op.read(off, len, ec, out);
+}
+
+void ReadOp::get_xattr(std::string_view name, cb::list* out,
+                      bs::error_code* ec) {
+  reinterpret_cast<OpImpl*>(&impl)->op.getxattr(name, ec, out);
+}
+
+void ReadOp::get_omap_header(cb::list* out,
+                            bs::error_code* ec) {
+  reinterpret_cast<OpImpl*>(&impl)->op.omap_get_header(ec, out);
+}
+
+void ReadOp::sparse_read(uint64_t off, uint64_t len, cb::list* out,
+                        std::vector<std::pair<std::uint64_t,
+                        std::uint64_t>>* extents,
+                        bs::error_code* ec) {
+  reinterpret_cast<OpImpl*>(&impl)->op.sparse_read(off, len, ec, extents, out);
+}
+
+void ReadOp::stat(std::uint64_t* size, ceph::real_time* mtime,
+                 bs::error_code* ec) {
+  reinterpret_cast<OpImpl*>(&impl)->op.stat(size, mtime, ec);
+}
+
+void ReadOp::get_omap_keys(std::optional<std::string_view> start_after,
+                          std::uint64_t max_return,
+                          bc::flat_set<std::string>* keys,
+                          bool* done,
+                          bs::error_code* ec) {
+  reinterpret_cast<OpImpl*>(&impl)->op.omap_get_keys(start_after, max_return,
+                                                    ec, keys, done);
+}
+
+void ReadOp::get_xattrs(bc::flat_map<std::string,
+                       cb::list>* kv,
+                       bs::error_code* ec) {
+  reinterpret_cast<OpImpl*>(&impl)->op.getxattrs(ec, kv);
+}
+
+void ReadOp::get_omap_vals(std::optional<std::string_view> start_after,
+                          std::optional<std::string_view> filter_prefix,
+                          uint64_t max_return,
+                          bc::flat_map<std::string,
+                          cb::list>* kv,
+                          bool* done,
+                          bs::error_code* ec) {
+  reinterpret_cast<OpImpl*>(&impl)->op.omap_get_vals(start_after, filter_prefix,
+                                                    max_return, ec, kv, done);
+}
+
+void ReadOp::get_omap_vals_by_keys(
+  const bc::flat_set<std::string>& keys,
+  bc::flat_map<std::string, cb::list>* kv,
+  bs::error_code* ec) {
+  reinterpret_cast<OpImpl*>(&impl)->op.omap_get_vals_by_keys(keys, ec, kv);
+}
+
+void ReadOp::list_watchers(std::vector<ObjWatcher>* watchers,
+                          bs::error_code* ec) {
+  reinterpret_cast<OpImpl*>(&impl)-> op.list_watchers(watchers, ec);
+}
+
+void ReadOp::list_snaps(SnapSet* snaps,
+                       bs::error_code* ec) {
+  reinterpret_cast<OpImpl*>(&impl)->op.list_snaps(snaps, nullptr, ec);
+}
+
+// WriteOp
+
+void WriteOp::set_mtime(ceph::real_time t) {
+  auto o = reinterpret_cast<OpImpl*>(&impl);
+  o->mtime = t;
+}
+
+void WriteOp::create(bool exclusive) {
+  reinterpret_cast<OpImpl*>(&impl)->op.create(exclusive);
+}
+
+void WriteOp::write(uint64_t off, bufferlist&& bl) {
+  reinterpret_cast<OpImpl*>(&impl)->op.write(off, bl);
+}
+
+void WriteOp::write_full(bufferlist&& bl) {
+  reinterpret_cast<OpImpl*>(&impl)->op.write_full(bl);
+}
+
+void WriteOp::writesame(uint64_t off, uint64_t write_len, bufferlist&& bl) {
+  reinterpret_cast<OpImpl*>(&impl)->op.writesame(off, write_len, bl);
+}
+
+void WriteOp::append(bufferlist&& bl) {
+  reinterpret_cast<OpImpl*>(&impl)->op.append(bl);
+}
+
+void WriteOp::remove() {
+  reinterpret_cast<OpImpl*>(&impl)->op.remove();
+}
+
+void WriteOp::truncate(uint64_t off) {
+  reinterpret_cast<OpImpl*>(&impl)->op.truncate(off);
+}
+
+void WriteOp::zero(uint64_t off, uint64_t len) {
+  reinterpret_cast<OpImpl*>(&impl)->op.zero(off, len);
+}
+
+void WriteOp::rmxattr(std::string_view name) {
+  reinterpret_cast<OpImpl*>(&impl)->op.rmxattr(name);
+}
+
+void WriteOp::setxattr(std::string_view name,
+                       bufferlist&& bl) {
+  reinterpret_cast<OpImpl*>(&impl)->op.setxattr(name, bl);
+}
+
+void WriteOp::rollback(uint64_t snapid) {
+  reinterpret_cast<OpImpl*>(&impl)->op.rollback(snapid);
+}
+
+void WriteOp::set_omap(
+  const bc::flat_map<std::string, cb::list>& map) {
+  reinterpret_cast<OpImpl*>(&impl)->op.omap_set(map);
+}
+
+void WriteOp::set_omap_header(bufferlist&& bl) {
+  reinterpret_cast<OpImpl*>(&impl)->op.omap_set_header(bl);
+}
+
+void WriteOp::clear_omap() {
+  reinterpret_cast<OpImpl*>(&impl)->op.omap_clear();
+}
+
+void WriteOp::rm_omap_keys(
+  const bc::flat_set<std::string>& to_rm) {
+  reinterpret_cast<OpImpl*>(&impl)->op.omap_rm_keys(to_rm);
+}
+
+void WriteOp::set_alloc_hint(uint64_t expected_object_size,
+                            uint64_t expected_write_size,
+                            alloc_hint::alloc_hint_t flags) {
+  using namespace alloc_hint;
+  static_assert(sequential_write ==
+               static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE));
+  static_assert(random_write ==
+               static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_WRITE));
+  static_assert(sequential_read ==
+               static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_READ));
+  static_assert(random_read ==
+               static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_READ));
+  static_assert(append_only ==
+               static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY));
+  static_assert(immutable ==
+               static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_IMMUTABLE));
+  static_assert(shortlived ==
+               static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_SHORTLIVED));
+  static_assert(longlived ==
+               static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_LONGLIVED));
+  static_assert(compressible ==
+               static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE));
+  static_assert(incompressible ==
+               static_cast<int>(CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE));
+
+  reinterpret_cast<OpImpl*>(&impl)->op.set_alloc_hint(expected_object_size,
+                                                     expected_write_size,
+                                                     flags);
+}
+
+// RADOS
+
+RADOS::Builder& RADOS::Builder::add_conf_file(std::string_view f) {
+  if (conf_files)
+    *conf_files += (", " + std::string(f));
+  else
+    conf_files = std::string(f);
+  return *this;
+}
+
+void RADOS::Builder::build(boost::asio::io_context& ioctx,
+                          std::unique_ptr<BuildComp> c) {
+  constexpr auto env = CODE_ENVIRONMENT_LIBRARY;
+  CephInitParameters ci(env);
+  if (name)
+    ci.name.set(CEPH_ENTITY_TYPE_CLIENT, *name);
+  else
+    ci.name.set(CEPH_ENTITY_TYPE_CLIENT, "admin");
+  uint32_t flags = 0;
+  if (no_default_conf)
+    flags |= CINIT_FLAG_NO_DEFAULT_CONFIG_FILE;
+  if (no_mon_conf)
+    flags |= CINIT_FLAG_NO_MON_CONFIG;
+
+  CephContext *cct = common_preinit(ci, env, flags);
+  if (cluster)
+    cct->_conf->cluster = *cluster;
+
+  if (no_mon_conf)
+    cct->_conf->no_mon_config = true;
+
+  // TODO: Come up with proper error codes here. Maybe augment the
+  // functions with a default bs::error_code* parameter to
+  // pass back.
+  {
+    std::ostringstream ss;
+    auto r = cct->_conf.parse_config_files(conf_files ? conf_files->data() : nullptr,
+                                          &ss, flags);
+    if (r < 0)
+      c->dispatch(std::move(c), ceph::to_error_code(r), RADOS{nullptr});
+  }
+
+  cct->_conf.parse_env(cct->get_module_type());
+
+  for (const auto& [n, v] : configs) {
+    std::stringstream ss;
+    auto r = cct->_conf.set_val(n, v, &ss);
+    if (r < 0)
+      c->dispatch(std::move(c), ceph::to_error_code(-EINVAL), RADOS{nullptr});
+  }
+
+  if (!no_mon_conf) {
+    MonClient mc_bootstrap(cct, ioctx);
+    // TODO This function should return an error code.
+    auto err = mc_bootstrap.get_monmap_and_config();
+    if (err < 0)
+      c->dispatch(std::move(c), ceph::to_error_code(err), RADOS{nullptr});
+  }
+  if (!cct->_log->is_started()) {
+    cct->_log->start();
+  }
+  common_init_finish(cct);
+
+  RADOS::make_with_cct(cct, ioctx, std::move(c));
+}
+
+void RADOS::make_with_cct(CephContext* cct,
+                         boost::asio::io_context& ioctx,
+                         std::unique_ptr<BuildComp> c) {
+  try {
+    auto r = new detail::RADOS(ioctx, cct);
+    r->objecter->wait_for_osd_map(
+      [c = std::move(c), r = std::unique_ptr<detail::RADOS>(r)]() mutable {
+       c->dispatch(std::move(c), bs::error_code{},
+                   RADOS{std::move(r)});
+      });
+  } catch (const bs::system_error& err) {
+    c->dispatch(std::move(c), err.code(), RADOS{nullptr});
+  }
+}
+
+
+RADOS::RADOS() = default;
+
+RADOS::RADOS(std::unique_ptr<detail::RADOS> impl)
+  : impl(std::move(impl)) {}
+
+RADOS::RADOS(RADOS&&) = default;
+RADOS& RADOS::operator =(RADOS&&) = default;
+
+RADOS::~RADOS() = default;
+
+RADOS::executor_type RADOS::get_executor() const {
+  return impl->ioctx.get_executor();
+}
+
+boost::asio::io_context& RADOS::get_io_context() {
+  return impl->ioctx;
+}
+
+void RADOS::execute(const Object& o, const IOContext& _ioc, ReadOp&& _op,
+                   cb::list* bl,
+                   std::unique_ptr<ReadOp::Completion> c, version_t* objver) {
+  auto oid = reinterpret_cast<const object_t*>(&o.impl);
+  auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+  auto op = reinterpret_cast<OpImpl*>(&_op.impl);
+  auto flags = 0; // Should be in Op.
+
+  impl->objecter->read(
+    *oid, ioc->oloc, std::move(op->op), ioc->snap_seq, bl, flags,
+    std::move(c), objver);
+}
+
+void RADOS::execute(const Object& o, const IOContext& _ioc, WriteOp&& _op,
+                   std::unique_ptr<WriteOp::Completion> c, version_t* objver) {
+  auto oid = reinterpret_cast<const object_t*>(&o.impl);
+  auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+  auto op = reinterpret_cast<OpImpl*>(&_op.impl);
+  auto flags = 0; // Should be in Op.
+  ceph::real_time mtime;
+  if (op->mtime)
+    mtime = *op->mtime;
+  else
+    mtime = ceph::real_clock::now();
+
+  impl->objecter->mutate(
+    *oid, ioc->oloc, std::move(op->op), ioc->snapc,
+    mtime, flags,
+    std::move(c), objver);
+}
+
+void RADOS::execute(const Object& o, std::int64_t pool, ReadOp&& _op,
+                   cb::list* bl,
+                   std::unique_ptr<ReadOp::Completion> c,
+                   std::optional<std::string_view> ns,
+                   std::optional<std::string_view> key,
+                   version_t* objver) {
+  auto oid = reinterpret_cast<const object_t*>(&o.impl);
+  auto op = reinterpret_cast<OpImpl*>(&_op.impl);
+  auto flags = 0; // Should be in Op.
+  object_locator_t oloc;
+  oloc.pool = pool;
+  if (ns)
+    oloc.nspace = *ns;
+  if (key)
+    oloc.key = *key;
+
+  impl->objecter->read(
+    *oid, oloc, std::move(op->op), CEPH_NOSNAP, bl, flags,
+    std::move(c), objver);
+}
+
+void RADOS::execute(const Object& o, std::int64_t pool, WriteOp&& _op,
+                   std::unique_ptr<WriteOp::Completion> c,
+                   std::optional<std::string_view> ns,
+                   std::optional<std::string_view> key,
+                   version_t* objver) {
+  auto oid = reinterpret_cast<const object_t*>(&o.impl);
+  auto op = reinterpret_cast<OpImpl*>(&_op.impl);
+  auto flags = 0; // Should be in Op.
+  object_locator_t oloc;
+  oloc.pool = pool;
+  if (ns)
+    oloc.nspace = *ns;
+  if (key)
+    oloc.key = *key;
+
+  ceph::real_time mtime;
+  if (op->mtime)
+    mtime = *op->mtime;
+  else
+    mtime = ceph::real_clock::now();
+
+  impl->objecter->mutate(
+    *oid, oloc, std::move(op->op), {},
+    mtime, flags,
+    std::move(c), objver);
+}
+
+boost::uuids::uuid RADOS::get_fsid() const noexcept {
+  return impl->monclient.get_fsid().uuid;
+}
+
+
+void RADOS::lookup_pool(std::string_view name,
+                       std::unique_ptr<LookupPoolComp> c)
+{
+  // I kind of want to make lookup_pg_pool return
+  // std::optional<int64_t> since it can only return one error code.
+  int64_t ret = impl->objecter->with_osdmap(
+    std::mem_fn(&OSDMap::lookup_pg_pool_name),
+    name);
+  if (ret < 0) {
+    impl->objecter->wait_for_latest_osdmap(
+      [name = std::string(name), c = std::move(c),
+       objecter = impl->objecter.get()]
+      (bs::error_code ec) mutable {
+       int64_t ret =
+         objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
+                               name);
+       if (ret < 0)
+         ca::dispatch(std::move(c), osdc_errc::pool_dne,
+                      std::int64_t(0));
+       else
+         ca::dispatch(std::move(c), bs::error_code{}, ret);
+      });
+  } else if (ret < 0) {
+    ca::dispatch(std::move(c), osdc_errc::pool_dne,
+                std::int64_t(0));
+  } else {
+    ca::dispatch(std::move(c), bs::error_code{}, ret);
+  }
+}
+
+
+std::optional<uint64_t> RADOS::get_pool_alignment(int64_t pool_id)
+{
+  return impl->objecter->with_osdmap(
+    [pool_id](const OSDMap &o) -> std::optional<uint64_t> {
+      if (!o.have_pg_pool(pool_id)) {
+       throw bs::system_error(
+         ENOENT, bs::system_category(),
+         "Cannot find pool in OSDMap.");
+      } else if (o.get_pg_pool(pool_id)->requires_aligned_append()) {
+       return o.get_pg_pool(pool_id)->required_alignment();
+      } else {
+       return std::nullopt;
+      }
+    });
+}
+
+void RADOS::list_pools(std::unique_ptr<LSPoolsComp> c) {
+  impl->objecter->with_osdmap(
+    [&](OSDMap& o) {
+      std::vector<std::pair<std::int64_t, std::string>> v;
+      for (auto p : o.get_pools())
+       v.push_back(std::make_pair(p.first, o.get_pool_name(p.first)));
+      ca::dispatch(std::move(c), std::move(v));
+    });
+}
+
+void RADOS::create_pool_snap(std::int64_t pool,
+                            std::string_view snapName,
+                            std::unique_ptr<SimpleOpComp> c)
+{
+  impl->objecter->create_pool_snap(
+    pool, snapName,
+    Objecter::PoolOp::OpComp::create(
+      get_executor(),
+      [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
+       ca::dispatch(std::move(c), e);
+      }));
+}
+
+void RADOS::allocate_selfmanaged_snap(int64_t pool,
+                                     std::unique_ptr<SMSnapComp> c) {
+  impl->objecter->allocate_selfmanaged_snap(
+    pool,
+    ca::Completion<void(bs::error_code, snapid_t)>::create(
+      get_executor(),
+      [c = std::move(c)](bs::error_code e, snapid_t snap) mutable {
+       ca::dispatch(std::move(c), e, snap);
+      }));
+}
+
+void RADOS::delete_pool_snap(std::int64_t pool,
+                            std::string_view snapName,
+                            std::unique_ptr<SimpleOpComp> c)
+{
+  impl->objecter->delete_pool_snap(
+    pool, snapName,
+    Objecter::PoolOp::OpComp::create(
+      get_executor(),
+      [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
+       ca::dispatch(std::move(c), e);
+      }));
+}
+
+void RADOS::delete_selfmanaged_snap(std::int64_t pool,
+                                   std::uint64_t snap,
+                                   std::unique_ptr<SimpleOpComp> c)
+{
+  impl->objecter->delete_selfmanaged_snap(
+    pool, snap,
+    Objecter::PoolOp::OpComp::create(
+      get_executor(),
+      [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
+       ca::dispatch(std::move(c), e);
+      }));
+}
+
+void RADOS::create_pool(std::string_view name,
+                       std::optional<int> crush_rule,
+                       std::unique_ptr<SimpleOpComp> c)
+{
+  impl->objecter->create_pool(
+    name,
+    Objecter::PoolOp::OpComp::create(
+      get_executor(),
+      [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
+       ca::dispatch(std::move(c), e);
+      }),
+      crush_rule.value_or(-1));
+}
+
+void RADOS::delete_pool(std::string_view name,
+                       std::unique_ptr<SimpleOpComp> c)
+{
+  impl->objecter->delete_pool(
+    name,
+    Objecter::PoolOp::OpComp::create(
+      get_executor(),
+      [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
+       ca::dispatch(std::move(c), e);
+      }));
+}
+
+void RADOS::delete_pool(std::int64_t pool,
+                       std::unique_ptr<SimpleOpComp> c)
+{
+  impl->objecter->delete_pool(
+    pool,
+    Objecter::PoolOp::OpComp::create(
+      get_executor(),
+      [c = std::move(c)](bs::error_code e, const bufferlist&) mutable {
+       ca::dispatch(std::move(c), e);
+      }));
+}
+
+void RADOS::stat_pools(const std::vector<std::string>& pools,
+                      std::unique_ptr<PoolStatComp> c) {
+  impl->objecter->get_pool_stats(
+    pools,
+    [c = std::move(c)]
+    (bs::error_code ec,
+     bc::flat_map<std::string, pool_stat_t> rawresult,
+     bool per_pool) mutable {
+      bc::flat_map<std::string, PoolStats> result;
+      for (auto p = rawresult.begin(); p != rawresult.end(); ++p) {
+       auto& pv = result[p->first];
+       auto& pstat = p->second;
+       store_statfs_t &statfs = pstat.store_stats;
+       uint64_t allocated_bytes = pstat.get_allocated_data_bytes(per_pool) +
+         pstat.get_allocated_omap_bytes(per_pool);
+       // FIXME: raw_used_rate is unknown hence use 1.0 here
+       // meaning we keep net amount aggregated over all replicas
+       // Not a big deal so far since this field isn't exposed
+       uint64_t user_bytes = pstat.get_user_data_bytes(1.0, per_pool) +
+         pstat.get_user_omap_bytes(1.0, per_pool);
+
+       object_stat_sum_t *sum = &p->second.stats.sum;
+       pv.num_kb = shift_round_up(allocated_bytes, 10);
+       pv.num_bytes = allocated_bytes;
+       pv.num_objects = sum->num_objects;
+       pv.num_object_clones = sum->num_object_clones;
+       pv.num_object_copies = sum->num_object_copies;
+       pv.num_objects_missing_on_primary = sum->num_objects_missing_on_primary;
+       pv.num_objects_unfound = sum->num_objects_unfound;
+       pv.num_objects_degraded = sum->num_objects_degraded;
+       pv.num_rd = sum->num_rd;
+       pv.num_rd_kb = sum->num_rd_kb;
+       pv.num_wr = sum->num_wr;
+       pv.num_wr_kb = sum->num_wr_kb;
+       pv.num_user_bytes = user_bytes;
+       pv.compressed_bytes_orig = statfs.data_compressed_original;
+       pv.compressed_bytes = statfs.data_compressed;
+       pv.compressed_bytes_alloc = statfs.data_compressed_allocated;
+      }
+
+      ca::dispatch(std::move(c), ec, std::move(result), per_pool);
+    });
+}
+
+void RADOS::stat_fs(std::optional<std::int64_t> _pool,
+                   std::unique_ptr<StatFSComp> c) {
+  boost::optional<int64_t> pool;
+  if (_pool)
+    pool = *pool;
+  impl->objecter->get_fs_stats(
+    pool,
+    [c = std::move(c)](bs::error_code ec, const struct ceph_statfs s) mutable {
+      FSStats fso{s.kb, s.kb_used, s.kb_avail, s.num_objects};
+      c->dispatch(std::move(c), ec, std::move(fso));
+    });
+}
+
+// --- Watch/Notify
+
+void RADOS::watch(const Object& o, const IOContext& _ioc,
+                 std::optional<std::chrono::seconds> timeout, WatchCB&& cb,
+                 std::unique_ptr<WatchComp> c) {
+  auto oid = reinterpret_cast<const object_t*>(&o.impl);
+  auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+
+  ObjectOperation op;
+
+  auto linger_op = impl->objecter->linger_register(*oid, ioc->oloc, 0);
+  uint64_t cookie = linger_op->get_cookie();
+  linger_op->handle = std::move(cb);
+  op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count());
+  bufferlist bl;
+  impl->objecter->linger_watch(
+    linger_op, op, ioc->snapc, ceph::real_clock::now(), bl,
+    Objecter::LingerOp::OpComp::create(
+      get_executor(),
+      [c = std::move(c), cookie](bs::error_code e, cb::list) mutable {
+       ca::dispatch(std::move(c), e, cookie);
+      }), nullptr);
+}
+
+void RADOS::watch(const Object& o, std::int64_t pool,
+                 std::optional<std::chrono::seconds> timeout, WatchCB&& cb,
+                 std::unique_ptr<WatchComp> c,
+                 std::optional<std::string_view> ns,
+                 std::optional<std::string_view> key) {
+  auto oid = reinterpret_cast<const object_t*>(&o.impl);
+  object_locator_t oloc;
+  oloc.pool = pool;
+  if (ns)
+    oloc.nspace = *ns;
+  if (key)
+    oloc.key = *key;
+
+  ObjectOperation op;
+
+  Objecter::LingerOp *linger_op = impl->objecter->linger_register(*oid, oloc, 0);
+  uint64_t cookie = linger_op->get_cookie();
+  linger_op->handle = std::move(cb);
+  op.watch(cookie, CEPH_OSD_WATCH_OP_WATCH, timeout.value_or(0s).count());
+  bufferlist bl;
+  impl->objecter->linger_watch(
+    linger_op, op, {}, ceph::real_clock::now(), bl,
+    Objecter::LingerOp::OpComp::create(
+      get_executor(),
+      [c = std::move(c), cookie](bs::error_code e, bufferlist) mutable {
+       ca::dispatch(std::move(c), e, cookie);
+      }), nullptr);
+}
+
+void RADOS::notify_ack(const Object& o,
+                      const IOContext& _ioc,
+                      uint64_t notify_id,
+                      uint64_t cookie,
+                      bufferlist&& bl,
+                      std::unique_ptr<SimpleOpComp> c)
+{
+  auto oid = reinterpret_cast<const object_t*>(&o.impl);
+  auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+
+  ObjectOperation op;
+  op.notify_ack(notify_id, cookie, bl);
+
+  impl->objecter->read(*oid, ioc->oloc, std::move(op), ioc->snap_seq,
+                      nullptr, 0, std::move(c));
+}
+
+void RADOS::notify_ack(const Object& o,
+                      std::int64_t pool,
+                      uint64_t notify_id,
+                      uint64_t cookie,
+                      bufferlist&& bl,
+                      std::unique_ptr<SimpleOpComp> c,
+                      std::optional<std::string_view> ns,
+                      std::optional<std::string_view> key) {
+  auto oid = reinterpret_cast<const object_t*>(&o.impl);
+  object_locator_t oloc;
+  oloc.pool = pool;
+  if (ns)
+    oloc.nspace = *ns;
+  if (key)
+    oloc.key = *key;
+
+  ObjectOperation op;
+  op.notify_ack(notify_id, cookie, bl);
+  impl->objecter->read(*oid, oloc, std::move(op), CEPH_NOSNAP, nullptr, 0,
+                      std::move(c));
+}
+
+tl::expected<ceph::timespan, bs::error_code> RADOS::watch_check(uint64_t cookie)
+{
+  Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+  return impl->objecter->linger_check(linger_op);
+}
+
+void RADOS::unwatch(uint64_t cookie, const IOContext& _ioc,
+                   std::unique_ptr<SimpleOpComp> c)
+{
+  auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+
+  Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+
+  ObjectOperation op;
+  op.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
+  impl->objecter->mutate(linger_op->target.base_oid, ioc->oloc, std::move(op),
+                        ioc->snapc, ceph::real_clock::now(), 0,
+                        Objecter::Op::OpComp::create(
+                          get_executor(),
+                          [objecter = impl->objecter.get(),
+                           linger_op, c = std::move(c)]
+                          (bs::error_code ec) mutable {
+                            objecter->linger_cancel(linger_op);
+                            ca::dispatch(std::move(c), ec);
+                          }));
+}
+
+void RADOS::unwatch(uint64_t cookie, std::int64_t pool,
+                   std::unique_ptr<SimpleOpComp> c,
+                   std::optional<std::string_view> ns,
+                   std::optional<std::string_view> key)
+{
+  object_locator_t oloc;
+  oloc.pool = pool;
+  if (ns)
+    oloc.nspace = *ns;
+  if (key)
+    oloc.key = *key;
+
+  Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+
+  ObjectOperation op;
+  op.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
+  impl->objecter->mutate(linger_op->target.base_oid, oloc, std::move(op),
+                        {}, ceph::real_clock::now(), 0,
+                        Objecter::Op::OpComp::create(
+                          get_executor(),
+                          [objecter = impl->objecter.get(),
+                           linger_op, c = std::move(c)]
+                          (bs::error_code ec) mutable {
+                            objecter->linger_cancel(linger_op);
+                            ca::dispatch(std::move(c), ec);
+                          }));
+}
+
+void RADOS::flush_watch(std::unique_ptr<VoidOpComp> c)
+{
+  impl->objecter->linger_callback_flush([c = std::move(c)]() mutable {
+                                         ca::post(std::move(c));
+                                       });
+}
+
+struct NotifyHandler : std::enable_shared_from_this<NotifyHandler> {
+  boost::asio::io_context& ioc;
+  boost::asio::io_context::strand strand;
+  Objecter* objecter;
+  Objecter::LingerOp* op;
+  std::unique_ptr<RADOS::NotifyComp> c;
+
+  bool acked = false;
+  bool finished = false;
+  bs::error_code res;
+  bufferlist rbl;
+
+  NotifyHandler(boost::asio::io_context& ioc,
+               Objecter* objecter,
+               Objecter::LingerOp* op,
+               std::unique_ptr<RADOS::NotifyComp> c)
+    : ioc(ioc), strand(ioc), objecter(objecter), op(op), c(std::move(c)) {}
+
+  // Use bind or a lambda to pass this in.
+  void handle_ack(bs::error_code ec,
+                 bufferlist&&) {
+    boost::asio::post(
+      strand,
+      [this, ec, p = shared_from_this()]() mutable {
+       acked = true;
+       maybe_cleanup(ec);
+      });
+  }
+
+  // Notify finish callback. It can actually own the object's storage.
+
+  void operator()(bs::error_code ec,
+                 bufferlist&& bl) {
+    boost::asio::post(
+      strand,
+      [this, ec, p = shared_from_this()]() mutable {
+       finished = true;
+       maybe_cleanup(ec);
+      });
+  }
+
+  // Should be called from strand.
+  void maybe_cleanup(bs::error_code ec) {
+    if (!res && ec)
+      res = ec;
+    if ((acked && finished) || res) {
+      objecter->linger_cancel(op);
+      ceph_assert(c);
+      ca::dispatch(std::move(c), res, std::move(rbl));
+    }
+  }
+};
+
+void RADOS::notify(const Object& o, const IOContext& _ioc, bufferlist&& bl,
+                  std::optional<std::chrono::milliseconds> timeout,
+                  std::unique_ptr<NotifyComp> c)
+{
+  auto oid = reinterpret_cast<const object_t*>(&o.impl);
+  auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+  auto linger_op = impl->objecter->linger_register(*oid, ioc->oloc, 0);
+
+  auto cb = std::make_shared<NotifyHandler>(impl->ioctx, impl->objecter.get(),
+                                            linger_op, std::move(c));
+  linger_op->on_notify_finish =
+    Objecter::LingerOp::OpComp::create(
+      get_executor(),
+      [cb](bs::error_code ec, ceph::bufferlist bl) mutable {
+       (*cb)(ec, std::move(bl));
+      });
+  ObjectOperation rd;
+  bufferlist inbl;
+  rd.notify(
+    linger_op->get_cookie(), 1,
+    timeout ? timeout->count() : impl->cct->_conf->client_notify_timeout,
+    bl, &inbl);
+
+  impl->objecter->linger_notify(
+    linger_op, rd, ioc->snap_seq, inbl,
+    Objecter::LingerOp::OpComp::create(
+      get_executor(),
+      [cb](bs::error_code ec, ceph::bufferlist bl) mutable {
+       cb->handle_ack(ec, std::move(bl));
+      }), nullptr);
+}
+
+void RADOS::notify(const Object& o, std::int64_t pool, bufferlist&& bl,
+                  std::optional<std::chrono::milliseconds> timeout,
+                  std::unique_ptr<NotifyComp> c,
+                  std::optional<std::string_view> ns,
+                  std::optional<std::string_view> key)
+{
+  auto oid = reinterpret_cast<const object_t*>(&o.impl);
+  object_locator_t oloc;
+  oloc.pool = pool;
+  if (ns)
+    oloc.nspace = *ns;
+  if (key)
+    oloc.key = *key;
+  auto linger_op = impl->objecter->linger_register(*oid, oloc, 0);
+
+  auto cb = std::make_shared<NotifyHandler>(impl->ioctx, impl->objecter.get(),
+                                            linger_op, std::move(c));
+  linger_op->on_notify_finish =
+    Objecter::LingerOp::OpComp::create(
+      get_executor(),
+      [cb](bs::error_code ec, ceph::bufferlist&& bl) mutable {
+       (*cb)(ec, std::move(bl));
+      });
+  ObjectOperation rd;
+  bufferlist inbl;
+  rd.notify(
+    linger_op->get_cookie(), 1,
+    timeout ? timeout->count() : impl->cct->_conf->client_notify_timeout,
+    bl, &inbl);
+
+  impl->objecter->linger_notify(
+    linger_op, rd, CEPH_NOSNAP, inbl,
+    Objecter::LingerOp::OpComp::create(
+      get_executor(),
+      [cb](bs::error_code ec, bufferlist&& bl) mutable {
+       cb->handle_ack(ec, std::move(bl));
+      }), nullptr);
+}
+
+// Enumeration
+
+Cursor::Cursor() {
+  static_assert(impl_size >= sizeof(hobject_t));
+  new (&impl) hobject_t();
+};
+
+Cursor::Cursor(end_magic_t) {
+  static_assert(impl_size >= sizeof(hobject_t));
+  new (&impl) hobject_t(hobject_t::get_max());
+}
+
+Cursor::Cursor(void* p) {
+  static_assert(impl_size >= sizeof(hobject_t));
+  new (&impl) hobject_t(std::move(*reinterpret_cast<hobject_t*>(p)));
+}
+
+Cursor Cursor::begin() {
+  Cursor e;
+  return e;
+}
+
+Cursor Cursor::end() {
+  Cursor e(end_magic_t{});
+  return e;
+}
+
+Cursor::Cursor(const Cursor& rhs) {
+  static_assert(impl_size >= sizeof(hobject_t));
+  new (&impl) hobject_t(*reinterpret_cast<const hobject_t*>(&rhs.impl));
+}
+
+Cursor& Cursor::operator =(const Cursor& rhs) {
+  static_assert(impl_size >= sizeof(hobject_t));
+  reinterpret_cast<hobject_t*>(&impl)->~hobject_t();
+  new (&impl) hobject_t(*reinterpret_cast<const hobject_t*>(&rhs.impl));
+  return *this;
+}
+
+Cursor::Cursor(Cursor&& rhs) {
+  static_assert(impl_size >= sizeof(hobject_t));
+  new (&impl) hobject_t(std::move(*reinterpret_cast<hobject_t*>(&rhs.impl)));
+}
+
+Cursor& Cursor::operator =(Cursor&& rhs) {
+  static_assert(impl_size >= sizeof(hobject_t));
+  reinterpret_cast<hobject_t*>(&impl)->~hobject_t();
+  new (&impl) hobject_t(std::move(*reinterpret_cast<hobject_t*>(&rhs.impl)));
+  return *this;
+}
+Cursor::~Cursor() {
+  reinterpret_cast<hobject_t*>(&impl)->~hobject_t();
+}
+
+bool operator ==(const Cursor& lhs, const Cursor& rhs) {
+  return (*reinterpret_cast<const hobject_t*>(&lhs.impl) ==
+         *reinterpret_cast<const hobject_t*>(&rhs.impl));
+}
+
+bool operator !=(const Cursor& lhs, const Cursor& rhs) {
+  return (*reinterpret_cast<const hobject_t*>(&lhs.impl) !=
+         *reinterpret_cast<const hobject_t*>(&rhs.impl));
+}
+
+bool operator <(const Cursor& lhs, const Cursor& rhs) {
+  return (*reinterpret_cast<const hobject_t*>(&lhs.impl) <
+         *reinterpret_cast<const hobject_t*>(&rhs.impl));
+}
+
+bool operator <=(const Cursor& lhs, const Cursor& rhs) {
+  return (*reinterpret_cast<const hobject_t*>(&lhs.impl) <=
+         *reinterpret_cast<const hobject_t*>(&rhs.impl));
+}
+
+bool operator >=(const Cursor& lhs, const Cursor& rhs) {
+  return (*reinterpret_cast<const hobject_t*>(&lhs.impl) >=
+         *reinterpret_cast<const hobject_t*>(&rhs.impl));
+}
+
+bool operator >(const Cursor& lhs, const Cursor& rhs) {
+  return (*reinterpret_cast<const hobject_t*>(&lhs.impl) >
+         *reinterpret_cast<const hobject_t*>(&rhs.impl));
+}
+
+std::string Cursor::to_str() const {
+  using namespace std::literals;
+  auto& h = *reinterpret_cast<const hobject_t*>(&impl);
+
+  return h.is_max() ? "MAX"s : h.to_str();
+}
+
+std::optional<Cursor>
+Cursor::from_str(const std::string& s) {
+  Cursor e;
+  auto& h = *reinterpret_cast<hobject_t*>(&e.impl);
+  if (!h.parse(s))
+    return std::nullopt;
+
+  return e;
+}
+
+void RADOS::enumerate_objects(const IOContext& _ioc,
+                             const Cursor& begin,
+                             const Cursor& end,
+                             const std::uint32_t max,
+                             const bufferlist& filter,
+                             std::unique_ptr<EnumerateComp> c) {
+  auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
+
+  impl->objecter->enumerate_objects<Entry>(
+    ioc->oloc.pool,
+    ioc->oloc.nspace,
+    *reinterpret_cast<const hobject_t*>(&begin.impl),
+    *reinterpret_cast<const hobject_t*>(&end.impl),
+    max,
+    filter,
+    [c = std::move(c)]
+    (bs::error_code ec, std::vector<Entry>&& v,
+     hobject_t&& n) mutable {
+      ca::dispatch(std::move(c), ec, std::move(v),
+                  Cursor(static_cast<void*>(&n)));
+    });
+}
+
+void RADOS::enumerate_objects(std::int64_t pool,
+                             const Cursor& begin,
+                             const Cursor& end,
+                             const std::uint32_t max,
+                             const bufferlist& filter,
+                             std::unique_ptr<EnumerateComp> c,
+                             std::optional<std::string_view> ns,
+                             std::optional<std::string_view> key) {
+  impl->objecter->enumerate_objects<Entry>(
+    pool,
+    ns ? *ns : std::string_view{},
+    *reinterpret_cast<const hobject_t*>(&begin.impl),
+    *reinterpret_cast<const hobject_t*>(&end.impl),
+    max,
+    filter,
+    [c = std::move(c)]
+    (bs::error_code ec, std::vector<Entry>&& v,
+     hobject_t&& n) mutable {
+      ca::dispatch(std::move(c), ec, std::move(v),
+                  Cursor(static_cast<void*>(&n)));
+    });
+}
+
+
+void RADOS::osd_command(int osd, std::vector<std::string>&& cmd,
+                       ceph::bufferlist&& in, std::unique_ptr<CommandComp> c) {
+  impl->objecter->osd_command(osd, std::move(cmd), std::move(in), nullptr,
+                             [c = std::move(c)]
+                             (bs::error_code ec,
+                              std::string&& s,
+                              ceph::bufferlist&& b) mutable {
+                               ca::dispatch(std::move(c), ec,
+                                            std::move(s),
+                                            std::move(b));
+                             });
+}
+void RADOS::pg_command(PG pg, std::vector<std::string>&& cmd,
+                      ceph::bufferlist&& in, std::unique_ptr<CommandComp> c) {
+  impl->objecter->pg_command(pg_t{pg.seed, pg.pool}, std::move(cmd), std::move(in), nullptr,
+                            [c = std::move(c)]
+                            (bs::error_code ec,
+                             std::string&& s,
+                             ceph::bufferlist&& b) mutable {
+                              ca::dispatch(std::move(c), ec,
+                                           std::move(s),
+                                           std::move(b));
+                            });
+}
+
+void RADOS::enable_application(std::string_view pool, std::string_view app_name,
+                              bool force, std::unique_ptr<SimpleOpComp> c) {
+  // pre-Luminous clusters will return -EINVAL and application won't be
+  // preserved until Luminous is configured as minimum version.
+  if (!impl->get_required_monitor_features().contains_all(
+       ceph::features::mon::FEATURE_LUMINOUS)) {
+    ca::dispatch(std::move(c), ceph::to_error_code(-EOPNOTSUPP));
+  } else {
+    impl->monclient.start_mon_command(
+      { fmt::format("{{ \"prefix\": \"osd pool application enable\","
+                   "\"pool\": \"{}\", \"app\": \"{}\"{}}}",
+                   pool, app_name,
+                   force ? " ,\"yes_i_really_mean_it\": true" : "")},
+      {}, [c = std::move(c)](bs::error_code e,
+                            std::string, cb::list) mutable {
+           ca::post(std::move(c), e);
+         });
+  }
+}
+
+void RADOS::mon_command(std::vector<std::string> command,
+                       const cb::list& bl,
+                       std::string* outs, cb::list* outbl,
+                       std::unique_ptr<SimpleOpComp> c) {
+
+  impl->monclient.start_mon_command(
+    command, bl,
+    [c = std::move(c), outs, outbl](bs::error_code e,
+                                   std::string s, cb::list bl) mutable {
+      if (outs)
+       *outs = std::move(s);
+      if (outbl)
+       *outbl = std::move(bl);
+      ca::post(std::move(c), e);
+    });
+}
+
+uint64_t RADOS::instance_id() const {
+  return impl->get_instance_id();
+}
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
+class category : public ceph::converting_category {
+public:
+  category() {}
+  const char* name() const noexcept override;
+  const char* message(int ev, char*, std::size_t) const noexcept override;
+  std::string message(int ev) const override;
+  bs::error_condition default_error_condition(int ev) const noexcept
+    override;
+  bool equivalent(int ev, const bs::error_condition& c) const
+    noexcept override;
+  using ceph::converting_category::equivalent;
+  int from_code(int ev) const noexcept override;
+};
+#pragma GCC diagnostic pop
+#pragma clang diagnostic pop
+
+const char* category::name() const noexcept {
+  return "RADOS";
+}
+
+const char* category::message(int ev, char*,
+                             std::size_t) const noexcept {
+  if (ev == 0)
+    return "No error";
+
+  switch (static_cast<errc>(ev)) {
+  case errc::pool_dne:
+    return "Pool does not exist";
+
+  case errc::invalid_snapcontext:
+    return "Invalid snapcontext";
+  }
+
+  return "Unknown error";
+}
+
+std::string category::message(int ev) const {
+  return message(ev, nullptr, 0);
+}
+
+bs::error_condition category::default_error_condition(int ev) const noexcept {
+  switch (static_cast<errc>(ev)) {
+  case errc::pool_dne:
+    return ceph::errc::does_not_exist;
+  case errc::invalid_snapcontext:
+    return bs::errc::invalid_argument;
+  }
+
+  return { ev, *this };
+}
+
+bool category::equivalent(int ev, const bs::error_condition& c) const noexcept {
+  if (static_cast<errc>(ev) == errc::pool_dne) {
+    if (c == bs::errc::no_such_file_or_directory) {
+      return true;
+    }
+  }
+
+  return default_error_condition(ev) == c;
+}
+
+int category::from_code(int ev) const noexcept {
+  switch (static_cast<errc>(ev)) {
+  case errc::pool_dne:
+    return -ENOENT;
+  case errc::invalid_snapcontext:
+    return -EINVAL;
+  }
+  return -EDOM;
+}
+
+const bs::error_category& error_category() noexcept {
+  static const class category c;
+  return c;
+}
+
+CephContext* RADOS::cct() {
+  return impl->cct;
+}
+}
+
+namespace std {
+size_t hash<neorados::Object>::operator ()(
+  const neorados::Object& r) const {
+  static constexpr const hash<object_t> H;
+  return H(*reinterpret_cast<const object_t*>(&r.impl));
+}
+
+size_t hash<neorados::IOContext>::operator ()(
+  const neorados::IOContext& r) const {
+  static constexpr const hash<int64_t> H;
+  static constexpr const hash<std::string> G;
+  const auto l = reinterpret_cast<const neorados::IOContextImpl*>(&r.impl);
+  return H(l->oloc.pool) ^ (G(l->oloc.nspace) << 1) ^ (G(l->oloc.key) << 2);
+}
+}
diff --git a/src/neorados/RADOSImpl.cc b/src/neorados/RADOSImpl.cc
new file mode 100644 (file)
index 0000000..4dd1f37
--- /dev/null
@@ -0,0 +1,112 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+
+#include <boost/system/system_error.hpp>
+
+#include "common/common_init.h"
+
+#include "global/global_init.h"
+
+#include "RADOSImpl.h"
+
+namespace neorados {
+namespace detail {
+
+RADOS::RADOS(boost::asio::io_context& ioctx,
+            boost::intrusive_ptr<CephContext> _cct)
+  : Dispatcher(_cct.detach()),
+    ioctx(ioctx),
+    monclient(cct, ioctx),
+    moncsd(monclient),
+    mgrclient(cct, nullptr, &monclient.monmap),
+    mgrcsd(mgrclient) {
+  auto err = monclient.build_initial_monmap();
+  if (err < 0)
+    throw std::system_error(ceph::to_error_code(err));
+
+  messenger.reset(Messenger::create_client_messenger(cct, "radosclient"));
+  if (!messenger)
+    throw std::bad_alloc();
+
+  // Require OSDREPLYMUX feature. This means we will fail to talk to
+  // old servers. This is necessary because otherwise we won't know
+  // how to decompose the reply data into its constituent pieces.
+  messenger->set_default_policy(
+    Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
+
+  objecter.reset(new Objecter(cct, messenger.get(), &monclient,
+                             ioctx,
+                             cct->_conf->rados_mon_op_timeout,
+                             cct->_conf->rados_osd_op_timeout));
+
+  objecter->set_balanced_budget();
+  monclient.set_messenger(messenger.get());
+  mgrclient.set_messenger(messenger.get());
+  objecter->init();
+  messenger->add_dispatcher_head(&mgrclient);
+  messenger->add_dispatcher_tail(objecter.get());
+  messenger->start();
+  monclient.set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR);
+  err = monclient.init();
+  if (err) {
+    throw boost::system::system_error(ceph::to_error_code(err));
+  }
+  err = monclient.authenticate(cct->_conf->client_mount_timeout);
+  if (err) {
+    throw boost::system::system_error(ceph::to_error_code(err));
+  }
+  messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
+  // Detect older cluster, put mgrclient into compatible mode
+  mgrclient.set_mgr_optional(
+      !get_required_monitor_features().contains_all(
+        ceph::features::mon::FEATURE_LUMINOUS));
+
+  // MgrClient needs this (it doesn't have MonClient reference itself)
+  monclient.sub_want("mgrmap", 0, 0);
+  monclient.renew_subs();
+
+  mgrclient.init();
+  objecter->set_client_incarnation(0);
+  objecter->start();
+
+  messenger->add_dispatcher_tail(this);
+
+  std::unique_lock l(lock);
+  instance_id = monclient.get_global_id();
+}
+
+bool RADOS::ms_dispatch(Message *m)
+{
+  switch (m->get_type()) {
+  // OSD
+  case CEPH_MSG_OSD_MAP:
+    m->put();
+    return true;
+  }
+  return false;
+}
+
+void RADOS::ms_handle_connect(Connection *con) {}
+bool RADOS::ms_handle_reset(Connection *con) {
+  return false;
+}
+void RADOS::ms_handle_remote_reset(Connection *con) {}
+bool RADOS::ms_handle_refused(Connection *con) {
+  return false;
+}
+
+RADOS::~RADOS() = default;
+}
+}
diff --git a/src/neorados/RADOSImpl.h b/src/neorados/RADOSImpl.h
new file mode 100644 (file)
index 0000000..bf9e75d
--- /dev/null
@@ -0,0 +1,103 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+#ifndef CEPH_LIBRADOS_RADOSCLIENT_H
+#define CEPH_LIBRADOS_RADOSCLIENT_H
+
+#include <functional>
+#include <memory>
+#include <string>
+
+#include <boost/asio.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+#include "common/ceph_context.h"
+#include "common/ceph_mutex.h"
+
+#include "mon/MonClient.h"
+
+#include "mgr/MgrClient.h"
+
+#include "osdc/Objecter.h"
+
+namespace neorados {
+  class RADOS;
+namespace detail {
+
+class RADOS : public Dispatcher
+{
+  friend ::neorados::RADOS;
+  struct MsgDeleter {
+    void operator()(Messenger* p) const {
+      if (p) {
+       p->shutdown();
+       p->wait();
+      }
+      delete p;
+    }
+  };
+
+  struct ObjDeleter {
+    void operator()(Objecter* p) const {
+      if (p) {
+       p->shutdown();
+      }
+      delete p;
+    }
+  };
+
+  template<typename T>
+  struct scoped_shutdown {
+    T& m;
+    scoped_shutdown(T& m) : m(m) {}
+
+    ~scoped_shutdown() {
+      m.shutdown();
+    }
+  };
+
+  boost::asio::io_context& ioctx;
+  ceph::mutex lock = ceph::make_mutex("RADOS_unleashed::_::RADOSImpl");
+  int instance_id = -1;
+
+  std::unique_ptr<Messenger, MsgDeleter> messenger;
+
+  MonClient monclient;
+  scoped_shutdown<MonClient> moncsd;
+
+  MgrClient mgrclient;
+  scoped_shutdown<MgrClient> mgrcsd;
+
+  std::unique_ptr<Objecter, ObjDeleter> objecter;
+
+
+public:
+
+  RADOS(boost::asio::io_context& ioctx, boost::intrusive_ptr<CephContext> cct);
+  ~RADOS();
+  bool ms_dispatch(Message *m) override;
+  void ms_handle_connect(Connection *con) override;
+  bool ms_handle_reset(Connection *con) override;
+  void ms_handle_remote_reset(Connection *con) override;
+  bool ms_handle_refused(Connection *con) override;
+  mon_feature_t get_required_monitor_features() const {
+    return monclient.with_monmap(std::mem_fn(&MonMap::get_required_features));
+  }
+  int get_instance_id() const {
+    return instance_id;
+  }
+};
+}
+}
+
+#endif
index 516fe63afd1c2bf8d993407003967f0dfc8284d8..c59807ee4e3896c522ca0be79e1ba0fc60518b79 100644 (file)
@@ -5067,7 +5067,7 @@ void Objecter::enumerate_objects<librados::ListObjectImpl>(
                            hobject_t) &&> on_finish);
 
 template
-void Objecter::enumerate_objects<RADOS::Entry>(
+void Objecter::enumerate_objects<neorados::Entry>(
   int64_t pool_id,
   std::string_view ns,
   hobject_t start,
@@ -5075,7 +5075,7 @@ void Objecter::enumerate_objects<RADOS::Entry>(
   const uint32_t max,
   const cb::list& filter_bl,
   fu2::unique_function<void(bs::error_code,
-                           std::vector<RADOS::Entry>,
+                           std::vector<neorados::Entry>,
                            hobject_t) &&> on_finish);
 
 
@@ -5108,8 +5108,8 @@ void Objecter::_issue_enumerate<librados::ListObjectImpl>(
   hobject_t start,
   std::unique_ptr<EnumerationContext<librados::ListObjectImpl>> ctx);
 template
-void Objecter::_issue_enumerate<RADOS::Entry>(
-  hobject_t start, std::unique_ptr<EnumerationContext<RADOS::Entry>> ctx);
+void Objecter::_issue_enumerate<neorados::Entry>(
+  hobject_t start, std::unique_ptr<EnumerationContext<neorados::Entry>> ctx);
 
 template<typename T>
 void Objecter::_enumerate_reply(
@@ -5210,10 +5210,10 @@ void Objecter::_enumerate_reply<librados::ListObjectImpl>(
   std::unique_ptr<EnumerationContext<librados::ListObjectImpl>>&& ctx);
 
 template
-void Objecter::_enumerate_reply<RADOS::Entry>(
+void Objecter::_enumerate_reply<neorados::Entry>(
   cb::list&& bl,
   bs::error_code ec,
-  std::unique_ptr<EnumerationContext<RADOS::Entry>>&& ctx);
+  std::unique_ptr<EnumerationContext<neorados::Entry>>&& ctx);
 
 namespace {
   using namespace librados;
index a34e02fe018ad2fee13ad2f4bcc6b2a00c9adf00..321055b041b5eb4ed8cf03247f725a41b3485dd6 100644 (file)
@@ -39,7 +39,7 @@
 #include "include/types.h"
 #include "include/rados/rados_types.hpp"
 #include "include/function2.hpp"
-#include "include/RADOS/RADOS_Decodable.hpp"
+#include "include/neorados/RADOS_Decodable.hpp"
 
 #include "common/admin_socket.h"
 #include "common/async/completion.h"
@@ -733,10 +733,10 @@ struct ObjectOperation {
   };
 
   struct CB_ObjectOperation_decodewatchersneo {
-    std::vector<RADOS::ObjWatcher>* pwatchers;
+    std::vector<neorados::ObjWatcher>* pwatchers;
     int* prval;
     boost::system::error_code* pec;
-    CB_ObjectOperation_decodewatchersneo(std::vector<RADOS::ObjWatcher>* pw,
+    CB_ObjectOperation_decodewatchersneo(std::vector<neorados::ObjWatcher>* pw,
                                         int* pr,
                                         boost::system::error_code* pec)
       : pwatchers(pw), prval(pr), pec(pec) {}
@@ -749,7 +749,7 @@ struct ObjectOperation {
          decode(resp, p);
          if (pwatchers) {
            for (const auto& watch_item : resp.entries) {
-             RADOS::ObjWatcher ow;
+             neorados::ObjWatcher ow;
              ow.addr = watch_item.addr.get_legacy_str();
              ow.watcher_id = watch_item.name.num();
              ow.cookie = watch_item.cookie;
@@ -770,11 +770,11 @@ struct ObjectOperation {
 
   struct CB_ObjectOperation_decodesnaps {
     librados::snap_set_t *psnaps;
-    RADOS::SnapSet *neosnaps;
+    neorados::SnapSet *neosnaps;
     int *prval;
     boost::system::error_code* pec;
     CB_ObjectOperation_decodesnaps(librados::snap_set_t* ps,
-                                  RADOS::SnapSet* ns, int* pr,
+                                  neorados::SnapSet* ns, int* pr,
                                   boost::system::error_code* pec)
       : psnaps(ps), neosnaps(ns), prval(pr), pec(pec) {}
     void operator()(boost::system::error_code ec, int r, const ceph::buffer::list& bl) {
@@ -806,7 +806,7 @@ struct ObjectOperation {
          if (neosnaps) {
            neosnaps->clones.clear();
            for (auto&& c : resp.clones) {
-             RADOS::CloneInfo clone;
+             neorados::CloneInfo clone;
 
              clone.cloneid = std::move(c.cloneid);
              clone.snaps.reserve(c.snaps.size());
@@ -1405,7 +1405,7 @@ struct ObjectOperation {
       out_rval.back() = prval;
     }
   }
-  void list_watchers(vector<RADOS::ObjWatcher>* out,
+  void list_watchers(vector<neorados::ObjWatcher>* out,
                     boost::system::error_code* ec) {
     add_op(CEPH_OSD_OP_LIST_WATCHERS);
     set_handler(CB_ObjectOperation_decodewatchersneo(out, nullptr, ec));
@@ -1422,7 +1422,7 @@ struct ObjectOperation {
     }
   }
 
-  void list_snaps(RADOS::SnapSet *out, int *prval,
+  void list_snaps(neorados::SnapSet *out, int *prval,
                  boost::system::error_code* ec = nullptr) {
     add_op(CEPH_OSD_OP_LIST_SNAPS);
     if (prval || out || ec) {
index cbd715661a34c8e03ee6e25c3fcc43834ffa6acd..25a2f2436c41da670422f1cab61615a19b83d61b 100644 (file)
@@ -35,6 +35,7 @@ add_subdirectory(fs)
 add_subdirectory(journal)
 add_subdirectory(libcephfs)
 add_subdirectory(librados)
+add_subdirectory(neorados)
 add_subdirectory(librados_test_stub)
 if(WITH_LIBRADOSSTRIPER)
   add_subdirectory(libradosstriper)
diff --git a/src/test/neorados/CMakeLists.txt b/src/test/neorados/CMakeLists.txt
new file mode 100644 (file)
index 0000000..38f7ae4
--- /dev/null
@@ -0,0 +1,2 @@
+add_executable(ceph_test_neorados_start_stop start_stop.cc)
+target_link_libraries(ceph_test_neorados_start_stop global libneorados ${unittest_libs})
diff --git a/src/test/neorados/start_stop.cc b/src/test/neorados/start_stop.cc
new file mode 100644 (file)
index 0000000..a2475e3
--- /dev/null
@@ -0,0 +1,175 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat <contact@redhat.com>
+ * Author: Adam C. Emerson <aemerson@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <thread>
+#include <vector>
+
+#include "include/neorados/RADOS.hpp"
+
+#include "common/async/context_pool.h"
+#include "common/ceph_argparse.h"
+
+#include "global/global_init.h"
+
+namespace R = neorados;
+
+
+int main(int argc, char** argv)
+{
+  using namespace std::literals;
+
+  std::vector<const char*> args;
+  argv_to_vec(argc, const_cast<const char**>(argv), args);
+  env_to_vec(args);
+
+  auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
+                         CODE_ENVIRONMENT_UTILITY, 0);
+  common_init_finish(cct.get());
+
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(30s);
+  }
+  std::this_thread::sleep_for(30s);
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(30s);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(1s);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(1s);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(1s);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(1s);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(1s);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(1s);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(1s);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(500ms);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(500ms);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(50ms);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(50ms);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(50ms);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(5ms);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(5ms);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(5ms);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(5ms);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(5ms);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(5us);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(5us);
+  }
+  {
+    ceph::async::io_context_pool p(1);
+    auto r = R::RADOS::make_with_cct(cct.get(), p,
+                                        boost::asio::use_future).get();
+    std::this_thread::sleep_for(5us);
+  }
+  return 0;
+}