cerr << "ceph-fuse[" << getpid() << "]: starting fuse" << std::endl;
tester.init(cfuse, client);
- tester.create();
+ tester.create("tester");
r = cfuse->loop();
tester.join(&tester_rp);
tester_r = static_cast<int>(reinterpret_cast<uint64_t>(tester_rp));
pthread_create(&thread_id, NULL, synthetic_client_thread_entry, this);
assert(thread_id);
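+ // Threads spawned with raw pthread_create() are named directly by the creator.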
+ pthread_setname_np(thread_id, "client");
return 0;
}
void Finisher::start()
{
ldout(cct, 10) << __func__ << dendl;
- finisher_thread.create();
+ finisher_thread.create(thread_name.c_str());
}
void Finisher::stop()
/// should be completed in that place instead.
vector<Context*> finisher_queue;
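+ /// Name assigned to the worker thread via Thread::create(); at most 15 characters.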
+ string thread_name;
+
/// Queue for contexts for which the complete function will be called
/// with a parameter other than 0.
list<pair<Context*,int> > finisher_queue_rval;
Finisher(CephContext *cct_) :
cct(cct_), finisher_lock("Finisher::finisher_lock"),
finisher_stop(false), finisher_running(false),
- logger(0),
+ thread_name("fn_anonymous"), logger(0),
finisher_thread(this) {}
/// Construct a named Finisher that logs its queue length.
- Finisher(CephContext *cct_, string name) :
+ Finisher(CephContext *cct_, string name, string tn) :
cct(cct_), finisher_lock("Finisher::finisher_lock"),
finisher_stop(false), finisher_running(false),
- logger(0),
+ thread_name(tn), logger(0),
finisher_thread(this) {
PerfCountersBuilder b(cct, string("finisher-") + name,
l_finisher_first, l_finisher_last);
m_shutdown_rd_fd = pipe_rd;
m_shutdown_wr_fd = pipe_wr;
m_path = path;
- create();
+ create("out_data_socket");
add_cleanup_file(m_path.c_str());
return true;
}
return r;
}
-void Thread::create(size_t stacksize)
+void Thread::create(const char *name, size_t stacksize)
{
int ret = try_create(stacksize);
if (ret != 0) {
"failed with error %d", ret);
dout_emergency(buf);
assert(ret == 0);
+ } else if (thread_id > 0) {
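+ // pthread_setname_np(3) limits names to 16 bytes including the terminating NUL,
+ // so callers must keep names to 15 visible characters; the names show up in
+ // /proc/<pid>/task/<tid>/comm and in tools such as top -H and gdb.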
+ assert(strlen(name) < 16);
+ pthread_setname_np(thread_id, name);
}
}
bool am_self() const;
int kill(int signal);
int try_create(size_t stacksize);
- void create(size_t stacksize = 0);
+ void create(const char *name, size_t stacksize = 0);
int join(void **prval = 0);
int detach();
int set_ioprio(int cls, int prio);
{
ldout(cct,10) << "init" << dendl;
thread = new SafeTimerThread(this);
- thread->create();
+ thread->create("safe_timer");
}
void SafeTimer::shutdown()
#define dout_prefix *_dout << name << " "
-ThreadPool::ThreadPool(CephContext *cct_, string nm, int n, const char *option)
- : cct(cct_), name(nm),
+ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option)
+ : cct(cct_), name(nm), thread_name(tn),
lockname(nm + "::lock"),
_lock(lockname.c_str()), // this should be safe due to declaration order
_stop(false),
if (r < 0)
lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;
- wt->create();
+ wt->create(thread_name.c_str());
}
}
}
}
-ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm,
- uint32_t pnum_threads): cct(pcct_),name(nm),lockname(nm + "::lock"),
+ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
+ uint32_t pnum_threads): cct(pcct_),name(nm),thread_name(tn),lockname(nm + "::lock"),
shardedpool_lock(lockname.c_str()),num_threads(pnum_threads),stop_threads(0),
pause_threads(0),drain_threads(0), num_paused(0), num_drained(0), wq(NULL) {}
WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
threads_shardedpool.push_back(wt);
- wt->create();
+ wt->create(thread_name.c_str());
thread_index++;
}
}
class ThreadPool : public md_config_obs_t {
CephContext *cct;
string name;
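+ // name given to each worker thread; passed to Thread::create() (15 chars max)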
+ string thread_name;
string lockname;
Mutex _lock;
Cond _cond;
void worker(WorkThread *wt);
public:
- ThreadPool(CephContext *cct_, string nm, int n, const char *option = NULL);
+ ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option = NULL);
virtual ~ThreadPool();
/// return number of threads currently running
CephContext *cct;
string name;
+ string thread_name;
string lockname;
Mutex shardedpool_lock;
Cond shardedpool_cond;
public:
- ShardedThreadPool(CephContext *cct_, string nm, uint32_t pnum_threads);
+ ShardedThreadPool(CephContext *cct_, string nm, string tn, uint32_t pnum_threads);
~ShardedThreadPool(){};
register_command("get_command_descriptions", "get_command_descriptions",
m_getdescs_hook, "list available commands");
- create();
+ create("admin_socket");
add_cleanup_file(m_path.c_str());
return true;
}
return;
}
_service_thread = new CephContextServiceThread(this);
- _service_thread->create();
+ _service_thread->create("service");
ceph_spin_unlock(&_service_thread_lock);
// make logs flush on_exit()
pthread_t print_thread;
pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
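+ // name the stats printer so it is distinguishable from the I/O threads in top -H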
+ pthread_setname_np(print_thread, "write_stat");
lock.Lock();
data.finished = 0;
data.start_time = ceph_clock_now(cct);
pthread_t print_thread;
pthread_create(&print_thread, NULL, status_printer, (void *)this);
+ pthread_setname_np(print_thread, "seq_read_stat");
utime_t finish_time = data.start_time + time_to_run;
//start initial reads
pthread_t print_thread;
pthread_create(&print_thread, NULL, status_printer, (void *)this);
+ pthread_setname_np(print_thread, "rand_read_stat");
utime_t finish_time = data.start_time + time_to_run;
//start initial reads
AsyncCompressor::AsyncCompressor(CephContext *c):
compressor(Compressor::create(c->_conf->async_compressor_type)), cct(c),
job_id(0),
- compress_tp(g_ceph_context, "AsyncCompressor::compressor_tp", cct->_conf->async_compressor_threads, "async_compressor_threads"),
+ compress_tp(g_ceph_context, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"),
job_lock("AsyncCompressor::job_lock"),
compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) {
}
assert(r == 0);
// create thread
- create();
+ create("sginal_handler");
}
~SignalHandler() {
}
compact_queue_cond.Signal();
if (!compact_thread.is_started()) {
- compact_thread.create();
+ compact_thread.create("levdbst_compact");
}
}
}
compact_queue_cond.Signal();
if (!compact_thread.is_started()) {
- compact_thread.create();
+ compact_thread.create("rstore_commpact");
}
}
bool RocksDBStore::check_omap_dir(string &omap_dir)
class ThreadPoolSingleton : public ThreadPool {
public:
ThreadPoolSingleton(CephContext *cct)
- : ThreadPool(cct, "librbd::thread_pool", cct->_conf->rbd_op_threads,
+ : ThreadPool(cct, "librbd::thread_pool", "tp_librbd", cct->_conf->rbd_op_threads,
"rbd_op_threads") {
start();
}
pthread_mutex_lock(&m_queue_mutex);
m_stop = false;
pthread_mutex_unlock(&m_queue_mutex);
- create();
+ create("log");
}
void Log::stop()
logger->set(l_mdl_expos, journaler->get_expire_pos());
logger->set(l_mdl_wrpos, journaler->get_write_pos());
- submit_thread.create();
+ submit_thread.create("md_submit");
}
void MDLog::open(MDSInternalContextBase *c)
dout(5) << "open discovering log bounds" << dendl;
recovery_thread.set_completion(c);
- recovery_thread.create();
+ recovery_thread.create("md_recov_open");
- submit_thread.create();
+ submit_thread.create("md_submit");
// either append() or replay() will follow.
}
journaler = NULL;
recovery_thread.set_completion(new C_ReopenComplete(this, c));
- recovery_thread.create();
+ recovery_thread.create("md_recov_reopen");
}
void MDLog::append()
assert(num_events == 0 || already_replayed);
already_replayed = true;
- replay_thread.create();
+ replay_thread.create("md_log_replay");
}
// who is interested in it.
handle_osd_map();
- progress_thread.create();
+ progress_thread.create("mds_rank_progr");
finisher->start();
}
do_dump(false),
dump_fd_binary(-1),
dump_fmt(true),
- io_work(g_ceph_context, "monstore"),
+ io_work(g_ceph_context, "monstore", "fn_monstore"),
is_open(false) {
string::const_reverse_iterator rit;
int pos = 0;
{
if (!started) {
for (uint64_t i = 0; i < workers.size(); ++i) {
- workers[i]->create();
+ workers[i]->create("ms_async_worker");
}
started = true;
}
ldout(msgr->cct,1) << "accepter.start" << dendl;
// start thread
- create();
+ create("ms_accepter");
return 0;
}
{
assert(!stop);
assert(!dispatch_thread.is_started());
- dispatch_thread.create();
- local_delivery_thread.create();
+ dispatch_thread.create("ms_dispatch");
+ local_delivery_thread.create("ms_local");
}
void DispatchQueue::wait()
reader_needs_join = false;
}
reader_running = true;
- reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
+ reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);
}
void Pipe::maybe_start_delay_thread()
msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type)) != string::npos) {
lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
delay_thread = new DelayedDelivery(this);
- delay_thread->create();
+ delay_thread->create("ms_pipe_delay");
}
}
assert(pipe_lock.is_locked());
assert(!writer_running);
writer_running = true;
- writer_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
+ writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes);
}
void Pipe::join_reader()
lock.Unlock();
reaper_started = true;
- reaper_thread.create();
+ reaper_thread.create("ms_reaper");
return 0;
}
derr << __func__ << " failed: " << cpp_strerror(r) << dendl;
return r;
}
- aio_thread.create();
+ aio_thread.create("bstore_aio");
}
return 0;
}
wal_seq(0),
wal_tp(cct,
"BlueStore::wal_tp",
+ "tp_wal",
cct->_conf->bluestore_wal_threads,
"bluestore_wal_threads"),
wal_wq(this,
finisher.start();
wal_tp.start();
- kv_sync_thread.create();
+ kv_sync_thread.create("bstore_kv_sync");
r = _wal_replay();
if (r < 0)
{
write_stop = false;
aio_stop = false;
- write_thread.create();
+ write_thread.create("journal_write");
#ifdef HAVE_LIBAIO
if (aio)
- write_finish_thread.create();
+ write_finish_thread.create("journal_wrt_fin");
#endif
}
fdcache(g_ceph_context),
wbthrottle(g_ceph_context),
next_osr_id(0),
- throttle_ops(g_ceph_context, "filestore_ops",g_conf->filestore_queue_max_ops),
- throttle_bytes(g_ceph_context, "filestore_bytes",g_conf->filestore_queue_max_bytes),
+ throttle_ops(g_ceph_context, "filestore_ops", g_conf->filestore_queue_max_ops),
+ throttle_bytes(g_ceph_context, "filestore_bytes", g_conf->filestore_queue_max_bytes),
m_ondisk_finisher_num(g_conf->filestore_ondisk_finisher_threads),
m_apply_finisher_num(g_conf->filestore_apply_finisher_threads),
- op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"),
+ op_tp(g_ceph_context, "FileStore::op_tp", "tp_fstore_op", g_conf->filestore_op_threads, "filestore_op_threads"),
op_wq(this, g_conf->filestore_op_thread_timeout,
g_conf->filestore_op_thread_suicide_timeout, &op_tp),
logger(NULL),
for (int i = 0; i < m_ondisk_finisher_num; ++i) {
ostringstream oss;
oss << "filestore-ondisk-" << i;
- Finisher *f = new Finisher(g_ceph_context, oss.str());
+ Finisher *f = new Finisher(g_ceph_context, oss.str(), "fn_odsk_fstore");
ondisk_finishers.push_back(f);
}
for (int i = 0; i < m_apply_finisher_num; ++i) {
ostringstream oss;
oss << "filestore-apply-" << i;
- Finisher *f = new Finisher(g_ceph_context, oss.str());
+ Finisher *f = new Finisher(g_ceph_context, oss.str(), "fn_appl_fstore");
apply_finishers.push_back(f);
}
}
wbthrottle.start();
- sync_thread.create();
+ sync_thread.create("filestore_sync");
if (!(generic_flags & SKIP_JOURNAL_REPLAY)) {
ret = journal_replay(initial_op_seq);
JournalingObjectStore(const std::string& path)
: ObjectStore(path),
journal(NULL),
- finisher(g_ceph_context, "JournalObjectStore"),
+ finisher(g_ceph_context, "JournalObjectStore", "fn_jrn_objstore"),
apply_manager(journal, finisher),
replaying(false) {}
Mutex::Locker l(lock);
stopping = false;
}
- create();
+ create("wb_throttle");
}
void WBThrottle::stop()
throttle_ops(g_ceph_context, "keyvaluestore_ops", g_conf->keyvaluestore_queue_max_ops),
throttle_bytes(g_ceph_context, "keyvaluestore_bytes", g_conf->keyvaluestore_queue_max_bytes),
op_finisher(g_ceph_context),
- op_tp(g_ceph_context, "KeyValueStore::op_tp",
+ op_tp(g_ceph_context, "KeyValueStore::op_tp", "tp_kvstore",
g_conf->keyvaluestore_op_threads, "keyvaluestore_op_threads"),
op_wq(this, g_conf->keyvaluestore_op_thread_timeout,
g_conf->keyvaluestore_op_thread_suicide_timeout, &op_tp),
goto out_db;
finisher.start();
- kv_sync_thread.create();
+ kv_sync_thread.create("kstore_kv_sync");
mounted = true;
return 0;
watch_timer.init();
agent_timer.init();
- agent_thread.create();
+ agent_thread.create("osd_srv_agent");
}
void OSDService::final_init()
asok_hook(NULL),
osd_compat(get_osd_compat_set()),
state(STATE_INITIALIZING),
- osd_tp(cct, "OSD::osd_tp", cct->_conf->osd_op_threads, "osd_op_threads"),
- osd_op_tp(cct, "OSD::osd_op_tp",
+ osd_tp(cct, "OSD::osd_tp", "tp_osd", cct->_conf->osd_op_threads, "osd_op_threads"),
+ osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
cct->_conf->osd_op_num_threads_per_shard * cct->_conf->osd_op_num_shards),
- recovery_tp(cct, "OSD::recovery_tp", cct->_conf->osd_recovery_threads, "osd_recovery_threads"),
- disk_tp(cct, "OSD::disk_tp", cct->_conf->osd_disk_threads, "osd_disk_threads"),
- command_tp(cct, "OSD::command_tp", 1),
+ recovery_tp(cct, "OSD::recovery_tp", "tp_osd_recov", cct->_conf->osd_recovery_threads, "osd_recovery_threads"),
+ disk_tp(cct, "OSD::disk_tp", "tp_osd_disk", cct->_conf->osd_disk_threads, "osd_disk_threads"),
+ command_tp(cct, "OSD::command_tp", "tp_osd_cmd", 1),
paused_recovery(false),
session_waiting_lock("OSD::session_waiting_lock"),
heartbeat_lock("OSD::heartbeat_lock"),
set_disk_tp_priority();
// start the heartbeat
- heartbeat_thread.create();
+ heartbeat_thread.create("osd_srv_heartbt");
// tick
tick_timer.add_event_after(cct->_conf->osd_heartbeat_interval, new C_Tick(this));
~ObjectCacher();
void start() {
- flusher_thread.create();
+ flusher_thread.create("flusher");
}
void stop() {
assert(flusher_thread.is_started());
}
renew_thread = new ChangesRenewThread(cct, this);
- renew_thread->create();
+ renew_thread->create("rgw_dt_lg_renew");
}
~RGWDataChangesLog();
void RGWGC::start_processor()
{
worker = new GCWorker(cct, this);
- worker->create();
+ worker->create("rgw_gc");
}
void RGWGC::stop_processor()
public:
RGWProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf)
- : store(pe->store), olog(pe->olog), m_tp(cct, "RGWProcess::m_tp", num_threads),
+ : store(pe->store), olog(pe->olog), m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
req_throttle(cct, "rgw_ops", num_threads * 2),
rest(pe->rest),
conf(_conf),
int run() {
assert(pprocess); /* should have initialized by init() */
thread = new RGWProcessControlThread(pprocess);
- thread->create();
+ thread->create("rgw_frontend");
return 0;
}
void RGWObjectExpirer::start_processor()
{
worker = new OEWorker(store->ctx(), this);
- worker->create();
+ worker->create("rgw_obj_expirer");
}
void RGWObjectExpirer::stop_processor()
rwlock("RGWUserStatsCache::rwlock") {
if (quota_threads) {
buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
- buckets_sync_thread->create();
+ buckets_sync_thread->create("rgw_buck_st_syn");
user_sync_thread = new UserSyncThread(store->ctx(), this);
- user_sync_thread->create();
+ user_sync_thread->create("rgw_user_st_syn");
} else {
buckets_sync_thread = NULL;
user_sync_thread = NULL;
keystone_token_cache = new RGWKeystoneTokenCache(cct, cct->_conf->rgw_keystone_token_cache_size);
keystone_revoke_thread = new KeystoneRevokeThread(cct, this);
- keystone_revoke_thread->create();
+ keystone_revoke_thread->create("rgw_swift_k_rev");
}
do_fadvise(do_fadvise),
sync_interval(sync_interval),
sync_fd(sync_fd),
- tp(cct, "DumbBackend::tp", worker_threads),
+ tp(cct, "DumbBackend::tp", "tp_dumb_backend", worker_threads),
thread(this),
sync_loop_mutex("DumbBackend::sync_loop_mutex"),
sync_loop_stop(0),
pending_commit_mutex("DumbBackend::pending_commit_mutex"),
queue(this, 20, &tp) {
- thread.create();
+ thread.create("thread");
tp.start();
for (unsigned i = 0; i < 10*worker_threads; ++i) {
sem.Put();
for (vector<ceph::shared_ptr<Bencher> >::iterator i = benchers.begin();
i != benchers.end();
++i) {
- (*i)->create();
+ (*i)->create("bencher");
}
for (vector<ceph::shared_ptr<Bencher> >::iterator i = benchers.begin();
i != benchers.end();
if (*i == 'q') {
ThreadPool *tp =
new ThreadPool(
- g_ceph_context, ss.str(), vm["num-threads"].as<unsigned>(), 0);
+ g_ceph_context, ss.str(), "tp_test", vm["num-threads"].as<unsigned>(), 0);
wqs.push_back(
new WQWrapper(
new PassAlong(tp, wqs.back()),
list<T*> ls;
for (int i=0; i<threads; i++) {
T *t = new T(num);
- t->create();
+ t->create("t");
ls.push_back(t);
}
ASSERT_FALSE(throttle.get_or_fail(throttle_max));
Thread_get t(throttle, 7);
- t.create();
+ t.create("t_throttle_1");
usleep(delay);
ASSERT_EQ(throttle.put(throttle_max), 0);
t.join();
ASSERT_FALSE(throttle.get_or_fail(throttle_max));
Thread_get t(throttle, throttle_max);
- t.create();
+ t.create("t_throttle_2");
usleep(delay);
Thread_get u(throttle, 1);
- u.create();
+ u.create("u_throttle_2");
usleep(delay);
throttle.put(throttle_max / 2);
ASSERT_FALSE(throttle.get_or_fail(throttle_max));
Thread_get t(throttle, throttle_max);
- t.create();
+ t.create("t_throttle_3");
usleep(delay);
//
ASSERT_FALSE(throttle->get(5));
t = new Thread_get(*throttle, 7);
- t->create();
+ t->create("t_throttle");
bool blocked;
useconds_t delay = 1;
do {
EXPECT_FALSE(cache.get_weak_refs()[key].first.lock());
Thread_wait t(cache, key, value, Thread_wait::LOOKUP);
- t.create();
+ t.create("wait_lookup_1");
ASSERT_TRUE(wait_for(cache, 1));
EXPECT_EQ(value, *t.ptr);
// waiting on a key does not block lookups on other keys
EXPECT_FALSE(cache.get_weak_refs()[key].first.lock());
Thread_wait t(cache, key, value, Thread_wait::LOOKUP);
- t.create();
+ t.create("wait_lookup_2");
ASSERT_TRUE(wait_for(cache, 1));
EXPECT_EQ(value, *t.ptr);
// waiting on a key does not block lookups on other keys
EXPECT_FALSE(cache.get_weak_refs()[key].first.lock());
Thread_wait t(cache, key, value, Thread_wait::LOWER_BOUND);
- t.create();
+ t.create("wait_lower_bnd");
ASSERT_TRUE(wait_for(cache, 1));
EXPECT_FALSE(t.ptr);
// waiting on a key does not block getting lower_bound on other keys
EXPECT_FALSE(registry.get_contents()[key].first.lock());
Thread_wait t(registry, key, 0, Thread_wait::LOOKUP_OR_CREATE);
- t.create();
+ t.create("wait_lookcreate");
ASSERT_TRUE(wait_for(registry, 1));
EXPECT_FALSE(t.ptr);
// waiting on a key does not block lookups on other keys
EXPECT_FALSE(registry.get_contents()[key].first.lock());
Thread_wait t(registry, key, value, Thread_wait::LOOKUP_OR_CREATE);
- t.create();
+ t.create("wait_lookcreate");
ASSERT_TRUE(wait_for(registry, 1));
EXPECT_FALSE(t.ptr);
// waiting on a key does not block lookups on other keys
EXPECT_FALSE(registry.get_contents()[key].first.lock());
Thread_wait t(registry, key, value, Thread_wait::LOOKUP);
- t.create();
+ t.create("wait_lookup");
ASSERT_TRUE(wait_for(registry, 1));
EXPECT_EQ(value, *t.ptr);
// waiting on a key does not block lookups on other keys
useconds_t delay = 0;
const useconds_t DELAY_MAX = 20 * 1000 * 1000;
Thread_factory sleep_forever;
- sleep_forever.create();
+ sleep_forever.create("sleep_forever");
do {
cout << "Trying (1) with delay " << delay << "us\n";
if (delay > 0)
ASSERT_EQ(0, rbd.open(ioctx, image1, name.c_str(), NULL));
RBDWriter writer(image1);
- writer.create();
+ writer.create("rbd_writer");
int num_snaps = 10;
for (int i = 0; i < num_snaps; ++i) {
}
void start() {
for (uint64_t i = 0; i < clients.size(); ++i)
- clients[i]->create();
+ clients[i]->create("client");
for (uint64_t i = 0; i < msgrs.size(); ++i)
msgrs[i]->wait();
}
public:
ServerDispatcher(int threads, uint64_t delay): Dispatcher(g_ceph_context), think_time(delay),
- op_tp(g_ceph_context, "ServerDispatcher::op_tp", threads, "serverdispatcher_op_threads"),
+ op_tp(g_ceph_context, "ServerDispatcher::op_tp", "tp_serv_disp", threads, "serverdispatcher_op_threads"),
op_wq(30, 30, &op_tp) {
op_tp.start();
}
atomic_t count(0);
Mutex lock("DispatchTest::lock");
Cond cond;
- worker1.create();
- worker2.create();
+ worker1.create("worker_1");
+ worker2.create("worker_2");
for (int i = 0; i < 10000; ++i) {
count.inc();
worker1.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
EXPECT_EQ(1, obc.unstable_writes);
Thread_read_lock t(obc);
- t.create();
+ t.create("obc_read");
do {
cout << "Trying (1) with delay " << delay << "us\n";
EXPECT_EQ(0, obc.unstable_writes);
Thread_write_lock t(obc);
- t.create();
+ t.create("obc_write");
do {
cout << "Trying (3) with delay " << delay << "us\n";
CondPingPong(): mutex("CondPingPong::mutex"), prod(0), cons(0), count(10000), consumer(this) {}
double run() {
- consumer.create();
+ consumer.create("consumer");
uint64_t start = Cycles::rdtsc();
produce();
uint64_t stop = Cycles::rdtsc();
CenterWorker worker(g_ceph_context);
atomic_t flag(1);
- worker.create();
+ worker.create("evt_center_disp");
EventCallbackRef count_event(new CountEvent(&flag));
worker.center.dispatch_event_external(count_event);
ThreadHelper thread;
uint64_t start = Cycles::rdtsc();
for (int i = 0; i < count; i++) {
- thread.create();
+ thread.create("thread_helper");
thread.join();
}
uint64_t stop = Cycles::rdtsc();
public:
PausyAsyncMap() : lock("PausyAsyncMap"), doer(this) {
- doer.create();
+ doer.create("doer");
}
~PausyAsyncMap() {
doer.join();
WatchNotifyTestCtx ctx;
WatcherUnwatcher *thr = new WatcherUnwatcher(pool_name);
- thr->create();
+ thr->create("watcher_unwatch");
ASSERT_EQ(0, nioctx.create("foo", false));
for (unsigned i = 0; i < 75; ++i) {
}
dout(0) << "starting thread" << dendl;
- foo.create();
+ foo.create("foo");
dout(0) << "starting op" << dendl;
fs->apply_transaction(&osr, t);
TEST(WorkQueue, StartStop)
{
- ThreadPool tp(g_ceph_context, "foo", 10, "");
+ ThreadPool tp(g_ceph_context, "foo", "tp_foo", 10, "");
tp.start();
tp.pause();
TEST(WorkQueue, Resize)
{
- ThreadPool tp(g_ceph_context, "bar", 2, "osd_op_threads");
+ ThreadPool tp(g_ceph_context, "bar", "tp_bar", 2, "osd_op_threads");
tp.start();
objecter(NULL),
lock("MDSUtility::lock"),
timer(g_ceph_context, lock),
- finisher(g_ceph_context, "MDSUtility"),
+ finisher(g_ceph_context, "MDSUtility", "fn_mds_utility"),
waiting_for_mds_map(NULL)
{
monc = new MonClient(g_ceph_context);
if (!started) {
started = true;
- reader_thread.create();
- writer_thread.create();
+ reader_thread.create("rbd_reader");
+ writer_thread.create("rbd_writer");
}
}