#include "MDBalancer.h"
#include "MDLog.h"
#include "MDSMap.h"
+#include "Mutation.h"
#include "include/filepath.h"
int state = it->second.state;
switch (state) {
+ case EXPORT_LOCKING:
+ dout(10) << "export state=locking : dropping locks and removing auth_pin" << dendl;
+ it->second.state = EXPORT_CANCELLED;
+ dir->auth_unpin(this);
+ dir->state_clear(CDir::STATE_EXPORTING);
+ break;
+
case EXPORT_DISCOVERING:
dout(10) << "export state=discovering : canceling freeze and removing auth_pin" << dendl;
it->second.state = EXPORT_CANCELLED;
dir->unfreeze_tree(); // cancel the freeze
dir->auth_unpin(this);
- export_unlock(dir);
export_freeze_finish(dir);
dir->state_clear(CDir::STATE_EXPORTING);
if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) // tell them.
dir->unfreeze_tree();
cache->adjust_subtree_auth(dir, mds->get_nodeid());
cache->try_subtree_merge(dir); // NOTE: this may journal subtree_map as side effect
- export_unlock(dir);
dir->state_clear(CDir::STATE_EXPORTING);
if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) // tell them.
mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
if (it->second.state == EXPORT_CANCELLED) {
// wake up any waiters
mds->queue_waiters(it->second.waiting_for_finish);
+ // drop locks
+ if (state == EXPORT_LOCKING || state == EXPORT_DISCOVERING) {
+ MDRequest *mdr = dynamic_cast<MDRequest*>(it->second.mut);
+ assert(mdr);
+ if (mdr->more()->waiting_on_slave.empty())
+ mds->mdcache->request_finish(mdr);
+ } else if (it->second.mut) {
+ Mutation *mut = it->second.mut;
+ mds->locker->drop_locks(mut);
+ mut->cleanup();
+ delete mut;
+ }
export_state.erase(it);
-
// send pending import_maps? (these need to go out when all exports have finished.)
cache->maybe_send_pending_resolves();
// - that aren't frozen yet (to avoid auth_pin deadlock)
// - they havne't prepped yet (they may need to discover bounds to do that)
if (p->second.peer == who ||
+ p->second.state == EXPORT_LOCKING ||
p->second.state == EXPORT_DISCOVERING ||
p->second.state == EXPORT_FREEZING ||
p->second.state == EXPORT_PREPPING) {
p != export_state.end();
++p) {
CDir *dir = p->first;
- if (p->second.state == EXPORT_DISCOVERING ||
+ if (p->second.state == EXPORT_LOCKING ||
+ p->second.state == EXPORT_DISCOVERING ||
p->second.state == EXPORT_FREEZING) continue;
assert(dir->is_ambiguous_dir_auth());
assert(dir->authority().first == mds->get_nodeid() ||
void Migrator::maybe_do_queued_export()
{
+ static bool running;
+ if (running)
+ return;
+ running = true;
while (!export_queue.empty() &&
export_state.size() <= 4) {
dirfrag_t df = export_queue.front().first;
export_dir(dir, dest);
}
+ running = false;
}
dout(7) << "already exporting" << dendl;
return;
}
-
- // locks?
- set<SimpleLock*> locks;
- get_export_lock_set(dir, locks);
- if (!mds->locker->rdlock_try_set(locks)) {
- dout(7) << "export_dir can't rdlock needed locks, failing." << dendl;
- return;
- }
- // ok.
- mds->locker->rdlock_take_set(locks);
+ dir->auth_pin(this);
+ dir->state_set(CDir::STATE_EXPORTING);
+
+ MDRequest *mdr = mds->mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
+ mdr->more()->export_dir = dir;
assert(export_state.count(dir) == 0);
export_state_t& stat = export_state[dir];
- stat.state = EXPORT_DISCOVERING;
+ stat.state = EXPORT_LOCKING;
stat.peer = dest;
- stat.tid = ++last_export_tid;
- stat.locks.swap(locks);
+ stat.tid = mdr->reqid.tid;
+ stat.mut = mdr;
+
+ dispatch_export_dir(mdr);
+}
+
+void Migrator::dispatch_export_dir(MDRequest *mdr)
+{
+ dout(7) << "dispatch_export_dir " << *mdr << dendl;
+ CDir *dir = mdr->more()->export_dir;
+
+ map<CDir*,export_state_t>::iterator it = export_state.find(dir);
+ if (it == export_state.end() || it->second.tid != mdr->reqid.tid) {
+ // export must have aborted.
+ dout(7) << "export must have aborted " << *mdr << dendl;
+ mds->mdcache->request_finish(mdr);
+ return;
+ }
+ assert(it->second.state == EXPORT_LOCKING);
+
+ if (mdr->aborted || dir->is_frozen() || dir->is_freezing()) {
+ dout(7) << "wouldblock|freezing|frozen, canceling export" << dendl;
+ export_try_cancel(dir);
+ return;
+ }
+
+ // locks?
+ set<SimpleLock*> rdlocks;
+ set<SimpleLock*> xlocks;
+ set<SimpleLock*> wrlocks;
+ get_export_lock_set(dir, rdlocks);
+
+ if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks, NULL, NULL, true)) {
+ if (mdr->aborted)
+ export_try_cancel(dir);
+ return;
+ }
- dir->state_set(CDir::STATE_EXPORTING);
assert(g_conf->mds_kill_export_at != 1);
+ it->second.state = EXPORT_DISCOVERING;
// send ExportDirDiscover (ask target)
filepath path;
dir->inode->make_path(path);
MExportDirDiscover *discover = new MExportDirDiscover(dir->dirfrag(), path,
- mds->get_nodeid(), stat.tid);
- mds->send_message_mds(discover, dest);
+ mds->get_nodeid(),
+ it->second.tid);
+ mds->send_message_mds(discover, it->second.peer);
assert(g_conf->mds_kill_export_at != 2);
- // start the freeze, but hold it up with an auth_pin.
utime_t now = ceph_clock_now(g_ceph_context);
export_freezing_dirs.insert(make_pair(now, dir));
export_freezing_state[dir].start_time = now;
- dir->auth_pin(this);
dir->freeze_tree();
assert(dir->is_freezing_tree());
dir->add_waiter(CDir::WAIT_FROZEN, new C_MDC_ExportFreeze(this, dir));
}
-
/*
* called on receipt of MExportDirDiscoverAck
* the importer now has the directory's _inode_ in memory, and pinned.
} else {
assert(it->second.state == EXPORT_DISCOVERING);
// release locks to avoid deadlock
- export_unlock(dir);
+ MDRequest *mdr = dynamic_cast<MDRequest*>(it->second.mut);
+ assert(mdr);
+ mds->mdcache->request_finish(mdr);
+ it->second.mut = NULL;
// freeze the subtree
it->second.state = EXPORT_FREEZING;
dir->auth_unpin(this);
CInode *diri = dir->inode;
// ok, try to grab all my locks.
- set<SimpleLock*> locks;
- get_export_lock_set(dir, locks);
- if (!mds->locker->can_rdlock_set(locks)) {
+ set<SimpleLock*> rdlocks;
+ get_export_lock_set(dir, rdlocks);
+ if (!mds->locker->can_rdlock_set(rdlocks)) {
dout(7) << "export_dir couldn't rdlock all needed locks, failing. "
<< *diri << dendl;
return;
}
- mds->locker->rdlock_take_set(locks);
- it->second.locks.swap(locks);
+ it->second.mut = new Mutation;
+ mds->locker->rdlock_take_set(rdlocks, it->second.mut);
cache->show_subtrees();
// unfreeze
dir->unfreeze_tree();
- export_unlock(dir);
-
cache->show_cache();
}
m->put();
}
-void Migrator::export_unlock(CDir *dir)
-{
- dout(10) << "export_unlock " << *dir << dendl;
-
- mds->locker->rdlock_finish_set(export_state[dir].locks);
- export_state[dir].locks.clear();
-
- list<Context*> ls;
- mds->queue_waiters(ls);
-}
-
void Migrator::export_finish(CDir *dir)
{
dout(5) << "export_finish " << *dir << dendl;
!dir->get_inode()->has_subtree_root_dirfrag(mds->get_nodeid()))
dir->get_inode()->clear_scatter_dirty();
- // unpin path
- export_unlock(dir);
-
// discard delayed expires
cache->discard_delayed_expire(dir);
// queue finishers
mds->queue_waiters(it->second.waiting_for_finish);
+ // unpin path
+ Mutation *mut = it->second.mut;
+ if (mut) {
+ mds->locker->drop_locks(mut);
+ mut->cleanup();
+ delete mut;
+ }
+
export_state.erase(it);
cache->show_subtrees();
class EImportStart;
+struct Mutation;
class Migrator {
private:
MDS *mds;
MDCache *cache;
- uint64_t last_export_tid;
// -- exports --
public:
// export stages. used to clean up intelligently if there's a failure.
- const static int EXPORT_CANCELLED = 0; // cancelled
- const static int EXPORT_DISCOVERING = 1; // dest is disovering export dir
- const static int EXPORT_FREEZING = 2; // we're freezing the dir tree
- const static int EXPORT_PREPPING = 3; // sending dest spanning tree to export bounds
- const static int EXPORT_WARNING = 4; // warning bystanders of dir_auth_pending
- const static int EXPORT_EXPORTING = 5; // sent actual export, waiting for ack
- const static int EXPORT_LOGGINGFINISH = 6; // logging EExportFinish
- const static int EXPORT_NOTIFYING = 7; // waiting for notifyacks
+ const static int EXPORT_CANCELLED = 0; // cancelled
+ const static int EXPORT_LOCKING = 1; // acquiring locks
+ const static int EXPORT_DISCOVERING = 2; // dest is disovering export dir
+ const static int EXPORT_FREEZING = 3; // we're freezing the dir tree
+ const static int EXPORT_PREPPING = 4; // sending dest spanning tree to export bounds
+ const static int EXPORT_WARNING = 5; // warning bystanders of dir_auth_pending
+ const static int EXPORT_EXPORTING = 6; // sent actual export, waiting for ack
+ const static int EXPORT_LOGGINGFINISH = 7; // logging EExportFinish
+ const static int EXPORT_NOTIFYING = 8; // waiting for notifyacks
static const char *get_export_statename(int s) {
switch (s) {
+ case EXPORT_LOCKING: return "locking";
case EXPORT_DISCOVERING: return "discovering";
case EXPORT_FREEZING: return "freezing";
case EXPORT_PREPPING: return "prepping";
int state;
int peer;
uint64_t tid;
- set<SimpleLock*> locks;
set<int> warning_ack_waiting;
set<int> notify_ack_waiting;
map<inodeno_t,map<client_t,Capability::Import> > peer_imported;
list<Context*> waiting_for_finish;
+ Mutation *mut;
+ export_state_t() : mut(NULL) {}
};
map<CDir*, export_state_t> export_state;
public:
// -- cons --
- Migrator(MDS *m, MDCache *c) : mds(m), cache(c), last_export_tid(0) {}
+ Migrator(MDS *m, MDCache *c) : mds(m), cache(c) {}
void dispatch(Message*);
// -- import/export --
// exporter
public:
+ void dispatch_export_dir(MDRequest *mdr);
void export_dir(CDir *dir, int dest);
void export_empty_import(CDir *dir);
void handle_export_ack(MExportDirAck *m);
void export_logged_finish(CDir *dir);
void handle_export_notify_ack(MExportDirNotifyAck *m);
- void export_unlock(CDir *dir);
void export_finish(CDir *dir);
void export_freeze_finish(CDir *dir) {