namespace dmc = rgw::dmclock;
class AsioFrontend {
- RGWProcessEnv env;
+ RGWProcessEnv& env;
RGWFrontendConfig* conf;
boost::asio::io_context context;
std::string uri_prefix;
void accept(Listener& listener, boost::system::error_code ec);
public:
- AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf,
+ AsioFrontend(RGWProcessEnv& env, RGWFrontendConfig* conf,
dmc::SchedulerCtx& sched_ctx)
: env(env), conf(conf), pause_mutex(context.get_executor()),
lua_manager(env.driver->get_lua_manager())
class RGWAsioFrontend::Impl : public AsioFrontend {
public:
- Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf,
+ Impl(RGWProcessEnv& env, RGWFrontendConfig* conf,
rgw::dmclock::SchedulerCtx& sched_ctx)
: AsioFrontend(env, conf, sched_ctx) {}
};
-RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env,
+RGWAsioFrontend::RGWAsioFrontend(RGWProcessEnv& env,
RGWFrontendConfig* conf,
rgw::dmclock::SchedulerCtx& sched_ctx)
: impl(new Impl(env, conf, sched_ctx))
class Impl;
std::unique_ptr<Impl> impl;
public:
- RGWAsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf,
+ RGWAsioFrontend(RGWProcessEnv& env, RGWFrontendConfig* conf,
rgw::dmclock::SchedulerCtx& sched_ctx);
~RGWAsioFrontend() override;
protected:
RGWFrontendConfig* conf;
RGWProcess* pprocess;
- RGWProcessEnv env;
+ RGWProcessEnv& env;
RGWProcessControlThread* thread;
public:
conf->get_val("prefix", "", &uri_prefix);
RGWLoadGenProcess *pp = new RGWLoadGenProcess(
- g_ceph_context, &env, num_threads, std::move(uri_prefix), conf);
+ g_ceph_context, env, num_threads, std::move(uri_prefix), conf);
pprocess = pp;
s->cio = io;
/* XXX and -then- stash req_state pointers everywhere they are needed */
- ret = req->init(rgw_env, driver, io, s);
+ ret = req->init(rgw_env, env.driver, io, s);
if (ret < 0) {
ldpp_dout(op, 10) << "failed to initialize request" << dendl;
abort_req(s, op, ret);
<< e.what() << dendl;
}
if (should_log) {
- rgw_log_op(nullptr /* !rest */, s, op, olog);
+ rgw_log_op(nullptr /* !rest */, s, op, env.olog);
}
int http_ret = s->err.http_ret;
rgw_env.set("HTTP_HOST", "");
- int ret = req->init(rgw_env, driver, &io_ctx, s);
+ int ret = req->init(rgw_env, env.driver, &io_ctx, s);
if (ret < 0) {
ldpp_dout(op, 10) << "failed to initialize request" << dendl;
abort_req(s, op, ret);
int RGWLibFrontend::init()
{
std::string uri_prefix; // empty
- pprocess = new RGWLibProcess(g_ceph_context, &env,
+ pprocess = new RGWLibProcess(g_ceph_context, env,
g_conf()->rgw_thread_pool_size, uri_prefix, conf);
return 0;
}
using unique_lock = std::unique_lock<std::mutex>;
public:
- RGWLibProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
+ RGWLibProcess(CephContext* cct, RGWProcessEnv& pe, int num_threads,
std::string uri_prefix, RGWFrontendConfig* _conf) :
RGWProcess(cct, pe, num_threads, std::move(uri_prefix), _conf),
gen(0), shutdown(false) {}
int content_length, std::atomic<bool>* fail_flag)
{
RGWLoadGenRequest* req =
- new RGWLoadGenRequest(driver->get_new_req_id(), method, resource,
+ new RGWLoadGenRequest(env.driver->get_new_req_id(), method, resource,
content_length, fail_flag);
dout(10) << "allocated request req=" << hex << req << dec << dendl;
req_throttle.get(1);
{
RGWLoadGenRequest* req = static_cast<RGWLoadGenRequest*>(r);
- RGWLoadGenRequestEnv env;
+ RGWLoadGenRequestEnv renv;
utime_t tm = ceph_clock_now();
- env.port = 80;
- env.content_length = req->content_length;
- env.content_type = "binary/octet-stream";
- env.request_method = req->method;
- env.uri = req->resource;
- env.set_date(tm);
- env.sign(dpp, access_key);
+ renv.port = 80;
+ renv.content_length = req->content_length;
+ renv.content_type = "binary/octet-stream";
+ renv.request_method = req->method;
+ renv.uri = req->resource;
+ renv.set_date(tm);
+ renv.sign(dpp, access_key);
- RGWLoadGenIO real_client_io(&env);
+ RGWLoadGenIO real_client_io(&renv);
RGWRestfulIO client_io(cct, &real_client_io);
ActiveRateLimiter ratelimit(cct);
- int ret = process_request(driver, rest, req, uri_prefix,
- *auth_registry, &client_io, olog,
+ int ret = process_request(env.driver, env.rest, req, uri_prefix,
+ *env.auth_registry, &client_io, env.olog,
null_yield, nullptr, nullptr, nullptr,
ratelimit.get_active(),
nullptr,
std::deque<RGWRequest*> m_req_queue;
protected:
CephContext *cct;
- rgw::sal::Driver* driver;
- rgw_auth_registry_ptr_t auth_registry;
- OpsLogSink* olog;
+ RGWProcessEnv& env;
ThreadPool m_tp;
Throttle req_throttle;
- RGWREST* rest;
RGWFrontendConfig* conf;
int sock_fd;
std::string uri_prefix;
- rgw::lua::Background* lua_background;
std::unique_ptr<rgw::sal::LuaManager> lua_manager;
struct RGWWQ : public DoutPrefixProvider, public ThreadPool::WorkQueue<RGWRequest> {
public:
RGWProcess(CephContext* const cct,
- RGWProcessEnv* const pe,
+ RGWProcessEnv& env,
const int num_threads,
std::string uri_prefix,
RGWFrontendConfig* const conf)
- : cct(cct),
- driver(pe->driver),
- auth_registry(pe->auth_registry),
- olog(pe->olog),
+ : cct(cct), env(env),
m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
req_throttle(cct, "rgw_ops", num_threads * 2),
- rest(pe->rest),
conf(conf),
sock_fd(-1),
uri_prefix(std::move(uri_prefix)),
- lua_background(pe->lua_background),
- lua_manager(driver->get_lua_manager()),
+ lua_manager(env.driver->get_lua_manager()),
req_wq(this,
ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
void unpause_with_new_config(rgw::sal::Driver* const driver,
rgw_auth_registry_ptr_t auth_registry) {
- this->driver = driver;
- this->auth_registry = std::move(auth_registry);
+ env.driver = driver;
+ env.auth_registry = std::move(auth_registry);
lua_manager = driver->get_lua_manager();
m_tp.unpause();
}
class RGWLoadGenProcess : public RGWProcess {
RGWAccessKey access_key;
public:
- RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
+ RGWLoadGenProcess(CephContext* cct, RGWProcessEnv& env, int num_threads,
std::string uri_prefix, RGWFrontendConfig* _conf)
- : RGWProcess(cct, pe, num_threads, std::move(uri_prefix), _conf) {}
+ : RGWProcess(cct, env, num_threads, std::move(uri_prefix), _conf) {}
void run() override;
void checkpoint();
void handle_request(const DoutPrefixProvider *dpp, RGWRequest* req) override;