services/svc_bilog_rados.cc
services/svc_bucket.cc
services/svc_bucket_sobj.cc
+ services/svc_bucket_sync_sobj.cc
services/svc_cls.cc
services/svc_datalog_rados.cc
services/svc_mdlog.cc
#include "services/svc_bi_rados.h"
#include "services/svc_bilog_rados.h"
#include "services/svc_bucket_sobj.h"
+#include "services/svc_bucket_sync_sobj.h"
#include "services/svc_cls.h"
#include "services/svc_datalog_rados.h"
#include "services/svc_mdlog.h"
{
finisher = std::make_unique<RGWSI_Finisher>(cct);
bucket_sobj = std::make_unique<RGWSI_Bucket_SObj>(cct);
+ bucket_sync_sobj = std::make_unique<RGWSI_Bucket_Sync_SObj>(cct);
bi_rados = std::make_unique<RGWSI_BucketIndex_RADOS>(cct);
bilog_rados = std::make_unique<RGWSI_BILog_RADOS>(cct);
cls = std::make_unique<RGWSI_Cls>(cct);
bucket_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(),
bi_rados.get(), meta.get(), meta_be_sobj.get(),
sync_modules.get());
+ bucket_sync_sobj->init(zone.get(), sysobj_cache.get(),
+ bucket_sobj.get());
cls->init(zone.get(), rados.get());
datalog_rados->init(zone.get(), cls.get());
mdlog->init(rados.get(), zone.get(), sysobj.get(), cls.get());
return r;
}
+ r = bucket_sync_sobj->start();
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to start bucket_sync service (" << cpp_strerror(-r) << dendl;
+ return r;
+ }
+
r = user_rados->start();
if (r < 0) {
ldout(cct, 0) << "ERROR: failed to start user_rados service (" << cpp_strerror(-r) << dendl;
bilog_rados = _svc.bilog_rados.get();
bucket_sobj = _svc.bucket_sobj.get();
bucket = bucket_sobj;
+ bucket_sync_sobj = _svc.bucket_sync_sobj.get();
+ bucket_sync = bucket_sync_sobj;
cls = _svc.cls.get();
datalog_rados = _svc.datalog_rados.get();
mdlog = _svc.mdlog.get();
class RGWSI_Finisher;
class RGWSI_Bucket;
class RGWSI_Bucket_SObj;
+class RGWSI_Bucket_Sync;
+class RGWSI_Bucket_Sync_SObj;
class RGWSI_BucketIndex;
class RGWSI_BucketIndex_RADOS;
class RGWSI_BILog_RADOS;
std::unique_ptr<RGWSI_Finisher> finisher;
std::unique_ptr<RGWSI_Bucket_SObj> bucket_sobj;
+ std::unique_ptr<RGWSI_Bucket_Sync_SObj> bucket_sync_sobj;
std::unique_ptr<RGWSI_BucketIndex_RADOS> bi_rados;
std::unique_ptr<RGWSI_BILog_RADOS> bilog_rados;
std::unique_ptr<RGWSI_Cls> cls;
RGWSI_Finisher *finisher{nullptr};
RGWSI_Bucket *bucket{nullptr};
RGWSI_Bucket_SObj *bucket_sobj{nullptr};
+ RGWSI_Bucket_Sync *bucket_sync{nullptr};
+ RGWSI_Bucket_Sync_SObj *bucket_sync_sobj{nullptr};
RGWSI_BucketIndex *bi{nullptr};
RGWSI_BucketIndex_RADOS *bi_rados{nullptr};
RGWSI_BILog_RADOS *bilog_rados{nullptr};
--- /dev/null
+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include "rgw/rgw_service.h"
+
+#include "svc_bucket_types.h"
+
+class RGWBucketSyncPolicyHandler;
+
+
+class RGWSI_Bucket_Sync : public RGWServiceInstance
+{
+public:
+ RGWSI_Bucket_Sync(CephContext *cct) : RGWServiceInstance(cct) {}
+
+ virtual int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
+ const rgw_bucket& bucket,
+ std::shared_ptr<RGWBucketSyncPolicyHandler> *handler,
+ optional_yield y) = 0;
+};
+
+
--- /dev/null
+#include "svc_bucket_sync_sobj.h"
+#include "svc_zone.h"
+#include "svc_sys_obj_cache.h"
+#include "svc_bucket_sobj.h"
+
+#include "rgw/rgw_bucket_sync.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+
+RGWSI_Bucket_Sync_SObj::~RGWSI_Bucket_Sync_SObj() {
+}
+
+void RGWSI_Bucket_Sync_SObj::init(RGWSI_Zone *_zone_svc,
+ RGWSI_SysObj_Cache *_cache_svc,
+ RGWSI_Bucket_SObj *bucket_sobj_svc)
+{
+ svc.zone = _zone_svc;
+ svc.cache = _cache_svc;
+ svc.bucket_sobj = bucket_sobj_svc;
+}
+
+int RGWSI_Bucket_Sync_SObj::do_start()
+{
+ sync_policy_cache.reset(new RGWChainedCacheImpl<bucket_sync_policy_cache_entry>);
+ sync_policy_cache->init(svc.cache);
+
+ return 0;
+}
+
+int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
+ const rgw_bucket& bucket,
+ std::shared_ptr<RGWBucketSyncPolicyHandler> *handler,
+ optional_yield y)
+{
+ string key = RGWSI_Bucket::get_bi_meta_key(bucket);
+ string cache_key("bi/");
+ cache_key.append(key);
+
+ if (auto e = sync_policy_cache->find(cache_key)) {
+ *handler = e->handler;
+ return 0;
+ }
+
+
+ rgw_cache_entry_info cache_info;
+
+ RGWBucketInfo bucket_info;
+
+ int r = svc.bucket_sobj->read_bucket_instance_info(ctx,
+ key,
+ &bucket_info,
+ nullptr,
+ nullptr,
+ y,
+ &cache_info);
+ if (r < 0) {
+ if (r != -ENOENT) {
+ ldout(cct, 0) << "ERROR: svc.bucket->read_bucket_instance_info(key=" << key << ") returned r=" << r << dendl;
+ }
+ return r;
+ }
+
+ bucket_sync_policy_cache_entry e;
+ e.handler.reset(new RGWBucketSyncPolicyHandler(svc.zone, bucket_info));
+
+ r = e.handler->init();
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: RGWBucketSyncPolicyHandler::init() returned r=" << r << dendl;
+ return r;
+ }
+
+ if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) {
+ ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl;
+ }
+
+ return 0;
+}
+
--- /dev/null
+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include "rgw/rgw_service.h"
+
+#include "svc_meta_be.h"
+#include "svc_bucket_types.h"
+#include "svc_bucket_sync.h"
+
+class RGWSI_Zone;
+class RGWSI_SysObj_Cache;
+class RGWSI_Bucket_SObj;
+
+template <class T>
+class RGWChainedCacheImpl;
+
+class RGWBucketSyncPolicyHandler;
+
+class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync
+{
+ struct bucket_sync_policy_cache_entry {
+ std::shared_ptr<RGWBucketSyncPolicyHandler> handler;
+ };
+
+ using RGWChainedCacheImpl_bucket_sync_policy_cache_entry = RGWChainedCacheImpl<bucket_sync_policy_cache_entry>;
+ unique_ptr<RGWChainedCacheImpl_bucket_sync_policy_cache_entry> sync_policy_cache;
+
+ int do_start() override;
+
+public:
+ struct Svc {
+ RGWSI_Zone *zone{nullptr};
+ RGWSI_SysObj_Cache *cache{nullptr};
+ RGWSI_Bucket_SObj *bucket_sobj{nullptr};
+ } svc;
+
+ RGWSI_Bucket_Sync_SObj(CephContext *cct) : RGWSI_Bucket_Sync(cct) {}
+ ~RGWSI_Bucket_Sync_SObj();
+
+ void init(RGWSI_Zone *_zone_svc,
+ RGWSI_SysObj_Cache *_cache_svc,
+ RGWSI_Bucket_SObj *_bucket_sobj_svc);
+
+
+ int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
+ const rgw_bucket& bucket,
+ std::shared_ptr<RGWBucketSyncPolicyHandler> *handler,
+ optional_yield y) override;
+};
+