]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: associate a CollectionHandle with each PG
authorSage Weil <sage@redhat.com>
Mon, 18 Jan 2016 16:07:28 +0000 (11:07 -0500)
committerSage Weil <sage@redhat.com>
Wed, 27 Jan 2016 19:34:51 +0000 (14:34 -0500)
Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSD.cc
src/osd/PG.cc
src/osd/PG.h

index dffd7a3712990198f4ce807128f58f6869c2102f..55736486fe0f4ae746a1a854fc3f0fb2c07d3cad 100644 (file)
@@ -2946,6 +2946,8 @@ void OSD::load_pgs()
     }
     // there can be no waiters here, so we don't call wake_pg_waiters
 
+    pg->ch = store->open_collection(pg->coll);
+
     // read pg state, log
     pg->read_state(store, bl);
 
@@ -7077,6 +7079,7 @@ void OSD::split_pgs(
     PG* child = _make_pg(nextmap, *i);
     child->lock(true);
     out_pgs->insert(child);
+    rctx->created_pgs.insert(child);
 
     unsigned split_bits = i->get_split_bits(pg_num);
     dout(10) << "pg_num is " << pg_num << dendl;
@@ -7238,10 +7241,27 @@ PG::RecoveryCtx OSD::create_context()
   return rctx;
 }
 
+struct C_OpenPGs : public Context {
+  set<PGRef> pgs;
+  ObjectStore *store;
+  C_OpenPGs(set<PGRef>& p, ObjectStore *s) : store(s) {
+    pgs.swap(p);
+  }
+  void finish(int r) {
+    for (auto p : pgs) {
+      p->ch = store->open_collection(p->coll);
+      assert(p->ch);
+    }
+  }
+};
+
 void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
                                        ThreadPool::TPHandle *handle)
 {
   if (!ctx.transaction->empty()) {
+    if (!ctx.created_pgs.empty()) {
+      ctx.on_applied->add(new C_OpenPGs(ctx.created_pgs, store));
+    }
     ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
     int tr = store->queue_transaction(
       pg->osr.get(),
@@ -7268,11 +7288,16 @@ void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
   delete ctx.info_map;
   if ((ctx.on_applied->empty() &&
        ctx.on_safe->empty() &&
-       ctx.transaction->empty()) || !pg) {
+       ctx.transaction->empty() &&
+       ctx.created_pgs.empty()) || !pg) {
     delete ctx.transaction;
     delete ctx.on_applied;
     delete ctx.on_safe;
+    assert(ctx.created_pgs.empty());
   } else {
+    if (!ctx.created_pgs.empty()) {
+      ctx.on_applied->add(new C_OpenPGs(ctx.created_pgs, store));
+    }
     ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
     int tr = store->queue_transaction(
       pg->osr.get(),
index 88b42d061e233fbaebd61c5f859b980ca51fc62a..64d550055f8882735e6687914170f1a26a7f4dc8 100644 (file)
@@ -5446,6 +5446,7 @@ void PG::handle_loaded(RecoveryCtx *rctx)
 void PG::handle_create(RecoveryCtx *rctx)
 {
   dout(10) << "handle_create" << dendl;
+  rctx->created_pgs.insert(this);
   Initialize evt;
   recovery_state.handle_event(evt, rctx);
   ActMap evt2;
index c5f1c12ff7af45806fd13c6014b2a356bac3dfd6..8265f463cddef581d124812b88371821fcd06dfd 100644 (file)
@@ -298,6 +298,7 @@ public:
   void upgrade(ObjectStore *store);
 
   const coll_t coll;
+  ObjectStore::CollectionHandle ch;
   PGLog  pg_log;
   static string get_info_key(spg_t pgid) {
     return stringify(pgid) + "_info";
@@ -530,6 +531,7 @@ public:
     map<int, map<spg_t, pg_query_t> > *query_map;
     map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > *info_map;
     map<int, vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list;
+    set<PGRef> created_pgs;
     C_Contexts *on_applied;
     C_Contexts *on_safe;
     ObjectStore::Transaction *transaction;