IOR
ior.c
Go to the documentation of this file.
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  */
4 /******************************************************************************\
5 * *
6 * Copyright (c) 2003, The Regents of the University of California *
7 * See the file COPYRIGHT for a complete copyright notice and license. *
8 * *
9 \******************************************************************************/
10 
11 #ifdef HAVE_CONFIG_H
12 # include "config.h"
13 #endif
14 
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <unistd.h>
18 #include <ctype.h> /* tolower() */
19 #include <errno.h>
20 #include <math.h>
21 #include <mpi.h>
22 #include <string.h>
23 
24 #if defined(HAVE_STRINGS_H)
25 #include <strings.h>
26 #endif
27 
28 #include <sys/stat.h> /* struct stat */
29 #include <time.h>
30 
31 #ifndef _WIN32
32 # include <sys/time.h> /* gettimeofday() */
33 # include <sys/utsname.h> /* uname() */
34 #endif
35 
36 #include <assert.h>
37 
38 #include "ior.h"
39 #include "ior-internal.h"
40 #include "aiori.h"
41 #include "utilities.h"
42 #include "parse_options.h"
43 
44 #define IOR_NB_TIMERS 6
45 
46 /* file scope globals */
47 extern char **environ;
48 static int totalErrorCount;
49 static const ior_aiori_t *backend;
50 
51 static void DestroyTests(IOR_test_t *tests_head);
52 static char *PrependDir(IOR_param_t *, char *);
53 static char **ParseFileName(char *, int *);
54 static void InitTests(IOR_test_t * , MPI_Comm);
55 static void TestIoSys(IOR_test_t *);
56 static void ValidateTests(IOR_param_t *);
58  void *fd, const int access,
59  IOR_io_buffers *ioBuffers);
60 
61 IOR_test_t * ior_run(int argc, char **argv, MPI_Comm world_com, FILE * world_out){
62  IOR_test_t *tests_head;
63  IOR_test_t *tptr;
64  out_logfile = world_out;
65  out_resultfile = world_out;
66  mpi_comm_world = world_com;
67 
68  MPI_CHECK(MPI_Comm_rank(mpi_comm_world, &rank), "cannot get rank");
69 
70  /* setup tests, and validate parameters */
71  tests_head = ParseCommandLine(argc, argv);
72  InitTests(tests_head, world_com);
73  verbose = tests_head->params.verbose;
74 
75  PrintHeader(argc, argv);
76 
77  /* perform each test */
78  for (tptr = tests_head; tptr != NULL; tptr = tptr->next) {
79  aiori_initialize(tptr);
80  totalErrorCount = 0;
81  verbose = tptr->params.verbose;
82  backend = tptr->params.backend;
83  if (rank == 0 && verbose >= VERBOSE_0) {
84  ShowTestStart(&tptr->params);
85  }
86  TestIoSys(tptr);
88  ShowTestEnd(tptr);
89  aiori_finalize(tptr);
90  }
91 
92  PrintLongSummaryAllTests(tests_head);
93 
94  /* display finish time */
95  PrintTestEnds();
96  return tests_head;
97 }
98 
99 
100 
101 int ior_main(int argc, char **argv)
102 {
103  IOR_test_t *tests_head;
104  IOR_test_t *tptr;
105 
106  out_logfile = stdout;
107  out_resultfile = stdout;
108 
109  /*
110  * check -h option from commandline without starting MPI;
111  */
112  tests_head = ParseCommandLine(argc, argv);
113 
114  /* start the MPI code */
115  MPI_CHECK(MPI_Init(&argc, &argv), "cannot initialize MPI");
116 
117  mpi_comm_world = MPI_COMM_WORLD;
118  MPI_CHECK(MPI_Comm_rank(mpi_comm_world, &rank), "cannot get rank");
119 
120  /* set error-handling */
121  /*MPI_CHECK(MPI_Errhandler_set(mpi_comm_world, MPI_ERRORS_RETURN),
122  "cannot set errhandler"); */
123 
124  /* setup tests, and validate parameters */
125  InitTests(tests_head, mpi_comm_world);
126  verbose = tests_head->params.verbose;
127 
128  aiori_initialize(tests_head); // this is quite suspicious, likely an error when multiple tests need to be executed with different backends and options
129 
130  PrintHeader(argc, argv);
131 
132  /* perform each test */
133  for (tptr = tests_head; tptr != NULL; tptr = tptr->next) {
134  verbose = tptr->params.verbose;
135  backend = tptr->params.backend;
136  if (rank == 0 && verbose >= VERBOSE_0) {
137  backend = tptr->params.backend;
138  ShowTestStart(&tptr->params);
139  }
140 
141  // This is useful for trapping a running MPI process. While
142  // this is sleeping, run the script 'testing/hdfs/gdb.attach'
143  if (verbose >= VERBOSE_4) {
144  fprintf(out_logfile, "\trank %d: sleeping\n", rank);
145  sleep(5);
146  fprintf(out_logfile, "\trank %d: awake.\n", rank);
147  }
148 
149  TestIoSys(tptr);
150  ShowTestEnd(tptr);
151  }
152 
153  if (verbose < 0)
154  /* always print final summary */
155  verbose = 0;
156  PrintLongSummaryAllTests(tests_head);
157 
158  /* display finish time */
159  PrintTestEnds();
160 
161  aiori_finalize(tests_head);
162 
163  MPI_CHECK(MPI_Finalize(), "cannot finalize MPI");
164 
165  DestroyTests(tests_head);
166 
167  return totalErrorCount;
168 }
169 
170 /***************************** F U N C T I O N S ******************************/
171 
172 /*
173  * Initialize an IOR_param_t structure to the defaults
174  */
176 {
177  const char *default_aiori = aiori_default ();
178  char *hdfs_user;
179 
180  assert (NULL != default_aiori);
181 
182  memset(p, 0, sizeof(IOR_param_t));
183 
186 
187  p->api = strdup(default_aiori);
188  p->platform = strdup("HOST(OSTYPE)");
189  p->testFileName = strdup("testFile");
190 
191  p->writeFile = p->readFile = FALSE;
192  p->checkWrite = p->checkRead = FALSE;
193 
194  /*
195  * These can be overridden from the command-line but otherwise will be
196  * set from MPI.
197  */
198  p->numTasks = -1;
199  p->numNodes = -1;
200  p->numTasksOnNode0 = -1;
201 
202  p->repetitions = 1;
203  p->repCounter = -1;
204  p->open = WRITE;
205  p->taskPerNodeOffset = 1;
206  p->segmentCount = 1;
207  p->blockSize = 1048576;
208  p->transferSize = 262144;
209  p->randomSeed = -1;
210  p->incompressibleSeed = 573;
212  p->setAlignment = 1;
213  p->lustre_start_ost = -1;
214 
215  hdfs_user = getenv("USER");
216  if (!hdfs_user)
217  hdfs_user = "";
218  p->hdfs_user = strdup(hdfs_user);
219  p->hdfs_name_node = "default";
220  p->hdfs_name_node_port = 0; /* ??? */
221  p->hdfs_fs = NULL;
222  p->hdfs_replicas = 0; /* invokes the default */
223  p->hdfs_block_size = 0;
224 
225  p->URI = NULL;
226  p->part_number = 0;
227 
228  p->beegfs_numTargets = -1;
229  p->beegfs_chunkSize = -1;
230 }
231 
232 static void
234  double timerVal,
235  char *timeString, int access, int outlierThreshold)
236 {
237  char accessString[MAX_STR];
238  double sum, mean, sqrDiff, var, sd;
239 
240  /* for local timerVal, don't compensate for wall clock delta */
241  timerVal += wall_clock_delta;
242 
243  MPI_CHECK(MPI_Allreduce
244  (&timerVal, &sum, 1, MPI_DOUBLE, MPI_SUM, testComm),
245  "MPI_Allreduce()");
246  mean = sum / numTasks;
247  sqrDiff = pow((mean - timerVal), 2);
248  MPI_CHECK(MPI_Allreduce
249  (&sqrDiff, &var, 1, MPI_DOUBLE, MPI_SUM, testComm),
250  "MPI_Allreduce()");
251  var = var / numTasks;
252  sd = sqrt(var);
253 
254  if (access == WRITE) {
255  strcpy(accessString, "write");
256  } else { /* READ */
257  strcpy(accessString, "read");
258  }
259  if (fabs(timerVal - mean) > (double)outlierThreshold) {
260  char hostname[MAX_STR];
261  int ret = gethostname(hostname, MAX_STR);
262  if (ret != 0)
263  strcpy(hostname, "unknown");
264 
265  fprintf(out_logfile, "WARNING: for %s, task %d, %s %s is %f\n",
266  hostname, rank, accessString, timeString, timerVal);
267  fprintf(out_logfile, " (mean=%f, stddev=%f)\n", mean, sd);
268  fflush(out_logfile);
269  }
270 }
271 
272 /*
273  * Check for outliers in start/end times and elapsed create/xfer/close times.
274  */
275 static void
276 CheckForOutliers(IOR_param_t *test, const double *timer, const int access)
277 {
278  DisplayOutliers(test->numTasks, timer[0],
279  "start time", access, test->outlierThreshold);
281  timer[1] - timer[0],
282  "elapsed create time", access, test->outlierThreshold);
284  timer[3] - timer[2],
285  "elapsed transfer time", access,
286  test->outlierThreshold);
288  timer[5] - timer[4],
289  "elapsed close time", access, test->outlierThreshold);
290  DisplayOutliers(test->numTasks, timer[5], "end time",
291  access, test->outlierThreshold);
292 }
293 
294 /*
295  * Check if actual file size equals expected size; if not use actual for
296  * calculating performance rate.
297  */
298 static void CheckFileSize(IOR_test_t *test, IOR_offset_t dataMoved, int rep,
299  const int access)
300 {
301  IOR_param_t *params = &test->params;
302  IOR_results_t *results = test->results;
303  IOR_point_t *point = (access == WRITE) ? &results[rep].write :
304  &results[rep].read;
305 
306  MPI_CHECK(MPI_Allreduce(&dataMoved, &point->aggFileSizeFromXfer,
307  1, MPI_LONG_LONG_INT, MPI_SUM, testComm),
308  "cannot total data moved");
309 
310  if (strcasecmp(params->api, "HDF5") != 0 && strcasecmp(params->api, "NCMPI") != 0 &&
311  strcasecmp(params->api, "DAOS") != 0) {
312  if (verbose >= VERBOSE_0 && rank == 0) {
313  if ((params->expectedAggFileSize
314  != point->aggFileSizeFromXfer)
315  || (point->aggFileSizeFromStat
316  != point->aggFileSizeFromXfer)) {
317  fprintf(out_logfile,
318  "WARNING: Expected aggregate file size = %lld.\n",
319  (long long) params->expectedAggFileSize);
320  fprintf(out_logfile,
321  "WARNING: Stat() of aggregate file size = %lld.\n",
322  (long long) point->aggFileSizeFromStat);
323  fprintf(out_logfile,
324  "WARNING: Using actual aggregate bytes moved = %lld.\n",
325  (long long) point->aggFileSizeFromXfer);
326  if(params->deadlineForStonewalling){
327  fprintf(out_logfile,
328  "WARNING: maybe caused by deadlineForStonewalling\n");
329  }
330  }
331  }
332  }
333 
334  point->aggFileSizeForBW = point->aggFileSizeFromXfer;
335 }
336 
337 /*
338  * Compare buffers after reading/writing each transfer. Displays only first
339  * difference in buffers and returns total errors counted.
340  */
341 static size_t
342 CompareBuffers(void *expectedBuffer,
343  void *unknownBuffer,
344  size_t size,
345  IOR_offset_t transferCount, IOR_param_t *test, int access)
346 {
347  char testFileName[MAX_PATHLEN];
348  char bufferLabel1[MAX_STR];
349  char bufferLabel2[MAX_STR];
350  size_t i, j, length, first, last;
351  size_t errorCount = 0;
352  int inError = 0;
353  unsigned long long *goodbuf = (unsigned long long *)expectedBuffer;
354  unsigned long long *testbuf = (unsigned long long *)unknownBuffer;
355 
356  if (access == WRITECHECK || access == READCHECK) {
357  strcpy(bufferLabel1, "Expected: ");
358  strcpy(bufferLabel2, "Actual: ");
359  } else {
360  ERR("incorrect argument for CompareBuffers()");
361  }
362 
363  length = size / sizeof(IOR_size_t);
364  first = -1;
365  if (verbose >= VERBOSE_3) {
366  fprintf(out_logfile,
367  "[%d] At file byte offset %lld, comparing %llu-byte transfer\n",
368  rank, test->offset, (long long)size);
369  }
370  for (i = 0; i < length; i++) {
371  if (testbuf[i] != goodbuf[i]) {
372  errorCount++;
373  if (verbose >= VERBOSE_2) {
374  fprintf(out_logfile,
375  "[%d] At transfer buffer #%lld, index #%lld (file byte offset %lld):\n",
376  rank, transferCount - 1, (long long)i,
377  test->offset +
378  (IOR_size_t) (i * sizeof(IOR_size_t)));
379  fprintf(out_logfile, "[%d] %s0x", rank, bufferLabel1);
380  fprintf(out_logfile, "%016llx\n", goodbuf[i]);
381  fprintf(out_logfile, "[%d] %s0x", rank, bufferLabel2);
382  fprintf(out_logfile, "%016llx\n", testbuf[i]);
383  }
384  if (!inError) {
385  inError = 1;
386  first = i;
387  last = i;
388  } else {
389  last = i;
390  }
391  } else if (verbose >= VERBOSE_5 && i % 4 == 0) {
392  fprintf(out_logfile,
393  "[%d] PASSED offset = %lld bytes, transfer %lld\n",
394  rank,
395  ((i * sizeof(unsigned long long)) +
396  test->offset), transferCount);
397  fprintf(out_logfile, "[%d] GOOD %s0x", rank, bufferLabel1);
398  for (j = 0; j < 4; j++)
399  fprintf(out_logfile, "%016llx ", goodbuf[i + j]);
400  fprintf(out_logfile, "\n[%d] GOOD %s0x", rank, bufferLabel2);
401  for (j = 0; j < 4; j++)
402  fprintf(out_logfile, "%016llx ", testbuf[i + j]);
403  fprintf(out_logfile, "\n");
404  }
405  }
406  if (inError) {
407  inError = 0;
408  GetTestFileName(testFileName, test);
409  fprintf(out_logfile,
410  "[%d] FAILED comparison of buffer containing %d-byte ints:\n",
411  rank, (int)sizeof(unsigned long long int));
412  fprintf(out_logfile, "[%d] File name = %s\n", rank, testFileName);
413  fprintf(out_logfile, "[%d] In transfer %lld, ", rank,
414  transferCount);
415  fprintf(out_logfile,
416  "%lld errors between buffer indices %lld and %lld.\n",
417  (long long)errorCount, (long long)first,
418  (long long)last);
419  fprintf(out_logfile, "[%d] File byte offset = %lld:\n", rank,
420  ((first * sizeof(unsigned long long)) + test->offset));
421 
422  fprintf(out_logfile, "[%d] %s0x", rank, bufferLabel1);
423  for (j = first; j < length && j < first + 4; j++)
424  fprintf(out_logfile, "%016llx ", goodbuf[j]);
425  if (j == length)
426  fprintf(out_logfile, "[end of buffer]");
427  fprintf(out_logfile, "\n[%d] %s0x", rank, bufferLabel2);
428  for (j = first; j < length && j < first + 4; j++)
429  fprintf(out_logfile, "%016llx ", testbuf[j]);
430  if (j == length)
431  fprintf(out_logfile, "[end of buffer]");
432  fprintf(out_logfile, "\n");
433  if (test->quitOnError == TRUE)
434  ERR("data check error, aborting execution");
435  }
436  return (errorCount);
437 }
438 
439 /*
440  * Count all errors across all tasks; report errors found.
441  */
442 static int CountErrors(IOR_param_t * test, int access, int errors)
443 {
444  int allErrors = 0;
445 
446  if (test->checkWrite || test->checkRead) {
447  MPI_CHECK(MPI_Reduce(&errors, &allErrors, 1, MPI_INT, MPI_SUM,
448  0, testComm), "cannot reduce errors");
449  MPI_CHECK(MPI_Bcast(&allErrors, 1, MPI_INT, 0, testComm),
450  "cannot broadcast allErrors value");
451  if (allErrors != 0) {
452  totalErrorCount += allErrors;
453  test->errorFound = TRUE;
454  }
455  if (rank == 0 && allErrors != 0) {
456  if (allErrors < 0) {
457  WARN("overflow in errors counted");
458  allErrors = -1;
459  }
460  fprintf(out_logfile, "WARNING: incorrect data on %s (%d errors found).\n",
461  access == WRITECHECK ? "write" : "read", allErrors);
462  fprintf(out_logfile,
463  "Used Time Stamp %u (0x%x) for Data Signature\n",
466  }
467  }
468  return (allErrors);
469 }
470 
/*
 * Allocate a page-aligned (required by O_DIRECT) buffer of 'size' bytes.
 *
 * The block obtained from malloc() is larger than requested: it leaves room
 * for the alignment padding and for one pointer stored directly in front of
 * the returned address.  That pointer holds the address malloc() returned,
 * so the buffer must be released with aligned_buffer_free(), not free().
 *
 * ERR() is invoked when malloc() fails.
 */
static void *aligned_buffer_alloc(size_t size)
{
#ifdef HAVE_SYSCONF
        long pageSize = sysconf(_SC_PAGESIZE);
#else
        size_t pageSize = getpagesize();
#endif
        size_t pageMask = pageSize - 1;
        size_t misalignment;
        char *raw;
        char *payload;
        char *aligned;

        raw = malloc(size + pageSize + sizeof(void *));
        if (raw == NULL)
                ERR("out of memory");

        /* skip the space reserved for the back-pointer, then round up */
        payload = raw + sizeof(char *);
        misalignment = (size_t) payload & pageMask;
        aligned = payload + pageSize - misalignment;

        /* remember the malloc()ed address in the bytes preceding "aligned" */
        *(void **)(aligned - sizeof(void *)) = raw;

        return (void *)aligned;
}
500 
/*
 * Free a buffer allocated by aligned_buffer_alloc().
 *
 * The pointer stored in the bytes directly preceding 'buf' is read and
 * handed to free().
 */
static void aligned_buffer_free(void *buf)
{
        char *slot = (char *)buf - sizeof(char *);
        void *original = *(void **)slot;

        free(original);
}
508 
510 {
511  int reps;
512  if (test->results != NULL)
513  return;
514 
515  reps = test->params.repetitions;
516  test->results = (IOR_results_t *) safeMalloc(sizeof(IOR_results_t) * reps);
517 }
518 
520 {
521  if (test->results != NULL) {
522  free(test->results);
523  }
524 }
525 
526 
530 IOR_test_t *CreateTest(IOR_param_t *init_params, int test_num)
531 {
532  IOR_test_t *newTest = NULL;
533 
534  newTest = (IOR_test_t *) malloc(sizeof(IOR_test_t));
535  if (newTest == NULL)
536  ERR("malloc() of IOR_test_t failed");
537  newTest->params = *init_params;
538  newTest->params.platform = GetPlatformName();
539  newTest->params.id = test_num;
540  newTest->next = NULL;
541  newTest->results = NULL;
542 
543  return newTest;
544 }
545 
546 static void DestroyTest(IOR_test_t *test)
547 {
548  FreeResults(test);
549  free(test);
550 }
551 
552 static void DestroyTests(IOR_test_t *tests_head)
553 {
554  IOR_test_t *tptr, *next;
555 
556  for (tptr = tests_head; tptr != NULL; tptr = next) {
557  next = tptr->next;
558  DestroyTest(tptr);
559  }
560 }
561 
562 /*
563  * Distribute IOR_HINTs to all tasks' environments.
564  */
565 void DistributeHints(void)
566 {
567  char hint[MAX_HINTS][MAX_STR], fullHint[MAX_STR], hintVariable[MAX_STR];
568  int hintCount = 0, i;
569 
570  if (rank == 0) {
571  for (i = 0; environ[i] != NULL; i++) {
572  if (strncmp(environ[i], "IOR_HINT", strlen("IOR_HINT"))
573  == 0) {
574  hintCount++;
575  if (hintCount == MAX_HINTS) {
576  WARN("exceeded max hints; reset MAX_HINTS and recompile");
577  hintCount = MAX_HINTS;
578  break;
579  }
580  /* assume no IOR_HINT is greater than MAX_STR in length */
581  strncpy(hint[hintCount - 1], environ[i],
582  MAX_STR - 1);
583  }
584  }
585  }
586 
587  MPI_CHECK(MPI_Bcast(&hintCount, sizeof(hintCount), MPI_BYTE,
588  0, MPI_COMM_WORLD), "cannot broadcast hints");
589  for (i = 0; i < hintCount; i++) {
590  MPI_CHECK(MPI_Bcast(&hint[i], MAX_STR, MPI_BYTE,
591  0, MPI_COMM_WORLD),
592  "cannot broadcast hints");
593  strcpy(fullHint, hint[i]);
594  strcpy(hintVariable, strtok(fullHint, "="));
595  if (getenv(hintVariable) == NULL) {
596  /* doesn't exist in this task's environment; better set it */
597  if (putenv(hint[i]) != 0)
598  WARN("cannot set environment variable");
599  }
600  }
601 }
602 
603 /*
604  * Fill buffer, which is transfer size bytes long, with known 8-byte long long
605  * int values. In even-numbered 8-byte long long ints, store MPI task in high
606  * bits and timestamp signature in low bits. In odd-numbered 8-byte long long
607  * ints, store transfer offset. If storeFileOffset option is used, the file
608  * (not transfer) offset is stored instead.
609  */
610 
611 static void
613 
614 {
615  size_t i;
616  unsigned long long hi, lo;
617  unsigned long long *buf = (unsigned long long *)buffer;
618 
619  for (i = 0; i < test->transferSize / sizeof(unsigned long long); i++) {
620  hi = ((unsigned long long) rand_r(&test->incompressibleSeed) << 32);
621  lo = (unsigned long long) rand_r(&test->incompressibleSeed);
622  buf[i] = hi | lo;
623  }
624 }
625 
627 
628 static void
629 FillBuffer(void *buffer,
630  IOR_param_t * test, unsigned long long offset, int fillrank)
631 {
632  size_t i;
633  unsigned long long hi, lo;
634  unsigned long long *buf = (unsigned long long *)buffer;
635 
636  if(test->dataPacketType == incompressible ) { /* Make for some non compressable buffers with randomish data */
637 
638  /* In order for write checks to work, we have to restart the psuedo random sequence */
640  test->incompressibleSeed = test->setTimeStampSignature + rank; /* We copied seed into timestampSignature at initialization, also add the rank to add randomness between processes */
642  }
643  FillIncompressibleBuffer(buffer, test);
644  }
645 
646  else {
647  hi = ((unsigned long long)fillrank) << 32;
648  lo = (unsigned long long)test->timeStampSignatureValue;
649  for (i = 0; i < test->transferSize / sizeof(unsigned long long); i++) {
650  if ((i % 2) == 0) {
651  /* evens contain MPI rank and time in seconds */
652  buf[i] = hi | lo;
653  } else {
654  /* odds contain offset */
655  buf[i] = offset + (i * sizeof(unsigned long long));
656  }
657  }
658  }
659 }
660 
661 /*
662  * Return string describing machine name and type.
663  */
665 {
666  char nodeName[MAX_STR], *p, *start, sysName[MAX_STR];
667  char platformName[MAX_STR];
668  struct utsname name;
669 
670  if (uname(&name) != 0) {
671  EWARN("cannot get platform name");
672  sprintf(sysName, "%s", "Unknown");
673  sprintf(nodeName, "%s", "Unknown");
674  } else {
675  sprintf(sysName, "%s", name.sysname);
676  sprintf(nodeName, "%s", name.nodename);
677  }
678 
679  start = nodeName;
680  if (strlen(nodeName) == 0) {
681  p = start;
682  } else {
683  /* point to one character back from '\0' */
684  p = start + strlen(nodeName) - 1;
685  }
686  /*
687  * to cut off trailing node number, search backwards
688  * for the first non-numeric character
689  */
690  while (p != start) {
691  if (*p < '0' || *p > '9') {
692  *(p + 1) = '\0';
693  break;
694  } else {
695  p--;
696  }
697  }
698 
699  sprintf(platformName, "%s(%s)", nodeName, sysName);
700  return strdup(platformName);
701 }
702 
703 
704 
705 /*
706  * Parse file name.
707  */
708 static char **ParseFileName(char *name, int *count)
709 {
710  char **fileNames, *tmp, *token;
711  char delimiterString[3] = { FILENAME_DELIMITER, '\n', '\0' };
712  int i = 0;
713 
714  *count = 0;
715  tmp = name;
716 
717  /* pass one */
718  /* if something there, count the first item */
719  if (*tmp != '\0') {
720  (*count)++;
721  }
722  /* count the rest of the filenames */
723  while (*tmp != '\0') {
724  if (*tmp == FILENAME_DELIMITER) {
725  (*count)++;
726  }
727  tmp++;
728  }
729 
730  fileNames = (char **)malloc((*count) * sizeof(char **));
731  if (fileNames == NULL)
732  ERR("out of memory");
733 
734  /* pass two */
735  token = strtok(name, delimiterString);
736  while (token != NULL) {
737  fileNames[i] = token;
738  token = strtok(NULL, delimiterString);
739  i++;
740  }
741  return (fileNames);
742 }
743 
744 
745 /*
746  * Return test file name to access.
747  * for single shared file, fileNames[0] is returned in testFileName
748  */
749 void GetTestFileName(char *testFileName, IOR_param_t * test)
750 {
751  char **fileNames;
752  char initialTestFileName[MAX_PATHLEN];
753  char testFileNameRoot[MAX_STR];
754  char tmpString[MAX_STR];
755  int count;
756 
757  /* parse filename for multiple file systems */
758  strcpy(initialTestFileName, test->testFileName);
759  fileNames = ParseFileName(initialTestFileName, &count);
760  if (count > 1 && test->uniqueDir == TRUE)
761  ERR("cannot use multiple file names with unique directories");
762  if (test->filePerProc) {
763  strcpy(testFileNameRoot,
764  fileNames[((rank +
765  rankOffset) % test->numTasks) % count]);
766  } else {
767  strcpy(testFileNameRoot, fileNames[0]);
768  }
769 
770  /* give unique name if using multiple files */
771  if (test->filePerProc) {
772  /*
773  * prepend rank subdirectory before filename
774  * e.g., /dir/file => /dir/<rank>/file
775  */
776  if (test->uniqueDir == TRUE) {
777  strcpy(testFileNameRoot,
778  PrependDir(test, testFileNameRoot));
779  }
780  sprintf(testFileName, "%s.%08d", testFileNameRoot,
781  (rank + rankOffset) % test->numTasks);
782  } else {
783  strcpy(testFileName, testFileNameRoot);
784  }
785 
786  /* add suffix for multiple files */
787  if (test->repCounter > -1) {
788  sprintf(tmpString, ".%d", test->repCounter);
789  strcat(testFileName, tmpString);
790  }
791  free (fileNames);
792 }
793 
794 /*
795  * From absolute directory, insert rank as subdirectory. Allows each task
796  * to write to its own directory. E.g., /dir/file => /dir/<rank>/file.
797  */
798 static char *PrependDir(IOR_param_t * test, char *rootDir)
799 {
800  char *dir;
801  char *fname;
802  int i;
803 
804  dir = (char *)malloc(MAX_STR + 1);
805  if (dir == NULL)
806  ERR("out of memory");
807 
808  /* get dir name */
809  strcpy(dir, rootDir);
810  i = strlen(dir) - 1;
811  while (i > 0) {
812  if (dir[i] == '\0' || dir[i] == '/') {
813  dir[i] = '/';
814  dir[i + 1] = '\0';
815  break;
816  }
817  i--;
818  }
819 
820  /* get file name */
821  fname = rootDir + i + 1;
822 
823  /* create directory with rank as subdirectory */
824  sprintf(dir + i + 1, "%d", (rank + rankOffset) % test->numTasks);
825 
826  /* dir doesn't exist, so create */
827  if (backend->access(dir, F_OK, test) != 0) {
828  if (backend->mkdir(dir, S_IRWXU, test) < 0) {
829  ERRF("cannot create directory: %s", dir);
830  }
831 
832  /* check if correct permissions */
833  } else if (backend->access(dir, R_OK, test) != 0 ||
834  backend->access(dir, W_OK, test) != 0 ||
835  backend->access(dir, X_OK, test) != 0) {
836  ERRF("invalid directory permissions: %s", dir);
837  }
838 
839  /* concatenate dir and file names */
840  strcat(dir, "/");
841  strcat(dir, fname);
842 
843  return dir;
844 }
845 
846 /******************************************************************************/
847 /*
848  * Reduce test results, and show if verbose set.
849  */
850 static void
851 ReduceIterResults(IOR_test_t *test, double *timer, const int rep, const int access)
852 {
853  double reduced[IOR_NB_TIMERS] = { 0 };
854  double diff[IOR_NB_TIMERS / 2 + 1];
855  double totalTime, accessTime;
856  IOR_param_t *params = &test->params;
857  double bw, iops, latency, minlatency;
858  int i;
859  MPI_Op op;
860 
861  assert(access == WRITE || access == READ);
862 
863  /* Find the minimum start time of the even numbered timers, and the
864  maximum finish time for the odd numbered timers */
865  for (i = 0; i < IOR_NB_TIMERS; i++) {
866  op = i % 2 ? MPI_MAX : MPI_MIN;
867  MPI_CHECK(MPI_Reduce(&timer[i], &reduced[i], 1, MPI_DOUBLE,
868  op, 0, testComm), "MPI_Reduce()");
869  }
870 
871  /* Calculate elapsed times and throughput numbers */
872  for (i = 0; i < IOR_NB_TIMERS / 2; i++)
873  diff[i] = reduced[2 * i + 1] - reduced[2 * i];
874 
875  totalTime = reduced[5] - reduced[0];
876  accessTime = reduced[3] - reduced[2];
877 
878  IOR_point_t *point = (access == WRITE) ? &test->results[rep].write :
879  &test->results[rep].read;
880 
881  point->time = totalTime;
882 
883  if (verbose < VERBOSE_0)
884  return;
885 
886  bw = (double)point->aggFileSizeForBW / totalTime;
887 
888  /* For IOPS in this iteration, we divide the total amount of IOs from
889  * all ranks over the entire access time (first start -> last end). */
890  iops = (point->aggFileSizeForBW / params->transferSize) / accessTime;
891 
892  /* For Latency, we divide the total access time for each task over the
893  * number of I/Os issued from that task; then reduce and display the
894  * minimum (best) latency achieved. So what is reported is the average
895  * latency of all ops from a single task, then taking the minimum of
896  * that between all tasks. */
897  latency = (timer[3] - timer[2]) / (params->blockSize / params->transferSize);
898  MPI_CHECK(MPI_Reduce(&latency, &minlatency, 1, MPI_DOUBLE,
899  MPI_MIN, 0, testComm), "MPI_Reduce()");
900 
901  /* Only rank 0 tallies and prints the results. */
902  if (rank != 0)
903  return;
904 
905  PrintReducedResult(test, access, bw, iops, latency, diff, totalTime, rep);
906 }
907 
908 /*
909  * Check for file(s), then remove all files if file-per-proc, else single file.
910  *
911  */
912 static void RemoveFile(char *testFileName, int filePerProc, IOR_param_t * test)
913 {
914  int tmpRankOffset = 0;
915  if (filePerProc) {
916  /* in random tasks, delete own file */
917  if (test->reorderTasksRandom == TRUE) {
918  tmpRankOffset = rankOffset;
919  rankOffset = 0;
920  GetTestFileName(testFileName, test);
921  }
922  if (backend->access(testFileName, F_OK, test) == 0) {
923  if (verbose >= VERBOSE_3) {
924  fprintf(out_logfile, "task %d removing %s\n", rank,
925  testFileName);
926  }
927  backend->delete(testFileName, test);
928  }
929  if (test->reorderTasksRandom == TRUE) {
930  rankOffset = tmpRankOffset;
931  GetTestFileName(testFileName, test);
932  }
933  } else {
934  if ((rank == 0) && (backend->access(testFileName, F_OK, test) == 0)) {
935  if (verbose >= VERBOSE_3) {
936  fprintf(out_logfile, "task %d removing %s\n", rank,
937  testFileName);
938  }
939  backend->delete(testFileName, test);
940  }
941  }
942 }
943 
944 /*
945  * Setup tests by parsing commandline and creating test script.
946  * Perform a sanity-check on the configured parameters.
947  */
948 static void InitTests(IOR_test_t *tests, MPI_Comm com)
949 {
950  int mpiNumNodes = 0;
951  int mpiNumTasks = 0;
952  int mpiNumTasksOnNode0 = 0;
953 
954  /*
955  * These default values are the same for every test and expensive to
956  * retrieve so just do it once.
957  */
958  mpiNumNodes = GetNumNodes(com);
959  mpiNumTasks = GetNumTasks(com);
960  mpiNumTasksOnNode0 = GetNumTasksOnNode0(com);
961 
962  /*
963  * Since there is no guarantee that anyone other than
964  * task 0 has the environment settings for the hints, pass
965  * the hint=value pair to everyone else in mpi_comm_world
966  */
967  DistributeHints();
968 
969  /* check validity of tests and create test queue */
970  while (tests != NULL) {
971  IOR_param_t *params = & tests->params;
972  params->testComm = com;
973 
974  /* use MPI values if not overridden on command-line */
975  if (params->numNodes == -1) {
976  params->numNodes = mpiNumNodes;
977  }
978  if (params->numTasks == -1) {
979  params->numTasks = mpiNumTasks;
980  } else if (params->numTasks > mpiNumTasks) {
981  if (rank == 0) {
982  fprintf(out_logfile,
983  "WARNING: More tasks requested (%d) than available (%d),",
984  params->numTasks, mpiNumTasks);
985  fprintf(out_logfile, " running with %d tasks.\n",
986  mpiNumTasks);
987  }
988  params->numTasks = mpiNumTasks;
989  }
990  if (params->numTasksOnNode0 == -1) {
991  params->numTasksOnNode0 = mpiNumTasksOnNode0;
992  }
993 
994  params->tasksBlockMapping = QueryNodeMapping(com,false);
995  params->expectedAggFileSize =
996  params->blockSize * params->segmentCount * params->numTasks;
997 
998  ValidateTests(&tests->params);
999  tests = tests->next;
1000  }
1001 
1002  init_clock();
1003 
1004  /* seed random number generator */
1006 }
1007 
1008 /*
1009  * Setup transfer buffers, creating and filling as needed.
1010  */
1011 static void XferBuffersSetup(IOR_io_buffers* ioBuffers, IOR_param_t* test,
1012  int pretendRank)
1013 {
1014  ioBuffers->buffer = aligned_buffer_alloc(test->transferSize);
1015 
1016  if (test->checkWrite || test->checkRead) {
1017  ioBuffers->checkBuffer = aligned_buffer_alloc(test->transferSize);
1018  }
1019  if (test->checkRead || test->checkWrite) {
1021  }
1022 
1023  return;
1024 }
1025 
1026 /*
1027  * Free transfer buffers.
1028  */
1029 static void XferBuffersFree(IOR_io_buffers* ioBuffers, IOR_param_t* test)
1030 
1031 {
1032  aligned_buffer_free(ioBuffers->buffer);
1033 
1034  if (test->checkWrite || test->checkRead) {
1035  aligned_buffer_free(ioBuffers->checkBuffer);
1036  }
1037  if (test->checkRead) {
1039  }
1040 
1041  return;
1042 }
1043 
1044 
1045 
/*
 * malloc a buffer, touching every page in an attempt to defeat lazy allocation.
 *
 * One byte is written at offset 0 and at every further multiple of the page
 * size that lies inside the buffer.
 *
 * Returns NULL when 'size' is 0 or when malloc() fails; otherwise the
 * buffer, which the caller releases with free().
 */
static void *malloc_and_touch(size_t size)
{
        enum { FALLBACK_PAGE_SIZE = 4096 };
        long reported;
        size_t page_size;
        size_t offset;
        char *buf;

        if (size == 0)
                return NULL;

        /* sysconf() reports failure as -1; do not use that as a stride */
        reported = sysconf(_SC_PAGESIZE);
        page_size = (reported > 0) ? (size_t)reported : FALLBACK_PAGE_SIZE;

        buf = (char *)malloc(size);
        if (buf == NULL)
                return NULL;

        /*
         * Step by offset rather than by pointer: the offset is advanced only
         * while another page start lies inside the buffer, so no address
         * beyond the allocation is ever formed.
         */
        offset = 0;
        buf[offset] = (char)1;
        while (size - offset > page_size) {
                offset += page_size;
                buf[offset] = (char)1;
        }

        return (void *)buf;
}
1070 
1071 static void file_hits_histogram(IOR_param_t *params)
1072 {
1073  int *rankoffs = NULL;
1074  int *filecont = NULL;
1075  int *filehits = NULL;
1076  int ifile;
1077  int jfile;
1078 
1079  if (rank == 0) {
1080  rankoffs = (int *)malloc(params->numTasks * sizeof(int));
1081  filecont = (int *)malloc(params->numTasks * sizeof(int));
1082  filehits = (int *)malloc(params->numTasks * sizeof(int));
1083  }
1084 
1085  MPI_CHECK(MPI_Gather(&rankOffset, 1, MPI_INT, rankoffs,
1086  1, MPI_INT, 0, mpi_comm_world),
1087  "MPI_Gather error");
1088 
1089  if (rank != 0)
1090  return;
1091 
1092  memset((void *)filecont, 0, params->numTasks * sizeof(int));
1093  for (ifile = 0; ifile < params->numTasks; ifile++) {
1094  filecont[(ifile + rankoffs[ifile]) % params->numTasks]++;
1095  }
1096  memset((void *)filehits, 0, params->numTasks * sizeof(int));
1097  for (ifile = 0; ifile < params->numTasks; ifile++)
1098  for (jfile = 0; jfile < params->numTasks; jfile++) {
1099  if (ifile == filecont[jfile])
1100  filehits[ifile]++;
1101  }
1102  fprintf(out_logfile, "#File Hits Dist:");
1103  jfile = 0;
1104  ifile = 0;
1105  while (jfile < params->numTasks && ifile < params->numTasks) {
1106  fprintf(out_logfile, " %d", filehits[ifile]);
1107  jfile += filehits[ifile], ifile++;
1108  }
1109  fprintf(out_logfile, "\n");
1110  free(rankoffs);
1111  free(filecont);
1112  free(filehits);
1113 }
1114 
1115 
1116 int test_time_elapsed(IOR_param_t *params, double startTime)
1117 {
1118  double endTime;
1119 
1120  if (params->maxTimeDuration == 0)
1121  return 0;
1122 
1123  endTime = startTime + (params->maxTimeDuration * 60);
1124 
1125  return GetTimeStamp() >= endTime;
1126 }
1127 
1128 /*
1129  * hog some memory as a rough simulation of a real application's memory use
1130  */
1131 static void *HogMemory(IOR_param_t *params)
1132 {
1133  size_t size;
1134  void *buf;
1135 
1136  if (params->memoryPerTask != 0) {
1137  size = params->memoryPerTask;
1138  } else if (params->memoryPerNode != 0) {
1139  if (verbose >= VERBOSE_3)
1140  fprintf(out_logfile, "This node hogging %ld bytes of memory\n",
1141  params->memoryPerNode);
1142  size = params->memoryPerNode / params->numTasksOnNode0;
1143  } else {
1144  return NULL;
1145  }
1146 
1147  if (verbose >= VERBOSE_3)
1148  fprintf(out_logfile, "This task hogging %ld bytes of memory\n", size);
1149 
1150  buf = malloc_and_touch(size);
1151  if (buf == NULL)
1152  ERR("malloc of simulated applciation buffer failed");
1153 
1154  return buf;
1155 }
1156 /*
1157  * Write times taken during each iteration of the test.
1158  */
1159 static void
1160 WriteTimes(IOR_param_t *test, const double *timer, const int iteration,
1161  const int access)
1162 {
1163  char timerName[MAX_STR];
1164 
1165  for (int i = 0; i < IOR_NB_TIMERS; i++) {
1166 
1167  if (access == WRITE) {
1168  switch (i) {
1169  case 0:
1170  strcpy(timerName, "write open start");
1171  break;
1172  case 1:
1173  strcpy(timerName, "write open stop");
1174  break;
1175  case 2:
1176  strcpy(timerName, "write start");
1177  break;
1178  case 3:
1179  strcpy(timerName, "write stop");
1180  break;
1181  case 4:
1182  strcpy(timerName, "write close start");
1183  break;
1184  case 5:
1185  strcpy(timerName, "write close stop");
1186  break;
1187  default:
1188  strcpy(timerName, "invalid timer");
1189  break;
1190  }
1191  }
1192  else {
1193  switch (i) {
1194  case 0:
1195  strcpy(timerName, "read open start");
1196  break;
1197  case 1:
1198  strcpy(timerName, "read open stop");
1199  break;
1200  case 2:
1201  strcpy(timerName, "read start");
1202  break;
1203  case 3:
1204  strcpy(timerName, "read stop");
1205  break;
1206  case 4:
1207  strcpy(timerName, "read close start");
1208  break;
1209  case 5:
1210  strcpy(timerName, "read close stop");
1211  break;
1212  default:
1213  strcpy(timerName, "invalid timer");
1214  break;
1215  }
1216  }
1217  fprintf(out_logfile, "Test %d: Iter=%d, Task=%d, Time=%f, %s\n",
1218  test->id, iteration, (int)rank, timer[i],
1219  timerName);
1220  }
1221 }
1222 /*
1223  * Using the test parameters, run iteration(s) of single test.
 *
 * Outline of the body below:
 *  - builds testComm from the ranks 0 .. numTasks-1 of mpi_comm_world; ranks
 *    left out of it only join a barrier on mpi_comm_world and return;
 *  - optionally hogs memory (HogMemory) and sets up the transfer buffers;
 *  - for each repetition: agrees on the time-stamp signature, then runs the
 *    write phase, the write-check phase and the read / read-check phase
 *    (each one skipped once test_time_elapsed() reports true), and finally
 *    removes the test file unless it is to be kept;
 *  - afterwards frees testComm, prints a summary, releases the buffers and
 *    synchronizes with the ranks that did not participate.
 *
 * timer[] layout used by the write and the read phase: [0]/[1] bracket the
 * create/open call, [2]/[3] bracket WriteOrRead(), [4]/[5] bracket close.
1224  */
1225 static void TestIoSys(IOR_test_t *test)
1226 {
1227  IOR_param_t *params = &test->params;
1228  IOR_results_t *results = test->results;
1229  char testFileName[MAX_STR];
1230  double timer[IOR_NB_TIMERS];
1231  double startTime;
1232  int pretendRank;
1233  int rep;
1234  void *fd;
1235  MPI_Group orig_group, new_group;
1236  int range[3];
1237  IOR_offset_t dataMoved; /* for data rate calculation */
1238  void *hog_buf;
1239  IOR_io_buffers ioBuffers;
1240 
1241  /* set up communicator for test */
1242  MPI_CHECK(MPI_Comm_group(mpi_comm_world, &orig_group),
1243  "MPI_Comm_group() error");
1244  range[0] = 0; /* first rank */
1245  range[1] = params->numTasks - 1; /* last rank */
1246  range[2] = 1; /* stride */
1247  MPI_CHECK(MPI_Group_range_incl(orig_group, 1, &range, &new_group),
1248  "MPI_Group_range_incl() error");
1249  MPI_CHECK(MPI_Comm_create(mpi_comm_world, new_group, &testComm),
1250  "MPI_Comm_create() error");
1251  MPI_CHECK(MPI_Group_free(&orig_group), "MPI_Group_Free() error");
1252  MPI_CHECK(MPI_Group_free(&new_group), "MPI_Group_Free() error");
1253  params->testComm = testComm;
1254  if (testComm == MPI_COMM_NULL) {
1255  /* tasks not in the group do not participate in this test */
1256  MPI_CHECK(MPI_Barrier(mpi_comm_world), "barrier error");
1257  return;
1258  }
1259  if (rank == 0 && verbose >= VERBOSE_1) {
1260  fprintf(out_logfile, "Participating tasks: %d\n", params->numTasks);
1261  fflush(out_logfile);
1262  }
1263  if (rank == 0 && params->reorderTasks == TRUE && verbose >= VERBOSE_1) {
1264  fprintf(out_logfile,
1265  "Using reorderTasks '-C' (useful to avoid read cache in client)\n");
1266  fflush(out_logfile);
1267  }
1268  /* show test setup */
1269  if (rank == 0 && verbose >= VERBOSE_0)
1270  ShowSetup(params);
1271 
  /* optional ballast; HogMemory() returns NULL when no memory amount is configured */
1272  hog_buf = HogMemory(params);
1273 
  /* the rank whose data this task handles: own rank shifted by the global rankOffset */
1274  pretendRank = (rank + rankOffset) % params->numTasks;
1275 
1276  /* IO Buffer Setup */
1277 
1278  if (params->setTimeStampSignature) { // initialize the buffer properly
1279  params->timeStampSignatureValue = (unsigned int) params->setTimeStampSignature;
1280  }
1281  XferBuffersSetup(&ioBuffers, params, pretendRank);
1282  reseed_incompressible_prng = TRUE; // reset pseudo random generator, necessary to guarantee the next call to FillBuffer produces the same value as it is right now
1283 
1284  /* Initial time stamp */
1285  startTime = GetTimeStamp();
1286 
1287  /* loop over test iterations */
  /* remembered so that every write phase starts from the configured value again (restored before each write below) */
1288  uint64_t params_saved_wearout = params->stoneWallingWearOutIterations;
1289  for (rep = 0; rep < params->repetitions; rep++) {
1290  PrintRepeatStart();
1291  /* Get iteration start time in seconds in task 0 and broadcast to
1292  all tasks */
1293  if (rank == 0) {
1294  if (! params->setTimeStampSignature) {
1295  time_t currentTime;
1296  if ((currentTime = time(NULL)) == -1) {
1297  ERR("cannot get current time");
1298  }
1299  params->timeStampSignatureValue =
1300  (unsigned int) currentTime;
1301  if (verbose >= VERBOSE_2) {
1302  fprintf(out_logfile,
1303  "Using Time Stamp %u (0x%x) for Data Signature\n",
1304  params->timeStampSignatureValue,
1305  params->timeStampSignatureValue);
1306  }
1307  }
1308  if (rep == 0 && verbose >= VERBOSE_0) {
1309  PrintTableHeader();
1310  }
1311  }
1312  MPI_CHECK(MPI_Bcast
1313  (&params->timeStampSignatureValue, 1, MPI_UNSIGNED, 0,
1314  testComm), "cannot broadcast start time value");
1315 
1316  FillBuffer(ioBuffers.buffer, params, 0, pretendRank);
1317  /* use repetition count for number of multiple files */
1318  if (params->multiFile)
1319  params->repCounter = rep;
1320 
1321  /*
1322  * write the file(s), getting timing between I/O calls
1323  */
1324 
1325  if (params->writeFile && !test_time_elapsed(params, startTime)) {
1326  GetTestFileName(testFileName, params);
1327  if (verbose >= VERBOSE_3) {
1328  fprintf(out_logfile, "task %d writing %s\n", rank,
1329  testFileName);
1330  }
1331  DelaySecs(params->interTestDelay);
1332  if (params->useExistingTestFile == FALSE) {
1333  RemoveFile(testFileName, params->filePerProc,
1334  params);
1335  }
1336 
1337  params->stoneWallingWearOutIterations = params_saved_wearout;
1338  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1339  params->open = WRITE;
1340  timer[0] = GetTimeStamp();
1341  fd = backend->create(testFileName, params);
1342  timer[1] = GetTimeStamp();
1343  if (params->intraTestBarriers)
1344  MPI_CHECK(MPI_Barrier(testComm),
1345  "barrier error");
1346  if (rank == 0 && verbose >= VERBOSE_1) {
1347  fprintf(out_logfile,
1348  "Commencing write performance test: %s",
1349  CurrentTimeString());
1350  }
1351  timer[2] = GetTimeStamp();
1352  dataMoved = WriteOrRead(params, &results[rep], fd, WRITE, &ioBuffers);
  /* NOTE(review): %llu is used for dataMoved, an IOR_offset_t -- confirm the format matches that type */
1353  if (params->verbose >= VERBOSE_4) {
1354  fprintf(out_logfile, "* data moved = %llu\n", dataMoved);
1355  fflush(out_logfile);
1356  }
1357  timer[3] = GetTimeStamp();
1358  if (params->intraTestBarriers)
1359  MPI_CHECK(MPI_Barrier(testComm),
1360  "barrier error");
1361  timer[4] = GetTimeStamp();
1362  backend->close(fd, params);
1363 
1364  timer[5] = GetTimeStamp();
1365  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1366 
1367  /* get the size of the file just written */
1368  results[rep].write.aggFileSizeFromStat =
1369  backend->get_file_size(params, testComm, testFileName);
1370 
1371  /* check if stat() of file doesn't equal expected file size,
1372  use actual amount of byte moved */
1373  CheckFileSize(test, dataMoved, rep, WRITE);
1374 
1375  if (verbose >= VERBOSE_3)
1376  WriteTimes(params, timer, rep, WRITE);
1377  ReduceIterResults(test, timer, rep, WRITE);
1378  if (params->outlierThreshold) {
1379  CheckForOutliers(params, timer, WRITE);
1380  }
1381 
1382  /* check if in this round we run write with stonewalling */
  /* the pair count reached by the write is carried over to the phases that follow in this repetition */
1383  if(params->deadlineForStonewalling > 0){
1384  params->stoneWallingWearOutIterations = results[rep].write.pairs_accessed;
1385  }
1386  }
1387 
1388  /*
1389  * perform a check of data, reading back data and comparing
1390  * against what was expected to be written
1391  */
1392  if (params->checkWrite && !test_time_elapsed(params, startTime)) {
1393  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1394  if (rank == 0 && verbose >= VERBOSE_1) {
1395  fprintf(out_logfile,
1396  "Verifying contents of the file(s) just written.\n");
1397  fprintf(out_logfile, "%s\n", CurrentTimeString());
1398  }
1399  if (params->reorderTasks) {
1400  /* move two nodes away from writing node */
1401  int shift = 1; /* assume a by-node (round-robin) mapping of tasks to nodes */
1402  if (params->tasksBlockMapping) {
1403  shift = params->numTasksOnNode0; /* switch to by-slot (contiguous block) mapping */
1404  }
1405  rankOffset = (2 * shift) % params->numTasks;
1406  }
1407 
1408  // update the check buffer
1409  FillBuffer(ioBuffers.readCheckBuffer, params, 0, (rank + rankOffset) % params->numTasks);
1410 
1411  reseed_incompressible_prng = TRUE; /* Re-Seed the PRNG to get same sequence back, if random */
1412 
1413  GetTestFileName(testFileName, params);
1414  params->open = WRITECHECK;
1415  fd = backend->open(testFileName, params);
1416  dataMoved = WriteOrRead(params, &results[rep], fd, WRITECHECK, &ioBuffers);
1417  backend->close(fd, params);
  /* undo the shift applied for the write check */
1418  rankOffset = 0;
1419  }
1420  /*
1421  * read the file(s), getting timing between I/O calls
1422  */
1423  if ((params->readFile || params->checkRead ) && !test_time_elapsed(params, startTime)) {
1424  /* check for stonewall */
  /* NOTE(review): nothing in this listing assigns stoneWallingWearOutIterations inside this block before it is
     compared with -1; a statement loading the value from stoneWallingStatusFile appears to be missing
     (the symbol index lists ReadStoneWallingIterations()) -- confirm against the original file */
1425  if(params->stoneWallingStatusFile){
1427  if(params->stoneWallingWearOutIterations == -1 && rank == 0){
1428  fprintf(out_logfile, "WARNING: Could not read back the stonewalling status from the file!\n");
1429  params->stoneWallingWearOutIterations = 0;
1430  }
1431  }
1432  int operation_flag = READ;
1433  if ( params->checkRead ){
1434  // actually read and then compare the buffer
1435  operation_flag = READCHECK;
1436  }
1437  /* Get rankOffset [file offset] for this process to read, based on -C,-Z,-Q,-X options */
1438  /* Constant process offset reading */
1439  if (params->reorderTasks) {
1440  /* move one node away from writing node */
1441  int shift = 1; /* assume a by-node (round-robin) mapping of tasks to nodes */
1442  if (params->tasksBlockMapping) {
1443  shift=params->numTasksOnNode0; /* switch to a by-slot (contiguous block) mapping */
1444  }
1445  rankOffset = (params->taskPerNodeOffset * shift) % params->numTasks;
1446  }
1447  /* random process offset reading */
1448  if (params->reorderTasksRandom) {
1449  /* this should not intefere with randomOffset within a file because GetOffsetArrayRandom */
1450  /* seeds every rand() call */
1451  int nodeoffset;
1452  unsigned int iseed0;
1453  nodeoffset = params->taskPerNodeOffset;
1454  nodeoffset = (nodeoffset < params->numNodes) ? nodeoffset : params->numNodes - 1;
  /* a negative seed is negated and advanced by the repetition number; other seeds are used unchanged */
1455  if (params->reorderTasksRandomSeed < 0)
1456  iseed0 = -1 * params->reorderTasksRandomSeed + rep;
1457  else
1458  iseed0 = params->reorderTasksRandomSeed;
1459  srand(rank + iseed0);
1460  {
1461  rankOffset = rand() % params->numTasks;
1462  }
  /* redraw until the offset is at least nodeoffset * numTasksOnNode0 */
1463  while (rankOffset <
1464  (nodeoffset * params->numTasksOnNode0)) {
1465  rankOffset = rand() % params->numTasks;
1466  }
1467  /* Get more detailed stats if requested by verbose level */
1468  if (verbose >= VERBOSE_2) {
1469  file_hits_histogram(params);
1470  }
1471  }
  /* for a read check, regenerate the comparison data for the (possibly shifted) pretend rank */
1472  if(operation_flag == READCHECK){
1473  FillBuffer(ioBuffers.readCheckBuffer, params, 0, (rank + rankOffset) % params->numTasks);
1474  }
1475 
1476  /* Using globally passed rankOffset, following function generates testFileName to read */
1477  GetTestFileName(testFileName, params);
1478 
1479  if (verbose >= VERBOSE_3) {
1480  fprintf(out_logfile, "task %d reading %s\n", rank,
1481  testFileName);
1482  }
1483  DelaySecs(params->interTestDelay);
1484  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1485  params->open = READ;
1486  timer[0] = GetTimeStamp();
1487  fd = backend->open(testFileName, params);
1488  timer[1] = GetTimeStamp();
1489  if (params->intraTestBarriers)
1490  MPI_CHECK(MPI_Barrier(testComm),
1491  "barrier error");
1492  if (rank == 0 && verbose >= VERBOSE_1) {
1493  fprintf(out_logfile,
1494  "Commencing read performance test: %s\n",
1495  CurrentTimeString());
1496  }
1497  timer[2] = GetTimeStamp();
1498  dataMoved = WriteOrRead(params, &results[rep], fd, operation_flag, &ioBuffers);
1499  timer[3] = GetTimeStamp();
1500  if (params->intraTestBarriers)
1501  MPI_CHECK(MPI_Barrier(testComm),
1502  "barrier error");
1503  timer[4] = GetTimeStamp();
1504  backend->close(fd, params);
1505  timer[5] = GetTimeStamp();
1506 
1507  /* get the size of the file just read */
1508  results[rep].read.aggFileSizeFromStat =
1509  backend->get_file_size(params, testComm,
1510  testFileName);
1511 
1512  /* check if stat() of file doesn't equal expected file size,
1513  use actual amount of byte moved */
1514  CheckFileSize(test, dataMoved, rep, READ);
1515 
1516  if (verbose >= VERBOSE_3)
1517  WriteTimes(params, timer, rep, READ);
1518  ReduceIterResults(test, timer, rep, READ);
1519  if (params->outlierThreshold) {
1520  CheckForOutliers(params, timer, READ);
1521  }
1522  }
1523 
  /* remove the test file(s) and time the removal, unless the file is to be kept
     (keepFile set, or an error was found and keepFileWithError is set); both branches end on a testComm barrier */
1524  if (!params->keepFile
1525  && !(params->errorFound && params->keepFileWithError)) {
1526  double start, finish;
1527  start = GetTimeStamp();
1528  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1529  RemoveFile(testFileName, params->filePerProc, params);
1530  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1531  finish = GetTimeStamp();
1532  PrintRemoveTiming(start, finish, rep);
1533  } else {
1534  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1535  }
  /* reset the per-repetition state before the next iteration */
1536  params->errorFound = FALSE;
1537  rankOffset = 0;
1538 
1539  PrintRepeatEnd();
1540  }
1541 
1542  MPI_CHECK(MPI_Comm_free(&testComm), "MPI_Comm_free() error");
1543 
  /* NOTE(review): the summary_every_test branch is empty in this listing; presumably a per-test
     long summary is printed here -- confirm against the original file */
1544  if (params->summary_every_test) {
1547  } else {
1548  PrintShortSummary(test);
1549  }
1550 
1551  XferBuffersFree(&ioBuffers, params);
1552 
1553  if (hog_buf != NULL)
1554  free(hog_buf);
1555 
1556  /* Sync with the tasks that did not participate in this test */
1557  MPI_CHECK(MPI_Barrier(mpi_comm_world), "barrier error");
1558 
1559 }
1560 
1561 /*
1562  * Determine if valid tests from parameters.
1563  */
1564 static void ValidateTests(IOR_param_t * test)
1565 {
1566  IOR_param_t defaults;
1567  init_IOR_Param_t(&defaults);
1568 
1569  if (test->repetitions <= 0)
1570  WARN_RESET("too few test repetitions",
1571  test, &defaults, repetitions);
1572  if (test->numTasks <= 0)
1573  ERR("too few tasks for testing");
1574  if (test->interTestDelay < 0)
1575  WARN_RESET("inter-test delay must be nonnegative value",
1576  test, &defaults, interTestDelay);
1577  if (test->readFile != TRUE && test->writeFile != TRUE
1578  && test->checkRead != TRUE && test->checkWrite != TRUE)
1579  ERR("test must write, read, or check read/write file");
1580  if(! test->setTimeStampSignature && test->writeFile != TRUE && test->checkRead == TRUE)
1581  ERR("using readCheck only requires to write a timeStampSignature -- use -G");
1582  if (test->segmentCount < 0)
1583  ERR("segment count must be positive value");
1584  if ((test->blockSize % sizeof(IOR_size_t)) != 0)
1585  ERR("block size must be a multiple of access size");
1586  if (test->blockSize < 0)
1587  ERR("block size must be non-negative integer");
1588  if ((test->transferSize % sizeof(IOR_size_t)) != 0)
1589  ERR("transfer size must be a multiple of access size");
1590  if (test->setAlignment < 0)
1591  ERR("alignment must be non-negative integer");
1592  if (test->transferSize < 0)
1593  ERR("transfer size must be non-negative integer");
1594  if (test->transferSize == 0) {
1595  ERR("test will not complete with zero transfer size");
1596  } else {
1597  if ((test->blockSize % test->transferSize) != 0)
1598  ERR("block size must be a multiple of transfer size");
1599  }
1600  if (test->blockSize < test->transferSize)
1601  ERR("block size must not be smaller than transfer size");
1602 
1603  /* specific APIs */
1604  if ((strcasecmp(test->api, "MPIIO") == 0)
1605  && (test->blockSize < sizeof(IOR_size_t)
1606  || test->transferSize < sizeof(IOR_size_t)))
1607  ERR("block/transfer size may not be smaller than IOR_size_t for MPIIO");
1608  if ((strcasecmp(test->api, "HDF5") == 0)
1609  && (test->blockSize < sizeof(IOR_size_t)
1610  || test->transferSize < sizeof(IOR_size_t)))
1611  ERR("block/transfer size may not be smaller than IOR_size_t for HDF5");
1612  if ((strcasecmp(test->api, "NCMPI") == 0)
1613  && (test->blockSize < sizeof(IOR_size_t)
1614  || test->transferSize < sizeof(IOR_size_t)))
1615  ERR("block/transfer size may not be smaller than IOR_size_t for NCMPI");
1616  if ((test->useFileView == TRUE)
1617  && (sizeof(MPI_Aint) < 8) /* used for 64-bit datatypes */
1618  &&((test->numTasks * test->blockSize) >
1619  (2 * (IOR_offset_t) GIBIBYTE)))
1620  ERR("segment size must be < 2GiB");
1621  if ((strcasecmp(test->api, "POSIX") != 0) && test->singleXferAttempt)
1622  WARN_RESET("retry only available in POSIX",
1623  test, &defaults, singleXferAttempt);
1624  if (((strcasecmp(test->api, "POSIX") != 0)
1625  && (strcasecmp(test->api, "MPIIO") != 0)
1626  && (strcasecmp(test->api, "MMAP") != 0)
1627  && (strcasecmp(test->api, "HDFS") != 0)
1628  && (strcasecmp(test->api, "DFS") != 0)
1629  && (strcasecmp(test->api, "DAOS") != 0)
1630  && (strcasecmp(test->api, "Gfarm") != 0)
1631  && (strcasecmp(test->api, "RADOS") != 0)
1632  && (strcasecmp(test->api, "CEPHFS") != 0)) && test->fsync)
1633  WARN_RESET("fsync() not supported in selected backend",
1634  test, &defaults, fsync);
1635  if ((strcasecmp(test->api, "MPIIO") != 0) && test->preallocate)
1636  WARN_RESET("preallocation only available in MPIIO",
1637  test, &defaults, preallocate);
1638  if ((strcasecmp(test->api, "MPIIO") != 0) && test->useFileView)
1639  WARN_RESET("file view only available in MPIIO",
1640  test, &defaults, useFileView);
1641  if ((strcasecmp(test->api, "MPIIO") != 0) && test->useSharedFilePointer)
1642  WARN_RESET("shared file pointer only available in MPIIO",
1643  test, &defaults, useSharedFilePointer);
1644  if ((strcasecmp(test->api, "MPIIO") == 0) && test->useSharedFilePointer)
1645  WARN_RESET("shared file pointer not implemented",
1646  test, &defaults, useSharedFilePointer);
1647  if ((strcasecmp(test->api, "MPIIO") != 0) && test->useStridedDatatype)
1648  WARN_RESET("strided datatype only available in MPIIO",
1649  test, &defaults, useStridedDatatype);
1650  if ((strcasecmp(test->api, "MPIIO") == 0) && test->useStridedDatatype)
1651  WARN_RESET("strided datatype not implemented",
1652  test, &defaults, useStridedDatatype);
1653  if ((strcasecmp(test->api, "MPIIO") == 0)
1654  && test->useStridedDatatype && (test->blockSize < sizeof(IOR_size_t)
1655  || test->transferSize <
1656  sizeof(IOR_size_t)))
1657  ERR("need larger file size for strided datatype in MPIIO");
1658  if ((strcasecmp(test->api, "POSIX") == 0) && test->showHints)
1659  WARN_RESET("hints not available in POSIX",
1660  test, &defaults, showHints);
1661  if ((strcasecmp(test->api, "POSIX") == 0) && test->collective)
1662  WARN_RESET("collective not available in POSIX",
1663  test, &defaults, collective);
1664  if ((strcasecmp(test->api, "MMAP") == 0) && test->fsyncPerWrite
1665  && (test->transferSize & (sysconf(_SC_PAGESIZE) - 1)))
1666  ERR("transfer size must be aligned with PAGESIZE for MMAP with fsyncPerWrite");
1667 
1668  /* parameter consitency */
1669  if (test->reorderTasks == TRUE && test->reorderTasksRandom == TRUE)
1670  ERR("Both Constant and Random task re-ordering specified. Choose one and resubmit");
1671  if (test->randomOffset && test->reorderTasksRandom
1672  && test->filePerProc == FALSE)
1673  ERR("random offset and random reorder tasks specified with single-shared-file. Choose one and resubmit");
1674  if (test->randomOffset && test->reorderTasks
1675  && test->filePerProc == FALSE)
1676  ERR("random offset and constant reorder tasks specified with single-shared-file. Choose one and resubmit");
1677  if (test->randomOffset && test->checkRead)
1678  ERR("random offset not available with read check option (use write check)");
1679  if (test->randomOffset && test->storeFileOffset)
1680  ERR("random offset not available with store file offset option)");
1681 
1682 
1683  if ((strcasecmp(test->api, "MPIIO") == 0) && test->randomOffset
1684  && test->collective)
1685  ERR("random offset not available with collective MPIIO");
1686  if ((strcasecmp(test->api, "MPIIO") == 0) && test->randomOffset
1687  && test->useFileView)
1688  ERR("random offset not available with MPIIO fileviews");
1689  if ((strcasecmp(test->api, "HDF5") == 0) && test->randomOffset)
1690  ERR("random offset not available with HDF5");
1691  if ((strcasecmp(test->api, "NCMPI") == 0) && test->randomOffset)
1692  ERR("random offset not available with NCMPI");
1693  if ((strcasecmp(test->api, "HDF5") != 0) && test->individualDataSets)
1694  WARN_RESET("individual datasets only available in HDF5",
1695  test, &defaults, individualDataSets);
1696  if ((strcasecmp(test->api, "HDF5") == 0) && test->individualDataSets)
1697  WARN_RESET("individual data sets not implemented",
1698  test, &defaults, individualDataSets);
1699  if ((strcasecmp(test->api, "NCMPI") == 0) && test->filePerProc)
1700  ERR("file-per-proc not available in current NCMPI");
1701  if (test->noFill) {
1702  if (strcasecmp(test->api, "HDF5") != 0) {
1703  ERR("'no fill' option only available in HDF5");
1704  } else {
1705  /* check if hdf5 available */
1706 #if defined (H5_VERS_MAJOR) && defined (H5_VERS_MINOR)
1707  /* no-fill option not available until hdf5-1.6.x */
1708 #if (H5_VERS_MAJOR > 0 && H5_VERS_MINOR > 5)
1709  ;
1710 #else
1711  ERRF("'no fill' option not available in %s",
1712  test->apiVersion);
1713 #endif
1714 #else
1715  WARN("unable to determine HDF5 version for 'no fill' usage");
1716 #endif
1717  }
1718  }
1719  if (test->useExistingTestFile && test->lustre_set_striping)
1720  ERR("Lustre stripe options are incompatible with useExistingTestFile");
1721 
1722  /* allow the backend to validate the options */
1723  if(test->backend->check_params){
1724  int check = test->backend->check_params(test);
1725  if (check == 0){
1726  ERR("The backend returned that the test parameters are invalid.");
1727  }
1728  }
1729 }
1730 
1739 {
1740  IOR_offset_t i, j, k = 0;
1741  IOR_offset_t offsets;
1742  IOR_offset_t *offsetArray;
1743 
1744  /* count needed offsets */
1745  offsets = (test->blockSize / test->transferSize) * test->segmentCount;
1746 
1747  /* setup empty array */
1748  offsetArray =
1749  (IOR_offset_t *) malloc((offsets + 1) * sizeof(IOR_offset_t));
1750  if (offsetArray == NULL)
1751  ERR("malloc() failed");
1752  offsetArray[offsets] = -1; /* set last offset with -1 */
1753 
1754  /* fill with offsets */
1755  for (i = 0; i < test->segmentCount; i++) {
1756  for (j = 0; j < (test->blockSize / test->transferSize); j++) {
1757  offsetArray[k] = j * test->transferSize;
1758  if (test->filePerProc) {
1759  offsetArray[k] += i * test->blockSize;
1760  } else {
1761  offsetArray[k] +=
1762  (i * test->numTasks * test->blockSize)
1763  + (pretendRank * test->blockSize);
1764  }
1765  k++;
1766  }
1767  }
1768 
1769  return (offsetArray);
1770 }
1771 
1787 IOR_offset_t *GetOffsetArrayRandom(IOR_param_t * test, int pretendRank, int access)
1788 {
1789  int seed;
1790  IOR_offset_t i, value, tmp;
1791  IOR_offset_t offsets = 0;
1792  IOR_offset_t offsetCnt = 0;
1793  IOR_offset_t fileSize;
1794  IOR_offset_t *offsetArray;
1795 
1796  /* set up seed for random() */
1797  if (access == WRITE || access == READ) {
1798  test->randomSeed = seed = rand();
1799  } else {
1800  seed = test->randomSeed;
1801  }
1802  srand(seed);
1803 
1804  fileSize = test->blockSize * test->segmentCount;
1805  if (test->filePerProc == FALSE) {
1806  fileSize *= test->numTasks;
1807  }
1808 
1809  /* count needed offsets (pass 1) */
1810  for (i = 0; i < fileSize; i += test->transferSize) {
1811  if (test->filePerProc == FALSE) {
1812  // this counts which process get how many transferes in
1813  // a shared file
1814  if ((rand() % test->numTasks) == pretendRank) {
1815  offsets++;
1816  }
1817  } else {
1818  offsets++;
1819  }
1820  }
1821 
1822  /* setup empty array */
1823  offsetArray =
1824  (IOR_offset_t *) malloc((offsets + 1) * sizeof(IOR_offset_t));
1825  if (offsetArray == NULL)
1826  ERR("malloc() failed");
1827  offsetArray[offsets] = -1; /* set last offset with -1 */
1828 
1829  if (test->filePerProc) {
1830  /* fill array */
1831  for (i = 0; i < offsets; i++) {
1832  offsetArray[i] = i * test->transferSize;
1833  }
1834  } else {
1835  /* fill with offsets (pass 2) */
1836  srand(seed); /* need same seed to get same transfers as counted in the beginning*/
1837  for (i = 0; i < fileSize; i += test->transferSize) {
1838  if ((rand() % test->numTasks) == pretendRank) {
1839  offsetArray[offsetCnt] = i;
1840  offsetCnt++;
1841  }
1842  }
1843  }
1844  /* reorder array */
1845  for (i = 0; i < offsets; i++) {
1846  value = rand() % offsets;
1847  tmp = offsetArray[value];
1848  offsetArray[value] = offsetArray[i];
1849  offsetArray[i] = tmp;
1850  }
1851  SeedRandGen(test->testComm); /* synchronize seeds across tasks */
1852 
1853  return (offsetArray);
1854 }
1855 
1856 static IOR_offset_t WriteOrReadSingle(IOR_offset_t pairCnt, IOR_offset_t *offsetArray, int pretendRank,
1857  IOR_offset_t * transferCount, int * errors, IOR_param_t * test, int * fd, IOR_io_buffers* ioBuffers, int access){
1858  IOR_offset_t amtXferred = 0;
1859  IOR_offset_t transfer;
1860 
1861  void *buffer = ioBuffers->buffer;
1862  void *checkBuffer = ioBuffers->checkBuffer;
1863  void *readCheckBuffer = ioBuffers->readCheckBuffer;
1864 
1865  test->offset = offsetArray[pairCnt];
1866 
1867  transfer = test->transferSize;
1868  if (access == WRITE) {
1869  /* fills each transfer with a unique pattern
1870  * containing the offset into the file */
1871  if (test->storeFileOffset == TRUE) {
1872  FillBuffer(buffer, test, test->offset, pretendRank);
1873  }
1874  amtXferred =
1875  backend->xfer(access, fd, buffer, transfer, test);
1876  if (amtXferred != transfer)
1877  ERR("cannot write to file");
1878  if (test->interIODelay > 0){
1879  struct timespec wait = {test->interIODelay / 1000 / 1000, 1000l * (test->interIODelay % 1000000)};
1880  nanosleep( & wait, NULL);
1881  }
1882  } else if (access == READ) {
1883  amtXferred =
1884  backend->xfer(access, fd, buffer, transfer, test);
1885  if (amtXferred != transfer)
1886  ERR("cannot read from file");
1887  if (test->interIODelay > 0){
1888  struct timespec wait = {test->interIODelay / 1000 / 1000, 1000l * (test->interIODelay % 1000000)};
1889  nanosleep( & wait, NULL);
1890  }
1891  } else if (access == WRITECHECK) {
1892  memset(checkBuffer, 'a', transfer);
1893 
1894  if (test->storeFileOffset == TRUE) {
1895  FillBuffer(readCheckBuffer, test, test->offset, pretendRank);
1896  }
1897 
1898  amtXferred = backend->xfer(access, fd, checkBuffer, transfer, test);
1899  if (amtXferred != transfer)
1900  ERR("cannot read from file write check");
1901  (*transferCount)++;
1902  *errors += CompareBuffers(readCheckBuffer, checkBuffer, transfer,
1903  *transferCount, test,
1904  WRITECHECK);
1905  } else if (access == READCHECK) {
1906  memset(checkBuffer, 'a', transfer);
1907 
1908  amtXferred = backend->xfer(access, fd, checkBuffer, transfer, test);
1909  if (amtXferred != transfer){
1910  ERR("cannot read from file");
1911  }
1912  if (test->storeFileOffset == TRUE) {
1913  FillBuffer(readCheckBuffer, test, test->offset, pretendRank);
1914  }
1915  *errors += CompareBuffers(readCheckBuffer, checkBuffer, transfer, *transferCount, test, READCHECK);
1916  }
1917  return amtXferred;
1918 }
1919 
1920 /*
1921  * Write or Read data to file(s). This loops through the strides, writing
1922  * out the data to each block in transfer sizes, until the remainder left is 0.
1923  */
1925  void *fd, const int access, IOR_io_buffers *ioBuffers)
1926 {
1927  int errors = 0;
1928  IOR_offset_t transferCount = 0;
1929  uint64_t pairCnt = 0;
1930  IOR_offset_t *offsetArray;
1931  int pretendRank;
1932  IOR_offset_t dataMoved = 0; /* for data rate calculation */
1933  double startForStonewall;
1934  int hitStonewall;
1935  IOR_point_t *point = ((access == WRITE) || (access == WRITECHECK)) ?
1936  &results->write : &results->read;
1937 
1938  /* initialize values */
1939  pretendRank = (rank + rankOffset) % test->numTasks;
1940 
1941  if (test->randomOffset) {
1942  offsetArray = GetOffsetArrayRandom(test, pretendRank, access);
1943  } else {
1944  offsetArray = GetOffsetArraySequential(test, pretendRank);
1945  }
1946 
1947  startForStonewall = GetTimeStamp();
1948  hitStonewall = 0;
1949 
1950  /* loop over offsets to access */
1951  while ((offsetArray[pairCnt] != -1) && !hitStonewall ) {
1952  dataMoved += WriteOrReadSingle(pairCnt, offsetArray, pretendRank, & transferCount, & errors, test, fd, ioBuffers, access);
1953  pairCnt++;
1954 
1955  hitStonewall = ((test->deadlineForStonewalling != 0
1956  && (GetTimeStamp() - startForStonewall)
1957  > test->deadlineForStonewalling)) || (test->stoneWallingWearOutIterations != 0 && pairCnt == test->stoneWallingWearOutIterations) ;
1958 
1959  if ( test->collective && test->deadlineForStonewalling ) {
1960  // if collective-mode, you'll get a HANG, if some rank 'accidentally' leave this loop
1961  // it absolutely must be an 'all or none':
1962  MPI_CHECK(MPI_Bcast(&hitStonewall, 1, MPI_INT, 0, MPI_COMM_WORLD), "hitStonewall broadcast failed");
1963  }
1964 
1965  }
1966  if (test->stoneWallingWearOut){
1967  if (verbose >= VERBOSE_1){
1968  fprintf(out_logfile, "%d: stonewalling pairs accessed: %lld\n", rank, (long long) pairCnt);
1969  }
1970  long long data_moved_ll = (long long) dataMoved;
1971  long long pairs_accessed_min = 0;
1972  MPI_CHECK(MPI_Allreduce(& pairCnt, &point->pairs_accessed,
1973  1, MPI_LONG_LONG_INT, MPI_MAX, testComm), "cannot reduce pairs moved");
1974  double stonewall_runtime = GetTimeStamp() - startForStonewall;
1975  point->stonewall_time = stonewall_runtime;
1976  MPI_CHECK(MPI_Reduce(& pairCnt, & pairs_accessed_min,
1977  1, MPI_LONG_LONG_INT, MPI_MIN, 0, testComm), "cannot reduce pairs moved");
1978  MPI_CHECK(MPI_Reduce(& data_moved_ll, &point->stonewall_min_data_accessed,
1979  1, MPI_LONG_LONG_INT, MPI_MIN, 0, testComm), "cannot reduce pairs moved");
1980  MPI_CHECK(MPI_Reduce(& data_moved_ll, &point->stonewall_avg_data_accessed,
1981  1, MPI_LONG_LONG_INT, MPI_SUM, 0, testComm), "cannot reduce pairs moved");
1982 
1983  if(rank == 0){
1984  fprintf(out_logfile, "stonewalling pairs accessed min: %lld max: %zu -- min data: %.1f GiB mean data: %.1f GiB time: %.1fs\n",
1985  pairs_accessed_min, point->pairs_accessed,
1986  point->stonewall_min_data_accessed /1024.0 / 1024 / 1024, point->stonewall_avg_data_accessed / 1024.0 / 1024 / 1024 / test->numTasks , point->stonewall_time);
1987  point->stonewall_min_data_accessed *= test->numTasks;
1988  }
1989  if(pairCnt != point->pairs_accessed){
1990  // some work needs still to be done !
1991  for(; pairCnt < point->pairs_accessed; pairCnt++ ) {
1992  dataMoved += WriteOrReadSingle(pairCnt, offsetArray, pretendRank, & transferCount, & errors, test, fd, ioBuffers, access);
1993  }
1994  }
1995  }else{
1996  point->pairs_accessed = pairCnt;
1997  }
1998 
1999 
2000  totalErrorCount += CountErrors(test, access, errors);
2001 
2002  free(offsetArray);
2003 
2004  if (access == WRITE && test->fsync == TRUE) {
2005  backend->fsync(fd, test); /*fsync after all accesses */
2006  }
2007  return (dataMoved);
2008 }
int reorderTasks
Definition: ior.h:112
int uniqueDir
Definition: ior.h:134
IOR_offset_t setAlignment
Definition: ior.h:173
IOR_offset_t(* get_file_size)(IOR_param_t *, MPI_Comm, char *)
Definition: aiori.h:79
int GetNumTasks(MPI_Comm comm)
Definition: utilities.c:311
int quitOnError
Definition: ior.h:121
int reorderTasksRandomSeed
Definition: ior.h:115
int ior_main(int argc, char **argv)
Definition: ior.c:101
size_t pairs_accessed
Definition: ior.h:216
int showHints
Definition: ior.h:132
long long stonewall_avg_data_accessed
Definition: ior.h:220
char * hdfs_user
Definition: ior.h:176
void(* delete)(char *, IOR_param_t *)
Definition: aiori.h:76
int errors
Definition: ior.h:228
static IOR_offset_t WriteOrRead(IOR_param_t *test, IOR_results_t *results, void *fd, const int access, IOR_io_buffers *ioBuffers)
Definition: ior.c:1924
int multiFile
Definition: ior.h:105
#define ERR(MSG)
Definition: iordef.h:184
IOR_offset_t * GetOffsetArraySequential(IOR_param_t *test, int pretendRank)
Definition: ior.c:1738
static void file_hits_histogram(IOR_param_t *params)
Definition: ior.c:1071
static void DisplayOutliers(int numTasks, double timerVal, char *timeString, int access, int outlierThreshold)
Definition: ior.c:233
void PrintTestEnds()
Definition: ior-output.c:212
unsigned int incompressibleSeed
Definition: ior.h:149
#define VERBOSE_0
Definition: iordef.h:101
char * GetPlatformName()
Definition: ior.c:664
IOR_offset_t aggFileSizeFromStat
Definition: ior.h:222
unsigned int timeStampSignatureValue
Definition: ior.h:146
int filePerProc
Definition: ior.h:111
void PrintRepeatStart()
Definition: ior-output.c:203
static int size
Definition: mdtest.c:91
#define VERBOSE_3
Definition: iordef.h:104
double stonewall_time
Definition: ior.h:218
int noFill
Definition: ior.h:172
static void InitTests(IOR_test_t *, MPI_Comm)
Definition: ior.c:948
int repetitions
Definition: ior.h:103
int64_t ReadStoneWallingIterations(char *const filename)
Definition: utilities.c:795
IOR_offset_t segmentCount
Definition: ior.h:123
int useStridedDatatype
Definition: ior.h:131
static void aligned_buffer_free(void *buf)
Definition: ior.c:504
#define WARN_RESET(MSG, TO_STRUCT_PTR, FROM_STRUCT_PTR, MEMBER)
Definition: iordef.h:134
void * checkBuffer
Definition: ior.h:62
int keepFile
Definition: ior.h:118
void PrintHeader(int argc, char **argv)
Definition: ior-output.c:253
char ** environ
static void XferBuffersFree(IOR_io_buffers *ioBuffers, IOR_param_t *test)
Definition: ior.c:1029
int checkRead
Definition: ior.h:117
void PrintLongSummaryOneTest(IOR_test_t *test)
Definition: ior-output.c:641
int useSharedFilePointer
Definition: ior.h:130
int test_time_elapsed(IOR_param_t *params, double startTime)
Definition: ior.c:1116
int numTasksOnNode0
Definition: ior.h:101
void FreeResults(IOR_test_t *test)
Definition: ior.c:519
static void ValidateTests(IOR_param_t *)
Definition: ior.c:1564
IOR_offset_t transferSize
Definition: ior.h:125
size_t memoryPerNode
Definition: ior.h:152
#define WRITECHECK
Definition: iordef.h:96
int(* check_params)(IOR_param_t *)
Definition: aiori.h:89
IOR_param_t params
Definition: ior.h:235
void PrintLongSummaryHeader()
Definition: ior-output.c:651
#define READCHECK
Definition: iordef.h:98
int storeFileOffset
Definition: ior.h:136
int errorFound
Definition: ior.h:120
IOR_offset_t aggFileSizeFromXfer
Definition: ior.h:223
double sd
Definition: ior-internal.h:37
int QueryNodeMapping(MPI_Comm comm, int print_nodemap)
Definition: utilities.c:230
static IOR_offset_t WriteOrReadSingle(IOR_offset_t pairCnt, IOR_offset_t *offsetArray, int pretendRank, IOR_offset_t *transferCount, int *errors, IOR_param_t *test, int *fd, IOR_io_buffers *ioBuffers, int access)
Definition: ior.c:1856
static int totalErrorCount
Definition: ior.c:48
size_t part_number
Definition: ior.h:184
char * apiVersion
Definition: ior.h:91
static void * HogMemory(IOR_param_t *params)
Definition: ior.c:1131
int summary_every_test
Definition: ior.h:133
static void DestroyTest(IOR_test_t *test)
Definition: ior.c:546
static void ReduceIterResults(IOR_test_t *test, double *timer, const int rep, const int access)
Definition: ior.c:851
int numNodes
Definition: ior.h:100
int setTimeStampSignature
Definition: ior.h:145
int hdfs_replicas
Definition: ior.h:180
unsigned int openFlags
Definition: ior.h:88
int fsyncPerWrite
Definition: ior.h:162
int interTestDelay
Definition: ior.h:106
#define GIBIBYTE
Definition: iordef.h:88
int(* access)(const char *path, int mode, IOR_param_t *param)
Definition: aiori.h:83
int lustre_start_ost
Definition: ior.h:197
#define WRITE
Definition: iordef.h:95
#define EWARN(MSG)
Definition: iordef.h:169
IOR_test_t * ior_run(int argc, char **argv, MPI_Comm world_com, FILE *world_out)
Definition: ior.c:61
int maxTimeDuration
Definition: ior.h:142
char * testFileName
Definition: ior.h:93
void(* close)(void *, IOR_param_t *)
Definition: aiori.h:75
#define VERBOSE_5
Definition: iordef.h:106
char * stoneWallingStatusFile
Definition: ior.h:140
unsigned int mode
Definition: ior.h:87
void ShowTestStart(IOR_param_t *params)
Definition: ior-output.c:320
#define READ
Definition: iordef.h:97
MPI_Comm testComm
Definition: ior.h:166
int taskPerNodeOffset
Definition: ior.h:113
void init_clock()
Definition: utilities.c:775
#define IOR_CREAT
Definition: aiori.h:38
static char ** ParseFileName(char *, int *)
Definition: ior.c:708
void *(* open)(char *, IOR_param_t *)
Definition: aiori.h:72
double sum
Definition: ior-internal.h:38
void aiori_initialize(IOR_test_t *tests)
Definition: aiori.c:263
int fsync
Definition: ior.h:163
double var
Definition: ior-internal.h:36
struct IOR_test_t * next
Definition: ior.h:237
IOR_offset_t * GetOffsetArrayRandom(IOR_param_t *test, int pretendRank, int access)
Definition: ior.c:1787
hdfsFS hdfs_fs
Definition: ior.h:179
#define IOR_IRGRP
Definition: aiori.h:49
double wall_clock_delta
Definition: utilities.c:720
tPort hdfs_name_node_port
Definition: ior.h:178
int outlierThreshold
Definition: ior.h:143
int intraTestBarriers
Definition: ior.h:210
void GetTestFileName(char *testFileName, IOR_param_t *test)
Definition: ior.c:749
MPI_Comm testComm
Definition: utilities.c:60
int reorderTasksRandom
Definition: ior.h:114
void aiori_finalize(IOR_test_t *tests)
Definition: aiori.c:281
int checkWrite
Definition: ior.h:116
IOR_point_t write
Definition: ior.h:229
unsigned int reseed_incompressible_prng
Definition: ior.c:626
void(* fsync)(void *, IOR_param_t *)
Definition: aiori.h:78
void ShowSetup(IOR_param_t *params)
Definition: ior-output.c:413
void SeedRandGen(MPI_Comm testComm)
Definition: utilities.c:678
Definition: ior.h:59
IOR_offset_t aggFileSizeForBW
Definition: ior.h:224
int verbose
Definition: ior.h:144
static void XferBuffersSetup(IOR_io_buffers *ioBuffers, IOR_param_t *test, int pretendRank)
Definition: ior.c:1011
char * CurrentTimeString(void)
Definition: utilities.c:184
void PrintRemoveTiming(double start, double finish, int rep)
Definition: ior-output.c:775
#define MPI_CHECK(MPI_STATUS, MSG)
Definition: iordef.h:224
static void FillBuffer(void *buffer, IOR_param_t *test, unsigned long long offset, int fillrank)
Definition: ior.c:629
double time
Definition: ior.h:215
IOR_point_t read
Definition: ior.h:230
static void RemoveFile(char *testFileName, int filePerProc, IOR_param_t *test)
Definition: ior.c:912
static void CheckForOutliers(IOR_param_t *test, const double *timer, const int access)
Definition: ior.c:276
IOR_offset_t expectedAggFileSize
Definition: ior.h:127
char * platform
Definition: ior.h:92
int GetNumNodes(MPI_Comm comm)
Definition: utilities.c:274
int singleXferAttempt
Definition: ior.h:161
static void DestroyTests(IOR_test_t *tests_head)
Definition: ior.c:552
Definition: ior.h:48
#define IOR_IRUSR
Definition: aiori.h:45
int interIODelay
Definition: ior.h:107
FILE * out_resultfile
Definition: utilities.c:63
double GetTimeStamp(void)
Definition: utilities.c:726
static void WriteTimes(IOR_param_t *test, const double *timer, const int iteration, const int access)
Definition: ior.c:1160
void PrintShortSummary(IOR_test_t *test)
Definition: ior-output.c:696
int stoneWallingWearOut
Definition: ior.h:138
static const ior_aiori_t * backend
Definition: ior.c:49
void PrintRepeatEnd()
Definition: ior-output.c:197
int(* mkdir)(const char *path, mode_t mode, IOR_param_t *param)
Definition: aiori.h:81
long long stonewall_min_data_accessed
Definition: ior.h:219
IOR_test_t * CreateTest(IOR_param_t *init_params, int test_num)
Definition: ior.c:530
#define IOR_IWGRP
Definition: aiori.h:50
char * URI
Definition: ior.h:183
static void TestIoSys(IOR_test_t *)
Definition: ior.c:1225
void * buffer
Definition: ior.h:61
void PrintTableHeader()
Definition: ior-output.c:18
void DistributeHints(void)
Definition: ior.c:565
void PrintLongSummaryAllTests(IOR_test_t *tests_head)
Definition: ior-output.c:670
static size_t CompareBuffers(void *expectedBuffer, void *unknownBuffer, size_t size, IOR_offset_t transferCount, IOR_param_t *test, int access)
Definition: ior.c:342
static char hostname[MAX_PATHLEN]
Definition: mdtest.c:97
void PrintReducedResult(IOR_test_t *test, int access, double bw, double iops, double latency, double *diff_subset, double totalTime, int rep)
Definition: ior-output.c:222
int keepFileWithError
Definition: ior.h:119
int randomSeed
Definition: ior.h:148
#define FALSE
Definition: iordef.h:71
int rankOffset
Definition: utilities.c:58
int useExistingTestFile
Definition: ior.h:135
enum PACKET_TYPE dataPacketType
Definition: ior.h:156
int beegfs_numTargets
Definition: ior.h:206
void init_IOR_Param_t(IOR_param_t *p)
Definition: ior.c:175
int useFileView
Definition: ior.h:129
int readFile
Definition: ior.h:109
void *(* create)(char *, IOR_param_t *)
Definition: aiori.h:70
long long int IOR_size_t
Definition: iordef.h:123
#define WARN(MSG)
Definition: iordef.h:144
void * readCheckBuffer
Definition: ior.h:63
int tasksBlockMapping
Definition: ior.h:102
int hdfs_block_size
Definition: ior.h:181
int randomOffset
Definition: ior.h:150
int numTasks
Definition: ior.h:99
size_t memoryPerTask
Definition: ior.h:151
const char * aiori_default(void)
Definition: aiori.c:350
#define VERBOSE_2
Definition: iordef.h:103
#define IOR_NB_TIMERS
Definition: ior.c:44
int individualDataSets
Definition: ior.h:171
int writeFile
Definition: ior.h:110
static void * aligned_buffer_alloc(size_t size)
Definition: ior.c:474
uint64_t stoneWallingWearOutIterations
Definition: ior.h:139
#define MAX_STR
Definition: iordef.h:108
#define MAX_HINTS
Definition: iordef.h:109
int collective
Definition: ior.h:122
IOR_offset_t offset
Definition: ior.h:126
static int CountErrors(IOR_param_t *test, int access, int errors)
Definition: ior.c:442
#define VERBOSE_4
Definition: iordef.h:105
#define MAX_PATHLEN
Definition: utilities.h:33
double mean
Definition: ior-internal.h:35
static void * malloc_and_touch(size_t size)
Definition: ior.c:1049
int open
Definition: ior.h:108
const struct ior_aiori * backend
Definition: ior.h:85
static void FillIncompressibleBuffer(void *buffer, IOR_param_t *test)
Definition: ior.c:612
static char * PrependDir(IOR_param_t *, char *)
Definition: ior.c:798
#define IOR_RDWR
Definition: aiori.h:36
void DelaySecs(int delay)
Definition: utilities.c:832
#define VERBOSE_1
Definition: iordef.h:102
IOR_results_t * results
Definition: ior.h:236
int verbose
Definition: utilities.c:59
IOR_test_t * ParseCommandLine(int argc, char **argv)
MPI_Comm mpi_comm_world
Definition: utilities.c:61
int preallocate
Definition: ior.h:128
int deadlineForStonewalling
Definition: ior.h:137
char * api
Definition: ior.h:90
#define FILENAME_DELIMITER
Definition: iordef.h:116
int repCounter
Definition: ior.h:104
static void CheckFileSize(IOR_test_t *test, IOR_offset_t dataMoved, int rep, const int access)
Definition: ior.c:298
FILE * out_logfile
Definition: utilities.c:62
#define ERRF(FORMAT,...)
Definition: iordef.h:175
long long int IOR_offset_t
Definition: iordef.h:122
#define IOR_IWUSR
Definition: aiori.h:46
int rank
Definition: utilities.c:57
int numTasks
IOR_offset_t blockSize
Definition: ior.h:124
int GetNumTasksOnNode0(MPI_Comm comm)
Definition: utilities.c:349
#define TRUE
Definition: iordef.h:75
IOR_offset_t(* xfer)(int, void *, IOR_size_t *, IOR_offset_t, IOR_param_t *)
Definition: aiori.h:73
int lustre_set_striping
Definition: ior.h:198
void ShowTestEnd(IOR_test_t *tptr)
Definition: ior-output.c:397
const char * hdfs_name_node
Definition: ior.h:177
void * safeMalloc(uint64_t size)
Definition: utilities.c:68
int beegfs_chunkSize
Definition: ior.h:207
#define NULL
Definition: iordef.h:79
int id
Definition: ior.h:209
void AllocResults(IOR_test_t *test)
Definition: ior.c:509