return 0;
}
+bool RGWStreamReadHTTPResourceCRF::has_attrs()
+{
+ return got_attrs;
+}
+
+void RGWStreamReadHTTPResourceCRF::get_attrs(std::map<string, string> *attrs)
+{
+#warning need to lock in_req->headers
+ *attrs = req->get_out_headers();
+}
+
int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
{
reenter(&read_state) {
if (!in_cb->has_data()) {
yield caller->io_block(0, req->get_io_id());
}
+ got_attrs = true;
*io_pending = false;
in_cb->claim_data(out, max_size);
if (!req->is_done()) {
return 0;
}
+void RGWStreamWriteHTTPResourceCRF::set_attrs(const map<string, string>& attrs)
+{
+ for (auto h : attrs) {
+ if (h.first == "CONTENT_LENGTH") {
+ req->set_send_length(atoi(h.second.c_str()));
+ } else {
+ req->append_header(h.first, h.second);
+ }
+ }
+}
+
int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data)
{
#warning write need to throttle and block
return 0;
}
-TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
- RGWHTTPStreamRWRequest *_in_req,
- RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
- in_req(_in_req), out_req(_out_req) {}
-TestSpliceCR::~TestSpliceCR() {
- delete in_crf;
- delete out_crf;
-}
+RGWStreamSpliceCR::RGWStreamSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
+ RGWStreamReadHTTPResourceCRF *_in_crf,
+ RGWStreamWriteHTTPResourceCRF *_out_crf) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
+ in_crf(_in_crf), out_crf(_out_crf) {}
+RGWStreamSpliceCR::~RGWStreamSpliceCR() { }
-int TestSpliceCR::operate() {
+int RGWStreamSpliceCR::operate() {
reenter(this) {
- in_crf = new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager, in_req);
- out_crf = new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager, out_req);
-
{
int ret = in_crf->init();
if (ret < 0) {
}
} while (need_retry);
- dout(0) << "read " << bl.length() << " bytes" << dendl;
+ ldout(cct, 20) << "read " << bl.length() << " bytes" << dendl;
if (bl.length() == 0) {
break;
}
- if (total_read == 0) {
-#warning need to lock in_req->headers
- for (auto h : in_req->get_out_headers()) {
- if (h.first == "CONTENT_LENGTH") {
- out_req->set_send_length(atoi(h.second.c_str()));
- } else {
- out_req->append_header(h.first, h.second);
- }
- }
+ if (!in_crf->has_attrs()) {
+ /* shouldn't happen */
+ ldout(cct, 0) << "ERROR: " << __func__ << ": can't handle !in_ctf->has_attrs" << dendl;
+ return set_cr_error(-EIO);
+ }
+
+ if (!sent_attrs) {
+ map<string, string> attrs;
+ in_crf->get_attrs(&attrs);
+ out_crf->set_attrs(attrs);
int ret = out_crf->init();
if (ret < 0) {
return set_cr_error(ret);
}
+ sent_attrs = true;
}
total_read += bl.length();
return set_cr_error(ret);
}
- dout(0) << "wrote " << bl.length() << " bytes" << dendl;
+ ldout(cct, 20) << "wrote " << bl.length() << " bytes" << dendl;
} while (true);
do {
}
return 0;
}
+
+TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
+ RGWHTTPStreamRWRequest *_in_req,
+ RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
+ in_req(_in_req), out_req(_out_req) {}
+int TestSpliceCR::operate() {
+ reenter(this) {
+ in_crf = new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager, in_req);
+ out_crf = new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager, out_req);
+
+ yield call(new RGWStreamSpliceCR(cct, http_manager, in_crf, out_crf));
+
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
+ }
+
+ return 0;
+};
+
public:
virtual int init() = 0;
virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */
+ virtual bool has_attrs() = 0;
+ virtual void get_attrs(std::map<string, string> *attrs) = 0;
};
class RGWStreamWriteResourceCRF {
public:
virtual int init() = 0;
+ virtual void set_attrs(const std::map<string, string>& attrs) = 0;
virtual int write(bufferlist& data) = 0; /* reentrant */
virtual int drain_writes(bool *need_retry) = 0; /* reentrant */
};
RGWCRHTTPGetDataCB *in_cb{nullptr};
+ bool got_attrs{false};
public:
RGWStreamReadHTTPResourceCRF(CephContext *_cct,
req(_req) {}
virtual ~RGWStreamReadHTTPResourceCRF();
- int init();
+ int init() override;
int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
+ bool has_attrs() override;
+ void get_attrs(std::map<string, string> *pattrs) override;
};
class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF {
req(_req) {}
virtual ~RGWStreamWriteHTTPResourceCRF() {}
- int init();
- int write(bufferlist& data); /* reentrant */
- int drain_writes(bool *need_retry); /* reentrant */
+ int init() override;
+ void set_attrs(const std::map<string, string>& attrs) override;
+ int write(bufferlist& data) override; /* reentrant */
+ int drain_writes(bool *need_retry) override; /* reentrant */
};
-class TestSpliceCR : public RGWCoroutine {
+class RGWStreamSpliceCR : public RGWCoroutine {
CephContext *cct;
RGWHTTPManager *http_manager;
string url;
- RGWHTTPStreamRWRequest *in_req{nullptr};
- RGWHTTPStreamRWRequest *out_req{nullptr};
RGWStreamReadHTTPResourceCRF *in_crf{nullptr};
RGWStreamWriteHTTPResourceCRF *out_crf{nullptr};
bufferlist bl;
bool need_retry{false};
+ bool sent_attrs{false};
uint64_t total_read{0};
int ret{0};
+public:
+ RGWStreamSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
+ RGWStreamReadHTTPResourceCRF *_in_crf,
+ RGWStreamWriteHTTPResourceCRF *_out_crf);
+ ~RGWStreamSpliceCR();
+
+ int operate();
+};
+
+class TestSpliceCR : public RGWCoroutine {
+ CephContext *cct;
+ RGWHTTPManager *http_manager;
+ RGWHTTPStreamRWRequest *in_req{nullptr};
+ RGWHTTPStreamRWRequest *out_req{nullptr};
+ RGWStreamReadHTTPResourceCRF *in_crf{nullptr};
+ RGWStreamWriteHTTPResourceCRF *out_crf{nullptr};
public:
TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
RGWHTTPStreamRWRequest *_in_req,
RGWHTTPStreamRWRequest *_out_req);
- ~TestSpliceCR();
int operate();
};