ebofs.o\
osd/OSD.o
+OSDC_OBJS= \
+ client/Filer.o\
+ client/ObjectCacher.o\
+ client/Objecter.o
+
COMMON_OBJS= \
msg/Messenger.o\
msg/Dispatcher.o\
msg/HostMonitor.o\
- client/Filer.o\
- client/FileCache.o\
- client/ObjectCacher.o\
- client/Objecter.o\
mds/MDCluster.o\
common/Logger.o\
common/Clock.o\
common/Timer.o\
config.o
+
CLIENT_OBJS= \
+ client/FileCache.o\
client/Client.o\
client/SyntheticClient.o\
client/Trace.o
cosd: cosd.cc osd.o msg/NewMessenger.o common.o
${CC} ${CFLAGS} ${MPILIBS} $^ -o $@
-cfuse: cfuse.cc client.o client/fuse.o msg/NewMessenger.o common.o
+cmds: cmds.cc mds.o osdc.o msg/NewMessenger.o common.o
+ ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@
+
+cfuse: cfuse.cc client.o osdc.o client/fuse.o msg/NewMessenger.o common.o
${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
# synthetic workload
-fakesyn: fakesyn.cc mds.o client.o osd.o msg/FakeMessenger.o common.o
+fakesyn: fakesyn.cc mds.o client.o osd.o osdc.o msg/FakeMessenger.o common.o
${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
-mpisyn: mpisyn.cc mds.o client.o osd.o msg/MPIMessenger.cc common.o
+mpisyn: mpisyn.cc mds.o client.o osd.o osdc.o msg/MPIMessenger.cc common.o
${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
-tcpsyn: tcpsyn.cc mds.o client.o osd.o ${TCP_OBJS} common.o
+tcpsyn: tcpsyn.cc mds.o client.o osd.o osdc.o ${TCP_OBJS} common.o
${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
-newsyn: newsyn.cc mds.o client.o osd.o msg/NewMessenger.o common.o
+newsyn: newsyn.cc mds.o client.o osd.o osdc.o msg/NewMessenger.o common.o
${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
# + obfs
# libceph
-libceph.o: client/ldceph.o client/Client.o ${TCP_OBJS} ${COMMON_OBJS} ${SYN_OBJS}
+libceph.o: client/ldceph.o client/Client.o ${TCP_OBJS} ${COMMON_OBJS} ${SYN_OBJS} ${OSDC_OBJS}
ld -i $^ -o $@
bench/mdtest/mdtest.o: bench/mdtest/mdtest.c
ebofs.o: ${EBOFS_OBJS}
ld -i -o $@ $^
-client.o: ${CLIENT_OBJS}
+client.o: ${CLIENT_OBJS}
ld -i -o $@ $^
osd.o: ${OSD_OBJS}
ld -i -o $@ $^
+osdc.o: ${OSDC_OBJS}
+ ld -i -o $@ $^
+
osd_obfs.o: osd/OBFSStore.o osd/OSD.ccosd/PG.o osd/ObjectStore.o osd/FakeStore.o
${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@ ../uofs/uofs.a
-FAST rados paper
+== FAST rados paper
cluster map
+
+
+== rados client nodes
+
why do we want client op ordering?
- simpler logic in objectcacher
- can pipeline lock + write + unlock, etc.
- accept acks from current|prior primary.
- only accept commits from current primary.
-
- need pg map
- to detect primary changes,
- pg crashes
+== todo
+
-- gzip in messenger?
-remaining hard problems
-- how to cope with file size changes and read/write sharing
-- how to choose osd monitor peer sets
-- mds failure recovery (of course)
+- gzip in messenger?
osd/rados
/- add caller to log
/- make modify ops idempotent
- rdlocks
+- test wrnoop
- pg_bit changes
- use pg->info.same_role_since wrt replication ops.
- report crashed pgs?
- recover period for crashed pgs
osdmonitor
+- monitor pgs
- watch osd utilization; adjust overload in cluster map
objecter
/- detect, cope with crashed PGs, wait for map updates, etc.
/- filer read/write: flip offset,size args
- replay ops (in order) after pg crash
-- sequential lock acquisition
objectcacher
-- locking
-- atomic read/write ops
-- flusher
+- ocacher caps transitions vs locks
+- ocacher flushing, pin Objects* while locks held
+- test read locks
reliability
general
- timer needs cancel sets, schedulers need to cancel outstanding events on shutdown
+remaining hard problems
+- how to cope with file size changes and read/write sharing
+- mds failure recovery (of course)
+
crush
- more efficient failure when all/too many osds are down
r = filer->read(in->inode, offset, size, &blist, onfinish);
- if (r == 0) {
- // wait!
- while (!done)
- cond.Wait(client_lock);
- } else {
- // had it cached, apparently!
- assert(r > 0);
- delete onfinish;
- }
+ assert(r >= 0);
+
+ // wait!
+ while (!done)
+ cond.Wait(client_lock);
}
// copy data into caller's char* buf
osd_maxthreads: 2, // 0 == no threading
osd_max_opq: 10,
osd_mkfs: false,
- osd_fake_sync: false,
osd_age: .8,
osd_age_time: 0,
+ osd_heartbeat_interval: 10,
// --- fakestore ---
fakestore_fake_sync: 2, // 2 seconds
int osd_maxthreads;
int osd_max_opq;
bool osd_mkfs;
- bool osd_fake_sync;
float osd_age;
int osd_age_time;
-
+ int osd_heartbeat_interval;
int fakestore_fake_sync;
bool fakestore_fsync;
return (l.epoch == r.epoch) ? (l.version >= r.version):(l.epoch >= r.epoch);
}
inline ostream& operator<<(ostream& out, const eversion_t e) {
- return out << e.epoch << "." << e.version;
+ return out << e.epoch << "'" << e.version;
}
#define OSD_OP_REP_WRUNLOCK (100+OSD_OP_WRUNLOCK)
#define OSD_OP_REP_RDLOCK (100+OSD_OP_RDLOCK)
#define OSD_OP_REP_RDUNLOCK (100+OSD_OP_RDUNLOCK)
+#define OSD_OP_REP_UPLOCK (100+OSD_OP_UPLOCK)
+#define OSD_OP_REP_DNLOCK (100+OSD_OP_DNLOCK)
#define OSD_OP_REP_PULL 30 // whole object read
//#define OSD_OP_REP_PUSH 31 // whole object write
case OSD_OP_TRUNCATE: return "truncate";
case OSD_OP_WRLOCK: return "wrlock";
case OSD_OP_WRUNLOCK: return "wrunlock";
+ case OSD_OP_RDLOCK: return "rdlock";
+ case OSD_OP_RDUNLOCK: return "rdunlock";
+ case OSD_OP_UPLOCK: return "uplock";
+ case OSD_OP_DNLOCK: return "dnlock";
case OSD_OP_WRNOOP: return "wrnoop";
default: assert(0);
}
<< " inst " << m->get_source_inst()
<< " > " << rank.entity_map[m->get_source()]
<< ", WATCH OUT " << *m << endl;
- //rank.entity_map[m->get_source()] = m->get_source_inst();
+ rank.entity_map[m->get_source()] = m->get_source_inst();
}
if (m->get_dest().type() == MSG_ADDR_RANK_BASE) {
assert(i.rank != my_rank); // hrm?
- assert(entity_map.count(a) == 0);
- entity_map[a] = i;
- connect_rank(i);
+ if (entity_map.count(a) == 0 ||
+ entity_map[a] < i) {
+ entity_map[a] = i;
+ connect_rank(i);
+ } else if (entity_map[a] == i) {
+ dout(10) << "mark_up " << a << " inst " << i << " ... knew it" << endl;
+ derr(10) << "mark_up " << a << " inst " << i << " ... knew it" << endl;
+ } else {
+ dout(-10) << "mark_up " << a << " inst " << i << " < " << entity_map[a] << endl;
+ derr(-10) << "mark_up " << a << " inst " << i << " < " << entity_map[a] << endl;
+ }
//if (waiting_for_lookup.count(a))
//lookup(a);
#include "msg/Messenger.h"
#include "msg/Message.h"
-#include "msg/HostMonitor.h"
+//#include "msg/HostMonitor.h"
#include "messages/MGenericMessage.h"
#include "messages/MPing.h"
#include "messages/MPingAck.h"
+#include "messages/MOSDPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDBoot.h"
if (g_conf.osd_remount_at)
g_timer.add_event_after(g_conf.osd_remount_at, new C_Remount(this));
+ next_heartbeat = new C_Heartbeat(this);
+ g_timer.add_event_after(g_conf.osd_heartbeat_interval, next_heartbeat);
+
// init object store
// try in this order:
{
if (threadpool) { delete threadpool; threadpool = 0; }
if (osdmap) { delete osdmap; osdmap = 0; }
- if (monitor) { delete monitor; monitor = 0; }
+ //if (monitor) { delete monitor; monitor = 0; }
if (messenger) { delete messenger; messenger = 0; }
if (logger) { delete logger; logger = 0; }
if (store) { delete store; store = 0; }
}
// monitor
- char s[80];
+ /*
+ char s[80];
sprintf(s, "osd%d", whoami);
string st = s;
monitor = new HostMonitor(messenger, st);
monitor->get_notify().insert(MSG_ADDR_MON(0));
// </hack>
+ */
// log
char name[80];
{
dout(1) << "shutdown, timer has " << g_timer.num_event << endl;
+ if (next_heartbeat) g_timer.cancel_event(next_heartbeat);
+
// finish ops
wait_for_no_ops();
pg_map.clear();
// shut everything else down
- monitor->shutdown();
+ //monitor->shutdown();
messenger->shutdown();
osd_lock.Unlock();
}
+// -------------------------------------
+
+/*
+ * heartbeat
+ *  timer-driven (see C_Heartbeat).  for every pg in which we hold a
+ *  non-primary role and that has not been heartbeated within the last
+ *  osd_heartbeat_interval seconds, ping acting[0] with our current map
+ *  epoch.  each target osd gets at most one ping per pass.  always
+ *  reschedules itself before returning.
+ */
+void OSD::heartbeat()
+{
+  osd_lock.Lock();
+
+  utime_t now = g_clock.now();
+
+  // anything last heartbeated before 'cutoff' is due for a ping
+  utime_t cutoff = now;
+  cutoff.sec_ref() -= g_conf.osd_heartbeat_interval;
+
+  dout(15) << "heartbeat " << now << endl;
+
+  // figure out who to ping (set => one ping per osd)
+  set<int> targets;
+  hash_map<pg_t, PG*>::iterator p = pg_map.begin();
+  while (p != pg_map.end()) {
+    PG *pg = p->second;
+    p++;
+
+    // we want to ping the primary; skip pgs where that's us (or where
+    // we have no role), and pgs with nobody acting.
+    if (pg->get_role() <= 0 || pg->acting.size() < 1)
+      continue;
+
+    // heard from / pinged recently enough?
+    if (!(pg->last_heartbeat < cutoff))
+      continue;
+
+    pg->last_heartbeat = now;
+    pingset_add:
+    targets.insert(pg->acting[0]);
+  }
+
+  // send pings
+  for (set<int>::iterator o = targets.begin(); o != targets.end(); o++) {
+    messenger->send_message(new MOSDPing(osdmap->get_epoch()),
+                            MSG_ADDR_OSD(*o));
+  }
+
+  // schedule next!
+  next_heartbeat = new C_Heartbeat(this);
+  g_timer.add_event_after(g_conf.osd_heartbeat_interval, next_heartbeat);
+
+  osd_lock.Unlock();
+}
+
// --------------------------------------
// dispatch
+/*
+ * share_map
+ *  'who' told us (via a message) that it is at map epoch 'epoch'.  if
+ *  that is older than our osdmap, send it an incremental map.  for osd
+ *  peers we also track the newest epoch we believe they have in
+ *  peer_map_epoch, so the same update is not sent repeatedly.
+ */
+void OSD::share_map(msg_addr_t who, epoch_t epoch)
+{
+  // does client have old map?
+  if (who.is_client() && epoch < osdmap->get_epoch()) {
+    dout(10) << who << " has old map " << epoch << " < " << osdmap->get_epoch() << endl;
+    send_incremental_map(epoch, who, true);
+  }
+
+  // does peer have old map?
+  if (who.is_osd()) {
+    // remember the newest epoch we've seen from this peer
+    {
+      epoch_t &known = peer_map_epoch[who];
+      if (known < epoch)
+        known = epoch;
+    }
+
+    // still older than ours?
+    bool behind = peer_map_epoch[who] < osdmap->get_epoch();
+    if (behind) {
+      dout(10) << who << " has old map " << epoch << " < " << osdmap->get_epoch() << endl;
+      send_incremental_map(epoch, who, true);
+      peer_map_epoch[who] = osdmap->get_epoch();  // so we don't send it again.
+    }
+  }
+}
+
void OSD::dispatch(Message *m)
{
// check clock regularly
- utime_t now = g_clock.now();
+ //utime_t now = g_clock.now();
//dout(-20) << now << endl;
+ // -- don't need lock --
+ switch (m->get_type()) {
+ case MSG_PING:
+ dout(10) << "ping from " << m->get_source() << endl;
+ delete m;
+ return;
+
+ }
+
+
+ // lock!
osd_lock.Lock();
switch (m->get_type()) {
// -- don't need OSDMap --
+ /*
// host monitor
case MSG_PING_ACK:
case MSG_FAILURE_ACK:
monitor->proc_message(m);
break;
+ */
// map and replication
case MSG_OSD_MAP:
break;
- case MSG_PING:
- // take note.
- monitor->host_is_alive(m->get_source());
- handle_ping((MPing*)m);
- break;
// -- need OSDMap --
break;
}
+
+
+
+
// need OSDMap
switch (m->get_type()) {
+
+ case MSG_OSD_PING:
+ // take note.
+ dout(20) << "osdping from " << m->get_source() << endl;
+ share_map(m->get_source(), ((MOSDPing*)m)->map_epoch);
+ break;
case MSG_OSD_PG_NOTIFY:
handle_pg_notify((MOSDPGNotify*)m);
break;
case MSG_OSD_OP:
- monitor->host_is_alive(m->get_source());
handle_op((MOSDOp*)m);
break;
// for replication etc.
case MSG_OSD_OPREPLY:
- monitor->host_is_alive(m->get_source());
handle_op_reply((MOSDOpReply*)m);
break;
op_rep_pull_reply(m);
break;
+ case OSD_OP_REP_WRNOOP:
case OSD_OP_REP_WRITE:
case OSD_OP_REP_TRUNCATE:
case OSD_OP_REP_DELETE:
case OSD_OP_REP_WRUNLOCK:
case OSD_OP_REP_RDLOCK:
case OSD_OP_REP_RDUNLOCK:
+ case OSD_OP_REP_UPLOCK:
+ case OSD_OP_REP_DNLOCK:
{
const pg_t pgid = m->get_pg();
if (pg_map.count(pgid)) {
+/*
void OSD::handle_ping(MPing *m)
{
dout(7) << "got ping, replying" << endl;
m->get_source(), m->get_source_port(), 0);
delete m;
}
+*/
// all the way?
if (advanced && cur == superblock.newest_map) {
// yay!
- activate_map();
+ activate_map(t);
// process waiters
take_waiters(waiting_for_osdmap);
pg->info.last_epoch_started =
pg->info.same_primary_since =
pg->info.same_role_since = osdmap->get_epoch();
- pg->state_set(PG::STATE_ACTIVE);
+ pg->activate(t);
dout(7) << "created " << *pg << endl;
}
pg->info.last_epoch_started =
pg->info.same_primary_since =
pg->info.same_role_since = osdmap->get_epoch();
- pg->state_set(PG::STATE_ACTIVE);
-
+ pg->activate(t);
+
dout(7) << "created " << *pg << endl;
}
}
}
-void OSD::activate_map()
+void OSD::activate_map(ObjectStore::Transaction& t)
{
dout(7) << "activate_map version " << osdmap->get_epoch() << endl;
else if (pg->get_role() == 0 && !pg->is_active()) {
// i am (inactive) primary
pg->build_prior();
- pg->peer(query_map);
+ pg->peer(t, query_map);
}
else if (pg->is_stray()) {
// i am residual|replica
bool OSD::require_current_map(Message *m, epoch_t ep)
{
- int from = MSG_ADDR_NUM(m->get_source());
-
// older map?
if (ep < osdmap->get_epoch()) {
- dout(7) << " from old map epoch " << ep << " < " << osdmap->get_epoch() << endl;
+ dout(7) << "require_current_map epoch " << ep << " < " << osdmap->get_epoch() << endl;
delete m; // discard and ignore.
return false;
}
// newer map?
if (ep > osdmap->get_epoch()) {
- dout(7) << " from newer map epoch " << ep << " > " << osdmap->get_epoch() << endl;
+ dout(7) << "require_current_map epoch " << ep << " > " << osdmap->get_epoch() << endl;
wait_for_new_map(m);
return false;
}
- // down?
- if (osdmap->is_down(from)) {
- dout(7) << " from down OSD osd" << from << ", dropping" << endl;
- // FIXME
- return false;
- }
-
assert(ep == osdmap->get_epoch());
return true;
}
*/
bool OSD::require_same_or_newer_map(Message *m, epoch_t epoch)
{
+ dout(10) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ")" << endl;
+
// newer map?
if (epoch > osdmap->get_epoch()) {
dout(7) << " from newer map epoch " << epoch << " > " << osdmap->get_epoch() << endl;
return false;
}
- // down osd?
- if (m->get_source().is_osd() &&
- osdmap->is_down(m->get_source().num())) {
- if (epoch > osdmap->get_epoch()) {
- dout(7) << "msg from down " << m->get_source()
- << ", waiting for new map" << endl;
- wait_for_new_map(m);
- } else {
- dout(7) << "msg from down " << m->get_source()
- << ", dropping" << endl;
- delete m;
- }
- return false;
- }
-
return true;
}
if (pg->acting[i] == whoami) role = i>0 ? 1:0;
pg->set_role(role);
- dout(10) << "load_pgs loaded " << *pg << endl;
+ dout(10) << "load_pgs loaded " << *pg << " " << pg->log << endl;
}
}
pg->adjust_prior();
// peer
- pg->peer(query_map);
+ pg->peer(t, query_map);
}
_unlock_pg(pgid);
// merge into our own log
pg->merge_log(m->log, m->missing, from);
-
- pg->clean_up_local();
// peer
map< int, map<pg_t,PG::Query> > query_map;
- pg->peer(query_map);
+ pg->peer(t, query_map);
do_queries(query_map);
} else {
pg->merge_log(m->log, m->missing, from);
assert(pg->missing.num_lost() == 0);
- // clean up any stray objects
- pg->clean_up_local();
-
// ok active!
- pg->info.last_epoch_started = osdmap->get_epoch();
pg->info.same_primary_since = m->info.same_primary_since;
- pg->state_set(PG::STATE_ACTIVE);
+ pg->activate(t);
// take any waiters
take_waiters(pg->waiting_for_active);
-
- // initiate any recovery?
- pg->start_recovery();
}
- pg->write_log(t);
unsigned tr = store->apply_transaction(t);
assert(tr == 0);
_remove_pg(pgid);
- // unlock. there shouldn't be any waiters, since we're a stray, and pg is presumably clean.
+ // unlock. there shouldn't be any waiters, since we're a stray, and pg is presumably clean.
assert(pg_lock_waiters.count(pgid) == 0);
_unlock_pg(pgid);
}
assert(pg->missing.loc.count(oid));
int osd = pg->missing.loc[oid];
- dout(7) << "pull " << hex << oid << dec
+ dout(7) << *pg << " pull " << hex << oid << dec
<< " v " << v
<< " from osd" << osd
- << " in " << *pg
<< endl;
// send op
t.write(oid, 0, op->get_length(), op->get_data());
t.setattrs(oid, op->get_attrset());
t.collection_add(pgid, oid);
- unsigned r = store->apply_transaction(t);
- assert(r == 0);
// close out pull op.
pg->objects_pulling.erase(oid);
take_waiters(pg->waiting_for_missing_object[oid]);
// raise last_complete?
+ assert(pg->log.complete_to != pg->log.log.end());
while (pg->log.complete_to != pg->log.log.end()) {
if (pg->missing.missing.count(pg->log.complete_to->oid)) break;
- pg->info.last_complete = pg->log.complete_to->version;
- pg++;
+ if (pg->info.last_complete < pg->log.complete_to->version)
+ pg->info.last_complete = pg->log.complete_to->version;
+ pg->log.complete_to++;
}
dout(10) << *pg << " last_complete now " << pg->info.last_complete << endl;
- if (pg->missing.num_missing() == 0) {
- assert(pg->info.last_complete == pg->info.last_update);
-
- if (pg->is_primary()) {
- // i am primary
- pg->clean_set.insert(whoami);
- if (pg->is_all_clean()) {
- pg->state_set(PG::STATE_CLEAN);
- pg->clean_replicas();
- }
- } else {
- // tell primary
- dout(7) << *pg << " recovery complete, telling primary" << endl;
- list<PG::Info> ls;
- ls.push_back(pg->info);
- messenger->send_message(new MOSDPGNotify(osdmap->get_epoch(),
- ls),
- MSG_ADDR_OSD(pg->get_primary()));
- }
- } else {
- // continue
- pg->do_recovery();
- }
-
+ // continue
+ pg->do_recovery();
+
+ // apply to disk!
+ t.collection_setattr(pgid, "info", &pg->info, sizeof(pg->info));
+ unsigned r = store->apply_transaction(t);
+ assert(r == 0);
+
_unlock_pg(pgid);
delete op;
MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap->get_epoch(), false);
messenger->send_message(ack, op->get_asker());
oncommit->ack();
+
+ pg->last_heartbeat = g_clock.now();
}
}
// require same or newer map
if (!require_same_or_newer_map(op, op->get_map_epoch())) return;
- // does client have old map?
- if (op->get_source().is_client()) {
- if (op->get_map_epoch() < osdmap->get_epoch()) {
- dout(10) << "handle_op client has old map " << op->get_map_epoch() << " < " << osdmap->get_epoch() << endl;
- send_incremental_map(op->get_map_epoch(), op->get_source(), false);
- }
- }
-
- // does peer have old map?
- if (op->get_source().is_osd()) {
- // remember
- if (peer_map_epoch[op->get_source()] < op->get_map_epoch())
- peer_map_epoch[op->get_source()] = op->get_map_epoch();
-
- // older?
- if (peer_map_epoch[op->get_source()] < osdmap->get_epoch()) {
- dout(10) << "handle_op osd has old map " << op->get_map_epoch() << " < " << osdmap->get_epoch() << endl;
- send_incremental_map(op->get_map_epoch(), op->get_source(), true);
- peer_map_epoch[op->get_source()] = osdmap->get_epoch(); // so we don't send it again.
- }
- }
+ share_map(op->get_source(), op->get_map_epoch());
// crashed?
if (acting_primary < 0) {
break;
// replicated ops
+ case OSD_OP_REP_WRNOOP:
case OSD_OP_REP_WRITE:
case OSD_OP_REP_TRUNCATE:
case OSD_OP_REP_DELETE:
case OSD_OP_REP_WRUNLOCK:
case OSD_OP_REP_RDLOCK:
case OSD_OP_REP_RDUNLOCK:
+ case OSD_OP_REP_UPLOCK:
+ case OSD_OP_REP_DNLOCK:
op_rep_modify(op, pg);
break;
case OSD_OP_STAT:
op_stat(op, pg);
break;
+ case OSD_OP_WRNOOP:
case OSD_OP_WRITE:
case OSD_OP_ZERO:
case OSD_OP_DELETE:
case OSD_OP_WRUNLOCK:
case OSD_OP_RDLOCK:
case OSD_OP_RDUNLOCK:
+ case OSD_OP_UPLOCK:
+ case OSD_OP_DNLOCK:
op_modify(op, pg);
break;
default:
if (repop->can_delete()) {
// adjust peers_complete_thru
if (!repop->pg_complete_thru.empty()) {
- map<int,eversion_t>::iterator p = repop->pg_complete_thru.begin();
- eversion_t min = p->second;
- p++;
- while (p != repop->pg_complete_thru.end()) {
- if (p->second < min) min = p->second;
- p++;
- }
-
pg_t pgid = repop->op->get_pg();
osd_lock.Lock();
PG *pg = get_pg(pgid);
if (pg) {
+ eversion_t min = pg->info.last_complete; // hrm....
+ for (unsigned i=1; i<pg->acting.size(); i++) {
+ if (repop->pg_complete_thru[i] < min) // note: if we haven't heard, it'll be zero, which is what we want.
+ min = repop->pg_complete_thru[i];
+ }
+
if (min > pg->peers_complete_thru) {
- //dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
+ dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
pg->peers_complete_thru = min;
}
//_unlock_pg(pgid);
char dev_path[100];
class ObjectStore *store;
- // failure monitoring
- class HostMonitor *monitor;
+ // heartbeat
+ void heartbeat();
+
+  // timer context: when it fires, run OSD::heartbeat() on the owning osd.
+  // next_heartbeat points at the currently scheduled instance.
+  class C_Heartbeat : public Context {
+  public:
+    C_Heartbeat(OSD *o) : osd(o) {}
+    void finish(int r) { osd->heartbeat(); }
+  private:
+    OSD *osd;
+  } *next_heartbeat;
// global lock
Mutex osd_lock;
// -- osd map --
class OSDMap *osdmap;
list<class Message*> waiting_for_osdmap;
- //map<version_t, OSDMap*> osdmaps;
hash_map<msg_addr_t, epoch_t> peer_map_epoch;
-
+ void share_map(msg_addr_t who, epoch_t epoch);
+
void wait_for_new_map(Message *m);
void handle_osd_map(class MOSDMap *m);
void advance_map(ObjectStore::Transaction& t);
- void activate_map();
+ void activate_map(ObjectStore::Transaction& t);
bool get_map_bl(epoch_t e, bufferlist& bl);
bool get_map(epoch_t e, OSDMap &m);
// messages
virtual void dispatch(Message *m);
- void handle_ping(class MPing *m);
+ //void handle_ping(class MPing *m);
void handle_op(class MOSDOp *m);
void op_read(class MOSDOp *m, PG *pg);
#include "OSD.h"
+#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGRemove.h"
i++) {
if (i->version <= v) break;
log.push_front(*i);
- bottom = i->version;
}
+ bottom = v;
}
while (!log.empty()) {
Entry &e = *log.begin();
- assert(requested_to != log.begin());
assert(complete_to != log.begin());
+ assert(requested_to != log.begin());
// remove from index,
unindex(e);
dout(10) << "merge_log " << olog << " from osd" << fromosd
<< " into " << log << endl;
+ cout << "log" << endl;
+ log.print(cout);
+ cout << "olog" << endl;
+ olog.print(cout);
+
if (log.empty() ||
(olog.bottom > log.top && olog.backlog)) { // e.g. log=(0,20] olog=(40,50]+backlog)
// i'm missing everything after old log.top.
// take the whole thing.
log.log.swap(olog.log);
- log.top = olog.top;
- log.bottom = olog.bottom;
- log.backlog = olog.backlog;
+
+ info.last_update = log.top = olog.top;
+ info.log_bottom = log.bottom = olog.bottom;
+ info.log_backlog = log.backlog = olog.backlog;
log.index();
dout(10) << "merge_log result " << log << " " << missing << endl;
+ log.print(cout);
+ dout(10) << "missing " << hex << missing.missing << dec << endl;
return;
}
list<Log::Entry>::iterator to = olog.log.end();
list<Log::Entry>::iterator from = olog.log.end();
while (1) {
+ if (from == olog.log.begin()) break;
from--;
- if (from->version > log.top) {
+ if (from->version < log.top) {
from++;
break;
}
// splice
log.log.splice(log.log.end(),
- log.log, from, to);
+ olog.log, from, to);
- log.top = olog.top;
+ info.last_update = log.top = olog.top;
}
dout(10) << "merge_log result " << log << " " << missing << endl;
+ log.print(cout);
+ dout(10) << "missing " << hex << missing.missing << dec << endl;
}
osd->store->collection_list(info.pgid, olist);
int local = 0;
+ map<eversion_t,Log::Entry> add;
for (list<object_t>::iterator it = olist.begin();
it != olist.end();
it++) {
osd->store->getattr(*it,
"version",
&e.version, sizeof(e.version));
- log.log.push_front(e);
+ add[e.version] = e;
+ }
- // index
- log.index( *log.log.begin() );
+ for (map<eversion_t,Log::Entry>::iterator i = add.begin();
+ i != add.end();
+ i++) {
+ log.log.push_front(i->second);
+ log.index( *log.log.begin() ); // index
}
dout(10) << local << " local objects, "
+ << add.size() << " objects added to backlog, "
<< log.objects.size() << " in pg" << endl;
}
void PG::drop_backlog()
{
- dout(10) << "dropping backlog for " << log << endl;
+ dout(10) << "drop_backlog for " << log << endl;
+ log.print(cout);
assert(log.backlog);
log.backlog = false;
while (!log.log.empty()) {
Log::Entry &e = *log.log.begin();
- if (e.version == log.bottom) break;
- assert(e.version < log.bottom);
+ if (e.version > log.bottom) break;
+ dout(10) << "drop_backlog trimming " << e.version << endl;
log.unindex(e);
log.log.pop_front();
}
}
-void PG::peer(map< int, map<pg_t,Query> >& query_map)
+void PG::peer(ObjectStore::Transaction& t,
+ map< int, map<pg_t,Query> >& query_map)
{
dout(10) << "peer. acting is " << acting
<< ", prior_set is " << prior_set << endl;
eversion_t newest_update = info.last_update;
int newest_update_osd = osd->whoami;
eversion_t oldest_update_needed = info.last_update; // only of acting (current) osd set
- eversion_t peers_complete_thru = info.last_complete;
+ peers_complete_thru = info.last_complete;
for (map<int,Info>::iterator it = peer_info.begin();
it != peer_info.end();
// -- ok, activate!
- info.last_epoch_started = osd->osdmap->get_epoch();
- state_set(PG::STATE_ACTIVE); // i am active!
- dout(10) << "marking active" << endl;
+ activate(t);
clean_set.clear();
if (info.is_clean())
dout(10) << "sending " << m->log << " " << m->missing
<< " to osd" << peer << endl;
+ m->log.print(cout);
+
osd->messenger->send_message(m, MSG_ADDR_OSD(peer));
}
clean_replicas();
}
+ // waiters
osd->take_waiters(waiting_for_active);
}
+/** activate
+ * mark this pg active as of the current osdmap epoch, add the pg info,
+ * log, and local stray-object cleanup to transaction t, and initialize
+ * the log recovery pointers (complete_to, requested_to).  if we are not
+ * yet complete (last_complete != last_update), recovery is started.
+ *
+ * the caller is responsible for applying t.
+ */
+void PG::activate(ObjectStore::Transaction& t)
+{
+  // twiddle pg state
+  state_set(STATE_ACTIVE);
+  state_clear(PG::STATE_STRAY);
+  info.last_epoch_started = osd->osdmap->get_epoch();
+
+  if (role == 0) {    // primary state
+    peers_complete_thru = 0;  // we don't know (yet)!
+  }
+
+  assert(info.last_complete >= log.bottom || log.backlog);
+
+  // write pg info
+  t.collection_setattr(info.pgid, "info", (char*)&info, sizeof(info));
+
+  // write log
+  write_log(t);
+
+  // clean up stray objects
+  clean_up_local(t);
+
+  // init complete pointer
+  if (info.last_complete == info.last_update) {
+    dout(10) << "activate - complete" << endl;
+    // nothing to recover: park both recovery pointers at the end of the log.
+    // (this has to be an assignment; '==' would silently leave complete_to
+    // pointing wherever it was before.)
+    log.complete_to = log.log.end();
+    log.requested_to = log.log.end();
+  } else {
+    dout(10) << "activate - not complete, starting recovery" << endl;
+
+    // init complete_to: first log entry at or past last_complete.
+    // NOTE(review): assumes the log is non-empty whenever we are not
+    // complete; the assert below only fires after the first advance.
+    log.complete_to = log.log.begin();
+    while (log.complete_to->version < info.last_complete) {
+      log.complete_to++;
+      assert(log.complete_to != log.log.end());
+    }
+
+    // start recovery
+    log.requested_to = log.log.begin();
+    do_recovery();
+  }
+}
+
+/** clean_up_local
+ * queue removal (on transaction t) of objects we are storing but
+ * shouldn't be, as determined by the log.  only the newest log entry
+ * for each object is considered.
+ *
+ *  - without a backlog: remove every object whose newest entry is a delete.
+ *  - with a backlog: additionally list the pg's collection and remove
+ *    anything stored there that the log does not mention at all.
+ */
+void PG::clean_up_local(ObjectStore::Transaction& t)
+{
+  dout(10) << "clean_up_local" << endl;
+
+  assert(info.last_update >= log.bottom);  // otherwise we need some help!
+
+  // objects whose newest log entry we have already handled
+  set<object_t> seen;
+
+  if (!log.backlog) {
+    // just scan the log, newest to oldest.
+    for (list<Log::Entry>::reverse_iterator e = log.log.rbegin();
+         e != log.log.rend();
+         e++) {
+      if (!seen.insert(e->oid).second) continue;   // older entry for same object
+
+      // keep old(+missing) objects, just for kicks.
+      if (!e->is_delete()) continue;
+
+      dout(10) << " deleting " << hex << e->oid << dec
+               << " when " << e->version << endl;
+      t.remove(e->oid);
+    }
+    return;
+  }
+
+  // we have a backlog: be thorough.
+  list<object_t> ls;
+  osd->store->collection_list(info.pgid, ls);
+
+  // everything currently stored; whatever is left at the end is a stray.
+  set<object_t> stored(ls.begin(), ls.end());
+
+  for (list<Log::Entry>::reverse_iterator e = log.log.rbegin();
+       e != log.log.rend();
+       e++) {
+    if (!seen.insert(e->oid).second) continue;     // older entry for same object
+
+    if (e->is_delete() && stored.count(e->oid)) {
+      dout(10) << " deleting " << hex << e->oid << dec
+               << " when " << e->version << endl;
+      t.remove(e->oid);
+    }
+
+    // logged (deleted or not), so not a stray.  old objects are just
+    // left alone.. they're missing or whatever.
+    stored.erase(e->oid);
+  }
+
+  for (set<object_t>::iterator o = stored.begin();
+       o != stored.end();
+       o++) {
+    dout(10) << " deleting stray " << hex << *o << dec << endl;
+    t.remove(*o);
+  }
+}
+
/**
* do one recovery op.
// look at log!
Log::Entry *latest = 0;
- while (1) {
- if (log.requested_to == log.log.end()) {
- dout(10) << "already requested everything in log" << endl;
- return false;
- }
+ while (log.requested_to != log.log.end()) {
+ dout(10) << "do_recovery "
+ << log.requested_to->version
+ << (log.requested_to->is_update() ? " update":" delete")
+ << " " << hex << log.requested_to->oid << dec
+ << endl;
+ assert(log.objects.count(log.requested_to->oid));
latest = log.objects[log.requested_to->oid];
+
if (latest->is_update() &&
!objects_pulling.count(latest->oid) &&
- missing.is_missing(latest->oid))
- break;
+ missing.is_missing(latest->oid)) {
+ osd->pull(this, latest->oid, latest->version);
+ return true;
+ }
log.requested_to++;
}
- osd->pull(this, latest->oid, latest->version);
+ // done!
+ assert(missing.num_missing() == 0);
+ assert(info.last_complete == info.last_update);
- return true;
-}
+ if (is_primary()) {
+ // i am primary
+ clean_set.insert(osd->whoami);
+ if (is_all_clean()) {
+ state_set(PG::STATE_CLEAN);
+ clean_replicas();
+ }
+ } else {
+ // tell primary
+ dout(7) << "do_recovery complete, telling primary" << endl;
+ list<PG::Info> ls;
+ ls.push_back(info);
+ osd->messenger->send_message(new MOSDPGNotify(osd->osdmap->get_epoch(),
+ ls),
+ MSG_ADDR_OSD(get_primary()));
+ }
+ return false;
+}
-/** clean_up_local
- * remove any objects that we're storing but shouldn't.
- * as determined by log.
- */
-void PG::clean_up_local()
-{
- dout(10) << "clean_up_local" << endl;
- assert(info.last_update >= log.bottom); // otherwise we need some help!
- set<object_t> did;
- for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
- p != log.log.rend();
- p++) {
- if (!p->is_delete()) continue;
- if (did.count(p->oid)) continue;
- did.insert(p->oid);
-
- if (osd->store->exists(p->oid)) {
- eversion_t ov = 0;
- osd->store->getattr(p->oid, "version", &ov, sizeof(ov));
- if (ov < p->version) {
- dout(10) << " removing " << hex << p->oid << dec
- << " v " << ov << " < " << p->version << endl;
- osd->store->remove(p->oid);
- } else {
- dout(10) << " keeping " << hex << p->oid << dec
- << " v " << ov << " >= " << p->version << endl;
- }
- }
- }
-}
void PG::clean_replicas()
{
void PG::append_log(ObjectStore::Transaction& t, PG::Log::Entry& logentry,
eversion_t trim_to)
{
- // write entry
+ // write entry on disk
bufferlist bl;
bl.append( (char*)&logentry, sizeof(logentry) );
t.write( info.pgid, ondisklog.top, bl.length(), bl );
}
}
log.top = info.last_update;
+ log.index();
+
+ // build missing
+ set<object_t> did;
+ for (list<Log::Entry>::reverse_iterator i = log.log.rbegin();
+ i != log.log.rend();
+ i++) {
+ if (i->version <= info.last_complete) break;
+ if (did.count(i->oid)) continue;
+ did.insert(i->oid);
+
+ if (i->is_delete()) continue;
+
+ eversion_t v;
+ int r = osd->store->getattr(i->oid, "version", &v, sizeof(v));
+ if (r < 0 || v < i->version)
+ missing.add(i->oid, i->version);
+ }
}
hash_set<reqid_t> caller_ops;
// recovery pointers
- bool recovery_valid;
list<Entry>::iterator requested_to; // not inclusive of referenced item
list<Entry>::iterator complete_to; // not inclusive of referenced item
/****/
- IndexedLog() : recovery_valid(false) {}
+ IndexedLog() {}
bool logged_object(object_t oid) {
return objects.count(oid);
void index() {
objects.clear();
+ caller_ops.clear();
for (list<Entry>::iterator i = log.begin();
i != log.end();
i++) {
assert(objects.count(e.oid));
if (objects[e.oid]->version == e.version)
objects.erase(e.oid);
+ caller_ops.erase(e.reqid);
}
// accessors
IndexedLog log;
OndiskLog ondisklog;
Missing missing;
-
+ utime_t last_heartbeat; //
protected:
int role; // 0 = primary, 1 = replica, -1=none.
epoch_t last_epoch_started_any;
eversion_t last_complete_commit;
- protected:
// [primary only] content recovery state
eversion_t peers_complete_thru;
+ protected:
set<int> prior_set; // current+prior OSDs, as defined by last_epoch_started_any.
set<int> stray_set; // non-acting osds that have PG data.
set<int> clean_set; // current OSDs that are clean
list<class Message*> > waiting_for_missing_object;
// recovery
- //eversion_t requested_thru;
map<object_t, eversion_t> objects_pulling; // which objects are currently being pulled
public:
peer_info_requested.clear();
peer_log_requested.clear();
clear_primary_recovery_state();
- peers_complete_thru = 0;
}
public:
void generate_backlog();
void drop_backlog();
- void peer(map< int, map<pg_t,Query> >& query_map);
+ void peer(ObjectStore::Transaction& t, map< int, map<pg_t,Query> >& query_map);
- bool do_recovery();
- void start_recovery() {
- log.recovery_valid = true;
- log.requested_to = log.log.begin();
- log.complete_to = log.log.begin();
+ void activate(ObjectStore::Transaction& t);
- do_recovery();
- }
+ bool do_recovery();
- void clean_up_local();
void clean_replicas();
off_t get_log_write_pos() {
}
+ // pg on-disk content
+ void clean_up_local(ObjectStore::Transaction& t);
+
// pg on-disk state
void write_log(ObjectStore::Transaction& t);
void append_log(ObjectStore::Transaction& t,
{
out << "pg[" << pg.info
<< " r=" << pg.get_role();
+ if (pg.get_role() == 0) out << " pct " << pg.peers_complete_thru;
if (pg.is_active()) out << " active";
if (pg.is_clean()) out << " clean";
if (pg.is_stray()) out << " stray";
Objecter::OSDRead *rd = new Objecter::OSDRead(bl);
file_to_extents(inode, offset, len, rd->extents);
- return objecter->readx(rd, onfinish);
+ return objecter->readx(rd, onfinish) > 0 ? 0:-1;
}
int write(inode_t& inode,
Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl);
file_to_extents(inode, offset, len, wr->extents);
- return objecter->modifyx(wr, onack, oncommit);
+ return objecter->modifyx(wr, onack, oncommit) > 0 ? 0:-1;
}
int zero(inode_t& inode,
Objecter::OSDModify *z = new Objecter::OSDModify(OSD_OP_ZERO);
file_to_extents(inode, offset, len, z->extents);
- return objecter->modifyx(z, onack, oncommit);
+ return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1;
}
// single object.
// make sure we aren't already locking/locked...
- object_t oid = wr->extents.front()->oid;
+ object_t oid = wr->extents.front().oid;
Object *o = 0;
- if (objects.count(oid)) o = get_object(iod, ino);
+ if (objects.count(oid)) o = get_object(oid, ino);
if (!o ||
(o->lock_state != Object::LOCK_WRLOCK &&
o->lock_state != Object::LOCK_WRLOCKING &&
return unclean;
}
+
+
+void ObjectCacher::kick_sync_writers(inodeno_t ino)  // gather the waitfor_wr contexts of every cached object of ino and pass them to finish_contexts
+{
+ if (objects_by_ino.count(ino) == 0) {  // no object set recorded for this inode: log "dne" and do nothing
+ dout(10) << "kick_sync_writers on " << hex << ino << dec << " dne" << endl;
+ return;
+ }
+
+ dout(10) << "kick_sync_writers on " << hex << ino << dec << endl;
+
+ list<Context*> ls;  // waiters collected from all objects of the inode
+
+ set<Object*>& s = objects_by_ino[ino];
+ for (set<Object*>::iterator i = s.begin();
+ i != s.end();
+ i++) {
+ Object *ob = *i;
+
+ ls.splice(ls.begin(), ob->waitfor_wr);  // list splice moves the entries, so ob->waitfor_wr is left empty
+ }
+
+ finish_contexts(ls);  // called only after the scan of s is over; presumably completes each gathered context -- TODO confirm in finish_contexts
+}
+
+void ObjectCacher::kick_sync_readers(inodeno_t ino)  // gather the waitfor_rd contexts of every cached object of ino and pass them to finish_contexts
+{
+ if (objects_by_ino.count(ino) == 0) {  // no object set recorded for this inode: log "dne" and do nothing
+ dout(10) << "kick_sync_readers on " << hex << ino << dec << " dne" << endl;
+ return;
+ }
+
+ dout(10) << "kick_sync_readers on " << hex << ino << dec << endl;
+
+ list<Context*> ls;  // waiters collected from all objects of the inode
+
+ set<Object*>& s = objects_by_ino[ino];
+ for (set<Object*>::iterator i = s.begin();
+ i != s.end();
+ i++) {
+ Object *ob = *i;
+
+ ls.splice(ls.begin(), ob->waitfor_rd);  // list splice moves the entries, so ob->waitfor_rd is left empty
+ }
+
+ finish_contexts(ls);  // called only after the scan of s is over; presumably completes each gathered context -- TODO confirm in finish_contexts
+}
off_t release_set(inodeno_t ino); // returns # of bytes not released (ie non-clean)
+ void kick_sync_writers(inodeno_t ino);
+ void kick_sync_readers(inodeno_t ino);
+
// file functions
void Objecter::scan_pgs(set<pg_t>& changed_pgs, set<pg_t>& down_pgs)
{
+ dout(10) << "scan_pgs" << endl;
+
for (hash_map<pg_t,PG>::iterator i = pg_map.begin();
i != pg_map.end();
i++) {
PG& pg = i->second;
int old = pg.primary;
- if (pg.calc_primary(pgid, osdmap)) {
+ pg.calc_primary(pgid, osdmap);
+
+ if (old != pg.primary) {
if (osdmap->is_down(old)) {
- dout(10) << "scan_pgs pg " << hex << pgid << dec << " went down" << endl;
+ dout(10) << "scan_pgs pg " << hex << pgid << dec
+ << " (" << pg.active_tids << ")"
+ << " primary " << old << " -> " << pg.primary
+ << " (went down)"
+ << endl;
down_pgs.insert(pgid);
} else {
- dout(10) << "scan_pgs pg " << hex << pgid << dec << " changed" << endl;
+ dout(10) << "scan_pgs pg " << hex << pgid << dec
+ << " (" << pg.active_tids << ")"
+ << " primary " << old << " -> " << pg.primary
+ << " (changed)"
+ << endl;
}
changed_pgs.insert(pgid);
}
void Objecter::kick_requests(set<pg_t>& changed_pgs, set<pg_t>& down_pgs)
{
+ dout(10) << "kick_requests changed " << hex << changed_pgs
+ << ", down " << down_pgs << dec
+ << endl;
+
for (set<pg_t>::iterator i = changed_pgs.begin();
i != changed_pgs.end();
i++) {
// WRITE
if (wr->waitfor_ack.count(tid)) {
- // resubmit
if (down_pgs.count(pgid) == 0) {
- dout(0) << "kick_requests resub WRNOOP " << tid << endl;
+ dout(0) << "kick_requests missing ack, resub WRNOOP " << tid << endl;
modifyx_submit(wr, wr->waitfor_ack[tid], true);
} else {
- dout(0) << "kick_requests resub write " << tid << endl;
+ dout(0) << "kick_requests missing ack, resub write " << tid << endl;
modifyx_submit(wr, wr->waitfor_ack[tid]);
}
wr->waitfor_ack.erase(tid);
wr->waitfor_commit.erase(tid);
}
else if (wr->waitfor_commit.count(tid)) {
- // fake it. FIXME.
- dout(0) << "kick_requests faking commit on write " << tid << endl;
- wr->waitfor_ack.erase(tid);
- if (wr->waitfor_ack.empty() && wr->onack) {
- wr->onack->finish(0);
- delete wr->onack;
- wr->onack = 0;
- }
+ dout(0) << "kick_requests missing commit, resub WRNOOP " << tid << endl;
+ modifyx_submit(wr, wr->waitfor_commit[tid], true);
wr->waitfor_commit.erase(tid);
- if (wr->waitfor_commit.empty()) {
- if (wr->oncommit) {
- wr->oncommit->finish(0);
- delete wr->oncommit;
- }
- delete wr;
- }
}
}
handle_osd_read_reply(m);
break;
+ case OSD_OP_WRNOOP:
case OSD_OP_WRITE:
case OSD_OP_ZERO:
case OSD_OP_WRUNLOCK:
case OSD_OP_WRLOCK:
+ case OSD_OP_RDLOCK:
+ case OSD_OP_RDUNLOCK:
+ case OSD_OP_UPLOCK:
+ case OSD_OP_DNLOCK:
handle_osd_modify_reply(m);
break;
tid_t Objecter::read(object_t oid, off_t off, size_t len, bufferlist *bl,
- Context *onfinish)
+ Context *onfinish)
{
OSDRead *rd = new OSDRead(bl);
rd->extents.push_back(ObjectExtent(oid, off, len));
OSD_OP_READ);
m->set_length(ex.length);
m->set_offset(ex.start);
- dout(10) << "readx_submit tid " << last_tid << " to osd" << pg.primary
+ dout(10) << "readx_submit " << rd << " tid " << last_tid
<< " oid " << hex << ex.oid << dec << " " << ex.start << "~" << ex.length
- << " (" << ex.buffer_extents.size() << " buffer fragments)" << endl;
+ << " (" << ex.buffer_extents.size() << " buffer fragments)"
+ << " pg " << hex << ex.pgid << dec
+ << " osd" << pg.primary
+ << endl;
if (pg.primary >= 0)
messenger->send_message(m, MSG_ADDR_OSD(pg.primary), 0);
{
// get pio
tid_t tid = m->get_tid();
+
+ if (op_read.count(tid) == 0) {
+ dout(7) << "handle_osd_read_reply " << tid << " ... stray" << endl;
+ delete m;
+ return;
+ }
+
dout(7) << "handle_osd_read_reply " << tid << endl;
-
- assert(op_read.count(tid));
OSDRead *rd = op_read[ tid ];
op_read.erase( tid );
if (rd->ops.empty()) {
// all done
size_t bytes_read = 0;
- rd->bl->clear();
if (rd->read_data.size()) {
dout(15) << " assembling frags" << endl;
Context *onfinish = rd->onfinish;
dout(7) << " " << bytes_read << " bytes "
- //<< rd->bl->length()
+ << rd->bl->length()
<< endl;
// done
pg.active_tids.insert(last_tid);
// send
- dout(10) << "modifyx_submit op " << wr->op << " tid " << last_tid
- << " osd" << pg.primary << " oid " << hex << ex.oid << dec
- << " " << ex.start << "~" << ex.length << endl;
+ dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << last_tid
+ << " oid " << hex << ex.oid << dec
+ << " " << ex.start << "~" << ex.length
+ << " pg " << hex << ex.pgid << dec
+ << " osd" << pg.primary
+ << endl;
if (pg.primary >= 0)
messenger->send_message(m, MSG_ADDR_OSD(pg.primary), 0);
}
{
// get pio
tid_t tid = m->get_tid();
+
+ if (op_modify.count(tid) == 0) {
+ dout(7) << "handle_osd_modify_reply " << tid << " commit " << m->get_commit() << " ... stray" << endl;
+ delete m;
+ return;
+ }
+
dout(7) << "handle_osd_modify_reply " << tid << " commit " << m->get_commit() << endl;
- assert(op_modify.count(tid));
OSDModify *wr = op_modify[ tid ];
Context *onack = 0;
map<tid_t, ObjectExtent> ops;
map<object_t, bufferlist*> read_data; // bits of data as they come back
- OSDRead(bufferlist *b) : bl(b), onfinish(0) {}
+ OSDRead(bufferlist *b) : bl(b), onfinish(0) {  // b is dereferenced just below, so callers must pass a non-null bufferlist
+ bl->clear();  // empty the destination buffer when the read is created
+ }
};
// generic modify
PG() : primary(-1) {}
- bool calc_primary(pg_t pgid, OSDMap *osdmap) { // return true if change
- int n = osdmap->get_pg_acting_primary(pgid);
- if (n == primary)
- return false;
- primary = n;
- return true;
+ void calc_primary(pg_t pgid, OSDMap *osdmap) { // refresh primary from the osdmap; returns nothing, so a caller that needs change detection must compare against the previous value itself
+ primary = osdmap->get_pg_acting_primary(pgid);
}
};