From 3d7b6b3513ec047b444dd90a441bb95ee1d750d5 Mon Sep 17 00:00:00 2001 From: sage Date: Tue, 28 Jun 2005 19:09:19 +0000 Subject: [PATCH] client request/reply encoded into buffers, not ropes git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@357 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/include/buffer.h | 24 ++++++++++++++-- ceph/include/bufferlist.h | 38 +++++++++++++++++++++++-- ceph/include/filepath.h | 26 +++++++++++++++++ ceph/messages/MClientReply.h | 52 +++++++++++++++++----------------- ceph/messages/MClientRequest.h | 21 +++++++------- ceph/msg/FakeMessenger.cc | 1 + ceph/msg/MPIMessenger.cc | 1 + ceph/msg/Message.h | 7 ++++- ceph/msg/TCPMessenger.cc | 1 + 9 files changed, 129 insertions(+), 42 deletions(-) diff --git a/ceph/include/buffer.h b/ceph/include/buffer.h index e6f02e842924e..ce0382fcae84b 100644 --- a/ceph/include/buffer.h +++ b/ceph/include/buffer.h @@ -72,16 +72,21 @@ class buffer { } } - buffer(const char *p, int l, int mode=BUFFER_MODE_DEFAULT) : + buffer(const char *p, int l, int mode=BUFFER_MODE_DEFAULT, int alloc_len=0) : _dataptr(0), _len(l), - _alloc_len(l), _ref(0), _myptr(0) { + + if (alloc_len) + _alloc_len = alloc_len; + else + _alloc_len = l; + _myptr = mode & BUFFER_MODE_FREE ? true:false; bdbout(1) << "buffer.cons " << *this << " mode = " << mode << ", myptr=" << _myptr << endl; if (mode & BUFFER_MODE_COPY) { - _dataptr = new char[l]; + _dataptr = new char[_alloc_len]; bdbout(1) << "buffer.malloc " << (void*)_dataptr << endl; buffer_total_alloc += _alloc_len; memcpy(_dataptr, p, l); @@ -110,6 +115,7 @@ class buffer { _len = l; } int length() { return _len; } + int unused_tail_length() { return _alloc_len - _len; } friend ostream& operator<<(ostream& out, buffer& b); }; @@ -191,6 +197,13 @@ class bufferptr { } + bool at_buffer_head() { + return _off == 0; + } + bool at_buffer_tail() { + return _off + _len == _buffer->_len; + } + // accessors for my subset char *c_str() { return _buffer->_dataptr + _off; @@ -201,6 +214,11 @@ class bufferptr { int offset() { return _off; } + int unused_tail_length() { + if (!at_buffer_tail()) return 0; + return _buffer->unused_tail_length(); + } + // modifiers diff --git a/ceph/include/bufferlist.h b/ceph/include/bufferlist.h index 6ed712086aa00..0ff2801fe5335 100644 --- a/ceph/include/bufferlist.h +++ b/ceph/include/bufferlist.h @@ -179,9 +179,31 @@ class bufferlist { void append(const char *data, int len) { if (len == 0) return; + + int alen = 0; - // just add another buffer - push_back(new buffer(data, len)); + // copy into the tail buffer? + if (!_buffers.empty()) { + int avail = _buffers.back().unused_tail_length(); + if (avail > 0) { + //cout << "copying up to " << len << " into tail " << avail << " bytes of tail buf" << endl; + if (avail > len) + avail = len; + int blen = _buffers.back().length(); + memcpy(_buffers.back().c_str() + blen, data, avail); + blen += avail; + _buffers.back().set_length(blen); + data += avail; + len -= avail; + } + alen = _buffers.back().length(); + } + if (len == 0) return; + + // just add another buffer. + // alloc a bit extra, in case we do a bunch of appends. FIXME be smarter! + if (alen < 128) alen = 128; + push_back(new buffer(data, len, BUFFER_MODE_DEFAULT, alen)); } void append(bufferptr& bp) { push_back(bp); @@ -328,6 +350,18 @@ inline ostream& operator<<(ostream& out, bufferlist& bl) { // encoder/decode helpers +// string +inline void _encode(string& s, bufferlist& bl) +{ + bl.append(s.c_str(), s.length()+1); +} +inline void _decode(string& s, bufferlist& bl, int& off) +{ + s = bl.c_str() + off; + off += s.length() + 1; +} + + // set inline void _encode(set& s, bufferlist& bl) { diff --git a/ceph/include/filepath.h b/ceph/include/filepath.h index 9424ae5cd7cd9..b3893bd94fd7b 100644 --- a/ceph/include/filepath.h +++ b/ceph/include/filepath.h @@ -17,6 +17,9 @@ using namespace std; #include using namespace __gnu_cxx; +#include "bufferlist.h" + + class filepath { string path; vector bits; @@ -137,6 +140,29 @@ class filepath { } } + void _encode(bufferlist& bl) { + char n = bits.size(); + bl.append((char*)&n, sizeof(char)); + for (vector::iterator it = bits.begin(); + it != bits.end(); + it++) { + bl.append((*it).c_str(), (*it).length()+1); + } + } + + void _decode(bufferlist& bl, int& off) { + clear(); + + char n; + bl.copy(off, sizeof(char), (char*)&n); + off += sizeof(char); + for (int i=0; iget_dist_spec(this->dist, whoami); } - void _rope(crope &s) { - s.append((char*)&inode, sizeof(inode)); - s.append((char*)&inode_soft_valid, sizeof(inode_soft_valid)); - s.append((char*)&inode_hard_valid, sizeof(inode_hard_valid)); - - ::_rope(ref_dn, s); - ::_rope(symlink, s); - ::_rope(dist, s); // distn + void _encode(bufferlist &bl) { + bl.append((char*)&inode, sizeof(inode)); + bl.append((char*)&inode_soft_valid, sizeof(inode_soft_valid)); + bl.append((char*)&inode_hard_valid, sizeof(inode_hard_valid)); + + ::_encode(ref_dn, bl); + ::_encode(symlink, bl); + ::_encode(dist, bl); // distn } - void _unrope(crope &s, int& off) { - s.copy(off, sizeof(inode), (char*)&inode); + void _decode(bufferlist &bl, int& off) { + bl.copy(off, sizeof(inode), (char*)&inode); off += sizeof(inode); - s.copy(off, sizeof(inode_soft_valid), (char*)&inode_soft_valid); + bl.copy(off, sizeof(inode_soft_valid), (char*)&inode_soft_valid); off += sizeof(inode_soft_valid); - s.copy(off, sizeof(inode_hard_valid), (char*)&inode_hard_valid); + bl.copy(off, sizeof(inode_hard_valid), (char*)&inode_hard_valid); off += sizeof(inode_hard_valid); - ::_unrope(ref_dn, s, off); - ::_unrope(symlink, s, off); - ::_unrope(dist, s, off); + ::_decode(ref_dn, bl, off); + ::_decode(symlink, bl, off); + ::_decode(dist, bl, off); } }; @@ -146,40 +146,40 @@ class MClientReply : public Message { // serialization - virtual void decode_payload(crope& s, int& off) { - s.copy(off, sizeof(st), (char*)&st); + virtual void decode_payload() { + int off = 0; + payload.copy(off, sizeof(st), (char*)&st); off += sizeof(st); - path = s.c_str() + off; - off += path.length() + 1; + _decode(path, payload, off); for (int i=0; i_unrope(s, off); + ci->_decode(payload, off); trace.push_back(ci); } if (st.dir_size) { for (int i=0; i_unrope(s, off); + ci->_decode(payload, off); dir_contents.push_back(ci); } } } - virtual void encode_payload(crope& r) { + virtual void encode_payload() { st.dir_size = dir_contents.size(); st.trace_depth = trace.size(); - r.append((char*)&st, sizeof(st)); - r.append(path.c_str(), path.length()+1); + payload.append((char*)&st, sizeof(st)); + _encode(path, payload); vector::iterator it; for (it = trace.begin(); it != trace.end(); it++) - (*it)->_rope(r); + (*it)->_encode(payload); for (it = dir_contents.begin(); it != dir_contents.end(); it++) - (*it)->_rope(r); + (*it)->_encode(payload); } // builders diff --git a/ceph/messages/MClientRequest.h b/ceph/messages/MClientRequest.h index 1b667a6422e32..80036b9393e74 100644 --- a/ceph/messages/MClientRequest.h +++ b/ceph/messages/MClientRequest.h @@ -101,19 +101,20 @@ class MClientRequest : public Message { string& get_sarg2() { return sarg2; } size_t get_sizearg() { return st.sizearg; } - virtual void decode_payload(crope& s, int& off) { - s.copy(off, sizeof(st), (char*)&st); + virtual void decode_payload() { + int off; + payload.copy(off, sizeof(st), (char*)&st); off += sizeof(st); - path._unrope(s, off); - _unrope(sarg, s, off); - _unrope(sarg2, s, off); + path._decode(payload, off); + _decode(sarg, payload, off); + _decode(sarg2, payload, off); } - virtual void encode_payload(crope& r) { - r.append((char*)&st, sizeof(st)); - path._rope(r); - _rope(sarg, r); - _rope(sarg2, r); + virtual void encode_payload() { + payload.append((char*)&st, sizeof(st)); + path._encode(payload); + _encode(sarg, payload); + _encode(sarg2, payload); } }; diff --git a/ceph/msg/FakeMessenger.cc b/ceph/msg/FakeMessenger.cc index 32e03e269e58e..02ea99cc8db0f 100644 --- a/ceph/msg/FakeMessenger.cc +++ b/ceph/msg/FakeMessenger.cc @@ -155,6 +155,7 @@ int fakemessenger_do_loop_2() if (g_conf.fakemessenger_serialize) { // encode + m->reset_payload(); m->encode_payload(); msg_envelope_t env = m->get_envelope(); bufferlist bl = m->get_payload(); diff --git a/ceph/msg/MPIMessenger.cc b/ceph/msg/MPIMessenger.cc index 9748998da3895..e7406bd89e55f 100644 --- a/ceph/msg/MPIMessenger.cc +++ b/ceph/msg/MPIMessenger.cc @@ -241,6 +241,7 @@ int mpi_send(Message *m, int tag) } // marshall + m->reset_payload(); m->encode_payload(); msg_envelope_t *env = &m->get_envelope(); bufferlist blist = m->get_payload(); diff --git a/ceph/msg/Message.h b/ceph/msg/Message.h index 83b739a80e550..4eb97457d1df8 100644 --- a/ceph/msg/Message.h +++ b/ceph/msg/Message.h @@ -192,6 +192,10 @@ class Message { // PAYLOAD ---- + void reset_payload() { + payload.clear(); + } + // overload either the rope version (easier!) virtual void encode_payload(crope& s) { assert(0); } virtual void decode_payload(crope& s, int& off) { assert(0); } @@ -207,12 +211,13 @@ class Message { assert(off == payload.length()); } virtual void encode_payload() { + assert(payload.length() == 0); // caller should reset payload + // use crope for convenience, small messages. FIXME someday. crope r; encode_payload(r); // copy payload - payload.clear(); payload.push_back( new buffer(r.c_str(), r.length()) ); } diff --git a/ceph/msg/TCPMessenger.cc b/ceph/msg/TCPMessenger.cc index 2209d256a42d0..23b3fb3252cc6 100644 --- a/ceph/msg/TCPMessenger.cc +++ b/ceph/msg/TCPMessenger.cc @@ -412,6 +412,7 @@ int tcp_send(Message *m) int rank = MPI_DEST_TO_RANK(m->get_dest(), mpi_world); // marshall + m->reset_payload(); m->encode_payload(); msg_envelope_t *env = &m->get_envelope(); bufferlist blist = m->get_payload(); -- 2.39.5