}
+void BlockDevice::barrier()
+{
+ lock.Lock();
+ dout(10) << "barrier" << endl;
+ if (!use_next_queue &&
+ !io_queue.empty())
+ use_next_queue = true;
+ lock.Unlock();
+}
+
+void BlockDevice::_bump_queue()
+{
+ if (io_queue.empty() && use_next_queue) {
+ dout(10) << "_bump_queue next_io_queue (" << next_io_queue.size()
+ << ") -> io_queue (" << io_queue.size() << ")" << endl; // empty, duh.
+ use_next_queue = false;
+ io_queue.swap(next_io_queue);
+ }
+}
+
void* BlockDevice::io_thread_entry()
{
lock.Lock();
// dequeue
io_queue_map.erase(bio);
io_queue.erase(prev);
+ _bump_queue();
if (at_end) break;
}
// sleep
io_threads_running--;
- dout(20) << "io_thread" << whoami << " sleeping, " << io_threads_running << " threads now running" << endl;
+ dout(20) << "io_thread" << whoami << " sleeping, " << io_threads_running << " threads now running,"
+ << " queue has " << io_queue.size() << " / " << next_io_queue.size()
+ << endl;
if (g_conf.bdev_idle_kick_after_ms > 0 &&
io_threads_running == 0 &&
// NOTE: lock must be held
dout(15) << "_submit_io " << *b << endl;
- // wake up io_thread(s)?
- if ((int)io_queue.size() == io_threads_running)
- io_wakeup.SignalOne();
- else if ((int)io_queue.size() > io_threads_running)
- io_wakeup.SignalAll();
-
- // [DEBUG] check for overlapping ios
- if (g_conf.bdev_debug_check_io_overlap) {
- // BUG: this doesn't catch everything! eg 1~10000000 will be missed....
- multimap<block_t, biovec*>::iterator p = io_queue.lower_bound(b->start);
- if ((p != io_queue.end() &&
- p->first < b->start+b->length) ||
- (p != io_queue.begin() &&
- (p--, p->second->start + p->second->length > b->start))) {
- dout(1) << "_submit_io new io " << *b
- << " overlaps with existing " << *p->second << endl;
- cerr << "_submit_io new io " << *b
- << " overlaps with existing " << *p->second << endl;
+ if (use_next_queue) {
+ // queue in next queue
+ assert(!io_queue.empty());
+ next_io_queue.insert(pair<block_t,biovec*>(b->start, b));
+ } else {
+ // wake up io_thread(s)?
+ if ((int)io_queue.size() == io_threads_running)
+ io_wakeup.SignalOne();
+ else if ((int)io_queue.size() > io_threads_running)
+ io_wakeup.SignalAll();
+
+ // [DEBUG] check for overlapping ios
+ // BUG: this doesn't detect all overlaps w/ the next queue thing.
+ if (g_conf.bdev_debug_check_io_overlap) {
+ // BUG: this doesn't catch everything! eg 1~10000000 will be missed....
+ multimap<block_t, biovec*>::iterator p = io_queue.lower_bound(b->start);
+ if ((p != io_queue.end() &&
+ p->first < b->start+b->length) ||
+ (p != io_queue.begin() &&
+ (p--, p->second->start + p->second->length > b->start))) {
+ dout(1) << "_submit_io new io " << *b
+ << " overlaps with existing " << *p->second << endl;
+ cerr << "_submit_io new io " << *b
+ << " overlaps with existing " << *p->second << endl;
+ }
}
+
+ // queue anew
+ io_queue.insert(pair<block_t,biovec*>(b->start, b));
}
-
- // queue anew
- io_queue.insert(pair<block_t,biovec*>(b->start, b));
io_queue_map[b] = b->start;
-
}
int BlockDevice::_cancel_io(biovec *bio)
dout(15) << "_cancel_io " << *bio << endl;
block_t b = io_queue_map[bio];
- multimap<block_t,biovec*>::iterator p = io_queue.lower_bound(b);
- while (p->second != bio) p++;
- assert(p->second == bio);
io_queue_map.erase(bio);
- io_queue.erase(p);
+
+ multimap<block_t,biovec*>::iterator p = io_queue.lower_bound(b);
+ while (p->first == b && p->second != bio) p++;
+ if (p->first == b && p->second == bio) {
+ io_queue.erase(p);
+ _bump_queue();
+ } else {
+ p = next_io_queue.lower_bound(b);
+ while (p->first == b && p->second != bio) p++;
+ assert(p->second == bio);
+ next_io_queue.erase(p);
+ }
+
return 0;
}
int BlockDevice::count_io(block_t start, block_t len)
{
+ assert(0);
lock.Lock();
int n = 0;
multimap<block_t,biovec*>::iterator p = io_queue.lower_bound(start);
friend ostream& operator<<(ostream& out, biovec &bio);
- interval_set<block_t> io_block_lock; // blocks currently dispatched to kernel
- multimap<block_t, biovec*> io_queue;
+ interval_set<block_t> io_block_lock; // blocks currently dispatched to kernel
+ multimap<block_t, biovec*> io_queue, next_io_queue;
map<biovec*, block_t> io_queue_map;
Cond io_wakeup;
+ bool use_next_queue;
bool io_stop;
int io_threads_started, io_threads_running;
BlockDevice(char *d) :
dev(d), fd(0), num_blocks(0),
idle_kicker(0),
+ use_next_queue(false),
io_stop(false), io_threads_started(0), io_threads_running(0),
el_dir_forward(true), el_pos(0),
complete_thread(this)
bool is_idle();
+ void barrier();
+ void _bump_queue();
// ** blocking interface **