]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: pubsub: events hold a generic formattable
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 23 Jul 2018 23:43:06 +0000 (16:43 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
Instead of encoding specific object store events, make it
hold a generic formattable struct, so that pubsub can be
used in the future for registering other event types.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_common.h
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_sync_module_pubsub.cc

index c5e0a30b596eb067b3aafbdf12a0e9ef9840ed0d..619ba124551566f8ce13a8be4963033d56dcdbac 100644 (file)
@@ -1795,15 +1795,20 @@ struct rgw_obj_key {
   }
   void dump(Formatter *f) const;
   void decode_json(JSONObj *obj);
+
+  string to_str() const {
+    if (instance.empty()) {
+      return name;
+    }
+    char buf[name.size() + instance.size() + 16];
+    snprintf(buf, sizeof(buf), "%s[%s]", name.c_str(), instance.c_str());
+    return buf;
+  }
 };
 WRITE_CLASS_ENCODER(rgw_obj_key)
 
 inline ostream& operator<<(ostream& out, const rgw_obj_key &o) {
-  if (o.instance.empty()) {
-    return out << o.name;
-  } else {
-    return out << o.name << "[" << o.instance << "]";
-  }
+  return out << o.to_str();
 }
 
 inline ostream& operator<<(ostream& out, const rgw_obj_index_key &o) {
index d53a0547bd51c28043bdedabaf9a8930e1e18fc7..f8e968d4953ab6cb8c2cf5e98b0ead7cc26ff61f 100644 (file)
@@ -5,51 +5,14 @@
 
 #define dout_subsys ceph_subsys_rgw
 
-void encode_json(const char *name, const RGWPubSubEventType& val, Formatter *f)
-{
-  switch (val) {
-    case EVENT_UNKNOWN:
-      encode_json(name, "EVENT_UNKNOWN", f);
-      break;
-    case OBJECT_CREATE:
-      encode_json(name, "OBJECT_CREATE", f);
-      break;
-    case OBJECT_DELETE:
-      encode_json(name, "OBJECT_DELETE", f);
-      break;
-    case DELETE_MARKER_CREATE:
-      encode_json(name, "DELETE_MARKER_CREATE", f);
-      break;
-  };
-}
-
 
 void rgw_pubsub_event::dump(Formatter *f) const
 {
   encode_json("id", id, f);
-  {
-    Formatter::ObjectSection s(*f, "bucket");
-    encode_json("name", bucket.name, f);
-    encode_json("id", bucket.bucket_id, f);
-  }
-  {
-    Formatter::ObjectSection s(*f, "object");
-    encode_json("name", key.name, f);
-    encode_json("version-id", key.instance, f);
-  }
-
-  utime_t mt(mtime);
-  encode_json("mtime", mt, f);
   encode_json("event", event, f);
   utime_t ut(timestamp);
   encode_json("timestamp", ut, f);
-
-  {
-    Formatter::ObjectSection s(*f, "attrs");
-    for (auto& attr : attrs) {
-      encode_json(attr.first.c_str(), attr.second.c_str(), f);
-    }
-  }
+  encode_json("info", info, f);
 }
 
 void rgw_pubsub_topic::dump(Formatter *f) const
index 40f3abb6f985c9dd98dd9880157453fe16470d04..2871f99903eb26636f8ee1a4d33cf078a9714524 100644 (file)
@@ -5,48 +5,30 @@
 #include "rgw_tools.h"
 
 
-enum RGWPubSubEventType {
-  EVENT_UNKNOWN        = 0,
-  OBJECT_CREATE        = 1,
-  OBJECT_DELETE        = 2,
-  DELETE_MARKER_CREATE = 3,
-};
-
 struct rgw_pubsub_event {
   string id;
-  rgw_bucket bucket;
-  rgw_obj_key key;
-  ceph::real_time mtime;
-
-  RGWPubSubEventType event;
+  string event;
+  string source;
   ceph::real_time timestamp;
-
-  std::vector<std::pair<std::string, std::string> > attrs;
+  JSONFormattable info;
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     encode(id, bl);
-    encode(bucket, bl);
-    encode(key, bl);
-    encode(mtime, bl);
-    uint32_t e = (uint32_t)event;
-    encode(e, bl);
+    encode(event, bl);
+    encode(source, bl);
     encode(timestamp, bl);
-    encode(attrs, bl);
+    encode(info, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
     decode(id, bl);
-    decode(bucket, bl);
-    decode(key, bl);
-    decode(mtime, bl);
-    uint32_t e;
-    decode(e, bl);
-    event = (RGWPubSubEventType)e;
+    decode(event, bl);
+    decode(source, bl);
     decode(timestamp, bl);
-    decode(attrs, bl);
+    decode(info, bl);
     DECODE_FINISH(bl);
   }
 
index 61045a1d3af878a29e74794cd9e01e1501532e49..98124e33b82085436f4195b3b9dd9fd1c772a0c2 100644 (file)
@@ -298,11 +298,94 @@ struct PSConfig {
   }
 };
 
+enum RGWPubSubEventType {
+  EVENT_UNKNOWN        = 0,
+  OBJECT_CREATE        = 1,
+  OBJECT_DELETE        = 2,
+  DELETE_MARKER_CREATE = 3,
+};
+
+static const char *event_name(const RGWPubSubEventType& val)
+{
+  switch (val) {
+    case OBJECT_CREATE:
+      return "OBJECT_CREATE";
+    case OBJECT_DELETE:
+      return "OBJECT_DELETE";
+    case DELETE_MARKER_CREATE:
+      return "DELETE_MARKER_CREATE";
+    default:
+      return "EVENT_UNKNOWN";
+  };
+}
+
 using PSConfigRef = std::shared_ptr<PSConfig>;
 using EventRef = std::shared_ptr<rgw_pubsub_event>;
 
-static void make_event_ref(EventRef *event) {
+struct objstore_event {
+  string id;
+  const rgw_bucket& bucket;
+  const rgw_obj_key& key;
+  const ceph::real_time& mtime;
+  const std::vector<std::pair<std::string, std::string> > *attrs;
+
+  objstore_event(const rgw_bucket& _bucket,
+                 const rgw_obj_key& _key,
+                 const ceph::real_time& _mtime,
+                 const std::vector<std::pair<std::string, std::string> > *_attrs) : bucket(_bucket),
+                                                  key(_key),
+                                                  mtime(_mtime),
+                                                  attrs(_attrs) {}
+
+  string get_hash() {
+    string etag;
+    RGWMD5Etag hash;
+    hash.update(bucket.bucket_id);
+    hash.update(key.name);
+    hash.update(key.instance);
+    hash.finish(&etag);
+
+    assert(etag.size() > 8);
+
+    return etag.substr(0, 8);
+  }
+
+  void dump(Formatter *f) const {
+    encode_json("bucket", bucket, f);
+    encode_json("key", key, f);
+    utime_t mt(mtime);
+    encode_json("mtime", mt, f);
+    Formatter::ObjectSection s(*f, "attrs");
+    if (attrs) {
+      for (auto& attr : *attrs) {
+        encode_json(attr.first.c_str(), attr.second.c_str(), f);
+      }
+    }
+  }
+};
+
+static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
+                       const rgw_obj_key& key,
+                       const ceph::real_time& mtime,
+                       const std::vector<std::pair<std::string, std::string> > *attrs,
+                       RGWPubSubEventType event_type,
+                       EventRef *event) {
   *event = std::make_shared<rgw_pubsub_event>();
+
+  EventRef& e = *event;
+  e->event = event_name(event_type);
+  e->source = bucket.name + "/" + key.name;
+  e->timestamp = real_clock::now();
+
+  objstore_event oevent(bucket, key, mtime, attrs);
+
+  string hash = oevent.get_hash();
+  utime_t ts(e->timestamp);
+  char buf[64];
+  snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str());
+  e->id = buf;
+
+  encode_json("info", oevent, &e->info);
 }
 
 class PSManager;
@@ -331,27 +414,6 @@ class PSEvent {
 public:
   PSEvent(EventRef& _event) : event(_event) {}
 
-  string generate_message_id() {
-    char buf[64];
-    utime_t ts(event->timestamp);
-
-    string etag;
-    RGWMD5Etag hash;
-    hash.update(event->bucket.bucket_id);
-    hash.update(event->key.name);
-    hash.update(event->key.instance);
-    hash.finish(&etag);
-
-    assert(etag.size() > 8);
-
-    etag = etag.substr(0, 8);
-    snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), etag.c_str());
-
-    event->id = buf;
-
-    return buf;
-  }
-
   void format(bufferlist *bl) {
     bl->append(json_str("event", *event));
   }
@@ -359,7 +421,10 @@ public:
   void encode_event(bufferlist& bl) const {
     encode(*event, bl);
   }
-        
+
+  const string& id() {
+    return event->id;
+  }
 };
 
 template <class T>
@@ -561,7 +626,7 @@ class PSSubscription {
       reenter(this) {
 
         put_obj.bucket = sub->bucket;
-        put_obj.key = rgw_obj_key(oid_prefix + pse.generate_message_id());
+        put_obj.key = rgw_obj_key(oid_prefix + pse.id());
 
         pse.format(&put_obj.data);
        
@@ -934,7 +999,7 @@ public:
       ldout(sync_env->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;
 
       for (titer = topics->begin(); titer != topics->end(); ++titer) {
-        ldout(sync_env->cct, 10) << ": notification for " << event->bucket << "/" << event->key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
+        ldout(sync_env->cct, 10) << ": notification for " << event->source << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
 
         for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
           ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl;
@@ -990,22 +1055,19 @@ public:
       ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
                                << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
                                << " attrs=" << attrs << dendl;
-      make_event_ref(&event);
-      event->bucket = bucket_info.bucket;
-      event->key = key;
-      event->mtime = mtime;
-      event->event = OBJECT_CREATE;
-      event->timestamp = real_clock::now();
       {
+        std::vector<std::pair<std::string, std::string> > attrs;
         for (auto& attr : attrs) {
           string k = attr.first;
           if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
             k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
           }
-          string v = attr.second.to_str();
-          auto p = std::make_pair(k, v);
-          event->attrs.push_back(p);
+          attrs.push_back(std::make_pair(k, attr.second));
         }
+        make_event_ref(sync_env->cct,
+                       bucket_info.bucket, key,
+                       mtime, &attrs,
+                       OBJECT_CREATE, &event);
       }
 
       yield call(new RGWPSHandleObjEvent(sync_env, env, bucket_info.owner, event, topics));
@@ -1114,12 +1176,12 @@ public:
         ldout(sync_env->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
         return set_cr_done();
       }
-      make_event_ref(&event);
-      event->event = event_type;
-      event->bucket = bucket;
-      event->key = key;
-      event->mtime = mtime;
-      event->timestamp = real_clock::now();
+      {
+        make_event_ref(sync_env->cct,
+                       bucket, key,
+                       mtime, nullptr,
+                       event_type, &event);
+      }
 
       yield call(new RGWPSHandleObjEvent(sync_env, env, owner, event, topics));
       if (retcode < 0) {