class_handler(osd->class_handler),
osd_max_object_size(cct->_conf, "osd_max_object_size"),
osd_skip_data_digest(cct->_conf, "osd_skip_data_digest"),
- publish_lock("OSDService::publish_lock"),
- pre_publish_lock("OSDService::pre_publish_lock"),
+ publish_lock{ceph::make_mutex("OSDService::publish_lock")},
+ pre_publish_lock{ceph::make_mutex("OSDService::pre_publish_lock")},
max_oldest_map(0),
peer_map_epoch_lock("OSDService::peer_map_epoch_lock"),
sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
pg_slots_by_epoch.erase(pg_slots_by_epoch.iterator_to(*slot));
slot->epoch = 0;
if (waiting_for_min_pg_epoch) {
- min_pg_epoch_cond.Signal();
+ min_pg_epoch_cond.notify_all();
}
}
dout(30) << "min is now " << pg_slots_by_epoch.begin()->epoch
<< " on " << pg_slots_by_epoch.begin()->pg->pg_id << dendl;
if (waiting_for_min_pg_epoch) {
- min_pg_epoch_cond.Signal();
+ min_pg_epoch_cond.notify_all();
}
}
void OSDShard::wait_min_pg_epoch(epoch_t need)
{
- std::lock_guard l(shard_lock);
+ // unique_lock rather than lock_guard: condition_variable::wait() must be
+ // able to drop and re-acquire the mutex while blocked.
+ std::unique_lock l{shard_lock};
++waiting_for_min_pg_epoch;
- while (!pg_slots_by_epoch.empty() &&
- pg_slots_by_epoch.begin()->epoch < need) {
- dout(10) << need << " waiting on "
- << pg_slots_by_epoch.begin()->epoch << dendl;
- min_pg_epoch_cond.Wait(shard_lock);
- }
+ // The predicate overload of wait() re-evaluates after every wakeup,
+ // replacing the old explicit while loop; it returns once there are no
+ // slots, or the smallest slot epoch has reached 'need'.
+ min_pg_epoch_cond.wait(l, [need, this] {
+ if (pg_slots_by_epoch.empty()) {
+ return true;
+ } else if (pg_slots_by_epoch.begin()->epoch >= need) {
+ return true;
+ } else {
+ dout(10) << need << " waiting on "
+ << pg_slots_by_epoch.begin()->epoch << dendl;
+ return false;
+ }
+ });
--waiting_for_min_pg_epoch;
}
++p;
}
if (queued) {
- sdata_wait_lock.Lock();
- sdata_cond.SignalOne();
- sdata_wait_lock.Unlock();
+ std::lock_guard l{sdata_wait_lock};
+ sdata_cond.notify_one();
}
}
epoch,
NullEvt())));
- sdata_wait_lock.Lock();
- sdata_cond.SignalOne();
- sdata_wait_lock.Unlock();
+ std::lock_guard l{sdata_wait_lock};
+ sdata_cond.notify_one();
}
void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num)
bool is_smallest_thread_index = thread_index < osd->num_shards;
// peek at spg_t
- sdata->shard_lock.Lock();
+ sdata->shard_lock.lock();
if (sdata->pqueue->empty() &&
(!is_smallest_thread_index || sdata->context_queue.empty())) {
- sdata->sdata_wait_lock.Lock();
+ std::unique_lock wait_lock{sdata->sdata_wait_lock};
if (is_smallest_thread_index && !sdata->context_queue.empty()) {
// we raced with a context_queue addition, don't wait
- sdata->sdata_wait_lock.Unlock();
+ wait_lock.unlock();
} else if (!sdata->stop_waiting) {
dout(20) << __func__ << " empty q, waiting" << dendl;
osd->cct->get_heartbeat_map()->clear_timeout(hb);
- sdata->shard_lock.Unlock();
- sdata->sdata_cond.Wait(sdata->sdata_wait_lock);
- sdata->sdata_wait_lock.Unlock();
- sdata->shard_lock.Lock();
+ sdata->shard_lock.unlock();
+ sdata->sdata_cond.wait(wait_lock);
+ wait_lock.unlock();
+ sdata->shard_lock.lock();
if (sdata->pqueue->empty() &&
!(is_smallest_thread_index && !sdata->context_queue.empty())) {
- sdata->shard_lock.Unlock();
+ sdata->shard_lock.unlock();
return;
}
osd->cct->get_heartbeat_map()->reset_timeout(hb,
osd->cct->_conf->threadpool_default_timeout, 0);
} else {
dout(20) << __func__ << " need return immediately" << dendl;
- sdata->sdata_wait_lock.Unlock();
- sdata->shard_lock.Unlock();
+ wait_lock.unlock();
+ sdata->shard_lock.unlock();
return;
}
}
if (sdata->pqueue->empty()) {
if (osd->is_stopping()) {
- sdata->shard_lock.Unlock();
+ sdata->shard_lock.unlock();
return; // OSD shutdown, discard.
}
- sdata->shard_lock.Unlock();
+ sdata->shard_lock.unlock();
handle_oncommits(oncommits);
return;
}
OpQueueItem item = sdata->pqueue->dequeue();
if (osd->is_stopping()) {
- sdata->shard_lock.Unlock();
+ sdata->shard_lock.unlock();
return; // OSD shutdown, discard.
}
uint64_t requeue_seq = slot->requeue_seq;
++slot->num_running;
- sdata->shard_lock.Unlock();
+ sdata->shard_lock.unlock();
osd->service.maybe_inject_dispatch_delay();
pg->lock();
osd->service.maybe_inject_dispatch_delay();
- sdata->shard_lock.Lock();
+ sdata->shard_lock.lock();
auto q = sdata->pg_slots.find(token);
if (q == sdata->pg_slots.end()) {
// this can happen if we race with pg removal.
dout(20) << __func__ << " slot " << token << " no longer there" << dendl;
pg->unlock();
- sdata->shard_lock.Unlock();
+ sdata->shard_lock.unlock();
handle_oncommits(oncommits);
return;
}
dout(20) << __func__ << " " << token
<< " nothing queued" << dendl;
pg->unlock();
- sdata->shard_lock.Unlock();
+ sdata->shard_lock.unlock();
handle_oncommits(oncommits);
return;
}
<< requeue_seq << ", we raced with _wake_pg_slot"
<< dendl;
pg->unlock();
- sdata->shard_lock.Unlock();
+ sdata->shard_lock.unlock();
handle_oncommits(oncommits);
return;
}
}
unsigned pushes_to_free = qi.get_reserved_pushes();
if (pushes_to_free > 0) {
- sdata->shard_lock.Unlock();
+ sdata->shard_lock.unlock();
osd->service.release_reserved_pushes(pushes_to_free);
handle_oncommits(oncommits);
return;
}
}
- sdata->shard_lock.Unlock();
+ sdata->shard_lock.unlock();
handle_oncommits(oncommits);
return;
}
OSDMapRef osdmap = sdata->shard_osdmap;
if (qi.get_map_epoch() > osdmap->get_epoch()) {
_add_slot_waiter(token, slot, std::move(qi));
- sdata->shard_lock.Unlock();
+ sdata->shard_lock.unlock();
pg->unlock();
handle_oncommits(oncommits);
return;
}
}
- sdata->shard_lock.Unlock();
+ sdata->shard_lock.unlock();
if (!new_children.empty()) {
for (auto shard : osd->shards) {
assert (NULL != sdata);
unsigned priority = item.get_priority();
unsigned cost = item.get_cost();
- sdata->shard_lock.Lock();
+ sdata->shard_lock.lock();
dout(20) << __func__ << " " << item << dendl;
if (priority >= osd->op_prio_cutoff)
else
sdata->pqueue->enqueue(
item.get_owner(), priority, cost, std::move(item));
- sdata->shard_lock.Unlock();
-
- sdata->sdata_wait_lock.Lock();
- sdata->sdata_cond.SignalOne();
- sdata->sdata_wait_lock.Unlock();
+ sdata->shard_lock.unlock();
+ std::lock_guard l{sdata->sdata_wait_lock};
+ sdata->sdata_cond.notify_one();
}
void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item)
auto shard_index = item.get_ordering_token().hash_to_shard(osd->shards.size());
auto& sdata = osd->shards[shard_index];
ceph_assert(sdata);
- sdata->shard_lock.Lock();
+ sdata->shard_lock.lock();
auto p = sdata->pg_slots.find(item.get_ordering_token());
if (p != sdata->pg_slots.end() &&
!p->second->to_process.empty()) {
dout(20) << __func__ << " " << item << dendl;
}
sdata->_enqueue_front(std::move(item), osd->op_prio_cutoff);
- sdata->shard_lock.Unlock();
- sdata->sdata_wait_lock.Lock();
- sdata->sdata_cond.SignalOne();
- sdata->sdata_wait_lock.Unlock();
+ sdata->shard_lock.unlock();
+ std::lock_guard l{sdata->sdata_wait_lock};
+ sdata->sdata_cond.notify_one();
}
namespace ceph {
private:
// -- superblock --
- Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
+ ceph::mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
OSDSuperblock superblock;
public:
*/
private:
OSDMapRef next_osdmap;
- Cond pre_publish_cond;
+ ceph::condition_variable pre_publish_cond;
public:
void pre_publish_map(OSDMapRef map) {
if (--(i->second) == 0) {
map_reservations.erase(i);
}
- pre_publish_cond.Signal();
+ pre_publish_cond.notify_all();
}
/// blocks until there are no reserved maps prior to next_osdmap
void await_reserved_maps() {
- std::lock_guard l(pre_publish_lock);
+ // unique_lock so pre_publish_cond can release the mutex while waiting.
+ std::unique_lock l{pre_publish_lock};
ceph_assert(next_osdmap);
- while (true) {
- map<epoch_t, unsigned>::const_iterator i = map_reservations.cbegin();
- if (i == map_reservations.cend() || i->first >= next_osdmap->get_epoch()) {
- break;
- } else {
- pre_publish_cond.Wait(pre_publish_lock);
- }
- }
+ // wait() with predicate: blocks until no reservation is held on any map
+ // epoch older than next_osdmap's. map_reservations is a std::map keyed by
+ // epoch, so checking only the first (smallest) entry is sufficient.
+ pre_publish_cond.wait(l, [this] {
+ auto i = map_reservations.cbegin();
+ return (i == map_reservations.cend() ||
+ i->first >= next_osdmap->get_epoch());
+ });
}
OSDMapRef get_next_osdmap() {
std::lock_guard l(pre_publish_lock);
string shard_name;
string sdata_wait_lock_name;
- Mutex sdata_wait_lock;
- Cond sdata_cond;
+ ceph::mutex sdata_wait_lock;
+ ceph::condition_variable sdata_cond;
string osdmap_lock_name;
- Mutex osdmap_lock; ///< protect shard_osdmap updates vs users w/o shard_lock
+ ceph::mutex osdmap_lock; ///< protect shard_osdmap updates vs users w/o shard_lock
OSDMapRef shard_osdmap;
OSDMapRef get_osdmap() {
}
string shard_lock_name;
- Mutex shard_lock; ///< protects remaining members below
+ ceph::mutex shard_lock; ///< protects remaining members below
/// map of slots for each spg_t. maintains ordering of items dequeued
/// from pqueue while _process thread drops shard lock to acquire the
&OSDShardPGSlot::pg_epoch_item>,
boost::intrusive::compare<pg_slot_compare_by_epoch>> pg_slots_by_epoch;
int waiting_for_min_pg_epoch = 0;
- Cond min_pg_epoch_cond;
+ ceph::condition_variable min_pg_epoch_cond;
/// priority queue
std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
osd(osd),
shard_name(string("OSDShard.") + stringify(id)),
sdata_wait_lock_name(shard_name + "::sdata_wait_lock"),
- sdata_wait_lock(sdata_wait_lock_name.c_str(), false, true, false),
+ sdata_wait_lock{make_mutex(sdata_wait_lock_name)},
osdmap_lock_name(shard_name + "::osdmap_lock"),
- osdmap_lock(osdmap_lock_name.c_str(), false, false),
+ osdmap_lock{make_mutex(osdmap_lock_name)},
shard_lock_name(shard_name + "::shard_lock"),
- shard_lock(shard_lock_name.c_str(), false, true, false),
+ shard_lock{make_mutex(shard_lock_name)},
context_queue(sdata_wait_lock, sdata_cond) {
if (opqueue == io_queue::weightedpriority) {
pqueue = std::make_unique<
class OSD : public Dispatcher,
public md_config_obs_t {
/** OSD **/
- Mutex osd_lock; // global lock
+ Mutex osd_lock; // global lock
SafeTimer tick_timer; // safe timer (osd_lock)
// Tick timer for those stuff that do not need osd_lock
void return_waiting_threads() override {
for(uint32_t i = 0; i < osd->num_shards; i++) {
OSDShard* sdata = osd->shards[i];
- assert (NULL != sdata);
- sdata->sdata_wait_lock.Lock();
+ assert (NULL != sdata);
+ // RAII scoped_lock replaces the manual Lock()/Unlock() pair; it is
+ // released at the end of each loop iteration. notify_all() now fires
+ // while the lock is still held, matching the old Signal() placement.
+ std::scoped_lock l{sdata->sdata_wait_lock};
sdata->stop_waiting = true;
- sdata->sdata_cond.Signal();
- sdata->sdata_wait_lock.Unlock();
+ sdata->sdata_cond.notify_all();
}
}
for(uint32_t i = 0; i < osd->num_shards; i++) {
OSDShard* sdata = osd->shards[i];
assert (NULL != sdata);
- sdata->sdata_wait_lock.Lock();
+ std::scoped_lock l{sdata->sdata_wait_lock};
sdata->stop_waiting = false;
- sdata->sdata_wait_lock.Unlock();
}
}
snprintf(queue_name, sizeof(queue_name), "%s%" PRIu32, "OSD:ShardedOpWQ:", i);
ceph_assert(NULL != sdata);
- sdata->shard_lock.Lock();
+ std::scoped_lock l{sdata->shard_lock};
f->open_object_section(queue_name);
sdata->pqueue->dump(f);
f->close_section();
- sdata->shard_lock.Unlock();
}
}
return c;
}
void _process(Command *c, ThreadPool::TPHandle &) override {
- osd->osd_lock.Lock();
+ // osd_lock is still a ceph Mutex in this change (only its Lock()/Unlock()
+ // spelling moves to lock()/unlock()); the early-return path below must
+ // keep the explicit unlock + delete pairing.
+ osd->osd_lock.lock();
if (osd->is_stopping()) {
- osd->osd_lock.Unlock();
+ osd->osd_lock.unlock();
delete c;
return;
}
osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
- osd->osd_lock.Unlock();
+ osd->osd_lock.unlock();
delete c;
}
void _clear() override {