endfunction()
set(librgw_common_srcs
+ services/svc_finisher.cc
services/svc_notify.cc
services/svc_quota.cc
services/svc_rados.cc
op_ret = -ERR_INVALID_PART;
return;
} else {
- manifest.append(obj_part.manifest, store->svc.zone.get());
+ manifest.append(obj_part.manifest, store->svc.zone);
}
bool part_compressed = (obj_part.cs_info.compression_type != "none");
// try to read the period from rados
period.set_id(period_id);
period.set_epoch(0);
- int r = period.init(store->ctx(), store->svc.sysobj.get());
+ int r = period.init(store->ctx(), store->svc.sysobj);
if (r < 0) {
if (store->svc.zone->is_meta_master()) {
// can't pull if we're the master
#include "common/errno.h"
#include "common/Formatter.h"
#include "common/Throttle.h"
-#include "common/Finisher.h"
#include "rgw_rados.h"
#include "rgw_zone.h"
#include "services/svc_zone_utils.h"
#include "services/svc_quota.h"
#include "services/svc_sys_obj.h"
+#include "services/svc_sys_obj_cache.h"
#include "compressor/Compressor.h"
#define dout_subsys ceph_subsys_rgw
-static string notify_oid_prefix = "notify";
-static string *notify_oids = NULL;
static string shadow_ns = "shadow";
static string dir_oid_prefix = ".dir.";
static string default_bucket_index_pool_suffix = "rgw.buckets.index";
sync_log_trimmer = nullptr;
bucket_trim = boost::none;
}
- if (finisher) {
- finisher->stop();
- }
- if (finisher) {
- /* delete finisher only after cleaning up watches, as watch error path might call
- * into finisher. We stop finisher before finalizing watch to make sure we don't
- * actually handle any racing work
- */
- delete finisher;
- }
if (meta_notifier) {
meta_notifier->stop();
delete meta_notifier;
}
}
- finisher = new Finisher(cct);
- finisher->start();
-
period_puller.reset(new RGWPeriodPuller(this));
period_history.reset(new RGWPeriodHistory(cct, period_puller.get(),
svc.zone->get_current_period()));
auto& zone_params = svc.zone->get_zone_params();
auto& zone = svc.zone->get_zone();
-#warning sync service needed
/* no point of running sync thread if we don't have a master zone configured
or there is no rest_master_conn */
if (zonegroup.master_zone.empty() || !svc.zone->get_master_conn()
return ret;
}
+/*
+ * FIXME: in the future formattable will derive from formatter, so formattable
+ * could be constructed directly
+ */
+static bool to_formattable(CephContext *cct, JSONFormatter& f, JSONFormattable *result)
+{
+ stringstream ss;
+ f.flush(ss);
+ string s = ss.str();
+
+ JSONParser jp;
+ if (!jp.parse(s.c_str(), s.size())) {
+ ldout(cct, 0) << "failed to parse formatter string: data=" << s << dendl;
+ return false;
+ }
+
+ result->decode_json(&jp);
+
+ return true;
+}
+
/**
* Initialize the RADOS instance and prepare to do other ops
* Returns 0 on success, -ERR# on failure.
svc_registry = std::make_unique<RGWServiceRegistry>(cct);
JSONFormattable zone_svc_conf;
- ret = svc_registry->get_instance("zone", zone_svc_conf, &svc.zone);
+ ret = svc_registry->get_instance("zone", zone_svc_conf, &_svc.zone);
if (ret < 0) {
return ret;
}
+ svc.zone = _svc.zone.get();
JSONFormattable zone_utils_svc_conf;
- ret = svc_registry->get_instance("zone_utils", zone_utils_svc_conf, &svc.zone_utils);
+ ret = svc_registry->get_instance("zone_utils", zone_utils_svc_conf, &_svc.zone_utils);
if (ret < 0) {
return ret;
}
+ svc.zone_utils = _svc.zone_utils.get();
JSONFormattable quota_svc_conf;
- ret = svc_registry->get_instance("quota", quota_svc_conf, &svc.quota);
+ ret = svc_registry->get_instance("quota", quota_svc_conf, &_svc.quota);
if (ret < 0) {
return ret;
}
+ svc.quota = _svc.quota.get();
+
+ if (use_cache) {
+ JSONFormattable cache_svc_conf;
+ ret = svc_registry->get_instance("sys_obj_cache", cache_svc_conf, &_svc.cache);
+ if (ret < 0) {
+ return ret;
+ }
+ svc.cache = _svc.cache.get();
+ }
JSONFormattable sysobj_svc_conf;
- ret = svc_registry->get_instance("sys_obj", quota_svc_conf, &svc.sysobj);
+
+ JSONFormatter f;
+ encode_json("cache", use_cache, &f);
+ if (!to_formattable(cct, f, &sysobj_svc_conf)) {
+ assert(0);
+ }
+ ret = svc_registry->get_instance("sys_obj", sysobj_svc_conf, &_svc.sysobj);
if (ret < 0) {
return ret;
}
+ svc.sysobj = _svc.sysobj.get();
host_id = svc.zone_utils->gen_host_id();
return init_complete();
}
-void RGWRados::schedule_context(Context *c) {
- finisher->queue(c);
-}
-
int RGWRados::list_raw_prefixed_objs(const rgw_pool& pool, const string& prefix, list<string>& result)
{
bool is_truncated;
RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread,
bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_cache)
{
- RGWRados *store = NULL;
- if (!use_cache) {
- store = new RGWRados;
- } else {
- store = new RGWCache<RGWRados>;
- }
+ RGWRados *store = new RGWRados;
+
+ store->set_use_cache(use_cache);
if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread, run_reshard_thread) < 0) {
delete store;
if (!svc.cache) {
return false;
}
- return svc.cache->call_inspec(s, f);
+ return svc.cache->call_inspect(s, f);
}
bool RGWRados::call_erase(const std::string& s) {
class RGWSI_ZoneUtils;
class RGWSI_Quota;
class RGWSI_SysObj;
+class RGWSI_SysObj_Cache;
/* flags for put_obj_meta() */
#define PUT_OBJ_CREATE 0x01
explicit RGWObjectCtx(RGWRados *_store, req_state *_s) : store(_store), s(_s), obj(store), raw(store) { }
};
-class Finisher;
class RGWAsyncRadosProcessor;
template <class T>
RGWQuotaHandler *quota_handler;
- Finisher *finisher;
-
RGWCoroutinesManagerRegistry *cr_registry;
RGWSyncModulesManager *sync_modules_manager{nullptr};
RGWServiceRegistryRef svc_registry;
RGWIndexCompletionManager *index_completion_manager{nullptr};
+
+ bool use_cache{false};
public:
- RGWRados() : lock("rados_timer_lock"), timer(NULL),
+ RGWRados(): lock("rados_timer_lock"), timer(NULL),
gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false),
run_sync_thread(false), run_reshard_thread(false), async_rados(nullptr), meta_notifier(NULL),
data_notifier(NULL), meta_sync_processor_thread(NULL),
binfo_cache(NULL), obj_tombstone_cache(nullptr),
pools_initialized(false),
quota_handler(NULL),
- finisher(NULL),
cr_registry(NULL),
meta_mgr(NULL), data_log(NULL), reshard(NULL) {}
+ void set_use_cache(bool status) {
+ use_cache = status;
+ }
+
uint64_t get_new_req_id() {
return ++max_req_id;
}
std::shared_ptr<RGWSI_ZoneUtils> zone_utils;
std::shared_ptr<RGWSI_Quota> quota;
std::shared_ptr<RGWSI_SysObj> sysobj;
+ std::shared_ptr<RGWSI_SysObj_Cache> cache;
+ } _svc;
+
+ struct {
+ RGWSI_RADOS *rados{nullptr};
+ RGWSI_Zone *zone{nullptr};
+ RGWSI_ZoneUtils *zone_utils{nullptr};
+ RGWSI_Quota *quota{nullptr};
+ RGWSI_SysObj *sysobj{nullptr};
+ RGWSI_SysObj_Cache *cache{nullptr};
} svc;
/**
return initialize();
}
/** Initialize the RADOS instance and prepare to do other ops */
- virtual int init_rados();
+ int init_rados();
int init_complete();
int initialize();
void finalize();
period.set_id(period_id);
period.set_epoch(epoch);
- http_ret = period.init(store->ctx(), store->svc.sysobj.get(), realm_id, realm_name);
+ http_ret = period.init(store->ctx(), store->svc.sysobj, realm_id, realm_name);
if (http_ret < 0)
ldout(store->ctx(), 5) << "failed to read period" << dendl;
}
auto cct = store->ctx();
// initialize the period without reading from rados
- period.init(cct, store->svc.sysobj.get(), false);
+ period.init(cct, store->svc.sysobj, false);
// decode the period from input
const auto max_size = cct->_conf->rgw_max_put_param_size;
// period that we haven't restarted with yet. we also don't want to modify
// the objects in use by RGWRados
RGWRealm realm(period.get_realm());
- http_ret = realm.init(cct, store->svc.sysobj.get());
+ http_ret = realm.init(cct, store->svc.sysobj);
if (http_ret < 0) {
lderr(cct) << "failed to read current realm: "
<< cpp_strerror(-http_ret) << dendl;
}
RGWPeriod current_period;
- http_ret = current_period.init(cct, store->svc.sysobj.get(), realm.get_id());
+ http_ret = current_period.init(cct, store->svc.sysobj, realm.get_id());
if (http_ret < 0) {
lderr(cct) << "failed to read current period: "
<< cpp_strerror(-http_ret) << dendl;
// read realm
realm.reset(new RGWRealm(id, name));
- http_ret = realm->init(g_ceph_context, store->svc.sysobj.get());
+ http_ret = realm->init(g_ceph_context, store->svc.sysobj);
if (http_ret < 0)
lderr(store->ctx()) << "failed to read realm id=" << id
<< " name=" << name << dendl;
#include "rgw_service.h"
+#include "services/svc_finisher.h"
+#include "services/svc_notify.h"
#include "services/svc_rados.h"
#include "services/svc_zone.h"
#include "services/svc_zone_utils.h"
#include "services/svc_quota.h"
#include "services/svc_sys_obj.h"
+#include "services/svc_sys_obj_cache.h"
+#include "services/svc_sys_obj_core.h"
#define dout_subsys ceph_subsys_rgw
void RGWServiceRegistry::register_all(CephContext *cct)
{
+ services["finisher"] = make_shared<RGWS_Finisher>(cct);
+ services["notify"] = make_shared<RGWS_Notify>(cct);
services["rados"] = make_shared<RGWS_RADOS>(cct);
services["zone"] = make_shared<RGWS_Zone>(cct);
services["zone_utils"] = make_shared<RGWS_ZoneUtils>(cct);
services["quota"] = make_shared<RGWS_Quota>(cct);
services["sys_obj"] = make_shared<RGWS_SysObj>(cct);
+ services["sys_obj_cache"] = make_shared<RGWS_SysObj_Cache>(cct);
+ services["sys_obj_core"] = make_shared<RGWS_SysObj_Core>(cct);
}
bool RGWServiceRegistry::find(const string& name, RGWServiceRef *svc)
for (auto& g : zonegroups) {
for (auto& z : g.second.zones) {
std::unique_ptr<RGWRESTConn> conn{
- new RGWRESTConn(store->ctx(), store->svc.zone.get(), z.first, z.second.endpoints)};
+ new RGWRESTConn(store->ctx(), store->svc.zone, z.first, z.second.endpoints)};
connections.emplace(z.first, std::move(conn));
}
}
auto& root_conf = root_profile->conn_conf;
root_profile->conn.reset(new S3RESTConn(sync_env->cct,
- sync_env->store->svc.zone.get(),
+ sync_env->store->svc.zone,
id,
{ root_conf->endpoint },
root_conf->key,
auto& c = i.second;
c->conn.reset(new S3RESTConn(sync_env->cct,
- sync_env->store->svc.zone.get(),
+ sync_env->store->svc.zone,
id,
{ c->conn_conf->endpoint },
c->conn_conf->key,
--- /dev/null
+#include "common/Finisher.h"
+
+#include "svc_finisher.h"
+#include "svc_zone.h"
+
+#include "rgw/rgw_zone.h"
+
+int RGWS_Finisher::create_instance(const string& conf, RGWServiceInstanceRef *instance)
+{
+ instance->reset(new RGWSI_Finisher(this, cct));
+ return 0;
+}
+
+std::map<string, RGWServiceInstance::dependency> RGWSI_Finisher::get_deps()
+{
+ std::map<string, RGWServiceInstance::dependency> dep;
+ return dep;
+}
+
+int RGWSI_Finisher::init()
+{
+ finisher = new Finisher(cct);
+ finisher->start();
+
+ return 0;
+}
+
+void RGWSI_Finisher::shutdown()
+{
+ if (finisher) {
+ finisher->stop();
+
+ map<int, ShutdownCB *> cbs;
+ cbs.swap(shutdown_cbs); /* move cbs out, in case caller unregisetrs */
+ for (auto& iter : cbs) {
+ iter.second->call();
+ }
+ delete finisher;
+ }
+}
+
+void RGWSI_Finisher::register_caller(ShutdownCB *cb, int *phandle)
+{
+ *phandle = ++handles_counter;
+ shutdown_cbs[*phandle] = cb;
+}
+
+void RGWSI_Finisher::unregister_caller(int handle)
+{
+ shutdown_cbs.erase(handle);
+}
+
+void RGWSI_Finisher::schedule_context(Context *c)
+{
+ finisher->queue(c);
+}
+
--- /dev/null
+#ifndef CEPH_RGW_SERVICES_FINISHER_H
+#define CEPH_RGW_SERVICES_FINISHER_H
+
+
+#include "rgw/rgw_service.h"
+
+class Context;
+class Finisher;
+
+class RGWS_Finisher : public RGWService
+{
+public:
+ RGWS_Finisher(CephContext *cct) : RGWService(cct, "finisher") {}
+
+ int create_instance(const std::string& conf, RGWServiceInstanceRef *instance) override;
+};
+
+class RGWSI_Finisher : public RGWServiceInstance
+{
+public:
+ class ShutdownCB;
+
+private:
+ Finisher *finisher{nullptr};
+
+ std::map<std::string, RGWServiceInstance::dependency> get_deps() override;
+ int load(const std::string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs) override;
+ int init() override;
+ void shutdown() override;
+
+ std::map<int, ShutdownCB *> shutdown_cbs;
+ std::atomic<int> handles_counter;
+
+public:
+ RGWSI_Finisher(RGWService *svc, CephContext *cct): RGWServiceInstance(svc, cct) {}
+ ~RGWSI_Finisher();
+
+ class ShutdownCB {
+ public:
+ virtual ~ShutdownCB() {}
+ virtual void call() = 0;
+ };
+
+ void register_caller(ShutdownCB *cb, int *phandle);
+ void unregister_caller(int handle);
+
+ void schedule_context(Context *c);
+};
+
+#endif
#include "common/errno.h"
#include "svc_notify.h"
+#include "svc_finisher.h"
#include "svc_zone.h"
#include "svc_rados.h"
static string notify_oid_prefix = "notify";
+int RGWS_Notify::create_instance(const string& conf, RGWServiceInstanceRef *instance)
+{
+ instance->reset(new RGWSI_Notify(this, cct));
+ return 0;
+}
+
class RGWWatcher : public librados::WatchCtx2 {
CephContext *cct;
RGWSI_Notify *svc;
}
};
-int RGWS_Notify::create_instance(const string& conf, RGWServiceInstanceRef *instance)
+
+class RGWSI_Notify_ShutdownCB : public RGWSI_Finisher::ShutdownCB
{
- instance->reset(new RGWSI_Notify(this, cct));
- return 0;
-}
+ RGWSI_Notify *svc;
+public:
+ RGWSI_Notify_ShutdownCB(RGWSI_Notify *_svc) : svc(_svc) {}
+ void call() override {
+ svc->shutdown();
+ }
+};
std::map<string, RGWServiceInstance::dependency> RGWSI_Notify::get_deps()
{
.conf = "{}" };
deps["rados_dep"] = { .name = "rados",
.conf = "{}" };
+ deps["finisher_dep"] = { .name = "finisher",
+ .conf = "{}" };
return deps;
}
assert(zone_svc);
rados_svc = static_pointer_cast<RGWSI_RADOS>(dep_refs["rados_dep"]);
assert(rados_svc);
+ finisher_svc = static_pointer_cast<RGWSI_Finisher>(dep_refs["finisher_dep"]);
+ assert(finisher_svc);
return 0;
}
void RGWSI_Notify::finalize_watch()
{
+ if (finalized) {
+ return;
+ }
+
for (int i = 0; i < num_watchers; i++) {
RGWWatcher *watcher = watchers[i];
watcher->unregister_watch();
return ret;
}
+ shutdown_cb = new RGWSI_Notify_ShutdownCB(this);
+ finisher_svc->register_caller(shutdown_cb, &finisher_handle);
+
return 0;
}
void RGWSI_Notify::shutdown()
{
+ finisher_svc->unregister_caller(finisher_handle);
finalize_watch();
}
RWLock::WLocker l(watchers_lock);
cb = _cb;
}
+
+void RGWSI_Notify::schedule_context(Context *c)
+{
+ finisher_svc->schedule_context(c);
+}
class RGWSI_Zone;
+class RGWSI_Finisher;
class RGWWatcher;
class RGWS_Notify : public RGWService
{
public:
- RGWS_Notify(CephContext *cct) : RGWService(cct, "quota") {}
+ RGWS_Notify(CephContext *cct) : RGWService(cct, "notify") {}
int create_instance(const std::string& conf, RGWServiceInstanceRef *instance) override;
};
+class RGWSI_Notify_ShutdownCB;
+
class RGWSI_Notify : public RGWServiceInstance
{
+ friend class RGWWatcher;
+ friend class RGWSI_Notify_ShutdownCB;
+
public:
class CB;
+
private:
std::shared_ptr<RGWSI_Zone> zone_svc;
std::shared_ptr<RGWSI_RADOS> rados_svc;
+ std::shared_ptr<RGWSI_Finisher> finisher_svc;
std::map<std::string, RGWServiceInstance::dependency> get_deps() override;
int load(const std::string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs) override;
double inject_notify_timeout_probability{0};
unsigned max_notify_retries{0};
- friend class RGWWatcher;
-
string get_control_oid(int i);
RGWSI_RADOS::Obj pick_control_obj(const string& key);
CB *cb{nullptr};
+ int finisher_handle{0};
+ RGWSI_Notify_ShutdownCB *shutdown_cb{nullptr};
+
+ bool finalized{false};
+
int init_watch();
void finalize_watch();
void set_enabled(bool status);
int robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl);
+
+ void schedule_context(Context *c);
public:
RGWSI_Notify(RGWService *svc, CephContext *cct): RGWServiceInstance(svc, cct) {}
+ ~RGWSI_Notify();
class CB {
public:
#define dout_subsys ceph_subsys_rgw
+int RGWS_SysObj_Cache::create_instance(const string& conf, RGWServiceInstanceRef *instance)
+{
+ instance->reset(new RGWSI_SysObj_Cache(this, cct));
+ return 0;
+}
+
class RGWSI_SysObj_Cache_CB : public RGWSI_Notify::CB
{
RGWSI_SysObj_Cache *svc;
class RGWSI_SysObj_Cache_CB;
+class RGWS_SysObj_Cache : public RGWService
+{
+public:
+ RGWS_SysObj_Cache(CephContext *cct) : RGWService(cct, "sysobj_cache") {}
+
+ int create_instance(const std::string& conf, RGWServiceInstanceRef *instance) override;
+};
+
class RGWSI_SysObj_Cache : public RGWSI_SysObj_Core
{
friend class RGWSI_SysObj_Cache_CB;
#define dout_subsys ceph_subsys_rgw
+int RGWS_SysObj_Core::create_instance(const string& conf, RGWServiceInstanceRef *instance)
+{
+ instance->reset(new RGWSI_SysObj_Core(this, cct));
+ return 0;
+}
+
int RGWSI_SysObj_Core::GetObjState::get_rados_obj(RGWSI_RADOS *rados_svc,
RGWSI_Zone *zone_svc,
rgw_raw_obj& obj,
}
};
+class RGWS_SysObj_Core : public RGWService
+{
+public:
+ RGWS_SysObj_Core(CephContext *cct) : RGWService(cct, "sysobj_core") {}
+
+ int create_instance(const std::string& conf, RGWServiceInstanceRef *instance) override;
+};
+
class RGWSI_SysObj_Core : public RGWServiceInstance
{
friend class RGWSI_SysObj;