// vim: ts=8 sw=2 smarttab
#include "journal/JournalRecorder.h"
+#include "common/errno.h"
#include "journal/Entry.h"
#include "journal/Utils.h"
JournalRecorder::~JournalRecorder() {
m_journal_metadata->remove_listener(&m_listener);
+
+ Mutex::Locker locker(m_lock);
+ assert(m_in_flight_advance_sets == 0);
+ assert(m_in_flight_object_closes == 0);
}
Future JournalRecorder::append(uint64_t tag_tid,
if (object_full) {
ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
<< dendl;
- close_object_set(object_ptr->get_object_number() / splay_width);
+ close_and_advance_object_set(object_ptr->get_object_number() / splay_width);
}
-
return Future(future);
}
return object_recoder;
}
-void JournalRecorder::close_object_set(uint64_t object_set) {
+void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
assert(m_lock.is_locked());
- uint8_t splay_width = m_journal_metadata->get_splay_width();
- if (object_set != m_current_set) {
+ // entry overflow from open object
+ if (m_current_set != object_set) {
+ ldout(m_cct, 20) << __func__ << ": close already in-progress" << dendl;
return;
}
+ // we shouldn't overflow upon append if already closed and we
+ // shouldn't receive an overflowed callback if already closed
+ assert(m_in_flight_advance_sets == 0);
+ assert(m_in_flight_object_closes == 0);
+
uint64_t active_set = m_journal_metadata->get_active_set();
- if (active_set < m_current_set + 1) {
- m_journal_metadata->set_active_set(m_current_set + 1);
+ assert(m_current_set == active_set);
+ ++m_current_set;
+ ++m_in_flight_advance_sets;
+
+ ldout(m_cct, 20) << __func__ << ": closing active object set "
+ << object_set << dendl;
+ if (close_object_set(m_current_set)) {
+ advance_object_set();
+ }
+}
+
+void JournalRecorder::advance_object_set() {
+ assert(m_lock.is_locked());
+
+ assert(m_in_flight_object_closes == 0);
+ ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set
+ << dendl;
+ m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet(
+ this));
+}
+
+void JournalRecorder::handle_advance_object_set(int r) {
+ Mutex::Locker locker(m_lock);
+ ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
+
+ assert(m_in_flight_advance_sets > 0);
+ --m_in_flight_advance_sets;
+
+ if (r < 0 && r != -ESTALE) {
+ lderr(m_cct) << __func__ << ": failed to advance object set: "
+ << cpp_strerror(r) << dendl;
+ }
+
+ if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
+ open_object_set();
+ }
+}
+
+void JournalRecorder::open_object_set() {
+ assert(m_lock.is_locked());
+
+ ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set
+ << dendl;
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
+ it != m_object_ptrs.end(); ++it) {
+ ObjectRecorderPtr object_recorder = it->second;
+ if (object_recorder->get_object_number() / splay_width != m_current_set) {
+ assert(object_recorder->is_closed());
+
+ // ready to close object and open object in active set
+ create_next_object_recorder(object_recorder);
+ }
}
- m_current_set = m_journal_metadata->get_active_set();
+}
- ldout(m_cct, 10) << __func__ << ": advancing to object set "
- << m_current_set << dendl;
+bool JournalRecorder::close_object_set(uint64_t active_set) {
+ assert(m_lock.is_locked());
// object recorders will invoke overflow handler as they complete
// closing the object to ensure correct order of future appends
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
it != m_object_ptrs.end(); ++it) {
ObjectRecorderPtr object_recorder = it->second;
- if (object_recorder != NULL &&
- object_recorder->get_object_number() / splay_width == m_current_set) {
- if (object_recorder->close()) {
- // no in-flight ops, immediately create new recorder
- create_next_object_recorder(object_recorder);
+ if (object_recorder->get_object_number() / splay_width != active_set) {
+ ldout(m_cct, 10) << __func__ << ": closing object "
+ << object_recorder->get_oid() << dendl;
+ // flush out all queued appends and hold future appends
+ if (!object_recorder->close()) {
+ ++m_in_flight_object_closes;
+ } else {
+ ldout(m_cct, 20) << __func__ << ": object "
+ << object_recorder->get_oid() << " closed" << dendl;
}
}
}
+ return (m_in_flight_object_closes == 0);
}
ObjectRecorderPtr JournalRecorder::create_object_recorder(
ObjectRecorderPtr new_object_recorder = create_object_recorder(
(m_current_set * splay_width) + splay_offset);
+ ldout(m_cct, 10) << __func__ << ": "
+ << "old oid=" << object_recorder->get_oid() << ", "
+ << "new oid=" << new_object_recorder->get_oid() << dendl;
AppendBuffers append_buffers;
object_recorder->claim_append_buffers(&append_buffers);
new_object_recorder->append(append_buffers);
Mutex::Locker locker(m_lock);
uint64_t active_set = m_journal_metadata->get_active_set();
- if (active_set > m_current_set) {
- close_object_set(m_current_set);
+ if (m_current_set < active_set) {
+ // peer journal client advanced the active set
+ ldout(m_cct, 20) << __func__ << ": "
+ << "current_set=" << m_current_set << ", "
+ << "active_set=" << active_set << dendl;
+
+ uint64_t current_set = m_current_set;
+ m_current_set = active_set;
+ if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
+ ldout(m_cct, 20) << __func__ << ": closing current object set "
+ << current_set << dendl;
+ if (close_object_set(active_set)) {
+ open_object_set();
+ }
+ }
}
}
void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
- // TODO
+ ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ uint64_t object_number = object_recorder->get_object_number();
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint8_t splay_offset = object_number % splay_width;
+ ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
+ assert(active_object_recorder->get_object_number() == object_number);
+
+ assert(m_in_flight_object_closes > 0);
+ --m_in_flight_object_closes;
+
+ // object closed after advance active set committed
+ ldout(m_cct, 20) << __func__ << ": object "
+ << active_object_recorder->get_oid() << " closed" << dendl;
+ if (m_in_flight_object_closes == 0) {
+ if (m_in_flight_advance_sets == 0) {
+ // peer forced closing of object set
+ open_object_set();
+ } else {
+ // local overflow advanced object set
+ advance_object_set();
+ }
+ }
}
void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
assert(active_object_recorder->get_object_number() == object_number);
- close_object_set(object_number / splay_width);
- create_next_object_recorder(active_object_recorder);
+ ldout(m_cct, 20) << __func__ << ": object "
+ << active_object_recorder->get_oid() << " overflowed"
+ << dendl;
+ close_and_advance_object_set(object_number / splay_width);
}
} // namespace journal