]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
splitting appears to work, at least sort of
authorSage Weil <sage@newdream.net>
Tue, 18 Mar 2008 19:55:36 +0000 (12:55 -0700)
committerSage Weil <sage@newdream.net>
Tue, 18 Mar 2008 19:55:36 +0000 (12:55 -0700)
14 files changed:
src/Makefile.am
src/messages/MOSDPGActivateSet.h [deleted file]
src/messages/MOSDPGInfo.h [new file with mode: 0644]
src/mon/OSDMonitor.cc
src/mon/PGMonitor.cc
src/msg/Message.cc
src/msg/Message.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/OSDMap.h
src/osd/PG.cc
src/osd/PG.h
src/osd/osd_types.h
src/start.sh

index 4c5d3e4531191dddb5f29f0aa848ad5ef81e7409..aa87f7130e157923ae406b77250d956c2a944b9a 100644 (file)
@@ -335,7 +335,7 @@ noinst_HEADERS = \
        messages/MOSDMap.h\
        messages/MOSDOp.h\
        messages/MOSDOut.h\
-       messages/MOSDPGActivateSet.h\
+       messages/MOSDPGInfo.h\
        messages/MOSDPGNotify.h\
        messages/MOSDPGQuery.h\
        messages/MOSDPGRemove.h\
diff --git a/src/messages/MOSDPGActivateSet.h b/src/messages/MOSDPGActivateSet.h
deleted file mode 100644 (file)
index a09fc7e..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef __MOSDPGACTIVATESET_H
-#define __MOSDPGACTIVATESET_H
-
-#include "msg/Message.h"
-
-class MOSDPGActivateSet : public Message {
-  epoch_t epoch;
-
-public:
-  list<PG::Info> pg_info;
-
-  epoch_t get_epoch() { return epoch; }
-
-  MOSDPGActivateSet() {}
-  MOSDPGActivateSet(version_t mv) :
-    Message(MSG_OSD_PG_ACTIVATE_SET),
-    epoch(mv) { }
-
-  const char *get_type_name() { return "pg_activate_set"; }
-  void print(ostream& out) {
-    out << "pg_activate_set(" << pg_info.size() << " pgs e" << epoch << ")";
-  }
-
-  void encode_payload() {
-    ::_encode(epoch, payload);
-    ::_encode(pg_info, payload);
-  }
-  void decode_payload() {
-    int off = 0;
-    ::_decode(epoch, payload, off);
-    ::_decode(pg_info, payload, off);
-  }
-};
-
-#endif
diff --git a/src/messages/MOSDPGInfo.h b/src/messages/MOSDPGInfo.h
new file mode 100644 (file)
index 0000000..55d59d2
--- /dev/null
@@ -0,0 +1,50 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MOSDPGINFO_H
+#define __MOSDPGINFO_H
+
+#include "msg/Message.h"
+
+class MOSDPGInfo : public Message {
+  epoch_t epoch;
+
+public:
+  list<PG::Info> pg_info;
+
+  epoch_t get_epoch() { return epoch; }
+
+  MOSDPGInfo() {}
+  MOSDPGInfo(version_t mv) :
+    Message(MSG_OSD_PG_INFO),
+    epoch(mv) { }
+
+  const char *get_type_name() { return "pg_info"; }
+  void print(ostream& out) {
+    out << "pg_info(" << pg_info.size() << " pgs e" << epoch << ")";
+  }
+
+  void encode_payload() {
+    ::_encode(epoch, payload);
+    ::_encode(pg_info, payload);
+  }
+  void decode_payload() {
+    int off = 0;
+    ::_decode(epoch, payload, off);
+    ::_decode(pg_info, payload, off);
+  }
+};
+
+#endif
index e4afa18f8cbdda0e91847af6c43afe01915233b7..7688a5ba6f2d28aaf67d8049e0e2897ef049be31 100644 (file)
@@ -785,7 +785,8 @@ bool OSDMonitor::prepare_command(MMonCommand *m)
     }
     else if (m->cmd[1] == "setpgnum" && m->cmd.size() > 2) {
       int n = atoi(m->cmd[2].c_str());
-      if (n > osdmap.get_pg_num()) {
+      if (n > osdmap.get_pg_num() &&
+         osdmap.get_pg_num() == osdmap.get_pgp_num()) {
        ss << "set new pg_num = " << n;
        pending_inc.new_pg_num = n;
        getline(ss, rs);
@@ -793,6 +794,7 @@ bool OSDMonitor::prepare_command(MMonCommand *m)
        return true;
       } else {
        ss << "specified pg_num " << n << " < current " << osdmap.get_pg_num();
+       ss << " or pg_num " << osdmap.get_pg_num() << " > pgp_num " << osdmap.get_pgp_num();
        getline(ss, rs);
        mon->reply_command(m, -EINVAL, rs);
       }
index 7a3258570d6a54df5b70e428f61e76ac2561ec56..0dfbf9cf7d00ed239c05a4b7f84425ceb5801f63 100644 (file)
@@ -343,10 +343,9 @@ void PGMonitor::register_new_pgs()
            int msb = calc_bits_of(parent.u.pg.ps);
            if (!msb) break;
            parent.u.pg.ps &= ~(1<<(msb-1));
-           //dout(10) << " is " << pgid << " parent " << parent << " ?" << dendl;
-           if (pg_map.pg_stat.count(parent) &&
-               pg_map.pg_stat[parent].state != PG_STATE_CREATING) {
-             //dout(10) << "  parent is " << parent << dendl;
+           dout(10) << " is " << pgid << " parent " << parent << " ?" << dendl;
+           if (parent.u.pg.ps < mon->osdmon->osdmap.get_pgp_num()) {
+             dout(10) << "  parent is " << parent << dendl;
              break;
            }
          }
index 04b5944b580d2e13130f8f1a98ca4421d63632d2..d7dfa783015268ff3a49997535c0cfac0b461293 100644 (file)
@@ -41,7 +41,7 @@ using namespace std;
 #include "messages/MOSDPGQuery.h"
 #include "messages/MOSDPGLog.h"
 #include "messages/MOSDPGRemove.h"
-#include "messages/MOSDPGActivateSet.h"
+#include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGCreate.h"
 
 #include "messages/MMonMap.h"
@@ -207,8 +207,8 @@ decode_message(ceph_msg_header& env, bufferlist& front, bufferlist& data)
   case MSG_OSD_PG_REMOVE:
     m = new MOSDPGRemove;
     break;
-  case MSG_OSD_PG_ACTIVATE_SET:
-    m = new MOSDPGActivateSet;
+  case MSG_OSD_PG_INFO:
+    m = new MOSDPGInfo;
     break;
   case MSG_OSD_PG_CREATE:
     m = new MOSDPGCreate;
index 1e1e9450cc0212204a58b0ad0c7f489b49100225..a3cbd022ec83d152e70deed7ab04243500cc12c9 100644 (file)
@@ -41,7 +41,7 @@
 #define MSG_OSD_PG_SUMMARY     82
 #define MSG_OSD_PG_LOG         83
 #define MSG_OSD_PG_REMOVE      84
-#define MSG_OSD_PG_ACTIVATE_SET 85
+#define MSG_OSD_PG_INFO        85
 
 // CEPH_MSG_PGSTATS            87
 #define MSG_OSD_PG_CREATE      88
index 15c5ebd952183e302fc443d5afd4af2f4ff9b995..09c8fb6253aab7a8caaee088634e6bf302f7938d 100644 (file)
@@ -56,7 +56,7 @@
 #include "messages/MOSDPGQuery.h"
 #include "messages/MOSDPGLog.h"
 #include "messages/MOSDPGRemove.h"
-#include "messages/MOSDPGActivateSet.h"
+#include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGCreate.h"
 
 #include "messages/MPGStats.h"
@@ -1089,8 +1089,8 @@ void OSD::dispatch(Message *m)
       case MSG_OSD_PG_REMOVE:
         handle_pg_remove((MOSDPGRemove*)m);
         break;
-      case MSG_OSD_PG_ACTIVATE_SET:
-        handle_pg_activate_set((MOSDPGActivateSet*)m);
+      case MSG_OSD_PG_INFO:
+        handle_pg_info((MOSDPGInfo*)m);
         break;
 
        // client ops
@@ -1441,6 +1441,7 @@ void OSD::advance_map(ObjectStore::Transaction& t)
       dout(10) << " no longer primary for " << pgid << ", stopping creation" << dendl;
       creating_pgs.erase(p);
     }
+    p->second.acting.swap(acting);  // keep the latest
     /*
      * adding new ppl to our pg has no effect, since we're still primary,
      * and obviously haven't given the new nodes any data.
@@ -1587,7 +1588,7 @@ void OSD::activate_map(ObjectStore::Transaction& t)
 
   map< int, list<PG::Info> >  notify_list;  // primary -> list
   map< int, map<pg_t,PG::Query> > query_map;    // peer -> PG -> get_summary_since
-  map<int,MOSDPGActivateSet*> activator_map;  // peer -> message
+  map<int,MOSDPGInfo*> info_map;  // peer -> message
 
   // scan pg's
   for (hash_map<pg_t,PG*>::iterator it = pg_map.begin();
@@ -1603,7 +1604,7 @@ void OSD::activate_map(ObjectStore::Transaction& t)
     else if (pg->get_role() == 0 && !pg->is_active()) {
       // i am (inactive) primary
       pg->build_prior();
-      pg->peer(t, query_map, &activator_map);
+      pg->peer(t, query_map, &info_map);
     }
     else if (pg->is_stray() &&
              pg->get_primary() >= 0) {
@@ -1617,7 +1618,7 @@ void OSD::activate_map(ObjectStore::Transaction& t)
 
   do_notifies(notify_list);  // notify? (residual|replica)
   do_queries(query_map);
-  do_activators(activator_map);
+  do_infos(info_map);
 
   logger->set("numpg", pg_map.size());
 
@@ -1771,33 +1772,195 @@ bool OSD::require_same_or_newer_map(Message *m, epoch_t epoch)
 // pg creation
 
 
-bool OSD::ready_to_create_pg(pg_t pgid)
+PG *OSD::try_create_pg(pg_t pgid, ObjectStore::Transaction& t)
 {
   assert(creating_pgs.count(pgid));
 
   // priors empty?
   if (!creating_pgs[pgid].prior.empty()) {
-    dout(10) << "ready_to_create_pg " << pgid
+    dout(10) << "try_create_pg " << pgid
             << " - waiting for priors " << creating_pgs[pgid].prior << dendl;
-    return false;
+    return 0;
   }
 
   if (creating_pgs[pgid].parent != pg_t()) {
-    PG *parent = _lookup_lock_pg(creating_pgs[pgid].parent);
+    dout(10) << "try_create_pg " << pgid << " - queuing for split" << dendl;
+    pg_split_ready[creating_pgs[pgid].parent].insert(pgid); 
+    return 0;
+  }
+
+  dout(10) << "try_create_pg " << pgid << " - creating now" << dendl;
+  PG *pg = _create_lock_new_pg(pgid, creating_pgs[pgid].acting, t);
+  return pg;
+}
+
+
+int OSD::num_expected_children_of(pg_t pgid)
+{
+  int n = osdmap->get_pg_num();
+  int o = osdmap->get_pgp_num();
+  assert(n > o);
+
+  /*
+                bits
+o pgp_num = 7      2
+o pgp_num = 8      3
+
+n pg_num  = 8      3
+n pg_num  = 9      4
+
+0   000
+1   001
+2   010
+3   011
+4   100
+ 5  101
+ 6  110
+ 7  111
+ 8 1000
+ 9 1001
+
+max = 2
+
+   */
+
+  assert(pgid.u.pg.ps < o);
+  int obits = calc_bits_of(o)-1;  // lower bound
+  int nbits = calc_bits_of(n-1);  // upper bound
+  assert(nbits > obits);
+    
+  int max = 0xffffffff >> (32 - (nbits-obits)); // == -> 1
+  int num = 0;
+  dout(10) << "num_expected_children_of " << pgid
+          << " o/n " << o << "/" << n
+          << " bits " << obits << "/" << nbits
+          << " max " << max
+          << dendl;
+
+  for (int i=1; i<=max; i++) {
+    int ps = (i << obits) | pgid.u.pg.ps;
+    dout(10) << "num_expected_children_of " << pgid.u.pg.ps << " -> " << ps << dendl;
+    if (ps < o || ps >= n) 
+      continue;
+    num++;
+  }
+
+  dout(10) << "num_expected_children_of " << pgid
+          << " num " << num
+          << dendl;
+
+  return num;
+}
+
+void OSD::kick_pg_split_queue()
+{
+  map< int, map<pg_t,PG::Query> > query_map;
+  map<int, MOSDPGInfo*> info_map;
+  int created = 0;
+
+  dout(10) << "kick_pg_split_queue" << dendl;
+
+  map<pg_t, set<pg_t> >::iterator n = pg_split_ready.begin();
+  while (n != pg_split_ready.end()) {
+    map<pg_t, set<pg_t> >::iterator p = n++;
+    // how many children should this parent have?
+    unsigned nchildren = num_expected_children_of(p->first);
+    if (p->second.size() < nchildren) {
+      dout(15) << " parent " << p->first << " children " << p->second 
+              << " ... waiting for more children" << dendl;
+      continue;
+    }
+
+    PG *parent = _lookup_lock_pg(p->first);
     assert(parent);
     if (!parent->is_clean()) {
-      dout(10) << "ready_to_create_pg " << pgid
-              << " - parent " << parent->info.pgid << " not clean" << dendl;
+      dout(10) << "kick_pg_split_queue parent " << p->first << " not clean" << dendl;
       parent->unlock();
-      return false;
+      continue;
     }
+
+    dout(15) << " parent " << p->first << " children " << p->second 
+            << " ready" << dendl;
+    
+    // FIXME: this should be done in a separate thread, eventually
+
+    // create and lock children
+    ObjectStore::Transaction t;
+    map<pg_t,PG*> children;
+    for (set<pg_t>::iterator q = p->second.begin();
+        q != p->second.end();
+        q++) {
+      PG *pg = _create_lock_new_pg(*q, creating_pgs[*q].acting, t);
+      children[*q] = pg;
+    }
+
+    // split
+    split_pg(parent, children, t); 
+
+    // unlock parent, children
     parent->unlock();
+    for (map<pg_t,PG*>::iterator q = children.begin(); q != children.end(); q++) {
+      PG *pg = q->second;
+      // fix up pg metadata
+      pg->info.last_complete = pg->info.last_update;
+      t.collection_setattr(pg->info.pgid, "info", (char*)&pg->info, sizeof(pg->info));
+      pg->write_log(t);
+
+      if (waiting_for_pg.count(pg->info.pgid)) {
+        take_waiters(waiting_for_pg[pg->info.pgid]);
+        waiting_for_pg.erase(pg->info.pgid);
+      }
+      pg->peer(t, query_map, &info_map);
+
+      pg->unlock();
+      created++;
+    }
+    store->apply_transaction(t);
   }
 
-  dout(10) << "ready_to_create_pg " << pgid << " - ready!" << dendl;      
-  return true;
+  do_queries(query_map);
+  do_infos(info_map);
+  if (created)
+    update_heartbeat_peers();
+
 }
 
+void OSD::split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction &t)
+{
+  dout(10) << "split_pg " << *parent << dendl;
+  pg_t parentid = parent->info.pgid;
+
+  list<pobject_t> olist;
+  store->collection_list(parent->info.pgid, olist);  
+
+  while (!olist.empty()) {
+    pobject_t poid = olist.front();
+    olist.pop_front();
+    
+    ceph_object_layout l = osdmap->make_object_layout(poid.oid, parentid.type(), parentid.size(),
+                                                     parentid.pool(), parentid.preferred());
+    if (l.ol_pgid.pg64 != parentid.u.pg64) {
+      pg_t pgid(l.ol_pgid);
+      dout(20) << "  moving " << poid << " from " << parentid << " -> " << pgid << dendl;
+      PG *child = children[pgid];
+      assert(child);
+      eversion_t v;
+      store->getattr(poid, "version", &v, sizeof(v));
+      if (v > child->info.last_update) {
+       child->info.last_update = v;
+       dout(25) << "        tagging pg with v " << v << "  > " << child->info.last_update << dendl;
+      } else {
+       dout(25) << "    not tagging pg with v " << v << " <= " << child->info.last_update << dendl;
+      }
+      t.collection_add(pgid, poid);
+      t.collection_remove(parentid, poid);
+    } else {
+      dout(20) << " leaving " << poid << "   in " << parentid << dendl;
+    }
+  }
+}  
+
+
 /*
  * holding osd_lock
  */
@@ -1808,6 +1971,7 @@ void OSD::handle_pg_create(MOSDPGCreate *m)
   if (!require_same_or_newer_map(m, m->epoch)) return;
 
   map< int, map<pg_t,PG::Query> > query_map;
+  map<int, MOSDPGInfo*> info_map;
   ObjectStore::Transaction t;
   int created = 0;
 
@@ -1821,7 +1985,11 @@ void OSD::handle_pg_create(MOSDPGCreate *m)
     if (parent != pg_t()) 
       on = parent;
 
-    dout(20) << "mkpg " << pgid << " from parent " << parent << " logically created " << created << dendl;;
+    if (parent != pg_t()) {
+      dout(20) << "mkpg " << pgid << " e" << created << " from parent " << parent << dendl;
+    } else {
+      dout(20) << "mkpg " << pgid << " e" << created << dendl;
+    }
    
     // is it still ours?
     vector<int> acting;
@@ -1852,30 +2020,39 @@ void OSD::handle_pg_create(MOSDPGCreate *m)
     // register.
     creating_pgs[pgid].created = created;
     creating_pgs[pgid].parent = parent;
+    creating_pgs[pgid].acting.swap(acting);
     calc_priors_during(pgid, created, history.same_primary_since, 
                       creating_pgs[pgid].prior);
 
-    if (ready_to_create_pg(pgid)) {
-      dout(10) << "mkpg " << pgid << " creating now" << dendl;
-      PG *pg = _create_lock_new_pg(pgid, acting, t);
-      pg->unlock();
+    // poll priors
+    set<int>& pset = creating_pgs[pgid].prior;
+    dout(10) << "mkpg " << pgid << " e" << created
+            << " : querying priors " << pset << dendl;
+    for (set<int>::iterator p = pset.begin(); p != pset.end(); p++) 
+      if (osdmap->is_up(*p))
+       query_map[*p][pgid].type = PG::Query::INFO;
+    
+    PG *pg = try_create_pg(pgid, t);
+    if (pg) {
       created++;
-    } else {
-      set<int>& pset = creating_pgs[pgid].prior;
-      dout(10) << "mkpg " << pgid << " e " << created
-              << " : waiting for parent and/or querying priors " << pset << dendl;
-      for (set<int>::iterator p = pset.begin(); p != pset.end(); p++) 
-       if (osdmap->is_up(*p))
-         query_map[*p][pgid].type = PG::Query::INFO;
+      if (waiting_for_pg.count(pgid)) {
+        take_waiters(waiting_for_pg[pgid]);
+        waiting_for_pg.erase(pgid);
+      }
+      pg->peer(t, query_map, &info_map);
+      pg->unlock();
     }
   }
 
   store->apply_transaction(t);
+
   do_queries(query_map);
-  delete m;
+  do_infos(info_map);
 
+  kick_pg_split_queue();
   if (created)
     update_heartbeat_peers();
+  delete m;
 }
 
 
@@ -1922,13 +2099,13 @@ void OSD::do_queries(map< int, map<pg_t,PG::Query> >& query_map)
 }
 
 
-void OSD::do_activators(map<int,MOSDPGActivateSet*>& activator_map)
+void OSD::do_infos(map<int,MOSDPGInfo*>& info_map)
 {
-  for (map<int,MOSDPGActivateSet*>::iterator p = activator_map.begin();
-       p != activator_map.end();
+  for (map<int,MOSDPGInfo*>::iterator p = info_map.begin();
+       p != info_map.end();
        ++p) 
     messenger->send_message(p->second, osdmap->get_inst(p->first));
-  activator_map.clear();
+  info_map.clear();
 }
 
 
@@ -1948,7 +2125,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
   
   // look for unknown PGs i'm primary for
   map< int, map<pg_t,PG::Query> > query_map;
-  map<int, MOSDPGActivateSet*> activator_map;
+  map<int, MOSDPGInfo*> info_map;
   int created = 0;
 
   for (list<PG::Info>::iterator it = m->get_pg_list().begin();
@@ -1981,14 +2158,10 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
        // is there a creation pending on this pg?
        if (creating_pgs.count(pgid)) {
          creating_pgs[pgid].prior.erase(from);
-         if (!ready_to_create_pg(pgid))
-           continue;
 
-         dout(10) << "handle_pg_notify pg " << pgid
-                  << " finished creation probe and DNE, creating"
-                  << dendl;
-         pg = _create_lock_new_pg(pgid, acting, t);
-         // fall through
+         pg = try_create_pg(pgid, t);
+         if (!pg) 
+           continue;
        } else {
          dout(10) << "handle_pg_notify pg " << pgid
                   << " DNE on source, but creation probe, ignoring" << dendl;
@@ -2060,7 +2233,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
     } else {
       if (it->last_epoch_started > pg->last_epoch_started_any) 
         pg->adjust_prior();
-      pg->peer(t, query_map, &activator_map);
+      pg->peer(t, query_map, &info_map);
     }
 
     pg->unlock();
@@ -2070,8 +2243,10 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
   assert(tr == 0);
 
   do_queries(query_map);
-  do_activators(activator_map);
+  do_infos(info_map);
   
+  kick_pg_split_queue();
+
   if (created)
     update_heartbeat_peers();
 
@@ -2093,28 +2268,46 @@ void OSD::_process_pg_info(epoch_t epoch, int from,
                           PG::Info &info, 
                           PG::Log &log, 
                           PG::Missing &missing,
-                          map<int, MOSDPGActivateSet*>* activator_map)
+                          map<int, MOSDPGInfo*>* info_map,
+                          int& created)
 {
-  if (pg_map.count(info.pgid) == 0) {
-    dout(10) << "_process_pg_info " << info << " don't have pg" << dendl;
-    assert(epoch < osdmap->get_epoch());
-    return;
-  }
+  ObjectStore::Transaction t;
 
-  PG *pg = _lookup_lock_pg(info.pgid);
-  assert(pg);
+  PG *pg = 0;
+  if (!_have_pg(info.pgid)) {
+    vector<int> acting;
+    int nrep = osdmap->pg_to_acting_osds(info.pgid, acting);
+    int role = osdmap->calc_pg_role(whoami, acting, nrep);
 
-  dout(10) << *pg << " got " << info << " " << log << " " << missing << dendl;
+    project_pg_history(info.pgid, info.history, epoch, acting);
+    if (epoch < info.history.same_since) {
+      dout(10) << *pg << " got old info " << info << " on non-existent pg, ignoring" << dendl;
+      return;
+    }
 
-  if (epoch < pg->info.history.same_since) {
-    dout(10) << *pg << " got old info " << info << ", ignoring" << dendl;
-    pg->unlock();
-    return;
+    // create pg!
+    assert(role != 0);
+    pg = _create_lock_pg(info.pgid, t);
+    dout(10) << " got info on new pg, creating" << dendl;
+    pg->acting.swap(acting);
+    pg->set_role(role);
+    pg->info.history = info.history;
+    pg->write_log(t);
+    store->apply_transaction(t);
+    created++;
+  } else {
+    pg = _lookup_lock_pg(info.pgid);
+    if (epoch < pg->info.history.same_since) {
+      dout(10) << *pg << " got old info " << info << ", ignoring" << dendl;
+      pg->unlock();
+      return;
+    }
   }
+  assert(pg);
+
+  dout(10) << *pg << " got " << info << " " << log << " " << missing << dendl;
 
   //m->log.print(cout);
-  
-  ObjectStore::Transaction t;
 
   if (pg->is_primary()) {
     // i am PRIMARY
@@ -2125,7 +2318,7 @@ void OSD::_process_pg_info(epoch_t epoch, int from,
 
     // peer
     map< int, map<pg_t,PG::Query> > query_map;
-    pg->peer(t, query_map, activator_map);
+    pg->peer(t, query_map, info_map);
     do_queries(query_map);
 
   } else {
@@ -2136,7 +2329,7 @@ void OSD::_process_pg_info(epoch_t epoch, int from,
     assert(pg->missing.num_lost() == 0);
 
     // ok activate!
-    pg->activate(t, activator_map);
+    pg->activate(t, info_map);
   }
 
   unsigned tr = store->apply_transaction(t);
@@ -2151,31 +2344,38 @@ void OSD::handle_pg_log(MOSDPGLog *m)
   dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl;
 
   int from = m->get_source().num();
+  int created = 0;
   if (!require_same_or_newer_map(m, m->get_epoch())) return;
 
   _process_pg_info(m->get_epoch(), from, 
-                  m->info, m->log, m->missing, 0);
+                  m->info, m->log, m->missing, 0,
+                  created);
+  if (created)
+    update_heartbeat_peers();
 
   delete m;
 }
 
-void OSD::handle_pg_activate_set(MOSDPGActivateSet *m)
+void OSD::handle_pg_info(MOSDPGInfo *m)
 {
-  dout(7) << "handle_pg_activate_set " << *m << " from " << m->get_source() << dendl;
+  dout(7) << "handle_pg_info " << *m << " from " << m->get_source() << dendl;
 
   int from = m->get_source().num();
   if (!require_same_or_newer_map(m, m->get_epoch())) return;
 
   PG::Log empty_log;
   PG::Missing empty_missing;
-  map<int,MOSDPGActivateSet*> activator_map;
+  map<int,MOSDPGInfo*> info_map;
+  int created = 0;
 
   for (list<PG::Info>::iterator p = m->pg_info.begin();
        p != m->pg_info.end();
        ++p) 
-    _process_pg_info(m->get_epoch(), from, *p, empty_log, empty_missing, &activator_map);
+    _process_pg_info(m->get_epoch(), from, *p, empty_log, empty_missing, &info_map, created);
 
-  do_activators(activator_map);
+  do_infos(info_map);
+  if (created)
+    update_heartbeat_peers();
 
   delete m;
 }
index 0ede9679c2c7c8752d8592e53e5f4387eac8cf63..b8d39e8aacf305394fd9760c951f25e62997e7fd 100644 (file)
@@ -300,15 +300,21 @@ private:
   // -- pg creation --
   struct create_pg_info {
     epoch_t created;
+    vector<int> acting;
     set<int> prior;
     pg_t parent;
     bool has_parent() { return parent != pg_t(); }
   };
   hash_map<pg_t, create_pg_info> creating_pgs;
+  map<pg_t, set<pg_t> > pg_split_ready;  // children ready to be split to, by parent
 
-  bool ready_to_create_pg(pg_t pgid);
+  PG *try_create_pg(pg_t pgid, ObjectStore::Transaction& t);
   void handle_pg_create(class MOSDPGCreate *m);
 
+  int num_expected_children_of(pg_t pgid);
+  void kick_pg_split_queue();
+  void split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction &t);
+
 
   // -- pg stats --
   Mutex pg_stat_queue_lock;
@@ -346,7 +352,7 @@ private:
 
   void do_notifies(map< int, list<PG::Info> >& notify_list);
   void do_queries(map< int, map<pg_t,PG::Query> >& query_map);
-  void do_activators(map<int, MOSDPGActivateSet*>& activator_map);
+  void do_infos(map<int, MOSDPGInfo*>& info_map);
   void repeer(PG *pg, map< int, map<pg_t,PG::Query> >& query_map);
 
   bool require_current_map(Message *m, epoch_t v);
@@ -355,15 +361,16 @@ private:
   void handle_pg_query(class MOSDPGQuery *m);
   void handle_pg_notify(class MOSDPGNotify *m);
   void handle_pg_log(class MOSDPGLog *m);
-  void handle_pg_activate_set(class MOSDPGActivateSet *m);
+  void handle_pg_info(class MOSDPGInfo *m);
   void handle_pg_remove(class MOSDPGRemove *m);
 
-  // helper for handle_pg_log and handle_pg_activate_set
+  // helper for handle_pg_log and handle_pg_info
   void _process_pg_info(epoch_t epoch, int from,
                        PG::Info &info, 
                        PG::Log &log, 
                        PG::Missing &missing,
-                       map<int, MOSDPGActivateSet*>* activator_map);
+                       map<int, MOSDPGInfo*>* info_map,
+                       int& created);
 
 
  public:
index 2f9c179088706441ef74898e501cbb50bcad2acf..2291a74936bf9a6b1739b4de732b6513a7540bd6 100644 (file)
@@ -205,6 +205,11 @@ private:
   int get_lpg_num() const { return lpg_num; }
   int get_lpgp_num() const { return lpgp_num; }
 
+  int get_pg_num_mask() const { return pg_num_mask; }
+  int get_pgp_num_mask() const { return pgp_num_mask; }
+  int get_lpg_num_mask() const { return lpg_num_mask; }
+  int get_lpgp_num_mask() const { return lpgp_num_mask; }
+
   /* stamps etc */
   const utime_t& get_ctime() const { return ctime; }
   const utime_t& get_mtime() const { return mtime; }
index a1247847e3ec2b35eda712d82b35a4f67eca53b9..1d9305adb2f8d21abf1511a68b30e6b82aaeffa0 100644 (file)
@@ -24,7 +24,7 @@
 #include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGLog.h"
 #include "messages/MOSDPGRemove.h"
-#include "messages/MOSDPGActivateSet.h"
+#include "messages/MOSDPGInfo.h"
 
 #define  dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_osd) *_dout << dbeginl << g_clock.now() << " osd" << osd->whoami << " " << (osd->osdmap ? osd->osdmap->get_epoch():0) << " " << *this << " "
 
@@ -579,7 +579,7 @@ void PG::clear_primary_state()
 
 void PG::peer(ObjectStore::Transaction& t, 
               map< int, map<pg_t,Query> >& query_map,
-             map<int, MOSDPGActivateSet*> *activator_map)
+             map<int, MOSDPGInfo*> *activator_map)
 {
   dout(10) << "peer.  acting is " << acting 
            << ", prior_set is " << prior_set << dendl;
@@ -832,7 +832,7 @@ void PG::peer(ObjectStore::Transaction& t,
 
 
 void PG::activate(ObjectStore::Transaction& t,
-                 map<int, MOSDPGActivateSet*> *activator_map)
+                 map<int, MOSDPGInfo*> *activator_map)
 {
   assert(!is_active());
 
@@ -906,7 +906,7 @@ void PG::activate(ObjectStore::Transaction& t,
        if (activator_map) {
          dout(10) << "activate - peer osd" << peer << " is up to date, queueing in pending_activators" << dendl;
          if (activator_map->count(peer) == 0)
-           (*activator_map)[peer] = new MOSDPGActivateSet(osd->osdmap->get_epoch());
+           (*activator_map)[peer] = new MOSDPGInfo(osd->osdmap->get_epoch());
          (*activator_map)[peer]->pg_info.push_back(info);
        } else {
          dout(10) << "activate - peer osd" << peer << " is up to date, but sending pg_log anyway" << dendl;
index 7edb71cc73068f0c762e7ba6760c70b126a5d7a5..43a62aa35161e4d545bd68db1a8901592b3dc056 100644 (file)
@@ -40,7 +40,7 @@ class MOSDOp;
 class MOSDOpReply;
 class MOSDSubOp;
 class MOSDSubOpReply;
-class MOSDPGActivateSet;
+class MOSDPGInfo;
 
 /** PG - Replica Placement Group
  *
@@ -523,9 +523,9 @@ public:
 
   void peer(ObjectStore::Transaction& t, 
            map< int, map<pg_t,Query> >& query_map,
-           map<int, MOSDPGActivateSet*> *activator_map=0);
+           map<int, MOSDPGInfo*> *activator_map=0);
   void activate(ObjectStore::Transaction& t, 
-               map<int, MOSDPGActivateSet*> *activator_map=0);
+               map<int, MOSDPGInfo*> *activator_map=0);
 
   virtual void clean_up_local(ObjectStore::Transaction& t) = 0;
 
index b9a1810f837ae8a08a2eca0baab7fdf34edfd5ca..497d36f8e25ab1b81ca8d1f638b75bc0b42d1cc7 100644 (file)
@@ -226,21 +226,23 @@ struct osd_stat_t {
 /*
  * pg states
  */
-#define PG_STATE_CREATING   1  // this had better not collide with PG::STATE_* in osd/PG.h
+#define PG_STATE_CREATING   1  // creating
 #define PG_STATE_ACTIVE     2  // i am active.  (primary: replicas too)
 #define PG_STATE_CLEAN      4  // peers are complete, clean of stray replicas.
 #define PG_STATE_CRASHED    8  // all replicas went down. 
 #define PG_STATE_REPLAY    16  // crashed, waiting for replay
 #define PG_STATE_STRAY     32  // i must notify the primary i exist.
+#define PG_STATE_SPLITTING 64  // i am splitting
 
 static inline std::string pg_state_string(int state) {
   std::string st;
+  if (state & PG_STATE_CREATING) st += "creating+";
   if (state & PG_STATE_ACTIVE) st += "active+";
   if (state & PG_STATE_CLEAN) st += "clean+";
   if (state & PG_STATE_CRASHED) st += "crashed+";
   if (state & PG_STATE_REPLAY) st += "replay+";
   if (state & PG_STATE_STRAY) st += "stray+";
-  if (state & PG_STATE_CREATING) st += "creating+";
+  if (state & PG_STATE_SPLITTING) st += "splitting+";
   if (!st.length()) 
     st = "inactive";
   else 
index 44c83aea5d9b25705ece3a3c48e1e3cb8f574ccf..d83f28d1f20e70503ba8e266865d386955423ebe 100755 (executable)
@@ -39,7 +39,7 @@ $CEPH_BIN/cmonctl osd setmap -i .ceph_osdmap
 for osd in 0 1 2 3 
 do
  $CEPH_BIN/cosd --mkfs_for_osd $osd dev/osd$osd  # initialize empty object store
- $CEPH_BIN/cosd $ARGS dev/osd$osd --debug_osd 20
+ $CEPH_BIN/cosd $ARGS dev/osd$osd --debug_osd 40
 done
 
 # mds