Throttle::Throttle(CephContext *cct, std::string n, int64_t m)
: cct(cct), name(n),
- count(0), max(m),
+ max(m),
lock("Throttle::lock")
{
assert(m >= 0);
logger = b.create_perf_counters();
cct->get_perfcounters_collection()->add(logger);
- logger->set(l_throttle_max, max);
+ logger->set(l_throttle_max, max.read());
}
Throttle::~Throttle()
void Throttle::_reset_max(int64_t m)
{
- if (m < max && !cond.empty())
+ assert(lock.is_locked());
+ if (m < ((int64_t)max.read()) && !cond.empty())
cond.front()->SignalOne();
- max = m;
- logger->set(l_throttle_max, max);
+ logger->set(l_throttle_max, m);
+ max.set((size_t)m);
}
bool Throttle::_wait(int64_t c)
assert(m > 0);
_reset_max(m);
}
- ldout(cct, 5) << "wait" << dendl;
+ ldout(cct, 10) << "wait" << dendl;
return _wait(0);
}
int64_t Throttle::take(int64_t c)
{
assert(c >= 0);
- Mutex::Locker l(lock);
- ldout(cct, 5) << "take " << c << dendl;
- count += c;
+ ldout(cct, 10) << "take " << c << dendl;
+ {
+ Mutex::Locker l(lock);
+ count.add(c);
+ }
logger->inc(l_throttle_take);
logger->inc(l_throttle_take_sum, c);
- logger->set(l_throttle_val, count);
- return count;
+ logger->set(l_throttle_val, count.read());
+ return count.read();
}
bool Throttle::get(int64_t c, int64_t m)
{
assert(c >= 0);
- Mutex::Locker l(lock);
- ldout(cct, 5) << "get " << c << " (" << count << " -> " << (count + c) << ")" << dendl;
- if (m) {
- assert(m > 0);
- _reset_max(m);
+ ldout(cct, 10) << "get " << c << " (" << count.read() << " -> " << (count.read() + c) << ")" << dendl;
+ bool waited = false;
+ {
+ Mutex::Locker l(lock);
+ if (m) {
+ assert(m > 0);
+ _reset_max(m);
+ }
+ waited = _wait(c);
+ count.add(c);
}
- bool waited = _wait(c);
- count += c;
logger->inc(l_throttle_get);
logger->inc(l_throttle_get_sum, c);
- logger->set(l_throttle_val, count);
+ logger->set(l_throttle_val, count.read());
return waited;
}
assert (c >= 0);
Mutex::Locker l(lock);
if (_should_wait(c) || !cond.empty()) {
- ldout(cct, 2) << "get_or_fail " << c << " failed" << dendl;
+ ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
logger->inc(l_throttle_get_or_fail_fail);
return false;
} else {
- ldout(cct, 5) << "get_or_fail " << c << " success (" << count << " -> " << (count + c) << ")" << dendl;
- count += c;
+ ldout(cct, 10) << "get_or_fail " << c << " success (" << count.read() << " -> " << (count.read() + c) << ")" << dendl;
+ count.add(c);
logger->inc(l_throttle_get_or_fail_success);
logger->inc(l_throttle_get);
logger->inc(l_throttle_get_sum, c);
- logger->set(l_throttle_val, count);
+ logger->set(l_throttle_val, count.read());
return true;
}
}
int64_t Throttle::put(int64_t c)
{
assert(c >= 0);
+ ldout(cct, 10) << "put " << c << " (" << count.read() << " -> " << (count.read()-c) << ")" << dendl;
Mutex::Locker l(lock);
- ldout(cct, 5) << "put " << c << " (" << cout << " -> " << (count-c) << ")" << dendl;
if (c) {
if (!cond.empty())
cond.front()->SignalOne();
- count -= c;
- assert(count >= 0); //if count goes negative, we failed somewhere!
+ assert(((int64_t)count.read()) >= c); //if count goes negative, we failed somewhere!
+ count.sub(c);
logger->inc(l_throttle_put);
logger->inc(l_throttle_put_sum, c);
- logger->set(l_throttle_val, count);
+ logger->set(l_throttle_val, count.read());
}
- return count;
+ return count.read();
}
#include "Mutex.h"
#include "Cond.h"
#include <list>
+#include "include/atomic.h"
class CephContext;
class PerfCounters;
CephContext *cct;
std::string name;
PerfCounters *logger;
- int64_t count, max;
+ ceph::atomic_t count, max;
Mutex lock;
list<Cond*> cond;
private:
void _reset_max(int64_t m);
bool _should_wait(int64_t c) {
+ int64_t m = max.read();
+ int64_t cur = count.read();
return
- max &&
- ((c <= max && count + c > max) || // normally stay under max
- (c >= max && count > max)); // except for large c
+ m &&
+ ((c <= m && cur + c > m) || // normally stay under max
+ (c >= m && cur > m)); // except for large c
}
bool _wait(int64_t c);
public:
int64_t get_current() {
- Mutex::Locker l(lock);
- return count;
+ return count.read();
}
- int64_t get_max() { return max; }
+ int64_t get_max() { return max.read(); }
bool wait(int64_t m = 0);