cns.erase(cn);
}
+#warning shouldn't have more than one entry in complete_reqs per io_id
complete_reqs.push_back(io_completion{io_id, user_info});
cond.Signal();
}
{
env->stack->init_new_io(req);
- in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ));
+ in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ |RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
req->set_in_cb(in_cb);
{
env->stack->init_new_io(req);
+ req->set_write_drain_cb(&write_drain_notify_cb);
+
int r = http_manager->add_request(req);
if (r < 0) {
return r;
*/
continue;
}
- if (!req->is_done()) {
+ if (!req->is_done() || out->length() >= max_size) {
yield;
}
}
}
}
-int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data)
+#define PENDING_WRITES_WINDOW (1 * 1024 * 1024)
+
+void RGWStreamWriteHTTPResourceCRF::write_drain_notify(uint64_t pending_size)
+{
+ lock_guard l(blocked_lock);
+ if (is_blocked && (pending_size < PENDING_WRITES_WINDOW / 2)) {
+ env->manager->io_complete(caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE | RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
+ is_blocked = false;
+ }
+}
+
+void RGWStreamWriteHTTPResourceCRF::WriteDrainNotify::notify(uint64_t pending_size)
+{
+ crf->write_drain_notify(pending_size);
+}
+
+int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data, bool *io_pending)
{
-#warning write need to throttle and block
reenter(&write_state) {
while (!req->is_done()) {
+ *io_pending = false;
+ if (req->get_pending_send_size() >= PENDING_WRITES_WINDOW) {
+ *io_pending = true;
+ {
+ lock_guard l(blocked_lock);
+ is_blocked = true;
+
+ /* it's ok to unlock here, even if io_complete() arrives before io_block(), it'll wakeup
+ * correctly */
+ }
+ yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ | RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
+ }
yield req->add_send_data(data);
}
}
total_read += bl.length();
- yield {
- ldout(cct, 20) << "writing " << bl.length() << " bytes" << dendl;
- ret = out_crf->write(bl);
- if (ret < 0) {
- return set_cr_error(ret);
+ do {
+ yield {
+ ldout(cct, 20) << "writing " << bl.length() << " bytes" << dendl;
+ ret = out_crf->write(bl, &need_retry);
+ if (ret < 0) {
+ return set_cr_error(ret);
+ }
}
- }
- if (retcode < 0) {
- ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl;
- return set_cr_error(ret);
- }
+ if (retcode < 0) {
+ ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl;
+ return set_cr_error(ret);
+ }
+ } while (need_retry);
} while (true);
do {
#define CEPH_RGW_CR_REST_H
#include <boost/intrusive_ptr.hpp>
+#include <mutex>
#include "include/assert.h" // boost header clobbers our assert.h
#include "rgw_coroutine.h"
virtual int init() = 0;
virtual void send_ready(const rgw_rest_obj& rest_obj) = 0;
virtual int send() = 0;
- virtual int write(bufferlist& data) = 0; /* reentrant */
+ virtual int write(bufferlist& data, bool *need_retry) = 0; /* reentrant */
virtual int drain_writes(bool *need_retry) = 0; /* reentrant */
};
RGWCoroutine *caller;
RGWHTTPManager *http_manager;
+ using lock_guard = std::lock_guard<std::mutex>;
+
+ std::mutex blocked_lock;
+ bool is_blocked;
+
RGWHTTPStreamRWRequest *req{nullptr};
struct multipart_info {
uint64_t part_size;
} multipart;
+ class WriteDrainNotify : public RGWWriteDrainCB {
+ RGWStreamWriteHTTPResourceCRF *crf;
+ public:
+ WriteDrainNotify(RGWStreamWriteHTTPResourceCRF *_crf) : crf(_crf) {}
+ void notify(uint64_t pending_size) override;
+ } write_drain_notify_cb;
+
public:
RGWStreamWriteHTTPResourceCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
RGWCoroutine *_caller,
RGWHTTPManager *_http_manager) : env(_env),
caller(_caller),
- http_manager(_http_manager) {}
+ http_manager(_http_manager),
+ write_drain_notify_cb(this) {}
virtual ~RGWStreamWriteHTTPResourceCRF() {}
int init() override {
}
void send_ready(const rgw_rest_obj& rest_obj) override;
int send() override;
- int write(bufferlist& data) override; /* reentrant */
+ int write(bufferlist& data, bool *need_retry) override; /* reentrant */
+ void write_drain_notify(uint64_t pending_size);
int drain_writes(bool *need_retry) override; /* reentrant */
virtual void handle_headers(const std::map<string, string>& headers) {}
_set_write_paused(false);
}
+uint64_t RGWHTTPStreamRWRequest::get_pending_send_size()
+{
+ Mutex::Locker wl(write_lock);
+ return outbl.length();
+}
+
void RGWHTTPStreamRWRequest::finish_write()
{
Mutex::Locker req_locker(get_req_lock());
int RGWHTTPStreamRWRequest::send_data(void *ptr, size_t len, bool *pause)
{
- Mutex::Locker wl(write_lock);
+ uint64_t out_len;
+ uint64_t send_size;
+ {
+ Mutex::Locker wl(write_lock);
- if (outbl.length() == 0) {
- if (stream_writes && !write_stream_complete) {
- *pause = true;
+ if (outbl.length() == 0) {
+ if (stream_writes && !write_stream_complete) {
+ *pause = true;
+ }
+ return 0;
}
- return 0;
- }
- len = std::min(len, (size_t)outbl.length());
+ len = std::min(len, (size_t)outbl.length());
+
+ bufferlist bl;
+ outbl.splice(0, len, &bl);
+ send_size = bl.length();
+ if (send_size > 0) {
+ memcpy(ptr, bl.c_str(), send_size);
+ write_ofs += send_size;
+ }
- bufferlist bl;
- outbl.splice(0, len, &bl);
- uint64_t send_size = bl.length();
- if (send_size > 0) {
- memcpy(ptr, bl.c_str(), send_size);
- write_ofs += send_size;
+ out_len = outbl.length();
+ }
+ /* don't need to be under write_lock here, avoid deadlocks in case notify callback
+ * needs to lock */
+ if (write_drain_cb) {
+ write_drain_cb->notify(out_len);
}
return send_size;
}
int forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl);
};
+class RGWWriteDrainCB {
+public:
+ RGWWriteDrainCB() = default;
+ virtual ~RGWWriteDrainCB() = default;
+ virtual void notify(uint64_t pending_size) = 0;
+};
+
class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest {
Mutex lock;
Mutex write_lock;
RGWGetDataCB *cb{nullptr};
+ RGWWriteDrainCB *write_drain_cb{nullptr};
bufferlist outbl;
bufferlist in_data;
size_t chunk_ofs{0};
}
void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; }
+ void set_write_drain_cb(RGWWriteDrainCB *_cb) { write_drain_cb = _cb; }
void add_send_data(bufferlist& bl);
void set_stream_write(bool s);
+ uint64_t get_pending_send_size();
+
/* finish streaming writes */
void finish_write();
};