]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Journaler ignores fragmented entries at end of journal.
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 20 Nov 2006 22:16:34 +0000 (22:16 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 20 Nov 2006 22:16:34 +0000 (22:16 +0000)
does not truncate them, though!

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@962 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/osdc/Journaler.cc

index 7a3dbf0823d1008a025576c95d8716de441aae7a..ab1f7813258dfa81429434875e894b57d770c06b 100644 (file)
@@ -225,10 +225,34 @@ void Journaler::_finish_flush(int r, off_t start)
 
 off_t Journaler::append_entry(bufferlist& bl, Context *onsync)
 {
+  size_t s = bl.length();
+
+  if (!g_conf.journaler_allow_split_entries) {
+    // will we span a stripe boundary?
+    int p = inode.layout.stripe_size;
+    if (write_pos / p != (write_pos + bl.length() + sizeof(s)) / p) {
+      // yes.
+      // move write_pos forward.
+      off_t owp = write_pos;
+      write_pos += p;
+      write_pos -= (write_pos % p);
+      
+      // pad with zeros.
+      bufferptr bp(write_pos - owp);
+      bp.zero();
+      assert(bp.length() >= 4);
+      write_buf.push_back(bp);
+      
+      // now flush.
+      flush();
+      
+      dout(12) << "append_entry skipped " << (write_pos-owp) << " bytes to " << write_pos << " to avoid spanning stripe boundary" << endl;
+    }
+  }
+       
   dout(10) << "append_entry len " << bl.length() << " to " << write_pos << "~" << (bl.length() + sizeof(size_t)) << endl;
   
   // append
-  size_t s = bl.length();
   write_buf.append((char*)&s, sizeof(s));
   write_buf.append(bl);
   write_pos += sizeof(s) + s;
@@ -439,23 +463,37 @@ bool Journaler::is_readable()
   if (read_pos == write_pos) return false;
 
   // have enough for entry size?
-  size_t s;
-  if (read_buf.length() < sizeof(s)) {
-    if (!_is_reading())
-      _issue_read(fetch_len);
-    return false;
-  }
-  read_buf.copy(0, sizeof(s), (char*)&s);
+  size_t s = 0;
+  if (read_buf.length() >= sizeof(s)) 
+    read_buf.copy(0, sizeof(s), (char*)&s);
+
+  // entry and payload?
+  if (read_buf.length() >= sizeof(s) &&
+      read_buf.length() >= sizeof(s) + s) 
+    return true;  // yep, next entry is ready.
+
+  // darn it!
 
-  // have entirely of next entry?
-  if (read_buf.length() < sizeof(s) + s) {
-    if (!_is_reading())
-      _issue_read(MAX(fetch_len, sizeof(s)+s-read_buf.length())); 
+  // partial fragment at the end?
+  if (received_pos == write_pos) {
+    dout(10) << "is_readable() detected partial entry at tail, adjusting write_pos to " << read_pos << endl;
+    write_pos = flush_pos = ack_pos = read_pos;
+    assert(write_buf.length() == 0);
+
+    // truncate?
+    // FIXME: how much?
+    
     return false;
+  } 
+
+  // start reading some more?
+  if (!_is_reading()) {
+    if (s)
+      fetch_len = MAX(fetch_len, sizeof(s)+s-read_buf.length()); 
+    _issue_read(fetch_len);
   }
-  
-  // next entry is ready!
-  return true;
+
+  return false;
 }