]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD,PG: clean up pg removal
author Samuel Just <sam.just@inktank.com>
Fri, 29 Jun 2012 21:11:07 +0000 (14:11 -0700)
committer Samuel Just <sam.just@inktank.com>
Thu, 5 Jul 2012 17:15:00 +0000 (10:15 -0700)
PG opsequencers will be used for removing a pg.  If the pg is recreated
before the removal is complete, we need the new pg incarnation to be
able to inherit the osr of its predecessor.

Previously, we queued the pg for removal and only rendered it unusable
after the contents were fully removed.  Now, we synchronously remove it
from the map and queue a transaction renaming the collections.  We then
asynchronously clean up those collections.  If the pg is recreated, it
will inherit the same osr until the cleanup is complete, ensuring correct
op ordering with respect to the collection rename.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/Makefile.am
src/common/sharedptr_registry.hpp [new file with mode: 0644]
src/os/ObjectStore.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.cc
src/osd/osd_types.h

index e3e9cf9ad165101f33dfe473e34220ba5daac30c..8702f3cc6d42f66a328fc24fde9096a9a52c8f68 100644 (file)
@@ -1265,6 +1265,7 @@ noinst_HEADERS = \
        common/admin_socket_client.h \
        common/shared_cache.hpp \
        common/simple_cache.hpp \
+       common/sharedptr_registry.hpp \
         common/MemoryModel.h\
         common/Mutex.h\
        common/PrebufferedStreambuf.h\
diff --git a/src/common/sharedptr_registry.hpp b/src/common/sharedptr_registry.hpp
new file mode 100644 (file)
index 0000000..729b74d
--- /dev/null
@@ -0,0 +1,74 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_SHAREDPTR_REGISTRY_H
+#define CEPH_SHAREDPTR_REGISTRY_H
+
+#include <map>
+#include <memory>
+#include "common/Mutex.h"
+#include "common/Cond.h"
+
+/**
+ * Registry of shared_ptr<V> keyed by K.  Only weak_ptrs are stored, so an
+ * entry exists just while callers hold references; lookup() creates on miss.
+ */
+template <class K, class V>
+class SharedPtrRegistry {
+  Mutex lock;  // taken around every access to contents in this class
+  Cond cond;   // signalled by OnRemoval after it erases an entry
+  typedef std::tr1::shared_ptr<V> VPtr;
+  typedef std::tr1::weak_ptr<V> WeakVPtr;
+  map<K, WeakVPtr> contents;  // key -> weak reference to the registered object
+
+  class OnRemoval {  // shared_ptr deleter installed by lookup()
+    SharedPtrRegistry<K,V> *parent;
+    K key;
+  public:
+    OnRemoval(SharedPtrRegistry<K,V> *parent, K key) :
+      parent(parent), key(key) {}
+    void operator()(V *to_remove) {
+      {
+       Mutex::Locker l(parent->lock);
+       parent->contents.erase(key);  // drop the now-expired entry
+       parent->cond.Signal();  // let a lookup() blocked in cond.Wait re-check
+      }
+      delete to_remove;  // destroy the object only after the lock scope has closed
+    }
+  };
+  friend class OnRemoval;
+
+public:
+  SharedPtrRegistry() : lock("SharedPtrRegistry::lock") {}
+
+  template<class A>
+  VPtr lookup(const K &key, const A &arg) {  // returns the live V for key, else a new V(arg)
+    Mutex::Locker l(lock);
+    while (1) {
+      if (contents.count(key)) {
+       VPtr retval = contents[key].lock();
+       if (retval)
+         return retval;  // object still alive: share it
+      } else {
+       break;  // no entry: fall through and create one
+      }
+      cond.Wait(lock);  // entry present but expired: wait for OnRemoval to erase it
+    }
+    VPtr retval(new V(arg), OnRemoval(this, key));
+    contents[key] = retval;  // stored as a weak_ptr, so the registry itself holds no reference
+    return retval;
+  }
+};
+
+#endif
index bcbffe4883d14b2a467202f5f741d3946b3c719e..ecc24a5a45bfd41f553d45d61d24245877ffc5f0 100644 (file)
@@ -599,7 +599,16 @@ public:
       delete t;
     }
   };
-
+  template<class T>
+  struct C_DeleteTransactionHolder : public Context {  // deletes a transaction in finish() and carries a T alongside it
+    ObjectStore::Transaction *t;  // deleted in finish()
+    T obj;  // copy of the constructor argument; stored but never read in this struct
+    C_DeleteTransactionHolder(ObjectStore::Transaction *tt, T &obj) :
+      t(tt), obj(obj) {}
+    void finish(int r) {  // r is ignored
+      delete t;
+    }
+  };
 
   virtual unsigned apply_transaction(Transaction& t, Context *ondisk=0) = 0;
   virtual unsigned apply_transactions(list<Transaction*>& tls, Context *ondisk=0) = 0;
index a65b87061ed5ac4c7ec52f807afe1655f3a45696..fccc03ed8e6e5cd2ce3b85037894f2a6cc0f22d0 100644 (file)
@@ -152,7 +152,6 @@ OSDService::OSDService(OSD *osd) :
   snap_trim_wq(osd->snap_trim_wq),
   scrub_wq(osd->scrub_wq),
   scrub_finalize_wq(osd->scrub_finalize_wq),
-  remove_wq(osd->remove_wq),
   rep_scrub_wq(osd->rep_scrub_wq),
   class_handler(osd->class_handler),
   publish_lock("OSDService::publish_lock"),
@@ -710,7 +709,8 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
   scrub_finalize_wq(this, g_conf->osd_scrub_finalize_thread_timeout, &op_tp),
   rep_scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
-  remove_wq(this, g_conf->osd_remove_thread_timeout, &disk_tp),
+  remove_wq(store, g_conf->osd_remove_thread_timeout, &disk_tp),
+  next_removal_seq(0),
   watch_lock("OSD::watch_lock"),
   watch_timer(external_messenger->cct, watch_lock),
   service(this)
@@ -1149,9 +1149,6 @@ void OSD::clear_temp(ObjectStore *store, coll_t tmp)
   vector<hobject_t> objects;
   store->collection_list(tmp, objects);
 
-  if (objects.empty())
-    return;
-
   // delete them.
   ObjectStore::Transaction t;
   unsigned removed = 0;
@@ -1332,6 +1329,19 @@ void OSD::load_pgs()
       if (it->is_temp(pgid))
        clear_temp(store, *it);
       dout(10) << "load_pgs skipping non-pg " << *it << dendl;
+      if (it->is_temp(pgid)) {
+       clear_temp(store, *it);
+       continue;
+      }
+      uint64_t seq;
+      if (it->is_removal(&seq)) {
+       if (seq >= next_removal_seq)
+         next_removal_seq = seq + 1;
+       pair<coll_t, SequencerRef> *to_queue = new pair<coll_t, SequencerRef>;
+       to_queue->first = *it;
+       remove_wq.queue(to_queue);
+       continue;
+      }
       continue;
     }
     if (snap != CEPH_NOSNAP) {
@@ -1941,6 +1951,34 @@ void OSD::dump_ops_in_flight(ostream& ss)
   op_tracker.dump_ops_in_flight(ss);
 }
 
+// =========================================
+void OSD::RemoveWQ::_process(pair<coll_t, SequencerRef> *item)  // removes every object in item->first, then the collection
+{
+  coll_t &coll = item->first;
+  ObjectStore::Sequencer *osr = item->second.get();  // null when item->second is an empty SequencerRef
+  store->flush();
+  vector<hobject_t> olist;
+  store->collection_list(coll, olist);
+  //*_dout << "OSD::RemoveWQ::_process removing coll " << coll << std::endl;
+  uint64_t num = 1;  // 1-based count of objects visited, used only for batching
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;
+  for (vector<hobject_t>::iterator i = olist.begin();
+       i != olist.end();
+       ++i, ++num) {
+    if (num % 20 == 0) {  // every 20th object: queue the removals batched so far
+      store->queue_transaction(
+       osr, t,  // the callback below deletes t and holds a copy of the SequencerRef
+       new ObjectStore::C_DeleteTransactionHolder<SequencerRef>(t, item->second));
+      t = new ObjectStore::Transaction;  // start a fresh batch
+    }
+    t->remove(coll, *i);
+  }
+  t->remove_collection(coll);  // appended after the last batch of object removals
+  store->queue_transaction(
+    osr, t,
+    new ObjectStore::C_DeleteTransactionHolder<SequencerRef>(t, item->second));
+  delete item;  // the queued pair is heap-allocated by the caller and freed here
+}
 // =========================================
 
 void OSD::do_mon_report()
@@ -3623,8 +3661,10 @@ void OSD::activate_map()
 
     if (!osdmap->have_pg_pool(pg->info.pgid.pool())) {
       //pool is deleted!
-      queue_pg_for_deletion(pg);
-      //pg->unlock();
+      pg->get();
+      _remove_pg(pg);
+      pg->unlock();
+      pg->put();
       continue;
     } else {
       pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
@@ -4204,7 +4244,7 @@ void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg)
   if (!ctx.transaction->empty()) {
     ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
     int tr = store->queue_transaction(
-      &pg->osr,
+      pg->osr.get(),
       ctx.transaction, ctx.on_applied, ctx.on_safe);
     assert(tr == 0);
     ctx.transaction = new ObjectStore::Transaction;
@@ -4228,7 +4268,7 @@ void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg)
   } else {
     ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
     int tr = store->queue_transaction(
-      &pg->osr,
+      pg->osr.get(),
       ctx.transaction, ctx.on_applied, ctx.on_safe);
     assert(tr == 0);
   }
@@ -4440,7 +4480,8 @@ void OSD::handle_pg_trim(OpRequestRef op)
       ObjectStore::Transaction *t = new ObjectStore::Transaction;
       pg->trim(*t, m->trim_to);
       pg->write_info(*t);
-      int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t));
+      int tr = store->queue_transaction(pg->osr.get(), t,
+                                       new ObjectStore::C_DeleteTransaction(t));
       assert(tr == 0);
     }
     pg->unlock();
@@ -4543,14 +4584,10 @@ void OSD::handle_pg_query(OpRequestRef op)
 
     if (pg_map.count(pgid)) {
       pg = _lookup_lock_pg(pgid);
-      if (!pg->deleting) {
-       pg->queue_query(it->second.epoch_sent, it->second.epoch_sent,
-                       from, it->second);
-       pg->unlock();
-       continue;
-      } else {
-       pg->unlock();
-      }
+      pg->queue_query(it->second.epoch_sent, it->second.epoch_sent,
+                     from, it->second);
+      pg->unlock();
+      continue;
     }
 
     // get active crush mapping
@@ -4622,151 +4659,50 @@ void OSD::handle_pg_remove(OpRequestRef op)
     dout(5) << "queue_pg_for_deletion: " << pgid << dendl;
     PG *pg = _lookup_lock_pg(pgid);
     if (pg->info.history.same_interval_since <= m->get_epoch()) {
-      if (pg->deleting) {
-       dout(10) << *pg << " already removing." << dendl;
-      } else {
-       assert(pg->get_primary() == m->get_source().num());
-       queue_pg_for_deletion(pg);
-      }
+      assert(pg->get_primary() == m->get_source().num());
+      pg->get();
+      _remove_pg(pg);
+      pg->unlock();
+      pg->put();
     } else {
       dout(10) << *pg << " ignoring remove request, pg changed in epoch "
               << pg->info.history.same_interval_since
               << " > " << m->get_epoch() << dendl;
+      pg->unlock();
     }
-    pg->unlock();
-  }
-}
-
-
-void OSD::queue_pg_for_deletion(PG *pg)
-{
-  dout(10) << *pg << " removing." << dendl;
-  pg->assert_locked();
-  assert(pg->get_role() == -1);
-  if (!pg->deleting) {
-    pg->deleting = true;
-    remove_wq.queue(pg);
   }
 }
 
 void OSD::_remove_pg(PG *pg)
 {
-  pg_t pgid = pg->info.pgid;
-  dout(10) << "_remove_pg " << pgid << dendl;
-  
-  pg->lock();
-  if (!pg->deleting) {
-    pg->unlock();
-    return;
-  }
-  
-  // reset log, last_complete, in case deletion gets canceled
-  pg->info.last_complete = eversion_t();
-  pg->info.last_update = eversion_t();
-  pg->info.log_tail = eversion_t();
-  pg->log.zero();
-  pg->ondisklog.zero();
-
-  {
-    ObjectStore::Transaction *t = new ObjectStore::Transaction;
-    pg->write_info(*t);
-    pg->write_log(*t);
-    int tr = store->queue_transaction(&pg->osr, t);
-    assert(tr == 0);
-  }
-  
-  // flush all pg operations to the fs, so we can rely on
-  // collection_list below.
-  pg->osr.flush();
-
-  int n = 0;
-
+  vector<coll_t> removals;
   ObjectStore::Transaction *rmt = new ObjectStore::Transaction;
-
-  // snap collections
   for (interval_set<snapid_t>::iterator p = pg->snap_collections.begin();
        p != pg->snap_collections.end();
-       p++) {
+       ++p) {
     for (snapid_t cur = p.get_start();
         cur < p.get_start() + p.get_len();
         ++cur) {
-      vector<hobject_t> olist;      
-      store->collection_list(coll_t(pgid, cur), olist);
-      dout(10) << "_remove_pg " << pgid << " snap " << cur << " " << olist.size() << " objects" << dendl;
-      for (vector<hobject_t>::iterator q = olist.begin();
-          q != olist.end();
-          q++) {
-       ObjectStore::Transaction *t = new ObjectStore::Transaction;
-       t->remove(coll_t(pgid, cur), *q);
-       t->remove(coll_t(pgid), *q);          // we may hit this twice, but it's harmless
-       int tr = store->queue_transaction(&pg->osr, t);
-       assert(tr == 0);
-       
-       if ((++n & 0xff) == 0) {
-         pg->unlock();
-         pg->lock();
-         if (!pg->deleting) {
-           dout(10) << "_remove_pg aborted on " << *pg << dendl;
-           pg->unlock();
-           return;
-         }
-       }
-      }
-      rmt->remove_collection(coll_t(pgid, cur));
+      coll_t to_remove = get_next_removal_coll();
+      removals.push_back(to_remove);
+      rmt->collection_rename(coll_t(pg->info.pgid, cur), to_remove);
     }
   }
-
-  // (what remains of the) main collection
-  vector<hobject_t> olist;
-  store->collection_list(coll_t(pgid), olist);
-  dout(10) << "_remove_pg " << pgid << " " << olist.size() << " objects" << dendl;
-  for (vector<hobject_t>::iterator p = olist.begin();
-       p != olist.end();
-       p++) {
-    ObjectStore::Transaction *t = new ObjectStore::Transaction;
-    t->remove(coll_t(pgid), *p);
-    int tr = store->queue_transaction(&pg->osr, t);
-    assert(tr == 0);
-
-    if ((++n & 0xff) == 0) {
-      pg->unlock();
-      pg->lock();
-      if (!pg->deleting) {
-       dout(10) << "_remove_pg aborted on " << *pg << dendl;
-       pg->unlock();
-       return;
-      }
-    }
-  }
-
-  pg->unlock();
-
-  dout(10) << "_remove_pg " << pgid << " flushing store" << dendl;
-  store->flush();
-  
-  dout(10) << "_remove_pg " << pgid << " taking osd_lock" << dendl;
-  osd_lock.Lock();
-  pg->lock();
-  
-  if (!pg->deleting) {
-    osd_lock.Unlock();
-    pg->unlock();
-    return;
-  }
-
-  dout(10) << "_remove_pg " << pgid << " removing final" << dendl;
-
-  {
-    rmt->remove(coll_t::META_COLL, pg->log_oid);
-    rmt->remove(coll_t::META_COLL, pg->biginfo_oid);
-    rmt->remove_collection(coll_t(pgid));
-    int tr = store->queue_transaction(NULL, rmt);
-    assert(tr == 0);
+  coll_t to_remove = get_next_removal_coll();
+  removals.push_back(to_remove);
+  rmt->collection_rename(coll_t(pg->info.pgid), to_remove);
+  if (pg->have_temp_coll()) {
+    to_remove = get_next_removal_coll();
+    removals.push_back(to_remove);
+    rmt->collection_rename(pg->get_temp_coll(), to_remove);
   }
+  rmt->remove(coll_t::META_COLL, pg->log_oid);
+  rmt->remove(coll_t::META_COLL, pg->biginfo_oid);
 
-  if (store->collection_exists(coll_t::make_temp_coll(pg->get_pgid()))) {
-    clear_temp(store, coll_t::make_temp_coll(pg->get_pgid()));
-  }
+  store->queue_transaction(
+    pg->osr.get(), rmt,
+    new ObjectStore::C_DeleteTransactionHolder<
+      SequencerRef>(rmt, pg->osr));
 
   // on_removal, which calls remove_watchers_and_notifies, and the erasure from 
   // the pg_map must be done together without unlocking the pg lock,
@@ -4774,18 +4710,28 @@ void OSD::_remove_pg(PG *pg)
   // and handle_notify_timeout
   pg->on_removal();
 
+  for (vector<coll_t>::iterator i = removals.begin();
+       i != removals.end();
+       ++i) {
+    remove_wq.queue(new pair<coll_t, SequencerRef>(*i, pg->osr));
+  }
+
+  recovery_wq.dequeue(pg);
+  scrub_wq.dequeue(pg);
+  scrub_finalize_wq.dequeue(pg);
+  snap_trim_wq.dequeue(pg);
+  pg_stat_queue_dequeue(pg);
+  op_wq.dequeue(pg);
+  peering_wq.dequeue(pg);
+
+  pg->deleting = true;
+
   // remove from map
-  pg_map.erase(pgid);
+  pg_map.erase(pg->info.pgid);
   pg->put(); // since we've taken it out of map
-  service.unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
 
+  service.unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
   _put_pool(pg->pool);
-
-  // unlock, and probably delete
-  pg->unlock();
-  pg->put();  // will delete, if last reference
-  osd_lock.Unlock();
-  dout(10) << "_remove_pg " << pgid << " all done" << dendl;
 }
 
 
@@ -4866,8 +4812,12 @@ void OSD::do_recovery(PG *pg)
     dout(10) << "do_recovery raced and failed to start anything; requeuing " << *pg << dendl;
     recovery_wq.queue(pg);
   } else {
-
     pg->lock();
+    if (pg->deleting) {
+      pg->unlock();
+      pg->put();
+      return;
+    }
     
     dout(10) << "do_recovery starting " << max
             << " (" << recovery_ops_active << "/" << g_conf->osd_recovery_max_active << " rops) on "
@@ -5327,6 +5277,11 @@ void OSD::dequeue_op(PG *pg)
   OpRequestRef op;
 
   pg->lock();
+  if (pg->deleting) {
+    pg->unlock();
+    pg->put();
+    return;
+  }
   assert(!pg->op_queue.empty());
   op = pg->op_queue.front();
   pg->op_queue.pop_front();
index 03b3822536595f7f5a806092ed4defa84ba64209..11793fa15ba7b88d55a30e6bbcc9aae887522926 100644 (file)
@@ -49,6 +49,7 @@ using namespace __gnu_cxx;
 #include "OpRequest.h"
 #include "common/shared_cache.hpp"
 #include "common/simple_cache.hpp"
+#include "common/sharedptr_registry.hpp"
 
 #define CEPH_OSD_PROTOCOL    10 /* cluster internal */
 
@@ -128,10 +129,13 @@ class OpsFlightSocketHook;
 
 extern const coll_t meta_coll;
 
+typedef std::tr1::shared_ptr<ObjectStore::Sequencer> SequencerRef;
+
 class OSD;
 class OSDService {
 public:
   OSD *osd;
+  SharedPtrRegistry<pg_t, ObjectStore::Sequencer> osr_registry;
   const int whoami;
   ObjectStore *&store;
   LogClient &clog;
@@ -146,7 +150,6 @@ public:
   ThreadPool::WorkQueue<PG> &snap_trim_wq;
   ThreadPool::WorkQueue<PG> &scrub_wq;
   ThreadPool::WorkQueue<PG> &scrub_finalize_wq;
-  ThreadPool::WorkQueue<PG> &remove_wq;
   ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq;
   ClassHandler  *&class_handler;
 
@@ -499,7 +502,7 @@ private:
   OpsFlightSocketHook *admin_ops_hook;
 
   // -- op queue --
-  deque<PG*> op_queue;
+  list<PG*> op_queue;
   int op_queue_len;
 
   struct OpWQ : public ThreadPool::WorkQueue<PG> {
@@ -509,7 +512,14 @@ private:
 
     bool _enqueue(PG *pg);
     void _dequeue(PG *pg) {
-      assert(0);
+      for (list<PG*>::iterator i = osd->op_queue.begin();
+          i != osd->op_queue.end();
+          ) {
+       if (*i == pg)
+         osd->op_queue.erase(i++);
+       else
+         ++i;
+      }
     }
     bool _empty() {
       return osd->op_queue.empty();
@@ -531,14 +541,21 @@ private:
 
   // -- peering queue --
   struct PeeringWQ : public ThreadPool::WorkQueue<PG> {
-    deque<PG*> peering_queue;
+    list<PG*> peering_queue;
     OSD *osd;
     PeeringWQ(OSD *o, time_t ti, ThreadPool *tp)
       : ThreadPool::WorkQueue<PG>(
        "OSD::PeeringWQ", ti, ti*10, tp), osd(o) {}
 
     void _dequeue(PG *pg) {
-      assert(0);
+      for (list<PG*>::iterator i = peering_queue.begin();
+          i != peering_queue.end();
+          ) {
+       if (*i == pg)
+         peering_queue.erase(i++);
+       else
+         ++i;
+      }
     }
     bool _enqueue(PG *pg) {
       pg->get();
@@ -809,7 +826,6 @@ protected:
   void handle_pg_backfill(OpRequestRef op);
 
   void handle_pg_remove(OpRequestRef op);
-  void queue_pg_for_deletion(PG *pg);
   void _remove_pg(PG *pg);
 
   // -- commands --
@@ -1121,45 +1137,42 @@ protected:
   } rep_scrub_wq;
 
   // -- removing --
-  xlist<PG*> remove_queue;
-
-  struct RemoveWQ : public ThreadPool::WorkQueue<PG> {
-    OSD *osd;
-    RemoveWQ(OSD *o, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueue<PG>("OSD::RemoveWQ", ti, 0, tp), osd(o) {}
+  struct RemoveWQ : public ThreadPool::WorkQueue<pair<coll_t, SequencerRef> > {
+    ObjectStore *&store;
+    list<pair<coll_t, SequencerRef> *> remove_queue;
+    RemoveWQ(ObjectStore *&o, time_t ti, ThreadPool *tp)
+      : ThreadPool::WorkQueue<pair<coll_t, SequencerRef> >("OSD::RemoveWQ", ti, 0, tp),
+       store(o) {}
 
     bool _empty() {
-      return osd->remove_queue.empty();
+      return remove_queue.empty();
     }
-    bool _enqueue(PG *pg) {
-      if (pg->remove_item.is_on_list())
-       return false;
-      pg->get();
-      osd->remove_queue.push_back(&pg->remove_item);
+    bool _enqueue(pair<coll_t, SequencerRef> *item) {
+      remove_queue.push_back(item);
       return true;
     }
-    void _dequeue(PG *pg) {
-      if (pg->remove_item.remove_myself())
-       pg->put();
+    void _dequeue(pair<coll_t, SequencerRef> *item) {
+      assert(0);
     }
-    PG *_dequeue() {
-      if (osd->remove_queue.empty())
+    pair<coll_t, SequencerRef> *_dequeue() {
+      if (remove_queue.empty())
        return NULL;
-      PG *pg = osd->remove_queue.front();
-      osd->remove_queue.pop_front();
-      return pg;
-    }
-    void _process(PG *pg) {
-      osd->_remove_pg(pg);
+      pair<coll_t, SequencerRef> *item = remove_queue.front();
+      remove_queue.pop_front();
+      return item;
     }
+    void _process(pair<coll_t, SequencerRef> *item);
     void _clear() {
-      while (!osd->remove_queue.empty()) {
-       PG *pg = osd->remove_queue.front();
-       osd->remove_queue.pop_front();
-       pg->put();
+      while (!remove_queue.empty()) {
+       delete remove_queue.front();
+       remove_queue.pop_front();
       }
     }
   } remove_wq;
+  uint64_t next_removal_seq;
+  coll_t get_next_removal_coll() {
+    return coll_t::make_removal_coll(next_removal_seq++);
+  }
 
  private:
   bool ms_dispatch(Message *m);
index e2c333afb4a3760cf8ce9dda84554fcc05304fbc..9f651a2564477f66fe8bc04a643ac11157a0aafd 100644 (file)
@@ -40,6 +40,37 @@ static ostream& _prefix(std::ostream *_dout, const PG *pg) {
   return *_dout << pg->gen_prefix();
 }
 
+PG::PG(OSDService *o, OSDMapRef curmap,
+       PGPool *_pool, pg_t p, const hobject_t& loid, const hobject_t& ioid) :
+  osd(o), osdmap_ref(curmap), pool(_pool),
+  _lock("PG::_lock"),
+  ref(0), deleting(false), dirty_info(false), dirty_log(false),
+  info(p), coll(p), log_oid(loid), biginfo_oid(ioid),
+  recovery_item(this), scrub_item(this), scrub_finalize_item(this), snap_trim_item(this), stat_queue_item(this),
+  recovery_ops_active(0),
+  waiting_on_backfill(0),
+  role(0),
+  state(0),
+  need_up_thru(false),
+  need_flush(false),
+  last_peering_reset(0),
+  heartbeat_peer_lock("PG::heartbeat_peer_lock"),
+  backfill_target(-1),
+  pg_stats_lock("PG::pg_stats_lock"),
+  pg_stats_valid(false),
+  osr(osd->osr_registry.lookup(p, (stringify(p)))),  // sequencer comes from the per-pgid registry; a still-live one for p is reused
+  finish_sync_event(NULL),
+  finalizing_scrub(false),
+  scrub_block_writes(false),
+  scrub_active(false),
+  scrub_reserved(false), scrub_reserve_failed(false),
+  scrub_waiting_on(0),
+  active_rep_scrub(0),
+  recovery_state(this)
+{
+  pool->get();  // matched by pool->put() in ~PG()
+}
+
 void PG::lock(bool no_lockdep)
 {
   _lock.Lock(no_lockdep);
@@ -1436,7 +1467,7 @@ void PG::do_pending_flush()
   assert(is_locked());
   if (need_flush) {
     dout(10) << "do_pending_flush doing pending flush" << dendl;
-    osr.flush();
+    osr->flush();
     need_flush = false;
     dout(10) << "do_pending_flush done" << dendl;
   }
@@ -1546,7 +1577,7 @@ void PG::_activate_committed(epoch_t e, entity_inst_t& primary)
   if (dirty_info) {
     ObjectStore::Transaction *t = new ObjectStore::Transaction;
     write_info(*t);
-    int tr = osd->store->queue_transaction(&osr, t);
+    int tr = osd->store->queue_transaction(osr.get(), t);
     assert(tr == 0);
   }
 
@@ -1566,7 +1597,6 @@ void PG::all_activated_and_committed()
   assert(peer_activated.size() == acting.size());
 
   info.history.last_epoch_started = get_osdmap()->get_epoch();
-  dirty_info = true;
 
   // make sure CLEAN is marked if we've been clean in this interval
   if (info.last_complete == info.last_update &&
@@ -2860,7 +2890,7 @@ void PG::build_scrub_map(ScrubMap &map)
 
   // wait for any writes on our pg to flush to disk first.  this avoids races
   // with scrub starting immediately after trim or recovery completion.
-  osr.flush();
+  osr->flush();
 
   // objects
   vector<hobject_t> ls;
@@ -3039,6 +3069,11 @@ void PG::scrub()
 {
 
   lock();
+  if (deleting) {
+    unlock();
+    put();
+    return;
+  }
 
   if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
     dout(10) << "scrub -- not primary or active or not clean" << dendl;
@@ -3292,6 +3327,11 @@ void PG::_compare_scrubmaps(const map<int,ScrubMap*> &maps,
 
 void PG::scrub_finalize() {
   lock();
+  if (deleting) {
+    unlock();
+    return;
+  }
+
   assert(last_update_applied == info.last_update);
 
   if (scrub_epoch_start != info.history.same_interval_since) {
@@ -3406,7 +3446,7 @@ void PG::scrub_finalize() {
   {
     ObjectStore::Transaction *t = new ObjectStore::Transaction;
     write_info(*t);
-    int tr = osd->store->queue_transaction(&osr, t);
+    int tr = osd->store->queue_transaction(osr.get(), t);
     assert(tr == 0);
   }
 
@@ -3633,7 +3673,6 @@ void PG::on_removal()
   osd->scrub_wq.dequeue(this);
   osd->scrub_finalize_wq.dequeue(this);
   osd->snap_trim_wq.dequeue(this);
-  osd->remove_wq.dequeue(this);
   osd->pg_stat_queue_dequeue(this);
 
   remove_watchers_and_notifies();
@@ -3758,13 +3797,8 @@ void PG::start_peering_interval(const OSDMapRef lastmap,
   // pg->on_*
   on_change();
 
-  if (deleting) {
-    dout(10) << *this << " canceling deletion!" << dendl;
-    deleting = false;
-    osd->remove_wq.dequeue(this);
-    osd->reg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp);
-  }
-    
+  assert(!deleting);
+
   if (role != oldrole) {
     // old primary?
     if (oldrole == 0) {
@@ -3923,8 +3957,6 @@ ostream& operator<<(ostream& out, const PG& pg)
   if (pg.snap_trimq.size())
     out << " snaptrimq=" << pg.snap_trimq;
 
-  if (pg.deleting)
-    out << " DELETING";
   out << "]";
 
 
@@ -4077,7 +4109,6 @@ void PG::handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx)
   }
   if (old_peering_evt(evt))
     return;
-  assert(!deleting);
   recovery_state.handle_event(evt, rctx);
 }
 
index 22f88bcf85dc6fff3c8781c7b5e1d4d0c4ae77d0..be1d58d2ab365f2d514e7a6b0273c0c20bfcf377 100644 (file)
@@ -424,7 +424,7 @@ public:
 
   /* You should not use these items without taking their respective queue locks
    * (if they have one) */
-  xlist<PG*>::item recovery_item, scrub_item, scrub_finalize_item, snap_trim_item, remove_item, stat_queue_item;
+  xlist<PG*>::item recovery_item, scrub_item, scrub_finalize_item, snap_trim_item, stat_queue_item;
   int recovery_ops_active;
   bool waiting_on_backfill;
 #ifdef DEBUG_RECOVERY_OIDS
@@ -628,7 +628,7 @@ protected:
   pg_stat_t pg_stats_stable;
 
   // for ordering writes
-  ObjectStore::Sequencer osr;
+  std::tr1::shared_ptr<ObjectStore::Sequencer> osr;
 
   void update_stats();
   void clear_stats();
@@ -775,6 +775,8 @@ public:
   void build_scrub_map(ScrubMap &map);
   void build_inc_scrub_map(ScrubMap &map, eversion_t v);
   virtual int _scrub(ScrubMap &map, int& errors, int& fixed) { return 0; }
+  virtual coll_t get_temp_coll() = 0;
+  virtual bool have_temp_coll() = 0;
   void clear_scrub_reserved();
   void scrub_reserve_replicas();
   void scrub_unreserve_replicas();
@@ -1363,36 +1365,7 @@ public:
 
  public:
   PG(OSDService *o, OSDMapRef curmap,
-     PGPool *_pool, pg_t p, const hobject_t& loid, const hobject_t& ioid) :
-    osd(o), osdmap_ref(curmap), pool(_pool),
-    _lock("PG::_lock"),
-    ref(0), deleting(false), dirty_info(false), dirty_log(false),
-    info(p), coll(p), log_oid(loid), biginfo_oid(ioid),
-    recovery_item(this), scrub_item(this), scrub_finalize_item(this), snap_trim_item(this), remove_item(this), stat_queue_item(this),
-    recovery_ops_active(0),
-    waiting_on_backfill(0),
-    role(0),
-    state(0),
-    need_up_thru(false),
-    need_flush(false),
-    last_peering_reset(0),
-    heartbeat_peer_lock("PG::heartbeat_peer_lock"),
-    backfill_target(-1),
-    flushed(true),
-    pg_stats_lock("PG::pg_stats_lock"),
-    pg_stats_valid(false),
-    osr(stringify(p)),
-    finish_sync_event(NULL),
-    finalizing_scrub(false),
-    scrub_block_writes(false),
-    scrub_active(false),
-    scrub_reserved(false), scrub_reserve_failed(false),
-    scrub_waiting_on(0),
-    active_rep_scrub(0),
-    recovery_state(this)
-  {
-    pool->get();
-  }
+     PGPool *_pool, pg_t p, const hobject_t& loid, const hobject_t& ioid);
   virtual ~PG() {
     pool->put();
   }
@@ -1540,7 +1513,7 @@ public:
   virtual void do_sub_op_reply(OpRequestRef op) = 0;
   virtual void do_scan(OpRequestRef op) = 0;
   virtual void do_backfill(OpRequestRef op) = 0;
-  virtual bool snap_trimmer() = 0;
+  virtual void snap_trimmer() = 0;
 
   virtual int do_command(vector<string>& cmd, ostream& ss,
                         bufferlist& idata, bufferlist& odata) = 0;
index 0da6767d2e0e95f8b2b657173e3e2f644852bd5c..31e9f864de4cd95a815311db316aa29294b1d765 100644 (file)
@@ -457,7 +457,7 @@ void ReplicatedPG::do_pg_op(OpRequestRef op)
 
        hobject_t next;
        hobject_t current = response.handle;
-       osr.flush();
+       osr->flush();
        int r = osd->store->collection_list_partial(coll, current,
                                                    list_size,
                                                    list_size,
@@ -1121,7 +1121,7 @@ void ReplicatedPG::do_scan(OpRequestRef op)
   case MOSDPGScan::OP_SCAN_GET_DIGEST:
     {
       BackfillInterval bi;
-      osr.flush();
+      osr->flush();
       scan_range(m->begin, g_conf->osd_backfill_scan_min, g_conf->osd_backfill_scan_max, &bi);
       MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST,
                                         get_osdmap()->get_epoch(), m->query_epoch,
@@ -1198,7 +1198,7 @@ void ReplicatedPG::do_backfill(OpRequestRef op)
 
       ObjectStore::Transaction *t = new ObjectStore::Transaction;
       write_info(*t);
-      int tr = osd->store->queue_transaction(&osr, t);
+      int tr = osd->store->queue_transaction(osr.get(), t);
       assert(tr == 0);
     }
     break;
@@ -1243,7 +1243,7 @@ bool ReplicatedPG::get_obs_to_trim(snapid_t &snap_to_trim,
   }
 
   // flush pg ops to fs so we can rely on collection_list()
-  osr.flush();
+  osr->flush();
 
   osd->store->collection_list(col_to_trim, obs_to_trim);
 
@@ -1398,9 +1398,14 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid,
   return repop;
 }
 
-bool ReplicatedPG::snap_trimmer()
+void ReplicatedPG::snap_trimmer()
 {
   lock();
+  if (deleting) {
+    unlock();
+    put();
+    return;
+  }
   dout(10) << "snap_trimmer entry" << dendl;
   if (is_primary()) {
     entity_inst_t nobody;
@@ -1409,7 +1414,7 @@ bool ReplicatedPG::snap_trimmer()
       queue_snap_trim();
       unlock();
       put();
-      return true;
+      return;
     }
     if (!scrub_block_writes) {
       dout(10) << "snap_trimmer posting" << dendl;
@@ -1432,7 +1437,7 @@ bool ReplicatedPG::snap_trimmer()
   }
   unlock();
   put();
-  return true;
+  return;
 }
 
 int ReplicatedPG::do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr)
@@ -3363,7 +3368,7 @@ void ReplicatedPG::apply_repop(RepGather *repop)
   Context *onapplied = new C_OSD_OpApplied(this, repop);
   Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(repop->obc,
                                                        repop->ctx->clone_obc);
-  int r = osd->store->queue_transactions(&osr, repop->tls, onapplied, oncommit, onapplied_sync, repop->ctx->op);
+  int r = osd->store->queue_transactions(osr.get(), repop->tls, onapplied, oncommit, onapplied_sync, repop->ctx->op);
   if (r) {
     derr << "apply_repop  queue_transactions returned " << r << " on " << *repop << dendl;
     assert(0);
@@ -4256,7 +4261,7 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op)
   
   Context *oncommit = new C_OSD_RepModifyCommit(rm);
   Context *onapply = new C_OSD_RepModifyApply(rm);
-  int r = osd->store->queue_transactions(&osr, rm->tls, onapply, oncommit, 0, op);
+  int r = osd->store->queue_transactions(osr.get(), rm->tls, onapply, oncommit, 0, op);
   if (r) {
     dout(0) << "error applying transaction: r = " << r << dendl;
     assert(0);
@@ -4951,7 +4956,7 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
   }
 
   int r = osd->store->
-    queue_transaction(&osr, t,
+    queue_transaction(osr.get(), t,
                      onreadable,
                      new C_OSD_CommittedPushedObject(this, op,
                                                      info.history.same_interval_since,
@@ -5006,7 +5011,7 @@ void ReplicatedPG::handle_push(OpRequestRef op)
                         t);
 
   int r = osd->store->
-    queue_transaction(&osr, t,
+    queue_transaction(osr.get(), t,
                      onreadable,
                      new C_OSD_CommittedPushedObject(
                        this, op,
@@ -5432,7 +5437,7 @@ void ReplicatedPG::sub_op_remove(OpRequestRef op)
 
   ObjectStore::Transaction *t = new ObjectStore::Transaction;
   remove_object_with_snap_hardlinks(*t, m->poid);
-  int r = osd->store->queue_transaction(&osr, t);
+  int r = osd->store->queue_transaction(osr.get(), t);
   assert(r == 0);
 }
 
@@ -5597,7 +5602,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
     write_info(*t);
   }
 
-  osd->store->queue_transaction(&osr, t, c, NULL, new C_OSD_OndiskWriteUnlockList(&c->obcs));
+  osd->store->queue_transaction(osr.get(), t, c, NULL, new C_OSD_OndiskWriteUnlockList(&c->obcs));
              
   // Send out the PG log to all replicas
   // So that they know what is lost
@@ -5970,7 +5975,7 @@ int ReplicatedPG::recover_primary(int max)
 
              recover_got(soid, latest->version);
 
-             osd->store->queue_transaction(&osr, t,
+             osd->store->queue_transaction(osr.get(), t,
                                            new C_OSD_AppliedRecoveredObject(this, t, obc),
                                            new C_OSD_CommittedPushedObject(this, OpRequestRef(),
                                                                            info.history.same_interval_since,
@@ -6184,7 +6189,7 @@ int ReplicatedPG::recover_backfill(int max)
   // objects.
   dout(10) << " rescanning local backfill_info from " << backfill_pos << dendl;
   backfill_info.clear();
-  osr.flush();
+  osr->flush();
   scan_range(backfill_pos, local_min, local_max, &backfill_info);
 
   int ops = 0;
@@ -6198,7 +6203,7 @@ int ReplicatedPG::recover_backfill(int max)
   while (ops < max) {
     if (backfill_info.begin <= pbi.begin &&
        !backfill_info.extends_to_end() && backfill_info.empty()) {
-      osr.flush();
+      osr->flush();
       scan_range(backfill_info.end, local_min, local_max, &backfill_info);
       backfill_info.trim();
     }
@@ -6717,7 +6722,7 @@ boost::statechart::result ReplicatedPG::RepColTrim::react(const SnapTrim&)
   
   // flush all operations to fs so we can rely on collection_list
   // below.
-  pg->osr.flush();
+  pg->osr->flush();
 
   vector<hobject_t> obs_to_trim;
   pg->osd->store->collection_list(col_to_trim, obs_to_trim);
@@ -6840,7 +6845,7 @@ boost::statechart::result ReplicatedPG::WaitingOnReplicas::react(const SnapTrim&
   pg->snap_collections.erase(sn);
   pg->write_info(*t);
   t->remove_collection(c);
-  int tr = pg->osd->store->queue_transaction(&pg->osr, t);
+  int tr = pg->osd->store->queue_transaction(pg->osr.get(), t);
   assert(tr == 0);
 
   context<SnapTrimmer>().need_share_pg_info = true;
index 8bd5105550c16130db070926ae82bb12238be726..1d3caee4621bbbb4840310245ccfbc79d432a96a 100644 (file)
@@ -806,13 +806,21 @@ public:
                       coll_t &col_to_trim,
                       vector<hobject_t> &obs_to_trim);
   RepGather *trim_object(const hobject_t &coid, const snapid_t &sn);
-  bool snap_trimmer();
+  void snap_trimmer();
   int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);
   void do_osd_op_effects(OpContext *ctx);
 private:
   bool temp_created;
   coll_t temp_coll;
   coll_t get_temp_coll(ObjectStore::Transaction *t);
+public:
+  bool have_temp_coll() {  // public read access to the private temp_created flag
+    return temp_created;
+  }
+  coll_t get_temp_coll() {  // returns temp_coll by value; temp_created is not consulted here
+    return temp_coll;
+  }
+private:
   struct NotTrimming;
   struct SnapTrim : boost::statechart::event< SnapTrim > {
     SnapTrim() : boost::statechart::event < SnapTrim >() {}
index 79a628050b2e8b770b210fc854e2027a6e00f14d..bc40d1b432916d0705df7934a173626e8e5ddbac 100644 (file)
@@ -280,6 +280,15 @@ bool coll_t::is_pg(pg_t& pgid, snapid_t& snap) const
   return true;
 }
 
+// True when str starts with "FORREMOVAL_" followed by a number, which is stored in *seq.
+bool coll_t::is_removal(uint64_t *seq) const
+{
+  const string prefix("FORREMOVAL_");  // 11 chars; length comes from the literal, not a hard-coded count
+  if (str.compare(0, prefix.size(), prefix) != 0)
+    return false;
+  stringstream ss(str.substr(prefix.size()));  // safe: str is at least prefix.size() long here
+  return !(ss >> *seq).fail();  // a name with no parsable sequence number is not a removal coll
+}
 
 void coll_t::encode(bufferlist& bl) const
 {
index f863b6445f4d56bc2d4ddb256a7f619072af9729..35bc0a3bcb1f91a27854cb6906384c1853fb0307 100644 (file)
@@ -323,6 +323,10 @@ public:
     return coll_t(pg_to_tmp_str(pgid));
   }
 
+  static coll_t make_removal_coll(uint64_t seq) {  // wraps seq_to_removal_str(seq) in a coll_t
+    return coll_t(seq_to_removal_str(seq));
+  }
+
   const std::string& to_str() const {
     return str;
   }
@@ -337,6 +341,7 @@ public:
 
   bool is_pg(pg_t& pgid, snapid_t& snap) const;
   bool is_temp(pg_t& pgid) const;
+  bool is_removal(uint64_t *seq) const;
   void encode(bufferlist& bl) const;
   void decode(bufferlist::iterator& bl);
   inline bool operator==(const coll_t& rhs) const {
@@ -360,6 +365,11 @@ private:
     oss << p << "_TEMP";
     return oss.str();
   }
+  static std::string seq_to_removal_str(uint64_t seq) {  // builds the name "FORREMOVAL_<seq>"
+    std::ostringstream oss;
+    oss << "FORREMOVAL_" << seq;  // prefix literal immediately followed by the streamed seq
+    return oss.str();
+  }
 
   std::string str;
 };