m_append_task(NULL),
m_lock(utils::unique_lock_name("ObjectRecorder::m_lock", this)),
m_append_tid(0), m_pending_bytes(0), m_size(0), m_overflowed(false),
- m_object_closed(false) {
+ m_object_closed(false), m_in_flight_flushes(false) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
assert(m_overflow_handler != NULL);
{
Mutex::Locker locker(m_lock);
+ // if currently handling flush notifications, wait so that
+ // we notify in the correct order (since lock is dropped on
+ // callback)
+ if (m_in_flight_flushes) {
+ m_in_flight_flushes_cond.Wait(m_lock);
+ }
+
// attach the flush to the most recent append
if (!m_append_buffers.empty()) {
future = Future(m_append_buffers.rbegin()->first);
// all remaining unsent appends should be redirected to new object
notify_overflow();
}
+ m_in_flight_flushes = true;
}
// Flag the associated futures as complete.
<< dendl;
buf_it->first->safe(r);
}
+
+ // wake up any flush requests that raced with a RADOS callback
+ Mutex::Locker locker(m_lock);
+ m_in_flight_flushes = false;
+ m_in_flight_flushes_cond.Signal();
}
void ObjectRecorder::append_overflowed(uint64_t tid) {
#include "include/Context.h"
#include "include/rados/librados.hpp"
+#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/RefCountedObj.h"
#include "journal/FutureImpl.h"
bufferlist m_prefetch_bl;
+ bool m_in_flight_flushes;
+ Cond m_in_flight_flushes_cond;
+
void handle_append_task();
void cancel_append_task();
void schedule_append_task();