69 # include <sys/ioctl.h> 94 # define lseek64 lseek 141 WARN(
"cannot use O_DIRECT");
142 # define O_DIRECT 000000 144 # define O_DIRECT O_DIRECTIO 164 printf(
"-> hdfs_connect [nn:\"%s\", port:%d, user:%s]\n",
172 printf(
"<- hdfs_connect [nothing to do]\n");
178 struct hdfsBuilder* builder = hdfsNewBuilder();
182 hdfsBuilderSetForceNewInstance ( builder );
186 hdfsBuilderSetUserName ( builder, param->
hdfs_user );
189 param->
hdfs_fs = hdfsBuilderConnect( builder );
194 printf(
"<- hdfs_connect [success]\n");
200 printf(
"-> hdfs_disconnect\n");
203 hdfsDisconnect( param->
hdfs_fs );
207 printf(
"<- hdfs_disconnect\n");
219 printf(
"-> HDFS_Create_Or_Open\n");
222 hdfsFile hdfs_file =
NULL;
223 int fd_oflags = 0, hdfs_return;
238 ERR(
"Opening or creating a file in RDWR is not implemented in HDFS" );
242 fprintf( stdout,
"Opening or creating a file in Exclusive mode is not implemented in HDFS\n" );
246 fprintf( stdout,
"Opening or creating a file for appending is not implemented in HDFS\n" );
253 if ( createFile ==
TRUE ) {
262 fd_oflags |= O_WRONLY;
264 fd_oflags |= O_TRUNC;
265 fd_oflags |= O_WRONLY;
270 fd_oflags |= O_TRUNC;
271 fd_oflags |= O_WRONLY;
275 fd_oflags |= O_RDONLY;
282 if ( param->useO_DIRECT ==
TRUE ) {
305 printf(
"\thdfsOpenFile(0x%llx, %s, 0%o, %d, %d, %d)\n",
313 hdfs_file = hdfsOpenFile( param->
hdfs_fs,
320 ERR(
"Failed to open the file" );
335 printf(
"<- HDFS_Create_Or_Open\n");
337 return ((
void *) hdfs_file );
346 printf(
"-> HDFS_Create\n");
350 printf(
"<- HDFS_Create\n");
360 printf(
"-> HDFS_Open\n");
365 printf(
"<- HDFS_Open( ... TRUE)\n");
371 printf(
"<- HDFS_Open( ... FALSE)\n");
384 printf(
"-> HDFS_Xfer(acc:%d, file:0x%llx, buf:0x%llx, len:%llu, 0x%llx)\n",
385 access, file, buffer, length, param);
389 long long remaining = (
long long)length;
390 char* ptr = (
char *)buffer;
394 hdfsFile hdfs_file = (hdfsFile)file;
397 while ( remaining > 0 ) {
400 if (access ==
WRITE) {
402 fprintf( stdout,
"task %d writing to offset %lld\n",
404 param->
offset + length - remaining);
408 printf(
"\thdfsWrite( 0x%llx, 0x%llx, 0x%llx, %lld)\n",
409 hdfs_fs, hdfs_file, ptr, remaining );
411 rc = hdfsWrite( hdfs_fs, hdfs_file, ptr, remaining );
413 ERR(
"hdfsWrite() failed" );
424 fprintf( stdout,
"task %d reading from offset %lld\n",
426 param->
offset + length - remaining );
430 printf(
"\thdfsRead( 0x%llx, 0x%llx, 0x%llx, %lld)\n",
431 hdfs_fs, hdfs_file, ptr, remaining );
433 rc = hdfsRead( hdfs_fs, hdfs_file, ptr, remaining );
436 ERR(
"hdfs_read() returned EOF prematurely" );
440 ERR(
"hdfs_read() failed" );
447 if ( rc < remaining ) {
448 fprintf(stdout,
"WARNING: Task %d, partial %s, %lld of %lld bytes at offset %lld\n",
450 access ==
WRITE ?
"hdfsWrite()" :
"hdfs_read()",
452 param->
offset + length - remaining );
455 MPI_CHECK( MPI_Abort( MPI_COMM_WORLD, -1 ),
"barrier error" );
459 ERR(
"too many retries -- aborting" );
464 assert( rc <= remaining );
471 printf(
"<- HDFS_Xfer\n");
482 printf(
"-> HDFS_Fsync\n");
485 hdfsFile hdfs_file = (hdfsFile)fd;
489 printf(
"\thdfsHSync(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file);
491 if ( hdfsHSync( hdfs_fs, hdfs_file ) != 0 ) {
492 EWARN(
"hdfsHSync() failed" );
496 printf(
"\thdfsHFlush(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file);
498 if ( hdfsHFlush( hdfs_fs, hdfs_file ) != 0 ) {
499 EWARN(
"hdfsHFlush() failed" );
503 printf(
"\thdfsFlush(0x%llx, 0x%llx)\n", hdfs_fs, hdfs_file);
505 if ( hdfsFlush( hdfs_fs, hdfs_file ) != 0 ) {
506 EWARN(
"hdfsFlush() failed" );
511 printf(
"<- HDFS_Fsync\n");
521 printf(
"-> HDFS_Close\n");
525 hdfsFile hdfs_file = (hdfsFile)fd;
530 open_flags = O_CREAT | O_WRONLY;
532 open_flags = O_RDONLY;
535 if ( hdfsCloseFile( hdfs_fs, hdfs_file ) != 0 ) {
536 ERR(
"hdfsCloseFile() failed" );
540 printf(
"<- HDFS_Close\n");
552 printf(
"-> HDFS_Delete\n");
561 ERR_SIMPLE(
"Can't delete a file without an HDFS connection" );
563 if ( hdfsDelete( param->
hdfs_fs, testFileName, 0 ) != 0 ) {
565 "[RANK %03d]: hdfsDelete() of file \"%s\" failed\n",
571 printf(
"<- HDFS_Delete\n");
581 printf(
"-> HDFS_SetVersion\n");
586 printf(
"<- HDFS_SetVersion\n");
599 char * testFileName) {
601 printf(
"-> HDFS_GetFileSize(%s)\n", testFileName);
612 printf(
"\thdfsGetPathInfo(%s) ...", testFileName);fflush(stdout);
615 hdfsFileInfo* info = hdfsGetPathInfo( param->
hdfs_fs, testFileName );
619 printf(
"done.\n");fflush(stdout);
622 aggFileSizeFromStat = info->mSize;
626 printf(
"\tall-reduce (1)\n");
630 &aggFileSizeFromStat, &tmpSum, 1, MPI_LONG_LONG_INT, MPI_SUM, testComm ),
631 "cannot total data moved" );
633 aggFileSizeFromStat = tmpSum;
637 printf(
"\tall-reduce (2a)\n");
641 &aggFileSizeFromStat, &tmpMin, 1, MPI_LONG_LONG_INT, MPI_MIN, testComm ),
642 "cannot total data moved" );
645 printf(
"\tall-reduce (2b)\n");
649 &aggFileSizeFromStat, &tmpMax, 1, MPI_LONG_LONG_INT, MPI_MAX, testComm ),
650 "cannot total data moved" );
652 if ( tmpMin != tmpMax ) {
654 WARN(
"inconsistent file size by different tasks" );
658 aggFileSizeFromStat = tmpMin;
663 printf(
"<- HDFS_GetFileSize [%llu]\n", aggFileSizeFromStat);
665 return ( aggFileSizeFromStat );
static void HDFS_Delete(char *, IOR_param_t *)
IOR_offset_t transferSize
static IOR_offset_t HDFS_Xfer(int, void *, IOR_size_t *, IOR_offset_t, IOR_param_t *)
tPort hdfs_name_node_port
static void * HDFS_Create_Or_Open(char *testFileName, IOR_param_t *param, unsigned char createFile)
static void HDFS_SetVersion(IOR_param_t *)
#define MPI_CHECK(MPI_STATUS, MSG)
void hdfs_set_o_direct_flag(int *fd)
static void hdfs_connect(IOR_param_t *param)
static void * HDFS_Create(char *, IOR_param_t *)
static void hdfs_disconnect(IOR_param_t *param)
static IOR_offset_t HDFS_GetFileSize(IOR_param_t *, MPI_Comm, char *)
static void * HDFS_Open(char *, IOR_param_t *)
long long int IOR_offset_t
static void HDFS_Fsync(void *, IOR_param_t *)
const char * hdfs_name_node
static void HDFS_Close(void *, IOR_param_t *)