git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os: write file journal optimization
author Xinze Chi <xinze@xsky.com>
Fri, 6 Nov 2015 13:44:38 +0000 (21:44 +0800)
committer Xinze Chi <xinze@xsky.com>
Thu, 12 Nov 2015 21:54:52 +0000 (05:54 +0800)
Currently, there is a single write thread for the file journal, so it can become a bottleneck.
It is important to keep the logic of the journal write thread simple. Given the
implementation of transaction encoding, it is almost impossible for the write
bufferlist to be aligned, so a journal write calls rebuild_aligned almost every time.
Because of memory fragmentation, the bufferlist crc and rebuild then become a bottleneck.

My implementation moves the complex logic out of the journal write thread.

Signed-off-by: Xinze Chi <xinze@xsky.com>
Reviewed-by: Haomai Wang <haomai@xsky.com>
src/os/FileJournal.cc
src/os/FileJournal.h
src/os/FileStore.cc
src/os/Journal.h
src/os/JournalingObjectStore.cc
src/os/JournalingObjectStore.h

index 4fa419ccbf82a9f1ba83d50142470cdacbe1c54e..7a33d313fffde4962832764b8a66de5e5d9d9d20 100644 (file)
@@ -893,7 +893,7 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_
        // throw out what we have so far
        full_state = FULL_FULL;
        while (!writeq_empty()) {
-         put_throttle(1, peek_write().bl.length());
+         put_throttle(1, peek_write().orig_len);
          pop_write();
        }  
        print_header(header);
@@ -973,54 +973,39 @@ int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64
   write_item &next_write = peek_write();
   uint64_t seq = next_write.seq;
   bufferlist &ebl = next_write.bl;
-  unsigned head_size = sizeof(entry_header_t);
-  off64_t base_size = 2*head_size + ebl.length();
-
-  int alignment = next_write.alignment; // we want to start ebl with this alignment
-  unsigned pre_pad = 0;
-  if (alignment >= 0)
-    pre_pad = ((unsigned int)alignment - (unsigned int)head_size) & ~CEPH_PAGE_MASK;
-  off64_t size = ROUND_UP_TO(base_size + pre_pad, header.alignment);
-  unsigned post_pad = size - base_size - pre_pad;
+  off64_t size = ebl.length();
 
   int r = check_for_full(seq, queue_pos, size);
   if (r < 0)
     return r;   // ENOSPC or EAGAIN
 
-  orig_bytes += ebl.length();
+  uint32_t orig_len = next_write.orig_len;
+  orig_bytes += orig_len;
   orig_ops++;
 
   // add to write buffer
   dout(15) << "prepare_single_write " << orig_ops << " will write " << queue_pos << " : seq " << seq
-          << " len " << ebl.length() << " -> " << size
-          << " (head " << head_size << " pre_pad " << pre_pad
-          << " ebl " << ebl.length() << " post_pad " << post_pad << " tail " << head_size << ")"
-          << " (ebl alignment " << alignment << ")"
-          << dendl;
+          << " len " << orig_len << " -> " << size << dendl;
     
-  // add it this entry
-  entry_header_t h;
-  memset(&h, 0, sizeof(h));
-  h.seq = seq;
-  h.pre_pad = pre_pad;
-  h.len = ebl.length();
-  h.post_pad = post_pad;
-  h.make_magic(queue_pos, header.get_fsid64());
-  h.crc32c = ebl.crc32c(0);
-
-  bl.append((const char*)&h, sizeof(h));
-  if (pre_pad) {
-    bufferptr bp = buffer::create_static(pre_pad, zero_buf);
-    bl.push_back(bp);
-  }
-  bl.claim_append(ebl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
-
-  if (h.post_pad) {
-    bufferptr bp = buffer::create_static(post_pad, zero_buf);
-    bl.push_back(bp);
-  }
-  bl.append((const char*)&h, sizeof(h));
-
+  unsigned seq_offset = offsetof(entry_header_t, seq);
+  unsigned magic1_offset = offsetof(entry_header_t, magic1);
+  unsigned magic2_offset = offsetof(entry_header_t, magic2);
+
+  bufferptr headerptr = ebl.buffers().front();
+  uint64_t _seq = seq;
+  uint64_t _queue_pos = queue_pos;
+  uint64_t magic2 = entry_header_t::make_magic(seq, orig_len, header.get_fsid64());
+  headerptr.copy_in(seq_offset, sizeof(uint64_t), (char *)&_seq);
+  headerptr.copy_in(magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
+  headerptr.copy_in(magic2_offset, sizeof(uint64_t), (char *)&magic2);
+
+  bufferptr footerptr = ebl.buffers().back();
+  unsigned post_offset  = footerptr.length() - sizeof(entry_header_t);
+  footerptr.copy_in(post_offset + seq_offset, sizeof(uint64_t), (char *)&_seq);
+  footerptr.copy_in(post_offset + magic1_offset, sizeof(uint64_t), (char *)&_queue_pos);
+  footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2);
+
+  bl.claim_append(ebl);
   if (next_write.tracked_op)
     next_write.tracked_op->mark_event("write_thread_in_journal_buffer");
 
@@ -1041,8 +1026,7 @@ void FileJournal::align_bl(off64_t pos, bufferlist& bl)
   // make sure list segments are page aligned
   if (directio && (!bl.is_aligned(block_size) ||
                   !bl.is_n_align_sized(CEPH_MINIMUM_BLOCK_SIZE))) {
-    bl.rebuild_aligned(CEPH_MINIMUM_BLOCK_SIZE);
-    dout(10) << __func__ << " total memcopy: " << bl.get_memcopy_count() << dendl;
+    assert(0 == "bl should be align");
     if ((bl.length() & (CEPH_MINIMUM_BLOCK_SIZE - 1)) != 0 ||
        (pos & (CEPH_MINIMUM_BLOCK_SIZE - 1)) != 0)
       dout(0) << "rebuild_page_aligned failed, " << bl << dendl;
@@ -1300,7 +1284,7 @@ void FileJournal::write_thread_entry()
       if (write_stop) {
        dout(20) << "write_thread_entry full and stopping, throw out queue and finish up" << dendl;
        while (!writeq_empty()) {
-         put_throttle(1, peek_write().bl.length());
+         put_throttle(1, peek_write().orig_len);
          pop_write();
        }  
        print_header(header);
@@ -1576,7 +1560,60 @@ void FileJournal::check_aio_completion()
 }
 #endif
 
-void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment,
+int FileJournal::prepare_entry(list<ObjectStore::Transaction*>& tls, bufferlist* tbl) {
+  dout(10) << "prepare_entry " << tls << dendl;
+  unsigned data_len = 0;
+  int data_align = -1; // -1 indicates that we don't care about the alignment
+  bufferlist bl;
+  for (list<ObjectStore::Transaction*>::iterator p = tls.begin();
+      p != tls.end(); ++p) {
+    ObjectStore::Transaction *t = *p;
+    if (t->get_data_length() > data_len &&
+     (int)t->get_data_length() >= g_conf->journal_align_min_size) {
+     data_len = t->get_data_length();
+     data_align = (t->get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK;
+    }
+    ::encode(*t, bl);
+  }
+  if (tbl->length()) {
+    bl.claim_append(*tbl);
+  }
+  // add it this entry
+  entry_header_t h;
+  unsigned head_size = sizeof(entry_header_t);
+  off64_t base_size = 2*head_size + bl.length();
+  memset(&h, 0, sizeof(h));
+  if (data_align >= 0)
+    h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK;
+  off64_t size = ROUND_UP_TO(base_size + h.pre_pad, header.alignment);
+  unsigned post_pad = size - base_size - h.pre_pad;
+  h.len = bl.length();
+  h.post_pad = post_pad;
+  h.crc32c = bl.crc32c(0);
+  dout(10) << " len " << bl.length() << " -> " << size
+       << " (head " << head_size << " pre_pad " << h.pre_pad
+       << " bl " << bl.length() << " post_pad " << post_pad << " tail " << head_size << ")"
+       << " (bl alignment " << data_align << ")"
+       << dendl;
+  bufferlist ebl;
+  // header
+  ebl.append((const char*)&h, sizeof(h));
+  if (h.pre_pad) {
+    ebl.push_back(buffer::create_static(h.pre_pad, zero_buf));
+  }
+  // payload
+  ebl.claim_append(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
+  if (h.post_pad) {
+    ebl.push_back(buffer::create_static(h.post_pad, zero_buf));
+  }
+  // footer
+  ebl.append((const char*)&h, sizeof(h));
+  ebl.rebuild_aligned(CEPH_MINIMUM_BLOCK_SIZE);
+  tbl->claim(ebl);
+  return h.len;
+}
+
+void FileJournal::submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
                               Context *oncommit, TrackedOpRef osd_op)
 {
   // dump on queue
@@ -1586,7 +1623,7 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment,
   assert(e.length() > 0);
 
   throttle_ops.take(1);
-  throttle_bytes.take(e.length());
+  throttle_bytes.take(orig_len);
   if (osd_op)
     osd_op->mark_event("commit_queued_for_journal_write");
   if (logger) {
@@ -1604,7 +1641,7 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment,
        seq, oncommit, ceph_clock_now(g_ceph_context), osd_op));
     if (writeq.empty())
       writeq_cond.Signal();
-    writeq.push_back(write_item(seq, e, alignment, osd_op));
+    writeq.push_back(write_item(seq, e, orig_len, osd_op));
   }
 }
 
@@ -1737,7 +1774,7 @@ void FileJournal::committed_thru(uint64_t seq)
     dout(15) << " dropping committed but unwritten seq " << peek_write().seq 
             << " len " << peek_write().bl.length()
             << dendl;
-    put_throttle(1, peek_write().bl.length());
+    put_throttle(1, peek_write().orig_len);
     pop_write();
   }
   
index fbe616d301ac70f001d1e546261a454288c4a4e4..50bc8109b9e9305c16495278939178ca82bf1194 100644 (file)
@@ -50,13 +50,13 @@ public:
   struct write_item {
     uint64_t seq;
     bufferlist bl;
-    int alignment;
+    uint32_t orig_len;
     TrackedOpRef tracked_op;
-    write_item(uint64_t s, bufferlist& b, int al, TrackedOpRef opref) :
-      seq(s), alignment(al), tracked_op(opref) {
+    write_item(uint64_t s, bufferlist& b, int ol, TrackedOpRef opref) :
+      seq(s), orig_len(ol), tracked_op(opref) {
       bl.claim(b, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
     }
-    write_item() : seq(0), alignment(0) {}
+    write_item() : seq(0), orig_len(0) {}
   };
 
   Mutex finisher_lock;
@@ -88,7 +88,9 @@ public:
     completions.pop_front();
   }
 
-  void submit_entry(uint64_t seq, bufferlist& bl, int alignment,
+  int prepare_entry(list<ObjectStore::Transaction*>& tls, bufferlist* tbl);
+
+  void submit_entry(uint64_t seq, bufferlist& bl, uint32_t orig_len,
                    Context *oncommit,
                    TrackedOpRef osd_op = TrackedOpRef());
   /// End protected by finisher_lock
@@ -203,14 +205,13 @@ public:
     uint64_t magic1;
     uint64_t magic2;
     
-    void make_magic(off64_t pos, uint64_t fsid) {
-      magic1 = pos;
-      magic2 = fsid ^ seq ^ len;
+    static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) {
+      return (fsid ^ seq ^ len);
     }
     bool check_magic(off64_t pos, uint64_t fsid) {
       return
-       magic1 == (uint64_t)pos &&
-       magic2 == (fsid ^ seq ^ len);
+    magic1 == (uint64_t)pos &&
+    magic2 == (fsid ^ seq ^ len);
     }
   } __attribute__((__packed__, aligned(4)));
 
@@ -220,7 +221,6 @@ private:
   string fn;
 
   char *zero_buf;
-
   off64_t max_size;
   size_t block_size;
   bool directio, aio, force_aio;
index 5b8c2b5edf39b586eb4e346a88b80f0227e32937..70b7cc92fd0f8bcb79751acbd0f6433bc60b6fed 100644 (file)
@@ -1959,7 +1959,10 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
     journal->throttle();
     //prepare and encode transactions data out of lock
     bufferlist tbl;
-    int data_align = _op_journal_transactions_prepare(o->tls, tbl);
+    int orig_len = -1;
+    if (journal && journal->is_writeable()) {
+      orig_len = journal->prepare_entry(o->tls, &tbl);
+    }
     uint64_t op_num = submit_manager.op_submit_start();
     o->op = op_num;
 
@@ -1969,7 +1972,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
     if (m_filestore_journal_parallel) {
       dout(5) << "queue_transactions (parallel) " << o->op << " " << o->tls << dendl;
       
-      _op_journal_transactions(tbl, data_align, o->op, ondisk, osd_op);
+      _op_journal_transactions(tbl, orig_len, o->op, ondisk, osd_op);
       
       // queue inside submit_manager op submission lock
       queue_op(osr, o);
@@ -1978,7 +1981,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
       
       osr->queue_journal(o->op);
 
-      _op_journal_transactions(tbl, data_align, o->op,
+      _op_journal_transactions(tbl, orig_len, o->op,
                               new C_JournaledAhead(this, osr, o, ondisk),
                               osd_op);
     } else {
@@ -2011,7 +2014,10 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
 
   //prepare and encode transactions data out of lock
   bufferlist tbl;
-  int data_align = _op_journal_transactions_prepare(tls, tbl);
+  int orig_len = -1;
+  if (journal && journal->is_writeable()) {
+    orig_len = journal->prepare_entry(tls, &tbl);
+  }
   uint64_t op = submit_manager.op_submit_start();
   dout(5) << "queue_transactions (trailing journal) " << op << " " << tls << dendl;
 
@@ -2022,7 +2028,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
   int r = do_transactions(tls, op);
     
   if (r >= 0) {
-    _op_journal_transactions(tbl, data_align, op, ondisk, osd_op);
+    _op_journal_transactions(tbl, orig_len, op, ondisk, osd_op);
   } else {
     delete ondisk;
   }
index 4f8658fb418f279fa604e65b201aa456ca730c07..d5b918698e496ea9fc5141ccadf32c2f7e4bd025 100644 (file)
@@ -22,6 +22,7 @@
 #include "include/Context.h"
 #include "common/Finisher.h"
 #include "common/TrackedOp.h"
+#include "os/ObjectStore.h"
 
 class PerfCounters;
 
@@ -57,7 +58,7 @@ public:
   // writes
   virtual bool is_writeable() = 0;
   virtual int make_writeable() = 0;
-  virtual void submit_entry(uint64_t seq, bufferlist& e, int alignment,
+  virtual void submit_entry(uint64_t seq, bufferlist& e, uint32_t orig_len,
                            Context *oncommit,
                            TrackedOpRef osd_op = TrackedOpRef()) = 0;
   virtual void commit_start(uint64_t seq) = 0;
@@ -71,6 +72,8 @@ public:
 
   virtual bool should_commit_now() = 0;
 
+  virtual int prepare_entry(list<ObjectStore::Transaction*>& tls, bufferlist* tbl) = 0;
+
   // reads/recovery
   
 };
index 35cf74ae6ae51eb62724fb99704c0844206bd623..599a1b568cb65e1e6375a2d79daa6be27c4ee2bd 100644 (file)
@@ -251,7 +251,7 @@ void JournalingObjectStore::ApplyManager::commit_finish()
 }
 
 void JournalingObjectStore::_op_journal_transactions(
-  bufferlist& tbl, int data_align,  uint64_t op,
+  bufferlist& tbl, uint32_t orig_len, uint64_t op,
   Context *onjournal, TrackedOpRef osd_op)
 {
   if (osd_op.get())
@@ -261,27 +261,9 @@ void JournalingObjectStore::_op_journal_transactions(
     dout(10) << "op_journal_transactions " << op  << dendl;
 
   if (journal && journal->is_writeable()) {
-    journal->submit_entry(op, tbl, data_align, onjournal, osd_op);
+    journal->submit_entry(op, tbl, orig_len, onjournal, osd_op);
   } else if (onjournal) {
     apply_manager.add_waiter(op, onjournal);
   }
 }
 
-int JournalingObjectStore::_op_journal_transactions_prepare(
-  list<ObjectStore::Transaction*>& tls, bufferlist& tbl)
-{
-  dout(10) << "_op_journal_transactions_prepare " << tls << dendl;
-  unsigned data_len = 0;
-  int data_align = -1; // -1 indicates that we don't care about the alignment
-  for (list<ObjectStore::Transaction*>::iterator p = tls.begin();
-      p != tls.end(); ++p) {
-    ObjectStore::Transaction *t = *p;
-    if (t->get_data_length() > data_len &&
-     (int)t->get_data_length() >= g_conf->journal_align_min_size) {
-     data_len = t->get_data_length();
-     data_align = (t->get_data_alignment() - tbl.length()) & ~CEPH_PAGE_MASK;
-    }
-    ::encode(*t, tbl);
-  }
-  return data_align;
-}
index fbfa20ce0f1f53207940c2b602726af74c869e8b..42d13f6491a46f9e09828a2f781493b06762640b 100644 (file)
@@ -17,6 +17,7 @@
 
 #include "ObjectStore.h"
 #include "Journal.h"
+#include "FileJournal.h"
 #include "common/RWLock.h"
 
 class JournalingObjectStore : public ObjectStore {
@@ -114,9 +115,7 @@ protected:
   void journal_write_close();
   int journal_replay(uint64_t fs_op_seq);
 
-  int _op_journal_transactions_prepare(
-    list<ObjectStore::Transaction*>& tls, bufferlist& tbl);
-  void _op_journal_transactions(bufferlist& tls, int data_align, uint64_t op,
+  void _op_journal_transactions(bufferlist& tls, uint32_t orig_len, uint64_t op,
                                Context *onjournal, TrackedOpRef osd_op);
 
   virtual int do_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op_seq) = 0;
@@ -136,7 +135,9 @@ public:
       finisher(g_ceph_context),
       apply_manager(journal, finisher),
       replaying(false) {}
-  
+
+  ~JournalingObjectStore() {
+  }
 };
 
 #endif