}
}
- buffer(const char *p, int l, int mode=BUFFER_MODE_DEFAULT) :
+ buffer(const char *p, int l, int mode=BUFFER_MODE_DEFAULT, int alloc_len=0) :
_dataptr(0),
_len(l),
- _alloc_len(l),
_ref(0),
_myptr(0) {
+
+ if (alloc_len)
+ _alloc_len = alloc_len;
+ else
+ _alloc_len = l;
+
_myptr = mode & BUFFER_MODE_FREE ? true:false;
bdbout(1) << "buffer.cons " << *this << " mode = " << mode << ", myptr=" << _myptr << endl;
if (mode & BUFFER_MODE_COPY) {
- _dataptr = new char[l];
+ _dataptr = new char[_alloc_len];
bdbout(1) << "buffer.malloc " << (void*)_dataptr << endl;
buffer_total_alloc += _alloc_len;
memcpy(_dataptr, p, l);
_len = l;
}
int length() { return _len; }
+ int unused_tail_length() { return _alloc_len - _len; }
friend ostream& operator<<(ostream& out, buffer& b);
};
}
+ bool at_buffer_head() {
+ return _off == 0;
+ }
+ bool at_buffer_tail() {
+ return _off + _len == _buffer->_len;
+ }
+
// accessors for my subset
char *c_str() {
return _buffer->_dataptr + _off;
int offset() {
return _off;
}
+ int unused_tail_length() {
+ if (!at_buffer_tail()) return 0;
+ return _buffer->unused_tail_length();
+ }
+
// modifiers
void append(const char *data, int len) {
if (len == 0) return;
+
+ int alen = 0;
- // just add another buffer
- push_back(new buffer(data, len));
+ // copy into the tail buffer?
+ if (!_buffers.empty()) {
+ int avail = _buffers.back().unused_tail_length();
+ if (avail > 0) {
+ //cout << "copying up to " << len << " into tail " << avail << " bytes of tail buf" << endl;
+ if (avail > len)
+ avail = len;
+ int blen = _buffers.back().length();
+ memcpy(_buffers.back().c_str() + blen, data, avail);
+ blen += avail;
+ _buffers.back().set_length(blen);
+ data += avail;
+ len -= avail;
+ }
+ alen = _buffers.back().length();
+ }
+ if (len == 0) return;
+
+ // just add another buffer.
+ // alloc a bit extra, in case we do a bunch of appends. FIXME be smarter!
+ if (alen < 128) alen = 128;
+ push_back(new buffer(data, len, BUFFER_MODE_DEFAULT, alen));
}
void append(bufferptr& bp) {
push_back(bp);
// encoder/decode helpers
+// string
+inline void _encode(string& s, bufferlist& bl)
+{
+ bl.append(s.c_str(), s.length()+1);
+}
+inline void _decode(string& s, bufferlist& bl, int& off)
+{
+ s = bl.c_str() + off;
+ off += s.length() + 1;
+}
+
+
// set<int>
inline void _encode(set<int>& s, bufferlist& bl)
{
#include <ext/rope>
using namespace __gnu_cxx;
+#include "bufferlist.h"
+
+
class filepath {
string path;
vector<string> bits;
}
}
+ void _encode(bufferlist& bl) {
+ char n = bits.size();
+ bl.append((char*)&n, sizeof(char));
+ for (vector<string>::iterator it = bits.begin();
+ it != bits.end();
+ it++) {
+ bl.append((*it).c_str(), (*it).length()+1);
+ }
+ }
+
+ void _decode(bufferlist& bl, int& off) {
+ clear();
+
+ char n;
+ bl.copy(off, sizeof(char), (char*)&n);
+ off += sizeof(char);
+ for (int i=0; i<n; i++) {
+ string s = bl.c_str() + off;
+ off += s.length() + 1;
+ add_dentry(s);
+ }
+ }
+
};
inline ostream& operator<<(ostream& out, filepath& path)
in->get_dist_spec(this->dist, whoami);
}
- void _rope(crope &s) {
- s.append((char*)&inode, sizeof(inode));
- s.append((char*)&inode_soft_valid, sizeof(inode_soft_valid));
- s.append((char*)&inode_hard_valid, sizeof(inode_hard_valid));
-
- ::_rope(ref_dn, s);
- ::_rope(symlink, s);
- ::_rope(dist, s); // distn
+ void _encode(bufferlist &bl) {
+ bl.append((char*)&inode, sizeof(inode));
+ bl.append((char*)&inode_soft_valid, sizeof(inode_soft_valid));
+ bl.append((char*)&inode_hard_valid, sizeof(inode_hard_valid));
+
+ ::_encode(ref_dn, bl);
+ ::_encode(symlink, bl);
+ ::_encode(dist, bl); // distn
}
- void _unrope(crope &s, int& off) {
- s.copy(off, sizeof(inode), (char*)&inode);
+ void _decode(bufferlist &bl, int& off) {
+ bl.copy(off, sizeof(inode), (char*)&inode);
off += sizeof(inode);
- s.copy(off, sizeof(inode_soft_valid), (char*)&inode_soft_valid);
+ bl.copy(off, sizeof(inode_soft_valid), (char*)&inode_soft_valid);
off += sizeof(inode_soft_valid);
- s.copy(off, sizeof(inode_hard_valid), (char*)&inode_hard_valid);
+ bl.copy(off, sizeof(inode_hard_valid), (char*)&inode_hard_valid);
off += sizeof(inode_hard_valid);
- ::_unrope(ref_dn, s, off);
- ::_unrope(symlink, s, off);
- ::_unrope(dist, s, off);
+ ::_decode(ref_dn, bl, off);
+ ::_decode(symlink, bl, off);
+ ::_decode(dist, bl, off);
}
};
// serialization
- virtual void decode_payload(crope& s, int& off) {
- s.copy(off, sizeof(st), (char*)&st);
+ virtual void decode_payload() {
+ int off = 0;
+ payload.copy(off, sizeof(st), (char*)&st);
off += sizeof(st);
- path = s.c_str() + off;
- off += path.length() + 1;
+ _decode(path, payload, off);
for (int i=0; i<st.trace_depth; i++) {
c_inode_info *ci = new c_inode_info;
- ci->_unrope(s, off);
+ ci->_decode(payload, off);
trace.push_back(ci);
}
if (st.dir_size) {
for (int i=0; i<st.dir_size; i++) {
c_inode_info *ci = new c_inode_info;
- ci->_unrope(s, off);
+ ci->_decode(payload, off);
dir_contents.push_back(ci);
}
}
}
- virtual void encode_payload(crope& r) {
+ virtual void encode_payload() {
st.dir_size = dir_contents.size();
st.trace_depth = trace.size();
- r.append((char*)&st, sizeof(st));
- r.append(path.c_str(), path.length()+1);
+ payload.append((char*)&st, sizeof(st));
+ _encode(path, payload);
vector<c_inode_info*>::iterator it;
for (it = trace.begin(); it != trace.end(); it++)
- (*it)->_rope(r);
+ (*it)->_encode(payload);
for (it = dir_contents.begin(); it != dir_contents.end(); it++)
- (*it)->_rope(r);
+ (*it)->_encode(payload);
}
// builders
string& get_sarg2() { return sarg2; }
size_t get_sizearg() { return st.sizearg; }
- virtual void decode_payload(crope& s, int& off) {
- s.copy(off, sizeof(st), (char*)&st);
+ virtual void decode_payload() {
+ int off;
+ payload.copy(off, sizeof(st), (char*)&st);
off += sizeof(st);
- path._unrope(s, off);
- _unrope(sarg, s, off);
- _unrope(sarg2, s, off);
+ path._decode(payload, off);
+ _decode(sarg, payload, off);
+ _decode(sarg2, payload, off);
}
- virtual void encode_payload(crope& r) {
- r.append((char*)&st, sizeof(st));
- path._rope(r);
- _rope(sarg, r);
- _rope(sarg2, r);
+ virtual void encode_payload() {
+ payload.append((char*)&st, sizeof(st));
+ path._encode(payload);
+ _encode(sarg, payload);
+ _encode(sarg2, payload);
}
};
if (g_conf.fakemessenger_serialize) {
// encode
+ m->reset_payload();
m->encode_payload();
msg_envelope_t env = m->get_envelope();
bufferlist bl = m->get_payload();
}
// marshall
+ m->reset_payload();
m->encode_payload();
msg_envelope_t *env = &m->get_envelope();
bufferlist blist = m->get_payload();
// PAYLOAD ----
+ void reset_payload() {
+ payload.clear();
+ }
+
// overload either the rope version (easier!)
virtual void encode_payload(crope& s) { assert(0); }
virtual void decode_payload(crope& s, int& off) { assert(0); }
assert(off == payload.length());
}
virtual void encode_payload() {
+ assert(payload.length() == 0); // caller should reset payload
+
// use crope for convenience, small messages. FIXME someday.
crope r;
encode_payload(r);
// copy payload
- payload.clear();
payload.push_back( new buffer(r.c_str(), r.length()) );
}
int rank = MPI_DEST_TO_RANK(m->get_dest(), mpi_world);
// marshall
+ m->reset_payload();
m->encode_payload();
msg_envelope_t *env = &m->get_envelope();
bufferlist blist = m->get_payload();