}
utime_t fromstart = g_clock.recent_now();
+ if (fromstart < start)
+ start = fromstart;
fromstart -= start;
while (force ||
Logger(string fn, LogType *type);
~Logger();
+ void set_start(const utime_t& a) { start = a; }
utime_t& get_start() { return start; }
long inc(const char *s, long v = 1);
double fset(const char *s, double v);
double finc(const char *s, double v);
-
+
void flush(bool force = false);
};
// --- mon ---
mon_tick_interval: 5,
- mon_osd_down_out_interval: 20, // seconds
+ mon_osd_down_out_interval: 15, // seconds
// --- client ---
client_cache_size: 300,
osd_age_time: 0,
osd_heartbeat_interval: 10,
osd_replay_window: 15,
+ osd_max_pull: 2,
// --- fakestore ---
fakestore_fake_sync: 2, // 2 seconds
// --- ebofs ---
ebofs: 1,
- ebofs_commit_ms: 10000, // 0 = no forced commit timeout (for debugging/tracing)
- ebofs_idle_commit_ms: 100,//100, // 0 = no idle detection. use this -or- bdev_idle_kick_after_ms
+ ebofs_commit_ms: 2000, // 0 = no forced commit timeout (for debugging/tracing)
+ ebofs_idle_commit_ms: 100, // 0 = no idle detection. use this -or- bdev_idle_kick_after_ms
ebofs_oc_size: 10000, // onode cache
ebofs_cc_size: 10000, // cnode cache
ebofs_bc_size: (350 *256), // 4k blocks, *256 for MB
g_conf.osd_max_rep = atoi(args[++i]);
else if (strcmp(args[i], "--osd_maxthreads") == 0)
g_conf.osd_maxthreads = atoi(args[++i]);
+ else if (strcmp(args[i], "--osd_max_pull") == 0)
+ g_conf.osd_max_pull = atoi(args[++i]);
else if (strcmp(args[i], "--bdev_lock") == 0)
int osd_age_time;
int osd_heartbeat_interval;
int osd_replay_window;
+ int osd_max_pull;
int fakestore_fake_sync;
bool fakestore_fsync;
if (items.empty())
item_weight = w;
items.push_back(item);
+ weight += item_weight;
make_primes();
}
{
'sleep' => 3,
+ 'osdbits' => [6,8,10],
+ 'pgperbits' => [4,6,8],
+
'nummds' => 1,
- 'numosd' => [16,
- 32,
- 64,
- 128,
-# 256,
-# 1024,
-# 4096,
-# 16384,
-# 65536,
-# 256*256
- ],
+
+ '_dep' => [ 'numosd' => '1 << $osdbits',
+ 'pg_bits' => '$pgperbits + $osdbits',
+ 'n' => '10 + $numosd / 32'],
'numclient' => 0,
- 'n' => 6,
- 'fs' => ['ebofs'],
+ 'fs' => 'ebofs',
'start' => 30,
'end' => 180,
'kill_after' => 180,
- 'osd_pg_bits' => [10, 14, 18],
+ 'osd_pg_bits' => [18],
- 'custom' => '--bdev_lock 0 --ms_stripe_osds',
+ 'custom' => '--bdev_lock 0 --ms_stripe_osds --fake_osdmap_updates 1 --osd_maxthreads 0',
#'custom' => '--tcp_skip_rank0',
'comb' => {
// <HACK set up OSDMap from g_conf>
osdmap = new OSDMap();
+ osdmap->ctime = g_clock.now();
osdmap->set_pg_bits(g_conf.osd_pg_bits);
// start at epoch 0 until all osds boot
//if (g_conf.mkfs) osdmap->set_mkfs();
- Bucket *b = new UniformBucket(1, 0);
- int root = osdmap->crush.add_bucket(b);
- for (int i=0; i<g_conf.num_osd; i++) {
- osdmap->osds.insert(i);
- b->add_item(i, 1);
- }
-
- for (int i=1; i<5; i++) {
- osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root));
- osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 0));
- osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
+ if (g_conf.num_osd >= 12) {
+ int ndom = g_conf.osd_max_rep;
+ UniformBucket *domain[ndom];
+ int domid[ndom];
+ for (int i=0; i<ndom; i++) {
+ domain[i] = new UniformBucket(1, 0);
+ domid[i] = osdmap->crush.add_bucket(domain[i]);
+ }
+
+ // add osds
+ int nper = ((g_conf.num_osd - 1) / ndom) + 1;
+ cerr << ndom << " failure domains, " << nper << " osds each" << endl;
+ int i = 0;
+ for (int dom=0; dom<ndom; dom++) {
+ for (int j=0; j<nper; j++) {
+ osdmap->osds.insert(i);
+ domain[dom]->add_item(i, 1.0);
+ //cerr << "osd" << i << " in domain " << dom << endl;
+ i++;
+ if (i == g_conf.num_osd) break;
+ }
+ }
+
+ // root
+ Bucket *root = new ListBucket(2);
+ for (int i=0; i<ndom; i++) {
+ //cerr << "dom " << i << " w " << domain[i]->get_weight() << endl;
+ root->add_item(domid[i], domain[i]->get_weight());
+ }
+ int nroot = osdmap->crush.add_bucket(root);
+
+ // rules
+ for (int i=1; i<=ndom; i++) {
+ osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot));
+ osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 1));
+ osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, 1, 0));
+ osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
+ }
+
+ // test
+ vector<int> out;
+ osdmap->pg_to_osds(0x40200000110ULL, out);
+
+ } else {
+ // one bucket
+ Bucket *b = new UniformBucket(1, 0);
+ int root = osdmap->crush.add_bucket(b);
+ for (int i=0; i<g_conf.num_osd; i++) {
+ osdmap->osds.insert(i);
+ b->add_item(i, 1.0);
+ }
+
+ for (int i=1; i<=g_conf.osd_max_rep; i++) {
+ osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root));
+ osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 0));
+ osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
+ }
}
if (g_conf.mds_local_osd) {
dout(-10) << "accept_pending " << osdmap->get_epoch() << " -> " << pending.epoch << endl;
// accept pending into a new map!
+ pending.ctime = g_clock.now();
pending.encode( inc_maps[ pending.epoch ] );
// advance!
i++) {
pending_out.erase(*i);
pending.new_out.push_back( *i );
+ }
+ if (!mark_out.empty()) {
accept_pending();
+
+ // hrmpf. bcast map for now. FIXME FIXME.
+ bcast_latest_osd_map_osd();
}
// next!
boot_epoch = 0;
last_tid = 0;
+ num_pulling = 0;
- max_recovery_ops = 5;
pending_ops = 0;
waiting_for_no_ops = false;
osd_logtype.add_inc("r_wr");
osd_logtype.add_inc("r_wrb");
- osd_logtype.add_inc("rlsum");
osd_logtype.add_inc("rlnum");
osd_logtype.add_set("numpg");
dout(1) << "mkfs" << endl;
assert(osdmap->get_epoch() == 1);
+ //cerr << "osdmap " << osdmap->get_ctime() << " logger start " << logger->get_start() << endl;
+ logger->set_start( osdmap->get_ctime() );
+
ps_t maxps = 1LL << osdmap->get_pg_bits();
// create PGs
// did primary change?
if (oldprimary != primary) {
pg->info.same_primary_since = osdmap->get_epoch();
-
- // forget about where missing items are, or anything we're pulling
- pg->missing.loc.clear();
- pg->objects_pulling.clear();
+ pg->cancel_recovery();
}
if (role != oldrole) {
// take note
assert(pg->objects_pulling.count(oid) == 0);
+ num_pulling++;
pg->objects_pulling[oid] = v;
}
*/
void OSD::op_rep_pull(MOSDOp *op, PG *pg)
{
- long got = 0;
-
const object_t oid = op->get_oid();
dout(7) << "rep_pull on " << hex << oid << dec << " v >= " << op->get_version() << endl;
<< " in " << *pg
<< endl;
assert(v >= op->get_version());
+
+ logger->inc("r_pull");
+ logger->inc("r_pullb", bl.length());
// reply
MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap->get_epoch(), true);
messenger->send_message(reply, op->get_asker());
delete op;
-
- logger->inc("r_pull");
- logger->inc("r_pullb", got);
}
t.collection_add(pgid, oid);
// close out pull op.
+ num_pulling--;
pg->objects_pulling.erase(oid);
pg->missing.got(oid, v);
prepare_op_transaction(t, op, nv, pg);
oncommit = new C_OSD_RepModifyCommit(this, op,
pg->info.last_complete);
+
+ logger->inc("r_wr");
+ logger->inc("r_wrb", op->get_length());
}
// go
class Logger *logger;
- int max_recovery_ops;
-
// local store
char dev_path[100];
class ObjectStore *store;
tid_t last_tid;
+ int num_pulling;
hash_map<pg_t, list<Message*> > waiting_for_pg;
int get_pg_bits() const { return pg_bits; }
void set_pg_bits(int b) { pg_bits = b; }
+ const utime_t& get_ctime() const { return ctime; }
+
bool is_mkfs() const { return epoch == 1; }
//void set_mkfs() { assert(epoch == 1); }
dout(10) << "merge_log " << olog << " from osd" << fromosd
<< " into " << log << endl;
- cout << "log" << endl;
+ /*cout << "log" << endl;
log.print(cout);
cout << "olog" << endl;
olog.print(cout);
-
+ */
if (log.empty() ||
(olog.bottom > log.top && olog.backlog)) { // e.g. log=(0,20] olog=(40,50]+backlog)
// i'm missing everything after old log.top.
}
dout(10) << "merge_log result " << log << " " << missing << endl;
- log.print(cout);
+ //log.print(cout);
// found items?
for (map<object_t,eversion_t>::iterator p = missing.missing.begin();
}
+
+void PG::cancel_recovery()
+{
+ // forget about where missing items are, or anything we're pulling
+ missing.loc.clear();
+ osd->num_pulling -= objects_pulling.size();
+ objects_pulling.clear();
+}
+
/**
* do one recovery op.
* return true if done, false if nothing left to do.
*/
bool PG::do_recovery()
{
- dout(10) << "do_recovery" << endl;
+ dout(-10) << "do_recovery pulling " << objects_pulling.size() << " in pg, "
+ << osd->num_pulling << "/" << g_conf.osd_max_pull << " total"
+ << endl;
+
+ // can we slow down on this PG?
+ if (osd->num_pulling >= g_conf.osd_max_pull && !objects_pulling.empty()) {
+ dout(-10) << "do_recovery already pulling max, waiting" << endl;
+ return true;
+ }
// look at log!
Log::Entry *latest = 0;
void activate(ObjectStore::Transaction& t);
+ void cancel_recovery();
bool do_recovery();
void clean_replicas();