+- change same_inst_since to align with "in" set
+- tag MClientRequest with mdsmap v
+- push new mdsmap to clients on send_message_client, based on the tag?
+ - hrm, what about exports and stale caps wonkiness... there's a race with the REAP. hmm.
+
+
some smallish projects:
- crush rewrite in C
- pg_bit/pg_num changes
- report crashed pgs?
+messenger
+- fix messenger shutdown.. we shouldn't delete messenger, since the caller may be referencing it, etc.
+
simplemessenger
- close idle connections
- buffer sent messages until a receive is acknowledged (handshake!)
char *c_str() { assert(_raw); return _raw->data + _off; }
unsigned length() const { return _len; }
unsigned offset() const { return _off; }
- unsigned unused_tail_length() const { return _raw->len - (_off+_len); }
+ unsigned start() const { return _off; }
+ unsigned end() const { return _off + _len; }
+ unsigned unused_tail_length() const {
+ if (_raw)
+ return _raw->len - (_off+_len);
+ else
+ return 0;
+ }
const char& operator[](unsigned n) const {
assert(_raw);
assert(n < _len);
// my private bits
std::list<ptr> _buffers;
unsigned _len;
+ ptr append_buffer; // where i put small appends.
public:
// cons/des
const std::list<ptr>& buffers() const { return _buffers; }
unsigned length() const {
-#if 0
+#if 1
// DEBUG: verify _len
unsigned len = 0;
- for (std::list<ptr>::iterator it = _buffers.begin();
+ for (std::list<ptr>::const_iterator it = _buffers.begin();
it != _buffers.end();
it++) {
len += (*it).length();
void append(const char *data, unsigned len) {
- if (len == 0) return;
-
- unsigned alen = 0;
-
- // copy into the tail buffer?
- if (!_buffers.empty()) {
- unsigned avail = _buffers.back().unused_tail_length();
- if (avail > 0) {
- //std::cout << "copying up to " << len << " into tail " << avail << " bytes of tail buf " << _buffers.back() << std::endl;
- if (avail > len)
- avail = len;
- _buffers.back().append(data, avail);
- _len += avail;
- data += avail;
- len -= avail;
+ while (len > 0) {
+ // put what we can into the existing append_buffer.
+ if (append_buffer.unused_tail_length() > 0) {
+ unsigned gap = append_buffer.unused_tail_length();
+ if (gap > len) gap = len;
+ append_buffer.append(data, gap);
+ append(append_buffer, append_buffer.end() - gap, gap); // add segment to the list
+ len -= gap;
+ data += gap;
}
- alen = _buffers.back().length();
+ if (len == 0) break; // done!
+
+ // make a new append_buffer!
+ unsigned alen = BUFFER_PAGE_SIZE * (((len-1) / BUFFER_PAGE_SIZE) + 1);
+ append_buffer = create_page_aligned(alen);
+ append_buffer.set_length(0); // unused, so far.
}
- if (len == 0) return;
-
- // just add another buffer.
- // alloc a bit extra, in case we do a bunch of appends. FIXME be smarter!
- if (alen < 4096) alen = 4096;
- ptr bp = create(alen);
- bp.set_length(len);
- bp.copy_in(0, len, data);
- push_back(bp);
}
void append(ptr& bp) {
push_back(bp);
push_back(tempbp);
}
void append(const list& bl) {
- list temp(bl); // copy list
- claim_append(temp); // and append
+ _len += bl._len;
+ for (std::list<ptr>::const_iterator p = bl._buffers.begin();
+ p != bl._buffers.end();
+ ++p)
+ _buffers.push_back(*p);
}
}
}
- void substr_of(list& other, unsigned off, unsigned len) {
+ void substr_of(const list& other, unsigned off, unsigned len) {
assert(off + len <= other.length());
clear();
// skip off
- std::list<ptr>::iterator curbuf = other._buffers.begin();
+ std::list<ptr>::const_iterator curbuf = other._buffers.begin();
while (off > 0) {
assert(curbuf != _buffers.end());
if (off >= (*curbuf).length()) {
inline void _encode(const bufferlist& s, bufferlist& bl)
{
uint32_t len = s.length();
+ cout << "_encode bufferlist len " << len << endl;
_encoderaw(len, bl);
bl.append(s);
}
bool anchored; // auth only?
// file (data access)
- off_t size, max_size;
+ off_t size, max_size, allocated_size;
utime_t mtime; // file data modify time.
utime_t atime; // file data access time.
}
- MOSDMap() :
- Message(MSG_OSD_MAP) {}
- MOSDMap(OSDMap *oc) :
- Message(MSG_OSD_MAP) {
+ MOSDMap() : Message(MSG_OSD_MAP) { }
+ MOSDMap(OSDMap *oc) : Message(MSG_OSD_MAP) {
oc->encode(maps[oc->get_epoch()]);
}
// reply with latest mds, osd maps
mon->mdsmon->send_latest(to);
- mon->osdmon->send_latest(0, to);
+ mon->osdmon->send_latest(to);
delete m;
}
{
if (m->get_state() == MDSMap::STATE_BOOT) {
dout(10) << "_updated (booted) mds" << from << " " << *m << endl;
- mon->osdmon->send_latest(0, mdsmap.get_inst(from));
+ mon->osdmon->send_latest(mdsmap.get_inst(from));
} else {
dout(10) << "_updated mds" << from << " " << *m << endl;
}
// tell a random osd
int osd = rand() % g_conf.num_osd;
- send_latest(0, osdmap.get_inst(osd));
+ send_latest(osdmap.get_inst(osd));
}
}
propose_pending();
- send_latest(0, osdmap.get_inst(r)); // after
+ send_latest(osdmap.get_inst(r)); // after
}
void OSDMonitor::_reported_failure(MOSDFailure *m)
{
dout(7) << "_reported_failure on " << m->get_failed() << ", telling " << m->get_from() << endl;
- send_latest(m->get_epoch(), m->get_from());
+ send_latest(m->get_from(), m->get_epoch());
}
void OSDMonitor::_booted(MOSDBoot *m)
{
dout(7) << "_booted " << m->inst << endl;
- send_latest(m->sb.current_epoch, m->inst);
+ send_latest(m->inst, m->sb.current_epoch);
delete m;
}
}
}
-void OSDMonitor::send_latest(epoch_t since, entity_inst_t who)
+void OSDMonitor::send_latest(entity_inst_t who, epoch_t since)
{
if (paxos->is_readable()) {
dout(5) << "send_latest to " << who << " now" << endl;
- if (since)
- send_incremental(since, who);
- else
+ if (since == (epoch_t)(-1))
send_full(who);
+ else
+ send_incremental(since, who);
} else {
dout(5) << "send_latest to " << who << " later" << endl;
awaiting_map[who.name].first = who;
e--) {
bufferlist bl;
if (mon->store->get_bl_sn(bl, "osdmap", e) > 0) {
- dout(20) << "send_incremental inc " << e << endl;
+ dout(20) << "send_incremental inc " << e << " " << bl.length() << " bytes" << endl;
m->incremental_maps[e] = bl;
}
else if (mon->store->get_bl_sn(bl, "osdmap_full", e) > 0) {
void mark_all_down();
- void send_latest(epoch_t since, entity_inst_t i);
+ void send_latest(entity_inst_t i, epoch_t since=(epoch_t)(-1));
void fake_osd_failure(int osd, bool down);
void fake_osdmap_update();
int FakeStore::write(object_t oid,
off_t offset, size_t len,
- bufferlist& bl,
+ const bufferlist& bl,
Context *onsafe)
{
dout(20) << "write " << oid << " len " << len << " off " << offset << endl;
int remove(object_t oid, Context *onsafe);
int truncate(object_t oid, off_t size, Context *onsafe);
int read(object_t oid, off_t offset, size_t len, bufferlist& bl);
- int write(object_t oid, off_t offset, size_t len, bufferlist& bl, Context *onsafe);
+ int write(object_t oid, off_t offset, size_t len, const bufferlist& bl, Context *onsafe);
void sync();
void sync(Context *onsafe);
dout(10) << "handle_osd_map decoding inc map epoch " << cur+1 << dendl;
bufferlist bl;
- if (m->incremental_maps.count(cur+1))
+ if (m->incremental_maps.count(cur+1)) {
+ dout(10) << " using provided inc map" << endl;
bl = m->incremental_maps[cur+1];
- else
+ } else {
+ dout(10) << " using my locally stored inc map" << endl;
get_inc_map_bl(cur+1, bl);
+ }
OSDMap::Incremental inc;
int off = 0;
list<int> old_overload; // no longer overload
void encode(bufferlist& bl) {
- bl.append((char*)&epoch, sizeof(epoch));
- bl.append((char*)&mon_epoch, sizeof(mon_epoch));
- bl.append((char*)&ctime, sizeof(ctime));
+ ::_encode(epoch, bl);
+ ::_encode(mon_epoch, bl);
+ ::_encode(ctime, bl);
::_encode(new_up, bl);
::_encode(new_down, bl);
::_encode(new_in, bl);
::_encode(fullmap, bl);
}
void decode(bufferlist& bl, int& off) {
- bl.copy(off, sizeof(epoch), (char*)&epoch);
- off += sizeof(epoch);
- bl.copy(off, sizeof(mon_epoch), (char*)&mon_epoch);
- off += sizeof(mon_epoch);
- bl.copy(off, sizeof(ctime), (char*)&ctime);
- off += sizeof(ctime);
+ ::_decode(epoch, bl, off);
+ ::_decode(mon_epoch, bl, off);
+ ::_decode(ctime, bl, off);
::_decode(new_up, bl, off);
::_decode(new_down, bl, off);
::_decode(new_in, bl, off);
pattrsets.push_back(&aset);
}
- void write(object_t oid, off_t off, size_t len, bufferlist& bl) {
+ void write(object_t oid, off_t off, size_t len, const bufferlist& bl) {
int op = OP_WRITE;
ops.push_back(op);
oids.push_back(oid);
virtual int read(object_t oid,
off_t offset, size_t len,
bufferlist& bl) = 0;
-
- /*virtual int write(object_t oid,
- off_t offset, size_t len,
- bufferlist& bl,
- bool fsync=true) = 0;
- */
virtual int write(object_t oid,
off_t offset, size_t len,
- bufferlist& bl,
+ const bufferlist& bl,
Context *onsafe) = 0;//{ return -1; }
virtual void trim_from_cache(object_t oid,
off_t offset, size_t len) { }