}
dir->unfreeze_tree();
cache->try_subtree_merge(dir);
- for (auto bd : it->second.residual_dirs) {
- bd->unfreeze_tree();
- cache->try_subtree_merge(bd);
- }
if (notify_peer &&
(!mds->is_cluster_degraded() ||
mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
if (state == EXPORT_LOCKING || state == EXPORT_DISCOVERING) {
MDRequestRef mdr = static_cast<MDRequestImpl*>(mut.get());
assert(mdr);
- if (mdr->more()->waiting_on_slave.empty())
- mds->mdcache->request_finish(mdr);
+ mds->mdcache->request_kill(mdr);
} else if (mut) {
mds->locker->drop_locks(mut.get());
mut->cleanup();
}
-class C_M_ExportDirWait : public MigratorContext {
- MDRequestRef mdr;
- int count;
-public:
- C_M_ExportDirWait(Migrator *m, MDRequestRef mdr, int count)
- : MigratorContext(m), mdr(mdr), count(count) {}
- void finish(int r) override {
- mig->dispatch_export_dir(mdr, count);
- }
-};
-
-
/** export_dir(dir, dest)
* public method to initiate an export.
* will fail if the directory is freezing, frozen, unpinnable, or root.
stat.tid = mdr->reqid.tid;
stat.mut = mdr;
- return mds->mdcache->dispatch_request(mdr);
+ mds->mdcache->dispatch_request(mdr);
}
-void Migrator::dispatch_export_dir(MDRequestRef& mdr, int count)
+/*
+ * check if directory is too large to be export in whole. If it is,
+ * choose some subdirs, whose total size is suitable.
+ */
+void Migrator::maybe_split_export(CDir* dir, vector<pair<CDir*, size_t> >& results)
{
- dout(7) << "dispatch_export_dir " << *mdr << dendl;
+ static const unsigned frag_size = 800;
+ static const unsigned inode_size = 1000;
+ static const unsigned cap_size = 80;
+ static const unsigned remote_size = 10;
+ static const unsigned null_size = 1;
+
+ // state for depth-first search
+ struct LevelData {
+ CDir *dir;
+ CDir::dentry_key_map::iterator iter;
+ size_t dirfrag_size = frag_size;
+ size_t subdirs_size = 0;
+ bool complete = true;
+ vector<CDir*> siblings;
+ vector<pair<CDir*, size_t> > subdirs;
+ LevelData(const LevelData&) = default;
+ LevelData(CDir *d) :
+ dir(d), iter(d->begin()) {}
+ };
+
+ vector<LevelData> stack;
+ stack.emplace_back(dir);
+
+ uint64_t max_size = g_conf->get_val<uint64_t>("mds_max_export_size");
+ size_t found_size = 0;
+ size_t skipped_size = 0;
+
+ for (;;) {
+ auto& data = stack.back();
+ CDir *cur = data.dir;
+ auto& it = data.iter;
+ auto& dirfrag_size = data.dirfrag_size;
+
+ while(it != cur->end()) {
+ CDentry *dn = it->second;
+ ++it;
+
+ dirfrag_size += dn->name.size();
+ if (dn->get_linkage()->is_null()) {
+ dirfrag_size += null_size;
+ continue;
+ }
+ if (dn->get_linkage()->is_remote()) {
+ dirfrag_size += remote_size;
+ continue;
+ }
+
+ CInode *in = dn->get_linkage()->get_inode();
+ dirfrag_size += inode_size;
+ dirfrag_size += in->get_client_caps().size() * cap_size;
+
+ if (in->is_dir()) {
+ vector<CDir*> ls;
+ in->get_nested_dirfrags(ls);
+ std::reverse(ls.begin(), ls.end());
+
+ bool complete = true;
+ for (auto p = ls.begin(); p != ls.end(); ) {
+ if ((*p)->state_test(CDir::STATE_EXPORTING) ||
+ (*p)->is_freezing_dir() || (*p)->is_frozen_dir()) {
+ complete = false;
+ p = ls.erase(p);
+ } else {
+ ++p;
+ }
+ }
+ if (!complete) {
+ // skip exporting dir's ancestors. because they can't get
+ // frozen (exporting dir's parent inode is auth pinned).
+ for (auto p = stack.rbegin(); p < stack.rend(); ++p) {
+ if (!p->complete)
+ break;
+ p->complete = false;
+ }
+ }
+ if (!ls.empty()) {
+ stack.emplace_back(ls.back());
+ ls.pop_back();
+ stack.back().siblings.swap(ls);
+ break;
+ }
+ }
+ }
+ // did above loop push new dirfrag into the stack?
+ if (stack.back().dir != cur)
+ continue;
+
+ if (data.complete) {
+ auto cur_size = data.subdirs_size + dirfrag_size;
+ // we can do nothing with large dirfrag
+ if (cur_size >= max_size && found_size * 2 > max_size)
+ break;
+
+ found_size += dirfrag_size;
+
+ if (stack.size() > 1) {
+ auto& parent = stack[stack.size() - 2];
+ parent.subdirs.emplace_back(cur, cur_size);
+ parent.subdirs_size += cur_size;
+ }
+ } else {
+ // can't merge current dirfrag to its parent if there is skipped subdir
+ results.insert(results.end(), data.subdirs.begin(), data.subdirs.end());
+ skipped_size += dirfrag_size;
+ }
+
+ vector<CDir*> ls;
+ ls.swap(data.siblings);
+
+ stack.pop_back();
+ if (stack.empty())
+ break;
+
+ if (found_size >= max_size)
+ break;
+
+ // next dirfrag
+ if (!ls.empty()) {
+ stack.emplace_back(ls.back());
+ ls.pop_back();
+ stack.back().siblings.swap(ls);
+ }
+ }
+
+ for (auto& p : stack)
+ results.insert(results.end(), p.subdirs.begin(), p.subdirs.end());
+
+ if (results.empty())
+ results.emplace_back(dir, found_size + skipped_size);
+}
+class C_M_ExportDirWait : public MigratorContext {
+ MDRequestRef mdr;
+ int count;
+public:
+ C_M_ExportDirWait(Migrator *m, MDRequestRef mdr, int count)
+ : MigratorContext(m), mdr(mdr), count(count) {}
+ void finish(int r) override {
+ mig->dispatch_export_dir(mdr, count);
+ }
+};
+
+void Migrator::dispatch_export_dir(MDRequestRef& mdr, int count)
+{
CDir *dir = mdr->more()->export_dir;
+ dout(7) << "dispatch_export_dir " << *mdr << " " << *dir << dendl;
+
map<CDir*,export_state_t>::iterator it = export_state.find(dir);
if (it == export_state.end() || it->second.tid != mdr->reqid.tid) {
// export must have aborted.
dout(7) << "export must have aborted " << *mdr << dendl;
- mds->mdcache->request_finish(mdr);
+ assert(mdr->killed || mdr->aborted);
+ if (mdr->aborted) {
+ mdr->aborted = false;
+ mds->mdcache->request_kill(mdr);
+ }
return;
}
assert(it->second.state == EXPORT_LOCKING);
}
assert(g_conf->mds_kill_export_at != 1);
- it->second.state = EXPORT_DISCOVERING;
-
- // send ExportDirDiscover (ask target)
- filepath path;
- dir->inode->make_path(path);
- MExportDirDiscover *discover = new MExportDirDiscover(dir->dirfrag(), path,
- mds->get_nodeid(),
- it->second.tid);
- mds->send_message_mds(discover, dest);
- assert(g_conf->mds_kill_export_at != 2);
-
- it->second.last_cum_auth_pins_change = ceph_clock_now();
-
- // start the freeze, but hold it up with an auth_pin.
- dir->freeze_tree();
- assert(dir->is_freezing_tree());
- dir->add_waiter(CDir::WAIT_FROZEN, new C_MDC_ExportFreeze(this, dir, it->second.tid));
+
+ vector<pair<CDir*, size_t> > results;
+ maybe_split_export(dir, results);
+
+ if (results.size() == 1 && results.front().first == dir) {
+ it->second.state = EXPORT_DISCOVERING;
+ // send ExportDirDiscover (ask target)
+ filepath path;
+ dir->inode->make_path(path);
+ MExportDirDiscover *discover = new MExportDirDiscover(dir->dirfrag(), path,
+ mds->get_nodeid(),
+ it->second.tid);
+ mds->send_message_mds(discover, dest);
+ assert(g_conf->mds_kill_export_at != 2);
+
+ it->second.last_cum_auth_pins_change = ceph_clock_now();
+
+ // start the freeze, but hold it up with an auth_pin.
+ dir->freeze_tree();
+ assert(dir->is_freezing_tree());
+ dir->add_waiter(CDir::WAIT_FROZEN, new C_MDC_ExportFreeze(this, dir, it->second.tid));
+ return;
+ }
+
+ dout(7) << "subtree is too large, splitting it into: " << dendl;
+ for (auto& p : results) {
+ CDir *sub = p.first;
+ assert(sub != dir);
+ dout(7) << " sub " << *sub << dendl;
+
+ sub->auth_pin(this);
+ sub->mark_exporting();
+
+ MDRequestRef _mdr = mds->mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
+ _mdr->more()->export_dir = sub;
+
+ assert(export_state.count(sub) == 0);
+ export_state_t& stat = export_state[sub];
+
+ stat.state = EXPORT_LOCKING;
+ stat.peer = dest;
+ stat.tid = _mdr->reqid.tid;
+ stat.mut = _mdr;
+ mds->mdcache->dispatch_request(_mdr);
+ }
+
+ // cancel the original one
+ export_try_cancel(dir);
}
/*
// CDir::_freeze_tree() should have forced it into subtree.
assert(dir->get_dir_auth() == mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
-
- set<client_t> export_client_set;
- check_export_size(dir, it->second, export_client_set);
-
// note the bounds.
set<CDir*> bounds;
cache->get_subtree_bounds(dir, bounds);
CDir *bound = *p;
// pin it.
- assert(bound->state_test(CDir::STATE_EXPORTBOUND));
+ bound->get(CDir::PIN_EXPORTBOUND);
+ bound->state_set(CDir::STATE_EXPORTBOUND);
dout(7) << " export bound " << *bound << dendl;
prep->add_bound( bound->dirfrag() );
CDir *cur = bound;
char start = '-';
- if (it->second.residual_dirs.count(bound)) {
- start = 'f';
- cache->replicate_dir(bound, it->second.peer, tracebl);
- dout(7) << " added " << *bound << dendl;
- }
-
while (1) {
// don't repeat inodes
if (inodes_added.count(cur->inode->ino()))
// make sure any new instantiations of caps are flushed out
assert(it->second.warning_ack_waiting.empty());
+ set<client_t> export_client_set;
+ get_export_client_set(dir, export_client_set);
+
MDSGatherBuilder gather(g_ceph_context);
mds->server->flush_client_sessions(export_client_set, gather);
if (gather.has_subs()) {
}
}
-void Migrator::check_export_size(CDir *dir, export_state_t& stat, set<client_t>& client_set)
+void Migrator::get_export_client_set(CDir *dir, set<client_t>& client_set)
{
- const unsigned frag_size = 800;
- const unsigned inode_size = 1000;
- const unsigned cap_size = 80;
- const unsigned link_size = 10;
- const unsigned null_size = 1;
-
- uint64_t max_size = g_conf->get_val<uint64_t>("mds_max_export_size");
- uint64_t approx_size = 0;
-
- list<CDir*> dfs;
+ deque<CDir*> dfs;
dfs.push_back(dir);
while (!dfs.empty()) {
CDir *dir = dfs.front();
dfs.pop_front();
-
- approx_size += frag_size;
- for (auto &p : *dir) {
+ for (auto& p : *dir) {
CDentry *dn = p.second;
- if (dn->get_linkage()->is_null()) {
- approx_size += null_size;
- continue;
- }
- if (dn->get_linkage()->is_remote()) {
- approx_size += link_size;
+ if (!dn->get_linkage()->is_primary())
continue;
- }
-
- approx_size += inode_size;
CInode *in = dn->get_linkage()->get_inode();
if (in->is_dir()) {
// directory?
- list<CDir*> ls;
+ vector<CDir*> ls;
in->get_dirfrags(ls);
- for (auto q : ls) {
- if (q->is_subtree_root()) {
- q->state_set(CDir::STATE_EXPORTBOUND);
- q->get(CDir::PIN_EXPORTBOUND);
- } else {
+ for (auto& q : ls) {
+ if (!q->state_test(CDir::STATE_EXPORTBOUND)) {
// include nested dirfrag
assert(q->get_dir_auth().first == CDIR_AUTH_PARENT);
- dfs.push_front(q);
+ dfs.push_back(q); // it's ours, recurse (later)
}
}
}
- for (map<client_t, Capability*>::iterator q = in->client_caps.begin();
- q != in->client_caps.end();
- ++q) {
- approx_size += cap_size;
- client_set.insert(q->first);
+ for (auto& q : in->get_client_caps()) {
+ client_set.insert(q.first);
}
}
-
- if (approx_size >= max_size)
- break;
- }
-
- while (!dfs.empty()) {
- CDir *dir = dfs.front();
- dfs.pop_front();
-
- dout(7) << "check_export_size: creating bound " << *dir << dendl;
- assert(dir->is_auth());
- dir->state_set(CDir::STATE_EXPORTBOUND);
- dir->get(CDir::PIN_EXPORTBOUND);
-
- mds->mdcache->adjust_subtree_auth(dir, mds->get_nodeid());
- // Another choice here is finishing all WAIT_UNFREEZE contexts and keeping
- // the newly created subtree unfreeze.
- dir->_freeze_tree();
-
- stat.residual_dirs.insert(dir);
}
}
dir->unfreeze_tree();
cache->try_subtree_merge(dir);
- for (auto bd : stat.residual_dirs) {
- bd->unfreeze_tree();
- cache->try_subtree_merge(bd);
- }
// revoke/resume stale caps
for (auto in : to_eval) {
// (we do this _after_ removing EXPORTBOUND pins, to allow merges)
dir->unfreeze_tree();
cache->try_subtree_merge(dir);
- for (auto bd : it->second.residual_dirs) {
- export_queue.push_front(pair<dirfrag_t,mds_rank_t>(bd->dirfrag(), it->second.peer));
- bd->take_waiting(CDir::WAIT_ANY_MASK, finished);
- bd->unfreeze_tree();
- cache->try_subtree_merge(bd);
- }
// no more auth subtree? clear scatter dirty
if (!dir->get_inode()->is_auth() &&
int r = cache->path_traverse(null_ref, m, NULL, fpath, &trace, NULL, MDS_TRAVERSE_DISCOVER);
if (r > 0) return;
if (r < 0) {
- dout(7) << "handle_export_discover_2 failed to discover or not dir " << m->get_path() << ", NAK" << dendl;
+ dout(7) << "handle_export_discover failed to discover or not dir " << m->get_path() << ", NAK" << dendl;
ceph_abort(); // this shouldn't happen if the auth pins its path properly!!!!
}
dout(7) << " all ready, noting auth and freezing import region" << dendl;
if (!mds->mdcache->is_readonly() &&
- dir->get_inode()->filelock.can_wrlock(-1) &&
- dir->get_inode()->nestlock.can_wrlock(-1)) {
+ diri->filelock.can_wrlock(-1) &&
+ diri->nestlock.can_wrlock(-1)) {
it->second.mut = new MutationImpl();
// force some locks. hacky.
mds->locker->wrlock_force(&dir->inode->filelock, it->second.mut);