#include "osdc/Filer.h"
#include "common/Cond.h"
-#include "common/Mutex.h"
#include "common/perf_counters.h"
#include "common/admin_socket.h"
#include "common/errno.h"
{
std::unique_ptr<Formatter> f(Formatter::create(format));
f->open_object_section("result");
- m_client->client_lock.Lock();
- if (command == "mds_requests")
- m_client->dump_mds_requests(f.get());
- else if (command == "mds_sessions")
- m_client->dump_mds_sessions(f.get());
- else if (command == "dump_cache")
- m_client->dump_cache(f.get());
- else if (command == "kick_stale_sessions")
- m_client->_kick_stale_sessions();
- else if (command == "status")
- m_client->dump_status(f.get());
- else
- ceph_abort_msg("bad command registered");
- m_client->client_lock.Unlock();
+ {
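+    // hold client_lock across the dump/kick calls; the guard releases it on
+    // scope exit even if a dump throws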
+ std::lock_guard l{m_client->client_lock};
+ if (command == "mds_requests")
+ m_client->dump_mds_requests(f.get());
+ else if (command == "mds_sessions")
+ m_client->dump_mds_sessions(f.get());
+ else if (command == "dump_cache")
+ m_client->dump_cache(f.get());
+ else if (command == "kick_stale_sessions")
+ m_client->_kick_stale_sessions();
+ else if (command == "status")
+ m_client->dump_status(f.get());
+ else
+ ceph_abort_msg("bad command registered");
+ }
f->close_section();
f->flush(out);
return true;
Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
: Dispatcher(m->cct),
timer(m->cct, client_lock),
- client_lock("Client::client_lock"),
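+    // client_lock is now expected to be initialized at its declaration
+    // (ceph::make_mutex) in Client.h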
messenger(m),
monclient(mc),
objecter(objecter_),
Client::~Client()
{
- ceph_assert(!client_lock.is_locked());
+ ceph_assert(ceph_mutex_is_not_locked(client_lock));
// It is necessary to hold client_lock, because any inode destruction
// may call into ObjectCacher, which asserts that its lock (which is
// client_lock) is held.
- client_lock.Lock();
+ std::lock_guard l{client_lock};
tear_down_cache();
- client_lock.Unlock();
}
void Client::tear_down_cache()
void Client::dump_status(Formatter *f)
{
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
ldout(cct, 1) << __func__ << dendl;
{
timer.init();
objectcacher->start();
-
- client_lock.Lock();
- ceph_assert(!initialized);
-
- messenger->add_dispatcher_tail(this);
- client_lock.Unlock();
-
+ {
+ std::lock_guard l{client_lock};
+ ceph_assert(!initialized);
+ messenger->add_dispatcher_tail(this);
+ }
_finish_init();
return 0;
}
void Client::_finish_init()
{
- client_lock.Lock();
- // logger
- PerfCountersBuilder plb(cct, "client", l_c_first, l_c_last);
- plb.add_time_avg(l_c_reply, "reply", "Latency of receiving a reply on metadata request");
- plb.add_time_avg(l_c_lat, "lat", "Latency of processing a metadata request");
- plb.add_time_avg(l_c_wrlat, "wrlat", "Latency of a file data write operation");
- plb.add_time_avg(l_c_read, "rdlat", "Latency of a file data read operation");
- plb.add_time_avg(l_c_fsync, "fsync", "Latency of a file sync operation");
- logger.reset(plb.create_perf_counters());
- cct->get_perfcounters_collection()->add(logger.get());
-
- client_lock.Unlock();
+ {
+ std::lock_guard l{client_lock};
+ // logger
+ PerfCountersBuilder plb(cct, "client", l_c_first, l_c_last);
+ plb.add_time_avg(l_c_reply, "reply", "Latency of receiving a reply on metadata request");
+ plb.add_time_avg(l_c_lat, "lat", "Latency of processing a metadata request");
+ plb.add_time_avg(l_c_wrlat, "wrlat", "Latency of a file data write operation");
+ plb.add_time_avg(l_c_read, "rdlat", "Latency of a file data read operation");
+ plb.add_time_avg(l_c_fsync, "fsync", "Latency of a file sync operation");
+ logger.reset(plb.create_perf_counters());
+ cct->get_perfcounters_collection()->add(logger.get());
+ }
cct->_conf.add_observer(this);
<< cpp_strerror(-ret) << dendl;
}
- client_lock.Lock();
+ std::lock_guard l{client_lock};
initialized = true;
- client_lock.Unlock();
}
void Client::shutdown()
// If we were not mounted, but were being used for sending
// MDS commands, we may have sessions that need closing.
- client_lock.Lock();
- _close_sessions();
- client_lock.Unlock();
-
+ {
+ std::lock_guard l{client_lock};
+ _close_sessions();
+ }
cct->_conf.remove_observer(this);
cct->get_admin_socket()->unregister_commands(&m_command_hook);
}
objectcacher->stop(); // outside of client_lock! this does a join.
-
- client_lock.Lock();
- ceph_assert(initialized);
- initialized = false;
- timer.shutdown();
- client_lock.Unlock();
-
+ {
+ std::lock_guard l{client_lock};
+ ceph_assert(initialized);
+ initialized = false;
+ timer.shutdown();
+ }
objecter_finisher.wait_for_empty();
objecter_finisher.stop();
}
// set up wait cond
- Cond caller_cond;
+ ceph::condition_variable caller_cond;
request->caller_cond = &caller_cond;
// choose mds
// wait for signal
ldout(cct, 20) << "awaiting reply|forward|kick on " << &caller_cond << dendl;
request->kick = false;
- while (!request->reply && // reply
- request->resend_mds < 0 && // forward
- !request->kick)
- caller_cond.Wait(client_lock);
- request->caller_cond = NULL;
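+  // client_lock is already held here: adopt it into a unique_lock so the
+  // condition variable can use it, then release so the caller still owns the lock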
+ std::unique_lock l{client_lock, std::adopt_lock};
+ caller_cond.wait(l, [request] {
+ return (request->reply || // reply
+ request->resend_mds >= 0 || // forward
+ request->kick);
+ });
+ l.release();
+ request->caller_cond = nullptr;
// did we get a reply?
if (request->reply)
// kick dispatcher (we've got it!)
ceph_assert(request->dispatch_cond);
- request->dispatch_cond->Signal();
+ request->dispatch_cond->notify_all();
ldout(cct, 20) << "sendrecv kickback on tid " << tid << " " << request->dispatch_cond << dendl;
-  request->dispatch_cond = 0;
+  request->dispatch_cond = nullptr;
s->state = MetaSession::STATE_CLOSED;
s->con->mark_down();
signal_context_list(s->waiting_for_open);
- mount_cond.Signal();
+ mount_cond.notify_all();
remove_session_caps(s);
kick_requests_closed(s);
mds_sessions.erase(s->mds_num);
renew_caps(session);
session->state = MetaSession::STATE_OPEN;
if (unmounting)
- mount_cond.Signal();
+ mount_cond.notify_all();
else
connect_mds_targets(from);
signal_context_list(session->waiting_for_open);
bool Client::_any_stale_sessions() const
{
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
for (const auto &p : mds_sessions) {
if (p.second.state == MetaSession::STATE_STALE) {
request->item.remove_myself();
request->num_fwd = fwd->get_num_fwd();
request->resend_mds = fwd->get_dest_mds();
- request->caller_cond->Signal();
+ request->caller_cond->notify_all();
}
bool Client::is_dir_operation(MetaRequest *req)
request->sent_on_mseq == it->second.mseq)) {
ldout(cct, 20) << "have to return ESTALE" << dendl;
} else {
- request->caller_cond->Signal();
+ request->caller_cond->notify_all();
return;
}
}
// Only signal the caller once (on the first reply):
// Either it's an unsafe reply, or it's a safe reply and no unsafe reply was sent.
if (!is_safe || !request->got_unsafe) {
- Cond cond;
+ ceph::condition_variable cond;
request->dispatch_cond = &cond;
// wake up waiter
ldout(cct, 20) << __func__ << " signalling caller " << (void*)request->caller_cond << dendl;
- request->caller_cond->Signal();
+ request->caller_cond->notify_all();
// wake for kick back
- while (request->dispatch_cond) {
- ldout(cct, 20) << __func__ << " awaiting kickback on tid " << tid << " " << &cond << dendl;
- cond.Wait(client_lock);
- }
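+    // the predicate form of wait() absorbs spurious wakeups; the ldout inside
+    // the predicate fires on every wakeup while the kickback is still pending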
+ std::unique_lock l{client_lock, std::adopt_lock};
+ cond.wait(l, [tid, request, &cond, this] {
+ if (request->dispatch_cond) {
+ ldout(cct, 20) << "handle_client_reply awaiting kickback on tid "
+ << tid << " " << &cond << dendl;
+ }
+ return !request->dispatch_cond;
+ });
+ l.release();
}
if (is_safe) {
unregister_request(request);
}
if (unmounting)
- mount_cond.Signal();
+ mount_cond.notify_all();
}
void Client::_handle_full_flag(int64_t pool)
trim_cache();
if (size < lru.lru_get_size() + inode_map.size()) {
ldout(cct, 10) << "unmounting: trim pass, cache shrank, poking unmount()" << dendl;
- mount_cond.Signal();
+ mount_cond.notify_all();
} else {
ldout(cct, 10) << "unmounting: trim pass, size still " << lru.lru_get_size()
<< "+" << inode_map.size() << dendl;
m->set_encoding_version(0); // use connection features to choose encoding
session->con->send_message2(std::move(m));
- mount_cond.Signal();
+ mount_cond.notify_all();
if (session->reclaim_state == MetaSession::RECLAIMING)
signal_cond_list(waiting_for_reclaim);
if (req->aborted()) {
if (req->caller_cond) {
req->kick = true;
- req->caller_cond->Signal();
+ req->caller_cond->notify_all();
}
continue;
}
if (req->mds == session->mds_num) {
if (req->caller_cond) {
req->kick = true;
- req->caller_cond->Signal();
+ req->caller_cond->notify_all();
}
req->item.remove_myself();
if (req->got_unsafe) {
public:
C_Client_FlushComplete(Client *c, Inode *in) : client(c), inode(in) { }
void finish(int r) override {
- ceph_assert(client->client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client->client_lock));
if (r != 0) {
client_t const whoami = client->whoami; // For the benefit of ldout prefix
ldout(client->cct, 1) << "I/O error from flush on inode " << inode
}
}
-void Client::wait_on_list(list<Cond*>& ls)
+void Client::wait_on_list(list<ceph::condition_variable*>& ls)
{
- Cond cond;
+ ceph::condition_variable cond;
ls.push_back(&cond);
- cond.Wait(client_lock);
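+  // a bare wait() may wake spuriously; callers of wait_on_list are expected
+  // to re-check their own condition in a loop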
+ std::unique_lock l{client_lock, std::adopt_lock};
+ cond.wait(l);
+ l.release();
ls.remove(&cond);
}
-void Client::signal_cond_list(list<Cond*>& ls)
+void Client::signal_cond_list(list<ceph::condition_variable*>& ls)
{
- for (list<Cond*>::iterator it = ls.begin(); it != ls.end(); ++it)
- (*it)->Signal();
+ for (auto cond : ls) {
+ cond->notify_all();
+ }
}
void Client::wait_on_context_list(list<Context*>& ls)
{
- Cond cond;
+ ceph::condition_variable cond;
bool done = false;
int r;
- ls.push_back(new C_Cond(&cond, &done, &r));
- while (!done)
- cond.Wait(client_lock);
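+  // C_Cond now takes the condition_variable by reference rather than by pointer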
+ ls.push_back(new C_Cond(cond, &done, &r));
+ std::unique_lock l{client_lock, std::adopt_lock};
+ cond.wait(l, [&done] { return done;});
+ l.release();
}
void Client::signal_context_list(list<Context*>& ls)
}
void finish(int r) override {
// _async_invalidate takes the lock when it needs to, call this back from outside of lock.
- ceph_assert(!client->client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_not_locked_by_me(client->client_lock));
client->_async_invalidate(ino, offset, length);
}
};
void Client::_flush_range(Inode *in, int64_t offset, uint64_t size)
{
- ceph_assert(client_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(client_lock));
if (!in->oset.dirty_or_tx) {
ldout(cct, 10) << " nothing to flush" << dendl;
return;
offset, size, &onflush);
if (!ret) {
// wait for flush
- client_lock.Unlock();
+ client_lock.unlock();
onflush.wait();
- client_lock.Lock();
+ client_lock.lock();
}
}
void Client::flush_set_callback(ObjectCacher::ObjectSet *oset)
{
// std::lock_guard l(client_lock);
- ceph_assert(client_lock.is_locked()); // will be called via dispatch() -> objecter -> ...
+ ceph_assert(ceph_mutex_is_locked(client_lock)); // will be called via dispatch() -> objecter -> ...
Inode *in = static_cast<Inode *>(oset->parent);
ceph_assert(in);
_flushed(in);
signal_cond_list(in->waitfor_caps);
}
s->flushing_caps_tids.clear();
- sync_cond.Signal();
+ sync_cond.notify_all();
}
int Client::_do_remount(bool retry_on_error)
if (oldest_tid <= want) {
ldout(cct, 10) << " waiting on mds." << p.first << " tid " << oldest_tid
<< " (want " << want << ")" << dendl;
- sync_cond.Wait(client_lock);
+ std::unique_lock l{client_lock, std::adopt_lock};
+ sync_cond.wait(l);
+ l.release();
goto retry;
}
}
signal_cond_list(in->waitfor_caps);
if (session->flushing_caps_tids.empty() ||
*session->flushing_caps_tids.begin() > flush_ack_tid)
- sync_cond.Signal();
+ sync_cond.notify_all();
}
if (!dirty) {
signal_cond_list(in->waitfor_caps);
if (session->flushing_caps_tids.empty() ||
*session->flushing_caps_tids.begin() > flush_ack_tid)
- sync_cond.Signal();
+ sync_cond.notify_all();
}
} else {
ldout(cct, 5) << __func__ << " DUP(?) mds." << mds << " flushed snap follows " << follows
}
void finish(int r) override {
// _async_dentry_invalidate is responsible for its own locking
- ceph_assert(!client->client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_not_locked_by_me(client->client_lock));
client->_async_dentry_invalidate(dirino, ino, name);
}
};
*/
int Client::authenticate()
{
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
if (monclient->is_authenticated()) {
return 0;
}
- client_lock.Unlock();
+ client_lock.unlock();
int r = monclient->authenticate(cct->_conf->client_mount_timeout);
- client_lock.Lock();
+ client_lock.lock();
if (r < 0) {
return r;
}
do {
C_SaferCond cond;
monclient->get_version("fsmap", &fsmap_latest, NULL, &cond);
- client_lock.Unlock();
+ client_lock.unlock();
r = cond.wait();
- client_lock.Lock();
+ client_lock.lock();
} while (r == -EAGAIN);
if (r < 0) {
// wait for sessions to close
ldout(cct, 2) << "waiting for " << mds_sessions.size() << " mds sessions to close" << dendl;
- mount_cond.Wait(client_lock);
+ std::unique_lock l{client_lock, std::adopt_lock};
+ mount_cond.wait(l);
+ l.release();
}
}
req->abort(err);
if (req->caller_cond) {
req->kick = true;
- req->caller_cond->Signal();
+ req->caller_cond->notify_all();
}
}
void Client::_unmount(bool abort)
{
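+  // the caller holds client_lock: adopt it so the waits below can use this
+  // unique_lock, and release it again before returning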
+ std::unique_lock lock{client_lock, std::adopt_lock};
-  if (unmounting)
-    return;
+  if (unmounting) {
+    lock.release();
+    return;
+  }
flush_mdlog_sync();
}
- while (!mds_requests.empty()) {
- ldout(cct, 10) << "waiting on " << mds_requests.size() << " requests" << dendl;
- mount_cond.Wait(client_lock);
- }
-
+ mount_cond.wait(lock, [this] {
+ if (!mds_requests.empty()) {
+ ldout(cct, 10) << "waiting on " << mds_requests.size() << " requests"
+ << dendl;
+ }
+ return mds_requests.empty();
+ });
if (tick_event)
timer.cancel_event(tick_event);
tick_event = 0;
_ll_drop_pins();
- while (unsafe_sync_write > 0) {
- ldout(cct, 0) << unsafe_sync_write << " unsafe_sync_writes, waiting" << dendl;
- mount_cond.Wait(client_lock);
- }
+ mount_cond.wait(lock, [this] {
+ if (unsafe_sync_write > 0) {
+ ldout(cct, 0) << unsafe_sync_write << " unsafe_sync_writes, waiting"
+ << dendl;
+ }
+ return unsafe_sync_write <= 0;
+ });
if (cct->_conf->client_oc) {
// flush/release all buffered data
<< "+" << inode_map.size() << " items"
<< ", waiting (for caps to release?)"
<< dendl;
- utime_t until = ceph_clock_now() + utime_t(5, 0);
- int r = mount_cond.WaitUntil(client_lock, until);
- if (r == ETIMEDOUT) {
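+      // wait_for() takes a relative timeout and reports expiry via std::cv_status::timeout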
+ if (auto r = mount_cond.wait_for(lock, ceph::make_timespan(5));
+ r == std::cv_status::timeout) {
dump_cache(NULL);
}
}
mounted = false;
+ lock.release();
ldout(cct, 2) << "unmounted." << dendl;
}
cct->_conf->client_tick_interval,
new FunctionContext([this](int) {
// Called back via Timer, which takes client_lock for us
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
tick();
}));
utime_t now = ceph_clock_now();
req->abort(-ETIMEDOUT);
if (req->caller_cond) {
req->kick = true;
- req->caller_cond->Signal();
+ req->caller_cond->notify_all();
}
signal_cond_list(waiting_for_mdsmap);
for (auto &p : mds_sessions) {
int Client::_readdir_cache_cb(dir_result_t *dirp, add_dirent_cb_t cb, void *p,
int caps, bool getref)
{
- ceph_assert(client_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(client_lock));
ldout(cct, 10) << __func__ << " " << dirp << " on " << dirp->inode->ino
<< " last_name " << dirp->last_name << " offset " << hex << dirp->offset << dec
<< dendl;
dn_name = dn->name; // fill in name while we have lock
- client_lock.Unlock();
+ client_lock.unlock();
r = cb(p, &de, &stx, next_off, in); // _next_ offset
- client_lock.Lock();
+ client_lock.lock();
ldout(cct, 15) << " de " << de.d_name << " off " << hex << dn->offset << dec
<< " = " << r << dendl;
if (r < 0) {
_ll_get(inode);
}
- client_lock.Unlock();
+ client_lock.unlock();
r = cb(p, &de, &stx, next_off, inode);
- client_lock.Lock();
+ client_lock.lock();
if (r < 0)
return r;
_ll_get(inode);
}
- client_lock.Unlock();
+ client_lock.unlock();
r = cb(p, &de, &stx, next_off, inode);
- client_lock.Lock();
+ client_lock.lock();
if (r < 0)
return r;
_ll_get(inode);
}
- client_lock.Unlock();
+ client_lock.unlock();
r = cb(p, &de, &stx, next_off, inode); // _next_ offset
- client_lock.Lock();
+ client_lock.lock();
ldout(cct, 15) << " de " << de.d_name << " off " << hex << next_off - 1 << dec
<< " = " << r << dendl;
ldout(cct, 10) << __func__ << " " << f << dendl;
if (f->pos_locked || !f->pos_waiters.empty()) {
- Cond cond;
+ ceph::condition_variable cond;
f->pos_waiters.push_back(&cond);
ldout(cct, 10) << __func__ << " BLOCKING on " << f << dendl;
- while (f->pos_locked || f->pos_waiters.front() != &cond)
- cond.Wait(client_lock);
+ std::unique_lock l{client_lock, std::adopt_lock};
+ cond.wait(l, [f, me=&cond] {
+ return !f->pos_locked && f->pos_waiters.front() == me;
+ });
+ l.release();
ldout(cct, 10) << __func__ << " UNBLOCKING on " << f << dendl;
ceph_assert(f->pos_waiters.front() == &cond);
f->pos_waiters.pop_front();
// done!
if (onuninline) {
- client_lock.Unlock();
+ client_lock.unlock();
int ret = onuninline->wait();
- client_lock.Lock();
+ client_lock.lock();
if (ret >= 0 || ret == -ECANCELED) {
in->inline_data.clear();
in->inline_version = CEPH_INLINE_NONE;
off, len, bl, 0, &onfinish);
if (r == 0) {
get_cap_ref(in, CEPH_CAP_FILE_CACHE);
- client_lock.Unlock();
+ client_lock.unlock();
r = onfinish.wait();
- client_lock.Lock();
+ client_lock.lock();
put_cap_ref(in, CEPH_CAP_FILE_CACHE);
}
ldout(cct, 10) << __func__ << " " << *in << " " << off << "~" << len << dendl;
- Mutex flock("Client::_read_sync flock");
- Cond cond;
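+  // ceph::make_mutex keeps the name for lockdep in debug builds; in release
+  // builds ceph::mutex is a plain std::mutex and the name is dropped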
+ ceph::mutex flock = ceph::make_mutex("Client::_read_sync flock");
+ ceph::condition_variable cond;
while (left > 0) {
C_SaferCond onfinish("Client::_read_sync flock");
bufferlist tbl;
pos, left, &tbl, 0,
in->truncate_size, in->truncate_seq,
&onfinish);
- client_lock.Unlock();
+ client_lock.unlock();
int r = onfinish.wait();
- client_lock.Lock();
+ client_lock.lock();
// if we get ENOENT from OSD, assume 0 bytes returned
if (r == -ENOENT)
ldout(cct, 15) << __func__ << " unsafe_sync_write = " << unsafe_sync_write << dendl;
if (unsafe_sync_write == 0 && unmounting) {
ldout(cct, 10) << __func__ << " -- no more unsafe writes, unmount can proceed" << dendl;
- mount_cond.Signal();
+ mount_cond.notify_all();
}
}
offset, size, bl, ceph::real_clock::now(), 0,
in->truncate_size, in->truncate_seq,
&onfinish);
- client_lock.Unlock();
+ client_lock.unlock();
onfinish.wait();
- client_lock.Lock();
+ client_lock.lock();
_sync_write_commit(in);
}
done:
if (nullptr != onuninline) {
- client_lock.Unlock();
+ client_lock.unlock();
int uninline_ret = onuninline->wait();
- client_lock.Lock();
+ client_lock.lock();
if (uninline_ret >= 0 || uninline_ret == -ECANCELED) {
in->inline_data.clear();
}
if (nullptr != object_cacher_completion) { // wait on a real reply instead of guessing
- client_lock.Unlock();
+ client_lock.unlock();
ldout(cct, 15) << "waiting on data to flush" << dendl;
r = object_cacher_completion->wait();
- client_lock.Lock();
+ client_lock.lock();
ldout(cct, 15) << "got " << r << " from flush writeback" << dendl;
} else {
// FIXME: this can starve
objecter->get_fs_stats(stats, boost::optional<int64_t>(), &cond);
}
- client_lock.Unlock();
+ client_lock.unlock();
int rval = cond.wait();
assert(root);
total_files_on_fs = root->rstat.rfiles + root->rstat.rsubdirs;
- client_lock.Lock();
+ client_lock.lock();
if (rval < 0) {
ldout(cct, 1) << "underlying call to statfs returned error: "
wait_sync_caps(flush_tid);
if (nullptr != cond) {
- client_lock.Unlock();
+ client_lock.unlock();
ldout(cct, 15) << __func__ << " waiting on data to flush" << dendl;
cond->wait();
ldout(cct, 15) << __func__ << " flush finished" << dendl;
- client_lock.Lock();
+ client_lock.lock();
}
return 0;
CEPH_OSD_FLAG_READ,
&onfinish);
- client_lock.Unlock();
+ client_lock.unlock();
int r = onfinish.wait();
- client_lock.Lock();
+ client_lock.lock();
if (r >= 0) {
bl.copy(0, bl.length(), buf);
fakesnap.seq = snapseq;
/* lock just in time */
- client_lock.Lock();
+ client_lock.lock();
if (unmounting) {
- client_lock.Unlock();
+ client_lock.unlock();
return -ENOTCONN;
}
0,
onsafe.get());
- client_lock.Unlock();
+ client_lock.unlock();
if (nullptr != onsafe) {
r = onsafe->wait();
}
in->change_attr++;
in->mark_caps_dirty(CEPH_CAP_FILE_WR);
- client_lock.Unlock();
+ client_lock.unlock();
onfinish.wait();
- client_lock.Lock();
+ client_lock.lock();
_sync_write_commit(in);
}
} else if (!(mode & FALLOC_FL_KEEP_SIZE)) {
}
if (nullptr != onuninline) {
- client_lock.Unlock();
+ client_lock.unlock();
int ret = onuninline->wait();
- client_lock.Lock();
+ client_lock.lock();
if (ret >= 0 || ret == -ECANCELED) {
in->inline_data.clear();
objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), wr_op,
nullsnapc, ceph::real_clock::now(), 0, &wr_cond);
- client_lock.Unlock();
+ client_lock.unlock();
int rd_ret = rd_cond.wait();
int wr_ret = wr_cond.wait();
- client_lock.Lock();
+ client_lock.lock();
bool errored = false;
C_SaferCond cond;
if (!objecter->wait_for_map(reclaim_osd_epoch, &cond)) {
ldout(cct, 10) << __func__ << ": waiting for OSD epoch " << reclaim_osd_epoch << dendl;
- client_lock.Unlock();
+ client_lock.unlock();
cond.wait();
- client_lock.Lock();
+ client_lock.lock();
}
bool blacklisted = objecter->with_osdmap(
mds_rank_t Client::_get_random_up_mds() const
{
- ceph_assert(client_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
std::set<mds_rank_t> up;
mdsmap->get_up_mds_set(up);
objectcacher->start();
objecter->init();
- client_lock.Lock();
+ client_lock.lock();
ceph_assert(!is_initialized());
messenger->add_dispatcher_tail(objecter);
if (r < 0) {
// need to do cleanup because we're in an intermediate init state
timer.shutdown();
- client_lock.Unlock();
+ client_lock.unlock();
objecter->shutdown();
objectcacher->stop();
monclient->shutdown();
}
objecter->start();
- client_lock.Unlock();
+ client_lock.unlock();
_finish_init();
return 0;
int n = 0;
// for object traces
- Mutex lock("synclient foo");
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("synclient foo");
+ ceph::condition_variable cond;
bool ack;
while (!t.end()) {
int64_t oh = t.get_int();
int64_t ol = t.get_int();
object_t oid = file_object_t(oh, ol);
- lock.Lock();
+    std::unique_lock locker{lock};
+    ack = false; // reset before issuing the op; C_SafeCond sets it on completion
object_locator_t oloc(SYNCLIENT_FIRST_POOL);
uint64_t size;
ceph::real_time mtime;
client->objecter->stat(oid, oloc, CEPH_NOSNAP, &size, &mtime, 0,
- new C_SafeCond(&lock, &cond, &ack));
- while (!ack) cond.Wait(lock);
- lock.Unlock();
+ new C_SafeCond(lock, cond, &ack));
+ cond.wait(locker, [&ack] { return ack; });
}
else if (strcmp(op, "o_read") == 0) {
int64_t oh = t.get_int();
int64_t len = t.get_int();
object_t oid = file_object_t(oh, ol);
object_locator_t oloc(SYNCLIENT_FIRST_POOL);
- lock.Lock();
+    std::unique_lock locker{lock};
+    ack = false;
bufferlist bl;
client->objecter->read(oid, oloc, off, len, CEPH_NOSNAP, &bl, 0,
- new C_SafeCond(&lock, &cond, &ack));
- while (!ack) cond.Wait(lock);
- lock.Unlock();
+ new C_SafeCond(lock, cond, &ack));
+ cond.wait(locker, [&ack] { return ack; });
}
else if (strcmp(op, "o_write") == 0) {
int64_t oh = t.get_int();
int64_t len = t.get_int();
object_t oid = file_object_t(oh, ol);
object_locator_t oloc(SYNCLIENT_FIRST_POOL);
- lock.Lock();
+    std::unique_lock locker{lock};
+    ack = false;
bufferptr bp(len);
bufferlist bl;
bl.push_back(bp);
SnapContext snapc;
client->objecter->write(oid, oloc, off, len, snapc, bl,
ceph::real_clock::now(), 0,
- new C_SafeCond(&lock, &cond, &ack));
- while (!ack) cond.Wait(lock);
- lock.Unlock();
+ new C_SafeCond(lock, cond, &ack));
+ cond.wait(locker, [&ack] { return ack; });
}
else if (strcmp(op, "o_zero") == 0) {
int64_t oh = t.get_int();
int64_t len = t.get_int();
object_t oid = file_object_t(oh, ol);
object_locator_t oloc(SYNCLIENT_FIRST_POOL);
- lock.Lock();
+    std::unique_lock locker{lock};
+    ack = false;
SnapContext snapc;
client->objecter->zero(oid, oloc, off, len, snapc,
ceph::real_clock::now(), 0,
- new C_SafeCond(&lock, &cond, &ack));
- while (!ack) cond.Wait(lock);
- lock.Unlock();
+ new C_SafeCond(lock, cond, &ack));
+ cond.wait(locker, [&ack] { return ack; });
}
class C_Ref : public Context {
- Mutex& lock;
- Cond& cond;
+ ceph::mutex& lock;
+ ceph::condition_variable& cond;
int *ref;
public:
- C_Ref(Mutex &l, Cond &c, int *r) : lock(l), cond(c), ref(r) {
- lock.Lock();
+ C_Ref(ceph::mutex &l, ceph::condition_variable &c, int *r)
+ : lock(l), cond(c), ref(r) {
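+    // bump the counter under the shared lock; the waiter's predicate reads it
+    // under the same lock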
+    std::lock_guard locker{lock};
(*ref)++;
- lock.Unlock();
}
void finish(int) override {
- lock.Lock();
+    std::lock_guard locker{lock};
(*ref)--;
- cond.Signal();
- lock.Unlock();
+ cond.notify_all();
}
};
bufferlist bl;
bl.push_back(bp);
- Mutex lock("create_objects lock");
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("create_objects lock");
+ ceph::condition_variable cond;
int unsafe = 0;
dout(10) << "writing " << oid << dendl;
starts.push_back(ceph_clock_now());
- client->client_lock.Lock();
- client->objecter->write(oid, oloc, 0, osize, snapc, bl,
- ceph::real_clock::now(), 0,
- new C_Ref(lock, cond, &unsafe));
- client->client_lock.Unlock();
-
- lock.Lock();
- while (unsafe > inflight) {
- dout(20) << "waiting for " << unsafe << " unsafe" << dendl;
- cond.Wait(lock);
+ {
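+  // client_lock only guards the objecter submission; completion accounting
+  // uses the local lock/cond pair via C_Ref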
+ std::lock_guard locker{client->client_lock};
+ client->objecter->write(oid, oloc, 0, osize, snapc, bl,
+ ceph::real_clock::now(), 0,
+ new C_Ref(lock, cond, &unsafe));
+ }
+ {
+ std::unique_lock locker{lock};
+ cond.wait(locker, [&unsafe, inflight, this] {
+ if (unsafe > inflight) {
+ dout(20) << "waiting for " << unsafe << " unsafe" << dendl;
+ }
+ return unsafe <= inflight;
+ });
}
- lock.Unlock();
-
utime_t lat = ceph_clock_now();
lat -= starts.front();
starts.pop_front();
}
-
- lock.Lock();
- while (unsafe > 0) {
- dout(10) << "waiting for " << unsafe << " unsafe" << dendl;
- cond.Wait(lock);
+ {
+ std::unique_lock locker{lock};
+ cond.wait(locker, [&unsafe, this] {
+ if (unsafe > 0) {
+ dout(10) << "waiting for " << unsafe << " unsafe" << dendl;
+ }
+ return unsafe <= 0;
+ });
}
- lock.Unlock();
-
dout(5) << "create_objects done" << dendl;
return 0;
}
prime += 2;
}
- Mutex lock("lock");
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("lock");
+ ceph::condition_variable cond;
int unack = 0;
object_locator_t oloc(SYNCLIENT_FIRST_POOL);
SnapContext snapc;
- client->client_lock.Lock();
+ client->client_lock.lock();
utime_t start = ceph_clock_now();
if (write) {
dout(10) << "write to " << oid << dendl;
client->objecter->read(oid, oloc, 0, osize, CEPH_NOSNAP, &inbl, 0,
new C_Ref(lock, cond, &unack));
}
- client->client_lock.Unlock();
+ client->client_lock.unlock();
- lock.Lock();
- while (unack > 0) {
- dout(20) << "waiting for " << unack << " unack" << dendl;
- cond.Wait(lock);
+ {
+ std::unique_lock locker{lock};
+ cond.wait(locker, [&unack, this] {
+ if (unack > 0) {
+ dout(20) << "waiting for " << unack << " unack" << dendl;
+ }
+ return unack <= 0;
+ });
}
- lock.Unlock();
utime_t lat = ceph_clock_now();
lat -= start;
while (pos < size) {
int get = std::min<int>(size - pos, 1048576);
- Mutex flock("synclient chunk_file lock");
- Cond cond;
+ ceph::mutex flock = ceph::make_mutex("synclient chunk_file lock");
+ ceph::condition_variable cond;
-    bool done;
+    bool done = false;
bufferlist bl;
-
- flock.Lock();
- Context *onfinish = new C_SafeCond(&flock, &cond, &done);
- client->filer->read(inode.ino, &inode.layout, CEPH_NOSNAP, pos, get, &bl, 0,
- onfinish);
- while (!done)
- cond.Wait(flock);
- flock.Unlock();
-
+ {
+ std::unique_lock locker{flock};
+ Context *onfinish = new C_SafeCond(flock, cond, &done);
+ client->filer->read(inode.ino, &inode.layout, CEPH_NOSNAP, pos, get, &bl, 0,
+ onfinish);
+ cond.wait(locker, [&done] { return done; });
+ }
dout(0) << "got " << bl.length() << " bytes at " << pos << dendl;
if (from_before.length()) {