]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
PG,OSD: prevent pg from completing peering until deletion is complete
authorSamuel Just <sam.just@inktank.com>
Mon, 18 Jun 2012 19:52:06 +0000 (12:52 -0700)
committerSamuel Just <sam.just@inktank.com>
Thu, 5 Jul 2012 17:15:00 +0000 (10:15 -0700)
hobject_t must now be globally unique in the filestore.  Thus, if we
start creating objects in a pg before the removal collections for the
previous incarnation are fully removed, we might end up a second
instance of the same hobject violating the filestore rules.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/common/sharedptr_registry.hpp
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/osd_types.cc
src/osd/osd_types.h

index 729b74dd9f4535e54dc221706c0b21dccb816820..bd0c656024d2f27afc1d3bec44f95d2443f54e64 100644 (file)
@@ -52,8 +52,41 @@ class SharedPtrRegistry {
 public:
   SharedPtrRegistry() : lock("SharedPtrRegistry::lock") {}
 
+  VPtr lookup(const K &key) {
+    Mutex::Locker l(lock);
+    while (1) {
+      if (contents.count(key)) {
+       VPtr retval = contents[key].lock();
+       if (retval)
+         return retval;
+      } else {
+       break;
+      }
+      cond.Wait(lock);
+    Mutex::Locker l(lock);
+    }
+    return VPtr();
+  }
+
+  VPtr lookup_or_create(const K &key) {
+    Mutex::Locker l(lock);
+    while (1) {
+      if (contents.count(key)) {
+       VPtr retval = contents[key].lock();
+       if (retval)
+         return retval;
+      } else {
+       break;
+      }
+      cond.Wait(lock);
+    }
+    VPtr retval(new V(), OnRemoval(this, key));
+    contents[key] = retval;
+    return retval;
+  }
+
   template<class A>
-  VPtr lookup(const K &key, const A &arg) {
+  VPtr lookup_or_create(const K &key, const A &arg) {
     Mutex::Locker l(lock);
     while (1) {
       if (contents.count(key)) {
index fccc03ed8e6e5cd2ce3b85037894f2a6cc0f22d0..3bb2b0ab87939872538fa95fb780e2231305cd42 100644 (file)
@@ -1334,11 +1334,15 @@ void OSD::load_pgs()
        continue;
       }
       uint64_t seq;
-      if (it->is_removal(&seq)) {
+      if (it->is_removal(&seq, &pgid)) {
        if (seq >= next_removal_seq)
          next_removal_seq = seq + 1;
-       pair<coll_t, SequencerRef> *to_queue = new pair<coll_t, SequencerRef>;
-       to_queue->first = *it;
+       boost::tuple<coll_t, SequencerRef, DeletingStateRef> *to_queue =
+         new boost::tuple<coll_t, SequencerRef, DeletingStateRef>;
+       to_queue->get<0>() = *it;
+       to_queue->get<1>() = service.osr_registry.lookup_or_create(
+         pgid, stringify(pgid));
+       to_queue->get<2>() = service.deleting_pgs.lookup_or_create(pgid);
        remove_wq.queue(to_queue);
        continue;
       }
@@ -1952,11 +1956,12 @@ void OSD::dump_ops_in_flight(ostream& ss)
 }
 
 // =========================================
-void OSD::RemoveWQ::_process(pair<coll_t, SequencerRef> *item)
+void OSD::RemoveWQ::_process(boost::tuple<coll_t, SequencerRef, DeletingStateRef> *item)
 {
-  coll_t &coll = item->first;
-  ObjectStore::Sequencer *osr = item->second.get();
-  store->flush();
+  coll_t &coll = item->get<0>();
+  ObjectStore::Sequencer *osr = item->get<1>().get();
+  if (osr)
+    osr->flush();
   vector<hobject_t> olist;
   store->collection_list(coll, olist);
   //*_dout << "OSD::RemoveWQ::_process removing coll " << coll << std::endl;
@@ -1968,7 +1973,7 @@ void OSD::RemoveWQ::_process(pair<coll_t, SequencerRef> *item)
     if (num % 20 == 0) {
       store->queue_transaction(
        osr, t,
-       new ObjectStore::C_DeleteTransactionHolder<SequencerRef>(t, item->second));
+       new ObjectStore::C_DeleteTransactionHolder<SequencerRef>(t, item->get<1>()));
       t = new ObjectStore::Transaction;
     }
     t->remove(coll, *i);
@@ -1976,7 +1981,7 @@ void OSD::RemoveWQ::_process(pair<coll_t, SequencerRef> *item)
   t->remove_collection(coll);
   store->queue_transaction(
     osr, t,
-    new ObjectStore::C_DeleteTransactionHolder<SequencerRef>(t, item->second));
+    new ObjectStore::C_DeleteTransactionHolder<SequencerRef>(t, item->get<1>()));
   delete item;
 }
 // =========================================
@@ -4683,16 +4688,16 @@ void OSD::_remove_pg(PG *pg)
     for (snapid_t cur = p.get_start();
         cur < p.get_start() + p.get_len();
         ++cur) {
-      coll_t to_remove = get_next_removal_coll();
+      coll_t to_remove = get_next_removal_coll(pg->info.pgid);
       removals.push_back(to_remove);
       rmt->collection_rename(coll_t(pg->info.pgid, cur), to_remove);
     }
   }
-  coll_t to_remove = get_next_removal_coll();
+  coll_t to_remove = get_next_removal_coll(pg->info.pgid);
   removals.push_back(to_remove);
   rmt->collection_rename(coll_t(pg->info.pgid), to_remove);
   if (pg->have_temp_coll()) {
-    to_remove = get_next_removal_coll();
+    to_remove = get_next_removal_coll(pg->info.pgid);
     removals.push_back(to_remove);
     rmt->collection_rename(pg->get_temp_coll(), to_remove);
   }
@@ -4710,10 +4715,12 @@ void OSD::_remove_pg(PG *pg)
   // and handle_notify_timeout
   pg->on_removal();
 
+  DeletingStateRef deleting = service.deleting_pgs.lookup_or_create(pg->info.pgid);
   for (vector<coll_t>::iterator i = removals.begin();
        i != removals.end();
        ++i) {
-    remove_wq.queue(new pair<coll_t, SequencerRef>(*i, pg->osr));
+    remove_wq.queue(new boost::tuple<coll_t, SequencerRef, DeletingStateRef>(
+                     *i, pg->osr, deleting));
   }
 
   recovery_wq.dequeue(pg);
index 11793fa15ba7b88d55a30e6bbcc9aae887522926..e3d24624ecdeb360664a3948e3e0ab9e737ce45f 100644 (file)
@@ -15,6 +15,8 @@
 #ifndef CEPH_OSD_H
 #define CEPH_OSD_H
 
+#include "boost/tuple/tuple.hpp"
+
 #include "PG.h"
 
 #include "msg/Dispatcher.h"
@@ -131,11 +133,32 @@ extern const coll_t meta_coll;
 
 typedef std::tr1::shared_ptr<ObjectStore::Sequencer> SequencerRef;
 
+class DeletingState {
+  Mutex lock;
+  list<Context *> on_deletion_complete;
+public:
+  DeletingState() : lock("DeletingState::lock") {}
+  void register_on_delete(Context *completion) {
+    Mutex::Locker l(lock);
+    on_deletion_complete.push_front(completion);
+  }
+  ~DeletingState() {
+    Mutex::Locker l(lock);
+    for (list<Context *>::iterator i = on_deletion_complete.begin();
+        i != on_deletion_complete.end();
+        ++i) {
+      (*i)->complete(0);
+    }
+  }
+};
+typedef std::tr1::shared_ptr<DeletingState> DeletingStateRef;
+
 class OSD;
 class OSDService {
 public:
   OSD *osd;
   SharedPtrRegistry<pg_t, ObjectStore::Sequencer> osr_registry;
+  SharedPtrRegistry<pg_t, DeletingState> deleting_pgs;
   const int whoami;
   ObjectStore *&store;
   LogClient &clog;
@@ -1137,31 +1160,31 @@ protected:
   } rep_scrub_wq;
 
   // -- removing --
-  struct RemoveWQ : public ThreadPool::WorkQueue<pair<coll_t, SequencerRef> > {
+  struct RemoveWQ : public ThreadPool::WorkQueue<boost::tuple<coll_t, SequencerRef, DeletingStateRef> > {
     ObjectStore *&store;
-    list<pair<coll_t, SequencerRef> *> remove_queue;
+    list<boost::tuple<coll_t, SequencerRef, DeletingStateRef> *> remove_queue;
     RemoveWQ(ObjectStore *&o, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueue<pair<coll_t, SequencerRef> >("OSD::RemoveWQ", ti, 0, tp),
+      : ThreadPool::WorkQueue<boost::tuple<coll_t, SequencerRef, DeletingStateRef> >("OSD::RemoveWQ", ti, 0, tp),
        store(o) {}
 
     bool _empty() {
       return remove_queue.empty();
     }
-    bool _enqueue(pair<coll_t, SequencerRef> *item) {
+    bool _enqueue(boost::tuple<coll_t, SequencerRef, DeletingStateRef> *item) {
       remove_queue.push_back(item);
       return true;
     }
-    void _dequeue(pair<coll_t, SequencerRef> *item) {
+    void _dequeue(boost::tuple<coll_t, SequencerRef, DeletingStateRef> *item) {
       assert(0);
     }
-    pair<coll_t, SequencerRef> *_dequeue() {
+    boost::tuple<coll_t, SequencerRef, DeletingStateRef> *_dequeue() {
       if (remove_queue.empty())
        return NULL;
-      pair<coll_t, SequencerRef> *item = remove_queue.front();
+      boost::tuple<coll_t, SequencerRef, DeletingStateRef> *item = remove_queue.front();
       remove_queue.pop_front();
       return item;
     }
-    void _process(pair<coll_t, SequencerRef> *item);
+    void _process(boost::tuple<coll_t, SequencerRef, DeletingStateRef> *item);
     void _clear() {
       while (!remove_queue.empty()) {
        delete remove_queue.front();
@@ -1170,8 +1193,8 @@ protected:
     }
   } remove_wq;
   uint64_t next_removal_seq;
-  coll_t get_next_removal_coll() {
-    return coll_t::make_removal_coll(next_removal_seq++);
+  coll_t get_next_removal_coll(pg_t pgid) {
+    return coll_t::make_removal_coll(next_removal_seq++, pgid);
   }
 
  private:
index 9f651a2564477f66fe8bc04a643ac11157a0aafd..bb393cb6c0663eab4a6ff3e422380c727b82dafb 100644 (file)
@@ -58,7 +58,7 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   backfill_target(-1),
   pg_stats_lock("PG::pg_stats_lock"),
   pg_stats_valid(false),
-  osr(osd->osr_registry.lookup(p, (stringify(p)))),
+  osr(osd->osr_registry.lookup_or_create(p, (stringify(p)))),
   finish_sync_event(NULL),
   finalizing_scrub(false),
   scrub_block_writes(false),
@@ -3710,6 +3710,9 @@ void PG::start_flush(ObjectStore::Transaction *t,
   flushed = false;
   on_applied->push_back(new ContainerContext<FlushStateRef>(flush_trigger));
   on_safe->push_back(new ContainerContext<FlushStateRef>(flush_trigger));
+  DeletingStateRef del = osd->deleting_pgs.lookup(info.pgid);
+  if (del)
+    del->register_on_delete(new ContainerContext<FlushStateRef>(flush_trigger));
 }
 
 /* Called before initializing peering during advance_map */
index bc40d1b432916d0705df7934a173626e8e5ddbac..846e40fa92ee5c731b8254340f7b62b2f4ddb723 100644 (file)
@@ -280,13 +280,22 @@ bool coll_t::is_pg(pg_t& pgid, snapid_t& snap) const
   return true;
 }
 
-bool coll_t::is_removal(uint64_t *seq) const
+bool coll_t::is_removal(uint64_t *seq, pg_t *pgid) const
 {
   if (str.substr(0, 12) != string("FORREMOVAL_"))
     return false;
 
   stringstream ss(str.substr(12));
   ss >> *seq;
+  char sep;
+  ss >> sep;
+  assert(sep == '_');
+  string pgid_str;
+  ss >> pgid_str;
+  if (!pgid->parse(pgid_str.c_str())) {
+    assert(0);
+    return false;
+  }
   return true;
 }
 
index 35bc0a3bcb1f91a27854cb6906384c1853fb0307..b144d3909944df32d5e1324410d504b8601a071e 100644 (file)
@@ -323,8 +323,8 @@ public:
     return coll_t(pg_to_tmp_str(pgid));
   }
 
-  static coll_t make_removal_coll(uint64_t seq) {
-    return coll_t(seq_to_removal_str(seq));
+  static coll_t make_removal_coll(uint64_t seq, pg_t pgid) {
+    return coll_t(seq_to_removal_str(seq, pgid));
   }
 
   const std::string& to_str() const {
@@ -341,7 +341,7 @@ public:
 
   bool is_pg(pg_t& pgid, snapid_t& snap) const;
   bool is_temp(pg_t& pgid) const;
-  bool is_removal(uint64_t *seq) const;
+  bool is_removal(uint64_t *seq, pg_t *pgid) const;
   void encode(bufferlist& bl) const;
   void decode(bufferlist::iterator& bl);
   inline bool operator==(const coll_t& rhs) const {
@@ -365,9 +365,9 @@ private:
     oss << p << "_TEMP";
     return oss.str();
   }
-  static std::string seq_to_removal_str(uint64_t seq) {
+  static std::string seq_to_removal_str(uint64_t seq, pg_t pgid) {
     std::ostringstream oss;
-    oss << "FORREMOVAL_" << seq;
+    oss << "FORREMOVAL_" << seq << "_" << pgid;
     return oss.str();
   }