From c0058fefbd4189a1bd332d4461df4343ea417ab2 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 29 Jan 2010 16:36:10 -0800 Subject: [PATCH] streamtest: do concurrent ios; async commit AND ack --- src/streamtest.cc | 48 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/src/streamtest.cc b/src/streamtest.cc index 3fd8057c821fa..c3f5b8571f945 100644 --- a/src/streamtest.cc +++ b/src/streamtest.cc @@ -27,17 +27,26 @@ struct io { } }; map writes; - +Cond cond; Mutex lock("streamtest.cc lock"); - +unsigned concurrent = 1; +void throttle() +{ + Mutex::Locker l(lock); + while (writes.size() >= concurrent) { + //generic_dout(0) << "waiting" << dendl; + cond.Wait(lock); + } +} void pr(off_t off) { io &i = writes[off]; - dout(2) << off << "\t" + generic_dout(0) << off << "\t" << (i.ack - i.start) << "\t" << (i.commit - i.start) << dendl; writes.erase(off); + cond.Signal(); } void set_start(off_t off, utime_t t) @@ -49,6 +58,7 @@ void set_start(off_t off, utime_t t) void set_ack(off_t off, utime_t t) { Mutex::Locker l(lock); + //generic_dout(0) << "ack " << off << dendl; writes[off].ack = t; if (writes[off].done()) pr(off); @@ -57,12 +67,20 @@ void set_ack(off_t off, utime_t t) void set_commit(off_t off, utime_t t) { Mutex::Locker l(lock); + //generic_dout(0) << "commit " << off << dendl; writes[off].commit = t; if (writes[off].done()) pr(off); } +struct C_Ack : public Context { + off_t off; + C_Ack(off_t o) : off(o) {} + void finish(int r) { + set_ack(off, g_clock.now()); + } +}; struct C_Commit : public Context { off_t off; C_Commit(off_t o) : off(o) {} @@ -87,19 +105,23 @@ int main(int argc, const char **argv) const char *journal = 0; if (args.size() >= 4) journal = args[3]; + if (args.size() >= 5) + concurrent = atoi(args[4]); + + cout << "concurrent = " << concurrent << std::endl; buffer::ptr bp(bytes); bp.zero(); bufferlist bl; bl.push_back(bp); - float interval = 1.0 / 1000; + //float interval = 1.0 / 1000; cout << "#dev " << filename << ", " << seconds << " seconds, " << bytes << " bytes per write" << std::endl; //ObjectStore *fs = new Ebofs(filename, journal); - ObjectStore *fs = new FileStore(filename); + ObjectStore *fs = new FileStore(filename, journal); if (fs->mount() < 0) { cout << "mount failed" << std::endl; @@ -118,16 +140,17 @@ int main(int argc, const char **argv) cout << "# offset\tack\tcommit" << std::endl; while (now < end) { sobject_t poid(object_t("streamtest"), 0); - utime_t start = now; - set_start(pos, now); - ObjectStore::Transaction t; - t.write(coll_t(), poid, pos, bytes, bl); - fs->apply_transaction(t, new C_Commit(pos)); - now = g_clock.now(); - set_ack(pos, now); + + set_start(pos, g_clock.now()); + ObjectStore::Transaction *t = new ObjectStore::Transaction; + t->write(coll_t(), poid, pos, bytes, bl); + fs->queue_transaction(t, new C_Ack(pos), new C_Commit(pos)); pos += bytes; + throttle(); + // wait? + /* utime_t next = start; next += interval; if (now < next) { @@ -136,6 +159,7 @@ int main(int argc, const char **argv) //cout << "sleeping for " << s << " us" << std::endl; usleep((int)s); } + */ } fs->umount(); -- 2.39.5