RGWHTTPManager *mgr{nullptr};
char error_buf[CURL_ERROR_SIZE];
bool write_paused{false};
+ bool read_paused{false};
Mutex lock;
Cond cond;
return ret;
}
- void set_state(RGWHTTPRequestSetState state) {
+ void set_state(int bitmask) {
Mutex::Locker l(lock);
CURLcode rc;
int bitmask;
/* shouldn't really be here */
return;
}
+
rc = curl_easy_pause(**curl_handle, bitmask);
if (rc != CURLE_OK) {
dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
if (pause) {
skip_bytes = len;
- } else {
- skip_bytes = 0;
+ return CURL_WRITEFUNC_PAUSE;
}
+ skip_bytes = 0;
+
return len;
}
}
}
+void RGWHTTPClient::_set_read_paused(bool pause)
+{
+ assert(req_data->lock.is_locked());
+
+ RGWHTTPManager *mgr = req_data->mgr;
+ if (pause == req_data->read_paused) {
+ return;
+ }
+ if (pause) {
+ mgr->set_request_state(this, SET_READ_PAUSED);
+ } else {
+ mgr->set_request_state(this, SET_READ_RESUME);
+ }
+}
+
static curl_slist *headers_to_slist(param_vec_t& headers)
{
curl_slist *h = NULL;
void RGWHTTPManager::_set_req_state(set_state& ss)
{
- ss.req->set_state(ss.state);
+ ss.req->set_state(ss.bitmask);
}
/*
* hook request to the curl multi handle
return -EINVAL;
}
- bool suggested_paused;
+ bool suggested_wr_paused;
+ bool suggested_rd_paused;
switch (state) {
case SET_WRITE_PAUSED:
- suggested_paused = true;
+ suggested_wr_paused = true;
break;
case SET_WRITE_RESUME:
- suggested_paused = false;
+ suggested_wr_paused = false;
+ break;
+ case SET_READ_PAUSED:
+ suggested_rd_paused = true;
+ break;
+ case SET_READ_RESUME:
+ suggested_rd_paused = false;
break;
default:
/* shouldn't really be here */
return -EIO;
}
- if (suggested_paused == req_data->write_paused) {
+ if (suggested_wr_paused == req_data->write_paused &&
+ suggested_rd_paused == req_data->read_paused) {
return 0;
}
- req_data->write_paused = suggested_paused;
+ req_data->write_paused = suggested_wr_paused;
+ req_data->read_paused = suggested_rd_paused;
+
+ int bitmask = CURLPAUSE_CONT;
+
+ if (req_data->write_paused) {
+ bitmask |= CURLPAUSE_SEND;
+ }
+
+ if (req_data->read_paused) {
+ bitmask |= CURLPAUSE_RECV;
+ }
- reqs_change_state.push_back(set_state(req_data, state));
+ reqs_change_state.push_back(set_state(req_data, bitmask));
int ret = signal_thread();
if (ret < 0) {
return ret;
/* needs to be called under req_lock() */
void _set_write_paused(bool pause);
+ void _set_read_paused(bool pause);
public:
static const long HTTP_STATUS_NOSTATUS = 0;
static const long HTTP_STATUS_UNAUTHORIZED = 401;
SET_NOP = 0,
SET_WRITE_PAUSED = 1,
SET_WRITE_RESUME = 2,
+ SET_READ_PAUSED = 3,
+ SET_READ_RESUME = 4,
};
class RGWHTTPManager {
struct set_state {
rgw_http_req_data *req;
- RGWHTTPRequestSetState state;
+ int bitmask;
- set_state(rgw_http_req_data *_req, RGWHTTPRequestSetState _state) : req(_req), state(_state) {}
+ set_state(rgw_http_req_data *_req, int _bitmask) : req(_req), bitmask(_bitmask) {}
};
CephContext *cct;
RGWCompletionManager *completion_mgr;