From: Yehuda Sadeh Date: Mon, 2 Oct 2017 22:55:37 +0000 (-0700) Subject: rgw: separte stream crfs for read and write X-Git-Tag: v13.1.0~270^2~88 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d5debbb69e67825bec590c5d5524b5fd14bb5085;p=ceph.git rgw: separte stream crfs for read and write also add a base class Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc index 5a5c8e944f2c..626fdcbaf419 100644 --- a/src/rgw/rgw_cr_rest.cc +++ b/src/rgw/rgw_cr_rest.cc @@ -53,12 +53,12 @@ public: }; -RGWStreamRWHTTPResourceCRF::~RGWStreamRWHTTPResourceCRF() +RGWStreamReadHTTPResourceCRF::~RGWStreamReadHTTPResourceCRF() { delete in_cb; } -int RGWStreamRWHTTPResourceCRF::init() +int RGWStreamReadHTTPResourceCRF::init() { env->stack->init_new_io(req); @@ -74,7 +74,19 @@ int RGWStreamRWHTTPResourceCRF::init() return 0; } -int RGWStreamRWHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending) +int RGWStreamWriteHTTPResourceCRF::init() +{ + env->stack->init_new_io(req); + + int r = http_manager->add_request(req); + if (r < 0) { + return r; + } + + return 0; +} + +int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending) { reenter(&read_state) { while (!req->is_done()) { @@ -92,7 +104,7 @@ int RGWStreamRWHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *i return 0; } -int RGWStreamRWHTTPResourceCRF::write(bufferlist& data) +int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data) { #warning write need to throttle and block reenter(&write_state) { @@ -103,7 +115,7 @@ int RGWStreamRWHTTPResourceCRF::write(bufferlist& data) return 0; } -int RGWStreamRWHTTPResourceCRF::drain_writes(bool *need_retry) +int RGWStreamWriteHTTPResourceCRF::drain_writes(bool *need_retry) { reenter(&drain_state) { *need_retry = true; @@ -118,74 +130,6 @@ int RGWStreamRWHTTPResourceCRF::drain_writes(bool *need_retry) return 0; } -TestCR::TestCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr), - req(_req) {} -TestCR::~TestCR() { - delete crf; -} - -int TestCR::operate() { - reenter(this) { - crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, req); - - { - int ret = crf->init(); - if (ret < 0) { - return set_cr_error(ret); - } - } - - do { - - bl.clear(); - - do { - yield { - ret = crf->read(&bl, 4 * 1024 * 1024, &need_retry); - if (ret < 0) { - return set_cr_error(ret); - } - } - } while (need_retry); - - if (retcode < 0) { - return set_cr_error(ret); - } - - dout(0) << "read " << bl.length() << " bytes" << dendl; - - if (bl.length() == 0) { - break; - } - - yield { - ret = crf->write(bl); - if (ret < 0) { - return set_cr_error(ret); - } - } - - if (retcode < 0) { - return set_cr_error(ret); - } - - dout(0) << "wrote " << bl.length() << " bytes" << dendl; - } while (true); - - do { - yield { - int ret = crf->drain_writes(&need_retry); - if (ret < 0) { - return set_cr_error(ret); - } - } - } while (need_retry); - - return set_cr_done(); - } - return 0; -} - TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_in_req, RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr), @@ -197,8 +141,8 @@ TestSpliceCR::~TestSpliceCR() { int TestSpliceCR::operate() { reenter(this) { - in_crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, in_req); - out_crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, out_req); + in_crf = new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager, in_req); + out_crf = new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager, out_req); { int ret = in_crf->init(); @@ -244,7 +188,6 @@ int TestSpliceCR::operate() { if (ret < 0) { return set_cr_error(ret); } -dout(0) << __FILE__ << ":" << __LINE__ << ": headers=" << in_req->get_out_headers() << dendl; } total_read += bl.length(); diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index bc430b96de47..df7c93fbfe4a 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -308,7 +308,27 @@ public: class RGWCRHTTPGetDataCB; -class RGWStreamRWHTTPResourceCRF { +class RGWStreamReadResourceCRF { +protected: + boost::asio::coroutine read_state; + +public: + virtual int init() = 0; + virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */ +}; + +class RGWStreamWriteResourceCRF { +protected: + boost::asio::coroutine write_state; + boost::asio::coroutine drain_state; + +public: + virtual int init() = 0; + virtual int write(bufferlist& data) = 0; /* reentrant */ + virtual int drain_writes(bool *need_retry) = 0; /* reentrant */ +}; + +class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF { RGWCoroutinesEnv *env; RGWCoroutine *caller; RGWHTTPManager *http_manager; @@ -317,13 +337,9 @@ class RGWStreamRWHTTPResourceCRF { RGWCRHTTPGetDataCB *in_cb{nullptr}; - boost::asio::coroutine read_state; - boost::asio::coroutine write_state; - boost::asio::coroutine drain_state; - public: - RGWStreamRWHTTPResourceCRF(CephContext *_cct, + RGWStreamReadHTTPResourceCRF(CephContext *_cct, RGWCoroutinesEnv *_env, RGWCoroutine *_caller, RGWHTTPManager *_http_manager, @@ -331,28 +347,33 @@ public: caller(_caller), http_manager(_http_manager), req(_req) {} - ~RGWStreamRWHTTPResourceCRF(); + virtual ~RGWStreamReadHTTPResourceCRF(); int init(); - int read(bufferlist *data, uint64_t max, bool *need_retry); /* reentrant */ - int write(bufferlist& data); /* reentrant */ - int drain_writes(bool *need_retry); /* reentrant */ + int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */ }; -class TestCR : public RGWCoroutine { - CephContext *cct; +class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF { + RGWCoroutinesEnv *env; + RGWCoroutine *caller; RGWHTTPManager *http_manager; - string url; - RGWHTTPStreamRWRequest *req{nullptr}; - RGWStreamRWHTTPResourceCRF *crf{nullptr}; - bufferlist bl; - bool need_retry{false}; - int ret{0}; + + RGWHTTPStreamRWRequest *req; + public: - TestCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_req); - ~TestCR(); + RGWStreamWriteHTTPResourceCRF(CephContext *_cct, + RGWCoroutinesEnv *_env, + RGWCoroutine *_caller, + RGWHTTPManager *_http_manager, + RGWHTTPStreamRWRequest *_req) : env(_env), + caller(_caller), + http_manager(_http_manager), + req(_req) {} + virtual ~RGWStreamWriteHTTPResourceCRF() {} - int operate(); + int init(); + int write(bufferlist& data); /* reentrant */ + int drain_writes(bool *need_retry); /* reentrant */ }; class TestSpliceCR : public RGWCoroutine { @@ -361,8 +382,8 @@ class TestSpliceCR : public RGWCoroutine { string url; RGWHTTPStreamRWRequest *in_req{nullptr}; RGWHTTPStreamRWRequest *out_req{nullptr}; - RGWStreamRWHTTPResourceCRF *in_crf{nullptr}; - RGWStreamRWHTTPResourceCRF *out_crf{nullptr}; + RGWStreamReadHTTPResourceCRF *in_crf{nullptr}; + RGWStreamWriteHTTPResourceCRF *out_crf{nullptr}; bufferlist bl; bool need_retry{false}; uint64_t total_read{0};