seastar::future<> CyanStore::mount()
{
- bufferlist bl;
- string fn = path + "/collections";
- string err;
+ ceph::bufferlist bl;
+ std::string fn = path + "/collections";
+ std::string err;
if (int r = bl.read_file(fn.c_str(), &err); r < 0) {
throw std::runtime_error("read_file");
}
ceph::decode(collections, p);
for (auto& coll : collections) {
- string fn = fmt::format("{}/{}", path, coll);
- bufferlist cbl;
+ std::string fn = fmt::format("{}/{}", path, coll);
+ ceph::bufferlist cbl;
if (int r = cbl.read_file(fn.c_str(), &err); r < 0) {
throw std::runtime_error("read_file");
}
seastar::future<> CyanStore::umount()
{
- set<coll_t> collections;
+ std::set<coll_t> collections;
for (auto& [col, ch] : coll_map) {
collections.insert(col);
- bufferlist bl;
+ ceph::bufferlist bl;
ceph_assert(ch);
ch->encode(bl);
- string fn = fmt::format("{}/{}", path, col);
+ std::string fn = fmt::format("{}/{}", path, col);
if (int r = bl.write_file(fn.c_str()); r < 0) {
throw std::runtime_error("write_file");
}
}
- string fn = path + "/collections";
- bufferlist bl;
+ std::string fn = path + "/collections";
+ ceph::bufferlist bl;
ceph::encode(collections, bl);
if (int r = bl.write_file(fn.c_str()); r < 0) {
throw std::runtime_error("write_file");
seastar::future<> CyanStore::mkfs()
{
- string fsid_str;
+ std::string fsid_str;
int r = read_meta("fsid", &fsid_str);
if (r == -ENOENT) {
osd_fsid.generate_random();
}
}
- string fn = path + "/collections";
- bufferlist bl;
- set<coll_t> collections;
+ std::string fn = path + "/collections";
+ ceph::bufferlist bl;
+ std::set<coll_t> collections;
ceph::encode(collections, bl);
r = bl.write_file(fn.c_str());
if (r < 0)
return collections;
}
-seastar::future<bufferlist> CyanStore::read(CollectionRef c,
+seastar::future<ceph::bufferlist> CyanStore::read(CollectionRef c,
const ghobject_t& oid,
uint64_t offset,
size_t len,
throw std::runtime_error(fmt::format("object does not exist: {}", oid));
}
if (offset >= o->get_size())
- return seastar::make_ready_future<bufferlist>();
+ return seastar::make_ready_future<ceph::bufferlist>();
size_t l = len;
if (l == 0 && offset == 0) // note: len == 0 means read the entire object
l = o->get_size();
else if (offset + l > o->get_size())
l = o->get_size() - offset;
- bufferlist bl;
+ ceph::bufferlist bl;
if (int r = o->read(offset, l, bl); r < 0) {
throw std::runtime_error("read");
}
- return seastar::make_ready_future<bufferlist>(std::move(bl));
+ return seastar::make_ready_future<ceph::bufferlist>(std::move(bl));
}
seastar::future<ceph::bufferptr> CyanStore::get_attr(CollectionRef c,
return seastar::make_ready_future<omap_values_t>(std::move(values));
}
+seastar::future<bool, CyanStore::omap_values_t>
+CyanStore::omap_get_values(
+ CollectionRef c,
+ const ghobject_t &oid,
+ const std::optional<string> &start
+ ) {
+ logger().debug(
+ "{} {} {}",
+ __func__, c->cid, oid);
+ auto o = c->get_object(oid);
+ if (!o) {
+ throw std::runtime_error(fmt::format("object does not exist: {}", oid));
+ }
+ omap_values_t values;
+ for (auto i = start ? o->omap.upper_bound(*start) : o->omap.begin();
+ values.size() < MAX_KEYS_PER_OMAP_GET_CALL && i != o->omap.end();
+ ++i) {
+ values.insert(*i);
+ }
+ return seastar::make_ready_future<bool, omap_values_t>(
+ true, values);
+}
+
seastar::future<> CyanStore::do_transaction(CollectionRef ch,
Transaction&& t)
{
- auto i = t.begin();
- while (i.have_op()) {
- Transaction::Op* op = i.decode_op();
- int r = 0;
- switch (op->op) {
- case Transaction::OP_NOP:
- break;
- case Transaction::OP_REMOVE:
+ int r = 0;
+ try {
+ auto i = t.begin();
+ while (i.have_op()) {
+ Transaction::Op* op = i.decode_op();
+ r = 0;
+ switch (op->op) {
+ case Transaction::OP_NOP:
+ break;
+ case Transaction::OP_REMOVE:
{
- coll_t cid = i.get_cid(op->cid);
- ghobject_t oid = i.get_oid(op->oid);
- r = _remove(cid, oid);
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ r = _remove(cid, oid);
}
- case Transaction::OP_TOUCH:
+ break;
+ case Transaction::OP_TOUCH:
{
coll_t cid = i.get_cid(op->cid);
ghobject_t oid = i.get_oid(op->oid);
r = _touch(cid, oid);
}
break;
- case Transaction::OP_WRITE:
+ case Transaction::OP_WRITE:
{
coll_t cid = i.get_cid(op->cid);
ghobject_t oid = i.get_oid(op->oid);
uint64_t off = op->off;
uint64_t len = op->len;
uint32_t fadvise_flags = i.get_fadvise_flags();
- bufferlist bl;
+ ceph::bufferlist bl;
i.decode_bl(bl);
r = _write(cid, oid, off, len, bl, fadvise_flags);
}
break;
- case Transaction::OP_TRUNCATE:
+ case Transaction::OP_TRUNCATE:
{
coll_t cid = i.get_cid(op->cid);
ghobject_t oid = i.get_oid(op->oid);
r = _truncate(cid, oid, off);
}
break;
- case Transaction::OP_SETATTR:
+ case Transaction::OP_SETATTR:
{
coll_t cid = i.get_cid(op->cid);
ghobject_t oid = i.get_oid(op->oid);
- string name = i.decode_string();
- bufferlist bl;
+ std::string name = i.decode_string();
+ ceph::bufferlist bl;
i.decode_bl(bl);
- map<string, bufferptr> to_set;
+ std::map<std::string, bufferptr> to_set;
to_set[name] = bufferptr(bl.c_str(), bl.length());
r = _setattrs(cid, oid, to_set);
}
break;
- case Transaction::OP_MKCOLL:
+ case Transaction::OP_MKCOLL:
{
coll_t cid = i.get_cid(op->cid);
r = _create_collection(cid, op->split_bits);
}
break;
- default:
- logger().error("bad op {}", static_cast<unsigned>(op->op));
- abort();
- }
- if (r < 0) {
- abort();
+ case Transaction::OP_OMAP_SETKEYS:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ std::map<std::string, ceph::bufferlist> aset;
+ i.decode_attrset(aset);
+ r = _omap_set_values(cid, oid, std::move(aset));
+ }
+ break;
+ case Transaction::OP_OMAP_SETHEADER:
+ {
+ const coll_t &cid = i.get_cid(op->cid);
+ const ghobject_t &oid = i.get_oid(op->oid);
+ ceph::bufferlist bl;
+ i.decode_bl(bl);
+ r = _omap_set_header(cid, oid, bl);
+ }
+ break;
+ case Transaction::OP_OMAP_RMKEYS:
+ {
+ const coll_t &cid = i.get_cid(op->cid);
+ const ghobject_t &oid = i.get_oid(op->oid);
+ std::set<std::string> keys;
+ i.decode_keyset(keys);
+ r = _omap_rmkeys(cid, oid, keys);
+ }
+ break;
+ case Transaction::OP_OMAP_RMKEYRANGE:
+ {
+ const coll_t &cid = i.get_cid(op->cid);
+ const ghobject_t &oid = i.get_oid(op->oid);
+ string first, last;
+ first = i.decode_string();
+ last = i.decode_string();
+ r = _omap_rmkeyrange(cid, oid, first, last);
+ }
+ break;
+ case Transaction::OP_COLL_HINT:
+ {
+ ceph::bufferlist hint;
+ i.decode_bl(hint);
+ // ignored
+ break;
+ }
+ default:
+ logger().error("bad op {}", static_cast<unsigned>(op->op));
+ abort();
+ }
+ if (r < 0) {
+ break;
+ }
}
+ } catch (std::exception &e) {
+ logger().error("{} got exception {}", __func__, e);
+ r = -EINVAL;
+ }
+ if (r < 0) {
+ logger().error(" transaction dump:\n");
+ JSONFormatter f(true);
+ f.open_object_section("transaction");
+ t.dump(&f);
+ f.close_section();
+ std::stringstream str;
+ f.flush(str);
+ logger().error("{}", str.str());
+ ceph_assert(r == 0);
}
return seastar::now();
}
}
int CyanStore::_write(const coll_t& cid, const ghobject_t& oid,
- uint64_t offset, size_t len, const bufferlist& bl,
+ uint64_t offset, size_t len, const ceph::bufferlist& bl,
uint32_t fadvise_flags)
{
logger().debug("{} {} {} {} ~ {}",
return 0;
}
+int CyanStore::_omap_set_values(
+ const coll_t& cid,
+ const ghobject_t& oid,
+ std::map<std::string, ceph::bufferlist> &&aset)
+{
+ logger().debug(
+ "{} {} {} {} keys",
+ __func__, cid, oid, aset.size());
+
+ auto c = open_collection(cid);
+ if (!c)
+ return -ENOENT;
+
+ ObjectRef o = c->get_or_create_object(oid);
+ for (auto &&i: aset) {
+ o->omap.insert(std::move(i));
+ }
+ return 0;
+}
+
+int CyanStore::_omap_set_header(
+ const coll_t& cid,
+ const ghobject_t& oid,
+ const ceph::bufferlist &header)
+{
+ logger().debug(
+ "{} {} {} {} bytes",
+ __func__, cid, oid, header.length());
+
+ auto c = open_collection(cid);
+ if (!c)
+ return -ENOENT;
+
+ ObjectRef o = c->get_or_create_object(oid);
+ o->omap_header = header;
+ return 0;
+}
+
+int CyanStore::_omap_rmkeys(
+ const coll_t& cid,
+ const ghobject_t& oid,
+ const std::set<std::string> &aset)
+{
+ logger().debug(
+ "{} {} {} {} keys",
+ __func__, cid, oid, aset.size());
+
+ auto c = open_collection(cid);
+ if (!c)
+ return -ENOENT;
+
+ ObjectRef o = c->get_or_create_object(oid);
+ for (auto &i: aset) {
+ o->omap.erase(i);
+ }
+ return 0;
+}
+
+int CyanStore::_omap_rmkeyrange(
+ const coll_t& cid,
+ const ghobject_t& oid,
+ const std::string &first,
+ const std::string &last)
+{
+ logger().debug(
+ "{} {} {} first={} last={}",
+ __func__, cid, oid, first, last);
+
+ auto c = open_collection(cid);
+ if (!c)
+ return -ENOENT;
+
+ ObjectRef o = c->get_or_create_object(oid);
+ for (auto i = o->omap.lower_bound(first);
+ i != o->omap.end() && i->first <= last;
+ o->omap.erase(i++));
+ return 0;
+}
+
int CyanStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
{
logger().debug("{} cid={} oid={} size={}",
}
int CyanStore::_setattrs(const coll_t& cid, const ghobject_t& oid,
- map<string,bufferptr>& aset)
+ std::map<std::string,bufferptr>& aset)
{
logger().debug("{} cid={} oid={}",
__func__, cid, oid);
ObjectRef o = c->get_object(oid);
if (!o)
return -ENOENT;
- for (map<string,bufferptr>::const_iterator p = aset.begin(); p != aset.end(); ++p)
+ for (std::map<std::string, bufferptr>::const_iterator p = aset.begin();
+ p != aset.end(); ++p)
o->xattr[p->first] = p->second;
return 0;
}
while (r && isspace(buf[r-1])) {
--r;
}
- *value = string{buf, static_cast<size_t>(r)};
+ *value = std::string{buf, static_cast<size_t>(r)};
return 0;
}
#include <typeinfo>
#include <vector>
+#include <optional>
#include <seastar/core/future.hh>
#include "crimson/os/cyan_collection.h"
// a just-enough store for reading/writing the superblock
class CyanStore {
+ constexpr static unsigned MAX_KEYS_PER_OMAP_GET_CALL = 32;
+
const std::string path;
std::unordered_map<coll_t, CollectionRef> coll_map;
std::map<coll_t,CollectionRef> new_coll_map;
seastar::future<> umount();
seastar::future<> mkfs();
- seastar::future<bufferlist> read(CollectionRef c,
+ seastar::future<ceph::bufferlist> read(CollectionRef c,
const ghobject_t& oid,
uint64_t offset,
size_t len,
std::string_view name);
using attrs_t = std::map<std::string, ceph::bufferptr, std::less<>>;
seastar::future<attrs_t> get_attrs(CollectionRef c, const ghobject_t& oid);
- using omap_values_t = std::map<std::string,bufferlist, std::less<>>;
+
+ using omap_values_t = std::map<std::string,ceph::bufferlist, std::less<>>;
seastar::future<omap_values_t> omap_get_values(
CollectionRef c,
const ghobject_t& oid,
std::vector<std::string>&& keys);
+
seastar::future<std::vector<ghobject_t>, ghobject_t> list_objects(
CollectionRef c,
const ghobject_t& start,
const ghobject_t& end,
uint64_t limit);
+
+ /// Retrieves paged set of values > start (if present)
+ seastar::future<bool, omap_values_t> omap_get_values(
+ CollectionRef c, ///< [in] collection
+ const ghobject_t &oid, ///< [in] oid
+ const std::optional<std::string> &start ///< [in] start, empty for begin
+ ); ///< @return <done, values> values.empty() iff done
+
CollectionRef create_new_collection(const coll_t& cid);
CollectionRef open_collection(const coll_t& cid);
std::vector<coll_t> list_collections();
int _remove(const coll_t& cid, const ghobject_t& oid);
int _touch(const coll_t& cid, const ghobject_t& oid);
int _write(const coll_t& cid, const ghobject_t& oid,
- uint64_t offset, size_t len, const bufferlist& bl,
+ uint64_t offset, size_t len, const ceph::bufferlist& bl,
uint32_t fadvise_flags);
+ int _omap_set_values(
+ const coll_t& cid,
+ const ghobject_t& oid,
+ std::map<std::string, ceph::bufferlist> &&aset);
+ int _omap_set_header(
+ const coll_t& cid,
+ const ghobject_t& oid,
+ const ceph::bufferlist &header);
+ int _omap_rmkeys(
+ const coll_t& cid,
+ const ghobject_t& oid,
+ const std::set<std::string> &aset);
+ int _omap_rmkeyrange(
+ const coll_t& cid,
+ const ghobject_t& oid,
+ const std::string &first,
+ const std::string &last);
int _truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size);
int _setattrs(const coll_t& cid, const ghobject_t& oid,
- map<string,bufferptr>& aset);
+ std::map<std::string,bufferptr>& aset);
int _create_collection(const coll_t& cid, int bits);
};