publish_lock{ceph::make_mutex("OSDService::publish_lock")},
pre_publish_lock{ceph::make_mutex("OSDService::pre_publish_lock")},
max_oldest_map(0),
- sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
+ scrubs_pending(0),
scrubs_active(0),
- agent_lock("OSDService::agent_lock"),
agent_valid_iterator(false),
agent_ops(0),
flush_mode_high_count(0),
agent_active(true),
agent_thread(this),
agent_stop_flag(false),
- agent_timer_lock("OSDService::agent_timer_lock"),
agent_timer(osd->client_messenger->cct, agent_timer_lock),
last_recalibrate(ceph_clock_now()),
promote_max_objects(0),
promote_max_bytes(0),
objecter(new Objecter(osd->client_messenger->cct, osd->objecter_messenger, osd->monc, NULL, 0, 0)),
m_objecter_finishers(cct->_conf->osd_objecter_finishers),
- watch_lock("OSDService::watch_lock"),
watch_timer(osd->client_messenger->cct, watch_lock),
next_notif_id(0),
- recovery_request_lock("OSDService::recovery_request_lock"),
recovery_request_timer(cct, recovery_request_lock, false),
- sleep_lock("OSDService::sleep_lock"),
sleep_timer(cct, sleep_lock, false),
reserver_finisher(cct),
local_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
cct->_conf->osd_min_recovery_priority),
remote_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
cct->_conf->osd_min_recovery_priority),
- pg_temp_lock("OSDService::pg_temp_lock"),
snap_reserver(cct, &reserver_finisher,
cct->_conf->osd_max_trimming_pgs),
- recovery_lock("OSDService::recovery_lock"),
recovery_ops_active(0),
recovery_ops_reserved(0),
recovery_paused(false),
- map_cache_lock("OSDService::map_cache_lock"),
map_cache(cct, cct->_conf->osd_map_cache_size),
map_bl_cache(cct->_conf->osd_map_cache_size),
map_bl_inc_cache(cct->_conf->osd_map_cache_size),
- stat_lock("OSDService::stat_lock"),
- full_status_lock("OSDService::full_status_lock"),
cur_state(NONE),
cur_ratio(0), physical_ratio(0),
- epoch_lock("OSDService::epoch_lock"),
boot_epoch(0), up_epoch(0), bind_epoch(0)
-#ifdef PG_DEBUG_REFS
- , pgid_lock("OSDService::pgid_lock")
-#endif
{
objecter->init();
void OSDService::activate_map()
{
// wake/unwake the tiering agent
- agent_lock.Lock();
+ std::lock_guard l{agent_lock};
agent_active =
!osdmap->test_flag(CEPH_OSDMAP_NOTIERAGENT) &&
osd->is_active();
- agent_cond.Signal();
- agent_lock.Unlock();
+ agent_cond.notify_all();
}
void OSDService::request_osdmap_update(epoch_t e)
void OSDService::agent_entry()
{
dout(10) << __func__ << " start" << dendl;
- agent_lock.Lock();
+ std::unique_lock agent_locker{agent_lock};
while (!agent_stop_flag) {
if (agent_queue.empty()) {
dout(20) << __func__ << " empty queue" << dendl;
- agent_cond.Wait(agent_lock);
+ agent_cond.wait(agent_locker);
continue;
}
uint64_t level = agent_queue.rbegin()->first;
if (!flush_mode_high_count)
agent_flush_quota = cct->_conf->osd_agent_max_low_ops - agent_ops;
if (agent_flush_quota <= 0 || top.empty() || !agent_active) {
- agent_cond.Wait(agent_lock);
+ agent_cond.wait(agent_locker);
continue;
}
dout(10) << "high_count " << flush_mode_high_count
<< " agent_ops " << agent_ops
<< " flush_quota " << agent_flush_quota << dendl;
- agent_lock.Unlock();
+ agent_locker.unlock();
if (!pg->agent_work(max, agent_flush_quota)) {
dout(10) << __func__ << " " << pg->pg_id
<< " no agent_work, delay for " << cct->_conf->osd_agent_delay_time
osd->logger->inc(l_osd_tier_delay);
// Queue a timer to call agent_choose_mode for this pg in 5 seconds
- agent_timer_lock.Lock();
+ std::lock_guard timer_locker{agent_timer_lock};
Context *cb = new AgentTimeoutCB(pg);
agent_timer.add_event_after(cct->_conf->osd_agent_delay_time, cb);
- agent_timer_lock.Unlock();
}
- agent_lock.Lock();
+ agent_locker.lock();
}
- agent_lock.Unlock();
dout(10) << __func__ << " finish" << dendl;
}
}
agent_stop_flag = true;
- agent_cond.Signal();
+ agent_cond.notify_all();
}
agent_thread.join();
}
bool OSDService::inc_scrubs_pending()
{
bool result = false;
-
- sched_scrub_lock.Lock();
+ std::lock_guard l{sched_scrub_lock};
if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1)
<< " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
} else {
dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
}
- sched_scrub_lock.Unlock();
-
return result;
}
void OSDService::dec_scrubs_pending()
{
- sched_scrub_lock.Lock();
+ std::lock_guard l{sched_scrub_lock};
dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
<< " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
--scrubs_pending;
ceph_assert(scrubs_pending >= 0);
- sched_scrub_lock.Unlock();
}
void OSDService::inc_scrubs_active(bool reserved)
{
- sched_scrub_lock.Lock();
+ std::lock_guard l{sched_scrub_lock};
++(scrubs_active);
if (reserved) {
--(scrubs_pending);
<< " (max " << cct->_conf->osd_max_scrubs
<< ", pending " << scrubs_pending << ")" << dendl;
}
- sched_scrub_lock.Unlock();
}
void OSDService::dec_scrubs_active()
{
- sched_scrub_lock.Lock();
+ std::lock_guard l{sched_scrub_lock};
dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
<< " (max " << cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl;
--scrubs_active;
ceph_assert(scrubs_active >= 0);
- sched_scrub_lock.Unlock();
}
void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
std::pair<epoch_t, PGRef> p,
uint64_t reserved_pushes)
{
- ceph_assert(recovery_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(recovery_lock));
enqueue_back(
OpQueueItem(
unique_ptr<OpQueueItem::OpQueueable>(
MonClient *mc,
const std::string &dev, const std::string &jdev) :
Dispatcher(cct_),
- osd_lock("OSD::osd_lock"),
tick_timer(cct, osd_lock),
- tick_timer_lock("OSD::tick_timer_lock"),
tick_timer_without_osd_lock(cct, tick_timer_lock),
gss_ktfile_client(cct->_conf.get_val<std::string>("gss_ktab_client_file")),
cluster_messenger(internal_messenger),
osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
get_num_op_threads()),
command_tp(cct, "OSD::command_tp", "tp_osd_cmd", 1),
- session_waiting_lock("OSD::session_waiting_lock"),
- osdmap_subscribe_lock("OSD::osdmap_subscribe_lock"),
- heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false),
heartbeat_need_update(true),
hb_front_client_messenger(hb_client_front),
cct->_conf->osd_op_thread_timeout,
cct->_conf->osd_op_thread_suicide_timeout,
&osd_op_tp),
- map_lock("OSD::map_lock"),
last_pg_create_epoch(0),
- mon_report_lock("OSD::mon_report_lock"),
boot_finisher(cct),
up_thru_wanted(0),
requested_full_first(0),
new C_Tick_WithoutOSDLock(this));
}
- osd_lock.Unlock();
+ osd_lock.unlock();
r = monc->authenticate();
if (r < 0) {
exit(1);
}
- osd_lock.Lock();
+ osd_lock.lock();
if (is_stopping())
return 0;
{
if (!service.prepare_to_stop())
return 0; // already shutting down
- osd_lock.Lock();
+ osd_lock.lock();
if (is_stopping()) {
- osd_lock.Unlock();
+ osd_lock.unlock();
return 0;
}
dout(0) << "shutdown" << dendl;
delete test_ops_hook;
test_ops_hook = NULL;
- osd_lock.Unlock();
+ osd_lock.unlock();
- heartbeat_lock.Lock();
- heartbeat_stop = true;
- heartbeat_cond.Signal();
- heartbeat_lock.Unlock();
+ {
+ std::lock_guard l{heartbeat_lock};
+ heartbeat_stop = true;
+ heartbeat_cond.notify_all();
+ }
heartbeat_thread.join();
osd_op_tp.drain();
boot_finisher.wait_for_empty();
- osd_lock.Lock();
+ osd_lock.lock();
boot_finisher.stop();
reset_heartbeat_peers(true);
service.dump_live_pgids();
#endif
- osd_lock.Unlock();
+ osd_lock.unlock();
cct->_conf.remove_observer(this);
- osd_lock.Lock();
+ osd_lock.lock();
service.meta_ch.reset();
}
monc->shutdown();
- osd_lock.Unlock();
-
- map_lock.get_write();
- osdmap = OSDMapRef();
- map_lock.put_write();
-
+ osd_lock.unlock();
+ {
+ std::unique_lock l{map_lock};
+ osdmap = OSDMapRef();
+ }
for (auto s : shards) {
std::lock_guard l(s->osdmap_lock);
s->shard_osdmap = OSDMapRef();
void OSD::load_pgs()
{
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
dout(0) << "load_pgs" << dendl;
{
pg->init_collection_pool_opts();
if (pg->is_primary()) {
- Mutex::Locker locker(m_perf_queries_lock);
+ std::lock_guard locker{m_perf_queries_lock};
pg->set_dynamic_perf_stats_queries(m_perf_queries);
}
void OSD::maybe_update_heartbeat_peers()
{
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
if (is_waiting_for_healthy() || is_active()) {
utime_t now = ceph_clock_now();
void OSD::reset_heartbeat_peers(bool all)
{
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
dout(10) << "reset_heartbeat_peers" << dendl;
utime_t stale = ceph_clock_now();
stale -= cct->_conf.get_val<int64_t>("osd_heartbeat_stale");
int from = m->get_source().num();
- heartbeat_lock.Lock();
+ heartbeat_lock.lock();
if (is_stopping()) {
- heartbeat_lock.Unlock();
+ heartbeat_lock.unlock();
m->put();
return;
}
OSDMapRef curmap = service.get_osdmap();
if (!curmap) {
- heartbeat_lock.Unlock();
+ heartbeat_lock.unlock();
m->put();
return;
}
break;
}
- heartbeat_lock.Unlock();
+ heartbeat_lock.unlock();
m->put();
}
void OSD::heartbeat_entry()
{
- std::lock_guard l(heartbeat_lock);
+ std::unique_lock l(heartbeat_lock);
if (is_stopping())
return;
while (!heartbeat_stop) {
heartbeat();
double wait = .5 + ((float)(rand() % 10)/10.0) * (float)cct->_conf->osd_heartbeat_interval;
- utime_t w;
- w.set_from_double(wait);
+ auto w = ceph::make_timespan(wait);
dout(30) << "heartbeat_entry sleeping for " << wait << dendl;
- heartbeat_cond.WaitInterval(heartbeat_lock, w);
+ heartbeat_cond.wait_for(l, w);
if (is_stopping())
return;
dout(30) << "heartbeat_entry woke up" << dendl;
void OSD::heartbeat_check()
{
- ceph_assert(heartbeat_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(heartbeat_lock));
utime_t now = ceph_clock_now();
// check for incoming heartbeats (move me elsewhere?)
void OSD::heartbeat()
{
- ceph_assert(heartbeat_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(heartbeat_lock));
dout(30) << "heartbeat" << dendl;
// get CPU load avg
void OSD::tick()
{
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
dout(10) << "tick" << dendl;
if (is_active() || is_waiting_for_healthy()) {
void OSD::tick_without_osd_lock()
{
- ceph_assert(tick_timer_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(tick_timer_lock));
dout(10) << "tick_without_osd_lock" << dendl;
logger->set(l_osd_cached_crc, buffer::get_cached_crc());
// osd_lock is not being held, which means the OSD state
// might change when doing the monitor report
if (is_active() || is_waiting_for_healthy()) {
- heartbeat_lock.Lock();
- heartbeat_check();
- heartbeat_lock.Unlock();
-
- map_lock.get_read();
+ {
+ std::lock_guard l{heartbeat_lock};
+ heartbeat_check();
+ }
+ map_lock.lock_shared();
std::lock_guard l(mon_report_lock);
// mon report?
send_full_update();
send_failures();
}
- map_lock.put_read();
+ map_lock.unlock_shared();
epoch_t max_waiting_epoch = 0;
for (auto s : shards) {
} else if (is_booting()) {
_send_boot(); // resend boot message
} else {
- map_lock.get_read();
+ map_lock.lock_shared();
std::lock_guard l2(mon_report_lock);
utime_t now = ceph_clock_now();
requeue_failures();
send_failures();
- map_lock.put_read();
+ map_lock.unlock_shared();
if (is_active()) {
send_beacon(ceph::coarse_mono_clock::now());
}
boot_finisher.queue(
new FunctionContext(
[this](int r) {
- std::lock_guard l(osd_lock);
+ std::unique_lock l(osd_lock);
if (is_preboot()) {
dout(10) << __func__ << " waiting for peering work to drain"
<< dendl;
- osd_lock.Unlock();
+ l.unlock();
for (auto shard : shards) {
shard->wait_min_pg_epoch(osdmap->get_epoch());
}
- osd_lock.Lock();
+ l.lock();
}
if (is_preboot()) {
_send_boot();
void OSD::queue_want_up_thru(epoch_t want)
{
- map_lock.get_read();
+ std::shared_lock map_locker{map_lock};
epoch_t cur = osdmap->get_up_thru(whoami);
- std::lock_guard l(mon_report_lock);
+ std::lock_guard report_locker(mon_report_lock);
if (want > up_thru_wanted) {
dout(10) << "queue_want_up_thru now " << want << " (was " << up_thru_wanted << ")"
<< ", currently " << cur
<< ", currently " << cur
<< dendl;
}
- map_lock.put_read();
}
void OSD::send_alive()
{
- ceph_assert(mon_report_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(mon_report_lock));
if (!osdmap->exists(whoami))
return;
epoch_t up_thru = osdmap->get_up_thru(whoami);
dout(10) << __func__ << " " << first << ".." << last
<< ", previously requested "
<< requested_full_first << ".." << requested_full_last << dendl;
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
ceph_assert(first > 0 && last > 0);
ceph_assert(first <= last);
ceph_assert(first >= requested_full_first); // we shouldn't ever ask for older maps
void OSD::got_full_map(epoch_t e)
{
ceph_assert(requested_full_first <= requested_full_last);
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
if (requested_full_first == 0) {
dout(20) << __func__ << " " << e << ", nothing requested" << dendl;
return;
void OSD::send_failures()
{
- ceph_assert(map_lock.is_locked());
- ceph_assert(mon_report_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(map_lock));
+ ceph_assert(ceph_mutex_is_locked(mon_report_lock));
std::lock_guard l(heartbeat_lock);
utime_t now = ceph_clock_now();
while (!failure_queue.empty()) {
namespace {
class unlock_guard {
- Mutex& m;
+ ceph::mutex& m;
public:
- explicit unlock_guard(Mutex& mutex)
+ explicit unlock_guard(ceph::mutex& mutex)
: m(mutex)
{
m.unlock();
pg->queue_snap_retrim(snap);
pg->unlock();
}
- osd_lock.Lock();
+ osd_lock.lock();
if (is_stopping()) {
return;
}
// lock!
- osd_lock.Lock();
+ osd_lock.lock();
if (is_stopping()) {
- osd_lock.Unlock();
+ osd_lock.unlock();
m->put();
return true;
}
do_waiters();
_dispatch(m);
- osd_lock.Unlock();
+ osd_lock.unlock();
return true;
}
void OSD::dispatch_session_waiting(SessionRef session, OSDMapRef osdmap)
{
- ceph_assert(session->session_dispatch_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(session->session_dispatch_lock));
auto i = session->waiting_on_map.begin();
while (i != session->waiting_on_map.end()) {
void OSD::do_waiters()
{
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
dout(10) << "do_waiters -- start" << dendl;
while (!finished.empty()) {
void OSD::_dispatch(Message *m)
{
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
dout(20) << "_dispatch " << m << " " << *m << dendl;
switch (m->get_type()) {
// stats every time we're called. This has equivalent cost to the
// previous implementation's worst case where all PGs are busy and
// their stats are always enqueued for sending.
- RWLock::RLocker l(map_lock);
+ std::shared_lock l{map_lock};
osd_stat_t cur_stat = service.get_osd_stat();
cur_stat.os_perf_stat = store->get_cur_stats();
void OSD::note_down_osd(int peer)
{
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
cluster_messenger->mark_down_addrs(osdmap->get_cluster_addrs(peer));
- heartbeat_lock.Lock();
+ std::lock_guard l{heartbeat_lock};
failure_queue.erase(peer);
failure_pending.erase(peer);
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(peer);
}
heartbeat_peers.erase(p);
}
- heartbeat_lock.Unlock();
}
void OSD::note_up_osd(int peer)
}
}
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
map<epoch_t,OSDMapRef> added_maps;
map<epoch_t,bufferlist> added_maps_bl;
if (m->fsid != monc->get_fsid()) {
dout(10) << __func__ << " bailing, we are shutting down" << dendl;
return;
}
- map_lock.get_write();
+ map_lock.lock();
bool do_shutdown = false;
bool do_restart = false;
}
}
- map_lock.put_write();
+ map_lock.unlock();
check_osdmap_features();
void OSD::consume_map()
{
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
dout(7) << "consume_map version " << osdmap->get_epoch() << dendl;
/** make sure the cluster is speaking in SORTBITWISE, because we don't
void OSD::activate_map()
{
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
dout(7) << "activate_map version " << osdmap->get_epoch() << dendl;
auto priv = con->get_priv();
if (auto s = static_cast<Session*>(priv.get()); s) {
if (!is_fast_dispatch)
- s->session_dispatch_lock.Lock();
+ s->session_dispatch_lock.lock();
clear_session_waiting_on_map(s);
con->set_priv(nullptr); // break ref <-> session cycle, if any
s->con.reset();
if (!is_fast_dispatch)
- s->session_dispatch_lock.Unlock();
+ s->session_dispatch_lock.unlock();
}
return false;
}
dout(15) << "require_same_or_newer_map " << epoch
<< " (i am " << osdmap->get_epoch() << ") " << m << dendl;
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
// do they have a newer map?
if (epoch > osdmap->get_epoch()) {
// RECOVERY
void OSDService::_maybe_queue_recovery() {
- ceph_assert(recovery_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(recovery_lock));
uint64_t available_pushes;
while (!awaiting_throttle.empty() &&
_recover_now(&available_pushes)) {
// This is true for the first recovery op and when the previous recovery op
// has been scheduled in the past. The next recovery op is scheduled after
// completing the sleep from now.
- if (service.recovery_schedule_time < ceph_clock_now()) {
- service.recovery_schedule_time = ceph_clock_now();
+
+ if (auto now = ceph::real_clock::now();
+ service.recovery_schedule_time < now) {
+ service.recovery_schedule_time = now;
}
- service.recovery_schedule_time += recovery_sleep;
+ service.recovery_schedule_time += ceph::make_timespan(recovery_sleep);
service.sleep_timer.add_event_at(service.recovery_schedule_time,
- recovery_requeue_callback);
+ recovery_requeue_callback);
dout(20) << "Recovery event scheduled at "
<< service.recovery_schedule_time << dendl;
return;
void OSD::handle_conf_change(const ConfigProxy& conf,
const std::set <std::string> &changed)
{
- Mutex::Locker l(osd_lock);
+ std::lock_guard l{osd_lock};
if (changed.count("osd_max_backfills")) {
service.local_reserver.set_max(cct->_conf->osd_max_backfills);
service.remote_reserver.set_max(cct->_conf->osd_max_backfills);
dout(1) << queries.size() - supported_queries.size()
<< " unsupported queries" << dendl;
}
-
{
- Mutex::Locker locker(m_perf_queries_lock);
+ std::lock_guard locker{m_perf_queries_lock};
m_perf_queries = supported_queries;
m_perf_limits = queries;
}
-
std::vector<PGRef> pgs;
_get_pgs(&pgs);
for (auto& pg : pgs) {
#include "msg/Dispatcher.h"
-#include "common/Mutex.h"
-#include "common/RWLock.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "common/AsyncReserver.h"
private:
// -- scrub scheduling --
- Mutex sched_scrub_lock;
+ ceph::mutex sched_scrub_lock = ceph::make_mutex("OSDService::sched_scrub_lock");
int scrubs_pending;
int scrubs_active;
private:
// -- agent shared state --
- Mutex agent_lock;
- Cond agent_cond;
+ ceph::mutex agent_lock = ceph::make_mutex("OSDService::agent_lock");
+ ceph::condition_variable agent_cond;
map<uint64_t, set<PGRef> > agent_queue;
set<PGRef>::iterator agent_queue_pos;
bool agent_valid_iterator;
}
} agent_thread;
bool agent_stop_flag;
- Mutex agent_timer_lock;
+ ceph::mutex agent_timer_lock = ceph::make_mutex("OSDService::agent_timer_lock");
SafeTimer agent_timer;
public:
agent_valid_iterator = false; // inserting higher-priority queue
set<PGRef>& nq = agent_queue[priority];
if (nq.empty())
- agent_cond.Signal();
+ agent_cond.notify_all();
nq.insert(pg);
}
std::lock_guard l(agent_lock);
ceph_assert(agent_ops > 0);
--agent_ops;
- agent_cond.Signal();
+ agent_cond.notify_all();
}
/// note start of an async (flush) op
--agent_ops;
ceph_assert(agent_oids.count(oid) == 1);
agent_oids.erase(oid);
- agent_cond.Signal();
+ agent_cond.notify_all();
}
/// check if we are operating on an object
vector<Finisher*> objecter_finishers;
// -- Watch --
- Mutex watch_lock;
+ ceph::mutex watch_lock = ceph::make_mutex("OSDService::watch_lock");
SafeTimer watch_timer;
uint64_t next_notif_id;
uint64_t get_next_id(epoch_t cur_epoch) {
}
// -- Recovery/Backfill Request Scheduling --
- Mutex recovery_request_lock;
+ ceph::mutex recovery_request_lock = ceph::make_mutex("OSDService::recovery_request_lock");
SafeTimer recovery_request_timer;
// For async recovery sleep
bool recovery_needs_sleep = true;
- utime_t recovery_schedule_time = utime_t();
+ ceph::real_clock::time_point recovery_schedule_time;
// For recovery & scrub & snap
- Mutex sleep_lock;
+ ceph::mutex sleep_lock = ceph::make_mutex("OSDService::sleep_lock");
SafeTimer sleep_timer;
// -- tids --
AsyncReserver<spg_t> remote_reserver;
// -- pg merge --
- Mutex merge_lock = {"OSD::merge_lock"};
+ ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
map<pg_t,eversion_t> ready_to_merge_source; // pg -> version
map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target; // pg -> (version,les,lec)
set<pg_t> not_ready_to_merge_source;
// -- pg_temp --
private:
- Mutex pg_temp_lock;
+ ceph::mutex pg_temp_lock = ceph::make_mutex("OSDService::pg_temp_lock");
struct pg_temp_t {
vector<int> acting;
bool forced = false;
private:
// -- pg recovery and associated throttling --
- Mutex recovery_lock;
+ ceph::mutex recovery_lock = ceph::make_mutex("OSDService::recovery_lock");
list<pair<epoch_t, PGRef> > awaiting_throttle;
utime_t defer_recovery_until;
}
// osd map cache (past osd maps)
- Mutex map_cache_lock;
+ ceph::mutex map_cache_lock = ceph::make_mutex("OSDService::map_cache_lock");
SharedLRU<epoch_t, const OSDMap> map_cache;
SimpleLRU<epoch_t, bufferlist> map_bl_cache;
SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
void shutdown();
// -- stats --
- Mutex stat_lock;
+ ceph::mutex stat_lock = ceph::make_mutex("OSDService::stat_lock");
osd_stat_t osd_stat;
uint32_t seq = 0;
// -- OSD Full Status --
private:
friend TestOpsSocketHook;
- mutable Mutex full_status_lock;
+ mutable ceph::mutex full_status_lock = ceph::make_mutex("OSDService::full_status_lock");
enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending
const char *get_full_state_name(s_names s) const {
switch (s) {
// -- epochs --
private:
- mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
+ // protects access to boot_epoch, up_epoch, bind_epoch
+ mutable ceph::mutex epoch_lock = ceph::make_mutex("OSDService::epoch_lock");
epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started)
epoch_t up_epoch; // _most_recent_ epoch we were marked up
epoch_t bind_epoch; // epoch we last did a bind to new ip:ports
#ifdef PG_DEBUG_REFS
- Mutex pgid_lock;
+ ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock");
map<spg_t, int> pgid_tracker;
map<spg_t, PG*> live_pgs;
void add_pgid(spg_t pgid, PG *pg);
class OSD : public Dispatcher,
public md_config_obs_t {
/** OSD **/
- Mutex osd_lock; // global lock
+ // global lock
+ ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock");
SafeTimer tick_timer; // safe timer (osd_lock)
// Tick timer for those stuff that do not need osd_lock
- Mutex tick_timer_lock;
+ ceph::mutex tick_timer_lock = ceph::make_mutex("OSD::tick_timer_lock");
SafeTimer tick_timer_without_osd_lock;
std::string gss_ktfile_client{};
private:
void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap);
- Mutex session_waiting_lock;
+ ceph::mutex session_waiting_lock = ceph::make_mutex("OSD::session_waiting_lock");
set<SessionRef> session_waiting_for_map;
/// Caller assumes refs for included Sessions
void osdmap_subscribe(version_t epoch, bool force_request);
/** @} monc helpers */
- Mutex osdmap_subscribe_lock;
+ ceph::mutex osdmap_subscribe_lock = ceph::make_mutex("OSD::osdmap_subscribe_lock");
epoch_t latest_subscribed_epoch{0};
// -- heartbeat --
int peer;
explicit HeartbeatSession(int p) : peer(p) {}
};
- Mutex heartbeat_lock;
+ ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock");
map<int, int> debug_heartbeat_drops_remaining;
- Cond heartbeat_cond;
+ ceph::condition_variable heartbeat_cond;
bool heartbeat_stop;
std::atomic<bool> heartbeat_need_update;
map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
void heartbeat_kick() {
std::lock_guard l(heartbeat_lock);
- heartbeat_cond.Signal();
+ heartbeat_cond.notify_all();
}
struct T_Heartbeat : public Thread {
list<OpRequestRef> finished;
void take_waiters(list<OpRequestRef>& ls) {
- ceph_assert(osd_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(osd_lock));
finished.splice(finished.end(), ls);
}
void do_waiters();
pool_pg_num_history_t pg_num_history;
- RWLock map_lock;
+ ceph::shared_mutex map_lock = ceph::make_shared_mutex("OSD::map_lock");
list<OpRequestRef> waiting_for_osdmap;
deque<utime_t> osd_markdown_log;
}
protected:
- Mutex merge_lock = {"OSD::merge_lock"};
+ ceph::mutex merge_lock = ceph::make_mutex("OSD::merge_lock");
/// merge epoch -> target pgid -> source pgid -> pg
map<epoch_t,map<spg_t,map<spg_t,PGRef>>> merge_waiters;
void _finish_splits(set<PGRef>& pgs);
// == monitor interaction ==
- Mutex mon_report_lock;
+ ceph::mutex mon_report_lock = ceph::make_mutex("OSD::mon_report_lock");
utime_t last_mon_report;
Finisher boot_finisher;
void cancel_pending_failures();
ceph::coarse_mono_clock::time_point last_sent_beacon;
- Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
+ ceph::mutex min_last_epoch_clean_lock = ceph::make_mutex("OSD::min_last_epoch_clean_lock");
epoch_t min_last_epoch_clean = 0;
// which pgs were scanned for min_lec
std::vector<pg_t> min_last_epoch_clean_pgs;
void get_perf_reports(
std::map<OSDPerfMetricQuery, OSDPerfMetricReport> *reports);
- Mutex m_perf_queries_lock = {"OSD::m_perf_queries_lock"};
+ ceph::mutex m_perf_queries_lock = ceph::make_mutex("OSD::m_perf_queries_lock");
std::list<OSDPerfMetricQuery> m_perf_queries;
std::map<OSDPerfMetricQuery, OSDPerfMetricLimits> m_perf_limits;
};
finish = ceph_clock_now();
complete();
}
- cond.Signal();
+ cond.notify_all();
fin = onfinish;
onfinish = nullptr;
}
bool aborted = false;
Context *onfinish = nullptr;
- Mutex lock = {"ParallelPGMapper::Job::lock"};
- Cond cond;
+ ceph::mutex lock = ceph::make_mutex("ParallelPGMapper::Job::lock");
+ ceph::condition_variable cond;
Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
virtual ~Job() {
virtual void complete() = 0;
void set_finish_event(Context *fin) {
- lock.Lock();
+ lock.lock();
if (shards == 0) {
// already done.
- lock.Unlock();
+ lock.unlock();
fin->complete(0);
} else {
// set finisher
onfinish = fin;
- lock.Unlock();
+ lock.unlock();
}
}
bool is_done() {
return finish - start;
}
void wait() {
- std::lock_guard l(lock);
- while (shards > 0) {
- cond.Wait(lock);
- }
+ std::unique_lock l(lock);
+ cond.wait(l, [this] { return shards == 0; });
}
bool wait_for(double duration) {
utime_t until = start;
until += duration;
- std::lock_guard l(lock);
+ std::unique_lock l(lock);
while (shards > 0) {
if (ceph_clock_now() >= until) {
return false;
}
- cond.Wait(lock);
+ cond.wait(l);
}
return true;
}
void abort() {
Context *fin = nullptr;
{
- std::lock_guard l(lock);
+ std::unique_lock l(lock);
aborted = true;
fin = onfinish;
onfinish = nullptr;
- while (shards > 0) {
- cond.Wait(lock);
- }
+ cond.wait(l, [this] { return shards == 0; });
}
if (fin) {
fin->complete(-ECANCELED);
scrub_queued(false),
recovery_queued(false),
recovery_ops_active(0),
- heartbeat_peer_lock("PG::heartbeat_peer_lock"),
backfill_reserving(false),
- pg_stats_publish_lock("PG::pg_stats_publish_lock"),
pg_stats_publish_valid(false),
finish_sync_event(NULL),
- backoff_lock("PG::backoff_lock"),
scrub_after_recovery(false),
active_pushes(0),
recovery_state(
void PG::lock(bool no_lockdep) const
{
- _lock.Lock(no_lockdep);
+ _lock.lock(no_lockdep);
// if we have unrecorded dirty state with the lock dropped, there is a bug
ceph_assert(!recovery_state.debug_has_dirty_state());
{
dout(10) << __func__ << " " << *b << dendl;
std::lock_guard l(backoff_lock);
- ceph_assert(b->lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
ceph_assert(b->pg == this);
auto p = backoffs.find(b->begin);
// may race with release_backoffs()
void PG::update_heartbeat_peers(set<int> new_peers)
{
bool need_update = false;
- heartbeat_peer_lock.Lock();
+ heartbeat_peer_lock.lock();
if (new_peers == heartbeat_peers) {
dout(10) << "update_heartbeat_peers " << heartbeat_peers << " unchanged" << dendl;
} else {
heartbeat_peers.swap(new_peers);
need_update = true;
}
- heartbeat_peer_lock.Unlock();
+ heartbeat_peer_lock.unlock();
if (need_update)
osd->need_heartbeat_peer_update();
if (!is_primary())
return;
- pg_stats_publish_lock.Lock();
-
+ std::lock_guard l{pg_stats_publish_lock};
auto stats = recovery_state.prepare_stats_for_publish(
pg_stats_publish_valid,
pg_stats_publish,
pg_stats_publish = stats.value();
pg_stats_publish_valid = true;
}
-
- pg_stats_publish_lock.Unlock();
}
void PG::clear_publish_stats()
{
dout(15) << "clear_stats" << dendl;
- pg_stats_publish_lock.Lock();
+ std::lock_guard l{pg_stats_publish_lock};
pg_stats_publish_valid = false;
- pg_stats_publish_lock.Unlock();
}
/**
// This lock protects not only the stats OSDService but also setting the
// pg primary_bytes. That's why we don't immediately unlock
- Mutex::Locker l(osd->stat_lock);
+ std::lock_guard l{osd->stat_lock};
osd_stat_t cur_stat = osd->osd_stat;
if (cct->_conf->osd_debug_reject_backfill_probability > 0 &&
(rand()%1000 < (cct->_conf->osd_debug_reject_backfill_probability*1000.0))) {
// wait for repair to apply to avoid confusing other bits of the system.
{
- Cond my_cond;
- Mutex my_lock("PG::_scan_snaps my_lock");
+ ceph::condition_variable my_cond;
+ ceph::mutex my_lock = ceph::make_mutex("PG::_scan_snaps my_lock");
int r = 0;
bool done;
t.register_on_applied_sync(
- new C_SafeCond(&my_lock, &my_cond, &done, &r));
+ new C_SafeCond(my_lock, my_cond, &done, &r));
r = osd->store->queue_transaction(ch, std::move(t));
if (r != 0) {
derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
<< dendl;
} else {
- my_lock.Lock();
- while (!done)
- my_cond.Wait(my_lock);
- my_lock.Unlock();
+ std::unique_lock l{my_lock};
+ my_cond.wait(l, [&done] { return done;});
}
}
}
unlock();
});
- utime_t delete_schedule_time = ceph_clock_now();
- delete_schedule_time += osd_delete_sleep;
- Mutex::Locker l(osd->sleep_lock);
+ auto delete_schedule_time = ceph::real_clock::now();
+ delete_schedule_time += ceph::make_timespan(osd_delete_sleep);
+ std::lock_guard l{osd->sleep_lock};
osd->sleep_timer.add_event_at(delete_schedule_time,
- delete_requeue_callback);
+ delete_requeue_callback);
dout(20) << __func__ << " Delete scheduled at " << delete_schedule_time << dendl;
return;
}
void PG::get_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)> f)
{
- pg_stats_publish_lock.Lock();
+ std::lock_guard l{pg_stats_publish_lock};
if (pg_stats_publish_valid) {
f(pg_stats_publish, pg_stats_publish.get_effective_last_epoch_clean());
}
- pg_stats_publish_lock.Unlock();
}
void PG::with_heartbeat_peers(std::function<void(int)> f)
{
- heartbeat_peer_lock.Lock();
+ std::lock_guard l{heartbeat_peer_lock};
for (auto p : heartbeat_peers) {
f(p);
}
for (auto p : probe_targets) {
f(p);
}
- heartbeat_peer_lock.Unlock();
}
uint64_t PG::get_min_alloc_size() const {
per_state_info() : enter(0), exit(0), events(0) {}
};
map<const char *,per_state_info> info;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("PGRecoverStats::lock");
public:
- PGRecoveryStats() : lock("PGRecoverStats::lock") {}
+ PGRecoveryStats() = default;
void reset() {
std::lock_guard l(lock);
void unlock() const {
//generic_dout(0) << this << " " << info.pgid << " unlock" << dendl;
ceph_assert(!recovery_state.debug_has_dirty_state());
- _lock.Unlock();
+ _lock.unlock();
}
bool is_locked() const {
return _lock.is_locked();
// get() should be called on pointer copy (to another thread, etc.).
// put() should be called on destruction of some previously copied pointer.
// unlock() when done with the current pointer (_most common_).
- mutable Mutex _lock = {"PG::_lock"};
+ mutable ceph::mutex _lock = ceph::make_mutex("PG::_lock");
std::atomic<unsigned int> ref{0};
#ifdef PG_DEBUG_REFS
- Mutex _ref_id_lock = {"PG::_ref_id_lock"};
+ ceph::mutex _ref_id_lock = ceph::make_mutex("PG::_ref_id_lock");
map<uint64_t, string> _live_ids;
map<string, uint64_t> _tag_counts;
uint64_t _ref_id = 0;
void set_probe_targets(const set<pg_shard_t> &probe_set) override;
void clear_probe_targets() override;
- Mutex heartbeat_peer_lock;
+ ceph::mutex heartbeat_peer_lock =
+ ceph::make_mutex("PG::heartbeat_peer_lock");
set<int> heartbeat_peers;
set<int> probe_targets;
// The value of num_bytes could be negative,
// but we don't let info.stats.stats.sum.num_bytes go negative.
void add_num_bytes(int64_t num_bytes) {
- ceph_assert(_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(_lock));
if (num_bytes) {
recovery_state.update_stats(
[num_bytes](auto &history, auto &stats) {
}
}
void sub_num_bytes(int64_t num_bytes) {
- ceph_assert(_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(_lock));
ceph_assert(num_bytes >= 0);
if (num_bytes) {
recovery_state.update_stats(
// Only used in testing so not worried about needing the PG lock here
int64_t get_stats_num_bytes() {
- Mutex::Locker l(_lock);
+ std::lock_guard l{_lock};
int num_bytes = info.stats.stats.sum.num_bytes;
if (pool.info.is_erasure()) {
num_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
object_stat_collection_t unstable_stats;
// publish stats
- Mutex pg_stats_publish_lock;
+ ceph::mutex pg_stats_publish_lock =
+ ceph::make_mutex("PG::pg_stats_publish_lock");
bool pg_stats_publish_valid;
pg_stat_t pg_stats_publish;
friend class C_DeleteMore;
// -- backoff --
- Mutex backoff_lock; // orders inside Backoff::lock
+ ceph::mutex backoff_lock = // orders inside Backoff::lock
+ ceph::make_mutex("PG::backoff_lock");
map<hobject_t,set<BackoffRef>> backoffs;
void add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end);
PGBackend::build_pg_backend(
_pool.info, ec_profile, this, coll_t(p), ch, o->store, cct)),
object_contexts(o->cct, o->cct->_conf->osd_pg_object_context_cache_count),
- snapset_contexts_lock("PrimaryLogPG::snapset_contexts_lock"),
new_backfill(false),
temp_seq(0),
snap_trimmer_machine(this)
SharedLRU<hobject_t, ObjectContext> object_contexts;
// map from oid.snapdir() to SnapSetContext *
map<hobject_t, SnapSetContext*> snapset_contexts;
- Mutex snapset_contexts_lock;
+ ceph::mutex snapset_contexts_lock =
+ ceph::make_mutex("PrimaryLogPG::snapset_contexts_lock");
// debug order that client ops are applied
map<hobject_t, map<client_t, ceph_tid_t>> debug_op_order;
_register_snapset_context(ssc);
}
void _register_snapset_context(SnapSetContext *ssc) {
- ceph_assert(snapset_contexts_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(snapset_contexts_lock));
if (!ssc->registered) {
ceph_assert(snapset_contexts.count(ssc->oid) == 0);
ssc->registered = true;
#define CEPH_OSD_SESSION_H
#include "common/RefCountedObj.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "global/global_context.h"
#include "include/spinlock.h"
#include "OSDCap.h"
}
}
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("Backoff::lock");
// NOTE: the owning PG and session are either
// - *both* set, or
// - both null (teardown), or
: RefCountedObject(g_ceph_context, 0),
pgid(pgid),
id(i),
- lock("Backoff::lock"),
pg(pg),
session(s),
begin(b),
entity_addr_t socket_addr;
WatchConState wstate;
- Mutex session_dispatch_lock;
+ ceph::mutex session_dispatch_lock =
+ ceph::make_mutex("Session::session_dispatch_lock");
boost::intrusive::list<OpRequest> waiting_on_map;
ceph::spinlock sent_epoch_lock;
epoch_t last_sent_epoch;
/// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock
- Mutex backoff_lock;
+ ceph::mutex backoff_lock = ceph::make_mutex("Session::backoff_lock");
std::atomic<int> backoff_count= {0}; ///< simple count of backoffs
map<spg_t,map<hobject_t,set<BackoffRef>>> backoffs;
con(con_),
socket_addr(con_->get_peer_socket_addr()),
wstate(cct),
- session_dispatch_lock("Session::session_dispatch_lock"),
- last_sent_epoch(0),
- backoff_lock("Session::backoff_lock")
+ last_sent_epoch(0)
{}
entity_addr_t& get_peer_socket_addr() {
// called by PG::release_*_backoffs and PG::clear_backoffs()
void rm_backoff(BackoffRef b) {
std::lock_guard l(backoff_lock);
- ceph_assert(b->lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
ceph_assert(b->session == this);
auto i = backoffs.find(b->pgid);
if (i != backoffs.end()) {
notify_id(notify_id),
version(version),
osd(osd),
- cb(NULL),
- lock("Notify::lock") {}
+ cb(NULL) {}
NotifyRef Notify::makeNotifyRef(
ConnectionRef client,
public:
explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {}
void finish(int) override {
- notif->osd->watch_lock.Unlock();
- notif->lock.Lock();
+ notif->osd->watch_lock.unlock();
+ notif->lock.lock();
if (!canceled)
notif->do_timeout(); // drops lock
else
- notif->lock.Unlock();
- notif->osd->watch_lock.Lock();
+ notif->lock.unlock();
+ notif->osd->watch_lock.lock();
}
void cancel() override {
- ceph_assert(notif->lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked(notif->lock));
canceled = true;
}
};
void Notify::do_timeout()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked(lock));
dout(10) << "timeout" << dendl;
cb = nullptr;
if (is_discarded()) {
- lock.Unlock();
+ lock.unlock();
return;
}
ceph_assert(complete);
set<WatchRef> _watchers;
_watchers.swap(watchers);
- lock.Unlock();
+ lock.unlock();
for (set<WatchRef>::iterator i = _watchers.begin();
i != _watchers.end();
void Notify::register_cb()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked(lock));
{
- osd->watch_lock.Lock();
+ std::lock_guard l{osd->watch_lock};
cb = new NotifyTimeoutCB(self.lock());
if (!osd->watch_timer.add_event_after(timeout, cb)) {
cb = nullptr;
}
- osd->watch_lock.Unlock();
}
}
void Notify::unregister_cb()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked(lock));
if (!cb)
return;
cb->cancel();
{
- osd->watch_lock.Lock();
+ std::lock_guard l{osd->watch_lock};
osd->watch_timer.cancel_event(cb);
cb = nullptr;
- osd->watch_lock.Unlock();
}
}
OSDService *osd(watch->osd);
ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl;
boost::intrusive_ptr<PrimaryLogPG> pg(watch->pg);
- osd->watch_lock.Unlock();
+ osd->watch_lock.unlock();
pg->lock();
watch->cb = nullptr;
if (!watch->is_discarded() && !canceled)
watch->pg->handle_watch_timeout(watch);
delete this; // ~Watch requires pg lock!
pg->unlock();
- osd->watch_lock.Lock();
+ osd->watch_lock.lock();
}
};
OSDService *osd;
CancelableContext *cb;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("Notify::lock");
/// (gid,cookie) -> reply_bl for everyone who acked the notify
std::multimap<std::pair<uint64_t,uint64_t>, ceph::buffer::list> notify_replies;
* Lives in the Session object of an OSD connection
*/
class WatchConState {
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("WatchConState");
std::set<WatchRef> watches;
public:
CephContext* cct;
- explicit WatchConState(CephContext* cct) : lock("WatchConState"), cct(cct) {}
+ explicit WatchConState(CephContext* cct) : cct(cct) {}
/// Add a watch
void addWatch(