]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-replay-prep: stream events to the prep file
authorJason Dillaman <dillaman@redhat.com>
Wed, 7 Oct 2015 19:22:25 +0000 (15:22 -0400)
committerJason Dillaman <dillaman@redhat.com>
Thu, 15 Oct 2015 18:27:25 +0000 (14:27 -0400)
Avoid building the entire prep event history in memory before
attempting to write the prepped trace file.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit 98f513a113f4e8aff17c83991d2e6f0f3738a9c9)

src/rbd_replay/ios.hpp
src/rbd_replay/rbd-replay-prep.cc

index 7d4153a82e37b60a28318ce17edd909852225068..73e6f7674592a179f3a1d8396ed8f3b520961147 100644 (file)
@@ -23,6 +23,7 @@
 #include <iostream>
 #include <map>
 #include <set>
+#include <vector>
 #include "actions.hpp"
 #include "Ser.hpp"
 
@@ -43,8 +44,7 @@ typedef std::map<action_id_t, boost::shared_ptr<IO> > io_map_t;
 class IO : public boost::enable_shared_from_this<IO> {
 public:
   typedef boost::shared_ptr<IO> ptr;
-
-  typedef boost::weak_ptr<IO> weak_ptr;
+  typedef std::vector<ptr> ptrs;
 
   /**
      @param ionum ID of this %IO
index 0a108759008f8329300a9d481de55b8edfdd7ec7..80a572cdc1661586ce30b6f82ba1f8a36f4a6960 100644 (file)
@@ -187,6 +187,13 @@ public:
 
     struct bt_iter *bt_itr = bt_ctf_get_iter(itr);
 
+    ofstream myfile;
+    myfile.open(output_file_name.c_str(), ios::out | ios::binary | ios::trunc);
+    ASSERT_EXIT(!myfile.fail(), "Error opening output file " <<
+                                output_file_name);
+
+    Ser ser(myfile);
+
     uint64_t trace_start = 0;
     bool first = true;
     while(true) {
@@ -204,7 +211,9 @@ public:
       ts -= trace_start;
       ts += 4; // This is so we have room to insert two events (thread start and open image) at unique timestamps before whatever the first event is.
 
-      process_event(ts, evt);
+      IO::ptrs ptrs;
+      process_event(ts, evt, &ptrs);
+      serialize_events(ser, ptrs);
 
       int r = bt_iter_next(bt_itr);
       ASSERT_EXIT(r == 0, "Error advancing event iterator");
@@ -212,53 +221,31 @@ public:
 
     bt_ctf_iter_destroy(itr);
 
-    insert_thread_stops();
-
-    ofstream myfile;
-    myfile.open(output_file_name.c_str(), ios::out | ios::binary);
-    Ser ser(myfile);
-    for (vector<IO::ptr>::iterator itr = m_ios.begin(); itr != m_ios.end(); ++itr) {
-      (*itr)->write_to(ser);
-    }
+    insert_thread_stops(ser);
     myfile.close();
   }
 
 private:
-  void insert_thread_stops() {
-    sort(m_ios.begin(), m_ios.end(), compare_io_ptrs_by_start_time);
-    for (map<thread_id_t, Thread::ptr>::const_iterator itr = m_threads.begin(), end = m_threads.end(); itr != end; ++itr) {
+  void serialize_events(Ser &ser, const IO::ptrs &ptrs) {
+    for (IO::ptrs::const_iterator it = ptrs.begin(); it != ptrs.end(); ++it) {
+      (*it)->write_to(ser);
+    }
+  }
+
+  void insert_thread_stops(Ser &ser) {
+    IO::ptrs ios;
+    for (map<thread_id_t, Thread::ptr>::const_iterator itr = m_threads.begin(),
+         end = m_threads.end(); itr != end; ++itr) {
       Thread::ptr thread(itr->second);
-      const action_id_t none = -1;
-      action_id_t ionum = none;
-      action_id_t maxIONum = 0; // only valid if ionum is none
-      for (vector<IO::ptr>::const_iterator itr2 = m_ios.begin(); itr2 != m_ios.end(); ++itr2) {
-       IO::ptr io(*itr2);
-       if (io->ionum() > maxIONum) {
-         maxIONum = io->ionum();
-       }
-       if (io->start_time() > thread->max_ts()) {
-         ionum = io->ionum();
-         break;
-       }
-      }
-      if (ionum == none) {
-       ionum = maxIONum + 2;
-      }
-      for (vector<IO::ptr>::const_iterator itr2 = m_ios.begin(); itr2 != m_ios.end(); ++itr2) {
-       IO::ptr io(*itr2);
-       if (io->ionum() >= ionum) {
-         io->set_ionum(io->ionum() + 2);
-       }
-      }
-      IO::ptr stop_thread_io(new StopThreadIO(ionum, thread->max_ts(),
-                                              thread->id(),
-                                              m_recent_completions));
-      vector<IO::ptr>::iterator insertion_point = lower_bound(m_ios.begin(), m_ios.end(), stop_thread_io, compare_io_ptrs_by_start_time);
-      m_ios.insert(insertion_point, stop_thread_io);
+      ios.push_back(IO::ptr(new StopThreadIO(next_id(), thread->max_ts(),
+                                             thread->id(),
+                                             m_recent_completions)));
     }
+    serialize_events(ser, ios);
   }
 
-  void process_event(uint64_t ts, struct bt_ctf_event *evt) {
+  void process_event(uint64_t ts, struct bt_ctf_event *evt,
+                     IO::ptrs *ios) {
     const char *event_name = bt_ctf_event_name(evt);
     const struct bt_definition *scope_context = bt_ctf_get_top_level_scope(evt,
                                                                           BT_STREAM_EVENT_CONTEXT);
@@ -276,7 +263,7 @@ private:
     if (!thread) {
       thread.reset(new Thread(threadID, m_window));
       IO::ptr io(new StartThreadIO(next_id(), ts - 4, threadID));
-      m_ios.push_back(io);
+      ios->push_back(io);
     }
     thread->insert_ts(ts);
 
@@ -337,7 +324,7 @@ private:
                                  imagectx, aname.first, aname.second,
                                  readonly));
       thread->issued_io(io, &m_latest_ios);
-      m_ios.push_back(io);
+      ios->push_back(io);
     } else if (strcmp(event_name, "librbd:open_image_exit") == 0) {
       completed(thread->latest_io());
       boost::shared_ptr<OpenImageIO> io(boost::dynamic_pointer_cast<OpenImageIO>(thread->latest_io()));
@@ -349,7 +336,7 @@ private:
       IO::ptr io(new CloseImageIO(ionum, ts, threadID, m_recent_completions,
                                   imagectx));
       thread->issued_io(io, &m_latest_ios);
-      m_ios.push_back(thread->latest_io());
+      ios->push_back(thread->latest_io());
     } else if (strcmp(event_name, "librbd:close_image_exit") == 0) {
       completed(thread->latest_io());
       boost::shared_ptr<CloseImageIO> io(boost::dynamic_pointer_cast<CloseImageIO>(thread->latest_io()));
@@ -363,12 +350,12 @@ private:
       imagectx_id_t imagectx = fields.uint64("imagectx");
       uint64_t offset = fields.uint64("offset");
       uint64_t length = fields.uint64("length");
-      require_image(ts, thread, imagectx, name, snap_name, readonly);
+      require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
       action_id_t ionum = next_id();
       IO::ptr io(new ReadIO(ionum, ts, threadID, m_recent_completions, imagectx,
                             offset, length));
       thread->issued_io(io, &m_latest_ios);
-      m_ios.push_back(io);
+      ios->push_back(io);
     } else if (strcmp(event_name, "librbd:read_exit") == 0) {
       completed(thread->latest_io());
     } else if (strcmp(event_name, "librbd:write_enter") == 0 ||
@@ -379,12 +366,12 @@ private:
       uint64_t offset = fields.uint64("off");
       uint64_t length = fields.uint64("buf_len");
       imagectx_id_t imagectx = fields.uint64("imagectx");
-      require_image(ts, thread, imagectx, name, snap_name, readonly);
+      require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
       action_id_t ionum = next_id();
       IO::ptr io(new WriteIO(ionum, ts, threadID, m_recent_completions,
                              imagectx, offset, length));
       thread->issued_io(io, &m_latest_ios);
-      m_ios.push_back(io);
+      ios->push_back(io);
     } else if (strcmp(event_name, "librbd:write_exit") == 0) {
       completed(thread->latest_io());
     } else if (strcmp(event_name, "librbd:aio_read_enter") == 0 ||
@@ -396,11 +383,11 @@ private:
       imagectx_id_t imagectx = fields.uint64("imagectx");
       uint64_t offset = fields.uint64("offset");
       uint64_t length = fields.uint64("length");
-      require_image(ts, thread, imagectx, name, snap_name, readonly);
+      require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
       action_id_t ionum = next_id();
       IO::ptr io(new AioReadIO(ionum, ts, threadID, m_recent_completions,
                                imagectx, offset, length));
-      m_ios.push_back(io);
+      ios->push_back(io);
       thread->issued_io(io, &m_latest_ios);
       m_pending_ios[completion] = io;
     } else if (strcmp(event_name, "librbd:aio_write_enter") == 0 ||
@@ -412,12 +399,12 @@ private:
       uint64_t length = fields.uint64("len");
       uint64_t completion = fields.uint64("completion");
       imagectx_id_t imagectx = fields.uint64("imagectx");
-      require_image(ts, thread, imagectx, name, snap_name, readonly);
+      require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
       action_id_t ionum = next_id();
       IO::ptr io(new AioWriteIO(ionum, ts, threadID, m_recent_completions,
                                 imagectx, offset, length));
       thread->issued_io(io, &m_latest_ios);
-      m_ios.push_back(io);
+      ios->push_back(io);
       m_pending_ios[completion] = io;
     } else if (strcmp(event_name, "librbd:aio_complete_enter") == 0) {
       uint64_t completion = fields.uint64("completion");
@@ -471,7 +458,8 @@ private:
                     imagectx_id_t imagectx,
                     const string& name,
                     const string& snap_name,
-                    bool readonly) {
+                    bool readonly,
+                     IO::ptrs *ios) {
     assert(thread);
     if (m_open_images.count(imagectx) > 0) {
       return;
@@ -482,7 +470,7 @@ private:
                                m_recent_completions, imagectx, aname.first,
                                aname.second, readonly));
     thread->issued_io(io, &m_latest_ios);
-    m_ios.push_back(io);
+    ios->push_back(io);
     completed(io);
     m_open_images.insert(imagectx);
   }
@@ -492,7 +480,6 @@ private:
   uint32_t m_io_count;
   io_set_t m_recent_completions;
   set<imagectx_id_t> m_open_images;
-  vector<IO::ptr> m_ios;
 
   // keyed by completion
   map<uint64_t, IO::ptr> m_pending_ios;