#include "messages/MOSDPGQuery.h"
#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGRemove.h"
-#include "messages/MOSDPGActivateSet.h"
+#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGCreate.h"
#include "messages/MPGStats.h"
case MSG_OSD_PG_REMOVE:
handle_pg_remove((MOSDPGRemove*)m);
break;
- case MSG_OSD_PG_ACTIVATE_SET:
- handle_pg_activate_set((MOSDPGActivateSet*)m);
+ case MSG_OSD_PG_INFO:
+ handle_pg_info((MOSDPGInfo*)m);
break;
// client ops
dout(10) << " no longer primary for " << pgid << ", stopping creation" << dendl;
creating_pgs.erase(p);
}
+ p->second.acting.swap(acting); // keep the latest
/*
* adding new ppl to our pg has no effect, since we're still primary,
* and obviously haven't given the new nodes any data.
map< int, list<PG::Info> > notify_list; // primary -> list
map< int, map<pg_t,PG::Query> > query_map; // peer -> PG -> get_summary_since
- map<int,MOSDPGActivateSet*> activator_map; // peer -> message
+ map<int,MOSDPGInfo*> info_map; // peer -> message
// scan pg's
for (hash_map<pg_t,PG*>::iterator it = pg_map.begin();
else if (pg->get_role() == 0 && !pg->is_active()) {
// i am (inactive) primary
pg->build_prior();
- pg->peer(t, query_map, &activator_map);
+ pg->peer(t, query_map, &info_map);
}
else if (pg->is_stray() &&
pg->get_primary() >= 0) {
do_notifies(notify_list); // notify? (residual|replica)
do_queries(query_map);
- do_activators(activator_map);
+ do_infos(info_map);
logger->set("numpg", pg_map.size());
// pg creation
-bool OSD::ready_to_create_pg(pg_t pgid)
+PG *OSD::try_create_pg(pg_t pgid, ObjectStore::Transaction& t)
{
assert(creating_pgs.count(pgid));
// priors empty?
if (!creating_pgs[pgid].prior.empty()) {
- dout(10) << "ready_to_create_pg " << pgid
+ dout(10) << "try_create_pg " << pgid
<< " - waiting for priors " << creating_pgs[pgid].prior << dendl;
- return false;
+ return 0;
}
if (creating_pgs[pgid].parent != pg_t()) {
- PG *parent = _lookup_lock_pg(creating_pgs[pgid].parent);
+ dout(10) << "try_create_pg " << pgid << " - queuing for split" << dendl;
+ pg_split_ready[creating_pgs[pgid].parent].insert(pgid);
+ return 0;
+ }
+
+ dout(10) << "try_create_pg " << pgid << " - creating now" << dendl;
+ PG *pg = _create_lock_new_pg(pgid, creating_pgs[pgid].acting, t);
+ return pg;
+}
+
+
+int OSD::num_expected_children_of(pg_t pgid)
+{
+ int n = osdmap->get_pg_num();
+ int o = osdmap->get_pgp_num();
+ assert(n > o);
+
+ /*
+ bits
+o pgp_num = 7 2
+o pgp_num = 8 3
+
+n pg_num = 8 3
+n pg_num = 9 4
+
+0 000
+1 001
+2 010
+3 011
+4 100
+ 5 101
+ 6 110
+ 7 111
+ 8 1000
+ 9 1001
+
+max = 2
+
+ */
+
+ assert(pgid.u.pg.ps < o);
+ int obits = calc_bits_of(o)-1; // lower bound
+ int nbits = calc_bits_of(n-1); // upper bound
+ assert(nbits > obits);
+
+ int max = 0xffffffff >> (32 - (nbits-obits)); // == -> 1
+ int num = 0;
+ dout(10) << "num_expected_children_of " << pgid
+ << " o/n " << o << "/" << n
+ << " bits " << obits << "/" << nbits
+ << " max " << max
+ << dendl;
+
+ for (int i=1; i<=max; i++) {
+ int ps = (i << obits) | pgid.u.pg.ps;
+ dout(10) << "num_expected_children_of " << pgid.u.pg.ps << " -> " << ps << dendl;
+ if (ps < o || ps >= n)
+ continue;
+ num++;
+ }
+
+ dout(10) << "num_expected_children_of " << pgid
+ << " num " << num
+ << dendl;
+
+ return num;
+}
+
+void OSD::kick_pg_split_queue()
+{
+ map< int, map<pg_t,PG::Query> > query_map;
+ map<int, MOSDPGInfo*> info_map;
+ int created = 0;
+
+ dout(10) << "kick_pg_split_queue" << dendl;
+
+ map<pg_t, set<pg_t> >::iterator n = pg_split_ready.begin();
+ while (n != pg_split_ready.end()) {
+ map<pg_t, set<pg_t> >::iterator p = n++;
+ // how many children should this parent have?
+ unsigned nchildren = num_expected_children_of(p->first);
+ if (p->second.size() < nchildren) {
+ dout(15) << " parent " << p->first << " children " << p->second
+ << " ... waiting for more children" << dendl;
+ continue;
+ }
+
+ PG *parent = _lookup_lock_pg(p->first);
assert(parent);
if (!parent->is_clean()) {
- dout(10) << "ready_to_create_pg " << pgid
- << " - parent " << parent->info.pgid << " not clean" << dendl;
+ dout(10) << "kick_pg_split_queue parent " << p->first << " not clean" << dendl;
parent->unlock();
- return false;
+ continue;
}
+
+ dout(15) << " parent " << p->first << " children " << p->second
+ << " ready" << dendl;
+
+ // FIXME: this should be done in a separate thread, eventually
+
+ // create and lock children
+ ObjectStore::Transaction t;
+ map<pg_t,PG*> children;
+ for (set<pg_t>::iterator q = p->second.begin();
+ q != p->second.end();
+ q++) {
+ PG *pg = _create_lock_new_pg(*q, creating_pgs[*q].acting, t);
+ children[*q] = pg;
+ }
+
+ // split
+ split_pg(parent, children, t);
+
+ // unlock parent, children
parent->unlock();
+ for (map<pg_t,PG*>::iterator q = children.begin(); q != children.end(); q++) {
+ PG *pg = q->second;
+ // fix up pg metadata
+ pg->info.last_complete = pg->info.last_update;
+ t.collection_setattr(pg->info.pgid, "info", (char*)&pg->info, sizeof(pg->info));
+ pg->write_log(t);
+
+ if (waiting_for_pg.count(pg->info.pgid)) {
+ take_waiters(waiting_for_pg[pg->info.pgid]);
+ waiting_for_pg.erase(pg->info.pgid);
+ }
+ pg->peer(t, query_map, &info_map);
+
+ pg->unlock();
+ created++;
+ }
+ store->apply_transaction(t);
}
- dout(10) << "ready_to_create_pg " << pgid << " - ready!" << dendl;
- return true;
+ do_queries(query_map);
+ do_infos(info_map);
+ if (created)
+ update_heartbeat_peers();
+
}
+void OSD::split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction &t)
+{
+ dout(10) << "split_pg " << *parent << dendl;
+ pg_t parentid = parent->info.pgid;
+
+ list<pobject_t> olist;
+ store->collection_list(parent->info.pgid, olist);
+
+ while (!olist.empty()) {
+ pobject_t poid = olist.front();
+ olist.pop_front();
+
+ ceph_object_layout l = osdmap->make_object_layout(poid.oid, parentid.type(), parentid.size(),
+ parentid.pool(), parentid.preferred());
+ if (l.ol_pgid.pg64 != parentid.u.pg64) {
+ pg_t pgid(l.ol_pgid);
+ dout(20) << " moving " << poid << " from " << parentid << " -> " << pgid << dendl;
+ PG *child = children[pgid];
+ assert(child);
+ eversion_t v;
+ store->getattr(poid, "version", &v, sizeof(v));
+ if (v > child->info.last_update) {
+ child->info.last_update = v;
+ dout(25) << " tagging pg with v " << v << " > " << child->info.last_update << dendl;
+ } else {
+ dout(25) << " not tagging pg with v " << v << " <= " << child->info.last_update << dendl;
+ }
+ t.collection_add(pgid, poid);
+ t.collection_remove(parentid, poid);
+ } else {
+ dout(20) << " leaving " << poid << " in " << parentid << dendl;
+ }
+ }
+}
+
+
/*
* holding osd_lock
*/
if (!require_same_or_newer_map(m, m->epoch)) return;
map< int, map<pg_t,PG::Query> > query_map;
+ map<int, MOSDPGInfo*> info_map;
ObjectStore::Transaction t;
int created = 0;
if (parent != pg_t())
on = parent;
- dout(20) << "mkpg " << pgid << " from parent " << parent << " logically created " << created << dendl;;
+ if (parent != pg_t()) {
+ dout(20) << "mkpg " << pgid << " e" << created << " from parent " << parent << dendl;
+ } else {
+ dout(20) << "mkpg " << pgid << " e" << created << dendl;
+ }
// is it still ours?
vector<int> acting;
// register.
creating_pgs[pgid].created = created;
creating_pgs[pgid].parent = parent;
+ creating_pgs[pgid].acting.swap(acting);
calc_priors_during(pgid, created, history.same_primary_since,
creating_pgs[pgid].prior);
- if (ready_to_create_pg(pgid)) {
- dout(10) << "mkpg " << pgid << " creating now" << dendl;
- PG *pg = _create_lock_new_pg(pgid, acting, t);
- pg->unlock();
+ // poll priors
+ set<int>& pset = creating_pgs[pgid].prior;
+ dout(10) << "mkpg " << pgid << " e" << created
+ << " : querying priors " << pset << dendl;
+ for (set<int>::iterator p = pset.begin(); p != pset.end(); p++)
+ if (osdmap->is_up(*p))
+ query_map[*p][pgid].type = PG::Query::INFO;
+
+ PG *pg = try_create_pg(pgid, t);
+ if (pg) {
created++;
- } else {
- set<int>& pset = creating_pgs[pgid].prior;
- dout(10) << "mkpg " << pgid << " e " << created
- << " : waiting for parent and/or querying priors " << pset << dendl;
- for (set<int>::iterator p = pset.begin(); p != pset.end(); p++)
- if (osdmap->is_up(*p))
- query_map[*p][pgid].type = PG::Query::INFO;
+ if (waiting_for_pg.count(pgid)) {
+ take_waiters(waiting_for_pg[pgid]);
+ waiting_for_pg.erase(pgid);
+ }
+ pg->peer(t, query_map, &info_map);
+ pg->unlock();
}
}
store->apply_transaction(t);
+
do_queries(query_map);
- delete m;
+ do_infos(info_map);
+ kick_pg_split_queue();
if (created)
update_heartbeat_peers();
+ delete m;
}
}
-void OSD::do_activators(map<int,MOSDPGActivateSet*>& activator_map)
+void OSD::do_infos(map<int,MOSDPGInfo*>& info_map)
{
- for (map<int,MOSDPGActivateSet*>::iterator p = activator_map.begin();
- p != activator_map.end();
+ for (map<int,MOSDPGInfo*>::iterator p = info_map.begin();
+ p != info_map.end();
++p)
messenger->send_message(p->second, osdmap->get_inst(p->first));
- activator_map.clear();
+ info_map.clear();
}
// look for unknown PGs i'm primary for
map< int, map<pg_t,PG::Query> > query_map;
- map<int, MOSDPGActivateSet*> activator_map;
+ map<int, MOSDPGInfo*> info_map;
int created = 0;
for (list<PG::Info>::iterator it = m->get_pg_list().begin();
// is there a creation pending on this pg?
if (creating_pgs.count(pgid)) {
creating_pgs[pgid].prior.erase(from);
- if (!ready_to_create_pg(pgid))
- continue;
- dout(10) << "handle_pg_notify pg " << pgid
- << " finished creation probe and DNE, creating"
- << dendl;
- pg = _create_lock_new_pg(pgid, acting, t);
- // fall through
+ pg = try_create_pg(pgid, t);
+ if (!pg)
+ continue;
} else {
dout(10) << "handle_pg_notify pg " << pgid
<< " DNE on source, but creation probe, ignoring" << dendl;
} else {
if (it->last_epoch_started > pg->last_epoch_started_any)
pg->adjust_prior();
- pg->peer(t, query_map, &activator_map);
+ pg->peer(t, query_map, &info_map);
}
pg->unlock();
assert(tr == 0);
do_queries(query_map);
- do_activators(activator_map);
+ do_infos(info_map);
+ kick_pg_split_queue();
+
if (created)
update_heartbeat_peers();
PG::Info &info,
PG::Log &log,
PG::Missing &missing,
- map<int, MOSDPGActivateSet*>* activator_map)
+ map<int, MOSDPGInfo*>* info_map,
+ int& created)
{
- if (pg_map.count(info.pgid) == 0) {
- dout(10) << "_process_pg_info " << info << " don't have pg" << dendl;
- assert(epoch < osdmap->get_epoch());
- return;
- }
+ ObjectStore::Transaction t;
- PG *pg = _lookup_lock_pg(info.pgid);
- assert(pg);
+ PG *pg = 0;
+ if (!_have_pg(info.pgid)) {
+ vector<int> acting;
+ int nrep = osdmap->pg_to_acting_osds(info.pgid, acting);
+ int role = osdmap->calc_pg_role(whoami, acting, nrep);
- dout(10) << *pg << " got " << info << " " << log << " " << missing << dendl;
+ project_pg_history(info.pgid, info.history, epoch, acting);
+ if (epoch < info.history.same_since) {
+ dout(10) << *pg << " got old info " << info << " on non-existent pg, ignoring" << dendl;
+ return;
+ }
- if (epoch < pg->info.history.same_since) {
- dout(10) << *pg << " got old info " << info << ", ignoring" << dendl;
- pg->unlock();
- return;
+ // create pg!
+ assert(role != 0);
+ pg = _create_lock_pg(info.pgid, t);
+ dout(10) << " got info on new pg, creating" << dendl;
+ pg->acting.swap(acting);
+ pg->set_role(role);
+ pg->info.history = info.history;
+ pg->write_log(t);
+ store->apply_transaction(t);
+ created++;
+ } else {
+ pg = _lookup_lock_pg(info.pgid);
+ if (epoch < pg->info.history.same_since) {
+ dout(10) << *pg << " got old info " << info << ", ignoring" << dendl;
+ pg->unlock();
+ return;
+ }
}
+ assert(pg);
+
+ dout(10) << *pg << " got " << info << " " << log << " " << missing << dendl;
//m->log.print(cout);
-
- ObjectStore::Transaction t;
if (pg->is_primary()) {
// i am PRIMARY
// peer
map< int, map<pg_t,PG::Query> > query_map;
- pg->peer(t, query_map, activator_map);
+ pg->peer(t, query_map, info_map);
do_queries(query_map);
} else {
assert(pg->missing.num_lost() == 0);
// ok activate!
- pg->activate(t, activator_map);
+ pg->activate(t, info_map);
}
unsigned tr = store->apply_transaction(t);
dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl;
int from = m->get_source().num();
+ int created = 0;
if (!require_same_or_newer_map(m, m->get_epoch())) return;
_process_pg_info(m->get_epoch(), from,
- m->info, m->log, m->missing, 0);
+ m->info, m->log, m->missing, 0,
+ created);
+ if (created)
+ update_heartbeat_peers();
delete m;
}
-void OSD::handle_pg_activate_set(MOSDPGActivateSet *m)
+void OSD::handle_pg_info(MOSDPGInfo *m)
{
- dout(7) << "handle_pg_activate_set " << *m << " from " << m->get_source() << dendl;
+ dout(7) << "handle_pg_info " << *m << " from " << m->get_source() << dendl;
int from = m->get_source().num();
if (!require_same_or_newer_map(m, m->get_epoch())) return;
PG::Log empty_log;
PG::Missing empty_missing;
- map<int,MOSDPGActivateSet*> activator_map;
+ map<int,MOSDPGInfo*> info_map;
+ int created = 0;
for (list<PG::Info>::iterator p = m->pg_info.begin();
p != m->pg_info.end();
++p)
- _process_pg_info(m->get_epoch(), from, *p, empty_log, empty_missing, &activator_map);
+ _process_pg_info(m->get_epoch(), from, *p, empty_log, empty_missing, &info_map, created);
- do_activators(activator_map);
+ do_infos(info_map);
+ if (created)
+ update_heartbeat_peers();
delete m;
}