switch (state) {
case EXPORT_LOCKING:
dout(10) << "export state=locking : dropping locks and removing auth_pin" << dendl;
+ num_locking_exports--;
it->second.state = EXPORT_CANCELLED;
dir->auth_unpin(this);
break;
mut.swap(it->second.mut);
if (it->second.state == EXPORT_CANCELLED) {
- export_state.erase(it);
- dir->clear_exporting();
- // send pending import_maps?
- cache->maybe_send_pending_resolves();
+ export_cancel_finish(it);
}
// drop locks
}
}
-void Migrator::export_cancel_finish(CDir *dir)
+// Last step of cancelling an export: removes the export_state entry that
+// `it` points at, subtracts its approx_size from total_exporting_size,
+// clears the exporting flag on the dir and lets the cache send any pending
+// resolves. `it` refers to an erased map entry once this returns.
+void Migrator::export_cancel_finish(export_state_iterator& it)
{
+ CDir *dir = it->first;
+ // capture everything needed from the entry now; it is erased just below
+ bool unpin = (it->second.state == EXPORT_CANCELLING);
+
+ total_exporting_size -= it->second.approx_size;
+ export_state.erase(it);
+
assert(dir->state_test(CDir::STATE_EXPORTING));
dir->clear_exporting();
- // pinned by Migrator::export_notify_abort()
- dir->auth_unpin(this);
+ // only an entry that was in EXPORT_CANCELLING has the extra auth_pin to drop
+ if (unpin) {
+ // pinned by Migrator::export_notify_abort()
+ dir->auth_unpin(this);
+ }
// send pending import_maps? (these need to go out when all exports have finished.)
cache->maybe_send_pending_resolves();
}
export_finish(dir);
} else if (p->second.state == EXPORT_CANCELLING) {
if (p->second.notify_ack_waiting.empty()) {
- export_state.erase(p);
- export_cancel_finish(dir);
+ export_cancel_finish(p);
}
}
}
if (running)
return;
running = true;
+
+ uint64_t max_total_size = max_export_size * 2;
+
while (!export_queue.empty() &&
- export_state.size() <= 4) {
+ max_total_size > total_exporting_size &&
+ max_total_size - total_exporting_size >=
+ max_export_size * (num_locking_exports + 1)) {
+
dirfrag_t df = export_queue.front().first;
mds_rank_t dest = export_queue.front().second;
export_queue.pop_front();
export_dir(dir, dest);
}
+
running = false;
}
assert(export_state.count(dir) == 0);
export_state_t& stat = export_state[dir];
+ num_locking_exports++;
stat.state = EXPORT_LOCKING;
stat.peer = dest;
stat.tid = mdr->reqid.tid;
vector<LevelData> stack;
stack.emplace_back(dir);
- uint64_t max_size = g_conf->get_val<uint64_t>("mds_max_export_size");
+ uint64_t max_size = max_export_size;
size_t found_size = 0;
size_t skipped_size = 0;
maybe_split_export(dir, results);
if (results.size() == 1 && results.front().first == dir) {
+ num_locking_exports--;
it->second.state = EXPORT_DISCOVERING;
// send ExportDirDiscover (ask target)
filepath path;
assert(g_conf->mds_kill_export_at != 2);
it->second.last_cum_auth_pins_change = ceph_clock_now();
+ it->second.approx_size = results.front().second;
+ total_exporting_size += it->second.approx_size;
// start the freeze, but hold it up with an auth_pin.
dir->freeze_tree();
_mdr->more()->export_dir = sub;
assert(export_state.count(sub) == 0);
- export_state_t& stat = export_state[sub];
-
+ auto& stat = export_state[sub];
+ num_locking_exports++;
stat.state = EXPORT_LOCKING;
stat.peer = dest;
stat.tid = _mdr->reqid.tid;
!diri->nestlock.can_wrlock(-1)) {
dout(7) << "export_dir couldn't acquire all needed locks, failing. "
<< *dir << dendl;
- // .. unwind ..
- dir->unfreeze_tree();
- cache->try_subtree_merge(dir);
-
- mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
- export_state.erase(it);
-
- dir->clear_exporting();
- cache->maybe_send_pending_resolves();
+ export_try_cancel(dir);
return;
}
dout(7) << "handle_export_notify_ack from " << m->get_source()
<< ": cancelling export, processing notify on " << *dir << dendl;
if (stat.notify_ack_waiting.empty()) {
- export_state.erase(export_state_entry);
- export_cancel_finish(dir);
+ export_cancel_finish(export_state_entry);
}
}
}
MutationRef mut = it->second.mut;
// remove from exporting list, clean up state
+ total_exporting_size -= it->second.approx_size;
export_state.erase(it);
+
+ assert(dir->state_test(CDir::STATE_EXPORTING));
dir->clear_exporting();
cache->show_subtrees();
in->auth_unpin(this);
}
+// Build the migrator for MDS rank `m` backed by cache `c`, seeding the
+// locally cached config values from the current global configuration.
+Migrator::Migrator(MDSRank *m, MDCache *c) : mds(m), cache(c) {
+ max_export_size = g_conf->get_val<uint64_t>("mds_max_export_size");
+ inject_session_race = g_conf->get_val<bool>("mds_inject_migrator_session_race");
+ // keep honouring a message-loss injection value that is set at startup;
+ // without this the member would stay at its in-class default of 0
+ inject_message_loss = g_conf->get_val<int64_t>("mds_inject_migrator_message_loss");
+}
+
void Migrator::handle_conf_change(const struct md_config_t *conf,
const std::set <std::string> &changed,
const MDSMap &mds_map)
{
+ if (changed.count("mds_max_export_size"))
+ max_export_size = conf->get_val<uint64_t>("mds_max_export_size");
+
if (changed.count("mds_inject_migrator_session_race")) {
inject_session_race = conf->get_val<bool>("mds_inject_migrator_session_race");
dout(0) << "mds_inject_migrator_session_race is " << inject_session_race << dendl;
}
// -- cons --
- Migrator(MDSRank *m, MDCache *c) : mds(m), cache(c) {
- inject_session_race = g_conf->get_val<bool>("mds_inject_migrator_session_race");
- inject_message_loss = g_conf->get_val<int64_t>("mds_inject_migrator_message_loss");
- }
+ Migrator(MDSRank *m, MDCache *c);
void handle_conf_change(const struct md_config_t *conf,
const std::set <std::string> &changed,
protected:
// export fun
struct export_state_t {
+ // Bookkeeping for one export in progress; instances are stored in
+ // export_state, keyed by the CDir being exported.
- int state;
- mds_rank_t peer;
- uint64_t tid;
+ // one of the EXPORT_* states (EXPORT_LOCKING, EXPORT_DISCOVERING, ...)
+ int state = 0;
+ mds_rank_t peer = MDS_RANK_NONE;
+ uint64_t tid = 0;
set<mds_rank_t> warning_ack_waiting;
set<mds_rank_t> notify_ack_waiting;
map<inodeno_t,map<client_t,Capability::Import> > peer_imported;
MutationRef mut;
+ // size estimate that is counted in total_exporting_size; it is still 0
+ // while the export is in the locking state
+ size_t approx_size = 0;
// for freeze tree deadlock detection
utime_t last_cum_auth_pins_change;
- int last_cum_auth_pins;
- int num_remote_waiters; // number of remote authpin waiters
- export_state_t() : state(0), peer(0), tid(0), mut(),
- last_cum_auth_pins(0), num_remote_waiters(0) {}
+ int last_cum_auth_pins = 0;
+ int num_remote_waiters = 0; // number of remote authpin waiters
+ export_state_t() {}
};
-
map<CDir*, export_state_t> export_state;
+ typedef map<CDir*, export_state_t>::iterator export_state_iterator;
+
+ uint64_t total_exporting_size = 0;
+ unsigned num_locking_exports = 0; // exports in locking state (approx_size == 0)
list<pair<dirfrag_t,mds_rank_t> > export_queue;
void export_go(CDir *dir);
void export_go_synced(CDir *dir, uint64_t tid);
void export_try_cancel(CDir *dir, bool notify_peer=true);
- void export_cancel_finish(CDir *dir);
+ void export_cancel_finish(export_state_iterator& it);
void export_reverse(CDir *dir, export_state_t& stat);
void export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>& bounds);
void handle_export_ack(MExportDirAck *m);
private:
MDSRank *mds;
MDCache *cache;
+ uint64_t max_export_size = 0;
bool inject_session_race = false;
int inject_message_loss = 0;
};