#include "cyan_store.h"
+#include <boost/algorithm/string/trim.hpp>
#include <fmt/format.h>
#include <fmt/ostream.h>
seastar::future<> CyanStore::mkfs(uuid_d new_osd_fsid)
{
- std::string fsid_str;
- int r = read_meta("fsid", &fsid_str);
- if (r == -ENOENT) {
- if (new_osd_fsid.is_zero()) {
- osd_fsid.generate_random();
+ return read_meta("fsid").then([=](auto r, auto fsid_str) {
+ if (r == -ENOENT) {
+ if (new_osd_fsid.is_zero()) {
+ osd_fsid.generate_random();
+ } else {
+ osd_fsid = new_osd_fsid;
+ }
+ return write_meta("fsid", fmt::format("{}", osd_fsid));
+ } else if (r < 0) {
+ throw std::runtime_error("read_meta");
} else {
- osd_fsid = new_osd_fsid;
- }
- write_meta("fsid", fmt::format("{}", osd_fsid));
- } else if (r < 0) {
- throw std::runtime_error("read_meta");
- } else {
- logger().info("{} already has fsid {}", __func__, fsid_str);
- if (!osd_fsid.parse(fsid_str.c_str())) {
- throw std::runtime_error("failed to parse fsid");
- } else if (osd_fsid != new_osd_fsid) {
- logger().error("on-disk fsid {} != provided {}", osd_fsid, new_osd_fsid);
- throw std::runtime_error("unmatched osd_fsid");
+ logger().info("{} already has fsid {}", __func__, fsid_str);
+ if (!osd_fsid.parse(fsid_str.c_str())) {
+ throw std::runtime_error("failed to parse fsid");
+ } else if (osd_fsid != new_osd_fsid) {
+ logger().error("on-disk fsid {} != provided {}", osd_fsid, new_osd_fsid);
+ throw std::runtime_error("unmatched osd_fsid");
+ } else {
+ return seastar::now();
+ }
}
- }
-
- std::string fn = path + "/collections";
- ceph::bufferlist bl;
- std::set<coll_t> collections;
- ceph::encode(collections, bl);
- return ceph::buffer::write_file(std::move(bl), fn).then([this] {
- write_meta("type", "memstore");
- return seastar::now();
+ }).then([this]{
+ std::string fn = path + "/collections";
+ ceph::bufferlist bl;
+ std::set<coll_t> collections;
+ ceph::encode(collections, bl);
+ return ceph::buffer::write_file(std::move(bl), fn);
+ }).then([this] {
+ return write_meta("type", "memstore");
});
}
std::move(objects), next);
}
-CollectionRef CyanStore::create_new_collection(const coll_t& cid)
+seastar::future<CollectionRef> CyanStore::create_new_collection(const coll_t& cid)
{
auto c = new Collection{cid};
- return new_coll_map[cid] = c;
+ new_coll_map[cid] = c;
+ return seastar::make_ready_future<CollectionRef>(c);
}
-CollectionRef CyanStore::open_collection(const coll_t& cid)
+seastar::future<CollectionRef> CyanStore::open_collection(const coll_t& cid)
{
- auto cp = coll_map.find(cid);
- if (cp == coll_map.end())
- return {};
- return cp->second;
+ return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
}
-std::vector<coll_t> CyanStore::list_collections()
+seastar::future<std::vector<coll_t>> CyanStore::list_collections()
{
std::vector<coll_t> collections;
for (auto& coll : coll_map) {
collections.push_back(coll.first);
}
- return collections;
+ return seastar::make_ready_future<std::vector<coll_t>>(std::move(collections));
}
seastar::future<ceph::bufferlist> CyanStore::read(CollectionRef c,
{
logger().debug("{} cid={} oid={}",
__func__, cid, oid);
- auto c = open_collection(cid);
+ auto c = _get_collection(cid);
if (!c)
return -ENOENT;
{
logger().debug("{} cid={} oid={}",
__func__, cid, oid);
- auto c = open_collection(cid);
+ auto c = _get_collection(cid);
if (!c)
return -ENOENT;
__func__, cid, oid, offset, len);
assert(len == bl.length());
- auto c = open_collection(cid);
+ auto c = _get_collection(cid);
if (!c)
return -ENOENT;
"{} {} {} {} keys",
__func__, cid, oid, aset.size());
- auto c = open_collection(cid);
+ auto c = _get_collection(cid);
if (!c)
return -ENOENT;
"{} {} {} {} bytes",
__func__, cid, oid, header.length());
- auto c = open_collection(cid);
+ auto c = _get_collection(cid);
if (!c)
return -ENOENT;
"{} {} {} {} keys",
__func__, cid, oid, aset.size());
- auto c = open_collection(cid);
+ auto c = _get_collection(cid);
if (!c)
return -ENOENT;
"{} {} {} first={} last={}",
__func__, cid, oid, first, last);
- auto c = open_collection(cid);
+ auto c = _get_collection(cid);
if (!c)
return -ENOENT;
{
logger().debug("{} cid={} oid={} size={}",
__func__, cid, oid, size);
- auto c = open_collection(cid);
+ auto c = _get_collection(cid);
if (!c)
return -ENOENT;
{
logger().debug("{} cid={} oid={}",
__func__, cid, oid);
- auto c = open_collection(cid);
+ auto c = _get_collection(cid);
if (!c)
return -ENOENT;
return 0;
}
-void CyanStore::write_meta(const std::string& key,
- const std::string& value)
+CollectionRef CyanStore::_get_collection(const coll_t& cid)
+{
+ auto cp = coll_map.find(cid);
+ if (cp == coll_map.end())
+ return {};
+ return cp->second;
+}
+
+seastar::future<> CyanStore::write_meta(const std::string& key,
+ const std::string& value)
{
std::string v = value;
v += "\n";
r < 0) {
throw std::runtime_error{fmt::format("unable to write_meta({})", key)};
}
+ return seastar::make_ready_future<>();
}
-int CyanStore::read_meta(const std::string& key,
- std::string* value)
+seastar::future<int, std::string> CyanStore::read_meta(const std::string& key)
{
- char buf[4096];
- int r = safe_read_file(path.c_str(), key.c_str(),
- buf, sizeof(buf));
- if (r <= 0) {
- return r;
- }
- // drop trailing newlines
- while (r && isspace(buf[r-1])) {
- --r;
+ std::string fsid(4096, '\0');
+ int r = safe_read_file(path.c_str(), key.c_str(), fsid.data(), fsid.size());
+ if (r > 0) {
+ fsid.resize(r);
+ // drop trailing newlines
+ boost::algorithm::trim_right_if(fsid,
+ [](unsigned char c) {return isspace(c);});
+ } else {
+ fsid.clear();
}
- *value = std::string{buf, static_cast<size_t>(r)};
- return 0;
+ return seastar::make_ready_future<int, std::string>(r, fsid);
}
uuid_d CyanStore::get_fsid() const
const std::optional<std::string> &start ///< [in] start, empty for begin
) final; ///< @return <done, values> values.empty() iff done
- CollectionRef create_new_collection(const coll_t& cid) final;
- CollectionRef open_collection(const coll_t& cid) final;
- std::vector<coll_t> list_collections() final;
+ seastar::future<CollectionRef> create_new_collection(const coll_t& cid) final;
+ seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
+ seastar::future<std::vector<coll_t>> list_collections() final;
seastar::future<> do_transaction(CollectionRef ch,
Transaction&& txn) final;
- void write_meta(const std::string& key,
+ seastar::future<> write_meta(const std::string& key,
const std::string& value) final;
- int read_meta(const std::string& key, std::string* value) final;
+ seastar::future<int, std::string> read_meta(const std::string& key) final;
uuid_d get_fsid() const final;
unsigned get_max_attr_name_length() const final;
int _setattrs(const coll_t& cid, const ghobject_t& oid,
std::map<std::string,bufferptr>& aset);
int _create_collection(const coll_t& cid, int bits);
+ CollectionRef _get_collection(const coll_t& cid);
};
}
const std::optional<std::string> &start ///< [in] start, empty for begin
) = 0; ///< @return <done, values> values.empty() iff done
- virtual CollectionRef create_new_collection(const coll_t& cid) = 0;
- virtual CollectionRef open_collection(const coll_t& cid) = 0;
- virtual std::vector<coll_t> list_collections() = 0;
+ virtual seastar::future<CollectionRef> create_new_collection(const coll_t& cid) = 0;
+ virtual seastar::future<CollectionRef> open_collection(const coll_t& cid) = 0;
+ virtual seastar::future<std::vector<coll_t>> list_collections() = 0;
virtual seastar::future<> do_transaction(CollectionRef ch,
Transaction&& txn) = 0;
- virtual void write_meta(const std::string& key,
- const std::string& value) = 0;
- virtual int read_meta(const std::string& key, std::string* value) = 0;
+ virtual seastar::future<> write_meta(const std::string& key,
+ const std::string& value) = 0;
+ virtual seastar::future<int, std::string> read_meta(const std::string& key) = 0;
virtual uuid_d get_fsid() const = 0;
virtual unsigned get_max_attr_name_length() const = 0;
};
__func__,
cluster_fsid,
superblock.osd_fsid);
-
- meta_coll = make_unique<OSDMeta>(
- store->create_new_collection(coll_t::meta()), store.get());
+ return store->create_new_collection(coll_t::meta());
+ }).then([this] (auto ch) {
+ meta_coll = make_unique<OSDMeta>(ch , store.get());
ceph::os::Transaction t;
meta_coll->create(t);
meta_coll->store_superblock(t, superblock);
return store->do_transaction(meta_coll->collection(), std::move(t));
}).then([cluster_fsid, this] {
- store->write_meta("ceph_fsid", cluster_fsid.to_string());
- store->write_meta("whoami", std::to_string(whoami));
+ return when_all_succeed(
+ store->write_meta("ceph_fsid", cluster_fsid.to_string()),
+ store->write_meta("whoami", std::to_string(whoami)));
+ }).then([cluster_fsid, this] {
fmt::print("created object store {} for osd.{} fsid {}\n",
local_conf().get_val<std::string>("osd_data"),
whoami, cluster_fsid);
startup_time = ceph::mono_clock::now();
return store->mount().then([this] {
- meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()),
- store.get());
+ return store->open_collection(coll_t::meta());
+ }).then([this](auto ch) {
+ meta_coll = make_unique<OSDMeta>(ch, store.get());
return meta_coll->load_superblock();
}).then([this](OSDSuperblock&& sb) {
superblock = std::move(sb);
seastar::future<> OSD::load_pgs()
{
- return seastar::parallel_for_each(store->list_collections(),
- [this](auto coll) {
+ return store->list_collections().then([this](auto colls) {
+ return seastar::parallel_for_each(colls, [this](auto coll) {
spg_t pgid;
if (coll.is_pg(&pgid)) {
return load_pg(pgid).then([pgid, this](auto&& pg) {
return seastar::now();
}
});
+ });
}
seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map, spg_t pgid)
})().then([pgid, this, create_map](pg_pool_t&& pool,
string&& name,
ec_profile_t&& ec_profile) {
- return seastar::make_ready_future<Ref<PG>>(Ref<PG>{new PG{pgid,
+ return shard_services.get_store().open_collection(coll_t::meta()).then(
+ [this, pgid, create_map, pool=std::move(pool), name, ec_profile]
+ (auto coll_ref) mutable {
+ return seastar::make_ready_future<Ref<PG>>(new PG{pgid,
pg_shard_t{whoami, pgid.shard},
+ coll_ref,
std::move(pool),
std::move(name),
create_map,
shard_services,
- ec_profile}});
+ ec_profile});
+ });
});
}
[this, &info](auto pg, auto startmap) -> seastar::future<Ref<PG>> {
if (!pg)
return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
+ return store->create_new_collection(coll_t(info->pgid)).then([this, &info, startmap, pg] (auto coll) {
PeeringCtx rctx;
const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());
}
- auto coll = store->create_new_collection(coll_t(info->pgid));
create_pg_collection(
rctx.transaction,
info->pgid,
*this, pg, pg->get_osdmap_epoch(),
osdmap->get_epoch(), std::move(rctx), true).second.then([pg] {
return seastar::make_ready_future<Ref<PG>>(pg);
- });
+ });
});
});
+ });
}
seastar::future<> OSD::handle_osd_map(ceph::net::Connection* conn,
PG::PG(
spg_t pgid,
pg_shard_t pg_shard,
+ ceph::os::CollectionRef coll_ref,
pg_pool_t&& pool,
std::string&& name,
cached_map_t osdmap,
ec_profile_t profile)
: pgid{pgid},
pg_whoami{pg_shard},
- coll_ref(shard_services.get_store().open_collection(coll)),
+ coll_ref{coll_ref},
pgmeta_oid{pgid.make_pgmeta_oid()},
osdmap_gate("PG::osdmap_gate", std::nullopt),
shard_services{shard_services},
seastar::future<> PG::read_state(ceph::os::FuturizedStore* store)
{
- coll_ref = store->open_collection(coll_t(pgid));
- return PGMeta{store, pgid}.load().then(
- [this, store](pg_info_t pg_info, PastIntervals past_intervals) {
- return peering_state.init_from_disk_state(
+ return store->open_collection(coll_t(pgid)).then([this, store](auto ch) {
+ coll_ref = ch;
+ return PGMeta{store, pgid}.load();
+ }).then([this, store](pg_info_t pg_info, PastIntervals past_intervals) {
+ return peering_state.init_from_disk_state(
std::move(pg_info),
std::move(past_intervals),
[this, store, &pg_info] (PGLog &pglog) {
coll_ref,
peering_state.get_info(),
pgmeta_oid);
- });
- }).then([this, store]() {
- int primary, up_primary;
- vector<int> acting, up;
- peering_state.get_osdmap()->pg_to_up_acting_osds(
+ });
+ }).then([this, store]() {
+ int primary, up_primary;
+ vector<int> acting, up;
+ peering_state.get_osdmap()->pg_to_up_acting_osds(
pgid.pgid, &up, &up_primary, &acting, &primary);
- peering_state.init_primary_up_acting(
+ peering_state.init_primary_up_acting(
up,
acting,
up_primary,
primary);
- int rr = OSDMap::calc_pg_role(pg_whoami.osd, acting);
- if (peering_state.get_pool().info.is_replicated() || rr == pg_whoami.shard)
+ int rr = OSDMap::calc_pg_role(pg_whoami.osd, acting);
+ if (peering_state.get_pool().info.is_replicated() || rr == pg_whoami.shard)
peering_state.set_role(rr);
- else
+ else
peering_state.set_role(-1);
- epoch_t epoch = get_osdmap_epoch();
- shard_services.start_operation<LocalPeeringEvent>(
+ epoch_t epoch = get_osdmap_epoch();
+ shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
epoch,
PeeringState::Initialize());
- return seastar::now();
- });
+ return seastar::now();
+ });
}
void PG::do_peering_event(
public:
PG(spg_t pgid,
pg_shard_t pg_shard,
+ ceph::os::CollectionRef coll_ref,
pg_pool_t&& pool,
std::string&& name,
cached_map_t osdmap,
}
seastar::future<epoch_t> PGMeta::get_epoch()
{
- auto ch = store->open_collection(coll_t{pgid});
- return store->omap_get_values(ch,
+ return store->open_collection(coll_t{pgid}).then([this](auto ch) {
+ return store->omap_get_values(ch,
pgid.make_pgmeta_oid(),
{string{infover_key},
string{epoch_key}}).then(
return seastar::make_ready_future<epoch_t>(*epoch);
}
});
+ });
}
seastar::future<pg_info_t, PastIntervals> PGMeta::load()
{
- auto ch = store->open_collection(coll_t{pgid});
- return store->omap_get_values(ch,
+ return store->open_collection(coll_t{pgid}).then([this](auto ch) {
+ return store->omap_get_values(ch,
pgid.make_pgmeta_oid(),
{string{infover_key},
string{info_key},
std::move(info),
std::move(past_intervals));
});
+ });
}