]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: implement "flush journal" asok 2855/head
authorJohn Spray <john.spray@redhat.com>
Fri, 31 Oct 2014 12:41:18 +0000 (12:41 +0000)
committerJohn Spray <john.spray@redhat.com>
Mon, 1 Dec 2014 12:09:13 +0000 (12:09 +0000)
Start a new segment and flush everything older
to the backing store.

Signed-off-by: John Spray <john.spray@redhat.com>
src/mds/LogSegment.h
src/mds/MDLog.cc
src/mds/MDLog.h
src/mds/MDS.cc
src/mds/MDS.h

index 035cc81a72abc1570bc406efd50506204ecec253..6626afe73d3daa1888ed06264b435e23d29310cc 100644 (file)
@@ -71,6 +71,14 @@ class LogSegment {
   // try to expire
   void try_to_expire(MDS *mds, MDSGatherBuilder &gather_bld, int op_prio);
 
+  std::list<MDSInternalContextBase*> expiry_waiters;
+
+  void wait_for_expiry(MDSInternalContextBase *c)
+  {
+    assert(c != NULL);
+    expiry_waiters.push_back(c);
+  }
+
   // cons
   LogSegment(uint64_t _seq, loff_t off=-1) :
     seq(_seq), offset(off), end(off), num_events(0),
index 60c84e4f941f41180f8484ab5e93462d25664d85..456e97d551c76dcb7713160ab329bb08af214001 100644 (file)
@@ -562,6 +562,57 @@ void MDLog::trim(int m)
 }
 
 
+/**
+ * Like ::trim, but instead of trimming to max_segments, trim all but the latest
+ * segment.
+ */
+int MDLog::trim_all()
+{
+  submit_mutex.Lock();
+
+  dout(10) << __func__ << ": "
+          << segments.size()
+           << "/" << expiring_segments.size()
+           << "/" << expired_segments.size() << dendl;
+
+  map<uint64_t,LogSegment*>::iterator p = segments.begin();
+  for (; (p != segments.end()) && (p->second != peek_current_segment());) {
+    LogSegment *ls = p->second;
+    ++p;
+
+    // Caller should have flushed journaler before calling this
+    if (pending_events.count(ls->seq)) {
+      dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl;
+      submit_mutex.Unlock();
+      return -EAGAIN;
+    }
+
+    if (expiring_segments.count(ls)) {
+      dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
+             << ", " << ls->num_events << " events" << dendl;
+    } else if (expired_segments.count(ls)) {
+      dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
+             << ", " << ls->num_events << " events" << dendl;
+    } else {
+      assert(expiring_segments.count(ls) == 0);
+      expiring_segments.insert(ls);
+      expiring_events += ls->num_events;
+      submit_mutex.Unlock();
+
+      uint64_t last_seq = ls->seq;
+      try_expire(ls, CEPH_MSG_PRIO_DEFAULT);
+
+      submit_mutex.Lock();
+      p = segments.lower_bound(last_seq + 1);
+    }
+  }
+
+  _trim_expired_segments();
+
+  return 0;
+}
+
+
 void MDLog::try_expire(LogSegment *ls, int op_prio)
 {
   MDSGatherBuilder gather_bld(g_ceph_context);
@@ -632,6 +683,12 @@ void MDLog::_trim_expired_segments()
     journaler->write_head(0);
 }
 
+void MDLog::trim_expired_segments()
+{
+  submit_mutex.Lock();
+  _trim_expired_segments();
+}
+
 void MDLog::_expired(LogSegment *ls)
 {
   assert(submit_mutex.is_locked_by_me());
@@ -646,6 +703,13 @@ void MDLog::_expired(LogSegment *ls)
     // expired.
     expired_segments.insert(ls);
     expired_events += ls->num_events;
+
+    // Trigger all waiters
+    for (std::list<MDSInternalContextBase*>::iterator i = ls->expiry_waiters.begin();
+        i != ls->expiry_waiters.end(); ++i) {
+      (*i)->complete(0);
+    }
+    ls->expiry_waiters.clear();
     
     logger->inc(l_mdl_evex, ls->num_events);
     logger->inc(l_mdl_segex);
index 2d00ed21380a778a3dbb82e94d6506a092afedb2..601148dc912da68e222f8b3ce3ce42fa4fad2c17 100644 (file)
@@ -146,6 +146,13 @@ protected:
   } submit_thread;
   friend class SubmitThread;
 
+public:
+  const std::set<LogSegment*> &get_expiring_segments() const
+  {
+    return expiring_segments;
+  }
+protected:
+
   // -- subtreemaps --
   friend class ESubtreeMap;
   friend class MDCache;
@@ -294,7 +301,13 @@ private:
   void _trim_expired_segments();
 
 public:
+  void trim_expired_segments();
   void trim(int max=-1);
+  int trim_all();
+  bool expiry_done() const
+  {
+    return expiring_segments.empty() && expired_segments.empty();
+  };
 
 private:
   void write_head(MDSInternalContextBase *onfinish);
index a4d27945e2f985f3ca4bba0a94ade64d80f6f7f7..6060014d106d05ea147bfd22e322fbf8b0304f04 100644 (file)
@@ -217,7 +217,7 @@ public:
 bool MDS::asok_command(string command, cmdmap_t& cmdmap, string format,
                    ostream& ss)
 {
-  dout(1) << "asok_command: " << command << dendl;
+  dout(1) << "asok_command: " << command << " (starting...)" << dendl;
 
   Formatter *f = new_formatter(format);
   if (!f)
@@ -297,9 +297,14 @@ bool MDS::asok_command(string command, cmdmap_t& cmdmap, string format,
     string path;
     cmd_getval(g_ceph_context, cmdmap, "path", path);
     command_flush_path(f, path);
+  } else if (command == "flush journal") {
+    command_flush_journal(f);
   }
   f->flush(ss);
   delete f;
+
+  dout(1) << "asok_command: " << command << " (complete)" << dendl;
+
   return true;
 }
 
@@ -327,6 +332,113 @@ void MDS::command_flush_path(Formatter *f, const string& path)
   f->close_section(); // results
 }
 
+/**
+ * Wrapper around _command_flush_journal that
+ * handles serialization of result
+ */
+void MDS::command_flush_journal(Formatter *f)
+{
+  assert(f != NULL);
+
+  std::stringstream ss;
+  const int r = _command_flush_journal(&ss);
+  f->open_object_section("result");
+  f->dump_string("message", ss.str());
+  f->dump_int("return_code", r);
+  f->close_section();
+}
+
+/**
+ * Implementation of "flush journal" asok command.
+ *
+ * @param ss
+ * Optionally populate with a human readable string describing the
+ * reason for any unexpected return status.
+ */
+int MDS::_command_flush_journal(std::stringstream *ss)
+{
+  assert(ss != NULL);
+
+  Mutex::Locker l(mds_lock);
+
+  // I need to seal off the current segment, and then mark all previous segments
+  // for expiry
+  mdlog->start_new_segment();
+  int r = 0;
+
+  // Flush initially so that all the segments older than our new one
+  // will be elegible for expiry
+  C_SaferCond mdlog_flushed;
+  mdlog->flush();
+  mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_flushed));
+  mds_lock.Unlock();
+  r = mdlog_flushed.wait();
+  mds_lock.Lock();
+  if (r != 0) {
+    *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
+    return r;
+  }
+
+  // Put all the old log segments into expiring or expired state
+  dout(5) << __func__ << ": beginning segment expiry" << dendl;
+  r = mdlog->trim_all();
+  if (r != 0) {
+    *ss << "Error " << r << " (" << cpp_strerror(r) << ") while trimming log";
+    return r;
+  }
+
+  // Attach contexts to wait for all expiring segments to expire
+  MDSGatherBuilder expiry_gather(g_ceph_context);
+
+  std::list<C_SaferCond*> expired_ctxs;
+  const std::set<LogSegment*> &expiring_segments = mdlog->get_expiring_segments();
+  for (std::set<LogSegment*>::const_iterator i = expiring_segments.begin();
+       i != expiring_segments.end(); ++i) {
+    (*i)->wait_for_expiry(expiry_gather.new_sub());
+  }
+  dout(5) << __func__ << ": waiting for " << expiry_gather.num_subs_created()
+          << " segments to expire" << dendl;
+
+  if (expiry_gather.has_subs()) {
+    C_SaferCond cond;
+    expiry_gather.set_finisher(new MDSInternalContextWrapper(this, &cond));
+    expiry_gather.activate();
+
+    // Drop mds_lock to allow progress until expiry is complete
+    mds_lock.Unlock();
+    int r = cond.wait();
+    mds_lock.Lock();
+
+    assert(r == 0);  // MDLog is not allowed to raise errors via wait_for_expiry
+  }
+
+  dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now " << std::hex <<
+    mdlog->get_journaler()->get_expire_pos() << "/" <<
+    mdlog->get_journaler()->get_trimmed_pos() << dendl;
+
+  // Now everyone I'm interested in is expired
+  mdlog->trim_expired_segments();
+
+  dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now " << std::hex <<
+    mdlog->get_journaler()->get_expire_pos() << "/" <<
+    mdlog->get_journaler()->get_trimmed_pos() << dendl;
+
+  // Flush the journal header so that readers will start from after the flushed region
+  C_SaferCond wrote_head;
+  mdlog->get_journaler()->write_head(&wrote_head);
+  mds_lock.Unlock();  // Drop lock to allow messenger dispatch progress
+  r = wrote_head.wait();
+  mds_lock.Lock();
+  if (r != 0) {
+      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
+      return r;
+  }
+
+  dout(5) << __func__ << ": write_head complete, all done!" << dendl;
+
+  return 0;
+}
+
 void MDS::set_up_admin_socket()
 {
   int r;
@@ -361,6 +473,11 @@ void MDS::set_up_admin_socket()
                                     asok_hook,
                                     "Enumerate connected CephFS clients");
   assert(0 == r);
+  r = admin_socket->register_command("flush journal",
+                                    "flush journal",
+                                    asok_hook,
+                                    "Flush the journal to the backing store");
+  assert(0 == r);
 }
 
 void MDS::clean_up_admin_socket()
index 3925bbd508ac9c18f2eefc2a9c4e1d482b1d0b04..e74f0428743a040c7bbc0898830b5297ce4cc1dc 100644 (file)
@@ -378,6 +378,10 @@ private:
   void check_ops_in_flight(); // send off any slow ops to monitor
   void command_scrub_path(Formatter *f, const string& path);
   void command_flush_path(Formatter *f, const string& path);
+  void command_flush_journal(Formatter *f);
+ private:
+  int _command_flush_journal(std::stringstream *ss);
+ public:
     // config observer bits
   virtual const char** get_tracked_conf_keys() const;
   virtual void handle_conf_change(const struct md_config_t *conf,