From ce533f0d5d3624c1b179518c97bbc4c2883b6037 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 20 Aug 2008 15:20:36 -0700 Subject: [PATCH] client: delay snapflush until after data is flushed, sync writes complete --- src/client/Client.cc | 78 ++++++++++++++++++++++++++++---------- src/client/Client.h | 13 ++++--- src/mds/Locker.cc | 5 +-- src/mds/Server.cc | 2 +- src/messages/MClientCaps.h | 4 +- 5 files changed, 69 insertions(+), 33 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index f662a8d631101..dc7685bda0ba5 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -1391,7 +1391,7 @@ void Client::put_cap_ref(Inode *in, int cap) snapid_t seq = in->cap_snap_pending; dout(10) << "put_cap_ref doing cap_snap_pending " << seq << " on " << *in << dendl; in->cap_snap_pending = 0; - in->queue_cap_snap(in, seq); + queue_cap_snap(in, seq); signal_cond_list(in->waitfor_caps); // wake up blocked sync writers } if (cap & CEPH_CAP_WRBUFFER) { @@ -1491,6 +1491,16 @@ void Client::check_caps(Inode *in, bool is_delayed) } } +struct C_SnapFlush : public Context { + Client *client; + Inode *in; + snapid_t seq; + C_SnapFlush(Client *c, Inode *i, snapid_t s) : client(c), in(i), seq(s) {} + void finish(int r) { + client->_flushed_cap_snap(in, seq); + } +}; + void Client::queue_cap_snap(Inode *in, snapid_t seq) { int used = in->caps_used(); @@ -1503,20 +1513,37 @@ void Client::queue_cap_snap(Inode *in, snapid_t seq) } else if (used & CEPH_CAP_WR) { dout(10) << "queue_cap_snap WR used, marking cap_snap_pending on " << *in << dendl; in->cap_snap_pending = seq; - } else { - if (used & CEPH_CAP_WRBUFFER) { - dout(10) << "queue_cap_snap took cap_snap, WRBUFFER used, flushing data" << dendl; - in->take_cap_snap(1, seq); - if (g_conf.client_oc) - _flush(in); - } else { - dout(10) << "queue_cap_snap took cap_snap, no WR|WRBUFFER used, flushing cap snap" << dendl; - in->take_cap_snap(0, seq); + } else if (used & CEPH_CAP_WRBUFFER) { + dout(10) << "queue_cap_snap took cap_snap, WRBUFFER used, flushing data on " << *in << dendl; + in->take_cap_snap(true, seq); + in->get(); + if (g_conf.client_oc) + _flush(in, new C_SnapFlush(this, in, seq)); + else { + /* + * FIXME: in sync mode, we have no way to wait for prior writes to commit + * without getting starved. so don't wait, for now. + */ + dout(10) << "queue_cap_snap FIXME can't safely flush in sync mode, skipping" << dendl; + in->cap_snaps[seq].flushing = false; flush_snaps(in); } + } else { + dout(10) << "queue_cap_snap took cap_snap, no WR|WRBUFFER used, flushing cap snap on " << *in << dendl; + in->take_cap_snap(false, seq); + in->get(); + flush_snaps(in); } } +void Client::_flushed_cap_snap(Inode *in, snapid_t seq) +{ + dout(10) << "_flushed_cap_snap seq " << seq << " on " << *in << dendl; + assert(in->cap_snaps.count(seq)); + in->cap_snaps[seq].flushing = 0; + flush_snaps(in); +} + void Client::flush_snaps(Inode *in) { dout(10) << "flush_snaps on " << *in << dendl; @@ -1524,21 +1551,27 @@ void Client::flush_snaps(Inode *in) // pick auth mds int mds = -1; + int mseq; for (map::iterator p = in->caps.begin(); p != in->caps.end(); p++) { if (p->second->issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL)) { mds = p->first; + mseq = p->second->mseq; break; } } assert(mds >= 0); - for (map::iterator p = in->cap_snaps.begin(); p != in->cap_snaps.end(); p++) { + for (map::iterator p = in->cap_snaps.begin(); p != in->cap_snaps.end(); p++) { dout(10) << "flush_snaps mds" << mds << " follows " << p->first << " size " << p->second.size << " mtime " << p->second.mtime + << " still flushing=" << p->second.flushing << " on " << *in << dendl; - MClientCaps *m = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP, in->ino()); + if (p->second.flushing) + continue; + MClientCaps *m = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP, in->ino(), mseq); + m->head.caps = in->caps_issued(); m->head.snap_follows = p->first; m->head.size = p->second.size; p->second.mtime.encode_timeval(&m->head.mtime); @@ -1858,9 +1891,6 @@ void Client::handle_snap(MClientSnap *m) // queue for snap writeback queue_cap_snap(in); - if (g_conf.client_oc) - _flush(in); - if (in->snaprealm) { put_snap_realm(in->snaprealm); in->snaprealm_item.remove_myself(); @@ -2034,11 +2064,17 @@ void Client::handle_cap_flushedsnap(Inode *in, MClientCaps *m) assert(in->caps[mds]); snapid_t follows = m->get_snap_follows(); - dout(5) << "handle_cap_flushedsnap mds" << mds << " flushed snap follows " << follows - << " on " << *in << dendl; - assert(in->cap_snaps.count(follows)); - in->cap_snaps.erase(follows); - + if (in->cap_snaps.count(follows)) { + dout(5) << "handle_cap_flushedsnap mds" << mds << " flushed snap follows " << follows + << " on " << *in << dendl; + in->cap_snaps.erase(follows); + put_inode(in); + } else { + dout(5) << "handle_cap_flushedsnap DUP(?) mds" << mds << " flushed snap follows " << follows + << " on " << *in << dendl; + // we may not have it if we send multiple FLUSHSNAP requests and (get multiple FLUSHEDSNAPs back) + } + delete m; } @@ -3370,8 +3406,8 @@ int Client::_release(Fh *f) { //dout(3) << "op: client->close(open_files[ " << fh << " ]);" << dendl; //dout(3) << "op: open_files.erase( " << fh << " );" << dendl; - dout(5) << "_release " << f << " mode " << f->mode << dendl; Inode *in = f->inode; + dout(5) << "_release " << f << " mode " << f->mode << " on " << *in << dendl; if (in->snapid == CEPH_NOSNAP) { if (in->put_open_ref(f->mode)) { diff --git a/src/client/Client.h b/src/client/Client.h index 42caa352a4b7b..5ded3237a15d6 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -167,12 +167,12 @@ struct InodeCap { InodeCap() : issued(0), implemented(0), seq(0), mseq(0) {} }; -struct SnapCap { +struct CapSnap { //snapid_t follows; // map key __u64 size; utime_t mtime; - int flushing; - SnapCap() : flushing(0) {} + bool flushing; + CapSnap() : flushing(false) {} }; @@ -200,7 +200,7 @@ class Inode { SnapRealm *snaprealm; xlist::item snaprealm_item; Inode *snapdir_parent; // only if we are a snapdir inode - map cap_snaps; // pending flush to mds + map cap_snaps; // pending flush to mds snapid_t cap_snap_pending; //int open_by_mode[CEPH_FILE_MODE_NUM]; @@ -331,7 +331,7 @@ class Inode { return want; } - void take_cap_snap(int flushing, snapid_t seq) { + void take_cap_snap(bool flushing, snapid_t seq) { assert(snaprealm); cap_snaps[seq].size = inode.size; cap_snaps[seq].mtime = inode.mtime; @@ -867,7 +867,8 @@ protected: void check_caps(Inode *in, bool is_delayed); void put_cap_ref(Inode *in, int cap); void flush_snaps(Inode *in); - void queue_cap_snap(Inode *in); + void queue_cap_snap(Inode *in, snapid_t seq=0); + void _flushed_cap_snap(Inode *in, snapid_t seq); void _release(Inode *in, bool checkafter=true); void _flush(Inode *in, Context *onfinish=NULL); diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index 3263ae23143fc..bebbde997d389 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -978,9 +978,6 @@ void Locker::handle_client_caps(MClientCaps *m) if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) { dout(7) << " flushsnap follows " << follows << " client" << client << " on " << *in << dendl; - int had = cap->confirm_receipt(m->get_seq(), m->get_caps()); - int has = cap->confirmed(); - // this cap now follows a later snap (i.e. the one initiating this flush, or later) cap->client_follows = follows+1; @@ -993,7 +990,7 @@ void Locker::handle_client_caps(MClientCaps *m) MClientCaps *ack = new MClientCaps(CEPH_CAP_OP_FLUSHEDSNAP, in->inode, 0, 0, 0, 0, 0); ack->set_snap_follows(follows); - _do_cap_update(in, has|had, in->get_caps_wanted(), follows, m, ack); + _do_cap_update(in, m->get_caps(), in->get_caps_wanted(), follows, m, ack); } else { // for this and all subsequent versions of this inode, diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 615aaede9ae24..2839c3e0adf3d 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -426,7 +426,7 @@ void Server::handle_client_reconnect(MClientReconnect *m) inode_t fake_inode; memset(&fake_inode, 0, sizeof(fake_inode)); fake_inode.ino = p->first; - MClientCaps *stale = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first); + MClientCaps *stale = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0); //stale->head.migrate_seq = 0; // FIXME ****** mds->send_message_client(stale, m->get_source_inst()); diff --git a/src/messages/MClientCaps.h b/src/messages/MClientCaps.h index 37ae1fb250f51..bb64cbf687d11 100644 --- a/src/messages/MClientCaps.h +++ b/src/messages/MClientCaps.h @@ -82,11 +82,13 @@ class MClientCaps : public Message { head.time_warp_seq = inode.time_warp_seq; } MClientCaps(int op, - inodeno_t ino) : + inodeno_t ino, + int mseq) : Message(CEPH_MSG_CLIENT_CAPS) { memset(&head, 0, sizeof(head)); head.op = op; head.ino = ino; + head.migrate_seq = mseq; } const char *get_type_name() { return "Cfcap";} -- 2.39.5