}
RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(RGWRados *_store, int num_threads)
- : store(_store), m_tp(store->ctx(), "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
+ : cct(_cct), m_tp(cct, "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
req_throttle(store->ctx(), "rgw_async_rados_ops", num_threads * 2),
req_wq(this, g_conf()->rgw_op_thread_timeout,
g_conf()->rgw_op_thread_suicide_timeout, &m_tp) {
deque<RGWAsyncRadosRequest *> m_req_queue;
std::atomic<bool> going_down = { false };
protected:
- RGWRados *store;
+ CephContext *cct;
ThreadPool m_tp;
Throttle req_throttle;
} req_wq;
public:
- RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
+ RGWAsyncRadosProcessor(CephContext *_cct, int num_threads);
~RGWAsyncRadosProcessor() {}
void start();
void stop();
sync_log_trimmer->stop();
}
}
- if (async_rados) {
- async_rados->stop();
- }
if (run_sync_thread) {
delete meta_sync_processor_thread;
meta_sync_processor_thread = NULL;
}
delete data_log;
delete sync_tracer;
- if (async_rados) {
- delete async_rados;
- }
delete lc;
lc = NULL;
ctl.meta.mgr->init_oldest_log_period();
}
- async_rados = new RGWAsyncRadosProcessor(this, cct->_conf->rgw_num_async_rados_threads);
- async_rados->start();
-
ret = ctl.meta.mgr->init(current_period.get_id());
if (ret < 0) {
lderr(cct) << "ERROR: failed to initialize metadata log: "
<< pt.second.name << " present in zonegroup" << dendl;
}
}
+ auto async_processor = svc.rados->get_async_processor();
Mutex::Locker l(meta_sync_thread_lock);
meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this, async_rados);
ret = meta_sync_processor_thread->init();
bool run_sync_thread;
bool run_reshard_thread;
- RGWAsyncRadosProcessor* async_rados;
-
RGWMetaNotifier *meta_notifier;
RGWDataNotifier *data_notifier;
RGWMetaSyncProcessorThread *meta_sync_processor_thread;
#include "common/errno.h"
#include "osd/osd_types.h"
#include "rgw/rgw_tools.h"
+#include "rgw/rgw_cr_rados.h"
#define dout_subsys ceph_subsys_rgw
+RGWSI_RADOS::~RGWSI_RADOS()
+{
+}
+
int RGWSI_RADOS::do_start()
{
int ret = rados.init_with_context(cct);
if (ret < 0) {
return ret;
}
+
+ async_processor.reset(new RGWAsyncRadosProcessor(cct, cct->_conf->rgw_num_async_rados_threads));
+ async_processor->start();
+
return 0;
}
+void RGWSI_RADOS::shutdown()
+{
+ if (async_processor) {
+ async_processor->stop();
+ }
+}
+
librados::Rados* RGWSI_RADOS::get_rados_handle()
{
return &rados;
#include "common/optional_ref_default.h"
+class RGWAsyncRadosProcessor;
+
class RGWAccessListFilter {
public:
virtual ~RGWAccessListFilter() {}
class RGWSI_RADOS : public RGWServiceInstance
{
librados::Rados rados;
+ std::unique_ptr<RGWAsyncRadosProcessor> async_processor;
int do_start() override;
+ void shutdown() override;
public:
struct OpenParams {
public:
RGWSI_RADOS(CephContext *cct) : RGWServiceInstance(cct) {}
+ ~RGWSI_RADOS();
void init() {}
uint64_t instance_id();
+ RGWAsyncRadosProcessor *get_async_processor() {
+ return async_processor.get();
+ }
+
class Handle;
class Pool {