AppendBuffers append_buffers;
{
- Mutex::Locker locker(*m_lock);
+ m_lock->Lock();
auto tid_iter = m_in_flight_tids.find(tid);
assert(tid_iter != m_in_flight_tids.end());
m_in_flight_tids.erase(tid_iter);
// notify of overflow once all in-flight ops are complete
if (m_in_flight_tids.empty() && !m_aio_scheduled) {
append_overflowed();
- notify_handler();
+ notify_handler_unlock();
+ } else {
+ m_lock->Unlock();
}
return;
}
assert(!append_buffers.empty());
m_in_flight_appends.erase(iter);
- if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
- // all remaining unsent appends should be redirected to new object
- notify_handler();
- }
m_in_flight_flushes = true;
+ m_lock->Unlock();
}
// Flag the associated futures as complete.
}
// wake up any flush requests that raced with a RADOS callback
- Mutex::Locker locker(*m_lock);
+ m_lock->Lock();
m_in_flight_flushes = false;
m_in_flight_flushes_cond.Signal();
+
+ if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
+ // all remaining unsent appends should be redirected to new object
+ notify_handler_unlock();
+ } else {
+ m_lock->Unlock();
+ }
}
void ObjectRecorder::append_overflowed() {
m_append_buffers.begin(),
m_append_buffers.end());
restart_append_buffers.swap(m_append_buffers);
+
+ for (AppendBuffers::const_iterator it = m_append_buffers.begin();
+ it != m_append_buffers.end(); ++it) {
+ ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
+ << dendl;
+ it->first->detach();
+ }
}
void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
rados_completion->release();
{
- Mutex::Locker locker(*m_lock);
+ m_lock->Lock();
if (m_pending_buffers.empty()) {
m_aio_scheduled = false;
if (m_in_flight_appends.empty() && m_object_closed) {
// all remaining unsent appends should be redirected to new object
- notify_handler();
+ notify_handler_unlock();
+ } else {
+ m_lock->Unlock();
}
} else {
// additional pending items -- reschedule
m_op_work_queue->queue(new FunctionContext([this] (int r) {
send_appends_aio();
}));
+ m_lock->Unlock();
}
}
gather_ctx->activate();
}
-void ObjectRecorder::notify_handler() {
+void ObjectRecorder::notify_handler_unlock() {
assert(m_lock->is_locked());
-
- for (AppendBuffers::const_iterator it = m_append_buffers.begin();
- it != m_append_buffers.end(); ++it) {
- ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
- << dendl;
- it->first->detach();
- }
-
if (m_object_closed) {
m_lock->Unlock();
m_handler->closed(this);
- m_lock->Lock();
} else {
// TODO need to delay completion until after aio_notify completes
m_lock->Unlock();
m_handler->overflow(this);
- m_lock->Lock();
}
}