};
+template <class T>
+class RGWSingletonCR : public RGWCoroutine {
+ friend class WrapperCR;
+
+ boost::asio::coroutine wrapper_state;
+ bool started{false};
+ int operate_ret{0};
+
+ struct WaiterInfo {
+ RGWCoroutine *cr{nullptr};
+ T *result;
+ };
+ using WaiterInfoRef = std::shared_ptr<WaiterInfo>;
+
+ deque<WaiterInfoRef> waiters;
+
+ void add_waiter(RGWCoroutine *cr, T *result) {
+ auto waiter = std::make_shared<WaiterInfo>();
+ waiter->cr = cr;
+ waiter->result = result;
+ waiters.push_back(waiter);
+ };
+
+ bool get_next_waiter(WaiterInfoRef *waiter) {
+ if (waiters.empty()) {
+ waiter->reset();
+ return false;
+ }
+
+ *waiter = waiters.front();
+ waiters.pop_front();
+ return true;
+ }
+
+ int operate_wrapper() override {
+ reenter(&wrapper_state) {
+ while (!is_done()) {
+ ldout(cct, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl;
+ operate_ret = operate();
+ if (operate_ret < 0) {
+ ldout(cct, 20) << *this << ": operate() returned r=" << operate_ret << dendl;
+ }
+ if (!is_done()) {
+ yield;
+ }
+ }
+
+ ldout(cct, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl;
+ /* we're done, can't yield anymore */
+
+ WaiterInfoRef waiter;
+ while (get_next_waiter(&waiter)) {
+ ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl;
+ waiter->cr->set_retcode(retcode);
+ waiter->cr->set_sleeping(false);
+ return_result(waiter->result);
+ put();
+ }
+
+ return retcode;
+ }
+ return 0;
+ }
+
+ virtual void return_result(T *result) {}
+
+public:
+ RGWSingletonCR(CephContext *_cct)
+ : RGWCoroutine(_cct) {}
+
+ int execute(RGWCoroutine *caller, T *result = nullptr) {
+ if (!started) {
+ ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl;
+ started = true;
+ caller->call(this);
+ return 0;
+ } else if (!is_done()) {
+ ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl;
+ get();
+ add_waiter(caller, result);
+ caller->set_sleeping(true);
+ return 0;
+ }
+
+ ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl;
+ caller->set_retcode(retcode);
+ return_result(result);
+ return retcode;
+ }
+};
+
+
class PSSubscription;
using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
RGWDataAccessRef data_access;
RGWDataAccess::BucketRef bucket;
-public:
+ class InitCR;
+ InitCR *init_cr{nullptr};
- class InitCR : public RGWCoroutine {
+ class InitCR : public RGWSingletonCR<bool> {
RGWDataSyncEnv *sync_env;
PSSubscriptionRef sub;
rgw_get_bucket_info_params get_bucket_info;
int i;
public:
InitCR(RGWDataSyncEnv *_sync_env,
- PSSubscriptionRef& _sub) : RGWCoroutine(_sync_env->cct),
+ PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sync_env->cct),
sync_env(_sync_env),
sub(_sub), conf(sub->env->conf),
sub_conf(sub->sub_conf) {
<< get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
return set_cr_error(retcode);
}
-ldout(sync_env->cct, 20) << "pubsub: bucket create: after user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
/* second iteration: we got -ENOENT and created a bucket */
}
}
};
+public:
+
PSSubscription(RGWDataSyncEnv *_sync_env,
PSEnvRef _env,
PSSubConfigRef& _sub_conf) : sync_env(_sync_env),
env(_env),
sub_conf(_sub_conf),
data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {}
+ virtual ~PSSubscription() {
+ if (init_cr) {
+ init_cr->put();
+ }
+ }
public:
static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env,
PSEnvRef _env,
PSSubConfigRef& _sub_conf) {
auto sub = new PSSubscription(_sync_env, _env, _sub_conf);
sub->self = std::shared_ptr<PSSubscription>(sub);
+ sub->init_cr = new InitCR(_sync_env, sub->self);
+ sub->init_cr->get();
return sub->self;
}
- RGWCoroutine *init_cr() {
- return new InitCR(sync_env, self);
+ int call_init_cr(RGWCoroutine *caller) {
+ return init_cr->execute(caller);
}
RGWCoroutine *store_event_cr(EventRef& event) {
map<string, PSSubscriptionRef> subs;
- class GetSubCR : public RGWCoroutine {
+ class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> {
RGWDataSyncEnv *sync_env;
PSManagerRef mgr;
const string& sub_name;
GetSubCR(RGWDataSyncEnv *_sync_env,
PSManagerRef& _mgr,
const string& _sub_name,
- PSSubscriptionRef *_ref) : RGWCoroutine(_sync_env->cct),
+ PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sync_env->cct),
sync_env(_sync_env),
mgr(_mgr),
sub_name(_sub_name),
ref(_ref),
conf(mgr->env->conf) {
}
+ ~GetSubCR() {
+ mgr->remove_get_sub(sub_name);
+ }
int operate() override {
reenter(this) {
*ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf);
- yield call((*ref)->init_cr());
+ yield (*ref)->call_init_cr(this);
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to init subscription" << dendl;
- ref->reset();
return set_cr_error(retcode);
}
mgr->subs[sub_name] = *ref;
+
return set_cr_done();
}
return 0;
}
+
+ void return_result(PSSubscriptionRef *result) override {
+ ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl;
+ if (retcode >= 0) {
+ *result = *ref;
+ }
+ }
};
+ map<string, GetSubCR *> get_subs;
+
+ void remove_get_sub(const string& name) {
+ get_subs.erase(name);
+ }
+
bool find_sub_instance(const string& sub_name, PSSubscriptionRef *sub) {
auto iter = subs.find(sub_name);
if (iter != subs.end()) {
return mgr->self;
}
- RGWCoroutine *get_subscription_cr(const string& sub_name, PSSubscriptionRef *ref) {
+ int call_get_subscription_cr(RGWCoroutine *caller, const string& sub_name, PSSubscriptionRef *ref) {
if (find_sub_instance(sub_name, ref)) {
/* found it! nothing to execute */
- return nullptr;
+ ldout(sync_env->cct, 20) << __func__ << "(): found sub instance" << dendl;
+ return 0;
+ }
+ auto& gs = get_subs[sub_name];
+ if (!gs) {
+ ldout(sync_env->cct, 20) << __func__ << "(): first get subs" << dendl;
+ gs = new GetSubCR(sync_env, self, sub_name, ref);
}
- return new GetSubCR(sync_env, self, sub_name, ref);
+ ldout(sync_env->cct, 20) << __func__ << "(): executing get subs" << dendl;
+ return gs->execute(caller, ref);
}
friend class GetSubCR;
}
};
+
class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
RGWDataSyncEnv *sync_env;
PSEnvRef env;
for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl;
- yield call(env->manager->get_subscription_cr(*siter, &sub));
+ yield env->manager->call_get_subscription_cr(this, *siter, &sub);
if (retcode < 0) {
ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl;
continue;