if (st->cap.caps) {
if (in->snapid == CEPH_NOSNAP)
- add_update_cap(in, mds, st->cap.caps, st->cap.seq, st->cap.mseq, inodeno_t(st->cap.realm));
+ add_update_cap(in, mds, st->cap.cap_id, st->cap.caps, st->cap.seq, st->cap.mseq, inodeno_t(st->cap.realm));
else {
in->snap_caps |= st->cap.caps;
}
MClientCaps *m = new MClientCaps(op,
in->ino(),
0,
- cap->seq,
+ cap->cap_id, cap->seq,
cap->issued,
want,
cap->flushing,
<< " on " << *in << dendl;
if (p->second.dirty_data || p->second.writing)
continue;
- MClientCaps *m = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP, in->ino(), in->snaprealm->ino, mseq);
+ MClientCaps *m = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP, in->ino(), in->snaprealm->ino, 0, mseq);
m->head.snap_follows = p->first;
m->head.size = p->second.size;
m->head.caps = p->second.issued;
* handle caps update from mds. including mds to mds caps transitions.
* do not block.
*/
-void Client::add_update_cap(Inode *in, int mds,
+void Client::add_update_cap(Inode *in, int mds, __u64 cap_id,
unsigned issued, unsigned seq, unsigned mseq, inodeno_t realm)
{
InodeCap *cap = 0;
}
unsigned old_caps = cap->issued;
+ cap->cap_id = cap_id;
cap->issued |= issued;
cap->implemented |= issued;
cap->seq = seq;
// add/update it
update_snap_trace(m->snapbl);
- add_update_cap(in, mds,
+ add_update_cap(in, mds, m->get_cap_id(),
m->get_caps(), m->get_seq(), m->get_mseq(), m->get_realm());
if (m->get_mseq() > in->exporting_mseq) {
}
struct InodeCap {
+ __u64 cap_id;
unsigned issued;
unsigned implemented;
unsigned wanted; // as known to mds.
void release_lease(Inode *in, Dentry *dn, int mask);
// file caps
- void add_update_cap(Inode *in, int mds,
+ void add_update_cap(Inode *in, int mds, __u64 cap_id,
unsigned issued, unsigned seq, unsigned mseq, inodeno_t realm);
void remove_cap(Inode *in, int mds);
void remove_all_caps(Inode *in);
#define CEPH_MDS_PROTOCOL 5 /* cluster internal */
#define CEPH_MON_PROTOCOL 4 /* cluster internal */
#define CEPH_OSDC_PROTOCOL 4 /* public/client */
-#define CEPH_MDSC_PROTOCOL 5 /* public/client */
+#define CEPH_MDSC_PROTOCOL 6 /* public/client */
#define CEPH_MONC_PROTOCOL 6 /* public/client */
struct ceph_mds_reply_cap {
__le32 caps, wanted;
+ __le64 cap_id;
__le32 seq, mseq;
__le64 realm;
__le32 ttl_ms; /* ttl, in ms. if readonly and unwanted. */
struct ceph_mds_caps {
__le32 op;
__le64 ino, realm;
+ __le64 cap_id;
__le32 seq;
__le32 caps, wanted, dirty;
__le32 migrate_seq;
/* client reconnect */
struct ceph_mds_cap_reconnect {
+ __le64 cap_id;
__le32 wanted;
__le32 issued;
__le64 size;
* @fmode can be negative, in which case it is ignored.
*/
int ceph_add_cap(struct inode *inode,
- struct ceph_mds_session *session,
+ struct ceph_mds_session *session, u64 cap_id,
int fmode, unsigned issued, unsigned wanted,
unsigned seq, unsigned mseq, u64 realmino,
unsigned ttl_ms, unsigned long ttl_from,
int mds = session->s_mds;
int is_first = 0;
- dout(10, "add_cap on %p mds%d cap %s seq %d\n", inode,
- session->s_mds, ceph_cap_string(issued), seq);
+ dout(10, "add_cap %p mds%d cap %llx %s seq %d\n", inode,
+ session->s_mds, cap_id, ceph_cap_string(issued), seq);
retry:
spin_lock(&inode->i_lock);
cap = __get_cap_for_mds(inode, mds);
dout(10, "add_cap inode %p (%llx.%llx) cap %s now %s seq %d mds%d\n",
inode, ceph_vinop(inode), ceph_cap_string(issued),
ceph_cap_string(issued|cap->issued), seq, mds);
+ cap->cap_id = cap_id;
cap->issued = issued;
cap->implemented |= issued;
cap->seq = seq;
*
* Caller should be holding s_mutex.
*/
-static void send_cap_msg(struct ceph_mds_client *mdsc, u64 ino, int op,
+static void send_cap_msg(struct ceph_mds_client *mdsc, u64 ino, u64 cid, int op,
int caps, int wanted, int dirty, u64 seq, u64 mseq,
u64 size, u64 max_size,
struct timespec *mtime, struct timespec *atime,
struct ceph_mds_caps *fc;
struct ceph_msg *msg;
- dout(10, "send_cap_msg %s %llx caps %s wanted %s dirty %s seq %llu/%llu"
- " follows %lld size %llu\n", ceph_cap_op_name(op), ino,
- ceph_cap_string(caps), ceph_cap_string(wanted),
+ dout(10, "send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
+ " seq %llu/%llu follows %lld size %llu\n", ceph_cap_op_name(op),
+ cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted),
ceph_cap_string(dirty),
seq, mseq, follows, size);
memset(fc, 0, sizeof(*fc));
+ fc->cap_id = cpu_to_le64(cid);
fc->op = cpu_to_le32(op);
fc->seq = cpu_to_le32(seq);
fc->migrate_seq = cpu_to_le32(mseq);
{
struct ceph_inode_info *ci = cap->ci;
struct inode *inode = &ci->vfs_inode;
+ u64 cap_id = cap->cap_id;
int held = cap->issued | cap->implemented;
int revoking = cap->implemented & ~cap->issued;
int dropping = cap->issued & ~retain;
gid_t gid;
int dirty;
int flushing;
- int last_cap = 0;
+ int last_cap = 0;
dout(10, "__send_cap cap %p session %p %s -> %s (revoking %s)\n",
cap, cap->session,
invalidate_mapping_pages(&inode->i_data, 0, -1);
}
- send_cap_msg(mdsc, ceph_vino(inode).ino,
+ send_cap_msg(mdsc, ceph_vino(inode).ino, cap_id,
op, keep, want, flushing, seq, mseq,
size, max_size, &mtime, &atime, time_warp_seq,
uid, gid, mode,
dout(10, "flush_snaps %p cap_snap %p follows %lld size %llu\n",
inode, capsnap, next_follows, capsnap->size);
- send_cap_msg(mdsc, ceph_vino(inode).ino,
+ send_cap_msg(mdsc, ceph_vino(inode).ino, 0,
CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
capsnap->dirty, 0, mseq,
capsnap->size, 0,
unsigned mseq = le32_to_cpu(im->migrate_seq);
u64 realmino = le64_to_cpu(im->realm);
unsigned long ttl_ms = le32_to_cpu(im->ttl_ms);
+ u64 cap_id = le64_to_cpu(im->cap_id);
if (ci->i_cap_exporting_mds >= 0 &&
ceph_seq_cmp(ci->i_cap_exporting_mseq, mseq) < 0) {
ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
false);
downgrade_write(&mdsc->snap_rwsem);
- ceph_add_cap(inode, session, -1, issued, wanted, seq, mseq, realmino,
+ ceph_add_cap(inode, session, cap_id, -1,
+ issued, wanted, seq, mseq, realmino,
ttl_ms, jiffies - ttl_ms/2, NULL);
up_read(&mdsc->snap_rwsem);
}
int op;
u32 seq;
struct ceph_vino vino;
+ u64 cap_id;
u64 size, max_size;
int check_caps = 0;
void *xattr_data = NULL;
op = le32_to_cpu(h->op);
vino.ino = le64_to_cpu(h->ino);
vino.snap = CEPH_NOSNAP;
+ cap_id = le64_to_cpu(h->cap_id);
seq = le32_to_cpu(h->seq);
size = le64_to_cpu(h->size);
max_size = le64_to_cpu(h->max_size);
return;
release:
- send_cap_msg(mdsc, vino.ino, CEPH_CAP_OP_RELEASE,
+ send_cap_msg(mdsc, vino.ino, cap_id, CEPH_CAP_OP_RELEASE,
0, 0, 0,
seq, 0,
size, 0, NULL, NULL, 0,
/* were we issued a capability? */
if (info->cap.caps) {
if (ceph_snap(inode) == CEPH_NOSNAP) {
- ceph_add_cap(inode, session, cap_fmode,
+ ceph_add_cap(inode, session,
+ le64_to_cpu(info->cap.cap_id),
+ cap_fmode,
le32_to_cpu(info->cap.caps),
le32_to_cpu(info->cap.wanted),
le32_to_cpu(info->cap.seq),
struct list_head session_caps; /* per-session caplist */
struct list_head session_rdcaps; /* per-session rdonly caps */
int mds;
+ u64 cap_id;
int issued; /* latest, from the mds */
int implemented; /* what we've implemented (for tracking revocation) */
int flushing; /* dirty fields being written back to mds */
extern void ceph_handle_caps(struct ceph_mds_client *mdsc,
struct ceph_msg *msg);
extern int ceph_add_cap(struct inode *inode,
- struct ceph_mds_session *session,
+ struct ceph_mds_session *session, u64 cap_id,
int fmode, unsigned issued, unsigned wanted,
unsigned cap, unsigned seq, u64 realmino,
unsigned ttl_ms, unsigned long ttl_from,
}
+Capability *CInode::add_client_cap(int client, Session *session,
+ xlist<Capability*> *rdcaps_list, SnapRealm *conrealm)
+{ // create this inode's Capability for `client`, register it, and return it
+ if (client_caps.empty()) { // no caps on this inode yet
+ get(PIN_CAPS);
+ if (conrealm)
+ containing_realm = conrealm; // caller-supplied realm wins
+ else
+ containing_realm = find_snaprealm();
+ containing_realm->inodes_with_caps.push_back(&xlist_caps);
+ }
+ // at most one cap per client; the new cap's id is the pre-incremented mdcache->last_cap_id
+ assert(client_caps.count(client) == 0);
+ Capability *cap = client_caps[client] = new Capability(this, ++mdcache->last_cap_id, client, rdcaps_list);
+ if (session) // session may be null, in which case the cap is not added to any session
+ session->add_cap(cap);
+
+ cap->client_follows = first-1;
+
+ containing_realm->add_cap(client, cap);
+
+ return cap;
+}
+
+
+
+
+
version_t CInode::pre_dirty()
{
assert(parent || projected_parent.size());
cap->touch(); // move to back of session cap LRU
e.cap.caps = issue;
e.cap.wanted = cap->wanted();
+ e.cap.cap_id = cap->get_cap_id();
e.cap.seq = cap->get_last_seq();
dout(10) << "encode_inodestat issueing " << ccap_string(issue) << " seq " << cap->get_last_seq() << dendl;
e.cap.mseq = cap->get_mseq();
if (c) return c->pending();
return 0;
}
- Capability *add_client_cap(int client, Session *session, xlist<Capability*> *rdcaps_list, SnapRealm *conrealm=0) {
- if (client_caps.empty()) {
- get(PIN_CAPS);
- if (conrealm)
- containing_realm = conrealm;
- else
- containing_realm = find_snaprealm();
- containing_realm->inodes_with_caps.push_back(&xlist_caps);
- }
-
- assert(client_caps.count(client) == 0);
- Capability *cap = client_caps[client] = new Capability(this, client, rdcaps_list);
- if (session)
- session->add_cap(cap);
-
- cap->client_follows = first-1;
-
- containing_realm->add_cap(client, cap);
-
- return cap;
- }
+ Capability *add_client_cap(int client, Session *session,
+ xlist<Capability*> *rdcaps_list, SnapRealm *conrealm=0);
void remove_client_cap(int client) {
assert(client_caps.count(client) == 1);
Capability *cap = client_caps[client];
CInode *inode;
int client;
+ __u64 cap_id;
+
__u32 _wanted; // what the client wants (ideally)
utime_t last_issue_stamp;
xlist<Capability*>::item snaprealm_caps_item;
- Capability(CInode *i, int c, xlist<Capability*> *rl) :
+ Capability(CInode *i, __u64 id, int c, xlist<Capability*> *rl) :
inode(i), client(c),
+ cap_id(id),
_wanted(0),
_pending(0), _issued(0), _num_revoke(0),
last_sent(0),
void set_last_issue_stamp(utime_t t) { last_issue_stamp = t; }
+ __u64 get_cap_id() { return cap_id; } // id passed to the constructor
+
//ceph_seq_t get_last_issue() { return last_issue; }
bool is_suppress() { return suppress > 0; }
MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
in->ino(),
in->find_snaprealm()->inode->ino(),
- cap->get_last_seq(),
+ cap->get_cap_id(), cap->get_last_seq(),
after, wanted, 0,
cap->get_mseq());
in->encode_cap_message(m, cap);
MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
in->ino(),
in->find_snaprealm()->inode->ino(),
- cap->get_last_seq(),
+ cap->get_cap_id(), cap->get_last_seq(),
cap->pending(), cap->wanted(), 0,
cap->get_mseq());
in->encode_cap_message(m, cap);
MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
in->ino(),
in->find_snaprealm()->inode->ino(),
- cap->get_last_seq(),
+ cap->get_cap_id(), cap->get_last_seq(),
cap->pending(), cap->wanted(), 0,
cap->get_mseq());
in->encode_cap_message(m, cap);
// case we get a dup response, so whatever.)
MClientCaps *ack = 0;
if (m->get_dirty()) {
- ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, m->get_dirty(), 0);
+ ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0);
ack->set_snap_follows(follows);
}
if (!_do_cap_update(in, cap, m->get_dirty(), 0, follows, m, ack)) {
// for this and all subsequent versions of this inode,
while (1) {
- // filter wanted based on what we could ever give out (given auth/replica status)
- int wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
- cap->confirm_receipt(m->get_seq(), m->get_caps());
- dout(10) << " follows " << follows
- << " retains " << ccap_string(m->get_caps())
- << " dirty " << ccap_string(m->get_caps())
- << " on " << *in << dendl;
-
- MClientCaps *ack = 0;
- ceph_seq_t releasecap = 0;
-
- if (m->get_dirty() && in->is_auth()) {
- dout(7) << " flush client" << client << " dirty " << ccap_string(m->get_dirty())
- << " seq " << m->get_seq() << " on " << *in << dendl;
- ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, m->get_seq(),
- m->get_caps(), 0, m->get_dirty(), 0);
- }
- if (m->get_caps() == 0) {
+ if (cap->get_cap_id() != m->get_cap_id()) {
+ dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
+ } else {
+ // filter wanted based on what we could ever give out (given auth/replica status)
+ int wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
+ cap->confirm_receipt(m->get_seq(), m->get_caps());
+ // log the caps the client retains and the caps it reports dirty
+ dout(10) << " follows " << follows
+ << " retains " << ccap_string(m->get_caps())
+ << " dirty " << ccap_string(m->get_dirty())
+ << " on " << *in << dendl;
- assert(ceph_seq_cmp(m->get_seq(), cap->get_last_sent()) <= 0);
- if (m->get_seq() == cap->get_last_sent()) {
- dout(7) << " releasing request client" << client << " seq " << m->get_seq() << " on " << *in << dendl;
- releasecap = m->get_seq();
- } else {
- dout(7) << " NOT releasing request client" << client << " seq " << m->get_seq()
- << " < last_sent " << cap->get_last_sent() << " on " << *in << dendl;
+ MClientCaps *ack = 0;
+ ceph_seq_t releasecap = 0;
+
+ if (m->get_dirty() && in->is_auth()) {
+ dout(7) << " flush client" << client << " dirty " << ccap_string(m->get_dirty())
+ << " seq " << m->get_seq() << " on " << *in << dendl;
+ ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
+ m->get_caps(), 0, m->get_dirty(), 0);
+ }
+ if (m->get_caps() == 0) {
+ assert(ceph_seq_cmp(m->get_seq(), cap->get_last_sent()) <= 0);
+ if (m->get_seq() == cap->get_last_sent()) {
+ dout(7) << " releasing request client" << client << " seq " << m->get_seq() << " on " << *in << dendl;
+ releasecap = m->get_seq();
+ } else {
+ dout(7) << " NOT releasing request client" << client << " seq " << m->get_seq()
+ << " < last_sent " << cap->get_last_sent() << " on " << *in << dendl;
+ }
+ }
+ if (wanted != cap->wanted()) {
+ dout(10) << " wanted " << ccap_string(cap->wanted())
+ << " -> " << ccap_string(wanted) << dendl;
+ cap->set_wanted(wanted);
}
- }
- if (wanted != cap->wanted()) {
- dout(10) << " wanted " << ccap_string(cap->wanted())
- << " -> " << ccap_string(wanted) << dendl;
- cap->set_wanted(wanted);
- }
-
- if (!_do_cap_update(in, cap, m->get_dirty(), m->get_wanted(), follows, m, ack, releasecap)) {
- // no update, ack now.
- if (releasecap)
- _finish_release_cap(in, client, releasecap, ack);
- else if (ack)
- mds->send_message_client(ack, client);
- eval_cap_gather(in);
- if (in->filelock.is_stable())
- file_eval(&in->filelock);
+ if (!_do_cap_update(in, cap, m->get_dirty(), m->get_wanted(), follows, m, ack, releasecap)) {
+ // no update, ack now.
+ if (releasecap)
+ _finish_release_cap(in, client, releasecap, ack);
+ else if (ack)
+ mds->send_message_client(ack, client);
+
+ eval_cap_gather(in);
+ if (in->filelock.is_stable())
+ file_eval(&in->filelock);
+ }
}
-
+
// done?
if (in->last == CEPH_NOSNAP)
break;
did_shutdown_log_cap = false;
+ last_cap_id = 0;
+
client_lease_durations[0] = 5.0;
client_lease_durations[1] = 30.0;
client_lease_durations[2] = 300.0;
MClientCaps *reap = new MClientCaps(CEPH_CAP_OP_IMPORT,
in->ino(),
realm->inode->ino(),
- cap->get_last_seq(),
+ cap->get_cap_id(), cap->get_last_seq(),
cap->pending(), cap->wanted(), 0,
cap->get_mseq());
in->encode_cap_message(reap, cap);
void trim_client_leases();
// -- client caps --
+ __u64 last_cap_id;
xlist<Capability*> client_rdcaps;
void trim_client_rdcaps();
MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT,
in->ino(),
in->find_snaprealm()->inode->ino(),
- cap->get_last_seq(),
+ cap->get_cap_id(), cap->get_last_seq(),
cap->pending(), cap->wanted(), 0,
cap->get_mseq());
mds->send_message_client(m, it->first);
for (map<inodeno_t, cap_reconnect_t>::iterator p = m->caps.begin();
p != m->caps.end();
++p) {
+ // make sure our last_cap_id is MAX over all issued caps
+ if (p->second.capinfo.cap_id > mdcache->last_cap_id)
+ mdcache->last_cap_id = p->second.capinfo.cap_id;
+
CInode *in = mdcache->get_inode(p->first);
if (in && in->is_auth()) {
// we recovered it, and it's ours. take note.
inode_t fake_inode;
memset(&fake_inode, 0, sizeof(fake_inode));
fake_inode.ino = p->first;
- MClientCaps *stale = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0, 0);
+ MClientCaps *stale = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0, 0, 0);
//stale->head.migrate_seq = 0; // FIXME ******
mds->send_message_client(stale, m->get_source_inst());
inodeno_t get_ino() { return inodeno_t(head.ino); }
inodeno_t get_realm() { return inodeno_t(head.realm); }
+ __u64 get_cap_id() { return head.cap_id; } // cap id stored in the message head
__u64 get_size() { return head.size; }
__u64 get_max_size() { return head.max_size; }
MClientCaps(int op,
inodeno_t ino,
inodeno_t realm,
+ __u64 id,
long seq,
int caps,
int wanted,
head.op = op;
head.ino = ino;
head.realm = realm;
+ head.cap_id = id;
head.seq = seq;
head.caps = caps;
head.wanted = wanted;
}
MClientCaps(int op,
inodeno_t ino, inodeno_t realm,
- int mseq) :
+ __u64 id, int mseq) : // short form: head fields not assigned below stay zero from the memset
Message(CEPH_MSG_CLIENT_CAPS) {
memset(&head, 0, sizeof(head));
head.op = op;
head.ino = ino;
head.realm = realm;
+ head.cap_id = id; // callers visible in this patch pass 0 (FLUSHSNAP, stale-cap EXPORT)
head.migrate_seq = mseq;
}
void print(ostream& out) {
out << "client_caps(" << ceph_cap_op_name(head.op)
<< " ino " << inodeno_t(head.ino)
+ << " " << head.cap_id
<< " seq " << head.seq
<< " caps=" << ccap_string(head.caps)
<< " dirty=" << ccap_string(head.dirty)