assert(osd_lock.is_locked());
ObjectStore::Transaction t;
+ C_Contexts *fin = new C_Contexts;
logger->inc(l_osd_map);
if (osdmap->is_up(whoami) &&
osdmap->get_addr(whoami) == messenger->get_myaddr()) {
// yay!
- activate_map(t);
+ activate_map(t, fin->contexts);
// process waiters
take_waiters(waiting_for_osdmap);
// superblock and commit
write_superblock(t);
- int r = store->apply_transaction(t);
+ int r = store->apply_transaction(t, fin);
if (r) {
char buf[80];
dout(0) << "error writing map: " << r << " " << strerror_r(-r, buf, sizeof(buf)) << dendl;
}
}
-void OSD::activate_map(ObjectStore::Transaction& t)
+void OSD::activate_map(ObjectStore::Transaction& t, list<Context*>& tfin)
{
assert(osd_lock.is_locked());
// i am (inactive) primary
if (!pg->is_peering() ||
(pg->need_up_thru && up_thru >= pg->info.history.same_acting_since))
- pg->peer(t, query_map, &info_map);
+ pg->peer(t, tfin, query_map, &info_map);
}
else if (pg->is_stray() &&
pg->get_primary() >= 0) {
// create and lock children
ObjectStore::Transaction t;
+ C_Contexts *fin = new C_Contexts;
map<pg_t,PG*> children;
for (set<pg_t>::iterator q = p->second.begin();
q != p->second.end();
wake_pg_waiters(pg->info.pgid);
- pg->peer(t, query_map, &info_map);
+ pg->peer(t, fin->contexts, query_map, &info_map);
pg->update_stats();
pg->unlock();
created++;
}
- int tr = store->apply_transaction(t);
+ int tr = store->apply_transaction(t, fin);
assert(tr == 0);
// remove from queue
map<int, MOSDPGInfo*> info_map;
ObjectStore::Transaction t;
+ C_Contexts *fin = new C_Contexts;
vector<PG*> to_peer;
for (map<pg_t,MOSDPGCreate::create_rec>::iterator p = m->mkpg.begin();
}
}
- int tr = store->apply_transaction(t);
- assert(tr == 0);
-
for (vector<PG*>::iterator p = to_peer.begin(); p != to_peer.end(); p++) {
PG *pg = *p;
pg->lock();
wake_pg_waiters(pg->info.pgid);
- pg->peer(t, query_map, &info_map);
+ pg->peer(t, fin->contexts, query_map, &info_map);
pg->update_stats();
pg->unlock();
}
+ int tr = store->apply_transaction(t, fin);
+ assert(tr == 0);
+
do_queries(query_map);
do_infos(info_map);
if (!require_same_or_newer_map(m, m->get_epoch())) return;
ObjectStore::Transaction t;
+ C_Contexts *fin = new C_Contexts;
// look for unknown PGs i'm primary for
map< int, map<pg_t,PG::Query> > query_map;
pg->state_clear(PG_STATE_CLEAN);
}
- pg->peer(t, query_map, &info_map);
+ pg->peer(t, fin->contexts, query_map, &info_map);
pg->update_stats();
}
pg->unlock();
}
- int tr = store->apply_transaction(t);
+ int tr = store->apply_transaction(t, fin);
assert(tr == 0);
do_queries(query_map);
int& created)
{
ObjectStore::Transaction t;
+ C_Contexts *fin = new C_Contexts;
PG *pg = 0;
if (!_have_pg(info.pgid)) {
// peer
map< int, map<pg_t,PG::Query> > query_map;
- pg->peer(t, query_map, info_map);
+ pg->peer(t, fin->contexts, query_map, info_map);
pg->update_stats();
do_queries(query_map);
} else {
// i am REPLICA
if (!pg->is_active()) {
pg->merge_log(t, info, log, missing, from);
- pg->activate(t, info_map);
+ pg->activate(t, fin->contexts, info_map);
} else {
// just update our stats
dout(10) << *pg << " writing updated stats" << dendl;
}
}
- int tr = store->apply_transaction(t);
+ int tr = store->apply_transaction(t, fin);
assert(tr == 0);
pg->unlock();
map< int, map<pg_t,PG::Query> > query_map; // peer -> PG -> get_summary_since
ObjectStore::Transaction t;
- pg->peer(t, query_map, NULL);
+ C_Contexts *fin = new C_Contexts;
+ pg->peer(t, fin->contexts, query_map, NULL);
do_queries(query_map);
if (pg->dirty_info)
pg->write_info(t);
if (pg->dirty_log)
pg->write_log(t);
- int tr = store->apply_transaction(t);
+ int tr = store->apply_transaction(t, fin);
assert(tr == 0);
}
pg->get_role() == 0 &&
pg->replay_until == activate_at) {
ObjectStore::Transaction t;
- pg->activate(t);
- int tr = store->apply_transaction(t);
+ C_Contexts *fin = new C_Contexts;
+ pg->activate(t, fin->contexts);
+ int tr = store->apply_transaction(t, fin);
assert(tr == 0);
}
pg->unlock();
}
-void PG::peer(ObjectStore::Transaction& t,
+void PG::peer(ObjectStore::Transaction& t, list<Context*>& tfin,
map< int, map<pg_t,Query> >& query_map,
map<int, MOSDPGInfo*> *activator_map)
{
}
else if (!is_active()) {
// -- ok, activate!
- activate(t, activator_map);
+ activate(t, tfin, activator_map);
}
else if (is_all_uptodate())
- finish_recovery();
+ finish_recovery(t, tfin);
}
-void PG::activate(ObjectStore::Transaction& t,
+void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
map<int, MOSDPGInfo*> *activator_map)
{
assert(!is_active());
// all clean?
if (is_all_uptodate())
- finish_recovery();
+ finish_recovery(t, tfin);
else {
dout(10) << "activate not all replicas are uptodate, queueing recovery" << dendl;
osd->queue_for_recovery(this);
}
};
-void PG::finish_recovery()
+void PG::finish_recovery(ObjectStore::Transaction& t, list<Context*>& tfin)
{
dout(10) << "finish_recovery" << dendl;
state_set(PG_STATE_CLEAN);
clear_recovery_state();
+ write_info(t);
+
/*
* sync all this before purging strays. but don't block!
*/
finish_sync_event = new C_PG_FinishRecovery(this);
-
- ObjectStore::Transaction t;
- write_info(t);
- int tr = osd->store->apply_transaction(t, finish_sync_event);
- assert(tr == 0);
+ tfin.push_back(finish_sync_event);
}
void PG::_finish_recovery(Context *c)
bool choose_acting(int newest_update_osd);
bool recover_master_log(map< int, map<pg_t,Query> >& query_map);
- void peer(ObjectStore::Transaction& t,
+ void peer(ObjectStore::Transaction& t, list<Context*>& tfin,
map< int, map<pg_t,Query> >& query_map,
map<int, MOSDPGInfo*> *activator_map=0);
- void activate(ObjectStore::Transaction& t,
+ void activate(ObjectStore::Transaction& t, list<Context*>& tfin,
map<int, MOSDPGInfo*> *activator_map=0);
virtual void clean_up_local(ObjectStore::Transaction& t) = 0;
Context *finish_sync_event;
- void finish_recovery();
+ void finish_recovery(ObjectStore::Transaction& t, list<Context*>& tfin);
void _finish_recovery(Context *c);
void cancel_recovery();
void clear_recovery_state();