}
std::unique_lock async_request_locker{m_async_request_lock};
- m_async_pending.erase(request);
+ mark_async_request_complete(request);
m_async_op_tracker.finish_op();
}
notify_lock_owner->send();
}
+template <typename I>
+bool ImageWatcher<I>::is_new_request(const AsyncRequestId &id) const {
+ ceph_assert(ceph_mutex_is_locked(m_async_request_lock));
+
+ return m_async_pending.count(id) == 0 && m_async_complete.count(id) == 0;
+}
+
+template <typename I>
+bool ImageWatcher<I>::mark_async_request_complete(const AsyncRequestId &id) {
+ ceph_assert(ceph_mutex_is_locked(m_async_request_lock));
+
+ bool found = m_async_pending.erase(id);
+
+ auto now = ceph_clock_now();
+
+ auto it = m_async_complete_expiration.begin();
+ while (it != m_async_complete_expiration.end() && it->first < now) {
+ m_async_complete.erase(it->second);
+ it = m_async_complete_expiration.erase(it);
+ }
+
+ if (m_async_complete.insert(id).second) {
+ auto expiration_time = now;
+ expiration_time += 600;
+ m_async_complete_expiration.insert({expiration_time, id});
+ }
+
+ return found;
+}
+
template <typename I>
Context *ImageWatcher<I>::remove_async_request(const AsyncRequestId &id) {
std::unique_lock async_request_locker{m_async_request_lock};
return -ERESTART;
} else {
std::unique_lock l{m_async_request_lock};
- if (m_async_pending.count(async_request_id) == 0) {
+ if (is_new_request(async_request_id)) {
m_async_pending.insert(async_request_id);
*new_request = true;
*prog_ctx = new RemoteProgressContext(*this, async_request_id);
auto timeout = 2 * watcher::Notifier::NOTIFY_TIMEOUT / 1000;
- if (m_async_pending.count(request) != 0) {
+ if (!is_new_request(request)) {
auto it = m_async_requests.find(request);
if (it != m_async_requests.end()) {
delete it->second.first;
auto on_finish = new LambdaContext(
[this, request](int r) {
std::unique_lock async_request_locker{m_async_request_lock};
- m_async_pending.erase(request);
+ mark_async_request_complete(request);
});
m_image_ctx.state->notify_unquiesce(on_finish);
Context *ImageWatcher<I>::prepare_unquiesce_request(const AsyncRequestId &request) {
{
std::unique_lock async_request_locker{m_async_request_lock};
- bool found = m_async_pending.erase(request);
+ bool found = mark_async_request_complete(request);
if (!found) {
ldout(m_image_ctx.cct, 20) << this << " " << request
<< ": not found in pending" << dendl;
ceph::shared_mutex m_async_request_lock;
std::map<watch_notify::AsyncRequestId, AsyncRequest> m_async_requests;
std::set<watch_notify::AsyncRequestId> m_async_pending;
+ std::set<watch_notify::AsyncRequestId> m_async_complete;
+ std::set<std::pair<utime_t,
+ watch_notify::AsyncRequestId>> m_async_complete_expiration;
ceph::mutex m_owner_client_id_lock;
watch_notify::ClientId m_owner_client_id;
void notify_lock_owner(watch_notify::Payload *payload, Context *on_finish);
+ bool is_new_request(const watch_notify::AsyncRequestId &id) const;
+ bool mark_async_request_complete(const watch_notify::AsyncRequestId &id);
Context *remove_async_request(const watch_notify::AsyncRequestId &id);
void schedule_async_request_timed_out(const watch_notify::AsyncRequestId &id);
void async_request_timed_out(const watch_notify::AsyncRequestId &id);