IOR
aiori-S3-4c.c
1 /* -*- mode: c; indent-tabs-mode: t; -*-
2  * vim:noexpandtab:
3  *
4  * Editing with tabs allows different users to pick their own indentation
5  * appearance without changing the file.
6  */
7 
8 /*
9  * Copyright (c) 2009, Los Alamos National Security, LLC All rights reserved.
10  * Copyright 2009. Los Alamos National Security, LLC. This software was produced
11  * under U.S. Government contract DE-AC52-06NA25396 for Los Alamos National
12  * Laboratory (LANL), which is operated by Los Alamos National Security, LLC for
13  * the U.S. Department of Energy. The U.S. Government has rights to use,
14  * reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR LOS
15  * ALAMOS NATIONAL SECURITY, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR
16  * ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is
17  * modified to produce derivative works, such modified software should be
18  * clearly marked, so as not to confuse it with the version available from
19  * LANL.
20  *
21  * Additionally, redistribution and use in source and binary forms, with or
22  * without modification, are permitted provided that the following conditions are
23  * met:
24  *
25  * • Redistributions of source code must retain the above copyright notice,
26  * this list of conditions and the following disclaimer.
27  *
28  * • Redistributions in binary form must reproduce the above copyright notice,
29  * this list of conditions and the following disclaimer in the documentation
30  * and/or other materials provided with the distribution.
31  *
32  * • Neither the name of Los Alamos National Security, LLC, Los Alamos National
33  * Laboratory, LANL, the U.S. Government, nor the names of its contributors may be
34  * used to endorse or promote products derived from this software without specific
35  * prior written permission.
36  *
37  * THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND CONTRIBUTORS
38  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
39  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
40  * ARE DISCLAIMED. IN NO EVENT SHALL LOS ALAMOS NATIONAL SECURITY, LLC OR
41  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
42  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
43  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
44  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
45  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
46  * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
47  * OF SUCH DAMAGE.
48  */
49 
50 /******************************************************************************
51  *
52  * Implementation of abstract IOR interface, for the Amazon S3 API.
53  * EMC/ViPR supports some useful extensions to S3, which we also implement
54  * here. There are 3 different mixes:
55  *
56  * (1) "Pure S3" uses S3 "Multi-Part Upload" to do N:1 writes. N:N writes
57  * fail, in the case where IOR "transfer-size" differs from
58  * "block-size", because this implies an "append", and append is not
59  * supported in S3. [TBD: The spec also says multi-part upload can't
60  * have any individual part smaller than 5MB (except the last), or more than 10k total
61  * parts. Failing these conditions may produce obscure errors. Should
62  * we enforce? ]
63  *
64  * --> Select this option with the '-a S3' command-line arg to IOR
65  *
66  *
67  * (2) "EMC S3 Extensions" uses the EMC byte-range support for N:1
68  * writes, eliminating Multi-Part Upload. EMC expects this will
69  * perform better than MPU, and it avoids some problems that are
70  * imposed by the S3 MPU spec. [See comments at EMC_Xfer().]
71  *
72  * --> Select this option with the '-a EMC_S3' command-line arg to IOR
73  *
74  *
75  * NOTE: Putting EMC's S3-extensions in the same file with the S3 API
76  * allows us to share some code that would otherwise be duplicated
77  * (e.g. s3_connect(), etc). This should also help us avoid losing
78  * bug fixes that are discovered in one interface or the other. In
79  * some cases, S3 is incapable of supporting all the needs of IOR.
80  * (For example, see notes about "append", above S3_Xfer().)
81  *
82  ******************************************************************************/
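/*
 * Illustrative usage (a sketch, not an exhaustive reference): the three
 * backends registered below are named "S3-4c", "S3_plus", and "S3_EMC"
 * (see the ior_aiori_t declarations further down), so hypothetical
 * invocations might look like:
 *
 *     ior -a S3-4c  -w -r -o my-test-object      # pure S3, multi-part upload
 *     ior -a S3_EMC -w -r -o my-test-object      # EMC byte-range writes
 *
 * Exact flags depend on your IOR build and test configuration; the object
 * name above is a placeholder.
 */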
83 
84 #ifdef HAVE_CONFIG_H
85 # include "config.h"
86 #endif
87 
88 #include <stdio.h>
89 #include <stdlib.h>
90 #include <string.h> /* strnstr() */
91 
92 #include <errno.h>
93 #include <assert.h>
94 #include <curl/curl.h>
95 
96 #include <libxml/parser.h> // from libxml2
97 #include <libxml/tree.h>
98 
99 #include "aws4c.h" // extended vers of "aws4c" lib for S3 via libcurl
100 #include "aws4c_extra.h" // utilities, e.g. for parsing XML in responses
101 
102 #include "ior.h"
103 #include "aiori.h"
104 #include "aiori-debug.h"
105 
106 extern int rank;
107 extern MPI_Comm testComm;
108 
109 #define BUFF_SIZE 1024
110 const int ETAG_SIZE = 32;
111 CURLcode rc;
112 
113 /* TODO: The following stuff goes into options! */
114 /* REST/S3 variables */
115 // CURL* curl; /* for libcurl "easy" fns (now managed by aws4c) */
116 # define IOR_CURL_INIT 0x01 /* curl top-level inits were performed once? */
117 # define IOR_CURL_NOCONTINUE 0x02
118 # define IOR_CURL_S3_EMC_EXT 0x04 /* allow EMC extensions to S3? */
119 
120 #define MAX_UPLOAD_ID_SIZE 256 /* TODO don't know the actual value */
121 
122 
123 #ifdef USE_S3_4C_AIORI
124 # include <curl/curl.h>
125 # include "aws4c.h"
126 #else
127  typedef void CURL; /* unused, but needs a type */
128  typedef void IOBuf; /* unused, but needs a type */
129 #endif
130 
131 
132 typedef struct {
133  /* Any objects we create or delete will be under this bucket */
134  char* bucket_name;
135  char* user;
136  char* host;
137  /* Runtime data. This data isn't yet safe for concurrent access to multiple files; open only one file at a time. */
138  int curl_flags; /* IOR_CURL_* bit-flags (see defines above) */
139  IOBuf* io_buf; /* aws4c places parsed header values here */
140  IOBuf* etags; /* accumulate ETags for N:1 parts */
141  size_t part_number;
142  char UploadId[MAX_UPLOAD_ID_SIZE]; /* key for multi-part-uploads */
143  int written; /* did we write to the file */
144 } s3_options_t;
145 
147 
148 static aiori_xfer_hint_t * hints = NULL;
149 
150 static void S3_xfer_hints(aiori_xfer_hint_t * params){
151  hints = params;
152 }
153 
154 /**************************** P R O T O T Y P E S *****************************/
155 static aiori_fd_t* S3_Create(char *path, int iorflags, aiori_mod_opt_t * options);
156 static aiori_fd_t* S3_Open(char *path, int flags, aiori_mod_opt_t * options);
157 static IOR_offset_t S3_Xfer(int access, aiori_fd_t * afd, IOR_size_t * buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * options);
158 static void S3_Close(aiori_fd_t * afd, aiori_mod_opt_t * options);
159 
160 static aiori_fd_t* EMC_Create(char *path, int iorflags, aiori_mod_opt_t * options);
161 static aiori_fd_t* EMC_Open(char *path, int flags, aiori_mod_opt_t * options);
162 static IOR_offset_t EMC_Xfer(int access, aiori_fd_t * afd, IOR_size_t * buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * options);
163 static void EMC_Close(aiori_fd_t * afd, aiori_mod_opt_t * options);
164 
165 static void S3_Delete(char *path, aiori_mod_opt_t * options);
166 static void S3_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * options);
167 static IOR_offset_t S3_GetFileSize(aiori_mod_opt_t * options, char *testFileName);
168 static void S3_init(aiori_mod_opt_t * options);
169 static void S3_finalize(aiori_mod_opt_t * options);
170 static int S3_check_params(aiori_mod_opt_t * options);
171 static option_help * S3_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values);
172 
173 /************************** D E C L A R A T I O N S ***************************/
174 
175 // "Pure S3"
176 // N:1 writes use multi-part upload
177 // N:N fails if "transfer-size" != "block-size" (because that requires "append")
178 ior_aiori_t s3_4c_aiori = {
179  .name = "S3-4c",
180  .name_legacy = NULL,
181  .create = S3_Create,
182  .open = S3_Open,
183  .xfer = S3_Xfer,
184  .xfer_hints = S3_xfer_hints,
185  .close = S3_Close,
186  .delete = S3_Delete,
187  .get_version = aiori_get_version,
188  .fsync = S3_Fsync,
189  .get_file_size = S3_GetFileSize,
190  .initialize = S3_init,
191  .finalize = S3_finalize,
192  .check_params = S3_check_params,
193  .get_options = S3_options,
194  .enable_mdtest = true
195 };
196 
197 // "S3", plus EMC-extensions enabled
198 // N:1 writes use multi-part upload
199 // N:N succeeds (because EMC-extensions support "append")
200 ior_aiori_t s3_plus_aiori = {
201  .name = "S3_plus",
202  .create = S3_Create,
203  .open = S3_Open,
204  .xfer = S3_Xfer,
205  .close = S3_Close,
206  .delete = S3_Delete,
207  .get_version = aiori_get_version,
208  .fsync = S3_Fsync,
209  .get_file_size = S3_GetFileSize,
210  .initialize = S3_init,
211  .finalize = S3_finalize
212 };
213 
214 // Use EMC-extensions for N:1 write, as well
215 // N:1 writes use EMC byte-range
216 // N:N succeeds because EMC-extensions support "append"
217 ior_aiori_t s3_emc_aiori = {
218  .name = "S3_EMC",
219  .create = EMC_Create,
220  .open = EMC_Open,
221  .xfer = EMC_Xfer,
222  .close = EMC_Close,
223  .delete = S3_Delete,
224  .get_version = aiori_get_version,
225  .fsync = S3_Fsync,
226  .get_file_size = S3_GetFileSize,
227  .initialize = S3_init,
228  .finalize = S3_finalize
229 };
230 
231 
232 static option_help * S3_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values){
233  s3_options_t * o = malloc(sizeof(s3_options_t));
234  if (init_values != NULL){
235  memcpy(o, init_values, sizeof(s3_options_t));
236  }else{
237  memset(o, 0, sizeof(s3_options_t));
238  }
239 
240  *init_backend_options = (aiori_mod_opt_t*) o;
241  o->bucket_name = "ior";
242 
243  option_help h [] = {
244  {0, "S3-4c.user", "The username (in ~/.awsAuth).", OPTION_OPTIONAL_ARGUMENT, 's', & o->user},
245  {0, "S3-4c.host", "The host, optionally followed by :port.", OPTION_OPTIONAL_ARGUMENT, 's', & o->host},
246  {0, "S3-4c.bucket-name", "The name of the bucket.", OPTION_OPTIONAL_ARGUMENT, 's', & o->bucket_name},
247  LAST_OPTION
248  };
249  option_help * help = malloc(sizeof(h));
250  memcpy(help, h, sizeof(h));
251  return help;
252 }
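/*
 * For illustration only (hypothetical values): the module options declared
 * in S3_options() above are normally supplied through IOR's generic option
 * parser, e.g. something like
 *
 *     ior -a S3-4c --S3-4c.host=10.0.0.1:9020 --S3-4c.bucket-name=ior \
 *         --S3-4c.user=$USER ...
 *
 * The option names come from the option_help table; the host, port, and
 * bucket values shown are placeholders.
 */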
253 
254 
255 static void S3_init(aiori_mod_opt_t * options){
256  /* This is supposed to be done before *any* threads are created.
257  * Could MPI_Init() create threads (or call multi-threaded
258  * libraries)? We'll assume so. */
259  AWS4C_CHECK( aws_init() );
260 }
261 
262 static void S3_finalize(aiori_mod_opt_t * options){
263  /* done once per program, after exiting all threads.
264  * NOTE: This fn doesn't return a value that can be checked for success. */
265  aws_cleanup();
266 }
267 
268 static int S3_check_params(aiori_mod_opt_t * test){
269  if(! hints) return 0;
270  /* N:1 and N:N */
271  IOR_offset_t NtoN = hints->filePerProc;
272  IOR_offset_t Nto1 = ! NtoN;
273  IOR_offset_t s = hints->segmentCount;
274  IOR_offset_t t = hints->transferSize;
275  IOR_offset_t b = hints->blockSize;
276 
277  if (Nto1 && (s != 1) && (b != t)) {
278  ERR("N:1 (strided) requires xfer-size == block-size");
279  return 1;
280  }
281 
282  return 0;
283 }
284 
285 /* modelled on similar macros in iordef.h */
286 #define CURL_ERR(MSG, CURL_ERRNO, PARAM) \
287  do { \
288  fprintf(stdout, "ior ERROR: %s: %s (curl-errno=%d) (%s:%d)\n", \
289  MSG, curl_easy_strerror(CURL_ERRNO), CURL_ERRNO, \
290  __FILE__, __LINE__); \
291  fflush(stdout); \
292  MPI_Abort((PARAM)->testComm, -1); \
293  } while (0)
294 
295 
296 #define CURL_WARN(MSG, CURL_ERRNO) \
297  do { \
298  fprintf(stdout, "ior WARNING: %s: %s (curl-errno=%d) (%s:%d)\n", \
299  MSG, curl_easy_strerror(CURL_ERRNO), CURL_ERRNO, \
300  __FILE__, __LINE__); \
301  fflush(stdout); \
302  } while (0)
303 
304 
305 /***************************** F U N C T I O N S ******************************/
306 
307 
308 
309 
310 /* ---------------------------------------------------------------------------
311  * "Connect" to an S3 object-file-system. We're really just initializing
312  * libcurl. We need this done before any interactions. It is easy for
313  * ior_aiori.open/create to assure that we connect, if we haven't already
314  * done so. However, there's not a simple way to assure that we
315  * "disconnect" at the end. For now, we'll make a special call at the end
316  * of ior.c
317  *
318  * NOTE: It's okay to call this thing whenever you need to be sure the curl
319  * handle is initialized.
320  *
321  * NOTE: Our custom version of aws4c can be configured so that connections
322  * are reused, instead of opened and closed on every operation. We
323  * do configure it that way, but you still need to call these
324  * connect/disconnect functions, in order to ensure that aws4c has
325  * been configured.
326  * ---------------------------------------------------------------------------
327  */
328 
329 
330 static void s3_connect( s3_options_t* param ) {
331  //if (param->verbose >= VERBOSE_2) {
332  // printf("-> s3_connect\n"); /* DEBUGGING */
333  //}
334 
335  if ( param->curl_flags & IOR_CURL_INIT ) {
336  //if (param->verbose >= VERBOSE_2) {
337  // printf("<- s3_connect [nothing to do]\n"); /* DEBUGGING */
338  //}
339  return;
340  }
341 
342  // --- Done once-only (per rank). Perform all first-time inits.
343  //
344  // The aws library requires a config file, as illustrated below. We
345  // assume that the user running the test has an entry in this file,
346  // using their login moniker (i.e. `echo $USER`) as the key, as
347  // suggested in the example:
348  //
349  // <user>:<s3_login_id>:<s3_private_key>
350  //
351  // This file must not be readable by other than user.
352  //
353  // NOTE: These inits could be done in init_IORParam_t(), in ior.c, but
354  // would require conditional compilation, there.
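 //
 // For illustration only, a hypothetical ~/.awsAuth entry might look like:
 //
 //     jdoe:AKIAEXAMPLEKEYID:exampleSecretAccessKey
 //
 // (placeholder values), and the file would typically be protected with
 // something like 'chmod 600 ~/.awsAuth'.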
355 
356  aws_set_debug(0); // param->verbose >= 4
357  aws_read_config(param->user); // requires ~/.awsAuth
358  aws_reuse_connections(1);
359 
360  // initialize IOBufs. These are basically dynamically-extensible
361  // linked-lists. "growth size" controls the increment of new memory
362  // allocated, whenever storage is used up.
363  param->io_buf = aws_iobuf_new();
364  aws_iobuf_growth_size(param->io_buf, 1024*1024*1);
365 
366  param->etags = aws_iobuf_new();
367  aws_iobuf_growth_size(param->etags, 1024*1024*8);
368 
369  // WARNING: if you have http_proxy set in your environment, you may need
370  // to override it here. TBD: add a command-line variable to
371  // allow you to define a proxy.
372  //
373  // our hosts are currently 10.140.0.15 - 10.140.0.18
374  // TBD: Try DNS-round-robin server at vi-lb.ccstar.lanl.gov
375  // TBD: try HAProxy round-robin at 10.143.0.1
376 
377 #if 1
378  // snprintf(buff, BUFF_SIZE, "10.140.0.%d:9020", 15 + (rank % 4));
379  // s3_set_proxy(buff);
380  //
381  // snprintf(buff, BUFF_SIZE, "10.140.0.%d", 15 + (rank % 4));
382  // s3_set_host(buff);
383 
384  //snprintf(options->buff, BUFF_SIZE, "10.140.0.%d:9020", 15 + (rank % 4));
385  //s3_set_host(options->buff);
386 
387 #else
388 /*
389  * If you just want to go to one of the ECS nodes, put that IP
390  * address in here directly with port 9020.
391  *
392  */
393 // s3_set_host("10.140.0.15:9020");
394 
395 /*
396  * If you want to go to haproxy.ccstar.lanl.gov, this is its IP
397  * address.
398  *
399  */
400 // s3_set_proxy("10.143.0.1:80");
401 // s3_set_host( "10.143.0.1:80");
402 #endif
403 
404  s3_set_host(param->host);
405 
406  // make sure test-bucket exists
407  s3_set_bucket((char*) param->bucket_name);
408 
409  if (rank == 0) {
410  AWS4C_CHECK( s3_head(param->io_buf, "") );
411  if ( param->io_buf->code == 404 ) { // "404 Not Found"
412  printf(" bucket '%s' doesn't exist\n", param->bucket_name);
413 
414  AWS4C_CHECK( s3_put(param->io_buf, "") ); /* creates URL as bucket + obj */
415  AWS4C_CHECK_OK( param->io_buf ); // assure "200 OK"
416  printf("created bucket '%s'\n", param->bucket_name);
417  }
418  else { // assure "200 OK"
419  AWS4C_CHECK_OK( param->io_buf );
420  }
421  }
422  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
423 
424 
425  // Maybe allow EMC extensions to S3
426  s3_enable_EMC_extensions(param->curl_flags & IOR_CURL_S3_EMC_EXT);
427 
428  // don't perform these inits more than once
429  param->curl_flags |= IOR_CURL_INIT;
430 
431  //if (param->verbose >= VERBOSE_2) {
432  // printf("<- s3_connect [success]\n");
433  //}
434 }
435 
436 static
437 void
438 s3_disconnect( s3_options_t* param ) {
439  //if (param->verbose >= VERBOSE_2) {
440  // printf("-> s3_disconnect\n");
441  //}
442  // nothing to do here, if using new aws4c ...
443 
444  //if (param->verbose >= VERBOSE_2) {
445  // printf("<- s3_disconnect\n");
446  //}
447 }
448 
449 
450 
451 // After finalizing an S3 multi-part-upload, you must reset some things
452 // before you can use multi-part-upload again. This will also avoid (one
453 // particular set of) memory-leaks.
454 void s3_MPU_reset(s3_options_t* param) {
455  aws_iobuf_reset(param->io_buf);
456  aws_iobuf_reset(param->etags);
457  param->part_number = 0;
458 }
459 
460 
461 /* ---------------------------------------------------------------------------
462  * direct support for the IOR S3 interface
463  * ---------------------------------------------------------------------------
464  */
465 
466 /*
467  * One doesn't "open" an object, in REST semantics. All we really care
468  * about is whether caller expects the object to have zero-size, when we
469  * return. If so, we conceptually delete it, then recreate it empty.
470  *
471  * ISSUE: If the object is going to receive "appends" (supported in EMC S3
472  * extensions), the object has to exist before the first append
473  * operation. On the other hand, there appears to be a bug in the
474  * EMC implementation, such that if an object ever receives appends,
475  * and then is deleted, and then recreated, the recreated object will
476  * always return "500 Server Error" on GET (whether it has been
477  * appended or not).
478  *
479  * Therefore, a safer thing to do here is write zero-length contents,
480  * instead of deleting.
481  *
482  * NOTE: There's also no file-descriptor to return, in REST semantics. On
483  * the other hand, we keep needing the file *NAME*. Therefore, we
484  * will return the file-name, and let IOR pass it around to our
485  * functions, in place of what IOR understands to be a
486  * file-descriptor.
487  *
488  */
489 
490 static aiori_fd_t * S3_Create_Or_Open_internal(char* testFileName, int openFlags, s3_options_t* param, int multi_part_upload_p ) {
491  unsigned char createFile = openFlags & IOR_CREAT;
492 
493  //if (param->verbose >= VERBOSE_2) {
494  // printf("-> S3_Create_Or_Open('%s', ,%d, %d)\n",
495  // testFileName, createFile, multi_part_upload_p);
496  //}
497 
498  /* initialize curl, if needed */
499  s3_connect( param );
500 
501  /* Check for unsupported flags */
502  //if ( param->openFlags & IOR_EXCL ) {
503  // fprintf( stdout, "Opening in Exclusive mode is not implemented in S3\n" );
504  //}
505  //if ( param->useO_DIRECT == TRUE ) {
506  // fprintf( stdout, "Direct I/O mode is not implemented in S3\n" );
507  //}
508 
509  // easier to think
510  int n_to_n = hints->filePerProc;
511  int n_to_1 = ! n_to_n;
512 
513  /* check whether object needs reset to zero-length */
514  int needs_reset = 0;
515  if (! multi_part_upload_p)
516  needs_reset = 1; /* so "append" can work */
517  else if ( openFlags & IOR_TRUNC )
518  needs_reset = 1; /* so "append" can work */
519  else if (createFile) {
520  // AWS4C_CHECK( s3_head(param->io_buf, testFileName) );
521  // if ( ! AWS4C_OK(param->io_buf) )
522  needs_reset = 1;
523  }
524  char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */
525  param->written = 0;
526  if ( openFlags & IOR_WRONLY || openFlags & IOR_RDWR ) {
527  param->written = 1;
528 
529  /* initializations for N:1 or N:N writes using multi-part upload */
530  if (multi_part_upload_p) {
531 
532  // For N:N, all ranks do their own MPU open/close. For N:1, only
533  // rank0 does that. Either way, the response from the server
534  // includes an "uploadId", which must be used to upload parts to
535  // the same object.
536  if ( n_to_n || (rank == 0) ) {
537 
538  // rank0 handles truncate
539  if ( needs_reset) {
540  aws_iobuf_reset(param->io_buf);
541  AWS4C_CHECK( s3_put(param->io_buf, testFileName) ); /* 0-length write */
542  AWS4C_CHECK_OK( param->io_buf );
543  }
544 
545  // POST request with URL+"?uploads" initiates multi-part upload
546  snprintf(buff, BUFF_SIZE, "%s?uploads", testFileName);
547  IOBuf* response = aws_iobuf_new();
548  AWS4C_CHECK( s3_post2(param->io_buf, buff, NULL, response) );
549  AWS4C_CHECK_OK( param->io_buf );
550 
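 // The response body is XML. A rough, hypothetical example of the usual
 // S3 "initiate multi-part upload" response (element names/order can vary
 // by server):
 //
 //     <InitiateMultipartUploadResult>
 //       <Bucket>ior</Bucket>
 //       <Key>my-test-object</Key>
 //       <UploadId>EXAMPLE-UPLOAD-ID</UploadId>
 //     </InitiateMultipartUploadResult>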
551  // parse XML returned from server, into a tree structure
552  aws_iobuf_realloc(response);
553  xmlDocPtr doc = xmlReadMemory(response->first->buf,
554  response->first->len,
555  NULL, NULL, 0);
556  if (doc == NULL)
557  ERR("Rank0 Failed to find POST response\n");
558 
559  // navigate parsed XML-tree to find UploadId
560  xmlNode* root_element = xmlDocGetRootElement(doc);
561  const char* upload_id = find_element_named(root_element, (char*)"UploadId");
562  if (! upload_id)
563  ERR("couldn't find 'UploadId' in returned XML\n");
564 
565  //if (param->verbose >= VERBOSE_3)
566  // printf("got UploadId = '%s'\n", upload_id);
567 
568  const size_t upload_id_len = strlen(upload_id);
569  if (upload_id_len >= MAX_UPLOAD_ID_SIZE) { /* need room for terminating NUL */
570  snprintf(buff, BUFF_SIZE, "UploadId length %zd exceeds expected max (%d)", upload_id_len, MAX_UPLOAD_ID_SIZE);
571  ERR(buff);
572  }
573 
574  // save the UploadId we found
575  memcpy(param->UploadId, upload_id, upload_id_len);
576  param->UploadId[upload_id_len] = 0;
577 
578  // free storage for parsed XML tree
579  xmlFreeDoc(doc);
580  aws_iobuf_free(response);
581 
582  // For N:1, share UploadId across all ranks
583  if (n_to_1)
584  MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, testComm);
585  }
586  else
587  // N:1, and we're not rank0. recv UploadID from Rank 0
588  MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, testComm);
589  }
590 
591  /* initializations for N:N or N:1 writes using EMC byte-range extensions */
592  else {
593  /* maybe reset to zero-length, so "append" can work */
594  if (needs_reset) {
595 
596  if (verbose >= VERBOSE_3) {
597  fprintf( stdout, "rank %d resetting\n",
598  rank);
599  }
600 
601  aws_iobuf_reset(param->io_buf);
602  AWS4C_CHECK( s3_put(param->io_buf, testFileName) );
603  AWS4C_CHECK_OK( param->io_buf );
604  }
605  }
606  }
607 
608  //if (param->verbose >= VERBOSE_2) {
609  // printf("<- S3_Create_Or_Open\n");
610  //}
611  return ((aiori_fd_t *) testFileName );
612 }
613 
614 static aiori_fd_t * S3_Create( char *testFileName, int iorflags, aiori_mod_opt_t * param ) {
615  //if (param->verbose >= VERBOSE_2) {
616  // printf("-> S3_Create\n");
617  //}
618 
619  //if (param->verbose >= VERBOSE_2) {
620  // printf("<- S3_Create\n");
621  //}
622  return S3_Create_Or_Open_internal( testFileName, iorflags, (s3_options_t*) param, TRUE );
623 }
624 
625 static aiori_fd_t * EMC_Create( char *testFileName, int iorflags, aiori_mod_opt_t * param ) {
626  //if (param->verbose >= VERBOSE_2) {
627  // printf("-> EMC_Create\n");
628  //}
629 
630  //if (param->verbose >= VERBOSE_2) {
631  // printf("<- EMC_Create\n");
632  //}
633  return S3_Create_Or_Open_internal( testFileName, iorflags, (s3_options_t*) param, FALSE );
634 }
635 
636 static aiori_fd_t * S3_Open( char *testFileName, int flags, aiori_mod_opt_t * param ) {
637  //if (param->verbose >= VERBOSE_2) {
638  // printf("-> S3_Open\n");
639  //}
640 
641  return S3_Create_Or_Open_internal( testFileName, flags, (s3_options_t*) param, TRUE );
642 }
643 
644 static aiori_fd_t * EMC_Open( char *testFileName, int flags, aiori_mod_opt_t * param ) {
645  //if (param->verbose >= VERBOSE_2) {
646  // printf("-> S3_Open\n");
647  //}
648 
649  return S3_Create_Or_Open_internal( testFileName, flags, (s3_options_t*) param, FALSE );
650 }
651 
652 
653 /*
654  * transfer (more) data to an object. <file> is just the obj name.
655  *
656  * For N:1, param->offset is understood as offset for a given client to
657  * write into the "file". This translates to a byte-range in the HTTP
658  * request. Each write in the N:1 case is treated as a complete "part",
659  * so there is no such thing as a partial write.
660  *
661  * For N:N, when IOR "transfer-size" differs from "block-size", IOR treats
662  * Xfer as a partial write (i.e. there are multiple calls to XFER, to write
663  * any one of the "N" objects, as a series of "append" operations). This
664  * is not supported in S3/REST. Therefore, we depend on an EMC extension,
665  * in this case. This EMC extension allows appends using a byte-range
666  * header spec of "Range: bytes=-1-". aws4c now provides
667  * s3_enable_EMC_extensions(), to allow this behavior. If EMC-extensions
668  * are not enabled, the aws4c library will generate a run-time error, in
669  * this case.
670  *
671  * Each write-request returns an ETag which is a hash of the data. (The
672  * ETag could also be computed directly, if we wanted.) We must save the
673  * etags for later use by S3_close().
674  *
675  * WARNING: "Pure" S3 doesn't allow byte-ranges for writes to an object.
676  * Thus, you also can not append to an object. In the context of IOR,
677  * this causes objects to have only the size of the most-recent write.
678  * Thus, If the IOR "transfer-size" is different from the IOR
679  * "block-size", the files will be smaller than the amount of data
680  * that was written to them.
681  *
682  * EMC does support "append" to an object. In order to allow this,
683  * you must enable the EMC-extensions in the aws4c library, by calling
684  * s3_set_emc_compatibility() with a non-zero argument.
685  *
686  * NOTE: I don't think REST allows us to read/write an amount other than
687  * the size we request. Maybe our callback-handlers (above) could
688  * tell us? For now, this is assuming we only have to send one
689  * request, to transfer any amount of data. (But see above, re EMC
690  * support for "append".)
691  */
692 /* In the EMC case, instead of Multi-Part Upload we can use HTTP
693  * "byte-range" headers to write parts of a single object. This appears to
694  * have several advantages over the S3 MPU spec:
695  *
696  * (a) no need for a special "open" operation, to capture an "UploadID".
697  * Instead we simply write byte-ranges, and the server-side resolves
698  * any races, producing a single winner. In the IOR case, there should
699  * be no races, anyhow.
700  *
701  * (b) individual write operations don't have to refer to an ID, or to
702  * parse and save ETags returned from every write.
703  *
704  * (c) no need for a special "close" operation, in which all the saved
705  * ETags are gathered at a single rank, placed into XML, and shipped to
706  * the server, to finalize the MPU. That special close appears to
707  * impose two scaling problems: (1) requires all ETags to be shipped at
708  * the BW available to a single process, (2) requires either that they
709  * all fit into memory of a single process, or be written to disk
710  * (imposes additional BW constraints), or make a more-complex
711  * interaction with a threaded curl writefunction, to present the
712  * appearance of a single thread to curl, whilst allowing streaming
713  * reception of non-local ETags.
714  *
715  * (d) no constraints on the number or size of individual parts. (These
716  * exist in the S3 spec, though EMC's impl of the S3 multi-part upload is
717  * also free of these constraints.)
718  *
719  * Instead, parallel processes can write any number and/or size of updates,
720  * using a "byte-range" header. After each write returns, that part of the
721  * global object is visible to any reader. Places that are not updated
722  * read as zeros.
723  */
724 
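/*
 * Rough sketch of the byte-range headers involved (assumption: aws4c's
 * s3_set_byte_range() emits a standard HTTP Range header):
 *
 *     s3_set_byte_range(off, len)   -->   "Range: bytes=off-(off+len-1)"
 *     s3_set_byte_range(-1, -1)     -->   "Range: bytes=-1-"  (EMC "append")
 *
 * The "-1-" form is only honored when EMC extensions have been enabled via
 * s3_enable_EMC_extensions().
 */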
725 
726 static IOR_offset_t S3_Xfer_internal(int access,
727  aiori_fd_t* file,
728  IOR_size_t* buffer,
729  IOR_offset_t length,
730  IOR_offset_t offset,
731  s3_options_t* param,
732  int multi_part_upload_p ) {
733  //if (param->verbose >= VERBOSE_2) {
734  // printf("-> S3_Xfer(acc:%d, target:%s, buf:0x%llx, len:%llu, 0x%llx)\n",
735  // access, (char*)file, buffer, length, param);
736  //}
737 
738  char* fname = (char*)file; /* see NOTE above S3_Create_Or_Open() */
739  size_t remaining = (size_t)length;
740  char* data_ptr = (char *)buffer;
741 
742  // easier to think
743  int n_to_n = hints->filePerProc;
744  int n_to_1 = (! n_to_n);
745  int segmented = (hints->segmentCount == 1);
746 
747 
748  if (access == WRITE) { /* WRITE */
749  //if (verbose >= VERBOSE_3) {
750  // fprintf( stdout, "rank %d writing length=%lld to offset %lld\n",
751  // rank,
752  // remaining,
753  // param->offset + length - remaining);
754  //}
755 
756 
757  if (multi_part_upload_p) {
758 
759  // For N:1, part-numbers must have a global ordering for the
760  // components of the final object. param->part_number is
761  // incremented by 1 per write, on each rank. This lets us use it
762  // to compute a global part-numbering.
763  //
764  // In the N:N case, we only need to increment part-numbers within
765  // each rank.
766  //
767  // In the N:1 case, the global order of part-numbers we're writing
768  // depends on whether we're writing strided or segmented, in
769  // other words, how <offset> and <remaining> are actually
770  // positioning the parts being written. [See discussion at
771  // S3_Close_internal().]
772  //
773  // NOTE: 's3curl.pl --debug' shows StringToSign having partNumber
774  // first, even if I put uploadId first in the URL. Maybe
775  // that's what the server will do. GetStringToSign() in
776  // aws4c is not clever about this, so we spoon-feed args in
777  // the proper order.
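 //
 // Worked example (hypothetical numbers): with N = numTasks = 4 and
 // P = blockSize/transferSize = 3 parts per rank, rank 1 writes global
 // parts 3,4,5 in the segmented case (rank*P + local part-number), and
 // parts 1,5,9 in the strided case (local part-number * N + rank).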
778 
779  size_t part_number;
780  if (n_to_1) {
781  if (segmented) { // segmented
782  size_t parts_per_rank = hints->blockSize / hints->transferSize;
783  part_number = (rank * parts_per_rank) + param->part_number;
784  }
785  else // strided
786  part_number = (param->part_number * hints->numTasks) + rank;
787  }
788  else
789  part_number = param->part_number;
790  ++ param->part_number;
791 
792 
793  // if (verbose >= VERBOSE_3) {
794  // fprintf( stdout, "rank %d of %d writing (%s,%s) part_number %lld\n",
795  // rank,
796  // hints->numTasks,
797  // (n_to_1 ? "N:1" : "N:N"),
798  // (segmented ? "segmented" : "strided"),
799  // part_number);
800  // }
801 
802  char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */
803  snprintf(buff, BUFF_SIZE,
804  "%s?partNumber=%zd&uploadId=%s",
805  fname, part_number, param->UploadId);
806 
807  // For performance, we append <data_ptr> directly into the linked list
808  // of data in param->io_buf. We are "appending" rather than
809  // "extending", so the added buffer is seen as written data, rather
810  // than empty storage.
811  //
812  // aws4c parses some header-fields automatically for us (into members
813  // of the IOBuf). After s3_put2(), we can just read the etag from
814  // param->io_buf->eTag. The server actually returns literal
815  // quote-marks, at both ends of the string.
816 
817  aws_iobuf_reset(param->io_buf);
818  aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
819  AWS4C_CHECK( s3_put(param->io_buf, buff) );
820  AWS4C_CHECK_OK( param->io_buf );
821 
822  // if (verbose >= VERBOSE_3) {
823  // printf("rank %d: read ETag = '%s'\n", rank, param->io_buf->eTag);
824  // if (strlen(param->io_buf->eTag) != ETAG_SIZE+2) { /* quotes at both ends */
825  // fprintf(stderr, "Rank %d: ERROR: expected ETag to be %d hex digits\n",
826  // rank, ETAG_SIZE);
827  // exit(1);
828  // }
829  // }
830 
831  //if (verbose >= VERBOSE_3) {
832  // fprintf( stdout, "rank %d of %d (%s,%s) offset %lld, part# %lld --> ETag %s\n",
833  // rank,
834  // hints->numTasks,
835  // (n_to_1 ? "N:1" : "N:N"),
836  // (segmented ? "segmented" : "strided"),
837  // offset,
838  // part_number,
839  // param->io_buf->eTag); // incl quote-marks at [0] and [len-1]
840  //}
841  if (strlen(param->io_buf->eTag) != ETAG_SIZE+2) { /* quotes at both ends */
842  fprintf(stderr, "Rank %d: ERROR: expected ETag to be %d hex digits\n",
843  rank, ETAG_SIZE);
844  exit(1);
845  }
846 
847  // save the eTag for later
848  //
849  // memcpy(etag, param->io_buf->eTag +1, strlen(param->io_buf->eTag) -2);
850  // etag[ETAG_SIZE] = 0;
851  aws_iobuf_append(param->etags,
852  param->io_buf->eTag +1,
853  strlen(param->io_buf->eTag) -2);
854  // DEBUGGING
855  //if (verbose >= VERBOSE_4) {
856  // printf("rank %d: part %d = ETag %s\n", rank, part_number, param->io_buf->eTag);
857  //}
858 
859  // drop ptrs to <data_ptr>, in param->io_buf
860  aws_iobuf_reset(param->io_buf);
861  }
862  else { // use EMC's byte-range write-support, instead of MPU
863 
864 
865  // NOTE: You must call 's3_enable_EMC_extensions(1)' for
866  // byte-ranges to work for writes.
867  if (n_to_n)
868  s3_set_byte_range(-1,-1); // EMC header "Range: bytes=-1-" means "append"
869  else
870  s3_set_byte_range(offset, remaining);
871 
872  // For performance, we append <data_ptr> directly into the linked list
873  // of data in param->io_buf. We are "appending" rather than
874  // "extending", so the added buffer is seen as written data, rather
875  // than empty storage.
876  aws_iobuf_reset(param->io_buf);
877  aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
878  AWS4C_CHECK ( s3_put(param->io_buf, (char*) file) );
879  AWS4C_CHECK_OK( param->io_buf );
880 
881  // drop ptrs to <data_ptr>, in param->io_buf
882  aws_iobuf_reset(param->io_buf);
883  }
884 
885 
886  if ( hints->fsyncPerWrite == TRUE ) {
887  WARN("S3 doesn't support 'fsync'" ); /* does it? */
888  }
889 
890  }
891  else { /* READ or CHECK */
892 
893  //if (verbose >= VERBOSE_3) {
894  // fprintf( stdout, "rank %d reading from offset %lld\n",
895  // rank,
896  // hints->offset + length - remaining );
897  //}
898 
899  // read specific byte-range from the object
900  // [This is included in the "pure" S3 spec.]
901  s3_set_byte_range(offset, remaining);
902 
903  // For performance, we append <data_ptr> directly into the linked
904  // list of data in param->io_buf. In this case (i.e. reading),
905  // we're "extending" rather than "appending". That means the
906  // buffer represents empty storage, which will be filled by the
907  // libcurl writefunction, invoked via aws4c.
908  aws_iobuf_reset(param->io_buf);
909  aws_iobuf_extend_static(param->io_buf, data_ptr, remaining);
910  AWS4C_CHECK( s3_get(param->io_buf, (char*) file) );
911  if (param->io_buf->code != 206) { /* '206 Partial Content' */
912  char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */
913  snprintf(buff, BUFF_SIZE,
914  "Unexpected result (%d, '%s')",
915  param->io_buf->code, param->io_buf->result);
916  ERR(buff);
917  }
918 
919  // drop refs to <data_ptr>, in param->io_buf
920  aws_iobuf_reset(param->io_buf);
921  }
922 
923  //if (verbose >= VERBOSE_2) {
924  // printf("<- S3_Xfer\n");
925  //}
926  return ( length );
927 }
928 
929 
930 static IOR_offset_t S3_Xfer(int access,
931  aiori_fd_t* file,
932  IOR_size_t* buffer,
933  IOR_offset_t length,
934  IOR_offset_t offset,
935  aiori_mod_opt_t* param ) {
936  return S3_Xfer_internal(access, file, buffer, length, offset, (s3_options_t*) param, TRUE);
937 }
938 
939 
940 static
941 IOR_offset_t
942 EMC_Xfer(int access,
943  aiori_fd_t* file,
944  IOR_size_t* buffer,
945  IOR_offset_t length,
946  IOR_offset_t offset,
947  aiori_mod_opt_t* param ) {
948  return S3_Xfer_internal(access, file, buffer, length, offset, (s3_options_t*) param, FALSE);
949 }
950 
951 
952 
953 
954 
955 /*
956  * Does this even mean anything, for HTTP/S3 ?
957  *
958  * I believe all interactions with the server are considered complete at
959  * the time we get a response, e.g. from s3_put(). Therefore, fsync is
960  * kind of meaningless, for REST/S3.
961  *
962  * In future, we could extend our interface so as to allow a non-blocking
963  * semantics, for example with the libcurl "multi" interface, and/or by
964  * adding threaded callback handlers to obj_put(). *IF* we do that, *THEN*
965  * we should revisit 'fsync'.
966  *
967  * Another special case is multi-part upload, where many parallel clients
968  * may be writing to the same "file". (It looks like param->filePerProc
969  * would be the flag to check, for this.) Maybe when you called 'fsync',
970  * you meant that you wanted *all* the clients to be complete? That's not
971  * really what fsync would do. In the N:1 case, this is accomplished by
972  * S3_Close(). If you really wanted this behavior from S3_Fsync, we could
973  * have S3_Fsync call S3_close.
974  *
975  * As explained above, we may eventually want to consider the following:
976  *
977  * (1) thread interaction with any handlers that are doing ongoing
978  * interactions with the socket, to make sure they have finished all
979  * actions and gotten responses.
980  *
981  * (2) MPI barrier for all clients involved in a multi-part upload.
982  * Presumably, for IOR, when we are doing N:1, all clients are
983  * involved in that transfer, so this would amount to a barrier on
984  * MPI_COMM_WORLD.
985  */
986 
987 static void S3_Fsync( aiori_fd_t *fd, aiori_mod_opt_t * param ) {
988  //if (param->verbose >= VERBOSE_2) {
989  // printf("-> S3_Fsync [no-op]\n");
990  //}
991 }
992 
993 
994 /*
995  * It seems the only kind of "close" that ever needs doing for S3 is in the
996  * case of multi-part upload (i.e. N:1). In this case, all the parties to
997  * the upload must provide their ETags to a single party (e.g. rank 0 in an
998  * MPI job). Then the rank doing the closing can generate XML and complete
999  * the upload.
1000  *
1001  * ISSUE: The S3 spec says that a multi-part upload can have at most 10,000
1002  * parts. Does EMC allow more than this? (NOTE the spec also says
1003  * parts must be at least 5MB, but EMC definitely allows smaller
1004  * parts than that.)
1005  *
1006  * ISSUE: All Etags must be sent from a single rank, in a single
1007  * transaction. If the issue above (regarding 10k Etags) is
1008  * resolved by a discovery that EMC supports more than 10k ETags,
1009  * then, for large-enough files (or small-enough transfer-sizes) an
1010  * N:1 write may generate more ETags than the single closing rank
1011  * can hold in memory. In this case, there are several options,
1012  * outlined
1013  *
1014  *
1015 
1016  * See S3_Fsync() for some possible considerations.
1017  */
1018 
1019 static void S3_Close_internal(aiori_fd_t* fd, s3_options_t* param, int multi_part_upload_p) {
1020 
1021  char* fname = (char*)fd; /* see NOTE above S3_Create_Or_Open() */
1022 
1023  // easier to think
1024  int n_to_n = hints->filePerProc;
1025  int n_to_1 = (! n_to_n);
1026  int segmented = (hints->segmentCount == 1);
1027 
1028 
1029  if (param->written) {
1030  // finalizing Multi-Part Upload (for N:1 or N:N)
1031  if (multi_part_upload_p) {
1032 
1033 
1034  size_t etag_data_size = param->etags->write_count; /* local ETag data (bytes) */
1035  size_t etags_per_rank = etag_data_size / ETAG_SIZE; /* number of local etags */
1036 
1037  // --- create XML containing ETags in an IOBuf for "close" request
1038  IOBuf* xml = NULL;
1039 
1040 
1041  if (n_to_1) {
1042 
1043  // for N:1, gather all Etags at Rank0
1044  MPI_Datatype mpi_size_t;
1045  if (sizeof(size_t) == sizeof(int))
1046  mpi_size_t = MPI_INT;
1047  else if (sizeof(size_t) == sizeof(long))
1048  mpi_size_t = MPI_LONG;
1049  else
1050  mpi_size_t = MPI_LONG_LONG;
1051 
1052  // Everybody should have the same number of ETags (?)
1053  size_t etag_count_max = 0; /* highest number on any proc */
1054  MPI_Allreduce(&etags_per_rank, &etag_count_max,
1055  1, mpi_size_t, MPI_MAX, testComm);
1056  if (etags_per_rank != etag_count_max) {
1057  printf("Rank %d: etag count mismatch: max:%zd, mine:%zd\n",
1058  rank, etag_count_max, etags_per_rank);
1059  MPI_Abort(testComm, 1);
1060  }
1061 
1062  // collect ETag data at Rank0
1063  aws_iobuf_realloc(param->etags); /* force single contiguous buffer */
1064  char* etag_data = param->etags->first->buf; /* per-rank data, contiguous */
1065 
1066  if (rank == 0) {
1067  char* etag_ptr;
1068  int i;
1069  int j;
1070  int rnk;
1071 
1072  char* etag_vec = (char*)malloc((hints->numTasks * etag_data_size) +1);
1073  if (! etag_vec) {
1074  fprintf(stderr, "rank 0 failed to malloc %zd bytes\n",
1075  hints->numTasks * etag_data_size);
1076  MPI_Abort(testComm, 1);
1077  }
1078  MPI_Gather(etag_data, etag_data_size, MPI_BYTE,
1079  etag_vec, etag_data_size, MPI_BYTE, 0, testComm);
1080 
1081  // --- debugging: show the gathered etag data
1082  // (This shows the raw concatenated etag-data from each node.)
1083  if (verbose >= VERBOSE_4) {
1084  printf("rank 0: gathered %zd etags from all ranks:\n", etags_per_rank);
1085  etag_ptr=etag_vec;
1086  for (rnk=0; rnk < hints->numTasks; ++rnk) {
1087  printf("\t[%d]: '", rnk);
1088 
1089  int ii;
1090  for (ii=0; ii < etag_data_size; ++ii) /* NOT null-terminated! */
1091  printf("%c", etag_ptr[ii]);
1092 
1093  printf("'\n");
1094  etag_ptr += etag_data_size;
1095  }
1096  }
1097 
1098 
1099  // add XML for *all* the parts. The XML must be ordered by
1100  // part-number. Each rank wrote <etags_per_rank> parts,
1101  // locally. At rank0, the etags for each rank are now
1102  // stored as a contiguous block of text, with the blocks
1103  // stored in rank order in etag_vec. In other words, our
1104  // internal rep at rank 0 matches the "segmented" format.
1105  // From this, we must select etags in an order matching how
1106  // they appear in the actual object, and give sequential
1107  // part-numbers to the resulting sequence.
1108  //
1109  // That ordering of parts in the actual written object
1110  // varies according to whether we wrote in the "segmented"
1111  // or "strided" format.
1112  //
1113  // supposing N ranks, and P parts per rank:
1114  //
1115  // segmented:
1116  //
1117  // all parts for a given rank are consecutive.
1118  // rank r writes these parts:
1119  //
1120  // rP, rP+1, ... (r+1)P -1
1121  //
1122  // i.e. rank0 writes parts 0,1,2,3 ... P-1
1123  //
1124  //
1125  // strided:
1126  //
1127  //        rank r writes every N-th part, starting with r.
1128  //
1129  //           r, N+r, 2N+r, ... (P-1)N + r
1130  //
1131  //        i.e. rank0 writes parts 0, N, 2N, ... (P-1)N
1132  //
1133  //
1134  // NOTE: If we knew ahead of time how many parts each rank was
1135  // going to write, we could assign part-number ranges, per
1136  // rank, and then have nice locality here.
1137  //
1138  // Alternatively, we could have everyone format their own
1139  // XML text and send that, instead of just the tags. This
1140  // would increase the amount of data being sent, but would
1141  // reduce the work for rank0 to format everything.
1142 
1143  size_t i_max; // outer-loop
1144  size_t j_max; // inner loop
1145  size_t start_multiplier; // initial offset in collected data
1146  size_t stride; // in etag_vec
1147 
1148  if (segmented) { // segmented
1149  i_max = hints->numTasks;
1150  j_max = etags_per_rank;
1151  start_multiplier = etag_data_size; /* one rank's-worth of Etag data */
1152  stride = ETAG_SIZE; /* one ETag */
1153  }
1154  else { // strided
1155  i_max = etags_per_rank;
1156  j_max = hints->numTasks;
1157  start_multiplier = ETAG_SIZE; /* one ETag */
1158  stride = etag_data_size; /* one rank's-worth of Etag data */
1159  }
1160 
1161 
1162  xml = aws_iobuf_new();
1163  aws_iobuf_growth_size(xml, 1024 * 8);
1164 
1165  // write XML header ...
1166  aws_iobuf_append_str(xml, "<CompleteMultipartUpload>\n");
1167 
1168  int part = 0;
1169  for (i=0; i<i_max; ++i) {
1170 
1171  etag_ptr=etag_vec + (i * start_multiplier);
1172 
1173  for (j=0; j<j_max; ++j) {
1174 
1175  // etags were saved as contiguous text. Extract the next one.
1176  char etag[ETAG_SIZE +1];
1177  memcpy(etag, etag_ptr, ETAG_SIZE);
1178  etag[ETAG_SIZE] = 0;
1179  char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */
1180  // write XML for next part, with Etag ...
1181  snprintf(buff, BUFF_SIZE,
1182  " <Part>\n"
1183  " <PartNumber>%d</PartNumber>\n"
1184  " <ETag>%s</ETag>\n"
1185  " </Part>\n",
1186  part, etag);
1187 
1188  aws_iobuf_append_str(xml, buff);
1189 
1190  etag_ptr += stride;
1191  ++ part;
1192  }
1193  }
1194 
1195  // write XML tail ...
1196  aws_iobuf_append_str(xml, "</CompleteMultipartUpload>\n");
1197  } else {
1198  MPI_Gather(etag_data, etag_data_size, MPI_BYTE,
1199  NULL, etag_data_size, MPI_BYTE, 0, testComm);
1200  }
1201  } else { /* N:N */
1202 
1203  xml = aws_iobuf_new();
1204  aws_iobuf_growth_size(xml, 1024 * 8);
1205 
1206  // write XML header ...
1207  aws_iobuf_append_str(xml, "<CompleteMultipartUpload>\n");
1208 
1209  // all parts of our object were written from this rank.
1210  char etag[ETAG_SIZE +1];
1211  int part = 0;
1212  int i;
1213  char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */
1214  for (i=0; i<etags_per_rank; ++i) {
1215 
1216  // TBD: Instead of reading into etag, then sprintf'ing, then
1217  // copying into xml, we could just read directly into xml
1218  int sz = aws_iobuf_get_raw(param->etags, etag, ETAG_SIZE);
1219  if (sz != ETAG_SIZE) {
1220  snprintf(buff, BUFF_SIZE,
1221  "Read of ETag %d had length %d (not %d)\n",
1222  i, sz, ETAG_SIZE);
1223  ERR(buff);
1224  }
1225  etag[ETAG_SIZE] = 0;
1226 
1227 
1228  // write XML for next part, with Etag ...
1229  snprintf(buff, BUFF_SIZE,
1230  " <Part>\n"
1231  " <PartNumber>%d</PartNumber>\n"
1232  " <ETag>%s</ETag>\n"
1233  " </Part>\n",
1234  part, etag);
1235 
1236  aws_iobuf_append_str(xml, buff);
1237 
1238  ++ part;
1239  }
1240 
1241  // write XML tail ...
1242  aws_iobuf_append_str(xml, "</CompleteMultipartUpload>\n");
1243  }
1244 
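 // For reference, the finished request body POSTed to "<obj>?uploadId=..."
 // looks roughly like this (ETag value is a placeholder):
 //
 //     <CompleteMultipartUpload>
 //       <Part>
 //         <PartNumber>0</PartNumber>
 //         <ETag>0123456789abcdef0123456789abcdef</ETag>
 //       </Part>
 //       ...
 //     </CompleteMultipartUpload>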
1245  // send request to finalize MPU
1246  if (n_to_n || (rank == 0)) {
1247 
1248  // DEBUGGING: show the XML we constructed
1249  if (verbose >= VERBOSE_3)
1250  debug_iobuf(xml, 1, 1);
1251  char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */
1252  // --- POST our XML to the server.
1253  snprintf(buff, BUFF_SIZE,
1254  "%s?uploadId=%s",
1255  fname, param->UploadId);
1256 
1257  AWS4C_CHECK ( s3_post(xml, buff) );
1258  AWS4C_CHECK_OK( xml );
1259 
1260  aws_iobuf_free(xml);
1261  }
1262 
1263 
1264  // everybody reset MPU info. Allows another MPU, and frees memory.
1265  s3_MPU_reset(param);
1266 
1267  // Everybody meetup, so non-zero ranks won't go trying to stat the
1268  // N:1 file until rank0 has finished the S3 multi-part finalize.
1269  // The object will not appear to exist, until then.
1270  if (n_to_1)
1271  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1272  } else {
1273 
1274  // No finalization is needed, when using EMC's byte-range writing
1275  // support. However, we do need to make sure everyone has
1276  // finished writing, before anyone starts reading.
1277  if (n_to_1) {
1278  MPI_CHECK(MPI_Barrier(testComm), "barrier error");
1279  //if (verbose >= VERBOSE_2)
1280  // printf("rank %d: passed barrier\n", rank);
1281  //}
1282  }
1283  }
1284 
1285  // After writing, reset the CURL connection, so that caches won't be
1286  // used for reads.
1287  aws_reset_connection();
1288  }
1289 
1290  //if (param->verbose >= VERBOSE_2) {
1291  // printf("<- S3_Close\n");
1292  //}
1293 }
1294 
1295 static void S3_Close( aiori_fd_t* fd, aiori_mod_opt_t* param ) {
1296  S3_Close_internal(fd, (s3_options_t*) param, TRUE);
1297 }
1298 
1299 static void EMC_Close( aiori_fd_t* fd, aiori_mod_opt_t* param ) {
1300  S3_Close_internal(fd, (s3_options_t*) param, FALSE);
1301 }
1302 
1303 
1304 
1305 
1306 /*
1307  * Delete an object through the S3 interface.
1308  *
1309  * The only reason we separate out the EMC version is that an EMC bug means a
1310  * file that was written with appends can't be deleted, recreated, and then
1311  * successfully read.
1312  */
1313 
1314 static void S3_Delete( char *testFileName, aiori_mod_opt_t * options ) {
1315  //if (param->verbose >= VERBOSE_2) {
1316  // printf("-> S3_Delete(%s)\n", testFileName);
1317  //}
1318  /* maybe initialize curl */
1319  s3_options_t * param = (s3_options_t*) options;
1320  s3_connect(param );
1321 
1322 #if 0
1323  // EMC BUG: If file was written with appends, and is deleted,
1324  // Then any future recreation will result in an object that can't be read.
1325  // this
1326  AWS4C_CHECK( s3_delete(param->io_buf, testFileName) );
1327 #else
1328  // just replace with a zero-length object for now
1329  aws_iobuf_reset(param->io_buf);
1330  AWS4C_CHECK ( s3_put(param->io_buf, testFileName) );
1331 #endif
1332 
1333  AWS4C_CHECK_OK( param->io_buf );
1334  //if (verbose >= VERBOSE_2)
1335  // printf("<- S3_Delete\n");
1336 }
1337 
1338 
1339 static void EMC_Delete( char *testFileName, aiori_mod_opt_t * options ) {
1340  s3_options_t * param = (s3_options_t*) options;
1341  //if (param->verbose >= VERBOSE_2) {
1342  // printf("-> EMC_Delete(%s)\n", testFileName);
1343  //}
1344 
1345  /* maybe initialize curl */
1346  s3_connect( param );
1347 
1348 #if 0
1349  // EMC BUG: If file was written with appends, and is deleted,
1350  // Then any future recreation will result in an object that can't be read.
1351  // this
1352  AWS4C_CHECK( s3_delete(param->io_buf, testFileName) );
1353 #else
1354  // just replace with a zero-length object for now
1355  aws_iobuf_reset(param->io_buf);
1356  AWS4C_CHECK ( s3_put(param->io_buf, testFileName) );
1357 #endif
1358 
1359  AWS4C_CHECK_OK( param->io_buf );
1360  //if (param->verbose >= VERBOSE_2)
1361  // printf("<- EMC_Delete\n");
1362 }
1363 
1364 /*
1365  * HTTP HEAD returns meta-data for a "file".
1366  *
1367  * QUESTION: What should the <size> parameter be, on a HEAD request? Does
1368  * it matter? We don't know how much data they are going to send, but
1369  * obj_get_callback protects us from overruns. Will someone complain if we
1370  * request more data than the header actually takes?
1371  */
1372 
1373 static IOR_offset_t S3_GetFileSize(aiori_mod_opt_t * options, char * testFileName) {
1374  s3_options_t * param = (s3_options_t*) options;
1375  //if (param->verbose >= VERBOSE_2) {
1376  // printf("-> S3_GetFileSize(%s)\n", testFileName);
1377  //}
1378 
1379  IOR_offset_t aggFileSizeFromStat; /* i.e. "long long int" */
1380  IOR_offset_t tmpMin, tmpMax, tmpSum;
1381 
1382 
1383  /* make sure curl is connected, and inits are done */
1384  s3_connect( param );
1385 
1386  /* send HEAD request. aws4c parses some headers into IOBuf arg. */
1387  AWS4C_CHECK( s3_head(param->io_buf, testFileName) );
1388  if ( ! AWS4C_OK(param->io_buf) ) {
1389  fprintf(stderr, "rank %d: couldn't stat '%s': %s\n",
1390  rank, testFileName, param->io_buf->result);
1391  MPI_Abort(testComm, 1);
1392  }
1393  aggFileSizeFromStat = param->io_buf->contentLen;
1394 
1395  return ( aggFileSizeFromStat );
1396 }