]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: defined bucket sync sources manager
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 19 Aug 2019 22:43:33 +0000 (15:43 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:36 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_data_sync.cc

index 5aef8f3b84dd6918928d554fd4c3a97e2fafbd39..0ea81b46af1c662d1824e5e03eec91cba0e3692c 100644 (file)
@@ -2181,9 +2181,12 @@ int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
 }
 
 int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
+#warning FIXME
+#if 0
   if (!ctl.bucket->bucket_exports_data(bucket_info.bucket, null_yield)) {
     return 0;
   }
+#endif
 
   auto& bucket = bucket_info.bucket;
 
index aad992d0cba9f9ee252371dbf3690346c970a1f6..72e7dedb69b546e8369624fdeb6adfbe1c706376 100644 (file)
@@ -45,8 +45,8 @@ class RGWBucketSyncPolicyHandler {
 
 public:
   RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
-                             RGWBucketInfo& _bucket_info);
-
+                             RGWBucketInfo& _bucket_info) : zone_svc(_zone_svc),
+                                                            bucket_info(_bucket_info) {}
   int init();
 
   const RGWBucketInfo& get_bucket_info() const {
index ed26f3e349afaa22e59e98a4753f30523ae08538..79ec05e8f55d6571f9ae9da459a7119769dd3768 100644 (file)
@@ -3331,15 +3331,110 @@ int RGWBucketShardIncrementalSyncCR::operate()
   return 0;
 }
 
-#if 0
+class RGWBucketSyncSourcesManager {
+public:
+  static string sync_sources_oid(const rgw_bucket bucket) {
+    return bucket_sync_sources_oid_prefix + "." + bucket.to_str();
+  }
+
+  static rgw_raw_obj sync_sources_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) { 
+    return rgw_raw_obj(zone_svc->get_zone_params().log_pool, status_oid(bucket)),
+  }
+};
+
+struct rgw_bucket_sync_source_local_info {
+  string id;
+  string type;
+  string zone;
+  rgw_bucket bucket;
+  /* FIXME: config */
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(id, bl);
+    encode(type, bl);
+    encode(zone, bl);
+    encode(bucket, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(id, bl);
+    decode(type, bl);
+    decode(zone, bl);
+    decode(bucket, bl);
+    DECODE_FINISH(bl);
+  }
+  void dump(ceph::Formatter *f) const {
+    encode_json("id", id, f);
+    encode_json("type", type, f);
+    encode_json("zone", zone, f);
+    encode_json("bucket", bucket, f);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_bucket_sync_source_local_info)
+
+struct rgw_bucket_sync_sources_local_info {
+  map<string, rgw_bucket_sync_source_local_info> sources; /* id -> source */
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(sources, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(sources, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(ceph::Formatter *f) const {
+    encode_json("sources", type, f);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info)
+
+class RGWReadBucketSourcesInfoCR : public RGWCoroutine {
+  rgw_bucket bucket;
+
+  rgw_raw_obj sources_obj;
+
+  RGWSyncTraceNodeRef tn;
+
+public:
+  RGWReadBucketSourcesInfoCR(RGWSI_Zone *_zone_svc,
+                            const rgw_bucket& _bucket,
+                            const RGWSyncTraceNodeRef& _tn_parent)
+    : RGWCoroutine(_zone_svc->cct),
+      bucket(_bucket),
+      source_obj(RGWBucketSyncSourcesManager::sync_sources_obj(bucket)),
+      tn(sync_env->sync_tracer->add_node(_tn_parent, "read_bucket_sources",
+                                         SSTR(bucket))) {
+  }
+
+  int operate() override;
+};
+
+int RGWReadBucketSourcesInfoCR::operate()
+{
+  reenter(this) {
+    yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
+                                                                            sync_env->svc.sysobj,
+                                                                            RGWBucketSyncSourcesManager::sync_sources_obj(bucket),
+                                                                            &pinfo));
+    return set_cr_done();
+  }
+}
+
 class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
   rgw_bucket bucket;
   rgw_sync_source source;
-  RGWMetaSyncEnv meta_sync_env;
 
-  const std::string status_oid;
+  rgw_raw_obj sources_obj;
 
   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
   boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
@@ -3347,11 +3442,15 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
   RGWSyncTraceNodeRef tn;
 
 public:
-  RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, const rgw_bucket& bucket, const RGWSyncTraceNodeRef& _tn_parent)
-    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), bucket(_bucket),
-      status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, bucket)),
-      tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_source",
-                                         SSTR(bucket_shard_str{bs}))) {
+  RGWRunBucketSourcesSyncCR(RGWSI_Zone *_zone_svc,
+                            const rgw_bucket& _bucket,
+                            const RGWSyncTraceNodeRef& _tn_parent)
+    : RGWCoroutine(_sc->cct),
+      sc(_sc), sync_env(_sc->env),
+      bucket(_bucket),
+      source_obj(RGWBucketSyncSourcesManager::sync_sources_obj(bucket)),
+      tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
+                                         SSTR(bucket))) {
   }
   ~RGWRunBucketSourcesSyncCR() override {
     if (lease_cr) {
@@ -3369,7 +3468,7 @@ int RGWRunBucketSourcesSyncCR::operate()
       set_status("acquiring sync lock");
       auto store = sync_env->store;
       lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
-                                              rgw_raw_obj(store->svc->zone->get_zone_params().log_pool, status_oid),
+                                              RGWBucketSyncSourcesManager::sync_sources_obj(bucket),
                                               "sync_lock",
                                               cct->_conf->rgw_sync_lease_period,
                                               this));
@@ -3386,7 +3485,7 @@ int RGWRunBucketSourcesSyncCR::operate()
     }
 
     tn->log(10, "took lease");
-    yield call(new RGWReadBucketSourcesInfoCR(sc, bs.bucket, &info));
+    yield call(new RGWReadBucketSourcesInfoCR(sc, bucket, &info));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, "ERROR: failed to read sync status for bucket");
       lease_cr->go_down();
@@ -3413,7 +3512,6 @@ int RGWRunBucketSourcesSyncCR::operate()
 
   return 0;
 }
-#endif
 
 int RGWRunBucketSyncCoroutine::operate()
 {