]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ec: add support for fast read on PGBackend/ECBackend async read
authorGuang Yang <yguang@yahoo-inc.com>
Wed, 15 Jul 2015 21:33:07 +0000 (21:33 +0000)
committerDavid Zafman <dzafman@redhat.com>
Thu, 27 Aug 2015 21:01:56 +0000 (14:01 -0700)
Extend the PGBackend::objects_read_async interface to support *fast read*,
add the implemenation for ECBackend, in which we issue redundant reads,
and use the first returned (to decode) to serve clients.

Signed-off-by: Guang Yang <yguang@yahoo-inc.com>
src/osd/ECBackend.cc
src/osd/ECBackend.h
src/osd/PGBackend.h
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedBackend.h

index 47f326efaf17679b3b08f5156b2ee4908cf9d9a4..50e8a5b33c52ad634e7da38ae2b317ff46cfbee5 100644 (file)
@@ -469,7 +469,8 @@ void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
   start_read_op(
     priority,
     m.reads,
-    OpRequestRef());
+    OpRequestRef(),
+    false);
 }
 
 void ECBackend::continue_recovery_op(
@@ -487,7 +488,7 @@ void ECBackend::continue_recovery_op(
       set<pg_shard_t> to_read;
       uint64_t recovery_max_chunk = get_recovery_chunk_size();
       int r = get_min_avail_to_read_shards(
-       op.hoid, want, true, &to_read);
+       op.hoid, want, true, false, &to_read);
       if (r != 0) {
        // we must have lost a recovery source
        assert(!op.recovery_progress.first);
@@ -1024,11 +1025,35 @@ shard_to_read_map.find(from);
 
   assert(rop.in_progress.count(from));
   rop.in_progress.erase(from);
+  bool is_complete = true;
   if (!rop.in_progress.empty()) {
-    dout(10) << __func__ << " readop not complete: " << rop << dendl;
-  } else {
-    dout(10) << __func__ << " readop complete: " << rop << dendl;
+    if (rop.do_redundant_reads) {
+      for (map<hobject_t, read_result_t>::const_iterator iter =
+          rop.complete.begin();
+        iter != rop.complete.end();
+        ++iter) {
+        set<int> have;
+        for (map<pg_shard_t, bufferlist>::const_iterator j =
+            iter->second.returned.front().get<2>().begin();
+          j != iter->second.returned.front().get<2>().end();
+          ++j) {
+          have.insert(j->first.shard);
+        }
+        set<int> want_to_read, dummy_minimum;
+        get_want_to_read_shards(&want_to_read);
+        if (ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum) < 0) {
+          is_complete = false;
+          break;
+        }
+      }
+    } else {
+      is_complete = false;
+    }
+  }
+  if (is_complete) {
     complete_read_op(rop, m);
+  } else {
+    dout(10) << __func__ << " readop not complete: " << rop << dendl;
   }
 }
 
@@ -1317,8 +1342,12 @@ int ECBackend::get_min_avail_to_read_shards(
   const hobject_t &hoid,
   const set<int> &want,
   bool for_recovery,
+  bool do_redundant_reads,
   set<pg_shard_t> *to_read)
 {
+  // Make sure we don't do redundant reads for recovery
+  assert(!for_recovery || !do_redundant_reads);
+
   map<hobject_t, set<pg_shard_t>, hobject_t::BitwiseComparator>::const_iterator miter =
     get_parent()->get_missing_loc_shards().find(hoid);
 
@@ -1380,6 +1409,10 @@ int ECBackend::get_min_avail_to_read_shards(
   if (r < 0)
     return r;
 
+  if (do_redundant_reads) {
+      need.swap(have);
+  } 
+
   if (!to_read)
     return 0;
 
@@ -1395,7 +1428,8 @@ int ECBackend::get_min_avail_to_read_shards(
 void ECBackend::start_read_op(
   int priority,
   map<hobject_t, read_request_t, hobject_t::BitwiseComparator> &to_read,
-  OpRequestRef _op)
+  OpRequestRef _op,
+  bool do_redundant_reads)
 {
   ceph_tid_t tid = get_parent()->get_tid();
   assert(!tid_to_read_map.count(tid));
@@ -1404,6 +1438,7 @@ void ECBackend::start_read_op(
   op.tid = tid;
   op.to_read.swap(to_read);
   op.op = _op;
+  op.do_redundant_reads = do_redundant_reads;
   dout(10) << __func__ << ": starting " << op << dendl;
 
   map<pg_shard_t, ECSubRead> messages;
@@ -1683,7 +1718,8 @@ void ECBackend::objects_read_async(
   const hobject_t &hoid,
   const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                  pair<bufferlist*, Context*> > > &to_read,
-  Context *on_complete)
+  Context *on_complete,
+  bool fast_read)
 {
   in_progress_client_reads.push_back(ClientAsyncReadStatus(on_complete));
   CallClientContexts *c = new CallClientContexts(
@@ -1700,17 +1736,15 @@ void ECBackend::objects_read_async(
     offsets.push_back(boost::make_tuple(tmp.first, tmp.second, i->first.get<2>()));
   }
 
-  const vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();
   set<int> want_to_read;
-  for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
-    int chunk = (int)chunk_mapping.size() > i ? chunk_mapping[i] : i;
-    want_to_read.insert(chunk);
-  }
+  get_want_to_read_shards(&want_to_read);
+    
   set<pg_shard_t> shards;
   int r = get_min_avail_to_read_shards(
     hoid,
     want_to_read,
     false,
+    fast_read,
     &shards);
   assert(r == 0);
 
@@ -1728,7 +1762,8 @@ void ECBackend::objects_read_async(
   start_read_op(
     cct->_conf->osd_client_op_priority,
     for_read_op,
-    OpRequestRef());
+    OpRequestRef(),
+    fast_read);
   return;
 }
 
index e3378595ba45736a490999c2122d1eb7610743fc..8f5201c5d26be056c1670aec13c203909d08f1d8 100644 (file)
@@ -145,7 +145,8 @@ public:
     const hobject_t &hoid,
     const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                    pair<bufferlist*, Context*> > > &to_read,
-    Context *on_complete);
+    Context *on_complete,
+    bool fast_read = false);
 
 private:
   friend struct ECRecoveryHandle;
@@ -154,6 +155,14 @@ private:
                        sinfo.get_stripe_width());
   }
 
+  void get_want_to_read_shards(set<int> *want_to_read) const {
+    const vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();
+    for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
+      int chunk = (int)chunk_mapping.size() > i ? chunk_mapping[i] : i;
+      want_to_read->insert(chunk);
+    }
+  }
+
   /**
    * Recovery
    *
@@ -284,6 +293,10 @@ public:
     int priority;
     ceph_tid_t tid;
     OpRequestRef op; // may be null if not on behalf of a client
+    // True if redundant reads are issued, false otherwise,
+    // this is useful to tradeoff some resources (redundant ops) for
+    // low latency read, especially on relatively idle cluster
+    bool do_redundant_reads;
 
     map<hobject_t, read_request_t, hobject_t::BitwiseComparator> to_read;
     map<hobject_t, read_result_t, hobject_t::BitwiseComparator> complete;
@@ -306,7 +319,8 @@ public:
   void start_read_op(
     int priority,
     map<hobject_t, read_request_t, hobject_t::BitwiseComparator> &to_read,
-    OpRequestRef op);
+    OpRequestRef op,
+    bool do_redundant_reads);
 
 
   /**
@@ -452,6 +466,7 @@ public:
     const hobject_t &hoid,     ///< [in] object
     const set<int> &want,      ///< [in] desired shards
     bool for_recovery,         ///< [in] true if we may use non-acting replicas
+    bool do_redundant_reads,   ///< [in] true if we want to issue redundant reads to reduce latency
     set<pg_shard_t> *to_read   ///< [out] shards to read
     ); ///< @return error code, 0 on success
 
index bb463ee07eb8c162efbdf5f0e0b14db6c05e76a5..52599942b439d0fba2c90c2bf89f364d25ed5b1b 100644 (file)
      const hobject_t &hoid,
      const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                pair<bufferlist*, Context*> > > &to_read,
-     Context *on_complete) = 0;
+     Context *on_complete, bool fast_read = false) = 0;
 
    virtual bool scrub_supported() { return false; }
    void be_scan_list(
index 5ddc9fd311efce061cc5109525e367fafe077411..7d344e15df0f3bd261efa9cab24489ebb4205616 100644 (file)
@@ -278,8 +278,12 @@ void ReplicatedBackend::objects_read_async(
   const hobject_t &hoid,
   const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                  pair<bufferlist*, Context*> > > &to_read,
-  Context *on_complete)
+  Context *on_complete,
+  bool fast_read)
 {
+  // There is no fast read implementation for replication backend yet
+  assert(!fast_read);
+
   int r = 0;
   for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
                 pair<bufferlist*, Context*> > >::const_iterator i =
index 155abfb881de87a3a33afb890818e7c061a4394d..a36007d42c287fc6f0121d58754d8dc74e003a5d 100644 (file)
@@ -159,7 +159,8 @@ public:
     const hobject_t &hoid,
     const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
               pair<bufferlist*, Context*> > > &to_read,
-    Context *on_complete);
+               Context *on_complete,
+               bool fast_read = false);
 
 private:
   // push