]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: pubsub: more events coverage
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 2 Jul 2018 01:07:58 +0000 (18:07 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync_module_pubsub.cc

index 23e5d14c8e86c0bb4c09ed3b002f75001402f475..fbd3eb389066241b28e6e252a5e0374c394fd888 100644 (file)
@@ -236,8 +236,8 @@ struct PSConfig {
     sync_instance = instance_id;
   }
 
-  void get_topics(CephContext *cct, const RGWBucketInfo& bucket_info, const rgw_obj_key& key, vector<PSTopicConfig *> *result) {
-    string path = bucket_info.bucket.name + "/" + key.name;
+  void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, vector<PSTopicConfig *> *result) {
+    string path = bucket.name + "/" + key.name;
 
     result->clear();
 
@@ -267,7 +267,7 @@ struct PSConfig {
         continue;
       }
 
-      ldout(cct, 10) << ": found topic for path=" << bucket_info.bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl;
+      ldout(cct, 10) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl;
       result->push_back(&topic->second);
     } while (iter != notifications.begin());
   }
@@ -331,6 +331,8 @@ public:
     etag = etag.substr(0, 8);
     snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), etag.c_str());
 
+    event->id = buf;
+
     return buf;
   }
 
@@ -522,6 +524,7 @@ class PSSubscription {
     PSConfigRef& conf;
     PSSubConfigRef& sub_conf;
     rgw_object_simple_put_params put_obj;
+    string oid_prefix;
     int i;
   public:
     StoreEventCR(RGWDataSyncEnv *_sync_env,
@@ -532,13 +535,14 @@ class PSSubscription {
                                      pse(_event),
                                      conf(sub->env->conf),
                                      sub_conf(sub->sub_conf) {
+      oid_prefix = sub->sub_conf->data_oid_prefix;
     }
 
     int operate() override {
       reenter(this) {
 
         put_obj.bucket = sub->bucket;
-        put_obj.key = rgw_obj_key(pse.generate_message_id());
+        put_obj.key = rgw_obj_key(oid_prefix + pse.generate_message_id());
 
         pse.format(&put_obj.data);
         
@@ -748,42 +752,34 @@ public:
   }
 };
 
-
-class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+class RGWPSHandleObjEvent : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   PSEnvRef env;
-  uint64_t versioned_epoch;
+  EventRef event;
+
   vector<PSTopicConfig *> topics;
   vector<PSTopicConfig *>::iterator titer;
   set<string>::iterator siter;
   PSSubscriptionRef sub;
-  EventRef event;
 public:
-  RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
-                          RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
-                          PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
-                                                                      sync_env(_sync_env),
-                                                                      env(_env),
-                                                                      versioned_epoch(_versioned_epoch) {
-#warning this will need to change obviously
-    env->conf->get_topics(sync_env->cct, _bucket_info, _key, &topics);
+  RGWPSHandleObjEvent(RGWDataSyncEnv *_sync_env,
+                      PSEnvRef _env,
+                      EventRef& _event) : RGWCoroutine(_sync_env->cct),
+                                          sync_env(_sync_env),
+                                          env(_env),
+                                          event(_event) {
+#warning this will need to change
+    env->conf->get_topics(sync_env->cct, event->bucket, event->key, &topics);
   }
   int operate() override {
     reenter(this) {
-      ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
-                               << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
-                               << " attrs=" << attrs << dendl;
-      make_event_ref(&event);
-      event->bucket = bucket_info.bucket;
-      event->key = key;
-      event->event = OBJECT_CREATE;
-      event->timestamp = real_clock::now();
+      ldout(sync_env->cct, 10) << ": handle event: obj: z=" << sync_env->source_zone
+                               << " event=" << json_str("event", *event, false) << dendl;
 
       ldout(sync_env->cct, 20) << "pubsub: " << topics.size() << " topics found for path" << dendl;
-#warning more event init
 
       for (titer = topics.begin(); titer != topics.end(); ++titer) {
-        ldout(sync_env->cct, 10) << ": notification for " << bucket_info.bucket << "/" << key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
+        ldout(sync_env->cct, 10) << ": notification for " << event->bucket << "/" << event->key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
 
         for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
           ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl;
@@ -800,7 +796,7 @@ public:
             continue;
           }
 
-#warning publish notification
+#warning push notification
         }
         if (retcode < 0) {
           return set_cr_error(retcode);
@@ -812,6 +808,42 @@ public:
   }
 };
 
+
+class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+  RGWDataSyncEnv *sync_env;
+  PSEnvRef env;
+  uint64_t versioned_epoch;
+  EventRef event;
+public:
+  RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
+                          RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+                          PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
+                                                                      sync_env(_sync_env),
+                                                                      env(_env),
+                                                                      versioned_epoch(_versioned_epoch) {
+  }
+  int operate() override {
+    reenter(this) {
+      ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
+                               << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
+                               << " attrs=" << attrs << dendl;
+      make_event_ref(&event);
+      event->bucket = bucket_info.bucket;
+      event->key = key;
+      event->mtime = mtime;
+      event->event = OBJECT_CREATE;
+      event->timestamp = real_clock::now();
+
+      yield call(new RGWPSHandleObjEvent(sync_env, env, event));
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
 class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
   PSEnvRef env;
   uint64_t versioned_epoch;
@@ -834,31 +866,35 @@ public:
   }
 };
 
-class RGWPSRemoveRemoteObjCBCR : public RGWCoroutine {
+class RGWPSGenericObjEventCBCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
+  PSEnvRef env;
   RGWBucketInfo bucket_info;
   rgw_obj_key key;
   ceph::real_time mtime;
-  PSEnvRef env;
+  RGWPubSubEventType event_type;
+  EventRef event;
 public:
-  RGWPSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
-                          RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
-                          PSEnvRef _env) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
-                                                        bucket_info(_bucket_info), key(_key),
-                                                        mtime(_mtime), env(_env) {}
+  RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env,
+                           PSEnvRef _env,
+                           RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
+                           RGWPubSubEventType _event_type) : RGWCoroutine(_sync_env->cct),
+                                                             sync_env(_sync_env),
+                                                             env(_env),
+                                                             bucket_info(_bucket_info), key(_key),
+                                                             mtime(_mtime), event_type(_event_type) {}
   int operate() override {
     reenter(this) {
       ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
                                << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
-      yield {
-#if 0
-        string path = conf->get_obj_path(bucket_info, key);
+      make_event_ref(&event);
+      event->event = event_type;
+      event->bucket = bucket_info.bucket;
+      event->key = key;
+      event->mtime = mtime;
+      event->timestamp = real_clock::now();
 
-        call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
-                                         sync_env->http_manager,
-                                         path, nullptr /* params */));
-#endif
-      }
+      yield call(new RGWPSHandleObjEvent(sync_env, env, event));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
@@ -899,7 +935,6 @@ public:
     return new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch);
   }
   RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
-    /* versioned and versioned epoch params are useless in the elasticsearch backend case */
     ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
 #warning this should be done correctly
 #if 0
@@ -908,7 +943,7 @@ public:
       return nullptr;
     }
 #endif
-    return new RGWPSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, env);
+    return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, OBJECT_DELETE);
   }
   RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                      rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
@@ -919,7 +954,7 @@ public:
     ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
 #endif
 #warning delete markers need to be handled too
-    return NULL;
+    return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, DELETE_MARKER_CREATE);
   }
 };