i->second.pending_apply.erase(from);
}
- if (i->second.pending_apply.empty() && i->second.on_all_applied) {
- dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl;
- i->second.on_all_applied->complete(0);
- i->second.on_all_applied = 0;
- i->second.trace.event("ec write all applied");
- }
if (i->second.pending_commit.empty() && i->second.on_all_commit) {
dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
i->second.on_all_commit->complete(0);
const eversion_t &roll_forward_to,
const vector<pg_log_entry_t> &log_entries,
boost::optional<pg_hit_set_history_t> &hset_history,
- Context *on_all_applied,
Context *on_all_commit,
ceph_tid_t tid,
osd_reqid_t reqid,
op->roll_forward_to = std::max(roll_forward_to, committed_to);
op->log_entries = log_entries;
std::swap(op->updated_hit_set_history, hset_history);
- op->on_all_applied = on_all_applied;
op->on_all_commit = on_all_commit;
op->tid = tid;
op->reqid = reqid;
const eversion_t &roll_forward_to,
const vector<pg_log_entry_t> &log_entries,
boost::optional<pg_hit_set_history_t> &hset_history,
- Context *on_all_applied,
Context *on_all_commit,
ceph_tid_t tid,
osd_reqid_t reqid,
return !remote_read.empty() && remote_read_result.empty();
}
- /// In progress write state
+ /// In progress write state.
set<pg_shard_t> pending_commit;
+ // we need pending_apply for pre-mimic peers so that we don't issue a
+ // read on a remote shard before it has applied a previous write. We can
+ // remove this after nautilus.
set<pg_shard_t> pending_apply;
bool write_in_progress() const {
return !pending_commit.empty() || !pending_apply.empty();
ExtentCache::write_pin pin;
/// Callbacks
- Context *on_all_applied = nullptr;
Context *on_all_commit = nullptr;
~Op() {
- delete on_all_applied;
delete on_all_commit;
}
};
virtual void release_locks(ObcLockManager &manager) = 0;
- virtual void op_applied(
- const eversion_t &applied_version) = 0;
-
virtual bool should_send_op(
pg_shard_t peer,
const hobject_t &hoid) = 0;
const vector<pg_log_entry_t> &log_entries, ///< [in] log entries for t
/// [in] hitset history (if updated with this transaction)
boost::optional<pg_hit_set_history_t> &hset_history,
- Context *on_all_applied, ///< [in] called when all acked
Context *on_all_commit, ///< [in] called when all commit
ceph_tid_t tid, ///< [in] tid
osd_reqid_t reqid, ///< [in] reqid
min_last_complete_ondisk,
ctx->log,
ctx->updated_hset_history,
- nullptr,
on_all_commit,
repop->rep_tid,
ctx->reqid,
dout(10) << __func__ << dendl;
for (auto& op : in_progress_ops) {
delete op.second.on_commit;
- delete op.second.on_applied;
}
in_progress_ops.clear();
clear_recovery_state();
const eversion_t &roll_forward_to,
const vector<pg_log_entry_t> &_log_entries,
boost::optional<pg_hit_set_history_t> &hset_history,
- Context *on_all_acked,
Context *on_all_commit,
ceph_tid_t tid,
osd_reqid_t reqid,
make_pair(
tid,
InProgressOp(
- tid, on_all_commit, on_all_acked,
+ tid, on_all_commit,
orig_op, at_version)
)
);
assert(insert_res.second);
InProgressOp &op = insert_res.first->second;
- op.waiting_for_applied.insert(
- parent->get_actingbackfill_shards().begin(),
- parent->get_actingbackfill_shards().end());
op.waiting_for_commit.insert(
parent->get_actingbackfill_shards().begin(),
parent->get_actingbackfill_shards().end());
tls.push_back(std::move(op_t));
parent->queue_transactions(tls, op.op);
- op_applied(&op);
-}
-
-void ReplicatedBackend::op_applied(
- InProgressOp *op)
-{
- FUNCTRACE(cct);
- OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_APPLIED_BEGIN", true);
- dout(10) << __func__ << ": " << op->tid << dendl;
- if (op->op) {
- op->op->mark_event("op_applied");
- op->op->pg_trace.event("op applied");
- }
-
- op->waiting_for_applied.erase(get_parent()->whoami_shard());
-
- if (op->waiting_for_applied.empty()) {
- op->on_applied->complete(0);
- op->on_applied = 0;
- }
- if (op->done()) {
- assert(!op->on_commit && !op->on_applied);
- in_progress_ops.erase(op->tid);
- }
}
void ReplicatedBackend::op_commit(
if (op->waiting_for_commit.empty()) {
op->on_commit->complete(0);
op->on_commit = 0;
- }
- if (op->done()) {
- assert(!op->on_commit && !op->on_applied);
+ assert(!op->on_commit);
in_progress_ops.erase(op->tid);
}
}
ip_op.op->pg_trace.event("sub_op_commit_rec");
}
} else {
- assert(ip_op.waiting_for_applied.count(from));
- if (ip_op.op) {
- ostringstream ss;
- ss << "sub_op_applied_rec from " << from;
- ip_op.op->mark_event_string(ss.str());
- ip_op.op->pg_trace.event("sub_op_applied_rec");
- }
+ // legacy peer; ignore
}
- ip_op.waiting_for_applied.erase(from);
parent->update_peer_last_complete_ondisk(
from,
r->get_last_complete_ondisk());
- if (ip_op.waiting_for_applied.empty() &&
- ip_op.on_applied) {
- ip_op.on_applied->complete(0);
- ip_op.on_applied = 0;
- }
if (ip_op.waiting_for_commit.empty() &&
ip_op.on_commit) {
ip_op.on_commit->complete(0);
- ip_op.on_commit= 0;
- }
- if (ip_op.done()) {
- assert(!ip_op.on_commit && !ip_op.on_applied);
+ ip_op.on_commit = 0;
in_progress_ops.erase(iter);
}
}
tls.push_back(std::move(rm->localt));
tls.push_back(std::move(rm->opt));
parent->queue_transactions(tls, op);
- repop_applied(rm);
// op is cleaned up by oncommit/onapply when both are executed
}
-void ReplicatedBackend::repop_applied(RepModifyRef rm)
-{
- rm->op->mark_event("sub_op_applied");
- rm->applied = true;
- rm->op->pg_trace.event("sup_op_applied");
-
- dout(10) << __func__ << " on " << rm << " op "
- << *rm->op->get_req() << dendl;
- const Message *m = rm->op->get_req();
- const MOSDRepOp *req = static_cast<const MOSDRepOp*>(m);
- eversion_t version = req->version;
-
- // send ack to acker only if we haven't sent a commit already
- if (!rm->committed) {
- Message *ack = new MOSDRepOpReply(
- req, parent->whoami_shard(),
- 0, get_osdmap()->get_epoch(), req->min_epoch, CEPH_OSD_FLAG_ACK);
- ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
- ack->trace = rm->op->pg_trace;
- get_parent()->send_message_osd_cluster(
- rm->ackerosd, ack, get_osdmap()->get_epoch());
- }
-
-}
-
void ReplicatedBackend::repop_commit(RepModifyRef rm)
{
rm->op->mark_commit_sent();
struct InProgressOp {
ceph_tid_t tid;
set<pg_shard_t> waiting_for_commit;
- set<pg_shard_t> waiting_for_applied;
Context *on_commit;
- Context *on_applied;
OpRequestRef op;
eversion_t v;
InProgressOp(
- ceph_tid_t tid, Context *on_commit, Context *on_applied,
+ ceph_tid_t tid, Context *on_commit,
OpRequestRef op, eversion_t v)
- : tid(tid), on_commit(on_commit), on_applied(on_applied),
+ : tid(tid), on_commit(on_commit),
op(op), v(v) {}
bool done() const {
- return waiting_for_commit.empty() &&
- waiting_for_applied.empty();
+ return waiting_for_commit.empty();
}
};
map<ceph_tid_t, InProgressOp> in_progress_ops;
public:
friend class C_OSD_OnOpCommit;
- friend class C_OSD_OnOpApplied;
void call_write_ordered(std::function<void(void)> &&cb) override {
// ReplicatedBackend submits writes inline in submit_transaction, so
const eversion_t &roll_forward_to,
const vector<pg_log_entry_t> &log_entries,
boost::optional<pg_hit_set_history_t> &hset_history,
- Context *on_all_applied,
Context *on_all_commit,
ceph_tid_t tid,
osd_reqid_t reqid,
boost::optional<pg_hit_set_history_t> &hset_history,
InProgressOp *op,
ObjectStore::Transaction &op_t);
- void op_applied(InProgressOp *op);
void op_commit(InProgressOp *op);
void do_repop_reply(OpRequestRef op);
void do_repop(OpRequestRef op);
struct RepModify {
OpRequestRef op;
- bool applied, committed;
+ bool committed;
int ackerosd;
eversion_t last_complete;
epoch_t epoch_started;
ObjectStore::Transaction opt, localt;
- RepModify() : applied(false), committed(false), ackerosd(-1),
+ RepModify() : committed(false), ackerosd(-1),
epoch_started(0) {}
};
typedef ceph::shared_ptr<RepModify> RepModifyRef;
- struct C_OSD_RepModifyApply;
struct C_OSD_RepModifyCommit;
- void repop_applied(RepModifyRef rm);
void repop_commit(RepModifyRef rm);
bool auto_repair_supported() const override { return false; }