]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: pubsub: message storage functionality
authorYehuda Sadeh <yehuda@redhat.com>
Sat, 30 Jun 2018 03:37:35 +0000 (20:37 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync_module_pubsub.cc

index 725d098e667aeb20c2eea41b0cf5d025f635bd8f..a6e9aaf103811843c62fb5d73be682ea915ade36 100644 (file)
@@ -228,6 +228,43 @@ struct PSEnv {
 
 using PSEnvRef = std::shared_ptr<PSEnv>;
 
+class PSEvent {
+  EventRef event;
+
+public:
+  PSEvent(EventRef& _event) : event(_event) {}
+
+  string generate_message_id() {
+    char buf[64];
+    utime_t ts(event->timestamp);
+
+    string etag;
+    RGWMD5Etag hash;
+    hash.update(event->bucket.bucket_id);
+    hash.update(event->key.name);
+    hash.update(event->key.instance);
+    hash.finish(&etag);
+
+    assert(etag.size() > 8);
+
+    etag = etag.substr(0, 8);
+    snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), etag.c_str());
+
+    return buf;
+  }
+
+  void format(bufferlist *bl) {
+    stringstream ss;
+    JSONFormatter f;
+
+    encode_json("event", *event, &f);
+    f.flush(ss);
+
+    bl->append(ss.str());
+  }
+        
+};
+
 class PSSubscription : public RefCountedObject {
   RGWDataSyncEnv *sync_env;
   PSEnvRef env;
@@ -235,6 +272,7 @@ class PSSubscription : public RefCountedObject {
   shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result;
   RGWBucketInfo *bucket_info{nullptr};
   RGWDataAccessRef data_access;
+  RGWDataAccess::BucketRef bucket;
 
 public:
 
@@ -270,7 +308,14 @@ public:
               << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
           }
           if (retcode == 0) {
-            sub->bucket_info = &sub->get_bucket_info_result->bucket_info;
+            auto& result = sub->get_bucket_info_result;
+            sub->bucket_info = &result->bucket_info;
+
+            int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket);
+            if (ret < 0) {
+              ldout(sync_env->cct, 0) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
+              return set_cr_error(ret);
+            }
             return set_cr_done();
           }
 
@@ -299,6 +344,40 @@ ldout(sync_env->cct, 20) << "pubsub: bucket create: after user info: " << json_s
     }
   };
 
+  class StoreEventCR : public RGWCoroutine {
+    RGWDataSyncEnv *sync_env;
+    PSSubscriptionRef sub;
+    PSEvent pse;
+    PSConfigRef& conf;
+    PSSubConfigRef& sub_conf;
+    rgw_object_simple_put_params put_obj;
+    int i;
+  public:
+    StoreEventCR(RGWDataSyncEnv *_sync_env,
+                 PSSubscriptionRef& _sub,
+                 EventRef& _event) : RGWCoroutine(_sync_env->cct),
+                                     sync_env(_sync_env),
+                                     sub(_sub),
+                                     pse(_event),
+                                     conf(sub->env->conf),
+                                     sub_conf(sub->sub_conf) {
+    }
+
+    int operate() override {
+      reenter(this) {
+
+        put_obj.bucket = sub->bucket;
+        put_obj.key = rgw_obj_key(pse.generate_message_id());
+
+        pse.format(&put_obj.data);
+        
+        yield call(new RGWObjectSimplePutCR(sync_env->async_rados,
+                                            sync_env->store,
+                                            put_obj));
+      }
+      return 0;
+    }
+  };
 
 public:
   PSSubscription(RGWDataSyncEnv *_sync_env,
@@ -312,7 +391,9 @@ public:
     return new InitCR(sync_env, this);
   }
 
-  RGWCoroutine *store_event_cr(EventRef event);
+  RGWCoroutine *store_event_cr(EventRef& event) {
+    return new StoreEventCR(sync_env, this, event);
+  }
 
   friend class InitCR;
 };
@@ -320,12 +401,6 @@ public:
 using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
 
 
-RGWCoroutine *PSSubscription::store_event_cr(EventRef event)
-{
-  /* FIXME */
-}
-
-
 class PSManager : public RefCountedObject
 {
   RGWDataSyncEnv *sync_env;