From: Yehuda Sadeh Date: Tue, 3 Oct 2017 00:26:57 +0000 (-0700) Subject: rgw: more streaming crf abstraction X-Git-Tag: v13.1.0~270^2~87 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=bbab9e5af0c3b53eb3824c26c785a23708b02072;p=ceph.git rgw: more streaming crf abstraction Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc index 626fdcbaf419d..e179e74fb3328 100644 --- a/src/rgw/rgw_cr_rest.cc +++ b/src/rgw/rgw_cr_rest.cc @@ -86,6 +86,17 @@ int RGWStreamWriteHTTPResourceCRF::init() return 0; } +bool RGWStreamReadHTTPResourceCRF::has_attrs() +{ + return got_attrs; +} + +void RGWStreamReadHTTPResourceCRF::get_attrs(std::map *attrs) +{ +#warning need to lock in_req->headers + *attrs = req->get_out_headers(); +} + int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending) { reenter(&read_state) { @@ -94,6 +105,7 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool if (!in_cb->has_data()) { yield caller->io_block(0, req->get_io_id()); } + got_attrs = true; *io_pending = false; in_cb->claim_data(out, max_size); if (!req->is_done()) { @@ -104,6 +116,17 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool return 0; } +void RGWStreamWriteHTTPResourceCRF::set_attrs(const map& attrs) +{ + for (auto h : attrs) { + if (h.first == "CONTENT_LENGTH") { + req->set_send_length(atoi(h.second.c_str())); + } else { + req->append_header(h.first, h.second); + } + } +} + int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data) { #warning write need to throttle and block @@ -130,20 +153,14 @@ int RGWStreamWriteHTTPResourceCRF::drain_writes(bool *need_retry) return 0; } -TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, - RGWHTTPStreamRWRequest *_in_req, - RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr), - in_req(_in_req), out_req(_out_req) {} -TestSpliceCR::~TestSpliceCR() { - delete in_crf; - delete out_crf; -} +RGWStreamSpliceCR::RGWStreamSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, + RGWStreamReadHTTPResourceCRF *_in_crf, + RGWStreamWriteHTTPResourceCRF *_out_crf) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr), + in_crf(_in_crf), out_crf(_out_crf) {} +RGWStreamSpliceCR::~RGWStreamSpliceCR() { } -int TestSpliceCR::operate() { +int RGWStreamSpliceCR::operate() { reenter(this) { - in_crf = new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager, in_req); - out_crf = new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager, out_req); - { int ret = in_crf->init(); if (ret < 0) { @@ -169,25 +186,27 @@ int TestSpliceCR::operate() { } } while (need_retry); - dout(0) << "read " << bl.length() << " bytes" << dendl; + ldout(cct, 20) << "read " << bl.length() << " bytes" << dendl; if (bl.length() == 0) { break; } - if (total_read == 0) { -#warning need to lock in_req->headers - for (auto h : in_req->get_out_headers()) { - if (h.first == "CONTENT_LENGTH") { - out_req->set_send_length(atoi(h.second.c_str())); - } else { - out_req->append_header(h.first, h.second); - } - } + if (!in_crf->has_attrs()) { + /* shouldn't happen */ + ldout(cct, 0) << "ERROR: " << __func__ << ": can't handle !in_ctf->has_attrs" << dendl; + return set_cr_error(-EIO); + } + + if (!sent_attrs) { + map attrs; + in_crf->get_attrs(&attrs); + out_crf->set_attrs(attrs); int ret = out_crf->init(); if (ret < 0) { return set_cr_error(ret); } + sent_attrs = true; } total_read += bl.length(); @@ -204,7 +223,7 @@ int TestSpliceCR::operate() { return set_cr_error(ret); } - dout(0) << "wrote " << bl.length() << " bytes" << dendl; + ldout(cct, 20) << "wrote " << bl.length() << " bytes" << dendl; } while (true); do { @@ -220,3 +239,25 @@ int TestSpliceCR::operate() { } return 0; } + +TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, + RGWHTTPStreamRWRequest *_in_req, + RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr), + in_req(_in_req), out_req(_out_req) {} +int TestSpliceCR::operate() { + reenter(this) { + in_crf = new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager, in_req); + out_crf = new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager, out_req); + + yield call(new RGWStreamSpliceCR(cct, http_manager, in_crf, out_crf)); + + if (retcode < 0) { + return set_cr_error(retcode); + } + + return set_cr_done(); + } + + return 0; +}; + diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index df7c93fbfe4a6..07296a0a36fe9 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -315,6 +315,8 @@ protected: public: virtual int init() = 0; virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */ + virtual bool has_attrs() = 0; + virtual void get_attrs(std::map *attrs) = 0; }; class RGWStreamWriteResourceCRF { @@ -324,6 +326,7 @@ protected: public: virtual int init() = 0; + virtual void set_attrs(const std::map& attrs) = 0; virtual int write(bufferlist& data) = 0; /* reentrant */ virtual int drain_writes(bool *need_retry) = 0; /* reentrant */ }; @@ -337,6 +340,7 @@ class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF { RGWCRHTTPGetDataCB *in_cb{nullptr}; + bool got_attrs{false}; public: RGWStreamReadHTTPResourceCRF(CephContext *_cct, @@ -349,8 +353,10 @@ public: req(_req) {} virtual ~RGWStreamReadHTTPResourceCRF(); - int init(); + int init() override; int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */ + bool has_attrs() override; + void get_attrs(std::map *pattrs) override; }; class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF { @@ -371,28 +377,43 @@ public: req(_req) {} virtual ~RGWStreamWriteHTTPResourceCRF() {} - int init(); - int write(bufferlist& data); /* reentrant */ - int drain_writes(bool *need_retry); /* reentrant */ + int init() override; + void set_attrs(const std::map& attrs) override; + int write(bufferlist& data) override; /* reentrant */ + int drain_writes(bool *need_retry) override; /* reentrant */ }; -class TestSpliceCR : public RGWCoroutine { +class RGWStreamSpliceCR : public RGWCoroutine { CephContext *cct; RGWHTTPManager *http_manager; string url; - RGWHTTPStreamRWRequest *in_req{nullptr}; - RGWHTTPStreamRWRequest *out_req{nullptr}; RGWStreamReadHTTPResourceCRF *in_crf{nullptr}; RGWStreamWriteHTTPResourceCRF *out_crf{nullptr}; bufferlist bl; bool need_retry{false}; + bool sent_attrs{false}; uint64_t total_read{0}; int ret{0}; +public: + RGWStreamSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, + RGWStreamReadHTTPResourceCRF *_in_crf, + RGWStreamWriteHTTPResourceCRF *_out_crf); + ~RGWStreamSpliceCR(); + + int operate(); +}; + +class TestSpliceCR : public RGWCoroutine { + CephContext *cct; + RGWHTTPManager *http_manager; + RGWHTTPStreamRWRequest *in_req{nullptr}; + RGWHTTPStreamRWRequest *out_req{nullptr}; + RGWStreamReadHTTPResourceCRF *in_crf{nullptr}; + RGWStreamWriteHTTPResourceCRF *out_crf{nullptr}; public: TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_in_req, RGWHTTPStreamRWRequest *_out_req); - ~TestSpliceCR(); int operate(); };