From 1cd29f1247f72606bf4b85c140496028dea71b0a Mon Sep 17 00:00:00 2001 From: sageweil Date: Fri, 28 Sep 2007 22:45:11 +0000 Subject: [PATCH] merged r1850:1862 from trunk git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1863 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/mds/common/Timer.cc | 6 +- branches/sage/mds/common/Timer.h | 4 +- branches/sage/mds/config.cc | 27 +++-- branches/sage/mds/config.h | 4 + branches/sage/mds/mds/MDLog.cc | 2 +- branches/sage/mds/mon/OSDMonitor.cc | 130 ++++++++++++++----------- branches/sage/mds/mon/OSDMonitor.h | 5 + branches/sage/mds/osd/OSD.cc | 22 +++++ branches/sage/mds/osd/OSDMap.h | 7 ++ branches/sage/mds/osd/ReplicatedPG.cc | 10 +- branches/sage/mds/osd/ReplicatedPG.h | 4 +- branches/sage/mds/osd/osd_types.h | 3 +- branches/sage/mds/osdc/Journaler.cc | 62 ++++++++---- branches/sage/mds/osdc/Journaler.h | 20 +++- branches/sage/mds/test/test_disk_bw.cc | 33 ++++--- 15 files changed, 226 insertions(+), 113 deletions(-) diff --git a/branches/sage/mds/common/Timer.cc b/branches/sage/mds/common/Timer.cc index 2606ee344b117..1705bc759ac9f 100644 --- a/branches/sage/mds/common/Timer.cc +++ b/branches/sage/mds/common/Timer.cc @@ -184,11 +184,11 @@ void Timer::cancel_timer() */ -void Timer::add_event_after(float seconds, +void Timer::add_event_after(double seconds, Context *callback) { utime_t when = g_clock.now(); - when.sec_ref() += (int)seconds; + when += seconds; add_event_at(when, callback); } @@ -244,7 +244,7 @@ bool Timer::cancel_event(Context *callback) // ------------------------------- -void SafeTimer::add_event_after(float seconds, Context *c) +void SafeTimer::add_event_after(double seconds, Context *c) { assert(lock.is_locked()); Context *w = new EventWrapper(this, c); diff --git a/branches/sage/mds/common/Timer.h b/branches/sage/mds/common/Timer.h index 414ac3ad4e8eb..3574833c342c3 100644 --- a/branches/sage/mds/common/Timer.h +++ b/branches/sage/mds/common/Timer.h @@ -117,7 +117,7 @@ class Timer { } // schedule events - void add_event_after(float seconds, + void add_event_after(double seconds, Context *callback); void add_event_at(utime_t when, Context *callback); @@ -156,7 +156,7 @@ public: SafeTimer(Mutex& l) : lock(l) { } ~SafeTimer(); - void add_event_after(float seconds, Context *c); + void add_event_after(double seconds, Context *c); void add_event_at(utime_t when, Context *c); void cancel_event(Context *c); void cancel_all(); diff --git a/branches/sage/mds/config.cc b/branches/sage/mds/config.cc index 95d946a36466b..a51db55b3269c 100644 --- a/branches/sage/mds/config.cc +++ b/branches/sage/mds/config.cc @@ -189,6 +189,8 @@ md_config_t g_conf = { journaler_write_head_interval: 15, journaler_cache: false, // cache writes for later readback journaler_prefetch_periods: 50, // * journal object size (1~MB? see above) + journaler_batch_interval: .001, // seconds.. max add'l latency we artificially incur + journaler_batch_max: 16384, // max bytes we'll delay flushing // --- mds --- mds_cache_size: MDS_CACHE_SIZE, @@ -258,7 +260,7 @@ md_config_t g_conf = { osd_shed_reads: false, // forward from primary to replica osd_shed_reads_min_latency: .01, // min local latency osd_shed_reads_min_latency_diff: .01, // min latency difference - osd_shed_reads_min_latency_ratio: 1.2, // 1.2 == 20% higher than peer + osd_shed_reads_min_latency_ratio: 1.5, // 1.2 == 20% higher than peer osd_immediate_read_from_cache: false,//true, // osds to read from the cache immediately? osd_exclusive_caching: true, // replicas evict replicated writes @@ -266,8 +268,8 @@ md_config_t g_conf = { osd_stat_refresh_interval: .5, osd_pg_bits: 4, // bits per osd - osd_object_layout: OBJECT_LAYOUT_HASHINO, - osd_pg_layout: PG_LAYOUT_CRUSH, + osd_object_layout: OBJECT_LAYOUT_HASHINO,//LINEAR,//HASHINO, + osd_pg_layout: PG_LAYOUT_CRUSH,//LINEAR,//CRUSH, osd_max_rep: 4, osd_min_raid_width: 4, osd_max_raid_width: 3, //6, @@ -283,6 +285,8 @@ md_config_t g_conf = { osd_max_pull: 2, osd_pad_pg_log: false, + osd_auto_weight: false, + osd_hack_fast_startup: false, // this breaks localized pgs. @@ -299,11 +303,11 @@ md_config_t g_conf = { ebofs: 1, ebofs_cloneable: false, ebofs_verify: false, - ebofs_commit_ms: 500, // 0 = no forced commit timeout (for debugging/tracing) + ebofs_commit_ms: 1000, // 0 = no forced commit timeout (for debugging/tracing) ebofs_idle_commit_ms: 0, // 0 = no idle detection. UGLY HACK. use bdev_idle_kick_after_ms instead. ebofs_oc_size: 10000, // onode cache ebofs_cc_size: 10000, // cnode cache - ebofs_bc_size: (60 *256), // 4k blocks, *256 for MB + ebofs_bc_size: (50 *256), // 4k blocks, *256 for MB ebofs_bc_max_dirty: (30 *256), // before write() will block ebofs_max_prefetch: 1000, // 4k blocks ebofs_realloc: false, // hrm, this can cause bad fragmentation, don't use! @@ -313,8 +317,8 @@ md_config_t g_conf = { // --- block device --- bdev_lock: true, - bdev_iothreads: 2, // number of ios to queue with kernel - bdev_idle_kick_after_ms: 100, // ms ** FIXME ** this seems to break things, not sure why yet ** + bdev_iothreads: 1, // number of ios to queue with kernel + bdev_idle_kick_after_ms: 100, // ms bdev_el_fw_max_ms: 10000, // restart elevator at least once every 1000 ms bdev_el_bw_max_ms: 3000, // restart elevator at least once every 300 ms bdev_el_bidir: false, // bidirectional elevator? @@ -540,6 +544,7 @@ void parse_config_options(std::vector& args) //g_conf.fake_osd_sync = atoi(args[++i]); + else if (strcmp(args[i], "--doutdir") == 0) { g_conf.dout_dir = args[++i]; } @@ -661,6 +666,10 @@ void parse_config_options(std::vector& args) g_conf.journaler_safe = atoi(args[++i]); else if (strcmp(args[i], "--journaler_cache") == 0) g_conf.journaler_cache = atoi(args[++i]); + else if (strcmp(args[i], "--journaler_batch_interval") == 0) + g_conf.journaler_batch_interval = atof(args[++i]); + else if (strcmp(args[i], "--journaler_batch_max") == 0) + g_conf.journaler_batch_max = atoi(args[++i]); else if (strcmp(args[i], "--mds_cache_size") == 0) g_conf.mds_cache_size = atoi(args[++i]); @@ -869,10 +878,12 @@ void parse_config_options(std::vector& args) else if (strcmp(args[i], "--osd_pad_pg_log") == 0) g_conf.osd_pad_pg_log = atoi(args[++i]); + else if (strcmp(args[i], "--osd_auto_weight") == 0) + g_conf.osd_auto_weight = atoi(args[++i]); + else if (strcmp(args[i], "--osd_hack_fast_startup") == 0) g_conf.osd_hack_fast_startup = atoi(args[++i]); - else if (strcmp(args[i], "--bdev_lock") == 0) g_conf.bdev_lock = atoi(args[++i]); else if (strcmp(args[i], "--bdev_el_bidir") == 0) diff --git a/branches/sage/mds/config.h b/branches/sage/mds/config.h index 973672058c16b..50a3099e693b0 100644 --- a/branches/sage/mds/config.h +++ b/branches/sage/mds/config.h @@ -182,6 +182,8 @@ struct md_config_t { int journaler_write_head_interval; bool journaler_cache; int journaler_prefetch_periods; + double journaler_batch_interval; + size_t journaler_batch_max; // mds int mds_cache_size; @@ -274,6 +276,8 @@ struct md_config_t { int osd_max_pull; bool osd_pad_pg_log; + bool osd_auto_weight; + bool osd_hack_fast_startup; double fakestore_fake_sync; diff --git a/branches/sage/mds/mds/MDLog.cc b/branches/sage/mds/mds/MDLog.cc index 48592ba3b8f5e..275f2629eeb27 100644 --- a/branches/sage/mds/mds/MDLog.cc +++ b/branches/sage/mds/mds/MDLog.cc @@ -77,7 +77,7 @@ void MDLog::init_journaler() // log streamer if (journaler) delete journaler; - journaler = new Journaler(log_inode, mds->objecter, logger); + journaler = new Journaler(log_inode, mds->objecter, logger, &mds->mds_lock); } void MDLog::write_head(Context *c) diff --git a/branches/sage/mds/mon/OSDMonitor.cc b/branches/sage/mds/mon/OSDMonitor.cc index 09677829e939a..200187510f698 100644 --- a/branches/sage/mds/mon/OSDMonitor.cc +++ b/branches/sage/mds/mon/OSDMonitor.cc @@ -119,14 +119,56 @@ void OSDMonitor::create_initial() // start at epoch 1 until all osds boot newmap.inc_epoch(); // = 1 assert(newmap.get_epoch() == 1); + + map weights; + build_crush_map(newmap.crush, weights); + + for (int i=0; i + + // fake osd failures + for (map::iterator i = g_fake_osd_down.begin(); + i != g_fake_osd_down.end(); + i++) { + dout(0) << "will fake osd" << i->first << " DOWN after " << i->second << dendl; + mon->timer.add_event_after(i->second, new C_Mon_FakeOSDFailure(this, i->first, 1)); + } + for (map::iterator i = g_fake_osd_out.begin(); + i != g_fake_osd_out.end(); + i++) { + dout(0) << "will fake osd" << i->first << " OUT after " << i->second << dendl; + mon->timer.add_event_after(i->second, new C_Mon_FakeOSDFailure(this, i->first, 0)); + } + + // encode into pending incremental + newmap.encode(pending_inc.fullmap); +} + + +void OSDMonitor::build_crush_map(Crush& crush, + map& weights) +{ + if (g_conf.num_osd >= 12) { int ndom = g_conf.osd_max_rep; UniformBucket *domain[ndom]; int domid[ndom]; for (int i=0; iadd_item(i, 1.0); + domain[dom]->add_item(i, weights[i] ? weights[i]:1.0); //derr(0) << "osd" << i << " in domain " << dom << dendl; i++; if (i == g_conf.num_osd) break; @@ -150,94 +190,63 @@ void OSDMonitor::create_initial() //derr(0) << "dom " << i << " w " << domain[i]->get_weight() << dendl; root->add_item(domid[i], domain[i]->get_weight()); } - int nroot = newmap.crush.add_bucket(root); + int nroot = crush.add_bucket(root); // rules // replication for (int i=1; i<=ndom; i++) { int r = CRUSH_REP_RULE(i); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot)); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 1)); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, 1, 0)); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 1)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, 1, 0)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); } // raid for (int i=g_conf.osd_min_raid_width; i <= g_conf.osd_max_raid_width; i++) { int r = CRUSH_RAID_RULE(i); if (ndom >= i) { - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot)); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 1)); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, 1, 0)); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 1)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, 1, 0)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); } else { - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot)); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 0)); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 0)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); } } // test //vector out; - //newmap.pg_to_osds(0x40200000110ULL, out); + //pg_to_osds(0x40200000110ULL, out); } else { // one bucket Bucket *b = new UniformBucket(1, 0); - int root = newmap.crush.add_bucket(b); + int root = crush.add_bucket(b); for (int i=0; iadd_item(i, 1.0); + b->add_item(i, weights[i] ? weights[i]:1.0); } // rules // replication for (int i=1; i<=g_conf.osd_max_rep; i++) { int r = CRUSH_REP_RULE(i); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root)); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 0)); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 0)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); } // raid for (int i=g_conf.osd_min_raid_width; i <= g_conf.osd_max_raid_width; i++) { int r = CRUSH_RAID_RULE(i); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root)); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 0)); - newmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 0)); + crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); } } - - if (g_conf.mds_local_osd) { - // add mds local osds, but don't put them in the crush mapping func - for (int i=0; i - - // fake osd failures - for (map::iterator i = g_fake_osd_down.begin(); - i != g_fake_osd_down.end(); - i++) { - dout(0) << "will fake osd" << i->first << " DOWN after " << i->second << dendl; - mon->timer.add_event_after(i->second, new C_Mon_FakeOSDFailure(this, i->first, 1)); - } - for (map::iterator i = g_fake_osd_out.begin(); - i != g_fake_osd_out.end(); - i++) { - dout(0) << "will fake osd" << i->first << " OUT after " << i->second << dendl; - mon->timer.add_event_after(i->second, new C_Mon_FakeOSDFailure(this, i->first, 0)); - } - - // encode into pending incremental - newmap.encode(pending_inc.fullmap); } - bool OSDMonitor::update_from_paxos() { assert(paxos->is_active()); @@ -410,6 +419,11 @@ bool OSDMonitor::should_propose(double& delay) if (osdmap.epoch == 1) { if (pending_inc.new_up.size() == osdmap.get_osds().size()) { delay = 0.0; + if (g_conf.osd_auto_weight) { + Crush crush; + build_crush_map(crush, osd_weight); + crush._encode(pending_inc.crush); + } return true; } else return false; @@ -552,6 +566,8 @@ bool OSDMonitor::prepare_boot(MOSDBoot *m) if (osdmap.out_osds.count(from)) pending_inc.new_in.push_back(from); + osd_weight[from] = m->sb.weight; + // wait paxos->wait_for_commit(new C_Booted(this, m)); } @@ -560,7 +576,7 @@ bool OSDMonitor::prepare_boot(MOSDBoot *m) void OSDMonitor::_booted(MOSDBoot *m) { - dout(7) << "_booted " << m->inst << dendl; + dout(7) << "_booted " << m->inst << " w " << m->sb.weight << dendl; send_latest(m->inst, m->sb.current_epoch); delete m; } diff --git a/branches/sage/mds/mon/OSDMonitor.h b/branches/sage/mds/mon/OSDMonitor.h index 8db58dc328052..afdd625ae04aa 100644 --- a/branches/sage/mds/mon/OSDMonitor.h +++ b/branches/sage/mds/mon/OSDMonitor.h @@ -41,6 +41,11 @@ private: OSDMap::Incremental pending_inc; map down_pending_out; // osd down -> out + map osd_weight; + + void build_crush_map(Crush& crush, + map& weights); + // svc void create_initial(); bool update_from_paxos(); diff --git a/branches/sage/mds/osd/OSD.cc b/branches/sage/mds/osd/OSD.cc index 0ef5160d2084f..30c320c623aac 100644 --- a/branches/sage/mds/osd/OSD.cc +++ b/branches/sage/mds/osd/OSD.cc @@ -221,6 +221,26 @@ int OSD::init() 50000, g_conf.osd_age - .05); } + + if (g_conf.osd_auto_weight) { + // benchmark + bufferlist bl; + bufferptr bp(1048576); + bp.zero(); + bl.push_back(bp); + utime_t start = g_clock.now(); + for (int i=0; i<1000; i++) + store->write(object_t(999,i), 0, bl.length(), bl, 0); + store->sync(); + utime_t end = g_clock.now(); + end -= start; + dout(0) << "measured " << (1000.0 / (double)end) << " mb/sec" << dendl; + for (int i=0; i<1000; i++) + store->remove(object_t(999,i), 0); + + // set osd weight + superblock.weight = (1000.0 / (double)end); + } } else { dout(2) << "boot" << dendl; @@ -235,6 +255,8 @@ int OSD::init() assert(whoami == superblock.whoami); } + + // log char name[80]; diff --git a/branches/sage/mds/osd/OSDMap.h b/branches/sage/mds/osd/OSDMap.h index 6459d2ef51351..b50f725687d23 100644 --- a/branches/sage/mds/osd/OSDMap.h +++ b/branches/sage/mds/osd/OSDMap.h @@ -90,6 +90,7 @@ public: // full (rare) bufferlist fullmap; // in leiu of below. + bufferlist crush; // incremental map new_up; @@ -109,6 +110,7 @@ public: ::_encode(new_out, bl); ::_encode(new_overload, bl); ::_encode(fullmap, bl); + ::_encode(crush, bl); } void decode(bufferlist& bl, int& off) { ::_decode(epoch, bl, off); @@ -120,6 +122,7 @@ public: ::_decode(new_out, bl, off); ::_decode(new_overload, bl, off); ::_decode(fullmap, bl, off); + ::_decode(crush, bl, off); } Incremental(epoch_t e=0) : epoch(e), mon_epoch(0) {} @@ -219,6 +222,10 @@ private: decode(inc.fullmap); return; } + if (inc.crush.length()) { + int off = 0; + crush._decode(inc.crush, off); + } // nope, incremental. for (map >::iterator i = inc.new_down.begin(); diff --git a/branches/sage/mds/osd/ReplicatedPG.cc b/branches/sage/mds/osd/ReplicatedPG.cc index 9801a13d5da9d..7b5bdf581d643 100644 --- a/branches/sage/mds/osd/ReplicatedPG.cc +++ b/branches/sage/mds/osd/ReplicatedPG.cc @@ -886,16 +886,12 @@ void ReplicatedPG::put_rep_gather(RepGather *repop) } dout(10) << "put_repop deleting " << *repop << dendl; - //repop->lock.Unlock(); assert(rep_gather.count(repop->rep_tid)); rep_gather.erase(repop->rep_tid); delete repop->op; delete repop; - - } else { - //repop->lock.Unlock(); } } @@ -1659,7 +1655,7 @@ void ReplicatedPG::note_failed_osd(int o) dout(10) << "note_failed_osd " << o << dendl; // do async; repop_ack() may modify pg->repop_gather list ls; - for (map::iterator p = rep_gather.begin(); + for (hash_map::iterator p = rep_gather.begin(); p != rep_gather.end(); p++) { //dout(-1) << "checking repop tid " << p->first << dendl; @@ -1685,7 +1681,7 @@ void ReplicatedPG::on_acker_change() } else { // for splay or chain replication, any change is significant. // apply repops - for (map::iterator p = rep_gather.begin(); + for (hash_map::iterator p = rep_gather.begin(); p != rep_gather.end(); p++) { if (!p->second->applied) @@ -1696,7 +1692,7 @@ void ReplicatedPG::on_acker_change() rep_gather.clear(); // and repop waiters - for (map >::iterator p = waiting_for_repop.begin(); + for (hash_map >::iterator p = waiting_for_repop.begin(); p != waiting_for_repop.end(); p++) for (list::iterator pm = p->second.begin(); diff --git a/branches/sage/mds/osd/ReplicatedPG.h b/branches/sage/mds/osd/ReplicatedPG.h index 2680fa217d477..ab44026b43fb2 100644 --- a/branches/sage/mds/osd/ReplicatedPG.h +++ b/branches/sage/mds/osd/ReplicatedPG.h @@ -69,8 +69,8 @@ public: protected: // replica ops // [primary|tail] - map rep_gather; - map > waiting_for_repop; + hash_map rep_gather; + hash_map > waiting_for_repop; // load balancing set balancing_reads; diff --git a/branches/sage/mds/osd/osd_types.h b/branches/sage/mds/osd/osd_types.h index 8a0d0f146fa2c..6c36d0f2ffd77 100644 --- a/branches/sage/mds/osd/osd_types.h +++ b/branches/sage/mds/osd/osd_types.h @@ -312,9 +312,10 @@ public: int32_t whoami; // my role in this fs. epoch_t current_epoch; // most recent epoch epoch_t oldest_map, newest_map; // oldest/newest maps we have. + double weight; OSDSuperblock(uint64_t f=0, int w=0) : magic(MAGIC), fsid(f), whoami(w), - current_epoch(0), oldest_map(0), newest_map(0) {} + current_epoch(0), oldest_map(0), newest_map(0), weight(0) {} }; inline ostream& operator<<(ostream& out, OSDSuperblock& sb) diff --git a/branches/sage/mds/osdc/Journaler.cc b/branches/sage/mds/osdc/Journaler.cc index 3a56347ee16ad..c2719549e2247 100644 --- a/branches/sage/mds/osdc/Journaler.cc +++ b/branches/sage/mds/osdc/Journaler.cc @@ -288,6 +288,32 @@ off_t Journaler::append_entry(bufferlist& bl, Context *onsync) } +void Journaler::_do_flush() +{ + if (write_pos == flush_pos) return; + assert(write_pos > flush_pos); + + // flush + unsigned len = write_pos - flush_pos; + assert(len == write_buf.length()); + dout(10) << "_do_flush flushing " << flush_pos << "~" << len << dendl; + + // submit write for anything pending + // flush _start_ pos to _finish_flush + filer.write(inode, flush_pos, len, write_buf, 0, + g_conf.journaler_safe ? 0:new C_Flush(this, flush_pos), // on ACK + g_conf.journaler_safe ? new C_Flush(this, flush_pos):0); // on COMMIT + pending_flush[flush_pos] = g_clock.now(); + + // adjust pointers + flush_pos = write_pos; + write_buf.clear(); + + dout(10) << "_do_flush write pointers now at " << write_pos << "/" << flush_pos << "/" << ack_pos << dendl; +} + + + void Journaler::flush(Context *onsync) { // all flushed and acked? @@ -305,23 +331,22 @@ void Journaler::flush(Context *onsync) assert(write_buf.length() == 0); dout(10) << "flush nothing to flush, write pointers at " << write_pos << "/" << flush_pos << "/" << ack_pos << dendl; } else { - // flush - unsigned len = write_pos - flush_pos; - assert(len == write_buf.length()); - dout(10) << "flush flushing " << flush_pos << "~" << len << dendl; - - // submit write for anything pending - // flush _start_ pos to _finish_flush - filer.write(inode, flush_pos, len, write_buf, 0, - g_conf.journaler_safe ? 0:new C_Flush(this, flush_pos), // on ACK - g_conf.journaler_safe ? new C_Flush(this, flush_pos):0); // on COMMIT - pending_flush[flush_pos] = g_clock.now(); - - // adjust pointers - flush_pos = write_pos; - write_buf.clear(); - - dout(10) << "flush write pointers now at " << write_pos << "/" << flush_pos << "/" << ack_pos << dendl; + if (1) { + // maybe buffer + if (write_buf.length() < g_conf.journaler_batch_max) { + // delay! schedule an event. + dout(20) << "flush delaying flush" << dendl; + if (delay_flush_event) timer.cancel_event(delay_flush_event); + delay_flush_event = new C_DelayFlush(this); + timer.add_event_after(g_conf.journaler_batch_interval, delay_flush_event); + } else { + dout(20) << "flush not delaying flush" << dendl; + _do_flush(); + } + } else { + // always flush + _do_flush(); + } } // queue waiter (at _new_ write_pos; will go when reached by ack_pos) @@ -401,6 +426,9 @@ void Journaler::_finish_read(int r) */ void Journaler::_issue_read(off_t len) { + // make sure we're fully flushed + _do_flush(); + if (_is_reading()) { dout(10) << "_issue_read " << len << " waiting, already reading " << received_pos << "~" << (requested_pos-received_pos) << dendl; diff --git a/branches/sage/mds/osdc/Journaler.h b/branches/sage/mds/osdc/Journaler.h index c6f8ec0867d28..6463d9caf0e6f 100644 --- a/branches/sage/mds/osdc/Journaler.h +++ b/branches/sage/mds/osdc/Journaler.h @@ -80,6 +80,20 @@ class Journaler { Logger *logger; + Mutex *lock; + SafeTimer timer; + + class C_DelayFlush : public Context { + Journaler *journaler; + public: + C_DelayFlush(Journaler *j) : journaler(j) {} + void finish(int r) { + journaler->delay_flush_event = 0; + journaler->_do_flush(); + } + } *delay_flush_event; + + // my state static const int STATE_UNDEF = 0; static const int STATE_READHEAD = 1; @@ -113,6 +127,7 @@ class Journaler { std::map pending_flush; // start offsets and times for pending flushes std::map > waitfor_flush; // when flushed through given offset + void _do_flush(); void _finish_flush(int r, off_t start); class C_Flush; friend class C_Flush; @@ -155,8 +170,9 @@ class Journaler { friend class C_Trim; public: - Journaler(inode_t& inode_, Objecter *obj, Logger *l, off_t fl=0, off_t pff=0) : - inode(inode_), objecter(obj), filer(objecter), logger(l), + Journaler(inode_t& inode_, Objecter *obj, Logger *l, Mutex *lk, off_t fl=0, off_t pff=0) : + inode(inode_), objecter(obj), filer(objecter), logger(l), + lock(lk), timer(*lk), delay_flush_event(0), state(STATE_UNDEF), write_pos(0), flush_pos(0), ack_pos(0), read_pos(0), requested_pos(0), received_pos(0), diff --git a/branches/sage/mds/test/test_disk_bw.cc b/branches/sage/mds/test/test_disk_bw.cc index d1f44af3127ca..fc36da74fadb2 100644 --- a/branches/sage/mds/test/test_disk_bw.cc +++ b/branches/sage/mds/test/test_disk_bw.cc @@ -18,35 +18,42 @@ using namespace std; int main(int argc, char **argv) { void *buf; - int dev_id, count, loop = 0, ret; + int fd, count, loop = 0, ret; - if (argc != 3) { - fprintf(stderr, "Usage: %s device mb\n", argv[0]); + if (argc != 4) { + fprintf(stderr, "Usage: %s device bsize count\n", argv[0]); exit (0); } - count = atoi(argv[2]); - int bsize = 1048576; + int bsize = atoi(argv[2]); + count = atoi(argv[3]); posix_memalign(&buf, sysconf(_SC_PAGESIZE), bsize); - if ((dev_id = open(argv[1], O_DIRECT|O_RDWR)) < 0) { + //if ((fd = open(argv[1], O_SYNC|O_RDWR)) < 0) { + if ((fd = open(argv[1], O_DIRECT|O_RDWR)) < 0) { + fprintf(stderr, "Can't open device %s\n", argv[1]); exit (4); } - fprintf(stderr, "device is %s, dev_id is %d\n", argv[1], dev_id); - + utime_t start = g_clock.now(); while (loop++ < count) { - ret = ::write(dev_id, buf, bsize); - if ((loop % 100) == 0) - fprintf(stderr, "."); + ret = ::write(fd, buf, bsize); + //if ((loop % 100) == 0) + //fprintf(stderr, "."); } + ::fsync(fd); + ::close(fd); utime_t end = g_clock.now(); end -= start; - int mb = count; - cout << mb << " MB, " << end << " seconds, " << ((double)mb / (double)end) << " MB/sec" << std::endl; + char hostname[80]; + gethostname(hostname, 80); + + double mb = bsize*count/1024/1024; + + cout << hostname << "\t" << mb << " MB\t" << end << " seconds\t" << (mb / (double)end) << " MB/sec" << std::endl; } -- 2.39.5