Use the Lockable convention.
Signed-off-by: Sage Weil <sage@redhat.com>
*done = false;
}
void finish(int r) override {
- lock->Lock();
+ lock->lock();
if (rval)
*rval = r;
*done = true;
cond->Signal();
- lock->Unlock();
+ lock->unlock();
}
};
void Finisher::stop()
{
ldout(cct, 10) << __func__ << dendl;
- finisher_lock.Lock();
+ finisher_lock.lock();
finisher_stop = true;
// we don't have any new work to do, but we want the worker to wake up anyway
// to process the stop condition.
finisher_cond.Signal();
- finisher_lock.Unlock();
+ finisher_lock.unlock();
finisher_thread.join(); // wait until the worker exits completely
ldout(cct, 10) << __func__ << " finish" << dendl;
}
void Finisher::wait_for_empty()
{
- finisher_lock.Lock();
+ finisher_lock.lock();
while (!finisher_queue.empty() || finisher_running) {
ldout(cct, 10) << "wait_for_empty waiting" << dendl;
finisher_empty_wait = true;
}
ldout(cct, 10) << "wait_for_empty empty" << dendl;
finisher_empty_wait = false;
- finisher_lock.Unlock();
+ finisher_lock.unlock();
}
void *Finisher::finisher_thread_entry()
{
- finisher_lock.Lock();
+ finisher_lock.lock();
ldout(cct, 10) << "finisher_thread start" << dendl;
utime_t start;
vector<pair<Context*,int>> ls;
ls.swap(finisher_queue);
finisher_running = true;
- finisher_lock.Unlock();
+ finisher_lock.unlock();
ldout(cct, 10) << "finisher_thread doing " << ls << dendl;
if (logger) {
logger->tinc(l_finisher_complete_lat, ceph_clock_now() - start);
}
- finisher_lock.Lock();
+ finisher_lock.lock();
finisher_running = false;
}
ldout(cct, 10) << "finisher_thread empty" << dendl;
ldout(cct, 10) << "finisher_thread stop" << dendl;
finisher_stop = false;
- finisher_lock.Unlock();
+ finisher_lock.unlock();
return 0;
}
public:
/// Add a context to complete, optionally specifying a parameter for the complete function.
void queue(Context *c, int r = 0) {
- finisher_lock.Lock();
+ finisher_lock.lock();
if (finisher_queue.empty()) {
finisher_cond.Signal();
}
finisher_queue.push_back(make_pair(c, r));
if (logger)
logger->inc(l_finisher_queue_len);
- finisher_lock.Unlock();
+ finisher_lock.unlock();
}
void queue(list<Context*>& ls) {
- finisher_lock.Lock();
+ finisher_lock.lock();
if (finisher_queue.empty()) {
finisher_cond.Signal();
}
}
if (logger)
logger->inc(l_finisher_queue_len, ls.size());
- finisher_lock.Unlock();
+ finisher_lock.unlock();
ls.clear();
}
void queue(deque<Context*>& ls) {
- finisher_lock.Lock();
+ finisher_lock.lock();
if (finisher_queue.empty()) {
finisher_cond.Signal();
}
}
if (logger)
logger->inc(l_finisher_queue_len, ls.size());
- finisher_lock.Unlock();
+ finisher_lock.unlock();
ls.clear();
}
void queue(vector<Context*>& ls) {
- finisher_lock.Lock();
+ finisher_lock.lock();
if (finisher_queue.empty()) {
finisher_cond.Signal();
}
}
if (logger)
logger->inc(l_finisher_queue_len, ls.size());
- finisher_lock.Unlock();
+ finisher_lock.unlock();
ls.clear();
}
{
bufferlist bl;
- m_lock.Lock();
+ m_lock.lock();
init_connection(bl);
- m_lock.Unlock();
+ m_lock.unlock();
if (bl.length()) {
/* need to special case the connection init buffer output, as it needs
return;
do {
- m_lock.Lock();
+ m_lock.lock();
cond.Wait(m_lock);
if (going_down) {
- m_lock.Unlock();
+ m_lock.unlock();
break;
}
- m_lock.Unlock();
+ m_lock.unlock();
ret = dump_data(fd);
} while (ret >= 0);
int OutputDataSocket::dump_data(int fd)
{
- m_lock.Lock();
+ m_lock.lock();
list<bufferlist> l = std::move(data);
data.clear();
data_size = 0;
- m_lock.Unlock();
+ m_lock.unlock();
for (list<bufferlist>::iterator iter = l.begin(); iter != l.end(); ++iter) {
bufferlist& bl = *iter;
void OutputDataSocket::shutdown()
{
- m_lock.Lock();
+ m_lock.lock();
going_down = true;
cond.Signal();
- m_lock.Unlock();
+ m_lock.unlock();
if (m_shutdown_wr_fd < 0)
return;
}
void enqueue(const T& entry) {
- lock.Lock();
+ lock.lock();
if (entries.empty()) {
cond.Signal();
}
entries.push_back(entry);
- lock.Unlock();
+ lock.unlock();
}
void dequeue(T *entry) {
- lock.Lock();
+ lock.lock();
if (entries.empty()) {
cond.Wait(lock);
};
ceph_assert(!entries.empty());
*entry = entries.front();
entries.pop_front();
- lock.Unlock();
+ lock.unlock();
};
};
}
Readahead::extent_t Readahead::update(const vector<extent_t>& extents, uint64_t limit) {
- m_lock.Lock();
+ m_lock.lock();
for (vector<extent_t>::const_iterator p = extents.begin(); p != extents.end(); ++p) {
_observe_read(p->first, p->second);
}
if (m_readahead_pos >= limit|| m_last_pos >= limit) {
- m_lock.Unlock();
+ m_lock.unlock();
return extent_t(0, 0);
}
pair<uint64_t, uint64_t> extent = _compute_readahead(limit);
- m_lock.Unlock();
+ m_lock.unlock();
return extent;
}
Readahead::extent_t Readahead::update(uint64_t offset, uint64_t length, uint64_t limit) {
- m_lock.Lock();
+ m_lock.lock();
_observe_read(offset, length);
if (m_readahead_pos >= limit || m_last_pos >= limit) {
- m_lock.Unlock();
+ m_lock.unlock();
return extent_t(0, 0);
}
extent_t extent = _compute_readahead(limit);
- m_lock.Unlock();
+ m_lock.unlock();
return extent;
}
void Readahead::inc_pending(int count) {
ceph_assert(count > 0);
- m_pending_lock.Lock();
+ m_pending_lock.lock();
m_pending += count;
- m_pending_lock.Unlock();
+ m_pending_lock.unlock();
}
void Readahead::dec_pending(int count) {
ceph_assert(count > 0);
- m_pending_lock.Lock();
+ m_pending_lock.lock();
ceph_assert(m_pending >= count);
m_pending -= count;
if (m_pending == 0) {
std::list<Context *> pending_waiting(std::move(m_pending_waiting));
- m_pending_lock.Unlock();
+ m_pending_lock.unlock();
for (auto ctx : pending_waiting) {
ctx->complete(0);
}
} else {
- m_pending_lock.Unlock();
+ m_pending_lock.unlock();
}
}
}
void Readahead::wait_for_pending(Context *ctx) {
- m_pending_lock.Lock();
+ m_pending_lock.lock();
if (m_pending > 0) {
- m_pending_lock.Unlock();
+ m_pending_lock.unlock();
m_pending_waiting.push_back(ctx);
return;
}
- m_pending_lock.Unlock();
+ m_pending_lock.unlock();
ctx->complete(0);
}
void Readahead::set_trigger_requests(int trigger_requests) {
- m_lock.Lock();
+ m_lock.lock();
m_trigger_requests = trigger_requests;
- m_lock.Unlock();
+ m_lock.unlock();
}
uint64_t Readahead::get_min_readahead_size(void) {
}
void Readahead::set_min_readahead_size(uint64_t min_readahead_size) {
- m_lock.Lock();
+ m_lock.lock();
m_readahead_min_bytes = min_readahead_size;
- m_lock.Unlock();
+ m_lock.unlock();
}
void Readahead::set_max_readahead_size(uint64_t max_readahead_size) {
- m_lock.Lock();
+ m_lock.lock();
m_readahead_max_bytes = max_readahead_size;
- m_lock.Unlock();
+ m_lock.unlock();
}
void Readahead::set_alignments(const vector<uint64_t> &alignments) {
- m_lock.Lock();
+ m_lock.lock();
m_alignments = alignments;
- m_lock.Unlock();
+ m_lock.unlock();
}
void Put()
{
- m.Lock();
+ m.lock();
count++;
c.Signal();
- m.Unlock();
+ m.unlock();
}
void Get()
{
- m.Lock();
+ m.lock();
while(count <= 0) {
c.Wait(m);
}
count--;
- m.Unlock();
+ m.unlock();
}
};
cancel_all_events();
stopping = true;
cond.Signal();
- lock.Unlock();
+ lock.unlock();
thread->join();
- lock.Lock();
+ lock.lock();
delete thread;
thread = NULL;
}
void SafeTimer::timer_thread()
{
- lock.Lock();
+ lock.lock();
ldout(cct,10) << "timer_thread starting" << dendl;
while (!stopping) {
utime_t now = ceph_clock_now();
ldout(cct,10) << "timer_thread executing " << callback << dendl;
if (!safe_callbacks)
- lock.Unlock();
+ lock.unlock();
callback->complete(0);
if (!safe_callbacks)
- lock.Lock();
+ lock.lock();
}
// recheck stopping if we dropped the lock
ldout(cct,20) << "timer_thread awake" << dendl;
}
ldout(cct,10) << "timer_thread exiting" << dendl;
- lock.Unlock();
+ lock.unlock();
}
Context* SafeTimer::add_event_after(double seconds, Context *callback)
int v = atoi(buf);
free(buf);
if (v >= 0) {
- _lock.Lock();
+ _lock.lock();
_num_threads = v;
start_threads();
_cond.SignalAll();
- _lock.Unlock();
+ _lock.unlock();
}
}
}
void ThreadPool::worker(WorkThread *wt)
{
- _lock.Lock();
+ _lock.lock();
ldout(cct,10) << "worker start" << dendl;
std::stringstream ss;
<< " (" << processing << " active)" << dendl;
TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
tp_handle.reset_tp_timeout();
- _lock.Unlock();
+ _lock.unlock();
wq->_void_process(item, tp_handle);
- _lock.Lock();
+ _lock.lock();
wq->_void_process_finish(item);
processing--;
ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
cct->get_heartbeat_map()->remove_worker(hb);
- _lock.Unlock();
+ _lock.unlock();
}
void ThreadPool::start_threads()
cct->_conf.add_observer(this);
}
- _lock.Lock();
+ _lock.lock();
start_threads();
- _lock.Unlock();
+ _lock.unlock();
ldout(cct,15) << "started" << dendl;
}
cct->_conf.remove_observer(this);
}
- _lock.Lock();
+ _lock.lock();
_stop = true;
_cond.Signal();
join_old_threads();
- _lock.Unlock();
+ _lock.unlock();
for (set<WorkThread*>::iterator p = _threads.begin();
p != _threads.end();
++p) {
delete *p;
}
_threads.clear();
- _lock.Lock();
+ _lock.lock();
for (unsigned i=0; i<work_queues.size(); i++)
work_queues[i]->_clear();
_stop = false;
- _lock.Unlock();
+ _lock.unlock();
ldout(cct,15) << "stopped" << dendl;
}
void ThreadPool::pause()
{
ldout(cct,10) << "pause" << dendl;
- _lock.Lock();
+ _lock.lock();
_pause++;
while (processing)
_wait_cond.Wait(_lock);
- _lock.Unlock();
+ _lock.unlock();
ldout(cct,15) << "paused" << dendl;
}
void ThreadPool::pause_new()
{
ldout(cct,10) << "pause_new" << dendl;
- _lock.Lock();
+ _lock.lock();
_pause++;
- _lock.Unlock();
+ _lock.unlock();
}
void ThreadPool::unpause()
{
ldout(cct,10) << "unpause" << dendl;
- _lock.Lock();
+ _lock.lock();
ceph_assert(_pause > 0);
_pause--;
_cond.Signal();
- _lock.Unlock();
+ _lock.unlock();
}
void ThreadPool::drain(WorkQueue_* wq)
{
ldout(cct,10) << "drain" << dendl;
- _lock.Lock();
+ _lock.lock();
_draining++;
while (processing || (wq != NULL && !wq->_empty()))
_wait_cond.Wait(_lock);
_draining--;
- _lock.Unlock();
+ _lock.unlock();
}
void ThreadPool::set_ioprio(int cls, int priority)
while (!stop_threads) {
if (pause_threads) {
- shardedpool_lock.Lock();
+ shardedpool_lock.lock();
++num_paused;
wait_cond.Signal();
while (pause_threads) {
cct->_conf->threadpool_empty_queue_max_wait, 0));
}
--num_paused;
- shardedpool_lock.Unlock();
+ shardedpool_lock.unlock();
}
if (drain_threads) {
- shardedpool_lock.Lock();
+ shardedpool_lock.lock();
if (wq->is_shard_empty(thread_index)) {
++num_drained;
wait_cond.Signal();
}
--num_drained;
}
- shardedpool_lock.Unlock();
+ shardedpool_lock.unlock();
}
cct->get_heartbeat_map()->reset_timeout(
{
ldout(cct,10) << "start" << dendl;
- shardedpool_lock.Lock();
+ shardedpool_lock.lock();
start_threads();
- shardedpool_lock.Unlock();
+ shardedpool_lock.unlock();
ldout(cct,15) << "started" << dendl;
}
void ShardedThreadPool::pause()
{
ldout(cct,10) << "pause" << dendl;
- shardedpool_lock.Lock();
+ shardedpool_lock.lock();
pause_threads = true;
ceph_assert(wq != NULL);
wq->return_waiting_threads();
while (num_threads != num_paused){
wait_cond.Wait(shardedpool_lock);
}
- shardedpool_lock.Unlock();
+ shardedpool_lock.unlock();
ldout(cct,10) << "paused" << dendl;
}
void ShardedThreadPool::pause_new()
{
ldout(cct,10) << "pause_new" << dendl;
- shardedpool_lock.Lock();
+ shardedpool_lock.lock();
pause_threads = true;
ceph_assert(wq != NULL);
wq->return_waiting_threads();
- shardedpool_lock.Unlock();
+ shardedpool_lock.unlock();
ldout(cct,10) << "paused_new" << dendl;
}
void ShardedThreadPool::unpause()
{
ldout(cct,10) << "unpause" << dendl;
- shardedpool_lock.Lock();
+ shardedpool_lock.lock();
pause_threads = false;
wq->stop_return_waiting_threads();
shardedpool_cond.Signal();
- shardedpool_lock.Unlock();
+ shardedpool_lock.unlock();
ldout(cct,10) << "unpaused" << dendl;
}
void ShardedThreadPool::drain()
{
ldout(cct,10) << "drain" << dendl;
- shardedpool_lock.Lock();
+ shardedpool_lock.lock();
drain_threads = true;
ceph_assert(wq != NULL);
wq->return_waiting_threads();
drain_threads = false;
wq->stop_return_waiting_threads();
shardedpool_cond.Signal();
- shardedpool_lock.Unlock();
+ shardedpool_lock.unlock();
ldout(cct,10) << "drained" << dendl;
}
}
bool queue(T *item) {
- pool->_lock.Lock();
+ pool->_lock.lock();
bool r = _enqueue(item);
pool->_cond.SignalOne();
- pool->_lock.Unlock();
+ pool->_lock.unlock();
return r;
}
void dequeue(T *item) {
- pool->_lock.Lock();
+ pool->_lock.lock();
_dequeue(item);
- pool->_lock.Unlock();
+ pool->_lock.unlock();
}
void clear() {
- pool->_lock.Lock();
+ pool->_lock.lock();
_clear();
- pool->_lock.Unlock();
+ pool->_lock.unlock();
}
void lock() {
return ((void*)1); // Not used
}
void _void_process(void *, TPHandle &handle) override {
- _lock.Lock();
+ _lock.lock();
ceph_assert(!to_process.empty());
U u = to_process.front();
to_process.pop_front();
- _lock.Unlock();
+ _lock.unlock();
_process(u, handle);
- _lock.Lock();
+ _lock.lock();
to_finish.push_back(u);
- _lock.Unlock();
+ _lock.unlock();
}
void _void_process_finish(void *) override {
- _lock.Lock();
+ _lock.lock();
ceph_assert(!to_finish.empty());
U u = to_finish.front();
to_finish.pop_front();
- _lock.Unlock();
+ _lock.unlock();
_process_finish(u);
}
}
bool queue(T *item) {
- pool->_lock.Lock();
+ pool->_lock.lock();
bool r = _enqueue(item);
pool->_cond.SignalOne();
- pool->_lock.Unlock();
+ pool->_lock.unlock();
return r;
}
void dequeue(T *item) {
- pool->_lock.Lock();
+ pool->_lock.lock();
_dequeue(item);
- pool->_lock.Unlock();
+ pool->_lock.unlock();
}
void clear() {
- pool->_lock.Lock();
+ pool->_lock.lock();
_clear();
- pool->_lock.Unlock();
+ pool->_lock.unlock();
}
Mutex &get_lock() {
/// take thread pool lock
void lock() {
- _lock.Lock();
+ _lock.lock();
}
/// release thread pool lock
void unlock() {
- _lock.Unlock();
+ _lock.unlock();
}
/// wait for a kick on this thread pool
#ifdef HAVE_RES_NQUERY
int DNSResolver::get_state(CephContext *cct, res_state *ps)
{
- lock.Lock();
+ lock.lock();
if (!states.empty()) {
res_state s = states.front();
states.pop_front();
- lock.Unlock();
+ lock.unlock();
*ps = s;
return 0;
}
- lock.Unlock();
+ lock.unlock();
struct __res_state *s = new struct __res_state;
s->options = 0;
if (res_ninit(s) < 0) {
double bandwidth;
int iops = 0;
mono_clock::duration ONE_SECOND = std::chrono::seconds(1);
- bencher->lock.Lock();
+ bencher->lock.lock();
if (formatter)
formatter->open_array_section("datas");
while(!data.done) {
std::chrono::duration<double> runtime = mono_clock::now() - data.start_time;
data.idata.min_iops = data.idata.max_iops = data.finished / runtime.count();
}
- bencher->lock.Unlock();
+ bencher->lock.unlock();
return NULL;
}
}
char* contentsChars = new char[op_size];
- lock.Lock();
+ lock.lock();
data.done = false;
data.hints = hints;
data.object_size = object_size;
data.avg_latency = 0;
data.latency_diff_sum = 0;
data.object_contents = contentsChars;
- lock.Unlock();
+ lock.unlock();
//fill in contentsChars deterministically so we can check returns
sanitize_object_contents(&data, data.op_size);
void _aio_cb(void *cb, void *arg) {
struct lock_cond *lc = (struct lock_cond *)arg;
- lc->lock->Lock();
+ lc->lock->lock();
lc->cond.Signal();
- lc->lock->Unlock();
+ lc->lock->unlock();
}
int ObjBencher::fetch_bench_metadata(const std::string& metadata_file,
pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
ceph_pthread_setname(print_thread, "write_stat");
- lock.Lock();
+ lock.lock();
data.finished = 0;
data.start_time = mono_clock::now();
- lock.Unlock();
+ lock.unlock();
for (int i = 0; i<concurrentios; ++i) {
start_times[i] = mono_clock::now();
r = create_completion(i, _aio_cb, (void *)&lc);
if (r < 0) { //naughty, doesn't clean up heap
goto ERR;
}
- lock.Lock();
+ lock.lock();
++data.started;
++data.in_flight;
- lock.Unlock();
+ lock.unlock();
}
//keep on adding new writes as old ones complete until we've passed minimum time
stopTime = data.start_time + std::chrono::seconds(secondsToRun);
slot = 0;
- lock.Lock();
+ lock.lock();
while (secondsToRun && mono_clock::now() < stopTime) {
bool found = false;
while (1) {
break;
lc.cond.Wait(lock);
}
- lock.Unlock();
+ lock.unlock();
//create new contents and name on the heap, and fill them
newName = generate_object_name_fast(data.started / writes_per_object);
newContents = contents[slot];
newContents->invalidate_crc();
completion_wait(slot);
- lock.Lock();
+ lock.lock();
r = completion_ret(slot);
if (r != 0) {
- lock.Unlock();
+ lock.unlock();
goto ERR;
}
data.cur_latency = mono_clock::now() - start_times[slot];
data.avg_latency = total_latency / data.finished;
data.latency_diff_sum += delta * (data.cur_latency.count() - data.avg_latency);
--data.in_flight;
- lock.Unlock();
+ lock.unlock();
release_completion(slot);
//write new stuff to backend
goto ERR;
}
name[slot] = newName;
- lock.Lock();
+ lock.lock();
++data.started;
++data.in_flight;
if (data.op_size) {
break;
}
}
- lock.Unlock();
+ lock.unlock();
while (data.finished < data.started) {
slot = data.finished % concurrentios;
completion_wait(slot);
- lock.Lock();
+ lock.lock();
r = completion_ret(slot);
if (r != 0) {
- lock.Unlock();
+ lock.unlock();
goto ERR;
}
data.cur_latency = mono_clock::now() - start_times[slot];
data.avg_latency = total_latency / data.finished;
data.latency_diff_sum += delta * (data.cur_latency.count() - data.avg_latency);
--data.in_flight;
- lock.Unlock();
+ lock.unlock();
release_completion(slot);
delete contents[slot];
contents[slot] = 0;
}
timePassed = mono_clock::now() - data.start_time;
- lock.Lock();
+ lock.lock();
data.done = true;
- lock.Unlock();
+ lock.unlock();
pthread_join(print_thread, NULL);
return 0;
ERR:
- lock.Lock();
+ lock.lock();
data.done = 1;
- lock.Unlock();
+ lock.unlock();
pthread_join(print_thread, NULL);
for (int i = 0; i < concurrentios; i++)
if (contents[i])
contents[i] = new bufferlist();
}
- lock.Lock();
+ lock.lock();
data.finished = 0;
data.start_time = mono_clock::now();
- lock.Unlock();
+ lock.unlock();
pthread_t print_thread;
pthread_create(&print_thread, NULL, status_printer, (void *)this);
cerr << "r = " << r << std::endl;
goto ERR;
}
- lock.Lock();
+ lock.lock();
++data.started;
++data.in_flight;
- lock.Unlock();
+ lock.unlock();
}
//keep on adding new reads as old ones complete
slot = 0;
while ((seconds_to_run && mono_clock::now() < finish_time) &&
num_objects > data.started) {
- lock.Lock();
+ lock.lock();
int old_slot = slot;
bool found = false;
while (1) {
newName = generate_object_name_fast(data.started / writes_per_object, pid);
index[slot] = data.started;
- lock.Unlock();
+ lock.unlock();
completion_wait(slot);
- lock.Lock();
+ lock.lock();
r = completion_ret(slot);
if (r < 0) {
cerr << "read got " << r << std::endl;
- lock.Unlock();
+ lock.unlock();
goto ERR;
}
total_latency += data.cur_latency.count();
++data.finished;
data.avg_latency = total_latency / data.finished;
--data.in_flight;
- lock.Unlock();
+ lock.unlock();
release_completion(slot);
//start new read and check data if requested
if (r < 0) {
goto ERR;
}
- lock.Lock();
+ lock.lock();
++data.started;
++data.in_flight;
- lock.Unlock();
+ lock.unlock();
name[slot] = newName;
}
while (data.finished < data.started) {
slot = data.finished % concurrentios;
completion_wait(slot);
- lock.Lock();
+ lock.lock();
r = completion_ret(slot);
if (r < 0) {
cerr << "read got " << r << std::endl;
- lock.Unlock();
+ lock.unlock();
goto ERR;
}
data.cur_latency = mono_clock::now() - start_times[slot];
release_completion(slot);
if (!no_verify) {
snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
- lock.Unlock();
+ lock.unlock();
if ((contents[slot]->length() != data.op_size) ||
(memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
cerr << name[slot] << " is not correct!" << std::endl;
++errors;
}
} else {
- lock.Unlock();
+ lock.unlock();
}
delete contents[slot];
}
timePassed = mono_clock::now() - data.start_time;
- lock.Lock();
+ lock.lock();
data.done = true;
- lock.Unlock();
+ lock.unlock();
pthread_join(print_thread, NULL);
return (errors > 0 ? -EIO : 0);
ERR:
- lock.Lock();
+ lock.lock();
data.done = 1;
- lock.Unlock();
+ lock.unlock();
pthread_join(print_thread, NULL);
return r;
}
contents[i] = new bufferlist();
}
- lock.Lock();
+ lock.lock();
data.finished = 0;
data.start_time = mono_clock::now();
- lock.Unlock();
+ lock.unlock();
pthread_t print_thread;
pthread_create(&print_thread, NULL, status_printer, (void *)this);
cerr << "r = " << r << std::endl;
goto ERR;
}
- lock.Lock();
+ lock.lock();
++data.started;
++data.in_flight;
- lock.Unlock();
+ lock.unlock();
}
//keep on adding new reads as old ones complete
slot = 0;
while ((seconds_to_run && mono_clock::now() < finish_time)) {
- lock.Lock();
+ lock.lock();
int old_slot = slot;
bool found = false;
while (1) {
// calculate latency here, so memcmp doesn't inflate it
data.cur_latency = mono_clock::now() - start_times[slot];
- lock.Unlock();
+ lock.unlock();
int current_index = index[slot];
cur_contents = contents[slot];
completion_wait(slot);
- lock.Lock();
+ lock.lock();
r = completion_ret(slot);
if (r < 0) {
cerr << "read got " << r << std::endl;
- lock.Unlock();
+ lock.unlock();
goto ERR;
}
++data.finished;
data.avg_latency = total_latency / data.finished;
--data.in_flight;
- lock.Unlock();
+ lock.unlock();
if (!no_verify) {
snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
if (r < 0) {
goto ERR;
}
- lock.Lock();
+ lock.lock();
++data.started;
++data.in_flight;
- lock.Unlock();
+ lock.unlock();
name[slot] = newName;
}
while (data.finished < data.started) {
slot = data.finished % concurrentios;
completion_wait(slot);
- lock.Lock();
+ lock.lock();
r = completion_ret(slot);
if (r < 0) {
cerr << "read got " << r << std::endl;
- lock.Unlock();
+ lock.unlock();
goto ERR;
}
data.cur_latency = mono_clock::now() - start_times[slot];
release_completion(slot);
if (!no_verify) {
snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
- lock.Unlock();
+ lock.unlock();
if ((contents[slot]->length() != data.op_size) ||
(memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
cerr << name[slot] << " is not correct!" << std::endl;
++errors;
}
} else {
- lock.Unlock();
+ lock.unlock();
}
delete contents[slot];
}
timePassed = mono_clock::now() - data.start_time;
- lock.Lock();
+ lock.lock();
data.done = true;
- lock.Unlock();
+ lock.unlock();
pthread_join(print_thread, NULL);
return (errors > 0 ? -EIO : 0);
ERR:
- lock.Lock();
+ lock.lock();
data.done = 1;
- lock.Unlock();
+ lock.unlock();
pthread_join(print_thread, NULL);
return r;
}
int r = 0;
int slot = 0;
- lock.Lock();
+ lock.lock();
data.done = false;
data.in_flight = 0;
data.started = 0;
data.finished = 0;
- lock.Unlock();
+ lock.unlock();
// don't start more completions than files
if (num_objects == 0) {
cerr << "r = " << r << std::endl;
goto ERR;
}
- lock.Lock();
+ lock.lock();
++data.started;
++data.in_flight;
- lock.Unlock();
+ lock.unlock();
}
//keep on adding new removes as old ones complete
while (data.started < num_objects) {
- lock.Lock();
+ lock.lock();
int old_slot = slot;
bool found = false;
while (1) {
}
lc.cond.Wait(lock);
}
- lock.Unlock();
+ lock.unlock();
newName = generate_object_name_fast(data.started, prevPid);
completion_wait(slot);
- lock.Lock();
+ lock.lock();
r = completion_ret(slot);
if (r != 0 && r != -ENOENT) { // file does not exist
cerr << "remove got " << r << std::endl;
- lock.Unlock();
+ lock.unlock();
goto ERR;
}
++data.finished;
--data.in_flight;
- lock.Unlock();
+ lock.unlock();
release_completion(slot);
//start new remove and check data if requested
if (r < 0) {
goto ERR;
}
- lock.Lock();
+ lock.lock();
++data.started;
++data.in_flight;
- lock.Unlock();
+ lock.unlock();
name[slot] = newName;
}
while (data.finished < data.started) {
slot = data.finished % concurrentios;
completion_wait(slot);
- lock.Lock();
+ lock.lock();
r = completion_ret(slot);
if (r != 0 && r != -ENOENT) { // file does not exist
cerr << "remove got " << r << std::endl;
- lock.Unlock();
+ lock.unlock();
goto ERR;
}
++data.finished;
--data.in_flight;
release_completion(slot);
- lock.Unlock();
+ lock.unlock();
}
- lock.Lock();
+ lock.lock();
data.done = true;
- lock.Unlock();
+ lock.unlock();
completions_done();
return 0;
ERR:
- lock.Lock();
+ lock.lock();
data.done = 1;
- lock.Unlock();
+ lock.unlock();
return r;
}
std::list<Object> objects;
bool objects_remain = true;
- lock.Lock();
+ lock.lock();
data.done = false;
data.in_flight = 0;
data.started = 0;
data.finished = 0;
- lock.Unlock();
+ lock.unlock();
out(cout) << "Warning: using slow linear search" << std::endl;
cerr << "r = " << r << std::endl;
goto ERR;
}
- lock.Lock();
+ lock.lock();
++data.started;
++data.in_flight;
- lock.Unlock();
+ lock.unlock();
}
//keep on adding new removes as old ones complete
while (objects_remain) {
- lock.Lock();
+ lock.lock();
int old_slot = slot;
bool found = false;
while (1) {
}
lc.cond.Wait(lock);
}
- lock.Unlock();
+ lock.unlock();
// get more objects if necessary
if (objects.empty()) {
objects.pop_front();
completion_wait(slot);
- lock.Lock();
+ lock.lock();
r = completion_ret(slot);
if (r != 0 && r != -ENOENT) { // file does not exist
cerr << "remove got " << r << std::endl;
- lock.Unlock();
+ lock.unlock();
goto ERR;
}
++data.finished;
--data.in_flight;
- lock.Unlock();
+ lock.unlock();
release_completion(slot);
//start new remove and check data if requested
if (r < 0) {
goto ERR;
}
- lock.Lock();
+ lock.lock();
++data.started;
++data.in_flight;
- lock.Unlock();
+ lock.unlock();
name[slot] = newName;
}
while (data.finished < data.started) {
slot = data.finished % concurrentios;
completion_wait(slot);
- lock.Lock();
+ lock.lock();
r = completion_ret(slot);
if (r != 0 && r != -ENOENT) { // file does not exist
cerr << "remove got " << r << std::endl;
- lock.Unlock();
+ lock.unlock();
goto ERR;
}
++data.finished;
--data.in_flight;
release_completion(slot);
- lock.Unlock();
+ lock.unlock();
}
- lock.Lock();
+ lock.lock();
data.done = true;
- lock.Unlock();
+ lock.unlock();
completions_done();
return 0;
ERR:
- lock.Lock();
+ lock.lock();
data.done = 1;
- lock.Unlock();
+ lock.unlock();
return -EIO;
}