osd/rados
+- rewrite tcpmessenger
+- osd query to osdmon on startup
+- add pg->info.pg_member_since. wrt replication ops.
+- write log to store
+ - backlog?
- handle down osds
- down recv behavior
- clarify powercycle process
- osd restart
- save pg state in store
-- add pg->info.pg_member_since. wrt replication ops.
-- pg log trim
- - replica feedback to primary
- - primary trim floor to replicas
- osdmap history?
- keep maps on disk?
- incremental maps?
int BlockDevice::open_fd()
{
- return ::open(dev, O_CREAT|O_RDWR|O_SYNC|O_DIRECT, 0);
+ return ::open(dev, O_RDWR|O_SYNC|O_DIRECT, 0);
}
int BlockDevice::open(kicker *idle)
finisher_lock.Unlock();
- ebofs_lock.Lock();
+ //ebofs_lock.Lock(); // um.. why lock this? -sage
finish_contexts(ls, 0);
- ebofs_lock.Unlock();
+ //ebofs_lock.Unlock();
finisher_lock.Lock();
}
-void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
+void Ebofs::apply_write(Onode *on, off_t off, size_t len, bufferlist& bl)
{
ObjectCache *oc = on->get_oc(&bc);
// *** file i/o ***
-bool Ebofs::attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl,
+bool Ebofs::attempt_read(Onode *on, off_t off, size_t len, bufferlist& bl,
Cond *will_wait_on, bool *will_wait_on_bool)
{
- dout(10) << "attempt_read " << *on << " len " << len << " off " << off << endl;
+ dout(10) << "attempt_read " << *on << " " << off << "~" << len << endl;
ObjectCache *oc = on->get_oc(&bc);
// map
}
int Ebofs::read(object_t oid,
- size_t len, off_t off,
+ off_t off, size_t len,
bufferlist& bl)
{
ebofs_lock.Lock();
- dout(7) << "read " << hex << oid << dec << " len " << len << " off " << off << endl;
+ int r = _read(oid, off, len, bl);
+ ebofs_lock.Unlock();
+ return r;
+}
+
+int Ebofs::_read(object_t oid, off_t off, size_t len, bufferlist& bl)
+{
+ dout(7) << "_read " << hex << oid << dec << " " << off << "~" << len << endl;
Onode *on = get_onode(oid);
if (!on) {
- dout(7) << "read " << hex << oid << dec << " len " << len << " off " << off << " ... dne " << endl;
- ebofs_lock.Unlock();
+ dout(7) << "_read " << hex << oid << dec << " " << off << "~" << len << " ... dne " << endl;
return -ENOENT; // object dne?
}
while (1) {
// check size bound
if (off >= on->object_size) {
- dout(7) << "read " << hex << oid << dec << " len " << len << " off " << off << " ... off past eof " << on->object_size << endl;
+ dout(7) << "_read " << hex << oid << dec << " " << off << "~" << len << " ... off past eof " << on->object_size << endl;
r = -ESPIPE; // FIXME better errno?
break;
}
size_t will_read = MIN(off+len, on->object_size) - off;
bool done;
- if (attempt_read(on, will_read, off, bl, &cond, &done))
+ if (attempt_read(on, off, will_read, bl, &cond, &done))
break; // yay
// wait
cond.Wait(ebofs_lock);
if (on->deleted) {
- dout(7) << "read " << hex << oid << dec << " len " << len << " off " << off << " ... object deleted" << endl;
+ dout(7) << "_read " << hex << oid << dec << " " << off << "~" << len << " ... object deleted" << endl;
r = -ENOENT;
break;
}
trim_bc();
- ebofs_lock.Unlock();
-
if (r < 0) return r; // return error,
- dout(7) << "read " << hex << oid << dec << " len " << len << " off " << off << " ... got " << bl.length() << endl;
+ dout(7) << "_read " << hex << oid << dec << " " << off << "~" << len << " ... got " << bl.length() << endl;
return bl.length(); // or bytes read.
}
p != t.ops.end();
p++) {
switch (*p) {
+ case Transaction::OP_READ:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ off_t offset = t.offsets.front(); t.offsets.pop_front();
+ size_t len = t.lengths.front(); t.lengths.pop_front();
+ bufferlist *pbl = t.pbls.front(); t.pbls.pop_front();
+ if (_read(oid, offset, len, *pbl) < 0) {
+ dout(7) << "apply_transaction fail on _read" << endl;
+ r &= bit;
+ }
+ }
+ break;
+
+ case Transaction::OP_STAT:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ struct stat *st = t.psts.front(); t.psts.pop_front();
+ if (_stat(oid, st) < 0) {
+ dout(7) << "apply_transaction fail on _stat" << endl;
+ r &= bit;
+ }
+ }
+ break;
+
+ case Transaction::OP_GETATTR:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ pair<void*,int*> pattrval = t.pattrvals.front(); t.pattrvals.pop_front();
+ if ((*(pattrval.second) = _getattr(oid, attrname, pattrval.first, *(pattrval.second))) < 0) {
+ dout(7) << "apply_transaction fail on _getattr" << endl;
+ r &= bit;
+ }
+ }
+ break;
+
+
case Transaction::OP_WRITE:
{
object_t oid = t.oids.front(); t.oids.pop_front();
dirty_onode(on); // dirty onode!
// apply write to buffer cache
- apply_write(on, length, offset, bl);
+ apply_write(on, offset, length, bl);
// done.
put_onode(on);
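A quick usage sketch of the new read-side transaction ops, assuming an ObjectStore* store and a valid object_t oid are in scope; the Transaction calls mirror the rep_pull hunk further down in this change:

// Sketch only: batch OP_STAT, OP_GETATTR, OP_READ in one transaction,
// the same way rep_pull does later in this change.  4096 is an arbitrary length.
int read_object_meta(ObjectStore *store, object_t oid)
{
  ObjectStore::Transaction t;
  struct stat st;
  bufferlist bl;
  version_t v = 0;
  int vlen = sizeof(v);                  // in: max attr len; out: _getattr result

  t.stat(oid, &st);                      // OP_STAT
  t.getattr(oid, "version", &v, &vlen);  // OP_GETATTR
  t.read(oid, 0, 4096, &bl);             // OP_READ
  return store->apply_transaction(t);    // 0 on success, nonzero on error
}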
int Ebofs::stat(object_t oid, struct stat *st)
{
ebofs_lock.Lock();
- dout(7) << "stat " << hex << oid << dec << endl;
+ int r = _stat(oid,st);
+ ebofs_lock.Unlock();
+ return r;
+}
+
+int Ebofs::_stat(object_t oid, struct stat *st)
+{
+ dout(7) << "_stat " << hex << oid << dec << endl;
Onode *on = get_onode(oid);
- if (!on) {
- ebofs_lock.Unlock();
- return -ENOENT;
- }
+ if (!on) return -ENOENT;
// ??
st->st_size = on->object_size;
put_onode(on);
- ebofs_lock.Unlock();
return 0;
}
int Ebofs::getattr(object_t oid, const char *name, void *value, size_t size)
{
- int r = 0;
ebofs_lock.Lock();
- dout(8) << "getattr " << hex << oid << dec << " '" << name << "' maxlen " << size << endl;
+ int r = _getattr(oid, name, value, size);
+ ebofs_lock.Unlock();
+ return r;
+}
+
+int Ebofs::_getattr(object_t oid, const char *name, void *value, size_t size)
+{
+ dout(8) << "_getattr " << hex << oid << dec << " '" << name << "' maxlen " << size << endl;
Onode *on = get_onode(oid);
- if (!on) {
- ebofs_lock.Unlock();
- return -ENOENT;
- }
+ if (!on) return -ENOENT;
string n(name);
+ int r = 0;
if (on->attr.count(n) == 0) {
r = -1;
} else {
memcpy(value, on->attr[n].data, r );
}
put_onode(on);
- ebofs_lock.Unlock();
return r;
}
block_t start, block_t len,
interval_set<block_t>& alloc,
block_t& old_bfirst, block_t& old_blast);
- void apply_write(Onode *on, size_t len, off_t off, bufferlist& bl);
- bool attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl,
+ void apply_write(Onode *on, off_t off, size_t len, bufferlist& bl);
+ bool attempt_read(Onode *on, off_t off, size_t len, bufferlist& bl,
Cond *will_wait_on, bool *will_wait_on_bool);
// ** finisher **
// object interface
bool exists(object_t);
int stat(object_t, struct stat*);
- int read(object_t, size_t len, off_t off, bufferlist& bl);
+ int read(object_t, off_t off, size_t len, bufferlist& bl);
int write(object_t oid, size_t len, off_t off, bufferlist& bl, bool fsync=true);
int write(object_t oid, size_t len, off_t offset, bufferlist& bl, Context *onsafe);
int truncate(object_t oid, off_t size, Context *onsafe=0);
private:
// private interface -- use if caller already holds lock
+ int _read(object_t oid, off_t off, size_t len, bufferlist& bl);
+ int _stat(object_t oid, struct stat *st);
+ int _getattr(object_t oid, const char *name, void *value, size_t size);
+
bool _write_will_block();
int _write(object_t oid, size_t len, off_t offset, bufferlist& bl);
int _truncate(object_t oid, off_t size);
#include "mds/MDCluster.h"
#include "mds/MDS.h"
#include "osd/OSD.h"
+#include "mds/OSDMonitor.h"
#include "client/Client.h"
#include "client/SyntheticClient.h"
gethostname(hostname,100);
//int pid = getpid();
+ // create mon
+ OSDMonitor *mon = new OSDMonitor(0, new FakeMessenger(MSG_ADDR_MON(0)));
+ mon->init();
+
// create mds
MDS *mds[NUMMDS];
OSD *mdsosd[NUMMDS];
for (int i=0; i<NUMOSD; i++) {
osd[i]->init();
}
+
- // create client
+ // create client(s)
for (int i=0; i<NUMCLIENT; i++) {
client[i]->init();
syn[i]->start_thread();
}
+
+
for (int i=0; i<NUMCLIENT; i++) {
cout << "waiting for synthetic client " << i << " to finish" << endl;
typedef __uint64_t ps_t; // placement seed
typedef __uint64_t pg_t; // placement group
typedef __uint64_t coll_t; // collection id
+typedef __uint64_t epoch_t; // map epoch
+typedef __uint64_t tid_t; // transaction id
#ifdef OBJECT128
typedef lame128_t object_t;
typedef __uint64_t object_t; // object id
#endif
-
#define PG_NONE 0xffffffffffffffffLL
+
+class OSDSuperblock {
+public:
+ __uint64_t fsid; // unique fs id (random number)
+ int whoami; // my role in this fs.
+ epoch_t current_epoch; // most recent epoch
+ epoch_t oldest_map, newest_map; // oldest/newest maps we have.
+ OSDSuperblock(__uint64_t f=0, int w=0) :
+ fsid(f), whoami(w),
+ current_epoch(0), oldest_map(0), newest_map(0) {}
+};
+
+
// new types
-typedef __uint64_t tid_t; // transaction id
class ObjectExtent {
public:
<< ")";
}
-/*
-struct ostat {
- object_t object_id;
- size_t size;
- time_t ctime;
- time_t mtime;
-};
-*/
-
-struct onode_t {
- object_t oid;
- pg_t pgid;
- version_t version;
- size_t size;
- //time_t ctime, mtime;
-};
// client types
#include "MDBalancer.h"
#include "IdAllocator.h"
#include "AnchorTable.h"
-#include "OSDMonitor.h"
-//#include "PGManager.h"
#include "include/filepath.h"
//MDS *g_mds;
-class C_FakeOSDFailure : public Context {
- MDS *mds;
- int osd;
- bool down;
-public:
- C_FakeOSDFailure(MDS *m, int o, bool d) : mds(m), osd(o), down(d) {}
- void finish(int r) {
- mds->fake_osd_failure(osd,down);
- }
-};
-
// cons/des
MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) {
mdlog = new MDLog(this);
balancer = new MDBalancer(this);
anchormgr = new AnchorTable(this);
- osdmonitor = new OSDMonitor(this);
req_rate = 0;
pgmanager = 0;
}*/
- // <HACK set up OSDMap from g_conf>
- osdmap = new OSDMap();
- osdmap->set_pg_bits(g_conf.osd_pg_bits);
- osdmap->inc_epoch(); // = 1
- assert(osdmap->get_epoch() == 1);
-
- if (g_conf.mkfs) osdmap->set_mkfs();
-
- Bucket *b = new UniformBucket(1, 0);
- int root = osdmap->crush.add_bucket(b);
- for (int i=0; i<g_conf.num_osd; i++) {
- osdmap->osds.insert(i);
- b->add_item(i, 1);
- }
-
- for (int i=1; i<5; i++) {
- osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root));
- osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 0));
- osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
- }
-
- if (g_conf.mds_local_osd) {
- // add mds osds, but don't put them in the crush mapping func
- for (int i=0; i<g_conf.num_mds; i++)
- osdmap->osds.insert(i+10000);
- }
-
- if (whoami == 0) {
- // fake osd failures
- for (map<int,float>::iterator i = g_fake_osd_down.begin();
- i != g_fake_osd_down.end();
- i++) {
- dout(0) << "osd" << i->first << " DOWN after " << i->second << endl;
- g_timer.add_event_after(i->second, new C_FakeOSDFailure(this, i->first, 1));
- }
- for (map<int,float>::iterator i = g_fake_osd_out.begin();
- i != g_fake_osd_out.end();
- i++) {
- dout(0) << "osd" << i->first << " OUT after " << i->second << endl;
- g_timer.add_event_after(i->second, new C_FakeOSDFailure(this, i->first, 0));
- }
- }
-
- // </HACK>
+ osdmap = 0;
objecter = new Objecter(messenger, osdmap);
filer = new Filer(objecter);
if (mdstore) { delete mdstore; mdstore = NULL; }
if (mdlog) { delete mdlog; mdlog = NULL; }
if (balancer) { delete balancer; balancer = NULL; }
- if (osdmonitor) { delete osdmonitor; osdmonitor = 0; }
if (idalloc) { delete idalloc; idalloc = NULL; }
if (anchormgr) { delete anchormgr; anchormgr = NULL; }
if (osdmap) { delete osdmap; osdmap = 0; }
int MDS::init()
{
-
- osdmonitor->init();
-
+ // request osd map
+ dout(5) << "requesting osdmap from mon0" << endl;
+ messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP), MSG_ADDR_MON(0));
return 0;
}
// shut down cache
mdcache->shutdown();
+ // tell monitor to die
+ messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), MSG_ADDR_MON(0));
+
// shut down messenger
messenger->shutdown();
}
-void MDS::fake_osd_failure(int osd, bool down)
-{
- if (down) {
- dout(1) << "fake_osd_failure DOWN osd" << osd << endl;
- osdmap->down_osds.insert(osd);
- } else {
- dout(1) << "fake_osd_failure OUT osd" << osd << endl;
- osdmap->out_osds.insert(osd);
- }
- osdmap->inc_epoch();
- bcast_osd_map();
-}
-
-void MDS::bcast_osd_map()
-{
- dout(1) << "bcast_osd_map epoch " << osdmap->get_epoch() << endl;
- assert(get_nodeid() == 0);
- // tell mds
- for (int i=0; i<get_cluster()->get_num_mds(); i++) {
- messenger->send_message(new MOSDMap(osdmap),
- MSG_ADDR_MDS(i));
- }
-
- // tell osds
- set<int> osds;
- osdmap->get_all_osds(osds);
- for (set<int>::iterator it = osds.begin();
- it != osds.end();
- it++) {
- messenger->send_message(new MOSDMap(osdmap),
- MSG_ADDR_OSD(*it));
- }
-
- // tell clients
- for (set<int>::iterator it = mounted_clients.begin();
- it != mounted_clients.end();
- it++) {
- messenger->send_message(new MOSDMap(osdmap),
- MSG_ADDR_CLIENT(*it));
- }
-}
switch (m->get_type()) {
// OSD ===============
+ /*
case MSG_OSD_MKFS_ACK:
handle_osd_mkfs_ack(m);
return;
+ */
case MSG_OSD_OPREPLY:
objecter->handle_osd_op_reply((class MOSDOpReply*)m);
return;
handle_osd_map((MOSDMap*)m);
return;
- case MSG_OSD_GETMAP:
- handle_osd_getmap(m);
- return;
// MDS
case MSG_MDS_SHUTDOWNSTART: // mds0 -> mds1+
// paused?
+ if (!osdmap) {
+ dout(3) << "no osdmap yet, waiting" << endl;
+ mds_paused = true;
+ }
+
if (mds_paused) {
dout(3) << "paused" << endl;
waiting_for_unpause.push_back(new C_MDS_RetryMessage(this, m));
anchormgr->proc_message(m);
break;
- case MDS_PORT_OSDMON:
- osdmonitor->proc_message(m);
- break;
-
case MDS_PORT_CACHE:
mdcache->proc_message(m);
break;
}
-void MDS::handle_osd_getmap(Message *m)
-{
- dout(7) << "osd_getmap from " << MSG_ADDR_NICE(m->get_source()) << endl;
-
- messenger->send_message(new MOSDMap(osdmap),
- m->get_source());
- delete m;
-}
void MDS::handle_osd_map(MOSDMap *m)
if (osdmap) {
dout(3) << "handle_osd_map got osd map epoch " << m->get_epoch() << " > " << osdmap->get_epoch() << endl;
} else {
- dout(3) << "handle_osd_map got osd map epoch " << m->get_epoch() << endl;
+ dout(3) << "handle_osd_map got first osd map epoch " << m->get_epoch() << endl;
+ objecter->osdmap = osdmap = new OSDMap;
+
+ // unpause
+ mds_paused = false;
+ queue_finished(waiting_for_unpause);
}
osdmap->decode(m->get_osdmap());
// kick requests who might be timing out on the wrong osds
// ** FIXME **
+ // tell clients
+ for (set<int>::iterator it = mounted_clients.begin();
+ it != mounted_clients.end();
+ it++) {
+ messenger->send_message(new MOSDMap(osdmap),
+ MSG_ADDR_CLIENT(*it));
+ }
+
} else {
dout(3) << "handle_osd_map ignoring osd map epoch " << m->get_epoch() << " <= " << osdmap->get_epoch() << endl;
}
}
+/*
void MDS::mkfs(Context *onfinish)
{
dout(7) << "mkfs, wiping all OSDs" << endl;
waiting_for_mkfs = 0;
}
}
+*/
// fake out idalloc (reset, pretend loaded)
idalloc->reset();
- //if (pgmanager) pgmanager->mark_open();
// init osds too
- mkfs(new C_MDS_Unpause(this));
- waiting_for_unpause.push_back(new C_MDS_RetryMessage(this, m));
- return;
+ //mkfs(new C_MDS_Unpause(this));
+ //waiting_for_unpause.push_back(new C_MDS_RetryMessage(this, m));
}
}
class Filer;
class AnchorTable;
-class OSDMonitor;
class MDCluster;
class CInode;
class CDir;
Objecter *objecter;
Filer *filer; // for reading/writing to/from osds
AnchorTable *anchormgr;
- OSDMonitor *osdmonitor;
// PGManager *pgmanager;
protected:
public:
void mkfs(Context *onfinish);
void handle_osd_mkfs_ack(Message *m);
- void bcast_osd_map();
- void fake_osd_failure(int osd, bool d);
// messages
void proc_message(Message *m);
#include "OSDMonitor.h"
-#include "MDS.h"
+
#include "osd/OSDMap.h"
#include "msg/Message.h"
#include "messages/MPingAck.h"
#include "messages/MFailure.h"
#include "messages/MFailureAck.h"
+#include "messages/MOSDMap.h"
+#include "messages/MOSDBoot.h"
#include "common/Timer.h"
#include "common/Clock.h"
#include "config.h"
#undef dout
-#define dout(l) if (l<=g_conf.debug) cout << "mds" << mds->get_nodeid() << ".osdmon "
+#define dout(l) if (l<=g_conf.debug) cout << "mon" << whoami << " "
}
};
+class C_OM_FakeOSDFailure : public Context {
+ OSDMonitor *mon;
+ int osd;
+ bool down;
+public:
+ C_OM_FakeOSDFailure(OSDMonitor *m, int o, bool d) : mon(m), osd(o), down(d) {}
+ void finish(int r) {
+ mon->fake_osd_failure(osd,down);
+ }
+};
+
+
+
void OSDMonitor::fake_reorg()
{
if (d > 0) {
dout(1) << "changing OSD map, marking osd" << d-1 << " out" << endl;
- mds->osdmap->mark_out(d-1);
+ osdmap->mark_out(d-1);
}
dout(1) << "changing OSD map, marking osd" << d << " down" << endl;
- mds->osdmap->mark_down(d);
+ osdmap->mark_down(d);
- mds->osdmap->inc_epoch();
+ osdmap->inc_epoch();
d++;
// bcast
- mds->bcast_osd_map();
+ bcast_osd_map();
// do it again?
if (g_conf.num_osd - d > 4 &&
void OSDMonitor::init()
{
+ dout(1) << "init" << endl;
+
+ // <HACK set up OSDMap from g_conf>
+ osdmap = new OSDMap();
+ osdmap->set_pg_bits(g_conf.osd_pg_bits);
+ osdmap->inc_epoch(); // = 1
+ assert(osdmap->get_epoch() == 1);
+
+ if (g_conf.mkfs) osdmap->set_mkfs();
+
+ Bucket *b = new UniformBucket(1, 0);
+ int root = osdmap->crush.add_bucket(b);
+ for (int i=0; i<g_conf.num_osd; i++) {
+ osdmap->osds.insert(i);
+ b->add_item(i, 1);
+ }
- if (mds->get_nodeid() == 0 &&
+ for (int i=1; i<5; i++) {
+ osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root));
+ osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 0));
+ osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
+ }
+
+ if (g_conf.mds_local_osd) {
+ // add mds osds, but don't put them in the crush mapping func
+ for (int i=0; i<g_conf.num_mds; i++)
+ osdmap->osds.insert(i+10000);
+ }
+
+ // </HACK>
+
+
+
+ if (whoami == 0 &&
g_conf.num_osd > 4 &&
g_conf.fake_osdmap_expand) {
dout(1) << "scheduling OSD map reorg at " << g_conf.fake_osdmap_expand << endl;
g_timer.add_event_after(g_conf.fake_osdmap_expand,
new C_OM_Faker(this));
}
-
+
+ if (whoami == 0) {
+ // fake osd failures
+ for (map<int,float>::iterator i = g_fake_osd_down.begin();
+ i != g_fake_osd_down.end();
+ i++) {
+ dout(0) << "osd" << i->first << " DOWN after " << i->second << endl;
+ g_timer.add_event_after(i->second, new C_OM_FakeOSDFailure(this, i->first, 1));
+ }
+ for (map<int,float>::iterator i = g_fake_osd_out.begin();
+ i != g_fake_osd_out.end();
+ i++) {
+ dout(0) << "osd" << i->first << " OUT after " << i->second << endl;
+ g_timer.add_event_after(i->second, new C_OM_FakeOSDFailure(this, i->first, 0));
+ }
+ }
+
+
+ // i'm ready!
+ messenger->set_dispatcher(this);
}
-void OSDMonitor::proc_message(Message *m)
+void OSDMonitor::dispatch(Message *m)
{
switch (m->get_type()) {
case MSG_FAILURE:
case MSG_PING_ACK:
handle_ping_ack((MPingAck*)m);
break;
+
+ case MSG_OSD_GETMAP:
+ handle_osd_getmap(m);
+ return;
+
+ case MSG_OSD_BOOT:
+ handle_osd_boot((MOSDBoot*)m);
+ return;
+
+ case MSG_SHUTDOWN:
+ handle_shutdown(m);
+ return;
+
+ default:
+ dout(0) << "unknown message " << *m << endl;
+ assert(0);
}
}
+void OSDMonitor::handle_shutdown(Message *m)
+{
+ dout(1) << "shutdown from " << m->get_source() << endl;
+ messenger->shutdown();
+ delete m;
+}
+
void OSDMonitor::handle_ping_ack(MPingAck *m)
{
// ...
dout(1) << "osd failure: " << MSG_ADDR_NICE(m->get_failed()) << " from " << MSG_ADDR_NICE(m->get_source()) << endl;
// ack
- mds->messenger->send_message(new MFailureAck(m),
- m->get_source(), m->get_source_port());
+ messenger->send_message(new MFailureAck(m),
+ m->get_source(), m->get_source_port());
delete m;
}
+void OSDMonitor::fake_osd_failure(int osd, bool down)
+{
+ if (down) {
+ dout(1) << "fake_osd_failure DOWN osd" << osd << endl;
+ osdmap->down_osds.insert(osd);
+ } else {
+ dout(1) << "fake_osd_failure OUT osd" << osd << endl;
+ osdmap->out_osds.insert(osd);
+ }
+ osdmap->inc_epoch();
+ bcast_osd_map();
+}
+
+
+void OSDMonitor::handle_osd_boot(MOSDBoot *m)
+{
+ dout(7) << "osd_boot from " << m->get_source() << endl;
+
+ // FIXME: check for reboots, etc.
+ // ...mark up in map...
+
+ messenger->send_message(new MOSDMap(osdmap),
+ m->get_source());
+ delete m;
+}
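The reboot handling above is still a FIXME; a hedged sketch of what filling it in might look like, using only members this change already touches (down_osds, inc_epoch, bcast_osd_map) and leaving out reboot counters and fsid checks:

// Hypothetical sketch of the FIXME in handle_osd_boot: mark a previously
// down osd back up and publish a new epoch.  Not part of this change.
void OSDMonitor::handle_osd_boot(MOSDBoot *m)
{
  int from = MSG_ADDR_NUM(m->get_source());
  dout(7) << "osd_boot from osd" << from << endl;

  if (osdmap->down_osds.count(from)) {
    osdmap->down_osds.erase(from);      // it's back
    osdmap->inc_epoch();
    bcast_osd_map();                    // reaches the booting osd too
  } else {
    // already marked up; just send it the current map
    messenger->send_message(new MOSDMap(osdmap), m->get_source());
  }
  delete m;
}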
+
+void OSDMonitor::handle_osd_getmap(Message *m)
+{
+ dout(7) << "osd_getmap from " << MSG_ADDR_NICE(m->get_source()) << endl;
+
+ messenger->send_message(new MOSDMap(osdmap),
+ m->get_source());
+ delete m;
+}
+
+
+void OSDMonitor::bcast_osd_map()
+{
+ dout(1) << "bcast_osd_map epoch " << osdmap->get_epoch() << endl;
+
+ // tell mds
+ for (int i=0; i<g_conf.num_mds; i++) {
+ messenger->send_message(new MOSDMap(osdmap),
+ MSG_ADDR_MDS(i));
+ }
+
+ // tell osds
+ set<int> osds;
+ osdmap->get_all_osds(osds);
+ for (set<int>::iterator it = osds.begin();
+ it != osds.end();
+ it++) {
+ messenger->send_message(new MOSDMap(osdmap),
+ MSG_ADDR_OSD(*it));
+ }
+}
#include <set>
using namespace std;
-class MDS;
-class Message;
+#include "include/types.h"
+#include "msg/Messenger.h"
-class OSDMonitor {
- MDS *mds;
+class OSDMap;
+class OSDMonitor : public Dispatcher {
+ // me
+ int whoami;
+ Messenger *messenger;
+
+ // maps
+ OSDMap *osdmap;
+ map<epoch_t, OSDMap*> osdmaps;
+
+ // monitoring
map<int,time_t> last_heard_from_osd;
map<int,time_t> last_pinged_osd;
// etc..
set<int> my_osds;
public:
- OSDMonitor(MDS *mds) {
- this->mds = mds;
+ OSDMonitor(int w, Messenger *m) :
+ whoami(w),
+ messenger(m),
+ osdmap(0) {
}
void init();
- void proc_message(Message *m);
+ void dispatch(Message *m);
+ void handle_shutdown(Message *m);
+
+ void handle_osd_boot(class MOSDBoot *m);
+ void handle_osd_getmap(Message *m);
+
void handle_ping_ack(class MPingAck *m);
void handle_failure(class MFailure *m);
+ void bcast_osd_map();
+
+
// hack
+ void fake_osd_failure(int osd, bool down);
void fake_reorg();
};
--- /dev/null
+#ifndef __MOSDBOOT_H
+#define __MOSDBOOT_H
+
+#include "msg/Message.h"
+
+#include "include/types.h"
+
+class MOSDBoot : public Message {
+ OSDSuperblock sb;
+
+ public:
+ MOSDBoot() {}
+ MOSDBoot(OSDSuperblock& s) :
+ Message(MSG_OSD_BOOT),
+ sb(s) {
+ }
+
+ char *get_type_name() { return "oboot"; }
+
+ void encode_payload() {
+ payload.append((char*)&sb, sizeof(sb));
+ }
+ void decode_payload() {
+ int off = 0;
+ payload.copy(off, sizeof(sb), (char*)&sb);
+ off += sizeof(sb);
+ }
+};
+
+#endif
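MOSDBoot ships the superblock as raw bytes, so OSDSuperblock has to stay a flat, fixed-size struct. A minimal sketch of the round trip this (and write_superblock/read_superblock below) relies on, with made-up example values:

// Sketch: OSDSuperblock round-trips through a bufferlist as raw bytes,
// the same append/copy pattern encode_payload/decode_payload use above.
OSDSuperblock sb(12345ULL, 3);          // example fsid, whoami
sb.current_epoch = 1;

bufferlist bl;
bl.append((char*)&sb, sizeof(sb));      // encode

OSDSuperblock sb2;
bl.copy(0, sizeof(sb2), (char*)&sb2);   // decode
assert(sb2.fsid == sb.fsid && sb2.whoami == sb.whoami);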
int pg_role;//, rg_nrep;
epoch_t map_epoch;
+ version_t pg_trim_to; // primary->replica: trim to here
+
int op;
size_t length, offset;
version_t version;
const version_t get_version() { return st.version; }
const version_t get_old_version() { return st.old_version; }
+ const version_t get_pg_trim_to() { return st.pg_trim_to; }
+ void set_pg_trim_to(version_t v) { st.pg_trim_to = v; }
+
const int get_op() { return st.op; }
const size_t get_length() { return st.length; }
const size_t get_offset() { return st.offset; }
const bool wants_ack() { return st.want_ack; }
const bool wants_commit() { return st.want_commit; }
+
void set_data(bufferlist &d) {
data.claim(d);
st._data_len = data.length();
size_t object_size;
version_t version;
+ version_t pg_complete_thru;
+
epoch_t _new_map_epoch;
size_t _data_len, _oc_len;
} MOSDOpReply_st;
size_t get_object_size() { return st.object_size; }
version_t get_version() { return st.version; }
+ version_t get_pg_complete_thru() { return st.pg_complete_thru; }
+ void set_pg_complete_thru(version_t v) { st.pg_complete_thru = v; }
+
void set_result(int r) { st.result = r; }
void set_length(size_t s) { st.length = s; }
void set_offset(size_t o) { st.offset = o; }
class MOSDPGSummary : public Message {
epoch_t epoch;
pg_t pgid;
- bufferlist sumbl;
public:
+ PG::PGInfo info;
+ bufferlist sumbl;
+
epoch_t get_epoch() { return epoch; }
-
+
MOSDPGSummary() {}
MOSDPGSummary(version_t mv, pg_t pgid, PG::PGSummary &summary) :
Message(MSG_OSD_PG_SUMMARY) {
void encode_payload() {
payload.append((char*)&epoch, sizeof(epoch));
payload.append((char*)&pgid, sizeof(pgid));
+ payload.append((char*)&info, sizeof(info));
payload.claim_append(sumbl);
}
void decode_payload() {
off += sizeof(epoch);
payload.copy(off, sizeof(pgid), (char*)&pgid);
off += sizeof(pgid);
+ payload.copy(off, sizeof(info), (char*)&info);
+ off += sizeof(info);
payload.splice(0, off);
sumbl.claim(payload);
#define MSG_SHUTDOWN 99999
-#define MSG_OSD_OP 14 // delete, etc.
-#define MSG_OSD_OPREPLY 15 // delete, etc.
-#define MSG_OSD_PING 16
+#define MSG_OSD_OP 20 // delete, etc.
+#define MSG_OSD_OPREPLY 21 // delete, etc.
+#define MSG_OSD_PING 22
-#define MSG_OSD_GETMAP 17
-#define MSG_OSD_MAP 18
+#define MSG_OSD_GETMAP 23
+#define MSG_OSD_MAP 24
-#define MSG_OSD_MKFS_ACK 19
+#define MSG_OSD_BOOT 25
+#define MSG_OSD_MKFS_ACK 26
#define MSG_OSD_PG_NOTIFY 50
#define MSG_OSD_PG_QUERY 51
// use fixed offsets and static entity -> logical addr mapping!
-#define MSG_ADDR_DIRECTORY_BASE 0
+#define MSG_ADDR_NAMER_BASE 0
#define MSG_ADDR_RANK_BASE 0x10000000 // per-rank messenger services
#define MSG_ADDR_MDS_BASE 0x20000000
#define MSG_ADDR_OSD_BASE 0x30000000
-#define MSG_ADDR_CLIENT_BASE 0x40000000
+#define MSG_ADDR_MON_BASE 0x40000000
+#define MSG_ADDR_CLIENT_BASE 0x50000000
#define MSG_ADDR_TYPE_MASK 0xf0000000
#define MSG_ADDR_NUM_MASK 0x0fffffff
case MSG_ADDR_RANK_BASE: return "rank";
case MSG_ADDR_MDS_BASE: return "mds";
case MSG_ADDR_OSD_BASE: return "osd";
+ case MSG_ADDR_MON_BASE: return "mon";
case MSG_ADDR_CLIENT_BASE: return "client";
- case MSG_ADDR_DIRECTORY_BASE: return "namer";
+ case MSG_ADDR_NAMER_BASE: return "namer";
}
return "unknown";
}
bool is_client() const { return type() == MSG_ADDR_CLIENT_BASE; }
bool is_mds() const { return type() == MSG_ADDR_MDS_BASE; }
bool is_osd() const { return type() == MSG_ADDR_OSD_BASE; }
- bool is_namer() const { return type() == MSG_ADDR_DIRECTORY_BASE; }
+ bool is_mon() const { return type() == MSG_ADDR_MON_BASE; }
+ bool is_namer() const { return type() == MSG_ADDR_NAMER_BASE; }
};
inline bool operator== (const msg_addr_t& l, const msg_addr_t& r) { return l._addr == r._addr; }
#define MSG_ADDR_RANK(x) msg_addr_t(MSG_ADDR_RANK_BASE,x)
#define MSG_ADDR_MDS(x) msg_addr_t(MSG_ADDR_MDS_BASE,x)
#define MSG_ADDR_OSD(x) msg_addr_t(MSG_ADDR_OSD_BASE,x)
+#define MSG_ADDR_MON(x) msg_addr_t(MSG_ADDR_MON_BASE,x)
#define MSG_ADDR_CLIENT(x) msg_addr_t(MSG_ADDR_CLIENT_BASE,x)
+#define MSG_ADDR_NAMER(x) msg_addr_t(MSG_ADDR_NAMER_BASE,x)
#define MSG_ADDR_UNDEF msg_addr_t()
-#define MSG_ADDR_DIRECTORY msg_addr_t(MSG_ADDR_DIRECTORY_BASE,0)
+#define MSG_ADDR_DIRECTORY MSG_ADDR_NAMER(0)
#define MSG_ADDR_RANK_NEW MSG_ADDR_RANK(MSG_ADDR_NEW)
#define MSG_ADDR_MDS_NEW MSG_ADDR_MDS(MSG_ADDR_NEW)
#define MSG_ADDR_OSD_NEW MSG_ADDR_OSD(MSG_ADDR_NEW)
#define MSG_ADDR_CLIENT_NEW MSG_ADDR_CLIENT(MSG_ADDR_NEW)
+#define MSG_ADDR_NAMER_NEW MSG_ADDR_NAMER(MSG_ADDR_NEW)
#define MSG_ADDR_ISCLIENT(x) x.is_client()
#define MSG_ADDR_TYPE(x) x.type_str()
#include "messages/MFailure.h"
#include "messages/MFailureAck.h"
+#include "messages/MOSDBoot.h"
#include "messages/MOSDPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
m = new MFailureAck();
break;
+ case MSG_OSD_BOOT:
+ m = new MOSDBoot();
+ break;
case MSG_OSD_PING:
m = new MOSDPing();
break;
case MSG_OSD_PG_REMOVE:
m = new MOSDPGRemove();
break;
- /*
- case MSG_OSD_PG_QUERY:
- m = new MOSDPGQuery();
- break;
- case MSG_OSD_PG_QUERYREPLY:
- m = new MOSDPGQueryReply();
- break;
- */
+
// clients
case MSG_CLIENT_MOUNT:
m = new MClientMount();
}
int FakeStore::read(object_t oid,
- size_t len, off_t offset,
+ off_t offset, size_t len,
bufferlist& bl) {
dout(20) << "read " << oid << " len " << len << " off " << offset << endl;
int remove(object_t oid, Context *onsafe);
int truncate(object_t oid, off_t size, Context *onsafe);
int read(object_t oid,
- size_t len, off_t offset,
+ off_t offset, size_t len,
bufferlist& bl);
int write(object_t oid,
size_t len, off_t offset,
#include "msg/Messenger.h"
#include "msg/Message.h"
-#include "mds/MDS.h"
#include "msg/HostMonitor.h"
#include "messages/MGenericMessage.h"
#include "messages/MPingAck.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
+#include "messages/MOSDBoot.h"
#include "messages/MOSDMap.h"
#include "messages/MOSDPGNotify.h"
sprintf(s, "osd%d", whoami);
string st = s;
monitor = new HostMonitor(m, st);
- monitor->set_notify_port(MDS_PORT_OSDMON);
// <hack> for testing monitoring
int i = whoami;
if (++i == g_conf.num_osd) i = 0;
monitor->get_hosts().insert(MSG_ADDR_OSD(i));
- monitor->get_notify().insert(MSG_ADDR_MDS(0));
+ monitor->get_notify().insert(MSG_ADDR_MON(0));
// </hack>
// log
int r = store->mount();
assert(r>=0);
- // age?
- if (g_conf.osd_age_time > 0) {
- Ager ager(store);
- ager.age(g_conf.osd_age_time, g_conf.osd_age, g_conf.osd_age / 2.0, 5, g_conf.osd_age);
+ if (g_conf.osd_mkfs) {
+ // age?
+ if (g_conf.osd_age_time > 0) {
+ dout(2) << "age" << endl;
+ Ager ager(store);
+ ager.age(g_conf.osd_age_time, g_conf.osd_age, g_conf.osd_age / 2.0, 5, g_conf.osd_age);
+ }
+ }
+ else {
+ dout(2) << "boot" << endl;
+
+ // read superblock
+ read_superblock();
}
// monitor.
monitor->init();
+
+ // i'm ready!
+ messenger->set_dispatcher(this);
+
+ // announce to monitor i exist and have booted.
+ messenger->send_message(new MOSDBoot(superblock), MSG_ADDR_MON(0));
+
}
osd_lock.Unlock();
- // i'm ready!
- messenger->set_dispatcher(this);
-
return 0;
}
bufferlist bl;
bl.append((char*)&superblock, sizeof(superblock));
- int r = store->write(SUPERBLOCK_OBJECT, 0, sizeof(superblock), bl);
+ int r = store->write(SUPERBLOCK_OBJECT, sizeof(superblock), 0, bl, false);
assert(r >= 0);
}
+void OSD::read_superblock()
+{
+ dout(10) << "read_superblock" << endl;
+
+ bufferlist bl;
+ store->read(SUPERBLOCK_OBJECT, 0, sizeof(superblock), bl);
+ bl.copy(0, sizeof(superblock), (char*)&superblock);
+
+ assert(whoami == superblock.whoami); // fixme!
+}
+
// object locks
void OSD::_remove_pg(pg_t pgid)
{
+ dout(10) << "_remove_pg " << hex << pgid << dec << endl;
+
// remove from store
list<object_t> olist;
store->collection_list(pgid, olist);
}
} else {
// no map? starting up?
- dout(7) << "no OSDMap, asking MDS" << endl;
+ dout(7) << "no OSDMap, asking monitor" << endl;
if (waiting_for_osdmap.empty())
messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP),
- MSG_ADDR_MDS(0), MDS_PORT_MAIN);
+ MSG_ADDR_MON(0));
waiting_for_osdmap.push_back(m);
break;
}
case OSD_OP_REP_WRITE:
case OSD_OP_REP_TRUNCATE:
case OSD_OP_REP_DELETE:
- handle_rep_op_ack(m->get_tid(), m->get_result(), m->get_commit(), MSG_ADDR_NUM(m->get_source()));
- delete m;
+ {
+ const pg_t pgid = m->get_pg();
+ if (pg_map.count(pgid)) {
+ PG *pg = _lock_pg(pgid);
+ assert(pg);
+ handle_rep_op_ack(pg, m->get_tid(), m->get_result(), m->get_commit(), MSG_ADDR_NUM(m->get_source()),
+ m->get_pg_complete_thru());
+ _unlock_pg(pgid);
+ } else {
+ // pg dne! whatev.
+ }
+ delete m;
+ }
break;
default:
}
/*
- *
- * NOTE: called hold osd_lock, opqueue active.
+ * NOTE: called holding pg lock (not osd_lock), opqueue active.
*/
-void OSD::handle_rep_op_ack(__uint64_t tid, int result, bool commit, int fromosd)
+void OSD::handle_rep_op_ack(PG *pg, __uint64_t tid, int result, bool commit, int fromosd, version_t pg_complete_thru)
{
- if (!replica_ops.count(tid)) {
- dout(7) << "not waiting for tid " << tid << " replica op reply, map must have changed, dropping." << endl;
+ if (!pg->replica_ops.count(tid)) {
+ dout(7) << "not waiting for repop reply tid " << tid << " in " << *pg
+ << ", map must have changed, dropping." << endl;
return;
}
- OSDReplicaOp *repop = replica_ops[tid];
+ OSDReplicaOp *repop = pg->replica_ops[tid];
MOSDOp *op = repop->op;
- pg_t pgid = op->get_pg();
- dout(7) << "handle_rep_op_ack " << tid << " op " << op << " result " << result << " commit " << commit << " from osd" << fromosd << endl;
+ dout(7) << "handle_rep_op_ack " << tid << " op " << op
+ << " result " << result << " commit " << commit << " from osd" << fromosd
+ << " in " << *pg
+ << endl;
/*
* for now, we take a lazy approach to handling replica set changes
*
* one optimization: if the rep_write is received by the new primary, they can
* (at their discretion) apply it and remove the object from their missing list...
+ * or: if a replica sees that the old primary is not down, it might assume that its
+ * state will be recovered (ie the new version) and apply the write.
*/
if (1) { //if (result >= 0) {
// success
assert(repop->waitfor_commit.count(tid));
repop->waitfor_commit.erase(tid);
repop->waitfor_ack.erase(tid);
- replica_ops.erase(tid);
+
+ repop->pg_complete_thru[fromosd] = pg_complete_thru;
- replica_pg_osd_tids[pgid][fromosd].erase(tid);
- if (replica_pg_osd_tids[pgid][fromosd].empty()) replica_pg_osd_tids[pgid].erase(fromosd);
- if (replica_pg_osd_tids[pgid].empty()) replica_pg_osd_tids.erase(pgid);
+ pg->replica_ops.erase(tid);
+ pg->replica_tids_by_osd[fromosd].erase(tid);
+ if (pg->replica_tids_by_osd[fromosd].empty()) pg->replica_tids_by_osd.erase(fromosd);
} else {
// ack
repop->waitfor_ack.erase(tid);
void OSD::wait_for_new_map(Message *m)
{
- // ask MDS
+ // ask
if (waiting_for_osdmap.empty())
messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP),
- MSG_ADDR_MDS(0), MDS_PORT_MAIN);
+ MSG_ADDR_MON(0));
waiting_for_osdmap.push_back(m);
}
// store it.
dout(7) << "update_map got osd map epoch " << new_epoch << endl;
osdmaps[new_epoch] = newmap;
- store->write(get_osdmap_object_name(new_epoch), state.length(), 0, state);
+ store->write(get_osdmap_object_name(new_epoch), state.length(), 0, state, false);
// twiddle superblock map bounds
if (new_epoch > superblock.newest_map)
take_waiters(waiting_for_osdmap);
} else {
- // cruddy. request missing map(s?) from MDS
+ // cruddy. request missing map(s?)
dout(7) << "update_map missing map " << current_map+1 << endl;
assert(0);
}
pg->set_role(role);
pg->last_epoch_started_any =
pg->info.last_epoch_started =
- pg->info.same_primary_since = osdmap->get_epoch();
+ pg->info.same_primary_since =
+ pg->info.same_role_since = osdmap->get_epoch();
pg->state_set(PG::STATE_ACTIVE);
dout(7) << "created " << *pg << endl;
pg->set_role(role);
pg->last_epoch_started_any =
pg->info.last_epoch_started =
- pg->info.same_primary_since = osdmap->get_epoch();
+ pg->info.same_primary_since =
+ pg->info.same_role_since = osdmap->get_epoch();
pg->state_set(PG::STATE_ACTIVE);
dout(7) << "created " << *pg << endl;
// forget about where missing items are, or anything we're pulling
pg->missing.loc.clear();
pg->objects_pulling.clear();
+ pg->requested_thru = 0;
}
if (role != oldrole) {
+ pg->info.same_role_since = osdmap->get_epoch();
+
// old primary?
if (oldrole == 0) {
pg->state_clear(PG::STATE_CLEAN);
dout(10) << *pg << " peer osd" << *down << " is down" << endl;
// NAK any ops to the down osd
- if (replica_pg_osd_tids[pgid].count(*down)) {
- set<__uint64_t> s = replica_pg_osd_tids[pgid][*down];
+ if (pg->replica_tids_by_osd.count(*down)) {
+ set<__uint64_t> s = pg->replica_tids_by_osd[*down];
dout(10) << " " << *pg << " naking replica ops to down osd" << *down << " " << s << endl;
for (set<__uint64_t>::iterator tid = s.begin();
tid != s.end();
tid++)
- handle_rep_op_ack(*tid, -1, false, *down);
+ handle_rep_op_ack(pg, *tid, -1, false, *down);
}
}
}
dout(3) << "handle_osd_map ignoring osd map epoch " << m->get_epoch() << " <= " << osdmap->get_epoch() << endl;
}
+ /*
if (osdmap->is_mkfs()) {
// ack
messenger->send_message(new MGenericMessage(MSG_OSD_MKFS_ACK),
- m->get_source());
+ MSG_ADDR_MDS(0));
}
+ */
delete m;
}
*/
bool OSD::require_same_or_newer_map(Message *m, epoch_t epoch)
{
- int from = MSG_ADDR_NUM(m->get_source());
-
// newer map?
if (epoch > osdmap->get_epoch()) {
dout(7) << " from newer map epoch " << epoch << " > " << osdmap->get_epoch() << endl;
// down osd?
if (m->get_source().is_osd() &&
osdmap->is_down(m->get_source().num())) {
- if (m->get_map_epoch() > osdmap->get_epoch()) {
+ if (epoch > osdmap->get_epoch()) {
dout(7) << "msg from down " << m->get_source()
<< ", waiting for new map" << endl;
wait_for_new_map(m);
// open, stat collection
PG *pg = new PG(this, pgid);
- //pg->fetch(store);
pg_map[pgid] = pg;
+ // read pg info
+ store->collection_getattr(pgid, "info", &pg->info, sizeof(pg->info));
+
+ // read pg log
+ pg->read_log(store);
+
return pg;
}
pg->acting = acting;
pg->info.same_primary_since = it->same_primary_since;
pg->set_role(0);
+ pg->info.same_role_since = osdmap->get_epoch();
pg->last_epoch_started_any = it->last_epoch_started;
pg->build_prior();
delete m;
}
+
+
/** PGLog
* from non-primary to primary
* includes log and info
// i am PRIMARY
assert(pg->peer_log_requested.count(from));
+ // note peer's missing.
pg->peer_missing[from] = m->missing;
// merge into our own log
- dout(10) << " merging " << m->log << endl;
- dout(10) << " before " << pg->log << " " << pg->missing << endl;
- assert(m->log.top >= pg->log.bottom);
- assert(m->log.bottom <= pg->log.top || pg->log.empty());
- pg->log.merge(m->log, pg->missing);
+ pg->merge_log(m->log);
// and did i find anything?
for (map<object_t, version_t>::iterator p = pg->missing.missing.begin();
p != pg->missing.missing.end();
p++) {
- if ((p->second <= m->log.bottom || // were complete through stamp?
- (m->log.updated.count(p->first) && m->log.updated[p->first] == p->second)) && // or logged it and aren't missing it?
- m->missing.missing.count(p->first) == 0) {
- pg->missing.loc[p->first] = from; // .. then they have what we want!
+ const object_t oid = p->first;
+ const version_t v = p->second;
+ if (v <= m->info.last_complete || // peer is complete through stamp?
+ (m->log.updated.count(oid) &&
+ m->log.updated[oid] == v &&
+ m->missing.missing.count(oid) == 0)) { // or logged it and aren't missing it?
+ pg->missing.loc[oid] = from; // ...then they have it!
}
}
-
- dout(10) << " after " << pg->log << " " << pg->missing << endl;
+ dout(10) << " missing now " << pg->missing << endl;
pg->clean_up_local();
-
- pg->info.last_update = pg->log.top;
- pg->info.log_floor = pg->log.bottom;
// peer
map< int, map<pg_t,version_t> > query_map;
} else {
// i am REPLICA
- dout(10) << *pg << " log was " << pg->log << " " << pg->missing << endl;
dout(10) << *pg << " got " << m->log << " " << m->missing << endl;
- // merge log+missing
- pg->log.merge(m->log, pg->missing);
- pg->missing.merge_loc(m->missing);
- dout(10) << *pg << " log now " << pg->log << " " << pg->missing
- << " " << pg->missing.missing << " " << pg->missing.loc
- << endl;
+ // merge log
+ pg->merge_log(m->log);
+
+ // locate missing items
+ if (pg->missing.num_lost() > 0) {
+ // see if we can find anything new!
+ for (map<object_t,version_t>::iterator p = pg->missing.missing.begin();
+ p != pg->missing.missing.end();
+ p++) {
+ const object_t oid = p->first;
+ if (m->missing.loc.count(oid)) {
+ assert(m->missing.missing[oid] >= pg->missing.missing[oid]);
+ pg->missing.loc[oid] = m->missing.loc[oid];
+ } else {
+ pg->missing.loc[oid] = from;
+ }
+ }
+ }
+
+ dout(10) << *pg << " missing now " << pg->missing << endl;
assert(pg->missing.num_lost() == 0);
// clean up any stray objects
pg->clean_up_local();
// ok active!
- pg->info.last_update = pg->log.top;
pg->info.last_epoch_started = osdmap->get_epoch();
pg->info.same_primary_since = m->info.same_primary_since;
pg->state_set(PG::STATE_ACTIVE);
- pg->clear_primary_recovery_state();
// take any waiters
take_waiters(pg->waiting_for_active);
// initiate any recovery?
- if (!pg->log.empty())
- pg->do_recovery();
+ pg->start_recovery();
}
_unlock_pg(pgid);
*/
void OSD::handle_pg_summary(MOSDPGSummary *m)
{
+/*
int from = MSG_ADDR_NUM(m->get_source());
pg_t pgid = m->get_pgid();
<< ", my log bottom " << pg->log.bottom
<< endl;
- // merge into my log!
+ // merge into my (recovery) log!
assert(last_complete >= pg->log.bottom); // FIXME?
assert(pg->log.top > last_complete);
} else {
// REPLICA
- dout(10) << *pg << " got summary from primary osd" << from
+ dout(10) << *pg << " got current summary from primary osd" << from
<< endl;
assert(from == pg->acting[0]);
- assert(0); // hmm this is sorta messed.. our log won't be correct wrt deletes if we share it..
- /*
- // build log
- pg->log.from_summary(m->summary);
-
- // deleted items?
+ // fetch local object set
+ PGSummary local;
+ generate_summary(local);
+ // merge current summary into recovery log, missing map.
+ for (map<object_t,version_t>::const_iterator p = m->summary.objects.begin();
+ p != m->summary.objects.end();
+ p++) {
+ assert(pg->log.deleted.count(p->first) == 0); /// this summary should be up-to-date.
+ if (pg->log.updated.count(p->first)) {
+ assert(pg->log.updated[p->first] <= p->second);
+ if (pg->log.updated[p->first] == p->second) continue; // already logged.
+ pg->log.rupdated.erase(pg->log.updated[p->first]); // hose older update
+ }
+ dout(10) << " from summary " << hex << p->first << dec
+ << " v " << p->second << endl;
+ assert(log.bottom == 0 || p->second < log.bottom);
+ updated[p->first] = p->second;
+ rupdated[p->second] = p->first;
+
+ // missing?
+ // FIXME: should we hose old version at this point?
+ if (local.objects.count(p->first) == 0 ||
+ local.objects[p->first] < p->second) {
+ dout(10) << " missing " << hex << p->first << dec
+ << " v " << p->second << endl;
+ pg->missing.missing[p->first] = p->second;
+ pg->missing.rmissing[p->second] = p->first;
+ if (m->missing.loc.count(p->first)) // did primary tell us where to get it?
+ pg->missing.loc[p->first] = m->missing.loc[p->first];
+ else
+ pg->missing.loc[p->first] = from; // ok, then primary has it.
+ }
+ }
+ // at this point, pg->log.updated should reflect the correct object set.
+ // fix up log top/bottom, info
+ assert(pg->log.top <= m->info.last_updated);
+ pg->log.top = m->info.last_updated;
+ if (pg->log.bottom == 0)
+ pg->log.bottom = pg->log.top; // bottom == top; we can't share this log.
- pg->clean_up_local();
-
+ // hose extra objects
+ for (map<object_t,version_t>::const_iterator p = local.objects.begin();
+ p != local.objects.end();
+ p++) {
+ if (pg->log.updated.count(p->first) == 0) {
+ dout(10) << " removing stray " << hex << oid << dec
+ << " v " << ov << " < " << last_complete << endl;
+ osd->store->remove(oid);
+ }
+ }
+ // ok go!
pg->info.last_epoch_started = osdmap->get_epoch();
pg->info.same_primary_since = m->info.same_primary_since;
pg->state_set(PG::STATE_ACTIVE);
- */
-
- /*
- // copy summary. FIXME.
- if (pg->content_summary == 0)
- pg->content_summary = new PG::PGContentSummary();
- *pg->content_summary = *sum;
- // i'm now active!
- pg->state_set(PG::STATE_ACTIVE);
-
// take any waiters
take_waiters(pg->waiting_for_active);
- // initiate any recovery?
- pg->plan_recovery();
- */
+ // kickstart recovery.
+ pg->start_recovery();
}
_unlock_pg(pgid);
delete m;
+*/
}
-
/** PGQuery
* from primary to replica | other
* NOTE: called with opqueue active.
pg->set_role(role);
pg->info.same_primary_since = calc_pg_primary_since(acting[0], pgid, m->get_epoch());
+ pg->info.same_role_since = osdmap->get_epoch();
dout(10) << *pg << " dne (before), but i am role " << role << endl;
}
// info
dout(10) << *pg << " sending info" << endl;
notify_list[from].push_back(pg->info);
- } else if (it->second == PG_QUERY_SUMMARY) {
- // summary
- dout(10) << *pg << " sending content summary" << endl;
- PG::PGSummary summary;
- pg->generate_summary(summary);
- MOSDPGSummary *m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), summary);
- messenger->send_message(m, MSG_ADDR_OSD(from));
} else {
- // log + info
- dout(10) << *pg << " sending info+log since " << it->second << endl;
MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->get_pgid());
m->info = pg->info;
- m->log.copy_after(pg->log, it->second);
+
+ if (it->second == PG_QUERY_SUMMARY) {
+ dout(10) << *pg << " sending info+summary/backlog" << endl;
+ m->log = pg->log;
+ if (!m->log.backlog) pg->generate_backlog(m->log);
+ } else {
+ dout(10) << *pg << " sending info+log since " << it->second << endl;
+ m->log.copy_after(pg->log, it->second);
+ }
m->missing = pg->missing;
+
messenger->send_message(m, MSG_ADDR_OSD(from));
}
{
long got = 0;
- // get object size
- struct stat st;
- int r = store->stat(op->get_oid(), &st);
- assert(r == 0);
-
- // check version
- version_t v = 0;
- store->getattr(op->get_oid(), "version", &v, sizeof(v));
- assert(v >= op->get_version());
+ const object_t oid = op->get_oid();
+
+ struct stat st, st2;
+ version_t v;
+ bufferlist bl;
+
+ dout(7) << "rep_pull on " << hex << oid << dec << " v >= " << op->get_version() << endl;
+
+ while (1) {
+ // read size
+ int r = store->stat(oid, &st);
+ if (r < 0) {
+ // reply with -EEXIST
+ dout(7) << "rep_pull don't have " << hex << oid << dec << endl;
+ MOSDOpReply *reply = new MOSDOpReply(op, -EEXIST, osdmap, true);
+ messenger->send_message(reply, op->get_asker());
+ delete op;
+ return;
+ }
+
+ // read object+version
+ ObjectStore::Transaction t;
+ t.read(oid, 0, st.st_size, &bl);
+ int len = sizeof(v);
+ t.getattr(oid, "version", &v, &len);
+ t.stat(oid, &st2);
+ unsigned tr = store->apply_transaction(t);
+
+ // verify size matches
+ if (tr == 0 && st2.st_size == bl.length()) break; // excellent.
+
+ dout(7) << "rep_pull read " << bl.length() << " of " << st.st_size << "/" << st2.st_size << " byte object, retrying" << endl;
+ bl.clear();
+ }
- dout(7) << "rep_pull on "
+ dout(7) << "rep_pull has "
<< hex << op->get_oid() << dec
<< " v " << v << " >= " << op->get_version()
+ << " size " << bl.length()
<< " in " << *pg
<< endl;
+ assert(v >= op->get_version());
- // read
- bufferlist bl;
- got = store->read(op->get_oid(),
- st.st_size, 0,
- bl);
- assert(got == st.st_size);
-
// reply
MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
reply->set_result(0);
// write object and add it to the PG
ObjectStore::Transaction t;
+ t.remove(oid); // in case old version exists
t.write(oid, 0, op->get_length(), op->get_data());
t.setattr(oid, "version", &v, sizeof(v));
t.collection_add(pgid, oid);
pg->objects_pulling.erase(oid);
pg->missing.got(oid, v);
+ // kick waiters
+ if (pg->waiting_for_missing_object.count(oid))
+ take_waiters(pg->waiting_for_missing_object[oid]);
+
// raise last_complete?
map<version_t, object_t>::iterator p;
for (p = pg->log.rupdated.lower_bound(pg->info.last_complete);
// continue
pg->do_recovery();
}
-
- // finish waiters
- if (pg->waiting_for_missing_object.count(oid))
- take_waiters(pg->waiting_for_missing_object[oid]);
-
+
_unlock_pg(pgid);
delete op;
OSD *osd;
MOSDOp *op;
+ version_t pg_last_complete;
+
Mutex lock;
Cond cond;
- bool ack;
+ bool acked;
bool waiting;
- C_OSD_RepModifyCommit(OSD *o, MOSDOp *oo) : osd(o), op(oo),
- ack(false), waiting(false) { }
+ C_OSD_RepModifyCommit(OSD *o, MOSDOp *oo, version_t lc) : osd(o), op(oo), pg_last_complete(lc),
+ acked(false), waiting(false) { }
void finish(int r) {
lock.Lock();
- while (!ack) {
+ while (!acked) {
waiting = true;
cond.Wait(lock);
}
- assert(ack);
+ assert(acked);
lock.Unlock();
- osd->op_rep_modify_commit(op);
+ osd->op_rep_modify_commit(op, pg_last_complete);
}
void ack() {
lock.Lock();
- ack = true;
+ acked = true;
if (waiting) cond.Signal();
lock.Unlock();
}
};
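The Cond/acked dance above exists so the commit reply can never overtake the ack. The call pattern (the same one op_rep_modify uses further down, with t, op, and pg assumed in scope) looks like:

// Sketch of the ack-before-commit ordering (op_rep_modify below does this).
C_OSD_RepModifyCommit *oncommit =
  new C_OSD_RepModifyCommit(this, op, pg->info.last_complete);

store->apply_transaction(t, oncommit);  // commit callback may fire right away,
                                        // but finish() blocks in cond.Wait()

messenger->send_message(new MOSDOpReply(op, 0, osdmap, false),  // ack first
                        op->get_asker());
oncommit->ack();                        // now the commit reply may go out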
-void OSD::op_rep_modify_commit(MOSDOp *op)
+void OSD::op_rep_modify_commit(MOSDOp *op, version_t last_complete)
{
- // hack: hack_blah is true until 'ack' has been sent.
+ // send commit.
dout(10) << "rep_modify_commit on op " << *op << endl;
MOSDOpReply *commit = new MOSDOpReply(op, 0, osdmap, true);
+ commit->set_pg_complete_thru(last_complete);
messenger->send_message(commit, op->get_asker());
delete op;
}
// process a modification operation
+class C_OSD_WriteCommit : public Context {
+public:
+ OSD *osd;
+ OSDReplicaOp *repop;
+ version_t pg_last_complete;
+ C_OSD_WriteCommit(OSD *o, OSDReplicaOp *op, version_t lc) : osd(o), repop(op), pg_last_complete(lc) {}
+ void finish(int r) {
+ osd->op_modify_commit(repop, pg_last_complete);
+ }
+};
+
+
/** op_rep_modify
* process a replicated modify.
* NOTE: called from opqueue.
object_t oid = op->get_oid();
version_t nv = op->get_version();
- Context *oncommit = new C_OSD_RepModifyCommit(this, op);
+ ObjectStore::Transaction t;
+
+ // update PG log
+ if (pg->info.last_update < nv)
+ prepare_log_transaction(t, op, nv, pg);
+ // else, we are playing catch-up, don't update pg metadata! (FIXME?)
+
+ // do op?
+ C_OSD_RepModifyCommit *oncommit = 0;
// check current version
version_t myv = 0;
- if (store->exists(oid))
- store->getattr(oid, "version", &myv, sizeof(myv));
-
- if (myv > op->get_old_version()) {
- // i already have newer than myv
- assert(myv <= nv);
- dout(10) << "rep_modify on " << hex << oid << dec
- << " ov " << op->get_old_version() << " < " << myv
- << " <= nv " << nv
- << ", noop"
+ store->getattr(oid, "version", &myv, sizeof(myv)); // this is a noop if oid dne
+ dout(10) << "op_rep_modify existing " << hex << oid << dec << " v " << myv << endl;
+
+ // is this an old update?
+ if (nv <= myv) {
+ // we have a newer version. pretend we do a regular commit!
+ dout(10) << "op_rep_modify on " << hex << oid << dec
+ << " v " << nv << " <= myv, noop"
+ << " in " << *pg
<< endl;
+ oncommit = new C_OSD_RepModifyCommit(this, op,
+ pg->info.last_complete);
+ }
- // do a null op, with a commit waiter.
- // this is overkill if we pulled this object earlier, but chances are it was very recent
- // if we're receiving this replicated op.
- ObjectStore::Transaction t;
- store->apply_transaction(t, oncommit);
- }
- else if (myv < op->get_old_version()) {
- dout(0) << "rep_modify " << hex << oid << dec
- << " ov " << op->get_old_version() << " > my " << myv
- << ", waiting"
- << endl;
- // this should never happen.. we should catch missing objects before they get into the opqueue!
- assert(0);
+ // missing?
+ else if (pg->missing.missing.count(oid)) {
+ // old or missing. wait!
+ dout(10) << "op_rep_modify on " << hex << oid << dec
+ << " v " << nv << " > myv, wait"
+ << " in " << *pg
+ << endl;
+ if (pg->missing.missing[oid] < op->get_version())
+ pg->missing.add(oid, op->get_version()); // now we're missing the _newer_ version.
+ waitfor_missing_object(op, pg);
}
else {
- dout(10) << "rep_modify on " << hex << oid << dec
+ // we're good.
+ dout(10) << "op_rep_modify on " << hex << oid << dec
<< " v " << nv << " (from " << myv << ")"
<< " in " << *pg
<< endl;
assert(op->get_old_version() == myv);
- op_apply(op, op->get_version(), pg, oncommit);
-
- if (op->get_op() == OSD_OP_REP_WRITE) {
- logger->inc("r_wr");
- logger->inc("r_wrb", op->get_length());
- }
+ prepare_op_transaction(t, op, nv, pg);
+ oncommit = new C_OSD_RepModifyCommit(this, op,
+ pg->info.last_complete);
+ }
+
+ // go
+ unsigned tr = store->apply_transaction(t, oncommit);
+ if (tr != 0 && // no errors
+ tr != 2) { // or error on collection_add
+ cerr << "error applying transaction: r = " << tr << endl;
+ assert(tr == 0);
+ }
+
+ // ack?
+ if (oncommit) {
+ // ack
+ MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false);
+ messenger->send_message(ack, op->get_asker());
+ oncommit->ack();
}
-
- // ack
- MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false);
- messenger->send_message(ack, op->get_asker());
- oncommit->ack();
}
void OSD::handle_op(MOSDOp *op)
{
- const object_t oid = op->get_oid();
const pg_t pgid = op->get_pg();
int acting_primary = osdmap->get_pg_acting_primary( pgid );
PG *pg = get_pg(pgid);
// check osd map. same primary?
if (op->get_map_epoch() != osdmap->get_epoch()) {
// make sure source is still primary
- int myrole = osdmap->get_pg_acting_role(op->get_pg(), whoami);
+ const int myrole = pg->get_role(); //osdmap->get_pg_acting_role(op->get_pg(), whoami);
if (acting_primary != MSG_ADDR_NUM(op->get_source()) ||
myrole <= 0 ||
dout(7) << "handle_rep_op " << op << " in " << *pg << endl;
}
- // are we missing the object?
- if (pg->missing.missing.count(oid)) {
- // NO. we don't have it (yet).
- version_t v = pg->missing.missing[oid];
- if (pg->objects_pulling.count(oid)) {
- dout(7) << "already pulling "
- << hex << oid << dec
- << " v " << v
- << " in " << *pg
- << endl;
- } else {
- dout(7) << "immediately pulling "
- << hex << oid << dec
- << " v " << v
- << " in " << *pg
- << endl;
- pull(pg, oid, v);
- }
- pg->waiting_for_missing_object[oid].push_back(op);
- return;
- }
-
if (g_conf.osd_maxthreads < 1) {
do_op(op, pg); // do it now
} else {
// regular op
switch (op->get_op()) {
case OSD_OP_READ:
- op_read(op);
+ op_read(op, pg);
break;
case OSD_OP_STAT:
- op_stat(op);
+ op_stat(op, pg);
break;
case OSD_OP_WRITE:
case OSD_OP_ZERO:
// ===============================
// OPS
+bool OSD::waitfor_missing_object(MOSDOp *op, PG *pg)
+{
+ const object_t oid = op->get_oid();
+
+ // are we missing the object?
+ if (pg->missing.missing.count(oid)) {
+ // we don't have it (yet).
+ version_t v = pg->missing.missing[oid];
+ if (pg->objects_pulling.count(oid)) {
+ dout(7) << "missing "
+ << hex << oid << dec
+ << " v " << v
+ << " in " << *pg
+ << ", already pulling"
+ << endl;
+ } else {
+ dout(7) << "missing "
+ << hex << oid << dec
+ << " v " << v
+ << " in " << *pg
+ << ", pulling"
+ << endl;
+ pull(pg, oid, v);
+ }
+ pg->waiting_for_missing_object[oid].push_back(op);
+ return true;
+ }
+
+ return false;
+}
+
+
// READ OPS
/** op_read
* client read op
* NOTE: called from opqueue.
*/
-void OSD::op_read(MOSDOp *op)
+void OSD::op_read(MOSDOp *op, PG *pg)
{
const object_t oid = op->get_oid();
- //if the target object is locked for writing by another client, put 'op' to the waiting queue
- if (block_if_wrlocked(op)) {
- return; //read will be handled later, after the object becomes unlocked
- }
+ if (waitfor_missing_object(op, pg)) return;
+
+ // if the target object is locked for writing by another client, put 'op' to the waiting queue
+ // for _any_ op type -- eg only the locker can unlock!
+ if (block_if_wrlocked(op)) return; // op will be handled later, after the object unlocks
+ dout(10) << "op_read " << hex << oid << dec
+ << " " << op->get_offset() << "~" << op->get_length()
+ << " in " << *pg
+ << endl;
+
// read into a buffer
bufferlist bl;
long got = store->read(oid,
- op->get_length(), op->get_offset(),
+ op->get_offset(), op->get_length(),
bl);
// set up reply
MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
reply->set_length(0);
}
- dout(12) << "read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
+ dout(12) << " read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
logger->inc("rd");
if (got >= 0) logger->inc("rdb", got);
* client stat
* NOTE: called from opqueue
*/
-void OSD::op_stat(MOSDOp *op)
+void OSD::op_stat(MOSDOp *op, PG *pg)
{
object_t oid = op->get_oid();
+ if (waitfor_missing_object(op, pg)) return;
+
+ // if the target object is locked for writing by another client, put 'op' to the waiting queue
+ if (block_if_wrlocked(op)) return; //read will be handled later, after the object unlocks
+
struct stat st;
memset(&st, 0, sizeof(st));
int r = store->stat(oid, &st);
wr->set_version(repop->new_version);
wr->set_old_version(repop->old_version);
wr->set_pg_role(1); // replica
+ wr->set_pg_trim_to(pg->peers_complete_thru);
messenger->send_message(wr, MSG_ADDR_OSD(osd));
repop->osds.insert(osd);
repop->waitfor_ack[tid] = osd;
repop->waitfor_commit[tid] = osd;
-
- replica_ops[tid] = repop;
- replica_pg_osd_tids[pg->get_pgid()][osd].insert(tid);
+
+ //replica_ops[tid] = repop;
+ //replica_pg_osd_tids[pg->get_pgid()][osd].insert(tid);
+ pg->replica_ops[tid] = repop;
+ pg->replica_tids_by_osd[osd].insert(tid);
}
// done.
if (repop->can_delete()) {
+ // adjust peers_complete_thru
+ if (!repop->pg_complete_thru.empty()) {
+ map<int,version_t>::iterator p = repop->pg_complete_thru.begin();
+ version_t min = p->second;
+ p++;
+ while (p != repop->pg_complete_thru.end()) {
+ if (p->second < min) min = p->second;
+ p++;
+ }
+
+ pg_t pgid = repop->op->get_pg();
+ osd_lock.Lock();
+ if (pg_map.count(pgid)) {
+ PG *pg = pg_map[pgid]; //_lock_pg(pgid);
+ if (min > pg->peers_complete_thru) {
+ //dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
+ pg->peers_complete_thru = min;
+ }
+ //_unlock_pg(pgid);
+ }
+ osd_lock.Unlock();
+ }
+
dout(10) << "put_repop deleting " << *repop << endl;
repop->lock.Unlock();
delete repop->op;
}
}
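put_repop folds each replica's reported last_complete into pg->peers_complete_thru by taking the minimum, and issue_replica_op sends that value back out as pg_trim_to. A minimal sketch of the reduction, assuming the map<int,version_t> shape OSDReplicaOp::pg_complete_thru has here:

// Sketch: the trim floor is the minimum last_complete any peer has reported.
// (Same reduction put_repop does inline above.)
static version_t min_complete_thru(const map<int, version_t>& reported)
{
  if (reported.empty()) return 0;                    // nobody has reported yet
  map<int, version_t>::const_iterator p = reported.begin();
  version_t min = p->second;
  for (++p; p != reported.end(); ++p)
    if (p->second < min) min = p->second;
  return min;
}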
-class C_OSD_WriteCommit : public Context {
-public:
- OSD *osd;
- OSD::OSDReplicaOp *repop;
- C_OSD_WriteCommit(OSD *o, OSD::OSDReplicaOp *op) : osd(o), repop(op) {}
- void finish(int r) {
- osd->op_modify_commit(repop);
- }
-};
-void OSD::op_modify_commit(OSDReplicaOp *repop)
+void OSD::op_modify_commit(OSDReplicaOp *repop, version_t pg_complete_thru)
{
dout(10) << "op_modify_commit on op " << *repop->op << endl;
get_repop(repop);
{
assert(repop->waitfor_commit.count(0));
repop->waitfor_commit.erase(0);
+ repop->pg_complete_thru[whoami] = pg_complete_thru;
}
put_repop(repop);
}
/** op_modify
- * process lcient modify op
+ * process client modify op
* NOTE: called from opqueue.
*/
void OSD::op_modify(MOSDOp *op, PG *pg)
{
object_t oid = op->get_oid();
+ if (waitfor_missing_object(op, pg)) return;
+
+ // if the target object is locked for writing by another client, put 'op' to the waiting queue
+ // for _any_ op type -- eg only the locker can unlock!
+ if (block_if_wrlocked(op)) return; // op will be handled later, after the object unlocks
+
char *opname = 0;
switch (op->get_op()) {
- case OSD_OP_WRITE: opname = "op_write"; break;
- case OSD_OP_ZERO: opname = "op_zero"; break;
- case OSD_OP_DELETE: opname = "op_delete"; break;
- case OSD_OP_TRUNCATE: opname = "op_truncate"; break;
- case OSD_OP_WRLOCK: opname = "op_wrlock"; break;
- case OSD_OP_WRUNLOCK: opname = "op_wrunlock"; break;
+ case OSD_OP_WRITE: opname = "write"; break;
+ case OSD_OP_ZERO: opname = "zero"; break;
+ case OSD_OP_DELETE: opname = "delete"; break;
+ case OSD_OP_TRUNCATE: opname = "truncate"; break;
+ case OSD_OP_WRLOCK: opname = "wrlock"; break;
+ case OSD_OP_WRUNLOCK: opname = "wrunlock"; break;
default: assert(0);
}
store->getattr(oid, "version", &ov, sizeof(ov));
version_t nv = messenger->get_lamport();
- dout(12) << " " << opname
+ dout(10) << "op_modify " << opname
<< " " << hex << oid << dec
<< " v " << nv
+ << " ov " << ov
<< " off " << op->get_offset() << " len " << op->get_length()
<< endl;
repop->waitfor_ack[0] = whoami; // will need local ack, commit
repop->waitfor_commit[0] = whoami;
- osd_lock.Lock();
repop->lock.Lock();
{
for (unsigned i=1; i<pg->acting.size(); i++) {
}
}
repop->lock.Unlock();
- osd_lock.Unlock();
// pre-ack
//MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
//messenger->send_message(reply, op->get_asker());
- // do it locally
- Context *oncommit = new C_OSD_WriteCommit(this, repop);
- op_apply(op, nv, pg, oncommit);
+ // prepare
+ ObjectStore::Transaction t;
+ prepare_log_transaction(t, op, nv, pg);
+ prepare_op_transaction(t, op, nv, pg);
+
+ // go
+ Context *oncommit = new C_OSD_WriteCommit(this, repop, pg->info.last_complete);
+ unsigned r = store->apply_transaction(t, oncommit);
+ if (r != 0 && // no errors
+ r != 2) { // or error on collection_add
+ cerr << "error applying transaction: r = " << r << endl;
+ assert(r == 0);
+ }
// local ack
get_repop(repop);
-/** op_apply
+void OSD::prepare_log_transaction(ObjectStore::Transaction& t,
+ MOSDOp *op, version_t& version, PG *pg)
+{
+ const object_t oid = op->get_oid();
+ const pg_t pgid = op->get_pg();
+
+ PG::PGLog::Entry logentry(op->get_oid(), version);
+
+ // update pg log
+ assert(version > pg->log.top);
+ assert(pg->info.last_update == pg->log.top);
+ if (op->get_op() == OSD_OP_DELETE ||
+ op->get_op() == OSD_OP_REP_DELETE) {
+ dout(10) << "prepare_log_transaction " << op->get_op()
+ << " - " << hex << oid << dec
+ << " v " << version
+ << " in " << *pg << endl;
+ pg->log.add_delete(oid, version);
+ logentry.deleted = true;
+ } else {
+ dout(10) << "prepare_log_transaction " << op->get_op()
+ << " + " << hex << oid << dec
+ << " v " << version
+ << " in " << *pg << endl;
+ pg->log.add_update(oid, version);
+ }
+ assert(pg->log.top == version);
+  if (pg->info.last_complete == pg->info.last_update)
+ pg->info.last_complete = version;
+ pg->info.last_update = version;
+
+ // write to pg log
+ pg->append_log(t, logentry, op->get_pg_trim_to());
+
+ // write pg info
+ t.collection_setattr(pgid, "info", &pg->info, sizeof(pg->info));
+}
+
+
+/** prepare_op_transaction
* apply an op to the store wrapped in a transaction.
*/
-void OSD::op_apply(MOSDOp *op, version_t version, PG *pg, Context* oncommit)
+void OSD::prepare_op_transaction(ObjectStore::Transaction& t,
+ MOSDOp *op, version_t& version, PG *pg)
{
- object_t oid = op->get_oid();
- pg_t pgid = op->get_pg();
+ const object_t oid = op->get_oid();
+ const pg_t pgid = op->get_pg();
- dout(10) << "op_apply " << op->get_op()
+ dout(10) << "prepare_op_transaction " << op->get_op()
<< " " << hex << oid << dec
<< " v " << version
<< " in " << *pg << endl;
- // if the target object is locked for writing by another client, put 'op' to the waiting queue
- // for _any_ op type -- eg only the locker can unlock!
- if (block_if_wrlocked(op))
- return; // op will be handled later, after the object becomes unlocked
-
- // prepare the transaction
- ObjectStore::Transaction t;
-
- PG::PGLog::Entry logentry(op->get_oid(), version);
-
// the op
switch (op->get_op()) {
case OSD_OP_WRLOCK:
t.setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t));
}
break;
-
+
case OSD_OP_WRUNLOCK:
case OSD_OP_REP_WRUNLOCK:
{ // unlock objects
}
}
break;
-
+
case OSD_OP_WRITE:
case OSD_OP_REP_WRITE:
{ // write
t.write( oid, op->get_offset(), op->get_length(), bl );
}
break;
-
+
case OSD_OP_TRUNCATE:
case OSD_OP_REP_TRUNCATE:
{ // truncate
{ // delete
//r = store->remove(oid);
t.remove(oid);
- logentry.deleted = true;
}
break;
default:
assert(0);
}
-
-
- // update pg log
- if (op->get_op() == OSD_OP_DELETE ||
- op->get_op() == OSD_OP_REP_DELETE) {
- pg->log.add_delete(oid, version);
- } else {
- pg->log.add_update(oid, version);
- }
- if (pg->info.last_complete == pg->info.last_update)
- pg->info.last_complete = version;
- pg->info.last_update = version;
-
- // write log to disk
- bufferlist bl;
- bl.append( (char*)&logentry, sizeof(logentry) );
- t.write( pgid, pg->ondisklog.get_write_pos(), bl.length(), bl );
- pg->ondisklog.inc_write_pos( bl.length() );
-
// object collection, version
if (op->get_op() == OSD_OP_DELETE ||
} else {
// add object to c
t.collection_add(pgid, oid);
-
+
// object version
t.setattr(oid, "version", &version, sizeof(version));
}
-
- // inc pg version
- t.collection_setattr(pgid, "version", &version, sizeof(version));
-
- // ok, go!
- unsigned r = store->apply_transaction(t, oncommit);
- if (r == 0 && // no errors
- r == 2) { // or error on collection_add
- cerr << "error applying transaction: r = " << r << endl;
- assert(r == 0);
- }
}
*
*/
-class OSD : public Dispatcher {
-public:
-
- /** superblock
- */
- const static object_t SUPERBLOCK_OBJECT = 0;
-
- class Superblock {
- public:
- __uint64_t fsid; // unique fs id (random number)
- int whoami; // my role in this fs.
- epoch_t current_epoch; // most recent epoch
- epoch_t oldest_map, newest_map; // oldest/newest maps we have.
- Superblock(__uint64_t f=0, int w=0) : fsid(f), whoami(w),
- current_epoch(0), oldest_map(0), newest_map(0) {}
- } superblock;
-
- object_t get_osdmap_object_name(epoch_t epoch) { return (object_t)epoch; }
-
- void write_superblock();
-
-
/** OSDReplicaOp
* state associated with an in-progress replicated update.
*/
utime_t start;
- bool cancel;
+ //bool cancel;
bool sent_ack, sent_commit;
set<int> osds;
version_t new_version, old_version;
+
+ map<int,version_t> pg_complete_thru;
- OSDReplicaOp(class MOSDOp *o, version_t nv, version_t ov) :
+ OSDReplicaOp(class MOSDOp *o, version_t nv, version_t ov) :
op(o),
//local_ack(false), local_commit(false),
- cancel(false),
+ //cancel(false),
sent_ack(false), sent_commit(false),
new_version(nv), old_version(ov)
{ }
bool can_delete() { return waitfor_ack.empty() && waitfor_commit.empty(); }
};
+
+class OSD : public Dispatcher {
+public:
+
+ /** superblock
+ */
+ const static object_t SUPERBLOCK_OBJECT = 0;
+ OSDSuperblock superblock;
+
+ object_t get_osdmap_object_name(epoch_t epoch) { return (object_t)epoch; }
+
+ void write_superblock();
+ void read_superblock();
+
+
/** OSD **/
protected:
Messenger *messenger;
void do_op(class MOSDOp *m, PG *pg); // actually do it
- int apply_write(MOSDOp *op, version_t v,
- Context *oncommit = 0);
- void op_apply(MOSDOp* op, version_t version, PG *pg, Context* oncommit = 0);
+ void prepare_log_transaction(ObjectStore::Transaction& t, MOSDOp* op, version_t& version, PG *pg);
+ void prepare_op_transaction(ObjectStore::Transaction& t, MOSDOp* op, version_t& version, PG *pg);
+ bool waitfor_missing_object(MOSDOp *op, PG *pg);
+
friend class PG;
hash_map<pg_t, list<Message*> > waiting_for_pg;
// replica ops
- map<__uint64_t, OSDReplicaOp*> replica_ops;
- map<pg_t, map<int, set<__uint64_t> > > replica_pg_osd_tids; // pg -> osd -> tid
-
void get_repop(OSDReplicaOp*);
void put_repop(OSDReplicaOp*); // will send ack/commit msgs, and delete as necessary.
void issue_replica_op(PG *pg, OSDReplicaOp *repop, int osd);
- void handle_rep_op_ack(__uint64_t tid, int result, bool commit, int fromosd);
+ void handle_rep_op_ack(PG *pg, __uint64_t tid, int result, bool commit, int fromosd,
+ version_t pg_complete_thru=0);
// recovery
void do_notifies(map< int, list<PG::PGInfo> >& notify_list);
void op_rep_pull_reply(class MOSDOpReply *op);
  void op_rep_modify(class MOSDOp *op, PG *pg);   // write, truncate, delete
- void op_rep_modify_commit(class MOSDOp *op);
+ void op_rep_modify_commit(class MOSDOp *op, version_t last_complete);
friend class C_OSD_RepModifyCommit;
void handle_ping(class MPing *m);
void handle_op(class MOSDOp *m);
- void op_read(class MOSDOp *m);
- void op_stat(class MOSDOp *m);
+ void op_read(class MOSDOp *m, PG *pg);
+ void op_stat(class MOSDOp *m, PG *pg);
void op_modify(class MOSDOp *m, PG *pg);
- void op_modify_commit(class OSDReplicaOp *repop);
+ void op_modify_commit(class OSDReplicaOp *repop, version_t last_complete);
// for replication
void handle_op_reply(class MOSDOpReply *m);
void force_remount();
};
-inline ostream& operator<<(ostream& out, OSD::OSDReplicaOp& repop)
+inline ostream& operator<<(ostream& out, OSDReplicaOp& repop)
{
out << "repop(wfack=" << repop.waitfor_ack << " wfcommit=" << repop.waitfor_commit;
- if (repop.cancel) out << " cancel";
+ //if (repop.cancel) out << " cancel";
out << " op=" << *(repop.op);
out << " repop=" << &repop;
+ out << " lc=" << repop.pg_complete_thru;
out << ")";
return out;
}
#define PG_TYPE_STARTOSD 2 // place primary on a specific OSD (named by the pg_bits)
-typedef __uint64_t epoch_t;
/** OSDMap
*/
class Transaction {
public:
- static const int OP_WRITE = 1; // oid, offset, len, bl
- static const int OP_TRUNCATE = 2; // oid, len
- static const int OP_REMOVE = 3; // oid
- static const int OP_SETATTR = 4; // oid, attrname, attrval
- static const int OP_RMATTR = 5; // oid, attrname
- static const int OP_MKCOLL = 6; // cid
- static const int OP_RMCOLL = 7; // cid
- static const int OP_COLL_ADD = 8; // cid, oid
- static const int OP_COLL_REMOVE = 9; // cid, oid
- static const int OP_COLL_SETATTR = 10; // cid, attrname, attrval
- static const int OP_COLL_RMATTR = 11; // cid, attrname
+ static const int OP_READ = 1; // oid, offset, len, pbl
+ static const int OP_STAT = 2; // oid, pstat
+ static const int OP_GETATTR = 3; // oid, attrname, pattrval
+ static const int OP_WRITE = 10; // oid, offset, len, bl
+ static const int OP_TRUNCATE = 11; // oid, len
+ static const int OP_REMOVE = 12; // oid
+ static const int OP_SETATTR = 13; // oid, attrname, attrval
+ static const int OP_RMATTR = 14; // oid, attrname
+ static const int OP_MKCOLL = 20; // cid
+ static const int OP_RMCOLL = 21; // cid
+ static const int OP_COLL_ADD = 22; // cid, oid
+ static const int OP_COLL_REMOVE = 23; // cid, oid
+ static const int OP_COLL_SETATTR = 24; // cid, attrname, attrval
+ static const int OP_COLL_RMATTR = 25; // cid, attrname
list<int> ops;
list<bufferlist> bls;
list<const char*> attrnames;
list< pair<const void*,int> > attrvals;
+ list<bufferlist*> pbls;
+ list<struct stat*> psts;
+ list< pair<void*,int*> > pattrvals;
+
+ void read(object_t oid, off_t off, size_t len, bufferlist *pbl) {
+ int op = OP_READ;
+ ops.push_back(op);
+ oids.push_back(oid);
+ offsets.push_back(off);
+ lengths.push_back(len);
+ pbls.push_back(pbl);
+ }
+ void stat(object_t oid, struct stat *st) {
+ int op = OP_STAT;
+ ops.push_back(op);
+ oids.push_back(oid);
+ psts.push_back(st);
+ }
+ void getattr(object_t oid, const char* name, void* val, int *plen) {
+ int op = OP_GETATTR;
+ ops.push_back(op);
+ oids.push_back(oid);
+ attrnames.push_back(name);
+ pattrvals.push_back(pair<void*,int*>(val,plen));
+ }
+
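For reference, a minimal usage sketch of the new read-side ops (illustrative only: the object id, the stat buffer, and the 'store' pointer are placeholders, and the transaction is applied with no commit context):

  ObjectStore::Transaction t;
  bufferlist bl;
  struct stat st;
  version_t v = 0;
  int vlen = sizeof(v);
  t.read(oid, 0, 4096, &bl);             // bl is filled in during apply_transaction
  t.stat(oid, &st);                      // st likewise
  t.getattr(oid, "version", &v, &vlen);  // vlen is in/out: buffer size in, result length out
  store->apply_transaction(t, 0);        // results are usable once this returns
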
void write(object_t oid, off_t off, size_t len, bufferlist& bl) {
int op = OP_WRITE;
ops.push_back(op);
if (p == t.ops.end()) last = onsafe;
switch (*p) {
+ case Transaction::OP_READ:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ off_t offset = t.offsets.front(); t.offsets.pop_front();
+ size_t len = t.lengths.front(); t.lengths.pop_front();
+ bufferlist *pbl = t.pbls.front(); t.pbls.pop_front();
+ read(oid, offset, len, *pbl);
+ }
+ break;
+ case Transaction::OP_STAT:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ struct stat *st = t.psts.front(); t.psts.pop_front();
+ stat(oid, st);
+ }
+ break;
+ case Transaction::OP_GETATTR:
+ {
+ object_t oid = t.oids.front(); t.oids.pop_front();
+ const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+ pair<void*,int*> pattrval = t.pattrvals.front(); t.pattrvals.pop_front();
+ *pattrval.second = getattr(oid, attrname, pattrval.first, *pattrval.second);
+ }
+ break;
+
case Transaction::OP_WRITE:
{
object_t oid = t.oids.front(); t.oids.pop_front();
}
virtual int read(object_t oid,
- size_t len, off_t offset,
+ off_t offset, size_t len,
bufferlist& bl) = 0;
virtual int write(object_t oid,
#include "messages/MOSDPGLog.h"
-#include "messages/MOSDPGSummary.h"
#include "messages/MOSDPGRemove.h"
#undef dout
/******* PGLog ********/
-
-void PG::PGLog::trim(version_t s)
+void PG::PGLog::trim(ObjectStore::Transaction& t, version_t s)
{
// trim updated
while (!updated.empty()) {
rdeleted.erase(rit);
}
- bottom = s;
+ // raise bottom?
+ if (bottom < s) {
+ bottom = s;
+ backlog = false;
+ }
}
}
-
-void PG::PGLog::merge(const PGLog &other, PGMissing &missing)
+void PG::merge_log(const PGLog &olog)
{
- if (empty()) {
+ dout(10) << "merge_log " << olog << " into " << log << endl;
+
+ if (is_empty()) {
// special (easy) case
- updated = other.updated;
- rupdated = other.rupdated;
- deleted = other.deleted;
- rdeleted = other.rdeleted;
- top = other.top;
- bottom = other.bottom;
+ assert(info.last_complete == 0);
+ assert(log.empty());
+
+ // copy the log.
+ log = olog;
+ info.last_update = log.top;
+ info.log_floor = log.bottom;
// add all to missing
- for (map<object_t,version_t>::const_iterator pu = updated.begin();
- pu != updated.end();
+ for (map<object_t,version_t>::const_iterator pu = log.updated.begin();
+ pu != log.updated.end();
pu++)
missing.add(pu->first, pu->second);
-
+
+ dout(10) << "merge_log result " << log << endl;
return;
}
// extend on bottom?
- if (other.bottom < bottom) {
- version_t newbottom = other.bottom;
+ if (olog.bottom < log.bottom &&
+ olog.top >= log.bottom) {
+ version_t newbottom = olog.bottom;
+ dout(10) << "merge_log extending bottom to " << newbottom << endl;
// updated
- for (map<version_t,object_t>::const_iterator p = other.rupdated.begin();
- p != other.rupdated.end();
+ for (map<version_t,object_t>::const_iterator p = olog.rupdated.upper_bound(olog.bottom);
+ p != olog.rupdated.end();
p++) {
- if (p->first > bottom) break;
- if (updated.count(p->second) || deleted.count(p->second)) continue;
- updated[p->second] = p->first;
- rupdated[p->first] = p->second;
+ // verify?
+ if (p->first > log.bottom) break;
+ if (log.backlog)
+ assert((log.updated.count(p->second) && log.updated[p->second] >= p->first) ||
+ (log.deleted.count(p->second) && log.deleted[p->second] > p->first));
+
+ if (log.updated.count(p->second) || log.deleted.count(p->second)) continue;
+ log.updated[p->second] = p->first;
+ log.rupdated[p->first] = p->second;
+ if (is_empty())
+ missing.add(p->second, p->first);
}
// deleted
- for (map<version_t,object_t>::const_iterator p = other.rdeleted.begin();
- p != other.rdeleted.end();
+ for (map<version_t,object_t>::const_iterator p = olog.rdeleted.begin();
+ p != olog.rdeleted.end();
p++) {
- if (p->first > bottom) break;
- if (updated.count(p->second) || deleted.count(p->second)) continue;
- deleted[p->second] = p->first;
- rdeleted[p->first] = p->second;
+ assert(p->first >= olog.bottom);
+ if (p->first > log.bottom) break;
+ if (log.updated.count(p->second) || log.deleted.count(p->second)) continue;
+ log.deleted[p->second] = p->first;
+ log.rdeleted[p->first] = p->second;
}
- bottom = newbottom;
+ info.log_floor = log.bottom = newbottom;
}
+ // backlog?
+ if (olog.backlog && !log.backlog &&
+ olog.top >= log.bottom) {
+ dout(10) << "merge_log filling in backlog" << endl;
+
+ for (map<version_t,object_t>::const_iterator p = olog.rupdated.begin();
+ p != olog.rupdated.end();
+ p++) {
+ if (p->first >= olog.bottom) break;
+ if (log.deleted.count(p->second)) {
+ assert(log.deleted[p->second] > p->first); // delete must happen later..
+ assert(log.top > olog.top); // our log must have more new stuff..
+ continue; // gets deleted later.
+ }
+ log.updated[p->second] = p->first;
+ log.rupdated[p->first] = p->second;
+ }
+ log.backlog = true;
+ }
+
// extend on top?
- if (other.top > top) {
- map<version_t,object_t>::const_iterator pu = other.rupdated.lower_bound(top);
- map<version_t,object_t>::const_iterator pd = other.rdeleted.lower_bound(top);
+ if (olog.top > log.top &&
+ olog.bottom <= log.top) {
+ dout(10) << "merge_log extending top to " << olog.top << endl;
+ map<version_t,object_t>::const_iterator pu = olog.rupdated.upper_bound(log.top);
+ map<version_t,object_t>::const_iterator pd = olog.rdeleted.upper_bound(log.top);
// both
- while (pu != other.rupdated.end() && pd != other.rdeleted.end()) {
+ while (pu != olog.rupdated.end() && pd != olog.rdeleted.end()) {
assert(pd->first != pu->first);
if (pu->first > pd->first) {
- add_update(pu->second, pu->first);
+ log.add_update(pu->second, pu->first);
missing.add(pu->second, pu->first);
pu++;
} else {
- add_delete(pd->second, pd->first);
+ log.add_delete(pd->second, pd->first);
        missing.rm(pd->second, pd->first);
pd++;
}
}
// tail
- while (pu != other.rupdated.end()) {
- add_update(pu->second, pu->first);
+ while (pu != olog.rupdated.end()) {
+ log.add_update(pu->second, pu->first);
missing.add(pu->second, pu->first);
pu++;
}
- while (pd != other.rdeleted.end()) {
- add_delete(pd->second, pd->first);
+ while (pd != olog.rdeleted.end()) {
+ log.add_delete(pd->second, pd->first);
missing.rm(pu->second, pu->first);
pd++;
}
- top = other.top;
+
+ info.last_update = log.top = olog.top;
}
+
+ dout(10) << "merge_log result " << log << endl;
}
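As a concrete (hypothetical) walk-through of the three cases above, in the log(bottom,top] notation used by PGLog's operator<<: merging olog = log(90,120]+backlog into a local log(100,110] first extends the bottom to 90, folding in the entries in (90,100]; then, because olog carries a backlog and ours does not, it copies in the updates older than 90 (skipping any object our newer entries delete) and sets log.backlog; finally it extends the top to 120, appending the entries in (110,120] and adding each newly learned update to 'missing'. The merged result prints as log(90,120]+backlog.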
-/** assumilate_summary
- * reconstruct "complete" log based on a summary
- * ie we have recent history (as log) and a summary
- * want a log that takes us up to present.
- */
-void PG::PGLog::assimilate_summary(const PGSummary &sum, version_t last_complete)
+
+
+
+void PG::generate_backlog(PGLog& log)
{
- assert(last_complete >= bottom);
- assert(top > last_complete);
+ dout(10) << "generate_backlog to " << log << endl;
- // updated
+ assert(!log.backlog);
+ log.backlog = true;
+
+ list<object_t> olist;
+ osd->store->collection_list(info.pgid, olist);
+
+ int local = 0;
+ for (list<object_t>::iterator it = olist.begin();
+ it != olist.end();
+ it++) {
+ local++;
+ assert(log.deleted.count(*it) == 0);
+
+ if (log.updated.count(*it)) continue;
+
+ version_t v;
+ osd->store->getattr(*it,
+ "version",
+ &v, sizeof(v));
+ log.updated[*it] = v;
+ }
+
+ dout(10) << local << " local objects, "
+ << log.updated.size() << " in pg" << endl;
+}
+
+/** assimilate_summary
+ * assimilate recovery info into a log based on a summary,
+ * i.e. we have recent history (as log) and a summary of an older pg state.
+ *
+ * NOTE: this does NOT give us a complete log that we can share, because it omits
+ * deletion info, i.e. log.bottom is NOT adjusted down.  it is, however, sufficient
+ * for us to reach the correct state.
+ *
+ */
+/*
+void PG::PGLog::assimilate_summary(const PGSummary &sum)
+{
+ dout(10) << "assimilate_summary" << endl;
+
+ // merge in the summary. do not adjust log top/bottom.
for (map<object_t,version_t>::const_iterator p = sum.objects.begin();
p != sum.objects.end();
p++) {
- if (deleted.count(p->first) &&
- deleted[p->first] > p->second) continue;
- if (updated.count(p->first)) continue;
+ if (deleted.count(p->first)) {
+ if (deleted[p->first] >= p->second) continue; // it gets deleted later.
+ rdeleted.erase(deleted[p->first]); // hose old deletion
+ deleted.erase(p->first);
+ }
+ if (updated.count(p->first)) {
+ if (updated[p->first] > p->second) continue; // we logged newer version.
+ rupdated.erase(updated[p->first]); // hose older update
+ }
+ dout(10) << "assimilate_summary " << hex << p->first << dec
+ << " v " << p->second << endl;
+ assert(log.bottom == 0 || p->second < log.bottom);
updated[p->first] = p->second;
rupdated[p->second] = p->first;
}
+
+ // at this point, 'updated' should reflect the correct object set.
- bottom = 0;
+ // fetch local object set
+ PGSummary local;
+ generate_summary(local);
+
+ // look for things we have but shouldn't!
+ for (map<object_t,version_t>::const_iterator p = local.objects.begin();
+ p != local.objects.end();
+ p++) {
+ if (updated.count(p->first) == 0) {
+ assert(p->second <= last_complete);
+ dout(10) << "assimilate_summary removing " << hex << oid << dec
+ << " v " << ov << " < " << last_complete << endl;
+ osd->store->remove(oid);
+ }
+ }
}
-
+*/
ostream& PG::PGLog::print(ostream& out) const
{
version_t newest_update = info.last_update;
int newest_update_osd = osd->whoami;
version_t oldest_update_needed = info.last_update; // only of acting (current) osd set
+ version_t peers_complete_thru = info.last_complete;
for (map<int,PGInfo>::iterator it = peer_info.begin();
it != peer_info.end();
newest_update = it->second.last_update;
newest_update_osd = it->first;
}
- if (is_acting(it->first) &&
- it->second.last_update < oldest_update_needed)
- oldest_update_needed = it->second.last_update;
+ if (is_acting(it->first)) {
+ if (it->second.last_update < oldest_update_needed)
+ oldest_update_needed = it->second.last_update;
+ if (it->second.last_complete < peers_complete_thru)
+ peers_complete_thru = it->second.last_complete;
+ }
}
// get log?
} else {
dout(10) << " i have the most up-to-date pg v " << info.last_update << endl;
}
+ dout(10) << " peers_complete_thru " << peers_complete_thru << endl;
+ dout(10) << " oldest_update_needed " << oldest_update_needed << endl;
// -- is that the whole story? (is my log sufficient?)
- if (info.last_complete < log.bottom) {
- // nope. fetch a summary from someone.
+ if (info.last_complete < log.bottom && !log.backlog) {
+ // nope. fetch backlog from someone.
if (peer_summary_requested.count(newest_update_osd)) {
dout(10) << "i am complete thru " << info.last_complete
<< ", but my log starts at " << log.bottom
- << ". already waiting for summary from osd" << newest_update_osd
+ << ". already waiting for summary/backlog from osd" << newest_update_osd
<< endl;
} else {
dout(10) << "i am complete thru " << info.last_complete
<< ", but my log starts at " << log.bottom
- << ". fetching summary from osd" << newest_update_osd
+ << ". fetching summary/backlog from osd" << newest_update_osd
<< endl;
assert(newest_update_osd != osd->whoami); // can't be me!
query_map[newest_update_osd][info.pgid] = PG_QUERY_SUMMARY;
int peer = it->first;
if (peer_summary_requested.count(peer)) {
- dout(10) << " already requested summary from osd" << peer << endl;
+ dout(10) << " already requested summary/backlog from osd" << peer << endl;
waiting = true;
continue;
}
- dout(10) << " requesting summary from osd" << peer << endl;
+ dout(10) << " requesting summary/backlog from osd" << peer << endl;
query_map[peer][info.pgid] = PG_QUERY_INFO;
peer_summary_requested.insert(peer);
waiting = true;
}
-
+
if (!waiting) {
dout(10) << missing.num_lost() << " objects are still lost, waiting+hoping for a notify from someone else!" << endl;
}
return;
}
+ // sanity check
+ assert(missing.num_lost() == 0);
+ assert(info.last_complete >= log.bottom || log.backlog);
- // -- do i need to generate a larger log for any of my peers?
- PGSummary summary;
- if (oldest_update_needed > log.bottom) {
- dout(10) << "my log isn't long enough for all peers: bottom "
+ // -- do i need to generate backlog for any of my peers?
+ if (oldest_update_needed < log.bottom && !log.backlog) {
+ dout(10) << "generating backlog for some peers, bottom "
<< log.bottom << " > " << oldest_update_needed
<< endl;
- generate_summary(summary);
+ generate_backlog(log);
}
if (peer_info[peer].is_clean())
clean_set.insert(peer);
- if (peer_info[peer].last_update < log.bottom) {
- // need full summary
- dout(10) << "sending complete summary to osd" << peer
- << ", their last_update was " << peer_info[peer].last_update
- << endl;
- MOSDPGSummary *m = new MOSDPGSummary(osd->osdmap->get_epoch(),
- info.pgid,
- summary);
- osd->messenger->send_message(m, MSG_ADDR_OSD(peer));
- } else {
- // need incremental (or no) log update.
- dout(10) << "sending incremental|empty log ("
- << peer_info[peer].last_update << "," << info.last_update
- << "] to osd" << peer << endl;
- dout(20) << " btw my whole log is " << log.print(cout) << endl;
- MOSDPGLog *m = new MOSDPGLog(osd->osdmap->get_epoch(),
- info.pgid);
- m->info = info;
- if (peer_info[peer].last_update < info.last_update) {
- m->log.copy_after(log, peer_info[peer].last_update);
+
+ MOSDPGLog *m = new MOSDPGLog(osd->osdmap->get_epoch(),
+ info.pgid);
+ m->info = info;
+
+ if (peer_info[peer].last_update == info.last_update) {
+ // empty log
+ }
+ else if (peer_info[peer].last_update < log.bottom) {
+ // summary/backlog
+ assert(log.backlog);
+ m->log = log;
+ }
+ else {
+ // incremental log
+ assert(peer_info[peer].last_update < info.last_update);
+ m->log.copy_after(log, peer_info[peer].last_update);
+ }
- // build missing list for them too
- for (map<object_t, version_t>::iterator p = m->log.updated.begin();
- p != m->log.updated.end();
- p++) {
- const object_t oid = p->first;
- const version_t v = p->second;
- m->missing.add(oid, v);
- if (missing.is_missing(oid, v)) { // we don't have it?
- assert(missing.loc.count(oid)); // nothing should be lost!
- m->missing.loc[oid] = missing.loc[oid];
- } else {
- m->missing.loc[oid] = osd->whoami; // we have it.
- }
- }
-
- }
- osd->messenger->send_message(m, MSG_ADDR_OSD(peer));
+ // build missing list for them too
+ for (map<object_t, version_t>::iterator p = m->log.updated.begin();
+ p != m->log.updated.end();
+ p++) {
+ const object_t oid = p->first;
+ const version_t v = p->second;
+ if (missing.is_missing(oid, v)) { // we don't have it?
+ assert(missing.loc.count(oid)); // nothing should be lost!
+ m->missing.add(oid, v);
+ m->missing.loc[oid] = missing.loc[oid];
+ }
+ // note: peer will assume we have it if we don't say otherwise.
+ // else m->missing.loc[oid] = osd->whoami; // we have it.
}
+
+ dout(10) << "sending " << m->log << " " << m->missing
+ << " to osd" << peer << endl;
+
+ osd->messenger->send_message(m, MSG_ADDR_OSD(peer));
}
// all clean?
}
-void PG::generate_summary(PGSummary &summary)
-{
- dout(10) << "generating summary" << endl;
-
- list<object_t> olist;
- osd->store->collection_list(info.pgid, olist);
-
- for (list<object_t>::iterator it = olist.begin();
- it != olist.end();
- it++) {
- version_t v;
- osd->store->getattr(*it,
- "version",
- &v, sizeof(v));
- summary.objects[*it] = v;
- }
-
- dout(10) << summary.objects.size() << " local objects. " << endl;
-}
-
-
-
-
/**
* do one recovery op.
return false;
}
- if (!objects_pulling.count(p->second) && // not already pulling
+ if (!objects_pulling.count(p->second) && // not already pulling
missing.is_missing(p->second, p->first)) // and missing
break;
stray_set.clear();
}
+
+
+void PG::append_log(ObjectStore::Transaction& t, PG::PGLog::Entry& logentry, version_t trim_to)
+{
+ // write entry
+ bufferlist bl;
+ bl.append( (char*)&logentry, sizeof(logentry) );
+ t.write( info.pgid, ondisklog.top, bl.length(), bl );
+
+ // update block map?
+ if (ondisklog.top % 4096 == 0)
+ ondisklog.block_map[ondisklog.top] = logentry.version;
+
+ ondisklog.top += bl.length();
+ t.collection_setattr(info.pgid, "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top));
+
+ // trim?
+ if (trim_to > log.bottom) {
+ dout(10) << " trimming " << log << " to " << trim_to << endl;
+ log.trim(t, trim_to);
+ if (ondisklog.trim_to(trim_to))
+ t.collection_setattr(info.pgid, "ondisklog_bottom", &ondisklog.bottom, sizeof(ondisklog.bottom));
+ }
+ dout(10) << " ondisklog bytes [" << ondisklog.bottom << "," << ondisklog.top << ")" << endl;
+}
+
+void PG::read_log(ObjectStore *store)
+{
+ // load bounds
+ store->collection_getattr(info.pgid, "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top));
+ store->collection_getattr(info.pgid, "ondisklog_bottom", &ondisklog.bottom, sizeof(ondisklog.bottom));
+
+ // read
+ bufferlist bl;
+ store->read(info.pgid, ondisklog.bottom, ondisklog.top-ondisklog.bottom, bl);
+
+ off_t pos = ondisklog.bottom;
+ while (pos < ondisklog.top) {
+ PG::PGLog::Entry e;
+ bl.copy(pos-ondisklog.bottom, sizeof(e), (char*)&e);
+
+ if (pos % 4096 == 0)
+ ondisklog.block_map[pos] = e.version;
+
+ if (e.deleted)
+ log.add_delete(e.oid, e.version);
+ else
+ log.add_update(e.oid, e.version);
+
+ pos += sizeof(e);
+ }
+
+ store->collection_getattr(info.pgid, "top", &log.top, sizeof(log.top));
+ store->collection_getattr(info.pgid, "bottom", &log.bottom, sizeof(log.bottom));
+ store->collection_getattr(info.pgid, "backlog", &log.backlog, sizeof(log.backlog));
+}
+
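The block_map maintained by append_log and rebuilt by read_log is what keeps these operations cheap: it remembers the first version logged at each 4 KB boundary, so a version can be mapped back to a nearby byte offset without scanning every entry (this is also what PGOndiskLog::trim_to leans on). A rough lookup helper, purely illustrative and not part of the patch:

  // return the start of the last 4 KB block whose first logged version is
  // still <= v; scanning the on-disk log from this offset covers every
  // entry newer than v, since entries are appended in version order.
  static off_t log_offset_for(PG::PGOndiskLog& odl, version_t v)
  {
    off_t best = odl.bottom;
    for (map<off_t,version_t>::iterator p = odl.block_map.begin();
         p != odl.block_map.end() && p->second <= v;
         p++)
      best = p->first;
    return best;
  }
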
class OSD;
-
#define PG_QUERY_INFO ((version_t)0xffffffffffffffffULL)
#define PG_QUERY_SUMMARY ((version_t)0xfffffffffffffffeULL)
epoch_t last_epoch_started; // last epoch started.
epoch_t last_epoch_finished; // last epoch finished.
epoch_t same_primary_since; // upper bound: same primary at least back through this epoch.
+ epoch_t same_role_since; // upper bound: i have held same role since
PGInfo(pg_t p=0) : pgid(p),
- last_update(0), last_complete(0),
+ last_update(0), last_complete(0),
last_epoch_started(0), last_epoch_finished(0),
- same_primary_since(0) {}
+ same_primary_since(0), same_role_since(0) {}
bool is_clean() { return last_update == last_complete; }
};
missing.erase(oid);
}
- void merge_loc(PGMissing &other) {
- if (num_lost() > 0) {
- // see if we can find anything new!
- cout << "merge_loc " << endl;//*this << " from " << other << endl;
- for (map<object_t,version_t>::iterator p = missing.begin();
- p != missing.end();
- p++) {
- assert(other.missing[p->first] >= missing[p->first]);
- if (other.loc.count(p->first))
- loc[p->first] = other.loc[p->first];
- }
- }
- }
-
void _encode(bufferlist& blist) {
::_encode(missing, blist);
::_encode(loc, blist);
object_t oid;
version_t version;
bool deleted;
+ Entry() {}
Entry(object_t o, version_t v, bool d=false) :
oid(o), version(v), deleted(d) {}
};
- version_t top; // corresponds to newest entry.
- version_t bottom; // corresponds to entry prev to oldest entry (t=bottom is trimmed).
- map<object_t, version_t> updated; // oid -> v. items > bottom, + version.
- map<version_t, object_t> rupdated; // v -> oid.
- map<object_t, version_t> deleted; // oid -> when. items <= bottom that no longer exist
+ /** top, bottom
+ * top - newest entry (update|delete)
+ * bottom - entry previous to oldest (update|delete) for which we have
+ * complete negative information.
+ * i.e. we can infer pg contents for any store whose last_update >= bottom.
+ */
+ version_t top; // newest entry (update|delete)
+ version_t bottom; // version prior to oldest (update|delete)
+
+ /** backlog - true if log is a complete summary of pg contents.
+ * updated will include all items in pg, but deleted will not include
+ * negative entries for items deleted prior to 'bottom'.
+ */
+ bool backlog;
+
+ /** update, deleted maps **/
+ map<object_t, version_t> updated; // oid -> v.
+ map<version_t, object_t> rupdated; // v -> oid.
+ map<object_t, version_t> deleted; // oid -> when.
map<version_t, object_t> rdeleted; // when -> oid.
- PGLog() : top(0), bottom(0) {}
+ /****/
+ PGLog() : top(0), bottom(0), backlog(false) {}
bool empty() const {
return top == 0;
void _encode(bufferlist& blist) const {
blist.append((char*)&top, sizeof(top));
blist.append((char*)&bottom, sizeof(bottom));
+ blist.append((char*)&backlog, sizeof(backlog));
::_encode(updated, blist);
::_encode(deleted, blist);
}
off += sizeof(top);
blist.copy(off, sizeof(bottom), (char*)&bottom);
off += sizeof(bottom);
+ blist.copy(off, sizeof(backlog), (char*)&backlog);
+ off += sizeof(backlog);
+
::_decode(updated, blist, off);
::_decode(deleted, blist, off);
top = when;
}
- void trim(version_t s);
+ void trim(ObjectStore::Transaction &t, version_t s);
void copy_after(const PGLog &other, version_t v);
- void merge(const PGLog &other, PGMissing &missing);
- void assimilate_summary(const PGSummary &sum, version_t last_complete);
ostream& print(ostream& out) const;
};
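A minimal sketch of the top/bottom/backlog bookkeeping documented above (purely illustrative; oidA, oidB and the version numbers are made up):

  PG::PGLog log;               // top=0, bottom=0, backlog=false
  log.add_update(oidA, 101);
  log.add_update(oidB, 103);
  log.add_delete(oidA, 104);   // top is now 104, bottom still 0
  // any store whose last_update >= 0 can be brought current by replaying (0,104].
  // a later log.trim(t, 103) raises bottom to 103 and drops the older entries,
  // so only peers with last_update >= 103 can still be caught up from this log.
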
class PGOndiskLog {
- off_t pos;
public:
- PGOndiskLog() : pos(0) {}
+ off_t bottom, top;
+ map<off_t,version_t> block_map; // block -> first stamp logged there
- off_t get_write_pos() {
- return pos;
- }
- void inc_write_pos(int i) {
- pos += i;
+ PGOndiskLog() : bottom(0), top(0) {}
+
+  bool trim_to(version_t v) {
+    // block_map is keyed by byte offset, so look up by value: find the
+    // last block whose first logged version is still <= v, drop every
+    // block_map entry before it, and raise bottom to its offset.
+    map<off_t,version_t>::iterator p = block_map.begin();
+    if (p == block_map.end() || p->second > v) return false;
+    map<off_t,version_t>::iterator n = p;
+    for (n++; n != block_map.end() && n->second <= v; n++) {
+      block_map.erase(p);
+      p = n;
+    }
+    if (p->first <= bottom) return false;   // nothing new to trim
+    bottom = p->first;
+    return true;
+  }
};
PGOndiskLog ondisklog;
PGMissing missing;
+
protected:
int role; // 0 = primary, 1 = replica, -1=none.
int state; // see bit defns above
public:
vector<int> acting;
epoch_t last_epoch_started_any;
+ version_t last_complete_commit;
protected:
// [primary only] content recovery state
+ version_t peers_complete_thru;
set<int> prior_set; // current+prior OSDs, as defined by last_epoch_started_any.
set<int> stray_set; // non-acting osds that have PG data.
set<int> clean_set; // current OSDs that are clean
set<int> peer_summary_requested;
friend class OSD;
+ map<__uint64_t, class OSDReplicaOp*> replica_ops;
+ map<int, set<__uint64_t> > replica_tids_by_osd; // osd -> (tid,...)
+
+
+ // [primary|replica]
+ // pg waiters
+ list<class Message*> waiting_for_active;
+ hash_map<object_t,
+ list<class Message*> > waiting_for_missing_object;
+
+ // recovery
+ version_t requested_thru;
+ map<object_t, version_t> objects_pulling; // which objects are currently being pulled
+
public:
void clear_primary_recovery_state() {
peer_info.clear();
peer_info_requested.clear();
peer_log_requested.clear();
clear_primary_recovery_state();
+ peers_complete_thru = 0;
}
public:
void build_prior();
void adjust_prior(); // based on new peer_info.last_epoch_started_any
- // pg waiters
- list<class Message*> waiting_for_active;
- hash_map<object_t,
- list<class Message*> > waiting_for_missing_object;
+ bool adjust_peers_complete_thru() {
+ version_t t = info.last_complete;
+ for (unsigned i=1; i<acting.size(); i++)
+      if (peer_info[acting[i]].last_complete < t)
+        t = peer_info[acting[i]].last_complete;
+ if (t > peers_complete_thru) {
+ peers_complete_thru = t;
+ return true;
+ }
+ return false;
+ }
+
+ void generate_backlog(PGLog& log); // generate summary backlog by scanning store.
+ void merge_log(const PGLog &olog);
- // recovery
- version_t requested_thru;
- map<object_t, version_t> objects_pulling; // which objects are currently being pulled
-
void peer(map< int, map<pg_t,version_t> >& query_map);
- void generate_summary(PGSummary &summary);
bool do_recovery();
+ void start_recovery() {
+ requested_thru = 0;
+ do_recovery();
+ }
void clean_up_local();
void clean_replicas();
osd(o),
info(p),
role(0),
- state(0)
+ state(0),
+ last_epoch_started_any(0),
+ last_complete_commit(0),
+ peers_complete_thru(0),
+ requested_thru(0)
{ }
pg_t get_pgid() const { return info.pgid; }
bool is_clean() const { return state_test(STATE_CLEAN); }
bool is_stray() const { return state_test(STATE_STRAY); }
+ bool is_empty() const { return info.last_complete == 0; }
+
int num_active_ops() const {
return objects_pulling.size();
}
-
-
- // pg state storage
- /*
- void store() {
- if (!osd->store->collection_exists(pgid))
- osd->store->create_collection(pgid);
- // ***
- }
- void fetch() {
- //osd->store->collection_getattr(pgid, "role", &role, sizeof(role));
- //osd->store->collection_getattr(pgid, "primary_since", &primary_since, sizeof(primary_since));
- //osd->store->collection_getattr(pgid, "state", &state, sizeof(state));
- }
- void list_objects(list<object_t>& ls) {
- osd->store->collection_list(pgid, ls);
- }*/
+ // pg on-disk state
+ void append_log(ObjectStore::Transaction& t, PG::PGLog::Entry& logentry, version_t trim_to);
+ void read_log(ObjectStore *store);
};
inline ostream& operator<<(ostream& out, const PG::PGLog& log)
{
- return out << "log(" << log.bottom << "," << log.top << "]";
+ out << "log(" << log.bottom << "," << log.top << "]";
+ if (log.backlog) out << "+backlog";
+ return out;
}
inline ostream& operator<<(ostream& out, const PG::PGMissing& missing)
--- /dev/null
+#!/usr/bin/perl
+
+use strict;
+my %op;
+
+my $line = 0;
+while (<>) {
+ #print "$line: $_";
+ $line++;
+
+ #osd3 do_op MOSDOp(client0.933 oid 100000000000008 0x84b4480) in pg[pginfo(4020000000d v 5662/0 e 2/1) r=0 active (0,5662]]
+ if (my ($from, $opno, $oid, $op) = /do_op MOSDOp\((\S+) op (\d+) oid (\d+) (\w+)\)/) {
+# print "$op\n";
+ if ($opno == 2 || $opno == 11 || $opno == 12 || $opno == 14 || $opno == 15) {
+ $op{$op} = $from;
+ }
+ }
+
+ # commits
+ #osd1 op_modify_commit on op MOSDOp(client1.289 oid 100000100000002 0x51a2f788)
+ if (my ($op) = /op_modify_commit.* (\w+)\)/) {
+ delete $op{$op};
+ }
+ #osd4 rep_modify_commit on op MOSDOp(osd3.289 oid 100000000000008 0x84b0980)
+ if (my ($op) = /rep_modify_commit.* (\w+)\)/) {
+ delete $op{$op};
+ }
+
+ # forwarded?
+ if (my ($op) = /sending (\w+) to osd/) {
+ delete $op{$op};
+ }
+
+}
+
+for my $op (keys %op) {
+ print "---- lost op $op $op{$op}\n";
+}
--- /dev/null
+#!/usr/bin/perl
+
+use strict;
+my %ack;
+my %commit;
+
+my $line = 0;
+while (<>) {
+ #print "$line: $_";
+ $line++;
+
+ #client1.objecter write tid 305 osd1 oid 100000100000002 922848~10000
+ if (my ($who, $tid) = /^(\S+)\.objecter write tid\D+(\d+)\D+osd/) {
+# print "$who.$tid\n";
+ $ack{"$who.$tid"} = $line;
+ $commit{"$who.$tid"} = $line;
+ }
+
+ #client1.objecter handle_osd_write_reply 304 commit 0
+ #client1.objecter handle_osd_write_reply 777 commit 1
+ if (my ($who, $tid, $commit) = /^(\S+)\.objecter handle_osd_write_reply\D+(\d+)\D+commit\D+(\d)/) {
+# print "$who.$tid\n";
+ delete $ack{"$who.$tid"};
+ delete $commit{"$who.$tid"} if $commit;
+ }
+
+}
+
+for my $op (keys %commit) {
+ print "---- lost commit $op $commit{$op}\n";
+}
+for my $op (keys %ack) {
+ print "---- lost ack $op $commit{$op}\n";
+}
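Both scripts read the aggregated debug logs from stdin (or as file arguments, via Perl's <> operator); the first wants the osd logs (it matches the do_op / *_commit lines), the second the client logs (it matches the objecter write / reply lines). Whatever is still outstanding at end of input -- an op that never committed, or a tid that never got its ack/commit reply -- is printed as lost.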
#include "mds/MDCluster.h"
#include "mds/MDS.h"
#include "osd/OSD.h"
+#include "mds/OSDMonitor.h"
#include "client/Client.h"
#include "client/SyntheticClient.h"
int started = 0;
//if (myrank == 0) g_conf.debug = 20;
+
+ // create mon
+ if (myrank == 0) {
+ OSDMonitor *mon = new OSDMonitor(0, new TCPMessenger(MSG_ADDR_MON(0)));
+ mon->init();
+ }
// create mds
MDS *mds[NUMMDS];