void JournalMetadata::shutdown() {
assert(m_initialized);
- m_initialized = false;
+ {
+ Mutex::Locker locker(m_lock);
+ m_initialized = false;
+
+ if (m_watch_handle != 0) {
+ m_ioctx.unwatch2(m_watch_handle);
+ m_watch_handle = 0;
+ }
+ }
if (m_timer != NULL) {
Mutex::Locker locker(m_timer_lock);
m_finisher = NULL;
}
- if (m_watch_handle != 0) {
- m_ioctx.unwatch2(m_watch_handle);
- librados::Rados rados(m_ioctx);
- rados.watch_flush();
- m_watch_handle = 0;
- }
+ librados::Rados rados(m_ioctx);
+ rados.watch_flush();
m_async_op_tracker.wait_for_ops();
m_ioctx.aio_flush();
}
void JournalMetadata::schedule_watch_reset() {
- Mutex::Locker locker(m_timer_lock);
+ assert(m_timer_lock.is_locked());
m_timer->add_event_after(0.1, new C_WatchReset(this));
}
void JournalMetadata::handle_watch_reset() {
+ assert(m_timer_lock.is_locked());
+ if (!m_initialized) {
+ return;
+ }
+
int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
if (r < 0) {
lderr(m_cct) << __func__ << ": failed to watch journal"
void JournalMetadata::handle_watch_error(int err) {
lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
- schedule_watch_reset();
+ Mutex::Locker locker(m_lock);
+ Mutex::Locker timer_locker(m_timer_lock);
+ if (m_initialized && err != -ENOENT) {
+ schedule_watch_reset();
+ }
}
uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
void JournalMetadata::async_notify_update() {
ldout(m_cct, 10) << "async notifying journal header update" << dendl;
+ C_AioNotify *ctx = new C_AioNotify(this);
librados::AioCompletion *comp =
- librados::Rados::aio_create_completion(NULL, NULL, NULL);
+ librados::Rados::aio_create_completion(ctx, NULL,
+ utils::rados_ctx_callback);
bufferlist bl;
int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL);
comp->release();
}
+void JournalMetadata::handle_notified(int r) {
+ ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
+}
+
} // namespace journal
};
struct C_WatchReset : public Context {
- JournalMetadataPtr journal_metadata;
+ JournalMetadata *journal_metadata;
C_WatchReset(JournalMetadata *_journal_metadata)
- : journal_metadata(_journal_metadata) {}
-
+ : journal_metadata(_journal_metadata) {
+ journal_metadata->m_async_op_tracker.start_op();
+ }
+ virtual ~C_WatchReset() {
+ journal_metadata->m_async_op_tracker.finish_op();
+ }
virtual void finish(int r) {
journal_metadata->handle_watch_reset();
}
};
struct C_CommitPositionTask : public Context {
- JournalMetadataPtr journal_metadata;
+ JournalMetadata *journal_metadata;
C_CommitPositionTask(JournalMetadata *_journal_metadata)
- : journal_metadata(_journal_metadata) {}
-
+ : journal_metadata(_journal_metadata) {
+ journal_metadata->m_async_op_tracker.start_op();
+ }
+ virtual ~C_CommitPositionTask() {
+ journal_metadata->m_async_op_tracker.finish_op();
+ }
virtual void finish(int r) {
journal_metadata->handle_commit_position_task();
};
};
+ struct C_AioNotify : public Context {
+ JournalMetadata* journal_metadata;
+
+ C_AioNotify(JournalMetadata *_journal_metadata)
+ : journal_metadata(_journal_metadata) {
+ journal_metadata->m_async_op_tracker.start_op();
+ }
+ virtual ~C_AioNotify() {
+ journal_metadata->m_async_op_tracker.finish_op();
+ }
+ virtual void finish(int r) {
+ journal_metadata->handle_notified(r);
+ }
+ };
+
struct C_NotifyUpdate : public Context {
JournalMetadata* journal_metadata;
Context *on_safe;
void handle_watch_reset();
void handle_watch_notify(uint64_t notify_id, uint64_t cookie);
void handle_watch_error(int err);
+ void handle_notified(int r);
};
} // namespace journal