-- test hashed readdir
+I think the OSD stuff breaks down info a few different areas:
+
+- Lay out OSD in-memory data structures. I've started this based on the stuff I posted to the list.
+
+- The object store needs to be able to store the metadata for objects and replica groups, via the ObjectStore interface. This boils down to collections and extended attributes (on both objects and collections). The beginnings of both are are implemented in FakeStore, but untested and unfinished.
+- OSD::op_write needs to put new objects in a rg collection.
+
+- Map the actual metadata we're keeping to object and collection xattrs. Probably via fetch/store methods on the in-memory data structures.
+
+- When an OSD gets a new version of the map it needs to go through the process of checking it's active replica groups, seeing if its role has changed, etc.
+
+- The migration process itself. Needs to be tunable somehow
+
+- OSD::op_read needs to do the proxying sort of thing, where if it's is primary but !COMPLETE it needs to wait and fetch the object from the (old) replica it currently resides on, or block until old replicas are scanned.
+
+- OSD::op_write needs to do the replication thing. I think replication should be implemented last, it's probably easier to add once the role changing stuff already works.
+
+
+
+
+
- interactive hash/unhash interface
+- test hashed readdir
+- make logstream.flush align itself to stipes
+
- carefully define/document frozen wrt dir_auth vs hashing
-- make logstream.flush align itself to stipes
+
int result = 0;
// release caps right away?
+ dout(10) << "num_rd " << in->num_rd << " num_wr " << in->num_wr << endl;
if (in->num_rd == 0 &&
in->num_wr == 0) {
// synchronously; FIXME this is dumb
req->set_caller_uid(getuid());
req->set_caller_gid(getgid());
+ // release caps locally
+ in->file_caps_seq = 0;
+ in->file_caps = 0;
+ in->file_wr_mtime = 0;
+ in->file_wr_size = 0;
+
+ put_inode(in);
+
+ // make the call .. FIXME there's no reason this has to block!
MClientReply *reply = make_request(req, true, mds_auth);
assert(reply);
int result = reply->get_result();
assert(result == 0);
-
- // success?
- if (in->file_caps_seq == reply->get_file_caps_seq()) {
- // yup.
- dout(5) << "successfully released caps" << endl;
- in->file_caps_seq = 0;
- in->file_caps = 0;
- in->file_wr_mtime = 0;
- in->file_wr_size = 0;
- put_inode(in);
- } else {
- dout(5) << "failed to release caps; i had " << in->file_caps_seq << " mds had " << reply->get_file_caps_seq() << endl;
- }
delete reply;
}
client_bcache_maxfrag: 10, // max actual relative # of bheads over opt rel # of bheads
client_trace: 0,
- fuse_direct_io: 1,
+ fuse_direct_io: 0,
// --- mds ---
mds_cache_size: MDS_CACHE_SIZE,
// osd types
-typedef int repgroup_t; // replica group
+typedef __uint64_t repgroup_t; // replica group
typedef __uint64_t object_t; // object id
typedef __uint64_t coll_t; // collection id
// HACK to test hashing stuff
- if (1) {
+ if (0) {
static map<int,int> didhash;
if (elapsed.sec() > 15 && !didhash[whoami]) {
CInode *in = mdcache->get_inode(100000010);
* oid - object id
* op - OSD_OP_DELETE, etc.
*
+ * rg_role - who we want ... 0 == primary, this is what clients/mds will do.
+ * rg_nrep - how many replicas we want ... just for writes currently?
+ *
*/
#define OSD_OP_READ 1
object_t oid;
repgroup_t rg;
int rg_role, rg_nrep;
- __uint64_t ocv;
+ __uint64_t map_version;
int op;
size_t length, offset;
repgroup_t get_rg() { return st.rg; }
int get_rg_role() { return st.rg_role; } // who am i asking for?
int get_rg_nrep() { return st.rg_nrep; }
- __uint64_t get_ocv() { return st.ocv; }
+ __uint64_t get_map_version() { return st.map_version; }
int get_op() { return st.op; }
size_t get_length() { return st.length; }
long get_pcid() { return st.pcid; }
MOSDOp(long tid, msg_addr_t asker,
- object_t oid, repgroup_t rg, __uint64_t ocv, int op) :
+ object_t oid, repgroup_t rg, __uint64_t mapversion, int op) :
Message(MSG_OSD_OP) {
memset(&st, 0, sizeof(st));
this->st.tid = tid;
this->st.oid = oid;
this->st.rg = rg;
this->st.rg_role = 0;
- this->st.ocv = ocv;
+ this->st.map_version = mapversion;
this->st.op = op;
}
MOSDOp() {}
size_t length, offset;
size_t object_size;
- __uint64_t _new_ocv;
+ __uint64_t _new_map_version;
size_t _data_len, _oc_len;
} MOSDOpReply_st;
}
// osdmap
- __uint64_t get_ocv() { return st._new_ocv; }
+ __uint64_t get_map_version() { return st._new_map_version; }
bufferlist& get_osdmap() {
return osdmap;
}
this->st.offset = req->st.offset;
// attach updated cluster spec?
- if (req->get_ocv() < oc->get_version()) {
+ if (oc &&
+ req->get_map_version() < oc->get_version()) {
oc->encode(osdmap);
- st._new_ocv = oc->get_version();
+ st._new_map_version = oc->get_version();
st._oc_len = osdmap.length();
}
}
#define __BERKELEYDB_H
#include <db.h>
+#include <unistd.h>
+
#include <list>
using namespace std;
+
template<typename K, typename D>
class BDBMap {
private:
DB *dbp;
public:
- BDBMap() {
- int r;
- if ((r = db_create(&dbp, NULL, 0)) != 0) {
- cerr << "db_create: " << db_strerror(r) << endl;
- assert(0);
- }
- }
+ BDBMap() : dbp(0) {}
~BDBMap() {
close();
}
+ bool is_open() { return dbp ? true:false; }
+
// open/close
int open(const char *fn) {
- int r = dbp->open(dbp, NULL, fn, NULL, DB_BTREE, DB_CREATE, 0644);
+ //cout << "open " << fn << endl;
+
+ int r;
+ if ((r = db_create(&dbp, NULL, 0)) != 0) {
+ cerr << "db_create: " << db_strerror(r) << endl;
+ assert(0);
+ }
+
+ dbp->set_errfile(dbp, stderr);
+ dbp->set_errpfx(dbp, "bdbmap");
+
+ r = dbp->open(dbp, NULL, fn, NULL, DB_BTREE, DB_CREATE, 0644);
+ if (r != 0) {
+ dbp->err(dbp, r, "%s", fn);
+ }
assert(r == 0);
return 0;
}
void close() {
- dbp->close(dbp,0);
- dbp = 0;
+ if (dbp) {
+ dbp->close(dbp,0);
+ dbp = 0;
+ }
}
void remove(const char *fn) {
- dbp->remove(dbp, fn, 0, 0);
- dbp = 0;
+ if (!dbp) open(fn);
+ if (dbp) {
+ dbp->remove(dbp, fn, 0, 0);
+ dbp = 0;
+ } else {
+ ::unlink(fn);
+ }
}
// accessors
int put(K key,
D data) {
DBT k;
+ memset(&k, 0, sizeof(k));
k.data = &key;
k.size = sizeof(K);
DBT d;
+ memset(&d, 0, sizeof(d));
d.data = &data;
d.size = sizeof(data);
return dbp->put(dbp, NULL, &k, &d, 0);
int get(K key,
D& data) {
DBT k;
+ memset(&k, 0, sizeof(k));
k.data = &key;
k.size = sizeof(key);
DBT d;
+ memset(&d, 0, sizeof(d));
d.data = &data;
d.size = sizeof(data);
int r = dbp->get(dbp, NULL, &k, &d, 0);
int del(K key) {
DBT k;
+ memset(&k, 0, sizeof(k));
k.data = &key;
k.size = sizeof(key);
return dbp->del(dbp, NULL, &k, 0);
int r = dbp->cursor(dbp, NULL, &cursor, 0);
assert(r == 0);
- K key;
- D data;
-
DBT k,d;
- k.data = &key;
- k.size = sizeof(key);
- d.data = &data;
- d.size = sizeof(data);
+ memset(&k, 0, sizeof(k));
+ memset(&d, 0, sizeof(d));
- while (1) {
- int r = cursor->c_get(cursor, &k, &d, DB_NEXT);
- if (r == DB_NOTFOUND) break;
- assert(r == 0);
+ while ((r = cursor->c_get(cursor, &k, &d, DB_NEXT)) == 0) {
+ K key;
+ assert(k.size == sizeof(key));
+ memcpy(&key, k.data, k.size);
ls.push_back(key);
}
+ if (r != DB_NOTFOUND) {
+ dbp->err(dbp, r, "DBcursor->get");
+ assert(r == DB_NOTFOUND);
+ }
+
cursor->c_close(cursor);
return 0;
}
int FakeStore::finalize()
{
dout(5) << "finalize" << endl;
+
+ // close collections db files
+ close_collections();
+
// nothing
return 0;
}
fn = basedir + "/" + s;
// dout(1) << "oname is " << fn << endl;
}
+void FakeStore::get_collfn(coll_t c, string &fn) {
+ char s[100];
+ sprintf(s, "%d/%02llx/%016llx.co", whoami, HASH_FUNC(c), c);
+ fn = basedir + "/" + s;
+}
+
void FakeStore::wipe_dir(string mydir)
dout(1) << "mkfs in " << mydir << endl;
+ close_collections();
+
// make sure my dir exists
r = ::stat(mydir.c_str(), &st);
if (r != 0) {
else
wipe_dir( subdir );
}
-
+
return r;
}
+// ------------------
+// attributes
+
+int FakeStore::setattr(object_t oid, const char *name,
+ void *value, size_t size)
+{
+ string fn;
+ get_oname(oid, fn);
+ return setxattr(fn.c_str(), name, value, size, 0);
+}
+
+
+int FakeStore::getattr(object_t oid, const char *name,
+ void *value, size_t size)
+{
+ string fn;
+ get_oname(oid, fn);
+ return getxattr(fn.c_str(), name, value, size);
+}
+
+int FakeStore::listattr(object_t oid, char *attrs, size_t size)
+{
+ string fn;
+ get_oname(oid, fn);
+ return listxattr(fn.c_str(), attrs, size);
+}
+
// ------------------
// collections
-void FakeStore::get_collfn(coll_t c, string &fn) {
- char s[100];
- sprintf(s, "collection.%02llx", c);
- fn = basedir;
- fn += "/";
- fn += s;
+// helpers
+
+void FakeStore::open_collections()
+{
+ string cfn;
+ get_dir(cfn);
+ cfn += "/collections";
+ collections.open(cfn.c_str());
+ list<coll_t> ls;
+ collections.list_keys(ls);
}
-void FakeStore::open_collection(coll_t c) {
- if (collection_map.count(c) == 0) {
- string fn;
- get_collfn(c,fn);
- collection_map[c] = new BDBMap<coll_t,int>;
- collection_map[c]->open(fn.c_str());
+
+void FakeStore::close_collections()
+{
+ if (collections.is_open())
+ collections.close();
+
+ for (map<coll_t, BDBMap<object_t, int>*>::iterator it = collection_map.begin();
+ it != collection_map.end();
+ it++) {
+ it->second->close();
}
+ collection_map.clear();
}
+
+
+int FakeStore::open_collection(coll_t c) {
+ if (collection_map.count(c))
+ return 0; // already open.
+
+ string fn;
+ get_collfn(c,fn);
+ collection_map[c] = new BDBMap<coll_t,int>;
+ int r = collection_map[c]->open(fn.c_str());
+ if (r != 0)
+ collection_map.erase(c); // failed
+ return r;
+}
+
+// public
+int FakeStore::list_collections(list<coll_t>& ls)
+{
+ if (!collections.is_open()) open_collections();
+
+ ls.clear();
+ collections.list_keys(ls);
+ return 0;
+}
+
+int FakeStore::collection_stat(coll_t c, struct stat *st) {
+ if (!collections.is_open()) open_collections();
+
+ string fn;
+ get_collfn(c,fn);
+ return ::stat(fn.c_str(), st);
+}
+
int FakeStore::collection_create(coll_t c) {
+ if (!collections.is_open()) open_collections();
+
collections.put(c, 1);
open_collection(c);
return 0;
}
+
int FakeStore::collection_destroy(coll_t c) {
+ if (!collections.is_open()) open_collections();
+
collections.del(c);
open_collection(c);
collection_map.erase(c);
return 0;
}
+
int FakeStore::collection_add(coll_t c, object_t o) {
+ if (!collections.is_open()) open_collections();
+
open_collection(c);
collection_map[c]->put(o,1);
return 0;
}
int FakeStore::collection_remove(coll_t c, object_t o) {
+ if (!collections.is_open()) open_collections();
+
open_collection(c);
collection_map[c]->del(o);
return 0;
}
int FakeStore::collection_list(coll_t c, list<object_t>& o) {
+ if (!collections.is_open()) open_collections();
+
open_collection(c);
collection_map[c]->list_keys(o);
return 0;
}
-
-
-// ------------------
-// attributes
-
-int FakeStore::setattr(object_t oid, const char *name,
- void *value, size_t size)
+int FakeStore::collection_setattr(coll_t cid, const char *name,
+ void *value, size_t size)
{
+ if (!collections.is_open()) open_collections();
+
string fn;
- get_oname(oid, fn);
+ get_collfn(cid,fn);
return setxattr(fn.c_str(), name, value, size, 0);
}
-int FakeStore::getattr(object_t oid, const char *name,
+int FakeStore::collection_getattr(coll_t cid, const char *name,
void *value, size_t size)
{
+ if (!collections.is_open()) open_collections();
+
string fn;
- get_oname(oid, fn);
+ get_collfn(cid,fn);
return getxattr(fn.c_str(), name, value, size);
}
-int FakeStore::listattr(object_t oid, char *attrs, size_t size)
+int FakeStore::collection_listattr(coll_t cid, char *attrs, size_t size)
{
+ if (!collections.is_open()) open_collections();
+
string fn;
- get_oname(oid, fn);
+ get_collfn(cid, fn);
return listxattr(fn.c_str(), attrs, size);
}
+
char *buffer,
bool fsync);
+ int setattr(object_t oid, const char *name,
+ void *value, size_t size);
+ int getattr(object_t oid, const char *name,
+ void *value, size_t size);
+ int listattr(object_t oid, char *attrs, size_t size);
+
// -------------------
// collections
map<coll_t, BDBMap<object_t, int>*> collection_map;
void get_collfn(coll_t c, string &fn);
- void open_collection(coll_t c);
+ int open_collection(coll_t c);
+
+ void open_collections();
+ void close_collections();
public:
+ int list_collections(list<coll_t>& ls);
+ int collection_stat(coll_t c, struct stat *st);
int collection_create(coll_t c);
int collection_destroy(coll_t c);
int collection_add(coll_t c, object_t o);
int collection_remove(coll_t c, object_t o);
int collection_list(coll_t c, list<object_t>& o);
-
- // -------------------
- // attributes
+ int collection_setattr(coll_t c, const char *name,
+ void *value, size_t size);
+ int collection_getattr(coll_t c, const char *name,
+ void *value, size_t size);
+ int collection_listattr(coll_t c, char *attrs, size_t size);
- int setattr(object_t oid, const char *name,
- void *value, size_t size);
- int getattr(object_t oid, const char *name,
- void *value, size_t size);
- int listattr(object_t oid, char *attrs, size_t size);
};
+// ------------------------------------
+// replica groups
+
+void OSD::get_rg_list(list<repgroup_t>& ls)
+{
+ // just list collections; assume they're all rg's (for now)
+ store->list_collections(ls);
+}
+
+
+bool OSD::rg_exists(repgroup_t rg)
+{
+ struct stat st;
+ if (store->collection_stat(rg, &st) == 0)
+ return true;
+ else
+ return false;
+}
+
+
+RG *OSD::open_rg(repgroup_t rg)
+{
+ // already open?
+ if (rg_map.count(rg))
+ return rg_map[rg];
+
+ // stat collection
+ RG *r = new RG(rg);
+ if (rg_exists(rg)) {
+ // exists
+ r->fetch(store);
+ } else {
+ // dne
+ r->store(store);
+ }
+ rg_map[rg] = r;
+
+ return r;
+}
+
+
+
+
+
+// --------------------------------------
// dispatch
void OSD::dispatch(Message *m)
break;
- // for replication..
+ // for replication etc.
case MSG_OSD_OPREPLY:
monitor->host_is_alive(m->get_source());
handle_op_reply((MOSDOpReply*)m);
}
+
+void OSD::update_map(bufferlist& state)
+{
+ // decode new map
+ if (!osdmap) osdmap = new OSDMap();
+ osdmap->decode(state);
+ dout(7) << "update_map version " << osdmap->get_version() << endl;
+
+ // scan replica groups
+ list<repgroup_t> ls;
+ get_rg_list(ls);
+
+ map< int, list<RG*> > primary_ping_queue;
+
+ for (list<repgroup_t>::iterator it = ls.begin();
+ it != ls.end();
+ it++) {
+ repgroup_t rgid = *it;
+ RG *rg = open_rg(rgid);
+ assert(rg);
+
+ // get active rush mapping
+ int acting[NUM_RUSH_REPLICAS];
+ int nrep = osdmap->repgroup_to_acting_osds(rgid, acting, NUM_RUSH_REPLICAS);
+ int primary = acting[0];
+ int role = -1;
+ for (int i=0; i<nrep; i++)
+ if (acting[i] == whoami) role = i;
+
+
+ if (role != rg->get_role()) {
+ // role change.
+ dout(7) << " rg " << rgid << " acting role change " << rg->get_role() << " -> " << role << endl;
+
+ // am i old-primary?
+ if (rg->get_role() == 0) {
+ // note potential replica set, and drop old peering sessions.
+ for (map<int, RGPeer*>::iterator it = rg->get_peers().begin();
+ it != rg->get_peers().end();
+ it++) {
+ dout(7) << " rg " << rgid << " old-primary, dropping old peer " << it->first << endl;
+ rg->get_old_replica_set().insert(it->first);
+ delete it->second;
+ }
+ rg->get_peers().clear();
+ }
+
+ // we need to re-peer
+ rg->state_clear(RG_STATE_PEERED);
+ rg->set_role(role);
+ rg->store(store);
+ primary_ping_queue[primary].push_back(rg);
+
+ } else {
+ // no role change.
+
+ if (role > 0) {
+ // i am replica.
+
+ // did primary change?
+ if (primary != rg->get_primary()) {
+ dout(7) << " rg " << rgid << " acting primary change " << rg->get_primary() << " -> " << primary << endl;
+
+ // re-peer
+ rg->state_clear(RG_STATE_PEERED);
+ rg->set_primary(primary);
+ rg->store(store);
+ primary_ping_queue[primary].push_back(rg);
+ }
+ }
+ else if (role == 0) {
+ // i am primary.
+
+ // check replicas
+ for (int r=1; r<nrep; r++) {
+ if (rg->get_peer(r) == 0) {
+ dout(7) << " rg " << rgid << " primary not peered with replica " << r << " osd" << acting[r] << endl;
+
+ // ***
+ }
+ }
+
+ }
+ }
+ }
+
+
+ // initiate any new peering sessions!
+ for (map< int, list<RG*> >::iterator pit = primary_ping_queue.begin();
+ pit != primary_ping_queue.end();
+ pit++) {
+ // create peer message
+ int primary = pit->first;
+
+
+ for (list<RG*>::iterator rit = pit->second.begin();
+ rit != pit->second.end();
+ rit++) {
+ // add this RG to peer message
+ }
+
+ // send
+
+ }
+
+}
+
+
void OSD::handle_getmap_ack(MOSDGetMapAck *m)
{
// SAB
osd_lock.Lock();
- if (!osdmap) osdmap = new OSDMap();
- osdmap->decode(m->get_osdmap());
- dout(7) << "got OSDMap version " << osdmap->get_version() << endl;
+ update_map(m->get_osdmap());
delete m;
// process waiters
list<MOSDOp*> waiting;
waiting.splice(waiting.begin(), waiting_for_osdmap);
- for (list<MOSDOp*>::iterator it = waiting.begin();
- it != waiting.end();
+ list<MOSDOp*> w = waiting;
+
+ osd_lock.Unlock();
+
+ for (list<MOSDOp*>::iterator it = w.begin();
+ it != w.end();
it++) {
handle_op(*it);
}
-
- // SAB
- osd_lock.Unlock();
}
void OSD::handle_op(MOSDOp *op)
{
- // starting up?
+ // mkfs is special
+ if (op->get_op() == OSD_OP_MKFS) {
+ op_mkfs(op);
+ return;
+ }
+ // no map? starting up?
if (!osdmap) {
- // SAB
osd_lock.Lock();
-
dout(7) << "no OSDMap, starting up" << endl;
if (waiting_for_osdmap.empty())
messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP),
MSG_ADDR_MDS(0), MDS_PORT_MAIN);
waiting_for_osdmap.push_back(op);
-
- // SAB
osd_lock.Unlock();
-
return;
}
-
- // check cluster version
- if (op->get_ocv() > osdmap->get_version()) {
+ // is our map version up to date?
+ if (op->get_map_version() > osdmap->get_version()) {
// op's is newer
- dout(7) << "op cluster " << op->get_ocv() << " > " << osdmap->get_version() << endl;
+ dout(7) << "op map " << op->get_map_version() << " > " << osdmap->get_version() << endl;
// query MDS
dout(7) << "querying MDS" << endl;
messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP),
MSG_ADDR_MDS(0), MDS_PORT_MAIN);
+
assert(0);
- // SAB
osd_lock.Lock();
-
waiting_for_osdmap.push_back(op);
-
- // SAB
osd_lock.Unlock();
-
return;
}
- if (op->get_ocv() < osdmap->get_version()) {
+ // does user have old map?
+ if (op->get_map_version() < osdmap->get_version()) {
// op's is old
- dout(7) << "op cluster " << op->get_ocv() << " > " << osdmap->get_version() << endl;
+ dout(7) << "op map " << op->get_map_version() << " < " << osdmap->get_version() << endl;
}
-
- // am i the right rg_role?
- if (0) {
+ // did this op go to the right OSD?
+ if (op->get_rg_role() == 0) {
repgroup_t rg = op->get_rg();
- if (op->get_rg_role() == 0) {
- // PRIMARY
+ int acting_primary = osdmap->get_rg_acting_primary( rg );
- // verify that we are primary, or acting primary
- int acting_primary = osdmap->get_rg_acting_primary( op->get_rg() );
- if (acting_primary != whoami) {
- dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
- messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
- logger->inc("fwd");
- return;
- }
- } else {
- // REPLICA
- int my_role = osdmap->get_rg_role(rg, whoami);
-
- dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl;
-
- if (my_role != op->get_rg_role()) {
- assert(0);
- }
- }
+ if (acting_primary != whoami) {
+ dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
+ messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
+ logger->inc("fwd");
+ return;
+ }
}
+ // queue op
queue_op(op);
- // do_op(op);
}
void OSD::queue_op(MOSDOp *op) {
bool write_sync = op->get_rg_role() == 0; // primary writes synchronously, replicas don't.
+ // new object?
+ bool existed = store->exists(op->get_oid());
+
// take buffers from the message
bufferlist bl;
bl.claim( op->get_data() );
}
}
- // trucnate after?
- /*
- if (m->get_flags() & OSD_OP_FLAG_TRUNCATE) {
- size_t at = m->get_offset() + m->get_length();
- int r = store->truncate(m->get_oid(), at);
- dout(7) << "truncating object after tail of write at " << at << ", r = " << r << endl;
+ // update object metadata
+ if (!existed) {
+ // add to RG collection
+ RG *r = open_rg(op->get_rg());
+ r->add_object(store, op->get_oid());
}
- */
logger->inc("wr");
logger->inc("wrb", op->get_length());
#include "common/Mutex.h"
#include "common/ThreadPool.h"
+#include "ObjectStore.h"
+
#include <map>
using namespace std;
+#include <ext/hash_map>
+using namespace __gnu_cxx;
class Messenger;
#define RG_STATE_CLEAN 4 // i am fully replicated
class RG {
- public:
+ protected:
repgroup_t rg;
int role; // 0 = primary, 1 = secondary, etc. -1=undef/none.
- int state;
+ int state; // see bit defns above
+
+ int primary; // replica: who the primary is (if not me)
+ set<int> old_replica_set; // old primary: where replicas used to be
- map<int, RGPeer> peers;
+ map<int, RGPeer*> peers; // primary: (soft state) active peers
// for unstable states,
- map<object_t, version_t> deleted_objects; // locally
+ map<object_t, version_t> deleted_objects; // locally deleted objects
public:
- RG(repgroup_t rg);
+ RG(repgroup_t r) : rg(r),
+ role(0),
+ state(0) { }
repgroup_t get_rg() { return rg; }
int get_role() { return role; }
- int get_state() { return state; }
+ int get_primary() { return primary; }
+
+ void set_role(int r) { role = r; }
+ void set_primary(int p) { primary = p; }
+
+ map<int, RGPeer*>& get_peers() { return peers; }
+ RGPeer* get_peer(int p) {
+ if (peers.count(p)) return peers[p];
+ return 0;
+ }
+ set<int> get_old_replica_set() { return old_replica_set; }
+
+ int get_state() { return state; }
+ bool state_test(int m) { return (state & m) != 0; }
+ void set_state(int s) { state = s; }
+ void state_set(int m) { state |= m; }
+ void state_clear(int m) { state &= ~m; }
- void enumerate_objects(list<object_t>& ls);
+ void store(ObjectStore *store) {
+ if (!store->collection_exists(rg))
+ store->collection_create(rg);
+ store->collection_setattr(rg, "role", &role, sizeof(role));
+ store->collection_setattr(rg, "primary", &primary, sizeof(primary));
+ store->collection_setattr(rg, "state", &state, sizeof(state));
+ }
+ void fetch(ObjectStore *store) {
+ store->collection_getattr(rg, "role", &role, sizeof(role));
+ store->collection_getattr(rg, "primary", &primary, sizeof(primary));
+ store->collection_getattr(rg, "state", &state, sizeof(state));
+ }
+
+ void add_object(ObjectStore *store, object_t oid) {
+ store->collection_add(rg, oid);
+ }
+ void remove_object(ObjectStore *store, object_t oid) {
+ store->collection_remove(rg, oid);
+ }
+ void list_objects(ObjectStore *store, list<object_t>& ls) {
+ store->collection_list(rg, ls);
+ }
};
map<int, version_t> stray_replicas; // osds w/ stray replicas.
public:
+ Onode(object_t o) : oid(o), version(0) { }
+
+ void store(ObjectStore *store) {
+
+ }
+ void fetch(ObjectStore *store) {
+
+ }
};
Mutex osd_lock;
+ void update_map(bufferlist& state);
+
+ // rg's
+ hash_map<repgroup_t, RG*> rg_map;
+
+ void get_rg_list(list<repgroup_t>& ls);
+ bool rg_exists(repgroup_t rg);
+ RG *open_rg(repgroup_t rg); // return RG, load state from store (if needed)
+ void close_rg(repgroup_t rg); // close in-memory state
+ void remove_rg(repgroup_t rg); // remove state from store
+
public:
OSD(int id, Messenger *m);
~OSD();
int init();
int shutdown();
- // OSDMap
- void update_osd_map(__uint64_t ocv, bufferlist& blist);
-
+ // ops
void queue_op(class MOSDOp *m);
void do_op(class MOSDOp *m);
static void doop(OSD *o, MOSDOp *op) {
o->do_op(op);
};
-
// messages
virtual void dispatch(Message *m);
/* map (repgroup) to a list of osds.
- this is where we (will eventually) use RUSH. */
+ this is where we invoke RUSH. */
int repgroup_to_osds(repgroup_t rg,
int *osds, // list of osd addr's
int num_rep) { // num replicas we want
// get rush list
assert(rush);
rush->GetServersByKey( rg, num_rep, osds );
- return 0;
+ return num_rep;
}
+ int repgroup_to_nonfailed_osds(repgroup_t rg,
+ int *osds, // list of osd addr's
+ int num_rep) { // num replicas we want
+ // get rush list
+ assert(rush);
+ int group[NUM_RUSH_REPLICAS];
+ rush->GetServersByKey( rg, NUM_RUSH_REPLICAS, group );
+ int o = 0;
+ for (int i=0; i<NUM_RUSH_REPLICAS && o<num_rep; i++) {
+ if (failed_osds.count(group[i])) continue;
+ osds[o++] = group[i];
+ }
+ return o;
+ }
+
+ int repgroup_to_acting_osds(repgroup_t rg,
+ int *osds, // list of osd addr's
+ int num_rep) { // num replicas we want
+ // get rush list
+ assert(rush);
+ int group[NUM_RUSH_REPLICAS];
+ rush->GetServersByKey( rg, NUM_RUSH_REPLICAS, group );
+ int o = 0;
+ for (int i=0; i<NUM_RUSH_REPLICAS && o<num_rep; i++) {
+ if (failed_osds.count(group[i])) continue;
+ if (down_osds.count(group[i])) continue;
+ osds[o++] = group[i];
+ }
+ return o;
+ }
+
+
/* map (ino, ono) to an object name
(to be used on any osd in the proper replica group) */
/* map rg to the primary osd */
int get_rg_primary(repgroup_t rg) {
- int group[NUM_RUSH_REPLICAS];
- repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
- for (int i=0; i<NUM_RUSH_REPLICAS; i++) {
- if (failed_osds.count(group[i])) continue;
- return group[i];
- }
- assert(0);
- return -1; // we fail!
-
+ int group[1];
+ int nrep = repgroup_to_nonfailed_osds(rg, group, 1);
+ assert(nrep > 0); // we fail!
+ return group[0];
}
/* map rg to the _acting_ primary osd (primary may be down) */
int get_rg_acting_primary(repgroup_t rg) {
- int group[NUM_RUSH_REPLICAS];
- repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
- for (int i=0; i<NUM_RUSH_REPLICAS; i++) {
- if (down_osds.count(group[i])) continue;
- if (failed_osds.count(group[i])) continue;
- return group[i];
- }
- assert(0);
- return -1; // we fail!
+ int group[1];
+ int nrep = repgroup_to_acting_osds(rg, group, 1);
+ assert(nrep > 0); // we fail!
+ return group[0];
}
/* what replica # is a given osd? 0 primary, -1 for none. */
int get_rg_role(repgroup_t rg, int osd) {
int group[NUM_RUSH_REPLICAS];
- repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
+ int nrep = repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
+ int role = 0;
+ for (int i=0; i<nrep; i++) {
+ if (failed_osds.count(group[i])) continue;
+ if (group[i] == osd) return role;
+ role++;
+ }
+ return -1; // none
+ }
+ int get_rg_acting_role(repgroup_t rg, int osd) {
+ int group[NUM_RUSH_REPLICAS];
+ int nrep = repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
int role = 0;
- for (int i=0; i<NUM_RUSH_REPLICAS; i++) {
+ for (int i=0; i<nrep; i++) {
if (failed_osds.count(group[i])) continue;
+ if (down_osds.count(group[i])) continue;
if (group[i] == osd) return role;
role++;
}
#include "include/types.h"
+#include <sys/stat.h>
+
#include <list>
using namespace std;
size_t len, off_t offset,
char *buffer,
bool fsync=true) = 0;
-
+
+ virtual int setattr(object_t oid, const char *name,
+ void *value, size_t size) {return 0;} //= 0;
+ virtual int getattr(object_t oid, const char *name,
+ void *value, size_t size) {return 0;} //= 0;
+ virtual int listattr(object_t oid, char *attrs, size_t size) {return 0;} //= 0;
+
// collections
+ virtual int list_collections(list<coll_t>& ls) {return 0;}//= 0;
+ virtual bool collection_exists(coll_t c) {
+ struct stat st;
+ return collection_stat(c, &st) == 0;
+ }
+ virtual int collection_stat(coll_t c, struct stat *st) {return 0;}//= 0;
virtual int collection_create(coll_t c) {return 0;}//= 0;
virtual int collection_destroy(coll_t c) {return 0;}//= 0;
virtual int collection_add(coll_t c, object_t o) {return 0;}//= 0;
virtual int collection_remove(coll_t c, object_t o) {return 0;}// = 0;
virtual int collection_list(coll_t c, list<object_t>& o) {return 0;}//= 0;
- // attributes
- virtual int setattr(object_t oid, const char *name,
- void *value, size_t size) {return 0;} //= 0;
- virtual int getattr(object_t oid, const char *name,
- void *value, size_t size) {return 0;} //= 0;
- virtual int listattr(object_t oid, char *attrs, size_t size) {return 0;} //= 0;
-
+ virtual int collection_setattr(object_t oid, const char *name,
+ void *value, size_t size) {return 0;} //= 0;
+ virtual int collection_getattr(object_t oid, const char *name,
+ void *value, size_t size) {return 0;} //= 0;
+ virtual int collection_listattr(object_t oid, char *attrs, size_t size) {return 0;} //= 0;
+
};
#endif
Filer::handle_osd_op_reply(MOSDOpReply *m)
{
// updated cluster info?
- if (m->get_ocv() &&
- m->get_ocv() > osdmap->get_version()) {
- dout(3) << "op reply has newer cluster " << m->get_ocv() << " > " << osdmap->get_version() << endl;
+ if (m->get_map_version() &&
+ m->get_map_version() > osdmap->get_version()) {
+ dout(3) << "op reply has newer map " << m->get_map_version() << " > " << osdmap->get_version() << endl;
osdmap->decode( m->get_osdmap() );
}