]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: limit in-flight appends
authorMykola Golub <mgolub@suse.com>
Mon, 9 Jul 2018 12:32:13 +0000 (15:32 +0300)
committerMykola Golub <mgolub@suse.com>
Thu, 30 Aug 2018 14:07:38 +0000 (17:07 +0300)
Signed-off-by: Mykola Golub <mgolub@suse.com>
src/journal/JournalRecorder.cc
src/journal/JournalRecorder.h
src/journal/Journaler.cc
src/journal/Journaler.h
src/journal/ObjectRecorder.cc
src/journal/ObjectRecorder.h
src/test/journal/mock/MockJournaler.h
src/test/journal/test_JournalRecorder.cc
src/test/journal/test_ObjectRecorder.cc

index 0cfebe480f4abb89d6aea989cfea8fb4c81d5c3d..6fd5d7fc189f555ad68b36eb650baffa06e0d0ca 100644 (file)
@@ -50,10 +50,12 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
                                  const std::string &object_oid_prefix,
                                  const JournalMetadataPtr& journal_metadata,
                                  uint32_t flush_interval, uint64_t flush_bytes,
-                                 double flush_age)
+                                 double flush_age,
+                                 uint64_t max_in_flight_appends)
   : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
     m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
-    m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_listener(this),
+    m_flush_bytes(flush_bytes), m_flush_age(flush_age),
+    m_max_in_flight_appends(max_in_flight_appends), m_listener(this),
     m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
     m_current_set(m_journal_metadata->get_active_set()) {
 
@@ -253,7 +255,7 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder(
     object_number, lock, m_journal_metadata->get_work_queue(),
     m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(),
     &m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
-    m_flush_bytes, m_flush_age));
+    m_flush_bytes, m_flush_age, m_max_in_flight_appends));
   return object_recorder;
 }
 
index a16339faddf73d34a6265bc42f4436d825cd325f..93c0e9e5bb7f64ed5612adaef26f09b359d5f685 100644 (file)
@@ -24,7 +24,7 @@ public:
   JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
                   const JournalMetadataPtr &journal_metadata,
                   uint32_t flush_interval, uint64_t flush_bytes,
-                  double flush_age);
+                  double flush_age, uint64_t max_in_flight_appends);
   ~JournalRecorder();
 
   Future append(uint64_t tag_tid, const bufferlist &bl);
@@ -81,6 +81,7 @@ private:
   uint32_t m_flush_interval;
   uint64_t m_flush_bytes;
   double m_flush_age;
+  uint64_t m_max_in_flight_appends;
 
   Listener m_listener;
   ObjectHandler m_object_handler;
index 65d32de3d1e95f93b62cac15693424c438722360..946b8192a296c2cadff8a3e97681acbc27b6701b 100644 (file)
@@ -392,14 +392,14 @@ void Journaler::committed(const Future &future) {
 }
 
 void Journaler::start_append(int flush_interval, uint64_t flush_bytes,
-                            double flush_age) {
+                            double flush_age, uint64_t max_in_flight_appends) {
   ceph_assert(m_recorder == nullptr);
 
   // TODO verify active object set >= current replay object set
 
   m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix,
                                   m_metadata, flush_interval, flush_bytes,
-                                  flush_age);
+                                  flush_age, max_in_flight_appends);
 }
 
 void Journaler::stop_append(Context *on_safe) {
index bd3529d06a7559d90fb98fe2ebf24a771a4b4f42..c5194ced512496a538b69392fd53205ae99369a2 100644 (file)
@@ -106,7 +106,8 @@ public:
   void stop_replay(Context *on_finish);
 
   uint64_t get_max_append_size() const;
-  void start_append(int flush_interval, uint64_t flush_bytes, double flush_age);
+  void start_append(int flush_interval, uint64_t flush_bytes, double flush_age,
+                    uint64_t max_in_flight_appends);
   Future append(uint64_t tag_tid, const bufferlist &bl);
   void flush_append(Context *on_safe);
   void stop_append(Context *on_safe);
index b54ba9ce8f35a856cb560762112f5da9ce468e28..bc7b927b8c86a7de7c287eab48512db83b66f23f 100644 (file)
@@ -22,12 +22,14 @@ ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                                ContextWQ *work_queue, SafeTimer &timer,
                                Mutex &timer_lock, Handler *handler,
                                uint8_t order, uint32_t flush_interval,
-                               uint64_t flush_bytes, double flush_age)
+                               uint64_t flush_bytes, double flush_age,
+                               uint64_t max_in_flight_appends)
   : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
     m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer),
     m_timer_lock(timer_lock), m_handler(handler), m_order(order),
     m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
-    m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
+    m_flush_bytes(flush_bytes), m_flush_age(flush_age),
+    m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this),
     m_lock(lock), m_append_tid(0), m_pending_bytes(0),
     m_size(0), m_overflowed(false), m_object_closed(false),
     m_in_flight_flushes(false), m_aio_scheduled(false) {
@@ -99,6 +101,8 @@ void ObjectRecorder::flush(Context *on_safe) {
       future = Future(m_append_buffers.rbegin()->first);
 
       flush_appends(true);
+    } else if (!m_pending_buffers.empty()) {
+      future = Future(m_pending_buffers.rbegin()->first);
     } else if (!m_in_flight_appends.empty()) {
       AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
       ceph_assert(!append_buffers.empty());
@@ -163,7 +167,7 @@ void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
 }
 
 bool ObjectRecorder::close() {
-  assert (m_lock->is_locked());
+  ceph_assert(m_lock->is_locked());
 
   ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
 
@@ -263,6 +267,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
 
       // notify of overflow once all in-flight ops are complete
       if (m_in_flight_tids.empty() && !m_aio_scheduled) {
+        m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers);
         append_overflowed();
         notify_handler_unlock();
       } else {
@@ -293,9 +298,16 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
   m_in_flight_flushes = false;
   m_in_flight_flushes_cond.Signal();
 
-  if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
-    // all remaining unsent appends should be redirected to new object
-    notify_handler_unlock();
+  if (!m_aio_scheduled) {
+    if (m_in_flight_appends.empty() && m_object_closed) {
+      // all remaining unsent appends should be redirected to new object
+      m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers);
+      notify_handler_unlock();
+    } else {
+      m_aio_scheduled = true;
+      m_lock->Unlock();
+      send_appends_aio();
+    }
   } else {
     m_lock->Unlock();
   }
@@ -357,18 +369,32 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
 }
 
 void ObjectRecorder::send_appends_aio() {
-  librados::ObjectWriteOperation op;
-  client::guard_append(&op, m_soft_max_size);
-  C_Gather *gather_ctx;
+  librados::AioCompletion *rados_completion;
   {
     Mutex::Locker locker(*m_lock);
+    m_aio_scheduled = false;
+
+    if (m_pending_buffers.empty()) {
+      ldout(m_cct, 20) << __func__ << ": " << m_oid << " pending buffers empty"
+                       << dendl;
+      return;
+    }
+
+    if (m_max_in_flight_appends != 0 &&
+        m_in_flight_tids.size() >= m_max_in_flight_appends) {
+      ldout(m_cct, 20) << __func__ << ": " << m_oid
+                       << " max in flight appends reached" << dendl;
+      return;
+    }
+
     uint64_t append_tid = m_append_tid++;
     m_in_flight_tids.insert(append_tid);
 
     ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
                      << append_tid << dendl;
 
-    gather_ctx = new C_Gather(m_cct, new C_AppendFlush(this, append_tid));
+    librados::ObjectWriteOperation op;
+    client::guard_append(&op, m_soft_max_size);
     auto append_buffers = &m_in_flight_appends[append_tid];
 
     for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
@@ -382,36 +408,13 @@ void ObjectRecorder::send_appends_aio() {
         break;
       }
     }
+    rados_completion = librados::Rados::aio_create_completion(
+        new C_AppendFlush(this, append_tid), nullptr,
+        utils::rados_ctx_callback);
+    int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
+    ceph_assert(r == 0);
   }
-
-  librados::AioCompletion *rados_completion =
-    librados::Rados::aio_create_completion(gather_ctx->new_sub(), nullptr,
-                                           utils::rados_ctx_callback);
-  int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
-  ceph_assert(r == 0);
   rados_completion->release();
-
-  {
-    m_lock->Lock();
-    if (m_pending_buffers.empty()) {
-      m_aio_scheduled = false;
-      if (m_in_flight_appends.empty() && m_object_closed) {
-        // all remaining unsent appends should be redirected to new object
-        notify_handler_unlock();
-      } else {
-        m_lock->Unlock();
-      }
-    } else {
-      // additional pending items -- reschedule
-      m_op_work_queue->queue(new FunctionContext([this] (int r) {
-          send_appends_aio();
-        }));
-      m_lock->Unlock();
-    }
-  }
-
-  // allow append op to complete
-  gather_ctx->activate();
 }
 
 void ObjectRecorder::notify_handler_unlock() {
index fd4e88ffafca3a35a69292ebea9a43574b462d4b..2b754e5be3fe1e2c62b0704ae0f9453403997cb3 100644 (file)
@@ -41,7 +41,8 @@ public:
                  uint64_t object_number, std::shared_ptr<Mutex> lock,
                  ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock,
                  Handler *handler, uint8_t order, uint32_t flush_interval,
-                 uint64_t flush_bytes, double flush_age);
+                 uint64_t flush_bytes, double flush_age,
+                 uint64_t max_in_flight_appends);
   ~ObjectRecorder() override;
 
   inline uint64_t get_object_number() const {
@@ -121,6 +122,7 @@ private:
   uint32_t m_flush_interval;
   uint64_t m_flush_bytes;
   double m_flush_age;
+  uint32_t m_max_in_flight_appends;
 
   FlushHandler m_flush_handler;
 
index 8a72537fcc64cd1a132c3d38f7125bafe394edaf..0b159a8efb2dbbb18e6070c96e33993086cce465 100644 (file)
@@ -120,8 +120,7 @@ struct MockJournaler {
   MOCK_METHOD0(stop_replay, void());
   MOCK_METHOD1(stop_replay, void(Context *on_finish));
 
-  MOCK_METHOD3(start_append, void(int flush_interval, uint64_t flush_bytes,
-                                  double flush_age));
+  MOCK_METHOD4(start_append, void(int, uint64_t, double, uint64_t));
   MOCK_CONST_METHOD0(get_max_append_size, uint64_t());
   MOCK_METHOD2(append, MockFutureProxy(uint64_t tag_id,
                                        const bufferlist &bl));
@@ -258,9 +257,11 @@ struct MockJournalerProxy {
     MockJournaler::get_instance().stop_replay(on_finish);
   }
 
-  void start_append(int flush_interval, uint64_t flush_bytes, double flush_age) {
+  void start_append(int flush_interval, uint64_t flush_bytes, double flush_age,
+                    uint64_t max_in_flight_appends) {
     MockJournaler::get_instance().start_append(flush_interval, flush_bytes,
-                                               flush_age);
+                                               flush_age,
+                                               max_in_flight_appends);
   }
 
   uint64_t get_max_append_size() const {
index 59cc3907f8b92372077827441a89dbab027383f2..fb7c06772eccbd7afd38eac28788e1c2405e8062 100644 (file)
@@ -19,10 +19,11 @@ public:
     RadosTestFixture::TearDown();
   }
 
-  journal::JournalRecorder *create_recorder(const std::string &oid,
-                                            const journal::JournalMetadataPtr &metadata) {
+  journal::JournalRecorder *create_recorder(
+      const std::string &oid, const journal::JournalMetadataPtr &metadata) {
     journal::JournalRecorder *recorder(new journal::JournalRecorder(
-      m_ioctx, oid + ".", metadata, 0, std::numeric_limits<uint32_t>::max(), 0));
+        m_ioctx, oid + ".", metadata, 0, std::numeric_limits<uint32_t>::max(),
+        0, 0));
     m_recorders.push_back(recorder);
     return recorder;
   }
index ed071c8d56b45ef0faad98e8344bff9fbd93be7c..21c741e5896377462c64327e8976a791ebd8c32c 100644 (file)
@@ -57,6 +57,7 @@ public:
   uint32_t m_flush_interval;
   uint64_t m_flush_bytes;
   double m_flush_age;
+  uint64_t m_max_in_flight_appends = 0;
   Handler m_handler;
 
   void TearDown() override {
@@ -96,7 +97,8 @@ public:
                                            uint8_t order, shared_ptr<Mutex> lock) {
     journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
       m_ioctx, oid, 0, lock, m_work_queue, *m_timer, m_timer_lock, &m_handler,
-      order, m_flush_interval, m_flush_bytes, m_flush_age));
+      order, m_flush_interval, m_flush_bytes, m_flush_age,
+      m_max_in_flight_appends));
     m_object_recorders.push_back(object);
     m_object_recorder_locks.insert(std::make_pair(oid, lock));
     m_handler.object_lock = lock;