m_op_work_queue(work_queue), m_handler(handler),
m_order(order), m_soft_max_size(1 << m_order),
m_max_in_flight_appends(max_in_flight_appends),
- m_lock(lock), m_last_flush_time(ceph_clock_now())
+ m_lock(lock)
{
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
// if currently handling flush notifications, wait so that
// we notify in the correct order (since lock is dropped on
// callback)
- if (m_in_flight_flushes) {
- m_in_flight_flushes_cond.wait(locker);
+ while (m_in_flight_callbacks) {
+ m_in_flight_callbacks_cond.wait(locker);
}
// attach the flush to the most recent append
void ObjectRecorder::flush(const ceph::ref_t<FutureImpl>& future) {
ldout(m_cct, 20) << "flushing " << *future << dendl;
- m_lock->lock();
- {
- auto flush_handler = future->get_flush_handler();
- auto my_handler = get_flush_handler();
- if (flush_handler != my_handler) {
- // if we don't own this future, re-issue the flush so that it hits the
- // correct journal object owner
- future->flush();
- m_lock->unlock();
- return;
- } else if (future->is_flush_in_progress()) {
- m_lock->unlock();
- return;
- }
+ // RAII scoped lock: every early return below now releases m_lock
+ // automatically instead of needing a manual unlock on each exit path
+ std::unique_lock locker{*m_lock};
+ auto flush_handler = future->get_flush_handler();
+ auto my_handler = get_flush_handler();
+ if (flush_handler != my_handler) {
+ // if we don't own this future, re-issue the flush so that it hits the
+ // correct journal object owner
+ future->flush();
+ return;
+ } else if (future->is_flush_in_progress()) {
+ return;
+ }
}
- bool overflowed = send_appends(true, future);
- if (overflowed) {
- notify_handler_unlock();
- } else {
- m_lock->unlock();
+ // only attempt the send (and the resulting overflow notification) while
+ // the object is still open and not already overflowed; raise the callback
+ // guard first since notify_handler_unlock() drops and re-acquires the lock
+ if (!m_object_closed && !m_overflowed && send_appends(true, future)) {
+ m_in_flight_callbacks = true;
+ notify_handler_unlock(locker, true);
}
}
ceph_assert(ceph_mutex_is_locked(*m_lock));
ldout(m_cct, 20) << dendl;
+
send_appends(true, {});
ceph_assert(!m_object_closed);
m_object_closed = true;
- return (m_in_flight_tids.empty() && !m_in_flight_flushes);
+
+ if (!m_in_flight_tids.empty() || m_in_flight_callbacks) {
+ m_object_closed_notify = true;
+ return false;
+ }
+
+ return true;
}
void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
ldout(m_cct, 20) << "tid=" << tid << ", r=" << r << dendl;
- AppendBuffers append_buffers;
- {
- m_lock->lock();
- auto tid_iter = m_in_flight_tids.find(tid);
- ceph_assert(tid_iter != m_in_flight_tids.end());
- m_in_flight_tids.erase(tid_iter);
+ std::unique_lock locker{*m_lock};
+ // guard: racing flush()/close() callers must wait until this callback
+ // (and any it chains into) has finished; cleared via wake_up_flushes()
+ m_in_flight_callbacks = true;
- InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
- ceph_assert(iter != m_in_flight_appends.end());
+ auto tid_iter = m_in_flight_tids.find(tid);
+ ceph_assert(tid_iter != m_in_flight_tids.end());
+ m_in_flight_tids.erase(tid_iter);
- if (r == -EOVERFLOW) {
- ldout(m_cct, 10) << "append overflowed" << dendl;
- m_overflowed = true;
+ InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
+ ceph_assert(iter != m_in_flight_appends.end());
- // notify of overflow once all in-flight ops are complete
- if (m_in_flight_tids.empty()) {
- append_overflowed();
- notify_handler_unlock();
- } else {
- m_lock->unlock();
- }
- return;
+ // defer the handler overflow notification to the single unlock point at
+ // the bottom of this function instead of returning early
+ bool notify_overflowed = false;
+ AppendBuffers append_buffers;
+ if (r == -EOVERFLOW) {
+ ldout(m_cct, 10) << "append overflowed: "
+ << "idle=" << m_in_flight_tids.empty() << ", "
+ << "previous_overflow=" << m_overflowed << dendl;
+ if (m_in_flight_tids.empty()) {
+ append_overflowed();
}
+ if (!m_object_closed && !m_overflowed) {
+ notify_overflowed = true;
+ }
+ m_overflowed = true;
+ } else {
append_buffers.swap(iter->second);
ceph_assert(!append_buffers.empty());
for (auto& append_buffer : append_buffers) {
- m_object_bytes += append_buffer.second.length();
+ auto length = append_buffer.second.length();
+ m_object_bytes += length;
+
+ // completed data no longer counts against the in-flight byte budget
+ ceph_assert(m_in_flight_bytes >= length);
+ m_in_flight_bytes -= length;
}
ldout(m_cct, 20) << "object_bytes=" << m_object_bytes << dendl;
m_in_flight_appends.erase(iter);
- m_in_flight_flushes = true;
- m_lock->unlock();
}
+ // drop the lock while completing futures: their callbacks may re-enter
+ // this object (the m_in_flight_callbacks guard above covers the gap)
+ locker.unlock();
// Flag the associated futures as complete.
for (auto& append_buffer : append_buffers) {
append_buffer.first->safe(r);
}
- // wake up any flush requests that raced with a RADOS callback
- m_lock->lock();
- m_in_flight_flushes = false;
- m_in_flight_flushes_cond.notify_all();
-
- if (m_in_flight_appends.empty() && (m_object_closed || m_overflowed)) {
- // all remaining unsent appends should be redirected to new object
- notify_handler_unlock();
- } else {
- bool overflowed = send_appends(false, {});
- if (overflowed) {
- notify_handler_unlock();
- } else {
- m_lock->unlock();
- }
+ // attempt to kick off more appends to the object
+ locker.lock();
+ if (!m_object_closed && !m_overflowed && send_appends(false, {})) {
+ notify_overflowed = true;
}
+
+ ldout(m_cct, 20) << "pending tids=" << m_in_flight_tids << dendl;
+
+ // all remaining unsent appends should be redirected to new object
+ notify_handler_unlock(locker, notify_overflowed);
}
void ObjectRecorder::append_overflowed() {
if (!force &&
((m_flush_interval > 0 && m_pending_buffers.size() >= m_flush_interval) ||
(m_flush_bytes > 0 && m_pending_bytes >= m_flush_bytes) ||
- (m_flush_age > 0 &&
- m_last_flush_time + m_flush_age >= ceph_clock_now()))) {
+ (m_flush_age > 0 && !m_last_flush_time.is_zero() &&
+ m_last_flush_time + m_flush_age <= ceph_clock_now()))) {
ldout(m_cct, 20) << "forcing batch flush" << dendl;
force = true;
}
+ // start tracking flush time after the first append event
+ if (m_last_flush_time.is_zero()) {
+ m_last_flush_time = ceph_clock_now();
+ }
+
auto max_in_flight_appends = m_max_in_flight_appends;
if (m_flush_interval > 0 || m_flush_bytes > 0 || m_flush_age > 0) {
if (!force && max_in_flight_appends == 0) {
auto& bl = it->second;
auto size = m_object_bytes + m_in_flight_bytes + append_bytes + bl.length();
if (size == m_soft_max_size) {
- ldout(m_cct, 10) << "object at capacity " << *future << dendl;
+ ldout(m_cct, 10) << "object at capacity (" << size << ") " << *future << dendl;
m_overflowed = true;
} else if (size > m_soft_max_size) {
- ldout(m_cct, 10) << "object beyond capacity " << *future << dendl;
+ ldout(m_cct, 10) << "object beyond capacity (" << size << ") " << *future << dendl;
m_overflowed = true;
break;
}
return m_overflowed;
}
-void ObjectRecorder::notify_handler_unlock() {
+void ObjectRecorder::wake_up_flushes() {
ceph_assert(ceph_mutex_is_locked(*m_lock));
- if (m_object_closed) {
- m_lock->unlock();
- m_handler->closed(this);
- } else {
+ // clear the callback guard and release any callers blocked waiting on it
+ m_in_flight_callbacks = false;
+ m_in_flight_callbacks_cond.notify_all();
+}
+
+void ObjectRecorder::notify_handler_unlock(
+ std::unique_lock<ceph::mutex>& locker, bool notify_overflowed) {
+ ceph_assert(ceph_mutex_is_locked(*m_lock));
+ ceph_assert(m_in_flight_callbacks);
+
+ // the overflow handler is invoked without the lock held, then the lock is
+ // re-acquired to finish the bookkeeping below
+ if (!m_object_closed && notify_overflowed) {
// TODO need to delay completion until after aio_notify completes
- m_lock->unlock();
+ ldout(m_cct, 10) << "overflow" << dendl;
+ ceph_assert(m_overflowed);
+
+ locker.unlock();
m_handler->overflow(this);
+ locker.lock();
+ }
+
+ // wake up blocked flush requests
+ wake_up_flushes();
+
+ // An overflow notification might have blocked a close. A close
+ // notification could lead to the immediate destruction of this object
+ // so the object shouldn't be referenced anymore
+ bool object_closed_notify = false;
+ if (m_in_flight_tids.empty()) {
+ std::swap(object_closed_notify, m_object_closed_notify);
+ }
+ ceph_assert(m_object_closed || !object_closed_notify);
+ locker.unlock();
+
+ if (object_closed_notify) {
+ ldout(m_cct, 10) << "closed" << dendl;
+ m_handler->closed(this);
}
}
Handler m_handler;
};
- journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
+ journal::AppendBuffer create_append_buffer(uint64_t tag_tid,
+ uint64_t entry_tid,
const std::string &payload) {
auto future = ceph::make_ref<journal::FutureImpl>(tag_tid, entry_tid, 456);
future->init(ceph::ref_t<journal::FutureImpl>());
ASSERT_EQ(0, init_metadata(metadata));
ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
- ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.1, -1);
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0005, -1);
auto object = flusher.create_object(oid, 24, &lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
ASSERT_FALSE(object->append(std::move(append_buffers)));
lock.unlock();
- journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
- "payload");
- append_buffers = {append_buffer2};
- lock.lock();
- ASSERT_FALSE(object->append(std::move(append_buffers)));
- lock.unlock();
+ uint32_t offset = 0;
+ journal::AppendBuffer append_buffer2;
+ while (!append_buffer1.first->is_flush_in_progress() &&
+ !append_buffer1.first->is_complete()) {
+ usleep(1000);
+
+ append_buffer2 = create_append_buffer(234, 124 + offset, "payload");
+ ++offset;
+ append_buffers = {append_buffer2};
+
+ lock.lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock.unlock();
+ }
C_SaferCond cond;
append_buffer2.first->wait(&cond);
ASSERT_EQ(0, init_metadata(metadata));
ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
- ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
+ ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0, -1);
auto object = flusher.create_object(oid, 12, &lock);
std::string payload(2048, '1');
ASSERT_EQ(0, cond.wait());
ASSERT_EQ(0U, object1->get_pending_appends());
- ASSERT_TRUE(flusher.wait_for_overflow());
-
auto object2 = flusher.create_object(oid, 12, &lock2);
journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,