69 # include <sys/ioctl.h> 93 # define lseek64 lseek 141 .enable_mdtest =
true 167 if (init_values !=
NULL){
172 hdfs_user = getenv(
"USER");
176 o->
user = strdup(hdfs_user);
191 memcpy(help, h,
sizeof(h));
199 return hdfsCreateDirectory(o->
fs, path);
205 return hdfsDelete(o->
fs, path, 1);
211 return hdfsExists(o->
fs, path);
218 stat = hdfsGetPathInfo(o->
fs, path);
222 memset(buf, 0,
sizeof(
struct stat));
223 buf->st_atime = stat->mLastAccess;
224 buf->st_size = stat->mSize;
225 buf->st_mtime = stat->mLastMod;
226 buf->st_mode = stat->mPermissions;
228 hdfsFreeFileInfo(stat, 1);
236 stat->
f_bsize = hdfsGetDefaultBlockSize(o->
fs);
237 stat->
f_blocks = hdfsGetCapacity(o->
fs) / hdfsGetDefaultBlockSize(o->
fs);
255 WARN(
"cannot use O_DIRECT");
256 # define O_DIRECT 000000 258 # define O_DIRECT O_DIRECTIO 278 printf(
"-> hdfs_connect [nn:\"%s\", port:%d, user:%s]\n",
286 printf(
"<- hdfs_connect [nothing to do]\n");
292 struct hdfsBuilder* builder = hdfsNewBuilder();
294 ERR(
"couldn't create an hdfsBuilder");
297 hdfsBuilderSetForceNewInstance ( builder );
299 hdfsBuilderSetNameNode ( builder, o->
name_node );
301 hdfsBuilderSetUserName ( builder, o->
user );
304 o->
fs = hdfsBuilderConnect( builder );
306 ERR(
"hdsfsBuilderConnect failed");
309 printf(
"<- hdfs_connect [success]\n");
315 printf(
"-> hdfs_disconnect\n");
318 hdfsDisconnect( o->
fs );
322 printf(
"<- hdfs_disconnect\n");
334 printf(
"-> HDFS_Create_Or_Open\n");
338 hdfsFile hdfs_file =
NULL;
339 int fd_oflags = 0, hdfs_return;
354 ERR(
"Opening or creating a file in RDWR is not implemented in HDFS" );
358 fprintf( stdout,
"Opening or creating a file in Exclusive mode is not implemented in HDFS\n" );
362 fprintf( stdout,
"Opening or creating a file for appending is not implemented in HDFS\n" );
369 if ( createFile ==
TRUE ) {
378 fd_oflags |= O_WRONLY;
380 fd_oflags |= O_TRUNC;
381 fd_oflags |= O_WRONLY;
386 fd_oflags |= O_TRUNC;
387 fd_oflags |= O_WRONLY;
391 fd_oflags |= O_RDONLY;
409 if (( flags & IOR_WRONLY ) && ( ! hints->
filePerProc ) && (
rank != 0 )) {
418 printf(
"\thdfsOpenFile(%p, %s, 0%o, %lld, %d, %lld)\n",
428 ERR(
"Failed to open the file" );
435 if (( flags & IOR_WRONLY ) &&
443 printf(
"<- HDFS_Create_Or_Open\n");
445 return ((
void *) hdfs_file );
454 printf(
"-> HDFS_Create\n");
458 printf(
"<- HDFS_Create\n");
468 printf(
"-> HDFS_Open\n");
473 printf(
"<- HDFS_Open( ... TRUE)\n");
479 printf(
"<- HDFS_Open( ... FALSE)\n");
492 printf(
"-> HDFS_Xfer(acc:%d, file:%p, buf:%p, len:%llu, %p)\n",
493 access, file, buffer, length, param);
497 long long remaining = (
long long)length;
498 char* ptr = (
char *)buffer;
501 hdfsFile hdfs_file = (hdfsFile)file;
504 while ( remaining > 0 ) {
507 if (access ==
WRITE) {
509 fprintf( stdout,
"task %d writing to offset %lld\n",
511 offset + length - remaining);
515 printf(
"\thdfsWrite( %p, %p, %p, %lld)\n",
516 hdfs_fs, hdfs_file, ptr, remaining );
518 rc = hdfsWrite( hdfs_fs, hdfs_file, ptr, remaining );
520 ERR(
"hdfsWrite() failed" );
530 fprintf( stdout,
"task %d reading from offset %lld\n",
531 rank, offset + length - remaining );
535 printf(
"\thdfsRead( %p, %p, %p, %lld)\n",
536 hdfs_fs, hdfs_file, ptr, remaining );
538 rc = hdfsPread(hdfs_fs, hdfs_file, offset, ptr, remaining);
540 ERR(
"hdfs_read() returned EOF prematurely" );
544 ERR(
"hdfs_read() failed" );
551 if ( rc < remaining ) {
552 fprintf(stdout,
"WARNING: Task %d, partial %s, %lld of %lld bytes at offset %lld\n",
554 access ==
WRITE ?
"hdfsWrite()" :
"hdfs_read()",
556 offset + length - remaining );
559 MPI_CHECK( MPI_Abort( MPI_COMM_WORLD, -1 ),
"barrier error" );
563 ERR(
"too many retries -- aborting" );
568 assert( rc <= remaining );
577 rc = hdfsHFlush(hdfs_fs, hdfs_file);
579 WARN(
"Error during flush");
584 printf(
"<- HDFS_Xfer\n");
595 hdfsFile hdfs_file = (hdfsFile)fd;
598 printf(
"\thdfsFlush(%p, %p)\n", hdfs_fs, hdfs_file);
600 if ( hdfsHSync( hdfs_fs, hdfs_file ) != 0 ) {
602 EWARN(
"hdfsFlush() failed" );
612 printf(
"-> HDFS_Close\n");
617 hdfsFile hdfs_file = (hdfsFile)fd;
619 if ( hdfsCloseFile( hdfs_fs, hdfs_file ) != 0 ) {
620 ERR(
"hdfsCloseFile() failed" );
624 printf(
"<- HDFS_Close\n");
636 printf(
"-> HDFS_Delete\n");
646 ERR(
"Can't delete a file without an HDFS connection" );
648 if ( hdfsDelete( o->
fs, testFileName, 0 ) != 0 ) {
649 sprintf(errmsg,
"[RANK %03d]: hdfsDelete() of file \"%s\" failed\n",
655 printf(
"<- HDFS_Delete\n");
666 char * testFileName) {
668 printf(
"-> HDFS_GetFileSize(%s)\n", testFileName);
680 printf(
"\thdfsGetPathInfo(%s) ...", testFileName);
684 hdfsFileInfo* info = hdfsGetPathInfo( o->
fs, testFileName );
686 ERR(
"hdfsGetPathInfo() failed" );
688 printf(
"done.\n");fflush(stdout);
691 aggFileSizeFromStat = info->mSize;
694 printf(
"<- HDFS_GetFileSize [%llu]\n", aggFileSizeFromStat);
696 return ( aggFileSizeFromStat );
static void * HDFS_Create_Or_Open(char *testFileName, int flags, aiori_mod_opt_t *param, unsigned char createFile)
static int HDFS_access(const char *path, int mode, aiori_mod_opt_t *options)
static int HDFS_stat(const char *path, struct stat *buf, aiori_mod_opt_t *options)
static aiori_fd_t * HDFS_Create(char *testFileName, int flags, aiori_mod_opt_t *param)
static int HDFS_rmdir(const char *path, aiori_mod_opt_t *options)
static void HDFS_Delete(char *testFileName, aiori_mod_opt_t *param)
static void HDFS_Fsync(aiori_fd_t *, aiori_mod_opt_t *)
static aiori_xfer_hint_t * hints
struct benchmark_options o
static IOR_offset_t HDFS_Xfer(int access, aiori_fd_t *file, IOR_size_t *buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t *param)
#define MPI_CHECK(MPI_STATUS, MSG)
char * aiori_get_version()
static int HDFS_statfs(const char *path, ior_aiori_statfs_t *stat, aiori_mod_opt_t *options)
static option_help options[]
static void hdfs_xfer_hints(aiori_xfer_hint_t *params)
static int HDFS_mkdir(const char *path, mode_t mode, aiori_mod_opt_t *options)
void hdfs_set_o_direct_flag(int *fd)
IOR_offset_t transferSize
static option_help * HDFS_options(aiori_mod_opt_t **init_backend_options, aiori_mod_opt_t *init_values)
static void hdfs_connect(hdfs_options_t *o)
static void HDFS_Close(aiori_fd_t *, aiori_mod_opt_t *)
static aiori_fd_t * HDFS_Open(char *testFileName, int flags, aiori_mod_opt_t *param)
static void hdfs_disconnect(hdfs_options_t *o)
static IOR_offset_t HDFS_GetFileSize(aiori_mod_opt_t *, char *)
long long int IOR_offset_t