+!!!
+- make mds shut down with mds_commit_on_shutdown=0 and/or mds_log_flush_on_shutdown=0.
+- test mds scaling w/ makedirs, vs mds_log_on_request
+
+- finish osd replication MOSDOp groundwork
+
big fast todo's:
- client buffer cache
- replication protocol
#include "messages/MGenericMessage.h"
-#include "messages/MOSDRead.h"
-#include "messages/MOSDReadReply.h"
-#include "messages/MOSDWrite.h"
-#include "messages/MOSDWriteReply.h"
-
#include "osd/Filer.h"
#include "common/Cond.h"
switch (m->get_type()) {
// osd
- case MSG_OSD_READREPLY:
- filer->handle_osd_read_reply((MOSDReadReply*)m);
- break;
- case MSG_OSD_WRITEREPLY:
- filer->handle_osd_write_reply((MOSDWriteReply*)m);
- break;
case MSG_OSD_OPREPLY:
filer->handle_osd_op_reply((MOSDOpReply*)m);
break;
assert(reply);
// we got osdcluster!
- int off = 0;
- osdcluster->_unrope(reply->get_osd_cluster_state(), off);
+ osdcluster->decode(reply->get_osd_cluster_state());
dout(1) << "mounted" << endl;
mounted = true;
filename = "log/";
filename += fn;
interval = g_conf.log_interval;
- start = last_logged = g_clock.gettime(); // time 0!
+ start = g_clock.gettimepair(); // time 0!
+ last_logged = 0;
wrote_header = -1;
open = false;
this->type = type;
void Logger::flush(bool force)
{
- double now = g_clock.gettime();
- while (now >= last_logged + interval || force) {
+ timepair_t now = g_clock.gettimepair();
+ double fromstart = timepair_to_double(now - start);
+
+ while (force ||
+ fromstart - last_logged >= interval) {
last_logged += interval;
force = false;
// write line to log
//out << (long)(last_logged - start);
- out << last_logged - start;
+ out << fromstart;
for (vector<string>::iterator it = type->keys.begin(); it != type->keys.end(); it++) {
out << "\t" << get(*it);
}
#define __LOGGER_H
#include "include/types.h"
-
+#include "Clock.h"
#include <string>
#include <fstream>
using namespace std;
LogType *type;
- double start;
+ timepair_t start;
double last_logged;
double interval;
int wrote_header;
mds_log_max_trimming: 16,
mds_log_read_inc: 65536,
mds_log_before_reply: true,
- mds_log_flush_on_shutdown: false, //true,
+ mds_log_flush_on_shutdown: true,
mds_bal_replicate_threshold: 500,
mds_bal_unreplicate_threshold: 200,
#include "buffer.h"
#include <list>
+#include <set>
using namespace std;
#include <ext/rope>
// just add another buffer
push_back(new buffer(data, len));
}
+ void append(bufferptr& bp) {
+ push_back(bp);
+ }
+ void append(bufferptr& bp, int len, int off) {
+ bufferptr tempbp(bp, len, off);
+ push_back(tempbp);
+ }
/*
}
// funky modifer
- void splice(int off, int len /*, bufferlist *replace */) { // fixme?
+ void splice(int off, int len, bufferlist *claim_by=0 /*, bufferlist& replace_with */) { // fixme?
// skip off
list<bufferptr>::iterator curbuf = _buffers.begin();
while (off > 0) {
if (off) {
// add a reference to the front bit
// insert it before curbuf (which we'll hose)
- //cout << "keeping front " << off << " of " << *curbuf << endl;
+ cout << "keeping front " << off << " of " << *curbuf << endl;
_buffers.insert( curbuf, bufferptr( *curbuf, off, 0 ) );
}
while (len > 0) {
// partial?
if (off + len < (*curbuf).length()) {
- //cout << "keeping end of " << *curbuf << endl;
+ cout << "keeping end of " << *curbuf << ", losing first " << off+len << endl;
+ if (claim_by)
+ claim_by->append( *curbuf, len, off );
(*curbuf).set_offset( off + len ); // ignore beginning big
- (*curbuf).set_length( len );
- //cout << " now " << *curbuf << endl;
+ (*curbuf).set_length( (*curbuf).length() - len - off );
+ cout << " now " << *curbuf << endl;
break;
}
// hose the whole thing
- //cout << "discarding all of " << *curbuf << endl;
int howmuch = (*curbuf).length() - off;
+ cout << "discarding " << howmuch << " of " << *curbuf << endl;
+ if (claim_by)
+ claim_by->append( *curbuf, howmuch, off );
_buffers.erase( curbuf++ );
len -= howmuch;
off = 0;
+// encoder/decode helpers
+
+// set<int>
+inline void _encode(set<int>& s, bufferlist& bl)
+{
+ int n = s.size();
+ bl.append((char*)&n, sizeof(n));
+ for (set<int>::iterator it = s.begin();
+ it != s.end();
+ it++) {
+ int v = *it;
+ bl.append((char*)&v, sizeof(v));
+ n--;
+ }
+ assert(n==0);
+}
+inline void _decode(set<int>& s, bufferlist& bl, int& off)
+{
+ s.clear();
+ int n;
+ bl.copy(off, sizeof(n), (char*)&n);
+ off += sizeof(n);
+ for (int i=0; i<n; i++) {
+ int v;
+ bl.copy(off, sizeof(v), (char*)&v);
+ off += sizeof(v);
+ s.insert(v);
+ }
+ assert(s.size() == n);
+}
+
+
+
+
#endif
#define MDS_OP_SYMLINK 222
#define MDS_OP_OPEN 301
-#define OSD_OP_READ 304
-#define OSD_OP_WRITE 305
#define MDS_OP_TRUNCATE 306
#define MDS_OP_FSYNC 307
#define MDS_OP_CLOSE 310
return true;
}
+class C_MDC_ShutdownCommit : public Context {
+ MDCache *mdc;
+public:
+ C_MDC_ShutdownCommit(MDCache *mdc) {
+ this->mdc = mdc;
+ }
+ void finish(int r) {
+ mdc->shutdown_commits--;
+ }
+};
void MDCache::shutdown_start()
{
dout(1) << "shutdown_start" << endl;
+ shutdown_commits = 0;
if (g_conf.mds_commit_on_shutdown) {
dout(1) << "shutdown_start committing all dirty dirs" << endl;
CInode *in = it->second;
// commit any dirty dir that's ours
- if (in->is_dir() && in->dir && in->dir->is_auth() && in->dir->is_dirty())
- mds->mdstore->commit_dir(in->dir, NULL);
-
+ if (in->is_dir() && in->dir && in->dir->is_auth() && in->dir->is_dirty()) {
+ mds->mdstore->commit_dir(in->dir, new C_MDC_ShutdownCommit(this));
+ shutdown_commits++;
+ }
}
}
return true;
}
-
+ // commits?
+ if (g_conf.mds_commit_on_shutdown &&
+ shutdown_commits > 0) {
+ dout(7) << "shutdown_commits = " << shutdown_commits << endl;
+ return false;
+ }
+
// flush log?
if (g_conf.mds_log_flush_on_shutdown) {
// (wait for) flush log
public:
// active MDS requests
map<Message*, active_request_t> active_requests;
+
+ int shutdown_commits;
friend class MDBalancer;
{
switch (m->get_type()) {
// OSD ===============
- case MSG_OSD_READREPLY:
- filer->handle_osd_read_reply((MOSDReadReply*)m);
- return;
- case MSG_OSD_WRITEREPLY:
- filer->handle_osd_write_reply((MOSDWriteReply*)m);
- return;
case MSG_OSD_OPREPLY:
- filer->handle_osd_op_reply((MOSDOpReply*)m);
+ filer->handle_osd_op_reply((class MOSDOpReply*)m);
return;
// MDS
class MClientMountAck : public Message {
long pcid;
- crope osd_cluster_state;
+ bufferlist osd_cluster_state;
public:
MClientMountAck() {}
MClientMountAck(MClientMount *mnt, OSDCluster *osdcluster) : Message(MSG_CLIENT_MOUNTACK) {
this->pcid = mnt->get_pcid();
- osdcluster->_rope( osd_cluster_state );
+ osdcluster->encode( osd_cluster_state );
}
- crope& get_osd_cluster_state() { return osd_cluster_state; }
+ bufferlist& get_osd_cluster_state() { return osd_cluster_state; }
void set_pcid(long pcid) { this->pcid = pcid; }
long get_pcid() { return pcid; }
char *get_type_name() { return "CmntA"; }
- virtual void decode_payload(crope& s, int& off) {
- s.copy(off, sizeof(pcid), (char*)&pcid);
+ virtual void decode_payload() {
+ int off;
+ payload.copy(off, sizeof(pcid), (char*)&pcid);
off += sizeof(pcid);
- osd_cluster_state = s.substr(off, s.length()-off);
- off += osd_cluster_state.length();
+ if (off < payload.length())
+ payload.splice( off, payload.length()-off, &osd_cluster_state);
}
- virtual void encode_payload(crope& s) {
- s.append((char*)&pcid, sizeof(pcid));
- s.append(osd_cluster_state);
+ virtual void encode_payload() {
+ payload.append((char*)&pcid, sizeof(pcid));
+ payload.claim_append(osd_cluster_state);
}
};
#define OSD_OP_DELETE 2
#define OSD_OP_ZERORANGE 3
#define OSD_OP_MKFS 10
+#define OSD_OP_READ 20
+#define OSD_OP_WRITE 21
typedef struct {
long tid;
long pcid;
+ msg_addr_t asker;
+
object_t oid;
- int op;
+ repgroup_t rg;
+ __uint64_t ocv;
+ int op;
size_t length, offset;
+
+ size_t _data_len;
} MOSDOp_st;
class MOSDOp : public Message {
MOSDOp_st st;
+ bufferlist data;
friend class MOSDOpReply;
public:
long get_tid() { return st.tid; }
+ msg_addr_t get_asker() { return st.asker; }
+
object_t get_oid() { return st.oid; }
+ repgroup_t get_rg() { return st.rg; }
+ __uint64_t get_ocv() { return st.ocv; }
+
int get_op() { return st.op; }
+ size_t get_length() { return st.length; }
+ size_t get_offset() { return st.offset; }
+
+ void set_data(bufferlist &d) {
+ data.claim(d);
+ st._data_len = data.length();
+ }
+ bufferlist& get_data() {
+ return data;
+ }
+ size_t get_data_len() { return st._data_len; }
// keep a pcid (procedure call id) to match up request+reply
void set_pcid(long pcid) { this->st.pcid = pcid; }
long get_pcid() { return st.pcid; }
- MOSDOp(long tid, object_t oid, int op) :
+ MOSDOp(long tid, msg_addr_t asker,
+ object_t oid, repgroup_t rg, __uint64_t ocv, int op) :
Message(MSG_OSD_OP) {
+ memset(&st, 0, sizeof(st));
this->st.tid = tid;
+ this->st.asker = asker;
+
this->st.oid = oid;
+ this->st.rg = rg;
+ this->st.ocv = ocv;
this->st.op = op;
}
MOSDOp() {}
- virtual void decode_payload(crope& s, int& off) {
- s.copy(off, sizeof(st), (char*)&st);
- off += sizeof(st);
+ void set_length(size_t l) { st.length = l; }
+ void set_offset(size_t o) { st.offset = o; }
+
+
+ // marshalling
+ virtual void decode_payload() {
+ payload.copy(0, sizeof(st), (char*)&st);
+ payload.splice(0, sizeof(st));
+ data.claim(payload);
}
- virtual void encode_payload(crope& s) {
- s.append((char*)&st, sizeof(st));
+ virtual void encode_payload() {
+ payload.push_back( new buffer((char*)&st, sizeof(st)) );
+ payload.claim_append( data );
}
virtual char *get_type_name() { return "oop"; }
#define __MOSDOPREPLY_H
#include "msg/Message.h"
+#include "osd/OSDCluster.h"
#include "MOSDOp.h"
// req
long tid;
long pcid;
+
object_t oid;
- int op;
+ int op;
+
// reply
int result;
- size_t size;
+ size_t length, offset;
+ size_t object_size;
+
+ __uint64_t _new_ocv;
+ size_t _data_len, _oc_len;
} MOSDOpReply_st;
+
class MOSDOpReply : public Message {
MOSDOpReply_st st;
+ bufferlist data;
+ bufferlist osdcluster;
public:
long get_tid() { return st.tid; }
int get_op() { return st.op; }
int get_result() { return st.result; }
- size_t get_size() { return st.size; }
+ size_t get_length() { return st.length; }
+ size_t get_offset() { return st.offset; }
+ size_t get_object_size() { return st.object_size; }
+
+ void set_result(int r) { st.result = r; }
+ void set_length(size_t s) { st.length = s; }
+ void set_offset(size_t o) { st.offset = o; }
+ void set_object_size(size_t s) { st.object_size = s; }
+
+ // data payload
+ void set_data(bufferlist &d) {
+ data.claim(d);
+ st._data_len = data.length();
+ }
+ bufferlist& get_data() {
+ return data;
+ }
- void set_size(size_t s) { st.size = s; }
+ // osdcluster
+ __uint64_t get_ocv() { return st._new_ocv; }
+ bufferlist& get_osdcluster() {
+ return osdcluster;
+ }
// keep a pcid (procedure call id) to match up request+reply
void set_pcid(long pcid) { this->st.pcid = pcid; }
long get_pcid() { return st.pcid; }
- MOSDOpReply(MOSDOp *req, int result) :
+ MOSDOpReply(MOSDOp *req, int result, OSDCluster *oc) :
Message(MSG_OSD_OPREPLY) {
+ memset(&st, 0, sizeof(st));
this->st.pcid = req->st.pcid;
this->st.tid = req->st.tid;
+
this->st.oid = req->st.oid;
this->st.op = req->st.op;
-
this->st.result = result;
+
+ this->st.length = req->st.length; // speculative... OSD should ensure these are correct
+ this->st.offset = req->st.offset;
+
+ // attach updated cluster spec?
+ if (req->get_ocv() < oc->get_version()) {
+ oc->encode(osdcluster);
+ st._new_ocv = oc->get_version();
+ st._oc_len = osdcluster.length();
+ }
}
MOSDOpReply() {}
- virtual void decode_payload(crope& s, int& off) {
- s.copy(off, sizeof(st), (char*)&st);
- off += sizeof(st);
+
+ // marshalling
+ virtual void decode_payload() {
+ payload.copy(0, sizeof(st), (char*)&st);
+ payload.splice(0, sizeof(st));
+ if (st._data_len) payload.splice(0, st._data_len, &data);
+ if (st._oc_len) payload.splice(0, st._oc_len, &osdcluster);
}
- virtual void encode_payload(crope& s) {
- s.append((char*)&st, sizeof(st));
+ virtual void encode_payload() {
+ payload.push_back( new buffer((char*)&st, sizeof(st)) );
+ payload.claim_append( data );
+ payload.claim_append( osdcluster );
}
virtual char *get_type_name() { return "oopr"; }
+++ /dev/null
-#ifndef __MOSDREAD_H
-#define __MOSDREAD_H
-
-#include "msg/Message.h"
-
-/*
- * OSD read request
- *
- * oid - object id
- * offset, len -- guess
- *
- * caveat: if len=0, then the _entire_ object is read. this is currently
- * used by the MDS, and pretty much a dumb idea in general.
- */
-
-typedef struct {
- long tid;
- long pcid;
- size_t len;
- off_t offset;
- object_t oid;
-} MOSDRead_st;
-
-class MOSDRead : public Message {
- MOSDRead_st st;
-
- friend class MOSDReadReply;
-
- public:
- long get_tid() { return st.tid; }
- size_t get_len() { return st.len; }
- off_t get_offset() { return st.offset; }
- object_t get_oid() { return st.oid; }
-
- // keep a pcid (procedure call id) to match up request+reply
- void set_pcid(long pcid) { this->st.pcid = pcid; }
- long get_pcid() { return st.pcid; }
-
- MOSDRead(long tid, object_t oid, size_t len, off_t offset) :
- Message(MSG_OSD_READ) {
- this->st.tid = tid;
- this->st.oid = oid;
- this->st.len = len;
- this->st.offset = offset;
- this->st.pcid = 0;
- }
- MOSDRead() {}
-
- virtual void decode_payload(crope& s, int& off) {
- s.copy(off, sizeof(st), (char*)&st);
- off += sizeof(st);
- }
- virtual void encode_payload(crope& s) {
- s.append((char*)&st, sizeof(st));
- }
-
- virtual char *get_type_name() { return "oread"; }
-};
-
-#endif
+++ /dev/null
-#ifndef __MOSDREADREPLY_H
-#define __MOSDREADREPLY_H
-
-#include "MOSDRead.h"
-
-/*
- * OSD Read Reply
- *
- * oid - object id
- * offset, len - data returned
- *
- * len may not match the read request, if the end of object is hit.
- */
-
-typedef struct {
- long tid;
- long pcid;
- off_t offset;
- object_t oid;
- size_t len;
- long result;
-} MOSDReadReply_st;
-
-class MOSDReadReply : public Message {
- MOSDReadReply_st st;
- bufferlist data;
-
- public:
- size_t get_len() { return st.len; }
- int get_result() { return st.result; }
- object_t get_oid() { return st.oid; }
- off_t get_offset() { return st.offset; }
- long get_tid() { return st.tid; }
-
-
- // keep a pcid (procedure call id) to match up request+reply
- void set_pcid(long pcid) { this->st.pcid = pcid; }
- long get_pcid() { return st.pcid; }
-
- MOSDReadReply() {
- }
- MOSDReadReply(MOSDRead *r, long result) :
- Message(MSG_OSD_READREPLY) {
- this->st.tid = r->st.tid;
- this->st.pcid = r->st.pcid;
- this->st.oid = r->st.oid;
- this->st.offset = r->st.offset;
- this->st.result = result;
- this->st.len = 0;
- }
-
- bufferlist& get_data() {
- return data;
- }
- void set_data(bufferlist &bl) {
- data.claim(bl);
- this->st.len = data.length();
- }
- void set_result(int result) {
- this->st.result = result;
- }
-
- virtual void decode_payload() {
- // warning: only call this once, we modify the payload!
- payload.copy(0, sizeof(st), (char*)&st);
- payload.splice(0, sizeof(st));
- data.claim(payload);
- }
- virtual void encode_payload() {
- payload.push_back( new buffer((char*)&st, sizeof(st)) );
- payload.claim_append(data);
- }
-
- virtual char *get_type_name() { return "oreadr"; }
-};
-
-#endif
+++ /dev/null
-#ifndef __MOSDWRITE_H
-#define __MOSDWRITE_H
-
-#include "msg/Message.h"
-
-/*
- * OSD Write
- *
- * tid - caller's transaction id
- *
- * oid - object id
- * offset, len -
- *
- * flags - passed to open(). not used at all.. this should be removed?
- *
- */
-
-
-typedef struct {
- long tid;
- long pcid;
- off_t offset;
- object_t oid;
- //int flags;
- size_t len;
-} MOSDWrite_st;
-
-class MOSDWrite : public Message {
- MOSDWrite_st st;
- bufferlist data;
-
- friend class MOSDWriteReply;
-
- public:
- long get_tid() { return st.tid; }
- off_t get_offset() { return st.offset; }
- object_t get_oid() { return st.oid; }
- //int get_flags() { return st.flags; }
- long get_len() { return st.len; }
-
- // keep a pcid (procedure call id) to match up request+reply
- void set_pcid(long pcid) { this->st.pcid = pcid; }
- long get_pcid() { return st.pcid; }
-
- MOSDWrite() {}
- MOSDWrite(long tid, object_t oid, size_t len, off_t offset) :
- Message(MSG_OSD_WRITE) {
- this->st.tid = tid;
- this->st.oid = oid;
- this->st.offset = offset;
- //this->st.flags = flags;
- this->st.len = len;
- this->st.pcid = 0;
- }
-
- void set_data(bufferlist &d) {
- data.claim(d);
- assert(data.length() == st.len);
- }
- bufferlist& get_data() {
- return data;
- }
-
-
- virtual void decode_payload() {
- payload.copy(0, sizeof(st), (char*)&st);
- payload.splice(0, sizeof(st));
- data.claim(payload);
- }
-
- virtual void encode_payload() {
- payload.push_back( new buffer((char*)&st, sizeof(st)) );
- payload.claim_append( data );
- }
-
- virtual char *get_type_name() { return "owr"; }
-};
-
-#endif
+++ /dev/null
-#ifndef __MOSDWRITEREPLY_H
-#define __MOSDWRITEREPLY_H
-
-#include "MOSDWrite.h"
-
-/*
- * OSD WRite Reply
- *
- * tid - caller's transaction #
- * oid - object id
- * offset, len - ...
- * result - result code, matchines write() system call: # of bytes written, or error code.
- */
-
-typedef struct {
- long tid;
- long pcid;
- long result;
- off_t offset;
- object_t oid;
-} MOSDWriteReply_st;
-
-class MOSDWriteReply : public Message {
- MOSDWriteReply_st st;
-
- public:
- long get_tid() { return st.tid; }
- long get_result() { return st.result; }
- off_t get_offset() { return st.offset; }
- object_t get_oid() { return st.oid; }
-
- // keep a pcid (procedure call id) to match up request+reply
- void set_pcid(long pcid) { this->st.pcid = pcid; }
- long get_pcid() { return st.pcid; }
-
- MOSDWriteReply() {}
- MOSDWriteReply(MOSDWrite *r, long wrote) :
- Message(MSG_OSD_WRITEREPLY) {
- this->st.pcid = r->st.pcid;
- this->st.tid = r->st.tid;
- this->st.oid = r->st.oid;
- this->st.offset = r->st.offset;
- this->st.result = wrote;
- }
-
- virtual void decode_payload(crope& s, int& off) {
- s.copy(off, sizeof(st), (char*)&st);
- off += sizeof(st);
- }
- virtual void encode_payload(crope& s) {
- s.append((char*)&st, sizeof(st));
- }
-
- virtual char *get_type_name() { return "owrr"; }
-};
-
-#endif
#include "messages/MFailureAck.h"
#include "messages/MOSDPing.h"
-#include "messages/MOSDRead.h"
-#include "messages/MOSDReadReply.h"
-#include "messages/MOSDWrite.h"
-#include "messages/MOSDWriteReply.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
case MSG_OSD_PING:
m = new MOSDPing();
break;
- case MSG_OSD_READ:
- m = new MOSDRead();
- break;
-
- case MSG_OSD_READREPLY:
- m = new MOSDReadReply();
- break;
-
- case MSG_OSD_WRITE:
- m = new MOSDWrite();
- break;
-
- case MSG_OSD_WRITEREPLY:
- m = new MOSDWriteReply();
- break;
-
case MSG_OSD_OP:
m = new MOSDOp();
break;
#include "OSD.h"
#include "FakeStore.h"
+#include "OSDCluster.h"
#include "mds/MDS.h"
#include "messages/MPing.h"
#include "messages/MPingAck.h"
-#include "messages/MOSDRead.h"
-#include "messages/MOSDReadReply.h"
-#include "messages/MOSDWrite.h"
-#include "messages/MOSDWriteReply.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
// osd
-
case MSG_SHUTDOWN:
shutdown();
break;
-
+
case MSG_PING:
// take note.
monitor->host_is_alive(m->get_source());
-
handle_ping((MPing*)m);
break;
-
- case MSG_OSD_READ:
- handle_read((MOSDRead*)m);
- break;
-
- case MSG_OSD_WRITE:
- handle_write((MOSDWrite*)m);
- break;
case MSG_OSD_OP:
+ monitor->host_is_alive(m->get_source());
handle_op((MOSDOp*)m);
break;
void OSD::handle_op(MOSDOp *op)
{
+ // check cluster version
+ if (op->get_ocv() > osdcluster->get_version()) {
+ // op's is newer
+ dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl;
+
+ // query MDS
+ dout(7) << "querying MDS" << endl;
+ //messenger->send_message(new MGetOSDCluster(), MSG_ADDR_MDS(0), MDS_PORT_MAIN);
+ assert(0);
+ waiting_for_osdcluster.push_back(op);
+ return;
+ }
+
+ if (op->get_ocv() < osdcluster->get_version()) {
+ // op's is old
+ dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl;
+
+ // verify that we are primary, or acting primary
+ int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() );
+ if (acting_primary != whoami) {
+ dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
+ messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
+ return;
+ }
+ }
+
+
+ // do the op
switch (op->get_op()) {
+
+ case OSD_OP_READ:
+ op_read(op);
+ break;
+
+ case OSD_OP_WRITE:
+ op_write(op);
+ break;
+
case OSD_OP_MKFS:
dout(3) << "MKFS" << endl;
{
int r = store->mkfs();
- messenger->send_message(new MOSDOpReply(op, r),
- op->get_source(), op->get_source_port());
+ messenger->send_message(new MOSDOpReply(op, r, osdcluster),
+ op->get_asker());
}
+ delete op;
break;
case OSD_OP_DELETE:
dout(3) << "delete on " << op->get_oid() << " r = " << r << endl;
// "ack"
- messenger->send_message(new MOSDOpReply(op, r),
- op->get_source(), op->get_source_port());
+ messenger->send_message(new MOSDOpReply(op, r, osdcluster),
+ op->get_asker());
}
+ delete op;
break;
case OSD_OP_STAT:
dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl;
- MOSDOpReply *reply = new MOSDOpReply(op, r);
- reply->set_size(st.st_size);
- messenger->send_message(reply,
- op->get_source(), op->get_source_port());
+ MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster);
+ reply->set_object_size(st.st_size);
+ messenger->send_message(reply, op->get_asker());
}
+ delete op;
break;
default:
assert(0);
}
-
- delete op;
}
-void OSD::handle_read(MOSDRead *r)
+void OSD::op_read(MOSDOp *r)
{
// read into a buffer
- bufferptr bptr = new buffer(r->get_len()); // prealloc space for entire read
+ bufferptr bptr = new buffer(r->get_length()); // prealloc space for entire read
long got = store->read(r->get_oid(),
- r->get_len(), r->get_offset(),
+ r->get_length(), r->get_offset(),
bptr.c_str());
- MOSDReadReply *reply = new MOSDReadReply(r, 0);
+
+ // set up reply
+ MOSDOpReply *reply = new MOSDOpReply(r, 0, osdcluster);
if (got >= 0) {
- bptr.set_length(got); // properly size buffer
-
+ bptr.set_length(got); // properly size the buffer
+
// give it to the reply in a bufferlist
bufferlist bl;
bl.push_back( bptr );
+
+ reply->set_result(0);
reply->set_data(bl);
+ reply->set_length(got);
} else {
reply->set_result(got); // error
+ reply->set_length(0);
}
- dout(10) << "read got " << got << " / " << r->get_len() << " bytes from " << r->get_oid() << endl;
+ dout(10) << "read got " << got << " / " << r->get_length() << " bytes from " << r->get_oid() << endl;
// send it
- messenger->send_message(reply, r->get_source(), r->get_source_port());
+ messenger->send_message(reply, r->get_asker());
delete r;
}
// -- osd_write
-void OSD::handle_write(MOSDWrite *m)
+void OSD::op_write(MOSDOp *m)
{
// take buffers from the message
bufferlist bl;
// assume success. FIXME.
// reply
- MOSDWriteReply *reply = new MOSDWriteReply(m, 0);
- messenger->send_message(reply, m->get_source(), m->get_source_port());
+ MOSDOpReply *reply = new MOSDOpReply(m, 0, osdcluster);
+ messenger->send_message(reply, m->get_asker());
delete m;
}
#include "common/Mutex.h"
+#include <map>
+using namespace std;
+
+
class Messenger;
-class MOSDRead;
-class MOSDWrite;
class Message;
-class ObjectStore;
-class HostMonitor;
+
+
+// ways to be dirty
+#define RG_DIRTY_LOCAL_LOG 1
+#define RG_DIRTY_LOCAL_SYNC 2
+#define RG_DIRTY_REPLICA_MEM 4
+#define RG_DIRTY_REPLICA_SYNC 8
+
+
+class ReplicaGroup {
+ public:
+ repgroup_t rg;
+ int role; // 1 = primary, 2 = secondary, etc. 0=undef.
+ int state;
+
+ map<object_t, int> dirty_map; // dirty objects
+
+ ReplicaGroup(repgroup_t rg);
+
+ void enumerate_objects(list<object_t>& ls);
+};
+
class OSD : public Dispatcher {
protected:
Messenger *messenger;
int whoami;
- ObjectStore *store;
- HostMonitor *monitor;
+ class OSDCluster *osdcluster;
+ class ObjectStore *store;
+ class HostMonitor *monitor;
+
+ list<class MOSDOp*> waiting_for_osdcluster;
Mutex osd_lock;
OSD(int id, Messenger *m);
~OSD();
+ // startup/shutdown
int init();
int shutdown();
+ // OSDCluster
+ void update_osd_cluster(__uint64_t ocv, bufferlist& blist);
+
+ // messages
virtual void dispatch(Message *m);
void handle_ping(class MPing *m);
void handle_op(class MOSDOp *m);
- void handle_read(MOSDRead *m);
- void handle_write(MOSDWrite *m);
+ void op_read(class MOSDOp *m);
+ void op_write(class MOSDOp *m);
};
#endif
// serialize/unserialize
-void OSDCluster::_rope(crope& r)
+void OSDCluster::encode(bufferlist& blist)
{
- r.append((char*)&version, sizeof(version));
+ blist.append((char*)&version, sizeof(version));
int ngroups = osd_groups.size();
- r.append((char*)&ngroups, sizeof(ngroups));
+ blist.append((char*)&ngroups, sizeof(ngroups));
for (int i=0; i<ngroups; i++) {
- r.append((char*)&osd_groups[i], sizeof(OSDGroup));
+ blist.append((char*)&osd_groups[i], sizeof(OSDGroup));
}
- // failed
+ _encode(down_osds, blist);
+ _encode(failed_osds, blist);
}
-void OSDCluster::_unrope(crope& r, int& off)
+void OSDCluster::decode(bufferlist& blist)
{
- r.copy(off, sizeof(version), (char*)&version);
+ int off = 0;
+ blist.copy(off, sizeof(version), (char*)&version);
off += sizeof(version);
int ngroups;
- r.copy(off, sizeof(ngroups), (char*)&ngroups);
+ blist.copy(off, sizeof(ngroups), (char*)&ngroups);
off += sizeof(ngroups);
osd_groups = vector<OSDGroup>(ngroups);
for (int i=0; i<ngroups; i++) {
- r.copy(off, sizeof(OSDGroup), (char*)&osd_groups[i]);
+ blist.copy(off, sizeof(OSDGroup), (char*)&osd_groups[i]);
off += sizeof(OSDGroup);
}
- // failed
-
+ _decode(down_osds, blist, off);
+ _decode(failed_osds, blist, off);
init_rush();
}
* for mapping (ino, offset, len) to a (list of) byte extents in objects on osds
*/
struct OSDExtent {
- int osds[MAX_REPLICAS];
+ int osd;
object_t oid;
+ repgroup_t rg;
size_t offset, len;
};
// RUSH disk groups
vector<OSDGroup> osd_groups; // RUSH disk groups
+
+ set<int> down_osds; // list of down disks
set<int> failed_osds; // list of failed disks
Rush *rush; // rush implementation
public:
OSDCluster() : version(0), rush(0) { }
+ __uint64_t get_version() { return version; }
+
// cluster state
bool is_failed(int osd) { return failed_osds.count(osd) ? true:false; }
}
// serialize, unserialize
- void _rope(crope& r);
- void _unrope(crope& r, int& off);
+ //void _rope(crope& r);
+ //void _unrope(crope& r, int& off);
+ void encode(bufferlist& blist);
+ void decode(bufferlist& blist);
+
+
+ /**** ****/
+ int get_rg_primary(repgroup_t rg) {
+ int group[NUM_RUSH_REPLICAS];
+ repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
+ for (int i=0; i<NUM_RUSH_REPLICAS; i++) {
+ if (failed_osds.count(group[i])) continue;
+ return i;
+ }
+ assert(0);
+ return -1; // we fail!
+ }
+ int get_rg_acting_primary(repgroup_t rg) {
+ int group[NUM_RUSH_REPLICAS];
+ repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
+ for (int i=0; i<NUM_RUSH_REPLICAS; i++) {
+ if (down_osds.count(group[i])) continue;
+ if (failed_osds.count(group[i])) continue;
+ return i;
+ }
+ assert(0);
+ return -1; // we fail!
+ }
/**** mapping facilities ****/
void file_to_extents(inodeno_t ino,
size_t len,
size_t offset,
- int num_reps,
list<OSDExtent>& extents) {
size_t cur = offset;
size_t left = len;
// find oid, osds
size_t blockno = cur / FILE_OBJECT_SIZE;
ex.oid = file_to_object( ino, blockno );
- repgroup_t rg = file_to_repgroup(ino, blockno );
- repgroup_to_osds( rg, ex.osds, num_reps );
+ ex.rg = file_to_repgroup(ino, blockno );
+ ex.osd = get_rg_acting_primary( ex.rg );
// map range into object
ex.offset = cur % FILE_OBJECT_SIZE;
#include "Filer.h"
#include "OSDCluster.h"
-#include "messages/MOSDRead.h"
-#include "messages/MOSDReadReply.h"
-#include "messages/MOSDWrite.h"
-#include "messages/MOSDWriteReply.h"
+//#include "messages/MOSDRead.h"
+//#include "messages/MOSDReadReply.h"
+//#include "messages/MOSDWrite.h"
+//#include "messages/MOSDWriteReply.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
void Filer::dispatch(Message *m)
{
switch (m->get_type()) {
- case MSG_OSD_READREPLY:
- handle_osd_read_reply((MOSDReadReply*)m);
- break;
-
- case MSG_OSD_WRITEREPLY:
- handle_osd_write_reply((MOSDWriteReply*)m);
- break;
-
case MSG_OSD_OPREPLY:
handle_osd_op_reply((MOSDOpReply*)m);
break;
p->bytes_read = 0;
p->onfinish = onfinish;
- int num_rep = 1; // FIXME
-
// find data
list<OSDExtent> extents;
- osdcluster->file_to_extents(ino, len, offset, num_rep, extents);
+ osdcluster->file_to_extents(ino, len, offset, extents);
- dout(7) << "osd read ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents on " << num_rep << " replicas" << endl;
+ dout(7) << "osd read ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents" << endl;
int nfrag = 0;
last_tid++;
// issue read
- MOSDRead *m = new MOSDRead(last_tid, it->oid, it->len, it->offset);
+ MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
+ it->oid, it->rg, osdcluster->get_version(),
+ OSD_OP_READ);
+ m->set_length(it->len);
+ m->set_offset(it->offset);
dout(15) << " read on " << last_tid << endl;
- messenger->send_message(m, MSG_ADDR_OSD(it->osds[r]), 0);
+ messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0);
// note offset into read buffer
- p->read_off[it->oid] = off;
+ p->read_off[last_tid] = off;
off += it->len;
// add to gather set
void
-Filer::handle_osd_read_reply(MOSDReadReply *m)
+Filer::handle_osd_read_reply(MOSDOpReply *m)
{
// get pio
tid_t tid = m->get_tid();
op_reads.erase( tid );
// copy result into buffer
- size_t off = p->read_off[m->get_oid()];
- dout(7) << "got frag at " << off << " len " << m->get_len() << endl;
+ size_t off = p->read_off[tid];
+ dout(7) << "got frag at " << off << " len " << m->get_length() << endl;
// our op finished
p->outstanding_ops.erase(tid);
Context *onfinish)
{
last_tid++;
- int num_rep = 1;
// pending write record
PendingOSDOp_t *p = new PendingOSDOp_t;
// find data
list<OSDExtent> extents;
- osdcluster->file_to_extents(ino, len, offset, num_rep, extents);
+ osdcluster->file_to_extents(ino, len, offset, extents);
- dout(7) << "osd write ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents on " << num_rep << " replicas" << endl;
+ dout(7) << "osd write ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents" << endl;
size_t off = 0; // ptr into buffer
last_tid++;
// issue write
- MOSDWrite *m = new MOSDWrite(last_tid, it->oid, it->len, it->offset);
-
+ MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
+ it->oid, it->rg, osdcluster->get_version(),
+ OSD_OP_WRITE);
+ m->set_length(it->len);
+ m->set_offset(it->offset);
+
bufferlist cur;
cur.substr_of(bl, off, it->len);
m->set_data(cur);
// send
dout(15) << " write on " << last_tid << endl;
- messenger->send_message(m, MSG_ADDR_OSD(it->osds[r]), 0);
+ messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0);
}
return 0;
void
-Filer::handle_osd_write_reply(MOSDWriteReply *m)
+Filer::handle_osd_write_reply(MOSDOpReply *m)
{
// get pio
tid_t tid = m->get_tid();
void
Filer::handle_osd_op_reply(MOSDOpReply *m)
{
+ // updated cluster info?
+ if (m->get_ocv() &&
+ m->get_ocv() > osdcluster->get_version()) {
+ dout(3) << "op reply has newer cluster " << m->get_ocv() << " > " << osdcluster->get_version() << endl;
+ osdcluster->decode( m->get_osdcluster() );
+ }
+
+
+ // read or write?
+ switch (m->get_op()) {
+ case OSD_OP_READ:
+ handle_osd_read_reply(m);
+ return;
+ case OSD_OP_WRITE:
+ handle_osd_write_reply(m);
+ return;
+ }
+
+
// get pio
tid_t tid = m->get_tid();
dout(15) << "handle_osd_op_reply on " << tid << endl;
int Filer::remove(inodeno_t ino, size_t size, Context *onfinish)
{
- int num_rep = 1;
-
// pending write record
PendingOSDOp_t *p = new PendingOSDOp_t;
p->onfinish = onfinish;
// find data
list<OSDExtent> extents;
- osdcluster->file_to_extents(ino, size, 0, num_rep, extents);
+ osdcluster->file_to_extents(ino, size, 0, extents);
- dout(7) << "osd remove ino " << ino << " size " << size << " in " << extents.size() << " extents on " << num_rep << " replicas" << endl;
+ dout(7) << "osd remove ino " << ino << " size " << size << " in " << extents.size() << " extents" << endl;
size_t off = 0; // ptr into buffer
int r = 0; // pick a replica
last_tid++;
- for (int r=0;r<num_rep; r++) {
- // issue delete
- MOSDOp *m = new MOSDOp(last_tid, it->oid, OSD_OP_DELETE);
- messenger->send_message(m, MSG_ADDR_OSD(it->osds[r]), 0);
+ // issue delete
+ MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
+ it->oid, it->rg, osdcluster->get_version(),
+ OSD_OP_DELETE);
+ messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0);
- // add to gather set
- p->outstanding_ops.insert(last_tid);
- op_removes[last_tid] = p;
- }
+ // add to gather set
+ p->outstanding_ops.insert(last_tid);
+ op_removes[last_tid] = p;
}
}
int Filer::mkfs(Context *onfinish)
{
- int num_rep = 1;
-
dout(7) << "mkfs, wiping all OSDs" << endl;
// pending write record
++last_tid;
// issue mkfs
- MOSDOp *m = new MOSDOp(last_tid, 0, OSD_OP_MKFS);
+ MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
+ 0, 0, osdcluster->get_version(),
+ OSD_OP_MKFS);
messenger->send_message(m, MSG_ADDR_OSD(*it), 0);
// add to gather set
// issue zero
MOSDOp *m;
- if (it->len == new MOSDOp(last_tid, it->oid, OSD_OP_DELETE);
+ //if (it->len ==
+ m = new MOSDOp(last_tid, messenger->get_myaddr(),
+ it->oid, it->rg, osdcluster->get_version(),
+ OSD_OP_DELETE);
it->len, it->offset);
- messenger->send_message(m, MSG_ADDR_OSD(it->osds[r]), 0);
+ messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0);
// add to gather set
p->outstanding_ops.insert(last_tid);
int mkfs(Context *c);
- void handle_osd_read_reply(class MOSDReadReply *m);
- void handle_osd_write_reply(class MOSDWriteReply *m);
+ void handle_osd_read_reply(class MOSDOpReply *m);
+ void handle_osd_write_reply(class MOSDOpReply *m);
void handle_osd_op_reply(class MOSDOpReply *m);
};
int main()
{
- bufferptr p1 = new buffer("hello",6);
+ bufferptr p1 = new buffer("123456",6);
bufferptr p2 = p1;
cout << "it is '" << p1.c_str() << "'" << endl;
- bufferptr p3 = new buffer("there",6);
+ bufferptr p3 = new buffer("abcdef",6);
cout << "p3 is " << p3 << endl;
cout << "len is " << bl.length() << endl;
- bl.splice(3,6);
+ bufferlist took;
+ bl.splice(10,4,&took);
- cout << "bl is now " << bl << endl;
- cout << "len is " << bl.length() << endl;
+ cout << "took out " << took << "leftover is " << bl << endl;
+ //cout << "len is " << bl.length() << endl;
bufferlist bl2;
bl2.substr_of(bl, 3, 5);