6ebe082301570fc715a3027fad7a484572b498da
[xfstests-dev.git] / ltp / aio-stress.c
1 /*
2  */
3 #define _FILE_OFFSET_BITS 64
4 #define PROG_VERSION "0.18"
5 #define NEW_GETEVENTS
6
7 #include <stdio.h>
8 #include <errno.h>
9 #include <assert.h>
10 #include <stdlib.h>
11
12 #include <sys/types.h>
13 #include <sys/stat.h>
14 #include <fcntl.h>
15 #include <unistd.h>
16 #include <sys/time.h>
17 #include <libaio.h>
18 #include <sys/ipc.h>
19 #include <sys/shm.h>
20 #include <sys/mman.h>
21 #include <string.h>
22 #include <pthread.h>
23
24 #define IO_FREE 0
25 #define IO_PENDING 1
26 #define RUN_FOREVER -1
27
28 #ifndef O_DIRECT
29 #define O_DIRECT         040000 /* direct disk access hint */
30 #endif
31
32 enum {
33     WRITE,
34     READ,
35     RWRITE,
36     RREAD,
37     LAST_STAGE,
38 };
39
40 #define USE_MALLOC 0
41 #define USE_SHM 1
42 #define USE_SHMFS 2
43
44 /* 
45  * various globals, these are effectively read only by the time the threads
46  * are started
47  */
48 long stages = 0;
49 unsigned long page_size_mask;
50 int o_direct = 0;
51 int o_sync = 0;
52 int latency_stats = 0;
53 int io_iter = 8;
54 int iterations = RUN_FOREVER;
55 int max_io_submit = 0;
56 long rec_len = 64 * 1024;
57 int depth = 64;
58 int num_threads = 1;
59 int num_contexts = 1;
60 off_t context_offset = 2 * 1024 * 1024;
61 int fsync_stages = 1;
62 int use_shm = 0;
63 int shm_id;
64 char *unaligned_buffer = NULL;
65 char *aligned_buffer = NULL;
66 int padded_reclen = 0;
67 int stonewall = 1;
68 int verify = 0;
69 char *verify_buf = NULL;
70
71 struct io_unit;
72 struct thread_info;
73
74 /* pthread mutexes and other globals for keeping the threads in sync */
75 pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
76 pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
77 int threads_ending = 0;
78 int threads_starting = 0;
79 struct timeval global_stage_start_time;
80 struct thread_info *global_thread_info;
81
82 /* 
83  * latencies during io_submit are measured, these are the 
84  * granularities for deviations 
85  */
86 #define DEVIATIONS 6
87 int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };
88 struct io_latency {
89     double max;
90     double min;
91     double total_io;
92     double total_lat;
93     double deviations[DEVIATIONS]; 
94 };
95
96 /* container for a series of operations to a file */
97 struct io_oper {
98     /* already open file descriptor, valid for whatever operation you want */
99     int fd;
100
101     /* starting byte of the operation */
102     off_t start;
103
104     /* ending byte of the operation */
105     off_t end;
106
107     /* size of the read/write buffer */
108     int reclen;
109
110     /* max number of pending requests before a wait is triggered */
111     int depth;
112
113     /* current number of pending requests */
114     int num_pending;
115
116     /* last error, zero if there were none */
117     int last_err;
118
119     /* total number of errors hit. */
120     int num_err;
121
122     /* read,write, random, etc */
123     int rw;
124
125     /* number of ios that will get sent to aio */
126     int total_ios;
127
128     /* number of ios we've already sent */
129     int started_ios;
130
131     /* last offset used in an io operation */
132     off_t last_offset;
133
134     /* stonewalled = 1 when we got cut off before submitting all our ios */
135     int stonewalled;
136
137     /* list management */
138     struct io_oper *next;
139     struct io_oper *prev;
140
141     struct timeval start_time;
142
143     char *file_name;
144 };
145
146 /* a single io, and all the tracking needed for it */
147 struct io_unit {
148     /* note, iocb must go first! */
149     struct iocb iocb;
150
151     /* pointer to parent io operation struct */
152     struct io_oper *io_oper;
153
154     /* aligned buffer */
155     char *buf;
156
157     /* size of the aligned buffer (record size) */
158     int buf_size;
159
160     /* state of this io unit (free, pending, done) */
161     int busy;
162
163     /* result of last operation */
164     long res;
165
166     struct io_unit *next;
167 };
168
169 struct thread_info {
170     io_context_t io_ctx;
171     pthread_t tid;
172
173     /* allocated array of io_unit structs */
174     struct io_unit *ios;
175
176     /* list of io units available for io */
177     struct io_unit *free_ious;
178
179     /* number of io units in the ios array */
180     int num_global_ios;
181
182     /* number of io units in flight */
183     int num_global_pending;
184
185     /* preallocated array of iocb pointers, only used in run_active */
186     struct iocb **iocbs;
187
188     /* preallocated array of events */
189     struct io_event *events;
190
191     /* size of the events array */
192     int num_global_events;
193
194     /* latency stats for io_submit */
195     struct io_latency io_submit_latency;
196
197     /* list of operations still in progress, and of those finished */
198     struct io_oper *active_opers;
199     struct io_oper *finished_opers;
200
201     /* number of files this thread is doing io on */
202     int num_files;
203
204     /* how much io this thread did in the last stage */
205     double stage_mb_trans;
206 };
207
208 static double time_since(struct timeval *tv) {
209     double sec, usec;
210     double ret;
211     struct timeval stop;
212     gettimeofday(&stop, NULL);
213     sec = stop.tv_sec - tv->tv_sec;
214     usec = stop.tv_usec - tv->tv_usec;
215     if (sec > 0 && usec < 0) {
216         sec--;
217         usec += 1000000;
218     } 
219     ret = sec + usec / (double)1000000;
220     if (ret < 0)
221         ret = 0;
222     return ret;
223 }
224
225 static void calc_latency(struct timeval *tv, struct io_latency *lat)
226 {
227     double delta;
228     int i;
229     delta = time_since(tv);
230     delta = delta * 1000;
231
232     if (delta > lat->max)
233         lat->max = delta;
234     if (!lat->min || delta < lat->min)
235         lat->min = delta;
236     lat->total_io++;
237     lat->total_lat += delta;
238     for (i = 0 ; i < DEVIATIONS ; i++) {
239         if (delta < deviations[i]) {
240             lat->deviations[i]++;
241             break;
242         }
243     }
244 }
245
246 static void oper_list_add(struct io_oper *oper, struct io_oper **list)
247 {
248     if (!*list) {
249         *list = oper;
250         oper->prev = oper->next = oper;
251         return;
252     }
253     oper->prev = (*list)->prev;
254     oper->next = *list;
255     (*list)->prev->next = oper;
256     (*list)->prev = oper;
257     return;
258 }
259
260 static void oper_list_del(struct io_oper *oper, struct io_oper **list)
261 {
262     if ((*list)->next == (*list)->prev && *list == (*list)->next) {
263         *list = NULL;
264         return;
265     }
266     oper->prev->next = oper->next;
267     oper->next->prev = oper->prev;
268     if (*list == oper)
269         *list = oper->next;
270 }
271
272 /* worker func to check error fields in the io unit */
273 static int check_finished_io(struct io_unit *io) {
274     int i;
275     if (io->res != io->buf_size) {
276
277                  struct stat s;
278                  fstat(io->io_oper->fd, &s);
279   
280                  /*
281                   * If file size is large enough for the read, then this short
282                   * read is an error.
283                   */
284                  if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
285                      s.st_size > (io->iocb.u.c.offset + io->res)) {
286   
287                                  fprintf(stderr, "io err %lu (%s) op %d, off %Lu size %d\n",
288                                                  io->res, strerror(-io->res), io->iocb.aio_lio_opcode,
289                                                  io->iocb.u.c.offset, io->buf_size);
290                                  io->io_oper->last_err = io->res;
291                                  io->io_oper->num_err++;
292                                  return -1;
293                  }
294     }
295     if (verify && io->io_oper->rw == READ) {
296         if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
297             fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n", 
298                     io->io_oper->file_name, io->iocb.u.c.offset);
299             
300             for (i = 0 ; i < io->io_oper->reclen ; i++) {
301                 if (io->buf[i] != verify_buf[i]) {
302                     fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]);
303                 }
304             }
305             fprintf(stderr, "\n");
306         }
307
308     }
309     return 0;
310 }
311
312 /* worker func to check the busy bits and get an io unit ready for use */
313 static int grab_iou(struct io_unit *io, struct io_oper *oper) {
314     if (io->busy == IO_PENDING)
315         return -1;
316
317     io->busy = IO_PENDING;
318     io->res = 0;
319     io->io_oper = oper;
320     return 0;
321 }
322
323 char *stage_name(int rw) {
324     switch(rw) {
325     case WRITE:
326         return "write";
327     case READ:
328         return "read";
329     case RWRITE:
330         return "random write";
331     case RREAD:
332         return "random read";
333     }
334     return "unknown";
335 }
336
337 static inline double oper_mb_trans(struct io_oper *oper) {
338     return ((double)oper->started_ios * (double)oper->reclen) /
339                 (double)(1024 * 1024);
340 }
341
342 static void print_time(struct io_oper *oper) {
343     double runtime;
344     double tput;
345     double mb;
346
347     runtime = time_since(&oper->start_time); 
348     mb = oper_mb_trans(oper);
349     tput = mb / runtime;
350     fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n", 
351             stage_name(oper->rw), oper->file_name, tput, mb, runtime);
352 }
353
354 static void print_latency(struct thread_info *t) {
355     struct io_latency *lat = &t->io_submit_latency;
356     double avg = lat->total_lat / lat->total_io;
357     int i;
358     double total_counted = 0;
359     fprintf(stderr, "latency min %.2f avg %.2f max %.2f\n\t", 
360             lat->min, avg, lat->max);
361
362     for (i = 0 ; i < DEVIATIONS ; i++) {
363         fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]);
364         total_counted += lat->deviations[i];
365     }
366     if (total_counted && lat->total_io - total_counted)
367         fprintf(stderr, " < %.0f", lat->total_io - total_counted);
368     fprintf(stderr, "\n");
369     memset(&t->io_submit_latency, 0, sizeof(t->io_submit_latency));
370 }
371
372 /*
373  * updates the fields in the io operation struct that belongs to this
374  * io unit, and make the io unit reusable again
375  */
376 void finish_io(struct thread_info *t, struct io_unit *io, long result) {
377     struct io_oper *oper = io->io_oper;
378
379     io->res = result;
380     io->busy = IO_FREE;
381     io->next = t->free_ious;
382     t->free_ious = io;
383     oper->num_pending--;
384     t->num_global_pending--;
385     check_finished_io(io);
386     if (oper->num_pending == 0 && 
387        (oper->started_ios == oper->total_ios || oper->stonewalled)) 
388     {
389         print_time(oper);
390     } 
391 }
392
393 int read_some_events(struct thread_info *t) {
394     struct io_unit *event_io;
395     struct io_event *event;
396     int nr;
397     int i; 
398     int min_nr = io_iter;
399
400     if (t->num_global_pending < io_iter)
401         min_nr = t->num_global_pending;
402
403 #ifdef NEW_GETEVENTS
404     nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL);
405 #else
406     nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
407 #endif
408     if (nr <= 0)
409         return nr;
410
411     for (i = 0 ; i < nr ; i++) {
412         event = t->events + i;
413         event_io = (struct io_unit *)((unsigned long)event->obj); 
414         finish_io(t, event_io, event->res);
415     }
416     return nr;
417 }
418
419 /* 
420  * finds a free io unit, waiting for pending requests if required.  returns
421  * null if none could be found
422  */
423 static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
424 {
425     struct io_unit *event_io;
426     int nr;
427
428 retry:
429     if (t->free_ious) {
430         event_io = t->free_ious;
431         t->free_ious = t->free_ious->next;
432         if (grab_iou(event_io, oper)) {
433             fprintf(stderr, "io unit on free list but not free\n");
434             abort();
435         }
436         return event_io;
437     }
438     nr = read_some_events(t);
439     if (nr > 0)
440         goto retry;
441     else
442         fprintf(stderr, "no free ious after read_some_events\n");
443     return NULL;
444 }
445
446 /*
447  * wait for all pending requests for this io operation to finish
448  */
449 static int io_oper_wait(struct thread_info *t, struct io_oper *oper) {
450     struct io_event event;
451     struct io_unit *event_io;
452
453     if (oper == NULL) {
454         return 0;
455     }
456
457     if (oper->num_pending == 0)
458         goto done;
459
460     /* this func is not speed sensitive, no need to go wild reading
461      * more than one event at a time
462      */
463 #ifdef NEW_GETEVENTS
464     while(io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
465 #else
466     while(io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
467 #endif
468         event_io = (struct io_unit *)((unsigned long)event.obj); 
469
470         finish_io(t, event_io, event.res);
471
472         if (oper->num_pending == 0)
473             break;
474     }
475 done:
476     if (oper->num_err) {
477         fprintf(stderr, "%u errors on oper, last %u\n", 
478                 oper->num_err, oper->last_err);
479     }
480     return 0;
481 }
482
483 off_t random_byte_offset(struct io_oper *oper) {
484     off_t num;
485     off_t rand_byte = oper->start;
486     off_t range;
487     off_t offset = 1;
488
489     range = (oper->end - oper->start) / (1024 * 1024);
490     if ((page_size_mask+1) > (1024 * 1024))
491         offset = (page_size_mask+1) / (1024 * 1024);
492     if (range < offset)
493         range = 0;
494     else
495         range -= offset;
496
497     /* find a random mb offset */
498     num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 ));
499     rand_byte += num * 1024 * 1024;
500     
501     /* find a random byte offset */
502     num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));
503
504     /* page align */
505     num = (num + page_size_mask) & ~page_size_mask;
506     rand_byte += num;
507
508     if (rand_byte + oper->reclen > oper->end) {
509         rand_byte -= oper->reclen;
510     }
511     return rand_byte;
512 }
513
514 /* 
515  * build an aio iocb for an operation, based on oper->rw and the
516  * last offset used.  This finds the struct io_unit that will be attached
517  * to the iocb, and things are ready for submission to aio after this
518  * is called.
519  *
520  * returns null on error
521  */
522 static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
523 {
524     struct io_unit *io;
525     off_t rand_byte;
526
527     io = find_iou(t, oper);
528     if (!io) {
529         fprintf(stderr, "unable to find io unit\n");
530         return NULL;
531     }
532
533     switch(oper->rw) {
534     case WRITE:
535         io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen, 
536                        oper->last_offset);
537         oper->last_offset += oper->reclen;
538         break;
539     case READ:
540         io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen, 
541                       oper->last_offset);
542         oper->last_offset += oper->reclen;
543         break;
544     case RREAD:
545         rand_byte = random_byte_offset(oper);
546         oper->last_offset = rand_byte;
547         io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen, 
548                       rand_byte);
549         break;
550     case RWRITE:
551         rand_byte = random_byte_offset(oper);
552         oper->last_offset = rand_byte;
553         io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen, 
554                       rand_byte);
555         
556         break;
557     }
558
559     return io;
560 }
561
562 /* 
563  * wait for any pending requests, and then free all ram associated with
564  * an operation.  returns the last error the operation hit (zero means none)
565  */
566 static int
567 finish_oper(struct thread_info *t, struct io_oper *oper)
568 {
569     unsigned long last_err;
570
571     io_oper_wait(t, oper);
572     last_err = oper->last_err;
573     if (oper->num_pending > 0) {
574         fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
575     }
576     close(oper->fd);
577     free(oper);
578     return last_err;
579 }
580
581 /* 
582  * allocates an io operation and fills in all the fields.  returns
583  * null on error
584  */
585 static struct io_oper * 
586 create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth,
587             int iter, char *file_name)
588 {
589     struct io_oper *oper;
590
591     oper = malloc (sizeof(*oper));
592     if (!oper) {
593         fprintf(stderr, "unable to allocate io oper\n");
594         return NULL;
595     }
596     memset(oper, 0, sizeof(*oper));
597
598     oper->depth = depth;
599     oper->start = start;
600     oper->end = end;
601     oper->last_offset = oper->start;
602     oper->fd = fd;
603     oper->reclen = reclen;
604     oper->rw = rw;
605     oper->total_ios = (oper->end - oper->start) / oper->reclen;
606     oper->file_name = file_name;
607
608     return oper;
609 }
610
611 /*
612  * does setup on num_ios worth of iocbs, but does not actually
613  * start any io
614  */
615 int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios, 
616                struct iocb **my_iocbs) 
617 {
618     int i;
619     struct io_unit *io;
620
621     if (oper->started_ios == 0)
622         gettimeofday(&oper->start_time, NULL);
623
624     if (num_ios == 0)
625         num_ios = oper->total_ios;
626
627     if ((oper->started_ios + num_ios) > oper->total_ios)
628         num_ios = oper->total_ios - oper->started_ios;   
629
630     for( i = 0 ; i < num_ios ; i++) {
631         io = build_iocb(t, oper);
632         if (!io) {
633             return -1;    
634         }
635         my_iocbs[i] = &io->iocb;
636     }
637     return num_ios;
638 }
639
640 /*
641  * runs through the iocbs in the array provided and updates
642  * counters in the associated oper struct
643  */
644 static void update_iou_counters(struct iocb **my_iocbs, int nr) 
645 {
646     struct io_unit *io;
647     int i;
648     for (i = 0 ; i < nr ; i++) {
649         io = (struct io_unit *)(my_iocbs[i]);
650         io->io_oper->num_pending++;
651         io->io_oper->started_ios++;
652     }
653 }
654
655 /* starts some io for a given file, returns zero if all went well */
656 int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs) 
657 {
658     int ret;
659     struct timeval start_time;
660
661 resubmit:
662     gettimeofday(&start_time, NULL);
663     ret = io_submit(t->io_ctx, num_ios, my_iocbs);
664     calc_latency(&start_time, &t->io_submit_latency);
665     if (ret != num_ios) {
666         /* some ios got through */
667         if (ret > 0) {
668             update_iou_counters(my_iocbs, ret);
669             my_iocbs += ret;
670             t->num_global_pending += ret;
671             num_ios -= ret;
672         }
673         /* 
674          * we've used all the requests allocated in aio_init, wait and
675          * retry
676          */
677         if (ret > 0 || ret == -EAGAIN) {
678             if ((ret = read_some_events(t) > 0)) {
679                 goto resubmit;
680             }
681         }
682
683         fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret));
684         return -1;
685     }
686     update_iou_counters(my_iocbs, ret);
687     t->num_global_pending += ret;
688     return 0;
689 }
690
691 /* 
692  * changes oper->rw to the next in a command sequence, or returns zero
693  * to say this operation is really, completely done for
694  */
695 static int restart_oper(struct io_oper *oper) {
696     int new_rw  = 0;
697     if (oper->last_err)
698         return 0;
699
700     /* this switch falls through */
701     switch(oper->rw) {
702     case WRITE:
703         if (stages & (1 << READ))
704             new_rw = READ;
705     case READ:
706         if (!new_rw && stages & (1 << RWRITE))
707             new_rw = RWRITE;
708     case RWRITE:
709         if (!new_rw && stages & (1 << RREAD))
710             new_rw = RREAD;
711     }
712
713     if (new_rw) {
714         oper->started_ios = 0;
715         oper->last_offset = oper->start;
716         oper->stonewalled = 0;
717
718         /* 
719          * we're restarting an operation with pending requests, so the
720          * timing info won't be printed by finish_io.  Printing it here
721          */
722         if (oper->num_pending)
723             print_time(oper);
724
725         oper->rw = new_rw;
726         return 1;
727     } 
728     return 0;
729 }
730
731 static int oper_runnable(struct io_oper *oper) {
732     struct stat buf;
733     int ret;
734
735     /* first context is always runnable, if started_ios > 0, no need to
736      * redo the calculations
737      */
738     if (oper->started_ios || oper->start == 0)
739         return 1;
740     /*
741      * only the sequential phases force delays in starting */
742     if (oper->rw >= RWRITE)
743         return 1;
744     ret = fstat(oper->fd, &buf);
745     if (ret < 0) {
746         perror("fstat");
747         exit(1);
748     }
749     if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
750         return 0;
751     return 1;
752 }
753
754 /*
755  * runs through all the io operations on the active list, and starts
756  * a chunk of io on each.  If any io operations are completely finished,
757  * it either switches them to the next stage or puts them on the 
758  * finished list.
759  *
760  * this function stops after max_io_submit iocbs are sent down the 
761  * pipe, even if it has not yet touched all the operations on the 
762  * active list.  Any operations that have finished are moved onto
763  * the finished_opers list.
764  */
765 static int run_active_list(struct thread_info *t,
766                          int io_iter,
767                          int max_io_submit)
768 {
769     struct io_oper *oper;
770     struct io_oper *built_opers = NULL;
771     struct iocb **my_iocbs = t->iocbs;
772     int ret = 0;
773     int num_built = 0;
774
775     oper = t->active_opers;
776     while(oper) {
777         if (!oper_runnable(oper)) {
778             oper = oper->next;
779             if (oper == t->active_opers)
780                 break;
781             continue;
782         }
783         ret = build_oper(t, oper, io_iter, my_iocbs);
784         if (ret >= 0) {
785             my_iocbs += ret;
786             num_built += ret;
787             oper_list_del(oper, &t->active_opers);
788             oper_list_add(oper, &built_opers);
789             oper = t->active_opers;
790             if (num_built + io_iter > max_io_submit)
791                 break;
792         } else
793             break;
794     }
795     if (num_built) {
796         ret = run_built(t, num_built, t->iocbs);
797         if (ret < 0) {
798             fprintf(stderr, "error %d on run_built\n", ret);
799             exit(1);
800         }
801         while(built_opers) {
802             oper = built_opers;
803             oper_list_del(oper, &built_opers);
804             oper_list_add(oper, &t->active_opers);
805             if (oper->started_ios == oper->total_ios) {
806                 oper_list_del(oper, &t->active_opers);
807                 oper_list_add(oper, &t->finished_opers);
808             }
809         }
810     }
811     return 0;
812 }
813
814 void drop_shm() {
815     int ret;
816     struct shmid_ds ds;
817     if (use_shm != USE_SHM)
818         return;
819
820     ret = shmctl(shm_id, IPC_RMID, &ds);
821     if (ret) {
822         perror("shmctl IPC_RMID");
823     }
824 }
825
826 void aio_setup(io_context_t *io_ctx, int n)
827 {
828     int res = io_queue_init(n, io_ctx);
829     if (res != 0) {
830         fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
831                 n, res, strerror(-res));
832         exit(3);
833     }
834 }
835
836 /*
837  * allocate io operation and event arrays for a given thread
838  */
839 int setup_ious(struct thread_info *t, 
840               int num_files, int depth, 
841               int reclen, int max_io_submit) {
842     int i;
843     size_t bytes = num_files * depth * sizeof(*t->ios);
844
845     t->ios = malloc(bytes);
846     if (!t->ios) {
847         fprintf(stderr, "unable to allocate io units\n");
848         return -1;
849     }
850     memset(t->ios, 0, bytes);
851
852     for (i = 0 ; i < depth * num_files; i++) {
853         t->ios[i].buf = aligned_buffer;
854         aligned_buffer += padded_reclen;
855         t->ios[i].buf_size = reclen;
856         if (verify)
857             memset(t->ios[i].buf, 'b', reclen);
858         else
859             memset(t->ios[i].buf, 0, reclen);
860         t->ios[i].next = t->free_ious;
861         t->free_ious = t->ios + i;
862     }
863     if (verify) {
864         verify_buf = aligned_buffer;
865         memset(verify_buf, 'b', reclen);
866     }
867
868     t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
869     if (!t->iocbs) {
870         fprintf(stderr, "unable to allocate iocbs\n");
871         goto free_buffers;
872     }
873
874     memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));
875
876     t->events = malloc(sizeof(struct io_event) * depth * num_files);
877     if (!t->events) {
878         fprintf(stderr, "unable to allocate ram for events\n");
879         goto free_buffers;
880     }
881     memset(t->events, 0, num_files * sizeof(struct io_event)*depth);
882
883     t->num_global_ios = num_files * depth;
884     t->num_global_events = t->num_global_ios;
885     return 0;
886
887 free_buffers:
888     if (t->ios)
889         free(t->ios);
890     if (t->iocbs)
891         free(t->iocbs);  
892     if (t->events)
893         free(t->events);
894     return -1;
895 }
896
897 /*
898  * The buffers used for file data are allocated as a single big
899  * malloc, and then each thread and operation takes a piece and uses
900  * that for file data.  This lets us do a large shm or bigpages alloc
901  * and without trying to find a special place in each thread to map the
902  * buffers to
903  */
904 int setup_shared_mem(int num_threads, int num_files, int depth, 
905                      int reclen, int max_io_submit) 
906 {
907     char *p = NULL;
908     size_t total_ram;
909     
910     padded_reclen = (reclen + page_size_mask) / (page_size_mask+1);
911     padded_reclen = padded_reclen * (page_size_mask+1);
912     total_ram = num_files * depth * padded_reclen + num_threads;
913     if (verify)
914         total_ram += padded_reclen;
915
916     if (use_shm == USE_MALLOC) {
917         p = malloc(total_ram + page_size_mask);
918     } else if (use_shm == USE_SHM) {
919         shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
920         if (shm_id < 0) {
921             perror("shmget");
922             drop_shm();
923             goto free_buffers;
924         }
925         p = shmat(shm_id, (char *)0x50000000, 0);
926         if ((long)p == -1) {
927             perror("shmat");
928             goto free_buffers;
929         }
930         /* won't really be dropped until we shmdt */
931         drop_shm();
932     } else if (use_shm == USE_SHMFS) {
933         char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */    
934         int fd;
935
936         strcpy(mmap_name, "/dev/shm/XXXXXX");
937         fd = mkstemp(mmap_name);
938         if (fd < 0) {
939             perror("mkstemp");
940             goto free_buffers;
941         }
942         unlink(mmap_name);
943         ftruncate(fd, total_ram);
944         shm_id = fd;
945         p = mmap((char *)0x50000000, total_ram,
946                  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
947
948         if (p == MAP_FAILED) {
949             perror("mmap");
950             goto free_buffers;
951         }
952     }
953     if (!p) {
954         fprintf(stderr, "unable to allocate buffers\n");
955         goto free_buffers;
956     }
957     unaligned_buffer = p;
958     p = (char*)((intptr_t) (p + page_size_mask) & ~page_size_mask);
959     aligned_buffer = p;
960     return 0;
961
962 free_buffers:
963     drop_shm();
964     if (unaligned_buffer)
965         free(unaligned_buffer);
966     return -1;
967 }
968
969 /*
970  * runs through all the thread_info structs and calculates a combined
971  * throughput
972  */
973 void global_thread_throughput(struct thread_info *t, char *this_stage) {
974     int i;
975     double runtime = time_since(&global_stage_start_time);
976     double total_mb = 0;
977     double min_trans = 0;
978
979     for (i = 0 ; i < num_threads ; i++) {
980         total_mb += global_thread_info[i].stage_mb_trans;
981         if (!min_trans || t->stage_mb_trans < min_trans)
982             min_trans = t->stage_mb_trans;
983     }
984     if (total_mb) {
985         fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
986                 total_mb / runtime);
987         fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
988         if (stonewall)
989             fprintf(stderr, " min transfer %.2fMB", min_trans);
990         fprintf(stderr, "\n");
991     }
992 }
993
994
995 /* this is the meat of the state machine.  There is a list of
996  * active operations structs, and as each one finishes the required
997  * io it is moved to a list of finished operations.  Once they have
998  * all finished whatever stage they were in, they are given the chance
999  * to restart and pick a different stage (read/write/random read etc)
1000  *
1001  * various timings are printed in between the stages, along with
1002  * thread synchronization if there are more than one threads.
1003  */
1004 int worker(struct thread_info *t)
1005 {
1006     struct io_oper *oper;
1007     char *this_stage = NULL;
1008     struct timeval stage_time;
1009     int status = 0;
1010     int iteration = 0;
1011     int cnt;
1012
1013     aio_setup(&t->io_ctx, 512);
1014
1015 restart:
1016     printf("Starting %s iter:%d \n", __FUNCTION__,iteration);
1017     if (num_threads > 1) {
1018         printf("num_threads %d \n", num_threads);
1019         pthread_mutex_lock(&stage_mutex);
1020         threads_starting++;
1021         if (threads_starting == num_threads) {
1022             threads_ending = 0;
1023             gettimeofday(&global_stage_start_time, NULL);
1024             pthread_cond_broadcast(&stage_cond);
1025         }
1026         while (threads_starting != num_threads)
1027             pthread_cond_wait(&stage_cond, &stage_mutex);
1028         pthread_mutex_unlock(&stage_mutex);
1029     }
1030     if (t->active_opers) {
1031 //        printf("active_opers %p line:%d\n", t->active_opers, __LINE__);
1032         this_stage = stage_name(t->active_opers->rw);
1033         gettimeofday(&stage_time, NULL);
1034         t->stage_mb_trans = 0;
1035     }
1036     cnt = 0;
1037     /* first we send everything through aio */
1038 //    printf("cnt:%d max_iterations:%d oper:%p\n",cnt, iterations,oper);
1039                         
1040     while (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1041 //        printf("active_opers %p line:%d cnt:%d ", t->active_opers,__LINE__,cnt);
1042         if (stonewall && threads_ending) {
1043             oper = t->active_opers;     
1044             oper->stonewalled = 1;
1045             oper_list_del(oper, &t->active_opers);
1046             oper_list_add(oper, &t->finished_opers);
1047 //            printf(" if branch\n");
1048         } else {
1049             run_active_list(t, io_iter,  max_io_submit);
1050 //            printf(" else branch\n");
1051         }
1052         cnt++;
1053     }
1054   
1055     if (latency_stats)
1056         print_latency(t);
1057
1058     /* then we wait for all the operations to finish */
1059     oper = t->finished_opers;
1060 //    printf("line:%d oper:%p\n", __LINE__, oper);
1061     do {
1062         io_oper_wait(t, oper);
1063         if (oper != NULL) {
1064             oper = oper->next;
1065         }
1066     } while (oper != t->finished_opers);
1067 //    printf("finished_opers %p line:%d\n", t->finished_opers,__LINE__);
1068
1069     /* then we do an fsync to get the timing for any future operations
1070      * right, and check to see if any of these need to get restarted
1071      */
1072     oper = t->finished_opers;
1073 //    printf("oper %p line:%d\n", oper,__LINE__);
1074     while (oper) {
1075         if (fsync_stages)
1076             fsync(oper->fd);
1077         t->stage_mb_trans += oper_mb_trans(oper);
1078         if (restart_oper(oper)) {
1079             oper_list_del(oper, &t->finished_opers);
1080             oper_list_add(oper, &t->active_opers);
1081             oper = t->finished_opers;
1082             continue;
1083         }
1084         oper = oper->next;
1085         if (oper == t->finished_opers)
1086             break;
1087     } 
1088
1089     if (t->stage_mb_trans && t->num_files > 0) {
1090 //        printf("num_files %d line:%d\n", t->num_files,__LINE__);
1091         double seconds = time_since(&stage_time);
1092         fprintf(stderr, "thread %d %s totals (%.2f MB/s) %.2f MB in %.2fs\n", 
1093                 t - global_thread_info, this_stage, t->stage_mb_trans/seconds, 
1094                 t->stage_mb_trans, seconds);
1095     }
1096
1097     if (num_threads > 1) {
1098 //        printf("num_threads %d line:%d\n", num_threads,__LINE__);
1099         pthread_mutex_lock(&stage_mutex);
1100         threads_ending++;
1101         if (threads_ending == num_threads) {
1102             threads_starting = 0;
1103             pthread_cond_broadcast(&stage_cond);
1104             global_thread_throughput(t, this_stage);
1105         }
1106 //        printf("threads_ending %d line:%d\n", threads_ending,__LINE__);
1107         while (threads_ending != num_threads)
1108             pthread_cond_wait(&stage_cond, &stage_mutex);
1109         pthread_mutex_unlock(&stage_mutex);
1110     }
1111
1112     /* someone got restarted, go back to the beginning */
1113     if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1114         iteration++;
1115         goto restart;
1116     }
1117
1118     /* finally, free all the ram */
1119 //    printf("finished_opers %p line:%d\n", t->finished_opers,__LINE__);
1120     while (t->finished_opers) {
1121         oper = t->finished_opers;
1122         oper_list_del(oper, &t->finished_opers);
1123         status = finish_oper(t, oper);
1124     }
1125
1126     if (t->num_global_pending) {
1127         fprintf(stderr, "global num pending is %d\n", t->num_global_pending);
1128     }
1129     io_queue_release(t->io_ctx);
1130
1131     return status;
1132 }
1133
1134 typedef void * (*start_routine)(void *);
1135 int run_workers(struct thread_info *t, int num_threads)
1136 {
1137     int ret;
1138     int thread_ret;
1139     int i;
1140
1141 //    printf("%s num_threads %d line:%d\n", __FUNCTION__,num_threads,__LINE__);
1142     for(i = 0 ; i < num_threads ; i++) {
1143         ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i);
1144         if (ret) {
1145             perror("pthread_create");
1146             exit(1);
1147         }
1148     }
1149     for(i = 0 ; i < num_threads ; i++) {
1150         ret = pthread_join(t[i].tid, (void *)&thread_ret);
1151         if (ret) {
1152             perror("pthread_join");
1153             exit(1);
1154         }
1155     }
1156     return 0;
1157 }
1158
1159 off_t parse_size(char *size_arg, off_t mult) {
1160     char c;
1161     int num;
1162     off_t ret;
1163     c = size_arg[strlen(size_arg) - 1];
1164     if (c > '9') {
1165         size_arg[strlen(size_arg) - 1] = '\0';
1166     }
1167     num = atoi(size_arg);
1168     switch(c) {
1169     case 'g':
1170     case 'G':
1171         mult = 1024 * 1024 * 1024;
1172         break;
1173     case 'm':
1174     case 'M':
1175         mult = 1024 * 1024;
1176         break;
1177     case 'k':
1178     case 'K':
1179         mult = 1024;
1180         break;
1181     }
1182     ret = mult * num;
1183     return ret;
1184 }
1185
1186 void print_usage(void) {
1187     printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
1188     printf("                  [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
1189     printf("                  file1 [file2 ...]\n");
1190     printf("\t-a size in KB at which to align buffers\n");
1191     printf("\t-b max number of iocbs to give io_submit at once\n");
1192     printf("\t-c number of io contexts per file\n");
1193     printf("\t-C offset between contexts, default 2MB\n");
1194     printf("\t-s size in MB of the test file(s), default 1024MB\n");
1195     printf("\t-r record size in KB used for each io, default 64KB\n");
1196     printf("\t-d number of pending aio requests for each file, default 64\n");
1197     printf("\t-i number of ios per file sent before switching\n\t   to the next file, default 8\n");
1198     printf("\t-I total number of ayncs IOs the program will run, default is run until Cntl-C\n");
1199     printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
1200     printf("\t-S Use O_SYNC for writes\n");
1201     printf("\t-o add an operation to the list: write=0, read=1,\n"); 
1202     printf("\t   random write=2, random read=3.\n");
1203     printf("\t   repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
1204     printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
1205     printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
1206     printf("\t-n no fsyncs between write stage and read stage\n");
1207     printf("\t-l print io_submit latencies after each stage\n");
1208     printf("\t-t number of threads to run\n");
1209     printf("\t-v verification of bytes written\n");
1210     printf("\t-x turn off thread stonewalling\n");
1211     printf("\t-h this message\n");
1212     printf("\n\t   the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
1213     printf("\t   translate to 400KB, 400MB and 400GB\n");
1214     printf("version %s\n", PROG_VERSION);
1215 }
1216
1217 int main(int ac, char **av) 
1218 {
1219     int rwfd;
1220     int i;
1221     int j;
1222     int c;
1223
1224     off_t file_size = 1 * 1024 * 1024 * 1024;
1225     int first_stage = WRITE;
1226     struct io_oper *oper;
1227     int status = 0;
1228     int num_files = 0;
1229     int open_fds = 0;
1230     struct thread_info *t;
1231
1232     page_size_mask = getpagesize() - 1;
1233
1234     while(1) {
1235         c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lnhOSxv");
1236         if  (c < 0)
1237             break;
1238
1239         switch(c) {
1240         case 'a':
1241             page_size_mask = parse_size(optarg, 1024);
1242             page_size_mask--;
1243             break;
1244         case 'c':
1245             num_contexts = atoi(optarg);
1246             break;
1247         case 'C':
1248             context_offset = parse_size(optarg, 1024 * 1024);
1249         case 'b':
1250             max_io_submit = atoi(optarg);
1251             break;
1252         case 's':
1253             file_size = parse_size(optarg, 1024 * 1024);
1254             break;
1255         case 'd':
1256             depth = atoi(optarg);
1257             break;
1258         case 'r':
1259             rec_len = parse_size(optarg, 1024);
1260             break;
1261         case 'i':
1262             io_iter = atoi(optarg);
1263             break;
1264         case 'I':
1265           iterations = atoi(optarg);
1266         break;
1267         case 'n':
1268             fsync_stages = 0;
1269             break;
1270         case 'l':
1271             latency_stats = 1;
1272             break;
1273         case 'm':
1274             if (!strcmp(optarg, "shm")) {
1275                 fprintf(stderr, "using ipc shm\n");
1276                 use_shm = USE_SHM;
1277             } else if (!strcmp(optarg, "shmfs")) {
1278                 fprintf(stderr, "using /dev/shm for buffers\n");
1279                 use_shm = USE_SHMFS;
1280             }
1281             break;
1282         case 'o': 
1283             i = atoi(optarg);
1284             stages |= 1 << i;
1285             fprintf(stderr, "adding stage %s\n", stage_name(i));
1286             break;
1287         case 'O':
1288             o_direct = O_DIRECT;
1289             break;
1290         case 'S':
1291             o_sync = O_SYNC;
1292             break;
1293         case 't':
1294             num_threads = atoi(optarg);
1295             break;
1296         case 'x':
1297             stonewall = 0;
1298             break;
1299         case 'v':
1300             verify = 1;
1301             break;
1302         case 'h':
1303         default:
1304             print_usage();
1305             exit(1);
1306         }
1307     }
1308
1309     /* 
1310      * make sure we don't try to submit more ios than we have allocated
1311      * memory for
1312      */
1313     if (depth < io_iter) {
1314         io_iter = depth;
1315         fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1316     }
1317
1318     if (optind >= ac) {
1319         print_usage();
1320         exit(1);
1321     }
1322
1323     num_files = ac - optind;
1324
1325     if (num_threads > (num_files * num_contexts)) {
1326         num_threads = num_files * num_contexts;
1327         fprintf(stderr, "dropping thread count to the number of contexts %d\n", 
1328                 num_threads);
1329     }
1330
1331     t = malloc(num_threads * sizeof(*t));
1332     if (!t) {
1333         perror("malloc");
1334         exit(1);
1335     }
1336     global_thread_info = t;
1337
1338     /* by default, allow a huge number of iocbs to be sent towards
1339      * io_submit
1340      */
1341     if (!max_io_submit)
1342         max_io_submit = num_files * io_iter * num_contexts;
1343
1344     /*
1345      * make sure we don't try to submit more ios than max_io_submit allows 
1346      */
1347     if (max_io_submit < io_iter) {
1348         io_iter = max_io_submit;
1349         fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1350     }
1351
1352     if (!stages) {
1353         stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
1354     } else {
1355         for (i = 0 ; i < LAST_STAGE; i++) {
1356             if (stages & (1 << i)) {
1357                 first_stage = i;
1358                 fprintf(stderr, "starting with %s\n", stage_name(i));
1359                 break;
1360             }
1361         }
1362     }
1363
1364     if (file_size < num_contexts * context_offset) {
1365         fprintf(stderr, "file size %Lu too small for %d contexts\n", 
1366                 file_size, num_contexts);
1367         exit(1);
1368     }
1369
1370     fprintf(stderr, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n", file_size / (1024 * 1024), rec_len / 1024, depth, io_iter);
1371     fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n", 
1372             max_io_submit, (page_size_mask + 1)/1024);
1373     fprintf(stderr, "threads %d files %d contexts %d context offset %LuMB verification %s\n", 
1374             num_threads, num_files, num_contexts, 
1375             context_offset / (1024 * 1024), verify ? "on" : "off");
1376     /* open all the files and do any required setup for them */
1377     for (i = optind ; i < ac ; i++) {
1378         int thread_index;
1379         for (j = 0 ; j < num_contexts ; j++) {
1380             thread_index = open_fds % num_threads;
1381             open_fds++;
1382 //          fprintf(stderr, "adding file %s thread %d\n", av[i], thread_index);
1383
1384             rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600);
1385             assert(rwfd != -1);
1386
1387             oper = create_oper(rwfd, first_stage, j * context_offset, 
1388                                file_size - j * context_offset, rec_len, 
1389                                depth, io_iter, av[i]);
1390             if (!oper) {
1391                 fprintf(stderr, "error in create_oper\n");
1392                 exit(-1);
1393             }
1394             oper_list_add(oper, &t[thread_index].active_opers);
1395             t[thread_index].num_files++;
1396         }
1397     }
1398     if (setup_shared_mem(num_threads, num_files * num_contexts, 
1399                          depth, rec_len, max_io_submit))
1400     {
1401         exit(1);
1402     }
1403     for (i = 0 ; i < num_threads ; i++) {
1404         if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit))
1405                 exit(1);
1406     }
1407     if (num_threads > 1){
1408         printf("Running multi thread version num_threads:%d\n", num_threads);
1409         run_workers(t, num_threads);
1410     }
1411     else {
1412         printf("Running single thread version \n");
1413         status = worker(t);
1414     }
1415
1416
1417     for (i = optind ; i < ac ; i++) {
1418         printf("Cleaning up file %s \n", av[i]);
1419         unlink(av[i]);
1420     }
1421
1422     if (status) {
1423     printf("non zero return %d \n", status);
1424     }
1425     else{
1426         printf("aio-stress Completed successfully %d \n", status);
1427     }
1428
1429     exit(0);
1430 }
1431