From: Matt Benjamin Date: Tue, 16 Aug 2016 22:17:53 +0000 (-0400) Subject: rgw_file: implement stateless write completion timer X-Git-Tag: v10.2.4~98^2~13 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=697d4ef7837061bb6b5ebaad07ae86836cdbe126;p=ceph.git rgw_file: implement stateless write completion timer Implements a temporal mechanism to enforce write completion for setups which lack open state tracking (e.g., NFS3). Signed-off-by: Matt Benjamin (cherry picked from commit 2c83ed4bd43fcb5d5497151a157b1dc08022fed9) --- diff --git a/src/common/ceph_timer.h b/src/common/ceph_timer.h index ae95eb250ddb..97b0dc96865b 100644 --- a/src/common/ceph_timer.h +++ b/src/common/ceph_timer.h @@ -89,15 +89,19 @@ namespace ceph { } }; - set, - constant_time_size, - compare > schedule; + typedef set, + constant_time_size, + compare > schedule_type; - set, - constant_time_size, - compare > events; + schedule_type schedule; + + typedef set, + constant_time_size, + compare > event_set_type; + + event_set_type events; std::mutex lock; using lock_guard = std::lock_guard<std::mutex>; @@ -231,6 +235,30 @@ namespace ceph { return e.id; } + // Adjust the timeout of a currently-scheduled event (relative) + bool adjust_event(uint64_t id, typename TC::duration duration) { + return adjust_event(id, TC::now() + duration); + } + + // Adjust the timeout of a currently-scheduled event (absolute) + bool adjust_event(uint64_t id, typename TC::time_point when) { + std::lock_guard<std::mutex> l(lock); + + event key(id); + typename event_set_type::iterator it = events.find(key); + + if (it == events.end()) + return false; + + event& e = *it; + + schedule.erase(e); + e.t = when; + schedule.insert(e); + + return true; + } + // Cancel an event. If the event has already come and gone (or you // never submitted it) you will receive false. Otherwise you will // receive true and it is guaranteed the event will not execute. 
diff --git a/src/include/rados/rgw_file.h b/src/include/rados/rgw_file.h index 5fac51acba16..b8545be23e9c 100644 --- a/src/include/rados/rgw_file.h +++ b/src/include/rados/rgw_file.h @@ -299,6 +299,15 @@ int rgw_writev(struct rgw_fs *rgw_fs, int rgw_fsync(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh, uint32_t flags); +/* + NFS commit operation +*/ + +#define RGW_COMMIT_FLAG_NONE 0x0000 + +int rgw_commit(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh, + uint64_t offset, uint64_t length, uint32_t flags); + #ifdef __cplusplus } #endif diff --git a/src/rgw/librgw.cc b/src/rgw/librgw.cc index c47612907542..c129a21430b7 100644 --- a/src/rgw/librgw.cc +++ b/src/rgw/librgw.cc @@ -81,6 +81,10 @@ namespace rgw { void RGWLibProcess::run() { + /* start write timer */ + RGWLibFS::write_timer.resume(); + + /* gc loop */ while (! shutdown) { lsubdout(cct, rgw, 5) << "RGWLibProcess GC" << dendl; unique_lock uniq(mtx); diff --git a/src/rgw/rgw_file.cc b/src/rgw/rgw_file.cc index 8491b7701323..a8a2b3d0156f 100644 --- a/src/rgw/rgw_file.cc +++ b/src/rgw/rgw_file.cc @@ -38,6 +38,9 @@ namespace rgw { atomic RGWLibFS::fs_inst; + ceph::timer RGWLibFS::write_timer{ + ceph::construct_suspended}; + LookupFHResult RGWLibFS::stat_bucket(RGWFileHandle* parent, const char *path, uint32_t flags) { @@ -755,6 +758,7 @@ namespace rgw { void *buffer) { using std::get; + using WriteCompletion = RGWLibFS::WriteCompletion; lock_guard guard(mtx); @@ -783,7 +787,7 @@ namespace rgw { if (off != 0) { lsubdout(fs->get_context(), rgw, 5) << __func__ - << object_name() + << " " << object_name() << " non-0 initial write position " << off << dendl; return -EIO; @@ -806,6 +810,13 @@ namespace rgw { delete f->write_req; f->write_req = nullptr; return -EIO; + } else { + if (stateless_open()) { + /* start write timer */ + f->write_req->timer_id = + RGWLibFS::write_timer.add_event( + std::chrono::seconds(10), WriteCompletion(*this)); + } } } @@ -826,6 +837,11 @@ namespace rgw { size_t min_size = off + len; 
if (min_size > get_size()) set_size(min_size); + if (stateless_open()) { + /* bump write timer */ + RGWLibFS::write_timer.adjust_event( + f->write_req->timer_id, std::chrono::seconds(10)); + } } else { /* continuation failed (e.g., non-contiguous write position) */ lsubdout(fs->get_context(), rgw, 5) @@ -844,11 +860,15 @@ namespace rgw { return rc; } /* RGWFileHandle::write */ - int RGWFileHandle::close() + int RGWFileHandle::write_finish(uint32_t flags) { - lock_guard guard(mtx); - + unique_lock guard{mtx, std::defer_lock}; int rc = 0; + + if (! (flags & FLAG_LOCKED)) { + guard.lock(); + } + file* f = get<file>(&variant_type); if (f && (f->write_req)) { rc = rgwlib.get_fe()->finish_req(f->write_req); @@ -859,6 +879,15 @@ namespace rgw { f->write_req = nullptr; } + return rc; + } /* RGWFileHandle::write_finish */ + + int RGWFileHandle::close() + { + lock_guard guard(mtx); + + int rc = write_finish(FLAG_LOCKED); + flags &= ~FLAG_OPEN; return rc; } /* RGWFileHandle::close */ @@ -1530,4 +1559,12 @@ int rgw_fsync(struct rgw_fs *rgw_fs, struct rgw_file_handle *handle, return 0; } +int rgw_commit(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh, + uint64_t offset, uint64_t length, uint32_t flags) +{ + RGWFileHandle* rgw_fh = get_rgwfh(fh); + + return rgw_fh->commit(offset, length, RGWFileHandle::FLAG_NONE); +} + } /* extern "C" */ diff --git a/src/rgw/rgw_file.h b/src/rgw/rgw_file.h index 2d94952e877f..9b0fe98a46ea 100644 --- a/src/rgw/rgw_file.h +++ b/src/rgw/rgw_file.h @@ -28,6 +28,7 @@ #include "include/buffer.h" #include "common/sstring.hh" #include "common/cohort_lru.h" +#include "common/ceph_timer.h" #include "rgw_common.h" #include "rgw_user.h" #include "rgw_lib.h" @@ -492,6 +493,7 @@ namespace rgw { bool is_dir() const { return (fh.fh_type == RGW_FS_TYPE_DIRECTORY); } bool creating() const { return flags & FLAG_CREATING; } bool deleted() const { return flags & FLAG_DELETED; } + bool stateless_open() const { return flags & FLAG_STATELESS_OPEN; } uint32_t open(uint32_t 
gsh_flags) { lock_guard guard(mtx); @@ -508,7 +510,12 @@ namespace rgw { int readdir(rgw_readdir_cb rcb, void *cb_arg, uint64_t *offset, bool *eof, uint32_t flags); int write(uint64_t off, size_t len, size_t *nbytes, void *buffer); - int write_finish(); + + int commit(uint64_t offset, uint64_t length, uint32_t flags) { + return 0; + } + + int write_finish(uint32_t flags = FLAG_NONE); int close(); void open_for_create() { @@ -717,11 +724,29 @@ namespace rgw { using event_vector = /* boost::small_vector */ std::vector; - struct state { + struct WriteCompletion + { + RGWFileHandle& rgw_fh; + + WriteCompletion(RGWFileHandle& _fh) : rgw_fh(_fh) { + rgw_fh.get_fs()->ref(&rgw_fh); + } + + void operator()() { + rgw_fh.write_finish(); + rgw_fh.get_fs()->unref(&rgw_fh); + } + }; + + static ceph::timer write_timer; + + struct State { std::mutex mtx; std::atomic flags; std::deque<event> events; - state() : flags(0) {} + + State() : flags(0) {} + void push_event(const event& ev) { lock_guard guard(mtx); events.push_back(ev); @@ -729,6 +754,7 @@ namespace rgw { } state; friend class RGWFileHandle; + friend class RGWLibProcess; public: @@ -1932,6 +1958,7 @@ public: RGWFileHandle* rgw_fh; RGWPutObjProcessor *processor; buffer::list data; + uint64_t timer_id; MD5 hash; off_t real_ofs; size_t bytes_written;