2 * Copyright (c) 2004 SuSE, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of version 2 of the GNU General Public License as
6 * published by the Free Software Foundation.
8 * This program is distributed in the hope that it would be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
12 * Further, this software is distributed without any warranty that it is
13 * free of the rightful claim of any third person regarding infringement
14 * or the like. Any license provided herein, whether implied or
15 * otherwise, applies only to this software file. Patent licenses, if
16 * any, provided herein do not apply to combinations of this program with
17 * other software, or any other product whatsoever.
19 * You should have received a copy of the GNU General Public License along
20 * with this program; if not, write the Free Software Foundation, Inc., 59
21 * Temple Place - Suite 330, Boston MA 02111-1307, USA.
26 * will open or create each file on the command line, and start a series
29 * aio is done in a rotating loop. first file1 gets 8 requests, then
30 * file2, then file3 etc. As each file finishes writing, it is switched
33 * io buffers are aligned in case you want to do raw io
35 * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c
37 * run aio-stress -h to see the options
39 * Please mail Chris Mason (mason@suse.com) with bug reports or patches
41 #define _FILE_OFFSET_BITS 64
42 #define PROG_VERSION "0.21"
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <libaio.h>
64 #define RUN_FOREVER -1
67 #define O_DIRECT 040000 /* direct disk access hint */
/*
 * NOTE(review): this listing is a sampled extraction -- the numeral fused to
 * the start of each line is an artifact of the extraction, and many original
 * lines (including declarations) are missing from view.  Comments below are
 * hedged accordingly.
 */
83 * various globals, these are effectively read only by the time the threads
87 unsigned long page_size_mask; /* page size - 1; used to align io buffers */
90 int latency_stats = 0; /* presumably set by -l: print io_submit latencies -- see usage text */
91 int completion_latency_stats = 0; /* set by -L: print completion latencies */
93 int iterations = RUN_FOREVER; /* -I: total passes, default unbounded */
94 int max_io_submit = 0; /* -b: max iocbs handed to one io_submit call */
95 long rec_len = 64 * 1024; /* -r: record size per io, default 64KB */
99 off_t context_offset = 2 * 1024 * 1024; /* -C: byte gap between contexts */
100 int fsync_stages = 1; /* presumably cleared by -n (no fsync between stages) */
103 char *unaligned_buffer = NULL; /* raw allocation backing all io buffers */
104 char *aligned_buffer = NULL; /* page-aligned cursor into unaligned_buffer */
105 int padded_reclen = 0; /* rec_len rounded up to the alignment granularity */
108 char *verify_buf = NULL; /* pattern buffer compared against reads (-v) */
109 int unlink_files = 0; /* -u: delete files after the run */
114 /* pthread mutexes and other globals for keeping the threads in sync */
115 pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
116 pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
117 int threads_ending = 0; /* count of threads done with the current stage */
118 int threads_starting = 0; /* count of threads ready to start a stage */
119 struct timeval global_stage_start_time; /* stamped when all threads are ready */
120 struct thread_info *global_thread_info; /* array of all worker threads */
123 * latencies during io_submit are measured, these are the
124 * granularities for deviations
127 int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };
/* NOTE(review): the next line looks like the histogram member of
 * struct io_latency, whose surrounding declaration lines are missing. */
133 double deviations[DEVIATIONS];
136 /* container for a series of operations to a file */
/*
 * NOTE(review): the "struct io_oper {" opening line and most member
 * declarations are missing from this sampled view; only the member comments
 * and the list/timing fields survive below.
 */
138 /* already open file descriptor, valid for whatever operation you want */
141 /* starting byte of the operation */
144 /* ending byte of the operation */
147 /* size of the read/write buffer */
150 /* max number of pending requests before a wait is triggered */
153 /* current number of pending requests */
156 /* last error, zero if there were none */
159 /* total number of errors hit. */
162 /* read,write, random, etc */
165 /* number of ios that will get sent to aio */
168 /* number of ios we've already sent */
171 /* last offset used in an io operation */
174 /* stonewalled = 1 when we got cut off before submitting all our ios */
177 /* list management */
178 struct io_oper *next; /* circular doubly-linked list links */
179 struct io_oper *prev;
181 struct timeval start_time; /* stamped on the oper's first submitted io */
186 /* a single io, and all the tracking needed for it */
/*
 * NOTE(review): the "struct io_unit {" opening line, the iocb member and
 * several fields are missing from this sampled view.
 */
188 /* note, iocb must go first! */
191 /* pointer to parent io operation struct */
192 struct io_oper *io_oper;
197 /* size of the aligned buffer (record size) */
200 /* state of this io unit (free, pending, done) */
203 /* result of last operation */
206 struct io_unit *next; /* link on the thread's free_ious list */
208 struct timeval io_start_time; /* time of io_submit */
/*
 * NOTE(review): interior of struct thread_info; the struct's opening line
 * and several members (io_ctx, tid, ios, iocbs, num_files, ...) are missing
 * from this sampled view.
 */
215 /* allocated array of io_unit structs */
218 /* list of io units available for io */
219 struct io_unit *free_ious;
221 /* number of io units in the ios array */
224 /* number of io units in flight */
225 int num_global_pending;
227 /* preallocated array of iocb pointers, only used in run_active */
230 /* preallocated array of events */
231 struct io_event *events;
233 /* size of the events array */
234 int num_global_events;
236 /* latency stats for io_submit */
237 struct io_latency io_submit_latency;
239 /* list of operations still in progress, and of those finished */
240 struct io_oper *active_opers;
241 struct io_oper *finished_opers;
243 /* number of files this thread is doing io on */
246 /* how much io this thread did in the last stage */
247 double stage_mb_trans;
249 /* latency completion stats i/o time from io_submit until io_getevents */
250 struct io_latency io_completion_latency;
254 * return seconds between start_tv and stop_tv in double precision
/*
 * return seconds between start_tv and stop_tv in double precision
 *
 * NOTE(review): reconstructed -- the extraction dropped the braces, local
 * declarations, the usec borrow and the return statement.
 */
static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
{
	double sec, usec;
	double ret;

	sec = stop_tv->tv_sec - start_tv->tv_sec;
	usec = stop_tv->tv_usec - start_tv->tv_usec;
	/* borrow a second when the usec difference went negative */
	if (sec > 0 && usec < 0) {
		sec--;
		usec += 1000000;
	}
	ret = sec + usec / (double)1000000;
	/* a stop time earlier than the start reports zero elapsed */
	if (ret < 0)
		ret = 0;
	return ret;
}
273 * return seconds between start_tv and now in double precision
275 static double time_since_now(struct timeval *start_tv)
277 struct timeval stop_time;
278 gettimeofday(&stop_time, NULL);
279 return time_since(start_tv, &stop_time);
283 * Add latency info to latency struct
285 static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv,
286 struct io_latency *lat)
290 delta = time_since(start_tv, stop_tv);
291 delta = delta * 1000;
293 if (delta > lat->max)
295 if (!lat->min || delta < lat->min)
298 lat->total_lat += delta;
299 for (i = 0 ; i < DEVIATIONS ; i++) {
300 if (delta < deviations[i]) {
301 lat->deviations[i]++;
307 static void oper_list_add(struct io_oper *oper, struct io_oper **list)
311 oper->prev = oper->next = oper;
314 oper->prev = (*list)->prev;
316 (*list)->prev->next = oper;
317 (*list)->prev = oper;
321 static void oper_list_del(struct io_oper *oper, struct io_oper **list)
323 if ((*list)->next == (*list)->prev && *list == (*list)->next) {
327 oper->prev->next = oper->next;
328 oper->next->prev = oper->prev;
333 /* worker func to check error fields in the io unit */
334 static int check_finished_io(struct io_unit *io) {
336 if (io->res != io->buf_size) {
339 fstat(io->io_oper->fd, &s);
342 * If file size is large enough for the read, then this short
345 if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
346 s.st_size > (io->iocb.u.c.offset + io->res)) {
348 fprintf(stderr, "io err %lu (%s) op %d, off %Lu size %d\n",
349 io->res, strerror(-io->res), io->iocb.aio_lio_opcode,
350 io->iocb.u.c.offset, io->buf_size);
351 io->io_oper->last_err = io->res;
352 io->io_oper->num_err++;
356 if (verify && io->io_oper->rw == READ) {
357 if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
358 fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n",
359 io->io_oper->file_name, io->iocb.u.c.offset);
361 for (i = 0 ; i < io->io_oper->reclen ; i++) {
362 if (io->buf[i] != verify_buf[i]) {
363 fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]);
366 fprintf(stderr, "\n");
373 /* worker func to check the busy bits and get an io unit ready for use */
374 static int grab_iou(struct io_unit *io, struct io_oper *oper) {
375 if (io->busy == IO_PENDING)
378 io->busy = IO_PENDING;
384 char *stage_name(int rw) {
391 return "random write";
393 return "random read";
398 static inline double oper_mb_trans(struct io_oper *oper) {
399 return ((double)oper->started_ios * (double)oper->reclen) /
400 (double)(1024 * 1024);
403 static void print_time(struct io_oper *oper) {
408 runtime = time_since_now(&oper->start_time);
409 mb = oper_mb_trans(oper);
411 fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n",
412 stage_name(oper->rw), oper->file_name, tput, mb, runtime);
415 static void print_lat(char *str, struct io_latency *lat) {
416 double avg = lat->total_lat / lat->total_io;
418 double total_counted = 0;
419 fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t",
420 str, lat->min, avg, lat->max);
422 for (i = 0 ; i < DEVIATIONS ; i++) {
423 fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]);
424 total_counted += lat->deviations[i];
426 if (total_counted && lat->total_io - total_counted)
427 fprintf(stderr, " < %.0f", lat->total_io - total_counted);
428 fprintf(stderr, "\n");
429 memset(lat, 0, sizeof(*lat));
432 static void print_latency(struct thread_info *t)
434 struct io_latency *lat = &t->io_submit_latency;
435 print_lat("latency", lat);
438 static void print_completion_latency(struct thread_info *t)
440 struct io_latency *lat = &t->io_completion_latency;
441 print_lat("completion latency", lat);
445 * updates the fields in the io operation struct that belongs to this
446 * io unit, and make the io unit reusable again
448 void finish_io(struct thread_info *t, struct io_unit *io, long result,
449 struct timeval *tv_now) {
450 struct io_oper *oper = io->io_oper;
452 calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
455 io->next = t->free_ious;
458 t->num_global_pending--;
459 check_finished_io(io);
460 if (oper->num_pending == 0 &&
461 (oper->started_ios == oper->total_ios || oper->stonewalled))
467 int read_some_events(struct thread_info *t) {
468 struct io_unit *event_io;
469 struct io_event *event;
472 int min_nr = io_iter;
473 struct timeval stop_time;
475 if (t->num_global_pending < io_iter)
476 min_nr = t->num_global_pending;
479 nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL);
481 nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
486 gettimeofday(&stop_time, NULL);
487 for (i = 0 ; i < nr ; i++) {
488 event = t->events + i;
489 event_io = (struct io_unit *)((unsigned long)event->obj);
490 finish_io(t, event_io, event->res, &stop_time);
496 * finds a free io unit, waiting for pending requests if required. returns
497 * null if none could be found
499 static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
501 struct io_unit *event_io;
506 event_io = t->free_ious;
507 t->free_ious = t->free_ious->next;
508 if (grab_iou(event_io, oper)) {
509 fprintf(stderr, "io unit on free list but not free\n");
514 nr = read_some_events(t);
518 fprintf(stderr, "no free ious after read_some_events\n");
523 * wait for all pending requests for this io operation to finish
525 static int io_oper_wait(struct thread_info *t, struct io_oper *oper) {
526 struct io_event event;
527 struct io_unit *event_io;
533 if (oper->num_pending == 0)
536 /* this func is not speed sensitive, no need to go wild reading
537 * more than one event at a time
540 while(io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
542 while(io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
544 struct timeval tv_now;
545 event_io = (struct io_unit *)((unsigned long)event.obj);
547 gettimeofday(&tv_now, NULL);
548 finish_io(t, event_io, event.res, &tv_now);
550 if (oper->num_pending == 0)
555 fprintf(stderr, "%u errors on oper, last %u\n",
556 oper->num_err, oper->last_err);
561 off_t random_byte_offset(struct io_oper *oper) {
563 off_t rand_byte = oper->start;
567 range = (oper->end - oper->start) / (1024 * 1024);
568 if ((page_size_mask+1) > (1024 * 1024))
569 offset = (page_size_mask+1) / (1024 * 1024);
575 /* find a random mb offset */
576 num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 ));
577 rand_byte += num * 1024 * 1024;
579 /* find a random byte offset */
580 num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));
583 num = (num + page_size_mask) & ~page_size_mask;
586 if (rand_byte + oper->reclen > oper->end) {
587 rand_byte -= oper->reclen;
593 * build an aio iocb for an operation, based on oper->rw and the
594 * last offset used. This finds the struct io_unit that will be attached
595 * to the iocb, and things are ready for submission to aio after this
598 * returns null on error
600 static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
605 io = find_iou(t, oper);
607 fprintf(stderr, "unable to find io unit\n");
613 io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
615 oper->last_offset += oper->reclen;
618 io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
620 oper->last_offset += oper->reclen;
623 rand_byte = random_byte_offset(oper);
624 oper->last_offset = rand_byte;
625 io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
629 rand_byte = random_byte_offset(oper);
630 oper->last_offset = rand_byte;
631 io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
641 * wait for any pending requests, and then free all ram associated with
642 * an operation. returns the last error the operation hit (zero means none)
645 finish_oper(struct thread_info *t, struct io_oper *oper)
647 unsigned long last_err;
649 io_oper_wait(t, oper);
650 last_err = oper->last_err;
651 if (oper->num_pending > 0) {
652 fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
660 * allocates an io operation and fills in all the fields. returns
663 static struct io_oper *
664 create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth,
665 int iter, char *file_name)
667 struct io_oper *oper;
669 oper = malloc (sizeof(*oper));
671 fprintf(stderr, "unable to allocate io oper\n");
674 memset(oper, 0, sizeof(*oper));
679 oper->last_offset = oper->start;
681 oper->reclen = reclen;
683 oper->total_ios = (oper->end - oper->start) / oper->reclen;
684 oper->file_name = file_name;
690 * does setup on num_ios worth of iocbs, but does not actually
693 int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios,
694 struct iocb **my_iocbs)
699 if (oper->started_ios == 0)
700 gettimeofday(&oper->start_time, NULL);
703 num_ios = oper->total_ios;
705 if ((oper->started_ios + num_ios) > oper->total_ios)
706 num_ios = oper->total_ios - oper->started_ios;
708 for( i = 0 ; i < num_ios ; i++) {
709 io = build_iocb(t, oper);
713 my_iocbs[i] = &io->iocb;
719 * runs through the iocbs in the array provided and updates
720 * counters in the associated oper struct
722 static void update_iou_counters(struct iocb **my_iocbs, int nr,
723 struct timeval *tv_now)
727 for (i = 0 ; i < nr ; i++) {
728 io = (struct io_unit *)(my_iocbs[i]);
729 io->io_oper->num_pending++;
730 io->io_oper->started_ios++;
731 io->io_start_time = *tv_now; /* set time of io_submit */
735 /* starts some io for a given file, returns zero if all went well */
736 int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs)
739 struct timeval start_time;
740 struct timeval stop_time;
743 gettimeofday(&start_time, NULL);
744 ret = io_submit(t->io_ctx, num_ios, my_iocbs);
745 gettimeofday(&stop_time, NULL);
746 calc_latency(&start_time, &stop_time, &t->io_submit_latency);
748 if (ret != num_ios) {
749 /* some ios got through */
751 update_iou_counters(my_iocbs, ret, &stop_time);
753 t->num_global_pending += ret;
757 * we've used all the requests allocated in aio_init, wait and
760 if (ret > 0 || ret == -EAGAIN) {
762 if ((ret = read_some_events(t) > 0)) {
765 fprintf(stderr, "ret was %d and now is %d\n", ret, old_ret);
770 fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret));
773 update_iou_counters(my_iocbs, ret, &stop_time);
774 t->num_global_pending += ret;
779 * changes oper->rw to the next in a command sequence, or returns zero
780 * to say this operation is really, completely done for
782 static int restart_oper(struct io_oper *oper) {
787 /* this switch falls through */
790 if (stages & (1 << READ))
793 if (!new_rw && stages & (1 << RWRITE))
796 if (!new_rw && stages & (1 << RREAD))
801 oper->started_ios = 0;
802 oper->last_offset = oper->start;
803 oper->stonewalled = 0;
806 * we're restarting an operation with pending requests, so the
807 * timing info won't be printed by finish_io. Printing it here
809 if (oper->num_pending)
818 static int oper_runnable(struct io_oper *oper) {
822 /* first context is always runnable, if started_ios > 0, no need to
823 * redo the calculations
825 if (oper->started_ios || oper->start == 0)
828 * only the sequential phases force delays in starting */
829 if (oper->rw >= RWRITE)
831 ret = fstat(oper->fd, &buf);
836 if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
842 * runs through all the io operations on the active list, and starts
843 * a chunk of io on each. If any io operations are completely finished,
844 * it either switches them to the next stage or puts them on the
847 * this function stops after max_io_submit iocbs are sent down the
848 * pipe, even if it has not yet touched all the operations on the
849 * active list. Any operations that have finished are moved onto
850 * the finished_opers list.
852 static int run_active_list(struct thread_info *t,
856 struct io_oper *oper;
857 struct io_oper *built_opers = NULL;
858 struct iocb **my_iocbs = t->iocbs;
862 oper = t->active_opers;
864 if (!oper_runnable(oper)) {
866 if (oper == t->active_opers)
870 ret = build_oper(t, oper, io_iter, my_iocbs);
874 oper_list_del(oper, &t->active_opers);
875 oper_list_add(oper, &built_opers);
876 oper = t->active_opers;
877 if (num_built + io_iter > max_io_submit)
883 ret = run_built(t, num_built, t->iocbs);
885 fprintf(stderr, "error %d on run_built\n", ret);
890 oper_list_del(oper, &built_opers);
891 oper_list_add(oper, &t->active_opers);
892 if (oper->started_ios == oper->total_ios) {
893 oper_list_del(oper, &t->active_opers);
894 oper_list_add(oper, &t->finished_opers);
904 if (use_shm != USE_SHM)
907 ret = shmctl(shm_id, IPC_RMID, &ds);
909 perror("shmctl IPC_RMID");
913 void aio_setup(io_context_t *io_ctx, int n)
915 int res = io_queue_init(n, io_ctx);
917 fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
918 n, res, strerror(-res));
924 * allocate io operation and event arrays for a given thread
926 int setup_ious(struct thread_info *t,
927 int num_files, int depth,
928 int reclen, int max_io_submit) {
930 size_t bytes = num_files * depth * sizeof(*t->ios);
932 t->ios = malloc(bytes);
934 fprintf(stderr, "unable to allocate io units\n");
937 memset(t->ios, 0, bytes);
939 for (i = 0 ; i < depth * num_files; i++) {
940 t->ios[i].buf = aligned_buffer;
941 aligned_buffer += padded_reclen;
942 t->ios[i].buf_size = reclen;
944 memset(t->ios[i].buf, 'b', reclen);
946 memset(t->ios[i].buf, 0, reclen);
947 t->ios[i].next = t->free_ious;
948 t->free_ious = t->ios + i;
951 verify_buf = aligned_buffer;
952 memset(verify_buf, 'b', reclen);
955 t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
957 fprintf(stderr, "unable to allocate iocbs\n");
961 memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));
963 t->events = malloc(sizeof(struct io_event) * depth * num_files);
965 fprintf(stderr, "unable to allocate ram for events\n");
968 memset(t->events, 0, num_files * sizeof(struct io_event)*depth);
970 t->num_global_ios = num_files * depth;
971 t->num_global_events = t->num_global_ios;
985 * The buffers used for file data are allocated as a single big
986 * malloc, and then each thread and operation takes a piece and uses
987 * that for file data. This lets us do a large shm or bigpages alloc
988 * and without trying to find a special place in each thread to map the
/*
 * Allocate the single big io-buffer pool (malloc, sysv shm, or a /dev/shm
 * mmap depending on use_shm) and page-align it into aligned_buffer.
 *
 * NOTE(review): sampled extraction -- leading numerals are artifacts and
 * the error-handling lines, locals and several branches are missing.
 */
991 int setup_shared_mem(int num_threads, int num_files, int depth,
992 int reclen, int max_io_submit)
/* pad each record up to the alignment granularity */
997 padded_reclen = (reclen + page_size_mask) / (page_size_mask+1);
998 padded_reclen = padded_reclen * (page_size_mask+1);
999 total_ram = num_files * depth * padded_reclen + num_threads;
/* presumably one extra record for verify_buf -- the guarding condition
 * is missing from this view */
1001 total_ram += padded_reclen;
1003 if (use_shm == USE_MALLOC) {
1004 p = malloc(total_ram + page_size_mask);
1005 } else if (use_shm == USE_SHM) {
1006 shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
1012 p = shmat(shm_id, (char *)0x50000000, 0);
1013 if ((long)p == -1) {
1017 /* won't really be dropped until we shmdt */
1019 } else if (use_shm == USE_SHMFS) {
1020 char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */
1023 strcpy(mmap_name, "/dev/shm/XXXXXX");
1024 fd = mkstemp(mmap_name);
1030 ftruncate(fd, total_ram);
1032 p = mmap((char *)0x50000000, total_ram,
1033 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1035 if (p == MAP_FAILED) {
1041 fprintf(stderr, "unable to allocate buffers\n");
1044 unaligned_buffer = p;
/* round up to the next aligned boundary for raw/O_DIRECT io */
1045 p = (char*)((intptr_t) (p + page_size_mask) & ~page_size_mask);
/* error path: release whatever was allocated */
1051 if (unaligned_buffer)
1052 free(unaligned_buffer);
1057 * runs through all the thread_info structs and calculates a combined
1060 void global_thread_throughput(struct thread_info *t, char *this_stage) {
1062 double runtime = time_since_now(&global_stage_start_time);
1063 double total_mb = 0;
1064 double min_trans = 0;
1066 for (i = 0 ; i < num_threads ; i++) {
1067 total_mb += global_thread_info[i].stage_mb_trans;
1068 if (!min_trans || t->stage_mb_trans < min_trans)
1069 min_trans = t->stage_mb_trans;
1072 fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
1073 total_mb / runtime);
1074 fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
1076 fprintf(stderr, " min transfer %.2fMB", min_trans);
1077 fprintf(stderr, "\n");
1082 /* this is the meat of the state machine. There is a list of
1083 * active operations structs, and as each one finishes the required
1084 * io it is moved to a list of finished operations. Once they have
1085 * all finished whatever stage they were in, they are given the chance
1086 * to restart and pick a different stage (read/write/random read etc)
1088 * various timings are printed in between the stages, along with
1089 * thread synchronization if there are more than one threads.
/*
 * Stage driver for one worker thread: submit all active opers through aio,
 * wait for completion, print per-stage stats, then restart opers on their
 * next rw stage until every stage is done.
 *
 * NOTE(review): sampled extraction -- leading numerals are artifacts and
 * many original lines (locals, braces, the fsync stage, loop bodies) are
 * missing, so the control flow below is fragmentary.
 */
1091 int worker(struct thread_info *t)
1093 struct io_oper *oper;
1094 char *this_stage = NULL;
1095 struct timeval stage_time;
/* one io context per thread, sized for up to 512 in-flight requests */
1100 aio_setup(&t->io_ctx, 512);
/* barrier: wait until every thread is ready before timing the stage */
1103 if (num_threads > 1) {
1104 pthread_mutex_lock(&stage_mutex);
1106 if (threads_starting == num_threads) {
1108 gettimeofday(&global_stage_start_time, NULL);
1109 pthread_cond_broadcast(&stage_cond);
1111 while (threads_starting != num_threads)
1112 pthread_cond_wait(&stage_cond, &stage_mutex);
1113 pthread_mutex_unlock(&stage_mutex);
1115 if (t->active_opers) {
1116 this_stage = stage_name(t->active_opers->rw);
1117 gettimeofday(&stage_time, NULL);
1118 t->stage_mb_trans = 0;
1122 /* first we send everything through aio */
1123 while(t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
/* stonewall: some thread finished its stage, cut our remaining ios short */
1124 if (stonewall && threads_ending) {
1125 oper = t->active_opers;
1126 oper->stonewalled = 1;
1127 oper_list_del(oper, &t->active_opers);
1128 oper_list_add(oper, &t->finished_opers);
1130 run_active_list(t, io_iter, max_io_submit);
1137 if (completion_latency_stats)
1138 print_completion_latency(t);
1140 /* then we wait for all the operations to finish */
1141 oper = t->finished_opers;
1145 io_oper_wait(t, oper);
1147 } while(oper != t->finished_opers);
1149 /* then we do an fsync to get the timing for any future operations
1150 * right, and check to see if any of these need to get restarted
1152 oper = t->finished_opers;
1156 t->stage_mb_trans += oper_mb_trans(oper);
/* nonzero restart_oper() means the oper moves on to its next rw stage */
1157 if (restart_oper(oper)) {
1158 oper_list_del(oper, &t->finished_opers);
1159 oper_list_add(oper, &t->active_opers);
1160 oper = t->finished_opers;
1164 if (oper == t->finished_opers)
/* per-thread throughput summary for the stage just completed */
1168 if (t->stage_mb_trans && t->num_files > 0) {
1169 double seconds = time_since_now(&stage_time);
1170 fprintf(stderr, "thread %llu %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
1171 (unsigned long long)(t - global_thread_info), this_stage,
1172 t->stage_mb_trans/seconds, t->stage_mb_trans, seconds);
/* barrier: wait for every thread to end the stage; the last one prints
 * the combined throughput */
1175 if (num_threads > 1) {
1176 pthread_mutex_lock(&stage_mutex);
1178 if (threads_ending == num_threads) {
1179 threads_starting = 0;
1180 pthread_cond_broadcast(&stage_cond);
1181 global_thread_throughput(t, this_stage);
1183 while(threads_ending != num_threads)
1184 pthread_cond_wait(&stage_cond, &stage_mutex);
1185 pthread_mutex_unlock(&stage_mutex);
1188 /* someone got restarted, go back to the beginning */
1189 if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1194 /* finally, free all the ram */
1195 while(t->finished_opers) {
1196 oper = t->finished_opers;
1197 oper_list_del(oper, &t->finished_opers);
1198 status = finish_oper(t, oper);
1201 if (t->num_global_pending) {
1202 fprintf(stderr, "global num pending is %d\n", t->num_global_pending);
1204 io_queue_release(t->io_ctx);
1209 typedef void * (*start_routine)(void *);
1210 int run_workers(struct thread_info *t, int num_threads)
1215 for(i = 0 ; i < num_threads ; i++) {
1216 ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i);
1218 perror("pthread_create");
1222 for(i = 0 ; i < num_threads ; i++) {
1223 ret = pthread_join(t[i].tid, NULL);
1225 perror("pthread_join");
/*
 * parse a size argument with an optional k/m/g (or K/M/G) suffix; the
 * suffix overrides the caller-supplied default multiplier.  NOTE: the
 * suffix character is stripped from size_arg in place.
 */
off_t parse_size(char *size_arg, off_t mult) {
	char c;
	int num;
	off_t ret;

	c = size_arg[strlen(size_arg) - 1];
	if (c > '9') {
		/* drop the suffix so atoi sees only digits */
		size_arg[strlen(size_arg) - 1] = '\0';
	}
	num = atoi(size_arg);
	switch(c) {
	case 'g':
	case 'G':
		mult = 1024 * 1024 * 1024;
		break;
	case 'm':
	case 'M':
		mult = 1024 * 1024;
		break;
	case 'k':
	case 'K':
		mult = 1024;
		break;
	}
	ret = mult * num;
	return ret;
}
1263 void print_usage(void) {
1264 printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
1265 printf(" [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
1266 printf(" file1 [file2 ...]\n");
1267 printf("\t-a size in KB at which to align buffers\n");
1268 printf("\t-b max number of iocbs to give io_submit at once\n");
1269 printf("\t-c number of io contexts per file\n");
1270 printf("\t-C offset between contexts, default 2MB\n");
1271 printf("\t-s size in MB of the test file(s), default 1024MB\n");
1272 printf("\t-r record size in KB used for each io, default 64KB\n");
1273 printf("\t-d number of pending aio requests for each file, default 64\n");
1274 printf("\t-i number of ios per file sent before switching\n\t to the next file, default 8\n");
1275 printf("\t-I total number of ayncs IOs the program will run, default is run until Cntl-C\n");
1276 printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
1277 printf("\t-S Use O_SYNC for writes\n");
1278 printf("\t-o add an operation to the list: write=0, read=1,\n");
1279 printf("\t random write=2, random read=3.\n");
1280 printf("\t repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
1281 printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
1282 printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
1283 printf("\t-n no fsyncs between write stage and read stage\n");
1284 printf("\t-l print io_submit latencies after each stage\n");
1285 printf("\t-L print io completion latencies after each stage\n");
1286 printf("\t-t number of threads to run\n");
1287 printf("\t-u unlink files after completion\n");
1288 printf("\t-v verification of bytes written\n");
1289 printf("\t-x turn off thread stonewalling\n");
1290 printf("\t-h this message\n");
1291 printf("\n\t the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
1292 printf("\t translate to 400KB, 400MB and 400GB\n");
1293 printf("version %s\n", PROG_VERSION);
1296 int main(int ac, char **av)
1303 off_t file_size = 1 * 1024 * 1024 * 1024;
1304 int first_stage = WRITE;
1305 struct io_oper *oper;
1309 struct thread_info *t;
1311 page_size_mask = getpagesize() - 1;
1314 c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu");
1320 page_size_mask = parse_size(optarg, 1024);
1324 num_contexts = atoi(optarg);
1327 context_offset = parse_size(optarg, 1024 * 1024);
1329 max_io_submit = atoi(optarg);
1332 file_size = parse_size(optarg, 1024 * 1024);
1335 depth = atoi(optarg);
1338 rec_len = parse_size(optarg, 1024);
1341 io_iter = atoi(optarg);
1344 iterations = atoi(optarg);
1353 completion_latency_stats = 1;
1356 if (!strcmp(optarg, "shm")) {
1357 fprintf(stderr, "using ipc shm\n");
1359 } else if (!strcmp(optarg, "shmfs")) {
1360 fprintf(stderr, "using /dev/shm for buffers\n");
1361 use_shm = USE_SHMFS;
1367 fprintf(stderr, "adding stage %s\n", stage_name(i));
1370 o_direct = O_DIRECT;
1376 num_threads = atoi(optarg);
1395 * make sure we don't try to submit more ios than we have allocated
1398 if (depth < io_iter) {
1400 fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1408 num_files = ac - optind;
1410 if (num_threads > (num_files * num_contexts)) {
1411 num_threads = num_files * num_contexts;
1412 fprintf(stderr, "dropping thread count to the number of contexts %d\n",
1416 t = calloc(num_threads, sizeof(*t));
1421 global_thread_info = t;
1423 /* by default, allow a huge number of iocbs to be sent towards
1427 max_io_submit = num_files * io_iter * num_contexts;
1430 * make sure we don't try to submit more ios than max_io_submit allows
1432 if (max_io_submit < io_iter) {
1433 io_iter = max_io_submit;
1434 fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1438 stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
1440 for (i = 0 ; i < LAST_STAGE; i++) {
1441 if (stages & (1 << i)) {
1443 fprintf(stderr, "starting with %s\n", stage_name(i));
1449 if (file_size < num_contexts * context_offset) {
1450 fprintf(stderr, "file size %Lu too small for %d contexts\n",
1451 (unsigned long long)file_size, num_contexts);
1455 fprintf(stderr, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n",
1456 (unsigned long long)file_size / (1024 * 1024),
1457 rec_len / 1024, depth, io_iter);
1458 fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n",
1459 max_io_submit, (page_size_mask + 1)/1024);
1460 fprintf(stderr, "threads %d files %d contexts %d context offset %LuMB verification %s\n",
1461 num_threads, num_files, num_contexts,
1462 (unsigned long long)context_offset / (1024 * 1024),
1463 verify ? "on" : "off");
1464 /* open all the files and do any required setup for them */
1465 for (i = optind ; i < ac ; i++) {
1467 for (j = 0 ; j < num_contexts ; j++) {
1468 thread_index = open_fds % num_threads;
1471 rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600);
1474 oper = create_oper(rwfd, first_stage, j * context_offset,
1475 file_size - j * context_offset, rec_len,
1476 depth, io_iter, av[i]);
1478 fprintf(stderr, "error in create_oper\n");
1481 oper_list_add(oper, &t[thread_index].active_opers);
1482 t[thread_index].num_files++;
1485 if (setup_shared_mem(num_threads, num_files * num_contexts,
1486 depth, rec_len, max_io_submit))
1490 for (i = 0 ; i < num_threads ; i++) {
1491 if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit))
1494 if (num_threads > 1){
1495 printf("Running multi thread version num_threads:%d\n", num_threads);
1496 run_workers(t, num_threads);
1498 printf("Running single thread version \n");
1502 for (i = optind ; i < ac ; i++) {
1503 printf("Cleaning up file %s \n", av[i]);