]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: add futurized interfaces to CyanStore
authorchunmei Liu <chunmei.liu@intel.com>
Thu, 22 Aug 2019 06:50:02 +0000 (23:50 -0700)
committerKefu Chai <kchai@redhat.com>
Tue, 27 Aug 2019 14:55:57 +0000 (22:55 +0800)
Signed-off-by: chunmei Liu <chunmei.liu@intel.com>
src/crimson/os/cyan_store.cc
src/crimson/os/cyan_store.h
src/crimson/os/futurized_store.h
src/crimson/osd/osd.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_meta.cc

index 3bd953dc3562b8a359200cef25915aa3d61d8387..b69bcd2167162831723ccfce94f05d3fe3b045eb 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "cyan_store.h"
 
+#include <boost/algorithm/string/trim.hpp>
 #include <fmt/format.h>
 #include <fmt/ostream.h>
 
@@ -82,34 +83,35 @@ seastar::future<> CyanStore::umount()
 
 seastar::future<> CyanStore::mkfs(uuid_d new_osd_fsid)
 {
-  std::string fsid_str;
-  int r = read_meta("fsid", &fsid_str);
-  if (r == -ENOENT) {
-    if (new_osd_fsid.is_zero()) {
-      osd_fsid.generate_random();
+  return read_meta("fsid").then([=](auto r, auto fsid_str) {
+    if (r == -ENOENT) {
+      if (new_osd_fsid.is_zero()) {
+        osd_fsid.generate_random();
+      } else {
+        osd_fsid = new_osd_fsid;
+      }
+      return write_meta("fsid", fmt::format("{}", osd_fsid));
+    } else if (r < 0) {
+      throw std::runtime_error("read_meta");
     } else {
-      osd_fsid = new_osd_fsid;
-    }
-    write_meta("fsid", fmt::format("{}", osd_fsid));
-  } else if (r < 0) {
-    throw std::runtime_error("read_meta");
-  } else {
-    logger().info("{} already has fsid {}", __func__, fsid_str);
-    if (!osd_fsid.parse(fsid_str.c_str())) {
-      throw std::runtime_error("failed to parse fsid");
-    } else if (osd_fsid != new_osd_fsid) {
-      logger().error("on-disk fsid {} != provided {}", osd_fsid, new_osd_fsid);
-      throw std::runtime_error("unmatched osd_fsid");
+      logger().info("{} already has fsid {}", __func__, fsid_str);
+      if (!osd_fsid.parse(fsid_str.c_str())) {
+        throw std::runtime_error("failed to parse fsid");
+      } else if (osd_fsid != new_osd_fsid) {
+        logger().error("on-disk fsid {} != provided {}", osd_fsid, new_osd_fsid);
+        throw std::runtime_error("unmatched osd_fsid");
+      } else {
+       return seastar::now();
+      }
     }
-  }
-
-  std::string fn = path + "/collections";
-  ceph::bufferlist bl;
-  std::set<coll_t> collections;
-  ceph::encode(collections, bl);
-  return ceph::buffer::write_file(std::move(bl), fn).then([this] {
-    write_meta("type", "memstore");
-    return seastar::now();
+  }).then([this]{
+    std::string fn = path + "/collections";
+    ceph::bufferlist bl;
+    std::set<coll_t> collections;
+    ceph::encode(collections, bl);
+    return ceph::buffer::write_file(std::move(bl), fn);
+  }).then([this] {
+    return write_meta("type", "memstore");
   });
 }
 
@@ -147,27 +149,25 @@ CyanStore::list_objects(CollectionRef c,
     std::move(objects), next);
 }
 
-CollectionRef CyanStore::create_new_collection(const coll_t& cid)
+seastar::future<CollectionRef> CyanStore::create_new_collection(const coll_t& cid)
 {
   auto c = new Collection{cid};
-  return new_coll_map[cid] = c;
+  new_coll_map[cid] = c;
+  return seastar::make_ready_future<CollectionRef>(c);
 }
 
-CollectionRef CyanStore::open_collection(const coll_t& cid)
+seastar::future<CollectionRef> CyanStore::open_collection(const coll_t& cid)
 {
-  auto cp = coll_map.find(cid);
-  if (cp == coll_map.end())
-    return {};
-  return cp->second;
+  return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
 }
 
-std::vector<coll_t> CyanStore::list_collections()
+seastar::future<std::vector<coll_t>> CyanStore::list_collections()
 {
   std::vector<coll_t> collections;
   for (auto& coll : coll_map) {
     collections.push_back(coll.first);
   }
-  return collections;
+  return seastar::make_ready_future<std::vector<coll_t>>(std::move(collections));
 }
 
 seastar::future<ceph::bufferlist> CyanStore::read(CollectionRef c,
@@ -422,7 +422,7 @@ int CyanStore::_remove(const coll_t& cid, const ghobject_t& oid)
 {
   logger().debug("{} cid={} oid={}",
                 __func__, cid, oid);
-  auto c = open_collection(cid);
+  auto c = _get_collection(cid);
   if (!c)
     return -ENOENT;
 
@@ -439,7 +439,7 @@ int CyanStore::_touch(const coll_t& cid, const ghobject_t& oid)
 {
   logger().debug("{} cid={} oid={}",
                 __func__, cid, oid);
-  auto c = open_collection(cid);
+  auto c = _get_collection(cid);
   if (!c)
     return -ENOENT;
 
@@ -455,7 +455,7 @@ int CyanStore::_write(const coll_t& cid, const ghobject_t& oid,
                 __func__, cid, oid, offset, len);
   assert(len == bl.length());
 
-  auto c = open_collection(cid);
+  auto c = _get_collection(cid);
   if (!c)
     return -ENOENT;
 
@@ -478,7 +478,7 @@ int CyanStore::_omap_set_values(
     "{} {} {} {} keys",
     __func__, cid, oid, aset.size());
 
-  auto c = open_collection(cid);
+  auto c = _get_collection(cid);
   if (!c)
     return -ENOENT;
 
@@ -498,7 +498,7 @@ int CyanStore::_omap_set_header(
     "{} {} {} {} bytes",
     __func__, cid, oid, header.length());
 
-  auto c = open_collection(cid);
+  auto c = _get_collection(cid);
   if (!c)
     return -ENOENT;
 
@@ -516,7 +516,7 @@ int CyanStore::_omap_rmkeys(
     "{} {} {} {} keys",
     __func__, cid, oid, aset.size());
 
-  auto c = open_collection(cid);
+  auto c = _get_collection(cid);
   if (!c)
     return -ENOENT;
 
@@ -537,7 +537,7 @@ int CyanStore::_omap_rmkeyrange(
     "{} {} {} first={} last={}",
     __func__, cid, oid, first, last);
 
-  auto c = open_collection(cid);
+  auto c = _get_collection(cid);
   if (!c)
     return -ENOENT;
 
@@ -552,7 +552,7 @@ int CyanStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size
 {
   logger().debug("{} cid={} oid={} size={}",
                 __func__, cid, oid, size);
-  auto c = open_collection(cid);
+  auto c = _get_collection(cid);
   if (!c)
     return -ENOENT;
 
@@ -572,7 +572,7 @@ int CyanStore::_setattrs(const coll_t& cid, const ghobject_t& oid,
 {
   logger().debug("{} cid={} oid={}",
                 __func__, cid, oid);
-  auto c = open_collection(cid);
+  auto c = _get_collection(cid);
   if (!c)
     return -ENOENT;
 
@@ -598,8 +598,16 @@ int CyanStore::_create_collection(const coll_t& cid, int bits)
   return 0;
 }
 
-void CyanStore::write_meta(const std::string& key,
-                           const std::string& value)
+CollectionRef CyanStore::_get_collection(const coll_t& cid)
+{
+  auto cp = coll_map.find(cid);
+  if (cp == coll_map.end())
+    return {};
+  return cp->second;
+}
+
+seastar::future<> CyanStore::write_meta(const std::string& key,
+                                       const std::string& value)
 {
   std::string v = value;
   v += "\n";
@@ -608,23 +616,22 @@ void CyanStore::write_meta(const std::string& key,
       r < 0) {
     throw std::runtime_error{fmt::format("unable to write_meta({})", key)};
   }
+  return seastar::make_ready_future<>();
 }
 
-int CyanStore::read_meta(const std::string& key,
-                          std::string* value)
+seastar::future<int, std::string> CyanStore::read_meta(const std::string& key)
 {
-  char buf[4096];
-  int r = safe_read_file(path.c_str(), key.c_str(),
-                         buf, sizeof(buf));
-  if (r <= 0) {
-    return r;
-  }
-  // drop trailing newlines
-  while (r && isspace(buf[r-1])) {
-    --r;
+  std::string fsid(4096, '\0');
+  int r = safe_read_file(path.c_str(), key.c_str(), fsid.data(), fsid.size());
+  if (r > 0) {
+    fsid.resize(r);
+    // drop trailing newlines
+    boost::algorithm::trim_right_if(fsid,
+                                   [](unsigned char c) {return isspace(c);});
+  } else {
+    fsid.clear();
   }
-  *value = std::string{buf, static_cast<size_t>(r)};
-  return 0;
+  return seastar::make_ready_future<int, std::string>(r, fsid);
 }
 
 uuid_d CyanStore::get_fsid() const
index 0ab215fb00f1932f41bb06216b0f7c5e4c67c435..f1f2a1ec515319fcd5cdd1b6c009d793b64c04a0 100644 (file)
@@ -73,16 +73,16 @@ public:
     const std::optional<std::string> &start ///< [in] start, empty for begin
     ) final; ///< @return <done, values> values.empty() iff done
 
-  CollectionRef create_new_collection(const coll_t& cid) final;
-  CollectionRef open_collection(const coll_t& cid) final;
-  std::vector<coll_t> list_collections() final;
+  seastar::future<CollectionRef> create_new_collection(const coll_t& cid) final;
+  seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
+  seastar::future<std::vector<coll_t>> list_collections() final;
 
   seastar::future<> do_transaction(CollectionRef ch,
                                   Transaction&& txn) final;
 
-  void write_meta(const std::string& key,
+  seastar::future<> write_meta(const std::string& key,
                  const std::string& value) final;
-  int read_meta(const std::string& key, std::string* value) final;
+  seastar::future<int, std::string> read_meta(const std::string& key) final;
   uuid_d get_fsid() const final;
   unsigned get_max_attr_name_length() const final;
 
@@ -113,6 +113,7 @@ private:
   int _setattrs(const coll_t& cid, const ghobject_t& oid,
                 std::map<std::string,bufferptr>& aset);
   int _create_collection(const coll_t& cid, int bits);
+  CollectionRef _get_collection(const coll_t& cid);
 };
 
 }
index ebbe53a4ceffec4ee03de8434d45ec936373635f..a5e3654f1fcef592f1a1a0a90346b9ef308824f9 100644 (file)
@@ -94,16 +94,16 @@ public:
     const std::optional<std::string> &start ///< [in] start, empty for begin
     ) = 0; ///< @return <done, values> values.empty() iff done
 
-  virtual CollectionRef create_new_collection(const coll_t& cid) = 0;
-  virtual CollectionRef open_collection(const coll_t& cid) = 0;
-  virtual std::vector<coll_t> list_collections() = 0;
+  virtual seastar::future<CollectionRef> create_new_collection(const coll_t& cid) = 0;
+  virtual seastar::future<CollectionRef> open_collection(const coll_t& cid) = 0;
+  virtual seastar::future<std::vector<coll_t>> list_collections() = 0;
 
   virtual seastar::future<> do_transaction(CollectionRef ch,
                                   Transaction&& txn) = 0;
 
-  virtual void write_meta(const std::string& key,
-                 const std::string& value) = 0;
-  virtual int read_meta(const std::string& key, std::string* value) = 0;
+  virtual seastar::future<> write_meta(const std::string& key,
+                                      const std::string& value) = 0;
+  virtual seastar::future<int, std::string> read_meta(const std::string& key) = 0;
   virtual uuid_d get_fsid() const  = 0;
   virtual unsigned get_max_attr_name_length() const = 0;
 };
index e38bb9defd4a07aff2aaf1c7586adf6837682c03..312ab278f16789353cd5f17c177c793fd0ecf75e 100644 (file)
@@ -135,16 +135,18 @@ seastar::future<> OSD::mkfs(uuid_d osd_uuid, uuid_d cluster_fsid)
       __func__,
       cluster_fsid,
       superblock.osd_fsid);
-
-    meta_coll = make_unique<OSDMeta>(
-      store->create_new_collection(coll_t::meta()), store.get());
+    return store->create_new_collection(coll_t::meta());
+  }).then([this] (auto ch) {
+    meta_coll = make_unique<OSDMeta>(ch , store.get());
     ceph::os::Transaction t;
     meta_coll->create(t);
     meta_coll->store_superblock(t, superblock);
     return store->do_transaction(meta_coll->collection(), std::move(t));
   }).then([cluster_fsid, this] {
-    store->write_meta("ceph_fsid", cluster_fsid.to_string());
-    store->write_meta("whoami", std::to_string(whoami));
+    return when_all_succeed(
+      store->write_meta("ceph_fsid", cluster_fsid.to_string()),
+      store->write_meta("whoami", std::to_string(whoami)));
+  }).then([cluster_fsid, this] {
     fmt::print("created object store {} for osd.{} fsid {}\n",
                local_conf().get_val<std::string>("osd_data"),
                whoami, cluster_fsid);
@@ -200,8 +202,9 @@ seastar::future<> OSD::start()
   startup_time = ceph::mono_clock::now();
 
   return store->mount().then([this] {
-    meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()),
-                                     store.get());
+    return store->open_collection(coll_t::meta());
+  }).then([this](auto ch) {
+    meta_coll = make_unique<OSDMeta>(ch, store.get());
     return meta_coll->load_superblock();
   }).then([this](OSDSuperblock&& sb) {
     superblock = std::move(sb);
@@ -399,8 +402,8 @@ seastar::future<> OSD::stop()
 
 seastar::future<> OSD::load_pgs()
 {
-  return seastar::parallel_for_each(store->list_collections(),
-    [this](auto coll) {
+  return store->list_collections().then([this](auto colls) {
+    return seastar::parallel_for_each(colls, [this](auto coll) {
       spg_t pgid;
       if (coll.is_pg(&pgid)) {
         return load_pg(pgid).then([pgid, this](auto&& pg) {
@@ -416,6 +419,7 @@ seastar::future<> OSD::load_pgs()
         return seastar::now();
       }
     });
+  });
 }
 
 seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map, spg_t pgid)
@@ -440,13 +444,18 @@ seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map, spg_t pgid)
   })().then([pgid, this, create_map](pg_pool_t&& pool,
                        string&& name,
                        ec_profile_t&& ec_profile) {
-    return seastar::make_ready_future<Ref<PG>>(Ref<PG>{new PG{pgid,
+    return shard_services.get_store().open_collection(coll_t::meta()).then(
+      [this, pgid, create_map, pool=std::move(pool), name, ec_profile]
+      (auto coll_ref) mutable {
+      return seastar::make_ready_future<Ref<PG>>(new PG{pgid,
            pg_shard_t{whoami, pgid.shard},
+           coll_ref,
            std::move(pool),
            std::move(name),
            create_map,
            shard_services,
-           ec_profile}});
+           ec_profile});
+    });
   });
 }
 
@@ -681,6 +690,7 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
        [this, &info](auto pg, auto startmap) -> seastar::future<Ref<PG>> {
          if (!pg)
            return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
+        return store->create_new_collection(coll_t(info->pgid)).then([this, &info, startmap, pg] (auto coll) {
          PeeringCtx rctx;
          const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());
 
@@ -695,7 +705,6 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
          }
 
 
-         auto coll = store->create_new_collection(coll_t(info->pgid));
          create_pg_collection(
            rctx.transaction,
            info->pgid,
@@ -721,9 +730,10 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
            *this, pg, pg->get_osdmap_epoch(),
            osdmap->get_epoch(), std::move(rctx), true).second.then([pg] {
              return seastar::make_ready_future<Ref<PG>>(pg);
-           });
+         });
        });
     });
+  });
 }
 
 seastar::future<> OSD::handle_osd_map(ceph::net::Connection* conn,
index 196e8342d1b30d3a663799f00daa3a78bcbff3ba..fa4e1401037b3f1a805595ae11708a36f9dfe4b0 100644 (file)
@@ -69,6 +69,7 @@ public:
 PG::PG(
   spg_t pgid,
   pg_shard_t pg_shard,
+  ceph::os::CollectionRef coll_ref,
   pg_pool_t&& pool,
   std::string&& name,
   cached_map_t osdmap,
@@ -76,7 +77,7 @@ PG::PG(
   ec_profile_t profile)
   : pgid{pgid},
     pg_whoami{pg_shard},
-    coll_ref(shard_services.get_store().open_collection(coll)),
+    coll_ref{coll_ref},
     pgmeta_oid{pgid.make_pgmeta_oid()},
     osdmap_gate("PG::osdmap_gate", std::nullopt),
     shard_services{shard_services},
@@ -211,10 +212,11 @@ void PG::init(
 
 seastar::future<> PG::read_state(ceph::os::FuturizedStore* store)
 {
-  coll_ref = store->open_collection(coll_t(pgid));
-  return PGMeta{store, pgid}.load().then(
-    [this, store](pg_info_t pg_info, PastIntervals past_intervals) {
-      return peering_state.init_from_disk_state(
+  return store->open_collection(coll_t(pgid)).then([this, store](auto ch) {
+    coll_ref = ch;
+    return PGMeta{store, pgid}.load();
+  }).then([this, store](pg_info_t pg_info, PastIntervals past_intervals) {
+    return peering_state.init_from_disk_state(
        std::move(pg_info),
        std::move(past_intervals),
        [this, store, &pg_info] (PGLog &pglog) {
@@ -223,25 +225,25 @@ seastar::future<> PG::read_state(ceph::os::FuturizedStore* store)
            coll_ref,
            peering_state.get_info(),
            pgmeta_oid);
-       });
-    }).then([this, store]() {
-      int primary, up_primary;
-      vector<int> acting, up;
-      peering_state.get_osdmap()->pg_to_up_acting_osds(
+      });
+  }).then([this, store]() {
+    int primary, up_primary;
+    vector<int> acting, up;
+    peering_state.get_osdmap()->pg_to_up_acting_osds(
        pgid.pgid, &up, &up_primary, &acting, &primary);
-      peering_state.init_primary_up_acting(
+    peering_state.init_primary_up_acting(
        up,
        acting,
        up_primary,
        primary);
-      int rr = OSDMap::calc_pg_role(pg_whoami.osd, acting);
-      if (peering_state.get_pool().info.is_replicated() || rr == pg_whoami.shard)
+    int rr = OSDMap::calc_pg_role(pg_whoami.osd, acting);
+    if (peering_state.get_pool().info.is_replicated() || rr == pg_whoami.shard)
        peering_state.set_role(rr);
-      else
+    else
        peering_state.set_role(-1);
 
-      epoch_t epoch = get_osdmap_epoch();
-      shard_services.start_operation<LocalPeeringEvent>(
+    epoch_t epoch = get_osdmap_epoch();
+    shard_services.start_operation<LocalPeeringEvent>(
        this,
        shard_services,
        pg_whoami,
@@ -250,8 +252,8 @@ seastar::future<> PG::read_state(ceph::os::FuturizedStore* store)
        epoch,
        PeeringState::Initialize());
 
-      return seastar::now();
-    });
+    return seastar::now();
+  });
 }
 
 void PG::do_peering_event(
index 542cb7912a3237a6731d146a85d08f0703294021..31d8d315fdf33d59b4111ac8b8ab06c1fe57f283 100644 (file)
@@ -65,6 +65,7 @@ class PG : public boost::intrusive_ref_counter<
 public:
   PG(spg_t pgid,
      pg_shard_t pg_shard,
+     ceph::os::CollectionRef coll_ref,
      pg_pool_t&& pool,
      std::string&& name,
      cached_map_t osdmap,
index 4cf2051ec34a314e4f92859d015f90204804e77e..5510972eb6178cc7715401ce92a317a591faeee8 100644 (file)
@@ -31,8 +31,8 @@ namespace {
 }
 seastar::future<epoch_t> PGMeta::get_epoch()
 {
-  auto ch = store->open_collection(coll_t{pgid});
-  return store->omap_get_values(ch,
+  return store->open_collection(coll_t{pgid}).then([this](auto ch) {
+    return store->omap_get_values(ch,
                                 pgid.make_pgmeta_oid(),
                                 {string{infover_key},
                                  string{epoch_key}}).then(
@@ -51,12 +51,13 @@ seastar::future<epoch_t> PGMeta::get_epoch()
         return seastar::make_ready_future<epoch_t>(*epoch);
       }
     });
+  });
 }
 
 seastar::future<pg_info_t, PastIntervals> PGMeta::load()
 {
-  auto ch = store->open_collection(coll_t{pgid});
-  return store->omap_get_values(ch,
+  return store->open_collection(coll_t{pgid}).then([this](auto ch) {
+    return store->omap_get_values(ch,
                                 pgid.make_pgmeta_oid(),
                                 {string{infover_key},
                                  string{info_key},
@@ -95,4 +96,5 @@ seastar::future<pg_info_t, PastIntervals> PGMeta::load()
         std::move(info),
         std::move(past_intervals));
     });
+  });
 }