From: sage Date: Mon, 13 Jun 2005 01:48:56 +0000 (+0000) Subject: buffer tweaks; X-Git-Tag: v0.1~2085 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4355545e5a49ef3f22c31205c60512a35d12efa6;p=ceph.git buffer tweaks; logger tweaks; osd read/writes now go through MOSDOp(Reply), OSD checks cluster version, other replication groundwork git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@296 29311d96-e01e-0410-9327-a35deaab8ce9 --- diff --git a/ceph/TODO b/ceph/TODO index 472d28f500bf..eca841465b9c 100644 --- a/ceph/TODO +++ b/ceph/TODO @@ -1,4 +1,10 @@ +!!! +- make mds shut down with mds_commit_on_shutdown=0 and/or mds_log_flush_on_shutdown=0. +- test mds scaling w/ makedirs, vs mds_log_on_request + +- finish osd replication MOSDOp groundwork + big fast todo's: - client buffer cache - replication protocol diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index e4a14e3fc153..1b810819e59c 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -13,11 +13,6 @@ #include "messages/MGenericMessage.h" -#include "messages/MOSDRead.h" -#include "messages/MOSDReadReply.h" -#include "messages/MOSDWrite.h" -#include "messages/MOSDWriteReply.h" - #include "osd/Filer.h" #include "common/Cond.h" @@ -229,12 +224,6 @@ void Client::dispatch(Message *m) switch (m->get_type()) { // osd - case MSG_OSD_READREPLY: - filer->handle_osd_read_reply((MOSDReadReply*)m); - break; - case MSG_OSD_WRITEREPLY: - filer->handle_osd_write_reply((MOSDWriteReply*)m); - break; case MSG_OSD_OPREPLY: filer->handle_osd_op_reply((MOSDOpReply*)m); break; @@ -373,8 +362,7 @@ int Client::mount(int mkfs) assert(reply); // we got osdcluster! - int off = 0; - osdcluster->_unrope(reply->get_osd_cluster_state(), off); + osdcluster->decode(reply->get_osd_cluster_state()); dout(1) << "mounted" << endl; mounted = true; diff --git a/ceph/common/Logger.cc b/ceph/common/Logger.cc index df8d1f9bb936..7f6741753c31 100644 --- a/ceph/common/Logger.cc +++ b/ceph/common/Logger.cc @@ -15,7 +15,8 @@ Logger::Logger(string& fn, LogType *type) filename = "log/"; filename += fn; interval = g_conf.log_interval; - start = last_logged = g_clock.gettime(); // time 0! + start = g_clock.gettimepair(); // time 0! + last_logged = 0; wrote_header = -1; open = false; this->type = type; @@ -68,8 +69,11 @@ long Logger::get(string& key) void Logger::flush(bool force) { - double now = g_clock.gettime(); - while (now >= last_logged + interval || force) { + timepair_t now = g_clock.gettimepair(); + double fromstart = timepair_to_double(now - start); + + while (force || + fromstart - last_logged >= interval) { last_logged += interval; force = false; @@ -93,7 +97,7 @@ void Logger::flush(bool force) // write line to log //out << (long)(last_logged - start); - out << last_logged - start; + out << fromstart; for (vector::iterator it = type->keys.begin(); it != type->keys.end(); it++) { out << "\t" << get(*it); } diff --git a/ceph/common/Logger.h b/ceph/common/Logger.h index 8e0a565d3636..314cd4742d7d 100644 --- a/ceph/common/Logger.h +++ b/ceph/common/Logger.h @@ -2,7 +2,7 @@ #define __LOGGER_H #include "include/types.h" - +#include "Clock.h" #include #include using namespace std; @@ -17,7 +17,7 @@ class Logger { LogType *type; - double start; + timepair_t start; double last_logged; double interval; int wrote_header; diff --git a/ceph/config.cc b/ceph/config.cc index 5b60d9258a50..15cdd7b17a2c 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -39,7 +39,7 @@ md_config_t g_conf = { mds_log_max_trimming: 16, mds_log_read_inc: 65536, mds_log_before_reply: true, - mds_log_flush_on_shutdown: false, //true, + mds_log_flush_on_shutdown: true, mds_bal_replicate_threshold: 500, mds_bal_unreplicate_threshold: 200, diff --git a/ceph/include/bufferlist.h b/ceph/include/bufferlist.h index bb2e5cf6b776..e09e58e43991 100644 --- a/ceph/include/bufferlist.h +++ b/ceph/include/bufferlist.h @@ -4,6 +4,7 @@ #include "buffer.h" #include +#include using namespace std; #include @@ -120,6 +121,13 @@ class bufferlist { // just add another buffer push_back(new buffer(data, len)); } + void append(bufferptr& bp) { + push_back(bp); + } + void append(bufferptr& bp, int len, int off) { + bufferptr tempbp(bp, len, off); + push_back(tempbp); + } /* @@ -191,7 +199,7 @@ class bufferlist { } // funky modifer - void splice(int off, int len /*, bufferlist *replace */) { // fixme? + void splice(int off, int len, bufferlist *claim_by=0 /*, bufferlist& replace_with */) { // fixme? // skip off list::iterator curbuf = _buffers.begin(); while (off > 0) { @@ -211,23 +219,27 @@ class bufferlist { if (off) { // add a reference to the front bit // insert it before curbuf (which we'll hose) - //cout << "keeping front " << off << " of " << *curbuf << endl; + cout << "keeping front " << off << " of " << *curbuf << endl; _buffers.insert( curbuf, bufferptr( *curbuf, off, 0 ) ); } while (len > 0) { // partial? if (off + len < (*curbuf).length()) { - //cout << "keeping end of " << *curbuf << endl; + cout << "keeping end of " << *curbuf << ", losing first " << off+len << endl; + if (claim_by) + claim_by->append( *curbuf, len, off ); (*curbuf).set_offset( off + len ); // ignore beginning big - (*curbuf).set_length( len ); - //cout << " now " << *curbuf << endl; + (*curbuf).set_length( (*curbuf).length() - len - off ); + cout << " now " << *curbuf << endl; break; } // hose the whole thing - //cout << "discarding all of " << *curbuf << endl; int howmuch = (*curbuf).length() - off; + cout << "discarding " << howmuch << " of " << *curbuf << endl; + if (claim_by) + claim_by->append( *curbuf, howmuch, off ); _buffers.erase( curbuf++ ); len -= howmuch; off = 0; @@ -252,4 +264,38 @@ inline ostream& operator<<(ostream& out, bufferlist& bl) { +// encoder/decode helpers + +// set +inline void _encode(set& s, bufferlist& bl) +{ + int n = s.size(); + bl.append((char*)&n, sizeof(n)); + for (set::iterator it = s.begin(); + it != s.end(); + it++) { + int v = *it; + bl.append((char*)&v, sizeof(v)); + n--; + } + assert(n==0); +} +inline void _decode(set& s, bufferlist& bl, int& off) +{ + s.clear(); + int n; + bl.copy(off, sizeof(n), (char*)&n); + off += sizeof(n); + for (int i=0; imdc = mdc; + } + void finish(int r) { + mdc->shutdown_commits--; + } +}; void MDCache::shutdown_start() { dout(1) << "shutdown_start" << endl; + shutdown_commits = 0; if (g_conf.mds_commit_on_shutdown) { dout(1) << "shutdown_start committing all dirty dirs" << endl; @@ -534,9 +545,10 @@ void MDCache::shutdown_start() CInode *in = it->second; // commit any dirty dir that's ours - if (in->is_dir() && in->dir && in->dir->is_auth() && in->dir->is_dirty()) - mds->mdstore->commit_dir(in->dir, NULL); - + if (in->is_dir() && in->dir && in->dir->is_auth() && in->dir->is_dirty()) { + mds->mdstore->commit_dir(in->dir, new C_MDC_ShutdownCommit(this)); + shutdown_commits++; + } } } @@ -555,7 +567,13 @@ bool MDCache::shutdown_pass() return true; } - + // commits? + if (g_conf.mds_commit_on_shutdown && + shutdown_commits > 0) { + dout(7) << "shutdown_commits = " << shutdown_commits << endl; + return false; + } + // flush log? if (g_conf.mds_log_flush_on_shutdown) { // (wait for) flush log diff --git a/ceph/mds/MDCache.h b/ceph/mds/MDCache.h index 121cfed16a07..5a62a08fd89c 100644 --- a/ceph/mds/MDCache.h +++ b/ceph/mds/MDCache.h @@ -117,6 +117,8 @@ class MDCache { public: // active MDS requests map active_requests; + + int shutdown_commits; friend class MDBalancer; diff --git a/ceph/mds/MDS.cc b/ceph/mds/MDS.cc index c63abd45268c..8666c024c273 100644 --- a/ceph/mds/MDS.cc +++ b/ceph/mds/MDS.cc @@ -256,14 +256,8 @@ void MDS::proc_message(Message *m) { switch (m->get_type()) { // OSD =============== - case MSG_OSD_READREPLY: - filer->handle_osd_read_reply((MOSDReadReply*)m); - return; - case MSG_OSD_WRITEREPLY: - filer->handle_osd_write_reply((MOSDWriteReply*)m); - return; case MSG_OSD_OPREPLY: - filer->handle_osd_op_reply((MOSDOpReply*)m); + filer->handle_osd_op_reply((class MOSDOpReply*)m); return; // MDS diff --git a/ceph/messages/MClientMountAck.h b/ceph/messages/MClientMountAck.h index c877676d56fc..b7b0936279ce 100644 --- a/ceph/messages/MClientMountAck.h +++ b/ceph/messages/MClientMountAck.h @@ -8,31 +8,32 @@ class MClientMountAck : public Message { long pcid; - crope osd_cluster_state; + bufferlist osd_cluster_state; public: MClientMountAck() {} MClientMountAck(MClientMount *mnt, OSDCluster *osdcluster) : Message(MSG_CLIENT_MOUNTACK) { this->pcid = mnt->get_pcid(); - osdcluster->_rope( osd_cluster_state ); + osdcluster->encode( osd_cluster_state ); } - crope& get_osd_cluster_state() { return osd_cluster_state; } + bufferlist& get_osd_cluster_state() { return osd_cluster_state; } void set_pcid(long pcid) { this->pcid = pcid; } long get_pcid() { return pcid; } char *get_type_name() { return "CmntA"; } - virtual void decode_payload(crope& s, int& off) { - s.copy(off, sizeof(pcid), (char*)&pcid); + virtual void decode_payload() { + int off; + payload.copy(off, sizeof(pcid), (char*)&pcid); off += sizeof(pcid); - osd_cluster_state = s.substr(off, s.length()-off); - off += osd_cluster_state.length(); + if (off < payload.length()) + payload.splice( off, payload.length()-off, &osd_cluster_state); } - virtual void encode_payload(crope& s) { - s.append((char*)&pcid, sizeof(pcid)); - s.append(osd_cluster_state); + virtual void encode_payload() { + payload.append((char*)&pcid, sizeof(pcid)); + payload.claim_append(osd_cluster_state); } }; diff --git a/ceph/messages/MOSDOp.h b/ceph/messages/MOSDOp.h index 1c8a3677f967..b30ea0c5deef 100644 --- a/ceph/messages/MOSDOp.h +++ b/ceph/messages/MOSDOp.h @@ -15,44 +15,82 @@ #define OSD_OP_DELETE 2 #define OSD_OP_ZERORANGE 3 #define OSD_OP_MKFS 10 +#define OSD_OP_READ 20 +#define OSD_OP_WRITE 21 typedef struct { long tid; long pcid; + msg_addr_t asker; + object_t oid; - int op; + repgroup_t rg; + __uint64_t ocv; + int op; size_t length, offset; + + size_t _data_len; } MOSDOp_st; class MOSDOp : public Message { MOSDOp_st st; + bufferlist data; friend class MOSDOpReply; public: long get_tid() { return st.tid; } + msg_addr_t get_asker() { return st.asker; } + object_t get_oid() { return st.oid; } + repgroup_t get_rg() { return st.rg; } + __uint64_t get_ocv() { return st.ocv; } + int get_op() { return st.op; } + size_t get_length() { return st.length; } + size_t get_offset() { return st.offset; } + + void set_data(bufferlist &d) { + data.claim(d); + st._data_len = data.length(); + } + bufferlist& get_data() { + return data; + } + size_t get_data_len() { return st._data_len; } // keep a pcid (procedure call id) to match up request+reply void set_pcid(long pcid) { this->st.pcid = pcid; } long get_pcid() { return st.pcid; } - MOSDOp(long tid, object_t oid, int op) : + MOSDOp(long tid, msg_addr_t asker, + object_t oid, repgroup_t rg, __uint64_t ocv, int op) : Message(MSG_OSD_OP) { + memset(&st, 0, sizeof(st)); this->st.tid = tid; + this->st.asker = asker; + this->st.oid = oid; + this->st.rg = rg; + this->st.ocv = ocv; this->st.op = op; } MOSDOp() {} - virtual void decode_payload(crope& s, int& off) { - s.copy(off, sizeof(st), (char*)&st); - off += sizeof(st); + void set_length(size_t l) { st.length = l; } + void set_offset(size_t o) { st.offset = o; } + + + // marshalling + virtual void decode_payload() { + payload.copy(0, sizeof(st), (char*)&st); + payload.splice(0, sizeof(st)); + data.claim(payload); } - virtual void encode_payload(crope& s) { - s.append((char*)&st, sizeof(st)); + virtual void encode_payload() { + payload.push_back( new buffer((char*)&st, sizeof(st)) ); + payload.claim_append( data ); } virtual char *get_type_name() { return "oop"; } diff --git a/ceph/messages/MOSDOpReply.h b/ceph/messages/MOSDOpReply.h index d0c4c2868af5..ac12e40deeb0 100644 --- a/ceph/messages/MOSDOpReply.h +++ b/ceph/messages/MOSDOpReply.h @@ -2,6 +2,7 @@ #define __MOSDOPREPLY_H #include "msg/Message.h" +#include "osd/OSDCluster.h" #include "MOSDOp.h" @@ -18,16 +19,25 @@ typedef struct { // req long tid; long pcid; + object_t oid; - int op; + int op; + // reply int result; - size_t size; + size_t length, offset; + size_t object_size; + + __uint64_t _new_ocv; + size_t _data_len, _oc_len; } MOSDOpReply_st; + class MOSDOpReply : public Message { MOSDOpReply_st st; + bufferlist data; + bufferlist osdcluster; public: long get_tid() { return st.tid; } @@ -35,31 +45,68 @@ class MOSDOpReply : public Message { int get_op() { return st.op; } int get_result() { return st.result; } - size_t get_size() { return st.size; } + size_t get_length() { return st.length; } + size_t get_offset() { return st.offset; } + size_t get_object_size() { return st.object_size; } + + void set_result(int r) { st.result = r; } + void set_length(size_t s) { st.length = s; } + void set_offset(size_t o) { st.offset = o; } + void set_object_size(size_t s) { st.object_size = s; } + + // data payload + void set_data(bufferlist &d) { + data.claim(d); + st._data_len = data.length(); + } + bufferlist& get_data() { + return data; + } - void set_size(size_t s) { st.size = s; } + // osdcluster + __uint64_t get_ocv() { return st._new_ocv; } + bufferlist& get_osdcluster() { + return osdcluster; + } // keep a pcid (procedure call id) to match up request+reply void set_pcid(long pcid) { this->st.pcid = pcid; } long get_pcid() { return st.pcid; } - MOSDOpReply(MOSDOp *req, int result) : + MOSDOpReply(MOSDOp *req, int result, OSDCluster *oc) : Message(MSG_OSD_OPREPLY) { + memset(&st, 0, sizeof(st)); this->st.pcid = req->st.pcid; this->st.tid = req->st.tid; + this->st.oid = req->st.oid; this->st.op = req->st.op; - this->st.result = result; + + this->st.length = req->st.length; // speculative... OSD should ensure these are correct + this->st.offset = req->st.offset; + + // attach updated cluster spec? + if (req->get_ocv() < oc->get_version()) { + oc->encode(osdcluster); + st._new_ocv = oc->get_version(); + st._oc_len = osdcluster.length(); + } } MOSDOpReply() {} - virtual void decode_payload(crope& s, int& off) { - s.copy(off, sizeof(st), (char*)&st); - off += sizeof(st); + + // marshalling + virtual void decode_payload() { + payload.copy(0, sizeof(st), (char*)&st); + payload.splice(0, sizeof(st)); + if (st._data_len) payload.splice(0, st._data_len, &data); + if (st._oc_len) payload.splice(0, st._oc_len, &osdcluster); } - virtual void encode_payload(crope& s) { - s.append((char*)&st, sizeof(st)); + virtual void encode_payload() { + payload.push_back( new buffer((char*)&st, sizeof(st)) ); + payload.claim_append( data ); + payload.claim_append( osdcluster ); } virtual char *get_type_name() { return "oopr"; } diff --git a/ceph/messages/MOSDRead.h b/ceph/messages/MOSDRead.h deleted file mode 100644 index f1d5bed2fa03..000000000000 --- a/ceph/messages/MOSDRead.h +++ /dev/null @@ -1,60 +0,0 @@ -#ifndef __MOSDREAD_H -#define __MOSDREAD_H - -#include "msg/Message.h" - -/* - * OSD read request - * - * oid - object id - * offset, len -- guess - * - * caveat: if len=0, then the _entire_ object is read. this is currently - * used by the MDS, and pretty much a dumb idea in general. - */ - -typedef struct { - long tid; - long pcid; - size_t len; - off_t offset; - object_t oid; -} MOSDRead_st; - -class MOSDRead : public Message { - MOSDRead_st st; - - friend class MOSDReadReply; - - public: - long get_tid() { return st.tid; } - size_t get_len() { return st.len; } - off_t get_offset() { return st.offset; } - object_t get_oid() { return st.oid; } - - // keep a pcid (procedure call id) to match up request+reply - void set_pcid(long pcid) { this->st.pcid = pcid; } - long get_pcid() { return st.pcid; } - - MOSDRead(long tid, object_t oid, size_t len, off_t offset) : - Message(MSG_OSD_READ) { - this->st.tid = tid; - this->st.oid = oid; - this->st.len = len; - this->st.offset = offset; - this->st.pcid = 0; - } - MOSDRead() {} - - virtual void decode_payload(crope& s, int& off) { - s.copy(off, sizeof(st), (char*)&st); - off += sizeof(st); - } - virtual void encode_payload(crope& s) { - s.append((char*)&st, sizeof(st)); - } - - virtual char *get_type_name() { return "oread"; } -}; - -#endif diff --git a/ceph/messages/MOSDReadReply.h b/ceph/messages/MOSDReadReply.h deleted file mode 100644 index 1737ca0c5be7..000000000000 --- a/ceph/messages/MOSDReadReply.h +++ /dev/null @@ -1,77 +0,0 @@ -#ifndef __MOSDREADREPLY_H -#define __MOSDREADREPLY_H - -#include "MOSDRead.h" - -/* - * OSD Read Reply - * - * oid - object id - * offset, len - data returned - * - * len may not match the read request, if the end of object is hit. - */ - -typedef struct { - long tid; - long pcid; - off_t offset; - object_t oid; - size_t len; - long result; -} MOSDReadReply_st; - -class MOSDReadReply : public Message { - MOSDReadReply_st st; - bufferlist data; - - public: - size_t get_len() { return st.len; } - int get_result() { return st.result; } - object_t get_oid() { return st.oid; } - off_t get_offset() { return st.offset; } - long get_tid() { return st.tid; } - - - // keep a pcid (procedure call id) to match up request+reply - void set_pcid(long pcid) { this->st.pcid = pcid; } - long get_pcid() { return st.pcid; } - - MOSDReadReply() { - } - MOSDReadReply(MOSDRead *r, long result) : - Message(MSG_OSD_READREPLY) { - this->st.tid = r->st.tid; - this->st.pcid = r->st.pcid; - this->st.oid = r->st.oid; - this->st.offset = r->st.offset; - this->st.result = result; - this->st.len = 0; - } - - bufferlist& get_data() { - return data; - } - void set_data(bufferlist &bl) { - data.claim(bl); - this->st.len = data.length(); - } - void set_result(int result) { - this->st.result = result; - } - - virtual void decode_payload() { - // warning: only call this once, we modify the payload! - payload.copy(0, sizeof(st), (char*)&st); - payload.splice(0, sizeof(st)); - data.claim(payload); - } - virtual void encode_payload() { - payload.push_back( new buffer((char*)&st, sizeof(st)) ); - payload.claim_append(data); - } - - virtual char *get_type_name() { return "oreadr"; } -}; - -#endif diff --git a/ceph/messages/MOSDWrite.h b/ceph/messages/MOSDWrite.h deleted file mode 100644 index 8a4dd4567e2d..000000000000 --- a/ceph/messages/MOSDWrite.h +++ /dev/null @@ -1,79 +0,0 @@ -#ifndef __MOSDWRITE_H -#define __MOSDWRITE_H - -#include "msg/Message.h" - -/* - * OSD Write - * - * tid - caller's transaction id - * - * oid - object id - * offset, len - - * - * flags - passed to open(). not used at all.. this should be removed? - * - */ - - -typedef struct { - long tid; - long pcid; - off_t offset; - object_t oid; - //int flags; - size_t len; -} MOSDWrite_st; - -class MOSDWrite : public Message { - MOSDWrite_st st; - bufferlist data; - - friend class MOSDWriteReply; - - public: - long get_tid() { return st.tid; } - off_t get_offset() { return st.offset; } - object_t get_oid() { return st.oid; } - //int get_flags() { return st.flags; } - long get_len() { return st.len; } - - // keep a pcid (procedure call id) to match up request+reply - void set_pcid(long pcid) { this->st.pcid = pcid; } - long get_pcid() { return st.pcid; } - - MOSDWrite() {} - MOSDWrite(long tid, object_t oid, size_t len, off_t offset) : - Message(MSG_OSD_WRITE) { - this->st.tid = tid; - this->st.oid = oid; - this->st.offset = offset; - //this->st.flags = flags; - this->st.len = len; - this->st.pcid = 0; - } - - void set_data(bufferlist &d) { - data.claim(d); - assert(data.length() == st.len); - } - bufferlist& get_data() { - return data; - } - - - virtual void decode_payload() { - payload.copy(0, sizeof(st), (char*)&st); - payload.splice(0, sizeof(st)); - data.claim(payload); - } - - virtual void encode_payload() { - payload.push_back( new buffer((char*)&st, sizeof(st)) ); - payload.claim_append( data ); - } - - virtual char *get_type_name() { return "owr"; } -}; - -#endif diff --git a/ceph/messages/MOSDWriteReply.h b/ceph/messages/MOSDWriteReply.h deleted file mode 100644 index c339978d7341..000000000000 --- a/ceph/messages/MOSDWriteReply.h +++ /dev/null @@ -1,57 +0,0 @@ -#ifndef __MOSDWRITEREPLY_H -#define __MOSDWRITEREPLY_H - -#include "MOSDWrite.h" - -/* - * OSD WRite Reply - * - * tid - caller's transaction # - * oid - object id - * offset, len - ... - * result - result code, matchines write() system call: # of bytes written, or error code. - */ - -typedef struct { - long tid; - long pcid; - long result; - off_t offset; - object_t oid; -} MOSDWriteReply_st; - -class MOSDWriteReply : public Message { - MOSDWriteReply_st st; - - public: - long get_tid() { return st.tid; } - long get_result() { return st.result; } - off_t get_offset() { return st.offset; } - object_t get_oid() { return st.oid; } - - // keep a pcid (procedure call id) to match up request+reply - void set_pcid(long pcid) { this->st.pcid = pcid; } - long get_pcid() { return st.pcid; } - - MOSDWriteReply() {} - MOSDWriteReply(MOSDWrite *r, long wrote) : - Message(MSG_OSD_WRITEREPLY) { - this->st.pcid = r->st.pcid; - this->st.tid = r->st.tid; - this->st.oid = r->st.oid; - this->st.offset = r->st.offset; - this->st.result = wrote; - } - - virtual void decode_payload(crope& s, int& off) { - s.copy(off, sizeof(st), (char*)&st); - off += sizeof(st); - } - virtual void encode_payload(crope& s) { - s.append((char*)&st, sizeof(st)); - } - - virtual char *get_type_name() { return "owrr"; } -}; - -#endif diff --git a/ceph/msg/Messenger.cc b/ceph/msg/Messenger.cc index c5c70087fd1d..c3e721fd8d56 100644 --- a/ceph/msg/Messenger.cc +++ b/ceph/msg/Messenger.cc @@ -19,10 +19,6 @@ using namespace std; #include "messages/MFailureAck.h" #include "messages/MOSDPing.h" -#include "messages/MOSDRead.h" -#include "messages/MOSDReadReply.h" -#include "messages/MOSDWrite.h" -#include "messages/MOSDWriteReply.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" @@ -109,22 +105,6 @@ decode_message(msg_envelope_t& env, bufferlist& payload) case MSG_OSD_PING: m = new MOSDPing(); break; - case MSG_OSD_READ: - m = new MOSDRead(); - break; - - case MSG_OSD_READREPLY: - m = new MOSDReadReply(); - break; - - case MSG_OSD_WRITE: - m = new MOSDWrite(); - break; - - case MSG_OSD_WRITEREPLY: - m = new MOSDWriteReply(); - break; - case MSG_OSD_OP: m = new MOSDOp(); break; diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 4283f4e89452..c6149ab78bb4 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -3,6 +3,7 @@ #include "OSD.h" #include "FakeStore.h" +#include "OSDCluster.h" #include "mds/MDS.h" @@ -13,10 +14,6 @@ #include "messages/MPing.h" #include "messages/MPingAck.h" -#include "messages/MOSDRead.h" -#include "messages/MOSDReadReply.h" -#include "messages/MOSDWrite.h" -#include "messages/MOSDWriteReply.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" @@ -107,27 +104,18 @@ void OSD::dispatch(Message *m) // osd - case MSG_SHUTDOWN: shutdown(); break; - + case MSG_PING: // take note. monitor->host_is_alive(m->get_source()); - handle_ping((MPing*)m); break; - - case MSG_OSD_READ: - handle_read((MOSDRead*)m); - break; - - case MSG_OSD_WRITE: - handle_write((MOSDWrite*)m); - break; case MSG_OSD_OP: + monitor->host_is_alive(m->get_source()); handle_op((MOSDOp*)m); break; @@ -158,14 +146,52 @@ void OSD::handle_ping(MPing *m) void OSD::handle_op(MOSDOp *op) { + // check cluster version + if (op->get_ocv() > osdcluster->get_version()) { + // op's is newer + dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl; + + // query MDS + dout(7) << "querying MDS" << endl; + //messenger->send_message(new MGetOSDCluster(), MSG_ADDR_MDS(0), MDS_PORT_MAIN); + assert(0); + waiting_for_osdcluster.push_back(op); + return; + } + + if (op->get_ocv() < osdcluster->get_version()) { + // op's is old + dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl; + + // verify that we are primary, or acting primary + int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() ); + if (acting_primary != whoami) { + dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl; + messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0); + return; + } + } + + + // do the op switch (op->get_op()) { + + case OSD_OP_READ: + op_read(op); + break; + + case OSD_OP_WRITE: + op_write(op); + break; + case OSD_OP_MKFS: dout(3) << "MKFS" << endl; { int r = store->mkfs(); - messenger->send_message(new MOSDOpReply(op, r), - op->get_source(), op->get_source_port()); + messenger->send_message(new MOSDOpReply(op, r, osdcluster), + op->get_asker()); } + delete op; break; case OSD_OP_DELETE: @@ -174,9 +200,10 @@ void OSD::handle_op(MOSDOp *op) dout(3) << "delete on " << op->get_oid() << " r = " << r << endl; // "ack" - messenger->send_message(new MOSDOpReply(op, r), - op->get_source(), op->get_source_port()); + messenger->send_message(new MOSDOpReply(op, r, osdcluster), + op->get_asker()); } + delete op; break; case OSD_OP_STAT: @@ -187,46 +214,50 @@ void OSD::handle_op(MOSDOp *op) dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl; - MOSDOpReply *reply = new MOSDOpReply(op, r); - reply->set_size(st.st_size); - messenger->send_message(reply, - op->get_source(), op->get_source_port()); + MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster); + reply->set_object_size(st.st_size); + messenger->send_message(reply, op->get_asker()); } + delete op; break; default: assert(0); } - - delete op; } -void OSD::handle_read(MOSDRead *r) +void OSD::op_read(MOSDOp *r) { // read into a buffer - bufferptr bptr = new buffer(r->get_len()); // prealloc space for entire read + bufferptr bptr = new buffer(r->get_length()); // prealloc space for entire read long got = store->read(r->get_oid(), - r->get_len(), r->get_offset(), + r->get_length(), r->get_offset(), bptr.c_str()); - MOSDReadReply *reply = new MOSDReadReply(r, 0); + + // set up reply + MOSDOpReply *reply = new MOSDOpReply(r, 0, osdcluster); if (got >= 0) { - bptr.set_length(got); // properly size buffer - + bptr.set_length(got); // properly size the buffer + // give it to the reply in a bufferlist bufferlist bl; bl.push_back( bptr ); + + reply->set_result(0); reply->set_data(bl); + reply->set_length(got); } else { reply->set_result(got); // error + reply->set_length(0); } - dout(10) << "read got " << got << " / " << r->get_len() << " bytes from " << r->get_oid() << endl; + dout(10) << "read got " << got << " / " << r->get_length() << " bytes from " << r->get_oid() << endl; // send it - messenger->send_message(reply, r->get_source(), r->get_source_port()); + messenger->send_message(reply, r->get_asker()); delete r; } @@ -234,7 +265,7 @@ void OSD::handle_read(MOSDRead *r) // -- osd_write -void OSD::handle_write(MOSDWrite *m) +void OSD::op_write(MOSDOp *m) { // take buffers from the message bufferlist bl; @@ -259,8 +290,8 @@ void OSD::handle_write(MOSDWrite *m) // assume success. FIXME. // reply - MOSDWriteReply *reply = new MOSDWriteReply(m, 0); - messenger->send_message(reply, m->get_source(), m->get_source_port()); + MOSDOpReply *reply = new MOSDOpReply(m, 0, osdcluster); + messenger->send_message(reply, m->get_asker()); delete m; } diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index ed4a46efa66f..ce8d823ca85f 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -6,20 +6,45 @@ #include "common/Mutex.h" +#include +using namespace std; + + class Messenger; -class MOSDRead; -class MOSDWrite; class Message; -class ObjectStore; -class HostMonitor; + + +// ways to be dirty +#define RG_DIRTY_LOCAL_LOG 1 +#define RG_DIRTY_LOCAL_SYNC 2 +#define RG_DIRTY_REPLICA_MEM 4 +#define RG_DIRTY_REPLICA_SYNC 8 + + +class ReplicaGroup { + public: + repgroup_t rg; + int role; // 1 = primary, 2 = secondary, etc. 0=undef. + int state; + + map dirty_map; // dirty objects + + ReplicaGroup(repgroup_t rg); + + void enumerate_objects(list& ls); +}; + class OSD : public Dispatcher { protected: Messenger *messenger; int whoami; - ObjectStore *store; - HostMonitor *monitor; + class OSDCluster *osdcluster; + class ObjectStore *store; + class HostMonitor *monitor; + + list waiting_for_osdcluster; Mutex osd_lock; @@ -27,15 +52,20 @@ class OSD : public Dispatcher { OSD(int id, Messenger *m); ~OSD(); + // startup/shutdown int init(); int shutdown(); + // OSDCluster + void update_osd_cluster(__uint64_t ocv, bufferlist& blist); + + // messages virtual void dispatch(Message *m); void handle_ping(class MPing *m); void handle_op(class MOSDOp *m); - void handle_read(MOSDRead *m); - void handle_write(MOSDWrite *m); + void op_read(class MOSDOp *m); + void op_write(class MOSDOp *m); }; #endif diff --git a/ceph/osd/OSDMap.cc b/ceph/osd/OSDMap.cc index a76d63851c13..52664ba85252 100644 --- a/ceph/osd/OSDMap.cc +++ b/ceph/osd/OSDMap.cc @@ -6,36 +6,38 @@ // serialize/unserialize -void OSDCluster::_rope(crope& r) +void OSDCluster::encode(bufferlist& blist) { - r.append((char*)&version, sizeof(version)); + blist.append((char*)&version, sizeof(version)); int ngroups = osd_groups.size(); - r.append((char*)&ngroups, sizeof(ngroups)); + blist.append((char*)&ngroups, sizeof(ngroups)); for (int i=0; i(ngroups); for (int i=0; i osd_groups; // RUSH disk groups + + set down_osds; // list of down disks set failed_osds; // list of failed disks Rush *rush; // rush implementation @@ -82,6 +85,8 @@ class OSDCluster { public: OSDCluster() : version(0), rush(0) { } + __uint64_t get_version() { return version; } + // cluster state bool is_failed(int osd) { return failed_osds.count(osd) ? true:false; } @@ -110,9 +115,35 @@ class OSDCluster { } // serialize, unserialize - void _rope(crope& r); - void _unrope(crope& r, int& off); + //void _rope(crope& r); + //void _unrope(crope& r, int& off); + void encode(bufferlist& blist); + void decode(bufferlist& blist); + + + /**** ****/ + int get_rg_primary(repgroup_t rg) { + int group[NUM_RUSH_REPLICAS]; + repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS); + for (int i=0; i& extents) { size_t cur = offset; size_t left = len; @@ -163,8 +193,8 @@ class OSDCluster { // find oid, osds size_t blockno = cur / FILE_OBJECT_SIZE; ex.oid = file_to_object( ino, blockno ); - repgroup_t rg = file_to_repgroup(ino, blockno ); - repgroup_to_osds( rg, ex.osds, num_reps ); + ex.rg = file_to_repgroup(ino, blockno ); + ex.osd = get_rg_acting_primary( ex.rg ); // map range into object ex.offset = cur % FILE_OBJECT_SIZE; diff --git a/ceph/osdc/Filer.cc b/ceph/osdc/Filer.cc index 19a5b3757906..8c0e8e3fb47d 100644 --- a/ceph/osdc/Filer.cc +++ b/ceph/osdc/Filer.cc @@ -4,10 +4,10 @@ #include "Filer.h" #include "OSDCluster.h" -#include "messages/MOSDRead.h" -#include "messages/MOSDReadReply.h" -#include "messages/MOSDWrite.h" -#include "messages/MOSDWriteReply.h" +//#include "messages/MOSDRead.h" +//#include "messages/MOSDReadReply.h" +//#include "messages/MOSDWrite.h" +//#include "messages/MOSDWriteReply.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" @@ -34,14 +34,6 @@ Filer::Filer(Messenger *m, OSDCluster *o) void Filer::dispatch(Message *m) { switch (m->get_type()) { - case MSG_OSD_READREPLY: - handle_osd_read_reply((MOSDReadReply*)m); - break; - - case MSG_OSD_WRITEREPLY: - handle_osd_write_reply((MOSDWriteReply*)m); - break; - case MSG_OSD_OPREPLY: handle_osd_op_reply((MOSDOpReply*)m); break; @@ -86,13 +78,11 @@ Filer::read(inodeno_t ino, p->bytes_read = 0; p->onfinish = onfinish; - int num_rep = 1; // FIXME - // find data list extents; - osdcluster->file_to_extents(ino, len, offset, num_rep, extents); + osdcluster->file_to_extents(ino, len, offset, extents); - dout(7) << "osd read ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents on " << num_rep << " replicas" << endl; + dout(7) << "osd read ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents" << endl; int nfrag = 0; @@ -104,12 +94,16 @@ Filer::read(inodeno_t ino, last_tid++; // issue read - MOSDRead *m = new MOSDRead(last_tid, it->oid, it->len, it->offset); + MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), + it->oid, it->rg, osdcluster->get_version(), + OSD_OP_READ); + m->set_length(it->len); + m->set_offset(it->offset); dout(15) << " read on " << last_tid << endl; - messenger->send_message(m, MSG_ADDR_OSD(it->osds[r]), 0); + messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0); // note offset into read buffer - p->read_off[it->oid] = off; + p->read_off[last_tid] = off; off += it->len; // add to gather set @@ -123,7 +117,7 @@ Filer::read(inodeno_t ino, void -Filer::handle_osd_read_reply(MOSDReadReply *m) +Filer::handle_osd_read_reply(MOSDOpReply *m) { // get pio tid_t tid = m->get_tid(); @@ -134,8 +128,8 @@ Filer::handle_osd_read_reply(MOSDReadReply *m) op_reads.erase( tid ); // copy result into buffer - size_t off = p->read_off[m->get_oid()]; - dout(7) << "got frag at " << off << " len " << m->get_len() << endl; + size_t off = p->read_off[tid]; + dout(7) << "got frag at " << off << " len " << m->get_length() << endl; // our op finished p->outstanding_ops.erase(tid); @@ -201,7 +195,6 @@ Filer::write(inodeno_t ino, Context *onfinish) { last_tid++; - int num_rep = 1; // pending write record PendingOSDOp_t *p = new PendingOSDOp_t; @@ -209,9 +202,9 @@ Filer::write(inodeno_t ino, // find data list extents; - osdcluster->file_to_extents(ino, len, offset, num_rep, extents); + osdcluster->file_to_extents(ino, len, offset, extents); - dout(7) << "osd write ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents on " << num_rep << " replicas" << endl; + dout(7) << "osd write ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents" << endl; size_t off = 0; // ptr into buffer @@ -222,8 +215,12 @@ Filer::write(inodeno_t ino, last_tid++; // issue write - MOSDWrite *m = new MOSDWrite(last_tid, it->oid, it->len, it->offset); - + MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), + it->oid, it->rg, osdcluster->get_version(), + OSD_OP_WRITE); + m->set_length(it->len); + m->set_offset(it->offset); + bufferlist cur; cur.substr_of(bl, off, it->len); m->set_data(cur); @@ -236,7 +233,7 @@ Filer::write(inodeno_t ino, // send dout(15) << " write on " << last_tid << endl; - messenger->send_message(m, MSG_ADDR_OSD(it->osds[r]), 0); + messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0); } return 0; @@ -244,7 +241,7 @@ Filer::write(inodeno_t ino, void -Filer::handle_osd_write_reply(MOSDWriteReply *m) +Filer::handle_osd_write_reply(MOSDOpReply *m) { // get pio tid_t tid = m->get_tid(); @@ -277,6 +274,25 @@ Filer::handle_osd_write_reply(MOSDWriteReply *m) void Filer::handle_osd_op_reply(MOSDOpReply *m) { + // updated cluster info? + if (m->get_ocv() && + m->get_ocv() > osdcluster->get_version()) { + dout(3) << "op reply has newer cluster " << m->get_ocv() << " > " << osdcluster->get_version() << endl; + osdcluster->decode( m->get_osdcluster() ); + } + + + // read or write? + switch (m->get_op()) { + case OSD_OP_READ: + handle_osd_read_reply(m); + return; + case OSD_OP_WRITE: + handle_osd_write_reply(m); + return; + } + + // get pio tid_t tid = m->get_tid(); dout(15) << "handle_osd_op_reply on " << tid << endl; @@ -322,17 +338,15 @@ Filer::handle_osd_op_reply(MOSDOpReply *m) int Filer::remove(inodeno_t ino, size_t size, Context *onfinish) { - int num_rep = 1; - // pending write record PendingOSDOp_t *p = new PendingOSDOp_t; p->onfinish = onfinish; // find data list extents; - osdcluster->file_to_extents(ino, size, 0, num_rep, extents); + osdcluster->file_to_extents(ino, size, 0, extents); - dout(7) << "osd remove ino " << ino << " size " << size << " in " << extents.size() << " extents on " << num_rep << " replicas" << endl; + dout(7) << "osd remove ino " << ino << " size " << size << " in " << extents.size() << " extents" << endl; size_t off = 0; // ptr into buffer @@ -342,15 +356,15 @@ int Filer::remove(inodeno_t ino, size_t size, Context *onfinish) int r = 0; // pick a replica last_tid++; - for (int r=0;roid, OSD_OP_DELETE); - messenger->send_message(m, MSG_ADDR_OSD(it->osds[r]), 0); + // issue delete + MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), + it->oid, it->rg, osdcluster->get_version(), + OSD_OP_DELETE); + messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0); - // add to gather set - p->outstanding_ops.insert(last_tid); - op_removes[last_tid] = p; - } + // add to gather set + p->outstanding_ops.insert(last_tid); + op_removes[last_tid] = p; } } @@ -379,8 +393,6 @@ int Filer::probe_size(inodeno_t ino, size_t *size, Context *onfinish) int Filer::mkfs(Context *onfinish) { - int num_rep = 1; - dout(7) << "mkfs, wiping all OSDs" << endl; // pending write record @@ -398,7 +410,9 @@ int Filer::mkfs(Context *onfinish) ++last_tid; // issue mkfs - MOSDOp *m = new MOSDOp(last_tid, 0, OSD_OP_MKFS); + MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), + 0, 0, osdcluster->get_version(), + OSD_OP_MKFS); messenger->send_message(m, MSG_ADDR_OSD(*it), 0); // add to gather set @@ -447,9 +461,12 @@ int Filer::zero(inodeno_t ino, // issue zero MOSDOp *m; - if (it->len == new MOSDOp(last_tid, it->oid, OSD_OP_DELETE); + //if (it->len == + m = new MOSDOp(last_tid, messenger->get_myaddr(), + it->oid, it->rg, osdcluster->get_version(), + OSD_OP_DELETE); it->len, it->offset); - messenger->send_message(m, MSG_ADDR_OSD(it->osds[r]), 0); + messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0); // add to gather set p->outstanding_ops.insert(last_tid); diff --git a/ceph/osdc/Filer.h b/ceph/osdc/Filer.h index d1ace4e6923a..728b5c36f51e 100644 --- a/ceph/osdc/Filer.h +++ b/ceph/osdc/Filer.h @@ -105,8 +105,8 @@ class Filer : public Dispatcher { int mkfs(Context *c); - void handle_osd_read_reply(class MOSDReadReply *m); - void handle_osd_write_reply(class MOSDWriteReply *m); + void handle_osd_read_reply(class MOSDOpReply *m); + void handle_osd_write_reply(class MOSDOpReply *m); void handle_osd_op_reply(class MOSDOpReply *m); }; diff --git a/ceph/test/testbuffers.cc b/ceph/test/testbuffers.cc index 3a4163cb4395..be2298ff838d 100644 --- a/ceph/test/testbuffers.cc +++ b/ceph/test/testbuffers.cc @@ -8,12 +8,12 @@ using namespace std; int main() { - bufferptr p1 = new buffer("hello",6); + bufferptr p1 = new buffer("123456",6); bufferptr p2 = p1; cout << "it is '" << p1.c_str() << "'" << endl; - bufferptr p3 = new buffer("there",6); + bufferptr p3 = new buffer("abcdef",6); cout << "p3 is " << p3 << endl; @@ -26,10 +26,11 @@ int main() cout << "len is " << bl.length() << endl; - bl.splice(3,6); + bufferlist took; + bl.splice(10,4,&took); - cout << "bl is now " << bl << endl; - cout << "len is " << bl.length() << endl; + cout << "took out " << took << "leftover is " << bl << endl; + //cout << "len is " << bl.length() << endl; bufferlist bl2; bl2.substr_of(bl, 3, 5);