ctx->op ? ctx->op->get_req()->get_connection() :
ConnectionRef());
});
+ ctx->register_on_finish(
+ [ctx, this]() {
+ delete ctx;
+ });
// issue replica writes
ceph_tid_t rep_tid = osd->get_tid();
- RepGather *repop = new_repop(ctx, obc, rep_tid); // new repop claims our obc, src_obc refs
- // note: repop now owns ctx AND ctx->op
-
- repop->src_obc.swap(src_obc); // and src_obc.
- issue_repop(repop);
+ RepGather *repop = new_repop(ctx, obc, rep_tid);
+ issue_repop(repop, ctx);
eval_repop(repop);
repop->put();
}
void ReplicatedPG::eval_repop(RepGather *repop)
{
MOSDOp *m = NULL;
- if (repop->ctx->op)
- m = static_cast<MOSDOp *>(repop->ctx->op->get_req());
+ if (repop->op)
+ m = static_cast<MOSDOp *>(repop->op->get_req());
if (m)
dout(10) << "eval_repop " << *repop
waiting_for_ondisk[repop->v].begin();
i != waiting_for_ondisk[repop->v].end();
++i) {
- osd->reply_op_error(i->first, 0, repop->ctx->at_version,
+ osd->reply_op_error(i->first, 0, repop->v,
i->second);
}
waiting_for_ondisk.erase(repop->v);
++i) {
MOSDOp *m = static_cast<MOSDOp*>(i->first->get_req());
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
- reply->set_reply_versions(repop->ctx->at_version,
+ reply->set_reply_versions(repop->v,
i->second);
reply->add_flags(CEPH_OSD_FLAG_ACK);
osd->send_message_osd_client(reply, m->get_connection());
}
}
-void ReplicatedPG::issue_repop(RepGather *repop)
+void ReplicatedPG::issue_repop(RepGather *repop, OpContext *ctx)
{
- OpContext *ctx = repop->ctx;
const hobject_t& soid = ctx->obs->oi.soid;
if (ctx->op &&
((static_cast<MOSDOp *>(
}
}
- repop->obc->ondisk_write_lock();
- if (repop->ctx->clone_obc)
- repop->ctx->clone_obc->ondisk_write_lock();
+ ctx->obc->ondisk_write_lock();
+ if (ctx->clone_obc)
+ ctx->clone_obc->ondisk_write_lock();
bool unlock_snapset_obc = false;
- if (repop->ctx->snapset_obc && repop->ctx->snapset_obc->obs.oi.soid !=
- repop->obc->obs.oi.soid) {
- repop->ctx->snapset_obc->ondisk_write_lock();
+ if (ctx->snapset_obc && ctx->snapset_obc->obs.oi.soid !=
+ ctx->obc->obs.oi.soid) {
+ ctx->snapset_obc->ondisk_write_lock();
unlock_snapset_obc = true;
}
- repop->ctx->apply_pending_attrs();
+ ctx->apply_pending_attrs();
if (pool.info.require_rollback()) {
- for (vector<pg_log_entry_t>::iterator i = repop->ctx->log.begin();
- i != repop->ctx->log.end();
+ for (vector<pg_log_entry_t>::iterator i = ctx->log.begin();
+ i != ctx->log.end();
++i) {
assert(i->mod_desc.can_rollback());
assert(!i->mod_desc.empty());
Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
- repop->obc,
- repop->ctx->clone_obc,
- unlock_snapset_obc ? repop->ctx->snapset_obc : ObjectContextRef());
+ ctx->obc,
+ ctx->clone_obc,
+ unlock_snapset_obc ? ctx->snapset_obc : ObjectContextRef());
pgbackend->submit_transaction(
soid,
- repop->ctx->at_version,
- std::move(repop->ctx->op_t),
+ ctx->at_version,
+ std::move(ctx->op_t),
pg_trim_to,
min_last_complete_ondisk,
- repop->ctx->log,
- repop->ctx->updated_hset_history,
+ ctx->log,
+ ctx->updated_hset_history,
onapplied_sync,
on_all_applied,
on_all_commit,
repop->rep_tid,
- repop->ctx->reqid,
- repop->ctx->op);
+ ctx->reqid,
+ ctx->op);
}
ReplicatedPG::RepGather *ReplicatedPG::new_repop(
else
dout(10) << "new_repop rep_tid " << rep_tid << " (no op)" << dendl;
- RepGather *repop = new RepGather(ctx, obc, rep_tid, info.last_complete);
+ RepGather *repop = new RepGather(ctx, rep_tid, info.last_complete);
repop->start = ceph_clock_now(cct);
void ReplicatedPG::remove_repop(RepGather *repop)
{
dout(20) << __func__ << " " << *repop << dendl;
- assert(repop->ctx->obc);
- dout(20) << " obc " << *repop->ctx->obc << dendl;
- if (repop->ctx->clone_obc)
- dout(20) << " clone_obc " << *repop->ctx->clone_obc << dendl;
- if (repop->ctx->snapset_obc)
- dout(20) << " snapset_obc " << *repop->ctx->snapset_obc << dendl;
for (auto p = repop->on_finish.begin();
p != repop->on_finish.end();
}
release_object_locks(
- repop->ctx->obc->obs.oi.soid.get_head(),
- repop->ctx->lock_manager);
+ repop->lock_manager);
repop->put();
osd->logger->dec(l_osd_op_wip);
{
RepGather *repop = new_repop(ctx.get(), ctx->obc, ctx->reqid.tid);
dout(20) << __func__ << " " << repop << dendl;
- issue_repop(repop);
+ issue_repop(repop, ctx.get());
eval_repop(repop);
repop->put();
}
repop->on_success.clear();
if (requeue) {
- if (repop->ctx->op) {
- dout(10) << " requeuing " << *repop->ctx->op->get_req() << dendl;
- rq.push_back(repop->ctx->op);
- repop->ctx->op = OpRequestRef();
+ if (repop->op) {
+ dout(10) << " requeuing " << *repop->op->get_req() << dendl;
+ rq.push_back(repop->op);
+ repop->op = OpRequestRef();
}
// also requeue any dups, interleaved into position
*/
class RepGather {
public:
+ hobject_t hoid;
+ OpRequestRef op;
xlist<RepGather*>::item queue_item;
int nref;
eversion_t v;
- OpContext *ctx;
- ObjectContextRef obc;
- map<hobject_t,ObjectContextRef, hobject_t::BitwiseComparator> src_obc;
-
ceph_tid_t rep_tid;
bool rep_aborted, rep_done;
eversion_t pg_local_last_complete;
+ ObcLockManager lock_manager;
+
list<std::function<void()>> on_applied;
list<std::function<void()>> on_committed;
list<std::function<void()>> on_success;
list<std::function<void()>> on_finish;
- RepGather(OpContext *c, ObjectContextRef pi, ceph_tid_t rt,
+ RepGather(OpContext *c, ceph_tid_t rt,
eversion_t lc) :
+ hoid(c->obc->obs.oi.soid),
+ op(c->op),
queue_item(this),
nref(1),
- ctx(c), obc(pi),
rep_tid(rt),
rep_aborted(false), rep_done(false),
all_applied(false), all_committed(false),
pg_local_last_complete(lc),
+ lock_manager(std::move(c->lock_manager)),
on_applied(std::move(c->on_applied)),
on_committed(std::move(c->on_committed)),
on_success(std::move(c->on_success)),
void put() {
assert(nref > 0);
if (--nref == 0) {
- delete ctx; // must already be unlocked
assert(on_applied.empty());
delete this;
//generic_dout(0) << "deleting " << this << dendl;
void repop_all_applied(RepGather *repop);
void repop_all_committed(RepGather *repop);
void eval_repop(RepGather*);
- void issue_repop(RepGather *repop);
+ void issue_repop(RepGather *repop, OpContext *ctx);
RepGather *new_repop(
OpContext *ctx,
ObjectContextRef obc,
<< " rep_tid=" << repop.rep_tid
<< " committed?=" << repop.all_committed
<< " applied?=" << repop.all_applied;
- if (repop.ctx->lock_type != ObjectContext::RWState::RWNONE)
- out << " lock=" << (int)repop.ctx->lock_type;
- if (repop.ctx->op)
- out << " op=" << *(repop.ctx->op->get_req());
out << ")";
return out;
}