break;
}
if (oi.is_dirty()) {
- result = start_flush(ctx, false, NULL);
+ result = start_flush(ctx->op, ctx->obc, false, NULL, NULL);
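+ // start_flush() now reports -EINPROGRESS while the flush is in flight;
+ // map it to -EAGAIN so this ctx is cleaned up and the op retried later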
+ if (result == -EINPROGRESS)
+ result = -EAGAIN;
} else {
result = 0;
}
}
hobject_t missing;
if (oi.is_dirty()) {
- result = start_flush(ctx, true, &missing);
+ result = start_flush(ctx->op, ctx->obc, true, &missing, NULL);
+ if (result == -EINPROGRESS)
+ result = -EAGAIN;
} else {
result = 0;
}
}
};
-int ReplicatedPG::start_flush(OpContext *ctx, bool blocking, hobject_t *pmissing)
+int ReplicatedPG::start_flush(
+ OpRequestRef op, ObjectContextRef obc,
+ bool blocking, hobject_t *pmissing,
+ Context *on_flush)
{
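+ // callers now pass the obc directly, plus an optional client op
+ // (blocked or requeued as needed) and/or an optional on_flush
+ // completion (used below by the tiering agent)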
- const object_info_t& oi = ctx->obc->obs.oi;
+ const object_info_t& oi = obc->obs.oi;
const hobject_t& soid = oi.soid;
dout(10) << __func__ << " " << soid
<< " v" << oi.version
<< dendl;
// verify there are no older dirty clones
- SnapSet& snapset = ctx->obc->ssc->snapset;
+ SnapSet& snapset = obc->ssc->snapset;
{
dout(20) << " snapset " << snapset << dendl;
vector<snapid_t>::reverse_iterator p = snapset.clones.rbegin();
}
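+ // a blocking flush holds up further writes to the object; the block is
+ // released again in try_flush_mark_clean() or cancel_flush()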
if (blocking)
- ctx->obc->start_block();
+ obc->start_block();
map<hobject_t,FlushOpRef>::iterator p = flush_ops.find(soid);
if (p != flush_ops.end()) {
FlushOpRef fop = p->second;
- if (fop->ctx->op == ctx->op) {
+ if (fop->op == op) {
// we couldn't take the write lock on a cache-try-flush before;
// now we are trying again for the lock.
- // clean up the previous ctx and use the new one.
- close_op_ctx(fop->ctx, -EAGAIN);
- fop->ctx = ctx;
return try_flush_mark_clean(fop);
}
- if (fop->flushed_version == ctx->obc->obs.oi.user_version &&
+ if (fop->flushed_version == obc->obs.oi.user_version &&
(fop->blocking || !blocking)) {
// nonblocking can join anything
// blocking can only join a blocking flush
dout(20) << __func__ << " piggybacking on existing flush " << dendl;
- fop->dup_ops.push_back(ctx->op);
+ fop->dup_ops.push_back(op);
return -EAGAIN; // clean up this ctx; op will retry later
}
// cancel current flush since it will fail anyway, or because we
// are blocking and the existing flush is nonblocking.
dout(20) << __func__ << " canceling previous flush; it will fail" << dendl;
- if (fop->ctx->op)
- osd->reply_op_error(fop->ctx->op, -EBUSY);
+ if (fop->op)
+ osd->reply_op_error(fop->op, -EBUSY);
while (!fop->dup_ops.empty()) {
osd->reply_op_error(fop->dup_ops.front(), -EBUSY);
fop->dup_ops.pop_front();
}
FlushOpRef fop(new FlushOp);
- fop->ctx = ctx;
+ fop->obc = obc;
fop->flushed_version = oi.user_version;
fop->blocking = blocking;
+ fop->on_flush = on_flush;
+ fop->op = op;
ObjectOperation o;
if (oi.is_whiteout()) {
<< " tid " << fop->objecter_tid << dendl;
return;
}
- ObjectContextRef obc = fop->ctx->obc;
+ ObjectContextRef obc = fop->obc;
fop->objecter_tid = 0;
if (r < 0 && !(r == -ENOENT && fop->removal)) {
- reply_ctx(fop->ctx, -EBUSY, obc->obs.oi.version,
- obc->obs.oi.user_version);
+ if (fop->op)
+ osd->reply_op_error(fop->op, -EBUSY);
if (!fop->dup_ops.empty()) {
dout(20) << __func__ << " requeueing dups" << dendl;
requeue_ops(fop->dup_ops);
}
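+ // also fail any registered on_flush completion (e.g. the tiering
+ // agent's callback) so the caller is not left waiting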
+ if (fop->on_flush) {
+ Context *on_flush = fop->on_flush;
+ fop->on_flush = NULL;
+ on_flush->complete(-EBUSY);
+ }
flush_ops.erase(oid);
return;
}
- delete fop->ctx->op_t;
- fop->ctx->op_t = pgbackend->get_transaction();
-
r = try_flush_mark_clean(fop);
- if (r == -EBUSY) {
- reply_ctx(fop->ctx, -EBUSY, obc->obs.oi.version,
- obc->obs.oi.user_version);
+ if (r == -EBUSY && fop->op) {
+ osd->reply_op_error(fop->op, r);
}
}
int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
{
- ObjectContextRef obc = fop->ctx->obc;
+ ObjectContextRef obc = fop->obc;
const hobject_t& oid = obc->obs.oi.soid;
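+ // if this was a blocking flush, unblock the object first so that any
+ // writes queued behind it can be requeued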
+ if (fop->blocking) {
+ obc->stop_block();
+ kick_object_context_blocked(obc);
+ }
+
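+ // if the object changed (or disappeared) while the flush was in
+ // flight, the flushed data is stale and the flush fails with -EBUSY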
if (fop->flushed_version != obc->obs.oi.user_version ||
!obc->obs.exists) {
if (obc->obs.exists)
dout(20) << __func__ << " requeueing dups" << dendl;
requeue_ops(fop->dup_ops);
}
- if (fop->blocking) {
- obc->stop_block();
- kick_object_context_blocked(obc);
+ if (fop->on_flush) {
+ Context *on_flush = fop->on_flush;
+ fop->on_flush = NULL;
+ on_flush->complete(-EBUSY);
}
flush_ops.erase(oid);
if (fop->blocking)
}
// successfully flushed; can we clear the dirty bit?
- if (!fop->blocking) {
- // non-blocking: try to take the lock manually, since we don't
- // have a ctx yet.
- if (obc->get_write(fop->ctx->op)) {
- dout(20) << __func__ << " took write lock" << dendl;
- } else if (fop->ctx->op) {
- dout(10) << __func__ << " waiting on write lock" << dendl;
- return -EINPROGRESS; // will retry. this ctx is still alive!
- } else {
- dout(10) << __func__ << " failed write lock, no op; failing" << dendl;
- osd->logger->inc(l_osd_tier_try_flush_fail);
- cancel_flush(fop, false);
- return -ECANCELED;
- }
+ // try to take the lock manually, since we don't
+ // have a ctx yet.
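+ // with a client op we can wait for the lock (the op is requeued and
+ // will retry); without one (an agent-driven flush) we have to give up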
+ if (obc->get_write(fop->op)) {
+ dout(20) << __func__ << " took write lock" << dendl;
+ } else if (fop->op) {
+ dout(10) << __func__ << " waiting on write lock" << dendl;
+ requeue_op(fop->op);
+ requeue_ops(fop->dup_ops);
+ return -EAGAIN; // will retry
} else {
- dout(20) << __func__ << " already holding write lock: "
- << obc->rwstate << dendl;
- assert(obc->rwstate.state == ObjectContext::RWState::RWWRITE);
- assert(fop->ctx->lock_to_release == OpContext::W_LOCK);
-
- // let other writes continue
- obc->stop_block();
- kick_object_context_blocked(obc);
+ dout(10) << __func__ << " failed write lock, no op; failing" << dendl;
+ osd->logger->inc(l_osd_tier_try_flush_fail);
+ cancel_flush(fop, false);
+ return -ECANCELED;
}
dout(10) << __func__ << " clearing DIRTY flag for " << oid << dendl;
- ceph_tid_t rep_tid = osd->get_tid();
- RepGather *repop = new_repop(fop->ctx, obc, rep_tid);
- OpContext *ctx = fop->ctx;
- if (!fop->blocking) {
- ctx->lock_to_release = OpContext::W_LOCK; // we took it above
- }
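+ // create a self-contained repop that clears the dirty flag; any
+ // on_flush completion is handed off as the repop ctx's on_finish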
+ RepGather *repop = simple_repop_create(fop->obc);
+ OpContext *ctx = repop->ctx;
+
+ ctx->on_finish = fop->on_flush;
+ fop->on_flush = NULL;
+
+ ctx->lock_to_release = OpContext::W_LOCK; // we took it above
ctx->at_version = get_next_version();
ctx->new_obs = obc->obs;
osd->logger->inc(l_osd_tier_clean);
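+ // requeue the original op along with any dups that piggybacked on
+ // this flush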
- if (!fop->dup_ops.empty()) {
- dout(20) << __func__ << " queueing dups for " << ctx->at_version << dendl;
- list<OpRequestRef>& ls = waiting_for_ondisk[ctx->at_version];
+ if (!fop->dup_ops.empty() || fop->op) {
+ dout(20) << __func__ << " requeueing for " << ctx->at_version << dendl;
+ list<OpRequestRef> ls;
+ if (fop->op)
+ ls.push_back(fop->op);
ls.splice(ls.end(), fop->dup_ops);
+ requeue_ops(ls);
}
simple_repop_submit(repop);
void ReplicatedPG::cancel_flush(FlushOpRef fop, bool requeue)
{
- dout(10) << __func__ << " " << fop->ctx->obc->obs.oi.soid << " tid "
+ dout(10) << __func__ << " " << fop->obc->obs.oi.soid << " tid "
<< fop->objecter_tid << dendl;
if (fop->objecter_tid) {
Mutex::Locker l(osd->objecter_lock);
osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED);
}
if (requeue) {
- if (fop->ctx->op)
- requeue_op(fop->ctx->op);
+ if (fop->op)
+ requeue_op(fop->op);
requeue_ops(fop->dup_ops);
}
if (fop->blocking) {
- fop->ctx->obc->stop_block();
- kick_object_context_blocked(fop->ctx->obc);
+ fop->obc->stop_block();
+ kick_object_context_blocked(fop->obc);
+ }
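+ // a registered on_flush completion still has to fire, with -ECANCELED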
+ if (fop->on_flush) {
+ Context *on_flush = fop->on_flush;
+ fop->on_flush = NULL;
+ on_flush->complete(-ECANCELED);
}
- flush_ops.erase(fop->ctx->obc->obs.oi.soid);
- close_op_ctx(fop->ctx, -ECANCELED);
+ flush_ops.erase(fop->obc->obs.oi.soid);
}
void ReplicatedPG::cancel_flush_ops(bool requeue)
// FIXME: flush anything dirty, regardless of what distribution of
// ages we expect.
- vector<OSDOp> ops;
- ceph_tid_t rep_tid = osd->get_tid();
- osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
- OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops,
- &obc->obs, obc->ssc, this);
- ctx->op_t = pgbackend->get_transaction();
- ctx->obc = obc;
- ctx->mtime = ceph_clock_now(cct);
- ctx->at_version = get_next_version();
- ctx->on_finish = new C_AgentFlushStartStop(this, obc->obs.oi.soid);
-
- int result = start_flush(ctx, false, NULL);
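+ // the agent has no client op, so it drives the flush entirely through
+ // the on_flush completion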
+ Context *on_flush = new C_AgentFlushStartStop(this, obc->obs.oi.soid);
+ int result = start_flush(
+ OpRequestRef(), obc, false, NULL,
+ on_flush);
if (result != -EINPROGRESS) {
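+ // the flush is not in flight; fire the completion here with the result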
+ on_flush->complete(result);
dout(10) << __func__ << " start_flush() failed " << obc->obs.oi
<< " with " << result << dendl;
osd->logger->inc(l_osd_agent_skip);
- if (result != -ECANCELED)
- close_op_ctx(ctx, result);
return false;
}