]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
bdev barrier
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 22 Sep 2006 01:17:38 +0000 (01:17 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 22 Sep 2006 01:17:38 +0000 (01:17 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@869 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/ebofs/BlockDevice.cc
ceph/ebofs/BlockDevice.h
ceph/ebofs/Ebofs.cc

index 2e72fa63dbf2da1f048561187d675d75c226d927..5c3530518eb9ac4be3265fd70f3e3ada4b466aa3 100644 (file)
@@ -85,6 +85,26 @@ block_t BlockDevice::get_num_blocks()
 }
 
 
+void BlockDevice::barrier()
+{
+  lock.Lock();
+  dout(10) << "barrier" << endl;
+  if (!use_next_queue &&
+         !io_queue.empty()) 
+       use_next_queue = true;
+  lock.Unlock();
+}
+
+void BlockDevice::_bump_queue()
+{
+  if (io_queue.empty() && use_next_queue) {
+       dout(10) << "_bump_queue next_io_queue (" << next_io_queue.size() 
+                         << ") -> io_queue (" << io_queue.size() << ")" << endl;  // empty, duh.
+       use_next_queue = false;
+       io_queue.swap(next_io_queue);
+  }
+}
+
 void* BlockDevice::io_thread_entry()
 {
   lock.Lock();
@@ -193,6 +213,7 @@ void* BlockDevice::io_thread_entry()
                  // dequeue
                  io_queue_map.erase(bio);
                  io_queue.erase(prev);
+                 _bump_queue();
                  
                  if (at_end) break;
                }
@@ -253,7 +274,9 @@ void* BlockDevice::io_thread_entry()
          
          // sleep
          io_threads_running--;
-         dout(20) << "io_thread" << whoami << " sleeping, " << io_threads_running << " threads now running" << endl;
+         dout(20) << "io_thread" << whoami << " sleeping, " << io_threads_running << " threads now running," 
+                          << " queue has " << io_queue.size() << " / " << next_io_queue.size()
+                          << endl;
 
          if (g_conf.bdev_idle_kick_after_ms > 0 &&
                  io_threads_running == 0 && 
@@ -413,31 +436,37 @@ void BlockDevice::_submit_io(biovec *b)
   // NOTE: lock must be held
   dout(15) << "_submit_io " << *b << endl;
   
-  // wake up io_thread(s)?
-  if ((int)io_queue.size() == io_threads_running) 
-       io_wakeup.SignalOne();
-  else if ((int)io_queue.size() > io_threads_running) 
-       io_wakeup.SignalAll();
-
-  // [DEBUG] check for overlapping ios
-  if (g_conf.bdev_debug_check_io_overlap) {
-       // BUG: this doesn't catch everything!  eg 1~10000000 will be missed....
-       multimap<block_t, biovec*>::iterator p = io_queue.lower_bound(b->start);
-       if ((p != io_queue.end() &&
-                p->first < b->start+b->length) ||
-               (p != io_queue.begin() && 
-                (p--, p->second->start + p->second->length > b->start))) {
-         dout(1) << "_submit_io new io " << *b 
-                         << " overlaps with existing " << *p->second << endl;
-         cerr << "_submit_io new io " << *b 
-                         << " overlaps with existing " << *p->second << endl;
+  if (use_next_queue) {
+       // queue in next queue
+       assert(!io_queue.empty());
+       next_io_queue.insert(pair<block_t,biovec*>(b->start, b));
+  } else {
+       // wake up io_thread(s)?
+       if ((int)io_queue.size() == io_threads_running) 
+         io_wakeup.SignalOne();
+       else if ((int)io_queue.size() > io_threads_running) 
+         io_wakeup.SignalAll();
+       
+       // [DEBUG] check for overlapping ios
+       // BUG: this doesn't detect all overlaps w/ the next queue thing.
+       if (g_conf.bdev_debug_check_io_overlap) {
+         // BUG: this doesn't catch everything!  eg 1~10000000 will be missed....
+         multimap<block_t, biovec*>::iterator p = io_queue.lower_bound(b->start);
+         if ((p != io_queue.end() &&
+                  p->first < b->start+b->length) ||
+                 (p != io_queue.begin() && 
+                  (p--, p->second->start + p->second->length > b->start))) {
+               dout(1) << "_submit_io new io " << *b 
+                               << " overlaps with existing " << *p->second << endl;
+               cerr << "_submit_io new io " << *b 
+                        << " overlaps with existing " << *p->second << endl;
+         }
        }
+       
+       // queue anew
+       io_queue.insert(pair<block_t,biovec*>(b->start, b));
   }
-
-  // queue anew
-  io_queue.insert(pair<block_t,biovec*>(b->start, b));
   io_queue_map[b] = b->start;
-
 }
 
 int BlockDevice::_cancel_io(biovec *bio) 
@@ -451,11 +480,20 @@ int BlockDevice::_cancel_io(biovec *bio)
   dout(15) << "_cancel_io " << *bio << endl;
 
   block_t b = io_queue_map[bio];
-  multimap<block_t,biovec*>::iterator p = io_queue.lower_bound(b);
-  while (p->second != bio) p++;
-  assert(p->second == bio);
   io_queue_map.erase(bio);
-  io_queue.erase(p);
+  
+  multimap<block_t,biovec*>::iterator p = io_queue.lower_bound(b);
+  while (p->first == b && p->second != bio) p++;
+  if (p->first == b && p->second == bio) {
+       io_queue.erase(p);
+       _bump_queue();
+  } else {
+       p = next_io_queue.lower_bound(b);
+       while (p->first == b && p->second != bio) p++;
+       assert(p->second == bio);
+       next_io_queue.erase(p);
+  }
   return 0;
 }
 
@@ -463,6 +501,7 @@ int BlockDevice::_cancel_io(biovec *bio)
 
 int BlockDevice::count_io(block_t start, block_t len)
 {
+  assert(0);
   lock.Lock();
   int n = 0;
   multimap<block_t,biovec*>::iterator p = io_queue.lower_bound(start);
index dbc15390606a75d781ec0f5208dd7c44e45f0b5e..4f2d4b60574ce6caac97a89abf23bde5ac0412de 100644 (file)
@@ -76,10 +76,11 @@ class BlockDevice {
   friend ostream& operator<<(ostream& out, biovec &bio);
 
 
-  interval_set<block_t>       io_block_lock;    // blocks currently dispatched to kernel
-  multimap<block_t, biovec*> io_queue;
+  interval_set<block_t>      io_block_lock;    // blocks currently dispatched to kernel
+  multimap<block_t, biovec*> io_queue, next_io_queue;
   map<biovec*, block_t>      io_queue_map;
   Cond                       io_wakeup;
+  bool                       use_next_queue;
   bool                       io_stop;
   int                        io_threads_started, io_threads_running;
   
@@ -131,6 +132,7 @@ class BlockDevice {
   BlockDevice(char *d) : 
        dev(d), fd(0), num_blocks(0),
        idle_kicker(0),
+       use_next_queue(false),
        io_stop(false), io_threads_started(0), io_threads_running(0),
        el_dir_forward(true), el_pos(0),
        complete_thread(this) 
@@ -151,6 +153,8 @@ class BlockDevice {
 
   bool is_idle();
 
+  void barrier();
+  void _bump_queue();
 
   // ** blocking interface **
 
index e31e3a1e0417444844396ff2ad08a5ae5b373fac..bee8f4cb477f8f37903b875570b05c2097bad806 100644 (file)
@@ -401,6 +401,9 @@ int Ebofs::commit_thread_entry()
          // (async) write btree nodes
          nodepool.commit_start( dev, super_epoch );
          
+         // blockdev barrier (prioritize our writes!)
+         dev.barrier();
+
          // prepare super (before any changes get made!)
          bufferptr superbp;
          prepare_super(super_epoch, superbp);