OPTION(rgw_md_notify_interval_msec, OPT_INT, 200) // metadata changes notification interval to followers
OPTION(rgw_run_sync_thread, OPT_BOOL, true) // whether radosgw (not radosgw-admin) spawns the sync thread
OPTION(rgw_sync_lease_period, OPT_INT, 120) // time in second for lease that rgw takes on a specific log (or log shard)
+OPTION(rgw_sync_log_trim_interval, OPT_INT, 1200) // time in seconds between attempts to trim sync logs
OPTION(rgw_period_push_interval, OPT_DOUBLE, 2) // seconds to wait before retrying "period push"
OPTION(rgw_period_push_interval_max, OPT_DOUBLE, 30) // maximum interval after exponential backoff
#include "rgw_coroutine.h"
+#include "rgw_boost_asio_yield.h"
+#undef fork // fails to compile RGWPeriod::fork() below
+
#include "common/Clock.h"
#include "include/rados/librados.hpp"
}
};
+class RGWSyncLogTrimThread : public RGWSyncProcessorThread
+{
+ RGWCoroutinesManager crs;
+ RGWRados *store;
+ RGWHTTPManager http;
+ const utime_t trim_interval;
+
+ uint64_t interval_msec() override { return 0; }
+ void stop_process() override { crs.stop(); }
+public:
+ RGWSyncLogTrimThread(RGWRados *store, int interval)
+ : RGWSyncProcessorThread(store), crs(store->ctx(), nullptr), store(store),
+ http(store->ctx(), crs.get_completion_mgr()),
+ trim_interval(interval, 0)
+ {}
+
+ int init() override {
+ return http.set_threaded();
+ }
+ int process() override {
+ crs.run(new RGWDataLogTrimCR(store, &http,
+ cct->_conf->rgw_data_log_num_shards,
+ trim_interval));
+ return 0;
+ }
+};
+
void RGWRados::wakeup_meta_sync_shards(set<int>& shard_ids)
{
Mutex::Locker l(meta_sync_thread_lock);
RGWDataSyncProcessorThread *thread = iter.second;
thread->stop();
}
+ if (sync_log_trimmer) {
+ sync_log_trimmer->stop();
+ }
}
if (async_rados) {
async_rados->stop();
delete thread;
}
data_sync_processor_threads.clear();
+ delete sync_log_trimmer;
+ sync_log_trimmer = nullptr;
}
if (finisher) {
finisher->stop();
meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this, async_rados);
ret = meta_sync_processor_thread->init();
if (ret < 0) {
- ldout(cct, 0) << "ERROR: failed to initialize" << dendl;
+ ldout(cct, 0) << "ERROR: failed to initialize meta sync thread" << dendl;
return ret;
}
meta_sync_processor_thread->start();
RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter->first);
ret = thread->init();
if (ret < 0) {
- ldout(cct, 0) << "ERROR: failed to initialize" << dendl;
+ ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;
return ret;
}
thread->start();
data_sync_processor_threads[iter->first] = thread;
}
+ auto interval = cct->_conf->rgw_sync_log_trim_interval;
+ if (interval > 0) {
+ sync_log_trimmer = new RGWSyncLogTrimThread(this, interval);
+ ret = sync_log_trimmer->init();
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: failed to initialize sync log trim thread" << dendl;
+ return ret;
+ }
+ sync_log_trimmer->start();
+ }
}
data_notifier = new RGWDataNotifier(this);
data_notifier->start();
class RGWObjectExpirer;
class RGWMetaSyncProcessorThread;
class RGWDataSyncProcessorThread;
+class RGWSyncLogTrimThread;
class RGWRESTConn;
/* flags for put_obj_meta() */
RGWMetaSyncProcessorThread *meta_sync_processor_thread;
map<string, RGWDataSyncProcessorThread *> data_sync_processor_threads;
+ RGWSyncLogTrimThread *sync_log_trimmer{nullptr};
+
Mutex meta_sync_thread_lock;
Mutex data_sync_thread_lock;