timer.shutdown();
}
-void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info)
+void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
{
Mutex::Locker l(lock);
_complete(cn, io_id, user_info);
}
}
-void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info)
+void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
{
if (cn) {
cns.erase(cn);
if (iter != waiters.end()) {
void *user_id = iter->second;
waiters.erase(iter);
- _complete(NULL, -1 /* no IO id */, user_id);
+ _complete(NULL, rgw_io_id() /* no IO id */, user_id);
}
}
}
int RGWCoroutine::io_block(int ret, int64_t io_id) {
+ return io_block(ret, rgw_io_id{io_id, -1});
+}
+
+int RGWCoroutine::io_block(int ret, const rgw_io_id& io_id) {
if (stack->consume_io_finish(io_id)) {
return 0;
}
return ret;
}
-void RGWCoroutine::io_complete(int64_t io_id) {
+void RGWCoroutine::io_complete(const rgw_io_id& io_id) {
stack->io_complete(io_id);
}
completion_mgr->wakeup((void *)this);
}
-void RGWCoroutinesStack::io_complete(int64_t io_id)
+void RGWCoroutinesStack::io_complete(const rgw_io_id& io_id)
{
RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
completion_mgr->complete(nullptr, io_id, (void *)this);
((RGWAioCompletionNotifier *)arg)->cb();
}
-RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, int64_t _io_id, void *_user_data) : completion_mgr(_mgr),
+RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr),
io_id(_io_id),
user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) {
c = librados::Rados::aio_create_completion((void *)this, NULL,
void RGWCoroutinesStack::init_new_io(RGWIOProvider *io_provider)
{
io_provider->set_io_user_info((void *)this);
- io_provider->set_io_id(env->manager->get_next_io_id());
+ io_provider->assign_io(env->manager->get_io_id_provider());
}
-bool RGWCoroutinesStack::try_io_unblock(int64_t io_id)
+bool RGWCoroutinesStack::try_io_unblock(const rgw_io_id& io_id)
{
if (!can_io_unblock(io_id)) {
- io_finish_ids.insert(io_id);
+#warning io_finish_ids needs to be cleaned up when owning stack finishes
+ auto p = io_finish_ids.emplace(io_id.id, io_id);
+ auto& iter = p.first;
+ bool inserted = p.second;
+ if (!inserted) { /* could not insert, entry already existed, add channel to completion mask */
+ iter->second.channels |= io_id.channels;
+ }
return false;
}
return true;
}
-bool RGWCoroutinesStack::consume_io_finish(int64_t io_id)
+bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id& io_id)
{
- auto iter = io_finish_ids.find(io_id);
+ auto iter = io_finish_ids.find(io_id.id);
if (iter == io_finish_ids.end()) {
return false;
}
- io_finish_ids.erase(iter);
- return true;
+ int finish_mask = iter->second.channels;
+ bool found = (finish_mask & io_id.channels) != 0;
+
+ finish_mask &= ~(finish_mask & io_id.channels);
+
+ if (finish_mask == 0) {
+ io_finish_ids.erase(iter);
+ }
+ return found;
}
cr->set_sleeping(flag);
}
-void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, int64_t io_id)
+void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, const rgw_io_id& io_id)
{
RWLock::WLocker wl(lock);
cr->io_complete(io_id);
env.scheduled_stacks = &scheduled_stacks;
for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
+ RGWCompletionManager::io_completion io;
RGWCoroutinesStack *stack = *iter;
++iter;
scheduled_stacks.pop_front();
stack->run_count = 0;
}
- RGWCompletionManager::io_completion io;
while (completion_mgr->try_get_next(&io)) {
handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
}
RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
{
- RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, get_next_io_id(), (void *)stack);
+ rgw_io_id io_id{get_next_io_id(), -1};
+ RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, io_id, (void *)stack);
completion_mgr->register_completion_notifier(cn);
return cn;
}
CephContext *cct;
struct io_completion {
- int64_t io_id;
+ rgw_io_id io_id;
void *user_info;
};
list<io_completion> complete_reqs;
protected:
void _wakeup(void *opaque);
- void _complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info);
+ void _complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
public:
RGWCompletionManager(CephContext *_cct);
~RGWCompletionManager() override;
- void complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info);
+ void complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
int get_next(io_completion *io);
bool try_get_next(io_completion *io);
class RGWAioCompletionNotifier : public RefCountedObject {
librados::AioCompletion *c;
RGWCompletionManager *completion_mgr;
- int64_t io_id;
+ rgw_io_id io_id;
void *user_data;
Mutex lock;
bool registered;
public:
- RGWAioCompletionNotifier(RGWCompletionManager *_mgr, int64_t _io_id, void *_user_data);
+ RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data);
~RGWAioCompletionNotifier() override {
c->release();
lock.Lock();
void dump(Formatter *f) const;
- void init_new_io(RGWIOProvider *io_provider);
+ void init_new_io(RGWIOProvider *io_provider); /* only links the default io id */
- int io_block(int ret = 0, int64_t io_id = -1);
- void io_complete(int64_t io_id = -1);
+ int io_block(int ret = 0) {
+ return io_block(ret, -1);
+ }
+ int io_block(int ret, int64_t io_id);
+ int io_block(int ret, const rgw_io_id& io_id);
+ void io_complete() {
+ io_complete(rgw_io_id{});
+ }
+ void io_complete(const rgw_io_id& io_id);
};
ostream& operator<<(ostream& out, const RGWCoroutine& cr);
set<RGWCoroutinesStack *> blocked_by_stack;
set<RGWCoroutinesStack *> blocking_stacks;
- set<int64_t> io_finish_ids;
- int64_t io_blocked_id{-1};
+ map<int64_t, rgw_io_id> io_finish_ids;
+ rgw_io_id io_blocked_id;
bool done_flag;
bool error_flag;
void set_io_blocked(bool flag) {
blocked_flag = flag;
}
- void set_io_blocked_id(int64_t io_id) {
+ void set_io_blocked_id(const rgw_io_id& io_id) {
io_blocked_id = io_id;
}
bool is_io_blocked() {
return blocked_flag && !done_flag;
}
- bool can_io_unblock(int64_t io_id) {
- return (io_blocked_id == io_id) ||
- (io_blocked_id < 0);
+ bool can_io_unblock(const rgw_io_id& io_id) {
+ return ((io_blocked_id.id < 0) ||
+ io_blocked_id.intersects(io_id));
}
- bool try_io_unblock(int64_t io_id);
- bool consume_io_finish(int64_t io_id);
+ bool try_io_unblock(const rgw_io_id& io_id);
+ bool consume_io_finish(const rgw_io_id& io_id);
void set_interval_wait(bool flag) {
interval_wait_flag = flag;
}
int wait(const utime_t& interval);
void wakeup();
- void io_complete(int64_t io_id = -1);
+ void io_complete() {
+ io_complete(rgw_io_id{});
+ }
+ void io_complete(const rgw_io_id& io_id);
bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
RWLock lock;
+ RGWIOIDProvider io_id_provider;
+
void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
RGWCompletionManager::io_completion& io, int *waiting_count);
protected:
int64_t get_next_io_id();
void set_sleeping(RGWCoroutine *cr, bool flag);
- void io_complete(RGWCoroutine *cr, int64_t io_id = -1);
+ void io_complete(RGWCoroutine *cr, const rgw_io_id& io_id);
virtual string get_id();
void dump(Formatter *f) const;
+
+ RGWIOIDProvider& get_io_id_provider() {
+ return io_id_provider;
+ }
};
class RGWSimpleCoroutine : public RGWCoroutine {
Mutex lock;
RGWCoroutinesEnv *env;
RGWCoroutine *cr;
- int64_t io_id;
+ rgw_io_id io_id;
bufferlist data;
bufferlist extra_data;
bool got_all_extra_data{false};
public:
- RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, int64_t _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {}
+ RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, const rgw_io_id& _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {}
int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
{
{
env->stack->init_new_io(req);
- in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id());
+ in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ));
req->set_in_cb(in_cb);
int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
{
reenter(&read_state) {
+ io_read_mask = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ | RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
while (!req->is_done() ||
in_cb->has_data()) {
*io_pending = true;
if (!in_cb->has_data()) {
- yield caller->io_block(0, req->get_io_id());
+ yield caller->io_block(0, io_read_mask);
}
got_attrs = true;
if (need_extra_data() && !got_extra_data) {
yield req->finish_write();
*need_retry = !req->is_done();
while (!req->is_done()) {
- yield caller->io_block(0, req->get_io_id());
+ yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
*need_retry = !req->is_done();
}
bool got_attrs{false};
bool got_extra_data{false};
+ rgw_io_id io_read_mask;
+
protected:
rgw_rest_obj rest_obj;
int ret{0};
std::atomic<bool> done = { false };
RGWHTTPClient *client{nullptr};
- int64_t io_id{-1};
+ rgw_io_id control_io_id;
void *user_info{nullptr};
bool registered{false};
RGWHTTPManager *mgr{nullptr};
delete handles;
}
+void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
+{
+ if (id == 0) {
+ id = io_id_provider.get_next();
+ }
+}
+
/*
* the simple set of callbacks will be called on RGWHTTPClient::process()
*/
req_data->mgr = nullptr;
}
if (completion_mgr) {
- completion_mgr->complete(NULL, req_data->io_id, req_data->user_info);
+ completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
}
req_data->put();
req_data->mgr = this;
req_data->client = client;
- req_data->io_id = client->get_io_id();
+ req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
req_data->user_info = client->get_io_user_info();
register_request(req_data);
struct rgw_http_req_data;
class RGWHTTPManager;
+class RGWIOIDProvider
+{
+ std::atomic<int64_t> max = {0};
+
+public:
+ RGWIOIDProvider() {}
+ int64_t get_next() {
+ return ++max;
+ }
+};
+
+struct rgw_io_id {
+ int64_t id{0};
+ int channels{0};
+
+ rgw_io_id() {}
+ rgw_io_id(int64_t _id, int _channels) : id(_id), channels(_channels) {}
+
+ bool intersects(const rgw_io_id& rhs) {
+ return (id == rhs.id && ((channels | rhs.channels) != 0));
+ }
+};
+
class RGWIOProvider
{
+ int64_t id{-1};
+
public:
RGWIOProvider() {}
- virtual void set_io_id(int64_t _io_id) = 0;
+ void assign_io(RGWIOIDProvider& io_id_provider, int io_type = -1);
+ rgw_io_id get_io_id(int io_type) {
+ return rgw_io_id{id, io_type};
+ }
+
virtual void set_io_user_info(void *_user_info) = 0;
- virtual int64_t get_io_id() = 0;
virtual void *get_io_user_info() = 0;
};
bool has_send_len;
long http_status;
- int64_t io_id{-1};
void *user_info{nullptr};
rgw_http_req_data *req_data;
static const long HTTP_STATUS_UNAUTHORIZED = 401;
static const long HTTP_STATUS_NOTFOUND = 404;
+ static constexpr int HTTPCLIENT_IO_READ = 0x1;
+ static constexpr int HTTPCLIENT_IO_WRITE = 0x2;
+ static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;
+
virtual ~RGWHTTPClient();
explicit RGWHTTPClient(CephContext *cct,
const string& _method,
method = _method;
}
- void set_io_id(int64_t _io_id) override {
- io_id = _io_id;
- }
-
void set_io_user_info(void *_user_info) override {
user_info = _user_info;
}
- int64_t get_io_id() override {
- return io_id;
- }
-
void *get_io_user_info() override {
return user_info;
}
param_vec_t *extra_headers,
RGWHTTPManager *_mgr);
- void set_io_id(int64_t _io_id) override {
- req.set_io_id(_io_id);
- }
-
- int64_t get_io_id() override {
- return req.get_io_id();
+ rgw_io_id get_io_id(int io_type) {
+ return req.get_io_id(io_type);
}
void set_io_user_info(void *user_info) override {
param_vec_t *extra_headers,
RGWHTTPManager *_mgr);
- void set_io_id(int64_t _io_id) override {
- req.set_io_id(_io_id);
- }
-
- int64_t get_io_id() override {
- return req.get_io_id();
+ rgw_io_id get_io_id(int io_type) {
+ return req.get_io_id(io_type);
}
void set_io_user_info(void *user_info) override {