]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client request/reply encoded into buffers, not ropes
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 28 Jun 2005 19:09:19 +0000 (19:09 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 28 Jun 2005 19:09:19 +0000 (19:09 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@357 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/include/buffer.h
ceph/include/bufferlist.h
ceph/include/filepath.h
ceph/messages/MClientReply.h
ceph/messages/MClientRequest.h
ceph/msg/FakeMessenger.cc
ceph/msg/MPIMessenger.cc
ceph/msg/Message.h
ceph/msg/TCPMessenger.cc

index e6f02e842924e52c3a02b4a762fc78e11bd9e72d..ce0382fcae84bab4331a552a996241400207d7aa 100644 (file)
@@ -72,16 +72,21 @@ class buffer {
        }
   }
   
-  buffer(const char *p, int l, int mode=BUFFER_MODE_DEFAULT) : 
+  buffer(const char *p, int l, int mode=BUFFER_MODE_DEFAULT, int alloc_len=0) : 
        _dataptr(0), 
         _len(l), 
-        _alloc_len(l), 
         _ref(0),
         _myptr(0) {
+       
+       if (alloc_len) 
+         _alloc_len = alloc_len;
+       else
+         _alloc_len = l;
+
        _myptr = mode & BUFFER_MODE_FREE ? true:false;
        bdbout(1) << "buffer.cons " << *this << " mode = " << mode << ", myptr=" << _myptr << endl;
        if (mode & BUFFER_MODE_COPY) {
-         _dataptr = new char[l];
+         _dataptr = new char[_alloc_len];
          bdbout(1) << "buffer.malloc " << (void*)_dataptr << endl;
          buffer_total_alloc += _alloc_len;
          memcpy(_dataptr, p, l);
@@ -110,6 +115,7 @@ class buffer {
        _len = l;
   }
   int length() { return _len; }
+  int unused_tail_length() { return _alloc_len - _len; }
 
   friend ostream& operator<<(ostream& out, buffer& b);
 };
@@ -191,6 +197,13 @@ class bufferptr {
   }
 
 
+  bool at_buffer_head() {
+       return _off == 0;
+  }
+  bool at_buffer_tail() {
+       return _off + _len == _buffer->_len;
+  }
+
   // accessors for my subset
   char *c_str() {
        return _buffer->_dataptr + _off;
@@ -201,6 +214,11 @@ class bufferptr {
   int offset() {
        return _off;
   }
+  int unused_tail_length() {
+       if (!at_buffer_tail()) return 0;
+       return _buffer->unused_tail_length();
+  }
+
 
 
   // modifiers
index 6ed712086aa003b3a718f0e7f126c41a6ede90d4..0ff2801fe5335c4d26d708df6625362823dbc0bd 100644 (file)
@@ -179,9 +179,31 @@ class bufferlist {
 
   void append(const char *data, int len) {
        if (len == 0) return;
+
+       int alen = 0;
        
-       // just add another buffer
-       push_back(new buffer(data, len));  
+       // copy into the tail buffer?
+       if (!_buffers.empty()) {
+         int avail = _buffers.back().unused_tail_length();
+         if (avail > 0) {
+               //cout << "copying up to " << len << " into tail " << avail << " bytes of tail buf" << endl;
+               if (avail > len) 
+                 avail = len;
+               int blen = _buffers.back().length();
+               memcpy(_buffers.back().c_str() + blen, data, avail);
+               blen += avail;
+               _buffers.back().set_length(blen);
+               data += avail;
+               len -= avail;
+         }
+         alen = _buffers.back().length();
+       }
+       if (len == 0) return;
+
+       // just add another buffer.
+       // alloc a bit extra, in case we do a bunch of appends.   FIXME be smarter!
+       if (alen < 128) alen = 128;
+       push_back(new buffer(data, len, BUFFER_MODE_DEFAULT, alen));  
   }
   void append(bufferptr& bp) {
        push_back(bp);
@@ -328,6 +350,18 @@ inline ostream& operator<<(ostream& out, bufferlist& bl) {
 
 // encoder/decode helpers
 
+// string
+inline void _encode(string& s, bufferlist& bl) 
+{
+  bl.append(s.c_str(), s.length()+1);
+}
+inline void _decode(string& s, bufferlist& bl, int& off)
+{
+  s = bl.c_str() + off;
+  off += s.length() + 1;
+}
+
+
 // set<int>
 inline void _encode(set<int>& s, bufferlist& bl)
 {
index 9424ae5cd7cd9265db7958ae6a0638ecee1a0a6a..b3893bd94fd7b5c86abe9da4cb78ec9c9f1d5bef 100644 (file)
@@ -17,6 +17,9 @@ using namespace std;
 #include <ext/rope>
 using namespace __gnu_cxx;
 
+#include "bufferlist.h"
+
+
 class filepath {
   string path;
   vector<string> bits;
@@ -137,6 +140,29 @@ class filepath {
     }
   }
 
+  void _encode(bufferlist& bl) {
+    char n = bits.size();
+    bl.append((char*)&n, sizeof(char));
+    for (vector<string>::iterator it = bits.begin();
+         it != bits.end();
+         it++) { 
+      bl.append((*it).c_str(), (*it).length()+1);
+    }
+  }
+
+  void _decode(bufferlist& bl, int& off) {
+    clear();
+
+    char n;
+    bl.copy(off, sizeof(char), (char*)&n);
+    off += sizeof(char);
+    for (int i=0; i<n; i++) {
+      string s = bl.c_str() + off;
+      off += s.length() + 1;
+         add_dentry(s);
+    }
+  }
+
 };
 
 inline ostream& operator<<(ostream& out, filepath& path)
index 715ae51c0d26f354cbac1ed21dd398d19045b617..1706379b3408cc988b1d8f2dfc4577ade77a06c9 100644 (file)
@@ -62,27 +62,27 @@ class c_inode_info {
        in->get_dist_spec(this->dist, whoami);
   }
   
-  void _rope(crope &s) {
-       s.append((char*)&inode, sizeof(inode));
-       s.append((char*)&inode_soft_valid, sizeof(inode_soft_valid));
-       s.append((char*)&inode_hard_valid, sizeof(inode_hard_valid));
-
-       ::_rope(ref_dn, s);
-       ::_rope(symlink, s);
-       ::_rope(dist, s);       // distn
+  void _encode(bufferlist &bl) {
+       bl.append((char*)&inode, sizeof(inode));
+       bl.append((char*)&inode_soft_valid, sizeof(inode_soft_valid));
+       bl.append((char*)&inode_hard_valid, sizeof(inode_hard_valid));
+
+       ::_encode(ref_dn, bl);
+       ::_encode(symlink, bl);
+       ::_encode(dist, bl);    // distn
   }
   
-  void _unrope(crope &s, int& off) {
-       s.copy(off, sizeof(inode), (char*)&inode);
+  void _decode(bufferlist &bl, int& off) {
+       bl.copy(off, sizeof(inode), (char*)&inode);
        off += sizeof(inode);
-       s.copy(off, sizeof(inode_soft_valid), (char*)&inode_soft_valid);
+       bl.copy(off, sizeof(inode_soft_valid), (char*)&inode_soft_valid);
        off += sizeof(inode_soft_valid);
-       s.copy(off, sizeof(inode_hard_valid), (char*)&inode_hard_valid);
+       bl.copy(off, sizeof(inode_hard_valid), (char*)&inode_hard_valid);
        off += sizeof(inode_hard_valid);
 
-       ::_unrope(ref_dn, s, off);
-       ::_unrope(symlink, s, off);
-       ::_unrope(dist, s, off);
+       ::_decode(ref_dn, bl, off);
+       ::_decode(symlink, bl, off);
+       ::_decode(dist, bl, off);
   }
 };
 
@@ -146,40 +146,40 @@ class MClientReply : public Message {
 
 
   // serialization
-  virtual void decode_payload(crope& s, int& off) {
-       s.copy(off, sizeof(st), (char*)&st);
+  virtual void decode_payload() {
+       int off = 0;
+       payload.copy(off, sizeof(st), (char*)&st);
        off += sizeof(st);
 
-       path = s.c_str() + off;
-       off += path.length() + 1;
+       _decode(path, payload, off);
 
        for (int i=0; i<st.trace_depth; i++) {
          c_inode_info *ci = new c_inode_info;
-         ci->_unrope(s, off);
+         ci->_decode(payload, off);
          trace.push_back(ci);
        }
 
        if (st.dir_size) {
          for (int i=0; i<st.dir_size; i++) {
                c_inode_info *ci = new c_inode_info;
-               ci->_unrope(s, off);
+               ci->_decode(payload, off);
                dir_contents.push_back(ci);
          }
        }
   }
-  virtual void encode_payload(crope& r) {
+  virtual void encode_payload() {
        st.dir_size = dir_contents.size();
        st.trace_depth = trace.size();
        
-       r.append((char*)&st, sizeof(st));
-       r.append(path.c_str(), path.length()+1);
+       payload.append((char*)&st, sizeof(st));
+       _encode(path, payload);
 
        vector<c_inode_info*>::iterator it;
        for (it = trace.begin(); it != trace.end(); it++) 
-         (*it)->_rope(r);
+         (*it)->_encode(payload);
 
        for (it = dir_contents.begin(); it != dir_contents.end(); it++) 
-         (*it)->_rope(r);
+         (*it)->_encode(payload);
   }
 
   // builders
index 1b667a6422e32adf66343e4392e975ed5407f974..80036b9393e74e32aeb3f1dd90456c5239ee3a18 100644 (file)
@@ -101,19 +101,20 @@ class MClientRequest : public Message {
   string& get_sarg2() { return sarg2; }
   size_t get_sizearg() { return st.sizearg; }
 
-  virtual void decode_payload(crope& s, int& off) {
-       s.copy(off, sizeof(st), (char*)&st);
+  virtual void decode_payload() {
+       int off;
+       payload.copy(off, sizeof(st), (char*)&st);
        off += sizeof(st);
-       path._unrope(s, off);
-       _unrope(sarg, s, off);
-       _unrope(sarg2, s, off);
+       path._decode(payload, off);
+       _decode(sarg, payload, off);
+       _decode(sarg2, payload, off);
   }
 
-  virtual void encode_payload(crope& r) {
-       r.append((char*)&st, sizeof(st));
-       path._rope(r);
-       _rope(sarg, r);
-       _rope(sarg2, r);
+  virtual void encode_payload() {
+       payload.append((char*)&st, sizeof(st));
+       path._encode(payload);
+       _encode(sarg, payload);
+       _encode(sarg2, payload);
   }
 };
 
index 32e03e269e58e9a10a3b17c2889b169366f1a99b..02ea99cc8db0fa17260b9e4c7cf352fbe22081e4 100644 (file)
@@ -155,6 +155,7 @@ int fakemessenger_do_loop_2()
                
                if (g_conf.fakemessenger_serialize) {
                  // encode
+                 m->reset_payload();
                  m->encode_payload();
                  msg_envelope_t env = m->get_envelope();
                  bufferlist bl = m->get_payload();
index 9748998da3895e473a8d413f8ee46443442ede6e..e7406bd89e55f535627f932ee32158f204a1e28e 100644 (file)
@@ -241,6 +241,7 @@ int mpi_send(Message *m, int tag)
   } 
 
   // marshall
+  m->reset_payload();
   m->encode_payload();
   msg_envelope_t *env = &m->get_envelope();
   bufferlist blist = m->get_payload();
index 83b739a80e55013bd17f25d300d977865468fd36..4eb97457d1df8913690aa245a6c824de236005a5 100644 (file)
@@ -192,6 +192,10 @@ class Message {
 
 
   // PAYLOAD ----
+  void reset_payload() {
+       payload.clear();
+  }
+
   // overload either the rope version (easier!)
   virtual void encode_payload(crope& s)           { assert(0); }
   virtual void decode_payload(crope& s, int& off) { assert(0); }
@@ -207,12 +211,13 @@ class Message {
        assert(off == payload.length());
   }
   virtual void encode_payload() {
+       assert(payload.length() == 0);  // caller should reset payload
+
        // use crope for convenience, small messages. FIXME someday.
        crope r;
        encode_payload(r);
 
        // copy payload
-       payload.clear();
        payload.push_back( new buffer(r.c_str(), r.length()) );
   }
   
index 2209d256a42d09222133ce40228f46302384f7c8..23b3fb3252cc6efbeb0e2f6d01d946486fc3c674 100644 (file)
@@ -412,6 +412,7 @@ int tcp_send(Message *m)
   int rank = MPI_DEST_TO_RANK(m->get_dest(), mpi_world);
 
   // marshall
+  m->reset_payload();
   m->encode_payload();
   msg_envelope_t *env = &m->get_envelope();
   bufferlist blist = m->get_payload();