Add AIO test 113 into the mix, enable AIO testing capability in local fsx.
[xfstests-dev.git] / ltp / aio-stress.c
1 /*
2  * Copyright (c) 2004 SuSE, Inc.  All Rights Reserved.
3  * 
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of version 2 of the GNU General Public License as
6  * published by the Free Software Foundation.
7  * 
8  * This program is distributed in the hope that it would be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11  * 
12  * Further, this software is distributed without any warranty that it is
13  * free of the rightful claim of any third person regarding infringement
14  * or the like.  Any license provided herein, whether implied or
15  * otherwise, applies only to this software file.  Patent licenses, if
16  * any, provided herein do not apply to combinations of this program with
17  * other software, or any other product whatsoever.
18  * 
19  * You should have received a copy of the GNU General Public License along
20  * with this program; if not, write the Free Software Foundation, Inc., 59
21  * Temple Place - Suite 330, Boston MA 02111-1307, USA.
22  * 
23  * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy,
24  * Mountain View, CA  94043, or:
25  * 
26  *
27  * aio-stress
28  *
29  * will open or create each file on the command line, and start a series
30  * of aio to it.  
31  *
32  * aio is done in a rotating loop.  first file1 gets 8 requests, then
33  * file2, then file3 etc.  As each file finishes writing, it is switched
34  * to reads
35  *
36  * io buffers are aligned in case you want to do raw io
37  *
38  * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c
39  *
40  * run aio-stress -h to see the options
41  *
42  * Please mail Chris Mason (mason@suse.com) with bug reports or patches
43  */
44 #define _FILE_OFFSET_BITS 64
45 #define PROG_VERSION "0.18"
46 #define NEW_GETEVENTS
47
48 #include <stdio.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <stdlib.h>
52
53 #include <sys/types.h>
54 #include <sys/stat.h>
55 #include <fcntl.h>
56 #include <unistd.h>
57 #include <sys/time.h>
58 #include <libaio.h>
59 #include <sys/ipc.h>
60 #include <sys/shm.h>
61 #include <sys/mman.h>
62 #include <string.h>
63 #include <pthread.h>
64
65 #define IO_FREE 0
66 #define IO_PENDING 1
67 #define RUN_FOREVER -1
68
69 #ifndef O_DIRECT
70 #define O_DIRECT         040000 /* direct disk access hint */
71 #endif
72
73 enum {
74     WRITE,
75     READ,
76     RWRITE,
77     RREAD,
78     LAST_STAGE,
79 };
80
81 #define USE_MALLOC 0
82 #define USE_SHM 1
83 #define USE_SHMFS 2
84
85 /* 
86  * various globals, these are effectively read only by the time the threads
87  * are started
88  */
89 long stages = 0;
90 unsigned long page_size_mask;
91 int o_direct = 0;
92 int o_sync = 0;
93 int latency_stats = 0;
94 int io_iter = 8;
95 int iterations = RUN_FOREVER;
96 int max_io_submit = 0;
97 long rec_len = 64 * 1024;
98 int depth = 64;
99 int num_threads = 1;
100 int num_contexts = 1;
101 off_t context_offset = 2 * 1024 * 1024;
102 int fsync_stages = 1;
103 int use_shm = 0;
104 int shm_id;
105 char *unaligned_buffer = NULL;
106 char *aligned_buffer = NULL;
107 int padded_reclen = 0;
108 int stonewall = 1;
109 int verify = 0;
110 char *verify_buf = NULL;
111
112 struct io_unit;
113 struct thread_info;
114
115 /* pthread mutexes and other globals for keeping the threads in sync */
116 pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
117 pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
118 int threads_ending = 0;
119 int threads_starting = 0;
120 struct timeval global_stage_start_time;
121 struct thread_info *global_thread_info;
122
123 /* 
124  * latencies during io_submit are measured, these are the 
125  * granularities for deviations 
126  */
127 #define DEVIATIONS 6
128 int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };
129 struct io_latency {
130     double max;
131     double min;
132     double total_io;
133     double total_lat;
134     double deviations[DEVIATIONS]; 
135 };
136
137 /* container for a series of operations to a file */
138 struct io_oper {
139     /* already open file descriptor, valid for whatever operation you want */
140     int fd;
141
142     /* starting byte of the operation */
143     off_t start;
144
145     /* ending byte of the operation */
146     off_t end;
147
148     /* size of the read/write buffer */
149     int reclen;
150
151     /* max number of pending requests before a wait is triggered */
152     int depth;
153
154     /* current number of pending requests */
155     int num_pending;
156
157     /* last error, zero if there were none */
158     int last_err;
159
160     /* total number of errors hit. */
161     int num_err;
162
163     /* read,write, random, etc */
164     int rw;
165
166     /* number of ios that will get sent to aio */
167     int total_ios;
168
169     /* number of ios we've already sent */
170     int started_ios;
171
172     /* last offset used in an io operation */
173     off_t last_offset;
174
175     /* stonewalled = 1 when we got cut off before submitting all our ios */
176     int stonewalled;
177
178     /* list management */
179     struct io_oper *next;
180     struct io_oper *prev;
181
182     struct timeval start_time;
183
184     char *file_name;
185 };
186
187 /* a single io, and all the tracking needed for it */
188 struct io_unit {
189     /* note, iocb must go first! */
190     struct iocb iocb;
191
192     /* pointer to parent io operation struct */
193     struct io_oper *io_oper;
194
195     /* aligned buffer */
196     char *buf;
197
198     /* size of the aligned buffer (record size) */
199     int buf_size;
200
201     /* state of this io unit (free, pending, done) */
202     int busy;
203
204     /* result of last operation */
205     long res;
206
207     struct io_unit *next;
208 };
209
210 struct thread_info {
211     io_context_t io_ctx;
212     pthread_t tid;
213
214     /* allocated array of io_unit structs */
215     struct io_unit *ios;
216
217     /* list of io units available for io */
218     struct io_unit *free_ious;
219
220     /* number of io units in the ios array */
221     int num_global_ios;
222
223     /* number of io units in flight */
224     int num_global_pending;
225
226     /* preallocated array of iocb pointers, only used in run_active */
227     struct iocb **iocbs;
228
229     /* preallocated array of events */
230     struct io_event *events;
231
232     /* size of the events array */
233     int num_global_events;
234
235     /* latency stats for io_submit */
236     struct io_latency io_submit_latency;
237
238     /* list of operations still in progress, and of those finished */
239     struct io_oper *active_opers;
240     struct io_oper *finished_opers;
241
242     /* number of files this thread is doing io on */
243     int num_files;
244
245     /* how much io this thread did in the last stage */
246     double stage_mb_trans;
247 };
248
249 static double time_since(struct timeval *tv) {
250     double sec, usec;
251     double ret;
252     struct timeval stop;
253     gettimeofday(&stop, NULL);
254     sec = stop.tv_sec - tv->tv_sec;
255     usec = stop.tv_usec - tv->tv_usec;
256     if (sec > 0 && usec < 0) {
257         sec--;
258         usec += 1000000;
259     } 
260     ret = sec + usec / (double)1000000;
261     if (ret < 0)
262         ret = 0;
263     return ret;
264 }
265
266 static void calc_latency(struct timeval *tv, struct io_latency *lat)
267 {
268     double delta;
269     int i;
270     delta = time_since(tv);
271     delta = delta * 1000;
272
273     if (delta > lat->max)
274         lat->max = delta;
275     if (!lat->min || delta < lat->min)
276         lat->min = delta;
277     lat->total_io++;
278     lat->total_lat += delta;
279     for (i = 0 ; i < DEVIATIONS ; i++) {
280         if (delta < deviations[i]) {
281             lat->deviations[i]++;
282             break;
283         }
284     }
285 }
286
287 static void oper_list_add(struct io_oper *oper, struct io_oper **list)
288 {
289     if (!*list) {
290         *list = oper;
291         oper->prev = oper->next = oper;
292         return;
293     }
294     oper->prev = (*list)->prev;
295     oper->next = *list;
296     (*list)->prev->next = oper;
297     (*list)->prev = oper;
298     return;
299 }
300
301 static void oper_list_del(struct io_oper *oper, struct io_oper **list)
302 {
303     if ((*list)->next == (*list)->prev && *list == (*list)->next) {
304         *list = NULL;
305         return;
306     }
307     oper->prev->next = oper->next;
308     oper->next->prev = oper->prev;
309     if (*list == oper)
310         *list = oper->next;
311 }
312
313 /* worker func to check error fields in the io unit */
314 static int check_finished_io(struct io_unit *io) {
315     int i;
316     if (io->res != io->buf_size) {
317
318                  struct stat s;
319                  fstat(io->io_oper->fd, &s);
320   
321                  /*
322                   * If file size is large enough for the read, then this short
323                   * read is an error.
324                   */
325                  if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
326                      s.st_size > (io->iocb.u.c.offset + io->res)) {
327   
328                                  fprintf(stderr, "io err %lu (%s) op %d, off %Lu size %d\n",
329                                                  io->res, strerror(-io->res), io->iocb.aio_lio_opcode,
330                                                  io->iocb.u.c.offset, io->buf_size);
331                                  io->io_oper->last_err = io->res;
332                                  io->io_oper->num_err++;
333                                  return -1;
334                  }
335     }
336     if (verify && io->io_oper->rw == READ) {
337         if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
338             fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n", 
339                     io->io_oper->file_name, io->iocb.u.c.offset);
340             
341             for (i = 0 ; i < io->io_oper->reclen ; i++) {
342                 if (io->buf[i] != verify_buf[i]) {
343                     fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]);
344                 }
345             }
346             fprintf(stderr, "\n");
347         }
348
349     }
350     return 0;
351 }
352
353 /* worker func to check the busy bits and get an io unit ready for use */
354 static int grab_iou(struct io_unit *io, struct io_oper *oper) {
355     if (io->busy == IO_PENDING)
356         return -1;
357
358     io->busy = IO_PENDING;
359     io->res = 0;
360     io->io_oper = oper;
361     return 0;
362 }
363
364 char *stage_name(int rw) {
365     switch(rw) {
366     case WRITE:
367         return "write";
368     case READ:
369         return "read";
370     case RWRITE:
371         return "random write";
372     case RREAD:
373         return "random read";
374     }
375     return "unknown";
376 }
377
378 static inline double oper_mb_trans(struct io_oper *oper) {
379     return ((double)oper->started_ios * (double)oper->reclen) /
380                 (double)(1024 * 1024);
381 }
382
383 static void print_time(struct io_oper *oper) {
384     double runtime;
385     double tput;
386     double mb;
387
388     runtime = time_since(&oper->start_time); 
389     mb = oper_mb_trans(oper);
390     tput = mb / runtime;
391     fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n", 
392             stage_name(oper->rw), oper->file_name, tput, mb, runtime);
393 }
394
395 static void print_latency(struct thread_info *t) {
396     struct io_latency *lat = &t->io_submit_latency;
397     double avg = lat->total_lat / lat->total_io;
398     int i;
399     double total_counted = 0;
400     fprintf(stderr, "latency min %.2f avg %.2f max %.2f\n\t", 
401             lat->min, avg, lat->max);
402
403     for (i = 0 ; i < DEVIATIONS ; i++) {
404         fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]);
405         total_counted += lat->deviations[i];
406     }
407     if (total_counted && lat->total_io - total_counted)
408         fprintf(stderr, " < %.0f", lat->total_io - total_counted);
409     fprintf(stderr, "\n");
410     memset(&t->io_submit_latency, 0, sizeof(t->io_submit_latency));
411 }
412
413 /*
414  * updates the fields in the io operation struct that belongs to this
415  * io unit, and make the io unit reusable again
416  */
417 void finish_io(struct thread_info *t, struct io_unit *io, long result) {
418     struct io_oper *oper = io->io_oper;
419
420     io->res = result;
421     io->busy = IO_FREE;
422     io->next = t->free_ious;
423     t->free_ious = io;
424     oper->num_pending--;
425     t->num_global_pending--;
426     check_finished_io(io);
427     if (oper->num_pending == 0 && 
428        (oper->started_ios == oper->total_ios || oper->stonewalled)) 
429     {
430         print_time(oper);
431     } 
432 }
433
434 int read_some_events(struct thread_info *t) {
435     struct io_unit *event_io;
436     struct io_event *event;
437     int nr;
438     int i; 
439     int min_nr = io_iter;
440
441     if (t->num_global_pending < io_iter)
442         min_nr = t->num_global_pending;
443
444 #ifdef NEW_GETEVENTS
445     nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL);
446 #else
447     nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
448 #endif
449     if (nr <= 0)
450         return nr;
451
452     for (i = 0 ; i < nr ; i++) {
453         event = t->events + i;
454         event_io = (struct io_unit *)((unsigned long)event->obj); 
455         finish_io(t, event_io, event->res);
456     }
457     return nr;
458 }
459
460 /* 
461  * finds a free io unit, waiting for pending requests if required.  returns
462  * null if none could be found
463  */
464 static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
465 {
466     struct io_unit *event_io;
467     int nr;
468
469 retry:
470     if (t->free_ious) {
471         event_io = t->free_ious;
472         t->free_ious = t->free_ious->next;
473         if (grab_iou(event_io, oper)) {
474             fprintf(stderr, "io unit on free list but not free\n");
475             abort();
476         }
477         return event_io;
478     }
479     nr = read_some_events(t);
480     if (nr > 0)
481         goto retry;
482     else
483         fprintf(stderr, "no free ious after read_some_events\n");
484     return NULL;
485 }
486
487 /*
488  * wait for all pending requests for this io operation to finish
489  */
490 static int io_oper_wait(struct thread_info *t, struct io_oper *oper) {
491     struct io_event event;
492     struct io_unit *event_io;
493
494     if (oper == NULL) {
495         return 0;
496     }
497
498     if (oper->num_pending == 0)
499         goto done;
500
501     /* this func is not speed sensitive, no need to go wild reading
502      * more than one event at a time
503      */
504 #ifdef NEW_GETEVENTS
505     while(io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
506 #else
507     while(io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
508 #endif
509         event_io = (struct io_unit *)((unsigned long)event.obj); 
510
511         finish_io(t, event_io, event.res);
512
513         if (oper->num_pending == 0)
514             break;
515     }
516 done:
517     if (oper->num_err) {
518         fprintf(stderr, "%u errors on oper, last %u\n", 
519                 oper->num_err, oper->last_err);
520     }
521     return 0;
522 }
523
524 off_t random_byte_offset(struct io_oper *oper) {
525     off_t num;
526     off_t rand_byte = oper->start;
527     off_t range;
528     off_t offset = 1;
529
530     range = (oper->end - oper->start) / (1024 * 1024);
531     if ((page_size_mask+1) > (1024 * 1024))
532         offset = (page_size_mask+1) / (1024 * 1024);
533     if (range < offset)
534         range = 0;
535     else
536         range -= offset;
537
538     /* find a random mb offset */
539     num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 ));
540     rand_byte += num * 1024 * 1024;
541     
542     /* find a random byte offset */
543     num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));
544
545     /* page align */
546     num = (num + page_size_mask) & ~page_size_mask;
547     rand_byte += num;
548
549     if (rand_byte + oper->reclen > oper->end) {
550         rand_byte -= oper->reclen;
551     }
552     return rand_byte;
553 }
554
555 /* 
556  * build an aio iocb for an operation, based on oper->rw and the
557  * last offset used.  This finds the struct io_unit that will be attached
558  * to the iocb, and things are ready for submission to aio after this
559  * is called.
560  *
561  * returns null on error
562  */
563 static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
564 {
565     struct io_unit *io;
566     off_t rand_byte;
567
568     io = find_iou(t, oper);
569     if (!io) {
570         fprintf(stderr, "unable to find io unit\n");
571         return NULL;
572     }
573
574     switch(oper->rw) {
575     case WRITE:
576         io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen, 
577                        oper->last_offset);
578         oper->last_offset += oper->reclen;
579         break;
580     case READ:
581         io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen, 
582                       oper->last_offset);
583         oper->last_offset += oper->reclen;
584         break;
585     case RREAD:
586         rand_byte = random_byte_offset(oper);
587         oper->last_offset = rand_byte;
588         io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen, 
589                       rand_byte);
590         break;
591     case RWRITE:
592         rand_byte = random_byte_offset(oper);
593         oper->last_offset = rand_byte;
594         io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen, 
595                       rand_byte);
596         
597         break;
598     }
599
600     return io;
601 }
602
603 /* 
604  * wait for any pending requests, and then free all ram associated with
605  * an operation.  returns the last error the operation hit (zero means none)
606  */
607 static int
608 finish_oper(struct thread_info *t, struct io_oper *oper)
609 {
610     unsigned long last_err;
611
612     io_oper_wait(t, oper);
613     last_err = oper->last_err;
614     if (oper->num_pending > 0) {
615         fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
616     }
617     close(oper->fd);
618     free(oper);
619     return last_err;
620 }
621
622 /* 
623  * allocates an io operation and fills in all the fields.  returns
624  * null on error
625  */
626 static struct io_oper * 
627 create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth,
628             int iter, char *file_name)
629 {
630     struct io_oper *oper;
631
632     oper = malloc (sizeof(*oper));
633     if (!oper) {
634         fprintf(stderr, "unable to allocate io oper\n");
635         return NULL;
636     }
637     memset(oper, 0, sizeof(*oper));
638
639     oper->depth = depth;
640     oper->start = start;
641     oper->end = end;
642     oper->last_offset = oper->start;
643     oper->fd = fd;
644     oper->reclen = reclen;
645     oper->rw = rw;
646     oper->total_ios = (oper->end - oper->start) / oper->reclen;
647     oper->file_name = file_name;
648
649     return oper;
650 }
651
652 /*
653  * does setup on num_ios worth of iocbs, but does not actually
654  * start any io
655  */
656 int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios, 
657                struct iocb **my_iocbs) 
658 {
659     int i;
660     struct io_unit *io;
661
662     if (oper->started_ios == 0)
663         gettimeofday(&oper->start_time, NULL);
664
665     if (num_ios == 0)
666         num_ios = oper->total_ios;
667
668     if ((oper->started_ios + num_ios) > oper->total_ios)
669         num_ios = oper->total_ios - oper->started_ios;   
670
671     for( i = 0 ; i < num_ios ; i++) {
672         io = build_iocb(t, oper);
673         if (!io) {
674             return -1;    
675         }
676         my_iocbs[i] = &io->iocb;
677     }
678     return num_ios;
679 }
680
681 /*
682  * runs through the iocbs in the array provided and updates
683  * counters in the associated oper struct
684  */
685 static void update_iou_counters(struct iocb **my_iocbs, int nr) 
686 {
687     struct io_unit *io;
688     int i;
689     for (i = 0 ; i < nr ; i++) {
690         io = (struct io_unit *)(my_iocbs[i]);
691         io->io_oper->num_pending++;
692         io->io_oper->started_ios++;
693     }
694 }
695
696 /* starts some io for a given file, returns zero if all went well */
697 int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs) 
698 {
699     int ret;
700     struct timeval start_time;
701
702 resubmit:
703     gettimeofday(&start_time, NULL);
704     ret = io_submit(t->io_ctx, num_ios, my_iocbs);
705     calc_latency(&start_time, &t->io_submit_latency);
706     if (ret != num_ios) {
707         /* some ios got through */
708         if (ret > 0) {
709             update_iou_counters(my_iocbs, ret);
710             my_iocbs += ret;
711             t->num_global_pending += ret;
712             num_ios -= ret;
713         }
714         /* 
715          * we've used all the requests allocated in aio_init, wait and
716          * retry
717          */
718         if (ret > 0 || ret == -EAGAIN) {
719             if ((ret = read_some_events(t) > 0)) {
720                 goto resubmit;
721             }
722         }
723
724         fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret));
725         return -1;
726     }
727     update_iou_counters(my_iocbs, ret);
728     t->num_global_pending += ret;
729     return 0;
730 }
731
732 /* 
733  * changes oper->rw to the next in a command sequence, or returns zero
734  * to say this operation is really, completely done for
735  */
736 static int restart_oper(struct io_oper *oper) {
737     int new_rw  = 0;
738     if (oper->last_err)
739         return 0;
740
741     /* this switch falls through */
742     switch(oper->rw) {
743     case WRITE:
744         if (stages & (1 << READ))
745             new_rw = READ;
746     case READ:
747         if (!new_rw && stages & (1 << RWRITE))
748             new_rw = RWRITE;
749     case RWRITE:
750         if (!new_rw && stages & (1 << RREAD))
751             new_rw = RREAD;
752     }
753
754     if (new_rw) {
755         oper->started_ios = 0;
756         oper->last_offset = oper->start;
757         oper->stonewalled = 0;
758
759         /* 
760          * we're restarting an operation with pending requests, so the
761          * timing info won't be printed by finish_io.  Printing it here
762          */
763         if (oper->num_pending)
764             print_time(oper);
765
766         oper->rw = new_rw;
767         return 1;
768     } 
769     return 0;
770 }
771
772 static int oper_runnable(struct io_oper *oper) {
773     struct stat buf;
774     int ret;
775
776     /* first context is always runnable, if started_ios > 0, no need to
777      * redo the calculations
778      */
779     if (oper->started_ios || oper->start == 0)
780         return 1;
781     /*
782      * only the sequential phases force delays in starting */
783     if (oper->rw >= RWRITE)
784         return 1;
785     ret = fstat(oper->fd, &buf);
786     if (ret < 0) {
787         perror("fstat");
788         exit(1);
789     }
790     if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
791         return 0;
792     return 1;
793 }
794
795 /*
796  * runs through all the io operations on the active list, and starts
797  * a chunk of io on each.  If any io operations are completely finished,
798  * it either switches them to the next stage or puts them on the 
799  * finished list.
800  *
801  * this function stops after max_io_submit iocbs are sent down the 
802  * pipe, even if it has not yet touched all the operations on the 
803  * active list.  Any operations that have finished are moved onto
804  * the finished_opers list.
805  */
806 static int run_active_list(struct thread_info *t,
807                          int io_iter,
808                          int max_io_submit)
809 {
810     struct io_oper *oper;
811     struct io_oper *built_opers = NULL;
812     struct iocb **my_iocbs = t->iocbs;
813     int ret = 0;
814     int num_built = 0;
815
816     oper = t->active_opers;
817     while(oper) {
818         if (!oper_runnable(oper)) {
819             oper = oper->next;
820             if (oper == t->active_opers)
821                 break;
822             continue;
823         }
824         ret = build_oper(t, oper, io_iter, my_iocbs);
825         if (ret >= 0) {
826             my_iocbs += ret;
827             num_built += ret;
828             oper_list_del(oper, &t->active_opers);
829             oper_list_add(oper, &built_opers);
830             oper = t->active_opers;
831             if (num_built + io_iter > max_io_submit)
832                 break;
833         } else
834             break;
835     }
836     if (num_built) {
837         ret = run_built(t, num_built, t->iocbs);
838         if (ret < 0) {
839             fprintf(stderr, "error %d on run_built\n", ret);
840             exit(1);
841         }
842         while(built_opers) {
843             oper = built_opers;
844             oper_list_del(oper, &built_opers);
845             oper_list_add(oper, &t->active_opers);
846             if (oper->started_ios == oper->total_ios) {
847                 oper_list_del(oper, &t->active_opers);
848                 oper_list_add(oper, &t->finished_opers);
849             }
850         }
851     }
852     return 0;
853 }
854
855 void drop_shm() {
856     int ret;
857     struct shmid_ds ds;
858     if (use_shm != USE_SHM)
859         return;
860
861     ret = shmctl(shm_id, IPC_RMID, &ds);
862     if (ret) {
863         perror("shmctl IPC_RMID");
864     }
865 }
866
867 void aio_setup(io_context_t *io_ctx, int n)
868 {
869     int res = io_queue_init(n, io_ctx);
870     if (res != 0) {
871         fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
872                 n, res, strerror(-res));
873         exit(3);
874     }
875 }
876
877 /*
878  * allocate io operation and event arrays for a given thread
879  */
880 int setup_ious(struct thread_info *t, 
881               int num_files, int depth, 
882               int reclen, int max_io_submit) {
883     int i;
884     size_t bytes = num_files * depth * sizeof(*t->ios);
885
886     t->ios = malloc(bytes);
887     if (!t->ios) {
888         fprintf(stderr, "unable to allocate io units\n");
889         return -1;
890     }
891     memset(t->ios, 0, bytes);
892
893     for (i = 0 ; i < depth * num_files; i++) {
894         t->ios[i].buf = aligned_buffer;
895         aligned_buffer += padded_reclen;
896         t->ios[i].buf_size = reclen;
897         if (verify)
898             memset(t->ios[i].buf, 'b', reclen);
899         else
900             memset(t->ios[i].buf, 0, reclen);
901         t->ios[i].next = t->free_ious;
902         t->free_ious = t->ios + i;
903     }
904     if (verify) {
905         verify_buf = aligned_buffer;
906         memset(verify_buf, 'b', reclen);
907     }
908
909     t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
910     if (!t->iocbs) {
911         fprintf(stderr, "unable to allocate iocbs\n");
912         goto free_buffers;
913     }
914
915     memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));
916
917     t->events = malloc(sizeof(struct io_event) * depth * num_files);
918     if (!t->events) {
919         fprintf(stderr, "unable to allocate ram for events\n");
920         goto free_buffers;
921     }
922     memset(t->events, 0, num_files * sizeof(struct io_event)*depth);
923
924     t->num_global_ios = num_files * depth;
925     t->num_global_events = t->num_global_ios;
926     return 0;
927
928 free_buffers:
929     if (t->ios)
930         free(t->ios);
931     if (t->iocbs)
932         free(t->iocbs);  
933     if (t->events)
934         free(t->events);
935     return -1;
936 }
937
938 /*
939  * The buffers used for file data are allocated as a single big
940  * malloc, and then each thread and operation takes a piece and uses
941  * that for file data.  This lets us do a large shm or bigpages alloc
942  * and without trying to find a special place in each thread to map the
943  * buffers to
944  */
945 int setup_shared_mem(int num_threads, int num_files, int depth, 
946                      int reclen, int max_io_submit) 
947 {
948     char *p = NULL;
949     size_t total_ram;
950     
951     padded_reclen = (reclen + page_size_mask) / (page_size_mask+1);
952     padded_reclen = padded_reclen * (page_size_mask+1);
953     total_ram = num_files * depth * padded_reclen + num_threads;
954     if (verify)
955         total_ram += padded_reclen;
956
957     if (use_shm == USE_MALLOC) {
958         p = malloc(total_ram + page_size_mask);
959     } else if (use_shm == USE_SHM) {
960         shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
961         if (shm_id < 0) {
962             perror("shmget");
963             drop_shm();
964             goto free_buffers;
965         }
966         p = shmat(shm_id, (char *)0x50000000, 0);
967         if ((long)p == -1) {
968             perror("shmat");
969             goto free_buffers;
970         }
971         /* won't really be dropped until we shmdt */
972         drop_shm();
973     } else if (use_shm == USE_SHMFS) {
974         char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */    
975         int fd;
976
977         strcpy(mmap_name, "/dev/shm/XXXXXX");
978         fd = mkstemp(mmap_name);
979         if (fd < 0) {
980             perror("mkstemp");
981             goto free_buffers;
982         }
983         unlink(mmap_name);
984         ftruncate(fd, total_ram);
985         shm_id = fd;
986         p = mmap((char *)0x50000000, total_ram,
987                  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
988
989         if (p == MAP_FAILED) {
990             perror("mmap");
991             goto free_buffers;
992         }
993     }
994     if (!p) {
995         fprintf(stderr, "unable to allocate buffers\n");
996         goto free_buffers;
997     }
998     unaligned_buffer = p;
999     p = (char*)((intptr_t) (p + page_size_mask) & ~page_size_mask);
1000     aligned_buffer = p;
1001     return 0;
1002
1003 free_buffers:
1004     drop_shm();
1005     if (unaligned_buffer)
1006         free(unaligned_buffer);
1007     return -1;
1008 }
1009
1010 /*
1011  * runs through all the thread_info structs and calculates a combined
1012  * throughput
1013  */
1014 void global_thread_throughput(struct thread_info *t, char *this_stage) {
1015     int i;
1016     double runtime = time_since(&global_stage_start_time);
1017     double total_mb = 0;
1018     double min_trans = 0;
1019
1020     for (i = 0 ; i < num_threads ; i++) {
1021         total_mb += global_thread_info[i].stage_mb_trans;
1022         if (!min_trans || t->stage_mb_trans < min_trans)
1023             min_trans = t->stage_mb_trans;
1024     }
1025     if (total_mb) {
1026         fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
1027                 total_mb / runtime);
1028         fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
1029         if (stonewall)
1030             fprintf(stderr, " min transfer %.2fMB", min_trans);
1031         fprintf(stderr, "\n");
1032     }
1033 }
1034
1035
1036 /* this is the meat of the state machine.  There is a list of
1037  * active operations structs, and as each one finishes the required
1038  * io it is moved to a list of finished operations.  Once they have
1039  * all finished whatever stage they were in, they are given the chance
1040  * to restart and pick a different stage (read/write/random read etc)
1041  *
1042  * various timings are printed in between the stages, along with
1043  * thread synchronization if there are more than one threads.
1044  */
1045 int worker(struct thread_info *t)
1046 {
1047     struct io_oper *oper;
1048     char *this_stage = NULL;
1049     struct timeval stage_time;
1050     int status = 0;
1051     int iteration = 0;
1052     int cnt;
1053
1054     aio_setup(&t->io_ctx, 512);
1055
1056 restart:
1057     printf("Starting %s iter:%d \n", __FUNCTION__,iteration);
1058     if (num_threads > 1) {
1059         printf("num_threads %d \n", num_threads);
1060         pthread_mutex_lock(&stage_mutex);
1061         threads_starting++;
1062         if (threads_starting == num_threads) {
1063             threads_ending = 0;
1064             gettimeofday(&global_stage_start_time, NULL);
1065             pthread_cond_broadcast(&stage_cond);
1066         }
1067         while (threads_starting != num_threads)
1068             pthread_cond_wait(&stage_cond, &stage_mutex);
1069         pthread_mutex_unlock(&stage_mutex);
1070     }
1071     if (t->active_opers) {
1072 //        printf("active_opers %p line:%d\n", t->active_opers, __LINE__);
1073         this_stage = stage_name(t->active_opers->rw);
1074         gettimeofday(&stage_time, NULL);
1075         t->stage_mb_trans = 0;
1076     }
1077     cnt = 0;
1078     /* first we send everything through aio */
1079 //    printf("cnt:%d max_iterations:%d oper:%p\n",cnt, iterations,oper);
1080                         
1081     while (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1082 //        printf("active_opers %p line:%d cnt:%d ", t->active_opers,__LINE__,cnt);
1083         if (stonewall && threads_ending) {
1084             oper = t->active_opers;     
1085             oper->stonewalled = 1;
1086             oper_list_del(oper, &t->active_opers);
1087             oper_list_add(oper, &t->finished_opers);
1088 //            printf(" if branch\n");
1089         } else {
1090             run_active_list(t, io_iter,  max_io_submit);
1091 //            printf(" else branch\n");
1092         }
1093         cnt++;
1094     }
1095   
1096     if (latency_stats)
1097         print_latency(t);
1098
1099     /* then we wait for all the operations to finish */
1100     oper = t->finished_opers;
1101 //    printf("line:%d oper:%p\n", __LINE__, oper);
1102     do {
1103         io_oper_wait(t, oper);
1104         if (oper != NULL) {
1105             oper = oper->next;
1106         }
1107     } while (oper != t->finished_opers);
1108 //    printf("finished_opers %p line:%d\n", t->finished_opers,__LINE__);
1109
1110     /* then we do an fsync to get the timing for any future operations
1111      * right, and check to see if any of these need to get restarted
1112      */
1113     oper = t->finished_opers;
1114 //    printf("oper %p line:%d\n", oper,__LINE__);
1115     while (oper) {
1116         if (fsync_stages)
1117             fsync(oper->fd);
1118         t->stage_mb_trans += oper_mb_trans(oper);
1119         if (restart_oper(oper)) {
1120             oper_list_del(oper, &t->finished_opers);
1121             oper_list_add(oper, &t->active_opers);
1122             oper = t->finished_opers;
1123             continue;
1124         }
1125         oper = oper->next;
1126         if (oper == t->finished_opers)
1127             break;
1128     } 
1129
1130     if (t->stage_mb_trans && t->num_files > 0) {
1131 //        printf("num_files %d line:%d\n", t->num_files,__LINE__);
1132         double seconds = time_since(&stage_time);
1133         fprintf(stderr, "thread %d %s totals (%.2f MB/s) %.2f MB in %.2fs\n", 
1134                 t - global_thread_info, this_stage, t->stage_mb_trans/seconds, 
1135                 t->stage_mb_trans, seconds);
1136     }
1137
1138     if (num_threads > 1) {
1139 //        printf("num_threads %d line:%d\n", num_threads,__LINE__);
1140         pthread_mutex_lock(&stage_mutex);
1141         threads_ending++;
1142         if (threads_ending == num_threads) {
1143             threads_starting = 0;
1144             pthread_cond_broadcast(&stage_cond);
1145             global_thread_throughput(t, this_stage);
1146         }
1147 //        printf("threads_ending %d line:%d\n", threads_ending,__LINE__);
1148         while (threads_ending != num_threads)
1149             pthread_cond_wait(&stage_cond, &stage_mutex);
1150         pthread_mutex_unlock(&stage_mutex);
1151     }
1152
1153     /* someone got restarted, go back to the beginning */
1154     if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1155         iteration++;
1156         goto restart;
1157     }
1158
1159     /* finally, free all the ram */
1160 //    printf("finished_opers %p line:%d\n", t->finished_opers,__LINE__);
1161     while (t->finished_opers) {
1162         oper = t->finished_opers;
1163         oper_list_del(oper, &t->finished_opers);
1164         status = finish_oper(t, oper);
1165     }
1166
1167     if (t->num_global_pending) {
1168         fprintf(stderr, "global num pending is %d\n", t->num_global_pending);
1169     }
1170     io_queue_release(t->io_ctx);
1171
1172     return status;
1173 }
1174
1175 typedef void * (*start_routine)(void *);
1176 int run_workers(struct thread_info *t, int num_threads)
1177 {
1178     int ret;
1179     int thread_ret;
1180     int i;
1181
1182 //    printf("%s num_threads %d line:%d\n", __FUNCTION__,num_threads,__LINE__);
1183     for(i = 0 ; i < num_threads ; i++) {
1184         ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i);
1185         if (ret) {
1186             perror("pthread_create");
1187             exit(1);
1188         }
1189     }
1190     for(i = 0 ; i < num_threads ; i++) {
1191         ret = pthread_join(t[i].tid, (void *)&thread_ret);
1192         if (ret) {
1193             perror("pthread_join");
1194             exit(1);
1195         }
1196     }
1197     return 0;
1198 }
1199
1200 off_t parse_size(char *size_arg, off_t mult) {
1201     char c;
1202     int num;
1203     off_t ret;
1204     c = size_arg[strlen(size_arg) - 1];
1205     if (c > '9') {
1206         size_arg[strlen(size_arg) - 1] = '\0';
1207     }
1208     num = atoi(size_arg);
1209     switch(c) {
1210     case 'g':
1211     case 'G':
1212         mult = 1024 * 1024 * 1024;
1213         break;
1214     case 'm':
1215     case 'M':
1216         mult = 1024 * 1024;
1217         break;
1218     case 'k':
1219     case 'K':
1220         mult = 1024;
1221         break;
1222     }
1223     ret = mult * num;
1224     return ret;
1225 }
1226
1227 void print_usage(void) {
1228     printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
1229     printf("                  [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
1230     printf("                  file1 [file2 ...]\n");
1231     printf("\t-a size in KB at which to align buffers\n");
1232     printf("\t-b max number of iocbs to give io_submit at once\n");
1233     printf("\t-c number of io contexts per file\n");
1234     printf("\t-C offset between contexts, default 2MB\n");
1235     printf("\t-s size in MB of the test file(s), default 1024MB\n");
1236     printf("\t-r record size in KB used for each io, default 64KB\n");
1237     printf("\t-d number of pending aio requests for each file, default 64\n");
1238     printf("\t-i number of ios per file sent before switching\n\t   to the next file, default 8\n");
1239     printf("\t-I total number of ayncs IOs the program will run, default is run until Cntl-C\n");
1240     printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
1241     printf("\t-S Use O_SYNC for writes\n");
1242     printf("\t-o add an operation to the list: write=0, read=1,\n"); 
1243     printf("\t   random write=2, random read=3.\n");
1244     printf("\t   repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
1245     printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
1246     printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
1247     printf("\t-n no fsyncs between write stage and read stage\n");
1248     printf("\t-l print io_submit latencies after each stage\n");
1249     printf("\t-t number of threads to run\n");
1250     printf("\t-v verification of bytes written\n");
1251     printf("\t-x turn off thread stonewalling\n");
1252     printf("\t-h this message\n");
1253     printf("\n\t   the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
1254     printf("\t   translate to 400KB, 400MB and 400GB\n");
1255     printf("version %s\n", PROG_VERSION);
1256 }
1257
1258 int main(int ac, char **av) 
1259 {
1260     int rwfd;
1261     int i;
1262     int j;
1263     int c;
1264
1265     off_t file_size = 1 * 1024 * 1024 * 1024;
1266     int first_stage = WRITE;
1267     struct io_oper *oper;
1268     int status = 0;
1269     int num_files = 0;
1270     int open_fds = 0;
1271     struct thread_info *t;
1272
1273     page_size_mask = getpagesize() - 1;
1274
1275     while(1) {
1276         c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lnhOSxv");
1277         if  (c < 0)
1278             break;
1279
1280         switch(c) {
1281         case 'a':
1282             page_size_mask = parse_size(optarg, 1024);
1283             page_size_mask--;
1284             break;
1285         case 'c':
1286             num_contexts = atoi(optarg);
1287             break;
1288         case 'C':
1289             context_offset = parse_size(optarg, 1024 * 1024);
1290         case 'b':
1291             max_io_submit = atoi(optarg);
1292             break;
1293         case 's':
1294             file_size = parse_size(optarg, 1024 * 1024);
1295             break;
1296         case 'd':
1297             depth = atoi(optarg);
1298             break;
1299         case 'r':
1300             rec_len = parse_size(optarg, 1024);
1301             break;
1302         case 'i':
1303             io_iter = atoi(optarg);
1304             break;
1305         case 'I':
1306           iterations = atoi(optarg);
1307         break;
1308         case 'n':
1309             fsync_stages = 0;
1310             break;
1311         case 'l':
1312             latency_stats = 1;
1313             break;
1314         case 'm':
1315             if (!strcmp(optarg, "shm")) {
1316                 fprintf(stderr, "using ipc shm\n");
1317                 use_shm = USE_SHM;
1318             } else if (!strcmp(optarg, "shmfs")) {
1319                 fprintf(stderr, "using /dev/shm for buffers\n");
1320                 use_shm = USE_SHMFS;
1321             }
1322             break;
1323         case 'o': 
1324             i = atoi(optarg);
1325             stages |= 1 << i;
1326             fprintf(stderr, "adding stage %s\n", stage_name(i));
1327             break;
1328         case 'O':
1329             o_direct = O_DIRECT;
1330             break;
1331         case 'S':
1332             o_sync = O_SYNC;
1333             break;
1334         case 't':
1335             num_threads = atoi(optarg);
1336             break;
1337         case 'x':
1338             stonewall = 0;
1339             break;
1340         case 'v':
1341             verify = 1;
1342             break;
1343         case 'h':
1344         default:
1345             print_usage();
1346             exit(1);
1347         }
1348     }
1349
1350     /* 
1351      * make sure we don't try to submit more ios than we have allocated
1352      * memory for
1353      */
1354     if (depth < io_iter) {
1355         io_iter = depth;
1356         fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1357     }
1358
1359     if (optind >= ac) {
1360         print_usage();
1361         exit(1);
1362     }
1363
1364     num_files = ac - optind;
1365
1366     if (num_threads > (num_files * num_contexts)) {
1367         num_threads = num_files * num_contexts;
1368         fprintf(stderr, "dropping thread count to the number of contexts %d\n", 
1369                 num_threads);
1370     }
1371
1372     t = malloc(num_threads * sizeof(*t));
1373     if (!t) {
1374         perror("malloc");
1375         exit(1);
1376     }
1377     global_thread_info = t;
1378
1379     /* by default, allow a huge number of iocbs to be sent towards
1380      * io_submit
1381      */
1382     if (!max_io_submit)
1383         max_io_submit = num_files * io_iter * num_contexts;
1384
1385     /*
1386      * make sure we don't try to submit more ios than max_io_submit allows 
1387      */
1388     if (max_io_submit < io_iter) {
1389         io_iter = max_io_submit;
1390         fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1391     }
1392
1393     if (!stages) {
1394         stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
1395     } else {
1396         for (i = 0 ; i < LAST_STAGE; i++) {
1397             if (stages & (1 << i)) {
1398                 first_stage = i;
1399                 fprintf(stderr, "starting with %s\n", stage_name(i));
1400                 break;
1401             }
1402         }
1403     }
1404
1405     if (file_size < num_contexts * context_offset) {
1406         fprintf(stderr, "file size %Lu too small for %d contexts\n", 
1407                 file_size, num_contexts);
1408         exit(1);
1409     }
1410
1411     fprintf(stderr, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n", file_size / (1024 * 1024), rec_len / 1024, depth, io_iter);
1412     fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n", 
1413             max_io_submit, (page_size_mask + 1)/1024);
1414     fprintf(stderr, "threads %d files %d contexts %d context offset %LuMB verification %s\n", 
1415             num_threads, num_files, num_contexts, 
1416             context_offset / (1024 * 1024), verify ? "on" : "off");
1417     /* open all the files and do any required setup for them */
1418     for (i = optind ; i < ac ; i++) {
1419         int thread_index;
1420         for (j = 0 ; j < num_contexts ; j++) {
1421             thread_index = open_fds % num_threads;
1422             open_fds++;
1423 //          fprintf(stderr, "adding file %s thread %d\n", av[i], thread_index);
1424
1425             rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600);
1426             assert(rwfd != -1);
1427
1428             oper = create_oper(rwfd, first_stage, j * context_offset, 
1429                                file_size - j * context_offset, rec_len, 
1430                                depth, io_iter, av[i]);
1431             if (!oper) {
1432                 fprintf(stderr, "error in create_oper\n");
1433                 exit(-1);
1434             }
1435             oper_list_add(oper, &t[thread_index].active_opers);
1436             t[thread_index].num_files++;
1437         }
1438     }
1439     if (setup_shared_mem(num_threads, num_files * num_contexts, 
1440                          depth, rec_len, max_io_submit))
1441     {
1442         exit(1);
1443     }
1444     for (i = 0 ; i < num_threads ; i++) {
1445         if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit))
1446                 exit(1);
1447     }
1448     if (num_threads > 1){
1449         printf("Running multi thread version num_threads:%d\n", num_threads);
1450         run_workers(t, num_threads);
1451     }
1452     else {
1453         printf("Running single thread version \n");
1454         status = worker(t);
1455     }
1456
1457
1458     for (i = optind ; i < ac ; i++) {
1459         printf("Cleaning up file %s \n", av[i]);
1460         unlink(av[i]);
1461     }
1462
1463     if (status) {
1464     printf("non zero return %d \n", status);
1465     }
1466     else{
1467         printf("aio-stress Completed successfully %d \n", status);
1468     }
1469
1470     exit(0);
1471 }
1472