Support for `attr_cache` in the replicated backend is planned for later.
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
return ll_read_ierrorator::now();
}
+PGBackend::get_attr_ierrorator::future<ceph::bufferlist>
+ECBackend::getxattr(
+  const hobject_t& soid,
+  std::string&& key) const
+{
+  // Deliberately unimplemented store path: ECBackend serves xattrs solely
+  // from ObjectContext::attr_cache (see the is_erasure() branch in the
+  // PGBackend getxattr caller).  Reaching this overload means the attr was
+  // not in the cache, so report ENODATA ("no such attribute") rather than
+  // hitting the objstore.
+  return crimson::ct_error::enodata::make();
+}
+
} // namespace crimson::osd
ll_read_ierrorator::future<> handle_rep_read_reply(ECSubReadReply& mop);
ll_read_ierrorator::future<> handle_rep_read_reply(Ref<MOSDECSubOpReadReply>);
+ PGBackend::get_attr_ierrorator::future<ceph::bufferlist> getxattr(
+ const hobject_t& soid,
+ std::string&& key) const final;
+
private:
friend class ECRecoveryBackend;
ECCommon::ReadPipeline read_pipeline;
ECCommon::RMWPipeline rmw_pipeline;
+
+ bool is_erasure() const override { return true; }
};
}
std::map<watch_key_t, seastar::shared_ptr<crimson::osd::Watch>> watchers;
// attr cache. ECTransaction is the initial user
- std::map<std::string, ceph::buffer::list, std::less<>> attr_cache;
+ using attr_cache_t = std::map<std::string, ceph::buffer::list, std::less<>>;
+ attr_cache_t attr_cache;
CommonOBCPipeline obc_pipeline;
return std::forward<Func>(f)(pg->get_backend(), std::as_const(obc->obs));
}
+// Variant of do_read_op() that additionally hands the callable a read-only
+// view of the ObjectContext's attr_cache next to the (const) object state.
+// Counts as a single read for op accounting.
+template <class Func>
+auto OpsExecuter::do_read_attr_cache(Func&& f) {
+  ++num_read;
+  // TODO: pass backend as read-only
+  return std::forward<Func>(f)(pg->get_backend(),
+                               std::as_const(obc->attr_cache),
+                               std::as_const(obc->obs));
+}
+
// Defined here because there is a circular dependency between OpsExecuter and PG
template <class Func>
auto OpsExecuter::do_write_op(Func&& f, OpsExecuter::modified_by m) {
check_init_op_params(m);
return std::forward<Func>(f)(pg->get_backend(), obc->obs, txn);
}
+// Like do_write_op(), but additionally exposes the ObjectContext's mutable
+// attr_cache so xattr-mutating ops (setxattr/rmxattr) can keep the cache in
+// sync with the transaction they build.
+template <class Func>
+auto OpsExecuter::do_write_op_attr_cache(Func&& f, OpsExecuter::modified_by m) {
+  // NOTE(review): the sibling do_write_op() above does not bump num_write --
+  // confirm whether incrementing it here is intentional or an inconsistency.
+  ++num_write;
+  check_init_op_params(m);
+  return std::forward<Func>(f)(pg->get_backend(), obc->obs, txn, obc->attr_cache);
+}
OpsExecuter::call_errorator::future<> OpsExecuter::do_assert_ver(
OSDOp& osd_op,
const ObjectState& os)
return backend.cmp_ext(os, osd_op);
});
case CEPH_OSD_OP_GETXATTR:
- return do_read_op([this, &osd_op](auto& backend, const auto& os) {
- return backend.getxattr(os, osd_op, delta_stats);
+ return do_read_attr_cache([this, &osd_op](auto& backend,
+ const auto& attr_cache,
+ const auto& os) {
+ return backend.getxattr(os, attr_cache, osd_op, delta_stats);
});
case CEPH_OSD_OP_GETXATTRS:
- return do_read_op([this, &osd_op](auto& backend, const auto& os) {
- return backend.get_xattrs(os, osd_op, delta_stats);
+ return do_read_attr_cache([this, &osd_op](auto& backend,
+ const auto& attr_cache,
+ const auto& os) {
+ return backend.get_xattrs(os, attr_cache, osd_op, delta_stats);
});
case CEPH_OSD_OP_CMPXATTR:
return do_read_op([this, &osd_op](auto& backend, const auto& os) {
return backend.cmp_xattr(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_RMXATTR:
- return do_write_op([&osd_op](auto& backend, auto& os, auto& txn) {
- return backend.rm_xattr(os, osd_op, txn);
+ return do_write_op_attr_cache([&osd_op](auto& backend, auto& os, auto& txn, auto& attr_cache) {
+ return backend.rm_xattr(os, osd_op, txn, attr_cache);
});
case CEPH_OSD_OP_CREATE:
return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
return backend.set_allochint(os, osd_op, txn, delta_stats);
});
case CEPH_OSD_OP_SETXATTR:
- return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
- return backend.setxattr(os, osd_op, txn, delta_stats);
+ return do_write_op_attr_cache([this, &osd_op](auto& backend, auto& os, auto& txn, auto& attr_cache) {
+ return backend.setxattr(os, osd_op, txn, delta_stats, attr_cache);
});
case CEPH_OSD_OP_DELETE:
{
return do_const_op(std::forward<Func>(f));
}
+ template <class Func>
+ auto do_read_attr_cache(Func&& f);
+
template <class Func>
auto do_snapset_op(Func&& f) {
++num_read;
template <class Func>
auto do_write_op(Func&& f, modified_by m = modified_by::user);
+ template <class Func>
+ auto do_write_op_attr_cache(Func&& f, modified_by m = modified_by::user);
decltype(auto) dont_do_legacy_op() {
return crimson::ct_error::operation_not_supported::make();
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn,
- object_stat_sum_t& delta_stats)
+ object_stat_sum_t& delta_stats,
+ ObjectContext::attr_cache_t& attr_cache)
{
if (local_conf()->osd_max_attr_size > 0 &&
osd_op.op.xattr.value_len > local_conf()->osd_max_attr_size) {
}
logger().debug("setxattr on obj={} for attr={}", os.oi.soid, name);
txn.setattr(coll->get_cid(), ghobject_t{os.oi.soid}, name, val);
+ attr_cache[name] = val;
delta_stats.num_wr++;
return setxattr_ierrorator::future<>(seastar::now());
});
bp.copy(osd_op.op.xattr.name_len, aname);
name = "_" + aname;
}
- logger().debug("getxattr on obj={} for attr={}", os.oi.soid, name);
- return getxattr(os.oi.soid, std::move(name)).safe_then_interruptible(
+ auto get_attr_maybe_from_cache =
+ [&] () mutable -> get_attr_ierrorator::future<ceph::bufferlist> {
+ if (!is_erasure()) {
+ logger().debug("getxattr on obj={} for attr={}", os.oi.soid, name);
+ return getxattr(os.oi.soid, std::move(name));
+ }
+ if (auto cache_it = attr_cache.find(name); cache_it != std::end(attr_cache)) {
+ return get_attr_ierrorator::make_ready_future<ceph::bufferlist>(
+ cache_it->second);
+ }
+ logger().debug("getxattr on obj={} for attr={}", os.oi.soid, name);
+ return crimson::ct_error::enodata::make();
+ };
+ return get_attr_maybe_from_cache().safe_then_interruptible(
[&delta_stats, &osd_op] (ceph::bufferlist&& val) {
osd_op.outdata = std::move(val);
osd_op.op.xattr.value_len = osd_op.outdata.length();
});
}
-PGBackend::get_attr_ierrorator::future<ceph::bufferlist>
-PGBackend::getxattr(
- const hobject_t& soid,
- std::string&& key) const
-{
- return seastar::do_with(key, [this, &soid](auto &key) {
- return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attr>(
- store, coll, ghobject_t{soid}, key, 0);
- });
-}
-
PGBackend::get_attr_ierrorator::future<> PGBackend::get_xattrs(
const ObjectState& os,
+ const ObjectContext::attr_cache_t& attr_cache,
OSDOp& osd_op,
object_stat_sum_t& delta_stats) const
{
- return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attrs>(
- store, coll, ghobject_t{os.oi.soid}, 0).safe_then(
+  // Decide where to read the full xattr set from: replicated backends go to
+  // the objstore, while EC backends must be served from the in-memory
+  // attr_cache (the objstore copy is not readable for EC).
+  auto get_attrs_maybe_from_cache =
+    [&] () {
+    if (!is_erasure()) {
+      // fixed log message: this is the getxattrs (plural) path
+      logger().debug("getxattrs on obj={} goes into objstore", os.oi.soid);
+      return store->get_attrs(coll, ghobject_t{os.oi.soid});
+    }
+    return crimson::os::FuturizedStore::Shard::get_attrs_ertr::make_ready_future<
+      crimson::os::FuturizedStore::Shard::attrs_t>(attr_cache);
+  };
+ return get_attrs_maybe_from_cache().safe_then(
[&delta_stats, &osd_op](auto&& attrs) {
std::vector<std::pair<std::string, bufferlist>> user_xattrs;
ceph::bufferlist bl;
PGBackend::rm_xattr(
ObjectState& os,
const OSDOp& osd_op,
- ceph::os::Transaction& txn)
+ ceph::os::Transaction& txn,
+ ObjectContext::attr_cache_t& attr_cache)
{
if (!os.exists || os.oi.is_whiteout()) {
logger().debug("{}: {} DNE", __func__, os.oi.soid);
string attr_name{"_"};
bp.copy(osd_op.op.xattr.name_len, attr_name);
txn.rmattr(coll->get_cid(), ghobject_t{os.oi.soid}, attr_name);
+ attr_cache.erase(attr_name);
return rm_xattr_iertr::now();
}
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
- object_stat_sum_t& delta_stats);
+ object_stat_sum_t& delta_stats,
+ ObjectContext::attr_cache_t& attr_cache);
using get_attr_errorator = crimson::os::FuturizedStore::Shard::get_attr_errorator;
using get_attr_ierrorator =
::crimson::interruptible::interruptible_errorator<
get_attr_errorator>;
get_attr_ierrorator::future<> getxattr(
const ObjectState& os,
+ const ObjectContext::attr_cache_t& attr_cache,
OSDOp& osd_op,
object_stat_sum_t& delta_stats) const;
- get_attr_ierrorator::future<ceph::bufferlist> getxattr(
+ virtual get_attr_ierrorator::future<ceph::bufferlist> getxattr(
const hobject_t& soid,
- std::string&& key) const;
+ std::string&& key) const = 0;
get_attr_ierrorator::future<> get_xattrs(
const ObjectState& os,
+ const ObjectContext::attr_cache_t& attr_cache,
OSDOp& osd_op,
object_stat_sum_t& delta_stats) const;
using cmp_xattr_errorator = get_attr_errorator::extend<
rm_xattr_iertr::future<> rm_xattr(
ObjectState& os,
const OSDOp& osd_op,
- ceph::os::Transaction& trans);
+ ceph::os::Transaction& trans,
+ ObjectContext::attr_cache_t& attr_cache);
interruptible_future<struct stat> stat(
CollectionRef c,
const ghobject_t& oid) const;
boost::container::flat_set<hobject_t> temp_contents;
friend class RecoveryBackend;
+
+ virtual bool is_erasure() const { return false; }
};
}
pg.peering_state.update_pct(m.pg_committed_to);
}
+// Read a single xattr straight from the objstore.
+// @param soid object to read from
+// @param key  xattr name (already "_"-prefixed by the caller)
+// @return the attr's value, or the store's error (e.g. ENODATA) on miss.
+PGBackend::get_attr_ierrorator::future<ceph::bufferlist>
+ReplicatedBackend::getxattr(
+  const hobject_t& soid,
+  std::string&& key) const
+{
+  // The store API needs the key to stay alive for the whole operation;
+  // move the rvalue parameter into do_with instead of copying it.
+  return seastar::do_with(std::move(key), [this, &soid](auto &key) {
+    return store->get_attr(coll, ghobject_t{soid}, key);
+  });
+}
+
}
void got_rep_op_reply(const MOSDRepOpReply& reply) final;
seastar::future<> stop() final;
void on_actingset_changed(bool same_primary) final;
+
+ PGBackend::get_attr_ierrorator::future<ceph::bufferlist> getxattr(
+ const hobject_t& soid,
+ std::string&& key) const final;
+
private:
ll_read_ierrorator::future<ceph::bufferlist>
_read(const hobject_t& hoid,