}
};
- set<event,
- member_hook<event, sh, &event::schedule_link>,
- constant_time_size<false>,
- compare<SchedCompare> > schedule;
+ typedef set<event,
+ member_hook<event, sh, &event::schedule_link>,
+ constant_time_size<false>,
+ compare<SchedCompare> > schedule_type;
- set<event,
- member_hook<event, sh, &event::event_link>,
- constant_time_size<false>,
- compare<EventCompare> > events;
+ schedule_type schedule;
+
+ typedef set<event,
+ member_hook<event, sh, &event::event_link>,
+ constant_time_size<false>,
+ compare<EventCompare> > event_set_type;
+
+ event_set_type events;
std::mutex lock;
using lock_guard = std::lock_guard<std::mutex>;
return e.id;
}
+ // Adjust the timeout of a currently-scheduled event (relative)
+ bool adjust_event(uint64_t id, typename TC::duration duration) {
+ return adjust_event(id, TC::now() + duration);
+ }
+
+ // Adjust the timeout of a currently-scheduled event (absolute)
+ bool adjust_event(uint64_t id, typename TC::time_point when) {
+ std::lock_guard<std::mutex> l(lock);
+
+ event key(id);
+ typename event_set_type::iterator it = events.find(key);
+
+ if (it == events.end())
+ return false;
+
+ event& e = *it;
+
+ schedule.erase(e);
+ e.t = when;
+ schedule.insert(e);
+
+ return true;
+ }
+
// Cancel an event. If the event has already come and gone (or you
// never submitted it) you will receive false. Otherwise you will
// receive true and it is guaranteed the event will not execute.
int rgw_fsync(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
uint32_t flags);
+/*
+ NFS commit operation
+*/
+
+#define RGW_COMMIT_FLAG_NONE 0x0000
+
+int rgw_commit(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
+ uint64_t offset, uint64_t length, uint32_t flags);
+
#ifdef __cplusplus
}
#endif
void RGWLibProcess::run()
{
+ /* start write timer */
+ RGWLibFS::write_timer.resume();
+
+ /* gc loop */
while (! shutdown) {
lsubdout(cct, rgw, 5) << "RGWLibProcess GC" << dendl;
unique_lock uniq(mtx);
atomic<uint32_t> RGWLibFS::fs_inst;
+ ceph::timer<ceph::mono_clock> RGWLibFS::write_timer{
+ ceph::construct_suspended};
+
LookupFHResult RGWLibFS::stat_bucket(RGWFileHandle* parent,
const char *path, uint32_t flags)
{
void *buffer)
{
using std::get;
+ using WriteCompletion = RGWLibFS::WriteCompletion;
lock_guard guard(mtx);
if (off != 0) {
lsubdout(fs->get_context(), rgw, 5)
<< __func__
- << object_name()
+ << " " << object_name()
<< " non-0 initial write position " << off
<< dendl;
return -EIO;
delete f->write_req;
f->write_req = nullptr;
return -EIO;
+ } else {
+ if (stateless_open()) {
+ /* start write timer */
+ f->write_req->timer_id =
+ RGWLibFS::write_timer.add_event(
+ std::chrono::seconds(10), WriteCompletion(*this));
+ }
}
}
size_t min_size = off + len;
if (min_size > get_size())
set_size(min_size);
+ if (stateless_open()) {
+ /* bump write timer */
+ RGWLibFS::write_timer.adjust_event(
+ f->write_req->timer_id, std::chrono::seconds(10));
+ }
} else {
/* continuation failed (e.g., non-contiguous write position) */
lsubdout(fs->get_context(), rgw, 5)
return rc;
} /* RGWFileHandle::write */
- int RGWFileHandle::close()
+ int RGWFileHandle::write_finish(uint32_t flags)
{
- lock_guard guard(mtx);
-
+ unique_lock guard{mtx, std::defer_lock};
int rc = 0;
+
+ if (! (flags & FLAG_LOCKED)) {
+ guard.lock();
+ }
+
file* f = get<file>(&variant_type);
if (f && (f->write_req)) {
rc = rgwlib.get_fe()->finish_req(f->write_req);
f->write_req = nullptr;
}
+ return rc;
+ } /* RGWFileHandle::write_finish */
+
+ int RGWFileHandle::close()
+ {
+ lock_guard guard(mtx);
+
+ int rc = write_finish(FLAG_LOCKED);
+
flags &= ~FLAG_OPEN;
return rc;
} /* RGWFileHandle::close */
return 0;
}
+int rgw_commit(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
+ uint64_t offset, uint64_t length, uint32_t flags)
+{
+ RGWFileHandle* rgw_fh = get_rgwfh(fh);
+
+ return rgw_fh->commit(offset, length, RGWFileHandle::FLAG_NONE);
+}
+
} /* extern "C" */
#include "include/buffer.h"
#include "common/sstring.hh"
#include "common/cohort_lru.h"
+#include "common/ceph_timer.h"
#include "rgw_common.h"
#include "rgw_user.h"
#include "rgw_lib.h"
bool is_dir() const { return (fh.fh_type == RGW_FS_TYPE_DIRECTORY); }
bool creating() const { return flags & FLAG_CREATING; }
bool deleted() const { return flags & FLAG_DELETED; }
+ bool stateless_open() const { return flags & FLAG_STATELESS_OPEN; }
uint32_t open(uint32_t gsh_flags) {
lock_guard guard(mtx);
int readdir(rgw_readdir_cb rcb, void *cb_arg, uint64_t *offset, bool *eof,
uint32_t flags);
int write(uint64_t off, size_t len, size_t *nbytes, void *buffer);
- int write_finish();
+
+ int commit(uint64_t offset, uint64_t length, uint32_t flags) {
+ return 0;
+ }
+
+ int write_finish(uint32_t flags = FLAG_NONE);
int close();
void open_for_create() {
using event_vector = /* boost::small_vector<event, 16> */
std::vector<event>;
- struct state {
+ struct WriteCompletion
+ {
+ RGWFileHandle& rgw_fh;
+
+ WriteCompletion(RGWFileHandle& _fh) : rgw_fh(_fh) {
+ rgw_fh.get_fs()->ref(&rgw_fh);
+ }
+
+ void operator()() {
+ rgw_fh.write_finish();
+ rgw_fh.get_fs()->unref(&rgw_fh);
+ }
+ };
+
+ static ceph::timer<ceph::mono_clock> write_timer;
+
+ struct State {
std::mutex mtx;
std::atomic<uint32_t> flags;
std::deque<event> events;
- state() : flags(0) {}
+
+ State() : flags(0) {}
+
void push_event(const event& ev) {
lock_guard guard(mtx);
events.push_back(ev);
} state;
friend class RGWFileHandle;
+ friend class RGWLibProcess;
public:
RGWFileHandle* rgw_fh;
RGWPutObjProcessor *processor;
buffer::list data;
+ uint64_t timer_id;
MD5 hash;
off_t real_ofs;
size_t bytes_written;