From 1c722bbcac5a344fa0fefd93b3f22305f58070a4 Mon Sep 17 00:00:00 2001 From: Matt Benjamin Date: Mon, 14 Dec 2015 22:10:29 -0500 Subject: [PATCH] librgw: wire up more of continuation Signed-off-by: Matt Benjamin --- src/rgw/librgw.cc | 10 +++++--- src/rgw/rgw_file.cc | 61 ++++++++++++++++++++++++++++++--------------- src/rgw/rgw_file.h | 24 +++++++++++++++--- src/rgw/rgw_lib.h | 12 +++++++-- 4 files changed, 78 insertions(+), 29 deletions(-) diff --git a/src/rgw/librgw.cc b/src/rgw/librgw.cc index dd64df16ddc5c..3fb7b88dcc2bf 100644 --- a/src/rgw/librgw.cc +++ b/src/rgw/librgw.cc @@ -271,9 +271,10 @@ done: return (ret < 0 ? ret : s->err.ret); } /* process_request */ -int start_continued_request(RGWLibContinuedReq* req) +int RGWLibProcess::start_request(RGWLibContinuedReq* req) { int ret = 0; + int op_ret = 0; dout(1) << "====== " << __func__ << " starting new continued request req=" << hex << req << dec @@ -366,12 +367,15 @@ int start_continued_request(RGWLibContinuedReq* req) op->pre_exec(); req->exec_start(); + op_ret = op->get_ret(); + done: - return ret; + return (ret < 0 ? ret : s->err.ret); } -int finish_continued_request(RGWLibContinuedReq* req) +int RGWLibProcess::finish_request(RGWLibContinuedReq* req) { + return 0; } int RGWLibFrontend::init() diff --git a/src/rgw/rgw_file.cc b/src/rgw/rgw_file.cc index 24755a93095d5..0ee06383b20a4 100644 --- a/src/rgw/rgw_file.cc +++ b/src/rgw/rgw_file.cc @@ -100,6 +100,43 @@ LookupFHResult RGWLibFS::stat_leaf(RGWFileHandle* parent, return fhr; } /* RGWLibFS::stat_leaf */ +int RGWFileHandle::write(uint64_t off, size_t len, size_t *bytes_written, + void *buffer) +{ + using std::get; + lock_guard guard(mtx); + + int rc = 0; + buffer::list bl; + bl.push_back( + buffer::create_static(len, static_cast(buffer))); + + file* f = get(&variant_type); + if (! f->write_req) { + /* start */ + std::string object_name = full_object_name(); + f->write_req = + new RGWWriteRequest(fs->get_context(), fs->get_user(), bucket_name(), + object_name); + rc = librgw.get_fe()->start_req(f->write_req); + } + + f->write_req->put_data(off, bl); + rc = f->write_req->exec_continue(); + + size_t min_size = off + len; + if (min_size > get_size()) + set_size(min_size); + + *bytes_written = (rc == 0) ? len : 0; + return rc; +} /* RGWFileHandle::write */ + +RGWFileHandle::file::~file() +{ + delete write_req; +} + /* librgw */ extern "C" { @@ -633,28 +670,12 @@ int rgw_write(struct rgw_fs *rgw_fs, RGWFileHandle* rgw_fh = get_rgwfh(fh); if (! rgw_fh->is_file()) - return -EINVAL; - - /* XXXX testing only */ - buffer::list bl; - bl.push_back( - buffer::create_static(length /* XXX size */, static_cast(buffer))); - - /* XXX */ - std::string oname = rgw_fh->full_object_name(); - RGWPutObjRequest req(cct, fs->get_user(), rgw_fh->bucket_name(), - oname, bl); - - int rc = librgw.get_fe()->execute_req(&req); - - /* XXX move into request */ - size_t min_size = offset+length; - if (min_size > rgw_fh->get_size()) - rgw_fh->set_size(min_size); + return -EISDIR; - *bytes_written = (rc == 0) ? req.bytes_written : 0; + if (! rgw_fh->is_open()) + return -EPERM; - return rc; + return rgw_fh->write(offset, length, bytes_written, buffer); } /* diff --git a/src/rgw/rgw_file.h b/src/rgw/rgw_file.h index d0e477876fb12..0d5858590fc32 100644 --- a/src/rgw/rgw_file.h +++ b/src/rgw/rgw_file.h @@ -47,6 +47,7 @@ namespace rgw { class RGWLibFS; class RGWFileHandle; + class RGWWriteRequest; typedef boost::intrusive_ptr RGWFHRef; @@ -148,6 +149,9 @@ namespace rgw { } state; struct file { + RGWWriteRequest* write_req; + file() : write_req(nullptr) {} + ~file(); }; struct directory { @@ -357,6 +361,9 @@ namespace rgw { return EPERM; } + int write(uint64_t off, size_t len, size_t *nbytes, void *buffer); + int write_finish(); + void close() { lock_guard guard(mtx); flags &= ~FLAG_OPEN; @@ -1454,14 +1461,15 @@ class RGWWriteRequest : public RGWLibContinuedReq, public: const std::string& bucket_name; const std::string& obj_name; - buffer::list& bl; /* XXX */ + buffer::list bl; + off_t last_off; + off_t next_off; size_t bytes_written; RGWWriteRequest(CephContext* _cct, RGWUserInfo *_user, - const std::string& _bname, const std::string& _oname, - buffer::list& _bl) + const std::string& _bname, const std::string& _oname) : RGWLibContinuedReq(_cct, _user), bucket_name(_bname), obj_name(_oname), - bl(_bl), bytes_written(0) { + last_off(0), next_off(0), bytes_written(0) { magic = 81; op = this; } @@ -1532,11 +1540,19 @@ public: return len; } + void put_data(off_t off, buffer::list& _bl) { + next_off = off; + bl.claim(_bl); + } + virtual int exec_start() { return 0; } virtual int exec_continue() { + if (next_off != last_off) + return -EIO; + /* XXX consume bl */ return 0; } diff --git a/src/rgw/rgw_lib.h b/src/rgw/rgw_lib.h index 3d570c64af923..54994707055bf 100644 --- a/src/rgw/rgw_lib.h +++ b/src/rgw/rgw_lib.h @@ -198,8 +198,8 @@ public: void set_access_key(RGWAccessKey& key) { access_key = key; } /* requests w/continue semantics */ - int start_continued_request(RGWLibContinuedReq* req); - int finish_continued_request(RGWLibContinuedReq* req); + int start_request(RGWLibContinuedReq* req); + int finish_request(RGWLibContinuedReq* req); }; /* RGWLibProcess */ class RGWLibFrontend : public RGWProcessFrontend { @@ -217,6 +217,14 @@ public: return static_cast(pprocess)->process_request(req); // !async } + inline int start_req(RGWLibContinuedReq* req) { + return static_cast(pprocess)->start_request(req); + } + + inline int finish_req(RGWLibContinuedReq* req) { + return static_cast(pprocess)->finish_request(req); + } + }; /* RGWLibFrontend */ #endif /* RGW_LIB_H */ -- 2.39.5