From: Zhiqiang Wang Date: Tue, 18 Nov 2014 07:54:47 +0000 (+0800) Subject: osd: tiering: add proxy read support X-Git-Tag: v0.92~33^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=70d3d08a0b2ac2f922ee8eaf5a7f261b919e6dd4;p=ceph.git osd: tiering: add proxy read support wip 9979 Signed-off-by: Zhiqiang Wang --- diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 2de48e639c28..a35bcca7550b 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1904,6 +1904,101 @@ void ReplicatedPG::do_cache_redirect(OpRequestRef op, ObjectContextRef obc) return; } +struct C_ProxyRead : public Context { + ReplicatedPGRef pg; + hobject_t oid; + epoch_t last_peering_reset; + ceph_tid_t tid; + ReplicatedPG::ProxyReadOpRef prdop; + C_ProxyRead(ReplicatedPG *p, hobject_t o, epoch_t lpr, + const ReplicatedPG::ProxyReadOpRef& prd) + : pg(p), oid(o), last_peering_reset(lpr), + tid(0), prdop(prd) + {} + void finish(int r) { + pg->lock(); + if (last_peering_reset == pg->get_last_peering_reset()) { + pg->finish_proxy_read(oid, tid, r); + } + pg->unlock(); + } +}; + +void ReplicatedPG::do_proxy_read(OpRequestRef op) +{ + MOSDOp *m = static_cast<MOSDOp*>(op->get_req()); + object_locator_t oloc(m->get_object_locator()); + oloc.pool = pool.info.tier_of; + + hobject_t soid(m->get_oid(), + m->get_object_locator().key, + m->get_snapid(), + m->get_pg().ps(), + m->get_object_locator().get_pool(), + m->get_object_locator().nspace); + unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY; + dout(10) << __func__ << " Start proxy read for " << *m << dendl; + + ProxyReadOpRef prdop(new ProxyReadOp(op, soid, m->ops)); + + ObjectOperation obj_op; + obj_op.dup(prdop->ops); + + C_ProxyRead *fin = new C_ProxyRead(this, soid, get_last_peering_reset(), prdop); + ceph_tid_t tid = osd->objecter->read(soid.oid, oloc, obj_op, + m->get_snapid(), NULL, + flags, fin, + &prdop->user_version, &prdop->data_offset); + fin->tid = tid; + 
prdop->objecter_tid = tid; + proxyread_ops[tid] = prdop; + in_progress_proxy_reads[soid].push_back(op); +} + +void ReplicatedPG::finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r) +{ + dout(10) << __func__ << " " << oid << " tid " << tid + << " " << cpp_strerror(r) << dendl; + + map<ceph_tid_t, ProxyReadOpRef>::iterator p = proxyread_ops.find(tid); + if (p == proxyread_ops.end()) { + dout(10) << __func__ << " no proxyread_op found" << dendl; + return; + } + ProxyReadOpRef prdop = p->second; + if (tid != prdop->objecter_tid) { + dout(10) << __func__ << " tid " << tid << " != prdop " << prdop + << " tid " << prdop->objecter_tid << dendl; + return; + } + if (oid != prdop->soid) { + dout(10) << __func__ << " oid " << oid << " != prdop " << prdop + << " soid " << prdop->soid << dendl; + return; + } + proxyread_ops.erase(tid); + + map<hobject_t, list<OpRequestRef> >::iterator q = in_progress_proxy_reads.find(oid); + if (q == in_progress_proxy_reads.end()) { + dout(10) << __func__ << " no in_progress_proxy_reads found" << dendl; + return; + } + assert(q->second.size()); + OpRequestRef op = q->second.front(); + assert(op == prdop->op); + q->second.pop_front(); + if (q->second.size() == 0) { + in_progress_proxy_reads.erase(oid); + } + + MOSDOp *m = static_cast<MOSDOp*>(op->get_req()); + OpContext *ctx = new OpContext(op, m->get_reqid(), prdop->ops, this); + ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false); + ctx->user_at_version = prdop->user_version; + ctx->data_off = prdop->data_offset; + complete_read_ctx(r, ctx); +} + class PromoteCallback: public ReplicatedPG::CopyCallback { ObjectContextRef obc; ReplicatedPG *pg; @@ -5792,7 +5887,11 @@ void ReplicatedPG::complete_read_ctx(int result, OpContext *ctx) publish_stats_to_osd(); // on read, return the current object version - reply->set_reply_versions(eversion_t(), ctx->obs->oi.user_version); + if (ctx->obs) { + reply->set_reply_versions(eversion_t(), ctx->obs->oi.user_version); + } else { + reply->set_reply_versions(eversion_t(), ctx->user_at_version); + } } 
else if (result == -ENOENT) { // on ENOENT, set a floor for what the next user version will be. reply->set_enoent_reply_versions(info.last_update, info.last_user_version); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 62d9e11c0cf9..8307c86e7fef 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -210,6 +210,21 @@ public: friend class CopyFromCallback; friend class PromoteCallback; + struct ProxyReadOp { + OpRequestRef op; + hobject_t soid; + ceph_tid_t objecter_tid; + vector<OSDOp> &ops; + version_t user_version; + int data_offset; + + ProxyReadOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops) + : op(_op), soid(oid), + objecter_tid(0), ops(_ops), + user_version(0), data_offset(0) { } + }; + typedef boost::shared_ptr<ProxyReadOp> ProxyReadOpRef; + struct FlushOp { ObjectContextRef obc; ///< obc we are flushing OpRequestRef op; ///< initiating op @@ -594,6 +609,22 @@ public: snapset = &obc->ssc->snapset; } } + OpContext(OpRequestRef _op, osd_reqid_t _reqid, + vector<OSDOp>& _ops, ReplicatedPG *_pg) : + op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0), + modify(false), user_modify(false), undirty(false), cache_evict(false), + bytes_written(0), bytes_read(0), user_at_version(0), + current_osd_subop_num(0), + op_t(NULL), + data_off(0), reply(NULL), pg(_pg), + num_read(0), + num_write(0), + copy_cb(NULL), + async_read_result(0), + inflightreads(0), + lock_to_release(NONE), + on_finish(NULL), + release_snapset_obc(false) { } void reset_obs(ObjectContextRef obc) { new_obs = ObjectState(obc->obs.oi, obc->obs.exists); if (obc->ssc) { @@ -1306,6 +1337,15 @@ protected: bool pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata); int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter); + // -- proxyread -- + map<ceph_tid_t, ProxyReadOpRef> proxyread_ops; + map<hobject_t, list<OpRequestRef> > in_progress_proxy_reads; + + void do_proxy_read(OpRequestRef op); + void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r); + + friend struct C_ProxyRead; + public: ReplicatedPG(OSDService *o, 
OSDMapRef curmap, const PGPool &_pool, spg_t p); diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 79068d9bc2a8..349409b7d715 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -2974,6 +2974,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) *op->objver = m->get_user_version(); if (op->reply_epoch) *op->reply_epoch = m->get_map_epoch(); + if (op->data_offset) + *op->data_offset = m->get_header().data_off; // per-op result demuxing vector<OSDOp> out_ops; diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 5bc9c5f1fd74..761ef6331659 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -1004,6 +1004,18 @@ struct ObjectOperation { // sure older osds don't trip over an unsupported opcode. set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK); } + + void dup(vector<OSDOp>& sops) { + ops = sops; + out_bl.resize(sops.size()); + out_handler.resize(sops.size()); + out_rval.resize(sops.size()); + for (uint32_t i = 0; i < sops.size(); i++) { + out_bl[i] = &sops[i].outdata; + out_handler[i] = NULL; + out_rval[i] = &sops[i].rval; + } + } }; @@ -1165,8 +1177,10 @@ public: /// the very first OP of the series and released upon receiving the last OP reply. 
bool ctx_budgeted; + int *data_offset; + Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op, - int f, Context *ac, Context *co, version_t *ov) : + int f, Context *ac, Context *co, version_t *ov, int *offset = NULL) : session(NULL), incarnation(0), target(o, ol, f), con(NULL), @@ -1179,7 +1193,8 @@ public: map_dne_bound(0), budgeted(false), should_resend(true), - ctx_budgeted(false) { + ctx_budgeted(false), + data_offset(offset) { ops.swap(op); /* initialize out_* to match op vector */ @@ -2017,8 +2032,8 @@ public: Op *prepare_read_op(const object_t& oid, const object_locator_t& oloc, ObjectOperation& op, snapid_t snapid, bufferlist *pbl, int flags, - Context *onack, version_t *objver = NULL) { - Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onack, NULL, objver); + Context *onack, version_t *objver = NULL, int *data_offset = NULL) { + Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onack, NULL, objver, data_offset); o->priority = op.priority; o->snapid = snapid; o->outbl = pbl; @@ -2030,8 +2045,8 @@ public: ceph_tid_t read(const object_t& oid, const object_locator_t& oloc, ObjectOperation& op, snapid_t snapid, bufferlist *pbl, int flags, - Context *onack, version_t *objver = NULL) { - Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver); + Context *onack, version_t *objver = NULL, int *data_offset = NULL) { + Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver, data_offset); return op_submit(o); } ceph_tid_t pg_read(uint32_t hash, object_locator_t oloc, @@ -2210,7 +2225,7 @@ public: return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, objver); } - + // writes ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc, vector<OSDOp>& ops, utime_t mtime,