// if currently handling flush notifications, wait so that
// we notify in the correct order (since lock is dropped on
// callback)
- while (m_in_flight_callbacks) {
+ while (m_in_flight_callbacks > 0) {
m_in_flight_callbacks_cond.wait(locker);
}
}
if (!m_object_closed && !m_overflowed && send_appends(true, future)) {
- m_in_flight_callbacks = true;
+ ++m_in_flight_callbacks;
notify_handler_unlock(locker, true);
}
}
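
The switch from a bool to a counter is what makes this wait correct: several callback strands can be in flight at once, and a bool would be cleared by whichever strand finishes first, releasing the waiter while other strands are still running. A minimal standalone sketch of the counted gate (hypothetical names, not the Ceph API):

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Sketch only: a counted gate that lets a flush wait until every
// in-flight callback strand has retired. With a plain bool, the first
// strand to finish would clear the flag and wake waiters while others
// were still active; the counter releases them only at zero.
class InFlightGate {
public:
  void enter() {                 // register a strand before unlocking
    std::lock_guard<std::mutex> l(m_mutex);
    ++m_in_flight;
  }
  void exit() {                  // retire a strand when it completes
    std::lock_guard<std::mutex> l(m_mutex);
    if (--m_in_flight == 0) {
      m_cond.notify_all();       // wake waiters only once fully drained
    }
  }
  void wait_drained() {          // flush path blocks here
    std::unique_lock<std::mutex> l(m_mutex);
    m_cond.wait(l, [this] { return m_in_flight == 0; });
  }
private:
  std::mutex m_mutex;
  std::condition_variable m_cond;
  uint64_t m_in_flight = 0;
};
```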
ceph_assert(!m_object_closed);
m_object_closed = true;
- if (!m_in_flight_tids.empty() || m_in_flight_callbacks) {
+ if (!m_in_flight_tids.empty() || m_in_flight_callbacks > 0) {
m_object_closed_notify = true;
return false;
}
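
The close path above uses the same counter to decide whether close can complete synchronously: if anything is still in flight, it records the request via `m_object_closed_notify` and returns false. A hedged sketch of that handshake, with illustrative names rather than the actual members:

```cpp
#include <mutex>

// Sketch only (hypothetical names): close() cannot finish while callback
// strands or writes are still in flight, so it records the request and
// returns false; the strand that drains the counter is then responsible
// for delivering the deferred close notification.
class DeferredClose {
public:
  bool close() {
    std::lock_guard<std::mutex> l(m_mutex);
    m_closed = true;
    if (m_in_flight > 0) {
      m_closed_notify = true;    // last strand signals completion later
      return false;              // caller waits for the async notification
    }
    return true;                 // nothing pending, closed synchronously
  }

  // invoked by each completing strand; returns true when the caller
  // should deliver the deferred close notification
  bool strand_finished() {
    std::lock_guard<std::mutex> l(m_mutex);
    if (--m_in_flight == 0 && m_closed_notify) {
      m_closed_notify = false;
      return true;
    }
    return false;
  }

private:
  std::mutex m_mutex;
  unsigned m_in_flight = 0;
  bool m_closed = false;
  bool m_closed_notify = false;
};
```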
ldout(m_cct, 20) << "tid=" << tid << ", r=" << r << dendl;
std::unique_lock locker{*m_lock};
- m_in_flight_callbacks = true;
+ ++m_in_flight_callbacks;
auto tid_iter = m_in_flight_tids.find(tid);
ceph_assert(tid_iter != m_in_flight_tids.end());
ldout(m_cct, 20) << "pending tids=" << m_in_flight_tids << dendl;
- // all remaining unsent appends should be redirected to new object
+ // notify of overflow if one just occurred or indicate that all in-flight
+ // appends have completed on a closed object (or wake up stalled flush
+ // requests that were waiting for this strand to complete).
notify_handler_unlock(locker, notify_overflowed);
}
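
The increment before `notify_handler_unlock()` is the load-bearing part: the lock is dropped while the handler runs, so the counter is the only thing marking this strand as active during that window. A self-contained sketch of the dispatch shape (a hypothetical `Recorder` type, not the journal classes):

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>

// Sketch only: the strand is registered under the lock, the lock is
// dropped for the user callback, and the strand is retired after the
// lock is reacquired. Registering before the unlock is what makes the
// flush-side wait safe against interleaved completions.
struct Recorder {
  std::mutex m_lock;
  std::condition_variable m_cond;
  int m_in_flight_callbacks = 0;
  std::function<void()> m_handler;

  void handle_completion() {
    std::unique_lock<std::mutex> locker(m_lock);
    ++m_in_flight_callbacks;      // strand active across the unlock below
    notify_handler_unlock(locker);
  }

  void notify_handler_unlock(std::unique_lock<std::mutex>& locker) {
    locker.unlock();              // callbacks must not run under m_lock
    if (m_handler) {
      m_handler();                // may re-enter append/flush paths
    }
    locker.lock();
    wake_up_flushes();            // retire the strand, wake waiters at zero
  }

  void wake_up_flushes() {
    if (--m_in_flight_callbacks == 0) {
      m_cond.notify_all();
    }
  }
};
```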
void ObjectRecorder::wake_up_flushes() {
ceph_assert(ceph_mutex_is_locked(*m_lock));
- m_in_flight_callbacks = false;
- m_in_flight_callbacks_cond.notify_all();
+ --m_in_flight_callbacks;
+ if (m_in_flight_callbacks == 0) {
+ m_in_flight_callbacks_cond.notify_all();
+ }
}
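
As a usage illustration, here is a small runnable demo (not Ceph code) of the decrement-and-notify-at-zero shape in `wake_up_flushes()`: both strands are registered up front, and the flusher is released only after the last one retires, regardless of completion order:

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

std::mutex g_mutex;
std::condition_variable g_cond;
int g_in_flight = 0;

void strand(int id) {
  std::this_thread::sleep_for(std::chrono::milliseconds(50 * id));
  std::lock_guard<std::mutex> l(g_mutex);
  if (--g_in_flight == 0) {       // only the last strand wakes waiters
    g_cond.notify_all();
  }
}

int main() {
  {
    std::lock_guard<std::mutex> l(g_mutex);
    g_in_flight = 2;              // register strands before they start
  }
  std::thread a(strand, 1), b(strand, 2);
  {
    std::unique_lock<std::mutex> l(g_mutex);
    g_cond.wait(l, [] { return g_in_flight == 0; });
  }
  std::printf("all strands drained, flush may proceed\n");
  a.join();
  b.join();
  return 0;
}
```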
void ObjectRecorder::notify_handler_unlock(
std::unique_lock<ceph::mutex>& locker, bool notify_overflowed) {
ceph_assert(ceph_mutex_is_locked(*m_lock));
- ceph_assert(m_in_flight_callbacks);
+ ceph_assert(m_in_flight_callbacks > 0);
if (!m_object_closed && notify_overflowed) {
// TODO need to delay completion until after aio_notify completes