]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
libcephfs_proxy: implement asynchronous callbacks
authorXavi Hernandez <xhernandez@gmail.com>
Mon, 17 Feb 2025 09:50:37 +0000 (09:50 +0000)
committerXavi Hernandez <xhernandez@gmail.com>
Tue, 18 Feb 2025 14:20:22 +0000 (15:20 +0100)
To handle asynchronous callbacks from the daemon to a client, each
client will negotiate the enablement of the ASYNC_CBK feature. This will
create a new thread on the client side that will listen to callbacks
sent by the daemon.

Signed-off-by: Xavi Hernandez <xhernandez@gmail.com>
src/libcephfs_proxy/CMakeLists.txt
src/libcephfs_proxy/proxy.h
src/libcephfs_proxy/proxy_async.c [new file with mode: 0644]
src/libcephfs_proxy/proxy_async.h [new file with mode: 0644]
src/libcephfs_proxy/proxy_requests.h

index e19841241e7e0495f5595b767993baf17aa382d0..83de8bdf9fcb0da0bdf5e6126f10b1988c3d5b19 100644 (file)
@@ -1,4 +1,4 @@
-set(proxy_common_srcs proxy_link.c proxy_log.c)
+set(proxy_common_srcs proxy_link.c proxy_async.c proxy_log.c)
 set(libcephfsd_srcs libcephfsd.c proxy_manager.c proxy_mount.c proxy_helpers.c ${proxy_common_srcs})
 set(libcephfs_proxy_srcs libcephfs_proxy.c ${proxy_common_srcs})
 
index 89033d4d548e60b9b8623d54f59632a48ed534c7..162eec81b183d7d975399678dec1c8dfbbd46ea5 100644 (file)
@@ -57,6 +57,9 @@ typedef struct _proxy_link proxy_link_t;
 struct _proxy_link_negotiate;
 typedef struct _proxy_link_negotiate proxy_link_negotiate_t;
 
+struct _proxy_async;
+typedef struct _proxy_async proxy_async_t;
+
 typedef int32_t (*proxy_output_write_t)(proxy_output_t *);
 typedef int32_t (*proxy_output_full_t)(proxy_output_t *);
 
diff --git a/src/libcephfs_proxy/proxy_async.c b/src/libcephfs_proxy/proxy_async.c
new file mode 100644 (file)
index 0000000..fad6370
--- /dev/null
@@ -0,0 +1,124 @@
+
+#include <unistd.h>
+
+#include "include/cephfs/libcephfs.h"
+
+#include "proxy_async.h"
+#include "proxy_requests.h"
+#include "proxy_log.h"
+
+typedef int32_t (*proxy_cbk_handler_t)(proxy_async_t *, proxy_cbk_t *, void *,
+                                      uint32_t);
+
+static proxy_cbk_handler_t libcephfsd_cbk_handlers[LIBCEPHFSD_CBK_TOTAL_OPS] = {
+};
+
+static void *
+proxy_async_worker(void *arg)
+{
+       proxy_cbk_t cbk;
+       proxy_async_t *async;
+       struct iovec iov[2];
+       int32_t err;
+
+       async = arg;
+
+       while (true) {
+               iov[0].iov_base = &cbk;
+               iov[0].iov_len = sizeof(cbk);
+               iov[1].iov_base = NULL;
+               iov[1].iov_len = 0;
+
+               err = proxy_link_req_recv(async->fd, iov, 2);
+               if (err <= 0) {
+                       break;
+               }
+
+               if (cbk.header.op >= LIBCEPHFSD_CBK_TOTAL_OPS) {
+                       proxy_log(LOG_ERR, ENOSYS,
+                                 "Unknown asynchronous callback");
+               } else if (libcephfsd_cbk_handlers[cbk.header.op] == NULL) {
+                       proxy_log(LOG_ERR, EOPNOTSUPP,
+                                 "Unsupported asynchronous callback");
+               } else {
+                       err = libcephfsd_cbk_handlers[cbk.header.op](
+                               async, &cbk, iov[1].iov_base, iov[1].iov_len);
+               }
+
+               if (iov[1].iov_base != NULL) {
+                       proxy_free(iov[1].iov_base);
+               }
+
+               if (err < 0) {
+                       break;
+               }
+       }
+
+       if (err < 0) {
+               proxy_log(LOG_ERR, -err, "Asynchronous worker found an error");
+
+               /* Force disconnection from main connection. */
+               proxy_link_close(async->link);
+       }
+
+       proxy_log(LOG_INFO, 0, "Asynchronous worker stopped");
+
+       return NULL;
+}
+
+int32_t proxy_async_client(proxy_async_t *async, proxy_link_t *link, int32_t sd)
+{
+       int32_t err, data, fd, size;
+
+       size = sizeof(fd);
+       err = proxy_link_ctrl_recv(sd, &data, sizeof(data), SCM_RIGHTS, &fd,
+                                  &size);
+       if (err < 0) {
+               return err;
+       }
+        if (size < sizeof(fd)) {
+                return proxy_log(LOG_ERR, ENODATA,
+                                 "Failed to receive the asynchronous socket");
+        }
+
+       random_init(&async->random);
+       async->fd = fd;
+        async->link = link;
+
+       err = pthread_create(&async->thread, NULL, proxy_async_worker, async);
+       if (err < 0) {
+               close(fd);
+               return proxy_log(LOG_ERR, err,
+                                "Failed to create asynchronous worker");
+       }
+
+       return 0;
+}
+
+int32_t proxy_async_server(proxy_async_t *async, int32_t sd)
+{
+       int32_t fds[2];
+       int32_t err, data;
+
+       if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
+               return proxy_log(LOG_ERR, errno,
+                                "Failed to create a socket pair");
+       }
+
+       data = 0;
+       err = proxy_link_ctrl_send(sd, &data, sizeof(data), SCM_RIGHTS, fds,
+                                  sizeof(int32_t));
+       close(fds[0]);
+       if (err < 0) {
+               goto failed;
+       }
+
+       async->fd = fds[1];
+
+       return 0;
+
+failed:
+       close(fds[1]);
+
+       return err;
+}
diff --git a/src/libcephfs_proxy/proxy_async.h b/src/libcephfs_proxy/proxy_async.h
new file mode 100644 (file)
index 0000000..b38c9e3
--- /dev/null
@@ -0,0 +1,23 @@
+
+#ifndef __LIBCEPHFS_PROXY_ASYNC_H__
+#define __LIBCEPHFS_PROXY_ASYNC_H__
+
+#include "proxy.h"
+#include "proxy_link.h"
+#include "proxy_helpers.h"
+
+#include <pthread.h>
+
+struct _proxy_async {
+       pthread_t thread;
+       proxy_random_t random;
+       proxy_link_t *link;
+       int32_t fd;
+};
+
+int32_t proxy_async_client(proxy_async_t *async, proxy_link_t *link,
+                          int32_t sd);
+
+int32_t proxy_async_server(proxy_async_t *async, int32_t sd);
+
+#endif /* __LIBCEPHFS_PROXY_ASYNC_H__ */
index f1f72f5b15a51fb5b67e075fed33fce0d572145e..a9bf3524aa77240a5c5f8c801bf7064296399398 100644 (file)
@@ -106,9 +106,19 @@ enum {
        LIBCEPHFSD_OP_LL_RELEASEDIR,
        LIBCEPHFSD_OP_MOUNT_PERMS,
 
+       /* Add more operations above this comment. */
+
        LIBCEPHFSD_OP_TOTAL_OPS
 };
 
+enum {
+       LIBCEPHFSD_CBK_NULL = 0,
+
+       /* Add more callbacks above this comment. */
+
+       LIBCEPHFSD_CBK_TOTAL_OPS
+};
+
 #define CEPH_TYPE_REQ(_name, _fields...)                           \
        struct _proxy_##_name##_req;                               \
        typedef struct _proxy_##_name##_req proxy_##_name##_req_t; \
@@ -338,4 +348,8 @@ typedef union _proxy_req {
        proxy_ceph_mount_perms_req_t mount_perms;
 } proxy_req_t;
 
+typedef union _proxy_cbk {
+       proxy_link_req_t header;
+} proxy_cbk_t;
+
 #endif