]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: support async reads on ec pools
authorSamuel Just <sam.just@inktank.com>
Fri, 6 Dec 2013 21:54:04 +0000 (13:54 -0800)
committerSamuel Just <sam.just@inktank.com>
Wed, 22 Jan 2014 22:39:17 +0000 (14:39 -0800)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/PGBackend.h
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedBackend.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 0b750420519a1b07eb8750c8a056cc9dff439e8f..cda7db383f373a55df55fcd266f65b6532f6d34c 100644 (file)
 
    virtual void objects_read_async(
      const hobject_t &hoid,
-     uint64_t off,
-     uint64_t len,
-     bufferlist *bl,
+     const list<pair<pair<uint64_t, uint64_t>,
+               pair<bufferlist*, Context*> > > &to_read,
      Context *on_complete) = 0;
  };
 
index 6023654162554e93d3b8f49ecb306885c45a4cd1..1d1809e794e98743f6021372f24e876529de1ff4 100644 (file)
@@ -347,12 +347,26 @@ struct AsyncReadCallback : public GenContext<ThreadPool::TPHandle&> {
 };
 void ReplicatedBackend::objects_read_async(
   const hobject_t &hoid,
-  uint64_t off,
-  uint64_t len,
-  bufferlist *bl,
+  const list<pair<pair<uint64_t, uint64_t>,
+                 pair<bufferlist*, Context*> > > &to_read,
   Context *on_complete)
 {
-  int r = osd->store->read(coll, hoid, off, len, *bl);
+  int r = 0;
+  for (list<pair<pair<uint64_t, uint64_t>,
+                pair<bufferlist*, Context*> > >::const_iterator i =
+          to_read.begin();
+       i != to_read.end() && r >= 0;
+       ++i) {
+    int _r = osd->store->read(coll, hoid, i->first.first,
+                             i->first.second, *(i->second.first));
+    if (i->second.second) {
+      osd->gen_wq.queue(
+       get_parent()->bless_gencontext(
+         new AsyncReadCallback(_r, i->second.second)));
+    }
+    if (_r < 0)
+      r = _r;
+  }
   osd->gen_wq.queue(
     get_parent()->bless_gencontext(
       new AsyncReadCallback(r, on_complete)));
index e967d7dbcbd500fc8438e6afc422be809209ae1a..4f4ae6454af66721a589330957d8f8682546b671 100644 (file)
@@ -184,9 +184,8 @@ public:
 
   void objects_read_async(
     const hobject_t &hoid,
-    uint64_t off,
-    uint64_t len,
-    bufferlist *bl,
+    const list<pair<pair<uint64_t, uint64_t>,
+              pair<bufferlist*, Context*> > > &to_read,
     Context *on_complete);
 
 private:
index 013b350b9a6c482b92c53a499ff483666e3c072b..03bd4e4057a9a3e86afdedaab56fe05ca7207a55 100644 (file)
@@ -101,6 +101,41 @@ static void log_subop_stats(
   osd->logger->tinc(tag_lat, latency);
 }
 
+struct OnReadComplete : public Context {
+  ReplicatedPG *pg;
+  ReplicatedPG::OpContext *opcontext;
+  OnReadComplete(
+    ReplicatedPG *pg,
+    ReplicatedPG::OpContext *ctx) : pg(pg), opcontext(ctx) {}
+  void finish(int r) {
+    if (r < 0)
+      opcontext->async_read_result = r;
+    opcontext->finish_read(pg);
+  }
+  ~OnReadComplete() {}
+};
+
+// OpContext
+void ReplicatedPG::OpContext::start_async_reads(ReplicatedPG *pg)
+{
+  inflightreads = 1;
+  pg->pgbackend->objects_read_async(
+    obc->obs.oi.soid,
+    pending_async_reads,
+    new OnReadComplete(pg, this));
+  pending_async_reads.clear();
+}
+void ReplicatedPG::OpContext::finish_read(ReplicatedPG *pg)
+{
+  assert(inflightreads > 0);
+  --inflightreads;
+  if (async_reads_complete()) {
+    set<OpContext*>::iterator iter = pg->in_progress_async_reads.find(this);
+    assert(iter != pg->in_progress_async_reads.end());
+    pg->in_progress_async_reads.erase(iter);
+    pg->complete_read_ctx(async_read_result, this);
+  }
+}
 
 class CopyFromCallback: public ReplicatedPG::CopyCallback {
 public:
@@ -146,8 +181,6 @@ public:
   }
 };
 
-
-
 // ======================
 // PGBackend::Listener
 
@@ -1621,11 +1654,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
   // possible to construct an operation that does a read, does a guard
   // check (e.g., CMPXATTR), and then a write.  Then we either succeed
   // with the write, or return a CMPXATTR and the read value.
-  if ((ctx->op_t->empty() && !ctx->modify) || result < 0) {
-    // read.
-    ctx->reply->claim_op_out_data(ctx->ops);
-    ctx->reply->get_header().data_off = ctx->data_off;
-  } else {
+  if (!((ctx->op_t->empty() && !ctx->modify) || result < 0)) {
     // write.  normalize the result code.
     if (result > 0) {
       dout(20) << " zeroing write result code " << result << dendl;
@@ -1636,23 +1665,12 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
 
   // read or error?
   if (ctx->op_t->empty() || result < 0) {
-    MOSDOpReply *reply = ctx->reply;
-    ctx->reply = NULL;
-
-    if (result >= 0) {
-      log_op_stats(ctx);
-      publish_stats_to_osd();
-
-      // on read, return the current object version
-      reply->set_reply_versions(eversion_t(), ctx->obs->oi.user_version);
-    } else if (result == -ENOENT) {
-      // on ENOENT, set a floor for what the next user version will be. 
-      reply->set_enoent_reply_versions(info.last_update, info.last_user_version);
+    if (ctx->pending_async_reads.empty()) {
+      complete_read_ctx(result, ctx);
+    } else {
+      in_progress_async_reads.insert(ctx);
+      ctx->start_async_reads(this);
     }
-    
-    reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
-    osd->send_message_osd_client(reply, m->get_connection());
-    close_op_ctx(ctx);
     return;
   }
 
@@ -2718,6 +2736,16 @@ static int check_offset_and_length(uint64_t offset, uint64_t length, uint64_t ma
   return 0;
 }
 
+struct FillInExtent : public Context {
+  ceph_le64 *r;
+  FillInExtent(ceph_le64 *r) : r(r) {}
+  void finish(int _r) {
+    if (_r >= 0) {
+      *r = _r;
+    }
+  }
+};
+
 int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 {
   int result = 0;
@@ -2802,25 +2830,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
     case CEPH_OSD_OP_READ:
       ++ctx->num_read;
       {
-       // read into a buffer
-       bufferlist bl;
-       int r = pgbackend->objects_read_sync(
-         soid, op.extent.offset, op.extent.length, &bl);
-       if (first_read) {
-         first_read = false;
-         ctx->data_off = op.extent.offset;
-       }
-       osd_op.outdata.claim_append(bl);
-       if (r >= 0) 
-         op.extent.length = r;
-       else {
-         result = r;
-         op.extent.length = 0;
-       }
-       ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
-       ctx->delta_stats.num_rd++;
-       dout(10) << " read got " << r << " / " << op.extent.length << " bytes from obj " << soid << dendl;
-
        __u32 seq = oi.truncate_seq;
        // are we beyond truncate_size?
        if ( (seq < op.extent.truncate_seq) &&
@@ -2832,15 +2841,35 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          unsigned trim = to-from;
 
          op.extent.length = op.extent.length - trim;
+       }
 
-         bufferlist keep;
-
-         // keep first part of osd_op.outdata; trim at truncation point
-         dout(10) << " obj " << soid << " seq " << seq
-                  << ": trimming overlap " << from << "~" << trim << dendl;
-         keep.substr_of(osd_op.outdata, 0, osd_op.outdata.length() - trim);
-          osd_op.outdata.claim(keep);
+       // read into a buffer
+       bufferlist bl;
+       if (pool.info.ec_pool()) {
+         ctx->pending_async_reads.push_back(
+           make_pair(
+             make_pair(op.extent.offset, op.extent.length),
+             make_pair(&osd_op.outdata, new FillInExtent(&op.extent.length))));
+         dout(10) << " async_read noted for " << soid << dendl;
+       } else {
+         int r = pgbackend->objects_read_sync(
+           soid, op.extent.offset, op.extent.length, &osd_op.outdata);
+         if (r >= 0)
+           op.extent.length = r;
+         else {
+           result = r;
+           op.extent.length = 0;
+         }
+         dout(10) << " read got " << r << " / " << op.extent.length
+                  << " bytes from obj " << soid << dendl;
+       }
+       if (first_read) {
+         first_read = false;
+         ctx->data_off = op.extent.offset;
        }
+       ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
+       ctx->delta_stats.num_rd++;
+
       }
       break;
 
@@ -4971,6 +5000,32 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
   }
 }
 
+void ReplicatedPG::complete_read_ctx(int result, OpContext *ctx)
+{
+  MOSDOp *m = static_cast<MOSDOp*>(ctx->op->get_req());
+  assert(ctx->async_reads_complete());
+  ctx->reply->claim_op_out_data(ctx->ops);
+  ctx->reply->get_header().data_off = ctx->data_off;
+
+  MOSDOpReply *reply = ctx->reply;
+  ctx->reply = NULL;
+
+  if (result >= 0) {
+    log_op_stats(ctx);
+    publish_stats_to_osd();
+
+    // on read, return the current object version
+    reply->set_reply_versions(eversion_t(), ctx->obs->oi.user_version);
+  } else if (result == -ENOENT) {
+    // on ENOENT, set a floor for what the next user version will be.
+    reply->set_enoent_reply_versions(info.last_update, info.last_user_version);
+  }
+
+  reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+  osd->send_message_osd_client(reply, m->get_connection());
+  close_op_ctx(ctx);
+}
+
 // ========================================================================
 // copyfrom
 
@@ -8600,6 +8655,12 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
 
   context_registry_on_change();
 
+  for (set<OpContext*>::iterator i = in_progress_async_reads.begin();
+       i != in_progress_async_reads.end();
+       in_progress_async_reads.erase(i++)) {
+    close_op_ctx(*i);
+  }
+
   cancel_copy_ops(is_primary());
   cancel_flush_ops(is_primary());
 
index 6cf4af74fe387bb72c5d6f38fe48d55f2bd2dff6..cb1c7327a0a13381090a9c02ff8c87ca82f6959f 100644 (file)
@@ -449,6 +449,18 @@ public:
       pending_attrs.clear();
     }
 
+    // pending async reads <off, len> -> <outbl, outr>
+    list<pair<pair<uint64_t, uint64_t>,
+             pair<bufferlist*, Context*> > > pending_async_reads;
+    int async_read_result;
+    unsigned inflightreads;
+    friend struct OnReadComplete;
+    void start_async_reads(ReplicatedPG *pg);
+    void finish_read(ReplicatedPG *pg);
+    bool async_reads_complete() {
+      return inflightreads == 0;
+    }
+
     ObjectModDesc mod_desc;
 
     enum { W_LOCK, R_LOCK, NONE } lock_to_release;
@@ -469,6 +481,8 @@ public:
       num_read(0),
       num_write(0),
       copy_cb(NULL),
+      async_read_result(0),
+      inflightreads(0),
       lock_to_release(NONE) {
       if (_ssc) {
        new_snapset = _ssc->snapset;
@@ -487,8 +501,16 @@ public:
       assert(lock_to_release == NONE);
       if (reply)
        reply->put();
+      for (list<pair<pair<uint64_t, uint64_t>,
+                    pair<bufferlist*, Context*> > >::iterator i =
+            pending_async_reads.begin();
+          i != pending_async_reads.end();
+          pending_async_reads.erase(i++)) {
+       delete i->second.second;
+      }
     }
   };
+  friend class OpContext;
 
   /*
    * State on the PG primary associated with the replicated mutation
@@ -869,6 +891,8 @@ protected:
   bool can_skip_promote(OpRequestRef op, ObjectContextRef obc);
 
   int prepare_transaction(OpContext *ctx);
+  set<OpContext*> in_progress_async_reads;
+  void complete_read_ctx(int result, OpContext *ctx);
   
   // pg on-disk content
   void check_local();