From ea414bd2ea31e92e1f30bbc2d9959ef7a84e5420 Mon Sep 17 00:00:00 2001
From: sajibreadd
Date: Wed, 14 May 2025 09:12:00 +0200
Subject: [PATCH] mirror-notification

---
 CMakeLists.txt | 1 +
 src/client/Client.cc | 85 +-
 src/client/Client.h | 14 +
 src/common/ceph_fs.cc | 5 -
 src/common/options/cephfs-mirror.yaml.in | 61 +
 src/common/options/mds.yaml.in | 62 +
 src/include/ceph_fs.h | 29 +-
 src/include/cephfs/libcephfs.h | 55 +
 src/include/config-h.in.cmake | 3 +
 src/libcephfs.cc | 40 +
 src/mds/CMakeLists.txt | 28 +-
 src/mds/Locker.cc | 228 ++-
 src/mds/MDSDaemon.cc | 41 +
 src/mds/MDSKafka.cc | 764 +++++++
 src/mds/MDSKafka.h | 146 ++
 src/mds/MDSNotificationManager.cc | 293 +++
 src/mds/MDSNotificationManager.h | 65 +
 src/mds/MDSNotificationMessage.cc | 78 +
 src/mds/MDSNotificationMessage.h | 23 +
 src/mds/MDSRank.cc | 62 +
 src/mds/MDSRank.h | 5 +
 src/mds/MDSUDPEndpoint.cc | 257 +++
 src/mds/MDSUDPEndpoint.h | 67 +
 src/mds/Server.cc | 203 +-
 src/mds/Server.h | 16 +
 src/messages/MClientRequest.h | 108 +
 src/messages/MNotificationInfoKafkaTopic.h | 80 +
 src/messages/MNotificationInfoUDPEndpoint.h | 60 +
 src/msg/Message.cc | 15 +
 src/msg/Message.h | 5 +
 src/test/libcephfs/CMakeLists.txt | 1 +
 src/test/libcephfs/client_cache.cc | 314 +++
 src/tools/ceph-dencoder/common_types.h | 2 +
 src/tools/cephfs_mirror/FSMirror.cc | 2 +-
 src/tools/cephfs_mirror/PeerReplayer.cc | 2038 +++++++++++++------
 src/tools/cephfs_mirror/PeerReplayer.h | 767 ++++++-
 36 files changed, 5240 insertions(+), 783 deletions(-)
 create mode 100644 src/mds/MDSKafka.cc
 create mode 100644 src/mds/MDSKafka.h
 create mode 100644 src/mds/MDSNotificationManager.cc
 create mode 100644 src/mds/MDSNotificationManager.h
 create mode 100644 src/mds/MDSNotificationMessage.cc
 create mode 100644 src/mds/MDSNotificationMessage.h
 create mode 100644 src/mds/MDSUDPEndpoint.cc
 create mode 100644 src/mds/MDSUDPEndpoint.h
 create mode 100644 src/messages/MNotificationInfoKafkaTopic.h
 create mode 100644 src/messages/MNotificationInfoUDPEndpoint.h
 create mode 100644 src/test/libcephfs/client_cache.cc

diff --git a/CMakeLists.txt b/CMakeLists.txt
index fbca3df54de84..da2c62e2758d6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -544,6 +544,7 @@ endif (WITH_RADOSGW)
 #option for CephFS
 option(WITH_CEPHFS "CephFS is enabled" ON)
+option(WITH_CEPHFS_NOTIFICATION "CephFS notification is enabled" OFF)

 if(NOT WIN32)
 # Please specify 3.[0-7] if you want to build with a certain version of python3.

diff --git a/src/client/Client.cc b/src/client/Client.cc
index 67a1f65b4f8ab..210895d409c27 100644
--- a/src/client/Client.cc
+++ b/src/client/Client.cc
@@ -1695,6 +1695,10 @@ mds_rank_t Client::choose_target_mds(MetaRequest *req, Inode** phash_diri)
   Inode *in = NULL;
   Dentry *de = NULL;

+  bool is_notification_op = (req->get_op() == CEPH_MDS_OP_ADD_KAFKA_TOPIC ||
+                             req->get_op() == CEPH_MDS_OP_REMOVE_KAFKA_TOPIC ||
+                             req->get_op() == CEPH_MDS_OP_ADD_UDP_ENDPOINT ||
+                             req->get_op() == CEPH_MDS_OP_REMOVE_UDP_ENDPOINT);
   if (req->resend_mds >= 0) {
     mds = req->resend_mds;
@@ -1705,6 +1709,10 @@
   if (cct->_conf->client_use_random_mds)
     goto random_mds;

+  if (is_notification_op) {
+    mds = 0;
+  }
+
   in = req->inode();
   de = req->dentry();
   if (in) {
@@ -10386,8 +10394,17 @@ int Client::_open(Inode *in, int flags, mode_t mode, Fh **fhp,
   // success?
   if (result >= 0) {
-    if (fhp)
+    if (fhp) {
       *fhp = _create_fh(in, flags, cmode, perms);
+      // The ceph_flags_sys2wire()/ceph_flags_to_mode() calls above transform the
+      // O_DIRECTORY flag into CEPH_FILE_MODE_PIN mode. Although this mode is used
+      // at the server side, we [ab]use it here to determine whether we should pin
+      // the inode to prevent undesired cache eviction.
+      if (cmode == CEPH_FILE_MODE_PIN) {
+        ldout(cct, 20) << " pinning ll_get() call for " << *in << dendl;
+        _ll_get(in);
+      }
+    }
   } else {
     in->put_open_ref(cmode);
   }
@@ -10444,6 +10461,10 @@ int Client::_close(int fd)
   Fh *fh = get_filehandle(fd);
   if (!fh)
     return -CEPHFS_EBADF;
+  if (fh->mode == CEPH_FILE_MODE_PIN) {
+    ldout(cct, 20) << " unpinning ll_put() call for " << *(fh->inode.get()) << dendl;
+    _ll_put(fh->inode.get(), 1);
+  }
   int err = _release_fh(fh);
   fd_map.erase(fd);
   put_fd(fd);
@@ -17445,3 +17466,65 @@ void StandaloneClient::shutdown()
   objecter->shutdown();
   monclient->shutdown();
 }
+
+#ifdef WITH_CEPHFS_NOTIFICATION
+// notifications
+int Client::add_kafka_topic(const char *topic_name, const char *endpoint_name,
+                            const char *broker, bool use_ssl, const char *user,
+                            const char *password, const char *ca_location,
+                            const char *mechanism, const UserPerm &perm) {
+  MetaRequest *req = new MetaRequest(CEPH_MDS_OP_ADD_KAFKA_TOPIC);
+
+  KafkaTopicPayload payload(topic_name, endpoint_name, broker, use_ssl,
+                            (user == nullptr ? "" : user),
+                            (password == nullptr ? "" : password),
+                            (ca_location == nullptr || strlen(ca_location) == 0)
+                                ? std::optional<std::string>(std::nullopt)
+                                : ca_location,
+                            (mechanism == nullptr || strlen(mechanism) == 0)
+                                ? std::optional<std::string>(std::nullopt)
+                                : mechanism);
+  bufferlist bl;
+  encode(payload, bl);
+  req->set_data(bl);
+  std::scoped_lock lock(client_lock);
+  int res = make_request(req, perm);
+  return res;
+}
+
+int Client::remove_kafka_topic(const char *topic_name,
+                               const char *endpoint_name,
+                               const UserPerm &perm) {
+  MetaRequest *req = new MetaRequest(CEPH_MDS_OP_REMOVE_KAFKA_TOPIC);
+  KafkaTopicPayload payload(topic_name, endpoint_name);
+  bufferlist bl;
+  encode(payload, bl);
+  req->set_data(bl);
+  std::scoped_lock lock(client_lock);
+  int res = make_request(req, perm);
+  return res;
+}
+
+int Client::add_udp_endpoint(const char* name, const char* ip,
+                             int port, const UserPerm &perm) {
+  MetaRequest *req = new MetaRequest(CEPH_MDS_OP_ADD_UDP_ENDPOINT);
+  UDPEndpointPayload payload(name, ip, port);
+  bufferlist bl;
+  encode(payload, bl);
+  req->set_data(bl);
+  std::scoped_lock lock(client_lock);
+  int res = make_request(req, perm);
+  return res;
+}
+
+int Client::remove_udp_endpoint(const char* name, const UserPerm &perm) {
+  MetaRequest *req = new MetaRequest(CEPH_MDS_OP_REMOVE_UDP_ENDPOINT);
+  UDPEndpointPayload payload(name);
+  bufferlist bl;
+  encode(payload, bl);
+  req->set_data(bl);
+  std::scoped_lock lock(client_lock);
+  int res = make_request(req, perm);
+  return res;
+}
+#endif

diff --git a/src/client/Client.h b/src/client/Client.h
index 765b8d3cafe23..18105f3610d7b 100644
--- a/src/client/Client.h
+++ b/src/client/Client.h
@@ -219,6 +219,7 @@ struct dir_result_t {
     ordered_count = 0;
     cache_index = 0;
     buffer.clear();
+    fd = -1;
   }

   InodeRef inode;
@@ -400,6 +401,19 @@ public:
   int unlinkat(int dirfd, const char *relpath, int flags, const UserPerm& perm);
   int rename(const char *from, const char *to, const UserPerm& perm, std::string alternate_name="");
+
+#ifdef WITH_CEPHFS_NOTIFICATION
+  // notifications
+  int add_kafka_topic(const char *topic_name, const char *endpoint_name,
+                      const char *broker, bool use_ssl, const char *user,
+                      const char *password, const char *ca_location,
+                      const char *mechanism, const UserPerm &perm);
+  int remove_kafka_topic(const char *topic_name, const char *endpoint_name,
+                         const UserPerm &perm);
+  int add_udp_endpoint(const char *name, const char *ip, int port,
+                       const UserPerm &perm);
+  int remove_udp_endpoint(const char *name, const UserPerm &perm);
+#endif
+
   // dirs
   int mkdir(const char *path, mode_t mode, const UserPerm& perm, std::string alternate_name="");
   int mkdirat(int dirfd, const char *relpath, mode_t mode, const UserPerm& perm,

diff --git a/src/common/ceph_fs.cc b/src/common/ceph_fs.cc
index 380b401df301a..588887c951aa9 100644
--- a/src/common/ceph_fs.cc
+++ b/src/common/ceph_fs.cc
@@ -77,13 +77,8 @@ int ceph_flags_sys2wire(int flags)
   ceph_sys2wire(O_EXCL);
   ceph_sys2wire(O_TRUNC);

-  #ifndef _WIN32
   ceph_sys2wire(O_DIRECTORY);
   ceph_sys2wire(O_NOFOLLOW);
-  // In some cases, FILE_FLAG_BACKUP_SEMANTICS may be used instead
-  // of O_DIRECTORY. We may need some workarounds in order to handle
-  // the fact that those flags are not available on Windows.
-  #endif

 #undef ceph_sys2wire

diff --git a/src/common/options/cephfs-mirror.yaml.in b/src/common/options/cephfs-mirror.yaml.in
index f826161872b88..284dc0be908d6 100644
--- a/src/common/options/cephfs-mirror.yaml.in
+++ b/src/common/options/cephfs-mirror.yaml.in
@@ -103,3 +103,64 @@ options:
   - cephfs-mirror
   min: 0
   max: 11
+- name: cephfs_mirror_max_concurrent_file_transfer
+  type: uint
+  level: advanced
+  desc: number of threads running for file transfers per peer
+  long_desc: number of threads running for file transfers per peer.
+    Each thread is responsible for transferring a single file.
+  default: 3
+  services:
+  - cephfs-mirror
+  min: 1
+  with_legacy: true
+- name: cephfs_mirror_threads_per_sync
+  type: uint
+  level: advanced
+  desc: number of threads used to concurrently scan directories
+  long_desc: number of threads associated with each sync.
+    Each thread is responsible for several libcephfs operations.
+  default: 3
+  services:
+  - cephfs-mirror
+  min: 1
+  with_legacy: true
+- name: cephfs_mirror_remote_diff_base_upon_start
+  type: bool
+  level: advanced
+  desc: take the remote cluster as the diff base for syncing upon restart
+  long_desc: take the remote cluster as the diff base for syncing upon restart
+  default: true
+  services:
+  - cephfs-mirror
+- name: cephfs_mirror_sync_latest_snapshot
+  type: bool
+  level: advanced
+  desc: take the latest snapshot for syncing
+  long_desc: take the latest snapshot for syncing
+  default: true
+  services:
+  - cephfs-mirror
+  with_legacy: true
+- name: cephfs_mirror_thread_pool_queue_size
+  type: uint
+  level: advanced
+  desc: maximum number of tasks queued in the sync thread pool
+  long_desc: maximum number of tasks that may be queued in the thread pool
+    used for each sync.
+  default: 5000
+  services:
+  - cephfs-mirror
+  min: 3
+  with_legacy: true
+- name: cephfs_mirror_max_element_in_cache_per_thread
+  type: uint
+  level: advanced
+  desc: maximum number of elements cached per sync thread
+  long_desc: maximum number of elements each sync thread may keep in its
+    cache while scanning directories.
+  default: 1000000
+  services:
+  - cephfs-mirror
+  min: 0
+  with_legacy: true

diff --git a/src/common/options/mds.yaml.in b/src/common/options/mds.yaml.in
index 0dc0b63faf2a0..3792736b6cbc3 100644
--- a/src/common/options/mds.yaml.in
+++ b/src/common/options/mds.yaml.in
@@ -1679,3 +1679,65 @@ options:
   - mds
   flags:
   - runtime
+- name: mds_allow_notification_secrets_in_cleartext
+  type: bool
+  level: advanced
+  desc: Allows sending secrets (e.g. passwords) over non-encrypted messages.
+  long_desc: When a notification endpoint requires secrets (e.g. passwords),
+    topic creation is normally rejected over non-encrypted connections. This
+    parameter can be set to "true" to bypass that check. Use this only if the
+    MDS is on a trusted private network and the message broker cannot be
+    configured without password authentication. Otherwise, this will leak the
+    credentials of your message broker and compromise its security.
+  default: false
+  services:
+  - mds
+- name: mds_kafka_sleep_timeout
+  type: uint
+  level: advanced
+  desc: Time in milliseconds to sleep while polling for kafka replies
+  long_desc: This is used to avoid busy waiting for kafka replies, as well as
+    for the cases where the broker is down and we try to reconnect. Three
+    times this value is used as the sleep interval when no messages were sent
+    or received across all kafka connections.
+  default: 100
+  services:
+  - mds
+  with_legacy: true
+- name: mds_kafka_message_timeout
+  type: uint
+  level: advanced
+  desc: The maximum time in milliseconds to deliver a message (including retries)
+  long_desc: A delivery error occurs when the message timeout is exceeded.
+    The value must be greater than zero; if set to zero, a value of 1
+    millisecond is used.
+  default: 5000
+  services:
+  - mds
+- name: mds_kafka_unacked_message_threshold
+  type: uint
+  level: advanced
+  desc: Maximum number of unacked messages stored in memory.
+  long_desc: If the count of unacked messages grows beyond this number,
+    MDSKafka starts polling to fetch delivery reports.
+  default: 5000
+  services:
+  - mds
+  with_legacy: true
+- name: mds_notification_dir_mask
+  type: uint
+  level: advanced
+  desc: This indicates which events are subscribed to for operations on directories
+  long_desc: This mask is the OR of the notification event flags; it filters
+    which events are generated for directories.
+  default: 1048575
+  services:
+  - mds
+- name: mds_notification_file_mask
+  type: uint
+  level: advanced
+  desc: This indicates which events are subscribed to for operations on files
+  long_desc: This mask is the OR of the notification event flags; it filters
+    which events are generated for files.
+  default: 1048575
+  services:
+  - mds
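[Editor's note: for reference, the default mask 1048575 is 0xFFFFF, i.e. the OR of all
twenty CEPH_MDS_NOTIFY_* event flags introduced in the ceph_fs.h hunk below, so both
masks subscribe to every event by default. A narrower subscription is simply a smaller
OR; a minimal sketch, where the flag names come from this patch and the chosen subset
is only an illustration:

    // Subscribe directories to create/delete/modify events only:
    // 0x10 | 0x20 | 0x80 = 0xB0 = 176, i.e. mds_notification_dir_mask = 176
    #include "include/ceph_fs.h"

    const unsigned dir_mask = CEPH_MDS_NOTIFY_CREATE |  // 0x0010
                              CEPH_MDS_NOTIFY_DELETE |  // 0x0020
                              CEPH_MDS_NOTIFY_MODIFY;   // 0x0080
]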
diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h
index 57eb18b0d3e5a..9459741f2452b 100644
--- a/src/include/ceph_fs.h
+++ b/src/include/ceph_fs.h
@@ -427,7 +427,11 @@
   CEPH_MDS_OP_RMSNAP     = 0x01401,
   CEPH_MDS_OP_LSSNAP     = 0x00402,
   CEPH_MDS_OP_RENAMESNAP = 0x01403,
-  CEPH_MDS_OP_READDIR_SNAPDIFF = 0x01404,
+  CEPH_MDS_OP_READDIR_SNAPDIFF = 0x01404,
+  CEPH_MDS_OP_ADD_KAFKA_TOPIC = 0x01405,
+  CEPH_MDS_OP_REMOVE_KAFKA_TOPIC = 0x01406,
+  CEPH_MDS_OP_ADD_UDP_ENDPOINT = 0x01407,
+  CEPH_MDS_OP_REMOVE_UDP_ENDPOINT = 0x01408,

   // internal op
   CEPH_MDS_OP_FRAGMENTDIR= 0x01500,
@@ -442,6 +446,29 @@
   CEPH_MDS_OP_LOCK_PATH = 0x0150a,
 };

+enum {
+  CEPH_MDS_NOTIFY_ACCESS = 0x0000000000000001,
+  CEPH_MDS_NOTIFY_SET_ATTRIB = 0x0000000000000002,
+  CEPH_MDS_NOTIFY_CLOSE_WRITE = 0x0000000000000004,
+  CEPH_MDS_NOTIFY_CLOSE_NOWRITE = 0x0000000000000008,
+  CEPH_MDS_NOTIFY_CREATE = 0x0000000000000010,
+  CEPH_MDS_NOTIFY_DELETE = 0x0000000000000020,
+  CEPH_MDS_NOTIFY_DELETE_SELF = 0x0000000000000040,
+  CEPH_MDS_NOTIFY_MODIFY = 0x0000000000000080,
+  CEPH_MDS_NOTIFY_MOVE_SELF = 0x0000000000000100,
+  CEPH_MDS_NOTIFY_MOVED_FROM = 0x0000000000000200,
+  CEPH_MDS_NOTIFY_MOVED_TO = 0x0000000000000400,
+  CEPH_MDS_NOTIFY_OPEN = 0x0000000000000800,
+  CEPH_MDS_NOTIFY_CLOSE = 0x0000000000001000,
+  CEPH_MDS_NOTIFY_MOVE = 0x0000000000002000,
+  CEPH_MDS_NOTIFY_ONESHOT = 0x0000000000004000,
+  CEPH_MDS_NOTIFY_IGNORED = 0x0000000000008000,
+  CEPH_MDS_NOTIFY_SET_LAYOUT = 0x0000000000010000,
+  CEPH_MDS_NOTIFY_SET_XATTRIB = 0x0000000000020000,
+  CEPH_MDS_NOTIFY_REM_XATTRIB = 0x0000000000040000,
+  CEPH_MDS_NOTIFY_ONLYDIR = 0x0000000000080000
+};
+
 #define IS_CEPH_MDS_OP_NEWINODE(op) (op == CEPH_MDS_OP_CREATE || \
                                      op == CEPH_MDS_OP_MKNOD || \
                                      op == CEPH_MDS_OP_MKDIR || \

diff --git a/src/include/cephfs/libcephfs.h b/src/include/cephfs/libcephfs.h
index ba0b76e072b57..c50a3a0d744cd 100644
--- a/src/include/cephfs/libcephfs.h
+++ b/src/include/cephfs/libcephfs.h
@@ -731,6 +731,61 @@ int64_t ceph_telldir(struct ceph_mount_info *cmount, struct ceph_dir_result *dir
  */
 void ceph_seekdir(struct ceph_mount_info *cmount, struct ceph_dir_result *dirp, int64_t offset);

+#ifdef WITH_CEPHFS_NOTIFICATION
+/**
+ * Create/replace a kafka topic for notification.
+ *
+ * @param cmount the ceph mount handle.
+ * @param topic_name kafka topic name to create.
+ * @param endpoint_name kafka endpoint name where the topic will be added.
+ * @param broker address of the kafka endpoint.
+ * @param use_ssl whether SSL authentication is required.
+ * @param user username for authentication.
+ * @param password password for authentication.
+ * @param ca_location location of the CA certificate(s) used to verify the broker.
+ * @param mechanism used to specify which Kafka SASL mechanism to use when
+ *   connecting to a Kafka broker that requires authentication.
+ * @returns 0 on success or a negative return code on error.
+ */
+int ceph_add_kafka_topic(struct ceph_mount_info *cmount, const char *topic_name,
+                         const char *endpoint_name, const char *broker,
+                         bool use_ssl, const char *user, const char *password,
+                         const char *ca_location, const char *mechanism);
+
+/**
+ * Remove a kafka topic.
+ *
+ * @param cmount the ceph mount handle.
+ * @param topic_name kafka topic name to remove.
+ * @param endpoint_name kafka endpoint name from which the topic will be removed.
+ */
+int ceph_remove_kafka_topic(struct ceph_mount_info *cmount,
+                            const char *topic_name, const char *endpoint_name);
+
+/**
+ * Create/replace a UDP endpoint.
+ *
+ * @param cmount the ceph mount handle.
+ * @param name UDP endpoint name to create.
+ * @param ip IP address of the UDP endpoint.
+ * @param port port to connect to for the UDP endpoint.
+ * @returns 0 on success or a negative return code on error.
+ */
+int ceph_add_udp_endpoint(struct ceph_mount_info *cmount, const char *name,
+                          const char *ip, int port);
+
+/**
+ * Remove a UDP endpoint.
+ *
+ * @param cmount the ceph mount handle.
+ * @param name UDP endpoint name to remove.
+ * @returns 0 on success or a negative return code on error.
+ */
+int ceph_remove_udp_endpoint(struct ceph_mount_info *cmount, const char *name);
+
+#endif
+
 /**
  * Create a directory.
  *
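[Editor's note: for orientation, a minimal sketch of how a client could drive this API
once the tree is built with WITH_CEPHFS_NOTIFICATION=ON. The broker address, endpoint
names, and target IP/port are placeholders, and error handling is collapsed to a
single pattern:

    #include <stdbool.h>
    #include <stdio.h>
    #include <cephfs/libcephfs.h>

    int main(void) {
      struct ceph_mount_info *cmount;
      if (ceph_create(&cmount, NULL) < 0)
        return 1;
      ceph_conf_read_file(cmount, NULL);   /* default ceph.conf search path */
      if (ceph_mount(cmount, "/") < 0)
        return 1;
      /* plain-text kafka broker: no SSL, no SASL credentials */
      int r = ceph_add_kafka_topic(cmount, "cephfs-events", "kafka-ep",
                                   "localhost:9092", false, NULL, NULL,
                                   NULL, NULL);
      printf("add_kafka_topic: %d\n", r);
      /* a UDP receiver is assumed to listen on 192.168.0.10:9000 */
      r = ceph_add_udp_endpoint(cmount, "udp-ep", "192.168.0.10", 9000);
      printf("add_udp_endpoint: %d\n", r);
      ceph_unmount(cmount);
      ceph_release(cmount);
      return 0;
    }

Note that Client::choose_target_mds() above pins all four notification ops to rank 0,
so these requests are always handled by that MDS.]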
diff --git a/src/include/config-h.in.cmake b/src/include/config-h.in.cmake
index c983eff396370..e27646fc9bed2 100644
--- a/src/include/config-h.in.cmake
+++ b/src/include/config-h.in.cmake
@@ -133,6 +133,9 @@
 /* define if cephfs enabled */
 #cmakedefine WITH_CEPHFS

+/* define if cephfs notification enabled */
+#cmakedefine WITH_CEPHFS_NOTIFICATION
+
 /* define if systemed is enabled */
 #cmakedefine WITH_SYSTEMD

diff --git a/src/libcephfs.cc b/src/libcephfs.cc
index 7eea6665f6145..44fb65d8ae121 100644
--- a/src/libcephfs.cc
+++ b/src/libcephfs.cc
@@ -882,6 +882,46 @@ extern "C" int ceph_rename(struct ceph_mount_info *cmount, const char *from,
   return cmount->get_client()->rename(from, to, cmount->default_perms);
 }

+#ifdef WITH_CEPHFS_NOTIFICATION
+// notifications
+extern "C" int
+ceph_add_kafka_topic(struct ceph_mount_info *cmount, const char *topic_name,
+                     const char *endpoint_name, const char *broker,
+                     bool use_ssl, const char *user, const char *password,
+                     const char *ca_location, const char *mechanism) {
+  if (!cmount->is_mounted())
+    return -CEPHFS_ENOTCONN;
+  return cmount->get_client()->add_kafka_topic(
+      topic_name, endpoint_name, broker, use_ssl, user, password, ca_location,
+      mechanism, cmount->default_perms);
+}
+
+extern "C" int ceph_remove_kafka_topic(struct ceph_mount_info *cmount,
+                                       const char *topic_name,
+                                       const char *endpoint_name) {
+  if (!cmount->is_mounted())
+    return -CEPHFS_ENOTCONN;
+  return cmount->get_client()->remove_kafka_topic(topic_name, endpoint_name,
+                                                  cmount->default_perms);
+}
+
+extern "C" int ceph_add_udp_endpoint(struct ceph_mount_info *cmount,
+                                     const char *name, const char *ip,
+                                     int port) {
+  if (!cmount->is_mounted())
+    return -CEPHFS_ENOTCONN;
+  return cmount->get_client()->add_udp_endpoint(name, ip, port,
+                                                cmount->default_perms);
+}
+
+extern "C" int ceph_remove_udp_endpoint(struct ceph_mount_info *cmount,
+                                        const char *name) {
+  if (!cmount->is_mounted())
+    return -CEPHFS_ENOTCONN;
+  return cmount->get_client()->remove_udp_endpoint(name, cmount->default_perms);
+}
+#endif
+
 // dirs
 extern "C" int ceph_mkdir(struct ceph_mount_info *cmount, const char *path, mode_t mode)
 {

diff --git a/src/mds/CMakeLists.txt b/src/mds/CMakeLists.txt
index 0c6c31a3c51a0..d96ad064bd81e 100644
--- a/src/mds/CMakeLists.txt
+++ b/src/mds/CMakeLists.txt
@@ -1,3 +1,8 @@
+
+if (WITH_CEPHFS_NOTIFICATION)
+  find_package(RDKafka 0.9.2 REQUIRED)
+endif()
+
 set(mds_srcs
   BatchOp.cc
   Capability.cc
@@ -49,8 +54,27 @@ set(mds_srcs
   ${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
   ${CMAKE_SOURCE_DIR}/src/common/MemoryModel.cc
   ${CMAKE_SOURCE_DIR}/src/osdc/Journaler.cc
-  ${CMAKE_SOURCE_DIR}/src/mgr/MDSPerfMetricTypes.cc)
+  ${CMAKE_SOURCE_DIR}/src/mgr/MDSPerfMetricTypes.cc
+ 
MDSNotificationManager.cc) + +if (WITH_CEPHFS_NOTIFICATION) + list(APPEND mds_srcs MDSKafka.cc MDSUDPEndpoint.cc MDSNotificationMessage.cc) +endif() + +message(STATUS "WITH_CEPHFS_NOTIFICATION=${WITH_CEPHFS_NOTIFICATION}") +message(STATUS "Final mds_srcs: ${mds_srcs}") + add_library(mds STATIC ${mds_srcs}) target_link_libraries(mds PRIVATE - heap_profiler cpu_profiler osdc ${LUA_LIBRARIES}) + heap_profiler cpu_profiler osdc ${LUA_LIBRARIES} + ${Boost_LIBRARIES}) + +if (WITH_CEPHFS_NOTIFICATION) + target_link_libraries(mds PRIVATE RDKafka::RDKafka) +endif() + target_include_directories(mds PRIVATE "${LUA_INCLUDE_DIR}") + +if (WITH_CEPHFS_NOTIFICATION) + target_include_directories(mds PRIVATE ${Boost_INCLUDE_DIRS}) +endif() diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index b142848f164da..b493e42ffe6a8 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -3094,6 +3094,8 @@ bool Locker::check_inode_max_size(CInode *in, bool force_wrlock, if (new_mtime > pi.inode->rstat.rctime) pi.inode->rstat.rctime = new_mtime; } + mds->notification_manager->push_notification( + mds->get_nodeid(), in, CEPH_MDS_NOTIFY_MODIFY, false, in->is_dir()); } // use EOpen if the file is still open; otherwise, use EUpdate. @@ -3913,8 +3915,9 @@ void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t foll ack, client)); } -void Locker::_update_cap_fields(CInode *in, int dirty, const cref_t &m, CInode::mempool_inode *pi) -{ +void Locker::_update_cap_fields(CInode *in, int dirty, + const cref_t &m, + CInode::mempool_inode *pi) { if (dirty == 0) return; @@ -3923,8 +3926,8 @@ void Locker::_update_cap_fields(CInode *in, int dirty, const cref_t uint64_t features = m->get_connection()->get_features(); if (m->get_ctime() > pi->ctime) { - dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime() - << " for " << *in << dendl; + dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime() << " for " + << *in << dendl; pi->ctime = m->get_ctime(); if (m->get_ctime() > pi->rstat.rctime) pi->rstat.rctime = m->get_ctime(); @@ -3932,51 +3935,52 @@ void Locker::_update_cap_fields(CInode *in, int dirty, const cref_t if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) && m->get_change_attr() > pi->change_attr) { - dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr() - << " for " << *in << dendl; + dout(7) << " change_attr " << pi->change_attr << " -> " + << m->get_change_attr() << " for " << *in << dendl; pi->change_attr = m->get_change_attr(); } // file - if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) { + if (dirty & (CEPH_CAP_FILE_EXCL | CEPH_CAP_FILE_WR)) { utime_t atime = m->get_atime(); utime_t mtime = m->get_mtime(); uint64_t size = m->get_size(); version_t inline_version = m->inline_version; - + if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) || - ((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) { - dout(7) << " mtime " << pi->mtime << " -> " << mtime - << " for " << *in << dendl; + ((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) { + dout(7) << " mtime " << pi->mtime << " -> " << mtime << " for " << *in + << dendl; pi->mtime = mtime; if (mtime > pi->rstat.rctime) - pi->rstat.rctime = mtime; - } - if (in->is_file() && // ONLY if regular file - size > pi->size) { - dout(7) << " size " << pi->size << " -> " << size - << " for " << *in << dendl; + pi->rstat.rctime = mtime; + mds->notification_manager->push_notification( + mds->get_nodeid(), in, CEPH_MDS_NOTIFY_MODIFY, false, in->is_dir()); + } + if (in->is_file() && // ONLY if regular file + size > pi->size) { + 
dout(7) << " size " << pi->size << " -> " << size << " for " << *in + << dendl; pi->size = size; pi->rstat.rbytes = size; } - if (in->is_file() && - (dirty & CEPH_CAP_FILE_WR) && + if (in->is_file() && (dirty & CEPH_CAP_FILE_WR) && inline_version > pi->inline_data.version) { pi->inline_data.version = inline_version; if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0) - pi->inline_data.set_data(m->inline_data); + pi->inline_data.set_data(m->inline_data); else - pi->inline_data.free_data(); + pi->inline_data.free_data(); } if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) { - dout(7) << " atime " << pi->atime << " -> " << atime - << " for " << *in << dendl; + dout(7) << " atime " << pi->atime << " -> " << atime << " for " << *in + << dendl; pi->atime = atime; } if ((dirty & CEPH_CAP_FILE_EXCL) && - ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) { - dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq() - << " for " << *in << dendl; + ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) { + dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " + << m->get_time_warp_seq() << " for " << *in << dendl; pi->time_warp_seq = m->get_time_warp_seq(); } if (m->fscrypt_file.size()) @@ -3985,27 +3989,23 @@ void Locker::_update_cap_fields(CInode *in, int dirty, const cref_t // auth if (dirty & CEPH_CAP_AUTH_EXCL) { if (m->head.uid != pi->uid) { - dout(7) << " uid " << pi->uid - << " -> " << m->head.uid - << " for " << *in << dendl; + dout(7) << " uid " << pi->uid << " -> " << m->head.uid << " for " << *in + << dendl; pi->uid = m->head.uid; } if (m->head.gid != pi->gid) { - dout(7) << " gid " << pi->gid - << " -> " << m->head.gid - << " for " << *in << dendl; + dout(7) << " gid " << pi->gid << " -> " << m->head.gid << " for " << *in + << dendl; pi->gid = m->head.gid; } if (m->head.mode != pi->mode) { - dout(7) << " mode " << oct << pi->mode - << " -> " << m->head.mode << dec - << " for " << *in << dendl; + dout(7) << " mode " << oct << pi->mode << " -> " << m->head.mode << dec + << " for " << *in << dendl; pi->mode = m->head.mode; } if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) { - dout(7) << " btime " << oct << pi->btime - << " -> " << m->get_btime() << dec - << " for " << *in << dendl; + dout(7) << " btime " << oct << pi->btime << " -> " << m->get_btime() + << dec << " for " << *in << dendl; pi->btime = m->get_btime(); } if (m->fscrypt_auth.size()) @@ -4018,80 +4018,78 @@ void Locker::_update_cap_fields(CInode *in, int dirty, const cref_t * adjust max_size, if needed. * if we update, return true; otherwise, false (no updated needed). */ -bool Locker::_do_cap_update(CInode *in, Capability *cap, - int dirty, snapid_t follows, - const cref_t &m, const ref_t &ack, - bool *need_flush) -{ - dout(10) << "_do_cap_update dirty " << ccap_string(dirty) - << " issued " << ccap_string(cap ? cap->issued() : 0) - << " wanted " << ccap_string(cap ? cap->wanted() : 0) - << " on " << *in << dendl; +bool Locker::_do_cap_update(CInode *in, Capability *cap, int dirty, + snapid_t follows, const cref_t &m, + const ref_t &ack, bool *need_flush) { + dout(10) << "_do_cap_update dirty " << ccap_string(dirty) << " issued " + << ccap_string(cap ? cap->issued() : 0) << " wanted " + << ccap_string(cap ? 
cap->wanted() : 0) << " on " << *in << dendl; ceph_assert(in->is_auth()); client_t client = m->get_source().num(); - const auto& latest = in->get_projected_inode(); + const auto &latest = in->get_projected_inode(); // increase or zero max_size? uint64_t size = m->get_size(); bool change_max = false; uint64_t old_max = latest->get_client_range(client); uint64_t new_max = old_max; - + if (in->is_file()) { bool forced_change_max = false; dout(20) << "inode is file" << dendl; - if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR & in->get_caps_quiesce_mask())) { - dout(20) << "client has write caps; m->get_max_size=" - << m->get_max_size() << "; old_max=" << old_max << dendl; + if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR & + in->get_caps_quiesce_mask())) { + dout(20) << "client has write caps; m->get_max_size=" << m->get_max_size() + << "; old_max=" << old_max << dendl; if (m->get_max_size() > new_max) { - dout(10) << "client requests file_max " << m->get_max_size() - << " > max " << old_max << dendl; - change_max = true; - forced_change_max = true; - new_max = calc_new_max_size(latest, m->get_max_size()); + dout(10) << "client requests file_max " << m->get_max_size() + << " > max " << old_max << dendl; + change_max = true; + forced_change_max = true; + new_max = calc_new_max_size(latest, m->get_max_size()); } else { - new_max = calc_new_max_size(latest, size); + new_max = calc_new_max_size(latest, size); - if (new_max > old_max) - change_max = true; - else - new_max = old_max; + if (new_max > old_max) + change_max = true; + else + new_max = old_max; } } else { if (old_max) { - change_max = true; - new_max = 0; + change_max = true; + new_max = 0; } } - if (in->last == CEPH_NOSNAP && - change_max && - !in->filelock.can_wrlock(client) && - !in->filelock.can_force_wrlock(client)) { - dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl; + if (in->last == CEPH_NOSNAP && change_max && + !in->filelock.can_wrlock(client) && + !in->filelock.can_force_wrlock(client)) { + dout(10) << " i want to change file_max, but lock won't allow it (yet)" + << dendl; if (in->filelock.is_stable()) { - bool need_issue = false; - if (cap) - cap->inc_suppress(); - if (in->get_mds_caps_wanted().empty() && - (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) { - if (in->filelock.get_state() != LOCK_EXCL) - file_excl(&in->filelock, &need_issue); - } else - simple_lock(&in->filelock, &need_issue); - if (need_issue) - issue_caps(in); - if (cap) - cap->dec_suppress(); + bool need_issue = false; + if (cap) + cap->inc_suppress(); + if (in->get_mds_caps_wanted().empty() && + (in->get_loner() >= 0 || + (in->get_wanted_loner() >= 0 && in->try_set_loner()))) { + if (in->filelock.get_state() != LOCK_EXCL) + file_excl(&in->filelock, &need_issue); + } else + simple_lock(&in->filelock, &need_issue); + if (need_issue) + issue_caps(in); + if (cap) + cap->dec_suppress(); } if (!in->filelock.can_wrlock(client) && - !in->filelock.can_force_wrlock(client)) { - C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in, - forced_change_max ? new_max : 0, - 0, utime_t()); + !in->filelock.can_force_wrlock(client)) { + C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize( + this, in, forced_change_max ? 
new_max : 0, 0, utime_t()); - in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms); - change_max = false; + in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms); + change_max = false; } } } @@ -4100,20 +4098,22 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap, int32_t num_locks; auto bli = m->flockbl.cbegin(); decode(num_locks, bli); - for ( int i=0; i < num_locks; ++i) { + for (int i = 0; i < num_locks; ++i) { ceph_filelock decoded_lock; decode(decoded_lock, bli); - in->get_fcntl_lock_state()->held_locks. - insert(pair(decoded_lock.start, decoded_lock)); - ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)]; + in->get_fcntl_lock_state()->held_locks.insert( + pair(decoded_lock.start, decoded_lock)); + ++in->get_fcntl_lock_state() + ->client_held_lock_counts[(client_t)(decoded_lock.client)]; } decode(num_locks, bli); - for ( int i=0; i < num_locks; ++i) { + for (int i = 0; i < num_locks; ++i) { ceph_filelock decoded_lock; decode(decoded_lock, bli); - in->get_flock_lock_state()->held_locks. - insert(pair(decoded_lock.start, decoded_lock)); - ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)]; + in->get_flock_lock_state()->held_locks.insert( + pair(decoded_lock.start, decoded_lock)); + ++in->get_flock_lock_state() + ->client_held_lock_counts[(client_t)(decoded_lock.client)]; } } @@ -4123,8 +4123,7 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap, // do the update. EUpdate *le = new EUpdate(mds->mdlog, "cap update"); - bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) && - m->xattrbl.length() && + bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) && m->xattrbl.length() && m->head.xattr_version > in->get_projected_inode()->xattr_version; MutationRef mut(new MutationImpl()); @@ -4136,8 +4135,8 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap, _update_cap_fields(in, dirty, m, pi.inode.get()); if (change_max) { - dout(7) << " max_size " << old_max << " -> " << new_max - << " for " << *in << dendl; + dout(7) << " max_size " << old_max << " -> " << new_max << " for " << *in + << dendl; if (new_max) { auto &cr = pi.inode->client_ranges[client]; cr.range.first = 0; @@ -4145,18 +4144,18 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap, cr.follows = in->first - 1; in->mark_clientwriteable(); if (cap) - cap->mark_clientwriteable(); + cap->mark_clientwriteable(); } else { pi.inode->client_ranges.erase(client); if (pi.inode->client_ranges.empty()) - in->clear_clientwriteable(); + in->clear_clientwriteable(); if (cap) - cap->clear_clientwriteable(); + cap->clear_clientwriteable(); } } - - if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) - wrlock_force(&in->filelock, mut); // wrlock for duration of journal + + if (change_max || (dirty & (CEPH_CAP_FILE_EXCL | CEPH_CAP_FILE_WR))) + wrlock_force(&in->filelock, mut); // wrlock for duration of journal // auth if (dirty & CEPH_CAP_AUTH_EXCL) @@ -4164,27 +4163,30 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap, // xattrs update? 
if (xattr) { - dout(7) << " xattrs v" << pi.inode->xattr_version << " -> " << m->head.xattr_version << dendl; + dout(7) << " xattrs v" << pi.inode->xattr_version << " -> " + << m->head.xattr_version << dendl; decode_new_xattrs(pi.inode.get(), pi.xattrs.get(), m); wrlock_force(&in->xattrlock, mut); } - + mut->auth_pin(in); - mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows); + mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, + 0, follows); mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows); // "oldest flush tid" > 0 means client uses unique TID for each flush if (ack && ack->get_oldest_flush_tid() > 0) - le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()), - ack->get_oldest_flush_tid()); + le->metablob.add_client_flush( + metareqid_t(m->get_source(), ack->get_client_tid()), + ack->get_oldest_flush_tid()); unsigned update_flags = 0; if (change_max) update_flags |= UPDATE_SHAREMAX; if (cap) update_flags |= UPDATE_NEEDSISSUE; - mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, update_flags, - ack, client)); + mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish( + this, in, mut, update_flags, ack, client)); if (need_flush && !*need_flush && ((change_max && new_max) || // max INCREASE _need_flush_mdlog(in, dirty))) diff --git a/src/mds/MDSDaemon.cc b/src/mds/MDSDaemon.cc index b31d9c95220cc..084c05fa21427 100644 --- a/src/mds/MDSDaemon.cc +++ b/src/mds/MDSDaemon.cc @@ -532,6 +532,47 @@ void MDSDaemon::set_up_admin_socket() asok_hook, "run cpu profiling on daemon"); ceph_assert(r == 0); + + #ifdef WITH_CEPHFS_NOTIFICATION + r = admin_socket->register_command( + "add_topic " + "name=topic_name,type=CephString,req=true " + "name=endpoint_name,type=CephString,req=true " + "name=broker,type=CephString,req=false " + "name=use_ssl,type=CephBool,req=false " + "name=username,type=CephString,req=false " + "name=password,type=CephString,req=false " + "name=ca_location,type=CephString,req=false " + "name=mechanism,type=CephString,req=false", + asok_hook, + "add topic for notification" + ); + ceph_assert(r == 0); + r = admin_socket->register_command( + "remove_topic " + "name=topic_name,type=CephString,req=true " + "name=endpoint_name,type=CephString,req=true", + asok_hook, + "remove kafka topic" + ); + ceph_assert(r == 0); + r = admin_socket->register_command( + "add_udp_endpoint " + "name=entity,type=CephString,req=true " + "name=ip,type=CephString,req=true " + "name=port,type=CephInt,req=true", + asok_hook, + "add udp endpoint for notification" + ); + ceph_assert(r == 0); + r = admin_socket->register_command( + "remove_udp_endpoint " + "name=entity,type=CephString,req=true", + asok_hook, + "remove UDP endpoint" + ); + ceph_assert(r == 0); + #endif } void MDSDaemon::clean_up_admin_socket() diff --git a/src/mds/MDSKafka.cc b/src/mds/MDSKafka.cc new file mode 100644 index 0000000000000..cbee61bd92371 --- /dev/null +++ b/src/mds/MDSKafka.cc @@ -0,0 +1,764 @@ + +#include "MDSKafka.h" +#include "common/Cond.h" +#include "common/errno.h" +#include "include/fs_types.h" + +#define dout_subsys ceph_subsys_mds + +CephContext *MDSKafka::cct = nullptr; +CephContext *MDSKafkaTopic::cct = nullptr; + +MDSKafkaConnection::MDSKafkaConnection( + const std::string &broker, bool use_ssl, const std::string &user, + const std::string &password, const std::optional &ca_location, + const std::optional &mechanism) + : broker(broker), use_ssl(use_ssl), user(user), password(password), 
+ ca_location(ca_location), mechanism(mechanism) { + combine_hash(); +} + +void MDSKafkaConnection::encode(ceph::buffer::list &bl) const { + ENCODE_START(1, 1, bl); + encode(broker, bl); + encode(use_ssl, bl); + encode(user, bl); + encode(password, bl); + encode(ca_location, bl); + encode(mechanism, bl); + ENCODE_FINISH(bl); +} + +void MDSKafkaConnection::decode(ceph::buffer::list::const_iterator &iter) { + DECODE_START(1, iter); + decode(broker, iter); + decode(use_ssl, iter); + decode(user, iter); + decode(password, iter); + decode(ca_location, iter); + decode(mechanism, iter); + DECODE_FINISH(iter); +} + +void MDSKafkaConnection::dump(ceph::Formatter *f) const { + f->dump_string("broker", broker); + f->dump_bool("use_ssl", use_ssl); + f->dump_string("user", user); + f->dump_string("password", password); + if (ca_location.has_value()) { + f->dump_string("ca_location", ca_location.value()); + } + if (mechanism.has_value()) { + f->dump_string("mechanism", mechanism.value()); + } +} + +void MDSKafkaConnection::generate_test_instances( + std::list &o) { + o.push_back(new MDSKafkaConnection); +} + +bool MDSKafkaConnection::is_empty() const { + return broker.empty() && !use_ssl && user.empty() && password.empty() && + !ca_location.has_value() && !mechanism.has_value(); +} + +MDSKafkaManager::MDSKafkaManager(MDSRank *mds) + : mds(mds), cct(mds->cct), paused(true), object_name("mds_kafka_topics"), + endpoints_epoch(0), prev_endpoints_epoch(0) {} + +int MDSKafkaManager::load_data(std::map &mp) { + int r = update_omap(std::map()); + if (r < 0) { + return r; + } + C_SaferCond sync_finisher; + ObjectOperation op; + op.omap_get_vals("", "", UINT_MAX, &mp, NULL, NULL); + mds->objecter->read(object_t(object_name), + object_locator_t(mds->get_metadata_pool()), op, + CEPH_NOSNAP, NULL, 0, &sync_finisher); + r = sync_finisher.wait(); + if (r < 0) { + lderr(mds->cct) << "Error reading omap values from object '" << object_name + << "':" << cpp_strerror(r) << dendl; + } + return r; +} + +int MDSKafkaManager::update_omap(const std::map &mp) { + C_SaferCond sync_finisher; + ObjectOperation op; + op.omap_set(mp); + mds->objecter->mutate( + object_t(object_name), object_locator_t(mds->get_metadata_pool()), op, + SnapContext(), ceph::real_clock::now(), 0, &sync_finisher); + int r = sync_finisher.wait(); + if (r < 0) { + lderr(mds->cct) << "Error updating omap of object '" << object_name + << "':" << cpp_strerror(r) << dendl; + } + return r; +} + +int MDSKafkaManager::remove_keys(const std::set &st) { + C_SaferCond sync_finisher; + ObjectOperation op; + op.omap_rm_keys(st); + mds->objecter->mutate( + object_t(object_name), object_locator_t(mds->get_metadata_pool()), op, + SnapContext(), ceph::real_clock::now(), 0, &sync_finisher); + int r = sync_finisher.wait(); + if (r < 0) { + lderr(mds->cct) << "Error removing keys from omap of object '" + << object_name << "':" << cpp_strerror(r) << dendl; + } + return r; +} + +int MDSKafkaManager::add_topic_into_disk(const std::string &topic_name, + const std::string &endpoint_name, + const MDSKafkaConnection &connection) { + std::map mp; + std::string key = topic_name + "," + endpoint_name; + bufferlist bl; + encode(connection, bl); + mp[key] = std::move(bl); + int r = update_omap(mp); + return r; +} + +int MDSKafkaManager::remove_topic_from_disk(const std::string &topic_name, + const std::string &endpoint_name) { + std::set st; + std::string key = topic_name + "," + endpoint_name; + st.insert(key); + int r = remove_keys(st); + return r; +} + +int MDSKafkaManager::init() { + 
std::map<std::string, bufferlist> mp;
+  int r = load_data(mp);
+  if (r < 0) {
+    lderr(cct) << "Error occurred while initializing kafka topics" << dendl;
+  }
+  for (auto &[key, val] : mp) {
+    try {
+      MDSKafkaConnection connection;
+      auto iter = val.cbegin();
+      decode(connection, iter);
+      size_t pos = key.find(',');
+      std::string topic_name = key.substr(0, pos);
+      std::string endpoint_name = key.substr(pos + 1);
+      add_topic(topic_name, endpoint_name, connection, false);
+      endpoints_epoch++;
+    } catch (const ceph::buffer::error &e) {
+      ldout(cct, 1) << "Undecodable kafka topic found:" << e.what() << dendl;
+    }
+  }
+  return r;
+}
+
+int MDSKafkaManager::remove_topic(const std::string &topic_name,
+                                  const std::string &endpoint_name,
+                                  bool write_into_disk) {
+  std::unique_lock lock(endpoint_mutex);
+  int r = 0;
+  bool is_empty = false;
+  auto it = candidate_endpoints.find(endpoint_name);
+  if (it == candidate_endpoints.end()) {
+    ldout(cct, 1) << "No kafka endpoint exists with name '" << endpoint_name
+                  << "'" << dendl;
+    r = -CEPHFS_EINVAL;
+    goto error_occurred;
+  }
+  r = it->second->remove_topic(topic_name, is_empty);
+  if (r < 0) {
+    ldout(cct, 1) << "No kafka topic exists with name '" << topic_name
+                  << "' on endpoint with name '" << endpoint_name
+                  << "'" << dendl;
+    goto error_occurred;
+  }
+  if (is_empty) {
+    candidate_endpoints.erase(it);
+    endpoints_epoch++;
+  }
+  if (write_into_disk) {
+    r = remove_topic_from_disk(topic_name, endpoint_name);
+    if (r < 0) {
+      goto error_occurred;
+    }
+  }
+  ldout(cct, 1) << "Kafka topic named '" << topic_name
+                << "' with endpoint name '" << endpoint_name
+                << "' was removed successfully" << dendl;
+  if (candidate_endpoints.empty()) {
+    pause();
+  }
+  return r;
+
+error_occurred:
+  lderr(cct) << "Kafka topic named '" << topic_name
+             << "' with endpoint name '" << endpoint_name
+             << "' cannot be removed, failed with error: " << cpp_strerror(r)
+             << dendl;
+  return r;
+}
+
+int MDSKafkaManager::add_topic(const std::string &topic_name,
+                               const std::string &endpoint_name,
+                               const MDSKafkaConnection &connection,
+                               bool write_into_disk) {
+  std::unique_lock lock(endpoint_mutex);
+  auto it = candidate_endpoints.find(endpoint_name);
+  std::shared_ptr<MDSKafka> kafka;
+  std::shared_ptr<MDSKafkaTopic> topic;
+  bool created = false;
+  int r = 0;
+  if (it == candidate_endpoints.end()) {
+    if (candidate_endpoints.size() >= MAX_CONNECTIONS_DEFAULT) {
+      ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
+      r = -CEPHFS_ENOMEM;
+      goto error_occurred;
+    }
+    kafka = MDSKafka::create(cct, connection, endpoint_name);
+    if (!kafka) {
+      r = -CEPHFS_ECANCELED;
+      goto error_occurred;
+    }
+    created = true;
+  } else {
+    if (!connection.is_empty() &&
+        connection.hash_key != it->second->connection.hash_key) {
+      ldout(cct, 1)
+          << "Kafka endpoint name already exists with different endpoint "
+             "information"
+          << dendl;
+      r = -CEPHFS_EINVAL;
+      goto error_occurred;
+    }
+    kafka = it->second;
+  }
+  topic = MDSKafkaTopic::create(cct, topic_name, kafka);
+  if (!topic) {
+    r = -CEPHFS_ECANCELED;
+    goto error_occurred;
+  }
+  kafka->add_topic(topic_name, topic);
+  if (created) {
+    candidate_endpoints[endpoint_name] = kafka;
+    endpoints_epoch++;
+  }
+  if (write_into_disk) {
+    r = add_topic_into_disk(topic_name, endpoint_name, connection);
+    if (r < 0) {
+      goto error_occurred;
+    }
+  }
+  ldout(cct, 1) << "Kafka topic named '" << topic_name
+                << "' with endpoint name '" << endpoint_name
+                << "' was added successfully" << dendl;
+  activate();
+  return r;
+
+error_occurred:
+  lderr(cct) << "Kafka topic named '" << topic_name
+             << "' with endpoint name '" << endpoint_name
+             << "' cannot be added, failed with error: " << cpp_strerror(r)
+             << dendl;
+  return r;
+}
+
+void MDSKafkaManager::activate() {
+  if (!paused) {
+    return;
+  }
+  worker = std::thread(&MDSKafkaManager::run, this);
+  paused = false;
+  ldout(cct, 1) << "KafkaManager worker thread started" << dendl;
+}
+
+void MDSKafkaManager::pause() {
+  if (paused) {
+    return;
+  }
+  paused = true;
+  pick_message.notify_one();
+  if (worker.joinable()) {
+    worker.join();
+  }
+  ldout(cct, 1) << "KafkaManager worker thread paused" << dendl;
+}
+
+int MDSKafkaManager::send(
+    const std::shared_ptr<MDSNotificationMessage> &message) {
+  if (paused) {
+    return -CEPHFS_ECANCELED;
+  }
+  std::unique_lock lock(queue_mutex);
+  if (message_queue.size() >= MAX_QUEUE_DEFAULT) {
+    ldout(cct, 5) << "Notification message for kafka with seq_id="
+                  << message->seq_id << " is dropped as the queue is full" << dendl;
+    return -CEPHFS_EBUSY;
+  }
+  message_queue.push(message);
+  lock.unlock();
+  pick_message.notify_one();
+  return 0;
+}
+
+void MDSKafkaManager::sync_endpoints() {
+  uint64_t current_epoch = endpoints_epoch.load();
+  if (prev_endpoints_epoch != current_epoch) {
+    effective_endpoints = candidate_endpoints;
+    prev_endpoints_epoch = current_epoch;
+  }
+}
+
+uint64_t MDSKafkaManager::publish(
+    const std::shared_ptr<MDSNotificationMessage> &message) {
+  sync_endpoints();
+  uint64_t reply_count = 0;
+  for (auto &[key, endpoint] : effective_endpoints) {
+    reply_count += endpoint->publish_internal(message);
+  }
+  return reply_count;
+}
+
+void MDSKafkaManager::run() {
+  while (true) {
+    std::unique_lock queue_lock(queue_mutex);
+    pick_message.wait(queue_lock,
+                      [this] { return paused || !message_queue.empty(); });
+    if (paused) {
+      break;
+    }
+    std::queue<std::shared_ptr<MDSNotificationMessage>> local_message_queue;
+    swap(local_message_queue, message_queue);
+    ceph_assert(message_queue.empty());
+    queue_lock.unlock();
+    while (!local_message_queue.empty() && !paused) {
+      std::shared_ptr<MDSNotificationMessage> message =
+          local_message_queue.front();
+      local_message_queue.pop();
+      publish(message);
+    }
+  }
+}
+
+void MDSKafkaConnection::combine_hash() {
+  hash_key = 0;
+  boost::hash_combine(hash_key, broker);
+  boost::hash_combine(hash_key, use_ssl);
+  boost::hash_combine(hash_key, user);
+  boost::hash_combine(hash_key, password);
+  if (ca_location.has_value()) {
+    boost::hash_combine(hash_key, ca_location.value());
+  }
+  if (mechanism.has_value()) {
+    boost::hash_combine(hash_key, mechanism.value());
+  }
+}
+
+void MDSKafkaTopic::kafka_topic_deleter(rd_kafka_topic_t *topic_ptr) {
+  if (topic_ptr) {
+    rd_kafka_topic_destroy(topic_ptr);
+  }
+}
+
+MDSKafkaTopic::MDSKafkaTopic(const std::string &topic_name)
+    : topic_name(topic_name) {}
+
+std::shared_ptr<MDSKafkaTopic>
+MDSKafkaTopic::create(CephContext *_cct, const std::string &topic_name,
+                      const std::shared_ptr<MDSKafka> &kafka_endpoint) {
+  try {
+    if (!MDSKafkaTopic::cct && _cct) {
+      MDSKafkaTopic::cct = _cct;
+    }
+
+    std::shared_ptr<MDSKafkaTopic> topic_ptr =
+        std::make_shared<MDSKafkaTopic>(topic_name);
+    topic_ptr->kafka_topic_ptr.reset(rd_kafka_topic_new(
+        kafka_endpoint->producer.get(), topic_name.c_str(), nullptr));
+    if (!topic_ptr->kafka_topic_ptr) {
+      return nullptr;
+    }
+    return topic_ptr;
+  } catch (...) 
{ + } + return nullptr; +} + +int64_t MDSKafka::push_unack_event() { + std::unique_lock lock(delivery_mutex); + if (unacked_delivery.size() >= (int)MAX_INFLIGHT_DEFAULT) { + return -1; + } + unacked_delivery.insert(delivery_seq); + int64_t idx = delivery_seq++; + if (unacked_delivery.size() >= + cct->_conf->mds_kafka_unacked_message_threshold) { + lock.unlock(); + pick_delivery_ack.notify_one(); + } + + return idx; +} + +void MDSKafka::acknowledge_event(int64_t idx) { + std::unique_lock lock(delivery_mutex); + auto it = unacked_delivery.find(idx); + if (it == unacked_delivery.end()) { + return; + } + unacked_delivery.erase(it); + // ldout(cct, 0) << ": Message acknowledged=" << idx << dendl; +} + +void MDSKafka::kafka_producer_deleter(rd_kafka_t *producer_ptr) { + if (producer_ptr) { + rd_kafka_flush(producer_ptr, + 10 * 1000); // Wait for max 10 seconds to flush. + rd_kafka_destroy(producer_ptr); // Destroy producer instance. + } +} + +MDSKafka::MDSKafka(const MDSKafkaConnection &connection, + const std::string &endpoint_name) + : connection(connection), stopped(true), endpoint_name(endpoint_name) {} + +void MDSKafka::run_polling() { + while (true) { + std::unique_lock lock(delivery_mutex); + pick_delivery_ack.wait( + lock, [this] { return (stopped || !unacked_delivery.empty()); }); + if (stopped) { + break; + } + lock.unlock(); + poll(cct->_conf->mds_kafka_sleep_timeout); + } +} + +std::shared_ptr MDSKafka::create(CephContext *_cct, + const MDSKafkaConnection &connection, + const std::string &endpoint_name) { + try { + if (!MDSKafka::cct && _cct) { + MDSKafka::cct = _cct; + } + // validation before creating kafka interface + if (connection.broker.empty()) { + return nullptr; + } + if (connection.user.empty() != connection.password.empty()) { + return nullptr; + } + if (!connection.user.empty() && !connection.use_ssl && + !g_conf().get_val( + "mds_allow_notification_secrets_in_cleartext")) { + ldout(cct, 1) << "Kafka connect: user/password are only allowed over " + "secure connection" + << dendl; + return nullptr; + } + std::shared_ptr kafka_ptr = + std::make_shared(connection, endpoint_name); + char errstr[512] = {0}; + auto kafka_conf_deleter = [](rd_kafka_conf_t *conf) { + rd_kafka_conf_destroy(conf); + }; + std::unique_ptr conf( + rd_kafka_conf_new(), kafka_conf_deleter); + if (!conf) { + ldout(cct, 1) << "Kafka connect: failed to allocate configuration" + << dendl; + return nullptr; + } + constexpr std::uint64_t min_message_timeout = 1; + const auto message_timeout = + std::max(min_message_timeout, + cct->_conf.get_val("mds_kafka_message_timeout")); + if (rd_kafka_conf_set(conf.get(), "message.timeout.ms", + std::to_string(message_timeout).c_str(), errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + goto conf_error; + } + if (rd_kafka_conf_set(conf.get(), "bootstrap.servers", + connection.broker.c_str(), errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + goto conf_error; + } + + if (connection.use_ssl) { + if (!connection.user.empty()) { + // use SSL+SASL + if (rd_kafka_conf_set(conf.get(), "security.protocol", "SASL_SSL", + errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || + rd_kafka_conf_set(conf.get(), "sasl.username", + connection.user.c_str(), errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK || + rd_kafka_conf_set(conf.get(), "sasl.password", + connection.password.c_str(), errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + goto conf_error; + } + ldout(cct, 20) + << "Kafka connect: successfully configured SSL+SASL security" + << dendl; + + if (connection.mechanism) { + if 
(rd_kafka_conf_set(conf.get(), "sasl.mechanism", + connection.mechanism->c_str(), errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + goto conf_error; + } + ldout(cct, 20) + << "Kafka connect: successfully configured SASL mechanism" + << dendl; + } else { + if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", "PLAIN", errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + goto conf_error; + } + ldout(cct, 20) << "Kafka connect: using default SASL mechanism" + << dendl; + } + } else { + // use only SSL + if (rd_kafka_conf_set(conf.get(), "security.protocol", "SSL", errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + goto conf_error; + } + ldout(cct, 20) << "Kafka connect: successfully configured SSL security" + << dendl; + } + if (connection.ca_location) { + if (rd_kafka_conf_set(conf.get(), "ssl.ca.location", + connection.ca_location->c_str(), errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + goto conf_error; + } + ldout(cct, 20) << "Kafka connect: successfully configured CA location" + << dendl; + } else { + ldout(cct, 20) << "Kafka connect: using default CA location" << dendl; + } + ldout(cct, 20) << "Kafka connect: successfully configured security" + << dendl; + } else if (!connection.user.empty()) { + // use SASL+PLAINTEXT + if (rd_kafka_conf_set(conf.get(), "security.protocol", "SASL_PLAINTEXT", + errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || + rd_kafka_conf_set(conf.get(), "sasl.username", + connection.user.c_str(), errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK || + rd_kafka_conf_set(conf.get(), "sasl.password", + connection.password.c_str(), errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + goto conf_error; + } + ldout(cct, 20) << "Kafka connect: successfully configured SASL_PLAINTEXT" + << dendl; + + if (connection.mechanism) { + if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", + connection.mechanism->c_str(), errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + goto conf_error; + } + ldout(cct, 20) + << "Kafka connect: successfully configured SASL mechanism" << dendl; + } else { + if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", "PLAIN", errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + goto conf_error; + } + ldout(cct, 20) << "Kafka connect: using default SASL mechanism" + << dendl; + } + } + rd_kafka_conf_set_dr_msg_cb(conf.get(), message_callback); + rd_kafka_conf_set_opaque(conf.get(), kafka_ptr.get()); + rd_kafka_conf_set_log_cb(conf.get(), log_callback); + rd_kafka_conf_set_error_cb(conf.get(), poll_err_callback); + { + rd_kafka_t *prod = rd_kafka_new(RD_KAFKA_PRODUCER, conf.release(), errstr, + sizeof(errstr)); + if (!prod) { + ldout(cct, 1) << "Kafka connect: failed to create producer: " << errstr + << dendl; + return nullptr; + } + kafka_ptr->producer.reset(prod); + } + ldout(cct, 1) << "Kafka connect: successfully created new producer" + << dendl; + { + const auto log_level = cct->_conf->subsys.get_log_level(ceph_subsys_mds); + if (log_level <= 1) { + rd_kafka_set_log_level(kafka_ptr->producer.get(), 3); + } else if (log_level <= 2) { + rd_kafka_set_log_level(kafka_ptr->producer.get(), 5); + } else if (log_level <= 10) { + rd_kafka_set_log_level(kafka_ptr->producer.get(), 5); + } else { + rd_kafka_set_log_level(kafka_ptr->producer.get(), 5); + } + } + return kafka_ptr; + + conf_error: + ldout(cct, 1) << "Kafka connect: configuration failed: " << errstr << dendl; + return nullptr; + } catch (...) 
{ + } + return nullptr; +} + +void MDSKafka::add_topic(const std::string &topic_name, + const std::shared_ptr &topic) { + std::unique_lock lock(topic_mutex); + topics[topic_name] = topic; + start_polling_thread(); +} + +void MDSKafka::start_polling_thread() { + if (!stopped) { + return; + } + stopped = false; + polling_thread = std::thread(&MDSKafka::run_polling, this); + ldout(cct, 1) << ": polling thread started for kafka enpoint name=" + << endpoint_name << dendl; +} + +void MDSKafka::stop_polling_thread() { + if (stopped) { + return; + } + stopped = true; + pick_delivery_ack.notify_one(); + if (polling_thread.joinable()) { + polling_thread.join(); + } + ldout(cct, 1) << ": polling thread stopped for kafka enpoint name=" + << endpoint_name << dendl; +} + +int MDSKafka::remove_topic(const std::string &topic_name, bool &is_empty) { + std::unique_lock lock(topic_mutex); + auto it = topics.find(topic_name); + if (it == topics.end()) { + return -CEPHFS_EINVAL; + } + topics.erase(it); + is_empty = topics.empty(); + if (is_empty) { + stop_polling_thread(); + } + return 0; +} + +void MDSKafka::log_callback(const rd_kafka_t *rk, int level, const char *fac, + const char *buf) { + if (!cct) { + return; + } + if (level <= 3) { + ldout(cct, 1) << "RDKAFKA-" << level << "-" << fac << ": " + << rd_kafka_name(rk) << ": " << buf << dendl; + } else if (level <= 5) { + ldout(cct, 2) << "RDKAFKA-" << level << "-" << fac << ": " + << rd_kafka_name(rk) << ": " << buf << dendl; + } else if (level <= 6) { + ldout(cct, 10) << "RDKAFKA-" << level << "-" << fac << ": " + << rd_kafka_name(rk) << ": " << buf << dendl; + } else { + ldout(cct, 20) << "RDKAFKA-" << level << "-" << fac << ": " + << rd_kafka_name(rk) << ": " << buf << dendl; + } +} + +void MDSKafka::poll_err_callback(rd_kafka_t *rk, int err, const char *reason, + void *opaque) { + if (!cct) { + return; + } + ldout(cct, 10) << "Kafka run: poll error(" << err << "): " << reason << dendl; +} + +uint64_t MDSKafka::publish_internal( + const std::shared_ptr &message) { + uint64_t reply_count = 0; + std::shared_lock lock(topic_mutex); + uint64_t read_timeout = + cct->_conf->mds_kafka_sleep_timeout; + for (auto [topic_name, topic_ptr] : topics) { + int64_t idx = push_unack_event(); + if (idx == -1) { + ldout(cct, 5) << "Kafka publish (with callback): failed with error: " + "callback queue full, trying to poll again" + << dendl; + reply_count += rd_kafka_poll(producer.get(), 3 * read_timeout); + idx = push_unack_event(); + if (idx == -1) { + ldout(cct, 5) + << "Kafka publish (with callback): failed with error: " + "message dropped, callback queue full event after polling for " + << 3 * read_timeout << "ms" << dendl; + continue; + } + } + int64_t *tag = new int64_t(idx); + // RdKafka::ErrorCode response = producer->produce( + // topic_name, RdKafka::Topic::PARTITION_UA, + // RdKafka::Producer::RK_MSG_COPY, const_cast(message->c_str()), message->length(), nullptr, 0, 0, tag); + const auto response = rd_kafka_produce( + topic_ptr->kafka_topic_ptr.get(), RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_COPY, const_cast(message->message.c_str()), + message->message.length(), nullptr, 0, tag); + if (response == -1) { + const auto err = rd_kafka_last_error(); + ldout(cct, 5) << "Kafka publish: failed to produce for topic: " + << topic_name << ". 
+                    << ". with error: " << rd_kafka_err2str(err)
+                    << dendl;
+
+      delete tag;
+      acknowledge_event(idx);
+      continue;
+    }
+    reply_count += rd_kafka_poll(producer.get(), 0);
+  }
+  return reply_count;
+}
+
+uint64_t MDSKafka::poll(int read_timeout) {
+  return rd_kafka_poll(producer.get(), read_timeout);
+}
+
+void MDSKafka::message_callback(rd_kafka_t *rk,
+                                const rd_kafka_message_t *rkmessage,
+                                void *opaque) {
+  const auto result = rkmessage->err;
+  const auto kafka_ptr = reinterpret_cast<MDSKafka *>(opaque);
+  if (result == 0) {
+    ldout(cct, 20) << "Kafka run: ack received with result="
+                   << rd_kafka_err2str(result) << dendl;
+  } else {
+    ldout(cct, 5) << "Kafka run: nack received with result="
+                  << rd_kafka_err2str(result)
+                  << " for kafka endpoint having endpoint name: "
+                  << kafka_ptr->endpoint_name << dendl;
+  }
+  if (!rkmessage->_private) {
+    ldout(cct, 20) << "Kafka run: n/ack received without a callback" << dendl;
+    return;
+  }
+  int64_t *tag = reinterpret_cast<int64_t *>(rkmessage->_private);
+  kafka_ptr->acknowledge_event(*tag);
+  delete tag;
+}
\ No newline at end of file
diff --git a/src/mds/MDSKafka.h b/src/mds/MDSKafka.h
new file mode 100644
index 0000000000000..4414662c3d86a
--- /dev/null
+++ b/src/mds/MDSKafka.h
@@ -0,0 +1,146 @@
+#pragma once
+
+#include "MDSNotificationMessage.h"
+#include "MDSRank.h"
+#include "common/ceph_context.h"
+#include "include/buffer.h"
+#include <librdkafka/rdkafka.h>
+#include <atomic>
+#include <condition_variable>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <queue>
+#include <set>
+#include <shared_mutex>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+
+class MDSKafka;
+class MDSKafkaTopic;
+
+struct MDSKafkaConnection {
+  std::string broker;
+  bool use_ssl;
+  std::string user;
+  std::string password;
+  std::optional<std::string> ca_location;
+  std::optional<std::string> mechanism;
+  uint64_t hash_key;
+  MDSKafkaConnection() = default;
+  MDSKafkaConnection(const std::string &broker, bool use_ssl,
+                     const std::string &user, const std::string &password,
+                     const std::optional<std::string> &ca_location,
+                     const std::optional<std::string> &mechanism);
+  void combine_hash();
+  bool is_empty() const;
+  std::string to_string() const { return broker + ":" + user; }
+  void encode(ceph::buffer::list &bl) const;
+  void decode(ceph::buffer::list::const_iterator &iter);
+  void dump(ceph::Formatter *f) const;
+  static void generate_test_instances(std::list<MDSKafkaConnection *> &o);
+};
+
+WRITE_CLASS_ENCODER(MDSKafkaConnection)
+
+class MDSKafkaManager {
+public:
+  MDSKafkaManager(MDSRank *mds);
+  int init();
+  void activate();
+  void pause();
+  int add_topic(const std::string &topic_name, const std::string &endpoint_name,
+                const MDSKafkaConnection &connection, bool write_into_disk);
+  int remove_topic(const std::string &topic_name,
+                   const std::string &endpoint_name, bool write_into_disk);
+  int send(const std::shared_ptr<MDSNotificationMessage> &message);
+  CephContext *cct;
+
+private:
+  void run();
+  uint64_t publish(const std::shared_ptr<MDSNotificationMessage> &message);
+  int load_data(std::map<std::string, bufferlist> &mp);
+  int add_topic_into_disk(const std::string &topic_name,
+                          const std::string &endpoint_name,
+                          const MDSKafkaConnection &connection);
+  int remove_topic_from_disk(const std::string &topic_name,
+                             const std::string &endpoint_name);
+  int update_omap(const std::map<std::string, bufferlist> &mp);
+  int remove_keys(const std::set<std::string> &st);
+  void sync_endpoints();
+  static const size_t MAX_CONNECTIONS_DEFAULT = 32;
+  static const size_t MAX_QUEUE_DEFAULT = 131072;
+  std::shared_mutex endpoint_mutex;
+  std::unordered_map<std::string, std::shared_ptr<MDSKafka>>
+      candidate_endpoints, effective_endpoints;
+  std::mutex queue_mutex;
+  std::queue<std::shared_ptr<MDSNotificationMessage>> message_queue;
+  std::thread worker;
+  std::condition_variable pick_message;
+  std::atomic<bool> paused;
+  MDSRank *mds;
+  std::string object_name;
+  std::atomic<uint64_t> endpoints_epoch = 0;
+  uint64_t prev_endpoints_epoch = 0;
+};
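The publish path above reserves a slot before every produce and releases it from the delivery-report callback: push_unack_event hands out a sequence number, acknowledge_event retires it, and a full table forces one extra rd_kafka_poll() before the message is dropped. A minimal sketch of that bounded in-flight accounting, detached from librdkafka (the member names and capacity mirror the MDSKafka declaration below; everything else is illustrative):

#include <cstdint>
#include <mutex>
#include <unordered_set>

// Sketch only: bounded tracking of unacknowledged deliveries.
class InflightTracker {
  std::mutex delivery_mutex;
  std::unordered_set<int64_t> unacked_delivery; // outstanding delivery tags
  int64_t delivery_seq = 0;                     // next tag to hand out
  static const size_t MAX_INFLIGHT_DEFAULT = 1 << 20;

public:
  // Reserve a slot; returns the new tag, or -1 when the table is full
  // (the caller polls for delivery reports and retries once).
  int64_t push_unack_event() {
    std::lock_guard<std::mutex> l(delivery_mutex);
    if (unacked_delivery.size() >= MAX_INFLIGHT_DEFAULT) {
      return -1;
    }
    int64_t tag = delivery_seq++;
    unacked_delivery.insert(tag);
    return tag;
  }

  // Invoked from the delivery-report callback with the tag that was
  // attached to the produced message as its opaque pointer.
  void acknowledge_event(int64_t tag) {
    std::lock_guard<std::mutex> l(delivery_mutex);
    unacked_delivery.erase(tag);
  }
};

The tag reaches the callback as the heap-allocated opaque pointer handed to rd_kafka_produce(), which is why message_callback above deletes it right after the erase.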
+ +class MDSKafkaTopic { +public: + MDSKafkaTopic() = delete; + MDSKafkaTopic(const std::string &topic_name); + static std::shared_ptr + create(CephContext *_cct, const std::string &topic_name, + const std::shared_ptr &kafka_endpoint); + static void kafka_topic_deleter(rd_kafka_topic_t *topic_ptr); + std::unique_ptr + kafka_topic_ptr{nullptr, kafka_topic_deleter}; + friend class MDSKafkaManager; + friend class MDSKafka; + +private: + std::string topic_name; + static CephContext *cct; +}; + +class MDSKafka { +public: + MDSKafka() = delete; + MDSKafka(const MDSKafkaConnection &connection, + const std::string &endpoint_name); + ~MDSKafka() { + stop_polling_thread(); + } + static std::shared_ptr create(CephContext *_cct, + const MDSKafkaConnection &connection, + const std::string &endpoint_name); + uint64_t + publish_internal(const std::shared_ptr &message); + uint64_t poll(int read_timeout); + void add_topic(const std::string &topic_name, + const std::shared_ptr &topic); + int remove_topic(const std::string &topic_name, bool &is_empty); + static void kafka_producer_deleter(rd_kafka_t *producer_ptr); + friend class MDSKafkaManager; + friend class MDSKafkaTopic; + +private: + std::unique_ptr producer{ + nullptr, kafka_producer_deleter}; + std::shared_mutex topic_mutex; + std::unordered_map> topics; + static CephContext *cct; + MDSKafkaConnection connection; + std::string endpoint_name; + static void message_callback(rd_kafka_t *rk, + const rd_kafka_message_t *rkmessage, + void *opaque); + static void log_callback(const rd_kafka_t *rk, int level, const char *fac, + const char *buf); + static void poll_err_callback(rd_kafka_t *rk, int err, const char *reason, + void *opaque); + int64_t push_unack_event(); + void acknowledge_event(int64_t idx); + void run_polling(); + void stop_polling_thread(); + void start_polling_thread(); + std::atomic stopped; + std::condition_variable pick_delivery_ack; + std::thread polling_thread; + std::unordered_set unacked_delivery; + int64_t delivery_seq = 0; + std::mutex delivery_mutex; + static const size_t MAX_INFLIGHT_DEFAULT = 1 << 20; +}; \ No newline at end of file diff --git a/src/mds/MDSNotificationManager.cc b/src/mds/MDSNotificationManager.cc new file mode 100644 index 0000000000000..dc0deacc0cbf0 --- /dev/null +++ b/src/mds/MDSNotificationManager.cc @@ -0,0 +1,293 @@ +#include "MDSNotificationManager.h" +#include "include/uuid.h" +#define dout_subsys ceph_subsys_mds +#define ALL_BIT_ON_MASK 1048575 +#define ALL_BIT_ON_MASK_EXCEPT_ONLY_DIR 524287 + + +MDSNotificationManager::MDSNotificationManager(MDSRank *mds) + : cct(mds->cct), mds(mds), cur_notification_seq_id(0) { +#ifdef WITH_CEPHFS_NOTIFICATION + uuid_d uid; + uid.generate_random(); + session_id = uid.to_string(); + kafka_manager = std::make_unique(mds); + udp_manager = std::make_unique(mds); +#endif +} + +void MDSNotificationManager::init() { +#ifdef WITH_CEPHFS_NOTIFICATION + int r = kafka_manager->init(); + if (r < 0) { + kafka_manager = nullptr; + } + r = udp_manager->init(); + if (r < 0) { + udp_manager = nullptr; + } +#endif +} + +int MDSNotificationManager::dispatch(const cref_t &m) { +#ifdef WITH_CEPHFS_NOTIFICATION + if (m->get_type() == MSG_MDS_NOTIFICATION_INFO_KAFKA_TOPIC) { + const auto &req = ref_cast(m); + if (!req->is_remove) { + add_kafka_topic(req->topic_name, req->endpoint_name, req->broker, + req->use_ssl, req->user, req->password, req->ca_location, + req->mechanism, false, false); + } else { + remove_kafka_topic(req->topic_name, req->endpoint_name, false, false); + } + return 0; + 
} else if (m->get_type() == MSG_MDS_NOTIFICATION_INFO_UDP_ENDPOINT) { + const auto &req = ref_cast(m); + if (!req->is_remove) { + add_udp_endpoint(req->name, req->ip, req->port, false, false); + } else { + remove_udp_endpoint(req->name, false, false); + } + return 0; + } + return -CEPHFS_EOPNOTSUPP; +#else + return -CEPHFS_EOPNOTSUPP; +#endif +} + +int MDSNotificationManager::add_kafka_topic( + const std::string &topic_name, const std::string &endpoint_name, + const std::string &broker, bool use_ssl, const std::string &user, + const std::string &password, const std::optional &ca_location, + const std::optional &mechanism, bool write_into_disk, + bool send_peers) { +#ifdef WITH_CEPHFS_NOTIFICATION + if (!kafka_manager) { + ldout(cct, 1) + << "Kafka topic '" << topic_name + << "' creation failed as kafka manager is not initialized correctly" + << dendl; + return -CEPHFS_EFAULT; + } + int r = kafka_manager->add_topic(topic_name, endpoint_name, + MDSKafkaConnection(broker, use_ssl, user, + password, ca_location, + mechanism), + write_into_disk); + if (send_peers && r == 0) { + auto m = make_message( + topic_name, endpoint_name, broker, use_ssl, user, password, ca_location, + mechanism, false); + mds->send_to_peers(m); + } + return r; +#else + return -CEPHFS_EOPNOTSUPP; +#endif +} + +int MDSNotificationManager::remove_kafka_topic(const std::string &topic_name, + const std::string &endpoint_name, + bool write_into_disk, + bool send_peers) { +#ifdef WITH_CEPHFS_NOTIFICATION + if (!kafka_manager) { + ldout(cct, 1) + << "Kafka topic '" << topic_name + << "' removal failed as kafka manager is not initialized correctly" + << dendl; + return -CEPHFS_EFAULT; + } + int r = + kafka_manager->remove_topic(topic_name, endpoint_name, write_into_disk); + if (send_peers && r == 0) { + auto m = make_message(topic_name, + endpoint_name, true); + mds->send_to_peers(m); + } + return r; +#else + return -CEPHFS_EOPNOTSUPP; +#endif +} + +int MDSNotificationManager::add_udp_endpoint(const std::string &name, + const std::string &ip, int port, + bool write_into_disk, + bool send_peers) { +#ifdef WITH_CEPHFS_NOTIFICATION + if (!udp_manager) { + ldout(cct, 1) + << "UDP endpoint '" << name + << "' creation failed as udp manager is not initialized correctly" + << dendl; + return -CEPHFS_EFAULT; + } + int r = udp_manager->add_endpoint(name, MDSUDPConnection(ip, port), + write_into_disk); + if (send_peers && r == 0) { + auto m = make_message(name, ip, port, false); + mds->send_to_peers(m); + } + return r; +#else + return -CEPHFS_EOPNOTSUPP; +#endif +} + +int MDSNotificationManager::remove_udp_endpoint(const std::string &name, + bool write_into_disk, + bool send_peers) { +#ifdef WITH_CEPHFS_NOTIFICATION + if (!udp_manager) { + ldout(cct, 1) + << "UDP endpoint '" << name + << "' removal failed as udp manager is not initialized correctly" + << dendl; + return -CEPHFS_EFAULT; + } + int r = udp_manager->remove_endpoint(name, write_into_disk); + if (send_peers && r == 0) { + auto m = make_message(name, true); + mds->send_to_peers(m); + } + return r; +#else + return -CEPHFS_EOPNOTSUPP; +#endif +} + +#ifdef WITH_CEPHFS_NOTIFICATION +void MDSNotificationManager::push_notification( + const std::shared_ptr &message) { + if (kafka_manager) { + kafka_manager->send(message); + } + if (udp_manager) { + udp_manager->send(message); + } +} +#endif + +void MDSNotificationManager::push_notification(int32_t whoami, CInode *in, + uint64_t notify_mask, + bool projected, bool is_dir) { +#ifdef WITH_CEPHFS_NOTIFICATION + std::string path; + 
in->make_path_string(path, projected); + std::shared_ptr message = + std::make_shared( + cur_notification_seq_id.fetch_add(1)); + uint64_t filter_mask = ALL_BIT_ON_MASK; + if (is_dir) { + filter_mask = cct->_conf.get_val("mds_notification_dir_mask") | + CEPH_MDS_NOTIFY_ONLYDIR; + notify_mask |= CEPH_MDS_NOTIFY_ONLYDIR; + } else { + filter_mask = cct->_conf.get_val("mds_notification_file_mask"); + } + notify_mask &= filter_mask; + uint64_t check_mask = notify_mask & ALL_BIT_ON_MASK_EXCEPT_ONLY_DIR; + if (check_mask) { + message->create_message(whoami, session_id, notify_mask, path); + push_notification(message); + } +#endif +} + +void MDSNotificationManager::push_notification_link( + int32_t whoami, CInode *targeti, CDentry *destdn, + uint64_t notify_mask_for_target, uint64_t notify_mask_for_link, + bool is_dir) { +#ifdef WITH_CEPHFS_NOTIFICATION + std::string target_path; + targeti->make_path_string(target_path, true, nullptr); + std::string link_path; + destdn->make_path_string(link_path, true); + std::shared_ptr message = + std::make_shared( + cur_notification_seq_id.fetch_add(1)); + uint64_t filter_mask = ALL_BIT_ON_MASK; + if (is_dir) { + filter_mask = cct->_conf.get_val("mds_notification_dir_mask") | + CEPH_MDS_NOTIFY_ONLYDIR; + notify_mask_for_target |= CEPH_MDS_NOTIFY_ONLYDIR; + notify_mask_for_link |= CEPH_MDS_NOTIFY_ONLYDIR; + } else { + filter_mask = cct->_conf.get_val("mds_notification_file_mask"); + } + notify_mask_for_target &= filter_mask; + notify_mask_for_link &= filter_mask; + uint64_t check_mask = (notify_mask_for_target | notify_mask_for_link) & + ALL_BIT_ON_MASK_EXCEPT_ONLY_DIR; + if (check_mask) { + if (target_path == link_path) { + message->create_message(whoami, session_id, notify_mask_for_link, + target_path); + push_notification(message); + return; + } + message->create_link_message(whoami, session_id, notify_mask_for_target, + notify_mask_for_link, target_path, link_path); + push_notification(message); + } +#endif +} + +void MDSNotificationManager::push_notification_move(int32_t whoami, + CDentry *srcdn, + CDentry *destdn, + bool is_dir) { +#ifdef WITH_CEPHFS_NOTIFICATION + std::string dest_path, src_path; + srcdn->make_path_string(src_path, true); + destdn->make_path_string(dest_path, true); + uint64_t mask = CEPH_MDS_NOTIFY_MOVE_SELF; + uint64_t filter_mask = ALL_BIT_ON_MASK; + if (is_dir) { + mask |= CEPH_MDS_NOTIFY_ONLYDIR; + filter_mask = cct->_conf.get_val("mds_notification_dir_mask") | + CEPH_MDS_NOTIFY_ONLYDIR; + } else { + filter_mask = cct->_conf.get_val("mds_notification_file_mask"); + } + mask &= filter_mask; + uint64_t check_mask = mask & ALL_BIT_ON_MASK_EXCEPT_ONLY_DIR; + if (check_mask) { + std::shared_ptr message = + std::make_shared( + cur_notification_seq_id.fetch_add(1)); + message->create_move_message(whoami, session_id, mask, src_path, dest_path); + push_notification(message); + } +#endif +} + +void MDSNotificationManager::push_notification_snap(int32_t whoami, CInode *in, + const std::string &snapname, + uint64_t notify_mask, + bool is_dir) { +#ifdef WITH_CEPHFS_NOTIFICATION + std::string path; + in->make_path_string(path, true, nullptr); + std::shared_ptr message = + std::make_shared( + cur_notification_seq_id.fetch_add(1)); + uint64_t filter_mask = ALL_BIT_ON_MASK; + if (is_dir) { + notify_mask |= CEPH_MDS_NOTIFY_ONLYDIR; + filter_mask = cct->_conf.get_val("mds_notification_dir_mask") | + CEPH_MDS_NOTIFY_ONLYDIR; + } else { + filter_mask = cct->_conf.get_val("mds_notification_file_mask"); + } + notify_mask &= filter_mask; + uint64_t 
check_mask = notify_mask & ALL_BIT_ON_MASK_EXCEPT_ONLY_DIR; + if (check_mask) { + message->create_snap_message(whoami, session_id, notify_mask, path, + std::string(snapname)); + push_notification(message); + } +#endif +} \ No newline at end of file diff --git a/src/mds/MDSNotificationManager.h b/src/mds/MDSNotificationManager.h new file mode 100644 index 0000000000000..0933032a29ad0 --- /dev/null +++ b/src/mds/MDSNotificationManager.h @@ -0,0 +1,65 @@ +#pragma once +#include "CDentry.h" +#include "CInode.h" +#include "MDSRank.h" +#include "common/ceph_context.h" +#include "include/buffer.h" +#include + +#ifdef WITH_CEPHFS_NOTIFICATION +#include "MDSKafka.h" +#include "MDSNotificationMessage.h" +#include "MDSUDPEndpoint.h" +#include "messages/MNotificationInfoKafkaTopic.h" +#include "messages/MNotificationInfoUDPEndpoint.h" + +class MDSKafkaManager; +class MDSUDPManager; +#endif + +class MDSNotificationManager { +public: + MDSNotificationManager(MDSRank *mds); + void init(); + + // incoming notification endpoints + int dispatch(const cref_t &m); + int add_kafka_topic(const std::string &topic_name, + const std::string &endpoint_name, + const std::string &broker, bool use_ssl, + const std::string &user, const std::string &password, + const std::optional &ca_location, + const std::optional &mechanism, + bool write_into_disk, bool send_peers); + int remove_kafka_topic(const std::string &topic_name, + const std::string &endpoint_name, bool write_into_disk, + bool send_peers); + int add_udp_endpoint(const std::string &name, const std::string &ip, int port, + bool write_into_disk, bool send_peers); + int remove_udp_endpoint(const std::string &name, bool write_into_disk, + bool send_peers); + + void push_notification(int32_t whoami, CInode *in, uint64_t notify_mask, + bool projected, bool is_dir); + void push_notification_link(int32_t whoami, CInode *targeti, CDentry *destdn, + uint64_t notify_mask_for_target, + uint64_t notify_mask_for_link, bool is_dir); + void push_notification_move(int32_t whoami, CDentry *srcdn, CDentry *destdn, + bool is_dir); + void push_notification_snap(int32_t whoami, CInode *in, + const std::string &snapname, uint64_t notify_mask, + bool is_dir); + +private: +#ifdef WITH_CEPHFS_NOTIFICATION + std::unique_ptr kafka_manager; + std::unique_ptr udp_manager; + void + push_notification(const std::shared_ptr &message); +#endif + + CephContext *cct; + std::atomic cur_notification_seq_id; + std::string session_id; + MDSRank *mds; +}; \ No newline at end of file diff --git a/src/mds/MDSNotificationMessage.cc b/src/mds/MDSNotificationMessage.cc new file mode 100644 index 0000000000000..fd4df3f7b53a4 --- /dev/null +++ b/src/mds/MDSNotificationMessage.cc @@ -0,0 +1,78 @@ +#include "MDSNotificationMessage.h" +#include "common/Clock.h" +#include "common/ceph_json.h" + +#define dout_subsys ceph_subsys_mds + +MDSNotificationMessage::MDSNotificationMessage(uint64_t seq_id) + : seq_id(seq_id) {} + +void MDSNotificationMessage::create_message(int32_t whoami, + const std::string &session_id, + const uint64_t mask, + const std::string &path) { + JSONFormatter f; + f.open_object_section(""); + ceph_clock_now().gmtime_nsec(f.dump_stream("timestamp")); + f.dump_int("mds_id", (int64_t)whoami); + f.dump_string("session_id", session_id); + f.dump_unsigned("seq_id", seq_id); + f.dump_unsigned("mask", mask); + f.dump_string("path", path); + f.close_section(); + f.flush(message); +} + +void MDSNotificationMessage::create_move_message(int32_t whoami, + const std::string &session_id, + uint64_t mask, + 
const std::string &src_path, + const std::string &dest_path) { + JSONFormatter f; + f.open_object_section(""); + ceph_clock_now().gmtime_nsec(f.dump_stream("timestamp")); + f.dump_int("mds_id", (int64_t)whoami); + f.dump_string("session_id", session_id); + f.dump_unsigned("seq_id", seq_id); + f.dump_unsigned("mask", mask); + f.dump_string("src_path", src_path); + f.dump_string("dest_path", dest_path); + f.close_section(); + f.flush(message); +} + +void MDSNotificationMessage::create_link_message(int32_t whoami, + const std::string &session_id, + uint64_t target_mask, + uint64_t link_mask, + const std::string &target_path, + const std::string &link_path) { + JSONFormatter f; + f.open_object_section(""); + ceph_clock_now().gmtime_nsec(f.dump_stream("timestamp")); + f.dump_int("mds_id", (int64_t)whoami); + f.dump_string("session_id", session_id); + f.dump_unsigned("seq_id", seq_id); + f.dump_unsigned("target_mask", target_mask); + f.dump_unsigned("link_mask", link_mask); + f.dump_string("target_path", target_path); + f.dump_string("link_path", link_path); + f.close_section(); + f.flush(message); +} + +void MDSNotificationMessage::create_snap_message( + int32_t whoami, const std::string &session_id, uint64_t mask, + const std::string &path, const std::string &snapshot_name) { + JSONFormatter f; + f.open_object_section(""); + ceph_clock_now().gmtime_nsec(f.dump_stream("timestamp")); + f.dump_int("mds_id", (int64_t)whoami); + f.dump_string("session_id", session_id); + f.dump_unsigned("seq_id", seq_id); + f.dump_unsigned("mask", mask); + f.dump_string("path", path); + f.dump_string("snapshot_name", snapshot_name); + f.close_section(); + f.flush(message); +} \ No newline at end of file diff --git a/src/mds/MDSNotificationMessage.h b/src/mds/MDSNotificationMessage.h new file mode 100644 index 0000000000000..eab664107c0a0 --- /dev/null +++ b/src/mds/MDSNotificationMessage.h @@ -0,0 +1,23 @@ +#pragma once +#include "common/ceph_context.h" +#include "include/Context.h" +#include "include/buffer.h" +#include + +struct MDSNotificationMessage { + bufferlist message; + uint64_t seq_id; + MDSNotificationMessage(uint64_t seq_id); + void create_message(int32_t whoami, const std::string &session_id, + uint64_t mask, const std::string &path); + void create_move_message(int32_t whoami, const std::string &session_id, + uint64_t mask, const std::string &src_path, + const std::string &dest_path); + void create_link_message(int32_t whoami, const std::string &session_id, + uint64_t target_mask, uint64_t link_mask, + const std::string &target_path, + const std::string &link_path); + void create_snap_message(int32_t whoami, const std::string &session_id, + uint64_t mask, const std::string &path, + const std::string &snapshot_name); +}; \ No newline at end of file diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index c766e9ef1f982..60adbe9de08cb 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -552,6 +552,8 @@ MDSRank::MDSRank( server = new Server(this, &metrics_handler); locker = new Locker(this, mdcache); + notification_manager = std::make_unique(this); + quiesce_db_manager.reset(new QuiesceDbManager()); _heartbeat_reset_grace = g_conf().get_val("mds_heartbeat_reset_grace"); @@ -1069,6 +1071,11 @@ bool MDSRank::_dispatch(const cref_t &m, bool new_msg) if (is_stale_message(m)) { return true; } + + if (notification_manager->dispatch(m) == 0) { + return true; + } + // do not proceed if this message cannot be handled if (!is_valid_message(m)) { return false; @@ -1174,6 +1181,17 @@ bool 
MDSRank::_dispatch(const cref_t &m, bool new_msg) return true; } +void MDSRank::send_to_peers(const ref_t &m) { + set up; + get_mds_map()->get_up_mds_set(up); + for (const auto &r : up) { + if (r == get_nodeid()) { + continue; + } + send_message_mds(m, r); + } +} + void MDSRank::update_mlogger() { if (mlogger) { @@ -2157,6 +2175,7 @@ void MDSRank::active_start() finish_contexts(g_ceph_context, waiting_for_active); // kick waiters quiesce_agent_setup(); + notification_manager->init(); } void MDSRank::recovery_done(int oldstate) @@ -3095,6 +3114,49 @@ void MDSRankDispatcher::handle_asok_command( } else if (command == "quiesce db") { command_quiesce_db(cmdmap, on_finish); return; + } else if (command == "add_topic") { + std::string endpoint_name; + std::string topic_name, broker, username; + std::string password; + bool use_ssl; + std::optional ca_location, mechanism; + cmd_getval(cmdmap, "topic_name", topic_name); + cmd_getval(cmdmap, "endpoint_name", endpoint_name); + cmd_getval(cmdmap, "broker", broker); + if (!cmd_getval(cmdmap, "use_ssl", use_ssl)) { + use_ssl = false; + } + cmd_getval(cmdmap, "username", username); + cmd_getval(cmdmap, "password", password); + std::string ca, mch; + if (cmd_getval(cmdmap, "ca_location", ca)) { + ca_location = ca; + } + if (cmd_getval(cmdmap, "mechanism", mch)) { + mechanism = mch; + } + r = notification_manager->add_kafka_topic( + topic_name, endpoint_name, broker, use_ssl, username, password, + ca_location, mechanism, true, true); + } else if (command == "remove_topic") { + std::string topic_name, endpoint_name; + cmd_getval(cmdmap, "topic_name", topic_name); + cmd_getval(cmdmap, "endpoint_name", endpoint_name); + r = notification_manager->remove_kafka_topic(topic_name, endpoint_name, + true, true); + } + else if (command == "add_udp_endpoint") { + std::string ip, name; + int64_t port; + cmd_getval(cmdmap, "entity", name); + cmd_getval(cmdmap, "ip", ip); + cmd_getval(cmdmap, "port", port); + r = notification_manager->add_udp_endpoint(name, ip, (int)port, true, true); + } + else if (command == "remove_udp_endpoint") { + std::string name; + cmd_getval(cmdmap, "entity", name); + r = notification_manager->remove_udp_endpoint(name, true, true); } else { r = -CEPHFS_ENOSYS; } diff --git a/src/mds/MDSRank.h b/src/mds/MDSRank.h index 9ea6ddd96d13b..d781346bd77fb 100644 --- a/src/mds/MDSRank.h +++ b/src/mds/MDSRank.h @@ -44,6 +44,8 @@ #include "MetricsHandler.h" #include "osdc/Journaler.h" #include "MDSMetaRequest.h" +#include "MDSNotificationManager.h" + // Full .h import instead of forward declaration for PerfCounter, for the // benefit of those including this header and using MDSRank::logger @@ -153,6 +155,7 @@ class ScrubStack; class C_ExecAndReply; class QuiesceDbManager; class QuiesceAgent; +class MDSNotificationManager; /** * The public part of this class's interface is what's exposed to all @@ -198,6 +201,7 @@ class MDSRank { } bool is_daemon_stopping() const; + void send_to_peers(const ref_t& m); MDSTableClient *get_table_client(int t); MDSTableServer *get_table_server(int t); @@ -424,6 +428,7 @@ class MDSRank { SnapServer *snapserver = nullptr; SnapClient *snapclient = nullptr; + std::unique_ptr notification_manager; SessionMap sessionmap; diff --git a/src/mds/MDSUDPEndpoint.cc b/src/mds/MDSUDPEndpoint.cc new file mode 100644 index 0000000000000..32260a4d1111c --- /dev/null +++ b/src/mds/MDSUDPEndpoint.cc @@ -0,0 +1,257 @@ +#include "MDSUDPEndpoint.h" +#include "include/fs_types.h" + +#define dout_subsys ceph_subsys_mds + 
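MDSUDPEndpoint, defined below, frames every notification as a single datagram: an 8-byte message length in the sender's native byte order immediately followed by the JSON body (see MDSUDPManager::send and publish_internal further down). A minimal sketch of a matching receiver, assuming plain POSIX sockets and a hypothetical port that must agree with the registered endpoint:

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

int main() {
  int fd = socket(AF_INET, SOCK_DGRAM, 0);
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(8080); // hypothetical; must match the endpoint's port
  if (fd < 0 ||
      bind(fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) < 0) {
    return 1;
  }
  std::vector<char> buf(65536); // one datagram carries one whole notification
  while (true) {
    ssize_t n = recvfrom(fd, buf.data(), buf.size(), 0, nullptr, nullptr);
    if (n <= static_cast<ssize_t>(sizeof(uint64_t))) {
      continue; // too short to carry the length prefix
    }
    uint64_t len;
    std::memcpy(&len, buf.data(), sizeof(len)); // sender uses native endianness
    if (len > static_cast<uint64_t>(n) - sizeof(len)) {
      continue; // truncated datagram
    }
    std::string json(buf.data() + sizeof(len), len);
    std::cout << json << "\n"; // e.g. {"timestamp":...,"mask":...,"path":...}
  }
}

Since the length prefix is not byte-swapped on the wire, a receiver on a different-endian host would need to swap it; in practice sender and consumer are assumed to share the architecture.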
+MDSUDPConnection::MDSUDPConnection(const std::string &ip, int port)
+    : ip(ip), port(port) {}
+
+void MDSUDPConnection::encode(ceph::buffer::list &bl) const {
+  ENCODE_START(1, 1, bl);
+  encode(ip, bl);
+  encode(port, bl);
+  ENCODE_FINISH(bl);
+}
+
+void MDSUDPConnection::dump(ceph::Formatter *f) const {
+  f->dump_string("ip", ip);
+  f->dump_int("port", port);
+}
+
+void MDSUDPConnection::generate_test_instances(
+    std::list<MDSUDPConnection *> &o) {
+  o.push_back(new MDSUDPConnection);
+}
+
+void MDSUDPConnection::decode(ceph::buffer::list::const_iterator &iter) {
+  DECODE_START(1, iter);
+  decode(ip, iter);
+  decode(port, iter);
+  DECODE_FINISH(iter);
+}
+
+int MDSUDPManager::load_data(std::map<std::string, bufferlist> &mp) {
+  int r = update_omap(std::map<std::string, bufferlist>());
+  if (r < 0) {
+    return r;
+  }
+  C_SaferCond sync_finisher;
+  ObjectOperation op;
+  op.omap_get_vals("", "", UINT_MAX, &mp, NULL, NULL);
+  mds->objecter->read(object_t(object_name),
+                      object_locator_t(mds->get_metadata_pool()), op,
+                      CEPH_NOSNAP, NULL, 0, &sync_finisher);
+  r = sync_finisher.wait();
+  if (r < 0) {
+    lderr(mds->cct) << "Error reading omap values from object '" << object_name
+                    << "':" << cpp_strerror(r) << dendl;
+  }
+  return r;
+}
+
+int MDSUDPManager::update_omap(const std::map<std::string, bufferlist> &mp) {
+  C_SaferCond sync_finisher;
+  ObjectOperation op;
+  op.omap_set(mp);
+  mds->objecter->mutate(
+      object_t(object_name), object_locator_t(mds->get_metadata_pool()), op,
+      SnapContext(), ceph::real_clock::now(), 0, &sync_finisher);
+  int r = sync_finisher.wait();
+  if (r < 0) {
+    lderr(mds->cct) << "Error updating omap of object '" << object_name
+                    << "':" << cpp_strerror(r) << dendl;
+  }
+  return r;
+}
+
+int MDSUDPManager::remove_keys(const std::set<std::string> &st) {
+  C_SaferCond sync_finisher;
+  ObjectOperation op;
+  op.omap_rm_keys(st);
+  mds->objecter->mutate(
+      object_t(object_name), object_locator_t(mds->get_metadata_pool()), op,
+      SnapContext(), ceph::real_clock::now(), 0, &sync_finisher);
+  int r = sync_finisher.wait();
+  if (r < 0) {
+    lderr(mds->cct) << "Error removing keys from omap of object '"
+                    << object_name << "':" << cpp_strerror(r) << dendl;
+  }
+  return r;
+}
+
+int MDSUDPManager::add_endpoint_into_disk(const std::string &name,
+                                          const MDSUDPConnection &connection) {
+  std::map<std::string, bufferlist> mp;
+  bufferlist bl;
+  encode(connection, bl);
+  mp[name] = std::move(bl);
+  int r = update_omap(mp);
+  return r;
+}
+
+int MDSUDPManager::remove_endpoint_from_disk(const std::string &name) {
+  std::set<std::string> st;
+  st.insert(name);
+  int r = remove_keys(st);
+  return r;
+}
+
+MDSUDPManager::MDSUDPManager(MDSRank *mds)
+    : mds(mds), cct(mds->cct), object_name("mds_udp_endpoints"), paused(true) {}
+
+int MDSUDPManager::init() {
+  std::map<std::string, bufferlist> mp;
+  int r = load_data(mp);
+  if (r < 0) {
+    lderr(cct) << "Error occurred while initializing UDP endpoints" << dendl;
+    return r;
+  }
+  for (auto &[key, val] : mp) {
+    try {
+      MDSUDPConnection connection;
+      auto iter = val.cbegin();
+      decode(connection, iter);
+      add_endpoint(key, connection, false);
+    } catch (const ceph::buffer::error &e) {
+      ldout(cct, 1)
+          << "No value exists in the omap of object 'mds_udp_endpoints' "
+             "for udp entity name '"
+          << key << "'" << dendl;
+    }
+  }
+  return r;
+}
+
+int MDSUDPManager::send(
+    const std::shared_ptr<MDSNotificationMessage> &message) {
+  if (paused) {
+    return -CEPHFS_ECANCELED;
+  }
+  std::shared_lock lock(endpoint_mutex);
+  std::vector<boost::asio::const_buffer> buf(2);
+  for (auto &[key, endpoint] : endpoints) {
+    uint64_t len = message->message.length();
+    buf[0] = boost::asio::buffer(&len, sizeof(len));
+    buf[1] = boost::asio::buffer(message->message.c_str(),
message->message.length()); + endpoint->publish_internal(buf, message->seq_id); + } + return 0; +} + +int MDSUDPManager::add_endpoint(const std::string &name, + const MDSUDPConnection &connection, + bool write_into_disk) { + std::unique_lock lock(endpoint_mutex); + std::shared_ptr new_endpoint; + auto it = endpoints.find(name); + int r = 0; + if (it == endpoints.end() && endpoints.size() >= MAX_CONNECTIONS_DEFAULT) { + ldout(cct, 1) << "UDP connect: max connections exceeded" << dendl; + r = -CEPHFS_ENOMEM; + goto error_occurred; + } + new_endpoint = MDSUDPEndpoint::create(cct, name, connection); + if (!new_endpoint) { + ldout(cct, 1) << "UDP connect: udp endpoint creation failed" << dendl; + r = -CEPHFS_ECANCELED; + goto error_occurred; + } + endpoints[name] = new_endpoint; + if (write_into_disk) { + r = add_endpoint_into_disk(name, connection); + if (r < 0) { + goto error_occurred; + } + } + ldout(cct, 1) << "UDP endpoint with entity name '" << name + << "' is added successfully" << dendl; + activate(); + return r; +error_occurred: + lderr(cct) << "UDP endpoint with entity name '" << name + << "' can not be added, failed with an error:" << cpp_strerror(r) + << dendl; + return r; +} + +void MDSUDPManager::activate() { + paused = false; +} + +int MDSUDPManager::remove_endpoint(const std::string &name, + bool write_into_disk) { + std::unique_lock lock(endpoint_mutex); + int r = 0; + auto it = endpoints.find(name); + if (it != endpoints.end()) { + endpoints.erase(it); + if (write_into_disk) { + r = remove_endpoint_from_disk(name); + } + if (r == 0) { + ldout(cct, 1) << "UDP endpoint with entity name '" << name + << "' is removed successfully" << dendl; + } else { + lderr(cct) << "UDP endpoint '" << name + << "' can not be removed, failed with an error:" + << cpp_strerror(r) << dendl; + } + if (endpoints.empty()) { + pause(); + } + return r; + } + ldout(cct, 1) << "No UDP endpoint exist with entity name '" << name << "'" + << dendl; + return -CEPHFS_EINVAL; +} + +void MDSUDPManager::pause() { + paused = true; +} + +MDSUDPEndpoint::MDSUDPEndpoint(CephContext *cct, const std::string &name, + const MDSUDPConnection &connection) + : cct(cct), name(name), socket(io_context), connection(connection), + endpoint(boost::asio::ip::address::from_string(connection.ip), + connection.port) { + try { + boost::system::error_code ec; + socket.open(boost::asio::ip::udp::v4(), ec); + if (ec) { + throw std::runtime_error(ec.message()); + } + } catch (const std::exception &e) { + lderr(cct) << "Error occurred while opening UDP socket with error:" + << e.what() << dendl; + throw; + } +} + +std::shared_ptr +MDSUDPEndpoint::create(CephContext *cct, const std::string &name, + const MDSUDPConnection &connection) { + try { + std::shared_ptr endpoint = + std::make_shared(cct, name, connection); + return endpoint; + } catch (...) 
{ + } + return nullptr; +} + +int MDSUDPEndpoint::publish_internal( + std::vector &buf, uint64_t seq_id) { + boost::system::error_code ec; + socket.send_to(buf, endpoint, 0, ec); + if (ec) { + ldout(cct, 1) << "Error occurred while sending notification having seq_id=" + << seq_id << ":" << ec.message() << dendl; + return -ec.value(); + } else { + ldout(cct, 20) << "Notification having seq_id=" << seq_id << " delivered" + << dendl; + } + return 0; +} \ No newline at end of file diff --git a/src/mds/MDSUDPEndpoint.h b/src/mds/MDSUDPEndpoint.h new file mode 100644 index 0000000000000..0b16696d8c2dd --- /dev/null +++ b/src/mds/MDSUDPEndpoint.h @@ -0,0 +1,67 @@ +#pragma once + +#include "MDSNotificationMessage.h" +#include "MDSRank.h" +#include + +class MDSUDPEndpoint; + +struct MDSUDPConnection { + std::string ip; + int port; + MDSUDPConnection() = default; + MDSUDPConnection(const std::string &ip, int port); + void encode(ceph::buffer::list &bl) const; + void decode(ceph::buffer::list::const_iterator &iter); + void dump(ceph::Formatter *f) const; + static void generate_test_instances(std::list &o); +}; +WRITE_CLASS_ENCODER(MDSUDPConnection) + +class MDSUDPManager { +public: + MDSUDPManager(MDSRank *mds); + int init(); + int send(const std::shared_ptr &message); + int add_endpoint(const std::string &name, const MDSUDPConnection &connection, + bool write_into_disk); + int remove_endpoint(const std::string &name, bool write_into_disk); + +private: + int load_data(std::map &mp); + int add_endpoint_into_disk(const std::string &name, + const MDSUDPConnection &connection); + int remove_endpoint_from_disk(const std::string &name); + int update_omap(const std::map &mp); + int remove_keys(const std::set &st); + void activate(); + void pause(); + CephContext *cct; + std::shared_mutex endpoint_mutex; + std::unordered_map> endpoints; + static const size_t MAX_CONNECTIONS_DEFAULT = 256; + MDSRank *mds; + std::string object_name; + std::atomic paused; +}; + +class MDSUDPEndpoint { +public: + MDSUDPEndpoint() = delete; + MDSUDPEndpoint(CephContext *cct, const std::string &name, + const MDSUDPConnection &connection); + int publish_internal(std::vector &buf, + uint64_t seq_id); + static std::shared_ptr + create(CephContext *cct, const std::string &name, + const MDSUDPConnection &connection); + friend class MDSUDPManager; + +private: + std::string name; + MDSUDPConnection connection; + boost::asio::io_context io_context; + boost::asio::ip::udp::socket socket; + boost::asio::ip::udp::endpoint endpoint; + CephContext *cct; +}; \ No newline at end of file diff --git a/src/mds/Server.cc b/src/mds/Server.cc index bd8e1f11b1cfc..4fa74f58a51d9 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -32,6 +32,10 @@ #include "MetricsHandler.h" #include "cephfs_features.h" #include "MDSContext.h" +#ifdef WITH_CEPHFS_NOTIFICATION +#include "messages/MNotificationInfoKafkaTopic.h" +#include "messages/MNotificationInfoUDPEndpoint.h" +#endif #include "msg/Messenger.h" @@ -245,6 +249,21 @@ void Server::create_logger() plb.add_time_avg(l_mdss_req_snapdiff_latency, "req_snapdiff_latency", "Request type snapshot difference latency"); +#ifdef WITH_CEPHFS_NOTIFICATION + plb.add_time_avg(l_mdss_req_add_kafka_topic_latency, + "req_add_kafka_topic_latency", + "Request type add kafka topic latency"); + plb.add_time_avg(l_mdss_req_remove_kafka_topic_latency, + "req_remove_kafka_topic_latency", + "Request type remove kafka topic latency"); + plb.add_time_avg(l_mdss_req_add_udp_endpoint_latency, + "req_add_udp_endpoint_latency", + 
"Request type add udp endpoint latency"); + plb.add_time_avg(l_mdss_req_remove_udp_endpoint_latency, + "req_remove_udp_endpoint_latency", + "Request type remove udp endpoint latency"); +#endif + plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY); plb.add_u64_counter(l_mdss_dispatch_client_request, "dispatch_client_request", "Client requests dispatched"); @@ -275,6 +294,19 @@ Server::Server(MDSRank *m, MetricsHandler *metrics_handler) : bal_fragment_size_max = g_conf().get_val("mds_bal_fragment_size_max"); supported_features = feature_bitset_t(CEPHFS_FEATURES_MDS_SUPPORTED); supported_metric_spec = feature_bitset_t(CEPHFS_METRIC_FEATURES_ALL); + // connection_t conn("localhost:9093", true, "admin", "admin-secret", + // std::nullopt, std::nullopt); + // MDSAsyncNotificationManager::create(mds->cct); + // MDSSyncNotificationManager::create(mds->cct); + // notification_manager = std::make_unique(mds); + // topic_ptr = MDSKafkaTopic::create( + // "my-topic", mds->cct, + // connection_t("localhost:9093", true, "admin", "admin-secret", + // std::optional( + // "/home/sajibreadd/croit/certs-kafka/ca-cert"), + // std::optional("PLAIN"))); + // udp_sender = + // MDSUDPNotificationSender::create("udp", mds->cct, "127.0.0.1", 8080); } void Server::dispatch(const cref_t &m) @@ -2179,6 +2211,22 @@ void Server::perf_gather_op_latency(const cref_t &req, utime_t l case CEPH_MDS_OP_READDIR_SNAPDIFF: code = l_mdss_req_snapdiff_latency; break; + +#ifdef WITH_CEPHFS_NOTIFICATION + case CEPH_MDS_OP_ADD_KAFKA_TOPIC: + code = l_mdss_req_add_kafka_topic_latency; + break; + case CEPH_MDS_OP_REMOVE_KAFKA_TOPIC: + code = l_mdss_req_remove_kafka_topic_latency; + break; + case CEPH_MDS_OP_ADD_UDP_ENDPOINT: + code = l_mdss_req_add_udp_endpoint_latency; + break; + case CEPH_MDS_OP_REMOVE_UDP_ENDPOINT: + code = l_mdss_req_remove_udp_endpoint_latency; + break; +#endif + default: dout(1) << ": unknown client op" << dendl; return; @@ -2827,6 +2875,21 @@ void Server::dispatch_client_request(const MDRequestRef& mdr) case CEPH_MDS_OP_READDIR_SNAPDIFF: handle_client_readdir_snapdiff(mdr); break; +#ifdef WITH_CEPHFS_NOTIFICATION + // notifications + case CEPH_MDS_OP_ADD_KAFKA_TOPIC: + handle_client_add_kafka_topic(mdr); + break; + case CEPH_MDS_OP_REMOVE_KAFKA_TOPIC: + handle_client_remove_kafka_topic(mdr); + break; + case CEPH_MDS_OP_ADD_UDP_ENDPOINT: + handle_client_add_udp_endpoint(mdr); + break; + case CEPH_MDS_OP_REMOVE_UDP_ENDPOINT: + handle_client_remove_udp_endpoint(mdr); + break; +#endif default: dout(1) << " unknown client op " << req->get_op() << dendl; @@ -4594,6 +4657,10 @@ void Server::handle_client_open(const MDRequestRef& mdr) if (cmode & CEPH_FILE_MODE_WR) mds->locker->check_inode_max_size(cur); + mds->notification_manager->push_notification( + mds->get_nodeid(), cur, CEPH_MDS_NOTIFY_OPEN | CEPH_MDS_NOTIFY_ACCESS, + true, cur->is_dir()); + // make sure this inode gets into the journal if (cur->is_auth() && cur->last == CEPH_NOSNAP && mdcache->open_file_table.should_log_open(cur)) { @@ -4809,6 +4876,10 @@ void Server::handle_client_openc(const MDRequestRef& mdr) set_reply_extra_bl(req, _inode->ino, mdr->reply_extra_bl); + mds->notification_manager->push_notification( + mds->get_nodeid(), newi, CEPH_MDS_NOTIFY_CREATE | CEPH_MDS_NOTIFY_OPEN, + true, newi->is_dir()); + journal_and_reply(mdr, newi, dn, le, fin); // We hit_dir (via hit_inode) in our finish callback, but by then we might @@ -5486,7 +5557,11 @@ void Server::handle_client_setattr(const MDRequestRef& mdr) 
le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid()); mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY); mdcache->journal_dirty_inode(mdr.get(), &le->metablob, cur); - + + mds->notification_manager->push_notification( + mds->get_nodeid(), cur, + CEPH_MDS_NOTIFY_SET_ATTRIB | CEPH_MDS_NOTIFY_ACCESS, true, cur->is_dir()); + journal_and_reply(mdr, cur, 0, le, new C_MDS_inode_update_finish(this, mdr, cur, truncating_smaller, changed_ranges)); @@ -5551,6 +5626,11 @@ void Server::do_open_truncate(const MDRequestRef& mdr, int cmode) dn = mdr->dn[0].back(); } + mds->notification_manager->push_notification(mds->get_nodeid(), in, + CEPH_MDS_NOTIFY_MODIFY | + CEPH_MDS_NOTIFY_ACCESS | + CEPH_MDS_NOTIFY_OPEN, true, in->is_dir()); + journal_and_reply(mdr, in, dn, le, new C_MDS_inode_update_finish(this, mdr, in, old_size > 0, changed_ranges)); // Although the `open` part can give an early reply, the truncation won't @@ -5639,7 +5719,9 @@ void Server::handle_client_setlayout(const MDRequestRef& mdr) le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid()); mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY); mdcache->journal_dirty_inode(mdr.get(), &le->metablob, cur); - + mds->notification_manager->push_notification( + mds->get_nodeid(), cur, CEPH_MDS_NOTIFY_SET_LAYOUT, true, cur->is_dir()); + journal_and_reply(mdr, cur, 0, le, new C_MDS_inode_update_finish(this, mdr, cur)); } @@ -5756,6 +5838,10 @@ void Server::handle_client_setdirlayout(const MDRequestRef& mdr) mdcache->journal_dirty_inode(mdr.get(), &le->metablob, cur); mdr->no_early_reply = true; + + mds->notification_manager->push_notification( + mds->get_nodeid(), cur, CEPH_MDS_NOTIFY_SET_LAYOUT, true, cur->is_dir()); + journal_and_reply(mdr, cur, 0, le, new C_MDS_inode_update_finish(this, mdr, cur)); } @@ -6728,6 +6814,9 @@ void Server::handle_client_setxattr(const MDRequestRef& mdr) mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY); mdcache->journal_dirty_inode(mdr.get(), &le->metablob, cur); + mds->notification_manager->push_notification( + mds->get_nodeid(), cur, CEPH_MDS_NOTIFY_SET_XATTRIB, true, cur->is_dir()); + journal_and_reply(mdr, cur, 0, le, new C_MDS_inode_update_finish(this, mdr, cur)); } @@ -6796,6 +6885,8 @@ void Server::handle_client_removexattr(const MDRequestRef& mdr) le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid()); mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY); mdcache->journal_dirty_inode(mdr.get(), &le->metablob, cur); + mds->notification_manager->push_notification( + mds->get_nodeid(), cur, CEPH_MDS_NOTIFY_REM_XATTRIB, true, cur->is_dir()); journal_and_reply(mdr, cur, 0, le, new C_MDS_inode_update_finish(this, mdr, cur)); } @@ -7113,6 +7204,11 @@ void Server::handle_client_mknod(const MDRequestRef& mdr) PREDIRTY_PRIMARY|PREDIRTY_DIR, 1); le->metablob.add_primary_dentry(dn, newi, true, true, true); + mds->notification_manager->push_notification(mds->get_nodeid(), newi, + CEPH_MDS_NOTIFY_CREATE | + CEPH_MDS_NOTIFY_SET_ATTRIB, + true, newi->is_dir()); + journal_and_reply(mdr, newi, dn, le, new C_MDS_mknod_finish(this, mdr, dn, newi)); mds->balancer->maybe_fragment(dn->get_dir(), false); } @@ -7204,6 +7300,9 @@ void Server::handle_client_mkdir(const MDRequestRef& mdr) // make sure this inode gets into the journal le->metablob.add_opened_ino(newi->ino()); + mds->notification_manager->push_notification( + mds->get_nodeid(), newi, 
CEPH_MDS_NOTIFY_CREATE, true, newi->is_dir()); + journal_and_reply(mdr, newi, dn, le, new C_MDS_mknod_finish(this, mdr, dn, newi)); // We hit_dir (via hit_inode) in our finish callback, but by then we might @@ -7267,6 +7366,9 @@ void Server::handle_client_symlink(const MDRequestRef& mdr) mdcache->predirty_journal_parents(mdr, &le->metablob, newi, dn->get_dir(), PREDIRTY_PRIMARY|PREDIRTY_DIR, 1); le->metablob.add_primary_dentry(dn, newi, true, true); + mds->notification_manager->push_notification( + mds->get_nodeid(), newi, CEPH_MDS_NOTIFY_CREATE, true, newi->is_dir()); + journal_and_reply(mdr, newi, dn, le, new C_MDS_mknod_finish(this, mdr, dn, newi)); mds->balancer->maybe_fragment(dir, false); @@ -7398,6 +7500,10 @@ void Server::handle_client_link(const MDRequestRef& mdr) // go! ceph_assert(g_conf()->mds_kill_link_at != 1); + mds->notification_manager->push_notification_link( + mds->get_nodeid(), targeti, destdn, CEPH_MDS_NOTIFY_SET_ATTRIB, + CEPH_MDS_NOTIFY_CREATE, targeti->is_dir()); + // local or remote? if (targeti->is_auth()) _link_local(mdr, destdn, targeti, target_realm); @@ -8127,6 +8233,10 @@ void Server::handle_client_unlink(const MDRequestRef& mdr) if (!rmdir && dnl->is_primary() && mdr->dn[0].size() == 1) mds->locker->create_lock_cache(mdr, diri); + mds->notification_manager->push_notification_link( + mds->get_nodeid(), in, dn, CEPH_MDS_NOTIFY_SET_ATTRIB, + CEPH_MDS_NOTIFY_DELETE, in->is_dir()); + // ok! if (dnl->is_remote() && !dnl->get_inode()->is_auth()) _link_remote(mdr, false, dn, dnl->get_inode()); @@ -9156,6 +9266,9 @@ void Server::handle_client_rename(const MDRequestRef& mdr) // -- commit locally -- C_MDS_rename_finish *fin = new C_MDS_rename_finish(this, mdr, srcdn, destdn, straydn); + mds->notification_manager->push_notification_move(mds->get_nodeid(), srcdn, + destdn, srci->is_dir()); + journal_and_reply(mdr, srci, destdn, le, fin); mds->balancer->maybe_fragment(destdn->get_dir(), false); } @@ -11125,6 +11238,10 @@ void Server::handle_client_mksnap(const MDRequestRef& mdr) mdcache->predirty_journal_parents(mdr, &le->metablob, diri, 0, PREDIRTY_PRIMARY, false); mdcache->journal_dirty_inode(mdr.get(), &le->metablob, diri); + mds->notification_manager->push_notification_snap( + mds->get_nodeid(), diri, std::string(snapname), + CEPH_MDS_NOTIFY_CREATE | CEPH_MDS_NOTIFY_SET_ATTRIB, diri->is_dir()); + // journal the snaprealm changes submit_mdlog_entry(le, new C_MDS_mksnap_finish(this, mdr, diri, info), mdr, __func__); @@ -11258,6 +11375,10 @@ void Server::handle_client_rmsnap(const MDRequestRef& mdr) mdcache->predirty_journal_parents(mdr, &le->metablob, diri, 0, PREDIRTY_PRIMARY, false); mdcache->journal_dirty_inode(mdr.get(), &le->metablob, diri); + mds->notification_manager->push_notification_snap( + mds->get_nodeid(), diri, std::string(snapname), + CEPH_MDS_NOTIFY_DELETE | CEPH_MDS_NOTIFY_SET_ATTRIB, diri->is_dir()); + submit_mdlog_entry(le, new C_MDS_rmsnap_finish(this, mdr, diri, snapid), mdr, __func__); mdlog->flush(); @@ -11875,3 +11996,81 @@ bool Server::build_snap_diff( } return it == dir->end(); } + +#ifdef WITH_CEPHFS_NOTIFICATION + +// FIXME handling user rights +void Server::handle_client_add_kafka_topic(const MDRequestRef &mdr) { + const cref_t &req = mdr->client_request; + KafkaTopicPayload payload; + if (req->get_data().length()) { + try { + auto iter = req->get_data().cbegin(); + decode(payload, iter); + } catch (const ceph::buffer::error &e) { + dout(1) << ": no data in kafka topic payload" << dendl; + respond_to_request(mdr, -CEPHFS_EINVAL); + return; + 
} + } + int r = mds->notification_manager->add_kafka_topic( + payload.topic_name, payload.endpoint_name, payload.broker, + payload.use_ssl, payload.user, payload.password, payload.ca_location, + payload.mechanism, true, true); + respond_to_request(mdr, r); +} + +void Server::handle_client_remove_kafka_topic(const MDRequestRef &mdr) { + const cref_t &req = mdr->client_request; + KafkaTopicPayload payload; + if (req->get_data().length()) { + try { + auto iter = req->get_data().cbegin(); + decode(payload, iter); + } catch (const ceph::buffer::error &e) { + dout(1) << ": no data in kafka topic payload" << dendl; + respond_to_request(mdr, -CEPHFS_EINVAL); + return; + } + } + int r = mds->notification_manager->remove_kafka_topic( + payload.topic_name, payload.endpoint_name, true, true); + respond_to_request(mdr, r); +} + +void Server::handle_client_add_udp_endpoint(const MDRequestRef &mdr) { + const cref_t &req = mdr->client_request; + UDPEndpointPayload payload; + if (req->get_data().length()) { + try { + auto iter = req->get_data().cbegin(); + decode(payload, iter); + } catch (const ceph::buffer::error &e) { + dout(1) << ": no data in udp endpoint payload" << dendl; + respond_to_request(mdr, -CEPHFS_EINVAL); + return; + } + } + int r = mds->notification_manager->add_udp_endpoint(payload.name, payload.ip, + payload.port, true, true); + respond_to_request(mdr, r); +} + +void Server::handle_client_remove_udp_endpoint(const MDRequestRef &mdr) { + const cref_t &req = mdr->client_request; + UDPEndpointPayload payload; + if (req->get_data().length()) { + try { + auto iter = req->get_data().cbegin(); + decode(payload, iter); + } catch (const ceph::buffer::error &e) { + dout(1) << ": no data in udp endpoint payload" << dendl; + respond_to_request(mdr, -CEPHFS_EINVAL); + return; + } + } + int r = + mds->notification_manager->remove_udp_endpoint(payload.name, true, true); + respond_to_request(mdr, r); +} +#endif diff --git a/src/mds/Server.h b/src/mds/Server.h index 031242efa4ed7..6e572cb30a455 100644 --- a/src/mds/Server.h +++ b/src/mds/Server.h @@ -37,6 +37,7 @@ using namespace std::literals::string_view_literals; #include "Mutation.h" #include "MDSContext.h" +// class MDSNotificationManager; class OSDMap; class LogEvent; class EMetaBlob; @@ -84,6 +85,14 @@ enum { l_mdss_cap_revoke_eviction, l_mdss_cap_acquisition_throttle, l_mdss_req_getvxattr_latency, + +#ifdef WITH_CEPHFS_NOTIFICATION + l_mdss_req_add_kafka_topic_latency, + l_mdss_req_remove_kafka_topic_latency, + l_mdss_req_add_udp_endpoint_latency, + l_mdss_req_remove_udp_endpoint_latency, +#endif + l_mdss_last, }; @@ -122,6 +131,13 @@ public: return last_recall_state; } +#ifdef WITH_CEPHFS_NOTIFICATION + void handle_client_add_kafka_topic(const MDRequestRef& mdr); + void handle_client_remove_kafka_topic(const MDRequestRef& mdr); + void handle_client_add_udp_endpoint(const MDRequestRef& mdr); + void handle_client_remove_udp_endpoint(const MDRequestRef& mdr); +#endif + void handle_client_session(const cref_t &m); void _session_logged(Session *session, uint64_t state_seq, bool open, version_t pv, const interval_set& inos_to_free, version_t piv, diff --git a/src/messages/MClientRequest.h b/src/messages/MClientRequest.h index cb9a888b0bed5..2a0ebbab4f6e6 100644 --- a/src/messages/MClientRequest.h +++ b/src/messages/MClientRequest.h @@ -75,6 +75,114 @@ struct SnapPayload { WRITE_CLASS_ENCODER(SnapPayload) +struct NotificationEndpointPayload { + virtual void encode(ceph::buffer::list &bl) const = 0; + virtual void 
decode(ceph::buffer::list::const_iterator &iter) = 0; + virtual void dump(ceph::Formatter *f) const = 0; + virtual ~NotificationEndpointPayload() {} +}; + +struct KafkaTopicPayload final : public NotificationEndpointPayload { + std::string topic_name; + std::string endpoint_name; + std::string broker; + bool use_ssl = false; + std::string user, password; + std::optional ca_location; + std::optional mechanism; + KafkaTopicPayload() {} + KafkaTopicPayload(const std::string &topic_name, + const std::string &endpoint_name, const std::string &broker, + bool use_ssl, const std::string &user, + const std::string &password, + const std::optional &ca_location, + const std::optional &mechanism) + : topic_name(topic_name), endpoint_name(endpoint_name), broker(broker), + use_ssl(use_ssl), user(user), password(password), + ca_location(ca_location), mechanism(mechanism) {} + KafkaTopicPayload(const std::string &topic_name, + const std::string &endpoint_name) + : topic_name(topic_name), endpoint_name(endpoint_name) {} + void encode(ceph::buffer::list &bl) const { + ENCODE_START(1, 1, bl); + encode(topic_name, bl); + encode(endpoint_name, bl); + encode(broker, bl); + encode(use_ssl, bl); + encode(user, bl); + encode(password, bl); + encode(ca_location, bl); + encode(mechanism, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator &iter) { + DECODE_START(1, iter); + decode(topic_name, iter); + decode(endpoint_name, iter); + decode(broker, iter); + decode(use_ssl, iter); + decode(user, iter); + decode(password, iter); + decode(ca_location, iter); + decode(mechanism, iter); + DECODE_FINISH(iter); + } + void dump(ceph::Formatter *f) const { + f->dump_string("topic_name", topic_name); + f->dump_string("endpoint_name", endpoint_name); + f->dump_string("broker", broker); + f->dump_bool("use_ssl", use_ssl); + f->dump_string("user", user); + f->dump_string("password", password); + if (ca_location.has_value()) { + f->dump_string("ca_location", ca_location.value()); + } + if (mechanism.has_value()) { + f->dump_string("mechanism", mechanism.value()); + } + } + static void generate_test_instances(std::list &o) { + o.push_back(new KafkaTopicPayload); + } +}; + +WRITE_CLASS_ENCODER(KafkaTopicPayload) + +struct UDPEndpointPayload final : public NotificationEndpointPayload { + std::string name; + std::string ip; + int port = -1; + UDPEndpointPayload() {} + UDPEndpointPayload(const std::string &name, const std::string &ip, int port) + : name(name), ip(ip), port(port) { + } + UDPEndpointPayload(const std::string &name) : name(name) {} + void encode(ceph::buffer::list &bl) const { + ENCODE_START(1, 1, bl); + encode(name, bl); + encode(ip, bl); + encode(port, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator &iter) { + DECODE_START(1, iter); + decode(name, iter); + decode(ip, iter); + decode(port, iter); + DECODE_FINISH(iter); + } + void dump(ceph::Formatter *f) const { + f->dump_string("name", name); + f->dump_string("ip", ip); + f->dump_int("port", port); + } + static void generate_test_instances(std::list &o) { + o.push_back(new UDPEndpointPayload); + } +}; + +WRITE_CLASS_ENCODER(UDPEndpointPayload) + // metadata ops. 
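Both payload structs above travel in the data blob of a client request rather than in dedicated message fields, so each side round-trips them through the standard encoder. A minimal sketch under the definitions above (the function name and endpoint values are hypothetical):

#include "include/buffer.h"
#include "include/ceph_assert.h"
#include "messages/MClientRequest.h" // brings in UDPEndpointPayload

void udp_payload_roundtrip_example() {
  UDPEndpointPayload out("ep1", "127.0.0.1", 8080);
  ceph::buffer::list bl;
  encode(out, bl); // free function generated by WRITE_CLASS_ENCODER above

  UDPEndpointPayload in;
  auto it = bl.cbegin();
  decode(in, it);  // versioned DECODE_START/DECODE_FINISH handles compat
  ceph_assert(in.name == out.name && in.ip == out.ip && in.port == out.port);
}

The MDS side performs the same decode out of the request's data blob in Server::handle_client_add_udp_endpoint; the metadata-ops request class below is what carries that blob.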
class MClientRequest final : public MMDSOp { diff --git a/src/messages/MNotificationInfoKafkaTopic.h b/src/messages/MNotificationInfoKafkaTopic.h new file mode 100644 index 0000000000000..1c130fff5950f --- /dev/null +++ b/src/messages/MNotificationInfoKafkaTopic.h @@ -0,0 +1,80 @@ +#pragma once +#include "messages/MMDSOp.h" + +class MNotificationInfoKafkaTopic : public MMDSOp { + static constexpr int HEAD_VERSION = 1; + static constexpr int COMPAT_VERSION = 1; + +public: + std::string topic_name; + std::string endpoint_name; + std::string broker; + bool use_ssl; + std::string user, password; + std::optional ca_location; + std::optional mechanism; + bool is_remove; + +protected: + MNotificationInfoKafkaTopic() + : MMDSOp(MSG_MDS_NOTIFICATION_INFO_KAFKA_TOPIC, HEAD_VERSION, + COMPAT_VERSION) {} + MNotificationInfoKafkaTopic(const std::string &topic_name, + const std::string &endpoint_name, bool is_remove) + : MMDSOp(MSG_MDS_NOTIFICATION_INFO_KAFKA_TOPIC, HEAD_VERSION, + COMPAT_VERSION), + topic_name(topic_name), endpoint_name(endpoint_name), + is_remove(is_remove) {} + MNotificationInfoKafkaTopic(const std::string &topic_name, + const std::string &endpoint_name, + const std::string &broker, bool use_ssl, + const std::string &user, + const std::string &password, + const std::optional &ca_location, + const std::optional &mechanism, + bool is_remove) + : MMDSOp(MSG_MDS_NOTIFICATION_INFO_KAFKA_TOPIC, HEAD_VERSION, + COMPAT_VERSION), + topic_name(topic_name), endpoint_name(endpoint_name), broker(broker), + use_ssl(use_ssl), user(user), password(password), + ca_location(ca_location), mechanism(mechanism), is_remove(is_remove) {} + ~MNotificationInfoKafkaTopic() final {} + +public: + std::string_view get_type_name() const override { return "mdskafka_topic"; } + + void print(std::ostream &out) const override { out << "mdskafka_topic"; } + + void encode_payload(uint64_t features) override { + using ceph::encode; + encode(topic_name, payload); + encode(endpoint_name, payload); + encode(broker, payload); + encode(use_ssl, payload); + encode(user, payload); + encode(password, payload); + encode(ca_location, payload); + encode(mechanism, payload); + encode(is_remove, payload); + } + + void decode_payload() override { + using ceph::decode; + auto p = payload.cbegin(); + decode(topic_name, p); + decode(endpoint_name, p); + decode(broker, p); + decode(use_ssl, p); + decode(user, p); + decode(password, p); + decode(ca_location, p); + decode(mechanism, p); + decode(is_remove, p); + } + +private: + template + friend boost::intrusive_ptr ceph::make_message(Args &&...args); + template + friend MURef crimson::make_message(Args &&...args); +}; \ No newline at end of file diff --git a/src/messages/MNotificationInfoUDPEndpoint.h b/src/messages/MNotificationInfoUDPEndpoint.h new file mode 100644 index 0000000000000..44cd5c2d022c7 --- /dev/null +++ b/src/messages/MNotificationInfoUDPEndpoint.h @@ -0,0 +1,60 @@ +#pragma once +#include "messages/MMDSOp.h" + +class MNotificationInfoUDPEndpoint : public MMDSOp { + static constexpr int HEAD_VERSION = 1; + static constexpr int COMPAT_VERSION = 1; + +public: + std::string name; + std::string ip; + int port; + bool is_remove; + +protected: + MNotificationInfoUDPEndpoint() + : MMDSOp(MSG_MDS_NOTIFICATION_INFO_UDP_ENDPOINT, HEAD_VERSION, + COMPAT_VERSION) {} + MNotificationInfoUDPEndpoint(const std::string &name, bool is_remove) + : MMDSOp(MSG_MDS_NOTIFICATION_INFO_UDP_ENDPOINT, HEAD_VERSION, + COMPAT_VERSION), + name(name), is_remove(is_remove) {} + 
MNotificationInfoUDPEndpoint(const std::string &name, const std::string &ip, + int port, bool is_remove) + : MMDSOp(MSG_MDS_NOTIFICATION_INFO_UDP_ENDPOINT, HEAD_VERSION, + COMPAT_VERSION), + name(name), ip(ip), port(port), is_remove(is_remove) {} + ~MNotificationInfoUDPEndpoint() final {} + +public: + std::string_view get_type_name() const override { + return "mdsudp_notification_client"; + } + + void print(std::ostream &out) const override { + out << "mdsudp_notification_client"; + } + + void encode_payload(uint64_t features) override { + using ceph::encode; + encode(name, payload); + encode(ip, payload); + encode(port, payload); + encode(is_remove, payload); + } + + void decode_payload() override { + using ceph::decode; + auto p = payload.cbegin(); + decode(name, p); + decode(ip, p); + decode(port, p); + decode(is_remove, p); + } + +private: + template + friend boost::intrusive_ptr ceph::make_message(Args &&...args); + template + friend MURef crimson::make_message(Args &&...args); +}; \ No newline at end of file diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 22208d2d1f428..8e367b2007005 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -157,6 +157,11 @@ #include "messages/MMDSFragmentNotify.h" #include "messages/MMDSFragmentNotifyAck.h" +#ifdef WITH_CEPHFS_NOTIFICATION +#include "messages/MNotificationInfoKafkaTopic.h" +#include "messages/MNotificationInfoUDPEndpoint.h" +#endif + #include "messages/MExportDirDiscover.h" #include "messages/MExportDirDiscoverAck.h" #include "messages/MExportDirCancel.h" @@ -881,6 +886,16 @@ Message *decode_message(CephContext *cct, m = make_message(); break; +#ifdef WITH_CEPHFS_NOTIFICATION + case MSG_MDS_NOTIFICATION_INFO_KAFKA_TOPIC: + m = make_message (); + break; + + case MSG_MDS_NOTIFICATION_INFO_UDP_ENDPOINT: + m = make_message (); + break; +#endif + case MSG_MGR_BEACON: m = make_message(); break; diff --git a/src/msg/Message.h b/src/msg/Message.h index 15eb3feadcede..08f1abdb7e931 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -202,6 +202,11 @@ #define MSG_MDS_QUIESCE_DB_LISTING 0x505 // quiesce db replication #define MSG_MDS_QUIESCE_DB_ACK 0x506 // quiesce agent ack back to the db +#ifdef WITH_CEPHFS_NOTIFICATION +#define MSG_MDS_NOTIFICATION_INFO_KAFKA_TOPIC 0x507 +#define MSG_MDS_NOTIFICATION_INFO_UDP_ENDPOINT 0x508 +#endif + // *** generic *** #define MSG_TIMECHECK 0x600 #define MSG_MON_HEALTH 0x601 diff --git a/src/test/libcephfs/CMakeLists.txt b/src/test/libcephfs/CMakeLists.txt index 6cbbbe246a5e2..8ad73529fea12 100644 --- a/src/test/libcephfs/CMakeLists.txt +++ b/src/test/libcephfs/CMakeLists.txt @@ -10,6 +10,7 @@ if(WITH_LIBCEPHFS) main.cc deleg.cc monconfig.cc + client_cache.cc ) target_link_libraries(ceph_test_libcephfs ceph-common diff --git a/src/test/libcephfs/client_cache.cc b/src/test/libcephfs/client_cache.cc new file mode 100644 index 0000000000000..a17bc346553bc --- /dev/null +++ b/src/test/libcephfs/client_cache.cc @@ -0,0 +1,314 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include "gtest/gtest.h" +#include "include/cephfs/libcephfs.h" +#include "include/ceph_assert.h" +#include "include/object.h" +#include "include/stringify.h" +#include <errno.h> +#include <fcntl.h> +#include <sys/stat.h> +#include <functional> +#include <iostream> + +using namespace std; +class TestMount { +public: + ceph_mount_info* cmount = nullptr; + string dir_path; + +public: + TestMount(const char* root_dir_name = "") : dir_path(root_dir_name) { + ceph_create(&cmount, NULL); + ceph_conf_read_file(cmount, NULL); + ceph_conf_parse_env(cmount, NULL); + ceph_assert(0 == ceph_mount(cmount, NULL)); + } + ~TestMount() + { + ceph_shutdown(cmount); + } + + int conf_get(const char *option, char *buf, size_t len) { + return ceph_conf_get(cmount, option, buf, len); + } + + int conf_set(const char *option, const char *val) { + return ceph_conf_set(cmount, option, val); + } + + string make_file_path(const char* relpath) { + string ret = dir_path; + ret += '/'; + ret += relpath; + return ret; + } + + int write_full(const char* relpath, const string& data) + { + auto file_path = make_file_path(relpath); + int fd = ceph_open(cmount, file_path.c_str(), O_WRONLY | O_CREAT, 0666); + if (fd < 0) { + return -EACCES; + } + int r = ceph_write(cmount, fd, data.c_str(), data.size(), 0); + if (r >= 0) { + ceph_truncate(cmount, file_path.c_str(), data.size()); + ceph_fsync(cmount, fd, 0); + } + ceph_close(cmount, fd); + return r; + } + string concat_path(string_view path, string_view name) { + string s(path); + if (s.empty() || s.back() != '/') { + s += '/'; + } + s += name; + return s; + } + int unlink(const char* relpath) + { + auto file_path = make_file_path(relpath); + return ceph_unlink(cmount, file_path.c_str()); + } + + int get_snapid(const char* relpath, uint64_t* res) + { + ceph_assert(res); + snap_info snap_info; + + auto snap_path = make_file_path(relpath); + int r = ceph_get_snap_info(cmount, snap_path.c_str(), &snap_info); + if (r >= 0) { + *res = snap_info.id; + r = 0; + } + return r; + } + + int for_each_readdir(const char* relpath, + std::function<bool(const dirent*, const struct ceph_statx*)> fn) + { + auto subdir_path = make_file_path(relpath); + struct ceph_dir_result* ls_dir; + int r = ceph_opendir(cmount, subdir_path.c_str(), &ls_dir); + if (r != 0) { + return r; + } + + while (1) { + struct dirent result; + struct ceph_statx stx; + + r = ceph_readdirplus_r( + cmount, ls_dir, &result, &stx, CEPH_STATX_BASIC_STATS, + 0, + NULL); + if (!r) + break; + if (r < 0) { + std::cerr << "ceph_readdirplus_r failed, error: " + << r << std::endl; + return r; + } + + if (strcmp(result.d_name, ".") == 0 || + strcmp(result.d_name, "..") == 0) { + continue; + } + if (!fn(&result, &stx)) { + r = -EINTR; + break; + } + } + ceph_assert(0 == ceph_closedir(cmount, ls_dir)); + return r; + } + + int mkdir(const char* relpath) + { + auto path = make_file_path(relpath); + return ceph_mkdir(cmount, path.c_str(), 0777); + } + int rmdir(const char* relpath) + { + auto path = make_file_path(relpath); + return ceph_rmdir(cmount, path.c_str()); + } + int purge_dir(const char* relpath0) + { + int r = + for_each_readdir(relpath0, + [&](const dirent* dire, const struct ceph_statx* stx) { + string relpath = concat_path(relpath0, dire->d_name); + + if (S_ISDIR(stx->stx_mode)) { + purge_dir(relpath.c_str()); + rmdir(relpath.c_str()); + } else { + unlink(relpath.c_str()); + } + return true; + }); + if (r != 0) { + return r; + } + if (*relpath0 != 0) { + r = rmdir(relpath0); + } + return r; + } + + ceph_mount_info* get_cmount() { + return cmount; + } + + int test_open(const char* relpath) + { + auto subdir_path = 
make_file_path(relpath); + int r = ceph_open(cmount, subdir_path.c_str(), O_DIRECTORY | O_RDONLY, 0); + if (r < 0) { + std::cout << "test_open error: " << subdir_path.c_str() << ", " << r << std::endl; + return r; + } + return r; + } + int test_close(int fd) + { + ceph_assert(0 == ceph_close(cmount, fd)); + return 0; + } + + int test_statxat(int fd, const char* entry) + { + int r; + { + struct ceph_statx stx; + r = ceph_statxat(cmount, fd, entry, &stx, CEPH_STATX_MODE | CEPH_STATX_INO, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0) { + std::cout << "test_statxat " << entry << " returns " << r << std::endl; + } else { + // replace CEPH_NOSNAP with 0 as the former is negative + // and hence might be confused with an error. + r = (uint64_t)stx.stx_dev == CEPH_NOSNAP ? 0 : stx.stx_dev; + std::cout << "stx=" << stx.stx_ino << "." << r << std::endl; + } + } + return r; + } + int test_statx(const char* path) + { + int r; + { + struct ceph_statx stx; + r = ceph_statx(cmount, path, &stx, CEPH_STATX_MODE | CEPH_STATX_INO, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0) { + std::cout << "test_statx " << path << " returns " << r << std::endl; + } else { + // replace CEPH_NOSNAP with 0 as the former is negative + // and hence might be confused with an error. + r = (uint64_t)stx.stx_dev == CEPH_NOSNAP ? 0 : stx.stx_dev; + std::cout << "stx=" << stx.stx_ino << "." << r << std::endl; + } + } + return r; + } + +}; + +void prepareTrimCacheTest(TestMount& tm, size_t max_bulk) +{ + ceph_rmsnap(tm.cmount, "/BrokenStatxAfterTrimeCacheTest", "snap1"); + ceph_rmsnap(tm.cmount, "/BrokenStatxAfterTrimeCacheTest", "snap2"); + tm.purge_dir("/BrokenStatxAfterTrimeCacheTest"); + + ASSERT_EQ(0, tm.mkdir("/BrokenStatxAfterTrimeCacheTest")); + ASSERT_EQ(0, tm.mkdir("/BrokenStatxAfterTrimeCacheTest/bulk")); + ASSERT_EQ(0, tm.mkdir("/BrokenStatxAfterTrimeCacheTest/test")); + char path[PATH_MAX]; + for (size_t i = 0; i < max_bulk; i++) { + snprintf(path, PATH_MAX - 1, "/BrokenStatxAfterTrimeCacheTest/bulk/%lu", i); + tm.write_full(path, path); + } + + tm.write_full("/BrokenStatxAfterTrimeCacheTest/test/file1", "abcdef"); + ASSERT_EQ(0, tm.mkdir("/BrokenStatxAfterTrimeCacheTest/.snap/snap1")); + tm.write_full("/BrokenStatxAfterTrimeCacheTest/test/file1", "snap2>>>"); + tm.write_full("/BrokenStatxAfterTrimeCacheTest/test/file2", "snap2>>>abcdef"); + ASSERT_EQ(0, tm.mkdir("/BrokenStatxAfterTrimeCacheTest/.snap/snap2")); +} + +TEST(LibCephFS, BrokenStatxAfterTrimCache) +{ + size_t bulk_count = 100; + { + TestMount tm; + prepareTrimCacheTest(tm, bulk_count); + } + TestMount test_mount; + ASSERT_EQ(0, test_mount.conf_set("client_cache_size", stringify(bulk_count/2).c_str())); + + uint64_t snapid1; + uint64_t snapid2; + + // learn snapshot ids and do basic verification + ASSERT_EQ(0, test_mount.get_snapid("/BrokenStatxAfterTrimeCacheTest/.snap/snap1", &snapid1)); + ASSERT_EQ(0, test_mount.get_snapid("/BrokenStatxAfterTrimeCacheTest/.snap/snap2", &snapid2)); + + int s1fd = test_mount.test_open("/BrokenStatxAfterTrimeCacheTest/.snap/snap1"); + int s2fd = test_mount.test_open("/BrokenStatxAfterTrimeCacheTest/.snap/snap2"); + + // check if file1's statxat points to snap1 + ASSERT_EQ(snapid1, test_mount.test_statxat(s1fd, "test/file1")); + // check if file1's statxat points to snap2 + ASSERT_EQ(snapid2, test_mount.test_statxat(s2fd, "test/file1")); + // check if file2's statxat returns -2 + ASSERT_EQ(-2, test_mount.test_statxat(s1fd, "test/file2")); + // check if file2's statx returns -2 + ASSERT_EQ(-2, 
test_mount.test_statx("/BrokenStatxAfterTrimeCacheTest/.snap/snap1/test/file2")); + + int cnt = 0; + int r = test_mount.for_each_readdir("/BrokenStatxAfterTrimeCacheTest/bulk", + [&](const dirent*, const struct ceph_statx*) { + ++cnt; + return true; + }); + ASSERT_EQ(0, r); + ASSERT_EQ(bulk_count, cnt); + + // open folder to trigger cache trimming + int bulk_fd = test_mount.test_open("/BrokenStatxAfterTrimeCacheTest/bulk"); + + // checking if statxat returns the same values as above, + // which isn't the case if cache trimming evicted dentries behind + // inodes bound to s1fd/s2fd. + EXPECT_EQ(snapid1, test_mount.test_statxat(s1fd, "test/file1")); + EXPECT_EQ(snapid2, test_mount.test_statxat(s2fd, "test/file1")); + // check if file2's statxat returns -2 + EXPECT_EQ(-2, test_mount.test_statxat(s1fd, "test/file2")); + // check if file2's statx still returns -2, should be fine irrespective of cache state. + // This will also update the cache and bring file2 inode back to good shape + ASSERT_EQ(-2, test_mount.test_statx("/BrokenStatxAfterTrimeCacheTest/.snap/snap1/test/file2")); + // check if file2's statxat returns -2 + ASSERT_EQ(-2, test_mount.test_statxat(s1fd, "test/file2")); + test_mount.test_close(bulk_fd); + + test_mount.test_close(s2fd); + test_mount.test_close(s1fd); + + ceph_rmsnap(test_mount.cmount, "/BrokenStatxAfterTrimeCacheTest", "snap1"); + ceph_rmsnap(test_mount.cmount, "/BrokenStatxAfterTrimeCacheTest", "snap2"); + test_mount.purge_dir("/BrokenStatxAfterTrimeCacheTest"); +} diff --git a/src/tools/ceph-dencoder/common_types.h b/src/tools/ceph-dencoder/common_types.h index e853321645ba2..6233e89e1a8ca 100644 --- a/src/tools/ceph-dencoder/common_types.h +++ b/src/tools/ceph-dencoder/common_types.h @@ -214,6 +214,8 @@ TYPE(openc_response_t) #include "messages/MClientRequest.h" MESSAGE(MClientRequest) TYPE(SnapPayload) +TYPE(KafkaTopicPayload) +TYPE(UDPEndpointPayload) TYPE(MClientRequest::Release) #include "messages/MClientRequestForward.h" diff --git a/src/tools/cephfs_mirror/FSMirror.cc b/src/tools/cephfs_mirror/FSMirror.cc index ea1857b1eba86..f4004cf75464b 100644 --- a/src/tools/cephfs_mirror/FSMirror.cc +++ b/src/tools/cephfs_mirror/FSMirror.cc @@ -429,7 +429,7 @@ void FSMirror::add_peer(const Peer &peer) { return; } m_peer_replayers.emplace(peer, std::move(replayer)); - ceph_assert(m_peer_replayers.size() == 1); // support only a single peer + // ceph_assert(m_peer_replayers.size() == 1); // support only a single peer if (m_perf_counters) { m_perf_counters->inc(l_cephfs_mirror_fs_mirror_peers); } diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc index 7b445b0ebda69..429a619b7cd91 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.cc +++ b/src/tools/cephfs_mirror/PeerReplayer.cc @@ -14,7 +14,10 @@ #include "common/debug.h" #include "common/errno.h" #include "common/perf_counters.h" +#include "common/perf_counters_collection.h" #include "common/perf_counters_key.h" +#include "include/stringify.h" +#include "common/Cond.h" #include "FSMirror.h" #include "PeerReplayer.h" #include "Utils.h" @@ -113,14 +116,16 @@ private: // helper to open a directory relative to a file descriptor int opendirat(MountRef mnt, int dirfd, const std::string &relpath, int flags, ceph_dir_result **dirp) { - int r = ceph_openat(mnt, dirfd, relpath.c_str(), flags, 0); + int r = ceph_openat(mnt, dirfd, relpath.c_str(), flags | O_DIRECTORY, 0); if (r < 0) { return r; } int fd = r; r = ceph_fdopendir(mnt, fd, dirp); - ceph_close(mnt, fd); + if (r < 0) { + 
ceph_close(mnt, fd); + } return r; } @@ -167,20 +172,23 @@ private: Commands commands; }; -PeerReplayer::PeerReplayer(CephContext *cct, FSMirror *fs_mirror, - RadosRef local_cluster, const Filesystem &filesystem, - const Peer &peer, const std::set<std::string, std::less<>> &directories, - MountRef mount, ServiceDaemon *service_daemon) - : m_cct(cct), - m_fs_mirror(fs_mirror), - m_local_cluster(local_cluster), - m_filesystem(filesystem), - m_peer(peer), - m_directories(directories.begin(), directories.end()), - m_local_mount(mount), - m_service_daemon(service_daemon), - m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)), - m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) { +PeerReplayer::PeerReplayer( + CephContext *cct, FSMirror *fs_mirror, RadosRef local_cluster, + const Filesystem &filesystem, const Peer &peer, + const std::set<std::string, std::less<>> &directories, MountRef mount, + ServiceDaemon *service_daemon) + : m_cct(cct), m_fs_mirror(fs_mirror), m_local_cluster(local_cluster), + m_filesystem(filesystem), m_peer(peer), + m_directories(directories.begin(), directories.end()), + m_local_mount(mount), m_service_daemon(service_daemon), + m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)), + m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + + stringify(peer.uuid))), + task_sink_context( + g_ceph_context->_conf->cephfs_mirror_max_concurrent_file_transfer, + this), + dir_op_handler_context( + g_ceph_context->_conf->cephfs_mirror_threads_per_sync, this) { // reset sync stats sent via service daemon m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer, SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0); @@ -232,7 +240,7 @@ PeerReplayer::~PeerReplayer() { int PeerReplayer::init() { dout(20) << ": initial dir list=[" << m_directories << "]" << dendl; for (auto &dir_root : m_directories) { - m_snap_sync_stats.emplace(dir_root, SnapSyncStat()); + m_snap_sync_stats.emplace(dir_root, std::make_shared<SnapSyncStat>()); } auto &remote_client = m_peer.remote.client_name; @@ -289,6 +297,18 @@ int PeerReplayer::init() { } std::scoped_lock locker(m_lock); + dout(0) << ": Activating file transfer thread pool with " + "cephfs_mirror_max_concurrent_file_transfer=" + << g_ceph_context->_conf->cephfs_mirror_max_concurrent_file_transfer + << ", cephfs_mirror_threads_per_sync=" + << g_ceph_context->_conf->cephfs_mirror_threads_per_sync + << ", cephfs_mirror_max_concurrent_directory_syncs=" + << g_ceph_context->_conf.get_val<uint64_t>( + "cephfs_mirror_max_concurrent_directory_syncs") + << dendl; + task_sink_context.activate(); + polling_thread = std::thread(&PeerReplayer::do_poll, this); + auto nr_replayers = g_ceph_context->_conf.get_val<uint64_t>( "cephfs_mirror_max_concurrent_directory_syncs"); dout(20) << ": spawning " << nr_replayers << " snapshot replayer(s)" << dendl; @@ -306,6 +326,7 @@ int PeerReplayer::init() { void PeerReplayer::shutdown() { dout(20) << dendl; + dout(0) << ": starting shutdown" << dendl; { std::scoped_lock locker(m_lock); @@ -314,6 +335,13 @@ void PeerReplayer::shutdown() { m_cond.notify_all(); } + dir_op_handler_context.deactivate(); + task_sink_context.deactivate(); + polling_cv.notify_one(); + if (polling_thread.joinable()) { + polling_thread.join(); + } + dout(0) << ": Operation handler thread pool deactivated" << dendl; for (auto &replayer : m_replayers) { replayer->join(); } @@ -329,7 +357,7 @@ void PeerReplayer::add_directory(string_view dir_root) { std::scoped_lock locker(m_lock); m_directories.emplace_back(dir_root); - 
m_snap_sync_stats.emplace(dir_root, SnapSyncStat()); + m_snap_sync_stats.emplace(dir_root, std::make_shared<SnapSyncStat>()); m_cond.notify_all(); } @@ -347,7 +375,7 @@ void PeerReplayer::remove_directory(string_view dir_root) { if (it1 == m_registered.end()) { m_snap_sync_stats.erase(_dir_root); } else { - it1->second.canceled = true; + it1->second->canceled = true; } m_cond.notify_all(); } @@ -362,8 +390,8 @@ boost::optional<std::string> PeerReplayer::pick_directory() { boost::optional<std::string> candidate; for (auto &dir_root : m_directories) { auto &sync_stat = m_snap_sync_stats.at(dir_root); - if (sync_stat.failed) { - std::chrono::duration<double> d = now - *sync_stat.last_failed; + if (sync_stat->failed) { + std::chrono::duration<double> d = now - *sync_stat->last_failed; if (d.count() < retry_timo) { continue; } @@ -383,15 +411,15 @@ int PeerReplayer::register_directory(const std::string &dir_root, dout(20) << ": dir_root=" << dir_root << dendl; ceph_assert(m_registered.find(dir_root) == m_registered.end()); - DirRegistry registry; - int r = try_lock_directory(dir_root, replayer, &registry); + std::shared_ptr<DirRegistry> registry = std::make_shared<DirRegistry>(); + int r = try_lock_directory(dir_root, replayer, registry); if (r < 0) { return r; } dout(5) << ": dir_root=" << dir_root << " registered with replayer=" << replayer << dendl; - m_registered.emplace(dir_root, std::move(registry)); + m_registered.emplace(dir_root, registry); return 0; } @@ -401,7 +429,7 @@ void PeerReplayer::unregister_directory(const std::string &dir_root) { auto it = m_registered.find(dir_root); ceph_assert(it != m_registered.end()); - unlock_directory(it->first, it->second); + unlock_directory(it->first, *it->second); m_registered.erase(it); if (std::find(m_directories.begin(), m_directories.end(), dir_root) == m_directories.end()) { m_snap_sync_stats.erase(dir_root); @@ -409,7 +437,8 @@ } int PeerReplayer::try_lock_directory(const std::string &dir_root, - SnapshotReplayerThread *replayer, DirRegistry *registry) { + SnapshotReplayerThread *replayer, + std::shared_ptr<DirRegistry> &registry) { dout(20) << ": dir_root=" << dir_root << dendl; int r = ceph_open(m_remote_mount, dir_root.c_str(), O_RDONLY | O_DIRECTORY, 0); @@ -530,7 +559,7 @@ int PeerReplayer::build_snap_map(const std::string &dir_root, if (!info.nr_snap_metadata) { std::string failed_reason = "snapshot '" + snap + "' has invalid metadata"; derr << ": " << failed_reason << dendl; - m_snap_sync_stats.at(dir_root).last_failed_reason = failed_reason; + m_snap_sync_stats.at(dir_root)->last_failed_reason = failed_reason; rv = -EINVAL; } else { auto metadata = decode_snap_metadata(info.snap_metadata, info.nr_snap_metadata); @@ -555,7 +584,9 @@ int PeerReplayer::build_snap_map(const std::string &dir_root, snap_map->emplace(snap_id, snap); } - r = ceph_closedir(mnt, dirp); + if (dirp) { + r = ceph_closedir(mnt, dirp); + } if (r < 0) { derr << ": failed to close " << lr_str << " snap directory=" << snap_dir << ": " << cpp_strerror(r) << dendl; @@ -613,56 +644,588 @@ int PeerReplayer::propagate_snap_renames( return 0; } -int PeerReplayer::remote_mkdir(const std::string &epath, const struct ceph_statx &stx, - const FHandles &fh) { - dout(10) << ": remote epath=" << epath << dendl; +int PeerReplayer::sync_attributes(const std::string &epath, + const struct ceph_statx &stx, + unsigned int change_mask, bool is_dir, + const FHandles &fh) { + int r = 0; + if ((change_mask & CEPH_STATX_UID) || (change_mask & CEPH_STATX_GID)) { + // dout(0) << ": epath-->" << epath << ", ceph_chownat"
<< dendl; + r = ceph_chownat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), + stx.stx_uid, stx.stx_gid, AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to chown remote directory=" << epath << ": " + << cpp_strerror(r) << dendl; + return r; + } + } + + if ((change_mask & CEPH_STATX_MODE)) { + // dout(0) << ": epath-->" << epath << ", ceph_chmodat" << dendl; + r = ceph_chmodat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), + stx.stx_mode & ~S_IFMT, AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to chmod remote directory=" << epath << ": " + << cpp_strerror(r) << dendl; + return r; + } + } + + if (!is_dir && (change_mask & CEPH_STATX_MTIME)) { + // dout(0) << ": epath-->" << epath << ", ceph_utimensat" << dendl; + struct timespec times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec}, + {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec}}; + r = ceph_utimensat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), times, + AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to change [am]time on remote directory=" << epath + << ": " << cpp_strerror(r) << dendl; + return r; + } + } + return 0; +} - int r = ceph_mkdirat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_mode & ~S_IFDIR); +int PeerReplayer::_remote_mkdir(const std::string &epath, + const struct ceph_statx &cstx, + const FHandles &fh) { + int r = ceph_mkdirat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), + cstx.stx_mode & ~S_IFDIR); if (r < 0 && r != -EEXIST) { - derr << ": failed to create remote directory=" << epath << ": " << cpp_strerror(r) - << dendl; + derr << ": failed to create remote directory=" << epath << ": " + << cpp_strerror(r) << dendl; return r; } + return 0; +} + +int PeerReplayer::remote_mkdir(const std::string &epath, + const struct ceph_statx &cstx, bool create_fresh, + unsigned int change_mask, const FHandles &fh, + std::shared_ptr &dir_sync_stat) { + dout(10) << ": remote epath=" << epath << dendl; + int r = 0; + + if (create_fresh) { // must need to create + r = _remote_mkdir(epath, cstx, fh); + if (r < 0) { + return r; + } + dir_sync_stat->current_stat.inc_dir_created_count(); + } + + r = sync_attributes(epath, cstx, change_mask, true, fh); + return r; +} - r = ceph_chownat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_uid, stx.stx_gid, - AT_SYMLINK_NOFOLLOW); +void PeerReplayer::C_DoDirSync::finish(int r) { if (r < 0) { - derr << ": failed to chown remote directory=" << epath << ": " << cpp_strerror(r) - << dendl; - return r; + return; + } + if (r == 0) { + common_entry_info_count = 0; } + replayer->do_dir_sync(dir_root, cur_path, cstx, dirp, create_fresh, + entry_info_known, entry_info, common_entry_info_count, + thread_pool, fh, dir_registry, op_counter, fin, + dir_sync_stat); +} + +void PeerReplayer::C_TransferAndSyncFile::add_into_stat() { + replayer->task_sink_context.thread_pool_stats.add_file(stx.stx_size); +} - r = ceph_chmodat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_mode & ~S_IFMT, - AT_SYMLINK_NOFOLLOW); +void PeerReplayer::C_TransferAndSyncFile::remove_from_stat() { + replayer->task_sink_context.thread_pool_stats.remove_file(stx.stx_size); +} + +void PeerReplayer::C_TransferAndSyncFile::finish(int r) { if (r < 0) { - derr << ": failed to chmod remote directory=" << epath << ": " << cpp_strerror(r) - << dendl; - return r; + return; } + replayer->transfer_and_sync_file(dir_root, epath, stx, change_mask, fh, + dir_registry, dir_sync_stat); +} - struct timespec times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec}, - {stx.stx_mtime.tv_sec, 
stx.stx_mtime.tv_nsec}}; - r = ceph_utimensat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), times, AT_SYMLINK_NOFOLLOW); +void PeerReplayer::C_CleanUpRemoteDir::finish(int r) { if (r < 0) { - derr << ": failed to change [am]time on remote directory=" << epath << ": " + return; + } + r = replayer->cleanup_remote_dir(dir_root, epath, fh, dir_registry, + dir_sync_stat); + if (r < 0 && r != -ENOENT) { + derr << ": failed to cleanup remote directory=" << epath << ": " << cpp_strerror(r) << dendl; - return r; + if (!dir_registry->failed) { + replayer->mark_failed(dir_root, r); + } + } +} + +void PeerReplayer::C_DoDirSyncSnapDiff::finish(int r) { + if (r < 0) { + return; } + replayer->do_dir_sync_using_snapdiff(dir_root, cur_path, sd_info, current, + prev, thread_pool, fh, dir_registry, + op_counter, fin, dir_sync_stat); +} - return 0; +void PeerReplayer::C_DeleteFile::finish(int r) { + if (r < 0) { + return; + } + r = replayer->delete_file(dir_root, epath, fh, dir_registry, dir_sync_stat); + if (r < 0 && r != -ENOENT) { + derr << ": failed to cleanup remote file=" << epath << ": " + << cpp_strerror(r) << dendl; + if (!dir_registry->failed) { + replayer->mark_failed(dir_root, r); + } + } +} + +void PeerReplayer::DirOpHandlerContext::update_state() { + if (g_ceph_context->_conf->cephfs_mirror_threads_per_sync == thread_count) { + return; + } + auto do_dout = [this](Peer &m_peer, const std::string &msg) { + dout(0) << msg << dendl; + }; + do_dout(replayer->m_peer, ": update_state called"); + thread_count = g_ceph_context->_conf->cephfs_mirror_threads_per_sync; + int i = 0; + while (true) { + std::unique_lock lock(context_mutex); + if (i >= thread_pools.size()) { + break; + } + auto thread_pool = thread_pools[i++]; + lock.unlock(); + thread_pool->update_num_threads(thread_count); + } +} + +void PeerReplayer::do_poll() { + std::mutex dummy_mutex; + while (true) { + std::unique_lock lock(dummy_mutex); + polling_cv.wait_for(lock, 30s, [this] { return is_stopping(); }); + if (is_stopping()) { + break; + } + lock.unlock(); + dir_op_handler_context.update_state(); + task_sink_context.update_state(); + } +} + +void PeerReplayer::DirOpHandlerContext::ThreadPool::activate() { + std::unique_lock lock(threadpool_config_mutex); + active = true; + task_queue_limit = std::max(500, 10 * num_threads); + for (int i = 0; i < num_threads; ++i) { + thread_status.emplace_back(new ThreadStatus()); + thread_status[i]->stop_called = false; + workers.emplace_back(std::make_unique(&ThreadPool::run_task, + this, thread_status[i])); + thread_status[i]->active = true; + } +} + +std::shared_ptr +PeerReplayer::DirOpHandlerContext::sync_start(int _num_threads) { + std::unique_lock lock(context_mutex); + if (!active) { + return nullptr; + } + int idx = 0; + if (!unassigned_sync_ids.empty()) { + idx = unassigned_sync_ids.back(); + thread_pools[idx] = + std::make_shared(_num_threads, idx, replayer); + unassigned_sync_ids.pop_back(); + } else { + idx = sync_count++; + thread_pools.emplace_back( + std::make_shared(_num_threads, idx, replayer)); + } + thread_pools[idx]->activate(); + return thread_pools[idx]; +} + +void PeerReplayer::DirOpHandlerContext::dump_stats(Formatter *f) { + std::unique_lock lock(context_mutex); + int task_count = 0; + for (int i = 0; i < thread_pools.size(); ++i) { + task_count += thread_pools[i]->queued_task; + } + f->dump_int("queued_dir_ops_count", task_count); +} + +void PeerReplayer::DirOpHandlerContext::deactivate() { + std::unique_lock lock(context_mutex); + active = false; + for (int i = 0; i < 
thread_pools.size(); ++i) { + thread_pools[i]->deactivate(); + } +} + +void PeerReplayer::DirOpHandlerContext::ThreadPool::update_num_threads( + int thread_count) { + std::unique_lock lock(threadpool_config_mutex); + if (!active) { + return; + } + if (thread_count == num_threads) { + return; + } + dout(0) << ": updating number of threads in threadpool no=" << thread_idx + << dendl; + + if (thread_count < num_threads) { + for (int i = thread_count; i < num_threads; ++i) { + thread_status[i]->stop_called = true; + dout(0) << ": Lazy shutdown of thread no " << i << " from threadpool no " + << thread_idx << dendl; + } + num_threads = thread_count; + pick_task.notify_all(); + } else { + { + std::scoped_lock thread_lock(mtx); + for (int i = num_threads; + i < workers.size() && num_threads < thread_count; ++i) { + if (thread_status[i]->active) { + swap(workers[i], workers[num_threads]); + swap(thread_status[i], thread_status[num_threads]); + thread_status[num_threads++]->stop_called = false; + dout(0) << ": Reactivating thread no " << i << " of thread pool no " + << thread_idx << dendl; + } + } + } + if (num_threads < workers.size()) { + pick_task.notify_all(); + } + for (int i = num_threads; i < workers.size(); ++i) { + if (workers[i]->joinable()) { + workers[i]->join(); + } + dout(0) + << ": Force shutdown of already lazy shut thread having thread no " + << i << " from threadpool no " << thread_idx << dendl; + } + while (workers.size() > num_threads) { + workers.pop_back(); + if (thread_status.back()) { + delete thread_status.back(); + thread_status.back() = nullptr; + } + thread_status.pop_back(); + } + + for (int i = num_threads; i < thread_count; ++i) { + thread_status.emplace_back(new ThreadStatus()); + thread_status[i]->stop_called = false; + workers.emplace_back(std::make_unique( + &ThreadPool::run_task, this, thread_status[i])); + thread_status[i]->active = true; + dout(0) << ": Creating thread no " << i << " in threadpool no " + << thread_idx << dendl; + } + num_threads = thread_count; + } + dout(0) << ": Number of threads for threadpool no " << thread_idx + << " updated, number of threads=" << thread_count << dendl; +} + +void PeerReplayer::DirOpHandlerContext::ThreadPool::deactivate() { + std::unique_lock lock(threadpool_config_mutex); + active = false; + for (int i = 0; i < thread_status.size(); ++i) { + thread_status[i]->stop_called = true; + } + pick_task.notify_all(); + for (auto &worker : workers) { + if (worker->joinable()) { + worker->join(); + } + } + workers.clear(); + for (int i = 0; i < thread_status.size(); ++i) { + if (thread_status[i]) { + delete thread_status[i]; + thread_status[i] = nullptr; + } + } + thread_status.clear(); + lock.unlock(); + drain_queue(); +} + +void PeerReplayer::DirOpHandlerContext::sync_finish(int idx) { + std::unique_lock lock(context_mutex); + ceph_assert(idx < thread_pools.size()); + thread_pools[idx]->deactivate(); + unassigned_sync_ids.push_back(idx); +} + + +void PeerReplayer::DirOpHandlerContext::ThreadPool::drain_queue() { + std::scoped_lock lock(mtx); + while (!task_queue.empty()) { + auto &task = task_queue.front(); + task->complete(-1); + task_queue.pop(); + } +} + +void PeerReplayer::DirOpHandlerContext::ThreadPool::run_task( + ThreadStatus *status) { + while (true) { + C_MirrorContext *task; + { + std::unique_lock lock(mtx); + pick_task.wait(lock, [this, status] { + return (status->stop_called || !task_queue.empty()); + }); + if (status->stop_called) { + status->active = false; + break; + } + task = task_queue.front(); + 
task_queue.pop(); + queued_task--; + } + task->complete(0); + } +} + +bool PeerReplayer::DirOpHandlerContext::ThreadPool::do_task_async( + C_MirrorContext *task) { + { + std::unique_lock lock(mtx); + if (task_queue.size() >= + g_ceph_context->_conf->cephfs_mirror_thread_pool_queue_size) { + return false; + } + task->inc_counter(); + task_queue.emplace(task); + queued_task++; + } + pick_task.notify_one(); + return true; +} + +bool PeerReplayer::DirOpHandlerContext::ThreadPool::handle_task_async( + C_MirrorContext *task) { + bool success = false; + if (task_queue.size() < + g_ceph_context->_conf->cephfs_mirror_thread_pool_queue_size) { + success = do_task_async(task); + } + return success; +} + +void PeerReplayer::DirOpHandlerContext::ThreadPool::handle_task_force( + C_MirrorContext *task) { + if (!handle_task_async(task)) { + handle_task_sync(task); + } +} + +void PeerReplayer::DirOpHandlerContext::ThreadPool::handle_task_sync( + C_MirrorContext *task) { + task->inc_counter(); + task->complete(1); +} + +void PeerReplayer::TaskSinkContext::update_state() { + int thread_count = + g_ceph_context->_conf->cephfs_mirror_max_concurrent_file_transfer; + std::unique_lock lock(threadpool_config_mutex); + if (!active) { + return; + } + if (thread_count == workers.size()) { + return; + } + auto do_dout = [this](Peer &m_peer, const std::string &msg) { + dout(0) << msg << dendl; + }; + std::string msg = ": updating number of threads in file transfer threadpool"; + do_dout(replayer->m_peer, msg); + if (thread_count < workers.size()) { + for (int i = thread_count; i < workers.size(); ++i) { + stop_flag[i] = true; + } + pick_task.notify_all(); + for (int i = thread_count; i < workers.size(); ++i) { + if (workers[i].joinable()) { + workers[i].join(); + do_dout(replayer->m_peer, ": shutdown of thread no " + + std::to_string(i) + + "from file transfer thread pool"); + } + } + while (workers.size() > thread_count) { + workers.pop_back(); + stop_flag.pop_back(); + } + } else { + for (int i = workers.size(); i < thread_count; ++i) { + stop_flag.emplace_back(false); + workers.emplace_back(std::thread(&TaskSinkContext::run_task, this, i)); + do_dout(replayer->m_peer, ": creating thread no " + std::to_string(i) + + "in file transfer thread pool"); + } + } + + do_dout(replayer->m_peer, ": number of threads for file transfer thread pool " + "updated, number of threads=" + + std::to_string(thread_count)); +} + +void PeerReplayer::TaskSinkContext::activate() { + std::unique_lock lock(threadpool_config_mutex); + active = true; + for (int i = 0; i < workers.size(); ++i) { + stop_flag[i] = false; + workers[i] = std::thread(&TaskSinkContext::run_task, this, i); + } +} + +void PeerReplayer::TaskSinkContext::drain_queue() { + std::scoped_lock lock(mtx); + for (int i = 0; i < task_queue.size(); ++i) { + while (!task_queue[i].empty()) { + auto &task = task_queue[i].front(); + task->complete(-1); + task_queue[i].pop(); + } + } +} + +PeerReplayer::TaskSinkContext::TaskSinkContext(int num_threads, + PeerReplayer *replayer) + : workers(num_threads), stop_flag(num_threads, true), + thread_pool_stats(num_threads, this), sync_count(0), active(false), + replayer(replayer) { + task_limit = max(10 * num_threads, 5000); + // file_task_queue_limit = 100000; + // other_task_queue_limit = 100000; +} + +void PeerReplayer::TaskSinkContext::deactivate() { + std::unique_lock lock(threadpool_config_mutex); + active = false; + for (int i = 0; i < workers.size(); ++i) { + stop_flag[i] = true; + } + pick_task.notify_all(); + give_task.notify_all(); 
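+ // Wake both the worker threads blocked in pick_task and any producers + // blocked in do_task_async (give_task), so every thread observes the + // stop flags and the cleared active state before the joins below.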
+ for (auto &worker : workers) { + if (worker.joinable()) { + worker.join(); + } + } + lock.unlock(); + drain_queue(); +} + +int PeerReplayer::TaskSinkContext::sync_start() { + std::unique_lock lock(mtx); + int idx = 0; + if (!unassigned_sync_ids.empty()) { + idx = unassigned_sync_ids.back(); + task_queue[idx] = std::queue<C_MirrorContext *>(); + unassigned_sync_ids.pop_back(); + } else { + idx = sync_count++; + task_queue.emplace_back(std::queue<C_MirrorContext *>()); + } + return idx; +} + +void PeerReplayer::TaskSinkContext::sync_finish(int idx) { + std::unique_lock lock(mtx); + ceph_assert(idx < task_queue.size()); + while (!task_queue[idx].empty()) { + auto &task = task_queue[idx].front(); + task->complete(-1); + task_queue[idx].pop(); + } + unassigned_sync_ids.push_back(idx); +} + +void PeerReplayer::TaskSinkContext::run_task(int thread_idx) { + while (true) { + C_MirrorContext* task; + int idx = 0; + { + std::unique_lock lock(mtx); + pick_task.wait(lock, [this, thread_idx] { + return (stop_flag[thread_idx] || !task_ring.empty()); + }); + if (stop_flag[thread_idx]) { + return; + } + task = task_ring.front(); + idx = task->dir_sync_stat->sync_idx; + ceph_assert(idx < task_queue.size() && !task_queue[idx].empty()); + task_queue[idx].pop(); + task_ring.pop(); + if (!task_queue[idx].empty()) { + task_ring.emplace(task_queue[idx].front()); + } + task->remove_from_stat(); + // thread_pool_stats.remove_file(task->stx.stx_size); + } + give_task.notify_one(); + task->complete(0); + } +} + +void PeerReplayer::TaskSinkContext::do_task_async(C_MirrorContext *task) { + int idx = task->dir_sync_stat->sync_idx; + { + std::unique_lock lock(mtx); + give_task.wait(lock, [this, idx] { + return (!active || task_queue[idx].size() < task_limit); + }); + if (!active) { + return; + } + if (task_queue[idx].empty()) { + task_ring.emplace(task); + } + task_queue[idx].emplace(task); + task->inc_counter(); + task->add_into_stat(); + // thread_pool_stats.add_file(task->stx.stx_size); + } + pick_task.notify_one(); } #define NR_IOVECS 8 // # iovecs #define IOVEC_SIZE (8 * 1024 * 1024) // buffer size for each iovec -int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string &epath, - const struct ceph_statx &stx, const FHandles &fh) { +int PeerReplayer::copy_to_remote(const std::string &dir_root, + const std::string &epath, + const struct ceph_statx &stx, + const FHandles &fh, + std::shared_ptr<DirRegistry> &dir_registry) { dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << dendl; + uint64_t total_read = 0, total_wrote = 0; + std::string full_path = entry_path(dir_root, epath); int l_fd; int r_fd; void *ptr; struct iovec iov[NR_IOVECS]; - int r = ceph_openat(m_local_mount, fh.c_fd, epath.c_str(), O_RDONLY | O_NOFOLLOW, 0); + int r = ceph_openat(m_local_mount, fh.c_fd, epath.c_str(), + O_RDONLY | O_NOFOLLOW, 0); if (r < 0) { derr << ": failed to open local file path=" << epath << ": " << cpp_strerror(r) << dendl; @@ -687,13 +1250,13 @@ } while (true) { - if (should_backoff(dir_root, &r)) { + if (should_backoff(dir_registry, &r)) { dout(0) << ": backing off r=" << r << dendl; break; } for (int i = 0; i < NR_IOVECS; ++i) { - iov[i].iov_base = (char*)ptr + IOVEC_SIZE*i; + iov[i].iov_base = (char *)ptr + IOVEC_SIZE * i; iov[i].iov_len = IOVEC_SIZE; } @@ -706,6 +1269,9 @@ if (r == 0) { break; } + total_read += r; + dout(20) << ": successfully read " << total_read
<< " bytes from local" + << full_path << dendl; int iovs = (int)(r / IOVEC_SIZE); int t = r % IOVEC_SIZE; @@ -720,6 +1286,9 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string << cpp_strerror(r) << dendl; break; } + total_wrote += r; + dout(20) << ": successfully wrote " << total_wrote << " bytes to remote " + << full_path << dendl; } if (r == 0) { @@ -734,39 +1303,75 @@ int PeerReplayer::copy_to_remote(const std::string &dir_root, const std::string close_remote_fd: if (ceph_close(m_remote_mount, r_fd) < 0) { - derr << ": failed to close remote fd path=" << epath << ": " << cpp_strerror(r) - << dendl; - return -EINVAL; + derr << ": failed to close remote fd path=" << epath << ": " + << cpp_strerror(r) << dendl; + r = -EINVAL; } close_local_fd: if (ceph_close(m_local_mount, l_fd) < 0) { - derr << ": failed to close local fd path=" << epath << ": " << cpp_strerror(r) - << dendl; - return -EINVAL; + derr << ": failed to close local fd path=" << epath << ": " + << cpp_strerror(r) << dendl; + r = -EINVAL; } - + // dout(0) << ": file transfer finished-->" << epath << dendl; return r == 0 ? 0 : r; } -int PeerReplayer::remote_file_op(const std::string &dir_root, const std::string &epath, - const struct ceph_statx &stx, const FHandles &fh, - bool need_data_sync, bool need_attr_sync) { - dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", need_data_sync=" << need_data_sync - << ", need_attr_sync=" << need_attr_sync << dendl; +void PeerReplayer::transfer_and_sync_file( + const std::string &dir_root, const std::string &epath, + const struct ceph_statx &stx, unsigned int change_mask, const FHandles &fh, + std::shared_ptr &dir_registry, + std::shared_ptr &dir_sync_stat) { + int r = copy_to_remote(dir_root, epath, stx, fh, dir_registry); + if (r < 0) { + if (!dir_registry->failed) { + mark_failed(dir_root, r); + } + derr << ": failed to copy path=" << epath << ": " << cpp_strerror(r) + << dendl; + return; + } + if (m_perf_counters) { + m_perf_counters->inc(l_cephfs_mirror_peer_replayer_sync_bytes, + stx.stx_size); + } + if (change_mask > 0) { + r = sync_attributes(epath, stx, change_mask, false, fh); + } + if (r < 0 && !dir_registry->failed) { + mark_failed(dir_root, r); + return; + } + dir_sync_stat->current_stat.inc_file_op_count(true, (change_mask > 0), + stx.stx_size); + dir_sync_stat->current_stat.dec_file_in_flight_count(stx.stx_size); + inc_sync_bytes(dir_root, stx.stx_size); +} + +int PeerReplayer::remote_file_op( + const std::string &dir_root, const std::string &epath, + const struct ceph_statx &stx, bool need_data_sync, unsigned int change_mask, + const FHandles &fh, + std::shared_ptr &thread_pool, + std::shared_ptr &dir_registry, + std::atomic &op_counter, Context *fin, + std::shared_ptr &dir_sync_stat) { + dout(10) << ": dir_root=" << dir_root << ", epath=" << epath + << ", need_data_sync=" << need_data_sync + << ", stat_change_mask=" << change_mask << dendl; int r; if (need_data_sync) { if (S_ISREG(stx.stx_mode)) { - r = copy_to_remote(dir_root, epath, stx, fh); - if (r < 0) { - derr << ": failed to copy path=" << epath << ": " << cpp_strerror(r) << dendl; - return r; - } - if (m_perf_counters) { - m_perf_counters->inc(l_cephfs_mirror_peer_replayer_sync_bytes, stx.stx_size); - } - inc_sync_bytes(dir_root, stx.stx_size); + C_TransferAndSyncFile *task = new C_TransferAndSyncFile( + dir_root, epath, stx, change_mask, fh, dir_registry, op_counter, fin, + this, dir_sync_stat); + task_sink_context.do_task_async(task); + 
dir_sync_stat->current_stat.inc_file_in_flight_count(stx.stx_size); + + // task(); + return 0; } else if (S_ISLNK(stx.stx_mode)) { // free the remote link before relinking r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), 0); @@ -774,70 +1379,70 @@ int PeerReplayer::remote_file_op(const std::string &dir_root, const std::string derr << ": failed to remove remote symlink=" << epath << dendl; return r; } - char *target = (char *)alloca(stx.stx_size+1); - r = ceph_readlinkat(m_local_mount, fh.c_fd, epath.c_str(), target, stx.stx_size); + char *target = (char *)alloca(stx.stx_size + 1); + r = ceph_readlinkat(m_local_mount, fh.c_fd, epath.c_str(), target, + stx.stx_size); if (r < 0) { - derr << ": failed to readlink local path=" << epath << ": " << cpp_strerror(r) - << dendl; + derr << ": failed to readlink local path=" << epath << ": " + << cpp_strerror(r) << dendl; return r; } target[stx.stx_size] = '\0'; - r = ceph_symlinkat(m_remote_mount, target, fh.r_fd_dir_root, epath.c_str()); + r = ceph_symlinkat(m_remote_mount, target, fh.r_fd_dir_root, + epath.c_str()); if (r < 0 && r != EEXIST) { - derr << ": failed to symlink remote path=" << epath << " to target=" << target - << ": " << cpp_strerror(r) << dendl; + derr << ": failed to symlink remote path=" << epath + << " to target=" << target << ": " << cpp_strerror(r) << dendl; return r; } + change_mask = (CEPH_STATX_MODE | CEPH_STATX_SIZE | CEPH_STATX_UID | + CEPH_STATX_GID | CEPH_STATX_MTIME); } else { - dout(5) << ": skipping entry=" << epath << ": unsupported mode=" << stx.stx_mode - << dendl; + dout(5) << ": skipping entry=" << epath + << ": unsupported mode=" << stx.stx_mode << dendl; + dir_sync_stat->current_stat.inc_file_op_count(false, false, stx.stx_size); return 0; } } - if (need_attr_sync) { - r = ceph_chownat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_uid, stx.stx_gid, - AT_SYMLINK_NOFOLLOW); - if (r < 0) { - derr << ": failed to chown remote directory=" << epath << ": " << cpp_strerror(r) - << dendl; - return r; - } - - r = ceph_chmodat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), stx.stx_mode & ~S_IFMT, - AT_SYMLINK_NOFOLLOW); - if (r < 0) { - derr << ": failed to chmod remote directory=" << epath << ": " << cpp_strerror(r) - << dendl; - return r; - } - - struct timespec times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec}, - {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec}}; - r = ceph_utimensat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), times, AT_SYMLINK_NOFOLLOW); + if (change_mask) { + r = sync_attributes(epath, stx, change_mask, false, fh); if (r < 0) { - derr << ": failed to change [am]time on remote directory=" << epath << ": " - << cpp_strerror(r) << dendl; return r; } } + dir_sync_stat->current_stat.inc_file_op_count( + need_data_sync, (change_mask > 0), stx.stx_size); return 0; } -int PeerReplayer::cleanup_remote_dir(const std::string &dir_root, - const std::string &epath, const FHandles &fh) { - dout(20) << ": dir_root=" << dir_root << ", epath=" << epath - << dendl; +int PeerReplayer::cleanup_remote_dir( + const std::string &dir_root, const std::string &epath, const FHandles &fh, + std::shared_ptr &dir_registry, + std::shared_ptr &dir_sync_stat) { + dout(20) << ": dir_root=" << dir_root << ", epath=" << epath << dendl; struct ceph_statx tstx; int r = ceph_statxat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), &tstx, CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | - CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + CEPH_STATX_SIZE | CEPH_STATX_MTIME, 
AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r == -ENOENT) { + dout(10) << ": directory already removed=" << epath << dendl; + return r; + } if (r < 0) { derr << ": failed to stat remote directory=" << epath << ": " << cpp_strerror(r) << dendl; return r; } + if (!S_ISDIR(tstx.stx_mode)) { + dout(10) << ": entry was a directory in the previous snapshot but is " + "not a directory on the remote (an earlier sync was likely " + "interrupted); removing " + "it as a file=" + << epath << dendl; + r = delete_file(dir_root, epath, fh, dir_registry, dir_sync_stat); + if (r < 0 && r != -ENOENT) { + derr << ": failed to remove remote entry=" << epath << ": " + << cpp_strerror(r) << dendl; + } + return r; + } @@ -854,7 +1459,7 @@ int PeerReplayer::cleanup_remote_dir(const std::string &dir_root, std::stack<SyncEntry> rm_stack; rm_stack.emplace(SyncEntry(epath, tdirp, tstx)); while (!rm_stack.empty()) { - if (should_backoff(dir_root, &r)) { + if (should_backoff(dir_registry, &r)) { dout(0) << ": backing off r=" << r << dendl; break; } @@ -868,7 +1473,8 @@ struct dirent de; while (true) { r = ceph_readdirplus_r(m_remote_mount, entry.dirp, &de, &stx, - CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL); + CEPH_STATX_MODE, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL); if (r < 0) { derr << ": failed to read remote directory=" << entry.epath << dendl; break; @@ -885,15 +1491,17 @@ } if (r == 0) { - r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, entry.epath.c_str(), AT_REMOVEDIR); + r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, entry.epath.c_str(), + AT_REMOVEDIR); if (r < 0) { derr << ": failed to remove remote directory=" << entry.epath << ": " << cpp_strerror(r) << dendl; break; } + dir_sync_stat->current_stat.inc_dir_deleted_count(); dout(10) << ": done for remote directory=" << entry.epath << dendl; - if (ceph_closedir(m_remote_mount, entry.dirp) < 0) { + if (entry.dirp && ceph_closedir(m_remote_mount, entry.dirp) < 0) { derr << ": failed to close remote directory=" << entry.epath << dendl; } rm_stack.pop(); @@ -906,8 +1514,8 @@ auto epath = entry_path(entry.epath, e_name); if (S_ISDIR(stx.stx_mode)) { ceph_dir_result *dirp; - r = opendirat(m_remote_mount, fh.r_fd_dir_root, epath, AT_SYMLINK_NOFOLLOW, - &dirp); + r = opendirat(m_remote_mount, fh.r_fd_dir_root, epath, + AT_SYMLINK_NOFOLLOW, &dirp); if (r < 0) { derr << ": failed to open remote directory=" << epath << ": " << cpp_strerror(r) << dendl; @@ -918,13 +1526,15 @@ rm_stack.emplace(SyncEntry(epath, stx)); } } else { - r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, entry.epath.c_str(), 0); + r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, entry.epath.c_str(), + 0); if (r < 0) { derr << ": failed to remove remote directory=" << entry.epath << ": " << cpp_strerror(r) << dendl; break; } dout(10) << ": done for remote file=" << entry.epath << dendl; + dir_sync_stat->current_stat.inc_file_del_count(); rm_stack.pop(); } } @@ -933,7 +1543,7 @@ auto &entry = rm_stack.top(); if (entry.is_directory()) { dout(20) << ": closing remote directory=" << entry.epath << dendl; - if (ceph_closedir(m_remote_mount, entry.dirp) < 0) { + if (entry.dirp && ceph_closedir(m_remote_mount, entry.dirp) < 0) { derr << ": failed to close remote directory=" << entry.epath << dendl; } } @@ -944,52 
+1554,26 @@ int PeerReplayer::cleanup_remote_dir(const std::string &dir_root, return r; } -int PeerReplayer::should_sync_entry(const std::string &epath, const struct ceph_statx &cstx, - const FHandles &fh, bool *need_data_sync, bool *need_attr_sync) { - dout(10) << ": epath=" << epath << dendl; +int PeerReplayer::delete_file(const std::string &dir_root, + const std::string &epath, const FHandles &fh, + std::shared_ptr &dir_registry, + std::shared_ptr &dir_sync_stat) { + dout(20) << ": dir_root=" << dir_root << ", epath=" << epath << dendl; + int r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, epath.c_str(), 0); + if (r == 0) { + dir_sync_stat->current_stat.inc_file_del_count(); + } + return r; +} - *need_data_sync = false; - *need_attr_sync = false; - struct ceph_statx pstx; - int r = ceph_statxat(fh.p_mnt, fh.p_fd, epath.c_str(), &pstx, - CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | - CEPH_STATX_SIZE | CEPH_STATX_CTIME | CEPH_STATX_MTIME, - AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); - if (r < 0 && r != -ENOENT && r != -ENOTDIR) { - derr << ": failed to stat prev entry= " << epath << ": " << cpp_strerror(r) - << dendl; - return r; - } - - if (r < 0) { - // inode does not exist in prev snapshot or file type has changed - // (file was S_IFREG earlier, S_IFDIR now). - dout(5) << ": entry=" << epath << ", r=" << r << dendl; - *need_data_sync = true; - *need_attr_sync = true; - return 0; - } - - dout(10) << ": local cur statx: mode=" << cstx.stx_mode << ", uid=" << cstx.stx_uid - << ", gid=" << cstx.stx_gid << ", size=" << cstx.stx_size << ", ctime=" - << cstx.stx_ctime << ", mtime=" << cstx.stx_mtime << dendl; - dout(10) << ": local prev statx: mode=" << pstx.stx_mode << ", uid=" << pstx.stx_uid - << ", gid=" << pstx.stx_gid << ", size=" << pstx.stx_size << ", ctime=" - << pstx.stx_ctime << ", mtime=" << pstx.stx_mtime << dendl; - if ((cstx.stx_mode & S_IFMT) != (pstx.stx_mode & S_IFMT)) { - dout(5) << ": entry=" << epath << " has mode mismatch" << dendl; - *need_data_sync = true; - *need_attr_sync = true; - } else { - *need_data_sync = (cstx.stx_size != pstx.stx_size) || (cstx.stx_mtime != pstx.stx_mtime); - *need_attr_sync = (cstx.stx_ctime != pstx.stx_ctime); - } - - return 0; -} - -int PeerReplayer::propagate_deleted_entries(const std::string &dir_root, - const std::string &epath, const FHandles &fh) { +int PeerReplayer::propagate_deleted_entries( + const std::string &dir_root, const std::string &epath, + std::unordered_map &common_entry_info, + uint64_t &common_entry_info_count, const FHandles &fh, + std::shared_ptr &thread_pool, + std::shared_ptr &dir_registry, + std::atomic &op_counter, Context *fin, + std::shared_ptr &dir_sync_stat) { dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << dendl; ceph_dir_result *dirp; @@ -1006,23 +1590,31 @@ int PeerReplayer::propagate_deleted_entries(const std::string &dir_root, return 0; } if (r == -ENOENT) { - dout(5) << ": epath=" << epath << " missing in previous-snap/remote dir-root" - << dendl; + dout(5) << ": epath=" << epath + << " missing in previous-snap/remote dir-root" << dendl; } return r; } - - struct dirent *dire = (struct dirent *)alloca(512 * sizeof(struct dirent)); + struct ceph_statx pstx; + struct dirent de; while (true) { - if (should_backoff(dir_root, &r)) { + if (should_backoff(dir_registry, &r)) { dout(0) << ": backing off r=" << r << dendl; break; } + unsigned int extra_flags = + (common_entry_info_count < + g_ceph_context->_conf->cephfs_mirror_max_element_in_cache_per_thread) + ? 
(CEPH_STATX_UID | CEPH_STATX_GID | CEPH_STATX_SIZE | + CEPH_STATX_MTIME) + : 0; + r = ceph_readdirplus_r(fh.p_mnt, dirp, &de, &pstx, + CEPH_STATX_MODE | extra_flags, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL); - int len = ceph_getdents(fh.p_mnt, dirp, (char *)dire, 512); - if (len < 0) { - derr << ": failed to read directory entries: " << cpp_strerror(len) << dendl; - r = len; + if (r < 0) { + derr << ": failed to read directory entries: " << cpp_strerror(r) + << dendl; // flip errno to signal that we got an err (possible the // snapshot getting deleted in midst). if (r == -ENOENT) { @@ -1030,77 +1622,69 @@ int PeerReplayer::propagate_deleted_entries(const std::string &dir_root, } break; } - if (len == 0) { + if (r == 0) { dout(10) << ": reached EOD" << dendl; break; } - int nr = len / sizeof(struct dirent); - for (int i = 0; i < nr; ++i) { - if (should_backoff(dir_root, &r)) { - dout(0) << ": backing off r=" << r << dendl; - break; - } - std::string d_name = std::string(dire[i].d_name); - if (d_name == "." || d_name == "..") { - continue; - } - - struct ceph_statx pstx; - auto dpath = entry_path(epath, d_name); - r = ceph_statxat(fh.p_mnt, fh.p_fd, dpath.c_str(), &pstx, - CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); - if (r < 0) { - derr << ": failed to stat (prev) directory=" << dpath << ": " - << cpp_strerror(r) << dendl; - // flip errno to signal that we got an err (possible the - // snapshot getting deleted in midst). - if (r == -ENOENT) { - r = -EINVAL; - } - return r; - } + std::string d_name = std::string(de.d_name); + if (d_name == "." || d_name == "..") { + continue; + } + auto dpath = entry_path(epath, d_name); - struct ceph_statx cstx; - r = ceph_statxat(m_local_mount, fh.c_fd, dpath.c_str(), &cstx, - CEPH_STATX_MODE, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); - if (r < 0 && r != -ENOENT) { - derr << ": failed to stat local (cur) directory=" << dpath << ": " - << cpp_strerror(r) << dendl; - return r; - } + struct ceph_statx cstx; + r = ceph_statxat(m_local_mount, fh.c_fd, dpath.c_str(), &cstx, + CEPH_STATX_MODE | extra_flags, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0 && r != -ENOENT) { + derr << ": failed to stat local (cur) directory=" << dpath << ": " + << cpp_strerror(r) << dendl; + break; + } - bool purge_remote = true; - if (r == 0) { - // directory entry present in both snapshots -- check inode - // type - if ((pstx.stx_mode & S_IFMT) == (cstx.stx_mode & S_IFMT)) { - dout(5) << ": mode matches for entry=" << d_name << dendl; - purge_remote = false; - } else { - dout(5) << ": mode mismatch for entry=" << d_name << dendl; - } + bool purge_remote = true; + bool entry_present = false; + if (r == 0) { + // directory entry present in both snapshots -- check inode + // type + entry_present = true; + if ((pstx.stx_mode & S_IFMT) == (cstx.stx_mode & S_IFMT)) { + dout(5) << ": mode matches for entry=" << d_name << dendl; + purge_remote = false; } else { - dout(5) << ": entry=" << d_name << " missing in current snapshot" << dendl; + dout(5) << ": mode mismatch for entry=" << d_name << dendl; } + } else { + dout(5) << ": entry=" << d_name << " missing in current snapshot" + << dendl; + } - if (purge_remote) { - dout(5) << ": purging remote entry=" << dpath << dendl; - if (S_ISDIR(pstx.stx_mode)) { - r = cleanup_remote_dir(dir_root, dpath, fh); - } else { - r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, dpath.c_str(), 0); - } - - if (r < 0 && r != -ENOENT) { - derr << ": failed to cleanup remote entry=" << d_name << ": " - << cpp_strerror(r) 
<< dendl; - return r; - } + if (purge_remote && !entry_present) { + dout(5) << ": purging remote entry=" << dpath << dendl; + if (S_ISDIR(pstx.stx_mode)) { + C_CleanUpRemoteDir *task = + new C_CleanUpRemoteDir(dir_root, dpath, fh, dir_registry, + op_counter, fin, this, dir_sync_stat); + // task_sink_context.do_task_async(task); + thread_pool->handle_task_force(task); + } else { + C_DeleteFile *task = + new C_DeleteFile(dir_root, dpath, fh, dir_registry, op_counter, fin, + this, dir_sync_stat); + // task_sink_context.do_task_async(task); + thread_pool->handle_task_force(task); } + } else if (extra_flags) { + unsigned int change_mask = 0; + build_change_mask(pstx, cstx, purge_remote, change_mask); + common_entry_info[d_name] = + CommonEntryInfo(S_ISDIR(pstx.stx_mode), purge_remote, change_mask); + ++common_entry_info_count; } } - - ceph_closedir(fh.p_mnt, dirp); + if (dirp) { + ceph_closedir(fh.p_mnt, dirp); + } return r; } @@ -1161,10 +1745,18 @@ int PeerReplayer::pre_sync_check_and_open_handles( MountRef mnt; if (prev) { + dout(0) << "mirroring snapshot '" << current.first + << "' using a local copy of snapshot '" << (*prev).first + << "' as a diff base, dir_root=" << dir_root.c_str() + << dendl; mnt = m_local_mount; auto prev_snap_path = snapshot_path(m_cct, dir_root, (*prev).first); fd = open_dir(mnt, prev_snap_path, (*prev).second); } else { + dout(0) << "mirroring snapshot '" << current.first + << "' using a remote state as a diff base, " + "dir_root = " << dir_root.c_str() + << dendl; mnt = m_remote_mount; fd = open_dir(mnt, dir_root, boost::none); } @@ -1193,7 +1785,7 @@ int PeerReplayer::pre_sync_check_and_open_handles( std::scoped_lock locker(m_lock); auto it = m_registered.find(dir_root); ceph_assert(it != m_registered.end()); - fh->r_fd_dir_root = it->second.fd; + fh->r_fd_dir_root = it->second->fd; } dout(5) << ": using " << ((fh->p_mnt == m_local_mount) ? "local (previous) snapshot" : "remote dir_root") @@ -1222,339 +1814,582 @@ int PeerReplayer::sync_perms(const std::string& path) { return 0; } -void PeerReplayer::post_sync_close_handles(const FHandles &fh) { - dout(20) << dendl; - - // @FHandles.r_fd_dir_root is closed in @unregister_directory since - // its used to acquire an exclusive lock on remote dir_root. 
- ceph_close(m_local_mount, fh.c_fd); - ceph_close(fh.p_mnt, fh.p_fd); -} - -int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot ¤t) { - dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl; - FHandles fh; - int r = pre_sync_check_and_open_handles(dir_root, current, boost::none, &fh); - if (r < 0) { - dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl; - return r; +void PeerReplayer::build_change_mask(const struct ceph_statx &pstx, + const struct ceph_statx &cstx, + bool create_fresh, + unsigned int &change_mask) { + if (create_fresh) { + change_mask = (CEPH_STATX_MODE | CEPH_STATX_SIZE | CEPH_STATX_UID | + CEPH_STATX_GID | CEPH_STATX_MTIME); + return; } - - BOOST_SCOPE_EXIT_ALL( (this)(&fh) ) { - post_sync_close_handles(fh); - }; - - // record that we are going to "dirty" the data under this - // directory root - auto snap_id_str{stringify(current.second)}; - r = ceph_fsetxattr(m_remote_mount, fh.r_fd_dir_root, "ceph.mirror.dirty_snap_id", - snap_id_str.c_str(), snap_id_str.size(), 0); - if (r < 0) { - derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root - << ": " << cpp_strerror(r) << dendl; - return r; + if ((cstx.stx_mode & ~S_IFMT) != (pstx.stx_mode & ~S_IFMT)) { + change_mask = change_mask | CEPH_STATX_MODE; } - - struct ceph_statx tstx; - r = ceph_fstatx(m_local_mount, fh.c_fd, &tstx, - CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | - CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, - AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); - if (r < 0) { - derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r) - << dendl; - return r; + if (cstx.stx_size != pstx.stx_size) { + change_mask = change_mask | CEPH_STATX_SIZE; } - - ceph_dir_result *tdirp; - r = ceph_fdopendir(m_local_mount, fh.c_fd, &tdirp); - if (r < 0) { - derr << ": failed to open local snap=" << current.first << ": " << cpp_strerror(r) - << dendl; - return r; + if (cstx.stx_uid != pstx.stx_uid) { + change_mask = change_mask | CEPH_STATX_UID; } + if (cstx.stx_gid != pstx.stx_gid) { + change_mask = change_mask | CEPH_STATX_GID; + } + if (cstx.stx_mtime != pstx.stx_mtime) { + change_mask = change_mask | CEPH_STATX_MTIME; + } +} - std::stack sync_stack; - sync_stack.emplace(SyncEntry(".", tdirp, tstx)); - while (!sync_stack.empty()) { - if (should_backoff(dir_root, &r)) { - dout(0) << ": backing off r=" << r << dendl; - break; - } - - dout(20) << ": " << sync_stack.size() << " entries in stack" << dendl; - std::string e_name; - auto &entry = sync_stack.top(); - dout(20) << ": top of stack path=" << entry.epath << dendl; - if (entry.is_directory()) { - // entry is a directory -- propagate deletes for missing entries - // (and changed inode types) to the remote filesystem. 
- if (!entry.needs_remote_sync()) { - r = propagate_deleted_entries(dir_root, entry.epath, fh); - if (r < 0 && r != -ENOENT) { - derr << ": failed to propagate missing dirs: " << cpp_strerror(r) << dendl; - break; +void PeerReplayer::do_dir_sync( + const std::string &dir_root, const std::string &cur_path, + const struct ceph_statx &cstx, ceph_dir_result *dirp, bool create_fresh, + bool entry_info_known, CommonEntryInfo &entry_info, + uint64_t common_entry_info_count, + std::shared_ptr &thread_pool, + const FHandles &fh, std::shared_ptr &dir_registry, + std::atomic &op_counter, Context *fin, + std::shared_ptr &dir_sync_stat) { + // dout(0) << ": do_dir_sync1-->" << cur_path << dendl; + int r = 0; + bool is_root = (cur_path == "."); + struct ceph_statx child_stx, pstx; + struct dirent child_de; + std::string child_dname, child_ename, child_path; + C_DoDirSync *task; + std::unordered_map common_entry_info; + int pstat_r = 0, rem_r = 0; + + if (should_backoff(dir_registry, &r)) { + dout(0) << ": backing off r=" << r << dendl; + goto safe_exit; + } + + if (entry_info_known) { + dir_sync_stat->current_stat.inc_cache_hit(); + } + + if (!is_root) { + if (!entry_info_known) { + if (!create_fresh) { + pstat_r = ceph_statxat(fh.p_mnt, fh.p_fd, cur_path.c_str(), &pstx, + CEPH_STATX_MODE | CEPH_STATX_UID | + CEPH_STATX_GID | CEPH_STATX_SIZE | + CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (pstat_r < 0 && pstat_r != -ENOENT && pstat_r != -ENOTDIR) { + r = pstat_r; + derr << ": failed to stat prev entry= " << cur_path << ": " + << cpp_strerror(r) << dendl; + goto sanity_check; } - entry.set_remote_synced(); + entry_info.is_dir = S_ISDIR(pstx.stx_mode); + entry_info.purge_remote = + (pstat_r == 0 && + (cstx.stx_mode & S_IFMT) != (pstx.stx_mode & S_IFMT)); } - - struct ceph_statx stx; - struct dirent de; - while (true) { - r = ceph_readdirplus_r(m_local_mount, entry.dirp, &de, &stx, - CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | - CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, - AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL); - if (r < 0) { - derr << ": failed to local read directory=" << entry.epath << dendl; - break; - } - if (r == 0) { - break; - } - - auto d_name = std::string(de.d_name); - if (d_name != "." 
&& d_name != "..") { - e_name = d_name; - break; + } + if (entry_info.purge_remote || pstat_r < 0) { + create_fresh = true; + } + if (create_fresh && entry_info.purge_remote) { + if (entry_info.is_dir) { + rem_r = cleanup_remote_dir(dir_root, cur_path, fh, dir_registry, + dir_sync_stat); + } else { + rem_r = + delete_file(dir_root, cur_path, fh, dir_registry, dir_sync_stat); + if (rem_r == -EISDIR && S_ISDIR(cstx.stx_mode)) { + dout(10) + << ": although it is not a directory in previous snapshot but in " + "remote it is a directory because of intermediate shutdown=" + << cur_path << dendl; + rem_r = 0; } } - - if (r == 0) { - dout(10) << ": done for directory=" << entry.epath << dendl; - if (ceph_closedir(m_local_mount, entry.dirp) < 0) { - derr << ": failed to close local directory=" << entry.epath << dendl; - } - sync_stack.pop(); - continue; + if (rem_r < 0 && rem_r != -ENOENT) { + derr << ": failed to cleanup remote entry=" << cur_path << ": " + << cpp_strerror(rem_r) << dendl; + r = rem_r; + goto sanity_check; } + } + if (!entry_info_known) { + build_change_mask(pstx, cstx, create_fresh, entry_info.change_mask); + } + if (S_ISDIR(cstx.stx_mode)) { + r = remote_mkdir(cur_path, cstx, create_fresh, entry_info.change_mask, fh, + dir_sync_stat); if (r < 0) { - break; + goto sanity_check; } - - auto epath = entry_path(entry.epath, e_name); - if (S_ISDIR(stx.stx_mode)) { - r = remote_mkdir(epath, stx, fh); - if (r < 0) { - break; - } - ceph_dir_result *dirp; - r = opendirat(m_local_mount, fh.c_fd, epath, AT_SYMLINK_NOFOLLOW, &dirp); + r = opendirat(m_local_mount, fh.c_fd, cur_path, AT_SYMLINK_NOFOLLOW, + &dirp); + if (r < 0) { + derr << ": failed to open local directory=" << cur_path << ": " + << cpp_strerror(r) << dendl; + goto safe_exit; + } + dir_sync_stat->current_stat.inc_dir_scanned_count(); + } else { + bool need_data_sync = create_fresh || + ((entry_info.change_mask & CEPH_STATX_SIZE) > 0) || + (((entry_info.change_mask & CEPH_STATX_MTIME) > 0)); + dout(5) << ": entry=" << cur_path << ", data_sync=" << need_data_sync + << ", attr_change_mask=" << entry_info.change_mask << dendl; + if (need_data_sync || entry_info.change_mask) { + r = remote_file_op(dir_root, cur_path, cstx, need_data_sync, + entry_info.change_mask, fh, thread_pool, + dir_registry, op_counter, fin, dir_sync_stat); if (r < 0) { - derr << ": failed to open local directory=" << epath << ": " - << cpp_strerror(r) << dendl; - break; + goto sanity_check; } - sync_stack.emplace(SyncEntry(epath, dirp, stx)); } else { - sync_stack.emplace(SyncEntry(epath, stx)); - } - } else { - bool need_data_sync = true; - bool need_attr_sync = true; - r = should_sync_entry(entry.epath, entry.stx, fh, - &need_data_sync, &need_attr_sync); - if (r < 0) { - break; + dir_sync_stat->current_stat.inc_file_op_count(0, 0, cstx.stx_size); } + goto sanity_check; + } + } + if (!create_fresh) { + r = propagate_deleted_entries(dir_root, cur_path, common_entry_info, + common_entry_info_count, fh, thread_pool, + dir_registry, op_counter, fin, dir_sync_stat); + if (r < 0 && r != -ENOENT) { + derr << ": failed to propagate missing dirs: " << cpp_strerror(r) + << dendl; + goto safe_exit; + } + } - dout(5) << ": entry=" << entry.epath << ", data_sync=" << need_data_sync - << ", attr_sync=" << need_attr_sync << dendl; - if (need_data_sync || need_attr_sync) { - r = remote_file_op(dir_root, entry.epath, entry.stx, fh, need_data_sync, - need_attr_sync); - if (r < 0) { - break; + while (true) { + if (should_backoff(dir_registry, &r)) { + dout(0) << ": backing off 
r=" << r << dendl; + goto safe_exit; + } + r = ceph_readdirplus_r(m_local_mount, dirp, &child_de, &child_stx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW, NULL); + if (r < 0) { + derr << ": failed to local read directory=" << cur_path << ":" + << cpp_strerror(r) << dendl; + break; + } + if (r == 0) { + dout(10) << ": done for directory=" << cur_path << dendl; + break; + } + auto child_dname = std::string(child_de.d_name); + if (child_dname == "." || child_dname == "..") { + continue; + } + child_path = entry_path(cur_path, child_dname); + CommonEntryInfo child_info; + bool child_info_known = false; + if (!create_fresh) { + auto it = common_entry_info.find(child_dname); + if (it != common_entry_info.end()) { + child_info_known = true; + child_info = it->second; + common_entry_info.erase(it); + --common_entry_info_count; + if (!S_ISDIR(child_stx.stx_mode) && !child_info.purge_remote && + child_info.change_mask == 0) { + dir_sync_stat->current_stat.inc_cache_hit(); + dir_sync_stat->current_stat.inc_file_op_count(0, 0, + child_stx.stx_size); + continue; } } - dout(10) << ": done for epath=" << entry.epath << dendl; - sync_stack.pop(); } + task = new C_DoDirSync(dir_root, child_path, child_stx, nullptr, + child_info.purge_remote | create_fresh, + child_info_known, child_info, + common_entry_info_count, thread_pool, fh, + dir_registry, op_counter, fin, this, dir_sync_stat); + thread_pool->handle_task_force(task); } - while (!sync_stack.empty()) { - auto &entry = sync_stack.top(); - if (entry.is_directory()) { - dout(20) << ": closing local directory=" << entry.epath << dendl; - if (ceph_closedir(m_local_mount, entry.dirp) < 0) { - derr << ": failed to close local directory=" << entry.epath << dendl; +safe_exit: + if (!is_root && dirp && ceph_closedir(m_local_mount, dirp) < 0) { + derr << ": failed to close local directory=" << cur_path << dendl; + } + +sanity_check: + if (r < 0 && !dir_registry->failed) { + mark_failed(dir_root, r); + } +} + +int PeerReplayer::handle_duplicate_entry( + const std::string &dir_root, const std::string &cur_path, + const ceph_snapdiff_entry_t &sd_entry, + std::shared_ptr &thread_pool, + const FHandles &fh, std::shared_ptr &dir_registry, + std::atomic &op_counter, Context *fin, + std::shared_ptr &dir_sync_stat) { + struct ceph_statx cstx; + int r = + ceph_statxat(m_local_mount, fh.c_fd, cur_path.c_str(), &cstx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r == 0) { + struct ceph_statx pstx; + r = ceph_statxat(fh.p_mnt, fh.p_fd, cur_path.c_str(), &pstx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to stat prev entry= " << cur_path << ": " + << cpp_strerror(r) << dendl; + return r; + } + bool purge_remote = (cstx.stx_mode & S_IFMT) != (pstx.stx_mode & S_IFMT); + bool create_fresh = purge_remote; + unsigned int change_mask = 0; + build_change_mask(pstx, cstx, create_fresh, change_mask); + CommonEntryInfo entry_info(S_ISDIR(pstx.stx_mode), purge_remote, + change_mask); + C_DoDirSync *task = + new C_DoDirSync(dir_root, cur_path, cstx, (ceph_dir_result *)nullptr, + create_fresh, true, entry_info, 0, thread_pool, fh, + dir_registry, op_counter, fin, this, dir_sync_stat); + thread_pool->handle_task_force(task); + return 0; + } 
+ if (DT_DIR == sd_entry.dir_entry.d_type) { + C_CleanUpRemoteDir *task = + new C_CleanUpRemoteDir(dir_root, cur_path, fh, dir_registry, + op_counter, fin, this, dir_sync_stat); + thread_pool->handle_task_force(task); + } else { + C_DeleteFile *task = + new C_DeleteFile(dir_root, cur_path, fh, dir_registry, op_counter, + fin, this, dir_sync_stat); + thread_pool->handle_task_force(task); + } + return 0; +} + +void PeerReplayer::do_dir_sync_using_snapdiff( + const std::string &dir_root, const std::string &cur_path, + ceph_snapdiff_info *sd_info, const Snapshot ¤t, const Snapshot &prev, + std::shared_ptr &thread_pool, + const FHandles &fh, std::shared_ptr &dir_registry, + std::atomic &op_counter, Context *fin, + std::shared_ptr &dir_sync_stat) { + // dout(0) << ": cur_dir-->" << cur_path << dendl; + int r = 0; + bool is_root = (cur_path == "."); + unsigned int change_mask = 0; + bool purge_remote = false; + std::string child_name, child_path; + struct ceph_statx cstx; + ceph_snapdiff_entry_t sd_entry; + bool create_fresh = false; + struct ceph_statx pstx; + std::string prev_entry = ""; + if (should_backoff(dir_registry, &r)) { + dout(0) << ": backing off r=" << r << dendl; + goto safe_exit; + } + + r = ceph_statxat(m_local_mount, fh.c_fd, cur_path.c_str(), &cstx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0) { + derr << ": failed to stat cur entry= " << cur_path << ": " + << cpp_strerror(r) << dendl; + goto safe_exit; + } + + r = ceph_statxat(fh.p_mnt, fh.p_fd, cur_path.c_str(), &pstx, + CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | + CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); + if (r < 0 && r != -ENOENT && r != -ENOTDIR) { + derr << ": failed to stat prev entry= " << cur_path << ": " + << cpp_strerror(r) << dendl; + goto safe_exit; + } + purge_remote = + (r == 0 && (cstx.stx_mode & S_IFMT) != (pstx.stx_mode & S_IFMT)); + create_fresh = purge_remote || (r < 0); + r = 0; + build_change_mask(pstx, cstx, create_fresh, change_mask); + + if (create_fresh) { + CommonEntryInfo entry_info(S_ISDIR(pstx.stx_mode), purge_remote, + change_mask); + do_dir_sync(dir_root, cur_path, cstx, (ceph_dir_result *)nullptr, true, + true, entry_info, 0, thread_pool, fh, dir_registry, op_counter, + fin, dir_sync_stat); + goto safe_exit; + } + + if (S_ISDIR(cstx.stx_mode)) { // is a directory + r = remote_mkdir(cur_path, cstx, create_fresh, change_mask, fh, + dir_sync_stat); + if (r < 0) { + goto safe_exit; + } + dir_sync_stat->current_stat.inc_dir_scanned_count(); + } else { // is a file + bool need_data_sync = ((change_mask & CEPH_STATX_SIZE) > 0) || + ((change_mask & CEPH_STATX_MTIME) > 0); + dout(5) << ": entry=" << cur_path << ", data_sync=" << need_data_sync + << ", change_mask=" << change_mask << dendl; + if (need_data_sync || change_mask) { + r = remote_file_op(dir_root, cur_path, cstx, need_data_sync, change_mask, + fh, thread_pool, dir_registry, op_counter, fin, + dir_sync_stat); + if (r < 0) { + goto safe_exit; + } + } else { + dir_sync_stat->current_stat.inc_file_op_count(0, 0, cstx.stx_size); + } + goto safe_exit; + } + dout(20) << ": syncing entry, path=" << cur_path << dendl; + + if (!is_root) { + sd_info = new ceph_snapdiff_info(); + } + r = ceph_open_snapdiff(fh.p_mnt, dir_root.c_str(), cur_path.c_str(), + prev.first.c_str(), current.first.c_str(), sd_info); + if (r != 0) { + derr << ": failed to open snapdiff, r=" << r 
<< dendl;
+    goto safe_exit;
+  }
+
+  while (true) {
+    if (should_backoff(dir_registry, &r)) {
+      dout(0) << ": backing off r=" << r << dendl;
+      goto safe_exit;
+    }
+    r = ceph_readdir_snapdiff(sd_info, &sd_entry);
+    if (r < 0) {
+      derr << ": failed to read directory=" << cur_path << dendl;
+    }
+    if (r <= 0) {
+      break;
+    }
+    // New entry found
+    child_name = sd_entry.dir_entry.d_name;
+    if ("." == child_name || ".." == child_name) {
+      continue;
+    }
+    if (child_name == prev_entry) {
+      continue;
+    }
+    prev_entry = child_name;
+    child_path = entry_diff_path(cur_path, child_name);
+    if (sd_entry.snapid == prev.second) { // can be a deleted entry
+      r = handle_duplicate_entry(dir_root, child_path, sd_entry, thread_pool,
+                                 fh, dir_registry, op_counter, fin,
+                                 dir_sync_stat);
+      if (r < 0) {
+        goto safe_exit;
       }
+    } else {
+      C_DoDirSyncSnapDiff *task = new C_DoDirSyncSnapDiff(
+          dir_root, child_path, nullptr, current, prev, thread_pool, fh,
+          dir_registry, op_counter, fin, this, dir_sync_stat);
+      thread_pool->handle_task_force(task);
     }
+  }
 
-    sync_stack.pop();
+safe_exit:
+  if (!is_root && sd_info) {
+    if (ceph_close_snapdiff(sd_info) != 0) {
+      derr << ": failed to close snapdiff, path=" << cur_path << dendl;
+    }
+    delete sd_info;
+    sd_info = nullptr;
   }
 
-  return r;
+  if (r < 0 && !dir_registry->failed) {
+    mark_failed(dir_root, r);
+  }
 }
 
-int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &current,
+int PeerReplayer::do_synchronize(const std::string &dir_root,
+                                 const Snapshot &current,
                                  boost::optional<Snapshot> prev) {
-  if (!prev) {
-    derr << ": invalid previous snapshot" << dendl;
-    return -ENODATA;
+  dout(0)
+      << ": cephfs_mirror_max_concurrent_file_transfer="
+      << g_ceph_context->_conf->cephfs_mirror_max_concurrent_file_transfer
+      << ", cephfs_mirror_threads_per_sync="
+      << g_ceph_context->_conf->cephfs_mirror_threads_per_sync
+      << ", cephfs_mirror_remote_diff_base_upon_start="
+      << g_ceph_context->_conf.get_val<bool>(
+             "cephfs_mirror_remote_diff_base_upon_start")
+      << ", cephfs_mirror_sync_latest_snapshot="
+      << g_ceph_context->_conf->cephfs_mirror_sync_latest_snapshot
+      << ", cephfs_mirror_thread_pool_queue_size="
+      << g_ceph_context->_conf->cephfs_mirror_thread_pool_queue_size
+      << ", cephfs_mirror_max_element_in_cache_per_thread="
+      << g_ceph_context->_conf->cephfs_mirror_max_element_in_cache_per_thread
+      << dendl;
+  dout(20) << ": dir_root=" << dir_root << ", current=" << current << dendl;
+  if (prev) {
+    dout(20) << ": incremental sync check from prev=" << prev << dendl;
   }
-  dout(20) << ": incremental sync check from prev=" << prev << dendl;
-
   FHandles fh;
   int r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh);
   if (r < 0) {
     dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl;
     return r;
   }
 
-  BOOST_SCOPE_EXIT_ALL( (this)(&fh) ) {
-    post_sync_close_handles(fh);
-  };
-
-  // record that we are going to "dirty" the data under this directory root
+  // record that we are going to "dirty" the data under this
+  // directory root
   auto snap_id_str{stringify(current.second)};
-  r = ceph_setxattr(m_remote_mount, dir_root.c_str(), "ceph.mirror.dirty_snap_id",
-                    snap_id_str.c_str(), snap_id_str.size(), 0);
+  r = ceph_fsetxattr(m_remote_mount, fh.r_fd_dir_root,
+                     
"ceph.mirror.dirty_snap_id", snap_id_str.c_str(), + snap_id_str.size(), 0); if (r < 0) { - derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root - << ": " << cpp_strerror(r) << dendl; + derr << ": error setting \"ceph.mirror.dirty_snap_id\" on dir_root=" + << dir_root << ": " << cpp_strerror(r) << dendl; + ceph_close(m_local_mount, fh.c_fd); + ceph_close(fh.p_mnt, fh.p_fd); return r; } - struct ceph_statx cstx; - r = ceph_fstatx(m_local_mount, fh.c_fd, &cstx, + struct ceph_statx tstx; + r = ceph_fstatx(m_local_mount, fh.c_fd, &tstx, CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID | - CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME, + CEPH_STATX_SIZE | CEPH_STATX_MTIME, AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW); if (r < 0) { derr << ": failed to stat snap=" << current.first << ": " << cpp_strerror(r) << dendl; + ceph_close(m_local_mount, fh.c_fd); + ceph_close(fh.p_mnt, fh.p_fd); return r; } + ceph_dir_result *tdirp = nullptr; ceph_snapdiff_info sd_info; - ceph_snapdiff_entry_t sd_entry; - - //The queue of SyncEntry items (directories) to be synchronized. - //We follow a breadth first approach here based on the snapdiff output. - std::queue sync_queue; - - //start with initial/default entry - std::string epath = ".", npath = "", nabs_path = "", nname = ""; - sync_queue.emplace(SyncEntry(epath, cstx)); - - while (!sync_queue.empty()) { - if (should_backoff(dir_root, &r)) { - dout(0) << ": backing off r=" << r << dendl; - break; - } - r = pre_sync_check_and_open_handles(dir_root, current, prev, &fh); + if (!prev) { + r = ceph_fdopendir(m_local_mount, fh.c_fd, &tdirp); if (r < 0) { - dout(5) << ": cannot proceed with sync: " << cpp_strerror(r) << dendl; + derr << ": failed to open local snap=" << current.first << ": " + << cpp_strerror(r) << dendl; + ceph_close(m_local_mount, fh.c_fd); + ceph_close(fh.p_mnt, fh.p_fd); return r; } + } - dout(20) << ": " << sync_queue.size() << " entries in queue" << dendl; - const auto &queue_entry = sync_queue.front(); - epath = queue_entry.epath; - dout(20) << ": syncing entry, path=" << epath << dendl; - r = ceph_open_snapdiff(fh.p_mnt, dir_root.c_str(), epath.c_str(), - stringify((*prev).first).c_str(), current.first.c_str(), &sd_info); - if (r != 0) { - derr << ": failed to open snapdiff, r=" << r << dendl; - return r; - } - while (0 < (r = ceph_readdir_snapdiff(&sd_info, &sd_entry))) { - if (r < 0) { - derr << ": failed to read directory=" << epath << dendl; - ceph_close_snapdiff(&sd_info); - return r; - } + // starting from this point we shouldn't care about manual closing of fh.c_fd, + // it will be closed automatically when bound tdirp is closed. - //New entry found - nname = sd_entry.dir_entry.d_name; - if ("." == nname || ".." 
== nname)
-        continue;
-      // create path for the newly found entry
-      npath = entry_diff_path(epath, nname);
-      nabs_path = entry_diff_path(dir_root, npath);
+  std::string cur_snap_path = snapshot_path(m_cct, dir_root, current.first);
+  void *buf_file_count = malloc(20);
+  std::string rfiles = "0";
+  int x1 = ceph_getxattr(m_local_mount, cur_snap_path.c_str(),
+                         "ceph.dir.rfiles", buf_file_count, 20);
+  if (x1 < 0) {
+    derr << ": failed to read ceph.dir.rfiles xattr for directory=" << dir_root
+         << dendl;
+  } else {
+    rfiles = std::string((char *)buf_file_count, x1);
+    dout(0) << ": xattr ceph.dir.rfiles=" << rfiles
+            << " for directory=" << dir_root << dendl;
+  }
+  free(buf_file_count);
 
-      r = ceph_statx(sd_info.cmount, nabs_path.c_str(), &cstx,
-                     CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
-                     CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
-                     AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
-      if (r < 0) {
-        // can't stat, so it's a deleted entry.
-        if (DT_DIR == sd_entry.dir_entry.d_type) { // is a directory
-          r = cleanup_remote_dir(dir_root, npath, fh);
-          if (r < 0) {
-            derr << ": failed to remove directory=" << nabs_path << dendl;
-            break;
-          }
-        }
-        else { // is a file
-          r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), 0);
-          if (r < 0) {
-            break;
-          }
-        }
-      } else {
-        // stat success, update the existing entry
-        struct ceph_statx tstx;
-        int rstat_r = ceph_statx(m_remote_mount, nabs_path.c_str(), &tstx,
-                                 CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
-                                 CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
-                                 AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
-        if (S_ISDIR(cstx.stx_mode)) { // is a directory
-          //cleanup if it's a file in the remotefs
-          if ((0 == rstat_r) && !S_ISDIR(tstx.stx_mode)) {
-            r = ceph_unlinkat(m_remote_mount, fh.r_fd_dir_root, npath.c_str(), 0);
-            if (r < 0) {
-              derr << ": Error in directory sync. Failed to remove file="
-                   << nabs_path << dendl;
-              break;
-            }
-          }
-          r = remote_mkdir(npath, cstx, fh);
-          if (r < 0) {
-            break;
-          }
-          // push it to sync_queue for later processing
-          sync_queue.emplace(SyncEntry(npath, cstx));
-        } else { // is a file
-          bool need_data_sync = true;
-          bool need_attr_sync = true;
-          r = should_sync_entry(npath, cstx, fh, &need_data_sync, &need_attr_sync);
-          if (r < 0) {
-            break;
-          }
-          dout(5) << ": entry=" << npath << ", data_sync=" << need_data_sync
-                  << ", attr_sync=" << need_attr_sync << dendl;
-          if (need_data_sync || need_attr_sync) {
-            //cleanup if it's a directory in the remotefs
-            if ((0 == rstat_r) && S_ISDIR(tstx.stx_mode)) {
-              r = cleanup_remote_dir(dir_root, npath, fh);
-              if (r < 0) {
-                derr << ": Error in file sync. Failed to remove remote directory="
-                     << nabs_path << dendl;
-                break;
-              }
-            }
-            r = remote_file_op(dir_root, npath, cstx, fh, need_data_sync, need_attr_sync);
-            if (r < 0) {
-              break;
-            }
-          }
-        }
-      }
-    }
-    if (0 == r) {
-      dout(10) << ": successfully synchronized the entry=" << epath << dendl;
-    }
+  void *buf_file_bytes = malloc(20);
+  std::string rbytes = "0";
+  int x2 = ceph_getxattr(m_local_mount, cur_snap_path.c_str(),
+                         "ceph.dir.rbytes", buf_file_bytes, 20);
+  if (x2 < 0) {
+    derr << ": failed to read ceph.dir.rbytes xattr for directory=" << dir_root
+         << dendl;
+  } else {
+    rbytes = std::string((char *)buf_file_bytes, x2);
+    dout(0) << ": xattr ceph.dir.rbytes=" << rbytes
+            << " for directory=" << dir_root << dendl;
+  }
+  free(buf_file_bytes);
+
+  std::atomic<int64_t> op_counter(0);
+  C_SaferCond fin;
+  std::unique_lock lock(m_lock);
+  auto dir_registry = m_registered.at(dir_root);
+  dir_registry->failed = false;
+  dir_registry->failed_reason = 0;
+  auto dir_sync_stat = m_snap_sync_stats.at(dir_root);
+  dir_sync_stat->current_stat.rfiles = std::stoull(rfiles);
+  dir_sync_stat->current_stat.rbytes = std::stoull(rbytes);
+  dir_sync_stat->current_stat.start_timer();
+  dir_sync_stat->sync_idx = task_sink_context.sync_start();
+  lock.unlock();
+  int _num_threads = g_ceph_context->_conf->cephfs_mirror_threads_per_sync;
+  dout(0) << ": number of threads for this sync=" << _num_threads << dendl;
+  std::shared_ptr<DirOpHandlerContext::ThreadPool> thread_pool =
+      dir_op_handler_context.sync_start(_num_threads);
 
-    //Close the current open directory and take the next queue_entry, if success or failure.
-    r = ceph_close_snapdiff(&sd_info);
-    if (r != 0) {
-      derr << ": failed to close directory=" << epath << dendl;
-    }
-    sync_queue.pop();
-  }
+  if (thread_pool) {
+    dout(0) << ": dir_sync_stat.sync_idx=" << dir_sync_stat->sync_idx
+            << ", thread_pool=" << thread_pool->thread_idx << dendl;
+    if (!prev) {
+      C_DoDirSync *task = new C_DoDirSync(
+          dir_root, ".", tstx, tdirp, false, false, CommonEntryInfo(), 0,
+          thread_pool, fh, dir_registry, op_counter, &fin, this, dir_sync_stat);
+      thread_pool->handle_task_force(task);
+    } else {
+      C_DoDirSyncSnapDiff *task = new C_DoDirSyncSnapDiff(
+          dir_root, ".", &sd_info, current, *prev, thread_pool, fh,
+          dir_registry, op_counter, &fin, this, dir_sync_stat);
+      thread_pool->handle_task_force(task);
+    }
+    fin.wait();
+    dir_op_handler_context.sync_finish(thread_pool->thread_idx);
+    task_sink_context.sync_finish(dir_sync_stat->sync_idx);
+  }
+
+  if (!prev && tdirp && ceph_closedir(m_local_mount, tdirp) < 0) {
+    derr << ": failed to close local directory=." << dendl;
+  }
+  if (prev && ceph_close_snapdiff(&sd_info) != 0) {
+    derr << ": failed to close snapdiff for dir_root=" << dir_root << dendl;
+  }
+
+  if (m_stopping) {
+    r = -EINPROGRESS;
+  }
+  if (r >= 0 && dir_registry->failed) {
+    r = get_failed_reason(dir_root);
+  }
+  lock.lock();
+  dir_sync_stat->reset_stats();
+  lock.unlock();
+
+  dout(0) << ": done sync-->" << dir_root << ", " << current.first << dendl;
+  dout(20) << " cur:" << fh.c_fd << " prev:" << fh.p_fd << " ret = " << r
+           << dendl;
+
+  // @FHandles.r_fd_dir_root is closed in @unregister_directory since
+  // it's used to acquire an exclusive lock on remote dir_root.
+ ceph_close(m_local_mount, fh.c_fd); + ceph_close(fh.p_mnt, fh.p_fd); + return r; } @@ -1572,11 +2407,22 @@ int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &curre return r; } + { + std::scoped_lock lock(m_lock); + auto &dir_sync_stat = m_snap_sync_stats.at(dir_root); + bool remote_diff_base = g_ceph_context->_conf.get_val( + "cephfs_mirror_remote_diff_base_upon_start"); + if (dir_sync_stat->synced_snap_count == 0 && remote_diff_base) { + r = -1; + } + } + // r = -1; + // no xattr, can't determine which snap the data belongs to! if (r < 0) { dout(5) << ": missing \"ceph.mirror.dirty_snap_id\" xattr on remote -- using" << " incremental sync with remote scan" << dendl; - r = do_synchronize(dir_root, current); + r = do_synchronize(dir_root, current, boost::none); } else { size_t xlen = r; char *val = (char *)alloca(xlen+1); @@ -1597,7 +2443,7 @@ int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &curre r = do_synchronize(dir_root, current, prev); } else { dout(5) << ": mismatch -- using incremental sync with remote scan" << dendl; - r = do_synchronize(dir_root, current); + r = do_synchronize(dir_root, current, boost::none); } } @@ -1618,6 +2464,54 @@ int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &curre return r; } +int PeerReplayer::_do_sync_snaps(const std::string &dir_root, + uint64_t cur_snap_id, + std::string cur_snap_name, + uint64_t last_snap_id, + std::string last_snap_name) { + int r = 0; + double start = 0, end = 0, duration = 0; + if (m_perf_counters) { + start = std::chrono::duration_cast( + clock::now().time_since_epoch()) + .count(); + utime_t t; + t.set_from_double(start); + m_perf_counters->tset(l_cephfs_mirror_peer_replayer_last_synced_start, t); + } + set_current_syncing_snap(dir_root, cur_snap_id, cur_snap_name); + boost::optional prev = boost::none; + if (last_snap_id != 0) { + prev = std::make_pair(last_snap_name, last_snap_id); + } + r = synchronize(dir_root, std::make_pair(cur_snap_name, cur_snap_id), prev); + if (r < 0) { + derr << ": failed to synchronize dir_root=" << dir_root + << ", snapshot=" << cur_snap_name << dendl; + clear_current_syncing_snap(dir_root); + return r; + } + if (m_perf_counters) { + m_perf_counters->inc(l_cephfs_mirror_peer_replayer_snaps_synced); + end = std::chrono::duration_cast( + clock::now().time_since_epoch()) + .count(); + utime_t t; + t.set_from_double(end); + m_perf_counters->tset(l_cephfs_mirror_peer_replayer_last_synced_end, t); + duration = end - start; + t.set_from_double(duration); + m_perf_counters->tinc(l_cephfs_mirror_peer_replayer_avg_sync_time, t); + m_perf_counters->tset(l_cephfs_mirror_peer_replayer_last_synced_duration, + t); + m_perf_counters->set(l_cephfs_mirror_peer_replayer_last_synced_bytes, + m_snap_sync_stats.at(dir_root)->sync_bytes); + } + + set_last_synced_stat(dir_root, cur_snap_id, cur_snap_name, duration); + return r; +} + int PeerReplayer::do_sync_snaps(const std::string &dir_root) { dout(20) << ": dir_root=" << dir_root << dendl; @@ -1671,59 +2565,39 @@ int PeerReplayer::do_sync_snaps(const std::string &dir_root) { set_last_synced_snap(dir_root, last_snap_id, last_snap_name); } - dout(5) << ": last snap-id transferred=" << last_snap_id << dendl; - auto it = local_snap_map.upper_bound(last_snap_id); - if (it == local_snap_map.end()) { - dout(20) << ": nothing to synchronize" << dendl; - return 0; - } - - auto snaps_per_cycle = g_ceph_context->_conf.get_val( - "cephfs_mirror_max_snapshot_sync_per_cycle"); - - dout(10) << ": 
synchronizing from snap-id=" << it->first << dendl; - double start = 0; - double end = 0; - double duration = 0; - for (; it != local_snap_map.end(); ++it) { - if (m_perf_counters) { - start = std::chrono::duration_cast(clock::now().time_since_epoch()).count(); - utime_t t; - t.set_from_double(start); - m_perf_counters->tset(l_cephfs_mirror_peer_replayer_last_synced_start, t); - } - set_current_syncing_snap(dir_root, it->first, it->second); - boost::optional prev = boost::none; - if (last_snap_id != 0) { - prev = std::make_pair(last_snap_name, last_snap_id); - } - r = synchronize(dir_root, std::make_pair(it->second, it->first), prev); - if (r < 0) { - derr << ": failed to synchronize dir_root=" << dir_root - << ", snapshot=" << it->second << dendl; - clear_current_syncing_snap(dir_root); - return r; + if (g_ceph_context->_conf->cephfs_mirror_sync_latest_snapshot) { + auto it = local_snap_map.rbegin(); + if (it->first != last_snap_id) { + dout(5) << ": latest_local_snap_id=" << it->first + << ", latest_local_snap_name=" << it->second << dendl; + r = _do_sync_snaps(dir_root, it->first, it->second, last_snap_id, + last_snap_name); } - if (m_perf_counters) { - m_perf_counters->inc(l_cephfs_mirror_peer_replayer_snaps_synced); - end = std::chrono::duration_cast(clock::now().time_since_epoch()).count(); - utime_t t; - t.set_from_double(end); - m_perf_counters->tset(l_cephfs_mirror_peer_replayer_last_synced_end, t); - duration = end - start; - t.set_from_double(duration); - m_perf_counters->tinc(l_cephfs_mirror_peer_replayer_avg_sync_time, t); - m_perf_counters->tset(l_cephfs_mirror_peer_replayer_last_synced_duration, t); - m_perf_counters->set(l_cephfs_mirror_peer_replayer_last_synced_bytes, m_snap_sync_stats.at(dir_root).sync_bytes); - } - - set_last_synced_stat(dir_root, it->first, it->second, duration); - if (--snaps_per_cycle == 0) { - break; + } else { + dout(5) << ": last snap-id transferred=" << last_snap_id << dendl; + auto it = local_snap_map.upper_bound(last_snap_id); + if (it == local_snap_map.end()) { + dout(20) << ": nothing to synchronize" << dendl; + return 0; } - last_snap_name = it->second; - last_snap_id = it->first; + auto snaps_per_cycle = g_ceph_context->_conf.get_val( + "cephfs_mirror_max_snapshot_sync_per_cycle"); + + dout(10) << ": synchronizing from snap-id=" << it->first << dendl; + for (; it != local_snap_map.end(); ++it) { + r = _do_sync_snaps(dir_root, it->first, it->second, last_snap_id, + last_snap_name); + if (r < 0) { + return r; + } + if (--snaps_per_cycle == 0) { + break; + } + + last_snap_name = it->second; + last_snap_id = it->first; + } } return 0; @@ -1806,38 +2680,44 @@ void PeerReplayer::peer_status(Formatter *f) { f->open_object_section("stats"); for (auto &[dir_root, sync_stat] : m_snap_sync_stats) { f->open_object_section(dir_root); - if (sync_stat.failed) { + if (sync_stat->failed) { f->dump_string("state", "failed"); - if (sync_stat.last_failed_reason) { - f->dump_string("failure_reason", *sync_stat.last_failed_reason); + if (sync_stat->last_failed_reason) { + f->dump_string("failure_reason", *sync_stat->last_failed_reason); } - } else if (!sync_stat.current_syncing_snap) { + } else if (!sync_stat->current_syncing_snap) { f->dump_string("state", "idle"); } else { f->dump_string("state", "syncing"); - f->open_object_section("current_sycning_snap"); - f->dump_unsigned("id", (*sync_stat.current_syncing_snap).first); - f->dump_string("name", (*sync_stat.current_syncing_snap).second); + f->open_object_section("current_syncing_snap"); + 
f->dump_unsigned("id", (*sync_stat->current_syncing_snap).first); + f->dump_string("name", (*sync_stat->current_syncing_snap).second); + sync_stat->current_stat.dump(f); f->close_section(); } - if (sync_stat.last_synced_snap) { + if (sync_stat->last_synced_snap) { f->open_object_section("last_synced_snap"); - f->dump_unsigned("id", (*sync_stat.last_synced_snap).first); - f->dump_string("name", (*sync_stat.last_synced_snap).second); - if (sync_stat.last_sync_duration) { - f->dump_float("sync_duration", *sync_stat.last_sync_duration); - f->dump_stream("sync_time_stamp") << sync_stat.last_synced; + f->dump_unsigned("id", (*sync_stat->last_synced_snap).first); + f->dump_string("name", (*sync_stat->last_synced_snap).second); + if (sync_stat->last_sync_duration) { + f->dump_float("sync_duration", *sync_stat->last_sync_duration); + f->dump_stream("sync_time_stamp") << sync_stat->last_synced; } - if (sync_stat.last_sync_bytes) { - f->dump_unsigned("sync_bytes", *sync_stat.last_sync_bytes); + if (sync_stat->last_sync_bytes) { + f->dump_unsigned("sync_bytes", *sync_stat->last_sync_bytes); } + sync_stat->last_stat.dump(f); f->close_section(); } - f->dump_unsigned("snaps_synced", sync_stat.synced_snap_count); - f->dump_unsigned("snaps_deleted", sync_stat.deleted_snap_count); - f->dump_unsigned("snaps_renamed", sync_stat.renamed_snap_count); + f->dump_unsigned("snaps_synced", sync_stat->synced_snap_count); + f->dump_unsigned("snaps_deleted", sync_stat->deleted_snap_count); + f->dump_unsigned("snaps_renamed", sync_stat->renamed_snap_count); f->close_section(); // dir_root } + f->open_object_section("thread_pool_stats"); + task_sink_context.dump_stats(f); + dir_op_handler_context.dump_stats(f); + f->close_section(); f->close_section(); // stats } @@ -1850,4 +2730,4 @@ void PeerReplayer::reopen_logs() { } } // namespace mirror -} // namespace cephfs +} // namespace cephfs \ No newline at end of file diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h index 933cb182635ba..074d700e16667 100644 --- a/src/tools/cephfs_mirror/PeerReplayer.h +++ b/src/tools/cephfs_mirror/PeerReplayer.h @@ -10,12 +10,15 @@ #include "ServiceDaemon.h" #include "Types.h" +#include + namespace cephfs { namespace mirror { class FSMirror; class PeerReplayerAdminSocketHook; + class PeerReplayer { public: PeerReplayer(CephContext *cct, FSMirror *fs_mirror, @@ -24,6 +27,28 @@ public: MountRef mount, ServiceDaemon *service_daemon); ~PeerReplayer(); + // file descriptor "triplet" for synchronizing a snapshot + // w/ an added MountRef for accessing "previous" snapshot. + struct FHandles { + // open file descriptor on the snap directory for snapshot + // currently being synchronized. Always use this fd with + // @m_local_mount. + int c_fd; + + // open file descriptor on the "previous" snapshot or on + // dir_root on remote filesystem (based on if the snapshot + // can be used for incremental transfer). Always use this + // fd with p_mnt which either points to @m_local_mount ( + // for local incremental comparison) or @m_remote_mount ( + // for remote incremental comparison). + int p_fd; + MountRef p_mnt; + + // open file descriptor on dir_root on remote filesystem. + // Always use this fd with @m_remote_mount. 
+ int r_fd_dir_root; + }; + // initialize replayer for a peer int init(); @@ -42,36 +67,14 @@ public: // reopen logs void reopen_logs(); + using Snapshot = std::pair; + private: inline static const std::string PRIMARY_SNAP_ID_KEY = "primary_snap_id"; inline static const std::string SERVICE_DAEMON_FAILED_DIR_COUNT_KEY = "failure_count"; inline static const std::string SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY = "recovery_count"; - using Snapshot = std::pair; - - // file descriptor "triplet" for synchronizing a snapshot - // w/ an added MountRef for accessing "previous" snapshot. - struct FHandles { - // open file descriptor on the snap directory for snapshot - // currently being synchronized. Always use this fd with - // @m_local_mount. - int c_fd; - - // open file descriptor on the "previous" snapshot or on - // dir_root on remote filesystem (based on if the snapshot - // can be used for incremental transfer). Always use this - // fd with p_mnt which either points to @m_local_mount ( - // for local incremental comparison) or @m_remote_mount ( - // for remote incremental comparison). - int p_fd; - MountRef p_mnt; - - // open file descriptor on dir_root on remote filesystem. - // Always use this fd with @m_remote_mount. - int r_fd_dir_root; - }; - bool is_stopping() { return m_stopping; } @@ -94,31 +97,48 @@ private: struct DirRegistry { int fd; - bool canceled = false; + std::atomic canceled; SnapshotReplayerThread *replayer; + std::atomic failed; + int failed_reason; + DirRegistry(): canceled(false), failed(false) {} + DirRegistry(const DirRegistry ®istry) + : fd(registry.fd), canceled(registry.canceled.load()), + replayer(registry.replayer), failed(registry.failed.load()), + failed_reason(registry.failed_reason) {} + DirRegistry &operator=(const DirRegistry ®istry) { + if (this != ®istry) { + fd = registry.fd; + canceled.store(registry.canceled.load()); + replayer = registry.replayer; + failed.store(registry.failed.load()); + failed_reason = registry.failed_reason; + } + return *this; + } }; struct SyncEntry { std::string epath; ceph_dir_result *dirp; // valid for directories + ceph_snapdiff_info* sd_info; + ceph_snapdiff_entry_t sd_entry; struct ceph_statx stx; + bool create_fresh = false; // set by incremental sync _after_ ensuring missing entries // in the currently synced snapshot have been propagated to // the remote filesystem. 
bool remote_synced = false; - SyncEntry(std::string_view path, - const struct ceph_statx &stx) - : epath(path), - stx(stx) { - } - SyncEntry(std::string_view path, - ceph_dir_result *dirp, + SyncEntry(std::string_view path, const struct ceph_statx &stx) + : epath(path), stx(stx) {} + SyncEntry(std::string_view path, ceph_dir_result *dirp, + const struct ceph_statx &stx, bool create_fresh = false) + : epath(path), dirp(dirp), stx(stx), create_fresh(create_fresh) {} + SyncEntry(std::string_view path, ceph_snapdiff_info *sd_info, + const ceph_snapdiff_entry_t &sd_entry, const struct ceph_statx &stx) - : epath(path), - dirp(dirp), - stx(stx) { - } + : epath(path), sd_info(sd_info), sd_entry(sd_entry), stx(stx) {} bool is_directory() const { return S_ISDIR(stx.stx_mode); @@ -138,6 +158,27 @@ private: uint64_t recovered_dir_count = 0; }; + struct CommonEntryInfo { + bool is_dir = false; + bool purge_remote = false; + unsigned int change_mask = 0; + CommonEntryInfo() : is_dir(false), purge_remote(false), change_mask(0) {} + CommonEntryInfo(bool is_dir, bool purge_remote, unsigned int change_mask) + : is_dir(is_dir), purge_remote(purge_remote), change_mask(change_mask) { + } + CommonEntryInfo(const CommonEntryInfo &other) + : is_dir(other.is_dir), purge_remote(other.purge_remote), + change_mask(other.change_mask) {} + CommonEntryInfo &operator=(const CommonEntryInfo &other) { + if (this != &other) { + is_dir = other.is_dir; + purge_remote = other.purge_remote; + change_mask = other.change_mask; + } + return *this; + } + }; + struct SnapSyncStat { uint64_t nr_failures = 0; // number of consecutive failures boost::optional last_failed; // lat failed timestamp @@ -152,15 +193,489 @@ private: boost::optional last_sync_duration; boost::optional last_sync_bytes; //last sync bytes for display in status uint64_t sync_bytes = 0; //sync bytes counter, independently for each directory sync. 
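+    // slot index handed out by TaskSinkContext::sync_start() for the
+    // sync currently in progress on this directory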
+ int sync_idx; + struct SyncStat { + uint64_t rfiles; + uint64_t rbytes; + std::atomic files_in_flight{0}; + std::atomic files_op[2][2] = {0}; + std::atomic files_deleted{0}; + std::atomic large_files_in_flight{0}; + std::atomic large_files_op[2][2] = {0}; + std::atomic file_bytes_synced{0}; + std::atomic dir_created{0}; + std::atomic dir_deleted{0}; + std::atomic dir_scanned{0}; + std::atomic cache_hit{0}; + boost::optional start_time; + SyncStat() : start_time(boost::none) {} + SyncStat(const SyncStat &other) + : rfiles(other.rfiles), rbytes(other.rbytes), + files_in_flight(other.files_in_flight.load()), + files_deleted(other.files_deleted.load()), + large_files_in_flight(other.large_files_in_flight.load()), + file_bytes_synced(other.file_bytes_synced.load()), + dir_created(other.dir_created.load()), + dir_deleted(other.dir_deleted.load()), + dir_scanned(other.dir_scanned.load()), + cache_hit(other.cache_hit.load()), start_time(other.start_time) { + for (size_t i = 0; i < 2; ++i) { + for (size_t j = 0; j < 2; ++j) { + files_op[i][j].store(other.files_op[i][j].load()); + large_files_op[i][j].store(other.large_files_op[i][j].load()); + } + } + } + SyncStat &operator=(const SyncStat &other) { + if (this != &other) { // Self-assignment check + rfiles = other.rfiles; + rbytes = other.rbytes; + files_in_flight.store(other.files_in_flight.load()); + files_deleted.store(other.files_deleted.load()); + large_files_in_flight.store(other.large_files_in_flight.load()); + file_bytes_synced.store(other.file_bytes_synced.load()); + dir_created.store(other.dir_created.load()); + dir_deleted.store(other.dir_deleted.load()); + dir_scanned.store(other.dir_scanned.load()); + cache_hit.store(other.cache_hit.load()); + start_time = other.start_time; + for (size_t i = 0; i < 2; ++i) { + for (size_t j = 0; j < 2; ++j) { + files_op[i][j].store(other.files_op[i][j].load()); + large_files_op[i][j].store(other.large_files_op[i][j].load()); + } + } + } + return *this; + } + inline void inc_cache_hit() { cache_hit++; } + inline void inc_file_del_count() { files_deleted++; } + inline void inc_file_op_count(bool data_synced, bool attr_synced, + uint64_t file_size) { + files_op[data_synced][attr_synced]++; + if (file_size >= large_file_threshold) { + large_files_op[data_synced][attr_synced]++; + } + file_bytes_synced.fetch_add(file_size, std::memory_order_relaxed); + } + inline void inc_file_in_flight_count(uint64_t file_size) { + files_in_flight++; + if (file_size >= large_file_threshold) { + large_files_in_flight++; + } + } + inline void dec_file_in_flight_count(uint64_t file_size) { + files_in_flight--; + if (file_size >= large_file_threshold) { + large_files_in_flight--; + } + } + inline void inc_dir_created_count() { dir_created++; } + inline void inc_dir_deleted_count() { dir_deleted++; } + inline void inc_dir_scanned_count() { dir_scanned++; } + inline void start_timer() { start_time = clock::now(); } + void dump(Formatter *f) { + f->dump_unsigned("rfiles", rfiles); + f->dump_unsigned("rbytes", rbytes); + f->dump_unsigned("files_bytes_synced", file_bytes_synced.load()); + if (start_time) { + std::chrono::duration duration = clock::now() - *start_time; + double time_elapsed = duration.count(); + f->dump_float("time_elapsed", time_elapsed); + double speed = 0; + std::string syncing_speed = ""; + if (time_elapsed > 0) { + speed = (file_bytes_synced * 8.0) / time_elapsed; + if (speed >= (double)1e9) { + syncing_speed = std::to_string(speed / (double)1e9) + "Gbps"; + } else if (speed >= (double)1e6) { + syncing_speed 
= std::to_string(speed / (double)1e6) + "Mbps"; + } else if (speed >= (double)1e3) { + syncing_speed = std::to_string(speed / (double)1e3) + "Kbps"; + } else { + syncing_speed = std::to_string(speed) + "bps"; + } + f->dump_string("syncing_speed", syncing_speed); + } + } + f->dump_unsigned("files_in_flight", files_in_flight.load()); + f->dump_unsigned("files_deleted", files_deleted.load()); + f->dump_unsigned("files_data_attr_synced", files_op[1][1].load()); + f->dump_unsigned("files_data_synced", files_op[1][0].load()); + f->dump_unsigned("files_attr_synced", files_op[0][1].load()); + f->dump_unsigned("files_skipped", files_op[0][0].load()); + f->dump_unsigned("large_files_in_flight", large_files_in_flight.load()); + f->dump_unsigned("large_files_data_attr_synced", + large_files_op[1][1].load()); + f->dump_unsigned("large_files_data_synced", + large_files_op[1][0].load()); + f->dump_unsigned("large_files_attr_synced", + large_files_op[0][1].load()); + f->dump_unsigned("large_files_skipped", large_files_op[0][0].load()); + f->dump_unsigned("dir_scanned", dir_scanned); + f->dump_unsigned("dir_created", dir_created); + f->dump_unsigned("dir_deleted", dir_deleted); + f->dump_unsigned("cache_hit", cache_hit); + } + }; + SyncStat last_stat, current_stat; + static const uint64_t large_file_threshold = 4194304; + SnapSyncStat() {} + SnapSyncStat(const SnapSyncStat &other) + : nr_failures(other.nr_failures), last_failed(other.last_failed), + last_failed_reason(other.last_failed_reason), failed(other.failed), + last_synced_snap(other.last_synced_snap), + current_syncing_snap(other.current_syncing_snap), + synced_snap_count(other.synced_snap_count), + deleted_snap_count(other.deleted_snap_count), + renamed_snap_count(other.renamed_snap_count), + last_synced(other.last_synced), + last_sync_duration(other.last_sync_duration), + last_sync_bytes(other.last_sync_bytes), sync_bytes(other.sync_bytes), + sync_idx(other.sync_idx), last_stat(other.last_stat), + current_stat(other.current_stat) {} + + SnapSyncStat &operator=(const SnapSyncStat &other) { + if (this != &other) { // Self-assignment check + nr_failures = other.nr_failures; + last_failed = other.last_failed; + last_failed_reason = other.last_failed_reason; + failed = other.failed; + last_synced_snap = other.last_synced_snap; + current_syncing_snap = other.current_syncing_snap; + synced_snap_count = other.synced_snap_count; + deleted_snap_count = other.deleted_snap_count; + renamed_snap_count = other.renamed_snap_count; + last_synced = other.last_synced; + last_sync_duration = other.last_sync_duration; + last_sync_bytes = other.last_sync_bytes; + sync_bytes = other.sync_bytes; + sync_idx = other.sync_idx; + last_stat = other.last_stat; + current_stat = other.current_stat; + } + return *this; + } + void reset_stats() { + last_stat = current_stat; + last_stat.start_time = boost::none; + current_stat = SyncStat(); + } + }; + + + class C_MirrorContext; + + class DirOpHandlerContext { + public: + class ThreadPool { + public: + ThreadPool(int num_threads, int thread_idx, PeerReplayer *replayer) + : num_threads(num_threads), active(false), task_queue_limit(0), + queued_task(0), thread_idx(thread_idx), m_peer(replayer->m_peer) {} + void activate(); + void deactivate(); + bool do_task_async(C_MirrorContext *task); + void handle_task_force(C_MirrorContext *task); + bool handle_task_async(C_MirrorContext *task); + void handle_task_sync(C_MirrorContext *task); + void update_num_threads(int thread_count); + friend class PeerReplayer; + + private: + struct 
ThreadStatus { + bool stop_called = true; + bool active = false; + ThreadStatus() {} + ThreadStatus(bool stop_called, bool active) + : stop_called(stop_called), active(active) {} + }; + void run_task(ThreadStatus *status); + void drain_queue(); + int num_threads; + std::queue task_queue; + std::vector> workers; + std::vector thread_status; + bool active; + std::condition_variable pick_task; + std::mutex mtx; + std::mutex threadpool_config_mutex; + int task_queue_limit; + int queued_task = 0; + int thread_idx; + Peer& m_peer; // just for using dout + }; + DirOpHandlerContext(int thread_count, PeerReplayer *replayer) + : active(true), sync_count(0), thread_count(thread_count), + replayer(replayer) {} + + std::shared_ptr sync_start(int _num_threads); + void sync_finish(int idx); + + void dump_stats(Formatter *f); + + void deactivate(); + void update_state(); + + private: + std::vector> thread_pools; + std::vector unassigned_sync_ids; + int sync_count = 0; + std::mutex context_mutex; + bool active; + int thread_count; + PeerReplayer* replayer; // just for using dout + }; + DirOpHandlerContext dir_op_handler_context; + + class TaskSinkContext { + public: + struct ThreadPoolStats { + uint64_t total_bytes_queued = 0; + int large_file_queued = 0; + int file_queued = 0; + static const uint64_t large_file_threshold = 4194304; + TaskSinkContext *task_sink_context; + void add_file(uint64_t file_size) { + large_file_queued += (file_size >= large_file_threshold); + file_queued++; + total_bytes_queued += file_size; + } + void remove_file(uint64_t file_size) { + large_file_queued -= (file_size >= large_file_threshold); + file_queued--; + total_bytes_queued -= file_size; + } + ThreadPoolStats() {} + ThreadPoolStats(int num_threads, TaskSinkContext *task_sink_context) + : task_sink_context(task_sink_context) {} + void dump(Formatter *f) { + f->dump_int("queued_file_count", file_queued); + f->dump_int("queued_large_file_count", large_file_queued); + f->dump_unsigned("bytes_queued_for_transfer", total_bytes_queued); + } + }; + + TaskSinkContext() {} + TaskSinkContext(int num_threads, PeerReplayer *replayer); + void activate(); + void deactivate(); + void do_task_async(C_MirrorContext *task); + // bool do_task_async(C_MirrorContext *task); + void dump_stats(Formatter *f) { + // std::scoped_lock lock(fmtx, omtx); + thread_pool_stats.dump(f); + } + int sync_start(); + void sync_finish(int idx); + void update_state(); + friend class ThreadPoolStats; + friend class C_TransferAndSyncFile; + friend class PeerReplayer; + + private: + void run_task(int idx); + void drain_queue(); + std::queue task_ring; + std::vector> task_queue; + std::condition_variable pick_task; + std::condition_variable give_task; + std::mutex mtx; + std::mutex threadpool_config_mutex; + int task_limit; + std::vector workers; + std::vector stop_flag; + ThreadPoolStats thread_pool_stats; + std::vector unassigned_sync_ids; + int sync_count = 0; + bool active; + PeerReplayer *replayer; // just for using dout + }; + TaskSinkContext task_sink_context; + std::condition_variable polling_cv; + std::thread polling_thread; + void do_poll(); + + class C_MirrorContext : public Context { + public: + C_MirrorContext(std::atomic &op_counter, Context *fin, + PeerReplayer *replayer, + std::shared_ptr &dir_sync_stat) + : op_counter(op_counter), fin(fin), replayer(replayer), + m_peer(replayer->m_peer), dir_sync_stat(dir_sync_stat) {} + virtual void finish(int r) = 0; + void complete(int r) override { + finish(r); + dec_counter(); + delete this; + } + void 
inc_counter() { ++op_counter; } + virtual void add_into_stat() {} + virtual void remove_from_stat() {} + friend class DirOpHandlerContext; + friend class PeerReplayer; + + protected: + std::atomic &op_counter; + Context *fin; + PeerReplayer *replayer; + Peer &m_peer; // just for using dout + std::shared_ptr &dir_sync_stat; + + private: + void dec_counter() { + --op_counter; + if (op_counter <= 0) { + fin->complete(0); + } + } + }; + + class C_DoDirSync : public C_MirrorContext { + public: + C_DoDirSync(const std::string &dir_root, const std::string &cur_path, + const struct ceph_statx &cstx, ceph_dir_result *dirp, + bool create_fresh, bool entry_info_known, + const CommonEntryInfo &entry_info, + uint64_t common_entry_info_count, + std::shared_ptr &thread_pool, + const FHandles &fh, std::shared_ptr &dir_registry, + std::atomic &op_counter, Context *fin, + PeerReplayer *replayer, + std::shared_ptr &dir_sync_stat) + : C_MirrorContext(op_counter, fin, replayer, dir_sync_stat), + dir_root(dir_root), cur_path(cur_path), cstx(cstx), dirp(dirp), + create_fresh(create_fresh), entry_info_known(entry_info_known), + entry_info(entry_info), + common_entry_info_count(common_entry_info_count), + thread_pool(thread_pool), fh(fh), dir_registry(dir_registry) {} + void finish(int r) override; + void set_common_entry_info_count(uint64_t _common_entry_info_count) { + common_entry_info_count = _common_entry_info_count; + } + + private: + const std::string &dir_root; + std::string cur_path; + struct ceph_statx cstx; + ceph_dir_result *dirp; + bool create_fresh; + bool entry_info_known; + CommonEntryInfo entry_info; + uint64_t common_entry_info_count; + std::shared_ptr &thread_pool; + const FHandles &fh; + std::shared_ptr &dir_registry; + }; + + class C_DoDirSyncSnapDiff : public C_MirrorContext { + public: + C_DoDirSyncSnapDiff( + const std::string &dir_root, const std::string &cur_path, + ceph_snapdiff_info *sd_info, const Snapshot ¤t, + const Snapshot &prev, + std::shared_ptr &thread_pool, + const FHandles &fh, std::shared_ptr &dir_registry, + std::atomic &op_counter, Context *fin, PeerReplayer *replayer, + std::shared_ptr &dir_sync_stat) + : C_MirrorContext(op_counter, fin, replayer, dir_sync_stat), + dir_root(dir_root), cur_path(cur_path), sd_info(sd_info), + current(current), prev(prev), thread_pool(thread_pool), fh(fh), + dir_registry(dir_registry) {} + void finish(int r) override; + + private: + const std::string &dir_root; + std::string cur_path; + ceph_snapdiff_info *sd_info; + const Snapshot ¤t; + const Snapshot &prev; + std::shared_ptr &thread_pool; + const FHandles &fh; + std::shared_ptr &dir_registry; + }; + + class C_TransferAndSyncFile : public C_MirrorContext { + public: + C_TransferAndSyncFile(const std::string &dir_root, const std::string &epath, + const struct ceph_statx &stx, + unsigned int change_mask, const FHandles &fh, + std::shared_ptr &dir_registry, + std::atomic &op_counter, Context *fin, + PeerReplayer *replayer, + std::shared_ptr &dir_sync_stat) + : C_MirrorContext(op_counter, fin, replayer, dir_sync_stat), + dir_root(dir_root), epath(epath), stx(stx), change_mask(change_mask), + fh(fh), dir_registry(dir_registry) {} + + void finish(int r) override; + void add_into_stat() override; + void remove_from_stat() override; + friend class TaskSinkContext; + + private: + const std::string &dir_root; + std::string epath; + struct ceph_statx stx; + unsigned int change_mask; + const FHandles &fh; + std::shared_ptr &dir_registry; + }; + + class C_CleanUpRemoteDir : public C_MirrorContext { + 
public:
+    C_CleanUpRemoteDir(const std::string &dir_root, const std::string &epath,
+                       const FHandles &fh,
+                       std::shared_ptr<DirRegistry> &dir_registry,
+                       std::atomic<int64_t> &op_counter, Context *fin,
+                       PeerReplayer *replayer,
+                       std::shared_ptr<SnapSyncStat> &dir_sync_stat)
+        : C_MirrorContext(op_counter, fin, replayer, dir_sync_stat),
+          dir_root(dir_root), epath(epath), fh(fh),
+          dir_registry(dir_registry) {}
+
+    void finish(int r) override;
+
+  private:
+    const std::string &dir_root;
+    std::string epath;
+    const FHandles &fh;
+    std::shared_ptr<DirRegistry> &dir_registry;
   };
 
+  class C_DeleteFile : public C_MirrorContext {
+  public:
+    C_DeleteFile(const std::string &dir_root, const std::string &epath,
+                 const FHandles &fh,
+                 std::shared_ptr<DirRegistry> &dir_registry,
+                 std::atomic<int64_t> &op_counter, Context *fin,
+                 PeerReplayer *replayer,
+                 std::shared_ptr<SnapSyncStat> &dir_sync_stat)
+        : C_MirrorContext(op_counter, fin, replayer, dir_sync_stat),
+          dir_root(dir_root), epath(epath), fh(fh),
+          dir_registry(dir_registry) {}
+
+    void finish(int r) override;
+
+  private:
+    const std::string &dir_root;
+    std::string epath;
+    const FHandles &fh;
+    std::shared_ptr<DirRegistry> &dir_registry;
+  };
+
+  friend class C_MirrorContext;
+  friend class C_DoDirSync;
+  friend class C_TransferAndSyncFile;
+  friend class C_CleanUpRemoteDir;
+  friend class C_DoDirSyncSnapDiff;
+  friend class C_DeleteFile;
+
   void _inc_failed_count(const std::string &dir_root) {
     auto max_failures = g_ceph_context->_conf.get_val<uint64_t>(
       "cephfs_mirror_max_consecutive_failures_per_directory");
     auto &sync_stat = m_snap_sync_stats.at(dir_root);
-    sync_stat.last_failed = clock::now();
-    if (++sync_stat.nr_failures >= max_failures && !sync_stat.failed) {
-      sync_stat.failed = true;
+    sync_stat->last_failed = clock::now();
+    if (++sync_stat->nr_failures >= max_failures && !sync_stat->failed) {
+      sync_stat->failed = true;
       ++m_service_daemon_stats.failed_dir_count;
       m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
                                                      SERVICE_DAEMON_FAILED_DIR_COUNT_KEY,
@@ -169,82 +684,80 @@ private:
   }
   void _reset_failed_count(const std::string &dir_root) {
     auto &sync_stat = m_snap_sync_stats.at(dir_root);
-    if (sync_stat.failed) {
+    if (sync_stat->failed) {
       ++m_service_daemon_stats.recovered_dir_count;
       m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
                                                      SERVICE_DAEMON_RECOVERED_DIR_COUNT_KEY,
                                                      m_service_daemon_stats.recovered_dir_count);
     }
-    sync_stat.nr_failures = 0;
-    sync_stat.failed = false;
-    sync_stat.last_failed = boost::none;
-    sync_stat.last_failed_reason = boost::none;
+    sync_stat->nr_failures = 0;
+    sync_stat->failed = false;
+    sync_stat->last_failed = boost::none;
+    sync_stat->last_failed_reason = boost::none;
   }
 
   void _set_last_synced_snap(const std::string &dir_root, uint64_t snap_id,
                              const std::string &snap_name) {
     auto &sync_stat = m_snap_sync_stats.at(dir_root);
-    sync_stat.last_synced_snap = std::make_pair(snap_id, snap_name);
-    sync_stat.current_syncing_snap = boost::none;
+    sync_stat->last_synced_snap = std::make_pair(snap_id, snap_name);
+    sync_stat->current_syncing_snap = boost::none;
   }
   void set_last_synced_snap(const std::string &dir_root, uint64_t snap_id,
                             const std::string &snap_name) {
     std::scoped_lock locker(m_lock);
     _set_last_synced_snap(dir_root, snap_id, snap_name);
     auto &sync_stat = m_snap_sync_stats.at(dir_root);
-    sync_stat.sync_bytes = 0;
+    sync_stat->sync_bytes = 0;
   }
   void set_current_syncing_snap(const std::string &dir_root, uint64_t snap_id,
                                 const std::string &snap_name) {
     std::scoped_lock locker(m_lock);
     auto &sync_stat = m_snap_sync_stats.at(dir_root);
-    
   void set_current_syncing_snap(const std::string &dir_root, uint64_t snap_id,
                                 const std::string &snap_name) {
     std::scoped_lock locker(m_lock);
     auto &sync_stat = m_snap_sync_stats.at(dir_root);
-    sync_stat.current_syncing_snap = std::make_pair(snap_id, snap_name);
+    sync_stat->current_syncing_snap = std::make_pair(snap_id, snap_name);
   }
 
   void clear_current_syncing_snap(const std::string &dir_root) {
     std::scoped_lock locker(m_lock);
     auto &sync_stat = m_snap_sync_stats.at(dir_root);
-    sync_stat.current_syncing_snap = boost::none;
+    sync_stat->current_syncing_snap = boost::none;
   }
 
   void inc_deleted_snap(const std::string &dir_root) {
     std::scoped_lock locker(m_lock);
     auto &sync_stat = m_snap_sync_stats.at(dir_root);
-    ++sync_stat.deleted_snap_count;
+    ++sync_stat->deleted_snap_count;
   }
 
   void inc_renamed_snap(const std::string &dir_root) {
     std::scoped_lock locker(m_lock);
     auto &sync_stat = m_snap_sync_stats.at(dir_root);
-    ++sync_stat.renamed_snap_count;
+    ++sync_stat->renamed_snap_count;
   }
 
   void set_last_synced_stat(const std::string &dir_root, uint64_t snap_id,
                             const std::string &snap_name, double duration) {
     std::scoped_lock locker(m_lock);
     _set_last_synced_snap(dir_root, snap_id, snap_name);
     auto &sync_stat = m_snap_sync_stats.at(dir_root);
-    sync_stat.last_synced = clock::now();
-    sync_stat.last_sync_duration = duration;
-    sync_stat.last_sync_bytes = sync_stat.sync_bytes;
-    ++sync_stat.synced_snap_count;
+    sync_stat->last_synced = clock::now();
+    sync_stat->last_sync_duration = duration;
+    sync_stat->last_sync_bytes = sync_stat->sync_bytes;
+    ++sync_stat->synced_snap_count;
   }
 
   void inc_sync_bytes(const std::string &dir_root, const uint64_t& b) {
     std::scoped_lock locker(m_lock);
     auto &sync_stat = m_snap_sync_stats.at(dir_root);
-    sync_stat.sync_bytes += b;
+    sync_stat->sync_bytes += b;
   }
 
-  bool should_backoff(const std::string &dir_root, int *retval) {
+  bool should_backoff(std::shared_ptr<DirRegistry> &dir_registry,
+                      int *retval) {
     if (m_fs_mirror->is_blocklisted()) {
       *retval = -EBLOCKLISTED;
       return true;
     }
 
-    std::scoped_lock locker(m_lock);
-    if (is_stopping()) {
+    if (m_stopping) {
       // ceph defines EBLOCKLISTED to ESHUTDOWN (108). so use
       // EINPROGRESS to identify shutdown.
       *retval = -EINPROGRESS;
       return true;
     }
 
-    auto &dr = m_registered.at(dir_root);
-    if (dr.canceled) {
+    if (dir_registry->canceled || dir_registry->failed) {
       *retval = -ECANCELED;
       return true;
     }
@@ -253,6 +766,25 @@ private:
     return false;
   }
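+
+  // Note: should_backoff() is now safe to call from sync worker threads
+  // without taking m_lock: m_stopping is atomic and the canceled/failed
+  // flags are read through the caller's own DirRegistry handle instead of
+  // a lookup in m_registered. A typical (hypothetical) polling loop inside
+  // a long-running copy:
+  //
+  //   int rv;
+  //   while (!should_backoff(dir_registry, &rv)) {
+  //     // ... copy the next chunk ...
+  //   }
+  //   // rv holds -EBLOCKLISTED, -EINPROGRESS (shutdown) or -ECANCELED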
 
+  int get_failed_reason(const std::string &dir_root) {
+    std::scoped_lock lock(m_lock);
+    auto &dr = m_registered.at(dir_root);
+    return dr->failed_reason;
+  }
+
+  void mark_failed(const std::string &dir_root, int reason) {
+    std::scoped_lock lock(m_lock);
+    auto it = m_registered.find(dir_root);
+    if (it == m_registered.end()) {
+      return;
+    }
+    if (it->second->failed) {
+      return;
+    }
+    it->second->failed = true;
+    it->second->failed_reason = reason;
+  }
+
   typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers;
 
   CephContext *m_cct;
@@ -261,9 +793,9 @@ private:
   Filesystem m_filesystem;
   Peer m_peer;
   // probably need to be encapsulated when supporting cancelations
-  std::map<std::string, DirRegistry> m_registered;
+  std::map<std::string, std::shared_ptr<DirRegistry>> m_registered;
   std::vector<std::string> m_directories;
-  std::map<std::string, SnapSyncStat> m_snap_sync_stats;
+  std::map<std::string, std::shared_ptr<SnapSyncStat>> m_snap_sync_stats;
   MountRef m_local_mount;
   ServiceDaemon *m_service_daemon;
   PeerReplayerAdminSocketHook *m_asok_hook = nullptr;
@@ -272,7 +804,7 @@ private:
   ceph::condition_variable m_cond;
   RadosRef m_remote_cluster;
   MountRef m_remote_mount;
-  bool m_stopping = false;
+  std::atomic<bool> m_stopping = false;
   SnapshotReplayers m_replayers;
 
   ServiceDaemonStats m_service_daemon_stats;
@@ -284,8 +816,9 @@ private:
   boost::optional<std::string> pick_directory();
   int register_directory(const std::string &dir_root, SnapshotReplayerThread *replayer);
   void unregister_directory(const std::string &dir_root);
-  int try_lock_directory(const std::string &dir_root, SnapshotReplayerThread *replayer,
-                         DirRegistry *registry);
+  int try_lock_directory(const std::string &dir_root,
+                         SnapshotReplayerThread *replayer,
+                         std::shared_ptr<DirRegistry> &registry);
   void unlock_directory(const std::string &dir_root, const DirRegistry &registry);
   int sync_snaps(const std::string &dir_root, std::unique_lock<ceph::mutex> &locker);
@@ -296,37 +829,113 @@ private:
   int propagate_snap_deletes(const std::string &dir_root,
                              const std::set<std::string> &snaps);
   int propagate_snap_renames(const std::string &dir_root,
                              const std::set<std::pair<std::string, std::string>> &snaps);
-  int propagate_deleted_entries(const std::string &dir_root, const std::string &epath,
-                                const FHandles &fh);
+
+  int delete_file(const std::string &dir_root, const std::string &epath,
+                  const FHandles &fh,
+                  std::shared_ptr<DirRegistry> &dir_registry,
+                  std::shared_ptr<SnapSyncStat> &dir_sync_stat);
+
+  int propagate_deleted_entries(
+      const std::string &dir_root, const std::string &epath,
+      std::unordered_map<std::string, CommonEntryInfo> &common_entry_info,
+      uint64_t &common_entry_info_count, const FHandles &fh,
+      std::shared_ptr<DirOpHandlerContext::ThreadPool> &thread_pool,
+      std::shared_ptr<DirRegistry> &dir_registry,
+      std::atomic<int64_t> &op_counter, Context *fin,
+      std::shared_ptr<SnapSyncStat> &dir_sync_stat);
+
+  int cleanup_remote_dir(const std::string &dir_root, const std::string &epath,
+                         const FHandles &fh,
+                         std::shared_ptr<DirRegistry> &dir_registry,
+                         std::shared_ptr<SnapSyncStat> &dir_sync_stat);
+
+  void should_sync_entry(const std::string &epath,
+                         const struct ceph_statx &cstx,
+                         const struct ceph_statx &pstx, bool create_fresh,
+                         bool *need_data_sync, uint64_t &change_mask,
+                         const FHandles &fh);
-  int should_sync_entry(const std::string &epath, const struct ceph_statx &cstx,
-                        const FHandles &fh, bool *need_data_sync, bool *need_attr_sync);
-
   int open_dir(MountRef mnt, const std::string &dir_path, boost::optional<uint64_t> snap_id);
   int pre_sync_check_and_open_handles(const std::string &dir_root, const Snapshot &current,
                                       boost::optional<Snapshot> prev, FHandles *fh);
-  void post_sync_close_handles(const FHandles &fh);
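+
+  // build_change_mask() compares the previous (pstx) and current (cstx)
+  // statx of an entry and records which attributes differ, so that
+  // sync_attributes() only re-applies what actually changed. A rough sketch
+  // of the idea (the actual bit layout lives in PeerReplayer.cc; the
+  // CEPH_STATX_* names are the libcephfs statx mask bits):
+  //
+  //   unsigned int mask = 0;
+  //   if (create_fresh || pstx.stx_mode != cstx.stx_mode)
+  //     mask |= CEPH_STATX_MODE;
+  //   if (create_fresh || pstx.stx_uid != cstx.stx_uid)
+  //     mask |= CEPH_STATX_UID;   // likewise gid, mtime, atime, ...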
+  void build_change_mask(const struct ceph_statx &pstx,
+                         const struct ceph_statx &cstx, bool create_fresh,
+                         unsigned int &change_mask);
+
+  void do_dir_sync(const std::string &dir_root, const std::string &cur_path,
+                   const struct ceph_statx &cstx, ceph_dir_result *dirp,
+                   bool create_fresh, bool stat_known,
+                   CommonEntryInfo &entry_info,
+                   uint64_t common_entry_info_count,
+                   std::shared_ptr<DirOpHandlerContext::ThreadPool> &thread_pool,
+                   const FHandles &fh, std::shared_ptr<DirRegistry> &dir_registry,
+                   std::atomic<int64_t> &op_counter, Context *fin,
+                   std::shared_ptr<SnapSyncStat> &dir_sync_stat);
 
   int do_synchronize(const std::string &dir_root, const Snapshot &current,
                      boost::optional<Snapshot> prev);
+
+  int handle_duplicate_entry(
+      const std::string &dir_root, const std::string &cur_path,
+      const ceph_snapdiff_entry_t &sd_entry,
+      std::shared_ptr<DirOpHandlerContext::ThreadPool> &thread_pool,
+      const FHandles &fh, std::shared_ptr<DirRegistry> &dir_registry,
+      std::atomic<int64_t> &op_counter, Context *fin,
+      std::shared_ptr<SnapSyncStat> &dir_sync_stat);
+
+  void do_dir_sync_using_snapdiff(
+      const std::string &dir_root, const std::string &cur_path,
+      ceph_snapdiff_info *sd_info, const Snapshot &current,
+      const Snapshot &prev,
+      std::shared_ptr<DirOpHandlerContext::ThreadPool> &thread_pool,
+      const FHandles &fh, std::shared_ptr<DirRegistry> &dir_registry,
+      std::atomic<int64_t> &op_counter, Context *fin,
+      std::shared_ptr<SnapSyncStat> &dir_sync_stat);
+
+  int do_synchronize(const std::string &dir_root, const Snapshot &current);
   int synchronize(const std::string &dir_root, const Snapshot &current,
                   boost::optional<Snapshot> prev);
+
+  int _do_sync_snaps(const std::string &dir_root, uint64_t cur_snap_id,
+                     std::string cur_snap_name, uint64_t last_snap_id,
+                     std::string last_snap_name);
+
   int do_sync_snaps(const std::string &dir_root);
-  int remote_mkdir(const std::string &epath, const struct ceph_statx &stx, const FHandles &fh);
-  int remote_file_op(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx,
-                     const FHandles &fh, bool need_data_sync, bool need_attr_sync);
-  int copy_to_remote(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx,
-                     const FHandles &fh);
-  int sync_perms(const std::string& path);
+  int _remote_mkdir(const std::string &epath, const struct ceph_statx &cstx,
+                    const FHandles &fh);
+
+  int remote_mkdir(const std::string &epath, const struct ceph_statx &cstx,
+                   bool create_fresh, unsigned int change_mask,
+                   const FHandles &fh,
+                   std::shared_ptr<SnapSyncStat> &dir_sync_stat);
+
+  int remote_file_op(
+      const std::string &dir_root, const std::string &epath,
+      const struct ceph_statx &stx, bool need_data_sync,
+      unsigned int change_mask, const FHandles &fh,
+      std::shared_ptr<DirOpHandlerContext::ThreadPool> &thread_pool,
+      std::shared_ptr<DirRegistry> &dir_registry,
+      std::atomic<int64_t> &op_counter, Context *fin,
+      std::shared_ptr<SnapSyncStat> &dir_sync_stat);
+  int sync_attributes(const std::string &epath, const struct ceph_statx &stx,
+                      unsigned int change_mask, bool is_dir,
+                      const FHandles &fh);
+  int copy_to_remote(const std::string &dir_root, const std::string &epath,
+                     const struct ceph_statx &stx, const FHandles &fh,
+                     std::shared_ptr<DirRegistry> &dir_registry);
+  void transfer_and_sync_file(const std::string &dir_root,
+                              const std::string &epath,
+                              const struct ceph_statx &stx,
+                              unsigned int change_mask, const FHandles &fh,
+                              std::shared_ptr<DirRegistry> &dir_registry,
+                              std::shared_ptr<SnapSyncStat> &dir_sync_stat);
+  int sync_perms(const std::string &path);
 };
 
 } // namespace mirror
 } // namespace cephfs
 
 #endif // CEPHFS_MIRROR_PEER_REPLAYER_H
-- 
2.39.5