#include "messages/MMDSSlaveRequest.h"
#include "messages/MMDSFragmentNotify.h"
+#include "messages/MMDSSnapUpdate.h"
#include "messages/MGatherCaps.h"
// for sending cache expire message
set<CInode*> isolated_inodes;
set<CInode*> refragged_inodes;
+ list<pair<CInode*,int> > updated_realms;
// dirs
for (map<dirfrag_t, MMDSCacheRejoin::dirfrag_strong>::iterator p = ack->strong_dirfrags.begin();
CInode *in = get_inode(ino, last);
assert(in);
bufferlist::iterator q = basebl.begin();
+ snapid_t sseq = 0;
+ if (in->snaprealm)
+ sseq = in->snaprealm->srnode.seq;
in->_decode_base(q);
+ if (in->snaprealm && in->snaprealm->srnode.seq != sseq) {
+ int snap_op = sseq > 0 ? CEPH_SNAP_OP_UPDATE : CEPH_SNAP_OP_SPLIT;
+ updated_realms.push_back(pair<CInode*,int>(in, snap_op));
+ }
dout(10) << " got inode base " << *in << dendl;
}
assert(cap_exports[p->first].empty());
}
+ for (auto p : updated_realms) {
+ CInode *in = p.first;
+ bool notify_clients;
+ if (mds->is_rejoin()) {
+ if (!rejoin_pending_snaprealms.count(in)) {
+ in->get(CInode::PIN_OPENINGSNAPPARENTS);
+ rejoin_pending_snaprealms.insert(in);
+ }
+ notify_clients = false;
+ } else {
+ // notify clients if I'm survivor
+ notify_clients = true;
+ }
+ do_realm_invalidate_and_update_notify(in, p.second, notify_clients);
+ }
+
// done?
assert(rejoin_ack_gather.count(from));
rejoin_ack_gather.erase(from);
if (!survivor) {
-
if (rejoin_gather.empty()) {
// eval unstable scatter locks after all wrlocks are rejoined.
while (!rejoin_eval_locks.empty()) {
}
if (gather.has_subs()) {
+ // for multimds, must succeed the first time
+ assert(recovery_set.empty());
+
dout(10) << "open_snaprealms - waiting for "
<< gather.num_subs_remaining() << dendl;
gather.set_finisher(new C_MDC_OpenSnapRealms(this));
case MSG_MDS_OPENINOREPLY:
handle_open_ino_reply(static_cast<MMDSOpenInoReply *>(m));
break;
+
+ case MSG_MDS_SNAPUPDATE:
+ handle_snap_update(static_cast<MMDSSnapUpdate*>(m));
+ break;
default:
derr << "cache unknown message " << m->get_type() << dendl;
mds->mdlog->flush();
}
-
-void MDCache::do_realm_invalidate_and_update_notify(CInode *in, int snapop, bool nosend)
+void MDCache::do_realm_invalidate_and_update_notify(CInode *in, int snapop, bool notify_clients)
{
dout(10) << "do_realm_invalidate_and_update_notify " << *in->snaprealm << " " << *in << dendl;
vector<inodeno_t> split_inos;
vector<inodeno_t> split_realms;
+ bufferlist snapbl;
- if (snapop == CEPH_SNAP_OP_SPLIT) {
- // notify clients of update|split
- for (elist<CInode*>::iterator p = in->snaprealm->inodes_with_caps.begin(member_offset(CInode, item_caps));
- !p.end(); ++p)
- split_inos.push_back((*p)->ino());
-
- for (set<SnapRealm*>::iterator p = in->snaprealm->open_children.begin();
- p != in->snaprealm->open_children.end();
- ++p)
- split_realms.push_back((*p)->inode->ino());
- }
+ if (notify_clients) {
+ assert(in->snaprealm->have_past_parents_open());
+ if (snapop == CEPH_SNAP_OP_SPLIT) {
+ // notify clients of update|split
+ for (elist<CInode*>::iterator p = in->snaprealm->inodes_with_caps.begin(member_offset(CInode, item_caps));
+ !p.end(); ++p)
+ split_inos.push_back((*p)->ino());
- bufferlist snapbl;
- in->snaprealm->build_snap_trace(snapbl);
+ for (set<SnapRealm*>::iterator p = in->snaprealm->open_children.begin();
+ p != in->snaprealm->open_children.end();
+ ++p)
+ split_realms.push_back((*p)->inode->ino());
+ }
+ in->snaprealm->build_snap_trace(snapbl);
+ }
set<SnapRealm*> past_children;
map<client_t, MClientSnap*> updates;
dout(10) << " realm " << *realm << " on " << *realm->inode << dendl;
realm->invalidate_cached_snaps();
- for (map<client_t, xlist<Capability*>* >::iterator p = realm->client_caps.begin();
- p != realm->client_caps.end();
- ++p) {
- assert(!p->second->empty());
- if (!nosend && updates.count(p->first) == 0) {
- MClientSnap *update = new MClientSnap(snapop);
- update->head.split = in->ino();
- update->split_inos = split_inos;
- update->split_realms = split_realms;
- update->bl = snapbl;
- updates[p->first] = update;
+ if (notify_clients) {
+ for (map<client_t, xlist<Capability*>* >::iterator p = realm->client_caps.begin();
+ p != realm->client_caps.end();
+ ++p) {
+ assert(!p->second->empty());
+ if (updates.count(p->first) == 0) {
+ MClientSnap *update = new MClientSnap(snapop);
+ update->head.split = in->ino();
+ update->split_inos = split_inos;
+ update->split_realms = split_realms;
+ update->bl = snapbl;
+ updates[p->first] = update;
+ }
}
}
q.push_back(*p);
}
- if (!nosend)
+ if (notify_clients)
send_snaps(updates);
// notify past children and their descendants if we update/delete old snapshots
dispatch_request(mdr);
}
+/*
+ * Tell the other MDS ranks about a snapshot change on inode 'in'.
+ *
+ * The inode's snap state is encoded once with encode_snap() and a copy of
+ * that blob is attached to one MMDSSnapUpdate per recipient.  Recipients are
+ * the ranks returned by MDSMap::get_mds_set_lower_bound() for STATE_RESOLVE
+ * (presumably every rank at or beyond the resolve state), minus this rank.
+ *
+ * @param in       inode whose snap state is being published
+ * @param stid     tid placed on each outgoing message
+ * @param snap_op  snap operation code placed on each outgoing message
+ */
+void MDCache::send_snap_update(CInode *in, version_t stid, int snap_op)
+{
+  dout(10) << __func__ << " " << *in << " stid " << stid << dendl;
+
+  bufferlist encoded_snap;
+  in->encode_snap(encoded_snap);
+
+  set<mds_rank_t> recipients;
+  mds->mdsmap->get_mds_set_lower_bound(recipients, MDSMap::STATE_RESOLVE);
+  // never send the update to ourselves
+  recipients.erase(mds->get_nodeid());
+
+  for (set<mds_rank_t>::iterator it = recipients.begin();
+       it != recipients.end();
+       ++it) {
+    MMDSSnapUpdate *update = new MMDSSnapUpdate(in->ino(), stid, snap_op);
+    update->snap_blob = encoded_snap;
+    mds->send_message_mds(update, *it);
+  }
+}
+
+
+/*
+ * Process an MMDSSnapUpdate received from another MDS.
+ *
+ * The message is dropped untouched while this MDS is both below
+ * STATE_RESOLVE and not asking to enter resolve.  Otherwise the snap client
+ * is told about the message's tid, and, if the inode is in cache, the carried
+ * snap blob is decoded into it when either
+ *   - this MDS is past STATE_REJOIN, or
+ *   - this MDS is in rejoin and the inode is not itself rejoining.
+ * After decoding, the realm is invalidated via
+ * do_realm_invalidate_and_update_notify(); clients are notified unless we are
+ * in rejoin with rejoin_done still set, in which case the inode is pinned and
+ * queued in rejoin_pending_snaprealms instead.
+ *
+ * The message reference is released on every path.
+ */
+void MDCache::handle_snap_update(MMDSSnapUpdate *m)
+{
+  mds_rank_t from = mds_rank_t(m->get_source().num());
+  dout(10) << __func__ << " " << *m << " from mds." << from << dendl;
+
+  const bool before_resolve =
+    mds->get_state() < MDSMap::STATE_RESOLVE &&
+    mds->get_want_state() != CEPH_MDS_STATE_RESOLVE;
+
+  if (!before_resolve) {
+    mds->snapclient->notify_commit(m->get_tid());
+
+    if (CInode *in = get_inode(m->get_ino())) {
+      assert(!in->is_auth());
+
+      const bool can_apply =
+        mds->get_state() > MDSMap::STATE_REJOIN ||
+        (mds->is_rejoin() && !in->is_rejoining());
+
+      if (can_apply) {
+        bufferlist::iterator blob_it = m->snap_blob.begin();
+        in->decode_snap(blob_it);
+
+        bool notify_clients = true;
+        // rejoin_done is non-null, it means open_snaprealms() hasn't been called
+        if (mds->is_rejoin() && rejoin_done) {
+          if (rejoin_pending_snaprealms.count(in) == 0) {
+            in->get(CInode::PIN_OPENINGSNAPPARENTS);
+            rejoin_pending_snaprealms.insert(in);
+          }
+          notify_clients = false;
+        }
+        do_realm_invalidate_and_update_notify(in, m->get_snap_op(), notify_clients);
+      }
+    }
+  }
+
+  m->put();
+}
// -------------------------------------------------------------------------------
// STRAYS
mdr->apply();
if (snap_is_new) //only new if strayin exists
- mdcache->do_realm_invalidate_and_update_notify(strayin, CEPH_SNAP_OP_SPLIT, true);
+ mdcache->do_realm_invalidate_and_update_notify(strayin, CEPH_SNAP_OP_SPLIT, false);
mdcache->send_dentry_unlink(dn, straydn, mdr);
bool hadrealm = (oldin->snaprealm ? true : false);
oldin->pop_and_dirty_projected_inode(mdr->ls);
if (oldin->snaprealm && !hadrealm)
- mdcache->do_realm_invalidate_and_update_notify(oldin, CEPH_SNAP_OP_SPLIT);
+ mdcache->do_realm_invalidate_and_update_notify(oldin, CEPH_SNAP_OP_SPLIT, false);
} else {
// FIXME this snaprealm is not filled out correctly
//oldin->open_snaprealm(); might be sufficient..
// create snap
dout(10) << "snaprealm now " << *diri->snaprealm << dendl;
+ // notify other mds
+ mdcache->send_snap_update(diri, mdr->more()->stid, op);
+
mdcache->do_realm_invalidate_and_update_notify(diri, op);
// yay
dout(10) << "snaprealm now " << *diri->snaprealm << dendl;
+ // notify other mds
+ mdcache->send_snap_update(diri, mdr->more()->stid, CEPH_SNAP_OP_DESTROY);
+
mdcache->do_realm_invalidate_and_update_notify(diri, CEPH_SNAP_OP_DESTROY);
// yay
dout(10) << "snaprealm now " << *diri->snaprealm << dendl;
- mdcache->do_realm_invalidate_and_update_notify(diri, CEPH_SNAP_OP_UPDATE, true);
+ // notify other mds
+ mdcache->send_snap_update(diri, mdr->more()->stid, CEPH_SNAP_OP_UPDATE);
+
+ mdcache->do_realm_invalidate_and_update_notify(diri, CEPH_SNAP_OP_UPDATE);
// yay
mdr->in[0] = diri;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MMDSSNAPUPDATE_H
+#define CEPH_MMDSSNAPUPDATE_H
+
+#include "msg/Message.h"
+
+/**
+ * MDS-to-MDS message (type MSG_MDS_SNAPUPDATE) describing a snapshot
+ * operation on one inode.
+ *
+ * The payload carries, in order: the inode number, the snap operation code
+ * and an opaque snap blob.  The tid passed to the constructor is stored as
+ * the message tid and is printed as "table_tid".
+ */
+class MMDSSnapUpdate : public Message {
+  inodeno_t ino;
+  // Default-initialised so that a message built with the default constructor
+  // never exposes an indeterminate value before decode_payload() runs.
+  __s16 snap_op = 0;
+
+public:
+  /// Inode the update refers to.
+  inodeno_t get_ino() const { return ino; }
+  /// Snap operation code, widened to int.
+  int get_snap_op() const { return snap_op; }
+
+  /// Encoded snap state; filled in by the sender, consumed by the receiver.
+  bufferlist snap_blob;
+
+  /// Empty message, intended to be filled in by decode_payload().
+  MMDSSnapUpdate() : Message(MSG_MDS_SNAPUPDATE) {}
+  /// Build an update for inode @p i with message tid @p tid and operation @p op.
+  MMDSSnapUpdate(inodeno_t i, version_t tid, int op) :
+    Message(MSG_MDS_SNAPUPDATE), ino(i), snap_op(op) {
+    set_tid(tid);
+  }
+private:
+  // Private destructor: instances are not destroyed directly by users.
+  ~MMDSSnapUpdate() override {}
+
+public:
+  const char *get_type_name() const override { return "snap_update"; }
+  void print(ostream& o) const override {
+    o << "snap_update(" << ino << " table_tid " << get_tid() << ")";
+  }
+
+  /// Serialise ino, snap_op and snap_blob (in that order) into the payload.
+  void encode_payload(uint64_t features) override {
+    using ceph::encode;
+    encode(ino, payload);
+    encode(snap_op, payload);
+    encode(snap_blob, payload);
+  }
+  /// Inverse of encode_payload(): read ino, snap_op and snap_blob back.
+  void decode_payload() override {
+    using ceph::decode;
+    bufferlist::iterator p = payload.begin();
+    decode(ino, p);
+    decode(snap_op, p);
+    decode(snap_blob, p);
+  }
+};
+
+#endif