int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
unsigned iovcnt, int64_t offset,
bool write, bool clamp_to_int,
- Context *onfinish, bufferlist *blp)
+ Context *onfinish, bufferlist *blp,
+ bool do_fsync, bool syncdataonly)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
}
if (write) {
- int64_t w = _write(fh, offset, totallen, NULL, iov, iovcnt, onfinish);
+ int64_t w = _write(fh, offset, totallen, NULL, iov, iovcnt, onfinish, do_fsync, syncdataonly);
ldout(cct, 3) << "pwritev(" << fh << ", \"...\", " << totallen << ", " << offset << ") = " << w << dendl;
return w;
} else {
delete this;
}
+/**
+ * Completion hook for the fsync leg of a write that requested an fsync.
+ *
+ * Stores the fsync result, marks the fsync leg as finished and re-runs
+ * try_complete(). When try_complete() reports that onfinish has been
+ * completed, this finisher destroys itself.
+ *
+ * @param r result delivered through the CWF_fsync_finish callback
+ */
+void Client::C_Write_Finisher::finish_fsync(int r)
+{
+  client_t const whoami = clnt->whoami; // For the benefit of ldout prefix
+
+  ldout(clnt->cct, 3) << "finish_fsync r = " << r << dendl;
+
+  fsync_r = r;
+  fsync_finished = true;
+
+  if (try_complete()) {
+    // try_complete() has fired onfinish; no member may be touched after this
+    delete this;
+  }
+}
+
bool Client::C_Write_Finisher::try_complete()
{
- if (onuninlinefinished && iofinished) {
+ client_t const whoami = clnt->whoami; // For the benefit of ldout prefix
+ // Returns true once onfinish has been completed (finish_fsync() then deletes this finisher); returns false while a leg is still pending.
+ ldout(clnt->cct, 19) << "C_Write_Finisher::try_complete this " << this
+ << " onuninlinefinished " << onuninlinefinished
+ << " iofinished " << iofinished
+ << " iofinished_r " << iofinished_r
+ << " fsync_finished " << fsync_finished
+ << dendl;
+ // Two phases: first start the optional fsync; only after it has reported back is the caller completed.
+ if (onuninlinefinished && iofinished && !fsync_finished && iofinished_r >= 0) {
+ // I/O and uninline are both done and the I/O result is non-negative, but the requested fsync has not finished yet
+ CWF_fsync_finish *fsync_f = new CWF_fsync_finish(this);
+ C_nonblocking_fsync_state *state = new C_nonblocking_fsync_state(clnt, in, syncdataonly, fsync_f);
+ // fsync_f forwards to finish_fsync(), which sets fsync_finished and calls try_complete() again
+ // Start the fsync state machine; the final completion happens on that later call, so this pass ends with "return false"
+ ldout(clnt->cct, 19) << "kickoff fsync onfinish " << onfinish << dendl;
+ state->advance(); // NOTE(review): if advance() can complete synchronously, finish_fsync() may already have deleted this object -- no member is accessed after this call
+ } else if (onuninlinefinished && iofinished) {
+ // Every leg is finished (fsync done, never requested, or skipped because the I/O result was negative): release the cap ref and report
clnt->put_cap_ref(in, CEPH_CAP_FILE_WR);
- if (onuninlinefinished_r >= 0 || onuninlinefinished_r == -CEPHFS_ECANCELED)
- onfinish->complete(iofinished_r);
- else
+ if (fsync_r < 0) { // an fsync failure is reported first
+ ldout(clnt->cct, 19) << " complete with fsync_r " << fsync_r << dendl;
+ onfinish->complete(fsync_r);
+ } else if (onuninlinefinished_r < 0 && onuninlinefinished_r != -CEPHFS_ECANCELED) { // next an uninline error, except -CEPHFS_ECANCELED which is not reported as a failure
+ ldout(clnt->cct, 19) << " complete with onuninlinefinished_r " << onuninlinefinished_r << dendl;
onfinish->complete(onuninlinefinished_r);
+ } else { // otherwise the I/O result is what the caller sees
+ ldout(clnt->cct, 19) << " complete with iofinished_r " << iofinished_r << dendl;
+ onfinish->complete(iofinished_r);
+ }
+ onfinish = nullptr; // onfinish has been completed; clear the pointer so it is not used again
return true;
}
+
return false;
}
int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
- const struct iovec *iov, int iovcnt, Context *onfinish)
+ const struct iovec *iov, int iovcnt, Context *onfinish,
+ bool do_fsync, bool syncdataonly)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
cct->_conf->client_oc &&
(have & (CEPH_CAP_FILE_BUFFER |
CEPH_CAP_FILE_LAZYIO)),
- start, f, in, fpos, offset, size));
+ start, f, in, fpos, offset, size,
+ do_fsync, syncdataonly));
cwf_iofinish->CWF = cwf.get();
}
// do not get here if non-blocking caller (onfinish != nullptr)
r = _write_success(f, start, fpos, offset, size, in);
+ if (r >= 0 && do_fsync) {
+   // Blocking caller asked for write + fsync: run the fsync inline and let
+   // an fsync error replace the (successful) write result.
+   // Forward the caller's syncdataonly choice instead of hard-coding false,
+   // matching what the non-blocking path hands to C_Write_Finisher.
+   // NOTE(review): client_lock is dropped around _fsync() here -- confirm
+   // that _fsync() does not expect the lock to be held by the caller.
+   client_lock.unlock();
+   const int64_t r1 = _fsync(f, syncdataonly);
+   client_lock.lock();
+   if (r1 < 0)
+     r = r1;
+ }
+
done:
// can not get here if non-blocking caller (onfinish != nullptr)
Context *advancer;
client_t const whoami = clnt->whoami; // For the benefit of ldout prefix
+ ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance"
+ << " progress " << progress
+ << " flush_wait " << flush_wait
+ << " flush_completed " << flush_completed
+ << " result " << result
+ << " waitfor_safe " << waitfor_safe
+ << " onfinish " << onfinish
+ << dendl;
+
ceph_assert(ceph_mutex_is_locked_by_me(clnt->client_lock));
switch (progress) {
case 0:
+ ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - case 0" << dendl;
progress = 1;
if (clnt->cct->_conf->client_oc) {
// skip and fall through
case 1:
+ ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - case 1" << dendl;
progress = 2;
if (waitfor_safe) {
// skip and fall through
case 2:
+ ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - case 2" << dendl;
if (flush_completed) {
// we waited for real reply above, now we have it... retrieve result
if (result != 0) {
// ERROR!
+ ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - ERROR!" << dendl;
break;
}
if (flush_tid <= 0) {
// DONE!
+ ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - DONE!" << dendl;
break;
}
progress = 3;
case 3:
+ case 4:
+ ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - case " << progress << dendl;
+ ldout(clnt->cct, 15) << "in->flushing_cap_tids.empty() " << in->flushing_cap_tids.empty()
+ << " in->flushing_caps " << in->flushing_caps
+ << dendl;
// do equivalent of wait_sync_caps(in, flush_tid)
if (in->flushing_caps) {
+ ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - flushing_caps" << dendl;
map<ceph_tid_t, int>::iterator it = in->flushing_cap_tids.begin();
ceph_assert(it != in->flushing_cap_tids.end());
+ ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance"
+ << " it->first " << it->first
+ << " flush_tid " << flush_tid
+ << dendl;
if (it->first <= flush_tid) {
ldout(clnt->cct, 10) << __func__ << " on " << *in << " flushing "
<< ccap_string(it->second) << " flush_tid " << flush_tid
<< " last " << it->first << dendl;
advancer = new C_nonblocking_fsync_state_advancer(clnt, this);
- clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_caps, advancer);
+ ldout(clnt->cct, 10) << "Adding onfinish " << onfinish
+ << " for C_nonblocking_fsync_state " << this
+ << dendl;
+ if (progress == 3)
+ clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_caps, advancer);
+ else
+ clnt->add_nonblocking_onfinish_to_context_list(in->waitfor_caps_pending, advancer);
// ------------ here is a state machine break point
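- // the advancer completion will resume with case 3
+ // the advancer completion will resume at the shared case 3/4 label (progress is set to 4 below)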
+ progress = 4;
return;
}
// DONE!
+ ldout(clnt->cct, 15) << "Client::C_nonblocking_fsync_state::advance - DONE!" << dendl;
}
}
delete this;
}
+/**
+ * Record the outcome of the flush and, when the state machine is parked
+ * at step 2, resume it through advance().
+ *
+ * @param r flush result; stored in `result` for advance() to inspect
+ */
+void Client::C_nonblocking_fsync_state::complete_flush(int r)
+{
+  client_t const whoami = clnt->whoami; // For the benefit of ldout prefix
+
+  ldout(clnt->cct, 15) << "complete_flush"
+                       << " r " << r
+                       << " progress " << progress
+                       << dendl;
+
+  result = r;
+  flush_completed = true;
+
+  if (progress != 2) {
+    // not parked at step 2: only the flags above are recorded
+    return;
+  }
+
+  // advance() may delete this object, so nothing follows the call
+  advance();
+}
+
+void Client::C_nonblocking_fsync_state_advancer::finish(int r)
+{
+ client_t const whoami = clnt->whoami; // For the benefit of ldout prefix
+ // Resumes the non-blocking fsync state machine; r is only logged, not passed on to advance().
+ ldout(clnt->cct, 15) << "C_nonblocking_fsync_state_advancer::finish"
+ << " r " << r
+ << dendl;
+ // client_lock must already be held by whoever completes this context; it is not taken here.
+ ceph_assert(ceph_mutex_is_locked_by_me(clnt->client_lock));
+ state->advance();
+}
+
void Client::nonblocking_fsync(Inode *in, bool syncdataonly, Context *onfinish)
{
C_nonblocking_fsync_state *state = new C_nonblocking_fsync_state(this, in, syncdataonly, onfinish);
int64_t Client::ll_preadv_pwritev(struct Fh *fh, const struct iovec *iov,
int iovcnt, int64_t offset, bool write,
- Context *onfinish, bufferlist *bl)
+ Context *onfinish, bufferlist *bl,
+ bool do_fsync, bool syncdataonly)
{
RWRef_t mref_reader(mount_state, CLIENT_MOUNTING);
if (!mref_reader.is_state_satisfied())
std::scoped_lock cl(client_lock);
return _preadv_pwritev_locked(fh, iov, iovcnt, offset, write, true,
- onfinish, bl);
+ onfinish, bl, do_fsync, syncdataonly);
}
int Client::ll_flush(Fh *fh)
int64_t ll_preadv_pwritev(struct Fh *fh, const struct iovec *iov, int iovcnt,
int64_t offset, bool write,
Context *onfinish = nullptr,
- bufferlist *blp = nullptr);
+ bufferlist *blp = nullptr,
+ bool do_fsync = false, bool syncdataonly = false); // do_fsync / syncdataonly are handed unchanged to _preadv_pwritev_locked(); both default to false so existing call sites compile unchanged
loff_t ll_lseek(Fh *fh, loff_t offset, int whence);
int ll_flush(Fh *fh);
int ll_fsync(Fh *fh, bool syncdataonly);
public:
void finish_io(int r);
void finish_onuninline(int r);
+ void finish_fsync(int r);
C_Write_Finisher(Client *clnt, Context *onfinish, bool dont_need_uninline,
bool is_file_write, utime_t start, Fh *f, Inode *in,
- uint64_t fpos, int64_t offset, uint64_t size)
+ uint64_t fpos, int64_t offset, uint64_t size,
+ bool do_fsync, bool syncdataonly) // do_fsync: request an fsync after the I/O, before onfinish is completed; syncdataonly: stored and later passed to C_nonblocking_fsync_state
: clnt(clnt), onfinish(onfinish),
is_file_write(is_file_write), start(start), f(f), in(in), fpos(fpos),
- offset(offset), size(size) {
+ offset(offset), size(size), syncdataonly(syncdataonly) {
iofinished_r = 0;
onuninlinefinished_r = 0;
+ fsync_r = 0; // stays 0 unless finish_fsync() stores a result
iofinished = false;
onuninlinefinished = dont_need_uninline;
+ fsync_finished = !do_fsync; // no fsync requested => the fsync leg counts as already finished
}
void finish(int r) override {
uint64_t fpos;
int64_t offset;
uint64_t size;
+ bool syncdataonly;
int64_t iofinished_r;
int64_t onuninlinefinished_r;
+ int64_t fsync_r;
bool iofinished;
bool onuninlinefinished;
+ bool fsync_finished;
bool try_complete();
};
}
};
+ /// Context passed as the completion of the non-blocking fsync that
+ /// C_Write_Finisher starts; it forwards the fsync result to
+ /// C_Write_Finisher::finish_fsync().
+ struct CWF_fsync_finish : public Context {
+   /// Finisher to notify; this struct never deletes it.
+   C_Write_Finisher *CWF;
+
+   /// explicit: a C_Write_Finisher* must not convert silently into a Context.
+   explicit CWF_fsync_finish(C_Write_Finisher *CWF)
+     : CWF(CWF) {}
+
+   /// @param r fsync result, handed on unchanged
+   void finish(int r) override {
+     CWF->finish_fsync(r);
+   }
+ };
+
struct C_nonblocking_fsync_state {
Client *clnt;
void advance();
- void complete_flush(int r) {
- flush_completed = true;
- result = r;
- if (progress == 2)
- advance();
- }
+ void complete_flush(int r);
};
struct C_nonblocking_fsync_state_advancer : Context {
: clnt(clnt), state(state) {
}
- void finish(int r) override {
- clnt->client_lock.lock();
- state->advance();
- clnt->client_lock.unlock();
- }
+ void finish(int r) override;
};
struct C_nonblocking_fsync_flush_finisher : Context {
}
void finish(int r) override {
- clnt->client_lock.lock();
+ ceph_assert(ceph_mutex_is_locked_by_me(clnt->client_lock)); // the completer must already hold client_lock; this callback no longer takes or releases it itself
state->complete_flush(r);
- clnt->client_lock.unlock();
}
};
int64_t _write_success(Fh *fh, utime_t start, uint64_t fpos,
int64_t offset, uint64_t size, Inode *in);
int64_t _write(Fh *fh, int64_t offset, uint64_t size, const char *buf,
- const struct iovec *iov, int iovcnt, Context *onfinish = nullptr);
+ const struct iovec *iov, int iovcnt, Context *onfinish = nullptr,
+ bool do_fsync = false, bool syncdataonly = false); // do_fsync: fsync after a successful write (inline for blocking callers, via C_Write_Finisher for non-blocking ones); syncdataonly is passed on to C_Write_Finisher
int64_t _preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
unsigned iovcnt, int64_t offset,
bool write, bool clamp_to_int,
Context *onfinish = nullptr,
- bufferlist *blp = nullptr);
+ bufferlist *blp = nullptr,
+ bool do_fsync = false, bool syncdataonly = false); // do_fsync / syncdataonly are forwarded to _write() when write is true
int _preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt,
int64_t offset, bool write, Context *onfinish = nullptr,
bufferlist *blp = nullptr);