ECBackend: integrate cache and rmw pipeline
author Samuel Just <sjust@redhat.com>
Tue, 15 Nov 2016 23:48:21 +0000 (15:48 -0800)
committer Samuel Just <sjust@redhat.com>
Thu, 17 Nov 2016 18:41:33 +0000 (10:41 -0800)
Implements the rmw pipeline and integrates the cache.

HashInfo now maintains a projected size for use during the planning
phase of the pipeline.

(Doesn't build without subsequent patches, not worth stubbing out
the interfaces)
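
A minimal sketch of the pipeline this patch introduces, assuming toy
stand-in types.  Only the stage names (waiting_state, waiting_reads,
waiting_commit) and the shape of the check_ops() loop are taken from
the patch below; nothing else here is the real ECBackend code.

// Toy model of the three-stage write pipeline driven by check_ops().
#include <deque>
#include <iostream>

struct ToyOp { int tid; bool reads_done; bool commits_done; };

std::deque<ToyOp*> waiting_state, waiting_reads, waiting_commit;

bool try_state_to_reads() {
  if (waiting_state.empty()) return false;
  ToyOp *op = waiting_state.front();    // real code: plan the rmw, reserve
  waiting_state.pop_front();            // cache extents, start remote reads
  waiting_reads.push_back(op);
  return true;
}
bool try_reads_to_commit() {
  if (waiting_reads.empty()) return false;
  ToyOp *op = waiting_reads.front();
  if (!op->reads_done) return false;    // front op still waits on stripe reads
  waiting_reads.pop_front();            // real code: encode, send sub-writes
  waiting_commit.push_back(op);
  return true;
}
bool try_finish_rmw() {
  if (waiting_commit.empty()) return false;
  ToyOp *op = waiting_commit.front();
  if (!op->commits_done) return false;  // not every shard has committed yet
  waiting_commit.pop_front();           // real code: release the cache pin
  std::cout << "op " << op->tid << " finished" << std::endl;
  return true;
}
void check_ops() {                      // same loop shape as the patch
  while (try_state_to_reads() ||
         try_reads_to_commit() ||
         try_finish_rmw());
}

int main() {
  ToyOp op = {1, true, true};           // reads/commits already satisfied
  waiting_state.push_back(&op);
  check_ops();                          // op flows through all three stages
}

Because each try_* helper only ever inspects the front of its waitlist,
writes complete strictly in submission order, which is what lets the
callbacks queued by call_write_ordered() fire in order.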

Signed-off-by: Samuel Just <sjust@redhat.com>
ceph-erasure-code-corpus
src/common/buffer.cc
src/include/buffer.h
src/osd/ECBackend.cc
src/osd/ECBackend.h
src/osd/ECTransaction.cc
src/osd/ECTransaction.h
src/osd/ECUtil.cc
src/osd/ECUtil.h
src/osd/ReplicatedPG.cc
src/osd/osd_types.h

index 0b00610443a916fabc6668c03337f64d1f773ec9..b5c863495c16975478aa5fc2ca33293c2e0c1a5f 160000 (submodule)
@@ -1 +1 @@
-Subproject commit 0b00610443a916fabc6668c03337f64d1f773ec9
+Subproject commit b5c863495c16975478aa5fc2ca33293c2e0c1a5f
index 892c17f8fe7899b8f54caee830d3d5ec972a2795..bf72ed7d5ad2735f37139da711d30bf64838afe9 100644 (file)
@@ -1870,6 +1870,14 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
        append("\n", 1);
     }
   }
+
+  void buffer::list::prepend_zero(unsigned len)
+  {
+    ptr bp(len);
+    bp.zero(false);
+    _len += len;
+    _buffers.emplace_front(std::move(bp));
+  }
   
   void buffer::list::append_zero(unsigned len)
   {
index 4c05e628dbee1647c7f2e2d7f582d1c80d8a7432..1e30e3e2b6d1afb112741e362650ebe7f75a13af 100644 (file)
@@ -832,6 +832,7 @@ namespace buffer CEPH_BUFFER_API {
     void append(const list& bl);
     void append(std::istream& in);
     void append_zero(unsigned len);
+    void prepend_zero(unsigned len);
     
     /*
      * get a char
index 389200dfc79514c912ae9597af0c00750069cfcf..fe2260ae08607bf11ce3067c848d0c93f7476bca 100644 (file)
@@ -38,6 +38,18 @@ struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
   list<ECBackend::RecoveryOp> ops;
 };
 
+ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
+  switch (rhs.pipeline_state) {
+  case ECBackend::pipeline_state_t::CACHE_VALID:
+    return lhs << "CACHE_VALID";
+  case ECBackend::pipeline_state_t::CACHE_INVALID:
+    return lhs << "CACHE_INVALID";
+  default:
+    assert(0 == "invalid pipeline state");
+  }
+  return lhs; // unreachable
+}
+
 static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
 {
   lhs << "[";
@@ -132,8 +144,17 @@ ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
     lhs << " client_op=";
     rhs.client_op->get_req()->print(lhs);
   }
-  lhs << " pending_commit=" << rhs.pending_commit
+  lhs << " roll_forward_to=" << rhs.roll_forward_to
+      << " temp_added=" << rhs.temp_added
+      << " pending_commit=" << rhs.pending_commit
+      << " temp_cleared=" << rhs.temp_cleared
+      << " pending_read=" << rhs.pending_read
+      << " remote_read=" << rhs.remote_read
+      << " remote_read_result=" << rhs.remote_read_result
       << " pending_apply=" << rhs.pending_apply
+      << " pending_commit=" << rhs.pending_commit
+      << " plan.to_read=" << rhs.plan.to_read
+      << " plan.will_write=" << rhs.plan.will_write
       << ")";
   return lhs;
 }
@@ -509,8 +530,18 @@ void ECBackend::continue_recovery_op(
       op.state = RecoveryOp::READING;
       assert(!op.recovery_progress.data_complete);
       set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
+      uint64_t from = op.recovery_progress.data_recovered_to;
+      uint64_t amount = get_recovery_chunk_size();
+
+      if (op.recovery_progress.first && op.obc) {
+       /* We've got the attrs and the hinfo, might as well use them */
+       op.hinfo = get_hash_info(op.hoid);
+       assert(op.hinfo);
+       op.xattrs = op.obc->attr_cache;
+       ::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
+      }
+
       set<pg_shard_t> to_read;
-      uint64_t recovery_max_chunk = get_recovery_chunk_size();
       int r = get_min_avail_to_read_shards(
        op.hoid, want, true, false, &to_read);
       if (r != 0) {
@@ -526,11 +557,12 @@ void ECBackend::continue_recovery_op(
        this,
        op.hoid,
        op.recovery_progress.data_recovered_to,
-       recovery_max_chunk,
+       amount,
        to_read,
-       op.recovery_progress.first);
-      op.extent_requested = make_pair(op.recovery_progress.data_recovered_to,
-                                     recovery_max_chunk);
+       op.recovery_progress.first && !op.obc);
+      op.extent_requested = make_pair(
+       from,
+       amount);
       dout(10) << __func__ << ": IDLE return " << op << dendl;
       return;
     }
@@ -898,8 +930,8 @@ void ECBackend::handle_sub_write(
       new SubWriteApplied(this, msg, op.tid, op.at_version)));
   vector<ObjectStore::Transaction> tls;
   tls.reserve(2);
-  tls.push_back(std::move(localt));
   tls.push_back(std::move(op.t));
+  tls.push_back(std::move(localt));
   get_parent()->queue_transactions(tls, msg);
 }
 
@@ -914,12 +946,15 @@ void ECBackend::handle_sub_read(
       i != op.to_read.end();
       ++i) {
     int r = 0;
-    ECUtil::HashInfoRef hinfo = get_hash_info(i->first);
-    if (!hinfo) {
-      r = -EIO;
-      get_parent()->clog_error() << __func__ << ": No hinfo for " << i->first << "\n";
-      dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
-      goto error;
+    ECUtil::HashInfoRef hinfo;
+    if (!get_parent()->get_pool().is_hacky_ecoverwrites()) {
+      hinfo = get_hash_info(i->first);
+      if (!hinfo) {
+       r = -EIO;
+       get_parent()->clog_error() << __func__ << ": No hinfo for " << i->first << "\n";
+       dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
+       goto error;
+      }
     }
     for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::iterator j =
           i->second.begin(); j != i->second.end(); ++j) {
@@ -948,22 +983,25 @@ void ECBackend::handle_sub_read(
          );
       }
 
-      // This shows that we still need deep scrub because large enough files
-      // are read in sections, so the digest check here won't be done here.
-      // Do NOT check osd_read_eio_on_bad_digest here.  We need to report
-      // the state of our chunk in case other chunks could substitute.
-      if ((bl.length() == hinfo->get_total_chunk_size()) &&
-         (j->get<0>() == 0)) {
-       dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
-       bufferhash h(-1);
-       h << bl;
-       if (h.digest() != hinfo->get_chunk_hash(shard)) {
-         get_parent()->clog_error() << __func__ << ": Bad hash for " << i->first << " digest 0x"
-                 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << "\n";
-         dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
-                 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
-         r = -EIO;
-         goto error;
+      if (!get_parent()->get_pool().is_hacky_ecoverwrites()) {
+       // This shows that we still need deep scrub because large enough files
+       // are read in sections, so the digest check won't always be done here.
+       // Do NOT check osd_read_eio_on_bad_digest here.  We need to report
+       // the state of our chunk in case other chunks could substitute.
+       assert(hinfo->has_chunk_hash());
+       if ((bl.length() == hinfo->get_total_chunk_size()) &&
+           (j->get<0>() == 0)) {
+         dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
+         bufferhash h(-1);
+         h << bl;
+         if (h.digest() != hinfo->get_chunk_hash(shard)) {
+           get_parent()->clog_error() << __func__ << ": Bad hash for " << i->first << " digest 0x"
+                                      << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << "\n";
+           dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
+                   << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
+           r = -EIO;
+           goto error;
+         }
        }
       }
     }
@@ -1012,7 +1050,18 @@ void ECBackend::handle_sub_write_reply(
     assert(i->second.pending_apply.count(from));
     i->second.pending_apply.erase(from);
   }
-  check_op(&(i->second));
+
+  if (i->second.pending_apply.empty() && i->second.on_all_applied) {
+    dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl;
+    i->second.on_all_applied->complete(0);
+    i->second.on_all_applied = 0;
+  }
+  if (i->second.pending_commit.empty() && i->second.on_all_commit) {
+    dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
+    i->second.on_all_commit->complete(0);
+    i->second.on_all_commit = 0;
+  }
+  check_ops();
 }
 
 void ECBackend::handle_sub_read_reply(
@@ -1275,7 +1324,18 @@ void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
 void ECBackend::on_change()
 {
   dout(10) << __func__ << dendl;
-  writing.clear();
+
+  completed_to = eversion_t();
+  committed_to = eversion_t();
+  pipeline_state.clear();
+  waiting_reads.clear();
+  waiting_state.clear();
+  waiting_commit.clear();
+  for (auto &&op: tid_to_op_map) {
+    cache.release_write_pin(op.second.pin);
+  }
+  tid_to_op_map.clear();
+
-  tid_to_op_map.clear();
   for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
        i != tid_to_read_map.end();
@@ -1290,12 +1350,6 @@ void ECBackend::on_change()
     }
   }
   tid_to_read_map.clear();
-  for (list<ClientAsyncReadStatus>::iterator i = in_progress_client_reads.begin();
-       i != in_progress_client_reads.end();
-       ++i) {
-    delete i->on_complete;
-    i->on_complete = NULL;
-  }
   in_progress_client_reads.clear();
   shard_to_read_map.clear();
   clear_recovery_state();
@@ -1334,8 +1388,9 @@ void ECBackend::dump_recovery_info(Formatter *f) const
 
 void ECBackend::submit_transaction(
   const hobject_t &hoid,
+  const object_stat_sum_t &delta_stats,
   const eversion_t &at_version,
-  PGTransactionUPtr &&_t,
+  PGTransactionUPtr &&t,
   const eversion_t &trim_to,
   const eversion_t &roll_forward_to,
   const vector<pg_log_entry_t> &log_entries,
@@ -1351,9 +1406,10 @@ void ECBackend::submit_transaction(
   assert(!tid_to_op_map.count(tid));
   Op *op = &(tid_to_op_map[tid]);
   op->hoid = hoid;
+  op->delta_stats = delta_stats;
   op->version = at_version;
   op->trim_to = trim_to;
-  op->roll_forward_to = roll_forward_to;
+  op->roll_forward_to = MAX(roll_forward_to, committed_to);
   op->log_entries = log_entries;
   std::swap(op->updated_hit_set_history, hset_history);
   op->on_local_applied_sync = on_local_applied_sync;
@@ -1363,33 +1419,22 @@ void ECBackend::submit_transaction(
   op->reqid = reqid;
   op->client_op = client_op;
   
-  op->t = std::move(_t);
-
-  set<hobject_t, hobject_t::BitwiseComparator> need_hinfos;
-  ECTransaction::get_append_objects(*(op->t), &need_hinfos);
-  for (set<hobject_t, hobject_t::BitwiseComparator>::iterator i = need_hinfos.begin();
-       i != need_hinfos.end();
-       ++i) {
-    ECUtil::HashInfoRef ref = get_hash_info(*i, false);
-    if (!ref) {
-      derr << __func__ << ": get_hash_info(" << *i << ")"
-          << " returned a null pointer and there is no "
-          << " way to recover from such an error in this "
-          << " context" << dendl;
-      assert(0);
-    }
-    op->unstable_hash_infos.insert(
-      make_pair(
-       *i,
-       ref));
-  }
-
   dout(10) << __func__ << ": op " << *op << " starting" << dendl;
-  start_write(op);
-  writing.push_back(op);
+  start_rmw(op, std::move(t));
   dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
 }
 
+void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
+  if (!waiting_state.empty()) {
+    waiting_state.back().on_write.emplace_back(std::move(cb));
+  } else if (!waiting_reads.empty()) {
+    waiting_reads.back().on_write.emplace_back(std::move(cb));
+  } else {
+    // Nothing earlier in the pipeline, just call it
+    cb();
+  }
+}
+
 int ECBackend::get_min_avail_to_read_shards(
   const hobject_t &hoid,
   const set<int> &want,
@@ -1532,19 +1577,15 @@ void ECBackend::start_read_op(
   op.for_recovery = for_recovery;
   dout(10) << __func__ << ": starting " << op << dendl;
   do_read_op(
-    op,
-    to_read);
+    op);
 }
 
-void ECBackend::do_read_op(
-  ReadOp &op,
-  map<hobject_t, read_request_t, hobject_t::BitwiseComparator> &to_read)
+void ECBackend::do_read_op(ReadOp &op)
 {
   int priority = op.priority;
   ceph_tid_t tid = op.tid;
-  op.to_read.swap(to_read);
 
-  dout(10) << __func__ << ": starting additional " << op << dendl;
+  dout(10) << __func__ << ": starting read " << op << dendl;
 
   map<pg_shard_t, ECSubRead> messages;
   for (map<hobject_t, read_request_t,
@@ -1552,6 +1593,9 @@ void ECBackend::do_read_op(
        i != op.to_read.end();
        ++i) {
     bool need_attrs = i->second.want_attrs;
+    list<boost::tuple<
+      uint64_t, uint64_t, map<pg_shard_t, bufferlist> > > &reslist =
+      op.complete[i->first].returned;
     for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
         j != i->second.need.end();
         ++j) {
@@ -1566,14 +1610,21 @@ void ECBackend::do_read_op(
           i->second.to_read.begin();
         j != i->second.to_read.end();
         ++j) {
+      reslist.push_back(
+       boost::make_tuple(
+         j->get<0>(),
+         j->get<1>(),
+         map<pg_shard_t, bufferlist>()));
       pair<uint64_t, uint64_t> chunk_off_len =
        sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
       for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
           k != i->second.need.end();
           ++k) {
-       messages[*k].to_read[i->first].push_back(boost::make_tuple(chunk_off_len.first,
-                                                                   chunk_off_len.second,
-                                                                   j->get<2>()));
+       messages[*k].to_read[i->first].push_back(
+         boost::make_tuple(
+           chunk_off_len.first,
+           chunk_off_len.second,
+           j->get<2>()));
       }
       assert(!need_attrs);
     }
@@ -1599,7 +1650,7 @@ void ECBackend::do_read_op(
       msg,
       get_parent()->get_epoch());
   }
-  dout(10) << __func__ << ": started additional " << op << dendl;
+  dout(10) << __func__ << ": started " << op << dendl;
 }
 
 ECUtil::HashInfoRef ECBackend::get_hash_info(
@@ -1654,33 +1705,130 @@ ECUtil::HashInfoRef ECBackend::get_hash_info(
   return ref;
 }
 
-void ECBackend::check_op(Op *op)
+void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
 {
-  if (op->pending_apply.empty() && op->on_all_applied) {
-    dout(10) << __func__ << " Calling on_all_applied on " << *op << dendl;
-    op->on_all_applied->complete(0);
-    op->on_all_applied = 0;
-  }
-  if (op->pending_commit.empty() && op->on_all_commit) {
-    dout(10) << __func__ << " Calling on_all_commit on " << *op << dendl;
-    op->on_all_commit->complete(0);
-    op->on_all_commit = 0;
-  }
-  if (op->pending_apply.empty() && op->pending_commit.empty()) {
-    // done!
-    assert(writing.front() == op);
-    dout(10) << __func__ << " Completing " << *op << dendl;
-    writing.pop_front();
-    tid_to_op_map.erase(op->tid);
-  }
-  for (map<ceph_tid_t, Op>::iterator i = tid_to_op_map.begin();
-       i != tid_to_op_map.end();
-       ++i) {
-    dout(20) << __func__ << " tid " << i->first <<": " << i->second << dendl;
+  assert(op);
+
+  op->plan = ECTransaction::get_write_plan(
+    sinfo,
+    std::move(t),
+    [&](const hobject_t &i) {
+      ECUtil::HashInfoRef ref = get_hash_info(i, false);
+      if (!ref) {
+       derr << __func__ << ": get_hash_info(" << i << ")"
+            << " returned a null pointer and there is no "
+            << " way to recover from such an error in this "
+            << " context" << dendl;
+       assert(0);
+      }
+      return ref;
+    },
+    get_parent()->get_dpp());
+
+  dout(10) << __func__ << ": " << *op << dendl;
+
+  waiting_state.push_back(*op);
+  check_ops();
+}
+
+bool ECBackend::try_state_to_reads()
+{
+  if (waiting_state.empty())
+    return false;
+
+  Op *op = &(waiting_state.front());
+  if (op->requires_rmw() && pipeline_state.cache_invalid()) {
+    dout(20) << __func__ << ": blocking " << *op
+            << " because it requires an rmw and the cache is invalid"
+            << pipeline_state
+            << dendl;
+    return false;
+  }
+
+  if (op->invalidates_cache()) {
+    dout(20) << __func__ << ": invalidating cache after this op"
+            << dendl;
+    pipeline_state.invalidate();
   }
+
+  waiting_state.pop_front();
+  waiting_reads.push_back(*op);
+
+  if (op->requires_rmw() || pipeline_state.caching_enabled()) {
+    cache.open_write_pin(op->pin);
+
+    extent_set empty;
+    for (auto &&hpair: op->plan.will_write) {
+      auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
+      const extent_set &to_read_plan =
+       to_read_plan_iter == op->plan.to_read.end() ?
+       empty :
+       to_read_plan_iter->second;
+
+      extent_set remote_read = cache.reserve_extents_for_rmw(
+       hpair.first,
+       op->pin,
+       hpair.second,
+       to_read_plan);
+
+      extent_set pending_read = to_read_plan;
+      pending_read.subtract(remote_read);
+
+      if (!remote_read.empty()) {
+       op->remote_read[hpair.first] = std::move(remote_read);
+      }
+      if (!pending_read.empty()) {
+       op->pending_read[hpair.first] = std::move(pending_read);
+      }
+    }
+  } else {
+    op->remote_read = op->plan.to_read;
+  }
+
+  dout(10) << __func__ << ": " << *op << dendl;
+
+  if (!op->remote_read.empty()) {
+    objects_read_async_no_cache(
+      op->remote_read,
+      [this, op](hobject_t::bitwisemap<extent_map> &&results) {
+       op->remote_read_result = std::move(results);
+       check_ops();
+      });
+  }
+
+  return true;
 }
 
-void ECBackend::start_write(Op *op) {
+bool ECBackend::try_reads_to_commit()
+{
+  if (waiting_reads.empty())
+    return false;
+  Op *op = &(waiting_reads.front());
+  if (op->read_in_progress())
+    return false;
+  waiting_reads.pop_front();
+  waiting_commit.push_back(*op);
+
+  dout(10) << __func__ << ": starting commit on " << *op << dendl;
+  dout(20) << __func__ << ": " << cache << dendl;
+
+  get_parent()->apply_stats(
+    op->hoid,
+    op->delta_stats);
+
+  if (pipeline_state.caching_enabled() || op->requires_rmw()) {
+    for (auto &&hpair: op->pending_read) {
+      op->remote_read_result[hpair.first].insert(
+       cache.get_remaining_extents_for_rmw(
+         hpair.first,
+         op->pin,
+         hpair.second));
+    }
+    op->pending_read.clear();
+  } else {
+    assert(op->pending_read.empty());
+  }
+
   map<shard_id_t, ObjectStore::Transaction> trans;
   for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_actingbackfill_shards().begin();
@@ -1688,20 +1836,56 @@ void ECBackend::start_write(Op *op) {
        ++i) {
     trans[i->shard];
   }
-  ObjectStore::Transaction empty;
 
-  ECTransaction::generate_transactions(
-    *(op->t),
-    op->unstable_hash_infos,
-    ec_impl,
-    get_parent()->get_info().pgid.pgid,
-    sinfo,
-    op->log_entries,
-    &trans,
-    &(op->temp_added),
-    &(op->temp_cleared));
+  hobject_t::bitwisemap<extent_map> written;
+  if (op->plan.t) {
+    ECTransaction::generate_transactions(
+      op->plan,
+      ec_impl,
+      get_parent()->get_info().pgid.pgid,
+      !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN),
+      sinfo,
+      op->remote_read_result,
+      op->log_entries,
+      &written,
+      &trans,
+      &(op->temp_added),
+      &(op->temp_cleared),
+      get_parent()->get_dpp());
+  }
+
+  dout(20) << __func__ << ": " << cache << dendl;
+  dout(20) << __func__ << ": written: " << written << dendl;
+  dout(20) << __func__ << ": op: " << *op << dendl;
+
+  if (!get_parent()->get_pool().is_hacky_ecoverwrites()) {
+    for (auto &&i: op->log_entries) {
+      if (i.requires_kraken()) {
+       derr << __func__ << ": log entry " << i << " requires kraken"
+            << " but overwrites are not enabled!" << dendl;
+       assert(0);
+      }
+    }
+  }
+
+  hobject_t::bitwisemap<extent_set> written_set;
+  for (auto &&i: written) {
+    written_set[i.first] = i.second.get_interval_set();
+  }
+  dout(20) << __func__ << ": written_set: " << written_set << dendl;
+  assert(written_set == op->plan.will_write);
+
+  if (pipeline_state.caching_enabled() || op->requires_rmw()) {
+    for (auto &&hpair: written) {
+      dout(20) << __func__ << ": " << hpair << dendl;
+      cache.present_rmw_update(hpair.first, op->pin, hpair.second);
+    }
+  }
+  op->remote_read.clear();
+  op->remote_read_result.clear();
 
   dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
+  ObjectStore::Transaction empty;
 
   for (set<pg_shard_t>::const_iterator i =
         get_parent()->get_actingbackfill_shards().begin();
@@ -1747,6 +1931,69 @@ void ECBackend::start_write(Op *op) {
        i->osd, r, get_parent()->get_epoch());
     }
   }
+
+  for (auto i = op->on_write.begin();
+       i != op->on_write.end();
+       op->on_write.erase(i++)) {
+    (*i)();
+  }
+
+  return true;
+}
+
+bool ECBackend::try_finish_rmw()
+{
+  if (waiting_commit.empty())
+    return false;
+  Op *op = &(waiting_commit.front());
+  if (op->write_in_progress())
+    return false;
+  waiting_commit.pop_front();
+
+  dout(10) << __func__ << ": " << *op << dendl;
+  dout(20) << __func__ << ": " << cache << dendl;
+
+  if (op->roll_forward_to > completed_to)
+    completed_to = op->roll_forward_to;
+  if (op->version > committed_to)
+    committed_to = op->version;
+
+  if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+    if (op->version > get_parent()->get_log().get_can_rollback_to() &&
+       waiting_reads.empty() &&
+       waiting_commit.empty()) {
+      // submit a dummy transaction to kick the rollforward
+      auto tid = get_parent()->get_tid();
+      Op *nop = &(tid_to_op_map[tid]);
+      nop->hoid = op->hoid;
+      nop->trim_to = op->trim_to;
+      nop->roll_forward_to = op->version;
+      nop->tid = tid;
+      nop->reqid = op->reqid;
+      waiting_reads.push_back(*nop);
+    }
+  }
+
+  if (pipeline_state.caching_enabled() || op->requires_rmw()) {
+    cache.release_write_pin(op->pin);
+  }
+  tid_to_op_map.erase(op->tid);
+
+  if (waiting_reads.empty() &&
+      waiting_commit.empty()) {
+    pipeline_state.clear();
+    dout(20) << __func__ << ": clearing pipeline_state "
+            << pipeline_state
+            << dendl;
+  }
+  return true;
+}
+
+void ECBackend::check_ops()
+{
+  while (try_state_to_reads() ||
+        try_reads_to_commit() ||
+        try_finish_rmw());
 }
 
 int ECBackend::objects_read_sync(
@@ -1759,31 +2006,137 @@ int ECBackend::objects_read_sync(
   return -EOPNOTSUPP;
 }
 
+void ECBackend::objects_read_async(
+  const hobject_t &hoid,
+  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+             pair<bufferlist*, Context*> > > &to_read,
+  Context *on_complete,
+  bool fast_read)
+{
+  hobject_t::bitwisemap<std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
+    reads;
+
+  uint32_t flags = 0;
+  extent_set es;
+  for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+        pair<bufferlist*, Context*> > >::const_iterator i =
+        to_read.begin();
+       i != to_read.end();
+       ++i) {
+    pair<uint64_t, uint64_t> tmp =
+      sinfo.offset_len_to_stripe_bounds(
+       make_pair(i->first.get<0>(), i->first.get<1>()));
+
+    extent_set esnew;
+    esnew.insert(tmp.first, tmp.second);
+    es.union_of(esnew);
+    flags |= i->first.get<2>();
+  }
+
+  if (!es.empty()) {
+    auto &offsets = reads[hoid];
+    for (auto j = es.begin();
+        j != es.end();
+        ++j) {
+      offsets.push_back(
+       boost::make_tuple(
+         j.get_start(),
+         j.get_len(),
+         flags));
+    }
+  }
+
+  struct cb {
+    ECBackend *ec;
+    hobject_t hoid;
+    list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+             pair<bufferlist*, Context*> > > to_read;
+    unique_ptr<Context> on_complete;
+    cb(const cb&) = delete;
+    cb(cb &&) = default;
+    cb(ECBackend *ec,
+       const hobject_t &hoid,
+       const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+                  pair<bufferlist*, Context*> > > &to_read,
+       Context *on_complete)
+      : ec(ec),
+       hoid(hoid),
+       to_read(to_read),
+       on_complete(on_complete) {}
+    void operator()(hobject_t::bitwisemap<extent_map> &&results) {
+      auto dpp = ec->get_parent()->get_dpp();
+      ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
+                        << dendl;
+      ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
+                        << dendl;
+
+      auto &got = results[hoid];
+
+      for (auto &&read: to_read) {
+       assert(read.second.first);
+       uint64_t offset = read.first.get<0>();
+       uint64_t length = read.first.get<1>();
+       auto range = got.get_containing_range(offset, length);
+       assert(range.first != range.second);
+       assert(range.first.get_off() <= offset);
+       assert(
+         (offset + length) <=
+         (range.first.get_off() + range.first.get_len()));
+       read.second.first->substr_of(
+         range.first.get_val(),
+         offset - range.first.get_off(),
+         length);
+       if (read.second.second) {
+         read.second.second->complete(length);
+         read.second.second = nullptr;
+       }
+      }
+      to_read.clear();
+      if (on_complete) {
+       on_complete.release()->complete(0);
+      }
+    }
+    ~cb() {
+      for (auto &&i: to_read) {
+       delete i.second.second;
+      }
+      to_read.clear();
+    }
+  };
+  objects_read_and_reconstruct(
+    reads,
+    fast_read,
+    make_gen_lambda_context<hobject_t::bitwisemap<extent_map> &&, cb>(
+      cb(this,
+        hoid,
+        to_read,
+        on_complete)));
+}
+
 struct CallClientContexts :
   public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
+  hobject_t hoid;
   ECBackend *ec;
   ECBackend::ClientAsyncReadStatus *status;
-  list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
-           pair<bufferlist*, Context*> > > to_read;
+  list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
   CallClientContexts(
+    hobject_t hoid,
     ECBackend *ec,
     ECBackend::ClientAsyncReadStatus *status,
-    const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
-                   pair<bufferlist*, Context*> > > &to_read)
-    : ec(ec), status(status), to_read(to_read) {}
+    const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
+    : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
   void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) {
     ECBackend::read_result_t &res = in.second;
+    extent_map result;
     if (res.r != 0)
       goto out;
     assert(res.returned.size() == to_read.size());
     assert(res.r == 0);
     assert(res.errors.empty());
-    for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
-                  pair<bufferlist*, Context*> > >::iterator i = to_read.begin();
-        i != to_read.end();
-        to_read.erase(i++)) {
+    for (auto &&read: to_read) {
       pair<uint64_t, uint64_t> adjusted =
-       ec->sinfo.offset_len_to_stripe_bounds(make_pair(i->first.get<0>(), i->first.get<1>()));
+       ec->sinfo.offset_len_to_stripe_bounds(
+         make_pair(read.get<0>(), read.get<1>()));
       assert(res.returned.front().get<0>() == adjusted.first &&
             res.returned.front().get<1>() == adjusted.second);
       map<int, bufferlist> to_decode;
@@ -1803,82 +2156,64 @@ struct CallClientContexts :
         res.r = r;
         goto out;
       }
-      assert(i->second.second);
-      assert(i->second.first);
-      i->second.first->substr_of(
+      bufferlist trimmed;
+      trimmed.substr_of(
        bl,
-       i->first.get<0>() - adjusted.first,
-       MIN(i->first.get<1>(), bl.length() - (i->first.get<0>() - adjusted.first)));
-      if (i->second.second) {
-       i->second.second->complete(i->second.first->length());
-      }
+       read.get<0>() - adjusted.first,
+       MIN(read.get<1>(),
+           bl.length() - (read.get<0>() - adjusted.first)));
+      result.insert(
+       read.get<0>(), trimmed.length(), std::move(trimmed));
       res.returned.pop_front();
     }
 out:
-    status->complete = true;
-    list<ECBackend::ClientAsyncReadStatus> &ip =
-      ec->in_progress_client_reads;
-    while (ip.size() && ip.front().complete) {
-      if (ip.front().on_complete) {
-       ip.front().on_complete->complete(res.r);
-       ip.front().on_complete = NULL;
-      }
-      ip.pop_front();
-    }
-  }
-  ~CallClientContexts() {
-    for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
-                  pair<bufferlist*, Context*> > >::iterator i = to_read.begin();
-        i != to_read.end();
-        to_read.erase(i++)) {
-      delete i->second.second;
-    }
+    status->complete_object(hoid, std::move(result));
+    ec->kick_reads();
   }
 };
 
-void ECBackend::objects_read_async(
-  const hobject_t &hoid,
-  const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
-                 pair<bufferlist*, Context*> > > &to_read,
-  Context *on_complete,
-  bool fast_read)
+void ECBackend::objects_read_and_reconstruct(
+  const hobject_t::bitwisemap<
+    std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+  > &reads,
+  bool fast_read,
+  GenContextURef<hobject_t::bitwisemap<extent_map> &&> &&func)
 {
-  in_progress_client_reads.push_back(ClientAsyncReadStatus(on_complete));
-  CallClientContexts *c = new CallClientContexts(
-    this, &(in_progress_client_reads.back()), to_read);
-
-  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets;
-  pair<uint64_t, uint64_t> tmp;
-  for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
-                pair<bufferlist*, Context*> > >::const_iterator i =
-        to_read.begin();
-       i != to_read.end();
-       ++i) {
-    tmp = sinfo.offset_len_to_stripe_bounds(make_pair(i->first.get<0>(), i->first.get<1>()));
-    offsets.push_back(boost::make_tuple(tmp.first, tmp.second, i->first.get<2>()));
+  in_progress_client_reads.emplace_back(
+    reads.size(), std::move(func));
+  if (!reads.size()) {
+    kick_reads();
+    return;
   }
 
   set<int> want_to_read;
   get_want_to_read_shards(&want_to_read);
     
-  set<pg_shard_t> shards;
-  int r = get_min_avail_to_read_shards(
-    hoid,
-    want_to_read,
-    false,
-    fast_read,
-    &shards);
-  assert(r == 0);
-
   map<hobject_t, read_request_t, hobject_t::BitwiseComparator> for_read_op;
-  for_read_op.insert(
-    make_pair(
-      hoid,
-      read_request_t(
-       offsets,
-       shards,
-       false,
-       c)));
+  for (auto &&to_read: reads) {
+    set<pg_shard_t> shards;
+    int r = get_min_avail_to_read_shards(
+      to_read.first,
+      want_to_read,
+      false,
+      fast_read,
+      &shards);
+    assert(r == 0);
+
+    CallClientContexts *c = new CallClientContexts(
+      to_read.first,
+      this,
+      &(in_progress_client_reads.back()),
+      to_read.second);
+    for_read_op.insert(
+      make_pair(
+       to_read.first,
+       read_request_t(
+         to_read.second,
+         shards,
+         false,
+         c)));
+  }
 
   start_read_op(
     CEPH_MSG_PRIO_DEFAULT,
@@ -1907,8 +2242,11 @@ int ECBackend::send_all_remaining_reads(
 
   dout(10) << __func__ << " Read remaining shards " << shards << dendl;
 
-  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets = rop.to_read.find(hoid)->second.to_read;
-  GenContext<pair<RecoveryMessages *, read_result_t& > &> *c = rop.to_read.find(hoid)->second.cb;
+  // TODOSAM: this doesn't seem right
+  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
+    rop.to_read.find(hoid)->second.to_read;
+  GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
+    rop.to_read.find(hoid)->second.cb;
 
   map<hobject_t, read_request_t, hobject_t::BitwiseComparator> for_read_op;
   for_read_op.insert(
@@ -1920,7 +2258,8 @@ int ECBackend::send_all_remaining_reads(
        false,
        c)));
 
-  do_read_op(rop, for_read_op);
+  rop.to_read.swap(for_read_op);
+  do_read_op(rop);
   return 0;
 }
 
@@ -2009,26 +2348,35 @@ void ECBackend::be_deep_scrub(
     o.digest_present = false;
     return;
   } else {
-    if (hinfo->get_total_chunk_size() != pos) {
-      dout(0) << "_scan_list  " << poid << " got incorrect size on read" << dendl;
-      o.ec_size_mismatch = true;
-      return;
-    }
+    if (!get_parent()->get_pool().is_hacky_ecoverwrites()) {
+      assert(hinfo->has_chunk_hash());
+      if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
+       dout(0) << "_scan_list  " << poid << " got incorrect hash on read" << dendl;
+       o.ec_hash_mismatch = true;
+       return;
+      }
 
-    if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
-      dout(0) << "_scan_list  " << poid << " got incorrect hash on read" << dendl;
-      o.ec_hash_mismatch = true;
-      return;
-    }
+      if (hinfo->get_total_chunk_size() != pos) {
+       dout(0) << "_scan_list  " << poid << " got incorrect size on read" << dendl;
+       o.ec_hash_mismatch = true;
+       return;
+      }
 
-    /* We checked above that we match our own stored hash.  We cannot
-     * send a hash of the actual object, so instead we simply send
-     * our locally stored hash of shard 0 on the assumption that if
-     * we match our chunk hash and our recollection of the hash for
-     * chunk 0 matches that of our peers, there is likely no corruption.
-     */
-    o.digest = hinfo->get_chunk_hash(0);
-    o.digest_present = true;
+      /* We checked above that we match our own stored hash.  We cannot
+       * send a hash of the actual object, so instead we simply send
+       * our locally stored hash of shard 0 on the assumption that if
+       * we match our chunk hash and our recollection of the hash for
+       * chunk 0 matches that of our peers, there is likely no corruption.
+       */
+      o.digest = hinfo->get_chunk_hash(0);
+      o.digest_present = true;
+    } else {
+      /* Hack! We must be using partial overwrites, and partial overwrites
+       * don't support deep-scrub yet
+       */
+      o.digest = 0;
+      o.digest_present = true;
+    }
   }
 
   o.omap_digest = seed;
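
The ExtentCache calls threaded through try_state_to_reads(),
try_reads_to_commit() and try_finish_rmw() above pair up as one
write-pin lifecycle.  A condensed sketch, assuming toy stand-ins: only
the five method names are taken from this patch (the real calls take an
hobject_t, the op's pin, and extent sets/maps rather than strings).

#include <cassert>
#include <set>
#include <string>

struct ToyCache {
  std::set<int> pins;
  void open_write_pin(int pin) { pins.insert(pin); }
  // Reserve the extents this write touches; returns what the cache
  // cannot supply and must therefore be read from the remote shards.
  std::string reserve_extents_for_rmw() { return "extents to read remotely"; }
  // Hand back whatever reserved extents the cache already held locally.
  std::string get_remaining_extents_for_rmw() { return "cached extents"; }
  // Publish the newly written extents so later pinned ops can see them.
  void present_rmw_update() {}
  void release_write_pin(int pin) { pins.erase(pin); }
};

int main() {
  ToyCache cache;
  const int pin = 1;
  cache.open_write_pin(pin);                  // try_state_to_reads
  std::string remote = cache.reserve_extents_for_rmw();
  // ... remote reads complete; the op reaches the front of waiting_reads ...
  std::string local = cache.get_remaining_extents_for_rmw(); // try_reads_to_commit
  // ... generate_transactions encodes and produces the 'written' map ...
  cache.present_rmw_update();
  // ... every shard commits and applies ...
  cache.release_write_pin(pin);               // try_finish_rmw
  assert(cache.pins.empty());
  (void)remote; (void)local;
}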
index fc008fe4b3a501b8fc4b370919c354bfb8c4f68c..3689d118789e42bd9bcbea9cad14337d86c4b510 100644 (file)
 #ifndef ECBACKEND_H
 #define ECBACKEND_H
 
+#include <boost/intrusive/set.hpp>
+#include <boost/intrusive/list.hpp>
+
 #include "OSD.h"
 #include "PGBackend.h"
 #include "erasure-code/ErasureCodeInterface.h"
 #include "ECUtil.h"
 #include "ECTransaction.h"
+#include "ExtentCache.h"
 
 //forward declaration
 struct ECSubWrite;
@@ -88,8 +92,11 @@ public:
 
   void dump_recovery_info(Formatter *f) const;
 
+  void call_write_ordered(std::function<void(void)> &&cb) override;
+
   void submit_transaction(
     const hobject_t &hoid,
+    const object_stat_sum_t &delta_stats,
     const eversion_t &at_version,
     PGTransactionUPtr &&t,
     const eversion_t &trim_to,
@@ -129,12 +136,36 @@ public:
    * ensures that we won't ever have to restart a client initiated read in
    * check_recovery_sources.
    */
+  void objects_read_and_reconstruct(
+    const hobject_t::bitwisemap<
+      std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+    > &reads,
+    bool fast_read,
+    GenContextURef<hobject_t::bitwisemap<extent_map> &&> &&func);
+
   friend struct CallClientContexts;
   struct ClientAsyncReadStatus {
-    bool complete;
-    Context *on_complete;
-    explicit ClientAsyncReadStatus(Context *on_complete)
-    : complete(false), on_complete(on_complete) {}
+    unsigned objects_to_read;
+    GenContextURef<hobject_t::bitwisemap<extent_map> &&> func;
+    hobject_t::bitwisemap<extent_map> results;
+    explicit ClientAsyncReadStatus(
+      unsigned objects_to_read,
+      GenContextURef<hobject_t::bitwisemap<extent_map> &&> &&func)
+      : objects_to_read(objects_to_read), func(std::move(func)) {}
+    void complete_object(
+      const hobject_t &hoid,
+      extent_map &&buffers) {
+      assert(objects_to_read);
+      --objects_to_read;
+      assert(!results.count(hoid));
+      results.emplace(hoid, std::move(buffers));
+    }
+    bool is_complete() const {
+      return objects_to_read == 0;
+    }
+    void run() {
+      func.release()->complete(std::move(results));
+    }
   };
   list<ClientAsyncReadStatus> in_progress_client_reads;
   void objects_read_async(
@@ -144,6 +175,32 @@ public:
     Context *on_complete,
     bool fast_read = false);
 
+  template <typename Func>
+  void objects_read_async_no_cache(
+    const hobject_t::bitwisemap<extent_set> &to_read,
+    Func &&on_complete) {
+    hobject_t::bitwisemap<
+      std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > _to_read;
+    for (auto &&hpair: to_read) {
+      auto &l = _to_read[hpair.first];
+      for (auto extent: hpair.second) {
+       l.emplace_back(extent.first, extent.second, 0);
+      }
+    }
+    objects_read_and_reconstruct(
+      _to_read,
+      false,
+      make_gen_lambda_context<hobject_t::bitwisemap<extent_map> &&, Func>(
+       std::forward<Func>(on_complete)));
+  }
+  void kick_reads() {
+    while (in_progress_client_reads.size() &&
+          in_progress_client_reads.front().is_complete()) {
+      in_progress_client_reads.front().run();
+      in_progress_client_reads.pop_front();
+    }
+  }
+
 private:
   friend struct ECRecoveryHandle;
   uint64_t get_recovery_chunk_size() const {
@@ -221,7 +278,7 @@ private:
     }
 
     // must be filled if state == WRITING
-    map<shard_id_t, bufferlist> returned_data;
+    map<int, bufferlist> returned_data;
     map<string, bufferlist> xattrs;
     ECUtil::HashInfoRef hinfo;
     ObjectContextRef obc;
@@ -237,6 +294,24 @@ private:
   friend ostream &operator<<(ostream &lhs, const RecoveryOp &rhs);
   map<hobject_t, RecoveryOp, hobject_t::BitwiseComparator> recovery_ops;
 
+  void continue_recovery_op(
+    RecoveryOp &op,
+    RecoveryMessages *m);
+  void dispatch_recovery_messages(RecoveryMessages &m, int priority);
+  friend struct OnRecoveryReadComplete;
+  void handle_recovery_read_complete(
+    const hobject_t &hoid,
+    boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
+    boost::optional<map<string, bufferlist> > attrs,
+    RecoveryMessages *m);
+  void handle_recovery_push(
+    PushOp &op,
+    RecoveryMessages *m);
+  void handle_recovery_push_reply(
+    PushReplyOp &op,
+    pg_shard_t from,
+    RecoveryMessages *m);
+
 public:
   /**
    * Low level async read mechanism
@@ -319,8 +394,7 @@ public:
     OpRequestRef op,
     bool do_redundant_reads, bool for_recovery);
 
-  void do_read_op(ReadOp &rop,
-    map<hobject_t, read_request_t, hobject_t::BitwiseComparator> &to_read);
+  void do_read_op(ReadOp &rop);
   int send_all_remaining_reads(
     const hobject_t &hoid,
     ReadOp &rop);
@@ -339,59 +413,122 @@ public:
    * completions. Thus, callbacks and completion are called in order
    * on the writing list.
    */
-  struct Op {
+  struct Op : boost::intrusive::list_base_hook<> {
+    /// From submit_transaction caller, describes the operation
     hobject_t hoid;
+    object_stat_sum_t delta_stats;
     eversion_t version;
     eversion_t trim_to;
-    eversion_t roll_forward_to;
-    vector<pg_log_entry_t> log_entries;
-    map<hobject_t, ObjectContextRef, hobject_t::BitwiseComparator> obc_map;
     boost::optional<pg_hit_set_history_t> updated_hit_set_history;
-    Context *on_local_applied_sync;
-    Context *on_all_applied;
-    Context *on_all_commit;
+    vector<pg_log_entry_t> log_entries;
     ceph_tid_t tid;
     osd_reqid_t reqid;
-    OpRequestRef client_op;
 
-    std::unique_ptr<PGTransaction> t;
+    eversion_t roll_forward_to; /// Soon to be generated internally
+
+    /// Ancillary also provided from submit_transaction caller
+    map<hobject_t, ObjectContextRef, hobject_t::BitwiseComparator> obc_map;
+
+    /// see call_write_ordered
+    std::list<std::function<void(void)> > on_write;
 
+    /// Generated internally
     set<hobject_t, hobject_t::BitwiseComparator> temp_added;
     set<hobject_t, hobject_t::BitwiseComparator> temp_cleared;
 
+    ECTransaction::WritePlan plan;
+    bool requires_rmw() const { return !plan.to_read.empty(); }
+    bool invalidates_cache() const { return plan.invalidates_cache; }
+
+    /// In progress read state;
+    hobject_t::bitwisemap<extent_set> pending_read; // subset already being read
+    hobject_t::bitwisemap<extent_set> remote_read;  // subset we must read
+    hobject_t::bitwisemap<extent_map> remote_read_result;
+    bool read_in_progress() const {
+      return !remote_read.empty() && remote_read_result.empty();
+    }
+
+    /// In progress write state
     set<pg_shard_t> pending_commit;
     set<pg_shard_t> pending_apply;
+    bool write_in_progress() const {
+      return !pending_commit.empty() || !pending_apply.empty();
+    }
+
+    /// optional, may be null, for tracking purposes
+    OpRequestRef client_op;
+
+    /// pin for cache
+    ExtentCache::write_pin pin;
 
-    map<hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator> unstable_hash_infos;
+    /// Callbacks
+    Context *on_local_applied_sync = nullptr;
+    Context *on_all_applied = nullptr;
+    Context *on_all_commit = nullptr;
     ~Op() {
       delete on_local_applied_sync;
       delete on_all_applied;
       delete on_all_commit;
     }
   };
+  using op_list = boost::intrusive::list<Op>;
   friend ostream &operator<<(ostream &lhs, const Op &rhs);
 
-  void continue_recovery_op(
-    RecoveryOp &op,
-    RecoveryMessages *m);
+  ExtentCache cache;
+  map<ceph_tid_t, Op> tid_to_op_map; /// Owns Op structure
+
+  /**
+   * We model the possible rmw states as a set of waitlists.
+   * All writes at this time complete in order, so a write blocked
+   * at waiting_state blocks all writes behind it as well (same for
+   * other states).
+   *
+   * Future work: We can break this up into a per-object pipeline
+   * (almost).  First, provide an ordering token to submit_transaction
+   * and require that all operations within a single transaction take
+   * place on a subset of hobject_t space partitioned by that token
+   * (the hashid seems about right to me -- even works for temp objects
+   * if you recall that a temp object created for object head foo will
+   * only ever be referenced by other transactions on foo and isn't
+   * reused).  Next, factor this part into a class and maintain one per
+   * ordering token.  Next, fixup ReplicatedPG's repop queue to be
+   * partitioned by ordering token.  Finally, refactor the op pipeline
+   * so that the log entries passed into submit_transaction aren't
+   * versioned.  We can't assign versions to them until we actually
+   * submit the operation.  That's probably going to be the hard part.
+   */
+  class pipeline_state_t {
+    enum {
+      CACHE_VALID = 0,
+      CACHE_INVALID = 1
+    } pipeline_state = CACHE_VALID;
+  public:
+    bool caching_enabled() const {
+      return pipeline_state == CACHE_VALID;
+    }
+    bool cache_invalid() const {
+      return !caching_enabled();
+    }
+    void invalidate() {
+      pipeline_state = CACHE_INVALID;
+    }
+    void clear() {
+      pipeline_state = CACHE_VALID;
+    }
+    friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
+  } pipeline_state;
 
-  void dispatch_recovery_messages(RecoveryMessages &m, int priority);
-  friend struct OnRecoveryReadComplete;
-  void handle_recovery_read_complete(
-    const hobject_t &hoid,
-    boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
-    boost::optional<map<string, bufferlist> > attrs,
-    RecoveryMessages *m);
-  void handle_recovery_push(
-    PushOp &op,
-    RecoveryMessages *m);
-  void handle_recovery_push_reply(
-    PushReplyOp &op,
-    pg_shard_t from,
-    RecoveryMessages *m);
 
-  map<ceph_tid_t, Op> tid_to_op_map; /// lists below point into here
-  list<Op*> writing;
+  op_list waiting_state;        /// writes waiting on pipeline_state
+  op_list waiting_reads;        /// writes waiting on partial stripe reads
+  op_list waiting_commit;       /// writes waiting on initial commit
+  eversion_t completed_to;
+  eversion_t committed_to;
+  void start_rmw(Op *op, PGTransactionUPtr &&t);
+  bool try_state_to_reads();
+  bool try_reads_to_commit();
+  bool try_finish_rmw();
+  void check_ops();
 
   CephContext *cct;
   ErasureCodeInterfaceRef ec_impl;
@@ -454,8 +591,6 @@ public:
                                    const map<string,bufferptr> *attr = NULL);
 
   friend struct ReadCB;
-  void check_op(Op *op);
-  void start_write(Op *op);
 public:
   ECBackend(
     PGBackend::Listener *pg,
@@ -503,5 +638,6 @@ public:
   void _failed_push(const hobject_t &hoid,
     pair<RecoveryMessages *, ECBackend::read_result_t &> &in);
 };
+ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs);
 
 #endif
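
The reworked ClientAsyncReadStatus above completes batched, multi-object
reads in submission order: each status counts outstanding objects, and
kick_reads() only runs the front of the queue once it is complete.  A
toy sketch, assuming stand-in types; only the member and function names
mirror the header above.

#include <functional>
#include <iostream>
#include <list>

struct ToyReadStatus {
  unsigned objects_to_read;
  std::function<void()> func;     // stands in for the GenContextURef above
  void complete_object() { --objects_to_read; }
  bool is_complete() const { return objects_to_read == 0; }
};

std::list<ToyReadStatus> in_progress_client_reads;

void kick_reads() {               // same shape as ECBackend::kick_reads above
  while (!in_progress_client_reads.empty() &&
         in_progress_client_reads.front().is_complete()) {
    in_progress_client_reads.front().func();
    in_progress_client_reads.pop_front();
  }
}

int main() {
  in_progress_client_reads.push_back({2, []{ std::cout << "read A\n"; }});
  in_progress_client_reads.push_back({1, []{ std::cout << "read B\n"; }});
  in_progress_client_reads.back().complete_object();   // B finishes first...
  kick_reads();                                        // ...but A gates the queue
  in_progress_client_reads.front().complete_object();
  in_progress_client_reads.front().complete_object();  // now A is done too
  kick_reads();                                        // runs A, then B, in order
}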
index 27657504b58cbe3674c0d875e37478922fde353e..f808162caaf405d91aa21f75ddebd07b4a8b80df 100644 (file)
 #include "os/ObjectStore.h"
 #include "common/inline_variant.h"
 
-void ECTransaction::get_append_objects(
-  const PGTransaction &t,
-  set<hobject_t, hobject_t::BitwiseComparator> *out)
-{
-  for (auto &&i: t.op_map) {
-    out->insert(i.first);
-    hobject_t source;
-    if (i.second.has_source(&source))
-      out->insert(source);
-  }
-}
 
-void append(
+void encode_and_write(
   pg_t pgid,
   const hobject_t &oid,
   const ECUtil::stripe_info_t &sinfo,
   ErasureCodeInterfaceRef &ecimpl,
   const set<int> &want,
   uint64_t offset,
-  bufferlist &bl,
+  bufferlist bl,
   uint32_t flags,
   ECUtil::HashInfoRef hinfo,
-  map<shard_id_t, ObjectStore::Transaction> *transactions) {
-
+  extent_map &written,
+  map<shard_id_t, ObjectStore::Transaction> *transactions,
+  DoutPrefixProvider *dpp) {
+  const uint64_t before_size = hinfo->get_total_logical_size(sinfo);
+  assert(sinfo.logical_offset_is_stripe_aligned(offset));
+  assert(sinfo.logical_offset_is_stripe_aligned(bl.length()));
   assert(bl.length());
-  assert(offset % sinfo.get_stripe_width() == 0);
-  assert(
-    sinfo.aligned_logical_offset_to_chunk_offset(offset) ==
-    hinfo->get_total_chunk_size());
-  map<int, bufferlist> buffers;
 
-  // align
-  if (bl.length() % sinfo.get_stripe_width())
-    bl.append_zero(
-      sinfo.get_stripe_width() -
-      ((offset + bl.length()) % sinfo.get_stripe_width()));
+  map<int, bufferlist> buffers;
   int r = ECUtil::encode(
     sinfo, ecimpl, bl, want, &buffers);
+  assert(r == 0);
 
-  hinfo->append(
-    sinfo.aligned_logical_offset_to_chunk_offset(offset),
-    buffers);
-  bufferlist hbuf;
-  ::encode(*hinfo, hbuf);
+  written.insert(offset, bl.length(), bl);
+
+  ldpp_dout(dpp, 20) << __func__ << ": " << oid
+                    << " new_size "
+                    << offset + bl.length()
+                    << dendl;
+
+  if (offset >= before_size) {
+    assert(offset == before_size);
+    hinfo->append(
+      sinfo.aligned_logical_offset_to_chunk_offset(offset),
+      buffers);
+  }
 
-  assert(r == 0);
   for (auto &&i : *transactions) {
     assert(buffers.count(i.first));
     bufferlist &enc_bl = buffers[i.first];
-    i.second.set_alloc_hint(
-      coll_t(spg_t(pgid, i.first)),
-      ghobject_t(oid, ghobject_t::NO_GEN, i.first),
-      0, 0,
-      CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE |
-      CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY);
+    if (offset >= before_size) {
+      i.second.set_alloc_hint(
+       coll_t(spg_t(pgid, i.first)),
+       ghobject_t(oid, ghobject_t::NO_GEN, i.first),
+       0, 0,
+       CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE |
+       CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY);
+    }
     i.second.write(
       coll_t(spg_t(pgid, i.first)),
       ghobject_t(oid, ghobject_t::NO_GEN, i.first),
@@ -85,28 +79,45 @@ void append(
       enc_bl.length(),
       enc_bl,
       flags);
-    i.second.setattr(
-      coll_t(spg_t(pgid, i.first)),
-      ghobject_t(oid, ghobject_t::NO_GEN, i.first),
-      ECUtil::get_hinfo_key(),
-      hbuf);
   }
 }
 
+bool ECTransaction::requires_overwrite(
+  uint64_t prev_size,
+  const PGTransaction::ObjectOperation &op) {
+  // special handling for truncates to 0
+  if (op.truncate && op.truncate->first == 0)
+    return false;
+  return op.is_none() &&
+    ((!op.buffer_updates.empty() &&
+      (op.buffer_updates.begin().get_off() < prev_size)) ||
+     (op.truncate &&
+      (op.truncate->first < prev_size)));
+}
+
 void ECTransaction::generate_transactions(
-  PGTransaction &t,
-  map<
-    hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator
-    > &hash_infos,
+  WritePlan &plan,
   ErasureCodeInterfaceRef &ecimpl,
   pg_t pgid,
+  bool legacy_log_entries,
   const ECUtil::stripe_info_t &sinfo,
+  const hobject_t::bitwisemap<extent_map> &partial_extents,
   vector<pg_log_entry_t> &entries,
+  hobject_t::bitwisemap<extent_map> *written_map,
   map<shard_id_t, ObjectStore::Transaction> *transactions,
   set<hobject_t, hobject_t::BitwiseComparator> *temp_added,
   set<hobject_t, hobject_t::BitwiseComparator> *temp_removed,
-  stringstream *out)
+  DoutPrefixProvider *dpp)
 {
+  assert(written_map);
+  assert(transactions);
+  assert(temp_added);
+  assert(temp_removed);
+  assert(plan.t);
+  auto &t = *(plan.t);
+
+  auto &hash_infos = plan.hash_infos;
+
-  assert(transactions);
-  assert(temp_added);
-  assert(temp_removed);
@@ -118,279 +129,528 @@ void ECTransaction::generate_transactions(
 
   t.safe_create_traverse(
     [&](pair<const hobject_t, PGTransaction::ObjectOperation> &opair) {
-    const hobject_t &oid = opair.first;
+      const hobject_t &oid = opair.first;
+      auto &op = opair.second;
+      auto &obc_map = t.obc_map;
+      auto &written = (*written_map)[oid];
 
-    auto iter = obj_to_log.find(oid);
-    pg_log_entry_t *entry = iter != obj_to_log.end() ? iter->second : nullptr;
+      auto iter = obj_to_log.find(oid);
+      pg_log_entry_t *entry = iter != obj_to_log.end() ? iter->second : nullptr;
 
-    if (entry && opair.second.updated_snaps) {
-      entry->mod_desc.update_snaps(opair.second.updated_snaps->first);
-      vector<snapid_t> snaps(
-       opair.second.updated_snaps->second.begin(),
-       opair.second.updated_snaps->second.end());
-      ::encode(snaps, entry->snaps);
-    }
+      ObjectContextRef obc;
+      auto obiter = t.obc_map.find(oid);
+      if (obiter != t.obc_map.end()) {
+       obc = obiter->second;
+      }
+      if (entry) {
+       assert(obc);
+      } else {
+       assert(oid.is_temp());
+      }
 
-    ObjectContextRef obc;
-    auto obiter = t.obc_map.find(oid);
-    if (obiter != t.obc_map.end()) {
-      obc = obiter->second;
-    }
-    if (entry) {
-      assert(obc);
-    } else {
-      assert(oid.is_temp());
-    }
+      ECUtil::HashInfoRef hinfo;
+      {
+       auto iter = hash_infos.find(oid);
+       assert(iter != hash_infos.end());
+       hinfo = iter->second;
+      }
+
+      if (oid.is_temp()) {
+       if (op.is_fresh_object()) {
+         temp_added->insert(oid);
+       } else if (op.is_delete()) {
+         temp_removed->insert(oid);
+       }
+      }
+
+      if (entry &&
+         entry->is_modify() &&
+         op.updated_snaps) {
+       vector<snapid_t> snaps(
+         op.updated_snaps->second.begin(),
+         op.updated_snaps->second.end());
+       ::encode(snaps, entry->snaps);
+      }
+
+      ldpp_dout(dpp, 20) << "generate_transactions: "
+                        << opair.first
+                        << ", current size is "
+                        << hinfo->get_total_logical_size(sinfo)
+                        << " buffers are "
+                        << op.buffer_updates
+                        << dendl;
+      if (op.truncate) {
+       ldpp_dout(dpp, 20) << "generate_transactions: "
+                          << " truncate is "
+                          << *(op.truncate)
+                          << dendl;
+      }
+
+      if (entry && op.updated_snaps) {
+       entry->mod_desc.update_snaps(op.updated_snaps->first);
+      }
 
-    map<string, boost::optional<bufferlist> > xattr_rollback;
-    ECUtil::HashInfoRef hinfo;
-    {
-      auto iter = hash_infos.find(oid);
-      assert(iter != hash_infos.end());
-      hinfo = iter->second;
+      map<string, boost::optional<bufferlist> > xattr_rollback;
+      assert(hinfo);
       bufferlist old_hinfo;
       ::encode(*hinfo, old_hinfo);
       xattr_rollback[ECUtil::get_hinfo_key()] = old_hinfo;
-    }
+      
+      if (op.is_none() && op.truncate && op.truncate->first == 0) {
+       assert(op.truncate->first == 0);
+       assert(op.truncate->first ==
+              op.truncate->second);
+       assert(entry);
+       assert(obc);
+       
+       if (op.truncate->first != op.truncate->second) {
+         op.truncate->first = op.truncate->second;
+       } else {
+         op.truncate = boost::none;
+       }
 
-    if (opair.second.is_none() && opair.second.truncate) {
-      assert(opair.second.truncate->first == 0);
-      assert(opair.second.truncate->first ==
-            opair.second.truncate->second);
-      assert(entry);
-      assert(obc);
-
-      opair.second.truncate = boost::none;
-      opair.second.delete_first = true;
-      opair.second.init_type = PGTransaction::ObjectOperation::Init::Create();
-
-      if (obc) {
-       /* We need to reapply all of the cached xattrs.
-        * std::map insert fortunately only writes keys
-        * which don't already exist, so this should do
-        * the right thing. */
-       opair.second.attr_updates.insert(
-         obc->attr_cache.begin(),
-         obc->attr_cache.end());
-      }
-    }
+       op.delete_first = true;
+       op.init_type = PGTransaction::ObjectOperation::Init::Create();
 
-    if (oid.is_temp()) {
-      if (opair.second.is_fresh_object()) {
-       temp_added->insert(oid);
-      } else if (opair.second.is_delete()) {
-       temp_removed->insert(oid);
+       if (obc) {
+         /* We need to reapply all of the cached xattrs.
+            * std::map insert fortunately only writes keys
+            * which don't already exist, so this should do
+            * the right thing. */
+         op.attr_updates.insert(
+           obc->attr_cache.begin(),
+           obc->attr_cache.end());
+       }
       }
-    }
 
-    if (opair.second.delete_first) {
-      /* We also want to remove the boost::none entries since
-       * the keys already won't exist */
-      for (auto j = opair.second.attr_updates.begin();
-          j != opair.second.attr_updates.end();
-       ) {
-       if (j->second) {
-         ++j;
+      if (op.delete_first) {
+       /* We also want to remove the boost::none entries since
+          * the keys already won't exist */
+       for (auto j = op.attr_updates.begin();
+            j != op.attr_updates.end();
+         ) {
+         if (j->second) {
+           ++j;
+         } else {
+           op.attr_updates.erase(j++);
+         }
+       }
+       /* Fill in all current entries for xattr rollback */
+       if (obc) {
+         xattr_rollback.insert(
+           obc->attr_cache.begin(),
+           obc->attr_cache.end());
+         obc->attr_cache.clear();
+       }
+       if (entry) {
+         entry->mod_desc.rmobject(entry->version.version);
+         for (auto &&st: *transactions) {
+           st.second.collection_move_rename(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, entry->version.version, st.first));
+         }
        } else {
-         opair.second.attr_updates.erase(j++);
+         for (auto &&st: *transactions) {
+           st.second.remove(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+         }
        }
+       hinfo->clear();
       }
-      /* Fill in all current entries for xattr rollback */
-      if (obc) {
-       xattr_rollback.insert(
-         obc->attr_cache.begin(),
-         obc->attr_cache.end());
-       obc->attr_cache.clear();
+
+      if (op.is_fresh_object() && entry) {
+       entry->mod_desc.create();
       }
-      if (entry) {
-       entry->mod_desc.rmobject(entry->version.version);
-       for (auto &&st: *transactions) {
-         st.second.collection_move_rename(
+
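+      /* Apply the object's init_type: Create just touches the object,
+       * while Clone and Rename derive it from a source object and carry
+       * the source's hash info over. */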
+      match(
+       op.init_type,
+       [&](const PGTransaction::ObjectOperation::Init::None &) {},
+       [&](const PGTransaction::ObjectOperation::Init::Create &op) {
+         for (auto &&st: *transactions) {
+           st.second.touch(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+         }
+       },
+       [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
+         for (auto &&st: *transactions) {
+           st.second.clone(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
+             ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+         }
+
+         auto siter = hash_infos.find(op.source);
+         assert(siter != hash_infos.end());
+         hinfo->update_to(*(siter->second));
+
+         if (obc) {
+           auto cobciter = obc_map.find(op.source);
+           assert(cobciter != obc_map.end());
+           obc->attr_cache = cobciter->second->attr_cache;
+         }
+       },
+       [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
+         assert(op.source.is_temp());
+         for (auto &&st: *transactions) {
+           st.second.collection_move_rename(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+         }
+         auto siter = hash_infos.find(op.source);
+         assert(siter != hash_infos.end());
+         hinfo->update_to(*(siter->second));
+         if (obc) {
+           auto cobciter = obc_map.find(op.source);
+           assert(cobciter == obc_map.end());
+           obc->attr_cache.clear();
+         }
+       });
+
+      // omap not supported (truncate to 0 was handled above)
+      assert(!(op.clear_omap));
+      assert(!(op.omap_header));
+      assert(op.omap_updates.empty());
+
+      if (!op.attr_updates.empty()) {
+       map<string, bufferlist> to_set;
+       for (auto &&j: op.attr_updates) {
+         if (j.second) {
+           to_set[j.first] = *(j.second);
+         } else {
+           for (auto &&st : *transactions) {
+             st.second.rmattr(
+               coll_t(spg_t(pgid, st.first)),
+               ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+               j.first);
+           }
+         }
+         if (obc) {
+           auto citer = obc->attr_cache.find(j.first);
+           if (entry) {
+             if (citer != obc->attr_cache.end()) {
+               // won't overwrite anything we put in earlier
+               xattr_rollback.insert(
+                 make_pair(
+                   j.first,
+                   boost::optional<bufferlist>(citer->second)));
+             } else {
+               // won't overwrite anything we put in earlier
+               xattr_rollback.insert(
+                 make_pair(
+                   j.first,
+                   boost::none));
+             }
+           }
+           if (j.second) {
+             obc->attr_cache[j.first] = *(j.second);
+           } else if (citer != obc->attr_cache.end()) {
+             obc->attr_cache.erase(citer);
+           }
+         } else {
+           assert(!entry);
+         }
+       }
+       for (auto &&st : *transactions) {
+         st.second.setattrs(
            coll_t(spg_t(pgid, st.first)),
            ghobject_t(oid, ghobject_t::NO_GEN, st.first),
-           coll_t(spg_t(pgid, st.first)),
-           ghobject_t(oid, entry->version.version, st.first));
+           to_set);
        }
-      } else {
-       for (auto &&st: *transactions) {
-         st.second.remove(
+       assert(!xattr_rollback.empty());
+      }
+      if (entry && !xattr_rollback.empty()) {
+       entry->mod_desc.setattrs(xattr_rollback);
+      }
+
+      if (op.alloc_hint) {
+       /* logical_to_next_chunk_offset() scales down both aligned and
+        * unaligned offsets
+        *
+        * we don't bother to roll this back at this time for two reasons:
+        * 1) it's advisory
+        * 2) we don't track the old value */
+       uint64_t object_size = sinfo.logical_to_next_chunk_offset(
+         op.alloc_hint->expected_object_size);
+       uint64_t write_size = sinfo.logical_to_next_chunk_offset(
+         op.alloc_hint->expected_write_size);
+
+       for (auto &&st : *transactions) {
+         st.second.set_alloc_hint(
            coll_t(spg_t(pgid, st.first)),
-           ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+           ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+           object_size,
+           write_size,
+           op.alloc_hint->flags);
        }
       }
-      hinfo->clear();
-    }
 
-    if (opair.second.is_fresh_object() && entry) {
-      entry->mod_desc.create();
-    }
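+      /* Start the write image from the extents read in (or cached) for
+       * this object; the buffer updates below are merged over them. */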
+      extent_map to_write;
+      auto pextiter = partial_extents.find(oid);
+      if (pextiter != partial_extents.end()) {
+       to_write = pextiter->second;
+      }
 
-    match(
-      opair.second.init_type,
-      [&](const PGTransaction::ObjectOperation::Init::None &) {},
-      [&](const PGTransaction::ObjectOperation::Init::Create &op) {
-       for (auto &&st: *transactions) {
-         st.second.touch(
-           coll_t(spg_t(pgid, st.first)),
-           ghobject_t(oid, ghobject_t::NO_GEN, st.first));
-       }
-      },
-      [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
-       for (auto &&st: *transactions) {
-         st.second.clone(
-           coll_t(spg_t(pgid, st.first)),
-           ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
-           ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+      vector<pair<uint64_t, uint64_t> > rollback_extents;
+      const uint64_t orig_size = hinfo->get_total_logical_size(sinfo);
+
+      uint64_t new_size = orig_size;
+      uint64_t append_after = new_size;
+      ldpp_dout(dpp, 20) << __func__ << ": new_size start " << new_size << dendl;
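+      /* Truncate down: round the target up to a stripe boundary, zero
+       * the unaligned remainder of the final stripe, and set
+       * append_after so data landing at or beyond it is appended. */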
+      if (op.truncate && op.truncate->first < new_size) {
+       assert(!op.is_fresh_object());
+       new_size = sinfo.logical_to_next_stripe_offset(
+         op.truncate->first);
+       ldpp_dout(dpp, 20) << __func__ << ": new_size truncate down "
+                          << new_size << dendl;
+       if (new_size != op.truncate->first) { // 0 the unaligned part
+         bufferlist bl;
+         bl.append_zero(new_size - op.truncate->first);
+         to_write.insert(
+           op.truncate->first,
+           bl.length(),
+           bl);
+         append_after = sinfo.logical_to_prev_stripe_offset(
+           op.truncate->first);
+       } else {
+         append_after = new_size;
        }
+       to_write.erase(
+         new_size,
+         std::numeric_limits<uint64_t>::max() - new_size);
 
-       auto siter = hash_infos.find(op.source);
-       assert(siter != hash_infos.end());
-       *hinfo = *(siter->second);
+       if (entry && !op.is_fresh_object()) {
+         uint64_t restore_from = sinfo.logical_to_prev_chunk_offset(
+           op.truncate->first);
+         uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
+           orig_size -
+           sinfo.logical_to_prev_stripe_offset(op.truncate->first));
+         assert(rollback_extents.empty());
 
-       if (obc) {
-         auto cobciter = t.obc_map.find(op.source);
-         assert(cobciter != t.obc_map.end());
-         obc->attr_cache = cobciter->second->attr_cache;
-       }
-      },
-      [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
-       assert(op.source.is_temp());
-       for (auto &&st: *transactions) {
-         st.second.collection_move_rename(
-           coll_t(spg_t(pgid, st.first)),
-           ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
-           coll_t(spg_t(pgid, st.first)),
-           ghobject_t(oid, ghobject_t::NO_GEN, st.first));
-       }
-       auto siter = hash_infos.find(op.source);
-       assert(siter != hash_infos.end());
-       *hinfo = *(siter->second);
-       assert(obc->attr_cache.empty());
-      });
-
-    // omap, truncate not supported (except 0, handled above)
-    assert(!(opair.second.clear_omap));
-    assert(!(opair.second.truncate));
-    assert(!(opair.second.omap_header));
-    assert(opair.second.omap_updates.empty());
-
-    if (!opair.second.attr_updates.empty()) {
-      map<string, bufferlist> to_set;
-      for (auto &&j: opair.second.attr_updates) {
-       if (j.second) {
-         to_set[j.first] = *(j.second);
-       } else {
+         ldpp_dout(dpp, 20) << __func__ << ": saving extent "
+                            << make_pair(restore_from, restore_len)
+                            << dendl;
+         ldpp_dout(dpp, 20) << __func__ << ": truncating to "
+                            << new_size
+                            << dendl;
+         rollback_extents.emplace_back(
+           make_pair(restore_from, restore_len));
          for (auto &&st : *transactions) {
-           st.second.rmattr(
+           st.second.touch(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, entry->version.version, st.first));
+           st.second.clone_range(
              coll_t(spg_t(pgid, st.first)),
              ghobject_t(oid, ghobject_t::NO_GEN, st.first),
-             j.first);
-         }
-       }
-       if (obc) {
-         auto citer = obc->attr_cache.find(j.first);
-         if (entry) {
-           if (citer != obc->attr_cache.end()) {
-             // won't overwrite anything we put in earlier
-             xattr_rollback.insert(
-               make_pair(
-                 j.first,
-                 boost::optional<bufferlist>(citer->second)));
-           } else {
-             // won't overwrite anything we put in earlier
-             xattr_rollback.insert(
-               make_pair(
-                 j.first,
-                 boost::none));
-           }
-         }
-         if (j.second) {
-           obc->attr_cache[j.first] = *(j.second);
-         } else if (citer != obc->attr_cache.end()) {
-           obc->attr_cache.erase(citer);
+             ghobject_t(oid, entry->version.version, st.first),
+             restore_from,
+             restore_len,
+             restore_from);
          }
        } else {
-         assert(!entry);
+         ldpp_dout(dpp, 20) << __func__ << ": not saving extents, fresh object"
+                            << dendl;
+       }
+       for (auto &&st : *transactions) {
+         st.second.truncate(
+           coll_t(spg_t(pgid, st.first)),
+           ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+           sinfo.aligned_logical_offset_to_chunk_offset(new_size));
        }
       }
-      for (auto &&st : *transactions) {
-       st.second.setattrs(
-         coll_t(spg_t(pgid, st.first)),
-         ghobject_t(oid, ghobject_t::NO_GEN, st.first),
-         to_set);
-      }
-      assert(!xattr_rollback.empty());
-    }
-    if (entry && !xattr_rollback.empty()) {
-      entry->mod_desc.setattrs(xattr_rollback);
-    }
-
-    if (opair.second.alloc_hint) {
-      /* logical_to_next_chunk_offset() scales down both aligned and
-       * unaligned offsets
-
-       * we don't bother to roll this back at this time for two reasons:
-       * 1) it's advisory
-       * 2) we don't track the old value */
-      uint64_t object_size = sinfo.logical_to_next_chunk_offset(
-       opair.second.alloc_hint->expected_object_size);
-      uint64_t write_size = sinfo.logical_to_next_chunk_offset(
-       opair.second.alloc_hint->expected_write_size);
-
-      for (auto &&st : *transactions) {
-       st.second.set_alloc_hint(
-         coll_t(spg_t(pgid, st.first)),
-         ghobject_t(oid, ghobject_t::NO_GEN, st.first),
-         object_size,
-         write_size,
-         opair.second.alloc_hint->flags);
-      }
-    }
 
-
-    if (!opair.second.buffer_updates.empty()) {
-      set<int> want;
-      for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) {
-       want.insert(i);
-      }
-      if (entry) {
-       entry->mod_desc.append(
-         sinfo.aligned_chunk_offset_to_logical_offset(
-           hinfo->get_total_chunk_size()
-           ));
-      }
-      for (auto &&extent: opair.second.buffer_updates) {
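+      /* Merge every buffer update into to_write, padding with zeros as
+       * needed so each resulting extent is stripe aligned. */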
+      uint32_t fadvise_flags = 0;
+      for (auto &&extent: op.buffer_updates) {
        using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
+       bufferlist bl;
        match(
          extent.get_val(),
          [&](const BufferUpdate::Write &op) {
-           if (extent.get_len()) {
-             assert(op.buffer.length() == extent.get_len());
-             bufferlist bl = op.buffer;
-             append(
-               pgid,
-               oid,
-               sinfo,
-               ecimpl,
-               want,
-               extent.get_off(),
-               bl,
-               op.fadvise_flags,
-               hinfo,
-               transactions);
-           }
+           bl = op.buffer;
+           fadvise_flags |= op.fadvise_flags;
          },
          [&](const BufferUpdate::Zero &) {
-           assert(
-             0 ==
-             "Zero is not allowed, do_op should have returned ENOTSUPP");
+           bl.append_zero(extent.get_len());
          },
          [&](const BufferUpdate::CloneRange &) {
            assert(
              0 ==
              "CloneRange is not allowed, do_op should have returned ENOTSUPP");
          });
+
+       uint64_t off = extent.get_off();
+       uint64_t len = extent.get_len();
+       uint64_t end = off + len;
+       ldpp_dout(dpp, 20) << __func__ << ": adding buffer_update "
+                          << make_pair(off, len)
+                          << dendl;
+       assert(len > 0);
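+       // A write past the current end leaves a hole; zero-fill it.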
+       if (off > new_size) {
+         assert(off > append_after);
+         bl.prepend_zero(off - new_size);
+         len += off - new_size;
+         ldpp_dout(dpp, 20) << __func__ << ": prepending zeroes to align "
+                            << off << "->" << new_size
+                            << dendl;
+         off = new_size;
+       }
+       if (!sinfo.logical_offset_is_stripe_aligned(end) && (end > append_after)) {
+         uint64_t aligned_end = sinfo.logical_to_next_stripe_offset(
+           end);
+         uint64_t tail = aligned_end - end;
+         bl.append_zero(tail);
+         ldpp_dout(dpp, 20) << __func__ << ": appending zeroes to align end "
+                            << end << "->" << end+tail
+                            << ", len: " << len << "->" << len+tail
+                            << dendl;
+         end += tail;
+         len += tail;
+       }
+
+       to_write.insert(off, len, bl);
+       if (end > new_size)
+         new_size = end;
       }
-    }
-  });
+
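+      /* Truncate up: zero-fill from the current end out to the stripe
+       * aligned target size. */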
+      if (op.truncate &&
+         op.truncate->second > new_size) {
+       assert(op.truncate->second > append_after);
+       uint64_t truncate_to =
+         sinfo.logical_to_next_stripe_offset(
+           op.truncate->second);
+       uint64_t zeroes = truncate_to - new_size;
+       bufferlist bl;
+       bl.append_zero(zeroes);
+       to_write.insert(
+         new_size,
+         zeroes,
+         bl);
+       new_size = truncate_to;
+       ldpp_dout(dpp, 20) << __func__ << ": truncating out to "
+                          << truncate_to
+                          << dendl;
+      }
+
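+      /* Split the image at append_after: extents below it rewrite
+       * existing stripes and need rollback clones, extents at or above
+       * it are pure appends. */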
+      set<int> want;
+      for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) {
+       want.insert(i);
+      }
+      auto to_overwrite = to_write.intersect(0, append_after);
+      ldpp_dout(dpp, 20) << __func__ << ": to_overwrite: "
+                        << to_overwrite
+                        << dendl;
+      for (auto &&extent: to_overwrite) {
+       assert(extent.get_off() + extent.get_len() <= append_after);
+       assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
+       assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
+       if (entry) {
+         uint64_t restore_from = sinfo.aligned_logical_offset_to_chunk_offset(
+           extent.get_off());
+         uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
+           extent.get_len());
+         ldpp_dout(dpp, 20) << __func__ << ": overwriting "
+                            << restore_from << "~" << restore_len
+                            << dendl;
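+         /* On the first rollback extent, create the generation object
+          * the old data is cloned into (the truncate path above may
+          * have created it already). */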
+         if (rollback_extents.empty()) {
+           for (auto &&st : *transactions) {
+             st.second.touch(
+               coll_t(spg_t(pgid, st.first)),
+               ghobject_t(oid, entry->version.version, st.first));
+           }
+         }
+         rollback_extents.emplace_back(make_pair(restore_from, restore_len));
+         for (auto &&st : *transactions) {
+           st.second.clone_range(
+             coll_t(spg_t(pgid, st.first)),
+             ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+             ghobject_t(oid, entry->version.version, st.first),
+             restore_from,
+             restore_len,
+             restore_from);
+         }
+       }
+       encode_and_write(
+         pgid,
+         oid,
+         sinfo,
+         ecimpl,
+         want,
+         extent.get_off(),
+         extent.get_val(),
+         fadvise_flags,
+         hinfo,
+         written,
+         transactions,
+         dpp);
+      }
+
+      auto to_append = to_write.intersect(
+       append_after,
+       std::numeric_limits<uint64_t>::max() - append_after);
+      ldpp_dout(dpp, 20) << __func__ << ": to_append: "
+                        << to_append
+                        << dendl;
+      for (auto &&extent: to_append) {
+       assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
+       assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
+       ldpp_dout(dpp, 20) << __func__ << ": appending "
+                          << extent.get_off() << "~" << extent.get_len()
+                          << dendl;
+       encode_and_write(
+         pgid,
+         oid,
+         sinfo,
+         ecimpl,
+         want,
+         extent.get_off(),
+         extent.get_val(),
+         fadvise_flags,
+         hinfo,
+         written,
+         transactions,
+         dpp);
+      }
+
+      ldpp_dout(dpp, 20) << __func__ << ": " << oid
+                        << " resetting hinfo to logical size "
+                        << new_size
+                        << dendl;
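+      /* Overwrites invalidate the cumulative shard hashes, so record
+       * the rollback extents and reset the chunk size with the hashes
+       * cleared; a pure append leaves them valid. */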
+      if (!rollback_extents.empty() && entry) {
+       ldpp_dout(dpp, 20) << __func__ << ": " << oid
+                          << " marking rollback extents "
+                          << rollback_extents
+                          << dendl;
+       entry->mod_desc.rollback_extents(
+         entry->version.version, rollback_extents);
+       hinfo->set_total_chunk_size_clear_hash(
+         sinfo.aligned_logical_offset_to_chunk_offset(new_size));
+      } else {
+       assert(hinfo->get_total_logical_size(sinfo) == new_size);
+      }
+
+      if (entry && !to_append.empty()) {
+       ldpp_dout(dpp, 20) << __func__ << ": marking append "
+                          << append_after
+                          << dendl;
+       entry->mod_desc.append(append_after);
+      }
+
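+      // Persist the updated hash info as an xattr on every shard.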
+      bufferlist hbuf;
+      ::encode(*hinfo, hbuf);
+      for (auto &&i : *transactions) {
+       i.second.setattr(
+         coll_t(spg_t(pgid, i.first)),
+         ghobject_t(oid, ghobject_t::NO_GEN, i.first),
+         ECUtil::get_hinfo_key(),
+         hbuf);
+      }
+    });
 }
index aa6bef5fabdf340a7e14a4657ef9580267d99d1c..789f8c8ec93cdbe3135cf4b94099aae563b13a3b 100644 (file)
 #include "ECUtil.h"
 #include "erasure-code/ErasureCodeInterface.h"
 #include "PGTransaction.h"
+#include "ExtentCache.h"
 
 namespace ECTransaction {
-  void get_append_objects(
-    const PGTransaction &t,
-    set<hobject_t, hobject_t::BitwiseComparator> *out);
+  struct WritePlan {
+    PGTransactionUPtr t;
+    bool invalidates_cache = false; // Yes, invalidating and writing are both possible
+    hobject_t::bitwisemap<extent_set> to_read;
+    hobject_t::bitwisemap<extent_set> will_write; // superset of to_read
+
+    hobject_t::bitwisemap<ECUtil::HashInfoRef> hash_infos;
+  };
+
+  bool requires_overwrite(
+    uint64_t prev_size,
+    const PGTransaction::ObjectOperation &op);
+
+  template <typename F>
+  WritePlan get_write_plan(
+    const ECUtil::stripe_info_t &sinfo,
+    PGTransactionUPtr &&t,
+    F &&get_hinfo,
+    DoutPrefixProvider *dpp) {
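+    /* Planning pass: walk the transaction and record, per object, which
+     * stripes have to be read (partial head/tail stripes of writes and
+     * unaligned truncates) and which will be written.  Sizes are based
+     * on the projected size so that plans for in-flight ops chain
+     * correctly. */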
+    WritePlan plan;
+    t->safe_create_traverse(
+      [&](pair<const hobject_t, PGTransaction::ObjectOperation> &i) {
+       ECUtil::HashInfoRef hinfo = get_hinfo(i.first);
+       plan.hash_infos[i.first] = hinfo;
+
+       uint64_t projected_size =
+         hinfo->get_projected_total_logical_size(sinfo);
+
+       if (i.second.has_source()) {
+         plan.invalidates_cache = true;
+       }
+
+       if (i.second.is_delete()) {
+         projected_size = 0;
+       }
+
+       hobject_t source;
+       if (i.second.has_source(&source)) {
+         ECUtil::HashInfoRef shinfo = get_hinfo(source);
+         projected_size = shinfo->get_projected_total_logical_size(sinfo);
+         plan.hash_infos[source] = shinfo;
+       }
+
+       auto &will_write = plan.will_write[i.first];
+       if (i.second.truncate &&
+           i.second.truncate->first < projected_size) {
+         if (!(sinfo.logical_offset_is_stripe_aligned(
+                 i.second.truncate->first))) {
+           plan.to_read[i.first].insert(
+             sinfo.logical_to_prev_stripe_offset(i.second.truncate->first),
+             sinfo.get_stripe_width());
+
+           ldpp_dout(dpp, 20) << __func__ << ": unaligned truncate" << dendl;
+
+           will_write.insert(
+             sinfo.logical_to_prev_stripe_offset(i.second.truncate->first),
+             sinfo.get_stripe_width());
+         }
+         projected_size = sinfo.logical_to_next_stripe_offset(
+           i.second.truncate->first);
+       }
+
+       for (auto &&extent: i.second.buffer_updates) {
+         using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
+         if (boost::get<BufferUpdate::CloneRange>(&(extent.get_val()))) {
+           assert(
+             0 ==
+             "CloneRange is not allowed, do_op should have returned ENOTSUPP");
+         }
+
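+         /* Read in any stripe the update only partially covers.  For
+          * example, with stripe_width 4096 a write at off=5000 len=2000
+          * gives head_start=4096, head_finish=8192; that stripe is read
+          * only if it already exists (head_start < projected_size), and
+          * the tail stripe coincides with it here, so at most one read
+          * is queued. */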
+         uint64_t head_start =
+           sinfo.logical_to_prev_stripe_offset(extent.get_off());
+         uint64_t head_finish =
+           sinfo.logical_to_next_stripe_offset(extent.get_off());
+         if (head_start > projected_size) {
+           head_start = projected_size;
+         }
+         if (head_start != head_finish &&
+             head_start < projected_size) {
+           assert(head_finish <= projected_size);
+           assert(head_finish - head_start == sinfo.get_stripe_width());
+           plan.to_read[i.first].insert(
+             head_start, sinfo.get_stripe_width());
+         }
+
+         uint64_t tail_start =
+           sinfo.logical_to_prev_stripe_offset(
+             extent.get_off() + extent.get_len());
+         uint64_t tail_finish =
+           sinfo.logical_to_next_stripe_offset(
+             extent.get_off() + extent.get_len());
+         if (tail_start != tail_finish &&
+             tail_start != head_start &&
+             tail_start < projected_size) {
+           assert(tail_finish <= projected_size);
+           assert(tail_finish - tail_start == sinfo.get_stripe_width());
+           plan.to_read[i.first].insert(
+             tail_start, sinfo.get_stripe_width());
+         }
+
+         if (head_start != tail_finish) {
+           assert(
+             sinfo.logical_offset_is_stripe_aligned(
+               tail_finish - head_start)
+             );
+           will_write.insert(
+             head_start, tail_finish - head_start);
+           if (tail_finish > projected_size)
+             projected_size = tail_finish;
+         } else {
+           assert(tail_finish <= projected_size);
+         }
+       }
+
+       if (i.second.truncate &&
+           i.second.truncate->second > projected_size) {
+         uint64_t truncating_to =
+           sinfo.logical_to_next_stripe_offset(i.second.truncate->second);
+         ldpp_dout(dpp, 20) << __func__ << ": truncating out to "
+                            <<  truncating_to
+                            << dendl;
+         will_write.insert(projected_size, truncating_to - projected_size);
+         projected_size = truncating_to;
+       }
+
+       ldpp_dout(dpp, 20) << __func__ << ": " << i.first
+                          << " projected size "
+                          << projected_size
+                          << dendl;
+       hinfo->set_projected_total_logical_size(
+         sinfo,
+         projected_size);
+       assert(plan.to_read[i.first].empty() || !i.second.has_source());
+      });
+    plan.t = std::move(t);
+    return plan;
+  }
+
   void generate_transactions(
-    PGTransaction &t,
-    map<
-      hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator
-      > &hash_infos,
+    WritePlan &plan,
     ErasureCodeInterfaceRef &ecimpl,
     pg_t pgid,
+    bool legacy_log_entries,
     const ECUtil::stripe_info_t &sinfo,
+    const hobject_t::bitwisemap<extent_map> &partial_extents,
     vector<pg_log_entry_t> &entries,
+    hobject_t::bitwisemap<extent_map> *written,
     map<shard_id_t, ObjectStore::Transaction> *transactions,
     set<hobject_t, hobject_t::BitwiseComparator> *temp_added,
     set<hobject_t, hobject_t::BitwiseComparator> *temp_removed,
-    stringstream *out = 0);
+    DoutPrefixProvider *dpp);
 };
 
 #endif
index 53fe1a203bf8130312278915e60869fa0c6edeea..6e046526edaeb6c9ea4b157bbb7ecbefe2bc722d 100644 (file)
@@ -139,16 +139,18 @@ int ECUtil::encode(
 
 void ECUtil::HashInfo::append(uint64_t old_size,
                              map<int, bufferlist> &to_append) {
-  assert(to_append.size() == cumulative_shard_hashes.size());
   assert(old_size == total_chunk_size);
   uint64_t size_to_append = to_append.begin()->second.length();
-  for (map<int, bufferlist>::iterator i = to_append.begin();
-       i != to_append.end();
-       ++i) {
-    assert(size_to_append == i->second.length());
-    assert((unsigned)i->first < cumulative_shard_hashes.size());
-    uint32_t new_hash = i->second.crc32c(cumulative_shard_hashes[i->first]);
-    cumulative_shard_hashes[i->first] = new_hash;
+  if (has_chunk_hash()) {
+    assert(to_append.size() == cumulative_shard_hashes.size());
+    for (map<int, bufferlist>::iterator i = to_append.begin();
+        i != to_append.end();
+        ++i) {
+      assert(size_to_append == i->second.length());
+      assert((unsigned)i->first < cumulative_shard_hashes.size());
+      uint32_t new_hash = i->second.crc32c(cumulative_shard_hashes[i->first]);
+      cumulative_shard_hashes[i->first] = new_hash;
+    }
   }
   total_chunk_size += size_to_append;
 }
@@ -166,6 +168,7 @@ void ECUtil::HashInfo::decode(bufferlist::iterator &bl)
   DECODE_START(1, bl);
   ::decode(total_chunk_size, bl);
   ::decode(cumulative_shard_hashes, bl);
+  projected_total_chunk_size = total_chunk_size;
   DECODE_FINISH(bl);
 }
 
index 2706bba67fca2cb2bfd086c4c2f576021a80f7cb..4ca6f550bee8866e81246648b5cc7648fbca9283 100644 (file)
@@ -41,6 +41,9 @@ public:
       chunk_size(stripe_width / stripe_size) {
     assert(stripe_width % stripe_size == 0);
   }
+  bool logical_offset_is_stripe_aligned(uint64_t logical) const {
+    return (logical % stripe_width) == 0;
+  }
   uint64_t get_stripe_width() const {
     return stripe_width;
   }
@@ -104,12 +107,14 @@ int encode(
   map<int, bufferlist> *out);
 
 class HashInfo {
-  uint64_t total_chunk_size;
+  uint64_t total_chunk_size = 0;
   vector<uint32_t> cumulative_shard_hashes;
+
+  // purely ephemeral, represents the size once all in-flight ops commit
+  uint64_t projected_total_chunk_size = 0;
 public:
-  HashInfo() : total_chunk_size(0) {}
-  explicit HashInfo(unsigned num_chunks)
-  : total_chunk_size(0),
+  HashInfo() {}
+  explicit HashInfo(unsigned num_chunks) :
     cumulative_shard_hashes(num_chunks, -1) {}
   void append(uint64_t old_size, map<int, bufferlist> &to_append);
   void clear() {
@@ -129,7 +134,38 @@ public:
   uint64_t get_total_chunk_size() const {
     return total_chunk_size;
   }
+  uint64_t get_projected_total_chunk_size() const {
+    return projected_total_chunk_size;
+  }
+  uint64_t get_total_logical_size(const stripe_info_t &sinfo) const {
+    return get_total_chunk_size() *
+      (sinfo.get_stripe_width()/sinfo.get_chunk_size());
+  }
+  uint64_t get_projected_total_logical_size(const stripe_info_t &sinfo) const {
+    return get_projected_total_chunk_size() *
+      (sinfo.get_stripe_width()/sinfo.get_chunk_size());
+  }
+  void set_projected_total_logical_size(
+    const stripe_info_t &sinfo,
+    uint64_t logical_size) {
+    assert(sinfo.logical_offset_is_stripe_aligned(logical_size));
+    projected_total_chunk_size = sinfo.aligned_logical_offset_to_chunk_offset(
+      logical_size);
+  }
+  void set_total_chunk_size_clear_hash(uint64_t new_chunk_size) {
+    cumulative_shard_hashes.clear();
+    total_chunk_size = new_chunk_size;
+  }
+  bool has_chunk_hash() const {
+    return !cumulative_shard_hashes.empty();
+  }
+  void update_to(const HashInfo &rhs) {
+    auto ptcs = projected_total_chunk_size;
+    *this = rhs;
+    projected_total_chunk_size = ptcs;
+  }
 };
+
 typedef ceph::shared_ptr<HashInfo> HashInfoRef;
 
 bool is_hinfo_key_string(const string &key);
index 5e1a4a9e10eb519c14ef9755a7b8e66488345a04..ca28a1d50ceb74d56058359311e3b2bcc6d10923 100644 (file)
@@ -12935,6 +12935,13 @@ void ReplicatedPG::_scrub_finish()
   bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
   const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
 
+  if (pool.info.is_hacky_ecoverwrites()) {
+    dout(10) << __func__
+            << ": skipping stat comparisons since hacky_overwrites are enabled"
+            << dendl;
+    return;
+  }
+
   if (info.stats.stats_invalid) {
     info.stats.stats = scrub_cstat;
     info.stats.stats_invalid = false;
index 3c3ab2fff6ca1cd6e78ccd6fe2f85b849e7e31fe..a831df932e1b71050ce857a5bff30f049c5f8f3b 100644 (file)
@@ -1406,6 +1406,10 @@ public:
   }
   uint64_t required_alignment() const { return stripe_width; }
 
+  bool is_hacky_ecoverwrites() const {
+    return has_flag(FLAG_EC_OVERWRITES);
+  }
+
   bool can_shift_osds() const {
     switch (get_type()) {
     case TYPE_REPLICATED: