git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
FileJournal: batch pop and unpop from writeq and completions 6701/head
authorXinze Chi <xinze@xsky.com>
Fri, 11 Sep 2015 09:32:41 +0000 (17:32 +0800)
committerXinze Chi <xinze@xsky.com>
Fri, 11 Dec 2015 14:56:30 +0000 (22:56 +0800)
Signed-off-by: Xinze Chi <xinze@xsky.com>
src/os/FileJournal.cc
src/os/FileJournal.h

index 86d269b33f0b90c7454950060429661daa0b003d..70591974fed9be1d8854265bb593f6850341c821 100644 (file)
@@ -878,45 +878,57 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_
     return -ENOSPC;
   
   while (!writeq_empty()) {
-    int r = prepare_single_write(bl, queue_pos, orig_ops, orig_bytes);
-    if (r == -ENOSPC) {
-      if (orig_ops)
-       break;         // commit what we have
-
-      if (logger)
-       logger->inc(l_os_j_full);
-
-      if (wait_on_full) {
-       dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl;
-      } else {
-       dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl;
-
-       // throw out what we have so far
-       full_state = FULL_FULL;
-       while (!writeq_empty()) {
-         put_throttle(1, peek_write().orig_len);
-         pop_write();
-       }  
-       print_header(header);
+    list<write_item> items;
+    batch_pop_write(items);
+    list<write_item>::iterator it = items.begin();
+    while (it != items.end()) {
+      int r = prepare_single_write(*it, bl, queue_pos, orig_ops, orig_bytes);
+      if (r == 0) { // prepare ok, delete it
+         items.erase(it++);
       }
-
-      return -ENOSPC;  // hrm, full on first op
-    }
-
-    if (eleft) {
-      if (--eleft == 0) {
-       dout(20) << "prepare_multi_write hit max events per write " << g_conf->journal_max_write_entries << dendl;
-       break;
+      if (r == -ENOSPC) {
+        // the journal may be full; put the remaining items back on the writeq
+        batch_unpop_write(items);
+        if (orig_ops)
+          goto out;         // commit what we have
+
+        if (logger)
+          logger->inc(l_os_j_full);
+
+        if (wait_on_full) {
+          dout(20) << "prepare_multi_write full on first entry, need to wait" << dendl;
+        } else {
+          dout(20) << "prepare_multi_write full on first entry, restarting journal" << dendl;
+
+          // throw out what we have so far
+          full_state = FULL_FULL;
+          while (!writeq_empty()) {
+            put_throttle(1, peek_write().orig_len);
+            pop_write();
+          }
+          print_header(header);
+        }
+        
+        return -ENOSPC;  // hrm, full on first op
       }
-    }
-    if (bmax) {
-      if (bl.length() >= bmax) {
-       dout(20) << "prepare_multi_write hit max write size " << g_conf->journal_max_write_bytes << dendl;
-       break;
+      if (eleft) {
+        if (--eleft == 0) {
+          dout(20) << "prepare_multi_write hit max events per write " << g_conf->journal_max_write_entries << dendl;
+          batch_unpop_write(items);
+          goto out;
+        }
+      }
+      if (bmax) {
+        if (bl.length() >= bmax) {
+          dout(20) << "prepare_multi_write hit max write size " << g_conf->journal_max_write_bytes << dendl;
+          batch_unpop_write(items);
+          goto out;
+        }
       }
     }
   }
 
+out:
   dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl;
   assert((write_pos + bl.length() == queue_pos) ||
          (write_pos + bl.length() - header.max_size + get_top() == queue_pos));
@@ -946,11 +958,13 @@ void FileJournal::queue_completions_thru(uint64_t seq)
 {
   assert(finisher_lock.is_locked());
   utime_t now = ceph_clock_now(g_ceph_context);
-  while (!completions_empty()) {
-    completion_item next = completion_peek_front();
+  list<completion_item> items;
+  batch_pop_completions(items);
+  list<completion_item>::iterator it = items.begin();
+  while (it != items.end()) {
+    completion_item& next = *it;
     if (next.seq > seq)
       break;
-    completion_pop_front();
     utime_t lat = now;
     lat -= next.start;
     dout(10) << "queue_completions_thru seq " << seq
@@ -964,14 +978,15 @@ void FileJournal::queue_completions_thru(uint64_t seq)
       finisher->queue(next.finish);
     if (next.tracked_op)
       next.tracked_op->mark_event("journaled_completion_queued");
+    items.erase(it++);
   }
+  batch_unpop_completions(items);
   finisher_cond.Signal();
 }
 
-int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes)
+
+int FileJournal::prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes)
 {
-  // grab next item
-  write_item &next_write = peek_write();
   uint64_t seq = next_write.seq;
   bufferlist &ebl = next_write.bl;
   off64_t size = ebl.length();
@@ -1010,8 +1025,6 @@ int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64
   if (next_write.tracked_op)
     next_write.tracked_op->mark_event("write_thread_in_journal_buffer");
 
-  // pop from writeq
-  pop_write();
   journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos));
   writing_seq = seq;
 
@@ -1666,6 +1679,20 @@ void FileJournal::pop_write()
   writeq.pop_front();
 }
 
+void FileJournal::batch_pop_write(list<write_item> &items)
+{
+  assert(write_lock.is_locked());
+  Mutex::Locker locker(writeq_lock);
+  writeq.swap(items);
+}
+
+void FileJournal::batch_unpop_write(list<write_item> &items)
+{
+  assert(write_lock.is_locked());
+  Mutex::Locker locker(writeq_lock);
+  writeq.splice(writeq.begin(), items);
+}
+
 void FileJournal::commit_start(uint64_t seq)
 {
   dout(10) << "commit_start" << dendl;
index 50bc8109b9e9305c16495278939178ca82bf1194..602a034805754cc852a6fe5f9c5a88418b25325d 100644 (file)
@@ -66,17 +66,27 @@ public:
 
   Mutex writeq_lock;
   Cond writeq_cond;
-  deque<write_item> writeq;
+  list<write_item> writeq;
   bool writeq_empty();
   write_item &peek_write();
   void pop_write();
+  void batch_pop_write(list<write_item> &items);
+  void batch_unpop_write(list<write_item> &items);
 
   Mutex completions_lock;
-  deque<completion_item> completions;
+  list<completion_item> completions;
   bool completions_empty() {
     Mutex::Locker l(completions_lock);
     return completions.empty();
   }
+  void batch_pop_completions(list<completion_item> &items) {
+    Mutex::Locker l(completions_lock);
+    completions.swap(items);
+  }
+  void batch_unpop_completions(list<completion_item> &items) {
+    Mutex::Locker l(completions_lock);
+    completions.splice(completions.begin(), items);
+  }
   completion_item completion_peek_front() {
     Mutex::Locker l(completions_lock);
     assert(!completions.empty());
@@ -311,7 +321,8 @@ private:
 
   int check_for_full(uint64_t seq, off64_t pos, off64_t size);
   int prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytee);
-  int prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes);
+  int prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos,
+    uint64_t& orig_ops, uint64_t& orig_bytes);
   void do_write(bufferlist& bl);
 
   void write_finish_thread_entry();