}
};
-class RGWIndexCompletionThread : public RGWRadosThread, public DoutPrefixProvider {
- RGWRados *store;
-
- uint64_t interval_msec() override {
- return 0;
- }
-
- list<complete_op_data *> completions;
-
- ceph::mutex completions_lock =
- ceph::make_mutex("RGWIndexCompletionThread::completions_lock");
-public:
- RGWIndexCompletionThread(RGWRados *_store)
- : RGWRadosThread(_store, "index-complete"), store(_store) {}
-
- int process(const DoutPrefixProvider *dpp) override;
-
- void add_completion(complete_op_data *completion) {
- {
- std::lock_guard l{completions_lock};
- completions.push_back(completion);
- }
-
- signal();
- }
-
- CephContext *get_cct() const override { return store->ctx(); }
- unsigned get_subsys() const { return dout_subsys; }
- std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw index completion thread: "; }
-};
-
-int RGWIndexCompletionThread::process(const DoutPrefixProvider *dpp)
-{
- list<complete_op_data *> comps;
-
- {
- std::lock_guard l{completions_lock};
- completions.swap(comps);
- }
+/**
+ * Tracks in-flight bucket index completions, spread over num_shards
+ * lock-protected sets, and owns the thread that retries completions whose
+ * AIO returned -ERR_BUSY_RESHARDING (see handle_completion()).
+ */
+class RGWIndexCompletionManager {
+ RGWRados* const store;
+ const int num_shards;
+ // locks[i] guards completions[i]
+ ceph::containers::tiny_vector<ceph::mutex> locks;
+ std::vector<set<complete_op_data*>> completions;
+ // completions queued for the retry thread; guarded by retry_completions_lock
+ std::vector<complete_op_data*> retry_completions;
- for (auto c : comps) {
- std::unique_ptr<complete_op_data> up{c};
+ std::condition_variable cond;
+ std::mutex retry_completions_lock;
- if (going_down()) {
- continue;
- }
- ldpp_dout(this, 20) << __func__ << "(): handling completion for key=" << c->key << dendl;
+ // Atomic because process() also tests it outside the lock; stop() still
+ // sets it under retry_completions_lock so the wake-up cannot be missed.
+ std::atomic<bool> _stop{false};
- RGWRados::BucketShard bs(store);
- RGWBucketInfo bucket_info;
+ // round-robin ticket counter used by next_shard()
+ std::atomic<int> cur_shard {0};
+
+ // Must stay the LAST data member: members are initialised in declaration
+ // order and the thread starts running process() as soon as it is
+ // constructed, so cond, retry_completions_lock and _stop have to be
+ // fully constructed before it.
+ std::thread retry_thread;
- int r = bs.init(c->obj.bucket, c->obj, &bucket_info, this);
- if (r < 0) {
- ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to initialize BucketShard, obj=" << c->obj << " r=" << r << dendl;
- /* not much to do */
- continue;
+ // body of retry_thread
+ void process();
+
+ // queue a completion for the retry thread and wake it up
+ void add_completion(complete_op_data *completion);
+
+ // Stop and join the retry thread, then mark every tracked completion as
+ // stopped and drop the per-shard sets.
+ void stop() {
+ if (retry_thread.joinable()) {
+ {
+ // Publish the flag under the mutex the waiter uses; otherwise the
+ // notification can fire between the waiter's predicate check and
+ // its going to sleep, and join() below would never return.
+ std::lock_guard l{retry_completions_lock};
+ _stop = true;
+ }
+ cond.notify_all();
+ retry_thread.join();
}
- r = store->guard_reshard(this, &bs, c->obj, bucket_info,
- [&](RGWRados::BucketShard *bs) -> int {
- librados::ObjectWriteOperation o;
- o.assert_exists(); // bucket index shard must exist
- cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
- cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs,
- c->log_op, c->bilog_op, &c->zones_trace);
- return bs->bucket_obj.operate(this, &o, null_yield);
- });
- if (r < 0) {
- ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl;
- /* ignoring error, can't do anything about it */
- continue;
- }
- r = store->svc.datalog_rados->add_entry(this, bucket_info, bs.shard_id);
- if (r < 0) {
- ldpp_dout(this, -1) << "ERROR: failed writing data log" << dendl;
+ for (int i = 0; i < num_shards; ++i) {
+ std::lock_guard l{locks[i]};
+ for (auto c : completions[i]) {
+ c->stop();
+ }
}
+ completions.clear();
+ }
+
+ // Hand out shard indices in round-robin order.
+ int next_shard() {
+ // One atomic fetch-and-increment, so two callers never observe the same
+ // ticket. The ticket is reduced as unsigned: once the signed counter
+ // wraps negative, a signed '%' would produce a negative index.
+ const auto ticket = static_cast<unsigned>(cur_shard++);
+ return static_cast<int>(ticket % static_cast<unsigned>(num_shards));
}
-
- return 0;
-}
-
-class RGWIndexCompletionManager {
- RGWRados *store{nullptr};
- ceph::containers::tiny_vector<ceph::mutex> locks;
- vector<set<complete_op_data *> > completions;
-
- RGWIndexCompletionThread *completion_thread{nullptr};
-
- int num_shards;
-
- std::atomic<int> cur_shard {0};
-
public:
RGWIndexCompletionManager(RGWRados *_store) :
store(_store),
+ num_shards(store->ctx()->_conf->rgw_thread_pool_size),
locks{ceph::make_lock_container<ceph::mutex>(
- store->ctx()->_conf->rgw_thread_pool_size,
+ num_shards,
[](const size_t i) {
return ceph::make_mutex("RGWIndexCompletionManager::lock::" +
std::to_string(i));
- })}
- {
- num_shards = store->ctx()->_conf->rgw_thread_pool_size;
- completions.resize(num_shards);
- }
+ })},
+ completions(num_shards),
+ retry_thread(&RGWIndexCompletionManager::process, this)
+ {}
+
~RGWIndexCompletionManager() {
stop();
}
- int next_shard() {
- int result = cur_shard % num_shards;
- cur_shard++;
- return result;
- }
-
void create_completion(const rgw_obj& obj,
RGWModifyOp op, string& tag,
rgw_bucket_entry_ver& ver,
uint16_t bilog_op,
rgw_zone_set *zones_trace,
complete_op_data **result);
- bool handle_completion(completion_t cb, complete_op_data *arg);
- int start(const DoutPrefixProvider *dpp) {
- completion_thread = new RGWIndexCompletionThread(store);
- int ret = completion_thread->init(dpp);
- if (ret < 0) {
- return ret;
- }
- completion_thread->start();
- return 0;
- }
- void stop() {
- if (completion_thread) {
- completion_thread->stop();
- delete completion_thread;
- }
+ bool handle_completion(completion_t cb, complete_op_data *arg);
- for (int i = 0; i < num_shards; ++i) {
- std::lock_guard l{locks[i]};
- for (auto c : completions[i]) {
- c->stop();
- }
- }
- completions.clear();
+ // CephContext of the owning store
+ CephContext* ctx() {
+ return store->ctx();
}
};
static void obj_complete_cb(completion_t cb, void *arg)
{
- complete_op_data *completion = (complete_op_data *)arg;
+ complete_op_data *completion = reinterpret_cast<complete_op_data*>(arg);
completion->lock.lock();
if (completion->stopped) {
completion->lock.unlock(); /* can drop lock, no one else is referencing us */
}
}
+/**
+ * Body of the retry thread.
+ *
+ * Sleeps until add_completion() queues work or stop is requested, then
+ * re-issues the bucket index complete op for every queued completion and,
+ * on success, adds a data log entry. Every completion taken from the queue
+ * is freed here, whether or not the retry succeeds.
+ */
+void RGWIndexCompletionManager::process()
+{
+ DoutPrefix dpp(store->ctx(), dout_subsys, "rgw index completion thread: ");
+ // _stop is only tested under retry_completions_lock (in the wait below);
+ // the loop is left via the return in the stop branch.
+ while (true) {
+ std::vector<complete_op_data*> comps;
+
+ {
+ std::unique_lock l{retry_completions_lock};
+ cond.wait(l, [this](){return _stop || !retry_completions.empty();});
+ if (_stop) {
+ // This thread owns whatever is still queued (see the unique_ptr in
+ // the loop below); free it instead of leaking it on shutdown.
+ for (auto c : retry_completions) {
+ std::unique_ptr<complete_op_data> up{c};
+ }
+ retry_completions.clear();
+ return;
+ }
+ retry_completions.swap(comps);
+ }
+
+ for (auto c : comps) {
+ // takes ownership: c is deleted at the end of each iteration
+ std::unique_ptr<complete_op_data> up{c};
+
+ ldpp_dout(&dpp, 20) << __func__ << "(): handling completion for key=" << c->key << dendl;
+
+ RGWRados::BucketShard bs(store);
+ RGWBucketInfo bucket_info;
+
+ int r = bs.init(c->obj.bucket, c->obj, &bucket_info, &dpp);
+ if (r < 0) {
+ ldpp_dout(&dpp, 0) << "ERROR: " << __func__ << "(): failed to initialize BucketShard, obj=" << c->obj << " r=" << r << dendl;
+ /* not much to do */
+ continue;
+ }
+
+ r = store->guard_reshard(&dpp, &bs, c->obj, bucket_info,
+ [&](RGWRados::BucketShard *bs) -> int {
+ librados::ObjectWriteOperation o;
+ o.assert_exists(); // bucket index shard must exist
+ cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
+ cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs,
+ c->log_op, c->bilog_op, &c->zones_trace);
+ return bs->bucket_obj.operate(&dpp, &o, null_yield);
+ });
+ if (r < 0) {
+ ldpp_dout(&dpp, 0) << "ERROR: " << __func__ << "(): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl;
+ /* ignoring error, can't do anything about it */
+ continue;
+ }
+
+ r = store->svc.datalog_rados->add_entry(&dpp, bucket_info, bs.shard_id);
+ if (r < 0) {
+ ldpp_dout(&dpp, -1) << "ERROR: failed writing data log" << dendl;
+ }
+ }
+ }
+}
void RGWIndexCompletionManager::create_completion(const rgw_obj& obj,
RGWModifyOp op, string& tag,
entry->rados_completion = librados::Rados::aio_create_completion(entry, obj_complete_cb);
std::lock_guard l{locks[shard_id]};
- completions[shard_id].insert(entry);
+ const auto ok = completions[shard_id].insert(entry).second;
+ ceph_assert(ok);
+}
+
+// Append a completion to the retry queue, then wake the retry thread.
+void RGWIndexCompletionManager::add_completion(complete_op_data *completion) {
+ std::unique_lock queue_guard{retry_completions_lock};
+ retry_completions.push_back(completion);
+ // release the queue lock before notifying
+ queue_guard.unlock();
+ cond.notify_all();
}
bool RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_data *arg)
auto iter = comps.find(arg);
if (iter == comps.end()) {
+ ldout(arg->manager->ctx(), 0) << __func__ << "(): cannot find completion for obj=" << arg->key << dendl;
return true;
}
int r = rados_aio_get_return_value(cb);
if (r != -ERR_BUSY_RESHARDING) {
+ ldout(arg->manager->ctx(), 20) << __func__ << "(): completion " <<
+ (r == 0 ? "ok" : "failed with " + to_string(r)) <<
+ " for obj=" << arg->key << dendl;
return true;
}
- completion_thread->add_completion(arg);
+ add_completion(arg);
+ ldout(arg->manager->ctx(), 20) << __func__ << "(): async completion added for obj=" << arg->key << dendl;
return false;
}
}
index_completion_manager = new RGWIndexCompletionManager(this);
- ret = index_completion_manager->start(dpp);
- if (ret < 0) {
- return ret;
- }
ret = rgw::notify::init(cct, store, dpp);
if (ret < 0 ) {
ldpp_dout(dpp, 1) << "ERROR: failed to initialize notification manager" << dendl;