#define dout_subsys ceph_subsys_rgw
struct rgw_http_req_data : public RefCountedObject {
- CURL *easy_handle;
- curl_slist *h;
+ CURL *easy_handle{nullptr};
+ curl_slist *h{nullptr};
uint64_t id;
- int ret;
+ int ret{0};
std::atomic<bool> done = { false };
- RGWHTTPClient *client;
- void *user_info;
- bool registered;
- RGWHTTPManager *mgr;
+ RGWHTTPClient *client{nullptr};
+ void *user_info{nullptr};
+ bool registered{false};
+ RGWHTTPManager *mgr{nullptr};
char error_buf[CURL_ERROR_SIZE];
+ bool write_paused{false};
Mutex lock;
Cond cond;
- rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
- client(nullptr), user_info(nullptr), registered(false),
- mgr(NULL), lock("rgw_http_req_data::lock") {
+ rgw_http_req_data() : id(-1), lock("rgw_http_req_data::lock") {
memset(error_buf, 0, sizeof(error_buf));
}
return ret;
}
+ void set_state(RGWHTTPRequestSetState state) {
+ Mutex::Locker l(lock);
+ CURLcode rc;
+ int bitmask;
+ switch (state) {
+ case SET_WRITE_PAUSED:
+ bitmask = CURLPAUSE_SEND;
+ break;
+ case SET_WRITE_RESUME:
+ bitmask = CURLPAUSE_CONT;
+ break;
+ default:
+ /* shouldn't really be here */
+ return;
+ }
+ rc = curl_easy_pause(easy_handle, bitmask);
+ if (rc != CURLE_OK) {
+ dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
+ }
+ }
+
void finish(int r) {
Mutex::Locker l(lock);
void * const _info)
{
RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
- int ret = client->send_data(ptr, size * nmemb);
+ bool pause = false;
+ int ret = client->send_data(ptr, size * nmemb, &pause);
if (ret < 0) {
dout(0) << "WARNING: client->send_data() returned ret="
<< ret << dendl;
}
+ if (ret == 0 &&
+ pause) {
+ return CURL_READFUNC_PAUSE;
+ }
+
return ret;
}
return 0;
}
- int ret = req_data->client->send_data(ptr, size * nmemb);
+ bool pause;
+
+ int ret = req_data->client->send_data(ptr, size * nmemb, &pause);
if (ret < 0) {
dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
}
+ if (ret == 0 &&
+ pause) {
+ req_data->write_paused = true;
+ return CURL_READFUNC_PAUSE;
+ }
+
return ret;
}
+Mutex& RGWHTTPClient::get_req_lock()
+{
+ return req_data->lock;
+}
+
+void RGWHTTPClient::_set_write_paused(bool pause)
+{
+ assert(req_data->lock.is_locked());
+
+ RGWHTTPManager *mgr = req_data->mgr;
+ if (pause == req_data->write_paused) {
+ return;
+ }
+ if (pause) {
+ mgr->set_request_state(this, SET_WRITE_PAUSED);
+ } else {
+ mgr->set_request_state(this, SET_WRITE_RESUME);
+ }
+}
+
static curl_slist *headers_to_slist(param_vec_t& headers)
{
curl_slist *h = NULL;
RGWHTTPClient::~RGWHTTPClient()
{
if (req_data) {
- RGWHTTPManager *http_manager = req_data->get_manager();
+ RGWHTTPManager *http_manager = req_data->mgr;
if (http_manager) {
http_manager->remove_request(this);
}
_complete_request(req_data);
}
+void RGWHTTPManager::_set_req_state(set_state& ss)
+{
+ ss.req->set_state(ss.state);
+}
/*
* hook request to the curl multi handle
*/
void RGWHTTPManager::manage_pending_requests()
{
reqs_lock.get_read();
- if (max_threaded_req == num_reqs && unregistered_reqs.empty()) {
+ if (max_threaded_req == num_reqs &&
+ unregistered_reqs.empty() &&
+ reqs_change_state.empty()) {
reqs_lock.unlock();
return;
}
}
}
+ if (!reqs_change_state.empty()) {
+ for (auto siter : reqs_change_state) {
+ _set_req_state(siter);
+ }
+ reqs_change_state.clear();
+ }
+
for (auto piter : remove_reqs) {
rgw_http_req_data *req_data = piter.first;
int r = piter.second;
return 0;
}
+int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state)
+{
+ rgw_http_req_data *req_data = client->get_req_data();
+
+ assert(req_data->lock.is_locked());
+
+ /* can only do that if threaded */
+ if (!is_threaded) {
+ return -EINVAL;
+ }
+
+ bool suggested_paused;
+ switch (state) {
+ case SET_WRITE_PAUSED:
+ suggested_paused = true;
+ break;
+ case SET_WRITE_RESUME:
+ suggested_paused = false;
+ break;
+ default:
+ /* shouldn't really be here */
+ return -EIO;
+ }
+ if (suggested_paused == req_data->write_paused) {
+ return 0;
+ }
+
+ req_data->write_paused = suggested_paused;
+
+ reqs_change_state.push_back(set_state(req_data, state));
+ int ret = signal_thread();
+ if (ret < 0) {
+ return ret;
+ }
+
+ return 0;
+}
+
/*
* the synchronous, non-threaded request processing method.
*/
using param_vec_t = vector<param_pair_t>;
struct rgw_http_req_data;
+class RGWHTTPManager;
class RGWHTTPClient
{
CephContext *cct;
param_vec_t headers;
+ RGWHTTPManager *get_manager();
+
int init_request(const char *method,
const char *url,
rgw_http_req_data *req_data,
virtual int receive_data(void *ptr, size_t len) {
return 0;
}
+ virtual int send_data(void *ptr, size_t len, bool *pause) {
+ return send_data(ptr, len);
+ }
virtual int send_data(void *ptr, size_t len) {
return 0;
}
size_t size,
size_t nmemb,
void *_info);
+
+ Mutex& get_req_lock();
+
+ /* needs to be called under req_lock() */
+ void _set_write_paused(bool pause);
public:
static const long HTTP_STATUS_NOSTATUS = 0;
static const long HTTP_STATUS_UNAUTHORIZED = 401;
class RGWCompletionManager;
+enum RGWHTTPRequestSetState {
+ SET_NOP = 0,
+ SET_WRITE_PAUSED = 1,
+ SET_WRITE_RESUME = 2,
+};
+
class RGWHTTPManager {
+ struct set_state {
+ rgw_http_req_data *req;
+ RGWHTTPRequestSetState state;
+
+ set_state(rgw_http_req_data *_req, RGWHTTPRequestSetState _state) : req(_req), state(_state) {}
+ };
CephContext *cct;
RGWCompletionManager *completion_mgr;
void *multi_handle;
RWLock reqs_lock;
map<uint64_t, rgw_http_req_data *> reqs;
list<rgw_http_req_data *> unregistered_reqs;
+ list<set_state> reqs_change_state;
map<uint64_t, rgw_http_req_data *> complete_reqs;
int64_t num_reqs;
int64_t max_threaded_req;
void unlink_request(rgw_http_req_data *req_data);
void finish_request(rgw_http_req_data *req_data, int r);
void _finish_request(rgw_http_req_data *req_data, int r);
+ void _set_req_state(set_state& ss);
int link_request(rgw_http_req_data *req_data);
void manage_pending_requests();
int add_request(RGWHTTPClient *client, const char *method, const char *url,
bool send_data_hint = false);
int remove_request(RGWHTTPClient *client);
+ int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
/* only for non threaded case */
int process_requests(bool wait_for_data, bool *done);
return len;
}
-int RGWRESTStreamRWRequest::send_data(void *ptr, size_t len)
+void RGWRESTStreamRWRequest::add_send_data(bufferlist& bl)
{
+ Mutex::Locker req_locker(get_req_lock());
+ Mutex::Locker wl(write_lock);
+ outbl.claim_append(bl);
+ _set_write_paused(false);
+}
+
+int RGWRESTStreamRWRequest::send_data(void *ptr, size_t len, bool *pause)
+{
+ Mutex::Locker wl(write_lock);
+
if (outbl.length() == 0) {
+ *pause = true;
return 0;
}
- uint64_t send_size = min(len, (size_t)(outbl.length() - write_ofs));
+ len = std::min(len, (size_t)outbl.length());
+
+ bufferlist bl;
+ outbl.splice(0, len, &bl);
+ uint64_t send_size = bl.length();
if (send_size > 0) {
- memcpy(ptr, outbl.c_str() + write_ofs, send_size);
+ memcpy(ptr, bl.c_str(), send_size);
write_ofs += send_size;
}
return send_size;
class RGWRESTStreamRWRequest : public RGWRESTSimpleRequest {
Mutex lock;
+ Mutex write_lock;
RGWGetDataCB *cb;
bufferlist outbl;
bufferlist in_data;
- size_t chunk_ofs;
- size_t ofs;
+ size_t chunk_ofs{0};
+ size_t ofs{0};
RGWHTTPManager http_manager;
const char *method;
- uint64_t write_ofs;
+ uint64_t write_ofs{0};
+ bool send_paused{false};
protected:
int handle_header(const string& name, const string& val) override;
public:
- int send_data(void *ptr, size_t len) override;
+ int send_data(void *ptr, size_t len, bool *pause) override;
int receive_data(void *ptr, size_t len) override;
RGWRESTStreamRWRequest(CephContext *_cct, const char *_method, const string& _url, RGWGetDataCB *_cb,
param_vec_t *_headers, param_vec_t *_params) : RGWRESTSimpleRequest(_cct, _url, _headers, _params),
- lock("RGWRESTStreamReadRequest"), cb(_cb),
- chunk_ofs(0), ofs(0), http_manager(_cct), method(_method), write_ofs(0) {
+ lock("RGWRESTStreamRWRequest"), write_lock("RGWRESTStreamRWRequest::write_lock"), cb(_cb),
+ http_manager(_cct), method(_method) {
}
virtual ~RGWRESTStreamRWRequest() override {}
int send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr = NULL);
}
void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; }
+
+ void add_send_data(bufferlist& bl);
};
class RGWRESTStreamReadRequest : public RGWRESTStreamRWRequest {