fsstress: remove duplicate COLLAPSE_RANGE flags
[xfstests-dev.git] / ltp / aio-stress.c
1 /*
2  * Copyright (c) 2004 SuSE, Inc.  All Rights Reserved.
3  * 
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of version 2 of the GNU General Public License as
6  * published by the Free Software Foundation.
7  * 
8  * This program is distributed in the hope that it would be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11  * 
12  * Further, this software is distributed without any warranty that it is
13  * free of the rightful claim of any third person regarding infringement
14  * or the like.  Any license provided herein, whether implied or
15  * otherwise, applies only to this software file.  Patent licenses, if
16  * any, provided herein do not apply to combinations of this program with
17  * other software, or any other product whatsoever.
18  * 
19  * You should have received a copy of the GNU General Public License along
20  * with this program; if not, write the Free Software Foundation, Inc., 59
21  * Temple Place - Suite 330, Boston MA 02111-1307, USA.
22  * 
23  *
24  * aio-stress
25  *
26  * will open or create each file on the command line, and start a series
27  * of aio to it.  
28  *
29  * aio is done in a rotating loop.  first file1 gets 8 requests, then
30  * file2, then file3 etc.  As each file finishes writing, it is switched
31  * to reads
32  *
33  * io buffers are aligned in case you want to do raw io
34  *
35  * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c
36  *
37  * run aio-stress -h to see the options
38  *
39  * Please mail Chris Mason (mason@suse.com) with bug reports or patches
40  */
41 #define _FILE_OFFSET_BITS 64
42 #define PROG_VERSION "0.21"
43 #define NEW_GETEVENTS
44
45 #include <stdio.h>
46 #include <errno.h>
47 #include <assert.h>
48 #include <stdlib.h>
49
50 #include <sys/types.h>
51 #include <sys/stat.h>
52 #include <fcntl.h>
53 #include <unistd.h>
54 #include <sys/time.h>
55 #include <libaio.h>
56 #include <sys/ipc.h>
57 #include <sys/shm.h>
58 #include <sys/mman.h>
59 #include <string.h>
60 #include <pthread.h>
61
62 #define IO_FREE 0
63 #define IO_PENDING 1
64 #define RUN_FOREVER -1
65
66 #ifndef O_DIRECT
67 #define O_DIRECT         040000 /* direct disk access hint */
68 #endif
69
70 enum {
71     WRITE,
72     READ,
73     RWRITE,
74     RREAD,
75     LAST_STAGE,
76 };
77
78 #define USE_MALLOC 0
79 #define USE_SHM 1
80 #define USE_SHMFS 2
81
82 /* 
83  * various globals, these are effectively read only by the time the threads
84  * are started
85  */
86 long stages = 0;
87 unsigned long page_size_mask;
88 int o_direct = 0;
89 int o_sync = 0;
90 int latency_stats = 0;
91 int completion_latency_stats = 0;
92 int io_iter = 8;
93 int iterations = RUN_FOREVER;
94 int max_io_submit = 0;
95 long rec_len = 64 * 1024;
96 int depth = 64;
97 int num_threads = 1;
98 int num_contexts = 1;
99 off_t context_offset = 2 * 1024 * 1024;
100 int fsync_stages = 1;
101 int use_shm = 0;
102 int shm_id;
103 char *unaligned_buffer = NULL;
104 char *aligned_buffer = NULL;
105 int padded_reclen = 0;
106 int stonewall = 1;
107 int verify = 0;
108 char *verify_buf = NULL;
109 int unlink_files = 0;
110
111 struct io_unit;
112 struct thread_info;
113
114 /* pthread mutexes and other globals for keeping the threads in sync */
115 pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
116 pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
117 int threads_ending = 0;
118 int threads_starting = 0;
119 struct timeval global_stage_start_time;
120 struct thread_info *global_thread_info;
121
122 /* 
123  * latencies during io_submit are measured, these are the 
124  * granularities for deviations 
125  */
126 #define DEVIATIONS 6
127 int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };
128 struct io_latency {
129     double max;
130     double min;
131     double total_io;
132     double total_lat;
133     double deviations[DEVIATIONS]; 
134 };
135
136 /* container for a series of operations to a file */
137 struct io_oper {
138     /* already open file descriptor, valid for whatever operation you want */
139     int fd;
140
141     /* starting byte of the operation */
142     off_t start;
143
144     /* ending byte of the operation */
145     off_t end;
146
147     /* size of the read/write buffer */
148     int reclen;
149
150     /* max number of pending requests before a wait is triggered */
151     int depth;
152
153     /* current number of pending requests */
154     int num_pending;
155
156     /* last error, zero if there were none */
157     int last_err;
158
159     /* total number of errors hit. */
160     int num_err;
161
162     /* read,write, random, etc */
163     int rw;
164
165     /* number of ios that will get sent to aio */
166     int total_ios;
167
168     /* number of ios we've already sent */
169     int started_ios;
170
171     /* last offset used in an io operation */
172     off_t last_offset;
173
174     /* stonewalled = 1 when we got cut off before submitting all our ios */
175     int stonewalled;
176
177     /* list management */
178     struct io_oper *next;
179     struct io_oper *prev;
180
181     struct timeval start_time;
182
183     char *file_name;
184 };
185
186 /* a single io, and all the tracking needed for it */
187 struct io_unit {
188     /* note, iocb must go first! */
189     struct iocb iocb;
190
191     /* pointer to parent io operation struct */
192     struct io_oper *io_oper;
193
194     /* aligned buffer */
195     char *buf;
196
197     /* size of the aligned buffer (record size) */
198     int buf_size;
199
200     /* state of this io unit (free, pending, done) */
201     int busy;
202
203     /* result of last operation */
204     long res;
205
206     struct io_unit *next;
207
208     struct timeval io_start_time;               /* time of io_submit */
209 };
210
211 struct thread_info {
212     io_context_t io_ctx;
213     pthread_t tid;
214
215     /* allocated array of io_unit structs */
216     struct io_unit *ios;
217
218     /* list of io units available for io */
219     struct io_unit *free_ious;
220
221     /* number of io units in the ios array */
222     int num_global_ios;
223
224     /* number of io units in flight */
225     int num_global_pending;
226
227     /* preallocated array of iocb pointers, only used in run_active */
228     struct iocb **iocbs;
229
230     /* preallocated array of events */
231     struct io_event *events;
232
233     /* size of the events array */
234     int num_global_events;
235
236     /* latency stats for io_submit */
237     struct io_latency io_submit_latency;
238
239     /* list of operations still in progress, and of those finished */
240     struct io_oper *active_opers;
241     struct io_oper *finished_opers;
242
243     /* number of files this thread is doing io on */
244     int num_files;
245
246     /* how much io this thread did in the last stage */
247     double stage_mb_trans;
248
249     /* latency completion stats i/o time from io_submit until io_getevents */
250     struct io_latency io_completion_latency;
251 };
252
253 /*
254  * return seconds between start_tv and stop_tv in double precision
255  */
256 static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
257 {
258     double sec, usec;
259     double ret;
260     sec = stop_tv->tv_sec - start_tv->tv_sec;
261     usec = stop_tv->tv_usec - start_tv->tv_usec;
262     if (sec > 0 && usec < 0) {
263         sec--;
264         usec += 1000000;
265     } 
266     ret = sec + usec / (double)1000000;
267     if (ret < 0)
268         ret = 0;
269     return ret;
270 }
271
272 /*
273  * return seconds between start_tv and now in double precision
274  */
275 static double time_since_now(struct timeval *start_tv)
276 {
277     struct timeval stop_time;
278     gettimeofday(&stop_time, NULL);
279     return time_since(start_tv, &stop_time);
280 }
281
282 /*
283  * Add latency info to latency struct 
284  */
285 static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv,
286                         struct io_latency *lat)
287 {
288     double delta;
289     int i;
290     delta = time_since(start_tv, stop_tv);
291     delta = delta * 1000;
292
293     if (delta > lat->max)
294         lat->max = delta;
295     if (!lat->min || delta < lat->min)
296         lat->min = delta;
297     lat->total_io++;
298     lat->total_lat += delta;
299     for (i = 0 ; i < DEVIATIONS ; i++) {
300         if (delta < deviations[i]) {
301             lat->deviations[i]++;
302             break;
303         }
304     }
305 }
306
307 static void oper_list_add(struct io_oper *oper, struct io_oper **list)
308 {
309     if (!*list) {
310         *list = oper;
311         oper->prev = oper->next = oper;
312         return;
313     }
314     oper->prev = (*list)->prev;
315     oper->next = *list;
316     (*list)->prev->next = oper;
317     (*list)->prev = oper;
318     return;
319 }
320
321 static void oper_list_del(struct io_oper *oper, struct io_oper **list)
322 {
323     if ((*list)->next == (*list)->prev && *list == (*list)->next) {
324         *list = NULL;
325         return;
326     }
327     oper->prev->next = oper->next;
328     oper->next->prev = oper->prev;
329     if (*list == oper)
330         *list = oper->next;
331 }
332
333 /* worker func to check error fields in the io unit */
334 static int check_finished_io(struct io_unit *io) {
335     int i;
336     if (io->res != io->buf_size) {
337
338                  struct stat s;
339                  fstat(io->io_oper->fd, &s);
340   
341                  /*
342                   * If file size is large enough for the read, then this short
343                   * read is an error.
344                   */
345                  if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
346                      s.st_size > (io->iocb.u.c.offset + io->res)) {
347   
348                                  fprintf(stderr, "io err %lu (%s) op %d, off %Lu size %d\n",
349                                                  io->res, strerror(-io->res), io->iocb.aio_lio_opcode,
350                                                  io->iocb.u.c.offset, io->buf_size);
351                                  io->io_oper->last_err = io->res;
352                                  io->io_oper->num_err++;
353                                  return -1;
354                  }
355     }
356     if (verify && io->io_oper->rw == READ) {
357         if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
358             fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n", 
359                     io->io_oper->file_name, io->iocb.u.c.offset);
360             
361             for (i = 0 ; i < io->io_oper->reclen ; i++) {
362                 if (io->buf[i] != verify_buf[i]) {
363                     fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]);
364                 }
365             }
366             fprintf(stderr, "\n");
367         }
368
369     }
370     return 0;
371 }
372
373 /* worker func to check the busy bits and get an io unit ready for use */
374 static int grab_iou(struct io_unit *io, struct io_oper *oper) {
375     if (io->busy == IO_PENDING)
376         return -1;
377
378     io->busy = IO_PENDING;
379     io->res = 0;
380     io->io_oper = oper;
381     return 0;
382 }
383
384 char *stage_name(int rw) {
385     switch(rw) {
386     case WRITE:
387         return "write";
388     case READ:
389         return "read";
390     case RWRITE:
391         return "random write";
392     case RREAD:
393         return "random read";
394     }
395     return "unknown";
396 }
397
398 static inline double oper_mb_trans(struct io_oper *oper) {
399     return ((double)oper->started_ios * (double)oper->reclen) /
400                 (double)(1024 * 1024);
401 }
402
403 static void print_time(struct io_oper *oper) {
404     double runtime;
405     double tput;
406     double mb;
407
408     runtime = time_since_now(&oper->start_time); 
409     mb = oper_mb_trans(oper);
410     tput = mb / runtime;
411     fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n", 
412             stage_name(oper->rw), oper->file_name, tput, mb, runtime);
413 }
414
415 static void print_lat(char *str, struct io_latency *lat) {
416     double avg = lat->total_lat / lat->total_io;
417     int i;
418     double total_counted = 0;
419     fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t", 
420             str, lat->min, avg, lat->max);
421
422     for (i = 0 ; i < DEVIATIONS ; i++) {
423         fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]);
424         total_counted += lat->deviations[i];
425     }
426     if (total_counted && lat->total_io - total_counted)
427         fprintf(stderr, " < %.0f", lat->total_io - total_counted);
428     fprintf(stderr, "\n");
429     memset(lat, 0, sizeof(*lat));
430 }
431
432 static void print_latency(struct thread_info *t)
433 {
434     struct io_latency *lat = &t->io_submit_latency;
435     print_lat("latency", lat);
436 }
437
438 static void print_completion_latency(struct thread_info *t)
439 {
440     struct io_latency *lat = &t->io_completion_latency;
441     print_lat("completion latency", lat);
442 }
443
444 /*
445  * updates the fields in the io operation struct that belongs to this
446  * io unit, and make the io unit reusable again
447  */
448 void finish_io(struct thread_info *t, struct io_unit *io, long result,
449                 struct timeval *tv_now) {
450     struct io_oper *oper = io->io_oper;
451
452     calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
453     io->res = result;
454     io->busy = IO_FREE;
455     io->next = t->free_ious;
456     t->free_ious = io;
457     oper->num_pending--;
458     t->num_global_pending--;
459     check_finished_io(io);
460     if (oper->num_pending == 0 && 
461        (oper->started_ios == oper->total_ios || oper->stonewalled)) 
462     {
463         print_time(oper);
464     } 
465 }
466
467 int read_some_events(struct thread_info *t) {
468     struct io_unit *event_io;
469     struct io_event *event;
470     int nr;
471     int i; 
472     int min_nr = io_iter;
473     struct timeval stop_time;
474
475     if (t->num_global_pending < io_iter)
476         min_nr = t->num_global_pending;
477
478 #ifdef NEW_GETEVENTS
479     nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL);
480 #else
481     nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
482 #endif
483     if (nr <= 0)
484         return nr;
485
486     gettimeofday(&stop_time, NULL);
487     for (i = 0 ; i < nr ; i++) {
488         event = t->events + i;
489         event_io = (struct io_unit *)((unsigned long)event->obj); 
490         finish_io(t, event_io, event->res, &stop_time);
491     }
492     return nr;
493 }
494
495 /* 
496  * finds a free io unit, waiting for pending requests if required.  returns
497  * null if none could be found
498  */
499 static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
500 {
501     struct io_unit *event_io;
502     int nr;
503
504 retry:
505     if (t->free_ious) {
506         event_io = t->free_ious;
507         t->free_ious = t->free_ious->next;
508         if (grab_iou(event_io, oper)) {
509             fprintf(stderr, "io unit on free list but not free\n");
510             abort();
511         }
512         return event_io;
513     }
514     nr = read_some_events(t);
515     if (nr > 0)
516         goto retry;
517     else
518         fprintf(stderr, "no free ious after read_some_events\n");
519     return NULL;
520 }
521
522 /*
523  * wait for all pending requests for this io operation to finish
524  */
525 static int io_oper_wait(struct thread_info *t, struct io_oper *oper) {
526     struct io_event event;
527     struct io_unit *event_io;
528
529     if (oper == NULL) {
530         return 0;
531     }
532
533     if (oper->num_pending == 0)
534         goto done;
535
536     /* this func is not speed sensitive, no need to go wild reading
537      * more than one event at a time
538      */
539 #ifdef NEW_GETEVENTS
540     while(io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
541 #else
542     while(io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
543 #endif
544         struct timeval tv_now;
545         event_io = (struct io_unit *)((unsigned long)event.obj); 
546
547         gettimeofday(&tv_now, NULL);
548         finish_io(t, event_io, event.res, &tv_now);
549
550         if (oper->num_pending == 0)
551             break;
552     }
553 done:
554     if (oper->num_err) {
555         fprintf(stderr, "%u errors on oper, last %u\n", 
556                 oper->num_err, oper->last_err);
557     }
558     return 0;
559 }
560
561 off_t random_byte_offset(struct io_oper *oper) {
562     off_t num;
563     off_t rand_byte = oper->start;
564     off_t range;
565     off_t offset = 1;
566
567     range = (oper->end - oper->start) / (1024 * 1024);
568     if ((page_size_mask+1) > (1024 * 1024))
569         offset = (page_size_mask+1) / (1024 * 1024);
570     if (range < offset)
571         range = 0;
572     else
573         range -= offset;
574
575     /* find a random mb offset */
576     num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 ));
577     rand_byte += num * 1024 * 1024;
578     
579     /* find a random byte offset */
580     num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));
581
582     /* page align */
583     num = (num + page_size_mask) & ~page_size_mask;
584     rand_byte += num;
585
586     if (rand_byte + oper->reclen > oper->end) {
587         rand_byte -= oper->reclen;
588     }
589     return rand_byte;
590 }
591
592 /* 
593  * build an aio iocb for an operation, based on oper->rw and the
594  * last offset used.  This finds the struct io_unit that will be attached
595  * to the iocb, and things are ready for submission to aio after this
596  * is called.
597  *
598  * returns null on error
599  */
600 static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
601 {
602     struct io_unit *io;
603     off_t rand_byte;
604
605     io = find_iou(t, oper);
606     if (!io) {
607         fprintf(stderr, "unable to find io unit\n");
608         return NULL;
609     }
610
611     switch(oper->rw) {
612     case WRITE:
613         io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen, 
614                        oper->last_offset);
615         oper->last_offset += oper->reclen;
616         break;
617     case READ:
618         io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen, 
619                       oper->last_offset);
620         oper->last_offset += oper->reclen;
621         break;
622     case RREAD:
623         rand_byte = random_byte_offset(oper);
624         oper->last_offset = rand_byte;
625         io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen, 
626                       rand_byte);
627         break;
628     case RWRITE:
629         rand_byte = random_byte_offset(oper);
630         oper->last_offset = rand_byte;
631         io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen, 
632                       rand_byte);
633         
634         break;
635     }
636
637     return io;
638 }
639
640 /* 
641  * wait for any pending requests, and then free all ram associated with
642  * an operation.  returns the last error the operation hit (zero means none)
643  */
644 static int
645 finish_oper(struct thread_info *t, struct io_oper *oper)
646 {
647     unsigned long last_err;
648
649     io_oper_wait(t, oper);
650     last_err = oper->last_err;
651     if (oper->num_pending > 0) {
652         fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
653     }
654     close(oper->fd);
655     free(oper);
656     return last_err;
657 }
658
659 /* 
660  * allocates an io operation and fills in all the fields.  returns
661  * null on error
662  */
663 static struct io_oper * 
664 create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth,
665             int iter, char *file_name)
666 {
667     struct io_oper *oper;
668
669     oper = malloc (sizeof(*oper));
670     if (!oper) {
671         fprintf(stderr, "unable to allocate io oper\n");
672         return NULL;
673     }
674     memset(oper, 0, sizeof(*oper));
675
676     oper->depth = depth;
677     oper->start = start;
678     oper->end = end;
679     oper->last_offset = oper->start;
680     oper->fd = fd;
681     oper->reclen = reclen;
682     oper->rw = rw;
683     oper->total_ios = (oper->end - oper->start) / oper->reclen;
684     oper->file_name = file_name;
685
686     return oper;
687 }
688
689 /*
690  * does setup on num_ios worth of iocbs, but does not actually
691  * start any io
692  */
693 int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios, 
694                struct iocb **my_iocbs) 
695 {
696     int i;
697     struct io_unit *io;
698
699     if (oper->started_ios == 0)
700         gettimeofday(&oper->start_time, NULL);
701
702     if (num_ios == 0)
703         num_ios = oper->total_ios;
704
705     if ((oper->started_ios + num_ios) > oper->total_ios)
706         num_ios = oper->total_ios - oper->started_ios;   
707
708     for( i = 0 ; i < num_ios ; i++) {
709         io = build_iocb(t, oper);
710         if (!io) {
711             return -1;    
712         }
713         my_iocbs[i] = &io->iocb;
714     }
715     return num_ios;
716 }
717
718 /*
719  * runs through the iocbs in the array provided and updates
720  * counters in the associated oper struct
721  */
722 static void update_iou_counters(struct iocb **my_iocbs, int nr,
723         struct timeval *tv_now) 
724 {
725     struct io_unit *io;
726     int i;
727     for (i = 0 ; i < nr ; i++) {
728         io = (struct io_unit *)(my_iocbs[i]);
729         io->io_oper->num_pending++;
730         io->io_oper->started_ios++;
731         io->io_start_time = *tv_now;    /* set time of io_submit */
732     }
733 }
734
735 /* starts some io for a given file, returns zero if all went well */
736 int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs) 
737 {
738     int ret;
739     struct timeval start_time;
740     struct timeval stop_time;
741
742 resubmit:
743     gettimeofday(&start_time, NULL);
744     ret = io_submit(t->io_ctx, num_ios, my_iocbs);
745     gettimeofday(&stop_time, NULL);
746     calc_latency(&start_time, &stop_time, &t->io_submit_latency);
747
748     if (ret != num_ios) {
749         /* some ios got through */
750         if (ret > 0) {
751             update_iou_counters(my_iocbs, ret, &stop_time);
752             my_iocbs += ret;
753             t->num_global_pending += ret;
754             num_ios -= ret;
755         }
756         /* 
757          * we've used all the requests allocated in aio_init, wait and
758          * retry
759          */
760         if (ret > 0 || ret == -EAGAIN) {
761             int old_ret = ret;
762             if ((ret = read_some_events(t) > 0)) {
763                 goto resubmit;
764             } else {
765                 fprintf(stderr, "ret was %d and now is %d\n", ret, old_ret);
766                 abort();
767             }
768         }
769
770         fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret));
771         return -1;
772     }
773     update_iou_counters(my_iocbs, ret, &stop_time);
774     t->num_global_pending += ret;
775     return 0;
776 }
777
778 /* 
779  * changes oper->rw to the next in a command sequence, or returns zero
780  * to say this operation is really, completely done for
781  */
782 static int restart_oper(struct io_oper *oper) {
783     int new_rw  = 0;
784     if (oper->last_err)
785         return 0;
786
787     /* this switch falls through */
788     switch(oper->rw) {
789     case WRITE:
790         if (stages & (1 << READ))
791             new_rw = READ;
792     case READ:
793         if (!new_rw && stages & (1 << RWRITE))
794             new_rw = RWRITE;
795     case RWRITE:
796         if (!new_rw && stages & (1 << RREAD))
797             new_rw = RREAD;
798     }
799
800     if (new_rw) {
801         oper->started_ios = 0;
802         oper->last_offset = oper->start;
803         oper->stonewalled = 0;
804
805         /* 
806          * we're restarting an operation with pending requests, so the
807          * timing info won't be printed by finish_io.  Printing it here
808          */
809         if (oper->num_pending)
810             print_time(oper);
811
812         oper->rw = new_rw;
813         return 1;
814     } 
815     return 0;
816 }
817
818 static int oper_runnable(struct io_oper *oper) {
819     struct stat buf;
820     int ret;
821
822     /* first context is always runnable, if started_ios > 0, no need to
823      * redo the calculations
824      */
825     if (oper->started_ios || oper->start == 0)
826         return 1;
827     /*
828      * only the sequential phases force delays in starting */
829     if (oper->rw >= RWRITE)
830         return 1;
831     ret = fstat(oper->fd, &buf);
832     if (ret < 0) {
833         perror("fstat");
834         exit(1);
835     }
836     if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
837         return 0;
838     return 1;
839 }
840
841 /*
842  * runs through all the io operations on the active list, and starts
843  * a chunk of io on each.  If any io operations are completely finished,
844  * it either switches them to the next stage or puts them on the 
845  * finished list.
846  *
847  * this function stops after max_io_submit iocbs are sent down the 
848  * pipe, even if it has not yet touched all the operations on the 
849  * active list.  Any operations that have finished are moved onto
850  * the finished_opers list.
851  */
852 static int run_active_list(struct thread_info *t,
853                          int io_iter,
854                          int max_io_submit)
855 {
856     struct io_oper *oper;
857     struct io_oper *built_opers = NULL;
858     struct iocb **my_iocbs = t->iocbs;
859     int ret = 0;
860     int num_built = 0;
861
862     oper = t->active_opers;
863     while(oper) {
864         if (!oper_runnable(oper)) {
865             oper = oper->next;
866             if (oper == t->active_opers)
867                 break;
868             continue;
869         }
870         ret = build_oper(t, oper, io_iter, my_iocbs);
871         if (ret >= 0) {
872             my_iocbs += ret;
873             num_built += ret;
874             oper_list_del(oper, &t->active_opers);
875             oper_list_add(oper, &built_opers);
876             oper = t->active_opers;
877             if (num_built + io_iter > max_io_submit)
878                 break;
879         } else
880             break;
881     }
882     if (num_built) {
883         ret = run_built(t, num_built, t->iocbs);
884         if (ret < 0) {
885             fprintf(stderr, "error %d on run_built\n", ret);
886             exit(1);
887         }
888         while(built_opers) {
889             oper = built_opers;
890             oper_list_del(oper, &built_opers);
891             oper_list_add(oper, &t->active_opers);
892             if (oper->started_ios == oper->total_ios) {
893                 oper_list_del(oper, &t->active_opers);
894                 oper_list_add(oper, &t->finished_opers);
895             }
896         }
897     }
898     return 0;
899 }
900
901 void drop_shm() {
902     int ret;
903     struct shmid_ds ds;
904     if (use_shm != USE_SHM)
905         return;
906
907     ret = shmctl(shm_id, IPC_RMID, &ds);
908     if (ret) {
909         perror("shmctl IPC_RMID");
910     }
911 }
912
913 void aio_setup(io_context_t *io_ctx, int n)
914 {
915     int res = io_queue_init(n, io_ctx);
916     if (res != 0) {
917         fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
918                 n, res, strerror(-res));
919         exit(3);
920     }
921 }
922
923 /*
924  * allocate io operation and event arrays for a given thread
925  */
926 int setup_ious(struct thread_info *t, 
927               int num_files, int depth, 
928               int reclen, int max_io_submit) {
929     int i;
930     size_t bytes = num_files * depth * sizeof(*t->ios);
931
932     t->ios = malloc(bytes);
933     if (!t->ios) {
934         fprintf(stderr, "unable to allocate io units\n");
935         return -1;
936     }
937     memset(t->ios, 0, bytes);
938
939     for (i = 0 ; i < depth * num_files; i++) {
940         t->ios[i].buf = aligned_buffer;
941         aligned_buffer += padded_reclen;
942         t->ios[i].buf_size = reclen;
943         if (verify)
944             memset(t->ios[i].buf, 'b', reclen);
945         else
946             memset(t->ios[i].buf, 0, reclen);
947         t->ios[i].next = t->free_ious;
948         t->free_ious = t->ios + i;
949     }
950     if (verify) {
951         verify_buf = aligned_buffer;
952         memset(verify_buf, 'b', reclen);
953     }
954
955     t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
956     if (!t->iocbs) {
957         fprintf(stderr, "unable to allocate iocbs\n");
958         goto free_buffers;
959     }
960
961     memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));
962
963     t->events = malloc(sizeof(struct io_event) * depth * num_files);
964     if (!t->events) {
965         fprintf(stderr, "unable to allocate ram for events\n");
966         goto free_buffers;
967     }
968     memset(t->events, 0, num_files * sizeof(struct io_event)*depth);
969
970     t->num_global_ios = num_files * depth;
971     t->num_global_events = t->num_global_ios;
972     return 0;
973
974 free_buffers:
975     if (t->ios)
976         free(t->ios);
977     if (t->iocbs)
978         free(t->iocbs);  
979     if (t->events)
980         free(t->events);
981     return -1;
982 }
983
984 /*
985  * The buffers used for file data are allocated as a single big
986  * malloc, and then each thread and operation takes a piece and uses
987  * that for file data.  This lets us do a large shm or bigpages alloc
988  * and without trying to find a special place in each thread to map the
989  * buffers to
990  */
991 int setup_shared_mem(int num_threads, int num_files, int depth, 
992                      int reclen, int max_io_submit) 
993 {
994     char *p = NULL;
995     size_t total_ram;
996     
997     padded_reclen = (reclen + page_size_mask) / (page_size_mask+1);
998     padded_reclen = padded_reclen * (page_size_mask+1);
999     total_ram = num_files * depth * padded_reclen + num_threads;
1000     if (verify)
1001         total_ram += padded_reclen;
1002
1003     if (use_shm == USE_MALLOC) {
1004         p = malloc(total_ram + page_size_mask);
1005     } else if (use_shm == USE_SHM) {
1006         shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
1007         if (shm_id < 0) {
1008             perror("shmget");
1009             drop_shm();
1010             goto free_buffers;
1011         }
1012         p = shmat(shm_id, (char *)0x50000000, 0);
1013         if ((long)p == -1) {
1014             perror("shmat");
1015             goto free_buffers;
1016         }
1017         /* won't really be dropped until we shmdt */
1018         drop_shm();
1019     } else if (use_shm == USE_SHMFS) {
1020         char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */    
1021         int fd;
1022
1023         strcpy(mmap_name, "/dev/shm/XXXXXX");
1024         fd = mkstemp(mmap_name);
1025         if (fd < 0) {
1026             perror("mkstemp");
1027             goto free_buffers;
1028         }
1029         unlink(mmap_name);
1030         ftruncate(fd, total_ram);
1031         shm_id = fd;
1032         p = mmap((char *)0x50000000, total_ram,
1033                  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1034
1035         if (p == MAP_FAILED) {
1036             perror("mmap");
1037             goto free_buffers;
1038         }
1039     }
1040     if (!p) {
1041         fprintf(stderr, "unable to allocate buffers\n");
1042         goto free_buffers;
1043     }
1044     unaligned_buffer = p;
1045     p = (char*)((intptr_t) (p + page_size_mask) & ~page_size_mask);
1046     aligned_buffer = p;
1047     return 0;
1048
1049 free_buffers:
1050     drop_shm();
1051     if (unaligned_buffer)
1052         free(unaligned_buffer);
1053     return -1;
1054 }
1055
1056 /*
1057  * runs through all the thread_info structs and calculates a combined
1058  * throughput
1059  */
1060 void global_thread_throughput(struct thread_info *t, char *this_stage) {
1061     int i;
1062     double runtime = time_since_now(&global_stage_start_time);
1063     double total_mb = 0;
1064     double min_trans = 0;
1065
1066     for (i = 0 ; i < num_threads ; i++) {
1067         total_mb += global_thread_info[i].stage_mb_trans;
1068         if (!min_trans || t->stage_mb_trans < min_trans)
1069             min_trans = t->stage_mb_trans;
1070     }
1071     if (total_mb) {
1072         fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
1073                 total_mb / runtime);
1074         fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
1075         if (stonewall)
1076             fprintf(stderr, " min transfer %.2fMB", min_trans);
1077         fprintf(stderr, "\n");
1078     }
1079 }
1080
1081
1082 /* this is the meat of the state machine.  There is a list of
1083  * active operations structs, and as each one finishes the required
1084  * io it is moved to a list of finished operations.  Once they have
1085  * all finished whatever stage they were in, they are given the chance
1086  * to restart and pick a different stage (read/write/random read etc)
1087  *
1088  * various timings are printed in between the stages, along with
1089  * thread synchronization if there are more than one threads.
1090  */
1091 int worker(struct thread_info *t)
1092 {
1093     struct io_oper *oper;
1094     char *this_stage = NULL;
1095     struct timeval stage_time;
1096     int status = 0;
1097     int iteration = 0;
1098     int cnt;
1099
1100     aio_setup(&t->io_ctx, 512);
1101
1102 restart:
1103     if (num_threads > 1) {
1104         pthread_mutex_lock(&stage_mutex);
1105         threads_starting++;
1106         if (threads_starting == num_threads) {
1107             threads_ending = 0;
1108             gettimeofday(&global_stage_start_time, NULL);
1109             pthread_cond_broadcast(&stage_cond);
1110         }
1111         while (threads_starting != num_threads)
1112             pthread_cond_wait(&stage_cond, &stage_mutex);
1113         pthread_mutex_unlock(&stage_mutex);
1114     }
1115     if (t->active_opers) {
1116         this_stage = stage_name(t->active_opers->rw);
1117         gettimeofday(&stage_time, NULL);
1118         t->stage_mb_trans = 0;
1119     }
1120
1121     cnt = 0;
1122     /* first we send everything through aio */
1123     while(t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1124         if (stonewall && threads_ending) {
1125             oper = t->active_opers;
1126             oper->stonewalled = 1;
1127             oper_list_del(oper, &t->active_opers);
1128             oper_list_add(oper, &t->finished_opers);
1129         } else {
1130             run_active_list(t, io_iter,  max_io_submit);
1131         }
1132         cnt++;
1133     }
1134     if (latency_stats)
1135         print_latency(t);
1136
1137     if (completion_latency_stats)
1138         print_completion_latency(t);
1139
1140     /* then we wait for all the operations to finish */
1141     oper = t->finished_opers;
1142     do {
1143         if (!oper)
1144                 break;
1145         io_oper_wait(t, oper);
1146         oper = oper->next;
1147     } while(oper != t->finished_opers);
1148
1149     /* then we do an fsync to get the timing for any future operations
1150      * right, and check to see if any of these need to get restarted
1151      */
1152     oper = t->finished_opers;
1153     while(oper) {
1154         if (fsync_stages)
1155             fsync(oper->fd);
1156         t->stage_mb_trans += oper_mb_trans(oper);
1157         if (restart_oper(oper)) {
1158             oper_list_del(oper, &t->finished_opers);
1159             oper_list_add(oper, &t->active_opers);
1160             oper = t->finished_opers;
1161             continue;
1162         }
1163         oper = oper->next;
1164         if (oper == t->finished_opers)
1165             break;
1166     } 
1167
1168     if (t->stage_mb_trans && t->num_files > 0) {
1169         double seconds = time_since_now(&stage_time);
1170         fprintf(stderr, "thread %llu %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
1171                 (unsigned long long)(t - global_thread_info), this_stage,
1172                 t->stage_mb_trans/seconds, t->stage_mb_trans, seconds);
1173     }
1174
1175     if (num_threads > 1) {
1176         pthread_mutex_lock(&stage_mutex);
1177         threads_ending++;
1178         if (threads_ending == num_threads) {
1179             threads_starting = 0;
1180             pthread_cond_broadcast(&stage_cond);
1181             global_thread_throughput(t, this_stage);
1182         }
1183         while(threads_ending != num_threads)
1184             pthread_cond_wait(&stage_cond, &stage_mutex);
1185         pthread_mutex_unlock(&stage_mutex);
1186     }
1187     
1188     /* someone got restarted, go back to the beginning */
1189     if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1190         iteration++;
1191         goto restart;
1192     }
1193
1194     /* finally, free all the ram */
1195     while(t->finished_opers) {
1196         oper = t->finished_opers;
1197         oper_list_del(oper, &t->finished_opers);
1198         status = finish_oper(t, oper);
1199     }
1200
1201     if (t->num_global_pending) {
1202         fprintf(stderr, "global num pending is %d\n", t->num_global_pending);
1203     }
1204     io_queue_release(t->io_ctx);
1205     
1206     return status;
1207 }
1208
1209 typedef void * (*start_routine)(void *);
1210 int run_workers(struct thread_info *t, int num_threads)
1211 {
1212     int ret;
1213     int i;
1214
1215     for(i = 0 ; i < num_threads ; i++) {
1216         ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i);
1217         if (ret) {
1218             perror("pthread_create");
1219             exit(1);
1220         }
1221     }
1222     for(i = 0 ; i < num_threads ; i++) {
1223         ret = pthread_join(t[i].tid, NULL);
1224         if (ret) {
1225             perror("pthread_join");
1226             exit(1);
1227         }
1228     }
1229     return 0;
1230 }
1231
1232 off_t parse_size(char *size_arg, off_t mult) {
1233     char c;
1234     int num;
1235     off_t ret;
1236     c = size_arg[strlen(size_arg) - 1];
1237     if (c > '9') {
1238         size_arg[strlen(size_arg) - 1] = '\0';
1239     }
1240     num = atoi(size_arg);
1241     switch(c) {
1242     case 'g':
1243     case 'G':
1244         mult = 1024 * 1024 * 1024;
1245         break;
1246     case 'm':
1247     case 'M':
1248         mult = 1024 * 1024;
1249         break;
1250     case 'k':
1251     case 'K':
1252         mult = 1024;
1253         break;
1254     case 'b':
1255     case 'B':
1256         mult = 1;
1257         break;
1258     }
1259     ret = mult * num;
1260     return ret;
1261 }
1262
1263 void print_usage(void) {
1264     printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
1265     printf("                  [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
1266     printf("                  file1 [file2 ...]\n");
1267     printf("\t-a size in KB at which to align buffers\n");
1268     printf("\t-b max number of iocbs to give io_submit at once\n");
1269     printf("\t-c number of io contexts per file\n");
1270     printf("\t-C offset between contexts, default 2MB\n");
1271     printf("\t-s size in MB of the test file(s), default 1024MB\n");
1272     printf("\t-r record size in KB used for each io, default 64KB\n");
1273     printf("\t-d number of pending aio requests for each file, default 64\n");
1274     printf("\t-i number of ios per file sent before switching\n\t   to the next file, default 8\n");
1275     printf("\t-I total number of ayncs IOs the program will run, default is run until Cntl-C\n");
1276     printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
1277     printf("\t-S Use O_SYNC for writes\n");
1278     printf("\t-o add an operation to the list: write=0, read=1,\n"); 
1279     printf("\t   random write=2, random read=3.\n");
1280     printf("\t   repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
1281     printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
1282     printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
1283     printf("\t-n no fsyncs between write stage and read stage\n");
1284     printf("\t-l print io_submit latencies after each stage\n");
1285     printf("\t-L print io completion latencies after each stage\n");
1286     printf("\t-t number of threads to run\n");
1287     printf("\t-u unlink files after completion\n");
1288     printf("\t-v verification of bytes written\n");
1289     printf("\t-x turn off thread stonewalling\n");
1290     printf("\t-h this message\n");
1291     printf("\n\t   the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
1292     printf("\t   translate to 400KB, 400MB and 400GB\n");
1293     printf("version %s\n", PROG_VERSION);
1294 }
1295
1296 int main(int ac, char **av) 
1297 {
1298     int rwfd;
1299     int i;
1300     int j;
1301     int c;
1302
1303     off_t file_size = 1 * 1024 * 1024 * 1024;
1304     int first_stage = WRITE;
1305     struct io_oper *oper;
1306     int status = 0;
1307     int num_files = 0;
1308     int open_fds = 0;
1309     struct thread_info *t;
1310
1311     page_size_mask = getpagesize() - 1;
1312
1313     while(1) {
1314         c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu");
1315         if  (c < 0)
1316             break;
1317
1318         switch(c) {
1319         case 'a':
1320             page_size_mask = parse_size(optarg, 1024);
1321             page_size_mask--;
1322             break;
1323         case 'c':
1324             num_contexts = atoi(optarg);
1325             break;
1326         case 'C':
1327             context_offset = parse_size(optarg, 1024 * 1024);
1328         case 'b':
1329             max_io_submit = atoi(optarg);
1330             break;
1331         case 's':
1332             file_size = parse_size(optarg, 1024 * 1024);
1333             break;
1334         case 'd':
1335             depth = atoi(optarg);
1336             break;
1337         case 'r':
1338             rec_len = parse_size(optarg, 1024);
1339             break;
1340         case 'i':
1341             io_iter = atoi(optarg);
1342             break;
1343         case 'I':
1344           iterations = atoi(optarg);
1345         break;
1346         case 'n':
1347             fsync_stages = 0;
1348             break;
1349         case 'l':
1350             latency_stats = 1;
1351             break;
1352         case 'L':
1353             completion_latency_stats = 1;
1354             break;
1355         case 'm':
1356             if (!strcmp(optarg, "shm")) {
1357                 fprintf(stderr, "using ipc shm\n");
1358                 use_shm = USE_SHM;
1359             } else if (!strcmp(optarg, "shmfs")) {
1360                 fprintf(stderr, "using /dev/shm for buffers\n");
1361                 use_shm = USE_SHMFS;
1362             }
1363             break;
1364         case 'o': 
1365             i = atoi(optarg);
1366             stages |= 1 << i;
1367             fprintf(stderr, "adding stage %s\n", stage_name(i));
1368             break;
1369         case 'O':
1370             o_direct = O_DIRECT;
1371             break;
1372         case 'S':
1373             o_sync = O_SYNC;
1374             break;
1375         case 't':
1376             num_threads = atoi(optarg);
1377             break;
1378         case 'x':
1379             stonewall = 0;
1380             break;
1381         case 'u':
1382             unlink_files = 1;
1383             break;
1384         case 'v':
1385             verify = 1;
1386             break;
1387         case 'h':
1388         default:
1389             print_usage();
1390             exit(1);
1391         }
1392     }
1393
1394     /* 
1395      * make sure we don't try to submit more ios than we have allocated
1396      * memory for
1397      */
1398     if (depth < io_iter) {
1399         io_iter = depth;
1400         fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1401     }
1402
1403     if (optind >= ac) {
1404         print_usage();
1405         exit(1);
1406     }
1407
1408     num_files = ac - optind;
1409
1410     if (num_threads > (num_files * num_contexts)) {
1411         num_threads = num_files * num_contexts;
1412         fprintf(stderr, "dropping thread count to the number of contexts %d\n", 
1413                 num_threads);
1414     }
1415
1416     t = calloc(num_threads, sizeof(*t));
1417     if (!t) {
1418         perror("calloc");
1419         exit(1);
1420     }
1421     global_thread_info = t;
1422
1423     /* by default, allow a huge number of iocbs to be sent towards
1424      * io_submit
1425      */
1426     if (!max_io_submit)
1427         max_io_submit = num_files * io_iter * num_contexts;
1428
1429     /*
1430      * make sure we don't try to submit more ios than max_io_submit allows 
1431      */
1432     if (max_io_submit < io_iter) {
1433         io_iter = max_io_submit;
1434         fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1435     }
1436
1437     if (!stages) {
1438         stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
1439     } else {
1440         for (i = 0 ; i < LAST_STAGE; i++) {
1441             if (stages & (1 << i)) {
1442                 first_stage = i;
1443                 fprintf(stderr, "starting with %s\n", stage_name(i));
1444                 break;
1445             }
1446         }
1447     }
1448
1449     if (file_size < num_contexts * context_offset) {
1450         fprintf(stderr, "file size %Lu too small for %d contexts\n", 
1451                 (unsigned long long)file_size, num_contexts);
1452         exit(1);
1453     }
1454
1455     fprintf(stderr, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n",
1456             (unsigned long long)file_size / (1024 * 1024),
1457             rec_len / 1024, depth, io_iter);
1458     fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n", 
1459             max_io_submit, (page_size_mask + 1)/1024);
1460     fprintf(stderr, "threads %d files %d contexts %d context offset %LuMB verification %s\n", 
1461             num_threads, num_files, num_contexts, 
1462             (unsigned long long)context_offset / (1024 * 1024),
1463             verify ? "on" : "off");
1464     /* open all the files and do any required setup for them */
1465     for (i = optind ; i < ac ; i++) {
1466         int thread_index;
1467         for (j = 0 ; j < num_contexts ; j++) {
1468             thread_index = open_fds % num_threads;
1469             open_fds++;
1470
1471             rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600);
1472             assert(rwfd != -1);
1473
1474             oper = create_oper(rwfd, first_stage, j * context_offset, 
1475                                file_size - j * context_offset, rec_len, 
1476                                depth, io_iter, av[i]);
1477             if (!oper) {
1478                 fprintf(stderr, "error in create_oper\n");
1479                 exit(-1);
1480             }
1481             oper_list_add(oper, &t[thread_index].active_opers);
1482             t[thread_index].num_files++;
1483         }
1484     }
1485     if (setup_shared_mem(num_threads, num_files * num_contexts, 
1486                          depth, rec_len, max_io_submit))
1487     {
1488         exit(1);
1489     }
1490     for (i = 0 ; i < num_threads ; i++) {
1491         if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit))
1492                 exit(1);
1493     }
1494     if (num_threads > 1){
1495         printf("Running multi thread version num_threads:%d\n", num_threads);
1496         run_workers(t, num_threads);
1497     } else {
1498         printf("Running single thread version \n");
1499         status = worker(t);
1500     }
1501     if (unlink_files) {
1502         for (i = optind ; i < ac ; i++) {
1503             printf("Cleaning up file %s \n", av[i]);
1504             unlink(av[i]);
1505         }
1506     }
1507
1508     if (status) {
1509         exit(1);
1510     }
1511     return status;
1512 }
1513
1514