// TODO: issue requests in parallel if they don't write,
// with writes being basically a synchronization barrier
return seastar::do_for_each(std::begin(m->ops), std::end(m->ops),
- [m,&txn,this,os](OSDOp& osd_op) {
- return do_osd_op(*os, osd_op, txn);
- }).then([m,&txn,this,os=std::move(os)] {
- // XXX: the entire lambda can be scheduled conditionally
- // XXX: I'm not txn.empty() is what we want here
- return !txn.empty() ? backend->store_object_state(os, *m, txn)
- : seastar::now();
+ [m,&txn,this,pos=os.get()](OSDOp& osd_op) {
+ return do_osd_op(*pos, osd_op, txn);
+ }).then([&txn,m,this,os=std::move(os)]() mutable {
+ // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
+ return txn.empty() ? seastar::now()
+ : backend->mutate_object(std::move(os), std::move(txn), *m);
});
- }).then([&] {
- return txn.empty() ? seastar::now()
- : backend->submit_transaction(std::move(txn));
- }).then([=] {
+ }).then([m,this] {
auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
0, false);
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
}
seastar::future<>
-PGBackend::store_object_state(
- //const hobject_t& oid,
- const cached_os_t os,
- const MOSDOp& m,
- ceph::os::Transaction& txn)
+PGBackend::mutate_object(
+ cached_os_t&& os,
+ ceph::os::Transaction&& txn,
+ const MOSDOp& m)
{
+ logger().trace("mutate_object: num_ops={}", txn.get_num_ops());
if (os->exists) {
#if 0
os.oi.version = ctx->at_version;
// reset cached ObjectState without enforcing eviction
os->oi = object_info_t(os->oi.soid);
}
- return seastar::now();
+ return store->do_transaction(coll, std::move(txn));
}
seastar::future<>
}
return seastar::now();
}
-
-seastar::future<> PGBackend::submit_transaction(ceph::os::Transaction&& txn)
-{
- logger().trace("submit_transaction: num_ops={}", txn.get_num_ops());
- return store->do_transaction(coll, std::move(txn));
-}
ceph::os::CyanStore* store,
const ec_profile_t& ec_profile);
using cached_os_t = boost::local_shared_ptr<ObjectState>;
- seastar::future<> store_object_state(const cached_os_t os,
- const MOSDOp& m,
- ceph::os::Transaction& txn);
seastar::future<cached_os_t> get_object_state(const hobject_t& oid);
seastar::future<> evict_object_state(const hobject_t& oid);
seastar::future<bufferlist> read(const object_info_t& oi,
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans);
- seastar::future<> submit_transaction(ceph::os::Transaction&& txn);
+ seastar::future<> mutate_object(
+ cached_os_t&& os,
+ ceph::os::Transaction&& txn,
+ const MOSDOp& m);
protected:
const shard_id_t shard;