osd/FakeStore.o\
osd/Filer.o\
osd/OSDMap.o\
+ osd/RG.o\
osd/rush.o\
common/Logger.o\
common/Clock.o\
{
// which client am i?
whoami = MSG_ADDR_NUM(m->get_myaddr());
- cout << "i am client " << whoami << " " << MSG_ADDR_NICE(m->get_myaddr()) << endl;
-
mounted = false;
fake_clock: false,
fakemessenger_serialize: true,
+ fake_osdmap_expand: false,
debug: 1,
debug_mds_balancer: 1,
else if (strcmp(args[i], "--numosd") == 0)
g_conf.num_osd = atoi(args[++i]);
+ else if (strcmp(args[i], "--fake_osdmap_expand") == 0)
+ g_conf.fake_osdmap_expand = atoi(args[++i]);
+
else if (strcmp(args[i], "--debug") == 0)
g_conf.debug = atoi(args[++i]);
else if (strcmp(args[i], "--debug_mds_balancer") == 0)
bool fake_clock;
bool fakemessenger_serialize;
+ bool fake_osdmap_expand;
+
int debug;
int debug_mds_balancer;
int debug_mds_log;
bool mds_commit_on_shutdown;
bool mds_verify_export_dirauth; // debug flag
+
// osd
int osd_num_rg;
bool osd_fsync;
_buffer(other._buffer),
_len(other._len),
_off(other._off) {
- _buffer->_get();
+ if (_buffer) _buffer->_get();
}
// assignment operator
bufferptr& operator=(const bufferptr& other) {
- assert(0);
+ //assert(0);
// discard old
discard_buffer();
_buffer = other._buffer;
_len = other._len;
_off = other._off;
- _buffer->_get();
+ if (_buffer) _buffer->_get();
+ return *this;
}
~bufferptr() {
// -- inode --
+typedef __uint64_t version_t;
+
typedef __uint64_t inodeno_t; // ino
#define INODE_MODE_FILE 0100000 // S_IFREG
time_t ctime;
// hard (permissions)
- mode_t mode;
- uid_t uid;
- gid_t gid;
+ mode_t mode;
+ uid_t uid;
+ gid_t gid;
+ FileLayout layout;
// soft
__uint64_t size;
time_t atime, mtime; // maybe atime different? "lazy"?
+ int nlink;
// special stuff
- FileLayout layout;
- unsigned char hash_seed; // 0 if not hashed.
- int nlink;
- bool anchored;
- __uint64_t file_data_version;
+ unsigned char hash_seed; // only defined for dir; 0 if not hashed.
+ bool anchored; // auth only
+ version_t file_data_version; // auth only
};
#define RG_NONE 0xffffffffffffffffLL
+struct onode_t {
+ object_t oid;
+ repgroup_t rgid;
+ version_t version;
+ size_t size;
+ //time_t ctime, mtime;
+};
+
+
+
// client types
typedef int fh_t; // file handle
// HACK osd map change
- if (false) {
+ if (g_conf.fake_osdmap_expand) {
static int didit = 0;
if (whoami == 0 &&
elapsed.sec() > 10 && !didit &&
* oid - object id
* op - OSD_OP_DELETE, etc.
*
- * rg_role - who we want ... 0 == primary, this is what clients/mds will do.
- * rg_nrep - how many replicas we want ... just for writes currently?
- *
*/
+#define OSD_OP_MKFS 20
+
+// client ops
#define OSD_OP_READ 1
#define OSD_OP_WRITE 2
#define OSD_OP_STAT 10
#define OSD_OP_DELETE 11
#define OSD_OP_TRUNCATE 12
#define OSD_OP_ZERORANGE 13
-#define OSD_OP_MKFS 20
+
+// replication/recovery -- these ops are relative to a specific object version #
+#define OSD_OP_REP_PULL 30 // whole object read
+#define OSD_OP_REP_PUSH 31 // whole object write
+#define OSD_OP_REP_REMOVE 32 // delete replica
+#define OSD_OP_REP_WRITE 33 // replicated (partial object) write
#define OSD_OP_FLAG_TRUNCATE 1 // truncate object after end of write
object_t oid;
repgroup_t rg;
int rg_role;//, rg_nrep;
- __uint64_t map_version;
+ version_t map_version;
int op;
size_t length, offset;
+ version_t version;
size_t _data_len;
} MOSDOp_st;
friend class MOSDOpReply;
public:
- long get_tid() { return st.tid; }
+ long get_tid() { return st.tid; }
msg_addr_t get_asker() { return st.asker; }
- object_t get_oid() { return st.oid; }
+ object_t get_oid() { return st.oid; }
repgroup_t get_rg() { return st.rg; }
+ version_t get_map_version() { return st.map_version; }
+
int get_rg_role() { return st.rg_role; } // who am i asking for?
- //int get_rg_nrep() { return st.rg_nrep; }
- __uint64_t get_map_version() { return st.map_version; }
+ version_t get_version() { return st.version; }
- int get_op() { return st.op; }
+ int get_op() { return st.op; }
size_t get_length() { return st.length; }
size_t get_offset() { return st.offset; }
long get_pcid() { return st.pcid; }
MOSDOp(long tid, msg_addr_t asker,
- object_t oid, repgroup_t rg, __uint64_t mapversion, int op) :
+ object_t oid, repgroup_t rg, version_t mapversion, int op) :
Message(MSG_OSD_OP) {
memset(&st, 0, sizeof(st));
this->st.tid = tid;
void set_length(size_t l) { st.length = l; }
void set_offset(size_t o) { st.offset = o; }
-
+ void set_version(version_t v) { st.version = v; }
// marshalling
virtual void decode_payload() {
int result;
size_t length, offset;
size_t object_size;
+ version_t version;
__uint64_t _new_map_version;
size_t _data_len, _oc_len;
size_t get_length() { return st.length; }
size_t get_offset() { return st.offset; }
size_t get_object_size() { return st.object_size; }
+ version_t get_version() { return st.version; }
void set_result(int r) { st.result = r; }
void set_length(size_t s) { st.length = s; }
this->st.length = req->st.length; // speculative... OSD should ensure these are correct
this->st.offset = req->st.offset;
+ this->st.version = req->st.version;
// attach updated cluster spec?
if (oc &&
#undef dout
#define dout(l) if (l<=g_conf.debug) cout << "osd" << whoami << ".fakestore "
+#include "include/bufferlist.h"
+
+#include <map>
+#include <ext/hash_map>
+using namespace __gnu_cxx;
// crap-a-crap hash
#define HASH_DIRS 128LL
// end crap hash
+map<int, hash_map<object_t, map<const char*, bufferptr> > > fakeattrs;
+
FakeStore::FakeStore(char *base, int whoami)
{
int FakeStore::setattr(object_t oid, const char *name,
void *value, size_t size)
{
- string fn;
- get_oname(oid, fn);
- return setxattr(fn.c_str(), name, value, size, 0);
+ if (1) {
+ bufferptr bp(new buffer((char*)value,size));
+ fakeattrs[whoami][oid][name] = bp;
+ return 0;
+ } else {
+ string fn;
+ get_oname(oid, fn);
+ int r = setxattr(fn.c_str(), name, value, size, 0);
+ if (r == -1)
+ cerr << " errno is " << errno << " " << strerror(errno) << endl;
+ assert(r == 0);
+ return r;
+ }
}
int FakeStore::getattr(object_t oid, const char *name,
void *value, size_t size)
{
- string fn;
- get_oname(oid, fn);
- return getxattr(fn.c_str(), name, value, size);
+ if (1) {
+ if (fakeattrs[whoami][oid].count(name)) {
+ size_t l = fakeattrs[whoami][oid][name].length();
+ if (l > size) l = size;
+ bufferlist bl;
+ bl.append(fakeattrs[whoami][oid][name]);
+ bl.copy(0, l, (char*)value);
+ return l;
+ } else {
+ return -1;
+ }
+ } else {
+ string fn;
+ get_oname(oid, fn);
+ int r = getxattr(fn.c_str(), name, value, size);
+ // assert(r == 0);
+ return r;
+ }
}
int FakeStore::listattr(object_t oid, char *attrs, size_t size)
last_tid = 0;
+ max_recovery_ops = 5;
+
+ pending_ops = 0;
+ waiting_for_no_ops = false;
+
+
// use fake store
#ifdef USE_OBFS
store = new OBFSStore(whoami, NULL, "/dev/sdb3");
+// object locks
+void OSD::lock_object(object_t oid)
+{
+ osd_lock.Lock();
+ if (object_lock.count(oid)) {
+ Cond c;
+ dout(7) << "lock_object " << hex << oid << dec << " waiting as " << &c << endl;
+ object_lock_waiters[oid].push_back(&c);
+ c.Wait(osd_lock);
+ assert(object_lock.count(oid));
+ } else {
+ dout(7) << "lock_object " << hex << oid << dec << endl;
+ object_lock.insert(oid);
+ }
+ osd_lock.Unlock();
+}
+void OSD::unlock_object(object_t oid)
+{
+ osd_lock.Lock();
+ assert(object_lock.count(oid));
+ if (object_lock_waiters.count(oid)) {
+ // someone is in line
+ list<Cond*>& ls = object_lock_waiters[oid];
+ Cond *c = ls.front();
+ dout(7) << "unlock_object " << hex << oid << dec << " waking up next guy " << c << endl;
+ ls.pop_front();
+ if (ls.empty())
+ object_lock_waiters.erase(oid);
+ c->Signal();
+ } else {
+ // nobody waiting
+ dout(7) << "unlock_object " << hex << oid << dec << endl;
+ object_lock.erase(oid);
+ }
+ osd_lock.Unlock();
+}
// --------------------------------------
dout(1) << " got unknown message " << m->get_type() << endl;
assert(0);
}
+
+ if (!finished.empty()) {
+ list<Message*> waiting;
+ waiting.splice(waiting.begin(), finished);
+ for (list<Message*>::iterator it = waiting.begin();
+ it != waiting.end();
+ it++) {
+ dispatch(*it);
+ }
+ }
+
}
void OSD::handle_op_reply(MOSDOpReply *m)
{
- replica_write_lock.Lock();
- MOSDOp *op = replica_writes[m->get_tid()];
- dout(7) << "got replica write ack tid " << m->get_tid() << " orig op " << op << endl;
+ switch (m->get_op()) {
+ case OSD_OP_REP_PULL:
+ op_rep_pull_reply(m);
+ break;
+ case OSD_OP_REP_PUSH:
+ op_rep_push_reply(m);
+ break;
+ case OSD_OP_REP_REMOVE:
+ op_rep_remove_reply(m);
+ break;
- replica_write_tids[op].erase(m->get_tid());
- if (replica_write_tids[op].empty())
- replica_write_cond[op]->Signal();
+ case OSD_OP_REP_WRITE:
+ { // oldcrap
+ /*
+ replica_write_lock.Lock();
+ MOSDOp *op = replica_writes[m->get_tid()];
+ dout(7) << "got replica write ack tid " << m->get_tid() << " orig op " << op << endl;
+
+ replica_write_tids[op].erase(m->get_tid());
+ if (replica_write_tids[op].empty())
+ replica_write_cond[op]->Signal();
+
+ replica_write_lock.Unlock();
+ */
+ }
+ break;
- replica_write_lock.Unlock();
+ default:
+ assert(0);
+ }
}
void OSD::handle_osd_map(MOSDMap *m)
{
- // SAB
- osd_lock.Lock();
+ // wait for ops to finish
+ wait_for_no_ops();
+
+ osd_lock.Lock(); // actually, don't need this if we finish all ops?
if (!osdmap ||
m->get_version() > osdmap->get_version()) {
delete m;
// process waiters
- list<Message*> waiting;
- waiting.splice(waiting.begin(), waiting_for_osdmap);
+ take_waiters(waiting_for_osdmap);
- osd_lock.Unlock();
-
- for (list<Message*>::iterator it = waiting.begin();
- it != waiting.end();
- it++) {
- dispatch(*it);
- }
} else {
dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl;
- osd_lock.Unlock();
}
+
+ osd_lock.Unlock();
}
}
-bool OSD::rg_exists(repgroup_t rg)
+bool OSD::rg_exists(repgroup_t rgid)
{
struct stat st;
- if (store->collection_stat(rg, &st) == 0)
+ if (store->collection_stat(rgid, &st) == 0)
return true;
else
return false;
}
-RG *OSD::open_rg(repgroup_t rg)
+RG *OSD::open_rg(repgroup_t rgid)
{
// already open?
- if (rg_map.count(rg))
- return rg_map[rg];
-
- // stat collection
- RG *r = new RG(rg);
- if (rg_exists(rg)) {
- // exists
- r->fetch(store);
- } else {
- // dne
- r->store(store);
- }
- rg_map[rg] = r;
+ if (rg_map.count(rgid))
+ return rg_map[rgid];
- return r;
+ // exists?
+ if (!rg_exists(rgid))
+ return 0;
+
+ // open, stat collection
+ RG *rg = new RG(whoami, rgid);
+ rg->fetch(store);
+ rg_map[rgid] = rg;
+
+ return rg;
}
+RG *OSD::new_rg(repgroup_t rgid)
+{
+ assert(rg_map.count(rgid) == 0);
+ assert(!rg_exists(rgid));
+
+ RG *rg = new RG(whoami, rgid);
+ rg->store(store);
+ rg_map[rgid] = rg;
+ return rg;
+}
int nrep = osdmap->repgroup_to_acting_osds(rgid, acting);
assert(nrep > 0);
int primary = acting[0];
- int role = -1;
+ int role = -1; // -1, 0, 1
for (int i=0; i<nrep; i++)
- if (acting[i] == whoami) role = i;
+ if (acting[i] == whoami) role = i>0 ? 1:0;
-
if (role != rg->get_role()) {
// role change.
- dout(10) << " rg " << rgid << " acting role change " << rg->get_role() << " -> " << role << endl;
+ dout(10) << " rg " << hex << rgid << dec << " acting role change " << rg->get_role() << " -> " << role << endl;
// am i old-primary?
if (rg->get_role() == 0) {
for (map<int, RGPeer*>::iterator it = rg->get_peers().begin();
it != rg->get_peers().end();
it++) {
- dout(10) << " rg " << rgid << " old-primary, dropping old peer " << it->first << endl;
+ dout(10) << " rg " << hex << rgid << dec << " old-primary, dropping old peer " << it->first << endl;
rg->get_old_replica_set().insert(it->first);
delete it->second;
}
// did primary change?
if (primary != rg->get_primary()) {
- dout(10) << " rg " << rgid << " acting primary change " << rg->get_primary() << " -> " << primary << endl;
+ dout(10) << " rg " << hex << rgid << dec << " acting primary change " << rg->get_primary() << " -> " << primary << endl;
// re-peer
rg->state_clear(RG_STATE_PEERED);
// check replicas
for (int r=1; r<nrep; r++) {
if (rg->get_peer(r) == 0) {
- dout(10) << " rg " << rgid << " primary needs to peer with replica " << r << " osd" << acting[r] << endl;
+ dout(10) << " rg " << hex << rgid << dec << " primary needs to peer with replica " << r << " osd" << acting[r] << endl;
start_map[acting[r]][rg] = r;
}
}
}
+
/** peer_notify
* Send an MOSDRGNotify to a primary, with a list of RGs that I have
* content for, and they are primary for.
assert(nrep > 0);
assert(acting[0] == whoami);
- // get/open RG
+ // open RG?
RG *rg = open_rg(rgid);
// previously unknown RG?
- if (rg->get_peers().empty()) {
- dout(10) << " rg " << rgid << " is new" << endl;
+ if (!rg) {
+ rg = new_rg(rgid);
+ dout(10) << " rg " << hex << rgid << dec << " is new, nrep=" << nrep << endl;
for (int r=1; r<nrep; r++) {
- if (rg->get_peer(r) == 0) {
- dout(10) << " rg " << rgid << " primary needs to peer with replica " << r << " osd" << acting[r] << endl;
- start_map[acting[r]][rg] = r;
- }
+ dout(10) << " rg " << hex << rgid << dec << " primary needs to peer with replica " << r << " osd" << acting[r] << endl;
+ start_map[acting[r]][rg] = r;
}
}
// peered with this guy specifically?
RGPeer *rgp = rg->get_peer(from);
if (!rgp && start_map[from].count(rg) == 0) {
- dout(7) << " rg " << rgid << " primary needs to peer with residual notifier osd" << from << endl;
+ dout(7) << " rg " << hex << rgid << dec << " primary needs to peer with residual notifier osd" << from << endl;
start_map[from][rg] = -1;
}
}
it++) {
repgroup_t rgid = *it;
+ // open RG
+ RG *rg = open_rg(rgid);
+
// dne?
- if (!rg_exists(rgid)) {
- dout(10) << " rg " << rgid << " dne" << endl;
- ack->rg_dne.push_back(rgid);
- continue;
+ if (!rg) {
+ // get active rush mapping
+ vector<int> acting;
+ int nrep = osdmap->repgroup_to_acting_osds(rgid, acting);
+ assert(nrep > 0);
+ int role = -1;
+ for (int i=0; i<nrep; i++)
+ if (acting[i] == whoami) role = i>0 ? 1:0;
+ assert(role != 0);
+
+ if (role < 0) {
+ dout(10) << " rg " << hex << rgid << dec << " dne, and i am not an active replica" << endl;
+ ack->rg_dne.push_back(rgid);
+ continue;
+ }
+
+ dout(10) << " rg " << hex << rgid << dec << " dne (yet), but i am new active replica " << role << endl;
+ rg = new_rg(rgid);
}
- // get/open RG
- RG *rg = open_rg(rgid);
-
// report back state and rg content
ack->rg_state[rgid].state = rg->get_state();
ack->rg_state[rgid].deleted = rg->get_deleted_objects();
// list objects
- list<object_t> olist;
- rg->list_objects(store,olist);
-
- dout(10) << " rg " << rgid << " has state " << rg->get_state() << ", " << olist.size() << " objects" << endl;
-
- for (list<object_t>::iterator it = olist.begin();
- it != olist.end();
- it++) {
- version_t v = 0;
- store->getattr(*it,
- "version",
- &v, sizeof(v));
- ack->rg_state[rgid].objects[*it] = v;
- }
+ rg->scan_local_objects(store);
+ ack->rg_state[rgid].objects = rg->local_objects;
+
+ dout(10) << " rg " << hex << rgid << dec << " has state " << rg->get_state() << ", " << ack->rg_state[rgid].objects.size() << " objects" << endl;
}
// reply
for (list<repgroup_t>::iterator it = m->rg_dne.begin();
it != m->rg_dne.end();
it++) {
- dout(10) << " rg " << *it << " dne on osd" << from << endl;
+ dout(10) << " rg " << hex << *it << dec << " dne on osd" << from << endl;
RG *rg = open_rg(*it);
assert(rg);
for (map<repgroup_t, RGReplicaInfo>::iterator it = m->rg_state.begin();
it != m->rg_state.end();
it++) {
- dout(10) << " rg " << it->first << " got state " << it->second.state
+ dout(10) << " rg " << hex << it->first << dec << " got state " << it->second.state
<< " " << it->second.objects.size() << " objects, "
<< it->second.deleted.size() << " deleted" << endl;
assert(rgp);
rgp->peer_state = it->second;
+ rgp->state_set(RG_PEER_STATE_ACTIVE);
+
+ // fully peered?
+ bool fully = true;
+ for (map<int, RGPeer*>::iterator pit = rg->get_peers().begin();
+ pit != rg->get_peers().end();
+ pit++) {
+ if (!pit->second->is_active()) fully = false;
+ }
+
+ if (fully) {
+ dout(10) << " rg " << hex << it->first << dec << " fully peered, analyzing" << endl;
+ rg->mark_peered();
+ rg->analyze_peers(store);
+
+ do_recovery(rg);
+ }
}
// done
+// RECOVERY
+
+void OSD::do_recovery(RG *rg)
+{
+ // pull?
+ if (!rg->is_complete()) {
+ rg_pull(rg, max_recovery_ops);
+ } else {
+ if (!rg->is_clean()) {
+ rg_push(rg, max_recovery_ops);
+ rg_clean(rg, max_recovery_ops);
+ }
+ }
+}
+
+
+// pull
+
+void OSD::rg_pull(RG *rg, int maxops)
+{
+ int ops = rg->num_active_ops();
+
+ dout(7) << "rg_pull rg " << hex << rg->get_rgid() << dec << " " << ops << "/" << maxops << " active ops" << endl;
+
+ while (ops < maxops) {
+ object_t oid;
+ version_t v;
+ int peer;
+ if (!rg->pull_plan.get_next(oid, v, peer)) break;
+ RGPeer *rgp = rg->get_proxy_peer(oid);
+ if (rgp == 0) {
+ dout(7) << " apparently already pulled " << hex << oid << dec << endl;
+ continue;
+ }
+ if (rgp->is_pulling(oid)) {
+ dout(7) << " already pulling " << hex << oid << dec << endl;
+ continue;
+ }
+ pull_replica(oid, v, rgp);
+ ops++;
+ }
+}
+
+void OSD::pull_replica(object_t oid, version_t v, RGPeer *p)
+{
+ dout(7) << "pull_replica " << hex << oid << dec << " v " << v << " from osd" << p->get_peer() << endl;
+
+ // add to fetching list
+ p->pull(oid, v);
+
+ // send op
+ __uint64_t tid = ++last_tid;
+ MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(),
+ oid, p->rg->get_rgid(),
+ osdmap->get_version(),
+ OSD_OP_REP_PULL);
+ op->set_version(v);
+ op->set_rg_role(-1); // whatever, not 0
+ messenger->send_message(op, MSG_ADDR_OSD(p->get_peer()));
+
+ // register
+ pull_ops[tid] = p;
+}
+
+void OSD::op_rep_pull(MOSDOp *op)
+{
+ dout(7) << "rep_pull on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
+ lock_object(op->get_oid());
+
+ // get object size
+ struct stat st;
+ int r = store->stat(op->get_oid(), &st);
+ assert(r == 0);
+
+ // check version
+ version_t v = 0;
+ store->getattr(op->get_oid(), "version", &v, sizeof(v));
+ assert(v == op->get_version());
+
+ // read
+ bufferptr bptr = new buffer(st.st_size); // prealloc space for entire read
+ long got = store->read(op->get_oid(),
+ st.st_size, 0,
+ bptr.c_str());
+ assert(got == st.st_size);
+
+ // reply
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap);
+ bptr.set_length(got); // properly size the buffer
+ bufferlist bl;
+ bl.push_back( bptr );
+ reply->set_result(0);
+ reply->set_data(bl);
+ reply->set_length(got);
+ reply->set_offset(0);
+
+ messenger->send_message(reply, op->get_asker());
+
+ unlock_object(op->get_oid());
+ delete op;
+}
+
+void OSD::op_rep_pull_reply(MOSDOpReply *op)
+{
+ dout(7) << "op_rep_pull_reply " << hex << op->get_oid() << dec << " size " << op->get_length() << endl;
+
+ osd_lock.Lock();
+ RGPeer *p = pull_ops[op->get_tid()];
+ RG *rg = p->rg;
+ assert(p); // FIXME: how will this work?
+ assert(p->is_pulling(op->get_oid()));
+ assert(p->pulling_version(op->get_oid()) == op->get_version());
+ osd_lock.Unlock();
+
+ // write it and add it to the RG
+ store->write(op->get_oid(), op->get_length(), 0, op->get_data().c_str());
+ p->rg->add_object(store, op->get_oid());
+
+ // close out pull op.
+ osd_lock.Lock();
+ pull_ops.erase(op->get_tid());
+ rg->pulled(op->get_oid(), op->get_version(), p);
+
+ // finish waiters
+ if (waiting_for_object.count(op->get_oid()))
+ take_waiters(waiting_for_object[op->get_oid()]);
+
+ // more?
+ do_recovery(rg);
+
+ osd_lock.Unlock();
+
+ delete op;
+}
+
+
+// push
+
+void OSD::rg_push(RG *rg, int maxops)
+{
+ int ops = rg->num_active_ops();
+
+ dout(7) << "rg_push rg " << hex << rg->get_rgid() << dec << " " << ops << "/" << maxops << " active ops" << endl;
+
+ while (ops < maxops) {
+ object_t oid;
+ version_t v;
+ int peer;
+ if (!rg->push_plan.get_next(oid, v, peer)) break;
+
+ RGPeer *p = rg->get_peer(peer);
+ assert(p);
+ push_replica(oid, v, p);
+ ops++;
+ }
+
+}
+
+void OSD::push_replica(object_t oid, version_t v, RGPeer *p)
+{
+ dout(7) << "push_replica " << hex << oid << dec << " v " << v << " to osd" << p->get_peer() << endl;
+
+ // add to list
+ p->push(oid, v);
+
+ // send op
+ __uint64_t tid = ++last_tid;
+ MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(),
+ oid, p->rg->get_rgid(),
+ osdmap->get_version(),
+ OSD_OP_REP_PUSH);
+ op->set_version(v);
+ op->set_rg_role(-1); // whatever, not 0
+
+ // include object content
+ struct stat st;
+ store->stat(oid, &st);
+ bufferptr b = new buffer(st.st_size);
+ store->read(oid, st.st_size, 0, b.c_str());
+ op->get_data().append(b);
+ op->set_length(st.st_size);
+ op->set_offset(0);
+
+ messenger->send_message(op, MSG_ADDR_OSD(p->get_peer()));
+
+ // register
+ push_ops[tid] = p;
+}
+
+void OSD::op_rep_push(MOSDOp *op)
+{
+ dout(7) << "rep_push on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
+ lock_object(op->get_oid());
+
+ // exists?
+ if (store->exists(op->get_oid())) {
+ store->truncate(op->get_oid(), 0);
+
+ version_t ov = 0;
+ store->getattr(op->get_oid(), "version", &ov, sizeof(ov));
+ assert(ov <= op->get_version());
+ }
+
+ // write out buffers
+ bufferlist bl;
+ bl.claim( op->get_data() );
+
+ off_t off = 0;
+ for (list<bufferptr>::iterator it = bl.buffers().begin();
+ it != bl.buffers().end();
+ it++) {
+ int r = store->write(op->get_oid(),
+ (*it).length(), off,
+ (*it).c_str(),
+ false); // write async, no rush
+ assert((unsigned)r == (*it).length());
+ off += (*it).length();
+ }
+
+ // set version
+ version_t v = op->get_version();
+ store->setattr(op->get_oid(), "version", &v, sizeof(v));
+
+ // reply
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap);
+ messenger->send_message(reply, op->get_asker());
+
+ unlock_object(op->get_oid());
+ delete op;
+}
+
+void OSD::op_rep_push_reply(MOSDOpReply *op)
+{
+ dout(7) << "op_rep_push_reply " << hex << op->get_oid() << dec << endl;
+
+ osd_lock.Lock();
+ RGPeer *p = push_ops[op->get_tid()];
+ RG *rg = p->rg;
+ assert(p); // FIXME: how will this work?
+ assert(p->is_pushing(op->get_oid()));
+ assert(p->pushing_version(op->get_oid()) == op->get_version());
+
+ // close out push op.
+ push_ops.erase(op->get_tid());
+ rg->pushed(op->get_oid(), op->get_version(), p);
+
+ // more?
+ do_recovery(rg);
+
+ osd_lock.Unlock();
+
+ delete op;
+}
+
+
+// clean
+
+void OSD::rg_clean(RG *rg, int maxops)
+{
+ int ops = rg->num_active_ops();
+
+ dout(7) << "rg_clean rg " << hex << rg->get_rgid() << dec << " " << ops << "/" << maxops << " active ops" << endl;
+
+ while (ops < maxops) {
+ object_t oid;
+ version_t v;
+ int peer;
+ if (!rg->clean_plan.get_next(oid, v, peer)) break;
+
+ RGPeer *p = rg->get_peer(peer);
+ assert(p);
+ remove_replica(oid, v, p);
+ ops++;
+ }
+}
+
+void OSD::remove_replica(object_t oid, version_t v, RGPeer *p)
+{
+ dout(7) << "remove_replica " << hex << oid << dec << " v " << v << " from osd" << p->get_peer() << endl;
+
+ p->remove(oid, v);
+
+ // send op
+ __uint64_t tid = ++last_tid;
+ MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(),
+ oid, p->rg->get_rgid(),
+ osdmap->get_version(),
+ OSD_OP_REP_REMOVE);
+ op->set_version(v);
+ op->set_rg_role(-1); // whatever, not 0
+ messenger->send_message(op, MSG_ADDR_OSD(p->get_peer()));
+
+ // register
+ remove_ops[tid] = p;
+}
+
+void OSD::op_rep_remove(MOSDOp *op)
+{
+ dout(7) << "rep_remove on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
+ lock_object(op->get_oid());
+
+ // sanity checks
+ assert(store->exists(op->get_oid()));
+
+ version_t v = 0;
+ store->getattr(op->get_oid(), "version", &v, sizeof(v));
+ assert(v == op->get_version());
+
+ // remove
+ int r = store->remove(op->get_oid());
+ assert(r == 0);
+
+ // reply
+ messenger->send_message(new MOSDOpReply(op, r, osdmap),
+ op->get_asker());
+
+ unlock_object(op->get_oid());
+ delete op;
+}
+
+void OSD::op_rep_remove_reply(MOSDOpReply *op)
+{
+ dout(7) << "op_rep_remove_reply " << hex << op->get_oid() << dec << endl;
+
+ osd_lock.Lock();
+ RGPeer *p = remove_ops[op->get_tid()];
+ RG *rg = p->rg;
+ assert(p); // FIXME: how will this work?
+ assert(p->is_removing(op->get_oid()));
+ assert(p->removing_version(op->get_oid()) == op->get_version());
+
+ // close out push op.
+ remove_ops.erase(op->get_tid());
+ rg->removed(op->get_oid(), op->get_version(), p);
+
+ // more?
+ do_recovery(rg);
+
+ osd_lock.Unlock();
+
+ delete op;
+}
if (op->get_map_version() < osdmap->get_version()) {
// op's is old
dout(7) << "op map " << op->get_map_version() << " < " << osdmap->get_version() << endl;
+
+ if (op->get_rg_role() != 0) {
+ dout(7) << " dropping rep op with old map" << endl;
+ delete op;
+ return;
+ }
}
// did this op go to the right OSD?
if (op->get_rg_role() == 0) {
- repgroup_t rg = op->get_rg();
- int acting_primary = osdmap->get_rg_acting_primary( rg );
+ repgroup_t rgid = op->get_rg();
+ int acting_primary = osdmap->get_rg_acting_primary( rgid );
if (acting_primary != whoami) {
dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
logger->inc("fwd");
return;
}
+
+ // proxy?
+ RG *rg = open_rg(rgid);
+ if (!rg) {
+ // fail now?
+ dout(7) << "hit non-existent rg " << hex << op->get_rg() << dec << ", creating willy nilly for now" << endl;
+ rg = new_rg(rgid); // for now.. FIXME
+ }
+ else {
+ if (!rg->is_complete()) {
+ // consult RG object map
+ RGPeer *rgp = rg->get_proxy_peer(op->get_oid());
+ version_t v = rg->get_proxy_version(op->get_oid());
+
+ if (op->get_op() == OSD_OP_WRITE && v == 0) {
+ // totally new object.
+ }
+ else if (rgp) {
+ // need to pull
+ dout(7) << "need to pull object " << hex << op->get_oid() << dec << endl;
+ RGPeer *rgp = rg->get_proxy_peer(op->get_oid());
+ if (!rgp->is_pulling(op->get_oid())) {
+ pull_replica(op->get_oid(), v, rgp);
+ }
+ waiting_for_object[op->get_oid()].push_back(op);
+ return;
+ }
+ }
+
+ }
}
// queue op
}
void OSD::queue_op(MOSDOp *op) {
+ // inc pending count
+ osd_lock.Lock();
+ pending_ops++;
+ osd_lock.Unlock();
+
threadpool->put_op(op);
}
// do the op
switch (op->get_op()) {
+ case OSD_OP_MKFS:
+ op_mkfs(op);
+ break;
+
case OSD_OP_READ:
op_read(op);
break;
-
case OSD_OP_WRITE:
op_write(op);
break;
-
- case OSD_OP_MKFS:
- op_mkfs(op);
- break;
-
case OSD_OP_DELETE:
op_delete(op);
break;
-
case OSD_OP_TRUNCATE:
op_truncate(op);
break;
-
case OSD_OP_STAT:
op_stat(op);
break;
+
+ // replication/recovery
+ case OSD_OP_REP_PULL:
+ op_rep_pull(op);
+ break;
+ case OSD_OP_REP_PUSH:
+ op_rep_push(op);
+ break;
+ case OSD_OP_REP_REMOVE:
+ op_rep_remove(op);
+ break;
default:
assert(0);
}
+
+ // finish
+ osd_lock.Lock();
+ assert(pending_ops > 0);
+ pending_ops--;
+ if (pending_ops == 0 && waiting_for_no_ops)
+ no_pending_ops.Signal();
+ osd_lock.Unlock();
}
+void OSD::wait_for_no_ops()
+{
+ osd_lock.Lock();
+ if (pending_ops > 0) {
+ dout(7) << "wait_for_no_ops - waiting for " << pending_ops << endl;
+ waiting_for_no_ops = true;
+ no_pending_ops.Wait(osd_lock);
+ waiting_for_no_ops = false;
+ assert(pending_ops == 0);
+ }
+ dout(7) << "wait_for_no_ops - none" << endl;
+ osd_lock.Unlock();
+}
void OSD::op_read(MOSDOp *r)
{
reply->set_length(0);
}
- dout(10) << "read got " << got << " / " << r->get_length() << " bytes from " << r->get_oid() << endl;
+ dout(12) << "read got " << got << " / " << r->get_length() << " bytes from obj " << hex << r->get_oid() << dec << endl;
logger->inc("rd");
if (got >= 0) logger->inc("rdb", got);
write_sync); // write synchronously
off += (*it).length();
if (r < 0) {
- dout(1) << "write error on " << op->get_oid() << " len " << (*it).length() << " off " << off << " r = " << r << endl;
+ dout(1) << "write error on " << hex << op->get_oid() << dec << " len " << (*it).length() << " off " << off << " r = " << r << endl;
assert(r >= 0);
}
}
// update object metadata
- if (!existed) {
+ osd_lock.Lock();
+ version_t v = 1;
+ if (op->get_rg_role() == -1) {
+ v = op->get_version();
+ store->setattr(op->get_oid(), "version", &v, sizeof(v));
+ } else if (existed) {
+ // get + inc version
+ store->getattr(op->get_oid(), "version", &v, sizeof(v));
+ v++;
+ } else {
// add to RG collection
- osd_lock.Lock();
RG *r = open_rg(op->get_rg());
r->add_object(store, op->get_oid());
- osd_lock.Unlock();
}
+ store->setattr(op->get_oid(), "version", &v, sizeof(v));
+ osd_lock.Unlock();
logger->inc("wr");
logger->inc("wrb", op->get_length());
void OSD::op_delete(MOSDOp *op)
{
int r = store->remove(op->get_oid());
- dout(3) << "delete on " << op->get_oid() << " r = " << r << endl;
+ dout(12) << "delete on " << hex << op->get_oid() << dec << " r = " << r << endl;
// "ack"
messenger->send_message(new MOSDOpReply(op, r, osdmap), op->get_asker());
void OSD::op_truncate(MOSDOp *op)
{
int r = store->truncate(op->get_oid(), op->get_offset());
- dout(3) << "truncate on " << op->get_oid() << " at " << op->get_offset() << " r = " << r << endl;
+ dout(3) << "truncate on " << hex << op->get_oid() << dec << " at " << op->get_offset() << " r = " << r << endl;
// "ack"
messenger->send_message(new MOSDOpReply(op, r, osdmap), op->get_asker());
memset(&st, sizeof(st), 0);
int r = store->stat(op->get_oid(), &st);
- dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl;
+ dout(3) << "stat on " << hex << op->get_oid() << dec << " r = " << r << " size = " << st.st_size << endl;
MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap);
reply->set_object_size(st.st_size);
#include "ObjectStore.h"
+#include "RG.h"
+
#include <map>
using namespace std;
#include <ext/hash_map>
+#include <ext/hash_set>
using namespace __gnu_cxx;
class Messenger;
class Message;
-typedef __uint64_t version_t;
-
-
-struct RGReplicaInfo {
- int state;
- map<object_t,version_t> objects; // remote object list
- map<object_t,version_t> deleted; // remote delete list
-
- void _encode(bufferlist& blist) {
- blist.append((char*)&state, sizeof(state));
- ::_encode(objects, blist);
- ::_encode(deleted, blist);
- }
- void _decode(bufferlist& blist, int& off) {
- blist.copy(off, sizeof(state), (char*)&state);
- off += sizeof(state);
- ::_decode(objects, blist, off);
- ::_decode(deleted, blist, off);
- }
-
- RGReplicaInfo() : state(0) { }
-};
-
-
-/** RGPeer
- * state associated with non-primary OSDS with RG content.
- * only used by primary.
- */
-
-// by primary
-#define RG_PEER_STATE_ACTIVE 1 // peer has acked our request, sent back RG state.
-#define RG_PEER_STATE_COMPLETE 2 // peer has everything replicated
-
-class RGPeer {
- private:
- int peer;
- int role; // 0 primary, 1+ replica, -1 residual
- int state;
-
- // peer state
- public:
- RGReplicaInfo peer_state;
-
- protected:
- // active|residual: used by primary for syncing (old) replicas
- map<object_t,version_t> fetching; // objects i'm reading from replica
- map<object_t,version_t> stray; // objects that need to be deleted
-
- // active: used by primary for normal replication stuff
- map<object_t,version_t> writing; // objects i've written to replica
- map<object_t,version_t> flushing; // objects i've written to remote buffer cache only
-
- public:
- RGPeer(int p, int r) : peer(p), role(r), state(0) { }
-
- int get_role() { return role; }
- int get_peer() { return peer; }
- bool state_test(int m) { return state & m != 0; }
- void state_set(int m) { state |= m; }
- void state_clear(int m) { state &= ~m; }
-
- bool is_active() { return state_test(RG_PEER_STATE_ACTIVE); }
- bool is_complete() { return state_test(RG_PEER_STATE_COMPLETE); }
-
- bool is_residual() { return role < 0; }
- bool is_empty() { return is_active() && peer_state.objects.empty(); } // *** && peer_state & COMPLETE
-};
-
-
-
-
-
-
-/** RG - Replica Group
- *
- */
-// bits used on any
-#define RG_STATE_COMPLETE 1 // i have full RG contents locally.
-#define RG_STATE_PEERED 2 // i have contacted prior primary and all
- // replica osds and/or fetched their
- // content lists, and thus know what's up.
- // or, i have check in w/ new primary (on replica)
-
-// on primary or old-primary only
-#define RG_STATE_CLEAN 4 // i am fully replicated
-
-class RG {
- protected:
- repgroup_t rgid;
- int role; // 0 = primary, 1 = secondary, etc. -1=undef/none.
- int state; // see bit defns above
-
- int primary; // replica: who the primary is (if not me)
- set<int> old_replica_set; // old primary: where replicas used to be
-
- map<int, RGPeer*> peers; // primary: (soft state) active peers
-
- // for unstable states,
- map<object_t, version_t> deleted_objects; // locally deleted objects
-
- public:
- RG(repgroup_t r) : rgid(r),
- role(0),
- state(0),
- primary(-1) { }
-
- repgroup_t get_rgid() { return rgid; }
- int get_role() { return role; }
- int get_primary() { return primary; }
-
- void set_role(int r) { role = r; }
- void set_primary(int p) { primary = p; }
-
- map<int, RGPeer*>& get_peers() { return peers; }
- RGPeer* get_peer(int p) {
- if (peers.count(p)) return peers[p];
- return 0;
- }
- RGPeer* new_peer(int p, int r) {
- return peers[p] = new RGPeer(p, r);
- }
- void remove_peer(int p) {
- assert(peers.count(p));
- delete peers[p];
- peers.erase(p);
- }
-
- set<int>& get_old_replica_set() { return old_replica_set; }
- map<object_t, version_t>& get_deleted_objects() { return deleted_objects; }
-
-
- int get_state() { return state; }
- bool state_test(int m) { return (state & m) != 0; }
- void set_state(int s) { state = s; }
- void state_set(int m) { state |= m; }
- void state_clear(int m) { state &= ~m; }
-
- void store(ObjectStore *store) {
- if (!store->collection_exists(rgid))
- store->collection_create(rgid);
- store->collection_setattr(rgid, "role", &role, sizeof(role));
- store->collection_setattr(rgid, "primary", &primary, sizeof(primary));
- store->collection_setattr(rgid, "state", &state, sizeof(state));
- }
- void fetch(ObjectStore *store) {
- store->collection_getattr(rgid, "role", &role, sizeof(role));
- store->collection_getattr(rgid, "primary", &primary, sizeof(primary));
- store->collection_getattr(rgid, "state", &state, sizeof(state));
- }
-
- void add_object(ObjectStore *store, object_t oid) {
- store->collection_add(rgid, oid);
- }
- void remove_object(ObjectStore *store, object_t oid) {
- store->collection_remove(rgid, oid);
- }
- void list_objects(ObjectStore *store, list<object_t>& ls) {
- store->collection_list(rgid, ls);
- }
-};
-
-
-/** Onode
- * per-object OSD metadata
- */
-class Onode {
- object_t oid;
- version_t version;
-
- map<int, version_t> stray_replicas; // osds w/ stray replicas.
-
- public:
- Onode(object_t o) : oid(o), version(0) { }
-
- void store(ObjectStore *store) {
-
- }
- void fetch(ObjectStore *store) {
-
- }
-
-};
class OSD : public Dispatcher {
class HostMonitor *monitor;
class Logger *logger;
+ int max_recovery_ops;
+
// global lock
- Mutex osd_lock;
+ Mutex osd_lock;
+
+ // per-object locking (serializing)
+ hash_set<object_t> object_lock;
+ hash_map<object_t, list<Cond*> > object_lock_waiters;
+ void lock_object(object_t oid);
+ void unlock_object(object_t oid);
+
+ // finished waiting messages, that will go at tail of dispatch()
+ list<class Message*> finished;
+ void take_waiters(list<class Message*>& ls) {
+ finished.splice(finished.end(), ls);
+ }
+
+ // -- objects --
+ int read_onode(onode_t& onode);
+ int write_onode(onode_t& onode);
// -- ops --
class ThreadPool<class OSD, class MOSDOp> *threadpool;
+ int pending_ops;
+ bool waiting_for_no_ops;
+ Cond no_pending_ops;
void queue_op(class MOSDOp *m);
+ void wait_for_no_ops();
+
public:
void do_op(class MOSDOp *m);
static void doop(OSD *o, MOSDOp *op) {
// <old replica hack>
- __uint64_t last_tid;
Mutex replica_write_lock;
map<MOSDOp*, Cond*> replica_write_cond;
map<MOSDOp*, set<__uint64_t> > replica_write_tids;
// -- replication --
hash_map<repgroup_t, RG*> rg_map;
+ set<RG*> rg_unstable;
+ __uint64_t last_tid;
+ map<__uint64_t,RGPeer*> pull_ops; // tid -> RGPeer*
+ map<__uint64_t,RGPeer*> push_ops; // tid -> RGPeer*
+ map<__uint64_t,RGPeer*> remove_ops; // tid -> RGPeer*
+
+ hash_map<object_t, list<Message*> > waiting_for_object;
+ hash_map<repgroup_t, list<Message*> > waiting_for_rg;
void get_rg_list(list<repgroup_t>& ls);
bool rg_exists(repgroup_t rg);
- RG *open_rg(repgroup_t rg); // return RG, load state from store (if needed)
+ RG *new_rg(repgroup_t rg); // create new RG
+ RG *open_rg(repgroup_t rg); // return existing RG, load state from store (if needed)
void close_rg(repgroup_t rg); // close in-memory state
void remove_rg(repgroup_t rg); // remove state from store
void peer_notify(int primary, list<repgroup_t>& rg_list);
void peer_start(int replica, map<RG*,int>& rg_map);
+ void do_recovery(RG *rg);
+ void rg_pull(RG *rg, int maxops);
+ void rg_push(RG *rg, int maxops);
+ void rg_clean(RG *rg, int maxops);
+
+ void pull_replica(object_t oid, version_t v, RGPeer *p);
+ void push_replica(object_t oid, version_t v, RGPeer *p);
+ void remove_replica(object_t oid, version_t v, RGPeer *p);
+
void handle_rg_notify(class MOSDRGNotify *m);
void handle_rg_peer(class MOSDRGPeer *m);
void handle_rg_peer_ack(class MOSDRGPeerAck *m);
+ void op_rep_pull(class MOSDOp *op);
+ void op_rep_pull_reply(class MOSDOpReply *op);
+ void op_rep_push(class MOSDOp *op);
+ void op_rep_push_reply(class MOSDOpReply *op);
+ void op_rep_remove(class MOSDOp *op);
+ void op_rep_remove_reply(class MOSDOpReply *op);
+
+
public:
OSD(int id, Messenger *m);
~OSD();
#define OID_ONO_BITS 30 // 1mb * 10^9 = 1 petabyte files
#define OID_INO_BITS (64-30) // 2^34 =~ 16 billion files
+#define RG_NUM_BITS 32
+#define RG_REP_BITS 10
//#define MAX_FILE_SIZE (FILE_OBJECT_SIZE << OID_ONO_BITS) // 1 PB
// hash (ino+ono). nrep needs to be reversible (see repgroup_to_nrep).
static hash<int> H;
- return (H(inode.ino+ono) % g_conf.osd_num_rg) +
- ((inode.layout.num_rep-1) * g_conf.osd_num_rg);
+ return ((repgroup_t)(H(inode.ino+ono) % g_conf.osd_num_rg) & ((1LL<<RG_NUM_BITS)-1LL)) +
+ ((repgroup_t)inode.layout.num_rep << RG_NUM_BITS);
}
/* get nrep from rgid */
int repgroup_to_nrep(repgroup_t rg) {
- return rg / g_conf.osd_num_rg;
+ return rg >> RG_NUM_BITS;
}
/* map (repgroup) to a raw list of osds.
--- /dev/null
+
+#include "RG.h"
+#include "config.h"
+
+#undef dout
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << "osd" << whoami << ".rg" << hex << rgid << dec << " "
+
+
+void RG::mark_peered()
+{
+ dout(10) << "mark_peered" << endl;
+ state_set(RG_STATE_PEERED);
+}
+
+void RG::pulled(object_t oid, version_t v, RGPeer *p)
+{
+ dout(10) << "pulled o " << hex << oid << dec << " v " << v << " from " << p->get_peer() << endl;
+
+ local_objects[oid] = v;
+
+ // update peer state
+ p->pulled(oid);
+
+ objects_loc.erase(oid); // object is now local
+ if (objects_loc.empty()) {
+ assert(!is_complete());
+ mark_complete();
+
+ if (!is_clean()) {
+ plan_push();
+ plan_cleanup();
+ }
+ }
+}
+
+void RG::mark_complete()
+{
+ dout(10) << "mark_complete" << endl;
+ assert(!is_complete());
+
+ // done pulling objects!
+ state_set(RG_STATE_COMPLETE);
+ pull_plan.clear();
+
+ // hose any !complete state
+ objects.clear();
+ objects_loc.clear();
+ deleted_objects.clear();
+}
+
+void RG::pushed(object_t oid, version_t v, RGPeer *p)
+{
+ dout(10) << "pushed o " << hex << oid << dec << " v " << v << " from " << p->get_peer() << endl;
+
+ // update peer state
+ p->pushed(oid);
+
+ // clean now?
+}
+
+void RG::removed(object_t oid, version_t v, RGPeer *p)
+{
+ dout(10) << "removed o " << hex << oid << dec << " v " << v << " from " << p->get_peer() << endl;
+
+ // update peer state
+ p->removed(oid);
+
+ // clean now?
+}
+
+
+
+
+void RG::analyze_peers(ObjectStore *store)
+{
+ dout(10) << "analyze_peers" << endl;
+
+ // compare
+ map<object_t, int> nreps; // not quite accurate. for pull.
+
+ objects = local_objects; // start w/ local object set.
+
+ // newest objects -> objects
+ for (map<int, RGPeer*>::iterator pit = peers.begin();
+ pit != peers.end();
+ pit++) {
+ for (map<object_t, version_t>::iterator oit = pit->second->peer_state.objects.begin();
+ oit != pit->second->peer_state.objects.end();
+ oit++) {
+ // know this object?
+ if (objects.count(oit->first)) {
+ object_t v = objects[oit->first];
+ if (oit->second < v) // older?
+ continue; // useless
+ else if (oit->second == v) // same?
+ nreps[oit->first]++; // not quite accurate bc local_objects isn't included in nrep
+ else { // newer!
+ objects[oit->first] = oit->second;
+ nreps[oit->first] = 0;
+ objects_loc[oit->first] = pit->first; // note location. this will overwrite and be lame.
+ }
+ } else {
+ // newly seen object!
+ objects[oit->first] = oit->second;
+ nreps[oit->first] = 0;
+ objects_loc[oit->first] = pit->first; // note location. this will overwrite and be lame.
+ }
+ }
+ }
+
+ // remove deleted objects
+ assim_deleted_objects(deleted_objects); // locally
+ for (map<int, RGPeer*>::iterator pit = peers.begin();
+ pit != peers.end();
+ pit++)
+ assim_deleted_objects(pit->second->peer_state.deleted); // on peers
+
+ // plan pull
+ // order objects by replication level
+ map<int, list<object_t> > byrep;
+ for (map<object_t, int>::iterator oit = objects_loc.begin();
+ oit != objects_loc.end();
+ oit++)
+ byrep[nreps[oit->first]].push_back(oit->first);
+ // make plan
+ pull_plan.clear();
+ for (map<int, list<object_t> >::iterator it = byrep.begin();
+ it != byrep.end();
+ it++) {
+ for (list<object_t>::iterator oit = it->second.begin();
+ oit != it->second.end();
+ oit++) {
+ dout(10) << " rg " << hex << rgid << dec << " o " << *oit << " will proxy+pull" << endl;
+ pull_plan.push_front(*oit, objects[*oit], objects_loc[*oit]);
+ }
+ }
+
+ // just cleanup old local objects
+ // FIXME: do this async?
+ for (map<object_t, version_t>::iterator it = local_objects.begin();
+ it != local_objects.end();
+ it++) {
+ if (objects.count(it->first) && objects[it->first] == it->second) continue; // same!
+
+ dout(10) << " local o " << hex << it->first << dec << " v " << it->second << " old, removing" << endl;
+ store->remove(it->first);
+ local_objects.erase(it->first);
+ }
+}
+
+
+void RG::plan_push()
+{
+ dout(10) << "plan_push" << endl;
+ assert(is_complete());
+ assert(is_peered());
+
+ // push
+ push_plan.clear();
+ for (map<object_t, version_t>::iterator oit = local_objects.begin();
+ oit != local_objects.end();
+ oit++) {
+ for (map<int, RGPeer*>::iterator pit = peers.begin();
+ pit != peers.end();
+ pit++) {
+ RGPeer *rgp = pit->second;
+ if (rgp->get_role() < 0) continue;
+
+ if (rgp->peer_state.objects.count(oit->first) == 0 ||
+ oit->second < rgp->peer_state.objects[oit->first]) {
+ dout(10) << " remote o " << hex << oit->first << dec << " v " << oit->second << " on osd" << rgp->get_peer() << " old|dne, pushing" << endl;
+ push_plan.push_back(oit->first, oit->second, pit->first);
+ }
+ }
+ }
+}
+
+void RG::plan_cleanup()
+{
+ dout(10) << "plan_cleanup" << endl;
+ assert(is_complete());
+ assert(is_peered());
+
+ // cleanup
+ clean_plan.clear();
+ for (map<int, RGPeer*>::iterator pit = peers.begin();
+ pit != peers.end();
+ pit++) {
+ RGPeer *rgp = pit->second;
+ for (map<object_t, version_t>::iterator oit = pit->second->peer_state.objects.begin();
+ oit != pit->second->peer_state.objects.end();
+ oit++) {
+ if (rgp->get_role() < 0) {
+ dout(10) << " remote o " << hex << oit->first << dec << " v " << oit->second << " on osd" << rgp->get_peer() << " stray, removing" << endl;
+ }
+ else if (local_objects.count(oit->first) == 0) {
+ dout(10) << " remote o " << hex << oit->first << dec << " v " << oit->second << " on osd" << rgp->get_peer() << " deleted, removing" << endl;
+ }
+ else continue;
+ clean_plan.push_back(oit->first, oit->second, pit->first);
+ }
+ }
+
+}
--- /dev/null
+
+#include "include/types.h"
+#include "include/bufferlist.h"
+#include "ObjectStore.h"
+
+struct RGReplicaInfo {
+ int state;
+ map<object_t,version_t> objects; // remote object list
+ map<object_t,version_t> deleted; // remote delete list
+
+ void _encode(bufferlist& blist) {
+ blist.append((char*)&state, sizeof(state));
+ ::_encode(objects, blist);
+ ::_encode(deleted, blist);
+ }
+ void _decode(bufferlist& blist, int& off) {
+ blist.copy(off, sizeof(state), (char*)&state);
+ off += sizeof(state);
+ ::_decode(objects, blist, off);
+ ::_decode(deleted, blist, off);
+ }
+
+ RGReplicaInfo() : state(0) { }
+};
+
+
+/** RGPeer
+ * state associated with non-primary OSDS with RG content.
+ * only used by primary.
+ */
+
+// by primary
+#define RG_PEER_STATE_ACTIVE 1 // peer has acked our request, sent back RG state.
+#define RG_PEER_STATE_COMPLETE 2 // peer has everything replicated
+
+class RGPeer {
+ public:
+ class RG *rg;
+ private:
+ int peer;
+ int role; // 0 primary, 1+ replica, -1 residual
+ int state;
+
+ // peer state
+ public:
+ RGReplicaInfo peer_state;
+
+ protected:
+ // recovery: for pulling content from (old) replicas
+ map<object_t,version_t> pulling;
+ map<object_t,version_t> pushing;
+ map<object_t,version_t> removing;
+
+ // replication: for pushing replicas (new or old)
+ map<object_t,version_t> writing; // objects i've written to replica
+ map<object_t,version_t> flushing; // objects i've written to remote buffer cache only
+
+ public:
+ RGPeer(class RG *rg, int p, int ro) : rg(rg), peer(p), role(ro), state(0) { }
+
+ int get_role() { return role; }
+ int get_peer() { return peer; }
+ bool state_test(int m) { return state & m != 0; }
+ void state_set(int m) { state |= m; }
+ void state_clear(int m) { state &= ~m; }
+
+ bool is_active() { return state_test(RG_PEER_STATE_ACTIVE); }
+ bool is_complete() { return state_test(RG_PEER_STATE_COMPLETE); }
+
+ bool is_residual() { return role < 0; }
+ bool is_empty() { return is_active() && peer_state.objects.empty(); } // *** && peer_state & COMPLETE
+
+ void pull(object_t o, version_t v) { pulling[o] = v; }
+ bool is_pulling(object_t o) { return pulling.count(o); }
+ version_t pulling_version(object_t o) { return pulling[o]; }
+ void pulled(object_t o) { pulling.erase(o); }
+
+ void push(object_t o, version_t v) { pushing[o] = v; }
+ bool is_pushing(object_t o) { return pushing.count(o); }
+ version_t pushing_version(object_t o) { return pushing[o]; }
+ void pushed(object_t o) { pushing.erase(o); }
+
+ void remove(object_t o, version_t v) { removing[o] = v; }
+ bool is_removing(object_t o) { return removing.count(o); }
+ version_t removing_version(object_t o) { return removing[o]; }
+ void removed(object_t o) {
+ removing.erase(o);
+ peer_state.objects.erase(o);
+ }
+
+ int num_active_ops() {
+ return pulling.size() + pushing.size() + removing.size();
+ }
+};
+
+
+
+
+// a task list for moving objects around
+class RGQueue {
+ list<object_t> objects;
+ list<version_t> versions;
+ list<int> peers;
+ public:
+ void push_back(object_t o, version_t v, int p) {
+ objects.push_back(o); versions.push_back(v); peers.push_back(p);
+ }
+ void push_front(object_t o, version_t v, int p) {
+ objects.push_front(o); versions.push_front(v); peers.push_front(p);
+ }
+ bool get_next(object_t& o, version_t& v, int& p) {
+ if (objects.empty()) return false;
+ o = objects.front(); v = versions.front(); p = peers.front();
+ objects.pop_front(); versions.pop_front(); peers.pop_front();
+ return true;
+ }
+ void clear() {
+ objects.clear(); versions.clear(); peers.clear();
+ }
+ bool empty() { return objects.empty(); }
+};
+
+
+
+/** RG - Replica Group
+ *
+ */
+
+// bits used on any
+#define RG_STATE_COMPLETE 1 // i have full RG contents locally.
+#define RG_STATE_PEERED 2 // i have contacted prior primary and all
+ // replica osds and/or fetched their
+ // content lists, and thus know what's up.
+ // or, i have check in w/ new primary (on replica)
+
+// on primary or old-primary only
+#define RG_STATE_CLEAN 4 // i am fully replicated
+
+class RG {
+ protected:
+ int whoami; // osd, purely for debug output, yucka
+
+ repgroup_t rgid;
+ int role; // 0 = primary, 1 = secondary, etc. -1=undef/none.
+ int state; // see bit defns above
+
+ int primary; // replica: who the primary is (if not me)
+ set<int> old_replica_set; // old primary: where replicas used to be
+
+ map<int, RGPeer*> peers; // primary: (soft state) active peers
+
+ public:
+ RGQueue pull_plan;
+ RGQueue push_plan;
+ RGQueue clean_plan;
+
+ list<class Message*> waiting_for_peered; // any op will hang until peered
+ map<object_t, list<class Message*> > waiting_for_object; // ops waiting for specific objects.
+
+ // recovery
+ map<object_t, version_t> local_objects;
+ map<object_t, version_t> objects; // what the current object set is
+ map<object_t, int> objects_loc; // where latest live
+
+ // for unstable states,
+ map<object_t, version_t> deleted_objects; // locally deleted objects
+
+ public:
+ RG(int osd, repgroup_t r) : whoami(osd), rgid(r),
+ role(0),
+ state(0),
+ primary(-1) { }
+
+ repgroup_t get_rgid() { return rgid; }
+ int get_role() { return role; }
+ int get_primary() { return primary; }
+
+ void set_role(int r) { role = r; }
+ void set_primary(int p) { primary = p; }
+
+ bool is_primary() { return role == 0; }
+ bool is_residual() { return role < 0; }
+
+ bool is_pulling() { return !pull_plan.empty(); }
+ void pulled(object_t oid, version_t v, RGPeer *p);
+ bool is_pushing() { return !push_plan.empty(); }
+ void pushed(object_t oid, version_t v, RGPeer *p);
+ bool is_removing() { return !push_plan.empty(); }
+ void removed(object_t oid, version_t v, RGPeer *p);
+
+ RGPeer* get_proxy_peer(object_t o) {
+ if (objects_loc.count(o))
+ return get_peer(objects_loc[o]);
+ return 0;
+ }
+ version_t get_proxy_version(object_t o) { return objects[o]; }
+
+ int get_state() { return state; }
+ bool state_test(int m) { return (state & m) != 0; }
+ void set_state(int s) { state = s; }
+ void state_set(int m) { state |= m; }
+ void state_clear(int m) { state &= ~m; }
+
+ bool is_peered() { return state_test(RG_STATE_PEERED); }
+ void mark_peered();
+ bool is_complete() { return state_test(RG_STATE_COMPLETE); }
+ void mark_complete();
+ bool is_clean() { return state_test(RG_STATE_CLEAN); }
+ void mark_clean();
+
+ int num_active_ops() {
+ int o = 0;
+ for (map<int, RGPeer*>::iterator it = peers.begin();
+ it != peers.end();
+ it++)
+ o += it->second->num_active_ops();
+ return o;
+ }
+
+ map<int, RGPeer*>& get_peers() { return peers; }
+ RGPeer* get_peer(int p) {
+ if (peers.count(p)) return peers[p];
+ return 0;
+ }
+ RGPeer* new_peer(int p, int r) {
+ return peers[p] = new RGPeer(this, p, r);
+ }
+ void remove_peer(int p) {
+ assert(peers.count(p));
+ delete peers[p];
+ peers.erase(p);
+ }
+
+ set<int>& get_old_replica_set() { return old_replica_set; }
+ map<object_t, version_t>& get_deleted_objects() { return deleted_objects; }
+
+
+
+ void store(ObjectStore *store) {
+ if (!store->collection_exists(rgid))
+ store->collection_create(rgid);
+ store->collection_setattr(rgid, "role", &role, sizeof(role));
+ store->collection_setattr(rgid, "primary", &primary, sizeof(primary));
+ store->collection_setattr(rgid, "state", &state, sizeof(state));
+ }
+ void fetch(ObjectStore *store) {
+ store->collection_getattr(rgid, "role", &role, sizeof(role));
+ store->collection_getattr(rgid, "primary", &primary, sizeof(primary));
+ store->collection_getattr(rgid, "state", &state, sizeof(state));
+ }
+
+ void add_object(ObjectStore *store, object_t oid) {
+ store->collection_add(rgid, oid);
+ }
+ void remove_object(ObjectStore *store, object_t oid) {
+ store->collection_remove(rgid, oid);
+ }
+ void list_objects(ObjectStore *store, list<object_t>& ls) {
+ store->collection_list(rgid, ls);
+ }
+
+ void scan_local_objects(ObjectStore *store) {
+ list<object_t> olist;
+ local_objects.clear();
+ list_objects(store,olist);
+ for (list<object_t>::iterator it = olist.begin();
+ it != olist.end();
+ it++) {
+ version_t v = 0;
+ store->getattr(*it,
+ "version",
+ &v, sizeof(v));
+ local_objects[*it] = v;
+ }
+ }
+
+
+ void assim_deleted_objects(map<object_t,version_t>& dl) {
+ for (map<object_t, version_t>::iterator oit = dl.begin();
+ oit != dl.end();
+ oit++) {
+ if (objects.count(oit->first) == 0) continue; // dne
+ if (objects[oit->first] < oit->second) { // deleted.
+ objects.erase(oit->first);
+ objects_loc.erase(oit->first);
+ }
+ }
+ }
+
+ void analyze_peers(ObjectStore *store);
+ void plan_push();
+ void plan_cleanup();
+
+};
+