#define dout_subsys ceph_subsys_rgw
-void encode_json(const char *name, const RGWPubSubEventType& val, Formatter *f)
-{
- switch (val) {
- case EVENT_UNKNOWN:
- encode_json(name, "EVENT_UNKNOWN", f);
- break;
- case OBJECT_CREATE:
- encode_json(name, "OBJECT_CREATE", f);
- break;
- case OBJECT_DELETE:
- encode_json(name, "OBJECT_DELETE", f);
- break;
- case DELETE_MARKER_CREATE:
- encode_json(name, "DELETE_MARKER_CREATE", f);
- break;
- };
-}
-
void rgw_pubsub_event::dump(Formatter *f) const
{
encode_json("id", id, f);
- {
- Formatter::ObjectSection s(*f, "bucket");
- encode_json("name", bucket.name, f);
- encode_json("id", bucket.bucket_id, f);
- }
- {
- Formatter::ObjectSection s(*f, "object");
- encode_json("name", key.name, f);
- encode_json("version-id", key.instance, f);
- }
-
- utime_t mt(mtime);
- encode_json("mtime", mt, f);
encode_json("event", event, f);
utime_t ut(timestamp);
encode_json("timestamp", ut, f);
-
- {
- Formatter::ObjectSection s(*f, "attrs");
- for (auto& attr : attrs) {
- encode_json(attr.first.c_str(), attr.second.c_str(), f);
- }
- }
+ encode_json("info", info, f);
}
void rgw_pubsub_topic::dump(Formatter *f) const
}
};
+enum RGWPubSubEventType {
+ EVENT_UNKNOWN = 0,
+ OBJECT_CREATE = 1,
+ OBJECT_DELETE = 2,
+ DELETE_MARKER_CREATE = 3,
+};
+
+static const char *event_name(const RGWPubSubEventType& val)
+{
+ switch (val) {
+ case OBJECT_CREATE:
+ return "OBJECT_CREATE";
+ case OBJECT_DELETE:
+ return "OBJECT_DELETE";
+ case DELETE_MARKER_CREATE:
+ return "DELETE_MARKER_CREATE";
+ default:
+ return "EVENT_UNKNOWN";
+ };
+}
+
using PSConfigRef = std::shared_ptr<PSConfig>;
using EventRef = std::shared_ptr<rgw_pubsub_event>;
-static void make_event_ref(EventRef *event) {
+struct objstore_event {
+ string id;
+ const rgw_bucket& bucket;
+ const rgw_obj_key& key;
+ const ceph::real_time& mtime;
+ const std::vector<std::pair<std::string, std::string> > *attrs;
+
+ objstore_event(const rgw_bucket& _bucket,
+ const rgw_obj_key& _key,
+ const ceph::real_time& _mtime,
+ const std::vector<std::pair<std::string, std::string> > *_attrs) : bucket(_bucket),
+ key(_key),
+ mtime(_mtime),
+ attrs(_attrs) {}
+
+ string get_hash() {
+ string etag;
+ RGWMD5Etag hash;
+ hash.update(bucket.bucket_id);
+ hash.update(key.name);
+ hash.update(key.instance);
+ hash.finish(&etag);
+
+ assert(etag.size() > 8);
+
+ return etag.substr(0, 8);
+ }
+
+ void dump(Formatter *f) const {
+ encode_json("bucket", bucket, f);
+ encode_json("key", key, f);
+ utime_t mt(mtime);
+ encode_json("mtime", mt, f);
+ Formatter::ObjectSection s(*f, "attrs");
+ if (attrs) {
+ for (auto& attr : *attrs) {
+ encode_json(attr.first.c_str(), attr.second.c_str(), f);
+ }
+ }
+ }
+};
+
+static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
+ const rgw_obj_key& key,
+ const ceph::real_time& mtime,
+ const std::vector<std::pair<std::string, std::string> > *attrs,
+ RGWPubSubEventType event_type,
+ EventRef *event) {
*event = std::make_shared<rgw_pubsub_event>();
+
+ EventRef& e = *event;
+ e->event = event_name(event_type);
+ e->source = bucket.name + "/" + key.name;
+ e->timestamp = real_clock::now();
+
+ objstore_event oevent(bucket, key, mtime, attrs);
+
+ string hash = oevent.get_hash();
+ utime_t ts(e->timestamp);
+ char buf[64];
+ snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str());
+ e->id = buf;
+
+ encode_json("info", oevent, &e->info);
}
class PSManager;
public:
PSEvent(EventRef& _event) : event(_event) {}
- string generate_message_id() {
- char buf[64];
- utime_t ts(event->timestamp);
-
- string etag;
- RGWMD5Etag hash;
- hash.update(event->bucket.bucket_id);
- hash.update(event->key.name);
- hash.update(event->key.instance);
- hash.finish(&etag);
-
- assert(etag.size() > 8);
-
- etag = etag.substr(0, 8);
- snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), etag.c_str());
-
- event->id = buf;
-
- return buf;
- }
-
void format(bufferlist *bl) {
bl->append(json_str("event", *event));
}
void encode_event(bufferlist& bl) const {
encode(*event, bl);
}
-
+
+ const string& id() {
+ return event->id;
+ }
};
template <class T>
reenter(this) {
put_obj.bucket = sub->bucket;
- put_obj.key = rgw_obj_key(oid_prefix + pse.generate_message_id());
+ put_obj.key = rgw_obj_key(oid_prefix + pse.id());
pse.format(&put_obj.data);
ldout(sync_env->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;
for (titer = topics->begin(); titer != topics->end(); ++titer) {
- ldout(sync_env->cct, 10) << ": notification for " << event->bucket << "/" << event->key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
+ ldout(sync_env->cct, 10) << ": notification for " << event->source << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl;
ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
<< " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
<< " attrs=" << attrs << dendl;
- make_event_ref(&event);
- event->bucket = bucket_info.bucket;
- event->key = key;
- event->mtime = mtime;
- event->event = OBJECT_CREATE;
- event->timestamp = real_clock::now();
{
+ std::vector<std::pair<std::string, std::string> > attrs;
for (auto& attr : attrs) {
string k = attr.first;
if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
}
- string v = attr.second.to_str();
- auto p = std::make_pair(k, v);
- event->attrs.push_back(p);
+ attrs.push_back(std::make_pair(k, attr.second));
}
+ make_event_ref(sync_env->cct,
+ bucket_info.bucket, key,
+ mtime, &attrs,
+ OBJECT_CREATE, &event);
}
yield call(new RGWPSHandleObjEvent(sync_env, env, bucket_info.owner, event, topics));
ldout(sync_env->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
return set_cr_done();
}
- make_event_ref(&event);
- event->event = event_type;
- event->bucket = bucket;
- event->key = key;
- event->mtime = mtime;
- event->timestamp = real_clock::now();
+ {
+ make_event_ref(sync_env->cct,
+ bucket, key,
+ mtime, nullptr,
+ event_type, &event);
+ }
yield call(new RGWPSHandleObjEvent(sync_env, env, owner, event, topics));
if (retcode < 0) {