#define CEPH_MDS_PROTOCOL 9 /* cluster internal */
#define CEPH_MON_PROTOCOL 4 /* cluster internal */
#define CEPH_OSDC_PROTOCOL 13 /* public/client */
-#define CEPH_MDSC_PROTOCOL 20 /* public/client */
+#define CEPH_MDSC_PROTOCOL 21 /* public/client */
#define CEPH_MONC_PROTOCOL 12 /* public/client */
CEPH_SESSION_REQUEST_RENEWCAPS,
CEPH_SESSION_RENEWCAPS,
CEPH_SESSION_STALE,
+ CEPH_SESSION_TRIMCAPS,
};
static inline const char *ceph_session_op_name(int op)
case CEPH_SESSION_REQUEST_RENEWCAPS: return "request_renewcaps";
case CEPH_SESSION_RENEWCAPS: return "renewcaps";
case CEPH_SESSION_STALE: return "stale";
+ case CEPH_SESSION_TRIMCAPS: return "trimcaps";
default: return "???";
}
}
__le32 op;
__le64 seq;
struct ceph_timespec stamp;
+ __le32 max_caps;
} __attribute__ ((packed));
/* client_request */
return have;
}
+int __ceph_caps_issued_other(struct ceph_inode_info *ci, struct ceph_cap *ocap)
+{
+ int have = ci->i_snap_caps;
+ struct ceph_cap *cap;
+ struct rb_node *p;
+
+ for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+ cap = rb_entry(p, struct ceph_cap, ci_node);
+ if (cap == ocap)
+ continue;
+ if (!__cap_is_valid(cap))
+ continue;
+ have |= cap->issued;
+ }
+ return have;
+}
+
static void __touch_cap(struct ceph_cap *cap)
{
struct ceph_mds_session *s = cap->session;
s->s_renew_requested = 0;
INIT_LIST_HEAD(&s->s_caps);
s->s_nr_caps = 0;
+ s->s_max_caps = 0;
atomic_set(&s->s_ref, 1);
INIT_LIST_HEAD(&s->s_waiting);
INIT_LIST_HEAD(&s->s_unsafe);
return request_close_session(mdsc, session);
}
+/*
+ * Trim old(er) caps.
+ */
+static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
+{
+ struct ceph_mds_session *session = arg;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int used, oissued, mine;
+
+ if (session->s_nr_caps <= session->s_max_caps)
+ return -1;
+
+ spin_lock(&inode->i_lock);
+ mine = cap->issued | cap->implemented;
+ used = __ceph_caps_used(ci);
+ oissued = __ceph_caps_issued_other(ci, cap);
+
+ dout(20, "trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
+ inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
+ ceph_cap_string(used));
+ if ((used & ~oissued) & mine)
+ goto out; /* we need these caps */
+
+ /* try to drop referring dentries */
+ d_prune_aliases(inode);
+ dout(20, "trim_caps_cb %p cap %p pruned, count now %d\n",
+ inode, cap, atomic_read(&inode->i_count));
+
+out:
+ spin_unlock(&inode->i_lock);
+ return 0;
+}
+
+static int trim_caps(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session,
+ int max_caps)
+{
+ dout(10, "trim_caps mds%d start: %d / %d caps\n", session->s_mds,
+ session->s_nr_caps, max_caps);
+ session->s_max_caps = max_caps;
+ iterate_session_caps(session, trim_caps_cb, session);
+ dout(10, "trim_caps mds%d done: %d / %d caps\n", session->s_mds,
+ session->s_nr_caps, max_caps);
+ return 0;
+}
+
/*
* Allocate cap_release messages. If there is a partially full message
* in the queue, try to allocate enough to cover it's remainder, so that
send_renew_caps(mdsc, session);
break;
+ case CEPH_SESSION_TRIMCAPS:
+ trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
+ break;
+
default:
derr(0, "bad session op %d from mds%d\n", op, mds);
WARN_ON(1);
unsigned long s_cap_ttl; /* when session caps expire */
unsigned long s_renew_requested; /* last time we sent a renew req */
struct list_head s_caps; /* all caps issued by this session */
- int s_nr_caps;
+ int s_nr_caps, s_max_caps;
atomic_t s_ref;
struct list_head s_waiting; /* waiting requests */
struct list_head s_unsafe; /* unsafe requests */
extern int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented);
extern int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int t);
+extern int __ceph_caps_issued_other(struct ceph_inode_info *ci,
+ struct ceph_cap *cap);
static inline int ceph_caps_issued(struct ceph_inode_info *ci)
{
int32_t op;
version_t seq; // used when requesting close, declaring stale
utime_t stamp;
+ int32_t max_caps;
MClientSession() : Message(CEPH_MSG_CLIENT_SESSION) { }
MClientSession(int o, version_t s=0) :
Message(CEPH_MSG_CLIENT_SESSION),
- op(o), seq(s) { }
+ op(o), seq(s), max_caps(0) { }
MClientSession(int o, utime_t st) :
Message(CEPH_MSG_CLIENT_SESSION),
- op(o), seq(0), stamp(st) { }
+ op(o), seq(0), stamp(st), max_caps(0) { }
const char *get_type_name() { return "client_session"; }
void print(ostream& out) {
out << "client_session(" << ceph_session_op_name(op);
if (seq) out << " seq " << seq;
+ if (op == CEPH_SESSION_TRIMCAPS)
+ out << " max_caps " << max_caps;
out << ")";
}
::decode(op, p);
::decode(seq, p);
::decode(stamp, p);
+ ::decode(max_caps, p);
}
void encode_payload() {
::encode(op, payload);
::encode(seq, payload);
::encode(stamp, payload);
+ ::encode(max_caps, payload);
}
};