From a0f0a25e48c106a93d2a79f44c3c6a8bc86c88b6 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 3 Jul 2018 16:01:16 -0700 Subject: [PATCH] rgw: pubsub: store event in bucket index Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_cr_tools.cc | 4 ++++ src/rgw/rgw_cr_tools.h | 1 + src/rgw/rgw_sync_module_pubsub.cc | 6 ++++++ src/rgw/rgw_tools.cc | 9 ++++++++- src/rgw/rgw_tools.h | 5 +++++ 5 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/rgw/rgw_cr_tools.cc b/src/rgw/rgw_cr_tools.cc index 69eed3689b2..a07e709a96b 100644 --- a/src/rgw/rgw_cr_tools.cc +++ b/src/rgw/rgw_cr_tools.cc @@ -234,6 +234,10 @@ int RGWObjectSimplePutCR::Request::_send_request() return -ret; } + if (params.user_data) { + obj->set_user_data(*params.user_data); + } + ret = obj->put(params.data, params.attrs); if (ret < 0) { cerr << "ERROR: put object returned error: " << cpp_strerror(-ret) << std::endl; diff --git a/src/rgw/rgw_cr_tools.h b/src/rgw/rgw_cr_tools.h index 635bbb9302f..748fd94fd2c 100644 --- a/src/rgw/rgw_cr_tools.h +++ b/src/rgw/rgw_cr_tools.h @@ -56,6 +56,7 @@ struct rgw_object_simple_put_params { rgw_obj_key key; bufferlist data; map attrs; + std::optional user_data; }; using RGWObjectSimplePutCR = RGWSimpleWriteOnlyAsyncCR; diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 184a61be8f1..af087f4bdee 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -559,6 +559,12 @@ class PSSubscription { put_obj.key = rgw_obj_key(oid_prefix + pse.generate_message_id()); pse.format(&put_obj.data); + + { + bufferlist bl64; + put_obj.data.encode_base64(bl64); + put_obj.user_data = bl64.to_str(); + } yield call(new RGWObjectSimplePutCR(sync_env->async_rados, sync_env->store, diff --git a/src/rgw/rgw_tools.cc b/src/rgw/rgw_tools.cc index 2883da6b163..0888bab7672 100644 --- a/src/rgw/rgw_tools.cc +++ b/src/rgw/rgw_tools.cc @@ -410,9 +410,16 @@ int RGWDataAccess::Object::put(bufferlist& data, } attrs[RGW_ATTR_ACL] = *aclbl; + string *puser_data = nullptr; + if (user_data) { + puser_data = &(*user_data); + } + return processor.complete(obj_size, etag, &mtime, mtime, - attrs, delete_at); + attrs, delete_at, + nullptr, nullptr, + puser_data); } void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy) diff --git a/src/rgw/rgw_tools.h b/src/rgw/rgw_tools.h index 0741eb6fa69..59afa8357bf 100644 --- a/src/rgw/rgw_tools.h +++ b/src/rgw/rgw_tools.h @@ -139,6 +139,7 @@ public: string etag; std::optional olh_epoch; ceph::real_time delete_at; + std::optional user_data; std::optional aclbl; @@ -166,6 +167,10 @@ public: delete_at = _delete_at; } + void set_user_data(const string& _user_data) { + user_data = _user_data; + } + void set_policy(const RGWAccessControlPolicy& policy); friend class Bucket; -- 2.39.5