} else if (results->should_requeue) {
ctx->pg->requeue_op(ctx->op);
}
- ctx->pg->close_op_ctx(ctx);
+ ctx->pg->close_op_ctx(ctx, r);
}
}
map<hobject_t,FlushOpRef>::iterator p = flush_ops.find(obc->obs.oi.soid);
if (p == flush_ops.end()) {
dout(10) << __func__ << " no flush in progress, aborting" << dendl;
- close_op_ctx(ctx);
+ close_op_ctx(ctx, -EINVAL);
osd->reply_op_error(op, -EINVAL);
return;
}
} else if (!get_rw_locks(ctx)) {
dout(20) << __func__ << " waiting for rw locks " << dendl;
op->mark_delayed("waiting for rw locks");
- close_op_ctx(ctx);
+ close_op_ctx(ctx, -EBUSY);
return;
}
// This object is lost. Reading from it returns an error.
dout(20) << __func__ << ": object " << obc->obs.oi.soid
<< " is lost" << dendl;
- close_op_ctx(ctx);
+ close_op_ctx(ctx, -ENFILE);
osd->reply_op_error(op, -ENFILE);
return;
}
if (!op->may_write() && !op->may_cache() && (!obc->obs.exists ||
obc->obs.oi.is_whiteout())) {
- close_op_ctx(ctx);
+ close_op_ctx(ctx, -ENOENT);
osd->reply_op_error(op, -ENOENT);
return;
}
if (already_complete(oldv)) {
reply_ctx(ctx, 0, oldv, entry->user_version);
} else {
- close_op_ctx(ctx);
+ close_op_ctx(ctx, -EBUSY);
if (m->wants_ack()) {
if (already_ack(oldv)) {
if (result == -EAGAIN) {
// clean up after the ctx
- close_op_ctx(ctx);
+ close_op_ctx(ctx, result);
return;
}
void ReplicatedPG::reply_ctx(OpContext *ctx, int r)
{
osd->reply_op_error(ctx->op, r);
- close_op_ctx(ctx);
+ close_op_ctx(ctx, r);
}
void ReplicatedPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv)
{
osd->reply_op_error(ctx->op, r, v, uv);
- close_op_ctx(ctx);
+ close_op_ctx(ctx, r);
}
void ReplicatedPG::log_op_stats(OpContext *ctx)
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
osd->send_message_osd_client(reply, m->get_connection());
- close_op_ctx(ctx);
+ close_op_ctx(ctx, 0);
}
// ========================================================================
if (fop->ctx->op == ctx->op) {
// we couldn't take the write lock on a cache-try-flush before;
// now we are trying again for the lock.
- close_op_ctx(fop->ctx); // clean up the previous ctx and use the new one.
+ // clean up the previous ctx and use the new one.
+ close_op_ctx(fop->ctx, -EAGAIN);
fop->ctx = ctx;
return try_flush_mark_clean(fop);
}
if (!fop->blocking) {
// non-blocking: try to take the lock manually, since we don't
// have a ctx yet.
- dout(20) << __func__ << " taking write lock" << dendl;
- if (!obc->get_write(fop->ctx->op)) {
- dout(10) << __func__ << " waiting on lock" << dendl;
+ if (obc->get_write(fop->ctx->op)) {
+ dout(20) << __func__ << " took write lock" << dendl;
+ } else if (fop->ctx->op) {
+ dout(10) << __func__ << " waiting on write lock" << dendl;
return -EINPROGRESS; // will retry. this ctx is still alive!
+ } else {
+ dout(10) << __func__ << " failed write lock, no op; failing" << dendl;
+ cancel_flush(fop, false);
+ return -ECANCELED;
}
} else {
dout(20) << __func__ << " already holding write lock: "
kick_object_context_blocked(fop->ctx->obc);
}
flush_ops.erase(fop->ctx->obc->obs.oi.soid);
- close_op_ctx(fop->ctx);
+ close_op_ctx(fop->ctx, -ECANCELED);
}
void ReplicatedPG::cancel_flush_ops(bool requeue)
{
dout(20) << __func__ << " " << *repop << dendl;
release_op_ctx_locks(repop->ctx);
+ repop->ctx->finish(0); // FIXME: return value here is sloppy
repop_map.erase(repop->rep_tid);
repop->put();
in_progress_async_reads.begin();
i != in_progress_async_reads.end();
in_progress_async_reads.erase(i++)) {
- close_op_ctx(i->second);
+ close_op_ctx(i->second, -ECANCELED);
requeue_op(i->first);
}