int64_t io_id;
bufferlist data;
bufferlist extra_data;
+ bool got_all_extra_data{false};
public:
RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, int64_t _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {}
{
Mutex::Locker l(lock);
- if (!has_all_extra_data()) {
+ if (!got_all_extra_data) {
off_t max = extra_data_len - extra_data.length();
if (max > bl_len) {
max = bl_len;
}
bl.splice(0, max, &extra_data);
bl_len -= max;
+ got_all_extra_data = extra_data.length() == extra_data_len;
}
if (bl_len == bl.length()) {
- data.claim_append(bl);
+ data.append(bl);
} else {
bl.splice(0, bl_len, &data);
}
}
bool has_all_extra_data() {
- return (extra_data.length() == extra_data_len);
+ return got_all_extra_data;
}
};
int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
{
reenter(&read_state) {
- while (!req->is_done()) {
+ while (!req->is_done() ||
+ in_cb->has_data()) {
*io_pending = true;
if (!in_cb->has_data()) {
yield caller->io_block(0, req->get_io_id());
return 0;
}
+bool RGWStreamReadHTTPResourceCRF::is_done()
+{
+ return req->is_done();
+}
+
void RGWStreamWriteHTTPResourceCRF::send_ready(const rgw_rest_obj& rest_obj)
{
req->set_send_length(rest_obj.content_len);
ldout(cct, 20) << "read " << bl.length() << " bytes" << dendl;
- if (bl.length() == 0) {
- break;
+ if (!in_crf->has_attrs()) {
+ assert (bl.length() == 0);
+ continue;
}
- if (!in_crf->has_attrs()) {
- /* shouldn't happen */
- ldout(cct, 0) << "ERROR: " << __func__ << ": can't handle !in_ctf->has_attrs" << dendl;
- return set_cr_error(-EIO);
+ if (bl.length() == 0 && in_crf->is_done()) {
+ break;
}
if (!sent_attrs) {
int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info);
bool has_attrs() override;
void get_attrs(std::map<string, string> *attrs);
+ bool is_done();
virtual bool need_extra_data() { return false; }
void set_req(RGWHTTPStreamRWRequest *r) {