btrfs: add test for zone auto reclaim
[xfstests-dev.git] / ltp / aio-stress.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Copyright (c) 2004 SuSE, Inc.  All Rights Reserved.
4  */
5
6 /*
7  * aio-stress
8  *
9  * will open or create each file on the command line, and start a series
10  * of aio to it.  
11  *
12  * aio is done in a rotating loop.  first file1 gets 8 requests, then
13  * file2, then file3 etc.  As each file finishes writing, it is switched
14  * to reads
15  *
16  * io buffers are aligned in case you want to do raw io
17  *
18  * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c
19  *
20  * run aio-stress -h to see the options
21  *
22  * Please mail Chris Mason (mason@suse.com) with bug reports or patches
23  */
24 #define _FILE_OFFSET_BITS 64
25 #define PROG_VERSION "0.21"
26 #define NEW_GETEVENTS
27
28 #include <stdio.h>
29 #include <errno.h>
30 #include <assert.h>
31 #include <stdlib.h>
32
33 #include <sys/types.h>
34 #include <sys/stat.h>
35 #include <fcntl.h>
36 #include <unistd.h>
37 #include <sys/time.h>
38 #include <libaio.h>
39 #include <sys/ipc.h>
40 #include <sys/shm.h>
41 #include <sys/mman.h>
42 #include <string.h>
43 #include <pthread.h>
44
45 #define IO_FREE 0
46 #define IO_PENDING 1
47 #define RUN_FOREVER -1
48
49 #ifndef O_DIRECT
50 #define O_DIRECT         040000 /* direct disk access hint */
51 #endif
52
53 enum {
54     WRITE,
55     READ,
56     RWRITE,
57     RREAD,
58     LAST_STAGE,
59 };
60
61 #define USE_MALLOC 0
62 #define USE_SHM 1
63 #define USE_SHMFS 2
64
65 /* 
66  * various globals, these are effectively read only by the time the threads
67  * are started
68  */
69 long stages = 0;
70 unsigned long page_size_mask;
71 int o_direct = 0;
72 int o_sync = 0;
73 int latency_stats = 0;
74 int completion_latency_stats = 0;
75 int io_iter = 8;
76 int iterations = RUN_FOREVER;
77 int max_io_submit = 0;
78 long rec_len = 64 * 1024;
79 int depth = 64;
80 int num_threads = 1;
81 int num_contexts = 1;
82 off_t context_offset = 2 * 1024 * 1024;
83 int fsync_stages = 1;
84 int use_shm = 0;
85 int shm_id;
86 char *unaligned_buffer = NULL;
87 char *aligned_buffer = NULL;
88 int padded_reclen = 0;
89 int stonewall = 1;
90 int verify = 0;
91 char *verify_buf = NULL;
92 int unlink_files = 0;
93
94 struct io_unit;
95 struct thread_info;
96
97 /* pthread mutexes and other globals for keeping the threads in sync */
98 pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
99 pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
100 int threads_ending = 0;
101 int threads_starting = 0;
102 struct timeval global_stage_start_time;
103 struct thread_info *global_thread_info;
104
105 /* 
106  * latencies during io_submit are measured, these are the 
107  * granularities for deviations 
108  */
109 #define DEVIATIONS 6
110 int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };
111 struct io_latency {
112     double max;
113     double min;
114     double total_io;
115     double total_lat;
116     double deviations[DEVIATIONS]; 
117 };
118
119 /* container for a series of operations to a file */
120 struct io_oper {
121     /* already open file descriptor, valid for whatever operation you want */
122     int fd;
123
124     /* starting byte of the operation */
125     off_t start;
126
127     /* ending byte of the operation */
128     off_t end;
129
130     /* size of the read/write buffer */
131     int reclen;
132
133     /* max number of pending requests before a wait is triggered */
134     int depth;
135
136     /* current number of pending requests */
137     int num_pending;
138
139     /* last error, zero if there were none */
140     int last_err;
141
142     /* total number of errors hit. */
143     int num_err;
144
145     /* read,write, random, etc */
146     int rw;
147
148     /* number of ios that will get sent to aio */
149     int total_ios;
150
151     /* number of ios we've already sent */
152     int started_ios;
153
154     /* last offset used in an io operation */
155     off_t last_offset;
156
157     /* stonewalled = 1 when we got cut off before submitting all our ios */
158     int stonewalled;
159
160     /* list management */
161     struct io_oper *next;
162     struct io_oper *prev;
163
164     struct timeval start_time;
165
166     char *file_name;
167 };
168
169 /* a single io, and all the tracking needed for it */
170 struct io_unit {
171     /* note, iocb must go first! */
172     struct iocb iocb;
173
174     /* pointer to parent io operation struct */
175     struct io_oper *io_oper;
176
177     /* aligned buffer */
178     char *buf;
179
180     /* size of the aligned buffer (record size) */
181     int buf_size;
182
183     /* state of this io unit (free, pending, done) */
184     int busy;
185
186     /* result of last operation */
187     long res;
188
189     struct io_unit *next;
190
191     struct timeval io_start_time;               /* time of io_submit */
192 };
193
194 struct thread_info {
195     io_context_t io_ctx;
196     pthread_t tid;
197
198     /* allocated array of io_unit structs */
199     struct io_unit *ios;
200
201     /* list of io units available for io */
202     struct io_unit *free_ious;
203
204     /* number of io units in the ios array */
205     int num_global_ios;
206
207     /* number of io units in flight */
208     int num_global_pending;
209
210     /* preallocated array of iocb pointers, only used in run_active */
211     struct iocb **iocbs;
212
213     /* preallocated array of events */
214     struct io_event *events;
215
216     /* size of the events array */
217     int num_global_events;
218
219     /* latency stats for io_submit */
220     struct io_latency io_submit_latency;
221
222     /* list of operations still in progress, and of those finished */
223     struct io_oper *active_opers;
224     struct io_oper *finished_opers;
225
226     /* number of files this thread is doing io on */
227     int num_files;
228
229     /* how much io this thread did in the last stage */
230     double stage_mb_trans;
231
232     /* latency completion stats i/o time from io_submit until io_getevents */
233     struct io_latency io_completion_latency;
234 };
235
236 /*
237  * return seconds between start_tv and stop_tv in double precision
238  */
239 static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
240 {
241     double sec, usec;
242     double ret;
243     sec = stop_tv->tv_sec - start_tv->tv_sec;
244     usec = stop_tv->tv_usec - start_tv->tv_usec;
245     if (sec > 0 && usec < 0) {
246         sec--;
247         usec += 1000000;
248     } 
249     ret = sec + usec / (double)1000000;
250     if (ret < 0)
251         ret = 0;
252     return ret;
253 }
254
255 /*
256  * return seconds between start_tv and now in double precision
257  */
258 static double time_since_now(struct timeval *start_tv)
259 {
260     struct timeval stop_time;
261     gettimeofday(&stop_time, NULL);
262     return time_since(start_tv, &stop_time);
263 }
264
265 /*
266  * Add latency info to latency struct 
267  */
268 static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv,
269                         struct io_latency *lat)
270 {
271     double delta;
272     int i;
273     delta = time_since(start_tv, stop_tv);
274     delta = delta * 1000;
275
276     if (delta > lat->max)
277         lat->max = delta;
278     if (!lat->min || delta < lat->min)
279         lat->min = delta;
280     lat->total_io++;
281     lat->total_lat += delta;
282     for (i = 0 ; i < DEVIATIONS ; i++) {
283         if (delta < deviations[i]) {
284             lat->deviations[i]++;
285             break;
286         }
287     }
288 }
289
290 static void oper_list_add(struct io_oper *oper, struct io_oper **list)
291 {
292     if (!*list) {
293         *list = oper;
294         oper->prev = oper->next = oper;
295         return;
296     }
297     oper->prev = (*list)->prev;
298     oper->next = *list;
299     (*list)->prev->next = oper;
300     (*list)->prev = oper;
301     return;
302 }
303
304 static void oper_list_del(struct io_oper *oper, struct io_oper **list)
305 {
306     if ((*list)->next == (*list)->prev && *list == (*list)->next) {
307         *list = NULL;
308         return;
309     }
310     oper->prev->next = oper->next;
311     oper->next->prev = oper->prev;
312     if (*list == oper)
313         *list = oper->next;
314 }
315
316 /* worker func to check error fields in the io unit */
317 static int check_finished_io(struct io_unit *io) {
318     int i;
319     if (io->res != io->buf_size) {
320
321                  struct stat s;
322                  fstat(io->io_oper->fd, &s);
323   
324                  /*
325                   * If file size is large enough for the read, then this short
326                   * read is an error.
327                   */
328                  if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
329                      s.st_size > (io->iocb.u.c.offset + io->res)) {
330   
331                                  fprintf(stderr, "io err %lu (%s) op %d, off %Lu size %d\n",
332                                                  io->res, strerror(-io->res), io->iocb.aio_lio_opcode,
333                                                  io->iocb.u.c.offset, io->buf_size);
334                                  io->io_oper->last_err = io->res;
335                                  io->io_oper->num_err++;
336                                  return -1;
337                  }
338     }
339     if (verify && io->io_oper->rw == READ) {
340         if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
341             fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n", 
342                     io->io_oper->file_name, io->iocb.u.c.offset);
343             
344             for (i = 0 ; i < io->io_oper->reclen ; i++) {
345                 if (io->buf[i] != verify_buf[i]) {
346                     fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]);
347                 }
348             }
349             fprintf(stderr, "\n");
350         }
351
352     }
353     return 0;
354 }
355
356 /* worker func to check the busy bits and get an io unit ready for use */
357 static int grab_iou(struct io_unit *io, struct io_oper *oper) {
358     if (io->busy == IO_PENDING)
359         return -1;
360
361     io->busy = IO_PENDING;
362     io->res = 0;
363     io->io_oper = oper;
364     return 0;
365 }
366
367 char *stage_name(int rw) {
368     switch(rw) {
369     case WRITE:
370         return "write";
371     case READ:
372         return "read";
373     case RWRITE:
374         return "random write";
375     case RREAD:
376         return "random read";
377     }
378     return "unknown";
379 }
380
381 static inline double oper_mb_trans(struct io_oper *oper) {
382     return ((double)oper->started_ios * (double)oper->reclen) /
383                 (double)(1024 * 1024);
384 }
385
386 static void print_time(struct io_oper *oper) {
387     double runtime;
388     double tput;
389     double mb;
390
391     runtime = time_since_now(&oper->start_time); 
392     mb = oper_mb_trans(oper);
393     tput = mb / runtime;
394     fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n", 
395             stage_name(oper->rw), oper->file_name, tput, mb, runtime);
396 }
397
398 static void print_lat(char *str, struct io_latency *lat) {
399     double avg = lat->total_lat / lat->total_io;
400     int i;
401     double total_counted = 0;
402     fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t", 
403             str, lat->min, avg, lat->max);
404
405     for (i = 0 ; i < DEVIATIONS ; i++) {
406         fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]);
407         total_counted += lat->deviations[i];
408     }
409     if (total_counted && lat->total_io - total_counted)
410         fprintf(stderr, " < %.0f", lat->total_io - total_counted);
411     fprintf(stderr, "\n");
412     memset(lat, 0, sizeof(*lat));
413 }
414
415 static void print_latency(struct thread_info *t)
416 {
417     struct io_latency *lat = &t->io_submit_latency;
418     print_lat("latency", lat);
419 }
420
421 static void print_completion_latency(struct thread_info *t)
422 {
423     struct io_latency *lat = &t->io_completion_latency;
424     print_lat("completion latency", lat);
425 }
426
427 /*
428  * updates the fields in the io operation struct that belongs to this
429  * io unit, and make the io unit reusable again
430  */
431 void finish_io(struct thread_info *t, struct io_unit *io, long result,
432                 struct timeval *tv_now) {
433     struct io_oper *oper = io->io_oper;
434
435     calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
436     io->res = result;
437     io->busy = IO_FREE;
438     io->next = t->free_ious;
439     t->free_ious = io;
440     oper->num_pending--;
441     t->num_global_pending--;
442     check_finished_io(io);
443     if (oper->num_pending == 0 && 
444        (oper->started_ios == oper->total_ios || oper->stonewalled)) 
445     {
446         print_time(oper);
447     } 
448 }
449
450 int read_some_events(struct thread_info *t) {
451     struct io_unit *event_io;
452     struct io_event *event;
453     int nr;
454     int i; 
455     int min_nr = io_iter;
456     struct timeval stop_time;
457
458     if (t->num_global_pending < io_iter)
459         min_nr = t->num_global_pending;
460
461 #ifdef NEW_GETEVENTS
462     nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL);
463 #else
464     nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
465 #endif
466     if (nr <= 0)
467         return nr;
468
469     gettimeofday(&stop_time, NULL);
470     for (i = 0 ; i < nr ; i++) {
471         event = t->events + i;
472         event_io = (struct io_unit *)((unsigned long)event->obj); 
473         finish_io(t, event_io, event->res, &stop_time);
474     }
475     return nr;
476 }
477
478 /* 
479  * finds a free io unit, waiting for pending requests if required.  returns
480  * null if none could be found
481  */
482 static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
483 {
484     struct io_unit *event_io;
485     int nr;
486
487 retry:
488     if (t->free_ious) {
489         event_io = t->free_ious;
490         t->free_ious = t->free_ious->next;
491         if (grab_iou(event_io, oper)) {
492             fprintf(stderr, "io unit on free list but not free\n");
493             abort();
494         }
495         return event_io;
496     }
497     nr = read_some_events(t);
498     if (nr > 0)
499         goto retry;
500     else
501         fprintf(stderr, "no free ious after read_some_events\n");
502     return NULL;
503 }
504
505 /*
506  * wait for all pending requests for this io operation to finish
507  */
508 static int io_oper_wait(struct thread_info *t, struct io_oper *oper) {
509     struct io_event event;
510     struct io_unit *event_io;
511
512     if (oper == NULL) {
513         return 0;
514     }
515
516     if (oper->num_pending == 0)
517         goto done;
518
519     /* this func is not speed sensitive, no need to go wild reading
520      * more than one event at a time
521      */
522 #ifdef NEW_GETEVENTS
523     while(io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
524 #else
525     while(io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
526 #endif
527         struct timeval tv_now;
528         event_io = (struct io_unit *)((unsigned long)event.obj); 
529
530         gettimeofday(&tv_now, NULL);
531         finish_io(t, event_io, event.res, &tv_now);
532
533         if (oper->num_pending == 0)
534             break;
535     }
536 done:
537     if (oper->num_err) {
538         fprintf(stderr, "%u errors on oper, last %u\n", 
539                 oper->num_err, oper->last_err);
540     }
541     return 0;
542 }
543
544 off_t random_byte_offset(struct io_oper *oper) {
545     off_t num;
546     off_t rand_byte = oper->start;
547     off_t range;
548     off_t offset = 1;
549
550     range = (oper->end - oper->start) / (1024 * 1024);
551     if ((page_size_mask+1) > (1024 * 1024))
552         offset = (page_size_mask+1) / (1024 * 1024);
553     if (range < offset)
554         range = 0;
555     else
556         range -= offset;
557
558     /* find a random mb offset */
559     num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 ));
560     rand_byte += num * 1024 * 1024;
561     
562     /* find a random byte offset */
563     num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));
564
565     /* page align */
566     num = (num + page_size_mask) & ~page_size_mask;
567     rand_byte += num;
568
569     if (rand_byte + oper->reclen > oper->end) {
570         rand_byte -= oper->reclen;
571     }
572     return rand_byte;
573 }
574
575 /* 
576  * build an aio iocb for an operation, based on oper->rw and the
577  * last offset used.  This finds the struct io_unit that will be attached
578  * to the iocb, and things are ready for submission to aio after this
579  * is called.
580  *
581  * returns null on error
582  */
583 static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
584 {
585     struct io_unit *io;
586     off_t rand_byte;
587
588     io = find_iou(t, oper);
589     if (!io) {
590         fprintf(stderr, "unable to find io unit\n");
591         return NULL;
592     }
593
594     switch(oper->rw) {
595     case WRITE:
596         io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen, 
597                        oper->last_offset);
598         oper->last_offset += oper->reclen;
599         break;
600     case READ:
601         io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen, 
602                       oper->last_offset);
603         oper->last_offset += oper->reclen;
604         break;
605     case RREAD:
606         rand_byte = random_byte_offset(oper);
607         oper->last_offset = rand_byte;
608         io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen, 
609                       rand_byte);
610         break;
611     case RWRITE:
612         rand_byte = random_byte_offset(oper);
613         oper->last_offset = rand_byte;
614         io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen, 
615                       rand_byte);
616         
617         break;
618     }
619
620     return io;
621 }
622
623 /* 
624  * wait for any pending requests, and then free all ram associated with
625  * an operation.  returns the last error the operation hit (zero means none)
626  */
627 static int
628 finish_oper(struct thread_info *t, struct io_oper *oper)
629 {
630     unsigned long last_err;
631
632     io_oper_wait(t, oper);
633     last_err = oper->last_err;
634     if (oper->num_pending > 0) {
635         fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
636     }
637     close(oper->fd);
638     free(oper);
639     return last_err;
640 }
641
642 /* 
643  * allocates an io operation and fills in all the fields.  returns
644  * null on error
645  */
646 static struct io_oper * 
647 create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth,
648             int iter, char *file_name)
649 {
650     struct io_oper *oper;
651
652     oper = malloc (sizeof(*oper));
653     if (!oper) {
654         fprintf(stderr, "unable to allocate io oper\n");
655         return NULL;
656     }
657     memset(oper, 0, sizeof(*oper));
658
659     oper->depth = depth;
660     oper->start = start;
661     oper->end = end;
662     oper->last_offset = oper->start;
663     oper->fd = fd;
664     oper->reclen = reclen;
665     oper->rw = rw;
666     oper->total_ios = (oper->end - oper->start) / oper->reclen;
667     oper->file_name = file_name;
668
669     return oper;
670 }
671
672 /*
673  * does setup on num_ios worth of iocbs, but does not actually
674  * start any io
675  */
676 int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios, 
677                struct iocb **my_iocbs) 
678 {
679     int i;
680     struct io_unit *io;
681
682     if (oper->started_ios == 0)
683         gettimeofday(&oper->start_time, NULL);
684
685     if (num_ios == 0)
686         num_ios = oper->total_ios;
687
688     if ((oper->started_ios + num_ios) > oper->total_ios)
689         num_ios = oper->total_ios - oper->started_ios;   
690
691     for( i = 0 ; i < num_ios ; i++) {
692         io = build_iocb(t, oper);
693         if (!io) {
694             return -1;    
695         }
696         my_iocbs[i] = &io->iocb;
697     }
698     return num_ios;
699 }
700
701 /*
702  * runs through the iocbs in the array provided and updates
703  * counters in the associated oper struct
704  */
705 static void update_iou_counters(struct iocb **my_iocbs, int nr,
706         struct timeval *tv_now) 
707 {
708     struct io_unit *io;
709     int i;
710     for (i = 0 ; i < nr ; i++) {
711         io = (struct io_unit *)(my_iocbs[i]);
712         io->io_oper->num_pending++;
713         io->io_oper->started_ios++;
714         io->io_start_time = *tv_now;    /* set time of io_submit */
715     }
716 }
717
718 /* starts some io for a given file, returns zero if all went well */
719 int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs) 
720 {
721     int ret;
722     struct timeval start_time;
723     struct timeval stop_time;
724
725 resubmit:
726     gettimeofday(&start_time, NULL);
727     ret = io_submit(t->io_ctx, num_ios, my_iocbs);
728     gettimeofday(&stop_time, NULL);
729     calc_latency(&start_time, &stop_time, &t->io_submit_latency);
730
731     if (ret != num_ios) {
732         /* some ios got through */
733         if (ret > 0) {
734             update_iou_counters(my_iocbs, ret, &stop_time);
735             my_iocbs += ret;
736             t->num_global_pending += ret;
737             num_ios -= ret;
738         }
739         /* 
740          * we've used all the requests allocated in aio_init, wait and
741          * retry
742          */
743         if (ret > 0 || ret == -EAGAIN) {
744             int old_ret = ret;
745             if ((ret = read_some_events(t) > 0)) {
746                 goto resubmit;
747             } else {
748                 fprintf(stderr, "ret was %d and now is %d\n", ret, old_ret);
749                 abort();
750             }
751         }
752
753         fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret));
754         return -1;
755     }
756     update_iou_counters(my_iocbs, ret, &stop_time);
757     t->num_global_pending += ret;
758     return 0;
759 }
760
761 /* 
762  * changes oper->rw to the next in a command sequence, or returns zero
763  * to say this operation is really, completely done for
764  */
765 static int restart_oper(struct io_oper *oper) {
766     int new_rw  = 0;
767     if (oper->last_err)
768         return 0;
769
770     /* this switch falls through */
771     switch(oper->rw) {
772     case WRITE:
773         if (stages & (1 << READ))
774             new_rw = READ;
775     case READ:
776         if (!new_rw && stages & (1 << RWRITE))
777             new_rw = RWRITE;
778     case RWRITE:
779         if (!new_rw && stages & (1 << RREAD))
780             new_rw = RREAD;
781     }
782
783     if (new_rw) {
784         oper->started_ios = 0;
785         oper->last_offset = oper->start;
786         oper->stonewalled = 0;
787
788         /* 
789          * we're restarting an operation with pending requests, so the
790          * timing info won't be printed by finish_io.  Printing it here
791          */
792         if (oper->num_pending)
793             print_time(oper);
794
795         oper->rw = new_rw;
796         return 1;
797     } 
798     return 0;
799 }
800
801 static int oper_runnable(struct io_oper *oper) {
802     struct stat buf;
803     int ret;
804
805     /* first context is always runnable, if started_ios > 0, no need to
806      * redo the calculations
807      */
808     if (oper->started_ios || oper->start == 0)
809         return 1;
810     /*
811      * only the sequential phases force delays in starting */
812     if (oper->rw >= RWRITE)
813         return 1;
814     ret = fstat(oper->fd, &buf);
815     if (ret < 0) {
816         perror("fstat");
817         exit(1);
818     }
819     if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
820         return 0;
821     return 1;
822 }
823
824 /*
825  * runs through all the io operations on the active list, and starts
826  * a chunk of io on each.  If any io operations are completely finished,
827  * it either switches them to the next stage or puts them on the 
828  * finished list.
829  *
830  * this function stops after max_io_submit iocbs are sent down the 
831  * pipe, even if it has not yet touched all the operations on the 
832  * active list.  Any operations that have finished are moved onto
833  * the finished_opers list.
834  */
835 static int run_active_list(struct thread_info *t,
836                          int io_iter,
837                          int max_io_submit)
838 {
839     struct io_oper *oper;
840     struct io_oper *built_opers = NULL;
841     struct iocb **my_iocbs = t->iocbs;
842     int ret = 0;
843     int num_built = 0;
844
845     oper = t->active_opers;
846     while(oper) {
847         if (!oper_runnable(oper)) {
848             oper = oper->next;
849             if (oper == t->active_opers)
850                 break;
851             continue;
852         }
853         ret = build_oper(t, oper, io_iter, my_iocbs);
854         if (ret >= 0) {
855             my_iocbs += ret;
856             num_built += ret;
857             oper_list_del(oper, &t->active_opers);
858             oper_list_add(oper, &built_opers);
859             oper = t->active_opers;
860             if (num_built + io_iter > max_io_submit)
861                 break;
862         } else
863             break;
864     }
865     if (num_built) {
866         ret = run_built(t, num_built, t->iocbs);
867         if (ret < 0) {
868             fprintf(stderr, "error %d on run_built\n", ret);
869             exit(1);
870         }
871         while(built_opers) {
872             oper = built_opers;
873             oper_list_del(oper, &built_opers);
874             oper_list_add(oper, &t->active_opers);
875             if (oper->started_ios == oper->total_ios) {
876                 oper_list_del(oper, &t->active_opers);
877                 oper_list_add(oper, &t->finished_opers);
878             }
879         }
880     }
881     return 0;
882 }
883
884 void drop_shm() {
885     int ret;
886     struct shmid_ds ds;
887     if (use_shm != USE_SHM)
888         return;
889
890     ret = shmctl(shm_id, IPC_RMID, &ds);
891     if (ret) {
892         perror("shmctl IPC_RMID");
893     }
894 }
895
896 void aio_setup(io_context_t *io_ctx, int n)
897 {
898     int res = io_queue_init(n, io_ctx);
899     if (res != 0) {
900         fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
901                 n, res, strerror(-res));
902         exit(3);
903     }
904 }
905
906 /*
907  * allocate io operation and event arrays for a given thread
908  */
909 int setup_ious(struct thread_info *t, 
910               int num_files, int depth, 
911               int reclen, int max_io_submit) {
912     int i;
913     size_t bytes = num_files * depth * sizeof(*t->ios);
914
915     t->ios = malloc(bytes);
916     if (!t->ios) {
917         fprintf(stderr, "unable to allocate io units\n");
918         return -1;
919     }
920     memset(t->ios, 0, bytes);
921
922     for (i = 0 ; i < depth * num_files; i++) {
923         t->ios[i].buf = aligned_buffer;
924         aligned_buffer += padded_reclen;
925         t->ios[i].buf_size = reclen;
926         if (verify)
927             memset(t->ios[i].buf, 'b', reclen);
928         else
929             memset(t->ios[i].buf, 0, reclen);
930         t->ios[i].next = t->free_ious;
931         t->free_ious = t->ios + i;
932     }
933     if (verify) {
934         verify_buf = aligned_buffer;
935         memset(verify_buf, 'b', reclen);
936     }
937
938     t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
939     if (!t->iocbs) {
940         fprintf(stderr, "unable to allocate iocbs\n");
941         goto free_buffers;
942     }
943
944     memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));
945
946     t->events = malloc(sizeof(struct io_event) * depth * num_files);
947     if (!t->events) {
948         fprintf(stderr, "unable to allocate ram for events\n");
949         goto free_buffers;
950     }
951     memset(t->events, 0, num_files * sizeof(struct io_event)*depth);
952
953     t->num_global_ios = num_files * depth;
954     t->num_global_events = t->num_global_ios;
955     return 0;
956
957 free_buffers:
958     if (t->ios)
959         free(t->ios);
960     if (t->iocbs)
961         free(t->iocbs);  
962     if (t->events)
963         free(t->events);
964     return -1;
965 }
966
967 /*
968  * The buffers used for file data are allocated as a single big
969  * malloc, and then each thread and operation takes a piece and uses
970  * that for file data.  This lets us do a large shm or bigpages alloc
971  * and without trying to find a special place in each thread to map the
972  * buffers to
973  */
974 int setup_shared_mem(int num_threads, int num_files, int depth, 
975                      int reclen, int max_io_submit) 
976 {
977     char *p = NULL;
978     size_t total_ram;
979     
980     padded_reclen = (reclen + page_size_mask) / (page_size_mask+1);
981     padded_reclen = padded_reclen * (page_size_mask+1);
982     total_ram = num_files * depth * padded_reclen + num_threads;
983     if (verify)
984         total_ram += padded_reclen;
985
986     if (use_shm == USE_MALLOC) {
987         p = malloc(total_ram + page_size_mask);
988     } else if (use_shm == USE_SHM) {
989         shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
990         if (shm_id < 0) {
991             perror("shmget");
992             drop_shm();
993             goto free_buffers;
994         }
995         p = shmat(shm_id, (char *)0x50000000, 0);
996         if ((long)p == -1) {
997             perror("shmat");
998             goto free_buffers;
999         }
1000         /* won't really be dropped until we shmdt */
1001         drop_shm();
1002     } else if (use_shm == USE_SHMFS) {
1003         char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */    
1004         int fd;
1005
1006         strcpy(mmap_name, "/dev/shm/XXXXXX");
1007         fd = mkstemp(mmap_name);
1008         if (fd < 0) {
1009             perror("mkstemp");
1010             goto free_buffers;
1011         }
1012         unlink(mmap_name);
1013         ftruncate(fd, total_ram);
1014         shm_id = fd;
1015         p = mmap((char *)0x50000000, total_ram,
1016                  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1017
1018         if (p == MAP_FAILED) {
1019             perror("mmap");
1020             goto free_buffers;
1021         }
1022     }
1023     if (!p) {
1024         fprintf(stderr, "unable to allocate buffers\n");
1025         goto free_buffers;
1026     }
1027     unaligned_buffer = p;
1028     p = (char*)((intptr_t) (p + page_size_mask) & ~page_size_mask);
1029     aligned_buffer = p;
1030     return 0;
1031
1032 free_buffers:
1033     drop_shm();
1034     if (unaligned_buffer)
1035         free(unaligned_buffer);
1036     return -1;
1037 }
1038
1039 /*
1040  * runs through all the thread_info structs and calculates a combined
1041  * throughput
1042  */
1043 void global_thread_throughput(struct thread_info *t, char *this_stage) {
1044     int i;
1045     double runtime = time_since_now(&global_stage_start_time);
1046     double total_mb = 0;
1047     double min_trans = 0;
1048
1049     for (i = 0 ; i < num_threads ; i++) {
1050         total_mb += global_thread_info[i].stage_mb_trans;
1051         if (!min_trans || t->stage_mb_trans < min_trans)
1052             min_trans = t->stage_mb_trans;
1053     }
1054     if (total_mb) {
1055         fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
1056                 total_mb / runtime);
1057         fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
1058         if (stonewall)
1059             fprintf(stderr, " min transfer %.2fMB", min_trans);
1060         fprintf(stderr, "\n");
1061     }
1062 }
1063
1064
1065 /* this is the meat of the state machine.  There is a list of
1066  * active operations structs, and as each one finishes the required
1067  * io it is moved to a list of finished operations.  Once they have
1068  * all finished whatever stage they were in, they are given the chance
1069  * to restart and pick a different stage (read/write/random read etc)
1070  *
1071  * various timings are printed in between the stages, along with
1072  * thread synchronization if there are more than one threads.
1073  */
1074 int worker(struct thread_info *t)
1075 {
1076     struct io_oper *oper;
1077     char *this_stage = NULL;
1078     struct timeval stage_time;
1079     int status = 0;
1080     int iteration = 0;
1081     int cnt;
1082
1083     aio_setup(&t->io_ctx, 512);
1084
1085 restart:
1086     if (num_threads > 1) {
1087         pthread_mutex_lock(&stage_mutex);
1088         threads_starting++;
1089         if (threads_starting == num_threads) {
1090             threads_ending = 0;
1091             gettimeofday(&global_stage_start_time, NULL);
1092             pthread_cond_broadcast(&stage_cond);
1093         }
1094         while (threads_starting != num_threads)
1095             pthread_cond_wait(&stage_cond, &stage_mutex);
1096         pthread_mutex_unlock(&stage_mutex);
1097     }
1098     if (t->active_opers) {
1099         this_stage = stage_name(t->active_opers->rw);
1100         gettimeofday(&stage_time, NULL);
1101         t->stage_mb_trans = 0;
1102     }
1103
1104     cnt = 0;
1105     /* first we send everything through aio */
1106     while(t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1107         if (stonewall && threads_ending) {
1108             oper = t->active_opers;
1109             oper->stonewalled = 1;
1110             oper_list_del(oper, &t->active_opers);
1111             oper_list_add(oper, &t->finished_opers);
1112         } else {
1113             run_active_list(t, io_iter,  max_io_submit);
1114         }
1115         cnt++;
1116     }
1117     if (latency_stats)
1118         print_latency(t);
1119
1120     if (completion_latency_stats)
1121         print_completion_latency(t);
1122
1123     /* then we wait for all the operations to finish */
1124     oper = t->finished_opers;
1125     do {
1126         if (!oper)
1127                 break;
1128         io_oper_wait(t, oper);
1129         oper = oper->next;
1130     } while(oper != t->finished_opers);
1131
1132     /* then we do an fsync to get the timing for any future operations
1133      * right, and check to see if any of these need to get restarted
1134      */
1135     oper = t->finished_opers;
1136     while(oper) {
1137         if (fsync_stages)
1138             fsync(oper->fd);
1139         t->stage_mb_trans += oper_mb_trans(oper);
1140         if (restart_oper(oper)) {
1141             oper_list_del(oper, &t->finished_opers);
1142             oper_list_add(oper, &t->active_opers);
1143             oper = t->finished_opers;
1144             continue;
1145         }
1146         oper = oper->next;
1147         if (oper == t->finished_opers)
1148             break;
1149     } 
1150
1151     if (t->stage_mb_trans && t->num_files > 0) {
1152         double seconds = time_since_now(&stage_time);
1153         fprintf(stderr, "thread %llu %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
1154                 (unsigned long long)(t - global_thread_info), this_stage,
1155                 t->stage_mb_trans/seconds, t->stage_mb_trans, seconds);
1156     }
1157
1158     if (num_threads > 1) {
1159         pthread_mutex_lock(&stage_mutex);
1160         threads_ending++;
1161         if (threads_ending == num_threads) {
1162             threads_starting = 0;
1163             pthread_cond_broadcast(&stage_cond);
1164             global_thread_throughput(t, this_stage);
1165         }
1166         while(threads_ending != num_threads)
1167             pthread_cond_wait(&stage_cond, &stage_mutex);
1168         pthread_mutex_unlock(&stage_mutex);
1169     }
1170     
1171     /* someone got restarted, go back to the beginning */
1172     if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1173         iteration++;
1174         goto restart;
1175     }
1176
1177     /* finally, free all the ram */
1178     while(t->finished_opers) {
1179         oper = t->finished_opers;
1180         oper_list_del(oper, &t->finished_opers);
1181         status = finish_oper(t, oper);
1182     }
1183
1184     if (t->num_global_pending) {
1185         fprintf(stderr, "global num pending is %d\n", t->num_global_pending);
1186     }
1187     io_queue_release(t->io_ctx);
1188     
1189     return status;
1190 }
1191
1192 typedef void * (*start_routine)(void *);
1193 int run_workers(struct thread_info *t, int num_threads)
1194 {
1195     int ret;
1196     int i;
1197
1198     for(i = 0 ; i < num_threads ; i++) {
1199         ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i);
1200         if (ret) {
1201             perror("pthread_create");
1202             exit(1);
1203         }
1204     }
1205     for(i = 0 ; i < num_threads ; i++) {
1206         ret = pthread_join(t[i].tid, NULL);
1207         if (ret) {
1208             perror("pthread_join");
1209             exit(1);
1210         }
1211     }
1212     return 0;
1213 }
1214
1215 off_t parse_size(char *size_arg, off_t mult) {
1216     char c;
1217     int num;
1218     off_t ret;
1219     c = size_arg[strlen(size_arg) - 1];
1220     if (c > '9') {
1221         size_arg[strlen(size_arg) - 1] = '\0';
1222     }
1223     num = atoi(size_arg);
1224     switch(c) {
1225     case 'g':
1226     case 'G':
1227         mult = 1024 * 1024 * 1024;
1228         break;
1229     case 'm':
1230     case 'M':
1231         mult = 1024 * 1024;
1232         break;
1233     case 'k':
1234     case 'K':
1235         mult = 1024;
1236         break;
1237     case 'b':
1238     case 'B':
1239         mult = 1;
1240         break;
1241     }
1242     ret = mult * num;
1243     return ret;
1244 }
1245
1246 void print_usage(void) {
1247     printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
1248     printf("                  [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
1249     printf("                  file1 [file2 ...]\n");
1250     printf("\t-a size in KB at which to align buffers\n");
1251     printf("\t-b max number of iocbs to give io_submit at once\n");
1252     printf("\t-c number of io contexts per file\n");
1253     printf("\t-C offset between contexts, default 2MB\n");
1254     printf("\t-s size in MB of the test file(s), default 1024MB\n");
1255     printf("\t-r record size in KB used for each io, default 64KB\n");
1256     printf("\t-d number of pending aio requests for each file, default 64\n");
1257     printf("\t-i number of ios per file sent before switching\n\t   to the next file, default 8\n");
1258     printf("\t-I total number of ayncs IOs the program will run, default is run until Cntl-C\n");
1259     printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
1260     printf("\t-S Use O_SYNC for writes\n");
1261     printf("\t-o add an operation to the list: write=0, read=1,\n"); 
1262     printf("\t   random write=2, random read=3.\n");
1263     printf("\t   repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
1264     printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
1265     printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
1266     printf("\t-n no fsyncs between write stage and read stage\n");
1267     printf("\t-l print io_submit latencies after each stage\n");
1268     printf("\t-L print io completion latencies after each stage\n");
1269     printf("\t-t number of threads to run\n");
1270     printf("\t-u unlink files after completion\n");
1271     printf("\t-v verification of bytes written\n");
1272     printf("\t-x turn off thread stonewalling\n");
1273     printf("\t-h this message\n");
1274     printf("\n\t   the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
1275     printf("\t   translate to 400KB, 400MB and 400GB\n");
1276     printf("version %s\n", PROG_VERSION);
1277 }
1278
1279 int main(int ac, char **av) 
1280 {
1281     int rwfd;
1282     int i;
1283     int j;
1284     int c;
1285
1286     off_t file_size = 1 * 1024 * 1024 * 1024;
1287     int first_stage = WRITE;
1288     struct io_oper *oper;
1289     int status = 0;
1290     int num_files = 0;
1291     int open_fds = 0;
1292     struct thread_info *t;
1293
1294     page_size_mask = getpagesize() - 1;
1295
1296     while(1) {
1297         c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu");
1298         if  (c < 0)
1299             break;
1300
1301         switch(c) {
1302         case 'a':
1303             page_size_mask = parse_size(optarg, 1024);
1304             page_size_mask--;
1305             break;
1306         case 'c':
1307             num_contexts = atoi(optarg);
1308             break;
1309         case 'C':
1310             context_offset = parse_size(optarg, 1024 * 1024);
1311         case 'b':
1312             max_io_submit = atoi(optarg);
1313             break;
1314         case 's':
1315             file_size = parse_size(optarg, 1024 * 1024);
1316             break;
1317         case 'd':
1318             depth = atoi(optarg);
1319             break;
1320         case 'r':
1321             rec_len = parse_size(optarg, 1024);
1322             break;
1323         case 'i':
1324             io_iter = atoi(optarg);
1325             break;
1326         case 'I':
1327           iterations = atoi(optarg);
1328         break;
1329         case 'n':
1330             fsync_stages = 0;
1331             break;
1332         case 'l':
1333             latency_stats = 1;
1334             break;
1335         case 'L':
1336             completion_latency_stats = 1;
1337             break;
1338         case 'm':
1339             if (!strcmp(optarg, "shm")) {
1340                 fprintf(stderr, "using ipc shm\n");
1341                 use_shm = USE_SHM;
1342             } else if (!strcmp(optarg, "shmfs")) {
1343                 fprintf(stderr, "using /dev/shm for buffers\n");
1344                 use_shm = USE_SHMFS;
1345             }
1346             break;
1347         case 'o': 
1348             i = atoi(optarg);
1349             stages |= 1 << i;
1350             fprintf(stderr, "adding stage %s\n", stage_name(i));
1351             break;
1352         case 'O':
1353             o_direct = O_DIRECT;
1354             break;
1355         case 'S':
1356             o_sync = O_SYNC;
1357             break;
1358         case 't':
1359             num_threads = atoi(optarg);
1360             break;
1361         case 'x':
1362             stonewall = 0;
1363             break;
1364         case 'u':
1365             unlink_files = 1;
1366             break;
1367         case 'v':
1368             verify = 1;
1369             break;
1370         case 'h':
1371         default:
1372             print_usage();
1373             exit(1);
1374         }
1375     }
1376
1377     /* 
1378      * make sure we don't try to submit more ios than we have allocated
1379      * memory for
1380      */
1381     if (depth < io_iter) {
1382         io_iter = depth;
1383         fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1384     }
1385
1386     if (optind >= ac) {
1387         print_usage();
1388         exit(1);
1389     }
1390
1391     num_files = ac - optind;
1392
1393     if (num_threads > (num_files * num_contexts)) {
1394         num_threads = num_files * num_contexts;
1395         fprintf(stderr, "dropping thread count to the number of contexts %d\n", 
1396                 num_threads);
1397     }
1398
1399     t = calloc(num_threads, sizeof(*t));
1400     if (!t) {
1401         perror("calloc");
1402         exit(1);
1403     }
1404     global_thread_info = t;
1405
1406     /* by default, allow a huge number of iocbs to be sent towards
1407      * io_submit
1408      */
1409     if (!max_io_submit)
1410         max_io_submit = num_files * io_iter * num_contexts;
1411
1412     /*
1413      * make sure we don't try to submit more ios than max_io_submit allows 
1414      */
1415     if (max_io_submit < io_iter) {
1416         io_iter = max_io_submit;
1417         fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1418     }
1419
1420     if (!stages) {
1421         stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
1422     } else {
1423         for (i = 0 ; i < LAST_STAGE; i++) {
1424             if (stages & (1 << i)) {
1425                 first_stage = i;
1426                 fprintf(stderr, "starting with %s\n", stage_name(i));
1427                 break;
1428             }
1429         }
1430     }
1431
1432     if (file_size < num_contexts * context_offset) {
1433         fprintf(stderr, "file size %Lu too small for %d contexts\n", 
1434                 (unsigned long long)file_size, num_contexts);
1435         exit(1);
1436     }
1437
1438     fprintf(stderr, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n",
1439             (unsigned long long)file_size / (1024 * 1024),
1440             rec_len / 1024, depth, io_iter);
1441     fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n", 
1442             max_io_submit, (page_size_mask + 1)/1024);
1443     fprintf(stderr, "threads %d files %d contexts %d context offset %LuMB verification %s\n", 
1444             num_threads, num_files, num_contexts, 
1445             (unsigned long long)context_offset / (1024 * 1024),
1446             verify ? "on" : "off");
1447     /* open all the files and do any required setup for them */
1448     for (i = optind ; i < ac ; i++) {
1449         int thread_index;
1450         for (j = 0 ; j < num_contexts ; j++) {
1451             thread_index = open_fds % num_threads;
1452             open_fds++;
1453
1454             rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600);
1455             assert(rwfd != -1);
1456
1457             oper = create_oper(rwfd, first_stage, j * context_offset, 
1458                                file_size - j * context_offset, rec_len, 
1459                                depth, io_iter, av[i]);
1460             if (!oper) {
1461                 fprintf(stderr, "error in create_oper\n");
1462                 exit(-1);
1463             }
1464             oper_list_add(oper, &t[thread_index].active_opers);
1465             t[thread_index].num_files++;
1466         }
1467     }
1468     if (setup_shared_mem(num_threads, num_files * num_contexts, 
1469                          depth, rec_len, max_io_submit))
1470     {
1471         exit(1);
1472     }
1473     for (i = 0 ; i < num_threads ; i++) {
1474         if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit))
1475                 exit(1);
1476     }
1477     if (num_threads > 1){
1478         printf("Running multi thread version num_threads:%d\n", num_threads);
1479         run_workers(t, num_threads);
1480     } else {
1481         printf("Running single thread version \n");
1482         status = worker(t);
1483     }
1484     if (unlink_files) {
1485         for (i = optind ; i < ac ; i++) {
1486             printf("Cleaning up file %s \n", av[i]);
1487             unlink(av[i]);
1488         }
1489     }
1490
1491     if (status) {
1492         exit(1);
1493     }
1494     return status;
1495 }
1496
1497