]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-client.git/commitdiff
ceph_san: moving to magzaines
authorAlex Markuze <amarkuze@redhat.com>
Sun, 16 Mar 2025 15:27:09 +0000 (15:27 +0000)
committerAlex Markuze <amarkuze@redhat.com>
Sun, 16 Mar 2025 15:27:59 +0000 (15:27 +0000)
include/linux/ceph/ceph_san.h
include/linux/ceph/ceph_san_batch.h [new file with mode: 0644]
include/linux/ceph/ceph_san_pagefrag.h [new file with mode: 0644]
net/ceph/Makefile
net/ceph/ceph_san.c
net/ceph/ceph_san_batch.c [new file with mode: 0644]
net/ceph/ceph_san_pagefrag.c [new file with mode: 0644]

index e91e14efa3df017471d56c331143aaf45c3a83cc..cb804baa5175f11abadeb33687d9dfa7eb9a0d46 100644 (file)
@@ -6,71 +6,11 @@
 #include <linux/spinlock.h>
 #include <linux/sched.h>
 #include <linux/slab.h>
+#include "ceph_san_pagefrag.h"
 
 DECLARE_PER_CPU(struct ceph_san_percore_logger, ceph_san_percore);
 DECLARE_PER_CPU(struct cephsan_pagefrag, ceph_san_pagefrag);
 
-/*
- * Pagefrag Allocator for ceph_san:
- *  - A contiguous 4-page buffer (16KB) is allocated.
- *  - The allocator maintains two unsigned int indices (head and tail) into the buffer.
- *  - cephsan_pagefrag_alloc(n) returns a pointer to n contiguous bytes (if available) and
- *    advances the head pointer by n bytes (wrapping around at the end).
- *  - cephsan_pagefrag_free(n) advances the tail pointer by n bytes.
- *
- * This simple ring-buffer allocator is intended for short-lived allocations in the Ceph SAN code.
- */
-
-#define CEPHSAN_PAGEFRAG_SIZE  (1<<22)  /* 4MB */
-
-/* Pagefrag allocator structure */
-struct cephsan_pagefrag {
-    struct page *pages;
-    void *buffer;
-    unsigned int head;
-    unsigned int tail;
-};
-
-/* The ceph san log entry structure is now private to ceph_san.c.
- * Use log_cephsan() below.
- */
-
-/* get_cephsan() and alloc_cephsan() have been removed from the public API. */
-
-/* New log_cephsan now accepts a line number, a pointer to a u8 buffer (typically function name),
- * and an optional parameter. It uses the current task's journal_info field.
- */
-
-int cephsan_pagefrag_init(struct cephsan_pagefrag *pf);
-int cephsan_pagefrag_init_with_buffer(struct cephsan_pagefrag *pf, void *buffer, size_t size);
-
-
-/**
- * cephsan_pagefrag_alloc - Allocate bytes from the pagefrag buffer.
- * @n: number of bytes to allocate.
- *
- * Allocates @n bytes if there is sufficient free space in the buffer.
- * Advances the head pointer by @n bytes (wrapping around if needed).
- *
- * Return: pointer to the allocated memory, or NULL if not enough space.
- */
-u64 cephsan_pagefrag_alloc(struct cephsan_pagefrag *pf, unsigned int n);
-
-/**
- * cephsan_pagefrag_free - Free bytes in the pagefrag allocator.
- * @n: number of bytes to free.
- *
- * Advances the tail pointer by @n bytes (wrapping around if needed).
- */
-void cephsan_pagefrag_free(struct cephsan_pagefrag *pf, unsigned int n);
-/**
- * cephsan_pagefrag_deinit - Deinitialize the pagefrag allocator.
- *
- * Frees the allocated buffer and resets the head and tail pointers.
- */
-void cephsan_pagefrag_deinit(struct cephsan_pagefrag *pf);
-
-
 #ifdef CONFIG_DEBUG_FS
 #define CEPH_SAN_MAX_LOGS (8192 << 2) //4MB per core
 #define LOG_BUF_SIZE 256
@@ -111,8 +51,9 @@ struct ceph_san_log_entry_tls {
     u64 ts;
     char *buf;
 };
+
 struct histogram {
-       u64 counters[32];
+    u64 counters[32];
 };
 
 struct ceph_san_percore_logger {
@@ -121,6 +62,7 @@ struct ceph_san_percore_logger {
     struct ceph_san_log_entry *logs;
     struct histogram histogram;
 };
+
 struct ceph_san_tls_logger {
     char comm[TASK_COMM_LEN];
     pid_t pid;
@@ -130,10 +72,9 @@ struct ceph_san_tls_logger {
 
 /* Bundled TLS context containing both logger and memory caches */
 struct tls_ceph_san_context {
-       u64 sig;
+    u64 sig;
     struct list_head list;  /* For global list of contexts */
     struct ceph_san_tls_logger logger;
-    /* We no longer use pagefrag for log entries */
 };
 
 /* Global list of all TLS contexts and its protection lock */
diff --git a/include/linux/ceph/ceph_san_batch.h b/include/linux/ceph/ceph_san_batch.h
new file mode 100644 (file)
index 0000000..0c56670
--- /dev/null
@@ -0,0 +1,51 @@
+#ifndef CEPH_SAN_BATCH_H
+#define CEPH_SAN_BATCH_H
+
+#include <linux/types.h>
+#include <linux/percpu.h>
+#include <linux/spinlock.h>
+
+/* Size of each magazine (number of elements it can hold) */
+#define CEPH_SAN_MAGAZINE_SIZE 32
+
+/* Structure representing a single magazine */
+struct ceph_san_magazine {
+    struct list_head list;      /* For linking in global pools */
+    unsigned int count;         /* Number of elements currently in magazine */
+    void *elements[CEPH_SAN_MAGAZINE_SIZE];
+};
+
+/* Per-CPU magazine state */
+struct ceph_san_cpu_magazine {
+    struct ceph_san_magazine *mag;  /* Current magazine for this CPU */
+};
+
+/* Global magazine pools */
+struct ceph_san_batch {
+    struct list_head full_magazines;   /* List of full magazines */
+    struct list_head empty_magazines;  /* List of empty magazines */
+    spinlock_t full_lock;             /* Protects full magazine list and count */
+    spinlock_t empty_lock;            /* Protects empty magazine list and count */
+    unsigned int nr_full;             /* Protected by full_lock */
+    unsigned int nr_empty;            /* Protected by empty_lock */
+    struct ceph_san_cpu_magazine __percpu *cpu_magazines; /* Per-CPU magazines */
+    struct kmem_cache *magazine_cache; /* Cache for magazine allocations */
+    void *(*alloc_element)(void);     /* Function to allocate new elements */
+    void (*free_element)(void *);     /* Function to free elements */
+};
+
+/* Initialize the batching system */
+int ceph_san_batch_init(struct ceph_san_batch *batch,
+                       void *(*alloc_element)(void),
+                       void (*free_element)(void *));
+
+/* Clean up the batching system */
+void ceph_san_batch_cleanup(struct ceph_san_batch *batch);
+
+/* Get an element from the batch */
+void *ceph_san_batch_get(struct ceph_san_batch *batch);
+
+/* Put an element back into the batch */
+void ceph_san_batch_put(struct ceph_san_batch *batch, void *element);
+
+#endif /* CEPH_SAN_BATCH_H */
diff --git a/include/linux/ceph/ceph_san_pagefrag.h b/include/linux/ceph/ceph_san_pagefrag.h
new file mode 100644 (file)
index 0000000..5690741
--- /dev/null
@@ -0,0 +1,26 @@
+#ifndef CEPH_SAN_PAGEFRAG_H
+#define CEPH_SAN_PAGEFRAG_H
+
+#include <linux/types.h>
+#include <linux/mm.h>
+
+#define CEPHSAN_PAGEFRAG_SIZE  (1<<22)  /* 4MB */
+
+/* Pagefrag allocator structure */
+struct cephsan_pagefrag {
+    struct page *pages;
+    void *buffer;
+    unsigned int head;
+    unsigned int tail;
+};
+
+int cephsan_pagefrag_init(struct cephsan_pagefrag *pf);
+int cephsan_pagefrag_init_with_buffer(struct cephsan_pagefrag *pf, void *buffer, size_t size);
+u64 cephsan_pagefrag_alloc(struct cephsan_pagefrag *pf, unsigned int n);
+void cephsan_pagefrag_free(struct cephsan_pagefrag *pf, unsigned int n);
+void cephsan_pagefrag_deinit(struct cephsan_pagefrag *pf);
+void *cephsan_pagefrag_get_ptr(struct cephsan_pagefrag *pf, u64 val);
+
+#define CEPHSAN_PAGEFRAG_GET_N(val)  ((val) >> 32)
+
+#endif /* CEPH_SAN_PAGEFRAG_H */
index bbfff0dd9081a72a2e8674201ea0826a63f36c33..1df96e9561fa7dc341f9c837cf9f7198f70c882a 100644 (file)
@@ -10,7 +10,7 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
        osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
        striper.o \
        debugfs.o \
-       ceph_san.o \
+       ceph_san.o ceph_san_pagefrag.o ceph_san_batch.o \
        auth.o auth_none.o \
        crypto.o armor.o \
        auth_x.o \
index 77e7cc8aaf89050594409ade0b9f5535272f6ffc..710ba1a69f5fce34cd0fa81845e8682540ebf6a8 100644 (file)
@@ -5,6 +5,7 @@
 #include <linux/jiffies.h>
 #include <linux/ceph/ceph_san.h>
 #include <linux/mm.h>
+#include <linux/module.h>
 
 /* Use per-core TLS logger; no global list or lock needed */
 DEFINE_PER_CPU(struct ceph_san_percore_logger, ceph_san_percore);
@@ -24,12 +25,8 @@ struct kmem_cache *ceph_san_log_256_cache;
 
 struct kmem_cache *ceph_san_tls_logger_cache;
 
-static inline void *cephsan_pagefrag_get_ptr(struct cephsan_pagefrag *pf, u64 val);
-/* The definitions for struct ceph_san_log_entry and struct ceph_san_tls_logger
- * have been moved to cephsan.h (under CONFIG_DEBUG_FS) to avoid duplication.
- */
-
 #define CEPH_SAN_SIG 0xDEADC0DE
+
 /* Release function for TLS storage */
 static void ceph_san_tls_release(void *ptr)
 {
@@ -37,9 +34,10 @@ static void ceph_san_tls_release(void *ptr)
     if (!context)
         return;
 
-    if (context->sig != CEPH_SAN_SIG)
-           pr_err("sig is wrong %p %llx != %llx", context, context->sig, CEPH_SAN_SIG);
-           return;
+    if (context->sig != CEPH_SAN_SIG) {
+        pr_err("sig is wrong %p %llx != %llx", context, context->sig, CEPH_SAN_SIG);
+        return;
+    }
 
     /* Remove from global list with lock protection */
     spin_lock(&g_ceph_san_contexts_lock);
@@ -47,24 +45,25 @@ static void ceph_san_tls_release(void *ptr)
     spin_unlock(&g_ceph_san_contexts_lock);
 
     /* Free all log entries */
-        int head_idx = context->logger.head_idx & (CEPH_SAN_MAX_LOGS - 1);
-        int tail_idx = (head_idx + 1) & (CEPH_SAN_MAX_LOGS - 1);
+    int head_idx = context->logger.head_idx & (CEPH_SAN_MAX_LOGS - 1);
+    int tail_idx = (head_idx + 1) & (CEPH_SAN_MAX_LOGS - 1);
 
     for (int i = tail_idx; (i & (CEPH_SAN_MAX_LOGS - 1)) != head_idx; i++) {
         struct ceph_san_log_entry_tls *entry = &context->logger.logs[i & (CEPH_SAN_MAX_LOGS - 1)];
         if (entry->buf) {
             if (entry->ts & 0x1)
-                    kmem_cache_free(ceph_san_log_256_cache, entry->buf);
-                       else
-                    kmem_cache_free(ceph_san_log_128_cache, entry->buf);
-                       entry->buf = NULL;
-               }
-       }
+                kmem_cache_free(ceph_san_log_256_cache, entry->buf);
+            else
+                kmem_cache_free(ceph_san_log_128_cache, entry->buf);
+            entry->buf = NULL;
+        }
+    }
 
     kmem_cache_free(ceph_san_tls_logger_cache, context);
 }
 
-static struct tls_ceph_san_context *get_cephsan_context(void) {
+static struct tls_ceph_san_context *get_cephsan_context(void)
+{
     struct tls_ceph_san_context *context;
 
     context = current->tls.state;
@@ -72,15 +71,15 @@ static struct tls_ceph_san_context *get_cephsan_context(void) {
         return context;
 
     context = kmem_cache_alloc(ceph_san_tls_logger_cache, GFP_KERNEL);
-       if (!context) {
-               pr_err("Failed to allocate TLS logger for PID %d\n", current->pid);
-               return NULL;
-       }
+    if (!context) {
+        pr_err("Failed to allocate TLS logger for PID %d\n", current->pid);
+        return NULL;
+    }
 
-       context->logger.pid = current->pid;
-       memcpy(context->logger.comm, current->comm, TASK_COMM_LEN);
+    context->logger.pid = current->pid;
+    memcpy(context->logger.comm, current->comm, TASK_COMM_LEN);
 
-     /* Initialize list entry */
+    /* Initialize list entry */
     INIT_LIST_HEAD(&context->list);
     context->sig = CEPH_SAN_SIG;
 
@@ -95,7 +94,8 @@ static struct tls_ceph_san_context *get_cephsan_context(void) {
     return context;
 }
 
-void log_cephsan_tls(char *buf) {
+void log_cephsan_tls(char *buf)
+{
     /* Use the task's TLS storage */
     int len = strlen(buf);
     struct tls_ceph_san_context *ctx;
@@ -125,10 +125,10 @@ void log_cephsan_tls(char *buf) {
         /* Allocate new buffer from appropriate cache */
         if (len <= LOG_BUF_SMALL) {
             new_buf = kmem_cache_alloc(ceph_san_log_128_cache, GFP_KERNEL);
-                       entry->ts = jiffies & ~0x1;
+            entry->ts = jiffies & ~0x1;
         } else {
             new_buf = kmem_cache_alloc(ceph_san_log_256_cache, GFP_KERNEL);
-                       entry->ts = jiffies | 0x1;
+            entry->ts = jiffies | 0x1;
         }
     } else {
         /* Reuse existing buffer since size category hasn't changed */
@@ -145,7 +145,8 @@ void log_cephsan_tls(char *buf) {
     logger->head_idx = head_idx;
 }
 
-static void log_cephsan_percore(char *buf) {
+static void log_cephsan_percore(char *buf)
+{
     /* Use the per-core TLS logger */
     u64 buf_idx;
     int len = strlen(buf);
@@ -173,7 +174,8 @@ static void log_cephsan_percore(char *buf) {
     }
 }
 
-void log_cephsan(char *buf) {
+void log_cephsan(char *buf)
+{
     log_cephsan_percore(buf);
     log_cephsan_tls(buf);
 }
@@ -195,7 +197,7 @@ void cephsan_cleanup(void)
         }
     }
 #if 0
-       /* Let the TLS contexts cleanup lazily */
+    /* Let the TLS contexts cleanup lazily */
     if (ceph_san_tls_logger_cache) {
         kmem_cache_destroy(ceph_san_tls_logger_cache);
         ceph_san_tls_logger_cache = NULL;
@@ -281,129 +283,6 @@ cleanup_128_cache:
 }
 EXPORT_SYMBOL(cephsan_init);
 
-/**
- * cephsan_pagefrag_init - Initialize the pagefrag allocator.
- *
- * Allocates a 16KB contiguous buffer and resets head and tail pointers.
- *
- * Return: 0 on success, negative error code on failure.
- */
-int cephsan_pagefrag_init(struct cephsan_pagefrag *pf)
-{
-       pf->pages = alloc_pages(GFP_KERNEL, get_order(CEPHSAN_PAGEFRAG_SIZE));
-       if (!pf->pages)
-               return -ENOMEM;
-
-       pf->buffer = page_address(pf->pages);
-       pf->head = 0;
-       pf->tail = 0;
-       return 0;
-}
-EXPORT_SYMBOL(cephsan_pagefrag_init);
-
-/**
- * cephsan_pagefrag_init_with_buffer - Initialize pagefrag with an existing buffer
- * @pf: pagefrag allocator to initialize
- * @buffer: pre-allocated buffer to use
- * @size: size of the buffer
- *
- * Return: 0 on success
- */
-int cephsan_pagefrag_init_with_buffer(struct cephsan_pagefrag *pf, void *buffer, size_t size)
-{
-    pf->pages = NULL; /* No pages allocated, using provided buffer */
-    pf->buffer = buffer;
-    pf->head = 0;
-    pf->tail = 0;
-    return 0;
-}
-EXPORT_SYMBOL(cephsan_pagefrag_init_with_buffer);
-
-/**
- * cephsan_pagefrag_alloc - Allocate bytes from the pagefrag buffer.
- * @n: number of bytes to allocate.
- *
- * Allocates @n bytes if there is sufficient free space in the buffer.
- * Advances the head pointer by @n bytes (wrapping around if needed).
- *
- * Return: pointer to the allocated memory, or NULL if not enough space.
- */
-u64 cephsan_pagefrag_alloc(struct cephsan_pagefrag *pf, unsigned int n)
-{
-       /* Case 1: tail > head */
-       if (pf->tail > pf->head) {
-               if (pf->tail - pf->head >= n) {
-                       unsigned int prev_head = pf->head;
-                       pf->head += n;
-                       return ((u64)n << 32) | prev_head;
-               } else {
-                       pr_err("Not enough space in pagefrag buffer\n");
-                       return 0;
-               }
-       }
-       /* Case 2: tail <= head */
-       if (pf->head + n <= CEPHSAN_PAGEFRAG_SIZE) {
-               /* Normal allocation */
-               unsigned int prev_head = pf->head;
-               pf->head += n;
-               return ((u64)n << 32) | prev_head;
-       } else {
-               /* Need to wrap around */
-               if (n <= pf->tail) {
-                       pf->head = n;
-                       n += CEPHSAN_PAGEFRAG_SIZE - pf->head;
-                       return ((u64)n << 32) | 0;
-               } else {
-                       pr_err("Not enough space for wrap-around allocation\n");
-                       return 0;
-               }
-       }
-       pr_err("impossible: Not enough space in pagefrag buffer\n");
-       return 0;
-}
-EXPORT_SYMBOL(cephsan_pagefrag_alloc);
-/**
- * cephsan_pagefrag_get_ptr - Get buffer pointer from pagefrag allocation result
- * @pf: pagefrag allocator
- * @val: return value from cephsan_pagefrag_alloc
- *
- * Return: pointer to allocated buffer region
- */
-static inline void *cephsan_pagefrag_get_ptr(struct cephsan_pagefrag *pf, u64 val)
-{
-       return pf->buffer + (val & 0xFFFFFFFF);
-}
-
-#define CEPHSAN_PAGEFRAG_GET_N(val)  ((val) >> 32)
-
-/**
- * cephsan_pagefrag_free - Free bytes in the pagefrag allocator.
- * @n: number of bytes to free.
- *
- * Advances the tail pointer by @n bytes (wrapping around if needed).
- */
-void cephsan_pagefrag_free(struct cephsan_pagefrag *pf, unsigned int n)
-{
-       pf->tail = (pf->tail + n) & (CEPHSAN_PAGEFRAG_SIZE - 1);
-}
-EXPORT_SYMBOL(cephsan_pagefrag_free);
-/**
- * cephsan_pagefrag_deinit - Deinitialize the pagefrag allocator.
- *
- * Frees the allocated buffer and resets the head and tail pointers.
- */
-void cephsan_pagefrag_deinit(struct cephsan_pagefrag *pf)
-{
-    if (pf->pages) {
-        free_pages((unsigned long)pf->pages, get_order(CEPHSAN_PAGEFRAG_SIZE));
-        pf->pages = NULL;
-    }
-    /* Don't free buffer if it was provided externally */
-       pf->buffer = NULL;
-       pf->head = pf->tail = 0;
-}
-EXPORT_SYMBOL(cephsan_pagefrag_deinit);
-
 /**
  * cephsan_dump_all_contexts - Dump logs from all TLS contexts to a buffer
  * @buf: Buffer to write logs to
diff --git a/net/ceph/ceph_san_batch.c b/net/ceph/ceph_san_batch.c
new file mode 100644 (file)
index 0000000..1f9cb5c
--- /dev/null
@@ -0,0 +1,313 @@
+#include <linux/slab.h>
+#include <linux/module.h>
+#include <linux/percpu.h>
+#include <linux/spinlock.h>
+#include <linux/list.h>
+#include <linux/ceph/ceph_san_batch.h>
+
+/* Number of magazines to preallocate during initialization */
+#define CEPH_SAN_INIT_MAGAZINES 4
+
+static struct ceph_san_magazine *alloc_magazine(struct ceph_san_batch *batch, bool fill)
+{
+    struct ceph_san_magazine *mag;
+    int i;
+
+    mag = kmem_cache_alloc(batch->magazine_cache, GFP_KERNEL);
+    if (!mag)
+        return NULL;
+
+    INIT_LIST_HEAD(&mag->list);
+    mag->count = 0;
+
+    /* Pre-fill magazine if requested and allocation function exists */
+    if (fill && batch->alloc_element) {
+        for (i = 0; i < CEPH_SAN_MAGAZINE_SIZE; i++) {
+            void *element = batch->alloc_element();
+            if (!element) {
+                /* Clean up already allocated elements on failure */
+                while (mag->count > 0)
+                    batch->free_element(mag->elements[--mag->count]);
+                kmem_cache_free(batch->magazine_cache, mag);
+                return NULL;
+            }
+            mag->elements[mag->count++] = element;
+        }
+    }
+    return mag;
+}
+
+static void free_magazine(struct ceph_san_batch *batch, struct ceph_san_magazine *mag)
+{
+    /* Free all elements in the magazine */
+    while (mag->count > 0)
+        batch->free_element(mag->elements[--mag->count]);
+    kmem_cache_free(batch->magazine_cache, mag);
+}
+
+/**
+ * ceph_san_batch_init - Initialize the batching system
+ * @batch: Batch structure to initialize
+ * @alloc_element: Function to allocate new elements
+ * @free_element: Function to free elements
+ *
+ * Allocates and initializes the per-CPU magazines and global pools.
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+int ceph_san_batch_init(struct ceph_san_batch *batch,
+                       void *(*alloc_element)(void),
+                       void (*free_element)(void *))
+{
+    int cpu, i;
+    struct ceph_san_cpu_magazine *cpu_mag;
+    struct ceph_san_magazine *mag;
+
+    if (!alloc_element || !free_element)
+        return -EINVAL;
+
+    /* Store allocation and free functions */
+    batch->alloc_element = alloc_element;
+    batch->free_element = free_element;
+
+    /* Initialize counters */
+    batch->nr_full = 0;
+    batch->nr_empty = 0;
+
+    /* Create magazine cache */
+    batch->magazine_cache = kmem_cache_create("ceph_san_magazine",
+                                            sizeof(struct ceph_san_magazine),
+                                            0, SLAB_HWCACHE_ALIGN, NULL);
+    if (!batch->magazine_cache)
+        return -ENOMEM;
+
+    /* Initialize global magazine lists */
+    INIT_LIST_HEAD(&batch->full_magazines);
+    INIT_LIST_HEAD(&batch->empty_magazines);
+    spin_lock_init(&batch->full_lock);
+    spin_lock_init(&batch->empty_lock);
+
+    /* Allocate per-CPU magazines */
+    batch->cpu_magazines = alloc_percpu(struct ceph_san_cpu_magazine);
+    if (!batch->cpu_magazines)
+        goto cleanup_cache;
+
+    /* Initialize per-CPU magazines */
+    for_each_possible_cpu(cpu) {
+        cpu_mag = per_cpu_ptr(batch->cpu_magazines, cpu);
+        cpu_mag->mag = NULL;
+    }
+
+    /* Pre-allocate some magazines - half empty, half full */
+    for (i = 0; i < CEPH_SAN_INIT_MAGAZINES; i++) {
+        /* Alternate between empty and full magazines */
+        mag = alloc_magazine(batch, i & 1);
+        if (!mag)
+            goto cleanup;
+
+        if (i & 1) {
+            /* Add full magazine to full pool */
+            spin_lock(&batch->full_lock);
+            list_add(&mag->list, &batch->full_magazines);
+            batch->nr_full++;
+            spin_unlock(&batch->full_lock);
+        } else {
+            /* Add empty magazine to empty pool */
+            spin_lock(&batch->empty_lock);
+            list_add(&mag->list, &batch->empty_magazines);
+            batch->nr_empty++;
+            spin_unlock(&batch->empty_lock);
+        }
+    }
+
+    return 0;
+
+cleanup:
+    ceph_san_batch_cleanup(batch);
+    return -ENOMEM;
+
+cleanup_cache:
+    kmem_cache_destroy(batch->magazine_cache);
+    return -ENOMEM;
+}
+EXPORT_SYMBOL(ceph_san_batch_init);
+
+/**
+ * ceph_san_batch_cleanup - Clean up the batching system
+ * @batch: Batch structure to clean up
+ */
+void ceph_san_batch_cleanup(struct ceph_san_batch *batch)
+{
+    int cpu;
+    struct ceph_san_magazine *mag, *tmp;
+    struct ceph_san_cpu_magazine *cpu_mag;
+
+    /* Free per-CPU magazines */
+    if (batch->cpu_magazines) {
+        for_each_possible_cpu(cpu) {
+            cpu_mag = per_cpu_ptr(batch->cpu_magazines, cpu);
+            if (cpu_mag->mag)
+                free_magazine(batch, cpu_mag->mag);
+        }
+        free_percpu(batch->cpu_magazines);
+    }
+
+    /* Free magazines in the full pool */
+    spin_lock(&batch->full_lock);
+    list_for_each_entry_safe(mag, tmp, &batch->full_magazines, list) {
+        list_del(&mag->list);
+        batch->nr_full--;
+        free_magazine(batch, mag);
+    }
+    spin_unlock(&batch->full_lock);
+
+    /* Free magazines in the empty pool */
+    spin_lock(&batch->empty_lock);
+    list_for_each_entry_safe(mag, tmp, &batch->empty_magazines, list) {
+        list_del(&mag->list);
+        batch->nr_empty--;
+        free_magazine(batch, mag);
+    }
+    spin_unlock(&batch->empty_lock);
+
+    /* Destroy magazine cache */
+    if (batch->magazine_cache)
+        kmem_cache_destroy(batch->magazine_cache);
+}
+EXPORT_SYMBOL(ceph_san_batch_cleanup);
+
+/**
+ * ceph_san_batch_get - Get an element from the batch
+ * @batch: Batch to get element from
+ *
+ * Return: Element from the magazine, or NULL if none available
+ */
+void *ceph_san_batch_get(struct ceph_san_batch *batch)
+{
+    struct ceph_san_cpu_magazine *cpu_mag;
+    struct ceph_san_magazine *old_mag, *new_mag;
+    void *element = NULL;
+
+    cpu_mag = this_cpu_ptr(batch->cpu_magazines);
+
+    /* If we have a magazine and it has elements, use it */
+    if (cpu_mag->mag && cpu_mag->mag->count > 0) {
+        element = cpu_mag->mag->elements[--cpu_mag->mag->count];
+        return element;
+    }
+
+    /* Current magazine is empty, try to get a full one */
+    old_mag = cpu_mag->mag;
+
+    /* Return old magazine to empty pool if we have one */
+    if (old_mag) {
+        spin_lock(&batch->empty_lock);
+        list_add(&old_mag->list, &batch->empty_magazines);
+        batch->nr_empty++;
+        spin_unlock(&batch->empty_lock);
+        cpu_mag->mag = NULL;
+    }
+
+    /* Try to get a full magazine first */
+    spin_lock(&batch->full_lock);
+    if (!list_empty(&batch->full_magazines)) {
+        new_mag = list_first_entry(&batch->full_magazines,
+                                 struct ceph_san_magazine, list);
+        list_del(&new_mag->list);
+        batch->nr_full--;
+        spin_unlock(&batch->full_lock);
+
+        cpu_mag->mag = new_mag;
+        element = new_mag->elements[--new_mag->count];
+    } else {
+        spin_unlock(&batch->full_lock);
+        /* No full magazine available, create and fill a new one */
+        new_mag = alloc_magazine(batch, true);
+        if (new_mag && new_mag->count > 0) {
+            cpu_mag->mag = new_mag;
+            element = new_mag->elements[--new_mag->count];
+        } else if (new_mag) {
+            /* Magazine allocated but couldn't be filled */
+            spin_lock(&batch->empty_lock);
+            list_add(&new_mag->list, &batch->empty_magazines);
+            batch->nr_empty++;
+            spin_unlock(&batch->empty_lock);
+        }
+    }
+
+    return element;
+}
+EXPORT_SYMBOL(ceph_san_batch_get);
+
+/**
+ * ceph_san_batch_put - Put an element back into the batch
+ * @batch: Batch to put element into
+ * @element: Element to put back
+ */
+void ceph_san_batch_put(struct ceph_san_batch *batch, void *element)
+{
+    struct ceph_san_cpu_magazine *cpu_mag;
+    struct ceph_san_magazine *old_mag, *new_mag;
+
+    cpu_mag = this_cpu_ptr(batch->cpu_magazines);
+
+    /* If we don't have a magazine, get an empty one */
+    if (!cpu_mag->mag) {
+        spin_lock(&batch->empty_lock);
+        if (!list_empty(&batch->empty_magazines)) {
+            cpu_mag->mag = list_first_entry(&batch->empty_magazines,
+                                          struct ceph_san_magazine, list);
+            list_del(&cpu_mag->mag->list);
+            batch->nr_empty--;
+            spin_unlock(&batch->empty_lock);
+        } else {
+            spin_unlock(&batch->empty_lock);
+            /* No empty magazine available, allocate a new one */
+            cpu_mag->mag = alloc_magazine(batch, false);
+        }
+
+        if (!cpu_mag->mag) {
+            /* If we can't get a magazine, free the element */
+            batch->free_element(element);
+            return;
+        }
+    }
+
+    /* If current magazine isn't full, add to it */
+    if (cpu_mag->mag->count < CEPH_SAN_MAGAZINE_SIZE) {
+        cpu_mag->mag->elements[cpu_mag->mag->count++] = element;
+        return;
+    }
+
+    /* Current magazine is full, move it to the full pool */
+    old_mag = cpu_mag->mag;
+
+    /* Try to get an empty magazine */
+    spin_lock(&batch->empty_lock);
+    if (!list_empty(&batch->empty_magazines)) {
+        new_mag = list_first_entry(&batch->empty_magazines,
+                                 struct ceph_san_magazine, list);
+        list_del(&new_mag->list);
+        batch->nr_empty--;
+        spin_unlock(&batch->empty_lock);
+    } else {
+        spin_unlock(&batch->empty_lock);
+        new_mag = alloc_magazine(batch, false);
+    }
+
+    if (new_mag) {
+        /* Move full magazine to full pool */
+        spin_lock(&batch->full_lock);
+        list_add(&old_mag->list, &batch->full_magazines);
+        batch->nr_full++;
+        spin_unlock(&batch->full_lock);
+
+        /* Use new magazine */
+        cpu_mag->mag = new_mag;
+        new_mag->elements[new_mag->count++] = element;
+    } else {
+        /* Failed to get new magazine, free the element */
+        batch->free_element(element);
+    }
+}
+EXPORT_SYMBOL(ceph_san_batch_put);
diff --git a/net/ceph/ceph_san_pagefrag.c b/net/ceph/ceph_san_pagefrag.c
new file mode 100644 (file)
index 0000000..7557721
--- /dev/null
@@ -0,0 +1,128 @@
+#include <linux/slab.h>
+#include <linux/mm.h>
+#include <linux/module.h>
+#include <linux/ceph/ceph_san_pagefrag.h>
+
+/**
+ * cephsan_pagefrag_init - Initialize the pagefrag allocator.
+ *
+ * Allocates a 4MB contiguous buffer and resets head and tail pointers.
+ *
+ * Return: 0 on success, negative error code on failure.
+ */
+int cephsan_pagefrag_init(struct cephsan_pagefrag *pf)
+{
+    pf->pages = alloc_pages(GFP_KERNEL, get_order(CEPHSAN_PAGEFRAG_SIZE));
+    if (!pf->pages)
+        return -ENOMEM;
+
+    pf->buffer = page_address(pf->pages);
+    pf->head = 0;
+    pf->tail = 0;
+    return 0;
+}
+EXPORT_SYMBOL(cephsan_pagefrag_init);
+
+/**
+ * cephsan_pagefrag_init_with_buffer - Initialize pagefrag with an existing buffer
+ * @pf: pagefrag allocator to initialize
+ * @buffer: pre-allocated buffer to use
+ * @size: size of the buffer
+ *
+ * Return: 0 on success
+ */
+int cephsan_pagefrag_init_with_buffer(struct cephsan_pagefrag *pf, void *buffer, size_t size)
+{
+    pf->pages = NULL; /* No pages allocated, using provided buffer */
+    pf->buffer = buffer;
+    pf->head = 0;
+    pf->tail = 0;
+    return 0;
+}
+EXPORT_SYMBOL(cephsan_pagefrag_init_with_buffer);
+
+/**
+ * cephsan_pagefrag_alloc - Allocate bytes from the pagefrag buffer.
+ * @n: number of bytes to allocate.
+ *
+ * Allocates @n bytes if there is sufficient free space in the buffer.
+ * Advances the head pointer by @n bytes (wrapping around if needed).
+ *
+ * Return: pointer to the allocated memory, or NULL if not enough space.
+ */
+u64 cephsan_pagefrag_alloc(struct cephsan_pagefrag *pf, unsigned int n)
+{
+    /* Case 1: tail > head */
+    if (pf->tail > pf->head) {
+        if (pf->tail - pf->head >= n) {
+            unsigned int prev_head = pf->head;
+            pf->head += n;
+            return ((u64)n << 32) | prev_head;
+        } else {
+            pr_err("Not enough space in pagefrag buffer\n");
+            return 0;
+        }
+    }
+    /* Case 2: tail <= head */
+    if (pf->head + n <= CEPHSAN_PAGEFRAG_SIZE) {
+        /* Normal allocation */
+        unsigned int prev_head = pf->head;
+        pf->head += n;
+        return ((u64)n << 32) | prev_head;
+    } else {
+        /* Need to wrap around */
+        if (n <= pf->tail) {
+            pf->head = n;
+            n += CEPHSAN_PAGEFRAG_SIZE - pf->head;
+            return ((u64)n << 32) | 0;
+        } else {
+            pr_err("Not enough space for wrap-around allocation\n");
+            return 0;
+        }
+    }
+    pr_err("impossible: Not enough space in pagefrag buffer\n");
+    return 0;
+}
+EXPORT_SYMBOL(cephsan_pagefrag_alloc);
+
+/**
+ * cephsan_pagefrag_get_ptr - Get buffer pointer from pagefrag allocation result
+ * @pf: pagefrag allocator
+ * @val: return value from cephsan_pagefrag_alloc
+ *
+ * Return: pointer to allocated buffer region
+ */
+void *cephsan_pagefrag_get_ptr(struct cephsan_pagefrag *pf, u64 val)
+{
+    return pf->buffer + (val & 0xFFFFFFFF);
+}
+EXPORT_SYMBOL(cephsan_pagefrag_get_ptr);
+
+/**
+ * cephsan_pagefrag_free - Free bytes in the pagefrag allocator.
+ * @n: number of bytes to free.
+ *
+ * Advances the tail pointer by @n bytes (wrapping around if needed).
+ */
+void cephsan_pagefrag_free(struct cephsan_pagefrag *pf, unsigned int n)
+{
+    pf->tail = (pf->tail + n) & (CEPHSAN_PAGEFRAG_SIZE - 1);
+}
+EXPORT_SYMBOL(cephsan_pagefrag_free);
+
+/**
+ * cephsan_pagefrag_deinit - Deinitialize the pagefrag allocator.
+ *
+ * Frees the allocated buffer and resets the head and tail pointers.
+ */
+void cephsan_pagefrag_deinit(struct cephsan_pagefrag *pf)
+{
+    if (pf->pages) {
+        free_pages((unsigned long)pf->pages, get_order(CEPHSAN_PAGEFRAG_SIZE));
+        pf->pages = NULL;
+    }
+    /* Don't free buffer if it was provided externally */
+    pf->buffer = NULL;
+    pf->head = pf->tail = 0;
+}
+EXPORT_SYMBOL(cephsan_pagefrag_deinit);