git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: group entries into single io in directio mode
author Sage Weil <sage@newdream.net>
Sat, 30 Jan 2010 00:34:17 +0000 (16:34 -0800)
committer Sage Weil <sage@newdream.net>
Mon, 1 Feb 2010 21:49:57 +0000 (13:49 -0800)
src/include/buffer.h
src/os/FileJournal.cc
src/os/FileJournal.h

index 43ce02ba0bfc505d2261feb9029de73852a732aa..936650986b4bf3a899f299b04c585fe7a3ca93d9 100644 (file)
@@ -804,8 +804,15 @@ public:
       }
     }
 
+    bool is_contiguous() {
+      return &(*_buffers.begin()) == &(*_buffers.rbegin());
+    }
     void rebuild() {
-      ptr nb(_len);
+      ptr nb;
+      if ((_len & ~PAGE_MASK) == 0)
+       nb = buffer::create_page_aligned(_len);
+      else
+       nb = buffer::create(_len);
       unsigned pos = 0;
       for (std::list<ptr>::iterator it = _buffers.begin();
           it != _buffers.end();
index 474859bddb39d96ec64ccd3f1c512694786cf64c..72321f2ec9c8bbf1d74e3d76eb8d79e80edefe88 100644 (file)
@@ -322,7 +322,7 @@ bool FileJournal::check_for_wrap(__u64 seq, off64_t *pos, off64_t size, bool can
       // yes!
       dout(10) << " wrapping from " << *pos << " to " << get_top() << dendl;
       header.wrap = *pos;
-      *pos = get_top();
+      write_pos = *pos = get_top();
       must_write_header = true;
       return true;
     }
@@ -353,122 +353,77 @@ void FileJournal::prepare_multi_write(bufferlist& bl)
   off64_t queue_pos = write_pos;
 
   int eleft = g_conf.journal_max_write_entries;
-  int bleft = g_conf.journal_max_write_bytes;
+  unsigned bmax = g_conf.journal_max_write_bytes;
 
   if (full_commit_seq || full_restart_seq)
     return;
-
+  
   while (!writeq.empty()) {
-    // grab next item
-    __u64 seq = writeq.front().seq;
-    bufferlist &ebl = writeq.front().bl;
-    off64_t size = 2*sizeof(entry_header_t) + ebl.length();
-
     bool can_wrap = !bl.length();  // only wrap if this is a new thinger
-    if (!check_for_wrap(seq, &queue_pos, size, can_wrap))
-      break;
-
-    // set write_pos?  (check_for_wrap may have moved it)
-    if (!bl.length())
-      write_pos = queue_pos;
-    
-    // add to write buffer
-    dout(15) << "prepare_multi_write will write " << queue_pos << " : seq " << seq
-            << " len " << ebl.length() << " -> " << size
-            << " (left " << eleft << "/" << bleft << ")"
-            << dendl;
-    
-    // add it this entry
-    entry_header_t h;
-    h.seq = seq;
-    h.flags = 0;
-    h.len = ebl.length();
-    h.pre_pad = 0;
-    h.post_pad = 0;
-    h.make_magic(queue_pos, header.fsid);
-
-    // pad?
-    if ((queue_pos + size) % header.alignment) {
-      h.post_pad = header.alignment - ((queue_pos + size) % header.alignment);
-      size += h.post_pad;
-      //dout(20) << "   padding with " << h.post_pad << " bytes, queue_pos now " << queue_pos << dendl;
-    }
-    dout(15) << " pad " << h.pre_pad << " " << h.post_pad << " len " << h.len << dendl;
-
-    bl.append((const char*)&h, sizeof(h));
-    bl.claim_append(ebl);
-    if (h.post_pad) {
-      bufferptr bp = buffer::create_static(h.post_pad, zero_buf);
-      bl.push_back(bp);
-    }
-    bl.append((const char*)&h, sizeof(h));
     
-    if (writeq.front().fin) {
-      writing_seq.push_back(seq);
-      writing_fin.push_back(writeq.front().fin);
-    }
-
-    // pop from writeq
-    writeq.pop_front();
-    journalq.push_back(pair<__u64,off64_t>(seq, queue_pos));
-
-    queue_pos += size;
+    bool r = prepare_single_write(bl, queue_pos, can_wrap);
+    if (!r)
+      break;
 
     if (eleft) {
       if (--eleft == 0) {
-       dout(20) << "    hit max events per write " << g_conf.journal_max_write_entries << dendl;
+       dout(20) << "prepare_multi_write hit max events per write " << g_conf.journal_max_write_entries << dendl;
        break;
       }
     }
-    if (bleft) {
-      bleft -= size;
-      if (bleft == 0) {
-       dout(20) << "    hit max write size " << g_conf.journal_max_write_bytes << dendl;
+    if (bmax) {
+      if (bl.length() >= bmax) {
+       dout(20) << "prepare_multi_write hit max write size " << g_conf.journal_max_write_bytes << dendl;
        break;
       }
     }
   }
+
+  assert(write_pos + bl.length() == queue_pos);
 }
 
-bool FileJournal::prepare_single_dio_write(bufferlist& bl)
+bool FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, bool can_wrap)
 {
   // grab next item
   __u64 seq = writeq.front().seq;
   bufferlist &ebl = writeq.front().bl;
-    
   off64_t base_size = 2*sizeof(entry_header_t) + ebl.length();
   off64_t size = ROUND_UP_TO(base_size, header.alignment);
-  
-  if (!check_for_wrap(seq, &write_pos, size, true))
+
+  if (!check_for_wrap(seq, &queue_pos, size, can_wrap))
     return false;
-  if (full_commit_seq || full_restart_seq) return false;
-
-  // build it
-  dout(15) << "prepare_single_dio_write will write " << write_pos << " : seq " << seq
-          << " len " << ebl.length() << " -> " << size << " (base " << base_size << ")" << dendl;
-
-  bufferptr bp = buffer::create_page_aligned(size);
-  entry_header_t *h = (entry_header_t*)bp.c_str();
-  h->seq = seq;
-  h->flags = 0;
-  h->len = ebl.length();
-  h->pre_pad = 0;
-  h->post_pad = size - base_size;
-  dout(15) << " pad " << h->pre_pad << " " << h->post_pad << dendl;
-  h->make_magic(write_pos, header.fsid);
-  ebl.copy(0, ebl.length(), bp.c_str()+sizeof(*h)+h->pre_pad);
-  memcpy(bp.c_str() + sizeof(*h) + h->pre_pad + ebl.length() + h->post_pad, h, sizeof(*h));
-  bl.push_back(bp);
-  
+
+  // add to write buffer
+  dout(15) << "prepare_single_write will write " << queue_pos << " : seq " << seq
+          << " len " << ebl.length() << " -> " << size
+          << dendl;
+    
+  // add it this entry
+  entry_header_t h;
+  h.seq = seq;
+  h.pre_pad = 0;
+  h.len = ebl.length();
+  h.post_pad = size - base_size;
+  h.make_magic(queue_pos, header.fsid);
+
+  bl.append((const char*)&h, sizeof(h));
+  bl.claim_append(ebl);
+  if (h.post_pad) {
+    bufferptr bp = buffer::create_static(h.post_pad, zero_buf);
+    bl.push_back(bp);
+  }
+  bl.append((const char*)&h, sizeof(h));
+   
   if (writeq.front().fin) {
     writing_seq.push_back(seq);
     writing_fin.push_back(writeq.front().fin);
   }
-  
+
   // pop from writeq
   writeq.pop_front();
-  journalq.push_back(pair<__u64,off64_t>(seq, write_pos));
+  journalq.push_back(pair<__u64,off64_t>(seq, queue_pos));
+
+  queue_pos += size;
 
   return true;
 }
@@ -507,6 +462,14 @@ void FileJournal::do_write(bufferlist& bl)
   off64_t pos = write_pos;
   ::lseek64(fd, write_pos, SEEK_SET);
 #endif
+
+  // make sure this is a single, contiguous buffer
+  if (directio && !bl.is_contiguous()) {
+    bl.rebuild();
+    //dout(0) << "len " << bl.length() << " page_mask " << PAGE_MASK << " ~" << ~PAGE_MASK << dendl;
+    assert((bl.length() & ~PAGE_MASK) == 0);
+  }
+
   for (list<bufferptr>::const_iterator it = bl.buffers().begin();
        it != bl.buffers().end();
        it++) {
@@ -567,10 +530,7 @@ void FileJournal::write_thread_entry()
     }
     
     bufferlist bl;
-    if (directio)
-      prepare_single_dio_write(bl);
-    else
-      prepare_multi_write(bl);
+    prepare_multi_write(bl);
     do_write(bl);
   }
 
index c4d2baa91535dc622aca4429024d34c2d9b21d1d..c0a95054d585e56c34e892b005b927a2b19e6bff 100644 (file)
@@ -116,8 +116,9 @@ private:
   void write_thread_entry();
 
   bool check_for_wrap(__u64 seq, off64_t *pos, off64_t size, bool can_wrap);
-  bool prepare_single_dio_write(bufferlist& bl);
   void prepare_multi_write(bufferlist& bl);
+  bool prepare_single_write(bufferlist& bl, off64_t& queue_pos, bool can_wrap);
+  bool prepare_single_dio_write(bufferlist& bl, off64_t& queue_pos, bool can_wrap);
   void do_write(bufferlist& bl);
 
   class Writer : public Thread {