From: Xavi Hernandez Date: Mon, 17 Feb 2025 09:50:37 +0000 (+0000) Subject: libcephfs_proxy: implement asynchronous callbacks X-Git-Tag: v20.3.0~362^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a49069748f0446aa4abbdd26c4d9de19eba1e2f1;p=ceph.git libcephfs_proxy: implement asynchronous callbacks To handle asynchronous callbacks from the daemon to a client, each client will negotiate the enablement of the ASYNC_CBK feature. This will create a new thread on the client side that will listen to callbacks sent by the daemon. Signed-off-by: Xavi Hernandez --- diff --git a/src/libcephfs_proxy/CMakeLists.txt b/src/libcephfs_proxy/CMakeLists.txt index e19841241e7e..83de8bdf9fcb 100644 --- a/src/libcephfs_proxy/CMakeLists.txt +++ b/src/libcephfs_proxy/CMakeLists.txt @@ -1,4 +1,4 @@ -set(proxy_common_srcs proxy_link.c proxy_log.c) +set(proxy_common_srcs proxy_link.c proxy_async.c proxy_log.c) set(libcephfsd_srcs libcephfsd.c proxy_manager.c proxy_mount.c proxy_helpers.c ${proxy_common_srcs}) set(libcephfs_proxy_srcs libcephfs_proxy.c ${proxy_common_srcs}) diff --git a/src/libcephfs_proxy/proxy.h b/src/libcephfs_proxy/proxy.h index 89033d4d548e..162eec81b183 100644 --- a/src/libcephfs_proxy/proxy.h +++ b/src/libcephfs_proxy/proxy.h @@ -57,6 +57,9 @@ typedef struct _proxy_link proxy_link_t; struct _proxy_link_negotiate; typedef struct _proxy_link_negotiate proxy_link_negotiate_t; +struct _proxy_async; +typedef struct _proxy_async proxy_async_t; + typedef int32_t (*proxy_output_write_t)(proxy_output_t *); typedef int32_t (*proxy_output_full_t)(proxy_output_t *); diff --git a/src/libcephfs_proxy/proxy_async.c b/src/libcephfs_proxy/proxy_async.c new file mode 100644 index 000000000000..fad6370dd094 --- /dev/null +++ b/src/libcephfs_proxy/proxy_async.c @@ -0,0 +1,124 @@ + +#include + +#include "include/cephfs/libcephfs.h" + +#include "proxy_async.h" +#include "proxy_requests.h" +#include "proxy_log.h" + +typedef int32_t (*proxy_cbk_handler_t)(proxy_async_t *, proxy_cbk_t *, void *, + uint32_t); + +static proxy_cbk_handler_t libcephfsd_cbk_handlers[LIBCEPHFSD_CBK_TOTAL_OPS] = { +}; + +static void * +proxy_async_worker(void *arg) +{ + proxy_cbk_t cbk; + proxy_async_t *async; + struct iovec iov[2]; + int32_t err; + + async = arg; + + while (true) { + iov[0].iov_base = &cbk; + iov[0].iov_len = sizeof(cbk); + iov[1].iov_base = NULL; + iov[1].iov_len = 0; + + err = proxy_link_req_recv(async->fd, iov, 2); + if (err <= 0) { + break; + } + + if (cbk.header.op >= LIBCEPHFSD_CBK_TOTAL_OPS) { + proxy_log(LOG_ERR, ENOSYS, + "Unknown asynchronous callback"); + } else if (libcephfsd_cbk_handlers[cbk.header.op] == NULL) { + proxy_log(LOG_ERR, EOPNOTSUPP, + "Unsupported asynchronous callback"); + } else { + err = libcephfsd_cbk_handlers[cbk.header.op]( + async, &cbk, iov[1].iov_base, iov[1].iov_len); + } + + if (iov[1].iov_base != NULL) { + proxy_free(iov[1].iov_base); + } + + if (err < 0) { + break; + } + } + + if (err < 0) { + proxy_log(LOG_ERR, -err, "Asynchronous worker found an error"); + + /* Force disconnection from main connection. */ + proxy_link_close(async->link); + } + + proxy_log(LOG_INFO, 0, "Asynchronous worker stopped"); + + return NULL; +} + +int32_t proxy_async_client(proxy_async_t *async, proxy_link_t *link, int32_t sd) +{ + int32_t err, data, fd, size; + + size = sizeof(fd); + err = proxy_link_ctrl_recv(sd, &data, sizeof(data), SCM_RIGHTS, &fd, + &size); + if (err < 0) { + return err; + } + if (size < sizeof(fd)) { + return proxy_log(LOG_ERR, ENODATA, + "Failed to receive the asynchronous socket"); + } + + random_init(&async->random); + async->fd = fd; + async->link = link; + + err = pthread_create(&async->thread, NULL, proxy_async_worker, async); + if (err < 0) { + close(fd); + return proxy_log(LOG_ERR, err, + "Failed to create asynchronous worker"); + } + + return 0; +} + +int32_t proxy_async_server(proxy_async_t *async, int32_t sd) +{ + int32_t fds[2]; + int32_t err, data; + + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) { + return proxy_log(LOG_ERR, errno, + "Failed to create a socket pair"); + } + + data = 0; + err = proxy_link_ctrl_send(sd, &data, sizeof(data), SCM_RIGHTS, fds, + sizeof(int32_t)); + close(fds[0]); + if (err < 0) { + goto failed; + } + + async->fd = fds[1]; + + return 0; + +failed: + close(fds[1]); + + return err; +} diff --git a/src/libcephfs_proxy/proxy_async.h b/src/libcephfs_proxy/proxy_async.h new file mode 100644 index 000000000000..b38c9e3f1e25 --- /dev/null +++ b/src/libcephfs_proxy/proxy_async.h @@ -0,0 +1,23 @@ + +#ifndef __LIBCEPHFS_PROXY_ASYNC_H__ +#define __LIBCEPHFS_PROXY_ASYNC_H__ + +#include "proxy.h" +#include "proxy_link.h" +#include "proxy_helpers.h" + +#include + +struct _proxy_async { + pthread_t thread; + proxy_random_t random; + proxy_link_t *link; + int32_t fd; +}; + +int32_t proxy_async_client(proxy_async_t *async, proxy_link_t *link, + int32_t sd); + +int32_t proxy_async_server(proxy_async_t *async, int32_t sd); + +#endif /* __LIBCEPHFS_PROXY_ASYNC_H__ */ diff --git a/src/libcephfs_proxy/proxy_requests.h b/src/libcephfs_proxy/proxy_requests.h index f1f72f5b15a5..a9bf3524aa77 100644 --- a/src/libcephfs_proxy/proxy_requests.h +++ b/src/libcephfs_proxy/proxy_requests.h @@ -106,9 +106,19 @@ enum { LIBCEPHFSD_OP_LL_RELEASEDIR, LIBCEPHFSD_OP_MOUNT_PERMS, + /* Add more operations above this comment. */ + LIBCEPHFSD_OP_TOTAL_OPS }; +enum { + LIBCEPHFSD_CBK_NULL = 0, + + /* Add more callbacks above this comment. */ + + LIBCEPHFSD_CBK_TOTAL_OPS +}; + #define CEPH_TYPE_REQ(_name, _fields...) \ struct _proxy_##_name##_req; \ typedef struct _proxy_##_name##_req proxy_##_name##_req_t; \ @@ -338,4 +348,8 @@ typedef union _proxy_req { proxy_ceph_mount_perms_req_t mount_perms; } proxy_req_t; +typedef union _proxy_cbk { + proxy_link_req_t header; +} proxy_cbk_t; + #endif