dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
}
-void ECBackend::call_write_ordered(std::function<void(void)> &&cb)
-{
- if (waiting_state.empty() &&
- waiting_reads.empty()) {
- dout(10) << __func__ << " sync" << dendl;
- cb();
+void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
+ if (!waiting_state.empty()) {
+ waiting_state.back().on_write.emplace_back(std::move(cb));
+ } else if (!waiting_reads.empty()) {
+ waiting_reads.back().on_write.emplace_back(std::move(cb));
} else {
- ceph_tid_t tid = parent->get_tid();
- Op& op = tid_to_op_map[tid];
- op.tid = tid;
- op.on_write = std::move(cb);
- if (!waiting_state.empty()) {
- dout(10) << __func__ << " tid " << tid << " waiting_state" << dendl;
- waiting_state.push_back(op);
- } else if (!waiting_reads.empty()) {
- dout(10) << __func__ << " tid " << tid << " waiting_reads" << dendl;
- waiting_reads.push_back(op);
- } else {
- ceph_abort();
- }
+ // Nothing earlier in the pipeline, just call it
+ cb();
}
}
if (waiting_reads.empty())
return false;
Op *op = &(waiting_reads.front());
- if (op->on_write) {
- waiting_reads.pop_front();
- op->on_write();
- tid_to_op_map.erase(op->tid);
- return true;
- }
if (op->read_in_progress())
return false;
waiting_reads.pop_front();
}
}
if (should_write_local) {
- handle_sub_write(
- get_parent()->whoami_shard(),
- op->client_op,
- local_write_op,
- op->trace,
- op->on_local_applied_sync);
- op->on_local_applied_sync = 0;
+ handle_sub_write(
+ get_parent()->whoami_shard(),
+ op->client_op,
+ local_write_op,
+ op->trace,
+ op->on_local_applied_sync);
+ op->on_local_applied_sync = 0;
}
+
+ for (auto i = op->on_write.begin();
+ i != op->on_write.end();
+ op->on_write.erase(i++)) {
+ (*i)();
+ }
+
return true;
}
map<hobject_t, ObjectContextRef> obc_map;
/// see call_write_ordered
- std::function<void(void)> on_write;
+ std::list<std::function<void(void)> > on_write;
/// Generated internally
set<hobject_t> temp_added;
Context *on_local_applied_sync = nullptr;
Context *on_all_applied = nullptr;
Context *on_all_commit = nullptr;
-
- Op() {}
- Op(ceph_tid_t t, std::function<void(void)>&& cb)
- : tid(t), on_write(cb) { }
~Op() {
delete on_local_applied_sync;
delete on_all_applied;