]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: Use MOSDPGUpdateLogMissing to implement mark_unfound_lost_delete safely 7765/head
authorSamuel Just <sjust@redhat.com>
Fri, 12 Feb 2016 17:27:01 +0000 (09:27 -0800)
committerSamuel Just <sjust@redhat.com>
Thu, 25 Feb 2016 19:13:59 +0000 (11:13 -0800)
Using a MOSDPGLog was unsafe since it is not ordered with
respect to repops.  Instead, use a new message sent through
the same paths as repops.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/OSD.cc
src/osd/PG.cc
src/osd/PG.h
src/osd/PGBackend.cc
src/osd/PGBackend.h
src/osd/PGLog.cc
src/osd/PGLog.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.cc
src/osd/osd_types.h

index fc945fec2d9ebaab07ecfbbdfb36a6a073236c62..616019785f77a30d84f13bad61c266c8cb828e13 100644 (file)
@@ -5292,7 +5292,12 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
          // simulate pg <pgid> cmd= for pg->do-command
          if (prefix != "pg")
            cmd_putval(cct, cmdmap, "cmd", prefix);
-         r = pg->do_command(cmdmap, ss, data, odata);
+         r = pg->do_command(cmdmap, ss, data, odata, con, tid);
+         if (r == -EAGAIN) {
+           pg->unlock();
+           // don't reply, pg will do so async
+           return;
+         }
        } else {
          ss << "not primary for pgid " << pgid;
 
@@ -5592,7 +5597,6 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
     reply->set_data(odata);
     con->send_message(reply);
   }
-  return;
 }
 
 
index 0b32703037f31104c5a5083bbb1167a03a6c887a..ac1eec876487c046241bc014b8bcbf92c7481869 100644 (file)
@@ -4556,39 +4556,71 @@ void PG::share_pg_info()
   }
 }
 
-/*
- * Share a new segment of this PG's log with some replicas, after PG is active.
- *
- * Updates peer_missing and peer_info.
- */
-void PG::share_pg_log()
+void PG::append_log_entries_update_missing(
+  const list<pg_log_entry_t> &entries,
+  ObjectStore::Transaction &t)
 {
-  dout(10) << __func__ << dendl;
+  assert(!entries.empty());
+  assert(entries.begin()->version > info.last_update);
+
+  PGLogEntryHandler rollbacker;
+  pg_log.append_new_log_entries(
+    info.last_backfill,
+    info.last_backfill_bitwise,
+    entries,
+    &rollbacker);
+  rollbacker.apply(this, &t);
+  info.last_update = pg_log.get_head();
+
+  if (pg_log.get_missing().num_missing() == 0) {
+    // advance last_complete since nothing else is missing!
+    info.last_complete = info.last_update;
+  }
+
+  info.stats.stats_invalid = true;
+  dirty_info = true;
+  write_if_dirty(t);
+}
+
+
+void PG::merge_new_log_entries(
+  const list<pg_log_entry_t> &entries,
+  ObjectStore::Transaction &t)
+{
+  dout(10) << __func__ << " " << entries << dendl;
   assert(is_primary());
 
-  set<pg_shard_t>::const_iterator a = actingbackfill.begin();
-  assert(a != actingbackfill.end());
-  set<pg_shard_t>::const_iterator end = actingbackfill.end();
-  while (a != end) {
-    pg_shard_t peer(*a);
-    ++a;
+  append_log_entries_update_missing(entries, t);
+  for (set<pg_shard_t>::const_iterator i = actingbackfill.begin();
+       i != actingbackfill.end();
+       ++i) {
+    pg_shard_t peer(*i);
     if (peer == pg_whoami) continue;
+    assert(peer_missing.count(peer));
+    assert(peer_info.count(peer));
     pg_missing_t& pmissing(peer_missing[peer]);
     pg_info_t& pinfo(peer_info[peer]);
-
-    MOSDPGLog *m = new MOSDPGLog(
-      peer.shard, pg_whoami.shard,
-      info.last_update.epoch, info);
-    m->log.copy_after(pg_log.get_log(), pinfo.last_update);
-
-    for (list<pg_log_entry_t>::const_iterator i = m->log.log.begin();
-        i != m->log.log.end();
-        ++i) {
-      pmissing.add_next_event(*i);
-    }
-    pinfo.last_update = m->log.head;
-
-    osd->send_message_osd_cluster(peer.osd, m, get_osdmap()->get_epoch());
+    PGLog::append_log_entries_update_missing(
+      pinfo.last_backfill,
+      info.last_backfill_bitwise,
+      entries,
+      NULL,
+      pmissing,
+      NULL,
+      this);
+    pinfo.last_update = info.last_update;
+    pinfo.stats.stats_invalid = true;
+  }
+  for (auto &&i: entries) {
+    missing_loc.rebuild(
+      i.soid,
+      get_sort_bitwise(),
+      pg_whoami,
+      actingbackfill,
+      info,
+      pg_log.get_missing(),
+      peer_missing,
+      peer_info);
   }
 }
 
@@ -6640,7 +6672,6 @@ boost::statechart::result PG::RecoveryState::Active::react(const ActMap&)
     if (pg->cct->_conf->osd_auto_mark_unfound_lost) {
       pg->osd->clog->error() << pg->info.pgid.pgid << " has " << unfound
                            << " objects unfound and apparently lost, would automatically marking lost but NOT IMPLEMENTED\n";
-      //pg->mark_all_unfound_lost(*context< RecoveryMachine >().get_cur_transaction());
     } else
       pg->osd->clog->error() << pg->info.pgid.pgid << " has " << unfound << " objects unfound and apparently lost\n";
   }
index e0543cc2a24748e030dcde03b8e93e429d73e5e2..ada160153fcf959945dc461a024ffef330c23513 100644 (file)
@@ -423,6 +423,55 @@ public:
       missing_loc.erase(hoid);
     }
 
+    /// Call to update structures for hoid after a change
+    void rebuild(
+      const hobject_t &hoid,
+      bool sort_bitwise,
+      pg_shard_t self,
+      const set<pg_shard_t> to_recover,
+      const pg_info_t &info,
+      const pg_missing_t &missing,
+      const map<pg_shard_t, pg_missing_t> &pmissing,
+      const map<pg_shard_t, pg_info_t> &pinfo) {
+      recovered(hoid);
+      boost::optional<pg_missing_t::item> item;
+      set<pg_shard_t> have;
+      auto miter = missing.missing.find(hoid);
+      if (miter != missing.missing.end()) {
+       item = miter->second;
+      } else {
+       for (auto &&i: to_recover) {
+         if (i == self)
+           continue;
+         auto pmiter = pmissing.find(i);
+         assert(pmiter != pmissing.end());
+         miter = pmiter->second.missing.find(hoid);
+         if (miter != pmiter->second.missing.end()) {
+           item = miter->second;
+           break;
+         }
+       }
+      }
+      if (!item)
+       return; // recovered!
+
+      needs_recovery_map[hoid] = *item;
+      auto mliter =
+       missing_loc.insert(make_pair(hoid, set<pg_shard_t>())).first;
+      assert(info.last_backfill == hobject_t::get_max());
+      assert(info.last_update >= item->need);
+      if (!missing.is_missing(hoid))
+       mliter->second.insert(self);
+      for (auto &&i: pmissing) {
+       auto pinfoiter = pinfo.find(i.first);
+       assert(pinfoiter != pinfo.end());
+       if (item->need <= pinfoiter->second.last_update &&
+           cmp(hoid, pinfoiter->second.last_backfill, sort_bitwise) <= 0 &&
+           !i.second.is_missing(hoid))
+         mliter->second.insert(i.first);
+      }
+    }
+
     const set<pg_shard_t> &get_locations(const hobject_t &hoid) const {
       return missing_loc.count(hoid) ?
        missing_loc.find(hoid)->second : empty_set;
@@ -854,7 +903,6 @@ public:
   bool adjust_need_up_thru(const OSDMapRef osdmap);
 
   bool all_unfound_are_queried_or_lost(const OSDMapRef osdmap) const;
-  virtual void mark_all_unfound_lost(int how) = 0;
   virtual void dump_recovery_info(Formatter *f) const = 0;
 
   bool calc_min_last_complete_ondisk() {
@@ -920,11 +968,15 @@ public:
     list<pg_log_entry_t> to_rollback;
     set<hobject_t, hobject_t::BitwiseComparator> to_remove;
     list<pg_log_entry_t> to_trim;
+    list<pair<hobject_t, version_t> > to_stash;
     
     // LogEntryHandler
     void remove(const hobject_t &hoid) {
       to_remove.insert(hoid);
     }
+    void try_stash(const hobject_t &hoid, version_t v) {
+      to_stash.push_back(make_pair(hoid, v));
+    }
     void rollback(const pg_log_entry_t &entry) {
       to_rollback.push_back(entry);
     }
@@ -941,6 +993,11 @@ public:
        SnapRollBacker rollbacker(j->soid, pg, t);
        j->mod_desc.visit(&rollbacker);
       }
+      for (list<pair<hobject_t, version_t> >::iterator i = to_stash.begin();
+          i != to_stash.end();
+          ++i) {
+       pg->get_pgbackend()->try_stash(i->first, i->second, t);
+      }
       for (set<hobject_t, hobject_t::BitwiseComparator>::iterator i = to_remove.begin();
           i != to_remove.end();
           ++i) {
@@ -2218,8 +2275,19 @@ public:
 
   /// share pg info after a pg is active
   void share_pg_info();
-  /// share new pg log entries after a pg is active
-  void share_pg_log();
+
+
+  void append_log_entries_update_missing(
+    const list<pg_log_entry_t> &entries,
+    ObjectStore::Transaction &t);
+
+  /**
+   * Merge entries updating missing as necessary on all
+   * actingbackfill logs and missings (also missing_loc)
+   */
+  void merge_new_log_entries(
+    const list<pg_log_entry_t> &entries,
+    ObjectStore::Transaction &t);
 
   void reset_interval_flush();
   void start_peering_interval(
@@ -2315,8 +2383,13 @@ public:
   virtual void do_backfill(OpRequestRef op) = 0;
   virtual void snap_trimmer(epoch_t epoch_queued) = 0;
 
-  virtual int do_command(cmdmap_t cmdmap, ostream& ss,
-                        bufferlist& idata, bufferlist& odata) = 0;
+  virtual int do_command(
+    cmdmap_t cmdmap,
+    ostream& ss,
+    bufferlist& idata,
+    bufferlist& odata,
+    ConnectionRef conn,
+    ceph_tid_t tid) = 0;
 
   virtual void on_role_change() = 0;
   virtual void on_pool_change() = 0;
index 2ffe6a3cff64bd9fc9909cfdb5b94fd7778faaec..d2e7a629680ff7ba2f940b943032cdf5fc10981a 100644 (file)
@@ -59,6 +59,12 @@ struct RollbackVisitor : public ObjectModDesc::Visitor {
     temp.append(t);
     temp.swap(t);
   }
+  void try_rmobject(version_t old_version) {
+    ObjectStore::Transaction temp;
+    pg->rollback_try_stash(hoid, old_version, &temp);
+    temp.append(t);
+    temp.swap(t);
+  }
   void create() {
     ObjectStore::Transaction temp;
     pg->rollback_create(hoid, &temp);
@@ -82,6 +88,17 @@ void PGBackend::rollback(
 }
 
 
+void PGBackend::try_stash(
+  const hobject_t &hoid,
+  version_t v,
+  ObjectStore::Transaction *t)
+{
+  t->try_rename(
+    coll,
+    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+    ghobject_t(hoid, v, get_parent()->whoami_shard().shard));
+}
+
 void PGBackend::on_change_cleanup(ObjectStore::Transaction *t)
 {
   dout(10) << __func__ << dendl;
@@ -253,6 +270,20 @@ void PGBackend::rollback_stash(
     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
 }
 
+void PGBackend::rollback_try_stash(
+  const hobject_t &hoid,
+  version_t old_version,
+  ObjectStore::Transaction *t) {
+  assert(!hoid.is_temp());
+  t->remove(
+    coll,
+    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
+  t->try_rename(
+    coll,
+    ghobject_t(hoid, old_version, get_parent()->whoami_shard().shard),
+    ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
+}
+
 void PGBackend::rollback_create(
   const hobject_t &hoid,
   ObjectStore::Transaction *t) {
index 1af8a0c4c867aad8befd79ed2b38afb9c7de377f..4ba705b66f2a35caa084523bc0b62b5105767557 100644 (file)
@@ -496,6 +496,11 @@ struct shard_info_wrapper;
      ) = 0;
 
 
+   void try_stash(
+     const hobject_t &hoid,
+     version_t v,
+     ObjectStore::Transaction *t);
+
    void rollback(
      const hobject_t &hoid,
      const ObjectModDesc &desc,
@@ -519,6 +524,12 @@ struct shard_info_wrapper;
      version_t old_version,
      ObjectStore::Transaction *t);
 
+   /// Unstash object to rollback stash
+   void rollback_try_stash(
+     const hobject_t &hoid,
+     version_t old_version,
+     ObjectStore::Transaction *t);
+
    /// Delete object to rollback create
    void rollback_create(
      const hobject_t &hoid,
index 3575b0393c8a92edb7a7c10d386e3e1b582b3282..e8f2a3ac49ef36e358d4bb6018ce70276b9a8055 100644 (file)
@@ -570,6 +570,7 @@ void PGLog::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead
 
 void PGLog::append_log_entries_update_missing(
   const hobject_t &last_backfill,
+  bool last_backfill_bitwise,
   const list<pg_log_entry_t> &entries,
   IndexedLog *log,
   pg_missing_t &missing,
@@ -589,12 +590,20 @@ void PGLog::append_log_entries_update_missing(
       ldpp_dout(dpp, 20) << "update missing, append " << ne << dendl;
       log->index(ne);
     }
-    if (p->soid <= last_backfill) {
+    if (cmp(p->soid, last_backfill, last_backfill_bitwise) <= 0) {
       missing.add_next_event(*p);
-      if (p->is_delete() && rollbacker)
-       rollbacker->remove(p->soid);
+      if (rollbacker) {
+       // hack to match PG::mark_all_unfound_lost
+       if (p->is_lost_delete() && p->mod_desc.can_rollback()) {
+         rollbacker->try_stash(p->soid, p->version.version);
+       } else if (p->is_delete()) {
+         rollbacker->remove(p->soid);
+       }
+      }
     }
   }
+  if (log)
+    log->reset_rollback_info_trimmed_to_riter();
 }
 
 void PGLog::merge_log(ObjectStore::Transaction& t,
@@ -708,6 +717,7 @@ void PGLog::merge_log(ObjectStore::Transaction& t,
     entries.splice(entries.end(), olog.log, from, to);
     append_log_entries_update_missing(
       info.last_backfill,
+      info.last_backfill_bitwise,
       entries,
       &log,
       missing,
index 66adfa2ab9eec2ed10158e61d26e3e4fc30c64ba..7740b3aaa90e51bcd9ec44c7c0f2b59d6ad35842 100644 (file)
@@ -48,6 +48,9 @@ struct PGLog : DoutPrefixProvider {
       const pg_log_entry_t &entry) = 0;
     virtual void remove(
       const hobject_t &hoid) = 0;
+    virtual void try_stash(
+      const hobject_t &entry,
+      version_t v) = 0;
     virtual void trim(
       const pg_log_entry_t &entry) = 0;
     virtual ~LogEntryHandler() {}
@@ -231,6 +234,13 @@ struct PGLog : DoutPrefixProvider {
         ++rollback_info_trimmed_to_riter;
     }
 
+    void reset_rollback_info_trimmed_to_riter() {
+      rollback_info_trimmed_to_riter = log.rbegin();
+      while (rollback_info_trimmed_to_riter != log.rend() &&
+            rollback_info_trimmed_to_riter->version > rollback_info_trimmed_to)
+       ++rollback_info_trimmed_to_riter;
+    }
+
     // indexes objects, caller ops and extra caller ops
     void index() {
       objects.clear();
@@ -257,7 +267,7 @@ struct PGLog : DoutPrefixProvider {
         
       reset_riter();
       indexed_data = PGLOG_INDEXED_ALL;
-        
+      reset_rollback_info_trimmed_to_riter();
     }
 
     void index_objects() const {
@@ -269,7 +279,6 @@ struct PGLog : DoutPrefixProvider {
        }
  
       indexed_data |= PGLOG_INDEXED_OBJECTS;
-
     }
 
     void index_caller_ops() const {
@@ -767,6 +776,7 @@ public:
 
   static void append_log_entries_update_missing(
     const hobject_t &last_backfill,
+    bool last_backfill_bitwise,
     const list<pg_log_entry_t> &entries,
     IndexedLog *log,
     pg_missing_t &missing,
@@ -774,15 +784,20 @@ public:
     const DoutPrefixProvider *dpp);
   void append_new_log_entries(
     const hobject_t &last_backfill,
+    bool last_backfill_bitwise,
     const list<pg_log_entry_t> &entries,
     LogEntryHandler *rollbacker) {
     append_log_entries_update_missing(
       last_backfill,
+      last_backfill_bitwise,
       entries,
       &log,
       missing,
       rollbacker,
       this);
+    if (!entries.empty()) {
+      mark_writeout_from(entries.begin()->version);
+    }
   }
 
   void write_log(ObjectStore::Transaction& t,
index 064c7726e69737bccb4b47edd1d8cfea4d844aab..00343bcb89732ae6d6d63dc8b6cc5556f520c7f9 100644 (file)
@@ -49,6 +49,7 @@
 #include "messages/MOSDPGPushReply.h"
 #include "messages/MOSDPGUpdateLogMissing.h"
 #include "messages/MOSDPGUpdateLogMissingReply.h"
+#include "messages/MCommandReply.h"
 
 #include "Watch.h"
 
@@ -685,8 +686,13 @@ int ReplicatedPG::get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilt
 
 // ==========================================================
 
-int ReplicatedPG::do_command(cmdmap_t cmdmap, ostream& ss,
-                            bufferlist& idata, bufferlist& odata)
+int ReplicatedPG::do_command(
+  cmdmap_t cmdmap,
+  ostream& ss,
+  bufferlist& idata,
+  bufferlist& odata,
+  ConnectionRef con,
+  ceph_tid_t tid)
 {
   const pg_missing_t &missing = pg_log.get_missing();
   string prefix;
@@ -791,10 +797,8 @@ int ReplicatedPG::do_command(cmdmap_t cmdmap, ostream& ss,
       return -EINVAL;
     }
 
-    ss << "pg has " << unfound
-       << " objects unfound and apparently lost, marking";
-    mark_all_unfound_lost(mode);
-    return 0;
+    mark_all_unfound_lost(mode, con, tid);
+    return -EAGAIN;
   }
   else if (command == "list_missing") {
     hobject_t offset;
@@ -8461,6 +8465,26 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(
 
   return repop;
 }
+
+ReplicatedPG::RepGather *ReplicatedPG::new_repop(
+  ObcLockManager &&manager,
+  boost::optional<std::function<void(void)> > &&on_complete)
+{
+  RepGather *repop = new RepGather(
+    std::move(manager),
+    std::move(on_complete),
+    osd->get_tid(),
+    info.last_complete);
+
+  repop->start = ceph_clock_now(cct);
+
+  repop_queue.push_back(&repop->queue_item);
+  repop->get();
+
+  osd->logger->inc(l_osd_op_wip);
+
+  return repop;
+}
  
 void ReplicatedPG::remove_repop(RepGather *repop)
 {
@@ -8500,6 +8524,123 @@ void ReplicatedPG::simple_opc_submit(OpContextUPtr ctx)
   repop->put();
 }
 
+
+void ReplicatedPG::submit_log_entries(
+  const list<pg_log_entry_t> &entries,
+  ObcLockManager &&manager,
+  boost::optional<std::function<void(void)> > &&on_complete)
+{
+  dout(10) << __func__ << entries << dendl;
+  assert(is_primary());
+
+  ObjectStore::Transaction t;
+
+  eversion_t old_last_update = info.last_update;
+  merge_new_log_entries(entries, t);
+
+  boost::intrusive_ptr<RepGather> repop;
+  set<pg_shard_t> waiting_on;
+  if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+    repop = new_repop(
+      std::move(manager),
+      std::move(on_complete));
+  }
+  for (set<pg_shard_t>::const_iterator i = actingbackfill.begin();
+       i != actingbackfill.end();
+       ++i) {
+    pg_shard_t peer(*i);
+    if (peer == pg_whoami) continue;
+    assert(peer_missing.count(peer));
+    assert(peer_info.count(peer));
+    if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+      assert(repop);
+      MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing(
+       entries,
+       spg_t(info.pgid.pgid, i->shard),
+       pg_whoami.shard,
+       get_osdmap()->get_epoch(),
+       repop->rep_tid);
+      osd->send_message_osd_cluster(
+       peer.osd, m, get_osdmap()->get_epoch());
+      waiting_on.insert(peer);
+    } else {
+      MOSDPGLog *m = new MOSDPGLog(
+       peer.shard, pg_whoami.shard,
+       info.last_update.epoch,
+       info);
+      m->log.log = entries;
+      m->log.tail = old_last_update;
+      m->log.head = info.last_update;
+      osd->send_message_osd_cluster(
+       peer.osd, m, get_osdmap()->get_epoch());
+    }
+  }
+  if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+    ceph_tid_t rep_tid = repop->rep_tid;
+    waiting_on.insert(pg_whoami);
+    log_entry_update_waiting_on.insert(
+      make_pair(
+       rep_tid,
+       LogUpdateCtx{std::move(repop), std::move(waiting_on)}
+       ));
+    struct OnComplete : public Context {
+      ReplicatedPGRef pg;
+      ceph_tid_t rep_tid;
+      epoch_t epoch;
+      OnComplete(
+       ReplicatedPGRef pg,
+       ceph_tid_t rep_tid,
+       epoch_t epoch)
+       : pg(pg), rep_tid(rep_tid), epoch(epoch) {}
+      void finish(int) override {
+       pg->lock();
+       if (!pg->pg_has_reset_since(epoch)) {
+         auto it = pg->log_entry_update_waiting_on.find(rep_tid);
+         assert(it != pg->log_entry_update_waiting_on.end());
+         auto it2 = it->second.waiting_on.find(pg->pg_whoami);
+         assert(it2 != it->second.waiting_on.end());
+         it->second.waiting_on.erase(it2);
+         if (it->second.waiting_on.empty()) {
+           pg->repop_all_applied(it->second.repop.get());
+           pg->repop_all_committed(it->second.repop.get());
+           pg->log_entry_update_waiting_on.erase(it);
+         }
+       }
+       pg->unlock();
+      }
+    };
+    t.register_on_complete(
+      new OnComplete{this, rep_tid, get_osdmap()->get_epoch()});
+  } else {
+    if (on_complete) {
+      struct OnComplete : public Context {
+       ReplicatedPGRef pg;
+       std::function<void(void)> on_complete;
+       epoch_t epoch;
+       OnComplete(
+         ReplicatedPGRef pg,
+         std::function<void(void)> &&on_complete,
+         epoch_t epoch)
+         : pg(pg),
+           on_complete(std::move(on_complete)),
+           epoch(epoch) {}
+       void finish(int) override {
+         pg->lock();
+         if (!pg->pg_has_reset_since(epoch))
+           on_complete();
+         pg->unlock();
+       }
+      };
+      t.register_on_complete(
+       new OnComplete{
+         this, std::move(*on_complete), get_osdmap()->get_epoch()
+         });
+    }
+  }
+  int r = osd->store->queue_transaction(osr.get(), std::move(t), NULL);
+  assert(r == 0);
+}
+
 // -------------------------------------------------------
 
 void ReplicatedPG::get_watchers(list<obj_watch_item_t> &pg_watchers)
@@ -8658,11 +8799,12 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
     ctx->log.back().mod_desc.mark_unrollbackable();
   }
 
-  // no ctx->delta_stats
-  simple_opc_submit(std::move(ctx));
 
   // apply new object state.
   ctx->obc->obs = ctx->new_obs;
+
+  // no ctx->delta_stats
+  simple_opc_submit(std::move(ctx));
 }
 
 ObjectContextRef ReplicatedPG::create_object_context(const object_info_t& oi,
@@ -9465,26 +9607,81 @@ ObjectContextRef ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
   return obc;
 }
 
-struct C_PG_MarkUnfoundLost : public Context {
-  ReplicatedPGRef pg;
-  list<ObjectContextRef> obcs;
-  explicit C_PG_MarkUnfoundLost(ReplicatedPG *p) : pg(p) {}
-  void finish(int r) {
-    pg->_finish_mark_all_unfound_lost(obcs);
-  }
-};
-
 void ReplicatedPG::do_update_log_missing(OpRequestRef &op)
 {
+  MOSDPGUpdateLogMissing *m = static_cast<MOSDPGUpdateLogMissing*>(
+    op->get_req());
+  assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
+  ObjectStore::Transaction t;
+  append_log_entries_update_missing(m->entries, t);
+  // TODO FIX
+
+  Context *c = new FunctionContext(
+      [=](int) {
+       MOSDPGUpdateLogMissing *msg =
+         static_cast<MOSDPGUpdateLogMissing*>(
+           op->get_req());
+       MOSDPGUpdateLogMissingReply *reply =
+         new MOSDPGUpdateLogMissingReply(
+           spg_t(info.pgid.pgid, primary_shard().shard),
+           pg_whoami.shard,
+           msg->get_epoch(),
+           msg->get_tid());
+       reply->set_priority(CEPH_MSG_PRIO_HIGH);
+       msg->get_connection()->send_message(reply);
+      });
+
+  /* Hack to work around the fact that ReplicatedBackend sends
+   * ack+commit if commit happens first */
+  if (pool.info.ec_pool()) {
+    t.register_on_complete(c);
+  } else {
+    t.register_on_commit(c);
+  }
+  int tr = osd->store->queue_transaction(
+    osr.get(),
+    std::move(t),
+    nullptr);
+  assert(tr == 0);
 }
 
 void ReplicatedPG::do_update_log_missing_reply(OpRequestRef &op)
 {
+  MOSDPGUpdateLogMissingReply *m =
+    static_cast<MOSDPGUpdateLogMissingReply*>(
+    op->get_req());
+  dout(20) << __func__ << " got reply from "
+          << m->get_from() << dendl;
+
+  auto it = log_entry_update_waiting_on.find(m->get_tid());
+  if (it != log_entry_update_waiting_on.end()) {
+    if (it->second.waiting_on.count(m->get_from())) {
+      it->second.waiting_on.erase(m->get_from());
+    } else {
+      osd->clog->error()
+       << info.pgid << " got reply "
+       << *m << " from shard we are not waiting for "
+       << m->get_from();
+    }
+
+    if (it->second.waiting_on.empty()) {
+      repop_all_applied(it->second.repop.get());
+      repop_all_committed(it->second.repop.get());
+      log_entry_update_waiting_on.erase(it);
+    }
+  } else {
+    osd->clog->error()
+      << info.pgid << " got reply "
+      << *m << " on unknown tid " << m->get_tid();
+  }
 }
 
 /* Mark all unfound objects as lost.
  */
-void ReplicatedPG::mark_all_unfound_lost(int what)
+void ReplicatedPG::mark_all_unfound_lost(
+  int what,
+  ConnectionRef con,
+  ceph_tid_t tid)
 {
   dout(3) << __func__ << " " << pg_log_entry_t::get_op_name(what) << dendl;
 
@@ -9492,16 +9689,18 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
   pg_log.get_log().print(*_dout);
   *_dout << dendl;
 
-  ObjectStore::Transaction t;
-  C_PG_MarkUnfoundLost *c = new C_PG_MarkUnfoundLost(this);
+  list<pg_log_entry_t> log_entries;
 
   utime_t mtime = ceph_clock_now(cct);
-  info.last_update.epoch = get_osdmap()->get_epoch();
-  const pg_missing_t &missing = pg_log.get_missing();
   map<hobject_t, pg_missing_t::item, hobject_t::ComparatorWithDefault>::const_iterator m =
     missing_loc.get_needs_recovery().begin();
   map<hobject_t, pg_missing_t::item, hobject_t::ComparatorWithDefault>::const_iterator mend =
     missing_loc.get_needs_recovery().end();
+
+  ObcLockManager manager;
+  eversion_t v = info.last_update;
+  v.epoch = get_osdmap()->get_epoch();
+  unsigned num_unfound = missing_loc.num_unfound();
   while (m != mend) {
     const hobject_t &oid(m->first);
     if (!missing_loc.is_unfound(oid)) {
@@ -9515,47 +9714,43 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
 
     switch (what) {
     case pg_log_entry_t::LOST_MARK:
-      obc = mark_object_lost(&t, oid, m->second.need, mtime, pg_log_entry_t::LOST_MARK);
-      pg_log.missing_got(m++);
       assert(0 == "actually, not implemented yet!");
-      // we need to be careful about how this is handled on the replica!
       break;
 
     case pg_log_entry_t::LOST_REVERT:
       prev = pick_newest_available(oid);
       if (prev > eversion_t()) {
        // log it
-       ++info.last_update.version;
+       ++v.version;
        pg_log_entry_t e(
-         pg_log_entry_t::LOST_REVERT, oid, info.last_update,
+         pg_log_entry_t::LOST_REVERT, oid, v,
          m->second.need, 0, osd_reqid_t(), mtime);
        e.reverting_to = prev;
-       pg_log.add(e);
+       e.mod_desc.mark_unrollbackable();
+       log_entries.push_back(e);
        dout(10) << e << dendl;
 
        // we are now missing the new version; recovery code will sort it out.
        ++m;
-       pg_log.revise_need(oid, info.last_update);
-       missing_loc.revise_need(oid, info.last_update);
        break;
       }
-      /** fall-thru **/
 
     case pg_log_entry_t::LOST_DELETE:
       {
-       // log it
-       ++info.last_update.version;
-       pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, info.last_update, m->second.need,
+       ++v.version;
+       pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, v, m->second.need,
                     0, osd_reqid_t(), mtime);
-       pg_log.add(e);
+       if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+         if (pool.info.require_rollback()) {
+           e.mod_desc.try_rmobject(v.version);
+         } else {
+           e.mod_desc.mark_unrollbackable();
+         }
+       } // otherwise, just do what we used to do
        dout(10) << e << dendl;
+       log_entries.push_back(e);
 
-       t.remove(
-         coll,
-         ghobject_t(oid, ghobject_t::NO_GEN, pg_whoami.shard));
-       pg_log.missing_add_event(e);
        ++m;
-       missing_loc.recovered(oid);
       }
       break;
 
@@ -9563,47 +9758,51 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
       assert(0);
     }
 
-    if (obc)
-      c->obcs.push_back(obc);
+    if (obc) {
+      bool got = manager.get_lock_type(
+       ObjectContext::RWState::RWEXCL,
+       oid,
+       obc,
+       OpRequestRef());
+      if (!got) {
+       assert(0 == "Couldn't lock unfound object?");
+      }
+    }
   }
 
-  dout(30) << __func__ << ": log after:\n";
-  pg_log.get_log().print(*_dout);
-  *_dout << dendl;
-
   info.stats.stats_invalid = true;
 
-  if (missing.num_missing() == 0) {
-    // advance last_complete since nothing else is missing!
-    info.last_complete = info.last_update;
-  }
-
-  dirty_info = true;
-  write_if_dirty(t);
-
-  
-  osd->store->queue_transaction(osr.get(), std::move(t), c, NULL, 
-                            new C_OSD_OndiskWriteUnlockList(&c->obcs));
-             
-  // Send out the PG log to all replicas
-  // So that they know what is lost
-  share_pg_log();
-
-  // queue ourselves so that we push the (now-lost) object_infos to replicas.
-  osd->queue_for_recovery(this);
-}
-
-void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContextRef>& obcs)
-{
-  lock();
-  dout(10) << "_finish_mark_all_unfound_lost " << dendl;
-
-  if (!deleting)
-    requeue_ops(waiting_for_all_missing);
-  waiting_for_all_missing.clear();
-
-  obcs.clear();
-  unlock();
+  struct OnComplete {
+    ReplicatedPG *pg;
+    std::function<void(void)> on_complete;
+    void operator()() {
+      pg->requeue_ops(pg->waiting_for_all_missing);
+      pg->waiting_for_all_missing.clear();
+      pg->osd->queue_for_recovery(pg);
+    }
+  };
+  submit_log_entries(
+    log_entries,
+    std::move(manager),
+    boost::optional<std::function<void(void)> >(
+      [=]() {
+       requeue_ops(waiting_for_all_missing);
+       waiting_for_all_missing.clear();
+       osd->queue_for_recovery(this);
+
+       stringstream ss;
+       ss << "pg has " << num_unfound
+          << " objects unfound and apparently lost marking";
+       string rs = ss.str();
+       dout(0) << "do_command r=" << 0 << " " << rs << dendl;
+       osd->clog->info() << rs << "\n";
+       if (con) {
+         MCommandReply *reply = new MCommandReply(0, rs);
+         reply->set_tid(tid);
+         con->send_message(reply);
+       }
+      }
+      ));
 }
 
 void ReplicatedPG::_split_into(pg_t child_pgid, PG *child, unsigned split_bits)
index 53cb12ecfc3931755fdc68cfbfea3f349001709e..aece2f100e16d5d0ad378518332e05037a91f13a 100644 (file)
@@ -737,6 +737,23 @@ public:
       on_success(std::move(c->on_success)),
       on_finish(std::move(c->on_finish)) {}
 
+    RepGather(
+      ObcLockManager &&manager,
+      boost::optional<std::function<void(void)> > &&on_complete,
+      ceph_tid_t rt,
+      eversion_t lc) :
+      queue_item(this),
+      nref(1),
+      rep_tid(rt),
+      rep_aborted(false), rep_done(false),
+      all_applied(false), all_committed(false),
+      pg_local_last_complete(lc),
+      lock_manager(std::move(manager)) {
+      if (on_complete) {
+       on_success.push_back(std::move(*on_complete));
+      }
+    }
+
     RepGather *get() {
       nref++;
       return this;
@@ -866,11 +883,29 @@ protected:
     OpContext *ctx,
     ObjectContextRef obc,
     ceph_tid_t rep_tid);
+  RepGather *new_repop(
+    ObcLockManager &&manager,
+    boost::optional<std::function<void(void)> > &&on_complete);
   void remove_repop(RepGather *repop);
 
   OpContextUPtr simple_opc_create(ObjectContextRef obc);
   void simple_opc_submit(OpContextUPtr ctx);
 
+  /**
+   * Merge entries atomically into all actingbackfill osds
+   * adjusting missing and recovery state as necessary
+   */
+  void submit_log_entries(
+    const list<pg_log_entry_t> &entries,
+    ObcLockManager &&manager,
+    boost::optional<std::function<void(void)> > &&on_complete);
+  struct LogUpdateCtx {
+    boost::intrusive_ptr<RepGather> repop;
+    set<pg_shard_t> waiting_on;
+  };
+  map<ceph_tid_t, LogUpdateCtx> log_entry_update_waiting_on;
+
+
   // hot/cold tracking
   HitSetRef hit_set;        ///< currently accumulating HitSet
   utime_t hit_set_start_stamp;    ///< time the current HitSet started recording
@@ -1426,8 +1461,13 @@ public:
               const PGPool &_pool, spg_t p);
   ~ReplicatedPG() {}
 
-  int do_command(cmdmap_t cmdmap, ostream& ss, bufferlist& idata,
-                bufferlist& odata);
+  int do_command(
+    cmdmap_t cmdmap,
+    ostream& ss,
+    bufferlist& idata,
+    bufferlist& odata,
+    ConnectionRef conn,
+    ceph_tid_t tid) override;
 
   void do_request(
     OpRequestRef& op,
@@ -1583,7 +1623,10 @@ public:
   void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
   void kick_object_context_blocked(ObjectContextRef obc);
 
-  void mark_all_unfound_lost(int what);
+  void mark_all_unfound_lost(
+    int what,
+    ConnectionRef con,
+    ceph_tid_t tid);
   eversion_t pick_newest_available(const hobject_t& oid);
   ObjectContextRef mark_object_lost(ObjectStore::Transaction *t,
                                  const hobject_t& oid, eversion_t version,
@@ -1594,7 +1637,6 @@ public:
 
   void do_update_log_missing_reply(
     OpRequestRef &op);
-  void _finish_mark_all_unfound_lost(list<ObjectContextRef>& obcs);
 
   void on_role_change();
   void on_pool_change();
index 6f26284b69c5f2b89ea1ff5e753ec984450ef24f..eaf7e3ae9b96868a309eecb196b5b944ac1c1e0e 100644 (file)
@@ -3272,6 +3272,12 @@ void ObjectModDesc::visit(Visitor *visitor) const
        visitor->update_snaps(snaps);
        break;
       }
+      case TRY_DELETE: {
+       version_t old_version;
+       ::decode(old_version, bp);
+       visitor->try_rmobject(old_version);
+       break;
+      }
       default:
        assert(0 == "Invalid rollback code");
       }
index 39500d6b473b4c59d3bfe769bad66ac9c20dbe2a..49b4e2bd8ec38d5b556d8cd139929b2fd0c98129 100644 (file)
@@ -2360,6 +2360,15 @@ public:
     virtual void append(uint64_t old_offset) {}
     virtual void setattrs(map<string, boost::optional<bufferlist> > &attrs) {}
     virtual void rmobject(version_t old_version) {}
+    /**
+     * Used to support the unfound_lost_delete log event: if the stashed
+     * version exists, we unstash it, otherwise, we do nothing.  This way
+     * each replica rolls back to whatever state it had prior to the attempt
+     * at mark unfound lost delete
+     */
+    virtual void try_rmobject(version_t old_version) {
+      rmobject(old_version);
+    }
     virtual void create() {}
     virtual void update_snaps(set<snapid_t> &old_snaps) {}
     virtual ~Visitor() {}
@@ -2371,7 +2380,8 @@ public:
     SETATTRS = 2,
     DELETE = 3,
     CREATE = 4,
-    UPDATE_SNAPS = 5
+    UPDATE_SNAPS = 5,
+    TRY_DELETE = 6
   };
   ObjectModDesc() : can_local_rollback(true), rollback_info_completed(false) {}
   void claim(ObjectModDesc &other) {
@@ -2431,6 +2441,16 @@ public:
     rollback_info_completed = true;
     return true;
   }
+  bool try_rmobject(version_t deletion_version) {
+    if (!can_local_rollback || rollback_info_completed)
+      return false;
+    ENCODE_START(1, 1, bl);
+    append_id(TRY_DELETE);
+    ::encode(deletion_version, bl);
+    ENCODE_FINISH(bl);
+    rollback_info_completed = true;
+    return true;
+  }
   void create() {
     if (!can_local_rollback || rollback_info_completed)
       return;