IOR
aiori-S3.c
1 /* -*- mode: c; indent-tabs-mode: t; -*-
2  * vim:noexpandtab:
3  *
4  * Editing with tabs allows different users to pick their own indentation
5  * appearance without changing the file.
6  */
7 
8 /*
9  * Copyright (c) 2009, Los Alamos National Security, LLC All rights reserved.
10  * Copyright 2009. Los Alamos National Security, LLC. This software was produced
11  * under U.S. Government contract DE-AC52-06NA25396 for Los Alamos National
12  * Laboratory (LANL), which is operated by Los Alamos National Security, LLC for
13  * the U.S. Department of Energy. The U.S. Government has rights to use,
14  * reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR LOS
15  * ALAMOS NATIONAL SECURITY, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR
16  * ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is
17  * modified to produce derivative works, such modified software should be
18  * clearly marked, so as not to confuse it with the version available from
19  * LANL.
20  *
21  * Additionally, redistribution and use in source and binary forms, with or
22  * without modification, are permitted provided that the following conditions are
23  * met:
24  *
25  * • Redistributions of source code must retain the above copyright notice,
26  * this list of conditions and the following disclaimer.
27  *
28  * • Redistributions in binary form must reproduce the above copyright notice,
29  * this list of conditions and the following disclaimer in the documentation
30  * and/or other materials provided with the distribution.
31  *
32  * • Neither the name of Los Alamos National Security, LLC, Los Alamos National
33  * Laboratory, LANL, the U.S. Government, nor the names of its contributors may be
34  * used to endorse or promote products derived from this software without specific
35  * prior written permission.
36  *
37  * THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND CONTRIBUTORS
38  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
39  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
40  * ARE DISCLAIMED. IN NO EVENT SHALL LOS ALAMOS NATIONAL SECURITY, LLC OR
41  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
42  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
43  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
44  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
45  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
46  * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
47  * OF SUCH DAMAGE.
48  */
49 
50 /******************************************************************************
51  *
52  * Implementation of abstract IOR interface, for the Amazon S3 API.
53  * EMC/ViPR supports some useful extensions to S3, which we also implement
54  * here. There are 3 different mixes:
55  *
56  * (1) "Pure S3" uses S3 "Multi-Part Upload" to do N:1 writes. N:N writes
57  * fail, in the case where IOR "transfer-size" differs from
58  * "block-size", because this implies an "append", and append is not
59  * supported in S3. [TBD: The spec also says a multi-part upload can't
60  * have any individual part smaller than 5MB, or more than 10k total
61  * parts. Failing these conditions may produce obscure errors. Should
62  * we enforce these limits? ]
63  *
64  * --> Select this option with the '-a S3' command-line arg to IOR
65  *
66  *
67  * (2) "EMC S3 Extensions" uses the EMC byte-range support for N:1
68  * writes, eliminating Multi-Part Upload. EMC expects this will
69  * perform better than MPU, and it avoids some problems that are
70  * imposed by the S3 MPU spec. [See comments at EMC_Xfer().]
71  *
72  * --> Select this option with the '-a EMC_S3' command-line arg to IOR
73  *
74  *
75  * NOTE: Putting EMC's S3-extensions in the same file with the S3 API
76  * allows us to share some code that would otherwise be duplicated
77  * (e.g. s3_connect(), etc). This should also help us avoid losing
78  * bug fixes that are discovered in one interface or the other. In
79  * some cases, S3 is incapable of supporting all the needs of IOR.
80  * (For example, see notes about "append", above S3_Xfer().
81  * (For example, see notes about "append", above S3_Xfer().)
82  ******************************************************************************/
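/* Illustrative invocations (a sketch only; the object name and sizes are
 * hypothetical, and the authoritative selector strings are the .name fields
 * of the ior_aiori_t declarations below):
 *
 *   ior -a S3     -o my_object -t 1m -b 16m    # pure S3, Multi-Part Upload
 *   ior -a EMC_S3 -o my_object -t 1m -b 16m    # EMC byte-range extensions
 */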
83 
84 #ifdef HAVE_CONFIG_H
85 # include "config.h"
86 #endif
87 
88 #include <stdio.h>
89 #include <stdlib.h>
90 #include <string.h> /* strnstr() */
91 
92 #include <errno.h>
93 #include <assert.h>
94 /*
95 #ifdef HAVE_LUSTRE_LUSTRE_USER_H
96 #include <lustre/lustre_user.h>
97 #endif
98 */
99 
100 #include "ior.h"
101 #include "aiori.h"
102 #include "iordef.h"
103 
104 #include <curl/curl.h>
105 
106 #include <libxml/parser.h> // from libxml2
107 #include <libxml/tree.h>
108 
109 #include "aws4c.h" // extended vers of "aws4c" lib for S3 via libcurl
110 #include "aws4c_extra.h" // utilities, e.g. for parsing XML in responses
111 
112 
113 
114 
115 /* buffer is used to generate URLs, err_msgs, etc */
116 #define BUFF_SIZE 1024
117 static char buff[BUFF_SIZE];
118 
119 const int ETAG_SIZE = 32;
120 
121 CURLcode rc;
122 
123 /* Any objects we create or delete will be under this bucket */
124 const char* bucket_name = "ior";
125 
126 /* TODO: The following stuff goes into options! */
127 /* REST/S3 variables */
128 // CURL* curl; /* for libcurl "easy" fns (now managed by aws4c) */
129 # define IOR_CURL_INIT 0x01 /* curl top-level inits were performed once? */
130 # define IOR_CURL_NOCONTINUE 0x02
131 # define IOR_CURL_S3_EMC_EXT 0x04 /* allow EMC extensions to S3? */
132 
133 #ifdef USE_S3_AIORI
134 # include <curl/curl.h>
135 # include "aws4c.h"
136 #else
137  typedef void CURL; /* unused, but needs a type */
138  typedef void IOBuf; /* unused, but needs a type */
139 #endif
140 
141  IOBuf* io_buf; /* aws4c places parsed header values here */
142  IOBuf* etags; /* accumulate ETags for N:1 parts */
143 
145 
146 /**************************** P R O T O T Y P E S *****************************/
147 static void* S3_Create(char*, IOR_param_t*);
148 static void* S3_Open(char*, IOR_param_t*);
149 static IOR_offset_t S3_Xfer(int, void*, IOR_size_t*, IOR_offset_t, IOR_param_t*);
150 static void S3_Close(void*, IOR_param_t*);
151 
152 static void* EMC_Create(char*, IOR_param_t*);
153 static void* EMC_Open(char*, IOR_param_t*);
154 static IOR_offset_t EMC_Xfer(int, void*, IOR_size_t*, IOR_offset_t, IOR_param_t*);
155 static void EMC_Close(void*, IOR_param_t*);
156 
157 static void S3_Delete(char*, IOR_param_t*);
158 static void S3_Fsync(void*, IOR_param_t*);
159 static IOR_offset_t S3_GetFileSize(IOR_param_t*, MPI_Comm, char*);
160 static void S3_init();
161 static void S3_finalize();
162 static int S3_check_params(IOR_param_t *);
163 
164 
165 /************************** D E C L A R A T I O N S ***************************/
166 
167 // "Pure S3"
168 // N:1 writes use multi-part upload
169 // N:N fails if "transfer-size" != "block-size" (because that requires "append")
170 ior_aiori_t s3_aiori = {
171  .name = "S3",
172  .name_legacy = NULL,
173  .create = S3_Create,
174  .open = S3_Open,
175  .xfer = S3_Xfer,
176  .close = S3_Close,
177  .delete = S3_Delete,
178  .get_version = aiori_get_version,
179  .fsync = S3_Fsync,
180  .get_file_size = S3_GetFileSize,
181  .initialize = S3_init,
182  .finalize = S3_finalize,
183  .check_params = S3_check_params
184 };
185 
186 // "S3", plus EMC-extensions enabled
187 // N:1 writes use multi-part upload
188 // N:N succeeds (because EMC-extensions support "append")
189 ior_aiori_t s3_plus_aiori = {
190  .name = "S3_plus",
191  .create = S3_Create,
192  .open = S3_Open,
193  .xfer = S3_Xfer,
194  .close = S3_Close,
195  .delete = S3_Delete,
196  .get_version = aiori_get_version,
197  .fsync = S3_Fsync,
198  .get_file_size = S3_GetFileSize,
199  .initialize = S3_init,
200  .finalize = S3_finalize
201 };
202 
203 // Use EMC-extensions for N:1 write, as well
204 // N:1 writes use EMC byte-range
205 // N:N succeeds because EMC-extensions support "append"
206 ior_aiori_t s3_emc_aiori = {
207  .name = "S3_EMC",
208  .create = EMC_Create,
209  .open = EMC_Open,
210  .xfer = EMC_Xfer,
211  .close = EMC_Close,
212  .delete = S3_Delete,
213  .get_version = aiori_get_version,
214  .fsync = S3_Fsync,
215  .get_file_size = S3_GetFileSize,
216  .initialize = S3_init,
217  .finalize = S3_finalize
218 };
219 
220 
221 static void S3_init(){
222  /* This is supposed to be done before *any* threads are created.
223  * Could MPI_Init() create threads (or call multi-threaded
224  * libraries)? We'll assume so. */
225  AWS4C_CHECK( aws_init() );
226 }
227 
228 static void S3_finalize(){
229  /* done once per program, after exiting all threads.
230  * NOTE: This fn doesn't return a value that can be checked for success. */
231  aws_cleanup();
232 }
233 
234 static int S3_check_params(IOR_param_t * test){
235  /* N:1 and N:N */
236  IOR_offset_t NtoN = test->filePerProc;
237  IOR_offset_t Nto1 = ! NtoN;
238  IOR_offset_t s = test->segmentCount;
239  IOR_offset_t t = test->transferSize;
240  IOR_offset_t b = test->blockSize;
241 
242  if (Nto1 && (s != 1) && (b != t)) {
243  ERR("N:1 (strided) requires xfer-size == block-size");
244  return 0;
245  }
246 
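 /* For example (hypothetical settings): a shared-file (N:1) run with
  * "-s 4 -b 16m -t 1m" trips the error above, since transfer-size differs
  * from block-size with more than one segment; "-t 16m", "-s 1", or a
  * file-per-process run ("-F") is accepted. */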
247  return 1;
248 }
249 
250 /* modelled on similar macros in iordef.h */
251 #define CURL_ERR(MSG, CURL_ERRNO, PARAM) \
252  do { \
253  fprintf(stdout, "ior ERROR: %s: %s (curl-errno=%d) (%s:%d)\n", \
254  MSG, curl_easy_strerror(CURL_ERRNO), CURL_ERRNO, \
255  __FILE__, __LINE__); \
256  fflush(stdout); \
257  MPI_Abort((PARAM)->testComm, -1); \
258  } while (0)
259 
260 
261 #define CURL_WARN(MSG, CURL_ERRNO) \
262  do { \
263  fprintf(stdout, "ior WARNING: %s: %s (curl-errno=%d) (%s:%d)\n", \
264  MSG, curl_easy_strerror(CURL_ERRNO), CURL_ERRNO, \
265  __FILE__, __LINE__); \
266  fflush(stdout); \
267  } while (0)
268 
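/* A hypothetical call site, showing how the macros above are meant to be
 * used (curl_easy_setopt() is standard libcurl; the handle and url here are
 * placeholders):
 *
 *   CURLcode code = curl_easy_setopt(handle, CURLOPT_URL, url);
 *   if (code != CURLE_OK)
 *       CURL_WARN("couldn't set CURLOPT_URL", code);
 */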
269 
270 /***************************** F U N C T I O N S ******************************/
271 
272 
273 
274 
275 /* ---------------------------------------------------------------------------
276  * "Connect" to an S3 object-file-system. We're really just initializing
277  * libcurl. We need this done before any interactions. It is easy for
278  * ior_aiori.open/create to ensure that we connect, if we haven't already
279  * done so. However, there's no simple way to ensure that we
280  * "disconnect" at the end. For now, we'll make a special call at the end
281  * of ior.c
282  *
283  * NOTE: It's okay to call this thing whenever you need to be sure the curl
284  * handle is initialized.
285  *
286  * NOTE: Our custom version of aws4c can be configured so that connections
287  * are reused, instead of opened and closed on every operation. We
288  * do configure it that way, but you still need to call these
289  * connect/disconnect functions, in order to ensure that aws4c has
290  * been configured.
291  * ---------------------------------------------------------------------------
292  */
293 
294 
295 static void s3_connect( IOR_param_t* param ) {
296  if (param->verbose >= VERBOSE_2) {
297  printf("-> s3_connect\n"); /* DEBUGGING */
298  }
299 
300  if ( param->curl_flags & IOR_CURL_INIT ) {
301  if (param->verbose >= VERBOSE_2) {
302  printf("<- s3_connect [nothing to do]\n"); /* DEBUGGING */
303  }
304  return;
305  }
306 
307  // --- Done once-only (per rank). Perform all first-time inits.
308  //
309  // The aws library requires a config file, as illustrated below. We
310  // assume that the user running the test has an entry in this file,
311  // using their login moniker (i.e. `echo $USER`) as the key, as
312  // suggested in the example:
313  //
314  // <user>:<s3_login_id>:<s3_private_key>
315  //
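 // A hypothetical entry (made-up credentials) might look like:
 //
 // jdoe:jdoe_s3_login:0123456789abcdef0123456789abcdef
 //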
316  // This file must not be readable by other than user.
317  //
318  // NOTE: These inits could be done in init_IORParam_t(), in ior.c, but
319  // would require conditional compilation, there.
320 
321  aws_set_debug(param->verbose >= 4);
322  aws_read_config(getenv("USER")); // requires ~/.awsAuth
323  aws_reuse_connections(1);
324 
325  // initialize IOBufs. These are basically dynamically-extensible
326  // linked-lists. "growth size" controls the increment of new memory
327  // allocated, whenever storage is used up.
328  param->io_buf = aws_iobuf_new();
329  aws_iobuf_growth_size(param->io_buf, 1024*1024*1);
330 
331  param->etags = aws_iobuf_new();
332  aws_iobuf_growth_size(param->etags, 1024*1024*8);
333 
334  // WARNING: if you have http_proxy set in your environment, you may need
335  // to override it here. TBD: add a command-line variable to
336  // allow you to define a proxy.
337  //
338  // our hosts are currently 10.140.0.15 - 10.140.0.18
339  // TBD: Try DNS-round-robin server at vi-lb.ccstar.lanl.gov
340  // TBD: try HAProxy round-robin at 10.143.0.1
341 
342 #if 1
343  // snprintf(buff, BUFF_SIZE, "10.140.0.%d:9020", 15 + (rank % 4));
344  // s3_set_proxy(buff);
345  //
346  // snprintf(buff, BUFF_SIZE, "10.140.0.%d", 15 + (rank % 4));
347  // s3_set_host(buff);
348 
349  snprintf(buff, BUFF_SIZE, "10.140.0.%d:9020", 15 + (rank % 4));
350  s3_set_host(buff);
351 
352 #else
353 /*
354  * If you just want to go to one of the ECS nodes, put that IP
355  * address in here directly with port 9020.
356  *
357  */
358 // s3_set_host("10.140.0.15:9020");
359 
360 /*
361  * If you want to go to haproxy.ccstar.lanl.gov, this is its IP
362  * address.
363  *
364  */
365 // s3_set_proxy("10.143.0.1:80");
366 // s3_set_host( "10.143.0.1:80");
367 #endif
368 
369  // make sure test-bucket exists
370  s3_set_bucket((char*)bucket_name);
371 
372  if (rank == 0) {
373  AWS4C_CHECK( s3_head(param->io_buf, "") );
374  if ( param->io_buf->code == 404 ) { // "404 Not Found"
375  printf(" bucket '%s' doesn't exist\n", bucket_name);
376 
377  AWS4C_CHECK( s3_put(param->io_buf, "") ); /* creates URL as bucket + obj */
378  AWS4C_CHECK_OK( param->io_buf ); // assure "200 OK"
379  printf("created bucket '%s'\n", bucket_name);
380  }
381  else { // assure "200 OK"
382  AWS4C_CHECK_OK( param->io_buf );
383  }
384  }
385  MPI_CHECK(MPI_Barrier(param->testComm), "barrier error");
386 
387 
388  // Maybe allow EMC extensions to S3
389  s3_enable_EMC_extensions(param->curl_flags & IOR_CURL_S3_EMC_EXT);
390 
391  // don't perform these inits more than once
392  param->curl_flags |= IOR_CURL_INIT;
393 
394 
395  if (param->verbose >= VERBOSE_2) {
396  printf("<- s3_connect [success]\n");
397  }
398 }
399 
400 static
401 void
402 s3_disconnect( IOR_param_t* param ) {
403  if (param->verbose >= VERBOSE_2) {
404  printf("-> s3_disconnect\n");
405  }
406 
407  // nothing to do here, if using new aws4c ...
408 
409  if (param->verbose >= VERBOSE_2) {
410  printf("<- s3_disconnect\n");
411  }
412 }
413 
414 
415 
416 // After finalizing an S3 multi-part-upload, you must reset some things
417 // before you can use multi-part-upload again. This will also avoid (one
418 // particular set of) memory-leaks.
419 void
420 s3_MPU_reset(IOR_param_t* param) {
421  aws_iobuf_reset(param->io_buf);
422  aws_iobuf_reset(param->etags);
423  param->part_number = 0;
424 }
425 
426 
427 /* ---------------------------------------------------------------------------
428  * direct support for the IOR S3 interface
429  * ---------------------------------------------------------------------------
430  */
431 
432 /*
433  * One doesn't "open" an object, in REST semantics. All we really care
434  * about is whether caller expects the object to have zero-size, when we
435  * return. If so, we conceptually delete it, then recreate it empty.
436  *
437  * ISSUE: If the object is going to receive "appends" (supported in EMC S3
438  * extensions), the object has to exist before the first append
439  * operation. On the other hand, there appears to be a bug in the
440  * EMC implementation, such that if an object ever receives appends,
441  * and then is deleted, and then recreated, the recreated object will
442  * always return "500 Server Error" on GET (whether it has been
443  * appended or not).
444  *
445  * Therefore, a safer thing to do here is write zero-length contents,
446  * instead of deleting.
447  *
448  * NOTE: There's also no file-descriptor to return, in REST semantics. On
449  * the other hand, we keep needing the file *NAME*. Therefore, we
450  * will return the file-name, and let IOR pass it around to our
451  * functions, in place of what IOR understands to be a
452  * file-descriptor.
453  *
454  */
455 
456 static
457 void *
458 S3_Create_Or_Open_internal(char* testFileName,
459  IOR_param_t* param,
460  unsigned char createFile,
461  int multi_part_upload_p ) {
462 
463  if (param->verbose >= VERBOSE_2) {
464  printf("-> S3_Create_Or_Open('%s', ,%d, %d)\n",
465  testFileName, createFile, multi_part_upload_p);
466  }
467 
468  /* initialize curl, if needed */
469  s3_connect( param );
470 
471  /* Check for unsupported flags */
472  if ( param->openFlags & IOR_EXCL ) {
473  fprintf( stdout, "Opening in Exclusive mode is not implemented in S3\n" );
474  }
475  if ( param->useO_DIRECT == TRUE ) {
476  fprintf( stdout, "Direct I/O mode is not implemented in S3\n" );
477  }
478 
479  // easier to think
480  int n_to_n = param->filePerProc;
481  int n_to_1 = ! n_to_n;
482 
483  /* check whether object needs reset to zero-length */
484  int needs_reset = 0;
485  if (! multi_part_upload_p)
486  needs_reset = 1; /* so "append" can work */
487  else if ( param->openFlags & IOR_TRUNC )
488  needs_reset = 1; /* so "append" can work */
489  else if (createFile) {
490  // AWS4C_CHECK( s3_head(param->io_buf, testFileName) );
491  // if ( ! AWS4C_OK(param->io_buf) )
492  needs_reset = 1;
493  }
494 
495  if ( param->open == WRITE ) {
496 
497  /* initializations for N:1 or N:N writes using multi-part upload */
498  if (multi_part_upload_p) {
499 
500  // For N:N, all ranks do their own MPU open/close. For N:1, only
501  // rank0 does that. Either way, the response from the server
502  // includes an "uploadId", which must be used to upload parts to
503  // the same object.
504  if ( n_to_n || (rank == 0) ) {
505 
506  // rank0 handles truncate
507  if ( needs_reset) {
508  aws_iobuf_reset(param->io_buf);
509  AWS4C_CHECK( s3_put(param->io_buf, testFileName) ); /* 0-length write */
510  AWS4C_CHECK_OK( param->io_buf );
511  }
512 
513  // POST request with URL+"?uploads" initiates multi-part upload
514  snprintf(buff, BUFF_SIZE, "%s?uploads", testFileName);
515  IOBuf* response = aws_iobuf_new();
516  AWS4C_CHECK( s3_post2(param->io_buf, buff, NULL, response) );
517  AWS4C_CHECK_OK( param->io_buf );
518 
519  // parse XML returned from server, into a tree structure
520  aws_iobuf_realloc(response);
521  xmlDocPtr doc = xmlReadMemory(response->first->buf,
522  response->first->len,
523  NULL, NULL, 0);
524  if (doc == NULL)
525  ERR_SIMPLE("Rank0 Failed to find POST response\n");
526 
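 // The response body includes (among other things) an element like the
 // following; the value shown here is made up:
 //
 // <UploadId>2~exampleUploadId0123456789abcdef</UploadId>
 //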
527  // navigate parsed XML-tree to find UploadId
528  xmlNode* root_element = xmlDocGetRootElement(doc);
529  const char* upload_id = find_element_named(root_element, (char*)"UploadId");
530  if (! upload_id)
531  ERR_SIMPLE("couldn't find 'UploadId' in returned XML\n");
532 
533  if (param->verbose >= VERBOSE_3)
534  printf("got UploadId = '%s'\n", upload_id);
535 
536  const size_t upload_id_len = strlen(upload_id);
537  if (upload_id_len > MAX_UPLOAD_ID_SIZE) {
538  snprintf(buff, BUFF_SIZE,
539  "UploadId length %d exceeds expected max (%d)",
540  upload_id_len, MAX_UPLOAD_ID_SIZE);
541  ERR_SIMPLE(buff);
542  }
543 
544  // save the UploadId we found
545  memcpy(param->UploadId, upload_id, upload_id_len);
546  param->UploadId[upload_id_len] = 0;
547 
548  // free storage for parsed XML tree
549  xmlFreeDoc(doc);
550  aws_iobuf_free(response);
551 
552  // For N:1, share UploadId across all ranks
553  if (n_to_1)
554  MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, param->testComm);
555  }
556  else
557  // N:1, and we're not rank0. recv UploadID from Rank 0
558  MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, param->testComm);
559  }
560 
561  /* initializations for N:N or N:1 writes using EMC byte-range extensions */
562  else {
563 
564  /* maybe reset to zero-length, so "append" can work */
565  if (needs_reset) {
566 
567  if (verbose >= VERBOSE_3) {
568  fprintf( stdout, "rank %d resetting\n",
569  rank);
570  }
571 
572  aws_iobuf_reset(param->io_buf);
573  AWS4C_CHECK( s3_put(param->io_buf, testFileName) );
574  AWS4C_CHECK_OK( param->io_buf );
575  }
576  }
577  }
578 
579 
580  if (param->verbose >= VERBOSE_2) {
581  printf("<- S3_Create_Or_Open\n");
582  }
583  return ((void *) testFileName );
584 }
585 
586 
587 
588 static
589 void *
590 S3_Create( char *testFileName, IOR_param_t * param ) {
591  if (param->verbose >= VERBOSE_2) {
592  printf("-> S3_Create\n");
593  }
594 
595  if (param->verbose >= VERBOSE_2) {
596  printf("<- S3_Create\n");
597  }
598  return S3_Create_Or_Open_internal( testFileName, param, TRUE, TRUE );
599 }
600 static
601 void *
602 EMC_Create( char *testFileName, IOR_param_t * param ) {
603  if (param->verbose >= VERBOSE_2) {
604  printf("-> EMC_Create\n");
605  }
606 
607  if (param->verbose >= VERBOSE_2) {
608  printf("<- EMC_Create\n");
609  }
610  return S3_Create_Or_Open_internal( testFileName, param, TRUE, FALSE );
611 }
612 
613 
614 
615 
616 
617 
618 static
619 void *
620 S3_Open( char *testFileName, IOR_param_t * param ) {
621  if (param->verbose >= VERBOSE_2) {
622  printf("-> S3_Open\n");
623  }
624 
625  if ( param->openFlags & IOR_CREAT ) {
626  if (param->verbose >= VERBOSE_2) {
627  printf("<- S3_Open( ... TRUE)\n");
628  }
629  return S3_Create_Or_Open_internal( testFileName, param, TRUE, TRUE );
630  }
631  else {
632  if (param->verbose >= VERBOSE_2) {
633  printf("<- S3_Open( ... FALSE)\n");
634  }
635  return S3_Create_Or_Open_internal( testFileName, param, FALSE, TRUE );
636  }
637 }
638 static
639 void *
640 EMC_Open( char *testFileName, IOR_param_t * param ) {
641  if (param->verbose >= VERBOSE_2) {
642  printf("-> EMC_Open\n");
643  }
644 
645  if ( param->openFlags & IOR_CREAT ) {
646  if (param->verbose >= VERBOSE_2) {
647  printf("<- EMC_Open( ... TRUE)\n");
648  }
649  return S3_Create_Or_Open_internal( testFileName, param, TRUE, FALSE );
650  }
651  else {
652  if (param->verbose >= VERBOSE_2) {
653  printf("<- EMC_Open( ... FALSE)\n");
654  }
655  return S3_Create_Or_Open_internal( testFileName, param, FALSE, FALSE );
656  }
657 }
658 
659 
660 /*
661  * transfer (more) data to an object. <file> is just the obj name.
662  *
663  * For N:1, param->offset is understood as offset for a given client to
664  * write into the "file". This translates to a byte-range in the HTTP
665  * request. Each write in the N:1 case is treated as a complete "part",
666  * so there is no such thing as a partial write.
667  *
668  * For N:N, when IOR "transfer-size" differs from "block-size", IOR treats
669  * Xfer as a partial write (i.e. there are multiple calls to XFER, to write
670  * any one of the "N" objects, as a series of "append" operations). This
671  * is not supported in S3/REST. Therefore, we depend on an EMC extension,
672  * in this case. This EMC extension allows appends using a byte-range
673  * header spec of "Range: bytes=-1-". aws4c now provides
674  * s3_enable_EMC_extensions(), to allow this behavior. If EMC-extensions
675  * are not enabled, the aws4c library will generate a run-time error, in
676  * this case.
677  *
678  * Each write-request returns an ETag which is a hash of the data. (The
679  * ETag could also be computed directly, if we wanted.) We must save the
680  * etags for later use by S3_close().
681  *
682  * WARNING: "Pure" S3 doesn't allow byte-ranges for writes to an object.
683  * Thus, you also can not append to an object. In the context of IOR,
684  * this causes objects to have only the size of the most-recent write.
685  * Thus, If the IOR "transfer-size" is different from the IOR
686  * "block-size", the files will be smaller than the amount of data
687  * that was written to them.
688  *
689  * EMC does support "append" to an object. In order to allow this,
690  * you must enable the EMC-extensions in the aws4c library, by calling
691  * s3_enable_EMC_extensions() with a non-zero argument.
692  *
693  * NOTE: I don't think REST allows us to read/write an amount other than
694  * the size we request. Maybe our callback-handlers (above) could
695  * tell us? For now, this is assuming we only have to send one
696  * request, to transfer any amount of data. (But see above, re EMC
697  * support for "append".)
698  */
699 /* In the EMC case, instead of Multi-Part Upload we can use HTTP
700  * "byte-range" headers to write parts of a single object. This appears to
701  * have several advantages over the S3 MPU spec:
702  *
703  * (a) no need for a special "open" operation, to capture an "UploadID".
704  * Instead we simply write byte-ranges, and the server-side resolves
705  * any races, producing a single winner. In the IOR case, there should
706  * be no races, anyhow.
707  *
708  * (b) individual write operations don't have to refer to an ID, or to
709  * parse and save ETags returned from every write.
710  *
711  * (c) no need for a special "close" operation, in which all the saved
712  * ETags are gathered at a single rank, placed into XML, and shipped to
713  * the server, to finalize the MPU. That special close appears to
714  * impose two scaling problems: (1) requires all ETags to be shipped at
715  * the BW available to a single process, (2) requires either that they
716  * all fit into memory of a single process, or be written to disk
717  * (imposes additional BW constraints), or make a more-complex
718  * interaction with a threaded curl writefunction, to present the
719  * appearance of a single thread to curl, whilst allowing streaming
720  * reception of non-local ETags.
721  *
722  * (d) no constraints on the number or size of individual parts. (These
723  * exist in the S3 spec, though EMC's impl of the S3 multi-part upload is
724  * also free of these constraints.)
725  *
726  * Instead, parallel processes can write any number and/or size of updates,
727  * using a "byte-range" header. After each write returns, that part of the
728  * global object is visible to any reader. Places that are not updated
729  * read as zeros.
730  */
731 
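/* For reference, the byte-range headers involved look roughly like this
 * (the offsets are hypothetical; aws4c formats the actual header):
 *
 *   Range: bytes=0-1048575     <-- write/read a specific 1 MiB byte-range
 *   Range: bytes=-1-           <-- EMC extension: append to end of object
 */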
732 
733 static
734 IOR_offset_t
735 S3_Xfer_internal(int access,
736  void* file,
737  IOR_size_t* buffer,
738  IOR_offset_t length,
739  IOR_param_t* param,
740  int multi_part_upload_p ) {
741 
742  if (param->verbose >= VERBOSE_2) {
743  printf("-> S3_Xfer(acc:%d, target:%s, buf:0x%llx, len:%llu, 0x%llx)\n",
744  access, (char*)file, buffer, length, param);
745  }
746 
747  char* fname = (char*)file; /* see NOTE above S3_Create_Or_Open() */
748  size_t remaining = (size_t)length;
749  char* data_ptr = (char *)buffer;
750  off_t offset = param->offset;
751 
752  // easier to think
753  int n_to_n = param->filePerProc;
754  int n_to_1 = (! n_to_n);
755  int segmented = (param->segmentCount == 1);
756 
757 
758  if (access == WRITE) { /* WRITE */
759 
760  if (verbose >= VERBOSE_3) {
761  fprintf( stdout, "rank %d writing length=%lld to offset %lld\n",
762  rank,
763  remaining,
764  param->offset + length - remaining);
765  }
766 
767 
768  if (multi_part_upload_p) {
769 
770  // For N:1, part-numbers must have a global ordering for the
771  // components of the final object. param->part_number is
772  // incremented by 1 per write, on each rank. This lets us use it
773  // to compute a global part-numbering.
774  //
775  // In the N:N case, we only need to increment part-numbers within
776  // each rank.
777  //
778  // In the N:1 case, the global order of part-numbers we're writing
779  // depends on whether we're writing strided or segmented, in
780  // other words, how <offset> and <remaining> are actually
781  // positioning the parts being written. [See discussion at
782  // S3_Close_internal().]
783  //
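 // Worked example (hypothetical sizes): with numTasks=4 and
 // blockSize/transferSize = 8 parts per rank, rank 2's third write
 // (param->part_number == 2) becomes global part 2*8 + 2 = 18 in the
 // segmented case, or 2*4 + 2 = 10 in the strided case.
 //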
784  // NOTE: 's3curl.pl --debug' shows StringToSign having partNumber
785  // first, even if I put uploadId first in the URL. Maybe
786  // that's what the server will do. GetStringToSign() in
787  // aws4c is not clever about this, so we spoon-feed args in
788  // the proper order.
789 
790  size_t part_number;
791  if (n_to_1) {
792  if (segmented) { // segmented
793  size_t parts_per_rank = param->blockSize / param->transferSize;
794  part_number = (rank * parts_per_rank) + param->part_number;
795  }
796  else // strided
797  part_number = (param->part_number * param->numTasks) + rank;
798  }
799  else
800  part_number = param->part_number;
801  ++ param->part_number;
802 
803 
804  // if (verbose >= VERBOSE_3) {
805  // fprintf( stdout, "rank %d of %d writing (%s,%s) part_number %lld\n",
806  // rank,
807  // param->numTasks,
808  // (n_to_1 ? "N:1" : "N:N"),
809  // (segmented ? "segmented" : "strided"),
810  // part_number);
811  // }
812 
813  snprintf(buff, BUFF_SIZE,
814  "%s?partNumber=%d&uploadId=%s",
815  fname, part_number, param->UploadId);
816 
817  // For performance, we append <data_ptr> directly into the linked list
818  // of data in param->io_buf. We are "appending" rather than
819  // "extending", so the added buffer is seen as written data, rather
820  // than empty storage.
821  //
822  // aws4c parses some header-fields automatically for us (into members
823  // of the IOBuf). After s3_put2(), we can just read the etag from
824  // param->io_buf->eTag. The server actually returns literal
825  // quote-marks, at both ends of the string.
826 
827  aws_iobuf_reset(param->io_buf);
828  aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
829  AWS4C_CHECK( s3_put(param->io_buf, buff) );
830  AWS4C_CHECK_OK( param->io_buf );
831 
832  // if (verbose >= VERBOSE_3) {
833  // printf("rank %d: read ETag = '%s'\n", rank, param->io_buf->eTag);
834  // if (strlen(param->io_buf->eTag) != ETAG_SIZE+2) { /* quotes at both ends */
835  // fprintf(stderr, "Rank %d: ERROR: expected ETag to be %d hex digits\n",
836  // rank, ETAG_SIZE);
837  // exit(1);
838  // }
839  // }
840 
841  if (verbose >= VERBOSE_3) {
842  fprintf( stdout, "rank %d of %d (%s,%s) offset %lld, part# %lld --> ETag %s\n",
843  rank,
844  param->numTasks,
845  (n_to_1 ? "N:1" : "N:N"),
846  (segmented ? "segmented" : "strided"),
847  offset,
848  part_number,
849  param->io_buf->eTag); // incl quote-marks at [0] and [len-1]
850  }
851  if (strlen(param->io_buf->eTag) != ETAG_SIZE+2) { /* quotes at both ends */
852  fprintf(stderr, "Rank %d: ERROR: expected ETag to be %d hex digits\n",
853  rank, ETAG_SIZE);
854  exit(1);
855  }
856 
857  // save the eTag for later
858  //
859  // memcpy(etag, param->io_buf->eTag +1, strlen(param->io_buf->eTag) -2);
860  // etag[ETAG_SIZE] = 0;
861  aws_iobuf_append(param->etags,
862  param->io_buf->eTag +1,
863  strlen(param->io_buf->eTag) -2);
864  // DEBUGGING
865  if (verbose >= VERBOSE_4) {
866  printf("rank %d: part %d = ETag %s\n", rank, part_number, param->io_buf->eTag);
867  }
868 
869  // drop ptrs to <data_ptr>, in param->io_buf
870  aws_iobuf_reset(param->io_buf);
871  }
872  else { // use EMC's byte-range write-support, instead of MPU
873 
874 
875  // NOTE: You must call 's3_enable_EMC_extensions(1)' for
876  // byte-ranges to work for writes.
877  if (n_to_n)
878  s3_set_byte_range(-1,-1); // EMC header "Range: bytes=-1-" means "append"
879  else
880  s3_set_byte_range(offset, remaining);
881 
882  // For performance, we append <data_ptr> directly into the linked list
883  // of data in param->io_buf. We are "appending" rather than
884  // "extending", so the added buffer is seen as written data, rather
885  // than empty storage.
886  aws_iobuf_reset(param->io_buf);
887  aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
888  AWS4C_CHECK ( s3_put(param->io_buf, file) );
889  AWS4C_CHECK_OK( param->io_buf );
890 
891  // drop ptrs to <data_ptr>, in param->io_buf
892  aws_iobuf_reset(param->io_buf);
893  }
894 
895 
896  if ( param->fsyncPerWrite == TRUE ) {
897  WARN("S3 doesn't support 'fsync'" ); /* does it? */
898  }
899 
900  }
901  else { /* READ or CHECK */
902 
903  if (verbose >= VERBOSE_3) {
904  fprintf( stdout, "rank %d reading from offset %lld\n",
905  rank,
906  param->offset + length - remaining );
907  }
908 
909  // read specific byte-range from the object
910  // [This is included in the "pure" S3 spec.]
911  s3_set_byte_range(offset, remaining);
912 
913  // For performance, we append <data_ptr> directly into the linked
914  // list of data in param->io_buf. In this case (i.e. reading),
915  // we're "extending" rather than "appending". That means the
916  // buffer represents empty storage, which will be filled by the
917  // libcurl writefunction, invoked via aws4c.
918  aws_iobuf_reset(param->io_buf);
919  aws_iobuf_extend_static(param->io_buf, data_ptr, remaining);
920  AWS4C_CHECK( s3_get(param->io_buf, file) );
921  if (param->io_buf->code != 206) { /* '206 Partial Content' */
922  snprintf(buff, BUFF_SIZE,
923  "Unexpected result (%d, '%s')",
924  param->io_buf->code, param->io_buf->result);
925  ERR_SIMPLE(buff);
926  }
927 
928  // drop refs to <data_ptr>, in param->io_buf
929  aws_iobuf_reset(param->io_buf);
930  }
931 
932 
933  if (param->verbose >= VERBOSE_2) {
934  printf("<- S3_Xfer\n");
935  }
936  return ( length );
937 }
938 
939 
940 static
941 IOR_offset_t
942 S3_Xfer(int access,
943  void* file,
944  IOR_size_t* buffer,
945  IOR_offset_t length,
946  IOR_param_t* param ) {
947  return S3_Xfer_internal(access, file, buffer, length, param, TRUE);
948 }
949 static
950 IOR_offset_t
951 EMC_Xfer(int access,
952  void* file,
953  IOR_size_t* buffer,
954  IOR_offset_t length,
955  IOR_param_t* param ) {
956  return S3_Xfer_internal(access, file, buffer, length, param, FALSE);
957 }
958 
959 
960 
961 
962 
963 /*
964  * Does this even mean anything, for HTTP/S3 ?
965  *
966  * I believe all interactions with the server are considered complete at
967  * the time we get a response, e.g. from s3_put(). Therefore, fsync is
968  * kind of meaningless, for REST/S3.
969  *
970  * In future, we could extend our interface so as to allow a non-blocking
971  * semantics, for example with the libcurl "multi" interface, and/or by
972  * adding threaded callback handlers to obj_put(). *IF* we do that, *THEN*
973  * we should revisit 'fsync'.
974  *
975  * Another special case is multi-part upload, where many parallel clients
976  * may be writing to the same "file". (It looks like param->filePerProc
977  * would be the flag to check, for this.) Maybe when you called 'fsync',
978  * you meant that you wanted *all* the clients to be complete? That's not
979  * really what fsync would do. In the N:1 case, this is accomplished by
980  * S3_Close(). If you really wanted this behavior from S3_Fsync, we could
981  * have S3_Fsync call S3_close.
982  *
983  * As explained above, we may eventually want to consider the following:
984  *
985  * (1) thread interaction with any handlers that are doing ongoing
986  * interactions with the socket, to make sure they have finished all
987  * actions and gotten responses.
988  *
989  * (2) MPI barrier for all clients involved in a multi-part upload.
990  * Presumably, for IOR, when we are doing N:1, all clients are
991  * involved in that transfer, so this would amount to a barrier on
992  * MPI_COMM_WORLD.
993  */
994 
995 static
996 void
997 S3_Fsync( void *fd, IOR_param_t * param ) {
998  if (param->verbose >= VERBOSE_2) {
999  printf("-> S3_Fsync [no-op]\n");
1000  }
1001 
1002  if (param->verbose >= VERBOSE_2) {
1003  printf("<- S3_Fsync\n");
1004  }
1005 }
1006 
1007 
1008 /*
1009  * It seems the only kind of "close" that ever needs doing for S3 is in the
1010  * case of multi-part upload (i.e. N:1). In this case, all the parties to
1011  * the upload must provide their ETags to a single party (e.g. rank 0 in an
1012  * MPI job). Then the rank doing the closing can generate XML and complete
1013  * the upload.
1014  *
1015  * ISSUE: The S3 spec says that a multi-part upload can have at most 10,000
1016  * parts. Does EMC allow more than this? (NOTE the spec also says
1017  * parts must be at leaast 5MB, but EMC definitely allows smaller
1018  * parts must be at least 5MB, but EMC definitely allows smaller
1019  *
1020  * ISSUE: All Etags must be sent from a single rank, in a single
1021  * transaction. If the issue above (regarding 10k Etags) is
1022  * resolved by a discovery that EMC supports more than 10k ETags,
1023  * then, for large-enough files (or small-enough transfer-sizes) an
1024  * N:1 write may generate more ETags than the single closing rank
1025  * can hold in memory. In this case, there are several options,
1026  * outlined in the block comment above S3_Xfer_internal().
1027  *
1028  *
1029 
1030  * See S3_Fsync() for some possible considerations.
1031  */
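/* The XML body assembled below has this general shape (ETag values here are
 * hypothetical; see the snprintf format-strings further down):
 *
 *   <CompleteMultipartUpload>
 *     <Part>
 *       <PartNumber>0</PartNumber>
 *       <ETag>00112233445566778899aabbccddeeff</ETag>
 *     </Part>
 *     ...
 *   </CompleteMultipartUpload>
 */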
1032 
1033 static
1034 void
1035 S3_Close_internal( void* fd,
1036  IOR_param_t* param,
1037  int multi_part_upload_p ) {
1038 
1039  char* fname = (char*)fd; /* see NOTE above S3_Create_Or_Open() */
1040 
1041  // easier to think
1042  int n_to_n = param->filePerProc;
1043  int n_to_1 = (! n_to_n);
1044  int segmented = (param->segmentCount == 1);
1045 
1046  if (param->verbose >= VERBOSE_2) {
1047  printf("-> S3_Close('%s', ,%d) %s\n",
1048  fname,
1049  multi_part_upload_p,
1050  ((n_to_n) ? "N:N" : ((segmented) ? "N:1(seg)" : "N:1(str)")));
1051  }
1052 
1053  if (param->open == WRITE) {
1054 
1055 
1056  // finalizing Multi-Part Upload (for N:1 or N:N)
1057  if (multi_part_upload_p) {
1058 
1059 
1060  size_t etag_data_size = param->etags->write_count; /* local ETag data (bytes) */
1061  size_t etags_per_rank = etag_data_size / ETAG_SIZE; /* number of local etags */
1062 
1063  // --- create XML containing ETags in an IOBuf for "close" request
1064  IOBuf* xml = NULL;
1065 
1066 
1067  if (n_to_1) {
1068 
1069  // for N:1, gather all Etags at Rank0
1070  MPI_Datatype mpi_size_t;
1071  if (sizeof(size_t) == sizeof(int))
1072  mpi_size_t = MPI_INT;
1073  else if (sizeof(size_t) == sizeof(long))
1074  mpi_size_t = MPI_LONG;
1075  else
1076  mpi_size_t = MPI_LONG_LONG;
1077 
1078  // Everybody should have the same number of ETags (?)
1079  size_t etag_count_max = 0; /* highest number on any proc */
1080  MPI_Allreduce(&etags_per_rank, &etag_count_max,
1081  1, mpi_size_t, MPI_MAX, param->testComm);
1082  if (etags_per_rank != etag_count_max) {
1083  printf("Rank %d: etag count mismatch: max:%d, mine:%d\n",
1084  rank, etag_count_max, etags_per_rank);
1085  MPI_Abort(param->testComm, 1);
1086  }
1087 
1088  // collect ETag data at Rank0
1089  aws_iobuf_realloc(param->etags); /* force single contiguous buffer */
1090  char* etag_data = param->etags->first->buf; /* per-rank data, contiguous */
1091 
1092  if (rank == 0) {
1093  char* etag_ptr;
1094  int i;
1095  int j;
1096  int rnk;
1097 
1098  char* etag_vec = (char*)malloc((param->numTasks * etag_data_size) +1);
1099  if (! etag_vec) {
1100  fprintf(stderr, "rank 0 failed to malloc %d bytes\n",
1101  param->numTasks * etag_data_size);
1102  MPI_Abort(param->testComm, 1);
1103  }
1104  MPI_Gather(etag_data, etag_data_size, MPI_BYTE,
1105  etag_vec, etag_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
1106 
1107  // --- debugging: show the gathered etag data
1108  // (This shows the raw concatenated etag-data from each node.)
1109  if (param->verbose >= VERBOSE_4) {
1110 
1111  printf("rank 0: gathered %d etags from all ranks:\n", etags_per_rank);
1112  etag_ptr=etag_vec;
1113  for (rnk=0; rnk<param->numTasks; ++rnk) {
1114  printf("\t[%d]: '", rnk);
1115 
1116  int ii;
1117  for (ii=0; ii<etag_data_size; ++ii) /* NOT null-terminated! */
1118  printf("%c", etag_ptr[ii]);
1119 
1120  printf("'\n");
1121  etag_ptr += etag_data_size;
1122  }
1123  }
1124 
1125 
1126  // add XML for *all* the parts. The XML must be ordered by
1127  // part-number. Each rank wrote <etags_per_rank> parts,
1128  // locally. At rank0, the etags for each rank are now
1129  // stored as a contiguous block of text, with the blocks
1130  // stored in rank order in etag_vec. In other words, our
1131  // internal rep at rank 0 matches the "segmented" format.
1132  // From this, we must select etags in an order matching how
1133  // they appear in the actual object, and give sequential
1134  // part-numbers to the resulting sequence.
1135  //
1136  // That ordering of parts in the actual written object
1137  // varies according to whether we wrote in the "segmented"
1138  // or "strided" format.
1139  //
1140  // supposing N ranks, and P parts per rank:
1141  //
1142  // segmented:
1143  //
1144  // all parts for a given rank are consecutive.
1145  // rank r writes these parts:
1146  //
1147  // rP, rP+1, ... (r+1)P -1
1148  //
1149  // i.e. rank0 writes parts 0,1,2,3 ... P-1
1150  //
1151  //
1152  // strided:
1153  //
1154  // rank r writes every N-th part, starting with r.
1155  //
1156  // r, N+r, 2N+r, ... (P-1)N + r
1157  //
1158  // i.e. rank0 writes parts 0, N, 2N, 3N ... (P-1)N
1159  //
1160  //
1161  // NOTE: If we knew ahead of time how many parts each rank was
1162  // going to write, we could assign part-number ranges, per
1163  // rank, and then have nice locality here.
1164  //
1165  // Alternatively, we could have everyone format their own
1166  // XML text and send that, instead of just the tags. This
1167  // would increase the amount of data being sent, but would
1168  // reduce the work for rank0 to format everything.
1169 
1170  size_t i_max; // outer-loop
1171  size_t j_max; // inner loop
1172  size_t start_multiplier; // initial offset in collected data
1173  size_t stride; // in etag_vec
1174 
1175  if (segmented) { // segmented
1176  i_max = param->numTasks;
1177  j_max = etags_per_rank;
1178  start_multiplier = etag_data_size; /* one rank's-worth of Etag data */
1179  stride = ETAG_SIZE; /* one ETag */
1180  }
1181  else { // strided
1182  i_max = etags_per_rank;
1183  j_max = param->numTasks;
1184  start_multiplier = ETAG_SIZE; /* one ETag */
1185  stride = etag_data_size; /* one rank's-worth of Etag data */
1186  }
1187 
1188 
1189  xml = aws_iobuf_new();
1190  aws_iobuf_growth_size(xml, 1024 * 8);
1191 
1192  // write XML header ...
1193  aws_iobuf_append_str(xml, "<CompleteMultipartUpload>\n");
1194 
1195  int part = 0;
1196  for (i=0; i<i_max; ++i) {
1197 
1198  etag_ptr=etag_vec + (i * start_multiplier);
1199 
1200  for (j=0; j<j_max; ++j) {
1201 
1202  // etags were saved as contiguous text. Extract the next one.
1203  char etag[ETAG_SIZE +1];
1204  memcpy(etag, etag_ptr, ETAG_SIZE);
1205  etag[ETAG_SIZE] = 0;
1206 
1207  // write XML for next part, with Etag ...
1208  snprintf(buff, BUFF_SIZE,
1209  " <Part>\n"
1210  " <PartNumber>%d</PartNumber>\n"
1211  " <ETag>%s</ETag>\n"
1212  " </Part>\n",
1213  part, etag);
1214 
1215  aws_iobuf_append_str(xml, buff);
1216 
1217  etag_ptr += stride;
1218  ++ part;
1219  }
1220  }
1221 
1222  // write XML tail ...
1223  aws_iobuf_append_str(xml, "</CompleteMultipartUpload>\n");
1224  }
1225 
1226  else {
1227  MPI_Gather(etag_data, etag_data_size, MPI_BYTE,
1228  NULL, etag_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
1229  }
1230  }
1231 
1232  else { /* N:N */
1233 
1234  xml = aws_iobuf_new();
1235  aws_iobuf_growth_size(xml, 1024 * 8);
1236 
1237  // write XML header ...
1238  aws_iobuf_append_str(xml, "<CompleteMultipartUpload>\n");
1239 
1240  // all parts of our object were written from this rank.
1241  char etag[ETAG_SIZE +1];
1242  int part = 0;
1243  int i;
1244  for (i=0; i<etags_per_rank; ++i) {
1245 
1246  // TBD: Instead of reading into etag, then sprintf'ing, then
1247  // copying into xml, we could just read directly into xml
1248  int sz = aws_iobuf_get_raw(param->etags, etag, ETAG_SIZE);
1249  if (sz != ETAG_SIZE) {
1250  snprintf(buff, BUFF_SIZE,
1251  "Rank %d: read of ETag %d had length %d (not %d)\n",
1252  rank, i, sz, ETAG_SIZE);
1253  ERR_SIMPLE(buff);
1254  }
1255  etag[ETAG_SIZE] = 0;
1256 
1257 
1258  // write XML for next part, with Etag ...
1259  snprintf(buff, BUFF_SIZE,
1260  " <Part>\n"
1261  " <PartNumber>%d</PartNumber>\n"
1262  " <ETag>%s</ETag>\n"
1263  " </Part>\n",
1264  part, etag);
1265 
1266  aws_iobuf_append_str(xml, buff);
1267 
1268  ++ part;
1269  }
1270 
1271  // write XML tail ...
1272  aws_iobuf_append_str(xml, "</CompleteMultipartUpload>\n");
1273  }
1274 
1275 
1276 
1277  // send request to finalize MPU
1278  if (n_to_n || (rank == 0)) {
1279 
1280  // DEBUGGING: show the XML we constructed
1281  if (param->verbose >= VERBOSE_3)
1282  debug_iobuf(xml, 1, 1);
1283 
1284  // --- POST our XML to the server.
1285  snprintf(buff, BUFF_SIZE,
1286  "%s?uploadId=%s",
1287  fname, param->UploadId);
1288 
1289  AWS4C_CHECK ( s3_post(xml, buff) );
1290  AWS4C_CHECK_OK( xml );
1291 
1292  aws_iobuf_free(xml);
1293  }
1294 
1295 
1296  // everybody reset MPU info. Allows another MPU, and frees memory.
1297  s3_MPU_reset(param);
1298 
1299  // Everybody meetup, so non-zero ranks won't go trying to stat the
1300  // N:1 file until rank0 has finished the S3 multi-part finalize.
1301  // The object will not appear to exist, until then.
1302  if (n_to_1)
1303  MPI_CHECK(MPI_Barrier(param->testComm), "barrier error");
1304  }
1305  else {
1306 
1307  // No finalization is needed, when using EMC's byte-range writing
1308  // support. However, we do need to make sure everyone has
1309  // finished writing, before anyone starts reading.
1310  if (n_to_1) {
1311  MPI_CHECK(MPI_Barrier(param->testComm), "barrier error");
1312  if (param->verbose >= VERBOSE_2)
1313  printf("rank %d: passed barrier\n", rank);
1314  }
1315  }
1316 
1317  // After writing, reset the CURL connection, so that caches won't be
1318  // used for reads.
1319  aws_reset_connection();
1320  }
1321 
1322 
1323  if (param->verbose >= VERBOSE_2) {
1324  printf("<- S3_Close\n");
1325  }
1326 }
1327 
1328 static
1329 void
1330 S3_Close( void* fd,
1331  IOR_param_t* param ) {
1332  S3_Close_internal(fd, param, TRUE);
1333 }
1334 static
1335 void
1336 EMC_Close( void* fd,
1337  IOR_param_t* param ) {
1338  S3_Close_internal(fd, param, FALSE);
1339 }
1340 
1341 
1342 
1343 
1344 /*
1345  * Delete an object through the S3 interface.
1346  *
1347  * The only reason we separate out the EMC version is that an EMC bug means a
1348  * file that was written with appends can't be deleted, recreated, and then
1349  * successfully read.
1350  */
1351 
1352 static
1353 void
1354 S3_Delete( char *testFileName, IOR_param_t * param ) {
1355 
1356  if (param->verbose >= VERBOSE_2) {
1357  printf("-> S3_Delete(%s)\n", testFileName);
1358  }
1359 
1360  /* maybe initialize curl */
1361  s3_connect( param );
1362 
1363 #if 0
1364  // EMC BUG: If file was written with appends, and is deleted,
1365  // then any future recreation will result in an object that can't be read,
1366  // so this call is disabled:
1367  AWS4C_CHECK( s3_delete(param->io_buf, testFileName) );
1368 #else
1369  // just replace with a zero-length object for now
1370  aws_iobuf_reset(param->io_buf);
1371  AWS4C_CHECK ( s3_put(param->io_buf, testFileName) );
1372 #endif
1373 
1374  AWS4C_CHECK_OK( param->io_buf );
1375 
1376  if (param->verbose >= VERBOSE_2)
1377  printf("<- S3_Delete\n");
1378 }
1379 
1380 
1381 static
1382 void
1383 EMC_Delete( char *testFileName, IOR_param_t * param ) {
1384 
1385  if (param->verbose >= VERBOSE_2) {
1386  printf("-> EMC_Delete(%s)\n", testFileName);
1387  }
1388 
1389  /* maybe initialize curl */
1390  s3_connect( param );
1391 
1392 #if 0
1393  // EMC BUG: If file was written with appends, and is deleted,
1394  // then any future recreation will result in an object that can't be read,
1395  // so this call is disabled:
1396  AWS4C_CHECK( s3_delete(param->io_buf, testFileName) );
1397 #else
1398  // just replace with a zero-length object for now
1399  aws_iobuf_reset(param->io_buf);
1400  AWS4C_CHECK ( s3_put(param->io_buf, testFileName) );
1401 #endif
1402 
1403  AWS4C_CHECK_OK( param->io_buf );
1404 
1405  if (param->verbose >= VERBOSE_2)
1406  printf("<- EMC_Delete\n");
1407 }
1408 
1409 
1410 
1411 
1412 
1413 
1414 /*
1415  * HTTP HEAD returns meta-data for a "file".
1416  *
1417  * QUESTION: What should the <size> parameter be, on a HEAD request? Does
1418  * it matter? We don't know how much data they are going to send, but
1419  * obj_get_callback protects us from overruns. Will someone complain if we
1420  * request more data than the header actually takes?
1421  */
1422 
1423 static
1424 IOR_offset_t
1425 S3_GetFileSize(IOR_param_t* param,
1426  MPI_Comm testComm,
1427  char * testFileName) {
1428 
1429  if (param->verbose >= VERBOSE_2) {
1430  printf("-> S3_GetFileSize(%s)\n", testFileName);
1431  }
1432 
1433  IOR_offset_t aggFileSizeFromStat; /* i.e. "long long int" */
1434  IOR_offset_t tmpMin, tmpMax, tmpSum;
1435 
1436 
1437  /* make sure curl is connected, and inits are done */
1438  s3_connect( param );
1439 
1440  /* send HEAD request. aws4c parses some headers into IOBuf arg. */
1441  AWS4C_CHECK( s3_head(param->io_buf, testFileName) );
1442  if ( ! AWS4C_OK(param->io_buf) ) {
1443  fprintf(stderr, "rank %d: couldn't stat '%s': %s\n",
1444  rank, testFileName, param->io_buf->result);
1445  MPI_Abort(param->testComm, 1);
1446  }
1447  aggFileSizeFromStat = param->io_buf->contentLen;
1448 
1449  if (param->verbose >= VERBOSE_2) {
1450  printf("\trank %d: file-size %llu\n", rank, aggFileSizeFromStat);
1451  }
1452 
1453  if ( param->filePerProc == TRUE ) {
1454  if (param->verbose >= VERBOSE_2) {
1455  printf("\tall-reduce (1)\n");
1456  }
1457  MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat,
1458  &tmpSum, /* sum */
1459  1,
1460  MPI_LONG_LONG_INT,
1461  MPI_SUM,
1462  testComm ),
1463  "cannot total data moved" );
1464 
1465  aggFileSizeFromStat = tmpSum;
1466  }
1467  else {
1468  if (param->verbose >= VERBOSE_2) {
1469  printf("\tall-reduce (2a)\n");
1470  }
1471  MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat,
1472  &tmpMin, /* min */
1473  1,
1474  MPI_LONG_LONG_INT,
1475  MPI_MIN,
1476  testComm ),
1477  "cannot total data moved" );
1478 
1479  if (param->verbose >= VERBOSE_2) {
1480  printf("\tall-reduce (2b)\n");
1481  }
1482  MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat,
1483  &tmpMax, /* max */
1484  1,
1485  MPI_LONG_LONG_INT,
1486  MPI_MAX,
1487  testComm ),
1488  "cannot total data moved" );
1489 
1490  if ( tmpMin != tmpMax ) {
1491  if ( rank == 0 ) {
1492  WARN( "inconsistent file size by different tasks" );
1493  }
1494 
1495  /* incorrect, but now consistent across tasks */
1496  aggFileSizeFromStat = tmpMin;
1497  }
1498  }
1499 
1500  if (param->verbose >= VERBOSE_2) {
1501  printf("<- S3_GetFileSize [%llu]\n", aggFileSizeFromStat);
1502  }
1503  return ( aggFileSizeFromStat );
1504 }