ObjectRecorder::~ObjectRecorder() {
assert(m_append_task == NULL);
assert(m_append_buffers.empty());
+ assert(m_in_flight_tids.empty());
assert(m_in_flight_appends.empty());
}
ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
Mutex::Locker locker(m_lock);
+ assert(m_in_flight_tids.empty());
assert(m_in_flight_appends.empty());
assert(m_object_closed || m_overflowed);
append_buffers->splice(append_buffers->end(), m_append_buffers,
AppendBuffers append_buffers;
{
Mutex::Locker locker(m_lock);
+ auto tid_iter = m_in_flight_tids.find(tid);
+ assert(tid_iter != m_in_flight_tids.end());
+ m_in_flight_tids.erase(tid_iter);
+
InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
- if (iter == m_in_flight_appends.end()) {
- // must have seen an overflow on a previous append op
- assert(m_overflowed);
- return;
- } else if (r == -EOVERFLOW) {
- m_overflowed = true;
- append_overflowed(tid);
+ if (r == -EOVERFLOW || m_overflowed) {
+ if (iter != m_in_flight_appends.end()) {
+ m_overflowed = true;
+ append_overflowed(tid);
+ } else {
+ // must have seen an overflow on a previous append op
+ assert(r == -EOVERFLOW && m_overflowed);
+ }
+
+ // notify of overflow once all in-flight ops are complete
+ if (m_in_flight_tids.empty()) {
+ notify_overflow();
+ }
return;
}
- assert(!m_overflowed || r != 0);
+ assert(iter != m_in_flight_appends.end());
append_buffers.swap(iter->second);
assert(!append_buffers.empty());
m_append_buffers.begin(),
m_append_buffers.end());
restart_append_buffers.swap(m_append_buffers);
- notify_overflow();
}
void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
m_size += it->second.length();
}
+ m_in_flight_tids.insert(append_tid);
m_in_flight_appends[append_tid].swap(*append_buffers);
librados::AioCompletion *rados_completion =
#include "common/RefCountedObj.h"
#include "journal/FutureImpl.h"
#include <list>
+#include <map>
+#include <set>
#include <boost/intrusive_ptr.hpp>
#include <boost/noncopyable.hpp>
#include "include/assert.h"
}
private:
+ typedef std::set<uint64_t> InFlightTids;
typedef std::map<uint64_t, AppendBuffers> InFlightAppends;
struct FlushHandler : public FutureImpl::FlushHandler {
uint64_t m_append_tid;
uint32_t m_pending_bytes;
+ InFlightTids m_in_flight_tids;
InFlightAppends m_in_flight_appends;
uint64_t m_size;
bool m_overflowed;