}
void FutureImpl::safe(int r) {
- Mutex::Locker locker(m_lock);
+ m_lock.Lock();
assert(!m_safe);
m_safe = true;
if (m_return_value == 0) {
m_flush_handler.reset();
if (m_consistent) {
- finish();
+ finish_unlock();
+ } else {
+ m_lock.Unlock();
}
}
void FutureImpl::consistent(int r) {
- Mutex::Locker locker(m_lock);
+ m_lock.Lock();
assert(!m_consistent);
m_consistent = true;
m_prev_future.reset();
}
if (m_safe) {
- finish();
+ finish_unlock();
+ } else {
+ m_lock.Unlock();
}
}
-void FutureImpl::finish() {
+void FutureImpl::finish_unlock() {
assert(m_lock.is_locked());
assert(m_safe && m_consistent);
it != contexts.end(); ++it) {
(*it)->complete(m_return_value);
}
- m_lock.Lock();
}
std::ostream &operator<<(std::ostream &os, const FutureImpl &future) {
Contexts m_contexts;
void consistent(int r);
- void finish();
+ void finish_unlock();
};
void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p);
void JournalMetadata::flush_commit_position() {
{
+ Mutex::Locker timer_locker(m_timer_lock);
Mutex::Locker locker(m_lock);
if (m_commit_position_task_ctx == NULL) {
return;
}
- Mutex::Locker timer_locker(m_timer_lock);
m_timer->cancel_event(m_commit_position_task_ctx);
m_commit_position_task_ctx = NULL;
}
const ObjectSetPosition &commit_position, Context *on_safe) {
assert(on_safe != NULL);
- Mutex::Locker locker(m_lock);
- ldout(m_cct, 20) << __func__ << ": current=" << m_client.commit_position
- << ", new=" << commit_position << dendl;
- if (commit_position <= m_client.commit_position ||
- commit_position <= m_commit_position) {
- on_safe->complete(-ESTALE);
- return;
- }
+ Context *stale_ctx = nullptr;
+ {
+ Mutex::Locker timer_locker(m_timer_lock);
+ Mutex::Locker locker(m_lock);
+ ldout(m_cct, 20) << __func__ << ": current=" << m_client.commit_position
+ << ", new=" << commit_position << dendl;
+ if (commit_position <= m_client.commit_position ||
+ commit_position <= m_commit_position) {
+ stale_ctx = on_safe;
+ } else {
+ stale_ctx = m_commit_position_ctx;
- if (m_commit_position_ctx != NULL) {
- m_commit_position_ctx->complete(-ESTALE);
+ m_client.commit_position = commit_position;
+ m_commit_position = commit_position;
+ m_commit_position_ctx = on_safe;
+ schedule_commit_task();
+ }
}
- m_client.commit_position = commit_position;
- m_commit_position = commit_position;
- m_commit_position_ctx = on_safe;
- schedule_commit_task();
+ if (stale_ctx != nullptr) {
+ stale_ctx->complete(-ESTALE);
+ }
}
void JournalMetadata::reserve_tid(const std::string &tag, uint64_t tid) {
}
void JournalMetadata::schedule_commit_task() {
+ assert(m_timer_lock.is_locked());
assert(m_lock.is_locked());
- Mutex::Locker timer_locker(m_timer_lock);
if (m_commit_position_task_ctx == NULL) {
m_commit_position_task_ctx = new C_CommitPositionTask(this);
m_timer->add_event_after(m_commit_interval, m_commit_position_task_ctx);
void JournalMetadata::handle_watch_error(int err) {
lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
- Mutex::Locker locker(m_lock);
Mutex::Locker timer_locker(m_timer_lock);
+ Mutex::Locker locker(m_lock);
if (m_initialized && err != -ENOENT) {
schedule_watch_reset();
}
ObjectPlayer::~ObjectPlayer() {
{
+ Mutex::Locker timer_locker(m_timer_lock);
Mutex::Locker locker(m_lock);
assert(!m_fetch_in_progress);
assert(m_watch_ctx == NULL);
Mutex::Locker timer_locker(m_timer_lock);
m_watch_interval = interval;
- Mutex::Locker locker(m_lock);
assert(m_watch_ctx == NULL);
m_watch_ctx = on_fetch;
void ObjectPlayer::unwatch() {
ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
Mutex::Locker timer_locker(m_timer_lock);
- Mutex::Locker locker(m_lock);
-
cancel_watch();
m_watch_ctx = NULL;
- m_timer_lock.Unlock();
while (m_watch_in_progress) {
- m_watch_in_progress_cond.Wait(m_lock);
+ m_watch_in_progress_cond.Wait(m_timer_lock);
}
- m_timer_lock.Lock();
}
void ObjectPlayer::front(Entry *entry) const {
void ObjectPlayer::schedule_watch() {
assert(m_timer_lock.is_locked());
- assert(m_lock.is_locked());
if (m_watch_ctx == NULL) {
return;
}
assert(m_timer_lock.is_locked());
ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
- {
- Mutex::Locker locker(m_lock);
- assert(m_watch_ctx != NULL);
+ assert(m_watch_ctx != NULL);
- m_watch_in_progress = true;
- m_watch_task = NULL;
- }
+ m_watch_in_progress = true;
+ m_watch_task = NULL;
fetch(new C_WatchFetch(this));
}
Context *on_finish = NULL;
{
Mutex::Locker timer_locker(m_timer_lock);
- Mutex::Locker locker(m_lock);
assert(m_watch_in_progress);
if (r == -ENOENT) {
schedule_watch();
}
{
- Mutex::Locker locker(m_lock);
+ Mutex::Locker locker(m_timer_lock);
m_watch_in_progress = false;
m_watch_in_progress_cond.Signal();
}
bool ObjectRecorder::append(const AppendBuffers &append_buffers) {
FutureImplPtr last_flushed_future;
+ bool schedule_append = false;
{
Mutex::Locker locker(m_lock);
for (AppendBuffers::const_iterator iter = append_buffers.begin();
iter != append_buffers.end(); ++iter) {
- if (append(*iter)) {
+ if (append(*iter, &schedule_append)) {
last_flushed_future = iter->first;
}
}
if (last_flushed_future) {
flush(last_flushed_future);
+ } else if (schedule_append) {
+ schedule_append_task();
+ } else {
+ cancel_append_task();
}
return (m_size + m_pending_bytes >= m_soft_max_size);
}
void ObjectRecorder::flush(Context *on_safe) {
ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
+ cancel_append_task();
Future future;
{
Mutex::Locker locker(m_lock);
assert(!append_buffers.empty());
future = Future(append_buffers.rbegin()->first);
}
- cancel_append_task();
}
if (future.is_valid()) {
bool ObjectRecorder::close_object() {
ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
+ cancel_append_task();
+
Mutex::Locker locker(m_lock);
m_object_closed = true;
- if (flush_appends(true)) {
- cancel_append_task();
- }
+ flush_appends(true);
return m_in_flight_appends.empty();
}
}
}
-bool ObjectRecorder::append(const AppendBuffer &append_buffer) {
+bool ObjectRecorder::append(const AppendBuffer &append_buffer,
+ bool *schedule_append) {
assert(m_lock.is_locked());
bool flush_requested = append_buffer.first->attach(&m_flush_handler);
m_append_buffers.push_back(append_buffer);
m_pending_bytes += append_buffer.second.length();
- if (flush_appends(false)) {
- cancel_append_task();
- } else {
- schedule_append_task();
+ if (!flush_appends(false)) {
+ *schedule_append = true;
}
return flush_requested;
}
ldout(m_cct, 10) << __func__ << ": " << m_oid << " tid=" << tid
<< ", r=" << r << dendl;
- Mutex::Locker locker(m_lock);
- InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
- if (iter == m_in_flight_appends.end()) {
- // must have seen an overflow on a previous append op
- assert(m_overflowed);
- return;
- } else if (r == -EOVERFLOW) {
- m_overflowed = true;
- append_overflowed(tid);
- return;
- }
+ AppendBuffers append_buffers;
+ {
+ Mutex::Locker locker(m_lock);
+ InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
+ if (iter == m_in_flight_appends.end()) {
+ // must have seen an overflow on a previous append op
+ assert(m_overflowed);
+ return;
+ } else if (r == -EOVERFLOW) {
+ m_overflowed = true;
+ append_overflowed(tid);
+ return;
+ }
+
+ assert(!m_overflowed || r != 0);
+ append_buffers.swap(iter->second);
+ assert(!append_buffers.empty());
- assert(!m_overflowed || r != 0);
- AppendBuffers &append_buffers = iter->second;
- assert(!append_buffers.empty());
+ m_in_flight_appends.erase(iter);
+ if (m_in_flight_appends.empty() && m_object_closed) {
+ // all remaining unsent appends should be redirected to new object
+ notify_overflow();
+ }
+ }
// Flag the associated futures as complete.
for (AppendBuffers::iterator buf_it = append_buffers.begin();
<< dendl;
buf_it->first->safe(r);
}
- m_in_flight_appends.erase(iter);
-
- if (m_in_flight_appends.empty() && m_object_closed) {
- // all remaining unsent appends should be redirected to new object
- notify_overflow();
- }
}
void ObjectRecorder::append_overflowed(uint64_t tid) {
void cancel_append_task();
void schedule_append_task();
- bool append(const AppendBuffer &append_buffer);
+ bool append(const AppendBuffer &append_buffer, bool *schedule_append);
bool flush_appends(bool force);
void handle_append_flushed(uint64_t tid, int r);
void append_overflowed(uint64_t tid);