}
void* QuiesceAgent::agent_thread_main() {
- working.clear();
- std::unique_lock lock(agent_mutex);
+ std::unique_lock agent_lock(agent_mutex);
+
+ while (!stop_agent_thread) {
+ TrackedRootsVersion old;
- while(!stop_agent_thread) {
if (pending.armed) {
- working.roots.swap(pending.roots);
- working.db_version = pending.db_version;
- } else {
- // copy current roots
- working.roots = current.roots;
- working.db_version = current.db_version;
+ std::swap(old, current);
+ current.roots.swap(pending.roots);
+ current.db_version = pending.db_version;
}
dout(20)
- << "current = " << current.db_version
- << ", working = " << working.db_version
- << ", pending = " << pending.db_version << dendl;
-
- current.armed = false;
- working.armed = true;
+ << "old = " << old.db_version
+ << ", current = " << current.db_version
+ << dendl;
- // it's safe to clear the pending roots under lock because it shouldn't
+ // it's safe to clear the pending roots under agent_lock because it shouldn't
// ever hold a last shared ptr to quiesced tracked roots, causing their destructors to run cancel.
pending.clear();
- lock.unlock();
+ current.armed = true;
+ upkeep_needed = false;
+
+ // for somebody waiting for the internal state to progress
+ agent_cond.notify_all();
+ agent_lock.unlock();
+
+ _agent_thread_will_work();
- QuiesceMap ack(working.db_version);
+ QuiesceMap ack(current.db_version);
// upkeep what we believe is the current state.
- for (auto& [root, info] : working.roots) {
+ for (auto& [root, info] : current.roots) {
info->lock();
bool should_quiesce = info->should_quiesce();
info->unlock();
// TODO: capturing QuiesceAgent& `this` is potentially dangerous
- // the assumption is that since the root pointer is weak
+ // the assumption is that since the tracked root pointer is weak
// it will have been deleted by the QuiesceAgent shutdown sequence
set_upkeep_needed();
}
}
}
- lock.lock();
+ _agent_thread_did_work();
- bool new_version = current.db_version < working.db_version;
- current.roots.swap(working.roots);
- current.db_version = working.db_version;
-
- lock.unlock();
-
- // clear the old roots and send the ack outside of the lock
- working.roots.clear();
+ // send the ack and clear the old roots outside of the lock
+ bool new_version = current.db_version != old.db_version;
if (new_version || !ack.roots.empty()) {
dout(20) << "asyncrhonous ack for " << (new_version ? "a new" : "the current") << " version: " << ack << dendl;
int rc = quiesce_control.agent_ack(std::move(ack));
dout(3) << "got error: " << rc << " trying to send " << ack << dendl;
}
}
+ old.clear();
ack.clear();
- lock.lock();
-
- // notify that we're done working on this version and all acks (if any) were sent
- working.clear();
+ agent_lock.lock();
+ current.armed = false;
// a new pending version could be set while we weren't locked
// if that's the case just go for another pass
// otherwise, wait for updates
- if (!pending.armed && !current.armed && !stop_agent_thread) {
+ while (!pending.armed && !current.armed && !upkeep_needed && !stop_agent_thread) {
// for somebody waiting for the thread to idle
agent_cond.notify_all();
- agent_cond.wait(lock);
+ agent_cond.wait(agent_lock);
}
}
agent_cond.notify_all();
{
std::unique_lock l(agent_mutex);
- auto actual_version = std::max(current.db_version, working.db_version);
- bool rollback = actual_version > version;
-
+ bool rollback = current.db_version > version;
+
if (rollback) {
dout(5) << "version rollback to " << version
- << ". current = " << current.db_version
- << ", working = " << working.db_version
+ << ". current = " << current.db_version
<< ", pending = " << pending.db_version << dendl;
}
dout(20)
<< "current = " << current.db_version
- << ", working = " << working.db_version
<< ", pending = " << pending.db_version << dendl;
- current.armed = true;
+ upkeep_needed = true;
agent_cond.notify_all();
}
agent_thread.create("quiesce.agt");
};
- ~QuiesceAgent() {
+ virtual ~QuiesceAgent() {
shutdown();
}
operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceAgent::TrackedRootsVersion& tr);
TrackedRootsVersion current;
- TrackedRootsVersion working;
TrackedRootsVersion pending;
std::mutex agent_mutex;
std::condition_variable agent_cond;
bool stop_agent_thread;
+ bool upkeep_needed;
template<class L>
QuiesceDbVersion await_idle_locked(L &lock) {
- agent_cond.wait(lock, [this] {
- return !(current.armed || working.armed || pending.armed);
+ return await_phase_locked(lock, false, false);
+ }
+
+ template <class L>
+ QuiesceDbVersion await_phase_locked(L& lock, bool pending_armed, bool current_armed)
+ {
+ agent_cond.wait(lock, [=, this] {
+ return ( !upkeep_needed
+ && current.armed == current_armed
+ && pending.armed == pending_armed);
});
- return current.db_version;
+ return std::max(current.db_version, pending.db_version);
}
void set_pending_roots(QuiesceDbVersion db_version, TrackedRoots&& new_roots);
} agent_thread;
void* agent_thread_main();
+
+ virtual void _agent_thread_will_work() { }
+ virtual void _agent_thread_did_work() { }
+
};
QuiesceDbVersion get_latest_version()
{
std::lock_guard l(agent_mutex);
- return std::max({current.db_version, working.db_version, pending.db_version});
+ return std::max(current.db_version, pending.db_version);
}
TrackedRoots& mutable_tracked_roots() {
return current.roots;
std::unique_lock l(agent_mutex);
return await_idle_locked(l);
}
+
+ using TRV = TrackedRootsVersion;
+ std::optional<std::function<void(TRV& pending, TRV& current)>> before_work;
+
+ void _agent_thread_will_work() {
+ auto f = before_work;
+ if (f) {
+ (*f)(pending, current);
+ }
+ }
};
QuiesceMap latest_ack;
std::unordered_map<QuiesceRoot, QuiescingRoot> quiesce_requests;
auto [it, inserted] = quiesce_requests.try_emplace(r, req_id, c);
if (!inserted) {
- // we must update the request id so that old one can't cancel this request.
- it->second.first = req_id;
- if (it->second.second) {
- it->second.second->complete(-EINTR);
- it->second.second = c;
- } else {
- // if we have no context, it means we've completed it
- // since we weren't inserted, we must have successfully quiesced
- c->complete(0);
- }
+ // it's a conflict that MDCache doesn't deal with
+ c->complete(-EINPROGRESS);
+ return req_id;
+ } else {
+ return it->second.first;
}
-
- return it->second.first;
};
ci.cancel_request = [this](RequestHandle h) {
}
template <class _Rep = std::chrono::seconds::rep, class _Period = std::chrono::seconds::period, typename D = std::chrono::duration<_Rep, _Period>>
- bool await_idle_v(QuiesceDbVersion version, D timeout = std::chrono::duration_cast<D>(std::chrono::seconds(10)))
+ bool await_idle_v(QuiesceSetVersion v, D timeout = std::chrono::duration_cast<D>(std::chrono::seconds(10)))
{
- return timed_run(timeout, [this, version] {
- while (version > agent->await_idle()) { };
+ return timed_run(timeout, [this, v] {
+ while (QuiesceDbVersion {1, v} > agent->await_idle()) { };
});
}
EXPECT_TRUE(await_idle());
- // now we should have seen the ack with root2 quiesced
+ // root1 and root2 are still registered internally
+ // so it should result in a failure to quiesce them again
EXPECT_EQ(3, latest_ack.db_version);
- EXPECT_EQ(1, latest_ack.roots.size());
- EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
+ EXPECT_EQ(2, latest_ack.roots.size());
+ EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root1").state);
+ EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state);
// the actual state of the pinned objects shouldn't have changed
EXPECT_EQ(QS_QUIESCED, pinned1->get_actual_state());
- EXPECT_EQ(QS_FAILED, pinned2->get_actual_state());
+ EXPECT_EQ(QS_QUIESCING, pinned2->get_actual_state());
EXPECT_EQ(0, *pinned1->quiesce_result);
- EXPECT_EQ(-EINTR, *pinned2->quiesce_result);
+ EXPECT_FALSE(pinned2->quiesce_result.has_value());
- // releasing the pinned objects will attempt to cancel, but that shouldn't interfere with the current state
+ // releasing the pinned objects should cancel and remove from internal requests
pinned1.reset();
pinned2.reset();
- EXPECT_TRUE(quiesce_requests.contains("root1"));
- EXPECT_TRUE(quiesce_requests.contains("root2"));
+ EXPECT_FALSE(quiesce_requests.contains("root1"));
+ EXPECT_FALSE(quiesce_requests.contains("root2"));
- EXPECT_TRUE(complete_quiesce("root2"));
+ EXPECT_TRUE(complete_quiesce("root3"));
EXPECT_TRUE(await_idle());
EXPECT_EQ(3, latest_ack.db_version);
- EXPECT_EQ(2, latest_ack.roots.size());
- EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
- EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root2").state);
+ EXPECT_EQ(3, latest_ack.roots.size());
+ EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root1").state);
+ EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state);
+ EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root3").state);
}
TEST_F(QuiesceAgentTest, TimeoutBeforeComplete)
EXPECT_EQ(0, tracked.size());
}
}
+
+
+TEST_F(QuiesceAgentTest, RapidDbUpdates)
+{
+ // Validates that a new root which is reported more than once (root1 is
+ // present in both v1 and v2 below) before we have a chance to process it
+ // is not submitted multiple times
+
+ // install a one-shot before_work handler that posts v2 while we're working on v1
+ agent->before_work = [this](TestQuiesceAgent::TRV& p, TestQuiesceAgent::TRV& c) {
+ if (c.db_version.set_version != 1) {
+ return;
+ }
+ agent->before_work.reset();
+ auto ack = update(2, {
+ { "root1", QS_QUIESCING },
+ { "root2", QS_QUIESCING },
+ });
+
+ ASSERT_TRUE(ack.has_value());
+ EXPECT_EQ(2, ack->db_version);
+ EXPECT_EQ(0, ack->roots.size());
+ };
+
+ {
+ auto ack = update(1, {
+ { "root1", QS_QUIESCING },
+ });
+
+ ASSERT_TRUE(ack.has_value());
+ EXPECT_EQ(1, ack->db_version);
+ EXPECT_EQ(0, ack->roots.size());
+ }
+
+ EXPECT_TRUE(await_idle_v(2));
+
+ // the latest ack should carry no roots:
+ // if we had incorrectly submitted root1 twice
+ // then it would be reported here as FAILED
+ EXPECT_EQ(2, latest_ack.db_version);
+ EXPECT_EQ(0, latest_ack.roots.size());
+
+ {
+ auto tracked = agent->tracked_roots();
+ EXPECT_EQ(2, tracked.size());
+ }
+}
+