- soft consistency on (kernel) lookup?
- accurate reconstruction of (syscall) path?
- software raid layer for EBOFS?
  - actually, we just need software raid _awareness_ in the allocator, so
    that we can write only full stripes, without fear of clobbering things
    on failure. then use MD or similar layer provided by kernel.
    (rough sketch of the idea below.)
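
  a minimal, hypothetical sketch of what stripe awareness in the allocator
  could look like (none of these names exist in the tree; the stripe size
  would come from the MD layout underneath):

    // round every extent up to whole stripes, so a write never shares a
    // stripe with live data and a failed write can't clobber anything.
    #include <stdint.h>

    static inline uint64_t stripe_align(uint64_t len, uint64_t stripe_bytes)
    {
      return ((len + stripe_bytes - 1) / stripe_bytes) * stripe_bytes;
    }

    // e.g. a 4-disk raid5 with 64k chunks has stripe_bytes = 3*64k = 192k,
    // so a 100k allocation gets padded out to 192k.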
sage doc
} else {
int fd = client->open(f.c_str(), O_WRONLY|O_CREAT);
assert(fd > 0);
- client->write(fd, " ", 1, size-1);
+ client->write(fd, "", 0, size);
client->close(fd);
client->chmod(f.c_str(), mode & 0777);
#include "include/Context.h"
#undef dout
-#define dout(x) if (x <= g_conf.debug) cout << g_clock.now() << " TIMER "
-#define derr(x) if (x <= g_conf.debug) cerr << g_clock.now() << " TIMER "
+#define dout(x) if (x <= g_conf.debug_timer) cout << g_clock.now() << " TIMER "
+#define derr(x) if (x <= g_conf.debug_timer) cerr << g_clock.now() << " TIMER "
#define DBL 10
debug_mds_log: 1,
debug_mds_migrator: 1,
debug_buffer: 0,
+ debug_timer: 0,
debug_filer: 0,
debug_objecter: 0,
debug_journaler: 0,
g_conf.debug_buffer = atoi(args[++i]);
else
g_debug_after_conf.debug_buffer = atoi(args[++i]);
+ else if (strcmp(args[i], "--debug_timer") == 0)
+ if (!g_conf.debug_after)
+ g_conf.debug_timer = atoi(args[++i]);
+ else
+ g_debug_after_conf.debug_timer = atoi(args[++i]);
else if (strcmp(args[i], "--debug_filer") == 0)
if (!g_conf.debug_after)
g_conf.debug_filer = atoi(args[++i]);
int debug_mds_log;
int debug_mds_migrator;
int debug_buffer;
+ int debug_timer;
int debug_filer;
int debug_objecter;
int debug_journaler;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MOSDPGACTIVATESET_H
+#define __MOSDPGACTIVATESET_H
+
+#include "msg/Message.h"
+
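+/*
+ * Sent by a pg's primary to replicas that are already up to date: a batch
+ * of PG::Info entries to activate, so that each such pg doesn't need its
+ * own (empty-log) MOSDPGLog.  See PG::activate() and OSD::do_activators().
+ */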
+class MOSDPGActivateSet : public Message {
+ epoch_t epoch;
+
+public:
+ list<PG::Info> pg_info;
+
+ epoch_t get_epoch() { return epoch; }
+
+ MOSDPGActivateSet() {}
+ MOSDPGActivateSet(version_t mv) :
+ Message(MSG_OSD_PG_ACTIVATE_SET),
+ epoch(mv) { }
+
+ char *get_type_name() { return "pg_activate_set"; }
+ void print(ostream& out) {
+ out << "pg_activate_set(" << pg_info.size() << " pgs e" << epoch << ")";
+ }
+
+ void encode_payload() {
+ ::_encode(epoch, payload);
+ ::_encode(pg_info, payload);
+ }
+ void decode_payload() {
+ int off = 0;
+ ::_decode(epoch, payload, off);
+ ::_decode(pg_info, payload, off);
+ }
+};
+
+#endif
class MOSDPGLog : public Message {
epoch_t epoch;
- pg_t pgid;
public:
PG::Info info;
PG::Missing missing;
epoch_t get_epoch() { return epoch; }
- pg_t get_pgid() { return pgid; }
+ pg_t get_pgid() { return info.pgid; }
MOSDPGLog() {}
- MOSDPGLog(version_t mv, pg_t pgid) :
- Message(MSG_OSD_PG_LOG) {
- this->epoch = mv;
- this->pgid = pgid;
- }
+ MOSDPGLog(version_t mv, PG::Info& i) :
+ Message(MSG_OSD_PG_LOG),
+ epoch(mv), info(i) { }
char *get_type_name() { return "PGlog"; }
void print(ostream& out) {
- out << "pg_log(" << pgid << " e" << epoch << ")";
+ out << "pg_log(" << info.pgid << " e" << epoch << ")";
}
void encode_payload() {
payload.append((char*)&epoch, sizeof(epoch));
- payload.append((char*)&pgid, sizeof(pgid));
payload.append((char*)&info, sizeof(info));
log._encode(payload);
missing._encode(payload);
int off = 0;
payload.copy(off, sizeof(epoch), (char*)&epoch);
off += sizeof(epoch);
- payload.copy(off, sizeof(pgid), (char*)&pgid);
- off += sizeof(pgid);
payload.copy(off, sizeof(info), (char*)&info);
off += sizeof(info);
log._decode(payload, off);
client = mount->get_source().num();
// choose a client id
- if (client < 0 ||
- (client_map.client_addr.count(client) &&
- client_map.client_addr[client] != addr)) {
+ if (client < 0) {
client = pending_inc.next_client;
dout(10) << "mount: assigned client" << client << " to " << addr << endl;
} else {
dout(10) << "mount: client" << client << " requested by " << addr << endl;
+ if (client_map.client_addr.count(client)) {
+ assert(client_map.client_addr[client] != addr);
+ dout(0) << "mount: WARNING: client" << client << " requested by " << addr
+ << ", which used to be " << client_map.client_addr[client] << endl;
+ }
}
pending_inc.add_mount(client, addr);
{
if (down) {
dout(1) << "fake_osd_failure DOWN osd" << osd << endl;
- pending_inc.new_down[osd] = osdmap.osd_inst[osd];
+ pending_inc.new_down[osd].first = osdmap.osd_inst[osd];
+ pending_inc.new_down[osd].second = false;
} else {
dout(1) << "fake_osd_failure OUT osd" << osd << endl;
pending_inc.new_out.push_back(osd);
for (int dom=0; dom<ndom; dom++) {
for (int j=0; j<nper; j++) {
newmap.osds.insert(i);
- newmap.down_osds.insert(i); // initially DOWN
+ newmap.down_osds[i] = true; // initially DOWN
domain[dom]->add_item(i, 1.0);
//cerr << "osd" << i << " in domain " << dom << endl;
i++;
int root = newmap.crush.add_bucket(b);
for (int i=0; i<g_conf.num_osd; i++) {
newmap.osds.insert(i);
- newmap.down_osds.insert(i);
+ newmap.down_osds[i] = true;
b->add_item(i, 1.0);
}
// add mds osds, but don't put them in the crush mapping func
for (int i=0; i<g_conf.num_mds; i++) {
newmap.osds.insert(i+10000);
- newmap.down_osds.insert(i+10000);
+ newmap.down_osds[i+10000] = true;
}
}
pending_inc.mon_epoch = mon->mon_epoch;
// tell me about it
- for (map<int,entity_inst_t>::iterator i = pending_inc.new_down.begin();
+ for (map<int,pair<entity_inst_t,bool> >::iterator i = pending_inc.new_down.begin();
i != pending_inc.new_down.end();
i++) {
- dout(0) << " osd" << i->first << " DOWN " << i->second << endl;
- derr(0) << " osd" << i->first << " DOWN " << i->second << endl;
- mon->messenger->mark_down(i->second.addr);
+ dout(0) << " osd" << i->first << " DOWN " << i->second.first << " clean=" << i->second.second << endl;
+ derr(0) << " osd" << i->first << " DOWN " << i->second.first << " clean=" << i->second.second << endl;
+ mon->messenger->mark_down(i->second.first.addr);
}
for (map<int,entity_inst_t>::iterator i = pending_inc.new_up.begin();
i != pending_inc.new_up.end();
assert(osdmap.is_up(badboy));
assert(osdmap.osd_inst[badboy] == m->get_failed());
- pending_inc.new_down[badboy] = m->get_failed();
+ pending_inc.new_down[badboy].first = m->get_failed();
+ pending_inc.new_down[badboy].second = false;
if (osdmap.is_in(badboy))
down_pending_out[badboy] = g_clock.now();
assert(osdmap.get_inst(from) != m->inst); // preprocess should have caught it
// mark previous guy down
- pending_inc.new_down[from] = osdmap.osd_inst[from];
+ pending_inc.new_down[from].first = osdmap.osd_inst[from];
+ pending_inc.new_down[from].second = false;
paxos->wait_for_commit(new C_RetryMessage(this, m));
} else {
it != osdmap.get_osds().end();
it++) {
if (osdmap.is_down(*it)) continue;
- pending_inc.new_down[*it] = osdmap.get_inst(*it);
+ pending_inc.new_down[*it].first = osdmap.get_inst(*it);
+ pending_inc.new_down[*it].second = true; // FIXME: am i sure it's clean? we need a proper osd shutdown sequence!
}
propose_pending();
#include "messages/MOSDOpReply.h"
#include "messages/MOSDMap.h"
#include "messages/MOSDGetMap.h"
+
#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGQuery.h"
#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGRemove.h"
+#include "messages/MOSDPGActivateSet.h"
#include "messages/MClientMount.h"
#include "messages/MClientUnmount.h"
case MSG_OSD_PG_REMOVE:
m = new MOSDPGRemove();
break;
+ case MSG_OSD_PG_ACTIVATE_SET:
+ m = new MOSDPGActivateSet();
+ break;
// clients
case MSG_CLIENT_MOUNT:
#define MSG_OSD_PG_SUMMARY 52
#define MSG_OSD_PG_LOG 53
#define MSG_OSD_PG_REMOVE 54
+#define MSG_OSD_PG_ACTIVATE_SET 55
// -- client --
// to monitor
#include "messages/MOSDPGQuery.h"
#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGRemove.h"
+#include "messages/MOSDPGActivateSet.h"
#include "common/Logger.h"
#include "common/LogType.h"
// remove from map
pg_map.erase(pgid);
- pg->put();
// unlock, and probably delete
pg->put_unlock(); // will delete, if last reference
* check epochs starting from start to verify the pg acting set hasn't changed
* up until now
*/
-void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from)
+void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from,
+ vector<int>& last)
{
dout(15) << "project_pg_history " << pgid
<< " from " << from << " to " << osdmap->get_epoch()
<< ", start " << h
<< dendl;
- vector<int> last;
- osdmap->pg_to_acting_osds(pgid, last);
-
for (epoch_t e = osdmap->get_epoch()-1;
e >= from;
e--) {
// acting set change?
if (acting != last &&
- e <= h.same_since) {
+ e > h.same_since) {
dout(15) << "project_pg_history " << pgid << " changed in " << e+1
<< " from " << acting << " -> " << last << dendl;
h.same_since = e+1;
// primary change?
if (!(!acting.empty() && !last.empty() && acting[0] == last[0]) &&
- e <= h.same_primary_since) {
+ e > h.same_primary_since) {
dout(15) << "project_pg_history " << pgid << " primary changed in " << e+1 << dendl;
h.same_primary_since = e+1;
// acker change?
if (g_conf.osd_rep != OSD_REP_PRIMARY) {
if (!(!acting.empty() && !last.empty() && acting[acting.size()-1] == last[last.size()-1]) &&
- e <= h.same_acker_since) {
+ e > h.same_acker_since) {
dout(15) << "project_pg_history " << pgid << " acker changed in " << e+1 << dendl;
h.same_acker_since = e+1;
}
case MSG_OSD_PG_REMOVE:
handle_pg_remove((MOSDPGRemove*)m);
break;
+ case MSG_OSD_PG_ACTIVATE_SET:
+ handle_pg_activate_set((MOSDPGActivateSet*)m);
+ break;
case MSG_OSD_OP:
handle_op((MOSDOp*)m);
t.write( get_osdmap_object_name(cur+1), 0, bl.length(), bl);
// notify messenger
- for (map<int,entity_inst_t>::iterator i = inc.new_down.begin();
+ for (map<int,pair<entity_inst_t,bool> >::iterator i = inc.new_down.begin();
i != inc.new_down.end();
i++) {
int osd = i->first;
if (osd == whoami) continue;
- messenger->mark_down(i->second.addr);
+ messenger->mark_down(i->second.first.addr);
peer_map_epoch.erase(MSG_ADDR_OSD(osd));
// kick any replica ops
pg->info.history.same_since =
pg->info.history.same_primary_since =
pg->info.history.same_acker_since = osdmap->get_epoch();
+ pg->write_log(t);
pg->activate(t);
dout(7) << "created " << *pg << dendl;
pg->info.history.same_primary_since =
pg->info.history.same_acker_since =
pg->info.history.same_since = osdmap->get_epoch();
+ pg->write_log(t);
pg->activate(t);
dout(7) << "created " << *pg << dendl;
pg->info.history.same_since =
pg->info.history.same_primary_since =
pg->info.history.same_acker_since = osdmap->get_epoch();
+ pg->write_log(t);
pg->activate(t);
dout(7) << "created " << *pg << dendl;
pg->info.history.same_primary_since =
pg->info.history.same_acker_since =
pg->info.history.same_since = osdmap->get_epoch();
+ pg->write_log(t);
pg->activate(t);
dout(7) << "created " << *pg << dendl;
pg->state_set(PG::STATE_STRAY);
if (nrep == 0) {
- pg->state_set(PG::STATE_CRASHED);
- dout(1) << *pg << " is crashed" << dendl;
+ // did they all shut down cleanly?
+ bool clean = true;
+ vector<int> inset;
+ osdmap->pg_to_osds(pg->info.pgid, inset);
+ for (unsigned i=0; i<inset.size(); i++)
+ if (!osdmap->is_down_clean(inset[i])) clean = false;
+ if (clean) {
+ dout(1) << *pg << " is cleanly inactive" << dendl;
+ } else {
+ pg->state_set(PG::STATE_CRASHED);
+ dout(1) << *pg << " is crashed" << dendl;
+ }
}
}
map< int, list<PG::Info> > notify_list; // primary -> list
map< int, map<pg_t,PG::Query> > query_map; // peer -> PG -> get_summary_since
+ map<int,MOSDPGActivateSet*> activator_map; // peer -> message
// scan pg's
for (hash_map<pg_t,PG*>::iterator it = pg_map.begin();
else if (pg->get_role() == 0 && !pg->is_active()) {
// i am (inactive) primary
pg->build_prior();
- pg->peer(t, query_map);
+ pg->peer(t, query_map, &activator_map);
}
else if (pg->is_stray() &&
pg->get_primary() >= 0) {
// i am residual|replica
notify_list[pg->get_primary()].push_back(pg->info);
}
-
}
if (osdmap->is_mkfs()) // hack: skip the queries/summaries if it's a mkfs
return;
- // notify? (residual|replica)
- do_notifies(notify_list);
-
- // do queries.
+ do_notifies(notify_list); // notify? (residual|replica)
do_queries(query_map);
+ do_activators(activator_map);
logger->set("numpg", pg_map.size());
}
}
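+/** do_activators
+ * send the per-peer MOSDPGActivateSet messages queued up during peering/activation.
+ */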
+void OSD::do_activators(map<int,MOSDPGActivateSet*>& activator_map)
+{
+ for (map<int,MOSDPGActivateSet*>::iterator p = activator_map.begin();
+ p != activator_map.end();
+ ++p)
+ messenger->send_message(p->second, osdmap->get_inst(p->first));
+ activator_map.clear();
+}
+
+
+
/** PGNotify
// look for unknown PGs i'm primary for
map< int, map<pg_t,PG::Query> > query_map;
+ map<int, MOSDPGActivateSet*> activator_map;
for (list<PG::Info>::iterator it = m->get_pg_list().begin();
it != m->get_pg_list().end();
if (!_have_pg(pgid)) {
// same primary?
+ vector<int> acting;
+ int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+ int role = osdmap->calc_pg_role(whoami, acting, nrep);
+
PG::Info::History history = it->history;
- project_pg_history(pgid, history, m->get_epoch());
+ project_pg_history(pgid, history, m->get_epoch(), acting);
if (m->get_epoch() < history.same_primary_since) {
dout(10) << "handle_pg_notify pg " << pgid << " dne, and primary changed in "
<< history.same_primary_since << " (msg from " << m->get_epoch() << ")" << dendl;
continue;
}
+
+ assert(role == 0); // otherwise, probably bug in project_pg_history.
// ok, create PG!
pg = _create_lock_pg(pgid, t);
- osdmap->pg_to_acting_osds(pgid, pg->acting);
- pg->set_role(0);
+ pg->acting.swap(acting);
+ pg->set_role(role);
pg->info.history = history;
-
pg->last_epoch_started_any = it->last_epoch_started;
pg->build_prior();
-
- t.collection_setattr(pgid, "info", (char*)&pg->info, sizeof(pg->info));
+ pg->write_log(t);
dout(10) << *pg << " is new" << dendl;
pg->adjust_prior();
// peer
- pg->peer(t, query_map);
+ pg->peer(t, query_map, &activator_map);
}
pg->unlock();
assert(tr == 0);
do_queries(query_map);
+ do_activators(activator_map);
delete m;
}
* NOTE: called with opqueue active.
*/
-void OSD::handle_pg_log(MOSDPGLog *m)
-{
- int from = m->get_source().num();
- const pg_t pgid = m->get_pgid();
- if (!require_same_or_newer_map(m, m->get_epoch())) return;
- if (pg_map.count(pgid) == 0) {
- dout(10) << "handle_pg_log don't have pg " << pgid << ", dropping" << dendl;
- assert(m->get_epoch() < osdmap->get_epoch());
- delete m;
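+/** _process_pg_info
+ * common handling of one peer's PG::Info (+ log and missing): used by
+ * handle_pg_log, and by handle_pg_activate_set with an empty log/missing.
+ */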
+void OSD::_process_pg_info(epoch_t epoch, int from,
+ PG::Info &info,
+ PG::Log &log,
+ PG::Missing &missing,
+ map<int, MOSDPGActivateSet*>* activator_map)
+{
+ if (pg_map.count(info.pgid) == 0) {
+ dout(10) << "_process_pg_info " << info << " don't have pg" << dendl;
+ assert(epoch < osdmap->get_epoch());
return;
}
- PG *pg = _lookup_lock_pg(pgid);
+ PG *pg = _lookup_lock_pg(info.pgid);
assert(pg);
- if (m->get_epoch() < pg->info.history.same_since) {
- dout(10) << "handle_pg_log " << *pg
- << " from " << m->get_source()
- << " is old, discarding"
- << dendl;
- delete m;
+ dout(10) << *pg << " got " << info << " " << log << " " << missing << dendl;
+
+ if (epoch < pg->info.history.same_since) {
+ dout(10) << *pg << " got old info " << info << ", ignoring" << dendl;
+ pg->unlock();
return;
}
- dout(7) << "handle_pg_log " << *pg
- << " got " << m->log << " " << m->missing
- << " from " << m->get_source() << dendl;
-
//log.print(cout);
ObjectStore::Transaction t;
assert(pg->peer_log_requested.count(from) ||
pg->peer_summary_requested.count(from));
- pg->proc_replica_log(m->log, m->missing, from);
+ pg->proc_replica_log(log, missing, from);
// peer
map< int, map<pg_t,PG::Query> > query_map;
- pg->peer(t, query_map);
+ pg->peer(t, query_map, activator_map);
do_queries(query_map);
} else {
// i am REPLICA
- dout(10) << *pg << " got " << m->log << " " << m->missing << dendl;
-
// merge log
- pg->merge_log(m->log, m->missing, from);
- pg->proc_missing(m->log, m->missing, from);
+ pg->merge_log(log, missing, from);
+ pg->proc_missing(log, missing, from);
assert(pg->missing.num_lost() == 0);
// ok activate!
- pg->activate(t);
+ pg->activate(t, activator_map);
}
unsigned tr = store->apply_transaction(t);
assert(tr == 0);
pg->unlock();
+}
+
+
+void OSD::handle_pg_log(MOSDPGLog *m)
+{
+ dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl;
+
+ int from = m->get_source().num();
+ if (!require_same_or_newer_map(m, m->get_epoch())) return;
+
+ _process_pg_info(m->get_epoch(), from,
+ m->info, m->log, m->missing, 0);
+
+ delete m;
+}
+
+void OSD::handle_pg_activate_set(MOSDPGActivateSet *m)
+{
+ dout(7) << "handle_pg_activate_set " << *m << " from " << m->get_source() << dendl;
+
+ int from = m->get_source().num();
+ if (!require_same_or_newer_map(m, m->get_epoch())) return;
+
+ PG::Log empty_log;
+ PG::Missing empty_missing;
+ map<int,MOSDPGActivateSet*> activator_map;
+
+ for (list<PG::Info>::iterator p = m->pg_info.begin();
+ p != m->pg_info.end();
+ ++p)
+ _process_pg_info(m->get_epoch(), from, *p, empty_log, empty_missing, &activator_map);
+
+ do_activators(activator_map);
delete m;
}
PG *pg = 0;
if (pg_map.count(pgid) == 0) {
+ // get active crush mapping
+ vector<int> acting;
+ int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+ int role = osdmap->calc_pg_role(whoami, acting, nrep);
+
// same primary?
PG::Info::History history = it->second.history;
- project_pg_history(pgid, history, m->get_epoch());
+ project_pg_history(pgid, history, m->get_epoch(), acting);
if (m->get_epoch() < history.same_since) {
dout(10) << " pg " << pgid << " dne, and pg has changed in "
continue;
}
- // get active crush mapping
- vector<int> acting;
- int nrep = osdmap->pg_to_acting_osds(pgid, acting);
- int role = osdmap->calc_pg_role(whoami, acting, nrep);
-
if (role < 0) {
dout(10) << " pg " << pgid << " dne, and i am not an active replica" << dendl;
PG::Info empty(pgid);
pg->acting.swap( acting );
pg->set_role(role);
pg->info.history = history;
-
- t.collection_setattr(pgid, "info", (char*)&pg->info, sizeof(pg->info));
+ pg->write_log(t);
store->apply_transaction(t);
dout(10) << *pg << " dne (before), but i am role " << role << dendl;
dout(10) << *pg << " sending info" << dendl;
notify_list[from].push_back(pg->info);
} else {
- MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->get_pgid());
- m->info = pg->info;
+ MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->info);
m->missing = pg->missing;
if (it->second.type == PG::Query::LOG) {
void _remove_unlock_pg(PG *pg); // remove from store and memory
void load_pgs();
- void project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from);
+ void project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from,
+ vector<int>& last);
void activate_pg(pg_t pgid, epoch_t epoch);
class C_Activate : public Context {
void do_notifies(map< int, list<PG::Info> >& notify_list);
void do_queries(map< int, map<pg_t,PG::Query> >& query_map);
+ void do_activators(map<int, MOSDPGActivateSet*>& activator_map);
void repeer(PG *pg, map< int, map<pg_t,PG::Query> >& query_map);
bool require_current_map(Message *m, epoch_t v);
void handle_pg_query(class MOSDPGQuery *m);
void handle_pg_notify(class MOSDPGNotify *m);
void handle_pg_log(class MOSDPGLog *m);
+ void handle_pg_activate_set(class MOSDPGActivateSet *m);
void handle_pg_remove(class MOSDPGRemove *m);
+ // helper for handle_pg_log and handle_pg_activate_set
+ void _process_pg_info(epoch_t epoch, int from,
+ PG::Info &info,
+ PG::Log &log,
+ PG::Missing &missing,
+ map<int, MOSDPGActivateSet*>* activator_map);
+
public:
OSD(int id, Messenger *m, MonMap *mm, char *dev = 0);
// incremental
map<int32_t,entity_inst_t> new_up;
- map<int32_t,entity_inst_t> new_down;
+ map<int32_t,pair<entity_inst_t,bool> > new_down;   // osd -> (inst, was the shutdown clean?)
list<int32_t> new_in;
list<int32_t> new_out;
map<int32_t,float> new_overload; // updated overload value
int32_t localized_pg_num_mask; // ditto
set<int32_t> osds; // all osds
- set<int32_t> down_osds; // list of down disks
+ map<int32_t, bool> down_osds; // list of down disks, -> clean shutdown (true/false)
set<int32_t> out_osds; // list of unmapped disks
map<int32_t,float> overload_osds;
map<int32_t,entity_inst_t> osd_inst;
void get_all_osds(set<int>& ls) { ls = osds; }
const set<int>& get_osds() { return osds; }
- const set<int>& get_down_osds() { return down_osds; }
+ const map<int,bool>& get_down_osds() { return down_osds; }
const set<int>& get_out_osds() { return out_osds; }
const map<int,float>& get_overload_osds() { return overload_osds; }
bool exists(int osd) { return osds.count(osd); }
bool is_down(int osd) { return down_osds.count(osd); }
+ bool is_down_clean(int osd) { return down_osds.count(osd) && down_osds[osd]; }
bool is_up(int osd) { return exists(osd) && !is_down(osd); }
bool is_out(int osd) { return out_osds.count(osd); }
bool is_in(int osd) { return exists(osd) && !is_out(osd); }
return false;
}
- void mark_down(int o) { down_osds.insert(o); }
+ void mark_down(int o, bool clean) { down_osds[o] = clean; }
void mark_up(int o) { down_osds.erase(o); }
void mark_out(int o) { out_osds.insert(o); }
void mark_in(int o) { out_osds.erase(o); }
}
// nope, incremental.
- for (map<int32_t,entity_inst_t>::iterator i = inc.new_down.begin();
+ for (map<int32_t,pair<entity_inst_t,bool> >::iterator i = inc.new_down.begin();
i != inc.new_down.end();
i++) {
assert(down_osds.count(i->first) == 0);
- down_osds.insert(i->first);
+ down_osds[i->first] = i->second.second;
assert(osd_inst.count(i->first) == 0 ||
- osd_inst[i->first] == i->second);
+ osd_inst[i->first] == i->second.first);
osd_inst.erase(i->first);
//cout << "epoch " << epoch << " down osd" << i->first << endl;
}
#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGRemove.h"
+#include "messages/MOSDPGActivateSet.h"
#undef dout
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << dbeginl << g_clock.now() << " osd" << osd->whoami << " " << (osd->osdmap ? osd->osdmap->get_epoch():0) << " " << *this << " "
}
void PG::peer(ObjectStore::Transaction& t,
- map< int, map<pg_t,Query> >& query_map)
+ map< int, map<pg_t,Query> >& query_map,
+ map<int, MOSDPGActivateSet*> *activator_map)
{
dout(10) << "peer. acting is " << acting
<< ", prior_set is " << prior_set << dendl;
// start with the last active set of replicas
set<int> last_started;
vector<int> acting;
+ bool cleanly_down = true;
omap.pg_to_acting_osds(get_pgid(), acting);
for (unsigned i=0; i<acting.size(); i++)
last_started.insert(acting[i]);
//dout(10) << " down in epoch " << e << " is " << omap.get_down_osds() << dendl;
if (omap.is_up(*i))
still_up.insert(*i);
+ else if (!omap.is_down_clean(*i))
+ cleanly_down = false;
}
last_started.swap(still_up);
}
if (last_started.empty()) {
- dout(10) << " crashed since epoch " << last_epoch_started_any << dendl;
- state_set(STATE_CRASHED);
+ if (cleanly_down) {
+ dout(10) << " cleanly stopped since epoch " << last_epoch_started_any << dendl;
+ } else {
+ dout(10) << " crashed since epoch " << last_epoch_started_any << dendl;
+ state_set(STATE_CRASHED);
+ }
} else {
dout(10) << " still active from last started: " << last_started << dendl;
}
}
else if (!is_active()) {
// -- ok, activate!
- activate(t);
+ activate(t, activator_map);
}
}
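+/*
+ * activate.  if activator_map is non-null, peers that are already up to
+ * date are batched into a shared per-peer MOSDPGActivateSet instead of
+ * each getting its own empty-log MOSDPGLog.
+ */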
-void PG::activate(ObjectStore::Transaction& t)
+void PG::activate(ObjectStore::Transaction& t,
+ map<int, MOSDPGActivateSet*> *activator_map)
{
assert(!is_active());
dout(10) << "activate - not complete, " << missing << dendl;
}
-
// if primary..
if (role == 0 &&
osd->osdmap->post_mkfs()) {
int peer = acting[i];
assert(peer_info.count(peer));
- MOSDPGLog *m = new MOSDPGLog(osd->osdmap->get_epoch(),
- info.pgid);
- m->info = info;
+ MOSDPGLog *m = 0;
if (peer_info[peer].last_update == info.last_update) {
// empty log
- }
- else if (peer_info[peer].last_update < log.bottom) {
- // summary/backlog
- assert(log.backlog);
- m->log = log;
+ if (activator_map) {
+ dout(10) << "activate - peer osd" << peer << " is up to date, queueing in pending_activators" << dendl;
+ if (activator_map->count(peer) == 0)
+ (*activator_map)[peer] = new MOSDPGActivateSet(osd->osdmap->get_epoch());
+ (*activator_map)[peer]->pg_info.push_back(info);
+ } else {
+ dout(10) << "activate - peer osd" << peer << " is up to date, but sending pg_log anyway" << dendl;
+ m = new MOSDPGLog(osd->osdmap->get_epoch(), info);
+ }
}
else {
- // incremental log
- assert(peer_info[peer].last_update < info.last_update);
- m->log.copy_after(log, peer_info[peer].last_update);
+ m = new MOSDPGLog(osd->osdmap->get_epoch(), info);
+ if (peer_info[peer].last_update < log.bottom) {
+ // summary/backlog
+ assert(log.backlog);
+ m->log = log;
+ } else {
+ // incremental log
+ assert(peer_info[peer].last_update < info.last_update);
+ m->log.copy_after(log, peer_info[peer].last_update);
+ }
}
// update local version of peer's missing list!
- {
+ if (m) {
eversion_t plu = peer_info[peer].last_update;
Missing& pm = peer_missing[peer];
for (list<Log::Entry>::iterator p = m->log.log.begin();
pm.add(p->oid, p->version);
}
- dout(10) << "activate sending " << m->log << " " << m->missing
- << " to osd" << peer << dendl;
- //m->log.print(cout);
- osd->messenger->send_message(m, osd->osdmap->get_inst(peer));
+ if (m) {
+ dout(10) << "activate sending " << m->log << " " << m->missing
+ << " to osd" << peer << dendl;
+ //m->log.print(cout);
+ osd->messenger->send_message(m, osd->osdmap->get_inst(peer));
+ }
// update our missing
if (peer_missing[peer].num_missing() == 0) {
-
-
void PG::write_log(ObjectStore::Transaction& t)
{
dout(10) << "write_log" << dendl;
// load bounds
ondisklog.bottom = ondisklog.top = 0;
r = store->collection_getattr(info.pgid, "ondisklog_bottom", &ondisklog.bottom, sizeof(ondisklog.bottom));
- //assert(r == sizeof(ondisklog.bottom));
+ assert(r == sizeof(ondisklog.bottom));
r = store->collection_getattr(info.pgid, "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top));
- //assert(r == sizeof(ondisklog.top));
+ assert(r == sizeof(ondisklog.top));
dout(10) << "read_log [" << ondisklog.bottom << "," << ondisklog.top << ")" << dendl;
class OSD;
class MOSDOp;
class MOSDOpReply;
-
+class MOSDPGActivateSet;
/** PG - Replica Placement Group
*
void trim_write_ahead();
- void peer(ObjectStore::Transaction& t, map< int, map<pg_t,Query> >& query_map);
-
- void activate(ObjectStore::Transaction& t);
+ void peer(ObjectStore::Transaction& t,
+ map< int, map<pg_t,Query> >& query_map,
+ map<int, MOSDPGActivateSet*> *activator_map=0);
+ void activate(ObjectStore::Transaction& t,
+ map<int, MOSDPGActivateSet*> *activator_map=0);
virtual void clean_up_local(ObjectStore::Transaction& t) = 0;
inline ostream& operator<<(ostream& out, const PG::Info& pgi)
{
- out << "pginfo(" << pgi.pgid;
+ out << pgi.pgid << "(";
if (pgi.is_empty())
out << " empty";
else
osdmap->apply_incremental(inc);
// notify messenger
- for (map<int,entity_inst_t>::iterator i = inc.new_down.begin();
+ for (map<int,pair<entity_inst_t,bool> >::iterator i = inc.new_down.begin();
i != inc.new_down.end();
i++)
- messenger->mark_down(i->second.addr);
+ messenger->mark_down(i->second.first.addr);
}
else if (m->maps.count(e)) {