// --- objecter ---
objecter_buffer_uncommitted: true, // this must be true for proper failure handling
+ objecter_map_request_interval: 15.0, // request a new map every N seconds, if we have pending io
// --- journaler ---
journaler_allow_split_entries: true,
// objecter
bool objecter_buffer_uncommitted;
+ double objecter_map_request_interval;
// journaler
bool journaler_allow_split_entries;
class MOSDGetMap : public Message {
public:
- epoch_t start;
+ epoch_t start, want;
- MOSDGetMap(epoch_t s=0) :
+ MOSDGetMap(epoch_t s=0, epoch_t w=0) :
Message(MSG_OSD_GETMAP),
- start(s) {
- }
+ start(s), want(w) { }
epoch_t get_start_epoch() { return start; }
+ epoch_t get_want_epoch() { return want; }
char *get_type_name() { return "get_osd_map"; }
void print(ostream& out) {
- out << "get_osd_map(" << start << ")";
+ out << "get_osd_map(have " << start;
+ if (want) out << " want " << want;
+ out << ")";
}
void encode_payload() {
::_encode(start, payload);
+ ::_encode(want, payload);
}
void decode_payload() {
int off = 0;
::_decode(start, payload, off);
+ ::_decode(want, payload, off);
}
};
// new map!
bcast_latest_mds();
-
+ send_to_waiting();
+
return true;
}
{
dout(7) << "handle_osd_getmap from " << m->get_source() << " from " << m->get_start_epoch() << dendl;
- if (m->get_start_epoch())
- send_incremental(m->get_source_inst(), m->get_start_epoch());
- else
+ if (m->get_start_epoch()) {
+ if (m->get_want_epoch() <= osdmap.get_epoch())
+ send_incremental(m->get_source_inst(), m->get_start_epoch());
+ else
+ waiting_for_map[m->get_source_inst()] = pair<epoch_t,epoch_t>(m->get_start_epoch(),
+ m->get_want_epoch());
+ } else
send_full(m->get_source_inst());
delete m;
{
dout(10) << "send_to_waiting " << osdmap.get_epoch() << dendl;
- for (map<entity_name_t,pair<entity_inst_t,epoch_t> >::iterator i = awaiting_map.begin();
- i != awaiting_map.end();
- i++) {
- if (i->second.second)
- send_incremental(i->second.first, i->second.second);
- else
- send_full(i->second.first);
+ map<entity_inst_t,pair<epoch_t,epoch_t> >::iterator i = waiting_for_map.begin();
+ while (i != waiting_for_map.end()) {
+ if (i->second.first) {
+ if (i->second.second <= osdmap.get_epoch())
+ send_incremental(i->first, i->second.first);
+ else {
+ dout(10) << "send_to_waiting skipping " << i->first
+ << " has " << i->second.first
+ << " wants " << i->second.second
+ << endl;
+ i++;
+ continue;
+ }
+ } else
+ send_full(i->first);
+
+ waiting_for_map.erase(i++);
}
}
send_incremental(who, start);
} else {
dout(5) << "send_latest to " << who << " later" << dendl;
- awaiting_map[who.name].first = who;
- awaiting_map[who.name].second = start;
+ waiting_for_map[who] = pair<epoch_t,epoch_t>(start, 0);
}
}
OSDMap osdmap;
private:
- map<entity_name_t, pair<entity_inst_t, epoch_t> > awaiting_map;
+ map<entity_inst_t, pair<epoch_t,epoch_t> > waiting_for_map; // who -> (has, wants)
// [leader]
OSDMap::Incremental pending_inc;
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
#include "Objecter.h"
#include "osd/OSDMap.h"
delete m;
}
+
+void Objecter::maybe_request_map()
+{
+ utime_t now;
+
+ if (last_epoch_requested <= osdmap->get_epoch() ||
+ (now = g_clock.now()) - last_epoch_requested_stamp > g_conf.objecter_map_request_interval) {
+ dout(10) << "maybe_request_map requesting next osd map" << dendl;
+ last_epoch_requested_stamp = now;
+ last_epoch_requested = osdmap->get_epoch()+1;
+ messenger->send_message(new MOSDGetMap(osdmap->get_epoch(), last_epoch_requested),
+ monmap->get_inst(monmap->pick_mon()));
+ }
+}
+
+
+
void Objecter::scan_pgs(set<pg_t>& changed_pgs)
{
dout(10) << "scan_pgs" << dendl;
<< " = osd" << who << dendl;
}
messenger->send_message(m, osdmap->get_inst(who));
- }
+ } else
+ maybe_request_map();
return last_tid;
}
}
messenger->send_message(m, osdmap->get_inst(pg.primary()));
- }
+ } else
+ maybe_request_map();
dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
#ifndef __OBJECTER_H
#define __OBJECTER_H
int num_unacked;
int num_uncommitted;
+ epoch_t last_epoch_requested;
+ utime_t last_epoch_requested_stamp;
+
+ void maybe_request_map();
+
/*** track pending operations ***/
// read
public: