git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: fix broken append batching implementation
author     Jason Dillaman <dillaman@redhat.com>
           Thu, 13 Jun 2019 14:21:33 +0000 (10:21 -0400)
committer  Jason Dillaman <dillaman@redhat.com>
           Wed, 19 Jun 2019 14:37:22 +0000 (10:37 -0400)
The original flush_interval/bytes/age batching options no longer have any
effect. Integrate them properly with the max in-flight AIO handling; a
simplified sketch of the new batching decision follows the file list below.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalRecorder.cc
src/journal/JournalRecorder.h
src/journal/ObjectRecorder.cc
src/journal/ObjectRecorder.h
src/test/journal/test_ObjectRecorder.cc
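
The commit folds the flush_interval/flush_bytes/flush_age thresholds and the
max in-flight AIO cap into a single decision inside
ObjectRecorder::send_appends(). The self-contained C++ sketch below
illustrates only that decision: BatchState and should_send_now() are
hypothetical names, the age check is simplified, and the locking, future and
RADOS plumbing of the real code is omitted.

    // A minimal sketch of the batching decision; BatchState and
    // should_send_now() are illustrative names only and do not exist in Ceph.
    #include <chrono>
    #include <cstddef>
    #include <cstdint>
    #include <iostream>

    struct BatchState {
      // configured thresholds; 0 means "threshold disabled"
      uint32_t flush_interval = 0;   // flush once N entries are pending
      uint64_t flush_bytes = 0;      // flush once N bytes are pending
      double   flush_age = 0.0;      // flush once the batch is N seconds old
      int32_t  max_in_flight = 0;    // cap on concurrent AIO appends (0 = none)

      // current state
      size_t   pending_entries = 0;
      uint64_t pending_bytes = 0;
      size_t   in_flight_appends = 0;
      std::chrono::steady_clock::time_point last_flush =
          std::chrono::steady_clock::now();
    };

    // Decide whether a new AIO append should be issued right now.
    bool should_send_now(const BatchState& s, bool force) {
      using namespace std::chrono;
      if (s.pending_entries == 0) {
        return false;  // nothing to send
      }

      // a satisfied batch threshold upgrades the request to a forced flush
      double age = duration<double>(steady_clock::now() - s.last_flush).count();
      if (!force &&
          ((s.flush_interval > 0 && s.pending_entries >= s.flush_interval) ||
           (s.flush_bytes > 0 && s.pending_bytes >= s.flush_bytes) ||
           (s.flush_age > 0 && age >= s.flush_age))) {
        force = true;
      }

      // otherwise respect the in-flight AIO cap
      if (!force && s.max_in_flight > 0 &&
          static_cast<int32_t>(s.in_flight_appends) >= s.max_in_flight) {
        return false;  // defer: enough appends are already on the wire
      }
      return true;
    }

    int main() {
      BatchState s;
      s.flush_interval = 2;     // flush a batch once two entries are pending
      s.max_in_flight = 1;      // otherwise allow one AIO append in flight
      s.in_flight_appends = 1;  // one append is already on the wire

      s.pending_entries = 1;
      s.pending_bytes = 128;
      std::cout << should_send_now(s, false) << "\n";  // 0: batching behind AIO

      s.pending_entries = 2;    // interval threshold reached
      std::cout << should_send_now(s, false) << "\n";  // 1: forced batch flush
      return 0;
    }

With thresholds configured, appends accumulate behind the in-flight cap until
a threshold (or an explicit flush) forces the batch out.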

index ab5aa68605e58777252d7672deed4aea2f9fb1a9..bf795e0e6dd9d3c85ece9834fbcf252764b3af20 100644 (file)
@@ -134,7 +134,9 @@ Future JournalRecorder::append(uint64_t tag_tid,
         entry_bl);
   ceph_assert(entry_bl.length() <= m_journal_metadata->get_object_size());
 
-  bool object_full = object_ptr->append_unlock({{future, entry_bl}});
+  bool object_full = object_ptr->append({{future, entry_bl}});
+  m_object_locks[splay_offset]->Unlock();
+
   if (object_full) {
     ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
                      << dendl;
@@ -245,12 +247,10 @@ void JournalRecorder::open_object_set() {
       ceph_assert(object_recorder->is_closed());
 
       // ready to close object and open object in active set
-      create_next_object_recorder_unlock(object_recorder);
-    } else {
-      uint8_t splay_offset = object_number % splay_width;
-      m_object_locks[splay_offset]->Unlock();
+      create_next_object_recorder(object_recorder);
     }
   }
+  unlock_object_recorders();
 }
 
 bool JournalRecorder::close_object_set(uint64_t active_set) {
@@ -286,13 +286,12 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder(
   ObjectRecorderPtr object_recorder(new ObjectRecorder(
     m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
     object_number, lock, m_journal_metadata->get_work_queue(),
-    m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(),
     &m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
     m_flush_bytes, m_flush_age, m_max_in_flight_appends));
   return object_recorder;
 }
 
-void JournalRecorder::create_next_object_recorder_unlock(
+void JournalRecorder::create_next_object_recorder(
     ObjectRecorderPtr object_recorder) {
   ceph_assert(m_lock.is_locked());
 
@@ -318,7 +317,7 @@ void JournalRecorder::create_next_object_recorder_unlock(
       new_object_recorder->get_object_number());
   }
 
-  new_object_recorder->append_unlock(std::move(append_buffers));
+  new_object_recorder->append(std::move(append_buffers));
   m_object_ptrs[splay_offset] = new_object_recorder;
 }
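
The hunks above also change the locking contract: ObjectRecorder::append() no
longer drops the caller's per-splay-offset lock (the old append_unlock() did),
so JournalRecorder unlocks explicitly after the call. A minimal sketch of that
calling pattern, using stand-in types rather than the real Ceph classes:

    // Sketch of the new locking contract: append() leaves the per-splay-offset
    // lock with the caller.  Recorder and splay_locks are stand-ins.
    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <vector>

    struct Recorder {
      // returns true when the backing object is full and must be rotated
      bool append(int entry) {
        entries.push_back(entry);
        return entries.size() >= soft_max_entries;
      }
      std::vector<int> entries;
      size_t soft_max_entries = 4;
    };

    int main() {
      const size_t splay_width = 2;
      std::vector<Recorder> recorders(splay_width);
      std::vector<std::unique_ptr<std::mutex>> splay_locks;
      for (size_t i = 0; i < splay_width; ++i) {
        splay_locks.push_back(std::make_unique<std::mutex>());
      }

      uint64_t entry_tid = 7;
      size_t splay_offset = entry_tid % splay_width;

      splay_locks[splay_offset]->lock();
      bool object_full = recorders[splay_offset].append(42);  // lock still held
      splay_locks[splay_offset]->unlock();  // the caller now unlocks explicitly

      std::cout << "object_full=" << object_full << "\n";
      return 0;
    }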
 
index bdeaab58a81aee8c08ab23592261afdea70db6d8..c27520a758954f886a8088150bace2b0f32dad1a 100644 (file)
@@ -109,7 +109,7 @@ private:
 
   ObjectRecorderPtr create_object_recorder(uint64_t object_number,
                                            std::shared_ptr<Mutex> lock);
-  void create_next_object_recorder_unlock(ObjectRecorderPtr object_recorder);
+  void create_next_object_recorder(ObjectRecorderPtr object_recorder);
 
   void handle_update();
 
index eca0ea5655dbd41a89f3ce4a86c904352c933947..162dfe90c0899e9fc6b3ec293d6aec67053c0803 100644 (file)
@@ -20,20 +20,18 @@ namespace journal {
 
 ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                                uint64_t object_number, shared_ptr<Mutex> lock,
-                               ContextWQ *work_queue, SafeTimer &timer,
-                               Mutex &timer_lock, Handler *handler,
+                               ContextWQ *work_queue, Handler *handler,
                                uint8_t order, uint32_t flush_interval,
                                uint64_t flush_bytes, double flush_age,
-                               uint64_t max_in_flight_appends)
+                               int32_t max_in_flight_appends)
   : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
-    m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer),
-    m_timer_lock(timer_lock), m_handler(handler), m_order(order),
-    m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
-    m_flush_bytes(flush_bytes), m_flush_age(flush_age),
-    m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this),
-    m_lock(lock), m_append_tid(0), m_pending_bytes(0),
-    m_size(0), m_overflowed(false), m_object_closed(false),
-    m_in_flight_flushes(false), m_aio_scheduled(false) {
+    m_cct(NULL), m_op_work_queue(work_queue), m_handler(handler),
+    m_order(order), m_soft_max_size(1 << m_order),
+    m_flush_interval(flush_interval), m_flush_bytes(flush_bytes),
+    m_flush_age(flush_age), m_max_in_flight_appends(max_in_flight_appends),
+    m_flush_handler(this), m_lock(lock), m_last_flush_time(ceph_clock_now()),
+    m_append_tid(0), m_overflowed(false), m_object_closed(false),
+    m_in_flight_flushes(false) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
   ceph_assert(m_handler != NULL);
@@ -42,54 +40,35 @@ ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
 
 ObjectRecorder::~ObjectRecorder() {
   ldout(m_cct, 20) << dendl;
-  ceph_assert(m_append_task == NULL);
-  ceph_assert(m_append_buffers.empty());
+  ceph_assert(m_pending_buffers.empty());
   ceph_assert(m_in_flight_tids.empty());
   ceph_assert(m_in_flight_appends.empty());
-  ceph_assert(!m_aio_scheduled);
 }
 
-bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) {
-  ceph_assert(m_lock->is_locked());
+bool ObjectRecorder::append(AppendBuffers &&append_buffers) {
   ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl;
 
-  FutureImplPtr last_flushed_future;
-  bool schedule_append = false;
-
-  if (m_overflowed) {
-    m_append_buffers.insert(m_append_buffers.end(),
-                            append_buffers.begin(), append_buffers.end());
-    m_lock->Unlock();
-    ldout(m_cct, 20) << "already overflowed" << dendl;
-    return false;
-  }
+  ceph_assert(m_lock->is_locked());
 
-  for (AppendBuffers::const_iterator iter = append_buffers.begin();
-       iter != append_buffers.end(); ++iter) {
-    if (append(*iter, &schedule_append)) {
-      last_flushed_future = iter->first;
+  FutureImplPtr last_flushed_future;
+  for (auto& append_buffer : append_buffers) {
+    ldout(m_cct, 20) << *append_buffer.first << ", "
+                     << "size=" << append_buffer.second.length() << dendl;
+    bool flush_requested = append_buffer.first->attach(&m_flush_handler);
+    if (flush_requested) {
+      last_flushed_future = append_buffer.first;
     }
-  }
 
-  if (last_flushed_future) {
-    flush(last_flushed_future);
-    m_lock->Unlock();
-  } else {
-    m_lock->Unlock();
-    if (schedule_append) {
-      schedule_append_task();
-    } else {
-      cancel_append_task();
-    }
+    m_pending_buffers.push_back(append_buffer);
+    m_pending_bytes += append_buffer.second.length();
   }
-  return (!m_object_closed && !m_overflowed &&
-          m_size + m_pending_bytes >= m_soft_max_size);
+
+  return send_appends(!!last_flushed_future, last_flushed_future);
 }
 
 void ObjectRecorder::flush(Context *on_safe) {
   ldout(m_cct, 20) << dendl;
 
-  cancel_append_task();
   Future future;
   {
     Mutex::Locker locker(*m_lock);
@@ -102,11 +81,7 @@ void ObjectRecorder::flush(Context *on_safe) {
     }
 
     // attach the flush to the most recent append
-    if (!m_append_buffers.empty()) {
-      future = Future(m_append_buffers.rbegin()->first);
-
-      flush_appends(true);
-    } else if (!m_pending_buffers.empty()) {
+    if (!m_pending_buffers.empty()) {
       future = Future(m_pending_buffers.rbegin()->first);
     } else if (!m_in_flight_appends.empty()) {
       AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
@@ -116,7 +91,11 @@ void ObjectRecorder::flush(Context *on_safe) {
   }
 
   if (future.is_valid()) {
-    future.flush(on_safe);
+    // cannot be invoked while the same lock context
+    m_op_work_queue->queue(new FunctionContext(
+      [future, on_safe] (int r) mutable {
+        future.flush(on_safe);
+      }));
   } else {
     on_safe->complete(0);
   }
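
The hunk above queues future.flush(on_safe) on the work queue because it may
re-enter the recorder's lock. A self-contained sketch of that deferral
pattern, with a stand-in WorkQueue in place of the real ContextWQ:

    // The callback may re-acquire the lock, so it is queued and runs outside
    // the current lock context.  WorkQueue is a stand-in for ContextWQ.
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <queue>

    struct WorkQueue {
      void queue(std::function<void(int)> fn) { pending.push(std::move(fn)); }
      void drain() {                 // normally drained by worker threads
        while (!pending.empty()) {
          pending.front()(0);
          pending.pop();
        }
      }
      std::queue<std::function<void(int)>> pending;
    };

    int main() {
      std::mutex lock;
      WorkQueue wq;

      {
        std::lock_guard<std::mutex> locker(lock);
        // Calling the callback here could deadlock if it takes 'lock' again;
        // hand it to the work queue so it runs after the lock is released.
        wq.queue([&lock](int) {
          std::lock_guard<std::mutex> relocker(lock);  // safe: lock is free now
          std::cout << "flush executed outside the original lock context\n";
        });
      }
      wq.drain();
      return 0;
    }
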
@@ -125,38 +104,24 @@ void ObjectRecorder::flush(Context *on_safe) {
 void ObjectRecorder::flush(const FutureImplPtr &future) {
   ldout(m_cct, 20) << "flushing " << *future << dendl;
 
-  ceph_assert(m_lock->is_locked());
-
+  m_lock->Lock();
   if (future->get_flush_handler().get() != &m_flush_handler) {
     // if we don't own this future, re-issue the flush so that it hits the
     // correct journal object owner
     future->flush();
+    m_lock->Unlock();
     return;
   } else if (future->is_flush_in_progress()) {
+    m_lock->Unlock();
     return;
   }
 
-  if (m_object_closed || m_overflowed) {
-    return;
-  }
-
-  AppendBuffers::reverse_iterator r_it;
-  for (r_it = m_append_buffers.rbegin(); r_it != m_append_buffers.rend();
-       ++r_it) {
-    if (r_it->first == future) {
-      break;
-    }
+  bool overflowed = send_appends(true, future);
+  if (overflowed) {
+    notify_handler_unlock();
+  } else {
+    m_lock->Unlock();
   }
-  ceph_assert(r_it != m_append_buffers.rend());
-
-  auto it = (++r_it).base();
-  ceph_assert(it != m_append_buffers.end());
-  ++it;
-
-  AppendBuffers flush_buffers;
-  flush_buffers.splice(flush_buffers.end(), m_append_buffers,
-                       m_append_buffers.begin(), it);
-  send_appends(&flush_buffers);
 }
 
 void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
@@ -166,93 +131,24 @@ void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
   ceph_assert(m_in_flight_tids.empty());
   ceph_assert(m_in_flight_appends.empty());
   ceph_assert(m_object_closed || m_overflowed);
-  append_buffers->splice(append_buffers->end(), m_append_buffers,
-                         m_append_buffers.begin(), m_append_buffers.end());
+
+  for (auto& append_buffer : m_pending_buffers) {
+    ldout(m_cct, 20) << "detached " << *append_buffer.first << dendl;
+    append_buffer.first->detach();
+  }
+  append_buffers->splice(append_buffers->end(), m_pending_buffers,
+                         m_pending_buffers.begin(), m_pending_buffers.end());
 }
 
 bool ObjectRecorder::close() {
   ceph_assert(m_lock->is_locked());
 
   ldout(m_cct, 20) << dendl;
-
-  cancel_append_task();
-
-  flush_appends(true);
+  send_appends(true, {});
 
   ceph_assert(!m_object_closed);
   m_object_closed = true;
-  return (m_in_flight_tids.empty() && !m_in_flight_flushes && !m_aio_scheduled);
-}
-
-void ObjectRecorder::handle_append_task() {
-  ceph_assert(m_timer_lock.is_locked());
-  m_append_task = NULL;
-
-  Mutex::Locker locker(*m_lock);
-  flush_appends(true);
-}
-
-void ObjectRecorder::cancel_append_task() {
-  Mutex::Locker locker(m_timer_lock);
-  if (m_append_task != NULL) {
-    ldout(m_cct, 20) << dendl;
-    m_timer.cancel_event(m_append_task);
-    m_append_task = NULL;
-  }
-}
-
-void ObjectRecorder::schedule_append_task() {
-  Mutex::Locker locker(m_timer_lock);
-  if (m_append_task == nullptr && m_flush_age > 0) {
-    ldout(m_cct, 20) << dendl;
-    m_append_task = m_timer.add_event_after(
-      m_flush_age, new FunctionContext([this](int) {
-         handle_append_task();
-       }));
-  }
-}
-
-bool ObjectRecorder::append(const AppendBuffer &append_buffer,
-                            bool *schedule_append) {
-  ceph_assert(m_lock->is_locked());
-  ldout(m_cct, 20) << "bytes=" << append_buffer.second.length() << dendl;
-
-  bool flush_requested = false;
-  if (!m_object_closed && !m_overflowed) {
-    flush_requested = append_buffer.first->attach(&m_flush_handler);
-  }
-
-  m_append_buffers.push_back(append_buffer);
-  m_pending_bytes += append_buffer.second.length();
-
-  if (!flush_appends(false)) {
-    *schedule_append = true;
-  }
-  return flush_requested;
-}
-
-bool ObjectRecorder::flush_appends(bool force) {
-  ceph_assert(m_lock->is_locked());
-  ldout(m_cct, 20) << "force=" << force << dendl;
-  if (m_object_closed || m_overflowed) {
-    ldout(m_cct, 20) << "already closed or overflowed" << dendl;
-    return true;
-  }
-
-  if (m_append_buffers.empty() ||
-      (!force &&
-       m_size + m_pending_bytes < m_soft_max_size &&
-       (m_flush_interval > 0 && m_append_buffers.size() < m_flush_interval) &&
-       (m_flush_bytes > 0 && m_pending_bytes < m_flush_bytes))) {
-    ldout(m_cct, 20) << "batching append" << dendl;
-    return false;
-  }
-
-  m_pending_bytes = 0;
-  AppendBuffers append_buffers;
-  append_buffers.swap(m_append_buffers);
-  send_appends(&append_buffers);
-  return true;
+  return (m_in_flight_tids.empty() && !m_in_flight_flushes);
 }
 
 void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
@@ -266,18 +162,14 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
     m_in_flight_tids.erase(tid_iter);
 
     InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
-    if (r == -EOVERFLOW || m_overflowed) {
-      if (iter != m_in_flight_appends.end()) {
-        ldout(m_cct, 10) << "append overflowed" << dendl;
-        m_overflowed = true;
-      } else {
-        // must have seen an overflow on a previous append op
-        ceph_assert(r == -EOVERFLOW && m_overflowed);
-      }
+    ceph_assert(iter != m_in_flight_appends.end());
+
+    if (r == -EOVERFLOW) {
+      ldout(m_cct, 10) << "append overflowed" << dendl;
+      m_overflowed = true;
 
       // notify of overflow once all in-flight ops are complete
-      if (m_in_flight_tids.empty() && !m_aio_scheduled) {
-        m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers);
+      if (m_in_flight_tids.empty()) {
         append_overflowed();
         notify_handler_unlock();
       } else {
@@ -286,20 +178,23 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
       return;
     }
 
-    ceph_assert(iter != m_in_flight_appends.end());
     append_buffers.swap(iter->second);
     ceph_assert(!append_buffers.empty());
 
+    for (auto& append_buffer : append_buffers) {
+      m_object_bytes += append_buffer.second.length();
+    }
+    ldout(m_cct, 20) << "object_bytes=" << m_object_bytes << dendl;
+
     m_in_flight_appends.erase(iter);
     m_in_flight_flushes = true;
     m_lock->Unlock();
   }
 
   // Flag the associated futures as complete.
-  for (AppendBuffers::iterator buf_it = append_buffers.begin();
-       buf_it != append_buffers.end(); ++buf_it) {
-    ldout(m_cct, 20) << *buf_it->first << " marked safe" << dendl;
-    buf_it->first->safe(r);
+  for (auto& append_buffer : append_buffers) {
+    ldout(m_cct, 20) << *append_buffer.first << " marked safe" << dendl;
+    append_buffer.first->safe(r);
   }
 
   // wake up any flush requests that raced with a RADOS callback
@@ -307,26 +202,16 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
   m_in_flight_flushes = false;
   m_in_flight_flushes_cond.Signal();
 
-  if (!m_aio_scheduled) {
-    if (m_in_flight_appends.empty() &&
-        (m_object_closed || m_aio_sent_size >= m_soft_max_size)) {
-      if (m_aio_sent_size >= m_soft_max_size) {
-        ldout(m_cct, 20) << " soft max size reached, notifying overflow"
-                         << dendl;
-        m_overflowed = true;
-      }
-      // all remaining unsent appends should be redirected to new object
-      m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers);
+  if (m_in_flight_appends.empty() && (m_object_closed || m_overflowed)) {
+    // all remaining unsent appends should be redirected to new object
+    notify_handler_unlock();
+  } else {
+    bool overflowed = send_appends(false, {});
+    if (overflowed) {
       notify_handler_unlock();
-    } else if (!m_pending_buffers.empty()) {
-      m_aio_scheduled = true;
-      m_lock->Unlock();
-      send_appends_aio();
     } else {
       m_lock->Unlock();
     }
-  } else {
-    m_lock->Unlock();
   }
 }
 
@@ -336,8 +221,6 @@ void ObjectRecorder::append_overflowed() {
   ceph_assert(m_lock->is_locked());
   ceph_assert(!m_in_flight_appends.empty());
 
-  cancel_append_task();
-
   InFlightAppends in_flight_appends;
   in_flight_appends.swap(m_in_flight_appends);
 
@@ -349,93 +232,109 @@ void ObjectRecorder::append_overflowed() {
   }
 
   restart_append_buffers.splice(restart_append_buffers.end(),
-                                m_append_buffers,
-                                m_append_buffers.begin(),
-                                m_append_buffers.end());
-  restart_append_buffers.swap(m_append_buffers);
-
-  for (AppendBuffers::const_iterator it = m_append_buffers.begin();
-       it != m_append_buffers.end(); ++it) {
-    ldout(m_cct, 20) << "overflowed " << *it->first << dendl;
-    it->first->detach();
-  }
+                                m_pending_buffers,
+                                m_pending_buffers.begin(),
+                                m_pending_buffers.end());
+  restart_append_buffers.swap(m_pending_buffers);
 }
 
-void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
+bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
   ldout(m_cct, 20) << dendl;
-  ceph_assert(m_lock->is_locked());
-  ceph_assert(!append_buffers->empty());
 
-  for (AppendBuffers::iterator it = append_buffers->begin();
-       it != append_buffers->end(); ++it) {
-    ldout(m_cct, 20) << "flushing " << *it->first << dendl;
-    it->first->set_flush_in_progress();
-    m_size += it->second.length();
+  ceph_assert(m_lock->is_locked());
+  if (m_object_closed || m_overflowed) {
+    ldout(m_cct, 20) << "already closed or overflowed" << dendl;
+    return false;
   }
 
-  m_pending_buffers.splice(m_pending_buffers.end(), *append_buffers,
-                           append_buffers->begin(), append_buffers->end());
-  if (!m_aio_scheduled) {
-    m_op_work_queue->queue(new FunctionContext([this] (int r) {
-        send_appends_aio();
-    }));
-    m_aio_scheduled = true;
+  if (m_pending_buffers.empty()) {
+    ldout(m_cct, 20) << "append buffers empty" << dendl;
+    return false;
   }
-}
 
-void ObjectRecorder::send_appends_aio() {
-  ldout(m_cct, 20) << dendl;
-  librados::AioCompletion *rados_completion;
-  {
-    Mutex::Locker locker(*m_lock);
-    m_aio_scheduled = false;
+  if (!force &&
+      ((m_flush_interval > 0 && m_pending_buffers.size() >= m_flush_interval) ||
+       (m_flush_bytes > 0 && m_pending_bytes >= m_flush_bytes) ||
+       (m_flush_age > 0 &&
+        m_last_flush_time + m_flush_age >= ceph_clock_now()))) {
+    ldout(m_cct, 20) << "forcing batch flush" << dendl;
+    force = true;
+  }
 
-    if (m_pending_buffers.empty()) {
-      ldout(m_cct, 10) << "pending buffers empty" << dendl;
-      return;
+  auto max_in_flight_appends = m_max_in_flight_appends;
+  if (m_flush_interval > 0 || m_flush_bytes > 0 || m_flush_age > 0) {
+    if (!force && max_in_flight_appends == 0) {
+      ldout(m_cct, 20) << "attempting to batch AIO appends" << dendl;
+      max_in_flight_appends = 1;
     }
+  } else if (max_in_flight_appends < 0) {
+    max_in_flight_appends = 0;
+  }
 
-    if (m_max_in_flight_appends != 0 &&
-        m_in_flight_tids.size() >= m_max_in_flight_appends) {
-      ldout(m_cct, 10) << "max in flight appends reached" << dendl;
-      return;
+  if (!force && max_in_flight_appends != 0 &&
+      static_cast<int32_t>(m_in_flight_tids.size()) >= max_in_flight_appends) {
+    ldout(m_cct, 10) << "max in flight appends reached" << dendl;
+    return false;
+  }
+
+  librados::ObjectWriteOperation op;
+  client::guard_append(&op, m_soft_max_size);
+
+  size_t append_bytes = 0;
+  AppendBuffers append_buffers;
+  for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
+    auto& future = it->first;
+    auto& bl = it->second;
+    auto size = m_object_bytes + m_in_flight_bytes + append_bytes + bl.length();
+    if (size == m_soft_max_size) {
+      ldout(m_cct, 10) << "object at capacity " << *future << dendl;
+      m_overflowed = true;
+    } else if (size > m_soft_max_size) {
+      ldout(m_cct, 10) << "object beyond capacity " << *future << dendl;
+      m_overflowed = true;
+      break;
     }
 
-    if (m_aio_sent_size >= m_soft_max_size) {
-      ldout(m_cct, 10) << "soft max size reached" << dendl;
-      return;
+    bool flush_break = (force && flush_future && flush_future == future);
+    ldout(m_cct, 20) << "flushing " << *future << dendl;
+    future->set_flush_in_progress();
+
+    op.append(bl);
+    op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+
+    append_bytes += bl.length();
+    append_buffers.push_back(*it);
+    it = m_pending_buffers.erase(it);
+
+    if (flush_break) {
+      ldout(m_cct, 20) << "stopping at requested flush future" << dendl;
+      break;
     }
+  }
+
+  if (append_bytes > 0) {
+    m_last_flush_time = ceph_clock_now();
 
     uint64_t append_tid = m_append_tid++;
     m_in_flight_tids.insert(append_tid);
+    m_in_flight_appends[append_tid].swap(append_buffers);
+    m_in_flight_bytes += append_bytes;
 
-    ldout(m_cct, 10) << "flushing journal tid=" << append_tid << dendl;
-
-    librados::ObjectWriteOperation op;
-    client::guard_append(&op, m_soft_max_size);
-    auto append_buffers = &m_in_flight_appends[append_tid];
-
-    size_t append_bytes = 0;
-    for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
-      ldout(m_cct, 20) << "flushing " << *it->first << dendl;
-      op.append(it->second);
-      op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
-      m_aio_sent_size += it->second.length();
-      append_bytes += it->second.length();
-      append_buffers->push_back(*it);
-      it = m_pending_buffers.erase(it);
-      if (m_aio_sent_size >= m_soft_max_size) {
-        break;
-      }
-    }
-    rados_completion = librados::Rados::aio_create_completion(
-        new C_AppendFlush(this, append_tid), nullptr,
-        utils::rados_ctx_callback);
+    ceph_assert(m_pending_bytes >= append_bytes);
+    m_pending_bytes -= append_bytes;
+
+    auto rados_completion = librados::Rados::aio_create_completion(
+      new C_AppendFlush(this, append_tid), nullptr, utils::rados_ctx_callback);
     int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
     ceph_assert(r == 0);
-    ldout(m_cct, 20) << "append_bytes=" << append_bytes << dendl;
+    rados_completion->release();
+    ldout(m_cct, 20) << "flushing journal tid=" << append_tid << ", "
+                     << "append_bytes=" << append_bytes << ", "
+                     << "in_flight_bytes=" << m_in_flight_bytes << ", "
+                     << "pending_bytes=" << m_pending_bytes << dendl;
   }
-  rados_completion->release();
+
+  return m_overflowed;
 }
 
 void ObjectRecorder::notify_handler_unlock() {
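
send_appends() now tracks bytes in three buckets (acknowledged object bytes,
in-flight bytes, and the batch being assembled) and flags overflow once the
projected total reaches the soft max object size (1 << order). A
self-contained sketch of just that capacity check; CapacityTracker and
build_batch() are illustrative names, not part of the Ceph code:

    // Capacity check applied while a batch is assembled.
    #include <cstdint>
    #include <iostream>
    #include <list>

    struct CapacityTracker {
      uint64_t soft_max_size = 0;   // 1 << order in the real code
      uint64_t object_bytes = 0;    // bytes already acknowledged by RADOS
      uint64_t in_flight_bytes = 0; // bytes currently on the wire

      // Moves the entries that fit into the returned batch; sets *overflowed
      // once the projected object size reaches or exceeds the soft maximum.
      std::list<uint64_t> build_batch(std::list<uint64_t>& pending,
                                      bool* overflowed) {
        std::list<uint64_t> batch;
        uint64_t batch_bytes = 0;
        *overflowed = false;
        for (auto it = pending.begin(); it != pending.end(); ) {
          uint64_t projected =
              object_bytes + in_flight_bytes + batch_bytes + *it;
          if (projected > soft_max_size) {
            *overflowed = true;   // this entry must go to the next object
            break;
          }
          if (projected == soft_max_size) {
            *overflowed = true;   // object exactly at capacity after this entry
          }
          batch_bytes += *it;
          batch.push_back(*it);
          it = pending.erase(it);
        }
        in_flight_bytes += batch_bytes;
        return batch;
      }
    };

    int main() {
      CapacityTracker t;
      t.soft_max_size = 16;
      t.object_bytes = 8;
      std::list<uint64_t> pending{4, 4, 4};

      bool overflowed = false;
      auto batch = t.build_batch(pending, &overflowed);
      // the first two 4-byte entries fit; the third would exceed capacity
      std::cout << "batch=" << batch.size() << " overflowed=" << overflowed
                << " still pending=" << pending.size() << "\n";
      return 0;
    }
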
index 8d250e5f04230367842450caae86ca7c563cdec4..d7cf6e668a97e88d2e598f897ba3242bc8347552 100644 (file)
@@ -4,6 +4,7 @@
 #ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
 #define CEPH_JOURNAL_OBJECT_RECORDER_H
 
+#include "include/utime.h"
 #include "include/Context.h"
 #include "include/rados/librados.hpp"
 #include "common/Cond.h"
@@ -39,10 +40,9 @@ public:
 
   ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                  uint64_t object_number, std::shared_ptr<Mutex> lock,
-                 ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock,
-                 Handler *handler, uint8_t order, uint32_t flush_interval,
-                 uint64_t flush_bytes, double flush_age,
-                 uint64_t max_in_flight_appends);
+                 ContextWQ *work_queue, Handler *handler, uint8_t order,
+                 uint32_t flush_interval, uint64_t flush_bytes,
+                 double flush_age, int32_t max_in_flight_appends);
   ~ObjectRecorder() override;
 
   inline uint64_t get_object_number() const {
@@ -52,7 +52,7 @@ public:
     return m_oid;
   }
 
-  bool append_unlock(AppendBuffers &&append_buffers);
+  bool append(AppendBuffers &&append_buffers);
   void flush(Context *on_safe);
   void flush(const FutureImplPtr &future);
 
@@ -70,7 +70,7 @@ public:
 
   inline size_t get_pending_appends() const {
     Mutex::Locker locker(*m_lock);
-    return m_append_buffers.size();
+    return m_pending_buffers.size();
   }
 
 private:
@@ -87,7 +87,6 @@ private:
       object_recorder->put();
     }
     void flush(const FutureImplPtr &future) override {
-      Mutex::Locker locker(*(object_recorder->m_lock));
       object_recorder->flush(future);
     }
   };
@@ -111,9 +110,6 @@ private:
 
   ContextWQ *m_op_work_queue;
 
-  SafeTimer &m_timer;
-  Mutex &m_timer_lock;
-
   Handler *m_handler;
 
   uint8_t m_order;
@@ -122,20 +118,20 @@ private:
   uint32_t m_flush_interval;
   uint64_t m_flush_bytes;
   double m_flush_age;
-  uint32_t m_max_in_flight_appends;
+  int32_t m_max_in_flight_appends;
 
   FlushHandler m_flush_handler;
 
-  Context *m_append_task = nullptr;
-
   mutable std::shared_ptr<Mutex> m_lock;
-  AppendBuffers m_append_buffers;
+  AppendBuffers m_pending_buffers;
+  uint64_t m_pending_bytes = 0;
+  utime_t m_last_flush_time;
+
   uint64_t m_append_tid;
-  uint32_t m_pending_bytes;
 
   InFlightTids m_in_flight_tids;
   InFlightAppends m_in_flight_appends;
-  uint64_t m_size;
+  uint64_t m_object_bytes = 0;
   bool m_overflowed;
   bool m_object_closed;
 
@@ -143,21 +139,11 @@ private:
 
   bool m_in_flight_flushes;
   Cond m_in_flight_flushes_cond;
+  uint64_t m_in_flight_bytes = 0;
 
-  AppendBuffers m_pending_buffers;
-  uint64_t m_aio_sent_size = 0;
-  bool m_aio_scheduled;
-
-  void handle_append_task();
-  void cancel_append_task();
-  void schedule_append_task();
-
-  bool append(const AppendBuffer &append_buffer, bool *schedule_append);
-  bool flush_appends(bool force);
+  bool send_appends(bool force, FutureImplPtr flush_sentinal);
   void handle_append_flushed(uint64_t tid, int r);
   void append_overflowed();
-  void send_appends(AppendBuffers *append_buffers);
-  void send_appends_aio();
 
   void notify_handler_unlock();
 };
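
max_in_flight_appends is now a signed int32_t. The sketch below mirrors how
the new send_appends() (in the .cc diff above) normalizes the configured value
before gating AIO submission; effective_max_in_flight() is a hypothetical
helper, but the branches follow the diff:

    // Mirrors the normalization in the new send_appends(); helper name is
    // hypothetical.
    #include <cstdint>
    #include <iostream>

    int32_t effective_max_in_flight(int32_t configured, bool batching_configured,
                                    bool force) {
      if (batching_configured) {
        if (!force && configured == 0) {
          return 1;   // batch further appends behind a single in-flight AIO
        }
      } else if (configured < 0) {
        return 0;     // 0 disables the in-flight cap entirely
      }
      return configured;
    }

    int main() {
      std::cout << effective_max_in_flight(0, true, false) << "\n";   // 1
      std::cout << effective_max_in_flight(-1, false, false) << "\n"; // 0
      std::cout << effective_max_in_flight(4, true, false) << "\n";   // 4
      return 0;
    }
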
index 4b887f151703718e840fbe3f4b5186fb6e319b6e..7c3e7e99d166646c9b4716146c43a28db60032f2 100644 (file)
@@ -72,14 +72,12 @@ public:
     RadosTestFixture::TearDown();
   }
 
-  inline void set_flush_interval(uint32_t i) {
-    m_flush_interval = i;
-  }
-  inline void set_flush_bytes(uint64_t i) {
-    m_flush_bytes = i;
-  }
-  inline void set_flush_age(double i) {
-    m_flush_age = i;
+  inline void set_batch_options(uint32_t flush_interval, uint64_t flush_bytes,
+                                double flush_age, int max_in_flight) {
+    m_flush_interval = flush_interval;
+    m_flush_bytes = flush_bytes;
+    m_flush_age = flush_age;
+    m_max_in_flight_appends = max_in_flight;
   }
 
   journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
@@ -96,9 +94,8 @@ public:
   journal::ObjectRecorderPtr create_object(const std::string &oid,
                                            uint8_t order, shared_ptr<Mutex> lock) {
     journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
-      m_ioctx, oid, 0, lock, m_work_queue, *m_timer, m_timer_lock, &m_handler,
-      order, m_flush_interval, m_flush_bytes, m_flush_age,
-      m_max_in_flight_appends));
+      m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order, m_flush_interval,
+      m_flush_bytes, m_flush_age, m_max_in_flight_appends));
     m_object_recorders.push_back(object);
     m_object_recorder_locks.insert(std::make_pair(oid, lock));
     m_handler.object_lock = lock;
@@ -113,6 +110,7 @@ TEST_F(TestObjectRecorder, Append) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
+  set_batch_options(0, 0, 0, 0);
   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
@@ -121,15 +119,17 @@ TEST_F(TestObjectRecorder, Append) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
-  ASSERT_EQ(1U, object->get_pending_appends());
+  ASSERT_FALSE(object->append(std::move(append_buffers)));
+  lock->Unlock();
+  ASSERT_EQ(0U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
-  ASSERT_EQ(2U, object->get_pending_appends());
+  ASSERT_FALSE(object->append(std::move(append_buffers)));
+  lock->Unlock();
+  ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
   append_buffer2.first->flush(&cond);
@@ -144,7 +144,7 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_flush_interval(2);
+  set_batch_options(2, 0, 0, -1);
   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
@@ -153,14 +153,16 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+  ASSERT_FALSE(object->append(std::move(append_buffers)));
+  lock->Unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+  ASSERT_FALSE(object->append(std::move(append_buffers)));
+  lock->Unlock();
   ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -175,7 +177,7 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_flush_bytes(10);
+  set_batch_options(0, 10, 0, -1);
   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
@@ -184,14 +186,16 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+  ASSERT_FALSE(object->append(std::move(append_buffers)));
+  lock->Unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+  ASSERT_FALSE(object->append(std::move(append_buffers)));
+  lock->Unlock();
   ASSERT_EQ(0U, object->get_pending_appends());
 
   C_SaferCond cond;
@@ -206,7 +210,7 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_flush_age(0.1);
+  set_batch_options(0, 0, 0.1, -1);
   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
@@ -215,13 +219,15 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+  ASSERT_FALSE(object->append(std::move(append_buffers)));
+  lock->Unlock();
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               "payload");
   append_buffers = {append_buffer2};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+  ASSERT_FALSE(object->append(std::move(append_buffers)));
+  lock->Unlock();
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -245,13 +251,15 @@ TEST_F(TestObjectRecorder, AppendFilledObject) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+  ASSERT_FALSE(object->append(std::move(append_buffers)));
+  lock->Unlock();
 
   journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
                                                               payload);
   append_buffers = {append_buffer2};
   lock->Lock();
-  ASSERT_TRUE(object->append_unlock(std::move(append_buffers)));
+  ASSERT_TRUE(object->append(std::move(append_buffers)));
+  lock->Unlock();
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -266,6 +274,7 @@ TEST_F(TestObjectRecorder, Flush) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
+  set_batch_options(0, 10, 0, -1);
   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
@@ -274,7 +283,8 @@ TEST_F(TestObjectRecorder, Flush) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+  ASSERT_FALSE(object->append(std::move(append_buffers)));
+  lock->Unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
   C_SaferCond cond1;
@@ -294,6 +304,7 @@ TEST_F(TestObjectRecorder, FlushFuture) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
+  set_batch_options(0, 10, 0, -1);
   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
@@ -302,15 +313,13 @@ TEST_F(TestObjectRecorder, FlushFuture) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+  ASSERT_FALSE(object->append(std::move(append_buffers)));
+  lock->Unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
   C_SaferCond cond;
   append_buffer.first->wait(&cond);
-  lock->Lock();
   object->flush(append_buffer.first);
-  ASSERT_TRUE(lock->is_locked());
-  lock->Unlock();
   ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
               append_buffer.first->is_complete());
   ASSERT_EQ(0, cond.wait());
@@ -332,13 +341,11 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer};
 
-  lock->Lock();
   object->flush(append_buffer.first);
-  ASSERT_TRUE(lock->is_locked());
-  lock->Unlock();
   ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+  ASSERT_FALSE(object->append(std::move(append_buffers)));
+  lock->Unlock();
 
   // should automatically flush once its attached to the object
   C_SaferCond cond;
@@ -353,7 +360,7 @@ TEST_F(TestObjectRecorder, Close) {
   journal::JournalMetadataPtr metadata = create_metadata(oid);
   ASSERT_EQ(0, init_metadata(metadata));
 
-  set_flush_interval(2);
+  set_batch_options(2, 0, 0, -1);
   shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
   journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
 
@@ -362,7 +369,8 @@ TEST_F(TestObjectRecorder, Close) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1};
   lock->Lock();
-  ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+  ASSERT_FALSE(object->append(std::move(append_buffers)));
+  lock->Unlock();
   ASSERT_EQ(1U, object->get_pending_appends());
 
   lock->Lock();
@@ -402,7 +410,8 @@ TEST_F(TestObjectRecorder, Overflow) {
   journal::AppendBuffers append_buffers;
   append_buffers = {append_buffer1, append_buffer2};
   lock1->Lock();
-  ASSERT_TRUE(object1->append_unlock(std::move(append_buffers)));
+  ASSERT_TRUE(object1->append(std::move(append_buffers)));
+  lock1->Unlock();
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -433,7 +442,8 @@ TEST_F(TestObjectRecorder, Overflow) {
                                                               payload);
   append_buffers = {append_buffer3};
   lock2->Lock();
-  ASSERT_FALSE(object2->append_unlock(std::move(append_buffers)));
+  ASSERT_FALSE(object2->append(std::move(append_buffers)));
+  lock2->Unlock();
   append_buffer3.first->flush(NULL);
 
   overflowed = false;