return out << g_clock.now() << " mds" << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") ";
}
+/*
+ * map each inode lock to the cap bits that allow a client to write
+ * the state protected by that lock
+ */
+struct cinode_lock_info_t cinode_lock_info[] = {
+ { CEPH_LOCK_IFILE, CEPH_CAP_ANY_FILE_WR },
+ { CEPH_LOCK_IAUTH, CEPH_CAP_AUTH_EXCL },
+ { CEPH_LOCK_ILINK, CEPH_CAP_LINK_EXCL },
+ { CEPH_LOCK_IXATTR, CEPH_CAP_XATTR_EXCL },
+ { CEPH_LOCK_IFLOCK, CEPH_CAP_FLOCK_EXCL }
+};
+int num_cinode_locks = sizeof(cinode_lock_info) / sizeof(cinode_lock_info[0]);
+
ostream& operator<<(ostream& out, CInode& in)
old.inode = *pi;
old.xattrs = xattrs;
+ old.inode.trim_client_ranges(follows);
+
if (!(old.inode.rstat == old.inode.accounted_rstat))
dirty_old_rstats.insert(follows);
e.time_warp_seq = i->time_warp_seq;
// max_size is min of projected, actual
- e.max_size = MIN(oi->client_ranges.count(client) ? oi->client_ranges[client].last : 0,
- pi->client_ranges.count(client) ? pi->client_ranges[client].last : 0);
+ e.max_size = MIN(oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0,
+ pi->client_ranges.count(client) ? pi->client_ranges[client].range.last : 0);
e.files = i->dirstat.nfiles;
e.subdirs = i->dirstat.nsubdirs;
m->head.time_warp_seq = i->time_warp_seq;
// max_size is min of projected, actual.
- uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].last : 0;
- uint64_t newms = pi->client_ranges.count(client) ? pi->client_ranges[client].last : 0;
+ uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0;
+ uint64_t newms = pi->client_ranges.count(client) ? pi->client_ranges[client].range.last : 0;
m->head.max_size = MIN(oldms, newms);
i = pauth ? pi:oi;
ostream& operator<<(ostream& out, CInode& in);
+struct cinode_lock_info_t {
+ int lock;
+ int wr_caps;
+};
+
+extern cinode_lock_info_t cinode_lock_info[];
+extern int num_cinode_locks;
// cached inode wrapper
class CInode : public MDSCacheObject {
int replica_caps_wanted; // [replica] what i've requested from auth
utime_t replica_caps_wanted_keep_until;
+ map<int, set<client_t> > client_snap_caps; // [auth] [snap] dirty metadata we still need from the head
+
ceph_lock_state_t fcntl_locks;
ceph_lock_state_t flock_locks;
bool is_base() { return is_root() || is_mdsdir(); }
bool is_system() { return inode.ino < MDS_INO_SYSTEM_BASE; }
+ bool is_head() { return last == CEPH_NOSNAP; }
+
// note: this overloads MDSCacheObject
bool is_ambiguous_auth() {
return state_test(STATE_AMBIGUOUSAUTH) ||
if (need_issue) {
if (pneed_issue)
*pneed_issue = true;
- else
+ else if (in->is_head())
issue_caps(in);
}
}
}
- if (need_issue)
+ if (need_issue && in->is_head())
issue_caps(in);
dout(10) << "eval done" << dendl;
if (!in->xattrlock.is_stable())
eval_gather(&in->xattrlock, false, &need_issue, &finishers);
- if (need_issue)
+ if (need_issue && in->is_head())
issue_caps(in);
finish_contexts(finishers);
mut->cleanup();
delete mut;
- bool sup = false; // avoid sending two caps msgs, one for cap expansion, one for file_max change.
- if (cap && (cap->wanted() & ~cap->pending())) {
- issue_caps(in, cap);
- cap->inc_suppress();
- sup = true;
- }
- if (sup)
- cap->dec_suppress();
+ if (!in->is_head()) {
+ dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
+ // check for snap writeback completion
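+    // each (lock, client) entry in client_snap_caps holds a wrlock taken in
+    // MDCache::cow_inode; release it now that this client's dirty snap
+    // metadata has been journaled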
+ bool gather = false;
+ map<int,set<client_t> >::iterator p = in->client_snap_caps.begin();
+ while (p != in->client_snap_caps.end()) {
+ SimpleLock *lock = in->get_lock(p->first);
+ assert(lock);
+      dout(10) << " completing client_snap_caps for client" << client
+	       << " lock " << *lock << " on " << *in << dendl;
+ lock->put_wrlock();
+ p->second.erase(client);
+ if (p->second.empty()) {
+ gather = true;
+ in->client_snap_caps.erase(p++);
+ } else
+ p++;
+ }
+ if (gather)
+ eval_cap_gather(in);
+ } else {
+ bool sup = false; // avoid sending two caps msgs, one for cap expansion, one for file_max change.
+ if (cap && (cap->wanted() & ~cap->pending())) {
+ issue_caps(in, cap);
+ cap->inc_suppress();
+ sup = true;
+ }
+ if (sup)
+ cap->dec_suppress();
- if (share && in->is_auth() && in->filelock.is_stable())
- share_inode_max_size(in);
+ if (share && in->is_auth() && in->filelock.is_stable())
+ share_inode_max_size(in);
+ }
// unlinked stray? may need to purge (e.g., after all caps are released)
mdcache->maybe_eval_stray(in);
<< " on " << *in << dendl;
}
+ assert(in->is_head());
+
// count conflicts with
int nissued = 0;
for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
Capability *cap = *p;
CInode *in = cap->get_inode();
+ assert(in->is_head());
if (cap->is_stale()) {
dout(10) << " clearing stale flag on " << *in << dendl;
cap->set_stale(false);
};
-void Locker::calc_new_client_ranges(CInode *in, uint64_t size, map<client_t,byte_range_t>& new_ranges)
+void Locker::calc_new_client_ranges(CInode *in, uint64_t size, map<client_t,client_writeable_range_t>& new_ranges)
{
inode_t *latest = in->get_projected_inode();
uint64_t ms = ROUND_UP_TO((size+1)<<1, latest->get_layout_size_increment());
p != in->client_caps.end();
p++) {
if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
- new_ranges[p->first].first = 0;
+ client_writeable_range_t& nr = new_ranges[p->first];
+ nr.range.first = 0;
if (latest->client_ranges.count(p->first)) {
- uint64_t last = latest->client_ranges[p->first].last;
- new_ranges[p->first].last = MAX(ms, last);
- } else
- new_ranges[p->first].last = ms;
+ client_writeable_range_t& oldr = latest->client_ranges[p->first];
+ nr.range.last = MAX(ms, oldr.range.last);
+ nr.follows = oldr.follows;
+ } else {
+ nr.range.last = ms;
+ nr.follows = in->first - 1;
+ }
}
}
-
}
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
assert(in->is_auth());
inode_t *latest = in->get_projected_inode();
- map<client_t,byte_range_t> new_ranges;
+ map<client_t, client_writeable_range_t> new_ranges;
uint64_t size = latest->size;
if (update_size)
size = new_size;
Capability *cap = 0;
if (in)
cap = in->get_client_cap(client);
- if (!cap)
+ if (!cap && in != head_in)
cap = head_in->get_client_cap(client);
if (!cap) {
dout(7) << "handle_client_caps no cap for client" << client << " on " << *in << dendl;
ack->set_snap_follows(follows);
ack->set_client_tid(m->get_client_tid());
}
- if (!_do_cap_update(in, cap, m->get_dirty(), follows, m, ack)) {
- if (ack)
- mds->send_message_client_counted(ack, m->get_connection());
- eval_cap_gather(in);
- }
- // remove cap _after_ do_cap_update() (which takes the Capability*)
- if (cap->get_inode()->last < CEPH_NOSNAP) {
- dout(10) << " flushsnap releasing cloned cap" << dendl;
- cap->get_inode()->remove_client_cap(client);
- } else {
- dout(10) << " flushsnap NOT releasing live cap" << dendl;
- }
+ _do_snap_update(in, m->get_dirty(), follows, m, ack);
} else
dout(7) << " not auth, ignoring flushsnap on " << *in << dendl;
goto out;
}
-
- // for this and all subsequent versions of this inode,
- while (1) {
- if (cap->get_cap_id() != m->get_cap_id()) {
- dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
- } else {
- // filter wanted based on what we could ever give out (given auth/replica status)
- cap->confirm_receipt(m->get_seq(), m->get_caps());
- dout(10) << " follows " << follows
- << " retains " << ccap_string(m->get_caps())
- << " dirty " << ccap_string(m->get_caps())
- << " on " << *in << dendl;
-
- MClientCaps *ack = 0;
-
- if (m->get_dirty() && in->is_auth()) {
- dout(7) << " flush client" << client << " dirty " << ccap_string(m->get_dirty())
- << " seq " << m->get_seq() << " on " << *in << dendl;
- ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
- m->get_caps(), 0, m->get_dirty(), 0);
- ack->set_client_tid(m->get_client_tid());
+ if (cap->get_cap_id() != m->get_cap_id()) {
+ dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
+ } else {
+ // intermediate snap inodes
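+      // the flushed dirty metadata may also apply to older snapped versions
+      // of this inode; walk them toward the head and update each one we are
+      // auth for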
+ while (in != head_in) {
+ assert(in->last != CEPH_NOSNAP);
+ if (in->is_auth() && m->get_dirty()) {
+ dout(10) << " updating intermediate snapped inode " << *in << dendl;
+ _do_cap_update(in, NULL, m->get_dirty(), follows, m, NULL);
}
- int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
- if (new_wanted != cap->wanted()) {
- if (new_wanted & ~cap->wanted()) {
- // exapnding caps. make sure we aren't waiting for a log flush
- if (!in->filelock.is_stable() ||
- !in->authlock.is_stable() ||
- !in->xattrlock.is_stable())
- mds->mdlog->flush();
- }
-
- adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
+ in = mdcache->pick_inode_snap(in, in->last);
+ }
+
+ // head inode, and cap
+ MClientCaps *ack = 0;
+
+ cap->confirm_receipt(m->get_seq(), m->get_caps());
+ dout(10) << " follows " << follows
+ << " retains " << ccap_string(m->get_caps())
+	     << " dirty " << ccap_string(m->get_dirty())
+ << " on " << *in << dendl;
+
+ if (m->get_dirty() && in->is_auth()) {
+ dout(7) << " flush client" << client << " dirty " << ccap_string(m->get_dirty())
+ << " seq " << m->get_seq() << " on " << *in << dendl;
+ ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
+ m->get_caps(), 0, m->get_dirty(), 0);
+ ack->set_client_tid(m->get_client_tid());
+ }
+
+ // filter wanted based on what we could ever give out (given auth/replica status)
+ int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
+ if (new_wanted != cap->wanted()) {
+ if (new_wanted & ~cap->wanted()) {
+	// expanding caps. make sure we aren't waiting for a log flush
+ if (!in->filelock.is_stable() ||
+ !in->authlock.is_stable() ||
+ !in->xattrlock.is_stable())
+ mds->mdlog->flush();
}
- if (m->get_op() == CEPH_CAP_OP_DROP)
- can_issue = false;
-
- if (in->is_auth() &&
- _do_cap_update(in, cap, m->get_dirty(), follows, m, ack)) {
- // updated, cap msg is delayed
- cap->inc_suppress();
- eval(in, CEPH_CAP_LOCKS);
- cap->dec_suppress();
- if (cap->wanted() & ~cap->pending())
- mds->mdlog->flush();
- } else {
- // no update, ack now.
- if (ack)
- mds->send_message_client_counted(ack, m->get_connection());
+ adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
+ }
+ if (m->get_op() == CEPH_CAP_OP_DROP)
+ can_issue = false;
- bool did_issue = eval(in, CEPH_CAP_LOCKS);
- if (!did_issue && (cap->wanted() & ~cap->pending()))
- issue_caps(in, cap);
- }
+ if (in->is_auth() &&
+ _do_cap_update(in, cap, m->get_dirty(), follows, m, ack)) {
+ // updated, cap msg is delayed
+ cap->inc_suppress();
+ eval(in, CEPH_CAP_LOCKS);
+ cap->dec_suppress();
+
+ if (cap->wanted() & ~cap->pending())
+ mds->mdlog->flush();
+ } else {
+ // no update, ack now.
+ if (ack)
+ mds->send_message_client_counted(ack, m->get_connection());
+
+ bool did_issue = eval(in, CEPH_CAP_LOCKS);
+ if (!did_issue && (cap->wanted() & ~cap->pending()))
+ issue_caps(in, cap);
}
-
- // done?
- if (in->last == CEPH_NOSNAP)
- break;
-
- // next!
- in = mdcache->pick_inode_snap(in, in->last);
- cap = in->get_client_cap(client);
- assert(cap);
}
-
+
out:
m->put();
}
return t + 1;
}
+void Locker::_do_snap_update(CInode *in, int dirty, snapid_t follows, MClientCaps *m, MClientCaps *ack)
+{
+ dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
+ << " follows " << follows
+ << " on " << *in << dendl;
+
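+  // the flushed metadata belongs to the first snap following 'follows'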
+ SnapRealm *realm = in->find_snaprealm();
+ snapid_t snap = realm->get_snap_following(follows);
+
+ dout(10) << " snap is " << snap << dendl;
+ if (snap == CEPH_NOSNAP) {
+ // hmm, i guess snap was already deleted? just ack!
+ dout(10) << " wow, the snap following " << follows
+ << " was already deleted. nothing to record, just ack." << dendl;
+    if (ack)
+      mds->send_message_client_counted(ack, m->get_connection());
+ return;
+ }
+
+ client_t client = m->get_source().num();
+
+ EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
+ mds->mdlog->start_entry(le);
+ Mutation *mut = new Mutation;
+ mut->ls = mds->mdlog->get_current_segment();
+
+ // normal metadata updates that we can apply to the head as well.
+
+ // xattrs update?
+ map<string,bufferptr> *px = 0;
+ if ((dirty & CEPH_CAP_XATTR_EXCL) &&
+ m->xattrbl.length() &&
+ m->head.xattr_version > in->get_projected_inode()->xattr_version)
+ px = new map<string,bufferptr>;
+
+ inode_t *pi = in->project_inode(px);
+ pi->version = in->pre_dirty();
+
+ _update_cap_fields(in, dirty, m, pi);
+
+ // xattr
+ if (px) {
+ dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl;
+ pi->xattr_version = m->head.xattr_version;
+ bufferlist::iterator p = m->xattrbl.begin();
+ ::decode(*px, p);
+ }
+
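+  // the client has now flushed through this snap: either drop its writeable
+  // range entirely (if this snapped inode ends right at follows+1) or just
+  // advance the range's follows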
+ if (pi->client_ranges.count(client)) {
+ if (in->last == follows+1) {
+ dout(10) << " removing client_range entirely" << dendl;
+ pi->client_ranges.erase(client);
+ } else {
+ dout(10) << " client_range now follows " << snap << dendl;
+ pi->client_ranges[client].follows = snap;
+ }
+ }
+
+ mut->auth_pin(in);
+ mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
+ mdcache->journal_dirty_inode(mut, &le->metablob, in, follows);
+
+ mds->mdlog->submit_entry(le);
+ mds->mdlog->wait_for_sync(new C_Locker_FileUpdate_finish(this, in, mut, false,
+ client, NULL, ack));
+}
+
+
+void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi)
+{
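+  // apply the client-supplied mtime/ctime/atime/size (FILE caps) and
+  // uid/gid/mode (AUTH caps) from the flush message to the projected inode;
+  // shared by _do_cap_update and _do_snap_update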
+ // file
+ if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
+ utime_t atime = m->get_atime();
+ utime_t mtime = m->get_mtime();
+ utime_t ctime = m->get_ctime();
+ uint64_t size = m->get_size();
+
+ if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
+ ((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
+ dout(7) << " mtime " << pi->mtime << " -> " << mtime
+ << " for " << *in << dendl;
+ pi->mtime = mtime;
+ }
+ if (ctime > pi->ctime) {
+ dout(7) << " ctime " << pi->ctime << " -> " << ctime
+ << " for " << *in << dendl;
+ pi->ctime = ctime;
+ }
+ if (in->inode.is_file() && // ONLY if regular file
+ size > pi->size) {
+ dout(7) << " size " << pi->size << " -> " << size
+ << " for " << *in << dendl;
+ pi->size = size;
+ pi->rstat.rbytes = size;
+ }
+ if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
+ dout(7) << " atime " << pi->atime << " -> " << atime
+ << " for " << *in << dendl;
+ pi->atime = atime;
+ }
+ if ((dirty & CEPH_CAP_FILE_EXCL) &&
+ ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
+ dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
+ << " for " << *in << dendl;
+ pi->time_warp_seq = m->get_time_warp_seq();
+ }
+ }
+ // auth
+ if (dirty & CEPH_CAP_AUTH_EXCL) {
+ if (m->head.uid != pi->uid) {
+ dout(7) << " uid " << pi->uid
+ << " -> " << m->head.uid
+ << " for " << *in << dendl;
+ pi->uid = m->head.uid;
+ }
+ if (m->head.gid != pi->gid) {
+ dout(7) << " gid " << pi->gid
+ << " -> " << m->head.gid
+ << " for " << *in << dendl;
+ pi->gid = m->head.gid;
+ }
+ if (m->head.mode != pi->mode) {
+ dout(7) << " mode " << oct << pi->mode
+ << " -> " << m->head.mode << dec
+ << " for " << *in << dendl;
+ pi->mode = m->head.mode;
+ }
+ }
+
+}
+
/*
* update inode based on cap flush|flushsnap|wanted.
* adjust max_size, if needed.
MClientCaps *ack)
{
dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
- << " issued " << ccap_string(cap->issued())
- << " wanted " << ccap_string(cap->wanted())
+ << " issued " << ccap_string(cap ? cap->issued() : 0)
+ << " wanted " << ccap_string(cap ? cap->wanted() : 0)
<< " on " << *in << dendl;
assert(in->is_auth());
client_t client = m->get_source().num();
// increase or zero max_size?
uint64_t size = m->get_size();
bool change_max = false;
- uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].last : 0;
+ uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
uint64_t new_max = old_max;
if (in->is_file()) {
- if ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR) {
+ if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
if (m->get_max_size() > new_max) {
dout(10) << "client requests file_max " << m->get_max_size()
<< " > max " << old_max << dendl;
!in->filelock.can_wrlock(client) &&
!in->filelock.can_force_wrlock(client)) {
dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
+ assert(in->last == CEPH_NOSNAP);
if (in->filelock.is_stable()) {
bool need_issue = false;
cap->inc_suppress();
Mutation *mut = new Mutation;
mut->ls = mds->mdlog->get_current_segment();
- // file
- if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
- utime_t atime = m->get_atime();
- utime_t mtime = m->get_mtime();
- utime_t ctime = m->get_ctime();
- uint64_t size = m->get_size();
-
- if (((dirty & CEPH_CAP_FILE_WR) && mtime > latest->mtime) ||
- ((dirty & CEPH_CAP_FILE_EXCL) && mtime != latest->mtime)) {
- dout(7) << " mtime " << pi->mtime << " -> " << mtime
- << " for " << *in << dendl;
- pi->mtime = mtime;
- }
- if (ctime > latest->ctime) {
- dout(7) << " ctime " << pi->ctime << " -> " << ctime
- << " for " << *in << dendl;
- pi->ctime = ctime;
- }
- if (in->inode.is_file() && // ONLY if regular file
- size > latest->size) {
- dout(7) << " size " << pi->size << " -> " << size
- << " for " << *in << dendl;
- pi->size = size;
- pi->rstat.rbytes = size;
- }
- if ((dirty & CEPH_CAP_FILE_EXCL) && atime != latest->atime) {
- dout(7) << " atime " << pi->atime << " -> " << atime
- << " for " << *in << dendl;
- pi->atime = atime;
- }
- if ((dirty & CEPH_CAP_FILE_EXCL) &&
- ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
- dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
- << " for " << *in << dendl;
- pi->time_warp_seq = m->get_time_warp_seq();
- }
- }
+ _update_cap_fields(in, dirty, m, pi);
+
if (change_max) {
dout(7) << " max_size " << old_max << " -> " << new_max
<< " for " << *in << dendl;
if (new_max) {
- pi->client_ranges[client].first = 0;
- pi->client_ranges[client].last = new_max;
+ pi->client_ranges[client].range.first = 0;
+ pi->client_ranges[client].range.last = new_max;
+ pi->client_ranges[client].follows = in->first - 1;
} else
pi->client_ranges.erase(client);
}
wrlock_force(&in->filelock, mut); // wrlock for duration of journal
// auth
- if (dirty & CEPH_CAP_AUTH_EXCL) {
- if (m->head.uid != latest->uid) {
- dout(7) << " uid " << pi->uid
- << " -> " << m->head.uid
- << " for " << *in << dendl;
- pi->uid = m->head.uid;
- }
- if (m->head.gid != latest->gid) {
- dout(7) << " gid " << pi->gid
- << " -> " << m->head.gid
- << " for " << *in << dendl;
- pi->gid = m->head.gid;
- }
- if (m->head.mode != latest->mode) {
- dout(7) << " mode " << oct << pi->mode
- << " -> " << m->head.mode << dec
- << " for " << *in << dendl;
- pi->mode = m->head.mode;
- }
-
+ if (dirty & CEPH_CAP_AUTH_EXCL)
wrlock_force(&in->authlock, mut);
- }
// xattr
if (px) {
protected:
void adjust_cap_wanted(Capability *cap, int wanted, int issue_seq);
void handle_client_caps(class MClientCaps *m);
+ void _update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi);
+ void _do_snap_update(CInode *in, int dirty, snapid_t follows, MClientCaps *m, MClientCaps *ack);
bool _do_cap_update(CInode *in, Capability *cap, int dirty, snapid_t follows, MClientCaps *m,
MClientCaps *ack=0);
void handle_client_cap_release(class MClientCapRelease *m);
void file_update_finish(CInode *in, Mutation *mut, bool share, client_t client, Capability *cap,
MClientCaps *ack);
public:
- void calc_new_client_ranges(CInode *in, uint64_t size, map<client_t,byte_range_t>& new_ranges);
+ void calc_new_client_ranges(CInode *in, uint64_t size, map<client_t, client_writeable_range_t>& new_ranges);
bool check_inode_max_size(CInode *in, bool force_wrlock=false, bool update_size=false, uint64_t newsize=0,
utime_t mtime=utime_t());
void share_inode_max_size(CInode *in);
oldin->symlink = in->symlink;
oldin->xattrs = in->xattrs;
+ oldin->inode.trim_client_ranges(last);
+
in->first = last+1;
dout(10) << "cow_inode " << *in << " to " << *oldin << dendl;
p++) {
client_t client = p->first;
Capability *cap = p->second;
- if ((cap->issued() & CEPH_CAP_ANY_WR) &&
+ int issued = cap->issued();
+ if ((issued & CEPH_CAP_ANY_WR) &&
cap->client_follows <= oldin->first) {
- // clone to oldin
- Capability *newcap = oldin->add_client_cap(client, 0, in->containing_realm);
- cap->item_session_caps.get_list()->push_back(&newcap->item_session_caps);
- newcap->issue(cap->issued());
- newcap->set_last_issue_stamp(cap->get_last_issue_stamp());
- newcap->client_follows = cap->client_follows;
- dout(10) << " cloning client" << client << " wr cap " << cap
- << " follows " << cap->client_follows
- << " to " << newcap << " on cloned inode" << dendl;
+ // note in oldin
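+      // instead of cloning the cap to oldin, record which locks cover the
+      // client's writeable caps; each such lock goes to LOCK_SNAP_SYNC and
+      // holds a wrlock until the client's snap flush is journaled
+      // (released in Locker::file_update_finish)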
+ for (int i = 0; i < num_cinode_locks; i++) {
+ if (issued & cinode_lock_info[i].wr_caps) {
+ int lockid = cinode_lock_info[i].lock;
+ SimpleLock *lock = oldin->get_lock(lockid);
+ assert(lock);
+ oldin->client_snap_caps[lockid].insert(client);
+ oldin->auth_pin(lock);
+ lock->set_state(LOCK_SNAP_SYNC); // gathering
+ lock->get_wrlock(true);
+ dout(10) << " client" << client << " cap " << ccap_string(issued & cinode_lock_info[i].wr_caps)
+ << " wrlock lock " << *lock << " on " << *oldin << dendl;
+ }
+ }
cap->client_follows = last;
} else {
- dout(10) << " not cloning client" << client << " cap follows " << cap->client_follows << dendl;
+ dout(10) << " ignoring client" << client << " cap follows " << cap->client_follows << dendl;
}
}
- if (oldin->is_any_caps())
- oldin->filelock.set_state(LOCK_LOCK);
- else if (oldin->inode.client_ranges.size()) {
- dout(10) << "cow_inode WARNING client_ranges " << oldin->inode.client_ranges << " on " << *oldin << dendl;
- //oldin->inode.max_size = 0;
- }
return oldin;
}
}
in->auth_unpin(this);
- mds->locker->issue_caps(in);
+ if (in->is_head())
+ mds->locker->issue_caps(in);
}
}
}
p != inode_map.end();
++p) {
CInode *in = p->second;
- if (in->is_any_caps()) {
+ if (in->is_head() && in->is_any_caps()) {
if (!mds->locker->eval(in, CEPH_CAP_LOCKS))
mds->locker->issue_caps(in);
}
continue;
bool recover = false;
- for (map<client_t,byte_range_t>::iterator p = in->inode.client_ranges.begin();
+ for (map<client_t,client_writeable_range_t>::iterator p = in->inode.client_ranges.begin();
p != in->inode.client_ranges.end();
p++) {
Capability *cap = in->get_client_cap(p->first);
dout(15) << "session " << session << " not in sessionmap!" << dendl;
} else if (m->cmd[0] == "issue_caps") {
long inum = strtol(m->cmd[1].c_str(), 0, 10);
- CInode * ino = mdcache->get_inode(inodeno_t(inum));
- if (ino) {
- bool r = locker->issue_caps(ino);
+ CInode *in = mdcache->get_inode(inodeno_t(inum));
+ if (in) {
+ bool r = locker->issue_caps(in);
dout(20) << "called issue_caps on inode " << inum
<< " with result " << r << dendl;
} else dout(15) << "inode " << inum << " not in mdcache!" << dendl;
reply_request(mdr, -EROFS);
return;
}
+
// can only open a dir with mode FILE_MODE_PIN, at least for now.
if (cur->inode.is_dir())
cmode = CEPH_FILE_MODE_PIN;
in->inode.version = dn->pre_dirty();
if (cmode & CEPH_FILE_MODE_WR) {
- in->inode.client_ranges[client].first = 0;
- in->inode.client_ranges[client].last = in->inode.get_layout_size_increment();
+ in->inode.client_ranges[client].range.first = 0;
+ in->inode.client_ranges[client].range.last = in->inode.get_layout_size_increment();
+ in->inode.client_ranges[client].follows = follows;
}
in->inode.rstat.rfiles = 1;
pi->mtime = now;
// adjust client's max_size?
- map<client_t,byte_range_t> new_ranges;
+ map<client_t,client_writeable_range_t> new_ranges;
mds->locker->calc_new_client_ranges(cur, pi->size, new_ranges);
if (pi->client_ranges != new_ranges) {
dout(10) << " client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
pi->truncate_seq++;
if (cmode & CEPH_FILE_MODE_WR) {
- pi->client_ranges[client].first = 0;
- pi->client_ranges[client].last = pi->get_layout_size_increment();
+ pi->client_ranges[client].range.first = 0;
+ pi->client_ranges[client].range.last = pi->get_layout_size_increment();
+ pi->client_ranges[client].follows = in->find_snaprealm()->get_newest_seq();
}
mdr->ls = mdlog->get_current_segment();
case LOCK_PRE_SCAN: return "*->scan";
case LOCK_SCAN: return "scan";
+ case LOCK_SNAP_SYNC: return "snap->sync";
+
default: assert(0); return 0;
}
}
[LOCK_SYNC] = { 0, false, LOCK_SYNC, ANY, 0, ANY, 0, 0, ANY, 0, CEPH_CAP_GSHARED,0,0,CEPH_CAP_GSHARED },
[LOCK_LOCK_SYNC] = { LOCK_SYNC, false, LOCK_LOCK, ANY, XCL, XCL, 0, 0, XCL, 0, 0,0,0,0 },
[LOCK_EXCL_SYNC] = { LOCK_SYNC, true, LOCK_LOCK, 0, 0, 0, 0, XCL, 0, 0, 0,CEPH_CAP_GSHARED,0,0 },
+ [LOCK_SNAP_SYNC] = { LOCK_SYNC, false, LOCK_LOCK, 0, 0, 0, 0, AUTH,0, 0, 0,0,0,0 },
[LOCK_LOCK] = { 0, false, LOCK_LOCK, AUTH, 0, FW, 0, 0, 0, 0, 0,0,0,0 },
[LOCK_SYNC_LOCK] = { LOCK_LOCK, false, LOCK_LOCK, ANY, 0, 0, 0, 0, 0, 0, 0,0,0,0 },
[LOCK_LOCK_EXCL] = { LOCK_EXCL, false, LOCK_LOCK, ANY, 0, 0, 0, 0, 0, 0, CEPH_CAP_GSHARED,0,0,0 },
[LOCK_REMOTEXLOCK]={ LOCK_LOCK, false, LOCK_LOCK, 0, 0, 0, 0, 0, 0, 0, 0,0,0,0 },
+
};
struct sm_t sm_simplelock = {
[LOCK_SYNC] = { 0, false, LOCK_SYNC, ANY, 0, ANY, 0, 0, ANY, 0, CEPH_CAP_GSHARED,0,0,CEPH_CAP_GSHARED },
[LOCK_LOCK_SYNC] = { LOCK_SYNC, false, LOCK_LOCK, AUTH, 0, 0, 0, 0, 0, 0, 0,0,0,0 },
[LOCK_MIX_SYNC] = { LOCK_SYNC, false, LOCK_LOCK, 0, 0, 0, 0, 0, 0, 0, 0,0,0,0 },
-
+ [LOCK_SNAP_SYNC] = { LOCK_SYNC, false, LOCK_LOCK, 0, 0, 0, 0, AUTH,0, 0, 0,0,0,0 },
+
[LOCK_LOCK] = { 0, false, LOCK_LOCK, AUTH, 0, FW, AUTH,0, 0, ANY, 0,0,0,0 },
[LOCK_SYNC_LOCK] = { LOCK_LOCK, false, LOCK_LOCK, AUTH, 0, 0, 0, 0, 0, 0, 0,0,0,0 },
[LOCK_MIX_LOCK] = { LOCK_LOCK, false, LOCK_LOCK, 0, 0, 0, 0, 0, 0, 0, 0,0,0,0 },
[LOCK_EXCL_SYNC] = { LOCK_SYNC, true, LOCK_LOCK, 0, 0, 0, 0, XCL, 0, 0, 0,CEPH_CAP_GSHARED|CEPH_CAP_GCACHE|CEPH_CAP_GRD,0,0 },
[LOCK_MIX_SYNC] = { LOCK_SYNC, false, LOCK_MIX, 0, 0, 0, 0, 0, 0, 0, CEPH_CAP_GRD|CEPH_CAP_GLAZYIO,0,0,CEPH_CAP_GRD },
[LOCK_MIX_SYNC2] = { LOCK_SYNC, false, 0, 0, 0, 0, 0, 0, 0, 0, CEPH_CAP_GRD|CEPH_CAP_GLAZYIO,0,0,CEPH_CAP_GRD },
-
+ [LOCK_SNAP_SYNC] = { LOCK_SYNC, false, LOCK_LOCK, 0, 0, 0, 0, AUTH,0, 0, 0,0,0,0 },
+
[LOCK_LOCK] = { 0, false, LOCK_LOCK, AUTH, 0, REQ, AUTH,0, 0, 0, CEPH_CAP_GCACHE|CEPH_CAP_GBUFFER,0,0,0 },
[LOCK_SYNC_LOCK] = { LOCK_LOCK, false, LOCK_LOCK, AUTH, 0, REQ, 0, 0, 0, 0, CEPH_CAP_GCACHE,0,0,CEPH_CAP_GCACHE },
[LOCK_EXCL_LOCK] = { LOCK_LOCK, false, LOCK_LOCK, 0, 0, 0, 0, XCL, 0, 0, CEPH_CAP_GCACHE|CEPH_CAP_GBUFFER,0,0,CEPH_CAP_GCACHE },
#define LOCK_SCAN 30
#define LOCK_SCAN_LOCK 31
-#define LOCK_MAX 32
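+// LOCK_SNAP_SYNC: transitional state on a snapped (non-head) inode's locks
+// while dirty client metadata for that snap is gathered before going to SYNC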
+#define LOCK_SNAP_SYNC 32
+
+#define LOCK_MAX 33
// -------------------------
// lock actions
return l.first == r.first && l.last == r.last;
}
+
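+// a client's writeable byte range on a file, plus the last snapid through
+// which that client has flushed its dirty data and metadata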
+struct client_writeable_range_t {
+ byte_range_t range;
+ snapid_t follows; // aka "data+metadata flushed thru"
+
+ void encode(bufferlist &bl) const {
+ __u8 v = 1;
+ ::encode(v, bl);
+ ::encode(range, bl);
+ ::encode(follows, bl);
+ }
+ void decode(bufferlist::iterator& bl) {
+ __u8 v;
+ ::decode(v, bl);
+ ::decode(range, bl);
+ ::decode(follows, bl);
+ }
+};
+WRITE_CLASS_ENCODER(client_writeable_range_t)
+
+inline ostream& operator<<(ostream& out, const client_writeable_range_t& r)
+{
+ return out << r.range << "@" << r.follows;
+}
+inline bool operator==(const client_writeable_range_t& l, const client_writeable_range_t& r) {
+ return l.range == r.range && l.follows == r.follows;
+}
+
+
+
inline ostream& operator<<(ostream& out, ceph_filelock& l) {
out << "start: " << l.start << ", length: " << l.length
<< ", client: " << l.client << ", pid: " << l.pid
utime_t atime; // file data access time.
uint32_t time_warp_seq; // count of (potential) mtime/atime timewarps (i.e., utimes())
- map<client_t,byte_range_t> client_ranges; // client(s) can write to these ranges
+ map<client_t,client_writeable_range_t> client_ranges; // client(s) can write to these ranges
// dirfrag, recursive accountin
frag_info_t dirstat;
uint64_t get_max_size() const {
uint64_t max = 0;
- for (map<client_t,byte_range_t>::const_iterator p = client_ranges.begin();
+ for (map<client_t,client_writeable_range_t>::const_iterator p = client_ranges.begin();
p != client_ranges.end();
++p)
- if (p->second.last > max)
- max = p->second.last;
+ if (p->second.range.last > max)
+ max = p->second.range.last;
return max;
}
void set_max_size(uint64_t new_max) {
if (new_max == 0) {
client_ranges.clear();
} else {
- for (map<client_t,byte_range_t>::iterator p = client_ranges.begin();
+ for (map<client_t,client_writeable_range_t>::iterator p = client_ranges.begin();
p != client_ranges.end();
++p)
- p->second.last = new_max;
+ p->second.range.last = new_max;
+ }
+ }
+
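+  // drop ranges for clients that have already flushed through 'last'
+  // (follows >= last); used when cowing an inode for a snapshot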
+ void trim_client_ranges(snapid_t last) {
+ map<client_t, client_writeable_range_t>::iterator p = client_ranges.begin();
+ while (p != client_ranges.end()) {
+ if (p->second.follows >= last)
+ client_ranges.erase(p++);
+ else
+ p++;
}
}
void encode(bufferlist &bl) const {
- __u8 v = 2;
+ __u8 v = 3;
::encode(v, bl);
::encode(ino, bl);
::decode(mtime, p);
::decode(atime, p);
::decode(time_warp_seq, p);
- ::decode(client_ranges, p);
+ if (v >= 3) {
+ ::decode(client_ranges, p);
+ } else {
+ map<client_t, byte_range_t> m;
+ ::decode(m, p);
+ for (map<client_t, byte_range_t>::iterator q = m.begin(); q != m.end(); q++)
+ client_ranges[q->first].range = q->second;
+ }
::decode(dirstat, p);
::decode(rstat, p);
return cached_seq;
}
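+  // return the first snapid strictly greater than 'follows', or CEPH_NOSNAP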
+  snapid_t get_snap_following(snapid_t follows) {
+    check_cache();
+    const set<snapid_t>& s = get_snaps();
+    set<snapid_t>::const_iterator p = s.upper_bound(follows);
+    if (p != s.end())
+      return *p;
+    return CEPH_NOSNAP;
+  }
+
void adjust_parent();
void split_at(SnapRealm *child);