if (!diri || g_conf.client_use_random_mds) {
// no root info, pick a random MDS
mds = mdsmap->get_random_in_mds();
- dout(0) << "random mds" << mds << dendl;
+ dout(10) << "random mds" << mds << dendl;
if (mds < 0) mds = 0;
if (0) {
if (osd == whoami) continue;
messenger->mark_down(osdmap->get_addr(i->first));
peer_map_epoch.erase(entity_name_t::OSD(i->first));
-
- // kick any replica ops
- for (hash_map<pg_t,PG*>::iterator it = pg_map.begin();
- it != pg_map.end();
- it++) {
- PG *pg = it->second;
-
- pg->lock();
- pg->note_failed_osd(osd);
- pg->unlock();
- }
}
for (map<int32_t,entity_addr_t>::iterator i = inc.new_up.begin();
i != inc.new_up.end();
if (oldrole == 0 || pg->get_role() == 0)
pg->clear_primary_state();
+ // pg->on_*
+ for (int i=0; i<oldacting.size(); i++)
+ if (osdmap->is_down(oldacting[i]))
+ pg->on_osd_failure(oldacting[i]);
pg->on_change();
- if (oldacker != pg->get_acker() && oldacker == whoami) {
+ if (oldacker != pg->get_acker() && oldacker == whoami)
pg->on_acker_change();
- }
if (role != oldrole) {
// old primary?
// REGULAR OP (non-replication)
// note original source
- op->set_client_inst( op->get_source_inst() );
op->clear_payload(); // and hose encoded payload (in case we forward)
// have pg?
*/
void OSD::enqueue_op(PG *pg, Message *op)
{
+ dout(15) << *pg << " enqueue_op " << op << " " << *op << dendl;
// add to pg's op_queue
pg->op_queue.push_back(op);
pending_ops++;
virtual bool is_missing_object(object_t oid) = 0;
virtual void wait_for_missing_object(object_t oid, MOSDOp *op) = 0;
- virtual void note_failed_osd(int osd) = 0;
-
+ virtual void on_osd_failure(int osd) = 0;
virtual void on_acker_change() = 0;
virtual void on_role_change() = 0;
virtual void on_change() = 0;
MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false);
dout(10) << "put_repop sending ack on " << *repop << " " << reply << dendl;
osd->messenger->send_message(reply, repop->op->get_client_inst());
+ } else {
+ dout(10) << "put_repop NOT sending ack on " << *repop << dendl;
}
repop->sent_ack = true;
-void ReplicatedPG::note_failed_osd(int o)
+void ReplicatedPG::on_osd_failure(int o)
{
- dout(10) << "note_failed_osd " << o << dendl;
+ dout(10) << "on_osd_failure " << o << dendl;
// do async; repop_ack() may modify pg->repop_gather
list<RepGather*> ls;
for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
p != rep_gather.end();
p++) {
- //dout(-1) << "checking repop tid " << p->first << dendl;
+ dout(-1) << "checking repop tid " << p->first << dendl;
if (p->second->waitfor_ack.count(o) ||
p->second->waitfor_commit.count(o))
ls.push_back(p->second);
void ReplicatedPG::on_change()
{
- // apply repops
- for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
- p != rep_gather.end();
- p++) {
- if (!p->second->applied)
- apply_repop(p->second);
- delete p->second->op;
- delete p->second;
+ dout(10) << "on_change" << dendl;
+
+ if (g_conf.osd_rep == OSD_REP_PRIMARY ||
+ g_conf.osd_rep == OSD_REP_SPLAY) {
+ // apply all local repops
+ // (pg is inactive; we will repeer)
+ for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
+ p != rep_gather.end();
+ p++)
+ if (!p->second->applied)
+ apply_repop(p->second);
+ }
+ else if (g_conf.osd_rep == OSD_REP_CHAIN) {
+ // apply all local repops
+ // (pg is inactive; we will repeer)
+ // note: because we hose rep_gather, clients must resubmit ops on ANY pg membership change.
+ for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
+ p != rep_gather.end();
+ p++) {
+ if (!p->second->applied)
+ apply_repop(p->second);
+ delete p->second->op;
+ delete p->second;
+ }
+ rep_gather.clear();
+
+ // and discard repop waiters (chain/splay artifact)
+ for (hash_map<tid_t, list<Message*> >::iterator p = waiting_for_repop.begin();
+ p != waiting_for_repop.end();
+ p++)
+ for (list<Message*>::iterator pm = p->second.begin();
+ pm != p->second.end();
+ pm++)
+ delete *pm;
+ waiting_for_repop.clear();
}
- rep_gather.clear();
-
- // and discard repop waiters (chain/splay artifact)
- for (hash_map<tid_t, list<Message*> >::iterator p = waiting_for_repop.begin();
- p != waiting_for_repop.end();
- p++)
- for (list<Message*>::iterator pm = p->second.begin();
- pm != p->second.end();
- pm++)
- delete *pm;
- waiting_for_repop.clear();
}
bool is_missing_object(object_t oid);
void wait_for_missing_object(object_t oid, MOSDOp *op);
- void note_failed_osd(int o);
+ void on_osd_failure(int o);
void on_acker_change();
void on_role_change();
void on_change();
i++) {
if (!i->second.active_tids.empty() &&
i->second.last < cutoff) {
- dout(10) << "tick pg " << i->first << " is laggy" << dendl;
+ dout(10) << "tick pg " << i->first << " is laggy: " << i->second.active_tids << dendl;
maybe_request_map();
- break;
+ //break;
}
}
break;
case OSD_OP_STAT:
- handle_osd_stat_reply(m);
- break;
+ handle_osd_stat_reply(m);
+ break;
case OSD_OP_WRNOOP:
case OSD_OP_WRITE:
dout(7) << "handle_osd_modify_reply " << tid
<< (m->get_commit() ? " commit":" ack")
- << " v " << m->get_version()
+ << " v " << m->get_version() << " in " << m->get_pg()
<< dendl;
OSDModify *wr = op_modify[ tid ];
// remove from tid/osd maps
assert(pg.active_tids.count(tid));
pg.active_tids.erase(tid);
+ dout(15) << "handle_osd_modify_reply pg " << m->get_pg() << " still has " << pg.active_tids << dendl;
if (pg.active_tids.empty()) close_pg( m->get_pg() );
// commit.