104 #include <curl/curl.h> 106 #include <libxml/parser.h> 107 #include <libxml/tree.h> 110 #include "aws4c_extra.h" 116 #define BUFF_SIZE 1024 129 # define IOR_CURL_INIT 0x01 130 # define IOR_CURL_NOCONTINUE 0x02 131 # define IOR_CURL_S3_EMC_EXT 0x04 134 # include <curl/curl.h> 196 .set_version = S3_SetVersion,
213 .set_version = S3_SetVersion,
225 AWS4C_CHECK( aws_init() );
242 if (Nto1 && (s != 1) && (b != t)) {
243 ERR(
"N:1 (strided) requires xfer-size == block-size");
251 #define CURL_ERR(MSG, CURL_ERRNO, PARAM) \ 253 fprintf(stdout, "ior ERROR: %s: %s (curl-errno=%d) (%s:%d)\n", \ 254 MSG, curl_easy_strerror(CURL_ERRNO), CURL_ERRNO, \ 255 __FILE__, __LINE__); \ 257 MPI_Abort((PARAM)->testComm, -1); \ 261 #define CURL_WARN(MSG, CURL_ERRNO) \ 263 fprintf(stdout, "ior WARNING: %s: %s (curl-errno=%d) (%s:%d)\n", \ 264 MSG, curl_easy_strerror(CURL_ERRNO), CURL_ERRNO, \ 265 __FILE__, __LINE__); \ 297 printf(
"-> s3_connect\n");
302 printf(
"<- s3_connect [nothing to do]\n");
321 aws_set_debug(param->
verbose >= 4);
322 aws_read_config(getenv(
"USER"));
323 aws_reuse_connections(1);
328 param->io_buf = aws_iobuf_new();
329 aws_iobuf_growth_size(param->io_buf, 1024*1024*1);
331 param->etags = aws_iobuf_new();
332 aws_iobuf_growth_size(param->etags, 1024*1024*8);
373 AWS4C_CHECK( s3_head(param->io_buf,
"") );
374 if ( param->io_buf->code == 404 ) {
375 printf(
" bucket '%s' doesn't exist\n", bucket_name);
377 AWS4C_CHECK( s3_put(param->io_buf,
"") );
378 AWS4C_CHECK_OK( param->io_buf );
379 printf(
"created bucket '%s'\n", bucket_name);
382 AWS4C_CHECK_OK( param->io_buf );
396 printf(
"<- s3_connect [success]\n");
404 printf(
"-> s3_disconnect\n");
410 printf(
"<- s3_disconnect\n");
421 aws_iobuf_reset(param->io_buf);
422 aws_iobuf_reset(param->etags);
460 unsigned char createFile,
461 int multi_part_upload_p ) {
464 printf(
"-> S3_Create_Or_Open('%s', ,%d, %d)\n",
465 testFileName, createFile, multi_part_upload_p);
473 fprintf( stdout,
"Opening in Exclusive mode is not implemented in S3\n" );
475 if ( param->useO_DIRECT ==
TRUE ) {
476 fprintf( stdout,
"Direct I/O mode is not implemented in S3\n" );
481 int n_to_1 = ! n_to_n;
485 if (! multi_part_upload_p)
489 else if (createFile) {
498 if (multi_part_upload_p) {
504 if ( n_to_n || (
rank == 0) ) {
508 aws_iobuf_reset(param->io_buf);
509 AWS4C_CHECK( s3_put(param->io_buf, testFileName) );
510 AWS4C_CHECK_OK( param->io_buf );
515 IOBuf* response = aws_iobuf_new();
516 AWS4C_CHECK( s3_post2(param->io_buf,
buff,
NULL, response) );
517 AWS4C_CHECK_OK( param->io_buf );
520 aws_iobuf_realloc(response);
521 xmlDocPtr doc = xmlReadMemory(response->first->buf,
522 response->first->len,
525 ERR_SIMPLE(
"Rank0 Failed to find POST response\n");
528 xmlNode* root_element = xmlDocGetRootElement(doc);
529 const char* upload_id = find_element_named(root_element, (
char*)
"UploadId");
531 ERR_SIMPLE(
"couldn't find 'UploadId' in returned XML\n");
534 printf(
"got UploadId = '%s'\n", upload_id);
536 const size_t upload_id_len = strlen(upload_id);
537 if (upload_id_len > MAX_UPLOAD_ID_SIZE) {
539 "UploadId length %d exceeds expected max (%d)",
540 upload_id_len, MAX_UPLOAD_ID_SIZE);
545 memcpy(param->
UploadId, upload_id, upload_id_len);
550 aws_iobuf_free(response);
554 MPI_Bcast(param->
UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, param->
testComm);
558 MPI_Bcast(param->
UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, param->
testComm);
568 fprintf( stdout,
"rank %d resetting\n",
572 aws_iobuf_reset(param->io_buf);
573 AWS4C_CHECK( s3_put(param->io_buf, testFileName) );
574 AWS4C_CHECK_OK( param->io_buf );
581 printf(
"<- S3_Create_Or_Open\n");
583 return ((
void *) testFileName );
592 printf(
"-> S3_Create\n");
596 printf(
"<- S3_Create\n");
604 printf(
"-> EMC_Create\n");
608 printf(
"<- EMC_Create\n");
622 printf(
"-> S3_Open\n");
627 printf(
"<- S3_Open( ... TRUE)\n");
633 printf(
"<- S3_Open( ... FALSE)\n");
642 printf(
"-> S3_Open\n");
647 printf(
"<- EMC_Open( ... TRUE)\n");
653 printf(
"<- EMC_Open( ... FALSE)\n");
740 int multi_part_upload_p ) {
743 printf(
"-> S3_Xfer(acc:%d, target:%s, buf:0x%llx, len:%llu, 0x%llx)\n",
744 access, (
char*)file, buffer, length, param);
747 char* fname = (
char*)file;
748 size_t remaining = (size_t)length;
749 char* data_ptr = (
char *)buffer;
754 int n_to_1 = (! n_to_n);
758 if (access ==
WRITE) {
761 fprintf( stdout,
"rank %d writing length=%lld to offset %lld\n",
764 param->
offset + length - remaining);
768 if (multi_part_upload_p) {
814 "%s?partNumber=%d&uploadId=%s",
815 fname, part_number, param->
UploadId);
827 aws_iobuf_reset(param->io_buf);
828 aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
829 AWS4C_CHECK( s3_put(param->io_buf,
buff) );
830 AWS4C_CHECK_OK( param->io_buf );
842 fprintf( stdout,
"rank %d of %d (%s,%s) offset %lld, part# %lld --> ETag %s\n",
845 (n_to_1 ?
"N:1" :
"N:N"),
846 (segmented ?
"segmented" :
"strided"),
849 param->io_buf->eTag);
851 if (strlen(param->io_buf->eTag) !=
ETAG_SIZE+2) {
852 fprintf(stderr,
"Rank %d: ERROR: expected ETag to be %d hex digits\n",
861 aws_iobuf_append(param->etags,
862 param->io_buf->eTag +1,
863 strlen(param->io_buf->eTag) -2);
866 printf(
"rank %d: part %d = ETag %s\n",
rank, part_number, param->io_buf->eTag);
870 aws_iobuf_reset(param->io_buf);
878 s3_set_byte_range(-1,-1);
880 s3_set_byte_range(offset, remaining);
886 aws_iobuf_reset(param->io_buf);
887 aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
888 AWS4C_CHECK ( s3_put(param->io_buf, file) );
889 AWS4C_CHECK_OK( param->io_buf );
892 aws_iobuf_reset(param->io_buf);
897 WARN(
"S3 doesn't support 'fsync'" );
904 fprintf( stdout,
"rank %d reading from offset %lld\n",
906 param->
offset + length - remaining );
911 s3_set_byte_range(offset, remaining);
918 aws_iobuf_reset(param->io_buf);
919 aws_iobuf_extend_static(param->io_buf, data_ptr, remaining);
920 AWS4C_CHECK( s3_get(param->io_buf, file) );
921 if (param->io_buf->code != 206) {
923 "Unexpected result (%d, '%s')",
924 param->io_buf->code, param->io_buf->result);
929 aws_iobuf_reset(param->io_buf);
934 printf(
"<- S3_Xfer\n");
999 printf(
"-> S3_Fsync [no-op]\n");
1003 printf(
"<- S3_Fsync\n");
1037 int multi_part_upload_p ) {
1039 char* fname = (
char*)fd;
1043 int n_to_1 = (! n_to_n);
1047 printf(
"-> S3_Close('%s', ,%d) %s\n",
1049 multi_part_upload_p,
1050 ((n_to_n) ?
"N:N" : ((segmented) ?
"N:1(seg)" :
"N:1(str)")));
1057 if (multi_part_upload_p) {
1060 size_t etag_data_size = param->etags->write_count;
1061 size_t etags_per_rank = etag_data_size /
ETAG_SIZE;
1070 MPI_Datatype mpi_size_t;
1071 if (
sizeof(
size_t) ==
sizeof(
int))
1072 mpi_size_t = MPI_INT;
1073 else if (
sizeof(
size_t) ==
sizeof(
long))
1074 mpi_size_t = MPI_LONG;
1076 mpi_size_t = MPI_LONG_LONG;
1079 size_t etag_count_max = 0;
1080 MPI_Allreduce(&etags_per_rank, &etag_count_max,
1081 1, mpi_size_t, MPI_MAX, param->
testComm);
1082 if (etags_per_rank != etag_count_max) {
1083 printf(
"Rank %d: etag count mismatch: max:%d, mine:%d\n",
1084 rank, etag_count_max, etags_per_rank);
1089 aws_iobuf_realloc(param->etags);
1090 char* etag_data = param->etags->first->buf;
1098 char* etag_vec = (
char*)malloc((param->
numTasks * etag_data_size) +1);
1100 fprintf(stderr,
"rank 0 failed to malloc %d bytes\n",
1104 MPI_Gather(etag_data, etag_data_size, MPI_BYTE,
1105 etag_vec, etag_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
1111 printf(
"rank 0: gathered %d etags from all ranks:\n", etags_per_rank);
1113 for (rnk=0; rnk<param->
numTasks; ++rnk) {
1114 printf(
"\t[%d]: '", rnk);
1117 for (ii=0; ii<etag_data_size; ++ii)
1118 printf(
"%c", etag_ptr[ii]);
1121 etag_ptr += etag_data_size;
1172 size_t start_multiplier;
1177 j_max = etags_per_rank;
1178 start_multiplier = etag_data_size;
1182 i_max = etags_per_rank;
1185 stride = etag_data_size;
1189 xml = aws_iobuf_new();
1190 aws_iobuf_growth_size(xml, 1024 * 8);
1193 aws_iobuf_append_str(xml,
"<CompleteMultipartUpload>\n");
1196 for (i=0; i<i_max; ++i) {
1198 etag_ptr=etag_vec + (i * start_multiplier);
1200 for (j=0; j<j_max; ++j) {
1210 " <PartNumber>%d</PartNumber>\n" 1211 " <ETag>%s</ETag>\n" 1215 aws_iobuf_append_str(xml,
buff);
1223 aws_iobuf_append_str(xml,
"</CompleteMultipartUpload>\n");
1227 MPI_Gather(etag_data, etag_data_size, MPI_BYTE,
1228 NULL, etag_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
1234 xml = aws_iobuf_new();
1235 aws_iobuf_growth_size(xml, 1024 * 8);
1238 aws_iobuf_append_str(xml,
"<CompleteMultipartUpload>\n");
1244 for (i=0; i<etags_per_rank; ++i) {
1248 int sz = aws_iobuf_get_raw(param->etags, etag,
ETAG_SIZE);
1251 "Read of ETag %d had length %d (not %d)\n",
1261 " <PartNumber>%d</PartNumber>\n" 1262 " <ETag>%s</ETag>\n" 1266 aws_iobuf_append_str(xml,
buff);
1272 aws_iobuf_append_str(xml,
"</CompleteMultipartUpload>\n");
1278 if (n_to_n || (
rank == 0)) {
1282 debug_iobuf(xml, 1, 1);
1289 AWS4C_CHECK ( s3_post(xml,
buff) );
1290 AWS4C_CHECK_OK( xml );
1292 aws_iobuf_free(xml);
1313 printf(
"rank %d: passed barrier\n",
rank);
1319 aws_reset_connection();
1324 printf(
"<- S3_Close\n");
1357 printf(
"-> S3_Delete(%s)\n", testFileName);
1367 AWS4C_CHECK( s3_delete(param->io_buf, testFileName) );
1370 aws_iobuf_reset(param->io_buf);
1371 AWS4C_CHECK ( s3_put(param->io_buf, testFileName) );
1374 AWS4C_CHECK_OK( param->io_buf );
1377 printf(
"<- S3_Delete\n");
1386 printf(
"-> EMC_Delete(%s)\n", testFileName);
1396 AWS4C_CHECK( s3_delete(param->io_buf, testFileName) );
1399 aws_iobuf_reset(param->io_buf);
1400 AWS4C_CHECK ( s3_put(param->io_buf, testFileName) );
1403 AWS4C_CHECK_OK( param->io_buf );
1406 printf(
"<- EMC_Delete\n");
1427 char * testFileName) {
1430 printf(
"-> S3_GetFileSize(%s)\n", testFileName);
1441 AWS4C_CHECK( s3_head(param->io_buf, testFileName) );
1442 if ( ! AWS4C_OK(param->io_buf) ) {
1443 fprintf(stderr,
"rank %d: couldn't stat '%s': %s\n",
1444 rank, testFileName, param->io_buf->result);
1447 aggFileSizeFromStat = param->io_buf->contentLen;
1450 printf(
"\trank %d: file-size %llu\n",
rank, aggFileSizeFromStat);
1455 printf(
"\tall-reduce (1)\n");
1457 MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat,
1463 "cannot total data moved" );
1465 aggFileSizeFromStat = tmpSum;
1469 printf(
"\tall-reduce (2a)\n");
1471 MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat,
1477 "cannot total data moved" );
1480 printf(
"\tall-reduce (2b)\n");
1482 MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat,
1488 "cannot total data moved" );
1490 if ( tmpMin != tmpMax ) {
1492 WARN(
"inconsistent file size by different tasks" );
1496 aggFileSizeFromStat = tmpMin;
1501 printf(
"<- S3_GetFileSize [%llu]\n", aggFileSizeFromStat);
1503 return ( aggFileSizeFromStat );
static void S3_Fsync(void *, IOR_param_t *)
static void EMC_Close(void *, IOR_param_t *)
static IOR_offset_t EMC_Xfer(int, void *, IOR_size_t *, IOR_offset_t, IOR_param_t *)
static void * EMC_Create(char *, IOR_param_t *)
static void S3_finalize()
static void S3_Close(void *, IOR_param_t *)
IOR_offset_t segmentCount
static void s3_connect(IOR_param_t *param)
IOR_offset_t transferSize
static IOR_offset_t S3_GetFileSize(IOR_param_t *, MPI_Comm, char *)
static void EMC_Delete(char *testFileName, IOR_param_t *param)
char * aiori_get_version()
static void * S3_Create_Or_Open_internal(char *testFileName, IOR_param_t *param, unsigned char createFile, int multi_part_upload_p)
static IOR_offset_t S3_Xfer_internal(int access, void *file, IOR_size_t *buffer, IOR_offset_t length, IOR_param_t *param, int multi_part_upload_p)
static void * EMC_Open(char *, IOR_param_t *)
static char buff[BUFF_SIZE]
#define MPI_CHECK(MPI_STATUS, MSG)
static int S3_check_params(IOR_param_t *)
static void S3_Close_internal(void *fd, IOR_param_t *param, int multi_part_upload_p)
static void * S3_Open(char *, IOR_param_t *)
static void * S3_Create(char *, IOR_param_t *)
static IOR_offset_t S3_Xfer(int, void *, IOR_size_t *, IOR_offset_t, IOR_param_t *)
static void s3_disconnect(IOR_param_t *param)
void s3_MPU_reset(IOR_param_t *param)
static void S3_Delete(char *, IOR_param_t *)
long long int IOR_offset_t
#define IOR_CURL_S3_EMC_EXT
ior_aiori_t s3_plus_aiori