]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: new ObjectRecorder closed callback
authorJason Dillaman <dillaman@redhat.com>
Sat, 14 May 2016 22:13:38 +0000 (18:13 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 20 May 2016 00:28:57 +0000 (20:28 -0400)
The callback will be invoked if there were in-flight appends
when the close was requested.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit bba91437dbe3b7a9b6da8a61ccc4c597858c8efc)

src/journal/JournalRecorder.cc
src/journal/JournalRecorder.h
src/journal/ObjectRecorder.cc
src/journal/ObjectRecorder.h
src/test/journal/test_ObjectRecorder.cc

index f78f0c82c497c02d2ffbfb24b4fe30bbe9986d7e..02836b87c861907c1731a47344b9643259564b05 100644 (file)
@@ -49,7 +49,7 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
   : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
     m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
     m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_listener(this),
-    m_overflow_handler(this), m_lock("JournalerRecorder::m_lock"),
+    m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
     m_current_set(m_journal_metadata->get_active_set()) {
 
   Mutex::Locker locker(m_lock);
@@ -151,7 +151,7 @@ void JournalRecorder::close_object_set(uint64_t object_set) {
     ObjectRecorderPtr object_recorder = it->second;
     if (object_recorder != NULL &&
         object_recorder->get_object_number() / splay_width == m_current_set) {
-      if (object_recorder->close_object()) {
+      if (object_recorder->close()) {
         // no in-flight ops, immediately create new recorder
         create_next_object_recorder(object_recorder);
       }
@@ -164,7 +164,7 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder(
   ObjectRecorderPtr object_recorder(new ObjectRecorder(
     m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
     object_number, m_journal_metadata->get_timer(),
-    m_journal_metadata->get_timer_lock(), &m_overflow_handler,
+    m_journal_metadata->get_timer_lock(), &m_object_handler,
     m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes,
     m_flush_age));
   return object_recorder;
@@ -197,6 +197,10 @@ void JournalRecorder::handle_update() {
   }
 }
 
+void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
+  // TODO
+}
+
 void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
   ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
 
index be92298a4838396f32acd2eb282fc61434172868..dbd289883d46c0f1fcd56ba04cd0cec5d9977f64 100644 (file)
@@ -47,12 +47,16 @@ private:
     }
   };
 
-  struct OverflowHandler : public ObjectRecorder::OverflowHandler {
+  struct ObjectHandler : public ObjectRecorder::Handler {
     JournalRecorder *journal_recorder;
 
-    OverflowHandler(JournalRecorder *_journal_recorder)
-      : journal_recorder(_journal_recorder) {}
+    ObjectHandler(JournalRecorder *_journal_recorder)
+      : journal_recorder(_journal_recorder) {
+    }
 
+    virtual void closed(ObjectRecorder *object_recorder) {
+      journal_recorder->handle_closed(object_recorder);
+    }
     virtual void overflow(ObjectRecorder *object_recorder) {
       journal_recorder->handle_overflow(object_recorder);
     }
@@ -69,7 +73,7 @@ private:
   double m_flush_age;
 
   Listener m_listener;
-  OverflowHandler m_overflow_handler;
+  ObjectHandler m_object_handler;
 
   Mutex m_lock;
 
@@ -83,6 +87,8 @@ private:
   void create_next_object_recorder(ObjectRecorderPtr object_recorder);
 
   void handle_update();
+
+  void handle_closed(ObjectRecorder *object_recorder);
   void handle_overflow(ObjectRecorder *object_recorder);
 };
 
index 78b51a2b4d1ccbe90ebd1a5a479dc53807ed2dba..5972d899b135585791c1be6e78b0015142189dbc 100644 (file)
@@ -19,21 +19,20 @@ namespace journal {
 ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                                uint64_t object_number,
                                SafeTimer &timer, Mutex &timer_lock,
-                               OverflowHandler *overflow_handler, uint8_t order,
+                               Handler *handler, uint8_t order,
                                uint32_t flush_interval, uint64_t flush_bytes,
                                double flush_age)
   : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
     m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock),
-    m_overflow_handler(overflow_handler), m_order(order),
-    m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
-    m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
-    m_append_task(NULL),
+    m_handler(handler), m_order(order), m_soft_max_size(1 << m_order),
+    m_flush_interval(flush_interval), m_flush_bytes(flush_bytes),
+    m_flush_age(flush_age), m_flush_handler(this), m_append_task(NULL),
     m_lock(utils::unique_lock_name("ObjectRecorder::m_lock", this)),
     m_append_tid(0), m_pending_bytes(0), m_size(0), m_overflowed(false),
     m_object_closed(false), m_in_flight_flushes(false) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
-  assert(m_overflow_handler != NULL);
+  assert(m_handler != NULL);
 }
 
 ObjectRecorder::~ObjectRecorder() {
@@ -151,15 +150,17 @@ void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
                          m_append_buffers.begin(), m_append_buffers.end());
 }
 
-bool ObjectRecorder::close_object() {
+bool ObjectRecorder::close() {
   ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
 
   cancel_append_task();
 
   Mutex::Locker locker(m_lock);
-  m_object_closed = true;
   flush_appends(true);
-  return m_in_flight_appends.empty();
+
+  assert(!m_object_closed);
+  m_object_closed = true;
+  return m_in_flight_tids.empty();
 }
 
 void ObjectRecorder::handle_append_task() {
@@ -248,7 +249,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
 
       // notify of overflow once all in-flight ops are complete
       if (m_in_flight_tids.empty()) {
-        notify_overflow();
+        notify_handler();
       }
       return;
     }
@@ -260,7 +261,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
     m_in_flight_appends.erase(iter);
     if (m_in_flight_appends.empty() && m_object_closed) {
       // all remaining unsent appends should be redirected to new object
-      notify_overflow();
+      notify_handler();
     }
     m_in_flight_flushes = true;
   }
@@ -338,7 +339,7 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
   rados_completion->release();
 }
 
-void ObjectRecorder::notify_overflow() {
+void ObjectRecorder::notify_handler() {
   assert(m_lock.is_locked());
 
   for (AppendBuffers::const_iterator it = m_append_buffers.begin();
@@ -348,10 +349,16 @@ void ObjectRecorder::notify_overflow() {
     it->first->detach();
   }
 
-  // TODO need to delay completion until after aio_notify completes
-  m_lock.Unlock();
-  m_overflow_handler->overflow(this);
-  m_lock.Lock();
+  if (m_object_closed) {
+    m_lock.Unlock();
+    m_handler->closed(this);
+    m_lock.Lock();
+  } else {
+    // TODO need to delay completion until after aio_notify completes
+    m_lock.Unlock();
+    m_handler->overflow(this);
+    m_lock.Lock();
+  }
 }
 
 } // namespace journal
index 378ab64df8c660355325076240f22f598ed18a99..53f8cc9ad0dbbb852535cc86be013b1fbaa55b2d 100644 (file)
@@ -29,16 +29,17 @@ typedef std::list<AppendBuffer> AppendBuffers;
 
 class ObjectRecorder : public RefCountedObject, boost::noncopyable {
 public:
-  struct OverflowHandler {
-    virtual ~OverflowHandler() {}
+  struct Handler {
+    virtual ~Handler() {
+    }
+    virtual void closed(ObjectRecorder *object_recorder) = 0;
     virtual void overflow(ObjectRecorder *object_recorder) = 0;
   };
 
   ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                  uint64_t object_number, SafeTimer &timer, Mutex &timer_lock,
-                 OverflowHandler *overflow_handler, uint8_t order,
-                 uint32_t flush_interval, uint64_t flush_bytes,
-                 double flush_age);
+                 Handler *handler, uint8_t order, uint32_t flush_interval,
+                 uint64_t flush_bytes, double flush_age);
   ~ObjectRecorder();
 
   inline uint64_t get_object_number() const {
@@ -53,7 +54,12 @@ public:
   void flush(const FutureImplPtr &future);
 
   void claim_append_buffers(AppendBuffers *append_buffers);
-  bool close_object();
+
+  bool is_closed() const {
+    Mutex::Locker locker(m_lock);
+    return (m_object_closed && m_in_flight_appends.empty());
+  }
+  bool close();
 
   inline CephContext *cct() const {
     return m_cct;
@@ -110,7 +116,7 @@ private:
   SafeTimer &m_timer;
   Mutex &m_timer_lock;
 
-  OverflowHandler *m_overflow_handler;
+  Handler *m_handler;
 
   uint8_t m_order;
   uint64_t m_soft_max_size;
@@ -149,7 +155,7 @@ private:
   void append_overflowed(uint64_t tid);
   void send_appends(AppendBuffers *append_buffers);
 
-  void notify_overflow();
+  void notify_handler();
 };
 
 } // namespace journal
index f26e5261688b70c9200af57742a1e9aeae778c5c..de82d06209517641588e2b8607f7c7786a0c7502 100644 (file)
@@ -19,13 +19,20 @@ public:
   {
   }
 
-  struct OverflowHandler : public journal::ObjectRecorder::OverflowHandler {
+  struct Handler : public journal::ObjectRecorder::Handler {
     Mutex lock;
     Cond cond;
-    uint32_t overflows;
+    bool is_closed = false;
+    uint32_t overflows = 0;
 
-    OverflowHandler() : lock("lock"), overflows(0) {}
+    Handler() : lock("lock") {
+    }
 
+    virtual void closed(journal::ObjectRecorder *object_recorder) {
+      Mutex::Locker locker(lock);
+      is_closed = true;
+      cond.Signal();
+    }
     virtual void overflow(journal::ObjectRecorder *object_recorder) {
       Mutex::Locker locker(lock);
       journal::AppendBuffers append_buffers;
@@ -43,7 +50,7 @@ public:
   uint32_t m_flush_interval;
   uint64_t m_flush_bytes;
   double m_flush_age;
-  OverflowHandler m_overflow_handler;
+  Handler m_handler;
 
   void TearDown() {
     for (ObjectRecorders::iterator it = m_object_recorders.begin();
@@ -81,7 +88,7 @@ public:
   journal::ObjectRecorderPtr create_object(const std::string &oid,
                                            uint8_t order) {
     journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
-      m_ioctx, oid, 0, *m_timer, m_timer_lock, &m_overflow_handler, order,
+      m_ioctx, oid, 0, *m_timer, m_timer_lock, &m_handler, order,
       m_flush_interval, m_flush_bytes, m_flush_age));
     m_object_recorders.push_back(object);
     return object;
@@ -301,6 +308,40 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) {
   ASSERT_EQ(0, cond.wait());
 }
 
+TEST_F(TestObjectRecorder, Close) {
+  std::string oid = get_temp_oid();
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, init_metadata(metadata));
+
+  set_flush_interval(2);
+  journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+  journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+                                                              "payload");
+  journal::AppendBuffers append_buffers;
+  append_buffers = {append_buffer1};
+  ASSERT_FALSE(object->append(append_buffers));
+  ASSERT_EQ(1U, object->get_pending_appends());
+
+  ASSERT_FALSE(object->close());
+
+  {
+    Mutex::Locker locker(m_handler.lock);
+    while (!m_handler.is_closed) {
+      if (m_handler.cond.WaitInterval(
+            reinterpret_cast<CephContext*>(m_ioctx.cct()),
+            m_handler.lock, utime_t(10, 0)) != 0) {
+        break;
+      }
+    }
+  }
+
+  ASSERT_TRUE(m_handler.is_closed);
+  ASSERT_EQ(0U, object->get_pending_appends());
+}
+
 TEST_F(TestObjectRecorder, Overflow) {
   std::string oid = get_temp_oid();
   ASSERT_EQ(0, create(oid));
@@ -334,15 +375,15 @@ TEST_F(TestObjectRecorder, Overflow) {
 
   bool overflowed = false;
   {
-    Mutex::Locker locker(m_overflow_handler.lock);
-    while (m_overflow_handler.overflows == 0) {
-      if (m_overflow_handler.cond.WaitInterval(
+    Mutex::Locker locker(m_handler.lock);
+    while (m_handler.overflows == 0) {
+      if (m_handler.cond.WaitInterval(
             reinterpret_cast<CephContext*>(m_ioctx.cct()),
-            m_overflow_handler.lock, utime_t(10, 0)) != 0) {
+            m_handler.lock, utime_t(10, 0)) != 0) {
         break;
       }
     }
-    if (m_overflow_handler.overflows != 0) {
+    if (m_handler.overflows != 0) {
       overflowed = true;
     }
   }