]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: tiering: add proxy read support
authorZhiqiang Wang <zhiqiang.wang@intel.com>
Tue, 18 Nov 2014 07:54:47 +0000 (15:54 +0800)
committerSage Weil <sage@redhat.com>
Sun, 11 Jan 2015 04:35:30 +0000 (20:35 -0800)
wip 9979

Signed-off-by: Zhiqiang Wang <zhiqiang.wang@intel.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 2de48e639c28e53b3780a5aeeca9a50b30ec2152..a35bcca7550b27fd1d008d958160e2cb5a3d1c45 100644 (file)
@@ -1904,6 +1904,101 @@ void ReplicatedPG::do_cache_redirect(OpRequestRef op, ObjectContextRef obc)
   return;
 }
 
+struct C_ProxyRead : public Context {
+  ReplicatedPGRef pg;
+  hobject_t oid;
+  epoch_t last_peering_reset;
+  ceph_tid_t tid;
+  ReplicatedPG::ProxyReadOpRef prdop;
+  C_ProxyRead(ReplicatedPG *p, hobject_t o, epoch_t lpr,
+            const ReplicatedPG::ProxyReadOpRef& prd)
+    : pg(p), oid(o), last_peering_reset(lpr),
+      tid(0), prdop(prd)
+  {}
+  void finish(int r) {
+    pg->lock();
+    if (last_peering_reset == pg->get_last_peering_reset()) {
+      pg->finish_proxy_read(oid, tid, r);
+    }
+    pg->unlock();
+  }
+};
+
+void ReplicatedPG::do_proxy_read(OpRequestRef op)
+{
+  MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+  object_locator_t oloc(m->get_object_locator());
+  oloc.pool = pool.info.tier_of;
+
+  hobject_t soid(m->get_oid(),
+                m->get_object_locator().key,
+                m->get_snapid(),
+                m->get_pg().ps(),
+                m->get_object_locator().get_pool(),
+                m->get_object_locator().nspace);
+  unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY;
+  dout(10) << __func__ << " Start proxy read for " << *m << dendl;
+
+  ProxyReadOpRef prdop(new ProxyReadOp(op, soid, m->ops));
+
+  ObjectOperation obj_op;
+  obj_op.dup(prdop->ops);
+
+  C_ProxyRead *fin = new C_ProxyRead(this, soid, get_last_peering_reset(), prdop);
+  ceph_tid_t tid = osd->objecter->read(soid.oid, oloc, obj_op,
+                                 m->get_snapid(), NULL,
+                                 flags, fin,
+                                 &prdop->user_version, &prdop->data_offset);
+  fin->tid = tid;
+  prdop->objecter_tid = tid;
+  proxyread_ops[tid] = prdop;
+  in_progress_proxy_reads[soid].push_back(op);
+}
+
+void ReplicatedPG::finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r)
+{
+  dout(10) << __func__ << " " << oid << " tid " << tid
+          << " " << cpp_strerror(r) << dendl;
+
+  map<ceph_tid_t, ProxyReadOpRef>::iterator p = proxyread_ops.find(tid);
+  if (p == proxyread_ops.end()) {
+    dout(10) << __func__ << " no proxyread_op found" << dendl;
+    return;
+  }
+  ProxyReadOpRef prdop = p->second;
+  if (tid != prdop->objecter_tid) {
+    dout(10) << __func__ << " tid " << tid << " != prdop " << prdop
+            << " tid " << prdop->objecter_tid << dendl;
+    return;
+  }
+  if (oid != prdop->soid) {
+    dout(10) << __func__ << " oid " << oid << " != prdop " << prdop
+            << " soid " << prdop->soid << dendl;
+    return;
+  }
+  proxyread_ops.erase(tid);
+
+  map<hobject_t, list<OpRequestRef> >::iterator q = in_progress_proxy_reads.find(oid);
+  if (q == in_progress_proxy_reads.end()) {
+    dout(10) << __func__ << " no in_progress_proxy_reads found" << dendl;
+    return;
+  }
+  assert(q->second.size());
+  OpRequestRef op = q->second.front();
+  assert(op == prdop->op);
+  q->second.pop_front();
+  if (q->second.size() == 0) {
+    in_progress_proxy_reads.erase(oid);
+  }
+
+  MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+  OpContext *ctx = new OpContext(op, m->get_reqid(), prdop->ops, this);
+  ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
+  ctx->user_at_version = prdop->user_version;
+  ctx->data_off = prdop->data_offset;
+  complete_read_ctx(r, ctx);
+}
+
 class PromoteCallback: public ReplicatedPG::CopyCallback {
   ObjectContextRef obc;
   ReplicatedPG *pg;
@@ -5792,7 +5887,11 @@ void ReplicatedPG::complete_read_ctx(int result, OpContext *ctx)
     publish_stats_to_osd();
 
     // on read, return the current object version
-    reply->set_reply_versions(eversion_t(), ctx->obs->oi.user_version);
+    if (ctx->obs) {
+      reply->set_reply_versions(eversion_t(), ctx->obs->oi.user_version);
+    } else {
+      reply->set_reply_versions(eversion_t(), ctx->user_at_version);
+    }
   } else if (result == -ENOENT) {
     // on ENOENT, set a floor for what the next user version will be.
     reply->set_enoent_reply_versions(info.last_update, info.last_user_version);
index 62d9e11c0cf917869dd37b7e312651c62c222cdd..8307c86e7fefefba7baa1b1192a2e81577df569c 100644 (file)
@@ -210,6 +210,21 @@ public:
   friend class CopyFromCallback;
   friend class PromoteCallback;
 
+  struct ProxyReadOp {
+    OpRequestRef op;
+    hobject_t soid;
+    ceph_tid_t objecter_tid;
+    vector<OSDOp> &ops;
+    version_t user_version;
+    int data_offset;
+
+    ProxyReadOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops)
+      : op(_op), soid(oid),
+        objecter_tid(0), ops(_ops),
+       user_version(0), data_offset(0) { }
+  };
+  typedef boost::shared_ptr<ProxyReadOp> ProxyReadOpRef;
+
   struct FlushOp {
     ObjectContextRef obc;       ///< obc we are flushing
     OpRequestRef op;            ///< initiating op
@@ -594,6 +609,22 @@ public:
        snapset = &obc->ssc->snapset;
       }
     }
+    OpContext(OpRequestRef _op, osd_reqid_t _reqid,
+              vector<OSDOp>& _ops, ReplicatedPG *_pg) :
+      op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0),
+      modify(false), user_modify(false), undirty(false), cache_evict(false),
+      bytes_written(0), bytes_read(0), user_at_version(0),
+      current_osd_subop_num(0),
+      op_t(NULL),
+      data_off(0), reply(NULL), pg(_pg),
+      num_read(0),
+      num_write(0),
+      copy_cb(NULL),
+      async_read_result(0),
+      inflightreads(0),
+      lock_to_release(NONE),
+      on_finish(NULL),
+      release_snapset_obc(false) { }
     void reset_obs(ObjectContextRef obc) {
       new_obs = ObjectState(obc->obs.oi, obc->obs.exists);
       if (obc->ssc) {
@@ -1306,6 +1337,15 @@ protected:
   bool pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata);
   int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter);
 
+  // -- proxyread --
+  map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;
+  map<hobject_t, list<OpRequestRef> > in_progress_proxy_reads;
+
+  void do_proxy_read(OpRequestRef op);
+  void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
+
+  friend struct C_ProxyRead;
+
 public:
   ReplicatedPG(OSDService *o, OSDMapRef curmap,
               const PGPool &_pool, spg_t p);
index 79068d9bc2a841b009932a5da5ab9450a6fed2b6..349409b7d715cdf75d34790bd62ab63876b5c010 100644 (file)
@@ -2974,6 +2974,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     *op->objver = m->get_user_version();
   if (op->reply_epoch)
     *op->reply_epoch = m->get_map_epoch();
+  if (op->data_offset)
+    *op->data_offset = m->get_header().data_off;
 
   // per-op result demuxing
   vector<OSDOp> out_ops;
index 5bc9c5f1fd74cd94482c148edff700a150e49220..761ef6331659bdf5a71d084e880dc641478195af 100644 (file)
@@ -1004,6 +1004,18 @@ struct ObjectOperation {
     // sure older osds don't trip over an unsupported opcode.
     set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
   }
+
+  void dup(vector<OSDOp>& sops) {
+    ops = sops;
+    out_bl.resize(sops.size());
+    out_handler.resize(sops.size());
+    out_rval.resize(sops.size());
+    for (uint32_t i = 0; i < sops.size(); i++) {
+      out_bl[i] = &sops[i].outdata;
+      out_handler[i] = NULL;
+      out_rval[i] = &sops[i].rval;
+    }
+  }
 };
 
 
@@ -1165,8 +1177,10 @@ public:
     /// the very first OP of the series and released upon receiving the last OP reply.
     bool ctx_budgeted;
 
+    int *data_offset;
+
     Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
-       int f, Context *ac, Context *co, version_t *ov) :
+       int f, Context *ac, Context *co, version_t *ov, int *offset = NULL) :
       session(NULL), incarnation(0),
       target(o, ol, f),
       con(NULL),
@@ -1179,7 +1193,8 @@ public:
       map_dne_bound(0),
       budgeted(false),
       should_resend(true),
-      ctx_budgeted(false) {
+      ctx_budgeted(false),
+      data_offset(offset) {
       ops.swap(op);
       
       /* initialize out_* to match op vector */
@@ -2017,8 +2032,8 @@ public:
   Op *prepare_read_op(const object_t& oid, const object_locator_t& oloc,
             ObjectOperation& op,
             snapid_t snapid, bufferlist *pbl, int flags,
-            Context *onack, version_t *objver = NULL) {
-    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onack, NULL, objver);
+            Context *onack, version_t *objver = NULL, int *data_offset = NULL) {
+    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onack, NULL, objver, data_offset);
     o->priority = op.priority;
     o->snapid = snapid;
     o->outbl = pbl;
@@ -2030,8 +2045,8 @@ public:
   ceph_tid_t read(const object_t& oid, const object_locator_t& oloc,
             ObjectOperation& op,
             snapid_t snapid, bufferlist *pbl, int flags,
-            Context *onack, version_t *objver = NULL) {
-    Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver);
+            Context *onack, version_t *objver = NULL, int *data_offset = NULL) {
+    Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver, data_offset);
     return op_submit(o);
   }
   ceph_tid_t pg_read(uint32_t hash, object_locator_t oloc,
@@ -2210,7 +2225,7 @@ public:
     return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, objver);
   }
 
-     
+
   // writes
   ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc,
                vector<OSDOp>& ops, utime_t mtime,