}
+/**
+ * Like ::trim, but instead of trimming to max_segments, trim all but the latest
+ * segment.
+ *
+ * Walks `segments` from the oldest entry up to (but not including) the
+ * segment returned by peek_current_segment(). Each segment that is in neither
+ * expiring_segments nor expired_segments is inserted into expiring_segments
+ * and handed to try_expire(); finally _trim_expired_segments() is called.
+ *
+ * submit_mutex is taken on entry and dropped around each try_expire() call.
+ * NOTE(review): there is no Unlock() on the success path, so
+ * _trim_expired_segments() presumably releases submit_mutex itself -- confirm
+ * against its definition.
+ *
+ * @return 0 on success, or -EAGAIN (with submit_mutex released) if a segment
+ *         older than the current one still has an entry in pending_events.
+ */
+int MDLog::trim_all()
+{
+ submit_mutex.Lock();
+
+ dout(10) << __func__ << ": "
+ << segments.size()
+ << "/" << expiring_segments.size()
+ << "/" << expired_segments.size() << dendl;
+
+ map<uint64_t,LogSegment*>::iterator p = segments.begin();
+ for (; (p != segments.end()) && (p->second != peek_current_segment());) { // never touch the current segment
+ LogSegment *ls = p->second;
+ ++p; // advance up front; p is re-derived below whenever the lock is dropped
+
+ // Caller should have flushed journaler before calling this; bail out if not
+ if (pending_events.count(ls->seq)) {
+ dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl;
+ submit_mutex.Unlock();
+ return -EAGAIN;
+ }
+
+ if (expiring_segments.count(ls)) {
+ dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
+ << ", " << ls->num_events << " events" << dendl;
+ } else if (expired_segments.count(ls)) {
+ dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
+ << ", " << ls->num_events << " events" << dendl;
+ } else {
+ assert(expiring_segments.count(ls) == 0);
+ expiring_segments.insert(ls);
+ expiring_events += ls->num_events;
+ submit_mutex.Unlock(); // try_expire() is invoked without submit_mutex held
+
+ uint64_t last_seq = ls->seq; // remembered so the walk can resume after re-locking
+ try_expire(ls, CEPH_MSG_PRIO_DEFAULT);
+
+ submit_mutex.Lock();
+ p = segments.lower_bound(last_seq + 1); // map was unprotected while unlocked: resume at first key > last_seq (assumes segments is keyed by seq -- TODO confirm)
+ }
+ }
+
+ _trim_expired_segments();
+
+ return 0;
+}
+
+
void MDLog::try_expire(LogSegment *ls, int op_prio)
{
MDSGatherBuilder gather_bld(g_ceph_context);
journaler->write_head(0);
}
+void MDLog::trim_expired_segments()
+{
+ submit_mutex.Lock(); // take submit_mutex before entering the underscore-prefixed implementation
+ _trim_expired_segments(); // NOTE(review): no Unlock() follows; presumably _trim_expired_segments() releases submit_mutex -- confirm
+}
+
void MDLog::_expired(LogSegment *ls)
{
assert(submit_mutex.is_locked_by_me());
// expired.
expired_segments.insert(ls);
expired_events += ls->num_events;
+
+ // Trigger all waiters
+ for (std::list<MDSInternalContextBase*>::iterator i = ls->expiry_waiters.begin();
+ i != ls->expiry_waiters.end(); ++i) {
+ (*i)->complete(0);
+ }
+ ls->expiry_waiters.clear();
logger->inc(l_mdl_evex, ls->num_events);
logger->inc(l_mdl_segex);
bool MDS::asok_command(string command, cmdmap_t& cmdmap, string format,
ostream& ss)
{
- dout(1) << "asok_command: " << command << dendl;
+ dout(1) << "asok_command: " << command << " (starting...)" << dendl;
Formatter *f = new_formatter(format);
if (!f)
string path;
cmd_getval(g_ceph_context, cmdmap, "path", path);
command_flush_path(f, path);
+ } else if (command == "flush journal") {
+ command_flush_journal(f);
}
f->flush(ss);
delete f;
+
+ dout(1) << "asok_command: " << command << " (complete)" << dendl;
+
return true;
}
f->close_section(); // results
}
+/**
+ * Admin-socket entry point for "flush journal".
+ *
+ * Runs _command_flush_journal() and serializes its outcome into `f` as a
+ * "result" object holding the human readable "message" and the integer
+ * "return_code".
+ *
+ * @param f
+ * Formatter receiving the result; must not be NULL.
+ */
+void MDS::command_flush_journal(Formatter *f)
+{
+  assert(f != NULL);
+
+  // Run the flush first; the formatter is only touched once it has finished.
+  std::stringstream message;
+  const int return_code = _command_flush_journal(&message);
+
+  f->open_object_section("result");
+  f->dump_string("message", message.str());
+  f->dump_int("return_code", return_code);
+  f->close_section();
+}
+
+/**
+ * Implementation of "flush journal" asok command.
+ *
+ * Steps, in order: start a new log segment, flush the log and wait for it
+ * to be safe, call MDLog::trim_all(), wait for every segment in the
+ * expiring set to expire, call MDLog::trim_expired_segments(), and finally
+ * write the journal head and wait for that write to complete.
+ *
+ * mds_lock is taken on entry and temporarily dropped around each blocking
+ * wait.
+ *
+ * @param ss
+ * Must not be NULL. Populated with a human readable string describing the
+ * reason for any unexpected return status.
+ *
+ * @return 0 on success, otherwise the nonzero result of the step that failed
+ */
+int MDS::_command_flush_journal(std::stringstream *ss)
+{
+  assert(ss != NULL);
+
+  Mutex::Locker l(mds_lock);
+
+  // I need to seal off the current segment, and then mark all previous segments
+  // for expiry
+  mdlog->start_new_segment();
+  int r = 0;
+
+  // Flush initially so that all the segments older than our new one
+  // will be eligible for expiry
+  C_SaferCond mdlog_flushed;
+  mdlog->flush();
+  mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_flushed));
+  mds_lock.Unlock();
+  r = mdlog_flushed.wait();
+  mds_lock.Lock();
+  if (r != 0) {
+    *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
+    return r;
+  }
+
+  // Put all the old log segments into expiring or expired state
+  dout(5) << __func__ << ": beginning segment expiry" << dendl;
+  r = mdlog->trim_all();
+  if (r != 0) {
+    *ss << "Error " << r << " (" << cpp_strerror(r) << ") while trimming log";
+    return r;
+  }
+
+  // Attach contexts to wait for all expiring segments to expire
+  MDSGatherBuilder expiry_gather(g_ceph_context);
+
+  const std::set<LogSegment*> &expiring_segments = mdlog->get_expiring_segments();
+  for (std::set<LogSegment*>::const_iterator i = expiring_segments.begin();
+       i != expiring_segments.end(); ++i) {
+    (*i)->wait_for_expiry(expiry_gather.new_sub());
+  }
+  dout(5) << __func__ << ": waiting for " << expiry_gather.num_subs_created()
+          << " segments to expire" << dendl;
+
+  if (expiry_gather.has_subs()) {
+    C_SaferCond cond;
+    expiry_gather.set_finisher(new MDSInternalContextWrapper(this, &cond));
+    expiry_gather.activate();
+
+    // Drop mds_lock to allow progress until expiry is complete
+    mds_lock.Unlock();
+    r = cond.wait();  // reuse the function-level r rather than shadowing it
+    mds_lock.Lock();
+
+    assert(r == 0);  // MDLog is not allowed to raise errors via wait_for_expiry
+  }
+
+  dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now " << std::hex <<
+    mdlog->get_journaler()->get_expire_pos() << "/" <<
+    mdlog->get_journaler()->get_trimmed_pos() << dendl;
+
+  // Now everyone I'm interested in is expired
+  mdlog->trim_expired_segments();
+
+  dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now " << std::hex <<
+    mdlog->get_journaler()->get_expire_pos() << "/" <<
+    mdlog->get_journaler()->get_trimmed_pos() << dendl;
+
+  // Flush the journal header so that readers will start from after the flushed region
+  C_SaferCond wrote_head;
+  mdlog->get_journaler()->write_head(&wrote_head);
+  mds_lock.Unlock();  // Drop lock to allow messenger dispatch progress
+  r = wrote_head.wait();
+  mds_lock.Lock();
+  if (r != 0) {
+    *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
+    return r;
+  }
+
+  dout(5) << __func__ << ": write_head complete, all done!" << dendl;
+
+  return 0;
+}
+
+
void MDS::set_up_admin_socket()
{
int r;
asok_hook,
"Enumerate connected CephFS clients");
assert(0 == r);
+ r = admin_socket->register_command("flush journal",
+ "flush journal",
+ asok_hook,
+ "Flush the journal to the backing store");
+ assert(0 == r);
}
void MDS::clean_up_admin_socket()