// remote peer ACKs image acquire request
remote_peer_ack_nowait(mock_image_map.get(), global_image_ids_ack, 0);
+
+ wait_for_scheduled_task();
ASSERT_EQ(0, when_shut_down(mock_image_map.get()));
}
ASSERT_TRUE(wait_for_listener_notify(remove_global_image_ids_ack.size()));
remote_peer_ack_wait(mock_image_map.get(), remove_global_image_ids_ack, 0);
+
+ wait_for_scheduled_task();
ASSERT_EQ(0, when_shut_down(mock_image_map.get()));
}
ASSERT_TRUE(wait_for_listener_notify(remove_global_image_ids_ack.size() * 2));
remote_peer_ack_wait(mock_image_map.get(), remove_global_image_ids_ack, 0);
+
+ wait_for_scheduled_task();
ASSERT_EQ(0, when_shut_down(mock_image_map.get()));
}
// trigger duplicate "remove" notification
mock_image_map->update_images("uuid1", {}, std::move(remove_global_image_ids_dup));
+ wait_for_scheduled_task();
ASSERT_EQ(0, when_shut_down(mock_image_map.get()));
}
// remote peer ACKs image acquire request
remote_peer_ack_nowait(mock_image_map.get(), initial_global_image_ids_ack, 0);
+
+ wait_for_scheduled_task();
ASSERT_EQ(0, when_shut_down(mock_image_map.get()));
}
ASSERT_TRUE(wait_for_listener_notify(local_remove_global_image_ids_ack.size()));
remote_peer_ack_wait(mock_image_map.get(), local_remove_global_image_ids_ack, 0);
+
+ wait_for_scheduled_task();
ASSERT_EQ(0, when_shut_down(mock_image_map.get()));
}
// completion shuffle action for now (re)mapped images
remote_peer_ack_nowait(mock_image_map.get(), shuffled_global_image_ids, 0);
+
+ wait_for_scheduled_task();
ASSERT_EQ(0, when_shut_down(mock_image_map.get()));
}
// completion shuffle action for now (re)mapped images
remote_peer_ack_nowait(mock_image_map.get(), shuffled_global_image_ids, 0);
+
+ wait_for_scheduled_task();
ASSERT_EQ(0, when_shut_down(mock_image_map.get()));
}
shuffled_global_image_ids.begin(), shuffled_global_image_ids.end(),
std::inserter(reshuffled, reshuffled.begin()));
ASSERT_TRUE(reshuffled.empty());
+
+ wait_for_scheduled_task();
ASSERT_EQ(0, when_shut_down(mock_image_map.get()));
}
mock_image_map->update_instances_removed({"9876"});
remote_peer_ack_nowait(mock_image_map.get(), shuffled_global_image_ids, -EBLACKLISTED);
+
+ wait_for_scheduled_task();
ASSERT_EQ(0, when_shut_down(mock_image_map.get()));
}
// remote peer ACKs image acquire request
remote_peer_ack_nowait(mock_image_map.get(), shuffled_global_image_ids, 0);
+
+ wait_for_scheduled_task();
ASSERT_EQ(0, when_shut_down(mock_image_map.get()));
}
// remote peer ACKs image acquire request
remote_peer_ack_nowait(mock_image_map.get(), remote_added_global_image_ids_ack, 0);
+
+ wait_for_scheduled_task();
ASSERT_EQ(0, when_shut_down(mock_image_map.get()));
}
Context *on_acquire = new FunctionContext([this, global_image_id](int r) {
queue_acquire_image(global_image_id);
});
- Context *on_finish = new FunctionContext([this, global_image_id, on_update](int r) {
+ Context *on_finish = new FunctionContext([this, global_image_id](int r) {
handle_add_action(global_image_id, r);
- delete on_update;
});
if (m_policy->add_image(global_image_id, on_update, on_acquire, on_finish)) {
queue_release_image(global_image_id);
});
Context *on_remove = new C_RemoveMap(this, global_image_id);
- Context *on_finish = new FunctionContext([this, global_image_id, on_remove](int r) {
+ Context *on_finish = new FunctionContext([this, global_image_id](int r) {
handle_remove_action(global_image_id, r);
- delete on_remove;
});
if (m_policy->remove_image(global_image_id, on_release, on_remove, on_finish)) {
Context *on_acquire = new FunctionContext([this, global_image_id](int r) {
queue_acquire_image(global_image_id);
});
- Context *on_finish = new FunctionContext([this, global_image_id, on_update](int r) {
+ Context *on_finish = new FunctionContext([this, global_image_id](int r) {
handle_shuffle_action(global_image_id, r);
- delete on_update;
});
if (m_policy->shuffle_image(global_image_id, on_release, on_update, on_acquire, on_finish)) {
}
};
+ // context callbacks which are retry-able get deleted after
+ // transiting to the next state.
struct C_UpdateMap : Context {
ImageMap *image_map;
std::string global_image_id;
}
}
-void Action::execute_completion_callback(int r) {
- auto it = context_map.find(StateTransition::STATE_COMPLETE);
- assert(it != context_map.end());
+void Action::state_callback_complete(StateTransition::State state, bool delete_context) {
+ Context *on_state = nullptr;
+
+ auto it = context_map.find(state);
+ if (it != context_map.end()) {
+ std::swap(it->second, on_state);
+ }
+ if (on_state && delete_context) {
+ delete on_state;
+ }
+}
+
+void Action::execute_completion_callback(int r) {
Context *on_finish = nullptr;
- std::swap(it->second, on_finish); // just called once so its swap'd
+
+ for (auto &ctx : context_map) {
+ Context *on_state = nullptr;
+ std::swap(ctx.second, on_state);
+
+ if (ctx.first == StateTransition::STATE_COMPLETE) {
+ on_finish = on_state;
+ } else if (on_state != nullptr) {
+ delete on_state;
+ }
+ }
if (on_finish != nullptr) {
on_finish->complete(r);
Context *on_finish);
void execute_state_callback(StateTransition::State state);
+ void state_callback_complete(StateTransition::State state, bool delete_context);
void execute_completion_callback(int r);
StateTransition::ActionType get_action_type() const;
bool complete;
if (r == 0) {
post_execute_state_callback(global_image_id, action_state.transition.next_state);
- complete = perform_transition(&action_state, action.get_action_type());
+ complete = perform_transition(&action_state, &action);
} else {
- complete = abort_or_retry(&action_state);
+ complete = abort_or_retry(&action_state, &action);
}
if (complete) {
return complete;
}
-bool Policy::perform_transition(ActionState *action_state, StateTransition::ActionType action_type) {
+bool Policy::perform_transition(ActionState *action_state, Action *action) {
dout(20) << dendl;
assert(m_map_lock.is_wlocked());
StateTransition::State state = action_state->transition.next_state;
+ // delete context based on retry_on_error flag
+ action->state_callback_complete(state, action_state->transition.retry_on_error);
- bool complete = is_transition_complete(action_type, &state);
+ bool complete = is_transition_complete(action->get_action_type(), &state);
dout(10) << ": advancing state: " << action_state->current_state << " -> "
<< state << dendl;
return complete;
}
-bool Policy::abort_or_retry(ActionState *action_state) {
+bool Policy::abort_or_retry(ActionState *action_state, Action *action) {
dout(20) << dendl;
assert(m_map_lock.is_wlocked());
bool complete = !action_state->transition.retry_on_error;
- if (complete && action_state->last_idle_state) {
- dout(10) << ": using last idle state=" << action_state->last_idle_state.get()
- << " as current state" << dendl;
- action_state->current_state = action_state->last_idle_state.get();
+ if (complete) {
+ // we aborted, so the context need not be freed
+ action->state_callback_complete(action_state->transition.next_state, false);
+ if (action_state->last_idle_state) {
+ dout(10) << ": using last idle state=" << action_state->last_idle_state.get()
+ << " as current state" << dendl;
+ action_state->current_state = action_state->last_idle_state.get();
+ }
}
return complete;
void post_execute_state_callback(const std::string &global_image_id, StateTransition::State state);
bool is_transition_complete(StateTransition::ActionType action_type, StateTransition::State *state);
- bool perform_transition(ActionState *action_state, StateTransition::ActionType action_type);
- bool abort_or_retry(ActionState *action_state);
+ bool perform_transition(ActionState *action_state, Action *action);
+ bool abort_or_retry(ActionState *action_state, Action *action);
protected:
typedef std::map<std::string, std::set<std::string> > InstanceToImageMap;