// TODO: issue requests in parallel if they don't write,
// with writes being basically a synchronization barrier
return seastar::do_for_each(std::begin(m->ops), std::end(m->ops),
- [m,&txn,this,os=std::move(os)](OSDOp& osd_op) {
+ [m,&txn,this,os](OSDOp& osd_op) {
return do_osd_op(*os, osd_op, txn);
+ }).then([m,&txn,this,os=std::move(os)] {
+ // XXX: the entire lambda can be scheduled conditionally
+  // XXX: I'm not sure txn.empty() is what we want here
+ return !txn.empty() ? backend->store_object_state(os, *m, txn)
+ : seastar::now();
});
}).then([&] {
return txn.empty() ? seastar::now()
#include <fmt/ostream.h>
#include <seastar/core/print.hh>
+#include "messages/MOSDOp.h"
+
#include "crimson/os/cyan_collection.h"
#include "crimson/os/cyan_object.h"
#include "crimson/os/cyan_store.h"
});
}
+// Persist per-op object metadata updates into `txn`.
+//
+// When the object exists, stamp its object_info_t with the request id and
+// mtimes taken from `m`, re-encode it, and queue it as the OI_ATTR xattr in
+// the same transaction as the data ops.  When the object does not exist
+// (presumably removed by this op — confirm against caller), reset the cached
+// ObjectState's oi in place rather than evicting the cache entry.
+//
+// NOTE(review): `os` is a boost::local_shared_ptr taken by value, which
+// bumps the local refcount per call; a const& would suffice, but the header
+// declaration must change in lockstep — leave as-is for now.
+seastar::future<>
+PGBackend::store_object_state(
+  //const hobject_t& oid,
+  const cached_os_t os,
+  const MOSDOp& m,
+  ceph::os::Transaction& txn)
+{
+  if (os->exists) {
+    // Version bookkeeping from the classic OSD is not wired up yet; the
+    // disabled lines also still use `os.` rather than `os->` — revisit when
+    // enabling.
+#if 0
+    os.oi.version = ctx->at_version;
+    os.oi.prior_version = ctx->obs->oi.version;
+#endif
+
+    // Record which request last touched the object and when.
+    os->oi.last_reqid = m.get_reqid();
+    os->oi.mtime = m.get_mtime();
+    os->oi.local_mtime = ceph_clock_now();
+
+    // object_info_t
+    {
+      ceph::bufferlist osv;
+      // Feature bits hardcoded to 0 for now; see the TODO below for the
+      // intended source of the feature mask.
+      encode(os->oi, osv, 0);
+      // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
+      txn.setattr(coll->cid, ghobject_t{os->oi.soid}, OI_ATTR, std::move(osv));
+    }
+  } else {
+    // reset cached ObjectState without enforcing eviction
+    os->oi = object_info_t(os->oi.soid);
+  }
+  return seastar::now();
+}
+
seastar::future<>
PGBackend::evict_object_state(const hobject_t& oid)
{
});
}
+// Ensure `os` describes a live object, creating it in `txn` if needed.
+//
+// Returns true when the object already existed before this call, false when
+// this call created it.  Clearing a whiteout marker counts as "already
+// existed".
+bool PGBackend::maybe_create_new_object(
+  ObjectState& os,
+  ceph::os::Transaction& txn)
+{
+  if (os.exists) {
+    // Object is already on disk; just drop a whiteout marker if one is set.
+    if (os.oi.is_whiteout()) {
+      os.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
+      // TODO: delta_stats.num_whiteouts--
+    }
+    return true;
+  }
+  // Brand-new object: a whiteout flag on a non-existent object would be a
+  // cache-consistency bug.
+  ceph_assert(!os.oi.is_whiteout());
+  os.exists = true;
+  os.oi.new_object();
+  txn.touch(coll->cid, ghobject_t{os.oi.soid});
+  // TODO: delta_stats.num_objects++
+  return false;
+}
+
seastar::future<> PGBackend::writefull(
ObjectState& os,
const OSDOp& osd_op,
throw ::invalid_argument();
}
- if (os.exists && op.extent.length < os.oi.size) {
+ const bool existing = maybe_create_new_object(os, txn);
+ if (existing && op.extent.length < os.oi.size) {
txn.truncate(coll->cid, ghobject_t{os.oi.soid}, op.extent.length);
}
if (op.extent.length) {
txn.write(coll->cid, ghobject_t{os.oi.soid}, 0, op.extent.length,
osd_op.indata, op.flags);
+ os.oi.size = op.extent.length;
}
return seastar::now();
}
ceph::os::CyanStore* store,
const ec_profile_t& ec_profile);
using cached_os_t = boost::local_shared_ptr<ObjectState>;
+ seastar::future<> store_object_state(const cached_os_t os,
+ const MOSDOp& m,
+ ceph::os::Transaction& txn);
seastar::future<cached_os_t> get_object_state(const hobject_t& oid);
seastar::future<> evict_object_state(const hobject_t& oid);
seastar::future<bufferlist> read(const object_info_t& oi,
size_t offset,
size_t length,
uint32_t flags) = 0;
+ bool maybe_create_new_object(ObjectState& os, ceph::os::Transaction& txn);
};