From: Yehuda Sadeh Date: Fri, 16 Aug 2019 22:34:45 +0000 (-0700) Subject: rgw: svc.bucket_sync: add new svc for dealing with bucket sync policy X-Git-Tag: v15.1.0~22^2~118 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=53b1eac8bce47291403f7ddf780263013530c857;p=ceph.git rgw: svc.bucket_sync: add new svc for dealing with bucket sync policy svc.bucket_sync_sobj keeps a cache of bucket sync policies (per bucket instance), and chains to the metadata cache. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 4ae49bb4846..2addc4b172c 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -25,6 +25,7 @@ set(librgw_common_srcs services/svc_bilog_rados.cc services/svc_bucket.cc services/svc_bucket_sobj.cc + services/svc_bucket_sync_sobj.cc services/svc_cls.cc services/svc_datalog_rados.cc services/svc_mdlog.cc diff --git a/src/rgw/rgw_service.cc b/src/rgw/rgw_service.cc index e6f4f55a14a..a5e9746fa87 100644 --- a/src/rgw/rgw_service.cc +++ b/src/rgw/rgw_service.cc @@ -7,6 +7,7 @@ #include "services/svc_bi_rados.h" #include "services/svc_bilog_rados.h" #include "services/svc_bucket_sobj.h" +#include "services/svc_bucket_sync_sobj.h" #include "services/svc_cls.h" #include "services/svc_datalog_rados.h" #include "services/svc_mdlog.h" @@ -49,6 +50,7 @@ int RGWServices_Def::init(CephContext *cct, { finisher = std::make_unique(cct); bucket_sobj = std::make_unique(cct); + bucket_sync_sobj = std::make_unique(cct); bi_rados = std::make_unique(cct); bilog_rados = std::make_unique(cct); cls = std::make_unique(cct); @@ -80,6 +82,8 @@ int RGWServices_Def::init(CephContext *cct, bucket_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(), bi_rados.get(), meta.get(), meta_be_sobj.get(), sync_modules.get()); + bucket_sync_sobj->init(zone.get(), sysobj_cache.get(), + bucket_sobj.get()); cls->init(zone.get(), rados.get()); datalog_rados->init(zone.get(), cls.get()); mdlog->init(rados.get(), zone.get(), sysobj.get(), cls.get()); @@ -208,6 +212,12 @@ int RGWServices_Def::init(CephContext *cct, return r; } + r = bucket_sync_sobj->start(); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to start bucket_sync service (" << cpp_strerror(-r) << dendl; + return r; + } + r = user_rados->start(); if (r < 0) { ldout(cct, 0) << "ERROR: failed to start user_rados service (" << cpp_strerror(-r) << dendl; @@ -269,6 +279,8 @@ int RGWServices::do_init(CephContext *_cct, bool have_cache, bool raw, bool run_ bilog_rados = _svc.bilog_rados.get(); bucket_sobj = _svc.bucket_sobj.get(); bucket = bucket_sobj; + bucket_sync_sobj = _svc.bucket_sync_sobj.get(); + bucket_sync = bucket_sync_sobj; cls = _svc.cls.get(); datalog_rados = _svc.datalog_rados.get(); mdlog = _svc.mdlog.get(); diff --git a/src/rgw/rgw_service.h b/src/rgw/rgw_service.h index a40616ad25a..eeeb23208fc 100644 --- a/src/rgw/rgw_service.h +++ b/src/rgw/rgw_service.h @@ -47,6 +47,8 @@ public: class RGWSI_Finisher; class RGWSI_Bucket; class RGWSI_Bucket_SObj; +class RGWSI_Bucket_Sync; +class RGWSI_Bucket_Sync_SObj; class RGWSI_BucketIndex; class RGWSI_BucketIndex_RADOS; class RGWSI_BILog_RADOS; @@ -77,6 +79,7 @@ struct RGWServices_Def std::unique_ptr finisher; std::unique_ptr bucket_sobj; + std::unique_ptr bucket_sync_sobj; std::unique_ptr bi_rados; std::unique_ptr bilog_rados; std::unique_ptr cls; @@ -114,6 +117,8 @@ struct RGWServices RGWSI_Finisher *finisher{nullptr}; RGWSI_Bucket *bucket{nullptr}; RGWSI_Bucket_SObj *bucket_sobj{nullptr}; + RGWSI_Bucket_Sync *bucket_sync{nullptr}; + RGWSI_Bucket_Sync_SObj *bucket_sync_sobj{nullptr}; RGWSI_BucketIndex *bi{nullptr}; RGWSI_BucketIndex_RADOS *bi_rados{nullptr}; RGWSI_BILog_RADOS *bilog_rados{nullptr}; diff --git a/src/rgw/services/svc_bucket_sync.h b/src/rgw/services/svc_bucket_sync.h new file mode 100644 index 00000000000..13c89ecf8bb --- /dev/null +++ b/src/rgw/services/svc_bucket_sync.h @@ -0,0 +1,38 @@ + +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#pragma once + +#include "rgw/rgw_service.h" + +#include "svc_bucket_types.h" + +class RGWBucketSyncPolicyHandler; + + +class RGWSI_Bucket_Sync : public RGWServiceInstance +{ +public: + RGWSI_Bucket_Sync(CephContext *cct) : RGWServiceInstance(cct) {} + + virtual int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, + const rgw_bucket& bucket, + std::shared_ptr *handler, + optional_yield y) = 0; +}; + + diff --git a/src/rgw/services/svc_bucket_sync_sobj.cc b/src/rgw/services/svc_bucket_sync_sobj.cc new file mode 100644 index 00000000000..3a8c3709dfc --- /dev/null +++ b/src/rgw/services/svc_bucket_sync_sobj.cc @@ -0,0 +1,79 @@ +#include "svc_bucket_sync_sobj.h" +#include "svc_zone.h" +#include "svc_sys_obj_cache.h" +#include "svc_bucket_sobj.h" + +#include "rgw/rgw_bucket_sync.h" + +#define dout_subsys ceph_subsys_rgw + + +RGWSI_Bucket_Sync_SObj::~RGWSI_Bucket_Sync_SObj() { +} + +void RGWSI_Bucket_Sync_SObj::init(RGWSI_Zone *_zone_svc, + RGWSI_SysObj_Cache *_cache_svc, + RGWSI_Bucket_SObj *bucket_sobj_svc) +{ + svc.zone = _zone_svc; + svc.cache = _cache_svc; + svc.bucket_sobj = bucket_sobj_svc; +} + +int RGWSI_Bucket_Sync_SObj::do_start() +{ + sync_policy_cache.reset(new RGWChainedCacheImpl); + sync_policy_cache->init(svc.cache); + + return 0; +} + +int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, + const rgw_bucket& bucket, + std::shared_ptr *handler, + optional_yield y) +{ + string key = RGWSI_Bucket::get_bi_meta_key(bucket); + string cache_key("bi/"); + cache_key.append(key); + + if (auto e = sync_policy_cache->find(cache_key)) { + *handler = e->handler; + return 0; + } + + + rgw_cache_entry_info cache_info; + + RGWBucketInfo bucket_info; + + int r = svc.bucket_sobj->read_bucket_instance_info(ctx, + key, + &bucket_info, + nullptr, + nullptr, + y, + &cache_info); + if (r < 0) { + if (r != -ENOENT) { + ldout(cct, 0) << "ERROR: svc.bucket->read_bucket_instance_info(key=" << key << ") returned r=" << r << dendl; + } + return r; + } + + bucket_sync_policy_cache_entry e; + e.handler.reset(new RGWBucketSyncPolicyHandler(svc.zone, bucket_info)); + + r = e.handler->init(); + if (r < 0) { + ldout(cct, 0) << "ERROR: RGWBucketSyncPolicyHandler::init() returned r=" << r << dendl; + return r; + } + + if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) { + ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl; + } + + return 0; +} + diff --git a/src/rgw/services/svc_bucket_sync_sobj.h b/src/rgw/services/svc_bucket_sync_sobj.h new file mode 100644 index 00000000000..40745c2219e --- /dev/null +++ b/src/rgw/services/svc_bucket_sync_sobj.h @@ -0,0 +1,66 @@ + +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#pragma once + +#include "rgw/rgw_service.h" + +#include "svc_meta_be.h" +#include "svc_bucket_types.h" +#include "svc_bucket_sync.h" + +class RGWSI_Zone; +class RGWSI_SysObj_Cache; +class RGWSI_Bucket_SObj; + +template +class RGWChainedCacheImpl; + +class RGWBucketSyncPolicyHandler; + +class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync +{ + struct bucket_sync_policy_cache_entry { + std::shared_ptr handler; + }; + + using RGWChainedCacheImpl_bucket_sync_policy_cache_entry = RGWChainedCacheImpl; + unique_ptr sync_policy_cache; + + int do_start() override; + +public: + struct Svc { + RGWSI_Zone *zone{nullptr}; + RGWSI_SysObj_Cache *cache{nullptr}; + RGWSI_Bucket_SObj *bucket_sobj{nullptr}; + } svc; + + RGWSI_Bucket_Sync_SObj(CephContext *cct) : RGWSI_Bucket_Sync(cct) {} + ~RGWSI_Bucket_Sync_SObj(); + + void init(RGWSI_Zone *_zone_svc, + RGWSI_SysObj_Cache *_cache_svc, + RGWSI_Bucket_SObj *_bucket_sobj_svc); + + + int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, + const rgw_bucket& bucket, + std::shared_ptr *handler, + optional_yield y) override; +}; +