services/svc_finisher.cc
services/svc_notify.cc
services/svc_quota.cc
+ services/svc_sync_modules.cc
services/svc_rados.cc
services/svc_sys_obj.cc
services/svc_sys_obj_cache.cc
(is_read_only_set ? &read_only : NULL),
endpoints, ptier_type,
psync_from_all, sync_from, sync_from_rm,
- predirect_zone);
+ predirect_zone,
+ store->svc.sync_modules->get_manager());
if (ret < 0) {
cerr << "failed to add zone " << zone_name << " to zonegroup " << zonegroup.get_name() << ": "
<< cpp_strerror(-ret) << std::endl;
ptier_type,
psync_from_all,
sync_from, sync_from_rm,
- predirect_zone);
+ predirect_zone,
+ store->svc.sync_modules->get_manager());
if (ret < 0) {
cerr << "failed to add zone " << zone_name << " to zonegroup " << zonegroup.get_name()
<< ": " << cpp_strerror(-ret) << std::endl;
(is_read_only_set ? &read_only : NULL),
endpoints, ptier_type,
psync_from_all, sync_from, sync_from_rm,
- predirect_zone);
+ predirect_zone,
+ store->svc.sync_modules->get_manager());
if (ret < 0) {
cerr << "failed to update zonegroup: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
RGWSyncModuleInstanceRef sync_module;
- int ret = store->get_sync_modules_manager()->create_instance(g_ceph_context, store->get_zone().tier_type,
+ int ret = store->svc.sync_modules->get_manager()->create_instance(g_ceph_context, store->get_zone().tier_type,
store->get_zone_params().tier_config, &sync_module);
if (ret < 0) {
lderr(cct) << "ERROR: failed to init sync module instance, ret=" << ret << dendl;
#include "cls/lock/cls_lock_client.h"
#include "services/svc_zone.h"
+#include "services/svc_sync_modules.h"
#include "include/random.h"
return -EIO;
}
- if (!store->get_sync_modules_manager()->supports_data_export(zone_def->tier_type)) {
+ if (!store->svc.sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) {
return -ENOTSUP;
}
#include "services/svc_zone.h"
#include "services/svc_zone_utils.h"
#include "services/svc_quota.h"
+#include "services/svc_sync_modules.h"
#include "services/svc_sys_obj.h"
#include "services/svc_sys_obj_cache.h"
delete meta_mgr;
delete binfo_cache;
delete obj_tombstone_cache;
- delete sync_modules_manager;
if (reshard_wait.get()) {
reshard_wait->stop();
}
}
- sync_modules_manager = new RGWSyncModulesManager();
-
- rgw_register_sync_modules(sync_modules_manager);
-
auto crs = std::unique_ptr<RGWCoroutinesManagerRegistry>{
new RGWCoroutinesManagerRegistry(cct)};
ret = crs->hook_to_admin_command("cr dump");
if (run_sync_thread) {
auto& zone_public_config = svc.zone->get_zone();
- ret = sync_modules_manager->create_instance(cct, zone_public_config.tier_type, svc.zone->get_zone_params().tier_config, &sync_module);
+ ret = svc.sync_modules->get_manager()->create_instance(cct, zone_public_config.tier_type, svc.zone->get_zone_params().tier_config, &sync_module);
if (ret < 0) {
lderr(cct) << "ERROR: failed to init sync module instance, ret=" << ret << dendl;
if (ret == -ENOENT) {
lderr(cct) << "ERROR: " << zone_public_config.tier_type
<< " sync module does not exist. valid sync modules: "
- << sync_modules_manager->get_registered_module_names()
+ << svc.sync_modules->get_manager()->get_registered_module_names()
<< dendl;
}
return ret;
}
svc.quota = _svc.quota.get();
+ JSONFormattable sync_modules_svc_conf;
+ ret = svc_registry->get_instance("sync_modules", sync_modules_svc_conf, &_svc.sync_modules);
+ if (ret < 0) {
+ return ret;
+ }
+ svc.sync_modules = _svc.sync_modules.get();
+
if (use_cache) {
JSONFormattable cache_svc_conf;
ret = svc_registry->get_instance("sys_obj_cache", cache_svc_conf, &_svc.cache);
class RGWSI_Zone;
class RGWSI_ZoneUtils;
class RGWSI_Quota;
+class RGWSI_SyncModules;
class RGWSI_SysObj;
class RGWSI_SysObj_Cache;
RGWCoroutinesManagerRegistry *cr_registry;
- RGWSyncModulesManager *sync_modules_manager{nullptr};
RGWSyncModuleInstanceRef sync_module;
bool writeable_zone{false};
std::shared_ptr<RGWSI_Zone> zone;
std::shared_ptr<RGWSI_ZoneUtils> zone_utils;
std::shared_ptr<RGWSI_Quota> quota;
+ std::shared_ptr<RGWSI_SyncModules> sync_modules;
std::shared_ptr<RGWSI_SysObj> sysobj;
std::shared_ptr<RGWSI_SysObj_Cache> cache;
} _svc;
RGWSI_Zone *zone{nullptr};
RGWSI_ZoneUtils *zone_utils{nullptr};
RGWSI_Quota *quota{nullptr};
+ RGWSI_SyncModules *sync_modules{nullptr};
RGWSI_SysObj *sysobj{nullptr};
RGWSI_SysObj_Cache *cache{nullptr};
} svc;
tombstone_cache_t *get_tombstone_cache() {
return obj_tombstone_cache;
}
-
- RGWSyncModulesManager *get_sync_modules_manager() {
- return sync_modules_manager;
- }
const RGWSyncModuleInstanceRef& get_sync_module() {
return sync_module;
}
// if period id is empty, handle as 'period commit'
if (period.get_id().empty()) {
- http_ret = period.commit(realm, current_period, error_stream);
+ http_ret = period.commit(store, realm, current_period, error_stream);
if (http_ret < 0) {
lderr(cct) << "master zone failed to commit period" << dendl;
}
#include "services/svc_zone.h"
#include "services/svc_zone_utils.h"
#include "services/svc_quota.h"
+#include "services/svc_sync_modules.h"
#include "services/svc_sys_obj.h"
#include "services/svc_sys_obj_cache.h"
#include "services/svc_sys_obj_core.h"
services["zone"] = make_shared<RGWS_Zone>(cct);
services["zone_utils"] = make_shared<RGWS_ZoneUtils>(cct);
services["quota"] = make_shared<RGWS_Quota>(cct);
+ services["sync_modules"] = make_shared<RGWS_SyncModules>(cct);
services["sys_obj"] = make_shared<RGWS_SysObj>(cct);
services["sys_obj_cache"] = make_shared<RGWS_SysObj_Cache>(cct);
services["sys_obj_core"] = make_shared<RGWS_SysObj_Core>(cct);
#include "rgw_zone.h"
#include "rgw_realm_watcher.h"
#include "rgw_meta_sync_status.h"
+#include "rgw_sync.h"
#include "services/svc_zone.h"
#include "services/svc_sys_obj.h"
int RGWZoneGroup::add_zone(const RGWZoneParams& zone_params, bool *is_master, bool *read_only,
const list<string>& endpoints, const string *ptier_type,
bool *psync_from_all, list<string>& sync_from, list<string>& sync_from_rm,
- string *predirect_zone)
+ string *predirect_zone, RGWSyncModulesManager *sync_mgr)
{
auto& zone_id = zone_params.get_id();
auto& zone_name = zone_params.get_name();
}
if (ptier_type) {
zone.tier_type = *ptier_type;
-#warning FIXME
-#if 0
- if (!store->get_sync_modules_manager()->get_module(*ptier_type, nullptr)) {
+ if (!sync_mgr->get_module(*ptier_type, nullptr)) {
ldout(cct, 0) << "ERROR: could not found sync module: " << *ptier_type
<< ", valid sync modules: "
- << store->get_sync_modules_manager()->get_registered_module_names()
+ << sync_mgr->get_registered_module_names()
<< dendl;
return -ENOENT;
}
-#endif
}
if (psync_from_all) {
int RGWRealm::notify_zone(bufferlist& bl)
{
- // open a context on the realm's pool
rgw_pool pool{get_pool(cct)};
- librados::IoCtx ctx;
- int r = rgw_init_ioctx(store->get_rados_handle(), pool, ctx);
- if (r < 0) {
- ldout(cct, 0) << "Failed to open pool " << pool << dendl;
- return r;
- }
- // send a notify on the realm object
- r = ctx.notify2(get_control_oid(), bl, 0, nullptr);
- if (r < 0) {
- ldout(cct, 0) << "Realm notify failed with " << r << dendl;
- return r;
+ auto obj_ctx = sysobj_svc->init_obj_ctx();
+ auto sysobj = sysobj_svc->get_obj(obj_ctx, rgw_raw_obj{pool, get_control_oid()});
+ int ret = sysobj.wn().notify(bl, 0, nullptr);
+ if (ret < 0) {
+ return ret;
}
return 0;
}
return r;
}
-int RGWPeriod::update_sync_status(const RGWPeriod ¤t_period,
+int RGWPeriod::update_sync_status(RGWRados *store, /* for now */
+ const RGWPeriod ¤t_period,
std::ostream& error_stream,
bool force_if_stale)
{
return 0;
}
-int RGWPeriod::commit(RGWRealm& realm, const RGWPeriod& current_period,
+int RGWPeriod::commit(RGWRados *store,
+ RGWRealm& realm, const RGWPeriod& current_period,
std::ostream& error_stream, bool force_if_stale)
{
auto zone_svc = sysobj_svc->get_zone_svc();
// did the master zone change?
if (master_zone != current_period.get_master_zone()) {
// store the current metadata sync status in the period
- int r = update_sync_status(current_period, error_stream, force_if_stale);
+ int r = update_sync_status(store, current_period, error_stream, force_if_stale);
if (r < 0) {
ldout(cct, 0) << "failed to update metadata sync status: "
<< cpp_strerror(-r) << dendl;
}
set<rgw_pool> pools;
- r = get_zones_pool_set(cct, zone_svc, zones, id, pools);
+ r = get_zones_pool_set(cct, sysobj_svc, zones, id, pools);
if (r < 0) {
ldout(cct, 0) << "Error: get_zones_pool_names" << r << dendl;
return r;
}
class JSONObj;
+class RGWSyncModulesManager;
struct RGWNameToId {
std::string obj_id;
int add_zone(const RGWZoneParams& zone_params, bool *is_master, bool *read_only,
const list<std::string>& endpoints, const std::string *ptier_type,
bool *psync_from_all, list<std::string>& sync_from, list<std::string>& sync_from_rm,
- std::string *predirect_zone);
+ std::string *predirect_zone, RGWSyncModulesManager *sync_mgr);
int remove_zone(const std::string& zone_id);
int rename_zone(const RGWZoneParams& zone_params);
rgw_pool get_pool(CephContext *cct) override;
const std::string get_period_oid_prefix();
// gather the metadata sync status for each shard; only for use on master zone
- int update_sync_status(const RGWPeriod ¤t_period,
+ int update_sync_status(RGWRados *store,
+ const RGWPeriod ¤t_period,
std::ostream& error_stream, bool force_if_stale);
public:
int update();
// commit a staging period; only for use on master zone
- int commit(RGWRealm& realm, const RGWPeriod ¤t_period,
+ int commit(RGWRados *store,
+ RGWRealm& realm, const RGWPeriod ¤t_period,
std::ostream& error_stream, bool force_if_stale = false);
void encode(bufferlist& bl) const {
#include "common/Finisher.h"
#include "svc_finisher.h"
-#include "svc_zone.h"
-
-#include "rgw/rgw_zone.h"
int RGWS_Finisher::create_instance(const string& conf, RGWServiceInstanceRef *instance)
{
void RGWSI_Finisher::shutdown()
{
+ if (finalized) {
+ return;
+ }
+
if (finisher) {
finisher->stop();
}
delete finisher;
}
+
+ finalized = true;
+}
+
+RGWSI_Finisher::~RGWSI_Finisher()
+{
+ shutdown();
}
void RGWSI_Finisher::register_caller(ShutdownCB *cb, int *phandle)
private:
Finisher *finisher{nullptr};
+ bool finalized{false};
std::map<std::string, RGWServiceInstance::dependency> get_deps() override;
- int load(const std::string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs) override;
+ int load(const std::string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs) override {
+ return 0;
+ }
int init() override;
void shutdown() override;
void RGWSI_Notify::finalize_watch()
{
- if (finalized) {
- return;
- }
-
for (int i = 0; i < num_watchers; i++) {
RGWWatcher *watcher = watchers[i];
watcher->unregister_watch();
void RGWSI_Notify::shutdown()
{
+ if (finalized) {
+ return;
+ }
+
finisher_svc->unregister_caller(finisher_handle);
finalize_watch();
+
+ delete shutdown_cb;
+
+ finalized = true;
+}
+
+RGWSI_Notify::~RGWSI_Notify()
+{
+ shutdown();
}
int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
--- /dev/null
+#include "svc_sync_modules.h"
+
+#include "rgw/rgw_sync_module.h"
+
+int RGWS_SyncModules::create_instance(const string& conf, RGWServiceInstanceRef *instance)
+{
+ instance->reset(new RGWSI_SyncModules(this, cct));
+ return 0;
+}
+
+std::map<string, RGWServiceInstance::dependency> RGWSI_SyncModules::get_deps()
+{
+ return std::map<string, RGWServiceInstance::dependency>();
+}
+
+int RGWSI_SyncModules::load(const string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs)
+{
+ return 0;
+}
+
+int RGWSI_SyncModules::init()
+{
+ sync_modules_manager = new RGWSyncModulesManager();
+ rgw_register_sync_modules(sync_modules_manager);
+ return 0;
+}
+
+RGWSI_SyncModules::~RGWSI_SyncModules()
+{
+ delete sync_modules_manager;
+}
+
--- /dev/null
+#ifndef CEPH_RGW_SERVICES_SYNC_MODULES_H
+#define CEPH_RGW_SERVICES_SYNC_MODULES_H
+
+
+#include "rgw/rgw_service.h"
+
+
+class RGWSyncModulesManager;
+
+class RGWS_SyncModules : public RGWService
+{
+public:
+ RGWS_SyncModules(CephContext *cct) : RGWService(cct, "sync_modules") {}
+
+ int create_instance(const std::string& conf, RGWServiceInstanceRef *instance) override;
+};
+
+class RGWSI_SyncModules : public RGWServiceInstance
+{
+ RGWSyncModulesManager *sync_modules_manager;
+
+ std::map<std::string, RGWServiceInstance::dependency> get_deps() override;
+ int load(const std::string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs) override;
+ int init() override;
+
+public:
+ RGWSI_SyncModules(RGWService *svc, CephContext *cct): RGWServiceInstance(svc, cct) {}
+ ~RGWSI_SyncModules();
+
+ RGWSyncModulesManager *get_manager() {
+ return sync_modules_manager;
+ }
+};
+
+#endif
+
{
bool is_truncated;
- auto rados_svc = source.get_rados_svc();
- auto rados_pool = rados_svc->pool(source.pool);
+ auto rados_pool = source.rados_svc->pool(source.pool);
auto op = rados_pool.op();
return svc->omap_del(obj, key);
}
+int RGWSI_SysObj::Obj::WNOp::notify(bufferlist& bl,
+ uint64_t timeout_ms,
+ bufferlist *pbl)
+{
+ RGWSI_SysObj_Core *svc = source.core_svc;
+ rgw_raw_obj& obj = source.obj;
+
+ return svc->notify(obj, bl, timeout_ms, pbl);
+}
+
RGWSI_Zone *RGWSI_SysObj::get_zone_svc()
{
return core_svc->get_zone_svc();
RGWSysObjectCtx& ctx;
rgw_raw_obj obj;
- RGWSI_RADOS *get_rados_svc();
-
public:
Obj(RGWSI_SysObj_Core *_core_svc,
RGWSysObjectCtx& _ctx,
int set(const map<std::string, bufferlist>& m);
int del(const std::string& key);
};
+
+ struct WNOp {
+ Obj& source;
+
+ WNOp(Obj& _source) : source(_source) {}
+
+ int notify(bufferlist& bl,
+ uint64_t timeout_ms,
+ bufferlist *pbl);
+ };
ROp rop() {
return ROp(*this);
}
OmapOp omap() {
return OmapOp(*this);
}
+
+ WNOp wn() {
+ return WNOp(*this);
+ }
};
class Pool {
friend class Op;
+ RGWSI_RADOS *rados_svc;
RGWSI_SysObj_Core *core_svc;
rgw_pool pool;
- RGWSI_RADOS *get_rados_svc();
-
public:
- Pool(RGWSI_SysObj_Core *_core_svc,
- const rgw_pool& _pool) : core_svc(_core_svc),
+ Pool(RGWSI_RADOS *_rados_svc,
+ RGWSI_SysObj_Core *_core_svc,
+ const rgw_pool& _pool) : rados_svc(_rados_svc),
+ core_svc(_core_svc),
pool(_pool) {}
rgw_pool& get_pool() {
Obj get_obj(RGWSysObjectCtx& obj_ctx, const rgw_raw_obj& obj);
Pool get_pool(const rgw_pool& pool) {
- return Pool(core_svc.get(), pool);
+ return Pool(rados_svc.get(), core_svc.get(), pool);
}
RGWSI_Zone *get_zone_svc();
return r;
}
+int RGWSI_SysObj_Cache::get_attr(rgw_raw_obj& obj,
+ const char *attr_name,
+ bufferlist *dest)
+{
+ rgw_pool pool;
+ string oid;
+
+ normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
+ string name = normal_name(pool, oid);
+
+ ObjectCacheInfo info;
+
+ uint32_t flags = CACHE_FLAG_XATTRS;
+
+ if (cache.get(name, info, flags, nullptr) == 0) {
+ if (info.status < 0)
+ return info.status;
+
+ auto iter = info.xattrs.find(attr_name);
+ if (iter == info.xattrs.end()) {
+ return -ENODATA;
+ }
+
+ *dest = iter->second;
+ return dest->length();
+ }
+ /* don't try to cache this one */
+ return RGWSI_SysObj_Core::get_attr(obj, attr_name, dest);
+}
+
int RGWSI_SysObj_Cache::set_attrs(rgw_raw_obj& obj,
map<string, bufferlist>& attrs,
map<string, bufferlist> *rmattrs,
return r;
}
+int RGWSI_SysObj_Core::notify(rgw_raw_obj& obj,
+ bufferlist& bl,
+ uint64_t timeout_ms,
+ bufferlist *pbl)
+{
+ RGWSI_RADOS::Obj rados_obj;
+ int r = get_rados_obj(zone_svc.get(), obj, &rados_obj);
+ if (r < 0) {
+ ldout(cct, 20) << "get_rados_obj() on obj=" << obj << " returned " << r << dendl;
+ return r;
+ }
+
+ r = rados_obj.notify(bl, timeout_ms, pbl);
+ return r;
+}
+
int RGWSI_SysObj_Core::remove(RGWSysObjectCtxBase& obj_ctx,
RGWObjVersionTracker *objv_tracker,
rgw_raw_obj& obj)
virtual int omap_set(rgw_raw_obj& obj, const map<std::string, bufferlist>& m, bool must_exist = false);
virtual int omap_del(rgw_raw_obj& obj, const std::string& key);
+ virtual int notify(rgw_raw_obj& obj,
+ bufferlist& bl,
+ uint64_t timeout_ms,
+ bufferlist *pbl);
+
/* wrappers */
int get_system_obj_state_impl(RGWSysObjectCtxBase *rctx, rgw_raw_obj& obj, RGWSysObjState **state, RGWObjVersionTracker *objv_tracker);
int get_system_obj_state(RGWSysObjectCtxBase *rctx, rgw_raw_obj& obj, RGWSysObjState **state, RGWObjVersionTracker *objv_tracker);
#include "svc_zone.h"
#include "svc_rados.h"
#include "svc_sys_obj.h"
+#include "svc_sync_modules.h"
#include "rgw/rgw_zone.h"
#include "rgw/rgw_rest_conn.h"
std::map<string, RGWServiceInstance::dependency> RGWSI_Zone::get_deps()
{
- RGWServiceInstance::dependency dep1 = { .name = "sys_obj",
- .conf = "{}" };
map<string, RGWServiceInstance::dependency> deps;
- deps["sys_obj_dep"] = dep1;
-
- RGWServiceInstance::dependency dep2 = { .name = "rados_obj",
- .conf = "{}" };
- map<string, RGWServiceInstance::dependency> deps2;
- deps["rados_dep"] = dep2;
+ deps["sys_obj_dep"] = { .name = "sys_obj",
+ .conf = "{}" };
+ deps["rados_dep"] = { .name = "rados_obj",
+ .conf = "{}" };
+ deps["sync_modules_dep"] = { .name = "sync_modules",
+ .conf = "{}" };
return deps;
}
rados_svc = static_pointer_cast<RGWSI_RADOS>(dep_refs["rados_dep"]);
assert(rados_svc);
+ sync_modules_svc = static_pointer_cast<RGWSI_SyncModules>(dep_refs["sync_modules_dep"]);
+ assert(sync_modules_svc);
+
return 0;
}
+bool RGWSI_Zone::zone_syncs_from(RGWZone& target_zone, RGWZone& source_zone)
+{
+ return target_zone.syncs_from(source_zone.name) &&
+ sync_modules_svc->get_manager()->supports_data_export(source_zone.tier_type);
+}
+
int RGWSI_Zone::init()
{
int ret = realm->init(cct, sysobj_svc.get());
class RGWSI_RADOS;
class RGWSI_SysObj;
+class RGWSI_SyncModules;
class RGWRESTConn;
{
std::shared_ptr<RGWSI_SysObj> sysobj_svc;
std::shared_ptr<RGWSI_RADOS> rados_svc;
+ std::shared_ptr<RGWSI_SyncModules> sync_modules_svc;
std::shared_ptr<RGWRealm> realm;
std::shared_ptr<RGWZoneGroup> zonegroup;