__le32 num_split_inos;
__le32 num_split_realms;
__le32 trace_len;
-};
+} __attribute__ ((packed));
/* followed by split inos, then split realms, then the trace blob */
/*
__le64 seq; /* snap: version */
__le32 num_snaps;
__le32 num_prior_parent_snaps;
-};
+} __attribute__ ((packed));
/* followed by my snaps, then prior parent snaps */
/*
}
spin_unlock(&inode->i_lock);
if (check)
- ceph_check_caps(ci, 0);
+ ceph_check_caps(ci, 0, 0);
}
/*
cap->issued = cap->implemented = 0;
cap->mds = mds;
cap->flags = 0;
+ cap->flushed_snap = 0;
cap->ci = ci;
list_add(&cap->ci_caps, &ci->i_caps);
* release, ack revoked caps to mds as appropriate.
* @is_delayed if caller just dropped a cap ref, and we probably want to delay
*/
-void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed)
+void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed, int flush_snap)
{
struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
struct ceph_mds_client *mdsc = &client->mdsc;
goto ack;
}
+ /* flush snap? */
+ if (flush_snap &&
+ (cap->issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER))) {
+ if (cap->flushed_snap >=
+ ci->i_snaprealm->cached_context->seq) {
+ dout(10, "flushed_snap %llu >= seq %lld, "
+ "not flushing mds%d\n",
+ cap->flushed_snap,
+ ci->i_snaprealm->cached_context->seq,
+ cap->session->s_mds);
+ continue; /* already flushed for this snap */
+ }
+ goto ack;
+ }
+
if ((cap->issued & ~wanted) == 0)
continue; /* nothing extra, all good */
/* send_cap drops i_lock */
removed_last = __ceph_mdsc_send_cap(mdsc, session, cap,
- used, wanted, !is_delayed);
+ used, wanted, !is_delayed,
+ flush_snap);
if (removed_last)
goto out;
+ /* retake i_lock and restart our cap scan. */
goto retry;
}
if ((size << 1) >= ci->i_max_size &&
(ci->i_reported_size << 1) < ci->i_max_size) {
spin_unlock(&inode->i_lock);
- ceph_check_caps(ci, 0);
+ ceph_check_caps(ci, 0, 0);
} else
spin_unlock(&inode->i_lock);
}
spin_unlock(&ci->vfs_inode.i_lock);
if (last && ci->i_vino.snap == CEPH_NOSNAP)
- ceph_check_caps(ci, 0);
+ ceph_check_caps(ci, 0, 0);
}
spin_unlock(&inode->i_lock);
}
if (atomic_read(&ci->i_wrbuffer_ref) == 0)
- ceph_check_caps(ci, 0);
+ ceph_check_caps(ci, 0, 0);
return rc;
}
dout(10, "__do_pending_vmtruncate %p to %lld\n", inode, to);
vmtruncate(inode, to);
if (atomic_read(&ci->i_wrbuffer_ref) == 0)
- ceph_check_caps(ci, 0);
+ ceph_check_caps(ci, 0, 0);
} else
dout(10, "__do_pending_vmtruncate %p nothing to do\n", inode);
}
last ? "last":"");
if (last)
- ceph_check_caps(ci, 0);
+ ceph_check_caps(ci, 0, 0);
}
void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr)
WARN_ON(v < 0);
if (was_last)
- ceph_check_caps(ci, 0);
+ ceph_check_caps(ci, 0, 0);
}
/* caps */
-static void send_cap_ack(struct ceph_mds_client *mdsc, __u64 ino, int caps,
- int wanted, __u32 seq, __u64 size, __u64 max_size,
+static void send_cap_ack(struct ceph_mds_client *mdsc, __u64 ino, int op,
+ int caps, int wanted, __u32 seq,
+ __u64 size, __u64 max_size,
struct timespec *mtime, struct timespec *atime,
u64 time_warp_seq, u64 follows, int mds)
{
struct ceph_mds_caps *fc;
struct ceph_msg *msg;
- dout(10, "send_cap_ack %llx ca %d wa %d seq %u follows %lld sz %llu\n",
- ino, caps, wanted, (unsigned)seq, follows, size);
+ dout(10, "send_cap_ack %s %llx caps %d wanted %d seq %u follows %lld"
+ " size %llu\n", ceph_cap_op_name(op), ino, caps, wanted,
+ (unsigned)seq, follows, size);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, 0);
if (IS_ERR(msg))
dout(10, "handle_caps from mds%d\n", mds);
/* decode */
- if (msg->front.iov_len != sizeof(*h))
+ if (msg->front.iov_len < sizeof(*h))
goto bad;
h = msg->front.iov_base;
op = le32_to_cpu(h->op);
if (!inode) {
dout(10, "i don't have ino %llx, sending release\n", vino.ino);
- send_cap_ack(mdsc, vino.ino, 0, 0, seq, size, 0, 0, 0, 0, 0, mds);
+ send_cap_ack(mdsc, vino.ino, CEPH_CAP_OP_RELEASE, 0, 0, seq,
+ size, 0, 0, 0, 0, 0, mds);
goto no_inode;
}
return;
bad:
- dout(10, "corrupt caps message\n");
+ derr(10, "corrupt caps message\n");
return;
}
int __ceph_mdsc_send_cap(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
struct ceph_inode_cap *cap,
- int used, int wanted, int cancel_work)
+ int used, int wanted,
+ int cancel_work, int flush_snap)
{
struct ceph_inode_info *ci = cap->ci;
struct inode *inode = &ci->vfs_inode;
struct timespec mtime, atime;
int removed_last = 0;
int wake = 0;
+ int op = CEPH_CAP_OP_ACK;
+
+ if (flush_snap)
+ op = CEPH_CAP_OP_FLUSHSNAP;
+ else if (wanted == 0)
+ op = CEPH_CAP_OP_RELEASE;
dout(10, "__send_cap cap %p session %p %d -> %d\n", cap, cap->session,
cap->issued, cap->issued & wanted);
atime = inode->i_atime;
time_warp_seq = ci->i_time_warp_seq;
follows = ci->i_snaprealm->cached_context->seq;
- if (wanted == 0) {
+ if (wanted == 0 && !flush_snap) {
__ceph_remove_cap(cap);
removed_last = list_empty(&ci->i_caps);
if (removed_last && cancel_work)
__cap_delay_cancel(mdsc, ci);
}
+ if (flush_snap)
+ cap->flushed_snap = follows; /* so we only flush it once */
spin_unlock(&inode->i_lock);
if (dropping & CEPH_CAP_RDCACHE) {
}
send_cap_ack(mdsc, ceph_vino(inode).ino,
- keep, wanted, seq,
+ op, keep, wanted, seq,
size, max_size, &mtime, &atime, time_warp_seq,
follows, session->s_mds);
if (wake)
wake_up(&ci->i_cap_wq);
- if (wanted == 0)
+ if (wanted == 0 && !flush_snap)
iput(inode); /* removed cap */
return removed_last;
list_del_init(&ci->i_cap_delay_list);
spin_unlock(&mdsc->cap_delay_lock);
dout(10, "check_delayed_caps on %p\n", &ci->vfs_inode);
- ceph_check_caps(ci, 1);
+ ceph_check_caps(ci, 1, 0);
iput(&ci->vfs_inode);
}
spin_unlock(&mdsc->cap_delay_lock);
used = wanted = 0;
}
- __ceph_mdsc_send_cap(mdsc, session, cap, used, wanted, 1);
+ __ceph_mdsc_send_cap(mdsc, session, cap, used, wanted, 1, 0);
+ }
+}
+
+
+/*
+ * snap
+ */
+
+/*
+ * Handle a CEPH_MSG_CLIENT_SNAP message from an MDS: decode the header,
+ * bump the session sequence, and for a realm SPLIT first flush caps on
+ * and detach the affected inodes, apply the snap trace that trails the
+ * payload, then re-attach those inodes to the newly split realm.
+ *
+ * All counts/lengths come off the wire and are untrusted; every read
+ * from the message front is bounds-checked against its end.
+ */
+void ceph_mdsc_handle_snap(struct ceph_mds_client *mdsc,
+			   struct ceph_msg *msg)
+{
+	struct super_block *sb = mdsc->client->sb;
+	struct ceph_client *client = ceph_sb_to_client(sb);
+	struct ceph_mds_session *session;
+	int mds = le32_to_cpu(msg->hdr.src.name.num);
+	u64 split;
+	int op;
+	int trace_len;
+	struct ceph_snaprealm *realm = NULL;
+	void *p = msg->front.iov_base;
+	void *e = p + msg->front.iov_len;
+	struct ceph_mds_snap_head *h;
+	int num_split_inos, num_split_realms;
+	__le64 *split_inos = NULL, *split_realms = NULL;
+	int i;
+
+	/* decode */
+	if (msg->front.iov_len < sizeof(*h))
+		goto bad;
+	h = p;
+	op = le32_to_cpu(h->op);
+	split = le64_to_cpu(h->split);
+	trace_len = le32_to_cpu(h->trace_len);
+	num_split_inos = le32_to_cpu(h->num_split_inos);
+	num_split_realms = le32_to_cpu(h->num_split_realms);
+	p += sizeof(*h);
+
+	dout(10, "handle_snap from mds%d op %s split %llx tracelen %d\n", mds,
+	     ceph_snap_op_name(op), split, trace_len);
+
+	/* find session */
+	spin_lock(&mdsc->lock);
+	session = __get_session(&client->mdsc, mds);
+	spin_unlock(&mdsc->lock);
+	if (!session) {
+		dout(10, "WTF, got snap but no session for mds%d\n", mds);
+		return;
+	}
+	/* NOTE(review): if __get_session takes a reference, it is never
+	 * dropped on any path below -- confirm and add a put if so. */
+
+	mutex_lock(&session->s_mutex);
+	session->s_seq++;
+	mutex_unlock(&session->s_mutex);
+
+	if (op == CEPH_SNAP_OP_SPLIT) {
+		struct ceph_mds_snap_realm *ri;
+
+		/* verify both split arrays actually fit in the message
+		 * before stepping over them; the counts are untrusted
+		 * 32-bit wire values (the u64 casts also catch counts
+		 * that came through as negative ints) */
+		if ((u64)num_split_inos + (u64)num_split_realms >
+		    (u64)(e - p) / sizeof(u64))
+			goto bad;
+		split_inos = p;
+		p += sizeof(u64) * num_split_inos;
+		split_realms = p;
+		p += sizeof(u64) * num_split_realms;
+		ceph_decode_need(&p, e, sizeof(*ri), bad);
+		ri = p;
+
+		realm = ceph_get_snaprealm(client, split);
+		if (IS_ERR(realm))
+			goto out;
+		dout(10, "splitting snaprealm %llx %p\n", realm->ino, realm);
+
+		for (i = 0; i < num_split_inos; i++) {
+			struct ceph_vino vino = {
+				.ino = le64_to_cpu(split_inos[i]),
+				.snap = CEPH_NOSNAP,
+			};
+			struct inode *inode = ceph_find_inode(sb, vino);
+			struct ceph_inode_info *ci;
+			if (!inode)
+				continue;
+			ci = ceph_inode(inode);
+			spin_lock(&inode->i_lock);
+			if (!ci->i_snaprealm)
+				goto skip_inode;
+			/* an inode already in a realm newer than the
+			 * split stays where it is */
+			if (ci->i_snaprealm->created > le64_to_cpu(ri->created)) {
+				dout(15, " leaving %p in newer realm %llx %p\n",
+				     inode, ci->i_snaprealm->ino,
+				     ci->i_snaprealm);
+				goto skip_inode;
+			}
+			dout(15, " will move %p to split realm %llx %p\n",
+			     inode, realm->ino, realm);
+			/*
+			 * remove from list, but don't re-add yet.  we
+			 * don't want the caps to be flushed (again) by
+			 * ceph_update_snap_trace below.
+			 */
+			list_del_init(&ci->i_snaprealm_item);
+			spin_unlock(&inode->i_lock);
+
+			ceph_check_caps(ci, 0, 1);
+
+			iput(inode);
+			continue;
+
+		skip_inode:
+			spin_unlock(&inode->i_lock);
+			iput(inode);
+		}
+
+		for (i = 0; i < num_split_realms; i++) {
+			struct ceph_snaprealm *child =
+				ceph_get_snaprealm(client,
+					   le64_to_cpu(split_realms[i]));
+			/* ceph_get_snaprealm reports failure via ERR_PTR
+			 * (see the 'realm' check above); a bare !child
+			 * test would let an error pointer through */
+			if (!child || IS_ERR(child))
+				continue;
+			ceph_adjust_snaprealm_parent(client, child, realm->ino);
+			ceph_put_snaprealm(child);
+		}
+
+		ceph_put_snaprealm(realm);
+	}
+
+	/* the trace blob trails the split arrays (if any) */
+	realm = ceph_update_snap_trace(client, p, e,
+				       op != CEPH_SNAP_OP_DESTROY);
+	if (IS_ERR(realm))
+		goto bad;
+
+	if (op == CEPH_SNAP_OP_SPLIT) {
+		/* _now_ attach the moved inodes to the split realm.
+		 * NOTE(review): this pass revisits every split ino,
+		 * including ones the first pass deliberately left in a
+		 * newer realm (or that had a NULL i_snaprealm) --
+		 * confirm that unconditionally re-homing them here is
+		 * intended. */
+		for (i = 0; i < num_split_inos; i++) {
+			struct ceph_vino vino = {
+				.ino = le64_to_cpu(split_inos[i]),
+				.snap = CEPH_NOSNAP,
+			};
+			struct inode *inode = ceph_find_inode(sb, vino);
+			struct ceph_inode_info *ci;
+			if (!inode)
+				continue;
+			ci = ceph_inode(inode);
+			spin_lock(&inode->i_lock);
+			ceph_put_snaprealm(ci->i_snaprealm);
+			list_add(&ci->i_snaprealm_item,
+				 &realm->inodes_with_caps);
+			ci->i_snaprealm = realm;
+			realm->nref++;
+			spin_unlock(&inode->i_lock);
+			/* drop the ref from ceph_find_inode; the first
+			 * pass does this but this pass was leaking it */
+			iput(inode);
+		}
	}
+
+	ceph_put_snaprealm(realm);
+	return;
+
+bad:
+	derr(10, "corrupt snap message from mds%d\n", mds);
+out:
+	return;
}
+
+
static int close_session(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
extern void ceph_mdsc_handle_caps(struct ceph_mds_client *mdsc,
struct ceph_msg *msg);
+extern void ceph_mdsc_handle_snap(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg);
extern void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc,
struct ceph_msg *msg);
extern int __ceph_mdsc_send_cap(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
struct ceph_inode_cap *cap,
- int used, int wanted, int cancel_work);
+ int used, int wanted,
+ int cancel_work, int flush_snap);
extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
#endif
size_t size = sizeof(struct ceph_osd_request_head);
if (snapc)
- size += sizeof(u64) + snapc->num_snaps;
+ size += sizeof(u64) * snapc->num_snaps;
req = ceph_msg_new(CEPH_MSG_OSD_OP, size, 0, 0, 0);
if (IS_ERR(req))
return req;
return 0;
}
-int ceph_snaprealm_build_context(struct ceph_snaprealm *realm)
+int ceph_build_snap_context(struct ceph_snaprealm *realm)
{
struct ceph_snaprealm *parent = realm->parent;
struct ceph_snap_context *sc;
if (parent) {
if (!parent->cached_context) {
- err = ceph_snaprealm_build_context(parent);
+ err = ceph_build_snap_context(parent);
if (err)
goto fail;
}
num += parent->cached_context->num_snaps;
}
+ /* do i need to update? */
+ if (realm->cached_context && realm->cached_context->seq <= realm->seq &&
+ (!parent ||
+ realm->cached_context->seq <= parent->cached_context->seq)) {
+ dout(10, "build_snap_context %llx %p: %p seq %lld (%d snaps)"
+ " (unchanged)\n",
+ realm->ino, realm, realm->cached_context,
+ realm->cached_context->seq,
+ realm->cached_context->num_snaps);
+ return 0;
+ }
+
+ /* build new */
err = -ENOMEM;
sc = kzalloc(sizeof(*sc) + num*sizeof(u64), GFP_NOFS);
if (!sc)
sort(sc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
sc->num_snaps = num;
- dout(10, "snaprealm_build_context %llx %p : seq %lld %d snaps\n",
- realm->ino, realm, sc->seq, sc->num_snaps);
+ dout(10, "build_snap_context %llx %p: %p seq %lld (%d snaps)\n",
+ realm->ino, realm, sc, sc->seq, sc->num_snaps);
if (realm->cached_context)
ceph_put_snap_context(realm->cached_context);
ceph_put_snap_context(realm->cached_context);
realm->cached_context = 0;
}
- derr(0, "snaprealm_build_context %llx %p fail %d\n", realm->ino,
+ derr(0, "build_snap_context %llx %p fail %d\n", realm->ino,
realm, err);
return err;
}
struct ceph_snaprealm *child;
dout(10, "rebuild_snaprealms %llx %p\n", realm->ino, realm);
- ceph_snaprealm_build_context(realm);
+ ceph_build_snap_context(realm);
list_for_each(p, &realm->children) {
child = list_entry(p, struct ceph_snaprealm, child_item);
struct ceph_snaprealm *realm, *first = 0;
int invalidate = 0;
+ dout(10, "update_snap_trace must_flush=%d\n", must_flush);
more:
ceph_decode_need(&p, e, sizeof(*ri), bad);
ri = p;
}
if (le64_to_cpu(ri->seq) > realm->seq) {
+ struct list_head *p;
dout(10, "update_snap_trace updating %llx %p %lld -> %lld\n",
realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
- // flush caps... and data?
+ list_for_each(p, &realm->inodes_with_caps) {
+ struct ceph_inode_info *ci =
+ list_entry(p, struct ceph_inode_info,
+ i_snaprealm_item);
+ ceph_check_caps(ci, 0, 1);
+ }
+ dout(20, "update_snap_trace cap flush done\n");
} else
dout(10, "update_snap_trace %llx %p seq %lld unchanged\n",
} else if (!realm->cached_context)
invalidate = 1;
+ dout(10, "done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
+ realm, invalidate, p, e);
+
if (p >= e && invalidate)
ceph_rebuild_snaprealms(realm);
err = -EINVAL;
fail:
derr(10, "update_snap_trace error %d\n", err);
- return 0;
+ return ERR_PTR(err);
}
case CEPH_MSG_CLIENT_REQUEST_FORWARD: return "client_request_forward";
case CEPH_MSG_CLIENT_REPLY: return "client_reply";
case CEPH_MSG_CLIENT_CAPS: return "client_caps";
+ case CEPH_MSG_CLIENT_SNAP: return "client_snap";
case CEPH_MSG_CLIENT_LEASE: return "client_lease";
case CEPH_MSG_OSD_GETMAP: return "osd_getmap";
case CEPH_MSG_OSD_MAP: return "osd_map";
case CEPH_MSG_CLIENT_CAPS:
ceph_mdsc_handle_caps(&client->mdsc, msg);
break;
+ case CEPH_MSG_CLIENT_SNAP:
+ ceph_mdsc_handle_snap(&client->mdsc, msg);
+ break;
case CEPH_MSG_CLIENT_LEASE:
ceph_mdsc_handle_lease(&client->mdsc, msg);
break;
int implemented; /* what we've implemneted (for tracking revocation) */
u32 seq, mseq, gen;
int flags; /* stale, etc.? */
+ u64 flushed_snap;
struct ceph_inode_info *ci;
struct list_head ci_caps; /* per-ci caplist */
struct ceph_mds_session *session;
static inline struct ceph_snap_context *ceph_get_snap_context(struct ceph_snap_context *sc)
{
+	/* take a reference on sc; sc must be non-NULL (dereferenced below,
+	 * unlike the put path).  the disabled printk is a refcount-tracing
+	 * debug aid. */
+	/*
+	printk("get_snap_context %p %d -> %d\n", sc, atomic_read(&sc->nref),
+	       atomic_read(&sc->nref)+1);
+	*/
	atomic_inc(&sc->nref);
	return sc;
}
{
if (!sc)
return;
- if (atomic_dec_and_test(&sc->nref))
+ /*
+ printk("put_snap_context %p %d -> %d\n", sc, atomic_read(&sc->nref),
+ atomic_read(&sc->nref)-1);
+ */
+ if (atomic_dec_and_test(&sc->nref)) {
+ /*printk(" deleting snap_context %p\n", sc);*/
kfree(sc);
+ }
}
struct ceph_snaprealm {
extern struct ceph_snaprealm *ceph_update_snap_trace(struct ceph_client *client,
void *p, void *e,
int must_flush);
-extern int ceph_snaprealm_build_context(struct ceph_snaprealm *realm);
+extern int ceph_build_snap_context(struct ceph_snaprealm *realm);
extern void ceph_invalidate_snaprealm(struct ceph_snaprealm *realm);
extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int got);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr);
-extern void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed);
+extern void ceph_check_caps(struct ceph_inode_info *ci, int delayed, int flush);
extern void ceph_inode_set_size(struct inode *inode, loff_t size);
extern void ceph_inode_writeback(struct work_struct *work);
extern void ceph_vmtruncate_work(struct work_struct *work);
void print(ostream& out) {
out << "client_snap(" << ceph_snap_op_name(head.op);
if (head.split)
-		out << " split=" << head.split;
+		out << " split=" << inodeno_t(head.split);  // render split as an inode number
+	out << " tracelen=" << bl.length();  // length of the trailing snap trace blob
out << ")";
}