]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
buffer tweaks;
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 13 Jun 2005 01:48:56 +0000 (01:48 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 13 Jun 2005 01:48:56 +0000 (01:48 +0000)
logger tweaks;
osd read/writes now go through MOSDOp(Reply), OSD checks cluster version, other replication groundwork

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@296 29311d96-e01e-0410-9327-a35deaab8ce9

25 files changed:
ceph/TODO
ceph/client/Client.cc
ceph/common/Logger.cc
ceph/common/Logger.h
ceph/config.cc
ceph/include/bufferlist.h
ceph/include/types.h
ceph/mds/MDCache.cc
ceph/mds/MDCache.h
ceph/mds/MDS.cc
ceph/messages/MClientMountAck.h
ceph/messages/MOSDOp.h
ceph/messages/MOSDOpReply.h
ceph/messages/MOSDRead.h [deleted file]
ceph/messages/MOSDReadReply.h [deleted file]
ceph/messages/MOSDWrite.h [deleted file]
ceph/messages/MOSDWriteReply.h [deleted file]
ceph/msg/Messenger.cc
ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/OSDMap.cc
ceph/osd/OSDMap.h
ceph/osdc/Filer.cc
ceph/osdc/Filer.h
ceph/test/testbuffers.cc

index 472d28f500bf5cf3ccbcdbebfb21980da7a6b6a2..eca841465b9c7ffa2866ccc02c2e2095e17b0ee5 100644 (file)
--- a/ceph/TODO
+++ b/ceph/TODO
@@ -1,4 +1,10 @@
 
+!!!
+- make mds shut down with mds_commit_on_shutdown=0 and/or mds_log_flush_on_shutdown=0.
+- test mds scaling w/ makedirs, vs mds_log_on_request 
+
+- finish osd replication MOSDOp groundwork
+
 big fast todo's:
 - client buffer cache
 - replication protocol
index e4a14e3fc1538c5dcb8c60e0cb954466c0a4a3b0..1b810819e59cff96c76f08388150e6469db7ee1e 100644 (file)
 
 #include "messages/MGenericMessage.h"
 
-#include "messages/MOSDRead.h"
-#include "messages/MOSDReadReply.h"
-#include "messages/MOSDWrite.h"
-#include "messages/MOSDWriteReply.h"
-
 #include "osd/Filer.h"
 
 #include "common/Cond.h"
@@ -229,12 +224,6 @@ void Client::dispatch(Message *m)
 
   switch (m->get_type()) {
        // osd
-  case MSG_OSD_READREPLY:
-       filer->handle_osd_read_reply((MOSDReadReply*)m);
-       break;
-  case MSG_OSD_WRITEREPLY:
-       filer->handle_osd_write_reply((MOSDWriteReply*)m);
-       break;
   case MSG_OSD_OPREPLY:
        filer->handle_osd_op_reply((MOSDOpReply*)m);
        break;
@@ -373,8 +362,7 @@ int Client::mount(int mkfs)
   assert(reply);
 
   // we got osdcluster!
-  int off = 0;
-  osdcluster->_unrope(reply->get_osd_cluster_state(), off);
+  osdcluster->decode(reply->get_osd_cluster_state());
 
   dout(1) << "mounted" << endl;
   mounted = true;
index df8d1f9bb936cdbc93cfa4bacde7b1c9fc4bf14f..7f6741753c31b23339e8a311698ea82e28140cb1 100644 (file)
@@ -15,7 +15,8 @@ Logger::Logger(string& fn, LogType *type)
   filename = "log/";
   filename += fn;
   interval = g_conf.log_interval;
-  start = last_logged = g_clock.gettime();  // time 0!
+  start = g_clock.gettimepair();  // time 0!
+  last_logged = 0;
   wrote_header = -1;
   open = false;
   this->type = type;
@@ -68,8 +69,11 @@ long Logger::get(string& key)
 
 void Logger::flush(bool force) 
 {
-  double now = g_clock.gettime();
-  while (now >= last_logged + interval || force) {
+  timepair_t now = g_clock.gettimepair();
+  double fromstart = timepair_to_double(now - start);
+
+  while (force ||
+                fromstart - last_logged >= interval) {
        last_logged += interval;
        force = false;
 
@@ -93,7 +97,7 @@ void Logger::flush(bool force)
 
        // write line to log
        //out << (long)(last_logged - start);
-       out << last_logged - start;
+       out << fromstart;
        for (vector<string>::iterator it = type->keys.begin(); it != type->keys.end(); it++) {
          out << "\t" << get(*it);
        }
index 8e0a565d3636b4c6bb7dc044e457cc2d8e40f3a1..314cd4742d7d55a49b9eea29545829c84cd5da48 100644 (file)
@@ -2,7 +2,7 @@
 #define __LOGGER_H
 
 #include "include/types.h"
-
+#include "Clock.h"
 #include <string>
 #include <fstream>
 using namespace std;
@@ -17,7 +17,7 @@ class Logger {
 
   LogType *type;
 
-  double start;
+  timepair_t start;
   double last_logged;
   double interval;
   int wrote_header;
index 5b60d9258a50238ab39e1655f6d9f34174b9f0b0..15cdd7b17a2c586d30ba12c213aacbf43795eb67 100644 (file)
@@ -39,7 +39,7 @@ md_config_t g_conf = {
   mds_log_max_trimming: 16,
   mds_log_read_inc: 65536,
   mds_log_before_reply: true,
-  mds_log_flush_on_shutdown: false,  //true,
+  mds_log_flush_on_shutdown: true,
 
   mds_bal_replicate_threshold: 500,
   mds_bal_unreplicate_threshold: 200,
index bb2e5cf6b776b2a407c60301441f38c66b94821b..e09e58e43991728fb33fb9e3959236f9ba6c1cfd 100644 (file)
@@ -4,6 +4,7 @@
 #include "buffer.h"
 
 #include <list>
+#include <set>
 using namespace std;
 
 #include <ext/rope>
@@ -120,6 +121,13 @@ class bufferlist {
        // just add another buffer
        push_back(new buffer(data, len));  
   }
+  void append(bufferptr& bp) {
+       push_back(bp);
+  }
+  void append(bufferptr& bp, int len, int off) {
+       bufferptr tempbp(bp, len, off);
+       push_back(tempbp);
+  }
   
   
   /*
@@ -191,7 +199,7 @@ class bufferlist {
   }
 
   // funky modifer
-  void splice(int off, int len /*, bufferlist *replace */) {    // fixme?
+  void splice(int off, int len, bufferlist *claim_by=0 /*, bufferlist& replace_with */) {    // fixme?
        // skip off
        list<bufferptr>::iterator curbuf = _buffers.begin();
        while (off > 0) {
@@ -211,23 +219,27 @@ class bufferlist {
        if (off) {
          // add a reference to the front bit
          //  insert it before curbuf (which we'll hose)
-         //cout << "keeping front " << off << " of " << *curbuf << endl;
+         cout << "keeping front " << off << " of " << *curbuf << endl;
          _buffers.insert( curbuf, bufferptr( *curbuf, off, 0 ) );
        }
 
        while (len > 0) {
          // partial?
          if (off + len < (*curbuf).length()) {
-               //cout << "keeping end of " << *curbuf << endl;
+               cout << "keeping end of " << *curbuf << ", losing first " << off+len << endl;
+               if (claim_by) 
+                 claim_by->append( *curbuf, len, off );
                (*curbuf).set_offset( off + len );    // ignore beginning big
-               (*curbuf).set_length( len );
-               //cout << " now " << *curbuf << endl;
+               (*curbuf).set_length( (*curbuf).length() - len - off );
+               cout << " now " << *curbuf << endl;
                break;
          }
 
          // hose the whole thing
-         //cout << "discarding all of " << *curbuf << endl;
          int howmuch = (*curbuf).length() - off;
+         cout << "discarding " << howmuch << " of " << *curbuf << endl;
+         if (claim_by) 
+               claim_by->append( *curbuf, howmuch, off );
          _buffers.erase( curbuf++ );
          len -= howmuch;
          off = 0;
@@ -252,4 +264,38 @@ inline ostream& operator<<(ostream& out, bufferlist& bl) {
 
 
 
+// encoder/decode helpers
+
+// set<int>
+inline void _encode(set<int>& s, bufferlist& bl)
+{
+  int n = s.size();
+  bl.append((char*)&n, sizeof(n));
+  for (set<int>::iterator it = s.begin();
+          it != s.end();
+          it++) {
+       int v = *it;
+       bl.append((char*)&v, sizeof(v));
+       n--;
+  }
+  assert(n==0);
+}
+inline void _decode(set<int>& s, bufferlist& bl, int& off) 
+{
+  s.clear();
+  int n;
+  bl.copy(off, sizeof(n), (char*)&n);
+  off += sizeof(n);
+  for (int i=0; i<n; i++) {
+       int v;
+       bl.copy(off, sizeof(v), (char*)&v);
+       off += sizeof(v);
+       s.insert(v);
+  }
+  assert(s.size() == n);
+}
+
+
+
+
 #endif
index 584549f8c7649bf90873a9855445542446ff59fa..67da3e93882504d02ea9818e566594a6b6c44ffc 100644 (file)
@@ -35,8 +35,6 @@ using namespace __gnu_cxx;
 #define MDS_OP_SYMLINK  222
 
 #define MDS_OP_OPEN     301
-#define OSD_OP_READ     304
-#define OSD_OP_WRITE    305
 #define MDS_OP_TRUNCATE 306
 #define MDS_OP_FSYNC    307
 #define MDS_OP_CLOSE    310
index c4d7fce8e3860a61bb6dec394d84be89cb9388d9..69dc3d9eadf080c58206df2f42195b3148660f51 100644 (file)
@@ -520,11 +520,22 @@ bool MDCache::trim(__int32_t max) {
   return true;
 }
 
+class C_MDC_ShutdownCommit : public Context {
+  MDCache *mdc;
+public:
+  C_MDC_ShutdownCommit(MDCache *mdc) {
+       this->mdc = mdc;
+  }
+  void finish(int r) {
+       mdc->shutdown_commits--;
+  }
+};
 
 void MDCache::shutdown_start()
 {
   dout(1) << "shutdown_start" << endl;
 
+  shutdown_commits = 0;
   if (g_conf.mds_commit_on_shutdown) {
        dout(1) << "shutdown_start committing all dirty dirs" << endl;
 
@@ -534,9 +545,10 @@ void MDCache::shutdown_start()
          CInode *in = it->second;
          
          // commit any dirty dir that's ours
-         if (in->is_dir() && in->dir && in->dir->is_auth() && in->dir->is_dirty())
-               mds->mdstore->commit_dir(in->dir, NULL);
-         
+         if (in->is_dir() && in->dir && in->dir->is_auth() && in->dir->is_dirty()) {
+               mds->mdstore->commit_dir(in->dir, new C_MDC_ShutdownCommit(this));
+               shutdown_commits++;
+         }
        }
   }
 
@@ -555,7 +567,13 @@ bool MDCache::shutdown_pass()
        return true;
   }
 
-
+  // commits?
+  if (g_conf.mds_commit_on_shutdown &&
+         shutdown_commits > 0) {
+       dout(7) << "shutdown_commits = " << shutdown_commits << endl;
+       return false;
+  }
+  
   // flush log?
   if (g_conf.mds_log_flush_on_shutdown) {
        // (wait for) flush log
index 121cfed16a0779a1891efa4192b141b40aaab880..5a62a08fd89cebfb2cce6cd063b7cdc52fe5c5d6 100644 (file)
@@ -117,6 +117,8 @@ class MDCache {
  public:
   // active MDS requests
   map<Message*, active_request_t>   active_requests;
+
+  int shutdown_commits;
   
 
   friend class MDBalancer;
index c63abd45268cf788f4a5f641bcd9de11d8267b6a..8666c024c2734a50c0d09435617ce9e6caa250e6 100644 (file)
@@ -256,14 +256,8 @@ void MDS::proc_message(Message *m)
 {
   switch (m->get_type()) {
        // OSD ===============
-  case MSG_OSD_READREPLY:
-       filer->handle_osd_read_reply((MOSDReadReply*)m);
-       return;
-  case MSG_OSD_WRITEREPLY:
-       filer->handle_osd_write_reply((MOSDWriteReply*)m);
-       return;
   case MSG_OSD_OPREPLY:
-       filer->handle_osd_op_reply((MOSDOpReply*)m);
+       filer->handle_osd_op_reply((class MOSDOpReply*)m);
        return;
 
        // MDS
index c877676d56fc0a5bffdff3774e0ff212a4a8e087..b7b0936279ce9d85792bf082a757e5a807eddcdc 100644 (file)
@@ -8,31 +8,32 @@
 
 class MClientMountAck : public Message {
   long pcid;
-  crope osd_cluster_state;
+  bufferlist osd_cluster_state;
 
  public:
   MClientMountAck() {}
   MClientMountAck(MClientMount *mnt, OSDCluster *osdcluster) : Message(MSG_CLIENT_MOUNTACK) { 
        this->pcid = mnt->get_pcid();
-       osdcluster->_rope( osd_cluster_state );
+       osdcluster->encode( osd_cluster_state );
   }
   
-  crope& get_osd_cluster_state() { return osd_cluster_state; }
+  bufferlist& get_osd_cluster_state() { return osd_cluster_state; }
 
   void set_pcid(long pcid) { this->pcid = pcid; }
   long get_pcid() { return pcid; }
 
   char *get_type_name() { return "CmntA"; }
 
-  virtual void decode_payload(crope& s, int& off) {  
-       s.copy(off, sizeof(pcid), (char*)&pcid);
+  virtual void decode_payload() {  
+       int off;
+       payload.copy(off, sizeof(pcid), (char*)&pcid);
        off += sizeof(pcid);
-       osd_cluster_state = s.substr(off, s.length()-off);
-       off += osd_cluster_state.length();
+       if (off < payload.length())
+         payload.splice( off, payload.length()-off, &osd_cluster_state);
   }
-  virtual void encode_payload(crope& s) {  
-       s.append((char*)&pcid, sizeof(pcid));
-       s.append(osd_cluster_state);
+  virtual void encode_payload() {  
+       payload.append((char*)&pcid, sizeof(pcid));
+       payload.claim_append(osd_cluster_state);
   }
 };
 
index 1c8a3677f967674e89262f11631c8b2311af4ded..b30ea0c5deef48022d08255ccee6565f29d2d7d5 100644 (file)
 #define OSD_OP_DELETE     2
 #define OSD_OP_ZERORANGE  3
 #define OSD_OP_MKFS       10
+#define OSD_OP_READ       20
+#define OSD_OP_WRITE      21
 
 typedef struct {
   long tid;
   long pcid;
+  msg_addr_t asker;
+
   object_t oid;
-  int op;
+  repgroup_t rg;
+  __uint64_t ocv;
 
+  int op;
   size_t length, offset;
+
+  size_t _data_len;
 } MOSDOp_st;
 
 class MOSDOp : public Message {
   MOSDOp_st st;
+  bufferlist data;
 
   friend class MOSDOpReply;
 
  public:
   long get_tid() { return st.tid; }
+  msg_addr_t get_asker() { return st.asker; }
+
   object_t get_oid() { return st.oid; }
+  repgroup_t get_rg() { return st.rg; }
+  __uint64_t get_ocv() { return st.ocv; }
+
   int get_op() { return st.op; }
+  size_t get_length() { return st.length; }
+  size_t get_offset() { return st.offset; }
+
+  void set_data(bufferlist &d) {
+       data.claim(d);
+       st._data_len = data.length();
+  }
+  bufferlist& get_data() {
+       return data;
+  }
+  size_t get_data_len() { return st._data_len; }
 
   // keep a pcid (procedure call id) to match up request+reply
   void set_pcid(long pcid) { this->st.pcid = pcid; }
   long get_pcid() { return st.pcid; }
 
-  MOSDOp(long tid, object_t oid, int op) :
+  MOSDOp(long tid, msg_addr_t asker, 
+                object_t oid, repgroup_t rg, __uint64_t ocv, int op) :
        Message(MSG_OSD_OP) {
+       memset(&st, 0, sizeof(st));
        this->st.tid = tid;
+       this->st.asker = asker;
+
        this->st.oid = oid;
+       this->st.rg = rg;
+       this->st.ocv = ocv;
        this->st.op = op;
   }
   MOSDOp() {}
 
-  virtual void decode_payload(crope& s, int& off) {
-       s.copy(off, sizeof(st), (char*)&st);
-       off += sizeof(st);
+  void set_length(size_t l) { st.length = l; }
+  void set_offset(size_t o) { st.offset = o; }
+
+  
+  // marshalling
+  virtual void decode_payload() {
+       payload.copy(0, sizeof(st), (char*)&st);
+       payload.splice(0, sizeof(st));
+       data.claim(payload);
   }
-  virtual void encode_payload(crope& s) {
-       s.append((char*)&st, sizeof(st));
+  virtual void encode_payload() {
+       payload.push_back( new buffer((char*)&st, sizeof(st)) );
+       payload.claim_append( data );
   }
 
   virtual char *get_type_name() { return "oop"; }
index d0c4c2868af52675a88d24043b28d0e15a143745..ac12e40deeb0da690e40b0e70151910fb8fc8423 100644 (file)
@@ -2,6 +2,7 @@
 #define __MOSDOPREPLY_H
 
 #include "msg/Message.h"
+#include "osd/OSDCluster.h"
 
 #include "MOSDOp.h"
 
@@ -18,16 +19,25 @@ typedef struct {
   // req
   long tid;
   long pcid;
+
   object_t oid;
-  int op;
 
+  int op;
+  
   // reply
   int    result;
-  size_t size;
+  size_t length, offset;
+  size_t object_size;
+
+  __uint64_t _new_ocv;
+  size_t _data_len, _oc_len;
 } MOSDOpReply_st;
 
+
 class MOSDOpReply : public Message {
   MOSDOpReply_st st;
+  bufferlist data;
+  bufferlist osdcluster;
 
  public:
   long     get_tid() { return st.tid; }
@@ -35,31 +45,68 @@ class MOSDOpReply : public Message {
   int      get_op()  { return st.op; }
   
   int    get_result() { return st.result; }
-  size_t get_size()   { return st.size; }
+  size_t get_length() { return st.length; }
+  size_t get_offset() { return st.offset; }
+  size_t get_object_size() { return st.object_size; }
+
+  void set_result(int r) { st.result = r; }
+  void set_length(size_t s) { st.length = s; }
+  void set_offset(size_t o) { st.offset = o; }
+  void set_object_size(size_t s) { st.object_size = s; }
+
+  // data payload
+  void set_data(bufferlist &d) {
+       data.claim(d);
+       st._data_len = data.length();
+  }
+  bufferlist& get_data() {
+       return data;
+  }
 
-  void set_size(size_t s) { st.size = s; }
+  // osdcluster
+  __uint64_t get_ocv() { return st._new_ocv; }
+  bufferlist& get_osdcluster() { 
+       return osdcluster;
+  }
 
   // keep a pcid (procedure call id) to match up request+reply
   void set_pcid(long pcid) { this->st.pcid = pcid; }
   long get_pcid()          { return st.pcid; }
 
-  MOSDOpReply(MOSDOp *req, int result) :
+  MOSDOpReply(MOSDOp *req, int result, OSDCluster *oc) :
        Message(MSG_OSD_OPREPLY) {
+       memset(&st, 0, sizeof(st));
        this->st.pcid = req->st.pcid;
        this->st.tid = req->st.tid;
+
        this->st.oid = req->st.oid;
        this->st.op = req->st.op;
-
        this->st.result = result;
+
+       this->st.length = req->st.length;   // speculative... OSD should ensure these are correct
+       this->st.offset = req->st.offset;
+
+       // attach updated cluster spec?
+       if (req->get_ocv() < oc->get_version()) {
+         oc->encode(osdcluster);
+         st._new_ocv = oc->get_version();
+         st._oc_len = osdcluster.length();
+       }
   }
   MOSDOpReply() {}
 
-  virtual void decode_payload(crope& s, int& off) {
-       s.copy(off, sizeof(st), (char*)&st);
-       off += sizeof(st);
+
+  // marshalling
+  virtual void decode_payload() {
+       payload.copy(0, sizeof(st), (char*)&st);
+       payload.splice(0, sizeof(st));
+       if (st._data_len) payload.splice(0, st._data_len, &data);
+       if (st._oc_len) payload.splice(0, st._oc_len, &osdcluster);
   }
-  virtual void encode_payload(crope& s) {
-       s.append((char*)&st, sizeof(st));
+  virtual void encode_payload() {
+       payload.push_back( new buffer((char*)&st, sizeof(st)) );
+       payload.claim_append( data );
+       payload.claim_append( osdcluster );
   }
 
   virtual char *get_type_name() { return "oopr"; }
diff --git a/ceph/messages/MOSDRead.h b/ceph/messages/MOSDRead.h
deleted file mode 100644 (file)
index f1d5bed..0000000
+++ /dev/null
@@ -1,60 +0,0 @@
-#ifndef __MOSDREAD_H
-#define __MOSDREAD_H
-
-#include "msg/Message.h"
-
-/*
- * OSD read request
- *
- * oid - object id
- * offset, len -- guess
- *
- * caveat: if len=0, then the _entire_ object is read.  this is currently
- *   used by the MDS, and pretty much a dumb idea in general.
- */
-
-typedef struct {
-  long tid;
-  long pcid;
-  size_t len;
-  off_t offset;
-  object_t oid;
-} MOSDRead_st;
-
-class MOSDRead : public Message {
-  MOSDRead_st st;
-
-  friend class MOSDReadReply;
-
- public:
-  long get_tid() { return st.tid; }
-  size_t get_len() { return st.len; }
-  off_t get_offset() { return st.offset; }
-  object_t get_oid() { return st.oid; }
-  
-  // keep a pcid (procedure call id) to match up request+reply
-  void set_pcid(long pcid) { this->st.pcid = pcid; }
-  long get_pcid() { return st.pcid; }
-
-  MOSDRead(long tid, object_t oid, size_t len, off_t offset) :
-       Message(MSG_OSD_READ) {
-       this->st.tid = tid;
-       this->st.oid = oid;
-       this->st.len = len;
-       this->st.offset = offset;
-       this->st.pcid = 0;
-  }
-  MOSDRead() {}
-
-  virtual void decode_payload(crope& s, int& off) {
-       s.copy(off, sizeof(st), (char*)&st);
-       off += sizeof(st);
-  }
-  virtual void encode_payload(crope& s) {
-       s.append((char*)&st, sizeof(st));
-  }
-
-  virtual char *get_type_name() { return "oread"; }
-};
-
-#endif
diff --git a/ceph/messages/MOSDReadReply.h b/ceph/messages/MOSDReadReply.h
deleted file mode 100644 (file)
index 1737ca0..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-#ifndef __MOSDREADREPLY_H
-#define __MOSDREADREPLY_H
-
-#include "MOSDRead.h"
-
-/*
- * OSD Read Reply
- *
- * oid - object id
- * offset, len - data returned
- *
- * len may not match the read request, if the end of object is hit.
- */
-
-typedef struct {
-  long tid;
-  long pcid;
-  off_t offset;
-  object_t oid;  
-  size_t len;
-  long result;
-} MOSDReadReply_st;
-
-class MOSDReadReply : public Message {
-  MOSDReadReply_st st;
-  bufferlist data;
-
- public:
-  size_t get_len() { return st.len; }
-  int get_result() { return st.result; }
-  object_t get_oid() { return st.oid; }
-  off_t get_offset() { return st.offset; }
-  long get_tid() { return st.tid; }
-
-
-  // keep a pcid (procedure call id) to match up request+reply
-  void set_pcid(long pcid) { this->st.pcid = pcid; }
-  long get_pcid() { return st.pcid; }
-
-  MOSDReadReply() { 
-  }
-  MOSDReadReply(MOSDRead *r, long result) :
-       Message(MSG_OSD_READREPLY) {
-       this->st.tid = r->st.tid;
-       this->st.pcid = r->st.pcid;
-       this->st.oid = r->st.oid;
-       this->st.offset = r->st.offset;
-       this->st.result = result;
-       this->st.len = 0;
-  }
-
-  bufferlist& get_data() {
-       return data;
-  }
-  void set_data(bufferlist &bl) {
-       data.claim(bl);
-       this->st.len = data.length();
-  }
-  void set_result(int result) {
-       this->st.result = result;
-  }
-  
-  virtual void decode_payload() {
-       // warning: only call this once, we modify the payload!
-       payload.copy(0, sizeof(st), (char*)&st);
-       payload.splice(0, sizeof(st));
-       data.claim(payload);
-  }
-  virtual void encode_payload() {
-       payload.push_back( new buffer((char*)&st, sizeof(st)) );
-       payload.claim_append(data);
-  }
-  
-  virtual char *get_type_name() { return "oreadr"; }
-};
-
-#endif
diff --git a/ceph/messages/MOSDWrite.h b/ceph/messages/MOSDWrite.h
deleted file mode 100644 (file)
index 8a4dd45..0000000
+++ /dev/null
@@ -1,79 +0,0 @@
-#ifndef __MOSDWRITE_H
-#define __MOSDWRITE_H
-
-#include "msg/Message.h"
-
-/*
- * OSD Write
- *
- * tid - caller's transaction id
- * 
- * oid - object id
- * offset, len - 
- *
- * flags - passed to open().  not used at all.. this should be removed?
- * 
- */
-
-
-typedef struct {
-  long tid;
-  long pcid;
-  off_t offset;
-  object_t oid;
-  //int flags;
-  size_t len;
-} MOSDWrite_st;
-
-class MOSDWrite : public Message {
-  MOSDWrite_st st;
-  bufferlist   data;
-
-  friend class MOSDWriteReply;
-
- public:
-  long get_tid() { return st.tid; }
-  off_t get_offset() { return st.offset; }
-  object_t get_oid() { return st.oid; }
-  //int get_flags() { return st.flags; }
-  long get_len() { return st.len; }
-
-  // keep a pcid (procedure call id) to match up request+reply
-  void set_pcid(long pcid) { this->st.pcid = pcid; }
-  long get_pcid() { return st.pcid; }
-
-  MOSDWrite() {}
-  MOSDWrite(long tid, object_t oid, size_t len, off_t offset) :
-       Message(MSG_OSD_WRITE) {
-       this->st.tid = tid;
-       this->st.oid = oid;
-       this->st.offset = offset;
-       //this->st.flags = flags;
-       this->st.len = len;
-       this->st.pcid = 0;
-  }
-
-  void set_data(bufferlist &d) {
-       data.claim(d);
-       assert(data.length() == st.len);
-  }
-  bufferlist& get_data() {
-       return data;
-  }
-
-  
-  virtual void decode_payload() {
-       payload.copy(0, sizeof(st), (char*)&st);
-       payload.splice(0, sizeof(st));
-       data.claim(payload);
-  }
-
-  virtual void encode_payload() {
-       payload.push_back( new buffer((char*)&st, sizeof(st)) );
-       payload.claim_append( data );
-  }
-
-  virtual char *get_type_name() { return "owr"; }
-};
-
-#endif
diff --git a/ceph/messages/MOSDWriteReply.h b/ceph/messages/MOSDWriteReply.h
deleted file mode 100644 (file)
index c339978..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-#ifndef __MOSDWRITEREPLY_H
-#define __MOSDWRITEREPLY_H
-
-#include "MOSDWrite.h"
-
-/*
- * OSD WRite Reply
- *
- * tid - caller's transaction #
- * oid - object id
- * offset, len - ...
- * result - result code, matchines write() system call: # of bytes written, or error code.
- */
-
-typedef struct {
-  long tid;
-  long pcid;
-  long result;
-  off_t offset;
-  object_t oid;
-} MOSDWriteReply_st;
-
-class MOSDWriteReply : public Message {
-  MOSDWriteReply_st st;
-  
- public:
-  long get_tid() { return st.tid; }
-  long get_result() { return st.result; }
-  off_t get_offset() { return st.offset; }
-  object_t get_oid() { return st.oid; }  
-
-  // keep a pcid (procedure call id) to match up request+reply
-  void set_pcid(long pcid) { this->st.pcid = pcid; }
-  long get_pcid() { return st.pcid; }
-
-  MOSDWriteReply() {}
-  MOSDWriteReply(MOSDWrite *r, long wrote) :
-       Message(MSG_OSD_WRITEREPLY) {
-       this->st.pcid = r->st.pcid;
-       this->st.tid = r->st.tid;
-       this->st.oid = r->st.oid;
-       this->st.offset = r->st.offset;
-       this->st.result = wrote;
-  }
-
-  virtual void decode_payload(crope& s, int& off) {
-       s.copy(off, sizeof(st), (char*)&st);
-       off += sizeof(st);
-  }
-  virtual void encode_payload(crope& s) {
-       s.append((char*)&st, sizeof(st));
-  }
-
-  virtual char *get_type_name() { return "owrr"; }
-};
-
-#endif
index c5c70087fd1d501a3d4ec9e31ac4b4f526d69f06..c3e721fd8d56366a3a17bbc62af34d7506b6b8e2 100644 (file)
@@ -19,10 +19,6 @@ using namespace std;
 #include "messages/MFailureAck.h"
 
 #include "messages/MOSDPing.h"
-#include "messages/MOSDRead.h"
-#include "messages/MOSDReadReply.h"
-#include "messages/MOSDWrite.h"
-#include "messages/MOSDWriteReply.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
 
@@ -109,22 +105,6 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
   case MSG_OSD_PING:
        m = new MOSDPing();
        break;
-  case MSG_OSD_READ:
-       m = new MOSDRead();
-       break;
-
-  case MSG_OSD_READREPLY:
-       m = new MOSDReadReply();
-       break;
-
-  case MSG_OSD_WRITE:
-       m = new MOSDWrite();
-       break;
-
-  case MSG_OSD_WRITEREPLY:
-       m = new MOSDWriteReply();
-       break;
-
   case MSG_OSD_OP:
        m = new MOSDOp();
        break;
index 4283f4e894526c2f60a18bc39652c05bea759a92..c6149ab78bb43b0a47b20d2105c0e83af4301c7e 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "OSD.h"
 #include "FakeStore.h"
+#include "OSDCluster.h"
 
 #include "mds/MDS.h"
 
 
 #include "messages/MPing.h"
 #include "messages/MPingAck.h"
-#include "messages/MOSDRead.h"
-#include "messages/MOSDReadReply.h"
-#include "messages/MOSDWrite.h"
-#include "messages/MOSDWriteReply.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
 
@@ -107,27 +104,18 @@ void OSD::dispatch(Message *m)
   
        
        // osd
-
   case MSG_SHUTDOWN:
        shutdown();
        break;
-
+       
   case MSG_PING:
        // take note.
        monitor->host_is_alive(m->get_source());
-
        handle_ping((MPing*)m);
        break;
-       
-  case MSG_OSD_READ:
-       handle_read((MOSDRead*)m);
-       break;
-
-  case MSG_OSD_WRITE:
-       handle_write((MOSDWrite*)m);
-       break;
 
   case MSG_OSD_OP:
+       monitor->host_is_alive(m->get_source());
        handle_op((MOSDOp*)m);
        break;
 
@@ -158,14 +146,52 @@ void OSD::handle_ping(MPing *m)
 
 void OSD::handle_op(MOSDOp *op)
 {
+  // check cluster version
+  if (op->get_ocv() > osdcluster->get_version()) {
+       // op's is newer
+       dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl;
+       
+       // query MDS
+       dout(7) << "querying MDS" << endl;
+       //messenger->send_message(new MGetOSDCluster(), MSG_ADDR_MDS(0), MDS_PORT_MAIN);
+       assert(0);
+       waiting_for_osdcluster.push_back(op);
+       return;
+  }
+
+  if (op->get_ocv() < osdcluster->get_version()) {
+       // op's is old
+       dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl;
+
+       // verify that we are primary, or acting primary
+       int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() );
+       if (acting_primary != whoami) {
+         dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
+         messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
+         return;
+       }
+  }
+
+  
+  // do the op
   switch (op->get_op()) {
+
+  case OSD_OP_READ:
+       op_read(op);
+       break;
+
+  case OSD_OP_WRITE:
+       op_write(op);
+       break;
+
   case OSD_OP_MKFS:
        dout(3) << "MKFS" << endl;
        {
          int r = store->mkfs();        
-         messenger->send_message(new MOSDOpReply(op, r), 
-                                                         op->get_source(), op->get_source_port());
+         messenger->send_message(new MOSDOpReply(op, r, osdcluster), 
+                                                         op->get_asker());
        }
+       delete op;
        break;
 
   case OSD_OP_DELETE:
@@ -174,9 +200,10 @@ void OSD::handle_op(MOSDOp *op)
          dout(3) << "delete on " << op->get_oid() << " r = " << r << endl;
          
          // "ack"
-         messenger->send_message(new MOSDOpReply(op, r), 
-                                                         op->get_source(), op->get_source_port());
+         messenger->send_message(new MOSDOpReply(op, r, osdcluster), 
+                                                         op->get_asker());
        }
+       delete op;
        break;
 
   case OSD_OP_STAT:
@@ -187,46 +214,50 @@ void OSD::handle_op(MOSDOp *op)
   
          dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl;
          
-         MOSDOpReply *reply = new MOSDOpReply(op, r);
-         reply->set_size(st.st_size);
-         messenger->send_message(reply,
-                                                         op->get_source(), op->get_source_port());
+         MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster);
+         reply->set_object_size(st.st_size);
+         messenger->send_message(reply, op->get_asker());
        }
+       delete op;
        break;
        
   default:
        assert(0);
   }
-
-  delete op;
 }
 
 
 
 
-void OSD::handle_read(MOSDRead *r)
+void OSD::op_read(MOSDOp *r)
 {
   // read into a buffer
-  bufferptr bptr = new buffer(r->get_len());   // prealloc space for entire read
+  bufferptr bptr = new buffer(r->get_length());   // prealloc space for entire read
   long got = store->read(r->get_oid(), 
-                                                r->get_len(), r->get_offset(),
+                                                r->get_length(), r->get_offset(),
                                                 bptr.c_str());
-  MOSDReadReply *reply = new MOSDReadReply(r, 0); 
+
+  // set up reply
+  MOSDOpReply *reply = new MOSDOpReply(r, 0, osdcluster); 
   if (got >= 0) {
-       bptr.set_length(got);     // properly size buffer
-       
+       bptr.set_length(got);   // properly size the buffer
+
        // give it to the reply in a bufferlist
        bufferlist bl;
        bl.push_back( bptr );
+       
+       reply->set_result(0);
        reply->set_data(bl);
+       reply->set_length(got);
   } else {
        reply->set_result(got);   // error
+       reply->set_length(0);
   }
   
-  dout(10) << "read got " << got << " / " << r->get_len() << " bytes from " << r->get_oid() << endl;
+  dout(10) << "read got " << got << " / " << r->get_length() << " bytes from " << r->get_oid() << endl;
   
   // send it
-  messenger->send_message(reply, r->get_source(), r->get_source_port());
+  messenger->send_message(reply, r->get_asker());
 
   delete r;
 }
@@ -234,7 +265,7 @@ void OSD::handle_read(MOSDRead *r)
 
 // -- osd_write
 
-void OSD::handle_write(MOSDWrite *m)
+void OSD::op_write(MOSDOp *m)
 {
   // take buffers from the message
   bufferlist bl;
@@ -259,8 +290,8 @@ void OSD::handle_write(MOSDWrite *m)
   // assume success.  FIXME.
 
   // reply
-  MOSDWriteReply *reply = new MOSDWriteReply(m, 0);
-  messenger->send_message(reply, m->get_source(), m->get_source_port());
+  MOSDOpReply *reply = new MOSDOpReply(m, 0, osdcluster);
+  messenger->send_message(reply, m->get_asker());
 
   delete m;
 }
index ed4a46efa66ffdc080117c81702873dddeae3c45..ce8d823ca85f7a9b3a681f08451c147fe641aa98 100644 (file)
@@ -6,20 +6,45 @@
 
 #include "common/Mutex.h"
 
+#include <map>
+using namespace std;
+
+
 class Messenger;
-class MOSDRead;
-class MOSDWrite;
 class Message;
-class ObjectStore;
-class HostMonitor;
+
+
+// ways to be dirty
+#define RG_DIRTY_LOCAL_LOG     1
+#define RG_DIRTY_LOCAL_SYNC    2
+#define RG_DIRTY_REPLICA_MEM   4
+#define RG_DIRTY_REPLICA_SYNC  8
+
+
+class ReplicaGroup {
+ public:
+  repgroup_t rg;
+  int        role;    // 1 = primary, 2 = secondary, etc.  0=undef.
+  int        state;   
+
+  map<object_t, int>  dirty_map;  // dirty objects
+  
+  ReplicaGroup(repgroup_t rg);
+
+  void enumerate_objects(list<object_t>& ls);
+};
+
 
 class OSD : public Dispatcher {
  protected:
   Messenger *messenger;
   int whoami;
 
-  ObjectStore *store;
-  HostMonitor *monitor;
+  class OSDCluster  *osdcluster;
+  class ObjectStore *store;
+  class HostMonitor *monitor;
+
+  list<class MOSDOp*> waiting_for_osdcluster;
 
   Mutex osd_lock;
 
@@ -27,15 +52,20 @@ class OSD : public Dispatcher {
   OSD(int id, Messenger *m);
   ~OSD();
   
+  // startup/shutdown
   int init();
   int shutdown();
 
+  // OSDCluster
+  void update_osd_cluster(__uint64_t ocv, bufferlist& blist);
+
+  // messages
   virtual void dispatch(Message *m);
 
   void handle_ping(class MPing *m);
   void handle_op(class MOSDOp *m);
-  void handle_read(MOSDRead *m);
-  void handle_write(MOSDWrite *m);
+  void op_read(class MOSDOp *m);
+  void op_write(class MOSDOp *m);
 };
 
 #endif
index a76d63851c132550733ae717c8f76b9a388b48e4..52664ba85252961928a53605f697334a6d276164 100644 (file)
@@ -6,36 +6,38 @@
 
 // serialize/unserialize
 
-void OSDCluster::_rope(crope& r)
+void OSDCluster::encode(bufferlist& blist)
 {
-  r.append((char*)&version, sizeof(version));
+  blist.append((char*)&version, sizeof(version));
 
   int ngroups = osd_groups.size();
-  r.append((char*)&ngroups, sizeof(ngroups));
+  blist.append((char*)&ngroups, sizeof(ngroups));
   for (int i=0; i<ngroups; i++) {
-       r.append((char*)&osd_groups[i], sizeof(OSDGroup));
+       blist.append((char*)&osd_groups[i], sizeof(OSDGroup));
   }
 
-  // failed
+  _encode(down_osds, blist);
+  _encode(failed_osds, blist);
 }
 
-void OSDCluster::_unrope(crope& r, int& off)
+void OSDCluster::decode(bufferlist& blist)
 {
-  r.copy(off, sizeof(version), (char*)&version);
+  int off = 0;
+  blist.copy(off, sizeof(version), (char*)&version);
   off += sizeof(version);
 
   int ngroups;
-  r.copy(off, sizeof(ngroups), (char*)&ngroups);
+  blist.copy(off, sizeof(ngroups), (char*)&ngroups);
   off += sizeof(ngroups);
 
   osd_groups = vector<OSDGroup>(ngroups);
   for (int i=0; i<ngroups; i++) {
-       r.copy(off, sizeof(OSDGroup), (char*)&osd_groups[i]);
+       blist.copy(off, sizeof(OSDGroup), (char*)&osd_groups[i]);
        off += sizeof(OSDGroup);
   }
 
-  // failed
-
+  _decode(down_osds, blist, off);
+  _decode(failed_osds, blist, off);
 
   init_rush();
 }
index 8a0146198ce734d6df45722dba8e5252f79cc377..8678a2f259ec8725160b93506f8735fae58c6346 100644 (file)
@@ -51,8 +51,9 @@ struct OSDGroup {
  * for mapping (ino, offset, len) to a (list of) byte extents in objects on osds
  */
 struct OSDExtent {
-  int         osds[MAX_REPLICAS];
+  int         osd;
   object_t    oid;
+  repgroup_t  rg;
   size_t      offset, len;
 };
 
@@ -64,6 +65,8 @@ class OSDCluster {
 
   // RUSH disk groups
   vector<OSDGroup> osd_groups;  // RUSH disk groups
+
+  set<int>         down_osds;   // list of down disks
   set<int>         failed_osds; // list of failed disks
 
   Rush             *rush;       // rush implementation
@@ -82,6 +85,8 @@ class OSDCluster {
  public:
   OSDCluster() : version(0), rush(0) { }
 
+  __uint64_t get_version() { return version; }
+
   // cluster state
   bool is_failed(int osd) { return failed_osds.count(osd) ? true:false; }
   
@@ -110,9 +115,35 @@ class OSDCluster {
   }
 
   // serialize, unserialize
-  void _rope(crope& r);
-  void _unrope(crope& r, int& off);
+  //void _rope(crope& r);
+  //void _unrope(crope& r, int& off);
+  void encode(bufferlist& blist);
+  void decode(bufferlist& blist);
+
+
+  /****  ****/
+  int get_rg_primary(repgroup_t rg) {
+       int group[NUM_RUSH_REPLICAS];
+       repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
+       for (int i=0; i<NUM_RUSH_REPLICAS; i++) {
+         if (failed_osds.count(group[i])) continue;
+         return i;
+       }
+       assert(0);
+       return -1;  // we fail!
 
+  }
+  int get_rg_acting_primary(repgroup_t rg) {
+       int group[NUM_RUSH_REPLICAS];
+       repgroup_to_osds(rg, group, NUM_RUSH_REPLICAS);
+       for (int i=0; i<NUM_RUSH_REPLICAS; i++) {
+         if (down_osds.count(group[i])) continue;
+         if (failed_osds.count(group[i])) continue;
+         return i;
+       }
+       assert(0);
+       return -1;  // we fail!
+  }
 
 
   /****   mapping facilities   ****/
@@ -153,7 +184,6 @@ class OSDCluster {
   void file_to_extents(inodeno_t ino,
                                           size_t len,
                                           size_t offset,
-                                          int num_reps,
                                           list<OSDExtent>& extents) {
        size_t cur = offset;
        size_t left = len;
@@ -163,8 +193,8 @@ class OSDCluster {
          // find oid, osds
          size_t blockno = cur / FILE_OBJECT_SIZE;
          ex.oid = file_to_object( ino, blockno );
-         repgroup_t rg = file_to_repgroup(ino, blockno );
-         repgroup_to_osds( rg, ex.osds, num_reps );
+         ex.rg = file_to_repgroup(ino, blockno );
+         ex.osd = get_rg_acting_primary( ex.rg );
 
          // map range into object
          ex.offset = cur % FILE_OBJECT_SIZE;
index 19a5b3757906626621fd73b627574ef777928e29..8c0e8e3fb47dfe9c88061e644df069a3e3ecfb9b 100644 (file)
@@ -4,10 +4,10 @@
 #include "Filer.h"
 #include "OSDCluster.h"
 
-#include "messages/MOSDRead.h"
-#include "messages/MOSDReadReply.h"
-#include "messages/MOSDWrite.h"
-#include "messages/MOSDWriteReply.h"
+//#include "messages/MOSDRead.h"
+//#include "messages/MOSDReadReply.h"
+//#include "messages/MOSDWrite.h"
+//#include "messages/MOSDWriteReply.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
 
@@ -34,14 +34,6 @@ Filer::Filer(Messenger *m, OSDCluster *o)
 void Filer::dispatch(Message *m)
 {
   switch (m->get_type()) {
-  case MSG_OSD_READREPLY:
-       handle_osd_read_reply((MOSDReadReply*)m);
-       break;
-       
-  case MSG_OSD_WRITEREPLY:
-       handle_osd_write_reply((MOSDWriteReply*)m);
-       break;
-
   case MSG_OSD_OPREPLY:
        handle_osd_op_reply((MOSDOpReply*)m);
        break;
@@ -86,13 +78,11 @@ Filer::read(inodeno_t ino,
   p->bytes_read = 0;
   p->onfinish = onfinish;
 
-  int num_rep = 1;          // FIXME
-
   // find data
   list<OSDExtent> extents;
-  osdcluster->file_to_extents(ino, len, offset, num_rep, extents);
+  osdcluster->file_to_extents(ino, len, offset, extents);
 
-  dout(7) << "osd read ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents on " << num_rep << " replicas" << endl;
+  dout(7) << "osd read ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents" << endl;
 
 
   int nfrag = 0;
@@ -104,12 +94,16 @@ Filer::read(inodeno_t ino,
        last_tid++;
        
        // issue read
-       MOSDRead *m = new MOSDRead(last_tid, it->oid, it->len, it->offset);
+       MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
+                                                  it->oid, it->rg, osdcluster->get_version(), 
+                                                  OSD_OP_READ);
+       m->set_length(it->len);
+       m->set_offset(it->offset);
        dout(15) << " read on " << last_tid << endl;
-       messenger->send_message(m, MSG_ADDR_OSD(it->osds[r]), 0);
+       messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0);
 
        // note offset into read buffer
-       p->read_off[it->oid] = off;
+       p->read_off[last_tid] = off;
        off += it->len;
 
        // add to gather set
@@ -123,7 +117,7 @@ Filer::read(inodeno_t ino,
 
 
 void
-Filer::handle_osd_read_reply(MOSDReadReply *m) 
+Filer::handle_osd_read_reply(MOSDOpReply *m) 
 {
   // get pio
   tid_t tid = m->get_tid();
@@ -134,8 +128,8 @@ Filer::handle_osd_read_reply(MOSDReadReply *m)
   op_reads.erase( tid );
 
   // copy result into buffer
-  size_t off = p->read_off[m->get_oid()];
-  dout(7) << "got frag at " << off << " len " << m->get_len() << endl;
+  size_t off = p->read_off[tid];
+  dout(7) << "got frag at " << off << " len " << m->get_length() << endl;
   
   // our op finished
   p->outstanding_ops.erase(tid);
@@ -201,7 +195,6 @@ Filer::write(inodeno_t ino,
                         Context *onfinish)
 {
   last_tid++;
-  int num_rep = 1;
 
   // pending write record
   PendingOSDOp_t *p = new PendingOSDOp_t;
@@ -209,9 +202,9 @@ Filer::write(inodeno_t ino,
   
   // find data
   list<OSDExtent> extents;
-  osdcluster->file_to_extents(ino, len, offset, num_rep, extents);
+  osdcluster->file_to_extents(ino, len, offset, extents);
 
-  dout(7) << "osd write ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents on " << num_rep << " replicas" << endl;
+  dout(7) << "osd write ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents" << endl;
 
   size_t off = 0;  // ptr into buffer
 
@@ -222,8 +215,12 @@ Filer::write(inodeno_t ino,
        last_tid++;
        
        // issue write
-       MOSDWrite *m = new MOSDWrite(last_tid, it->oid, it->len, it->offset);
-
+       MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
+                                                  it->oid, it->rg, osdcluster->get_version(),
+                                                  OSD_OP_WRITE);
+       m->set_length(it->len);
+       m->set_offset(it->offset);
+       
        bufferlist cur;
        cur.substr_of(bl, off, it->len);
        m->set_data(cur);
@@ -236,7 +233,7 @@ Filer::write(inodeno_t ino,
 
        // send
        dout(15) << " write on " << last_tid << endl;
-       messenger->send_message(m, MSG_ADDR_OSD(it->osds[r]), 0);
+       messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0);
   }
 
   return 0;
@@ -244,7 +241,7 @@ Filer::write(inodeno_t ino,
 
 
 void
-Filer::handle_osd_write_reply(MOSDWriteReply *m)
+Filer::handle_osd_write_reply(MOSDOpReply *m)
 {
   // get pio
   tid_t tid = m->get_tid();
@@ -277,6 +274,25 @@ Filer::handle_osd_write_reply(MOSDWriteReply *m)
 void
 Filer::handle_osd_op_reply(MOSDOpReply *m)
 {
+  // updated cluster info?
+  if (m->get_ocv() && 
+         m->get_ocv() > osdcluster->get_version()) {
+       dout(3) << "op reply has newer cluster " << m->get_ocv() << " > " << osdcluster->get_version() << endl;
+       osdcluster->decode( m->get_osdcluster() );
+  }
+
+
+  // read or write?
+  switch (m->get_op()) {
+  case OSD_OP_READ:
+       handle_osd_read_reply(m);
+       return;
+  case OSD_OP_WRITE:
+       handle_osd_write_reply(m);
+       return;
+  }
+
+
   // get pio
   tid_t tid = m->get_tid();
   dout(15) << "handle_osd_op_reply on " << tid << endl;
@@ -322,17 +338,15 @@ Filer::handle_osd_op_reply(MOSDOpReply *m)
 
 int Filer::remove(inodeno_t ino, size_t size, Context *onfinish)
 {
-  int num_rep = 1;
-
   // pending write record
   PendingOSDOp_t *p = new PendingOSDOp_t;
   p->onfinish = onfinish;
   
   // find data
   list<OSDExtent> extents;
-  osdcluster->file_to_extents(ino, size, 0, num_rep, extents);
+  osdcluster->file_to_extents(ino, size, 0, extents);
 
-  dout(7) << "osd remove ino " << ino << " size " << size << " in " << extents.size() << " extents on " << num_rep << " replicas" << endl;
+  dout(7) << "osd remove ino " << ino << " size " << size << " in " << extents.size() << " extents" << endl;
 
   size_t off = 0;  // ptr into buffer
 
@@ -342,15 +356,15 @@ int Filer::remove(inodeno_t ino, size_t size, Context *onfinish)
        int r = 0;   // pick a replica
        last_tid++;
        
-       for (int r=0;r<num_rep; r++) {
-         // issue delete
-         MOSDOp *m = new MOSDOp(last_tid, it->oid, OSD_OP_DELETE);
-         messenger->send_message(m, MSG_ADDR_OSD(it->osds[r]), 0);
+       // issue delete
+       MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
+                                                  it->oid, it->rg, osdcluster->get_version(),
+                                                  OSD_OP_DELETE);
+       messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0);
        
-         // add to gather set
-         p->outstanding_ops.insert(last_tid);
-         op_removes[last_tid] = p;
-       }
+       // add to gather set
+       p->outstanding_ops.insert(last_tid);
+       op_removes[last_tid] = p;
   }
 
 }
@@ -379,8 +393,6 @@ int Filer::probe_size(inodeno_t ino, size_t *size, Context *onfinish)
 
 int Filer::mkfs(Context *onfinish)
 {
-  int num_rep = 1;
-  
   dout(7) << "mkfs, wiping all OSDs" << endl;
 
   // pending write record
@@ -398,7 +410,9 @@ int Filer::mkfs(Context *onfinish)
        ++last_tid;
 
        // issue mkfs
-       MOSDOp *m = new MOSDOp(last_tid, 0, OSD_OP_MKFS);
+       MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
+                                                  0, 0, osdcluster->get_version(), 
+                                                  OSD_OP_MKFS);
        messenger->send_message(m, MSG_ADDR_OSD(*it), 0);
        
        // add to gather set
@@ -447,9 +461,12 @@ int Filer::zero(inodeno_t ino,
        
        // issue zero
        MOSDOp *m;
-       if (it->len == new MOSDOp(last_tid, it->oid, OSD_OP_DELETE);
+       //if (it->len == 
+       m = new MOSDOp(last_tid, messenger->get_myaddr(),
+       it->oid, it->rg, osdcluster->get_version(), 
+                                  OSD_OP_DELETE);
        it->len, it->offset);
-       messenger->send_message(m, MSG_ADDR_OSD(it->osds[r]), 0);
+       messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0);
        
        // add to gather set
        p->outstanding_ops.insert(last_tid);
index d1ace4e6923ab61737b73c58e70de5ea0fde1db9..728b5c36f51e04c611eb02275a4839578bda2e7b 100644 (file)
@@ -105,8 +105,8 @@ class Filer : public Dispatcher {
 
   int mkfs(Context *c);
   
-  void handle_osd_read_reply(class MOSDReadReply *m);
-  void handle_osd_write_reply(class MOSDWriteReply *m);
+  void handle_osd_read_reply(class MOSDOpReply *m);
+  void handle_osd_write_reply(class MOSDOpReply *m);
   void handle_osd_op_reply(class MOSDOpReply *m);
   
 };
index 3a4163cb43958edc85d76b0d1dd0b9206e8614ba..be2298ff838d158cfdf38af8e0ff2084b8e942c0 100644 (file)
@@ -8,12 +8,12 @@ using namespace std;
 int main()
 {
 
-  bufferptr p1 = new buffer("hello",6);
+  bufferptr p1 = new buffer("123456",6);
   bufferptr p2 = p1;
 
   cout << "it is '" << p1.c_str() << "'" << endl;
 
-  bufferptr p3 = new buffer("there",6);
+  bufferptr p3 = new buffer("abcdef",6);
   
   cout << "p3 is " << p3 << endl;
 
@@ -26,10 +26,11 @@ int main()
 
   cout << "len is " << bl.length() << endl;
 
-  bl.splice(3,6);
+  bufferlist took;
+  bl.splice(10,4,&took);
 
-  cout << "bl is now " << bl << endl;
-  cout << "len is " << bl.length() << endl;
+  cout << "took out " << took << "leftover is " << bl << endl;
+  //cout << "len is " << bl.length() << endl;
 
   bufferlist bl2;
   bl2.substr_of(bl, 3, 5);