From 66ee1038d2f66ee43d9eb5d4fdbf948f1c708e65 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 23 Jul 2018 16:43:06 -0700 Subject: [PATCH] rgw: pubsub: events hold a generic formattable Instead of encoding specific object store events, make it hold a generic formattable struct, so that pubsub can be used in the future for registering other event types. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_common.h | 15 ++-- src/rgw/rgw_pubsub.cc | 39 +------- src/rgw/rgw_pubsub.h | 36 ++------ src/rgw/rgw_sync_module_pubsub.cc | 142 +++++++++++++++++++++--------- 4 files changed, 122 insertions(+), 110 deletions(-) diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index c5e0a30b596..619ba124551 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -1795,15 +1795,20 @@ struct rgw_obj_key { } void dump(Formatter *f) const; void decode_json(JSONObj *obj); + + string to_str() const { + if (instance.empty()) { + return name; + } + char buf[name.size() + instance.size() + 16]; + snprintf(buf, sizeof(buf), "%s[%s]", name.c_str(), instance.c_str()); + return buf; + } }; WRITE_CLASS_ENCODER(rgw_obj_key) inline ostream& operator<<(ostream& out, const rgw_obj_key &o) { - if (o.instance.empty()) { - return out << o.name; - } else { - return out << o.name << "[" << o.instance << "]"; - } + return out << o.to_str(); } inline ostream& operator<<(ostream& out, const rgw_obj_index_key &o) { diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index d53a0547bd5..f8e968d4953 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -5,51 +5,14 @@ #define dout_subsys ceph_subsys_rgw -void encode_json(const char *name, const RGWPubSubEventType& val, Formatter *f) -{ - switch (val) { - case EVENT_UNKNOWN: - encode_json(name, "EVENT_UNKNOWN", f); - break; - case OBJECT_CREATE: - encode_json(name, "OBJECT_CREATE", f); - break; - case OBJECT_DELETE: - encode_json(name, "OBJECT_DELETE", f); - break; - case DELETE_MARKER_CREATE: - encode_json(name, "DELETE_MARKER_CREATE", f); - break; - }; -} - void rgw_pubsub_event::dump(Formatter *f) const { encode_json("id", id, f); - { - Formatter::ObjectSection s(*f, "bucket"); - encode_json("name", bucket.name, f); - encode_json("id", bucket.bucket_id, f); - } - { - Formatter::ObjectSection s(*f, "object"); - encode_json("name", key.name, f); - encode_json("version-id", key.instance, f); - } - - utime_t mt(mtime); - encode_json("mtime", mt, f); encode_json("event", event, f); utime_t ut(timestamp); encode_json("timestamp", ut, f); - - { - Formatter::ObjectSection s(*f, "attrs"); - for (auto& attr : attrs) { - encode_json(attr.first.c_str(), attr.second.c_str(), f); - } - } + encode_json("info", info, f); } void rgw_pubsub_topic::dump(Formatter *f) const diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 40f3abb6f98..2871f99903e 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -5,48 +5,30 @@ #include "rgw_tools.h" -enum RGWPubSubEventType { - EVENT_UNKNOWN = 0, - OBJECT_CREATE = 1, - OBJECT_DELETE = 2, - DELETE_MARKER_CREATE = 3, -}; - struct rgw_pubsub_event { string id; - rgw_bucket bucket; - rgw_obj_key key; - ceph::real_time mtime; - - RGWPubSubEventType event; + string event; + string source; ceph::real_time timestamp; - - std::vector > attrs; + JSONFormattable info; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(id, bl); - encode(bucket, bl); - encode(key, bl); - encode(mtime, bl); - uint32_t e = (uint32_t)event; - encode(e, bl); + encode(event, bl); + encode(source, bl); encode(timestamp, bl); - encode(attrs, bl); + encode(info, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); decode(id, bl); - decode(bucket, bl); - decode(key, bl); - decode(mtime, bl); - uint32_t e; - decode(e, bl); - event = (RGWPubSubEventType)e; + decode(event, bl); + decode(source, bl); decode(timestamp, bl); - decode(attrs, bl); + decode(info, bl); DECODE_FINISH(bl); } diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 61045a1d3af..98124e33b82 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -298,11 +298,94 @@ struct PSConfig { } }; +enum RGWPubSubEventType { + EVENT_UNKNOWN = 0, + OBJECT_CREATE = 1, + OBJECT_DELETE = 2, + DELETE_MARKER_CREATE = 3, +}; + +static const char *event_name(const RGWPubSubEventType& val) +{ + switch (val) { + case OBJECT_CREATE: + return "OBJECT_CREATE"; + case OBJECT_DELETE: + return "OBJECT_DELETE"; + case DELETE_MARKER_CREATE: + return "DELETE_MARKER_CREATE"; + default: + return "EVENT_UNKNOWN"; + }; +} + using PSConfigRef = std::shared_ptr; using EventRef = std::shared_ptr; -static void make_event_ref(EventRef *event) { +struct objstore_event { + string id; + const rgw_bucket& bucket; + const rgw_obj_key& key; + const ceph::real_time& mtime; + const std::vector > *attrs; + + objstore_event(const rgw_bucket& _bucket, + const rgw_obj_key& _key, + const ceph::real_time& _mtime, + const std::vector > *_attrs) : bucket(_bucket), + key(_key), + mtime(_mtime), + attrs(_attrs) {} + + string get_hash() { + string etag; + RGWMD5Etag hash; + hash.update(bucket.bucket_id); + hash.update(key.name); + hash.update(key.instance); + hash.finish(&etag); + + assert(etag.size() > 8); + + return etag.substr(0, 8); + } + + void dump(Formatter *f) const { + encode_json("bucket", bucket, f); + encode_json("key", key, f); + utime_t mt(mtime); + encode_json("mtime", mt, f); + Formatter::ObjectSection s(*f, "attrs"); + if (attrs) { + for (auto& attr : *attrs) { + encode_json(attr.first.c_str(), attr.second.c_str(), f); + } + } + } +}; + +static void make_event_ref(CephContext *cct, const rgw_bucket& bucket, + const rgw_obj_key& key, + const ceph::real_time& mtime, + const std::vector > *attrs, + RGWPubSubEventType event_type, + EventRef *event) { *event = std::make_shared(); + + EventRef& e = *event; + e->event = event_name(event_type); + e->source = bucket.name + "/" + key.name; + e->timestamp = real_clock::now(); + + objstore_event oevent(bucket, key, mtime, attrs); + + string hash = oevent.get_hash(); + utime_t ts(e->timestamp); + char buf[64]; + snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str()); + e->id = buf; + + encode_json("info", oevent, &e->info); } class PSManager; @@ -331,27 +414,6 @@ class PSEvent { public: PSEvent(EventRef& _event) : event(_event) {} - string generate_message_id() { - char buf[64]; - utime_t ts(event->timestamp); - - string etag; - RGWMD5Etag hash; - hash.update(event->bucket.bucket_id); - hash.update(event->key.name); - hash.update(event->key.instance); - hash.finish(&etag); - - assert(etag.size() > 8); - - etag = etag.substr(0, 8); - snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), etag.c_str()); - - event->id = buf; - - return buf; - } - void format(bufferlist *bl) { bl->append(json_str("event", *event)); } @@ -359,7 +421,10 @@ public: void encode_event(bufferlist& bl) const { encode(*event, bl); } - + + const string& id() { + return event->id; + } }; template @@ -561,7 +626,7 @@ class PSSubscription { reenter(this) { put_obj.bucket = sub->bucket; - put_obj.key = rgw_obj_key(oid_prefix + pse.generate_message_id()); + put_obj.key = rgw_obj_key(oid_prefix + pse.id()); pse.format(&put_obj.data); @@ -934,7 +999,7 @@ public: ldout(sync_env->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl; for (titer = topics->begin(); titer != topics->end(); ++titer) { - ldout(sync_env->cct, 10) << ": notification for " << event->bucket << "/" << event->key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl; + ldout(sync_env->cct, 10) << ": notification for " << event->source << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl; for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) { ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl; @@ -990,22 +1055,19 @@ public: ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime << " attrs=" << attrs << dendl; - make_event_ref(&event); - event->bucket = bucket_info.bucket; - event->key = key; - event->mtime = mtime; - event->event = OBJECT_CREATE; - event->timestamp = real_clock::now(); { + std::vector > attrs; for (auto& attr : attrs) { string k = attr.first; if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) { k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1); } - string v = attr.second.to_str(); - auto p = std::make_pair(k, v); - event->attrs.push_back(p); + attrs.push_back(std::make_pair(k, attr.second)); } + make_event_ref(sync_env->cct, + bucket_info.bucket, key, + mtime, &attrs, + OBJECT_CREATE, &event); } yield call(new RGWPSHandleObjEvent(sync_env, env, bucket_info.owner, event, topics)); @@ -1114,12 +1176,12 @@ public: ldout(sync_env->cct, 20) << "no topics found for " << bucket << "/" << key << dendl; return set_cr_done(); } - make_event_ref(&event); - event->event = event_type; - event->bucket = bucket; - event->key = key; - event->mtime = mtime; - event->timestamp = real_clock::now(); + { + make_event_ref(sync_env->cct, + bucket, key, + mtime, nullptr, + event_type, &event); + } yield call(new RGWPSHandleObjEvent(sync_env, env, owner, event, topics)); if (retcode < 0) { -- 2.39.5