timer.shutdown();
}
-void RGWCompletionManager::complete(void *user_info)
+void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, void *user_info)
{
Mutex::Locker l(lock);
- _complete(user_info);
+ _complete(cn, user_info);
}
-void RGWCompletionManager::_complete(void *user_info)
+void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
{
+ Mutex::Locker l(lock);
+ if (cn) {
+ cns.insert(cn);
+ cn->get();
+ }
+}
+
+void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
+{
+ Mutex::Locker l(lock);
+ if (cn) {
+ cns.erase(cn);
+ cn->put();
+ }
+}
+
+void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, void *user_info)
+{
+ if (cn) {
+ cns.erase(cn);
+ cn->put();
+ }
complete_reqs.push_back(user_info);
cond.Signal();
}
void RGWCompletionManager::go_down()
{
Mutex::Locker l(lock);
+ for (auto cn : cns) {
+ cn->unregister();
+ }
going_down.set(1);
cond.Signal();
}
if (iter != waiters.end()) {
void *user_id = iter->second;
waiters.erase(iter);
- _complete(user_id);
+ _complete(NULL, user_id);
}
}
((RGWAioCompletionNotifier *)arg)->cb();
}
-RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr), user_data(_user_data) {
+RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr),
+ user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) {
c = librados::Rados::aio_create_completion((void *)this, _aio_completion_notifier_cb, NULL);
}
lock.unlock();
RGWCoroutinesStack *blocked_stack;
- while (completion_mgr.try_get_next((void **)&blocked_stack)) {
+ while (completion_mgr->try_get_next((void **)&blocked_stack)) {
handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
}
* these aren't really waiting for IOs
*/
while (blocked_count - interval_wait_count >= ops_window) {
- ret = completion_mgr.get_next((void **)&blocked_stack);
+ ret = completion_mgr->get_next((void **)&blocked_stack);
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
}
while (scheduled_stacks.empty() && blocked_count > 0) {
- ret = completion_mgr.get_next((void **)&blocked_stack);
+ ret = completion_mgr->get_next((void **)&blocked_stack);
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
}
class RGWCoroutinesStack;
class RGWCoroutinesManager;
+class RGWAioCompletionNotifier;
-class RGWCompletionManager {
+class RGWCompletionManager : public RefCountedObject {
CephContext *cct;
list<void *> complete_reqs;
+ set<RGWAioCompletionNotifier *> cns;
Mutex lock;
Cond cond;
protected:
void _wakeup(void *opaque);
- void _complete(void *user_info);
+ void _complete(RGWAioCompletionNotifier *cn, void *user_info);
public:
RGWCompletionManager(CephContext *_cct);
~RGWCompletionManager();
- void complete(void *user_info);
+ void complete(RGWAioCompletionNotifier *cn, void *user_info);
int get_next(void **user_info);
bool try_get_next(void **user_info);
*/
void wait_interval(void *opaque, const utime_t& interval, void *user_info);
void wakeup(void *opaque);
+
+ void register_completion_notifier(RGWAioCompletionNotifier *cn);
+ void unregister_completion_notifier(RGWAioCompletionNotifier *cn);
};
/* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
librados::AioCompletion *c;
RGWCompletionManager *completion_mgr;
void *user_data;
+ Mutex lock;
+ bool registered;
public:
RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data);
~RGWAioCompletionNotifier() {
c->release();
+ lock.Lock();
+ bool need_unregister = registered;
+ if (registered) {
+ completion_mgr->get();
+ }
+ registered = false;
+ lock.Unlock();
+ if (need_unregister) {
+ completion_mgr->unregister_completion_notifier(this);
+ completion_mgr->put();
+ }
}
librados::AioCompletion *completion() {
return c;
}
+ void unregister() {
+ Mutex::Locker l(lock);
+ if (!registered) {
+ return;
+ }
+ registered = false;
+ }
+
void cb() {
- completion_mgr->complete(user_data);
+ lock.Lock();
+ if (!registered) {
+ lock.Unlock();
+ put();
+ return;
+ }
+ completion_mgr->get();
+ registered = false;
+ lock.Unlock();
+ completion_mgr->complete(this, user_data);
+ completion_mgr->put();
put();
}
};
void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count);
protected:
- RGWCompletionManager completion_mgr;
+ RGWCompletionManager *completion_mgr;
RGWCoroutinesManagerRegistry *cr_registry;
int ops_window;
void put_completion_notifier(RGWAioCompletionNotifier *cn);
public:
RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"),
- completion_mgr(cct), cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
+ cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
+ completion_mgr = new RGWCompletionManager(cct);
if (cr_registry) {
cr_registry->add(this);
}
}
virtual ~RGWCoroutinesManager() {
+ completion_mgr->put();
if (cr_registry) {
cr_registry->remove(this);
}
int run(RGWCoroutine *op);
void stop() {
going_down.set(1);
- completion_mgr.go_down();
+ completion_mgr->go_down();
}
virtual void report_error(RGWCoroutinesStack *op);
RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
- RGWCompletionManager *get_completion_mgr() { return &completion_mgr; }
+ RGWCompletionManager *get_completion_mgr() { return completion_mgr; }
void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
RGWCoroutinesStack *allocate_stack();
RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
: RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
store(_store), async_rados(async_rados),
- http_manager(store->ctx(), &completion_mgr),
+ http_manager(store->ctx(), completion_mgr),
lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
initialized(false) {}
int init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger);
reqs.erase(iter);
}
if (completion_mgr) {
- completion_mgr->complete(req_data->client->get_user_info());
+ completion_mgr->complete(NULL, req_data->client->get_user_info());
}
req_data->put();
}
pinfo->marker = header.max_marker;
pinfo->last_update = header.max_time;
}
- completion_manager->complete(user_info);
+ completion_manager->complete(NULL, user_info);
put();
}
public:
RGWMetaNotifierManager(RGWRados *_store) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
- http_manager(store->ctx(), &completion_mgr) {
+ http_manager(store->ctx(), completion_mgr) {
http_manager.set_threaded();
}
public:
RGWDataNotifierManager(RGWRados *_store) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
- http_manager(store->ctx(), &completion_mgr) {
+ http_manager(store->ctx(), completion_mgr) {
http_manager.set_threaded();
}
RGWMetaSyncStatusManager *_sm)
: RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
store(_store), conn(NULL), async_rados(async_rados),
- http_manager(store->ctx(), &completion_mgr),
+ http_manager(store->ctx(), completion_mgr),
status_manager(_sm), error_logger(NULL), meta_sync_cr(NULL) {}
~RGWRemoteMetaLog();