#include "common/ceph_context.h"
#include "common/valgrind.h"
+#include <atomic>
+
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
struct RefCountedObject {
private:
mutable atomic_t nref;
bool RGWDataChangesLog::going_down()
{
- return (down_flag.read() != 0);
+ return down_flag;
}
RGWDataChangesLog::~RGWDataChangesLog() {
- down_flag.set(1);
+ down_flag = true;
renew_thread->stop();
renew_thread->join();
delete renew_thread;
RWLock modified_lock;
map<int, set<string> > modified_shards;
- atomic_t down_flag;
+ std::atomic<bool> down_flag = { false };
struct ChangeStatus {
real_time cur_expiration;
-
#include "common/ceph_json.h"
#include "rgw_coroutine.h"
#include "rgw_boost_asio_yield.h"
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
#define dout_subsys ceph_subsys_rgw
Mutex::Locker l(lock);
while (complete_reqs.empty()) {
cond.Wait(lock);
- if (going_down.read() != 0) {
+ if (going_down) {
return -ECANCELED;
}
}
for (auto cn : cns) {
cn->unregister();
}
- going_down.set(1);
+ going_down = true;
cond.Signal();
}
bool canceled = false; // set on going_down
RGWCoroutinesEnv env;
- uint64_t run_context = run_context_count.inc();
+ uint64_t run_context = ++run_context_count;
lock.get_write();
set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
env.manager = this;
env.scheduled_stacks = &scheduled_stacks;
- for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down.read();) {
+ for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
lock.get_write();
RGWCoroutinesStack *stack = *iter;
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
}
- if (going_down.read() > 0) {
+ if (going_down) {
ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
ret = -ECANCELED;
canceled = true;
}
lock.get_write();
- if (!context_stacks.empty() && !going_down.read()) {
+ if (!context_stacks.empty() && !going_down) {
JSONFormatter formatter(true);
formatter.open_array_section("context_stacks");
for (auto& s : context_stacks) {
lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
formatter.flush(*_dout);
*_dout << dendl;
- assert(context_stacks.empty() || going_down.read()); // assert on deadlock
+ assert(context_stacks.empty() || going_down); // assert on deadlock
}
for (auto stack : context_stacks) {
#include "rgw_common.h"
#include "rgw_boost_asio_coroutine.h"
+#include <atomic>
+
#define RGW_ASYNC_OPS_MGR_WINDOW 100
class RGWCoroutinesStack;
SafeTimer timer;
- atomic_t going_down;
+ std::atomic<bool> going_down = { false };
map<void *, void *> waiters;
class RGWCoroutinesManager {
CephContext *cct;
- atomic_t going_down;
+ std::atomic<bool> going_down = { false };
- atomic64_t run_context_count;
+ std::atomic<int64_t> run_context_count = { 0 };
map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
RWLock lock;
int run(list<RGWCoroutinesStack *>& ops);
int run(RGWCoroutine *op);
void stop() {
- if (going_down.inc() == 1) {
+ bool expected = false;
+ if (going_down.compare_exchange_strong(expected, true)) {
completion_mgr->go_down();
}
}
}
void RGWAsyncRadosProcessor::stop() {
- going_down.set(1);
+ going_down = true;
m_tp.drain(&req_wq);
m_tp.stop();
for (auto iter = m_req_queue.begin(); iter != m_req_queue.end(); ++iter) {
return set_cr_done();
}
reenter(this) {
- while (!going_down.read()) {
+ while (!going_down) {
yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
caller->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
#include "common/WorkQueue.h"
#include "common/Throttle.h"
+#include <atomic>
+
class RGWAsyncRadosRequest : public RefCountedObject {
RGWCoroutine *caller;
RGWAioCompletionNotifier *notifier;
class RGWAsyncRadosProcessor {
deque<RGWAsyncRadosRequest *> m_req_queue;
- atomic_t going_down;
+ std::atomic<bool> going_down = { false };
protected:
RGWRados *store;
ThreadPool m_tp;
void queue(RGWAsyncRadosRequest *req);
bool is_going_down() {
- return (going_down.read() != 0);
+ return going_down;
}
};
int interval;
Mutex lock;
- atomic_t going_down;
+ std::atomic<bool> going_down = { false };
bool locked{false};
RGWCoroutine *caller;
}
void go_down() {
- going_down.set(1);
+ going_down = true;
wakeup();
}
#include "rgw_file.h"
#include "rgw_lib_frontend.h"
+#include <atomic>
+
#define dout_subsys ceph_subsys_rgw
using namespace rgw;
const string RGWFileHandle::root_name = "/";
- atomic<uint32_t> RGWLibFS::fs_inst_counter;
+ std::atomic<uint32_t> RGWLibFS::fs_inst_counter;
uint32_t RGWLibFS::write_completion_interval_s = 10;
RGWUserInfo user;
RGWAccessKey key; // XXXX acc_key
- static atomic<uint32_t> fs_inst_counter;
+ static std::atomic<uint32_t> fs_inst_counter;
static uint32_t write_completion_interval_s;
std::string fsid;
bool RGWGC::going_down()
{
- return (down_flag.read() != 0);
+ return down_flag;
}
void RGWGC::start_processor()
void RGWGC::stop_processor()
{
- down_flag.set(1);
+ down_flag = true;
if (worker) {
worker->stop();
worker->join();
#include "include/types.h"
-#include "include/atomic.h"
#include "include/rados/librados.hpp"
#include "common/Mutex.h"
#include "common/Cond.h"
#include "rgw_rados.h"
#include "cls/rgw/cls_rgw_types.h"
+#include <atomic>
+
class RGWGC {
CephContext *cct;
RGWRados *store;
int max_objs;
string *obj_names;
- atomic_t down_flag;
+ std::atomic<bool> down_flag = { false };
int tag_index(const string& tag);
#include "rgw_coroutine.h"
+#include <atomic>
+
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
curl_slist *h;
uint64_t id;
int ret;
- atomic_t done;
+ std::atomic<bool> done = { false };
RGWHTTPClient *client;
void *user_info;
bool registered;
easy_handle = NULL;
h = NULL;
- done.set(1);
+ done = true;
cond.Signal();
}
bool is_done() {
- return done.read() != 0;
+ return done;
}
int get_retcode() {
void RGWHTTPManager::stop()
{
- if (is_stopped.read()) {
+ if (is_stopped) {
return;
}
- is_stopped.set(1);
+ is_stopped = true;
if (is_threaded) {
- going_down.set(1);
+ going_down = true;
signal_thread();
reqs_thread->join();
delete reqs_thread;
ldout(cct, 20) << __func__ << ": start" << dendl;
- while (!going_down.read()) {
+ while (!going_down) {
int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
if (ret < 0) {
dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
#include "common/RWLock.h"
#include "common/Cond.h"
-#include "include/atomic.h"
#include "rgw_common.h"
#include "rgw_string.h"
+#include <atomic>
+
using param_pair_t = pair<string, string>;
using param_vec_t = vector<param_pair_t>;
string last_url;
bool verify_ssl; // Do not validate self signed certificates, default to false
- atomic_t stopped;
+ std::atomic<unsigned> stopped { 0 };
protected:
CephContext *cct;
RGWCompletionManager *completion_mgr;
void *multi_handle;
bool is_threaded;
- atomic_t going_down;
- atomic_t is_stopped;
+ std::atomic<bool> going_down = { false };
+ std::atomic<bool> is_stopped = { false };
RWLock reqs_lock;
map<uint64_t, rgw_http_req_data *> reqs;
bool TokenCache::going_down() const
{
- return (down_flag.read() != 0);
+ return down_flag;
}
void* TokenCache::RevokeThread::entry()
#include "rgw_http_client.h"
#include "common/Cond.h"
+#include <atomic>
+
int rgw_open_cms_envelope(CephContext *cct,
const std::string& src,
std::string& dst); /* out */
list<string>::iterator lru_iter;
};
- atomic_t down_flag;
+ std::atomic<bool> down_flag = { false };
class RevokeThread : public Thread {
friend class TokenCache;
}
~TokenCache() {
- down_flag.set(1);
+ down_flag = true;
revocator.stop();
revocator.join();
void RGWLC::stop_processor()
{
- down_flag.set(1);
+ down_flag = true;
if (worker) {
worker->stop();
worker->join();
bool RGWLC::going_down()
{
- return (down_flag.read() != 0);
+ return down_flag;
}
bool RGWLC::LCWorker::should_work(utime_t& now)
#include "common/debug.h"
#include "include/types.h"
-#include "include/atomic.h"
#include "include/rados/librados.hpp"
#include "common/Mutex.h"
#include "common/Cond.h"
#include "rgw_multi.h"
#include "cls/rgw/cls_rgw_types.h"
+#include <atomic>
+
using namespace std;
#define HASH_PRIME 7877
#define MAX_ID_LEN 255
RGWRados *store;
int max_objs;
string *obj_names;
- atomic_t down_flag;
+ std::atomic<bool> down_flag = { false };
string cookie;
class LCWorker : public Thread {
#include "rgw_loadgen.h"
#include "rgw_client_io.h"
+#include <atomic>
+
#define dout_subsys ceph_subsys_rgw
extern void signal_shutdown();
vector<string> buckets(num_buckets);
- atomic_t failed;
+ std::atomic<int64_t> failed = { 0 };
for (i = 0; i < num_buckets; i++) {
buckets[i] = "/loadgen";
string *objs = new string[num_objs];
- if (failed.read()) {
+ if (failed) {
derr << "ERROR: bucket creation failed" << dendl;
goto done;
}
checkpoint();
- if (failed.read()) {
+ if (failed) {
derr << "ERROR: bucket creation failed" << dendl;
goto done;
}
void RGWLoadGenProcess::gen_request(const string& method,
const string& resource,
- int content_length, atomic_t* fail_flag)
+ int content_length, std::atomic<int64_t>* fail_flag)
{
RGWLoadGenRequest* req =
new RGWLoadGenRequest(store->get_new_req_id(), method, resource,
dout(20) << "process_request() returned " << ret << dendl;
if (req->fail_flag) {
- req->fail_flag->inc();
+ (*req->fail_flag)++;
}
}
#include <map>
#include <string>
#include <vector>
+#include <atomic>
#include "include/types.h"
#include "common/BackTrace.h"
class RGWProcess;
static int signal_fd[2] = {0, 0};
-static atomic_t disable_signal_fd;
+static std::atomic<int64_t> disable_signal_fd = { 0 };
void signal_shutdown()
{
- if (!disable_signal_fd.read()) {
+ if (!disable_signal_fd) {
int val = 0;
int ret = write(signal_fd[0], (char *)&val, sizeof(val));
if (ret < 0) {
bool RGWObjectExpirer::going_down()
{
- return (down_flag.read() != 0);
+ return down_flag;
}
void RGWObjectExpirer::start_processor()
void RGWObjectExpirer::stop_processor()
{
- down_flag.set(1);
+ down_flag = true;
if (worker) {
worker->stop();
worker->join();
#include <iostream>
#include <sstream>
#include <string>
+#include <atomic>
#include "auth/Crypto.h"
#include "rgw_usage.h"
#include "rgw_replica_log.h"
+#include <atomic>
+
class RGWObjectExpirer {
protected:
RGWRados *store;
};
OEWorker *worker;
- atomic_t down_flag;
+ std::atomic<bool> down_flag = { false };
public:
explicit RGWObjectExpirer(RGWRados *_store)
#include "common/WorkQueue.h"
#include "common/Throttle.h"
+#include <atomic>
+
#if !defined(dout_subsys)
#define dout_subsys ceph_subsys_rgw
#define def_dout_subsys
void checkpoint();
void handle_request(RGWRequest* req) override;
void gen_request(const string& method, const string& resource,
- int content_length, atomic_t* fail_flag);
+ int content_length, std::atomic<int64_t>* fail_flag);
void set_access_key(RGWAccessKey& key) { access_key = key; }
};
#include "rgw_bucket.h"
#include "rgw_user.h"
+#include <atomic>
+
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
}
class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
- atomic_t down_flag;
+ std::atomic<bool> down_flag = { false };
RWLock rwlock;
map<rgw_bucket, rgw_user> modified_buckets;
}
bool going_down() {
- return (down_flag.read() != 0);
+ return down_flag;
}
void stop() {
- down_flag.set(1);
+ down_flag = true;
rwlock.get_write();
stop_thread(&buckets_sync_thread);
rwlock.unlock();
#ifndef CEPH_RGW_QUOTA_H
#define CEPH_RGW_QUOTA_H
-
#include "include/utime.h"
-#include "include/atomic.h"
#include "common/lru_map.h"
+#include <atomic>
+
static inline int64_t rgw_rounded_kb(int64_t bytes)
{
return (bytes + 1023) / 1024;
#include <string>
#include <iostream>
#include <vector>
+#include <atomic>
#include <list>
#include <map>
#include "auth/Crypto.h" // get_random_bytes()
#include "compressor/Compressor.h"
+#include <atomic>
+
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
CephContext *cct;
RGWRados *store;
- atomic_t down_flag;
+ std::atomic<bool> down_flag = { false };
string thread_name;
virtual int init() { return 0; }
virtual int process() = 0;
- bool going_down() { return down_flag.read() != 0; }
+ bool going_down() { return down_flag; }
+
void start();
void stop();
};
void RGWRadosThread::stop()
{
- down_flag.set(1);
+ down_flag = true;
stop_process();
if (worker) {
worker->stop();
if (delim_pos >= 0) {
string s = cur_marker.name.substr(0, delim_pos);
s.append(bigger_than_delim);
- cur_marker.set(s);
+ cur_marker = s;
}
}
string skip_after_delim;
while (truncated && count <= max) {
if (skip_after_delim > cur_marker.name) {
- cur_marker.set(skip_after_delim);
+ cur_marker = skip_after_delim;
ldout(cct, 20) << "setting cur_marker=" << cur_marker.name << "[" << cur_marker.instance << "]" << dendl;
}
std::map<string, rgw_bucket_dir_entry> ent_map;
Mutex data_lock;
list<get_obj_aio_data> aio_data;
RGWGetDataCB *client_cb;
- atomic_t cancelled;
- atomic_t err_code;
+ std::atomic<bool> cancelled = { false };
+ std::atomic<int64_t> err_code = { 0 };
Throttle throttle;
list<bufferlist> read_list;
throttle(cct, "get_obj_data", cct->_conf->rgw_get_obj_window_size, false) {}
~get_obj_data() override { }
void set_cancelled(int r) {
- cancelled.set(1);
- err_code.set(r);
+ cancelled = true;
+ err_code = r;
}
bool is_cancelled() {
- return cancelled.read() == 1;
+ return cancelled;
}
int get_err_code() {
- return err_code.read();
+ return err_code;
}
int wait_next_io(bool *done) {
if (filter && !filter->filter(oid, oid))
continue;
- e.key.set(oid);
+ e.key = oid;
objs.push_back(e);
}
void get_bucket_instance_ids(const RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result);
- atomic64_t max_req_id;
+ std::atomic<int64_t> max_req_id = { 0 };
Mutex lock;
Mutex watchers_lock;
SafeTimer *timer;
RGWPeriod current_period;
public:
- RGWRados() : max_req_id(0), lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
+ RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false),
run_sync_thread(false), async_rados(nullptr), meta_notifier(NULL),
data_notifier(NULL), meta_sync_processor_thread(NULL),
meta_mgr(NULL), data_log(NULL) {}
uint64_t get_new_req_id() {
- return max_req_id.inc();
+ return ++max_req_id;
}
librados::IoCtx* get_lc_pool_ctx() {
#if defined(WITH_RADOSGW_FCGI_FRONTEND)
#include "rgw_fcgi.h"
#endif
+
#include "common/QueueRing.h"
+#include <atomic>
+
struct RGWRequest
{
uint64_t id;
string method;
string resource;
int content_length;
- atomic_t* fail_flag;
+ std::atomic<int64_t>* fail_flag = nullptr;
RGWLoadGenRequest(uint64_t req_id, const string& _m, const string& _r, int _cl,
- atomic_t *ff)
+ std::atomic<int64_t> *ff)
: RGWRequest(req_id), method(_m), resource(_r), content_length(_cl),
fail_flag(ff) {}
};
return -EIO;
}
- int i = counter.inc();
+ int i = ++counter;
endpoint = endpoints[i % endpoints.size()];
return 0;
return endpoint;
}
- int i = counter.inc();
+ int i = ++counter;
endpoint = endpoints[i % endpoints.size()];
return endpoint;
#include "common/ceph_json.h"
#include "common/RefCountedObj.h"
+#include <atomic>
class CephContext;
class RGWRados;
RGWAccessKey key;
string self_zone_group;
string remote_id;
- atomic_t counter;
+ std::atomic<int64_t> counter = { 0 };
public:
::encode(info, bl);
store->time_log_prepare_entry(entry, real_clock::now(), section, name, bl);
- uint32_t shard_id = counter.inc() % num_shards;
+ uint32_t shard_id = ++counter % num_shards;
return new RGWRadosTimelogAddCR(store, oids[shard_id], entry);
void RGWRemoteMetaLog::finish()
{
- going_down.set(1);
+ going_down = true;
stop();
}
// get shard count and oldest log period from master
rgw_mdlog_info mdlog_info;
for (;;) {
- if (going_down.read()) {
+ if (going_down) {
ldout(store->ctx(), 1) << __func__ << "(): going down" << dendl;
return 0;
}
rgw_meta_sync_status sync_status;
do {
- if (going_down.read()) {
+ if (going_down) {
ldout(store->ctx(), 1) << __func__ << "(): going down" << dendl;
return 0;
}
ldout(store->ctx(), 0) << "ERROR: bad sync state!" << dendl;
return -EIO;
}
- } while (!going_down.read());
+ } while (!going_down);
return 0;
}
#include "include/stringify.h"
#include "common/RWLock.h"
+#include <atomic>
+
#define ERROR_LOGGER_SHARDS 32
#define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log"
vector<string> oids;
int num_shards;
- atomic_t counter;
+ std::atomic<int64_t> counter = { 0 };
public:
RGWSyncErrorLogger(RGWRados *_store, const string &oid_prefix, int _num_shards);
RGWCoroutine *log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message);
void init_sync_env(RGWMetaSyncEnv *env);
int store_sync_info(const rgw_meta_sync_info& sync_info);
- atomic_t going_down;
+ std::atomic<bool> going_down = { false };
public:
RGWRemoteMetaLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,