git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD: add initial split support
authorSamuel Just <sam.just@inktank.com>
Wed, 21 Nov 2012 00:47:49 +0000 (16:47 -0800)
committerSamuel Just <sam.just@inktank.com>
Fri, 7 Dec 2012 06:51:52 +0000 (22:51 -0800)
PGs are split after updating to the map on which they split.
OSD::activate_map populates the set of currently "splitting"
pgs.  Messages for those pgs are delayed until the split
is complete.  We add the newly split children to pg_map
once the transaction populating their on-disk state completes.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc

index f9ea8624be16af49c5af11d54602a68581c29a4b..ec3035c0a66f409ecbc65cfb9ebe9a36dc85dc20 100644 (file)
@@ -175,9 +175,57 @@ OSDService::OSDService(OSD *osd) :
   map_cache_lock("OSDService::map_lock"),
   map_cache(g_conf->osd_map_cache_size),
   map_bl_cache(g_conf->osd_map_cache_size),
-  map_bl_inc_cache(g_conf->osd_map_cache_size)
+  map_bl_inc_cache(g_conf->osd_map_cache_size),
+  in_progress_split_lock("OSDService::in_progress_split_lock")
 {}
 
+void OSDService::_start_split(const set<pg_t> &pgs)
+{
+  for (set<pg_t>::const_iterator i = pgs.begin();
+       i != pgs.end();
+       ++i) {
+    assert(!in_progress_splits.count(*i));
+    in_progress_splits.insert(*i);
+  }
+}
+
+void OSDService::expand_pg_num(OSDMapRef old_map,
+                              OSDMapRef new_map)
+{
+  Mutex::Locker l(in_progress_split_lock);
+  set<pg_t> children;
+  for (set<pg_t>::iterator i = in_progress_splits.begin();
+       i != in_progress_splits.end();
+       ) {
+    assert(old_map->have_pg_pool(i->pool()));
+    if (!new_map->have_pg_pool(i->pool())) {
+      in_progress_splits.erase(i++);
+    } else {
+      i->is_split(old_map->get_pg_num(i->pool()),
+                 new_map->get_pg_num(i->pool()), &children);
+      ++i;
+    }
+  }
+  _start_split(children);
+}
+
+bool OSDService::splitting(pg_t pgid)
+{
+  Mutex::Locker l(in_progress_split_lock);
+  return in_progress_splits.count(pgid);
+}
+
+void OSDService::complete_split(const set<pg_t> &pgs)
+{
+  Mutex::Locker l(in_progress_split_lock);
+  for (set<pg_t>::const_iterator i = pgs.begin();
+       i != pgs.end();
+       ++i) {
+    assert(in_progress_splits.count(*i));
+    in_progress_splits.erase(*i);
+  }
+}
+
 void OSDService::need_heartbeat_peer_update()
 {
   osd->need_heartbeat_peer_update();
@@ -1271,6 +1319,22 @@ PG *OSD::_open_lock_pg(
 {
   assert(osd_lock.is_locked());
 
+  PG* pg = _make_pg(createmap, pgid);
+
+  pg_map[pgid] = pg;
+
+  if (hold_map_lock)
+    pg->lock_with_map_lock_held(no_lockdep_check);
+  else
+    pg->lock(no_lockdep_check);
+  pg->get();  // because it's in pg_map
+  return pg;
+}
+
+PG* OSD::_make_pg(
+  OSDMapRef createmap,
+  pg_t pgid)
+{
   dout(10) << "_open_lock_pg " << pgid << dendl;
   PGPool pool = _get_pool(pgid.pool(), createmap);
 
@@ -1283,17 +1347,39 @@ PG *OSD::_open_lock_pg(
   else 
     assert(0);
 
-  assert(pg_map.count(pgid) == 0);
-  pg_map[pgid] = pg;
-
-  if (hold_map_lock)
-    pg->lock_with_map_lock_held(no_lockdep_check);
-  else
-    pg->lock(no_lockdep_check);
-  pg->get();  // because it's in pg_map
   return pg;
 }
 
+
+void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
+{
+  epoch_t e(service.get_osdmap()->get_epoch());
+  pg->get();  // For pg_map
+  pg_map[pg->info.pgid] = pg;
+  dout(10) << "Adding newly split pg " << *pg << dendl;
+  vector<int> up, acting;
+  pg->get_osdmap()->pg_to_up_acting_osds(pg->info.pgid, up, acting);
+  int role = pg->get_osdmap()->calc_pg_role(service.whoami, acting);
+  pg->set_role(role);
+  service.reg_last_pg_scrub(pg->info.pgid,
+                           pg->info.history.last_scrub_stamp);
+  pg->handle_loaded(rctx);
+  pg->write_if_dirty(*(rctx->transaction));
+  pg->queue_null(e, e);
+  map<pg_t, list<PG::CephPeeringEvtRef> >::iterator to_wake =
+    peering_wait_for_split.find(pg->info.pgid);
+  if (to_wake != peering_wait_for_split.end()) {
+    for (list<PG::CephPeeringEvtRef>::iterator i =
+          to_wake->second.begin();
+        i != to_wake->second.end();
+        ++i) {
+      pg->queue_peering_event(*i);
+    }
+    peering_wait_for_split.erase(to_wake);
+  }
+  wake_pg_waiters(pg->info.pgid);
+}
+
 PG *OSD::_create_lock_pg(
   OSDMapRef createmap,
   pg_t pgid, bool newly_created, bool hold_map_lock,
@@ -1567,13 +1653,14 @@ void OSD::build_past_intervals_parallel()
     store->apply_transaction(t);
 }
 
-
 /*
  * look up a pg.  if we have it, great.  if not, consider creating it IF the pg mapping
  * hasn't changed since the given epoch and we are the primary.
  */
-PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi,
-                         epoch_t epoch, int from, int& created, bool primary)
+PG *OSD::get_or_create_pg(
+  const pg_info_t& info, pg_interval_map_t& pi,
+  epoch_t epoch, int from, int& created, bool primary,
+  OpRequestRef op)
 {
   PG *pg;
 
@@ -1594,6 +1681,10 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi,
       return NULL;
     }
 
+    if (service.splitting(info.pgid)) {
+      assert(0);
+    }
+
     bool create = false;
     if (primary) {
       assert(role == 0);  // otherwise, probably bug in project_pg_history.
@@ -3865,7 +3956,9 @@ void OSD::check_osdmap_features()
   }
 }
 
-void OSD::advance_pg(epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx)
+void OSD::advance_pg(
+  epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx,
+  set<boost::intrusive_ptr<PG> > *new_pgs)
 {
   assert(pg->is_locked());
   epoch_t next_epoch = pg->get_osdmap()->get_epoch() + 1;
@@ -3879,9 +3972,22 @@ void OSD::advance_pg(epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx)
        next_epoch <= osd_epoch;
        ++next_epoch) {
     OSDMapRef nextmap = get_map(next_epoch);
+
     vector<int> newup, newacting;
     nextmap->pg_to_up_acting_osds(pg->info.pgid, newup, newacting);
     pg->handle_advance_map(nextmap, lastmap, newup, newacting, rctx);
+
+    // Check for split!
+    set<pg_t> children;
+    if (pg->info.pgid.is_split(
+       lastmap->get_pg_num(pg->pool.id),
+       nextmap->get_pg_num(pg->pool.id),
+       &children)) {
+      split_pgs(
+       pg, children, new_pgs, lastmap, nextmap,
+       rctx);
+    }
+
     lastmap = nextmap;
   }
   pg->handle_activate_map(rctx);
@@ -3953,6 +4059,22 @@ void OSD::advance_map(ObjectStore::Transaction& t, C_Contexts *tfin)
       waiting_for_pg.erase(p++);
     }
   }
+  map<pg_t, list<PG::CephPeeringEvtRef> >::iterator q =
+    peering_wait_for_split.begin();
+  while (q != peering_wait_for_split.end()) {
+    pg_t pgid = q->first;
+
+    // am i still primary?
+    vector<int> acting;
+    int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+    int role = osdmap->calc_pg_role(whoami, acting, nrep);
+    if (role >= 0) {
+      ++q;  // still me
+    } else {
+      dout(10) << " discarding waiting ops for " << pgid << dendl;
+      peering_wait_for_split.erase(q++);
+    }
+  }
 }
 
 void OSD::activate_map()
@@ -3971,6 +4093,9 @@ void OSD::activate_map()
 
   list<PG*> to_remove;
 
+  service.expand_pg_num(service.get_osdmap(),
+                       osdmap);
+
   // scan pg's
   for (hash_map<pg_t,PG*>::iterator it = pg_map.begin();
        it != pg_map.end();
@@ -3987,11 +4112,18 @@ void OSD::activate_map()
     if (pg->is_primary() && pg->info.history.last_epoch_clean < oldest_last_clean)
       oldest_last_clean = pg->info.history.last_epoch_clean;
 
+    set<pg_t> split_pgs;
     if (!osdmap->have_pg_pool(pg->info.pgid.pool())) {
       //pool is deleted!
       pg->get();
       to_remove.push_back(pg);
+    } else if (it->first.is_split(
+                service.get_osdmap()->get_pg_num(it->first.pool()),
+                osdmap->get_pg_num(it->first.pool()),
+                &split_pgs)) {
+      service.start_split(split_pgs);
     }
+
     pg->unlock();
   }
 
@@ -4330,6 +4462,59 @@ bool OSD::can_create_pg(pg_t pgid)
   return true;
 }
 
+void OSD::split_pgs(
+  PG *parent,
+  const set<pg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs,
+  OSDMapRef curmap,
+  OSDMapRef nextmap,
+  PG::RecoveryCtx *rctx)
+{
+  for (set<pg_t>::const_iterator i = childpgids.begin();
+       i != childpgids.end();
+       ++i) {
+    dout(10) << "Splitting " << *parent << " into " << *i << dendl;
+    assert(service.splitting(*i));
+    PG* child = _make_pg(nextmap, *i);
+    child->lock(true);
+    out_pgs->insert(child);
+
+    unsigned pg_num = nextmap->get_pg_num(
+      parent->pool.id);
+
+    unsigned split_bits = i->get_split_bits(pg_num);
+    dout(10) << "pg_num is " << pg_num << dendl;
+    dout(10) << "m_seed " << i->ps() << dendl;
+    dout(10) << "split_bits is " << split_bits << dendl;
+
+    rctx->transaction->split_collection(
+      coll_t(parent->info.pgid),
+      split_bits,
+      i->m_seed,
+      coll_t(*i));
+    for (interval_set<snapid_t>::iterator k = parent->snap_collections.begin();
+        k != parent->snap_collections.end();
+        ++k) {
+      for (snapid_t j = k.get_start(); j < k.get_start() + k.get_len();
+          ++j) {
+       rctx->transaction->split_collection(
+         coll_t(parent->info.pgid, j),
+         split_bits,
+         i->m_seed,
+         coll_t(*i, j));
+      }
+    }
+    child->snap_collections = parent->snap_collections;
+    parent->split_into(
+      *i,
+      child,
+      split_bits);
+
+    child->write_if_dirty(*(rctx->transaction));
+    child->unlock();
+  }
+}
+  
+
 void OSD::do_split(PG *parent, set<pg_t>& childpgids, ObjectStore::Transaction& t,
                   C_Contexts *tfin)
 {
@@ -4820,8 +5005,17 @@ void OSD::handle_pg_notify(OpRequestRef op)
     }
 
     int created = 0;
+    if (service.splitting(it->first.info.pgid)) {
+      peering_wait_for_split[it->first.info.pgid].push_back(
+       PG::CephPeeringEvtRef(
+         new PG::CephPeeringEvt(
+           it->first.epoch_sent, it->first.query_epoch,
+           PG::MNotifyRec(from, it->first))));
+      continue;
+    }
+
     pg = get_or_create_pg(it->first.info, it->second,
-                         it->first.query_epoch, from, created, true);
+                          it->first.query_epoch, from, created, true, op);
     if (!pg)
       continue;
     pg->queue_notify(it->first.epoch_sent, it->first.query_epoch, from, it->first);
@@ -4846,9 +5040,18 @@ void OSD::handle_pg_log(OpRequestRef op)
     return;
   }
 
+  if (service.splitting(m->info.pgid)) {
+    peering_wait_for_split[m->info.pgid].push_back(
+      PG::CephPeeringEvtRef(
+       new PG::CephPeeringEvt(
+         m->get_epoch(), m->get_query_epoch(),
+         PG::MLogRec(from, m))));
+    return;
+  }
+
   int created = 0;
   PG *pg = get_or_create_pg(m->info, m->past_intervals, m->get_epoch(), 
-                           from, created, false);
+                            from, created, false, op);
   if (!pg)
     return;
   op->mark_started();
@@ -4880,8 +5083,16 @@ void OSD::handle_pg_info(OpRequestRef op)
       continue;
     }
 
+    if (service.splitting(p->first.info.pgid)) {
+      peering_wait_for_split[p->first.info.pgid].push_back(
+       PG::CephPeeringEvtRef(
+         new PG::CephPeeringEvt(
+           p->first.epoch_sent, p->first.query_epoch,
+           PG::MInfoRec(from, p->first.info, p->first.epoch_sent))));
+      continue;
+    }
     PG *pg = get_or_create_pg(p->first.info, p->second, p->first.epoch_sent,
-                             from, created, false);
+                              from, created, false, op);
     if (!pg)
       continue;
     pg->queue_info(p->first.epoch_sent, p->first.query_epoch, from,
@@ -5121,6 +5332,15 @@ void OSD::handle_pg_query(OpRequestRef op)
       continue;
     }
 
+    if (service.splitting(pgid)) {
+      peering_wait_for_split[pgid].push_back(
+       PG::CephPeeringEvtRef(
+         new PG::CephPeeringEvt(
+           it->second.epoch_sent, it->second.epoch_sent,
+           PG::MQuery(from, it->second, it->second.epoch_sent))));
+      continue;
+    }
+
     PG *pg = 0;
 
     if (pg_map.count(pgid)) {
@@ -5653,6 +5873,11 @@ void OSD::handle_sub_op(OpRequestRef op)
   _share_map_incoming(m->get_source_inst(), m->map_epoch,
                      (Session*)m->get_connection()->get_priv());
 
+  if (service.splitting(pgid)) {
+    waiting_for_pg[pgid].push_back(op);
+    return;
+  }
+
   PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL;
   if (!pg) {
     return;
@@ -5804,6 +6029,30 @@ void OSDService::queue_for_peering(PG *pg)
   peering_wq.queue(pg);
 }
 
+struct C_CompleteSplits : public Context {
+  OSD *osd;
+  set<boost::intrusive_ptr<PG> > pgs;
+  C_CompleteSplits(OSD *osd, const set<boost::intrusive_ptr<PG> > &in)
+    : osd(osd), pgs(in) {}
+  void finish(int r) {
+    Mutex::Locker l(osd->osd_lock);
+    PG::RecoveryCtx rctx = osd->create_context();
+    set<pg_t> to_complete;
+    for (set<boost::intrusive_ptr<PG> >::iterator i = pgs.begin();
+        i != pgs.end();
+        ++i) {
+      (*i)->lock();
+      osd->add_newly_split_pg(&**i, &rctx);
+      osd->dispatch_context_transaction(rctx, &**i);
+      if (!((*i)->deleting))
+       to_complete.insert((*i)->info.pgid);
+      (*i)->unlock();
+    }
+    osd->service.complete_split(to_complete);
+    osd->dispatch_context(rctx, 0, osd->service.get_osdmap());
+  }
+};
+
 void OSD::process_peering_events(const list<PG*> &pgs)
 {
   bool need_up_thru = false;
@@ -5813,6 +6062,7 @@ void OSD::process_peering_events(const list<PG*> &pgs)
   for (list<PG*>::const_iterator i = pgs.begin();
        i != pgs.end();
        ++i) {
+    set<boost::intrusive_ptr<PG> > split_pgs;
     PG *pg = *i;
     pg->lock();
     curmap = service.get_osdmap();
@@ -5820,7 +6070,7 @@ void OSD::process_peering_events(const list<PG*> &pgs)
       pg->unlock();
       continue;
     }
-    advance_pg(curmap->get_epoch(), pg, &rctx);
+    advance_pg(curmap->get_epoch(), pg, &rctx, &split_pgs);
     if (!pg->peering_queue.empty()) {
       PG::CephPeeringEvtRef evt = pg->peering_queue.front();
       pg->peering_queue.pop_front();
@@ -5830,6 +6080,10 @@ void OSD::process_peering_events(const list<PG*> &pgs)
     same_interval_since = MAX(pg->info.history.same_interval_since,
                              same_interval_since);
     pg->write_if_dirty(*rctx.transaction);
+    if (split_pgs.size()) {
+      rctx.on_applied->add(new C_CompleteSplits(this, split_pgs));
+      split_pgs.clear();
+    }
     dispatch_context_transaction(rctx, pg);
     pg->unlock();
   }
index ba0da042a31e9e2a884a8cee4e9a54387097cb4a..a87431a69d12afcd3b728026f99ca95db541e741 100644 (file)
@@ -133,6 +133,7 @@ class AuthAuthorizeHandlerRegistry;
 
 class OpsFlightSocketHook;
 class HistoricOpsSocketHook;
+struct C_CompleteSplits;
 
 extern const coll_t meta_coll;
 
@@ -365,6 +366,19 @@ public:
   void init();
   void shutdown();
 
+  // split
+  Mutex in_progress_split_lock;
+  set<pg_t> in_progress_splits;
+  void _start_split(const set<pg_t> &pgs);
+  void start_split(const set<pg_t> &pgs) {
+    Mutex::Locker l(in_progress_split_lock);
+    return _start_split(pgs);
+  }
+  void complete_split(const set<pg_t> &pgs);
+  bool splitting(pg_t pgid);
+  void expand_pg_num(OSDMapRef old_map,
+                    OSDMapRef new_map);
+
   OSDService(OSD *osd);
 };
 class OSD : public Dispatcher {
@@ -601,6 +615,7 @@ private:
   }
   friend class OpsFlightSocketHook;
   friend class HistoricOpsSocketHook;
+  friend class C_CompleteSplits;
   OpsFlightSocketHook *admin_ops_hook;
   HistoricOpsSocketHook *historic_ops_hook;
 
@@ -742,7 +757,9 @@ private:
   void note_down_osd(int osd);
   void note_up_osd(int osd);
   
-  void advance_pg(epoch_t advance_to, PG *pg, PG::RecoveryCtx *rctx);
+  void advance_pg(
+    epoch_t advance_to, PG *pg, PG::RecoveryCtx *rctx,
+    set<boost::intrusive_ptr<PG> > *split_pgs);
   void advance_map(ObjectStore::Transaction& t, C_Contexts *tfin);
   void activate_map();
 
@@ -782,6 +799,7 @@ protected:
   // -- placement groups --
   hash_map<pg_t, PG*> pg_map;
   map<pg_t, list<OpRequestRef> > waiting_for_pg;
+  map<pg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
   PGRecoveryStats pg_recovery_stats;
 
   PGPool _get_pool(int id, OSDMapRef createmap);
@@ -803,11 +821,15 @@ protected:
   PG   *_lookup_qlock_pg(pg_t pgid);
 
   PG *lookup_lock_raw_pg(pg_t pgid);
+  PG* _make_pg(OSDMapRef createmap, pg_t pgid);
+  void add_newly_split_pg(PG *pg,
+                         PG::RecoveryCtx *rctx);
 
   PG *get_or_create_pg(const pg_info_t& info,
-                      pg_interval_map_t& pi,
-                      epoch_t epoch, int from, int& pcreated,
-                      bool primary);
+                       pg_interval_map_t& pi,
+                       epoch_t epoch, int from, int& pcreated,
+                       bool primary,
+                       OpRequestRef op);
   
   void load_pgs();
   void build_past_intervals_parallel();
@@ -848,7 +870,12 @@ protected:
 
   void do_split(PG *parent, set<pg_t>& children, ObjectStore::Transaction &t, C_Contexts *tfin);
   void split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction &t);
-
+  void split_pgs(
+    PG *parent,
+    const set<pg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs,
+    OSDMapRef curmap,
+    OSDMapRef nextmap,
+    PG::RecoveryCtx *rctx);
 
   // == monitor interaction ==
   utime_t last_mon_report;
index dfac75c5ca5748b54633603cee0c23221abdc1c7..2323b988f7070f8f34b8ee91e665c1482291176f 100644 (file)
@@ -1978,8 +1978,6 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
   child->info.last_backfill = info.last_backfill;
 
   child->info.stats = info.stats;
-  info.stats.stats_invalid = true;
-  child->info.stats.stats_invalid = true;
 
   child->snap_trimq = snap_trimq;