# include "FakeStore.h"
#endif
-#ifdef USE_EBOFS
-# include "ebofs/Ebofs.h"
-#endif
+#include "ebofs/Ebofs.h"
#include "Ager.h"
-#include "mds/MDS.h"
#include "msg/Messenger.h"
#include "msg/Message.h"
+#include "mds/MDS.h"
#include "msg/HostMonitor.h"
#include "messages/MGenericMessage.h"
#include "messages/MOSDMap.h"
#include "messages/MOSDPGNotify.h"
-#include "messages/MOSDPGPeer.h"
-#include "messages/MOSDPGPeerAck.h"
-#include "messages/MOSDPGUpdate.h"
+#include "messages/MOSDPGQuery.h"
+#include "messages/MOSDPGSummary.h"
#include "common/Logger.h"
#include "common/LogType.h"
#define ROLE_TYPE(x) ((x)>0 ? 1:(x))
+
+
+// <hack> force remount hack for performance testing FakeStore
class C_Remount : public Context {
OSD *osd;
public:
osd_lock.Unlock();
dout(0) << "finished remount" << endl;
}
-
+// </hack>
// cons/des
LogType osd_logtype;
-
OSD::OSD(int id, Messenger *m)
{
whoami = id;
g_timer.add_event_after(g_conf.osd_remount_at, new C_Remount(this));
+ // init object store
// try in this order:
// ebofsdev/all
// ebofsdev/$num
if (::lstat(dev_path, &sta) != 0)
sprintf(dev_path, "%s/%s", ebofs_base_path, hostname);
+ if (g_conf.ebofs) {
+ store = new Ebofs(dev_path);
+ }
#ifdef USE_OBFS
- if (g_conf.uofs) {
+ else if (g_conf.uofs) {
store = new OBFSStore(whoami, NULL, dev_path);
}
-#else
-# ifdef USE_EBOFS
- if (g_conf.ebofs) {
- store = new Ebofs(dev_path);
- } else
-# endif
- store = new FakeStore(osd_base_path, whoami);
#endif
+ else {
+ store = new FakeStore(osd_base_path, whoami);
+ }
// monitor
char s[80];
monitor = new HostMonitor(m, st);
monitor->set_notify_port(MDS_PORT_OSDMON);
- // hack
+ // <hack> for testing monitoring
int i = whoami;
if (++i == g_conf.num_osd) i = 0;
monitor->get_hosts().insert(MSG_ADDR_OSD(i));
monitor->get_hosts().insert(MSG_ADDR_OSD(i));
monitor->get_notify().insert(MSG_ADDR_MDS(0));
+ // </hack>
// log
char name[80];
osd_logtype.add_inc("rlsum");
osd_logtype.add_inc("rlnum");
- // Thread pool
+ // request thread pool
{
char name[80];
sprintf(name,"osd%d.threadpool", whoami);
if (messenger) { delete messenger; messenger = 0; }
if (logger) { delete logger; logger = 0; }
if (store) { delete store; store = 0; }
-
}
int OSD::init()
{
osd_lock.Lock();
+ {
+ // mkfs?
+ if (g_conf.osd_mkfs) {
+ dout(2) << "mkfs" << endl;
+ store->mkfs();
+ }
+
+ // mount.
+ int r = store->mount();
+ assert(r>=0);
+
+ // age?
+ if (g_conf.osd_age_time > 0) {
+ Ager ager(store);
+ ager.age(g_conf.osd_age_time, g_conf.osd_age, g_conf.osd_age / 2.0, 5, g_conf.osd_age);
+ }
- if (g_conf.osd_mkfs) {
- dout(2) << "mkfs" << endl;
-
- store->mkfs();
-
- }
- int r = store->mount();
-
- if (g_conf.osd_age_time > 0) {
- Ager ager(store);
- ager.age(g_conf.osd_age_time, g_conf.osd_age, g_conf.osd_age / 2.0, 5, g_conf.osd_age);
+ // monitor.
+ monitor->init();
}
-
- monitor->init();
-
osd_lock.Unlock();
// i'm ready!
messenger->set_dispatcher(this);
- return r;
+ return 0;
}
int OSD::shutdown()
case MSG_OSD_PG_NOTIFY:
handle_pg_notify((MOSDPGNotify*)m);
break;
- case MSG_OSD_PG_PEER:
- handle_pg_peer((MOSDPGPeer*)m);
- break;
- case MSG_OSD_PG_PEERACK:
- handle_pg_peer_ack((MOSDPGPeerAck*)m);
+ case MSG_OSD_PG_QUERY:
+ handle_pg_query((MOSDPGQuery*)m);
break;
- case MSG_OSD_PG_UPDATE:
- handle_pg_update((MOSDPGUpdate*)m);
+ case MSG_OSD_PG_SUMMARY:
+ handle_pg_summary((MOSDPGSummary*)m);
break;
case MSG_OSD_OP:
void OSD::handle_op_reply(MOSDOpReply *m)
{
// did i get a new osdmap?
- if (m->get_map_version() > osdmap->get_version()) {
+ if (m->get_map_epoch() > osdmap->get_epoch()) {
dout(3) << "replica op reply includes a new osd map" << endl;
update_map(m->get_osdmap());
}
case OSD_OP_REP_PULL:
op_rep_pull_reply(m);
break;
- case OSD_OP_REP_PUSH:
- op_rep_push_reply(m);
- break;
- case OSD_OP_REP_REMOVE:
- op_rep_remove_reply(m);
- break;
case OSD_OP_REP_WRITE:
case OSD_OP_REP_TRUNCATE:
/** update_map
* assimilate a new OSDMap. scan pgs.
*/
-void OSD::update_map(bufferlist& state, bool mkfs)
+void OSD::update_map(bufferlist& state)
{
// decode new map
osdmap = new OSDMap();
osdmap->decode(state);
- osdmaps[osdmap->get_version()] = osdmap;
- dout(7) << "got osd map version " << osdmap->get_version() << endl;
+ osdmaps[osdmap->get_epoch()] = osdmap;
+ dout(7) << "got osd map version " << osdmap->get_epoch() << endl;
// pg list
list<pg_t> pg_list;
- if (mkfs) {
- assert(osdmap->get_version() == 1);
+ if (osdmap->is_mkfs()) {
+ dout(1) << "mkfs" << endl;
+ assert(osdmap->get_epoch() == 1);
ps_t maxps = 1LL << osdmap->get_pg_bits();
-
+
// create PGs
for (int nrep = 1;
nrep <= MIN(g_conf.num_osd, g_conf.osd_max_rep); // for low osd counts.. hackish bleh
nrep++) {
for (pg_t ps = 0; ps < maxps; ps++) {
pg_t pgid = osdmap->ps_nrep_to_pg(ps, nrep);
- vector<int> acting;
- osdmap->pg_to_acting_osds(pgid, acting);
+ int role = osdmap->get_pg_acting_role(pgid, whoami);
+ if (role < 0) continue;
-
- if (acting[0] == whoami) {
- PG *pg = create_pg(pgid);
- pg->acting = acting;
- pg->set_role(0);
- pg->set_primary_since(osdmap->get_version());
- pg->mark_complete( osdmap->get_version() );
-
- dout(7) << "created " << *pg << endl;
+ PG *pg = create_pg(pgid);
+ osdmap->pg_to_acting_osds(pgid, pg->acting);
+ pg->set_role(role);
+ pg->info.last_epoch_started = pg->info.same_primary_since = osdmap->get_epoch();
+ pg->last_epoch_started_any = osdmap->get_epoch();
+ pg->mark_complete();
+ pg->mark_active();
- pg_list.push_back(pgid);
- }
+ dout(7) << "created " << *pg << endl;
+ pg_list.push_back(pgid);
}
// local PG too
pg_t pgid = osdmap->osd_nrep_to_pg(whoami, nrep);
- vector<int> acting;
- osdmap->pg_to_acting_osds(pgid, acting);
-
- if (acting[0] == whoami) {
- PG *pg = create_pg(pgid);
- pg->acting = acting;
- pg->set_role(0);
- pg->set_primary_since(osdmap->get_version());
- pg->mark_complete( osdmap->get_version() );
-
- dout(7) << "created " << *pg << endl;
- pg_list.push_back(pgid);
- }
+ int role = osdmap->get_pg_acting_role(pgid, whoami);
+ if (role < 0) continue;
+ PG *pg = create_pg(pgid);
+ osdmap->pg_to_acting_osds(pgid, pg->acting);
+ pg->set_role(role);
+ pg->info.last_epoch_started = pg->info.same_primary_since = osdmap->get_epoch();
+ pg->last_epoch_started_any = osdmap->get_epoch();
+ pg->mark_complete();
+ pg->mark_active();
+
+ dout(7) << "created " << *pg << endl;
+ pg_list.push_back(pgid);
}
-
-
} else {
// get pg list
get_pg_list(pg_list);
advance_map(pg_list);
activate_map(pg_list);
- /*
- if (mkfs) {
- // mark all peers complete
- for (list<pg_t>::iterator pgid = pg_list.begin();
- pgid != pg_list.end();
- pgid++) {
- PG *pg = get_pg(*pgid);
- for (map<int,PGPeer*>::iterator it = pg->peers.begin();
- it != pg->peers.end();
- it++) {
- PGPeer *p = it->second;
- //dout(7) << " " << *pg << " telling peer osd" << p->get_peer() << " they are complete" << endl;
-
- messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()),
- MSG_ADDR_OSD(p->get_peer()));
- }
- }
- }*/
-
-
// process waiters
take_waiters(waiting_for_osdmap);
}
-void OSD::handle_osd_map(MOSDMap *m)
-{
- // wait for ops to finish
- wait_for_no_ops();
-
- if (m->is_mkfs()) {
- dout(2) << "MKFS" << endl;
- }
-
- if (!osdmap ||
- m->get_version() > osdmap->get_version()) {
- if (osdmap) {
- dout(3) << "handle_osd_map got osd map version " << m->get_version() << " > " << osdmap->get_version() << endl;
- } else {
- dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl;
- }
-
- update_map(m->get_osdmap(), m->is_mkfs());
-
- } else {
- dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl;
- }
-
- if (m->is_mkfs()) {
- // ack
- messenger->send_message(new MGenericMessage(MSG_OSD_MKFS_ACK),
- m->get_source());
- }
-
- delete m;
-}
-
-
-OSDMap* OSD::get_osd_map(version_t v)
-{
- assert(osdmaps[v]);
- return osdmaps[v];
-}
-
-
-
-// ======================================================
-// REPLICATION
-
-// PG
-
-void OSD::get_pg_list(list<pg_t>& ls)
-{
- // just list collections; assume they're all pg's (for now)
- store->list_collections(ls);
-}
-
-bool OSD::pg_exists(pg_t pgid)
-{
- return store->collection_exists(pgid);
-}
-
-PG *OSD::create_pg(pg_t pgid)
-{
- assert(pg_map.count(pgid) == 0);
- assert(!pg_exists(pgid));
-
- PG *pg = new PG(whoami, pgid);
- //pg->info.created = osdmap->get_version();
-
- pg->store(store);
- pg_map[pgid] = pg;
- return pg;
-}
-
-PG *OSD::get_pg(pg_t pgid)
-{
- // already open?
- if (pg_map.count(pgid))
- return pg_map[pgid];
-
- // exists?
- if (!pg_exists(pgid))
- return 0;
-
- // open, stat collection
- PG *pg = new PG(whoami, pgid);
- pg->fetch(store);
- pg_map[pgid] = pg;
-
- return pg;
-}
-
-
-
-
/**
* scan placement groups, initiate any replication
*/
void OSD::advance_map(list<pg_t>& ls)
{
- dout(7) << "advance_map version " << osdmap->get_version() << endl;
+ dout(7) << "advance_map version " << osdmap->get_epoch() << endl;
// scan pg's
for (list<pg_t>::iterator it = ls.begin();
PG *pg = get_pg(pgid);
assert(pg);
+ // did i finish this epoch?
+ if (pg->is_active()) {
+ assert(pg->info.last_epoch_started == osdmap->get_epoch());
+ pg->info.last_epoch_finished = osdmap->get_epoch();
+ }
+
// get new acting set
vector<int> acting;
int nrep = osdmap->pg_to_acting_osds(pgid, acting);
// no change?
if (acting == pg->acting)
continue;
-
+
+ // primary changed?
+ if (pg->acting[0] != acting[0]) {
+ pg->info.same_primary_since = osdmap->get_epoch();
+ }
+
if (role != pg->get_role()) {
- // role change.
+ // my role changed.
dout(10) << " " << *pg << " role change " << pg->get_role() << " -> " << role << endl;
// old primary?
if (pg->get_role() == 0) {
- // drop peers
- take_waiters(pg->waiting_for_peered);
+ // take waiters
+ take_waiters(pg->waiting_for_active);
for (hash_map<object_t, list<Message*> >::iterator it = pg->waiting_for_missing_object.begin();
it != pg->waiting_for_missing_object.end();
it++)
take_waiters(it->second);
pg->waiting_for_missing_object.clear();
- for (hash_map<object_t, list<Message*> >::iterator it = pg->waiting_for_clean_object.begin();
- it != pg->waiting_for_clean_object.end();
- it++)
- take_waiters(it->second);
- pg->waiting_for_clean_object.clear();
-
+ // drop peers
pg->drop_peers();
- pg->state_clear(PG_STATE_CLEAN);
- pg->discard_recovery_plan();
+ pg->state_clear(PG::STATE_CLEAN);
}
-
+
// new primary?
if (role == 0) {
- pg->set_primary_since(osdmap->get_version());
- pg->state_clear(PG_STATE_PEERED);
+ pg->state_clear(PG::STATE_ACTIVE);
} else {
// we need to announce
- pg->state_set(PG_STATE_STRAY);
+ pg->state_set(PG::STATE_ACTIVE);
if (nrep == 0)
dout(1) << "crashed pg " << *pg << endl;
// no role change.
// did primary change?
if (primary != pg->get_primary()) {
- dout(10) << " " << *pg << " acting primary change " << pg->get_primary() << " -> " << primary << ", !peered" << endl;
+ dout(10) << " " << *pg << " acting primary change "
+ << pg->get_primary() << " -> " << primary
+ << ", !peered" << endl;
// we need to announce
- pg->state_set(PG_STATE_STRAY);
+ pg->state_set(PG::STATE_STRAY);
} else {
// primary is the same.
if (role == 0) {
- // i am (still) primary. but replica set changed.
+ // i am (still) primary. but my replica set changed.
dout(10) << " " << *pg << " replica set changed, !clean !peered" << endl;
- pg->state_clear(PG_STATE_PEERED);
- pg->state_clear(PG_STATE_CLEAN);
+ pg->state_clear(PG::STATE_ACTIVE);
+ pg->state_clear(PG::STATE_CLEAN);
}
}
}
// update PG
pg->acting = acting;
pg->calc_role(whoami);
- pg->store(store);
-
+ //pg->store();
// scan down osds
for (set<int>::const_iterator down = osdmap->get_down_osds().begin();
down != osdmap->get_down_osds().end();
down++) {
- PGPeer *pgp = pg->get_peer(*down);
+ PG::PGPeer *pgp = pg->get_peer(*down);
if (!pgp) continue;
-
+
dout(10) << " " << *pg << " peer osd" << *down << " is down, removing" << endl;
pg->remove_peer(*down);
handle_rep_op_ack(*tid, -1, false, *down);
}
}
-
}
-
}
void OSD::activate_map(list<pg_t>& ls)
{
- dout(7) << "activate_map version " << osdmap->get_version() << endl;
+ dout(7) << "activate_map version " << osdmap->get_epoch() << endl;
- map< int, map<pg_t, version_t> > notify_list; // primary -> pgid -> last_any_complete
- map< int, map<PG*,int> > start_map; // peer -> PG -> peer_role
+ map< int, list<PG::PGInfo> > notify_list; // primary -> list
+ map< int, map<pg_t,version_t> > query_map; // peer -> PG -> get_summary_since
// scan pg's
for (list<pg_t>::iterator it = ls.begin();
if (pg->get_role() == 0) {
// i am primary
- start_peers(pg, start_map);
- }
+ repeer(pg, query_map);
+ }
else if (pg->is_stray()) {
// i am residual|replica
- notify_list[pg->get_primary()][pgid] = pg->get_last_any_complete();
+ notify_list[pg->get_primary()].push_back(pg->info);
}
}
- // notify? (residual|replica)
- for (map< int, map<pg_t, version_t> >::iterator pit = notify_list.begin();
- pit != notify_list.end();
- pit++)
- peer_notify(pit->first, pit->second);
-
- // start peer? (primary)
- for (map< int, map<PG*, int> >::iterator pit = start_map.begin();
- pit != start_map.end();
- pit++)
- peer_start(pit->first, pit->second);
-
+ if (!osdmap->is_mkfs()) { // hack: skip the queries/summaries if it's a mkfs
+ // notify? (residual|replica)
+ do_notifies(notify_list);
+
+ // do queries.
+ do_queries(query_map);
+ }
}
-/** peer_notify
- * Send an MOSDPGNotify to a primary, with a list of PGs that I have
- * content for, and they are primary for.
- */
-void OSD::peer_notify(int primary, map<pg_t, version_t>& pg_list)
-{
- dout(7) << "peer_notify osd" << primary << " on " << pg_list.size() << " PGs" << endl;
- MOSDPGNotify *m = new MOSDPGNotify(osdmap->get_version(), pg_list);
- messenger->send_message(m, MSG_ADDR_OSD(primary));
-}
-
-void OSD::start_peers(PG *pg, map< int, map<PG*,int> >& start_map)
+void OSD::handle_osd_map(MOSDMap *m)
{
- dout(10) << " " << *pg << " last_any_complete " << pg->get_last_any_complete() << endl;
-
- // determine initial peer set
- map<int,int> peerset; // peer -> role
-
- // prior map(s), if OSDs are still up
- for (version_t epoch = pg->get_last_any_complete();
- epoch < osdmap->get_version();
- epoch++) {
- OSDMap *omap = get_osd_map(epoch);
- assert(omap);
-
- vector<int> acting;
- omap->pg_to_acting_osds(pg->get_pgid(), acting);
-
- for (unsigned i=0; i<acting.size(); i++)
- if (osdmap->is_up(acting[i]))
- peerset[acting[i]] = -1;
- }
-
- // current map
- for (unsigned i=1; i<pg->acting.size(); i++)
- peerset[pg->acting[i]] = i>0 ? 1:0;
-
-
- // check peers
- bool havepeers = true;
- for (map<int,int>::iterator it = peerset.begin();
- it != peerset.end();
- it++) {
- int who = it->first;
- int role = it->second;
- if (who == whoami) continue; // nevermind me
+ // wait for ops to finish
+ wait_for_no_ops();
- PGPeer *pgp = pg->get_peer(who);
- if (pgp && pgp->is_active() &&
- pgp->get_role() == role) {
- dout(10) << " " << *pg << " actively peered with osd" << who << " role " << role << endl;
+ if (!osdmap ||
+ m->get_epoch() > osdmap->get_epoch()) {
+ if (osdmap) {
+ dout(3) << "handle_osd_map got osd map epoch " << m->get_epoch() << " > " << osdmap->get_epoch() << endl;
} else {
- if (pgp) {
- pg->remove_peer(who);
- dout(10) << " " << *pg << " need to re-peer with osd" << who << " role " << role << endl;
- } else {
- dout(10) << " " << *pg << " need to peer with osd" << who << " role " << role << endl;
- }
- start_map[who][pg] = role;
- havepeers = false;
+ dout(3) << "handle_osd_map got osd map epoch " << m->get_epoch() << endl;
}
- }
- if (havepeers &&
- !pg->is_peered()) {
- dout(10) << " " << *pg << " already has necessary peers, analyzing" << endl;
- pg->mark_peered();
- take_waiters(pg->waiting_for_peered);
+ update_map(m->get_osdmap());
- plan_recovery(pg);
- do_recovery(pg);
+ } else {
+ dout(3) << "handle_osd_map ignoring osd map epoch " << m->get_epoch() << " <= " << osdmap->get_epoch() << endl;
+ }
+
+ if (osdmap->is_mkfs()) {
+ // ack
+ messenger->send_message(new MGenericMessage(MSG_OSD_MKFS_ACK),
+ m->get_source());
}
+
+ delete m;
}
-/** peer_start
- * initiate a peer session with a replica on given list of PGs
- */
-void OSD::peer_start(int replica, map<PG*,int>& pg_map)
+OSDMap* OSD::get_osd_map(version_t v)
{
- dout(7) << "peer_start with osd" << replica << " on " << pg_map.size() << " PGs" << endl;
-
- list<pg_t> pgids;
-
- for (map<PG*,int>::iterator it = pg_map.begin();
- it != pg_map.end();
- it++) {
- PG *pg = it->first;
- int role = it->second;
-
- assert(pg->get_peer(replica) == 0);
- //PGPeer *p =
- pg->new_peer(replica, role);
-
- // set last_request stamp?
- // ...
+ assert(osdmaps[v]);
+ return osdmaps[v];
+}
- pgids.push_back(pg->get_pgid()); // add to list
- }
- MOSDPGPeer *m = new MOSDPGPeer(osdmap->get_version(), pgids);
- messenger->send_message(m,
- MSG_ADDR_OSD(replica));
-}
-bool OSD::require_current_map(Message *m, version_t v)
+bool OSD::require_current_map(Message *m, epoch_t ep)
{
int from = MSG_ADDR_NUM(m->get_source());
// older map?
- if (v < osdmap->get_version()) {
- dout(7) << " from old map version " << v << " < " << osdmap->get_version() << endl;
+ if (ep < osdmap->get_epoch()) {
+ dout(7) << " from old map epoch " << ep << " < " << osdmap->get_epoch() << endl;
delete m; // discard and ignore.
return false;
}
// newer map?
- if (v > osdmap->get_version()) {
- dout(7) << " from newer map version " << v << " > " << osdmap->get_version() << endl;
+ if (ep > osdmap->get_epoch()) {
+ dout(7) << " from newer map epoch " << ep << " > " << osdmap->get_epoch() << endl;
wait_for_new_map(m);
return false;
}
return false;
}
- assert(v == osdmap->get_version());
-
+ assert(ep == osdmap->get_epoch());
return true;
}
* require that we have same (or newer) map, and that
* the source is the pg primary.
*/
-bool OSD::require_current_pg_primary(Message *m, version_t v, PG *pg)
+bool OSD::require_same_or_newer_map(Message *m, epoch_t epoch)
{
int from = MSG_ADDR_NUM(m->get_source());
// newer map?
- if (v > osdmap->get_version()) {
- dout(7) << " from newer map version " << v << " > " << osdmap->get_version() << endl;
+ if (epoch > osdmap->get_epoch()) {
+ dout(7) << " from newer map epoch " << epoch << " > " << osdmap->get_epoch() << endl;
wait_for_new_map(m);
return false;
}
- // older map?
- if (v < osdmap->get_version()) {
- // same primary?
- // FIXME.. line of succession must match!
- if (from != pg->get_primary()) {
- dout(7) << " not from pg primary osd" << pg->get_primary() << ", dropping" << endl;
- delete m; // discard and ignore.
- return false;
- }
- }
-
// down?
if (osdmap->is_down(from)) {
- dout(7) << " from down OSD osd" << from << ", pinging" << endl;
+ dout(7) << " from down OSD osd" << from
+ << ", pinging?" << endl;
+ assert(epoch < osdmap->get_epoch());
// FIXME
return false;
}
-void OSD::handle_pg_notify(MOSDPGNotify *m)
-{
- int from = MSG_ADDR_NUM(m->get_source());
- dout(7) << "handle_pg_notify from osd" << from << endl;
-
- if (!require_current_map(m, m->get_version())) return;
-
- // look for unknown PGs i'm primary for
- map< int, map<PG*,int> > start_map;
- for (map<pg_t, version_t>::iterator it = m->get_pg_list().begin();
- it != m->get_pg_list().end();
- it++) {
- pg_t pgid = it->first;
- PG *pg = get_pg(pgid);
+// ======================================================
+// REPLICATION
- if (!pg) {
- pg = create_pg(pgid);
+// PG
- int nrep = osdmap->pg_to_acting_osds(pgid, pg->acting);
- assert(nrep > 0);
- assert(pg->acting[0] == whoami);
- pg->set_role(0);
- pg->set_primary_since( osdmap->get_version() ); // FIXME: this may miss a few epochs!
- pg->mark_any_complete( it->second );
+void OSD::get_pg_list(list<pg_t>& ls)
+{
+ // just list collections; assume they're all pg's (for now)
+ store->list_collections(ls);
+}
- dout(10) << " " << *pg << " is new, nrep=" << nrep << endl;
+bool OSD::pg_exists(pg_t pgid)
+{
+ return store->collection_exists(pgid);
+}
- // start peers
- start_peers(pg, start_map);
+PG *OSD::create_pg(pg_t pgid)
+{
+ assert(pg_map.count(pgid) == 0);
+ assert(!pg_exists(pgid));
+
+ PG *pg = new PG(this, pgid);
+
+ //pg->info.created = osdmap->get_epoch();
+ //pg->store(store);
+
+ pg_map[pgid] = pg;
+ return pg;
+}
+
+PG *OSD::get_pg(pg_t pgid)
+{
+ // already open?
+ if (pg_map.count(pgid))
+ return pg_map[pgid];
+
+ // exists?
+ if (!pg_exists(pgid))
+ return 0;
+
+ // open, stat collection
+ PG *pg = new PG(this, pgid);
+ //pg->fetch(store);
+ pg_map[pgid] = pg;
+
+ return pg;
+}
+
+
+
+
+/** do_notifies
+ * Send an MOSDPGNotify to a primary, with a list of PGs that I have
+ * content for, and they are primary for.
+ */
+
+void OSD::do_notifies(map< int, list<PG::PGInfo> >& notify_list)
+{
+ for (map< int, list<PG::PGInfo> >::iterator it = notify_list.begin();
+ it != notify_list.end();
+ it++) {
+ dout(7) << "do_notify osd" << it->first << " on " << it->second.size() << " PGs" << endl;
+ MOSDPGNotify *m = new MOSDPGNotify(osdmap->get_epoch(), it->second);
+ messenger->send_message(m, MSG_ADDR_OSD(it->first));
+ }
+}
+
+
+/** do_queries
+ * send out pending queries for info | summaries
+ */
+void OSD::do_queries(map< int, map<pg_t,version_t> >& query_map)
+{
+ for (map< int, map<pg_t, version_t> >::iterator pit = query_map.begin();
+ pit != query_map.end();
+ pit++) {
+ int who = pit->first;
+ dout(7) << "do_queries querying osd" << who
+ << " on " << pit->second.size() << " PGs" << endl;
+
+ MOSDPGQuery *m = new MOSDPGQuery(osdmap->get_epoch(),
+ pit->second);
+ messenger->send_message(m,
+ MSG_ADDR_OSD(who));
+ }
+}
+
+
+/** repeer()
+ * primary: check, query whatever replicas i need to.
+ */
+void OSD::repeer(PG *pg, map< int, map<pg_t,version_t> >& query_map)
+{
+ dout(10) << "repeer " << *pg << endl;
+
+ // determine initial peer set
+ map<int,int> peerset; // peer -> role
+
+ // prior map(s), if OSDs are still up
+ for (version_t epoch = pg->last_epoch_started_any;
+ epoch < osdmap->get_epoch();
+ epoch++) {
+ OSDMap *omap = get_osd_map(epoch);
+ assert(omap);
+
+ vector<int> acting;
+ omap->pg_to_acting_osds(pg->get_pgid(), acting);
+
+ for (unsigned i=0; i<acting.size(); i++)
+ if (osdmap->is_up(acting[i]))
+ peerset[acting[i]] = -1;
+ }
+
+ // current map
+ for (unsigned i=1; i<pg->acting.size(); i++)
+ peerset[pg->acting[i]] = i>0 ? 1:0;
+
+
+ // -- query info from everyone.
+ bool haveallinfo = true;
+ for (map<int,int>::iterator it = peerset.begin();
+ it != peerset.end();
+ it++) {
+ int who = it->first;
+ int role = it->second;
+ if (who == whoami) continue; // nevermind me
+
+ PG::PGPeer *pgp = pg->get_peer(who);
+ if (pgp && pgp->have_info()) {
+ dout(10) << *pg << " have info from osd" << who << " role " << role << endl;
+ continue;
+ }
+ if (pgp && pgp->state_test(PG::PGPeer::STATE_QINFO)) {
+ dout(10) << *pg << " waiting for osd" << who << " role " << role << endl;
+ } else {
+ dout(10) << *pg << " querying info from osd" << who << " role " << role << endl;
+ query_map[who][pg->get_pgid()] = 0;
+ }
+ haveallinfo = false;
+ }
+ if (!haveallinfo) return;
+
+
+ // -- ok, we have all info. who has latest PG content summary?
+ version_t newest_update = pg->info.last_update;
+ int newest_update_osd = whoami;
+ version_t oldest_update = pg->info.last_update;
+ PG::PGPeer *newest_update_peer = 0;
+
+ for (map<int,PG::PGPeer*>::iterator it = pg->peers.begin();
+ it != pg->peers.end();
+ it++) {
+ PG::PGPeer *pgp = it->second;
+ assert(pgp->have_info());
+
+ if (pgp->info.last_update > newest_update) {
+ newest_update = pgp->info.last_update;
+ newest_update_osd = it->first;
+ newest_update_peer = pgp;
+ }
+ if (pgp->get_role() == 1 &&
+ pgp->info.last_update < oldest_update)
+ oldest_update = pgp->info.last_update;
+ }
+
+ if (newest_update_peer) {
+ // get contents from newest.
+ assert(!newest_update_peer->have_summary());
+
+ dout(10) << *pg << " newest PG on osd" << newest_update_osd
+ << " v " << newest_update
+ << ", querying summary"
+ << endl;
+ query_map[newest_update_osd][pg->get_pgid()] = 1;
+ return;
+ } else {
+ dout(10) << *pg << " i have the latest: " << pg->info.last_update << endl;
+ }
+
+
+ // -- find pg contents?
+ if (pg->info.last_complete < pg->info.last_update) {
+ if (pg->content_summary->missing > 0) {
+ // search!
+ dout(10) << *pg << " searching for PG contents, querying all peers" << endl;
+ bool didquery = false;
+ for (map<int,PG::PGPeer*>::iterator it = pg->peers.begin();
+ it != pg->peers.end();
+ it++) {
+ PG::PGPeer *pgp = it->second;
+ if (pgp->have_summary()) continue;
+ query_map[it->first][pg->get_pgid()] = 1;
+ didquery = true;
+ }
+
+ if (didquery) return;
+ } else {
+ dout(10) << *pg << " i have located all objects" << endl;
+ }
+ } else {
+ dout(10) << *pg << " i have all objects" << endl;
+ }
+
+
+ // -- distribute summary?
+
+ // does anyone need it?
+ //if (oldest_update < pg->info.last_update) {
+
+ // generate summary?
+ if (pg->content_summary == 0)
+ pg->generate_content_summary();
+
+ // distribute summary!
+ for (map<int,PG::PGPeer*>::iterator it = pg->peers.begin();
+ it != pg->peers.end();
+ it++) {
+ PG::PGPeer *pgp = it->second;
+ if (pgp->get_role() != 1) continue;
+
+ pgp->state_clear(PG::PGPeer::STATE_WAITING);
+ pgp->state_set(PG::PGPeer::STATE_ACTIVE);
+
+ //if (pgp->info.last_update < pg->info.last_update) {
+ dout(10) << *pg << " sending summary to osd" << it->first << endl;
+ MOSDPGSummary *m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), pg->content_summary);
+ messenger->send_message(m, MSG_ADDR_OSD(it->first));
+ //}
+ }
+ //} else {
+ //dout(10) << *pg << " nobody needs the summary" << endl;
+ //}
+
+ // plan my own recovery
+ pg->plan_recovery();
+
+ // i am active!
+ pg->state_set(PG::STATE_ACTIVE);
+
+ take_waiters(pg->waiting_for_active);
+
+}
+
+
+
+/** PGNotify
+ * from non-primary to primary
+ * includes PGInfo.
+ */
+
+void OSD::handle_pg_notify(MOSDPGNotify *m)
+{
+ dout(7) << "handle_pg_notify from " << m->get_source() << endl;
+ int from = MSG_ADDR_NUM(m->get_source());
+
+ if (!require_same_or_newer_map(m, m->get_epoch())) return;
+
+ // look for unknown PGs i'm primary for
+ map< int, map<pg_t,version_t> > query_map;
+
+ for (list<PG::PGInfo>::iterator it = m->get_pg_list().begin();
+ it != m->get_pg_list().end();
+ it++) {
+ pg_t pgid = it->pgid;
+ PG *pg = get_pg(pgid);
+
+ if (!pg) {
+ pg = create_pg(pgid);
+
+ int nrep = osdmap->pg_to_acting_osds(pgid, pg->acting);
+ assert(nrep > 0);
+ assert(pg->acting[0] == whoami);
+ pg->info.same_primary_since = it->same_primary_since;
+ pg->set_role(0);
+
+ dout(10) << " " << *pg << " is new, nrep=" << nrep << endl;
+
+ // start peers
+ repeer(pg, query_map);
// kick any waiters
if (waiting_for_pg.count(pgid)) {
take_waiters(waiting_for_pg[pgid]);
waiting_for_pg.erase(pgid);
}
- }
+ } else {
+ // already had pg.
- if (pg->is_peered()) {
- // we're already peered. what do we do with this guy?
- assert(0);
- }
+ // peered with this guy specifically?
+ PG::PGPeer *pgp = pg->get_peer(from);
+ if (!pgp) {
+ int role = osdmap->get_pg_role(pg->get_pgid(), from);
+ pgp = pg->new_peer(from, role);
+ }
- if (it->second > pg->get_last_any_complete())
- pg->mark_any_complete( it->second );
+ pgp->info = *it;
+ pgp->state_set(PG::PGPeer::STATE_INFO);
- // peered with this guy specifically?
- PGPeer *pgp = pg->get_peer(from);
- if (!pgp &&
- start_map[from].count(pg) == 0) {
- dout(7) << " " << *pg << " primary needs to peer with residual notifier osd" << from << endl;
- start_map[from][pg] = -1;
+ repeer(pg, query_map);
}
}
- // start peers?
- if (start_map.empty()) {
- dout(7) << " no new peers" << endl;
- } else {
- for (map< int, map<PG*,int> >::iterator pit = start_map.begin();
- pit != start_map.end();
- pit++)
- peer_start(pit->first, pit->second);
- }
+ do_queries(query_map);
delete m;
}
-void OSD::handle_pg_peer(MOSDPGPeer *m)
+
+/** PGQuery
+ * from primary to replica | other
+ */
+void OSD::handle_pg_query(MOSDPGQuery *m)
{
+ dout(7) << "handle_pg_query from " << m->get_source() << endl;
int from = MSG_ADDR_NUM(m->get_source());
- dout(7) << "handle_pg_peer from osd" << from << endl;
-
- if (!require_current_map(m, m->get_version())) return;
-
- // go
- MOSDPGPeerAck *ack = new MOSDPGPeerAck(osdmap->get_version());
-
- for (list<pg_t>::iterator it = m->get_pg_list().begin();
- it != m->get_pg_list().end();
+
+ if (!require_same_or_newer_map(m, m->get_epoch())) return;
+
+ map< int, list<PG::PGInfo> > notify_list;
+
+ for (map<pg_t,version_t>::iterator it = m->pg_list.begin();
+ it != m->pg_list.end();
it++) {
- pg_t pgid = *it;
-
- // open PG
+ pg_t pgid = it->first;
PG *pg = get_pg(pgid);
- // dne?
if (!pg) {
// get active rush mapping
vector<int> acting;
if (role < 0) {
dout(10) << " pg " << hex << pgid << dec << " dne, and i am not an active replica" << endl;
- ack->pg_dne.push_back(pgid);
+ PG::PGInfo empty(pgid);
+ notify_list[from].push_back(empty);
continue;
}
pg->acting = acting;
pg->set_role(role);
- //if (m->get_version() == 1) pg->mark_complete(); // hack... need a more elegant solution
-
- dout(10) << " " << *pg << " dne (before), but i am role " << role << endl;
-
- // take any waiters
- if (waiting_for_pg.count(pgid)) {
- take_waiters(waiting_for_pg[pgid]);
- waiting_for_pg.erase(pgid);
- }
+ dout(10) << *pg << " dne (before), but i am role " << role << endl;
}
- // PEER
-
- // report back state and pg content
- ack->pg_state[pgid].state = pg->get_state();
- ack->pg_state[pgid].last_complete = pg->get_last_complete();
- ack->pg_state[pgid].last_any_complete = pg->get_last_any_complete();
- pg->scan_local_objects(ack->pg_state[pgid].objects, store); // list my objects
-
- // i am now peered
- pg->state_set(PG_STATE_PEERED);
- pg->state_clear(PG_STATE_STRAY);
-
- if (m->get_version() == 1) {
- pg->mark_complete( m->get_version() ); // it's a mkfs.. mark pg complete too
- }
-
- dout(10) << "sending peer ack " << *pg << " " << ack->pg_state[pgid].objects.size() << " objects" << endl;
+ if (it->second) {
+ // summary
+ MOSDPGSummary *m;
+ if (pg->content_summary == 0) {
+ pg->generate_content_summary();
+ m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), pg->content_summary);
+ delete pg->content_summary;
+ pg->content_summary = 0;
+ } else {
+ m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), pg->content_summary);
+ }
+ messenger->send_message(m, MSG_ADDR_OSD(from));
+ } else {
+ // notify
+ notify_list[from].push_back(pg->info);
+ }
}
-
- // reply
- messenger->send_message(ack,
- MSG_ADDR_OSD(from));
+
+ do_notifies(notify_list);
delete m;
}
-
-void OSD::handle_pg_peer_ack(MOSDPGPeerAck *m)
+void OSD::handle_pg_summary(MOSDPGSummary *m)
{
+ dout(7) << "handle_pg_summary from " << m->get_source() << endl;
int from = MSG_ADDR_NUM(m->get_source());
- dout(7) << "handle_pg_peer_ack from osd" << from << endl;
-
- if (!require_current_map(m, m->get_version())) return;
- // pg_dne first
- for (list<pg_t>::iterator it = m->pg_dne.begin();
- it != m->pg_dne.end();
- it++) {
- PG *pg = get_pg(*it);
- assert(pg);
+ if (!require_same_or_newer_map(m, m->get_epoch())) return;
- dout(10) << " " << *pg << " dne on osd" << from << endl;
-
- PGPeer *pgp = pg->get_peer(from);
- if (pgp) {
- pg->remove_peer(from);
- } else {
- dout(10) << " weird, i didn't have it!" << endl; // multiple lagged peer requests?
- assert(0); // not until peer requests span epochs!
- }
- }
+ map< int, map<pg_t,version_t> > query_map; // peer -> PG -> get_summary_since
- // pg_state
- for (map<pg_t, PGReplicaInfo>::iterator it = m->pg_state.begin();
- it != m->pg_state.end();
- it++) {
- PG *pg = get_pg(it->first);
- assert(pg);
+ pg_t pgid = m->get_pgid();
+ PG::PGContentSummary *sum = m->get_summary();
+ PG *pg = get_pg(pgid);
+ assert(pg);
- dout(10) << " " << *pg << " osd" << from << " remote state " << it->second.state
- << " w/ " << it->second.objects.size() << " objects"
- << ", last_complete " << it->second.last_complete
- << ", last_any_complete " << it->second.last_any_complete
+ if (pg->is_primary()) {
+ // PRIMARY
+ dout(10) << *pg << " got summary from osd" << from
<< endl;
-
- PGPeer *pgp = pg->get_peer(from);
+ PG::PGPeer *pgp = pg->get_peer(from);
assert(pgp);
+ assert(pgp->content_summary == 0); // ?
+ pgp->content_summary = sum;
+ pgp->state_set(PG::PGPeer::STATE_SUMMARY);
- pg->mark_any_complete( it->second.last_any_complete );
-
- pgp->last_complete = it->second.last_complete;
- pgp->objects = it->second.objects;
- pgp->state_set(PG_PEER_STATE_ACTIVE);
-
- // fully peered?
- bool fully = true;
- for (map<int, PGPeer*>::iterator pit = pg->get_peers().begin();
- pit != pg->get_peers().end();
- pit++) {
- dout(10) << " " << *pg << " peer osd" << pit->first << " state " << pit->second->get_state() << endl;
- if (!pit->second->is_active()) fully = false;
+ if (pgp->info.last_update > pg->info.last_update) {
+ // start new summary?
+ if (pg->content_summary == 0)
+ pg->content_summary = new PG::PGContentSummary();
+
+ // assimilate summary info!
+ list<PG::ObjectInfo>::iterator myp = pg->content_summary->ls.begin();
+ list<PG::ObjectInfo>::iterator newp = sum->ls.begin();
+
+ while (newp != sum->ls.end()) {
+ if (myp == pg->content_summary->ls.end()) {
+ // add new item
+ pg->content_summary->ls.insert(myp, 1, *newp);
+ pg->info.last_update = newp->version;
+ if (myp->osd == from) {
+ // remote
+ pg->content_summary->remote++;
+ } else {
+ // missing.
+ myp->osd = -1;
+ pg->content_summary->missing++;
+ }
+ myp++;
+ assert(myp == pg->content_summary->ls.end());
+ } else {
+ assert(myp->oid == newp->oid && myp->version == newp->version);
+ if (myp->osd == -1 && newp->osd == from) {
+ myp->osd = from; // found!
+ pg->content_summary->remote++;
+ pg->content_summary->missing--;
+ }
+ myp++;
+ }
+ newp++;
+ }
+ assert(myp == pg->content_summary->ls.end());
}
-
- if (fully) {
- if (!pg->is_peered()) {
- // now we're peered!
- pg->mark_peered();
- // waiters?
- take_waiters(pg->waiting_for_peered);
+ repeer(pg, query_map);
+
+ } else {
+ // REPLICA
+ dout(10) << *pg << " got summary from primary osd" << from
+ << endl;
+ assert(from == pg->acting[0]);
- dout(10) << " " << *pg << " fully peered, analyzing" << endl;
- plan_recovery(pg);
- do_recovery(pg);
- } else {
- // we're already peered.
- // what's the use of this new guy?
-
- }
- }
- }
+ // copy summary. FIXME.
+ if (pg->content_summary == 0)
+ pg->content_summary = new PG::PGContentSummary();
+ *pg->content_summary = *sum;
- // done
+ // i'm now active!
+ pg->state_set(PG::STATE_ACTIVE);
+
+ // take any waiters
+ take_waiters(pg->waiting_for_active);
+
+ // initiate any recovery?
+ pg->plan_recovery();
+ }
+
delete m;
}
-void OSD::handle_pg_update(MOSDPGUpdate *m)
-{
- int from = MSG_ADDR_NUM(m->get_source());
- dout(7) << "handle_pg_update on " << hex << m->get_pgid() << dec << " from osd" << from
- << " complete=" << m->is_complete()
- << " last_any_complete=" << m->get_last_any_complete()
- << endl;
-
- PG *pg = get_pg(m->get_pgid());
- if (!require_current_pg_primary(m, m->get_version(), pg)) return;
- // update
- if (!pg) {
- dout(7) << "don't have pg " << hex << m->get_pgid() << dec << endl;
- } else {
- // update my info. --what info?
- //pg->assim_info( m->get_pginfo() );
-
- // complete?
- if (m->is_complete()) {
- pg->mark_complete( osdmap->get_version() );
- }
- if (m->get_last_any_complete())
- pg->mark_any_complete( m->get_last_any_complete() );
-
- pg->store(store);
- }
-
- delete m;
-}
// RECOVERY
-void OSD::plan_recovery(PG *pg)
-{
- version_t current_version = osdmap->get_version();
-
- list<PGPeer*> complete_peers;
- pg->plan_recovery(store, current_version, complete_peers);
- if (current_version > 1) {
- for (list<PGPeer*>::iterator it = complete_peers.begin();
- it != complete_peers.end();
- it++) {
- PGPeer *p = *it;
- dout(7) << " " << *pg << " telling peer osd" << p->get_peer() << " they are complete" << endl;
- messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()),
- MSG_ADDR_OSD(p->get_peer()));
- }
- } else {
- dout(7) << "not sending PGUpdates since this is a mkfs (current_version==1)" << endl;
- }
-}
-void OSD::do_recovery(PG *pg)
-{
- // recover
- if (!pg->is_complete(osdmap->get_version())) {
- pg_pull(pg, max_recovery_ops);
- }
-
- // replicate
- if (pg->is_complete( osdmap->get_version() )) {
- if (!pg->objects_unrep.empty())
- pg_push(pg, max_recovery_ops);
- if (!pg->objects_stray.empty())
- pg_clean(pg, max_recovery_ops);
- }
-}
// pull
{
int ops = pg->num_active_ops();
- dout(7) << "pg_pull pg " << hex << pg->get_pgid() << dec << " " << pg->objects_missing.size() << " to do, " << ops << "/" << maxops << " active" << endl;
+ dout(7) << "pg_pull pg " << *pg
+ << " " << pg->objects_missing.size() << " to do, "
+ << ops << "/" << maxops << " active" << endl;
- while (ops < maxops) {
- object_t oid;
- if (!pg->get_next_pull(oid)) {
- dout(7) << "pg_pull done " << *pg << endl;
- break;
- }
- pull_replica(pg, oid);
+ while (ops < maxops &&
+ !pg->recovery_queue.empty()) {
+ map<version_t, PG::ObjectInfo>::iterator first = pg->recovery_queue.upper_bound(pg->requested_through);
+
+ pull_replica(pg, first->second);
+ pg->requested_through = first->first;
+
ops++;
}
}
-void OSD::pull_replica(PG *pg, object_t oid)
+void OSD::pull_replica(PG *pg, PG::ObjectInfo& oi)
{
- version_t v = pg->objects_missing_v[oid];
-
- // choose a peer
- set<int>::iterator pit = pg->objects_missing[oid].begin();
- PGPeer *p = pg->get_peer(*pit);
- dout(7) << "pull_replica " << hex << oid << dec << " v " << v << " from osd" << p->get_peer() << endl;
-
- // add to fetching list
- pg->pulling(oid, v, p);
+ // get peer
+ dout(7) << "pull_replica " << hex << oi.oid << dec
+ << " v " << oi.version
+ << " from osd" << oi.osd << endl;
// send op
- __uint64_t tid = ++last_tid;
+ tid_t tid = ++last_tid;
MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(),
- oid, p->pg->get_pgid(),
- osdmap->get_version(),
+ oi.oid, pg->get_pgid(),
+ osdmap->get_epoch(),
OSD_OP_REP_PULL);
- op->set_version(v);
+ op->set_version(oi.version);
op->set_pg_role(-1); // whatever, not 0
- messenger->send_message(op, MSG_ADDR_OSD(p->get_peer()));
+ messenger->send_message(op, MSG_ADDR_OSD(oi.osd));
- // register
- pull_ops[tid] = p;
+ // take note
+ pull_ops[tid] = oi;
+ pg->objects_pulling[oi.oid] = oi;
}
void OSD::op_rep_pull(MOSDOp *op)
{
long got = 0;
- //lock_object(op->get_oid());
- {
- dout(7) << "rep_pull on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
-
- // get object size
- struct stat st;
- int r = store->stat(op->get_oid(), &st);
- assert(r == 0);
-
- // check version
- version_t v = 0;
- store->getattr(op->get_oid(), "version", &v, sizeof(v));
- assert(v == op->get_version());
-
- // read
- bufferlist bl;
- got = store->read(op->get_oid(),
- st.st_size, 0,
- bl);
- assert(got == st.st_size);
-
- // reply
- MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
- reply->set_result(0);
- reply->set_data(bl);
- reply->set_length(got);
- reply->set_offset(0);
-
- messenger->send_message(reply, op->get_asker());
- }
- //unlock_object(op->get_oid());
+
+ dout(7) << "rep_pull on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
+
+ // get object size
+ struct stat st;
+ int r = store->stat(op->get_oid(), &st);
+ assert(r == 0);
+
+ // check version
+ version_t v = 0;
+ store->getattr(op->get_oid(), "version", &v, sizeof(v));
+ assert(v >= op->get_version());
+
+ // read
+ bufferlist bl;
+ got = store->read(op->get_oid(),
+ st.st_size, 0,
+ bl);
+ assert(got == st.st_size);
+
+ // reply
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
+ reply->set_result(0);
+ reply->set_data(bl);
+ reply->set_length(got);
+ reply->set_offset(0);
+ reply->set_version(v);
+
+ messenger->send_message(reply, op->get_asker());
+
delete op;
logger->inc("r_pull");
dout(7) << "rep_pull_reply " << hex << o << dec << " v " << v << " size " << op->get_length() << endl;
- PGPeer *p = pull_ops[op->get_tid()];
- PG *pg = p->pg;
- assert(p); // FIXME: how will this work?
- assert(p->is_pulling(o));
- assert(p->pulling_version(o) == v);
+ PG::ObjectInfo oi = pull_ops[op->get_tid()];
+ assert(v <= op->get_version());
+
+ PG *pg = get_pg(op->get_pg());
+ assert(pg);
+ assert(pg->objects_pulling.count(oi.oid));
// write it and add it to the PG
store->write(o, op->get_length(), 0, op->get_data(), true);
- p->pg->add_object(store, o);
-
+ store->collection_add(pg->get_pgid(), o);
store->setattr(o, "version", &v, sizeof(v));
// close out pull op.
pull_ops.erase(op->get_tid());
+ pg->objects_pulling.erase(o);
- pg->pulled(o, v, p);
+ // bottom of queue?
+ map<version_t,PG::ObjectInfo>::iterator bottom = pg->recovery_queue.begin();
+ if (bottom->first == oi.version)
+ pg->info.last_complete = bottom->first;
+ pg->recovery_queue.erase(oi.version);
// now complete?
- if (pg->objects_missing.empty()) {
- pg->mark_complete(osdmap->get_version());
-
- // distribute new last_any_complete
- dout(7) << " " << *pg << " now complete, updating last_any_complete on peers" << endl;
- for (map<int,PGPeer*>::iterator it = pg->peers.begin();
- it != pg->peers.end();
- it++) {
- PGPeer *p = it->second;
- messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), false, osdmap->get_version()),
- MSG_ADDR_OSD(p->get_peer()));
- }
+ if (pg->recovery_queue.empty()) {
+ assert(pg->info.last_complete == pg->info.last_update);
+
+ // tell primary?
+ dout(7) << " " << *pg << " recovery complete, telling primary" << endl;
+ list<PG::PGInfo> ls;
+ ls.push_back(pg->info);
+ messenger->send_message(new MOSDPGNotify(osdmap->get_epoch(),
+ ls),
+ MSG_ADDR_OSD(pg->get_primary()));
+ } else {
+ // more?
+ pg->do_recovery();
}
// finish waiters
if (pg->waiting_for_missing_object.count(o))
take_waiters(pg->waiting_for_missing_object[o]);
- // more?
- do_recovery(pg);
-
delete op;
}
-// push
-void OSD::pg_push(PG *pg, int maxops)
-{
- int ops = pg->num_active_ops();
-
- dout(7) << "pg_push pg " << hex << pg->get_pgid() << dec << " " << pg->objects_unrep.size() << " objects, " << ops << "/" << maxops << " active ops" << endl;
-
- while (ops < maxops) {
- object_t oid;
- if (!pg->get_next_push(oid)) {
- dout(7) << "pg_push done " << *pg << endl;
- break;
- }
-
- push_replica(pg, oid);
- ops++;
- }
-}
-
-void OSD::push_replica(PG *pg, object_t oid)
-{
- version_t v = 0;
- store->getattr(oid, "version", &v, sizeof(v));
- assert(v > 0);
-
- set<int>& peers = pg->objects_unrep[oid];
-
- // load object content
- struct stat st;
- store->stat(oid, &st);
- bufferlist bl;
- store->read(oid, st.st_size, 0, bl);
- assert(bl.length() == st.st_size);
-
- dout(7) << "push_replica " << hex << oid << dec << " v " << v << " to osds " << peers << " size " << st.st_size << endl;
-
- for (set<int>::iterator pit = peers.begin();
- pit != peers.end();
- pit++) {
- PGPeer *p = pg->get_peer(*pit);
- assert(p);
-
- // add to list
- pg->pushing(oid, v, p);
-
- // send op
- __uint64_t tid = ++last_tid;
- MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(),
- oid, pg->get_pgid(),
- osdmap->get_version(),
- OSD_OP_REP_PUSH);
- op->set_version(v);
- op->set_pg_role(-1); // whatever, not 0
-
- // include object content
- //op->set_data(bl); // no no bad, will modify bl
- op->get_data() = bl; // _copy_ bufferlist, we may have multiple destinations!
- op->set_length(st.st_size);
- op->set_offset(0);
-
- messenger->send_message(op, MSG_ADDR_OSD(*pit));
-
- // register
- push_ops[tid] = p;
- }
-
-}
-
-void OSD::op_rep_push(MOSDOp *op)
-{
- //lock_object(op->get_oid());
- {
- dout(7) << "rep_push on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
-
- PG *pg = get_pg(op->get_pg());
- assert(pg);
-
- // exists?
- if (store->exists(op->get_oid())) {
- store->truncate(op->get_oid(), 0);
-
- version_t ov = 0;
- store->getattr(op->get_oid(), "version", &ov, sizeof(ov));
- assert(ov <= op->get_version());
- }
-
- logger->inc("r_push");
- logger->inc("r_pushb", op->get_length());
-
- // write out buffers
- int r = store->write(op->get_oid(),
- op->get_length(), 0,
- op->get_data(),
- false); // FIXME
- pg->add_object(store, op->get_oid());
- assert(r >= 0);
-
- // set version
- version_t v = op->get_version();
- store->setattr(op->get_oid(), "version", &v, sizeof(v));
-
- // reply
- MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
- messenger->send_message(reply, op->get_asker());
-
- }
- //unlock_object(op->get_oid());
- delete op;
-}
-
-void OSD::op_rep_push_reply(MOSDOpReply *op)
-{
- object_t oid = op->get_oid();
- version_t v = op->get_version();
-
- dout(7) << "rep_push_reply " << hex << oid << dec << endl;
-
- PGPeer *p = push_ops[op->get_tid()];
- PG *pg = p->pg;
- assert(p); // FIXME: how will this work?
- assert(p->is_pushing(oid));
- assert(p->pushing_version(oid) == v);
-
- // close out push op.
- push_ops.erase(op->get_tid());
- pg->pushed(oid, v, p);
-
- if (p->is_complete()) {
- dout(7) << " telling replica they are complete" << endl;
- messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()),
- MSG_ADDR_OSD(p->get_peer()));
- }
-
- // anybody waiting on this object?
- if (pg->waiting_for_clean_object.count(oid) &&
- pg->objects_unrep.count(oid) == 0) {
- dout(7) << "kicking waiter on now replicated object " << hex << oid << dec << endl;
- take_waiters(pg->waiting_for_clean_object[oid]);
- pg->waiting_for_clean_object.erase(oid);
- }
-
- // more?
- do_recovery(pg);
-
- delete op;
-}
-
-
-// clean
-
-void OSD::pg_clean(PG *pg, int maxops)
-{
- int ops = pg->num_active_ops();
-
- dout(7) << "pg_clean pg " << hex << pg->get_pgid() << dec << " " << pg->objects_stray.size() << " stray, " << ops << "/" << maxops << " active ops" << endl;
-
- while (ops < maxops) {
- object_t oid;
- if (!pg->get_next_remove(oid)) {
- dout(7) << "pg_clean done " << *pg << endl;
- break;
- }
-
- remove_replica(pg, oid);
- ops++;
- }
-}
-
-void OSD::remove_replica(PG *pg, object_t oid)
-{
- dout(7) << "remove_replica " << hex << oid << dec << endl;//" v " << v << " from osd" << p->get_peer() << endl;
-
- map<int,version_t>& stray = pg->objects_stray[oid];
- for (map<int, version_t>::iterator it = stray.begin();
- it != stray.end();
- it++) {
- PGPeer *p = pg->get_peer(it->first);
- assert(p);
- const version_t v = it->second;
-
- // add to list
- pg->removing(oid, v, p);
-
- // send op
- __uint64_t tid = ++last_tid;
- MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(),
- oid, p->pg->get_pgid(),
- osdmap->get_version(),
- OSD_OP_REP_REMOVE);
- op->set_version(v);
- op->set_pg_role(-1); // whatever, not 0
- messenger->send_message(op, MSG_ADDR_OSD(p->get_peer()));
-
- // register
- remove_ops[tid] = p;
- }
-}
-
-void OSD::op_rep_remove(MOSDOp *op)
-{
- //lock_object(op->get_oid());
- {
- dout(7) << "rep_remove on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
-
- // sanity checks
- assert(store->exists(op->get_oid()));
-
- version_t v = 0;
- store->getattr(op->get_oid(), "version", &v, sizeof(v));
- assert(v == op->get_version());
-
- // remove
- store->collection_remove(op->get_pg(), op->get_oid());
- int r = store->remove(op->get_oid());
- assert(r == 0);
-
- // reply
- messenger->send_message(new MOSDOpReply(op, r, osdmap, true),
- op->get_asker());
- }
- //unlock_object(op->get_oid());
- delete op;
-}
-
-void OSD::op_rep_remove_reply(MOSDOpReply *op)
-{
- object_t oid = op->get_oid();
- version_t v = op->get_version();
- dout(7) << "rep_remove_reply " << hex << oid << dec << endl;
-
- PGPeer *p = remove_ops[op->get_tid()];
- PG *pg = p->pg;
- assert(p); // FIXME: how will this work?
- assert(p->is_removing(oid));
- assert(p->removing_version(oid) == v);
-
- // close out push op.
- remove_ops.erase(op->get_tid());
- pg->removed(oid, v, p);
-
- if (p->is_complete()) {
- dout(7) << " telling replica they are complete" << endl;
- messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()),
- MSG_ADDR_OSD(p->get_peer()));
- }
-
- // more?
- do_recovery(pg);
-
- delete op;
-}
+// op_rep_modify
+// commit (to disk) callback
class C_OSD_RepModifyCommit : public Context {
public:
OSD *osd;
}
}
+// process a modification operation
+
void OSD::op_rep_modify(MOSDOp *op)
{
- // when we introduce unordered messaging.. FIXME
object_t oid = op->get_oid();
+ // check current version
version_t ov = 0;
if (store->exists(oid))
store->getattr(oid, "version", &ov, sizeof(ov));
- if (op->get_old_version() != ov)
+
+ if (op->get_old_version() != ov) {
+ assert(ov < op->get_old_version());
+
+ // FIXME: block until i get the updated version.
dout(0) << "rep_modify old version is " << ov << " msg sez " << op->get_old_version() << endl;
+ }
assert(op->get_old_version() == ov);
// PG
assert(op->get_data().length() == op->get_length());
oncommit = new C_OSD_RepModifyCommit(this, op);
r = apply_write(op, op->get_version(), oncommit);
- if (ov == 0) pg->add_object(store, oid);
+ store->collection_add(pg->get_pgid(), oid);
logger->inc("r_wr");
logger->inc("r_wrb", op->get_length());
r = store->truncate(oid, op->get_offset());
} else assert(0);
+ // update pg version too
+ pg->info.last_update = op->get_version();
+ if (pg->info.last_complete == ov)
+ pg->info.last_complete = op->get_version();
+
if (oncommit) {
// ack
MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false);
// REGULAR OP (non-replication)
// is our map version up to date?
- if (op->get_map_version() > osdmap->get_version()) {
+ if (op->get_map_epoch() > osdmap->get_epoch()) {
// op's is newer
- dout(7) << "op map " << op->get_map_version() << " > " << osdmap->get_version() << endl;
+ dout(7) << "op map " << op->get_map_epoch() << " > " << osdmap->get_epoch() << endl;
wait_for_new_map(op);
return;
}
if (acting_primary != whoami) {
if (acting_primary >= 0) {
- dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
+ dout(7) << " acting primary is " << acting_primary
+ << ", forwarding" << endl;
messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
logger->inc("fwd");
} else {
// proxy?
if (!pg) {
- dout(7) << "hit non-existent pg " << hex << op->get_pg() << dec << ", waiting" << endl;
+ dout(7) << "hit non-existent pg " << hex << pgid << dec
+ << ", waiting" << endl;
waiting_for_pg[pgid].push_back(op);
return;
}
else {
dout(7) << "handle_op " << op << " in " << *pg << endl;
-
+
// must be peered.
- if (!pg->is_peered()) {
- dout(7) << "op_write " << *pg << " not peered (yet)" << endl;
- pg->waiting_for_peered.push_back(op);
+ if (!pg->is_active()) {
+ dout(7) << "op_write " << *pg << " not active (yet)" << endl;
+ pg->waiting_for_active.push_back(op);
return;
}
const object_t oid = op->get_oid();
- if (!pg->is_complete( osdmap->get_version() )) {
+ if (!pg->is_complete()) {
// consult PG object map
if (pg->objects_missing.count(oid)) {
// need to pull
- dout(7) << "need to pull object " << hex << oid << dec << endl;
+ version_t v = pg->objects_missing[oid];
+ dout(7) << "need to pull object " << hex << oid << dec
+ << " v " << v << endl;
if (!pg->objects_pulling.count(oid))
- pull_replica(pg, oid);
+ pull_replica(pg, pg->recovery_queue[v]);
pg->waiting_for_missing_object[oid].push_back(op);
return;
}
}
- if (!pg->is_clean() &&
- (op->get_op() == OSD_OP_WRITE ||
- op->get_op() == OSD_OP_TRUNCATE ||
- op->get_op() == OSD_OP_DELETE)) {
- // exists but not replicated?
- if (pg->objects_unrep.count(oid)) {
- dout(7) << "object " << hex << oid << dec << " in " << *pg
- << " exists but not clean" << endl;
- pg->waiting_for_clean_object[oid].push_back(op);
- if (pg->objects_pushing.count(oid) == 0)
- push_replica(pg, oid);
- return;
- }
-
- // just stray?
- // FIXME: this is a bit to aggressive; includes inactive peers
- if (pg->objects_stray.count(oid)) {
- dout(7) << "object " << hex << oid << dec << " in " << *pg
- << " dne but is not clean" << endl;
- pg->waiting_for_clean_object[oid].push_back(op);
- if (pg->objects_removing.count(oid) == 0)
- remove_replica(pg, oid);
- return;
- }
- }
+ // okay!
}
} else {
// REPLICATION OP
if (pg) {
- dout(7) << "handle_rep_op " << op << " in " << *pg << endl;
+ dout(7) << "handle_rep_op " << op
+ << " in " << *pg << endl;
} else {
- dout(7) << "handle_rep_op " << op << " in pgid " << op->get_pg() << endl;
+ assert(0);
+ dout(7) << "handle_rep_op " << op
+ << " in pgid " << hex << pgid << dec << endl;
}
+
// check osd map
- if (op->get_map_version() != osdmap->get_version()) {
+ if (op->get_map_epoch() != osdmap->get_epoch()) {
// make sure source is still primary
int curprimary = osdmap->get_pg_acting_primary(op->get_pg());
int myrole = osdmap->get_pg_acting_role(op->get_pg(), whoami);
if (curprimary != MSG_ADDR_NUM(op->get_source()) ||
myrole <= 0) {
- dout(5) << "op map " << op->get_map_version() << " != " << osdmap->get_version() << ", primary changed on pg " << hex << op->get_pg() << dec << endl;
+ dout(5) << "op map " << op->get_map_epoch() << " != " << osdmap->get_epoch() << ", primary changed on pg " << hex << op->get_pg() << dec << endl;
MOSDOpReply *fail = new MOSDOpReply(op, -1, osdmap, false);
messenger->send_message(fail, op->get_asker());
return;
} else {
- dout(5) << "op map " << op->get_map_version() << " != " << osdmap->get_version() << ", primary same on pg " << hex << op->get_pg() << dec << endl;
+ dout(5) << "op map " << op->get_map_epoch() << " != " << osdmap->get_epoch() << ", primary same on pg " << hex << op->get_pg() << dec << endl;
}
}
}
-
+
if (g_conf.osd_maxthreads < 1) {
do_op(op); // do it now
} else {
}
/*
- * dequeue called in worker thread, without osd_lock
+ * NOTE: dequeue called in worker thread, without osd_lock
*/
void OSD::dequeue_op(object_t oid)
{
op = ls.front();
ls.pop_front();
- dout(10) << "dequeue_op " << hex << oid << dec << " op " << op << ", " << ls.size() << " / " << (pending_ops-1) << " more pending" << endl;
+ dout(10) << "dequeue_op " << hex << oid << dec << " op " << op << ", "
+ << ls.size() << " / " << (pending_ops-1) << " more pending" << endl;
if (ls.empty())
op_queue.erase(oid);
// do it
do_op(op);
- // unlock
+ // unlock oid
unlock_object(oid);
// finish
-/*
- * do an op
- *
- * object lock may be held (if multithreaded)
+/** do_op - do an op
+ * object lock will be held (if multithreaded)
* osd_lock NOT held.
*/
void OSD::do_op(MOSDOp *op)
case OSD_OP_REP_PULL:
op_rep_pull(op);
break;
- case OSD_OP_REP_PUSH:
- op_rep_push(op);
- break;
- case OSD_OP_REP_REMOVE:
- op_rep_remove(op);
- break;
// replica ops
case OSD_OP_REP_WRITE:
op_stat(op);
break;
case OSD_OP_WRITE:
+ case OSD_OP_ZERO:
case OSD_OP_DELETE:
case OSD_OP_TRUNCATE:
op_modify(op);
dout(7) << " issue_replica_op in " << *pg << " o " << hex << oid << dec << " to osd" << osd << endl;
- // forward the write
+ // forward the write/update/whatever
__uint64_t tid = ++last_tid;
MOSDOp *wr = new MOSDOp(tid,
messenger->get_myaddr(),
oid,
pg->get_pgid(),
- osdmap->get_version(),
+ osdmap->get_epoch(),
100+op->get_op());
wr->get_data() = op->get_data(); // copy bufferlist
wr->set_length(op->get_length());
class C_OSD_WriteCommit : public Context {
public:
OSD *osd;
- OSDReplicaOp *repop;
- C_OSD_WriteCommit(OSD *o, OSDReplicaOp *op) : osd(o), repop(op) {}
+ OSD::OSDReplicaOp *repop;
+ C_OSD_WriteCommit(OSD *o, OSD::OSDReplicaOp *op) : osd(o), repop(op) {}
void finish(int r) {
osd->op_modify_commit(repop);
}
char *opname = 0;
if (op->get_op() == OSD_OP_WRITE) opname = "op_write";
+ if (op->get_op() == OSD_OP_ZERO) opname = "op_zero";
if (op->get_op() == OSD_OP_DELETE) opname = "op_delete";
if (op->get_op() == OSD_OP_TRUNCATE) opname = "op_truncate";
- //lock_object(oid);
+ // version? clean?
+ version_t ov = 0; // 0 == dne (yet)
+ store->getattr(oid, "version", &ov, sizeof(ov));
+
+ //version_t nv = messenger->get_lamport();//op->get_lamport_recv_stamp();
+ version_t nv = ov + 1; //FIXME later
+
+ if (nv <= ov)
+ cerr << opname << " " << hex << oid << dec << " ov " << ov << " nv " << nv
+ << " ... wtf? msg sent " << op->get_lamport_send_stamp()
+ << " recv " << op->get_lamport_recv_stamp() << endl;
+ assert(nv > ov);
+
+ dout(12) << " " << opname << " " << hex << oid << dec << " v " << nv << " off " << op->get_offset() << " len " << op->get_length() << endl;
+
+ // issue replica writes
+ OSDReplicaOp *repop = new OSDReplicaOp(op, nv, ov);
+ repop->start = g_clock.now();
+ repop->waitfor_ack[0] = whoami; // will need local ack, commit
+ repop->waitfor_commit[0] = whoami;
+
+ pg_t pgid = op->get_pg();
+ PG *pg;
+ osd_lock.Lock();
+ repop->lock.Lock();
{
- // version? clean?
- version_t ov = 0; // 0 == dne (yet)
- store->getattr(oid, "version", &ov, sizeof(ov));
-
- //version_t nv = messenger->get_lamport();//op->get_lamport_recv_stamp();
- version_t nv = ov + 1; //FIXME later
-
- if (nv <= ov)
- cerr << opname << " " << hex << oid << dec << " ov " << ov << " nv " << nv
- << " ... wtf? msg sent " << op->get_lamport_send_stamp()
- << " recv " << op->get_lamport_recv_stamp() << endl;
- assert(nv > ov);
-
- dout(12) << " " << opname << " " << hex << oid << dec << " v " << nv << " off " << op->get_offset() << " len " << op->get_length() << endl;
-
- // issue replica writes
- OSDReplicaOp *repop = new OSDReplicaOp(op, nv, ov);
- repop->start = g_clock.now();
- repop->waitfor_ack[0] = whoami; // will need local ack, commit
- repop->waitfor_commit[0] = whoami;
-
- PG *pg;
- osd_lock.Lock();
- repop->lock.Lock();
- {
- pg = get_pg(op->get_pg());
- for (unsigned i=1; i<pg->acting.size(); i++) {
- issue_replica_op(pg, repop, pg->acting[i]);
- }
+ pg = get_pg(pgid);
+ for (unsigned i=1; i<pg->acting.size(); i++) {
+ issue_replica_op(pg, repop, pg->acting[i]);
}
- repop->lock.Unlock();
- osd_lock.Unlock();
+ }
+ repop->lock.Unlock();
+ osd_lock.Unlock();
+
+ // pre-ack
+ //MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
+ //messenger->send_message(reply, op->get_asker());
+
+ // do it
+ int r;
+ if (op->get_op() == OSD_OP_WRITE) {
+ // write
+ assert(op->get_data().length() == op->get_length());
+ Context *oncommit = new C_OSD_WriteCommit(this, repop);
+ r = apply_write(op, nv, oncommit);
- // pre-ack
- //MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
- //messenger->send_message(reply, op->get_asker());
+ // put new object in proper collection
+ store->collection_add(pgid, oid); // FIXME : be careful w/ locking
- // do it
- int r;
- if (op->get_op() == OSD_OP_WRITE) {
- // write
- assert(op->get_data().length() == op->get_length());
- Context *oncommit = new C_OSD_WriteCommit(this, repop);
- r = apply_write(op, nv, oncommit);
-
- // put new object in proper collection
- if (ov == 0)
- pg->add_object(store, oid); // FIXME : be careful w/ locking
-
- get_repop(repop);
- assert(repop->waitfor_ack.count(0));
- repop->waitfor_ack.erase(0);
- put_repop(repop);
-
- logger->inc("c_wr");
- logger->inc("c_wrb", op->get_length());
- }
- else if (op->get_op() == OSD_OP_TRUNCATE) {
- // truncate
- r = store->truncate(oid, op->get_offset());
- get_repop(repop);
- assert(repop->waitfor_ack.count(0));
- assert(repop->waitfor_commit.count(0));
- repop->waitfor_ack.erase(0);
- repop->waitfor_commit.erase(0);
- put_repop(repop);
- }
- else if (op->get_op() == OSD_OP_DELETE) {
- // delete
- pg->remove_object(store, op->get_oid()); // be careful with locking
- r = store->remove(oid);
- get_repop(repop);
- assert(repop->waitfor_ack.count(0));
- assert(repop->waitfor_commit.count(0));
- repop->waitfor_ack.erase(0);
- repop->waitfor_commit.erase(0);
- put_repop(repop);
- }
- else assert(0);
+ get_repop(repop);
+ assert(repop->waitfor_ack.count(0));
+ repop->waitfor_ack.erase(0);
+ put_repop(repop);
+ logger->inc("c_wr");
+ logger->inc("c_wrb", op->get_length());
+ }
+ else if (op->get_op() == OSD_OP_TRUNCATE) {
+ // truncate
+ r = store->truncate(oid, op->get_offset());
+ get_repop(repop);
+ assert(repop->waitfor_ack.count(0));
+ assert(repop->waitfor_commit.count(0));
+ repop->waitfor_ack.erase(0);
+ repop->waitfor_commit.erase(0);
+ put_repop(repop);
}
+ else if (op->get_op() == OSD_OP_DELETE) {
+ // delete
+ store->collection_remove(pgid, oid); // be careful with locking
+ r = store->remove(oid);
+ get_repop(repop);
+ assert(repop->waitfor_ack.count(0));
+ assert(repop->waitfor_commit.count(0));
+ repop->waitfor_ack.erase(0);
+ repop->waitfor_commit.erase(0);
+ put_repop(repop);
+ }
+ else assert(0);
+
//unlock_object(oid);
}
*
*/
+#ifndef __PG_H
+#define __PG_H
#include "include/types.h"
#include "include/bufferlist.h"
+
+#include "OSDMap.h"
#include "ObjectStore.h"
#include "msg/Messenger.h"
+#include <list>
+using namespace std;
+
#include <ext/hash_map>
using namespace __gnu_cxx;
-struct PGSummary {
- pg_t pgid;
- version_t version,mtime;
- version_t last_epoch_started;
-};
-
-struct PGContentSummary {
- map<object_t,version_t> objects;
-};
-
-
-
-struct PGReplicaInfo {
- int state;
- version_t last_complete;
- version_t last_any_complete;
- map<object_t,version_t> objects; // remote object list
-
- void _encode(bufferlist& blist) {
- blist.append((char*)&state, sizeof(state));
- blist.append((char*)&last_complete, sizeof(last_complete));
- blist.append((char*)&last_any_complete, sizeof(last_any_complete));
- ::_encode(objects, blist);
- //::_encode(deleted, blist);
- }
- void _decode(bufferlist& blist, int& off) {
- blist.copy(off, sizeof(state), (char*)&state);
- off += sizeof(state);
- blist.copy(off, sizeof(last_complete), (char*)&last_complete);
- off += sizeof(last_complete);
- blist.copy(off, sizeof(last_any_complete), (char*)&last_any_complete);
- off += sizeof(last_any_complete);
- ::_decode(objects, blist, off);
- //::_decode(deleted, blist, off);
- }
-
- PGReplicaInfo() : state(0) { }
-};
-
-
-/** PGPeer
- * state associated with non-primary OSDS with PG content.
- * only used by primary.
- */
-
-// by primary
-#define PG_PEER_STATE_ACTIVE 1 // peer has acked our request, sent back PG state.
-#define PG_PEER_STATE_COMPLETE 2 // peer has everything replicated+clean
-
-class PGPeer {
- public:
- class PG *pg;
- private:
- int peer;
- int role;
- int state;
-
- public:
- // peer state
- version_t last_complete;
- map<object_t,version_t> objects; // cleared after pg->is_peered()
-
- private:
- // recovery todo
- set<object_t> missing; // missing or old objects to push
- set<object_t> stray; // extra objects to delete
-
- // recovery in-flight
- map<object_t,version_t> pulling;
- map<object_t,version_t> pushing;
- map<object_t,version_t> removing;
-
- // replication: for pushing replicas (new or old)
- //map<object_t,version_t> writing; // objects i've written to replica
-
- friend class PG;
-
- public:
- PGPeer(class PG *pg, int p, int ro) :
- pg(pg),
- peer(p),
- role(ro),
- state(0) { }
-
- int get_peer() { return peer; }
- int get_role() { return role; }
-
- int get_state() { return state; }
- bool state_test(int m) { return (state & m) != 0; }
- void state_set(int m) { state |= m; }
- void state_clear(int m) { state &= ~m; }
-
- bool is_active() { return state_test(PG_PEER_STATE_ACTIVE); }
- bool is_complete() { return state_test(PG_PEER_STATE_COMPLETE); }
- bool is_recovering() { return is_active() && !is_complete(); }
-
- bool is_missing(object_t o) {
- if (is_complete()) return false;
- return missing.count(o);
- }
- bool is_stray(object_t o) {
- if (is_complete()) return false;
- return stray.count(o);
- }
-
- // actors
- void pull(object_t o, version_t v) { pulling[o] = v; }
- bool is_pulling(object_t o) { return pulling.count(o); }
- version_t pulling_version(object_t o) { return pulling[o]; }
- void pulled(object_t o) { pulling.erase(o); }
-
- void push(object_t o, version_t v) { pushing[o] = v; }
- bool is_pushing(object_t o) { return pushing.count(o); }
- version_t pushing_version(object_t o) { return pushing[o]; }
- void pushed(object_t o) {
- pushing.erase(o);
- missing.erase(o);
- if (missing.empty() && stray.empty())
- state_set(PG_PEER_STATE_COMPLETE);
- }
-
- void remove(object_t o, version_t v) { removing[o] = v; }
- bool is_removing(object_t o) { return removing.count(o); }
- version_t removing_version(object_t o) { return removing[o]; }
- void removed(object_t o) {
- removing.erase(o);
- stray.erase(o);
- if (missing.empty() && stray.empty())
- state_set(PG_PEER_STATE_COMPLETE);
- }
-
- int num_active_ops() {
- return pulling.size() + pushing.size() + removing.size();
- }
-};
-
-
-
-/*
-// a task list for moving objects around
-class PGQueue {
- list<object_t> objects;
- list<version_t> versions;
- list<int> peers;
- int _size;
- public:
- PGQueue() : _size(0) { }
-
- int size() { return _size; }
-
- void push_back(object_t o, version_t v, int p) {
- objects.push_back(o); versions.push_back(v); peers.push_back(p);
- _size++;
- }
- void push_front(object_t o, version_t v, int p) {
- objects.push_front(o); versions.push_front(v); peers.push_front(p);
- _size++;
- }
- bool get_next(object_t& o, version_t& v, int& p) {
- if (objects.empty()) return false;
- o = objects.front(); v = versions.front(); p = peers.front();
- objects.pop_front(); versions.pop_front(); peers.pop_front();
- _size--;
- return true;
- }
- void clear() {
- objects.clear(); versions.clear(); peers.clear();
- _size = 0;
- }
- bool empty() { return objects.empty(); }
-};
-*/
-
+class OSD;
/** PG - Replica Placement Group
*
*/
-// any
-//#define PG_STATE_COMPLETE 1 // i have full PG contents locally
-#define PG_STATE_PEERED 2 // primary: peered with everybody
- // replica: peered with auth
-
-// primary
-#define PG_STATE_CLEAN 8 // peers are fully replicated and clean of stray objects
-
-// replica
-#define PG_STATE_STRAY 32 // i need to announce myself to new auth
-
-
-
class PG {
- protected:
- int whoami; // osd#, purely for debug output, yucka
-
- pg_t pgid;
- int role; // 0 = primary, 1 = replica, -1=none.
- int state; // see bit defns above
- version_t primary_since; // (only defined if role==0)
-
- version_t last_complete; // me
- version_t last_any_complete; // anybody in the set
+public:
- public:
- map<int, PGPeer*> peers; // primary: (soft state) active peers
-
- public:
- vector<int> acting;
- //pginfo_t info;
-
-
- /*
- lamport_t last_complete_stamp; // lamport timestamp of last complete op
- lamport_t last_modify_stamp; // most recent modification
- lamport_t last_clean_stamp;
- */
-
- // pg waiters
- list<class Message*> waiting_for_peered; // any op will hang until peered
- hash_map<object_t, list<class Message*> > waiting_for_missing_object;
- hash_map<object_t, list<class Message*> > waiting_for_clean_object;
+ /** ObjectInfo
+ * summary info about an object (replica)
+ */
+ struct ObjectInfo {
+ object_t oid;
+ version_t version;
+ int osd; // -1 = unknown. if local, osd == whoami.
+ ObjectInfo(object_t o=0, version_t v=0, int os=-1) : oid(o), version(v), osd(os) {}
+ };
+
+ struct PGInfo {
+ pg_t pgid;
+ version_t last_update; // last object version applied.
+ version_t last_complete; // last pg version pg was complete.
+ epoch_t last_epoch_started; // last epoch started.
+ epoch_t last_epoch_finished; // last epoch finished.
+ epoch_t same_primary_since; //
+ PGInfo(pg_t p=0) : pgid(p),
+ last_update(0), last_complete(0),
+ last_epoch_started(0), last_epoch_finished(0),
+ same_primary_since(0) {}
+ };
+
+ struct PGContentSummary {
+ //version_t since;
+ int remote, missing;
+ list<ObjectInfo> ls;
+
+ void _encode(bufferlist& blist) {
+ //blist.append((char*)&since, sizeof(since));
+ blist.append((char*)&remote, sizeof(remote));
+ blist.append((char*)&missing, sizeof(missing));
+ ::_encode(ls, blist);
+ }
+ void _decode(bufferlist& blist, int& off) {
+ //blist.copy(off, sizeof(since), (char*)&since);
+ //off += sizeof(since);
+ blist.copy(off, sizeof(remote), (char*)&remote);
+ off += sizeof(remote);
+ blist.copy(off, sizeof(missing), (char*)&missing);
+ off += sizeof(missing);
+ ::_decode(ls, blist, off);
+ }
+ PGContentSummary() : remote(0), missing(0) {}
+ };
- // recovery
- map<object_t, set<int> > objects_missing; // pull: missing locally
- map<object_t, version_t > objects_missing_v; // stupid
- map<object_t, set<int> > objects_unrep; // push: missing remotely
- map<object_t, map<int, version_t> > objects_stray; // clean: stray (remote) objects
-
- map<object_t, PGPeer*> objects_pulling;
- map<object_t, set<PGPeer*> > objects_pushing;
- map<object_t, map<PGPeer*, version_t> > objects_removing;
- private:
- map<object_t, set<int> >::iterator pull_pos;
- map<object_t, set<int> >::iterator push_pos;
- map<object_t, map<int, version_t> >::iterator remove_pos;
-
- public:
- bool get_next_pull(object_t& oid) {
- if (objects_missing.empty()) return false;
- if (objects_missing.size() == objects_pulling.size()) return false;
-
- if (objects_pulling.empty() || pull_pos == objects_missing.end())
- pull_pos = objects_missing.begin();
- while (objects_pulling.count(pull_pos->first)) {
- pull_pos++;
- if (pull_pos == objects_missing.end())
- pull_pos = objects_missing.begin();
- }
+ /** PGPeer
+ * state associated with non-primary OSDS with PG content.
+ * only used by primary.
+ */
+
+ class PGPeer {
+ public:
+ // bits
+ static const int STATE_INFO = 1; // we have info
+ static const int STATE_SUMMARY = 2; // we have summary
+ static const int STATE_QINFO = 4; // we are querying info|summary.
+ static const int STATE_QSUMMARY = 8; // we are querying info|summary.
+ static const int STATE_WAITING = 16; // peer is waiting for go.
+ static const int STATE_ACTIVE = 32; // peer is active.
+ //static const int STATE_COMPLETE = 64; // peer is complete.
+
+ class PG *pg;
+ private:
+ int peer;
+ int role;
+ int state;
- oid = pull_pos->first;
- pull_pos++;
- return true;
- }
- bool get_next_push(object_t& oid) {
- if (objects_unrep.empty()) return false;
- if (objects_unrep.size() == objects_pushing.size()) return false;
-
- if (objects_pushing.empty() || push_pos == objects_unrep.end())
- push_pos = objects_unrep.begin();
- while (objects_pushing.count(push_pos->first)) {
- push_pos++;
- if (push_pos == objects_unrep.end())
- push_pos = objects_unrep.begin();
- }
+ public:
+ // peer state
+ PGInfo info;
+ PGContentSummary *content_summary;
- oid = push_pos->first;
- push_pos++;
- return true;
- }
- bool get_next_remove(object_t& oid) {
- if (objects_stray.empty()) return false;
- if (objects_stray.size() == objects_removing.size()) return false;
-
- if (objects_removing.empty() || remove_pos == objects_stray.end())
- remove_pos = objects_stray.begin();
- while (objects_removing.count(remove_pos->first)) {
- remove_pos++;
- if (remove_pos == objects_stray.end())
- remove_pos = objects_stray.begin();
+ friend class PG;
+
+ public:
+ PGPeer(class PG *pg, int p, int ro) :
+ pg(pg),
+ peer(p),
+ role(ro),
+ state(0),
+ content_summary(NULL) { }
+ ~PGPeer() {
+ if (content_summary) delete content_summary;
}
- oid = remove_pos->first;
- remove_pos++;
- return true;
- }
-
- void pulling(object_t oid, version_t v, PGPeer *p) {
- p->pull(oid, v);
- objects_pulling[oid] = p;
- }
- void pulled(object_t oid, version_t v, PGPeer *p);
+ int get_peer() { return peer; }
+ int get_role() { return role; }
+
+ int get_state() { return state; }
+ bool state_test(int m) { return (state & m) != 0; }
+ void state_set(int m) { state |= m; }
+ void state_clear(int m) { state &= ~m; }
+
+ bool have_info() { return state_test(STATE_INFO); }
+ bool have_summary() { return state_test(STATE_SUMMARY); }
+
+ bool is_waiting() { return state_test(STATE_WAITING); }
+ bool is_active() { return state_test(STATE_ACTIVE); }
+ bool is_complete() { return have_info() &&
+ info.last_update == info.last_complete; }
+ };
+
- void pushing(object_t oid, version_t v, PGPeer *p) {
- p->push(oid, v);
- objects_pushing[oid].insert(p);
- }
- void pushed(object_t oid, version_t v, PGPeer *p);
+ /*** PG ****/
+public:
+ // any
+ //static const int STATE_SUMMARY = 1; // i have a content summary.
+ static const int STATE_ACTIVE = 2; // i am active. (primary: replicas too)
+ //static const int STATE_COMPLETE = 4; // i am complete.
- void removing(object_t oid, version_t v, PGPeer *p) {
- p->remove(oid, v);
- objects_removing[oid][p] = v;
- }
- void removed(object_t oid, version_t v, PGPeer *p);
+ // primary
+ static const int STATE_CLEAN = 8; // peers are complete, clean of stray replicas.
+
+ // non-primary
+ static const int STATE_STRAY = 16; // i haven't sent notify yet. primary may not know i exist.
+ protected:
+ OSD *osd;
- // log
- map< version_t, set<object_t> > log_write_version_objects;
- map< object_t, set<version_t> > log_write_object_versions;
- map< version_t, set<object_t> > log_delete_version_objects;
- map< object_t, set<version_t> > log_delete_object_versions;
+ // generic state
+public:
+ PGInfo info;
+ PGContentSummary *content_summary;
- void log_write(object_t o, version_t v) {
- log_write_object_versions[o].insert(v);
- log_write_version_objects[v].insert(o);
- }
- void unlog_write(object_t o, version_t v) {
- log_write_object_versions[o].erase(v);
- log_write_version_objects[v].erase(o);
- }
- void log_delete(object_t o, version_t v) {
- log_delete_object_versions[o].insert(v);
- log_delete_version_objects[v].insert(o);
- }
- void unlog_delete(object_t o, version_t v) {
- log_write_object_versions[o].erase(v);
- log_write_version_objects[v].erase(o);
- }
+protected:
+ int role; // 0 = primary, 1 = replica, -1=none.
+ int state; // see bit defns above
+ // primary state
+public:
+ epoch_t last_epoch_started_any;
+ map<int, PGPeer*> peers; // primary: (soft state) active peers
public:
- void plan_recovery(ObjectStore *store, version_t current_version,
- list<PGPeer*>& complete_peers);
+ vector<int> acting;
- void discard_recovery_plan() {
- assert(waiting_for_peered.empty());
- assert(waiting_for_missing_object.empty());
+ // pg waiters
+ list<class Message*> waiting_for_active;
+ hash_map<object_t,
+ list<class Message*> > waiting_for_missing_object;
- objects_missing.clear();
- objects_missing_v.clear();
- objects_unrep.clear();
- objects_stray.clear();
- }
+ // recovery
+ map<object_t, version_t> objects_missing; // objects (versions) i need
+ map<version_t, ObjectInfo> recovery_queue; // objects i need to pull (in order)
+ version_t requested_through;
+ map<object_t, ObjectInfo> objects_pulling; // which objects are currently being pulled
+
+ void plan_recovery();
+ void generate_content_summary();
+ void do_recovery();
public:
- PG(int osd, pg_t p) : whoami(osd), pgid(p),
+ PG(OSD *o, pg_t p) :
+ osd(o),
+ info(p), content_summary(0),
role(0),
- state(0),
- primary_since(0),
- last_complete(0), last_any_complete(0)
- //last_complete_stamp(0), last_modify_stamp(0), last_clean_stamp(0)
- { }
+ state(0)
+ { }
- pg_t get_pgid() { return pgid; }
+ pg_t get_pgid() { return info.pgid; }
int get_primary() { return acting[0]; }
int get_nrep() { return acting.size(); }
- version_t get_last_complete() { return last_complete; }
- //void set_last_complete(version_t v) { last_complete = v; }
- version_t get_last_any_complete() { return last_any_complete; }
- //void set_last_any_complete(version_t v) { last_any_complete = v; }
-
- version_t get_primary_since() { return primary_since; }
- void set_primary_since(version_t v) { primary_since = v; }
-
int get_role() { return role; }
void set_role(int r) { role = r; }
void calc_role(int whoami) {
}
bool is_primary() { return role == 0; }
bool is_residual() { return role < 0; }
-
+
int get_state() { return state; }
bool state_test(int m) { return (state & m) != 0; }
void set_state(int s) { state = s; }
void state_set(int m) { state |= m; }
void state_clear(int m) { state &= ~m; }
- bool is_complete(version_t v) {
- //return state_test(PG_STATE_COMPLETE);
- return v == last_complete;
- }
- bool is_peered() { return state_test(PG_STATE_PEERED); }
- //bool is_crowned() { return state_test(PG_STATE_CROWNED); }
- bool is_clean() { return state_test(PG_STATE_CLEAN); }
- //bool is_flushing() { return state_test(PG_STATE_FLUSHING); }
- bool is_stray() { return state_test(PG_STATE_STRAY); }
-
- void mark_peered() {
- state_set(PG_STATE_PEERED);
- }
- void mark_complete(version_t v) {
- last_complete = v;
- if (v > last_any_complete) last_any_complete = v;
- }
- void mark_any_complete(version_t v) {
- if (v > last_any_complete) last_any_complete = v;
+ bool is_complete() { return info.last_complete == info.last_update; }
+
+ bool is_active() { return state_test(STATE_ACTIVE); }
+ //bool is_complete() { return state_test(STATE_COMPLETE); }
+ bool is_clean() { return state_test(STATE_CLEAN); }
+ bool is_stray() { return state_test(STATE_STRAY); }
+
+ void mark_complete() {
+ info.last_complete = info.last_update;
}
- void mark_clean() {
- state_set(PG_STATE_CLEAN);
+ void mark_active() {
+ state_set(STATE_ACTIVE);
}
int num_active_ops() {
- int o = 0;
- for (map<int, PGPeer*>::iterator it = peers.begin();
- it != peers.end();
- it++)
- o += it->second->num_active_ops();
- return o;
+ return objects_pulling.size();
}
+ // peers
map<int, PGPeer*>& get_peers() { return peers; }
PGPeer* get_peer(int p) {
if (peers.count(p)) return peers[p];
peers.clear();
}
-
- void store(ObjectStore *store) {
- if (!store->collection_exists(pgid))
- store->create_collection(pgid);
- store->collection_setattr(pgid, "role", &role, sizeof(role));
- store->collection_setattr(pgid, "primary_since", &primary_since, sizeof(primary_since));
- store->collection_setattr(pgid, "state", &state, sizeof(state));
- }
- void fetch(ObjectStore *store) {
- store->collection_getattr(pgid, "role", &role, sizeof(role));
- store->collection_getattr(pgid, "primary_since", &primary_since, sizeof(primary_since));
- store->collection_getattr(pgid, "state", &state, sizeof(state));
- }
- void add_object(ObjectStore *store, const object_t oid) {
- store->collection_add(pgid, oid);
- }
- void remove_object(ObjectStore *store, const object_t oid) {
- store->collection_remove(pgid, oid);
+ // pg state storage
+ /*
+ void store() {
+ if (!osd->store->collection_exists(pgid))
+ osd->store->create_collection(pgid);
+ // ***
}
- void list_objects(ObjectStore *store, list<object_t>& ls) {
- store->collection_list(pgid, ls);
+ void fetch() {
+ //osd->store->collection_getattr(pgid, "role", &role, sizeof(role));
+ //osd->store->collection_getattr(pgid, "primary_since", &primary_since, sizeof(primary_since));
+ //osd->store->collection_getattr(pgid, "state", &state, sizeof(state));
}
- void scan_local_objects(map<object_t, version_t>& local_objects, ObjectStore *store) {
- list<object_t> olist;
- local_objects.clear();
- list_objects(store,olist);
- for (list<object_t>::iterator it = olist.begin();
- it != olist.end();
- it++) {
- version_t v = 0;
- store->getattr(*it,
- "version",
- &v, sizeof(v));
- local_objects[*it] = v;
- cout << " o " << hex << *it << dec << " v " << v << endl;
- }
- }
+ void list_objects(list<object_t>& ls) {
+ osd->store->collection_list(pgid, ls);
+ }*/
+};
-};
+inline ostream& operator<<(ostream& out, PG::ObjectInfo& oi)
+{
+ return out << "object[" << hex << oi.oid << dec
+ << " v " << oi.version
+ << " osd" << oi.osd
+ << "]";
+}
+inline ostream& operator<<(ostream& out, PG::PGInfo& pgi)
+{
+ return out << "pgi(" << hex << pgi.pgid << dec
+ << " v " << pgi.last_update << "/" << pgi.last_complete
+ << " e " << pgi.last_epoch_started << "/" << pgi.last_epoch_finished
+ << ")";
+}
inline ostream& operator<<(ostream& out, PG& pg)
{
- out << "pg[" << hex << pg.get_pgid() << dec << " " << pg.get_role();
- //if (pg.is_complete()) out << " complete";
- if (pg.is_peered()) out << " peered";
+ out << "pg[" << pg.info
+ << " " << pg.get_role();
+ if (pg.is_active()) out << " active";
if (pg.is_clean()) out << " clean";
- out << " lc=" << pg.get_last_complete();
+ if (pg.is_stray()) out << " stray";
out << "]";
return out;
}
+
+#endif