Mutex lock;
RGWCoroutinesEnv *env;
RGWCoroutine *cr;
+ RGWHTTPStreamRWRequest *req;
rgw_io_id io_id;
bufferlist data;
bufferlist extra_data;
bool got_all_extra_data{false};
+ bool paused{false};
public:
- RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, const rgw_io_id& _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {}
+ RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, RGWHTTPStreamRWRequest *_req) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), req(_req) {
+ io_id = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ |RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
+ req->set_in_cb(this);
+ }
int handle_data(bufferlist& bl, bool *pause) override {
{
data.append(bl);
}
-#define GET_DATA_WINDOW_SIZE 1 * 1024 * 1024
- if (data.length() >= GET_DATA_WINDOW_SIZE) {
+#define GET_DATA_WINDOW_SIZE 2 * 1024 * 1024
+ uint64_t data_len = data.length();
+ if (data_len >= GET_DATA_WINDOW_SIZE) {
env->manager->io_complete(cr, io_id);
}
+ if (data_len >= 2 * GET_DATA_WINDOW_SIZE) {
+ *pause = true;
+ paused = true;
+ }
return 0;
}
void claim_data(bufferlist *dest, uint64_t max) {
- Mutex::Locker l(lock);
+ bool need_to_unpause = false;
- if (data.length() == 0) {
- return;
- }
+ {
+ Mutex::Locker l(lock);
- if (data.length() < max) {
- max = data.length();
+ if (data.length() == 0) {
+ return;
+ }
+
+ if (data.length() < max) {
+ max = data.length();
+ }
+
+ data.splice(0, max, dest);
+ need_to_unpause = (paused && data.length() <= GET_DATA_WINDOW_SIZE);
}
- data.splice(0, max, dest);
+ if (need_to_unpause) {
+ req->unpause_receive();
+ }
}
bufferlist& get_extra_data() {
{
env->stack->init_new_io(req);
- in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ |RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
-
- req->set_in_cb(in_cb);
+ in_cb = new RGWCRHTTPGetDataCB(env, caller, req);
int r = http_manager->add_request(req);
if (r < 0) {
/* it's ok to unlock here, even if io_complete() arrives before io_block(), it'll wakeup
* correctly */
}
- yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ | RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
+ yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE | RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
}
yield req->add_send_data(data);
}
return ret;
}
- void set_state(int bitmask) {
- Mutex::Locker l(lock);
- CURLcode rc;
- int bitmask;
- switch (state) {
- case SET_WRITE_PAUSED:
- bitmask = CURLPAUSE_SEND;
- break;
- case SET_WRITE_RESUME:
- bitmask = CURLPAUSE_CONT;
- break;
- default:
- /* shouldn't really be here */
- return;
- }
-
- rc = curl_easy_pause(**curl_handle, bitmask);
- if (rc != CURLE_OK) {
- dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
- }
- }
-
+ void set_state(int bitmask);
void finish(int r) {
Mutex::Locker l(lock);
bool pause = false;
- size_t skip_bytes = req_data->client->receive_pause_skip;
+ size_t& skip_bytes = req_data->client->receive_pause_skip;
if (skip_bytes >= len) {
skip_bytes -= len;
skip_bytes = 0;
if (pause) {
+ dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl;
skip_bytes = len;
+ req_data->read_paused = true;
return CURL_WRITEFUNC_PAUSE;
}
return -EINVAL;
}
- bool suggested_wr_paused;
- bool suggested_rd_paused;
+ bool suggested_wr_paused = req_data->write_paused;
+ bool suggested_rd_paused = req_data->read_paused;
+
switch (state) {
case SET_WRITE_PAUSED:
suggested_wr_paused = true;
bufferlist send_bl;
bufferlist::iterator send_iter;
- size_t send_len;
bool has_send_len;
long http_status;
size_t receive_pause_skip{0}; /* how many bytes to skip next time receive_data is called
string method;
string url;
+ size_t send_len{0};
+
param_vec_t headers;
RGWHTTPManager *get_manager();
explicit RGWHTTPClient(CephContext *cct,
const string& _method,
const string& _url)
- : send_len(0),
- has_send_len(false),
+ : has_send_len(false),
http_status(HTTP_STATUS_NOSTATUS),
req_data(nullptr),
verify_ssl(cct->_conf->rgw_verify_ssl),
size_t orig_len = len;
if (cb) {
- bufferptr bp((const char *)ptr, len);
- in_data.append(bp);
+ in_data.append((const char *)ptr, len);
+
+ size_t orig_in_data_len = in_data.length();
+
int ret = cb->handle_data(in_data, pause);
if (ret < 0)
return ret;
in_data.clear();
} else {
/* partial read */
+ assert(in_data.length() <= orig_in_data_len);
len = ret;
bufferlist bl;
- in_data.splice(0, len, &bl);
+ size_t left_to_read = orig_in_data_len - len;
+ if (in_data.length() > left_to_read) {
+ in_data.splice(0, in_data.length() - left_to_read, &bl);
+ }
}
}
ofs += len;
stream_writes = s;
}
+void RGWHTTPStreamRWRequest::unpause_receive()
+{
+ Mutex::Locker req_locker(get_req_lock());
+ if (!read_paused) {
+ _set_read_paused(false);
+ }
+}
+
void RGWHTTPStreamRWRequest::add_send_data(bufferlist& bl)
{
Mutex::Locker req_locker(get_req_lock());
Mutex::Locker wl(write_lock);
if (outbl.length() == 0) {
- if (stream_writes && !write_stream_complete) {
+ if ((stream_writes && !write_stream_complete) ||
+ (write_ofs < send_len)) {
*pause = true;
}
return 0;
void set_in_cb(ReceiveCB *_cb) { cb = _cb; }
void set_write_drain_cb(RGWWriteDrainCB *_cb) { write_drain_cb = _cb; }
+ void unpause_receive();
+
void add_send_data(bufferlist& bl);
void set_stream_write(bool s);