if (verbose) cout << "\t" << client_name
<< "-read_index: getting index_data for " << key
<< " from cache" << std::endl;
- icache_lock.Lock();
+ icache_lock.lock();
if (next_idata != NULL) {
err = icache.get(key, idata, next_idata);
} else {
err = icache.get(key, idata);
}
- icache_lock.Unlock();
+ icache_lock.unlock();
if (err == 0) {
//if (verbose) cout << "CACHE SUCCESS" << std::endl;
}
return read_index(key, idata, next_idata, force_update);
}
- icache_lock.Lock();
+ std::scoped_lock l{icache_lock};
icache.push(this_idata);
- icache_lock.Unlock();
}
auto b = kvmap.begin()->second.cbegin();
idata->decode(b);
<< ", idata is " << idata->str() << std::endl;
ceph_assert(idata->obj != "");
- icache_lock.Lock();
+ icache_lock.lock();
icache.push(key, *idata);
- icache_lock.Unlock();
+ icache_lock.unlock();
if (next_idata != NULL && idata->kdata.prefix != "1") {
next_idata->kdata.parse((++kvmap.begin())->first);
auto nb = (++kvmap.begin())->second.cbegin();
next_idata->decode(nb);
- icache_lock.Lock();
+ std::scoped_lock l{icache_lock};
icache.push(*next_idata);
- icache_lock.Unlock();
}
return err;
}
//for lower half object
map<std::string, bufferlist>::const_iterator it = args.odata.omap.begin();
- client_index_lock.Lock();
+ client_index_lock.lock();
to_create.push_back(object_data(to_string(client_name, client_index++)));
- client_index_lock.Unlock();
+ client_index_lock.unlock();
for (int i = 0; i < k; i++) {
to_create[0].omap.insert(*it);
++it;
to_create[0].max_kdata = key_data(to_create[0].omap.rbegin()->first);
//for upper half object
- client_index_lock.Lock();
+ client_index_lock.lock();
to_create.push_back(object_data(to_create[0].max_kdata,
args.odata.max_kdata,
to_string(client_name, client_index++)));
- client_index_lock.Unlock();
+ client_index_lock.unlock();
to_create[1].omap.insert(
++args.odata.omap.find(to_create[0].omap.rbegin()->first),
args.odata.omap.end());
if (verbose) cout << "\t\t" << client_name << "-split: done splitting."
<< std::endl;
/////END CRITICAL SECTION/////
- icache_lock.Lock();
+ icache_lock.lock();
for (vector<delete_data>::iterator it = out_data.to_delete.begin();
it != out_data.to_delete.end(); ++it) {
icache.erase(it->max);
it != out_data.to_create.end(); ++it) {
icache.push(index_data(*it));
}
- icache_lock.Unlock();
+ icache_lock.unlock();
return err;
}
}
//this is the high object. it gets created regardless of rebalance or merge.
- client_index_lock.Lock();
+ client_index_lock.lock();
string o2w = to_string(client_name, client_index++);
- client_index_lock.Unlock();
+ client_index_lock.unlock();
index_data idata;
vector<object_data> to_create;
vector<object_data> to_delete;
map<std::string, bufferlist> write1_map;
map<std::string, bufferlist> write2_map;
map<std::string, bufferlist>::iterator it;
- client_index_lock.Lock();
+ client_index_lock.lock();
string o1w = to_string(client_name, client_index++);
- client_index_lock.Unlock();
+ client_index_lock.unlock();
int target_size_1 = ceil(((int)args1.odata.size + (int)args2.odata.size)
/ 2.0);
if (args1.odata.max_kdata != idata1.kdata) {
if (err < 0) {
return err;
}
- icache_lock.Lock();
+ icache_lock.lock();
for (vector<delete_data>::iterator it = out_data.to_delete.begin();
it != out_data.to_delete.end(); ++it) {
icache.erase(it->max);
it != out_data.to_create.end(); ++it) {
icache.push(index_data(*it));
}
- icache_lock.Unlock();
+ icache_lock.unlock();
if (verbose) cout << "\t\t" << client_name << "-rebalance: done rebalancing."
<< std::endl;
/////END CRITICAL SECTION/////
if (verbose) cout << "\t\t" << client_name << "-split: done splitting."
<< std::endl;
/////END CRITICAL SECTION/////
- icache_lock.Lock();
+ std::scoped_lock l{icache_lock};
for (vector<delete_data>::iterator it = idata.to_delete.begin();
it != idata.to_delete.end(); ++it) {
icache.erase(it->max);
it != idata.to_create.end(); ++it) {
icache.push(index_data(*it));
}
- icache_lock.Unlock();
return err;
}
#include "include/utime.h"
#include "include/types.h"
#include "include/encoding.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/Clock.h"
#include "common/Formatter.h"
#include "global/global_context.h"
bool verbose;//if true, display lots of debug output
//shared variables protected with mutexes
- Mutex client_index_lock;
+ ceph::mutex client_index_lock = ceph::make_mutex("client_index_lock");
int client_index; //names of new objects are client_name.client_index
- Mutex icache_lock;
+ ceph::mutex icache_lock = ceph::make_mutex("icache_lock");
IndexCache icache;
friend struct index_data;
cache_size(cache),
cache_refresh(cache_r),
verbose(verb),
- client_index_lock("client_index_lock"),
client_index(0),
- icache_lock("icache_lock"),
icache(cache)
{}
client_name("admin"),
verbose(false),
kvs(NULL),
- data_lock("data lock"),
ops_in_flight(0),
- ops_in_flight_lock("KvStoreBench::ops_in_flight_lock"),
rados_id("admin"),
pool_name("rbd"),
io_ctx_ready(false)
void KvStoreBench::aio_callback_timed(int * err, void *arg) {
timed_args *args = reinterpret_cast<timed_args *>(arg);
- Mutex * ops_in_flight_lock = &args->kvsb->ops_in_flight_lock;
- Mutex * data_lock = &args->kvsb->data_lock;
- Cond * op_avail = &args->kvsb->op_avail;
+ ceph::mutex * ops_in_flight_lock = &args->kvsb->ops_in_flight_lock;
+ ceph::mutex * data_lock = &args->kvsb->data_lock;
+ ceph::condition_variable * op_avail = &args->kvsb->op_avail;
int *ops_in_flight = &args->kvsb->ops_in_flight;
if (*err < 0 && *err != -61) {
cerr << "Error during " << args->op << " operation: " << *err << std::endl;
double time = args->sw.get_time();
args->sw.clear();
- data_lock->Lock();
+ data_lock->lock();
//latency
args->kvsb->data.latency_jf.open_object_section("latency");
args->kvsb->data.latency_jf.dump_float(string(1, args->op).c_str(),
ceph_clock_now());
args->kvsb->data.throughput_jf.close_section();
- data_lock->Unlock();
+ data_lock->unlock();
- ops_in_flight_lock->Lock();
+ ops_in_flight_lock->lock();
(*ops_in_flight)--;
- op_avail->Signal();
- ops_in_flight_lock->Unlock();
+ op_avail->notify_all();
+ ops_in_flight_lock->unlock();
delete args;
}
cout << "done waiting. Starting random operations..." << std::endl;
- Mutex::Locker l(ops_in_flight_lock);
+ std::unique_lock l{ops_in_flight_lock};
for (int i = 0; i < ops; i++) {
ceph_assert(ops_in_flight <= max_ops_in_flight);
if (ops_in_flight == max_ops_in_flight) {
- int err = op_avail.Wait(ops_in_flight_lock);
- if (err < 0) {
- ceph_abort();
- return err;
- }
+ op_avail.wait(l);
ceph_assert(ops_in_flight < max_ops_in_flight);
}
cout << "\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t" << i + 1 << " / "
delete cb_args;
}
- while(ops_in_flight > 0) {
- op_avail.Wait(ops_in_flight_lock);
- }
+ op_avail.wait(l, [this] { return ops_in_flight <= 0; });
print_time_data();
return err;