struct RefCountCallback : public Context {
public:
- PrimaryLogPG *pg;
PrimaryLogPG::OpContext *ctx;
OSDOp& osd_op;
- epoch_t last_peering_reset;
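+ // on -ECANCELED, requeue the client op instead of dropping it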
+ bool requeue = false;
- RefCountCallback(PrimaryLogPG *pg, PrimaryLogPG::OpContext *ctx,
- OSDOp &osd_op, epoch_t lpr)
- : pg(pg), ctx(ctx), osd_op(osd_op), last_peering_reset(lpr)
- {}
+ RefCountCallback(PrimaryLogPG::OpContext *ctx, OSDOp &osd_op)
+ : ctx(ctx), osd_op(osd_op) {}
void finish(int r) override {
- std::scoped_lock locker{*pg};
- if (last_peering_reset == pg->get_last_peering_reset()) {
- if (r >= 0) {
- osd_op.rval = 0;
- pg->execute_ctx(ctx);
- } else {
- if (ctx->op) {
- pg->osd->reply_op_error(ctx->op, r);
- }
- pg->close_op_ctx(ctx);
+ // NB: caller must already have pg->lock held
+ if (r >= 0) {
+ osd_op.rval = 0;
+ ctx->pg->execute_ctx(ctx);
+ } else {
+ // on cancel, simply toss the op out or requeue it as requested
+ if (r != -ECANCELED) {
+ if (ctx->op)
+ ctx->pg->osd->reply_op_error(ctx->op, r);
+ } else if (requeue) {
+ if (ctx->op)
+ ctx->pg->requeue_op(ctx->op);
}
+ ctx->pg->close_op_ctx(ctx);
}
}
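+ // set by cancel_manifest_ops() before it completes the callback with -ECANCELED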
+ void set_requeue(bool rq) {
+ requeue = rq;
+ }
};
struct SetManifestFinisher : public PrimaryLogPG::OpFinisher {
}
};
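+// Completion for the objecter mutate issued by refcount_manifest(): runs on an
+// objecter finisher via C_OnFinisher, re-takes the pg lock, drops the
+// manifest_ops entry and completes the RefCountCallback.  -ECANCELED is a
+// no-op here because cancel_manifest_ops() has already completed the callback.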
+struct C_SetManifestRefCountDone : public Context {
+ RefCountCallback* cb;
+ hobject_t soid;
+ C_SetManifestRefCountDone(
+ RefCountCallback* cb, hobject_t soid) : cb(cb), soid(soid) {}
+ void finish(int r) override {
+ if (r == -ECANCELED)
+ return;
+ auto pg = cb->ctx->pg;
+ std::scoped_lock locker{*pg};
+ auto it = pg->manifest_ops.find(soid);
+ if (it == pg->manifest_ops.end()) {
+ // raced with cancel_manifest_ops
+ return;
+ }
+ pg->manifest_ops.erase(it);
+ cb->complete(r);
+ }
+};
+
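+// Tear down all in-flight manifest refcount ops (callers hold the pg lock):
+// collect the objecter tids into *tids for the caller to cancel, then complete
+// each callback with -ECANCELED, requeueing the client op if requested.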
+void PrimaryLogPG::cancel_manifest_ops(bool requeue, vector<ceph_tid_t> *tids)
+{
+ dout(10) << __func__ << dendl;
+ auto p = manifest_ops.begin();
+ while (p != manifest_ops.end()) {
+ auto mop = p->second;
+ // cancel objecter op, if we can
+ if (mop->objecter_tid) {
+ tids->push_back(mop->objecter_tid);
+ mop->objecter_tid = 0;
+ }
+ mop->cb->set_requeue(requeue);
+ mop->cb->complete(-ECANCELED);
+ manifest_ops.erase(p++);
+ }
+}
+
void PrimaryLogPG::refcount_manifest(ObjectContextRef obc, object_locator_t oloc, hobject_t soid,
- SnapContext snapc, bool get, Context *cb, uint64_t offset)
+ SnapContext snapc, bool get, RefCountCallback *cb, uint64_t offset)
{
unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY |
CEPH_OSD_FLAG_RWORDERED;
obj_op.call("cas", "chunk_put", in);
}
- unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
- Context *c;
+ Context *c = nullptr;
if (cb) {
- c = new C_OnFinisher(cb, osd->objecter_finishers[n]);
- } else {
- c = NULL;
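+ // bounce completion to the objecter finisher; C_SetManifestRefCountDone re-takes the pg lock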
+ unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
+ C_SetManifestRefCountDone *fin =
+ new C_SetManifestRefCountDone(cb, obc->obs.oi.soid);
+ c = new C_OnFinisher(fin, osd->objecter_finishers[n]);
}
- osd->objecter->mutate(
+ auto tid = osd->objecter->mutate(
soid.oid, oloc, obj_op, snapc,
ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
flags, c);
+ if (cb) {
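+ // track the op so completion can find it and interval change can cancel it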
+ manifest_ops[obc->obs.oi.soid] = std::make_shared<ManifestOp>(cb, tid);
+ }
}
void PrimaryLogPG::do_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc, int op_index,
// start
ctx->op_finishers[ctx->current_osd_subop_num].reset(
new SetManifestFinisher(osd_op));
- RefCountCallback *fin = new RefCountCallback(
- this, ctx, osd_op, get_last_peering_reset());
+ RefCountCallback *fin = new RefCountCallback(ctx, osd_op);
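+ // async: refcount_manifest() will complete the op via fin; return -EINPROGRESS for now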
refcount_manifest(ctx->obc, target_oloc, target, SnapContext(),
true, fin, 0);
result = -EINPROGRESS;
// start
ctx->op_finishers[ctx->current_osd_subop_num].reset(
new SetManifestFinisher(osd_op));
- RefCountCallback *fin = new RefCountCallback(
- this, ctx, osd_op, get_last_peering_reset());
+ RefCountCallback *fin = new RefCountCallback(ctx, osd_op);
refcount_manifest(ctx->obc, tgt_oloc, target, SnapContext(),
true, fin, src_offset);
result = -EINPROGRESS;
cancel_copy_ops(false, &tids);
cancel_flush_ops(false, &tids);
cancel_proxy_ops(false, &tids);
+ cancel_manifest_ops(false, &tids);
osd->objecter->op_cancel(tids, -ECANCELED);
apply_and_flush_repops(false);
cancel_copy_ops(is_primary(), &tids);
cancel_flush_ops(is_primary(), &tids);
cancel_proxy_ops(is_primary(), &tids);
+ cancel_manifest_ops(is_primary(), &tids);
osd->objecter->op_cancel(tids, -ECANCELED);
// requeue object waiters
class CopyFromCallback;
class PromoteCallback;
+struct RefCountCallback;
class PrimaryLogPG;
class PGLSFilter;
};
typedef std::shared_ptr<FlushOp> FlushOpRef;
+ friend struct RefCountCallback;
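+ // an in-flight manifest refcount mutation: its callback and the objecter tid to cancel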
+ struct ManifestOp {
+ RefCountCallback *cb;
+ ceph_tid_t objecter_tid;
+
+ ManifestOp(RefCountCallback* cb, ceph_tid_t tid)
+ : cb(cb), objecter_tid(tid) {}
+ };
+ typedef std::shared_ptr<ManifestOp> ManifestOpRef;
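+ // in-flight refcount mutations issued by refcount_manifest(), keyed by soid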
+ map<hobject_t, ManifestOpRef> manifest_ops;
+
boost::scoped_ptr<PGBackend> pgbackend;
PGBackend *get_pgbackend() override {
return pgbackend.get();
uint64_t last_offset);
void handle_manifest_flush(hobject_t oid, ceph_tid_t tid, int r,
uint64_t offset, uint64_t last_offset, epoch_t lpr);
+ void cancel_manifest_ops(bool requeue, vector<ceph_tid_t> *tids);
void refcount_manifest(ObjectContextRef obc, object_locator_t oloc, hobject_t soid,
- SnapContext snapc, bool get, Context *cb, uint64_t offset);
+ SnapContext snapc, bool get, RefCountCallback *cb, uint64_t offset);
friend struct C_ProxyChunkRead;
friend class PromoteManifestCallback;