};
-RGWStreamRWHTTPResourceCRF::~RGWStreamRWHTTPResourceCRF()
+RGWStreamReadHTTPResourceCRF::~RGWStreamReadHTTPResourceCRF()
{
delete in_cb;
}
-int RGWStreamRWHTTPResourceCRF::init()
+int RGWStreamReadHTTPResourceCRF::init()
{
env->stack->init_new_io(req);
return 0;
}
-int RGWStreamRWHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
+int RGWStreamWriteHTTPResourceCRF::init()
+{
+ env->stack->init_new_io(req);
+
+ int r = http_manager->add_request(req);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
+int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
{
reenter(&read_state) {
while (!req->is_done()) {
return 0;
}
-int RGWStreamRWHTTPResourceCRF::write(bufferlist& data)
+int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data)
{
#warning write need to throttle and block
reenter(&write_state) {
return 0;
}
-int RGWStreamRWHTTPResourceCRF::drain_writes(bool *need_retry)
+int RGWStreamWriteHTTPResourceCRF::drain_writes(bool *need_retry)
{
reenter(&drain_state) {
*need_retry = true;
return 0;
}
-TestCR::TestCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
- req(_req) {}
-TestCR::~TestCR() {
- delete crf;
-}
-
-int TestCR::operate() {
- reenter(this) {
- crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, req);
-
- {
- int ret = crf->init();
- if (ret < 0) {
- return set_cr_error(ret);
- }
- }
-
- do {
-
- bl.clear();
-
- do {
- yield {
- ret = crf->read(&bl, 4 * 1024 * 1024, &need_retry);
- if (ret < 0) {
- return set_cr_error(ret);
- }
- }
- } while (need_retry);
-
- if (retcode < 0) {
- return set_cr_error(ret);
- }
-
- dout(0) << "read " << bl.length() << " bytes" << dendl;
-
- if (bl.length() == 0) {
- break;
- }
-
- yield {
- ret = crf->write(bl);
- if (ret < 0) {
- return set_cr_error(ret);
- }
- }
-
- if (retcode < 0) {
- return set_cr_error(ret);
- }
-
- dout(0) << "wrote " << bl.length() << " bytes" << dendl;
- } while (true);
-
- do {
- yield {
- int ret = crf->drain_writes(&need_retry);
- if (ret < 0) {
- return set_cr_error(ret);
- }
- }
- } while (need_retry);
-
- return set_cr_done();
- }
- return 0;
-}
-
TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
RGWHTTPStreamRWRequest *_in_req,
RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
int TestSpliceCR::operate() {
reenter(this) {
- in_crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, in_req);
- out_crf = new RGWStreamRWHTTPResourceCRF(cct, get_env(), this, http_manager, out_req);
+ in_crf = new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager, in_req);
+ out_crf = new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager, out_req);
{
int ret = in_crf->init();
if (ret < 0) {
return set_cr_error(ret);
}
-dout(0) << __FILE__ << ":" << __LINE__ << ": headers=" << in_req->get_out_headers() << dendl;
}
total_read += bl.length();
class RGWCRHTTPGetDataCB;
-class RGWStreamRWHTTPResourceCRF {
+class RGWStreamReadResourceCRF {
+protected:
+ boost::asio::coroutine read_state;
+
+public:
+ virtual int init() = 0;
+ virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */
+};
+
+class RGWStreamWriteResourceCRF {
+protected:
+ boost::asio::coroutine write_state;
+ boost::asio::coroutine drain_state;
+
+public:
+ virtual int init() = 0;
+ virtual int write(bufferlist& data) = 0; /* reentrant */
+ virtual int drain_writes(bool *need_retry) = 0; /* reentrant */
+};
+
+class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF {
RGWCoroutinesEnv *env;
RGWCoroutine *caller;
RGWHTTPManager *http_manager;
RGWCRHTTPGetDataCB *in_cb{nullptr};
- boost::asio::coroutine read_state;
- boost::asio::coroutine write_state;
- boost::asio::coroutine drain_state;
-
public:
- RGWStreamRWHTTPResourceCRF(CephContext *_cct,
+ RGWStreamReadHTTPResourceCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
RGWCoroutine *_caller,
RGWHTTPManager *_http_manager,
caller(_caller),
http_manager(_http_manager),
req(_req) {}
- ~RGWStreamRWHTTPResourceCRF();
+ virtual ~RGWStreamReadHTTPResourceCRF();
int init();
- int read(bufferlist *data, uint64_t max, bool *need_retry); /* reentrant */
- int write(bufferlist& data); /* reentrant */
- int drain_writes(bool *need_retry); /* reentrant */
+ int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
};
-class TestCR : public RGWCoroutine {
- CephContext *cct;
+class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF {
+ RGWCoroutinesEnv *env;
+ RGWCoroutine *caller;
RGWHTTPManager *http_manager;
- string url;
- RGWHTTPStreamRWRequest *req{nullptr};
- RGWStreamRWHTTPResourceCRF *crf{nullptr};
- bufferlist bl;
- bool need_retry{false};
- int ret{0};
+
+ RGWHTTPStreamRWRequest *req;
+
public:
- TestCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_req);
- ~TestCR();
+ RGWStreamWriteHTTPResourceCRF(CephContext *_cct,
+ RGWCoroutinesEnv *_env,
+ RGWCoroutine *_caller,
+ RGWHTTPManager *_http_manager,
+ RGWHTTPStreamRWRequest *_req) : env(_env),
+ caller(_caller),
+ http_manager(_http_manager),
+ req(_req) {}
+ virtual ~RGWStreamWriteHTTPResourceCRF() {}
- int operate();
+ int init();
+ int write(bufferlist& data); /* reentrant */
+ int drain_writes(bool *need_retry); /* reentrant */
};
class TestSpliceCR : public RGWCoroutine {
string url;
RGWHTTPStreamRWRequest *in_req{nullptr};
RGWHTTPStreamRWRequest *out_req{nullptr};
- RGWStreamRWHTTPResourceCRF *in_crf{nullptr};
- RGWStreamRWHTTPResourceCRF *out_crf{nullptr};
+ RGWStreamReadHTTPResourceCRF *in_crf{nullptr};
+ RGWStreamWriteHTTPResourceCRF *out_crf{nullptr};
bufferlist bl;
bool need_retry{false};
uint64_t total_read{0};