From: Yehuda Sadeh Date: Sat, 30 Jun 2018 03:37:35 +0000 (-0700) Subject: rgw: pubsub: message storage functionality X-Git-Tag: v14.1.0~616^2~61 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=bdd1a690a66014033e51729a1bd921fe3206632b;p=ceph.git rgw: pubsub: message storage functionality Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 725d098e667ae..a6e9aaf103811 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -228,6 +228,43 @@ struct PSEnv { using PSEnvRef = std::shared_ptr; +class PSEvent { + EventRef event; + +public: + PSEvent(EventRef& _event) : event(_event) {} + + string generate_message_id() { + char buf[64]; + utime_t ts(event->timestamp); + + string etag; + RGWMD5Etag hash; + hash.update(event->bucket.bucket_id); + hash.update(event->key.name); + hash.update(event->key.instance); + hash.finish(&etag); + + assert(etag.size() > 8); + + etag = etag.substr(0, 8); + snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), etag.c_str()); + + return buf; + } + + void format(bufferlist *bl) { + stringstream ss; + JSONFormatter f; + + encode_json("event", *event, &f); + f.flush(ss); + + bl->append(ss.str()); + } + +}; + class PSSubscription : public RefCountedObject { RGWDataSyncEnv *sync_env; PSEnvRef env; @@ -235,6 +272,7 @@ class PSSubscription : public RefCountedObject { shared_ptr get_bucket_info_result; RGWBucketInfo *bucket_info{nullptr}; RGWDataAccessRef data_access; + RGWDataAccess::BucketRef bucket; public: @@ -270,7 +308,14 @@ public: << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl; } if (retcode == 0) { - sub->bucket_info = &sub->get_bucket_info_result->bucket_info; + auto& result = sub->get_bucket_info_result; + sub->bucket_info = &result->bucket_info; + + int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket); + if (ret < 0) { + ldout(sync_env->cct, 0) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl; + return set_cr_error(ret); + } return set_cr_done(); } @@ -299,6 +344,40 @@ ldout(sync_env->cct, 20) << "pubsub: bucket create: after user info: " << json_s } }; + class StoreEventCR : public RGWCoroutine { + RGWDataSyncEnv *sync_env; + PSSubscriptionRef sub; + PSEvent pse; + PSConfigRef& conf; + PSSubConfigRef& sub_conf; + rgw_object_simple_put_params put_obj; + int i; + public: + StoreEventCR(RGWDataSyncEnv *_sync_env, + PSSubscriptionRef& _sub, + EventRef& _event) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + sub(_sub), + pse(_event), + conf(sub->env->conf), + sub_conf(sub->sub_conf) { + } + + int operate() override { + reenter(this) { + + put_obj.bucket = sub->bucket; + put_obj.key = rgw_obj_key(pse.generate_message_id()); + + pse.format(&put_obj.data); + + yield call(new RGWObjectSimplePutCR(sync_env->async_rados, + sync_env->store, + put_obj)); + } + return 0; + } + }; public: PSSubscription(RGWDataSyncEnv *_sync_env, @@ -312,7 +391,9 @@ public: return new InitCR(sync_env, this); } - RGWCoroutine *store_event_cr(EventRef event); + RGWCoroutine *store_event_cr(EventRef& event) { + return new StoreEventCR(sync_env, this, event); + } friend class InitCR; }; @@ -320,12 +401,6 @@ public: using PSSubscriptionRef = std::shared_ptr; -RGWCoroutine *PSSubscription::store_event_cr(EventRef event) -{ - /* FIXME */ -} - - class PSManager : public RefCountedObject { RGWDataSyncEnv *sync_env;