]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: svc.bucket_sync: add new svc for dealing with bucket sync policy
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 16 Aug 2019 22:34:45 +0000 (15:34 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:36 +0000 (10:20 -0800)
svc.bucket_sync_sobj keeps a cache of bucket sync policies (per bucket
instance), and chains to the metadata cache.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_service.cc
src/rgw/rgw_service.h
src/rgw/services/svc_bucket_sync.h [new file with mode: 0644]
src/rgw/services/svc_bucket_sync_sobj.cc [new file with mode: 0644]
src/rgw/services/svc_bucket_sync_sobj.h [new file with mode: 0644]

index 4ae49bb484662337eaeef36f4a37f724e0495575..2addc4b172c1fe0ebb0c9526ca439e1dc264be76 100644 (file)
@@ -25,6 +25,7 @@ set(librgw_common_srcs
   services/svc_bilog_rados.cc
   services/svc_bucket.cc
   services/svc_bucket_sobj.cc
+  services/svc_bucket_sync_sobj.cc
   services/svc_cls.cc
   services/svc_datalog_rados.cc
   services/svc_mdlog.cc
index e6f4f55a14adfbc63c3cd09fdff9cc1bedcd52bc..a5e9746fa87604889d07ef8c2cd0de5464715846 100644 (file)
@@ -7,6 +7,7 @@
 #include "services/svc_bi_rados.h"
 #include "services/svc_bilog_rados.h"
 #include "services/svc_bucket_sobj.h"
+#include "services/svc_bucket_sync_sobj.h"
 #include "services/svc_cls.h"
 #include "services/svc_datalog_rados.h"
 #include "services/svc_mdlog.h"
@@ -49,6 +50,7 @@ int RGWServices_Def::init(CephContext *cct,
 {
   finisher = std::make_unique<RGWSI_Finisher>(cct);
   bucket_sobj = std::make_unique<RGWSI_Bucket_SObj>(cct);
+  bucket_sync_sobj = std::make_unique<RGWSI_Bucket_Sync_SObj>(cct);
   bi_rados = std::make_unique<RGWSI_BucketIndex_RADOS>(cct);
   bilog_rados = std::make_unique<RGWSI_BILog_RADOS>(cct);
   cls = std::make_unique<RGWSI_Cls>(cct);
@@ -80,6 +82,8 @@ int RGWServices_Def::init(CephContext *cct,
   bucket_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(),
                     bi_rados.get(), meta.get(), meta_be_sobj.get(),
                     sync_modules.get());
+  bucket_sync_sobj->init(zone.get(), sysobj_cache.get(),
+                         bucket_sobj.get());
   cls->init(zone.get(), rados.get());
   datalog_rados->init(zone.get(), cls.get());
   mdlog->init(rados.get(), zone.get(), sysobj.get(), cls.get());
@@ -208,6 +212,12 @@ int RGWServices_Def::init(CephContext *cct,
       return r;
     }
 
+    r = bucket_sync_sobj->start();
+    if (r < 0) {
+      ldout(cct, 0) << "ERROR: failed to start bucket_sync service (" << cpp_strerror(-r) << dendl;
+      return r;
+    }
+
     r = user_rados->start();
     if (r < 0) {
       ldout(cct, 0) << "ERROR: failed to start user_rados service (" << cpp_strerror(-r) << dendl;
@@ -269,6 +279,8 @@ int RGWServices::do_init(CephContext *_cct, bool have_cache, bool raw, bool run_
   bilog_rados = _svc.bilog_rados.get();
   bucket_sobj = _svc.bucket_sobj.get();
   bucket = bucket_sobj;
+  bucket_sync_sobj = _svc.bucket_sync_sobj.get();
+  bucket_sync = bucket_sync_sobj;
   cls = _svc.cls.get();
   datalog_rados = _svc.datalog_rados.get();
   mdlog = _svc.mdlog.get();
index a40616ad25a0add10eea47a405eb5c1613b87821..eeeb23208fca8303142231887de0514c04f39b80 100644 (file)
@@ -47,6 +47,8 @@ public:
 class RGWSI_Finisher;
 class RGWSI_Bucket;
 class RGWSI_Bucket_SObj;
+class RGWSI_Bucket_Sync;
+class RGWSI_Bucket_Sync_SObj;
 class RGWSI_BucketIndex;
 class RGWSI_BucketIndex_RADOS;
 class RGWSI_BILog_RADOS;
@@ -77,6 +79,7 @@ struct RGWServices_Def
 
   std::unique_ptr<RGWSI_Finisher> finisher;
   std::unique_ptr<RGWSI_Bucket_SObj> bucket_sobj;
+  std::unique_ptr<RGWSI_Bucket_Sync_SObj> bucket_sync_sobj;
   std::unique_ptr<RGWSI_BucketIndex_RADOS> bi_rados;
   std::unique_ptr<RGWSI_BILog_RADOS> bilog_rados;
   std::unique_ptr<RGWSI_Cls> cls;
@@ -114,6 +117,8 @@ struct RGWServices
   RGWSI_Finisher *finisher{nullptr};
   RGWSI_Bucket *bucket{nullptr};
   RGWSI_Bucket_SObj *bucket_sobj{nullptr};
+  RGWSI_Bucket_Sync *bucket_sync{nullptr};
+  RGWSI_Bucket_Sync_SObj *bucket_sync_sobj{nullptr};
   RGWSI_BucketIndex *bi{nullptr};
   RGWSI_BucketIndex_RADOS *bi_rados{nullptr};
   RGWSI_BILog_RADOS *bilog_rados{nullptr};
diff --git a/src/rgw/services/svc_bucket_sync.h b/src/rgw/services/svc_bucket_sync.h
new file mode 100644 (file)
index 0000000..13c89ec
--- /dev/null
@@ -0,0 +1,38 @@
+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include "rgw/rgw_service.h"
+
+#include "svc_bucket_types.h"
+
+class RGWBucketSyncPolicyHandler;
+
+
+class RGWSI_Bucket_Sync : public RGWServiceInstance
+{
+public:
+  RGWSI_Bucket_Sync(CephContext *cct) : RGWServiceInstance(cct) {}
+
+  virtual int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
+                                 const rgw_bucket& bucket,
+                                 std::shared_ptr<RGWBucketSyncPolicyHandler> *handler,
+                                 optional_yield y) = 0;
+};
+
+
diff --git a/src/rgw/services/svc_bucket_sync_sobj.cc b/src/rgw/services/svc_bucket_sync_sobj.cc
new file mode 100644 (file)
index 0000000..3a8c370
--- /dev/null
@@ -0,0 +1,79 @@
+#include "svc_bucket_sync_sobj.h"
+#include "svc_zone.h"
+#include "svc_sys_obj_cache.h"
+#include "svc_bucket_sobj.h"
+
+#include "rgw/rgw_bucket_sync.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+
+RGWSI_Bucket_Sync_SObj::~RGWSI_Bucket_Sync_SObj() {
+}
+
+void RGWSI_Bucket_Sync_SObj::init(RGWSI_Zone *_zone_svc,
+                                  RGWSI_SysObj_Cache *_cache_svc,
+                                  RGWSI_Bucket_SObj *bucket_sobj_svc)
+{
+  svc.zone = _zone_svc;
+  svc.cache = _cache_svc;
+  svc.bucket_sobj = bucket_sobj_svc;
+}
+
+int RGWSI_Bucket_Sync_SObj::do_start()
+{
+  sync_policy_cache.reset(new RGWChainedCacheImpl<bucket_sync_policy_cache_entry>);
+  sync_policy_cache->init(svc.cache);
+
+  return 0;
+}
+
+int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
+                                               const rgw_bucket& bucket,
+                                               std::shared_ptr<RGWBucketSyncPolicyHandler> *handler,
+                                               optional_yield y)
+{
+  string key = RGWSI_Bucket::get_bi_meta_key(bucket);
+  string cache_key("bi/");
+  cache_key.append(key);
+
+  if (auto e = sync_policy_cache->find(cache_key)) {
+    *handler = e->handler;
+    return 0;
+  }
+
+
+  rgw_cache_entry_info cache_info;
+
+  RGWBucketInfo bucket_info;
+
+  int r = svc.bucket_sobj->read_bucket_instance_info(ctx,
+                                                     key,
+                                                     &bucket_info,
+                                                     nullptr,
+                                                     nullptr,
+                                                     y,
+                                                     &cache_info);
+  if (r < 0) {
+    if (r != -ENOENT) {
+      ldout(cct, 0) << "ERROR: svc.bucket->read_bucket_instance_info(key=" << key << ") returned r=" << r << dendl;
+    }
+    return r;
+  }
+
+  bucket_sync_policy_cache_entry e;
+  e.handler.reset(new RGWBucketSyncPolicyHandler(svc.zone, bucket_info));
+
+  r = e.handler->init();
+  if (r < 0) {
+    ldout(cct, 0) << "ERROR: RGWBucketSyncPolicyHandler::init() returned r=" << r << dendl;
+    return r;
+  }
+
+  if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) {
+    ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl;
+  }
+
+  return 0;
+}
+
diff --git a/src/rgw/services/svc_bucket_sync_sobj.h b/src/rgw/services/svc_bucket_sync_sobj.h
new file mode 100644 (file)
index 0000000..40745c2
--- /dev/null
@@ -0,0 +1,66 @@
+
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include "rgw/rgw_service.h"
+
+#include "svc_meta_be.h"
+#include "svc_bucket_types.h"
+#include "svc_bucket_sync.h"
+
+class RGWSI_Zone;
+class RGWSI_SysObj_Cache;
+class RGWSI_Bucket_SObj;
+
+template <class T>
+class RGWChainedCacheImpl;
+
+class RGWBucketSyncPolicyHandler;
+
+class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync
+{
+  struct bucket_sync_policy_cache_entry {
+    std::shared_ptr<RGWBucketSyncPolicyHandler> handler;
+  };
+
+  using RGWChainedCacheImpl_bucket_sync_policy_cache_entry = RGWChainedCacheImpl<bucket_sync_policy_cache_entry>;
+  unique_ptr<RGWChainedCacheImpl_bucket_sync_policy_cache_entry> sync_policy_cache;
+
+  int do_start() override;
+
+public:
+  struct Svc {
+    RGWSI_Zone *zone{nullptr};
+    RGWSI_SysObj_Cache *cache{nullptr};
+    RGWSI_Bucket_SObj *bucket_sobj{nullptr};
+  } svc;
+
+  RGWSI_Bucket_Sync_SObj(CephContext *cct) : RGWSI_Bucket_Sync(cct) {}
+  ~RGWSI_Bucket_Sync_SObj();
+
+  void init(RGWSI_Zone *_zone_svc,
+            RGWSI_SysObj_Cache *_cache_svc,
+            RGWSI_Bucket_SObj *_bucket_sobj_svc);
+
+
+  int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
+                         const rgw_bucket& bucket,
+                         std::shared_ptr<RGWBucketSyncPolicyHandler> *handler,
+                         optional_yield y) override;
+};
+