RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
RGWHTTPManager http(store->ctx(), crs.get_completion_mgr());
- int ret = http.set_threaded();
+ int ret = http.start();
if (ret < 0) {
cerr << "failed to initialize http client with " << cpp_strerror(ret) << std::endl;
return -ret;
if (opt_cmd == OPT_BILOG_AUTOTRIM) {
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
RGWHTTPManager http(store->ctx(), crs.get_completion_mgr());
- int ret = http.set_threaded();
+ int ret = http.start();
if (ret < 0) {
cerr << "failed to initialize http client with " << cpp_strerror(ret) << std::endl;
return -ret;
return 0;
}
- int ret = http_manager.set_threaded();
+ int ret = http_manager.start();
if (ret < 0) {
- ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+ ldout(store->ctx(), 0) << "failed in http_manager.start() ret=" << ret << dendl;
return ret;
}
// cannot run concurrently with run_sync(), so run in a separate manager
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
- int ret = http_manager.set_threaded();
+ int ret = http_manager.start();
if (ret < 0) {
- ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+ ldout(store->ctx(), 0) << "failed in http_manager.start() ret=" << ret << dendl;
return ret;
}
RGWDataSyncEnv sync_env_local = sync_env;
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
- int ret = http_manager.set_threaded();
+ int ret = http_manager.start();
if (ret < 0) {
- ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+ ldout(store->ctx(), 0) << "failed in http_manager.start() ret=" << ret << dendl;
return ret;
}
RGWDataSyncEnv sync_env_local = sync_env;
return -EINVAL;
}
- int ret = http_manager.set_threaded();
+ int ret = http_manager.start();
if (ret < 0) {
- ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+ ldout(store->ctx(), 0) << "failed in http_manager.start() ret=" << ret << dendl;
return ret;
}
* RGWHTTPManager has two modes of operation: threaded and non-threaded.
*/
RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
- completion_mgr(_cm), is_threaded(false),
+ completion_mgr(_cm), is_started(false),
reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
reqs_thread(NULL)
{
register_request(req_data);
- if (!is_threaded) {
+ if (!is_started) {
ret = link_request(req_data);
if (ret < 0) {
req_data->put();
{
rgw_http_req_data *req_data = client->get_req_data();
- if (!is_threaded) {
+ if (!is_started) {
unlink_request(req_data);
return 0;
}
assert(req_data->lock.is_locked());
/* can only do that if threaded */
- if (!is_threaded) {
+ if (!is_started) {
return -EINVAL;
}
return 0;
}
-int RGWHTTPManager::set_threaded()
+int RGWHTTPManager::start()
{
int r = pipe(thread_pipe);
if (r < 0) {
thread_pipe[1], thread_pipe[0]);
#endif
- is_threaded = true;
+ is_started = true;
reqs_thread = new ReqsThread(this);
reqs_thread->create("http_manager");
return 0;
is_stopped = true;
- if (is_threaded) {
+ if (is_started) {
going_down = true;
signal_thread();
reqs_thread->join();
{
curl_global_init(CURL_GLOBAL_ALL);
rgw_http_manager = new RGWHTTPManager(cct);
- rgw_http_manager->set_threaded();
+ rgw_http_manager->start();
}
void rgw_http_client_cleanup()
CephContext *cct;
RGWCompletionManager *completion_mgr;
void *multi_handle;
- bool is_threaded;
+ bool is_started;
std::atomic<unsigned> going_down { 0 };
std::atomic<unsigned> is_stopped { 0 };
RGWHTTPManager(CephContext *_cct, RGWCompletionManager *completion_mgr = NULL);
~RGWHTTPManager();
- int set_threaded();
+ int start();
void stop();
int add_request(RGWHTTPClient *client, bool send_data_hint = false);
http(cct, coroutines.get_completion_mgr()),
push_all(new PushAllCR(cct, &http, std::move(period), std::move(conns)))
{
- http.set_threaded();
- // must spawn the CR thread after set_threaded
+ http.start();
+ // must spawn the CR thread after start
thread = std::thread([this] { coroutines.run(push_all.get()); });
}
~CRThread()
public:
RGWMetaNotifierManager(RGWRados *_store) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
http_manager(store->ctx(), completion_mgr) {
- http_manager.set_threaded();
+ http_manager.start();
}
int notify_all(map<string, RGWRESTConn *>& conn_map, set<int>& shards) {
public:
RGWDataNotifierManager(RGWRados *_store) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
http_manager(store->ctx(), completion_mgr) {
- http_manager.set_threaded();
+ http_manager.start();
}
int notify_all(map<string, RGWRESTConn *>& conn_map, map<int, set<string> >& shards) {
{}
int init() override {
- return http.set_threaded();
+ return http.start();
}
int process() override {
list<RGWCoroutinesStack*> stacks;
{
conn = store->rest_master_conn;
- int ret = http_manager.set_threaded();
+ int ret = http_manager.start();
if (ret < 0) {
- ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+ ldout(store->ctx(), 0) << "failed in http_manager.start() ret=" << ret << dendl;
return ret;
}
// cannot run concurrently with run_sync(), so run in a separate manager
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
- int ret = http_manager.set_threaded();
+ int ret = http_manager.start();
if (ret < 0) {
- ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+ ldout(store->ctx(), 0) << "failed in http_manager.start() ret=" << ret << dendl;
return ret;
}
RGWMetaSyncEnv sync_env_local = sync_env;
auto cct = g_ceph_context;
RGWHTTPManager http(cct);
- ASSERT_EQ(0, http.set_threaded());
+ ASSERT_EQ(0, http.start());
// default pipe buffer size according to man pipe
constexpr size_t max_pipe_buffer_size = 65536;